1
0
Fork 0
forked from s3lph/matemat

More user-friendly return value in the pagelet API.

This commit is contained in:
s3lph 2018-07-07 18:58:23 +02:00
parent 528f7322ac
commit f1ff14d29c
12 changed files with 141 additions and 92 deletions

View file

@ -2,7 +2,7 @@
from typing import Optional from typing import Optional
class AuthenticationError(BaseException): class AuthenticationError(Exception):
def __init__(self, msg: Optional[str] = None) -> None: def __init__(self, msg: Optional[str] = None) -> None:
super().__init__() super().__init__()

View file

@ -0,0 +1,20 @@
class HttpException(Exception):
def __init__(self, status: int = 500, title: str = 'An error occurred', message: str = None) -> None:
super().__init__()
self.__status: int = status
self.__title: str = title
self.__message: str = message
@property
def status(self) -> int:
return self.__status
@property
def title(self) -> str:
return self.__title
@property
def message(self) -> str:
return self.__message

View file

@ -4,3 +4,4 @@ This package provides custom exception classes used in the Matemat codebase.
from .AuthenticatonError import AuthenticationError from .AuthenticatonError import AuthenticationError
from .DatabaseConsistencyError import DatabaseConsistencyError from .DatabaseConsistencyError import DatabaseConsistencyError
from .HttpException import HttpException

View file

