forked from s3lph/matemat
703 lines
32 KiB
Python
703 lines
32 KiB
Python
|
|
from typing import Any, Dict, List, Optional, Tuple, Type
|
|
|
|
import crypt
|
|
from hmac import compare_digest
|
|
from datetime import datetime
|
|
|
|
from matemat.db.primitives import User, Product, ReceiptPreference, Receipt,\
|
|
Transaction, Consumption, Deposit, Modification
|
|
from matemat.exceptions import AuthenticationError, DatabaseConsistencyError
|
|
from matemat.db import DatabaseWrapper
|
|
from matemat.db.wrapper import DatabaseTransaction
|
|
|
|
|
|
class MatematDatabase(object):
|
|
"""
|
|
This class provides a facade that abstracts every (TM) needed database access in high level functions.
|
|
|
|
Usage example (creating a user, changing the user's touchkey, logging in as that user and deleting the user):
|
|
|
|
with MatematDatabase('/srv/matemat/production.sqlite3') as db:
|
|
user: User = db.create_user('testuser', 'supersecurepassword')
|
|
db.change_touchkey(user, 'supersecurepassword', '048cdef')
|
|
user2: User = db.login('testuser', touchkey='048cdef')
|
|
db.delete_user(user2)
|
|
|
|
"""
|
|
|
|
def __init__(self, filename: str) -> None:
    """
    Set up a new database facade around a DatabaseWrapper for the given file. The connection to the SQLite3
    database is not opened here; to actually open it, use the Context Manager 'with' syntax (PEP 343), e.g.:

        with MatematDatabase('path/to/database.sqlite3') as db:
            db.foo()

    :param filename: The SQLite3 database file to use.
    """
    wrapper = DatabaseWrapper(filename)
    # All facade methods go through this wrapper to talk to the database.
    self.db: DatabaseWrapper = wrapper
|
|
|
|
def __enter__(self) -> 'MatematDatabase':
    """
    Enter the context of the underlying database wrapper.

    :return: This facade itself, so it can be bound by the 'with' statement.
    """
    # The wrapper's own return value is not needed; callers work with the facade.
    wrapper = self.db
    wrapper.__enter__()
    return self
|
|
|
|
def __exit__(self, exc_type: Type, exc_val: Any, exc_tb: Any) -> None:
    """
    Leave the context by handing the exception details on to the underlying database wrapper.

    The wrapper's return value is discarded, so this method always returns None.

    :param exc_type: The type of the exception raised inside the 'with' block, if any.
    :param exc_val: The exception instance raised inside the 'with' block, if any.
    :param exc_tb: The traceback belonging to the exception, if any.
    """
    wrapper = self.db
    wrapper.__exit__(exc_type, exc_val, exc_tb)
|
|
|
|
def transaction(self, exclusive: bool = True) -> DatabaseTransaction:
    """
    Open a new SQLite3 transaction on the underlying database wrapper (exclusive by default). The returned
    transaction object should rarely be needed directly; it is provided in case there is a real need for it
    (e.g. for unit testing).

    This function should be used with the Context Manager 'with' syntax (PEP 343), yielding a sqlite3.Cursor, e.g.:

        with db.transaction():
            db.foo()
            db.bar()

    :param exclusive: Whether to begin an exclusive transaction or not, defaults to True (exclusive).
    :return: A transaction object.
    """
    wrapper = self.db
    return wrapper.transaction(exclusive=exclusive)
|
|
|
|
def has_admin_users(self) -> bool:
    """
    Tell whether at least one user in the database is flagged as administrator.

    :return: True if there are admin users, false otherwise.
    """
    with self.db.transaction(exclusive=False) as cursor:
        cursor.execute('SELECT COUNT(user_id) FROM users WHERE is_admin = 1')
        # COUNT always yields exactly one row with a single column
        (admin_count,) = cursor.fetchone()
        return admin_count > 0
