feat: Immediately purchase a product by calling /?ean=...
Some checks failed
/ test (push) Successful in 1m26s
/ codestyle (push) Failing after 57s

chore: Replace datetime.utcnow with datetime.now(UTC)
chore: Replace sqlite3 qmark-bindings with named bindings
This commit is contained in:
s3lph 2024-11-23 04:35:05 +01:00
parent bfc503c5d3
commit f3af4d64a7
Signed by: s3lph
GPG key ID: 0AA29A52FB33CFB5
23 changed files with 281 additions and 100 deletions

View file

@ -1,5 +1,20 @@
# Matemat Changelog
<!-- BEGIN RELEASE v0.3.13 -->
## Version 0.3.13
Quick-purchase via EAN codes
### Changes
<!-- BEGIN CHANGES 0.3.13 -->
- feat: Immediately purchase a product by calling `/?ean=...`
- chore: Replace datetime.utcnow with datetime.now(UTC)
- chore: Replace sqlite3 qmark-bindings with named bindings
<!-- END CHANGES 0.3.13 -->
<!-- END RELEASE v0.3.13 -->
<!-- BEGIN RELEASE v0.3.12 -->
## Version 0.3.12

View file

@ -1,2 +1,2 @@
__version__ = '0.3.12'
__version__ = '0.3.13'

View file

