Merge branch 'jinja2-template' into DO-NOT-MERGE-horrible-webapp

This commit is contained in:
s3lph 2018-07-09 22:38:39 +02:00
commit 6db093437b
38 changed files with 422 additions and 217 deletions

1
.gitignore vendored
View file

@ -9,3 +9,4 @@
*.sqlite3
*.db
static/img/thumbnails

View file

@ -19,6 +19,7 @@ This project intends to provide a well-tested and maintainable alternative to
- Python 3 (>=3.6)
- Python dependencies:
- apsw
- jinja2
## Usage

2
doc

@ -1 +1 @@
Subproject commit 14b8380090858c3bed5c3b2ee7cf1408aaa133df
Subproject commit 51e940460ddbaebb7f2ffc48d00d9ef19cf8d33f

View file

@ -13,4 +13,4 @@ if __name__ == '__main__':
port = int(sys.argv[1])
# Start the web server
MatematWebserver(port=port, webroot='./static').start()
MatematWebserver(port=port).start()

View file

@ -2,7 +2,7 @@
from typing import Optional
class AuthenticationError(BaseException):
class AuthenticationError(Exception):
def __init__(self, msg: Optional[str] = None) -> None:
super().__init__()

View file

@ -0,0 +1,20 @@
class HttpException(Exception):
def __init__(self, status: int = 500, title: str = 'An error occurred', message: str = None) -> None:
super().__init__()
self.__status: int = status
self.__title: str = title
self.__message: str = message
@property
def status(self) -> int:
return self.__status
@property
def title(self) -> str:
return self.__title
@property
def message(self) -> str:
return self.__message

View file

@ -4,3 +4,4 @@ This package provides custom exception classes used in the Matemat codebase.
from .AuthenticatonError import AuthenticationError
from .DatabaseConsistencyError import DatabaseConsistencyError
from .HttpException import HttpException

View file

