From 118de8bf95f344ea0c567040334a229d089544f6 Mon Sep 17 00:00:00 2001 From: s3lph Date: Thu, 28 Jun 2018 23:58:01 +0200 Subject: [PATCH] New request parsing (WIP: Documentation) --- matemat/webserver/__init__.py | 1 + matemat/webserver/httpd.py | 14 +- matemat/webserver/pagelets/__init__.py | 1 - matemat/webserver/pagelets/login.py | 16 +- matemat/webserver/pagelets/logout.py | 4 +- matemat/webserver/pagelets/main.py | 4 +- matemat/webserver/pagelets/touchkey.py | 14 +- matemat/webserver/pagelets/upload_test.py | 28 -- matemat/webserver/requestargs.py | 121 ++++++ matemat/webserver/test/abstract_httpd_test.py | 6 +- matemat/webserver/test/test_parse_request.py | 347 ++++++++++++++++++ matemat/webserver/test/test_post.py | 84 ++--- matemat/webserver/test/test_requestargs.py | 204 ++++++++++ matemat/webserver/test/test_serve.py | 6 +- matemat/webserver/test/test_session.py | 4 +- matemat/webserver/util.py | 64 ++-- 16 files changed, 774 insertions(+), 144 deletions(-) delete mode 100644 matemat/webserver/pagelets/upload_test.py create mode 100644 matemat/webserver/requestargs.py create mode 100644 matemat/webserver/test/test_parse_request.py create mode 100644 matemat/webserver/test/test_requestargs.py diff --git a/matemat/webserver/__init__.py b/matemat/webserver/__init__.py index f4d86f3..1b4ab06 100644 --- a/matemat/webserver/__init__.py +++ b/matemat/webserver/__init__.py @@ -6,4 +6,5 @@ API that can be used by 'pagelets' - single pages of a web service. If a reques server will attempt to serve the request with a static resource in a previously configured webroot directory. 
""" +from .requestargs import RequestArgument from .httpd import MatematWebserver, HttpHandler, pagelet diff --git a/matemat/webserver/httpd.py b/matemat/webserver/httpd.py index a4e9cca..79efb98 100644 --- a/matemat/webserver/httpd.py +++ b/matemat/webserver/httpd.py @@ -1,5 +1,5 @@ -from typing import Any, Callable, Dict, List, Tuple, Union +from typing import Any, Callable, Dict, Tuple, Union import traceback @@ -13,6 +13,7 @@ from uuid import uuid4 from datetime import datetime, timedelta from matemat import __version__ as matemat_version +from matemat.webserver import RequestArgument from matemat.webserver.util import parse_args @@ -30,7 +31,7 @@ BaseHTTPRequestHandler.log_error = lambda self, fstring='', *args: None # Dictionary to hold registered pagelet paths and their handler functions _PAGELET_PATHS: Dict[str, Callable[[str, # HTTP method (GET, POST, ...) str, # Request path - Dict[str, Tuple[str, Union[bytes, str, List[str]]]], # args: (name, (type, value)) + Dict[str, RequestArgument], # args: (name, argument) Dict[str, Any], # Session vars Dict[str, str]], # Response headers Tuple[int, Union[bytes, str]]]] = dict() # Returns: (status code, response body) @@ -50,15 +51,14 @@ def pagelet(path: str): (method: str, path: str, - args: Dict[str, Tuple[str, Union[bytes, str, List[str]]]], + args: Dict[str, RequestArgument], session_vars: Dict[str, Any], headers: Dict[str, str]) -> (int, Optional[Union[str, bytes]]) method: The HTTP method (GET, POST) that was used. path: The path that was requested. - args: The arguments that were passed with the request (as GET or POST arguments), each of which may be - either a str or bytes object, or a list of str. + args: The arguments that were passed with the request (as GET or POST arguments). session_vars: The session storage. May be read from and written to. headers: The dictionary of HTTP response headers. Add headers you wish to send with the response. 
returns: A tuple consisting of the HTTP status code (as an int) and the response body (as str or bytes, @@ -69,7 +69,7 @@ def pagelet(path: str): def http_handler(fun: Callable[[str, str, - Dict[str, Tuple[str, Union[bytes, str, List[str]]]], + Dict[str, RequestArgument], Dict[str, Any], Dict[str, str]], Tuple[int, Union[bytes, str]]]): @@ -181,7 +181,7 @@ class HttpHandler(BaseHTTPRequestHandler): if session_id in self.server.session_vars: del self.server.session_vars[session_id] - def _handle(self, method: str, path: str, args: Dict[str, Tuple[str, Union[bytes, str, List[str]]]]) -> None: + def _handle(self, method: str, path: str, args: Dict[str, RequestArgument]) -> None: """ Handle a HTTP request by either dispatching it to the appropriate pagelet or by serving a static resource. diff --git a/matemat/webserver/pagelets/__init__.py b/matemat/webserver/pagelets/__init__.py index 71ded5e..9b926d6 100644 --- a/matemat/webserver/pagelets/__init__.py +++ b/matemat/webserver/pagelets/__init__.py @@ -8,4 +8,3 @@ from .main import main_page from .login import login_page from .logout import logout from .touchkey import touchkey_page -from .upload_test import upload_test diff --git a/matemat/webserver/pagelets/login.py b/matemat/webserver/pagelets/login.py index 8fbe831..f7813b4 100644 --- a/matemat/webserver/pagelets/login.py +++ b/matemat/webserver/pagelets/login.py @@ -1,8 +1,8 @@ -from typing import Any, Dict, List, Optional, Tuple, Union +from typing import Any, Dict, Optional, Tuple, Union from matemat.exceptions import AuthenticationError -from matemat.webserver import pagelet +from matemat.webserver import pagelet, RequestArgument from matemat.primitives import User from matemat.db import MatematDatabase @@ -10,7 +10,7 @@ from matemat.db import MatematDatabase @pagelet('/login') def login_page(method: str, path: str, - args: Dict[str, Tuple[str, Union[bytes, str, List[str]]]], + args: Dict[str, RequestArgument], session_vars: Dict[str, Any], headers: Dict[str, 
str])\ -> Tuple[int, Optional[Union[str, bytes]]]: @@ -43,15 +43,11 @@ def login_page(method: str, ''' return 200, data.format(msg=args['msg'] if 'msg' in args else '') elif method == 'POST': - if 'username' not in args or not isinstance(args['username'], str): - return 400, None - if 'password' not in args or not isinstance(args['password'], str): - return 400, None - username: str = args['username'] - password: str = args['password'] + username: RequestArgument = args['username'] + password: RequestArgument = args['password'] with MatematDatabase('test.db') as db: try: - user: User = db.login(username, password) + user: User = db.login(username.get_str(), password.get_str()) except AuthenticationError: headers['Location'] = '/login?msg=Username%20or%20password%20wrong.%20Please%20try%20again.' return 301, bytes() diff --git a/matemat/webserver/pagelets/logout.py b/matemat/webserver/pagelets/logout.py index 53a292a..b70d7c1 100644 --- a/matemat/webserver/pagelets/logout.py +++ b/matemat/webserver/pagelets/logout.py @@ -1,13 +1,13 @@ from typing import Any, Dict, List, Optional, Tuple, Union -from matemat.webserver import pagelet +from matemat.webserver import pagelet, RequestArgument @pagelet('/logout') def logout(method: str, path: str, - args: Dict[str, Tuple[str, Union[bytes, str, List[str]]]], + args: Dict[str, RequestArgument], session_vars: Dict[str, Any], headers: Dict[str, str])\ -> Tuple[int, Optional[Union[str, bytes]]]: diff --git a/matemat/webserver/pagelets/main.py b/matemat/webserver/pagelets/main.py index d2dd208..2b9ce79 100644 --- a/matemat/webserver/pagelets/main.py +++ b/matemat/webserver/pagelets/main.py @@ -1,7 +1,7 @@ from typing import Any, Dict, List, Optional, Tuple, Union -from matemat.webserver import MatematWebserver, pagelet +from matemat.webserver import MatematWebserver, pagelet, RequestArgument from matemat.primitives import User from matemat.db import MatematDatabase @@ -9,7 +9,7 @@ from matemat.db import MatematDatabase 
@pagelet('/') def main_page(method: str, path: str, - args: Dict[str, Tuple[str, Union[bytes, str, List[str]]]], + args: Dict[str, RequestArgument], session_vars: Dict[str, Any], headers: Dict[str, str])\ -> Tuple[int, Optional[Union[str, bytes]]]: diff --git a/matemat/webserver/pagelets/touchkey.py b/matemat/webserver/pagelets/touchkey.py index 2a8202d..22e3df4 100644 --- a/matemat/webserver/pagelets/touchkey.py +++ b/matemat/webserver/pagelets/touchkey.py @@ -2,7 +2,7 @@ from typing import Any, Dict, List, Optional, Tuple, Union from matemat.exceptions import AuthenticationError -from matemat.webserver import pagelet +from matemat.webserver import pagelet, RequestArgument from matemat.primitives import User from matemat.db import MatematDatabase @@ -10,7 +10,7 @@ from matemat.db import MatematDatabase @pagelet('/touchkey') def touchkey_page(method: str, path: str, - args: Dict[str, Tuple[str, Union[bytes, str, List[str]]]], + args: Dict[str, RequestArgument], session_vars: Dict[str, Any], headers: Dict[str, str])\ -> Tuple[int, Optional[Union[str, bytes]]]: @@ -42,15 +42,11 @@ def touchkey_page(method: str, ''' return 200, data.format(username=args['username'] if 'username' in args else '') elif method == 'POST': - if 'username' not in args or not isinstance(args['username'], str): - return 400, None - if 'touchkey' not in args or not isinstance(args['touchkey'], str): - return 400, None - username: str = args['username'] - touchkey: str = args['touchkey'] + username: RequestArgument = args['username'] + touchkey: RequestArgument = args['touchkey'] with MatematDatabase('test.db') as db: try: - user: User = db.login(username, touchkey=touchkey) + user: User = db.login(username.get_str(), touchkey=touchkey.get_str()) except AuthenticationError: headers['Location'] = f'/touchkey?username={args["username"]}&msg=Please%20try%20again.' 
return 301, bytes() diff --git a/matemat/webserver/pagelets/upload_test.py b/matemat/webserver/pagelets/upload_test.py deleted file mode 100644 index a6f1e85..0000000 --- a/matemat/webserver/pagelets/upload_test.py +++ /dev/null @@ -1,28 +0,0 @@ - -from typing import Any, Dict, Union - -from matemat.webserver import pagelet - - -@pagelet('/upload') -def upload_test(method: str, - path: str, - args: Dict[str, Union[str, bytes]], - session_vars: Dict[str, Any], - headers: Dict[str, str]): - if method == 'GET': - return 200, ''' - - - -
- - - -
- - - ''' - else: - headers['Content-Type'] = 'text/plain' - return 200, args.items().__str__() diff --git a/matemat/webserver/requestargs.py b/matemat/webserver/requestargs.py new file mode 100644 index 0000000..a35f759 --- /dev/null +++ b/matemat/webserver/requestargs.py @@ -0,0 +1,121 @@ + +from typing import List, Optional, Tuple, Union + + +class RequestArgument(object): + + def __init__(self, + name: str, + value: Union[Tuple[str, Union[bytes, str]], List[Tuple[str, Union[bytes, str]]]] = None) -> None: + self.__name: str = name + self.__value: Union[Tuple[str, Union[bytes, str]], List[Tuple[str, Union[bytes, str]]]] = None + if value is None: + self.__value = [] + else: + if isinstance(value, list): + if len(value) == 1: + self.__value = value[0] + else: + self.__value = value + else: + self.__value = value + + @property + def is_array(self) -> bool: + return isinstance(self.__value, list) + + @property + def is_scalar(self) -> bool: + return not isinstance(self.__value, list) + + @property + def is_view(self) -> bool: + return False + + @property + def name(self) -> str: + return self.__name + + def get_str(self, index: int = None) -> Optional[str]: + if self.is_array: + if index is None: + raise ValueError('index must not be None') + v: Tuple[str, Union[bytes, str]] = self.__value[index] + if isinstance(v[1], str): + return v[1] + elif isinstance(v[1], bytes): + return v[1].decode('utf-8') + else: + if index is not None: + raise ValueError('index must be None') + if isinstance(self.__value[1], str): + return self.__value[1] + elif isinstance(self.__value[1], bytes): + return self.__value[1].decode('utf-8') + + def get_bytes(self, index: int = None) -> Optional[bytes]: + if self.is_array: + if index is None: + raise ValueError('index must not be None') + v: Tuple[str, Union[bytes, str]] = self.__value[index] + if isinstance(v[1], bytes): + return v[1] + elif isinstance(v[1], str): + return v[1].encode('utf-8') + else: + if index is not None: + raise 
ValueError('index must be None') + if isinstance(self.__value[1], bytes): + return self.__value[1] + elif isinstance(self.__value[1], str): + return self.__value[1].encode('utf-8') + + def get_content_type(self, index: int = None) -> Optional[str]: + if self.is_array: + if index is None: + raise ValueError('index must not be None') + v: Tuple[str, Union[bytes, str]] = self.__value[index] + return v[0] + else: + if index is not None: + raise ValueError('index must be None') + return self.__value[0] + + def append(self, ctype: str, value: Union[str, bytes]): + if self.is_view: + raise TypeError('A RequestArgument view is immutable!') + if len(self) == 0: + self.__value = ctype, value + else: + if self.is_scalar: + self.__value = [self.__value] + self.__value.append((ctype, value)) + + def __len__(self): + return len(self.__value) if self.is_array else 1 + + def __iter__(self): + if self.is_scalar: + yield _View(self.__name, self.__value) + else: + # Typing helper + _value: List[Tuple[str, Union[bytes, str]]] = self.__value + for v in _value: + yield _View(self.__name, v) + + def __getitem__(self, index: Union[int, slice]): + if self.is_scalar: + if index == 0: + return _View(self.__name, self.__value) + raise ValueError('Scalar RequestArgument only indexable with 0') + return _View(self.__name, self.__value[index]) + + +class _View(RequestArgument): + + def __init__(self, name: str, value: Union[Tuple[str, Union[bytes, str]], List[Tuple[str, Union[bytes, str]]]]): + super().__init__(name, value) + + @property + def is_view(self) -> bool: + return True diff --git a/matemat/webserver/test/abstract_httpd_test.py b/matemat/webserver/test/abstract_httpd_test.py index b96767e..daa1126 100644 --- a/matemat/webserver/test/abstract_httpd_test.py +++ b/matemat/webserver/test/abstract_httpd_test.py @@ -9,7 +9,7 @@ from abc import ABC from datetime import datetime from http.server import HTTPServer -from matemat.webserver.httpd import pagelet +from matemat.webserver import 
pagelet, RequestArgument class HttpResponse: @@ -158,14 +158,14 @@ def test_pagelet(path: str): def with_testing_headers(fun: Callable[[str, str, - Dict[str, Tuple[str, Union[bytes, str, List[str]]]], + Dict[str, RequestArgument], Dict[str, Any], Dict[str, str]], Tuple[int, Union[bytes, str]]]): @pagelet(path) def testing_wrapper(method: str, path: str, - args: Dict[str, Tuple[str, Union[bytes, str, List[str]]]], + args: Dict[str, RequestArgument], session_vars: Dict[str, Any], headers: Dict[str, str]): status, body = fun(method, path, args, session_vars, headers) diff --git a/matemat/webserver/test/test_parse_request.py b/matemat/webserver/test/test_parse_request.py new file mode 100644 index 0000000..a533936 --- /dev/null +++ b/matemat/webserver/test/test_parse_request.py @@ -0,0 +1,347 @@ + +import unittest + +from matemat.webserver.util import parse_args + + +class TestParseRequest(unittest.TestCase): + + def test_parse_get_root(self): + path, args = parse_args('/') + self.assertEqual('/', path) + self.assertEqual(0, len(args)) + + def test_parse_get_no_args(self): + path, args = parse_args('/index.html') + self.assertEqual('/index.html', path) + self.assertEqual(0, len(args)) + + def test_parse_get_root_getargs(self): + path, args = parse_args('/?foo=42&bar=1337&baz=Hello,%20World!') + self.assertEqual('/', path) + self.assertEqual(3, len(args)) + self.assertIn('foo', args.keys()) + self.assertIn('bar', args.keys()) + self.assertIn('baz', args.keys()) + self.assertTrue(args['foo'].is_scalar) + self.assertTrue(args['bar'].is_scalar) + self.assertTrue(args['baz'].is_scalar) + self.assertEqual('text/plain', args['foo'].get_content_type()) + self.assertEqual('text/plain', args['bar'].get_content_type()) + self.assertEqual('text/plain', args['baz'].get_content_type()) + self.assertEqual('42', args['foo'].get_str()) + self.assertEqual('1337', args['bar'].get_str()) + self.assertEqual('Hello, World!', args['baz'].get_str()) + + def test_parse_get_getargs(self): + 
path, args = parse_args('/abc/def?foo=42&bar=1337&baz=Hello,%20World!') + self.assertEqual('/abc/def', path) + self.assertEqual(3, len(args)) + self.assertIn('foo', args.keys()) + self.assertIn('bar', args.keys()) + self.assertIn('baz', args.keys()) + self.assertTrue(args['foo'].is_scalar) + self.assertTrue(args['bar'].is_scalar) + self.assertTrue(args['baz'].is_scalar) + self.assertEqual('text/plain', args['foo'].get_content_type()) + self.assertEqual('text/plain', args['bar'].get_content_type()) + self.assertEqual('text/plain', args['baz'].get_content_type()) + self.assertEqual('42', args['foo'].get_str()) + self.assertEqual('1337', args['bar'].get_str()) + self.assertEqual('Hello, World!', args['baz'].get_str()) + + def test_parse_get_getarray(self): + path, args = parse_args('/abc/def?foo=42&foo=1337&baz=Hello,%20World!') + self.assertEqual('/abc/def', path) + self.assertEqual(2, len(args)) + self.assertIn('foo', args.keys()) + self.assertIn('baz', args.keys()) + self.assertTrue(args['foo'].is_array) + self.assertTrue(args['baz'].is_scalar) + self.assertEqual(2, len(args['foo'])) + self.assertEqual('42', args['foo'].get_str(0)) + self.assertEqual('1337', args['foo'].get_str(1)) + + def test_parse_get_zero_arg(self): + path, args = parse_args('/abc/def?foo=&bar=42') + self.assertEqual(2, len(args)) + self.assertIn('foo', args.keys()) + self.assertIn('bar', args.keys()) + self.assertTrue(args['foo'].is_scalar) + self.assertTrue(args['bar'].is_scalar) + self.assertEqual(1, len(args['foo'])) + self.assertEqual('', args['foo'].get_str()) + self.assertEqual('42', args['bar'].get_str()) + + def test_parse_get_urlencoded_encoding_fail(self): + with self.assertRaises(ValueError): + parse_args('/?foo=42&bar=%80&baz=Hello,%20World!') + + def test_parse_post_urlencoded(self): + path, args = parse_args('/', + postbody=b'foo=42&bar=1337&baz=Hello,%20World!', + enctype='application/x-www-form-urlencoded') + self.assertEqual('/', path) + self.assertEqual(3, len(args)) + 
self.assertIn('foo', args.keys()) + self.assertIn('bar', args.keys()) + self.assertIn('baz', args.keys()) + self.assertTrue(args['foo'].is_scalar) + self.assertTrue(args['bar'].is_scalar) + self.assertTrue(args['baz'].is_scalar) + self.assertEqual('text/plain', args['foo'].get_content_type()) + self.assertEqual('text/plain', args['bar'].get_content_type()) + self.assertEqual('text/plain', args['baz'].get_content_type()) + self.assertEqual('42', args['foo'].get_str()) + self.assertEqual('1337', args['bar'].get_str()) + self.assertEqual('Hello, World!', args['baz'].get_str()) + + def test_parse_post_urlencoded_array(self): + path, args = parse_args('/', + postbody=b'foo=42&foo=1337&baz=Hello,%20World!', + enctype='application/x-www-form-urlencoded') + self.assertEqual('/', path) + self.assertEqual(2, len(args)) + self.assertIn('foo', args.keys()) + self.assertIn('baz', args.keys()) + self.assertTrue(args['foo'].is_array) + self.assertTrue(args['baz'].is_scalar) + self.assertEqual(2, len(args['foo'])) + self.assertEqual('42', args['foo'].get_str(0)) + self.assertEqual('1337', args['foo'].get_str(1)) + + def test_parse_post_urlencoded_zero_arg(self): + path, args = parse_args('/abc/def', postbody=b'foo=&bar=42', enctype='application/x-www-form-urlencoded') + self.assertEqual(2, len(args)) + self.assertIn('foo', args.keys()) + self.assertIn('bar', args.keys()) + self.assertTrue(args['foo'].is_scalar) + self.assertTrue(args['bar'].is_scalar) + self.assertEqual(1, len(args['foo'])) + self.assertEqual('', args['foo'].get_str()) + self.assertEqual('42', args['bar'].get_str()) + + def test_parse_post_urlencoded_encoding_fail(self): + with self.assertRaises(ValueError): + parse_args('/', + postbody=b'foo=42&bar=%80&baz=Hello,%20World!', + enctype='application/x-www-form-urlencoded') + + def test_parse_post_multipart_no_args(self): + path, args = parse_args('/', + postbody=b'--testBoundary1337--\r\n', + enctype='multipart/form-data; boundary=testBoundary1337') + 
self.assertEqual('/', path) + self.assertEqual(0, len(args)) + + def test_parse_post_multipart(self): + path, args = parse_args('/', + postbody=b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="foo"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'42\r\n' + b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="bar"; filename="bar.bin"\r\n' + b'Content-Type: application/octet-stream\r\n\r\n' + b'1337\r\n' + b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="baz"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'Hello, World!\r\n' + b'--testBoundary1337--\r\n', + enctype='multipart/form-data; boundary=testBoundary1337') + self.assertEqual('/', path) + self.assertEqual(3, len(args)) + self.assertIn('foo', args.keys()) + self.assertIn('bar', args.keys()) + self.assertIn('baz', args.keys()) + self.assertTrue(args['foo'].is_scalar) + self.assertTrue(args['bar'].is_scalar) + self.assertTrue(args['baz'].is_scalar) + self.assertEqual('text/plain', args['foo'].get_content_type()) + self.assertEqual('application/octet-stream', args['bar'].get_content_type()) + self.assertEqual('text/plain', args['baz'].get_content_type()) + self.assertEqual('42', args['foo'].get_str()) + self.assertEqual(b'1337', args['bar'].get_bytes()) + self.assertEqual('Hello, World!', args['baz'].get_str()) + + def test_parse_post_multipart_zero_arg(self): + path, args = parse_args('/abc/def', + postbody=b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="foo"\r\n' + b'Content-Type: text/plain\r\n\r\n\r\n' + b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="bar"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'42\r\n' + b'--testBoundary1337--\r\n', + enctype='multipart/form-data; boundary=testBoundary1337') + self.assertEqual(2, len(args)) + self.assertIn('foo', args.keys()) + self.assertIn('bar', args.keys()) + self.assertTrue(args['foo'].is_scalar) + self.assertTrue(args['bar'].is_scalar) + self.assertEqual(1, len(args['foo'])) 
+ self.assertEqual('', args['foo'].get_str()) + self.assertEqual('42', args['bar'].get_str()) + + def test_parse_post_multipart_broken_boundaries(self): + with self.assertRaises(ValueError): + # Boundary not defined in Content-Type + parse_args('/', + postbody=b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="foo"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'42\r\n' + b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="bar"; filename="bar.bin"\r\n' + b'Content-Type: application/octet-stream\r\n\r\n' + b'1337\r\n' + b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="baz"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'Hello, World!\r\n' + b'--testBoundary1337--\r\n', + enctype='multipart/form-data') + with self.assertRaises(ValueError): + # Corrupted "--" head at first boundary + parse_args('/', + postbody=b'-+testBoundary1337\r\n' + b'Content-Disposition: form-data; name="foo"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'42\r\n' + b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="bar"; filename="bar.bin"\r\n' + b'Content-Type: application/octet-stream\r\n\r\n' + b'1337\r\n' + b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="baz"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'Hello, World!\r\n' + b'--testBoundary1337--\r\n', + enctype='multipart/form-data; boundary=testBoundary1337') + with self.assertRaises(ValueError): + # Missing "--" tail at end boundary + parse_args('/', + postbody=b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="foo"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'42\r\n' + b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="bar"; filename="bar.bin"\r\n' + b'Content-Type: application/octet-stream\r\n\r\n' + b'1337\r\n' + b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="baz"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'Hello, World!\r\n' + b'--testBoundary1337\r\n', + enctype='multipart/form-data; 
boundary=testBoundary1337') + with self.assertRaises(ValueError): + # Missing Content-Type header in one part + parse_args('/', + postbody=b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="foo"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'42\r\n' + b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="bar"; filename="bar.bin"\r\n' + b'Content-Type: application/octet-stream\r\n\r\n' + b'1337\r\n' + b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="baz"\r\n\r\n' + b'Hello, World!\r\n' + b'--testBoundary1337--\r\n', + enctype='multipart/form-data; boundary=testBoundary1337') + with self.assertRaises(ValueError): + # Missing Content-Disposition header in one part + parse_args('/', + postbody=b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="foo"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'42\r\n' + b'--testBoundary1337\r\n' + b'Content-Type: application/octet-stream\r\n\r\n' + b'1337\r\n' + b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="baz"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'Hello, World!\r\n' + b'--testBoundary1337--\r\n', + enctype='multipart/form-data; boundary=testBoundary1337') + with self.assertRaises(ValueError): + # Missing form-data name argument + parse_args('/', + postbody=b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="foo"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'42\r\n' + b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; filename="bar.bin"\r\n' + b'Content-Type: application/octet-stream\r\n\r\n' + b'1337\r\n' + b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="baz"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'Hello, World!\r\n' + b'--testBoundary1337--\r\n', + enctype='multipart/form-data; boundary=testBoundary1337') + with self.assertRaises(ValueError): + # Unknown Content-Disposition + parse_args('/', + postbody=b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="foo"\r\n' + 
b'Content-Type: text/plain\r\n\r\n' + b'42\r\n' + b'--testBoundary1337\r\n' + b'Content-Disposition: attachment; name="bar"; filename="bar.bin"\r\n' + b'Content-Type: application/octet-stream\r\n\r\n' + b'1337\r\n' + b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="baz"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'Hello, World!\r\n' + b'--testBoundary1337--\r\n', + enctype='multipart/form-data; boundary=testBoundary1337') + + def test_get_post_precedence_urlencoded(self): + path, args = parse_args('/foo?foo=thisshouldnotbethere&bar=isurvived', + postbody=b'foo=42&foo=1337&baz=Hello,%20World!', + enctype='application/x-www-form-urlencoded') + self.assertIn('foo', args) + self.assertIn('bar', args) + self.assertIn('baz', args) + self.assertEqual(2, len(args['foo'])) + self.assertEqual(1, len(args['bar'])) + self.assertEqual(1, len(args['baz'])) + self.assertEqual('42', args['foo'].get_str(0)) + self.assertEqual('1337', args['foo'].get_str(1)) + self.assertEqual('isurvived', args['bar'].get_str()) + self.assertEqual('Hello, World!', args['baz'].get_str()) + + def test_get_post_precedence_multipart(self): + path, args = parse_args('/foo?foo=thisshouldnotbethere&bar=isurvived', + postbody=b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="foo"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'42\r\n' + b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="foo"; filename="bar.bin"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'1337\r\n' + b'--testBoundary1337\r\n' + b'Content-Disposition: form-data; name="baz"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'Hello, World!\r\n' + b'--testBoundary1337--\r\n', + enctype='multipart/form-data; boundary=testBoundary1337') + self.assertIn('foo', args) + self.assertIn('bar', args) + self.assertIn('baz', args) + self.assertEqual(2, len(args['foo'])) + self.assertEqual(1, len(args['bar'])) + self.assertEqual(1, len(args['baz'])) + self.assertEqual('42', args['foo'].get_str(0)) + 
self.assertEqual('1337', args['foo'].get_str(1)) + self.assertEqual('isurvived', args['bar'].get_str()) + self.assertEqual('Hello, World!', args['baz'].get_str()) diff --git a/matemat/webserver/test/test_post.py b/matemat/webserver/test/test_post.py index 0c6e3d2..9b2fe22 100644 --- a/matemat/webserver/test/test_post.py +++ b/matemat/webserver/test/test_post.py @@ -1,7 +1,7 @@ from typing import Any, Dict, List, Tuple, Union -from matemat.webserver.httpd import HttpHandler +from matemat.webserver import HttpHandler, RequestArgument from matemat.webserver.test.abstract_httpd_test import AbstractHttpdTest, test_pagelet import codecs @@ -10,7 +10,7 @@ import codecs @test_pagelet('/just/testing/post') def post_test_pagelet(method: str, path: str, - args: Dict[str, Tuple[str, Union[bytes, str, List[str]]]], + args: Dict[str, RequestArgument], session_vars: Dict[str, Any], headers: Dict[str, str]): """ @@ -18,13 +18,12 @@ def post_test_pagelet(method: str, """ headers['Content-Type'] = 'text/plain' dump: str = '' - for k, (t, v) in args.items(): - if t.startswith('text/'): - if isinstance(v, bytes): - v = v.decode('utf-8') - dump += f'{k}: {",".join(v) if isinstance(v, list) else v}\n' - else: - dump += f'{k}: {codecs.encode(v, "hex").decode("utf-8")}\n' + for k, ra in args.items(): + for a in ra: + if a.get_content_type().startswith('text/'): + dump += f'{k}: {a.get_str()}\n' + else: + dump += f'{k}: {codecs.encode(a.get_bytes(), "hex").decode("utf-8")}\n' return 200, dump @@ -118,10 +117,14 @@ class TestPost(AbstractHttpdTest): kv: Dict[str, str] = dict() for l in lines: k, v = l.decode('utf-8').split(':', 1) - kv[k.strip()] = v.strip() if ',' not in v else v.strip().split(',') - + k = k.strip() + v = v.strip() + if k in kv: + kv[k] += f',{v}' + else: + kv[k] = v # Make sure the arguments were properly parsed - self.assertListEqual(['bar', 'baz'], kv['foo']) + self.assertEqual('bar,baz', kv['foo']) self.assertEqual('1', kv['test']) def 
test_post_urlenc_post_array(self): @@ -141,10 +144,14 @@ class TestPost(AbstractHttpdTest): kv: Dict[str, str] = dict() for l in lines: k, v = l.decode('utf-8').split(':', 1) - kv[k.strip()] = v.strip() if ',' not in v else v.strip().split(',') - + k = k.strip() + v = v.strip() + if k in kv: + kv[k] += f',{v}' + else: + kv[k] = v # Make sure the arguments were properly parsed - self.assertListEqual(['bar', 'baz'], kv['foo']) + self.assertEqual('bar,baz', kv['foo']) self.assertEqual('1', kv['test']) def test_post_urlenc_mixed_array(self): @@ -164,12 +171,16 @@ class TestPost(AbstractHttpdTest): kv: Dict[str, str] = dict() for l in lines: k, v = l.decode('utf-8').split(':', 1) - kv[k.strip()] = v.strip() if ',' not in v else v.strip().split(',') - + k = k.strip() + v = v.strip() + if k in kv: + kv[k] += f',{v}' + else: + kv[k] = v # Make sure the arguments were properly parsed - self.assertListEqual(['postbar', 'postbaz'], kv['foo']) - self.assertListEqual(['1', '42'], kv['gettest']) - self.assertListEqual(['1', '2'], kv['posttest']) + self.assertEqual('postbar,postbaz', kv['foo']) + self.assertEqual('1,42', kv['gettest']) + self.assertEqual('1,2', kv['posttest']) def test_post_no_body(self): """ @@ -184,7 +195,7 @@ class TestPost(AbstractHttpdTest): def test_post_multipart_post_only(self): """ - Test a POST request with a miltipart/form-data body. + Test a POST request with a multipart/form-data body. """ # Send POST request formdata = (b'------testboundary\r\n' @@ -211,34 +222,3 @@ class TestPost(AbstractHttpdTest): self.assertIn('bar', kv) self.assertEqual(kv['foo'], b'Hello, World!') self.assertEqual(kv['bar'], b'00010203040506070809800b0c730e0f') - - def test_post_multipart_mixed(self): - """ - Test a POST request with a miltipart/form-data body. 
- """ - # Send POST request - formdata = (b'------testboundary\r\n' - b'Content-Disposition: form-data; name="foo"\r\n' - b'Content-Type: text/plain\r\n\r\n' - b'Hello, World!\r\n' - b'------testboundary\r\n' - b'Content-Disposition: form-data; name="bar"; filename="foo.bar"\r\n' - b'Content-Type: application/octet-stream\r\n\r\n' - b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x80\x0b\x0c\x73\x0e\x0f\r\n' - b'------testboundary--\r\n') - - self.client_sock.set_request(f'POST /just/testing/post?getfoo=bar&foo=thisshouldbegone HTTP/1.1\r\n' - f'Content-Type: multipart/form-data; boundary=----testboundary\r\n' - f'Content-Length: {len(formdata)}\r\n\r\n'.encode('utf-8') + formdata) - HttpHandler(self.client_sock, ('::1', 45678), self.server) - packet = self.client_sock.get_response() - lines: List[bytes] = packet.body.split(b'\n')[:-1] - kv: Dict[str, Any] = dict() - for l in lines: - k, v = l.split(b':', 1) - kv[k.decode('utf-8').strip()] = v.strip() - self.assertIn('foo', kv) - self.assertIn('bar', kv) - self.assertEqual(kv['getfoo'], b'bar') - self.assertEqual(kv['foo'], b'Hello, World!') - self.assertEqual(kv['bar'], b'00010203040506070809800b0c730e0f') diff --git a/matemat/webserver/test/test_requestargs.py b/matemat/webserver/test/test_requestargs.py new file mode 100644 index 0000000..dcdde14 --- /dev/null +++ b/matemat/webserver/test/test_requestargs.py @@ -0,0 +1,204 @@ + +from typing import List + +import unittest + +from matemat.webserver import RequestArgument +# noinspection PyProtectedMember +from matemat.webserver.requestargs import _View + + +class TestRequestArguments(unittest.TestCase): + + def test_create_default(self): + ra = RequestArgument('foo') + self.assertEqual('foo', ra.name) + self.assertEqual(0, len(ra)) + self.assertFalse(ra.is_scalar) + self.assertTrue(ra.is_array) + self.assertFalse(ra.is_view) + + def test_create_str_scalar(self): + ra = RequestArgument('foo', ('text/plain', 'bar')) + self.assertEqual('foo', ra.name) + 
self.assertEqual(1, len(ra)) + self.assertTrue(ra.is_scalar) + self.assertFalse(ra.is_array) + self.assertEqual('bar', ra.get_str()) + self.assertEqual(b'bar', ra.get_bytes()) + self.assertEqual('text/plain', ra.get_content_type()) + with self.assertRaises(ValueError): + self.assertEqual('bar', ra.get_str(0)) + with self.assertRaises(ValueError): + self.assertEqual('bar', ra.get_bytes(0)) + with self.assertRaises(ValueError): + self.assertEqual('bar', ra.get_content_type(0)) + self.assertFalse(ra.is_view) + + def test_create_str_scalar_array(self): + ra = RequestArgument('foo', [('text/plain', 'bar')]) + self.assertEqual('foo', ra.name) + self.assertEqual(1, len(ra)) + self.assertTrue(ra.is_scalar) + self.assertFalse(ra.is_array) + self.assertEqual('bar', ra.get_str()) + self.assertEqual(b'bar', ra.get_bytes()) + self.assertEqual('text/plain', ra.get_content_type()) + with self.assertRaises(ValueError): + self.assertEqual('bar', ra.get_str(0)) + with self.assertRaises(ValueError): + self.assertEqual('bar', ra.get_bytes(0)) + with self.assertRaises(ValueError): + self.assertEqual('bar', ra.get_content_type(0)) + self.assertFalse(ra.is_view) + + def test_create_bytes_scalar(self): + ra = RequestArgument('foo', ('application/octet-stream', b'\x00\x80\xff\xfe')) + self.assertEqual('foo', ra.name) + self.assertEqual(1, len(ra)) + self.assertTrue(ra.is_scalar) + self.assertFalse(ra.is_array) + with self.assertRaises(UnicodeDecodeError): + ra.get_str() + self.assertEqual(b'\x00\x80\xff\xfe', ra.get_bytes()) + self.assertEqual('application/octet-stream', ra.get_content_type()) + with self.assertRaises(ValueError): + self.assertEqual('bar', ra.get_str(0)) + with self.assertRaises(ValueError): + self.assertEqual('bar', ra.get_bytes(0)) + with self.assertRaises(ValueError): + self.assertEqual('bar', ra.get_content_type(0)) + self.assertFalse(ra.is_view) + + def test_create_array(self): + ra = RequestArgument('foo', [ + ('text/plain', 'bar'), + ('application/octet-stream', 
b'\x00\x80\xff\xfe') + ]) + self.assertEqual('foo', ra.name) + self.assertEqual(2, len(ra)) + self.assertFalse(ra.is_scalar) + self.assertTrue(ra.is_array) + with self.assertRaises(ValueError): + ra.get_str() + with self.assertRaises(ValueError): + ra.get_bytes() + with self.assertRaises(ValueError): + ra.get_content_type() + self.assertEqual('bar', ra.get_str(0)) + self.assertEqual(b'bar', ra.get_bytes(0)) + self.assertEqual('text/plain', ra.get_content_type(0)) + with self.assertRaises(UnicodeDecodeError): + ra.get_str(1) + self.assertEqual(b'\x00\x80\xff\xfe', ra.get_bytes(1)) + self.assertEqual('application/octet-stream', ra.get_content_type(1)) + self.assertFalse(ra.is_view) + + def test_append_empty_str(self): + ra = RequestArgument('foo') + self.assertEqual(0, len(ra)) + self.assertFalse(ra.is_scalar) + + ra.append('text/plain', 'bar') + self.assertEqual(1, len(ra)) + self.assertTrue(ra.is_scalar) + self.assertEqual('bar', ra.get_str()) + self.assertEqual(b'bar', ra.get_bytes()) + self.assertEqual('text/plain', ra.get_content_type()) + self.assertFalse(ra.is_view) + + def test_append_empty_bytes(self): + ra = RequestArgument('foo') + self.assertEqual(0, len(ra)) + self.assertFalse(ra.is_scalar) + + ra.append('application/octet-stream', b'\x00\x80\xff\xfe') + self.assertEqual(1, len(ra)) + self.assertTrue(ra.is_scalar) + with self.assertRaises(UnicodeDecodeError): + ra.get_str() + self.assertEqual(b'\x00\x80\xff\xfe', ra.get_bytes()) + self.assertEqual('application/octet-stream', ra.get_content_type()) + self.assertFalse(ra.is_view) + + def test_append_multiple(self): + ra = RequestArgument('foo') + self.assertEqual(0, len(ra)) + self.assertFalse(ra.is_scalar) + + ra.append('text/plain', 'bar') + self.assertEqual(1, len(ra)) + self.assertTrue(ra.is_scalar) + + ra.append('application/octet-stream', b'\x00\x80\xff\xfe') + self.assertEqual(2, len(ra)) + self.assertFalse(ra.is_scalar) + + ra.append('text/plain', 'Hello, World!') + self.assertEqual(3, len(ra)) + 
self.assertFalse(ra.is_scalar) + + def test_iterate_empty(self): + ra = RequestArgument('foo') + self.assertEqual(0, len(ra)) + for _ in ra: + self.fail() + + def test_iterate_scalar(self): + ra = RequestArgument('foo', ('text/plain', 'bar')) + self.assertTrue(ra.is_scalar) + count: int = 0 + for it in ra: + self.assertIsInstance(it, _View) + self.assertEqual('foo', it.name) + self.assertTrue(it.is_view) + self.assertTrue(it.is_scalar) + count += 1 + self.assertEqual(1, count) + + def test_iterate_array(self): + ra = RequestArgument('foo', [('text/plain', 'bar'), ('abc', b'def'), ('xyz', '1337')]) + self.assertFalse(ra.is_scalar) + items: List[str] = list() + for it in ra: + self.assertIsInstance(it, _View) + self.assertTrue(it.is_view) + self.assertTrue(it.is_scalar) + items.append(it.get_content_type()) + self.assertEqual(['text/plain', 'abc', 'xyz'], items) + + def test_iterate_sliced(self): + ra = RequestArgument('foo', [('a', 'b'), ('c', 'd'), ('e', 'f'), ('g', 'h'), ('i', 'j'), ('k', 'l')]) + self.assertFalse(ra.is_scalar) + items: List[str] = list() + for it in ra[1:4:2]: + self.assertIsInstance(it, _View) + self.assertTrue(it.is_view) + self.assertTrue(it.is_scalar) + items.append(it.get_content_type()) + self.assertEqual(['c', 'g'], items) + + def test_index_scalar(self): + ra = RequestArgument('foo', ('bar', 'baz')) + it = ra[0] + self.assertIsInstance(it, _View) + self.assertEqual('foo', it.name) + self.assertEqual('bar', it.get_content_type()) + self.assertEqual('baz', it.get_str()) + with self.assertRaises(ValueError): + _ = ra[1] + + def test_index_array(self): + ra = RequestArgument('foo', [('a', 'b'), ('c', 'd')]) + it = ra[1] + self.assertIsInstance(it, _View) + self.assertEqual('foo', it.name) + self.assertEqual('c', it.get_content_type()) + self.assertEqual('d', it.get_str()) + + def test_view_immutable(self): + ra = RequestArgument('foo', ('bar', 'baz')) + it = ra[0] + self.assertIsInstance(it, _View) + with self.assertRaises(TypeError): + 
it.append('foo', 'bar') diff --git a/matemat/webserver/test/test_serve.py b/matemat/webserver/test/test_serve.py index 0556764..7e159e3 100644 --- a/matemat/webserver/test/test_serve.py +++ b/matemat/webserver/test/test_serve.py @@ -3,14 +3,14 @@ from typing import Any, Dict, Union import os import os.path -from matemat.webserver.httpd import HttpHandler +from matemat.webserver import HttpHandler, RequestArgument from matemat.webserver.test.abstract_httpd_test import AbstractHttpdTest, test_pagelet @test_pagelet('/just/testing/serve_pagelet_ok') def serve_test_pagelet_ok(method: str, path: str, - args: Dict[str, Union[bytes, str]], + args: Dict[str, RequestArgument], session_vars: Dict[str, Any], headers: Dict[str, str]): headers['Content-Type'] = 'text/plain' @@ -20,7 +20,7 @@ def serve_test_pagelet_ok(method: str, @test_pagelet('/just/testing/serve_pagelet_fail') def serve_test_pagelet_fail(method: str, path: str, - args: Dict[str, Union[bytes, str]], + args: Dict[str, RequestArgument], session_vars: Dict[str, Any], headers: Dict[str, str]): session_vars['test'] = 'hello, world!' diff --git a/matemat/webserver/test/test_session.py b/matemat/webserver/test/test_session.py index 50ade85..fe30529 100644 --- a/matemat/webserver/test/test_session.py +++ b/matemat/webserver/test/test_session.py @@ -4,14 +4,14 @@ from typing import Any, Dict, Union from datetime import datetime, timedelta from time import sleep -from matemat.webserver.httpd import HttpHandler +from matemat.webserver import HttpHandler, RequestArgument from matemat.webserver.test.abstract_httpd_test import AbstractHttpdTest, test_pagelet @test_pagelet('/just/testing/sessions') def session_test_pagelet(method: str, path: str, - args: Dict[str, Union[bytes, str]], + args: Dict[str, RequestArgument], session_vars: Dict[str, Any], headers: Dict[str, str]): session_vars['test'] = 'hello, world!' 
diff --git a/matemat/webserver/util.py b/matemat/webserver/util.py index 931f759..85ef721 100644 --- a/matemat/webserver/util.py +++ b/matemat/webserver/util.py @@ -3,8 +3,10 @@ from typing import Dict, List, Tuple, Optional, Union import urllib.parse +from matemat.webserver import RequestArgument -def _parse_multipart(body: bytes, boundary: str) -> Dict[str, List[Tuple[str, Union[bytes, str]]]]: + +def _parse_multipart(body: bytes, boundary: str) -> List[RequestArgument]: """ Given a HTTP body with form-data in multipart form, and the multipart-boundary, parse the multipart items and return them as a dictionary. @@ -13,6 +15,8 @@ def _parse_multipart(body: bytes, boundary: str) -> Dict[str, List[Tuple[str, Un :param boundary: The multipart boundary. :return: A dictionary of field names as key, and content types and field values as value. """ + # Prepend a CRLF for the first boundary to match + body = b'\r\n' + body # Generate item header boundary and terminating boundary from general boundary string _boundary = f'\r\n--{boundary}\r\n'.encode('utf-8') _end_boundary = f'\r\n--{boundary}--\r\n'.encode('utf-8') @@ -20,16 +24,15 @@ def _parse_multipart(body: bytes, boundary: str) -> Dict[str, List[Tuple[str, Un allparts = body.split(_end_boundary, 1) if len(allparts) != 2 or allparts[1] != b'': raise ValueError('Last boundary missing or corrupted') - # Split remaining body into its parts (appending a CRLF for the first boundary to match), and verify at least 1 part - # is there - parts: List[bytes] = (b'\r\n' + allparts[0]).split(_boundary) + # Split remaining body into its parts, and verify at least 1 part is there + parts: List[bytes] = (allparts[0]).split(_boundary) if len(parts) < 1 or parts[0] != b'': raise ValueError('First boundary missing or corrupted') # Remove the first, empty part parts = parts[1:] # Results go into this dict - args: Dict[str, List[Tuple[str, Union[bytes, str]]]] = dict() + args: Dict[str, RequestArgument] = dict() # Parse each multipart 
part for part in parts: @@ -50,25 +53,29 @@ def _parse_multipart(body: bytes, boundary: str) -> Dict[str, List[Tuple[str, Un cd, *cdargs = hdr['Content-Disposition'].split(';') # Content-Disposition MUST be form-data; everything else is rejected if cd.strip() != 'form-data': - raise ValueError(f'Unknown Content-Disposition: cd') + raise ValueError(f'Unknown Content-Disposition: {cd}') # Extract the "name" header argument + has_name = False for cdarg in cdargs: k, v = cdarg.split('=', 1) if k.strip() == 'name': + has_name = True name: str = v.strip() # Remove quotation marks around the name value if name.startswith('"') and name.endswith('"'): name = v[1:-1] # Add the Content-Type and the content to the header, with the provided name if name not in args: - args[name] = list() - args[name].append((hdr['Content-Type'].strip(), part)) + args[name] = RequestArgument(name) + args[name].append(hdr['Content-Type'].strip(), part) + if not has_name: + raise ValueError('mutlipart/form-data part without name attribute') - return args + return list(args.values()) def parse_args(request: str, postbody: Optional[bytes] = None, enctype: str = 'text/plain') \ - -> Tuple[str, Dict[str, Tuple[str, Union[bytes, str, List[str]]]]]: + -> Tuple[str, Dict[str, RequestArgument]]: """ Given a HTTP request path, and optionally a HTTP POST body in application/x-www-form-urlencoded or multipart/form-data form, parse the arguments and return them as a dictionary. @@ -85,34 +92,41 @@ def parse_args(request: str, postbody: Optional[bytes] = None, enctype: str = 't # Parse the request "URL" (i.e. 
only the path) tokens = urllib.parse.urlparse(request) # Parse the GET arguments - getargs = urllib.parse.parse_qs(tokens.query) + if len(tokens.query) == 0: + getargs = dict() + else: + getargs = urllib.parse.parse_qs(tokens.query, strict_parsing=True, keep_blank_values=True, errors='strict') - # TODO: { 'foo': [ ('text/plain', 'bar'), ('application/octet-stream', '\x80') ] } - # TODO: Use a @dataclass once Python 3.7 is out - args: Dict[str, Tuple[str, Union[bytes, str, List[str]]]] = dict() + args: Dict[str, RequestArgument] = dict() for k, v in getargs.items(): - args[k] = 'text/plain', v + args[k] = RequestArgument(k) + for _v in v: + args[k].append('text/plain', _v) if postbody is not None: if enctype == 'application/x-www-form-urlencoded': # Parse the POST body - postargs = urllib.parse.parse_qs(postbody.decode('utf-8')) + pb: str = postbody.decode('utf-8') + if len(pb) == 0: + postargs = dict() + else: + postargs = urllib.parse.parse_qs(pb, strict_parsing=True, keep_blank_values=True, errors='strict') # Write all POST values into the dict, overriding potential duplicates from GET for k, v in postargs.items(): - args[k] = 'text/plain', v + args[k] = RequestArgument(k) + for _v in v: + args[k].append('text/plain', _v) elif enctype.startswith('multipart/form-data'): # Parse the multipart boundary from the Content-Type header - boundary: str = enctype.split('boundary=')[1] + try: + boundary: str = enctype.split('boundary=')[1].strip() + except IndexError: + raise ValueError('Multipart boundary in header not set or corrupted') # Parse the multipart body mpargs = _parse_multipart(postbody, boundary) - for k, v in mpargs.items(): - # TODO: Process all values, not just the first - args[k] = v[0] + for ra in mpargs: + args[ra.name] = ra else: raise ValueError(f'Unsupported Content-Type: {enctype}') - # urllib.parse.parse_qs turns ALL arguments into arrays. 
This turns arrays of length 1 into scalar values - for (k, (ct, v)) in args.items(): - if len(v) == 1: - args[k] = ct, v[0] # Return the path and the parsed arguments return tokens.path, args