1
0
Fork 0
forked from s3lph/matemat

Compare commits

...

3 commits
main ... main

Author SHA1 Message Date
4eb71415fd
fix: show the purchase warning banner also on the touchkey login
feat: replace overlay system with a generic notification banner system
feat: add a config option to automatically close tabs after ean purchase
2024-11-23 09:48:53 +01:00
8287dc1947
fix: codestyle 2024-11-23 04:37:47 +01:00
f3af4d64a7
feat: Immediately purchase a product by calling /?ean=...
chore: Replace datetime.utcnow with datetime.now(UTC)
chore: Replace sqlite3 qmark-bindings with named bindings
2024-11-23 04:35:05 +01:00
30 changed files with 404 additions and 207 deletions

View file

@ -1,5 +1,35 @@
# Matemat Changelog
<!-- BEGIN RELEASE v0.3.14 -->
## Version 0.3.14
Improvement of quick-purchase via EAN codes
### Changes
<!-- BEGIN CHANGES 0.3.14 -->
- fix: show the purchase warning banner also on the touchkey login
- feat: replace overlay system with a generic notification banner system
- feat: add a config option to automatically close tabs after ean purchase
<!-- END CHANGES 0.3.14 -->
<!-- END RELEASE v0.3.14 -->
<!-- BEGIN RELEASE v0.3.13 -->
## Version 0.3.13
Quick-purchase via EAN codes
### Changes
<!-- BEGIN CHANGES 0.3.13 -->
- feat: Immediately purchase a product by calling `/?ean=...`
- chore: Replace datetime.utcnow with datetime.now(UTC)
- chore: Replace sqlite3 qmark-bindings with named bindings
<!-- END CHANGES 0.3.13 -->
<!-- END RELEASE v0.3.13 -->
<!-- BEGIN RELEASE v0.3.12 -->
## Version 0.3.12

View file

@ -1,2 +1,2 @@
__version__ = '0.3.12'
__version__ = '0.3.14'

View file

