Merge branch 'webserver-impl' into 'master'

Web Server Implementation

See merge request s3lph/matemat!7
This commit is contained in:
s3lph 2018-07-11 16:45:37 +00:00
commit 2347908381
37 changed files with 2900 additions and 48 deletions

2
.gitignore vendored
View file

@ -8,4 +8,4 @@
**/.mypy_cache/ **/.mypy_cache/
*.sqlite3 *.sqlite3
*.db *.db

View file

@ -1,5 +1,5 @@
--- ---
image: debian:buster image: s3lph/matemat-ci:20180619-01
stages: stages:
- test - test
@ -8,9 +8,6 @@ stages:
test: test:
stage: test stage: test
script: script:
- apt-get update -qy
- apt-get install -y --no-install-recommends python3-dev python3-pip python3-coverage python3-setuptools build-essential
- pip3 install wheel
- pip3 install -r requirements.txt - pip3 install -r requirements.txt
- python3-coverage run --branch -m unittest discover matemat - python3-coverage run --branch -m unittest discover matemat
- python3-coverage report -m --include 'matemat/*' --omit '*/test_*.py' - python3-coverage report -m --include 'matemat/*' --omit '*/test_*.py'
@ -18,9 +15,6 @@ test:
codestyle: codestyle:
stage: codestyle stage: codestyle
script: script:
- apt-get update -qy
- apt-get install -y --no-install-recommends python3-dev python3-pip python3-coverage python3-setuptools build-essential
- pip3 install wheel pycodestyle mypy
- pip3 install -r requirements.txt - pip3 install -r requirements.txt
- pycodestyle matemat - pycodestyle matemat
# - mypy --ignore-missing-imports --strict -p matemat # - mypy --ignore-missing-imports --strict -p matemat

3
.gitmodules vendored Normal file
View file

@ -0,0 +1,3 @@
[submodule "doc"]
path = doc
url = gitlab.com:s3lph/matemat.wiki.git

10
Dockerfile Normal file
View file

@ -0,0 +1,10 @@
FROM debian:buster
RUN useradd -d /home/matemat -m matemat
RUN apt-get update -qy
RUN apt-get install -y --no-install-recommends python3-dev python3-pip python3-coverage python3-setuptools build-essential
RUN pip3 install wheel pycodestyle mypy
WORKDIR /home/matemat
USER matemat

View file

