1
0
Fork 0
forked from s3lph/matemat

Merge branch '7-configuration-file' into DO-NOT-MERGE-horrible-webapp

This commit is contained in:
s3lph 2018-07-13 20:26:25 +02:00
commit 9b67d1d1ff
22 changed files with 322 additions and 70 deletions

3
.gitignore vendored
View file

@ -9,4 +9,5 @@
*.sqlite3 *.sqlite3
*.db *.db
static/img/thumbnails static/upload/
**/matemat.conf

2
doc

@ -1 +1 @@
Subproject commit 9634785e5621b324f72598b3cee83bbc45c25d7b Subproject commit d5dc5f794ad1b959b9d4dce47eeb6068e5a75115

View file

@ -1,16 +1,23 @@
from typing import Any, Dict, Iterable, Union
import sys import sys
from matemat.webserver import parse_config_file
if __name__ == '__main__': if __name__ == '__main__':
# Those imports are actually needed, as they implicitly register pagelets. # Those imports are actually needed, as they implicitly register pagelets.
# noinspection PyUnresolvedReferences # noinspection PyUnresolvedReferences
from matemat.webserver.pagelets import * from matemat.webserver.pagelets import *
from matemat.webserver import MatematWebserver from matemat.webserver import MatematWebserver
# Read HTTP port from command line # Use config file name from command line, if present
port: int = 8080 configfile: Union[str, Iterable[str]] = '/etc/matemat.conf'
if len(sys.argv) > 1: if len(sys.argv) > 1:
port = int(sys.argv[1]) configfile = sys.argv[1:]
# Parse the config file
config: Dict[str, Any] = parse_config_file(configfile)
# Start the web server # Start the web server
MatematWebserver(port=port, staticroot='./static').start() MatematWebserver(**config).start()

View file

@ -9,3 +9,4 @@ server will attempt to serve the request with a static resource in a previously
from .requestargs import RequestArgument, RequestArguments from .requestargs import RequestArgument, RequestArguments
from .responses import PageletResponse, RedirectResponse, TemplateResponse from .responses import PageletResponse, RedirectResponse, TemplateResponse
from .httpd import MatematWebserver, HttpHandler, pagelet from .httpd import MatematWebserver, HttpHandler, pagelet
from .config import parse_config_file

View file

@ -0,0 +1,57 @@
from typing import Any, Dict, Iterable, List, Union
import os
from configparser import ConfigParser
def parse_config_file(paths: Union[str, Iterable[str]]) -> Dict[str, Any]:
"""
Parse the configuration file at the given path.
:param paths: The config file(s) to parse.
:return: A dictionary containing the parsed configuration.
"""
# Set up default values
config: Dict[str, Any] = {
# Address to listen on
'listen': '::',
# TCP port to listen on
'port': 80,
# Root directory of statically served content
'staticroot': '/var/matemat/static',
# Root directory of Jinja2 templates
'templateroot': '/var/matemat/templates',
# Variables passed to pagelets
'pagelet_variables': dict()
}
# Initialize the config parser
parser: ConfigParser = ConfigParser()
# Replace the original option transformation by a string constructor to preserve the case of config keys
parser.optionxform = str
# Normalize the input argument (turn a scalar into a list and expand ~ in paths)
files: List[str] = list()
if isinstance(paths, str):
files.append(os.path.expanduser(paths))
else:
for path in paths:
if not isinstance(path, str):
raise TypeError(f'Not a string: {path}')
files.append(os.path.expanduser(path))
# Read the configuration files
parser.read(files, 'utf-8')
# Read values from the [Matemat] section, if present, falling back to default values
if 'Matemat' in parser.sections():
config['listen'] = parser['Matemat'].get('Address', config['listen'])
config['port'] = int(parser['Matemat'].get('Port', config['port']))
config['staticroot'] = parser['Matemat'].get('StaticPath', os.path.expanduser(config['staticroot']))
config['templateroot'] = parser['Matemat'].get('TemplatePath', os.path.expanduser(config['templateroot']))
# Read all values from the [Pagelets] section, if present. These values are passed to pagelet functions
if 'Pagelets' in parser.sections():
for k, v in parser['Pagelets'].items():
config['pagelet_variables'][k] = v
return config

View file