@ -3,7 +3,7 @@ from typing import Any, Dict, List, Optional, Tuple, Type
import crypt
from hmac import compare_digest
from datetime import datetime
from datetime import datetime, UTC
from matemat.db.primitives import User, Product, ReceiptPreference, Receipt, \
Transaction, Consumption, Deposit, Modification
@ -111,9 +111,8 @@ class MatematDatabase(object):
c.execute('''
SELECT user_id, username, email, is_admin, is_member, balance, receipt_pref, logout_after_purchase
FROM users
WHERE user_id = ?
''',
[uid])
WHERE user_id = :user_id
''', {'user_id': uid})
row = c.fetchone()
if row is None:
raise ValueError(f'No user with user ID {uid} exists.')
@ -148,7 +147,7 @@ class MatematDatabase(object):
user_id: int = -1
with self.db.transaction() as c:
# Look up whether a user with the same name already exists.
c.execute('SELECT user_id FROM users WHERE username = ?', [username])
c.execute('SELECT user_id FROM users WHERE username = :username', {'username': username})
if c.fetchone() is not None:
raise ValueError(f'A user with the name \'{username}\' already exists.')
# Insert the user into the database.
@ -188,8 +187,8 @@ class MatematDatabase(object):
SELECT user_id, username, email, password, touchkey, is_admin, is_member, balance, receipt_pref,
logout_after_purchase
FROM users
WHERE username = ?
''', [username])
WHERE username = :username
''', {'username': username})
row = c.fetchone()
if row is None:
raise AuthenticationError('User does not exist')
@ -221,8 +220,8 @@ class MatematDatabase(object):
with self.db.transaction() as c:
# Fetch the old password.
c.execute('''
SELECT password FROM users WHERE user_id = ?
''', [user.id])
SELECT password FROM users WHERE user_id = :user_id
''', {'user_id': user.id})
row = c.fetchone()
if row is None:
raise AuthenticationError('User does not exist in database.')
@ -251,8 +250,8 @@ class MatematDatabase(object):
with self.db.transaction() as c:
# Fetch the password.
c.execute('''
SELECT password FROM users WHERE user_id = ?
''', [user.id])
SELECT password FROM users WHERE user_id = :user_id
''', {'user_id': user.id})
row = c.fetchone()
if row is None:
raise AuthenticationError('User does not exist in database.')
@ -352,8 +351,8 @@ class MatematDatabase(object):
with self.db.transaction() as c:
c.execute('''
DELETE FROM users
WHERE user_id = ?
''', [user.id])
WHERE user_id = :user_id
''', {'user_id': user.id})
affected = c.execute('SELECT changes()').fetchone()[0]
if affected != 1:
raise DatabaseConsistencyError(
@ -367,34 +366,52 @@ class MatematDatabase(object):
products: List[Product] = []
with self.db.transaction(exclusive=False) as c:
for row in c.execute('''
SELECT product_id, name, price_member, price_non_member, custom_price, stock, stockable
SELECT product_id, name, price_member, price_non_member, custom_price, stock, stockable, ean
FROM products ORDER BY name
'''):
product_id, name, price_member, price_external, custom_price, stock, stockable = row
products.append(Product(product_id, name, price_member, price_external, custom_price, stockable, stock))
product_id, name, price_member, price_external, custom_price, stock, stockable, ean = row
products.append(
Product(product_id, name, price_member, price_external, custom_price, stockable, stock, ean))
return products
def get_product(self, pid: int) -> Product:
"""
Return a product identified by its product ID.
:param pid: The products's ID.
:param pid: The product's ID.
"""
with self.db.transaction(exclusive=False) as c:
# Fetch all values to construct the product
c.execute('''
SELECT product_id, name, price_member, price_non_member, custom_price, stock, stockable
SELECT product_id, name, price_member, price_non_member, custom_price, stock, stockable, ean
FROM products
WHERE product_id = ?''',
[pid])
WHERE product_id = :product_id''', {'product_id': pid})
row = c.fetchone()
if row is None:
raise ValueError(f'No product with product ID {pid} exists.')
# Unpack the row and construct the product
product_id, name, price_member, price_non_member, custom_price, stock, stockable = row
return Product(product_id, name, price_member, price_non_member, custom_price, stockable, stock)
product_id, name, price_member, price_non_member, custom_price, stock, stockable, ean = row
return Product(product_id, name, price_member, price_non_member, custom_price, stockable, stock, ean)
def get_product_by_ean(self, ean: str) -> Product:
"""
Return a product identified by its EAN code.
:param ean: The product's EAN code.
"""
with self.db.transaction(exclusive=False) as c:
# Fetch all values to construct the product
c.execute('''
SELECT product_id, name, price_member, price_non_member, custom_price, stock, stockable, ean
FROM products
WHERE ean = :ean''', {'ean': ean})
row = c.fetchone()
if row is None:
raise ValueError(f'No product with EAN code {ean} exists.')
# Unpack the row and construct the product
product_id, name, price_member, price_non_member, custom_price, stock, stockable, ean = row
return Product(product_id, name, price_member, price_non_member, custom_price, stockable, stock, ean)
def create_product(self, name: str, price_member: int, price_non_member: int, custom_price:
bool, stockable: bool) -> Product:
bool, stockable: bool, ean: str) -> Product:
"""
Creates a new product.
:param name: Name of the product.
@ -407,22 +424,23 @@ class MatematDatabase(object):
"""
product_id: int = -1
with self.db.transaction() as c:
c.execute('SELECT product_id FROM products WHERE name = ?', [name])
c.execute('SELECT product_id FROM products WHERE name = :name', {'name': name})
if c.fetchone() is not None:
raise ValueError(f'A product with the name \'{name}\' already exists.')
c.execute('''
INSERT INTO products (name, price_member, price_non_member, custom_price, stock, stockable)
VALUES (:name, :price_member, :price_non_member, :custom_price, 0, :stockable)
INSERT INTO products (name, price_member, price_non_member, custom_price, stock, stockable, ean)
VALUES (:name, :price_member, :price_non_member, :custom_price, 0, :stockable, :ean)
''', {
'name': name,
'price_member': price_member,
'price_non_member': price_non_member,
'custom_price': custom_price,
'stockable': stockable
'stockable': stockable,
'ean': ean,
})
c.execute('SELECT last_insert_rowid()')
product_id = int(c.fetchone()[0])
return Product(product_id, name, price_member, price_non_member, custom_price, stockable, 0)
return Product(product_id, name, price_member, price_non_member, custom_price, stockable, 0, ean)
def change_product(self, product: Product, **kwargs) -> None:
"""
@ -441,6 +459,7 @@ class MatematDatabase(object):
custom_price: int = kwargs['custom_price'] if 'custom_price' in kwargs else product.custom_price
stock: int = kwargs['stock'] if 'stock' in kwargs else product.stock
stockable: bool = kwargs['stockable'] if 'stockable' in kwargs else product.stockable
ean: str = kwargs['ean'] if 'ean' in kwargs else product.ean
with self.db.transaction() as c:
c.execute('''
UPDATE products
@ -450,7 +469,8 @@ class MatematDatabase(object):
price_non_member = :price_non_member,
custom_price = :custom_price,
stock = :stock,
stockable = :stockable
stockable = :stockable,
ean = :ean
WHERE product_id = :product_id
''', {
'product_id': product.id,
@ -459,7 +479,8 @@ class MatematDatabase(object):
'price_non_member': price_non_member,
'custom_price': custom_price,
'stock': stock,
'stockable': stockable
'stockable': stockable,
'ean': ean
})
affected = c.execute('SELECT changes()').fetchone()[0]
if affected != 1:
@ -472,6 +493,7 @@ class MatematDatabase(object):
product.custom_price = custom_price
product.stock = stock
product.stockable = stockable
product.ean = ean
def delete_product(self, product: Product) -> None:
"""
@ -482,8 +504,8 @@ class MatematDatabase(object):
with self.db.transaction() as c:
c.execute('''
DELETE FROM products
WHERE product_id = ?
''', [product.id])
WHERE product_id = :product_id
''', {'product_id': product.id})
affected = c.execute('SELECT changes()').fetchone()[0]
if affected != 1:
raise DatabaseConsistencyError(
@ -543,8 +565,7 @@ class MatematDatabase(object):
if amount < 0:
raise ValueError('Cannot deposit a negative value')
with self.db.transaction() as c:
c.execute('''SELECT balance FROM users WHERE user_id = :user_id''',
[user.id])
c.execute('''SELECT balance FROM users WHERE user_id = :user_id''', {'user_id': user.id})
row = c.fetchone()
if row is None:
raise DatabaseConsistencyError(f'No such user: {user.id}')
@ -588,8 +609,7 @@ class MatematDatabase(object):
raise ValueError('Cannot transfer a negative value')
with self.db.transaction() as c:
# First, remove amount from the source user's account
c.execute('''SELECT balance FROM users WHERE user_id = :user_id''',
[source.id])
c.execute('''SELECT balance FROM users WHERE user_id = :user_id''', {'user_id': source.id})
row = c.fetchone()
if row is None:
raise DatabaseConsistencyError(f'No such user: {source.id}')
@ -621,8 +641,7 @@ class MatematDatabase(object):
if affected != 1:
raise DatabaseConsistencyError(f'transfer should affect 1 users row, but affected {affected}')
# Then, add the amount to the destination user's account
c.execute('''SELECT balance FROM users WHERE user_id = :user_id''',
[dest.id])
c.execute('''SELECT balance FROM users WHERE user_id = :user_id''', {'user_id': dest.id})
row = c.fetchone()
if row is None:
raise DatabaseConsistencyError(f'No such user: {dest.id}')
@ -669,11 +688,11 @@ class MatematDatabase(object):
LEFT JOIN receipts AS r
ON r.user_id = u.user_id
WHERE u.user_id = :user_id
''', [user.id])
''', {'user_id': user.id})
last_receipt: datetime = datetime.fromtimestamp(c.fetchone()[0])
last_receipt: datetime = datetime.fromtimestamp(c.fetchone()[0], UTC)
next_receipt_due: datetime = user.receipt_pref.next_receipt_due(last_receipt)
return datetime.utcnow() > next_receipt_due
return datetime.now(UTC) > next_receipt_due
def create_receipt(self, user: User, write: bool = False) -> Receipt:
transactions: List[Transaction] = []
@ -684,12 +703,12 @@ class MatematDatabase(object):
LEFT JOIN receipts AS r
ON r.user_id = u.user_id
WHERE u.user_id = :user_id
''', [user.id])
''', {'user_id': user.id})
row = cursor.fetchone()
if row is None:
raise DatabaseConsistencyError(f'No such user: {user.id}')
fromdate, min_id = row
created: datetime = datetime.fromtimestamp(fromdate)
created: datetime = datetime.fromtimestamp(fromdate, UTC)
cursor.execute('''
SELECT
t.ta_id, t.value, t.old_balance, COALESCE(t.date, 0),
@ -712,13 +731,15 @@ class MatematDatabase(object):
for row in rows:
ta_id, value, old_balance, date, c, d, m, c_prod, m_agent, m_reason = row
if c == ta_id:
t: Transaction = Consumption(ta_id, user, value, old_balance, datetime.fromtimestamp(date), c_prod)
t: Transaction = Consumption(ta_id, user, value, old_balance,
datetime.fromtimestamp(date, UTC), c_prod)
elif d == ta_id:
t = Deposit(ta_id, user, value, old_balance, datetime.fromtimestamp(date))
t = Deposit(ta_id, user, value, old_balance, datetime.fromtimestamp(date, UTC))
elif m == ta_id:
t = Modification(ta_id, user, value, old_balance, datetime.fromtimestamp(date), m_agent, m_reason)
t = Modification(ta_id, user, value, old_balance,
datetime.fromtimestamp(date, UTC), m_agent, m_reason)
else:
t = Transaction(ta_id, user, value, old_balance, datetime.fromtimestamp(date))
t = Transaction(ta_id, user, value, old_balance, datetime.fromtimestamp(date, UTC))
transactions.append(t)
if write:
cursor.execute('''
@ -733,7 +754,7 @@ class MatematDatabase(object):
receipt_id: int = int(cursor.fetchone()[0])
else:
receipt_id = -1
receipt = Receipt(receipt_id, transactions, user, created, datetime.utcnow())
receipt = Receipt(receipt_id, transactions, user, created, datetime.now(UTC))
return receipt
def generate_sales_statistics(self, from_date: datetime, to_date: datetime) -> Dict[str, Any]:
@ -775,7 +796,7 @@ class MatematDatabase(object):
LIMIT 1
), u.balance)
FROM users AS u
''', [to_date.timestamp()])
''', {'to_date': to_date.timestamp()})
for balance, in c.fetchall():
if balance > 0:
positive_balance += balance