@ -1,5 +1,5 @@
from typing import Any, Callable, Dict, Optional, Tuple, Union from typing import Any, Callable, Dict, Tuple, Union
import traceback import traceback
@ -13,10 +13,10 @@ from uuid import uuid4
from datetime import datetime, timedelta from datetime import datetime, timedelta
from matemat import __version__ as matemat_version from matemat import __version__ as matemat_version
from matemat.exceptions import HttpException
from matemat.webserver import RequestArguments from matemat.webserver import RequestArguments
from matemat.webserver.util import parse_args from matemat.webserver.util import parse_args
# #
# Python internal class hacks # Python internal class hacks
# #
@ -27,15 +27,16 @@ TCPServer.address_family = socket.AF_INET6
BaseHTTPRequestHandler.log_request = lambda self, code='-', size='-': None BaseHTTPRequestHandler.log_request = lambda self, code='-', size='-': None
BaseHTTPRequestHandler.log_error = lambda self, fstring='', *args: None BaseHTTPRequestHandler.log_error = lambda self, fstring='', *args: None
# Dictionary to hold registered pagelet paths and their handler functions # Dictionary to hold registered pagelet paths and their handler functions
_PAGELET_PATHS: Dict[str, Callable[[str, # HTTP method (GET, POST, ...) _PAGELET_PATHS: Dict[str, Callable[[str, # HTTP method (GET, POST, ...)
str, # Request path str, # Request path
RequestArguments, # HTTP Request arguments RequestArguments, # HTTP Request arguments
Dict[str, Any], # Session vars Dict[str, Any], # Session vars
Dict[str, str]], # Response headers Dict[str, str]], # Response headers
Tuple[int, Union[bytes, str]]]] = dict() # Returns: (status code, response body) Union[ # Return type: either a response body, or a redirect
bytes, str, # Response body: will assign HTTP/1.0 200 OK
Tuple[int, str] # Redirect: First element must be 301, second the redirect path
]]] = dict()
# Inactivity timeout for client sessions # Inactivity timeout for client sessions
_SESSION_TIMEOUT: int = 3600 _SESSION_TIMEOUT: int = 3600
@ -54,15 +55,17 @@ def pagelet(path: str):
args: RequestArguments, args: RequestArguments,
session_vars: Dict[str, Any], session_vars: Dict[str, Any],
headers: Dict[str, str]) headers: Dict[str, str])
-> (int, Optional[Union[str, bytes]]) -> Union[bytes, str, Tuple[int, str]]
method: The HTTP method (GET, POST) that was used. method: The HTTP method (GET, POST) that was used.
path: The path that was requested. path: The path that was requested.
args: The arguments that were passed with the request (as GET or POST arguments). args: The arguments that were passed with the request (as GET or POST arguments).
session_vars: The session storage. May be read from and written to. session_vars: The session storage. May be read from and written to.
headers: The dictionary of HTTP response headers. Add headers you wish to send with the response. headers: The dictionary of HTTP response headers. Add headers you wish to send with the response.
returns: A tuple consisting of the HTTP status code (as an int) and the response body (as str or bytes, returns: One of the following:
may be None) - A HTTP Response body as str or bytes
- A HTTP redirect: A tuple of 301 (an int) and the path to redirect to (a str)
raises: HttpException: If a non-200 HTTP status code should be returned
:param path: The path to register the function for. :param path: The path to register the function for.
""" """
@ -72,11 +75,15 @@ def pagelet(path: str):
RequestArguments, RequestArguments,
Dict[str, Any], Dict[str, Any],
Dict[str, str]], Dict[str, str]],
Tuple[int, Optional[Union[bytes, str]]]]): Union[
bytes, str,
Tuple[int, str]
]]):
# Add the function to the dict of pagelets # Add the function to the dict of pagelets
_PAGELET_PATHS[path] = fun _PAGELET_PATHS[path] = fun
# Don't change the function itself at all # Don't change the function itself at all
return fun return fun
# Return the inner function (Python requires a "real" function annotation to not have any arguments except # Return the inner function (Python requires a "real" function annotation to not have any arguments except
# the function itself) # the function itself)
return http_handler return http_handler
@ -167,7 +174,7 @@ class HttpHandler(BaseHTTPRequestHandler):
if session_id not in self.server.session_vars: if session_id not in self.server.session_vars:
self.server.session_vars[session_id] = (now + timedelta(seconds=_SESSION_TIMEOUT)), dict() self.server.session_vars[session_id] = (now + timedelta(seconds=_SESSION_TIMEOUT)), dict()
else: else:
self.server.session_vars[session_id] =\ self.server.session_vars[session_id] = \
(now + timedelta(seconds=_SESSION_TIMEOUT), self.server.session_vars[session_id][1]) (now + timedelta(seconds=_SESSION_TIMEOUT), self.server.session_vars[session_id][1])
# Return the session ID and timeout # Return the session ID and timeout
return session_id, self.server.session_vars[session_id][0] return session_id, self.server.session_vars[session_id][0]
@ -181,6 +188,45 @@ class HttpHandler(BaseHTTPRequestHandler):
if session_id in self.server.session_vars: if session_id in self.server.session_vars:
del self.server.session_vars[session_id] del self.server.session_vars[session_id]
@staticmethod
def _parse_pagelet_result(pagelet_res: Union[bytes, str, Tuple[int, str]], headers: Dict[str, str]) \
-> Tuple[int, bytes]:
"""
Process the return value of a pagelet function call.
:param pagelet_res: The pagelet return value.
:param headers: The dict of HTTP response headers, needed for setting the redirect header.
:return: The HTTP Response status code (an int) and body (a bytes).
:raises TypeError: If the pagelet result was not in the expected form.
"""
# The HTTP Response Status Code, defaults to 200 OK
hsc: int = 200
# The HTTP Response body, defaults to None
data: Union[bytes, str] = None
if isinstance(pagelet_res, tuple):
# If the return type is a tuple, the first element must be 301 (the HTTP Redirect status code)
head, tail = pagelet_res
if head == 301:
# Set the HTTP Response Status Code, and the redirect header
hsc = 301
headers['Location'] = tail
else:
raise TypeError(f'Return value of pagelet not understood: {pagelet_res}')
elif isinstance(pagelet_res, str) or isinstance(pagelet_res, bytes):
# Return value is a response body
data = pagelet_res
else:
# Return value is not a response body or a redirect
raise TypeError(f'Return value of pagelet not understood: {pagelet_res}')
# The pagelet may return None as data as a shorthand for an empty response
if data is None:
data = bytes()
# If the pagelet returns a Python str, convert it to an UTF-8 encoded bytes object
if isinstance(data, str):
data = data.encode('utf-8')
# Return the resulting status code and body
return hsc, data
def _handle(self, method: str, path: str, args: RequestArguments) -> None: def _handle(self, method: str, path: str, args: RequestArguments) -> None:
""" """
Handle a HTTP request by either dispatching it to the appropriate pagelet or by serving a static resource. Handle a HTTP request by either dispatching it to the appropriate pagelet or by serving a static resource.
@ -209,13 +255,9 @@ class HttpHandler(BaseHTTPRequestHandler):
'Cache-Control': 'no-cache' 'Cache-Control': 'no-cache'
} }
# Call the pagelet function # Call the pagelet function
hsc, data = _PAGELET_PATHS[path](method, path, args, self.session_vars, headers) pagelet_res = _PAGELET_PATHS[path](method, path, args, self.session_vars, headers)
# The pagelet may return None as data as a shorthand for an empty response # Parse the pagelet's return value, vielding a HTTP status code and a response body
if data is None: hsc, data = HttpHandler._parse_pagelet_result(pagelet_res, headers)
data = bytes()
# If the pagelet returns a Python str, convert it to an UTF-8 encoded bytes object
if isinstance(data, str):
data = data.encode('utf-8')
# Send the HTTP status code # Send the HTTP status code
self.send_response(hsc) self.send_response(hsc)
# Format the session cookie timeout string and send the session cookie header # Format the session cookie timeout string and send the session cookie header
@ -272,16 +314,18 @@ class HttpHandler(BaseHTTPRequestHandler):
path, args = parse_args(self.path) path, args = parse_args(self.path)
self._handle('GET', path, args) self._handle('GET', path, args)
# Special handling for some errors # Special handling for some errors
except HttpException as e:
self.send_error(e.status, e.title, e.message)
except PermissionError: except PermissionError:
self.send_response(403, 'Forbidden') self.send_error(403, 'Forbidden')
self.end_headers()
except ValueError: except ValueError:
self.send_response(400, 'Bad Request') self.send_error(400, 'Bad Request')
self.end_headers() except BaseException as e:
except BaseException:
# Generic error handling # Generic error handling
self.send_response(500, 'Internal Server Error') self.send_error(500, 'Internal Server Error')
self.end_headers() print(e)
traceback.print_tb(e.__traceback__)
# noinspection PyPep8Naming # noinspection PyPep8Naming
def do_POST(self) -> None: def do_POST(self) -> None:
@ -299,19 +343,15 @@ class HttpHandler(BaseHTTPRequestHandler):
# Parse the request and hand it to the handle function # Parse the request and hand it to the handle function
self._handle('POST', path, args) self._handle('POST', path, args)
# Special handling for some errors # Special handling for some errors
except HttpException as e:
self.send_error(e.status, e.title, e.message)
except PermissionError: except PermissionError:
self.send_response(403, 'Forbidden') self.send_error(403, 'Forbidden')
self.end_headers()
except ValueError: except ValueError:
self.send_response(400, 'Bad Request') self.send_error(400, 'Bad Request')
self.end_headers()
except TypeError:
self.send_response(400, 'Bad Request')
self.end_headers()
except BaseException as e: except BaseException as e:
# Generic error handling # Generic error handling
self.send_response(500, 'Internal Server Error') self.send_error(500, 'Internal Server Error')
self.end_headers()
print(e) print(e)
traceback.print_tb(e.__traceback__) traceback.print_tb(e.__traceback__)

