1
0
Fork 0
forked from s3lph/matemat

Merge branch 'staging' into python-3.7

This commit is contained in:
s3lph 2018-08-02 17:34:39 +02:00
commit 207f4f1531
52 changed files with 2457 additions and 244 deletions

2
.gitignore vendored
View file

@ -9,5 +9,5 @@
*.sqlite3 *.sqlite3
*.db *.db
**/matemat.conf
static/upload/ static/upload/
**/matemat.conf

View file

@ -1,20 +1,44 @@
--- ---
image: s3lph/matemat-ci:20180619-01 image: s3lph/matemat-ci:20180720-01
stages: stages:
- test - test
- codestyle - build
- staging
test: test:
stage: test stage: test
script: script:
- pip3 install -r requirements.txt - pip3 install -r requirements.txt
- python3-coverage run --branch -m unittest discover matemat - sudo -u matemat python3-coverage run --branch -m unittest discover matemat
- python3-coverage report -m --include 'matemat/*' --omit '*/test_*.py' - sudo -u matemat python3-coverage report -m --include 'matemat/*' --omit '*/test_*.py'
codestyle: codestyle:
stage: codestyle stage: test
script: script:
- pip3 install -r requirements.txt - pip3 install -r requirements.txt
- pycodestyle matemat - sudo -u matemat pycodestyle matemat
# - mypy --ignore-missing-imports --strict -p matemat # - sudo -u matemat mypy --ignore-missing-imports --strict -p matemat
build:
stage: build
script:
- docker build -t "registry.gitlab.com/s3lph/matemat:$(git rev-parse HEAD)" .
- docker tag "registry.gitlab.com/s3lph/matemat:$(git rev-parse HEAD)" registry.gitlab.com/s3lph/matemat:latest-staging
- docker login -u $CI_REGISTRY_USER -p $CI_REGISTRY_TOKEN registry.gitlab.com
- docker push "registry.gitlab.com/s3lph/matemat:$(git rev-parse HEAD)"
- docker push registry.gitlab.com/s3lph/matemat:latest-staging
only:
- staging
staging:
stage: staging
script:
- eval $(ssh-agent -s)
- ssh-add - <<<"$STAGING_SSH_PRIVATE_KEY"
- echo "$(git rev-parse HEAD)" | ssh -p 20022 -oStrictHostKeyChecking=no matemat@kernelpanic.lol
environment:
name: staging
url: https://matemat.kernelpanic.lol/
only:
- staging

View file

@ -1,10 +1,10 @@
FROM debian:buster FROM python:3.6-alpine
RUN useradd -d /home/matemat -m matemat RUN mkdir -p /var/matemat/db /var/matemat/upload
RUN apt-get update -qy RUN apk --update add libmagic
RUN apt-get install -y --no-install-recommends python3-dev python3-pip python3-coverage python3-setuptools build-essential ADD . /
RUN pip3 install wheel pycodestyle mypy RUN pip3 install -r /requirements.txt
WORKDIR /home/matemat EXPOSE 80/tcp
USER matemat CMD [ "/run.sh" ]

View file

@ -18,6 +18,7 @@ This project intends to provide a well-tested and maintainable alternative to
- Python 3 (>=3.7) - Python 3 (>=3.7)
- Python dependencies: - Python dependencies:
- file-magic
- jinja2 - jinja2
## Usage ## Usage

2
doc

@ -1 +1 @@
Subproject commit c68df9d86af1d8d0ebb6b6609efeef14f7103761 Subproject commit ece840af5b19d2c78b2dbfa14adac145fab79f4f

13
matemat.docker.conf Normal file
View file

@ -0,0 +1,13 @@
[Matemat]
Address=::
Port=80
StaticPath=/static
TemplatePath=/templates
LogTarget=stdout
[Pagelets]
UploadDir=/static/upload
DatabaseFile=/var/matemat/db/test.db

View file