@ -7,4 +7,5 @@ server will attempt to serve the request with a static resource in a previously
"""
from .requestargs import RequestArgument, RequestArguments
from .responses import PageletResponse, RedirectResponse, TemplateResponse
from .httpd import MatematWebserver, HttpHandler, pagelet

View file

@ -1,5 +1,5 @@
from typing import Any, Callable, Dict, Optional, Tuple, Union
from typing import Any, Callable, Dict, Tuple, Type, Union
import traceback
@ -12,10 +12,12 @@ from http.cookies import SimpleCookie
from uuid import uuid4
from datetime import datetime, timedelta
from matemat import __version__ as matemat_version
from matemat.webserver import RequestArguments
from matemat.webserver.util import parse_args
import jinja2
from matemat import __version__ as matemat_version
from matemat.exceptions import HttpException
from matemat.webserver import RequestArguments, PageletResponse, RedirectResponse, TemplateResponse
from matemat.webserver.util import parse_args
#
# Python internal class hacks
@ -27,15 +29,16 @@ TCPServer.address_family = socket.AF_INET6
BaseHTTPRequestHandler.log_request = lambda self, code='-', size='-': None
BaseHTTPRequestHandler.log_error = lambda self, fstring='', *args: None
# Dictionary to hold registered pagelet paths and their handler functions
_PAGELET_PATHS: Dict[str, Callable[[str, # HTTP method (GET, POST, ...)
str, # Request path
RequestArguments, # HTTP Request arguments
Dict[str, Any], # Session vars
Dict[str, str]], # Response headers
Tuple[int, Union[bytes, str]]]] = dict() # Returns: (status code, response body)
Union[ # Return type: either a response body, or a redirect
bytes, str, # Response body: will assign HTTP/1.0 200 OK
PageletResponse, # A generic response
]]] = dict()
# Inactivity timeout for client sessions
_SESSION_TIMEOUT: int = 3600
@ -54,15 +57,18 @@ def pagelet(path: str):
args: RequestArguments,
session_vars: Dict[str, Any],
headers: Dict[str, str])
-> (int, Optional[Union[str, bytes]])
-> Union[bytes, str, Tuple[int, str]]
method: The HTTP method (GET, POST) that was used.
path: The path that was requested.
args: The arguments that were passed with the request (as GET or POST arguments).
session_vars: The session storage. May be read from and written to.
headers: The dictionary of HTTP response headers. Add headers you wish to send with the response.
returns: A tuple consisting of the HTTP status code (as an int) and the response body (as str or bytes,
may be None)
returns: One of the following:
- A HTTP Response body as str or bytes
- A PageletResponse class instance: An instance of (a subclass of)
matemat.webserver.PageletResponse, e.g. encapsulating a redirect or a Jinja2 template.
raises: HttpException: If a non-200 HTTP status code should be returned
:param path: The path to register the function for.
"""
@ -72,16 +78,42 @@ def pagelet(path: str):
RequestArguments,
Dict[str, Any],
Dict[str, str]],
Tuple[int, Optional[Union[bytes, str]]]]):
Union[
bytes, str,
PageletResponse
]]):
# Add the function to the dict of pagelets
_PAGELET_PATHS[path] = fun
# Don't change the function itself at all
return fun
# Return the inner function (Python requires a "real" function annotation to not have any arguments except
# the function itself)
return http_handler
class MatematHTTPServer(HTTPServer):
"""
A http.server.HTTPServer subclass that acts as a container for data that must be persistent between requests.
"""
def __init__(self,
server_address: Any,
handler: Type[BaseHTTPRequestHandler],
staticroot: str,
templateroot: str,
bind_and_activate: bool = True) -> None:
super().__init__(server_address, handler, bind_and_activate)
# Resolve webroot directory
self.webroot = os.path.abspath(staticroot)
# Set up session vars dict
self.session_vars: Dict[str, Tuple[datetime, Dict[str, Any]]] = dict()
# Set up the Jinja2 environment
self.jinja_env: jinja2.Environment = jinja2.Environment(
loader=jinja2.FileSystemLoader(os.path.abspath(templateroot))
)
class MatematWebserver(object):
"""
Then main webserver class, internally uses Python's http.server.
@ -97,13 +129,18 @@ class MatematWebserver(object):
server.start()
"""
def __init__(self, listen: str = '::', port: int = 80, webroot: str = './webroot') -> None:
def __init__(self,
listen: str = '::',
port: int = 80,
staticroot: str = './static',
templateroot: str = './templates') -> None:
"""
Instantiate a MatematWebserver.
:param listen: The IPv4 or IPv6 address to listen on
:param port: The TCP port to listen on
:param webroot: Path to the webroot directory
:param listen: The IPv4 or IPv6 address to listen on.
:param port: The TCP port to listen on.
:param staticroot: Path to the static webroot directory.
:param templateroot: Path to the Jinja2 templates root directory.
"""
if len(listen) == 0:
# Empty string should be interpreted as all addresses
@ -113,11 +150,7 @@ class MatematWebserver(object):
# Rewrite IPv4 address to IPv6-mapped form
listen = f'::ffff:{listen}'
# Create the http server
self._httpd = HTTPServer((listen, port), HttpHandler)
# Set up session vars dict
self._httpd.session_vars: Dict[str, Tuple[datetime, Dict[str, Any]]] = dict()
# Resolve webroot directory
self._httpd.webroot = os.path.abspath(webroot)
self._httpd = MatematHTTPServer((listen, port), HttpHandler, staticroot, templateroot)
def start(self) -> None:
"""
@ -136,6 +169,7 @@ class HttpHandler(BaseHTTPRequestHandler):
def __init__(self, request: bytes, client_address: Tuple[str, int], server: HTTPServer) -> None:
super().__init__(request, client_address, server)
self.server: MatematHTTPServer
@property
def server_version(self) -> str:
@ -167,7 +201,7 @@ class HttpHandler(BaseHTTPRequestHandler):
if session_id not in self.server.session_vars:
self.server.session_vars[session_id] = (now + timedelta(seconds=_SESSION_TIMEOUT)), dict()
else:
self.server.session_vars[session_id] =\
self.server.session_vars[session_id] = \
(now + timedelta(seconds=_SESSION_TIMEOUT), self.server.session_vars[session_id][1])
# Return the session ID and timeout
return session_id, self.server.session_vars[session_id][0]
@ -181,6 +215,53 @@ class HttpHandler(BaseHTTPRequestHandler):
if session_id in self.server.session_vars:
del self.server.session_vars[session_id]
def _parse_pagelet_result(self,
pagelet_res: Union[bytes, # Response body as bytes
str, # Response body as str
PageletResponse], # Encapsulated or unresolved response body
headers: Dict[str, str]) \
-> Tuple[int, bytes]:
"""
Process the return value of a pagelet function call.
:param pagelet_res: The pagelet return value.
:param headers: The dict of HTTP response headers, needed for setting the redirect header.
:return: The HTTP Response status code (an int) and body (a bytes).
:raises TypeError: If the pagelet result was not in the expected form.
"""
# The HTTP Response Status Code, defaults to 200 OK
hsc: int = 200
# The HTTP Response body, defaults to empty
data: bytes = bytes()
# If the response is a bytes object, it is used without further modification
if isinstance(pagelet_res, bytes):
data = pagelet_res
# If the response is a str object, it is encoded into a bytes object
elif isinstance(pagelet_res, str):
data = pagelet_res.encode('utf-8')
# If the response is a PageletResponse object, the status code is extracted. Generation of the body depends
# on the subtype
elif isinstance(pagelet_res, PageletResponse):
hsc = pagelet_res.status
# If the object is a RedirectResponse instance, no body is needed
if isinstance(pagelet_res, RedirectResponse):
headers['Location'] = pagelet_res.location
# If the object is a TemplateRespinse instance, pass the Jinja2 environment instance for rendering
elif isinstance(pagelet_res, TemplateResponse):
# noinspection PyProtectedMember
data = pagelet_res._render(self.server.jinja_env)
# else: Empty body
else:
raise TypeError(f'Return value of pagelet not understood: {pagelet_res}')
# Return the resulting status code and body
return hsc, data
def _handle(self, method: str, path: str, args: RequestArguments) -> None:
"""
Handle a HTTP request by either dispatching it to the appropriate pagelet or by serving a static resource.
@ -205,17 +286,13 @@ class HttpHandler(BaseHTTPRequestHandler):
if path in _PAGELET_PATHS:
# Prepare some headers. Those can still be overwritten by the pagelet
headers: Dict[str, str] = {
'Content-Type': 'text/html',
'Content-Type': 'text/html; charset=utf-8',
'Cache-Control': 'no-cache'
}
# Call the pagelet function
hsc, data = _PAGELET_PATHS[path](method, path, args, self.session_vars, headers)
# The pagelet may return None as data as a shorthand for an empty response
if data is None:
data = bytes()
# If the pagelet returns a Python str, convert it to an UTF-8 encoded bytes object
if isinstance(data, str):
data = data.encode('utf-8')
pagelet_res = _PAGELET_PATHS[path](method, path, args, self.session_vars, headers)
# Parse the pagelet's return value, vielding a HTTP status code and a response body
hsc, data = self._parse_pagelet_result(pagelet_res, headers)
# Send the HTTP status code
self.send_response(hsc)
# Format the session cookie timeout string and send the session cookie header
@ -272,18 +349,23 @@ class HttpHandler(BaseHTTPRequestHandler):
path, args = parse_args(self.path)
self._handle('GET', path, args)
# Special handling for some errors
except PermissionError:
self.send_response(403, 'Forbidden')
self.end_headers()
except ValueError:
self.send_response(400, 'Bad Request')
self.end_headers()
except BaseException as e:
except HttpException as e:
self.send_error(e.status, e.title, e.message)
print(e)
traceback.print_tb(e.__traceback__)
except PermissionError as e:
self.send_error(403, 'Forbidden')
print(e)
traceback.print_tb(e.__traceback__)
except ValueError as e:
self.send_error(400, 'Bad Request')
print(e)
traceback.print_tb(e.__traceback__)
except BaseException as e:
# Generic error handling
self.send_response(500, 'Internal Server Error')
self.end_headers()
self.send_error(500, 'Internal Server Error')
print(e)
traceback.print_tb(e.__traceback__)
# noinspection PyPep8Naming
def do_POST(self) -> None:
@ -301,19 +383,21 @@ class HttpHandler(BaseHTTPRequestHandler):
# Parse the request and hand it to the handle function
self._handle('POST', path, args)
# Special handling for some errors
except PermissionError:
self.send_response(403, 'Forbidden')
self.end_headers()
except ValueError:
self.send_response(400, 'Bad Request')
self.end_headers()
except TypeError:
self.send_response(400, 'Bad Request')
self.end_headers()
except HttpException as e:
self.send_error(e.status, e.title, e.message)
print(e)
traceback.print_tb(e.__traceback__)
except PermissionError as e:
self.send_error(403, 'Forbidden')
print(e)
traceback.print_tb(e.__traceback__)
except ValueError as e:
self.send_error(400, 'Bad Request')
print(e)
traceback.print_tb(e.__traceback__)
except BaseException as e:
# Generic error handling
self.send_response(500, 'Internal Server Error')
self.end_headers()
self.send_error(500, 'Internal Server Error')
print(e)
traceback.print_tb(e.__traceback__)