View file

@ -1,7 +1,7 @@
from typing import Any, Dict, Optional, Tuple, Union from typing import Any, Dict, Tuple, Union
from matemat.exceptions import AuthenticationError from matemat.exceptions import AuthenticationError, HttpException
from matemat.webserver import pagelet, RequestArguments from matemat.webserver import pagelet, RequestArguments
from matemat.primitives import User from matemat.primitives import User
from matemat.db import MatematDatabase from matemat.db import MatematDatabase
@ -13,10 +13,9 @@ def login_page(method: str,
args: RequestArguments, args: RequestArguments,
session_vars: Dict[str, Any], session_vars: Dict[str, Any],
headers: Dict[str, str])\ headers: Dict[str, str])\
-> Tuple[int, Optional[Union[str, bytes]]]: -> Union[bytes, str, Tuple[int, str]]:
if 'user' in session_vars: if 'user' in session_vars:
headers['Location'] = '/' return 301, '/'
return 301, None
if method == 'GET': if method == 'GET':
data = ''' data = '''
<DOCTYPE html> <DOCTYPE html>
@ -41,15 +40,13 @@ def login_page(method: str,
</body> </body>
</html> </html>
''' '''
return 200, data.format(msg=str(args.msg) if 'msg' in args else '') return data.format(msg=str(args.msg) if 'msg' in args else '')
elif method == 'POST': elif method == 'POST':
with MatematDatabase('test.db') as db: with MatematDatabase('test.db') as db:
try: try:
user: User = db.login(str(args.username), str(args.password)) user: User = db.login(str(args.username), str(args.password))
except AuthenticationError: except AuthenticationError:
headers['Location'] = '/login?msg=Username%20or%20password%20wrong.%20Please%20try%20again.' return 301, '/login?msg=Username%20or%20password%20wrong.%20Please%20try%20again.'
return 301, bytes()
session_vars['user'] = user session_vars['user'] = user
headers['Location'] = '/' return 301, '/'
return 301, bytes() raise HttpException(405)
return 405, None

View file

@ -10,8 +10,7 @@ def logout(method: str,
args: RequestArguments, args: RequestArguments,
session_vars: Dict[str, Any], session_vars: Dict[str, Any],
headers: Dict[str, str])\ headers: Dict[str, str])\
-> Tuple[int, Optional[Union[str, bytes]]]: -> Union[bytes, str, Tuple[int, str]]:
if 'user' in session_vars: if 'user' in session_vars:
del session_vars['user'] del session_vars['user']
headers['Location'] = '/' return 301, '/'
return 301, None