@ -34,7 +34,8 @@ _PAGELET_PATHS: Dict[str, Callable[[str, # HTTP method (GET, POST, ...)
str, # Request path str, # Request path
RequestArguments, # HTTP Request arguments RequestArguments, # HTTP Request arguments
Dict[str, Any], # Session vars Dict[str, Any], # Session vars
Dict[str, str]], # Response headers Dict[str, str], # Response headers
Dict[str, str]], # Items from the [Pagelets] section in the config file
Union[ # Return type: either a response body, or a redirect Union[ # Return type: either a response body, or a redirect
bytes, str, # Response body: will assign HTTP/1.0 200 OK bytes, str, # Response body: will assign HTTP/1.0 200 OK
PageletResponse, # A generic response PageletResponse, # A generic response
@ -56,7 +57,8 @@ def pagelet(path: str):
path: str, path: str,
args: RequestArguments, args: RequestArguments,
session_vars: Dict[str, Any], session_vars: Dict[str, Any],
headers: Dict[str, str]) headers: Dict[str, str],
config: Dict[str, str])
-> Union[bytes, str, Tuple[int, str]] -> Union[bytes, str, Tuple[int, str]]
method: The HTTP method (GET, POST) that was used. method: The HTTP method (GET, POST) that was used.
@ -64,6 +66,7 @@ def pagelet(path: str):
args: The arguments that were passed with the request (as GET or POST arguments). args: The arguments that were passed with the request (as GET or POST arguments).
session_vars: The session storage. May be read from and written to. session_vars: The session storage. May be read from and written to.
headers: The dictionary of HTTP response headers. Add headers you wish to send with the response. headers: The dictionary of HTTP response headers. Add headers you wish to send with the response.
config: The dictionary of variables read from the [Pagelets] section of the configuration file.
returns: One of the following: returns: One of the following:
- A HTTP Response body as str or bytes - A HTTP Response body as str or bytes
- A PageletResponse class instance: An instance of (a subclass of) - A PageletResponse class instance: An instance of (a subclass of)
@ -77,6 +80,7 @@ def pagelet(path: str):
str, str,
RequestArguments, RequestArguments,
Dict[str, Any], Dict[str, Any],
Dict[str, str],
Dict[str, str]], Dict[str, str]],
Union[ Union[
bytes, str, bytes, str,
@ -102,12 +106,15 @@ class MatematHTTPServer(HTTPServer):
handler: Type[BaseHTTPRequestHandler], handler: Type[BaseHTTPRequestHandler],
staticroot: str, staticroot: str,
templateroot: str, templateroot: str,
pagelet_variables: Dict[str, str],
bind_and_activate: bool = True) -> None: bind_and_activate: bool = True) -> None:
super().__init__(server_address, handler, bind_and_activate) super().__init__(server_address, handler, bind_and_activate)
# Resolve webroot directory # Resolve webroot directory
self.webroot = os.path.abspath(staticroot) self.webroot = os.path.abspath(staticroot)
# Set up session vars dict # Set up session vars dict
self.session_vars: Dict[str, Tuple[datetime, Dict[str, Any]]] = dict() self.session_vars: Dict[str, Tuple[datetime, Dict[str, Any]]] = dict()
# Set up pagelet arguments dict
self.pagelet_variables = pagelet_variables
# Set up the Jinja2 environment # Set up the Jinja2 environment
self.jinja_env: jinja2.Environment = jinja2.Environment( self.jinja_env: jinja2.Environment = jinja2.Environment(
loader=jinja2.FileSystemLoader(os.path.abspath(templateroot)) loader=jinja2.FileSystemLoader(os.path.abspath(templateroot))
@ -130,10 +137,11 @@ class MatematWebserver(object):
""" """
def __init__(self, def __init__(self,
listen: str = '::', listen: str,
port: int = 80, port: int,
staticroot: str = './static', staticroot: str,
templateroot: str = './templates') -> None: templateroot: str,
pagelet_variables: Dict[str, str]) -> None:
""" """
Instantiate a MatematWebserver. Instantiate a MatematWebserver.
@ -141,16 +149,14 @@ class MatematWebserver(object):
:param port: The TCP port to listen on. :param port: The TCP port to listen on.
:param staticroot: Path to the static webroot directory. :param staticroot: Path to the static webroot directory.
:param templateroot: Path to the Jinja2 templates root directory. :param templateroot: Path to the Jinja2 templates root directory.
:param pagelet_variables: Dictionary of variables to pass to pagelet functions.
""" """
if len(listen) == 0:
# Empty string should be interpreted as all addresses
listen = '::'
# IPv4 address detection heuristic # IPv4 address detection heuristic
if ':' not in listen and '.' in listen: if ':' not in listen and '.' in listen:
# Rewrite IPv4 address to IPv6-mapped form # Rewrite IPv4 address to IPv6-mapped form
listen = f'::ffff:{listen}' listen = f'::ffff:{listen}'
# Create the http server # Create the http server
self._httpd = MatematHTTPServer((listen, port), HttpHandler, staticroot, templateroot) self._httpd = MatematHTTPServer((listen, port), HttpHandler, staticroot, templateroot, pagelet_variables)
def start(self) -> None: def start(self) -> None:
""" """
@ -290,7 +296,12 @@ class HttpHandler(BaseHTTPRequestHandler):
'Cache-Control': 'no-cache' 'Cache-Control': 'no-cache'
} }
# Call the pagelet function # Call the pagelet function
pagelet_res = _PAGELET_PATHS[path](method, path, args, self.session_vars, headers) pagelet_res = _PAGELET_PATHS[path](method,
path,
args,
self.session_vars,
headers,
self.server.pagelet_variables)
# Parse the pagelet's return value, vielding a HTTP status code and a response body # Parse the pagelet's return value, vielding a HTTP status code and a response body
hsc, data = self._parse_pagelet_result(pagelet_res, headers) hsc, data = self._parse_pagelet_result(pagelet_res, headers)
# Send the HTTP status code # Send the HTTP status code

