Implemented receipt generation, including unit tests.
This commit is contained in:
parent
a3fa86fb25
commit
2c6996a9b4
7 changed files with 377 additions and 17 deletions
|
@ -4,11 +4,13 @@ from typing import List, Optional, Any, Type
|
|||
|
||||
import crypt
|
||||
from hmac import compare_digest
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from matemat.db.primitives import User, Product, ReceiptPreference
|
||||
from matemat.db.primitives import User, Product, ReceiptPreference, Receipt,\
|
||||
Transaction, Consumption, Deposit, Modification
|
||||
from matemat.exceptions import AuthenticationError, DatabaseConsistencyError
|
||||
from matemat.db import DatabaseWrapper
|
||||
from matemat.db.wrapper import Transaction
|
||||
from matemat.db.wrapper import DatabaseTransaction
|
||||
|
||||
|
||||
class MatematDatabase(object):
|
||||
|
@ -46,7 +48,7 @@ class MatematDatabase(object):
|
|||
# Pass context manager stuff through to the database wrapper
|
||||
self.db.__exit__(exc_type, exc_val, exc_tb)
|
||||
|
||||
def transaction(self, exclusive: bool = True) -> Transaction:
|
||||
def transaction(self, exclusive: bool = True) -> DatabaseTransaction:
|
||||
"""
|
||||
Begin a new SQLite3 transaction (exclusive by default). You should never need to use the returned object (a
|
||||
Transaction instance). It is provided in case there is a real need for it (e.g. for unit testing).
|
||||
|
@ -251,8 +253,7 @@ class MatematDatabase(object):
|
|||
'tkhash': tkhash
|
||||
})
|
||||
|
||||
def change_user(self, user: User, agent: Optional[User], **kwargs)\
|
||||
-> None:
|
||||
def change_user(self, user: User, agent: Optional[User], **kwargs) -> None:
|
||||
"""
|
||||
Commit changes to the user in the database. If writing the requested changes succeeded, the values are updated
|
||||
in the provided user object. Otherwise the user object is left untouched. The user to update is identified by
|
||||
|
@ -506,6 +507,9 @@ class MatematDatabase(object):
|
|||
if affected != 1:
|
||||
raise DatabaseConsistencyError(
|
||||
f'increment_consumption should affect 1 products row, but affected {affected}')
|
||||
# Reflect the change in the user and product objects
|
||||
user.balance -= price
|
||||
product.stock -= 1
|
||||
|
||||
def restock(self, product: Product, count: int) -> None:
|
||||
"""
|
||||
|
@ -526,6 +530,8 @@ class MatematDatabase(object):
|
|||
affected = c.execute('SELECT changes()').fetchone()[0]
|
||||
if affected != 1:
|
||||
raise DatabaseConsistencyError(f'restock should affect 1 products row, but affected {affected}')
|
||||
# Reflect the change in the product object
|
||||
product.stock += count
|
||||
|
||||
def deposit(self, user: User, amount: int) -> None:
|
||||
"""
|
||||
|
@ -538,17 +544,19 @@ class MatematDatabase(object):
|
|||
if amount < 0:
|
||||
raise ValueError('Cannot deposit a negative value')
|
||||
with self.db.transaction() as c:
|
||||
c.execute('''SELECT username FROM users WHERE user_id = ?''',
|
||||
c.execute('''SELECT balance FROM users WHERE user_id = :user_id''',
|
||||
[user.id])
|
||||
if c.fetchone() is None:
|
||||
row = c.fetchone()
|
||||
if row is None:
|
||||
raise DatabaseConsistencyError(f'No such user: {user.id}')
|
||||
old_balance: int = row[0]
|
||||
c.execute('''
|
||||
INSERT INTO transactions (user_id, value, old_balance)
|
||||
VALUES (:user_id, :value, :old_balance)
|
||||
''', {
|
||||
'user_id': user.id,
|
||||
'value': amount,
|
||||
'old_balance': user.balance
|
||||
'old_balance': old_balance
|
||||
})
|
||||
c.execute('''
|
||||
INSERT INTO deposits (ta_id)
|
||||
|
@ -565,3 +573,84 @@ class MatematDatabase(object):
|
|||
affected = c.execute('SELECT changes()').fetchone()[0]
|
||||
if affected != 1:
|
||||
raise DatabaseConsistencyError(f'deposit should affect 1 users row, but affected {affected}')
|
||||
# Reflect the change in the user object
|
||||
user.balance = old_balance + amount
|
||||
|
||||
def check_receipt_due(self, user: User) -> bool:
|
||||
if user.receipt_pref == ReceiptPreference.NONE or user.email is None:
|
||||
return False
|
||||
with self.db.transaction() as c:
|
||||
c.execute('''
|
||||
SELECT COALESCE(MAX(r.date), u.created)
|
||||
FROM users AS u
|
||||
LEFT JOIN receipts AS r
|
||||
ON r.user_id = u.user_id
|
||||
WHERE u.user_id = :user_id
|
||||
''', [user.id])
|
||||
last_receipt: int = c.fetchone()[0]
|
||||
if user.receipt_pref == ReceiptPreference.MONTHLY:
|
||||
date_diff: int = timedelta(days=31).total_seconds()
|
||||
elif user.receipt_pref == ReceiptPreference.YEARLY:
|
||||
date_diff = timedelta(days=365).total_seconds()
|
||||
else:
|
||||
raise ValueError()
|
||||
return datetime.utcnow().timestamp() > last_receipt + date_diff
|
||||
|
||||
def create_receipt(self, user: User, write: bool = False) -> Receipt:
|
||||
transactions: List[Transaction] = []
|
||||
with self.db.transaction() as cursor:
|
||||
cursor.execute('''
|
||||
SELECT COALESCE(MAX(r.date), u.created), COALESCE(MAX(r.last_ta_id), 0)
|
||||
FROM users AS u
|
||||
LEFT JOIN receipts AS r
|
||||
ON r.user_id = u.user_id
|
||||
WHERE u.user_id = :user_id
|
||||
''', [user.id])
|
||||
row = cursor.fetchone()
|
||||
if row is None:
|
||||
raise DatabaseConsistencyError(f'No such user: {user.id}')
|
||||
fromdate, min_id = row
|
||||
created: datetime = datetime.fromtimestamp(fromdate)
|
||||
cursor.execute('''
|
||||
SELECT t.ta_id, t.value, t.old_balance, t.date, c.ta_id, d.ta_id, m.ta_id, c.product, m.agent, m.reason
|
||||
FROM transactions AS t
|
||||
LEFT JOIN consumptions AS c
|
||||
ON t.ta_id = c.ta_id
|
||||
LEFT JOIN deposits AS d
|
||||
ON t.ta_id = d.ta_id
|
||||
LEFT JOIN modifications AS m
|
||||
ON t.ta_id = m.ta_id
|
||||
WHERE t.user_id = :user_id
|
||||
AND t.ta_id > :min_id
|
||||
ORDER BY t.date ASC
|
||||
''', {
|
||||
'user_id': user.id,
|
||||
'min_id': min_id
|
||||
})
|
||||
rows = cursor.fetchall()
|
||||
for row in rows:
|
||||
ta_id, value, old_balance, date, c, d, m, c_prod, m_agent, m_reason = row
|
||||
if c == ta_id:
|
||||
t: Transaction = Consumption(ta_id, user, value, old_balance, datetime.fromtimestamp(date), c_prod)
|
||||
elif d == ta_id:
|
||||
t = Deposit(ta_id, user, value, old_balance, datetime.fromtimestamp(date))
|
||||
elif m == ta_id:
|
||||
t = Modification(ta_id, user, value, old_balance, datetime.fromtimestamp(date), m_agent, m_reason)
|
||||
else:
|
||||
t = Transaction(ta_id, user, value, old_balance, datetime.fromtimestamp(date))
|
||||
transactions.append(t)
|
||||
if write:
|
||||
cursor.execute('''
|
||||
INSERT INTO receipts (user_id, first_ta_id, last_ta_id)
|
||||
VALUES (:user_id, :first_ta, :last_ta)
|
||||
''', {
|
||||
'user_id': user.id,
|
||||
'first_ta': transactions[0].id,
|
||||
'last_ta': transactions[-1].id
|
||||
})
|
||||
cursor.execute('''SELECT last_insert_rowid()''')
|
||||
receipt_id: int = int(cursor.fetchone()[0])
|
||||
else:
|
||||
receipt_id = -1
|
||||
receipt = Receipt(receipt_id, transactions, user, created, datetime.utcnow())
|
||||
return receipt
|
||||
|
|
17
matemat/db/primitives/Receipt.py
Normal file
17
matemat/db/primitives/Receipt.py
Normal file
|
@ -0,0 +1,17 @@
|
|||
|
||||
from typing import List
|
||||
from dataclasses import dataclass
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
from matemat.db.primitives import User, Transaction
|
||||
|
||||
|
||||
@dataclass
|
||||
class Receipt:
|
||||
|
||||
id: int
|
||||
transactions: List[Transaction]
|
||||
user: User
|
||||
from_date: datetime
|
||||
to_date: datetime
|
|
@ -26,11 +26,6 @@ class ReceiptPreference(Enum):
|
|||
"""
|
||||
NONE = 0, 'No receipts'
|
||||
|
||||
"""
|
||||
A receipt should be generated for each transaction.
|
||||
"""
|
||||
EACH = 1, 'Individual receipt per transaction'
|
||||
|
||||
"""
|
||||
A receipt should be generated once a month.
|
||||
"""
|
||||
|
|
72
matemat/db/primitives/Transaction.py
Normal file
72
matemat/db/primitives/Transaction.py
Normal file
|
@ -0,0 +1,72 @@
|
|||
|
||||
from typing import Optional
|
||||
from dataclasses import dataclass
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
from matemat.db.primitives import User
|
||||
from matemat.util.currency_format import format_chf
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class Transaction:
|
||||
|
||||
id: int
|
||||
user: User
|
||||
value: int
|
||||
old_balance: int
|
||||
date: datetime
|
||||
|
||||
@property
|
||||
def receipt_date(self) -> str:
|
||||
date: str = self.date.strftime('%d.%m.%Y, %H:%M')
|
||||
return date
|
||||
|
||||
@property
|
||||
def receipt_value(self) -> str:
|
||||
value: str = format_chf(self.value, with_currencysign=False, plus_sign=True).rjust(8)
|
||||
return value
|
||||
|
||||
@property
|
||||
def receipt_description(self) -> str:
|
||||
return 'Unidentified transaction'
|
||||
|
||||
@property
|
||||
def receipt_message(self) -> Optional[str]:
|
||||
return None
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class Consumption(Transaction):
|
||||
|
||||
product: str
|
||||
|
||||
@property
|
||||
def receipt_description(self) -> str:
|
||||
return self.product
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class Deposit(Transaction):
|
||||
|
||||
@property
|
||||
def receipt_description(self) -> str:
|
||||
return 'Deposit'
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class Modification(Transaction):
|
||||
|
||||
agent: str
|
||||
reason: Optional[str]
|
||||
|
||||
@property
|
||||
def receipt_description(self) -> str:
|
||||
return f'Balance modified by {self.agent}'
|
||||
|
||||
@property
|
||||
def receipt_message(self) -> Optional[str]:
|
||||
if self.reason is None:
|
||||
return None
|
||||
else:
|
||||
return f'Reason: «{self.reason}»'
|
|
@ -5,3 +5,5 @@ This package provides the 'primitive types' the Matemat software deals with - na
|
|||
from .User import User
|
||||
from .Product import Product
|
||||
from .ReceiptPreference import ReceiptPreference
|
||||
from .Transaction import Transaction, Consumption, Deposit, Modification
|
||||
from .Receipt import Receipt
|
||||
|
|
|
@ -2,9 +2,11 @@
|
|||
import unittest
|
||||
|
||||
import crypt
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from matemat.db import MatematDatabase
|
||||
from matemat.db.primitives import User, ReceiptPreference
|
||||
from matemat.db.primitives import User, Product, ReceiptPreference, Receipt,\
|
||||
Transaction, Modification, Deposit, Consumption
|
||||
from matemat.exceptions import AuthenticationError, DatabaseConsistencyError
|
||||
|
||||
|
||||
|
@ -389,3 +391,186 @@ class DatabaseTest(unittest.TestCase):
|
|||
db.increment_consumption(user1, florapowermate)
|
||||
with self.assertRaises(DatabaseConsistencyError):
|
||||
db.increment_consumption(user2, clubmate)
|
||||
|
||||
def test_check_receipt_due(self):
|
||||
with self.db as db:
|
||||
# Receipt preference set to 0
|
||||
user0 = db.create_user('user0', 'supersecurepassword', 'user0@example.com', True, True)
|
||||
# No email, no receipts
|
||||
user1 = db.create_user('user1', 'supersecurepassword', None, True, True)
|
||||
db.change_user(user1, agent=None, receipt_pref=ReceiptPreference.MONTHLY)
|
||||
# Should receive a receipt, has never received a receipt before
|
||||
user2 = db.create_user('user2', 'supersecurepassword', 'user2@example.com', True, True)
|
||||
db.change_user(user2, agent=None, receipt_pref=ReceiptPreference.MONTHLY)
|
||||
# Should receive a receipt, has received receipts before
|
||||
user3 = db.create_user('user3', 'supersecurepassword', 'user3@example.com', True, True)
|
||||
db.change_user(user3, agent=None, receipt_pref=ReceiptPreference.MONTHLY)
|
||||
# Shouldn't receive a receipt, a month hasn't passed since the last receipt
|
||||
user4 = db.create_user('user4', 'supersecurepassword', 'user4@example.com', True, True)
|
||||
db.change_user(user4, agent=None, receipt_pref=ReceiptPreference.MONTHLY)
|
||||
# Should receive a receipt, has been more than a year since the last receipt
|
||||
user5 = db.create_user('user5', 'supersecurepassword', 'user5@example.com', True, True)
|
||||
db.change_user(user5, agent=None, receipt_pref=ReceiptPreference.YEARLY)
|
||||
# Shouldn't receive a receipt, a year hasn't passed since the last receipt
|
||||
user6 = db.create_user('user6', 'supersecurepassword', 'user6@example.com', True, True)
|
||||
db.change_user(user6, agent=None, receipt_pref=ReceiptPreference.YEARLY)
|
||||
# Invalid receipt preference, should raise a ValueError
|
||||
user7 = db.create_user('user7', 'supersecurepassword', 'user7@example.com', True, True)
|
||||
user7.receipt_pref = 42
|
||||
|
||||
twoyears: int = int((datetime.utcnow() - timedelta(days=730)).timestamp())
|
||||
halfyear: int = int((datetime.utcnow() - timedelta(days=183)).timestamp())
|
||||
twomonths: int = int((datetime.utcnow() - timedelta(days=61)).timestamp())
|
||||
halfmonth: int = int((datetime.utcnow() - timedelta(days=15)).timestamp())
|
||||
|
||||
with db.transaction() as c:
|
||||
# Fix creation date for user2
|
||||
c.execute('''
|
||||
UPDATE users SET created = :twomonths WHERE user_id = :user2
|
||||
''', {
|
||||
'twomonths': twomonths,
|
||||
'user2': user2.id
|
||||
})
|
||||
# Create transactions
|
||||
c.execute('''
|
||||
INSERT INTO transactions (ta_id, user_id, value, old_balance, date) VALUES
|
||||
(1, :user0, 4200, 0, :twomonths),
|
||||
(2, :user0, 100, 4200, :halfmonth),
|
||||
(3, :user1, 4200, 0, :twomonths),
|
||||
(4, :user1, 100, 4200, :halfmonth),
|
||||
(5, :user2, 4200, 0, :twomonths),
|
||||
(6, :user2, 100, 4200, :halfmonth),
|
||||
(7, :user3, 4200, 0, :twomonths),
|
||||
(8, :user3, 100, 4200, :halfmonth),
|
||||
(9, :user4, 4200, 0, :twomonths),
|
||||
(10, :user4, 100, 4200, :halfmonth),
|
||||
(11, :user5, 4200, 0, :twoyears),
|
||||
(12, :user5, 100, 4200, :halfyear),
|
||||
(13, :user6, 4200, 0, :twoyears),
|
||||
(14, :user6, 100, 4200, :halfyear)
|
||||
''', {
|
||||
'twoyears': twoyears,
|
||||
'halfyear': halfyear,
|
||||
'twomonths': twomonths,
|
||||
'halfmonth': halfmonth,
|
||||
'user0': user0.id,
|
||||
'user1': user1.id,
|
||||
'user2': user2.id,
|
||||
'user3': user3.id,
|
||||
'user4': user4.id,
|
||||
'user5': user5.id,
|
||||
'user6': user6.id
|
||||
})
|
||||
# Create receipts
|
||||
c.execute('''
|
||||
INSERT INTO receipts (user_id, first_ta_id, last_ta_id, date) VALUES
|
||||
(:user3, 7, 7, :twomonths),
|
||||
(:user4, 9, 9, :halfmonth),
|
||||
(:user5, 11, 11, :twoyears),
|
||||
(:user6, 13, 13, :halfyear)
|
||||
''', {
|
||||
'twoyears': twoyears,
|
||||
'halfyear': halfyear,
|
||||
'twomonths': twomonths,
|
||||
'halfmonth': halfmonth,
|
||||
'user3': user3.id,
|
||||
'user4': user4.id,
|
||||
'user5': user5.id,
|
||||
'user6': user6.id
|
||||
})
|
||||
|
||||
self.assertFalse(db.check_receipt_due(user0))
|
||||
self.assertFalse(self.db.check_receipt_due(user1))
|
||||
self.assertTrue(self.db.check_receipt_due(user2))
|
||||
self.assertTrue(self.db.check_receipt_due(user3))
|
||||
self.assertFalse(self.db.check_receipt_due(user4))
|
||||
self.assertTrue(self.db.check_receipt_due(user5))
|
||||
self.assertFalse(self.db.check_receipt_due(user6))
|
||||
with self.assertRaises(ValueError):
|
||||
self.db.check_receipt_due(user7)
|
||||
|
||||
def test_create_receipt(self):
|
||||
with self.db as db:
|
||||
|
||||
now: datetime = datetime.utcnow()
|
||||
admin: User = db.create_user('admin', 'supersecurepassword', 'admin@example.com', True, True)
|
||||
user: User = db.create_user('user', 'supersecurepassword', 'user@example.com', True, True)
|
||||
product: Product = db.create_product('Flora Power Mate', 200, 200)
|
||||
|
||||
# Create some transactions
|
||||
db.change_user(user, agent=admin,
|
||||
receipt_pref=ReceiptPreference.MONTHLY,
|
||||
balance=4200, balance_reason='Here\'s a gift!')
|
||||
db.increment_consumption(user, product)
|
||||
db.deposit(user, 1337)
|
||||
receipt1: Receipt = db.create_receipt(user, write=True)
|
||||
|
||||
with db.transaction() as c:
|
||||
c.execute('SELECT COUNT(receipt_id) FROM receipts')
|
||||
self.assertEqual(1, c.fetchone()[0])
|
||||
|
||||
db.increment_consumption(user, product)
|
||||
db.change_user(user, agent=admin, balance=4200)
|
||||
with db.transaction() as c:
|
||||
# Unknown transactions
|
||||
c.execute('''
|
||||
INSERT INTO transactions (user_id, value, old_balance)
|
||||
SELECT user_id, 500, balance
|
||||
FROM users
|
||||
WHERE user_id = :id
|
||||
''', [user.id])
|
||||
receipt2: Receipt = db.create_receipt(user, write=False)
|
||||
|
||||
with db.transaction() as c:
|
||||
c.execute('SELECT COUNT(receipt_id) FROM receipts')
|
||||
self.assertEqual(1, c.fetchone()[0])
|
||||
|
||||
self.assertEqual(user, receipt1.user)
|
||||
self.assertEqual(3, len(receipt1.transactions))
|
||||
|
||||
self.assertIsInstance(receipt1.transactions[0], Modification)
|
||||
t10: Modification = receipt1.transactions[0]
|
||||
self.assertEqual(user, receipt1.user)
|
||||
self.assertEqual(4200, t10.value)
|
||||
self.assertEqual(0, t10.old_balance)
|
||||
self.assertEqual(admin.name, t10.agent)
|
||||
self.assertEqual('Here\'s a gift!', t10.reason)
|
||||
|
||||
self.assertIsInstance(receipt1.transactions[1], Consumption)
|
||||
t11: Consumption = receipt1.transactions[1]
|
||||
self.assertEqual(user, receipt1.user)
|
||||
self.assertEqual(-200, t11.value)
|
||||
self.assertEqual(4200, t11.old_balance)
|
||||
self.assertEqual('Flora Power Mate', t11.product)
|
||||
|
||||
self.assertIsInstance(receipt1.transactions[2], Deposit)
|
||||
t12: Deposit = receipt1.transactions[2]
|
||||
self.assertEqual(user, receipt1.user)
|
||||
self.assertEqual(1337, t12.value)
|
||||
self.assertEqual(4000, t12.old_balance)
|
||||
|
||||
self.assertEqual(user, receipt2.user)
|
||||
self.assertEqual(3, len(receipt2.transactions))
|
||||
|
||||
self.assertIsInstance(receipt2.transactions[0], Consumption)
|
||||
t20: Consumption = receipt2.transactions[0]
|
||||
self.assertEqual(user, receipt2.user)
|
||||
self.assertEqual(-200, t20.value)
|
||||
self.assertEqual(5337, t20.old_balance)
|
||||
self.assertEqual('Flora Power Mate', t20.product)
|
||||
|
||||
self.assertIsInstance(receipt2.transactions[1], Modification)
|
||||
t21: Modification = receipt2.transactions[1]
|
||||
self.assertEqual(user, receipt2.user)
|
||||
self.assertEqual(-937, t21.value)
|
||||
self.assertEqual(5137, t21.old_balance)
|
||||
self.assertEqual(admin.name, t21.agent)
|
||||
self.assertEqual(None, t21.reason)
|
||||
|
||||
self.assertIs(type(receipt2.transactions[2]), Transaction)
|
||||
t22: Transaction = receipt2.transactions[2]
|
||||
self.assertEqual(user, receipt2.user)
|
||||
self.assertEqual(500, t22.value)
|
||||
self.assertEqual(4200, t22.old_balance)
|
||||
|
||||
# TODO: Test cases for primitive object vs database row mismatch
|
||||
|
|
|
@ -9,7 +9,7 @@ from matemat.db.schemas import SCHEMAS
|
|||
from matemat.db.migrations import migrate_schema_1_to_2, migrate_schema_2_to_3
|
||||
|
||||
|
||||
class Transaction(object):
|
||||
class DatabaseTransaction(object):
|
||||
|
||||
def __init__(self, db: sqlite3.Connection, exclusive: bool = True) -> None:
|
||||
self._db: sqlite3.Connection = db
|
||||
|
@ -56,10 +56,10 @@ class DatabaseWrapper(object):
|
|||
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
|
||||
self.close()
|
||||
|
||||
def transaction(self, exclusive: bool = True) -> Transaction:
|
||||
def transaction(self, exclusive: bool = True) -> DatabaseTransaction:
|
||||
if self._sqlite_db is None:
|
||||
raise RuntimeError(f'Database connection to {self._filename} is not established.')
|
||||
return Transaction(self._sqlite_db, exclusive)
|
||||
return DatabaseTransaction(self._sqlite_db, exclusive)
|
||||
|
||||
def _setup(self) -> None:
|
||||
# Enable foreign key enforcement
|
||||
|
|
Loading…
Reference in a new issue