diff --git a/.gitignore b/.gitignore index 95dc460..cca262e 100644 --- a/.gitignore +++ b/.gitignore @@ -8,4 +8,4 @@ **/.mypy_cache/ *.sqlite3 -*.db \ No newline at end of file +*.db diff --git a/matemat/__main__.py b/matemat/__main__.py new file mode 100644 index 0000000..819ae2b --- /dev/null +++ b/matemat/__main__.py @@ -0,0 +1,16 @@ + +import sys + +if __name__ == '__main__': + # Those imports are actually needed, as they implicitly register pagelets. + # noinspection PyUnresolvedReferences + from matemat.webserver.pagelets import * + from matemat.webserver import MatematWebserver + + # Read HTTP port from command line + port: int = 8080 + if len(sys.argv) > 1: + port = int(sys.argv[1]) + + # Start the web server + MatematWebserver(port).start() diff --git a/matemat/db/facade.py b/matemat/db/facade.py index f91bf4a..027c62f 100644 --- a/matemat/db/facade.py +++ b/matemat/db/facade.py @@ -140,8 +140,10 @@ class MatematDatabase(object): user_id, username, email, pwhash, tkhash, admin, member = row if password is not None and not bcrypt.checkpw(password.encode('utf-8'), pwhash): raise AuthenticationError('Password mismatch') - elif touchkey is not None and not bcrypt.checkpw(touchkey.encode('utf-8'), tkhash): + elif touchkey is not None and tkhash is not None and not bcrypt.checkpw(touchkey.encode('utf-8'), tkhash): raise AuthenticationError('Touchkey mismatch') + elif touchkey is not None and tkhash is None: + raise AuthenticationError('Touchkey not set') return User(user_id, username, email, admin, member) def change_password(self, user: User, oldpass: str, newpass: str, verify_password: bool = True) -> None: diff --git a/matemat/webserver/__init__.py b/matemat/webserver/__init__.py index e69de29..bb51b73 100644 --- a/matemat/webserver/__init__.py +++ b/matemat/webserver/__init__.py @@ -0,0 +1,2 @@ + +from .httpd import MatematWebserver, HttpHandler, pagelet diff --git a/matemat/webserver/httpd.py b/matemat/webserver/httpd.py index 2ae1685..ff6c7d4 100644 --- a/matemat/webserver/httpd.py +++ b/matemat/webserver/httpd.py @@ -1,6 +1,13 @@ -from typing import Tuple, Dict +from typing import Any, Callable, Dict, List, Optional, Tuple, Union +import traceback + +import os +import socket +import mimetypes +import urllib.parse +from socketserver import TCPServer from http.server import HTTPServer, BaseHTTPRequestHandler from http.cookies import SimpleCookie from uuid import uuid4 @@ -9,10 +16,52 @@ from datetime import datetime, timedelta from matemat import __version__ as matemat_version +# Enable IPv6 support (with implicit DualStack). +TCPServer.address_family = socket.AF_INET6 + + +# Dictionary to hold registered pagelet paths and their handler functions. +_PAGELET_PATHS: Dict[str, Callable[[str, str, Dict[str, str], Dict[str, Any], Dict[str, str], bytes], + Tuple[int, Union[bytes, str]]]] = dict() + + +def pagelet(path: str): + """ + Annotate a function to act as a pagelet (part of a website). The function will be called if a request is made to + the path specified as argument to the annotation. + + The function must have the following signature: + + (method: str, path: str, args: Dict[str, Union[str, List[str]], session_vars: Dict[str, Any], + headers: Dict[str, str]) -> (int, Optional[Union[str, bytes]]) + + method: The HTTP method (GET, POST) that was used. + path: The path that was requested. + args: The arguments that were passed with the request (as GET or POST arguments). + session_vars: The session storage. May be read from and written to. + headers: The dictionary of HTTP response headers. Add headers you wish to send with the response. + returns: A tuple consisting of the HTTP status code (as an int) and the response body (as str or bytes, + may be None) + + :param path: The path to register the function for. + """ + def http_handler(fun: Callable[[str, str, Dict[str, str], Dict[str, Any], Dict[str, str], bytes], + Tuple[int, Union[bytes, str]]]): + # Add the function to the dict of pagelets. + _PAGELET_PATHS[path] = fun + # Don't change the function itself at all. + return fun + # Return the inner function (Python requires a "real" function annotation to not have any arguments except + # the function itself). + return http_handler + + class MatematWebserver(object): - def __init__(self) -> None: - self._httpd = HTTPServer(('', 8080), HttpHandler) + def __init__(self, port: int = 80, webroot: str = './webroot') -> None: + self._httpd = HTTPServer(('::', port), HttpHandler) + self._httpd.session_vars: Dict[str, Tuple[datetime, Dict[str, Any]]] = dict() + self._httpd.webroot = os.path.abspath(webroot) def start(self) -> None: self._httpd.serve_forever() @@ -22,8 +71,6 @@ class HttpHandler(BaseHTTPRequestHandler): def __init__(self, request: bytes, client_address: Tuple[str, int], server: HTTPServer) -> None: super().__init__(request, client_address, server) - self._session_vars: Dict[str, Tuple[datetime, Dict[str, object]]] = dict() - print(self._session_vars) @property def server_version(self) -> str: @@ -34,28 +81,146 @@ class HttpHandler(BaseHTTPRequestHandler): cookiestring = '\n'.join(self.headers.get_all('Cookie', failobj=[])) cookie = SimpleCookie() cookie.load(cookiestring) - session_id = cookie['matemat_session_id'] if 'matemat_session_id' in cookie else str(uuid4()) + session_id = str(cookie['matemat_session_id'].value) if 'matemat_session_id' in cookie else None + if session_id is None or session_id not in self.server.session_vars: + session_id = str(uuid4()) - if session_id in self._session_vars and self._session_vars[session_id][0] < now: + if session_id in self.server.session_vars and self.server.session_vars[session_id][0] < now: self.end_session(session_id) - raise TimeoutError('Session timed out') - elif session_id not in self._session_vars: - self._session_vars[session_id] = (now + timedelta(hours=1)), dict() - return session_id, now + raise TimeoutError('Session timed out.') + elif session_id not in self.server.session_vars: + self.server.session_vars[session_id] = (now + timedelta(seconds=10)), dict() + return session_id, self.server.session_vars[session_id][0] def end_session(self, session_id: str) -> None: - if session_id in self._session_vars: - del self._session_vars[session_id] + if session_id in self.server.session_vars: + del self.server.session_vars[session_id] - def do_GET(self) -> None: + def _handle(self, method: str, path: str, args: Dict[str, str]) -> None: try: session_id, timeout = self.start_session() except TimeoutError: + self.send_error(599, 'Session Timed Out', 'Session Timed Out.') self.send_header('Set-Cookie', 'matemat_session_id=; expires=Thu, 01 Jan 1970 00:00:00 GMT') - self.send_error(599, 'Session Timed Out.', 'Please log in again.') + self.end_headers() return - self.send_response(200, 'Welcome!') + self.session_id: str = session_id + if path in _PAGELET_PATHS: + headers: Dict[str, str] = { + 'Content-Type': 'text/html', + 'Cache-Control': 'no-cache' + } + hsc, data = _PAGELET_PATHS[path](method, path, args, self.session_vars, headers) + if data is None: + data = bytes() + if isinstance(data, str): + data = data.encode('utf-8') + self.send_response(hsc) + expires = timeout.strftime("%a, %d %b %Y %H:%M:%S GMT") + self.send_header('Set-Cookie', + f'matemat_session_id={session_id}; expires={expires}') + headers['Content-Length'] = str(len(data)) + for name, value in headers.items(): + self.send_header(name, value) + self.end_headers() + self.wfile.write(data) + else: + if method != 'GET': + self.send_error(405) + self.end_headers() + return + filepath: str = os.path.abspath(os.path.join(self.server.webroot, path[1:])) + if os.path.commonpath([filepath, self.server.webroot]) == self.server.webroot and os.path.exists(filepath): + with open(filepath, 'rb') as f: + data = f.read() + self.send_response(200) + mimetype, _ = mimetypes.guess_type(filepath) + if mimetype is not None: + self.send_header('Content-Type', mimetype) + self.end_headers() + if method == 'GET': + self.wfile.write(data) + else: + self.send_response(404) + self.end_headers() -if __name__ == '__main__': - MatematWebserver().start() + @staticmethod + def _parse_args(request: str, postbody: Optional[str] = None) -> Tuple[str, Dict[str, Union[str, List[str]]]]: + """ + Given a HTTP request path, and optionally a HTTP POST body in application/x-www-form-urlencoded form, parse the + arguments and return them as a dictionary. + If a key is used both in GET and in POST, the POST value takes precedence, and the GET value is discarded. + :param request: The request string to parse. + :param postbody: The POST body to parse, defaults to None. + :return: A tuple consisting of the base path and a dictionary with the parsed key/value pairs. + """ + # Parse the request "URL" (i.e. only the path). + tokens = urllib.parse.urlparse(request) + # Parse the GET arguments. + args = urllib.parse.parse_qs(tokens.query) + + if postbody is not None: + # Parse the POST body. + postargs = urllib.parse.parse_qs(postbody) + # Write all POST values into the dict, overriding potential duplicates from GET. + for k, v in postargs.items(): + args[k] = v + # urllib.parse.parse_qs turns ALL arguments into arrays. This turns arrays of length 1 into scalar values. + for k, v in args.items(): + if len(v) == 1: + args[k] = v[0] + # Return the path and the parsed arguments. + return tokens.path, args + + # noinspection PyPep8Naming + def do_GET(self) -> None: + try: + path, args = self._parse_args(self.path) + self._handle('GET', path, args) + except PermissionError as e: + self.send_error(403, 'Forbidden') + self.end_headers() + print(type(e)) + traceback.print_tb(e.__traceback__) + except ValueError as e: + self.send_header(400, 'Bad Request') + self.end_headers() + print(type(e)) + traceback.print_tb(e.__traceback__) + except BaseException as e: + self.send_error(500, 'Internal Server Error') + self.end_headers() + print(type(e)) + traceback.print_tb(e.__traceback__) + + # noinspection PyPep8Naming + def do_POST(self) -> None: + try: + clen: str = self.headers.get('Content-Length', failobj='0') + ctype: str = self.headers.get('Content-Type', failobj='application/octet-stream') + post = '' + if ctype == 'application/x-www-form-urlencoded': + post: str = self.rfile.read(int(clen)).decode('utf-8') + + path, args = self._parse_args(self.path, postbody=post) + self._handle('POST', path, args) + except PermissionError as e: + self.send_error(403, 'Forbidden') + self.end_headers() + print(type(e)) + traceback.print_tb(e.__traceback__) + except ValueError as e: + self.send_header(400, 'Bad Request') + self.end_headers() + print(type(e)) + traceback.print_tb(e.__traceback__) + except BaseException as e: + self.send_error(500, 'Internal Server Error') + self.end_headers() + print(type(e)) + traceback.print_tb(e.__traceback__) + + @property + def session_vars(self) -> Dict[str, Any]: + return self.server.session_vars[self.session_id][1] diff --git a/matemat/webserver/pagelets/__init__.py b/matemat/webserver/pagelets/__init__.py new file mode 100644 index 0000000..dabd91d --- /dev/null +++ b/matemat/webserver/pagelets/__init__.py @@ -0,0 +1,5 @@ + +from .main import main_page +from .login import login_page +from .logout import logout +from .touchkey import touchkey_page diff --git a/matemat/webserver/pagelets/login.py b/matemat/webserver/pagelets/login.py new file mode 100644 index 0000000..876fd71 --- /dev/null +++ b/matemat/webserver/pagelets/login.py @@ -0,0 +1,50 @@ + +from typing import Any, Dict + +from matemat.exceptions import AuthenticationError +from matemat.webserver import pagelet +from matemat.primitives import User +from matemat.db import MatematDatabase + + +@pagelet('/login') +def login_page(method: str, path: str, args: Dict[str, str], session_vars: Dict[str, Any], headers: Dict[str, str]): + if 'user' in session_vars: + headers['Location'] = '/' + return 301, None + if method == 'GET': + data = ''' + + + + Matemat + + + +