View file

@ -14,7 +14,8 @@ def admin(method: str,
path: str, path: str,
args: RequestArguments, args: RequestArguments,
session_vars: Dict[str, Any], session_vars: Dict[str, Any],
headers: Dict[str, str]) \ headers: Dict[str, str],
config: Dict[str, str]) \
-> Union[str, bytes, PageletResponse]: -> Union[str, bytes, PageletResponse]:
if 'authentication_level' not in session_vars or 'authenticated_user' not in session_vars: if 'authentication_level' not in session_vars or 'authenticated_user' not in session_vars:
return RedirectResponse('/login') return RedirectResponse('/login')
@ -23,19 +24,21 @@ def admin(method: str,
if authlevel < 2: if authlevel < 2:
raise HttpException(403) raise HttpException(403)
with MatematDatabase('/var/matemat/db/test.db') as db: with MatematDatabase(config['DatabaseFile']) as db:
user = db.get_user(uid) user = db.get_user(uid)
if method == 'POST' and 'change' in args: if method == 'POST' and 'change' in args:
handle_change(args, user, db) handle_change(args, user, db, config)
elif method == 'POST' and 'adminchange' in args and user.is_admin: elif method == 'POST' and 'adminchange' in args and user.is_admin:
handle_admin_change(args, db) handle_admin_change(args, db, config)
users = db.list_users() users = db.list_users()
products = db.list_products() products = db.list_products()
return TemplateResponse('admin.html', authuser=user, authlevel=authlevel, users=users, products=products) return TemplateResponse('admin.html',
authuser=user, authlevel=authlevel, users=users, products=products,
setupname=config['InstanceName'])
def handle_change(args: RequestArguments, user: User, db: MatematDatabase) -> None: def handle_change(args: RequestArguments, user: User, db: MatematDatabase, config: Dict[str, str]) -> None:
try: try:
change = str(args.change) change = str(args.change)
@ -78,15 +81,16 @@ def handle_change(args: RequestArguments, user: User, db: MatematDatabase) -> No
if 'avatar' not in args: if 'avatar' not in args:
return return
avatar = bytes(args.avatar) avatar = bytes(args.avatar)
os.makedirs('./static/upload/thumbnails/users/', exist_ok=True) abspath: str = os.path.join(os.path.abspath(config['UploadDir']), 'thumbnails/users/')
with open(f'./static/upload/thumbnails/users/{user.id}.png', 'wb') as f: os.makedirs(abspath, exist_ok=True)
with open(os.path.join(abspath, f'{user.id}.png'), 'wb') as f:
f.write(avatar) f.write(avatar)
except UnicodeDecodeError: except UnicodeDecodeError:
raise ValueError('an argument not a string') raise ValueError('an argument not a string')
def handle_admin_change(args: RequestArguments, db: MatematDatabase): def handle_admin_change(args: RequestArguments, db: MatematDatabase, config: Dict[str, str]):
try: try:
change = str(args.adminchange) change = str(args.adminchange)
@ -111,8 +115,9 @@ def handle_admin_change(args: RequestArguments, db: MatematDatabase):
newproduct = db.create_product(name, price_member, price_non_member) newproduct = db.create_product(name, price_member, price_non_member)
if 'image' in args: if 'image' in args:
image = bytes(args.image) image = bytes(args.image)
os.makedirs('./static/upload/thumbnails/products/', exist_ok=True) abspath: str = os.path.join(os.path.abspath(config['UploadDir']), 'thumbnails/products/')
with open(f'./static/upload/thumbnails/products/{newproduct.id}.png', 'wb') as f: os.makedirs(abspath, exist_ok=True)
with open(os.path.join(abspath, f'{newproduct.id}.png'), 'wb') as f:
f.write(image) f.write(image)
elif change == 'restock': elif change == 'restock':

View file

@ -9,11 +9,12 @@ def buy(method: str,
path: str, path: str,
args: RequestArguments, args: RequestArguments,
session_vars: Dict[str, Any], session_vars: Dict[str, Any],
headers: Dict[str, str]) \ headers: Dict[str, str],
config: Dict[str, str]) \
-> Union[str, bytes, PageletResponse]: -> Union[str, bytes, PageletResponse]:
if 'authenticated_user' not in session_vars: if 'authenticated_user' not in session_vars:
return RedirectResponse('/') return RedirectResponse('/')
with MatematDatabase('/var/matemat/db/test.db') as db: with MatematDatabase(config['DatabaseFile']) as db:
uid: int = session_vars['authenticated_user'] uid: int = session_vars['authenticated_user']
user = db.get_user(uid) user = db.get_user(uid)
if 'n' in args: if 'n' in args:

View file

@ -9,11 +9,12 @@ def deposit(method: str,
path: str, path: str,
args: RequestArguments, args: RequestArguments,
session_vars: Dict[str, Any], session_vars: Dict[str, Any],
headers: Dict[str, str]) \ headers: Dict[str, str],
config: Dict[str, str]) \
-> Union[str, bytes, PageletResponse]: -> Union[str, bytes, PageletResponse]:
if 'authenticated_user' not in session_vars: if 'authenticated_user' not in session_vars:
return RedirectResponse('/') return RedirectResponse('/')
with MatematDatabase('/var/matemat/db/test.db') as db: with MatematDatabase(config['DatabaseFile']) as db:
uid: int = session_vars['authenticated_user'] uid: int = session_vars['authenticated_user']
user = db.get_user(uid) user = db.get_user(uid)
if 'n' in args: if 'n' in args:

View file

@ -12,14 +12,16 @@ def login_page(method: str,
path: str, path: str,
args: RequestArguments, args: RequestArguments,
session_vars: Dict[str, Any], session_vars: Dict[str, Any],
headers: Dict[str, str])\ headers: Dict[str, str],
config: Dict[str, str])\
-> Union[bytes, str, PageletResponse]: -> Union[bytes, str, PageletResponse]:
if 'authenticated_user' in session_vars: if 'authenticated_user' in session_vars:
return RedirectResponse('/') return RedirectResponse('/')
if method == 'GET': if method == 'GET':
return TemplateResponse('login.html') return TemplateResponse('login.html',
setupname=config['InstanceName'])
elif method == 'POST': elif method == 'POST':
with MatematDatabase('/var/matemat/db/test.db') as db: with MatematDatabase(config['DatabaseFile']) as db:
try: try:
user: User = db.login(str(args.username), str(args.password)) user: User = db.login(str(args.username), str(args.password))
except AuthenticationError: except AuthenticationError:

View file

@ -9,7 +9,8 @@ def logout(method: str,
path: str, path: str,
args: RequestArguments, args: RequestArguments,
session_vars: Dict[str, Any], session_vars: Dict[str, Any],
headers: Dict[str, str]) \ headers: Dict[str, str],
config: Dict[str, str])\
-> Union[bytes, str, PageletResponse]: -> Union[bytes, str, PageletResponse]:
if 'authenticated_user' in session_vars: if 'authenticated_user' in session_vars:
del session_vars['authenticated_user'] del session_vars['authenticated_user']

View file

@ -10,15 +10,19 @@ def main_page(method: str,
path: str, path: str,
args: RequestArguments, args: RequestArguments,
session_vars: Dict[str, Any], session_vars: Dict[str, Any],
headers: Dict[str, str])\ headers: Dict[str, str],
config: Dict[str, str])\
-> Union[bytes, str, PageletResponse]: -> Union[bytes, str, PageletResponse]:
with MatematDatabase('/var/matemat/db/test.db') as db: with MatematDatabase(config['DatabaseFile']) as db:
if 'authenticated_user' in session_vars: if 'authenticated_user' in session_vars:
uid: int = session_vars['authenticated_user'] uid: int = session_vars['authenticated_user']
authlevel: int = session_vars['authentication_level'] authlevel: int = session_vars['authentication_level']
user = db.get_user(uid) user = db.get_user(uid)
products = db.list_products() products = db.list_products()
return TemplateResponse('productlist.html', authuser=user, products=products, authlevel=authlevel) return TemplateResponse('productlist.html',
authuser=user, products=products, authlevel=authlevel,
setupname=config['InstanceName'])
else: else:
users = db.list_users() users = db.list_users()
return TemplateResponse('userlist.html', users=users) return TemplateResponse('userlist.html',
users=users, setupname=config['InstanceName'])

View file

