feat: add barcode login feature
All checks were successful
/ build_wheel (push) Successful in 2m4s
/ build_debian (push) Successful in 3m2s
/ test (push) Successful in 1m26s
/ codestyle (push) Successful in 31m13s

This commit is contained in:
s3lph 2024-12-01 21:58:16 +01:00
parent 66f23f5dda
commit 418cff7348
Signed by: s3lph
GPG key ID: 0AA29A52FB33CFB5
14 changed files with 363 additions and 7 deletions

View file

@ -1,5 +1,18 @@
# Matemat Changelog # Matemat Changelog
<!-- BEGIN RELEASE v0.3.17 -->
## Version 0.3.17
Add barcode login feature
### Changes
<!-- BEGIN CHANGES 0.3.17 -->
- feat: add barcode login feature
<!-- END CHANGES 0.3.17 -->
<!-- END RELEASE v0.3.17 -->
<!-- BEGIN RELEASE v0.3.16 --> <!-- BEGIN RELEASE v0.3.16 -->
## Version 0.3.16 ## Version 0.3.16

View file

@ -1,2 +1,2 @@
__version__ = '0.3.16' __version__ = '0.3.17'

View file

@ -5,7 +5,7 @@ import crypt
from hmac import compare_digest from hmac import compare_digest
from datetime import datetime, UTC from datetime import datetime, UTC
from matemat.db.primitives import User, Product, ReceiptPreference, Receipt, \ from matemat.db.primitives import User, Token, Product, ReceiptPreference, Receipt, \
Transaction, Consumption, Deposit, Modification Transaction, Consumption, Deposit, Modification
from matemat.exceptions import AuthenticationError, DatabaseConsistencyError from matemat.exceptions import AuthenticationError, DatabaseConsistencyError
from matemat.db import DatabaseWrapper from matemat.db import DatabaseWrapper
@ -358,6 +358,80 @@ class MatematDatabase(object):
raise DatabaseConsistencyError( raise DatabaseConsistencyError(
f'delete_user should affect 1 users row, but affected {affected}') f'delete_user should affect 1 users row, but affected {affected}')
def list_tokens(self, uid: int) -> List[Token]:
tokens: List[Token] = []
with self.db.transaction(exclusive=False) as c:
for row in c.execute('''
SELECT token_id, user_id, token, name, date
FROM tokens
WHERE user_id = :user_id
ORDER BY date
''', {'user_id': uid}):
token_id, user_id, token, name, date = row
tokens.append(
Token(token_id, user_id, token, name, datetime.fromtimestamp(date, UTC)))
return tokens
def get_token(self, tid: int) -> List[Token]:
with self.db.transaction(exclusive=False) as c:
c.execute('''
SELECT token_id, user_id, token, name, date
FROM tokens
WHERE token_id = :token_id
''', {'token_id': tid})
token_id, user_id, token, name, date = c.fetchone()
return Token(token_id, user_id, token, name, datetime.fromtimestamp(date, UTC))
def tokenlogin(self, token: str) -> User:
"""
Validte a user's token and return a User object on success.
:param token: The token to log in with.
:return: A tuple of (User, Token).
:raises ValueError: If none or both of password and touchkey are provided.
:raises AuthenticationError: If the user does not exist or the password or touchkey is wrong.
"""
if token is None:
raise ValueError('token must be provided')
with self.db.transaction(exclusive=False) as c:
c.execute('''
SELECT token_id, user_id, token, name, date
FROM tokens
WHERE token = :token
''', {'token': token})
row = c.fetchone()
if row is None:
raise AuthenticationError('Token does not exist')
token_id, user_id, token, name, date = row
user = self.get_user(user_id)
return user, Token(token_id, user_id, token, name, datetime.fromtimestamp(date, UTC))
def add_token(self, user: User, token: str, name: str):
with self.db.transaction() as c:
if name is None:
name = token[:3] + (len(token) - 3) * '*'
c.execute('''
INSERT INTO tokens (user_id, token, name)
VALUES (:user_id, :token, :name)
''', {
'user_id': user.id,
'token': token,
'name': name
})
c.execute('SELECT last_insert_rowid()')
token_id = int(c.fetchone()[0])
return Token(token_id, user.id, token, name)
def delete_token(self, token: Token):
with self.db.transaction() as c:
c.execute('''
DELETE FROM tokens
WHERE token_id = :token_id
''', {'token_id': token.id})
affected = c.execute('SELECT changes()').fetchone()[0]
if affected != 1:
raise DatabaseConsistencyError(
f'delete_token should affect 1 token row, but affected {affected}')
def list_products(self) -> List[Product]: def list_products(self) -> List[Product]:
""" """
Return a list of products in the database. Return a list of products in the database.