@ -5,7 +5,7 @@ from typing import List, Optional, Any, Type
import crypt import crypt
from hmac import compare_digest from hmac import compare_digest
from matemat.primitives import User, Product from matemat.db.primitives import User, Product
from matemat.exceptions import AuthenticationError, DatabaseConsistencyError from matemat.exceptions import AuthenticationError, DatabaseConsistencyError
from matemat.db import DatabaseWrapper from matemat.db import DatabaseWrapper
from matemat.db.wrapper import Transaction from matemat.db.wrapper import Transaction
@ -70,14 +70,30 @@ class MatematDatabase(object):
users: List[User] = [] users: List[User] = []
with self.db.transaction(exclusive=False) as c: with self.db.transaction(exclusive=False) as c:
for row in c.execute(''' for row in c.execute('''
SELECT user_id, username, email, is_admin, is_member SELECT user_id, username, email, is_admin, is_member, balance
FROM users FROM users
'''): '''):
# Decompose each row and put the values into a User object # Decompose each row and put the values into a User object
user_id, username, email, is_admin, is_member = row user_id, username, email, is_admin, is_member, balance = row
users.append(User(user_id, username, email, is_admin, is_member)) users.append(User(user_id, username, balance, email, is_admin, is_member))
return users return users
def get_user(self, uid: int) -> User:
"""
Return a user identified by its user ID.
:param uid: The user's ID.
"""
with self.db.transaction(exclusive=False) as c:
# Fetch all values to construct the user
c.execute('SELECT user_id, username, email, is_admin, is_member, balance FROM users WHERE user_id = ?',
[uid])
row = c.fetchone()
if row is None:
raise ValueError(f'No user with user ID {uid} exists.')
# Unpack the row and construct the user
user_id, username, email, is_admin, is_member, balance = row
return User(user_id, username, balance, email, is_admin, is_member)
def create_user(self, def create_user(self,
username: str, username: str,
password: str, password: str,
@ -116,7 +132,7 @@ class MatematDatabase(object):
# Fetch the new user's rowid. # Fetch the new user's rowid.
c.execute('SELECT last_insert_rowid()') c.execute('SELECT last_insert_rowid()')
user_id = int(c.fetchone()[0]) user_id = int(c.fetchone()[0])
return User(user_id, username, email, admin, member) return User(user_id, username, 0, email, admin, member)
def login(self, username: str, password: Optional[str] = None, touchkey: Optional[str] = None) -> User: def login(self, username: str, password: Optional[str] = None, touchkey: Optional[str] = None) -> User:
""" """
@ -133,14 +149,14 @@ class MatematDatabase(object):
raise ValueError('Exactly one of password and touchkey must be provided') raise ValueError('Exactly one of password and touchkey must be provided')
with self.db.transaction(exclusive=False) as c: with self.db.transaction(exclusive=False) as c:
c.execute(''' c.execute('''
SELECT user_id, username, email, password, touchkey, is_admin, is_member SELECT user_id, username, email, password, touchkey, is_admin, is_member, balance
FROM users FROM users
WHERE username = ? WHERE username = ?
''', [username]) ''', [username])
row = c.fetchone() row = c.fetchone()
if row is None: if row is None:
raise AuthenticationError('User does not exist') raise AuthenticationError('User does not exist')
user_id, username, email, pwhash, tkhash, admin, member = row user_id, username, email, pwhash, tkhash, admin, member, balance = row
if password is not None and not compare_digest(crypt.crypt(password, pwhash), pwhash): if password is not None and not compare_digest(crypt.crypt(password, pwhash), pwhash):
raise AuthenticationError('Password mismatch') raise AuthenticationError('Password mismatch')
elif touchkey is not None \ elif touchkey is not None \
@ -149,7 +165,7 @@ class MatematDatabase(object):
raise AuthenticationError('Touchkey mismatch') raise AuthenticationError('Touchkey mismatch')
elif touchkey is not None and tkhash is None: elif touchkey is not None and tkhash is None:
raise AuthenticationError('Touchkey not set') raise AuthenticationError('Touchkey not set')
return User(user_id, username, email, admin, member) return User(user_id, username, balance, email, admin, member)
def change_password(self, user: User, oldpass: str, newpass: str, verify_password: bool = True) -> None: def change_password(self, user: User, oldpass: str, newpass: str, verify_password: bool = True) -> None:
""" """
@ -211,30 +227,73 @@ class MatematDatabase(object):
'tkhash': tkhash 'tkhash': tkhash
}) })
def change_user(self, user: User) -> None: def change_user(self, user: User, agent: Optional[User], **kwargs)\
-> None:
""" """
Write changes in the User object to the database. Commit changes to the user in the database. If writing the requested changes succeeded, the values are updated
:param user: The user object to update in the database. in the provided user object. Otherwise the user object is left untouched. The user to update is identified by
the ID field in the provided user object.
:param user: The user object to update and to identify the requested user by.
:param agent: The user that is performing the change. Must be present if the balance is changed.
:param kwargs: The properties to change.
:raises DatabaseConsistencyError: If the user represented by the object does not exist. :raises DatabaseConsistencyError: If the user represented by the object does not exist.
""" """
# Resolve the values to change
name: str = kwargs['name'] if 'name' in kwargs else user.name
email: str = kwargs['email'] if 'email' in kwargs else user.email
balance: int = kwargs['balance'] if 'balance' in kwargs else user.balance
is_admin: bool = kwargs['is_admin'] if 'is_admin' in kwargs else user.is_admin
is_member: bool = kwargs['is_member'] if 'is_member' in kwargs else user.is_member
with self.db.transaction() as c: with self.db.transaction() as c:
c.execute('SELECT balance FROM users WHERE user_id = :user_id', {'user_id': user.id})
row = c.fetchone()
if row is None:
raise DatabaseConsistencyError(f'User with ID {user.id} does not exist')
oldbalance: int = row[0]
if balance != oldbalance:
if agent is None:
raise ValueError('agent must not be None for a balance change')
c.execute('''
INSERT INTO transactions (user_id, value, old_balance)
VALUES (:user_id, :value, :old_balance)
''', {
'user_id': user.id,
'value': balance - oldbalance,
'old_balance': oldbalance
})
# TODO: Implement reason field
c.execute('''
INSERT INTO modifications (ta_id, agent_id, reason)
VALUES (last_insert_rowid(), :agent_id, NULL)
''', {'agent_id': agent.id})
c.execute(''' c.execute('''
UPDATE users SET UPDATE users SET
username = :username,
email = :email, email = :email,
balance = :balance,
is_admin = :is_admin, is_admin = :is_admin,
is_member = :is_member, is_member = :is_member,
lastchange = STRFTIME('%s', 'now') lastchange = STRFTIME('%s', 'now')
WHERE user_id = :user_id WHERE user_id = :user_id
''', { ''', {
'user_id': user.id, 'user_id': user.id,
'email': user.email, 'username': name,
'is_admin': user.is_admin, 'email': email,
'is_member': user.is_member 'balance': balance,
'is_admin': is_admin,
'is_member': is_member
}) })
affected = c.execute('SELECT changes()').fetchone()[0] affected = c.execute('SELECT changes()').fetchone()[0]
if affected != 1: if affected != 1:
raise DatabaseConsistencyError( raise DatabaseConsistencyError(
f'change_user should affect 1 users row, but affected {affected}') f'change_user should affect 1 users row, but affected {affected}')
# Only update the actual user object after the changes in the database succeeded
user.name = name
user.email = email
user.balance = balance
user.is_admin = is_admin
user.is_member = is_member
def delete_user(self, user: User) -> None: def delete_user(self, user: User) -> None:
""" """
@ -260,13 +319,32 @@ class MatematDatabase(object):
products: List[Product] = [] products: List[Product] = []
with self.db.transaction(exclusive=False) as c: with self.db.transaction(exclusive=False) as c:
for row in c.execute(''' for row in c.execute('''
SELECT product_id, name, price_member, price_non_member SELECT product_id, name, price_member, price_non_member, stock
FROM products FROM products
'''): '''):
product_id, name, price_member, price_external = row product_id, name, price_member, price_external, stock = row
products.append(Product(product_id, name, price_member, price_external)) products.append(Product(product_id, name, price_member, price_external, stock))
return products return products
def get_product(self, pid: int) -> Product:
"""
Return a product identified by its product ID.
:param pid: The products's ID.
"""
with self.db.transaction(exclusive=False) as c:
# Fetch all values to construct the product
c.execute('''
SELECT product_id, name, price_member, price_non_member, stock
FROM products
WHERE product_id = ?''',
[pid])
row = c.fetchone()
if row is None:
raise ValueError(f'No product with product ID {pid} exists.')
# Unpack the row and construct the product
product_id, name, price_member, price_non_member, stock = row
return Product(product_id, name, price_member, price_non_member, stock)
def create_product(self, name: str, price_member: int, price_non_member: int) -> Product: def create_product(self, name: str, price_member: int, price_non_member: int) -> Product:
""" """
Creates a new product. Creates a new product.
@ -291,27 +369,48 @@ class MatematDatabase(object):
}) })
c.execute('SELECT last_insert_rowid()') c.execute('SELECT last_insert_rowid()')
product_id = int(c.fetchone()[0]) product_id = int(c.fetchone()[0])
return Product(product_id, name, price_member, price_non_member) return Product(product_id, name, price_member, price_non_member, 0)
def change_product(self, product: Product) -> None: def change_product(self, product: Product, **kwargs) -> None:
"""
Commit changes to the product in the database. If writing the requested changes succeeded, the values are
updated in the provided product object. Otherwise the product object is left untouched. The product to update
is identified by the ID field in the provided product object.
:param product: The product object to update and to identify the requested product by.
:param kwargs: The properties to change.
:raises DatabaseConsistencyError: If the product represented by the object does not exist.
"""
# Resolve the values to change
name: str = kwargs['name'] if 'name' in kwargs else product.name
price_member: int = kwargs['price_member'] if 'price_member' in kwargs else product.price_member
price_non_member: int = kwargs['price_non_member'] if 'price_non_member' in kwargs else product.price_non_member
stock: int = kwargs['stock'] if 'stock' in kwargs else product.stock
with self.db.transaction() as c: with self.db.transaction() as c:
c.execute(''' c.execute('''
UPDATE products UPDATE products
SET SET
name = :name, name = :name,
price_member = :price_member, price_member = :price_member,
price_non_member = :price_non_member price_non_member = :price_non_member,
stock = :stock
WHERE product_id = :product_id WHERE product_id = :product_id
''', { ''', {
'product_id': product.id, 'product_id': product.id,
'name': product.name, 'name': name,
'price_member': product.price_member, 'price_member': price_member,
'price_non_member': product.price_non_member 'price_non_member': price_non_member,
'stock': stock
}) })
affected = c.execute('SELECT changes()').fetchone()[0] affected = c.execute('SELECT changes()').fetchone()[0]
if affected != 1: if affected != 1:
raise DatabaseConsistencyError( raise DatabaseConsistencyError(
f'change_product should affect 1 products row, but affected {affected}') f'change_product should affect 1 products row, but affected {affected}')
# Only update the actual product object after the changes in the database succeeded
product.name = name
product.price_member = price_member
product.price_non_member = price_non_member
product.stock = stock
def delete_product(self, product: Product) -> None: def delete_product(self, product: Product) -> None:
""" """
@ -329,60 +428,38 @@ class MatematDatabase(object):
raise DatabaseConsistencyError( raise DatabaseConsistencyError(
f'delete_product should affect 1 products row, but affected {affected}') f'delete_product should affect 1 products row, but affected {affected}')
def increment_consumption(self, user: User, product: Product, count: int = 1) -> None: def increment_consumption(self, user: User, product: Product) -> None:
""" """
Decrement the user's balance by the price of the product, decrement the products stock, and create an entry in Decrement the user's balance by the price of the product, decrement the products stock, and create an entry in
the statistics table. the statistics table.
:param user: The user buying a product. :param user: The user buying a product.
:param product: The product the user is buying. :param product: The product the user is buying.
:param count: How many units of the product the user is buying, defaults to 1.
:raises DatabaseConsistencyError: If the user or the product does not exist in the database. :raises DatabaseConsistencyError: If the user or the product does not exist in the database.
""" """
price: int = product.price_member if user.is_member else product.price_non_member
with self.db.transaction() as c: with self.db.transaction() as c:
# Retrieve the consumption entry for the (user, product) pair, if any.
c.execute(''' c.execute('''
SELECT count INSERT INTO transactions (user_id, value, old_balance)
FROM consumption VALUES (:user_id, :value, :old_balance)
WHERE user_id = :user_id
AND product_id = :product_id
''', { ''', {
'user_id': user.id, 'user_id': user.id,
'value': -price,
'old_balance': user.balance
})
c.execute('''
INSERT INTO consumptions (ta_id, product_id)
VALUES (last_insert_rowid(), :product_id)
''', {
'product_id': product.id 'product_id': product.id
}) })
row = c.fetchone() # Subtract the price from the user's account balance.
if row is None:
# If the entry does not exist, create a new one.
c.execute('''
INSERT INTO consumption (user_id, product_id, count)
VALUES (:user_id, :product_id, :count)
''', {
'user_id': user.id,
'product_id': product.id,
'count': count
})
else:
# If the entry exists, update the consumption count.
c.execute('''
UPDATE consumption
SET count = count + :count
WHERE user_id = :user_id AND product_id = :product_id
''', {
'user_id': user.id,
'product_id': product.id,
'count': count
})
# Make sure exactly one consumption row was updated/inserted.
affected = c.execute('SELECT changes()').fetchone()[0]
if affected != 1:
raise DatabaseConsistencyError(
f'increment_consumption should affect 1 consumption row, but affected {affected}')
# Compute the cost of the transaction and subtract it from the user's account balance.
c.execute(''' c.execute('''
UPDATE users UPDATE users
SET balance = balance - :cost SET balance = balance - :cost
WHERE user_id = :user_id''', { WHERE user_id = :user_id''', {
'user_id': user.id, 'user_id': user.id,
'cost': count * product.price_member if user.is_member else count * product.price_non_member 'cost': price
}) })
# Make sure exactly one user row was updated. # Make sure exactly one user row was updated.
affected = c.execute('SELECT changes()').fetchone()[0] affected = c.execute('SELECT changes()').fetchone()[0]
@ -392,11 +469,10 @@ class MatematDatabase(object):
# Subtract the number of purchased units from the product's stock. # Subtract the number of purchased units from the product's stock.
c.execute(''' c.execute('''
UPDATE products UPDATE products
SET stock = stock - :count SET stock = stock - 1
WHERE product_id = :product_id WHERE product_id = :product_id
''', { ''', {
'product_id': product.id, 'product_id': product.id,
'count': count
}) })
# Make sure exactly one product row was updated. # Make sure exactly one product row was updated.
affected = c.execute('SELECT changes()').fetchone()[0] affected = c.execute('SELECT changes()').fetchone()[0]
@ -427,6 +503,7 @@ class MatematDatabase(object):
def deposit(self, user: User, amount: int) -> None: def deposit(self, user: User, amount: int) -> None:
""" """
Update the account balance of a user. Update the account balance of a user.
:param user: The user to update the account balance for. :param user: The user to update the account balance for.
:param amount: The amount to add to the account balance. :param amount: The amount to add to the account balance.
:raises DatabaseConsistencyError: If the user represented by the object does not exist. :raises DatabaseConsistencyError: If the user represented by the object does not exist.
@ -434,6 +511,18 @@ class MatematDatabase(object):
if amount < 0: if amount < 0:
raise ValueError('Cannot deposit a negative value') raise ValueError('Cannot deposit a negative value')
with self.db.transaction() as c: with self.db.transaction() as c:
c.execute('''
INSERT INTO transactions (user_id, value, old_balance)
VALUES (:user_id, :value, :old_balance)
''', {
'user_id': user.id,
'value': amount,
'old_balance': user.balance
})
c.execute('''
INSERT INTO deposits (ta_id)
VALUES (last_insert_rowid())
''')
c.execute(''' c.execute('''
UPDATE users UPDATE users
SET balance = balance + :amount SET balance = balance + :amount

115
matemat/db/migrations.py Normal file
View file

@ -0,0 +1,115 @@
from typing import Dict
import sqlite3
def migrate_schema_1_to_2(c: sqlite3.Cursor):
# Create missing tables
c.execute('''
CREATE TABLE transactions (
ta_id INTEGER PRIMARY KEY,
user_id INTEGER NOT NULL,
value INTEGER(8) NOT NULL,
old_balance INTEGER(8) NOT NULL,
date INTEGER(8) DEFAULT (STRFTIME('%s', 'now')),
FOREIGN KEY (user_id) REFERENCES users(user_id)
ON DELETE CASCADE ON UPDATE CASCADE
);
''')
c.execute('''
CREATE TABLE consumptions (
ta_id INTEGER PRIMARY KEY,
product_id INTEGER DEFAULT NULL,
FOREIGN KEY (ta_id) REFERENCES transactions(ta_id)
ON DELETE CASCADE ON UPDATE CASCADE,
FOREIGN KEY (product_id) REFERENCES products(product_id)
ON DELETE SET NULL ON UPDATE CASCADE
);
''')
c.execute('''
CREATE TABLE deposits (
ta_id INTEGER PRIMARY KEY,
FOREIGN KEY (ta_id) REFERENCES transactions(ta_id)
ON DELETE CASCADE ON UPDATE CASCADE
);
''')
c.execute('''
CREATE TABLE modifications (
ta_id INTEGER NOT NULL,
agent_id INTEGER NOT NULL,
reason TEXT DEFAULT NULL,
PRIMARY KEY (ta_id),
FOREIGN KEY (ta_id) REFERENCES transactions(ta_id)
ON DELETE CASCADE ON UPDATE CASCADE,
FOREIGN KEY (agent_id) REFERENCES users(user_id)
ON DELETE SET NULL ON UPDATE CASCADE
);
''')
#
# Convert entries from the old consumption table into entries for the new consumptions table
#
# Fetch current users, their balance and membership status
c.execute('SELECT user_id, balance, is_member FROM users')
balances: Dict[int, int] = dict()
memberships: Dict[int, bool] = dict()
for user_id, balance, member in c:
balances[user_id] = balance
memberships[user_id] = bool(member)
# Fetch current products and their prices
c.execute('SELECT product_id, price_member, price_non_member FROM products')
prices_member: Dict[int, int] = dict()
prices_non_member: Dict[int, int] = dict()
for product_id, price_member, price_non_member in c:
prices_member[product_id] = price_member
prices_non_member[product_id] = price_non_member
# As the following migration does reverse insertions, compute the max. primary key that can occur, and
# further down count downward from there
c.execute('SELECT SUM(count) FROM consumption')
ta_id: int = c.fetchone()[0]
# Iterate (users x products)
for user_id in balances.keys():
member: bool = memberships[user_id]
for product_id in prices_member:
price: int = prices_member[product_id] if member else prices_non_member[product_id]
# Select the number of items the user has bought from this product
c.execute('''
SELECT consumption.count FROM consumption
WHERE user_id = :user_id AND product_id = :product_id
''', {
'user_id': user_id,
'product_id': product_id
})
row = c.fetchone()
if row is not None:
count: int = row[0]
# Insert one row per bought item, setting the date to NULL, as it is not known
for _ in range(count):
# This migration "goes back in time", so after processing a purchase entry, "locally
# refund" the payment to reconstruct the "older" entries
balances[user_id] += price
# Insert into base table
c.execute('''
INSERT INTO transactions (ta_id, user_id, value, old_balance, date)
VALUES (:ta_id, :user_id, :value, :old_balance, NULL)
''', {
'ta_id': ta_id,
'user_id': user_id,
'value': -price,
'old_balance': balances[user_id]
})
# Insert into specialization table
c.execute('INSERT INTO consumptions (ta_id, product_id) VALUES (:ta_id, :product_id)', {
'ta_id': ta_id,
'product_id': product_id
})
# Decrement the transaction table insertion primary key
ta_id -= 1
# Drop the old consumption table
c.execute('DROP TABLE consumption')

View file

@ -0,0 +1,22 @@
from dataclasses import dataclass
@dataclass
class Product:
"""
Representation of a product offered by the Matemat, with a name, prices for users, and the number of items
currently in stock.
:param id: The product ID in the database.
:param name: The product's name.
:param price_member: The price of a unit of this product for users marked as "members".
:param price_non_member: The price of a unit of this product for users NOT marked as "members".
:param stock: The number of items of this product currently in stock.
"""
id: int
name: str
price_member: int
price_non_member: int
stock: int

View file

@ -0,0 +1,27 @@
from typing import Optional
from dataclasses import dataclass
@dataclass
class User:
"""
Representation of a user registered with the Matemat, with a name, e-mail address (optional), whether the user is a
member of the organization the Matemat instance is used in, whether the user is an administrator, and the user's
account balance.
:param id: The user ID in the database.
:param username: The user's name.
:param balance: The balance of the user's account.
:param email: The user's e-mail address (optional).
:param admin: Whether the user is an administrator.
:param member: Whether the user is a member.
"""
id: int
name: str
balance: int
email: Optional[str] = None
is_admin: bool = False
is_member: bool = False

103
matemat/db/schemas.py Normal file
View file

@ -0,0 +1,103 @@
from typing import Dict, List
SCHEMAS: Dict[int, List[str]] = dict()
SCHEMAS[1] = [
'''
CREATE TABLE users (
user_id INTEGER PRIMARY KEY,
username TEXT UNIQUE NOT NULL,
email TEXT DEFAULT NULL,
password TEXT NOT NULL,
touchkey TEXT DEFAULT NULL,
is_admin INTEGER(1) NOT NULL DEFAULT 0,
is_member INTEGER(1) NOT NULL DEFAULT 1,
balance INTEGER(8) NOT NULL DEFAULT 0,
lastchange INTEGER(8) NOT NULL DEFAULT 0
);
''',
'''
CREATE TABLE products (
product_id INTEGER PRIMARY KEY,
name TEXT UNIQUE NOT NULL,
stock INTEGER(8) NOT NULL DEFAULT 0,
price_member INTEGER(8) NOT NULL,
price_non_member INTEGER(8) NOT NULL
);
''',
'''
CREATE TABLE consumption (
user_id INTEGER NOT NULL,
product_id INTEGER NOT NULL,
count INTEGER(8) NOT NULL DEFAULT 0,
PRIMARY KEY (user_id, product_id),
FOREIGN KEY (user_id) REFERENCES users(user_id)
ON DELETE CASCADE ON UPDATE CASCADE,
FOREIGN KEY (product_id) REFERENCES products(product_id)
ON DELETE CASCADE ON UPDATE CASCADE
);
''']
SCHEMAS[2] = [
'''
CREATE TABLE users (
user_id INTEGER PRIMARY KEY,
username TEXT UNIQUE NOT NULL,
email TEXT DEFAULT NULL,
password TEXT NOT NULL,
touchkey TEXT DEFAULT NULL,
is_admin INTEGER(1) NOT NULL DEFAULT 0,
is_member INTEGER(1) NOT NULL DEFAULT 1,
balance INTEGER(8) NOT NULL DEFAULT 0,
lastchange INTEGER(8) NOT NULL DEFAULT 0
);
''',
'''
CREATE TABLE products (
product_id INTEGER PRIMARY KEY,
name TEXT UNIQUE NOT NULL,
stock INTEGER(8) NOT NULL DEFAULT 0,
price_member INTEGER(8) NOT NULL,
price_non_member INTEGER(8) NOT NULL
);
''',
'''
CREATE TABLE transactions ( -- "superclass" of the following 3 tables
ta_id INTEGER PRIMARY KEY,
user_id INTEGER NOT NULL,
value INTEGER(8) NOT NULL,
old_balance INTEGER(8) NOT NULL,
date INTEGER(8) DEFAULT (STRFTIME('%s', 'now')),
FOREIGN KEY (user_id) REFERENCES users(user_id)
ON DELETE CASCADE ON UPDATE CASCADE
);
''',
'''
CREATE TABLE consumptions ( -- transactions involving buying a product
ta_id INTEGER PRIMARY KEY,
product_id INTEGER DEFAULT NULL,
FOREIGN KEY (ta_id) REFERENCES transactions(ta_id)
ON DELETE CASCADE ON UPDATE CASCADE,
FOREIGN KEY (product_id) REFERENCES products(product_id)
ON DELETE SET NULL ON UPDATE CASCADE
);
''',
'''
CREATE TABLE deposits ( -- transactions involving depositing cash
ta_id INTEGER PRIMARY KEY,
FOREIGN KEY (ta_id) REFERENCES transactions(ta_id)
ON DELETE CASCADE ON UPDATE CASCADE
);
''',
'''
CREATE TABLE modifications ( -- transactions involving balance modification by an admin
ta_id INTEGER NOT NULL,
agent_id INTEGER NOT NULL,
reason TEXT DEFAULT NULL,
PRIMARY KEY (ta_id),
FOREIGN KEY (ta_id) REFERENCES transactions(ta_id)
ON DELETE CASCADE ON UPDATE CASCADE,
FOREIGN KEY (agent_id) REFERENCES users(user_id)
ON DELETE CASCADE ON UPDATE CASCADE
);
''']

View file

@ -26,6 +26,19 @@ class DatabaseTest(unittest.TestCase):
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
db.create_user('testuser', 'supersecurepassword2', 'testuser2@example.com') db.create_user('testuser', 'supersecurepassword2', 'testuser2@example.com')
def test_get_user(self) -> None:
with self.db as db:
with db.transaction(exclusive=False):
created = db.create_user('testuser', 'supersecurepassword', 'testuser@example.com',
admin=True, member=False)
user = db.get_user(created.id)
self.assertEqual('testuser', user.name)
self.assertEqual('testuser@example.com', user.email)
self.assertEqual(False, user.is_member)
self.assertEqual(True, user.is_admin)
with self.assertRaises(ValueError):
db.get_user(-1)
def test_list_users(self) -> None: def test_list_users(self) -> None:
with self.db as db: with self.db as db:
users = db.list_users() users = db.list_users()
@ -130,18 +143,23 @@ class DatabaseTest(unittest.TestCase):
def test_change_user(self) -> None: def test_change_user(self) -> None:
with self.db as db: with self.db as db:
agent = db.create_user('admin', 'supersecurepassword', 'admin@example.com', True, True)
user = db.create_user('testuser', 'supersecurepassword', 'testuser@example.com', True, True) user = db.create_user('testuser', 'supersecurepassword', 'testuser@example.com', True, True)
user.email = 'newaddress@example.com' db.change_user(user, agent, email='newaddress@example.com', is_admin=False, is_member=False, balance=4200)
user.is_admin = False # Changes must be reflected in the passed user object
user.is_member = False self.assertEqual('newaddress@example.com', user.email)
db.change_user(user) self.assertFalse(user.is_admin)
checkuser = db.login('testuser', 'supersecurepassword') self.assertFalse(user.is_member)
self.assertEqual('newaddress@example.com', checkuser.email) self.assertEqual(4200, user.balance)
# Changes must be reflected in the database
checkuser = db.get_user(user.id)
self.assertEqual('newaddress@example.com', user.email)
self.assertFalse(checkuser.is_admin) self.assertFalse(checkuser.is_admin)
self.assertFalse(checkuser.is_member) self.assertFalse(checkuser.is_member)
self.assertEqual(4200, checkuser.balance)
user.id = -1 user.id = -1
with self.assertRaises(DatabaseConsistencyError): with self.assertRaises(DatabaseConsistencyError):
db.change_user(user) db.change_user(user, agent, is_member='True')
def test_delete_user(self) -> None: def test_delete_user(self) -> None:
with self.db as db: with self.db as db:
@ -170,6 +188,17 @@ class DatabaseTest(unittest.TestCase):
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
db.create_product('Club Mate', 250, 250) db.create_product('Club Mate', 250, 250)
def test_get_product(self) -> None:
with self.db as db:
with db.transaction(exclusive=False):
created = db.create_product('Club Mate', 150, 250)
product = db.get_product(created.id)
self.assertEqual('Club Mate', product.name)
self.assertEqual(150, product.price_member)
self.assertEqual(250, product.price_non_member)
with self.assertRaises(ValueError):
db.get_product(-1)
def test_list_products(self) -> None: def test_list_products(self) -> None:
with self.db as db: with self.db as db:
# Test empty list # Test empty list
@ -197,14 +226,18 @@ class DatabaseTest(unittest.TestCase):
def test_change_product(self) -> None: def test_change_product(self) -> None:
with self.db as db: with self.db as db:
product = db.create_product('Club Mate', 200, 200) product = db.create_product('Club Mate', 200, 200)
product.name = 'Flora Power Mate' db.change_product(product, name='Flora Power Mate', price_member=150, price_non_member=250, stock=42)
product.price_member = 150 # Changes must be reflected in the passed object
product.price_non_member = 250 self.assertEqual('Flora Power Mate', product.name)
db.change_product(product) self.assertEqual(150, product.price_member)
checkproduct = db.list_products()[0] self.assertEqual(250, product.price_non_member)
self.assertEqual(42, product.stock)
# Changes must be reflected in the database
checkproduct = db.get_product(product.id)
self.assertEqual('Flora Power Mate', checkproduct.name) self.assertEqual('Flora Power Mate', checkproduct.name)
self.assertEqual(150, checkproduct.price_member) self.assertEqual(150, checkproduct.price_member)
self.assertEqual(250, checkproduct.price_non_member) self.assertEqual(250, checkproduct.price_non_member)
self.assertEqual(42, checkproduct.stock)
product.id = -1 product.id = -1
with self.assertRaises(DatabaseConsistencyError): with self.assertRaises(DatabaseConsistencyError):
db.change_product(product) db.change_product(product)
@ -289,14 +322,14 @@ class DatabaseTest(unittest.TestCase):
db.restock(fritzmate, 10) db.restock(fritzmate, 10)
# user1 is somewhat addicted to caffeine # user1 is somewhat addicted to caffeine
db.increment_consumption(user1, clubmate, 1) for _ in range(3):
db.increment_consumption(user1, clubmate, 2) db.increment_consumption(user1, clubmate)
db.increment_consumption(user1, florapowermate, 3) db.increment_consumption(user1, florapowermate)
# user2 is reeeally addicted # user2 is reeeally addicted
db.increment_consumption(user2, clubmate, 7) for _ in range(7):
db.increment_consumption(user2, florapowermate, 3) db.increment_consumption(user2, clubmate)
db.increment_consumption(user2, florapowermate, 4) db.increment_consumption(user2, florapowermate)
with db.transaction(exclusive=False) as c: with db.transaction(exclusive=False) as c:
c.execute('''SELECT balance FROM users WHERE user_id = ?''', [user1.id]) c.execute('''SELECT balance FROM users WHERE user_id = ?''', [user1.id])

View file

@ -0,0 +1,124 @@
import unittest
import sqlite3
from matemat.db import DatabaseWrapper
from matemat.db.schemas import SCHEMAS
class TestMigrations(unittest.TestCase):
def setUp(self):
# Create an in-memory database for testing
self.db = DatabaseWrapper(':memory:')
def _initialize_db(self, schema_version: int):
self.db._sqlite_db = sqlite3.connect(':memory:')
cursor: sqlite3.Cursor = self.db._sqlite_db.cursor()
cursor.execute('BEGIN EXCLUSIVE')
for cmd in SCHEMAS[schema_version]:
cursor.execute(cmd)
cursor.execute('COMMIT')
def test_downgrade_fail(self):
# Test that downgrades are forbidden
self.db.SCHEMA_VERSION = 1
self.db._sqlite_db = sqlite3.connect(':memory:')
self.db._sqlite_db.execute('PRAGMA user_version = 2')
with self.assertRaises(RuntimeError):
with self.db:
pass
def test_upgrade_1_to_2(self):
# Setup test db with example entries covering - hopefully - all cases
self._initialize_db(1)
cursor: sqlite3.Cursor = self.db._sqlite_db.cursor()
cursor.execute('''
INSERT INTO users VALUES
(1, 'testadmin', 'a@b.c', '$2a$10$herebehashes', NULL, 1, 1, 1337, 0),
(2, 'testuser', NULL, '$2a$10$herebehashes', '$2a$10$herebehashes', 0, 1, 4242, 0),
(3, 'alien', NULL, '$2a$10$herebehashes', '$2a$10$herebehashes', 0, 0, 1234, 0)
''')
cursor.execute('''
INSERT INTO products VALUES
(1, 'Club Mate', 42, 200, 250),
(2, 'Flora Power Mate (1/4l)', 10, 100, 150)
''')
cursor.execute('''
INSERT INTO consumption VALUES
(1, 1, 5), (1, 2, 3), (2, 2, 10), (3, 1, 3), (3, 2, 4)
''')
cursor.execute('PRAGMA user_version = 1')
# Kick off the migration
self.db._setup()
# Test whether the new tables were created
cursor.execute('PRAGMA table_info(transactions)')
self.assertNotEqual(0, len(cursor.fetchall()))
cursor.execute('PRAGMA table_info(consumptions)')
self.assertNotEqual(0, len(cursor.fetchall()))
cursor.execute('PRAGMA table_info(deposits)')
self.assertNotEqual(0, len(cursor.fetchall()))
cursor.execute('PRAGMA table_info(modifications)')
self.assertNotEqual(0, len(cursor.fetchall()))
# Test whether the old consumption table was dropped
cursor.execute('PRAGMA table_info(consumption)')
self.assertEqual(0, len(cursor.fetchall()))
# Test number of entries in the new tables
cursor.execute('SELECT COUNT(ta_id) FROM transactions')
self.assertEqual(25, cursor.fetchone()[0])
cursor.execute('SELECT COUNT(ta_id) FROM consumptions')
self.assertEqual(25, cursor.fetchone()[0])
cursor.execute('SELECT COUNT(ta_id) FROM deposits')
self.assertEqual(0, cursor.fetchone()[0])
cursor.execute('SELECT COUNT(ta_id) FROM modifications')
self.assertEqual(0, cursor.fetchone()[0])
# The (user_id=2 x product_id=1) combination should never appear
cursor.execute('''
SELECT COUNT(t.ta_id)
FROM transactions AS t
LEFT JOIN consumptions AS c
ON t.ta_id = c.ta_id
WHERE t.user_id = 2 AND c.product_id = 1''')
self.assertEqual(0, cursor.fetchone()[0])
# Test that one entry per consumption was created, and their values match the negative price
cursor.execute('''
SELECT COUNT(t.ta_id)
FROM transactions AS t
LEFT JOIN consumptions AS c
ON t.ta_id = c.ta_id
WHERE t.user_id = 1 AND c.product_id = 1 AND t.value = -200''')
self.assertEqual(5, cursor.fetchone()[0])
cursor.execute('''
SELECT COUNT(t.ta_id)
FROM transactions AS t
LEFT JOIN consumptions AS c
ON t.ta_id = c.ta_id
WHERE t.user_id = 1 AND c.product_id = 2 AND t.value = -100''')
self.assertEqual(3, cursor.fetchone()[0])
cursor.execute('''
SELECT COUNT(t.ta_id)
FROM transactions AS t
LEFT JOIN consumptions AS c
ON t.ta_id = c.ta_id
WHERE t.user_id = 2 AND c.product_id = 2 AND t.value = -100''')
self.assertEqual(10, cursor.fetchone()[0])
cursor.execute('''
SELECT COUNT(t.ta_id)
FROM transactions AS t
LEFT JOIN consumptions AS c
ON t.ta_id = c.ta_id
WHERE t.user_id = 3 AND c.product_id = 1 AND t.value = -250''')
self.assertEqual(3, cursor.fetchone()[0])
cursor.execute('''
SELECT COUNT(t.ta_id)
FROM transactions AS t
LEFT JOIN consumptions AS c
ON t.ta_id = c.ta_id
WHERE t.user_id = 3 AND c.product_id = 2 AND t.value = -150''')
self.assertEqual(4, cursor.fetchone()[0])

View file

@ -1,6 +1,8 @@
import unittest import unittest
import sqlite3
from matemat.db import DatabaseWrapper from matemat.db import DatabaseWrapper

View file

@ -5,6 +5,8 @@ from typing import Any, Optional
import sqlite3 import sqlite3
from matemat.exceptions import DatabaseConsistencyError from matemat.exceptions import DatabaseConsistencyError
from matemat.db.schemas import SCHEMAS
from matemat.db.migrations import migrate_schema_1_to_2
class Transaction(object): class Transaction(object):
@ -40,38 +42,8 @@ class Transaction(object):
class DatabaseWrapper(object): class DatabaseWrapper(object):
SCHEMA_VERSION = 1
SCHEMA = ''' SCHEMA_VERSION = 2
CREATE TABLE users (
user_id INTEGER PRIMARY KEY,
username TEXT UNIQUE NOT NULL,
email TEXT DEFAULT NULL,
password TEXT NOT NULL,
touchkey TEXT DEFAULT NULL,
is_admin INTEGER(1) NOT NULL DEFAULT 0,
is_member INTEGER(1) NOT NULL DEFAULT 1,
balance INTEGER(8) NOT NULL DEFAULT 0,
lastchange INTEGER(8) NOT NULL DEFAULT 0
);
CREATE TABLE products (
product_id INTEGER PRIMARY KEY,
name TEXT UNIQUE NOT NULL,
stock INTEGER(8) NOT NULL DEFAULT 0,
price_member INTEGER(8) NOT NULL,
price_non_member INTEGER(8) NOT NULL
);
CREATE TABLE consumption (
user_id INTEGER NOT NULL,
product_id INTEGER NOT NULL,
count INTEGER(8) NOT NULL DEFAULT 0,
PRIMARY KEY (user_id, product_id),
FOREIGN KEY (user_id) REFERENCES users(user_id)
ON DELETE CASCADE ON UPDATE CASCADE,
FOREIGN KEY (product_id) REFERENCES products(product_id)
ON DELETE CASCADE ON UPDATE CASCADE
);
'''
def __init__(self, filename: str) -> None: def __init__(self, filename: str) -> None:
self._filename: str = filename self._filename: str = filename
@ -93,13 +65,20 @@ class DatabaseWrapper(object):
with self.transaction() as c: with self.transaction() as c:
version: int = self._user_version version: int = self._user_version
if version < 1: if version < 1:
c.executescript(self.SCHEMA) # Don't use executescript, as it issues a COMMIT first
for command in SCHEMAS[self.SCHEMA_VERSION]:
c.execute(command)
elif version < self.SCHEMA_VERSION: elif version < self.SCHEMA_VERSION:
self._upgrade(old=version, new=self.SCHEMA_VERSION) self._upgrade(from_version=version, to_version=self.SCHEMA_VERSION)
elif version > self.SCHEMA_VERSION:
raise RuntimeError('Database schema is newer than supported by this version of Matemat.')
self._user_version = self.SCHEMA_VERSION self._user_version = self.SCHEMA_VERSION
def _upgrade(self, old: int, new: int) -> None: def _upgrade(self, from_version: int, to_version: int) -> None:
pass with self.transaction() as c:
# Note to future s3lph: If there are further migrations, also consider upgrades like 1 -> 3
if from_version == 1 and to_version == 2:
migrate_schema_1_to_2(c)
def connect(self) -> None: def connect(self) -> None:
if self.is_connected(): if self.is_connected():

View file

@ -1,10 +0,0 @@
from dataclasses import dataclass
@dataclass
class Product:
id: int
name: str
price_member: int
price_non_member: int

View file

@ -1,13 +0,0 @@
from typing import Optional
from dataclasses import dataclass
@dataclass
class User:
id: int
name: str
email: Optional[str] = None
is_admin: bool = False
is_member: bool = False

0
matemat/util/__init__.py Normal file
View file

View file

@ -0,0 +1,56 @@
def format_chf(value: int, with_currencysign: bool = True) -> str:
"""
Formats a centime value into a commonly understood representation ("CHF -13.37").
:param value: The value to format, in centimes.
:param with_currencysign: Whether to include the currency prefix ("CHF ") in the output.
:return: A human-readable string representation.
"""
sign: str = ''
if value < 0:
# As // and % round towards -Inf, convert into a positive value and prepend the negative sign
sign = '-'
value = -value
# Split into full francs and fractions (centimes)
full: int = value // 100
frac: int = value % 100
csign: str = 'CHF ' if with_currencysign else ''
# Put it all together; centimes are always padded with 2 zeros
return f'{csign}{sign}{full}.{frac:02}'
def parse_chf(value: str) -> int:
"""
Parse a currency value into machine-readable format (integer centimes). The prefix "CHF", the decimal point, and
digits after the decimal point are optional.
:param value: The value to parse.
:return: An integer representation of the value.
:raises: Value error: If more than two digits after the decimal point are present.
"""
# Remove optional leading "CHF" and strip whitespace
value = value.strip()
if value.startswith('CHF'):
value = value[3:]
value = value.strip()
if '.' not in value:
# already is an integer; parse and turn into centimes
return int(value, 10) * 100
# Split at the decimal point
full, frac = value.split('.', 1)
if len(frac) > 2:
raise ValueError('Needs max. 2 digits after decimal point')
elif len(frac) < 2:
# Right-pad fraction with zeros ("x." -> "x.00", "x.x" -> "x.x0")
frac = frac + '0' * (2 - len(frac))
# Parse both parts
ifrac: int = int(frac, 10)
ifull: int = int(full, 10)
if ifrac < 0:
raise ValueError('Fraction part must not be negative.')
if full.startswith('-'):
ifrac = -ifrac
# Combine into centime integer
return ifull * 100 + ifrac

View file

View file

@ -0,0 +1,133 @@
import unittest
from matemat.util.currency_format import format_chf, parse_chf
class TestCurrencyFormat(unittest.TestCase):
def test_format_zero(self):
self.assertEqual('CHF 0.00', format_chf(0))
self.assertEqual('0.00', format_chf(0, False))
def test_format_positive_full(self):
self.assertEqual('CHF 42.00', format_chf(4200))
self.assertEqual('42.00', format_chf(4200, False))
def test_format_negative_full(self):
self.assertEqual('CHF -42.00', format_chf(-4200))
self.assertEqual('-42.00', format_chf(-4200, False))
def test_format_positive_frac(self):
self.assertEqual('CHF 13.37', format_chf(1337))
self.assertEqual('13.37', format_chf(1337, False))
def test_format_negative_frac(self):
self.assertEqual('CHF -13.37', format_chf(-1337))
self.assertEqual('-13.37', format_chf(-1337, False))
def test_format_pad_left_positive(self):
self.assertEqual('CHF 0.01', format_chf(1))
self.assertEqual('0.01', format_chf(1, False))
def test_format_pad_left_negative(self):
self.assertEqual('CHF -0.01', format_chf(-1))
self.assertEqual('-0.01', format_chf(-1, False))
def test_format_pad_right_positive(self):
self.assertEqual('CHF 4.20', format_chf(420))
self.assertEqual('4.20', format_chf(420, False))
def test_format_pad_right_negative(self):
self.assertEqual('CHF -4.20', format_chf(-420))
self.assertEqual('-4.20', format_chf(-420, False))
def test_parse_empty(self):
with self.assertRaises(ValueError):
parse_chf('')
with self.assertRaises(ValueError):
parse_chf('CHF')
with self.assertRaises(ValueError):
parse_chf('CHF ')
def test_parse_zero(self):
self.assertEqual(0, parse_chf('CHF0'))
self.assertEqual(0, parse_chf('CHF 0'))
self.assertEqual(0, parse_chf('CHF -0'))
self.assertEqual(0, parse_chf('CHF 0.'))
self.assertEqual(0, parse_chf('CHF 0.0'))
self.assertEqual(0, parse_chf('CHF 0.00'))
self.assertEqual(0, parse_chf('CHF -0.'))
self.assertEqual(0, parse_chf('CHF -0.0'))
self.assertEqual(0, parse_chf('CHF -0.00'))
self.assertEqual(0, parse_chf('0'))
self.assertEqual(0, parse_chf('0'))
self.assertEqual(0, parse_chf('-0'))
self.assertEqual(0, parse_chf('0.'))
self.assertEqual(0, parse_chf('0.0'))
self.assertEqual(0, parse_chf('0.00'))
self.assertEqual(0, parse_chf('-0.'))
self.assertEqual(0, parse_chf('-0.0'))
self.assertEqual(0, parse_chf('-0.00'))
def test_parse_positive_full(self):
self.assertEqual(4200, parse_chf('CHF 42.00'))
self.assertEqual(4200, parse_chf('42.00'))
self.assertEqual(4200, parse_chf('CHF 42'))
self.assertEqual(4200, parse_chf('42'))
self.assertEqual(4200, parse_chf('CHF 42.'))
self.assertEqual(4200, parse_chf('42.'))
self.assertEqual(4200, parse_chf('CHF 42.0'))
self.assertEqual(4200, parse_chf('42.0'))
def test_parse_negative_full(self):
self.assertEqual(-4200, parse_chf('CHF -42.00'))
self.assertEqual(-4200, parse_chf('-42.00'))
self.assertEqual(-4200, parse_chf('CHF -42'))
self.assertEqual(-4200, parse_chf('-42'))
self.assertEqual(-4200, parse_chf('CHF -42.'))
self.assertEqual(-4200, parse_chf('-42.'))
self.assertEqual(-4200, parse_chf('CHF -42.0'))
self.assertEqual(-4200, parse_chf('-42.0'))
def test_parse_positive_frac(self):
self.assertEqual(1337, parse_chf('CHF 13.37'))
self.assertEqual(1337, parse_chf('13.37'))
def test_parse_negative_frac(self):
self.assertEqual(-1337, parse_chf('CHF -13.37'))
self.assertEqual(-1337, parse_chf('-13.37'))
def test_parse_pad_left_positive(self):
self.assertEqual(1, parse_chf('CHF 0.01'))
self.assertEqual(1, parse_chf('0.01'))
def test_parse_pad_left_negative(self):
self.assertEqual(-1, parse_chf('CHF -0.01'))
self.assertEqual(-1, parse_chf('-0.01'))
def test_parse_pad_right_positive(self):
self.assertEqual(420, parse_chf('CHF 4.20'))
self.assertEqual(420, parse_chf('4.20'))
self.assertEqual(420, parse_chf('CHF 4.2'))
self.assertEqual(420, parse_chf('4.2'))
def test_parse_pad_right_negative(self):
self.assertEqual(-420, parse_chf('CHF -4.20'))
self.assertEqual(-420, parse_chf('-4.20'))
self.assertEqual(-420, parse_chf('CHF -4.2'))
self.assertEqual(-420, parse_chf('-4.2'))
def test_parse_too_many_decimals(self):
with self.assertRaises(ValueError):
parse_chf('123.456')
with self.assertRaises(ValueError):
parse_chf('CHF 0.456')
with self.assertRaises(ValueError):
parse_chf('CHF 0.450')
def test_parse_wrong_separator(self):
with self.assertRaises(ValueError):
parse_chf('13,37')
with self.assertRaises(ValueError):
parse_chf('CHF 13,37')

View file

@ -5,6 +5,7 @@ import logging
import os import os
import socket import socket
import mimetypes import mimetypes
import magic
from socketserver import TCPServer from socketserver import TCPServer
from http.server import HTTPServer, BaseHTTPRequestHandler from http.server import HTTPServer, BaseHTTPRequestHandler
from http.cookies import SimpleCookie from http.cookies import SimpleCookie
@ -17,6 +18,7 @@ from matemat import __version__ as matemat_version
from matemat.exceptions import HttpException from matemat.exceptions import HttpException
from matemat.webserver import RequestArguments, PageletResponse, RedirectResponse, TemplateResponse from matemat.webserver import RequestArguments, PageletResponse, RedirectResponse, TemplateResponse
from matemat.webserver.util import parse_args from matemat.webserver.util import parse_args
from matemat.util.currency_format import format_chf
# #
# Python internal class hacks # Python internal class hacks
@ -115,8 +117,10 @@ class MatematHTTPServer(HTTPServer):
self.pagelet_variables = pagelet_variables self.pagelet_variables = pagelet_variables
# Set up the Jinja2 environment # Set up the Jinja2 environment
self.jinja_env: jinja2.Environment = jinja2.Environment( self.jinja_env: jinja2.Environment = jinja2.Environment(
loader=jinja2.FileSystemLoader(os.path.abspath(templateroot)) loader=jinja2.FileSystemLoader(os.path.abspath(templateroot)),
autoescape=jinja2.select_autoescape(default=True),
) )
self.jinja_env.filters['chf'] = format_chf
# Set up logger # Set up logger
self.logger: logging.Logger = logging.getLogger('matemat.webserver') self.logger: logging.Logger = logging.getLogger('matemat.webserver')
self.logger.setLevel(log_level) self.logger.setLevel(log_level)
@ -308,7 +312,6 @@ class HttpHandler(BaseHTTPRequestHandler):
if path in _PAGELET_PATHS: if path in _PAGELET_PATHS:
# Prepare some headers. Those can still be overwritten by the pagelet # Prepare some headers. Those can still be overwritten by the pagelet
headers: Dict[str, str] = { headers: Dict[str, str] = {
'Content-Type': 'text/html',
'Cache-Control': 'no-cache' 'Cache-Control': 'no-cache'
} }
# Call the pagelet function # Call the pagelet function
@ -328,6 +331,20 @@ class HttpHandler(BaseHTTPRequestHandler):
f'matemat_session_id={session_id}; expires={expires}') f'matemat_session_id={session_id}; expires={expires}')
# Compute the body length and add the appropriate header # Compute the body length and add the appropriate header
headers['Content-Length'] = str(len(data)) headers['Content-Length'] = str(len(data))
# If the pagelet did not set its own Content-Type header, use libmagic to guess an appropriate one
if 'Content-Type' not in headers:
try:
filemagic: magic.FileMagic = magic.detect_from_content(data)
mimetype: str = filemagic.mime_type
charset: str = filemagic.encoding
except ValueError:
mimetype = 'application/octet-stream'
charset = 'binary'
# Only append the charset if it is not "binary"
if charset == 'binary':
headers['Content-Type'] = mimetype
else:
headers['Content-Type'] = f'{mimetype}; charset={charset}'
# Send all headers set by the pagelet # Send all headers set by the pagelet
for name, value in headers.items(): for name, value in headers.items():
self.send_header(name, value) self.send_header(name, value)
@ -365,13 +382,21 @@ class HttpHandler(BaseHTTPRequestHandler):
data = f.read() data = f.read()
# File read successfully, send 'OK' header # File read successfully, send 'OK' header
self.send_response(200) self.send_response(200)
# TODO: Guess the MIME type. Unfortunately this call solely relies on the file extension, not ideal? # Guess the MIME type by file extension, or use libmagic as fallback
mimetype, _ = mimetypes.guess_type(filepath) # Use libmagic to guess the charset
# Fall back to octet-stream type, if unknown try:
if mimetype is None: exttype: str = mimetypes.guess_type(filepath)[0]
filemagic: magic.FileMagic = magic.detect_from_filename(filepath)
mimetype: str = exttype if exttype is not None else filemagic.mime_type
charset: str = filemagic.encoding
except ValueError:
mimetype = 'application/octet-stream' mimetype = 'application/octet-stream'
# Send content type and length header charset = 'binary'
# Send content type and length header. Only set the charset if it's not "binary"
if charset == 'binary':
self.send_header('Content-Type', mimetype) self.send_header('Content-Type', mimetype)
else:
self.send_header('Content-Type', f'{mimetype}; charset={charset}')
self.send_header('Content-Length', str(len(data))) self.send_header('Content-Length', str(len(data)))
self.send_header('Last-Modified', fileage.strftime('%a, %d %b %Y %H:%M:%S GMT')) self.send_header('Last-Modified', fileage.strftime('%a, %d %b %Y %H:%M:%S GMT'))
self.send_header('Cache-Control', 'max-age=1') self.send_header('Cache-Control', 'max-age=1')

