matemat/matemat/webserver/util.py

138 lines
6 KiB
Python

from typing import Dict, List, Tuple, Optional
import urllib.parse
from matemat.webserver import RequestArguments, RequestArgument
def _parse_multipart(body: bytes, boundary: str) -> List[RequestArgument]:
"""
Given a HTTP body with form-data in multipart form, and the multipart-boundary, parse the multipart items and
return them as a dictionary.
:param body: The HTTP multipart/form-data body.
:param boundary: The multipart boundary.
:return: A dictionary of field names as key, and content types and field values as value.
"""
# Prepend a CRLF for the first boundary to match
body = b'\r\n' + body
# Generate item header boundary and terminating boundary from general boundary string
_boundary = f'\r\n--{boundary}\r\n'.encode('utf-8')
_end_boundary = f'\r\n--{boundary}--\r\n'.encode('utf-8')
# Split at the end boundary and make sure there comes nothing after it
allparts = body.split(_end_boundary, 1)
if len(allparts) != 2 or allparts[1] != b'':
raise ValueError('Last boundary missing or corrupted')
# Split remaining body into its parts, and verify at least 1 part is there
parts: List[bytes] = (allparts[0]).split(_boundary)
if len(parts) < 1 or parts[0] != b'':
raise ValueError('First boundary missing or corrupted')
# Remove the first, empty part
parts = parts[1:]
# Results go into this dict
args: Dict[str, RequestArgument] = dict()
# Parse each multipart part
for part in parts:
# Parse multipart headers
hdr: Dict[str, str] = dict()
while True:
head, part = part.split(b'\r\n', 1)
# Break on header/body delimiter
if head == b'':
break
# Add header to hdr dict
hk, hv = head.decode('utf-8').split(':')
hdr[hk.strip()] = hv.strip()
# No content type set - set broadest possible type
if 'Content-Type' not in hdr:
hdr['Content-Type'] = 'application/octet-stream'
# At least Content-Disposition must be present
if 'Content-Disposition' not in hdr:
raise ValueError('Missing Content-Type or Content-Disposition header')
# Extract Content-Disposition header value and its arguments
cd, *cdargs = hdr['Content-Disposition'].split(';')
# Content-Disposition MUST be form-data; everything else is rejected
if cd.strip() != 'form-data':
raise ValueError(f'Unknown Content-Disposition: {cd}')
# Extract the "name" header argument
has_name = False
for cdarg in cdargs:
k, v = cdarg.split('=', 1)
if k.strip() == 'name':
has_name = True
name: str = v.strip()
# Remove quotation marks around the name value
if name.startswith('"') and name.endswith('"'):
name = v[1:-1]
# Add the Content-Type and the content to the header, with the provided name
if name not in args:
args[name] = RequestArgument(name)
args[name].append(hdr['Content-Type'].strip(), part)
if not has_name:
# Content-Disposition header without name attribute
raise ValueError('mutlipart/form-data part without name attribute')
return list(args.values())
def parse_args(request: str, postbody: Optional[bytes] = None, enctype: str = 'text/plain') \
-> Tuple[str, RequestArguments]:
"""
Given a HTTP request path, and optionally a HTTP POST body in application/x-www-form-urlencoded or
multipart/form-data form, parse the arguments and return them as a dictionary.
If a key is used both in GET and in POST, the POST value takes precedence, and the GET value is discarded.
:param request: The request string to parse.
:param postbody: The POST body to parse, defaults to None.
:param enctype: Encoding of the POST body; supported values are application/x-www-form-urlencoded and
multipart/form-data.
:return: A tuple consisting of the base path and a dictionary with the parsed key/value pairs, and the value's
content type.
"""
# Parse the request "URL" (i.e. only the path)
tokens = urllib.parse.urlparse(request)
# Parse the GET arguments
if len(tokens.query) == 0:
getargs: Dict[str, List[str]] = dict()
else:
getargs = urllib.parse.parse_qs(tokens.query, strict_parsing=True, keep_blank_values=True, errors='strict')
args = RequestArguments()
for k, vs in getargs.items():
args[k].clear()
for v in vs:
args[k].append('text/plain', v)
if postbody is not None:
if enctype == 'application/x-www-form-urlencoded':
# Parse the POST body
pb: str = postbody.decode('utf-8')
if len(pb) == 0:
postargs: Dict[str, List[str]] = dict()
else:
postargs = urllib.parse.parse_qs(pb, strict_parsing=True, keep_blank_values=True, errors='strict')
# Write all POST values into the dict, overriding potential duplicates from GET
for k, vs in postargs.items():
args[k].clear()
for v in vs:
args[k].append('text/plain', v)
elif enctype.startswith('multipart/form-data'):
# Parse the multipart boundary from the Content-Type header
try:
boundary: str = enctype.split('boundary=')[1].strip()
except IndexError:
raise ValueError('Multipart boundary in header not set or corrupted')
# Parse the multipart body
mpargs = _parse_multipart(postbody, boundary)
for ra in mpargs:
args[ra.name].clear()
for a in ra:
args[ra.name].append(a.get_content_type(), bytes(a))
else:
raise ValueError(f'Unsupported Content-Type: {enctype}')
# Return the path and the parsed arguments
return tokens.path, args