View file

@ -284,3 +284,14 @@ def migrate_schema_6_to_7(c: sqlite3.Cursor):
ALTER TABLE users ADD COLUMN
logout_after_purchase INTEGER(1) DEFAULT 0;
''')
def migrate_schema_7_to_8(c: sqlite3.Cursor):
# Add ean column
c.execute('''
ALTER TABLE products ADD COLUMN ean TEXT DEFAULT NULL
''')
# Make ean column unique
c.execute('''
CREATE UNIQUE INDEX _matemat_products_ean_unique ON products(ean)
''')

View file

@ -11,11 +11,12 @@ class Product:
:param custom_price: If true, the user can choose the price to pay, but at least the regular price.
:param stock: The number of items of this product currently in stock, or None if not stockable.
:param stockable: Whether this product is stockable.
:param ean: The product's EAN code. May be None.
"""
def __init__(self, _id: int, name: str,
price_member: int, price_non_member: int, custom_price: bool,
stockable: bool, stock: int) -> None:
stockable: bool, stock: int, ean: str) -> None:
self.id: int = _id
self.name: str = name
self.price_member: int = price_member
@ -23,6 +24,7 @@ class Product:
self.custom_price: bool = custom_price
self.stock: int = stock
self.stockable: bool = stockable
self.ean: str = ean
def __eq__(self, other) -> bool:
if not isinstance(other, Product):
@ -33,8 +35,9 @@ class Product:
self.price_non_member == other.price_non_member and \
self.custom_price == other.custom_price and \
self.stock == other.stock and \
self.stockable == other.stockable
self.stockable == other.stockable and \
self.ean == other.ean
def __hash__(self) -> int:
return hash((self.id, self.name, self.price_member, self.price_non_member, self.custom_price,
self.stock, self.stockable))
self.stock, self.stockable, self.ean))

View file

@ -27,7 +27,7 @@ class Transaction:
@property
def receipt_date(self) -> str:
if self.date == datetime.fromtimestamp(0):
if self.date == datetime.fromtimestamp(0, UTC):
return '<unknown> '
date: str = self.date.strftime('%d.%m.%Y, %H:%M')
return date

View file

@ -494,3 +494,85 @@ SCHEMAS[7] = [
ON DELETE SET NULL ON UPDATE CASCADE
);
''']
SCHEMAS[8] = [
'''
CREATE TABLE users (
user_id INTEGER PRIMARY KEY,
username TEXT UNIQUE NOT NULL,
email TEXT DEFAULT NULL,
password TEXT NOT NULL,
touchkey TEXT DEFAULT NULL,
is_admin INTEGER(1) NOT NULL DEFAULT 0,
is_member INTEGER(1) NOT NULL DEFAULT 1,
balance INTEGER(8) NOT NULL DEFAULT 0,
lastchange INTEGER(8) NOT NULL DEFAULT 0,
receipt_pref INTEGER(1) NOT NULL DEFAULT 0,
created INTEGER(8) NOT NULL DEFAULT 0,
logout_after_purchase INTEGER(1) DEFAULT 0
);
''',
'''
CREATE TABLE products (
product_id INTEGER PRIMARY KEY,
name TEXT UNIQUE NOT NULL,
stock INTEGER(8) DEFAULT 0,
stockable INTEGER(1) DEFAULT 1,
price_member INTEGER(8) NOT NULL,
price_non_member INTEGER(8) NOT NULL,
custom_price INTEGER(1) DEFAULT 0,
ean TEXT UNIQUE DEFAULT NULL
);
''',
'''
CREATE TABLE transactions ( -- "superclass" of the following 3 tables
ta_id INTEGER PRIMARY KEY,
user_id INTEGER DEFAULT NULL,
value INTEGER(8) NOT NULL,
old_balance INTEGER(8) NOT NULL,
date INTEGER(8) DEFAULT (STRFTIME('%s', 'now')),
FOREIGN KEY (user_id) REFERENCES users(user_id)
ON DELETE SET NULL ON UPDATE CASCADE
);
''',
'''
CREATE TABLE consumptions ( -- transactions involving buying a product
ta_id INTEGER PRIMARY KEY,
product TEXT NOT NULL,
FOREIGN KEY (ta_id) REFERENCES transactions(ta_id)
ON DELETE CASCADE ON UPDATE CASCADE
);
''',
'''
CREATE TABLE deposits ( -- transactions involving depositing cash
ta_id INTEGER PRIMARY KEY,
FOREIGN KEY (ta_id) REFERENCES transactions(ta_id)
ON DELETE CASCADE ON UPDATE CASCADE
);
''',
'''
CREATE TABLE modifications ( -- transactions involving balance modification by an admin
ta_id INTEGER NOT NULL,
agent TEXT NOT NULL,
reason TEXT DEFAULT NULL,
PRIMARY KEY (ta_id),
FOREIGN KEY (ta_id) REFERENCES transactions(ta_id)
ON DELETE CASCADE ON UPDATE CASCADE
);
''',
'''
CREATE TABLE receipts ( -- receipts sent to the users
receipt_id INTEGER PRIMARY KEY,
user_id INTEGER NOT NULL,
first_ta_id INTEGER DEFAULT NULL,
last_ta_id INTEGER DEFAULT NULL,
date INTEGER(8) DEFAULT (STRFTIME('%s', 'now')),
FOREIGN KEY (user_id) REFERENCES users(user_id)
ON DELETE CASCADE ON UPDATE CASCADE,
FOREIGN KEY (first_ta_id) REFERENCES transactions(ta_id)
ON DELETE SET NULL ON UPDATE CASCADE,
FOREIGN KEY (last_ta_id) REFERENCES transactions(ta_id)
ON DELETE SET NULL ON UPDATE CASCADE
);
''']