View file

@ -8,3 +8,8 @@ from .main import main_page
from .login import login_page from .login import login_page
from .logout import logout from .logout import logout
from .touchkey import touchkey_page from .touchkey import touchkey_page
from .buy import buy
from .deposit import deposit
from .admin import admin
from .moduser import moduser
from .modproduct import modproduct

View file

@ -0,0 +1,211 @@
from typing import Any, Dict, Union
import os
import magic
from matemat.webserver import pagelet, RequestArguments, PageletResponse, RedirectResponse, TemplateResponse
from matemat.db import MatematDatabase
from matemat.db.primitives import User
from matemat.exceptions import DatabaseConsistencyError, HttpException
@pagelet('/admin')
def admin(method: str,
path: str,
args: RequestArguments,
session_vars: Dict[str, Any],
headers: Dict[str, str],
config: Dict[str, str]) \
-> Union[str, bytes, PageletResponse]:
"""
The admin panel, shows a user's own settings. Additionally, for administrators, settings to modify other users and
products are shown.
"""
# If no user is logged in, redirect to the login page
if 'authentication_level' not in session_vars or 'authenticated_user' not in session_vars:
return RedirectResponse('/login')
authlevel: int = session_vars['authentication_level']
uid: int = session_vars['authenticated_user']
# Show a 403 Forbidden error page if no user is logged in (0) or a user logged in via touchkey (1)
if authlevel < 2:
raise HttpException(403)
# Connect to the database
with MatematDatabase(config['DatabaseFile']) as db:
# Fetch the authenticated user
user = db.get_user(uid)
# If the POST request contains a "change" parameter, delegate the change handling to the function below
if method == 'POST' and 'change' in args:
handle_change(args, user, db, config)
# If the POST request contains an "adminchange" parameter, delegate the change handling to the function below
elif method == 'POST' and 'adminchange' in args and user.is_admin:
handle_admin_change(args, db, config)
# Fetch all existing users and products from the database
users = db.list_users()
products = db.list_products()
# Render the "Admin/Settings" page
return TemplateResponse('admin.html',
authuser=user, authlevel=authlevel, users=users, products=products,
setupname=config['InstanceName'])
def handle_change(args: RequestArguments, user: User, db: MatematDatabase, config: Dict[str, str]) -> None:
"""
Write the changes requested by a user for its own account to the database.
:param args: The RequestArguments object passed to the pagelet.
:param user: The user to edit.
:param db: The database facade where changes are written to.
:param config: The dictionary of config file entries from the [Pagelets] section.
"""
try:
# Read the type of change requested by the user, then switch over it
change = str(args.change)
# The user requested a modification of its general account information (username, email)
if change == 'account':
# Username and email must be set in the request arguments
if 'username' not in args or 'email' not in args:
return
username = str(args.username)
email = str(args.email)
# An empty e-mail field should be interpreted as NULL
if len(email) == 0:
email = None
# Attempt to update username and e-mail
try:
db.change_user(user, agent=None, name=username, email=email)
except DatabaseConsistencyError:
return
# The user requested a password change
elif change == 'password':
# The old password and 2x the new password must be present
if 'oldpass' not in args or 'newpass' not in args or 'newpass2' not in args:
return
# Read the passwords from the request arguments
oldpass = str(args.oldpass)
newpass = str(args.newpass)
newpass2 = str(args.newpass2)
# The two instances of the new password must match
if newpass != newpass2:
raise ValueError('New passwords don\'t match')
# Write the new password to the database
db.change_password(user, oldpass, newpass)
# The user requested a touchkey change
elif change == 'touchkey':
# The touchkey must be present
if 'touchkey' not in args:
return
# Read the touchkey from the request arguments
touchkey = str(args.touchkey)
# An empty touchkey field should set the touchkey to NULL (disable touchkey login)
if len(touchkey) == 0:
touchkey = None
# Write the new touchkey to the database
db.change_touchkey(user, '', touchkey, verify_password=False)
# The user requested an avatar change
elif change == 'avatar':
# The new avatar field must be present
if 'avatar' not in args:
return
# Read the raw image data from the request
avatar = bytes(args.avatar)
# Only process the image, if its size is more than zero. Zero size means no new image was uploaded
if len(avatar) == 0:
return
# Detect the MIME type
filemagic: magic.FileMagic = magic.detect_from_content(avatar)
# Currently, only image/png is supported, don't process any other formats
if filemagic.mime_type != 'image/png':
# TODO: Optionally convert to png
return
# Create the absolute path of the upload directory
abspath: str = os.path.join(os.path.abspath(config['UploadDir']), 'thumbnails/users/')
os.makedirs(abspath, exist_ok=True)
# Write the image to the file
with open(os.path.join(abspath, f'{user.id}.png'), 'wb') as f:
f.write(avatar)
except UnicodeDecodeError:
raise ValueError('an argument not a string')
def handle_admin_change(args: RequestArguments, db: MatematDatabase, config: Dict[str, str]):
"""
Write the changes requested by an admin for users of products.
:param args: The RequestArguments object passed to the pagelet.
:param db: The database facade where changes are written to.
:param config: The dictionary of config file entries from the [Pagelets] section.
"""
try:
# Read the type of change requested by the admin, then switch over it
change = str(args.adminchange)
# The user requested to create a new user
if change == 'newuser':
# Only create a new user if all required properties of the user are present in the request arguments
if 'username' not in args or 'email' not in args or 'password' not in args:
return
# Read the properties from the request arguments
username = str(args.username)
email = str(args.email)
# An empty e-mail field should be interpreted as NULL
if len(email) == 0:
email = None
password = str(args.password)
is_member = 'ismember' in args
is_admin = 'isadmin' in args
# Create the user in the database
db.create_user(username, password, email, member=is_member, admin=is_admin)
# The user requested to create a new product
elif change == 'newproduct':
# Only create a new product if all required properties of the product are present in the request arguments
if 'name' not in args or 'pricemember' not in args or 'pricenonmember' not in args:
return
# Read the properties from the request arguments
name = str(args.name)
price_member = int(str(args.pricemember))
price_non_member = int(str(args.pricenonmember))
# Create the user in the database
newproduct = db.create_product(name, price_member, price_non_member)
# If a new product image was uploaded, process it
if 'image' in args:
# Read the raw image data from the request
image = bytes(args.image)
# Only process the image, if its size is more than zero. Zero size means no new image was uploaded
if len(image) == 0:
return
# Detect the MIME type
filemagic: magic.FileMagic = magic.detect_from_content(image)
# Currently, only image/png is supported, don't process any other formats
if filemagic.mime_type != 'image/png':
# TODO: Optionally convert to png
return
# Create the absolute path of the upload directory
abspath: str = os.path.join(os.path.abspath(config['UploadDir']), 'thumbnails/products/')
os.makedirs(abspath, exist_ok=True)
# Write the image to the file
with open(os.path.join(abspath, f'{newproduct.id}.png'), 'wb') as f:
f.write(image)
# The user requested to restock a product
elif change == 'restock':
# Only restock a product if all required properties are present in the request arguments
if 'productid' not in args or 'amount' not in args:
return
# Read the properties from the request arguments
productid = int(str(args.productid))
amount = int(str(args.amount))
# Fetch the product to restock from the database
product = db.get_product(productid)
# Write the new stock count to the database
db.restock(product, amount)
except UnicodeDecodeError:
raise ValueError('an argument not a string')