|
|
|
|
def list_users(self, with_touchkey: bool = False) -> List[User]:
    """
    Fetch the users stored in the database.

    :param with_touchkey: If true, only lists those users that have a touchkey set. Defaults to false.
    :return: A list of users.
    :raises DatabaseConsistencyError: If a stored receipt preference is not a valid ReceiptPreference.
    """
    query = '''
        SELECT user_id, username, email, is_admin, is_member, balance, receipt_pref
        FROM users
        WHERE touchkey IS NOT NULL OR NOT :must_have_touchkey
    '''
    params = {
        'must_have_touchkey': with_touchkey
    }
    found: List[User] = []
    with self.db.transaction(exclusive=False) as cursor:
        # Each row is decomposed right in the loop header and turned into a User object
        for uid, name, mail, admin, member, credit, pref_raw in cursor.execute(query, params):
            try:
                preference: ReceiptPreference = ReceiptPreference(pref_raw)
            except ValueError:
                raise DatabaseConsistencyError(f'{pref_raw} is not a valid ReceiptPreference')
            found.append(User(uid, name, credit, mail, admin, member, preference))
    return found
|
|
|
|
def get_user(self, uid: int) -> User:
    """
    Look up a single user by its user ID.

    :param uid: The user's ID.
    :return: A User object built from the matching database row.
    :raises ValueError: If no user with this ID exists.
    :raises DatabaseConsistencyError: If the stored receipt preference is not a valid ReceiptPreference.
    """
    query = '''
        SELECT user_id, username, email, is_admin, is_member, balance, receipt_pref
        FROM users
        WHERE user_id = ?
    '''
    with self.db.transaction(exclusive=False) as cursor:
        cursor.execute(query, [uid])
        record = cursor.fetchone()
        if record is None:
            raise ValueError(f'No user with user ID {uid} exists.')
        # Split the row into its columns and assemble the user from them
        user_id, username, email, is_admin, is_member, balance, pref_raw = record
        try:
            preference: ReceiptPreference = ReceiptPreference(pref_raw)
        except ValueError:
            raise DatabaseConsistencyError(f'{pref_raw} is not a valid ReceiptPreference')
        return User(user_id, username, balance, email, is_admin, is_member, preference)
|
|
|
|
def create_user(self,
                username: str,
                password: str,
                email: Optional[str] = None,
                admin: bool = False,
                member: bool = True) -> User:
    """
    Add a new user with a balance of 0 to the database.

    :param username: The name of the new user.
    :param password: The user's password. Only its salted hash is stored.
    :param email: The user's email address, defaults to None.
    :param admin: Whether the user is an administrator, defaults to False.
    :param member: Whether the user is a member, defaults to True.
    :return: A User object representing the created user.
    :raises ValueError: If a user with the same name already exists.
    """
    # The hash is computed up front, before the transaction is opened.
    hashed_password: str = crypt.crypt(password, crypt.mksalt())
    new_id: int = -1
    insert_values = {
        'username': username,
        'email': email,
        'pwhash': hashed_password,
        'admin': admin,
        'member': member
    }
    with self.db.transaction() as cursor:
        # Usernames have to be unique, so refuse to create a duplicate.
        duplicate = cursor.execute('SELECT user_id FROM users WHERE username = ?', [username]).fetchone()
        if duplicate is not None:
            raise ValueError(f'A user with the name \'{username}\' already exists.')
        cursor.execute('''
            INSERT INTO users (username, email, password, balance, is_admin, is_member, lastchange, created)
            VALUES (:username, :email, :pwhash, 0, :admin, :member, STRFTIME('%s', 'now'), STRFTIME('%s', 'now'))
        ''', insert_values)
        # The rowid of the freshly inserted row is the new user's ID.
        cursor.execute('SELECT last_insert_rowid()')
        new_id = int(cursor.fetchone()[0])
    return User(new_id, username, 0, email, admin, member)