@ -3,7 +3,7 @@ from typing import Any, Dict, List, Optional, Tuple, Type
import crypt
from hmac import compare_digest
from datetime import datetime
from datetime import datetime, UTC
from matemat.db.primitives import User, Product, ReceiptPreference, Receipt, \
Transaction, Consumption, Deposit, Modification
@ -111,9 +111,8 @@ class MatematDatabase(object):
c.execute('''
SELECT user_id, username, email, is_admin, is_member, balance, receipt_pref, logout_after_purchase
FROM users
WHERE user_id = ?
''',
[uid])
WHERE user_id = :user_id
''', {'user_id': uid})
row = c.fetchone()
if row is None:
raise ValueError(f'No user with user ID {uid} exists.')
@ -148,7 +147,7 @@ class MatematDatabase(object):
user_id: int = -1
with self.db.transaction() as c:
# Look up whether a user with the same name already exists.
c.execute('SELECT user_id FROM users WHERE username = ?', [username])
c.execute('SELECT user_id FROM users WHERE username = :username', {'username': username})
if c.fetchone() is not None:
raise ValueError(f'A user with the name \'{username}\' already exists.')
# Insert the user into the database.
@ -188,8 +187,8 @@ class MatematDatabase(object):
SELECT user_id, username, email, password, touchkey, is_admin, is_member, balance, receipt_pref,
logout_after_purchase
FROM users
WHERE username = ?
''', [username])
WHERE username = :username
''', {'username': username})
row = c.fetchone()
if row is None:
raise AuthenticationError('User does not exist')
@ -221,8 +220,8 @@ class MatematDatabase(object):
with self.db.transaction() as c:
# Fetch the old password.
c.execute('''
SELECT password FROM users WHERE user_id = ?
''', [user.id])
SELECT password FROM users WHERE user_id = :user_id
''', {'user_id': user.id})
row = c.fetchone()
if row is None:
raise AuthenticationError('User does not exist in database.')
@ -251,8 +250,8 @@ class MatematDatabase(object):
with self.db.transaction() as c:
# Fetch the password.
c.execute('''
SELECT password FROM users WHERE user_id = ?
''', [user.id])
SELECT password FROM users WHERE user_id = :user_id
''', {'user_id': user.id})
row = c.fetchone()
if row is None:
raise AuthenticationError('User does not exist in database.')
@ -352,8 +351,8 @@ class MatematDatabase(object):
with self.db.transaction() as c:
c.execute('''
DELETE FROM users
WHERE user_id = ?
''', [user.id])
WHERE user_id = :user_id
''', {'user_id': user.id})
affected = c.execute('SELECT changes()').fetchone()[0]
if affected != 1:
raise DatabaseConsistencyError(
@ -367,34 +366,51 @@ class MatematDatabase(object):
products: List[Product] = []
with self.db.transaction(exclusive=False) as c:
for row in c.execute('''
SELECT product_id, name, price_member, price_non_member, custom_price, stock, stockable
SELECT product_id, name, price_member, price_non_member, custom_price, stock, stockable, ean
FROM products ORDER BY name
'''):
product_id, name, price_member, price_external, custom_price, stock, stockable = row
products.append(Product(product_id, name, price_member, price_external, custom_price, stockable, stock))
product_id, name, price_member, price_external, custom_price, stock, stockable, ean = row
products.append(Product(product_id, name, price_member, price_external, custom_price, stockable, stock, ean))
return products
def get_product(self, pid: int) -> Product:
"""
Return a product identified by its product ID.
:param pid: The products's ID.
:param pid: The product's ID.
"""
with self.db.transaction(exclusive=False) as c:
# Fetch all values to construct the product
c.execute('''
SELECT product_id, name, price_member, price_non_member, custom_price, stock, stockable
SELECT product_id, name, price_member, price_non_member, custom_price, stock, stockable, ean
FROM products
WHERE product_id = ?''',
[pid])
WHERE product_id = :product_id''', {'product_id': pid})
row = c.fetchone()
if row is None:
raise ValueError(f'No product with product ID {pid} exists.')
# Unpack the row and construct the product
product_id, name, price_member, price_non_member, custom_price, stock, stockable = row
return Product(product_id, name, price_member, price_non_member, custom_price, stockable, stock)
product_id, name, price_member, price_non_member, custom_price, stock, stockable, ean = row
return Product(product_id, name, price_member, price_non_member, custom_price, stockable, stock, ean)
def get_product_by_ean(self, ean: str) -> Product:
"""
Return a product identified by its EAN code.
:param ean: The product's EAN code.
"""
with self.db.transaction(exclusive=False) as c:
# Fetch all values to construct the product
c.execute('''
SELECT product_id, name, price_member, price_non_member, custom_price, stock, stockable, ean
FROM products
WHERE ean = :ean''', {'ean': ean})
row = c.fetchone()
if row is None:
raise ValueError(f'No product with EAN code {ean} exists.')
# Unpack the row and construct the product
product_id, name, price_member, price_non_member, custom_price, stock, stockable, ean = row
return Product(product_id, name, price_member, price_non_member, custom_price, stockable, stock, ean)
def create_product(self, name: str, price_member: int, price_non_member: int, custom_price:
bool, stockable: bool) -> Product:
bool, stockable: bool, ean: str) -> Product:
"""
Creates a new product.
:param name: Name of the product.
@ -407,22 +423,23 @@ class MatematDatabase(object):
"""
product_id: int = -1
with self.db.transaction() as c:
c.execute('SELECT product_id FROM products WHERE name = ?', [name])
c.execute('SELECT product_id FROM products WHERE name = :name', {'name': name})
if c.fetchone() is not None:
raise ValueError(f'A product with the name \'{name}\' already exists.')
c.execute('''
INSERT INTO products (name, price_member, price_non_member, custom_price, stock, stockable)
VALUES (:name, :price_member, :price_non_member, :custom_price, 0, :stockable)
INSERT INTO products (name, price_member, price_non_member, custom_price, stock, stockable, ean)
VALUES (:name, :price_member, :price_non_member, :custom_price, 0, :stockable, :ean)
''', {
'name': name,
'price_member': price_member,
'price_non_member': price_non_member,
'custom_price': custom_price,
'stockable': stockable
'stockable': stockable,
'ean': ean,
})
c.execute('SELECT last_insert_rowid()')
product_id = int(c.fetchone()[0])
return Product(product_id, name, price_member, price_non_member, custom_price, stockable, 0)
return Product(product_id, name, price_member, price_non_member, custom_price, stockable, 0, ean)
def change_product(self, product: Product, **kwargs) -> None:
"""
@ -441,6 +458,7 @@ class MatematDatabase(object):
custom_price: int = kwargs['custom_price'] if 'custom_price' in kwargs else product.custom_price
stock: int = kwargs['stock'] if 'stock' in kwargs else product.stock
stockable: bool = kwargs['stockable'] if 'stockable' in kwargs else product.stockable
ean: str = kwargs['ean'] if 'ean' in kwargs else product.ean
with self.db.transaction() as c:
c.execute('''
UPDATE products
@ -450,7 +468,8 @@ class MatematDatabase(object):
price_non_member = :price_non_member,
custom_price = :custom_price,
stock = :stock,
stockable = :stockable
stockable = :stockable,
ean = :ean
WHERE product_id = :product_id
''', {
'product_id': product.id,
@ -459,7 +478,8 @@ class MatematDatabase(object):
'price_non_member': price_non_member,
'custom_price': custom_price,
'stock': stock,
'stockable': stockable
'stockable': stockable,
'ean': ean
})
affected = c.execute('SELECT changes()').fetchone()[0]
if affected != 1:
@ -472,6 +492,7 @@ class MatematDatabase(object):
product.custom_price = custom_price
product.stock = stock
product.stockable = stockable
product.ean = ean
def delete_product(self, product: Product) -> None:
"""
@ -482,8 +503,8 @@ class MatematDatabase(object):
with self.db.transaction() as c:
c.execute('''
DELETE FROM products
WHERE product_id = ?
''', [product.id])
WHERE product_id = :product_id
''', {'product_id': product.id})
affected = c.execute('SELECT changes()').fetchone()[0]
if affected != 1:
raise DatabaseConsistencyError(
@ -543,8 +564,7 @@ class MatematDatabase(object):
if amount < 0:
raise ValueError('Cannot deposit a negative value')
with self.db.transaction() as c:
c.execute('''SELECT balance FROM users WHERE user_id = :user_id''',
[user.id])
c.execute('''SELECT balance FROM users WHERE user_id = :user_id''', {'user_id': user.id})
row = c.fetchone()
if row is None:
raise DatabaseConsistencyError(f'No such user: {user.id}')
@ -588,8 +608,7 @@ class MatematDatabase(object):
raise ValueError('Cannot transfer a negative value')
with self.db.transaction() as c:
# First, remove amount from the source user's account
c.execute('''SELECT balance FROM users WHERE user_id = :user_id''',
[source.id])
c.execute('''SELECT balance FROM users WHERE user_id = :user_id''', {'user_id': source.id})
row = c.fetchone()
if row is None:
raise DatabaseConsistencyError(f'No such user: {source.id}')
@ -621,8 +640,7 @@ class MatematDatabase(object):
if affected != 1:
raise DatabaseConsistencyError(f'transfer should affect 1 users row, but affected {affected}')
# Then, add the amount to the destination user's account
c.execute('''SELECT balance FROM users WHERE user_id = :user_id''',
[dest.id])
c.execute('''SELECT balance FROM users WHERE user_id = :user_id''', {'user_id': dest.id})
row = c.fetchone()
if row is None:
raise DatabaseConsistencyError(f'No such user: {dest.id}')
@ -669,11 +687,11 @@ class MatematDatabase(object):
LEFT JOIN receipts AS r
ON r.user_id = u.user_id
WHERE u.user_id = :user_id
''', [user.id])
''', {'user_id': user.id})
last_receipt: datetime = datetime.fromtimestamp(c.fetchone()[0])
last_receipt: datetime = datetime.fromtimestamp(c.fetchone()[0], UTC)
next_receipt_due: datetime = user.receipt_pref.next_receipt_due(last_receipt)
return datetime.utcnow() > next_receipt_due
return datetime.now(UTC) > next_receipt_due
def create_receipt(self, user: User, write: bool = False) -> Receipt:
transactions: List[Transaction] = []
@ -684,12 +702,12 @@ class MatematDatabase(object):
LEFT JOIN receipts AS r
ON r.user_id = u.user_id
WHERE u.user_id = :user_id
''', [user.id])
''', {'user_id': user.id})
row = cursor.fetchone()
if row is None:
raise DatabaseConsistencyError(f'No such user: {user.id}')
fromdate, min_id = row
created: datetime = datetime.fromtimestamp(fromdate)
created: datetime = datetime.fromtimestamp(fromdate, UTC)
cursor.execute('''
SELECT
t.ta_id, t.value, t.old_balance, COALESCE(t.date, 0),
@ -712,13 +730,13 @@ class MatematDatabase(object):
for row in rows:
ta_id, value, old_balance, date, c, d, m, c_prod, m_agent, m_reason = row
if c == ta_id:
t: Transaction = Consumption(ta_id, user, value, old_balance, datetime.fromtimestamp(date), c_prod)
t: Transaction = Consumption(ta_id, user, value, old_balance, datetime.fromtimestamp(date, UTC), c_prod)
elif d == ta_id:
t = Deposit(ta_id, user, value, old_balance, datetime.fromtimestamp(date))
t = Deposit(ta_id, user, value, old_balance, datetime.fromtimestamp(date, UTC))
elif m == ta_id:
t = Modification(ta_id, user, value, old_balance, datetime.fromtimestamp(date), m_agent, m_reason)
t = Modification(ta_id, user, value, old_balance, datetime.fromtimestamp(date, UTC), m_agent, m_reason)
else:
t = Transaction(ta_id, user, value, old_balance, datetime.fromtimestamp(date))
t = Transaction(ta_id, user, value, old_balance, datetime.fromtimestamp(date, UTC))
transactions.append(t)
if write:
cursor.execute('''
@ -733,7 +751,7 @@ class MatematDatabase(object):
receipt_id: int = int(cursor.fetchone()[0])
else:
receipt_id = -1
receipt = Receipt(receipt_id, transactions, user, created, datetime.utcnow())
receipt = Receipt(receipt_id, transactions, user, created, datetime.now(UTC))
return receipt
def generate_sales_statistics(self, from_date: datetime, to_date: datetime) -> Dict[str, Any]:
@ -775,7 +793,7 @@ class MatematDatabase(object):
LIMIT 1
), u.balance)
FROM users AS u
''', [to_date.timestamp()])
''', {'to_date': to_date.timestamp()})
for balance, in c.fetchall():
if balance > 0:
positive_balance += balance