View file

@ -0,0 +1,33 @@
from typing import Any, Dict, Union
from matemat.webserver import pagelet, RequestArguments, PageletResponse, RedirectResponse
from matemat.db import MatematDatabase
@pagelet('/buy')
def buy(method: str,
path: str,
args: RequestArguments,
session_vars: Dict[str, Any],
headers: Dict[str, str],
config: Dict[str, str]) \
-> Union[str, bytes, PageletResponse]:
"""
The purchasing mechanism. Called by the user clicking an item on the product list.
"""
# If no user is logged in, redirect to the main page, as a purchase must always be bound to a user
if 'authenticated_user' not in session_vars:
return RedirectResponse('/')
# Connect to the database
with MatematDatabase(config['DatabaseFile']) as db:
# Fetch the authenticated user from the database
uid: int = session_vars['authenticated_user']
user = db.get_user(uid)
# Read the product from the database, identified by the product ID passed as request argument
if 'pid' in args:
pid = int(str(args.pid))
product = db.get_product(pid)
# Create a consumption entry for the (user, product) combination
db.increment_consumption(user, product)
# Redirect to the main page (where this request should have come from)
return RedirectResponse('/')

View file

@ -0,0 +1,32 @@
from typing import Any, Dict, Union
from matemat.webserver import pagelet, RequestArguments, PageletResponse, RedirectResponse
from matemat.db import MatematDatabase
@pagelet('/deposit')
def deposit(method: str,
path: str,
args: RequestArguments,
session_vars: Dict[str, Any],
headers: Dict[str, str],
config: Dict[str, str]) \
-> Union[str, bytes, PageletResponse]:
"""
The cash depositing mechanism. Called by the user submitting a deposit from the product list.
"""
# If no user is logged in, redirect to the main page, as a deposit must always be bound to a user
if 'authenticated_user' not in session_vars:
return RedirectResponse('/')
# Connect to the database
with MatematDatabase(config['DatabaseFile']) as db:
# Fetch the authenticated user from the database
uid: int = session_vars['authenticated_user']
user = db.get_user(uid)
if 'n' in args:
# Read the amount of cash to deposit from the request arguments
n = int(str(args.n))
# Write the deposit to the database
db.deposit(user, n)
# Redirect to the main page (where this request should have come from)
return RedirectResponse('/')