View file

@ -295,3 +295,17 @@ def migrate_schema_7_to_8(c: sqlite3.Cursor):
c.execute(''' c.execute('''
CREATE UNIQUE INDEX _matemat_products_ean_unique ON products(ean) CREATE UNIQUE INDEX _matemat_products_ean_unique ON products(ean)
''') ''')
def migrate_schema_8_to_9(c: sqlite3.Cursor):
c.execute('''
CREATE TABLE tokens (
token_id INTEGER PRIMARY KEY,
user_id INTEGER NOT NULL,
token TEXT UNIQUE NOT NULL,
name TEXT UNIQUE NOT NULL,
date INTEGER(8) DEFAULT (STRFTIME('%s', 'now')),
FOREIGN KEY (user_id) REFERENCES users(user_id)
ON DELETE CASCADE ON UPDATE CASCADE
)
''')

View file

@ -0,0 +1,40 @@
from typing import Optional
from datetime import datetime, UTC
class Token:
"""
Representation of an authentication token (such as a barcode) associated with a user.
:param _id: The token ID in the database.
:param user_id: The ID of the user this token belongs to.
:param token: The token secret.
:param name: The display name of the token:
:param date: The date the tokenw as created.
"""
def __init__(self,
_id: int,
user_id: int,
token: str,
name: Optional[str] = None,
date: Optional[datetime] = None) -> None:
self.id: int = _id
self.user_id: int = user_id
self.token: str = token
self.name = name
self.date = date
def __eq__(self, other) -> bool:
if not isinstance(other, Token):
return False
return self.id == other.id and \
self.user_id == other.user_id and \
self.token == other.token and \
self.name == other.name and \
self.date == other.date
def __hash__(self) -> int:
return hash((self.id, self.user_id, self.token, self.name, self.date))

View file

@ -3,6 +3,7 @@ This package provides the 'primitive types' the Matemat software deals with - na
""" """
from .User import User from .User import User
from .Token import Token
from .Product import Product from .Product import Product
from .ReceiptPreference import ReceiptPreference from .ReceiptPreference import ReceiptPreference
from .Transaction import Transaction, Consumption, Deposit, Modification from .Transaction import Transaction, Consumption, Deposit, Modification

View file

