# matemat/matemat/db/facade.py

from typing import List, Optional, Any, Type
import crypt
from hmac import compare_digest
from matemat.primitives import User, Product
from matemat.exceptions import AuthenticationError, DatabaseConsistencyError
from matemat.db import DatabaseWrapper
# The method to use for password hashing.
# TODO: Change to METHOD_BLOWFISH when adopting Python 3.7
# NOTE(review): the stdlib `crypt` module is deprecated since Python 3.11 and removed in 3.13 (PEP 594);
# confirm the target interpreter version before upgrading.
_CRYPT_METHOD = crypt.METHOD_SHA512
class MatematDatabase(object):
    """
    This class provides a facade that abstracts every (TM) needed database access in high level functions.

    Usage example (creating a user, changing the user's touchkey, logging in as that user and deleting the user):

        with MatematDatabase('/srv/matemat/production.sqlite3') as db:
            user: User = db.create_user('testuser', 'supersecurepassword')
            db.change_touchkey(user, 'supersecurepassword', '048cdef')
            user2: User = db.login('testuser', touchkey='048cdef')
            db.delete_user(user2)
    """

    def __init__(self, filename: str) -> None:
        """
        Create a new database facade. This does not connect to the SQLite3 database yet. To actually open the
        connection, use the Context Manager 'with' syntax (PEP 343), e.g.:

            with MatematDatabase('path/to/database.sqlite3') as db:
                db.foo()

        :param filename: The SQLite3 database file to use.
        """
        self.db: DatabaseWrapper = DatabaseWrapper(filename)

    def __enter__(self) -> 'MatematDatabase':
        # Pass context manager stuff through to the database wrapper
        self.db.__enter__()
        return self

    def __exit__(self, exc_type: Type, exc_val: Any, exc_tb: Any) -> None:
        # Pass context manager stuff through to the database wrapper
        self.db.__exit__(exc_type, exc_val, exc_tb)

    @staticmethod
    def _expect_single_row(c: Any, operation: str, table: str) -> None:
        """
        Make sure the statement executed last modified exactly one row.

        Must be called directly after the INSERT, UPDATE or DELETE statement to check, since it reads SQLite's
        changes() counter.

        :param c: The cursor of the running transaction.
        :param operation: Name of the calling facade method, used in the error message.
        :param table: Name of the table that should have been modified, used in the error message.
        :raises DatabaseConsistencyError: If the number of affected rows is not 1.
        """
        affected = c.execute('SELECT changes()').fetchone()[0]
        if affected != 1:
            raise DatabaseConsistencyError(
                f'{operation} should affect 1 {table} row, but affected {affected}')

    def transaction(self, exclusive: bool = True) -> Any:
        """
        Begin a new SQLite3 transaction (exclusive by default). You should never need to use the returned object (an
        APSW cursor). It is provided in case there is a real need for it (e.g. for unit testing).

        This function should be used with the Context Manager 'with' syntax (PEP 343), e.g.:

            with db.transaction():
                db.foo()
                db.bar()

        :param exclusive: Whether to begin an exclusive transaction or not, defaults to True (exclusive).
        :return: An APSW cursor.
        """
        return self.db.transaction(exclusive=exclusive)

    def list_users(self) -> List[User]:
        """
        Return a list of users in the database.

        :return: A list of users.
        """
        with self.db.transaction(exclusive=False) as c:
            rows = c.execute('''
                SELECT user_id, username, email, is_admin, is_member
                FROM users
            ''')
            # Decompose each row and put the values into a User object
            users: List[User] = [
                User(user_id, username, email, is_admin, is_member)
                for user_id, username, email, is_admin, is_member in rows
            ]
        return users

    def create_user(self,
                    username: str,
                    password: str,
                    email: Optional[str] = None,
                    admin: bool = False,
                    member: bool = True) -> User:
        """
        Create a new user. The new user starts with an account balance of 0.

        :param username: The name of the new user.
        :param password: The user's password.
        :param email: The user's email address, defaults to None.
        :param admin: Whether the user is an administrator, defaults to False.
        :param member: Whether the user is a member, defaults to True.
        :return: A User object representing the created user.
        :raises ValueError: If a user with the same name already exists.
        """
        # Hash the password.
        pwhash: str = crypt.crypt(password, crypt.mksalt(_CRYPT_METHOD))
        user_id: int = -1
        with self.db.transaction() as c:
            # Look up whether a user with the same name already exists.
            c.execute('SELECT user_id FROM users WHERE username = ?', [username])
            if c.fetchone() is not None:
                raise ValueError(f'A user with the name \'{username}\' already exists.')
            # Insert the user into the database.
            c.execute('''
                INSERT INTO users (username, email, password, balance, is_admin, is_member, lastchange)
                VALUES (:username, :email, :pwhash, 0, :admin, :member, STRFTIME('%s', 'now'))
            ''', {
                'username': username,
                'email': email,
                'pwhash': pwhash,
                'admin': admin,
                'member': member
            })
            # Fetch the new user's rowid.
            c.execute('SELECT last_insert_rowid()')
            user_id = int(c.fetchone()[0])
        return User(user_id, username, email, admin, member)

    def login(self, username: str, password: Optional[str] = None, touchkey: Optional[str] = None) -> User:
        """
        Validate a user's password or touchkey, and return a User object on success. EITHER password OR touchkey must
        be provided, the other one must be None.

        :param username: The username to login with.
        :param password: The user's password.
        :param touchkey: The user's touchkey.
        :return: A User object.
        :raises ValueError: If none or both of password and touchkey are provided.
        :raises AuthenticationError: If the user does not exist, the password or touchkey is wrong, or a touchkey
            login is attempted for a user without a touchkey.
        """
        if (password is None) == (touchkey is None):
            raise ValueError('Exactly one of password and touchkey must be provided')
        with self.db.transaction(exclusive=False) as c:
            c.execute('''
                SELECT user_id, username, email, password, touchkey, is_admin, is_member
                FROM users
                WHERE username = ?
            ''', [username])
            row = c.fetchone()
            if row is None:
                raise AuthenticationError('User does not exist')
            # From here on, username is the value stored in the database.
            user_id, username, email, pwhash, tkhash, admin, member = row
            # Exactly one of password and touchkey is set (checked above). The stored hash doubles as the salt,
            # and compare_digest is used for the comparison instead of ==.
            if password is not None:
                if not compare_digest(crypt.crypt(password, pwhash), pwhash):
                    raise AuthenticationError('Password mismatch')
            elif tkhash is None:
                raise AuthenticationError('Touchkey not set')
            elif not compare_digest(crypt.crypt(touchkey, tkhash), tkhash):
                raise AuthenticationError('Touchkey mismatch')
            return User(user_id, username, email, admin, member)

    def change_password(self, user: User, oldpass: str, newpass: str, verify_password: bool = True) -> None:
        """
        Change a user's password. Either the old password must be provided (for a password change by the user
        him-/herself), or verify_password must be set to False (for a password change by an administrator).

        :param user: User object representing the user to change the password for.
        :param oldpass: The old password.
        :param newpass: The new password.
        :param verify_password: Whether to actually verify the old password, defaults to True.
        :raises AuthenticationError: If the user does not exist or oldpass is wrong (if verify_password is True).
        """
        with self.db.transaction() as c:
            # Fetch the old password.
            c.execute('''
                SELECT password FROM users WHERE user_id = ?
            ''', [user.id])
            row = c.fetchone()
            if row is None:
                raise AuthenticationError('User does not exist in database.')
            # Verify the old password, if it should be verified.
            if verify_password and not compare_digest(crypt.crypt(oldpass, row[0]), row[0]):
                raise AuthenticationError('Old password does not match.')
            # Hash the new password and write it to the database.
            pwhash: str = crypt.crypt(newpass, crypt.mksalt(_CRYPT_METHOD))
            c.execute('''
                UPDATE users SET password = :pwhash, lastchange = STRFTIME('%s', 'now') WHERE user_id = :user_id
            ''', {
                'user_id': user.id,
                'pwhash': pwhash
            })

    def change_touchkey(self, user: User, password: str, touchkey: Optional[str], verify_password: bool = True) -> None:
        """
        Change a user's touchkey. Either the user's password must be provided (for a touchkey change by the user
        him-/herself), or verify_password must be set to False (for a touchkey change by an administrator).

        :param user: User object representing the user to change the touchkey for.
        :param password: The user's password.
        :param touchkey: The new touchkey, or None to remove the stored touchkey.
        :param verify_password: Whether to actually verify the password, defaults to True.
        :raises AuthenticationError: If the user does not exist or password is wrong (if verify_password is True).
        """
        with self.db.transaction() as c:
            # Fetch the password.
            c.execute('''
                SELECT password FROM users WHERE user_id = ?
            ''', [user.id])
            row = c.fetchone()
            if row is None:
                raise AuthenticationError('User does not exist in database.')
            # Verify the password, if it should be verified.
            if verify_password and not compare_digest(crypt.crypt(password, row[0]), row[0]):
                raise AuthenticationError('Password does not match.')
            # Hash the new touchkey (None is stored as-is) and write it to the database.
            tkhash: Optional[str] = None
            if touchkey is not None:
                tkhash = crypt.crypt(touchkey, crypt.mksalt(_CRYPT_METHOD))
            c.execute('''
                UPDATE users SET touchkey = :tkhash, lastchange = STRFTIME('%s', 'now') WHERE user_id = :user_id
            ''', {
                'user_id': user.id,
                'tkhash': tkhash
            })

    def change_user(self, user: User) -> None:
        """
        Write changes in the User object to the database. Only email, is_admin and is_member are written.

        :param user: The user object to update in the database.
        :raises DatabaseConsistencyError: If the user represented by the object does not exist.
        """
        with self.db.transaction() as c:
            c.execute('''
                UPDATE users SET
                    email = :email,
                    is_admin = :is_admin,
                    is_member = :is_member,
                    lastchange = STRFTIME('%s', 'now')
                WHERE user_id = :user_id
            ''', {
                'user_id': user.id,
                'email': user.email,
                'is_admin': user.is_admin,
                'is_member': user.is_member
            })
            self._expect_single_row(c, 'change_user', 'users')

    def delete_user(self, user: User) -> None:
        """
        Delete the user represented by the User object.

        :param user: The user to delete.
        :raises DatabaseConsistencyError: If the user represented by the object does not exist.
        """
        with self.db.transaction() as c:
            c.execute('''
                DELETE FROM users
                WHERE user_id = ?
            ''', [user.id])
            self._expect_single_row(c, 'delete_user', 'users')

    def list_products(self) -> List[Product]:
        """
        Return a list of products in the database.

        :return: A list of products.
        """
        with self.db.transaction(exclusive=False) as c:
            rows = c.execute('''
                SELECT product_id, name, price_member, price_non_member
                FROM products
            ''')
            # Decompose each row and put the values into a Product object
            products: List[Product] = [
                Product(product_id, name, price_member, price_non_member)
                for product_id, name, price_member, price_non_member in rows
            ]
        return products

    def create_product(self, name: str, price_member: int, price_non_member: int) -> Product:
        """
        Create a new product. The new product starts with a stock of 0.

        :param name: Name of the product.
        :param price_member: Price of the product for members.
        :param price_non_member: Price of the product for non-members.
        :return: A Product object representing the created product.
        :raises ValueError: If a product with the same name already exists.
        """
        product_id: int = -1
        with self.db.transaction() as c:
            # Look up whether a product with the same name already exists.
            c.execute('SELECT product_id FROM products WHERE name = ?', [name])
            if c.fetchone() is not None:
                raise ValueError(f'A product with the name \'{name}\' already exists.')
            # Insert the product into the database.
            c.execute('''
                INSERT INTO products (name, price_member, price_non_member, stock)
                VALUES (:name, :price_member, :price_non_member, 0)
            ''', {
                'name': name,
                'price_member': price_member,
                'price_non_member': price_non_member
            })
            # Fetch the new product's rowid.
            c.execute('SELECT last_insert_rowid()')
            product_id = int(c.fetchone()[0])
        return Product(product_id, name, price_member, price_non_member)

    def change_product(self, product: Product) -> None:
        """
        Write changes in the Product object to the database. Only name, price_member and price_non_member are
        written.

        :param product: The product object to update in the database.
        :raises DatabaseConsistencyError: If the product represented by the object does not exist.
        """
        with self.db.transaction() as c:
            c.execute('''
                UPDATE products
                SET
                    name = :name,
                    price_member = :price_member,
                    price_non_member = :price_non_member
                WHERE product_id = :product_id
            ''', {
                'product_id': product.id,
                'name': product.name,
                'price_member': product.price_member,
                'price_non_member': product.price_non_member
            })
            self._expect_single_row(c, 'change_product', 'products')

    def delete_product(self, product: Product) -> None:
        """
        Delete the product represented by the Product object.

        :param product: The product to delete.
        :raises DatabaseConsistencyError: If the product represented by the object does not exist.
        """
        with self.db.transaction() as c:
            c.execute('''
                DELETE FROM products
                WHERE product_id = ?
            ''', [product.id])
            self._expect_single_row(c, 'delete_product', 'products')

    def increment_consumption(self, user: User, product: Product, count: int = 1) -> None:
        """
        Decrement the user's balance by the price of the product, decrement the product's stock, and create or update
        the (user, product) entry in the consumption table.

        :param user: The user buying a product.
        :param product: The product the user is buying.
        :param count: How many units of the product the user is buying, defaults to 1.
        :raises DatabaseConsistencyError: If the user or the product does not exist in the database.
        """
        with self.db.transaction() as c:
            # Retrieve the consumption entry for the (user, product) pair, if any.
            c.execute('''
                SELECT count
                FROM consumption
                WHERE user_id = :user_id
                AND product_id = :product_id
            ''', {
                'user_id': user.id,
                'product_id': product.id
            })
            row = c.fetchone()
            if row is None:
                # If the entry does not exist, create a new one.
                c.execute('''
                    INSERT INTO consumption (user_id, product_id, count)
                    VALUES (:user_id, :product_id, :count)
                ''', {
                    'user_id': user.id,
                    'product_id': product.id,
                    'count': count
                })
            else:
                # If the entry exists, update the consumption count.
                c.execute('''
                    UPDATE consumption
                    SET count = count + :count
                    WHERE user_id = :user_id AND product_id = :product_id
                ''', {
                    'user_id': user.id,
                    'product_id': product.id,
                    'count': count
                })
            # Make sure exactly one consumption row was updated/inserted.
            self._expect_single_row(c, 'increment_consumption', 'consumption')
            # Compute the cost of the transaction and subtract it from the user's account balance.
            unit_price = product.price_member if user.is_member else product.price_non_member
            c.execute('''
                UPDATE users
                SET balance = balance - :cost
                WHERE user_id = :user_id''', {
                'user_id': user.id,
                'cost': count * unit_price
            })
            # Make sure exactly one user row was updated.
            self._expect_single_row(c, 'increment_consumption', 'users')
            # Subtract the number of purchased units from the product's stock.
            c.execute('''
                UPDATE products
                SET stock = stock - :count
                WHERE product_id = :product_id
            ''', {
                'product_id': product.id,
                'count': count
            })
            # Make sure exactly one product row was updated.
            self._expect_single_row(c, 'increment_consumption', 'products')

    def restock(self, product: Product, count: int) -> None:
        """
        Update the stock of a product.

        :param product: The product to restock.
        :param count: Number of units of the product to add.
        :raises DatabaseConsistencyError: If the product represented by the object does not exist.
        """
        with self.db.transaction() as c:
            c.execute('''
                UPDATE products
                SET stock = stock + :count
                WHERE product_id = :product_id
            ''', {
                'product_id': product.id,
                'count': count
            })
            self._expect_single_row(c, 'restock', 'products')

    def deposit(self, user: User, amount: int) -> None:
        """
        Update the account balance of a user.

        :param user: The user to update the account balance for.
        :param amount: The amount to add to the account balance. Must not be negative.
        :raises ValueError: If amount is negative.
        :raises DatabaseConsistencyError: If the user represented by the object does not exist.
        """
        if amount < 0:
            raise ValueError('Cannot deposit a negative value')
        with self.db.transaction() as c:
            c.execute('''
                UPDATE users
                SET balance = balance + :amount
                WHERE user_id = :user_id
            ''', {
                'user_id': user.id,
                'amount': amount
            })
            self._expect_single_row(c, 'deposit', 'users')