First implementation of multipart/form-data parsing

This commit is contained in:
s3lph 2018-06-27 21:17:18 +02:00
parent 5ccb2c9304
commit f702eccc57
12 changed files with 357 additions and 108 deletions

View file

@ -1,12 +1,11 @@
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
from typing import Any, Callable, Dict, List, Tuple, Union
import traceback
import os
import socket
import mimetypes
import urllib.parse
from socketserver import TCPServer
from http.server import HTTPServer, BaseHTTPRequestHandler
from http.cookies import SimpleCookie
@ -14,6 +13,7 @@ from uuid import uuid4
from datetime import datetime, timedelta
from matemat import __version__ as matemat_version
from matemat.webserver.util import parse_args
#
@ -28,12 +28,17 @@ BaseHTTPRequestHandler.log_error = lambda self, fstring='', *args: None
# Dictionary to hold registered pagelet paths and their handler functions
_PAGELET_PATHS: Dict[str, Callable[[str, str, Dict[str, str], Dict[str, Any], Dict[str, str], bytes],
Tuple[int, Union[bytes, str]]]] = dict()
_PAGELET_PATHS: Dict[str, Callable[[str, # HTTP method (GET, POST, ...)
str, # Request path
Dict[str, Tuple[str, Union[bytes, str, List[str]]]], # args: (name, (type, value))
Dict[str, Any], # Session vars
Dict[str, str]], # Response headers
Tuple[int, Union[bytes, str]]]] = dict() # Returns: (status code, response body)
# Inactivity timeout for client sessions
_SESSION_TIMEOUT: int = 3600
_MAX_POST: int = 1_000_000
def pagelet(path: str):
@ -43,12 +48,17 @@ def pagelet(path: str):
The function must have the following signature:
(method: str, path: str, args: Dict[str, Union[str, List[str]], session_vars: Dict[str, Any],
headers: Dict[str, str]) -> (int, Optional[Union[str, bytes]])
(method: str,
path: str,
args: Dict[str, Tuple[str, Union[bytes, str, List[str]]]],
session_vars: Dict[str, Any],
headers: Dict[str, str])
-> (int, Optional[Union[str, bytes]])
method: The HTTP method (GET, POST) that was used.
path: The path that was requested.
args: The arguments that were passed with the request (as GET or POST arguments).
args: The arguments that were passed with the request (as GET or POST arguments), each of which may be
either a str or bytes object, or a list of str.
session_vars: The session storage. May be read from and written to.
headers: The dictionary of HTTP response headers. Add headers you wish to send with the response.
returns: A tuple consisting of the HTTP status code (as an int) and the response body (as str or bytes,
@ -56,7 +66,12 @@ def pagelet(path: str):
:param path: The path to register the function for.
"""
def http_handler(fun: Callable[[str, str, Dict[str, str], Dict[str, Any], Dict[str, str], bytes],
def http_handler(fun: Callable[[str,
str,
Dict[str, Tuple[str, Union[bytes, str, List[str]]]],
Dict[str, Any],
Dict[str, str]],
Tuple[int, Union[bytes, str]]]):
# Add the function to the dict of pagelets
_PAGELET_PATHS[path] = fun
@ -166,7 +181,7 @@ class HttpHandler(BaseHTTPRequestHandler):
if session_id in self.server.session_vars:
del self.server.session_vars[session_id]
def _handle(self, method: str, path: str, args: Dict[str, Union[str, List[str]]]) -> None:
def _handle(self, method: str, path: str, args: Dict[str, Tuple[str, Union[bytes, str, List[str]]]]) -> None:
"""
Handle a HTTP request by either dispatching it to the appropriate pagelet or by serving a static resource.
@ -238,7 +253,7 @@ class HttpHandler(BaseHTTPRequestHandler):
mimetype = 'application/octet-stream'
# Send content type and length header
self.send_header('Content-Type', mimetype)
self.send_header('Content-Length', len(data))
self.send_header('Content-Length', str(len(data)))
self.end_headers()
# Send the requested resource as response body
self.wfile.write(data)
@ -247,36 +262,6 @@ class HttpHandler(BaseHTTPRequestHandler):
self.send_response(404)
self.end_headers()
@staticmethod
def _parse_args(request: str, postbody: Optional[str] = None) -> Tuple[str, Dict[str, Union[str, List[str]]]]:
"""
Given a HTTP request path, and optionally a HTTP POST body in application/x-www-form-urlencoded form, parse the
arguments and return them as a dictionary.
If a key is used both in GET and in POST, the POST value takes precedence, and the GET value is discarded.
:param request: The request string to parse.
:param postbody: The POST body to parse, defaults to None.
:return: A tuple consisting of the base path and a dictionary with the parsed key/value pairs.
"""
# Parse the request "URL" (i.e. only the path)
tokens = urllib.parse.urlparse(request)
# Parse the GET arguments
args = urllib.parse.parse_qs(tokens.query)
if postbody is not None:
# Parse the POST body
postargs = urllib.parse.parse_qs(postbody)
# Write all POST values into the dict, overriding potential duplicates from GET
for k, v in postargs.items():
args[k] = v
# urllib.parse.parse_qs turns ALL arguments into arrays. This turns arrays of length 1 into scalar values
for k, v in args.items():
if len(v) == 1:
args[k] = v[0]
# Return the path and the parsed arguments
return tokens.path, args
# noinspection PyPep8Naming
def do_GET(self) -> None:
"""
@ -284,7 +269,7 @@ class HttpHandler(BaseHTTPRequestHandler):
"""
try:
# Parse the request and hand it to the handle function
path, args = self._parse_args(self.path)
path, args = parse_args(self.path)
self._handle('GET', path, args)
# Special handling for some errors
except PermissionError:
@ -305,25 +290,24 @@ class HttpHandler(BaseHTTPRequestHandler):
"""
try:
# Read the POST body, if it exists, and its MIME type is application/x-www-form-urlencoded
clen: str = self.headers.get('Content-Length', failobj='0')
clen: int = int(str(self.headers.get('Content-Length', failobj='0')))
if clen > _MAX_POST:
raise ValueError('Request too big')
ctype: str = self.headers.get('Content-Type', failobj='application/octet-stream')
post: str = ''
if ctype == 'application/x-www-form-urlencoded':
post = self.rfile.read(int(clen)).decode('utf-8')
post: bytes = self.rfile.read(clen)
path, args = parse_args(self.path, postbody=post, enctype=ctype)
# Parse the request and hand it to the handle function
path, args = self._parse_args(self.path, postbody=post)
self._handle('POST', path, args)
# Special handling for some errors
except PermissionError as e:
# Special handling for some errors
except PermissionError:
self.send_response(403, 'Forbidden')
self.end_headers()
print(e)
traceback.print_tb(e.__traceback__)
except ValueError as e:
except ValueError:
self.send_response(400, 'Bad Request')
self.end_headers()
except TypeError:
self.send_response(400, 'Bad Request')
self.end_headers()
print(e)
traceback.print_tb(e.__traceback__)
except BaseException as e:
# Generic error handling
self.send_response(500, 'Internal Server Error')

View file

@ -8,3 +8,4 @@ from .main import main_page
from .login import login_page
from .logout import logout
from .touchkey import touchkey_page
from .upload_test import upload_test

View file

@ -1,5 +1,5 @@
from typing import Any, Dict
from typing import Any, Dict, List, Optional, Tuple, Union
from matemat.exceptions import AuthenticationError
from matemat.webserver import pagelet
@ -8,7 +8,12 @@ from matemat.db import MatematDatabase
@pagelet('/login')
def login_page(method: str, path: str, args: Dict[str, str], session_vars: Dict[str, Any], headers: Dict[str, str]):
def login_page(method: str,
path: str,
args: Dict[str, Tuple[str, Union[bytes, str, List[str]]]],
session_vars: Dict[str, Any],
headers: Dict[str, str])\
-> Tuple[int, Optional[Union[str, bytes]]]:
if 'user' in session_vars:
headers['Location'] = '/'
return 301, None
@ -38,13 +43,19 @@ def login_page(method: str, path: str, args: Dict[str, str], session_vars: Dict[
'''
return 200, data.format(msg=args['msg'] if 'msg' in args else '')
elif method == 'POST':
print(args)
if 'username' not in args or not isinstance(args['username'], str):
return 400, None
if 'password' not in args or not isinstance(args['password'], str):
return 400, None
username: str = args['username']
password: str = args['password']
with MatematDatabase('test.db') as db:
try:
user: User = db.login(args['username'], args['password'])
user: User = db.login(username, password)
except AuthenticationError:
headers['Location'] = '/login?msg=Username%20or%20password%20wrong.%20Please%20try%20again.'
return 301, bytes()
session_vars['user'] = user
headers['Location'] = '/'
return 301, bytes()
return 405, None

View file

@ -1,11 +1,16 @@
from typing import Any, Dict
from typing import Any, Dict, List, Optional, Tuple, Union
from matemat.webserver import pagelet
@pagelet('/logout')
def logout(method: str, path: str, args: Dict[str, str], session_vars: Dict[str, Any], headers: Dict[str, str]):
def logout(method: str,
path: str,
args: Dict[str, Tuple[str, Union[bytes, str, List[str]]]],
session_vars: Dict[str, Any],
headers: Dict[str, str])\
-> Tuple[int, Optional[Union[str, bytes]]]:
if 'user' in session_vars:
del session_vars['user']
headers['Location'] = '/'

View file

@ -1,5 +1,5 @@
from typing import Any, Dict, Optional, Tuple, Union
from typing import Any, Dict, List, Optional, Tuple, Union
from matemat.webserver import MatematWebserver, pagelet
from matemat.primitives import User
@ -7,7 +7,11 @@ from matemat.db import MatematDatabase
@pagelet('/')
def main_page(method: str, path: str, args: Dict[str, str], session_vars: Dict[str, Any], headers: Dict[str, str])\
def main_page(method: str,
path: str,
args: Dict[str, Tuple[str, Union[bytes, str, List[str]]]],
session_vars: Dict[str, Any],
headers: Dict[str, str])\
-> Tuple[int, Optional[Union[str, bytes]]]:
data = '''
<DOCTYPE html>

View file

@ -1,5 +1,5 @@
from typing import Any, Dict
from typing import Any, Dict, List, Optional, Tuple, Union
from matemat.exceptions import AuthenticationError
from matemat.webserver import pagelet
@ -8,7 +8,12 @@ from matemat.db import MatematDatabase
@pagelet('/touchkey')
def touchkey_page(method: str, path: str, args: Dict[str, str], session_vars: Dict[str, Any], headers: Dict[str, str]):
def touchkey_page(method: str,
path: str,
args: Dict[str, Tuple[str, Union[bytes, str, List[str]]]],
session_vars: Dict[str, Any],
headers: Dict[str, str])\
-> Tuple[int, Optional[Union[str, bytes]]]:
if 'user' in session_vars:
headers['Location'] = '/'
return 301, bytes()
@ -37,12 +42,19 @@ def touchkey_page(method: str, path: str, args: Dict[str, str], session_vars: Di
'''
return 200, data.format(username=args['username'] if 'username' in args else '')
elif method == 'POST':
if 'username' not in args or not isinstance(args['username'], str):
return 400, None
if 'touchkey' not in args or not isinstance(args['touchkey'], str):
return 400, None
username: str = args['username']
touchkey: str = args['touchkey']
with MatematDatabase('test.db') as db:
try:
user: User = db.login(args['username'], touchkey=args['touchkey'])
user: User = db.login(username, touchkey=touchkey)
except AuthenticationError:
headers['Location'] = f'/touchkey?username={args["username"]}&msg=Please%20try%20again.'
return 301, bytes()
session_vars['user'] = user
headers['Location'] = '/'
return 301, None
return 405, None

View file

@ -0,0 +1,28 @@
from typing import Any, Dict, Union
from matemat.webserver import pagelet
@pagelet('/upload')
def upload_test(method: str,
                path: str,
                args: Dict[str, Union[str, bytes]],
                session_vars: Dict[str, Any],
                headers: Dict[str, str]):
    """
    Test pagelet for multipart uploads.

    A GET request is answered with a HTML form that posts a file and a hidden static field as multipart/form-data.
    Any other request method is answered with a plain-text dump of the parsed request arguments.

    :param method: The HTTP method that was used.
    :param path: The path that was requested (unused).
    :param args: The parsed request arguments.
    :param session_vars: The session storage (unused).
    :param headers: The dictionary of HTTP response headers; Content-Type is set for the argument dump.
    :return: A tuple consisting of the HTTP status code and the response body.
    """
    if method != 'GET':
        # Echo the parsed arguments back so the upload result can be inspected
        headers['Content-Type'] = 'text/plain'
        return 200, str(args.items())
    # Serve the upload form
    form = '''
<!DOCTYPE html>
<html>
<body>
<form method="post" action="#" enctype="multipart/form-data">
<input type="file" name="testfile" />
<input type="hidden" name="teststatic" value="statictest" />
<input type="submit" value="Upload" />
</form>
</body>
</html>
'''
    return 200, form

View file

@ -1,5 +1,5 @@
from typing import Any, Callable, Dict, Tuple, Union
from typing import Any, Callable, Dict, List, Tuple, Union
import unittest.mock
from io import BytesIO
@ -31,8 +31,8 @@ class HttpResponse:
'Content-Length': 0
}
self.pagelet: str = None
# The response body. Only UTF-8 strings are supported
self.body: str = ''
# The response body
self.body: bytes = bytes()
# Parsing phase, one of 'begin', 'hdr', 'body' or 'done'
self.parse_phase = 'begin'
# Buffer for uncompleted lines
@ -55,7 +55,7 @@ class HttpResponse:
return
# If in the body phase, simply decode and append to the body, while the body is not complete yet
elif self.parse_phase == 'body':
self.body += fragment.decode('utf-8')
self.body += fragment
if len(self.body) >= int(self.headers['Content-Length']):
self.__finalize()
return
@ -66,24 +66,24 @@ class HttpResponse:
if not fragment.endswith(b'\r\n'):
# Special treatment for no trailing CR-LF: Add remainder to buffer
head, tail = fragment.rsplit(b'\r\n', 1)
data: str = (self.buffer + head).decode('utf-8')
data: bytes = (self.buffer + head)
self.buffer = tail
else:
data: str = (self.buffer + fragment).decode('utf-8')
data: bytes = (self.buffer + fragment)
self.buffer = bytes()
# Iterate the lines that are ready to be parsed
for line in data.split('\r\n'):
for line in data.split(b'\r\n'):
# The 'begin' phase indicates that the parser is waiting for the HTTP status line
if self.parse_phase == 'begin':
if line.startswith('HTTP/'):
if line.startswith(b'HTTP/'):
# Parse the statuscode and advance to header parsing
_, statuscode, _ = line.split(' ', 2)
_, statuscode, _ = line.decode('utf-8').split(' ', 2)
self.statuscode = int(statuscode)
self.parse_phase = 'hdr'
elif self.parse_phase == 'hdr':
# Parse a header line and add it to the header dict
if len(line) > 0:
k, v = line.split(':', 1)
k, v = line.decode('utf-8').split(':', 1)
self.headers[k.strip()] = v.strip()
else:
# Empty line separates header from body
@ -156,12 +156,16 @@ class MockSocket(bytes):
def test_pagelet(path: str):
def with_testing_headers(fun: Callable[[str, str, Dict[str, str], Dict[str, Any], Dict[str, str]],
def with_testing_headers(fun: Callable[[str,
str,
Dict[str, Tuple[str, Union[bytes, str, List[str]]]],
Dict[str, Any],
Dict[str, str]],
Tuple[int, Union[bytes, str]]]):
@pagelet(path)
def testing_wrapper(method: str,
path: str,
args: Dict[str, str],
args: Dict[str, Tuple[str, Union[bytes, str, List[str]]]],
session_vars: Dict[str, Any],
headers: Dict[str, str]):
status, body = fun(method, path, args, session_vars, headers)

View file

@ -1,14 +1,16 @@
from typing import Any, Dict, List
from typing import Any, Dict, List, Tuple,Union
from matemat.webserver.httpd import HttpHandler
from matemat.webserver.test.abstract_httpd_test import AbstractHttpdTest, test_pagelet
import codecs
@test_pagelet('/just/testing/post')
def post_test_pagelet(method: str,
path: str,
args: Dict[str, str],
args: Dict[str, Tuple[str, Union[bytes, str, List[str]]]],
session_vars: Dict[str, Any],
headers: Dict[str, str]):
"""
@ -16,8 +18,13 @@ def post_test_pagelet(method: str,
"""
headers['Content-Type'] = 'text/plain'
dump: str = ''
for k, v in args.items():
dump += f'{k}: {v if isinstance(v, str) else ",".join(v)}\n'
for k, (t, v) in args.items():
if t.startswith('text/'):
if isinstance(v, bytes):
v = v.decode('utf-8')
dump += f'{k}: {",".join(v) if isinstance(v, list) else v}\n'
else:
dump += f'{k}: {codecs.encode(v, "hex").decode("utf-8")}\n'
return 200, dump
@ -26,7 +33,7 @@ class TestPost(AbstractHttpdTest):
Test cases for the content serving of the web server.
"""
def test_post_get_only_args(self):
def test_post_urlenc_get_only_args(self):
"""
Test a POST request that only contains GET arguments.
"""
@ -38,17 +45,17 @@ class TestPost(AbstractHttpdTest):
packet = self.client_sock.get_response()
# Parse response body
lines: List[str] = packet.body.split('\n')[:-1]
lines: List[bytes] = packet.body.split(b'\n')[:-1]
kv: Dict[str, str] = dict()
for l in lines:
k, v = l.split(':', 1)
k, v = l.decode('utf-8').split(':', 1)
kv[k.strip()] = v.strip() if ',' not in v else v.strip().split(',')
# Make sure the arguments were properly parsed
self.assertEqual('bar', kv['foo'])
self.assertEqual('1', kv['test'])
def test_post_post_only_args(self):
def test_post_urlenc_post_only_args(self):
"""
Test a POST request that only contains POST arguments (urlencoded).
"""
@ -61,17 +68,17 @@ class TestPost(AbstractHttpdTest):
packet = self.client_sock.get_response()
# Parse response body
lines: List[str] = packet.body.split('\n')[:-1]
lines: List[bytes] = packet.body.split(b'\n')[:-1]
kv: Dict[str, str] = dict()
for l in lines:
k, v = l.split(':', 1)
k, v = l.decode('utf-8').split(':', 1)
kv[k.strip()] = v.strip() if ',' not in v else v.strip().split(',')
# Make sure the arguments were properly parsed
self.assertEqual('bar', kv['foo'])
self.assertEqual('1', kv['test'])
def test_post_mixed_args(self):
def test_post_urlenc_mixed_args(self):
"""
Test that mixed POST and GET args are properly parsed, and that POST takes precedence over GET.
"""
@ -84,10 +91,10 @@ class TestPost(AbstractHttpdTest):
packet = self.client_sock.get_response()
# Parse response body
lines: List[str] = packet.body.split('\n')[:-1]
lines: List[bytes] = packet.body.split(b'\n')[:-1]
kv: Dict[str, str] = dict()
for l in lines:
k, v = l.split(':', 1)
k, v = l.decode('utf-8').split(':', 1)
kv[k.strip()] = v.strip() if ',' not in v else v.strip().split(',')
# Make sure the arguments were properly parsed
@ -95,7 +102,7 @@ class TestPost(AbstractHttpdTest):
self.assertEqual('1', kv['gettest'])
self.assertEqual('2', kv['posttest'])
def test_post_get_array(self):
def test_post_urlenc_get_array(self):
"""
Test a POST request that contains GET array arguments.
"""
@ -107,17 +114,17 @@ class TestPost(AbstractHttpdTest):
packet = self.client_sock.get_response()
# Parse response body
lines: List[str] = packet.body.split('\n')[:-1]
lines: List[bytes] = packet.body.split(b'\n')[:-1]
kv: Dict[str, str] = dict()
for l in lines:
k, v = l.split(':', 1)
k, v = l.decode('utf-8').split(':', 1)
kv[k.strip()] = v.strip() if ',' not in v else v.strip().split(',')
# Make sure the arguments were properly parsed
self.assertListEqual(['bar', 'baz'], kv['foo'])
self.assertEqual('1', kv['test'])
def test_post_post_array(self):
def test_post_urlenc_post_array(self):
"""
Test a POST request that contains POST array arguments.
"""
@ -130,17 +137,17 @@ class TestPost(AbstractHttpdTest):
packet = self.client_sock.get_response()
# Parse response body
lines: List[str] = packet.body.split('\n')[:-1]
lines: List[bytes] = packet.body.split(b'\n')[:-1]
kv: Dict[str, str] = dict()
for l in lines:
k, v = l.split(':', 1)
k, v = l.decode('utf-8').split(':', 1)
kv[k.strip()] = v.strip() if ',' not in v else v.strip().split(',')
# Make sure the arguments were properly parsed
self.assertListEqual(['bar', 'baz'], kv['foo'])
self.assertEqual('1', kv['test'])
def test_post_mixed_array(self):
def test_post_urlenc_mixed_array(self):
"""
Test a POST request that contains both GET and POST array arguments.
"""
@ -153,13 +160,85 @@ class TestPost(AbstractHttpdTest):
packet = self.client_sock.get_response()
# Parse response body
lines: List[str] = packet.body.split('\n')[:-1]
lines: List[bytes] = packet.body.split(b'\n')[:-1]
kv: Dict[str, str] = dict()
for l in lines:
k, v = l.split(':', 1)
k, v = l.decode('utf-8').split(':', 1)
kv[k.strip()] = v.strip() if ',' not in v else v.strip().split(',')
# Make sure the arguments were properly parsed
self.assertListEqual(['postbar', 'postbaz'], kv['foo'])
self.assertListEqual(['1', '42'], kv['gettest'])
self.assertListEqual(['1', '2'], kv['posttest'])
def test_post_no_body(self):
    """
    Test that a POST request without any headers or body is rejected.
    """
    # The request consists of the request line only: no Content-Type, no Content-Length, no body
    raw_request = b'POST /just/testing/post?foo=bar HTTP/1.1\r\n\r\n'
    self.client_sock.set_request(raw_request)
    # Instantiating the handler processes the request
    HttpHandler(self.client_sock, ('::1', 45678), self.server)
    response = self.client_sock.get_response()

    # The server must answer with 400 Bad Request
    self.assertEqual(400, response.statuscode)
def test_post_multipart_post_only(self):
    """
    Test a POST request with a multipart/form-data body.
    """
    # Two-part body: a text/plain field "foo" and a binary application/octet-stream field "bar"
    formdata = (b'------testboundary\r\n'
                b'Content-Disposition: form-data; name="foo"\r\n'
                b'Content-Type: text/plain\r\n\r\n'
                b'Hello, World!\r\n'
                b'------testboundary\r\n'
                b'Content-Disposition: form-data; name="bar"; filename="foo.bar"\r\n'
                b'Content-Type: application/octet-stream\r\n\r\n'
                b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x80\x0b\x0c\x73\x0e\x0f\r\n'
                b'------testboundary--\r\n')
    # Request line and headers, followed by the multipart body
    request_head = (f'POST /just/testing/post HTTP/1.1\r\n'
                    f'Content-Type: multipart/form-data; boundary=----testboundary\r\n'
                    f'Content-Length: {len(formdata)}\r\n\r\n').encode('utf-8')
    self.client_sock.set_request(request_head + formdata)
    HttpHandler(self.client_sock, ('::1', 45678), self.server)
    packet = self.client_sock.get_response()

    # The pagelet answers with one "key: value" line per argument; the last split element is dropped
    kv: Dict[str, Any] = {
        key.decode('utf-8').strip(): value.strip()
        for key, value in (row.split(b':', 1) for row in packet.body.split(b'\n')[:-1])
    }

    # Both fields must be present; the binary field is reported as a hex dump
    self.assertIn('foo', kv)
    self.assertIn('bar', kv)
    self.assertEqual(kv['foo'], b'Hello, World!')
    self.assertEqual(kv['bar'], b'00010203040506070809800b0c730e0f')
def test_post_multipart_mixed(self):
    """
    Test a POST request that carries both GET arguments and a multipart/form-data body, and make sure the
    multipart value of "foo" replaces the GET value of the same name.
    """
    # Two-part body: a text/plain field "foo" and a binary application/octet-stream field "bar"
    formdata = (b'------testboundary\r\n'
                b'Content-Disposition: form-data; name="foo"\r\n'
                b'Content-Type: text/plain\r\n\r\n'
                b'Hello, World!\r\n'
                b'------testboundary\r\n'
                b'Content-Disposition: form-data; name="bar"; filename="foo.bar"\r\n'
                b'Content-Type: application/octet-stream\r\n\r\n'
                b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x80\x0b\x0c\x73\x0e\x0f\r\n'
                b'------testboundary--\r\n')
    # The query string adds "getfoo" and a "foo" value that the POST body is expected to override
    request_head = (f'POST /just/testing/post?getfoo=bar&foo=thisshouldbegone HTTP/1.1\r\n'
                    f'Content-Type: multipart/form-data; boundary=----testboundary\r\n'
                    f'Content-Length: {len(formdata)}\r\n\r\n').encode('utf-8')
    self.client_sock.set_request(request_head + formdata)
    HttpHandler(self.client_sock, ('::1', 45678), self.server)
    packet = self.client_sock.get_response()

    # The pagelet answers with one "key: value" line per argument; the last split element is dropped
    kv: Dict[str, Any] = {
        key.decode('utf-8').strip(): value.strip()
        for key, value in (row.split(b':', 1) for row in packet.body.split(b'\n')[:-1])
    }

    # GET-only argument survives, "foo" holds the POST value, the binary field is reported as a hex dump
    self.assertIn('foo', kv)
    self.assertIn('bar', kv)
    self.assertEqual(kv['getfoo'], b'bar')
    self.assertEqual(kv['foo'], b'Hello, World!')
    self.assertEqual(kv['bar'], b'00010203040506070809800b0c730e0f')

View file

@ -1,5 +1,5 @@
from typing import Any, Dict
from typing import Any, Dict, Union
import os
import os.path
@ -10,7 +10,7 @@ from matemat.webserver.test.abstract_httpd_test import AbstractHttpdTest, test_p
@test_pagelet('/just/testing/serve_pagelet_ok')
def serve_test_pagelet_ok(method: str,
path: str,
args: Dict[str, str],
args: Dict[str, Union[bytes, str]],
session_vars: Dict[str, Any],
headers: Dict[str, str]):
headers['Content-Type'] = 'text/plain'
@ -20,7 +20,7 @@ def serve_test_pagelet_ok(method: str,
@test_pagelet('/just/testing/serve_pagelet_fail')
def serve_test_pagelet_fail(method: str,
path: str,
args: Dict[str, str],
args: Dict[str, Union[bytes, str]],
session_vars: Dict[str, Any],
headers: Dict[str, str]):
session_vars['test'] = 'hello, world!'
@ -54,7 +54,7 @@ class TestServe(AbstractHttpdTest):
self.assertEqual('serve_test_pagelet_ok', packet.pagelet)
# Make sure the expected content is served
self.assertEqual(200, packet.statuscode)
self.assertEqual('serve test pagelet ok', packet.body)
self.assertEqual(b'serve test pagelet ok', packet.body)
def test_serve_pagelet_fail(self):
# Call the test pagelet that produces a 500 Internal Server Error result
@ -66,7 +66,7 @@ class TestServe(AbstractHttpdTest):
self.assertEqual('serve_test_pagelet_fail', packet.pagelet)
# Make sure the expected content is served
self.assertEqual(500, packet.statuscode)
self.assertEqual('serve test pagelet fail', packet.body)
self.assertEqual(b'serve test pagelet fail', packet.body)
def test_serve_static_ok(self):
# Request a static resource
@ -78,7 +78,7 @@ class TestServe(AbstractHttpdTest):
self.assertIsNone(packet.pagelet)
# Make sure the expected content is served
self.assertEqual(200, packet.statuscode)
self.assertEqual('static resource test', packet.body)
self.assertEqual(b'static resource test', packet.body)
def test_serve_static_forbidden(self):
# Request a static resource with lacking permissions
@ -90,7 +90,7 @@ class TestServe(AbstractHttpdTest):
self.assertIsNone(packet.pagelet)
# Make sure a 403 header is served
self.assertEqual(403, packet.statuscode)
self.assertNotEqual('This should not be readable', packet.body)
self.assertNotEqual(b'This should not be readable', packet.body)
def test_serve_not_found(self):
# Request a nonexistent resource
@ -116,7 +116,10 @@ class TestServe(AbstractHttpdTest):
def test_static_post_not_allowed(self):
# Request a resource outside the webroot
self.client_sock.set_request(b'POST /iwanttouploadthis HTTP/1.1\r\n\r\nq=this%20should%20not%20be%20uploaded')
self.client_sock.set_request(b'POST /iwanttopostthis HTTP/1.1\r\n'
b'Content-Type: application/x-www-form-urlencoded\r\n'
b'Content-length: 37\r\n\r\n'
b'q=this%20should%20not%20be%20uploaded')
HttpHandler(self.client_sock, ('::1', 45678), self.server)
packet = self.client_sock.get_response()

View file

@ -1,5 +1,5 @@
from typing import Any, Dict
from typing import Any, Dict, Union
from datetime import datetime, timedelta
from time import sleep
@ -11,7 +11,7 @@ from matemat.webserver.test.abstract_httpd_test import AbstractHttpdTest, test_p
@test_pagelet('/just/testing/sessions')
def session_test_pagelet(method: str,
path: str,
args: Dict[str, str],
args: Dict[str, Union[bytes, str]],
session_vars: Dict[str, Any],
headers: Dict[str, str]):
session_vars['test'] = 'hello, world!'

118
matemat/webserver/util.py Normal file
View file

@ -0,0 +1,118 @@
from typing import Dict, List, Tuple, Optional, Union
import urllib.parse
def _parse_multipart(body: bytes, boundary: str) -> Dict[str, List[Tuple[str, Union[bytes, str]]]]:
"""
Given a HTTP body with form-data in multipart form, and the multipart-boundary, parse the multipart items and
return them as a dictionary.
:param body: The HTTP multipart/form-data body.
:param boundary: The multipart boundary.
:return: A dictionary of field names as key, and content types and field values as value.
"""
# Generate item header boundary and terminating boundary from general boundary string
_boundary = f'\r\n--{boundary}\r\n'.encode('utf-8')
_end_boundary = f'\r\n--{boundary}--\r\n'.encode('utf-8')
# Split at the end boundary and make sure there comes nothing after it
allparts = body.split(_end_boundary, 1)
if len(allparts) != 2 or allparts[1] != b'':
raise ValueError('Last boundary missing or corrupted')
# Split remaining body into its parts (appending a CRLF for the first boundary to match), and verify at least 1 part
# is there
parts: List[bytes] = (b'\r\n' + allparts[0]).split(_boundary)
if len(parts) < 1 or parts[0] != b'':
raise ValueError('First boundary missing or corrupted')
# Remove the first, empty part
parts = parts[1:]
# Results go into this dict
args: Dict[str, List[Tuple[str, Union[bytes, str]]]] = dict()
# Parse each multipart part
for part in parts:
# Parse multipart headers
hdr: Dict[str, str] = dict()
while True:
head, part = part.split(b'\r\n', 1)
# Break on header/body delimiter
if head == b'':
break
# Add header to hdr dict
hk, hv = head.decode('utf-8').split(':')
hdr[hk.strip()] = hv.strip()
# At least Content-Type and Content-Disposition must be present
if 'Content-Type' not in hdr or 'Content-Disposition' not in hdr:
raise ValueError('Missing Content-Type or Content-Disposition header')
# Extract Content-Disposition header value and its arguments
cd, *cdargs = hdr['Content-Disposition'].split(';')
# Content-Disposition MUST be form-data; everything else is rejected
if cd.strip() != 'form-data':
raise ValueError(f'Unknown Content-Disposition: cd')
# Extract the "name" header argument
for cdarg in cdargs:
k, v = cdarg.split('=', 1)
if k.strip() == 'name':
name: str = v.strip()
# Remove quotation marks around the name value
if name.startswith('"') and name.endswith('"'):
name = v[1:-1]
# Add the Content-Type and the content to the header, with the provided name
if name not in args:
args[name] = list()
args[name].append((hdr['Content-Type'].strip(), part))
return args
def parse_args(request: str, postbody: Optional[bytes] = None, enctype: str = 'text/plain') \
        -> Tuple[str, Dict[str, Tuple[str, Union[bytes, str, List[str]]]]]:
    """
    Given a HTTP request path, and optionally a HTTP POST body in application/x-www-form-urlencoded or
    multipart/form-data form, parse the arguments and return them as a dictionary.

    If a key is used both in GET and in POST, the POST value takes precedence, and the GET value is discarded.

    :param request: The request string to parse.
    :param postbody: The POST body to parse, defaults to None.
    :param enctype: Content-Type of the POST body, including its parameters (e.g. the multipart boundary);
                    supported media types are application/x-www-form-urlencoded and multipart/form-data.
    :return: A tuple consisting of the base path and a dictionary mapping each key to a tuple of the value's
             content type and the value itself.
    :raises ValueError: If a POST body is given and its Content-Type is unsupported, the multipart boundary is
                        missing, or the body cannot be parsed.
    """
    # Parse the request "URL" (i.e. only the path)
    tokens = urllib.parse.urlparse(request)
    # Parse the GET arguments
    getargs = urllib.parse.parse_qs(tokens.query)

    # TODO: { 'foo': [ ('text/plain', 'bar'), ('application/octet-stream', '\x80') ] }
    # TODO: Use a @dataclass once Python 3.7 is out
    args: Dict[str, Tuple[str, Union[bytes, str, List[str]]]] = dict()
    for k, v in getargs.items():
        args[k] = 'text/plain', v

    if postbody is not None:
        # Separate the media type from its parameters; media types are compared case-insensitively
        mimetype, *params = (token.strip() for token in enctype.split(';'))
        mimetype = mimetype.lower()
        if mimetype == 'application/x-www-form-urlencoded':
            # Parse the POST body
            postargs = urllib.parse.parse_qs(postbody.decode('utf-8'))
            # Write all POST values into the dict, overriding potential duplicates from GET
            for k, v in postargs.items():
                args[k] = 'text/plain', v
        elif mimetype == 'multipart/form-data':
            # Parse the multipart boundary from the Content-Type header parameters
            boundary = None
            for param in params:
                pk, _, pv = param.partition('=')
                if pk.strip().lower() == 'boundary':
                    boundary = pv.strip()
                    # The boundary may be given as a quoted string
                    if len(boundary) >= 2 and boundary.startswith('"') and boundary.endswith('"'):
                        boundary = boundary[1:-1]
            if not boundary:
                # Raise ValueError rather than IndexError, so the client gets a 400 instead of a 500
                raise ValueError('multipart/form-data Content-Type lacks the boundary parameter')
            # Parse the multipart body
            mpargs = _parse_multipart(postbody, boundary)
            for k, v in mpargs.items():
                # TODO: Process all values, not just the first
                args[k] = v[0]
        else:
            raise ValueError(f'Unsupported Content-Type: {enctype}')

    # urllib.parse.parse_qs turns ALL arguments into arrays. This turns arrays of length 1 into scalar values.
    # Only lists are unwrapped: indexing a bytes value of length 1 would turn it into an int
    for k, (ct, v) in list(args.items()):
        if isinstance(v, list) and len(v) == 1:
            args[k] = ct, v[0]
    # Return the path and the parsed arguments
    return tokens.path, args