Matemat

+ {msg} +
+ Username:
+ Password:
+ +
+ + + ''' + return 200, data.format(msg=args['msg'] if 'msg' in args else '') + elif method == 'POST': + print(args) + with MatematDatabase('test.db') as db: + try: + user: User = db.login(args['username'], args['password']) + except AuthenticationError: + headers['Location'] = '/login?msg=Username%20or%20password%20wrong.%20Please%20try%20again.' + return 301, bytes() + session_vars['user'] = user + headers['Location'] = '/' + return 301, bytes() diff --git a/matemat/webserver/pagelets/logout.py b/matemat/webserver/pagelets/logout.py new file mode 100644 index 0000000..86095b0 --- /dev/null +++ b/matemat/webserver/pagelets/logout.py @@ -0,0 +1,12 @@ + +from typing import Any, Dict + +from matemat.webserver import pagelet + + +@pagelet('/logout') +def logout(method: str, path: str, args: Dict[str, str], session_vars: Dict[str, Any], headers: Dict[str, str]): + if 'user' in session_vars: + del session_vars['user'] + headers['Location'] = '/' + return 301, None diff --git a/matemat/webserver/pagelets/main.py b/matemat/webserver/pagelets/main.py new file mode 100644 index 0000000..2ead15d --- /dev/null +++ b/matemat/webserver/pagelets/main.py @@ -0,0 +1,53 @@ + +from typing import Any, Dict, Optional, Tuple, Union + +from matemat.webserver import MatematWebserver, pagelet +from matemat.primitives import User +from matemat.db import MatematDatabase + + +@pagelet('/') +def main_page(method: str, path: str, args: Dict[str, str], session_vars: Dict[str, Any], headers: Dict[str, str])\ + -> Tuple[int, Optional[Union[str, bytes]]]: + data = ''' + + + + Matemat + + + +

