s3lph
67e2a813d5
feat: split user settings and admin settings fix: list user tokens in admin user settings feat!: remove osk, osk should be provided by kiosk browser
193 lines
9.1 KiB
Python
193 lines
9.1 KiB
Python
import os
|
|
from datetime import datetime, UTC
|
|
from io import BytesIO
|
|
from shutil import copyfile
|
|
|
|
import magic
|
|
from PIL import Image
|
|
from bottle import get, post, abort, redirect, request, FormsDict
|
|
|
|
from matemat.db import MatematDatabase
|
|
from matemat.db.primitives import User, ReceiptPreference
|
|
from matemat.exceptions import AuthenticationError, DatabaseConsistencyError
|
|
from matemat.util.currency_format import parse_chf
|
|
from matemat.webserver import session, template
|
|
from matemat.webserver.config import get_app_config, get_stock_provider
|
|
from matemat.webserver.template import Notification
|
|
|
|
|
|
@get('/admin')
|
|
@post('/admin')
|
|
def admin():
|
|
"""
|
|
The admin panel, shows settings to modify other users and products.
|
|
"""
|
|
config = get_app_config()
|
|
session_id: str = session.start()
|
|
# If no user is logged in, redirect to the login page
|
|
if not session.has(session_id, 'authentication_level') or not session.has(session_id, 'authenticated_user'):
|
|
redirect('/login')
|
|
authlevel: int = session.get(session_id, 'authentication_level')
|
|
uid: int = session.get(session_id, 'authenticated_user')
|
|
# Show a 403 Forbidden error page if no user is logged in (0) or a user logged in via touchkey or token (1)
|
|
if authlevel < 2:
|
|
abort(403)
|
|
|
|
# Connect to the database
|
|
with MatematDatabase(config['DatabaseFile']) as db:
|
|
# Fetch the authenticated user
|
|
user = db.get_user(uid)
|
|
# If the POST request contains an "adminchange" parameter, delegate the change handling to the function below
|
|
if 'adminchange' in request.params and user.is_admin:
|
|
handle_change(request.params, request.files, db)
|
|
|
|
# Fetch all existing users and products from the database
|
|
users = db.list_users()
|
|
tokens = db.list_tokens(uid)
|
|
products = db.list_products()
|
|
# Render the "Admin" page
|
|
now = str(int(datetime.now(UTC).timestamp()))
|
|
return template.render('admin.html',
|
|
authuser=user, authlevel=authlevel, tokens=tokens, users=users, products=products,
|
|
receipt_preference_class=ReceiptPreference, now=now,
|
|
setupname=config['InstanceName'], config_smtp_enabled=config['SmtpSendReceipts'])
|
|
|
|
|
|
def handle_change(args: FormsDict, files: FormsDict, db: MatematDatabase):
|
|
"""
|
|
Write the changes requested by an admin for users of products.
|
|
|
|
:param args: The RequestArguments object passed to the pagelet.
|
|
:param db: The database facade where changes are written to.
|
|
"""
|
|
config = get_app_config()
|
|
try:
|
|
# Read the type of change requested by the admin, then switch over it
|
|
change = str(args.adminchange)
|
|
|
|
# The user requested to create a new user
|
|
if change == 'newuser':
|
|
# Only create a new user if all required properties of the user are present in the request arguments
|
|
if 'username' not in args or 'password' not in args:
|
|
return
|
|
# Read the properties from the request arguments
|
|
username = str(args.username)
|
|
email = None
|
|
if 'email' in args and len(args.email) > 0:
|
|
# An empty e-mail field should be interpreted as NULL
|
|
email = str(args.email)
|
|
password = str(args.password)
|
|
is_member = 'ismember' in args
|
|
is_admin = 'isadmin' in args
|
|
logout_after_purchase = 'logout_after_purchase' in args
|
|
# Create the user in the database
|
|
newuser: User = db.create_user(username, password, email, member=is_member, admin=is_admin,
|
|
logout_after_purchase=logout_after_purchase)
|
|
|
|
# If a default avatar is set, copy it to the user's avatar path
|
|
|
|
# Create the absolute path of the upload directory
|
|
abspath: str = os.path.join(os.path.abspath(config['UploadDir']), 'thumbnails/users/')
|
|
# Derive the individual paths
|
|
default: str = os.path.join(abspath, 'default.png')
|
|
userimg: str = os.path.join(abspath, f'{newuser.id}.png')
|
|
# Copy the default image, if it exists
|
|
if os.path.exists(default):
|
|
copyfile(default, userimg, follow_symlinks=True)
|
|
|
|
# The user requested to create a new product
|
|
elif change == 'newproduct':
|
|
# Only create a new product if all required properties of the product are present in the request arguments
|
|
for key in ['name', 'pricemember', 'pricenonmember']:
|
|
if key not in args:
|
|
return
|
|
# Read the properties from the request arguments
|
|
name = str(args.name)
|
|
price_member = parse_chf(str(args.pricemember))
|
|
price_non_member = parse_chf(str(args.pricenonmember))
|
|
custom_price = 'custom_price' in args
|
|
stockable = 'stockable' in args
|
|
ean = str(args.ean) or None
|
|
# Create the product in the database
|
|
newproduct = db.create_product(name, price_member, price_non_member, custom_price, stockable, ean)
|
|
# If a new product image was uploaded, process it
|
|
image = files.image.file.read() if 'image' in files else None
|
|
if image is not None and len(image) > 0:
|
|
# Detect the MIME type
|
|
filemagic: magic.FileMagic = magic.detect_from_content(image)
|
|
if not filemagic.mime_type.startswith('image/'):
|
|
return
|
|
# Create the absolute path of the upload directory
|
|
abspath: str = os.path.join(os.path.abspath(config['UploadDir']), 'thumbnails/products/')
|
|
os.makedirs(abspath, exist_ok=True)
|
|
try:
|
|
# Parse the image data
|
|
image: Image = Image.open(BytesIO(image))
|
|
# Resize the image to 150x150
|
|
image.thumbnail((150, 150), Image.LANCZOS)
|
|
# Write the image to the file
|
|
image.save(os.path.join(abspath, f'{newproduct.id}.png'), 'PNG')
|
|
except OSError as e:
|
|
Notification.error(str(e), decay=True)
|
|
return
|
|
else:
|
|
# If no image was uploaded and a default avatar is set, copy it to the product's avatar path
|
|
|
|
# Create the absolute path of the upload directory
|
|
abspath: str = os.path.join(os.path.abspath(config['UploadDir']), 'thumbnails/products/')
|
|
# Derive the individual paths
|
|
default: str = os.path.join(abspath, 'default.png')
|
|
userimg: str = os.path.join(abspath, f'{newproduct.id}.png')
|
|
# Copy the default image, if it exists
|
|
if os.path.exists(default):
|
|
copyfile(default, userimg, follow_symlinks=True)
|
|
|
|
# The user requested to restock a product
|
|
elif change == 'restock':
|
|
stock_provider = get_stock_provider()
|
|
if not stock_provider.needs_update():
|
|
return
|
|
# Only restock a product if all required properties are present in the request arguments
|
|
if 'productid' not in args or 'amount' not in args:
|
|
return
|
|
# Read the properties from the request arguments
|
|
productid = int(str(args.productid))
|
|
amount = int(str(args.amount))
|
|
# Fetch the product to restock from the database
|
|
product = db.get_product(productid)
|
|
if not product.stockable:
|
|
return
|
|
stock_provider.update_stock(product, amount)
|
|
|
|
# The user requested to set default images
|
|
elif change == 'defaultimg':
|
|
# Iterate the possible images to set
|
|
for category in 'users', 'products':
|
|
if category not in files:
|
|
continue
|
|
# Read the raw image data from the request
|
|
default: bytes = files[category].file.read()
|
|
# Only process the image, if its size is more than zero. Zero size means no new image was uploaded
|
|
if len(default) == 0:
|
|
continue
|
|
# Detect the MIME type
|
|
filemagic: magic.FileMagic = magic.detect_from_content(default)
|
|
if not filemagic.mime_type.startswith('image/'):
|
|
continue
|
|
# Create the absolute path of the upload directory
|
|
abspath: str = os.path.join(os.path.abspath(config['UploadDir']), f'thumbnails/{category}/')
|
|
os.makedirs(abspath, exist_ok=True)
|
|
try:
|
|
# Parse the image data
|
|
image: Image = Image.open(BytesIO(default))
|
|
# Resize the image to 150x150
|
|
image.thumbnail((150, 150), Image.LANCZOS)
|
|
# Write the image to the file
|
|
image.save(os.path.join(abspath, f'default.png'), 'PNG')
|
|
except OSError as e:
|
|
Notification.error(str(e), decay=True)
|
|
return
|
|
|
|
except Exception as e:
|
|
Notification.error(str(e), decay=True)
|
|
return
|