View file

@ -2,7 +2,7 @@
import unittest
import crypt
from datetime import datetime, timedelta
from datetime import datetime, timedelta, UTC
from matemat.db import MatematDatabase
from matemat.db.primitives import User, Product, ReceiptPreference, Receipt, \
@ -220,7 +220,7 @@ class DatabaseTest(unittest.TestCase):
def test_create_product(self) -> None:
with self.db as db:
with db.transaction() as c:
db.create_product('Club Mate', 200, 200, True, True)
db.create_product('Club Mate', 200, 200, True, True, '4029764001807')
c.execute("SELECT * FROM products")
row = c.fetchone()
self.assertEqual('Club Mate', row[1])
@ -229,18 +229,20 @@ class DatabaseTest(unittest.TestCase):
self.assertEqual(200, row[4])
self.assertEqual(200, row[5])
self.assertEqual(1, row[6])
self.assertEqual('4029764001807', row[7])
with self.assertRaises(ValueError):
db.create_product('Club Mate', 250, 250, False, False)
db.create_product('Club Mate', 250, 250, False, False, '4029764001807')
def test_get_product(self) -> None:
with self.db as db:
with db.transaction(exclusive=False):
created = db.create_product('Club Mate', 150, 250, False, False)
created = db.create_product('Club Mate', 150, 250, False, False, '4029764001807')
product = db.get_product(created.id)
self.assertEqual('Club Mate', product.name)
self.assertEqual(150, product.price_member)
self.assertEqual(250, product.price_non_member)
self.assertEqual(False, product.stockable)
self.assertEqual('4029764001807', product.ean)
with self.assertRaises(ValueError):
db.get_product(-1)
@ -249,9 +251,9 @@ class DatabaseTest(unittest.TestCase):
# Test empty list
products = db.list_products()
self.assertEqual(0, len(products))
db.create_product('Club Mate', 200, 200, False, True)
db.create_product('Flora Power Mate', 200, 200, False, False)
db.create_product('Fritz Mate', 200, 250, False, True)
db.create_product('Club Mate', 200, 200, False, True, '4029764001807')
db.create_product('Flora Power Mate', 200, 200, False, False, None)
db.create_product('Fritz Mate', 200, 250, False, True, '4260107223177')
products = db.list_products()
self.assertEqual(3, len(products))
productcheck = {}
@ -260,22 +262,25 @@ class DatabaseTest(unittest.TestCase):
self.assertEqual(200, product.price_member)
self.assertEqual(200, product.price_non_member)
self.assertTrue(product.stockable)
self.assertEqual('4029764001807', product.ean)
elif product.name == 'Flora Power Mate':
self.assertEqual(200, product.price_member)
self.assertEqual(200, product.price_non_member)
self.assertFalse(product.stockable)
self.assertEqual(None, product.ean)
elif product.name == 'Fritz Mate':
self.assertEqual(200, product.price_member)
self.assertEqual(250, product.price_non_member)
self.assertTrue(product.stockable)
self.assertEqual('4260107223177', product.ean)
productcheck[product.id] = 1
self.assertEqual(3, len(productcheck))
def test_change_product(self) -> None:
with self.db as db:
product = db.create_product('Club Mate', 200, 200, False, True)
product = db.create_product('Club Mate', 200, 200, False, True, '4029764001807')
db.change_product(product, name='Flora Power Mate', price_member=150, price_non_member=250,
custom_price=True, stock=None, stockable=False)
custom_price=True, stock=None, stockable=False, ean=None)
# Changes must be reflected in the passed object
self.assertEqual('Flora Power Mate', product.name)
self.assertEqual(150, product.price_member)
@ -283,6 +288,7 @@ class DatabaseTest(unittest.TestCase):
self.assertEqual(True, product.custom_price)
self.assertEqual(None, product.stock)
self.assertEqual(False, product.stockable)
self.assertEqual(None, product.ean)
# Changes must be reflected in the database
checkproduct = db.get_product(product.id)
self.assertEqual('Flora Power Mate', checkproduct.name)
@ -294,7 +300,7 @@ class DatabaseTest(unittest.TestCase):
product.id = -1
with self.assertRaises(DatabaseConsistencyError):
db.change_product(product)
product2 = db.create_product('Club Mate', 200, 200, False, True)
product2 = db.create_product('Club Mate', 200, 200, False, True, '4029764001807')
product2.name = 'Flora Power Mate'
with self.assertRaises(DatabaseConsistencyError):
# Should fail, as a product with the same name already exists.
@ -302,8 +308,8 @@ class DatabaseTest(unittest.TestCase):
def test_delete_product(self) -> None:
with self.db as db:
product = db.create_product('Club Mate', 200, 200, False, True)
product2 = db.create_product('Flora Power Mate', 200, 200, False, False)
product = db.create_product('Club Mate', 200, 200, False, True, '4029764001807')
product2 = db.create_product('Flora Power Mate', 200, 200, False, False, None)
self.assertEqual(2, len(db.list_products()))
db.delete_product(product)
@ -378,9 +384,9 @@ class DatabaseTest(unittest.TestCase):
db.deposit(user1, 1337)
db.deposit(user2, 4242)
db.deposit(user3, 1234)
clubmate = db.create_product('Club Mate', 200, 200, False, True)
florapowermate = db.create_product('Flora Power Mate', 150, 250, False, True)
fritzmate = db.create_product('Fritz Mate', 200, 200, False, True)
clubmate = db.create_product('Club Mate', 200, 200, False, True, '4029764001807')
florapowermate = db.create_product('Flora Power Mate', 150, 250, False, True, None)
fritzmate = db.create_product('Fritz Mate', 200, 200, False, True, '4260107223177')
# user1 is somewhat addicted to caffeine
for _ in range(3):
@ -430,10 +436,10 @@ class DatabaseTest(unittest.TestCase):
user7 = db.create_user('user7', 'supersecurepassword', 'user7@example.com', True, True)
user7.receipt_pref = 42
twoyears: int = int((datetime.utcnow() - timedelta(days=730)).timestamp())
halfyear: int = int((datetime.utcnow() - timedelta(days=183)).timestamp())
twomonths: int = int((datetime.utcnow() - timedelta(days=61)).timestamp())
halfmonth: int = int((datetime.utcnow() - timedelta(days=15)).timestamp())
twoyears: int = int((datetime.now(UTC) - timedelta(days=730)).timestamp())
halfyear: int = int((datetime.now(UTC) - timedelta(days=183)).timestamp())
twomonths: int = int((datetime.now(UTC) - timedelta(days=61)).timestamp())
halfmonth: int = int((datetime.now(UTC) - timedelta(days=15)).timestamp())
with db.transaction() as c:
# Fix creation date for user2
@ -506,7 +512,7 @@ class DatabaseTest(unittest.TestCase):
admin: User = db.create_user('admin', 'supersecurepassword', 'admin@example.com', True, True)
user: User = db.create_user('user', 'supersecurepassword', 'user@example.com', True, True)
product: Product = db.create_product('Flora Power Mate', 200, 200, False, True)
product: Product = db.create_product('Flora Power Mate', 200, 200, False, True, None)
# Create some transactions
db.change_user(user, agent=admin,
@ -533,7 +539,7 @@ class DatabaseTest(unittest.TestCase):
SELECT user_id, 500, balance
FROM users
WHERE user_id = :id
''', [user.id])
''', {'id': user.id})
receipt3: Receipt = db.create_receipt(user, write=False)
with db.transaction() as c:
@ -595,8 +601,8 @@ class DatabaseTest(unittest.TestCase):
user2: User = db.create_user('user2', 'supersecurepassword', 'user2@example.com', True, False)
user3: User = db.create_user('user3', 'supersecurepassword', 'user3@example.com', True, False)
user4: User = db.create_user('user4', 'supersecurepassword', 'user4@example.com', True, False)
flora: Product = db.create_product('Flora Power Mate', 200, 200, False, True)
club: Product = db.create_product('Club Mate', 200, 200, False, False)
flora: Product = db.create_product('Flora Power Mate', 200, 200, False, True, None)
club: Product = db.create_product('Club Mate', 200, 200, False, False, '4029764001807')
# Create some transactions
db.deposit(user1, 1337)
@ -610,7 +616,7 @@ class DatabaseTest(unittest.TestCase):
db.increment_consumption(user4, club)
# Generate statistics
now = datetime.utcnow()
now = datetime.now(UTC)
stats = db.generate_sales_statistics(now - timedelta(days=1), now + timedelta(days=1))
self.assertEqual(7, len(stats))