View file

@ -1,9 +1,8 @@
from typing import Any, Dict, Union from typing import Any, Dict, Union
from matemat.exceptions import AuthenticationError, HttpException from matemat.exceptions import AuthenticationError, HttpException
from matemat.webserver import pagelet, RequestArguments, PageletResponse, RedirectResponse, TemplateResponse from matemat.webserver import pagelet, RequestArguments, PageletResponse, RedirectResponse, TemplateResponse
from matemat.primitives import User from matemat.db.primitives import User
from matemat.db import MatematDatabase from matemat.db import MatematDatabase
@ -13,18 +12,34 @@ def login_page(method: str,
args: RequestArguments, args: RequestArguments,
session_vars: Dict[str, Any], session_vars: Dict[str, Any],
headers: Dict[str, str], headers: Dict[str, str],
config: Dict[str, str])\ config: Dict[str, str]) \
-> Union[bytes, str, PageletResponse]: -> Union[bytes, str, PageletResponse]:
if 'user' in session_vars: """
The password login mechanism. If called via GET, render the UI template; if called via POST, attempt to log in with
the provided credentials (username and passsword).
"""
# If a user is already logged in, simply redirect to the main page, showing the product list
if 'authenticated_user' in session_vars:
return RedirectResponse('/') return RedirectResponse('/')
# If requested via HTTP GET, render the login page showing the login UI
if method == 'GET': if method == 'GET':
return TemplateResponse('login.html', setupname=config['InstanceName']) return TemplateResponse('login.html',
setupname=config['InstanceName'])
# If requested via HTTP POST, read the request arguments and attempt to log in with the provided credentials
elif method == 'POST': elif method == 'POST':
# Connect to the database
with MatematDatabase(config['DatabaseFile']) as db: with MatematDatabase(config['DatabaseFile']) as db:
try: try:
# Read the request arguments and attempt to log in with them
user: User = db.login(str(args.username), str(args.password)) user: User = db.login(str(args.username), str(args.password))
except AuthenticationError: except AuthenticationError:
# Reload the touchkey login page on failure
return RedirectResponse('/login') return RedirectResponse('/login')
session_vars['user'] = user # Set the user ID session variable
session_vars['authenticated_user'] = user.id
# Set the authlevel session variable (0 = none, 1 = touchkey, 2 = password login)
session_vars['authentication_level'] = 2
# Redirect to the main page, showing the product list
return RedirectResponse('/') return RedirectResponse('/')
# If neither GET nor POST was used, show a 405 Method Not Allowed error page
raise HttpException(405) raise HttpException(405)

View file

@ -1,4 +1,3 @@
from typing import Any, Dict, Union from typing import Any, Dict, Union
from matemat.webserver import pagelet, RequestArguments, PageletResponse, RedirectResponse from matemat.webserver import pagelet, RequestArguments, PageletResponse, RedirectResponse
@ -10,8 +9,15 @@ def logout(method: str,
args: RequestArguments, args: RequestArguments,
session_vars: Dict[str, Any], session_vars: Dict[str, Any],
headers: Dict[str, str], headers: Dict[str, str],
config: Dict[str, str])\ config: Dict[str, str]) \
-> Union[bytes, str, PageletResponse]: -> Union[bytes, str, PageletResponse]:
if 'user' in session_vars: """
del session_vars['user'] The logout mechanism, clearing the authentication values in the session storage.
"""
# Remove the authenticated user ID from the session storage, if any
if 'authenticated_user' in session_vars:
del session_vars['authenticated_user']
# Reset the authlevel session variable (0 = none, 1 = touchkey, 2 = password login)
session_vars['authentication_level'] = 0
# Redirect to the main page, showing the user list
return RedirectResponse('/') return RedirectResponse('/')

View file

@ -1,8 +1,6 @@
from typing import Any, Dict, Union from typing import Any, Dict, Union
from matemat.webserver import pagelet, RequestArguments, PageletResponse, TemplateResponse from matemat.webserver import pagelet, RequestArguments, PageletResponse, TemplateResponse
from matemat.primitives import User
from matemat.db import MatematDatabase from matemat.db import MatematDatabase
@ -12,13 +10,27 @@ def main_page(method: str,
args: RequestArguments, args: RequestArguments,
session_vars: Dict[str, Any], session_vars: Dict[str, Any],
headers: Dict[str, str], headers: Dict[str, str],
config: Dict[str, str])\ config: Dict[str, str]) \
-> Union[bytes, str, PageletResponse]: -> Union[bytes, str, PageletResponse]:
"""
The main page, showing either the user list (if no user is logged in) or the product list (if a user is logged in).
"""
with MatematDatabase(config['DatabaseFile']) as db: with MatematDatabase(config['DatabaseFile']) as db:
if 'user' in session_vars: # Check whether a user is logged in
user: User = session_vars['user'] if 'authenticated_user' in session_vars:
# Fetch the user id and authentication level (touchkey vs password) from the session storage
uid: int = session_vars['authenticated_user']
authlevel: int = session_vars['authentication_level']
# Fetch the user object from the database (for name display, price calculation and admin check)
user = db.get_user(uid)
# Fetch the list of products to display
products = db.list_products() products = db.list_products()
return TemplateResponse('main.html', user=user, list=products, setupname=config['InstanceName']) # Prepare a response with a jinja2 template
return TemplateResponse('productlist.html',
authuser=user, products=products, authlevel=authlevel,
setupname=config['InstanceName'])
else: else:
# If no user is logged in, fetch the list of users and render the userlist template
users = db.list_users() users = db.list_users()
return TemplateResponse('main.html', list=users, setupname=config['InstanceName']) return TemplateResponse('userlist.html',
users=users, setupname=config['InstanceName'])

View file

@ -0,0 +1,119 @@
from typing import Any, Dict, Union
import os
import magic
from matemat.webserver import pagelet, RequestArguments, PageletResponse, RedirectResponse, TemplateResponse
from matemat.db import MatematDatabase
from matemat.db.primitives import Product
from matemat.exceptions import DatabaseConsistencyError, HttpException
from matemat.util.currency_format import parse_chf
@pagelet('/modproduct')
def modproduct(method: str,
path: str,
args: RequestArguments,
session_vars: Dict[str, Any],
headers: Dict[str, str],
config: Dict[str, str]) \
-> Union[str, bytes, PageletResponse]:
"""
The product modification page available from the admin panel.
"""
# If no user is logged in, redirect to the login page
if 'authentication_level' not in session_vars or 'authenticated_user' not in session_vars:
return RedirectResponse('/login')
authlevel: int = session_vars['authentication_level']
auth_uid: int = session_vars['authenticated_user']
# Show a 403 Forbidden error page if no user is logged in (0) or a user logged in via touchkey (1)
if authlevel < 2:
raise HttpException(403)
# Connect to the database
with MatematDatabase(config['DatabaseFile']) as db:
# Fetch the authenticated user
authuser = db.get_user(auth_uid)
if not authuser.is_admin:
# Show a 403 Forbidden error page if the user is not an admin
raise HttpException(403)
if 'productid' not in args:
# Show a 400 Bad Request error page if no product to edit was specified
# (should never happen during normal operation)
raise HttpException(400, '"productid" argument missing')
# Fetch the product to modify from the database
modproduct_id = int(str(args.productid))
product = db.get_product(modproduct_id)
# If the request contains a "change" parameter, delegate the change handling to the function below
if 'change' in args:
handle_change(args, product, db, config)
# If the product was deleted, redirect back to the admin page, as there is nothing to edit any more
if str(args.change) == 'del':
return RedirectResponse('/admin')
# Render the "Modify Product" page
return TemplateResponse('modproduct.html',
authuser=authuser, product=product, authlevel=authlevel,
setupname=config['InstanceName'])
def handle_change(args: RequestArguments, product: Product, db: MatematDatabase, config: Dict[str, str]) -> None:
"""
Write the changes requested by an admin to the database.
:param args: The RequestArguments object passed to the pagelet.
:param product: The product to edit.
:param db: The database facade where changes are written to.
:param config: The dictionary of config file entries from the [Pagelets] section.
"""
# Read the type of change requested by the admin, then switch over it
change = str(args.change)
# Admin requested deletion of the product
if change == 'del':
# Delete the product from the database
db.delete_product(product)
# Delete the product image, if it exists
try:
abspath: str = os.path.join(os.path.abspath(config['UploadDir']), 'thumbnails/products/')
os.remove(os.path.join(abspath, f'{product.id}.png'))
except FileNotFoundError:
pass
# Admin requested update of the product details
elif change == 'update':
# Only write a change if all properties of the product are present in the request arguments
if 'name' not in args or 'pricemember' not in args or 'pricenonmember' not in args or 'stock' not in args:
return
# Read the properties from the request arguments
name = str(args.name)
price_member = parse_chf(str(args.pricemember))
price_non_member = parse_chf(str(args.pricenonmember))
stock = int(str(args.stock))
# Attempt to write the changes to the database
try:
db.change_product(product,
name=name, price_member=price_member, price_non_member=price_non_member, stock=stock)
except DatabaseConsistencyError:
return
# If a new product image was uploaded, process it
if 'image' in args:
# Read the raw image data from the request
image = bytes(args.image)
# Only process the image, if its size is more than zero. Zero size means no new image was uploaded
if len(image) == 0:
return
# Detect the MIME type
filemagic: magic.FileMagic = magic.detect_from_content(image)
# Currently, only image/png is supported, don't process any other formats
if filemagic.mime_type != 'image/png':
# TODO: Optionally convert to png
return
# Create the absolute path of the upload directory
abspath: str = os.path.join(os.path.abspath(config['UploadDir']), 'thumbnails/products/')
os.makedirs(abspath, exist_ok=True)
# Write the image to the file
with open(os.path.join(abspath, f'{product.id}.png'), 'wb') as f:
f.write(image)

View file

@ -0,0 +1,130 @@
from typing import Any, Dict, Union
import os
import magic
from matemat.webserver import pagelet, RequestArguments, PageletResponse, RedirectResponse, TemplateResponse
from matemat.db import MatematDatabase
from matemat.db.primitives import User
from matemat.exceptions import DatabaseConsistencyError, HttpException
from matemat.util.currency_format import parse_chf
@pagelet('/moduser')
def moduser(method: str,
path: str,
args: RequestArguments,
session_vars: Dict[str, Any],
headers: Dict[str, str],
config: Dict[str, str]) \
-> Union[str, bytes, PageletResponse]:
"""
The user modification page available from the admin panel.
"""
# If no user is logged in, redirect to the login page
if 'authentication_level' not in session_vars or 'authenticated_user' not in session_vars:
return RedirectResponse('/login')
authlevel: int = session_vars['authentication_level']
auth_uid: int = session_vars['authenticated_user']
# Show a 403 Forbidden error page if no user is logged in (0) or a user logged in via touchkey (1)
if authlevel < 2:
raise HttpException(403)
# Connect to the database
with MatematDatabase(config['DatabaseFile']) as db:
# Fetch the authenticated user
authuser = db.get_user(auth_uid)
if not authuser.is_admin:
# Show a 403 Forbidden error page if the user is not an admin
raise HttpException(403)
if 'userid' not in args:
# Show a 400 Bad Request error page if no users to edit was specified
# (should never happen during normal operation)
raise HttpException(400, '"userid" argument missing')
# Fetch the user to modify from the database
moduser_id = int(str(args.userid))
user = db.get_user(moduser_id)
# If the request contains a "change" parameter, delegate the change handling to the function below
if 'change' in args:
handle_change(args, user, authuser, db, config)
# If the user was deleted, redirect back to the admin page, as there is nothing to edit any more
if str(args.change) == 'del':
return RedirectResponse('/admin')
# Render the "Modify User" page
return TemplateResponse('moduser.html',
authuser=authuser, user=user, authlevel=authlevel,
setupname=config['InstanceName'])
def handle_change(args: RequestArguments, user: User, authuser: User, db: MatematDatabase, config: Dict[str, str]) \
-> None:
"""
Write the changes requested by an admin to the database.
:param args: The RequestArguments object passed to the pagelet.
:param user: The user to edit.
:param authuser: The user performing the modification.
:param db: The database facade where changes are written to.
:param config: The dictionary of config file entries from the [Pagelets] section.
"""
# Read the type of change requested by the admin, then switch over it
change = str(args.change)
# Admin requested deletion of the user
if change == 'del':
# Delete the user from the database
db.delete_user(user)
# Delete the user's avatar, if it exists
try:
abspath: str = os.path.join(os.path.abspath(config['UploadDir']), 'thumbnails/users/')
os.remove(os.path.join(abspath, f'{user.id}.png'))
except FileNotFoundError:
pass
# Admin requested update of the user's details
elif change == 'update':
# Only write a change if all properties of the user are present in the request arguments
if 'username' not in args or 'email' not in args or 'password' not in args or 'balance' not in args:
return
# Read the properties from the request arguments
username = str(args.username)
email = str(args.email)
password = str(args.password)
balance = parse_chf(str(args.balance))
is_member = 'ismember' in args
is_admin = 'isadmin' in args
# An empty e-mail field should be interpreted as NULL
if len(email) == 0:
email = None
# Attempt to write the changes to the database
try:
# If a password was entered, replace the password in the database
if len(password) > 0:
db.change_password(user, '', password, verify_password=False)
# Write the user detail changes
db.change_user(user, agent=authuser, name=username, email=email, is_member=is_member, is_admin=is_admin,
balance=balance)
except DatabaseConsistencyError:
return
# If a new avatar was uploaded, process it
if 'avatar' in args:
# Read the raw image data from the request
avatar = bytes(args.avatar)
# Only process the image, if its size is more than zero. Zero size means no new image was uploaded
if len(avatar) == 0:
return
# Detect the MIME type
filemagic: magic.FileMagic = magic.detect_from_content(avatar)
# Currently, only image/png is supported, don't process any other formats
if filemagic.mime_type != 'image/png':
# TODO: Optionally convert to png
return
# Create the absolute path of the upload directory
abspath: str = os.path.join(os.path.abspath(config['UploadDir']), 'thumbnails/users/')
os.makedirs(abspath, exist_ok=True)
# Write the image to the file
with open(os.path.join(abspath, f'{user.id}.png'), 'wb') as f:
f.write(avatar)