@ -576,3 +576,96 @@ SCHEMAS[8] = [
ON DELETE SET NULL ON UPDATE CASCADE ON DELETE SET NULL ON UPDATE CASCADE
); );
'''] ''']
SCHEMAS[9] = [
'''
CREATE TABLE users (
user_id INTEGER PRIMARY KEY,
username TEXT UNIQUE NOT NULL,
email TEXT DEFAULT NULL,
password TEXT NOT NULL,
touchkey TEXT DEFAULT NULL,
is_admin INTEGER(1) NOT NULL DEFAULT 0,
is_member INTEGER(1) NOT NULL DEFAULT 1,
balance INTEGER(8) NOT NULL DEFAULT 0,
lastchange INTEGER(8) NOT NULL DEFAULT 0,
receipt_pref INTEGER(1) NOT NULL DEFAULT 0,
created INTEGER(8) NOT NULL DEFAULT 0,
logout_after_purchase INTEGER(1) DEFAULT 0
);
''',
'''
CREATE TABLE products (
product_id INTEGER PRIMARY KEY,
name TEXT UNIQUE NOT NULL,
stock INTEGER(8) DEFAULT 0,
stockable INTEGER(1) DEFAULT 1,
price_member INTEGER(8) NOT NULL,
price_non_member INTEGER(8) NOT NULL,
custom_price INTEGER(1) DEFAULT 0,
ean TEXT UNIQUE DEFAULT NULL
);
''',
'''
CREATE TABLE transactions ( -- "superclass" of the following 3 tables
ta_id INTEGER PRIMARY KEY,
user_id INTEGER DEFAULT NULL,
value INTEGER(8) NOT NULL,
old_balance INTEGER(8) NOT NULL,
date INTEGER(8) DEFAULT (STRFTIME('%s', 'now')),
FOREIGN KEY (user_id) REFERENCES users(user_id)
ON DELETE SET NULL ON UPDATE CASCADE
);
''',
'''
CREATE TABLE consumptions ( -- transactions involving buying a product
ta_id INTEGER PRIMARY KEY,
product TEXT NOT NULL,
FOREIGN KEY (ta_id) REFERENCES transactions(ta_id)
ON DELETE CASCADE ON UPDATE CASCADE
);
''',
'''
CREATE TABLE deposits ( -- transactions involving depositing cash
ta_id INTEGER PRIMARY KEY,
FOREIGN KEY (ta_id) REFERENCES transactions(ta_id)
ON DELETE CASCADE ON UPDATE CASCADE
);
''',
'''
CREATE TABLE modifications ( -- transactions involving balance modification by an admin
ta_id INTEGER NOT NULL,
agent TEXT NOT NULL,
reason TEXT DEFAULT NULL,
PRIMARY KEY (ta_id),
FOREIGN KEY (ta_id) REFERENCES transactions(ta_id)
ON DELETE CASCADE ON UPDATE CASCADE
);
''',
'''
CREATE TABLE receipts ( -- receipts sent to the users
receipt_id INTEGER PRIMARY KEY,
user_id INTEGER NOT NULL,
first_ta_id INTEGER DEFAULT NULL,
last_ta_id INTEGER DEFAULT NULL,
date INTEGER(8) DEFAULT (STRFTIME('%s', 'now')),
FOREIGN KEY (user_id) REFERENCES users(user_id)
ON DELETE CASCADE ON UPDATE CASCADE,
FOREIGN KEY (first_ta_id) REFERENCES transactions(ta_id)
ON DELETE SET NULL ON UPDATE CASCADE,
FOREIGN KEY (last_ta_id) REFERENCES transactions(ta_id)
ON DELETE SET NULL ON UPDATE CASCADE
);
''',
'''
CREATE TABLE tokens ( -- authentication tokens such as barcodes
token_id INTEGER PRIMARY KEY,
user_id INTEGER NOT NULL,
token TEXT UNIQUE NOT NULL,
name TEXT UNIQUE NOT NULL,
date INTEGER(8) DEFAULT (STRFTIME('%s', 'now')),
FOREIGN KEY (user_id) REFERENCES users(user_id)
ON DELETE CASCADE ON UPDATE CASCADE
);
''']

View file

@ -40,7 +40,7 @@ class DatabaseTransaction(object):
class DatabaseWrapper(object): class DatabaseWrapper(object):
SCHEMA_VERSION = 8 SCHEMA_VERSION = 9
def __init__(self, filename: str) -> None: def __init__(self, filename: str) -> None:
self._filename: str = filename self._filename: str = filename
@ -93,6 +93,10 @@ class DatabaseWrapper(object):
migrate_schema_6_to_7(c) migrate_schema_6_to_7(c)
if from_version <= 7 and to_version >= 8: if from_version <= 7 and to_version >= 8:
migrate_schema_7_to_8(c) migrate_schema_7_to_8(c)
if from_version <= 7 and to_version >= 8:
migrate_schema_7_to_8(c)
if from_version <= 8 and to_version >= 9:
migrate_schema_8_to_9(c)
def connect(self) -> None: def connect(self) -> None:
if self.is_connected(): if self.is_connected():

View file