|
|
|
|
def login(self, username: str, password: Optional[str] = None, touchkey: Optional[str] = None) -> User:
    """
    Check a user's credentials and return the matching User object on success. Exactly one of password and
    touchkey has to be given; the other one must be None.

    :param username: The username to login with.
    :param password: The user's password.
    :param touchkey: The user's touchkey.
    :return: A User object.
    :raises ValueError: If none or both of password and touchkey are provided.
    :raises AuthenticationError: If the user does not exist, the password or touchkey is wrong, or a touchkey
        login is attempted for a user without a touchkey.
    :raises DatabaseConsistencyError: If the stored receipt preference is not a valid ReceiptPreference.
    """
    has_password = password is not None
    has_touchkey = touchkey is not None
    if has_password == has_touchkey:
        raise ValueError('Exactly one of password and touchkey must be provided')
    query = '''
        SELECT user_id, username, email, password, touchkey, is_admin, is_member, balance, receipt_pref
        FROM users
        WHERE username = ?
    '''
    with self.db.transaction(exclusive=False) as cursor:
        cursor.execute(query, [username])
        record = cursor.fetchone()
        if record is None:
            raise AuthenticationError('User does not exist')
        user_id, username, email, pwhash, tkhash, admin, member, balance, pref_raw = record
        if has_password:
            # Re-hash the candidate with the stored hash as salt and compare the results.
            if not compare_digest(crypt.crypt(password, pwhash), pwhash):
                raise AuthenticationError('Password mismatch')
        elif tkhash is None:
            # Touchkey login requested, but the user never configured a touchkey.
            raise AuthenticationError('Touchkey not set')
        elif not compare_digest(crypt.crypt(touchkey, tkhash), tkhash):
            raise AuthenticationError('Touchkey mismatch')
        try:
            preference: ReceiptPreference = ReceiptPreference(pref_raw)
        except ValueError:
            raise DatabaseConsistencyError(f'{pref_raw} is not a valid ReceiptPreference')
        return User(user_id, username, balance, email, admin, member, preference)
|
|
|
|
def change_password(self, user: User, oldpass: str, newpass: str, verify_password: bool = True) -> None:
    """
    Replace a user's password. Either the old password must be provided (for a password change by the user
    him-/herself), or verify_password must be set to False (for a password change by an administrator).

    :param user: User object representing the user to change the password for.
    :param oldpass: The old password. Not evaluated if verify_password is False.
    :param newpass: The new password.
    :param verify_password: Whether to actually verify the old password, defaults to True.
    :raises AuthenticationError: If the user does not exist or oldpass is wrong (if verify_password is True).
    """
    with self.db.transaction() as cursor:
        # Load the currently stored password hash.
        cursor.execute('''
            SELECT password FROM users WHERE user_id = ?
        ''', [user.id])
        record = cursor.fetchone()
        if record is None:
            raise AuthenticationError('User does not exist in database.')
        if verify_password:
            # Re-hash the old password with the stored hash as salt and compare the results.
            stored_hash = record[0]
            if not compare_digest(crypt.crypt(oldpass, stored_hash), stored_hash):
                raise AuthenticationError('Old password does not match.')
        # Store a freshly salted hash of the new password.
        new_hash: str = crypt.crypt(newpass, crypt.mksalt())
        update_values = {
            'user_id': user.id,
            'pwhash': new_hash
        }
        cursor.execute('''
            UPDATE users SET password = :pwhash, lastchange = STRFTIME('%s', 'now') WHERE user_id = :user_id
        ''', update_values)
|
|
|
|
def change_touchkey(self, user: User, password: str, touchkey: Optional[str], verify_password: bool = True) -> None:
    """
    Set or clear a user's touchkey. Either the user's password must be provided (for a change by the user
    him-/herself), or verify_password must be set to False (for a change by an administrator).

    :param user: User object representing the user to change the touchkey for.
    :param password: The user's password. Not evaluated if verify_password is False.
    :param touchkey: The new touchkey. If None, the stored touchkey is set to NULL.
    :param verify_password: Whether to actually verify the password, defaults to True.
    :raises AuthenticationError: If the user does not exist or password is wrong (if verify_password is True).
    """
    with self.db.transaction() as cursor:
        # Load the stored password hash.
        cursor.execute('''
            SELECT password FROM users WHERE user_id = ?
        ''', [user.id])
        record = cursor.fetchone()
        if record is None:
            raise AuthenticationError('User does not exist in database.')
        if verify_password:
            # Re-hash the password with the stored hash as salt and compare the results.
            stored_hash = record[0]
            if not compare_digest(crypt.crypt(password, stored_hash), stored_hash):
                raise AuthenticationError('Password does not match.')
        # Only a salted hash of the touchkey is stored; None is written as is.
        if touchkey is None:
            new_hash: Optional[str] = None
        else:
            new_hash = crypt.crypt(touchkey, crypt.mksalt())
        update_values = {
            'user_id': user.id,
            'tkhash': new_hash
        }
        cursor.execute('''
            UPDATE users SET touchkey = :tkhash, lastchange = STRFTIME('%s', 'now') WHERE user_id = :user_id
        ''', update_values)
