diff --git a/.gitignore b/.gitignore index 95dc460..cca262e 100644 --- a/.gitignore +++ b/.gitignore @@ -8,4 +8,4 @@ **/.mypy_cache/ *.sqlite3 -*.db \ No newline at end of file +*.db diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index d351014..9adc47d 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -1,5 +1,5 @@ --- -image: debian:buster +image: s3lph/matemat-ci:20180619-01 stages: - test @@ -8,9 +8,6 @@ stages: test: stage: test script: - - apt-get update -qy - - apt-get install -y --no-install-recommends python3-dev python3-pip python3-coverage python3-setuptools build-essential - - pip3 install wheel - pip3 install -r requirements.txt - python3-coverage run --branch -m unittest discover matemat - python3-coverage report -m --include 'matemat/*' --omit '*/test_*.py' @@ -18,9 +15,6 @@ test: codestyle: stage: codestyle script: - - apt-get update -qy - - apt-get install -y --no-install-recommends python3-dev python3-pip python3-coverage python3-setuptools build-essential - - pip3 install wheel pycodestyle mypy - pip3 install -r requirements.txt - pycodestyle matemat # - mypy --ignore-missing-imports --strict -p matemat diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..55531b0 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "doc"] + path = doc + url = gitlab.com:s3lph/matemat.wiki.git diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..126a06f --- /dev/null +++ b/Dockerfile @@ -0,0 +1,10 @@ + +FROM debian:buster + +RUN useradd -d /home/matemat -m matemat +RUN apt-get update -qy +RUN apt-get install -y --no-install-recommends python3-dev python3-pip python3-coverage python3-setuptools build-essential +RUN pip3 install wheel pycodestyle mypy + +WORKDIR /home/matemat +USER matemat diff --git a/README.md b/README.md index 8082659..b27f921 100644 --- a/README.md +++ b/README.md @@ -1,14 +1,25 @@ -# matemat +# Matemat -[![pipeline status](https://gitlab.com/s3lph/matemat/badges/master/pipeline.svg)](https://gitlab.com/s3lph/matemat/commits/master) -[![coverage report](https://gitlab.com/s3lph/matemat/badges/master/coverage.svg)](https://gitlab.com/s3lph/matemat/commits/master) +[![pipeline status](https://gitlab.com/s3lph/matemat/badges/master/pipeline.svg)][master] +[![coverage report](https://gitlab.com/s3lph/matemat/badges/master/coverage.svg)][master] + +A web service for automated stock-keeping of a soda machine written in Python. +It provides a touch-input-friendly user interface (as most input happens through the +soda machine's touch screen). + +This project intends to provide a well-tested and maintainable alternative to +[ckruse/matemat][oldapp] (last commit 2013-07-09). + +## Further Documentation + +[Wiki][wiki] ## Dependencies -- Python 3.6 +- Python 3 (>=3.6) - Python dependencies: - apsw - - bcrypt + - jinja2 ## Usage @@ -16,6 +27,17 @@ python -m matemat ``` +## Contributors + +- s3lph +- SPiNNiX + ## License -[MIT License](https://gitlab.com/s3lph/matemat/blob/master/LICENSE) +[MIT License][mit-license] + + +[oldapp]: https://github.com/ckruse/matemat +[mit-license]: https://gitlab.com/s3lph/matemat/blob/master/LICENSE +[master]: https://gitlab.com/s3lph/matemat/commits/master +[wiki]: https://gitlab.com/s3lph/matemat/wikis/home \ No newline at end of file diff --git a/doc b/doc new file mode 160000 index 0000000..9634785 --- /dev/null +++ b/doc @@ -0,0 +1 @@ +Subproject commit 9634785e5621b324f72598b3cee83bbc45c25d7b diff --git a/matemat/__main__.py b/matemat/__main__.py new file mode 100644 index 0000000..9654b64 --- /dev/null +++ b/matemat/__main__.py @@ -0,0 +1,16 @@ + +import sys + +if __name__ == '__main__': + # Those imports are actually needed, as they implicitly register pagelets. + # noinspection PyUnresolvedReferences + from matemat.webserver.pagelets import * + from matemat.webserver import MatematWebserver + + # Read HTTP port from command line + port: int = 8080 + if len(sys.argv) > 1: + port = int(sys.argv[1]) + + # Start the web server + MatematWebserver(port=port).start() diff --git a/matemat/db/__init__.py b/matemat/db/__init__.py index 93b6c8c..4f80a25 100644 --- a/matemat/db/__init__.py +++ b/matemat/db/__init__.py @@ -1,3 +1,6 @@ +""" +This package provides a developer-friendly API to the SQLite3 database backend of the Matemat software. +""" from .wrapper import DatabaseWrapper from .facade import MatematDatabase diff --git a/matemat/db/facade.py b/matemat/db/facade.py index f91bf4a..4bd27cd 100644 --- a/matemat/db/facade.py +++ b/matemat/db/facade.py @@ -1,12 +1,18 @@ from typing import List, Optional, Any, Type -import bcrypt +import crypt from matemat.primitives import User, Product from matemat.exceptions import AuthenticationError, DatabaseConsistencyError from matemat.db import DatabaseWrapper +# TODO: Change to METHOD_BLOWFISH when adopting Python 3.7 +""" +The method to use for password hashing. +""" +_CRYPT_METHOD = crypt.METHOD_SHA512 + class MatematDatabase(object): """ @@ -92,7 +98,7 @@ class MatematDatabase(object): :raises ValueError: If a user with the same name already exists. """ # Hash the password. - pwhash: str = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt(12)) + pwhash: str = crypt.crypt(password, crypt.mksalt(_CRYPT_METHOD)) user_id: int = -1 with self.db.transaction() as c: # Look up whether a user with the same name already exists. @@ -138,10 +144,12 @@ class MatematDatabase(object): if row is None: raise AuthenticationError('User does not exist') user_id, username, email, pwhash, tkhash, admin, member = row - if password is not None and not bcrypt.checkpw(password.encode('utf-8'), pwhash): + if password is not None and crypt.crypt(password, pwhash) != pwhash: raise AuthenticationError('Password mismatch') - elif touchkey is not None and not bcrypt.checkpw(touchkey.encode('utf-8'), tkhash): + elif touchkey is not None and tkhash is not None and crypt.crypt(touchkey, tkhash) != tkhash: raise AuthenticationError('Touchkey mismatch') + elif touchkey is not None and tkhash is None: + raise AuthenticationError('Touchkey not set') return User(user_id, username, email, admin, member) def change_password(self, user: User, oldpass: str, newpass: str, verify_password: bool = True) -> None: @@ -163,10 +171,10 @@ class MatematDatabase(object): if row is None: raise AuthenticationError('User does not exist in database.') # Verify the old password, if it should be verified. - if verify_password and not bcrypt.checkpw(oldpass.encode('utf-8'), row[0]): + if verify_password and crypt.crypt(oldpass, row[0]) != row[0]: raise AuthenticationError('Old password does not match.') # Hash the new password and write it to the database. - pwhash: str = bcrypt.hashpw(newpass.encode('utf-8'), bcrypt.gensalt(12)) + pwhash: str = crypt.crypt(newpass, crypt.mksalt(_CRYPT_METHOD)) c.execute(''' UPDATE users SET password = :pwhash, lastchange = STRFTIME('%s', 'now') WHERE user_id = :user_id ''', { @@ -193,10 +201,10 @@ class MatematDatabase(object): if row is None: raise AuthenticationError('User does not exist in database.') # Verify the password, if it should be verified. - if verify_password and not bcrypt.checkpw(password.encode('utf-8'), row[0]): + if verify_password and crypt.crypt(password, row[0]) != row[0]: raise AuthenticationError('Password does not match.') # Hash the new touchkey and write it to the database. - tkhash: str = bcrypt.hashpw(touchkey.encode('utf-8'), bcrypt.gensalt(12)) if touchkey is not None else None + tkhash: str = crypt.crypt(touchkey, crypt.mksalt(_CRYPT_METHOD)) if touchkey is not None else None c.execute(''' UPDATE users SET touchkey = :tkhash, lastchange = STRFTIME('%s', 'now') WHERE user_id = :user_id ''', { diff --git a/matemat/db/test/test_facade.py b/matemat/db/test/test_facade.py index c266feb..b17262f 100644 --- a/matemat/db/test/test_facade.py +++ b/matemat/db/test/test_facade.py @@ -1,7 +1,7 @@ import unittest -import bcrypt +import crypt from matemat.db import MatematDatabase from matemat.exceptions import AuthenticationError, DatabaseConsistencyError @@ -10,6 +10,7 @@ from matemat.exceptions import AuthenticationError, DatabaseConsistencyError class DatabaseTest(unittest.TestCase): def setUp(self) -> None: + # Create an in-memory database for testing self.db = MatematDatabase(':memory:') def test_create_user(self) -> None: @@ -57,7 +58,7 @@ class DatabaseTest(unittest.TestCase): u = db.create_user('testuser', 'supersecurepassword', 'testuser@example.com') # Add a touchkey without using the provided function c.execute('''UPDATE users SET touchkey = :tkhash WHERE user_id = :user_id''', { - 'tkhash': bcrypt.hashpw(b'0123', bcrypt.gensalt(12)), + 'tkhash': crypt.crypt('0123', crypt.mksalt(crypt.METHOD_SHA512)), 'user_id': u.id }) user = db.login('testuser', 'supersecurepassword') diff --git a/matemat/db/test/test_wrapper.py b/matemat/db/test/test_wrapper.py index 152066a..dafff65 100644 --- a/matemat/db/test/test_wrapper.py +++ b/matemat/db/test/test_wrapper.py @@ -7,6 +7,7 @@ from matemat.db import DatabaseWrapper class DatabaseTest(unittest.TestCase): def setUp(self) -> None: + # Create an in-memory database for testing self.db = DatabaseWrapper(':memory:') def test_create_schema(self) -> None: diff --git a/matemat/exceptions/AuthenticatonError.py b/matemat/exceptions/AuthenticatonError.py index 0d23c2e..d332fdc 100644 --- a/matemat/exceptions/AuthenticatonError.py +++ b/matemat/exceptions/AuthenticatonError.py @@ -2,7 +2,7 @@ from typing import Optional -class AuthenticationError(BaseException): +class AuthenticationError(Exception): def __init__(self, msg: Optional[str] = None) -> None: super().__init__() diff --git a/matemat/exceptions/HttpException.py b/matemat/exceptions/HttpException.py new file mode 100644 index 0000000..4f792ce --- /dev/null +++ b/matemat/exceptions/HttpException.py @@ -0,0 +1,20 @@ + +class HttpException(Exception): + + def __init__(self, status: int = 500, title: str = 'An error occurred', message: str = None) -> None: + super().__init__() + self.__status: int = status + self.__title: str = title + self.__message: str = message + + @property + def status(self) -> int: + return self.__status + + @property + def title(self) -> str: + return self.__title + + @property + def message(self) -> str: + return self.__message diff --git a/matemat/exceptions/__init__.py b/matemat/exceptions/__init__.py index cacbbb9..9b10b5f 100644 --- a/matemat/exceptions/__init__.py +++ b/matemat/exceptions/__init__.py @@ -1,3 +1,7 @@ +""" +This package provides custom exception classes used in the Matemat codebase. +""" from .AuthenticatonError import AuthenticationError from .DatabaseConsistencyError import DatabaseConsistencyError +from .HttpException import HttpException diff --git a/matemat/primitives/__init__.py b/matemat/primitives/__init__.py index f380d86..a18a142 100644 --- a/matemat/primitives/__init__.py +++ b/matemat/primitives/__init__.py @@ -1,3 +1,6 @@ +""" +This package provides the 'primitive types' the Matemat software deals with - namely users and products. +""" from .User import User from .Product import Product diff --git a/matemat/webserver/__init__.py b/matemat/webserver/__init__.py index e69de29..6059687 100644 --- a/matemat/webserver/__init__.py +++ b/matemat/webserver/__init__.py @@ -0,0 +1,11 @@ +""" +The Matemat Webserver. + +This package provides the webserver for the Matemat software. It uses Python's http.server and extends it with an event +API that can be used by 'pagelets' - single pages of a web service. If a request cannot be handled by a pagelet, the +server will attempt to serve the request with a static resource in a previously configured webroot directory. +""" + +from .requestargs import RequestArgument, RequestArguments +from .responses import PageletResponse, RedirectResponse, TemplateResponse +from .httpd import MatematWebserver, HttpHandler, pagelet diff --git a/matemat/webserver/httpd.py b/matemat/webserver/httpd.py index 2ae1685..41866de 100644 --- a/matemat/webserver/httpd.py +++ b/matemat/webserver/httpd.py @@ -1,61 +1,399 @@ -from typing import Tuple, Dict +from typing import Any, Callable, Dict, Tuple, Type, Union +import traceback + +import os +import socket +import mimetypes +from socketserver import TCPServer from http.server import HTTPServer, BaseHTTPRequestHandler from http.cookies import SimpleCookie from uuid import uuid4 from datetime import datetime, timedelta +import jinja2 + from matemat import __version__ as matemat_version +from matemat.exceptions import HttpException +from matemat.webserver import RequestArguments, PageletResponse, RedirectResponse, TemplateResponse +from matemat.webserver.util import parse_args + +# +# Python internal class hacks +# + +# Enable IPv6 support (IPv6/IPv4 dual-stack support should be implicitly enabled) +TCPServer.address_family = socket.AF_INET6 +# Redirect internal logging to somewhere else, or, for now, silently discard (TODO: logger will come later) +BaseHTTPRequestHandler.log_request = lambda self, code='-', size='-': None +BaseHTTPRequestHandler.log_error = lambda self, fstring='', *args: None + +# Dictionary to hold registered pagelet paths and their handler functions +_PAGELET_PATHS: Dict[str, Callable[[str, # HTTP method (GET, POST, ...) + str, # Request path + RequestArguments, # HTTP Request arguments + Dict[str, Any], # Session vars + Dict[str, str]], # Response headers + Union[ # Return type: either a response body, or a redirect + bytes, str, # Response body: will assign HTTP/1.0 200 OK + PageletResponse, # A generic response + ]]] = dict() + +# Inactivity timeout for client sessions +_SESSION_TIMEOUT: int = 3600 +_MAX_POST: int = 1_000_000 + + +def pagelet(path: str): + """ + Annotate a function to act as a pagelet (part of a website). The function will be called if a request is made to + the path specified as argument to the annotation. + + The function must have the following signature: + + (method: str, + path: str, + args: RequestArguments, + session_vars: Dict[str, Any], + headers: Dict[str, str]) + -> Union[bytes, str, Tuple[int, str]] + + method: The HTTP method (GET, POST) that was used. + path: The path that was requested. + args: The arguments that were passed with the request (as GET or POST arguments). + session_vars: The session storage. May be read from and written to. + headers: The dictionary of HTTP response headers. Add headers you wish to send with the response. + returns: One of the following: + - A HTTP Response body as str or bytes + - A PageletResponse class instance: An instance of (a subclass of) + matemat.webserver.PageletResponse, e.g. encapsulating a redirect or a Jinja2 template. + raises: HttpException: If a non-200 HTTP status code should be returned + + :param path: The path to register the function for. + """ + + def http_handler(fun: Callable[[str, + str, + RequestArguments, + Dict[str, Any], + Dict[str, str]], + Union[ + bytes, str, + PageletResponse + ]]): + # Add the function to the dict of pagelets + _PAGELET_PATHS[path] = fun + # Don't change the function itself at all + return fun + + # Return the inner function (Python requires a "real" function annotation to not have any arguments except + # the function itself) + return http_handler + + +class MatematHTTPServer(HTTPServer): + """ + A http.server.HTTPServer subclass that acts as a container for data that must be persistent between requests. + """ + + def __init__(self, + server_address: Any, + handler: Type[BaseHTTPRequestHandler], + staticroot: str, + templateroot: str, + bind_and_activate: bool = True) -> None: + super().__init__(server_address, handler, bind_and_activate) + # Resolve webroot directory + self.webroot = os.path.abspath(staticroot) + # Set up session vars dict + self.session_vars: Dict[str, Tuple[datetime, Dict[str, Any]]] = dict() + # Set up the Jinja2 environment + self.jinja_env: jinja2.Environment = jinja2.Environment( + loader=jinja2.FileSystemLoader(os.path.abspath(templateroot)) + ) class MatematWebserver(object): + """ + Then main webserver class, internally uses Python's http.server. - def __init__(self) -> None: - self._httpd = HTTPServer(('', 8080), HttpHandler) + The server will serve a pagelet, if one is defined for a request path, else it will attempt to serve a static + resource from the webroot. + + Usage: + + # Listen on all interfaces on port 80 (dual-stack IPv6/IPv4) + server = MatematWebserver('::', 80, webroot='/var/www/matemat') + # Start the server. This call blocks while the server is running. + server.start() + """ + + def __init__(self, + listen: str = '::', + port: int = 80, + staticroot: str = './static', + templateroot: str = './templates') -> None: + """ + Instantiate a MatematWebserver. + + :param listen: The IPv4 or IPv6 address to listen on. + :param port: The TCP port to listen on. + :param staticroot: Path to the static webroot directory. + :param templateroot: Path to the Jinja2 templates root directory. + """ + if len(listen) == 0: + # Empty string should be interpreted as all addresses + listen = '::' + # IPv4 address detection heuristic + if ':' not in listen and '.' in listen: + # Rewrite IPv4 address to IPv6-mapped form + listen = f'::ffff:{listen}' + # Create the http server + self._httpd = MatematHTTPServer((listen, port), HttpHandler, staticroot, templateroot) def start(self) -> None: + """ + Start the web server. This call blocks while the server is running. + """ self._httpd.serve_forever() class HttpHandler(BaseHTTPRequestHandler): + """ + HTTP Request handler. + + This class parses HTTP requests, and calls the appropriate pagelets, or fetches a static resource from the webroot + directory. + """ def __init__(self, request: bytes, client_address: Tuple[str, int], server: HTTPServer) -> None: super().__init__(request, client_address, server) - self._session_vars: Dict[str, Tuple[datetime, Dict[str, object]]] = dict() - print(self._session_vars) + self.server: MatematHTTPServer @property def server_version(self) -> str: return f'matemat/{matemat_version}' - def start_session(self) -> Tuple[str, datetime]: + def _start_session(self) -> Tuple[str, datetime]: + """ + Start a new session, or resume the session identified by the session cookie sent in the HTTP request. + + :return: A tuple consisting of the session ID (a UUID string), and the session timeout date. + """ + # Reference date for session timeout now = datetime.utcnow() + # Parse cookies sent by the client cookiestring = '\n'.join(self.headers.get_all('Cookie', failobj=[])) cookie = SimpleCookie() cookie.load(cookiestring) - session_id = cookie['matemat_session_id'] if 'matemat_session_id' in cookie else str(uuid4()) + # Read the client's session ID, if any + session_id = str(cookie['matemat_session_id'].value) if 'matemat_session_id' in cookie else None + # If there is no active session, create a new session ID + if session_id is None or session_id not in self.server.session_vars: + session_id = str(uuid4()) - if session_id in self._session_vars and self._session_vars[session_id][0] < now: - self.end_session(session_id) - raise TimeoutError('Session timed out') - elif session_id not in self._session_vars: - self._session_vars[session_id] = (now + timedelta(hours=1)), dict() - return session_id, now + # Check for session timeout + if session_id in self.server.session_vars and self.server.session_vars[session_id][0] < now: + self._end_session(session_id) + raise TimeoutError('Session timed out.') + # Update or initialize the session timeout + if session_id not in self.server.session_vars: + self.server.session_vars[session_id] = (now + timedelta(seconds=_SESSION_TIMEOUT)), dict() + else: + self.server.session_vars[session_id] = \ + (now + timedelta(seconds=_SESSION_TIMEOUT), self.server.session_vars[session_id][1]) + # Return the session ID and timeout + return session_id, self.server.session_vars[session_id][0] - def end_session(self, session_id: str) -> None: - if session_id in self._session_vars: - del self._session_vars[session_id] + def _end_session(self, session_id: str) -> None: + """ + Destroy a session identified by the session ID. - def do_GET(self) -> None: + :param session_id: ID of the session to destroy. + """ + if session_id in self.server.session_vars: + del self.server.session_vars[session_id] + + def _parse_pagelet_result(self, + pagelet_res: Union[bytes, # Response body as bytes + str, # Response body as str + PageletResponse], # Encapsulated or unresolved response body + headers: Dict[str, str]) \ + -> Tuple[int, bytes]: + """ + Process the return value of a pagelet function call. + + :param pagelet_res: The pagelet return value. + :param headers: The dict of HTTP response headers, needed for setting the redirect header. + :return: The HTTP Response status code (an int) and body (a bytes). + :raises TypeError: If the pagelet result was not in the expected form. + """ + # The HTTP Response Status Code, defaults to 200 OK + hsc: int = 200 + # The HTTP Response body, defaults to empty + data: bytes = bytes() + + # If the response is a bytes object, it is used without further modification + if isinstance(pagelet_res, bytes): + data = pagelet_res + + # If the response is a str object, it is encoded into a bytes object + elif isinstance(pagelet_res, str): + data = pagelet_res.encode('utf-8') + + # If the response is a PageletResponse object, the status code is extracted. Generation of the body depends + # on the subtype + elif isinstance(pagelet_res, PageletResponse): + hsc = pagelet_res.status + + # If the object is a RedirectResponse instance, no body is needed + if isinstance(pagelet_res, RedirectResponse): + headers['Location'] = pagelet_res.location + # If the object is a TemplateRespinse instance, pass the Jinja2 environment instance for rendering + elif isinstance(pagelet_res, TemplateResponse): + # noinspection PyProtectedMember + data = pagelet_res._render(self.server.jinja_env) + # else: Empty body + + else: + raise TypeError(f'Return value of pagelet not understood: {pagelet_res}') + + # Return the resulting status code and body + return hsc, data + + def _handle(self, method: str, path: str, args: RequestArguments) -> None: + """ + Handle a HTTP request by either dispatching it to the appropriate pagelet or by serving a static resource. + + :param method: The HTTP request method (GET, POST). + :param path: The request path without GET arguments. + :param args: Arguments sent with the request. This includes GET and POST arguments, where the POST arguments + take precedence. + """ + # Start or resume a session; redirect to / on session timeout try: - session_id, timeout = self.start_session() + session_id, timeout = self._start_session() except TimeoutError: + self.send_response(302) self.send_header('Set-Cookie', 'matemat_session_id=; expires=Thu, 01 Jan 1970 00:00:00 GMT') - self.send_error(599, 'Session Timed Out.', 'Please log in again.') + self.send_header('Location', '/') + self.end_headers() return - self.send_response(200, 'Welcome!') + self.session_id: str = session_id + # Call a pagelet function, if one is registered for the requested path + if path in _PAGELET_PATHS: + # Prepare some headers. Those can still be overwritten by the pagelet + headers: Dict[str, str] = { + 'Content-Type': 'text/html', + 'Cache-Control': 'no-cache' + } + # Call the pagelet function + pagelet_res = _PAGELET_PATHS[path](method, path, args, self.session_vars, headers) + # Parse the pagelet's return value, vielding a HTTP status code and a response body + hsc, data = self._parse_pagelet_result(pagelet_res, headers) + # Send the HTTP status code + self.send_response(hsc) + # Format the session cookie timeout string and send the session cookie header + expires = timeout.strftime("%a, %d %b %Y %H:%M:%S GMT") + self.send_header('Set-Cookie', + f'matemat_session_id={session_id}; expires={expires}') + # Compute the body length and add the appropriate header + headers['Content-Length'] = str(len(data)) + # Send all headers set by the pagelet + for name, value in headers.items(): + self.send_header(name, value) + # End the header section and write the body + self.end_headers() + self.wfile.write(data) + else: + # No pagelet function for this path, try a static serve instead + # Only HTTP GET is allowed, else reply with a 'Method Not Allowed' header + if method != 'GET': + self.send_error(405) + self.end_headers() + return + # Create the absolute resource path, resolving '..' + filepath: str = os.path.abspath(os.path.join(self.server.webroot, path[1:])) + # Make sure the file is actually inside the webroot directory and that it exists + if os.path.commonpath([filepath, self.server.webroot]) == self.server.webroot and os.path.exists(filepath): + # Open and read the file + with open(filepath, 'rb') as f: + data = f.read() + # File read successfully, send 'OK' header + self.send_response(200) + # TODO: Guess the MIME type. Unfortunately this call solely relies on the file extension, not ideal? + mimetype, _ = mimetypes.guess_type(filepath) + # Fall back to octet-stream type, if unknown + if mimetype is None: + mimetype = 'application/octet-stream' + # Send content type and length header + self.send_header('Content-Type', mimetype) + self.send_header('Content-Length', str(len(data))) + self.end_headers() + # Send the requested resource as response body + self.wfile.write(data) + else: + # File does not exist or path points outside the webroot directory + self.send_response(404) + self.end_headers() -if __name__ == '__main__': - MatematWebserver().start() + # noinspection PyPep8Naming + def do_GET(self) -> None: + """ + Called by BasicHTTPRequestHandler for GET requests. + """ + try: + # Parse the request and hand it to the handle function + path, args = parse_args(self.path) + self._handle('GET', path, args) + # Special handling for some errors + except HttpException as e: + self.send_error(e.status, e.title, e.message) + except PermissionError: + self.send_error(403, 'Forbidden') + except ValueError: + self.send_error(400, 'Bad Request') + except BaseException as e: + # Generic error handling + self.send_error(500, 'Internal Server Error') + print(e) + traceback.print_tb(e.__traceback__) + + # noinspection PyPep8Naming + def do_POST(self) -> None: + """ + Called by BasicHTTPRequestHandler for POST requests. + """ + try: + # Read the POST body, if it exists, and its MIME type is application/x-www-form-urlencoded + clen: int = int(str(self.headers.get('Content-Length', failobj='0'))) + if clen > _MAX_POST: + raise ValueError('Request too big') + ctype: str = self.headers.get('Content-Type', failobj='application/octet-stream') + post: bytes = self.rfile.read(clen) + path, args = parse_args(self.path, postbody=post, enctype=ctype) + # Parse the request and hand it to the handle function + self._handle('POST', path, args) + # Special handling for some errors + except HttpException as e: + self.send_error(e.status, e.title, e.message) + except PermissionError: + self.send_error(403, 'Forbidden') + except ValueError: + self.send_error(400, 'Bad Request') + except BaseException as e: + # Generic error handling + self.send_error(500, 'Internal Server Error') + print(e) + traceback.print_tb(e.__traceback__) + + @property + def session_vars(self) -> Dict[str, Any]: + """ + Get the session variables for the current session. + + :return: Dictionary of named session variables. + """ + return self.server.session_vars[self.session_id][1] diff --git a/matemat/webserver/pagelets/__init__.py b/matemat/webserver/pagelets/__init__.py new file mode 100644 index 0000000..9b926d6 --- /dev/null +++ b/matemat/webserver/pagelets/__init__.py @@ -0,0 +1,10 @@ +""" +This package contains the pagelet functions served by the Matemat software. + +A new pagelet function must be imported here to be automatically loaded when the server is started. +""" + +from .main import main_page +from .login import login_page +from .logout import logout +from .touchkey import touchkey_page diff --git a/matemat/webserver/pagelets/login.py b/matemat/webserver/pagelets/login.py new file mode 100644 index 0000000..6ef9a9c --- /dev/null +++ b/matemat/webserver/pagelets/login.py @@ -0,0 +1,29 @@ + +from typing import Any, Dict, Union + +from matemat.exceptions import AuthenticationError, HttpException +from matemat.webserver import pagelet, RequestArguments, PageletResponse, RedirectResponse, TemplateResponse +from matemat.primitives import User +from matemat.db import MatematDatabase + + +@pagelet('/login') +def login_page(method: str, + path: str, + args: RequestArguments, + session_vars: Dict[str, Any], + headers: Dict[str, str])\ + -> Union[bytes, str, PageletResponse]: + if 'user' in session_vars: + return RedirectResponse('/') + if method == 'GET': + return TemplateResponse('login.html') + elif method == 'POST': + with MatematDatabase('test.db') as db: + try: + user: User = db.login(str(args.username), str(args.password)) + except AuthenticationError: + return RedirectResponse('/login') + session_vars['user'] = user + return RedirectResponse('/') + raise HttpException(405) diff --git a/matemat/webserver/pagelets/logout.py b/matemat/webserver/pagelets/logout.py new file mode 100644 index 0000000..cbd7cad --- /dev/null +++ b/matemat/webserver/pagelets/logout.py @@ -0,0 +1,16 @@ + +from typing import Any, Dict, Union + +from matemat.webserver import pagelet, RequestArguments, PageletResponse, RedirectResponse + + +@pagelet('/logout') +def logout(method: str, + path: str, + args: RequestArguments, + session_vars: Dict[str, Any], + headers: Dict[str, str])\ + -> Union[bytes, str, PageletResponse]: + if 'user' in session_vars: + del session_vars['user'] + return RedirectResponse('/') diff --git a/matemat/webserver/pagelets/main.py b/matemat/webserver/pagelets/main.py new file mode 100644 index 0000000..be0bd60 --- /dev/null +++ b/matemat/webserver/pagelets/main.py @@ -0,0 +1,23 @@ + +from typing import Any, Dict, Union + +from matemat.webserver import pagelet, RequestArguments, PageletResponse, TemplateResponse +from matemat.primitives import User +from matemat.db import MatematDatabase + + +@pagelet('/') +def main_page(method: str, + path: str, + args: RequestArguments, + session_vars: Dict[str, Any], + headers: Dict[str, str])\ + -> Union[bytes, str, PageletResponse]: + with MatematDatabase('test.db') as db: + if 'user' in session_vars: + user: User = session_vars['user'] + products = db.list_products() + return TemplateResponse('main.html', user=user, list=products) + else: + users = db.list_users() + return TemplateResponse('main.html', list=users) diff --git a/matemat/webserver/pagelets/touchkey.py b/matemat/webserver/pagelets/touchkey.py new file mode 100644 index 0000000..2b81c89 --- /dev/null +++ b/matemat/webserver/pagelets/touchkey.py @@ -0,0 +1,32 @@ + +from typing import Any, Dict, Tuple, Union + +import urllib.parse + +from matemat.exceptions import AuthenticationError, HttpException +from matemat.webserver import pagelet, RequestArguments, PageletResponse, RedirectResponse, TemplateResponse +from matemat.primitives import User +from matemat.db import MatematDatabase + + +@pagelet('/touchkey') +def touchkey_page(method: str, + path: str, + args: RequestArguments, + session_vars: Dict[str, Any], + headers: Dict[str, str])\ + -> Union[bytes, str, PageletResponse]: + if 'user' in session_vars: + return RedirectResponse('/') + if method == 'GET': + return TemplateResponse('touchkey.html', username=str(args.username) if 'username' in args else None) + elif method == 'POST': + with MatematDatabase('test.db') as db: + try: + user: User = db.login(str(args.username), touchkey=str(args.touchkey)) + except AuthenticationError: + quoted = urllib.parse.quote_plus(bytes(args.username)) + return RedirectResponse(f'/touchkey?username={quoted}') + session_vars['user'] = user + return RedirectResponse('/') + raise HttpException(405) diff --git a/matemat/webserver/requestargs.py b/matemat/webserver/requestargs.py new file mode 100644 index 0000000..373db90 --- /dev/null +++ b/matemat/webserver/requestargs.py @@ -0,0 +1,324 @@ + +from typing import Dict, Iterator, List, Tuple, Union + + +class RequestArguments(object): + """ + Container for HTTP Request arguments. + + Usage: + + # Create empty instance + ra = RequestArguments() + # Add an entry for the key 'foo' with the value 'bar' and Content-Type 'text/plain' + ra['foo'].append('text/plain', 'bar') + # Retrieve the value for the key 'foo', as a string... + foo = str(ra.foo) + # ... or as raw bytes + foo = bytes(ra.foo) + """ + + def __init__(self) -> None: + """ + Create an empty container instance. + """ + self.__container: Dict[str, RequestArgument] = dict() + + def __getitem__(self, key: str) -> 'RequestArgument': + """ + Retrieve the argument for the given name, creating it on the fly, if it doesn't exist. + + :param key: Name of the argument to retrieve. + :return: A RequestArgument instance. + :raises TypeError: If key is not a string. + """ + if not isinstance(key, str): + raise TypeError('key must be a str') + # Create empty argument, if it doesn't exist + if key not in self.__container: + self.__container[key] = RequestArgument(key) + # Return the argument for the name + return self.__container[key] + + def __getattr__(self, key: str) -> 'RequestArgument': + """ + Syntactic sugar for accessing values with a name that can be used in Python attributes. The value will be + returned as an immutable view. + + :param key: Name of the argument to retrieve. + :return: An immutable view of the RequestArgument instance. + """ + return _View.of(self.__container[key]) + + def __iter__(self) -> Iterator['RequestArgument']: + """ + Returns an iterator over the values in this instance. Values are represented as immutable views. + + :return: An iterator that yields immutable views of the values. + """ + for ra in self.__container.values(): + # Yield an immutable scalar view for each value + yield _View.of(ra) + + def __contains__(self, key: str) -> bool: + """ + Checks whether an argument with a given name exists in the RequestArguments instance. + + :param key: The name to check whether it exists. + :return: True, if present, False otherwise. + """ + return key in self.__container + + def __len__(self) -> int: + """ + :return: The number of arguments in this instance. + """ + return len(self.__container) + + +class RequestArgument(object): + """ + Container class for HTTP request arguments that simplifies dealing with + - scalar and array arguments: + Automatically converts between single values and arrays where necessary: Arrays with one element can be + accessed as scalars, and scalars can be iterated, yielding themselves as a single item. + - UTF-8 strings and binary data (e.g. file uploads): + All data can be retrieved both as a str (if utf-8 decoding is possible) and a bytes object. + + The objects returned from iteration or indexing are immutable views of (parts of) this object. + + Usage example: + + qsargs = urllib.parse.parse_qs(qs, strict_parsing=True, keep_blank_values=True, errors='strict') + args: RequestArguments + for k, vs in qsargs: + args[k].clear() + for v in vs: + # text/plain usually is a sensible choice for values decoded from urlencoded strings + # IF ALREADY IN STRING FORM (which parse_qs does)! + args[k].append('text/plain', v) + + if 'username' in args and args.username.is_scalar: + username = str(args.username) + + """ + + def __init__(self, + name: str, + value: Union[Tuple[str, Union[bytes, str]], List[Tuple[str, Union[bytes, str]]]] = None) -> None: + """ + Create a new RequestArgument with a name and optionally an initial value. + + :param name: The name for this argument, as provided via GET or POST. + :param value: The initial value, if any. Optional, initializes with empty array if omitted. + """ + # Assign name + self.__name: str = name + # Initialize value + self.__value: List[Tuple[str, Union[bytes, str]]] = [] + # Default to empty array + if value is None: + self.__value = [] + else: + if isinstance(value, list): + # Store the array + self.__value = value + else: + # Turn scalar into an array before storing + self.__value = [value] + + @property + def is_array(self) -> bool: + """ + :return: True, if the value is a (possibly empty) array, False otherwise. + """ + return len(self.__value) != 1 + + @property + def is_scalar(self) -> bool: + """ + :return: True, if the value is a single scalar value, False otherwise. + """ + return len(self.__value) == 1 + + @property + def is_view(self) -> bool: + """ + :return: True, if this instance is an immutable view, False otherwise. + """ + return False + + @property + def name(self) -> str: + """ + :return: The name of this argument. + """ + return self.__name + + def get_str(self, index: int = 0) -> str: + """ + Attempts to return a value as a string. The index defaults to 0. + + :param index: The index of the value to retrieve. Default: 0. + :return: An UTF-8 string representation of the requested value. + :raises UnicodeDecodeError: If the value cannot be decoded into an UTF-8 string. + :raises IndexError: If the index is out of bounds. + :raises TypeError: If the index is not an int. + :raises TypeError: If the requested value is neither a str nor a bytes object. + """ + if not isinstance(index, int): + # Index must be an int + raise TypeError('index must be an int') + # Type hint; access array element + v: Tuple[str, Union[bytes, str]] = self.__value[index] + if isinstance(v[1], str): + # The value already is a string, return + return v[1] + elif isinstance(v[1], bytes): + # The value is a bytes object, attempt to decode + return v[1].decode('utf-8') + raise TypeError('Value is neither a str nor bytes') + + def __str__(self) -> str: + """ + Attempts to return the first value as a string. + :return: An UTF-8 string representation of the first value. + :raises UnicodeDecodeError: If the value cannot be decoded into an UTF-8 string. + """ + return self.get_str() + + def get_bytes(self, index: int = 0) -> bytes: + """ + Attempts to return a value as a bytes object. The index defaults to 0. + + :param index: The index of the value to retrieve. Default: 0. + :return: A bytes object representation of the requested value. Strings will be encoded as UTF-8. + :raises IndexError: If the index is out of bounds. + :raises TypeError: If the index is not an int. + :raises TypeError: If the requested value is neither a str nor a bytes object. + """ + if not isinstance(index, int): + # Index must be a int + raise TypeError('index must be an int') + # Type hint; access array element + v: Tuple[str, Union[bytes, str]] = self.__value[index] + if isinstance(v[1], bytes): + # The value already is a bytes object, return + return v[1] + elif isinstance(v[1], str): + # The value is a string, encode first + return v[1].encode('utf-8') + raise TypeError('Value is neither a str nor bytes') + + def __bytes__(self) -> bytes: + """ + Attempts to return the first value as a bytes object. + :return: A bytes string representation of the first value. + """ + return self.get_bytes() + + def get_content_type(self, index: int = 0) -> str: + """ + Attempts to retrieve a value's Content-Type. The index defaults to 0. + + :param index: The index of the value to retrieve. Default: 0. + :return: The Content-Type of the requested value, as sent by the client. Not necessarily trustworthy. + :raises IndexError: If the index is out of bounds. + :raises TypeError: If the index is not an int. + """ + # instance is an array value + if not isinstance(index, int): + # Needs an index for array values + raise TypeError('index must be an int') + # Type hint; access array element + va: Tuple[str, Union[bytes, str]] = self.__value[index] + # Return the content type of the requested value + if not isinstance(va[0], str): + raise TypeError('Content-Type is not a str') + return va[0] + + def append(self, ctype: str, value: Union[str, bytes]) -> None: + """ + Append a value to this instance. Turns an empty argument into a scalar and a scalar into an array. + + :param ctype: The Content-Type, as provided in the request. + :param value: The scalar value to append, either a string or bytes object. + :raises TypeError: If called on an immutable view. + """ + if self.is_view: + # This is an immutable view, raise exception + raise TypeError('A RequestArgument view is immutable!') + self.__value.append((ctype, value)) + + def clear(self) -> None: + """ + Remove all values from this instance. + + :raises TypeError: If called on an immutable view. + """ + if self.is_view: + # This is an immutable view, raise exception + raise TypeError('A RequestArgument view is immutable!') + self.__value.clear() + + def __len__(self) -> int: + """ + :return: Number of values for this argument. + """ + return len(self.__value) + + def __iter__(self) -> Iterator['RequestArgument']: + """ + Iterate the values of this argument. Each value is accessible as if it were a scalar RequestArgument in turn, + although they are immutable. + + :return: An iterator that yields immutable views of the values. + """ + for v in self.__value: + # Yield an immutable scalar view for each (ctype, value) element in the array + yield _View(self.__name, v) + + def __getitem__(self, index: Union[int, slice]) -> 'RequestArgument': + """ + Index the argument with either an int or a slice. The returned values are represented as immutable + RequestArgument views. + + :param index: The index or slice. + :return: An immutable view of the indexed elements of this argument. + """ + # Pass the index or slice through to the array, packing the result in an immutable view + return _View(self.__name, self.__value[index]) + + +class _View(RequestArgument): + """ + This class represents an immutable view of a (subset of a) RequestArgument object. Should not be instantiated + directly. + """ + + def __init__(self, name: str, value: Union[Tuple[str, Union[bytes, str]], List[Tuple[str, Union[bytes, str]]]])\ + -> None: + """ + Create a new immutable view of a (subset of a) RequestArgument. + + :param name: The name for this argument, same as in the original RequestArgument. + :param value: The values to represent in this view, obtained by e.g. indexing or slicing. + """ + super().__init__(name, value) + + @staticmethod + def of(argument: 'RequestArgument') ->'RequestArgument': + """ + Create an immutable, unsliced view of an RequestArgument instance. + + :param argument: The RequestArgument instance to create a view of. + :return: An immutable view of the provided RequestArgument instance. + """ + return argument[:] + + @property + def is_view(self) -> bool: + """ + :return: True, if this instance is an immutable view, False otherwise. + """ + return True diff --git a/matemat/webserver/responses.py b/matemat/webserver/responses.py new file mode 100644 index 0000000..6f0308f --- /dev/null +++ b/matemat/webserver/responses.py @@ -0,0 +1,65 @@ + +from jinja2 import Environment, Template + +from matemat import __version__ + + +class PageletResponse: + """ + Base class for pagelet return values that require more action than simply sending plain data. + + An instance of this base class will result in an empty 200 OK response. + """ + + def __init__(self, status: int = 200): + """ + Create an empty response. + + :param status: The HTTP status code, defaults to 200 (OK). + """ + self.status: int = status + + +class RedirectResponse(PageletResponse): + """ + A pagelet response that causes the server to redirect to another location, using a 301 Permanently Moved (uncached) + response status, and a Location header. + """ + + def __init__(self, location: str): + """ + Create a redirection response with the given redirection location. + + :param location: The location to redirect to. + """ + super().__init__(status=301) + self.location: str = location + + +class TemplateResponse(PageletResponse): + """ + A pagelet response that causes the server to load a Jinja2 template and render it with the provided arguments, then + sending the result as response body, with a 200 OK response status. + """ + + def __init__(self, name: str, **kwargs): + """ + Create a template response with the given template name and arguments. + + :param name: Name of the template to load. + :param kwargs: Arguments for rendering the template, will be passed to jinja2.Template.render as is. + """ + super().__init__() + self.name: str = name + self.kwargs = kwargs + + def _render(self, jinja_env: Environment) -> bytes: + """ + Load and render the template using the Jinja2 environment managed by the web server instance. This method + should not be called by a pagelet. + + :param jinja_env: The Jinja2 environment. + :return: An UTF-8 encoded bytes object containing the template rendering result. + """ + template: Template = jinja_env.get_template(self.name) + return template.render(**self.kwargs, __version__=__version__).encode('utf-8') diff --git a/matemat/webserver/test/__init__.py b/matemat/webserver/test/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/matemat/webserver/test/abstract_httpd_test.py b/matemat/webserver/test/abstract_httpd_test.py new file mode 100644 index 0000000..b121dfd --- /dev/null +++ b/matemat/webserver/test/abstract_httpd_test.py @@ -0,0 +1,204 @@ + +from typing import Any, Callable, Dict, Tuple, Union + +import unittest.mock +from io import BytesIO +from tempfile import TemporaryDirectory + +from abc import ABC +from datetime import datetime +from http.server import HTTPServer + +import jinja2 + +from matemat.webserver import pagelet, RequestArguments + + +class HttpResponse: + """ + A really basic HTTP response container and parser class, just good enough for unit testing a HTTP server, if even. + + DO NOT USE THIS OUTSIDE UNIT TESTING! + + Usage: + response = HttpResponse() + while response.parse_phase != 'done' + response.parse() + print(response.statuscode) + """ + + def __init__(self) -> None: + # The HTTP status code of the response + self.statuscode: int = 0 + # HTTP headers set in the response + self.headers: Dict[str, str] = { + 'Content-Length': 0 + } + self.pagelet: str = None + # The response body + self.body: bytes = bytes() + # Parsing phase, one of 'begin', 'hdr', 'body' or 'done' + self.parse_phase = 'begin' + # Buffer for uncompleted lines + self.buffer: bytes = bytes() + + def __finalize(self): + self.parse_phase = 'done' + self.pagelet = self.headers.get('X-Test-Pagelet', None) + + def parse(self, fragment: bytes) -> None: + """ + Parse a new fragment of data. This function does nothing if the parsed HTTP response is already complete. + + DO NOT USE THIS OUTSIDE UNIT TESTING! + + :param fragment: The data fragment to parse. + """ + # response packet complete, nothing to do + if self.parse_phase == 'done': + return + # If in the body phase, simply decode and append to the body, while the body is not complete yet + elif self.parse_phase == 'body': + self.body += fragment + if len(self.body) >= int(self.headers['Content-Length']): + self.__finalize() + return + if b'\r\n' not in fragment: + # If the fragment does not contain a CR-LF, add it to the buffer, we only want to parse whole lines + self.buffer = self.buffer + fragment + else: + if not fragment.endswith(b'\r\n'): + # Special treatment for no trailing CR-LF: Add remainder to buffer + head, tail = fragment.rsplit(b'\r\n', 1) + data: bytes = (self.buffer + head) + self.buffer = tail + else: + data: bytes = (self.buffer + fragment) + self.buffer = bytes() + # Iterate the lines that are ready to be parsed + for line in data.split(b'\r\n'): + # The 'begin' phase indicates that the parser is waiting for the HTTP status line + if self.parse_phase == 'begin': + if line.startswith(b'HTTP/'): + # Parse the statuscode and advance to header parsing + _, statuscode, _ = line.decode('utf-8').split(' ', 2) + self.statuscode = int(statuscode) + self.parse_phase = 'hdr' + elif self.parse_phase == 'hdr': + # Parse a header line and add it to the header dict + if len(line) > 0: + k, v = line.decode('utf-8').split(':', 1) + self.headers[k.strip()] = v.strip() + else: + # Empty line separates header from body + self.parse_phase = 'body' + elif self.parse_phase == 'body': + # if there is a remainder in the data packet, it is (part of) the body, add to body string + self.body += line + if len(self.body) >= int(self.headers['Content-Length']): + self.__finalize() + + +class MockServer: + """ + A mock implementation of http.server.HTTPServer. Only used for matemat-specific storage. + """ + + def __init__(self, webroot: str = '/var/matemat/webroot') -> None: + # Session timeout and variables for all sessions + self.session_vars: Dict[str, Tuple[datetime, Dict[str, Any]]] = dict() + # Webroot for statically served content + self.webroot: str = webroot + # Jinja environment with a single, static template + self.jinja_env = jinja2.Environment( + loader=jinja2.DictLoader({'test.txt': 'Hello, {{ what }}!'}) + ) + + +class MockSocket(bytes): + """ + A mock implementation of a socket.socket for http.server.BaseHTTPRequestHandler. + + The bytes inheritance is due to a broken type annotation in BaseHTTPRequestHandler. + """ + + def __init__(self) -> None: + super().__init__() + # The request string + self.__request = bytes() + # The parsed response + self.__packet = HttpResponse() + + def set_request(self, request: bytes) -> None: + """ + Sets the HTTP request to send to the server. + + :param request: The request + """ + self.__request: bytes = request + + def makefile(self, mode: str, size: int) -> BytesIO: + """ + Required by http.server.HTTPServer. + + :return: A dummy buffer IO object instead of a network socket file handle. + """ + return BytesIO(self.__request) + + def sendall(self, b: bytes) -> None: + """ + Required by http.server.HTTPServer. + + :param b: The data to send to the client. Will be parsed directly instead. + """ + self.__packet.parse(b) + + def get_response(self) -> HttpResponse: + """ + Fetches the parsed HTTP response generated by the server. + + :return: The response object. + """ + return self.__packet + + +def test_pagelet(path: str): + + def with_testing_headers(fun: Callable[[str, + str, + RequestArguments, + Dict[str, Any], + Dict[str, str]], + Union[bytes, str, Tuple[int, str]]]): + @pagelet(path) + def testing_wrapper(method: str, + path: str, + args: RequestArguments, + session_vars: Dict[str, Any], + headers: Dict[str, str]): + headers['X-Test-Pagelet'] = fun.__name__ + result = fun(method, path, args, session_vars, headers) + return result + return testing_wrapper + return with_testing_headers + + +class AbstractHttpdTest(ABC, unittest.TestCase): + """ + An abstract test case that can be inherited by test case classes that want to test part of the webserver's core + functionality. + + Usage (subclass test method): + + self.client_sock.set_request(b'GET /just/testing/sessions HTTP/1.1\r\n\r\n') + handler = HttpHandler(self.client_sock, ('::1', 45678), self.server) + packet = self.client_sock.get_response() + """ + + def setUp(self) -> None: + self.tempdir: TemporaryDirectory = TemporaryDirectory(prefix='matemat.', dir='/tmp/') + self.server: HTTPServer = MockServer(webroot=self.tempdir.name) + self.client_sock: MockSocket = MockSocket() + + def tearDown(self): + self.tempdir.cleanup() diff --git a/matemat/webserver/test/test_parse_request.py b/matemat/webserver/test/test_parse_request.py new file mode 100644 index 0000000..d144ac1 --- /dev/null +++ b/matemat/webserver/test/test_parse_request.py @@ -0,0 +1,411 @@ + +import unittest + +from matemat.webserver.util import parse_args + + +class TestParseRequest(unittest.TestCase): + + def test_parse_get_root(self): + """ + Test that the simple root path is parsed correctly ('/' path, no args). + """ + path, args = parse_args('/') + self.assertEqual('/', path) + self.assertEqual(0, len(args)) + + def test_parse_get_no_args(self): + """ + Test that a GET request without arguments is parsed correctly. + """ + path, args = parse_args('/index.html') + self.assertEqual('/index.html', path) + self.assertEqual(0, len(args)) + + def test_parse_get_root_getargs(self): + """ + Test that a GET request for '/' with scalar arguments is parsed correctly. + """ + path, args = parse_args('/?foo=42&bar=1337&baz=Hello,%20World!') + self.assertEqual('/', path) + self.assertEqual(3, len(args)) + self.assertIn('foo', args) + self.assertIn('bar', args) + self.assertIn('baz', args) + self.assertTrue(args['foo'].is_scalar) + self.assertTrue(args['bar'].is_scalar) + self.assertTrue(args['baz'].is_scalar) + self.assertEqual('text/plain', args['foo'].get_content_type()) + self.assertEqual('text/plain', args['bar'].get_content_type()) + self.assertEqual('text/plain', args['baz'].get_content_type()) + self.assertEqual('42', args['foo'].get_str()) + self.assertEqual('1337', args['bar'].get_str()) + self.assertEqual('Hello, World!', args['baz'].get_str()) + + def test_parse_get_getargs(self): + """ + Test that a GET request for an arbitrary path with scalar arguments is parsed correctly. + """ + path, args = parse_args('/abc/def?foo=42&bar=1337&baz=Hello,%20World!') + self.assertEqual('/abc/def', path) + self.assertEqual(3, len(args)) + self.assertIn('foo', args) + self.assertIn('bar', args) + self.assertIn('baz', args) + self.assertTrue(args['foo'].is_scalar) + self.assertTrue(args['bar'].is_scalar) + self.assertTrue(args['baz'].is_scalar) + self.assertEqual('text/plain', args['foo'].get_content_type()) + self.assertEqual('text/plain', args['bar'].get_content_type()) + self.assertEqual('text/plain', args['baz'].get_content_type()) + self.assertEqual('42', args['foo'].get_str()) + self.assertEqual('1337', args['bar'].get_str()) + self.assertEqual('Hello, World!', args['baz'].get_str()) + + def test_parse_get_getarray(self): + """ + Test that a GET request with mixed scalar and array arguments is parsed correctly. + """ + path, args = parse_args('/abc/def?foo=42&foo=1337&baz=Hello,%20World!') + self.assertEqual('/abc/def', path) + self.assertEqual(2, len(args)) + self.assertIn('foo', args) + self.assertIn('baz', args) + self.assertTrue(args['foo'].is_array) + self.assertTrue(args['baz'].is_scalar) + self.assertEqual(2, len(args['foo'])) + self.assertEqual('42', args['foo'].get_str(0)) + self.assertEqual('1337', args['foo'].get_str(1)) + + def test_parse_get_zero_arg(self): + """ + Test that a GET request with an empty argument is parsed correctly. + """ + path, args = parse_args('/abc/def?foo=&bar=42') + self.assertEqual(2, len(args)) + self.assertIn('foo', args) + self.assertIn('bar', args) + self.assertTrue(args['foo'].is_scalar) + self.assertTrue(args['bar'].is_scalar) + self.assertEqual(1, len(args['foo'])) + self.assertEqual('', args['foo'].get_str()) + self.assertEqual('42', args['bar'].get_str()) + + def test_parse_get_urlencoded_encoding_fail(self): + """ + Test that a GET request with non-decodable escape sequences fails. + """ + with self.assertRaises(ValueError): + parse_args('/?foo=42&bar=%80&baz=Hello,%20World!') + + def test_parse_post_urlencoded(self): + """ + Test that a urlencoded POST request with scalar arguments is parsed correctly. + """ + path, args = parse_args('/', + postbody=b'foo=42&bar=1337&baz=Hello,%20World!', + enctype='application/x-www-form-urlencoded') + self.assertEqual('/', path) + self.assertEqual(3, len(args)) + self.assertIn('foo', args) + self.assertIn('bar', args) + self.assertIn('baz', args) + self.assertTrue(args['foo'].is_scalar) + self.assertTrue(args['bar'].is_scalar) + self.assertTrue(args['baz'].is_scalar) + self.assertEqual('text/plain', args['foo'].get_content_type()) + self.assertEqual('text/plain', args['bar'].get_content_type()) + self.assertEqual('text/plain', args['baz'].get_content_type()) + self.assertEqual('42', args['foo'].get_str()) + self.assertEqual('1337', args['bar'].get_str()) + self.assertEqual('Hello, World!', args['baz'].get_str()) + + def test_parse_post_urlencoded_array(self): + """ + Test that a urlencoded POST request with mixed scalar and array arguments is parsed correctly. + """ + path, args = parse_args('/', + postbody=b'foo=42&foo=1337&baz=Hello,%20World!', + enctype='application/x-www-form-urlencoded') + self.assertEqual('/', path) + self.assertEqual(2, len(args)) + self.assertIn('foo', args) + self.assertIn('baz', args) + self.assertTrue(args['foo'].is_array) + self.assertTrue(args['baz'].is_scalar) + self.assertEqual(2, len(args['foo'])) + self.assertEqual('42', args['foo'].get_str(0)) + self.assertEqual('1337', args['foo'].get_str(1)) + + def test_parse_post_urlencoded_zero_arg(self): + """ + Test that a urlencoded POST request with an empty argument is parsed correctly. + """ + path, args = parse_args('/abc/def', postbody=b'foo=&bar=42', enctype='application/x-www-form-urlencoded') + self.assertEqual(2, len(args)) + self.assertIn('foo', args) + self.assertIn('bar', args) + self.assertTrue(args['foo'].is_scalar) + self.assertTrue(args['bar'].is_scalar) + self.assertEqual(1, len(args['foo'])) + self.assertEqual('', args['foo'].get_str()) + self.assertEqual('42', args['bar'].get_str()) + + def test_parse_post_urlencoded_encoding_fail(self): + """ + Test that a urlencoded POST request with a non-decodable escape sequence fails. + """ + with self.assertRaises(ValueError): + parse_args('/', + postbody=b'foo=42&bar=%80&baz=Hello,%20World!', + enctype='application/x-www-form-urlencoded') + + def test_parse_post_multipart_no_args(self): + """ + Test that a multipart POST request with no arguments is parsed correctly. + """ + path, args = parse_args('/', + postbody=b'--testBoundary1337--\r\n', + enctype='multipart/form-data; boundary=testBoundary1337') + self.assertEqual('/', path) + self.assertEqual(0, len(args)) + + def test_parse_post_multipart(self): + """ + Test that a multipart POST request with scalar arguments is parsed correctly. + """ + path, args = parse_args('/', + postbody=b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="foo"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'42\r\n' + b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="bar"; filename="bar.bin"\r\n' + b'Content-Type: application/octet-stream\r\n\r\n' + b'1337\r\n' + b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="baz"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'Hello, World!\r\n' + b'--testBoundary1337--\r\n', + enctype='multipart/form-data; boundary=testBoundary1337') + self.assertEqual('/', path) + self.assertEqual(3, len(args)) + self.assertIn('foo', args) + self.assertIn('bar', args) + self.assertIn('baz', args) + self.assertTrue(args['foo'].is_scalar) + self.assertTrue(args['bar'].is_scalar) + self.assertTrue(args['baz'].is_scalar) + self.assertEqual('text/plain', args['foo'].get_content_type()) + self.assertEqual('application/octet-stream', args['bar'].get_content_type()) + self.assertEqual('text/plain', args['baz'].get_content_type()) + self.assertEqual('42', args['foo'].get_str()) + self.assertEqual(b'1337', args['bar'].get_bytes()) + self.assertEqual('Hello, World!', args['baz'].get_str()) + + def test_parse_post_multipart_zero_arg(self): + """ + Test that a multipart POST request with an empty argument is parsed correctly. + """ + path, args = parse_args('/abc/def', + postbody=b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="foo"\r\n' + b'Content-Type: text/plain\r\n\r\n\r\n' + b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="bar"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'42\r\n' + b'--testBoundary1337--\r\n', + enctype='multipart/form-data; boundary=testBoundary1337') + self.assertEqual(2, len(args)) + self.assertIn('foo', args) + self.assertIn('bar', args) + self.assertTrue(args['foo'].is_scalar) + self.assertTrue(args['bar'].is_scalar) + self.assertEqual(1, len(args['foo'])) + self.assertEqual('', args['foo'].get_str()) + self.assertEqual('42', args['bar'].get_str()) + + def test_parse_post_multipart_no_contenttype(self): + """ + Test that the Content-Type is set to 'application/octet-stream' if it is absent from the multipart header. + """ + path, args = parse_args('/', + postbody=b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="foo"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'42\r\n' + b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="bar"; filename="bar.bin"\r\n' + b'Content-Type: application/octet-stream\r\n\r\n' + b'1337\r\n' + b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="baz"\r\n\r\n' + b'Hello, World!\r\n' + b'--testBoundary1337--\r\n', + enctype='multipart/form-data; boundary=testBoundary1337') + self.assertEqual('/', path) + self.assertEqual(3, len(args)) + self.assertIn('foo', args) + self.assertIn('bar', args) + self.assertIn('baz', args) + self.assertTrue(args['foo'].is_scalar) + self.assertTrue(args['bar'].is_scalar) + self.assertTrue(args['baz'].is_scalar) + self.assertEqual('application/octet-stream', args['baz'].get_content_type()) + self.assertEqual('Hello, World!', args['baz'].get_str()) + + def test_parse_post_multipart_broken_boundaries(self): + """ + Test that multiple cases with broken multipart boundaries fail. + """ + with self.assertRaises(ValueError): + # Boundary not defined in Content-Type + parse_args('/', + postbody=b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="foo"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'42\r\n' + b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="bar"; filename="bar.bin"\r\n' + b'Content-Type: application/octet-stream\r\n\r\n' + b'1337\r\n' + b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="baz"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'Hello, World!\r\n' + b'--testBoundary1337--\r\n', + enctype='multipart/form-data') + with self.assertRaises(ValueError): + # Corrupted "--" head at first boundary + parse_args('/', + postbody=b'-+testBoundary1337\r\n' + b'Content-Disposition: form-data; name="foo"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'42\r\n' + b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="bar"; filename="bar.bin"\r\n' + b'Content-Type: application/octet-stream\r\n\r\n' + b'1337\r\n' + b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="baz"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'Hello, World!\r\n' + b'--testBoundary1337--\r\n', + enctype='multipart/form-data; boundary=testBoundary1337') + with self.assertRaises(ValueError): + # Missing "--" tail at end boundary + parse_args('/', + postbody=b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="foo"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'42\r\n' + b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="bar"; filename="bar.bin"\r\n' + b'Content-Type: application/octet-stream\r\n\r\n' + b'1337\r\n' + b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="baz"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'Hello, World!\r\n' + b'--testBoundary1337\r\n', + enctype='multipart/form-data; boundary=testBoundary1337') + with self.assertRaises(ValueError): + # Missing Content-Disposition header in one part + parse_args('/', + postbody=b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="foo"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'42\r\n' + b'--testBoundary1337\r\n' + b'Content-Type: application/octet-stream\r\n\r\n' + b'1337\r\n' + b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="baz"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'Hello, World!\r\n' + b'--testBoundary1337--\r\n', + enctype='multipart/form-data; boundary=testBoundary1337') + with self.assertRaises(ValueError): + # Missing form-data name argument + parse_args('/', + postbody=b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="foo"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'42\r\n' + b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; filename="bar.bin"\r\n' + b'Content-Type: application/octet-stream\r\n\r\n' + b'1337\r\n' + b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="baz"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'Hello, World!\r\n' + b'--testBoundary1337--\r\n', + enctype='multipart/form-data; boundary=testBoundary1337') + with self.assertRaises(ValueError): + # Unknown Content-Disposition + parse_args('/', + postbody=b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="foo"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'42\r\n' + b'--testBoundary1337\r\n' + b'Content-Disposition: attachment; name="bar"; filename="bar.bin"\r\n' + b'Content-Type: application/octet-stream\r\n\r\n' + b'1337\r\n' + b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="baz"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'Hello, World!\r\n' + b'--testBoundary1337--\r\n', + enctype='multipart/form-data; boundary=testBoundary1337') + + def test_get_post_precedence_urlencoded(self): + """ + Test the precedence of urlencoded POST arguments over GET arguments. + """ + path, args = parse_args('/foo?foo=thisshouldnotbethere&bar=isurvived', + postbody=b'foo=42&foo=1337&baz=Hello,%20World!', + enctype='application/x-www-form-urlencoded') + self.assertIn('foo', args) + self.assertIn('bar', args) + self.assertIn('baz', args) + self.assertEqual(2, len(args['foo'])) + self.assertEqual(1, len(args['bar'])) + self.assertEqual(1, len(args['baz'])) + self.assertEqual('42', args['foo'].get_str(0)) + self.assertEqual('1337', args['foo'].get_str(1)) + self.assertEqual('isurvived', args['bar'].get_str()) + self.assertEqual('Hello, World!', args['baz'].get_str()) + + def test_get_post_precedence_multipart(self): + """ + Test the precedence of multipart POST arguments over GET arguments. + """ + path, args = parse_args('/foo?foo=thisshouldnotbethere&bar=isurvived', + postbody=b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="foo"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'42\r\n' + b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="foo"; filename="bar.bin"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'1337\r\n' + b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="baz"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'Hello, World!\r\n' + b'--testBoundary1337--\r\n', + enctype='multipart/form-data; boundary=testBoundary1337') + self.assertIn('foo', args) + self.assertIn('bar', args) + self.assertIn('baz', args) + self.assertEqual(2, len(args['foo'])) + self.assertEqual(1, len(args['bar'])) + self.assertEqual(1, len(args['baz'])) + self.assertEqual('42', args['foo'].get_str(0)) + self.assertEqual('1337', args['foo'].get_str(1)) + self.assertEqual('isurvived', args['bar'].get_str()) + self.assertEqual('Hello, World!', args['baz'].get_str()) diff --git a/matemat/webserver/test/test_post.py b/matemat/webserver/test/test_post.py new file mode 100644 index 0000000..e105354 --- /dev/null +++ b/matemat/webserver/test/test_post.py @@ -0,0 +1,224 @@ + +from typing import Any, Dict, List + +from matemat.webserver import HttpHandler, RequestArguments +from matemat.webserver.test.abstract_httpd_test import AbstractHttpdTest, test_pagelet + +import codecs + + +@test_pagelet('/just/testing/post') +def post_test_pagelet(method: str, + path: str, + args: RequestArguments, + session_vars: Dict[str, Any], + headers: Dict[str, str]): + """ + Test pagelet that simply prints the parsed arguments as response body. + """ + headers['Content-Type'] = 'text/plain' + dump: str = '' + for ra in args: + for a in ra: + if a.get_content_type().startswith('text/'): + dump += f'{a.name}: {a.get_str()}\n' + else: + dump += f'{a.name}: {codecs.encode(a.get_bytes(), "hex").decode("utf-8")}\n' + return dump + + +class TestPost(AbstractHttpdTest): + """ + Test cases for the content serving of the web server. + """ + + def test_post_urlenc_get_only_args(self): + """ + Test a POST request that only contains GET arguments. + """ + # Send POST request + self.client_sock.set_request(b'POST /just/testing/post?foo=bar&test=1 HTTP/1.1\r\n' + b'Content-Length: 0\r\n' + b'Content-Type: application/x-www-form-urlencoded\r\n\r\n') + HttpHandler(self.client_sock, ('::1', 45678), self.server) + packet = self.client_sock.get_response() + + # Parse response body + lines: List[bytes] = packet.body.split(b'\n')[:-1] + kv: Dict[str, str] = dict() + for l in lines: + k, v = l.decode('utf-8').split(':', 1) + kv[k.strip()] = v.strip() if ',' not in v else v.strip().split(',') + + # Make sure the arguments were properly parsed + self.assertEqual('bar', kv['foo']) + self.assertEqual('1', kv['test']) + + def test_post_urlenc_post_only_args(self): + """ + Test a POST request that only contains POST arguments (urlencoded). + """ + # Send POST request + self.client_sock.set_request(b'POST /just/testing/post HTTP/1.1\r\n' + b'Content-Type: application/x-www-form-urlencoded\r\n' + b'Content-Length: 14\r\n\r\n' + b'foo=bar&test=1\r\n') + HttpHandler(self.client_sock, ('::1', 45678), self.server) + packet = self.client_sock.get_response() + + # Parse response body + lines: List[bytes] = packet.body.split(b'\n')[:-1] + kv: Dict[str, str] = dict() + for l in lines: + k, v = l.decode('utf-8').split(':', 1) + kv[k.strip()] = v.strip() if ',' not in v else v.strip().split(',') + + # Make sure the arguments were properly parsed + self.assertEqual('bar', kv['foo']) + self.assertEqual('1', kv['test']) + + def test_post_urlenc_mixed_args(self): + """ + Test that mixed POST and GET args are properly parsed, and that POST takes precedence over GET. + """ + # Send POST request + self.client_sock.set_request(b'POST /just/testing/post?gettest=1&foo=baz HTTP/1.1\r\n' + b'Content-Type: application/x-www-form-urlencoded\r\n' + b'Content-Length: 18\r\n\r\n' + b'foo=bar&posttest=2\r\n') + HttpHandler(self.client_sock, ('::1', 45678), self.server) + packet = self.client_sock.get_response() + + # Parse response body + lines: List[bytes] = packet.body.split(b'\n')[:-1] + kv: Dict[str, str] = dict() + for l in lines: + k, v = l.decode('utf-8').split(':', 1) + kv[k.strip()] = v.strip() if ',' not in v else v.strip().split(',') + + # Make sure the arguments were properly parsed + self.assertEqual('bar', kv['foo']) + self.assertEqual('1', kv['gettest']) + self.assertEqual('2', kv['posttest']) + + def test_post_urlenc_get_array(self): + """ + Test a POST request that contains GET array arguments. + """ + # Send POST request + self.client_sock.set_request(b'POST /just/testing/post?foo=bar&test=1&foo=baz HTTP/1.1\r\n' + b'Content-Length: 0\r\n' + b'Content-Type: application/x-www-form-urlencoded\r\n\r\n') + HttpHandler(self.client_sock, ('::1', 45678), self.server) + packet = self.client_sock.get_response() + + # Parse response body + lines: List[bytes] = packet.body.split(b'\n')[:-1] + kv: Dict[str, str] = dict() + for l in lines: + k, v = l.decode('utf-8').split(':', 1) + k = k.strip() + v = v.strip() + if k in kv: + kv[k] += f',{v}' + else: + kv[k] = v + # Make sure the arguments were properly parsed + self.assertEqual('bar,baz', kv['foo']) + self.assertEqual('1', kv['test']) + + def test_post_urlenc_post_array(self): + """ + Test a POST request that contains POST array arguments. + """ + # Send POST request + self.client_sock.set_request(b'POST /just/testing/post HTTP/1.1\r\n' + b'Content-Length: 22\r\n' + b'Content-Type: application/x-www-form-urlencoded\r\n\r\n' + b'foo=bar&test=1&foo=baz\r\n') + HttpHandler(self.client_sock, ('::1', 45678), self.server) + packet = self.client_sock.get_response() + + # Parse response body + lines: List[bytes] = packet.body.split(b'\n')[:-1] + kv: Dict[str, str] = dict() + for l in lines: + k, v = l.decode('utf-8').split(':', 1) + k = k.strip() + v = v.strip() + if k in kv: + kv[k] += f',{v}' + else: + kv[k] = v + # Make sure the arguments were properly parsed + self.assertEqual('bar,baz', kv['foo']) + self.assertEqual('1', kv['test']) + + def test_post_urlenc_mixed_array(self): + """ + Test a POST request that contains both GET and POST array arguments. + """ + # Send POST request + self.client_sock.set_request(b'POST /just/testing/post?foo=getbar&gettest=1&gettest=42&foo=getbaz HTTP/1.1\r\n' + b'Content-Length: 45\r\n' + b'Content-Type: application/x-www-form-urlencoded\r\n\r\n' + b'foo=postbar&posttest=1&posttest=2&foo=postbaz\r\n') + HttpHandler(self.client_sock, ('::1', 45678), self.server) + packet = self.client_sock.get_response() + + # Parse response body + lines: List[bytes] = packet.body.split(b'\n')[:-1] + kv: Dict[str, str] = dict() + for l in lines: + k, v = l.decode('utf-8').split(':', 1) + k = k.strip() + v = v.strip() + if k in kv: + kv[k] += f',{v}' + else: + kv[k] = v + # Make sure the arguments were properly parsed + self.assertEqual('postbar,postbaz', kv['foo']) + self.assertEqual('1,42', kv['gettest']) + self.assertEqual('1,2', kv['posttest']) + + def test_post_no_body(self): + """ + Test a POST request that contains no headers or body. + """ + # Send POST request + self.client_sock.set_request(b'POST /just/testing/post?foo=bar HTTP/1.1\r\n\r\n') + HttpHandler(self.client_sock, ('::1', 45678), self.server) + packet = self.client_sock.get_response() + # Make sure a 400 Bad Request is returned + self.assertEqual(400, packet.statuscode) + + def test_post_multipart_post_only(self): + """ + Test a POST request with a miutipart/form-data body. + """ + # Send POST request + formdata = (b'------testboundary\r\n' + b'Content-Disposition: form-data; name="foo"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'Hello, World!\r\n' + b'------testboundary\r\n' + b'Content-Disposition: form-data; name="bar"; filename="foo.bar"\r\n' + b'Content-Type: application/octet-stream\r\n\r\n' + b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x80\x0b\x0c\x73\x0e\x0f\r\n' + b'------testboundary--\r\n') + + self.client_sock.set_request(f'POST /just/testing/post HTTP/1.1\r\n' + f'Content-Type: multipart/form-data; boundary=----testboundary\r\n' + f'Content-Length: {len(formdata)}\r\n\r\n'.encode('utf-8') + formdata) + HttpHandler(self.client_sock, ('::1', 45678), self.server) + packet = self.client_sock.get_response() + lines: List[bytes] = packet.body.split(b'\n')[:-1] + kv: Dict[str, Any] = dict() + for l in lines: + k, v = l.split(b':', 1) + kv[k.decode('utf-8').strip()] = v.strip() + self.assertIn('foo', kv) + self.assertIn('bar', kv) + self.assertEqual(kv['foo'], b'Hello, World!') + self.assertEqual(kv['bar'], b'00010203040506070809800b0c730e0f') diff --git a/matemat/webserver/test/test_requestargs.py b/matemat/webserver/test/test_requestargs.py new file mode 100644 index 0000000..3e093a2 --- /dev/null +++ b/matemat/webserver/test/test_requestargs.py @@ -0,0 +1,529 @@ + +from typing import Dict, List, Set, Tuple + +import unittest +import urllib.parse + +from matemat.webserver import RequestArgument, RequestArguments +# noinspection PyProtectedMember +from matemat.webserver.requestargs import _View + + +class TestRequestArguments(unittest.TestCase): + """ + Test cases for the RequestArgument class. + """ + + def test_create_default(self): + """ + Test creation of an empty RequestArgument + """ + ra = RequestArgument('foo') + # Name must be set to 1st argument + self.assertEqual('foo', ra.name) + # Must be a 0-length array + self.assertEqual(0, len(ra)) + self.assertFalse(ra.is_scalar) + self.assertTrue(ra.is_array) + # Must not be a view + self.assertFalse(ra.is_view) + + def test_create_str_scalar(self): + """ + Test creation of a scalar RequestArgument with string value. + """ + ra = RequestArgument('foo', ('text/plain', 'bar')) + # Name must be set to 1st argument + self.assertEqual('foo', ra.name) + # Must be a scalar, length 1 + self.assertEqual(1, len(ra)) + self.assertTrue(ra.is_scalar) + self.assertFalse(ra.is_array) + # Scalar value must be representable both as str and bytes + self.assertEqual('bar', ra.get_str()) + self.assertEqual(b'bar', ra.get_bytes()) + # Content-Type must be set correctly + self.assertEqual('text/plain', ra.get_content_type()) + # Using 0 indices must yield the same results + self.assertEqual('bar', ra.get_str(0)) + self.assertEqual(b'bar', ra.get_bytes(0)) + self.assertEqual('text/plain', ra.get_content_type(0)) + # Using other indices must result in an error + with self.assertRaises(IndexError): + ra.get_str(1) + with self.assertRaises(IndexError): + ra.get_bytes(1) + with self.assertRaises(IndexError): + ra.get_content_type(1) + # Must not be a view + self.assertFalse(ra.is_view) + + def test_create_str_scalar_array(self): + """ + Test creation of a scalar RequestArgument with string value, passing an array instead of a single tuple. + """ + ra = RequestArgument('foo', [('text/plain', 'bar')]) + # Name must be set to 1st argument + self.assertEqual('foo', ra.name) + # Must be a scalar, length 1 + self.assertEqual(1, len(ra)) + self.assertTrue(ra.is_scalar) + self.assertFalse(ra.is_array) + # Scalar value must be representable both as str and bytes + self.assertEqual('bar', ra.get_str()) + self.assertEqual(b'bar', ra.get_bytes()) + # Content-Type must be set correctly + self.assertEqual('text/plain', ra.get_content_type()) + # Using 0 indices must yield the same results + self.assertEqual('bar', ra.get_str(0)) + self.assertEqual(b'bar', ra.get_bytes(0)) + self.assertEqual('text/plain', ra.get_content_type(0)) + # Using other indices must result in an error + with self.assertRaises(IndexError): + ra.get_str(1) + with self.assertRaises(IndexError): + ra.get_bytes(1) + with self.assertRaises(IndexError): + ra.get_content_type(1) + # Must not be a view + self.assertFalse(ra.is_view) + + def test_create_bytes_scalar(self): + """ + Test creation of a scalar RequestArgument with bytes value. + """ + ra = RequestArgument('foo', ('application/octet-stream', b'\x00\x80\xff\xfe')) + # Name must be set to 1st argument + self.assertEqual('foo', ra.name) + # Must be a scalar, length 1 + self.assertEqual(1, len(ra)) + self.assertTrue(ra.is_scalar) + self.assertFalse(ra.is_array) + # Conversion to UTF-8 string must fail; bytes representation must work + with self.assertRaises(UnicodeDecodeError): + ra.get_str() + self.assertEqual(b'\x00\x80\xff\xfe', ra.get_bytes()) + # Content-Type must be set correctly + self.assertEqual('application/octet-stream', ra.get_content_type()) + # Using 0 indices must yield the same results + with self.assertRaises(UnicodeDecodeError): + ra.get_str(0) + self.assertEqual(b'\x00\x80\xff\xfe', ra.get_bytes(0)) + self.assertEqual('application/octet-stream', ra.get_content_type(0)) + # Using other indices must result in an error + with self.assertRaises(IndexError): + ra.get_str(1) + with self.assertRaises(IndexError): + ra.get_bytes(1) + with self.assertRaises(IndexError): + ra.get_content_type(1) + # Must not be a view + self.assertFalse(ra.is_view) + + def test_create_array(self): + """ + Test creation of an array RequestArgument with mixed str and bytes initial value. + """ + ra = RequestArgument('foo', [ + ('text/plain', 'bar'), + ('application/octet-stream', b'\x00\x80\xff\xfe') + ]) + # Name must be set to 1st argument + self.assertEqual('foo', ra.name) + # Must be an array, length 2 + self.assertEqual(2, len(ra)) + self.assertFalse(ra.is_scalar) + self.assertTrue(ra.is_array) + # Retrieving values without an index must yield the first element + self.assertEqual('bar', ra.get_str()) + self.assertEqual(b'bar', ra.get_bytes()) + self.assertEqual('text/plain', ra.get_content_type()) + # The first value must be representable both as str and bytes, and have ctype text/plain + self.assertEqual('bar', ra.get_str(0)) + self.assertEqual(b'bar', ra.get_bytes(0)) + self.assertEqual('text/plain', ra.get_content_type(0)) + # Conversion of the second value to UTF-8 string must fail; bytes representation must work + with self.assertRaises(UnicodeDecodeError): + ra.get_str(1) + self.assertEqual(b'\x00\x80\xff\xfe', ra.get_bytes(1)) + # The second value's ctype must be correct + self.assertEqual('application/octet-stream', ra.get_content_type(1)) + # Must not be a view + self.assertFalse(ra.is_view) + + def test_append_empty_str(self): + """ + Test appending a str value to an empty RequestArgument. + """ + # Initialize the empty RequestArgument + ra = RequestArgument('foo') + self.assertEqual(0, len(ra)) + self.assertFalse(ra.is_scalar) + + # Append a string value + ra.append('text/plain', 'bar') + # New length must be 1, empty array must be converted to scalar + self.assertEqual(1, len(ra)) + self.assertTrue(ra.is_scalar) + # Retrieval of the new value must work both in str and bytes representation + self.assertEqual('bar', ra.get_str()) + self.assertEqual(b'bar', ra.get_bytes()) + # Content type of the new value must be correct + self.assertEqual('text/plain', ra.get_content_type()) + # Must not be a view + self.assertFalse(ra.is_view) + + def test_append_empty_bytes(self): + """ + Test appending a bytes value to an empty RequestArgument. + """ + # Initialize the empty RequestArgument + ra = RequestArgument('foo') + self.assertEqual(0, len(ra)) + self.assertFalse(ra.is_scalar) + + # Append a bytes value + ra.append('application/octet-stream', b'\x00\x80\xff\xfe') + # New length must be 1, empty array must be converted to scalar + self.assertEqual(1, len(ra)) + self.assertTrue(ra.is_scalar) + # Conversion of the new value to UTF-8 string must fail; bytes representation must work + with self.assertRaises(UnicodeDecodeError): + ra.get_str() + self.assertEqual(b'\x00\x80\xff\xfe', ra.get_bytes()) + # Content type of the new value must be correct + self.assertEqual('application/octet-stream', ra.get_content_type()) + # Must not be a view + self.assertFalse(ra.is_view) + + def test_append_multiple(self): + """ + Test appending multiple values to an empty RequestArgument. + """ + # Initialize the empty RequestArgument + ra = RequestArgument('foo') + self.assertEqual(0, len(ra)) + self.assertFalse(ra.is_scalar) + + # Append a first value + ra.append('text/plain', 'bar') + # New length must be 1, empty array must be converted to scalar + self.assertEqual(1, len(ra)) + self.assertTrue(ra.is_scalar) + self.assertEqual(b'bar', ra.get_bytes()) + + # Append a second value + ra.append('application/octet-stream', b'\x00\x80\xff\xfe') + # New length must be 2, scalar must be converted to array + self.assertEqual(2, len(ra)) + self.assertFalse(ra.is_scalar) + self.assertEqual(b'bar', ra.get_bytes(0)) + self.assertEqual(b'\x00\x80\xff\xfe', ra.get_bytes(1)) + + # Append a third value + ra.append('text/plain', 'Hello, World!') + # New length must be 3, array must remain array + self.assertEqual(3, len(ra)) + self.assertFalse(ra.is_scalar) + self.assertEqual(b'bar', ra.get_bytes(0)) + self.assertEqual(b'\x00\x80\xff\xfe', ra.get_bytes(1)) + self.assertEqual(b'Hello, World!', ra.get_bytes(2)) + + def test_clear_empty(self): + """ + Test clearing an empty RequestArgument. + """ + # Initialize the empty RequestArgument + ra = RequestArgument('foo') + self.assertEqual(0, len(ra)) + self.assertFalse(ra.is_scalar) + ra.clear() + # Clearing an empty RequestArgument shouldn't have any effect + self.assertEqual('foo', ra.name) + self.assertEqual(0, len(ra)) + self.assertFalse(ra.is_scalar) + + def test_clear_scalar(self): + """ + Test clearing a scalar RequestArgument. + """ + # Initialize the scalar RequestArgument + ra = RequestArgument('foo', ('text/plain', 'bar')) + self.assertEqual(1, len(ra)) + self.assertTrue(ra.is_scalar) + ra.clear() + # Clearing a scalar RequestArgument should reduce its size to 0 + self.assertEqual('foo', ra.name) + self.assertEqual(0, len(ra)) + self.assertFalse(ra.is_scalar) + with self.assertRaises(IndexError): + ra.get_str() + + def test_clear_array(self): + """ + Test clearing an array RequestArgument. + """ + # Initialize the array RequestArgument + ra = RequestArgument('foo', [ + ('text/plain', 'bar'), + ('application/octet-stream', b'\x00\x80\xff\xfe'), + ('text/plain', 'baz'), + ]) + self.assertEqual(3, len(ra)) + self.assertFalse(ra.is_scalar) + ra.clear() + # Clearing an array RequestArgument should reduce its size to 0 + self.assertEqual('foo', ra.name) + self.assertEqual(0, len(ra)) + self.assertFalse(ra.is_scalar) + with self.assertRaises(IndexError): + ra.get_str() + + def test_iterate_empty(self): + """ + Test iterating an empty RequestArgument. + """ + ra = RequestArgument('foo') + self.assertEqual(0, len(ra)) + # No value must be yielded from iterating an empty instance + for _ in ra: + self.fail() + + def test_iterate_scalar(self): + """ + Test iterating a scalar RequestArgument. + """ + ra = RequestArgument('foo', ('text/plain', 'bar')) + self.assertTrue(ra.is_scalar) + # Counter for the number of iterations + count: int = 0 + for it in ra: + # Make sure the yielded value is a scalar view and has the same name as the original instance + self.assertIsInstance(it, _View) + self.assertTrue(it.is_view) + self.assertEqual('foo', it.name) + self.assertTrue(it.is_scalar) + count += 1 + # Only one value must be yielded from iterating a scalar instance + self.assertEqual(1, count) + + def test_iterate_array(self): + """ + Test iterating an array RequestArgument. + """ + ra = RequestArgument('foo', [('text/plain', 'bar'), ('abc', b'def'), ('xyz', '1337')]) + self.assertFalse(ra.is_scalar) + # Container to put the iterated ctypes into + items: List[str] = list() + for it in ra: + # Make sure the yielded values are scalar views and have the same name as the original instance + self.assertIsInstance(it, _View) + self.assertTrue(it.is_view) + self.assertTrue(it.is_scalar) + # Collect the value's ctype + items.append(it.get_content_type()) + # Compare collected ctypes with expected result + self.assertEqual(['text/plain', 'abc', 'xyz'], items) + + def test_slice(self): + """ + Test slicing an array RequestArgument. + """ + ra = RequestArgument('foo', [('a', 'b'), ('c', 'd'), ('e', 'f'), ('g', 'h'), ('i', 'j'), ('k', 'l')]) + # Create the sliced view + sliced = ra[1:4:2] + # Make sure the sliced value is a view + self.assertIsInstance(sliced, _View) + self.assertTrue(sliced.is_view) + # Make sure the slice has the same name + self.assertEqual('foo', sliced.name) + # Make sure the slice has the expected shape (array of the 2nd and 4th scalar in the original) + self.assertTrue(sliced.is_array) + self.assertEqual(2, len(sliced)) + self.assertEqual('d', sliced.get_str(0)) + self.assertEqual('h', sliced.get_str(1)) + + def test_iterate_sliced(self): + """ + Test iterating a sliced array RequestArgument. + """ + ra = RequestArgument('foo', [('a', 'b'), ('c', 'd'), ('e', 'f'), ('g', 'h'), ('i', 'j'), ('k', 'l')]) + # Container to put the iterated ctypes into + items: List[str] = list() + # Iterate the sliced view + for it in ra[1:4:2]: + # Make sure the yielded values are scalar views and have the same name as the original instance + self.assertIsInstance(it, _View) + self.assertTrue(it.is_view) + self.assertEqual('foo', it.name) + self.assertTrue(it.is_scalar) + items.append(it.get_content_type()) + # Make sure the expected values are collected (array of the 2nd and 4th scalar in the original) + self.assertEqual(['c', 'g'], items) + + def test_index_scalar(self): + """ + Test indexing of a scalar RequestArgument. + """ + ra = RequestArgument('foo', ('bar', 'baz')) + # Index the scalar RequestArgument instance, obtaining an immutable view + it = ra[0] + # Make sure the value is a scalar view with the same properties as the original instance + self.assertIsInstance(it, _View) + self.assertTrue(it.is_scalar) + self.assertEqual('foo', it.name) + self.assertEqual('bar', it.get_content_type()) + self.assertEqual('baz', it.get_str()) + # Make sure other indices don't work + with self.assertRaises(IndexError): + _ = ra[1] + + def test_index_array(self): + """ + Test indexing of an array RequestArgument. + """ + ra = RequestArgument('foo', [('a', 'b'), ('c', 'd')]) + # Index the array RequestArgument instance, obtaining an immutable view + it = ra[1] + # Make sure the value is a scalar view with the same properties as the value in the original instance + self.assertIsInstance(it, _View) + self.assertEqual('foo', it.name) + self.assertEqual('c', it.get_content_type()) + self.assertEqual('d', it.get_str()) + + def test_view_immutable(self): + """ + Test immutability of views. + """ + ra = RequestArgument('foo', ('bar', 'baz')) + # Index the scalar RequestArgument instance, obtaining an immutable view + it = ra[0] + # Make sure the returned value is a view + self.assertIsInstance(it, _View) + # Make sure the returned value is immutable + with self.assertRaises(TypeError): + it.append('foo', 'bar') + with self.assertRaises(TypeError): + it.clear() + + def test_str_shorthand(self): + """ + Test the shorthand for get_str(0). + """ + ra = RequestArgument('foo', ('bar', 'baz')) + self.assertEqual('baz', str(ra)) + + def test_bytes_shorthand(self): + """ + Test the shorthand for get_bytes(0). + """ + ra = RequestArgument('foo', ('bar', b'\x00\x80\xff\xfe')) + self.assertEqual(b'\x00\x80\xff\xfe', bytes(ra)) + + # noinspection PyTypeChecker + def test_insert_garbage(self): + """ + Test proper handling with non-int indices and non-str/non-bytes data + :return: + """ + ra = RequestArgument('foo', 42) + with self.assertRaises(TypeError): + str(ra) + ra = RequestArgument('foo', (None, 42)) + with self.assertRaises(TypeError): + str(ra) + with self.assertRaises(TypeError): + bytes(ra) + with self.assertRaises(TypeError): + ra.get_content_type() + with self.assertRaises(TypeError): + ra.get_str('foo') + with self.assertRaises(TypeError): + ra.get_bytes('foo') + with self.assertRaises(TypeError): + ra.get_content_type('foo') + + def test_requestarguments_index(self): + """ + Make sure indexing a RequestArguments instance creates a new entry on the fly. + """ + ra = RequestArguments() + self.assertEqual(0, len(ra)) + self.assertFalse('foo' in ra) + # Create new entry + _ = ra['foo'] + self.assertEqual(1, len(ra)) + self.assertTrue('foo' in ra) + # Already exists, no new entry created + _ = ra['foo'] + self.assertEqual(1, len(ra)) + # Entry must be empty and mutable, and have the correct name + self.assertFalse(ra['foo'].is_view) + self.assertEqual(0, len(ra['foo'])) + self.assertEqual('foo', ra['foo'].name) + # Key must be a string + with self.assertRaises(TypeError): + # noinspection PyTypeChecker + _ = ra[42] + + def test_requestarguments_attr(self): + """ + Test attribute access syntactic sugar. + """ + ra = RequestArguments() + # Attribute should not exist yet + with self.assertRaises(KeyError): + _ = ra.foo + # Create entry + _ = ra['foo'] + # Creating entry should have created the attribute + self.assertEqual('foo', ra.foo.name) + # Attribute access should yield an immutable view + self.assertTrue(ra.foo.is_view) + + def test_requestarguments_iterate(self): + """ + Test iterating a RequestArguments instance. + """ + # Create an instance with some values + ra = RequestArguments() + ra['foo'].append('a', 'b') + ra['bar'].append('c', 'd') + ra['foo'].append('e', 'f') + # Container for test values (name, value) + items: Set[Tuple[str, str]] = set() + # Iterate RequestArguments instance, adding the name and value of each to the set + for a in ra: + items.add((a.name, str(a))) + # Compare result with expected value + self.assertEqual(2, len(items)) + self.assertIn(('foo', 'b'), items) + self.assertIn(('bar', 'd'), items) + + def test_requestarguments_full_use_case(self): + """ + Simulate a minimal RequestArguments use case. + """ + # Create empty RequestArguments instance + ra = RequestArguments() + # Parse GET request + getargs: Dict[str, List[str]] = urllib.parse.parse_qs('foo=42&bar=1337&foo=43&baz=Hello,%20World!') + # Insert GET arguments into RequestArguments + for k, vs in getargs.items(): + for v in vs: + ra[k].append('text/plain', v) + # Parse POST request + postargs: Dict[str, List[str]] = urllib.parse.parse_qs('foo=postfoo&postbar=42&foo=postfoo') + # Insert POST arguments into RequestArguments + for k, vs in postargs.items(): + # In this implementation, POST args replace GET args + ra[k].clear() + for v in vs: + ra[k].append('text/plain', v) + + # Someplace else: Use the RequestArguments instance. + self.assertEqual('1337', ra.bar.get_str()) + self.assertEqual('Hello, World!', ra.baz.get_str()) + self.assertEqual('42', ra.postbar.get_str()) + for a in ra.foo: + self.assertEqual('postfoo', a.get_str()) diff --git a/matemat/webserver/test/test_serve.py b/matemat/webserver/test/test_serve.py new file mode 100644 index 0000000..7b0b61d --- /dev/null +++ b/matemat/webserver/test/test_serve.py @@ -0,0 +1,195 @@ + +from typing import Any, Dict, Union + +import os +import os.path +from matemat.exceptions import HttpException +from matemat.webserver import HttpHandler, RequestArguments, PageletResponse, RedirectResponse, TemplateResponse +from matemat.webserver.test.abstract_httpd_test import AbstractHttpdTest, test_pagelet + + +@test_pagelet('/just/testing/serve_pagelet_str') +def serve_test_pagelet_str(method: str, + path: str, + args: RequestArguments, + session_vars: Dict[str, Any], + headers: Dict[str, str]) -> Union[bytes, str, PageletResponse]: + headers['Content-Type'] = 'text/plain' + return 'serve test pagelet str' + + +@test_pagelet('/just/testing/serve_pagelet_bytes') +def serve_test_pagelet_bytes(method: str, + path: str, + args: RequestArguments, + session_vars: Dict[str, Any], + headers: Dict[str, str]) -> Union[bytes, str, PageletResponse]: + headers['Content-Type'] = 'application/octet-stream' + return b'serve\x80test\xffpagelet\xfebytes' + + +@test_pagelet('/just/testing/serve_pagelet_redirect') +def serve_test_pagelet_redirect(method: str, + path: str, + args: RequestArguments, + session_vars: Dict[str, Any], + headers: Dict[str, str]) -> Union[bytes, str, PageletResponse]: + return RedirectResponse('/foo/bar') + + +@test_pagelet('/just/testing/serve_pagelet_template') +def serve_test_pagelet_template(method: str, + path: str, + args: RequestArguments, + session_vars: Dict[str, Any], + headers: Dict[str, str]) -> Union[bytes, str, PageletResponse]: + headers['Content-Type'] = 'text/plain' + return TemplateResponse('test.txt', what='World') + + +# noinspection PyTypeChecker +@test_pagelet('/just/testing/serve_pagelet_fail') +def serve_test_pagelet_fail(method: str, + path: str, + args: RequestArguments, + session_vars: Dict[str, Any], + headers: Dict[str, str]) -> Union[bytes, str, PageletResponse]: + session_vars['test'] = 'hello, world!' + headers['Content-Type'] = 'text/plain' + raise HttpException() + + +class TestServe(AbstractHttpdTest): + """ + Test cases for the content serving of the web server. + """ + + def setUp(self): + super().setUp() + # Create a static resource in the temp dir + with open(os.path.join(self.tempdir.name, 'static_resource.txt'), 'w') as f: + f.write('static resource test') + # Create a second static resource chmodded to 0000, to test 403 Forbidden error + forbidden: str = os.path.join(self.tempdir.name, 'forbidden_static_resource.txt') + with open(forbidden, 'w') as f: + f.write('This should not be readable') + os.chmod(forbidden, 0) + + def test_serve_pagelet_str(self): + # Call the test pagelet that produces a 200 OK result + self.client_sock.set_request(b'GET /just/testing/serve_pagelet_str HTTP/1.1\r\n\r\n') + HttpHandler(self.client_sock, ('::1', 45678), self.server) + packet = self.client_sock.get_response() + + # Make sure the correct pagelet was called + self.assertEqual('serve_test_pagelet_str', packet.pagelet) + # Make sure the expected content is served + self.assertEqual(200, packet.statuscode) + self.assertEqual(b'serve test pagelet str', packet.body) + + def test_serve_pagelet_bytes(self): + # Call the test pagelet that produces a 200 OK result + self.client_sock.set_request(b'GET /just/testing/serve_pagelet_bytes HTTP/1.1\r\n\r\n') + HttpHandler(self.client_sock, ('::1', 45678), self.server) + packet = self.client_sock.get_response() + + # Make sure the correct pagelet was called + self.assertEqual('serve_test_pagelet_bytes', packet.pagelet) + # Make sure the expected content is served + self.assertEqual(200, packet.statuscode) + self.assertEqual(b'serve\x80test\xffpagelet\xfebytes', packet.body) + + def test_serve_pagelet_fail(self): + # Call the test pagelet that produces a 500 Internal Server Error result + self.client_sock.set_request(b'GET /just/testing/serve_pagelet_fail HTTP/1.1\r\n\r\n') + HttpHandler(self.client_sock, ('::1', 45678), self.server) + packet = self.client_sock.get_response() + + # Make sure an error is raised + self.assertEqual(500, packet.statuscode) + + def test_serve_pagelet_redirect(self): + # Call the test pagelet that redirects to another path + self.client_sock.set_request(b'GET /just/testing/serve_pagelet_redirect HTTP/1.1\r\n\r\n') + HttpHandler(self.client_sock, ('::1', 45678), self.server) + packet = self.client_sock.get_response() + + # Make sure the correct pagelet was called + self.assertEqual('serve_test_pagelet_redirect', packet.pagelet) + # Make sure the correct redirect is issued + self.assertEqual(301, packet.statuscode) + self.assertEqual('/foo/bar', packet.headers['Location']) + # Make sure the response body is empty + self.assertEqual(0, len(packet.body)) + + def test_serve_pagelet_template(self): + # Call the test pagelet that redirects to another path + self.client_sock.set_request(b'GET /just/testing/serve_pagelet_template HTTP/1.1\r\n\r\n') + HttpHandler(self.client_sock, ('::1', 45678), self.server) + packet = self.client_sock.get_response() + + # Make sure the correct pagelet was called + self.assertEqual('serve_test_pagelet_template', packet.pagelet) + self.assertEqual(200, packet.statuscode) + # Make sure the response body was rendered correctly by the templating engine + self.assertEqual(b'Hello, World!', packet.body) + + def test_serve_static_ok(self): + # Request a static resource + self.client_sock.set_request(b'GET /static_resource.txt HTTP/1.1\r\n\r\n') + HttpHandler(self.client_sock, ('::1', 45678), self.server) + packet = self.client_sock.get_response() + + # Make sure that no pagelet was called + self.assertIsNone(packet.pagelet) + # Make sure the expected content is served + self.assertEqual(200, packet.statuscode) + self.assertEqual(b'static resource test', packet.body) + + def test_serve_static_forbidden(self): + # Request a static resource with lacking permissions + self.client_sock.set_request(b'GET /forbidden_static_resource.txt HTTP/1.1\r\n\r\n') + HttpHandler(self.client_sock, ('::1', 45678), self.server) + packet = self.client_sock.get_response() + + # Make sure that no pagelet was called + self.assertIsNone(packet.pagelet) + # Make sure a 403 header is served + self.assertEqual(403, packet.statuscode) + self.assertNotEqual(b'This should not be readable', packet.body) + + def test_serve_not_found(self): + # Request a nonexistent resource + self.client_sock.set_request(b'GET /nonexistent HTTP/1.1\r\n\r\n') + HttpHandler(self.client_sock, ('::1', 45678), self.server) + packet = self.client_sock.get_response() + + # Make sure that no pagelet was called + self.assertIsNone(packet.pagelet) + # Make sure a 404 header is served + self.assertEqual(404, packet.statuscode) + + def test_serve_directory_traversal(self): + # Request a resource outside the webroot + self.client_sock.set_request(b'GET /../../../../../etc/passwd HTTP/1.1\r\n\r\n') + HttpHandler(self.client_sock, ('::1', 45678), self.server) + packet = self.client_sock.get_response() + + # Make sure that no pagelet was called + self.assertIsNone(packet.pagelet) + # Make sure a 404 header is served + self.assertEqual(404, packet.statuscode) + + def test_static_post_not_allowed(self): + # Request a resource outside the webroot + self.client_sock.set_request(b'POST /iwanttopostthis HTTP/1.1\r\n' + b'Content-Type: application/x-www-form-urlencoded\r\n' + b'Content-length: 37\r\n\r\n' + b'q=this%20should%20not%20be%20uploaded') + HttpHandler(self.client_sock, ('::1', 45678), self.server) + packet = self.client_sock.get_response() + + # Make sure that no pagelet was called + self.assertIsNone(packet.pagelet) + # Make sure a 405 Method Not Allowed header is served + self.assertEqual(405, packet.statuscode) diff --git a/matemat/webserver/test/test_session.py b/matemat/webserver/test/test_session.py new file mode 100644 index 0000000..2e64ceb --- /dev/null +++ b/matemat/webserver/test/test_session.py @@ -0,0 +1,160 @@ + +from typing import Any, Dict + +from datetime import datetime, timedelta +from time import sleep + +from matemat.webserver import HttpHandler, RequestArguments +from matemat.webserver.test.abstract_httpd_test import AbstractHttpdTest, test_pagelet + + +@test_pagelet('/just/testing/sessions') +def session_test_pagelet(method: str, + path: str, + args: RequestArguments, + session_vars: Dict[str, Any], + headers: Dict[str, str]): + session_vars['test'] = 'hello, world!' + headers['Content-Type'] = 'text/plain' + return 'session test' + + +class TestSession(AbstractHttpdTest): + """ + Test session handling of the Matemat webserver. + """ + + def test_create_new_session(self): + # Reference date to make sure the session expiry lies in the future + refdate: datetime = datetime.utcnow() + timedelta(seconds=3500) + # Send a mock GET request for '/just/testing/sessions' + self.client_sock.set_request(b'GET /just/testing/sessions HTTP/1.1\r\n\r\n') + # Trigger request handling + handler = HttpHandler(self.client_sock, ('::1', 45678), self.server) + # Fetch the parsed response + packet = self.client_sock.get_response() + # Make sure a full HTTP response was parsed + self.assertEqual('done', packet.parse_phase) + # Make sure the request was served by the test pagelet + self.assertEqual('session_test_pagelet', packet.pagelet) + self.assertEqual(200, packet.statuscode) + + session_id: str = list(handler.server.session_vars.keys())[0] + # Make sure a cookie was set - assuming that only one was set + self.assertIn('Set-Cookie', packet.headers) + # Split into the cookie itself + cookie, expiry = packet.headers['Set-Cookie'].split(';') + cookie: str = cookie.strip() + expiry: str = expiry.strip() + # Make sure the 'matemat_session_id' cookie was set to the session ID string + self.assertEqual(f'matemat_session_id={session_id}', cookie) + # Make sure the session expires in about one hour + self.assertTrue(expiry.startswith('expires=')) + _, expdatestr = expiry.split('=', 1) + expdate = datetime.strptime(expdatestr, '%a, %d %b %Y %H:%M:%S GMT') + self.assertTrue(expdate > refdate) + # Make sure the session exists on the server + self.assertIn('test', handler.session_vars) + self.assertEqual('hello, world!', handler.session_vars['test']) + + def test_resume_session(self): + # Test session expiry date + refdate: datetime = datetime.utcnow() + timedelta(hours=1) + # Session ID for testing + session_id: str = 'testsessionid' + # Insert test session + self.server.session_vars[session_id] = refdate, {'test': 'bar'} + sleep(2) + + # Send a mock GET request for '/just/testing/sessions' with a matemat session cookie + self.client_sock.set_request( + f'GET /just/testing/sessions HTTP/1.1\r\nCookie: matemat_session_id={session_id}\r\n\r\n'.encode('utf-8')) + # Trigger request handling + handler = HttpHandler(self.client_sock, ('::1', 45678), self.server) + # Fetch the parsed response + packet = self.client_sock.get_response() + # Make sure a full HTTP response was parsed + self.assertEqual('done', packet.parse_phase) + # Make sure the request was served by the test pagelet + self.assertEqual('session_test_pagelet', packet.pagelet) + self.assertEqual(200, packet.statuscode) + + response_session_id: str = list(handler.server.session_vars.keys())[0] + # Make sure a cookie was set - assuming that only one was set + self.assertIn('Set-Cookie', packet.headers) + # Split into the cookie itself + cookie, expiry = packet.headers['Set-Cookie'].split(';') + cookie: str = cookie.strip() + expiry: str = expiry.strip() + # Make sure the 'matemat_session_id' cookie was set to the session ID string + self.assertEqual(f'matemat_session_id={response_session_id}', cookie) + # Make sure the session ID matches the one we sent along + self.assertEqual(session_id, response_session_id) + # Make sure the session timeout was postponed + self.assertTrue(expiry.startswith('expires=')) + _, expdatestr = expiry.split('=', 1) + expdate = datetime.strptime(expdatestr, '%a, %d %b %Y %H:%M:%S GMT') + self.assertTrue(expdate > refdate) + # Make sure the session exists on the server + self.assertIn('test', handler.session_vars) + self.assertEqual('hello, world!', handler.session_vars['test']) + + def test_unknown_session_id(self): + # Unknown session ID + session_id: str = 'theserverdoesnotknowthisid' + refdate: datetime = datetime.utcnow() + timedelta(seconds=3500) + # Send a mock GET request for '/just/testing/sessions' with a session cookie not known to the server + self.client_sock.set_request( + f'GET /just/testing/sessions HTTP/1.1\r\nCookie: matemat_session_id={session_id}\r\n\r\n'.encode('utf-8')) + # Trigger request handling + handler = HttpHandler(self.client_sock, ('::1', 45678), self.server) + # Fetch the parsed response + packet = self.client_sock.get_response() + # Make sure a full HTTP response was parsed + self.assertEqual('done', packet.parse_phase) + # Make sure the request was served by the test pagelet + self.assertEqual('session_test_pagelet', packet.pagelet) + self.assertEqual(200, packet.statuscode) + + server_session_id: str = list(handler.server.session_vars.keys())[0] + self.assertNotEqual(session_id, server_session_id) + # Make sure a cookie was set - assuming that only one was set + self.assertIn('Set-Cookie', packet.headers) + # Split into the cookie itself + cookie, expiry = packet.headers['Set-Cookie'].split(';') + cookie: str = cookie.strip() + expiry: str = expiry.strip() + # Make sure the 'matemat_session_id' cookie was set to the session ID string + self.assertEqual(f'matemat_session_id={server_session_id}', cookie) + # Make sure the session expires in about one hour + self.assertTrue(expiry.startswith('expires=')) + _, expdatestr = expiry.split('=', 1) + expdate = datetime.strptime(expdatestr, '%a, %d %b %Y %H:%M:%S GMT') + self.assertTrue(expdate > refdate) + # Make sure the session exists on the server + self.assertIn('test', handler.session_vars) + self.assertEqual('hello, world!', handler.session_vars['test']) + + def test_session_expired(self): + # Test session expiry date + refdate: datetime = datetime.utcnow() - timedelta(hours=1) + # Session ID for testing + session_id: str = 'testsessionid' + # Insert test session + self.server.session_vars[session_id] = refdate, {'test': 'bar'} + + # Send a mock GET request for '/just/testing/sessions' with a matemat session cookie + self.client_sock.set_request( + f'GET /just/testing/sessions HTTP/1.1\r\nCookie: matemat_session_id={session_id}\r\n\r\n'.encode('utf-8')) + # Trigger request handling + handler = HttpHandler(self.client_sock, ('::1', 45678), self.server) + # Fetch the parsed response + packet = self.client_sock.get_response() + # Make sure a full HTTP response was parsed + self.assertEqual('done', packet.parse_phase) + # Make sure the server redirects to / + self.assertEqual(302, packet.statuscode) + self.assertIn('Location', packet.headers) + self.assertEqual('/', packet.headers['Location']) + # Make sure the session was terminated + self.assertNotIn(session_id, self.server.session_vars) diff --git a/matemat/webserver/util.py b/matemat/webserver/util.py new file mode 100644 index 0000000..c95d303 --- /dev/null +++ b/matemat/webserver/util.py @@ -0,0 +1,138 @@ + +from typing import Dict, List, Tuple, Optional + +import urllib.parse + +from matemat.webserver import RequestArguments, RequestArgument + + +def _parse_multipart(body: bytes, boundary: str) -> List[RequestArgument]: + """ + Given a HTTP body with form-data in multipart form, and the multipart-boundary, parse the multipart items and + return them as a dictionary. + + :param body: The HTTP multipart/form-data body. + :param boundary: The multipart boundary. + :return: A dictionary of field names as key, and content types and field values as value. + """ + # Prepend a CRLF for the first boundary to match + body = b'\r\n' + body + # Generate item header boundary and terminating boundary from general boundary string + _boundary = f'\r\n--{boundary}\r\n'.encode('utf-8') + _end_boundary = f'\r\n--{boundary}--\r\n'.encode('utf-8') + # Split at the end boundary and make sure there comes nothing after it + allparts = body.split(_end_boundary, 1) + if len(allparts) != 2 or allparts[1] != b'': + raise ValueError('Last boundary missing or corrupted') + # Split remaining body into its parts, and verify at least 1 part is there + parts: List[bytes] = (allparts[0]).split(_boundary) + if len(parts) < 1 or parts[0] != b'': + raise ValueError('First boundary missing or corrupted') + # Remove the first, empty part + parts = parts[1:] + + # Results go into this dict + args: Dict[str, RequestArgument] = dict() + + # Parse each multipart part + for part in parts: + # Parse multipart headers + hdr: Dict[str, str] = dict() + while True: + head, part = part.split(b'\r\n', 1) + # Break on header/body delimiter + if head == b'': + break + # Add header to hdr dict + hk, hv = head.decode('utf-8').split(':') + hdr[hk.strip()] = hv.strip() + # No content type set - set broadest possible type + if 'Content-Type' not in hdr: + hdr['Content-Type'] = 'application/octet-stream' + # At least Content-Disposition must be present + if 'Content-Disposition' not in hdr: + raise ValueError('Missing Content-Type or Content-Disposition header') + # Extract Content-Disposition header value and its arguments + cd, *cdargs = hdr['Content-Disposition'].split(';') + # Content-Disposition MUST be form-data; everything else is rejected + if cd.strip() != 'form-data': + raise ValueError(f'Unknown Content-Disposition: {cd}') + # Extract the "name" header argument + has_name = False + for cdarg in cdargs: + k, v = cdarg.split('=', 1) + if k.strip() == 'name': + has_name = True + name: str = v.strip() + # Remove quotation marks around the name value + if name.startswith('"') and name.endswith('"'): + name = v[1:-1] + # Add the Content-Type and the content to the header, with the provided name + if name not in args: + args[name] = RequestArgument(name) + args[name].append(hdr['Content-Type'].strip(), part) + if not has_name: + # Content-Disposition header without name attribute + raise ValueError('mutlipart/form-data part without name attribute') + + return list(args.values()) + + +def parse_args(request: str, postbody: Optional[bytes] = None, enctype: str = 'text/plain') \ + -> Tuple[str, RequestArguments]: + """ + Given a HTTP request path, and optionally a HTTP POST body in application/x-www-form-urlencoded or + multipart/form-data form, parse the arguments and return them as a dictionary. + + If a key is used both in GET and in POST, the POST value takes precedence, and the GET value is discarded. + + :param request: The request string to parse. + :param postbody: The POST body to parse, defaults to None. + :param enctype: Encoding of the POST body; supported values are application/x-www-form-urlencoded and + multipart/form-data. + :return: A tuple consisting of the base path and a dictionary with the parsed key/value pairs, and the value's + content type. + """ + # Parse the request "URL" (i.e. only the path) + tokens = urllib.parse.urlparse(request) + # Parse the GET arguments + if len(tokens.query) == 0: + getargs: Dict[str, List[str]] = dict() + else: + getargs = urllib.parse.parse_qs(tokens.query, strict_parsing=True, keep_blank_values=True, errors='strict') + + args = RequestArguments() + for k, vs in getargs.items(): + args[k].clear() + for v in vs: + args[k].append('text/plain', v) + + if postbody is not None: + if enctype == 'application/x-www-form-urlencoded': + # Parse the POST body + pb: str = postbody.decode('utf-8') + if len(pb) == 0: + postargs: Dict[str, List[str]] = dict() + else: + postargs = urllib.parse.parse_qs(pb, strict_parsing=True, keep_blank_values=True, errors='strict') + # Write all POST values into the dict, overriding potential duplicates from GET + for k, vs in postargs.items(): + args[k].clear() + for v in vs: + args[k].append('text/plain', v) + elif enctype.startswith('multipart/form-data'): + # Parse the multipart boundary from the Content-Type header + try: + boundary: str = enctype.split('boundary=')[1].strip() + except IndexError: + raise ValueError('Multipart boundary in header not set or corrupted') + # Parse the multipart body + mpargs = _parse_multipart(postbody, boundary) + for ra in mpargs: + args[ra.name].clear() + for a in ra: + args[ra.name].append(a.get_content_type(), bytes(a)) + else: + raise ValueError(f'Unsupported Content-Type: {enctype}') + # Return the path and the parsed arguments + return tokens.path, args diff --git a/requirements.txt b/requirements.txt index f2fa21b..7663204 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,2 @@ -bcrypt apsw +jinja2 diff --git a/templates/base.html b/templates/base.html new file mode 100644 index 0000000..1ddd113 --- /dev/null +++ b/templates/base.html @@ -0,0 +1,17 @@ + + + + Matemat + + + +

Matemat {{__version__}}

+ {% block main %} + {% endblock %} + + diff --git a/templates/login.html b/templates/login.html new file mode 100644 index 0000000..13c5dca --- /dev/null +++ b/templates/login.html @@ -0,0 +1,9 @@ +{% extends "base.html" %} + +{% block main %} +
+ Username:
+ Password:
+ +
+{% endblock%} diff --git a/templates/main.html b/templates/main.html new file mode 100644 index 0000000..6aa5887 --- /dev/null +++ b/templates/main.html @@ -0,0 +1,22 @@ +{% extends "base.html" %} + +{% block main %} + {{ user|default("") }} + +{% endblock %} diff --git a/templates/touchkey.html b/templates/touchkey.html new file mode 100644 index 0000000..bb0a5b9 --- /dev/null +++ b/templates/touchkey.html @@ -0,0 +1,9 @@ +{% extends "base.html" %} + +{% block main %} +
+
+ Touchkey:
+ +
+{% endblock %}