@ -14,7 +14,8 @@ def modproduct(method: str,
path: str, path: str,
args: RequestArguments, args: RequestArguments,
session_vars: Dict[str, Any], session_vars: Dict[str, Any],
headers: Dict[str, str]) \ headers: Dict[str, str],
config: Dict[str, str]) \
-> Union[str, bytes, PageletResponse]: -> Union[str, bytes, PageletResponse]:
if 'authentication_level' not in session_vars or 'authenticated_user' not in session_vars: if 'authentication_level' not in session_vars or 'authenticated_user' not in session_vars:
return RedirectResponse('/login') return RedirectResponse('/login')
@ -23,7 +24,7 @@ def modproduct(method: str,
if authlevel < 2: if authlevel < 2:
raise HttpException(403) raise HttpException(403)
with MatematDatabase('/var/matemat/db/test.db') as db: with MatematDatabase(config['DatabaseFile']) as db:
authuser = db.get_user(auth_uid) authuser = db.get_user(auth_uid)
if not authuser.is_admin: if not authuser.is_admin:
raise HttpException(403) raise HttpException(403)
@ -34,20 +35,23 @@ def modproduct(method: str,
product = db.get_product(modproduct_id) product = db.get_product(modproduct_id)
if 'change' in args: if 'change' in args:
handle_change(args, product, db) handle_change(args, product, db, config)
if str(args.change) == 'del': if str(args.change) == 'del':
return RedirectResponse('/admin') return RedirectResponse('/admin')
return TemplateResponse('modproduct.html', authuser=authuser, product=product, authlevel=authlevel) return TemplateResponse('modproduct.html',
authuser=authuser, product=product, authlevel=authlevel,
setupname=config['InstanceName'])
def handle_change(args: RequestArguments, product: Product, db: MatematDatabase) -> None: def handle_change(args: RequestArguments, product: Product, db: MatematDatabase, config: Dict[str, str]) -> None:
change = str(args.change) change = str(args.change)
if change == 'del': if change == 'del':
db.delete_product(product) db.delete_product(product)
try: try:
os.remove(f'./static/upload/thumbnails/products/{product.id}.png') abspath: str = os.path.join(os.path.abspath(config['UploadDir']), 'thumbnails/products/')
os.remove(os.path.join(abspath, f'{product.id}.png'))
except FileNotFoundError: except FileNotFoundError:
pass pass
@ -76,6 +80,7 @@ def handle_change(args: RequestArguments, product: Product, db: MatematDatabase)
if 'image' in args: if 'image' in args:
image = bytes(args.image) image = bytes(args.image)
if len(image) > 0: if len(image) > 0:
os.makedirs('./static/upload/thumbnails/products/', exist_ok=True) abspath: str = os.path.join(os.path.abspath(config['UploadDir']), 'thumbnails/products/')
with open(f'./static/upload/thumbnails/products/{product.id}.png', 'wb') as f: os.makedirs(abspath, exist_ok=True)
with open(os.path.join(abspath, f'{product.id}.png'), 'wb') as f:
f.write(image) f.write(image)

View file

@ -14,7 +14,8 @@ def moduser(method: str,
path: str, path: str,
args: RequestArguments, args: RequestArguments,
session_vars: Dict[str, Any], session_vars: Dict[str, Any],
headers: Dict[str, str]) \ headers: Dict[str, str],
config: Dict[str, str]) \
-> Union[str, bytes, PageletResponse]: -> Union[str, bytes, PageletResponse]:
if 'authentication_level' not in session_vars or 'authenticated_user' not in session_vars: if 'authentication_level' not in session_vars or 'authenticated_user' not in session_vars:
return RedirectResponse('/login') return RedirectResponse('/login')
@ -23,7 +24,7 @@ def moduser(method: str,
if authlevel < 2: if authlevel < 2:
raise HttpException(403) raise HttpException(403)
with MatematDatabase('/var/matemat/db/test.db') as db: with MatematDatabase(config['DatabaseFile']) as db:
authuser = db.get_user(auth_uid) authuser = db.get_user(auth_uid)
if not authuser.is_admin: if not authuser.is_admin:
raise HttpException(403) raise HttpException(403)
@ -34,20 +35,23 @@ def moduser(method: str,
user = db.get_user(moduser_id) user = db.get_user(moduser_id)
if 'change' in args: if 'change' in args:
handle_change(args, user, db) handle_change(args, user, db, config)
if str(args.change) == 'del': if str(args.change) == 'del':
return RedirectResponse('/admin') return RedirectResponse('/admin')
return TemplateResponse('moduser.html', authuser=authuser, user=user, authlevel=authlevel) return TemplateResponse('moduser.html',
authuser=authuser, user=user, authlevel=authlevel,
setupname=config['InstanceName'])
def handle_change(args: RequestArguments, user: User, db: MatematDatabase) -> None: def handle_change(args: RequestArguments, user: User, db: MatematDatabase, config: Dict[str, str]) -> None:
change = str(args.change) change = str(args.change)
if change == 'del': if change == 'del':
db.delete_user(user) db.delete_user(user)
try: try:
os.remove(f'./static/upload/thumbnails/users/{user.id}.png') abspath: str = os.path.join(os.path.abspath(config['UploadDir']), 'thumbnails/users/')
os.remove(os.path.join(abspath, f'{user.id}.png'))
except FileNotFoundError: except FileNotFoundError:
pass pass
@ -83,6 +87,7 @@ def handle_change(args: RequestArguments, user: User, db: MatematDatabase) -> No
if 'avatar' in args: if 'avatar' in args:
avatar = bytes(args.avatar) avatar = bytes(args.avatar)
if len(avatar) > 0: if len(avatar) > 0:
os.makedirs('./static/upload/thumbnails/users/', exist_ok=True) abspath: str = os.path.join(os.path.abspath(config['UploadDir']), 'thumbnails/users/')
with open(f'./static/upload/thumbnails/users/{user.id}.png', 'wb') as f: os.makedirs(abspath, exist_ok=True)
with open(os.path.join(abspath, f'{user.id}.png'), 'wb') as f:
f.write(avatar) f.write(avatar)

View file

@ -12,14 +12,17 @@ def touchkey_page(method: str,
path: str, path: str,
args: RequestArguments, args: RequestArguments,
session_vars: Dict[str, Any], session_vars: Dict[str, Any],
headers: Dict[str, str])\ headers: Dict[str, str],
config: Dict[str, str])\
-> Union[bytes, str, PageletResponse]: -> Union[bytes, str, PageletResponse]:
if 'authenticated_user' in session_vars: if 'authenticated_user' in session_vars:
return RedirectResponse('/') return RedirectResponse('/')
if method == 'GET': if method == 'GET':
return TemplateResponse('touchkey.html', username=str(args.username), uid=int(str(args.uid))) return TemplateResponse('touchkey.html',
username=str(args.username), uid=int(str(args.uid)),
setupname=config['InstanceName'])
elif method == 'POST': elif method == 'POST':
with MatematDatabase('/var/matemat/db/test.db') as db: with MatematDatabase(config['DatabaseFile']) as db:
try: try:
user: User = db.login(str(args.username), touchkey=str(args.touchkey)) user: User = db.login(str(args.username), touchkey=str(args.touchkey))
except AuthenticationError: except AuthenticationError:

View file

@ -109,6 +109,8 @@ class MockServer:
self.session_vars: Dict[str, Tuple[datetime, Dict[str, Any]]] = dict() self.session_vars: Dict[str, Tuple[datetime, Dict[str, Any]]] = dict()
# Webroot for statically served content # Webroot for statically served content
self.webroot: str = webroot self.webroot: str = webroot
# Variables to pass to pagelets
self.pagelet_variables: Dict[str, str] = dict()
# Jinja environment with a single, static template # Jinja environment with a single, static template
self.jinja_env = jinja2.Environment( self.jinja_env = jinja2.Environment(
loader=jinja2.DictLoader({'test.txt': 'Hello, {{ what }}!'}) loader=jinja2.DictLoader({'test.txt': 'Hello, {{ what }}!'})
@ -168,6 +170,7 @@ def test_pagelet(path: str):
str, str,
RequestArguments, RequestArguments,
Dict[str, Any], Dict[str, Any],
Dict[str, str],
Dict[str, str]], Dict[str, str]],
Union[bytes, str, Tuple[int, str]]]): Union[bytes, str, Tuple[int, str]]]):
@pagelet(path) @pagelet(path)
@ -175,9 +178,10 @@ def test_pagelet(path: str):
path: str, path: str,
args: RequestArguments, args: RequestArguments,
session_vars: Dict[str, Any], session_vars: Dict[str, Any],
headers: Dict[str, str]): headers: Dict[str, str],
pagelet_variables: Dict[str, str]):
headers['X-Test-Pagelet'] = fun.__name__ headers['X-Test-Pagelet'] = fun.__name__
result = fun(method, path, args, session_vars, headers) result = fun(method, path, args, session_vars, headers, pagelet_variables)
return result return result
return testing_wrapper return testing_wrapper
return with_testing_headers return with_testing_headers

