forked from s3lph/matemat
Added lots of documentation to the Database Facade.
This commit is contained in:
parent
8059765410
commit
0eb1161d7a
1 changed files with 124 additions and 0 deletions
|
@ -11,25 +11,54 @@ from matemat.db import DatabaseWrapper
|
||||||
class DatabaseFacade(object):
|
class DatabaseFacade(object):
|
||||||
|
|
||||||
def __init__(self, filename: str):
|
def __init__(self, filename: str):
|
||||||
|
"""
|
||||||
|
Create a new database facade. This does not connect to the SQLite3 database yet. To actually open the
|
||||||
|
connection, use the Context Manager 'with' syntax (PEP 343), e.g.:
|
||||||
|
|
||||||
|
with Database('path/to/database.sqlite3') as db:
|
||||||
|
db.foo()
|
||||||
|
|
||||||
|
:param filename: The SQLite3 database file to use.
|
||||||
|
"""
|
||||||
self.db: DatabaseWrapper = DatabaseWrapper(filename)
|
self.db: DatabaseWrapper = DatabaseWrapper(filename)
|
||||||
|
|
||||||
def __enter__(self):
|
def __enter__(self):
|
||||||
|
# Pass context manager stuff through to the database wrapper
|
||||||
self.db.__enter__()
|
self.db.__enter__()
|
||||||
return self
|
return self
|
||||||
|
|
||||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||||
|
# Pass context manager stuff through to the database wrapper
|
||||||
self.db.__exit__(exc_type, exc_val, exc_tb)
|
self.db.__exit__(exc_type, exc_val, exc_tb)
|
||||||
|
|
||||||
def transaction(self, exclusive: bool = True):
|
def transaction(self, exclusive: bool = True):
|
||||||
|
"""
|
||||||
|
Begin a new SQLite3 transaction (exclusive by default). You should never need to use the returned object (an
|
||||||
|
APSW cursor). It is provided in case there is a real need for it (e.g. for unit testing).
|
||||||
|
|
||||||
|
This function should be used with the Context Manager 'with' syntax (PEP 343), e.g.:
|
||||||
|
|
||||||
|
with db.transaction():
|
||||||
|
db.foo()
|
||||||
|
db.bar()
|
||||||
|
|
||||||
|
:param exclusive: Whether to begin an exclusive transaction or not, defaults to True (exclusive).
|
||||||
|
:return: An APSW cursor.
|
||||||
|
"""
|
||||||
return self.db.transaction(exclusive=exclusive)
|
return self.db.transaction(exclusive=exclusive)
|
||||||
|
|
||||||
def list_users(self) -> List[User]:
|
def list_users(self) -> List[User]:
|
||||||
|
"""
|
||||||
|
Return a list of users in the database.
|
||||||
|
:return: A list of users.
|
||||||
|
"""
|
||||||
users: List[User] = []
|
users: List[User] = []
|
||||||
with self.db.transaction(exclusive=False) as c:
|
with self.db.transaction(exclusive=False) as c:
|
||||||
for row in c.execute('''
|
for row in c.execute('''
|
||||||
SELECT user_id, username, email, is_admin, is_member
|
SELECT user_id, username, email, is_admin, is_member
|
||||||
FROM users
|
FROM users
|
||||||
'''):
|
'''):
|
||||||
|
# Decompose each row and put the values into a User object
|
||||||
user_id, username, email, is_admin, is_member = row
|
user_id, username, email, is_admin, is_member = row
|
||||||
users.append(User(user_id, username, email, is_admin, is_member))
|
users.append(User(user_id, username, email, is_admin, is_member))
|
||||||
return users
|
return users
|
||||||
|
@ -40,12 +69,25 @@ class DatabaseFacade(object):
|
||||||
email: Optional[str] = None,
|
email: Optional[str] = None,
|
||||||
admin: bool = False,
|
admin: bool = False,
|
||||||
member: bool = True) -> User:
|
member: bool = True) -> User:
|
||||||
|
"""
|
||||||
|
Create a new user.
|
||||||
|
:param username: The name of the new user.
|
||||||
|
:param password: The user's password.
|
||||||
|
:param email: The user's email address, defaults to None.
|
||||||
|
:param admin: Whether the user is an administrator, defaults to False.
|
||||||
|
:param member: Whether the user is a member, defaults to True.
|
||||||
|
:return: A User object representing the created user.
|
||||||
|
:raises ValueError: If a user with the same name already exists.
|
||||||
|
"""
|
||||||
|
# Hash the password.
|
||||||
pwhash: str = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt(12))
|
pwhash: str = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt(12))
|
||||||
user_id: int = -1
|
user_id: int = -1
|
||||||
with self.db.transaction() as c:
|
with self.db.transaction() as c:
|
||||||
|
# Look up whether a user with the same name already exists.
|
||||||
c.execute('SELECT user_id FROM users WHERE username = ?', [username])
|
c.execute('SELECT user_id FROM users WHERE username = ?', [username])
|
||||||
if c.fetchone() is not None:
|
if c.fetchone() is not None:
|
||||||
raise ValueError(f'A user with the name \'{username}\' already exists.')
|
raise ValueError(f'A user with the name \'{username}\' already exists.')
|
||||||
|
# Insert the user into the database.
|
||||||
c.execute('''
|
c.execute('''
|
||||||
INSERT INTO users (username, email, password, balance, is_admin, is_member, lastchange)
|
INSERT INTO users (username, email, password, balance, is_admin, is_member, lastchange)
|
||||||
VALUES (:username, :email, :pwhash, 0, :admin, :member, STRFTIME('%s', 'now'))
|
VALUES (:username, :email, :pwhash, 0, :admin, :member, STRFTIME('%s', 'now'))
|
||||||
|
@ -56,11 +98,22 @@ class DatabaseFacade(object):
|
||||||
'admin': admin,
|
'admin': admin,
|
||||||
'member': member
|
'member': member
|
||||||
})
|
})
|
||||||
|
# Fetch the new user's rowid.
|
||||||
c.execute('SELECT last_insert_rowid()')
|
c.execute('SELECT last_insert_rowid()')
|
||||||
user_id = int(c.fetchone()[0])
|
user_id = int(c.fetchone()[0])
|
||||||
return User(user_id, username, email, admin, member)
|
return User(user_id, username, email, admin, member)
|
||||||
|
|
||||||
def login(self, username: str, password: Optional[str] = None, touchkey: Optional[str] = None) -> User:
|
def login(self, username: str, password: Optional[str] = None, touchkey: Optional[str] = None) -> User:
|
||||||
|
"""
|
||||||
|
Validate a user's password or touchkey, and return a User object on success. EITHER password OR touchkey must
|
||||||
|
be provided, the other one must be None.
|
||||||
|
:param username: The username to login with.
|
||||||
|
:param password: The user's password.
|
||||||
|
:param touchkey: The user's touchkey.
|
||||||
|
:return: A User object.
|
||||||
|
:raises ValueError: If none or both of password and touchkey are provided.
|
||||||
|
:raises AuthenticationError: If the user does not exist or the password or touchkey is wrong.
|
||||||
|
"""
|
||||||
if (password is None) == (touchkey is None):
|
if (password is None) == (touchkey is None):
|
||||||
raise ValueError('Exactly one of password and touchkey must be provided')
|
raise ValueError('Exactly one of password and touchkey must be provided')
|
||||||
with self.db.transaction(exclusive=False) as c:
|
with self.db.transaction(exclusive=False) as c:
|
||||||
|
@ -80,15 +133,27 @@ class DatabaseFacade(object):
|
||||||
return User(user_id, username, email, admin, member)
|
return User(user_id, username, email, admin, member)
|
||||||
|
|
||||||
def change_password(self, user: User, oldpass: str, newpass: str, verify_password: bool = True):
|
def change_password(self, user: User, oldpass: str, newpass: str, verify_password: bool = True):
|
||||||
|
"""
|
||||||
|
Change a user's password. Either the old password must be provided (for a password change by the user
|
||||||
|
him-/herself), or verify_password must be set to False (for a password change by an administrator).
|
||||||
|
:param user: User object representing the user to change the password for.
|
||||||
|
:param oldpass: The old password.
|
||||||
|
:param newpass: The new password.
|
||||||
|
:param verify_password: Whether to actually verify the old password, defaults to True.
|
||||||
|
:raises AuthenticationError: If the user does not exist or oldpass is wrong (if verify_password is True).
|
||||||
|
"""
|
||||||
with self.db.transaction() as c:
|
with self.db.transaction() as c:
|
||||||
|
# Fetch the old password.
|
||||||
c.execute('''
|
c.execute('''
|
||||||
SELECT password FROM users WHERE user_id = ?
|
SELECT password FROM users WHERE user_id = ?
|
||||||
''', [user.id])
|
''', [user.id])
|
||||||
row = c.fetchone()
|
row = c.fetchone()
|
||||||
if row is None:
|
if row is None:
|
||||||
raise AuthenticationError('User does not exist in database.')
|
raise AuthenticationError('User does not exist in database.')
|
||||||
|
# Verify the old password, if it should be verified.
|
||||||
if verify_password and not bcrypt.checkpw(oldpass.encode('utf-8'), row[0]):
|
if verify_password and not bcrypt.checkpw(oldpass.encode('utf-8'), row[0]):
|
||||||
raise AuthenticationError('Old password does not match.')
|
raise AuthenticationError('Old password does not match.')
|
||||||
|
# Hash the new password and write it to the database.
|
||||||
pwhash: str = bcrypt.hashpw(newpass.encode('utf-8'), bcrypt.gensalt(12))
|
pwhash: str = bcrypt.hashpw(newpass.encode('utf-8'), bcrypt.gensalt(12))
|
||||||
c.execute('''
|
c.execute('''
|
||||||
UPDATE users SET password = :pwhash, lastchange = STRFTIME('%s', 'now') WHERE user_id = :user_id
|
UPDATE users SET password = :pwhash, lastchange = STRFTIME('%s', 'now') WHERE user_id = :user_id
|
||||||
|
@ -98,15 +163,27 @@ class DatabaseFacade(object):
|
||||||
})
|
})
|
||||||
|
|
||||||
def change_touchkey(self, user: User, password: str, touchkey: Optional[str], verify_password: bool = True):
|
def change_touchkey(self, user: User, password: str, touchkey: Optional[str], verify_password: bool = True):
|
||||||
|
"""
|
||||||
|
Change a user's touchkey. Either the old password must be provided (for a password change by the user
|
||||||
|
him-/herself), or verify_password must be set to False (for a password change by an administrator).
|
||||||
|
:param user: User object representing the user to change the password for.
|
||||||
|
:param password: The user's password.
|
||||||
|
:param touchkey: The new touchkey.
|
||||||
|
:param verify_password: Whether to actually verify the password, defaults to True.
|
||||||
|
:raises AuthenticationError: If the user does not exist or password is wrong (if verify_password is True).
|
||||||
|
"""
|
||||||
with self.db.transaction() as c:
|
with self.db.transaction() as c:
|
||||||
|
# Fetch the password.
|
||||||
c.execute('''
|
c.execute('''
|
||||||
SELECT password FROM users WHERE user_id = ?
|
SELECT password FROM users WHERE user_id = ?
|
||||||
''', [user.id])
|
''', [user.id])
|
||||||
row = c.fetchone()
|
row = c.fetchone()
|
||||||
if row is None:
|
if row is None:
|
||||||
raise AuthenticationError('User does not exist in database.')
|
raise AuthenticationError('User does not exist in database.')
|
||||||
|
# Verify the password, if it should be verified.
|
||||||
if verify_password and not bcrypt.checkpw(password.encode('utf-8'), row[0]):
|
if verify_password and not bcrypt.checkpw(password.encode('utf-8'), row[0]):
|
||||||
raise AuthenticationError('Password does not match.')
|
raise AuthenticationError('Password does not match.')
|
||||||
|
# Hash the new touchkey and write it to the database.
|
||||||
tkhash: str = bcrypt.hashpw(touchkey.encode('utf-8'), bcrypt.gensalt(12)) if touchkey is not None else None
|
tkhash: str = bcrypt.hashpw(touchkey.encode('utf-8'), bcrypt.gensalt(12)) if touchkey is not None else None
|
||||||
c.execute('''
|
c.execute('''
|
||||||
UPDATE users SET touchkey = :tkhash, lastchange = STRFTIME('%s', 'now') WHERE user_id = :user_id
|
UPDATE users SET touchkey = :tkhash, lastchange = STRFTIME('%s', 'now') WHERE user_id = :user_id
|
||||||
|
@ -116,6 +193,11 @@ class DatabaseFacade(object):
|
||||||
})
|
})
|
||||||
|
|
||||||
def change_user(self, user: User):
|
def change_user(self, user: User):
|
||||||
|
"""
|
||||||
|
Write changes in the User object to the database.
|
||||||
|
:param user: The user object to update in the database.
|
||||||
|
:raises DatabaseConsistencyError: If the user represented by the object does not exist.
|
||||||
|
"""
|
||||||
with self.db.transaction() as c:
|
with self.db.transaction() as c:
|
||||||
c.execute('''
|
c.execute('''
|
||||||
UPDATE users SET
|
UPDATE users SET
|
||||||
|
@ -136,6 +218,11 @@ class DatabaseFacade(object):
|
||||||
f'change_user should affect 1 users row, but affected {affected}')
|
f'change_user should affect 1 users row, but affected {affected}')
|
||||||
|
|
||||||
def delete_user(self, user: User):
|
def delete_user(self, user: User):
|
||||||
|
"""
|
||||||
|
Delete the user represented by the User object.
|
||||||
|
:param user: The user to delete.
|
||||||
|
:raises DatabaseConsistencyError: If the user represented by the object does not exist.
|
||||||
|
"""
|
||||||
with self.db.transaction() as c:
|
with self.db.transaction() as c:
|
||||||
c.execute('''
|
c.execute('''
|
||||||
DELETE FROM users
|
DELETE FROM users
|
||||||
|
@ -147,6 +234,10 @@ class DatabaseFacade(object):
|
||||||
f'delete_user should affect 1 users row, but affected {affected}')
|
f'delete_user should affect 1 users row, but affected {affected}')
|
||||||
|
|
||||||
def list_products(self) -> List[Product]:
|
def list_products(self) -> List[Product]:
|
||||||
|
"""
|
||||||
|
Return a list of products in the database.
|
||||||
|
:return: A list of products.
|
||||||
|
"""
|
||||||
products: List[Product] = []
|
products: List[Product] = []
|
||||||
with self.db.transaction(exclusive=False) as c:
|
with self.db.transaction(exclusive=False) as c:
|
||||||
for row in c.execute('''
|
for row in c.execute('''
|
||||||
|
@ -158,6 +249,14 @@ class DatabaseFacade(object):
|
||||||
return products
|
return products
|
||||||
|
|
||||||
def create_product(self, name: str, price_member: int, price_non_member: int) -> Product:
|
def create_product(self, name: str, price_member: int, price_non_member: int) -> Product:
|
||||||
|
"""
|
||||||
|
Creates a new product.
|
||||||
|
:param name: Name of the product.
|
||||||
|
:param price_member: Price of the product for members.
|
||||||
|
:param price_non_member: Price of the product for non-members.
|
||||||
|
:return: A Product object representing the created product.
|
||||||
|
:raises ValueError: If a product with the same name already exists.
|
||||||
|
"""
|
||||||
product_id: int = -1
|
product_id: int = -1
|
||||||
with self.db.transaction() as c:
|
with self.db.transaction() as c:
|
||||||
c.execute('SELECT product_id FROM products WHERE name = ?', [name])
|
c.execute('SELECT product_id FROM products WHERE name = ?', [name])
|
||||||
|
@ -196,6 +295,11 @@ class DatabaseFacade(object):
|
||||||
f'change_product should affect 1 products row, but affected {affected}')
|
f'change_product should affect 1 products row, but affected {affected}')
|
||||||
|
|
||||||
def delete_product(self, product: Product):
|
def delete_product(self, product: Product):
|
||||||
|
"""
|
||||||
|
Write changes in the Product object to the database.
|
||||||
|
:param product: The product object to update in the database.
|
||||||
|
:raises DatabaseConsistencyError: If the product represented by the object does not exist.
|
||||||
|
"""
|
||||||
with self.db.transaction() as c:
|
with self.db.transaction() as c:
|
||||||
c.execute('''
|
c.execute('''
|
||||||
DELETE FROM products
|
DELETE FROM products
|
||||||
|
@ -207,6 +311,14 @@ class DatabaseFacade(object):
|
||||||
f'delete_product should affect 1 products row, but affected {affected}')
|
f'delete_product should affect 1 products row, but affected {affected}')
|
||||||
|
|
||||||
def increment_consumption(self, user: User, product: Product, count: int = 1):
|
def increment_consumption(self, user: User, product: Product, count: int = 1):
|
||||||
|
"""
|
||||||
|
Decrement the user's balance by the price of the product, decrement the products stock, and create an entry in
|
||||||
|
the statistics table.
|
||||||
|
:param user: The user buying a product.
|
||||||
|
:param product: The product the user is buying.
|
||||||
|
:param count: How many instances of the product the user is buying, defaults to 1.
|
||||||
|
:raises DatabaseConsistencyError: If the user or the product does not exist in the database.
|
||||||
|
"""
|
||||||
with self.db.transaction() as c:
|
with self.db.transaction() as c:
|
||||||
c.execute('''
|
c.execute('''
|
||||||
SELECT count
|
SELECT count
|
||||||
|
@ -266,6 +378,12 @@ class DatabaseFacade(object):
|
||||||
f'increment_consumption should affect 1 products row, but affected {affected}')
|
f'increment_consumption should affect 1 products row, but affected {affected}')
|
||||||
|
|
||||||
def restock(self, product: Product, count: int):
|
def restock(self, product: Product, count: int):
|
||||||
|
"""
|
||||||
|
Update the stock of a product.
|
||||||
|
:param product: The product to restock.
|
||||||
|
:param count: Number of instances of the product to add.
|
||||||
|
:raises DatabaseConsistencyError: If the product represented by the object does not exist.
|
||||||
|
"""
|
||||||
with self.db.transaction() as c:
|
with self.db.transaction() as c:
|
||||||
c.execute('''
|
c.execute('''
|
||||||
UPDATE products
|
UPDATE products
|
||||||
|
@ -280,6 +398,12 @@ class DatabaseFacade(object):
|
||||||
raise DatabaseConsistencyError(f'restock should affect 1 products row, but affected {affected}')
|
raise DatabaseConsistencyError(f'restock should affect 1 products row, but affected {affected}')
|
||||||
|
|
||||||
def deposit(self, user: User, amount: int):
|
def deposit(self, user: User, amount: int):
|
||||||
|
"""
|
||||||
|
Update the account balance of a user.
|
||||||
|
:param user: The user to update the account balance for.
|
||||||
|
:param amount: The amount to add to the account balance.
|
||||||
|
:raises DatabaseConsistencyError: If the user represented by the object does not exist.
|
||||||
|
"""
|
||||||
if amount < 0:
|
if amount < 0:
|
||||||
raise ValueError('Cannot deposit a negative value')
|
raise ValueError('Cannot deposit a negative value')
|
||||||
with self.db.transaction() as c:
|
with self.db.transaction() as c:
|
||||||
|
|
Loading…
Reference in a new issue