View file

@ -284,3 +284,14 @@ def migrate_schema_6_to_7(c: sqlite3.Cursor):
ALTER TABLE users ADD COLUMN
logout_after_purchase INTEGER(1) DEFAULT 0;
''')
def migrate_schema_7_to_8(c: sqlite3.Cursor):
# Add ean column
c.execute('''
ALTER TABLE products ADD COLUMN ean TEXT DEFAULT NULL
''')
# Make ean column unique
c.execute('''
CREATE UNIQUE INDEX _matemat_products_ean_unique ON products(ean)
''')

View file

@ -11,11 +11,12 @@ class Product:
:param custom_price: If true, the user can choose the price to pay, but at least the regular price.
:param stock: The number of items of this product currently in stock, or None if not stockable.
:param stockable: Whether this product is stockable.
:param ean: The product's EAN code. May be None.
"""
def __init__(self, _id: int, name: str,
price_member: int, price_non_member: int, custom_price: bool,
stockable: bool, stock: int) -> None:
stockable: bool, stock: int, ean: str) -> None:
self.id: int = _id
self.name: str = name
self.price_member: int = price_member
@ -23,6 +24,7 @@ class Product:
self.custom_price: bool = custom_price
self.stock: int = stock
self.stockable: bool = stockable
self.ean: str = ean
def __eq__(self, other) -> bool:
if not isinstance(other, Product):
@ -33,8 +35,9 @@ class Product:
self.price_non_member == other.price_non_member and \
self.custom_price == other.custom_price and \
self.stock == other.stock and \
self.stockable == other.stockable
self.stockable == other.stockable and \
self.ean == other.ean
def __hash__(self) -> int:
return hash((self.id, self.name, self.price_member, self.price_non_member, self.custom_price,
self.stock, self.stockable))
self.stock, self.stockable, self.ean))

View file

@ -27,7 +27,7 @@ class Transaction:
@property
def receipt_date(self) -> str:
if self.date == datetime.fromtimestamp(0):
if self.date == datetime.fromtimestamp(0, UTC):
return '<unknown> '
date: str = self.date.strftime('%d.%m.%Y, %H:%M')
return date

View file

@ -494,3 +494,85 @@ SCHEMAS[7] = [
ON DELETE SET NULL ON UPDATE CASCADE
);
''']
SCHEMAS[8] = [
'''
CREATE TABLE users (
user_id INTEGER PRIMARY KEY,
username TEXT UNIQUE NOT NULL,
email TEXT DEFAULT NULL,
password TEXT NOT NULL,
touchkey TEXT DEFAULT NULL,
is_admin INTEGER(1) NOT NULL DEFAULT 0,
is_member INTEGER(1) NOT NULL DEFAULT 1,
balance INTEGER(8) NOT NULL DEFAULT 0,
lastchange INTEGER(8) NOT NULL DEFAULT 0,
receipt_pref INTEGER(1) NOT NULL DEFAULT 0,
created INTEGER(8) NOT NULL DEFAULT 0,
logout_after_purchase INTEGER(1) DEFAULT 0
);
''',
'''
CREATE TABLE products (
product_id INTEGER PRIMARY KEY,
name TEXT UNIQUE NOT NULL,
stock INTEGER(8) DEFAULT 0,
stockable INTEGER(1) DEFAULT 1,
price_member INTEGER(8) NOT NULL,
price_non_member INTEGER(8) NOT NULL,
custom_price INTEGER(1) DEFAULT 0,
ean TEXT UNIQUE DEFAULT NULL
);
''',
'''
CREATE TABLE transactions ( -- "superclass" of the following 3 tables
ta_id INTEGER PRIMARY KEY,
user_id INTEGER DEFAULT NULL,
value INTEGER(8) NOT NULL,
old_balance INTEGER(8) NOT NULL,
date INTEGER(8) DEFAULT (STRFTIME('%s', 'now')),
FOREIGN KEY (user_id) REFERENCES users(user_id)
ON DELETE SET NULL ON UPDATE CASCADE
);
''',
'''
CREATE TABLE consumptions ( -- transactions involving buying a product
ta_id INTEGER PRIMARY KEY,
product TEXT NOT NULL,
FOREIGN KEY (ta_id) REFERENCES transactions(ta_id)
ON DELETE CASCADE ON UPDATE CASCADE
);
''',
'''
CREATE TABLE deposits ( -- transactions involving depositing cash
ta_id INTEGER PRIMARY KEY,
FOREIGN KEY (ta_id) REFERENCES transactions(ta_id)
ON DELETE CASCADE ON UPDATE CASCADE
);
''',
'''
CREATE TABLE modifications ( -- transactions involving balance modification by an admin
ta_id INTEGER NOT NULL,
agent TEXT NOT NULL,
reason TEXT DEFAULT NULL,
PRIMARY KEY (ta_id),
FOREIGN KEY (ta_id) REFERENCES transactions(ta_id)
ON DELETE CASCADE ON UPDATE CASCADE
);
''',
'''
CREATE TABLE receipts ( -- receipts sent to the users
receipt_id INTEGER PRIMARY KEY,
user_id INTEGER NOT NULL,
first_ta_id INTEGER DEFAULT NULL,
last_ta_id INTEGER DEFAULT NULL,
date INTEGER(8) DEFAULT (STRFTIME('%s', 'now')),
FOREIGN KEY (user_id) REFERENCES users(user_id)
ON DELETE CASCADE ON UPDATE CASCADE,
FOREIGN KEY (first_ta_id) REFERENCES transactions(ta_id)
ON DELETE SET NULL ON UPDATE CASCADE,
FOREIGN KEY (last_ta_id) REFERENCES transactions(ta_id)
ON DELETE SET NULL ON UPDATE CASCADE
);
''']