@ -30,7 +30,7 @@ def admin():
redirect('/login') redirect('/login')
authlevel: int = session.get(session_id, 'authentication_level') authlevel: int = session.get(session_id, 'authentication_level')
uid: int = session.get(session_id, 'authenticated_user') uid: int = session.get(session_id, 'authenticated_user')
# Show a 403 Forbidden error page if no user is logged in (0) or a user logged in via touchkey (1) # Show a 403 Forbidden error page if no user is logged in (0) or a user logged in via touchkey or token (1)
if authlevel < 2: if authlevel < 2:
abort(403) abort(403)
@ -39,19 +39,20 @@ def admin():
# Fetch the authenticated user # Fetch the authenticated user
user = db.get_user(uid) user = db.get_user(uid)
# If the POST request contains a "change" parameter, delegate the change handling to the function below # If the POST request contains a "change" parameter, delegate the change handling to the function below
if request.method == 'POST' and 'change' in request.params: if 'change' in request.params:
handle_change(request.params, request.files, user, db) handle_change(request.params, request.files, user, db)
# If the POST request contains an "adminchange" parameter, delegate the change handling to the function below # If the POST request contains an "adminchange" parameter, delegate the change handling to the function below
elif request.method == 'POST' and 'adminchange' in request.params and user.is_admin: elif 'adminchange' in request.params and user.is_admin:
handle_admin_change(request.params, request.files, db) handle_admin_change(request.params, request.files, db)
# Fetch all existing users and products from the database # Fetch all existing users and products from the database
users = db.list_users() users = db.list_users()
tokens = db.list_tokens(uid)
products = db.list_products() products = db.list_products()
# Render the "Admin/Settings" page # Render the "Admin/Settings" page
now = str(int(datetime.now(UTC).timestamp())) now = str(int(datetime.now(UTC).timestamp()))
return template.render('admin.html', return template.render('admin.html',
authuser=user, authlevel=authlevel, users=users, products=products, authuser=user, authlevel=authlevel, tokens=tokens, users=users, products=products,
receipt_preference_class=ReceiptPreference, now=now, receipt_preference_class=ReceiptPreference, now=now,
setupname=config['InstanceName'], config_smtp_enabled=config['SmtpSendReceipts']) setupname=config['InstanceName'], config_smtp_enabled=config['SmtpSendReceipts'])
@ -122,6 +123,36 @@ def handle_change(args: FormsDict, files: FormsDict, user: User, db: MatematData
# Write the new touchkey to the database # Write the new touchkey to the database
db.change_touchkey(user, '', touchkey, verify_password=False) db.change_touchkey(user, '', touchkey, verify_password=False)
# The user added a new token
elif change == 'addtoken':
if 'token' not in args:
return
token = str(args.token)
if len(token) < 6:
return
name = None if 'name' not in args or len(args.name) == 0 else str(args.name)
try:
tokobj = db.add_token(user, token, name)
Notification.success(f'Token {tokobj.name} created successfully')
except DatabaseConsistencyError:
Notification.error('Token already exists', decay=True)
elif change == 'deltoken':
try:
tokid = int(str(request.params.token))
token = db.get_token(tokid)
except Exception as e:
Notification.error('Token not found', decay=True)
return
if token.user_id != user.id:
Notification.error('Token not found', decay=True)
return
try:
db.delete_token(token)
except DatabaseConsistencyError:
Notification.error(f'Failed to delete token {token.name}', decay=True)
Notification.success(f'Token {token.name} removed', decay=True)
# The user requested an avatar change # The user requested an avatar change
elif change == 'avatar': elif change == 'avatar':
# The new avatar field must be present # The new avatar field must be present

View file

@ -3,6 +3,7 @@ from datetime import datetime, UTC
from bottle import route, redirect, request from bottle import route, redirect, request
from matemat.db import MatematDatabase from matemat.db import MatematDatabase
from matemat.exceptions import AuthenticationError
from matemat.webserver import template, session from matemat.webserver import template, session
from matemat.webserver.template import Notification from matemat.webserver.template import Notification
from matemat.webserver.config import get_app_config, get_stock_provider from matemat.webserver.config import get_app_config, get_stock_provider
@ -28,7 +29,19 @@ def main_page():
Notification.success( Notification.success(
f'Login will purchase <strong>{buyproduct.name}</strong>. Click <a href="/">here</a> to abort.') f'Login will purchase <strong>{buyproduct.name}</strong>. Click <a href="/">here</a> to abort.')
except ValueError: except ValueError:
if not session.has(session_id, 'authenticated_user'):
try:
user, token = db.tokenlogin(str(request.params.ean))
# Set the user ID session variable
session.put(session_id, 'authenticated_user', user.id)
# Set the authlevel session variable (0 = none, 1 = touchkey/token, 2 = password login)
session.put(session_id, 'authentication_level', 1)
redirect('/')
except AuthenticationError:
# Redirect to main page on token login error
redirect('/')
Notification.error(f'EAN code {request.params.ean} is not associated with any product.', decay=True) Notification.error(f'EAN code {request.params.ean} is not associated with any product.', decay=True)
redirect('/')
# Check whether a user is logged in # Check whether a user is logged in
if session.has(session_id, 'authenticated_user'): if session.has(session_id, 'authenticated_user'):

View file

@ -88,6 +88,20 @@ def handle_change(args: FormsDict, files: FormsDict, user: User, authuser: User,
except FileNotFoundError: except FileNotFoundError:
pass pass
elif change == 'deltoken':
try:
tokid = id(str(request.params.token))
token = db.get_token(tokid)
except Exception as e:
Notification.error('Token not found', decay=True)
return
try:
db.delete_token(token)
except DatabaseConsistencyError:
Notification.error(f'Failed to delete token {token.name}', decay=True)
Notification.success(f'Token {token.name} removed', decay=True)
return
# Admin requested update of the user's details # Admin requested update of the user's details
elif change == 'update': elif change == 'update':
# Only write a change if all properties of the user are present in the request arguments # Only write a change if all properties of the user are present in the request arguments

View file

@ -25,8 +25,15 @@
{% endblock %} {% endblock %}
{% block eanwebsocket %} {% block eanwebsocket %}
{% if authuser.is_admin %}
let eaninput = document.getElementById("admin-newproduct-ean"); let eaninput = document.getElementById("admin-newproduct-ean");
eaninput.value = e.data; eaninput.value = e.data;
eaninput.select(); eaninput.select();
eaninput.scrollIntoView(); eaninput.scrollIntoView();
{% else %}
let tokeninput = document.getElementById("admin-newtoken-token");
tokeninput.value = e.data;
tokeninput.select();
tokeninput.scrollIntoView();
{% endif %}
{% endblock %} {% endblock %}

View file

@ -78,3 +78,36 @@
initTouchkey(true, 'touchkey-svg', null, 'admin-touchkey-touchkey'); initTouchkey(true, 'touchkey-svg', null, 'admin-touchkey-touchkey');
</script> </script>
</section> </section>
<section id="admin-tokens">
<h2>Tokens</h2>
<strong>Warning:</strong>
Login tokens are a convenience feature that if used may weaken security.
Make sure you only use tokens not easily accessible to other people.
<form id="admin-newtoken-form" method="post" action="/admin?change=addtoken" accept-charset="UTF-8">
<table border="1">
<tr>
<th>Token</th>
<th>Name</th>
<th>Created</th>
<th>Actions</th>
</tr>
<tr>
<td><input id="admin-newtoken-token" type="password" name="token" value=""></td>
<td><input id="admin-newtoken-name" type="text" name="name" value=""></td>
<td></td>
<td><input type="submit" value="Create Token"></td>
</tr>
{% for token in tokens %}
<tr>
<td>••••••••</td>
<td>{{ token.name }}</td>
<td>{{ token.date }}</td>
<td><a style="text-decoration: none; color: #ff0000;" href="/admin?change=deltoken&token={{ token.id }}">🗑</a></td>
</tr>
{% endfor %}
</table>
</form>
</section>

View file

@ -54,6 +54,25 @@
<input type="submit" value="Save changes"> <input type="submit" value="Save changes">
</form> </form>
<h2>Tokens</h2>
<table border="1">
<tr>
<th>Token</th>
<th>Name</th>
<th>Created</th>
<th>Actions</th>
</tr>
{% for token in tokens %}
<tr>
<td>••••••••</td>
<td>{{ token.name }}</td>
<td>{{ token.date }}</td>
<td><a style="text-decoration: none; color: #ff0000;" href="/moduser?change=deltoken&token={{ token.id }}">🗑</a></td>
</tr>
{% endfor %}
</table>
<form id="moduser-deluser-form" method="post" action="/moduser?change=del" accept-charset="UTF-8"> <form id="moduser-deluser-form" method="post" action="/moduser?change=del" accept-charset="UTF-8">
<input id="moduser-deluser-userid" type="hidden" name="userid" value="{{ user.id }}" /><br/> <input id="moduser-deluser-userid" type="hidden" name="userid" value="{{ user.id }}" /><br/>
<input type="submit" value="Delete user" /> <input type="submit" value="Delete user" />