View file

@ -0,0 +1,136 @@
from typing import List
from unittest import TestCase
from unittest.mock import patch
from io import StringIO
from matemat.webserver import parse_config_file
_EMPTY_CONFIG = ''
_FULL_CONFIG = '''
[Matemat]
Address=fe80::0123:45ff:fe67:89ab
Port = 8080
StaticPath =/var/test/static
TemplatePath= /var/test/templates
[Pagelets]
Name=Matemat
(Unit Test)
UploadDir= /var/test/static/upload
DatabaseFile=/var/test/db/test.db
'''
_PARTIAL_CONFIG = '''
[Matemat]
Port=443
[Pagelets]
Name=Matemat (Unit Test 2)
'''
class IterOpenMock:
"""
Enable mocking of subsequent open() class for different files. Usage:
with mock.patch('builtins.open', IterOpenMock(['content 1', 'content 2'])):
...
with open('foo') as f:
# Reading from f will yield 'content 1'
with open('foo') as f:
# Reading from f will yield 'content 2'
"""
def __init__(self, files: List[str]):
self.files = files
def __enter__(self):
return StringIO(self.files[0])
def __exit__(self, exc_type, exc_val, exc_tb):
self.files = self.files[1:]
class TestConfig(TestCase):
def test_parse_config_empty_defualt_values(self):
"""
Test that default values are set when reading an empty config file.
"""
# Mock the open() function to return an empty config file example
with patch('builtins.open', return_value=StringIO(_EMPTY_CONFIG)):
# The filename is only a placeholder, file content is determined by mocking open
config = parse_config_file('test')
# Make sure all mandatory values are present
self.assertIn('listen', config)
self.assertIn('port', config)
self.assertIn('staticroot', config)
self.assertIn('templateroot', config)
self.assertIn('pagelet_variables', config)
# Make sure all mandatory values are set to their default
self.assertEqual('::', config['listen'])
self.assertEqual(80, config['port'])
self.assertEqual('/var/matemat/static', config['staticroot'])
self.assertEqual('/var/matemat/templates', config['templateroot'])
self.assertIsInstance(config['pagelet_variables'], dict)
self.assertEqual(0, len(config['pagelet_variables']))
def test_parse_config_full(self):
"""
Test that all default values are overridden by the values provided in the config file.
"""
# Mock the open() function to return a full config file example
with patch('builtins.open', return_value=StringIO(_FULL_CONFIG)):
# The filename is only a placeholder, file content is determined by mocking open
config = parse_config_file('test')
# Make sure all mandatory values are present
self.assertIn('listen', config)
self.assertIn('port', config)
self.assertIn('staticroot', config)
self.assertIn('templateroot', config)
self.assertIn('pagelet_variables', config)
self.assertIn('Name', config['pagelet_variables'])
self.assertIn('UploadDir', config['pagelet_variables'])
self.assertIn('DatabaseFile', config['pagelet_variables'])
# Make sure all values are set as described in the config file
self.assertEqual('fe80::0123:45ff:fe67:89ab', config['listen'])
self.assertEqual(8080, config['port'])
self.assertEqual('/var/test/static', config['staticroot'])
self.assertEqual('/var/test/templates', config['templateroot'])
self.assertEqual('Matemat\n(Unit Test)', config['pagelet_variables']['Name'])
self.assertEqual('/var/test/static/upload', config['pagelet_variables']['UploadDir'])
self.assertEqual('/var/test/db/test.db', config['pagelet_variables']['DatabaseFile'])
def test_parse_config_precedence(self):
"""
Test that config items from files with higher precedence replace items with the same key from files with lower
precedence.
"""
# Mock the open() function to return a full config file on the first call, and a partial config file on the
# second call
with patch('builtins.open', return_value=IterOpenMock([_FULL_CONFIG, _PARTIAL_CONFIG])):
# These filenames are only placeholders, file content is determined by mocking open
config = parse_config_file(['full', 'partial'])
# Make sure all mandatory values are present
self.assertIn('listen', config)
self.assertIn('port', config)
self.assertIn('staticroot', config)
self.assertIn('templateroot', config)
self.assertIn('pagelet_variables', config)
self.assertIn('Name', config['pagelet_variables'])
self.assertIn('UploadDir', config['pagelet_variables'])
self.assertIn('DatabaseFile', config['pagelet_variables'])
# Make sure all values are set as described in the config files, values from the partial file take precedence
self.assertEqual('fe80::0123:45ff:fe67:89ab', config['listen'])
self.assertEqual(443, config['port'])
self.assertEqual('/var/test/static', config['staticroot'])
self.assertEqual('/var/test/templates', config['templateroot'])
self.assertEqual('Matemat (Unit Test 2)', config['pagelet_variables']['Name'])
self.assertEqual('/var/test/static/upload', config['pagelet_variables']['UploadDir'])
self.assertEqual('/var/test/db/test.db', config['pagelet_variables']['DatabaseFile'])