|
|
|
|
def change_user(self, user: User, agent: Optional[User], **kwargs) -> None:
    """
    Write changed user properties to the database. Properties that are not passed as keyword arguments keep the
    value from the provided user object. A balance that differs from the one stored in the database is
    additionally logged as a transaction with a modification entry naming the agent. The user object is only
    updated once writing to the database succeeded; otherwise it is left untouched. The user to update is
    identified by the ID field in the provided user object.

    :param user: The user object to update and to identify the requested user by.
    :param agent: The user that is performing the change. Must be present if the balance is changed.
    :param kwargs: The properties to change: name, email, is_admin, is_member, balance, balance_reason,
        receipt_pref.
    :raises DatabaseConsistencyError: If the user represented by the object does not exist.
    :raises ValueError: If the balance changes, but no agent was provided.
    """
    # Requested values take precedence over the values currently held by the user object
    new_name: str = kwargs.get('name', user.name)
    new_email: Optional[str] = kwargs.get('email', user.email)
    new_is_admin: bool = kwargs.get('is_admin', user.is_admin)
    new_is_member: bool = kwargs.get('is_member', user.is_member)
    new_balance: int = kwargs.get('balance', user.balance)
    reason: Optional[str] = kwargs.get('balance_reason')
    new_receipt_pref: ReceiptPreference = kwargs.get('receipt_pref', user.receipt_pref)
    with self.db.transaction() as cursor:
        cursor.execute('SELECT balance FROM users WHERE user_id = :user_id', {'user_id': user.id})
        record = cursor.fetchone()
        if record is None:
            raise DatabaseConsistencyError(f'User with ID {user.id} does not exist')
        stored_balance: int = record[0]
        if new_balance != stored_balance:
            if agent is None:
                raise ValueError('agent must not be None for a balance change')
            # Log the balance difference as a transaction ...
            cursor.execute('''
                INSERT INTO transactions (user_id, value, old_balance)
                VALUES (:user_id, :value, :old_balance)
            ''', {
                'user_id': user.id,
                'value': new_balance - stored_balance,
                'old_balance': stored_balance
            })
            # ... and attach who changed it and why to that transaction.
            cursor.execute('''
                INSERT INTO modifications (ta_id, agent, reason)
                VALUES (last_insert_rowid(), :agent, :reason)
            ''', {
                'agent': agent.name,
                'reason': reason
            })
        cursor.execute('''
            UPDATE users SET
                username = :username,
                email = :email,
                balance = :balance,
                is_admin = :is_admin,
                is_member = :is_member,
                receipt_pref = :receipt_pref,
                lastchange = STRFTIME('%s', 'now')
            WHERE user_id = :user_id
        ''', {
            'user_id': user.id,
            'username': new_name,
            'email': new_email,
            'balance': new_balance,
            'is_admin': new_is_admin,
            'is_member': new_is_member,
            'receipt_pref': new_receipt_pref.value
        })
    # The database accepted the changes, so mirror them in the user object now
    user.name = new_name
    user.email = new_email
    user.balance = new_balance
    user.is_admin = new_is_admin
    user.is_member = new_is_member
    user.receipt_pref = new_receipt_pref
|
|
|
|
def delete_user(self, user: User) -> None:
    """
    Remove the user identified by the ID of the provided User object from the database.

    :param user: The user to delete.
    :raises DatabaseConsistencyError: If the deletion did not affect exactly one row, e.g. because the user
        represented by the object does not exist.
    """
    with self.db.transaction() as cursor:
        cursor.execute('''
            DELETE FROM users
            WHERE user_id = ?
        ''', [user.id])
        # Ask SQLite how many rows the DELETE statement changed
        cursor.execute('SELECT changes()')
        deleted_rows = cursor.fetchone()[0]
        if deleted_rows != 1:
            raise DatabaseConsistencyError(
                f'delete_user should affect 1 users row, but affected {deleted_rows}')
|
|
|
|
def list_products(self) -> List[Product]:
    """
    Fetch all products stored in the database.

    :return: A list of products.
    """
    query = '''
        SELECT product_id, name, price_member, price_non_member, stock, stockable
        FROM products
    '''
    catalogue: List[Product] = []
    with self.db.transaction(exclusive=False) as cursor:
        # Note the argument order of Product: stockable comes before stock.
        catalogue.extend(
            Product(product_id, name, price_member, price_external, stockable, stock)
            for product_id, name, price_member, price_external, stock, stockable in cursor.execute(query)
        )
    return catalogue