View file

@ -1,11 +1,8 @@
from typing import Any, Dict, Union
from typing import Any, Dict, Tuple, Union
import urllib.parse
from matemat.exceptions import AuthenticationError, HttpException from matemat.exceptions import AuthenticationError, HttpException
from matemat.webserver import pagelet, RequestArguments, PageletResponse, RedirectResponse, TemplateResponse from matemat.webserver import pagelet, RequestArguments, PageletResponse, RedirectResponse, TemplateResponse
from matemat.primitives import User from matemat.db.primitives import User
from matemat.db import MatematDatabase from matemat.db import MatematDatabase
@ -15,21 +12,35 @@ def touchkey_page(method: str,
args: RequestArguments, args: RequestArguments,
session_vars: Dict[str, Any], session_vars: Dict[str, Any],
headers: Dict[str, str], headers: Dict[str, str],
config: Dict[str, str])\ config: Dict[str, str]) \
-> Union[bytes, str, PageletResponse]: -> Union[bytes, str, PageletResponse]:
if 'user' in session_vars: """
The touchkey login mechanism. If called via GET, render the UI template; if called via POST, attempt to log in with
the provided credentials (username and touchkey).
"""
# If a user is already logged in, simply redirect to the main page, showing the product list
if 'authenticated_user' in session_vars:
return RedirectResponse('/') return RedirectResponse('/')
# If requested via HTTP GET, render the login page showing the touchkey UI
if method == 'GET': if method == 'GET':
return TemplateResponse('touchkey.html', return TemplateResponse('touchkey.html',
username=str(args.username) if 'username' in args else None, username=str(args.username), uid=int(str(args.uid)),
setupname=config['InstanceName']) setupname=config['InstanceName'])
# If requested via HTTP POST, read the request arguments and attempt to log in with the provided credentials
elif method == 'POST': elif method == 'POST':
# Connect to the database
with MatematDatabase(config['DatabaseFile']) as db: with MatematDatabase(config['DatabaseFile']) as db:
try: try:
# Read the request arguments and attempt to log in with them
user: User = db.login(str(args.username), touchkey=str(args.touchkey)) user: User = db.login(str(args.username), touchkey=str(args.touchkey))
except AuthenticationError: except AuthenticationError:
quoted = urllib.parse.quote_plus(bytes(args.username)) # Reload the touchkey login page on failure
return RedirectResponse(f'/touchkey?username={quoted}') return RedirectResponse(f'/touchkey?uid={str(args.uid)}&username={str(args.username)}')
session_vars['user'] = user # Set the user ID session variable
session_vars['authenticated_user'] = user.id
# Set the authlevel session variable (0 = none, 1 = touchkey, 2 = password login)
session_vars['authentication_level'] = 1
# Redirect to the main page, showing the product list
return RedirectResponse('/') return RedirectResponse('/')
# If neither GET nor POST was used, show a 405 Method Not Allowed error page
raise HttpException(405) raise HttpException(405)

View file

@ -119,10 +119,12 @@ class MockServer:
# Set up logger # Set up logger
self.logger: logging.Logger = logging.getLogger('matemat unit test') self.logger: logging.Logger = logging.getLogger('matemat unit test')
self.logger.setLevel(logging.DEBUG) self.logger.setLevel(logging.DEBUG)
# Disable logging
self.logger.addHandler(logging.NullHandler())
# Initalize a log handler to stderr and set the log format # Initalize a log handler to stderr and set the log format
sh: logging.StreamHandler = logging.StreamHandler() # sh: logging.StreamHandler = logging.StreamHandler()
sh.setFormatter(logging.Formatter('%(asctime)s %(name)s [%(levelname)s]: %(message)s')) # sh.setFormatter(logging.Formatter('%(asctime)s %(name)s [%(levelname)s]: %(message)s'))
self.logger.addHandler(sh) # self.logger.addHandler(sh)
class MockSocket(bytes): class MockSocket(bytes):

View file

@ -17,7 +17,6 @@ def serve_test_pagelet_str(method: str,
session_vars: Dict[str, Any], session_vars: Dict[str, Any],
headers: Dict[str, str], headers: Dict[str, str],
pagelet_variables: Dict[str, str]) -> Union[bytes, str, PageletResponse]: pagelet_variables: Dict[str, str]) -> Union[bytes, str, PageletResponse]:
headers['Content-Type'] = 'text/plain'
return 'serve test pagelet str' return 'serve test pagelet str'
@ -28,7 +27,7 @@ def serve_test_pagelet_bytes(method: str,
session_vars: Dict[str, Any], session_vars: Dict[str, Any],
headers: Dict[str, str], headers: Dict[str, str],
pagelet_variables: Dict[str, str]) -> Union[bytes, str, PageletResponse]: pagelet_variables: Dict[str, str]) -> Union[bytes, str, PageletResponse]:
headers['Content-Type'] = 'application/octet-stream' headers['Content-Type'] = 'application/x-foo-bar'
return b'serve\x80test\xffpagelet\xfebytes' return b'serve\x80test\xffpagelet\xfebytes'
@ -49,7 +48,6 @@ def serve_test_pagelet_template(method: str,
session_vars: Dict[str, Any], session_vars: Dict[str, Any],
headers: Dict[str, str], headers: Dict[str, str],
pagelet_variables: Dict[str, str]) -> Union[bytes, str, PageletResponse]: pagelet_variables: Dict[str, str]) -> Union[bytes, str, PageletResponse]:
headers['Content-Type'] = 'text/plain'
return TemplateResponse('test.txt', what='World') return TemplateResponse('test.txt', what='World')
@ -62,7 +60,6 @@ def serve_test_pagelet_fail(method: str,
headers: Dict[str, str], headers: Dict[str, str],
pagelet_variables: Dict[str, str]) -> Union[bytes, str, PageletResponse]: pagelet_variables: Dict[str, str]) -> Union[bytes, str, PageletResponse]:
session_vars['test'] = 'hello, world!' session_vars['test'] = 'hello, world!'
headers['Content-Type'] = 'text/plain'
raise HttpException(599, 'Error expected during unit testing') raise HttpException(599, 'Error expected during unit testing')
@ -81,6 +78,15 @@ class TestServe(AbstractHttpdTest):
with open(forbidden, 'w') as f: with open(forbidden, 'w') as f:
f.write('This should not be readable') f.write('This should not be readable')
os.chmod(forbidden, 0) os.chmod(forbidden, 0)
# Create a CSS resource whose MIME type should be detected by file extension
with open(os.path.join(self.tempdir.name, 'teststyle.css'), 'w') as f:
f.write('.ninja { display: none; }\n')
# Create a file without extension (containing UTF-16 text with BOM); libmagic should take over
with open(os.path.join(self.tempdir.name, 'testdata'), 'wb') as f:
f.write(b'\xFE\xFFH\x00e\x00l\x00l\x00o\x00,\x00 \x00w\x00o\x00r\x00l\x00d\x00!\x00\n\x00')
# Create a file that will yield "text/plain; charset=binary"
with open(os.path.join(self.tempdir.name, 'testbin.txt'), 'wb') as f:
f.write(b'\x00\x00\x00\x00\x00\x00\x00\x00')
def test_serve_pagelet_str(self): def test_serve_pagelet_str(self):
# Call the test pagelet that produces a 200 OK result # Call the test pagelet that produces a 200 OK result
@ -228,3 +234,41 @@ class TestServe(AbstractHttpdTest):
self.assertIsNone(packet.pagelet) self.assertIsNone(packet.pagelet)
# Make sure a 405 Method Not Allowed header is served # Make sure a 405 Method Not Allowed header is served
self.assertEqual(405, packet.statuscode) self.assertEqual(405, packet.statuscode)
def test_serve_pagelet_libmagic(self):
# The correct Content-Type header must be guessed, if a pagelet does not provide one
self.client_sock.set_request(b'GET /just/testing/serve_pagelet_str HTTP/1.1\r\n\r\n')
HttpHandler(self.client_sock, ('::1', 45678), self.server)
packet = self.client_sock.get_response()
self.assertEqual('text/plain; charset=us-ascii', packet.headers['Content-Type'])
def test_serve_pagelet_libmagic_skipped(self):
# The Content-Type set by a pagelet should not be overwritten
self.client_sock.set_request(b'GET /just/testing/serve_pagelet_bytes HTTP/1.1\r\n\r\n')
HttpHandler(self.client_sock, ('::1', 45678), self.server)
packet = self.client_sock.get_response()
self.assertEqual('application/x-foo-bar', packet.headers['Content-Type'])
def test_serve_static_mime_extension(self):
# The correct Content-Type should be guessed by file extension primarily
self.client_sock.set_request(b'GET /teststyle.css HTTP/1.1\r\n\r\n')
HttpHandler(self.client_sock, ('::1', 45678), self.server)
packet = self.client_sock.get_response()
# libmagic would say text/plain instead
self.assertEqual('text/css; charset=us-ascii', packet.headers['Content-Type'])
def test_serve_static_mime_magic(self):
# The correct Content-Type should be guessed by file extension primarily
self.client_sock.set_request(b'GET /testdata HTTP/1.1\r\n\r\n')
HttpHandler(self.client_sock, ('::1', 45678), self.server)
packet = self.client_sock.get_response()
# Extension-based would fail, as there is no extension
self.assertEqual('text/plain; charset=utf-16be', packet.headers['Content-Type'])
def test_serve_static_mime_magic_binary(self):
# The correct Content-Type should be guessed by file extension primarily
self.client_sock.set_request(b'GET /testbin.txt HTTP/1.1\r\n\r\n')
HttpHandler(self.client_sock, ('::1', 45678), self.server)
packet = self.client_sock.get_response()
# No charset should be in the header. Yes, this is a stupid example
self.assertEqual('text/plain', packet.headers['Content-Type'])

View file

@ -1 +1,2 @@
file-magic
jinja2 jinja2

5
run.sh Executable file
View file

@ -0,0 +1,5 @@
#!/bin/sh
export LD_PRELOAD=/usr/lib/libmagic.so.1
/usr/local/bin/python3 -m matemat /etc/matemat.conf /matemat.docker.conf

31
static/css/matemat.css Normal file
View file

@ -0,0 +1,31 @@
.thumblist-item {
display: inline-block;
margin: 10px;
padding: 10px;
background: #f0f0f0;
text-decoration: none;
}
.thumblist-item a {
text-decoration: none;
}
.thumblist-item .imgcontainer {
width: 150px;
height: 150px;
position: relative;
}
.thumblist-item .imgcontainer img {
max-width: 150px;
max-height: 150px;
position: absolute;
top: 50%;
left: 50%;
transform: translate(-50%, -50%);
}
.thumblist-title {
display: block;
font-weight: bolder;
}

268
static/js/touchkey.js Normal file
View file

