forked from s3lph/matemat
Using the new RequestArguments API throughout the project.
This commit is contained in:
parent
2f927cec41
commit
0fb60d1828
11 changed files with 73 additions and 75 deletions
|
@ -13,7 +13,7 @@ from uuid import uuid4
|
|||
from datetime import datetime, timedelta
|
||||
|
||||
from matemat import __version__ as matemat_version
|
||||
from matemat.webserver import RequestArgument
|
||||
from matemat.webserver import RequestArguments
|
||||
from matemat.webserver.util import parse_args
|
||||
|
||||
|
||||
|
@ -31,7 +31,7 @@ BaseHTTPRequestHandler.log_error = lambda self, fstring='', *args: None
|
|||
# Dictionary to hold registered pagelet paths and their handler functions
|
||||
_PAGELET_PATHS: Dict[str, Callable[[str, # HTTP method (GET, POST, ...)
|
||||
str, # Request path
|
||||
Dict[str, RequestArgument], # args: (name, argument)
|
||||
RequestArguments, # HTTP Request arguments
|
||||
Dict[str, Any], # Session vars
|
||||
Dict[str, str]], # Response headers
|
||||
Tuple[int, Union[bytes, str]]]] = dict() # Returns: (status code, response body)
|
||||
|
@ -51,7 +51,7 @@ def pagelet(path: str):
|
|||
|
||||
(method: str,
|
||||
path: str,
|
||||
args: Dict[str, RequestArgument],
|
||||
args: RequestArguments,
|
||||
session_vars: Dict[str, Any],
|
||||
headers: Dict[str, str])
|
||||
-> (int, Optional[Union[str, bytes]])
|
||||
|
@ -69,7 +69,7 @@ def pagelet(path: str):
|
|||
|
||||
def http_handler(fun: Callable[[str,
|
||||
str,
|
||||
Dict[str, RequestArgument],
|
||||
RequestArguments,
|
||||
Dict[str, Any],
|
||||
Dict[str, str]],
|
||||
Tuple[int, Union[bytes, str]]]):
|
||||
|
@ -181,7 +181,7 @@ class HttpHandler(BaseHTTPRequestHandler):
|
|||
if session_id in self.server.session_vars:
|
||||
del self.server.session_vars[session_id]
|
||||
|
||||
def _handle(self, method: str, path: str, args: Dict[str, RequestArgument]) -> None:
|
||||
def _handle(self, method: str, path: str, args: RequestArguments) -> None:
|
||||
"""
|
||||
Handle a HTTP request by either dispatching it to the appropriate pagelet or by serving a static resource.
|
||||
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
from typing import Any, Dict, Optional, Tuple, Union
|
||||
|
||||
from matemat.exceptions import AuthenticationError
|
||||
from matemat.webserver import pagelet, RequestArgument
|
||||
from matemat.webserver import pagelet, RequestArguments
|
||||
from matemat.primitives import User
|
||||
from matemat.db import MatematDatabase
|
||||
|
||||
|
@ -10,7 +10,7 @@ from matemat.db import MatematDatabase
|
|||
@pagelet('/login')
|
||||
def login_page(method: str,
|
||||
path: str,
|
||||
args: Dict[str, RequestArgument],
|
||||
args: RequestArguments,
|
||||
session_vars: Dict[str, Any],
|
||||
headers: Dict[str, str])\
|
||||
-> Tuple[int, Optional[Union[str, bytes]]]:
|
||||
|
@ -41,13 +41,11 @@ def login_page(method: str,
|
|||
</body>
|
||||
</html>
|
||||
'''
|
||||
return 200, data.format(msg=args['msg'] if 'msg' in args else '')
|
||||
return 200, data.format(msg=str(args.msg) if 'msg' in args else '')
|
||||
elif method == 'POST':
|
||||
username: RequestArgument = args['username']
|
||||
password: RequestArgument = args['password']
|
||||
with MatematDatabase('test.db') as db:
|
||||
try:
|
||||
user: User = db.login(username.get_str(), password.get_str())
|
||||
user: User = db.login(str(args.username), str(args.password))
|
||||
except AuthenticationError:
|
||||
headers['Location'] = '/login?msg=Username%20or%20password%20wrong.%20Please%20try%20again.'
|
||||
return 301, bytes()
|
||||
|
|
|
@ -1,13 +1,13 @@
|
|||
|
||||
from typing import Any, Dict, List, Optional, Tuple, Union
|
||||
|
||||
from matemat.webserver import pagelet, RequestArgument
|
||||
from matemat.webserver import pagelet, RequestArguments
|
||||
|
||||
|
||||
@pagelet('/logout')
|
||||
def logout(method: str,
|
||||
path: str,
|
||||
args: Dict[str, RequestArgument],
|
||||
args: RequestArguments,
|
||||
session_vars: Dict[str, Any],
|
||||
headers: Dict[str, str])\
|
||||
-> Tuple[int, Optional[Union[str, bytes]]]:
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
|
||||
from typing import Any, Dict, List, Optional, Tuple, Union
|
||||
from typing import Any, Dict, Optional, Tuple, Union
|
||||
|
||||
from matemat.webserver import MatematWebserver, pagelet, RequestArgument
|
||||
from matemat.webserver import pagelet, RequestArguments
|
||||
from matemat.primitives import User
|
||||
from matemat.db import MatematDatabase
|
||||
|
||||
|
@ -9,7 +9,7 @@ from matemat.db import MatematDatabase
|
|||
@pagelet('/')
|
||||
def main_page(method: str,
|
||||
path: str,
|
||||
args: Dict[str, RequestArgument],
|
||||
args: RequestArguments,
|
||||
session_vars: Dict[str, Any],
|
||||
headers: Dict[str, str])\
|
||||
-> Tuple[int, Optional[Union[str, bytes]]]:
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
|
||||
from typing import Any, Dict, List, Optional, Tuple, Union
|
||||
from typing import Any, Dict, Optional, Tuple, Union
|
||||
|
||||
from matemat.exceptions import AuthenticationError
|
||||
from matemat.webserver import pagelet, RequestArgument
|
||||
from matemat.webserver import pagelet, RequestArguments
|
||||
from matemat.primitives import User
|
||||
from matemat.db import MatematDatabase
|
||||
|
||||
|
@ -10,7 +10,7 @@ from matemat.db import MatematDatabase
|
|||
@pagelet('/touchkey')
|
||||
def touchkey_page(method: str,
|
||||
path: str,
|
||||
args: Dict[str, RequestArgument],
|
||||
args: RequestArguments,
|
||||
session_vars: Dict[str, Any],
|
||||
headers: Dict[str, str])\
|
||||
-> Tuple[int, Optional[Union[str, bytes]]]:
|
||||
|
@ -40,13 +40,11 @@ def touchkey_page(method: str,
|
|||
</body>
|
||||
</html>
|
||||
'''
|
||||
return 200, data.format(username=args['username'] if 'username' in args else '')
|
||||
return 200, data.format(username=str(args.username) if 'username' in args else '')
|
||||
elif method == 'POST':
|
||||
username: RequestArgument = args['username']
|
||||
touchkey: RequestArgument = args['touchkey']
|
||||
with MatematDatabase('test.db') as db:
|
||||
try:
|
||||
user: User = db.login(username.get_str(), touchkey=touchkey.get_str())
|
||||
user: User = db.login(str(args.username), touchkey=str(args.touchkey))
|
||||
except AuthenticationError:
|
||||
headers['Location'] = f'/touchkey?username={args["username"]}&msg=Please%20try%20again.'
|
||||
return 301, bytes()
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
|
||||
from typing import Any, Callable, Dict, List, Tuple, Union
|
||||
from typing import Any, Callable, Dict, Tuple, Union
|
||||
|
||||
import unittest.mock
|
||||
from io import BytesIO
|
||||
|
@ -9,7 +9,7 @@ from abc import ABC
|
|||
from datetime import datetime
|
||||
from http.server import HTTPServer
|
||||
|
||||
from matemat.webserver import pagelet, RequestArgument
|
||||
from matemat.webserver import pagelet, RequestArguments
|
||||
|
||||
|
||||
class HttpResponse:
|
||||
|
@ -158,14 +158,14 @@ def test_pagelet(path: str):
|
|||
|
||||
def with_testing_headers(fun: Callable[[str,
|
||||
str,
|
||||
Dict[str, RequestArgument],
|
||||
RequestArguments,
|
||||
Dict[str, Any],
|
||||
Dict[str, str]],
|
||||
Tuple[int, Union[bytes, str]]]):
|
||||
@pagelet(path)
|
||||
def testing_wrapper(method: str,
|
||||
path: str,
|
||||
args: Dict[str, RequestArgument],
|
||||
args: RequestArguments,
|
||||
session_vars: Dict[str, Any],
|
||||
headers: Dict[str, str]):
|
||||
status, body = fun(method, path, args, session_vars, headers)
|
||||
|
|
|
@ -20,9 +20,9 @@ class TestParseRequest(unittest.TestCase):
|
|||
path, args = parse_args('/?foo=42&bar=1337&baz=Hello,%20World!')
|
||||
self.assertEqual('/', path)
|
||||
self.assertEqual(3, len(args))
|
||||
self.assertIn('foo', args.keys())
|
||||
self.assertIn('bar', args.keys())
|
||||
self.assertIn('baz', args.keys())
|
||||
self.assertIn('foo', args)
|
||||
self.assertIn('bar', args)
|
||||
self.assertIn('baz', args)
|
||||
self.assertTrue(args['foo'].is_scalar)
|
||||
self.assertTrue(args['bar'].is_scalar)
|
||||
self.assertTrue(args['baz'].is_scalar)
|
||||
|
@ -37,9 +37,9 @@ class TestParseRequest(unittest.TestCase):
|
|||
path, args = parse_args('/abc/def?foo=42&bar=1337&baz=Hello,%20World!')
|
||||
self.assertEqual('/abc/def', path)
|
||||
self.assertEqual(3, len(args))
|
||||
self.assertIn('foo', args.keys())
|
||||
self.assertIn('bar', args.keys())
|
||||
self.assertIn('baz', args.keys())
|
||||
self.assertIn('foo', args)
|
||||
self.assertIn('bar', args)
|
||||
self.assertIn('baz', args)
|
||||
self.assertTrue(args['foo'].is_scalar)
|
||||
self.assertTrue(args['bar'].is_scalar)
|
||||
self.assertTrue(args['baz'].is_scalar)
|
||||
|
@ -54,8 +54,8 @@ class TestParseRequest(unittest.TestCase):
|
|||
path, args = parse_args('/abc/def?foo=42&foo=1337&baz=Hello,%20World!')
|
||||
self.assertEqual('/abc/def', path)
|
||||
self.assertEqual(2, len(args))
|
||||
self.assertIn('foo', args.keys())
|
||||
self.assertIn('baz', args.keys())
|
||||
self.assertIn('foo', args)
|
||||
self.assertIn('baz', args)
|
||||
self.assertTrue(args['foo'].is_array)
|
||||
self.assertTrue(args['baz'].is_scalar)
|
||||
self.assertEqual(2, len(args['foo']))
|
||||
|
@ -65,8 +65,8 @@ class TestParseRequest(unittest.TestCase):
|
|||
def test_parse_get_zero_arg(self):
|
||||
path, args = parse_args('/abc/def?foo=&bar=42')
|
||||
self.assertEqual(2, len(args))
|
||||
self.assertIn('foo', args.keys())
|
||||
self.assertIn('bar', args.keys())
|
||||
self.assertIn('foo', args)
|
||||
self.assertIn('bar', args)
|
||||
self.assertTrue(args['foo'].is_scalar)
|
||||
self.assertTrue(args['bar'].is_scalar)
|
||||
self.assertEqual(1, len(args['foo']))
|
||||
|
@ -83,9 +83,9 @@ class TestParseRequest(unittest.TestCase):
|
|||
enctype='application/x-www-form-urlencoded')
|
||||
self.assertEqual('/', path)
|
||||
self.assertEqual(3, len(args))
|
||||
self.assertIn('foo', args.keys())
|
||||
self.assertIn('bar', args.keys())
|
||||
self.assertIn('baz', args.keys())
|
||||
self.assertIn('foo', args)
|
||||
self.assertIn('bar', args)
|
||||
self.assertIn('baz', args)
|
||||
self.assertTrue(args['foo'].is_scalar)
|
||||
self.assertTrue(args['bar'].is_scalar)
|
||||
self.assertTrue(args['baz'].is_scalar)
|
||||
|
@ -102,8 +102,8 @@ class TestParseRequest(unittest.TestCase):
|
|||
enctype='application/x-www-form-urlencoded')
|
||||
self.assertEqual('/', path)
|
||||
self.assertEqual(2, len(args))
|
||||
self.assertIn('foo', args.keys())
|
||||
self.assertIn('baz', args.keys())
|
||||
self.assertIn('foo', args)
|
||||
self.assertIn('baz', args)
|
||||
self.assertTrue(args['foo'].is_array)
|
||||
self.assertTrue(args['baz'].is_scalar)
|
||||
self.assertEqual(2, len(args['foo']))
|
||||
|
@ -113,8 +113,8 @@ class TestParseRequest(unittest.TestCase):
|
|||
def test_parse_post_urlencoded_zero_arg(self):
|
||||
path, args = parse_args('/abc/def', postbody=b'foo=&bar=42', enctype='application/x-www-form-urlencoded')
|
||||
self.assertEqual(2, len(args))
|
||||
self.assertIn('foo', args.keys())
|
||||
self.assertIn('bar', args.keys())
|
||||
self.assertIn('foo', args)
|
||||
self.assertIn('bar', args)
|
||||
self.assertTrue(args['foo'].is_scalar)
|
||||
self.assertTrue(args['bar'].is_scalar)
|
||||
self.assertEqual(1, len(args['foo']))
|
||||
|
@ -152,9 +152,9 @@ class TestParseRequest(unittest.TestCase):
|
|||
enctype='multipart/form-data; boundary=testBoundary1337')
|
||||
self.assertEqual('/', path)
|
||||
self.assertEqual(3, len(args))
|
||||
self.assertIn('foo', args.keys())
|
||||
self.assertIn('bar', args.keys())
|
||||
self.assertIn('baz', args.keys())
|
||||
self.assertIn('foo', args)
|
||||
self.assertIn('bar', args)
|
||||
self.assertIn('baz', args)
|
||||
self.assertTrue(args['foo'].is_scalar)
|
||||
self.assertTrue(args['bar'].is_scalar)
|
||||
self.assertTrue(args['baz'].is_scalar)
|
||||
|
@ -177,8 +177,8 @@ class TestParseRequest(unittest.TestCase):
|
|||
b'--testBoundary1337--\r\n',
|
||||
enctype='multipart/form-data; boundary=testBoundary1337')
|
||||
self.assertEqual(2, len(args))
|
||||
self.assertIn('foo', args.keys())
|
||||
self.assertIn('bar', args.keys())
|
||||
self.assertIn('foo', args)
|
||||
self.assertIn('bar', args)
|
||||
self.assertTrue(args['foo'].is_scalar)
|
||||
self.assertTrue(args['bar'].is_scalar)
|
||||
self.assertEqual(1, len(args['foo']))
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
|
||||
from typing import Any, Dict, List, Tuple, Union
|
||||
from typing import Any, Dict, List
|
||||
|
||||
from matemat.webserver import HttpHandler, RequestArgument
|
||||
from matemat.webserver import HttpHandler, RequestArguments
|
||||
from matemat.webserver.test.abstract_httpd_test import AbstractHttpdTest, test_pagelet
|
||||
|
||||
import codecs
|
||||
|
@ -10,7 +10,7 @@ import codecs
|
|||
@test_pagelet('/just/testing/post')
|
||||
def post_test_pagelet(method: str,
|
||||
path: str,
|
||||
args: Dict[str, RequestArgument],
|
||||
args: RequestArguments,
|
||||
session_vars: Dict[str, Any],
|
||||
headers: Dict[str, str]):
|
||||
"""
|
||||
|
@ -18,12 +18,12 @@ def post_test_pagelet(method: str,
|
|||
"""
|
||||
headers['Content-Type'] = 'text/plain'
|
||||
dump: str = ''
|
||||
for k, ra in args.items():
|
||||
for ra in args:
|
||||
for a in ra:
|
||||
if a.get_content_type().startswith('text/'):
|
||||
dump += f'{k}: {a.get_str()}\n'
|
||||
dump += f'{a.name}: {a.get_str()}\n'
|
||||
else:
|
||||
dump += f'{k}: {codecs.encode(a.get_bytes(), "hex").decode("utf-8")}\n'
|
||||
dump += f'{a.name}: {codecs.encode(a.get_bytes(), "hex").decode("utf-8")}\n'
|
||||
return 200, dump
|
||||
|
||||
|
||||
|
|
|
@ -1,16 +1,16 @@
|
|||
|
||||
from typing import Any, Dict, Union
|
||||
from typing import Any, Dict
|
||||
|
||||
import os
|
||||
import os.path
|
||||
from matemat.webserver import HttpHandler, RequestArgument
|
||||
from matemat.webserver import HttpHandler, RequestArguments
|
||||
from matemat.webserver.test.abstract_httpd_test import AbstractHttpdTest, test_pagelet
|
||||
|
||||
|
||||
@test_pagelet('/just/testing/serve_pagelet_ok')
|
||||
def serve_test_pagelet_ok(method: str,
|
||||
path: str,
|
||||
args: Dict[str, RequestArgument],
|
||||
args: RequestArguments,
|
||||
session_vars: Dict[str, Any],
|
||||
headers: Dict[str, str]):
|
||||
headers['Content-Type'] = 'text/plain'
|
||||
|
@ -20,7 +20,7 @@ def serve_test_pagelet_ok(method: str,
|
|||
@test_pagelet('/just/testing/serve_pagelet_fail')
|
||||
def serve_test_pagelet_fail(method: str,
|
||||
path: str,
|
||||
args: Dict[str, RequestArgument],
|
||||
args: RequestArguments,
|
||||
session_vars: Dict[str, Any],
|
||||
headers: Dict[str, str]):
|
||||
session_vars['test'] = 'hello, world!'
|
||||
|
|
|
@ -1,17 +1,17 @@
|
|||
|
||||
from typing import Any, Dict, Union
|
||||
from typing import Any, Dict
|
||||
|
||||
from datetime import datetime, timedelta
|
||||
from time import sleep
|
||||
|
||||
from matemat.webserver import HttpHandler, RequestArgument
|
||||
from matemat.webserver import HttpHandler, RequestArguments
|
||||
from matemat.webserver.test.abstract_httpd_test import AbstractHttpdTest, test_pagelet
|
||||
|
||||
|
||||
@test_pagelet('/just/testing/sessions')
|
||||
def session_test_pagelet(method: str,
|
||||
path: str,
|
||||
args: Dict[str, RequestArgument],
|
||||
args: RequestArguments,
|
||||
session_vars: Dict[str, Any],
|
||||
headers: Dict[str, str]):
|
||||
session_vars['test'] = 'hello, world!'
|
||||
|
|
|
@ -1,9 +1,9 @@
|
|||
|
||||
from typing import Dict, List, Tuple, Optional, Union
|
||||
from typing import Dict, List, Tuple, Optional
|
||||
|
||||
import urllib.parse
|
||||
|
||||
from matemat.webserver import RequestArgument
|
||||
from matemat.webserver import RequestArguments, RequestArgument
|
||||
|
||||
|
||||
def _parse_multipart(body: bytes, boundary: str) -> List[RequestArgument]:
|
||||
|
@ -76,7 +76,7 @@ def _parse_multipart(body: bytes, boundary: str) -> List[RequestArgument]:
|
|||
|
||||
|
||||
def parse_args(request: str, postbody: Optional[bytes] = None, enctype: str = 'text/plain') \
|
||||
-> Tuple[str, Dict[str, RequestArgument]]:
|
||||
-> Tuple[str, RequestArguments]:
|
||||
"""
|
||||
Given a HTTP request path, and optionally a HTTP POST body in application/x-www-form-urlencoded or
|
||||
multipart/form-data form, parse the arguments and return them as a dictionary.
|
||||
|
@ -98,11 +98,11 @@ def parse_args(request: str, postbody: Optional[bytes] = None, enctype: str = 't
|
|||
else:
|
||||
getargs = urllib.parse.parse_qs(tokens.query, strict_parsing=True, keep_blank_values=True, errors='strict')
|
||||
|
||||
args: Dict[str, RequestArgument] = dict()
|
||||
for k, v in getargs.items():
|
||||
args[k] = RequestArgument(k)
|
||||
for _v in v:
|
||||
args[k].append('text/plain', _v)
|
||||
args = RequestArguments()
|
||||
for k, vs in getargs.items():
|
||||
args[k].clear()
|
||||
for v in vs:
|
||||
args[k].append('text/plain', v)
|
||||
|
||||
if postbody is not None:
|
||||
if enctype == 'application/x-www-form-urlencoded':
|
||||
|
@ -113,10 +113,10 @@ def parse_args(request: str, postbody: Optional[bytes] = None, enctype: str = 't
|
|||
else:
|
||||
postargs = urllib.parse.parse_qs(pb, strict_parsing=True, keep_blank_values=True, errors='strict')
|
||||
# Write all POST values into the dict, overriding potential duplicates from GET
|
||||
for k, v in postargs.items():
|
||||
args[k] = RequestArgument(k)
|
||||
for _v in v:
|
||||
args[k].append('text/plain', _v)
|
||||
for k, vs in postargs.items():
|
||||
args[k].clear()
|
||||
for v in vs:
|
||||
args[k].append('text/plain', v)
|
||||
elif enctype.startswith('multipart/form-data'):
|
||||
# Parse the multipart boundary from the Content-Type header
|
||||
try:
|
||||
|
@ -126,7 +126,9 @@ def parse_args(request: str, postbody: Optional[bytes] = None, enctype: str = 't
|
|||
# Parse the multipart body
|
||||
mpargs = _parse_multipart(postbody, boundary)
|
||||
for ra in mpargs:
|
||||
args[ra.name] = ra
|
||||
args[ra.name].clear()
|
||||
for a in ra:
|
||||
args[ra.name].append(a.get_content_type(), bytes(a))
|
||||
else:
|
||||
raise ValueError(f'Unsupported Content-Type: {enctype}')
|
||||
# Return the path and the parsed arguments
|
||||
|
|
Loading…
Reference in a new issue