|
|
|
|
def get_product(self, pid: int) -> Product:
    """
    Look up a single product by its product ID.

    :param pid: The product's ID.
    :return: A Product object built from the matching database row.
    :raises ValueError: If no product with this ID exists.
    """
    query = '''
        SELECT product_id, name, price_member, price_non_member, stock, stockable
        FROM products
        WHERE product_id = ?'''
    with self.db.transaction(exclusive=False) as cursor:
        cursor.execute(query, [pid])
        record = cursor.fetchone()
        if record is None:
            raise ValueError(f'No product with product ID {pid} exists.')
        # Split the row into its columns; Product takes stockable before stock.
        product_id, name, price_member, price_non_member, stock, stockable = record
        return Product(product_id, name, price_member, price_non_member, stockable, stock)
|
|
|
|
def create_product(self, name: str, price_member: int, price_non_member: int, stockable: bool) -> Product:
    """
    Add a new product with a stock of 0 to the database.

    :param name: Name of the product.
    :param price_member: Price of the product for members.
    :param price_non_member: Price of the product for non-members.
    :param stockable: True if the product should be stockable, false otherwise.
    :return: A Product object representing the created product.
    :raises ValueError: If a product with the same name already exists.
    """
    new_id: int = -1
    insert_values = {
        'name': name,
        'price_member': price_member,
        'price_non_member': price_non_member,
        'stockable': stockable
    }
    with self.db.transaction() as cursor:
        # Product names have to be unique, so refuse to create a duplicate.
        duplicate = cursor.execute('SELECT product_id FROM products WHERE name = ?', [name]).fetchone()
        if duplicate is not None:
            raise ValueError(f'A product with the name \'{name}\' already exists.')
        cursor.execute('''
            INSERT INTO products (name, price_member, price_non_member, stock, stockable)
            VALUES (:name, :price_member, :price_non_member, 0, :stockable)
        ''', insert_values)
        # The rowid of the freshly inserted row is the new product's ID.
        cursor.execute('SELECT last_insert_rowid()')
        new_id = int(cursor.fetchone()[0])
    return Product(new_id, name, price_member, price_non_member, stockable, 0)
|
|
|
|
def change_product(self, product: Product, **kwargs) -> None:
    """
    Write changed product properties to the database. Properties that are not passed as keyword arguments keep
    the value from the provided product object. The product object is only updated once writing to the database
    succeeded; otherwise it is left untouched. The product to update is identified by the ID field in the
    provided product object.

    :param product: The product object to update and to identify the requested product by.
    :param kwargs: The properties to change: name, price_member, price_non_member, stock, stockable.
    :raises DatabaseConsistencyError: If the update did not affect exactly one row, e.g. because the product
        represented by the object does not exist.
    """
    # Requested values take precedence over the values currently held by the product object
    new_name: str = kwargs.get('name', product.name)
    new_price_member: int = kwargs.get('price_member', product.price_member)
    new_price_non_member: int = kwargs.get('price_non_member', product.price_non_member)
    new_stock: int = kwargs.get('stock', product.stock)
    new_stockable: bool = kwargs.get('stockable', product.stockable)
    update_values = {
        'product_id': product.id,
        'name': new_name,
        'price_member': new_price_member,
        'price_non_member': new_price_non_member,
        'stock': new_stock,
        'stockable': new_stockable
    }
    with self.db.transaction() as cursor:
        cursor.execute('''
            UPDATE products
            SET
                name = :name,
                price_member = :price_member,
                price_non_member = :price_non_member,
                stock = :stock,
                stockable = :stockable
            WHERE product_id = :product_id
        ''', update_values)
        # Ask SQLite how many rows the UPDATE statement changed
        cursor.execute('SELECT changes()')
        updated_rows = cursor.fetchone()[0]
        if updated_rows != 1:
            raise DatabaseConsistencyError(
                f'change_product should affect 1 products row, but affected {updated_rows}')
    # The database accepted the changes, so mirror them in the product object now
    product.name = new_name
    product.price_member = new_price_member
    product.price_non_member = new_price_non_member
    product.stock = new_stock
    product.stockable = new_stockable