Matemat

+ {user} + + + + ''' + try: + with MatematDatabase('test.db') as db: + if 'user' in session_vars: + user: User = session_vars['user'] + products = db.list_products() + plist = '\n'.join([f'
  • {p.name} ' + + f'{p.price_member//100 if user.is_member else p.price_non_member//100}' + + f'.{p.price_member%100 if user.is_member else p.price_non_member%100}' + for p in products]) + uname = f'{user.name} (Logout)' + data = data.format(user=uname, list=plist) + else: + users = db.list_users() + ulist = '\n'.join([f'
  • {u.name}' for u in users]) + ulist = ulist + '
  • Password login' + data = data.format(user='', list=ulist) + return 200, data + except BaseException as e: + import traceback + traceback.print_tb(e.__traceback__) + return 500, None diff --git a/matemat/webserver/pagelets/touchkey.py b/matemat/webserver/pagelets/touchkey.py new file mode 100644 index 0000000..fd99fea --- /dev/null +++ b/matemat/webserver/pagelets/touchkey.py @@ -0,0 +1,48 @@ + +from typing import Any, Dict + +from matemat.exceptions import AuthenticationError +from matemat.webserver import pagelet +from matemat.primitives import User +from matemat.db import MatematDatabase + + +@pagelet('/touchkey') +def touchkey_page(method: str, path: str, args: Dict[str, str], session_vars: Dict[str, Any], headers: Dict[str, str]): + if 'user' in session_vars: + headers['Location'] = '/' + return 301, bytes() + if method == 'GET': + data = ''' + + + + Matemat + + + +

    Matemat

    +
    +
    + Touchkey:
    + +
    + + + ''' + return 200, data.format(username=args['username'] if 'username' in args else '') + elif method == 'POST': + with MatematDatabase('test.db') as db: + try: + user: User = db.login(args['username'], touchkey=args['touchkey']) + except AuthenticationError: + headers['Location'] = f'/touchkey?username={args["username"]}&msg=Please%20try%20again.' + return 301, bytes() + session_vars['user'] = user + headers['Location'] = '/' + return 301, None