View file

@ -40,7 +40,7 @@ class DatabaseTransaction(object):
class DatabaseWrapper(object):
SCHEMA_VERSION = 7
SCHEMA_VERSION = 8
def __init__(self, filename: str) -> None:
self._filename: str = filename
@ -91,6 +91,8 @@ class DatabaseWrapper(object):
migrate_schema_5_to_6(c)
if from_version <= 6 and to_version >= 7:
migrate_schema_6_to_7(c)
if from_version <= 7 and to_version >= 8:
migrate_schema_7_to_8(c)
def connect(self) -> None:
if self.is_connected():

View file

@ -28,5 +28,5 @@ def add_months(d: datetime, months: int) -> datetime:
# Set the day of month temporarily to 1, then add the day offset to reach the 1st of the target month
newdate: datetime = d.replace(day=1) + timedelta(days=days)
# Re-set the day of month to the intended value, but capped by the max. day in the target month
newdate = newdate.replace(day=min(d.day, calendar.monthrange(newdate.year, newdate.month)[1]))
newdate = newdate.replace(day=min(d.day, calendar.monthrange(newdate.year, newdate.month)[1]), tzinfo=d.tzinfo)
return newdate

View file

@ -1,5 +1,5 @@
import os
from datetime import datetime
from datetime import datetime, UTC
from io import BytesIO
from shutil import copyfile
@ -48,7 +48,7 @@ def admin():
users = db.list_users()
products = db.list_products()
# Render the "Admin/Settings" page
now = str(int(datetime.utcnow().timestamp()))
now = str(int(datetime.now(UTC).timestamp()))
return template.render('admin.html',
authuser=user, authlevel=authlevel, users=users, products=products,
receipt_preference_class=ReceiptPreference, now=now,
@ -206,8 +206,9 @@ def handle_admin_change(args: FormsDict, files: FormsDict, db: MatematDatabase):
price_non_member = parse_chf(str(args.pricenonmember))
custom_price = 'custom_price' in args
stockable = 'stockable' in args
ean = str(args.ean) or None
# Create the user in the database
newproduct = db.create_product(name, price_member, price_non_member, custom_price, stockable)
newproduct = db.create_product(name, price_member, price_non_member, custom_price, stockable, ean)
# If a new product image was uploaded, process it
image = files.image.file.read() if 'image' in files else None
if image is not None and len(image) > 0:

View file

@ -26,6 +26,7 @@ def buy():
if 'pid' in request.params:
pid = int(str(request.params.pid))
product = db.get_product(pid)
closetab = int(str(request.params.closetab) or 0)
if c.get_dispenser().dispense(product, 1):
price = product.price_member if user.is_member else product.price_non_member
if 'price' in request.params:
@ -37,7 +38,7 @@ def buy():
stock_provider.update_stock(product, -1)
# Logout user if configured, logged in via touchkey and no price entry input was shown
if user.logout_after_purchase and authlevel < 2 and not product.custom_price:
redirect(f'/logout?lastaction=buy&lastproduct={pid}&lastprice={price}')
redirect(f'/logout?lastaction=buy&lastproduct={pid}&lastprice={price}&closetab={closetab}')
# Redirect to the main page (where this request should have come from)
redirect(f'/?lastaction=buy&lastproduct={pid}&lastprice={price}')
redirect(f'/?lastaction=buy&lastproduct={pid}&lastprice={price}&closetab={closetab}')
redirect('/')

View file

@ -1,10 +1,12 @@
from datetime import datetime
from datetime import datetime, UTC
from bottle import route, redirect, request
from matemat.db import MatematDatabase
from matemat.webserver import template, session
from matemat.webserver.template import Notification
from matemat.webserver.config import get_app_config, get_stock_provider
from matemat.util.currency_format import format_chf
@route('/')
@ -14,27 +16,42 @@ def main_page():
"""
config = get_app_config()
session_id: str = session.start()
now = str(int(datetime.utcnow().timestamp()))
now = str(int(datetime.now(UTC).timestamp()))
with MatematDatabase(config['DatabaseFile']) as db:
# Fetch the list of products to display
products = db.list_products()
if request.params.lastproduct:
lastproduct = db.get_product(request.params.lastproduct)
else:
lastproduct = None
closetab = int(str(request.params.closetab) or 0)
lastprice = int(request.params.lastprice) if request.params.lastprice else None
if request.params.lastaction == 'deposit' and lastprice:
Notification.success(f'Deposited {format_chf(lastprice)}', decay=True)
elif request.params.lastaction == 'buy' and lastprice and request.params.lastproduct:
lastproduct = db.get_product(request.params.lastproduct)
Notification.success(f'Purchased {lastproduct.name} for {format_chf(lastprice)}', decay=True)
buyproduct = None
if request.params.ean:
try:
buyproduct = db.get_product_by_ean(request.params.ean)
Notification.success(
f'Login will purchase <strong>{buyproduct.name}</strong>. Click <a href="/">here</a> to abort.')
except ValueError:
Notification.error(f'EAN code {request.params.ean} is not associated with any product.', decay=True)
# Check whether a user is logged in
if session.has(session_id, 'authenticated_user'):
# Fetch the user id and authentication level (touchkey vs password) from the session storage
uid: int = session.get(session_id, 'authenticated_user')
authlevel: int = session.get(session_id, 'authentication_level')
# If an EAN code was scanned, directly trigger the purchase
if buyproduct:
url = f'/buy?pid={buyproduct.id}'
if config.get('CloseTabAfterEANPurchase', '0') == '1':
url += '&closetab=1'
redirect(url)
# Fetch the user object from the database (for name display, price calculation and admin check)
users = db.list_users()
user = db.get_user(uid)
# Prepare a response with a jinja2 template
return template.render('productlist.html',
authuser=user, users=users, products=products, authlevel=authlevel,
lastaction=request.params.lastaction, lastprice=lastprice, lastproduct=lastproduct,
stock=get_stock_provider(), setupname=config['InstanceName'], now=now)
else:
# If there are no admin users registered, jump to the admin creation procedure
@ -45,4 +62,4 @@ def main_page():
return template.render('userlist.html',
users=users, setupname=config['InstanceName'], now=now,
signup=(config.get('SignupEnabled', '0') == '1'),
lastaction=request.params.lastaction, lastprice=lastprice, lastproduct=lastproduct)
buyproduct=buyproduct, closetab=closetab)

View file

@ -1,6 +1,6 @@
import os
from io import BytesIO
from datetime import datetime
from datetime import datetime, UTC
from typing import Dict
import magic
@ -56,7 +56,7 @@ def modproduct():
redirect('/admin')
# Render the "Modify Product" page
now = str(int(datetime.utcnow().timestamp()))
now = str(int(datetime.now(UTC).timestamp()))
return template.render('modproduct.html',
authuser=authuser, product=product, authlevel=authlevel,
setupname=config['InstanceName'], now=now)
@ -98,11 +98,12 @@ def handle_change(args: FormsDict, files: FormsDict, product: Product, db: Matem
custom_price = 'custom_price' in args
stock = int(str(args.stock))
stockable = 'stockable' in args
ean = str(args.ean) or None
# Attempt to write the changes to the database
try:
db.change_product(product,
name=name, price_member=price_member, price_non_member=price_non_member,
custom_price=custom_price, stock=stock, stockable=stockable)
custom_price=custom_price, stock=stock, stockable=stockable, ean=ean)
stock_provider = get_stock_provider()
if stock_provider.needs_update() and product.stockable:
stock_provider.set_stock(product, stock)

View file

@ -1,5 +1,5 @@
import os
from datetime import datetime
from datetime import datetime, UTC
from io import BytesIO
from typing import Dict, Optional
@ -56,7 +56,7 @@ def moduser():
redirect('/admin')
# Render the "Modify User" page
now = str(int(datetime.utcnow().timestamp()))
now = str(int(datetime.now(UTC).timestamp()))
return template.render('moduser.html',
authuser=authuser, user=user, authlevel=authlevel, now=now,
receipt_preference_class=ReceiptPreference,

View file

@ -1,4 +1,4 @@
from datetime import datetime, timedelta
from datetime import datetime, timedelta, UTC
from math import pi, sin, cos
from typing import Any, Dict, List, Tuple
@ -34,7 +34,7 @@ def statistics():
# Show a 403 Forbidden error page if the user is not an admin
abort(403)
todate: datetime = datetime.utcnow()
todate: datetime = datetime.now(UTC)
fromdate: datetime = (todate - timedelta(days=365)).replace(hour=0, minute=0, second=0, microsecond=0)
if 'fromdate' in request.params:
fdarg: str = str(request.params.fromdate)

View file

@ -4,6 +4,7 @@ from matemat.db import MatematDatabase
from matemat.db.primitives import User
from matemat.exceptions import AuthenticationError
from matemat.webserver import template, session
from matemat.webserver.template import Notification
from matemat.webserver.config import get_app_config
@ -16,14 +17,24 @@ def touchkey_page():
"""
config = get_app_config()
session_id: str = session.start()
with MatematDatabase(config['DatabaseFile']) as db:
# If a user is already logged in, simply redirect to the main page, showing the product list
if session.has(session_id, 'authenticated_user'):
redirect('/')
# If requested via HTTP GET, render the login page showing the touchkey UI
if request.method == 'GET':
buypid = None
if request.params.buypid:
buypid = str(request.params.buypid)
try:
buyproduct = db.get_product(int(buypid))
Notification.success(
f'Login will purchase <strong>{buyproduct.name}</strong>. Click <a href="/">here</a> to abort.')
except ValueError:
Notification.error(f'No product with id {buypid}', decay=True)
return template.render('touchkey.html',
username=str(request.params.username), uid=int(str(request.params.uid)),
setupname=config['InstanceName'])
setupname=config['InstanceName'], buypid=buypid)
# If requested via HTTP POST, read the request arguments and attempt to log in with the provided credentials
elif request.method == 'POST':
# Connect to the database
@ -38,6 +49,12 @@ def touchkey_page():
session.put(session_id, 'authenticated_user', user.id)
# Set the authlevel session variable (0 = none, 1 = touchkey, 2 = password login)
session.put(session_id, 'authentication_level', 1)
if request.params.buypid:
buypid = str(request.params.buypid)
url = f'/buy?pid={buypid}'
if config.get('CloseTabAfterEANPurchase', '0') == '1':
url += '&closetab=1'
redirect(url)
# Redirect to the main page, showing the product list
redirect('/')
# If neither GET nor POST was used, show a 405 Method Not Allowed error page