@ -0,0 +1,268 @@
/**
* Initialize the touchkey setup.
*
* Requires an empty SVG container, and a HTML input tag (recommended: type="hidden") to write the string representation
* to. Can additionally be provided with the ID of a HTML form which should be auto-submitted after touchkey entry.
*
* Example:
*
* <form id="touchkey-form" method="post" action="/login">
* <svg id="touchkey-svg"></svg>
* <input type="hidden" name="touchkey" id="touchkey-field" />
* </form>
* <script>
* initTouchkey(false, "touchkey-svg", "touchkey-form", "touchkey-field");
* </script>
*
* @param {boolean} keepPattern: Whether to keep the pattern on screen, or to clear it after the end event.
* @param {string} svgid: HTML id of the SVG container the touchkey is drawn in.
* @param {string} formid: HTML id of the form that should be submitted after touchkey entry. null to disable
* auto-submit.
* @param {string} formfieldid: ID of the input object that should receive the entered touchkey as its value.
*/
initTouchkey = (keepPattern, svgid, formid, formfieldid) => {
// Define forEach (iteration) and slice (abused for shallow copying) on HTMLCollections (reuse the Array methods)
HTMLCollection.prototype.forEach = Array.prototype.forEach;
HTMLCollection.prototype.slice = Array.prototype.slice;
// Max. distance to a nodes center, before the path snaps to the node.
// Expressed as inverse ratio of the container width
const SNAPPING_SENSITIVITY = 12;
// Get the DOM objects for the SVG, the form and the input field
let svg = document.getElementById(svgid);
let form;
if (formid !== null) {
form = document.getElementById(formid);
}
let formfield = document.getElementById(formfieldid);
// Reference to the SVG line object that's currently being drawn by the user, or null, if none
let currentStroke = null;
// ID generator for the SVG line objects drawn by the user
let strokeId = 0;
// Set of touchkey pattern nodes that have already been visited by a users pattern, and may not be reused again
let doneMap = {};
// The string representation of the touchkey entered by the user.
let enteredKey = '';
/**
* Helper function to create a new stroke after the user completed one stroke by connecting two pattern nodes.
*
* @param {number} fromX: X coordinate of the starting point of this line.
* @param {number} fromY: Y coordinate of the starting point of this line.
* @param {number} toX: X coordinate of the ending point of this line.
* @param {number} toY: Y coordinate of the ending point of this line.
* @returns {string} The ID of the generated line object.
*/
let drawLine = (fromX, fromY, toX, toY) => {
// Create a new SVG line object
let line = document.createElementNS('http://www.w3.org/2000/svg', 'line');
// Generate and set an unique ID for the line object
let id = 'touchkey-svg-stroke-' + (strokeId++);
line.setAttribute('id', id);
// Create and set the HTML class attribute
line.setAttribute('class', 'touchkey-svg-stroke');
// Create and set the coordinate attributes
line.setAttribute('x1', fromX.toString());
line.setAttribute('y1', fromY.toString());
line.setAttribute('x2', toX.toString());
line.setAttribute('y2', toY.toString());
// Create and set the style attribute (grey, circular ends, 5% width)
line.setAttribute('style', 'stroke: grey; stroke-width: 5%; stroke-linecap: round');
// Add the line to the SVG
svg.appendChild(line);
// Return the previously generated ID to identify the line object by
return id;
};
/**
* Helper function used to remove the "trailing stroke" (i.e. after the user let go, there is a dangling stroke from
* the node that was hit last to the mouse pointer/finger).
*/
let endPath = () => {
// Remove the current stroke ...
svg.removeChild(svg.getElementById(currentStroke));
// ... and set its reference to null
currentStroke = null;
};
/**
* Helper function used to clear the touchkey pattern drawn by the user.
*/
let clearTouchkey = () => {
// Reset the set of visited pattern nodes
doneMap = {};
// Reset the touchkey string representation
enteredKey = '';
// Remove all line objects. Create a shallow copy of the list first to retain deletion order.
svg.getElementsByClassName('touchkey-svg-stroke').slice().forEach((line) => {
svg.removeChild(line);
});
};
/**
* Helper function to read and convert the event coordinates of a MouseEvent or TouchEvent to coordinates relative
* to the SVG's origin.
*
* @param {(MouseEvent|TouchEvent)} ev: The event to get the X and Y coordinates from.
* @param {(ClientRect|DOMRect)} svgrect: Bounds rectangle of the SVG container.
* @returns {Array} The X and Y coordinates relative to the SVG's origin.
*/
let getEventCoordinates = (ev, svgrect) => {
// Check for existence of the "touches" property to distinguish between touch and mouse events
// For a touch event, take the page coordinates of the first touch
// For a mouse event, take the event coordinates
// Then subtract the SVG's origin from the page-relative coordinates to obtain the translated coordinates
const trX = (typeof ev.touches !== 'undefined' ? ev.touches[0].pageX : ev.x) - svgrect.left;
const trY = (typeof ev.touches !== 'undefined' ? ev.touches[0].pageY : ev.y) - svgrect.top;
return [trX, trY]
};
/**
* Find the pattern node closest to a coordinate.
*
* @param {number} evX: X coordinate of the point to search the closest node for.
* @param {number} evY: Y coordinate of the point to search the closest node for.
* @param {(ClientRect|DOMRect)} svgrect: Bounds rectangle of the SVG container.
* @returns {Array} The node's ID, the distance, the X and Y coordinate of the node's center.
*/
let getNearestPatternNode = (evX, evY, svgrect) => {
// Initialize nearest neighbors search variables
let minId = '';
let minDist2 = Infinity; // Squared distance
let minX = 0;
let minY = 0;
// Iterate the pattern nodes for nearest neighbors search
document.getElementsByClassName('touchkey-svg-node').forEach((node) => {
// Get the position of a node's center, converted from ratio into absolute pixel count
let x = parseFloat(node.getAttribute('cx')) / 100.0 * svgrect.width;
let y = parseFloat(node.getAttribute('cy')) / 100.0 * svgrect.height;
// Compute the squared distance from the event coordinate to the node's center
let dist2 = Math.pow(evX - x, 2) + Math.pow(evY - y, 2);
// Keep the properties of the closest node
if (dist2 < minDist2) {
minDist2 = dist2;
minId = node.dataset.stringRepresentation;
minX = x;
minY = y;
}
});
return [minId, Math.sqrt(minDist2), minX, minY];
};
/**
* Event handler for "mouse down" / "touch down" events.
*
* Selects an "anchor node", i.e. the node where the pattern path started, and draws a line from there to the event
* coordinates.
*/
svg.ontouchstart = svg.onmousedown = (ev) => {
// Remove any previous strokes that may still be there if "keepPattern" was set to true in the init call
clearTouchkey();
// Get the SVG container's rectangle
const svgrect = svg.getBoundingClientRect();
// Get the event coordinates relative to the SVG container's origin
const [trX, trY] = getEventCoordinates(ev, svgrect);
// Get the closest pattern node
const [minId, _, minX, minY] = getNearestPatternNode(trX, trY, svgrect);
// Create the line from the anchor node to the event position
currentStroke = drawLine(minX, minY, trX, trY);
// Mark the anchor node as visited
doneMap[minId] = 1;
// Add the anchor node's string representation to the touchkey string representation
enteredKey += minId;
};
/**
* Event handler for "mouse move" / "touch move" events.
*/
svg.ontouchmove = svg.onmousemove = (ev) => {
// Only act if the user started is drawing a pattern (only relevant for mouse input)
if (currentStroke != null) {
// Get the SVG container's rectangle
const svgrect = svg.getBoundingClientRect();
// Get the event coordinates relative to the SVG container's origin
const [trX, trY] = getEventCoordinates(ev, svgrect);
// Get the closest pattern node
const [minId, minDist, minX, minY] = getNearestPatternNode(trX, trY, svgrect);
// If the closest node is not visited yet, and the event coordinate is less than from the node's
// center, snap the current stroke to the node, and create a new stroke starting from this node
if (minDist < (svgrect.width / SNAPPING_SENSITIVITY) && !(minId in doneMap)) {
// Snap the current stroke to the node
let line = svg.getElementById(currentStroke);
line.setAttribute('x2', minX.toString());
line.setAttribute('y2', minY.toString());
// Create a new line object from the closest node to the event position
currentStroke = drawLine(minX, minY, trX, trY);
// Mark the closest node as visited
doneMap[minId] = 1;
// Append its string representation to the touchkey string representation
enteredKey += minId;
} else {
// If the stroke was not snapped to the closest node, update its end position
let line = svg.getElementById(currentStroke);
line.setAttribute('x2', trX);
line.setAttribute('y2', trY);
}
}
};
/**
* Event handler for "mouse up" / "touch end" events.
*
* Sets the input object value, and optionally clears the SVG path and submits the form.
*/
svg.ontouchend = svg.onmouseup = () => {
// Remove the trailing, unfinished stroke
endPath();
// Write the touchkey string representation to the input field
formfield.value = enteredKey;
// Erase the touchkey pattern, if requested in the init call
if (keepPattern !== true) {
clearTouchkey();
}
// Submit the HTML form, if requested in the init call
if (formid !== null) {
form.submit();
}
};
/*
* Create the SVG touchkey nodes
*/
// Node ID provider
let touchkey_node_counter = 0;
// Percentages for centers of quarters of the container's width and height
['12.5%', '37.5%', '62.5%', '87.5%'].forEach((y) => {
['12.5%', '37.5%', '62.5%', '87.5%'].forEach((x) => {
// Create a new pattern node
let node = document.createElementNS('http://www.w3.org/2000/svg', 'circle');
// Generate a new ID (and the touchkey string representation from it)
let id = touchkey_node_counter++;
node.dataset.stringRepresentation = id.toString(16).toLowerCase();
// Class
node.setAttribute('class', 'touchkey-svg-node');
// Center
node.setAttribute('cx', x);
node.setAttribute('cy', y);
// Radius
node.setAttribute('r', '10%');
// Center color
node.setAttribute('fill', 'white');
// Circle color
node.setAttribute('stroke', 'grey');
// Circle width
node.setAttribute('stroke-width', '2%');
// Add the node to the SVG container
svg.appendChild(node);
});
});
};

25
templates/admin.html Normal file
View file

