diff --git a/matemat/webserver/__init__.py b/matemat/webserver/__init__.py index f4d86f3..c52368e 100644 --- a/matemat/webserver/__init__.py +++ b/matemat/webserver/__init__.py @@ -6,4 +6,5 @@ API that can be used by 'pagelets' - single pages of a web service. If a reques server will attempt to serve the request with a static resource in a previously configured webroot directory. """ +from .requestargs import RequestArgument, RequestArguments from .httpd import MatematWebserver, HttpHandler, pagelet diff --git a/matemat/webserver/httpd.py b/matemat/webserver/httpd.py index 220849c..c59e3fc 100644 --- a/matemat/webserver/httpd.py +++ b/matemat/webserver/httpd.py @@ -1,12 +1,11 @@ -from typing import Any, Callable, Dict, List, Optional, Tuple, Union +from typing import Any, Callable, Dict, Tuple, Union import traceback import os import socket import mimetypes -import urllib.parse from socketserver import TCPServer from http.server import HTTPServer, BaseHTTPRequestHandler from http.cookies import SimpleCookie @@ -14,6 +13,8 @@ from uuid import uuid4 from datetime import datetime, timedelta from matemat import __version__ as matemat_version +from matemat.webserver import RequestArguments +from matemat.webserver.util import parse_args # @@ -28,12 +29,17 @@ BaseHTTPRequestHandler.log_error = lambda self, fstring='', *args: None # Dictionary to hold registered pagelet paths and their handler functions -_PAGELET_PATHS: Dict[str, Callable[[str, str, Dict[str, str], Dict[str, Any], Dict[str, str], bytes], - Tuple[int, Union[bytes, str]]]] = dict() +_PAGELET_PATHS: Dict[str, Callable[[str, # HTTP method (GET, POST, ...) 
+ str, # Request path + RequestArguments, # HTTP Request arguments + Dict[str, Any], # Session vars + Dict[str, str]], # Response headers + Tuple[int, Union[bytes, str]]]] = dict() # Returns: (status code, response body) # Inactivity timeout for client sessions _SESSION_TIMEOUT: int = 3600 +_MAX_POST: int = 1_000_000 def pagelet(path: str): @@ -43,8 +49,12 @@ def pagelet(path: str): The function must have the following signature: - (method: str, path: str, args: Dict[str, Union[str, List[str]], session_vars: Dict[str, Any], - headers: Dict[str, str]) -> (int, Optional[Union[str, bytes]]) + (method: str, + path: str, + args: RequestArguments, + session_vars: Dict[str, Any], + headers: Dict[str, str]) + -> (int, Optional[Union[str, bytes]]) method: The HTTP method (GET, POST) that was used. path: The path that was requested. @@ -56,7 +66,12 @@ def pagelet(path: str): :param path: The path to register the function for. """ - def http_handler(fun: Callable[[str, str, Dict[str, str], Dict[str, Any], Dict[str, str], bytes], + + def http_handler(fun: Callable[[str, + str, + RequestArguments, + Dict[str, Any], + Dict[str, str]], Tuple[int, Union[bytes, str]]]): # Add the function to the dict of pagelets _PAGELET_PATHS[path] = fun @@ -166,7 +181,7 @@ class HttpHandler(BaseHTTPRequestHandler): if session_id in self.server.session_vars: del self.server.session_vars[session_id] - def _handle(self, method: str, path: str, args: Dict[str, Union[str, List[str]]]) -> None: + def _handle(self, method: str, path: str, args: RequestArguments) -> None: """ Handle a HTTP request by either dispatching it to the appropriate pagelet or by serving a static resource. 
@@ -238,7 +253,7 @@ class HttpHandler(BaseHTTPRequestHandler): mimetype = 'application/octet-stream' # Send content type and length header self.send_header('Content-Type', mimetype) - self.send_header('Content-Length', len(data)) + self.send_header('Content-Length', str(len(data))) self.end_headers() # Send the requested resource as response body self.wfile.write(data) @@ -247,36 +262,6 @@ class HttpHandler(BaseHTTPRequestHandler): self.send_response(404) self.end_headers() - @staticmethod - def _parse_args(request: str, postbody: Optional[str] = None) -> Tuple[str, Dict[str, Union[str, List[str]]]]: - """ - Given a HTTP request path, and optionally a HTTP POST body in application/x-www-form-urlencoded form, parse the - arguments and return them as a dictionary. - - If a key is used both in GET and in POST, the POST value takes precedence, and the GET value is discarded. - - :param request: The request string to parse. - :param postbody: The POST body to parse, defaults to None. - :return: A tuple consisting of the base path and a dictionary with the parsed key/value pairs. - """ - # Parse the request "URL" (i.e. only the path) - tokens = urllib.parse.urlparse(request) - # Parse the GET arguments - args = urllib.parse.parse_qs(tokens.query) - - if postbody is not None: - # Parse the POST body - postargs = urllib.parse.parse_qs(postbody) - # Write all POST values into the dict, overriding potential duplicates from GET - for k, v in postargs.items(): - args[k] = v - # urllib.parse.parse_qs turns ALL arguments into arrays. 
This turns arrays of length 1 into scalar values - for k, v in args.items(): - if len(v) == 1: - args[k] = v[0] - # Return the path and the parsed arguments - return tokens.path, args - # noinspection PyPep8Naming def do_GET(self) -> None: """ @@ -284,7 +269,7 @@ class HttpHandler(BaseHTTPRequestHandler): """ try: # Parse the request and hand it to the handle function - path, args = self._parse_args(self.path) + path, args = parse_args(self.path) self._handle('GET', path, args) # Special handling for some errors except PermissionError: @@ -305,25 +290,24 @@ class HttpHandler(BaseHTTPRequestHandler): """ try: # Read the POST body, if it exists, and its MIME type is application/x-www-form-urlencoded - clen: str = self.headers.get('Content-Length', failobj='0') + clen: int = int(str(self.headers.get('Content-Length', failobj='0'))) + if clen > _MAX_POST: + raise ValueError('Request too big') ctype: str = self.headers.get('Content-Type', failobj='application/octet-stream') - post: str = '' - if ctype == 'application/x-www-form-urlencoded': - post = self.rfile.read(int(clen)).decode('utf-8') + post: bytes = self.rfile.read(clen) + path, args = parse_args(self.path, postbody=post, enctype=ctype) # Parse the request and hand it to the handle function - path, args = self._parse_args(self.path, postbody=post) self._handle('POST', path, args) - # Special handling for some errors - except PermissionError as e: + # Special handling for some errors + except PermissionError: self.send_response(403, 'Forbidden') self.end_headers() - print(e) - traceback.print_tb(e.__traceback__) - except ValueError as e: + except ValueError: + self.send_response(400, 'Bad Request') + self.end_headers() + except TypeError: self.send_response(400, 'Bad Request') self.end_headers() - print(e) - traceback.print_tb(e.__traceback__) except BaseException as e: # Generic error handling self.send_response(500, 'Internal Server Error') diff --git a/matemat/webserver/pagelets/login.py 
b/matemat/webserver/pagelets/login.py index 876fd71..7d0cc2d 100644 --- a/matemat/webserver/pagelets/login.py +++ b/matemat/webserver/pagelets/login.py @@ -1,14 +1,19 @@ -from typing import Any, Dict +from typing import Any, Dict, Optional, Tuple, Union from matemat.exceptions import AuthenticationError -from matemat.webserver import pagelet +from matemat.webserver import pagelet, RequestArguments from matemat.primitives import User from matemat.db import MatematDatabase @pagelet('/login') -def login_page(method: str, path: str, args: Dict[str, str], session_vars: Dict[str, Any], headers: Dict[str, str]): +def login_page(method: str, + path: str, + args: RequestArguments, + session_vars: Dict[str, Any], + headers: Dict[str, str])\ + -> Tuple[int, Optional[Union[str, bytes]]]: if 'user' in session_vars: headers['Location'] = '/' return 301, None @@ -36,15 +41,15 @@ def login_page(method: str, path: str, args: Dict[str, str], session_vars: Dict[ ''' - return 200, data.format(msg=args['msg'] if 'msg' in args else '') + return 200, data.format(msg=str(args.msg) if 'msg' in args else '') elif method == 'POST': - print(args) with MatematDatabase('test.db') as db: try: - user: User = db.login(args['username'], args['password']) + user: User = db.login(str(args.username), str(args.password)) except AuthenticationError: headers['Location'] = '/login?msg=Username%20or%20password%20wrong.%20Please%20try%20again.' 
return 301, bytes() session_vars['user'] = user headers['Location'] = '/' return 301, bytes() + return 405, None diff --git a/matemat/webserver/pagelets/logout.py b/matemat/webserver/pagelets/logout.py index 86095b0..beb86a3 100644 --- a/matemat/webserver/pagelets/logout.py +++ b/matemat/webserver/pagelets/logout.py @@ -1,11 +1,16 @@ -from typing import Any, Dict +from typing import Any, Dict, List, Optional, Tuple, Union -from matemat.webserver import pagelet +from matemat.webserver import pagelet, RequestArguments @pagelet('/logout') -def logout(method: str, path: str, args: Dict[str, str], session_vars: Dict[str, Any], headers: Dict[str, str]): +def logout(method: str, + path: str, + args: RequestArguments, + session_vars: Dict[str, Any], + headers: Dict[str, str])\ + -> Tuple[int, Optional[Union[str, bytes]]]: if 'user' in session_vars: del session_vars['user'] headers['Location'] = '/' diff --git a/matemat/webserver/pagelets/main.py b/matemat/webserver/pagelets/main.py index 2ead15d..e22c872 100644 --- a/matemat/webserver/pagelets/main.py +++ b/matemat/webserver/pagelets/main.py @@ -1,13 +1,17 @@ from typing import Any, Dict, Optional, Tuple, Union -from matemat.webserver import MatematWebserver, pagelet +from matemat.webserver import pagelet, RequestArguments from matemat.primitives import User from matemat.db import MatematDatabase @pagelet('/') -def main_page(method: str, path: str, args: Dict[str, str], session_vars: Dict[str, Any], headers: Dict[str, str])\ +def main_page(method: str, + path: str, + args: RequestArguments, + session_vars: Dict[str, Any], + headers: Dict[str, str])\ -> Tuple[int, Optional[Union[str, bytes]]]: data = ''' diff --git a/matemat/webserver/pagelets/touchkey.py b/matemat/webserver/pagelets/touchkey.py index fd99fea..4de8009 100644 --- a/matemat/webserver/pagelets/touchkey.py +++ b/matemat/webserver/pagelets/touchkey.py @@ -1,14 +1,19 @@ -from typing import Any, Dict +from typing import Any, Dict, Optional, Tuple, Union from 
from typing import Dict, Iterator, List, Optional, Tuple, Union


class _MissingArgumentError(AttributeError, KeyError):
    """
    Raised when a request argument is accessed as an attribute, but no argument with that name exists.

    Derives from AttributeError, as the __getattr__ protocol requires (so that hasattr() and getattr() with a default
    behave correctly), and from KeyError, so that callers catching the KeyError raised previously keep working.
    """


class RequestArguments(object):
    """
    Container for HTTP Request arguments.

    Usage:

        # Create empty instance
        ra = RequestArguments()
        # Add an entry for the key 'foo' with the value 'bar' and Content-Type 'text/plain'
        ra['foo'].append('text/plain', 'bar')
        # Retrieve the value for the key 'foo', as a string...
        foo = str(ra.foo)
        # ... or as raw bytes
        foo = bytes(ra.foo)
    """

    def __init__(self) -> None:
        """
        Create an empty container instance.
        """
        self.__container: Dict[str, 'RequestArgument'] = dict()

    def __getitem__(self, key: str) -> 'RequestArgument':
        """
        Retrieve the argument for the given name, creating it on the fly, if it doesn't exist.

        Note that, unlike attribute access, this returns the mutable argument itself, not a view, and that merely
        reading a missing key inserts an empty argument for it.

        :param key: Name of the argument to retrieve.
        :return: A RequestArgument instance.
        :raises TypeError: If key is not a string.
        """
        if not isinstance(key, str):
            raise TypeError('key must be a str')
        # Create empty argument, if it doesn't exist
        if key not in self.__container:
            self.__container[key] = RequestArgument(key)
        # Return the argument for the name
        return self.__container[key]

    def __getattr__(self, key: str) -> 'RequestArgument':
        """
        Syntactic sugar for accessing values with a name that can be used in Python attributes. The value will be
        returned as an immutable view.

        :param key: Name of the argument to retrieve.
        :return: An immutable view of the RequestArgument instance.
        :raises AttributeError: If no argument with this name exists. The raised exception is a KeyError as well.
        """
        # __getattr__ is only called after regular attribute lookup failed.  Fetch the (name-mangled) container
        # straight from the instance dict: going through self.__container would re-enter __getattr__ and recurse
        # endlessly on instances whose __init__ has not run (e.g. while being reconstructed by copy or pickle).
        container = vars(self).get('_RequestArguments__container')
        if container is None or key not in container:
            raise _MissingArgumentError(key)
        return _View.of(container[key])

    def __iter__(self) -> Iterator['RequestArgument']:
        """
        Returns an iterator over the values in this instance. Values are represented as immutable views.

        :return: An iterator that yields immutable views of the values.
        """
        for ra in self.__container.values():
            # Yield an immutable view for each argument
            yield _View.of(ra)

    def __contains__(self, key: str) -> bool:
        """
        Checks whether an argument with a given name exists in the RequestArguments instance.

        :param key: The name to check whether it exists.
        :return: True, if present, False otherwise.
        """
        return key in self.__container

    def __len__(self) -> int:
        """
        :return: The number of arguments in this instance.
        """
        return len(self.__container)


class RequestArgument(object):
    """
    Container class for HTTP request arguments that simplifies dealing with
    - scalar and array arguments:
      Automatically converts between single values and arrays where necessary: Arrays with one element can be
      accessed as scalars, and scalars can be iterated, yielding themselves as a single item.
    - UTF-8 strings and binary data (e.g. file uploads):
      All data can be retrieved both as a str (if utf-8 decoding is possible) and a bytes object.

    The objects returned from iteration or indexing are immutable views of (parts of) this object.

    Usage example:

        qsargs = urllib.parse.parse_qs(qs, strict_parsing=True, keep_blank_values=True, errors='strict')
        args = RequestArguments()
        for k, vs in qsargs.items():
            args[k].clear()
            for v in vs:
                # text/plain usually is a sensible choice for values decoded from urlencoded strings
                # IF ALREADY IN STRING FORM (which parse_qs does)!
                args[k].append('text/plain', v)

        if 'username' in args and args.username.is_scalar:
            username = str(args.username)

    """

    def __init__(self,
                 name: str,
                 value: Optional[Union[Tuple[str, Union[bytes, str]], List[Tuple[str, Union[bytes, str]]]]] = None) \
            -> None:
        """
        Create a new RequestArgument with a name and optionally an initial value.

        :param name: The name for this argument, as provided via GET or POST.
        :param value: The initial value, if any: either a single (Content-Type, value) tuple or a list of such
                      tuples. Optional, initializes with empty array if omitted.
        """
        # Assign name
        self.__name: str = name
        # Initialize value; internally, values are always kept as a list of (Content-Type, value) tuples
        self.__value: List[Tuple[str, Union[bytes, str]]]
        if value is None:
            # Default to empty array
            self.__value = []
        elif isinstance(value, list):
            # Store a copy, so that later changes to the caller's list cannot alter this argument (or a view)
            self.__value = list(value)
        else:
            # Turn scalar into an array before storing
            self.__value = [value]

    @property
    def is_array(self) -> bool:
        """
        :return: True, if the value is a (possibly empty) array, False otherwise.
        """
        return len(self.__value) != 1

    @property
    def is_scalar(self) -> bool:
        """
        :return: True, if the value is a single scalar value, False otherwise.
        """
        return len(self.__value) == 1

    @property
    def is_view(self) -> bool:
        """
        :return: True, if this instance is an immutable view, False otherwise.
        """
        return False

    @property
    def name(self) -> str:
        """
        :return: The name of this argument.
        """
        return self.__name

    def _entry(self, index: int) -> Tuple[str, Union[bytes, str]]:
        """
        Validate an index and fetch the (Content-Type, value) tuple stored at it.

        :param index: The index of the entry to retrieve.
        :return: The (Content-Type, value) tuple at the given index.
        :raises IndexError: If the index is out of bounds.
        :raises TypeError: If the index is not an int.
        """
        if not isinstance(index, int):
            # Index must be an int
            raise TypeError('index must be an int')
        return self.__value[index]

    def get_str(self, index: int = 0) -> str:
        """
        Attempts to return a value as a string. The index defaults to 0.

        :param index: The index of the value to retrieve. Default: 0.
        :return: A UTF-8 string representation of the requested value.
        :raises UnicodeDecodeError: If the value cannot be decoded into a UTF-8 string.
        :raises IndexError: If the index is out of bounds.
        :raises TypeError: If the index is not an int.
        :raises TypeError: If the requested value is neither a str nor a bytes object.
        """
        value = self._entry(index)[1]
        if isinstance(value, str):
            # The value already is a string, return
            return value
        if isinstance(value, bytes):
            # The value is a bytes object, attempt to decode
            return value.decode('utf-8')
        raise TypeError('Value is neither a str nor bytes')

    def __str__(self) -> str:
        """
        Attempts to return the first value as a string.

        :return: A UTF-8 string representation of the first value.
        :raises UnicodeDecodeError: If the value cannot be decoded into a UTF-8 string.
        :raises IndexError: If this argument holds no values.
        """
        return self.get_str()

    def get_bytes(self, index: int = 0) -> bytes:
        """
        Attempts to return a value as a bytes object. The index defaults to 0.

        :param index: The index of the value to retrieve. Default: 0.
        :return: A bytes object representation of the requested value. Strings will be encoded as UTF-8.
        :raises IndexError: If the index is out of bounds.
        :raises TypeError: If the index is not an int.
        :raises TypeError: If the requested value is neither a str nor a bytes object.
        """
        value = self._entry(index)[1]
        if isinstance(value, bytes):
            # The value already is a bytes object, return
            return value
        if isinstance(value, str):
            # The value is a string, encode first
            return value.encode('utf-8')
        raise TypeError('Value is neither a str nor bytes')

    def __bytes__(self) -> bytes:
        """
        Attempts to return the first value as a bytes object.

        :return: A bytes string representation of the first value.
        :raises IndexError: If this argument holds no values.
        """
        return self.get_bytes()

    def get_content_type(self, index: int = 0) -> str:
        """
        Attempts to retrieve a value's Content-Type. The index defaults to 0.

        :param index: The index of the value to retrieve. Default: 0.
        :return: The Content-Type of the requested value, as sent by the client. Not necessarily trustworthy.
        :raises IndexError: If the index is out of bounds.
        :raises TypeError: If the index is not an int.
        :raises TypeError: If the stored Content-Type is not a str.
        """
        ctype = self._entry(index)[0]
        if not isinstance(ctype, str):
            raise TypeError('Content-Type is not a str')
        return ctype

    def append(self, ctype: str, value: Union[str, bytes]) -> None:
        """
        Append a value to this instance. Turns an empty argument into a scalar and a scalar into an array.

        :param ctype: The Content-Type, as provided in the request.
        :param value: The scalar value to append, either a string or bytes object.
        :raises TypeError: If called on an immutable view.
        """
        if self.is_view:
            # This is an immutable view, raise exception
            raise TypeError('A RequestArgument view is immutable!')
        self.__value.append((ctype, value))

    def clear(self) -> None:
        """
        Remove all values from this instance.

        :raises TypeError: If called on an immutable view.
        """
        if self.is_view:
            # This is an immutable view, raise exception
            raise TypeError('A RequestArgument view is immutable!')
        self.__value.clear()

    def __len__(self) -> int:
        """
        :return: Number of values for this argument.
        """
        return len(self.__value)

    def __iter__(self) -> Iterator['RequestArgument']:
        """
        Iterate the values of this argument. Each value is accessible as if it were a scalar RequestArgument in turn,
        although they are immutable.

        :return: An iterator that yields immutable views of the values.
        """
        for v in self.__value:
            # Yield an immutable scalar view for each (ctype, value) element in the array
            yield _View(self.__name, v)

    def __getitem__(self, index: Union[int, slice]) -> 'RequestArgument':
        """
        Index the argument with either an int or a slice. The returned values are represented as immutable
        RequestArgument views.

        :param index: The index or slice.
        :return: An immutable view of the indexed elements of this argument.
        :raises IndexError: If an int index is out of bounds.
        """
        # Pass the index or slice through to the array, packing the result in an immutable view
        return _View(self.__name, self.__value[index])


class _View(RequestArgument):
    """
    This class represents an immutable view of a (subset of a) RequestArgument object. Should not be instantiated
    directly.
    """

    def __init__(self, name: str, value: Union[Tuple[str, Union[bytes, str]], List[Tuple[str, Union[bytes, str]]]])\
            -> None:
        """
        Create a new immutable view of a (subset of a) RequestArgument.

        :param name: The name for this argument, same as in the original RequestArgument.
        :param value: The values to represent in this view, obtained by e.g. indexing or slicing.
        """
        super().__init__(name, value)

    @staticmethod
    def of(argument: 'RequestArgument') -> 'RequestArgument':
        """
        Create an immutable, unsliced view of a RequestArgument instance.

        :param argument: The RequestArgument instance to create a view of.
        :return: An immutable view of the provided RequestArgument instance.
        """
        # A full slice goes through RequestArgument.__getitem__, which wraps a copy of all values in a _View
        return argument[:]

    @property
    def is_view(self) -> bool:
        """
        :return: True, if this instance is an immutable view, False otherwise.
        """
        return True
Only UTF-8 strings are supported - self.body: str = '' + # The response body + self.body: bytes = bytes() # Parsing phase, one of 'begin', 'hdr', 'body' or 'done' self.parse_phase = 'begin' # Buffer for uncompleted lines @@ -55,7 +55,7 @@ class HttpResponse: return # If in the body phase, simply decode and append to the body, while the body is not complete yet elif self.parse_phase == 'body': - self.body += fragment.decode('utf-8') + self.body += fragment if len(self.body) >= int(self.headers['Content-Length']): self.__finalize() return @@ -66,24 +66,24 @@ class HttpResponse: if not fragment.endswith(b'\r\n'): # Special treatment for no trailing CR-LF: Add remainder to buffer head, tail = fragment.rsplit(b'\r\n', 1) - data: str = (self.buffer + head).decode('utf-8') + data: bytes = (self.buffer + head) self.buffer = tail else: - data: str = (self.buffer + fragment).decode('utf-8') + data: bytes = (self.buffer + fragment) self.buffer = bytes() # Iterate the lines that are ready to be parsed - for line in data.split('\r\n'): + for line in data.split(b'\r\n'): # The 'begin' phase indicates that the parser is waiting for the HTTP status line if self.parse_phase == 'begin': - if line.startswith('HTTP/'): + if line.startswith(b'HTTP/'): # Parse the statuscode and advance to header parsing - _, statuscode, _ = line.split(' ', 2) + _, statuscode, _ = line.decode('utf-8').split(' ', 2) self.statuscode = int(statuscode) self.parse_phase = 'hdr' elif self.parse_phase == 'hdr': # Parse a header line and add it to the header dict if len(line) > 0: - k, v = line.split(':', 1) + k, v = line.decode('utf-8').split(':', 1) self.headers[k.strip()] = v.strip() else: # Empty line separates header from body @@ -156,12 +156,16 @@ class MockSocket(bytes): def test_pagelet(path: str): - def with_testing_headers(fun: Callable[[str, str, Dict[str, str], Dict[str, Any], Dict[str, str]], + def with_testing_headers(fun: Callable[[str, + str, + RequestArguments, + Dict[str, Any], + Dict[str, 
str]], Tuple[int, Union[bytes, str]]]): @pagelet(path) def testing_wrapper(method: str, path: str, - args: Dict[str, str], + args: RequestArguments, session_vars: Dict[str, Any], headers: Dict[str, str]): status, body = fun(method, path, args, session_vars, headers) diff --git a/matemat/webserver/test/test_parse_request.py b/matemat/webserver/test/test_parse_request.py new file mode 100644 index 0000000..0a94065 --- /dev/null +++ b/matemat/webserver/test/test_parse_request.py @@ -0,0 +1,347 @@ + +import unittest + +from matemat.webserver.util import parse_args + + +class TestParseRequest(unittest.TestCase): + + def test_parse_get_root(self): + path, args = parse_args('/') + self.assertEqual('/', path) + self.assertEqual(0, len(args)) + + def test_parse_get_no_args(self): + path, args = parse_args('/index.html') + self.assertEqual('/index.html', path) + self.assertEqual(0, len(args)) + + def test_parse_get_root_getargs(self): + path, args = parse_args('/?foo=42&bar=1337&baz=Hello,%20World!') + self.assertEqual('/', path) + self.assertEqual(3, len(args)) + self.assertIn('foo', args) + self.assertIn('bar', args) + self.assertIn('baz', args) + self.assertTrue(args['foo'].is_scalar) + self.assertTrue(args['bar'].is_scalar) + self.assertTrue(args['baz'].is_scalar) + self.assertEqual('text/plain', args['foo'].get_content_type()) + self.assertEqual('text/plain', args['bar'].get_content_type()) + self.assertEqual('text/plain', args['baz'].get_content_type()) + self.assertEqual('42', args['foo'].get_str()) + self.assertEqual('1337', args['bar'].get_str()) + self.assertEqual('Hello, World!', args['baz'].get_str()) + + def test_parse_get_getargs(self): + path, args = parse_args('/abc/def?foo=42&bar=1337&baz=Hello,%20World!') + self.assertEqual('/abc/def', path) + self.assertEqual(3, len(args)) + self.assertIn('foo', args) + self.assertIn('bar', args) + self.assertIn('baz', args) + self.assertTrue(args['foo'].is_scalar) + self.assertTrue(args['bar'].is_scalar) + 
self.assertTrue(args['baz'].is_scalar) + self.assertEqual('text/plain', args['foo'].get_content_type()) + self.assertEqual('text/plain', args['bar'].get_content_type()) + self.assertEqual('text/plain', args['baz'].get_content_type()) + self.assertEqual('42', args['foo'].get_str()) + self.assertEqual('1337', args['bar'].get_str()) + self.assertEqual('Hello, World!', args['baz'].get_str()) + + def test_parse_get_getarray(self): + path, args = parse_args('/abc/def?foo=42&foo=1337&baz=Hello,%20World!') + self.assertEqual('/abc/def', path) + self.assertEqual(2, len(args)) + self.assertIn('foo', args) + self.assertIn('baz', args) + self.assertTrue(args['foo'].is_array) + self.assertTrue(args['baz'].is_scalar) + self.assertEqual(2, len(args['foo'])) + self.assertEqual('42', args['foo'].get_str(0)) + self.assertEqual('1337', args['foo'].get_str(1)) + + def test_parse_get_zero_arg(self): + path, args = parse_args('/abc/def?foo=&bar=42') + self.assertEqual(2, len(args)) + self.assertIn('foo', args) + self.assertIn('bar', args) + self.assertTrue(args['foo'].is_scalar) + self.assertTrue(args['bar'].is_scalar) + self.assertEqual(1, len(args['foo'])) + self.assertEqual('', args['foo'].get_str()) + self.assertEqual('42', args['bar'].get_str()) + + def test_parse_get_urlencoded_encoding_fail(self): + with self.assertRaises(ValueError): + parse_args('/?foo=42&bar=%80&baz=Hello,%20World!') + + def test_parse_post_urlencoded(self): + path, args = parse_args('/', + postbody=b'foo=42&bar=1337&baz=Hello,%20World!', + enctype='application/x-www-form-urlencoded') + self.assertEqual('/', path) + self.assertEqual(3, len(args)) + self.assertIn('foo', args) + self.assertIn('bar', args) + self.assertIn('baz', args) + self.assertTrue(args['foo'].is_scalar) + self.assertTrue(args['bar'].is_scalar) + self.assertTrue(args['baz'].is_scalar) + self.assertEqual('text/plain', args['foo'].get_content_type()) + self.assertEqual('text/plain', args['bar'].get_content_type()) + 
self.assertEqual('text/plain', args['baz'].get_content_type()) + self.assertEqual('42', args['foo'].get_str()) + self.assertEqual('1337', args['bar'].get_str()) + self.assertEqual('Hello, World!', args['baz'].get_str()) + + def test_parse_post_urlencoded_array(self): + path, args = parse_args('/', + postbody=b'foo=42&foo=1337&baz=Hello,%20World!', + enctype='application/x-www-form-urlencoded') + self.assertEqual('/', path) + self.assertEqual(2, len(args)) + self.assertIn('foo', args) + self.assertIn('baz', args) + self.assertTrue(args['foo'].is_array) + self.assertTrue(args['baz'].is_scalar) + self.assertEqual(2, len(args['foo'])) + self.assertEqual('42', args['foo'].get_str(0)) + self.assertEqual('1337', args['foo'].get_str(1)) + + def test_parse_post_urlencoded_zero_arg(self): + path, args = parse_args('/abc/def', postbody=b'foo=&bar=42', enctype='application/x-www-form-urlencoded') + self.assertEqual(2, len(args)) + self.assertIn('foo', args) + self.assertIn('bar', args) + self.assertTrue(args['foo'].is_scalar) + self.assertTrue(args['bar'].is_scalar) + self.assertEqual(1, len(args['foo'])) + self.assertEqual('', args['foo'].get_str()) + self.assertEqual('42', args['bar'].get_str()) + + def test_parse_post_urlencoded_encoding_fail(self): + with self.assertRaises(ValueError): + parse_args('/', + postbody=b'foo=42&bar=%80&baz=Hello,%20World!', + enctype='application/x-www-form-urlencoded') + + def test_parse_post_multipart_no_args(self): + path, args = parse_args('/', + postbody=b'--testBoundary1337--\r\n', + enctype='multipart/form-data; boundary=testBoundary1337') + self.assertEqual('/', path) + self.assertEqual(0, len(args)) + + def test_parse_post_multipart(self): + path, args = parse_args('/', + postbody=b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="foo"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'42\r\n' + b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="bar"; filename="bar.bin"\r\n' + b'Content-Type: 
application/octet-stream\r\n\r\n' + b'1337\r\n' + b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="baz"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'Hello, World!\r\n' + b'--testBoundary1337--\r\n', + enctype='multipart/form-data; boundary=testBoundary1337') + self.assertEqual('/', path) + self.assertEqual(3, len(args)) + self.assertIn('foo', args) + self.assertIn('bar', args) + self.assertIn('baz', args) + self.assertTrue(args['foo'].is_scalar) + self.assertTrue(args['bar'].is_scalar) + self.assertTrue(args['baz'].is_scalar) + self.assertEqual('text/plain', args['foo'].get_content_type()) + self.assertEqual('application/octet-stream', args['bar'].get_content_type()) + self.assertEqual('text/plain', args['baz'].get_content_type()) + self.assertEqual('42', args['foo'].get_str()) + self.assertEqual(b'1337', args['bar'].get_bytes()) + self.assertEqual('Hello, World!', args['baz'].get_str()) + + def test_parse_post_multipart_zero_arg(self): + path, args = parse_args('/abc/def', + postbody=b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="foo"\r\n' + b'Content-Type: text/plain\r\n\r\n\r\n' + b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="bar"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'42\r\n' + b'--testBoundary1337--\r\n', + enctype='multipart/form-data; boundary=testBoundary1337') + self.assertEqual(2, len(args)) + self.assertIn('foo', args) + self.assertIn('bar', args) + self.assertTrue(args['foo'].is_scalar) + self.assertTrue(args['bar'].is_scalar) + self.assertEqual(1, len(args['foo'])) + self.assertEqual('', args['foo'].get_str()) + self.assertEqual('42', args['bar'].get_str()) + + def test_parse_post_multipart_broken_boundaries(self): + with self.assertRaises(ValueError): + # Boundary not defined in Content-Type + parse_args('/', + postbody=b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="foo"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'42\r\n' + b'--testBoundary1337\r\n' + 
b'Content-Disposition: form-data; name="bar"; filename="bar.bin"\r\n' + b'Content-Type: application/octet-stream\r\n\r\n' + b'1337\r\n' + b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="baz"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'Hello, World!\r\n' + b'--testBoundary1337--\r\n', + enctype='multipart/form-data') + with self.assertRaises(ValueError): + # Corrupted "--" head at first boundary + parse_args('/', + postbody=b'-+testBoundary1337\r\n' + b'Content-Disposition: form-data; name="foo"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'42\r\n' + b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="bar"; filename="bar.bin"\r\n' + b'Content-Type: application/octet-stream\r\n\r\n' + b'1337\r\n' + b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="baz"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'Hello, World!\r\n' + b'--testBoundary1337--\r\n', + enctype='multipart/form-data; boundary=testBoundary1337') + with self.assertRaises(ValueError): + # Missing "--" tail at end boundary + parse_args('/', + postbody=b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="foo"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'42\r\n' + b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="bar"; filename="bar.bin"\r\n' + b'Content-Type: application/octet-stream\r\n\r\n' + b'1337\r\n' + b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="baz"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'Hello, World!\r\n' + b'--testBoundary1337\r\n', + enctype='multipart/form-data; boundary=testBoundary1337') + with self.assertRaises(ValueError): + # Missing Content-Type header in one part + parse_args('/', + postbody=b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="foo"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'42\r\n' + b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="bar"; filename="bar.bin"\r\n' + b'Content-Type: application/octet-stream\r\n\r\n' + 
b'1337\r\n' + b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="baz"\r\n\r\n' + b'Hello, World!\r\n' + b'--testBoundary1337--\r\n', + enctype='multipart/form-data; boundary=testBoundary1337') + with self.assertRaises(ValueError): + # Missing Content-Disposition header in one part + parse_args('/', + postbody=b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="foo"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'42\r\n' + b'--testBoundary1337\r\n' + b'Content-Type: application/octet-stream\r\n\r\n' + b'1337\r\n' + b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="baz"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'Hello, World!\r\n' + b'--testBoundary1337--\r\n', + enctype='multipart/form-data; boundary=testBoundary1337') + with self.assertRaises(ValueError): + # Missing form-data name argument + parse_args('/', + postbody=b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="foo"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'42\r\n' + b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; filename="bar.bin"\r\n' + b'Content-Type: application/octet-stream\r\n\r\n' + b'1337\r\n' + b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="baz"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'Hello, World!\r\n' + b'--testBoundary1337--\r\n', + enctype='multipart/form-data; boundary=testBoundary1337') + with self.assertRaises(ValueError): + # Unknown Content-Disposition + parse_args('/', + postbody=b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="foo"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'42\r\n' + b'--testBoundary1337\r\n' + b'Content-Disposition: attachment; name="bar"; filename="bar.bin"\r\n' + b'Content-Type: application/octet-stream\r\n\r\n' + b'1337\r\n' + b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="baz"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'Hello, World!\r\n' + b'--testBoundary1337--\r\n', + enctype='multipart/form-data; 
boundary=testBoundary1337') + + def test_get_post_precedence_urlencoded(self): + path, args = parse_args('/foo?foo=thisshouldnotbethere&bar=isurvived', + postbody=b'foo=42&foo=1337&baz=Hello,%20World!', + enctype='application/x-www-form-urlencoded') + self.assertIn('foo', args) + self.assertIn('bar', args) + self.assertIn('baz', args) + self.assertEqual(2, len(args['foo'])) + self.assertEqual(1, len(args['bar'])) + self.assertEqual(1, len(args['baz'])) + self.assertEqual('42', args['foo'].get_str(0)) + self.assertEqual('1337', args['foo'].get_str(1)) + self.assertEqual('isurvived', args['bar'].get_str()) + self.assertEqual('Hello, World!', args['baz'].get_str()) + + def test_get_post_precedence_multipart(self): + path, args = parse_args('/foo?foo=thisshouldnotbethere&bar=isurvived', + postbody=b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="foo"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'42\r\n' + b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="foo"; filename="bar.bin"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'1337\r\n' + b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="baz"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'Hello, World!\r\n' + b'--testBoundary1337--\r\n', + enctype='multipart/form-data; boundary=testBoundary1337') + self.assertIn('foo', args) + self.assertIn('bar', args) + self.assertIn('baz', args) + self.assertEqual(2, len(args['foo'])) + self.assertEqual(1, len(args['bar'])) + self.assertEqual(1, len(args['baz'])) + self.assertEqual('42', args['foo'].get_str(0)) + self.assertEqual('1337', args['foo'].get_str(1)) + self.assertEqual('isurvived', args['bar'].get_str()) + self.assertEqual('Hello, World!', args['baz'].get_str()) diff --git a/matemat/webserver/test/test_post.py b/matemat/webserver/test/test_post.py index ad99247..0bc5d16 100644 --- a/matemat/webserver/test/test_post.py +++ b/matemat/webserver/test/test_post.py @@ -1,14 +1,16 @@ from typing import Any, Dict, List -from 
matemat.webserver.httpd import HttpHandler +from matemat.webserver import HttpHandler, RequestArguments from matemat.webserver.test.abstract_httpd_test import AbstractHttpdTest, test_pagelet +import codecs + @test_pagelet('/just/testing/post') def post_test_pagelet(method: str, path: str, - args: Dict[str, str], + args: RequestArguments, session_vars: Dict[str, Any], headers: Dict[str, str]): """ @@ -16,8 +18,12 @@ def post_test_pagelet(method: str, """ headers['Content-Type'] = 'text/plain' dump: str = '' - for k, v in args.items(): - dump += f'{k}: {v if isinstance(v, str) else ",".join(v)}\n' + for ra in args: + for a in ra: + if a.get_content_type().startswith('text/'): + dump += f'{a.name}: {a.get_str()}\n' + else: + dump += f'{a.name}: {codecs.encode(a.get_bytes(), "hex").decode("utf-8")}\n' return 200, dump @@ -26,7 +32,7 @@ class TestPost(AbstractHttpdTest): Test cases for the content serving of the web server. """ - def test_post_get_only_args(self): + def test_post_urlenc_get_only_args(self): """ Test a POST request that only contains GET arguments. """ @@ -38,17 +44,17 @@ class TestPost(AbstractHttpdTest): packet = self.client_sock.get_response() # Parse response body - lines: List[str] = packet.body.split('\n')[:-1] + lines: List[bytes] = packet.body.split(b'\n')[:-1] kv: Dict[str, str] = dict() for l in lines: - k, v = l.split(':', 1) + k, v = l.decode('utf-8').split(':', 1) kv[k.strip()] = v.strip() if ',' not in v else v.strip().split(',') # Make sure the arguments were properly parsed self.assertEqual('bar', kv['foo']) self.assertEqual('1', kv['test']) - def test_post_post_only_args(self): + def test_post_urlenc_post_only_args(self): """ Test a POST request that only contains POST arguments (urlencoded). 
""" @@ -61,17 +67,17 @@ class TestPost(AbstractHttpdTest): packet = self.client_sock.get_response() # Parse response body - lines: List[str] = packet.body.split('\n')[:-1] + lines: List[bytes] = packet.body.split(b'\n')[:-1] kv: Dict[str, str] = dict() for l in lines: - k, v = l.split(':', 1) + k, v = l.decode('utf-8').split(':', 1) kv[k.strip()] = v.strip() if ',' not in v else v.strip().split(',') # Make sure the arguments were properly parsed self.assertEqual('bar', kv['foo']) self.assertEqual('1', kv['test']) - def test_post_mixed_args(self): + def test_post_urlenc_mixed_args(self): """ Test that mixed POST and GET args are properly parsed, and that POST takes precedence over GET. """ @@ -84,10 +90,10 @@ class TestPost(AbstractHttpdTest): packet = self.client_sock.get_response() # Parse response body - lines: List[str] = packet.body.split('\n')[:-1] + lines: List[bytes] = packet.body.split(b'\n')[:-1] kv: Dict[str, str] = dict() for l in lines: - k, v = l.split(':', 1) + k, v = l.decode('utf-8').split(':', 1) kv[k.strip()] = v.strip() if ',' not in v else v.strip().split(',') # Make sure the arguments were properly parsed @@ -95,7 +101,7 @@ class TestPost(AbstractHttpdTest): self.assertEqual('1', kv['gettest']) self.assertEqual('2', kv['posttest']) - def test_post_get_array(self): + def test_post_urlenc_get_array(self): """ Test a POST request that contains GET array arguments. 
""" @@ -107,17 +113,21 @@ class TestPost(AbstractHttpdTest): packet = self.client_sock.get_response() # Parse response body - lines: List[str] = packet.body.split('\n')[:-1] + lines: List[bytes] = packet.body.split(b'\n')[:-1] kv: Dict[str, str] = dict() for l in lines: - k, v = l.split(':', 1) - kv[k.strip()] = v.strip() if ',' not in v else v.strip().split(',') - + k, v = l.decode('utf-8').split(':', 1) + k = k.strip() + v = v.strip() + if k in kv: + kv[k] += f',{v}' + else: + kv[k] = v # Make sure the arguments were properly parsed - self.assertListEqual(['bar', 'baz'], kv['foo']) + self.assertEqual('bar,baz', kv['foo']) self.assertEqual('1', kv['test']) - def test_post_post_array(self): + def test_post_urlenc_post_array(self): """ Test a POST request that contains POST array arguments. """ @@ -130,17 +140,21 @@ class TestPost(AbstractHttpdTest): packet = self.client_sock.get_response() # Parse response body - lines: List[str] = packet.body.split('\n')[:-1] + lines: List[bytes] = packet.body.split(b'\n')[:-1] kv: Dict[str, str] = dict() for l in lines: - k, v = l.split(':', 1) - kv[k.strip()] = v.strip() if ',' not in v else v.strip().split(',') - + k, v = l.decode('utf-8').split(':', 1) + k = k.strip() + v = v.strip() + if k in kv: + kv[k] += f',{v}' + else: + kv[k] = v # Make sure the arguments were properly parsed - self.assertListEqual(['bar', 'baz'], kv['foo']) + self.assertEqual('bar,baz', kv['foo']) self.assertEqual('1', kv['test']) - def test_post_mixed_array(self): + def test_post_urlenc_mixed_array(self): """ Test a POST request that contains both GET and POST array arguments. 
""" @@ -153,13 +167,58 @@ class TestPost(AbstractHttpdTest): packet = self.client_sock.get_response() # Parse response body - lines: List[str] = packet.body.split('\n')[:-1] + lines: List[bytes] = packet.body.split(b'\n')[:-1] kv: Dict[str, str] = dict() for l in lines: - k, v = l.split(':', 1) - kv[k.strip()] = v.strip() if ',' not in v else v.strip().split(',') - + k, v = l.decode('utf-8').split(':', 1) + k = k.strip() + v = v.strip() + if k in kv: + kv[k] += f',{v}' + else: + kv[k] = v # Make sure the arguments were properly parsed - self.assertListEqual(['postbar', 'postbaz'], kv['foo']) - self.assertListEqual(['1', '42'], kv['gettest']) - self.assertListEqual(['1', '2'], kv['posttest']) + self.assertEqual('postbar,postbaz', kv['foo']) + self.assertEqual('1,42', kv['gettest']) + self.assertEqual('1,2', kv['posttest']) + + def test_post_no_body(self): + """ + Test a POST request that contains no headers or body. + """ + # Send POST request + self.client_sock.set_request(b'POST /just/testing/post?foo=bar HTTP/1.1\r\n\r\n') + HttpHandler(self.client_sock, ('::1', 45678), self.server) + packet = self.client_sock.get_response() + # Make sure a 400 Bad Request is returned + self.assertEqual(400, packet.statuscode) + + def test_post_multipart_post_only(self): + """ + Test a POST request with a miutipart/form-data body. 
+ """ + # Send POST request + formdata = (b'------testboundary\r\n' + b'Content-Disposition: form-data; name="foo"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'Hello, World!\r\n' + b'------testboundary\r\n' + b'Content-Disposition: form-data; name="bar"; filename="foo.bar"\r\n' + b'Content-Type: application/octet-stream\r\n\r\n' + b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x80\x0b\x0c\x73\x0e\x0f\r\n' + b'------testboundary--\r\n') + + self.client_sock.set_request(f'POST /just/testing/post HTTP/1.1\r\n' + f'Content-Type: multipart/form-data; boundary=----testboundary\r\n' + f'Content-Length: {len(formdata)}\r\n\r\n'.encode('utf-8') + formdata) + HttpHandler(self.client_sock, ('::1', 45678), self.server) + packet = self.client_sock.get_response() + lines: List[bytes] = packet.body.split(b'\n')[:-1] + kv: Dict[str, Any] = dict() + for l in lines: + k, v = l.split(b':', 1) + kv[k.decode('utf-8').strip()] = v.strip() + self.assertIn('foo', kv) + self.assertIn('bar', kv) + self.assertEqual(kv['foo'], b'Hello, World!') + self.assertEqual(kv['bar'], b'00010203040506070809800b0c730e0f') diff --git a/matemat/webserver/test/test_requestargs.py b/matemat/webserver/test/test_requestargs.py new file mode 100644 index 0000000..3e093a2 --- /dev/null +++ b/matemat/webserver/test/test_requestargs.py @@ -0,0 +1,529 @@ + +from typing import Dict, List, Set, Tuple + +import unittest +import urllib.parse + +from matemat.webserver import RequestArgument, RequestArguments +# noinspection PyProtectedMember +from matemat.webserver.requestargs import _View + + +class TestRequestArguments(unittest.TestCase): + """ + Test cases for the RequestArgument class. 
+ """ + + def test_create_default(self): + """ + Test creation of an empty RequestArgument + """ + ra = RequestArgument('foo') + # Name must be set to 1st argument + self.assertEqual('foo', ra.name) + # Must be a 0-length array + self.assertEqual(0, len(ra)) + self.assertFalse(ra.is_scalar) + self.assertTrue(ra.is_array) + # Must not be a view + self.assertFalse(ra.is_view) + + def test_create_str_scalar(self): + """ + Test creation of a scalar RequestArgument with string value. + """ + ra = RequestArgument('foo', ('text/plain', 'bar')) + # Name must be set to 1st argument + self.assertEqual('foo', ra.name) + # Must be a scalar, length 1 + self.assertEqual(1, len(ra)) + self.assertTrue(ra.is_scalar) + self.assertFalse(ra.is_array) + # Scalar value must be representable both as str and bytes + self.assertEqual('bar', ra.get_str()) + self.assertEqual(b'bar', ra.get_bytes()) + # Content-Type must be set correctly + self.assertEqual('text/plain', ra.get_content_type()) + # Using 0 indices must yield the same results + self.assertEqual('bar', ra.get_str(0)) + self.assertEqual(b'bar', ra.get_bytes(0)) + self.assertEqual('text/plain', ra.get_content_type(0)) + # Using other indices must result in an error + with self.assertRaises(IndexError): + ra.get_str(1) + with self.assertRaises(IndexError): + ra.get_bytes(1) + with self.assertRaises(IndexError): + ra.get_content_type(1) + # Must not be a view + self.assertFalse(ra.is_view) + + def test_create_str_scalar_array(self): + """ + Test creation of a scalar RequestArgument with string value, passing an array instead of a single tuple. 
+ """ + ra = RequestArgument('foo', [('text/plain', 'bar')]) + # Name must be set to 1st argument + self.assertEqual('foo', ra.name) + # Must be a scalar, length 1 + self.assertEqual(1, len(ra)) + self.assertTrue(ra.is_scalar) + self.assertFalse(ra.is_array) + # Scalar value must be representable both as str and bytes + self.assertEqual('bar', ra.get_str()) + self.assertEqual(b'bar', ra.get_bytes()) + # Content-Type must be set correctly + self.assertEqual('text/plain', ra.get_content_type()) + # Using 0 indices must yield the same results + self.assertEqual('bar', ra.get_str(0)) + self.assertEqual(b'bar', ra.get_bytes(0)) + self.assertEqual('text/plain', ra.get_content_type(0)) + # Using other indices must result in an error + with self.assertRaises(IndexError): + ra.get_str(1) + with self.assertRaises(IndexError): + ra.get_bytes(1) + with self.assertRaises(IndexError): + ra.get_content_type(1) + # Must not be a view + self.assertFalse(ra.is_view) + + def test_create_bytes_scalar(self): + """ + Test creation of a scalar RequestArgument with bytes value. 
+ """ + ra = RequestArgument('foo', ('application/octet-stream', b'\x00\x80\xff\xfe')) + # Name must be set to 1st argument + self.assertEqual('foo', ra.name) + # Must be a scalar, length 1 + self.assertEqual(1, len(ra)) + self.assertTrue(ra.is_scalar) + self.assertFalse(ra.is_array) + # Conversion to UTF-8 string must fail; bytes representation must work + with self.assertRaises(UnicodeDecodeError): + ra.get_str() + self.assertEqual(b'\x00\x80\xff\xfe', ra.get_bytes()) + # Content-Type must be set correctly + self.assertEqual('application/octet-stream', ra.get_content_type()) + # Using 0 indices must yield the same results + with self.assertRaises(UnicodeDecodeError): + ra.get_str(0) + self.assertEqual(b'\x00\x80\xff\xfe', ra.get_bytes(0)) + self.assertEqual('application/octet-stream', ra.get_content_type(0)) + # Using other indices must result in an error + with self.assertRaises(IndexError): + ra.get_str(1) + with self.assertRaises(IndexError): + ra.get_bytes(1) + with self.assertRaises(IndexError): + ra.get_content_type(1) + # Must not be a view + self.assertFalse(ra.is_view) + + def test_create_array(self): + """ + Test creation of an array RequestArgument with mixed str and bytes initial value. 
+ """ + ra = RequestArgument('foo', [ + ('text/plain', 'bar'), + ('application/octet-stream', b'\x00\x80\xff\xfe') + ]) + # Name must be set to 1st argument + self.assertEqual('foo', ra.name) + # Must be an array, length 2 + self.assertEqual(2, len(ra)) + self.assertFalse(ra.is_scalar) + self.assertTrue(ra.is_array) + # Retrieving values without an index must yield the first element + self.assertEqual('bar', ra.get_str()) + self.assertEqual(b'bar', ra.get_bytes()) + self.assertEqual('text/plain', ra.get_content_type()) + # The first value must be representable both as str and bytes, and have ctype text/plain + self.assertEqual('bar', ra.get_str(0)) + self.assertEqual(b'bar', ra.get_bytes(0)) + self.assertEqual('text/plain', ra.get_content_type(0)) + # Conversion of the second value to UTF-8 string must fail; bytes representation must work + with self.assertRaises(UnicodeDecodeError): + ra.get_str(1) + self.assertEqual(b'\x00\x80\xff\xfe', ra.get_bytes(1)) + # The second value's ctype must be correct + self.assertEqual('application/octet-stream', ra.get_content_type(1)) + # Must not be a view + self.assertFalse(ra.is_view) + + def test_append_empty_str(self): + """ + Test appending a str value to an empty RequestArgument. + """ + # Initialize the empty RequestArgument + ra = RequestArgument('foo') + self.assertEqual(0, len(ra)) + self.assertFalse(ra.is_scalar) + + # Append a string value + ra.append('text/plain', 'bar') + # New length must be 1, empty array must be converted to scalar + self.assertEqual(1, len(ra)) + self.assertTrue(ra.is_scalar) + # Retrieval of the new value must work both in str and bytes representation + self.assertEqual('bar', ra.get_str()) + self.assertEqual(b'bar', ra.get_bytes()) + # Content type of the new value must be correct + self.assertEqual('text/plain', ra.get_content_type()) + # Must not be a view + self.assertFalse(ra.is_view) + + def test_append_empty_bytes(self): + """ + Test appending a bytes value to an empty RequestArgument. 
+ """ + # Initialize the empty RequestArgument + ra = RequestArgument('foo') + self.assertEqual(0, len(ra)) + self.assertFalse(ra.is_scalar) + + # Append a bytes value + ra.append('application/octet-stream', b'\x00\x80\xff\xfe') + # New length must be 1, empty array must be converted to scalar + self.assertEqual(1, len(ra)) + self.assertTrue(ra.is_scalar) + # Conversion of the new value to UTF-8 string must fail; bytes representation must work + with self.assertRaises(UnicodeDecodeError): + ra.get_str() + self.assertEqual(b'\x00\x80\xff\xfe', ra.get_bytes()) + # Content type of the new value must be correct + self.assertEqual('application/octet-stream', ra.get_content_type()) + # Must not be a view + self.assertFalse(ra.is_view) + + def test_append_multiple(self): + """ + Test appending multiple values to an empty RequestArgument. + """ + # Initialize the empty RequestArgument + ra = RequestArgument('foo') + self.assertEqual(0, len(ra)) + self.assertFalse(ra.is_scalar) + + # Append a first value + ra.append('text/plain', 'bar') + # New length must be 1, empty array must be converted to scalar + self.assertEqual(1, len(ra)) + self.assertTrue(ra.is_scalar) + self.assertEqual(b'bar', ra.get_bytes()) + + # Append a second value + ra.append('application/octet-stream', b'\x00\x80\xff\xfe') + # New length must be 2, scalar must be converted to array + self.assertEqual(2, len(ra)) + self.assertFalse(ra.is_scalar) + self.assertEqual(b'bar', ra.get_bytes(0)) + self.assertEqual(b'\x00\x80\xff\xfe', ra.get_bytes(1)) + + # Append a third value + ra.append('text/plain', 'Hello, World!') + # New length must be 3, array must remain array + self.assertEqual(3, len(ra)) + self.assertFalse(ra.is_scalar) + self.assertEqual(b'bar', ra.get_bytes(0)) + self.assertEqual(b'\x00\x80\xff\xfe', ra.get_bytes(1)) + self.assertEqual(b'Hello, World!', ra.get_bytes(2)) + + def test_clear_empty(self): + """ + Test clearing an empty RequestArgument. 
+ """ + # Initialize the empty RequestArgument + ra = RequestArgument('foo') + self.assertEqual(0, len(ra)) + self.assertFalse(ra.is_scalar) + ra.clear() + # Clearing an empty RequestArgument shouldn't have any effect + self.assertEqual('foo', ra.name) + self.assertEqual(0, len(ra)) + self.assertFalse(ra.is_scalar) + + def test_clear_scalar(self): + """ + Test clearing a scalar RequestArgument. + """ + # Initialize the scalar RequestArgument + ra = RequestArgument('foo', ('text/plain', 'bar')) + self.assertEqual(1, len(ra)) + self.assertTrue(ra.is_scalar) + ra.clear() + # Clearing a scalar RequestArgument should reduce its size to 0 + self.assertEqual('foo', ra.name) + self.assertEqual(0, len(ra)) + self.assertFalse(ra.is_scalar) + with self.assertRaises(IndexError): + ra.get_str() + + def test_clear_array(self): + """ + Test clearing an array RequestArgument. + """ + # Initialize the array RequestArgument + ra = RequestArgument('foo', [ + ('text/plain', 'bar'), + ('application/octet-stream', b'\x00\x80\xff\xfe'), + ('text/plain', 'baz'), + ]) + self.assertEqual(3, len(ra)) + self.assertFalse(ra.is_scalar) + ra.clear() + # Clearing an array RequestArgument should reduce its size to 0 + self.assertEqual('foo', ra.name) + self.assertEqual(0, len(ra)) + self.assertFalse(ra.is_scalar) + with self.assertRaises(IndexError): + ra.get_str() + + def test_iterate_empty(self): + """ + Test iterating an empty RequestArgument. + """ + ra = RequestArgument('foo') + self.assertEqual(0, len(ra)) + # No value must be yielded from iterating an empty instance + for _ in ra: + self.fail() + + def test_iterate_scalar(self): + """ + Test iterating a scalar RequestArgument. 
+ """ + ra = RequestArgument('foo', ('text/plain', 'bar')) + self.assertTrue(ra.is_scalar) + # Counter for the number of iterations + count: int = 0 + for it in ra: + # Make sure the yielded value is a scalar view and has the same name as the original instance + self.assertIsInstance(it, _View) + self.assertTrue(it.is_view) + self.assertEqual('foo', it.name) + self.assertTrue(it.is_scalar) + count += 1 + # Only one value must be yielded from iterating a scalar instance + self.assertEqual(1, count) + + def test_iterate_array(self): + """ + Test iterating an array RequestArgument. + """ + ra = RequestArgument('foo', [('text/plain', 'bar'), ('abc', b'def'), ('xyz', '1337')]) + self.assertFalse(ra.is_scalar) + # Container to put the iterated ctypes into + items: List[str] = list() + for it in ra: + # Make sure the yielded values are scalar views and have the same name as the original instance + self.assertIsInstance(it, _View) + self.assertTrue(it.is_view) + self.assertTrue(it.is_scalar) + # Collect the value's ctype + items.append(it.get_content_type()) + # Compare collected ctypes with expected result + self.assertEqual(['text/plain', 'abc', 'xyz'], items) + + def test_slice(self): + """ + Test slicing an array RequestArgument. + """ + ra = RequestArgument('foo', [('a', 'b'), ('c', 'd'), ('e', 'f'), ('g', 'h'), ('i', 'j'), ('k', 'l')]) + # Create the sliced view + sliced = ra[1:4:2] + # Make sure the sliced value is a view + self.assertIsInstance(sliced, _View) + self.assertTrue(sliced.is_view) + # Make sure the slice has the same name + self.assertEqual('foo', sliced.name) + # Make sure the slice has the expected shape (array of the 2nd and 4th scalar in the original) + self.assertTrue(sliced.is_array) + self.assertEqual(2, len(sliced)) + self.assertEqual('d', sliced.get_str(0)) + self.assertEqual('h', sliced.get_str(1)) + + def test_iterate_sliced(self): + """ + Test iterating a sliced array RequestArgument. 
+ """ + ra = RequestArgument('foo', [('a', 'b'), ('c', 'd'), ('e', 'f'), ('g', 'h'), ('i', 'j'), ('k', 'l')]) + # Container to put the iterated ctypes into + items: List[str] = list() + # Iterate the sliced view + for it in ra[1:4:2]: + # Make sure the yielded values are scalar views and have the same name as the original instance + self.assertIsInstance(it, _View) + self.assertTrue(it.is_view) + self.assertEqual('foo', it.name) + self.assertTrue(it.is_scalar) + items.append(it.get_content_type()) + # Make sure the expected values are collected (array of the 2nd and 4th scalar in the original) + self.assertEqual(['c', 'g'], items) + + def test_index_scalar(self): + """ + Test indexing of a scalar RequestArgument. + """ + ra = RequestArgument('foo', ('bar', 'baz')) + # Index the scalar RequestArgument instance, obtaining an immutable view + it = ra[0] + # Make sure the value is a scalar view with the same properties as the original instance + self.assertIsInstance(it, _View) + self.assertTrue(it.is_scalar) + self.assertEqual('foo', it.name) + self.assertEqual('bar', it.get_content_type()) + self.assertEqual('baz', it.get_str()) + # Make sure other indices don't work + with self.assertRaises(IndexError): + _ = ra[1] + + def test_index_array(self): + """ + Test indexing of an array RequestArgument. + """ + ra = RequestArgument('foo', [('a', 'b'), ('c', 'd')]) + # Index the array RequestArgument instance, obtaining an immutable view + it = ra[1] + # Make sure the value is a scalar view with the same properties as the value in the original instance + self.assertIsInstance(it, _View) + self.assertEqual('foo', it.name) + self.assertEqual('c', it.get_content_type()) + self.assertEqual('d', it.get_str()) + + def test_view_immutable(self): + """ + Test immutability of views. 
+ """ + ra = RequestArgument('foo', ('bar', 'baz')) + # Index the scalar RequestArgument instance, obtaining an immutable view + it = ra[0] + # Make sure the returned value is a view + self.assertIsInstance(it, _View) + # Make sure the returned value is immutable + with self.assertRaises(TypeError): + it.append('foo', 'bar') + with self.assertRaises(TypeError): + it.clear() + + def test_str_shorthand(self): + """ + Test the shorthand for get_str(0). + """ + ra = RequestArgument('foo', ('bar', 'baz')) + self.assertEqual('baz', str(ra)) + + def test_bytes_shorthand(self): + """ + Test the shorthand for get_bytes(0). + """ + ra = RequestArgument('foo', ('bar', b'\x00\x80\xff\xfe')) + self.assertEqual(b'\x00\x80\xff\xfe', bytes(ra)) + + # noinspection PyTypeChecker + def test_insert_garbage(self): + """ + Test proper handling with non-int indices and non-str/non-bytes data + :return: + """ + ra = RequestArgument('foo', 42) + with self.assertRaises(TypeError): + str(ra) + ra = RequestArgument('foo', (None, 42)) + with self.assertRaises(TypeError): + str(ra) + with self.assertRaises(TypeError): + bytes(ra) + with self.assertRaises(TypeError): + ra.get_content_type() + with self.assertRaises(TypeError): + ra.get_str('foo') + with self.assertRaises(TypeError): + ra.get_bytes('foo') + with self.assertRaises(TypeError): + ra.get_content_type('foo') + + def test_requestarguments_index(self): + """ + Make sure indexing a RequestArguments instance creates a new entry on the fly. 
+ """ + ra = RequestArguments() + self.assertEqual(0, len(ra)) + self.assertFalse('foo' in ra) + # Create new entry + _ = ra['foo'] + self.assertEqual(1, len(ra)) + self.assertTrue('foo' in ra) + # Already exists, no new entry created + _ = ra['foo'] + self.assertEqual(1, len(ra)) + # Entry must be empty and mutable, and have the correct name + self.assertFalse(ra['foo'].is_view) + self.assertEqual(0, len(ra['foo'])) + self.assertEqual('foo', ra['foo'].name) + # Key must be a string + with self.assertRaises(TypeError): + # noinspection PyTypeChecker + _ = ra[42] + + def test_requestarguments_attr(self): + """ + Test attribute access syntactic sugar. + """ + ra = RequestArguments() + # Attribute should not exist yet + with self.assertRaises(KeyError): + _ = ra.foo + # Create entry + _ = ra['foo'] + # Creating entry should have created the attribute + self.assertEqual('foo', ra.foo.name) + # Attribute access should yield an immutable view + self.assertTrue(ra.foo.is_view) + + def test_requestarguments_iterate(self): + """ + Test iterating a RequestArguments instance. + """ + # Create an instance with some values + ra = RequestArguments() + ra['foo'].append('a', 'b') + ra['bar'].append('c', 'd') + ra['foo'].append('e', 'f') + # Container for test values (name, value) + items: Set[Tuple[str, str]] = set() + # Iterate RequestArguments instance, adding the name and value of each to the set + for a in ra: + items.add((a.name, str(a))) + # Compare result with expected value + self.assertEqual(2, len(items)) + self.assertIn(('foo', 'b'), items) + self.assertIn(('bar', 'd'), items) + + def test_requestarguments_full_use_case(self): + """ + Simulate a minimal RequestArguments use case. 
+ """ + # Create empty RequestArguments instance + ra = RequestArguments() + # Parse GET request + getargs: Dict[str, List[str]] = urllib.parse.parse_qs('foo=42&bar=1337&foo=43&baz=Hello,%20World!') + # Insert GET arguments into RequestArguments + for k, vs in getargs.items(): + for v in vs: + ra[k].append('text/plain', v) + # Parse POST request + postargs: Dict[str, List[str]] = urllib.parse.parse_qs('foo=postfoo&postbar=42&foo=postfoo') + # Insert POST arguments into RequestArguments + for k, vs in postargs.items(): + # In this implementation, POST args replace GET args + ra[k].clear() + for v in vs: + ra[k].append('text/plain', v) + + # Someplace else: Use the RequestArguments instance. + self.assertEqual('1337', ra.bar.get_str()) + self.assertEqual('Hello, World!', ra.baz.get_str()) + self.assertEqual('42', ra.postbar.get_str()) + for a in ra.foo: + self.assertEqual('postfoo', a.get_str()) diff --git a/matemat/webserver/test/test_serve.py b/matemat/webserver/test/test_serve.py index f3dc6be..722870b 100644 --- a/matemat/webserver/test/test_serve.py +++ b/matemat/webserver/test/test_serve.py @@ -3,14 +3,14 @@ from typing import Any, Dict import os import os.path -from matemat.webserver.httpd import HttpHandler +from matemat.webserver import HttpHandler, RequestArguments from matemat.webserver.test.abstract_httpd_test import AbstractHttpdTest, test_pagelet @test_pagelet('/just/testing/serve_pagelet_ok') def serve_test_pagelet_ok(method: str, path: str, - args: Dict[str, str], + args: RequestArguments, session_vars: Dict[str, Any], headers: Dict[str, str]): headers['Content-Type'] = 'text/plain' @@ -20,7 +20,7 @@ def serve_test_pagelet_ok(method: str, @test_pagelet('/just/testing/serve_pagelet_fail') def serve_test_pagelet_fail(method: str, path: str, - args: Dict[str, str], + args: RequestArguments, session_vars: Dict[str, Any], headers: Dict[str, str]): session_vars['test'] = 'hello, world!' 
@@ -54,7 +54,7 @@ class TestServe(AbstractHttpdTest): self.assertEqual('serve_test_pagelet_ok', packet.pagelet) # Make sure the expected content is served self.assertEqual(200, packet.statuscode) - self.assertEqual('serve test pagelet ok', packet.body) + self.assertEqual(b'serve test pagelet ok', packet.body) def test_serve_pagelet_fail(self): # Call the test pagelet that produces a 500 Internal Server Error result @@ -66,7 +66,7 @@ class TestServe(AbstractHttpdTest): self.assertEqual('serve_test_pagelet_fail', packet.pagelet) # Make sure the expected content is served self.assertEqual(500, packet.statuscode) - self.assertEqual('serve test pagelet fail', packet.body) + self.assertEqual(b'serve test pagelet fail', packet.body) def test_serve_static_ok(self): # Request a static resource @@ -78,7 +78,7 @@ class TestServe(AbstractHttpdTest): self.assertIsNone(packet.pagelet) # Make sure the expected content is served self.assertEqual(200, packet.statuscode) - self.assertEqual('static resource test', packet.body) + self.assertEqual(b'static resource test', packet.body) def test_serve_static_forbidden(self): # Request a static resource with lacking permissions @@ -90,7 +90,7 @@ class TestServe(AbstractHttpdTest): self.assertIsNone(packet.pagelet) # Make sure a 403 header is served self.assertEqual(403, packet.statuscode) - self.assertNotEqual('This should not be readable', packet.body) + self.assertNotEqual(b'This should not be readable', packet.body) def test_serve_not_found(self): # Request a nonexistent resource @@ -116,7 +116,10 @@ class TestServe(AbstractHttpdTest): def test_static_post_not_allowed(self): # Request a resource outside the webroot - self.client_sock.set_request(b'POST /iwanttouploadthis HTTP/1.1\r\n\r\nq=this%20should%20not%20be%20uploaded') + self.client_sock.set_request(b'POST /iwanttopostthis HTTP/1.1\r\n' + b'Content-Type: application/x-www-form-urlencoded\r\n' + b'Content-length: 37\r\n\r\n' + b'q=this%20should%20not%20be%20uploaded') 
from typing import Dict, List, Tuple, Optional

import urllib.parse

from matemat.webserver import RequestArguments, RequestArgument


def _split_header_params(value: str) -> Tuple[str, Dict[str, str]]:
    """
    Split a parameterised header value, e.g. 'form-data; name="foo"', into its main value and its parameters.

    Parameter names are lower-cased, surrounding whitespace is stripped, and one pair of enclosing double quotes is
    removed from each parameter value.  Segments without a '=' are ignored.  If a parameter occurs more than once,
    the first occurrence wins.

    Limitation: the value is split on every ';', so a quoted parameter value containing ';' is not supported.

    :param value: The raw header value.
    :return: A tuple of the stripped main value and a dictionary of the parameters.
    """
    main, *segments = value.split(';')
    params: Dict[str, str] = dict()
    for segment in segments:
        key, sep, val = segment.partition('=')
        if not sep:
            continue
        val = val.strip()
        # Remove quotation marks around the parameter value
        if val.startswith('"') and val.endswith('"'):
            val = val[1:-1]
        params.setdefault(key.strip().lower(), val)
    return main.strip(), params


def _parse_multipart(body: bytes, boundary: str) -> List[RequestArgument]:
    """
    Given a HTTP body with form-data in multipart form, and the multipart-boundary, parse the multipart items and
    return them as a list of request arguments.

    :param body: The HTTP multipart/form-data body.
    :param boundary: The multipart boundary.
    :return: A list with one RequestArgument per distinct field name; each holds the content type and the raw
             content of every part that carried that name, in order of appearance.
    :raises ValueError: If the boundaries, the part headers or the Content-Disposition of a part are missing or
                        malformed.
    """
    # Prepend a CRLF so the first boundary matches the same delimiter pattern as all subsequent ones
    body = b'\r\n' + body
    # Generate item header boundary and terminating boundary from general boundary string
    delimiter = f'\r\n--{boundary}\r\n'.encode('utf-8')
    close_delimiter = f'\r\n--{boundary}--\r\n'.encode('utf-8')
    # Split at the end boundary and make sure there comes nothing after it
    allparts = body.split(close_delimiter, 1)
    if len(allparts) != 2 or allparts[1] != b'':
        raise ValueError('Last boundary missing or corrupted')
    # Split remaining body into its parts; everything before the first boundary must be empty
    parts: List[bytes] = allparts[0].split(delimiter)
    if parts[0] != b'':
        raise ValueError('First boundary missing or corrupted')

    # Results go into this dict, keyed by field name
    args: Dict[str, RequestArgument] = dict()

    # Parse each multipart part, skipping the first, empty one
    for part in parts[1:]:
        # Parse multipart headers; header names are case-insensitive, so they are stored lower-cased
        hdr: Dict[str, str] = dict()
        while True:
            line, sep, part = part.partition(b'\r\n')
            if not sep:
                raise ValueError('Multipart part without header/body delimiter')
            # Break on header/body delimiter
            if line == b'':
                break
            # Split at the first colon only: header values may legitimately contain further colons
            hk, colon, hv = line.decode('utf-8').partition(':')
            if not colon:
                raise ValueError('Malformed multipart part header')
            hdr[hk.strip().lower()] = hv.strip()
        # NOTE(review): RFC 7578 makes the per-part Content-Type optional (default text/plain); the existing
        # strict requirement is kept here so that callers see no change in what is rejected.
        if 'content-type' not in hdr or 'content-disposition' not in hdr:
            raise ValueError('Missing Content-Type or Content-Disposition header')
        # Extract Content-Disposition header value and its arguments
        disposition, cdparams = _split_header_params(hdr['content-disposition'])
        # Content-Disposition MUST be form-data; everything else is rejected
        if disposition.lower() != 'form-data':
            raise ValueError(f'Unknown Content-Disposition: {disposition}')
        if 'name' not in cdparams:
            # Content-Disposition header without name attribute
            raise ValueError('multipart/form-data part without name attribute')
        name: str = cdparams['name']
        # Add the Content-Type and the content exactly once per part, under the provided name
        if name not in args:
            args[name] = RequestArgument(name)
        args[name].append(hdr['content-type'], part)

    return list(args.values())


def _store_text_args(args: RequestArguments, parsed: Dict[str, List[str]]) -> None:
    """
    Write parsed urlencoded key/value pairs into args as text/plain values, replacing any values already stored
    under the same key.

    :param args: The RequestArguments instance to write into.
    :param parsed: The key/values mapping as returned by urllib.parse.parse_qs.
    """
    for k, vs in parsed.items():
        args[k].clear()
        for v in vs:
            args[k].append('text/plain', v)


def parse_args(request: str, postbody: Optional[bytes] = None, enctype: str = 'text/plain') \
        -> Tuple[str, RequestArguments]:
    """
    Given a HTTP request path, and optionally a HTTP POST body in application/x-www-form-urlencoded or
    multipart/form-data form, parse the arguments and return them together with the base path.

    If a key is used both in GET and in POST, the POST value takes precedence, and the GET value is discarded.

    :param request: The request string to parse.
    :param postbody: The POST body to parse, defaults to None.
    :param enctype: The Content-Type header value describing the POST body; supported media types are
                    application/x-www-form-urlencoded and multipart/form-data.  Header parameters (e.g. charset,
                    boundary) are allowed; the media type is matched case-insensitively.
    :return: A tuple consisting of the base path and a RequestArguments instance with the parsed arguments.
    :raises ValueError: If the query string or the POST body is malformed, if the multipart boundary is missing,
                        or if a POST body is given with an unsupported content type.
    """
    # Parse the request "URL" (i.e. only the path)
    tokens = urllib.parse.urlparse(request)

    args = RequestArguments()
    # Parse the GET arguments; strict parsing raises on an empty string, so skip it in that case
    if tokens.query:
        getargs = urllib.parse.parse_qs(tokens.query, strict_parsing=True, keep_blank_values=True, errors='strict')
        _store_text_args(args, getargs)

    if postbody is not None:
        # Separate the media type from its parameters, so that e.g. "; charset=UTF-8" does not break matching
        mediatype, ctparams = _split_header_params(enctype)
        mediatype = mediatype.lower()
        if mediatype == 'application/x-www-form-urlencoded':
            # Parse the POST body
            pb: str = postbody.decode('utf-8')
            if pb:
                postargs = urllib.parse.parse_qs(pb, strict_parsing=True, keep_blank_values=True, errors='strict')
                # Write all POST values into the arguments, overriding potential duplicates from GET
                _store_text_args(args, postargs)
        elif mediatype == 'multipart/form-data':
            # Take the multipart boundary from the Content-Type header parameters (quotes already removed)
            boundary: str = ctparams.get('boundary', '')
            if not boundary:
                raise ValueError('Multipart boundary in header not set or corrupted')
            # Parse the multipart body; its values override potential duplicates from GET
            for ra in _parse_multipart(postbody, boundary):
                args[ra.name].clear()
                for a in ra:
                    args[ra.name].append(a.get_content_type(), bytes(a))
        else:
            raise ValueError(f'Unsupported Content-Type: {enctype}')
    # Return the path and the parsed arguments
    return tokens.path, args