|
|
|
|
def delete_product(self, product: Product) -> None:
    """
    Remove the product identified by the ID of the provided Product object from the database.

    :param product: The product to delete.
    :raises DatabaseConsistencyError: If the deletion did not affect exactly one row, e.g. because the product
        represented by the object does not exist.
    """
    with self.db.transaction() as cursor:
        cursor.execute('''
            DELETE FROM products
            WHERE product_id = ?
        ''', [product.id])
        # Ask SQLite how many rows the DELETE statement changed
        cursor.execute('SELECT changes()')
        deleted_rows = cursor.fetchone()[0]
        if deleted_rows != 1:
            raise DatabaseConsistencyError(
                f'delete_product should affect 1 products row, but affected {deleted_rows}')
|
|
|
|
def increment_consumption(self, user: User, product: Product) -> None:
    """
    Charge a user for one unit of a product and record the purchase as a consumption transaction.

    The price depends on whether the user is a member. The balance recorded as the transaction's old balance is
    read from the database inside the transaction (the same way deposit() does it), so an outdated balance in
    the provided user object can neither end up in the transaction log nor in the user object afterwards.

    :param user: The user buying a product.
    :param product: The product the user is buying.
    :raises DatabaseConsistencyError: If the user does not exist in the database.
    """
    price: int = product.price_member if user.is_member else product.price_non_member
    with self.db.transaction() as c:
        # Use the balance stored in the database as the authoritative old balance.
        c.execute('SELECT balance FROM users WHERE user_id = :user_id', {'user_id': user.id})
        row = c.fetchone()
        if row is None:
            raise DatabaseConsistencyError(f'No such user: {user.id}')
        old_balance: int = row[0]
        c.execute('''
            INSERT INTO transactions (user_id, value, old_balance)
            VALUES (:user_id, :value, :old_balance)
        ''', {
            'user_id': user.id,
            'value': -price,
            'old_balance': old_balance
        })
        # Mark the transaction just inserted as a consumption of this product.
        c.execute('''
            INSERT INTO consumptions (ta_id, product)
            VALUES (last_insert_rowid(), :product)
        ''', {
            'product': product.name
        })
        # Subtract the price from the user's account balance.
        c.execute('''
            UPDATE users
            SET balance = balance - :cost
            WHERE user_id = :user_id''', {
            'user_id': user.id,
            'cost': price
        })
        # Make sure exactly one user row was updated.
        affected = c.execute('SELECT changes()').fetchone()[0]
        if affected != 1:
            raise DatabaseConsistencyError(
                f'increment_consumption should affect 1 users row, but affected {affected}')
    # Reflect the change in the user object
    user.balance = old_balance - price
|
|
|
|
def deposit(self, user: User, amount: int) -> None:
    """
    Add money to a user's account balance and record the deposit as a transaction.

    The old balance is read from the database inside the transaction, so it does not depend on the balance
    held by the provided user object.

    :param user: The user to update the account balance for.
    :param amount: The amount to add to the account balance. Must not be negative.
    :raises ValueError: If amount is negative.
    :raises DatabaseConsistencyError: If the user represented by the object does not exist.
    """
    if amount < 0:
        raise ValueError('Cannot deposit a negative value')
    with self.db.transaction() as c:
        # Named placeholders must be bound from a mapping; binding them from a sequence is deprecated in sqlite3.
        c.execute('SELECT balance FROM users WHERE user_id = :user_id', {'user_id': user.id})
        row = c.fetchone()
        if row is None:
            raise DatabaseConsistencyError(f'No such user: {user.id}')
        old_balance: int = row[0]
        c.execute('''
            INSERT INTO transactions (user_id, value, old_balance)
            VALUES (:user_id, :value, :old_balance)
        ''', {
            'user_id': user.id,
            'value': amount,
            'old_balance': old_balance
        })
        # Mark the transaction just inserted as a deposit.
        c.execute('''
            INSERT INTO deposits (ta_id)
            VALUES (last_insert_rowid())
        ''')
        c.execute('''
            UPDATE users
            SET balance = balance + :amount
            WHERE user_id = :user_id
        ''', {
            'user_id': user.id,
            'amount': amount
        })
        # Make sure exactly one user row was updated.
        affected = c.execute('SELECT changes()').fetchone()[0]
        if affected != 1:
            raise DatabaseConsistencyError(f'deposit should affect 1 users row, but affected {affected}')
    # Reflect the change in the user object
    user.balance = old_balance + amount
