Merge branch 'multipart' into 'webserver-impl'

Implemented parsing of multipart/form-data POST requests. Also syntactic sugar for request arguments handling.

See merge request s3lph/matemat!6
This commit is contained in:
s3lph 2018-07-04 16:51:34 +00:00
commit 9d6c0c9de2
14 changed files with 1529 additions and 123 deletions

View file

@ -6,4 +6,5 @@ API that can be used by 'pagelets' - single pages of a web service. If a reques
server will attempt to serve the request with a static resource in a previously configured webroot directory.
"""
from .requestargs import RequestArgument, RequestArguments
from .httpd import MatematWebserver, HttpHandler, pagelet

View file

@ -1,12 +1,11 @@
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
from typing import Any, Callable, Dict, Tuple, Union
import traceback
import os
import socket
import mimetypes
import urllib.parse
from socketserver import TCPServer
from http.server import HTTPServer, BaseHTTPRequestHandler
from http.cookies import SimpleCookie
@ -14,6 +13,8 @@ from uuid import uuid4
from datetime import datetime, timedelta
from matemat import __version__ as matemat_version
from matemat.webserver import RequestArguments
from matemat.webserver.util import parse_args
#
@ -28,12 +29,17 @@ BaseHTTPRequestHandler.log_error = lambda self, fstring='', *args: None
# Dictionary to hold registered pagelet paths and their handler functions
_PAGELET_PATHS: Dict[str, Callable[[str, str, Dict[str, str], Dict[str, Any], Dict[str, str], bytes],
Tuple[int, Union[bytes, str]]]] = dict()
_PAGELET_PATHS: Dict[str, Callable[[str, # HTTP method (GET, POST, ...)
str, # Request path
RequestArguments, # HTTP Request arguments
Dict[str, Any], # Session vars
Dict[str, str]], # Response headers
Tuple[int, Union[bytes, str]]]] = dict() # Returns: (status code, response body)
# Inactivity timeout for client sessions
_SESSION_TIMEOUT: int = 3600
_MAX_POST: int = 1_000_000
def pagelet(path: str):
@ -43,8 +49,12 @@ def pagelet(path: str):
The function must have the following signature:
(method: str, path: str, args: Dict[str, Union[str, List[str]], session_vars: Dict[str, Any],
headers: Dict[str, str]) -> (int, Optional[Union[str, bytes]])
(method: str,
path: str,
args: RequestArguments,
session_vars: Dict[str, Any],
headers: Dict[str, str])
-> (int, Optional[Union[str, bytes]])
method: The HTTP method (GET, POST) that was used.
path: The path that was requested.
@ -56,7 +66,12 @@ def pagelet(path: str):
:param path: The path to register the function for.
"""
def http_handler(fun: Callable[[str, str, Dict[str, str], Dict[str, Any], Dict[str, str], bytes],
def http_handler(fun: Callable[[str,
str,
RequestArguments,
Dict[str, Any],
Dict[str, str]],
Tuple[int, Union[bytes, str]]]):
# Add the function to the dict of pagelets
_PAGELET_PATHS[path] = fun
@ -166,7 +181,7 @@ class HttpHandler(BaseHTTPRequestHandler):
if session_id in self.server.session_vars:
del self.server.session_vars[session_id]
def _handle(self, method: str, path: str, args: Dict[str, Union[str, List[str]]]) -> None:
def _handle(self, method: str, path: str, args: RequestArguments) -> None:
"""
Handle a HTTP request by either dispatching it to the appropriate pagelet or by serving a static resource.
@ -238,7 +253,7 @@ class HttpHandler(BaseHTTPRequestHandler):
mimetype = 'application/octet-stream'
# Send content type and length header
self.send_header('Content-Type', mimetype)
self.send_header('Content-Length', len(data))
self.send_header('Content-Length', str(len(data)))
self.end_headers()
# Send the requested resource as response body
self.wfile.write(data)
@ -247,36 +262,6 @@ class HttpHandler(BaseHTTPRequestHandler):
self.send_response(404)
self.end_headers()
@staticmethod
def _parse_args(request: str, postbody: Optional[str] = None) -> Tuple[str, Dict[str, Union[str, List[str]]]]:
"""
Given a HTTP request path, and optionally a HTTP POST body in application/x-www-form-urlencoded form, parse the
arguments and return them as a dictionary.
If a key is used both in GET and in POST, the POST value takes precedence, and the GET value is discarded.
:param request: The request string to parse.
:param postbody: The POST body to parse, defaults to None.
:return: A tuple consisting of the base path and a dictionary with the parsed key/value pairs.
"""
# Parse the request "URL" (i.e. only the path)
tokens = urllib.parse.urlparse(request)
# Parse the GET arguments
args = urllib.parse.parse_qs(tokens.query)
if postbody is not None:
# Parse the POST body
postargs = urllib.parse.parse_qs(postbody)
# Write all POST values into the dict, overriding potential duplicates from GET
for k, v in postargs.items():
args[k] = v
# urllib.parse.parse_qs turns ALL arguments into arrays. This turns arrays of length 1 into scalar values
for k, v in args.items():
if len(v) == 1:
args[k] = v[0]
# Return the path and the parsed arguments
return tokens.path, args
# noinspection PyPep8Naming
def do_GET(self) -> None:
"""
@ -284,7 +269,7 @@ class HttpHandler(BaseHTTPRequestHandler):
"""
try:
# Parse the request and hand it to the handle function
path, args = self._parse_args(self.path)
path, args = parse_args(self.path)
self._handle('GET', path, args)
# Special handling for some errors
except PermissionError:
@ -305,25 +290,24 @@ class HttpHandler(BaseHTTPRequestHandler):
"""
try:
# Read the POST body, if it exists, and its MIME type is application/x-www-form-urlencoded
clen: str = self.headers.get('Content-Length', failobj='0')
clen: int = int(str(self.headers.get('Content-Length', failobj='0')))
if clen > _MAX_POST:
raise ValueError('Request too big')
ctype: str = self.headers.get('Content-Type', failobj='application/octet-stream')
post: str = ''
if ctype == 'application/x-www-form-urlencoded':
post = self.rfile.read(int(clen)).decode('utf-8')
post: bytes = self.rfile.read(clen)
path, args = parse_args(self.path, postbody=post, enctype=ctype)
# Parse the request and hand it to the handle function
path, args = self._parse_args(self.path, postbody=post)
self._handle('POST', path, args)
# Special handling for some errors
except PermissionError as e:
# Special handling for some errors
except PermissionError:
self.send_response(403, 'Forbidden')
self.end_headers()
print(e)
traceback.print_tb(e.__traceback__)
except ValueError as e:
except ValueError:
self.send_response(400, 'Bad Request')
self.end_headers()
except TypeError:
self.send_response(400, 'Bad Request')
self.end_headers()
print(e)
traceback.print_tb(e.__traceback__)
except BaseException as e:
# Generic error handling
self.send_response(500, 'Internal Server Error')

View file

@ -1,14 +1,19 @@
from typing import Any, Dict
from typing import Any, Dict, Optional, Tuple, Union
from matemat.exceptions import AuthenticationError
from matemat.webserver import pagelet
from matemat.webserver import pagelet, RequestArguments
from matemat.primitives import User
from matemat.db import MatematDatabase
@pagelet('/login')
def login_page(method: str, path: str, args: Dict[str, str], session_vars: Dict[str, Any], headers: Dict[str, str]):
def login_page(method: str,
path: str,
args: RequestArguments,
session_vars: Dict[str, Any],
headers: Dict[str, str])\
-> Tuple[int, Optional[Union[str, bytes]]]:
if 'user' in session_vars:
headers['Location'] = '/'
return 301, None
@ -36,15 +41,15 @@ def login_page(method: str, path: str, args: Dict[str, str], session_vars: Dict[
</body>
</html>
'''
return 200, data.format(msg=args['msg'] if 'msg' in args else '')
return 200, data.format(msg=str(args.msg) if 'msg' in args else '')
elif method == 'POST':
print(args)
with MatematDatabase('test.db') as db:
try:
user: User = db.login(args['username'], args['password'])
user: User = db.login(str(args.username), str(args.password))
except AuthenticationError:
headers['Location'] = '/login?msg=Username%20or%20password%20wrong.%20Please%20try%20again.'
return 301, bytes()
session_vars['user'] = user
headers['Location'] = '/'
return 301, bytes()
return 405, None

View file

@ -1,11 +1,16 @@
from typing import Any, Dict
from typing import Any, Dict, List, Optional, Tuple, Union
from matemat.webserver import pagelet
from matemat.webserver import pagelet, RequestArguments
@pagelet('/logout')
def logout(method: str, path: str, args: Dict[str, str], session_vars: Dict[str, Any], headers: Dict[str, str]):
def logout(method: str,
path: str,
args: RequestArguments,
session_vars: Dict[str, Any],
headers: Dict[str, str])\
-> Tuple[int, Optional[Union[str, bytes]]]:
if 'user' in session_vars:
del session_vars['user']
headers['Location'] = '/'

View file

@ -1,13 +1,17 @@
from typing import Any, Dict, Optional, Tuple, Union
from matemat.webserver import MatematWebserver, pagelet
from matemat.webserver import pagelet, RequestArguments
from matemat.primitives import User
from matemat.db import MatematDatabase
@pagelet('/')
def main_page(method: str, path: str, args: Dict[str, str], session_vars: Dict[str, Any], headers: Dict[str, str])\
def main_page(method: str,
path: str,
args: RequestArguments,
session_vars: Dict[str, Any],
headers: Dict[str, str])\
-> Tuple[int, Optional[Union[str, bytes]]]:
data = '''
<DOCTYPE html>

View file

@ -1,14 +1,19 @@
from typing import Any, Dict
from typing import Any, Dict, Optional, Tuple, Union
from matemat.exceptions import AuthenticationError
from matemat.webserver import pagelet
from matemat.webserver import pagelet, RequestArguments
from matemat.primitives import User
from matemat.db import MatematDatabase
@pagelet('/touchkey')
def touchkey_page(method: str, path: str, args: Dict[str, str], session_vars: Dict[str, Any], headers: Dict[str, str]):
def touchkey_page(method: str,
path: str,
args: RequestArguments,
session_vars: Dict[str, Any],
headers: Dict[str, str])\
-> Tuple[int, Optional[Union[str, bytes]]]:
if 'user' in session_vars:
headers['Location'] = '/'
return 301, bytes()
@ -35,14 +40,15 @@ def touchkey_page(method: str, path: str, args: Dict[str, str], session_vars: Di
</body>
</html>
'''
return 200, data.format(username=args['username'] if 'username' in args else '')
return 200, data.format(username=str(args.username) if 'username' in args else '')
elif method == 'POST':
with MatematDatabase('test.db') as db:
try:
user: User = db.login(args['username'], touchkey=args['touchkey'])
user: User = db.login(str(args.username), touchkey=str(args.touchkey))
except AuthenticationError:
headers['Location'] = f'/touchkey?username={args["username"]}&msg=Please%20try%20again.'
return 301, bytes()
session_vars['user'] = user
headers['Location'] = '/'
return 301, None
return 405, None

View file

@ -0,0 +1,324 @@
from typing import Dict, Iterator, List, Tuple, Union
class RequestArguments(object):
"""
Container for HTTP Request arguments.
Usage:
# Create empty instance
ra = RequestArguments()
# Add an entry for the key 'foo' with the value 'bar' and Content-Type 'text/plain'
ra['foo'].append('text/plain', 'bar')
# Retrieve the value for the key 'foo', as a string...
foo = str(ra.foo)
# ... or as raw bytes
foo = bytes(ra.foo)
"""
def __init__(self) -> None:
"""
Create an empty container instance.
"""
self.__container: Dict[str, RequestArgument] = dict()
def __getitem__(self, key: str) -> 'RequestArgument':
"""
Retrieve the argument for the given name, creating it on the fly, if it doesn't exist.
:param key: Name of the argument to retrieve.
:return: A RequestArgument instance.
:raises TypeError: If key is not a string.
"""
if not isinstance(key, str):
raise TypeError('key must be a str')
# Create empty argument, if it doesn't exist
if key not in self.__container:
self.__container[key] = RequestArgument(key)
# Return the argument for the name
return self.__container[key]
def __getattr__(self, key: str) -> 'RequestArgument':
"""
Syntactic sugar for accessing values with a name that can be used in Python attributes. The value will be
returned as an immutable view.
:param key: Name of the argument to retrieve.
:return: An immutable view of the RequestArgument instance.
"""
return _View.of(self.__container[key])
def __iter__(self) -> Iterator['RequestArguments']:
"""
Returns an iterator over the values in this instance. Values are represented as immutable views.
:return: An iterator that yields immutable views of the values.
"""
for ra in self.__container.values():
# Yield an immutable scalar view for each value
yield _View.of(ra)
def __contains__(self, key: str) -> bool:
"""
Checks whether an argument with a given name exists in the RequestArguments instance.
:param key: The name to check whether it exists.
:return: True, if present, False otherwise.
"""
return key in self.__container
def __len__(self) -> int:
"""
:return: The number of arguments in this instance.
"""
return len(self.__container)
class RequestArgument(object):
"""
Container class for HTTP request arguments that simplifies dealing with
- scalar and array arguments:
Automatically converts between single values and arrays where necessary: Arrays with one element can be
accessed as scalars, and scalars can be iterated, yielding themselves as a single item.
- UTF-8 strings and binary data (e.g. file uploads):
All data can be retrieved both as a str (if utf-8 decoding is possible) and a bytes object.
The objects returned from iteration or indexing are immutable views of (parts of) this object.
Usage example:
qsargs = urllib.parse.parse_qs(qs, strict_parsing=True, keep_blank_values=True, errors='strict')
args: RequestArguments
for k, vs in qsargs:
args[k].clear()
for v in vs:
# text/plain usually is a sensible choice for values decoded from urlencoded strings
# IF ALREADY IN STRING FORM (which parse_qs does)!
args[k].append('text/plain', v)
if 'username' in args and args.username.is_scalar:
username = str(args.username)
"""
def __init__(self,
name: str,
value: Union[Tuple[str, Union[bytes, str]], List[Tuple[str, Union[bytes, str]]]] = None) -> None:
"""
Create a new RequestArgument with a name and optionally an initial value.
:param name: The name for this argument, as provided via GET or POST.
:param value: The initial value, if any. Optional, initializes with empty array if omitted.
"""
# Assign name
self.__name: str = name
# Initialize value
self.__value: List[Tuple[str, Union[bytes, str]]] = []
# Default to empty array
if value is None:
self.__value = []
else:
if isinstance(value, list):
# Store the array
self.__value = value
else:
# Turn scalar into an array before storing
self.__value = [value]
@property
def is_array(self) -> bool:
"""
:return: True, if the value is a (possibly empty) array, False otherwise.
"""
return len(self.__value) != 1
@property
def is_scalar(self) -> bool:
"""
:return: True, if the value is a single scalar value, False otherwise.
"""
return len(self.__value) == 1
@property
def is_view(self) -> bool:
"""
:return: True, if this instance is an immutable view, False otherwise.
"""
return False
@property
def name(self) -> str:
"""
:return: The name of this argument.
"""
return self.__name
def get_str(self, index: int = 0) -> str:
"""
Attempts to return a value as a string. The index defaults to 0.
:param index: The index of the value to retrieve. Default: 0.
:return: An UTF-8 string representation of the requested value.
:raises UnicodeDecodeError: If the value cannot be decoded into an UTF-8 string.
:raises IndexError: If the index is out of bounds.
:raises TypeError: If the index is not an int.
:raises TypeError: If the requested value is neither a str nor a bytes object.
"""
if not isinstance(index, int):
# Index must be an int
raise TypeError('index must be an int')
# Type hint; access array element
v: Tuple[str, Union[bytes, str]] = self.__value[index]
if isinstance(v[1], str):
# The value already is a string, return
return v[1]
elif isinstance(v[1], bytes):
# The value is a bytes object, attempt to decode
return v[1].decode('utf-8')
raise TypeError('Value is neither a str nor bytes')
def __str__(self) -> str:
"""
Attempts to return the first value as a string.
:return: An UTF-8 string representation of the first value.
:raises UnicodeDecodeError: If the value cannot be decoded into an UTF-8 string.
"""
return self.get_str()
def get_bytes(self, index: int = 0) -> bytes:
"""
Attempts to return a value as a bytes object. The index defaults to 0.
:param index: The index of the value to retrieve. Default: 0.
:return: A bytes object representation of the requested value. Strings will be encoded as UTF-8.
:raises IndexError: If the index is out of bounds.
:raises TypeError: If the index is not an int.
:raises TypeError: If the requested value is neither a str nor a bytes object.
"""
if not isinstance(index, int):
# Index must be a int
raise TypeError('index must be an int')
# Type hint; access array element
v: Tuple[str, Union[bytes, str]] = self.__value[index]
if isinstance(v[1], bytes):
# The value already is a bytes object, return
return v[1]
elif isinstance(v[1], str):
# The value is a string, encode first
return v[1].encode('utf-8')
raise TypeError('Value is neither a str nor bytes')
def __bytes__(self) -> bytes:
"""
Attempts to return the first value as a bytes object.
:return: A bytes string representation of the first value.
"""
return self.get_bytes()
def get_content_type(self, index: int = 0) -> str:
"""
Attempts to retrieve a value's Content-Type. The index defaults to 0.
:param index: The index of the value to retrieve. Default: 0.
:return: The Content-Type of the requested value, as sent by the client. Not necessarily trustworthy.
:raises IndexError: If the index is out of bounds.
:raises TypeError: If the index is not an int.
"""
# instance is an array value
if not isinstance(index, int):
# Needs an index for array values
raise TypeError('index must be an int')
# Type hint; access array element
va: Tuple[str, Union[bytes, str]] = self.__value[index]
# Return the content type of the requested value
if not isinstance(va[0], str):
raise TypeError('Content-Type is not a str')
return va[0]
def append(self, ctype: str, value: Union[str, bytes]) -> None:
"""
Append a value to this instance. Turns an empty argument into a scalar and a scalar into an array.
:param ctype: The Content-Type, as provided in the request.
:param value: The scalar value to append, either a string or bytes object.
:raises TypeError: If called on an immutable view.
"""
if self.is_view:
# This is an immutable view, raise exception
raise TypeError('A RequestArgument view is immutable!')
self.__value.append((ctype, value))
def clear(self) -> None:
"""
Remove all values from this instance.
:raises TypeError: If called on an immutable view.
"""
if self.is_view:
# This is an immutable view, raise exception
raise TypeError('A RequestArgument view is immutable!')
self.__value.clear()
def __len__(self) -> int:
"""
:return: Number of values for this argument.
"""
return len(self.__value)
def __iter__(self) -> Iterator['RequestArgument']:
"""
Iterate the values of this argument. Each value is accessible as if it were a scalar RequestArgument in turn,
although they are immutable.
:return: An iterator that yields immutable views of the values.
"""
for v in self.__value:
# Yield an immutable scalar view for each (ctype, value) element in the array
yield _View(self.__name, v)
def __getitem__(self, index: Union[int, slice]) -> 'RequestArgument':
"""
Index the argument with either an int or a slice. The returned values are represented as immutable
RequestArgument views.
:param index: The index or slice.
:return: An immutable view of the indexed elements of this argument.
"""
# Pass the index or slice through to the array, packing the result in an immutable view
return _View(self.__name, self.__value[index])
class _View(RequestArgument):
"""
This class represents an immutable view of a (subset of a) RequestArgument object. Should not be instantiated
directly.
"""
def __init__(self, name: str, value: Union[Tuple[str, Union[bytes, str]], List[Tuple[str, Union[bytes, str]]]])\
-> None:
"""
Create a new immutable view of a (subset of a) RequestArgument.
:param name: The name for this argument, same as in the original RequestArgument.
:param value: The values to represent in this view, obtained by e.g. indexing or slicing.
"""
super().__init__(name, value)
@staticmethod
def of(argument: 'RequestArgument') ->'RequestArgument':
"""
Create an immutable, unsliced view of an RequestArgument instance.
:param argument: The RequestArgument instance to create a view of.
:return: An immutable view of the provided RequestArgument instance.
"""
return argument[:]
@property
def is_view(self) -> bool:
"""
:return: True, if this instance is an immutable view, False otherwise.
"""
return True

View file

@ -9,7 +9,7 @@ from abc import ABC
from datetime import datetime
from http.server import HTTPServer
from matemat.webserver.httpd import pagelet
from matemat.webserver import pagelet, RequestArguments
class HttpResponse:
@ -31,8 +31,8 @@ class HttpResponse:
'Content-Length': 0
}
self.pagelet: str = None
# The response body. Only UTF-8 strings are supported
self.body: str = ''
# The response body
self.body: bytes = bytes()
# Parsing phase, one of 'begin', 'hdr', 'body' or 'done'
self.parse_phase = 'begin'
# Buffer for uncompleted lines
@ -55,7 +55,7 @@ class HttpResponse:
return
# If in the body phase, simply decode and append to the body, while the body is not complete yet
elif self.parse_phase == 'body':
self.body += fragment.decode('utf-8')
self.body += fragment
if len(self.body) >= int(self.headers['Content-Length']):
self.__finalize()
return
@ -66,24 +66,24 @@ class HttpResponse:
if not fragment.endswith(b'\r\n'):
# Special treatment for no trailing CR-LF: Add remainder to buffer
head, tail = fragment.rsplit(b'\r\n', 1)
data: str = (self.buffer + head).decode('utf-8')
data: bytes = (self.buffer + head)
self.buffer = tail
else:
data: str = (self.buffer + fragment).decode('utf-8')
data: bytes = (self.buffer + fragment)
self.buffer = bytes()
# Iterate the lines that are ready to be parsed
for line in data.split('\r\n'):
for line in data.split(b'\r\n'):
# The 'begin' phase indicates that the parser is waiting for the HTTP status line
if self.parse_phase == 'begin':
if line.startswith('HTTP/'):
if line.startswith(b'HTTP/'):
# Parse the statuscode and advance to header parsing
_, statuscode, _ = line.split(' ', 2)
_, statuscode, _ = line.decode('utf-8').split(' ', 2)
self.statuscode = int(statuscode)
self.parse_phase = 'hdr'
elif self.parse_phase == 'hdr':
# Parse a header line and add it to the header dict
if len(line) > 0:
k, v = line.split(':', 1)
k, v = line.decode('utf-8').split(':', 1)
self.headers[k.strip()] = v.strip()
else:
# Empty line separates header from body
@ -156,12 +156,16 @@ class MockSocket(bytes):
def test_pagelet(path: str):
def with_testing_headers(fun: Callable[[str, str, Dict[str, str], Dict[str, Any], Dict[str, str]],
def with_testing_headers(fun: Callable[[str,
str,
RequestArguments,
Dict[str, Any],
Dict[str, str]],
Tuple[int, Union[bytes, str]]]):
@pagelet(path)
def testing_wrapper(method: str,
path: str,
args: Dict[str, str],
args: RequestArguments,
session_vars: Dict[str, Any],
headers: Dict[str, str]):
status, body = fun(method, path, args, session_vars, headers)

View file

@ -0,0 +1,347 @@
import unittest
from matemat.webserver.util import parse_args
class TestParseRequest(unittest.TestCase):
def test_parse_get_root(self):
path, args = parse_args('/')
self.assertEqual('/', path)
self.assertEqual(0, len(args))
def test_parse_get_no_args(self):
path, args = parse_args('/index.html')
self.assertEqual('/index.html', path)
self.assertEqual(0, len(args))
def test_parse_get_root_getargs(self):
path, args = parse_args('/?foo=42&bar=1337&baz=Hello,%20World!')
self.assertEqual('/', path)
self.assertEqual(3, len(args))
self.assertIn('foo', args)
self.assertIn('bar', args)
self.assertIn('baz', args)
self.assertTrue(args['foo'].is_scalar)
self.assertTrue(args['bar'].is_scalar)
self.assertTrue(args['baz'].is_scalar)
self.assertEqual('text/plain', args['foo'].get_content_type())
self.assertEqual('text/plain', args['bar'].get_content_type())
self.assertEqual('text/plain', args['baz'].get_content_type())
self.assertEqual('42', args['foo'].get_str())
self.assertEqual('1337', args['bar'].get_str())
self.assertEqual('Hello, World!', args['baz'].get_str())
def test_parse_get_getargs(self):
path, args = parse_args('/abc/def?foo=42&bar=1337&baz=Hello,%20World!')
self.assertEqual('/abc/def', path)
self.assertEqual(3, len(args))
self.assertIn('foo', args)
self.assertIn('bar', args)
self.assertIn('baz', args)
self.assertTrue(args['foo'].is_scalar)
self.assertTrue(args['bar'].is_scalar)
self.assertTrue(args['baz'].is_scalar)
self.assertEqual('text/plain', args['foo'].get_content_type())
self.assertEqual('text/plain', args['bar'].get_content_type())
self.assertEqual('text/plain', args['baz'].get_content_type())
self.assertEqual('42', args['foo'].get_str())
self.assertEqual('1337', args['bar'].get_str())
self.assertEqual('Hello, World!', args['baz'].get_str())
def test_parse_get_getarray(self):
path, args = parse_args('/abc/def?foo=42&foo=1337&baz=Hello,%20World!')
self.assertEqual('/abc/def', path)
self.assertEqual(2, len(args))
self.assertIn('foo', args)
self.assertIn('baz', args)
self.assertTrue(args['foo'].is_array)
self.assertTrue(args['baz'].is_scalar)
self.assertEqual(2, len(args['foo']))
self.assertEqual('42', args['foo'].get_str(0))
self.assertEqual('1337', args['foo'].get_str(1))
def test_parse_get_zero_arg(self):
path, args = parse_args('/abc/def?foo=&bar=42')
self.assertEqual(2, len(args))
self.assertIn('foo', args)
self.assertIn('bar', args)
self.assertTrue(args['foo'].is_scalar)
self.assertTrue(args['bar'].is_scalar)
self.assertEqual(1, len(args['foo']))
self.assertEqual('', args['foo'].get_str())
self.assertEqual('42', args['bar'].get_str())
def test_parse_get_urlencoded_encoding_fail(self):
with self.assertRaises(ValueError):
parse_args('/?foo=42&bar=%80&baz=Hello,%20World!')
def test_parse_post_urlencoded(self):
path, args = parse_args('/',
postbody=b'foo=42&bar=1337&baz=Hello,%20World!',
enctype='application/x-www-form-urlencoded')
self.assertEqual('/', path)
self.assertEqual(3, len(args))
self.assertIn('foo', args)
self.assertIn('bar', args)
self.assertIn('baz', args)
self.assertTrue(args['foo'].is_scalar)
self.assertTrue(args['bar'].is_scalar)
self.assertTrue(args['baz'].is_scalar)
self.assertEqual('text/plain', args['foo'].get_content_type())
self.assertEqual('text/plain', args['bar'].get_content_type())
self.assertEqual('text/plain', args['baz'].get_content_type())
self.assertEqual('42', args['foo'].get_str())
self.assertEqual('1337', args['bar'].get_str())
self.assertEqual('Hello, World!', args['baz'].get_str())
def test_parse_post_urlencoded_array(self):
path, args = parse_args('/',
postbody=b'foo=42&foo=1337&baz=Hello,%20World!',
enctype='application/x-www-form-urlencoded')
self.assertEqual('/', path)
self.assertEqual(2, len(args))
self.assertIn('foo', args)
self.assertIn('baz', args)
self.assertTrue(args['foo'].is_array)
self.assertTrue(args['baz'].is_scalar)
self.assertEqual(2, len(args['foo']))
self.assertEqual('42', args['foo'].get_str(0))
self.assertEqual('1337', args['foo'].get_str(1))
def test_parse_post_urlencoded_zero_arg(self):
path, args = parse_args('/abc/def', postbody=b'foo=&bar=42', enctype='application/x-www-form-urlencoded')
self.assertEqual(2, len(args))
self.assertIn('foo', args)
self.assertIn('bar', args)
self.assertTrue(args['foo'].is_scalar)
self.assertTrue(args['bar'].is_scalar)
self.assertEqual(1, len(args['foo']))
self.assertEqual('', args['foo'].get_str())
self.assertEqual('42', args['bar'].get_str())
def test_parse_post_urlencoded_encoding_fail(self):
with self.assertRaises(ValueError):
parse_args('/',
postbody=b'foo=42&bar=%80&baz=Hello,%20World!',
enctype='application/x-www-form-urlencoded')
def test_parse_post_multipart_no_args(self):
path, args = parse_args('/',
postbody=b'--testBoundary1337--\r\n',
enctype='multipart/form-data; boundary=testBoundary1337')
self.assertEqual('/', path)
self.assertEqual(0, len(args))
def test_parse_post_multipart(self):
path, args = parse_args('/',
postbody=b'--testBoundary1337\r\n'
b'Content-Disposition: form-data; name="foo"\r\n'
b'Content-Type: text/plain\r\n\r\n'
b'42\r\n'
b'--testBoundary1337\r\n'
b'Content-Disposition: form-data; name="bar"; filename="bar.bin"\r\n'
b'Content-Type: application/octet-stream\r\n\r\n'
b'1337\r\n'
b'--testBoundary1337\r\n'
b'Content-Disposition: form-data; name="baz"\r\n'
b'Content-Type: text/plain\r\n\r\n'
b'Hello, World!\r\n'
b'--testBoundary1337--\r\n',
enctype='multipart/form-data; boundary=testBoundary1337')
self.assertEqual('/', path)
self.assertEqual(3, len(args))
self.assertIn('foo', args)
self.assertIn('bar', args)
self.assertIn('baz', args)
self.assertTrue(args['foo'].is_scalar)
self.assertTrue(args['bar'].is_scalar)
self.assertTrue(args['baz'].is_scalar)
self.assertEqual('text/plain', args['foo'].get_content_type())
self.assertEqual('application/octet-stream', args['bar'].get_content_type())
self.assertEqual('text/plain', args['baz'].get_content_type())
self.assertEqual('42', args['foo'].get_str())
self.assertEqual(b'1337', args['bar'].get_bytes())
self.assertEqual('Hello, World!', args['baz'].get_str())
def test_parse_post_multipart_zero_arg(self):
path, args = parse_args('/abc/def',
postbody=b'--testBoundary1337\r\n'
b'Content-Disposition: form-data; name="foo"\r\n'
b'Content-Type: text/plain\r\n\r\n\r\n'
b'--testBoundary1337\r\n'
b'Content-Disposition: form-data; name="bar"\r\n'
b'Content-Type: text/plain\r\n\r\n'
b'42\r\n'
b'--testBoundary1337--\r\n',
enctype='multipart/form-data; boundary=testBoundary1337')
self.assertEqual(2, len(args))
self.assertIn('foo', args)
self.assertIn('bar', args)
self.assertTrue(args['foo'].is_scalar)
self.assertTrue(args['bar'].is_scalar)
self.assertEqual(1, len(args['foo']))
self.assertEqual('', args['foo'].get_str())
self.assertEqual('42', args['bar'].get_str())
def test_parse_post_multipart_broken_boundaries(self):
with self.assertRaises(ValueError):
# Boundary not defined in Content-Type
parse_args('/',
postbody=b'--testBoundary1337\r\n'
b'Content-Disposition: form-data; name="foo"\r\n'
b'Content-Type: text/plain\r\n\r\n'
b'42\r\n'
b'--testBoundary1337\r\n'
b'Content-Disposition: form-data; name="bar"; filename="bar.bin"\r\n'
b'Content-Type: application/octet-stream\r\n\r\n'
b'1337\r\n'
b'--testBoundary1337\r\n'
b'Content-Disposition: form-data; name="baz"\r\n'
b'Content-Type: text/plain\r\n\r\n'
b'Hello, World!\r\n'
b'--testBoundary1337--\r\n',
enctype='multipart/form-data')
with self.assertRaises(ValueError):
# Corrupted "--" head at first boundary
parse_args('/',
postbody=b'-+testBoundary1337\r\n'
b'Content-Disposition: form-data; name="foo"\r\n'
b'Content-Type: text/plain\r\n\r\n'
b'42\r\n'
b'--testBoundary1337\r\n'
b'Content-Disposition: form-data; name="bar"; filename="bar.bin"\r\n'
b'Content-Type: application/octet-stream\r\n\r\n'
b'1337\r\n'
b'--testBoundary1337\r\n'
b'Content-Disposition: form-data; name="baz"\r\n'
b'Content-Type: text/plain\r\n\r\n'
b'Hello, World!\r\n'
b'--testBoundary1337--\r\n',
enctype='multipart/form-data; boundary=testBoundary1337')
with self.assertRaises(ValueError):
# Missing "--" tail at end boundary
parse_args('/',
postbody=b'--testBoundary1337\r\n'
b'Content-Disposition: form-data; name="foo"\r\n'
b'Content-Type: text/plain\r\n\r\n'
b'42\r\n'
b'--testBoundary1337\r\n'
b'Content-Disposition: form-data; name="bar"; filename="bar.bin"\r\n'
b'Content-Type: application/octet-stream\r\n\r\n'
b'1337\r\n'
b'--testBoundary1337\r\n'
b'Content-Disposition: form-data; name="baz"\r\n'
b'Content-Type: text/plain\r\n\r\n'
b'Hello, World!\r\n'
b'--testBoundary1337\r\n',
enctype='multipart/form-data; boundary=testBoundary1337')
with self.assertRaises(ValueError):
# Missing Content-Type header in one part
parse_args('/',
postbody=b'--testBoundary1337\r\n'
b'Content-Disposition: form-data; name="foo"\r\n'
b'Content-Type: text/plain\r\n\r\n'
b'42\r\n'
b'--testBoundary1337\r\n'
b'Content-Disposition: form-data; name="bar"; filename="bar.bin"\r\n'
b'Content-Type: application/octet-stream\r\n\r\n'
b'1337\r\n'
b'--testBoundary1337\r\n'
b'Content-Disposition: form-data; name="baz"\r\n\r\n'
b'Hello, World!\r\n'
b'--testBoundary1337--\r\n',
enctype='multipart/form-data; boundary=testBoundary1337')
with self.assertRaises(ValueError):
# Missing Content-Disposition header in one part
parse_args('/',
postbody=b'--testBoundary1337\r\n'
b'Content-Disposition: form-data; name="foo"\r\n'
b'Content-Type: text/plain\r\n\r\n'
b'42\r\n'
b'--testBoundary1337\r\n'
b'Content-Type: application/octet-stream\r\n\r\n'
b'1337\r\n'
b'--testBoundary1337\r\n'
b'Content-Disposition: form-data; name="baz"\r\n'
b'Content-Type: text/plain\r\n\r\n'
b'Hello, World!\r\n'
b'--testBoundary1337--\r\n',
enctype='multipart/form-data; boundary=testBoundary1337')
with self.assertRaises(ValueError):
# Missing form-data name argument
parse_args('/',
postbody=b'--testBoundary1337\r\n'
b'Content-Disposition: form-data; name="foo"\r\n'
b'Content-Type: text/plain\r\n\r\n'
b'42\r\n'
b'--testBoundary1337\r\n'
b'Content-Disposition: form-data; filename="bar.bin"\r\n'
b'Content-Type: application/octet-stream\r\n\r\n'
b'1337\r\n'
b'--testBoundary1337\r\n'
b'Content-Disposition: form-data; name="baz"\r\n'
b'Content-Type: text/plain\r\n\r\n'
b'Hello, World!\r\n'
b'--testBoundary1337--\r\n',
enctype='multipart/form-data; boundary=testBoundary1337')
with self.assertRaises(ValueError):
# Unknown Content-Disposition
parse_args('/',
postbody=b'--testBoundary1337\r\n'
b'Content-Disposition: form-data; name="foo"\r\n'
b'Content-Type: text/plain\r\n\r\n'
b'42\r\n'
b'--testBoundary1337\r\n'
b'Content-Disposition: attachment; name="bar"; filename="bar.bin"\r\n'
b'Content-Type: application/octet-stream\r\n\r\n'
b'1337\r\n'
b'--testBoundary1337\r\n'
b'Content-Disposition: form-data; name="baz"\r\n'
b'Content-Type: text/plain\r\n\r\n'
b'Hello, World!\r\n'
b'--testBoundary1337--\r\n',
enctype='multipart/form-data; boundary=testBoundary1337')
def test_get_post_precedence_urlencoded(self):
path, args = parse_args('/foo?foo=thisshouldnotbethere&bar=isurvived',
postbody=b'foo=42&foo=1337&baz=Hello,%20World!',
enctype='application/x-www-form-urlencoded')
self.assertIn('foo', args)
self.assertIn('bar', args)
self.assertIn('baz', args)
self.assertEqual(2, len(args['foo']))
self.assertEqual(1, len(args['bar']))
self.assertEqual(1, len(args['baz']))
self.assertEqual('42', args['foo'].get_str(0))
self.assertEqual('1337', args['foo'].get_str(1))
self.assertEqual('isurvived', args['bar'].get_str())
self.assertEqual('Hello, World!', args['baz'].get_str())
def test_get_post_precedence_multipart(self):
path, args = parse_args('/foo?foo=thisshouldnotbethere&bar=isurvived',
postbody=b'--testBoundary1337\r\n'
b'Content-Disposition: form-data; name="foo"\r\n'
b'Content-Type: text/plain\r\n\r\n'
b'42\r\n'
b'--testBoundary1337\r\n'
b'Content-Disposition: form-data; name="foo"; filename="bar.bin"\r\n'
b'Content-Type: text/plain\r\n\r\n'
b'1337\r\n'
b'--testBoundary1337\r\n'
b'Content-Disposition: form-data; name="baz"\r\n'
b'Content-Type: text/plain\r\n\r\n'
b'Hello, World!\r\n'
b'--testBoundary1337--\r\n',
enctype='multipart/form-data; boundary=testBoundary1337')
self.assertIn('foo', args)
self.assertIn('bar', args)
self.assertIn('baz', args)
self.assertEqual(2, len(args['foo']))
self.assertEqual(1, len(args['bar']))
self.assertEqual(1, len(args['baz']))
self.assertEqual('42', args['foo'].get_str(0))
self.assertEqual('1337', args['foo'].get_str(1))
self.assertEqual('isurvived', args['bar'].get_str())
self.assertEqual('Hello, World!', args['baz'].get_str())

View file

@ -1,14 +1,16 @@
from typing import Any, Dict, List
from matemat.webserver.httpd import HttpHandler
from matemat.webserver import HttpHandler, RequestArguments
from matemat.webserver.test.abstract_httpd_test import AbstractHttpdTest, test_pagelet
import codecs
@test_pagelet('/just/testing/post')
def post_test_pagelet(method: str,
path: str,
args: Dict[str, str],
args: RequestArguments,
session_vars: Dict[str, Any],
headers: Dict[str, str]):
"""
@ -16,8 +18,12 @@ def post_test_pagelet(method: str,
"""
headers['Content-Type'] = 'text/plain'
dump: str = ''
for k, v in args.items():
dump += f'{k}: {v if isinstance(v, str) else ",".join(v)}\n'
for ra in args:
for a in ra:
if a.get_content_type().startswith('text/'):
dump += f'{a.name}: {a.get_str()}\n'
else:
dump += f'{a.name}: {codecs.encode(a.get_bytes(), "hex").decode("utf-8")}\n'
return 200, dump
@ -26,7 +32,7 @@ class TestPost(AbstractHttpdTest):
Test cases for the content serving of the web server.
"""
def test_post_get_only_args(self):
def test_post_urlenc_get_only_args(self):
"""
Test a POST request that only contains GET arguments.
"""
@ -38,17 +44,17 @@ class TestPost(AbstractHttpdTest):
packet = self.client_sock.get_response()
# Parse response body
lines: List[str] = packet.body.split('\n')[:-1]
lines: List[bytes] = packet.body.split(b'\n')[:-1]
kv: Dict[str, str] = dict()
for l in lines:
k, v = l.split(':', 1)
k, v = l.decode('utf-8').split(':', 1)
kv[k.strip()] = v.strip() if ',' not in v else v.strip().split(',')
# Make sure the arguments were properly parsed
self.assertEqual('bar', kv['foo'])
self.assertEqual('1', kv['test'])
def test_post_post_only_args(self):
def test_post_urlenc_post_only_args(self):
"""
Test a POST request that only contains POST arguments (urlencoded).
"""
@ -61,17 +67,17 @@ class TestPost(AbstractHttpdTest):
packet = self.client_sock.get_response()
# Parse response body
lines: List[str] = packet.body.split('\n')[:-1]
lines: List[bytes] = packet.body.split(b'\n')[:-1]
kv: Dict[str, str] = dict()
for l in lines:
k, v = l.split(':', 1)
k, v = l.decode('utf-8').split(':', 1)
kv[k.strip()] = v.strip() if ',' not in v else v.strip().split(',')
# Make sure the arguments were properly parsed
self.assertEqual('bar', kv['foo'])
self.assertEqual('1', kv['test'])
def test_post_mixed_args(self):
def test_post_urlenc_mixed_args(self):
"""
Test that mixed POST and GET args are properly parsed, and that POST takes precedence over GET.
"""
@ -84,10 +90,10 @@ class TestPost(AbstractHttpdTest):
packet = self.client_sock.get_response()
# Parse response body
lines: List[str] = packet.body.split('\n')[:-1]
lines: List[bytes] = packet.body.split(b'\n')[:-1]
kv: Dict[str, str] = dict()
for l in lines:
k, v = l.split(':', 1)
k, v = l.decode('utf-8').split(':', 1)
kv[k.strip()] = v.strip() if ',' not in v else v.strip().split(',')
# Make sure the arguments were properly parsed
@ -95,7 +101,7 @@ class TestPost(AbstractHttpdTest):
self.assertEqual('1', kv['gettest'])
self.assertEqual('2', kv['posttest'])
def test_post_get_array(self):
def test_post_urlenc_get_array(self):
"""
Test a POST request that contains GET array arguments.
"""
@ -107,17 +113,21 @@ class TestPost(AbstractHttpdTest):
packet = self.client_sock.get_response()
# Parse response body
lines: List[str] = packet.body.split('\n')[:-1]
lines: List[bytes] = packet.body.split(b'\n')[:-1]
kv: Dict[str, str] = dict()
for l in lines:
k, v = l.split(':', 1)
kv[k.strip()] = v.strip() if ',' not in v else v.strip().split(',')
k, v = l.decode('utf-8').split(':', 1)
k = k.strip()
v = v.strip()
if k in kv:
kv[k] += f',{v}'
else:
kv[k] = v
# Make sure the arguments were properly parsed
self.assertListEqual(['bar', 'baz'], kv['foo'])
self.assertEqual('bar,baz', kv['foo'])
self.assertEqual('1', kv['test'])
def test_post_post_array(self):
def test_post_urlenc_post_array(self):
"""
Test a POST request that contains POST array arguments.
"""
@ -130,17 +140,21 @@ class TestPost(AbstractHttpdTest):
packet = self.client_sock.get_response()
# Parse response body
lines: List[str] = packet.body.split('\n')[:-1]
lines: List[bytes] = packet.body.split(b'\n')[:-1]
kv: Dict[str, str] = dict()
for l in lines:
k, v = l.split(':', 1)
kv[k.strip()] = v.strip() if ',' not in v else v.strip().split(',')
k, v = l.decode('utf-8').split(':', 1)
k = k.strip()
v = v.strip()
if k in kv:
kv[k] += f',{v}'
else:
kv[k] = v
# Make sure the arguments were properly parsed
self.assertListEqual(['bar', 'baz'], kv['foo'])
self.assertEqual('bar,baz', kv['foo'])
self.assertEqual('1', kv['test'])
def test_post_mixed_array(self):
def test_post_urlenc_mixed_array(self):
"""
Test a POST request that contains both GET and POST array arguments.
"""
@ -153,13 +167,58 @@ class TestPost(AbstractHttpdTest):
packet = self.client_sock.get_response()
# Parse response body
lines: List[str] = packet.body.split('\n')[:-1]
lines: List[bytes] = packet.body.split(b'\n')[:-1]
kv: Dict[str, str] = dict()
for l in lines:
k, v = l.split(':', 1)
kv[k.strip()] = v.strip() if ',' not in v else v.strip().split(',')
k, v = l.decode('utf-8').split(':', 1)
k = k.strip()
v = v.strip()
if k in kv:
kv[k] += f',{v}'
else:
kv[k] = v
# Make sure the arguments were properly parsed
self.assertListEqual(['postbar', 'postbaz'], kv['foo'])
self.assertListEqual(['1', '42'], kv['gettest'])
self.assertListEqual(['1', '2'], kv['posttest'])
self.assertEqual('postbar,postbaz', kv['foo'])
self.assertEqual('1,42', kv['gettest'])
self.assertEqual('1,2', kv['posttest'])
def test_post_no_body(self):
"""
Test a POST request that contains no headers or body.
"""
# Send POST request
self.client_sock.set_request(b'POST /just/testing/post?foo=bar HTTP/1.1\r\n\r\n')
HttpHandler(self.client_sock, ('::1', 45678), self.server)
packet = self.client_sock.get_response()
# Make sure a 400 Bad Request is returned
self.assertEqual(400, packet.statuscode)
def test_post_multipart_post_only(self):
"""
Test a POST request with a miutipart/form-data body.
"""
# Send POST request
formdata = (b'------testboundary\r\n'
b'Content-Disposition: form-data; name="foo"\r\n'
b'Content-Type: text/plain\r\n\r\n'
b'Hello, World!\r\n'
b'------testboundary\r\n'
b'Content-Disposition: form-data; name="bar"; filename="foo.bar"\r\n'
b'Content-Type: application/octet-stream\r\n\r\n'
b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x80\x0b\x0c\x73\x0e\x0f\r\n'
b'------testboundary--\r\n')
self.client_sock.set_request(f'POST /just/testing/post HTTP/1.1\r\n'
f'Content-Type: multipart/form-data; boundary=----testboundary\r\n'
f'Content-Length: {len(formdata)}\r\n\r\n'.encode('utf-8') + formdata)
HttpHandler(self.client_sock, ('::1', 45678), self.server)
packet = self.client_sock.get_response()
lines: List[bytes] = packet.body.split(b'\n')[:-1]
kv: Dict[str, Any] = dict()
for l in lines:
k, v = l.split(b':', 1)
kv[k.decode('utf-8').strip()] = v.strip()
self.assertIn('foo', kv)
self.assertIn('bar', kv)
self.assertEqual(kv['foo'], b'Hello, World!')
self.assertEqual(kv['bar'], b'00010203040506070809800b0c730e0f')

View file

@ -0,0 +1,529 @@
from typing import Dict, List, Set, Tuple
import unittest
import urllib.parse
from matemat.webserver import RequestArgument, RequestArguments
# noinspection PyProtectedMember
from matemat.webserver.requestargs import _View
class TestRequestArguments(unittest.TestCase):
"""
Test cases for the RequestArgument class.
"""
def test_create_default(self):
"""
Test creation of an empty RequestArgument
"""
ra = RequestArgument('foo')
# Name must be set to 1st argument
self.assertEqual('foo', ra.name)
# Must be a 0-length array
self.assertEqual(0, len(ra))
self.assertFalse(ra.is_scalar)
self.assertTrue(ra.is_array)
# Must not be a view
self.assertFalse(ra.is_view)
def test_create_str_scalar(self):
"""
Test creation of a scalar RequestArgument with string value.
"""
ra = RequestArgument('foo', ('text/plain', 'bar'))
# Name must be set to 1st argument
self.assertEqual('foo', ra.name)
# Must be a scalar, length 1
self.assertEqual(1, len(ra))
self.assertTrue(ra.is_scalar)
self.assertFalse(ra.is_array)
# Scalar value must be representable both as str and bytes
self.assertEqual('bar', ra.get_str())
self.assertEqual(b'bar', ra.get_bytes())
# Content-Type must be set correctly
self.assertEqual('text/plain', ra.get_content_type())
# Using 0 indices must yield the same results
self.assertEqual('bar', ra.get_str(0))
self.assertEqual(b'bar', ra.get_bytes(0))
self.assertEqual('text/plain', ra.get_content_type(0))
# Using other indices must result in an error
with self.assertRaises(IndexError):
ra.get_str(1)
with self.assertRaises(IndexError):
ra.get_bytes(1)
with self.assertRaises(IndexError):
ra.get_content_type(1)
# Must not be a view
self.assertFalse(ra.is_view)
def test_create_str_scalar_array(self):
"""
Test creation of a scalar RequestArgument with string value, passing an array instead of a single tuple.
"""
ra = RequestArgument('foo', [('text/plain', 'bar')])
# Name must be set to 1st argument
self.assertEqual('foo', ra.name)
# Must be a scalar, length 1
self.assertEqual(1, len(ra))
self.assertTrue(ra.is_scalar)
self.assertFalse(ra.is_array)
# Scalar value must be representable both as str and bytes
self.assertEqual('bar', ra.get_str())
self.assertEqual(b'bar', ra.get_bytes())
# Content-Type must be set correctly
self.assertEqual('text/plain', ra.get_content_type())
# Using 0 indices must yield the same results
self.assertEqual('bar', ra.get_str(0))
self.assertEqual(b'bar', ra.get_bytes(0))
self.assertEqual('text/plain', ra.get_content_type(0))
# Using other indices must result in an error
with self.assertRaises(IndexError):
ra.get_str(1)
with self.assertRaises(IndexError):
ra.get_bytes(1)
with self.assertRaises(IndexError):
ra.get_content_type(1)
# Must not be a view
self.assertFalse(ra.is_view)
def test_create_bytes_scalar(self):
"""
Test creation of a scalar RequestArgument with bytes value.
"""
ra = RequestArgument('foo', ('application/octet-stream', b'\x00\x80\xff\xfe'))
# Name must be set to 1st argument
self.assertEqual('foo', ra.name)
# Must be a scalar, length 1
self.assertEqual(1, len(ra))
self.assertTrue(ra.is_scalar)
self.assertFalse(ra.is_array)
# Conversion to UTF-8 string must fail; bytes representation must work
with self.assertRaises(UnicodeDecodeError):
ra.get_str()
self.assertEqual(b'\x00\x80\xff\xfe', ra.get_bytes())
# Content-Type must be set correctly
self.assertEqual('application/octet-stream', ra.get_content_type())
# Using 0 indices must yield the same results
with self.assertRaises(UnicodeDecodeError):
ra.get_str(0)
self.assertEqual(b'\x00\x80\xff\xfe', ra.get_bytes(0))
self.assertEqual('application/octet-stream', ra.get_content_type(0))
# Using other indices must result in an error
with self.assertRaises(IndexError):
ra.get_str(1)
with self.assertRaises(IndexError):
ra.get_bytes(1)
with self.assertRaises(IndexError):
ra.get_content_type(1)
# Must not be a view
self.assertFalse(ra.is_view)
def test_create_array(self):
"""
Test creation of an array RequestArgument with mixed str and bytes initial value.
"""
ra = RequestArgument('foo', [
('text/plain', 'bar'),
('application/octet-stream', b'\x00\x80\xff\xfe')
])
# Name must be set to 1st argument
self.assertEqual('foo', ra.name)
# Must be an array, length 2
self.assertEqual(2, len(ra))
self.assertFalse(ra.is_scalar)
self.assertTrue(ra.is_array)
# Retrieving values without an index must yield the first element
self.assertEqual('bar', ra.get_str())
self.assertEqual(b'bar', ra.get_bytes())
self.assertEqual('text/plain', ra.get_content_type())
# The first value must be representable both as str and bytes, and have ctype text/plain
self.assertEqual('bar', ra.get_str(0))
self.assertEqual(b'bar', ra.get_bytes(0))
self.assertEqual('text/plain', ra.get_content_type(0))
# Conversion of the second value to UTF-8 string must fail; bytes representation must work
with self.assertRaises(UnicodeDecodeError):
ra.get_str(1)
self.assertEqual(b'\x00\x80\xff\xfe', ra.get_bytes(1))
# The second value's ctype must be correct
self.assertEqual('application/octet-stream', ra.get_content_type(1))
# Must not be a view
self.assertFalse(ra.is_view)
def test_append_empty_str(self):
"""
Test appending a str value to an empty RequestArgument.
"""
# Initialize the empty RequestArgument
ra = RequestArgument('foo')
self.assertEqual(0, len(ra))
self.assertFalse(ra.is_scalar)
# Append a string value
ra.append('text/plain', 'bar')
# New length must be 1, empty array must be converted to scalar
self.assertEqual(1, len(ra))
self.assertTrue(ra.is_scalar)
# Retrieval of the new value must work both in str and bytes representation
self.assertEqual('bar', ra.get_str())
self.assertEqual(b'bar', ra.get_bytes())
# Content type of the new value must be correct
self.assertEqual('text/plain', ra.get_content_type())
# Must not be a view
self.assertFalse(ra.is_view)
def test_append_empty_bytes(self):
"""
Test appending a bytes value to an empty RequestArgument.
"""
# Initialize the empty RequestArgument
ra = RequestArgument('foo')
self.assertEqual(0, len(ra))
self.assertFalse(ra.is_scalar)
# Append a bytes value
ra.append('application/octet-stream', b'\x00\x80\xff\xfe')
# New length must be 1, empty array must be converted to scalar
self.assertEqual(1, len(ra))
self.assertTrue(ra.is_scalar)
# Conversion of the new value to UTF-8 string must fail; bytes representation must work
with self.assertRaises(UnicodeDecodeError):
ra.get_str()
self.assertEqual(b'\x00\x80\xff\xfe', ra.get_bytes())
# Content type of the new value must be correct
self.assertEqual('application/octet-stream', ra.get_content_type())
# Must not be a view
self.assertFalse(ra.is_view)
def test_append_multiple(self):
"""
Test appending multiple values to an empty RequestArgument.
"""
# Initialize the empty RequestArgument
ra = RequestArgument('foo')
self.assertEqual(0, len(ra))
self.assertFalse(ra.is_scalar)
# Append a first value
ra.append('text/plain', 'bar')
# New length must be 1, empty array must be converted to scalar
self.assertEqual(1, len(ra))
self.assertTrue(ra.is_scalar)
self.assertEqual(b'bar', ra.get_bytes())
# Append a second value
ra.append('application/octet-stream', b'\x00\x80\xff\xfe')
# New length must be 2, scalar must be converted to array
self.assertEqual(2, len(ra))
self.assertFalse(ra.is_scalar)
self.assertEqual(b'bar', ra.get_bytes(0))
self.assertEqual(b'\x00\x80\xff\xfe', ra.get_bytes(1))
# Append a third value
ra.append('text/plain', 'Hello, World!')
# New length must be 3, array must remain array
self.assertEqual(3, len(ra))
self.assertFalse(ra.is_scalar)
self.assertEqual(b'bar', ra.get_bytes(0))
self.assertEqual(b'\x00\x80\xff\xfe', ra.get_bytes(1))
self.assertEqual(b'Hello, World!', ra.get_bytes(2))
def test_clear_empty(self):
"""
Test clearing an empty RequestArgument.
"""
# Initialize the empty RequestArgument
ra = RequestArgument('foo')
self.assertEqual(0, len(ra))
self.assertFalse(ra.is_scalar)
ra.clear()
# Clearing an empty RequestArgument shouldn't have any effect
self.assertEqual('foo', ra.name)
self.assertEqual(0, len(ra))
self.assertFalse(ra.is_scalar)
def test_clear_scalar(self):
"""
Test clearing a scalar RequestArgument.
"""
# Initialize the scalar RequestArgument
ra = RequestArgument('foo', ('text/plain', 'bar'))
self.assertEqual(1, len(ra))
self.assertTrue(ra.is_scalar)
ra.clear()
# Clearing a scalar RequestArgument should reduce its size to 0
self.assertEqual('foo', ra.name)
self.assertEqual(0, len(ra))
self.assertFalse(ra.is_scalar)
with self.assertRaises(IndexError):
ra.get_str()
def test_clear_array(self):
"""
Test clearing an array RequestArgument.
"""
# Initialize the array RequestArgument
ra = RequestArgument('foo', [
('text/plain', 'bar'),
('application/octet-stream', b'\x00\x80\xff\xfe'),
('text/plain', 'baz'),
])
self.assertEqual(3, len(ra))
self.assertFalse(ra.is_scalar)
ra.clear()
# Clearing an array RequestArgument should reduce its size to 0
self.assertEqual('foo', ra.name)
self.assertEqual(0, len(ra))
self.assertFalse(ra.is_scalar)
with self.assertRaises(IndexError):
ra.get_str()
def test_iterate_empty(self):
"""
Test iterating an empty RequestArgument.
"""
ra = RequestArgument('foo')
self.assertEqual(0, len(ra))
# No value must be yielded from iterating an empty instance
for _ in ra:
self.fail()
def test_iterate_scalar(self):
"""
Test iterating a scalar RequestArgument.
"""
ra = RequestArgument('foo', ('text/plain', 'bar'))
self.assertTrue(ra.is_scalar)
# Counter for the number of iterations
count: int = 0
for it in ra:
# Make sure the yielded value is a scalar view and has the same name as the original instance
self.assertIsInstance(it, _View)
self.assertTrue(it.is_view)
self.assertEqual('foo', it.name)
self.assertTrue(it.is_scalar)
count += 1
# Only one value must be yielded from iterating a scalar instance
self.assertEqual(1, count)
def test_iterate_array(self):
"""
Test iterating an array RequestArgument.
"""
ra = RequestArgument('foo', [('text/plain', 'bar'), ('abc', b'def'), ('xyz', '1337')])
self.assertFalse(ra.is_scalar)
# Container to put the iterated ctypes into
items: List[str] = list()
for it in ra:
# Make sure the yielded values are scalar views and have the same name as the original instance
self.assertIsInstance(it, _View)
self.assertTrue(it.is_view)
self.assertTrue(it.is_scalar)
# Collect the value's ctype
items.append(it.get_content_type())
# Compare collected ctypes with expected result
self.assertEqual(['text/plain', 'abc', 'xyz'], items)
def test_slice(self):
"""
Test slicing an array RequestArgument.
"""
ra = RequestArgument('foo', [('a', 'b'), ('c', 'd'), ('e', 'f'), ('g', 'h'), ('i', 'j'), ('k', 'l')])
# Create the sliced view
sliced = ra[1:4:2]
# Make sure the sliced value is a view
self.assertIsInstance(sliced, _View)
self.assertTrue(sliced.is_view)
# Make sure the slice has the same name
self.assertEqual('foo', sliced.name)
# Make sure the slice has the expected shape (array of the 2nd and 4th scalar in the original)
self.assertTrue(sliced.is_array)
self.assertEqual(2, len(sliced))
self.assertEqual('d', sliced.get_str(0))
self.assertEqual('h', sliced.get_str(1))
def test_iterate_sliced(self):
"""
Test iterating a sliced array RequestArgument.
"""
ra = RequestArgument('foo', [('a', 'b'), ('c', 'd'), ('e', 'f'), ('g', 'h'), ('i', 'j'), ('k', 'l')])
# Container to put the iterated ctypes into
items: List[str] = list()
# Iterate the sliced view
for it in ra[1:4:2]:
# Make sure the yielded values are scalar views and have the same name as the original instance
self.assertIsInstance(it, _View)
self.assertTrue(it.is_view)
self.assertEqual('foo', it.name)
self.assertTrue(it.is_scalar)
items.append(it.get_content_type())
# Make sure the expected values are collected (array of the 2nd and 4th scalar in the original)
self.assertEqual(['c', 'g'], items)
def test_index_scalar(self):
"""
Test indexing of a scalar RequestArgument.
"""
ra = RequestArgument('foo', ('bar', 'baz'))
# Index the scalar RequestArgument instance, obtaining an immutable view
it = ra[0]
# Make sure the value is a scalar view with the same properties as the original instance
self.assertIsInstance(it, _View)
self.assertTrue(it.is_scalar)
self.assertEqual('foo', it.name)
self.assertEqual('bar', it.get_content_type())
self.assertEqual('baz', it.get_str())
# Make sure other indices don't work
with self.assertRaises(IndexError):
_ = ra[1]
def test_index_array(self):
"""
Test indexing of an array RequestArgument.
"""
ra = RequestArgument('foo', [('a', 'b'), ('c', 'd')])
# Index the array RequestArgument instance, obtaining an immutable view
it = ra[1]
# Make sure the value is a scalar view with the same properties as the value in the original instance
self.assertIsInstance(it, _View)
self.assertEqual('foo', it.name)
self.assertEqual('c', it.get_content_type())
self.assertEqual('d', it.get_str())
def test_view_immutable(self):
"""
Test immutability of views.
"""
ra = RequestArgument('foo', ('bar', 'baz'))
# Index the scalar RequestArgument instance, obtaining an immutable view
it = ra[0]
# Make sure the returned value is a view
self.assertIsInstance(it, _View)
# Make sure the returned value is immutable
with self.assertRaises(TypeError):
it.append('foo', 'bar')
with self.assertRaises(TypeError):
it.clear()
def test_str_shorthand(self):
"""
Test the shorthand for get_str(0).
"""
ra = RequestArgument('foo', ('bar', 'baz'))
self.assertEqual('baz', str(ra))
def test_bytes_shorthand(self):
"""
Test the shorthand for get_bytes(0).
"""
ra = RequestArgument('foo', ('bar', b'\x00\x80\xff\xfe'))
self.assertEqual(b'\x00\x80\xff\xfe', bytes(ra))
# noinspection PyTypeChecker
def test_insert_garbage(self):
"""
Test proper handling with non-int indices and non-str/non-bytes data
:return:
"""
ra = RequestArgument('foo', 42)
with self.assertRaises(TypeError):
str(ra)
ra = RequestArgument('foo', (None, 42))
with self.assertRaises(TypeError):
str(ra)
with self.assertRaises(TypeError):
bytes(ra)
with self.assertRaises(TypeError):
ra.get_content_type()
with self.assertRaises(TypeError):
ra.get_str('foo')
with self.assertRaises(TypeError):
ra.get_bytes('foo')
with self.assertRaises(TypeError):
ra.get_content_type('foo')
def test_requestarguments_index(self):
"""
Make sure indexing a RequestArguments instance creates a new entry on the fly.
"""
ra = RequestArguments()
self.assertEqual(0, len(ra))
self.assertFalse('foo' in ra)
# Create new entry
_ = ra['foo']
self.assertEqual(1, len(ra))
self.assertTrue('foo' in ra)
# Already exists, no new entry created
_ = ra['foo']
self.assertEqual(1, len(ra))
# Entry must be empty and mutable, and have the correct name
self.assertFalse(ra['foo'].is_view)
self.assertEqual(0, len(ra['foo']))
self.assertEqual('foo', ra['foo'].name)
# Key must be a string
with self.assertRaises(TypeError):
# noinspection PyTypeChecker
_ = ra[42]
def test_requestarguments_attr(self):
"""
Test attribute access syntactic sugar.
"""
ra = RequestArguments()
# Attribute should not exist yet
with self.assertRaises(KeyError):
_ = ra.foo
# Create entry
_ = ra['foo']
# Creating entry should have created the attribute
self.assertEqual('foo', ra.foo.name)
# Attribute access should yield an immutable view
self.assertTrue(ra.foo.is_view)
def test_requestarguments_iterate(self):
"""
Test iterating a RequestArguments instance.
"""
# Create an instance with some values
ra = RequestArguments()
ra['foo'].append('a', 'b')
ra['bar'].append('c', 'd')
ra['foo'].append('e', 'f')
# Container for test values (name, value)
items: Set[Tuple[str, str]] = set()
# Iterate RequestArguments instance, adding the name and value of each to the set
for a in ra:
items.add((a.name, str(a)))
# Compare result with expected value
self.assertEqual(2, len(items))
self.assertIn(('foo', 'b'), items)
self.assertIn(('bar', 'd'), items)
def test_requestarguments_full_use_case(self):
"""
Simulate a minimal RequestArguments use case.
"""
# Create empty RequestArguments instance
ra = RequestArguments()
# Parse GET request
getargs: Dict[str, List[str]] = urllib.parse.parse_qs('foo=42&bar=1337&foo=43&baz=Hello,%20World!')
# Insert GET arguments into RequestArguments
for k, vs in getargs.items():
for v in vs:
ra[k].append('text/plain', v)
# Parse POST request
postargs: Dict[str, List[str]] = urllib.parse.parse_qs('foo=postfoo&postbar=42&foo=postfoo')
# Insert POST arguments into RequestArguments
for k, vs in postargs.items():
# In this implementation, POST args replace GET args
ra[k].clear()
for v in vs:
ra[k].append('text/plain', v)
# Someplace else: Use the RequestArguments instance.
self.assertEqual('1337', ra.bar.get_str())
self.assertEqual('Hello, World!', ra.baz.get_str())
self.assertEqual('42', ra.postbar.get_str())
for a in ra.foo:
self.assertEqual('postfoo', a.get_str())

View file

@ -3,14 +3,14 @@ from typing import Any, Dict
import os
import os.path
from matemat.webserver.httpd import HttpHandler
from matemat.webserver import HttpHandler, RequestArguments
from matemat.webserver.test.abstract_httpd_test import AbstractHttpdTest, test_pagelet
@test_pagelet('/just/testing/serve_pagelet_ok')
def serve_test_pagelet_ok(method: str,
path: str,
args: Dict[str, str],
args: RequestArguments,
session_vars: Dict[str, Any],
headers: Dict[str, str]):
headers['Content-Type'] = 'text/plain'
@ -20,7 +20,7 @@ def serve_test_pagelet_ok(method: str,
@test_pagelet('/just/testing/serve_pagelet_fail')
def serve_test_pagelet_fail(method: str,
path: str,
args: Dict[str, str],
args: RequestArguments,
session_vars: Dict[str, Any],
headers: Dict[str, str]):
session_vars['test'] = 'hello, world!'
@ -54,7 +54,7 @@ class TestServe(AbstractHttpdTest):
self.assertEqual('serve_test_pagelet_ok', packet.pagelet)
# Make sure the expected content is served
self.assertEqual(200, packet.statuscode)
self.assertEqual('serve test pagelet ok', packet.body)
self.assertEqual(b'serve test pagelet ok', packet.body)
def test_serve_pagelet_fail(self):
# Call the test pagelet that produces a 500 Internal Server Error result
@ -66,7 +66,7 @@ class TestServe(AbstractHttpdTest):
self.assertEqual('serve_test_pagelet_fail', packet.pagelet)
# Make sure the expected content is served
self.assertEqual(500, packet.statuscode)
self.assertEqual('serve test pagelet fail', packet.body)
self.assertEqual(b'serve test pagelet fail', packet.body)
def test_serve_static_ok(self):
# Request a static resource
@ -78,7 +78,7 @@ class TestServe(AbstractHttpdTest):
self.assertIsNone(packet.pagelet)
# Make sure the expected content is served
self.assertEqual(200, packet.statuscode)
self.assertEqual('static resource test', packet.body)
self.assertEqual(b'static resource test', packet.body)
def test_serve_static_forbidden(self):
# Request a static resource with lacking permissions
@ -90,7 +90,7 @@ class TestServe(AbstractHttpdTest):
self.assertIsNone(packet.pagelet)
# Make sure a 403 header is served
self.assertEqual(403, packet.statuscode)
self.assertNotEqual('This should not be readable', packet.body)
self.assertNotEqual(b'This should not be readable', packet.body)
def test_serve_not_found(self):
# Request a nonexistent resource
@ -116,7 +116,10 @@ class TestServe(AbstractHttpdTest):
def test_static_post_not_allowed(self):
# Request a resource outside the webroot
self.client_sock.set_request(b'POST /iwanttouploadthis HTTP/1.1\r\n\r\nq=this%20should%20not%20be%20uploaded')
self.client_sock.set_request(b'POST /iwanttopostthis HTTP/1.1\r\n'
b'Content-Type: application/x-www-form-urlencoded\r\n'
b'Content-length: 37\r\n\r\n'
b'q=this%20should%20not%20be%20uploaded')
HttpHandler(self.client_sock, ('::1', 45678), self.server)
packet = self.client_sock.get_response()

View file

@ -4,14 +4,14 @@ from typing import Any, Dict
from datetime import datetime, timedelta
from time import sleep
from matemat.webserver.httpd import HttpHandler
from matemat.webserver import HttpHandler, RequestArguments
from matemat.webserver.test.abstract_httpd_test import AbstractHttpdTest, test_pagelet
@test_pagelet('/just/testing/sessions')
def session_test_pagelet(method: str,
path: str,
args: Dict[str, str],
args: RequestArguments,
session_vars: Dict[str, Any],
headers: Dict[str, str]):
session_vars['test'] = 'hello, world!'

135
matemat/webserver/util.py Normal file
View file

@ -0,0 +1,135 @@
from typing import Dict, List, Tuple, Optional
import urllib.parse
from matemat.webserver import RequestArguments, RequestArgument
def _parse_multipart(body: bytes, boundary: str) -> List[RequestArgument]:
"""
Given a HTTP body with form-data in multipart form, and the multipart-boundary, parse the multipart items and
return them as a dictionary.
:param body: The HTTP multipart/form-data body.
:param boundary: The multipart boundary.
:return: A dictionary of field names as key, and content types and field values as value.
"""
# Prepend a CRLF for the first boundary to match
body = b'\r\n' + body
# Generate item header boundary and terminating boundary from general boundary string
_boundary = f'\r\n--{boundary}\r\n'.encode('utf-8')
_end_boundary = f'\r\n--{boundary}--\r\n'.encode('utf-8')
# Split at the end boundary and make sure there comes nothing after it
allparts = body.split(_end_boundary, 1)
if len(allparts) != 2 or allparts[1] != b'':
raise ValueError('Last boundary missing or corrupted')
# Split remaining body into its parts, and verify at least 1 part is there
parts: List[bytes] = (allparts[0]).split(_boundary)
if len(parts) < 1 or parts[0] != b'':
raise ValueError('First boundary missing or corrupted')
# Remove the first, empty part
parts = parts[1:]
# Results go into this dict
args: Dict[str, RequestArgument] = dict()
# Parse each multipart part
for part in parts:
# Parse multipart headers
hdr: Dict[str, str] = dict()
while True:
head, part = part.split(b'\r\n', 1)
# Break on header/body delimiter
if head == b'':
break
# Add header to hdr dict
hk, hv = head.decode('utf-8').split(':')
hdr[hk.strip()] = hv.strip()
# At least Content-Type and Content-Disposition must be present
if 'Content-Type' not in hdr or 'Content-Disposition' not in hdr:
raise ValueError('Missing Content-Type or Content-Disposition header')
# Extract Content-Disposition header value and its arguments
cd, *cdargs = hdr['Content-Disposition'].split(';')
# Content-Disposition MUST be form-data; everything else is rejected
if cd.strip() != 'form-data':
raise ValueError(f'Unknown Content-Disposition: {cd}')
# Extract the "name" header argument
has_name = False
for cdarg in cdargs:
k, v = cdarg.split('=', 1)
if k.strip() == 'name':
has_name = True
name: str = v.strip()
# Remove quotation marks around the name value
if name.startswith('"') and name.endswith('"'):
name = v[1:-1]
# Add the Content-Type and the content to the header, with the provided name
if name not in args:
args[name] = RequestArgument(name)
args[name].append(hdr['Content-Type'].strip(), part)
if not has_name:
# Content-Disposition header without name attribute
raise ValueError('mutlipart/form-data part without name attribute')
return list(args.values())
def parse_args(request: str, postbody: Optional[bytes] = None, enctype: str = 'text/plain') \
-> Tuple[str, RequestArguments]:
"""
Given a HTTP request path, and optionally a HTTP POST body in application/x-www-form-urlencoded or
multipart/form-data form, parse the arguments and return them as a dictionary.
If a key is used both in GET and in POST, the POST value takes precedence, and the GET value is discarded.
:param request: The request string to parse.
:param postbody: The POST body to parse, defaults to None.
:param enctype: Encoding of the POST body; supported values are application/x-www-form-urlencoded and
multipart/form-data.
:return: A tuple consisting of the base path and a dictionary with the parsed key/value pairs, and the value's
content type.
"""
# Parse the request "URL" (i.e. only the path)
tokens = urllib.parse.urlparse(request)
# Parse the GET arguments
if len(tokens.query) == 0:
getargs = dict()
else:
getargs = urllib.parse.parse_qs(tokens.query, strict_parsing=True, keep_blank_values=True, errors='strict')
args = RequestArguments()
for k, vs in getargs.items():
args[k].clear()
for v in vs:
args[k].append('text/plain', v)
if postbody is not None:
if enctype == 'application/x-www-form-urlencoded':
# Parse the POST body
pb: str = postbody.decode('utf-8')
if len(pb) == 0:
postargs = dict()
else:
postargs = urllib.parse.parse_qs(pb, strict_parsing=True, keep_blank_values=True, errors='strict')
# Write all POST values into the dict, overriding potential duplicates from GET
for k, vs in postargs.items():
args[k].clear()
for v in vs:
args[k].append('text/plain', v)
elif enctype.startswith('multipart/form-data'):
# Parse the multipart boundary from the Content-Type header
try:
boundary: str = enctype.split('boundary=')[1].strip()
except IndexError:
raise ValueError('Multipart boundary in header not set or corrupted')
# Parse the multipart body
mpargs = _parse_multipart(postbody, boundary)
for ra in mpargs:
args[ra.name].clear()
for a in ra:
args[ra.name].append(a.get_content_type(), bytes(a))
else:
raise ValueError(f'Unsupported Content-Type: {enctype}')
# Return the path and the parsed arguments
return tokens.path, args