View file

@ -2,7 +2,7 @@
import unittest
import crypt
from datetime import datetime, timedelta
from datetime import datetime, timedelta, UTC
from matemat.db import MatematDatabase
from matemat.db.primitives import User, Product, ReceiptPreference, Receipt, \
@ -220,7 +220,7 @@ class DatabaseTest(unittest.TestCase):
def test_create_product(self) -> None:
with self.db as db:
with db.transaction() as c:
db.create_product('Club Mate', 200, 200, True, True)
db.create_product('Club Mate', 200, 200, True, True, '4029764001807')
c.execute("SELECT * FROM products")
row = c.fetchone()
self.assertEqual('Club Mate', row[1])
@ -229,18 +229,20 @@ class DatabaseTest(unittest.TestCase):
self.assertEqual(200, row[4])
self.assertEqual(200, row[5])
self.assertEqual(1, row[6])
self.assertEqual('4029764001807', row[7])
with self.assertRaises(ValueError):
db.create_product('Club Mate', 250, 250, False, False)
db.create_product('Club Mate', 250, 250, False, False, '4029764001807')
def test_get_product(self) -> None:
with self.db as db:
with db.transaction(exclusive=False):
created = db.create_product('Club Mate', 150, 250, False, False)
created = db.create_product('Club Mate', 150, 250, False, False, '4029764001807')
product = db.get_product(created.id)
self.assertEqual('Club Mate', product.name)
self.assertEqual(150, product.price_member)
self.assertEqual(250, product.price_non_member)
self.assertEqual(False, product.stockable)
self.assertEqual('4029764001807', product.ean)
with self.assertRaises(ValueError):
db.get_product(-1)
@ -249,9 +251,9 @@ class DatabaseTest(unittest.TestCase):
# Test empty list
products = db.list_products()
self.assertEqual(0, len(products))
db.create_product('Club Mate', 200, 200, False, True)
db.create_product('Flora Power Mate', 200, 200, False, False)
db.create_product('Fritz Mate', 200, 250, False, True)
db.create_product('Club Mate', 200, 200, False, True, '4029764001807')
db.create_product('Flora Power Mate', 200, 200, False, False, None)
db.create_product('Fritz Mate', 200, 250, False, True, '4260107223177')
products = db.list_products()
self.assertEqual(3, len(products))
productcheck = {}
@ -260,22 +262,25 @@ class DatabaseTest(unittest.TestCase):
self.assertEqual(200, product.price_member)
self.assertEqual(200, product.price_non_member)
self.assertTrue(product.stockable)
self.assertEqual('4029764001807', product.ean)
elif product.name == 'Flora Power Mate':
self.assertEqual(200, product.price_member)
self.assertEqual(200, product.price_non_member)
self.assertFalse(product.stockable)
self.assertEqual(None, product.ean)
elif product.name == 'Fritz Mate':
self.assertEqual(200, product.price_member)
self.assertEqual(250, product.price_non_member)
self.assertTrue(product.stockable)
self.assertEqual('4260107223177', product.ean)
productcheck[product.id] = 1
self.assertEqual(3, len(productcheck))
def test_change_product(self) -> None:
with self.db as db:
product = db.create_product('Club Mate', 200, 200, False, True)
product = db.create_product('Club Mate', 200, 200, False, True, '4029764001807')
db.change_product(product, name='Flora Power Mate', price_member=150, price_non_member=250,
custom_price=True, stock=None, stockable=False)
custom_price=True, stock=None, stockable=False, ean=None)
# Changes must be reflected in the passed object
self.assertEqual('Flora Power Mate', product.name)
self.assertEqual(150, product.price_member)
@ -283,6 +288,7 @@ class DatabaseTest(unittest.TestCase):
self.assertEqual(True, product.custom_price)
self.assertEqual(None, product.stock)
self.assertEqual(False, product.stockable)
self.assertEqual(None, product.ean)
# Changes must be reflected in the database
checkproduct = db.get_product(product.id)
self.assertEqual('Flora Power Mate', checkproduct.name)
@ -294,7 +300,7 @@ class DatabaseTest(unittest.TestCase):
product.id = -1
with self.assertRaises(DatabaseConsistencyError):
db.change_product(product)
product2 = db.create_product('Club Mate', 200, 200, False, True)
product2 = db.create_product('Club Mate', 200, 200, False, True, '4029764001807')
product2.name = 'Flora Power Mate'
with self.assertRaises(DatabaseConsistencyError):
# Should fail, as a product with the same name already exists.
@ -302,8 +308,8 @@ class DatabaseTest(unittest.TestCase):
def test_delete_product(self) -> None:
with self.db as db:
product = db.create_product('Club Mate', 200, 200, False, True)
product2 = db.create_product('Flora Power Mate', 200, 200, False, False)
product = db.create_product('Club Mate', 200, 200, False, True, '4029764001807')
product2 = db.create_product('Flora Power Mate', 200, 200, False, False, None)
self.assertEqual(2, len(db.list_products()))
db.delete_product(product)
@ -378,9 +384,9 @@ class DatabaseTest(unittest.TestCase):
db.deposit(user1, 1337)
db.deposit(user2, 4242)
db.deposit(user3, 1234)
clubmate = db.create_product('Club Mate', 200, 200, False, True)
florapowermate = db.create_product('Flora Power Mate', 150, 250, False, True)
fritzmate = db.create_product('Fritz Mate', 200, 200, False, True)
clubmate = db.create_product('Club Mate', 200, 200, False, True, '4029764001807')
florapowermate = db.create_product('Flora Power Mate', 150, 250, False, True, None)
fritzmate = db.create_product('Fritz Mate', 200, 200, False, True, '4260107223177')
# user1 is somewhat addicted to caffeine
for _ in range(3):
@ -430,10 +436,10 @@ class DatabaseTest(unittest.TestCase):
user7 = db.create_user('user7', 'supersecurepassword', 'user7@example.com', True, True)
user7.receipt_pref = 42
twoyears: int = int((datetime.utcnow() - timedelta(days=730)).timestamp())
halfyear: int = int((datetime.utcnow() - timedelta(days=183)).timestamp())
twomonths: int = int((datetime.utcnow() - timedelta(days=61)).timestamp())
halfmonth: int = int((datetime.utcnow() - timedelta(days=15)).timestamp())
twoyears: int = int((datetime.now(UTC) - timedelta(days=730)).timestamp())
halfyear: int = int((datetime.now(UTC) - timedelta(days=183)).timestamp())
twomonths: int = int((datetime.now(UTC) - timedelta(days=61)).timestamp())
halfmonth: int = int((datetime.now(UTC) - timedelta(days=15)).timestamp())
with db.transaction() as c:
# Fix creation date for user2
@ -506,7 +512,7 @@ class DatabaseTest(unittest.TestCase):
admin: User = db.create_user('admin', 'supersecurepassword', 'admin@example.com', True, True)
user: User = db.create_user('user', 'supersecurepassword', 'user@example.com', True, True)
product: Product = db.create_product('Flora Power Mate', 200, 200, False, True)
product: Product = db.create_product('Flora Power Mate', 200, 200, False, True, None)
# Create some transactions
db.change_user(user, agent=admin,
@ -533,7 +539,7 @@ class DatabaseTest(unittest.TestCase):
SELECT user_id, 500, balance
FROM users
WHERE user_id = :id
''', [user.id])
''', {'id': user.id})
receipt3: Receipt = db.create_receipt(user, write=False)
with db.transaction() as c:
@ -595,8 +601,8 @@ class DatabaseTest(unittest.TestCase):
user2: User = db.create_user('user2', 'supersecurepassword', 'user2@example.com', True, False)
user3: User = db.create_user('user3', 'supersecurepassword', 'user3@example.com', True, False)
user4: User = db.create_user('user4', 'supersecurepassword', 'user4@example.com', True, False)
flora: Product = db.create_product('Flora Power Mate', 200, 200, False, True)
club: Product = db.create_product('Club Mate', 200, 200, False, False)
flora: Product = db.create_product('Flora Power Mate', 200, 200, False, True, None)
club: Product = db.create_product('Club Mate', 200, 200, False, False, '4029764001807')
# Create some transactions
db.deposit(user1, 1337)
@ -610,7 +616,7 @@ class DatabaseTest(unittest.TestCase):
db.increment_consumption(user4, club)
# Generate statistics
now = datetime.utcnow()
now = datetime.now(UTC)
stats = db.generate_sales_statistics(now - timedelta(days=1), now + timedelta(days=1))
self.assertEqual(7, len(stats))