View file

@ -12,7 +12,7 @@ def main_page(method: str,
args: RequestArguments, args: RequestArguments,
session_vars: Dict[str, Any], session_vars: Dict[str, Any],
headers: Dict[str, str])\ headers: Dict[str, str])\
-> Tuple[int, Optional[Union[str, bytes]]]: -> Union[bytes, str, Tuple[int, str]]:
data = ''' data = '''
<DOCTYPE html> <DOCTYPE html>
<html> <html>
@ -34,24 +34,19 @@ def main_page(method: str,
</body> </body>
</html> </html>
''' '''
try: with MatematDatabase('test.db') as db:
with MatematDatabase('test.db') as db: if 'user' in session_vars:
if 'user' in session_vars: user: User = session_vars['user']
user: User = session_vars['user'] products = db.list_products()
products = db.list_products() plist = '\n'.join([f'<li/> <b>{p.name}</b> ' +
plist = '\n'.join([f'<li/> <b>{p.name}</b> ' + f'{p.price_member//100 if user.is_member else p.price_non_member//100}' +
f'{p.price_member//100 if user.is_member else p.price_non_member//100}' + f'.{p.price_member%100 if user.is_member else p.price_non_member%100}'
f'.{p.price_member%100 if user.is_member else p.price_non_member%100}' for p in products])
for p in products]) uname = f'<b>{user.name}</b> <a href="/logout">(Logout)</a>'
uname = f'<b>{user.name}</b> <a href="/logout">(Logout)</a>' data = data.format(user=uname, list=plist)
data = data.format(user=uname, list=plist) else:
else: users = db.list_users()
users = db.list_users() ulist = '\n'.join([f'<li/> <b><a href=/touchkey?username={u.name}>{u.name}</a></b>' for u in users])
ulist = '\n'.join([f'<li/> <b><a href=/touchkey?username={u.name}>{u.name}</a></b>' for u in users]) ulist = ulist + '<li/> <a href=/login>Password login</a>'
ulist = ulist + '<li/> <a href=/login>Password login</a>' data = data.format(user='', list=ulist)
data = data.format(user='', list=ulist) return data
return 200, data
except BaseException as e:
import traceback
traceback.print_tb(e.__traceback__)
return 500, None

View file

@ -1,7 +1,7 @@
from typing import Any, Dict, Optional, Tuple, Union from typing import Any, Dict, Tuple, Union
from matemat.exceptions import AuthenticationError from matemat.exceptions import AuthenticationError, HttpException
from matemat.webserver import pagelet, RequestArguments from matemat.webserver import pagelet, RequestArguments
from matemat.primitives import User from matemat.primitives import User
from matemat.db import MatematDatabase from matemat.db import MatematDatabase
@ -13,10 +13,9 @@ def touchkey_page(method: str,
args: RequestArguments, args: RequestArguments,
session_vars: Dict[str, Any], session_vars: Dict[str, Any],
headers: Dict[str, str])\ headers: Dict[str, str])\
-> Tuple[int, Optional[Union[str, bytes]]]: -> Union[bytes, str, Tuple[int, str]]:
if 'user' in session_vars: if 'user' in session_vars:
headers['Location'] = '/' return 301, '/'
return 301, bytes()
if method == 'GET': if method == 'GET':
data = ''' data = '''
<DOCTYPE html> <DOCTYPE html>
@ -40,15 +39,13 @@ def touchkey_page(method: str,
</body> </body>
</html> </html>
''' '''
return 200, data.format(username=str(args.username) if 'username' in args else '') return data.format(username=str(args.username) if 'username' in args else '')
elif method == 'POST': elif method == 'POST':
with MatematDatabase('test.db') as db: with MatematDatabase('test.db') as db:
try: try:
user: User = db.login(str(args.username), touchkey=str(args.touchkey)) user: User = db.login(str(args.username), touchkey=str(args.touchkey))
except AuthenticationError: except AuthenticationError:
headers['Location'] = f'/touchkey?username={args["username"]}&msg=Please%20try%20again.' return 301, f'/touchkey?username={args["username"]}&msg=Please%20try%20again.'
return 301, bytes()
session_vars['user'] = user session_vars['user'] = user
headers['Location'] = '/' return 301, '/'
return 301, None raise HttpException(405)
return 405, None

View file

@ -16,6 +16,8 @@ class HttpResponse:
""" """
A really basic HTTP response container and parser class, just good enough for unit testing a HTTP server, if even. A really basic HTTP response container and parser class, just good enough for unit testing a HTTP server, if even.
DO NOT USE THIS OUTSIDE UNIT TESTING!
Usage: Usage:
response = HttpResponse() response = HttpResponse()
while response.parse_phase != 'done' while response.parse_phase != 'done'
@ -161,16 +163,16 @@ def test_pagelet(path: str):
RequestArguments, RequestArguments,
Dict[str, Any], Dict[str, Any],
Dict[str, str]], Dict[str, str]],
Tuple[int, Union[bytes, str]]]): Union[bytes, str, Tuple[int, str]]]):
@pagelet(path) @pagelet(path)
def testing_wrapper(method: str, def testing_wrapper(method: str,
path: str, path: str,
args: RequestArguments, args: RequestArguments,
session_vars: Dict[str, Any], session_vars: Dict[str, Any],
headers: Dict[str, str]): headers: Dict[str, str]):
status, body = fun(method, path, args, session_vars, headers)
headers['X-Test-Pagelet'] = fun.__name__ headers['X-Test-Pagelet'] = fun.__name__
return status, body result = fun(method, path, args, session_vars, headers)
return result
return testing_wrapper return testing_wrapper
return with_testing_headers return with_testing_headers