|
|
|
|
def check_receipt_due(self, user: User) -> bool:
    """
    Check whether a new receipt is due for a user. The due date is derived by the user's receipt preference
    from the date of the user's latest receipt or, if there is none yet, from the date the user was created.

    :param user: The user to check for.
    :return: True if a receipt is due. Always False if the user's receipt preference is NONE or the user has
        no email address.
    :raises TypeError: If the user's receipt preference is not a ReceiptPreference.
    :raises DatabaseConsistencyError: If no reference date could be found, e.g. because the user does not exist.
    """
    if not isinstance(user.receipt_pref, ReceiptPreference):
        raise TypeError()
    if user.receipt_pref == ReceiptPreference.NONE or user.email is None:
        return False
    with self.db.transaction() as c:
        # Named placeholders must be bound from a mapping; binding them from a sequence is deprecated in sqlite3.
        c.execute('''
            SELECT COALESCE(MAX(r.date), u.created)
            FROM users AS u
            LEFT JOIN receipts AS r
                ON r.user_id = u.user_id
            WHERE u.user_id = :user_id
        ''', {'user_id': user.id})
        row = c.fetchone()
        # The aggregate query yields a row even if no user matched; its only column is NULL in that case.
        if row is None or row[0] is None:
            raise DatabaseConsistencyError(f'No such user: {user.id}')

        # NOTE(review): fromtimestamp() yields local time, but the comparison below uses utcnow() - confirm that
        # mixing the two is intended.
        last_receipt: datetime = datetime.fromtimestamp(row[0])
        next_receipt_due: datetime = user.receipt_pref.next_receipt_due(last_receipt)
        return datetime.utcnow() > next_receipt_due
|
|
|
|
def create_receipt(self, user: User, write: bool = False) -> Receipt:
    """
    Collect all transactions of a user that are newer than the last transaction covered by an earlier receipt
    and wrap them into a Receipt object.

    :param user: The user to create the receipt for.
    :param write: If True, the receipt is stored in the receipts table. If False, nothing is written and the
        returned receipt has the ID -1. Defaults to False.
    :return: A Receipt containing the collected transactions, ordered by date.
    :raises DatabaseConsistencyError: If no start date could be found, e.g. because the user does not exist.
    """
    transactions: List[Transaction] = []
    with self.db.transaction() as cursor:
        # Named placeholders must be bound from a mapping; binding them from a sequence is deprecated in sqlite3.
        cursor.execute('''
            SELECT COALESCE(MAX(r.date), u.created), COALESCE(MAX(r.last_ta_id), 0)
            FROM users AS u
            LEFT JOIN receipts AS r
                ON r.user_id = u.user_id
            WHERE u.user_id = :user_id
        ''', {'user_id': user.id})
        row = cursor.fetchone()
        # The aggregate query yields a row even if no user matched; its date column is NULL in that case, so
        # checking for a missing row alone would never detect a missing user.
        if row is None or row[0] is None:
            raise DatabaseConsistencyError(f'No such user: {user.id}')
        fromdate, min_id = row
        # NOTE(review): fromtimestamp() yields local time, but the receipt's end date below uses utcnow() -
        # confirm that mixing the two is intended.
        created: datetime = datetime.fromtimestamp(fromdate)
        cursor.execute('''
            SELECT
                t.ta_id, t.value, t.old_balance, COALESCE(t.date, 0),
                c.ta_id, d.ta_id, m.ta_id, c.product, m.agent, m.reason
            FROM transactions AS t
            LEFT JOIN consumptions AS c
                ON t.ta_id = c.ta_id
            LEFT JOIN deposits AS d
                ON t.ta_id = d.ta_id
            LEFT JOIN modifications AS m
                ON t.ta_id = m.ta_id
            WHERE t.user_id = :user_id
                AND t.ta_id > :min_id
            ORDER BY t.date ASC
        ''', {
            'user_id': user.id,
            'min_id': min_id
        })
        for ta_id, value, old_balance, date, c, d, m, c_prod, m_agent, m_reason in cursor.fetchall():
            timestamp: datetime = datetime.fromtimestamp(date)
            # The joined table that holds a row for this transaction ID determines the transaction's kind.
            if c == ta_id:
                t: Transaction = Consumption(ta_id, user, value, old_balance, timestamp, c_prod)
            elif d == ta_id:
                t = Deposit(ta_id, user, value, old_balance, timestamp)
            elif m == ta_id:
                t = Modification(ta_id, user, value, old_balance, timestamp, m_agent, m_reason)
            else:
                t = Transaction(ta_id, user, value, old_balance, timestamp)
            transactions.append(t)
        if write:
            # An empty receipt is stored with NULL as first and last transaction ID.
            cursor.execute('''
                INSERT INTO receipts (user_id, first_ta_id, last_ta_id)
                VALUES (:user_id, :first_ta, :last_ta)
            ''', {
                'user_id': user.id,
                'first_ta': transactions[0].id if transactions else None,
                'last_ta': transactions[-1].id if transactions else None
            })
            cursor.execute('''SELECT last_insert_rowid()''')
            receipt_id: int = int(cursor.fetchone()[0])
        else:
            receipt_id = -1
        receipt = Receipt(receipt_id, transactions, user, created, datetime.utcnow())
    return receipt
