From f702eccc57e38789687175cc118870a8611b999e Mon Sep 17 00:00:00 2001 From: s3lph Date: Wed, 27 Jun 2018 21:17:18 +0200 Subject: [PATCH] First implementation of multipart/form-data parsing --- matemat/webserver/httpd.py | 90 ++++++------- matemat/webserver/pagelets/__init__.py | 1 + matemat/webserver/pagelets/login.py | 19 ++- matemat/webserver/pagelets/logout.py | 9 +- matemat/webserver/pagelets/main.py | 8 +- matemat/webserver/pagelets/touchkey.py | 18 ++- matemat/webserver/pagelets/upload_test.py | 28 ++++ matemat/webserver/test/abstract_httpd_test.py | 28 ++-- matemat/webserver/test/test_post.py | 123 ++++++++++++++---- matemat/webserver/test/test_serve.py | 19 +-- matemat/webserver/test/test_session.py | 4 +- matemat/webserver/util.py | 118 +++++++++++++++++ 12 files changed, 357 insertions(+), 108 deletions(-) create mode 100644 matemat/webserver/pagelets/upload_test.py create mode 100644 matemat/webserver/util.py diff --git a/matemat/webserver/httpd.py b/matemat/webserver/httpd.py index 220849c..a4e9cca 100644 --- a/matemat/webserver/httpd.py +++ b/matemat/webserver/httpd.py @@ -1,12 +1,11 @@ -from typing import Any, Callable, Dict, List, Optional, Tuple, Union +from typing import Any, Callable, Dict, List, Tuple, Union import traceback import os import socket import mimetypes -import urllib.parse from socketserver import TCPServer from http.server import HTTPServer, BaseHTTPRequestHandler from http.cookies import SimpleCookie @@ -14,6 +13,7 @@ from uuid import uuid4 from datetime import datetime, timedelta from matemat import __version__ as matemat_version +from matemat.webserver.util import parse_args # @@ -28,12 +28,17 @@ BaseHTTPRequestHandler.log_error = lambda self, fstring='', *args: None # Dictionary to hold registered pagelet paths and their handler functions -_PAGELET_PATHS: Dict[str, Callable[[str, str, Dict[str, str], Dict[str, Any], Dict[str, str], bytes], - Tuple[int, Union[bytes, str]]]] = dict() +_PAGELET_PATHS: Dict[str, Callable[[str, # 
HTTP method (GET, POST, ...) + str, # Request path + Dict[str, Tuple[str, Union[bytes, str, List[str]]]], # args: (name, (type, value)) + Dict[str, Any], # Session vars + Dict[str, str]], # Response headers + Tuple[int, Union[bytes, str]]]] = dict() # Returns: (status code, response body) # Inactivity timeout for client sessions _SESSION_TIMEOUT: int = 3600 +_MAX_POST: int = 1_000_000 def pagelet(path: str): @@ -43,12 +48,17 @@ def pagelet(path: str): The function must have the following signature: - (method: str, path: str, args: Dict[str, Union[str, List[str]], session_vars: Dict[str, Any], - headers: Dict[str, str]) -> (int, Optional[Union[str, bytes]]) + (method: str, + path: str, + args: Dict[str, Tuple[str, Union[bytes, str, List[str]]]], + session_vars: Dict[str, Any], + headers: Dict[str, str]) + -> (int, Optional[Union[str, bytes]]) method: The HTTP method (GET, POST) that was used. path: The path that was requested. - args: The arguments that were passed with the request (as GET or POST arguments). + args: The arguments that were passed with the request (as GET or POST arguments), each of which may be + either a str or bytes object, or a list of str. session_vars: The session storage. May be read from and written to. headers: The dictionary of HTTP response headers. Add headers you wish to send with the response. returns: A tuple consisting of the HTTP status code (as an int) and the response body (as str or bytes, @@ -56,7 +66,12 @@ def pagelet(path: str): :param path: The path to register the function for. 
""" - def http_handler(fun: Callable[[str, str, Dict[str, str], Dict[str, Any], Dict[str, str], bytes], + + def http_handler(fun: Callable[[str, + str, + Dict[str, Tuple[str, Union[bytes, str, List[str]]]], + Dict[str, Any], + Dict[str, str]], Tuple[int, Union[bytes, str]]]): # Add the function to the dict of pagelets _PAGELET_PATHS[path] = fun @@ -166,7 +181,7 @@ class HttpHandler(BaseHTTPRequestHandler): if session_id in self.server.session_vars: del self.server.session_vars[session_id] - def _handle(self, method: str, path: str, args: Dict[str, Union[str, List[str]]]) -> None: + def _handle(self, method: str, path: str, args: Dict[str, Tuple[str, Union[bytes, str, List[str]]]]) -> None: """ Handle a HTTP request by either dispatching it to the appropriate pagelet or by serving a static resource. @@ -238,7 +253,7 @@ class HttpHandler(BaseHTTPRequestHandler): mimetype = 'application/octet-stream' # Send content type and length header self.send_header('Content-Type', mimetype) - self.send_header('Content-Length', len(data)) + self.send_header('Content-Length', str(len(data))) self.end_headers() # Send the requested resource as response body self.wfile.write(data) @@ -247,36 +262,6 @@ class HttpHandler(BaseHTTPRequestHandler): self.send_response(404) self.end_headers() - @staticmethod - def _parse_args(request: str, postbody: Optional[str] = None) -> Tuple[str, Dict[str, Union[str, List[str]]]]: - """ - Given a HTTP request path, and optionally a HTTP POST body in application/x-www-form-urlencoded form, parse the - arguments and return them as a dictionary. - - If a key is used both in GET and in POST, the POST value takes precedence, and the GET value is discarded. - - :param request: The request string to parse. - :param postbody: The POST body to parse, defaults to None. - :return: A tuple consisting of the base path and a dictionary with the parsed key/value pairs. - """ - # Parse the request "URL" (i.e. 
only the path) - tokens = urllib.parse.urlparse(request) - # Parse the GET arguments - args = urllib.parse.parse_qs(tokens.query) - - if postbody is not None: - # Parse the POST body - postargs = urllib.parse.parse_qs(postbody) - # Write all POST values into the dict, overriding potential duplicates from GET - for k, v in postargs.items(): - args[k] = v - # urllib.parse.parse_qs turns ALL arguments into arrays. This turns arrays of length 1 into scalar values - for k, v in args.items(): - if len(v) == 1: - args[k] = v[0] - # Return the path and the parsed arguments - return tokens.path, args - # noinspection PyPep8Naming def do_GET(self) -> None: """ @@ -284,7 +269,7 @@ class HttpHandler(BaseHTTPRequestHandler): """ try: # Parse the request and hand it to the handle function - path, args = self._parse_args(self.path) + path, args = parse_args(self.path) self._handle('GET', path, args) # Special handling for some errors except PermissionError: @@ -305,25 +290,24 @@ class HttpHandler(BaseHTTPRequestHandler): """ try: # Read the POST body, if it exists, and its MIME type is application/x-www-form-urlencoded - clen: str = self.headers.get('Content-Length', failobj='0') + clen: int = int(str(self.headers.get('Content-Length', failobj='0'))) + if clen > _MAX_POST: + raise ValueError('Request too big') ctype: str = self.headers.get('Content-Type', failobj='application/octet-stream') - post: str = '' - if ctype == 'application/x-www-form-urlencoded': - post = self.rfile.read(int(clen)).decode('utf-8') + post: bytes = self.rfile.read(clen) + path, args = parse_args(self.path, postbody=post, enctype=ctype) # Parse the request and hand it to the handle function - path, args = self._parse_args(self.path, postbody=post) self._handle('POST', path, args) - # Special handling for some errors - except PermissionError as e: + # Special handling for some errors + except PermissionError: self.send_response(403, 'Forbidden') self.end_headers() - print(e) - 
traceback.print_tb(e.__traceback__) - except ValueError as e: + except ValueError: + self.send_response(400, 'Bad Request') + self.end_headers() + except TypeError: self.send_response(400, 'Bad Request') self.end_headers() - print(e) - traceback.print_tb(e.__traceback__) except BaseException as e: # Generic error handling self.send_response(500, 'Internal Server Error') diff --git a/matemat/webserver/pagelets/__init__.py b/matemat/webserver/pagelets/__init__.py index 9b926d6..71ded5e 100644 --- a/matemat/webserver/pagelets/__init__.py +++ b/matemat/webserver/pagelets/__init__.py @@ -8,3 +8,4 @@ from .main import main_page from .login import login_page from .logout import logout from .touchkey import touchkey_page +from .upload_test import upload_test diff --git a/matemat/webserver/pagelets/login.py b/matemat/webserver/pagelets/login.py index 876fd71..8fbe831 100644 --- a/matemat/webserver/pagelets/login.py +++ b/matemat/webserver/pagelets/login.py @@ -1,5 +1,5 @@ -from typing import Any, Dict +from typing import Any, Dict, List, Optional, Tuple, Union from matemat.exceptions import AuthenticationError from matemat.webserver import pagelet @@ -8,7 +8,12 @@ from matemat.db import MatematDatabase @pagelet('/login') -def login_page(method: str, path: str, args: Dict[str, str], session_vars: Dict[str, Any], headers: Dict[str, str]): +def login_page(method: str, + path: str, + args: Dict[str, Tuple[str, Union[bytes, str, List[str]]]], + session_vars: Dict[str, Any], + headers: Dict[str, str])\ + -> Tuple[int, Optional[Union[str, bytes]]]: if 'user' in session_vars: headers['Location'] = '/' return 301, None @@ -38,13 +43,19 @@ def login_page(method: str, path: str, args: Dict[str, str], session_vars: Dict[ ''' return 200, data.format(msg=args['msg'] if 'msg' in args else '') elif method == 'POST': - print(args) + if 'username' not in args or not isinstance(args['username'], str): + return 400, None + if 'password' not in args or not isinstance(args['password'], str): + 
return 400, None + username: str = args['username'] + password: str = args['password'] with MatematDatabase('test.db') as db: try: - user: User = db.login(args['username'], args['password']) + user: User = db.login(username, password) except AuthenticationError: headers['Location'] = '/login?msg=Username%20or%20password%20wrong.%20Please%20try%20again.' return 301, bytes() session_vars['user'] = user headers['Location'] = '/' return 301, bytes() + return 405, None diff --git a/matemat/webserver/pagelets/logout.py b/matemat/webserver/pagelets/logout.py index 86095b0..53a292a 100644 --- a/matemat/webserver/pagelets/logout.py +++ b/matemat/webserver/pagelets/logout.py @@ -1,11 +1,16 @@ -from typing import Any, Dict +from typing import Any, Dict, List, Optional, Tuple, Union from matemat.webserver import pagelet @pagelet('/logout') -def logout(method: str, path: str, args: Dict[str, str], session_vars: Dict[str, Any], headers: Dict[str, str]): +def logout(method: str, + path: str, + args: Dict[str, Tuple[str, Union[bytes, str, List[str]]]], + session_vars: Dict[str, Any], + headers: Dict[str, str])\ + -> Tuple[int, Optional[Union[str, bytes]]]: if 'user' in session_vars: del session_vars['user'] headers['Location'] = '/' diff --git a/matemat/webserver/pagelets/main.py b/matemat/webserver/pagelets/main.py index 2ead15d..d2dd208 100644 --- a/matemat/webserver/pagelets/main.py +++ b/matemat/webserver/pagelets/main.py @@ -1,5 +1,5 @@ -from typing import Any, Dict, Optional, Tuple, Union +from typing import Any, Dict, List, Optional, Tuple, Union from matemat.webserver import MatematWebserver, pagelet from matemat.primitives import User @@ -7,7 +7,11 @@ from matemat.db import MatematDatabase @pagelet('/') -def main_page(method: str, path: str, args: Dict[str, str], session_vars: Dict[str, Any], headers: Dict[str, str])\ +def main_page(method: str, + path: str, + args: Dict[str, Tuple[str, Union[bytes, str, List[str]]]], + session_vars: Dict[str, Any], + headers: Dict[str, 
str])\ -> Tuple[int, Optional[Union[str, bytes]]]: data = ''' diff --git a/matemat/webserver/pagelets/touchkey.py b/matemat/webserver/pagelets/touchkey.py index fd99fea..2a8202d 100644 --- a/matemat/webserver/pagelets/touchkey.py +++ b/matemat/webserver/pagelets/touchkey.py @@ -1,5 +1,5 @@ -from typing import Any, Dict +from typing import Any, Dict, List, Optional, Tuple, Union from matemat.exceptions import AuthenticationError from matemat.webserver import pagelet @@ -8,7 +8,12 @@ from matemat.db import MatematDatabase @pagelet('/touchkey') -def touchkey_page(method: str, path: str, args: Dict[str, str], session_vars: Dict[str, Any], headers: Dict[str, str]): +def touchkey_page(method: str, + path: str, + args: Dict[str, Tuple[str, Union[bytes, str, List[str]]]], + session_vars: Dict[str, Any], + headers: Dict[str, str])\ + -> Tuple[int, Optional[Union[str, bytes]]]: if 'user' in session_vars: headers['Location'] = '/' return 301, bytes() @@ -37,12 +42,19 @@ def touchkey_page(method: str, path: str, args: Dict[str, str], session_vars: Di ''' return 200, data.format(username=args['username'] if 'username' in args else '') elif method == 'POST': + if 'username' not in args or not isinstance(args['username'], str): + return 400, None + if 'touchkey' not in args or not isinstance(args['touchkey'], str): + return 400, None + username: str = args['username'] + touchkey: str = args['touchkey'] with MatematDatabase('test.db') as db: try: - user: User = db.login(args['username'], touchkey=args['touchkey']) + user: User = db.login(username, touchkey=touchkey) except AuthenticationError: headers['Location'] = f'/touchkey?username={args["username"]}&msg=Please%20try%20again.' 
return 301, bytes() session_vars['user'] = user headers['Location'] = '/' return 301, None + return 405, None diff --git a/matemat/webserver/pagelets/upload_test.py b/matemat/webserver/pagelets/upload_test.py new file mode 100644 index 0000000..a6f1e85 --- /dev/null +++ b/matemat/webserver/pagelets/upload_test.py @@ -0,0 +1,28 @@ + +from typing import Any, Dict, Union + +from matemat.webserver import pagelet + + +@pagelet('/upload') +def upload_test(method: str, + path: str, + args: Dict[str, Union[str, bytes]], + session_vars: Dict[str, Any], + headers: Dict[str, str]): + if method == 'GET': + return 200, ''' + + + +
+ + + +
+ + + ''' + else: + headers['Content-Type'] = 'text/plain' + return 200, args.items().__str__() diff --git a/matemat/webserver/test/abstract_httpd_test.py b/matemat/webserver/test/abstract_httpd_test.py index de0daf6..b96767e 100644 --- a/matemat/webserver/test/abstract_httpd_test.py +++ b/matemat/webserver/test/abstract_httpd_test.py @@ -1,5 +1,5 @@ -from typing import Any, Callable, Dict, Tuple, Union +from typing import Any, Callable, Dict, List, Tuple, Union import unittest.mock from io import BytesIO @@ -31,8 +31,8 @@ class HttpResponse: 'Content-Length': 0 } self.pagelet: str = None - # The response body. Only UTF-8 strings are supported - self.body: str = '' + # The response body + self.body: bytes = bytes() # Parsing phase, one of 'begin', 'hdr', 'body' or 'done' self.parse_phase = 'begin' # Buffer for uncompleted lines @@ -55,7 +55,7 @@ class HttpResponse: return # If in the body phase, simply decode and append to the body, while the body is not complete yet elif self.parse_phase == 'body': - self.body += fragment.decode('utf-8') + self.body += fragment if len(self.body) >= int(self.headers['Content-Length']): self.__finalize() return @@ -66,24 +66,24 @@ class HttpResponse: if not fragment.endswith(b'\r\n'): # Special treatment for no trailing CR-LF: Add remainder to buffer head, tail = fragment.rsplit(b'\r\n', 1) - data: str = (self.buffer + head).decode('utf-8') + data: bytes = (self.buffer + head) self.buffer = tail else: - data: str = (self.buffer + fragment).decode('utf-8') + data: bytes = (self.buffer + fragment) self.buffer = bytes() # Iterate the lines that are ready to be parsed - for line in data.split('\r\n'): + for line in data.split(b'\r\n'): # The 'begin' phase indicates that the parser is waiting for the HTTP status line if self.parse_phase == 'begin': - if line.startswith('HTTP/'): + if line.startswith(b'HTTP/'): # Parse the statuscode and advance to header parsing - _, statuscode, _ = line.split(' ', 2) + _, statuscode, _ = 
line.decode('utf-8').split(' ', 2) self.statuscode = int(statuscode) self.parse_phase = 'hdr' elif self.parse_phase == 'hdr': # Parse a header line and add it to the header dict if len(line) > 0: - k, v = line.split(':', 1) + k, v = line.decode('utf-8').split(':', 1) self.headers[k.strip()] = v.strip() else: # Empty line separates header from body @@ -156,12 +156,16 @@ class MockSocket(bytes): def test_pagelet(path: str): - def with_testing_headers(fun: Callable[[str, str, Dict[str, str], Dict[str, Any], Dict[str, str]], + def with_testing_headers(fun: Callable[[str, + str, + Dict[str, Tuple[str, Union[bytes, str, List[str]]]], + Dict[str, Any], + Dict[str, str]], Tuple[int, Union[bytes, str]]]): @pagelet(path) def testing_wrapper(method: str, path: str, - args: Dict[str, str], + args: Dict[str, Tuple[str, Union[bytes, str, List[str]]]], session_vars: Dict[str, Any], headers: Dict[str, str]): status, body = fun(method, path, args, session_vars, headers) diff --git a/matemat/webserver/test/test_post.py b/matemat/webserver/test/test_post.py index ad99247..511c6e3 100644 --- a/matemat/webserver/test/test_post.py +++ b/matemat/webserver/test/test_post.py @@ -1,14 +1,16 @@ -from typing import Any, Dict, List +from typing import Any, Dict, List, Tuple,Union from matemat.webserver.httpd import HttpHandler from matemat.webserver.test.abstract_httpd_test import AbstractHttpdTest, test_pagelet +import codecs + @test_pagelet('/just/testing/post') def post_test_pagelet(method: str, path: str, - args: Dict[str, str], + args: Dict[str, Tuple[str, Union[bytes, str, List[str]]]], session_vars: Dict[str, Any], headers: Dict[str, str]): """ @@ -16,8 +18,13 @@ def post_test_pagelet(method: str, """ headers['Content-Type'] = 'text/plain' dump: str = '' - for k, v in args.items(): - dump += f'{k}: {v if isinstance(v, str) else ",".join(v)}\n' + for k, (t, v) in args.items(): + if t.startswith('text/'): + if isinstance(v, bytes): + v = v.decode('utf-8') + dump += f'{k}: {",".join(v) if 
isinstance(v, list) else v}\n' + else: + dump += f'{k}: {codecs.encode(v, "hex").decode("utf-8")}\n' return 200, dump @@ -26,7 +33,7 @@ class TestPost(AbstractHttpdTest): Test cases for the content serving of the web server. """ - def test_post_get_only_args(self): + def test_post_urlenc_get_only_args(self): """ Test a POST request that only contains GET arguments. """ @@ -38,17 +45,17 @@ class TestPost(AbstractHttpdTest): packet = self.client_sock.get_response() # Parse response body - lines: List[str] = packet.body.split('\n')[:-1] + lines: List[bytes] = packet.body.split(b'\n')[:-1] kv: Dict[str, str] = dict() for l in lines: - k, v = l.split(':', 1) + k, v = l.decode('utf-8').split(':', 1) kv[k.strip()] = v.strip() if ',' not in v else v.strip().split(',') # Make sure the arguments were properly parsed self.assertEqual('bar', kv['foo']) self.assertEqual('1', kv['test']) - def test_post_post_only_args(self): + def test_post_urlenc_post_only_args(self): """ Test a POST request that only contains POST arguments (urlencoded). """ @@ -61,17 +68,17 @@ class TestPost(AbstractHttpdTest): packet = self.client_sock.get_response() # Parse response body - lines: List[str] = packet.body.split('\n')[:-1] + lines: List[bytes] = packet.body.split(b'\n')[:-1] kv: Dict[str, str] = dict() for l in lines: - k, v = l.split(':', 1) + k, v = l.decode('utf-8').split(':', 1) kv[k.strip()] = v.strip() if ',' not in v else v.strip().split(',') # Make sure the arguments were properly parsed self.assertEqual('bar', kv['foo']) self.assertEqual('1', kv['test']) - def test_post_mixed_args(self): + def test_post_urlenc_mixed_args(self): """ Test that mixed POST and GET args are properly parsed, and that POST takes precedence over GET. 
""" @@ -84,10 +91,10 @@ class TestPost(AbstractHttpdTest): packet = self.client_sock.get_response() # Parse response body - lines: List[str] = packet.body.split('\n')[:-1] + lines: List[bytes] = packet.body.split(b'\n')[:-1] kv: Dict[str, str] = dict() for l in lines: - k, v = l.split(':', 1) + k, v = l.decode('utf-8').split(':', 1) kv[k.strip()] = v.strip() if ',' not in v else v.strip().split(',') # Make sure the arguments were properly parsed @@ -95,7 +102,7 @@ class TestPost(AbstractHttpdTest): self.assertEqual('1', kv['gettest']) self.assertEqual('2', kv['posttest']) - def test_post_get_array(self): + def test_post_urlenc_get_array(self): """ Test a POST request that contains GET array arguments. """ @@ -107,17 +114,17 @@ class TestPost(AbstractHttpdTest): packet = self.client_sock.get_response() # Parse response body - lines: List[str] = packet.body.split('\n')[:-1] + lines: List[bytes] = packet.body.split(b'\n')[:-1] kv: Dict[str, str] = dict() for l in lines: - k, v = l.split(':', 1) + k, v = l.decode('utf-8').split(':', 1) kv[k.strip()] = v.strip() if ',' not in v else v.strip().split(',') # Make sure the arguments were properly parsed self.assertListEqual(['bar', 'baz'], kv['foo']) self.assertEqual('1', kv['test']) - def test_post_post_array(self): + def test_post_urlenc_post_array(self): """ Test a POST request that contains POST array arguments. 
""" @@ -130,17 +137,17 @@ class TestPost(AbstractHttpdTest): packet = self.client_sock.get_response() # Parse response body - lines: List[str] = packet.body.split('\n')[:-1] + lines: List[bytes] = packet.body.split(b'\n')[:-1] kv: Dict[str, str] = dict() for l in lines: - k, v = l.split(':', 1) + k, v = l.decode('utf-8').split(':', 1) kv[k.strip()] = v.strip() if ',' not in v else v.strip().split(',') # Make sure the arguments were properly parsed self.assertListEqual(['bar', 'baz'], kv['foo']) self.assertEqual('1', kv['test']) - def test_post_mixed_array(self): + def test_post_urlenc_mixed_array(self): """ Test a POST request that contains both GET and POST array arguments. """ @@ -153,13 +160,85 @@ class TestPost(AbstractHttpdTest): packet = self.client_sock.get_response() # Parse response body - lines: List[str] = packet.body.split('\n')[:-1] + lines: List[bytes] = packet.body.split(b'\n')[:-1] kv: Dict[str, str] = dict() for l in lines: - k, v = l.split(':', 1) + k, v = l.decode('utf-8').split(':', 1) kv[k.strip()] = v.strip() if ',' not in v else v.strip().split(',') # Make sure the arguments were properly parsed self.assertListEqual(['postbar', 'postbaz'], kv['foo']) self.assertListEqual(['1', '42'], kv['gettest']) self.assertListEqual(['1', '2'], kv['posttest']) + + def test_post_no_body(self): + """ + Test a POST request that contains no headers or body. + """ + # Send POST request + self.client_sock.set_request(b'POST /just/testing/post?foo=bar HTTP/1.1\r\n\r\n') + HttpHandler(self.client_sock, ('::1', 45678), self.server) + packet = self.client_sock.get_response() + # Make sure a 400 Bad Request is returned + self.assertEqual(400, packet.statuscode) + + def test_post_multipart_post_only(self): + """ + Test a POST request with a miltipart/form-data body. 
+ """ + # Send POST request + formdata = (b'------testboundary\r\n' + b'Content-Disposition: form-data; name="foo"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'Hello, World!\r\n' + b'------testboundary\r\n' + b'Content-Disposition: form-data; name="bar"; filename="foo.bar"\r\n' + b'Content-Type: application/octet-stream\r\n\r\n' + b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x80\x0b\x0c\x73\x0e\x0f\r\n' + b'------testboundary--\r\n') + + self.client_sock.set_request(f'POST /just/testing/post HTTP/1.1\r\n' + f'Content-Type: multipart/form-data; boundary=----testboundary\r\n' + f'Content-Length: {len(formdata)}\r\n\r\n'.encode('utf-8') + formdata) + HttpHandler(self.client_sock, ('::1', 45678), self.server) + packet = self.client_sock.get_response() + lines: List[bytes] = packet.body.split(b'\n')[:-1] + kv: Dict[str, Any] = dict() + for l in lines: + k, v = l.split(b':', 1) + kv[k.decode('utf-8').strip()] = v.strip() + self.assertIn('foo', kv) + self.assertIn('bar', kv) + self.assertEqual(kv['foo'], b'Hello, World!') + self.assertEqual(kv['bar'], b'00010203040506070809800b0c730e0f') + + def test_post_multipart_mixed(self): + """ + Test a POST request with a miltipart/form-data body. 
+ """ + # Send POST request + formdata = (b'------testboundary\r\n' + b'Content-Disposition: form-data; name="foo"\r\n' + b'Content-Type: text/plain\r\n\r\n' + b'Hello, World!\r\n' + b'------testboundary\r\n' + b'Content-Disposition: form-data; name="bar"; filename="foo.bar"\r\n' + b'Content-Type: application/octet-stream\r\n\r\n' + b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x80\x0b\x0c\x73\x0e\x0f\r\n' + b'------testboundary--\r\n') + + self.client_sock.set_request(f'POST /just/testing/post?getfoo=bar&foo=thisshouldbegone HTTP/1.1\r\n' + f'Content-Type: multipart/form-data; boundary=----testboundary\r\n' + f'Content-Length: {len(formdata)}\r\n\r\n'.encode('utf-8') + formdata) + HttpHandler(self.client_sock, ('::1', 45678), self.server) + packet = self.client_sock.get_response() + lines: List[bytes] = packet.body.split(b'\n')[:-1] + kv: Dict[str, Any] = dict() + for l in lines: + k, v = l.split(b':', 1) + kv[k.decode('utf-8').strip()] = v.strip() + self.assertIn('foo', kv) + self.assertIn('bar', kv) + self.assertEqual(kv['getfoo'], b'bar') + self.assertEqual(kv['foo'], b'Hello, World!') + self.assertEqual(kv['bar'], b'00010203040506070809800b0c730e0f') diff --git a/matemat/webserver/test/test_serve.py b/matemat/webserver/test/test_serve.py index f3dc6be..0556764 100644 --- a/matemat/webserver/test/test_serve.py +++ b/matemat/webserver/test/test_serve.py @@ -1,5 +1,5 @@ -from typing import Any, Dict +from typing import Any, Dict, Union import os import os.path @@ -10,7 +10,7 @@ from matemat.webserver.test.abstract_httpd_test import AbstractHttpdTest, test_p @test_pagelet('/just/testing/serve_pagelet_ok') def serve_test_pagelet_ok(method: str, path: str, - args: Dict[str, str], + args: Dict[str, Union[bytes, str]], session_vars: Dict[str, Any], headers: Dict[str, str]): headers['Content-Type'] = 'text/plain' @@ -20,7 +20,7 @@ def serve_test_pagelet_ok(method: str, @test_pagelet('/just/testing/serve_pagelet_fail') def serve_test_pagelet_fail(method: str, path: str, - 
args: Dict[str, str], + args: Dict[str, Union[bytes, str]], session_vars: Dict[str, Any], headers: Dict[str, str]): session_vars['test'] = 'hello, world!' @@ -54,7 +54,7 @@ class TestServe(AbstractHttpdTest): self.assertEqual('serve_test_pagelet_ok', packet.pagelet) # Make sure the expected content is served self.assertEqual(200, packet.statuscode) - self.assertEqual('serve test pagelet ok', packet.body) + self.assertEqual(b'serve test pagelet ok', packet.body) def test_serve_pagelet_fail(self): # Call the test pagelet that produces a 500 Internal Server Error result @@ -66,7 +66,7 @@ class TestServe(AbstractHttpdTest): self.assertEqual('serve_test_pagelet_fail', packet.pagelet) # Make sure the expected content is served self.assertEqual(500, packet.statuscode) - self.assertEqual('serve test pagelet fail', packet.body) + self.assertEqual(b'serve test pagelet fail', packet.body) def test_serve_static_ok(self): # Request a static resource @@ -78,7 +78,7 @@ class TestServe(AbstractHttpdTest): self.assertIsNone(packet.pagelet) # Make sure the expected content is served self.assertEqual(200, packet.statuscode) - self.assertEqual('static resource test', packet.body) + self.assertEqual(b'static resource test', packet.body) def test_serve_static_forbidden(self): # Request a static resource with lacking permissions @@ -90,7 +90,7 @@ class TestServe(AbstractHttpdTest): self.assertIsNone(packet.pagelet) # Make sure a 403 header is served self.assertEqual(403, packet.statuscode) - self.assertNotEqual('This should not be readable', packet.body) + self.assertNotEqual(b'This should not be readable', packet.body) def test_serve_not_found(self): # Request a nonexistent resource @@ -116,7 +116,10 @@ class TestServe(AbstractHttpdTest): def test_static_post_not_allowed(self): # Request a resource outside the webroot - self.client_sock.set_request(b'POST /iwanttouploadthis HTTP/1.1\r\n\r\nq=this%20should%20not%20be%20uploaded') + self.client_sock.set_request(b'POST /iwanttopostthis 
from typing import Dict, List, Tuple, Optional, Union

import urllib.parse


def _unquote(value: str) -> str:
    """
    Strip surrounding whitespace and, if present, one pair of enclosing double quotes from a header parameter value.

    :param value: The raw parameter value, e.g. ' "foo"'.
    :return: The bare value, e.g. 'foo'.
    """
    value = value.strip()
    if len(value) >= 2 and value.startswith('"') and value.endswith('"'):
        value = value[1:-1]
    return value


def _parse_header_params(tokens: List[str]) -> Dict[str, str]:
    """
    Parse the ';'-separated 'key=value' parameters that follow the main value of a header.

    :param tokens: The parameter tokens, i.e. the header value split at ';' without its first element.
    :return: A dictionary of lower-cased parameter names to unquoted parameter values.  If a parameter occurs more
             than once, the last occurrence wins.
    :raises ValueError: If a non-empty token does not contain a '='.
    """
    params: Dict[str, str] = dict()
    for token in tokens:
        # Tolerate empty tokens, e.g. caused by a trailing ';'
        if token.strip() == '':
            continue
        key, sep, value = token.partition('=')
        if sep == '':
            raise ValueError(f'Malformed header parameter: {token.strip()}')
        params[key.strip().lower()] = _unquote(value)
    return params


def _parse_multipart(body: bytes, boundary: str) -> Dict[str, List[Tuple[str, Union[bytes, str]]]]:
    """
    Given a HTTP body with form-data in multipart form, and the multipart-boundary, parse the multipart items and
    return them as a dictionary.

    NOTE(review): RFC 7578 makes the Content-Type header of a part optional (defaulting to text/plain).  This parser
    keeps the stricter behavior of requiring it; confirm whether that is intended before relaxing it.

    :param body: The HTTP multipart/form-data body.
    :param boundary: The multipart boundary.
    :return: A dictionary of field names as key, and a list of (content type, field value) tuples as value.
    :raises ValueError: If the body is not well-formed, or a part lacks a required header or its field name.
    """
    # Generate item header boundary and terminating boundary from general boundary string
    _boundary = f'\r\n--{boundary}\r\n'.encode('utf-8')
    _end_boundary = f'\r\n--{boundary}--\r\n'.encode('utf-8')
    # Split at the end boundary and make sure there comes nothing after it
    allparts = body.split(_end_boundary, 1)
    if len(allparts) != 2 or allparts[1] != b'':
        raise ValueError('Last boundary missing or corrupted')
    # Split remaining body into its parts (prepending a CRLF for the first boundary to match), and verify that the
    # body actually started with a boundary
    parts: List[bytes] = (b'\r\n' + allparts[0]).split(_boundary)
    if parts[0] != b'':
        raise ValueError('First boundary missing or corrupted')
    # Remove the first, empty part
    parts = parts[1:]

    # Results go into this dict
    args: Dict[str, List[Tuple[str, Union[bytes, str]]]] = dict()

    # Parse each multipart part
    for part in parts:
        # Parse multipart headers; header names are case-insensitive, so they are stored lower-cased
        hdr: Dict[str, str] = dict()
        while True:
            head, sep, part = part.partition(b'\r\n')
            if sep == b'':
                raise ValueError('Header/body delimiter missing')
            # Break on header/body delimiter
            if head == b'':
                break
            # Split at the FIRST colon only; header values may legitimately contain colons
            hk, colon, hv = head.decode('utf-8').partition(':')
            if colon == '':
                raise ValueError(f'Malformed header line: {hk.strip()}')
            hdr[hk.strip().lower()] = hv.strip()
        # At least Content-Type and Content-Disposition must be present
        if 'content-type' not in hdr or 'content-disposition' not in hdr:
            raise ValueError('Missing Content-Type or Content-Disposition header')
        # Extract Content-Disposition header value and its arguments
        cd, *cdargs = hdr['content-disposition'].split(';')
        # Content-Disposition MUST be form-data; everything else is rejected
        if cd.strip().lower() != 'form-data':
            raise ValueError(f'Unknown Content-Disposition: {cd.strip()}')
        # Extract the "name" header argument.  It is looked up freshly for every part, so a part without a name can
        # neither crash with an unbound variable nor inherit the name of the previous part.
        name: Optional[str] = _parse_header_params(cdargs).get('name')
        if name is None:
            raise ValueError('Content-Disposition is missing the "name" parameter')
        # Add the Content-Type and the content to the result, with the provided name
        args.setdefault(name, list()).append((hdr['content-type'], part))

    return args


def parse_args(request: str, postbody: Optional[bytes] = None, enctype: str = 'text/plain') \
        -> Tuple[str, Dict[str, Tuple[str, Union[bytes, str, List[str]]]]]:
    """
    Given a HTTP request path, and optionally a HTTP POST body in application/x-www-form-urlencoded or
    multipart/form-data form, parse the arguments and return them as a dictionary.

    If a key is used both in GET and in POST, the POST value takes precedence, and the GET value is discarded.

    :param request: The request string to parse.
    :param postbody: The POST body to parse, defaults to None.
    :param enctype: The Content-Type header value of the POST body, including its parameters; supported media types
                    are application/x-www-form-urlencoded and multipart/form-data (which requires a "boundary"
                    parameter).
    :return: A tuple consisting of the base path and a dictionary with the parsed key/value pairs, and the value's
             content type.  URL-encoded values are a str, or a list of str if the key occurred more than once;
             multipart values are bytes.
    :raises ValueError: If a POST body is given and its media type is unsupported, the multipart boundary is missing,
                        or the body is malformed.
    """
    # Parse the request "URL" (i.e. only the path)
    tokens = urllib.parse.urlparse(request)
    # Parse the GET arguments
    getargs = urllib.parse.parse_qs(tokens.query)

    # TODO: { 'foo': [ ('text/plain', 'bar'), ('application/octet-stream', '\x80') ] }
    # TODO: Use a @dataclass once Python 3.7 is out
    args: Dict[str, Tuple[str, Union[bytes, str, List[str]]]] = dict()
    for k, v in getargs.items():
        args[k] = 'text/plain', v

    if postbody is not None:
        # Separate the media type from its parameters (e.g. "; charset=UTF-8" or "; boundary=...").  The media type
        # is case-insensitive, parameter values (such as the boundary) are not.
        mediatype, *ctargs = enctype.split(';')
        mediatype = mediatype.strip().lower()
        if mediatype == 'application/x-www-form-urlencoded':
            # Parse the POST body
            postargs = urllib.parse.parse_qs(postbody.decode('utf-8'))
            # Write all POST values into the dict, overriding potential duplicates from GET
            for k, v in postargs.items():
                args[k] = 'text/plain', v
        elif mediatype == 'multipart/form-data':
            # Parse the multipart boundary from the Content-Type header
            boundary: Optional[str] = _parse_header_params(ctargs).get('boundary')
            if not boundary:
                raise ValueError('Missing multipart boundary')
            # Parse the multipart body
            mpargs = _parse_multipart(postbody, boundary)
            for k, v in mpargs.items():
                # TODO: Process all values, not just the first
                args[k] = v[0]
        else:
            raise ValueError(f'Unsupported Content-Type: {enctype}')
    # urllib.parse.parse_qs turns ALL arguments into arrays. This turns arrays of length 1 into scalar values.
    # Only lists are unwrapped: indexing a bytes value of length 1 would turn a one-byte upload into an int.
    for k, (ct, v) in list(args.items()):
        if isinstance(v, list) and len(v) == 1:
            args[k] = ct, v[0]
    # Return the path and the parsed arguments
    return tokens.path, args