forked from s3lph/matemat
Initial commit for webserver code. Still needs a lot of documentation, and even more, test coverage.
This commit is contained in:
parent
2a53952003
commit
700af6883f
10 changed files with 373 additions and 20 deletions
2
.gitignore
vendored
2
.gitignore
vendored
|
@ -8,4 +8,4 @@
|
|||
**/.mypy_cache/
|
||||
|
||||
*.sqlite3
|
||||
*.db
|
||||
*.db
|
||||
|
|
16
matemat/__main__.py
Normal file
16
matemat/__main__.py
Normal file
|
@ -0,0 +1,16 @@
|
|||
|
||||
import sys
|
||||
|
||||
if __name__ == '__main__':
|
||||
# Those imports are actually needed, as they implicitly register pagelets.
|
||||
# noinspection PyUnresolvedReferences
|
||||
from matemat.webserver.pagelets import *
|
||||
from matemat.webserver import MatematWebserver
|
||||
|
||||
# Read HTTP port from command line
|
||||
port: int = 8080
|
||||
if len(sys.argv) > 1:
|
||||
port = int(sys.argv[1])
|
||||
|
||||
# Start the web server
|
||||
MatematWebserver(port).start()
|
|
@ -140,8 +140,10 @@ class MatematDatabase(object):
|
|||
user_id, username, email, pwhash, tkhash, admin, member = row
|
||||
if password is not None and not bcrypt.checkpw(password.encode('utf-8'), pwhash):
|
||||
raise AuthenticationError('Password mismatch')
|
||||
elif touchkey is not None and not bcrypt.checkpw(touchkey.encode('utf-8'), tkhash):
|
||||
elif touchkey is not None and tkhash is not None and not bcrypt.checkpw(touchkey.encode('utf-8'), tkhash):
|
||||
raise AuthenticationError('Touchkey mismatch')
|
||||
elif touchkey is not None and tkhash is None:
|
||||
raise AuthenticationError('Touchkey not set')
|
||||
return User(user_id, username, email, admin, member)
|
||||
|
||||
def change_password(self, user: User, oldpass: str, newpass: str, verify_password: bool = True) -> None:
|
||||
|
|
|
@ -0,0 +1,2 @@
|
|||
|
||||
from .httpd import MatematWebserver, HttpHandler, pagelet
|
|
@ -1,6 +1,13 @@
|
|||
|
||||
from typing import Tuple, Dict
|
||||
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
|
||||
|
||||
import traceback
|
||||
|
||||
import os
|
||||
import socket
|
||||
import mimetypes
|
||||
import urllib.parse
|
||||
from socketserver import TCPServer
|
||||
from http.server import HTTPServer, BaseHTTPRequestHandler
|
||||
from http.cookies import SimpleCookie
|
||||
from uuid import uuid4
|
||||
|
@ -9,10 +16,52 @@ from datetime import datetime, timedelta
|
|||
from matemat import __version__ as matemat_version
|
||||
|
||||
|
||||
# Enable IPv6 support (with implicit DualStack).
|
||||
TCPServer.address_family = socket.AF_INET6
|
||||
|
||||
|
||||
# Dictionary to hold registered pagelet paths and their handler functions.
|
||||
_PAGELET_PATHS: Dict[str, Callable[[str, str, Dict[str, str], Dict[str, Any], Dict[str, str], bytes],
|
||||
Tuple[int, Union[bytes, str]]]] = dict()
|
||||
|
||||
|
||||
def pagelet(path: str):
|
||||
"""
|
||||
Annotate a function to act as a pagelet (part of a website). The function will be called if a request is made to
|
||||
the path specified as argument to the annotation.
|
||||
|
||||
The function must have the following signature:
|
||||
|
||||
(method: str, path: str, args: Dict[str, Union[str, List[str]], session_vars: Dict[str, Any],
|
||||
headers: Dict[str, str]) -> (int, Optional[Union[str, bytes]])
|
||||
|
||||
method: The HTTP method (GET, POST) that was used.
|
||||
path: The path that was requested.
|
||||
args: The arguments that were passed with the request (as GET or POST arguments).
|
||||
session_vars: The session storage. May be read from and written to.
|
||||
headers: The dictionary of HTTP response headers. Add headers you wish to send with the response.
|
||||
returns: A tuple consisting of the HTTP status code (as an int) and the response body (as str or bytes,
|
||||
may be None)
|
||||
|
||||
:param path: The path to register the function for.
|
||||
"""
|
||||
def http_handler(fun: Callable[[str, str, Dict[str, str], Dict[str, Any], Dict[str, str], bytes],
|
||||
Tuple[int, Union[bytes, str]]]):
|
||||
# Add the function to the dict of pagelets.
|
||||
_PAGELET_PATHS[path] = fun
|
||||
# Don't change the function itself at all.
|
||||
return fun
|
||||
# Return the inner function (Python requires a "real" function annotation to not have any arguments except
|
||||
# the function itself).
|
||||
return http_handler
|
||||
|
||||
|
||||
class MatematWebserver(object):
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._httpd = HTTPServer(('', 8080), HttpHandler)
|
||||
def __init__(self, port: int = 80, webroot: str = './webroot') -> None:
|
||||
self._httpd = HTTPServer(('::', port), HttpHandler)
|
||||
self._httpd.session_vars: Dict[str, Tuple[datetime, Dict[str, Any]]] = dict()
|
||||
self._httpd.webroot = os.path.abspath(webroot)
|
||||
|
||||
def start(self) -> None:
|
||||
self._httpd.serve_forever()
|
||||
|
@ -22,8 +71,6 @@ class HttpHandler(BaseHTTPRequestHandler):
|
|||
|
||||
def __init__(self, request: bytes, client_address: Tuple[str, int], server: HTTPServer) -> None:
|
||||
super().__init__(request, client_address, server)
|
||||
self._session_vars: Dict[str, Tuple[datetime, Dict[str, object]]] = dict()
|
||||
print(self._session_vars)
|
||||
|
||||
@property
|
||||
def server_version(self) -> str:
|
||||
|
@ -34,28 +81,146 @@ class HttpHandler(BaseHTTPRequestHandler):
|
|||
cookiestring = '\n'.join(self.headers.get_all('Cookie', failobj=[]))
|
||||
cookie = SimpleCookie()
|
||||
cookie.load(cookiestring)
|
||||
session_id = cookie['matemat_session_id'] if 'matemat_session_id' in cookie else str(uuid4())
|
||||
session_id = str(cookie['matemat_session_id'].value) if 'matemat_session_id' in cookie else None
|
||||
if session_id is None or session_id not in self.server.session_vars:
|
||||
session_id = str(uuid4())
|
||||
|
||||
if session_id in self._session_vars and self._session_vars[session_id][0] < now:
|
||||
if session_id in self.server.session_vars and self.server.session_vars[session_id][0] < now:
|
||||
self.end_session(session_id)
|
||||
raise TimeoutError('Session timed out')
|
||||
elif session_id not in self._session_vars:
|
||||
self._session_vars[session_id] = (now + timedelta(hours=1)), dict()
|
||||
return session_id, now
|
||||
raise TimeoutError('Session timed out.')
|
||||
elif session_id not in self.server.session_vars:
|
||||
self.server.session_vars[session_id] = (now + timedelta(seconds=10)), dict()
|
||||
return session_id, self.server.session_vars[session_id][0]
|
||||
|
||||
def end_session(self, session_id: str) -> None:
|
||||
if session_id in self._session_vars:
|
||||
del self._session_vars[session_id]
|
||||
if session_id in self.server.session_vars:
|
||||
del self.server.session_vars[session_id]
|
||||
|
||||
def do_GET(self) -> None:
|
||||
def _handle(self, method: str, path: str, args: Dict[str, str]) -> None:
|
||||
try:
|
||||
session_id, timeout = self.start_session()
|
||||
except TimeoutError:
|
||||
self.send_error(599, 'Session Timed Out', 'Session Timed Out.')
|
||||
self.send_header('Set-Cookie', 'matemat_session_id=; expires=Thu, 01 Jan 1970 00:00:00 GMT')
|
||||
self.send_error(599, 'Session Timed Out.', 'Please log in again.')
|
||||
self.end_headers()
|
||||
return
|
||||
self.send_response(200, 'Welcome!')
|
||||
self.session_id: str = session_id
|
||||
|
||||
if path in _PAGELET_PATHS:
|
||||
headers: Dict[str, str] = {
|
||||
'Content-Type': 'text/html',
|
||||
'Cache-Control': 'no-cache'
|
||||
}
|
||||
hsc, data = _PAGELET_PATHS[path](method, path, args, self.session_vars, headers)
|
||||
if data is None:
|
||||
data = bytes()
|
||||
if isinstance(data, str):
|
||||
data = data.encode('utf-8')
|
||||
self.send_response(hsc)
|
||||
expires = timeout.strftime("%a, %d %b %Y %H:%M:%S GMT")
|
||||
self.send_header('Set-Cookie',
|
||||
f'matemat_session_id={session_id}; expires={expires}')
|
||||
headers['Content-Length'] = str(len(data))
|
||||
for name, value in headers.items():
|
||||
self.send_header(name, value)
|
||||
self.end_headers()
|
||||
self.wfile.write(data)
|
||||
else:
|
||||
if method != 'GET':
|
||||
self.send_error(405)
|
||||
self.end_headers()
|
||||
return
|
||||
filepath: str = os.path.abspath(os.path.join(self.server.webroot, path[1:]))
|
||||
if os.path.commonpath([filepath, self.server.webroot]) == self.server.webroot and os.path.exists(filepath):
|
||||
with open(filepath, 'rb') as f:
|
||||
data = f.read()
|
||||
self.send_response(200)
|
||||
mimetype, _ = mimetypes.guess_type(filepath)
|
||||
if mimetype is not None:
|
||||
self.send_header('Content-Type', mimetype)
|
||||
self.end_headers()
|
||||
if method == 'GET':
|
||||
self.wfile.write(data)
|
||||
else:
|
||||
self.send_response(404)
|
||||
self.end_headers()
|
||||
|
||||
if __name__ == '__main__':
|
||||
MatematWebserver().start()
|
||||
@staticmethod
|
||||
def _parse_args(request: str, postbody: Optional[str] = None) -> Tuple[str, Dict[str, Union[str, List[str]]]]:
|
||||
"""
|
||||
Given a HTTP request path, and optionally a HTTP POST body in application/x-www-form-urlencoded form, parse the
|
||||
arguments and return them as a dictionary.
|
||||
If a key is used both in GET and in POST, the POST value takes precedence, and the GET value is discarded.
|
||||
:param request: The request string to parse.
|
||||
:param postbody: The POST body to parse, defaults to None.
|
||||
:return: A tuple consisting of the base path and a dictionary with the parsed key/value pairs.
|
||||
"""
|
||||
# Parse the request "URL" (i.e. only the path).
|
||||
tokens = urllib.parse.urlparse(request)
|
||||
# Parse the GET arguments.
|
||||
args = urllib.parse.parse_qs(tokens.query)
|
||||
|
||||
if postbody is not None:
|
||||
# Parse the POST body.
|
||||
postargs = urllib.parse.parse_qs(postbody)
|
||||
# Write all POST values into the dict, overriding potential duplicates from GET.
|
||||
for k, v in postargs.items():
|
||||
args[k] = v
|
||||
# urllib.parse.parse_qs turns ALL arguments into arrays. This turns arrays of length 1 into scalar values.
|
||||
for k, v in args.items():
|
||||
if len(v) == 1:
|
||||
args[k] = v[0]
|
||||
# Return the path and the parsed arguments.
|
||||
return tokens.path, args
|
||||
|
||||
# noinspection PyPep8Naming
|
||||
def do_GET(self) -> None:
|
||||
try:
|
||||
path, args = self._parse_args(self.path)
|
||||
self._handle('GET', path, args)
|
||||
except PermissionError as e:
|
||||
self.send_error(403, 'Forbidden')
|
||||
self.end_headers()
|
||||
print(type(e))
|
||||
traceback.print_tb(e.__traceback__)
|
||||
except ValueError as e:
|
||||
self.send_header(400, 'Bad Request')
|
||||
self.end_headers()
|
||||
print(type(e))
|
||||
traceback.print_tb(e.__traceback__)
|
||||
except BaseException as e:
|
||||
self.send_error(500, 'Internal Server Error')
|
||||
self.end_headers()
|
||||
print(type(e))
|
||||
traceback.print_tb(e.__traceback__)
|
||||
|
||||
# noinspection PyPep8Naming
|
||||
def do_POST(self) -> None:
|
||||
try:
|
||||
clen: str = self.headers.get('Content-Length', failobj='0')
|
||||
ctype: str = self.headers.get('Content-Type', failobj='application/octet-stream')
|
||||
post = ''
|
||||
if ctype == 'application/x-www-form-urlencoded':
|
||||
post: str = self.rfile.read(int(clen)).decode('utf-8')
|
||||
|
||||
path, args = self._parse_args(self.path, postbody=post)
|
||||
self._handle('POST', path, args)
|
||||
except PermissionError as e:
|
||||
self.send_error(403, 'Forbidden')
|
||||
self.end_headers()
|
||||
print(type(e))
|
||||
traceback.print_tb(e.__traceback__)
|
||||
except ValueError as e:
|
||||
self.send_header(400, 'Bad Request')
|
||||
self.end_headers()
|
||||
print(type(e))
|
||||
traceback.print_tb(e.__traceback__)
|
||||
except BaseException as e:
|
||||
self.send_error(500, 'Internal Server Error')
|
||||
self.end_headers()
|
||||
print(type(e))
|
||||
traceback.print_tb(e.__traceback__)
|
||||
|
||||
@property
|
||||
def session_vars(self) -> Dict[str, Any]:
|
||||
return self.server.session_vars[self.session_id][1]
|
||||
|
|
5
matemat/webserver/pagelets/__init__.py
Normal file
5
matemat/webserver/pagelets/__init__.py
Normal file
|
@ -0,0 +1,5 @@
|
|||
|
||||
from .main import main_page
|
||||
from .login import login_page
|
||||
from .logout import logout
|
||||
from .touchkey import touchkey_page
|
50
matemat/webserver/pagelets/login.py
Normal file
50
matemat/webserver/pagelets/login.py
Normal file
|
@ -0,0 +1,50 @@
|
|||
|
||||
from typing import Any, Dict
|
||||
|
||||
from matemat.exceptions import AuthenticationError
|
||||
from matemat.webserver import pagelet
|
||||
from matemat.primitives import User
|
||||
from matemat.db import MatematDatabase
|
||||
|
||||
|
||||
@pagelet('/login')
|
||||
def login_page(method: str, path: str, args: Dict[str, str], session_vars: Dict[str, Any], headers: Dict[str, str]):
|
||||
if 'user' in session_vars:
|
||||
headers['Location'] = '/'
|
||||
return 301, None
|
||||
if method == 'GET':
|
||||
data = '''
|
||||
<DOCTYPE html>
|
||||
<html>
|
||||
<head>
|
||||
<title>Matemat</title>
|
||||
<style>
|
||||
body {{
|
||||
color: #f0f0f0;
|
||||
background: #000000;
|
||||
}};
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
<h1>Matemat</h1>
|
||||
{msg}
|
||||
<form action="/login" method="post">
|
||||
Username: <input type="text" name="username"/><br/>
|
||||
Password: <input type="password" name="password" /><br/>
|
||||
<input type="submit" value="Login"/>
|
||||
</form>
|
||||
</body>
|
||||
</html>
|
||||
'''
|
||||
return 200, data.format(msg=args['msg'] if 'msg' in args else '')
|
||||
elif method == 'POST':
|
||||
print(args)
|
||||
with MatematDatabase('test.db') as db:
|
||||
try:
|
||||
user: User = db.login(args['username'], args['password'])
|
||||
except AuthenticationError:
|
||||
headers['Location'] = '/login?msg=Username%20or%20password%20wrong.%20Please%20try%20again.'
|
||||
return 301, bytes()
|
||||
session_vars['user'] = user
|
||||
headers['Location'] = '/'
|
||||
return 301, bytes()
|
12
matemat/webserver/pagelets/logout.py
Normal file
12
matemat/webserver/pagelets/logout.py
Normal file
|
@ -0,0 +1,12 @@
|
|||
|
||||
from typing import Any, Dict
|
||||
|
||||
from matemat.webserver import pagelet
|
||||
|
||||
|
||||
@pagelet('/logout')
|
||||
def logout(method: str, path: str, args: Dict[str, str], session_vars: Dict[str, Any], headers: Dict[str, str]):
|
||||
if 'user' in session_vars:
|
||||
del session_vars['user']
|
||||
headers['Location'] = '/'
|
||||
return 301, None
|
53
matemat/webserver/pagelets/main.py
Normal file
53
matemat/webserver/pagelets/main.py
Normal file
|
@ -0,0 +1,53 @@
|
|||
|
||||
from typing import Any, Dict, Optional, Tuple, Union
|
||||
|
||||
from matemat.webserver import MatematWebserver, pagelet
|
||||
from matemat.primitives import User
|
||||
from matemat.db import MatematDatabase
|
||||
|
||||
|
||||
@pagelet('/')
|
||||
def main_page(method: str, path: str, args: Dict[str, str], session_vars: Dict[str, Any], headers: Dict[str, str])\
|
||||
-> Tuple[int, Optional[Union[str, bytes]]]:
|
||||
data = '''
|
||||
<DOCTYPE html>
|
||||
<html>
|
||||
<head>
|
||||
<title>Matemat</title>
|
||||
<style>
|
||||
body {{
|
||||
color: #f0f0f0;
|
||||
background: #000000;
|
||||
}};
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
<h1>Matemat</h1>
|
||||
{user}
|
||||
<ul>
|
||||
{list}
|
||||
</ul>
|
||||
</body>
|
||||
</html>
|
||||
'''
|
||||
try:
|
||||
with MatematDatabase('test.db') as db:
|
||||
if 'user' in session_vars:
|
||||
user: User = session_vars['user']
|
||||
products = db.list_products()
|
||||
plist = '\n'.join([f'<li/> <b>{p.name}</b> ' +
|
||||
f'{p.price_member//100 if user.is_member else p.price_non_member//100}' +
|
||||
f'.{p.price_member%100 if user.is_member else p.price_non_member%100}'
|
||||
for p in products])
|
||||
uname = f'<b>{user.name}</b> <a href="/logout">(Logout)</a>'
|
||||
data = data.format(user=uname, list=plist)
|
||||
else:
|
||||
users = db.list_users()
|
||||
ulist = '\n'.join([f'<li/> <b><a href=/touchkey?username={u.name}>{u.name}</a></b>' for u in users])
|
||||
ulist = ulist + '<li/> <a href=/login>Password login</a>'
|
||||
data = data.format(user='', list=ulist)
|
||||
return 200, data
|
||||
except BaseException as e:
|
||||
import traceback
|
||||
traceback.print_tb(e.__traceback__)
|
||||
return 500, None
|
48
matemat/webserver/pagelets/touchkey.py
Normal file
48
matemat/webserver/pagelets/touchkey.py
Normal file
|
@ -0,0 +1,48 @@
|
|||
|
||||
from typing import Any, Dict
|
||||
|
||||
from matemat.exceptions import AuthenticationError
|
||||
from matemat.webserver import pagelet
|
||||
from matemat.primitives import User
|
||||
from matemat.db import MatematDatabase
|
||||
|
||||
|
||||
@pagelet('/touchkey')
|
||||
def touchkey_page(method: str, path: str, args: Dict[str, str], session_vars: Dict[str, Any], headers: Dict[str, str]):
|
||||
if 'user' in session_vars:
|
||||
headers['Location'] = '/'
|
||||
return 301, bytes()
|
||||
if method == 'GET':
|
||||
data = '''
|
||||
<DOCTYPE html>
|
||||
<html>
|
||||
<head>
|
||||
<title>Matemat</title>
|
||||
<style>
|
||||
body {{
|
||||
color: #f0f0f0;
|
||||
background: #000000;
|
||||
}};
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
<h1>Matemat</h1>
|
||||
<form action="/touchkey" method="post">
|
||||
<input type="hidden" name="username" value="{username}"/><br/>
|
||||
Touchkey: <input type="password" name="touchkey" /><br/>
|
||||
<input type="submit" value="Login"/>
|
||||
</form>
|
||||
</body>
|
||||
</html>
|
||||
'''
|
||||
return 200, data.format(username=args['username'] if 'username' in args else '')
|
||||
elif method == 'POST':
|
||||
with MatematDatabase('test.db') as db:
|
||||
try:
|
||||
user: User = db.login(args['username'], touchkey=args['touchkey'])
|
||||
except AuthenticationError:
|
||||
headers['Location'] = f'/touchkey?username={args["username"]}&msg=Please%20try%20again.'
|
||||
return 301, bytes()
|
||||
session_vars['user'] = user
|
||||
headers['Location'] = '/'
|
||||
return 301, None
|
Loading…
Reference in a new issue