|
|
|
|
def generate_sales_statistics(self, from_date: datetime, to_date: datetime) -> Dict[str, Any]:
    """
    Compute sales statistics for the time span from from_date (inclusive) to to_date (exclusive).

    The returned dict contains the following keys:

    - consumptions: Maps each consumed product name to a tuple of (sum of transaction values, number of
      consumptions).
    - total_income: The negated sum of all consumption transaction values.
    - total_consumption: The total number of consumptions.
    - total_balance: The sum of all users' balances at to_date.
    - positive_balance: The sum of all balances greater than zero.
    - negative_balance: The sum of all remaining balances.
    - total_deposits: The sum of all deposit transaction values.

    A user's balance at to_date is the old balance of the user's earliest transaction at or after to_date, or
    the user's current balance if there is no such transaction.

    :param from_date: Start of the time span (inclusive).
    :param to_date: End of the time span (exclusive).
    :return: A dict with the statistics described above.
    """
    consumptions: Dict[str, Tuple[int, int]] = dict()
    total_income: int = 0
    total_consumption: int = 0
    total_balance: int = 0
    positive_balance: int = 0
    negative_balance: int = 0
    total_deposits: int = 0

    # Named placeholders must be bound from a mapping; binding them from a sequence is deprecated in sqlite3.
    span = {
        'from_date': from_date.timestamp(),
        'to_date': to_date.timestamp()
    }

    with self.db.transaction(exclusive=False) as c:

        # Fetch sum of consumptions, grouped by products
        c.execute('''
            SELECT COALESCE(SUM(t.value), 0), COUNT(c.ta_id), c.product
            FROM transactions AS t
            JOIN consumptions AS c
                ON t.ta_id = c.ta_id
            WHERE t.date >= :from_date
                AND t.date < :to_date
            GROUP BY c.product''', span)
        for value, count, product in c.fetchall():
            consumptions[product] = value, count
            # Consumption values are stored negated (money leaving the account), hence the subtraction
            total_income -= value
            total_consumption += count

        # Fetch the balance of each user at to_date. The explicit ordering makes sure the earliest transaction
        # at or after to_date is used; without it, LIMIT 1 would pick an unspecified row.
        c.execute('''
            SELECT COALESCE(
                (
                    SELECT t.old_balance
                    FROM transactions AS t
                    WHERE t.date >= :to_date
                        AND t.user_id = u.user_id
                    ORDER BY t.date ASC, t.ta_id ASC
                    LIMIT 1
                ), u.balance)
            FROM users AS u
        ''', {'to_date': span['to_date']})
        for balance, in c.fetchall():
            if balance > 0:
                positive_balance += balance
            else:
                negative_balance += balance
            total_balance += balance

        # Fetch sum of deposits
        c.execute('''
            SELECT COALESCE(SUM(t.value), 0)
            FROM transactions AS t
            JOIN deposits AS d
                ON t.ta_id = d.ta_id
            WHERE t.date >= :from_date
                AND t.date < :to_date''', span)
        for deposit, in c.fetchall():
            total_deposits += deposit

    return {
        'consumptions': consumptions,
        'total_income': total_income,
        'total_consumption': total_consumption,
        'total_balance': total_balance,
        'positive_balance': positive_balance,
        'negative_balance': negative_balance,
        'total_deposits': total_deposits,
    }
|