forked from s3lph/matemat
New request parsing (WIP: Documentation)
This commit is contained in:
parent
5bb1dfad21
commit
118de8bf95
16 changed files with 774 additions and 144 deletions
|
@ -6,4 +6,5 @@ API that can be used by 'pagelets' - single pages of a web service. If a reques
|
||||||
server will attempt to serve the request with a static resource in a previously configured webroot directory.
|
server will attempt to serve the request with a static resource in a previously configured webroot directory.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
from .requestargs import RequestArgument
|
||||||
from .httpd import MatematWebserver, HttpHandler, pagelet
|
from .httpd import MatematWebserver, HttpHandler, pagelet
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
|
|
||||||
from typing import Any, Callable, Dict, List, Tuple, Union
|
from typing import Any, Callable, Dict, Tuple, Union
|
||||||
|
|
||||||
import traceback
|
import traceback
|
||||||
|
|
||||||
|
@ -13,6 +13,7 @@ from uuid import uuid4
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
|
|
||||||
from matemat import __version__ as matemat_version
|
from matemat import __version__ as matemat_version
|
||||||
|
from matemat.webserver import RequestArgument
|
||||||
from matemat.webserver.util import parse_args
|
from matemat.webserver.util import parse_args
|
||||||
|
|
||||||
|
|
||||||
|
@ -30,7 +31,7 @@ BaseHTTPRequestHandler.log_error = lambda self, fstring='', *args: None
|
||||||
# Dictionary to hold registered pagelet paths and their handler functions
|
# Dictionary to hold registered pagelet paths and their handler functions
|
||||||
_PAGELET_PATHS: Dict[str, Callable[[str, # HTTP method (GET, POST, ...)
|
_PAGELET_PATHS: Dict[str, Callable[[str, # HTTP method (GET, POST, ...)
|
||||||
str, # Request path
|
str, # Request path
|
||||||
Dict[str, Tuple[str, Union[bytes, str, List[str]]]], # args: (name, (type, value))
|
Dict[str, RequestArgument], # args: (name, argument)
|
||||||
Dict[str, Any], # Session vars
|
Dict[str, Any], # Session vars
|
||||||
Dict[str, str]], # Response headers
|
Dict[str, str]], # Response headers
|
||||||
Tuple[int, Union[bytes, str]]]] = dict() # Returns: (status code, response body)
|
Tuple[int, Union[bytes, str]]]] = dict() # Returns: (status code, response body)
|
||||||
|
@ -50,15 +51,14 @@ def pagelet(path: str):
|
||||||
|
|
||||||
(method: str,
|
(method: str,
|
||||||
path: str,
|
path: str,
|
||||||
args: Dict[str, Tuple[str, Union[bytes, str, List[str]]]],
|
args: Dict[str, RequestArgument],
|
||||||
session_vars: Dict[str, Any],
|
session_vars: Dict[str, Any],
|
||||||
headers: Dict[str, str])
|
headers: Dict[str, str])
|
||||||
-> (int, Optional[Union[str, bytes]])
|
-> (int, Optional[Union[str, bytes]])
|
||||||
|
|
||||||
method: The HTTP method (GET, POST) that was used.
|
method: The HTTP method (GET, POST) that was used.
|
||||||
path: The path that was requested.
|
path: The path that was requested.
|
||||||
args: The arguments that were passed with the request (as GET or POST arguments), each of which may be
|
args: The arguments that were passed with the request (as GET or POST arguments).
|
||||||
either a str or bytes object, or a list of str.
|
|
||||||
session_vars: The session storage. May be read from and written to.
|
session_vars: The session storage. May be read from and written to.
|
||||||
headers: The dictionary of HTTP response headers. Add headers you wish to send with the response.
|
headers: The dictionary of HTTP response headers. Add headers you wish to send with the response.
|
||||||
returns: A tuple consisting of the HTTP status code (as an int) and the response body (as str or bytes,
|
returns: A tuple consisting of the HTTP status code (as an int) and the response body (as str or bytes,
|
||||||
|
@ -69,7 +69,7 @@ def pagelet(path: str):
|
||||||
|
|
||||||
def http_handler(fun: Callable[[str,
|
def http_handler(fun: Callable[[str,
|
||||||
str,
|
str,
|
||||||
Dict[str, Tuple[str, Union[bytes, str, List[str]]]],
|
Dict[str, RequestArgument],
|
||||||
Dict[str, Any],
|
Dict[str, Any],
|
||||||
Dict[str, str]],
|
Dict[str, str]],
|
||||||
Tuple[int, Union[bytes, str]]]):
|
Tuple[int, Union[bytes, str]]]):
|
||||||
|
@ -181,7 +181,7 @@ class HttpHandler(BaseHTTPRequestHandler):
|
||||||
if session_id in self.server.session_vars:
|
if session_id in self.server.session_vars:
|
||||||
del self.server.session_vars[session_id]
|
del self.server.session_vars[session_id]
|
||||||
|
|
||||||
def _handle(self, method: str, path: str, args: Dict[str, Tuple[str, Union[bytes, str, List[str]]]]) -> None:
|
def _handle(self, method: str, path: str, args: Dict[str, RequestArgument]) -> None:
|
||||||
"""
|
"""
|
||||||
Handle a HTTP request by either dispatching it to the appropriate pagelet or by serving a static resource.
|
Handle a HTTP request by either dispatching it to the appropriate pagelet or by serving a static resource.
|
||||||
|
|
||||||
|
|
|
@ -8,4 +8,3 @@ from .main import main_page
|
||||||
from .login import login_page
|
from .login import login_page
|
||||||
from .logout import logout
|
from .logout import logout
|
||||||
from .touchkey import touchkey_page
|
from .touchkey import touchkey_page
|
||||||
from .upload_test import upload_test
|
|
||||||
|
|
|
@ -1,8 +1,8 @@
|
||||||
|
|
||||||
from typing import Any, Dict, List, Optional, Tuple, Union
|
from typing import Any, Dict, Optional, Tuple, Union
|
||||||
|
|
||||||
from matemat.exceptions import AuthenticationError
|
from matemat.exceptions import AuthenticationError
|
||||||
from matemat.webserver import pagelet
|
from matemat.webserver import pagelet, RequestArgument
|
||||||
from matemat.primitives import User
|
from matemat.primitives import User
|
||||||
from matemat.db import MatematDatabase
|
from matemat.db import MatematDatabase
|
||||||
|
|
||||||
|
@ -10,7 +10,7 @@ from matemat.db import MatematDatabase
|
||||||
@pagelet('/login')
|
@pagelet('/login')
|
||||||
def login_page(method: str,
|
def login_page(method: str,
|
||||||
path: str,
|
path: str,
|
||||||
args: Dict[str, Tuple[str, Union[bytes, str, List[str]]]],
|
args: Dict[str, RequestArgument],
|
||||||
session_vars: Dict[str, Any],
|
session_vars: Dict[str, Any],
|
||||||
headers: Dict[str, str])\
|
headers: Dict[str, str])\
|
||||||
-> Tuple[int, Optional[Union[str, bytes]]]:
|
-> Tuple[int, Optional[Union[str, bytes]]]:
|
||||||
|
@ -43,15 +43,11 @@ def login_page(method: str,
|
||||||
'''
|
'''
|
||||||
return 200, data.format(msg=args['msg'] if 'msg' in args else '')
|
return 200, data.format(msg=args['msg'] if 'msg' in args else '')
|
||||||
elif method == 'POST':
|
elif method == 'POST':
|
||||||
if 'username' not in args or not isinstance(args['username'], str):
|
username: RequestArgument = args['username']
|
||||||
return 400, None
|
password: RequestArgument = args['password']
|
||||||
if 'password' not in args or not isinstance(args['password'], str):
|
|
||||||
return 400, None
|
|
||||||
username: str = args['username']
|
|
||||||
password: str = args['password']
|
|
||||||
with MatematDatabase('test.db') as db:
|
with MatematDatabase('test.db') as db:
|
||||||
try:
|
try:
|
||||||
user: User = db.login(username, password)
|
user: User = db.login(username.get_str(), password.get_str())
|
||||||
except AuthenticationError:
|
except AuthenticationError:
|
||||||
headers['Location'] = '/login?msg=Username%20or%20password%20wrong.%20Please%20try%20again.'
|
headers['Location'] = '/login?msg=Username%20or%20password%20wrong.%20Please%20try%20again.'
|
||||||
return 301, bytes()
|
return 301, bytes()
|
||||||
|
|
|
@ -1,13 +1,13 @@
|
||||||
|
|
||||||
from typing import Any, Dict, List, Optional, Tuple, Union
|
from typing import Any, Dict, List, Optional, Tuple, Union
|
||||||
|
|
||||||
from matemat.webserver import pagelet
|
from matemat.webserver import pagelet, RequestArgument
|
||||||
|
|
||||||
|
|
||||||
@pagelet('/logout')
|
@pagelet('/logout')
|
||||||
def logout(method: str,
|
def logout(method: str,
|
||||||
path: str,
|
path: str,
|
||||||
args: Dict[str, Tuple[str, Union[bytes, str, List[str]]]],
|
args: Dict[str, RequestArgument],
|
||||||
session_vars: Dict[str, Any],
|
session_vars: Dict[str, Any],
|
||||||
headers: Dict[str, str])\
|
headers: Dict[str, str])\
|
||||||
-> Tuple[int, Optional[Union[str, bytes]]]:
|
-> Tuple[int, Optional[Union[str, bytes]]]:
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
|
|
||||||
from typing import Any, Dict, List, Optional, Tuple, Union
|
from typing import Any, Dict, List, Optional, Tuple, Union
|
||||||
|
|
||||||
from matemat.webserver import MatematWebserver, pagelet
|
from matemat.webserver import MatematWebserver, pagelet, RequestArgument
|
||||||
from matemat.primitives import User
|
from matemat.primitives import User
|
||||||
from matemat.db import MatematDatabase
|
from matemat.db import MatematDatabase
|
||||||
|
|
||||||
|
@ -9,7 +9,7 @@ from matemat.db import MatematDatabase
|
||||||
@pagelet('/')
|
@pagelet('/')
|
||||||
def main_page(method: str,
|
def main_page(method: str,
|
||||||
path: str,
|
path: str,
|
||||||
args: Dict[str, Tuple[str, Union[bytes, str, List[str]]]],
|
args: Dict[str, RequestArgument],
|
||||||
session_vars: Dict[str, Any],
|
session_vars: Dict[str, Any],
|
||||||
headers: Dict[str, str])\
|
headers: Dict[str, str])\
|
||||||
-> Tuple[int, Optional[Union[str, bytes]]]:
|
-> Tuple[int, Optional[Union[str, bytes]]]:
|
||||||
|
|
|
@ -2,7 +2,7 @@
|
||||||
from typing import Any, Dict, List, Optional, Tuple, Union
|
from typing import Any, Dict, List, Optional, Tuple, Union
|
||||||
|
|
||||||
from matemat.exceptions import AuthenticationError
|
from matemat.exceptions import AuthenticationError
|
||||||
from matemat.webserver import pagelet
|
from matemat.webserver import pagelet, RequestArgument
|
||||||
from matemat.primitives import User
|
from matemat.primitives import User
|
||||||
from matemat.db import MatematDatabase
|
from matemat.db import MatematDatabase
|
||||||
|
|
||||||
|
@ -10,7 +10,7 @@ from matemat.db import MatematDatabase
|
||||||
@pagelet('/touchkey')
|
@pagelet('/touchkey')
|
||||||
def touchkey_page(method: str,
|
def touchkey_page(method: str,
|
||||||
path: str,
|
path: str,
|
||||||
args: Dict[str, Tuple[str, Union[bytes, str, List[str]]]],
|
args: Dict[str, RequestArgument],
|
||||||
session_vars: Dict[str, Any],
|
session_vars: Dict[str, Any],
|
||||||
headers: Dict[str, str])\
|
headers: Dict[str, str])\
|
||||||
-> Tuple[int, Optional[Union[str, bytes]]]:
|
-> Tuple[int, Optional[Union[str, bytes]]]:
|
||||||
|
@ -42,15 +42,11 @@ def touchkey_page(method: str,
|
||||||
'''
|
'''
|
||||||
return 200, data.format(username=args['username'] if 'username' in args else '')
|
return 200, data.format(username=args['username'] if 'username' in args else '')
|
||||||
elif method == 'POST':
|
elif method == 'POST':
|
||||||
if 'username' not in args or not isinstance(args['username'], str):
|
username: RequestArgument = args['username']
|
||||||
return 400, None
|
touchkey: RequestArgument = args['touchkey']
|
||||||
if 'touchkey' not in args or not isinstance(args['touchkey'], str):
|
|
||||||
return 400, None
|
|
||||||
username: str = args['username']
|
|
||||||
touchkey: str = args['touchkey']
|
|
||||||
with MatematDatabase('test.db') as db:
|
with MatematDatabase('test.db') as db:
|
||||||
try:
|
try:
|
||||||
user: User = db.login(username, touchkey=touchkey)
|
user: User = db.login(username.get_str(), touchkey=touchkey.get_str())
|
||||||
except AuthenticationError:
|
except AuthenticationError:
|
||||||
headers['Location'] = f'/touchkey?username={args["username"]}&msg=Please%20try%20again.'
|
headers['Location'] = f'/touchkey?username={args["username"]}&msg=Please%20try%20again.'
|
||||||
return 301, bytes()
|
return 301, bytes()
|
||||||
|
|
|
@ -1,28 +0,0 @@
|
||||||
|
|
||||||
from typing import Any, Dict, Union
|
|
||||||
|
|
||||||
from matemat.webserver import pagelet
|
|
||||||
|
|
||||||
|
|
||||||
@pagelet('/upload')
|
|
||||||
def upload_test(method: str,
|
|
||||||
path: str,
|
|
||||||
args: Dict[str, Union[str, bytes]],
|
|
||||||
session_vars: Dict[str, Any],
|
|
||||||
headers: Dict[str, str]):
|
|
||||||
if method == 'GET':
|
|
||||||
return 200, '''
|
|
||||||
<!DOCTYPE html>
|
|
||||||
<html>
|
|
||||||
<body>
|
|
||||||
<form method="post" action="#" enctype="multipart/form-data">
|
|
||||||
<input type="file" name="testfile" />
|
|
||||||
<input type="hidden" name="teststatic" value="statictest" />
|
|
||||||
<input type="submit" value="Upload" />
|
|
||||||
</form>
|
|
||||||
</body>
|
|
||||||
</html>
|
|
||||||
'''
|
|
||||||
else:
|
|
||||||
headers['Content-Type'] = 'text/plain'
|
|
||||||
return 200, args.items().__str__()
|
|
121
matemat/webserver/requestargs.py
Normal file
121
matemat/webserver/requestargs.py
Normal file
|
@ -0,0 +1,121 @@
|
||||||
|
|
||||||
|
from typing import List, Optional, Tuple, Union
|
||||||
|
|
||||||
|
|
||||||
|
class RequestArgument(object):
|
||||||
|
|
||||||
|
def __init__(self,
|
||||||
|
name: str,
|
||||||
|
value: Union[Tuple[str, Union[bytes, str]], List[Tuple[str, Union[bytes, str]]]] = None) -> None:
|
||||||
|
self.__name: str = name
|
||||||
|
self.__value: Union[Tuple[str, Union[bytes, str]], List[Tuple[str, Union[bytes, str]]]] = None
|
||||||
|
if value is None:
|
||||||
|
self.__value = []
|
||||||
|
else:
|
||||||
|
if isinstance(value, list):
|
||||||
|
if len(value) == 1:
|
||||||
|
self.__value = value[0]
|
||||||
|
else:
|
||||||
|
self.__value = value
|
||||||
|
else:
|
||||||
|
self.__value = value
|
||||||
|
|
||||||
|
@property
|
||||||
|
def is_array(self) -> bool:
|
||||||
|
return isinstance(self.__value, list)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def is_scalar(self) -> bool:
|
||||||
|
return not isinstance(self.__value, list)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def is_view(self) -> bool:
|
||||||
|
return False
|
||||||
|
|
||||||
|
@property
|
||||||
|
def name(self) -> str:
|
||||||
|
return self.__name
|
||||||
|
|
||||||
|
def get_str(self, index: int = None) -> Optional[str]:
|
||||||
|
if self.is_array:
|
||||||
|
if index is None:
|
||||||
|
raise ValueError('index must not be None')
|
||||||
|
v: Tuple[str, Union[bytes, str]] = self.__value[index]
|
||||||
|
if isinstance(v[1], str):
|
||||||
|
return v[1]
|
||||||
|
elif isinstance(v[1], bytes):
|
||||||
|
return v[1].decode('utf-8')
|
||||||
|
else:
|
||||||
|
if index is not None:
|
||||||
|
raise ValueError('index must be None')
|
||||||
|
if isinstance(self.__value[1], str):
|
||||||
|
return self.__value[1]
|
||||||
|
elif isinstance(self.__value[1], bytes):
|
||||||
|
return self.__value[1].decode('utf-8')
|
||||||
|
|
||||||
|
def get_bytes(self, index: int = None) -> Optional[bytes]:
|
||||||
|
if self.is_array:
|
||||||
|
if index is None:
|
||||||
|
raise ValueError('index must not be None')
|
||||||
|
v: Tuple[str, Union[bytes, str]] = self.__value[index]
|
||||||
|
if isinstance(v[1], bytes):
|
||||||
|
return v[1]
|
||||||
|
elif isinstance(v[1], str):
|
||||||
|
return v[1].encode('utf-8')
|
||||||
|
else:
|
||||||
|
if index is not None:
|
||||||
|
raise ValueError('index must be None')
|
||||||
|
if isinstance(self.__value[1], bytes):
|
||||||
|
return self.__value[1]
|
||||||
|
elif isinstance(self.__value[1], str):
|
||||||
|
return self.__value[1].encode('utf-8')
|
||||||
|
|
||||||
|
def get_content_type(self, index: int = None) -> Optional[str]:
|
||||||
|
if self.is_array:
|
||||||
|
if index is None:
|
||||||
|
raise ValueError('index must not be None')
|
||||||
|
v: Tuple[str, Union[bytes, str]] = self.__value[index]
|
||||||
|
return v[0]
|
||||||
|
else:
|
||||||
|
if index is not None:
|
||||||
|
raise ValueError('index must be None')
|
||||||
|
return self.__value[0]
|
||||||
|
|
||||||
|
def append(self, ctype: str, value: Union[str, bytes]):
|
||||||
|
if self.is_view:
|
||||||
|
raise TypeError('A RequestArgument view is immutable!')
|
||||||
|
if len(self) == 0:
|
||||||
|
self.__value = ctype, value
|
||||||
|
else:
|
||||||
|
if self.is_scalar:
|
||||||
|
self.__value = [self.__value]
|
||||||
|
self.__value.append((ctype, value))
|
||||||
|
|
||||||
|
def __len__(self):
|
||||||
|
return len(self.__value) if self.is_array else 1
|
||||||
|
|
||||||
|
def __iter__(self):
|
||||||
|
if self.is_scalar:
|
||||||
|
yield _View(self.__name, self.__value)
|
||||||
|
else:
|
||||||
|
# Typing helper
|
||||||
|
_value: List[Tuple[str, Union[bytes, str]]] = self.__value
|
||||||
|
for v in _value:
|
||||||
|
yield _View(self.__name, v)
|
||||||
|
|
||||||
|
def __getitem__(self, index: Union[int, slice]):
|
||||||
|
if self.is_scalar:
|
||||||
|
if index == 0:
|
||||||
|
return _View(self.__name, self.__value)
|
||||||
|
raise ValueError('Scalar RequestArgument only indexable with 0')
|
||||||
|
return _View(self.__name, self.__value[index])
|
||||||
|
|
||||||
|
|
||||||
|
class _View(RequestArgument):
|
||||||
|
|
||||||
|
def __init__(self, name: str, value: Union[Tuple[str, Union[bytes, str]], List[Tuple[str, Union[bytes, str]]]]):
|
||||||
|
super().__init__(name, value)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def is_view(self) -> bool:
|
||||||
|
return True
|
|
@ -9,7 +9,7 @@ from abc import ABC
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from http.server import HTTPServer
|
from http.server import HTTPServer
|
||||||
|
|
||||||
from matemat.webserver.httpd import pagelet
|
from matemat.webserver import pagelet, RequestArgument
|
||||||
|
|
||||||
|
|
||||||
class HttpResponse:
|
class HttpResponse:
|
||||||
|
@ -158,14 +158,14 @@ def test_pagelet(path: str):
|
||||||
|
|
||||||
def with_testing_headers(fun: Callable[[str,
|
def with_testing_headers(fun: Callable[[str,
|
||||||
str,
|
str,
|
||||||
Dict[str, Tuple[str, Union[bytes, str, List[str]]]],
|
Dict[str, RequestArgument],
|
||||||
Dict[str, Any],
|
Dict[str, Any],
|
||||||
Dict[str, str]],
|
Dict[str, str]],
|
||||||
Tuple[int, Union[bytes, str]]]):
|
Tuple[int, Union[bytes, str]]]):
|
||||||
@pagelet(path)
|
@pagelet(path)
|
||||||
def testing_wrapper(method: str,
|
def testing_wrapper(method: str,
|
||||||
path: str,
|
path: str,
|
||||||
args: Dict[str, Tuple[str, Union[bytes, str, List[str]]]],
|
args: Dict[str, RequestArgument],
|
||||||
session_vars: Dict[str, Any],
|
session_vars: Dict[str, Any],
|
||||||
headers: Dict[str, str]):
|
headers: Dict[str, str]):
|
||||||
status, body = fun(method, path, args, session_vars, headers)
|
status, body = fun(method, path, args, session_vars, headers)
|
||||||
|
|
347
matemat/webserver/test/test_parse_request.py
Normal file
347
matemat/webserver/test/test_parse_request.py
Normal file
|
@ -0,0 +1,347 @@
|
||||||
|
|
||||||
|
import unittest
|
||||||
|
|
||||||
|
from matemat.webserver.util import parse_args
|
||||||
|
|
||||||
|
|
||||||
|
class TestParseRequest(unittest.TestCase):
|
||||||
|
|
||||||
|
def test_parse_get_root(self):
|
||||||
|
path, args = parse_args('/')
|
||||||
|
self.assertEqual('/', path)
|
||||||
|
self.assertEqual(0, len(args))
|
||||||
|
|
||||||
|
def test_parse_get_no_args(self):
|
||||||
|
path, args = parse_args('/index.html')
|
||||||
|
self.assertEqual('/index.html', path)
|
||||||
|
self.assertEqual(0, len(args))
|
||||||
|
|
||||||
|
def test_parse_get_root_getargs(self):
|
||||||
|
path, args = parse_args('/?foo=42&bar=1337&baz=Hello,%20World!')
|
||||||
|
self.assertEqual('/', path)
|
||||||
|
self.assertEqual(3, len(args))
|
||||||
|
self.assertIn('foo', args.keys())
|
||||||
|
self.assertIn('bar', args.keys())
|
||||||
|
self.assertIn('baz', args.keys())
|
||||||
|
self.assertTrue(args['foo'].is_scalar)
|
||||||
|
self.assertTrue(args['bar'].is_scalar)
|
||||||
|
self.assertTrue(args['baz'].is_scalar)
|
||||||
|
self.assertEqual('text/plain', args['foo'].get_content_type())
|
||||||
|
self.assertEqual('text/plain', args['bar'].get_content_type())
|
||||||
|
self.assertEqual('text/plain', args['baz'].get_content_type())
|
||||||
|
self.assertEqual('42', args['foo'].get_str())
|
||||||
|
self.assertEqual('1337', args['bar'].get_str())
|
||||||
|
self.assertEqual('Hello, World!', args['baz'].get_str())
|
||||||
|
|
||||||
|
def test_parse_get_getargs(self):
|
||||||
|
path, args = parse_args('/abc/def?foo=42&bar=1337&baz=Hello,%20World!')
|
||||||
|
self.assertEqual('/abc/def', path)
|
||||||
|
self.assertEqual(3, len(args))
|
||||||
|
self.assertIn('foo', args.keys())
|
||||||
|
self.assertIn('bar', args.keys())
|
||||||
|
self.assertIn('baz', args.keys())
|
||||||
|
self.assertTrue(args['foo'].is_scalar)
|
||||||
|
self.assertTrue(args['bar'].is_scalar)
|
||||||
|
self.assertTrue(args['baz'].is_scalar)
|
||||||
|
self.assertEqual('text/plain', args['foo'].get_content_type())
|
||||||
|
self.assertEqual('text/plain', args['bar'].get_content_type())
|
||||||
|
self.assertEqual('text/plain', args['baz'].get_content_type())
|
||||||
|
self.assertEqual('42', args['foo'].get_str())
|
||||||
|
self.assertEqual('1337', args['bar'].get_str())
|
||||||
|
self.assertEqual('Hello, World!', args['baz'].get_str())
|
||||||
|
|
||||||
|
def test_parse_get_getarray(self):
|
||||||
|
path, args = parse_args('/abc/def?foo=42&foo=1337&baz=Hello,%20World!')
|
||||||
|
self.assertEqual('/abc/def', path)
|
||||||
|
self.assertEqual(2, len(args))
|
||||||
|
self.assertIn('foo', args.keys())
|
||||||
|
self.assertIn('baz', args.keys())
|
||||||
|
self.assertTrue(args['foo'].is_array)
|
||||||
|
self.assertTrue(args['baz'].is_scalar)
|
||||||
|
self.assertEqual(2, len(args['foo']))
|
||||||
|
self.assertEqual('42', args['foo'].get_str(0))
|
||||||
|
self.assertEqual('1337', args['foo'].get_str(1))
|
||||||
|
|
||||||
|
def test_parse_get_zero_arg(self):
|
||||||
|
path, args = parse_args('/abc/def?foo=&bar=42')
|
||||||
|
self.assertEqual(2, len(args))
|
||||||
|
self.assertIn('foo', args.keys())
|
||||||
|
self.assertIn('bar', args.keys())
|
||||||
|
self.assertTrue(args['foo'].is_scalar)
|
||||||
|
self.assertTrue(args['bar'].is_scalar)
|
||||||
|
self.assertEqual(1, len(args['foo']))
|
||||||
|
self.assertEqual('', args['foo'].get_str())
|
||||||
|
self.assertEqual('42', args['bar'].get_str())
|
||||||
|
|
||||||
|
def test_parse_get_urlencoded_encoding_fail(self):
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
parse_args('/?foo=42&bar=%80&baz=Hello,%20World!')
|
||||||
|
|
||||||
|
def test_parse_post_urlencoded(self):
|
||||||
|
path, args = parse_args('/',
|
||||||
|
postbody=b'foo=42&bar=1337&baz=Hello,%20World!',
|
||||||
|
enctype='application/x-www-form-urlencoded')
|
||||||
|
self.assertEqual('/', path)
|
||||||
|
self.assertEqual(3, len(args))
|
||||||
|
self.assertIn('foo', args.keys())
|
||||||
|
self.assertIn('bar', args.keys())
|
||||||
|
self.assertIn('baz', args.keys())
|
||||||
|
self.assertTrue(args['foo'].is_scalar)
|
||||||
|
self.assertTrue(args['bar'].is_scalar)
|
||||||
|
self.assertTrue(args['baz'].is_scalar)
|
||||||
|
self.assertEqual('text/plain', args['foo'].get_content_type())
|
||||||
|
self.assertEqual('text/plain', args['bar'].get_content_type())
|
||||||
|
self.assertEqual('text/plain', args['baz'].get_content_type())
|
||||||
|
self.assertEqual('42', args['foo'].get_str())
|
||||||
|
self.assertEqual('1337', args['bar'].get_str())
|
||||||
|
self.assertEqual('Hello, World!', args['baz'].get_str())
|
||||||
|
|
||||||
|
def test_parse_post_urlencoded_array(self):
|
||||||
|
path, args = parse_args('/',
|
||||||
|
postbody=b'foo=42&foo=1337&baz=Hello,%20World!',
|
||||||
|
enctype='application/x-www-form-urlencoded')
|
||||||
|
self.assertEqual('/', path)
|
||||||
|
self.assertEqual(2, len(args))
|
||||||
|
self.assertIn('foo', args.keys())
|
||||||
|
self.assertIn('baz', args.keys())
|
||||||
|
self.assertTrue(args['foo'].is_array)
|
||||||
|
self.assertTrue(args['baz'].is_scalar)
|
||||||
|
self.assertEqual(2, len(args['foo']))
|
||||||
|
self.assertEqual('42', args['foo'].get_str(0))
|
||||||
|
self.assertEqual('1337', args['foo'].get_str(1))
|
||||||
|
|
||||||
|
def test_parse_post_urlencoded_zero_arg(self):
|
||||||
|
path, args = parse_args('/abc/def', postbody=b'foo=&bar=42', enctype='application/x-www-form-urlencoded')
|
||||||
|
self.assertEqual(2, len(args))
|
||||||
|
self.assertIn('foo', args.keys())
|
||||||
|
self.assertIn('bar', args.keys())
|
||||||
|
self.assertTrue(args['foo'].is_scalar)
|
||||||
|
self.assertTrue(args['bar'].is_scalar)
|
||||||
|
self.assertEqual(1, len(args['foo']))
|
||||||
|
self.assertEqual('', args['foo'].get_str())
|
||||||
|
self.assertEqual('42', args['bar'].get_str())
|
||||||
|
|
||||||
|
def test_parse_post_urlencoded_encoding_fail(self):
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
parse_args('/',
|
||||||
|
postbody=b'foo=42&bar=%80&baz=Hello,%20World!',
|
||||||
|
enctype='application/x-www-form-urlencoded')
|
||||||
|
|
||||||
|
def test_parse_post_multipart_no_args(self):
|
||||||
|
path, args = parse_args('/',
|
||||||
|
postbody=b'--testBoundary1337--\r\n',
|
||||||
|
enctype='multipart/form-data; boundary=testBoundary1337')
|
||||||
|
self.assertEqual('/', path)
|
||||||
|
self.assertEqual(0, len(args))
|
||||||
|
|
||||||
|
def test_parse_post_multipart(self):
|
||||||
|
path, args = parse_args('/',
|
||||||
|
postbody=b'--testBoundary1337\r\n'
|
||||||
|
b'Content-Disposition: form-data; name="foo"\r\n'
|
||||||
|
b'Content-Type: text/plain\r\n\r\n'
|
||||||
|
b'42\r\n'
|
||||||
|
b'--testBoundary1337\r\n'
|
||||||
|
b'Content-Disposition: form-data; name="bar"; filename="bar.bin"\r\n'
|
||||||
|
b'Content-Type: application/octet-stream\r\n\r\n'
|
||||||
|
b'1337\r\n'
|
||||||
|
b'--testBoundary1337\r\n'
|
||||||
|
b'Content-Disposition: form-data; name="baz"\r\n'
|
||||||
|
b'Content-Type: text/plain\r\n\r\n'
|
||||||
|
b'Hello, World!\r\n'
|
||||||
|
b'--testBoundary1337--\r\n',
|
||||||
|
enctype='multipart/form-data; boundary=testBoundary1337')
|
||||||
|
self.assertEqual('/', path)
|
||||||
|
self.assertEqual(3, len(args))
|
||||||
|
self.assertIn('foo', args.keys())
|
||||||
|
self.assertIn('bar', args.keys())
|
||||||
|
self.assertIn('baz', args.keys())
|
||||||
|
self.assertTrue(args['foo'].is_scalar)
|
||||||
|
self.assertTrue(args['bar'].is_scalar)
|
||||||
|
self.assertTrue(args['baz'].is_scalar)
|
||||||
|
self.assertEqual('text/plain', args['foo'].get_content_type())
|
||||||
|
self.assertEqual('application/octet-stream', args['bar'].get_content_type())
|
||||||
|
self.assertEqual('text/plain', args['baz'].get_content_type())
|
||||||
|
self.assertEqual('42', args['foo'].get_str())
|
||||||
|
self.assertEqual(b'1337', args['bar'].get_bytes())
|
||||||
|
self.assertEqual('Hello, World!', args['baz'].get_str())
|
||||||
|
|
||||||
|
def test_parse_post_multipart_zero_arg(self):
|
||||||
|
path, args = parse_args('/abc/def',
|
||||||
|
postbody=b'--testBoundary1337\r\n'
|
||||||
|
b'Content-Disposition: form-data; name="foo"\r\n'
|
||||||
|
b'Content-Type: text/plain\r\n\r\n\r\n'
|
||||||
|
b'--testBoundary1337\r\n'
|
||||||
|
b'Content-Disposition: form-data; name="bar"\r\n'
|
||||||
|
b'Content-Type: text/plain\r\n\r\n'
|
||||||
|
b'42\r\n'
|
||||||
|
b'--testBoundary1337--\r\n',
|
||||||
|
enctype='multipart/form-data; boundary=testBoundary1337')
|
||||||
|
self.assertEqual(2, len(args))
|
||||||
|
self.assertIn('foo', args.keys())
|
||||||
|
self.assertIn('bar', args.keys())
|
||||||
|
self.assertTrue(args['foo'].is_scalar)
|
||||||
|
self.assertTrue(args['bar'].is_scalar)
|
||||||
|
self.assertEqual(1, len(args['foo']))
|
||||||
|
self.assertEqual('', args['foo'].get_str())
|
||||||
|
self.assertEqual('42', args['bar'].get_str())
|
||||||
|
|
||||||
|
def test_parse_post_multipart_broken_boundaries(self):
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
# Boundary not defined in Content-Type
|
||||||
|
parse_args('/',
|
||||||
|
postbody=b'--testBoundary1337\r\n'
|
||||||
|
b'Content-Disposition: form-data; name="foo"\r\n'
|
||||||
|
b'Content-Type: text/plain\r\n\r\n'
|
||||||
|
b'42\r\n'
|
||||||
|
b'--testBoundary1337\r\n'
|
||||||
|
b'Content-Disposition: form-data; name="bar"; filename="bar.bin"\r\n'
|
||||||
|
b'Content-Type: application/octet-stream\r\n\r\n'
|
||||||
|
b'1337\r\n'
|
||||||
|
b'--testBoundary1337\r\n'
|
||||||
|
b'Content-Disposition: form-data; name="baz"\r\n'
|
||||||
|
b'Content-Type: text/plain\r\n\r\n'
|
||||||
|
b'Hello, World!\r\n'
|
||||||
|
b'--testBoundary1337--\r\n',
|
||||||
|
enctype='multipart/form-data')
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
# Corrupted "--" head at first boundary
|
||||||
|
parse_args('/',
|
||||||
|
postbody=b'-+testBoundary1337\r\n'
|
||||||
|
b'Content-Disposition: form-data; name="foo"\r\n'
|
||||||
|
b'Content-Type: text/plain\r\n\r\n'
|
||||||
|
b'42\r\n'
|
||||||
|
b'--testBoundary1337\r\n'
|
||||||
|
b'Content-Disposition: form-data; name="bar"; filename="bar.bin"\r\n'
|
||||||
|
b'Content-Type: application/octet-stream\r\n\r\n'
|
||||||
|
b'1337\r\n'
|
||||||
|
b'--testBoundary1337\r\n'
|
||||||
|
b'Content-Disposition: form-data; name="baz"\r\n'
|
||||||
|
b'Content-Type: text/plain\r\n\r\n'
|
||||||
|
b'Hello, World!\r\n'
|
||||||
|
b'--testBoundary1337--\r\n',
|
||||||
|
enctype='multipart/form-data; boundary=testBoundary1337')
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
# Missing "--" tail at end boundary
|
||||||
|
parse_args('/',
|
||||||
|
postbody=b'--testBoundary1337\r\n'
|
||||||
|
b'Content-Disposition: form-data; name="foo"\r\n'
|
||||||
|
b'Content-Type: text/plain\r\n\r\n'
|
||||||
|
b'42\r\n'
|
||||||
|
b'--testBoundary1337\r\n'
|
||||||
|
b'Content-Disposition: form-data; name="bar"; filename="bar.bin"\r\n'
|
||||||
|
b'Content-Type: application/octet-stream\r\n\r\n'
|
||||||
|
b'1337\r\n'
|
||||||
|
b'--testBoundary1337\r\n'
|
||||||
|
b'Content-Disposition: form-data; name="baz"\r\n'
|
||||||
|
b'Content-Type: text/plain\r\n\r\n'
|
||||||
|
b'Hello, World!\r\n'
|
||||||
|
b'--testBoundary1337\r\n',
|
||||||
|
enctype='multipart/form-data; boundary=testBoundary1337')
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
# Missing Content-Type header in one part
|
||||||
|
parse_args('/',
|
||||||
|
postbody=b'--testBoundary1337\r\n'
|
||||||
|
b'Content-Disposition: form-data; name="foo"\r\n'
|
||||||
|
b'Content-Type: text/plain\r\n\r\n'
|
||||||
|
b'42\r\n'
|
||||||
|
b'--testBoundary1337\r\n'
|
||||||
|
b'Content-Disposition: form-data; name="bar"; filename="bar.bin"\r\n'
|
||||||
|
b'Content-Type: application/octet-stream\r\n\r\n'
|
||||||
|
b'1337\r\n'
|
||||||
|
b'--testBoundary1337\r\n'
|
||||||
|
b'Content-Disposition: form-data; name="baz"\r\n\r\n'
|
||||||
|
b'Hello, World!\r\n'
|
||||||
|
b'--testBoundary1337--\r\n',
|
||||||
|
enctype='multipart/form-data; boundary=testBoundary1337')
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
# Missing Content-Disposition header in one part
|
||||||
|
parse_args('/',
|
||||||
|
postbody=b'--testBoundary1337\r\n'
|
||||||
|
b'Content-Disposition: form-data; name="foo"\r\n'
|
||||||
|
b'Content-Type: text/plain\r\n\r\n'
|
||||||
|
b'42\r\n'
|
||||||
|
b'--testBoundary1337\r\n'
|
||||||
|
b'Content-Type: application/octet-stream\r\n\r\n'
|
||||||
|
b'1337\r\n'
|
||||||
|
b'--testBoundary1337\r\n'
|
||||||
|
b'Content-Disposition: form-data; name="baz"\r\n'
|
||||||
|
b'Content-Type: text/plain\r\n\r\n'
|
||||||
|
b'Hello, World!\r\n'
|
||||||
|
b'--testBoundary1337--\r\n',
|
||||||
|
enctype='multipart/form-data; boundary=testBoundary1337')
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
# Missing form-data name argument
|
||||||
|
parse_args('/',
|
||||||
|
postbody=b'--testBoundary1337\r\n'
|
||||||
|
b'Content-Disposition: form-data; name="foo"\r\n'
|
||||||
|
b'Content-Type: text/plain\r\n\r\n'
|
||||||
|
b'42\r\n'
|
||||||
|
b'--testBoundary1337\r\n'
|
||||||
|
b'Content-Disposition: form-data; filename="bar.bin"\r\n'
|
||||||
|
b'Content-Type: application/octet-stream\r\n\r\n'
|
||||||
|
b'1337\r\n'
|
||||||
|
b'--testBoundary1337\r\n'
|
||||||
|
b'Content-Disposition: form-data; name="baz"\r\n'
|
||||||
|
b'Content-Type: text/plain\r\n\r\n'
|
||||||
|
b'Hello, World!\r\n'
|
||||||
|
b'--testBoundary1337--\r\n',
|
||||||
|
enctype='multipart/form-data; boundary=testBoundary1337')
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
# Unknown Content-Disposition
|
||||||
|
parse_args('/',
|
||||||
|
postbody=b'--testBoundary1337\r\n'
|
||||||
|
b'Content-Disposition: form-data; name="foo"\r\n'
|
||||||
|
b'Content-Type: text/plain\r\n\r\n'
|
||||||
|
b'42\r\n'
|
||||||
|
b'--testBoundary1337\r\n'
|
||||||
|
b'Content-Disposition: attachment; name="bar"; filename="bar.bin"\r\n'
|
||||||
|
b'Content-Type: application/octet-stream\r\n\r\n'
|
||||||
|
b'1337\r\n'
|
||||||
|
b'--testBoundary1337\r\n'
|
||||||
|
b'Content-Disposition: form-data; name="baz"\r\n'
|
||||||
|
b'Content-Type: text/plain\r\n\r\n'
|
||||||
|
b'Hello, World!\r\n'
|
||||||
|
b'--testBoundary1337--\r\n',
|
||||||
|
enctype='multipart/form-data; boundary=testBoundary1337')
|
||||||
|
|
||||||
|
def test_get_post_precedence_urlencoded(self):
|
||||||
|
path, args = parse_args('/foo?foo=thisshouldnotbethere&bar=isurvived',
|
||||||
|
postbody=b'foo=42&foo=1337&baz=Hello,%20World!',
|
||||||
|
enctype='application/x-www-form-urlencoded')
|
||||||
|
self.assertIn('foo', args)
|
||||||
|
self.assertIn('bar', args)
|
||||||
|
self.assertIn('baz', args)
|
||||||
|
self.assertEqual(2, len(args['foo']))
|
||||||
|
self.assertEqual(1, len(args['bar']))
|
||||||
|
self.assertEqual(1, len(args['baz']))
|
||||||
|
self.assertEqual('42', args['foo'].get_str(0))
|
||||||
|
self.assertEqual('1337', args['foo'].get_str(1))
|
||||||
|
self.assertEqual('isurvived', args['bar'].get_str())
|
||||||
|
self.assertEqual('Hello, World!', args['baz'].get_str())
|
||||||
|
|
||||||
|
def test_get_post_precedence_multipart(self):
|
||||||
|
path, args = parse_args('/foo?foo=thisshouldnotbethere&bar=isurvived',
|
||||||
|
postbody=b'--testBoundary1337\r\n'
|
||||||
|
b'Content-Disposition: form-data; name="foo"\r\n'
|
||||||
|
b'Content-Type: text/plain\r\n\r\n'
|
||||||
|
b'42\r\n'
|
||||||
|
b'--testBoundary1337\r\n'
|
||||||
|
b'Content-Disposition: form-data; name="foo"; filename="bar.bin"\r\n'
|
||||||
|
b'Content-Type: text/plain\r\n\r\n'
|
||||||
|
b'1337\r\n'
|
||||||
|
b'--testBoundary1337\r\n'
|
||||||
|
b'Content-Disposition: form-data; name="baz"\r\n'
|
||||||
|
b'Content-Type: text/plain\r\n\r\n'
|
||||||
|
b'Hello, World!\r\n'
|
||||||
|
b'--testBoundary1337--\r\n',
|
||||||
|
enctype='multipart/form-data; boundary=testBoundary1337')
|
||||||
|
self.assertIn('foo', args)
|
||||||
|
self.assertIn('bar', args)
|
||||||
|
self.assertIn('baz', args)
|
||||||
|
self.assertEqual(2, len(args['foo']))
|
||||||
|
self.assertEqual(1, len(args['bar']))
|
||||||
|
self.assertEqual(1, len(args['baz']))
|
||||||
|
self.assertEqual('42', args['foo'].get_str(0))
|
||||||
|
self.assertEqual('1337', args['foo'].get_str(1))
|
||||||
|
self.assertEqual('isurvived', args['bar'].get_str())
|
||||||
|
self.assertEqual('Hello, World!', args['baz'].get_str())
|
|
@ -1,7 +1,7 @@
|
||||||
|
|
||||||
from typing import Any, Dict, List, Tuple, Union
|
from typing import Any, Dict, List, Tuple, Union
|
||||||
|
|
||||||
from matemat.webserver.httpd import HttpHandler
|
from matemat.webserver import HttpHandler, RequestArgument
|
||||||
from matemat.webserver.test.abstract_httpd_test import AbstractHttpdTest, test_pagelet
|
from matemat.webserver.test.abstract_httpd_test import AbstractHttpdTest, test_pagelet
|
||||||
|
|
||||||
import codecs
|
import codecs
|
||||||
|
@ -10,7 +10,7 @@ import codecs
|
||||||
@test_pagelet('/just/testing/post')
|
@test_pagelet('/just/testing/post')
|
||||||
def post_test_pagelet(method: str,
|
def post_test_pagelet(method: str,
|
||||||
path: str,
|
path: str,
|
||||||
args: Dict[str, Tuple[str, Union[bytes, str, List[str]]]],
|
args: Dict[str, RequestArgument],
|
||||||
session_vars: Dict[str, Any],
|
session_vars: Dict[str, Any],
|
||||||
headers: Dict[str, str]):
|
headers: Dict[str, str]):
|
||||||
"""
|
"""
|
||||||
|
@ -18,13 +18,12 @@ def post_test_pagelet(method: str,
|
||||||
"""
|
"""
|
||||||
headers['Content-Type'] = 'text/plain'
|
headers['Content-Type'] = 'text/plain'
|
||||||
dump: str = ''
|
dump: str = ''
|
||||||
for k, (t, v) in args.items():
|
for k, ra in args.items():
|
||||||
if t.startswith('text/'):
|
for a in ra:
|
||||||
if isinstance(v, bytes):
|
if a.get_content_type().startswith('text/'):
|
||||||
v = v.decode('utf-8')
|
dump += f'{k}: {a.get_str()}\n'
|
||||||
dump += f'{k}: {",".join(v) if isinstance(v, list) else v}\n'
|
|
||||||
else:
|
else:
|
||||||
dump += f'{k}: {codecs.encode(v, "hex").decode("utf-8")}\n'
|
dump += f'{k}: {codecs.encode(a.get_bytes(), "hex").decode("utf-8")}\n'
|
||||||
return 200, dump
|
return 200, dump
|
||||||
|
|
||||||
|
|
||||||
|
@ -118,10 +117,14 @@ class TestPost(AbstractHttpdTest):
|
||||||
kv: Dict[str, str] = dict()
|
kv: Dict[str, str] = dict()
|
||||||
for l in lines:
|
for l in lines:
|
||||||
k, v = l.decode('utf-8').split(':', 1)
|
k, v = l.decode('utf-8').split(':', 1)
|
||||||
kv[k.strip()] = v.strip() if ',' not in v else v.strip().split(',')
|
k = k.strip()
|
||||||
|
v = v.strip()
|
||||||
|
if k in kv:
|
||||||
|
kv[k] += f',{v}'
|
||||||
|
else:
|
||||||
|
kv[k] = v
|
||||||
# Make sure the arguments were properly parsed
|
# Make sure the arguments were properly parsed
|
||||||
self.assertListEqual(['bar', 'baz'], kv['foo'])
|
self.assertEqual('bar,baz', kv['foo'])
|
||||||
self.assertEqual('1', kv['test'])
|
self.assertEqual('1', kv['test'])
|
||||||
|
|
||||||
def test_post_urlenc_post_array(self):
|
def test_post_urlenc_post_array(self):
|
||||||
|
@ -141,10 +144,14 @@ class TestPost(AbstractHttpdTest):
|
||||||
kv: Dict[str, str] = dict()
|
kv: Dict[str, str] = dict()
|
||||||
for l in lines:
|
for l in lines:
|
||||||
k, v = l.decode('utf-8').split(':', 1)
|
k, v = l.decode('utf-8').split(':', 1)
|
||||||
kv[k.strip()] = v.strip() if ',' not in v else v.strip().split(',')
|
k = k.strip()
|
||||||
|
v = v.strip()
|
||||||
|
if k in kv:
|
||||||
|
kv[k] += f',{v}'
|
||||||
|
else:
|
||||||
|
kv[k] = v
|
||||||
# Make sure the arguments were properly parsed
|
# Make sure the arguments were properly parsed
|
||||||
self.assertListEqual(['bar', 'baz'], kv['foo'])
|
self.assertEqual('bar,baz', kv['foo'])
|
||||||
self.assertEqual('1', kv['test'])
|
self.assertEqual('1', kv['test'])
|
||||||
|
|
||||||
def test_post_urlenc_mixed_array(self):
|
def test_post_urlenc_mixed_array(self):
|
||||||
|
@ -164,12 +171,16 @@ class TestPost(AbstractHttpdTest):
|
||||||
kv: Dict[str, str] = dict()
|
kv: Dict[str, str] = dict()
|
||||||
for l in lines:
|
for l in lines:
|
||||||
k, v = l.decode('utf-8').split(':', 1)
|
k, v = l.decode('utf-8').split(':', 1)
|
||||||
kv[k.strip()] = v.strip() if ',' not in v else v.strip().split(',')
|
k = k.strip()
|
||||||
|
v = v.strip()
|
||||||
|
if k in kv:
|
||||||
|
kv[k] += f',{v}'
|
||||||
|
else:
|
||||||
|
kv[k] = v
|
||||||
# Make sure the arguments were properly parsed
|
# Make sure the arguments were properly parsed
|
||||||
self.assertListEqual(['postbar', 'postbaz'], kv['foo'])
|
self.assertEqual('postbar,postbaz', kv['foo'])
|
||||||
self.assertListEqual(['1', '42'], kv['gettest'])
|
self.assertEqual('1,42', kv['gettest'])
|
||||||
self.assertListEqual(['1', '2'], kv['posttest'])
|
self.assertEqual('1,2', kv['posttest'])
|
||||||
|
|
||||||
def test_post_no_body(self):
|
def test_post_no_body(self):
|
||||||
"""
|
"""
|
||||||
|
@ -184,7 +195,7 @@ class TestPost(AbstractHttpdTest):
|
||||||
|
|
||||||
def test_post_multipart_post_only(self):
|
def test_post_multipart_post_only(self):
|
||||||
"""
|
"""
|
||||||
Test a POST request with a miltipart/form-data body.
|
Test a POST request with a miutipart/form-data body.
|
||||||
"""
|
"""
|
||||||
# Send POST request
|
# Send POST request
|
||||||
formdata = (b'------testboundary\r\n'
|
formdata = (b'------testboundary\r\n'
|
||||||
|
@ -211,34 +222,3 @@ class TestPost(AbstractHttpdTest):
|
||||||
self.assertIn('bar', kv)
|
self.assertIn('bar', kv)
|
||||||
self.assertEqual(kv['foo'], b'Hello, World!')
|
self.assertEqual(kv['foo'], b'Hello, World!')
|
||||||
self.assertEqual(kv['bar'], b'00010203040506070809800b0c730e0f')
|
self.assertEqual(kv['bar'], b'00010203040506070809800b0c730e0f')
|
||||||
|
|
||||||
def test_post_multipart_mixed(self):
|
|
||||||
"""
|
|
||||||
Test a POST request with a miltipart/form-data body.
|
|
||||||
"""
|
|
||||||
# Send POST request
|
|
||||||
formdata = (b'------testboundary\r\n'
|
|
||||||
b'Content-Disposition: form-data; name="foo"\r\n'
|
|
||||||
b'Content-Type: text/plain\r\n\r\n'
|
|
||||||
b'Hello, World!\r\n'
|
|
||||||
b'------testboundary\r\n'
|
|
||||||
b'Content-Disposition: form-data; name="bar"; filename="foo.bar"\r\n'
|
|
||||||
b'Content-Type: application/octet-stream\r\n\r\n'
|
|
||||||
b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x80\x0b\x0c\x73\x0e\x0f\r\n'
|
|
||||||
b'------testboundary--\r\n')
|
|
||||||
|
|
||||||
self.client_sock.set_request(f'POST /just/testing/post?getfoo=bar&foo=thisshouldbegone HTTP/1.1\r\n'
|
|
||||||
f'Content-Type: multipart/form-data; boundary=----testboundary\r\n'
|
|
||||||
f'Content-Length: {len(formdata)}\r\n\r\n'.encode('utf-8') + formdata)
|
|
||||||
HttpHandler(self.client_sock, ('::1', 45678), self.server)
|
|
||||||
packet = self.client_sock.get_response()
|
|
||||||
lines: List[bytes] = packet.body.split(b'\n')[:-1]
|
|
||||||
kv: Dict[str, Any] = dict()
|
|
||||||
for l in lines:
|
|
||||||
k, v = l.split(b':', 1)
|
|
||||||
kv[k.decode('utf-8').strip()] = v.strip()
|
|
||||||
self.assertIn('foo', kv)
|
|
||||||
self.assertIn('bar', kv)
|
|
||||||
self.assertEqual(kv['getfoo'], b'bar')
|
|
||||||
self.assertEqual(kv['foo'], b'Hello, World!')
|
|
||||||
self.assertEqual(kv['bar'], b'00010203040506070809800b0c730e0f')
|
|
||||||
|
|
204
matemat/webserver/test/test_requestargs.py
Normal file
204
matemat/webserver/test/test_requestargs.py
Normal file
|
@ -0,0 +1,204 @@
|
||||||
|
|
||||||
|
from typing import List
|
||||||
|
|
||||||
|
import unittest
|
||||||
|
|
||||||
|
from matemat.webserver import RequestArgument
|
||||||
|
# noinspection PyProtectedMember
|
||||||
|
from matemat.webserver.requestargs import _View
|
||||||
|
|
||||||
|
|
||||||
|
class TestRequestArguments(unittest.TestCase):
|
||||||
|
|
||||||
|
def test_create_default(self):
|
||||||
|
ra = RequestArgument('foo')
|
||||||
|
self.assertEqual('foo', ra.name)
|
||||||
|
self.assertEqual(0, len(ra))
|
||||||
|
self.assertFalse(ra.is_scalar)
|
||||||
|
self.assertTrue(ra.is_array)
|
||||||
|
self.assertFalse(ra.is_view)
|
||||||
|
|
||||||
|
def test_create_str_scalar(self):
|
||||||
|
ra = RequestArgument('foo', ('text/plain', 'bar'))
|
||||||
|
self.assertEqual('foo', ra.name)
|
||||||
|
self.assertEqual(1, len(ra))
|
||||||
|
self.assertTrue(ra.is_scalar)
|
||||||
|
self.assertFalse(ra.is_array)
|
||||||
|
self.assertEqual('bar', ra.get_str())
|
||||||
|
self.assertEqual(b'bar', ra.get_bytes())
|
||||||
|
self.assertEqual('text/plain', ra.get_content_type())
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
self.assertEqual('bar', ra.get_str(0))
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
self.assertEqual('bar', ra.get_bytes(0))
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
self.assertEqual('bar', ra.get_content_type(0))
|
||||||
|
self.assertFalse(ra.is_view)
|
||||||
|
|
||||||
|
def test_create_str_scalar_array(self):
|
||||||
|
ra = RequestArgument('foo', [('text/plain', 'bar')])
|
||||||
|
self.assertEqual('foo', ra.name)
|
||||||
|
self.assertEqual(1, len(ra))
|
||||||
|
self.assertTrue(ra.is_scalar)
|
||||||
|
self.assertFalse(ra.is_array)
|
||||||
|
self.assertEqual('bar', ra.get_str())
|
||||||
|
self.assertEqual(b'bar', ra.get_bytes())
|
||||||
|
self.assertEqual('text/plain', ra.get_content_type())
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
self.assertEqual('bar', ra.get_str(0))
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
self.assertEqual('bar', ra.get_bytes(0))
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
self.assertEqual('bar', ra.get_content_type(0))
|
||||||
|
self.assertFalse(ra.is_view)
|
||||||
|
|
||||||
|
def test_create_bytes_scalar(self):
|
||||||
|
ra = RequestArgument('foo', ('application/octet-stream', b'\x00\x80\xff\xfe'))
|
||||||
|
self.assertEqual('foo', ra.name)
|
||||||
|
self.assertEqual(1, len(ra))
|
||||||
|
self.assertTrue(ra.is_scalar)
|
||||||
|
self.assertFalse(ra.is_array)
|
||||||
|
with self.assertRaises(UnicodeDecodeError):
|
||||||
|
ra.get_str()
|
||||||
|
self.assertEqual(b'\x00\x80\xff\xfe', ra.get_bytes())
|
||||||
|
self.assertEqual('application/octet-stream', ra.get_content_type())
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
self.assertEqual('bar', ra.get_str(0))
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
self.assertEqual('bar', ra.get_bytes(0))
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
self.assertEqual('bar', ra.get_content_type(0))
|
||||||
|
self.assertFalse(ra.is_view)
|
||||||
|
|
||||||
|
def test_create_array(self):
|
||||||
|
ra = RequestArgument('foo', [
|
||||||
|
('text/plain', 'bar'),
|
||||||
|
('application/octet-stream', b'\x00\x80\xff\xfe')
|
||||||
|
])
|
||||||
|
self.assertEqual('foo', ra.name)
|
||||||
|
self.assertEqual(2, len(ra))
|
||||||
|
self.assertFalse(ra.is_scalar)
|
||||||
|
self.assertTrue(ra.is_array)
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
ra.get_str()
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
ra.get_bytes()
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
ra.get_content_type()
|
||||||
|
self.assertEqual('bar', ra.get_str(0))
|
||||||
|
self.assertEqual(b'bar', ra.get_bytes(0))
|
||||||
|
self.assertEqual('text/plain', ra.get_content_type(0))
|
||||||
|
with self.assertRaises(UnicodeDecodeError):
|
||||||
|
ra.get_str(1)
|
||||||
|
self.assertEqual(b'\x00\x80\xff\xfe', ra.get_bytes(1))
|
||||||
|
self.assertEqual('application/octet-stream', ra.get_content_type(1))
|
||||||
|
self.assertFalse(ra.is_view)
|
||||||
|
|
||||||
|
def test_append_empty_str(self):
|
||||||
|
ra = RequestArgument('foo')
|
||||||
|
self.assertEqual(0, len(ra))
|
||||||
|
self.assertFalse(ra.is_scalar)
|
||||||
|
|
||||||
|
ra.append('text/plain', 'bar')
|
||||||
|
self.assertEqual(1, len(ra))
|
||||||
|
self.assertTrue(ra.is_scalar)
|
||||||
|
self.assertEqual('bar', ra.get_str())
|
||||||
|
self.assertEqual(b'bar', ra.get_bytes())
|
||||||
|
self.assertEqual('text/plain', ra.get_content_type())
|
||||||
|
self.assertFalse(ra.is_view)
|
||||||
|
|
||||||
|
def test_append_empty_bytes(self):
|
||||||
|
ra = RequestArgument('foo')
|
||||||
|
self.assertEqual(0, len(ra))
|
||||||
|
self.assertFalse(ra.is_scalar)
|
||||||
|
|
||||||
|
ra.append('application/octet-stream', b'\x00\x80\xff\xfe')
|
||||||
|
self.assertEqual(1, len(ra))
|
||||||
|
self.assertTrue(ra.is_scalar)
|
||||||
|
with self.assertRaises(UnicodeDecodeError):
|
||||||
|
ra.get_str()
|
||||||
|
self.assertEqual(b'\x00\x80\xff\xfe', ra.get_bytes())
|
||||||
|
self.assertEqual('application/octet-stream', ra.get_content_type())
|
||||||
|
self.assertFalse(ra.is_view)
|
||||||
|
|
||||||
|
def test_append_multiple(self):
|
||||||
|
ra = RequestArgument('foo')
|
||||||
|
self.assertEqual(0, len(ra))
|
||||||
|
self.assertFalse(ra.is_scalar)
|
||||||
|
|
||||||
|
ra.append('text/plain', 'bar')
|
||||||
|
self.assertEqual(1, len(ra))
|
||||||
|
self.assertTrue(ra.is_scalar)
|
||||||
|
|
||||||
|
ra.append('application/octet-stream', b'\x00\x80\xff\xfe')
|
||||||
|
self.assertEqual(2, len(ra))
|
||||||
|
self.assertFalse(ra.is_scalar)
|
||||||
|
|
||||||
|
ra.append('text/plain', 'Hello, World!')
|
||||||
|
self.assertEqual(3, len(ra))
|
||||||
|
self.assertFalse(ra.is_scalar)
|
||||||
|
|
||||||
|
def test_iterate_empty(self):
|
||||||
|
ra = RequestArgument('foo')
|
||||||
|
self.assertEqual(0, len(ra))
|
||||||
|
for _ in ra:
|
||||||
|
self.fail()
|
||||||
|
|
||||||
|
def test_iterate_scalar(self):
|
||||||
|
ra = RequestArgument('foo', ('text/plain', 'bar'))
|
||||||
|
self.assertTrue(ra.is_scalar)
|
||||||
|
count: int = 0
|
||||||
|
for it in ra:
|
||||||
|
self.assertIsInstance(it, _View)
|
||||||
|
self.assertEqual('foo', it.name)
|
||||||
|
self.assertTrue(it.is_view)
|
||||||
|
self.assertTrue(it.is_scalar)
|
||||||
|
count += 1
|
||||||
|
self.assertEqual(1, count)
|
||||||
|
|
||||||
|
def test_iterate_array(self):
|
||||||
|
ra = RequestArgument('foo', [('text/plain', 'bar'), ('abc', b'def'), ('xyz', '1337')])
|
||||||
|
self.assertFalse(ra.is_scalar)
|
||||||
|
items: List[str] = list()
|
||||||
|
for it in ra:
|
||||||
|
self.assertIsInstance(it, _View)
|
||||||
|
self.assertTrue(it.is_view)
|
||||||
|
self.assertTrue(it.is_scalar)
|
||||||
|
items.append(it.get_content_type())
|
||||||
|
self.assertEqual(['text/plain', 'abc', 'xyz'], items)
|
||||||
|
|
||||||
|
def test_iterate_sliced(self):
|
||||||
|
ra = RequestArgument('foo', [('a', 'b'), ('c', 'd'), ('e', 'f'), ('g', 'h'), ('i', 'j'), ('k', 'l')])
|
||||||
|
self.assertFalse(ra.is_scalar)
|
||||||
|
items: List[str] = list()
|
||||||
|
for it in ra[1:4:2]:
|
||||||
|
self.assertIsInstance(it, _View)
|
||||||
|
self.assertTrue(it.is_view)
|
||||||
|
self.assertTrue(it.is_scalar)
|
||||||
|
items.append(it.get_content_type())
|
||||||
|
self.assertEqual(['c', 'g'], items)
|
||||||
|
|
||||||
|
def test_index_scalar(self):
|
||||||
|
ra = RequestArgument('foo', ('bar', 'baz'))
|
||||||
|
it = ra[0]
|
||||||
|
self.assertIsInstance(it, _View)
|
||||||
|
self.assertEqual('foo', it.name)
|
||||||
|
self.assertEqual('bar', it.get_content_type())
|
||||||
|
self.assertEqual('baz', it.get_str())
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
_ = ra[1]
|
||||||
|
|
||||||
|
def test_index_array(self):
|
||||||
|
ra = RequestArgument('foo', [('a', 'b'), ('c', 'd')])
|
||||||
|
it = ra[1]
|
||||||
|
self.assertIsInstance(it, _View)
|
||||||
|
self.assertEqual('foo', it.name)
|
||||||
|
self.assertEqual('c', it.get_content_type())
|
||||||
|
self.assertEqual('d', it.get_str())
|
||||||
|
|
||||||
|
def test_view_immutable(self):
|
||||||
|
ra = RequestArgument('foo', ('bar', 'baz'))
|
||||||
|
it = ra[0]
|
||||||
|
self.assertIsInstance(it, _View)
|
||||||
|
with self.assertRaises(TypeError):
|
||||||
|
it.append('foo', 'bar')
|
|
@ -3,14 +3,14 @@ from typing import Any, Dict, Union
|
||||||
|
|
||||||
import os
|
import os
|
||||||
import os.path
|
import os.path
|
||||||
from matemat.webserver.httpd import HttpHandler
|
from matemat.webserver import HttpHandler, RequestArgument
|
||||||
from matemat.webserver.test.abstract_httpd_test import AbstractHttpdTest, test_pagelet
|
from matemat.webserver.test.abstract_httpd_test import AbstractHttpdTest, test_pagelet
|
||||||
|
|
||||||
|
|
||||||
@test_pagelet('/just/testing/serve_pagelet_ok')
|
@test_pagelet('/just/testing/serve_pagelet_ok')
|
||||||
def serve_test_pagelet_ok(method: str,
|
def serve_test_pagelet_ok(method: str,
|
||||||
path: str,
|
path: str,
|
||||||
args: Dict[str, Union[bytes, str]],
|
args: Dict[str, RequestArgument],
|
||||||
session_vars: Dict[str, Any],
|
session_vars: Dict[str, Any],
|
||||||
headers: Dict[str, str]):
|
headers: Dict[str, str]):
|
||||||
headers['Content-Type'] = 'text/plain'
|
headers['Content-Type'] = 'text/plain'
|
||||||
|
@ -20,7 +20,7 @@ def serve_test_pagelet_ok(method: str,
|
||||||
@test_pagelet('/just/testing/serve_pagelet_fail')
|
@test_pagelet('/just/testing/serve_pagelet_fail')
|
||||||
def serve_test_pagelet_fail(method: str,
|
def serve_test_pagelet_fail(method: str,
|
||||||
path: str,
|
path: str,
|
||||||
args: Dict[str, Union[bytes, str]],
|
args: Dict[str, RequestArgument],
|
||||||
session_vars: Dict[str, Any],
|
session_vars: Dict[str, Any],
|
||||||
headers: Dict[str, str]):
|
headers: Dict[str, str]):
|
||||||
session_vars['test'] = 'hello, world!'
|
session_vars['test'] = 'hello, world!'
|
||||||
|
|
|
@ -4,14 +4,14 @@ from typing import Any, Dict, Union
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
from time import sleep
|
from time import sleep
|
||||||
|
|
||||||
from matemat.webserver.httpd import HttpHandler
|
from matemat.webserver import HttpHandler, RequestArgument
|
||||||
from matemat.webserver.test.abstract_httpd_test import AbstractHttpdTest, test_pagelet
|
from matemat.webserver.test.abstract_httpd_test import AbstractHttpdTest, test_pagelet
|
||||||
|
|
||||||
|
|
||||||
@test_pagelet('/just/testing/sessions')
|
@test_pagelet('/just/testing/sessions')
|
||||||
def session_test_pagelet(method: str,
|
def session_test_pagelet(method: str,
|
||||||
path: str,
|
path: str,
|
||||||
args: Dict[str, Union[bytes, str]],
|
args: Dict[str, RequestArgument],
|
||||||
session_vars: Dict[str, Any],
|
session_vars: Dict[str, Any],
|
||||||
headers: Dict[str, str]):
|
headers: Dict[str, str]):
|
||||||
session_vars['test'] = 'hello, world!'
|
session_vars['test'] = 'hello, world!'
|
||||||
|
|
|
@ -3,8 +3,10 @@ from typing import Dict, List, Tuple, Optional, Union
|
||||||
|
|
||||||
import urllib.parse
|
import urllib.parse
|
||||||
|
|
||||||
|
from matemat.webserver import RequestArgument
|
||||||
|
|
||||||
def _parse_multipart(body: bytes, boundary: str) -> Dict[str, List[Tuple[str, Union[bytes, str]]]]:
|
|
||||||
|
def _parse_multipart(body: bytes, boundary: str) -> List[RequestArgument]:
|
||||||
"""
|
"""
|
||||||
Given a HTTP body with form-data in multipart form, and the multipart-boundary, parse the multipart items and
|
Given a HTTP body with form-data in multipart form, and the multipart-boundary, parse the multipart items and
|
||||||
return them as a dictionary.
|
return them as a dictionary.
|
||||||
|
@ -13,6 +15,8 @@ def _parse_multipart(body: bytes, boundary: str) -> Dict[str, List[Tuple[str, Un
|
||||||
:param boundary: The multipart boundary.
|
:param boundary: The multipart boundary.
|
||||||
:return: A dictionary of field names as key, and content types and field values as value.
|
:return: A dictionary of field names as key, and content types and field values as value.
|
||||||
"""
|
"""
|
||||||
|
# Prepend a CRLF for the first boundary to match
|
||||||
|
body = b'\r\n' + body
|
||||||
# Generate item header boundary and terminating boundary from general boundary string
|
# Generate item header boundary and terminating boundary from general boundary string
|
||||||
_boundary = f'\r\n--{boundary}\r\n'.encode('utf-8')
|
_boundary = f'\r\n--{boundary}\r\n'.encode('utf-8')
|
||||||
_end_boundary = f'\r\n--{boundary}--\r\n'.encode('utf-8')
|
_end_boundary = f'\r\n--{boundary}--\r\n'.encode('utf-8')
|
||||||
|
@ -20,16 +24,15 @@ def _parse_multipart(body: bytes, boundary: str) -> Dict[str, List[Tuple[str, Un
|
||||||
allparts = body.split(_end_boundary, 1)
|
allparts = body.split(_end_boundary, 1)
|
||||||
if len(allparts) != 2 or allparts[1] != b'':
|
if len(allparts) != 2 or allparts[1] != b'':
|
||||||
raise ValueError('Last boundary missing or corrupted')
|
raise ValueError('Last boundary missing or corrupted')
|
||||||
# Split remaining body into its parts (appending a CRLF for the first boundary to match), and verify at least 1 part
|
# Split remaining body into its parts, and verify at least 1 part is there
|
||||||
# is there
|
parts: List[bytes] = (allparts[0]).split(_boundary)
|
||||||
parts: List[bytes] = (b'\r\n' + allparts[0]).split(_boundary)
|
|
||||||
if len(parts) < 1 or parts[0] != b'':
|
if len(parts) < 1 or parts[0] != b'':
|
||||||
raise ValueError('First boundary missing or corrupted')
|
raise ValueError('First boundary missing or corrupted')
|
||||||
# Remove the first, empty part
|
# Remove the first, empty part
|
||||||
parts = parts[1:]
|
parts = parts[1:]
|
||||||
|
|
||||||
# Results go into this dict
|
# Results go into this dict
|
||||||
args: Dict[str, List[Tuple[str, Union[bytes, str]]]] = dict()
|
args: Dict[str, RequestArgument] = dict()
|
||||||
|
|
||||||
# Parse each multipart part
|
# Parse each multipart part
|
||||||
for part in parts:
|
for part in parts:
|
||||||
|
@ -50,25 +53,29 @@ def _parse_multipart(body: bytes, boundary: str) -> Dict[str, List[Tuple[str, Un
|
||||||
cd, *cdargs = hdr['Content-Disposition'].split(';')
|
cd, *cdargs = hdr['Content-Disposition'].split(';')
|
||||||
# Content-Disposition MUST be form-data; everything else is rejected
|
# Content-Disposition MUST be form-data; everything else is rejected
|
||||||
if cd.strip() != 'form-data':
|
if cd.strip() != 'form-data':
|
||||||
raise ValueError(f'Unknown Content-Disposition: cd')
|
raise ValueError(f'Unknown Content-Disposition: {cd}')
|
||||||
# Extract the "name" header argument
|
# Extract the "name" header argument
|
||||||
|
has_name = False
|
||||||
for cdarg in cdargs:
|
for cdarg in cdargs:
|
||||||
k, v = cdarg.split('=', 1)
|
k, v = cdarg.split('=', 1)
|
||||||
if k.strip() == 'name':
|
if k.strip() == 'name':
|
||||||
|
has_name = True
|
||||||
name: str = v.strip()
|
name: str = v.strip()
|
||||||
# Remove quotation marks around the name value
|
# Remove quotation marks around the name value
|
||||||
if name.startswith('"') and name.endswith('"'):
|
if name.startswith('"') and name.endswith('"'):
|
||||||
name = v[1:-1]
|
name = v[1:-1]
|
||||||
# Add the Content-Type and the content to the header, with the provided name
|
# Add the Content-Type and the content to the header, with the provided name
|
||||||
if name not in args:
|
if name not in args:
|
||||||
args[name] = list()
|
args[name] = RequestArgument(name)
|
||||||
args[name].append((hdr['Content-Type'].strip(), part))
|
args[name].append(hdr['Content-Type'].strip(), part)
|
||||||
|
if not has_name:
|
||||||
|
raise ValueError('mutlipart/form-data part without name attribute')
|
||||||
|
|
||||||
return args
|
return list(args.values())
|
||||||
|
|
||||||
|
|
||||||
def parse_args(request: str, postbody: Optional[bytes] = None, enctype: str = 'text/plain') \
|
def parse_args(request: str, postbody: Optional[bytes] = None, enctype: str = 'text/plain') \
|
||||||
-> Tuple[str, Dict[str, Tuple[str, Union[bytes, str, List[str]]]]]:
|
-> Tuple[str, Dict[str, RequestArgument]]:
|
||||||
"""
|
"""
|
||||||
Given a HTTP request path, and optionally a HTTP POST body in application/x-www-form-urlencoded or
|
Given a HTTP request path, and optionally a HTTP POST body in application/x-www-form-urlencoded or
|
||||||
multipart/form-data form, parse the arguments and return them as a dictionary.
|
multipart/form-data form, parse the arguments and return them as a dictionary.
|
||||||
|
@ -85,34 +92,41 @@ def parse_args(request: str, postbody: Optional[bytes] = None, enctype: str = 't
|
||||||
# Parse the request "URL" (i.e. only the path)
|
# Parse the request "URL" (i.e. only the path)
|
||||||
tokens = urllib.parse.urlparse(request)
|
tokens = urllib.parse.urlparse(request)
|
||||||
# Parse the GET arguments
|
# Parse the GET arguments
|
||||||
getargs = urllib.parse.parse_qs(tokens.query)
|
if len(tokens.query) == 0:
|
||||||
|
getargs = dict()
|
||||||
|
else:
|
||||||
|
getargs = urllib.parse.parse_qs(tokens.query, strict_parsing=True, keep_blank_values=True, errors='strict')
|
||||||
|
|
||||||
# TODO: { 'foo': [ ('text/plain', 'bar'), ('application/octet-stream', '\x80') ] }
|
args: Dict[str, RequestArgument] = dict()
|
||||||
# TODO: Use a @dataclass once Python 3.7 is out
|
|
||||||
args: Dict[str, Tuple[str, Union[bytes, str, List[str]]]] = dict()
|
|
||||||
for k, v in getargs.items():
|
for k, v in getargs.items():
|
||||||
args[k] = 'text/plain', v
|
args[k] = RequestArgument(k)
|
||||||
|
for _v in v:
|
||||||
|
args[k].append('text/plain', _v)
|
||||||
|
|
||||||
if postbody is not None:
|
if postbody is not None:
|
||||||
if enctype == 'application/x-www-form-urlencoded':
|
if enctype == 'application/x-www-form-urlencoded':
|
||||||
# Parse the POST body
|
# Parse the POST body
|
||||||
postargs = urllib.parse.parse_qs(postbody.decode('utf-8'))
|
pb: str = postbody.decode('utf-8')
|
||||||
|
if len(pb) == 0:
|
||||||
|
postargs = dict()
|
||||||
|
else:
|
||||||
|
postargs = urllib.parse.parse_qs(pb, strict_parsing=True, keep_blank_values=True, errors='strict')
|
||||||
# Write all POST values into the dict, overriding potential duplicates from GET
|
# Write all POST values into the dict, overriding potential duplicates from GET
|
||||||
for k, v in postargs.items():
|
for k, v in postargs.items():
|
||||||
args[k] = 'text/plain', v
|
args[k] = RequestArgument(k)
|
||||||
|
for _v in v:
|
||||||
|
args[k].append('text/plain', _v)
|
||||||
elif enctype.startswith('multipart/form-data'):
|
elif enctype.startswith('multipart/form-data'):
|
||||||
# Parse the multipart boundary from the Content-Type header
|
# Parse the multipart boundary from the Content-Type header
|
||||||
boundary: str = enctype.split('boundary=')[1]
|
try:
|
||||||
|
boundary: str = enctype.split('boundary=')[1].strip()
|
||||||
|
except IndexError:
|
||||||
|
raise ValueError('Multipart boundary in header not set or corrupted')
|
||||||
# Parse the multipart body
|
# Parse the multipart body
|
||||||
mpargs = _parse_multipart(postbody, boundary)
|
mpargs = _parse_multipart(postbody, boundary)
|
||||||
for k, v in mpargs.items():
|
for ra in mpargs:
|
||||||
# TODO: Process all values, not just the first
|
args[ra.name] = ra
|
||||||
args[k] = v[0]
|
|
||||||
else:
|
else:
|
||||||
raise ValueError(f'Unsupported Content-Type: {enctype}')
|
raise ValueError(f'Unsupported Content-Type: {enctype}')
|
||||||
# urllib.parse.parse_qs turns ALL arguments into arrays. This turns arrays of length 1 into scalar values
|
|
||||||
for (k, (ct, v)) in args.items():
|
|
||||||
if len(v) == 1:
|
|
||||||
args[k] = ct, v[0]
|
|
||||||
# Return the path and the parsed arguments
|
# Return the path and the parsed arguments
|
||||||
return tokens.path, args
|
return tokens.path, args
|
||||||
|
|
Loading…
Reference in a new issue