View file

@ -40,7 +40,7 @@ class DatabaseTransaction(object):
class DatabaseWrapper(object):
SCHEMA_VERSION = 7
SCHEMA_VERSION = 8
def __init__(self, filename: str) -> None:
self._filename: str = filename
@ -91,6 +91,8 @@ class DatabaseWrapper(object):
migrate_schema_5_to_6(c)
if from_version <= 6 and to_version >= 7:
migrate_schema_6_to_7(c)
if from_version <= 7 and to_version >= 8:
migrate_schema_7_to_8(c)
def connect(self) -> None:
if self.is_connected():

View file

@ -28,5 +28,5 @@ def add_months(d: datetime, months: int) -> datetime:
# Set the day of month temporarily to 1, then add the day offset to reach the 1st of the target month
newdate: datetime = d.replace(day=1) + timedelta(days=days)
# Re-set the day of month to the intended value, but capped by the max. day in the target month
newdate = newdate.replace(day=min(d.day, calendar.monthrange(newdate.year, newdate.month)[1]))
newdate = newdate.replace(day=min(d.day, calendar.monthrange(newdate.year, newdate.month)[1]), tzinfo=d.tzinfo)
return newdate

View file

@ -1,5 +1,5 @@
import os
from datetime import datetime
from datetime import datetime, UTC
from io import BytesIO
from shutil import copyfile
@ -48,7 +48,7 @@ def admin():
users = db.list_users()
products = db.list_products()
# Render the "Admin/Settings" page
now = str(int(datetime.utcnow().timestamp()))
now = str(int(datetime.now(UTC).timestamp()))
return template.render('admin.html',
authuser=user, authlevel=authlevel, users=users, products=products,
receipt_preference_class=ReceiptPreference, now=now,
@ -206,8 +206,9 @@ def handle_admin_change(args: FormsDict, files: FormsDict, db: MatematDatabase):
price_non_member = parse_chf(str(args.pricenonmember))
custom_price = 'custom_price' in args
stockable = 'stockable' in args
ean = str(args.ean) or None
# Create the user in the database
newproduct = db.create_product(name, price_member, price_non_member, custom_price, stockable)
newproduct = db.create_product(name, price_member, price_non_member, custom_price, stockable, ean)
# If a new product image was uploaded, process it
image = files.image.file.read() if 'image' in files else None
if image is not None and len(image) > 0:

View file

@ -1,4 +1,4 @@
from datetime import datetime
from datetime import datetime, UTC
from bottle import route, redirect, request
@ -14,7 +14,7 @@ def main_page():
"""
config = get_app_config()
session_id: str = session.start()
now = str(int(datetime.utcnow().timestamp()))
now = str(int(datetime.now(UTC).timestamp()))
with MatematDatabase(config['DatabaseFile']) as db:
# Fetch the list of products to display
products = db.list_products()
@ -23,11 +23,21 @@ def main_page():
else:
lastproduct = None
lastprice = int(request.params.lastprice) if request.params.lastprice else None
if request.params.ean:
try:
buyproduct = db.get_product_by_ean(request.params.ean)
except ValueError:
buyproduct = None
else:
buyproduct = None
# Check whether a user is logged in
if session.has(session_id, 'authenticated_user'):
# Fetch the user id and authentication level (touchkey vs password) from the session storage
uid: int = session.get(session_id, 'authenticated_user')
authlevel: int = session.get(session_id, 'authentication_level')
# If an EAN code was scanned, directly trigger the purchase
if buyproduct:
redirect(f'/buy?pid={buyproduct.id}')
# Fetch the user object from the database (for name display, price calculation and admin check)
users = db.list_users()
user = db.get_user(uid)
@ -45,4 +55,5 @@ def main_page():
return template.render('userlist.html',
users=users, setupname=config['InstanceName'], now=now,
signup=(config.get('SignupEnabled', '0') == '1'),
lastaction=request.params.lastaction, lastprice=lastprice, lastproduct=lastproduct)
lastaction=request.params.lastaction, lastprice=lastprice, lastproduct=lastproduct,
buyproduct=buyproduct)

View file

@ -1,6 +1,6 @@
import os
from io import BytesIO
from datetime import datetime
from datetime import datetime, UTC
from typing import Dict
import magic
@ -56,7 +56,7 @@ def modproduct():
redirect('/admin')
# Render the "Modify Product" page
now = str(int(datetime.utcnow().timestamp()))
now = str(int(datetime.now(UTC).timestamp()))
return template.render('modproduct.html',
authuser=authuser, product=product, authlevel=authlevel,
setupname=config['InstanceName'], now=now)
@ -98,11 +98,12 @@ def handle_change(args: FormsDict, files: FormsDict, product: Product, db: Matem
custom_price = 'custom_price' in args
stock = int(str(args.stock))
stockable = 'stockable' in args
ean = str(args.ean) or None
# Attempt to write the changes to the database
try:
db.change_product(product,
name=name, price_member=price_member, price_non_member=price_non_member,
custom_price=custom_price, stock=stock, stockable=stockable)
custom_price=custom_price, stock=stock, stockable=stockable, ean=ean)
stock_provider = get_stock_provider()
if stock_provider.needs_update() and product.stockable:
stock_provider.set_stock(product, stock)

View file

@ -1,5 +1,5 @@
import os
from datetime import datetime
from datetime import datetime, UTC
from io import BytesIO
from typing import Dict, Optional
@ -56,7 +56,7 @@ def moduser():
redirect('/admin')
# Render the "Modify User" page
now = str(int(datetime.utcnow().timestamp()))
now = str(int(datetime.now(UTC).timestamp()))
return template.render('moduser.html',
authuser=authuser, user=user, authlevel=authlevel, now=now,
receipt_preference_class=ReceiptPreference,

View file

@ -1,4 +1,4 @@
from datetime import datetime, timedelta
from datetime import datetime, timedelta, UTC
from math import pi, sin, cos
from typing import Any, Dict, List, Tuple
@ -34,7 +34,7 @@ def statistics():
# Show a 403 Forbidden error page if the user is not an admin
abort(403)
todate: datetime = datetime.utcnow()
todate: datetime = datetime.now(UTC)
fromdate: datetime = (todate - timedelta(days=365)).replace(hour=0, minute=0, second=0, microsecond=0)
if 'fromdate' in request.params:
fdarg: str = str(request.params.fromdate)

View file

@ -21,9 +21,13 @@ def touchkey_page():
redirect('/')
# If requested via HTTP GET, render the login page showing the touchkey UI
if request.method == 'GET':
if request.params.buypid:
buypid = str(request.params.buypid)
else:
buypid = None
return template.render('touchkey.html',
username=str(request.params.username), uid=int(str(request.params.uid)),
setupname=config['InstanceName'])
setupname=config['InstanceName'], buypid=buypid)
# If requested via HTTP POST, read the request arguments and attempt to log in with the provided credentials
elif request.method == 'POST':
# Connect to the database
@ -38,6 +42,9 @@ def touchkey_page():
session.put(session_id, 'authenticated_user', user.id)
# Set the authlevel session variable (0 = none, 1 = touchkey, 2 = password login)
session.put(session_id, 'authentication_level', 1)
if request.params.buypid:
buypid = str(request.params.buypid)
redirect(f'/buy?pid={buypid}')
# Redirect to the main page, showing the product list
redirect('/')
# If neither GET nor POST was used, show a 405 Method Not Allowed error page

View file

@ -3,7 +3,7 @@ from typing import Any, Dict, Tuple, Optional
from bottle import request, response
from secrets import token_bytes
from uuid import uuid4
from datetime import datetime, timedelta
from datetime import datetime, timedelta, UTC
__key: Optional[str] = token_bytes(32)
@ -21,7 +21,7 @@ def start() -> str:
:return: The session ID.
"""
# Reference date for session timeout
now = datetime.utcnow()
now = datetime.now(UTC)
# Read the client's session ID, if any
session_id = request.get_cookie(_COOKIE_NAME, secret=__key)
# If there is no active session, create a new session ID

View file

@ -52,6 +52,14 @@ nav div {
padding: 10px;
}
.notification {
display: block;
width: calc(100% - 36px);
margin: 10px;
padding: 10px;
background-color: #c0ffc0;
}
@media print {
footer {
position: fixed;
@ -132,7 +140,7 @@ nav div {
.numpad {
background: #f0f0f0;
text-decoration: none;
text-decoration: none;
font-size: 50px;
font-family: sans-serif;
line-height: 100px;

View file

@ -46,6 +46,9 @@
<label for="admin-newproduct-name">Name: </label>
<input id="admin-newproduct-name" type="text" name="name" /><br/>
<label for="admin-newproduct-ean">EAN code: </label>
<input id="admin-newproduct-ean" type="text" name="ean" /><br/>
<label for="admin-newproduct-price-member">Member price: </label>
CHF <input id="admin-newproduct-price-member" type="number" step="0.01" name="pricemember" value="0" /><br/>

View file

@ -36,7 +36,7 @@
<header>
{% block header %}
<nav class="navbarbutton">
<nav class="navbarbutton">
{# Show a link to the settings, if a user logged in via password (authlevel 2). #}
{% if authlevel|default(0) > 1 %}
{% if authuser is defined %}

View file

@ -14,6 +14,9 @@
<label for="modproduct-name">Name: </label>
<input id="modproduct-name" type="text" name="name" value="{{ product.name }}" /><br/>
<label for="modproduct-ean">EAN code: </label>
<input id="modproduct-ean" type="text" name="ean" value="{{ product.ean or '' }}" /><br/>
<label for="modproduct-price-member">Member price: </label>
CHF <input id="modproduct-price-member" type="number" step="0.01" name="pricemember" value="{{ product.price_member|chf(False) }}" /><br/>

View file

@ -23,6 +23,9 @@
<input type="hidden" name="uid" value="{{ uid }}" />
<input type="hidden" name="username" value="{{ username }}" />
<input type="hidden" name="touchkey" value="" id="loginform-touchkey-value" />
{% if buypid %}
<input type="hidden" name="buypid" value="{{ buypid }}" />
{% endif %}
</form>
<a href="/">Cancel</a>
@ -33,4 +36,4 @@
{{ super() }}
{% endblock %}
{% endblock %}

View file

@ -8,10 +8,17 @@
{% block main %}
{% if buyproduct %}
<div class="notification">
Login will purchase <strong>{{ buyproduct.name }}</strong>.
Click <a href="/">here</a> to abort.
</div>
{% endif %}
{% for user in users %}
{# Show an item per user, consisting of the username, and the avatar, linking to the touchkey login #}
<div class="thumblist-item">
<a href="/touchkey?uid={{ user.id }}&username={{ user.name }}">
<a href="/touchkey?uid={{ user.id }}&username={{ user.name }}{% if buyproduct %}&buypid={{ buyproduct.id }}{% endif %}">
<span class="thumblist-title">{{ user.name }}</span><br/>
<div class="imgcontainer">
<img src="/static/upload/thumbnails/users/{{ user.id }}.png?cacheBuster={{ now }}" alt="Avatar of {{ user.name }}" draggable="false"/>