View file

@ -1,14 +1,12 @@
from typing import Any, Dict, Optional, Tuple, Union
from jinja2 import Environment, FileSystemLoader
from typing import Any, Dict, Union
import os
from matemat.webserver import pagelet, RequestArguments
from matemat.webserver import pagelet, RequestArguments, PageletResponse, RedirectResponse, TemplateResponse
from matemat.db import MatematDatabase
from matemat.primitives import User
from matemat.exceptions import DatabaseConsistencyError
from matemat.exceptions import DatabaseConsistencyError, HttpException
@pagelet('/admin')
@ -17,15 +15,13 @@ def admin(method: str,
args: RequestArguments,
session_vars: Dict[str, Any],
headers: Dict[str, str]) \
-> Tuple[int, Optional[Union[str, bytes]]]:
env = Environment(loader=FileSystemLoader('templates'))
-> Union[str, bytes, PageletResponse]:
if 'authentication_level' not in session_vars or 'authenticated_user' not in session_vars:
headers['Location'] = '/login'
return 301, None
return RedirectResponse('/login')
authlevel: int = session_vars['authentication_level']
uid: int = session_vars['authenticated_user']
if authlevel < 2:
return 403, None
raise HttpException(403)
with MatematDatabase('test.db') as db:
user = db.get_user(uid)
@ -36,8 +32,7 @@ def admin(method: str,
users = db.list_users()
products = db.list_products()
template = env.get_template('admin.html')
return 200, template.render(user=user, authlevel=authlevel, users=users, products=products)
return TemplateResponse('admin.html', authuser=user, authlevel=authlevel, users=users, products=products)
def handle_change(args: RequestArguments, user: User, db: MatematDatabase) -> None:

View file

@ -1,7 +1,6 @@
from typing import Any, Dict, Optional, Tuple, Union
from typing import Any, Dict, Union
from matemat.webserver import pagelet, RequestArguments
from matemat.primitives import User
from matemat.webserver import pagelet, RequestArguments, PageletResponse, RedirectResponse
from matemat.db import MatematDatabase
@ -11,10 +10,9 @@ def buy(method: str,
args: RequestArguments,
session_vars: Dict[str, Any],
headers: Dict[str, str]) \
-> Tuple[int, Optional[Union[str, bytes]]]:
-> Union[str, bytes, PageletResponse]:
if 'authenticated_user' not in session_vars:
headers['Location'] = '/'
return 301, None
return RedirectResponse('/')
with MatematDatabase('test.db') as db:
uid: int = session_vars['authenticated_user']
user = db.get_user(uid)
@ -26,5 +24,4 @@ def buy(method: str,
pid = int(str(args.pid))
product = db.get_product(pid)
db.increment_consumption(user, product, n)
headers['Location'] = '/'
return 301, None
return RedirectResponse('/')

View file

@ -1,7 +1,6 @@
from typing import Any, Dict, Optional, Tuple, Union
from typing import Any, Dict, Union
from matemat.webserver import pagelet, RequestArguments
from matemat.primitives import User
from matemat.webserver import pagelet, RequestArguments, PageletResponse, RedirectResponse
from matemat.db import MatematDatabase
@ -11,15 +10,13 @@ def deposit(method: str,
args: RequestArguments,
session_vars: Dict[str, Any],
headers: Dict[str, str]) \
-> Tuple[int, Optional[Union[str, bytes]]]:
-> Union[str, bytes, PageletResponse]:
if 'authenticated_user' not in session_vars:
headers['Location'] = '/'
return 301, None
return RedirectResponse('/')
with MatematDatabase('test.db') as db:
uid: int = session_vars['authenticated_user']
user = db.get_user(uid)
if 'n' in args:
n = int(str(args.n))
db.deposit(user, n)
headers['Location'] = '/'
return 301, None
return RedirectResponse('/')

View file

@ -1,10 +1,8 @@
from typing import Any, Dict, Optional, Tuple, Union
from typing import Any, Dict, Union
from jinja2 import Environment, FileSystemLoader
from matemat.exceptions import AuthenticationError
from matemat.webserver import pagelet, RequestArguments
from matemat.exceptions import AuthenticationError, HttpException
from matemat.webserver import pagelet, RequestArguments, PageletResponse, RedirectResponse, TemplateResponse
from matemat.primitives import User
from matemat.db import MatematDatabase
@ -15,23 +13,18 @@ def login_page(method: str,
args: RequestArguments,
session_vars: Dict[str, Any],
headers: Dict[str, str])\
-> Tuple[int, Optional[Union[str, bytes]]]:
-> Union[bytes, str, PageletResponse]:
if 'authenticated_user' in session_vars:
headers['Location'] = '/'
return 301, bytes()
env = Environment(loader=FileSystemLoader('templates'))
return RedirectResponse('/')
if method == 'GET':
template = env.get_template('login.html')
return 200, template.render()
return TemplateResponse('login.html')
elif method == 'POST':
with MatematDatabase('test.db') as db:
try:
user: User = db.login(str(args.username), str(args.password))
except AuthenticationError:
headers['Location'] = '/login'
return 301, bytes()
return RedirectResponse('/login')
session_vars['authenticated_user'] = user.id
session_vars['authentication_level'] = 2
headers['Location'] = '/'
return 301, None
return 405, None
return RedirectResponse('/')
raise HttpException(405)

View file

@ -1,7 +1,7 @@
from typing import Any, Dict, List, Optional, Tuple, Union
from typing import Any, Dict, Union
from matemat.webserver import pagelet, RequestArguments
from matemat.webserver import pagelet, RequestArguments, PageletResponse, RedirectResponse
@pagelet('/logout')
@ -9,10 +9,9 @@ def logout(method: str,
path: str,
args: RequestArguments,
session_vars: Dict[str, Any],
headers: Dict[str, str])\
-> Tuple[int, Optional[Union[str, bytes]]]:
headers: Dict[str, str]) \
-> Union[bytes, str, PageletResponse]:
if 'authenticated_user' in session_vars:
del session_vars['authenticated_user']
session_vars['authentication_level'] = 0
headers['Location'] = '/'
return 301, None
return RedirectResponse('/')

View file

@ -1,9 +1,7 @@
from typing import Any, Dict, Optional, Tuple, Union
from typing import Any, Dict, Union
from jinja2 import Environment, FileSystemLoader
from matemat.webserver import pagelet, RequestArguments
from matemat.webserver import pagelet, RequestArguments, PageletResponse, TemplateResponse
from matemat.db import MatematDatabase
@ -13,17 +11,14 @@ def main_page(method: str,
args: RequestArguments,
session_vars: Dict[str, Any],
headers: Dict[str, str])\
-> Tuple[int, Optional[Union[str, bytes]]]:
env = Environment(loader=FileSystemLoader('templates'))
-> Union[bytes, str, PageletResponse]:
with MatematDatabase('test.db') as db:
if 'authenticated_user' in session_vars:
uid: int = session_vars['authenticated_user']
authlevel: int = session_vars['authentication_level']
user = db.get_user(uid)
products = db.list_products()
template = env.get_template('productlist.html')
return 200, template.render(user=user, products=products, authlevel=authlevel)
return TemplateResponse('productlist.html', authuser=user, products=products, authlevel=authlevel)
else:
users = db.list_users()
template = env.get_template('userlist.html')
return 200, template.render(users=users)
return TemplateResponse('userlist.html', users=users)

View file

@ -1,13 +1,12 @@
from typing import Any, Dict, Optional, Tuple, Union
from jinja2 import Environment, FileSystemLoader
from typing import Any, Dict, Union
import os
from matemat.webserver import pagelet, RequestArguments
from matemat.webserver import pagelet, RequestArguments, PageletResponse, RedirectResponse, TemplateResponse
from matemat.db import MatematDatabase
from matemat.primitives import Product
from matemat.exceptions import DatabaseConsistencyError
from matemat.exceptions import DatabaseConsistencyError, HttpException
@pagelet('/modproduct')
@ -16,22 +15,20 @@ def modproduct(method: str,
args: RequestArguments,
session_vars: Dict[str, Any],
headers: Dict[str, str]) \
-> Tuple[int, Optional[Union[str, bytes]]]:
env = Environment(loader=FileSystemLoader('templates'))
-> Union[str, bytes, PageletResponse]:
if 'authentication_level' not in session_vars or 'authenticated_user' not in session_vars:
headers['Location'] = '/login'
return 301, None
return RedirectResponse('/login')
authlevel: int = session_vars['authentication_level']
auth_uid: int = session_vars['authenticated_user']
if authlevel < 2:
return 403, None
raise HttpException(403)
with MatematDatabase('test.db') as db:
authuser = db.get_user(auth_uid)
if not authuser.is_admin:
return 403, None
raise HttpException(403)
if 'productid' not in args:
return 400, None
raise HttpException(400, '"productid" argument missing')
modproduct_id = int(str(args.productid))
product = db.get_product(modproduct_id)
@ -39,11 +36,9 @@ def modproduct(method: str,
if 'change' in args:
handle_change(args, product, db)
if str(args.change) == 'del':
headers['Location'] = '/admin'
return 301, None
return RedirectResponse('/admin')
template = env.get_template('modproduct.html')
return 200, template.render(product=product, authlevel=authlevel)
return TemplateResponse('modproduct.html', authuser=authuser, product=product, authlevel=authlevel)
def handle_change(args: RequestArguments, product: Product, db: MatematDatabase) -> None:

View file

@ -1,13 +1,12 @@
from typing import Any, Dict, Optional, Tuple, Union
from jinja2 import Environment, FileSystemLoader
from typing import Any, Dict, Union
import os
from matemat.webserver import pagelet, RequestArguments
from matemat.webserver import pagelet, RequestArguments, PageletResponse, RedirectResponse, TemplateResponse
from matemat.db import MatematDatabase
from matemat.primitives import User
from matemat.exceptions import DatabaseConsistencyError
from matemat.exceptions import DatabaseConsistencyError, HttpException
@pagelet('/moduser')
@ -16,22 +15,20 @@ def moduser(method: str,
args: RequestArguments,
session_vars: Dict[str, Any],
headers: Dict[str, str]) \
-> Tuple[int, Optional[Union[str, bytes]]]:
env = Environment(loader=FileSystemLoader('templates'))
-> Union[str, bytes, PageletResponse]:
if 'authentication_level' not in session_vars or 'authenticated_user' not in session_vars:
headers['Location'] = '/login'
return 301, None
return RedirectResponse('/login')
authlevel: int = session_vars['authentication_level']
auth_uid: int = session_vars['authenticated_user']
if authlevel < 2:
return 403, None
raise HttpException(403)
with MatematDatabase('test.db') as db:
authuser = db.get_user(auth_uid)
if not authuser.is_admin:
return 403, None
raise HttpException(403)
if 'userid' not in args:
return 400, None
raise HttpException(400, '"userid" argument missing')
moduser_id = int(str(args.userid))
user = db.get_user(moduser_id)
@ -39,11 +36,9 @@ def moduser(method: str,
if 'change' in args:
handle_change(args, user, db)
if str(args.change) == 'del':
headers['Location'] = '/admin'
return 301, None
return RedirectResponse('/admin')
template = env.get_template('moduser.html')
return 200, template.render(user=user, authlevel=authlevel)
return TemplateResponse('moduser.html', authuser=authuser, user=user, authlevel=authlevel)
def handle_change(args: RequestArguments, user: User, db: MatematDatabase) -> None:
@ -86,7 +81,6 @@ def handle_change(args: RequestArguments, user: User, db: MatematDatabase) -> No
if 'avatar' in args:
avatar = bytes(args.avatar)
if len(avatar) > 0:
os.makedirs('./static/img/thumbnails/products/', exist_ok=True)
with open(f'./static/img/thumbnails/products/{user.id}.png', 'wb') as f:
os.makedirs('./static/img/thumbnails/users/', exist_ok=True)
with open(f'./static/img/thumbnails/users/{user.id}.png', 'wb') as f:
f.write(avatar)

View file

@ -1,10 +1,8 @@
from typing import Any, Dict, Optional, Tuple, Union
from typing import Any, Dict, Union
from jinja2 import Environment, FileSystemLoader
from matemat.exceptions import AuthenticationError
from matemat.webserver import pagelet, RequestArguments
from matemat.exceptions import AuthenticationError, HttpException
from matemat.webserver import pagelet, RequestArguments, PageletResponse, RedirectResponse, TemplateResponse
from matemat.primitives import User
from matemat.db import MatematDatabase
@ -15,23 +13,18 @@ def touchkey_page(method: str,
args: RequestArguments,
session_vars: Dict[str, Any],
headers: Dict[str, str])\
-> Tuple[int, Optional[Union[str, bytes]]]:
-> Union[bytes, str, PageletResponse]:
if 'authenticated_user' in session_vars:
headers['Location'] = '/'
return 301, bytes()
env = Environment(loader=FileSystemLoader('templates'))
return RedirectResponse('/')
if method == 'GET':
template = env.get_template('touchkey.html')
return 200, template.render(username=str(args.username), uid=int(str(args.uid)))
return TemplateResponse('touchkey.html', username=str(args.username), uid=int(str(args.uid)))
elif method == 'POST':
with MatematDatabase('test.db') as db:
try:
user: User = db.login(str(args.username), touchkey=str(args.touchkey))
except AuthenticationError:
headers['Location'] = f'/touchkey?uid={str(args.uid)}&username={str(args.username)}&fail=1'
return 301, bytes()
return RedirectResponse(f'/touchkey?uid={str(args.uid)}&username={str(args.username)}')
session_vars['authenticated_user'] = user.id
session_vars['authentication_level'] = 1
headers['Location'] = '/'
return 301, None
return 405, None
return RedirectResponse('/')
raise HttpException(405)

View file

@ -50,7 +50,7 @@ class RequestArguments(object):
"""
return _View.of(self.__container[key])
def __iter__(self) -> Iterator['RequestArguments']:
def __iter__(self) -> Iterator['RequestArgument']:
"""
Returns an iterator over the values in this instance. Values are represented as immutable views.

View file

@ -0,0 +1,63 @@
from jinja2 import Environment, Template
class PageletResponse:
"""
Base class for pagelet return values that require more action than simply sending plain data.
An instance of this base class will result in an empty 200 OK response.
"""
def __init__(self, status: int = 200):
"""
Create an empty response.
:param status: The HTTP status code, defaults to 200 (OK).
"""
self.status: int = status
class RedirectResponse(PageletResponse):
"""
A pagelet response that causes the server to redirect to another location, using a 301 Permanently Moved (uncached)
response status, and a Location header.
"""
def __init__(self, location: str):
"""
Create a redirection response with the given redirection location.
:param location: The location to redirect to.
"""
super().__init__(status=301)
self.location: str = location
class TemplateResponse(PageletResponse):
"""
A pagelet response that causes the server to load a Jinja2 template and render it with the provided arguments, then
sending the result as response body, with a 200 OK response status.
"""
def __init__(self, name: str, **kwargs):
"""
Create a template response with the given template name and arguments.
:param name: Name of the template to load.
:param kwargs: Arguments for rendering the template, will be passed to jinja2.Template.render as is.
"""
super().__init__()
self.name: str = name
self.kwargs = kwargs
def _render(self, jinja_env: Environment) -> bytes:
"""
Load and render the template using the Jinja2 environment managed by the web server instance. This method
should not be called by a pagelet.
:param jinja_env: The Jinja2 environment.
:return: An UTF-8 encoded bytes object containing the template rendering result.
"""
template: Template = jinja_env.get_template(self.name)
return template.render(**self.kwargs).encode('utf-8')

View file

@ -9,6 +9,8 @@ from abc import ABC
from datetime import datetime
from http.server import HTTPServer
import jinja2
from matemat.webserver import pagelet, RequestArguments
@ -16,6 +18,8 @@ class HttpResponse:
"""
A really basic HTTP response container and parser class, just good enough for unit testing a HTTP server, if even.
DO NOT USE THIS OUTSIDE UNIT TESTING!
Usage:
response = HttpResponse()
while response.parse_phase != 'done'
@ -105,6 +109,10 @@ class MockServer:
self.session_vars: Dict[str, Tuple[datetime, Dict[str, Any]]] = dict()
# Webroot for statically served content
self.webroot: str = webroot
# Jinja environment with a single, static template
self.jinja_env = jinja2.Environment(
loader=jinja2.DictLoader({'test.txt': 'Hello, {{ what }}!'})
)
class MockSocket(bytes):
@ -161,16 +169,16 @@ def test_pagelet(path: str):
RequestArguments,
Dict[str, Any],
Dict[str, str]],
Tuple[int, Union[bytes, str]]]):
Union[bytes, str, Tuple[int, str]]]):
@pagelet(path)
def testing_wrapper(method: str,
path: str,
args: RequestArguments,
session_vars: Dict[str, Any],
headers: Dict[str, str]):
status, body = fun(method, path, args, session_vars, headers)
headers['X-Test-Pagelet'] = fun.__name__
return status, body
result = fun(method, path, args, session_vars, headers)
return result
return testing_wrapper
return with_testing_headers

View file

@ -24,7 +24,7 @@ def post_test_pagelet(method: str,
dump += f'{a.name}: {a.get_str()}\n'
else:
dump += f'{a.name}: {codecs.encode(a.get_bytes(), "hex").decode("utf-8")}\n'
return 200, dump
return dump
class TestPost(AbstractHttpdTest):

View file

@ -1,31 +1,62 @@
from typing import Any, Dict
from typing import Any, Dict, Union
import os
import os.path
from matemat.webserver import HttpHandler, RequestArguments
from matemat.exceptions import HttpException
from matemat.webserver import HttpHandler, RequestArguments, PageletResponse, RedirectResponse, TemplateResponse
from matemat.webserver.test.abstract_httpd_test import AbstractHttpdTest, test_pagelet
@test_pagelet('/just/testing/serve_pagelet_ok')
def serve_test_pagelet_ok(method: str,
@test_pagelet('/just/testing/serve_pagelet_str')
def serve_test_pagelet_str(method: str,
path: str,
args: RequestArguments,
session_vars: Dict[str, Any],
headers: Dict[str, str]):
headers: Dict[str, str]) -> Union[bytes, str, PageletResponse]:
headers['Content-Type'] = 'text/plain'
return 200, 'serve test pagelet ok'
return 'serve test pagelet str'
@test_pagelet('/just/testing/serve_pagelet_bytes')
def serve_test_pagelet_bytes(method: str,
path: str,
args: RequestArguments,
session_vars: Dict[str, Any],
headers: Dict[str, str]) -> Union[bytes, str, PageletResponse]:
headers['Content-Type'] = 'application/octet-stream'
return b'serve\x80test\xffpagelet\xfebytes'
@test_pagelet('/just/testing/serve_pagelet_redirect')
def serve_test_pagelet_redirect(method: str,
path: str,
args: RequestArguments,
session_vars: Dict[str, Any],
headers: Dict[str, str]) -> Union[bytes, str, PageletResponse]:
return RedirectResponse('/foo/bar')
@test_pagelet('/just/testing/serve_pagelet_template')
def serve_test_pagelet_template(method: str,
path: str,
args: RequestArguments,
session_vars: Dict[str, Any],
headers: Dict[str, str]) -> Union[bytes, str, PageletResponse]:
headers['Content-Type'] = 'text/plain'
return TemplateResponse('test.txt', what='World')
# noinspection PyTypeChecker
@test_pagelet('/just/testing/serve_pagelet_fail')
def serve_test_pagelet_fail(method: str,
path: str,
args: RequestArguments,
session_vars: Dict[str, Any],
headers: Dict[str, str]):
headers: Dict[str, str]) -> Union[bytes, str, PageletResponse]:
session_vars['test'] = 'hello, world!'
headers['Content-Type'] = 'text/plain'
return 500, 'serve test pagelet fail'
raise HttpException()
class TestServe(AbstractHttpdTest):
@ -44,17 +75,29 @@ class TestServe(AbstractHttpdTest):
f.write('This should not be readable')
os.chmod(forbidden, 0)
def test_serve_pagelet_ok(self):
def test_serve_pagelet_str(self):
# Call the test pagelet that produces a 200 OK result
self.client_sock.set_request(b'GET /just/testing/serve_pagelet_ok HTTP/1.1\r\n\r\n')
self.client_sock.set_request(b'GET /just/testing/serve_pagelet_str HTTP/1.1\r\n\r\n')
HttpHandler(self.client_sock, ('::1', 45678), self.server)
packet = self.client_sock.get_response()
# Make sure the correct pagelet was called
self.assertEqual('serve_test_pagelet_ok', packet.pagelet)
self.assertEqual('serve_test_pagelet_str', packet.pagelet)
# Make sure the expected content is served
self.assertEqual(200, packet.statuscode)
self.assertEqual(b'serve test pagelet ok', packet.body)
self.assertEqual(b'serve test pagelet str', packet.body)
def test_serve_pagelet_bytes(self):
# Call the test pagelet that produces a 200 OK result
self.client_sock.set_request(b'GET /just/testing/serve_pagelet_bytes HTTP/1.1\r\n\r\n')
HttpHandler(self.client_sock, ('::1', 45678), self.server)
packet = self.client_sock.get_response()
# Make sure the correct pagelet was called
self.assertEqual('serve_test_pagelet_bytes', packet.pagelet)
# Make sure the expected content is served
self.assertEqual(200, packet.statuscode)
self.assertEqual(b'serve\x80test\xffpagelet\xfebytes', packet.body)
def test_serve_pagelet_fail(self):
# Call the test pagelet that produces a 500 Internal Server Error result
@ -62,11 +105,34 @@ class TestServe(AbstractHttpdTest):
HttpHandler(self.client_sock, ('::1', 45678), self.server)
packet = self.client_sock.get_response()
# Make sure the correct pagelet was called
self.assertEqual('serve_test_pagelet_fail', packet.pagelet)
# Make sure the expected content is served
# Make sure an error is raised
self.assertEqual(500, packet.statuscode)
self.assertEqual(b'serve test pagelet fail', packet.body)
def test_serve_pagelet_redirect(self):
# Call the test pagelet that redirects to another path
self.client_sock.set_request(b'GET /just/testing/serve_pagelet_redirect HTTP/1.1\r\n\r\n')
HttpHandler(self.client_sock, ('::1', 45678), self.server)
packet = self.client_sock.get_response()
# Make sure the correct pagelet was called
self.assertEqual('serve_test_pagelet_redirect', packet.pagelet)
# Make sure the correct redirect is issued
self.assertEqual(301, packet.statuscode)
self.assertEqual('/foo/bar', packet.headers['Location'])
# Make sure the response body is empty
self.assertEqual(0, len(packet.body))
def test_serve_pagelet_template(self):
# Call the test pagelet that redirects to another path
self.client_sock.set_request(b'GET /just/testing/serve_pagelet_template HTTP/1.1\r\n\r\n')
HttpHandler(self.client_sock, ('::1', 45678), self.server)
packet = self.client_sock.get_response()
# Make sure the correct pagelet was called
self.assertEqual('serve_test_pagelet_template', packet.pagelet)
self.assertEqual(200, packet.statuscode)
# Make sure the response body was rendered correctly by the templating engine
self.assertEqual(b'Hello, World!', packet.body)
def test_serve_static_ok(self):
# Request a static resource

View file

@ -16,7 +16,7 @@ def session_test_pagelet(method: str,
headers: Dict[str, str]):
session_vars['test'] = 'hello, world!'
headers['Content-Type'] = 'text/plain'
return 200, 'session test'
return 'session test'
class TestSession(AbstractHttpdTest):

View file

@ -97,7 +97,7 @@ def parse_args(request: str, postbody: Optional[bytes] = None, enctype: str = 't
tokens = urllib.parse.urlparse(request)
# Parse the GET arguments
if len(tokens.query) == 0:
getargs = dict()
getargs: Dict[str, List[str]] = dict()
else:
getargs = urllib.parse.parse_qs(tokens.query, strict_parsing=True, keep_blank_values=True, errors='strict')
@ -112,7 +112,7 @@ def parse_args(request: str, postbody: Optional[bytes] = None, enctype: str = 't
# Parse the POST body
pb: str = postbody.decode('utf-8')
if len(pb) == 0:
postargs = dict()
postargs: Dict[str, List[str]] = dict()
else:
postargs = urllib.parse.parse_qs(pb, strict_parsing=True, keep_blank_values=True, errors='strict')
# Write all POST values into the dict, overriding potential duplicates from GET

Binary file not shown.

Before

Width:  |  Height:  |  Size: 118 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 477 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 9.5 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 11 KiB

View file

@ -1,7 +1,7 @@
{% extends "base.html" %}
{% block header %}
{% if user.is_admin %}
{% if authuser.is_admin %}
<h1>Administration</h1>
{% else %}
<h1>Settings</h1>
@ -13,7 +13,7 @@
{% include "admin_all.html" %}
{% if user.is_admin %}
{% if authuser.is_admin %}
{% include "admin_restricted.html" %}
{% endif %}

View file

@ -1,18 +1,18 @@
<section id="admin-myaccount">
<h2>My Account</h2>
<form id="admin-myaccount-form" method="post" action="/admin?change=account">
<form id="admin-myaccount-form" method="post" action="/admin?change=account" accept-charset="UTF-8">
<label for="admin-myaccount-username">Username: </label>
<input id="admin-myaccount-username" type="text" name="username" value="{{ user.name }}" /><br/>
<input id="admin-myaccount-username" type="text" name="username" value="{{ authuser.name }}" /><br/>
<label for="admin-myaccount-email">E-Mail: </label>
<input id="admin-myaccount-email" type="text" name="email" value="{% if user.email is not none %}{{ user.email }}{% endif %}" /><br/>
<input id="admin-myaccount-email" type="text" name="email" value="{% if authuser.email is not none %}{{ authuser.email }}{% endif %}" /><br/>
<label for="admin-myaccount-ismember">Member: </label>
<input id="admin-myaccount-ismember" type="checkbox" disabled="disabled" {% if user.is_member %} checked="checked" {% endif %}/><br/>
<input id="admin-myaccount-ismember" type="checkbox" disabled="disabled" {% if authuser.is_member %} checked="checked" {% endif %}/><br/>
<label for="admin-myaccount-isadmin">Admin: </label>
<input id="admin-myaccount-isadmin" type="checkbox" disabled="disabled" {% if user.is_admin %} checked="checked" {% endif %}/><br/>
<input id="admin-myaccount-isadmin" type="checkbox" disabled="disabled" {% if authuser.is_admin %} checked="checked" {% endif %}/><br/>
<input type="submit" value="Save changes" />
</form>
@ -21,8 +21,8 @@
<section id="admin-avatar">
<h2>Avatar</h2>
<form id="admin-avatar-form" method="post" action="/admin?change=avatar" enctype="multipart/form-data">
<img src="/img/thumbnails/users/{{ user.id }}.png" alt="Avatar of {{ user.name }}" /><br/>
<form id="admin-avatar-form" method="post" action="/admin?change=avatar" enctype="multipart/form-data" accept-charset="UTF-8">
<img src="/img/thumbnails/users/{{ authuser.id }}.png" alt="Avatar of {{ authuser.name }}" /><br/>
<label for="admin-avatar-avatar">Upload new file: </label>
<input id="admin-avatar-avatar" type="file" name="avatar" accept="image/png" /><br/>
@ -34,7 +34,7 @@
<section id="admin-password">
<h2>Password</h2>
<form id="admin-password-form" method="post" action="/admin?change=password">
<form id="admin-password-form" method="post" action="/admin?change=password" accept-charset="UTF-8">
<label for="admin-password-oldpass">Current password: </label>
<input id="admin-password-oldpass" type="password" name="oldpass" /><br/>
@ -51,7 +51,7 @@
<section id="admin-touchkey">
<h2>Touchkey</h2>
<form id="admin-touchkey-form" method="post" action="/admin?change=touchkey">
<form id="admin-touchkey-form" method="post" action="/admin?change=touchkey" accept-charset="UTF-8">
<label for="admin-touchkey-oldpass">Current password: </label>
<input id="admin-touchkey-oldpass" type="password" name="oldpass" /><br/>

View file

@ -1,7 +1,7 @@
<section id="admin-restricted-newuser">
<h2>Create New User</h2>
<form id="admin-newuser-form" method="post" action="/admin?adminchange=newuser">
<form id="admin-newuser-form" method="post" action="/admin?adminchange=newuser" accept-charset="UTF-8">
<label for="admin-newuser-username">Username: </label>
<input id="admin-newuser-username" type="text" name="username" /><br/>
@ -24,7 +24,7 @@
<section id="admin-restricted-moduser">
<h2>Modify User</h2>
<form id="admin-moduser-form" method="get" action="/moduser">
<form id="admin-moduser-form" method="get" action="/moduser" accept-charset="UTF-8">
<label for="admin-moduser-userid">Username: </label>
<select id="admin-moduser-userid" name="userid">
{% for user in users %}
@ -39,7 +39,7 @@
<section id="admin-restricted-newproduct">
<h2>Create New Product</h2>
<form id="admin-newproduct-form" method="post" action="/admin?adminchange=newproduct" enctype="multipart/form-data">
<form id="admin-newproduct-form" method="post" action="/admin?adminchange=newproduct" enctype="multipart/form-data" accept-charset="UTF-8">
<label for="admin-newproduct-name">Name: </label>
<input id="admin-newproduct-name" type="text" name="name" /><br/>
@ -59,7 +59,7 @@
<section id="admin-restricted-restock">
<h2>Restock Product</h2>
<form id="admin-restock-form" method="post" action="/admin?adminchange=restock">
<form id="admin-restock-form" method="post" action="/admin?adminchange=restock" accept-charset="UTF-8">
<label for="admin-restock-productid">Product: </label>
<select id="admin-restock-productid" name="productid">
{% for product in products %}
@ -77,7 +77,7 @@
<section id="admin-restricted-modproduct">
<h2>Modify Product</h2>
<form id="admin-modproduct-form" method="get" action="/modproduct">
<form id="admin-modproduct-form" method="get" action="/modproduct" accept-charset="UTF-8">
<label for="admin-modproduct-productid">Product: </label>
<select id="admin-modproduct-productid" name="productid">
{% for product in products %}

View file

@ -11,12 +11,14 @@
<a href="/">Home</a>
{% if authlevel|default(0) > 1 %}
{% if user.is_admin %}
{% if authuser is defined %}
{% if authuser.is_admin %}
<a href="/admin">Administration</a>
{% else %}
<a href="/admin">Settings</a>
{% endif %}
{% endif %}
{% endif %}
{% endblock %}
</header>
<main>

View file

@ -8,7 +8,7 @@
{% block main %}
<form method="post" action="/login" id="loginform">
<form method="post" action="/login" id="loginform" accept-charset="UTF-8">
<label for="login-username">Username: </label>
<input id="login-username" type="text" name="username" /><br/>

View file

@ -10,7 +10,7 @@
<section id="modproduct">
<h2>Modify {{ product.name }}</h2>
<form id="modproduct-form" method="post" action="/modproduct?change=update" enctype="multipart/form-data">
<form id="modproduct-form" method="post" action="/modproduct?change=update" enctype="multipart/form-data" accept-charset="UTF-8">
<label for="modproduct-name">Name: </label>
<input id="modproduct-name" type="text" name="name" value="{{ product.name }}" /><br/>
@ -33,8 +33,8 @@
<input type="submit" value="Save changes">
</form>
<form id="modproduct-delproduct-form" method="post" action="/modproduct?change=del">
<input id="modproduct-delproduct-userid" type="hidden" name="productid" value="{{ product.id }}" /><br/>
<form id="modproduct-delproduct-form" method="post" action="/modproduct?change=del" accept-charset="UTF-8">
<input id="modproduct-delproduct-productid" type="hidden" name="productid" value="{{ product.id }}" /><br/>
<input type="submit" value="Delete product" />
</form>

View file

@ -10,7 +10,7 @@
<section id="moduser-account">
<h2>Modify {{ user.name }}</h2>
<form id="moduser-account-form" method="post" action="/moduser?change=update" enctype="multipart/form-data">
<form id="moduser-account-form" method="post" action="/moduser?change=update" enctype="multipart/form-data" accept-charset="UTF-8">
<label for="moduser-account-username">Username: </label>
<input id="moduser-account-username" type="text" name="username" value="{{ user.name }}" /><br/>
@ -39,7 +39,7 @@
<input type="submit" value="Save changes">
</form>
<form id="moduser-deluser-form" method="post" action="/moduser?change=del">
<form id="moduser-deluser-form" method="post" action="/moduser?change=del" accept-charset="UTF-8">
<input id="moduser-deluser-userid" type="hidden" name="userid" value="{{ user.id }}" /><br/>
<input type="submit" value="Delete user" />
</form>

View file

@ -1,7 +1,7 @@
{% extends "base.html" %}
{% block header %}
<h1>Welcome, {{ user.name }}</h1>
<h1>Welcome, {{ authuser.name }}</h1>
{{ super() }}
@ -9,7 +9,7 @@
{% block main %}
Your balance: {{ user.balance }}
Your balance: {{ authuser.balance }}
<a href="/deposit?n=100">Deposit CHF 1</a>
<a href="/deposit?n=1000">Deposit CHF 10</a>
@ -19,7 +19,7 @@ Your balance: {{ user.balance }}
<a href="/buy?pid={{ product.id }}">
<span class="thumblist-title">{{ product.name }}</span>
<span class="thumblist-detail">Price:
{% if user.is_member %}
{% if authuser.is_member %}
{{ product.price_member }}
{% else %}
{{ product.price_non_member }}

View file

@ -18,7 +18,7 @@
{% block main %}
{% include "touchkey.svg" %}
<form method="post" action="/touchkey" id="loginform">
<form method="post" action="/touchkey" id="loginform" accept-charset="UTF-8">
<input type="hidden" name="uid" value="{{ uid }}" />
<input type="hidden" name="username" value="{{ username }}" />
<input type="hidden" name="touchkey" value="" id="loginform-touchkey-value" />