# matemat/matemat/webserver/pagelets/admin.py
#
# 227 lines
# 10 KiB
# Python

from typing import Any, Dict, Union
import os
import magic
from io import BytesIO
from PIL import Image
from matemat.webserver import pagelet, RequestArguments, PageletResponse, RedirectResponse, TemplateResponse
from matemat.util.currency_format import parse_chf
from matemat.db import MatematDatabase
from matemat.db.primitives import User, ReceiptPreference
from matemat.exceptions import DatabaseConsistencyError, HttpException
@pagelet('/admin')
def admin(method: str,
          path: str,
          args: RequestArguments,
          session_vars: Dict[str, Any],
          headers: Dict[str, str],
          config: Dict[str, str]) \
        -> Union[str, bytes, PageletResponse]:
    """
    Serve the settings page.  Every sufficiently authenticated user sees the settings of its own account; the
    template additionally receives the lists of all users and products.

    POST requests carrying a "change" argument are forwarded to handle_change; POST requests carrying an
    "adminchange" argument are forwarded to handle_admin_change, but only if the logged-in user is an administrator.

    :param method: The HTTP request method.
    :param path: The request path (not used by this pagelet).
    :param args: The request arguments.
    :param session_vars: The session storage; holds the authentication level and the authenticated user's ID.
    :param headers: The response headers (not used by this pagelet).
    :param config: The dictionary of config file entries from the [Pagelets] section.
    :return: A redirect to the login page if the session holds no login, the rendered template otherwise.
    :raises HttpException: With status 403, if the authentication level is below 2.
    """
    # A session lacking either login key means nobody is logged in: send the visitor to the login page
    if any(key not in session_vars for key in ('authentication_level', 'authenticated_user')):
        return RedirectResponse('/login')
    authlevel: int = session_vars['authentication_level']
    uid: int = session_vars['authenticated_user']
    # Levels 0 (not logged in) and 1 (touchkey login) are not allowed to see this page
    if authlevel < 2:
        raise HttpException(403)

    with MatematDatabase(config['DatabaseFile']) as db:
        # The user record belonging to the current session
        user = db.get_user(uid)
        # Only POST requests may modify anything
        if method == 'POST':
            if 'change' in args:
                # Changes to the user's own account
                handle_change(args, user, db, config)
            elif 'adminchange' in args and user.is_admin:
                # Administrative changes, restricted to administrators
                handle_admin_change(args, db, config)
        # Load the listings only after the change handlers have run
        users = db.list_users()
        products = db.list_products()

    # Render the "Admin/Settings" page
    return TemplateResponse(
        'admin.html',
        authuser=user,
        authlevel=authlevel,
        users=users,
        products=products,
        receipt_preference_class=ReceiptPreference,
        setupname=config['InstanceName'],
        config_smtp_enabled=config['SmtpSendReceipts'],
    )
def handle_change(args: RequestArguments, user: User, db: MatematDatabase, config: Dict[str, str]) -> None:
    """
    Write the changes requested by a user for its own account to the database.

    The "change" request argument selects what is modified:

    - "account": username, e-mail address and receipt preference
    - "password": the password; the old password is handed to the database facade together with the new one
    - "touchkey": the touchkey; an empty value sets it to NULL
    - "avatar": the avatar image, stored as a PNG thumbnail below the upload directory

    A request that lacks one of the arguments required for the selected change, or that carries an unusable value
    (unknown receipt preference, non-image upload, unreadable image, conflicting account data), is ignored.

    :param args: The RequestArguments object passed to the pagelet.
    :param user: The user to edit.
    :param db: The database facade where changes are written to.
    :param config: The dictionary of config file entries from the [Pagelets] section.
    :raises ValueError: If the two new passwords don't match, or if an argument could not be decoded as a string.
    """
    try:
        # Read the type of change requested by the user, then switch over it
        change = str(args.change)

        # The user requested a modification of its general account information
        if change == 'account':
            # Username, e-mail and receipt preference are all read below, so all three must be present
            if 'username' not in args or 'email' not in args or 'receipt_pref' not in args:
                return
            username = str(args.username)
            # An empty e-mail field should be interpreted as NULL
            email = str(args.email) or None
            # Reject values that are not a number or not a known receipt preference
            try:
                receipt_pref = ReceiptPreference(int(str(args.receipt_pref)))
            except ValueError:
                return
            # Attempt to update username, e-mail and receipt preference
            try:
                db.change_user(user, agent=None, name=username, email=email, receipt_pref=receipt_pref)
            except DatabaseConsistencyError:
                return

        # The user requested a password change
        elif change == 'password':
            # The old password and 2x the new password must be present
            if 'oldpass' not in args or 'newpass' not in args or 'newpass2' not in args:
                return
            oldpass = str(args.oldpass)
            newpass = str(args.newpass)
            newpass2 = str(args.newpass2)
            # The two instances of the new password must match
            if newpass != newpass2:
                raise ValueError('New passwords don\'t match')
            # Write the new password to the database
            db.change_password(user, oldpass, newpass)

        # The user requested a touchkey change
        elif change == 'touchkey':
            # The touchkey must be present
            if 'touchkey' not in args:
                return
            # An empty touchkey field should set the touchkey to NULL (disable touchkey login)
            touchkey = str(args.touchkey) or None
            # Write the new touchkey to the database; password verification is explicitly switched off
            db.change_touchkey(user, '', touchkey, verify_password=False)

        # The user requested an avatar change
        elif change == 'avatar':
            # The new avatar field must be present
            if 'avatar' not in args:
                return
            # Read the raw image data from the request
            avatar = bytes(args.avatar)
            # Zero size means no new image was uploaded
            if not avatar:
                return
            # Only accept uploads whose detected MIME type is an image type
            filemagic: magic.FileMagic = magic.detect_from_content(avatar)
            if not filemagic.mime_type.startswith('image/'):
                return
            # Create the absolute path of the upload directory
            abspath: str = os.path.join(os.path.abspath(config['UploadDir']), 'thumbnails/users/')
            os.makedirs(abspath, exist_ok=True)
            try:
                # Parse the image data
                image: Image.Image = Image.open(BytesIO(avatar))
                # Scale the image down to fit into 150x150
                image.thumbnail((150, 150), Image.LANCZOS)
                # Write the image to a file named after the user's ID
                image.save(os.path.join(abspath, f'{user.id}.png'), 'PNG')
            except OSError:
                # Unreadable or unwritable image: keep whatever avatar existed before
                return

    except UnicodeDecodeError as err:
        # Keep the decoding failure attached as the explicit cause of the error reported to the caller
        raise ValueError('an argument not a string') from err