View file

@ -12,7 +12,8 @@ def post_test_pagelet(method: str,
path: str, path: str,
args: RequestArguments, args: RequestArguments,
session_vars: Dict[str, Any], session_vars: Dict[str, Any],
headers: Dict[str, str]): headers: Dict[str, str],
pagelet_variables: Dict[str, str]):
""" """
Test pagelet that simply prints the parsed arguments as response body. Test pagelet that simply prints the parsed arguments as response body.
""" """

View file

@ -13,7 +13,8 @@ def serve_test_pagelet_str(method: str,
path: str, path: str,
args: RequestArguments, args: RequestArguments,
session_vars: Dict[str, Any], session_vars: Dict[str, Any],
headers: Dict[str, str]) -> Union[bytes, str, PageletResponse]: headers: Dict[str, str],
pagelet_variables: Dict[str, str]) -> Union[bytes, str, PageletResponse]:
headers['Content-Type'] = 'text/plain' headers['Content-Type'] = 'text/plain'
return 'serve test pagelet str' return 'serve test pagelet str'
@ -23,7 +24,8 @@ def serve_test_pagelet_bytes(method: str,
path: str, path: str,
args: RequestArguments, args: RequestArguments,
session_vars: Dict[str, Any], session_vars: Dict[str, Any],
headers: Dict[str, str]) -> Union[bytes, str, PageletResponse]: headers: Dict[str, str],
pagelet_variables: Dict[str, str]) -> Union[bytes, str, PageletResponse]:
headers['Content-Type'] = 'application/octet-stream' headers['Content-Type'] = 'application/octet-stream'
return b'serve\x80test\xffpagelet\xfebytes' return b'serve\x80test\xffpagelet\xfebytes'
@ -33,7 +35,8 @@ def serve_test_pagelet_redirect(method: str,
path: str, path: str,
args: RequestArguments, args: RequestArguments,
session_vars: Dict[str, Any], session_vars: Dict[str, Any],
headers: Dict[str, str]) -> Union[bytes, str, PageletResponse]: headers: Dict[str, str],
pagelet_variables: Dict[str, str]) -> Union[bytes, str, PageletResponse]:
return RedirectResponse('/foo/bar') return RedirectResponse('/foo/bar')
@ -42,7 +45,8 @@ def serve_test_pagelet_template(method: str,
path: str, path: str,
args: RequestArguments, args: RequestArguments,
session_vars: Dict[str, Any], session_vars: Dict[str, Any],
headers: Dict[str, str]) -> Union[bytes, str, PageletResponse]: headers: Dict[str, str],
pagelet_variables: Dict[str, str]) -> Union[bytes, str, PageletResponse]:
headers['Content-Type'] = 'text/plain' headers['Content-Type'] = 'text/plain'
return TemplateResponse('test.txt', what='World') return TemplateResponse('test.txt', what='World')
@ -53,7 +57,8 @@ def serve_test_pagelet_fail(method: str,
path: str, path: str,
args: RequestArguments, args: RequestArguments,
session_vars: Dict[str, Any], session_vars: Dict[str, Any],
headers: Dict[str, str]) -> Union[bytes, str, PageletResponse]: headers: Dict[str, str],
pagelet_variables: Dict[str, str]) -> Union[bytes, str, PageletResponse]:
session_vars['test'] = 'hello, world!' session_vars['test'] = 'hello, world!'
headers['Content-Type'] = 'text/plain' headers['Content-Type'] = 'text/plain'
raise HttpException() raise HttpException()