View file

@ -3,7 +3,7 @@ from typing import Any, Dict, Tuple, Optional
from bottle import request, response
from secrets import token_bytes
from uuid import uuid4
from datetime import datetime, timedelta
from datetime import datetime, timedelta, UTC
__key: Optional[str] = token_bytes(32)
@ -21,7 +21,7 @@ def start() -> str:
:return: The session ID.
"""
# Reference date for session timeout
now = datetime.utcnow()
now = datetime.now(UTC)
# Read the client's session ID, if any
session_id = request.get_cookie(_COOKIE_NAME, secret=__key)
# If there is no active session, create a new session ID

View file

@ -1,2 +1,3 @@
from .notification import Notification
from .template import init, render

View file

@ -0,0 +1,25 @@
class Notification:
notifications = []
def __init__(self, msg: str, classes=None, decay: bool = False):
self.msg = msg
self.classes = []
self.classes.extend(classes)
if decay:
self.classes.append('decay')
@classmethod
def render(cls):
n = list(cls.notifications)
cls.notifications.clear()
return n
@classmethod
def success(cls, msg: str, decay: bool = False):
cls.notifications.append(cls(msg, classes=['success'], decay=decay))
@classmethod
def error(cls, msg: str, decay: bool = False):
cls.notifications.append(cls(msg, classes=['error'], decay=decay))

View file

@ -5,6 +5,7 @@ import jinja2
from matemat import __version__
from matemat.util.currency_format import format_chf
from matemat.webserver.template import Notification
__jinja_env: jinja2.Environment = None
@ -22,4 +23,8 @@ def init(config: Dict[str, Any]) -> None:
def render(name: str, **kwargs):
global __jinja_env
template: jinja2.Template = __jinja_env.get_template(name)
return template.render(__version__=__version__, **kwargs).encode('utf-8')
return template.render(
__version__=__version__,
notifications=Notification.render(),
**kwargs
).encode('utf-8')

View file

@ -37,6 +37,11 @@ InstanceName=Matemat
#SignupEnabled=1
#SignupKioskMode= ::1, ::ffff:127.0.0.0/8, 127.0.0.0/8
#
# Close tabs after completing an EAN code scan based purchase.
# This only works in Firefox with dom.allow_scripts_to_close_windows=true
#
#CloseTabAfterEANPurchase=1
# Add static HTTP headers in this section
# [HttpHeaders]

View file

@ -52,6 +52,25 @@ nav div {
padding: 10px;
}
.notification {
display: block;
width: calc(100% - 36px);
margin: 10px;
padding: 10px;
}
.notification.success {
background-color: #c0ffc0;
}
.notification.error {
background-color: #ffc0c0;
}
.notification.decay {
animation: notificationdecay 0s 7s forwards;
}
@keyframes notificationdecay {
to { display: none; }
}
@media print {
footer {
position: fixed;
@ -298,37 +317,3 @@ div.osk-button.osk-button-space {
flex: 5 0 1px;
color: #606060;
}
aside#overlay {
position: absolute;
top: 0;
right: 0;
bottom: 0;
left: 0;
background: #88ff88;
text-align: center;
z-index: 1000;
padding: 5%;
font-family: sans-serif;
display: none;
transition: opacity 700ms;
opacity: 0;
}
aside#overlay.fade {
opacity: 100%;
}
aside#overlay > h2 {
font-size: 3em;
}
aside#overlay > img {
width: 30%;
height: auto;
}
aside#overlay > div.price {
padding-top: 30px;
font-size: 2em;
}

View file

@ -1,18 +0,0 @@
setTimeout(() => {
let overlay = document.getElementById('overlay');
if (overlay !== null) {
overlay.style.display = 'block';
setTimeout(() => {
overlay.classList.add('fade');
setTimeout(() => {
setTimeout(() => {
overlay.classList.remove('fade');
setTimeout(() => {
overlay.style.display = 'none';
}, 700);
}, 700);
}, 700);
}, 10);
}
}, 0);

View file

@ -46,6 +46,9 @@
<label for="admin-newproduct-name">Name: </label>
<input id="admin-newproduct-name" type="text" name="name" /><br/>
<label for="admin-newproduct-ean">EAN code: </label>
<input id="admin-newproduct-ean" type="text" name="ean" /><br/>
<label for="admin-newproduct-price-member">Member price: </label>
CHF <input id="admin-newproduct-price-member" type="number" step="0.01" name="pricemember" value="0" /><br/>

View file

@ -12,27 +12,6 @@
<body>
{% block overlay %}
{% if lastaction is defined and lastaction is not none %}
{% if lastaction == 'buy' %}
<aside id="overlay">
<h2>{{ lastproduct.name }}</h2>
<img src="/static/upload/thumbnails/products/{{ lastproduct.id }}.png?cacheBuster={{ now }}" alt="Picture of {{ lastproduct.name }}" draggable="false"/>
{% if lastprice is not none %}
<div class="price">{{ lastprice|chf }}</div>
{% endif %}
</aside>
{% elif lastaction == 'deposit' %}
<aside id="overlay">
<h2>Deposit</h2>
{% if lastprice is not none %}
<div class="price">{{ lastprice|chf }}</div>
{% endif %}
</aside>
{% endif %}
{% endif %}
{% endblock %}
<header>
{% block header %}
@ -55,6 +34,11 @@
</header>
<main>
{% block notifications %}
{% for n in notifications | default([]) %}
<div class="notification {{ n.classes | join(' ') }}">{{ n.msg|safe }}</div>
{% endfor %}
{% endblock %}
{% block main %}
{# Here be content. #}
{% endblock %}
@ -72,6 +56,5 @@
{% endblock %}
</footer>
<script src="/static/js/overlay.js"></script>
</body>
</html>

View file

@ -14,6 +14,9 @@
<label for="modproduct-name">Name: </label>
<input id="modproduct-name" type="text" name="name" value="{{ product.name }}" /><br/>
<label for="modproduct-ean">EAN code: </label>
<input id="modproduct-ean" type="text" name="ean" value="{{ product.ean or '' }}" /><br/>
<label for="modproduct-price-member">Member price: </label>
CHF <input id="modproduct-price-member" type="number" step="0.01" name="pricemember" value="{{ product.price_member|chf(False) }}" /><br/>

View file

@ -81,4 +81,9 @@
{{ super() }}
{% if closetab | default(0) %}
{# This only works in Firefox with dom.allow_scripts_to_close_windows=true #}
<script>setTimeout(window.close, 3000);</script>
{% endif %}
{% endblock %}

View file

@ -23,6 +23,9 @@
<input type="hidden" name="uid" value="{{ uid }}" />
<input type="hidden" name="username" value="{{ username }}" />
<input type="hidden" name="touchkey" value="" id="loginform-touchkey-value" />
{% if buypid %}
<input type="hidden" name="buypid" value="{{ buypid }}" />
{% endif %}
</form>
<a href="/">Cancel</a>

View file

@ -11,7 +11,7 @@
{% for user in users %}
{# Show an item per user, consisting of the username, and the avatar, linking to the touchkey login #}
<div class="thumblist-item">
<a href="/touchkey?uid={{ user.id }}&username={{ user.name }}">
<a href="/touchkey?uid={{ user.id }}&username={{ user.name }}{% if buyproduct %}&buypid={{ buyproduct.id }}{% endif %}">
<span class="thumblist-title">{{ user.name }}</span><br/>
<div class="imgcontainer">
<img src="/static/upload/thumbnails/users/{{ user.id }}.png?cacheBuster={{ now }}" alt="Avatar of {{ user.name }}" draggable="false"/>
@ -33,4 +33,9 @@
{{ super() }}
{% if closetab | default(0) %}
{# This only works in Firefox with dom.allow_scripts_to_close_windows=true #}
<script>setTimeout(window.close, 3000);</script>
{% endif %}
{% endblock %}