View file

@ -24,7 +24,7 @@ def post_test_pagelet(method: str,
dump += f'{a.name}: {a.get_str()}\n' dump += f'{a.name}: {a.get_str()}\n'
else: else:
dump += f'{a.name}: {codecs.encode(a.get_bytes(), "hex").decode("utf-8")}\n' dump += f'{a.name}: {codecs.encode(a.get_bytes(), "hex").decode("utf-8")}\n'
return 200, dump return dump
class TestPost(AbstractHttpdTest): class TestPost(AbstractHttpdTest):

View file

@ -3,6 +3,7 @@ from typing import Any, Dict
import os import os
import os.path import os.path
from matemat.exceptions import HttpException
from matemat.webserver import HttpHandler, RequestArguments from matemat.webserver import HttpHandler, RequestArguments
from matemat.webserver.test.abstract_httpd_test import AbstractHttpdTest, test_pagelet from matemat.webserver.test.abstract_httpd_test import AbstractHttpdTest, test_pagelet
@ -14,7 +15,7 @@ def serve_test_pagelet_ok(method: str,
session_vars: Dict[str, Any], session_vars: Dict[str, Any],
headers: Dict[str, str]): headers: Dict[str, str]):
headers['Content-Type'] = 'text/plain' headers['Content-Type'] = 'text/plain'
return 200, 'serve test pagelet ok' return 'serve test pagelet ok'
@test_pagelet('/just/testing/serve_pagelet_fail') @test_pagelet('/just/testing/serve_pagelet_fail')
@ -25,7 +26,7 @@ def serve_test_pagelet_fail(method: str,
headers: Dict[str, str]): headers: Dict[str, str]):
session_vars['test'] = 'hello, world!' session_vars['test'] = 'hello, world!'
headers['Content-Type'] = 'text/plain' headers['Content-Type'] = 'text/plain'
return 500, 'serve test pagelet fail' raise HttpException()
class TestServe(AbstractHttpdTest): class TestServe(AbstractHttpdTest):
@ -62,11 +63,8 @@ class TestServe(AbstractHttpdTest):
HttpHandler(self.client_sock, ('::1', 45678), self.server) HttpHandler(self.client_sock, ('::1', 45678), self.server)
packet = self.client_sock.get_response() packet = self.client_sock.get_response()
# Make sure the correct pagelet was called # Make sure an error is raised
self.assertEqual('serve_test_pagelet_fail', packet.pagelet)
# Make sure the expected content is served
self.assertEqual(500, packet.statuscode) self.assertEqual(500, packet.statuscode)
self.assertEqual(b'serve test pagelet fail', packet.body)
def test_serve_static_ok(self): def test_serve_static_ok(self):
# Request a static resource # Request a static resource

View file

@ -16,7 +16,7 @@ def session_test_pagelet(method: str,
headers: Dict[str, str]): headers: Dict[str, str]):
session_vars['test'] = 'hello, world!' session_vars['test'] = 'hello, world!'
headers['Content-Type'] = 'text/plain' headers['Content-Type'] = 'text/plain'
return 200, 'session test' return 'session test'
class TestSession(AbstractHttpdTest): class TestSession(AbstractHttpdTest):