View file

@ -13,7 +13,8 @@ def session_test_pagelet(method: str,
path: str, path: str,
args: RequestArguments, args: RequestArguments,
session_vars: Dict[str, Any], session_vars: Dict[str, Any],
headers: Dict[str, str]): headers: Dict[str, str],
pagelet_variables: Dict[str, str]):
session_vars['test'] = 'hello, world!' session_vars['test'] = 'hello, world!'
headers['Content-Type'] = 'text/plain' headers['Content-Type'] = 'text/plain'
return 'session test' return 'session test'

View file

@ -2,7 +2,7 @@
<html> <html>
<head> <head>
{% block head %} {% block head %}
<title>Matemat</title> <title>{{ setupname }}</title>
<link rel="stylesheet" href="/css/matemat.css" /> <link rel="stylesheet" href="/css/matemat.css" />
{% endblock %} {% endblock %}
</head> </head>
@ -28,6 +28,7 @@
<footer> <footer>
{% block footer %} {% block footer %}
<ul> <ul>
<li> {{ setupname }}
<li> Matemat {{__version__}} <li> Matemat {{__version__}}
<li> &copy; 2018 s3lph <li> &copy; 2018 s3lph
<li> MIT License <li> MIT License

View file

@ -1,7 +1,7 @@
{% extends "base.html" %} {% extends "base.html" %}
{% block header %} {% block header %}
<h1>Welcome</h1> <h1>{{ setupname }}</h1>
{{ super() }} {{ super() }}
{% endblock %} {% endblock %}