@ -0,0 +1,25 @@
{% extends "base.html" %}
{% block header %}
{# If the logged in user is an administrator, call the title "Administration", otherwise "Settings" #}
{% if authuser.is_admin %}
<h1>Administration</h1>
{% else %}
<h1>Settings</h1>
{% endif %}
{{ super() }}
{% endblock %}
{% block main %}
{# Always show the settings a user can edit for itself #}
{% include "admin_all.html" %}
{# Only show the "restricted" section if the user is an admin #}
{% if authuser.is_admin %}
{% include "admin_restricted.html" %}
{% endif %}
{{ super() }}
{% endblock %}

68
templates/admin_all.html Normal file
View file

@ -0,0 +1,68 @@
<section id="admin-myaccount">
<h2>My Account</h2>
<form id="admin-myaccount-form" method="post" action="/admin?change=account" accept-charset="UTF-8">
<label for="admin-myaccount-username">Username: </label>
<input id="admin-myaccount-username" type="text" name="username" value="{{ authuser.name }}" /><br/>
<label for="admin-myaccount-email">E-Mail: </label>
<input id="admin-myaccount-email" type="text" name="email" value="{% if authuser.email is not none %}{{ authuser.email }}{% endif %}" /><br/>
<label for="admin-myaccount-ismember">Member: </label>
<input id="admin-myaccount-ismember" type="checkbox" disabled="disabled" {% if authuser.is_member %} checked="checked" {% endif %}/><br/>
<label for="admin-myaccount-isadmin">Admin: </label>
<input id="admin-myaccount-isadmin" type="checkbox" disabled="disabled" {% if authuser.is_admin %} checked="checked" {% endif %}/><br/>
<input type="submit" value="Save changes" />
</form>
</section>
<section id="admin-avatar">
<h2>Avatar</h2>
<form id="admin-avatar-form" method="post" action="/admin?change=avatar" enctype="multipart/form-data" accept-charset="UTF-8">
<img src="/upload/thumbnails/users/{{ authuser.id }}.png" alt="Avatar of {{ authuser.name }}" /><br/>
<label for="admin-avatar-avatar">Upload new file: </label>
<input id="admin-avatar-avatar" type="file" name="avatar" accept="image/png" /><br/>
<input type="submit" value="Save changes" />
</form>
</section>
<section id="admin-password">
<h2>Password</h2>
<form id="admin-password-form" method="post" action="/admin?change=password" accept-charset="UTF-8">
<label for="admin-password-oldpass">Current password: </label>
<input id="admin-password-oldpass" type="password" name="oldpass" /><br/>
<label for="admin-password-newpass">New password: </label>
<input id="admin-password-newpass" type="password" name="newpass" /><br/>
<label for="admin-password-newpass2">Repeat password: </label>
<input id="admin-password-newpass2" type="password" name="newpass2" /><br/>
<input type="submit" value="Save changes" />
</form>
</section>
<section id="admin-touchkey">
<h2>Touchkey</h2>
<form id="admin-touchkey-form" method="post" action="/admin?change=touchkey" accept-charset="UTF-8">
Draw a new touchkey (leave empty to disable):
<br/>
<svg id="touchkey-svg" width="400" height="400"></svg>
<br/>
<input id="admin-touchkey-touchkey" type="hidden" name="touchkey" value="" />
<input type="submit" value="Save changes" />
</form>
<script src="/js/touchkey.js" ></script>
<script>
initTouchkey(true, 'touchkey-svg', null, 'admin-touchkey-touchkey');
</script>
</section>

View file

@ -0,0 +1,90 @@
<section id="admin-restricted-newuser">
<h2>Create New User</h2>
<form id="admin-newuser-form" method="post" action="/admin?adminchange=newuser" accept-charset="UTF-8">
<label for="admin-newuser-username">Username: </label>
<input id="admin-newuser-username" type="text" name="username" /><br/>
<label for="admin-newuser-email">E-Mail (optional): </label>
<input id="admin-newuser-email" type="text" name="email" /><br/>
<label for="admin-newuser-password">Password: </label>
<input id="admin-newuser-password" type="password" name="password" /><br/>
<label for="admin-newuser-ismember">Member: </label>
<input id="admin-newuser-ismember" type="checkbox" name="ismember" /><br/>
<label for="admin-newuser-isadmin">Admin: </label>
<input id="admin-newuser-isadmin" type="checkbox" name="isadmin" /><br/>
<input type="submit" value="Create User" />
</form>
</section>
<section id="admin-restricted-moduser">
<h2>Modify User</h2>
<form id="admin-moduser-form" method="get" action="/moduser" accept-charset="UTF-8">
<label for="admin-moduser-userid">Username: </label>
<select id="admin-moduser-userid" name="userid">
{% for user in users %}
<option value="{{ user.id }}">{{ user.name }}</option>
{% endfor %}
</select><br/>
<input type="submit" value="Go" />
</form>
</section>
<section id="admin-restricted-newproduct">
<h2>Create New Product</h2>
<form id="admin-newproduct-form" method="post" action="/admin?adminchange=newproduct" enctype="multipart/form-data" accept-charset="UTF-8">
<label for="admin-newproduct-name">Name: </label>
<input id="admin-newproduct-name" type="text" name="name" /><br/>
<label for="admin-newproduct-price-member">Member price: </label>
<input id="admin-newproduct-price-member" type="number" min="0" name="pricemember" /><br/>
<label for="admin-newproduct-price-non-member">Non-member price: </label>
<input id="admin-newproduct-price-non-member" type="number" min="0" name="pricenonmember" /><br/>
<label for="admin-newproduct-image">Image: </label>
<input id="admin-newproduct-image" type="file" accept="image/png" /><br/>
<input type="submit" value="Create Product" />
</form>
</section>
<section id="admin-restricted-restock">
<h2>Restock Product</h2>
<form id="admin-restock-form" method="post" action="/admin?adminchange=restock" accept-charset="UTF-8">
<label for="admin-restock-productid">Product: </label>
<select id="admin-restock-productid" name="productid">
{% for product in products %}
<option value="{{ product.id }}">{{ product.name }} ({{ product.stock }})</option>
{% endfor %}
</select><br/>
<label for="admin-restock-amount">Amount: </label>
<input id="admin-restock-amount" type="number" min="0" name="amount" /><br/>
<input type="submit" value="Restock" />
</form>
</section>
<section id="admin-restricted-modproduct">
<h2>Modify Product</h2>
<form id="admin-modproduct-form" method="get" action="/modproduct" accept-charset="UTF-8">
<label for="admin-modproduct-productid">Product: </label>
<select id="admin-modproduct-productid" name="productid">
{% for product in products %}
<option value="{{ product.id }}">{{ product.name }}</option>
{% endfor %}
</select><br/>
<input type="submit" value="Go">
</form>
</section>

View file

@ -1,17 +1,55 @@
<!DOCTYPE html> <!DOCTYPE html>
<html> <html>
<head>
<title>{{ setupname }}</title> <head>
<style> {% block head %}
body { {# Show the setup name, as set in the config file, as tab title. Don't escape HTML entities. #}
color: #f0f0f0; <title>{{ setupname|safe }}</title>
background: #000000; <link rel="stylesheet" href="/css/matemat.css"/>
}
</style>
</head>
<body>
<h1>{{ setupname }}</h1>
{% block main %}
{% endblock %} {% endblock %}
</body> </head>
<body>
<header>
{% block header %}
{# Always show a link to the home page, either a list of users or of products. #}
<a href="/">Home</a>
{# Show a link to the settings, if a user logged in via password (authlevel 2). #}
{% if authlevel|default(0) > 1 %}
{% if authuser is defined %}
{# If a user is an admin, call the link "Administration". Otherwise, call it "Settings". #}
{% if authuser.is_admin %}
<a href="/admin">Administration</a>
{% else %}
<a href="/admin">Settings</a>
{% endif %}
{% endif %}
{% endif %}
{% endblock %}
</header>
<main>
{% block main %}
{# Here be content. #}
{% endblock %}
</main>
<footer>
{% block footer %}
{# Show some information in the footer, e.g. the instance name, the version, and copyright info. #}
<ul>
<li> {{ setupname|safe }}
<li> Matemat {{ __version__ }}
<li> &copy; 2018 s3lph
<li> MIT License
{# This used to be a link to the GitLab repo. However, users of the testing environment always clicked
that link and couldn't come back, because the UI was running in touch-only kiosk mode. #}
<li> gitlab.com/s3lph/matemat
</ul>
{% endblock %}
</footer>
</body>
</html> </html>

View file

@ -1,9 +1,23 @@
{% extends "base.html" %} {% extends "base.html" %}
{% block header %}
<h1>Welcome</h1>
{{ super() }}
{% endblock %}
{% block main %} {% block main %}
<form action="/login" method="post">
Username: <input type="text" name="username"/><br/> {# Show a username/password login form #}
Password: <input type="password" name="password" /><br/> <form method="post" action="/login" id="loginform" accept-charset="UTF-8">
<input type="submit" value="Login"/> <label for="login-username">Username: </label>
<input id="login-username" type="text" name="username"/><br/>
<label for="login-password">Password: </label>
<input id="login-password" type="password" name="password"/><br/>
<input type="submit" value="Login">
</form> </form>
{% endblock%}
{{ super() }}
{% endblock %}

View file

@ -1,22 +0,0 @@
{% extends "base.html" %}
{% block main %}
{{ user|default("") }}
<ul>
{% if user is defined %}
{% for l in list %}
<li/> <b>{{ l.name }}</b>
{% if user.is_member %}
{{ l.price_member }}
{% else %}
{{ l.price_non_member }}
{% endif %}
{% endfor %}
{% else %}
{% for l in list %}
<li/> <b><a href="/touchkey?username={{ l.name }}">{{ l.name }}</a></b>
{% endfor %}
<li/> <a href="/login">Password login</a>
{% endif %}
</ul>
{% endblock %}

45
templates/modproduct.html Normal file
View file

@ -0,0 +1,45 @@
{% extends "base.html" %}
{% block header %}
<h1>Administration</h1>
{{ super() }}
{% endblock %}
{% block main %}
<section id="modproduct">
<h2>Modify {{ product.name }}</h2>
<form id="modproduct-form" method="post" action="/modproduct?change=update" enctype="multipart/form-data" accept-charset="UTF-8">
<label for="modproduct-name">Name: </label>
<input id="modproduct-name" type="text" name="name" value="{{ product.name }}" /><br/>
<label for="modproduct-price-member">Member price: </label>
CHF <input id="modproduct-price-member" type="number" step="0.01" name="pricemember" value="{{ product.price_member|chf(False) }}" /><br/>
<label for="modproduct-price-non-member">Non-member price: </label>
CHF <input id="modproduct-price-non-member" type="number" step="0.01" name="pricenonmember" value="{{ product.price_non_member|chf(False) }}" /><br/>
<label for="modproduct-balance">Stock: </label>
<input id="modproduct-balance" name="stock" type="text" value="{{ product.stock }}" /><br/>
<label for="modproduct-image">
<img height="150" src="/upload/thumbnails/products/{{ product.id }}.png" alt="Image of {{ product.name }}" />
</label><br/>
<input id="modproduct-image" type="file" name="image" accept="image/png" /><br/>
<input id="modproduct-productid" type="hidden" name="productid" value="{{ product.id }}" /><br/>
<input type="submit" value="Save changes">
</form>
<form id="modproduct-delproduct-form" method="post" action="/modproduct?change=del" accept-charset="UTF-8">
<input id="modproduct-delproduct-productid" type="hidden" name="productid" value="{{ product.id }}" /><br/>
<input type="submit" value="Delete product" />
</form>
</section>
{{ super() }}
{% endblock %}

51
templates/moduser.html Normal file
View file

@ -0,0 +1,51 @@
{% extends "base.html" %}
{% block header %}
<h1>Administration</h1>
{{ super() }}
{% endblock %}
{% block main %}
<section id="moduser-account">
<h2>Modify {{ user.name }}</h2>
<form id="moduser-account-form" method="post" action="/moduser?change=update" enctype="multipart/form-data" accept-charset="UTF-8">
<label for="moduser-account-username">Username: </label>
<input id="moduser-account-username" type="text" name="username" value="{{ user.name }}" /><br/>
<label for="moduser-account-email">E-Mail: </label>
<input id="moduser-account-email" type="text" name="email" value="{% if user.email is not none %}{{ user.email }}{% endif %}" /><br/>
<label for="moduser-account-password">Password: </label>
<input id="moduser-account-password" type="password" name="password" /><br/>
<label for="moduser-account-ismember">Member: </label>
<input id="moduser-account-ismember" name="ismember" type="checkbox" {% if user.is_member %} checked="checked" {% endif %}/><br/>
<label for="moduser-account-isadmin">Admin: </label>
<input id="moduser-account-isadmin" name="isadmin" type="checkbox" {% if user.is_admin %} checked="checked" {% endif %}/><br/>
<label for="moduser-account-balance">Balance: </label>
CHF <input id="moduser-account-balance" name="balance" type="number" step="0.01" value="{{ user.balance|chf(False) }}" /><br/>
<label for="moduser-account-avatar">
<img height="150" src="/upload/thumbnails/users/{{ user.id }}.png" alt="Avatar of {{ user.name }}" />
</label><br/>
<input id="moduser-account-avatar" type="file" name="avatar" accept="image/png" /><br/>
<input id="moduser-account-userid" type="hidden" name="userid" value="{{ user.id }}" /><br/>
<input type="submit" value="Save changes">
</form>
<form id="moduser-deluser-form" method="post" action="/moduser?change=del" accept-charset="UTF-8">
<input id="moduser-deluser-userid" type="hidden" name="userid" value="{{ user.id }}" /><br/>
<input type="submit" value="Delete user" />
</form>
</section>
{{ super() }}
{% endblock %}

View file

@ -0,0 +1,42 @@
{% extends "base.html" %}
{% block header %}
{# Show the username. #}
<h1>Welcome, {{ authuser.name }}</h1>
{{ super() }}
{% endblock %}
{% block main %}
{# Show the users current balance #}
Your balance: {{ authuser.balance|chf }}
<br/>
{# Links to deposit two common amounts of cash. TODO: Will be replaced by a nicer UI later (#20) #}
<a href="/deposit?n=100">Deposit CHF 1</a><br/>
<a href="/deposit?n=1000">Deposit CHF 10</a><br/>
{% for product in products %}
{# Show an item per product, consisting of the name, image, price and stock, triggering a purchase on click #}
<div class="thumblist-item">
<a href="/buy?pid={{ product.id }}">
<span class="thumblist-title">{{ product.name }}</span>
<span class="thumblist-detail">Price:
{% if authuser.is_member %}
{{ product.price_member|chf }}
{% else %}
{{ product.price_non_member|chf }}
{% endif %}
; Stock: {{ product.stock }}</span><br/>
<div class="imgcontainer">
<img src="/upload/thumbnails/products/{{ product.id }}.png" alt="Picture of {{ product.name }}"/>
</div>
</a>
</div>
{% endfor %}
<br/>
{# Logout link #}
<a href="/logout">Logout</a>
{{ super() }}
{% endblock %}

View file

@ -1,9 +1,36 @@
{% extends "base.html" %} {% extends "base.html" %}
{% block main %} {% block head %}
<form action="/touchkey" method="post"> {{ super() }}
<input type="hidden" name="username" value="{{ username }}"/><br/> <style>
Touchkey: <input type="password" name="touchkey" /><br/> svg {
<input type="submit" value="Login"/> width: 600px;
</form> height: auto;
}
</style>
{% endblock %}
{% block header %}
<h1>Welcome, {{ username }}</h1>
{{ super() }}
{% endblock %}
{% block main %}
<svg id="touchkey-svg" width="400" height="400"></svg>
<form method="post" action="/touchkey" id="loginform" accept-charset="UTF-8">
<input type="hidden" name="uid" value="{{ uid }}" />
<input type="hidden" name="username" value="{{ username }}" />
<input type="hidden" name="touchkey" value="" id="loginform-touchkey-value" />
</form>
<a href="/">Cancel</a>
<script src="/js/touchkey.js"></script>
<script>
initTouchkey(false, 'touchkey-svg', 'loginform', 'loginform-touchkey-value');
</script>
{{ super() }}
{% endblock %} {% endblock %}

29
templates/userlist.html Normal file
View file

@ -0,0 +1,29 @@
{% extends "base.html" %}
{% block header %}
{# Show the setup name, as set in the config file, as page title. Don't escape HTML entities. #}
<h1>{{ setupname|safe }}</h1>
{{ super() }}
{% endblock %}
{% block main %}
{% for user in users %}
{# Show an item per user, consisting of the username, and the avatar, linking to the touchkey login #}
<div class="thumblist-item">
<a href="/touchkey?uid={{ user.id }}&username={{ user.name }}">
<span class="thumblist-title">{{ user.name }}</span><br/>
<div class="imgcontainer">
<img src="/upload/thumbnails/users/{{ user.id }}.png" alt="Avatar of {{ user.name }}"/>
</div>
</a>
</div>
{% endfor %}
<br/>
{# Link to the password login #}
<a href="/login">Password login</a>
{{ super() }}
{% endblock %}

11
testing/Dockerfile Normal file
View file

@ -0,0 +1,11 @@
FROM debian:buster
RUN useradd -d /home/matemat -m matemat
RUN mkdir -p /var/matemat/db && chown matemat:matemat -R /var/matemat/db
RUN mkdir -p /var/matemat/upload && chown matemat:matemat -R /var/matemat/upload
RUN apt-get update -qy
RUN apt-get install -y --no-install-recommends file sudo openssh-client git docker.io python3-dev python3-pip python3-coverage python3-setuptools build-essential
RUN pip3 install wheel pycodestyle mypy
WORKDIR /home/matemat

147
touchkey.html Normal file
View file

@ -0,0 +1,147 @@
<!DOCTYPE html>
<html>
<head>
<style>
svg {
width: 600px;
height: auto;
}
</style>
</head>
<body>
<h1>Welcome, {{username}}</h1>
<svg id="svg" width="400" height="400">
<circle class="c" id="0" cx="12.5%" cy="12.5%" r="10%" stroke="grey" stroke-width="2%" fill="white" />
<circle class="c" id="1" cx="37.5%" cy="12.5%" r="10%" stroke="grey" stroke-width="2%" fill="white" />
<circle class="c" id="2" cx="62.5%" cy="12.5%" r="10%" stroke="grey" stroke-width="2%" fill="white" />
<circle class="c" id="3" cx="87.5%" cy="12.5%" r="10%" stroke="grey" stroke-width="2%" fill="white" />
<circle class="c" id="4" cx="12.5%" cy="37.5%" r="10%" stroke="grey" stroke-width="2%" fill="white" />
<circle class="c" id="5" cx="37.5%" cy="37.5%" r="10%" stroke="grey" stroke-width="2%" fill="white" />
<circle class="c" id="6" cx="62.5%" cy="37.5%" r="10%" stroke="grey" stroke-width="2%" fill="white" />
<circle class="c" id="7" cx="87.5%" cy="37.5%" r="10%" stroke="grey" stroke-width="2%" fill="white" />
<circle class="c" id="8" cx="12.5%" cy="62.5%" r="10%" stroke="grey" stroke-width="2%" fill="white" />
<circle class="c" id="9" cx="37.5%" cy="62.5%" r="10%" stroke="grey" stroke-width="2%" fill="white" />
<circle class="c" id="a" cx="62.5%" cy="62.5%" r="10%" stroke="grey" stroke-width="2%" fill="white" />
<circle class="c" id="b" cx="87.5%" cy="62.5%" r="10%" stroke="grey" stroke-width="2%" fill="white" />
<circle class="c" id="c" cx="12.5%" cy="87.5%" r="10%" stroke="grey" stroke-width="2%" fill="white" />
<circle class="c" id="d" cx="37.5%" cy="87.5%" r="10%" stroke="grey" stroke-width="2%" fill="white" />
<circle class="c" id="e" cx="62.5%" cy="87.5%" r="10%" stroke="grey" stroke-width="2%" fill="white" />
<circle class="c" id="f" cx="87.5%" cy="87.5%" r="10%" stroke="grey" stroke-width="2%" fill="white" />
</svg>
<script>
HTMLCollection.prototype.forEach = Array.prototype.forEach;
HTMLCollection.prototype.slice = Array.prototype.slice;
mouseDown = false;
currentStroke = null;
strokeId = 0;
doneMap = {};
enteredKey = '';
svg = document.getElementById('svg');
drawLine = (fromX, fromY, toX, toY) => {
var line = document.createElementNS('http://www.w3.org/2000/svg', 'line');
var id = 'l-' + (strokeId++);
var idAttr = document.createAttribute('id');
var classAttr = document.createAttribute('class');
var x1attr = document.createAttribute('x1');
var y1attr = document.createAttribute('y1');
var x2attr = document.createAttribute('x2');
var y2attr = document.createAttribute('y2');
var styleAttr = document.createAttribute('style');
idAttr.value = id;
classAttr.value = 'l';
x1attr.value = fromX;
y1attr.value = fromY;
x2attr.value = toX;
y2attr.value = toY;
styleAttr.value = 'stroke: grey; stroke-width: 5%; stroke-linecap: round';
line.setAttributeNode(idAttr);
line.setAttributeNode(classAttr);
line.setAttributeNode(x1attr);
line.setAttributeNode(y1attr);
line.setAttributeNode(x2attr);
line.setAttributeNode(y2attr);
line.setAttributeNode(styleAttr);
svg.appendChild(line);
return id;
};
svg.onmousedown = (ev) => {
var svgrect = svg.getBoundingClientRect();
var minId = '';
var minDist = Infinity;
var minx = 0;
var miny = 0;
doneMap = {};
document.getElementsByClassName('c').forEach((circle) => {
var x = parseFloat(circle.getAttribute('cx'))/100.0 * svgrect.width;
var y = parseFloat(circle.getAttribute('cy'))/100.0 * svgrect.height;
var dist = Math.pow(ev.offsetX - x, 2) + Math.pow(ev.offsetY - y, 2);
if (dist < minDist) {
minDist = dist;
minId = circle.id;
minx = x;
miny = y;
}
});
var minNode = svg.getElementById(minId);
currentStroke = drawLine(minx, miny, ev.offsetX, ev.offsetY);
doneMap[minId] = 1;
enteredKey += minId;
};
svg.onmouseup = (ev) => {
currentStroke = null;
doneMap = {};
console.log(enteredKey);
enteredKey = '';
svg.getElementsByClassName('l').slice().reverse().forEach((line) => {
svg.removeChild(line);
});
};
svg.onmousemove = (ev) => {
if (currentStroke != null) {
var svgrect = svg.getBoundingClientRect();
var minId = '';
var minDist = Infinity;
var minx = 0;
var miny = 0;
document.getElementsByClassName('c').forEach((circle) => {
var x = parseFloat(circle.getAttribute('cx'))/100.0 * svgrect.width;
var y = parseFloat(circle.getAttribute('cy'))/100.0 * svgrect.height;
var dist = Math.pow(ev.offsetX - x, 2) + Math.pow(ev.offsetY - y, 2);
if (dist < minDist) {
minDist = dist;
minId = circle.id;
minx = x;
miny = y;
}
});
if (minDist < 2000 && !(minId in doneMap)) {
var line = svg.getElementById(currentStroke);
line.setAttribute('x2', minx);
line.setAttribute('y2', miny);
currentStroke = drawLine(minx, miny, ev.offsetX, ev.offsetY);
doneMap[minId] = 1;
enteredKey += minId;
}
var line = svg.getElementById(currentStroke);
line.setAttribute('x2', ev.offsetX);
line.setAttribute('y2', ev.offsetY);
}
};
</script>
</body>
</html>