def handle_admin_change(args: RequestArguments, db: MatematDatabase, config: Dict[str, str]) -> None:
    """
    Write the changes requested by an admin for users or products.

    The "adminchange" request argument selects what is done:

    - "newuser": create a user from the "username", "email" and "password" arguments; the presence of the
      "ismember" and "isadmin" arguments sets the respective flags
    - "newproduct": create a product from the "name", "pricemember" and "pricenonmember" arguments and, if an
      image was uploaded, store it as a PNG thumbnail below the upload directory
    - "restock": change the stock of the product identified by "productid" by "amount"

    A request that lacks one of the arguments required for the selected change is ignored.  A missing, empty,
    non-image or unreadable product image only skips the thumbnail; the product itself has been created by then.

    :param args: The RequestArguments object passed to the pagelet.
    :param db: The database facade where changes are written to.
    :param config: The dictionary of config file entries from the [Pagelets] section.
    :raises ValueError: If an argument could not be decoded as a string.  Numeric arguments are converted with
                        int() without a guard, so a malformed number surfaces as ValueError as well.
    """
    try:
        # Read the type of change requested by the admin, then switch over it
        change = str(args.adminchange)

        # The admin requested to create a new user
        if change == 'newuser':
            # Only create a new user if all required properties of the user are present in the request arguments
            if 'username' not in args or 'email' not in args or 'password' not in args:
                return
            username = str(args.username)
            # An empty e-mail field should be interpreted as NULL
            email = str(args.email) or None
            password = str(args.password)
            # Only the presence of these arguments matters, their values are never read
            is_member = 'ismember' in args
            is_admin = 'isadmin' in args
            # Create the user in the database
            db.create_user(username, password, email, member=is_member, admin=is_admin)

        # The admin requested to create a new product
        elif change == 'newproduct':
            # Only create a new product if all required properties of the product are present
            if 'name' not in args or 'pricemember' not in args or 'pricenonmember' not in args:
                return
            name = str(args.name)
            price_member = parse_chf(str(args.pricemember))
            price_non_member = parse_chf(str(args.pricenonmember))
            # Create the product in the database
            newproduct = db.create_product(name, price_member, price_non_member)
            # Everything below only concerns the optional product image
            if 'image' not in args:
                return
            # Read the raw image data from the request
            image_data = bytes(args.image)
            # Zero size means no image was uploaded
            if not image_data:
                return
            # Only accept uploads whose detected MIME type is an image type
            filemagic: magic.FileMagic = magic.detect_from_content(image_data)
            if not filemagic.mime_type.startswith('image/'):
                return
            # Create the absolute path of the upload directory
            abspath: str = os.path.join(os.path.abspath(config['UploadDir']), 'thumbnails/products/')
            os.makedirs(abspath, exist_ok=True)
            try:
                # Parse the image data
                image: Image.Image = Image.open(BytesIO(image_data))
                # Scale the image down to fit into 150x150
                image.thumbnail((150, 150), Image.LANCZOS)
                # Write the image to a file named after the new product's ID
                image.save(os.path.join(abspath, f'{newproduct.id}.png'), 'PNG')
            except OSError:
                # Unreadable or unwritable image: the product stays without a thumbnail
                return

        # The admin requested to restock a product
        elif change == 'restock':
            # Only restock a product if all required properties are present in the request arguments
            if 'productid' not in args or 'amount' not in args:
                return
            productid = int(str(args.productid))
            amount = int(str(args.amount))
            # Fetch the product to restock from the database
            product = db.get_product(productid)
            # Write the new stock count to the database
            db.restock(product, amount)

    except UnicodeDecodeError as err:
        # Keep the decoding failure attached as the explicit cause of the error reported to the caller
        raise ValueError('an argument not a string') from err