@ -1,14 +1,25 @@
# matemat # Matemat
[![pipeline status](https://gitlab.com/s3lph/matemat/badges/master/pipeline.svg)](https://gitlab.com/s3lph/matemat/commits/master) [![pipeline status](https://gitlab.com/s3lph/matemat/badges/master/pipeline.svg)][master]
[![coverage report](https://gitlab.com/s3lph/matemat/badges/master/coverage.svg)](https://gitlab.com/s3lph/matemat/commits/master) [![coverage report](https://gitlab.com/s3lph/matemat/badges/master/coverage.svg)][master]
A web service for automated stock-keeping of a soda machine written in Python.
It provides a touch-input-friendly user interface (as most input happens through the
soda machine's touch screen).
This project intends to provide a well-tested and maintainable alternative to
[ckruse/matemat][oldapp] (last commit 2013-07-09).
## Further Documentation
[Wiki][wiki]
## Dependencies ## Dependencies
- Python 3.6 - Python 3 (>=3.6)
- Python dependencies: - Python dependencies:
- apsw - apsw
- bcrypt - jinja2
## Usage ## Usage
@ -16,6 +27,17 @@
python -m matemat python -m matemat
``` ```
## Contributors
- s3lph
- SPiNNiX
## License ## License
[MIT License](https://gitlab.com/s3lph/matemat/blob/master/LICENSE) [MIT License][mit-license]
[oldapp]: https://github.com/ckruse/matemat
[mit-license]: https://gitlab.com/s3lph/matemat/blob/master/LICENSE
[master]: https://gitlab.com/s3lph/matemat/commits/master
[wiki]: https://gitlab.com/s3lph/matemat/wikis/home

1
doc Submodule

@ -0,0 +1 @@
Subproject commit 9634785e5621b324f72598b3cee83bbc45c25d7b

16
matemat/__main__.py Normal file
View file

@ -0,0 +1,16 @@
import sys
if __name__ == '__main__':
# Those imports are actually needed, as they implicitly register pagelets.
# noinspection PyUnresolvedReferences
from matemat.webserver.pagelets import *
from matemat.webserver import MatematWebserver
# Read HTTP port from command line
port: int = 8080
if len(sys.argv) > 1:
port = int(sys.argv[1])
# Start the web server
MatematWebserver(port=port).start()

View file

@ -1,3 +1,6 @@
"""
This package provides a developer-friendly API to the SQLite3 database backend of the Matemat software.
"""
from .wrapper import DatabaseWrapper from .wrapper import DatabaseWrapper
from .facade import MatematDatabase from .facade import MatematDatabase

View file

@ -1,12 +1,18 @@
from typing import List, Optional, Any, Type from typing import List, Optional, Any, Type
import bcrypt import crypt
from matemat.primitives import User, Product from matemat.primitives import User, Product
from matemat.exceptions import AuthenticationError, DatabaseConsistencyError from matemat.exceptions import AuthenticationError, DatabaseConsistencyError
from matemat.db import DatabaseWrapper from matemat.db import DatabaseWrapper
# TODO: Change to METHOD_BLOWFISH when adopting Python 3.7
"""
The method to use for password hashing.
"""
_CRYPT_METHOD = crypt.METHOD_SHA512
class MatematDatabase(object): class MatematDatabase(object):
""" """
@ -92,7 +98,7 @@ class MatematDatabase(object):
:raises ValueError: If a user with the same name already exists. :raises ValueError: If a user with the same name already exists.
""" """
# Hash the password. # Hash the password.
pwhash: str = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt(12)) pwhash: str = crypt.crypt(password, crypt.mksalt(_CRYPT_METHOD))
user_id: int = -1 user_id: int = -1
with self.db.transaction() as c: with self.db.transaction() as c:
# Look up whether a user with the same name already exists. # Look up whether a user with the same name already exists.
@ -138,10 +144,12 @@ class MatematDatabase(object):
if row is None: if row is None:
raise AuthenticationError('User does not exist') raise AuthenticationError('User does not exist')
user_id, username, email, pwhash, tkhash, admin, member = row user_id, username, email, pwhash, tkhash, admin, member = row
if password is not None and not bcrypt.checkpw(password.encode('utf-8'), pwhash): if password is not None and crypt.crypt(password, pwhash) != pwhash:
raise AuthenticationError('Password mismatch') raise AuthenticationError('Password mismatch')
elif touchkey is not None and not bcrypt.checkpw(touchkey.encode('utf-8'), tkhash): elif touchkey is not None and tkhash is not None and crypt.crypt(touchkey, tkhash) != tkhash:
raise AuthenticationError('Touchkey mismatch') raise AuthenticationError('Touchkey mismatch')
elif touchkey is not None and tkhash is None:
raise AuthenticationError('Touchkey not set')
return User(user_id, username, email, admin, member) return User(user_id, username, email, admin, member)
def change_password(self, user: User, oldpass: str, newpass: str, verify_password: bool = True) -> None: def change_password(self, user: User, oldpass: str, newpass: str, verify_password: bool = True) -> None:
@ -163,10 +171,10 @@ class MatematDatabase(object):
if row is None: if row is None:
raise AuthenticationError('User does not exist in database.') raise AuthenticationError('User does not exist in database.')
# Verify the old password, if it should be verified. # Verify the old password, if it should be verified.
if verify_password and not bcrypt.checkpw(oldpass.encode('utf-8'), row[0]): if verify_password and crypt.crypt(oldpass, row[0]) != row[0]:
raise AuthenticationError('Old password does not match.') raise AuthenticationError('Old password does not match.')
# Hash the new password and write it to the database. # Hash the new password and write it to the database.
pwhash: str = bcrypt.hashpw(newpass.encode('utf-8'), bcrypt.gensalt(12)) pwhash: str = crypt.crypt(newpass, crypt.mksalt(_CRYPT_METHOD))
c.execute(''' c.execute('''
UPDATE users SET password = :pwhash, lastchange = STRFTIME('%s', 'now') WHERE user_id = :user_id UPDATE users SET password = :pwhash, lastchange = STRFTIME('%s', 'now') WHERE user_id = :user_id
''', { ''', {
@ -193,10 +201,10 @@ class MatematDatabase(object):
if row is None: if row is None:
raise AuthenticationError('User does not exist in database.') raise AuthenticationError('User does not exist in database.')
# Verify the password, if it should be verified. # Verify the password, if it should be verified.
if verify_password and not bcrypt.checkpw(password.encode('utf-8'), row[0]): if verify_password and crypt.crypt(password, row[0]) != row[0]:
raise AuthenticationError('Password does not match.') raise AuthenticationError('Password does not match.')
# Hash the new touchkey and write it to the database. # Hash the new touchkey and write it to the database.
tkhash: str = bcrypt.hashpw(touchkey.encode('utf-8'), bcrypt.gensalt(12)) if touchkey is not None else None tkhash: str = crypt.crypt(touchkey, crypt.mksalt(_CRYPT_METHOD)) if touchkey is not None else None
c.execute(''' c.execute('''
UPDATE users SET touchkey = :tkhash, lastchange = STRFTIME('%s', 'now') WHERE user_id = :user_id UPDATE users SET touchkey = :tkhash, lastchange = STRFTIME('%s', 'now') WHERE user_id = :user_id
''', { ''', {

View file

@ -1,7 +1,7 @@
import unittest import unittest
import bcrypt import crypt
from matemat.db import MatematDatabase from matemat.db import MatematDatabase
from matemat.exceptions import AuthenticationError, DatabaseConsistencyError from matemat.exceptions import AuthenticationError, DatabaseConsistencyError
@ -10,6 +10,7 @@ from matemat.exceptions import AuthenticationError, DatabaseConsistencyError
class DatabaseTest(unittest.TestCase): class DatabaseTest(unittest.TestCase):
def setUp(self) -> None: def setUp(self) -> None:
# Create an in-memory database for testing
self.db = MatematDatabase(':memory:') self.db = MatematDatabase(':memory:')
def test_create_user(self) -> None: def test_create_user(self) -> None:
@ -57,7 +58,7 @@ class DatabaseTest(unittest.TestCase):
u = db.create_user('testuser', 'supersecurepassword', 'testuser@example.com') u = db.create_user('testuser', 'supersecurepassword', 'testuser@example.com')
# Add a touchkey without using the provided function # Add a touchkey without using the provided function
c.execute('''UPDATE users SET touchkey = :tkhash WHERE user_id = :user_id''', { c.execute('''UPDATE users SET touchkey = :tkhash WHERE user_id = :user_id''', {
'tkhash': bcrypt.hashpw(b'0123', bcrypt.gensalt(12)), 'tkhash': crypt.crypt('0123', crypt.mksalt(crypt.METHOD_SHA512)),
'user_id': u.id 'user_id': u.id
}) })
user = db.login('testuser', 'supersecurepassword') user = db.login('testuser', 'supersecurepassword')

View file

@ -7,6 +7,7 @@ from matemat.db import DatabaseWrapper
class DatabaseTest(unittest.TestCase): class DatabaseTest(unittest.TestCase):
def setUp(self) -> None: def setUp(self) -> None:
# Create an in-memory database for testing
self.db = DatabaseWrapper(':memory:') self.db = DatabaseWrapper(':memory:')
def test_create_schema(self) -> None: def test_create_schema(self) -> None:

View file

@ -2,7 +2,7 @@
from typing import Optional from typing import Optional
class AuthenticationError(BaseException): class AuthenticationError(Exception):
def __init__(self, msg: Optional[str] = None) -> None: def __init__(self, msg: Optional[str] = None) -> None:
super().__init__() super().__init__()

View file

@ -0,0 +1,20 @@
class HttpException(Exception):
def __init__(self, status: int = 500, title: str = 'An error occurred', message: str = None) -> None:
super().__init__()
self.__status: int = status
self.__title: str = title
self.__message: str = message
@property
def status(self) -> int:
return self.__status
@property
def title(self) -> str:
return self.__title
@property
def message(self) -> str:
return self.__message

View file

@ -1,3 +1,7 @@
"""
This package provides custom exception classes used in the Matemat codebase.
"""
from .AuthenticatonError import AuthenticationError from .AuthenticatonError import AuthenticationError
from .DatabaseConsistencyError import DatabaseConsistencyError from .DatabaseConsistencyError import DatabaseConsistencyError
from .HttpException import HttpException

View file

@ -1,3 +1,6 @@
"""
This package provides the 'primitive types' the Matemat software deals with - namely users and products.
"""
from .User import User from .User import User
from .Product import Product from .Product import Product

View file

@ -0,0 +1,11 @@
"""
The Matemat Webserver.
This package provides the webserver for the Matemat software. It uses Python's http.server and extends it with an event
API that can be used by 'pagelets' - single pages of a web service. If a request cannot be handled by a pagelet, the
server will attempt to serve the request with a static resource in a previously configured webroot directory.
"""
from .requestargs import RequestArgument, RequestArguments
from .responses import PageletResponse, RedirectResponse, TemplateResponse
from .httpd import MatematWebserver, HttpHandler, pagelet

View file

@ -1,61 +1,399 @@
from typing import Tuple, Dict from typing import Any, Callable, Dict, Tuple, Type, Union
import traceback
import os
import socket
import mimetypes
from socketserver import TCPServer
from http.server import HTTPServer, BaseHTTPRequestHandler from http.server import HTTPServer, BaseHTTPRequestHandler
from http.cookies import SimpleCookie from http.cookies import SimpleCookie
from uuid import uuid4 from uuid import uuid4
from datetime import datetime, timedelta from datetime import datetime, timedelta
import jinja2
from matemat import __version__ as matemat_version from matemat import __version__ as matemat_version
from matemat.exceptions import HttpException
from matemat.webserver import RequestArguments, PageletResponse, RedirectResponse, TemplateResponse
from matemat.webserver.util import parse_args
#
# Python internal class hacks
#
# Enable IPv6 support (IPv6/IPv4 dual-stack support should be implicitly enabled)
TCPServer.address_family = socket.AF_INET6
# Redirect internal logging to somewhere else, or, for now, silently discard (TODO: logger will come later)
BaseHTTPRequestHandler.log_request = lambda self, code='-', size='-': None
BaseHTTPRequestHandler.log_error = lambda self, fstring='', *args: None
# Dictionary to hold registered pagelet paths and their handler functions
_PAGELET_PATHS: Dict[str, Callable[[str, # HTTP method (GET, POST, ...)
str, # Request path
RequestArguments, # HTTP Request arguments
Dict[str, Any], # Session vars
Dict[str, str]], # Response headers
Union[ # Return type: either a response body, or a redirect
bytes, str, # Response body: will assign HTTP/1.0 200 OK
PageletResponse, # A generic response
]]] = dict()
# Inactivity timeout for client sessions
_SESSION_TIMEOUT: int = 3600
_MAX_POST: int = 1_000_000
def pagelet(path: str):
"""
Annotate a function to act as a pagelet (part of a website). The function will be called if a request is made to
the path specified as argument to the annotation.
The function must have the following signature:
(method: str,
path: str,
args: RequestArguments,
session_vars: Dict[str, Any],
headers: Dict[str, str])
-> Union[bytes, str, Tuple[int, str]]
method: The HTTP method (GET, POST) that was used.
path: The path that was requested.
args: The arguments that were passed with the request (as GET or POST arguments).
session_vars: The session storage. May be read from and written to.
headers: The dictionary of HTTP response headers. Add headers you wish to send with the response.
returns: One of the following:
- A HTTP Response body as str or bytes
- A PageletResponse class instance: An instance of (a subclass of)
matemat.webserver.PageletResponse, e.g. encapsulating a redirect or a Jinja2 template.
raises: HttpException: If a non-200 HTTP status code should be returned
:param path: The path to register the function for.
"""
def http_handler(fun: Callable[[str,
str,
RequestArguments,
Dict[str, Any],
Dict[str, str]],
Union[
bytes, str,
PageletResponse
]]):
# Add the function to the dict of pagelets
_PAGELET_PATHS[path] = fun
# Don't change the function itself at all
return fun
# Return the inner function (Python requires a "real" function annotation to not have any arguments except
# the function itself)
return http_handler
class MatematHTTPServer(HTTPServer):
"""
A http.server.HTTPServer subclass that acts as a container for data that must be persistent between requests.
"""
def __init__(self,
server_address: Any,
handler: Type[BaseHTTPRequestHandler],
staticroot: str,
templateroot: str,
bind_and_activate: bool = True) -> None:
super().__init__(server_address, handler, bind_and_activate)
# Resolve webroot directory
self.webroot = os.path.abspath(staticroot)
# Set up session vars dict
self.session_vars: Dict[str, Tuple[datetime, Dict[str, Any]]] = dict()
# Set up the Jinja2 environment
self.jinja_env: jinja2.Environment = jinja2.Environment(
loader=jinja2.FileSystemLoader(os.path.abspath(templateroot))
)
class MatematWebserver(object): class MatematWebserver(object):
"""
Then main webserver class, internally uses Python's http.server.
def __init__(self) -> None: The server will serve a pagelet, if one is defined for a request path, else it will attempt to serve a static
self._httpd = HTTPServer(('', 8080), HttpHandler) resource from the webroot.
Usage:
# Listen on all interfaces on port 80 (dual-stack IPv6/IPv4)
server = MatematWebserver('::', 80, webroot='/var/www/matemat')
# Start the server. This call blocks while the server is running.
server.start()
"""
def __init__(self,
listen: str = '::',
port: int = 80,
staticroot: str = './static',
templateroot: str = './templates') -> None:
"""
Instantiate a MatematWebserver.
:param listen: The IPv4 or IPv6 address to listen on.
:param port: The TCP port to listen on.
:param staticroot: Path to the static webroot directory.
:param templateroot: Path to the Jinja2 templates root directory.
"""
if len(listen) == 0:
# Empty string should be interpreted as all addresses
listen = '::'
# IPv4 address detection heuristic
if ':' not in listen and '.' in listen:
# Rewrite IPv4 address to IPv6-mapped form
listen = f'::ffff:{listen}'
# Create the http server
self._httpd = MatematHTTPServer((listen, port), HttpHandler, staticroot, templateroot)
def start(self) -> None: def start(self) -> None:
"""
Start the web server. This call blocks while the server is running.
"""
self._httpd.serve_forever() self._httpd.serve_forever()
class HttpHandler(BaseHTTPRequestHandler): class HttpHandler(BaseHTTPRequestHandler):
"""
HTTP Request handler.
This class parses HTTP requests, and calls the appropriate pagelets, or fetches a static resource from the webroot
directory.
"""
def __init__(self, request: bytes, client_address: Tuple[str, int], server: HTTPServer) -> None: def __init__(self, request: bytes, client_address: Tuple[str, int], server: HTTPServer) -> None:
super().__init__(request, client_address, server) super().__init__(request, client_address, server)
self._session_vars: Dict[str, Tuple[datetime, Dict[str, object]]] = dict() self.server: MatematHTTPServer
print(self._session_vars)
@property @property
def server_version(self) -> str: def server_version(self) -> str:
return f'matemat/{matemat_version}' return f'matemat/{matemat_version}'
def start_session(self) -> Tuple[str, datetime]: def _start_session(self) -> Tuple[str, datetime]:
"""
Start a new session, or resume the session identified by the session cookie sent in the HTTP request.
:return: A tuple consisting of the session ID (a UUID string), and the session timeout date.
"""
# Reference date for session timeout
now = datetime.utcnow() now = datetime.utcnow()
# Parse cookies sent by the client
cookiestring = '\n'.join(self.headers.get_all('Cookie', failobj=[])) cookiestring = '\n'.join(self.headers.get_all('Cookie', failobj=[]))
cookie = SimpleCookie() cookie = SimpleCookie()
cookie.load(cookiestring) cookie.load(cookiestring)
session_id = cookie['matemat_session_id'] if 'matemat_session_id' in cookie else str(uuid4()) # Read the client's session ID, if any
session_id = str(cookie['matemat_session_id'].value) if 'matemat_session_id' in cookie else None
# If there is no active session, create a new session ID
if session_id is None or session_id not in self.server.session_vars:
session_id = str(uuid4())
if session_id in self._session_vars and self._session_vars[session_id][0] < now: # Check for session timeout
self.end_session(session_id) if session_id in self.server.session_vars and self.server.session_vars[session_id][0] < now:
raise TimeoutError('Session timed out') self._end_session(session_id)
elif session_id not in self._session_vars: raise TimeoutError('Session timed out.')
self._session_vars[session_id] = (now + timedelta(hours=1)), dict() # Update or initialize the session timeout
return session_id, now if session_id not in self.server.session_vars:
self.server.session_vars[session_id] = (now + timedelta(seconds=_SESSION_TIMEOUT)), dict()
else:
self.server.session_vars[session_id] = \
(now + timedelta(seconds=_SESSION_TIMEOUT), self.server.session_vars[session_id][1])
# Return the session ID and timeout
return session_id, self.server.session_vars[session_id][0]
def end_session(self, session_id: str) -> None: def _end_session(self, session_id: str) -> None:
if session_id in self._session_vars: """
del self._session_vars[session_id] Destroy a session identified by the session ID.
def do_GET(self) -> None: :param session_id: ID of the session to destroy.
"""
if session_id in self.server.session_vars:
del self.server.session_vars[session_id]
def _parse_pagelet_result(self,
pagelet_res: Union[bytes, # Response body as bytes
str, # Response body as str
PageletResponse], # Encapsulated or unresolved response body
headers: Dict[str, str]) \
-> Tuple[int, bytes]:
"""
Process the return value of a pagelet function call.
:param pagelet_res: The pagelet return value.
:param headers: The dict of HTTP response headers, needed for setting the redirect header.
:return: The HTTP Response status code (an int) and body (a bytes).
:raises TypeError: If the pagelet result was not in the expected form.
"""
# The HTTP Response Status Code, defaults to 200 OK
hsc: int = 200
# The HTTP Response body, defaults to empty
data: bytes = bytes()
# If the response is a bytes object, it is used without further modification
if isinstance(pagelet_res, bytes):
data = pagelet_res
# If the response is a str object, it is encoded into a bytes object
elif isinstance(pagelet_res, str):
data = pagelet_res.encode('utf-8')
# If the response is a PageletResponse object, the status code is extracted. Generation of the body depends
# on the subtype
elif isinstance(pagelet_res, PageletResponse):
hsc = pagelet_res.status
# If the object is a RedirectResponse instance, no body is needed
if isinstance(pagelet_res, RedirectResponse):
headers['Location'] = pagelet_res.location
# If the object is a TemplateRespinse instance, pass the Jinja2 environment instance for rendering
elif isinstance(pagelet_res, TemplateResponse):
# noinspection PyProtectedMember
data = pagelet_res._render(self.server.jinja_env)
# else: Empty body
else:
raise TypeError(f'Return value of pagelet not understood: {pagelet_res}')
# Return the resulting status code and body
return hsc, data
def _handle(self, method: str, path: str, args: RequestArguments) -> None:
"""
Handle a HTTP request by either dispatching it to the appropriate pagelet or by serving a static resource.
:param method: The HTTP request method (GET, POST).
:param path: The request path without GET arguments.
:param args: Arguments sent with the request. This includes GET and POST arguments, where the POST arguments
take precedence.
"""
# Start or resume a session; redirect to / on session timeout
try: try:
session_id, timeout = self.start_session() session_id, timeout = self._start_session()
except TimeoutError: except TimeoutError:
self.send_response(302)
self.send_header('Set-Cookie', 'matemat_session_id=; expires=Thu, 01 Jan 1970 00:00:00 GMT') self.send_header('Set-Cookie', 'matemat_session_id=; expires=Thu, 01 Jan 1970 00:00:00 GMT')
self.send_error(599, 'Session Timed Out.', 'Please log in again.') self.send_header('Location', '/')
self.end_headers()
return return
self.send_response(200, 'Welcome!') self.session_id: str = session_id
# Call a pagelet function, if one is registered for the requested path
if path in _PAGELET_PATHS:
# Prepare some headers. Those can still be overwritten by the pagelet
headers: Dict[str, str] = {
'Content-Type': 'text/html',
'Cache-Control': 'no-cache'
}
# Call the pagelet function
pagelet_res = _PAGELET_PATHS[path](method, path, args, self.session_vars, headers)
# Parse the pagelet's return value, vielding a HTTP status code and a response body
hsc, data = self._parse_pagelet_result(pagelet_res, headers)
# Send the HTTP status code
self.send_response(hsc)
# Format the session cookie timeout string and send the session cookie header
expires = timeout.strftime("%a, %d %b %Y %H:%M:%S GMT")
self.send_header('Set-Cookie',
f'matemat_session_id={session_id}; expires={expires}')
# Compute the body length and add the appropriate header
headers['Content-Length'] = str(len(data))
# Send all headers set by the pagelet
for name, value in headers.items():
self.send_header(name, value)
# End the header section and write the body
self.end_headers()
self.wfile.write(data)
else:
# No pagelet function for this path, try a static serve instead
# Only HTTP GET is allowed, else reply with a 'Method Not Allowed' header
if method != 'GET':
self.send_error(405)
self.end_headers()
return
# Create the absolute resource path, resolving '..'
filepath: str = os.path.abspath(os.path.join(self.server.webroot, path[1:]))
# Make sure the file is actually inside the webroot directory and that it exists
if os.path.commonpath([filepath, self.server.webroot]) == self.server.webroot and os.path.exists(filepath):
# Open and read the file
with open(filepath, 'rb') as f:
data = f.read()
# File read successfully, send 'OK' header
self.send_response(200)
# TODO: Guess the MIME type. Unfortunately this call solely relies on the file extension, not ideal?
mimetype, _ = mimetypes.guess_type(filepath)
# Fall back to octet-stream type, if unknown
if mimetype is None:
mimetype = 'application/octet-stream'
# Send content type and length header
self.send_header('Content-Type', mimetype)
self.send_header('Content-Length', str(len(data)))
self.end_headers()
# Send the requested resource as response body
self.wfile.write(data)
else:
# File does not exist or path points outside the webroot directory
self.send_response(404)
self.end_headers()
if __name__ == '__main__': # noinspection PyPep8Naming
MatematWebserver().start() def do_GET(self) -> None:
"""
Called by BasicHTTPRequestHandler for GET requests.
"""
try:
# Parse the request and hand it to the handle function
path, args = parse_args(self.path)
self._handle('GET', path, args)
# Special handling for some errors
except HttpException as e:
self.send_error(e.status, e.title, e.message)
except PermissionError:
self.send_error(403, 'Forbidden')
except ValueError:
self.send_error(400, 'Bad Request')
except BaseException as e:
# Generic error handling
self.send_error(500, 'Internal Server Error')
print(e)
traceback.print_tb(e.__traceback__)
# noinspection PyPep8Naming
def do_POST(self) -> None:
"""
Called by BasicHTTPRequestHandler for POST requests.
"""
try:
# Read the POST body, if it exists, and its MIME type is application/x-www-form-urlencoded
clen: int = int(str(self.headers.get('Content-Length', failobj='0')))
if clen > _MAX_POST:
raise ValueError('Request too big')
ctype: str = self.headers.get('Content-Type', failobj='application/octet-stream')
post: bytes = self.rfile.read(clen)
path, args = parse_args(self.path, postbody=post, enctype=ctype)
# Parse the request and hand it to the handle function
self._handle('POST', path, args)
# Special handling for some errors
except HttpException as e:
self.send_error(e.status, e.title, e.message)
except PermissionError:
self.send_error(403, 'Forbidden')
except ValueError:
self.send_error(400, 'Bad Request')
except BaseException as e:
# Generic error handling
self.send_error(500, 'Internal Server Error')
print(e)
traceback.print_tb(e.__traceback__)
@property
def session_vars(self) -> Dict[str, Any]:
"""
Get the session variables for the current session.
:return: Dictionary of named session variables.
"""
return self.server.session_vars[self.session_id][1]

View file

@ -0,0 +1,10 @@
"""
This package contains the pagelet functions served by the Matemat software.
A new pagelet function must be imported here to be automatically loaded when the server is started.
"""
from .main import main_page
from .login import login_page
from .logout import logout
from .touchkey import touchkey_page

View file

@ -0,0 +1,29 @@
from typing import Any, Dict, Union
from matemat.exceptions import AuthenticationError, HttpException
from matemat.webserver import pagelet, RequestArguments, PageletResponse, RedirectResponse, TemplateResponse
from matemat.primitives import User
from matemat.db import MatematDatabase
@pagelet('/login')
def login_page(method: str,
path: str,
args: RequestArguments,
session_vars: Dict[str, Any],
headers: Dict[str, str])\
-> Union[bytes, str, PageletResponse]:
if 'user' in session_vars:
return RedirectResponse('/')
if method == 'GET':
return TemplateResponse('login.html')
elif method == 'POST':
with MatematDatabase('test.db') as db:
try:
user: User = db.login(str(args.username), str(args.password))
except AuthenticationError:
return RedirectResponse('/login')
session_vars['user'] = user
return RedirectResponse('/')
raise HttpException(405)

View file

@ -0,0 +1,16 @@
from typing import Any, Dict, Union
from matemat.webserver import pagelet, RequestArguments, PageletResponse, RedirectResponse
@pagelet('/logout')
def logout(method: str,
path: str,
args: RequestArguments,
session_vars: Dict[str, Any],
headers: Dict[str, str])\
-> Union[bytes, str, PageletResponse]:
if 'user' in session_vars:
del session_vars['user']
return RedirectResponse('/')

View file

@ -0,0 +1,23 @@
from typing import Any, Dict, Union
from matemat.webserver import pagelet, RequestArguments, PageletResponse, TemplateResponse
from matemat.primitives import User
from matemat.db import MatematDatabase
@pagelet('/')
def main_page(method: str,
path: str,
args: RequestArguments,
session_vars: Dict[str, Any],
headers: Dict[str, str])\
-> Union[bytes, str, PageletResponse]:
with MatematDatabase('test.db') as db:
if 'user' in session_vars:
user: User = session_vars['user']
products = db.list_products()
return TemplateResponse('main.html', user=user, list=products)
else:
users = db.list_users()
return TemplateResponse('main.html', list=users)

View file

@ -0,0 +1,32 @@
from typing import Any, Dict, Tuple, Union
import urllib.parse
from matemat.exceptions import AuthenticationError, HttpException
from matemat.webserver import pagelet, RequestArguments, PageletResponse, RedirectResponse, TemplateResponse
from matemat.primitives import User
from matemat.db import MatematDatabase
@pagelet('/touchkey')
def touchkey_page(method: str,
path: str,
args: RequestArguments,
session_vars: Dict[str, Any],
headers: Dict[str, str])\
-> Union[bytes, str, PageletResponse]:
if 'user' in session_vars:
return RedirectResponse('/')
if method == 'GET':
return TemplateResponse('touchkey.html', username=str(args.username) if 'username' in args else None)
elif method == 'POST':
with MatematDatabase('test.db') as db:
try:
user: User = db.login(str(args.username), touchkey=str(args.touchkey))
except AuthenticationError:
quoted = urllib.parse.quote_plus(bytes(args.username))
return RedirectResponse(f'/touchkey?username={quoted}')
session_vars['user'] = user
return RedirectResponse('/')
raise HttpException(405)

View file

@ -0,0 +1,324 @@
from typing import Dict, Iterator, List, Tuple, Union
class RequestArguments(object):
"""
Container for HTTP Request arguments.
Usage:
# Create empty instance
ra = RequestArguments()
# Add an entry for the key 'foo' with the value 'bar' and Content-Type 'text/plain'
ra['foo'].append('text/plain', 'bar')
# Retrieve the value for the key 'foo', as a string...
foo = str(ra.foo)
# ... or as raw bytes
foo = bytes(ra.foo)
"""
def __init__(self) -> None:
"""
Create an empty container instance.
"""
self.__container: Dict[str, RequestArgument] = dict()
def __getitem__(self, key: str) -> 'RequestArgument':
"""
Retrieve the argument for the given name, creating it on the fly, if it doesn't exist.
:param key: Name of the argument to retrieve.
:return: A RequestArgument instance.
:raises TypeError: If key is not a string.
"""
if not isinstance(key, str):
raise TypeError('key must be a str')
# Create empty argument, if it doesn't exist
if key not in self.__container:
self.__container[key] = RequestArgument(key)
# Return the argument for the name
return self.__container[key]
def __getattr__(self, key: str) -> 'RequestArgument':
"""
Syntactic sugar for accessing values with a name that can be used in Python attributes. The value will be
returned as an immutable view.
:param key: Name of the argument to retrieve.
:return: An immutable view of the RequestArgument instance.
"""
return _View.of(self.__container[key])
def __iter__(self) -> Iterator['RequestArgument']:
"""
Returns an iterator over the values in this instance. Values are represented as immutable views.
:return: An iterator that yields immutable views of the values.
"""
for ra in self.__container.values():
# Yield an immutable scalar view for each value
yield _View.of(ra)
def __contains__(self, key: str) -> bool:
"""
Checks whether an argument with a given name exists in the RequestArguments instance.
:param key: The name to check whether it exists.
:return: True, if present, False otherwise.
"""
return key in self.__container
def __len__(self) -> int:
"""
:return: The number of arguments in this instance.
"""
return len(self.__container)
class RequestArgument(object):
"""
Container class for HTTP request arguments that simplifies dealing with
- scalar and array arguments:
Automatically converts between single values and arrays where necessary: Arrays with one element can be
accessed as scalars, and scalars can be iterated, yielding themselves as a single item.
- UTF-8 strings and binary data (e.g. file uploads):
All data can be retrieved both as a str (if utf-8 decoding is possible) and a bytes object.
The objects returned from iteration or indexing are immutable views of (parts of) this object.
Usage example:
qsargs = urllib.parse.parse_qs(qs, strict_parsing=True, keep_blank_values=True, errors='strict')
args: RequestArguments
for k, vs in qsargs:
args[k].clear()
for v in vs:
# text/plain usually is a sensible choice for values decoded from urlencoded strings
# IF ALREADY IN STRING FORM (which parse_qs does)!
args[k].append('text/plain', v)
if 'username' in args and args.username.is_scalar:
username = str(args.username)
"""
def __init__(self,
name: str,
value: Union[Tuple[str, Union[bytes, str]], List[Tuple[str, Union[bytes, str]]]] = None) -> None:
"""
Create a new RequestArgument with a name and optionally an initial value.
:param name: The name for this argument, as provided via GET or POST.
:param value: The initial value, if any. Optional, initializes with empty array if omitted.
"""
# Assign name
self.__name: str = name
# Initialize value
self.__value: List[Tuple[str, Union[bytes, str]]] = []
# Default to empty array
if value is None:
self.__value = []
else:
if isinstance(value, list):
# Store the array
self.__value = value
else:
# Turn scalar into an array before storing
self.__value = [value]
@property
def is_array(self) -> bool:
"""
:return: True, if the value is a (possibly empty) array, False otherwise.
"""
return len(self.__value) != 1
@property
def is_scalar(self) -> bool:
"""
:return: True, if the value is a single scalar value, False otherwise.
"""
return len(self.__value) == 1
@property
def is_view(self) -> bool:
"""
:return: True, if this instance is an immutable view, False otherwise.
"""
return False
@property
def name(self) -> str:
"""
:return: The name of this argument.
"""
return self.__name
def get_str(self, index: int = 0) -> str:
"""
Attempts to return a value as a string. The index defaults to 0.
:param index: The index of the value to retrieve. Default: 0.
:return: An UTF-8 string representation of the requested value.
:raises UnicodeDecodeError: If the value cannot be decoded into an UTF-8 string.
:raises IndexError: If the index is out of bounds.
:raises TypeError: If the index is not an int.
:raises TypeError: If the requested value is neither a str nor a bytes object.
"""
if not isinstance(index, int):
# Index must be an int
raise TypeError('index must be an int')
# Type hint; access array element
v: Tuple[str, Union[bytes, str]] = self.__value[index]
if isinstance(v[1], str):
# The value already is a string, return
return v[1]
elif isinstance(v[1], bytes):
# The value is a bytes object, attempt to decode
return v[1].decode('utf-8')
raise TypeError('Value is neither a str nor bytes')
def __str__(self) -> str:
"""
Attempts to return the first value as a string.
:return: An UTF-8 string representation of the first value.
:raises UnicodeDecodeError: If the value cannot be decoded into an UTF-8 string.
"""
return self.get_str()
def get_bytes(self, index: int = 0) -> bytes:
"""
Attempts to return a value as a bytes object. The index defaults to 0.
:param index: The index of the value to retrieve. Default: 0.
:return: A bytes object representation of the requested value. Strings will be encoded as UTF-8.
:raises IndexError: If the index is out of bounds.
:raises TypeError: If the index is not an int.
:raises TypeError: If the requested value is neither a str nor a bytes object.
"""
if not isinstance(index, int):
# Index must be a int
raise TypeError('index must be an int')
# Type hint; access array element
v: Tuple[str, Union[bytes, str]] = self.__value[index]
if isinstance(v[1], bytes):
# The value already is a bytes object, return
return v[1]
elif isinstance(v[1], str):
# The value is a string, encode first
return v[1].encode('utf-8')
raise TypeError('Value is neither a str nor bytes')
def __bytes__(self) -> bytes:
"""
Attempts to return the first value as a bytes object.
:return: A bytes string representation of the first value.
"""
return self.get_bytes()
def get_content_type(self, index: int = 0) -> str:
"""
Attempts to retrieve a value's Content-Type. The index defaults to 0.
:param index: The index of the value to retrieve. Default: 0.
:return: The Content-Type of the requested value, as sent by the client. Not necessarily trustworthy.
:raises IndexError: If the index is out of bounds.
:raises TypeError: If the index is not an int.
"""
# instance is an array value
if not isinstance(index, int):
# Needs an index for array values
raise TypeError('index must be an int')
# Type hint; access array element
va: Tuple[str, Union[bytes, str]] = self.__value[index]
# Return the content type of the requested value
if not isinstance(va[0], str):
raise TypeError('Content-Type is not a str')
return va[0]
def append(self, ctype: str, value: Union[str, bytes]) -> None:
"""
Append a value to this instance. Turns an empty argument into a scalar and a scalar into an array.
:param ctype: The Content-Type, as provided in the request.
:param value: The scalar value to append, either a string or bytes object.
:raises TypeError: If called on an immutable view.
"""
if self.is_view:
# This is an immutable view, raise exception
raise TypeError('A RequestArgument view is immutable!')
self.__value.append((ctype, value))
def clear(self) -> None:
"""
Remove all values from this instance.
:raises TypeError: If called on an immutable view.
"""
if self.is_view:
# This is an immutable view, raise exception
raise TypeError('A RequestArgument view is immutable!')
self.__value.clear()
def __len__(self) -> int:
"""
:return: Number of values for this argument.
"""
return len(self.__value)
def __iter__(self) -> Iterator['RequestArgument']:
"""
Iterate the values of this argument. Each value is accessible as if it were a scalar RequestArgument in turn,
although they are immutable.
:return: An iterator that yields immutable views of the values.
"""
for v in self.__value:
# Yield an immutable scalar view for each (ctype, value) element in the array
yield _View(self.__name, v)
def __getitem__(self, index: Union[int, slice]) -> 'RequestArgument':
"""
Index the argument with either an int or a slice. The returned values are represented as immutable
RequestArgument views.
:param index: The index or slice.
:return: An immutable view of the indexed elements of this argument.
"""
# Pass the index or slice through to the array, packing the result in an immutable view
return _View(self.__name, self.__value[index])
class _View(RequestArgument):
"""
This class represents an immutable view of a (subset of a) RequestArgument object. Should not be instantiated
directly.
"""
def __init__(self, name: str, value: Union[Tuple[str, Union[bytes, str]], List[Tuple[str, Union[bytes, str]]]])\
-> None:
"""
Create a new immutable view of a (subset of a) RequestArgument.
:param name: The name for this argument, same as in the original RequestArgument.
:param value: The values to represent in this view, obtained by e.g. indexing or slicing.
"""
super().__init__(name, value)
@staticmethod
def of(argument: 'RequestArgument') ->'RequestArgument':
"""
Create an immutable, unsliced view of an RequestArgument instance.
:param argument: The RequestArgument instance to create a view of.
:return: An immutable view of the provided RequestArgument instance.
"""
return argument[:]
@property
def is_view(self) -> bool:
"""
:return: True, if this instance is an immutable view, False otherwise.
"""
return True

View file

@ -0,0 +1,65 @@
from jinja2 import Environment, Template
from matemat import __version__
class PageletResponse:
"""
Base class for pagelet return values that require more action than simply sending plain data.
An instance of this base class will result in an empty 200 OK response.
"""
def __init__(self, status: int = 200):
"""
Create an empty response.
:param status: The HTTP status code, defaults to 200 (OK).
"""
self.status: int = status
class RedirectResponse(PageletResponse):
"""
A pagelet response that causes the server to redirect to another location, using a 301 Permanently Moved (uncached)
response status, and a Location header.
"""
def __init__(self, location: str):
"""
Create a redirection response with the given redirection location.
:param location: The location to redirect to.
"""
super().__init__(status=301)
self.location: str = location
class TemplateResponse(PageletResponse):
"""
A pagelet response that causes the server to load a Jinja2 template and render it with the provided arguments, then
sending the result as response body, with a 200 OK response status.
"""
def __init__(self, name: str, **kwargs):
"""
Create a template response with the given template name and arguments.
:param name: Name of the template to load.
:param kwargs: Arguments for rendering the template, will be passed to jinja2.Template.render as is.
"""
super().__init__()
self.name: str = name
self.kwargs = kwargs
def _render(self, jinja_env: Environment) -> bytes:
"""
Load and render the template using the Jinja2 environment managed by the web server instance. This method
should not be called by a pagelet.
:param jinja_env: The Jinja2 environment.
:return: An UTF-8 encoded bytes object containing the template rendering result.
"""
template: Template = jinja_env.get_template(self.name)
return template.render(**self.kwargs, __version__=__version__).encode('utf-8')

View file

View file

@ -0,0 +1,204 @@
from typing import Any, Callable, Dict, Tuple, Union
import unittest.mock
from io import BytesIO
from tempfile import TemporaryDirectory
from abc import ABC
from datetime import datetime
from http.server import HTTPServer
import jinja2
from matemat.webserver import pagelet, RequestArguments
class HttpResponse:
"""
A really basic HTTP response container and parser class, just good enough for unit testing a HTTP server, if even.
DO NOT USE THIS OUTSIDE UNIT TESTING!
Usage:
response = HttpResponse()
while response.parse_phase != 'done'
response.parse(<read from somewhere>)
print(response.statuscode)
"""
def __init__(self) -> None:
# The HTTP status code of the response
self.statuscode: int = 0
# HTTP headers set in the response
self.headers: Dict[str, str] = {
'Content-Length': 0
}
self.pagelet: str = None
# The response body
self.body: bytes = bytes()
# Parsing phase, one of 'begin', 'hdr', 'body' or 'done'
self.parse_phase = 'begin'
# Buffer for uncompleted lines
self.buffer: bytes = bytes()
def __finalize(self):
self.parse_phase = 'done'
self.pagelet = self.headers.get('X-Test-Pagelet', None)
def parse(self, fragment: bytes) -> None:
"""
Parse a new fragment of data. This function does nothing if the parsed HTTP response is already complete.
DO NOT USE THIS OUTSIDE UNIT TESTING!
:param fragment: The data fragment to parse.
"""
# response packet complete, nothing to do
if self.parse_phase == 'done':
return
# If in the body phase, simply decode and append to the body, while the body is not complete yet
elif self.parse_phase == 'body':
self.body += fragment
if len(self.body) >= int(self.headers['Content-Length']):
self.__finalize()
return
if b'\r\n' not in fragment:
# If the fragment does not contain a CR-LF, add it to the buffer, we only want to parse whole lines
self.buffer = self.buffer + fragment
else:
if not fragment.endswith(b'\r\n'):
# Special treatment for no trailing CR-LF: Add remainder to buffer
head, tail = fragment.rsplit(b'\r\n', 1)
data: bytes = (self.buffer + head)
self.buffer = tail
else:
data: bytes = (self.buffer + fragment)
self.buffer = bytes()
# Iterate the lines that are ready to be parsed
for line in data.split(b'\r\n'):
# The 'begin' phase indicates that the parser is waiting for the HTTP status line
if self.parse_phase == 'begin':
if line.startswith(b'HTTP/'):
# Parse the statuscode and advance to header parsing
_, statuscode, _ = line.decode('utf-8').split(' ', 2)
self.statuscode = int(statuscode)
self.parse_phase = 'hdr'
elif self.parse_phase == 'hdr':
# Parse a header line and add it to the header dict
if len(line) > 0:
k, v = line.decode('utf-8').split(':', 1)
self.headers[k.strip()] = v.strip()
else:
# Empty line separates header from body
self.parse_phase = 'body'
elif self.parse_phase == 'body':
# if there is a remainder in the data packet, it is (part of) the body, add to body string
self.body += line
if len(self.body) >= int(self.headers['Content-Length']):
self.__finalize()
class MockServer:
"""
A mock implementation of http.server.HTTPServer. Only used for matemat-specific storage.
"""
def __init__(self, webroot: str = '/var/matemat/webroot') -> None:
# Session timeout and variables for all sessions
self.session_vars: Dict[str, Tuple[datetime, Dict[str, Any]]] = dict()
# Webroot for statically served content
self.webroot: str = webroot
# Jinja environment with a single, static template
self.jinja_env = jinja2.Environment(
loader=jinja2.DictLoader({'test.txt': 'Hello, {{ what }}!'})
)
class MockSocket(bytes):
"""
A mock implementation of a socket.socket for http.server.BaseHTTPRequestHandler.
The bytes inheritance is due to a broken type annotation in BaseHTTPRequestHandler.
"""
def __init__(self) -> None:
super().__init__()
# The request string
self.__request = bytes()
# The parsed response
self.__packet = HttpResponse()
def set_request(self, request: bytes) -> None:
"""
Sets the HTTP request to send to the server.
:param request: The request
"""
self.__request: bytes = request
def makefile(self, mode: str, size: int) -> BytesIO:
"""
Required by http.server.HTTPServer.
:return: A dummy buffer IO object instead of a network socket file handle.
"""
return BytesIO(self.__request)
def sendall(self, b: bytes) -> None:
"""
Required by http.server.HTTPServer.
:param b: The data to send to the client. Will be parsed directly instead.
"""
self.__packet.parse(b)
def get_response(self) -> HttpResponse:
"""
Fetches the parsed HTTP response generated by the server.
:return: The response object.
"""
return self.__packet
def test_pagelet(path: str):
def with_testing_headers(fun: Callable[[str,
str,
RequestArguments,
Dict[str, Any],
Dict[str, str]],
Union[bytes, str, Tuple[int, str]]]):
@pagelet(path)
def testing_wrapper(method: str,
path: str,
args: RequestArguments,
session_vars: Dict[str, Any],
headers: Dict[str, str]):
headers['X-Test-Pagelet'] = fun.__name__
result = fun(method, path, args, session_vars, headers)
return result
return testing_wrapper
return with_testing_headers
class AbstractHttpdTest(ABC, unittest.TestCase):
"""
An abstract test case that can be inherited by test case classes that want to test part of the webserver's core
functionality.
Usage (subclass test method):
self.client_sock.set_request(b'GET /just/testing/sessions HTTP/1.1\r\n\r\n')
handler = HttpHandler(self.client_sock, ('::1', 45678), self.server)
packet = self.client_sock.get_response()
"""
def setUp(self) -> None:
self.tempdir: TemporaryDirectory = TemporaryDirectory(prefix='matemat.', dir='/tmp/')
self.server: HTTPServer = MockServer(webroot=self.tempdir.name)
self.client_sock: MockSocket = MockSocket()
def tearDown(self):
self.tempdir.cleanup()

View file

@ -0,0 +1,411 @@
import unittest
from matemat.webserver.util import parse_args
class TestParseRequest(unittest.TestCase):
def test_parse_get_root(self):
"""
Test that the simple root path is parsed correctly ('/' path, no args).
"""
path, args = parse_args('/')
self.assertEqual('/', path)
self.assertEqual(0, len(args))
def test_parse_get_no_args(self):
"""
Test that a GET request without arguments is parsed correctly.
"""
path, args = parse_args('/index.html')
self.assertEqual('/index.html', path)
self.assertEqual(0, len(args))
def test_parse_get_root_getargs(self):
"""
Test that a GET request for '/' with scalar arguments is parsed correctly.
"""
path, args = parse_args('/?foo=42&bar=1337&baz=Hello,%20World!')
self.assertEqual('/', path)
self.assertEqual(3, len(args))
self.assertIn('foo', args)
self.assertIn('bar', args)
self.assertIn('baz', args)
self.assertTrue(args['foo'].is_scalar)
self.assertTrue(args['bar'].is_scalar)
self.assertTrue(args['baz'].is_scalar)
self.assertEqual('text/plain', args['foo'].get_content_type())
self.assertEqual('text/plain', args['bar'].get_content_type())
self.assertEqual('text/plain', args['baz'].get_content_type())
self.assertEqual('42', args['foo'].get_str())
self.assertEqual('1337', args['bar'].get_str())
self.assertEqual('Hello, World!', args['baz'].get_str())
def test_parse_get_getargs(self):
"""
Test that a GET request for an arbitrary path with scalar arguments is parsed correctly.
"""
path, args = parse_args('/abc/def?foo=42&bar=1337&baz=Hello,%20World!')
self.assertEqual('/abc/def', path)
self.assertEqual(3, len(args))
self.assertIn('foo', args)
self.assertIn('bar', args)
self.assertIn('baz', args)
self.assertTrue(args['foo'].is_scalar)
self.assertTrue(args['bar'].is_scalar)
self.assertTrue(args['baz'].is_scalar)
self.assertEqual('text/plain', args['foo'].get_content_type())
self.assertEqual('text/plain', args['bar'].get_content_type())
self.assertEqual('text/plain', args['baz'].get_content_type())
self.assertEqual('42', args['foo'].get_str())
self.assertEqual('1337', args['bar'].get_str())
self.assertEqual('Hello, World!', args['baz'].get_str())
def test_parse_get_getarray(self):
"""
Test that a GET request with mixed scalar and array arguments is parsed correctly.
"""
path, args = parse_args('/abc/def?foo=42&foo=1337&baz=Hello,%20World!')
self.assertEqual('/abc/def', path)
self.assertEqual(2, len(args))
self.assertIn('foo', args)
self.assertIn('baz', args)
self.assertTrue(args['foo'].is_array)
self.assertTrue(args['baz'].is_scalar)
self.assertEqual(2, len(args['foo']))
self.assertEqual('42', args['foo'].get_str(0))
self.assertEqual('1337', args['foo'].get_str(1))
def test_parse_get_zero_arg(self):
"""
Test that a GET request with an empty argument is parsed correctly.
"""
path, args = parse_args('/abc/def?foo=&bar=42')
self.assertEqual(2, len(args))
self.assertIn('foo', args)
self.assertIn('bar', args)
self.assertTrue(args['foo'].is_scalar)
self.assertTrue(args['bar'].is_scalar)
self.assertEqual(1, len(args['foo']))
self.assertEqual('', args['foo'].get_str())
self.assertEqual('42', args['bar'].get_str())
def test_parse_get_urlencoded_encoding_fail(self):
"""
Test that a GET request with non-decodable escape sequences fails.
"""
with self.assertRaises(ValueError):
parse_args('/?foo=42&bar=%80&baz=Hello,%20World!')
def test_parse_post_urlencoded(self):
"""
Test that a urlencoded POST request with scalar arguments is parsed correctly.
"""
path, args = parse_args('/',
postbody=b'foo=42&bar=1337&baz=Hello,%20World!',
enctype='application/x-www-form-urlencoded')
self.assertEqual('/', path)
self.assertEqual(3, len(args))
self.assertIn('foo', args)
self.assertIn('bar', args)
self.assertIn('baz', args)
self.assertTrue(args['foo'].is_scalar)
self.assertTrue(args['bar'].is_scalar)
self.assertTrue(args['baz'].is_scalar)
self.assertEqual('text/plain', args['foo'].get_content_type())
self.assertEqual('text/plain', args['bar'].get_content_type())
self.assertEqual('text/plain', args['baz'].get_content_type())
self.assertEqual('42', args['foo'].get_str())
self.assertEqual('1337', args['bar'].get_str())
self.assertEqual('Hello, World!', args['baz'].get_str())
def test_parse_post_urlencoded_array(self):
"""
Test that a urlencoded POST request with mixed scalar and array arguments is parsed correctly.
"""
path, args = parse_args('/',
postbody=b'foo=42&foo=1337&baz=Hello,%20World!',
enctype='application/x-www-form-urlencoded')
self.assertEqual('/', path)
self.assertEqual(2, len(args))
self.assertIn('foo', args)
self.assertIn('baz', args)
self.assertTrue(args['foo'].is_array)
self.assertTrue(args['baz'].is_scalar)
self.assertEqual(2, len(args['foo']))
self.assertEqual('42', args['foo'].get_str(0))
self.assertEqual('1337', args['foo'].get_str(1))
def test_parse_post_urlencoded_zero_arg(self):
"""
Test that a urlencoded POST request with an empty argument is parsed correctly.
"""
path, args = parse_args('/abc/def', postbody=b'foo=&bar=42', enctype='application/x-www-form-urlencoded')
self.assertEqual(2, len(args))
self.assertIn('foo', args)
self.assertIn('bar', args)
self.assertTrue(args['foo'].is_scalar)
self.assertTrue(args['bar'].is_scalar)
self.assertEqual(1, len(args['foo']))
self.assertEqual('', args['foo'].get_str())
self.assertEqual('42', args['bar'].get_str())
def test_parse_post_urlencoded_encoding_fail(self):
"""
Test that a urlencoded POST request with a non-decodable escape sequence fails.
"""
with self.assertRaises(ValueError):
parse_args('/',
postbody=b'foo=42&bar=%80&baz=Hello,%20World!',
enctype='application/x-www-form-urlencoded')
def test_parse_post_multipart_no_args(self):
"""
Test that a multipart POST request with no arguments is parsed correctly.
"""
path, args = parse_args('/',
postbody=b'--testBoundary1337--\r\n',
enctype='multipart/form-data; boundary=testBoundary1337')
self.assertEqual('/', path)
self.assertEqual(0, len(args))
def test_parse_post_multipart(self):
"""
Test that a multipart POST request with scalar arguments is parsed correctly.
"""
path, args = parse_args('/',
postbody=b'--testBoundary1337\r\n'
b'Content-Disposition: form-data; name="foo"\r\n'
b'Content-Type: text/plain\r\n\r\n'
b'42\r\n'
b'--testBoundary1337\r\n'
b'Content-Disposition: form-data; name="bar"; filename="bar.bin"\r\n'
b'Content-Type: application/octet-stream\r\n\r\n'
b'1337\r\n'
b'--testBoundary1337\r\n'
b'Content-Disposition: form-data; name="baz"\r\n'
b'Content-Type: text/plain\r\n\r\n'
b'Hello, World!\r\n'
b'--testBoundary1337--\r\n',
enctype='multipart/form-data; boundary=testBoundary1337')
self.assertEqual('/', path)
self.assertEqual(3, len(args))
self.assertIn('foo', args)
self.assertIn('bar', args)
self.assertIn('baz', args)
self.assertTrue(args['foo'].is_scalar)
self.assertTrue(args['bar'].is_scalar)
self.assertTrue(args['baz'].is_scalar)
self.assertEqual('text/plain', args['foo'].get_content_type())
self.assertEqual('application/octet-stream', args['bar'].get_content_type())
self.assertEqual('text/plain', args['baz'].get_content_type())
self.assertEqual('42', args['foo'].get_str())
self.assertEqual(b'1337', args['bar'].get_bytes())
self.assertEqual('Hello, World!', args['baz'].get_str())
def test_parse_post_multipart_zero_arg(self):
"""
Test that a multipart POST request with an empty argument is parsed correctly.
"""
path, args = parse_args('/abc/def',
postbody=b'--testBoundary1337\r\n'
b'Content-Disposition: form-data; name="foo"\r\n'
b'Content-Type: text/plain\r\n\r\n\r\n'
b'--testBoundary1337\r\n'
b'Content-Disposition: form-data; name="bar"\r\n'
b'Content-Type: text/plain\r\n\r\n'
b'42\r\n'
b'--testBoundary1337--\r\n',
enctype='multipart/form-data; boundary=testBoundary1337')
self.assertEqual(2, len(args))
self.assertIn('foo', args)
self.assertIn('bar', args)
self.assertTrue(args['foo'].is_scalar)
self.assertTrue(args['bar'].is_scalar)
self.assertEqual(1, len(args['foo']))
self.assertEqual('', args['foo'].get_str())
self.assertEqual('42', args['bar'].get_str())
def test_parse_post_multipart_no_contenttype(self):
"""
Test that the Content-Type is set to 'application/octet-stream' if it is absent from the multipart header.
"""
path, args = parse_args('/',
postbody=b'--testBoundary1337\r\n'
b'Content-Disposition: form-data; name="foo"\r\n'
b'Content-Type: text/plain\r\n\r\n'
b'42\r\n'
b'--testBoundary1337\r\n'
b'Content-Disposition: form-data; name="bar"; filename="bar.bin"\r\n'
b'Content-Type: application/octet-stream\r\n\r\n'
b'1337\r\n'
b'--testBoundary1337\r\n'
b'Content-Disposition: form-data; name="baz"\r\n\r\n'
b'Hello, World!\r\n'
b'--testBoundary1337--\r\n',
enctype='multipart/form-data; boundary=testBoundary1337')
self.assertEqual('/', path)
self.assertEqual(3, len(args))
self.assertIn('foo', args)
self.assertIn('bar', args)
self.assertIn('baz', args)
self.assertTrue(args['foo'].is_scalar)
self.assertTrue(args['bar'].is_scalar)
self.assertTrue(args['baz'].is_scalar)
self.assertEqual('application/octet-stream', args['baz'].get_content_type())
self.assertEqual('Hello, World!', args['baz'].get_str())
def test_parse_post_multipart_broken_boundaries(self):
"""
Test that multiple cases with broken multipart boundaries fail.
"""
with self.assertRaises(ValueError):
# Boundary not defined in Content-Type
parse_args('/',
postbody=b'--testBoundary1337\r\n'
b'Content-Disposition: form-data; name="foo"\r\n'
b'Content-Type: text/plain\r\n\r\n'
b'42\r\n'
b'--testBoundary1337\r\n'
b'Content-Disposition: form-data; name="bar"; filename="bar.bin"\r\n'
b'Content-Type: application/octet-stream\r\n\r\n'
b'1337\r\n'
b'--testBoundary1337\r\n'
b'Content-Disposition: form-data; name="baz"\r\n'
b'Content-Type: text/plain\r\n\r\n'
b'Hello, World!\r\n'
b'--testBoundary1337--\r\n',
enctype='multipart/form-data')
with self.assertRaises(ValueError):
# Corrupted "--" head at first boundary
parse_args('/',
postbody=b'-+testBoundary1337\r\n'
b'Content-Disposition: form-data; name="foo"\r\n'
b'Content-Type: text/plain\r\n\r\n'
b'42\r\n'
b'--testBoundary1337\r\n'
b'Content-Disposition: form-data; name="bar"; filename="bar.bin"\r\n'
b'Content-Type: application/octet-stream\r\n\r\n'
b'1337\r\n'
b'--testBoundary1337\r\n'
b'Content-Disposition: form-data; name="baz"\r\n'
b'Content-Type: text/plain\r\n\r\n'
b'Hello, World!\r\n'
b'--testBoundary1337--\r\n',
enctype='multipart/form-data; boundary=testBoundary1337')
with self.assertRaises(ValueError):
# Missing "--" tail at end boundary
parse_args('/',
postbody=b'--testBoundary1337\r\n'
b'Content-Disposition: form-data; name="foo"\r\n'
b'Content-Type: text/plain\r\n\r\n'
b'42\r\n'
b'--testBoundary1337\r\n'
b'Content-Disposition: form-data; name="bar"; filename="bar.bin"\r\n'
b'Content-Type: application/octet-stream\r\n\r\n'
b'1337\r\n'
b'--testBoundary1337\r\n'
b'Content-Disposition: form-data; name="baz"\r\n'
b'Content-Type: text/plain\r\n\r\n'
b'Hello, World!\r\n'
b'--testBoundary1337\r\n',
enctype='multipart/form-data; boundary=testBoundary1337')
with self.assertRaises(ValueError):
# Missing Content-Disposition header in one part
parse_args('/',
postbody=b'--testBoundary1337\r\n'
b'Content-Disposition: form-data; name="foo"\r\n'
b'Content-Type: text/plain\r\n\r\n'
b'42\r\n'
b'--testBoundary1337\r\n'
b'Content-Type: application/octet-stream\r\n\r\n'
b'1337\r\n'
b'--testBoundary1337\r\n'
b'Content-Disposition: form-data; name="baz"\r\n'
b'Content-Type: text/plain\r\n\r\n'
b'Hello, World!\r\n'
b'--testBoundary1337--\r\n',
enctype='multipart/form-data; boundary=testBoundary1337')
with self.assertRaises(ValueError):
# Missing form-data name argument
parse_args('/',
postbody=b'--testBoundary1337\r\n'
b'Content-Disposition: form-data; name="foo"\r\n'
b'Content-Type: text/plain\r\n\r\n'
b'42\r\n'
b'--testBoundary1337\r\n'
b'Content-Disposition: form-data; filename="bar.bin"\r\n'
b'Content-Type: application/octet-stream\r\n\r\n'
b'1337\r\n'
b'--testBoundary1337\r\n'
b'Content-Disposition: form-data; name="baz"\r\n'
b'Content-Type: text/plain\r\n\r\n'
b'Hello, World!\r\n'
b'--testBoundary1337--\r\n',
enctype='multipart/form-data; boundary=testBoundary1337')
with self.assertRaises(ValueError):
# Unknown Content-Disposition
parse_args('/',
postbody=b'--testBoundary1337\r\n'
b'Content-Disposition: form-data; name="foo"\r\n'
b'Content-Type: text/plain\r\n\r\n'
b'42\r\n'
b'--testBoundary1337\r\n'
b'Content-Disposition: attachment; name="bar"; filename="bar.bin"\r\n'
b'Content-Type: application/octet-stream\r\n\r\n'
b'1337\r\n'
b'--testBoundary1337\r\n'
b'Content-Disposition: form-data; name="baz"\r\n'
b'Content-Type: text/plain\r\n\r\n'
b'Hello, World!\r\n'
b'--testBoundary1337--\r\n',
enctype='multipart/form-data; boundary=testBoundary1337')
def test_get_post_precedence_urlencoded(self):
"""
Test the precedence of urlencoded POST arguments over GET arguments.
"""
path, args = parse_args('/foo?foo=thisshouldnotbethere&bar=isurvived',
postbody=b'foo=42&foo=1337&baz=Hello,%20World!',
enctype='application/x-www-form-urlencoded')
self.assertIn('foo', args)
self.assertIn('bar', args)
self.assertIn('baz', args)
self.assertEqual(2, len(args['foo']))
self.assertEqual(1, len(args['bar']))
self.assertEqual(1, len(args['baz']))
self.assertEqual('42', args['foo'].get_str(0))
self.assertEqual('1337', args['foo'].get_str(1))
self.assertEqual('isurvived', args['bar'].get_str())
self.assertEqual('Hello, World!', args['baz'].get_str())
def test_get_post_precedence_multipart(self):
"""
Test the precedence of multipart POST arguments over GET arguments.
"""
path, args = parse_args('/foo?foo=thisshouldnotbethere&bar=isurvived',
postbody=b'--testBoundary1337\r\n'
b'Content-Disposition: form-data; name="foo"\r\n'
b'Content-Type: text/plain\r\n\r\n'
b'42\r\n'
b'--testBoundary1337\r\n'
b'Content-Disposition: form-data; name="foo"; filename="bar.bin"\r\n'
b'Content-Type: text/plain\r\n\r\n'
b'1337\r\n'
b'--testBoundary1337\r\n'
b'Content-Disposition: form-data; name="baz"\r\n'
b'Content-Type: text/plain\r\n\r\n'
b'Hello, World!\r\n'
b'--testBoundary1337--\r\n',
enctype='multipart/form-data; boundary=testBoundary1337')
self.assertIn('foo', args)
self.assertIn('bar', args)
self.assertIn('baz', args)
self.assertEqual(2, len(args['foo']))
self.assertEqual(1, len(args['bar']))
self.assertEqual(1, len(args['baz']))
self.assertEqual('42', args['foo'].get_str(0))
self.assertEqual('1337', args['foo'].get_str(1))
self.assertEqual('isurvived', args['bar'].get_str())
self.assertEqual('Hello, World!', args['baz'].get_str())

View file

@ -0,0 +1,224 @@
from typing import Any, Dict, List
from matemat.webserver import HttpHandler, RequestArguments
from matemat.webserver.test.abstract_httpd_test import AbstractHttpdTest, test_pagelet
import codecs
@test_pagelet('/just/testing/post')
def post_test_pagelet(method: str,
path: str,
args: RequestArguments,
session_vars: Dict[str, Any],
headers: Dict[str, str]):
"""
Test pagelet that simply prints the parsed arguments as response body.
"""
headers['Content-Type'] = 'text/plain'
dump: str = ''
for ra in args:
for a in ra:
if a.get_content_type().startswith('text/'):
dump += f'{a.name}: {a.get_str()}\n'
else:
dump += f'{a.name}: {codecs.encode(a.get_bytes(), "hex").decode("utf-8")}\n'
return dump
class TestPost(AbstractHttpdTest):
"""
Test cases for the content serving of the web server.
"""
def test_post_urlenc_get_only_args(self):
"""
Test a POST request that only contains GET arguments.
"""
# Send POST request
self.client_sock.set_request(b'POST /just/testing/post?foo=bar&test=1 HTTP/1.1\r\n'
b'Content-Length: 0\r\n'
b'Content-Type: application/x-www-form-urlencoded\r\n\r\n')
HttpHandler(self.client_sock, ('::1', 45678), self.server)
packet = self.client_sock.get_response()
# Parse response body
lines: List[bytes] = packet.body.split(b'\n')[:-1]
kv: Dict[str, str] = dict()
for l in lines:
k, v = l.decode('utf-8').split(':', 1)
kv[k.strip()] = v.strip() if ',' not in v else v.strip().split(',')
# Make sure the arguments were properly parsed
self.assertEqual('bar', kv['foo'])
self.assertEqual('1', kv['test'])
def test_post_urlenc_post_only_args(self):
"""
Test a POST request that only contains POST arguments (urlencoded).
"""
# Send POST request
self.client_sock.set_request(b'POST /just/testing/post HTTP/1.1\r\n'
b'Content-Type: application/x-www-form-urlencoded\r\n'
b'Content-Length: 14\r\n\r\n'
b'foo=bar&test=1\r\n')
HttpHandler(self.client_sock, ('::1', 45678), self.server)
packet = self.client_sock.get_response()
# Parse response body
lines: List[bytes] = packet.body.split(b'\n')[:-1]
kv: Dict[str, str] = dict()
for l in lines:
k, v = l.decode('utf-8').split(':', 1)
kv[k.strip()] = v.strip() if ',' not in v else v.strip().split(',')
# Make sure the arguments were properly parsed
self.assertEqual('bar', kv['foo'])
self.assertEqual('1', kv['test'])
def test_post_urlenc_mixed_args(self):
"""
Test that mixed POST and GET args are properly parsed, and that POST takes precedence over GET.
"""
# Send POST request
self.client_sock.set_request(b'POST /just/testing/post?gettest=1&foo=baz HTTP/1.1\r\n'
b'Content-Type: application/x-www-form-urlencoded\r\n'
b'Content-Length: 18\r\n\r\n'
b'foo=bar&posttest=2\r\n')
HttpHandler(self.client_sock, ('::1', 45678), self.server)
packet = self.client_sock.get_response()
# Parse response body
lines: List[bytes] = packet.body.split(b'\n')[:-1]
kv: Dict[str, str] = dict()
for l in lines:
k, v = l.decode('utf-8').split(':', 1)
kv[k.strip()] = v.strip() if ',' not in v else v.strip().split(',')
# Make sure the arguments were properly parsed
self.assertEqual('bar', kv['foo'])
self.assertEqual('1', kv['gettest'])
self.assertEqual('2', kv['posttest'])
def test_post_urlenc_get_array(self):
"""
Test a POST request that contains GET array arguments.
"""
# Send POST request
self.client_sock.set_request(b'POST /just/testing/post?foo=bar&test=1&foo=baz HTTP/1.1\r\n'
b'Content-Length: 0\r\n'
b'Content-Type: application/x-www-form-urlencoded\r\n\r\n')
HttpHandler(self.client_sock, ('::1', 45678), self.server)
packet = self.client_sock.get_response()
# Parse response body
lines: List[bytes] = packet.body.split(b'\n')[:-1]
kv: Dict[str, str] = dict()
for l in lines:
k, v = l.decode('utf-8').split(':', 1)
k = k.strip()
v = v.strip()
if k in kv:
kv[k] += f',{v}'
else:
kv[k] = v
# Make sure the arguments were properly parsed
self.assertEqual('bar,baz', kv['foo'])
self.assertEqual('1', kv['test'])
def test_post_urlenc_post_array(self):
"""
Test a POST request that contains POST array arguments.
"""
# Send POST request
self.client_sock.set_request(b'POST /just/testing/post HTTP/1.1\r\n'
b'Content-Length: 22\r\n'
b'Content-Type: application/x-www-form-urlencoded\r\n\r\n'
b'foo=bar&test=1&foo=baz\r\n')
HttpHandler(self.client_sock, ('::1', 45678), self.server)
packet = self.client_sock.get_response()
# Parse response body
lines: List[bytes] = packet.body.split(b'\n')[:-1]
kv: Dict[str, str] = dict()
for l in lines:
k, v = l.decode('utf-8').split(':', 1)
k = k.strip()
v = v.strip()
if k in kv:
kv[k] += f',{v}'
else:
kv[k] = v
# Make sure the arguments were properly parsed
self.assertEqual('bar,baz', kv['foo'])
self.assertEqual('1', kv['test'])
def test_post_urlenc_mixed_array(self):
"""
Test a POST request that contains both GET and POST array arguments.
"""
# Send POST request
self.client_sock.set_request(b'POST /just/testing/post?foo=getbar&gettest=1&gettest=42&foo=getbaz HTTP/1.1\r\n'
b'Content-Length: 45\r\n'
b'Content-Type: application/x-www-form-urlencoded\r\n\r\n'
b'foo=postbar&posttest=1&posttest=2&foo=postbaz\r\n')
HttpHandler(self.client_sock, ('::1', 45678), self.server)
packet = self.client_sock.get_response()
# Parse response body
lines: List[bytes] = packet.body.split(b'\n')[:-1]
kv: Dict[str, str] = dict()
for l in lines:
k, v = l.decode('utf-8').split(':', 1)
k = k.strip()
v = v.strip()
if k in kv:
kv[k] += f',{v}'
else:
kv[k] = v
# Make sure the arguments were properly parsed
self.assertEqual('postbar,postbaz', kv['foo'])
self.assertEqual('1,42', kv['gettest'])
self.assertEqual('1,2', kv['posttest'])
def test_post_no_body(self):
"""
Test a POST request that contains no headers or body.
"""
# Send POST request
self.client_sock.set_request(b'POST /just/testing/post?foo=bar HTTP/1.1\r\n\r\n')
HttpHandler(self.client_sock, ('::1', 45678), self.server)
packet = self.client_sock.get_response()
# Make sure a 400 Bad Request is returned
self.assertEqual(400, packet.statuscode)
def test_post_multipart_post_only(self):
"""
Test a POST request with a miutipart/form-data body.
"""
# Send POST request
formdata = (b'------testboundary\r\n'
b'Content-Disposition: form-data; name="foo"\r\n'
b'Content-Type: text/plain\r\n\r\n'
b'Hello, World!\r\n'
b'------testboundary\r\n'
b'Content-Disposition: form-data; name="bar"; filename="foo.bar"\r\n'
b'Content-Type: application/octet-stream\r\n\r\n'
b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x80\x0b\x0c\x73\x0e\x0f\r\n'
b'------testboundary--\r\n')
self.client_sock.set_request(f'POST /just/testing/post HTTP/1.1\r\n'
f'Content-Type: multipart/form-data; boundary=----testboundary\r\n'
f'Content-Length: {len(formdata)}\r\n\r\n'.encode('utf-8') + formdata)
HttpHandler(self.client_sock, ('::1', 45678), self.server)
packet = self.client_sock.get_response()
lines: List[bytes] = packet.body.split(b'\n')[:-1]
kv: Dict[str, Any] = dict()
for l in lines:
k, v = l.split(b':', 1)
kv[k.decode('utf-8').strip()] = v.strip()
self.assertIn('foo', kv)
self.assertIn('bar', kv)
self.assertEqual(kv['foo'], b'Hello, World!')
self.assertEqual(kv['bar'], b'00010203040506070809800b0c730e0f')

View file

@ -0,0 +1,529 @@
from typing import Dict, List, Set, Tuple
import unittest
import urllib.parse
from matemat.webserver import RequestArgument, RequestArguments
# noinspection PyProtectedMember
from matemat.webserver.requestargs import _View
class TestRequestArguments(unittest.TestCase):
"""
Test cases for the RequestArgument class.
"""
def test_create_default(self):
"""
Test creation of an empty RequestArgument
"""
ra = RequestArgument('foo')
# Name must be set to 1st argument
self.assertEqual('foo', ra.name)
# Must be a 0-length array
self.assertEqual(0, len(ra))
self.assertFalse(ra.is_scalar)
self.assertTrue(ra.is_array)
# Must not be a view
self.assertFalse(ra.is_view)
def test_create_str_scalar(self):
"""
Test creation of a scalar RequestArgument with string value.
"""
ra = RequestArgument('foo', ('text/plain', 'bar'))
# Name must be set to 1st argument
self.assertEqual('foo', ra.name)
# Must be a scalar, length 1
self.assertEqual(1, len(ra))
self.assertTrue(ra.is_scalar)
self.assertFalse(ra.is_array)
# Scalar value must be representable both as str and bytes
self.assertEqual('bar', ra.get_str())
self.assertEqual(b'bar', ra.get_bytes())
# Content-Type must be set correctly
self.assertEqual('text/plain', ra.get_content_type())
# Using 0 indices must yield the same results
self.assertEqual('bar', ra.get_str(0))
self.assertEqual(b'bar', ra.get_bytes(0))
self.assertEqual('text/plain', ra.get_content_type(0))
# Using other indices must result in an error
with self.assertRaises(IndexError):
ra.get_str(1)
with self.assertRaises(IndexError):
ra.get_bytes(1)
with self.assertRaises(IndexError):
ra.get_content_type(1)
# Must not be a view
self.assertFalse(ra.is_view)
def test_create_str_scalar_array(self):
"""
Test creation of a scalar RequestArgument with string value, passing an array instead of a single tuple.
"""
ra = RequestArgument('foo', [('text/plain', 'bar')])
# Name must be set to 1st argument
self.assertEqual('foo', ra.name)
# Must be a scalar, length 1
self.assertEqual(1, len(ra))
self.assertTrue(ra.is_scalar)
self.assertFalse(ra.is_array)
# Scalar value must be representable both as str and bytes
self.assertEqual('bar', ra.get_str())
self.assertEqual(b'bar', ra.get_bytes())
# Content-Type must be set correctly
self.assertEqual('text/plain', ra.get_content_type())
# Using 0 indices must yield the same results
self.assertEqual('bar', ra.get_str(0))
self.assertEqual(b'bar', ra.get_bytes(0))
self.assertEqual('text/plain', ra.get_content_type(0))
# Using other indices must result in an error
with self.assertRaises(IndexError):
ra.get_str(1)
with self.assertRaises(IndexError):
ra.get_bytes(1)
with self.assertRaises(IndexError):
ra.get_content_type(1)
# Must not be a view
self.assertFalse(ra.is_view)
def test_create_bytes_scalar(self):
"""
Test creation of a scalar RequestArgument with bytes value.
"""
ra = RequestArgument('foo', ('application/octet-stream', b'\x00\x80\xff\xfe'))
# Name must be set to 1st argument
self.assertEqual('foo', ra.name)
# Must be a scalar, length 1
self.assertEqual(1, len(ra))
self.assertTrue(ra.is_scalar)
self.assertFalse(ra.is_array)
# Conversion to UTF-8 string must fail; bytes representation must work
with self.assertRaises(UnicodeDecodeError):
ra.get_str()
self.assertEqual(b'\x00\x80\xff\xfe', ra.get_bytes())
# Content-Type must be set correctly
self.assertEqual('application/octet-stream', ra.get_content_type())
# Using 0 indices must yield the same results
with self.assertRaises(UnicodeDecodeError):
ra.get_str(0)
self.assertEqual(b'\x00\x80\xff\xfe', ra.get_bytes(0))
self.assertEqual('application/octet-stream', ra.get_content_type(0))
# Using other indices must result in an error
with self.assertRaises(IndexError):
ra.get_str(1)
with self.assertRaises(IndexError):
ra.get_bytes(1)
with self.assertRaises(IndexError):
ra.get_content_type(1)
# Must not be a view
self.assertFalse(ra.is_view)
def test_create_array(self):
"""
Test creation of an array RequestArgument with mixed str and bytes initial value.
"""
ra = RequestArgument('foo', [
('text/plain', 'bar'),
('application/octet-stream', b'\x00\x80\xff\xfe')
])
# Name must be set to 1st argument
self.assertEqual('foo', ra.name)
# Must be an array, length 2
self.assertEqual(2, len(ra))
self.assertFalse(ra.is_scalar)
self.assertTrue(ra.is_array)
# Retrieving values without an index must yield the first element
self.assertEqual('bar', ra.get_str())
self.assertEqual(b'bar', ra.get_bytes())
self.assertEqual('text/plain', ra.get_content_type())
# The first value must be representable both as str and bytes, and have ctype text/plain
self.assertEqual('bar', ra.get_str(0))
self.assertEqual(b'bar', ra.get_bytes(0))
self.assertEqual('text/plain', ra.get_content_type(0))
# Conversion of the second value to UTF-8 string must fail; bytes representation must work
with self.assertRaises(UnicodeDecodeError):
ra.get_str(1)
self.assertEqual(b'\x00\x80\xff\xfe', ra.get_bytes(1))
# The second value's ctype must be correct
self.assertEqual('application/octet-stream', ra.get_content_type(1))
# Must not be a view
self.assertFalse(ra.is_view)
def test_append_empty_str(self):
"""
Test appending a str value to an empty RequestArgument.
"""
# Initialize the empty RequestArgument
ra = RequestArgument('foo')
self.assertEqual(0, len(ra))
self.assertFalse(ra.is_scalar)
# Append a string value
ra.append('text/plain', 'bar')
# New length must be 1, empty array must be converted to scalar
self.assertEqual(1, len(ra))
self.assertTrue(ra.is_scalar)
# Retrieval of the new value must work both in str and bytes representation
self.assertEqual('bar', ra.get_str())
self.assertEqual(b'bar', ra.get_bytes())
# Content type of the new value must be correct
self.assertEqual('text/plain', ra.get_content_type())
# Must not be a view
self.assertFalse(ra.is_view)
def test_append_empty_bytes(self):
"""
Test appending a bytes value to an empty RequestArgument.
"""
# Initialize the empty RequestArgument
ra = RequestArgument('foo')
self.assertEqual(0, len(ra))
self.assertFalse(ra.is_scalar)
# Append a bytes value
ra.append('application/octet-stream', b'\x00\x80\xff\xfe')
# New length must be 1, empty array must be converted to scalar
self.assertEqual(1, len(ra))
self.assertTrue(ra.is_scalar)
# Conversion of the new value to UTF-8 string must fail; bytes representation must work
with self.assertRaises(UnicodeDecodeError):
ra.get_str()
self.assertEqual(b'\x00\x80\xff\xfe', ra.get_bytes())
# Content type of the new value must be correct
self.assertEqual('application/octet-stream', ra.get_content_type())
# Must not be a view
self.assertFalse(ra.is_view)
def test_append_multiple(self):
"""
Test appending multiple values to an empty RequestArgument.
"""
# Initialize the empty RequestArgument
ra = RequestArgument('foo')
self.assertEqual(0, len(ra))
self.assertFalse(ra.is_scalar)
# Append a first value
ra.append('text/plain', 'bar')
# New length must be 1, empty array must be converted to scalar
self.assertEqual(1, len(ra))
self.assertTrue(ra.is_scalar)
self.assertEqual(b'bar', ra.get_bytes())
# Append a second value
ra.append('application/octet-stream', b'\x00\x80\xff\xfe')
# New length must be 2, scalar must be converted to array
self.assertEqual(2, len(ra))
self.assertFalse(ra.is_scalar)
self.assertEqual(b'bar', ra.get_bytes(0))
self.assertEqual(b'\x00\x80\xff\xfe', ra.get_bytes(1))
# Append a third value
ra.append('text/plain', 'Hello, World!')
# New length must be 3, array must remain array
self.assertEqual(3, len(ra))
self.assertFalse(ra.is_scalar)
self.assertEqual(b'bar', ra.get_bytes(0))
self.assertEqual(b'\x00\x80\xff\xfe', ra.get_bytes(1))
self.assertEqual(b'Hello, World!', ra.get_bytes(2))
def test_clear_empty(self):
"""
Test clearing an empty RequestArgument.
"""
# Initialize the empty RequestArgument
ra = RequestArgument('foo')
self.assertEqual(0, len(ra))
self.assertFalse(ra.is_scalar)
ra.clear()
# Clearing an empty RequestArgument shouldn't have any effect
self.assertEqual('foo', ra.name)
self.assertEqual(0, len(ra))
self.assertFalse(ra.is_scalar)
def test_clear_scalar(self):
"""
Test clearing a scalar RequestArgument.
"""
# Initialize the scalar RequestArgument
ra = RequestArgument('foo', ('text/plain', 'bar'))
self.assertEqual(1, len(ra))
self.assertTrue(ra.is_scalar)
ra.clear()
# Clearing a scalar RequestArgument should reduce its size to 0
self.assertEqual('foo', ra.name)
self.assertEqual(0, len(ra))
self.assertFalse(ra.is_scalar)
with self.assertRaises(IndexError):
ra.get_str()
def test_clear_array(self):
"""
Test clearing an array RequestArgument.
"""
# Initialize the array RequestArgument
ra = RequestArgument('foo', [
('text/plain', 'bar'),
('application/octet-stream', b'\x00\x80\xff\xfe'),
('text/plain', 'baz'),
])
self.assertEqual(3, len(ra))
self.assertFalse(ra.is_scalar)
ra.clear()
# Clearing an array RequestArgument should reduce its size to 0
self.assertEqual('foo', ra.name)
self.assertEqual(0, len(ra))
self.assertFalse(ra.is_scalar)
with self.assertRaises(IndexError):
ra.get_str()
def test_iterate_empty(self):
"""
Test iterating an empty RequestArgument.
"""
ra = RequestArgument('foo')
self.assertEqual(0, len(ra))
# No value must be yielded from iterating an empty instance
for _ in ra:
self.fail()
def test_iterate_scalar(self):
"""
Test iterating a scalar RequestArgument.
"""
ra = RequestArgument('foo', ('text/plain', 'bar'))
self.assertTrue(ra.is_scalar)
# Counter for the number of iterations
count: int = 0
for it in ra:
# Make sure the yielded value is a scalar view and has the same name as the original instance
self.assertIsInstance(it, _View)
self.assertTrue(it.is_view)
self.assertEqual('foo', it.name)
self.assertTrue(it.is_scalar)
count += 1
# Only one value must be yielded from iterating a scalar instance
self.assertEqual(1, count)
def test_iterate_array(self):
"""
Test iterating an array RequestArgument.
"""
ra = RequestArgument('foo', [('text/plain', 'bar'), ('abc', b'def'), ('xyz', '1337')])
self.assertFalse(ra.is_scalar)
# Container to put the iterated ctypes into
items: List[str] = list()
for it in ra:
# Make sure the yielded values are scalar views and have the same name as the original instance
self.assertIsInstance(it, _View)
self.assertTrue(it.is_view)
self.assertTrue(it.is_scalar)
# Collect the value's ctype
items.append(it.get_content_type())
# Compare collected ctypes with expected result
self.assertEqual(['text/plain', 'abc', 'xyz'], items)
def test_slice(self):
"""
Test slicing an array RequestArgument.
"""
ra = RequestArgument('foo', [('a', 'b'), ('c', 'd'), ('e', 'f'), ('g', 'h'), ('i', 'j'), ('k', 'l')])
# Create the sliced view
sliced = ra[1:4:2]
# Make sure the sliced value is a view
self.assertIsInstance(sliced, _View)
self.assertTrue(sliced.is_view)
# Make sure the slice has the same name
self.assertEqual('foo', sliced.name)
# Make sure the slice has the expected shape (array of the 2nd and 4th scalar in the original)
self.assertTrue(sliced.is_array)
self.assertEqual(2, len(sliced))
self.assertEqual('d', sliced.get_str(0))
self.assertEqual('h', sliced.get_str(1))
def test_iterate_sliced(self):
"""
Test iterating a sliced array RequestArgument.
"""
ra = RequestArgument('foo', [('a', 'b'), ('c', 'd'), ('e', 'f'), ('g', 'h'), ('i', 'j'), ('k', 'l')])
# Container to put the iterated ctypes into
items: List[str] = list()
# Iterate the sliced view
for it in ra[1:4:2]:
# Make sure the yielded values are scalar views and have the same name as the original instance
self.assertIsInstance(it, _View)
self.assertTrue(it.is_view)
self.assertEqual('foo', it.name)
self.assertTrue(it.is_scalar)
items.append(it.get_content_type())
# Make sure the expected values are collected (array of the 2nd and 4th scalar in the original)
self.assertEqual(['c', 'g'], items)
def test_index_scalar(self):
"""
Test indexing of a scalar RequestArgument.
"""
ra = RequestArgument('foo', ('bar', 'baz'))
# Index the scalar RequestArgument instance, obtaining an immutable view
it = ra[0]
# Make sure the value is a scalar view with the same properties as the original instance
self.assertIsInstance(it, _View)
self.assertTrue(it.is_scalar)
self.assertEqual('foo', it.name)
self.assertEqual('bar', it.get_content_type())
self.assertEqual('baz', it.get_str())
# Make sure other indices don't work
with self.assertRaises(IndexError):
_ = ra[1]
def test_index_array(self):
"""
Test indexing of an array RequestArgument.
"""
ra = RequestArgument('foo', [('a', 'b'), ('c', 'd')])
# Index the array RequestArgument instance, obtaining an immutable view
it = ra[1]
# Make sure the value is a scalar view with the same properties as the value in the original instance
self.assertIsInstance(it, _View)
self.assertEqual('foo', it.name)
self.assertEqual('c', it.get_content_type())
self.assertEqual('d', it.get_str())
def test_view_immutable(self):
"""
Test immutability of views.
"""
ra = RequestArgument('foo', ('bar', 'baz'))
# Index the scalar RequestArgument instance, obtaining an immutable view
it = ra[0]
# Make sure the returned value is a view
self.assertIsInstance(it, _View)
# Make sure the returned value is immutable
with self.assertRaises(TypeError):
it.append('foo', 'bar')
with self.assertRaises(TypeError):
it.clear()
def test_str_shorthand(self):
"""
Test the shorthand for get_str(0).
"""
ra = RequestArgument('foo', ('bar', 'baz'))
self.assertEqual('baz', str(ra))
def test_bytes_shorthand(self):
"""
Test the shorthand for get_bytes(0).
"""
ra = RequestArgument('foo', ('bar', b'\x00\x80\xff\xfe'))
self.assertEqual(b'\x00\x80\xff\xfe', bytes(ra))
# noinspection PyTypeChecker
def test_insert_garbage(self):
"""
Test proper handling with non-int indices and non-str/non-bytes data
:return:
"""
ra = RequestArgument('foo', 42)
with self.assertRaises(TypeError):
str(ra)
ra = RequestArgument('foo', (None, 42))
with self.assertRaises(TypeError):
str(ra)
with self.assertRaises(TypeError):
bytes(ra)
with self.assertRaises(TypeError):
ra.get_content_type()
with self.assertRaises(TypeError):
ra.get_str('foo')
with self.assertRaises(TypeError):
ra.get_bytes('foo')
with self.assertRaises(TypeError):
ra.get_content_type('foo')
def test_requestarguments_index(self):
"""
Make sure indexing a RequestArguments instance creates a new entry on the fly.
"""
ra = RequestArguments()
self.assertEqual(0, len(ra))
self.assertFalse('foo' in ra)
# Create new entry
_ = ra['foo']
self.assertEqual(1, len(ra))
self.assertTrue('foo' in ra)
# Already exists, no new entry created
_ = ra['foo']
self.assertEqual(1, len(ra))
# Entry must be empty and mutable, and have the correct name
self.assertFalse(ra['foo'].is_view)
self.assertEqual(0, len(ra['foo']))
self.assertEqual('foo', ra['foo'].name)
# Key must be a string
with self.assertRaises(TypeError):
# noinspection PyTypeChecker
_ = ra[42]
def test_requestarguments_attr(self):
"""
Test attribute access syntactic sugar.
"""
ra = RequestArguments()
# Attribute should not exist yet
with self.assertRaises(KeyError):
_ = ra.foo
# Create entry
_ = ra['foo']
# Creating entry should have created the attribute
self.assertEqual('foo', ra.foo.name)
# Attribute access should yield an immutable view
self.assertTrue(ra.foo.is_view)
def test_requestarguments_iterate(self):
"""
Test iterating a RequestArguments instance.
"""
# Create an instance with some values
ra = RequestArguments()
ra['foo'].append('a', 'b')
ra['bar'].append('c', 'd')
ra['foo'].append('e', 'f')
# Container for test values (name, value)
items: Set[Tuple[str, str]] = set()
# Iterate RequestArguments instance, adding the name and value of each to the set
for a in ra:
items.add((a.name, str(a)))
# Compare result with expected value
self.assertEqual(2, len(items))
self.assertIn(('foo', 'b'), items)
self.assertIn(('bar', 'd'), items)
def test_requestarguments_full_use_case(self):
"""
Simulate a minimal RequestArguments use case.
"""
# Create empty RequestArguments instance
ra = RequestArguments()
# Parse GET request
getargs: Dict[str, List[str]] = urllib.parse.parse_qs('foo=42&bar=1337&foo=43&baz=Hello,%20World!')
# Insert GET arguments into RequestArguments
for k, vs in getargs.items():
for v in vs:
ra[k].append('text/plain', v)
# Parse POST request
postargs: Dict[str, List[str]] = urllib.parse.parse_qs('foo=postfoo&postbar=42&foo=postfoo')
# Insert POST arguments into RequestArguments
for k, vs in postargs.items():
# In this implementation, POST args replace GET args
ra[k].clear()
for v in vs:
ra[k].append('text/plain', v)
# Someplace else: Use the RequestArguments instance.
self.assertEqual('1337', ra.bar.get_str())
self.assertEqual('Hello, World!', ra.baz.get_str())
self.assertEqual('42', ra.postbar.get_str())
for a in ra.foo:
self.assertEqual('postfoo', a.get_str())

View file

@ -0,0 +1,195 @@
from typing import Any, Dict, Union
import os
import os.path
from matemat.exceptions import HttpException
from matemat.webserver import HttpHandler, RequestArguments, PageletResponse, RedirectResponse, TemplateResponse
from matemat.webserver.test.abstract_httpd_test import AbstractHttpdTest, test_pagelet
@test_pagelet('/just/testing/serve_pagelet_str')
def serve_test_pagelet_str(method: str,
path: str,
args: RequestArguments,
session_vars: Dict[str, Any],
headers: Dict[str, str]) -> Union[bytes, str, PageletResponse]:
headers['Content-Type'] = 'text/plain'
return 'serve test pagelet str'
@test_pagelet('/just/testing/serve_pagelet_bytes')
def serve_test_pagelet_bytes(method: str,
path: str,
args: RequestArguments,
session_vars: Dict[str, Any],
headers: Dict[str, str]) -> Union[bytes, str, PageletResponse]:
headers['Content-Type'] = 'application/octet-stream'
return b'serve\x80test\xffpagelet\xfebytes'
@test_pagelet('/just/testing/serve_pagelet_redirect')
def serve_test_pagelet_redirect(method: str,
path: str,
args: RequestArguments,
session_vars: Dict[str, Any],
headers: Dict[str, str]) -> Union[bytes, str, PageletResponse]:
return RedirectResponse('/foo/bar')
@test_pagelet('/just/testing/serve_pagelet_template')
def serve_test_pagelet_template(method: str,
path: str,
args: RequestArguments,
session_vars: Dict[str, Any],
headers: Dict[str, str]) -> Union[bytes, str, PageletResponse]:
headers['Content-Type'] = 'text/plain'
return TemplateResponse('test.txt', what='World')
# noinspection PyTypeChecker
@test_pagelet('/just/testing/serve_pagelet_fail')
def serve_test_pagelet_fail(method: str,
path: str,
args: RequestArguments,
session_vars: Dict[str, Any],
headers: Dict[str, str]) -> Union[bytes, str, PageletResponse]:
session_vars['test'] = 'hello, world!'
headers['Content-Type'] = 'text/plain'
raise HttpException()
class TestServe(AbstractHttpdTest):
"""
Test cases for the content serving of the web server.
"""
def setUp(self):
super().setUp()
# Create a static resource in the temp dir
with open(os.path.join(self.tempdir.name, 'static_resource.txt'), 'w') as f:
f.write('static resource test')
# Create a second static resource chmodded to 0000, to test 403 Forbidden error
forbidden: str = os.path.join(self.tempdir.name, 'forbidden_static_resource.txt')
with open(forbidden, 'w') as f:
f.write('This should not be readable')
os.chmod(forbidden, 0)
def test_serve_pagelet_str(self):
# Call the test pagelet that produces a 200 OK result
self.client_sock.set_request(b'GET /just/testing/serve_pagelet_str HTTP/1.1\r\n\r\n')
HttpHandler(self.client_sock, ('::1', 45678), self.server)
packet = self.client_sock.get_response()
# Make sure the correct pagelet was called
self.assertEqual('serve_test_pagelet_str', packet.pagelet)
# Make sure the expected content is served
self.assertEqual(200, packet.statuscode)
self.assertEqual(b'serve test pagelet str', packet.body)
def test_serve_pagelet_bytes(self):
# Call the test pagelet that produces a 200 OK result
self.client_sock.set_request(b'GET /just/testing/serve_pagelet_bytes HTTP/1.1\r\n\r\n')
HttpHandler(self.client_sock, ('::1', 45678), self.server)
packet = self.client_sock.get_response()
# Make sure the correct pagelet was called
self.assertEqual('serve_test_pagelet_bytes', packet.pagelet)
# Make sure the expected content is served
self.assertEqual(200, packet.statuscode)
self.assertEqual(b'serve\x80test\xffpagelet\xfebytes', packet.body)
def test_serve_pagelet_fail(self):
# Call the test pagelet that produces a 500 Internal Server Error result
self.client_sock.set_request(b'GET /just/testing/serve_pagelet_fail HTTP/1.1\r\n\r\n')
HttpHandler(self.client_sock, ('::1', 45678), self.server)
packet = self.client_sock.get_response()
# Make sure an error is raised
self.assertEqual(500, packet.statuscode)
def test_serve_pagelet_redirect(self):
# Call the test pagelet that redirects to another path
self.client_sock.set_request(b'GET /just/testing/serve_pagelet_redirect HTTP/1.1\r\n\r\n')
HttpHandler(self.client_sock, ('::1', 45678), self.server)
packet = self.client_sock.get_response()
# Make sure the correct pagelet was called
self.assertEqual('serve_test_pagelet_redirect', packet.pagelet)
# Make sure the correct redirect is issued
self.assertEqual(301, packet.statuscode)
self.assertEqual('/foo/bar', packet.headers['Location'])
# Make sure the response body is empty
self.assertEqual(0, len(packet.body))
def test_serve_pagelet_template(self):
# Call the test pagelet that redirects to another path
self.client_sock.set_request(b'GET /just/testing/serve_pagelet_template HTTP/1.1\r\n\r\n')
HttpHandler(self.client_sock, ('::1', 45678), self.server)
packet = self.client_sock.get_response()
# Make sure the correct pagelet was called
self.assertEqual('serve_test_pagelet_template', packet.pagelet)
self.assertEqual(200, packet.statuscode)
# Make sure the response body was rendered correctly by the templating engine
self.assertEqual(b'Hello, World!', packet.body)
def test_serve_static_ok(self):
# Request a static resource
self.client_sock.set_request(b'GET /static_resource.txt HTTP/1.1\r\n\r\n')
HttpHandler(self.client_sock, ('::1', 45678), self.server)
packet = self.client_sock.get_response()
# Make sure that no pagelet was called
self.assertIsNone(packet.pagelet)
# Make sure the expected content is served
self.assertEqual(200, packet.statuscode)
self.assertEqual(b'static resource test', packet.body)
def test_serve_static_forbidden(self):
# Request a static resource with lacking permissions
self.client_sock.set_request(b'GET /forbidden_static_resource.txt HTTP/1.1\r\n\r\n')
HttpHandler(self.client_sock, ('::1', 45678), self.server)
packet = self.client_sock.get_response()
# Make sure that no pagelet was called
self.assertIsNone(packet.pagelet)
# Make sure a 403 header is served
self.assertEqual(403, packet.statuscode)
self.assertNotEqual(b'This should not be readable', packet.body)
def test_serve_not_found(self):
# Request a nonexistent resource
self.client_sock.set_request(b'GET /nonexistent HTTP/1.1\r\n\r\n')
HttpHandler(self.client_sock, ('::1', 45678), self.server)
packet = self.client_sock.get_response()
# Make sure that no pagelet was called
self.assertIsNone(packet.pagelet)
# Make sure a 404 header is served
self.assertEqual(404, packet.statuscode)
def test_serve_directory_traversal(self):
# Request a resource outside the webroot
self.client_sock.set_request(b'GET /../../../../../etc/passwd HTTP/1.1\r\n\r\n')
HttpHandler(self.client_sock, ('::1', 45678), self.server)
packet = self.client_sock.get_response()
# Make sure that no pagelet was called
self.assertIsNone(packet.pagelet)
# Make sure a 404 header is served
self.assertEqual(404, packet.statuscode)
def test_static_post_not_allowed(self):
# Request a resource outside the webroot
self.client_sock.set_request(b'POST /iwanttopostthis HTTP/1.1\r\n'
b'Content-Type: application/x-www-form-urlencoded\r\n'
b'Content-length: 37\r\n\r\n'
b'q=this%20should%20not%20be%20uploaded')
HttpHandler(self.client_sock, ('::1', 45678), self.server)
packet = self.client_sock.get_response()
# Make sure that no pagelet was called
self.assertIsNone(packet.pagelet)
# Make sure a 405 Method Not Allowed header is served
self.assertEqual(405, packet.statuscode)

View file

@ -0,0 +1,160 @@
from typing import Any, Dict
from datetime import datetime, timedelta
from time import sleep
from matemat.webserver import HttpHandler, RequestArguments
from matemat.webserver.test.abstract_httpd_test import AbstractHttpdTest, test_pagelet
@test_pagelet('/just/testing/sessions')
def session_test_pagelet(method: str,
path: str,
args: RequestArguments,
session_vars: Dict[str, Any],
headers: Dict[str, str]):
session_vars['test'] = 'hello, world!'
headers['Content-Type'] = 'text/plain'
return 'session test'
class TestSession(AbstractHttpdTest):
"""
Test session handling of the Matemat webserver.
"""
def test_create_new_session(self):
# Reference date to make sure the session expiry lies in the future
refdate: datetime = datetime.utcnow() + timedelta(seconds=3500)
# Send a mock GET request for '/just/testing/sessions'
self.client_sock.set_request(b'GET /just/testing/sessions HTTP/1.1\r\n\r\n')
# Trigger request handling
handler = HttpHandler(self.client_sock, ('::1', 45678), self.server)
# Fetch the parsed response
packet = self.client_sock.get_response()
# Make sure a full HTTP response was parsed
self.assertEqual('done', packet.parse_phase)
# Make sure the request was served by the test pagelet
self.assertEqual('session_test_pagelet', packet.pagelet)
self.assertEqual(200, packet.statuscode)
session_id: str = list(handler.server.session_vars.keys())[0]
# Make sure a cookie was set - assuming that only one was set
self.assertIn('Set-Cookie', packet.headers)
# Split into the cookie itself
cookie, expiry = packet.headers['Set-Cookie'].split(';')
cookie: str = cookie.strip()
expiry: str = expiry.strip()
# Make sure the 'matemat_session_id' cookie was set to the session ID string
self.assertEqual(f'matemat_session_id={session_id}', cookie)
# Make sure the session expires in about one hour
self.assertTrue(expiry.startswith('expires='))
_, expdatestr = expiry.split('=', 1)
expdate = datetime.strptime(expdatestr, '%a, %d %b %Y %H:%M:%S GMT')
self.assertTrue(expdate > refdate)
# Make sure the session exists on the server
self.assertIn('test', handler.session_vars)
self.assertEqual('hello, world!', handler.session_vars['test'])
def test_resume_session(self):
# Test session expiry date
refdate: datetime = datetime.utcnow() + timedelta(hours=1)
# Session ID for testing
session_id: str = 'testsessionid'
# Insert test session
self.server.session_vars[session_id] = refdate, {'test': 'bar'}
sleep(2)
# Send a mock GET request for '/just/testing/sessions' with a matemat session cookie
self.client_sock.set_request(
f'GET /just/testing/sessions HTTP/1.1\r\nCookie: matemat_session_id={session_id}\r\n\r\n'.encode('utf-8'))
# Trigger request handling
handler = HttpHandler(self.client_sock, ('::1', 45678), self.server)
# Fetch the parsed response
packet = self.client_sock.get_response()
# Make sure a full HTTP response was parsed
self.assertEqual('done', packet.parse_phase)
# Make sure the request was served by the test pagelet
self.assertEqual('session_test_pagelet', packet.pagelet)
self.assertEqual(200, packet.statuscode)
response_session_id: str = list(handler.server.session_vars.keys())[0]
# Make sure a cookie was set - assuming that only one was set
self.assertIn('Set-Cookie', packet.headers)
# Split into the cookie itself
cookie, expiry = packet.headers['Set-Cookie'].split(';')
cookie: str = cookie.strip()
expiry: str = expiry.strip()
# Make sure the 'matemat_session_id' cookie was set to the session ID string
self.assertEqual(f'matemat_session_id={response_session_id}', cookie)
# Make sure the session ID matches the one we sent along
self.assertEqual(session_id, response_session_id)
# Make sure the session timeout was postponed
self.assertTrue(expiry.startswith('expires='))
_, expdatestr = expiry.split('=', 1)
expdate = datetime.strptime(expdatestr, '%a, %d %b %Y %H:%M:%S GMT')
self.assertTrue(expdate > refdate)
# Make sure the session exists on the server
self.assertIn('test', handler.session_vars)
self.assertEqual('hello, world!', handler.session_vars['test'])
def test_unknown_session_id(self):
# Unknown session ID
session_id: str = 'theserverdoesnotknowthisid'
refdate: datetime = datetime.utcnow() + timedelta(seconds=3500)
# Send a mock GET request for '/just/testing/sessions' with a session cookie not known to the server
self.client_sock.set_request(
f'GET /just/testing/sessions HTTP/1.1\r\nCookie: matemat_session_id={session_id}\r\n\r\n'.encode('utf-8'))
# Trigger request handling
handler = HttpHandler(self.client_sock, ('::1', 45678), self.server)
# Fetch the parsed response
packet = self.client_sock.get_response()
# Make sure a full HTTP response was parsed
self.assertEqual('done', packet.parse_phase)
# Make sure the request was served by the test pagelet
self.assertEqual('session_test_pagelet', packet.pagelet)
self.assertEqual(200, packet.statuscode)
server_session_id: str = list(handler.server.session_vars.keys())[0]
self.assertNotEqual(session_id, server_session_id)
# Make sure a cookie was set - assuming that only one was set
self.assertIn('Set-Cookie', packet.headers)
# Split into the cookie itself
cookie, expiry = packet.headers['Set-Cookie'].split(';')
cookie: str = cookie.strip()
expiry: str = expiry.strip()
# Make sure the 'matemat_session_id' cookie was set to the session ID string
self.assertEqual(f'matemat_session_id={server_session_id}', cookie)
# Make sure the session expires in about one hour
self.assertTrue(expiry.startswith('expires='))
_, expdatestr = expiry.split('=', 1)
expdate = datetime.strptime(expdatestr, '%a, %d %b %Y %H:%M:%S GMT')
self.assertTrue(expdate > refdate)
# Make sure the session exists on the server
self.assertIn('test', handler.session_vars)
self.assertEqual('hello, world!', handler.session_vars['test'])
def test_session_expired(self):
# Test session expiry date
refdate: datetime = datetime.utcnow() - timedelta(hours=1)
# Session ID for testing
session_id: str = 'testsessionid'
# Insert test session
self.server.session_vars[session_id] = refdate, {'test': 'bar'}
# Send a mock GET request for '/just/testing/sessions' with a matemat session cookie
self.client_sock.set_request(
f'GET /just/testing/sessions HTTP/1.1\r\nCookie: matemat_session_id={session_id}\r\n\r\n'.encode('utf-8'))
# Trigger request handling
handler = HttpHandler(self.client_sock, ('::1', 45678), self.server)
# Fetch the parsed response
packet = self.client_sock.get_response()
# Make sure a full HTTP response was parsed
self.assertEqual('done', packet.parse_phase)
# Make sure the server redirects to /
self.assertEqual(302, packet.statuscode)
self.assertIn('Location', packet.headers)
self.assertEqual('/', packet.headers['Location'])
# Make sure the session was terminated
self.assertNotIn(session_id, self.server.session_vars)

138
matemat/webserver/util.py Normal file
View file

@ -0,0 +1,138 @@
from typing import Dict, List, Tuple, Optional
import urllib.parse
from matemat.webserver import RequestArguments, RequestArgument
def _parse_multipart(body: bytes, boundary: str) -> List[RequestArgument]:
"""
Given a HTTP body with form-data in multipart form, and the multipart-boundary, parse the multipart items and
return them as a dictionary.
:param body: The HTTP multipart/form-data body.
:param boundary: The multipart boundary.
:return: A dictionary of field names as key, and content types and field values as value.
"""
# Prepend a CRLF for the first boundary to match
body = b'\r\n' + body
# Generate item header boundary and terminating boundary from general boundary string
_boundary = f'\r\n--{boundary}\r\n'.encode('utf-8')
_end_boundary = f'\r\n--{boundary}--\r\n'.encode('utf-8')
# Split at the end boundary and make sure there comes nothing after it
allparts = body.split(_end_boundary, 1)
if len(allparts) != 2 or allparts[1] != b'':
raise ValueError('Last boundary missing or corrupted')
# Split remaining body into its parts, and verify at least 1 part is there
parts: List[bytes] = (allparts[0]).split(_boundary)
if len(parts) < 1 or parts[0] != b'':
raise ValueError('First boundary missing or corrupted')
# Remove the first, empty part
parts = parts[1:]
# Results go into this dict
args: Dict[str, RequestArgument] = dict()
# Parse each multipart part
for part in parts:
# Parse multipart headers
hdr: Dict[str, str] = dict()
while True:
head, part = part.split(b'\r\n', 1)
# Break on header/body delimiter
if head == b'':
break
# Add header to hdr dict
hk, hv = head.decode('utf-8').split(':')
hdr[hk.strip()] = hv.strip()
# No content type set - set broadest possible type
if 'Content-Type' not in hdr:
hdr['Content-Type'] = 'application/octet-stream'
# At least Content-Disposition must be present
if 'Content-Disposition' not in hdr:
raise ValueError('Missing Content-Type or Content-Disposition header')
# Extract Content-Disposition header value and its arguments
cd, *cdargs = hdr['Content-Disposition'].split(';')
# Content-Disposition MUST be form-data; everything else is rejected
if cd.strip() != 'form-data':
raise ValueError(f'Unknown Content-Disposition: {cd}')
# Extract the "name" header argument
has_name = False
for cdarg in cdargs:
k, v = cdarg.split('=', 1)
if k.strip() == 'name':
has_name = True
name: str = v.strip()
# Remove quotation marks around the name value
if name.startswith('"') and name.endswith('"'):
name = v[1:-1]
# Add the Content-Type and the content to the header, with the provided name
if name not in args:
args[name] = RequestArgument(name)
args[name].append(hdr['Content-Type'].strip(), part)
if not has_name:
# Content-Disposition header without name attribute
raise ValueError('mutlipart/form-data part without name attribute')
return list(args.values())
def parse_args(request: str, postbody: Optional[bytes] = None, enctype: str = 'text/plain') \
-> Tuple[str, RequestArguments]:
"""
Given a HTTP request path, and optionally a HTTP POST body in application/x-www-form-urlencoded or
multipart/form-data form, parse the arguments and return them as a dictionary.
If a key is used both in GET and in POST, the POST value takes precedence, and the GET value is discarded.
:param request: The request string to parse.
:param postbody: The POST body to parse, defaults to None.
:param enctype: Encoding of the POST body; supported values are application/x-www-form-urlencoded and
multipart/form-data.
:return: A tuple consisting of the base path and a dictionary with the parsed key/value pairs, and the value's
content type.
"""
# Parse the request "URL" (i.e. only the path)
tokens = urllib.parse.urlparse(request)
# Parse the GET arguments
if len(tokens.query) == 0:
getargs: Dict[str, List[str]] = dict()
else:
getargs = urllib.parse.parse_qs(tokens.query, strict_parsing=True, keep_blank_values=True, errors='strict')
args = RequestArguments()
for k, vs in getargs.items():
args[k].clear()
for v in vs:
args[k].append('text/plain', v)
if postbody is not None:
if enctype == 'application/x-www-form-urlencoded':
# Parse the POST body
pb: str = postbody.decode('utf-8')
if len(pb) == 0:
postargs: Dict[str, List[str]] = dict()
else:
postargs = urllib.parse.parse_qs(pb, strict_parsing=True, keep_blank_values=True, errors='strict')
# Write all POST values into the dict, overriding potential duplicates from GET
for k, vs in postargs.items():
args[k].clear()
for v in vs:
args[k].append('text/plain', v)
elif enctype.startswith('multipart/form-data'):
# Parse the multipart boundary from the Content-Type header
try:
boundary: str = enctype.split('boundary=')[1].strip()
except IndexError:
raise ValueError('Multipart boundary in header not set or corrupted')
# Parse the multipart body
mpargs = _parse_multipart(postbody, boundary)
for ra in mpargs:
args[ra.name].clear()
for a in ra:
args[ra.name].append(a.get_content_type(), bytes(a))
else:
raise ValueError(f'Unsupported Content-Type: {enctype}')
# Return the path and the parsed arguments
return tokens.path, args

View file

@ -1,2 +1,2 @@
bcrypt
apsw apsw
jinja2

17
templates/base.html Normal file
View file

@ -0,0 +1,17 @@
<!DOCTYPE html>
<html>
<head>
<title>Matemat</title>
<style>
body {
color: #f0f0f0;
background: #000000;
}
</style>
</head>
<body>
<h1>Matemat {{__version__}}</h1>
{% block main %}
{% endblock %}
</body>
</html>

9
templates/login.html Normal file
View file

@ -0,0 +1,9 @@
{% extends "base.html" %}
{% block main %}
<form action="/login" method="post">
Username: <input type="text" name="username"/><br/>
Password: <input type="password" name="password" /><br/>
<input type="submit" value="Login"/>
</form>
{% endblock%}

22
templates/main.html Normal file
View file

@ -0,0 +1,22 @@
{% extends "base.html" %}
{% block main %}
{{ user|default("") }}
<ul>
{% if user is defined %}
{% for l in list %}
<li/> <b>{{ l.name }}</b>
{% if user.is_member %}
{{ l.price_member }}
{% else %}
{{ l.price_non_member }}
{% endif %}
{% endfor %}
{% else %}
{% for l in list %}
<li/> <b><a href="/touchkey?username={{ l.name }}">{{ l.name }}</a></b>
{% endfor %}
<li/> <a href="/login">Password login</a>
{% endif %}
</ul>
{% endblock %}

9
templates/touchkey.html Normal file
View file

@ -0,0 +1,9 @@
{% extends "base.html" %}
{% block main %}
<form action="/touchkey" method="post">
<input type="hidden" name="username" value="{{ username }}"/><br/>
Touchkey: <input type="password" name="touchkey" /><br/>
<input type="submit" value="Login"/>
</form>
{% endblock %}