matemat/matemat/webserver/httpd.py

327 lines
14 KiB
Python

from typing import Any, Callable, Dict, Optional, Tuple, Union
import traceback
import os
import socket
import mimetypes
from socketserver import TCPServer
from http.server import HTTPServer, BaseHTTPRequestHandler
from http.cookies import SimpleCookie
from uuid import uuid4
from datetime import datetime, timedelta
from matemat import __version__ as matemat_version
from matemat.webserver import RequestArguments
from matemat.webserver.util import parse_args
#
# Python internal class hacks
#
# NOTE: the three assignments below patch standard library classes at import time. They therefore affect every
# TCPServer / BaseHTTPRequestHandler (sub)class in the process that does not override these attributes itself,
# not only the server defined in this module.

# Enable IPv6 support (IPv6/IPv4 dual-stack support should be implicitly enabled)
TCPServer.address_family = socket.AF_INET6
# Redirect internal logging to somewhere else, or, for now, silently discard (TODO: logger will come later)
BaseHTTPRequestHandler.log_request = lambda self, code='-', size='-': None
BaseHTTPRequestHandler.log_error = lambda self, fstring='', *args: None

# Dictionary to hold registered pagelet paths and their handler functions.
# Keys are request paths, values are the functions registered for them via the @pagelet decorator.
_PAGELET_PATHS: Dict[str, Callable[[str,  # HTTP method (GET, POST, ...)
                                    str,  # Request path
                                    RequestArguments,  # HTTP Request arguments
                                    Dict[str, Any],  # Session vars
                                    Dict[str, str]],  # Response headers
                                   # Returns: (status code, response body); the body may be None
                                   Tuple[int, Optional[Union[bytes, str]]]]] = dict()

# Inactivity timeout for client sessions, in seconds
_SESSION_TIMEOUT: int = 3600
# Largest Content-Length (in bytes) accepted for a POST request; larger requests are rejected
_MAX_POST: int = 1_000_000
def pagelet(path: str):
    """
    Decorator factory that registers a function as the pagelet (part of a website) for a request path.

    Whenever a request for the given path arrives, the registered function is invoked. It is expected to have
    the following signature:

    (method: str,
     path: str,
     args: RequestArguments,
     session_vars: Dict[str, Any],
     headers: Dict[str, str])
    -> (int, Optional[Union[str, bytes]])

    method: The HTTP method (GET, POST) of the request.
    path: The requested path.
    args: The request arguments (GET or POST arguments).
    session_vars: The session storage; the pagelet may read from and write to it.
    headers: The dictionary of HTTP response headers; add whatever headers should be sent with the response.
    returns: A tuple of the HTTP status code (an int) and the response body (str or bytes, may be None).

    :param path: The path to register the function for.
    :return: A decorator that records the function it is applied to and hands it back unmodified.
    """

    def register(fun: Callable[[str,
                                str,
                                RequestArguments,
                                Dict[str, Any],
                                Dict[str, str]],
                               Tuple[int, Optional[Union[bytes, str]]]]):
        # Remember which function serves this path; a later registration for the same path replaces this one
        _PAGELET_PATHS[path] = fun
        # The decorated function stays usable exactly as it was written
        return fun

    # A decorator taking arguments has to evaluate to the actual one-argument decorator
    return register
class MatematWebserver:
    """
    The main webserver class; a thin wrapper around Python's http.server.

    For every request, the server serves a pagelet if one is registered for the request path, and otherwise
    attempts to serve a static resource from the webroot directory.

    Usage:

        # Listen on all interfaces on port 80 (dual-stack IPv6/IPv4)
        server = MatematWebserver('::', 80, webroot='/var/www/matemat')
        # Start the server. This call blocks while the server is running.
        server.start()
    """

    def __init__(self, listen: str = '::', port: int = 80, webroot: str = './webroot') -> None:
        """
        Create the underlying HTTP server and attach the shared state the request handler relies on.

        :param listen: The IPv4 or IPv6 address to listen on; an empty string means all addresses
        :param port: The TCP port to listen on
        :param webroot: Path to the webroot directory
        """
        # An empty address is shorthand for "all addresses"
        address = listen if len(listen) != 0 else '::'
        # Heuristic: dots but no colons means a plain IPv4 address, which is rewritten to IPv6-mapped form
        looks_like_ipv4 = '.' in address and ':' not in address
        if looks_like_ipv4:
            address = f'::ffff:{address}'
        # Create the http server; HttpHandler is instantiated by it for every request
        self._httpd = HTTPServer((address, port), HttpHandler)
        # Session storage: session ID -> (timeout date, session variables)
        self._httpd.session_vars = {}
        # Absolute path of the directory static resources are served from
        self._httpd.webroot = os.path.abspath(webroot)

    def start(self) -> None:
        """
        Run the web server. This call blocks for as long as the server is running.
        """
        self._httpd.serve_forever()
class HttpHandler(BaseHTTPRequestHandler):
    """
    HTTP Request handler.

    This class parses HTTP requests, and calls the appropriate pagelets, or fetches a static resource from the webroot
    directory.

    The handler expects the server object to carry two extra attributes: ``session_vars`` (a dict mapping session
    IDs to ``(timeout, variables)`` tuples) and ``webroot`` (absolute path of the static file directory).
    """

    def __init__(self, request: bytes, client_address: Tuple[str, int], server: HTTPServer) -> None:
        super().__init__(request, client_address, server)

    @property
    def server_version(self) -> str:
        """
        The server name and version reported by the base class (e.g. in the ``Server`` response header).
        """
        return f'matemat/{matemat_version}'

    def _start_session(self) -> Tuple[str, datetime]:
        """
        Start a new session, or resume the session identified by the session cookie sent in the HTTP request.

        :return: A tuple consisting of the session ID (a UUID string), and the session timeout date.
        :raises TimeoutError: If the client's session exists, but has timed out. The session is destroyed.
        """
        # Imported locally, since the module only imports SimpleCookie
        from http.cookies import CookieError

        # Reference date for session timeout
        now = datetime.utcnow()
        # Parse cookies sent by the client
        cookiestring = '\n'.join(self.headers.get_all('Cookie', failobj=[]))
        cookie = SimpleCookie()
        try:
            cookie.load(cookiestring)
        except CookieError:
            # The Cookie header is client-controlled; an illegal cookie name must not turn into a server error.
            # Cookies parsed before the offending one are kept; a missing session cookie yields a new session.
            pass
        # Read the client's session ID, if any
        session_id = str(cookie['matemat_session_id'].value) if 'matemat_session_id' in cookie else None

        sessions = self.server.session_vars
        entry = sessions.get(session_id) if session_id is not None else None
        if entry is None:
            # No (known) session: create a new session ID with empty session storage
            session_id = str(uuid4())
            variables: Dict[str, Any] = dict()
        elif entry[0] < now:
            # Known session, but it has timed out
            self._end_session(session_id)
            raise TimeoutError('Session timed out.')
        else:
            # Known, active session: keep its storage
            variables = entry[1]
        # Initialize or push back the session timeout
        timeout = now + timedelta(seconds=_SESSION_TIMEOUT)
        sessions[session_id] = (timeout, variables)
        # Return the session ID and timeout
        return session_id, timeout

    def _end_session(self, session_id: str) -> None:
        """
        Destroy a session identified by the session ID. Unknown session IDs are ignored.

        :param session_id: ID of the session to destroy.
        """
        self.server.session_vars.pop(session_id, None)

    def _handle(self, method: str, path: str, args: RequestArguments) -> None:
        """
        Handle a HTTP request by either dispatching it to the appropriate pagelet or by serving a static resource.

        :param method: The HTTP request method (GET, POST).
        :param path: The request path without GET arguments.
        :param args: Arguments sent with the request. This includes GET and POST arguments, where the POST arguments
            take precedence.
        """
        # Start or resume a session; on session timeout, clear the cookie and redirect to /
        try:
            session_id, timeout = self._start_session()
        except TimeoutError:
            self.send_response(302)
            self.send_header('Set-Cookie', 'matemat_session_id=; expires=Thu, 01 Jan 1970 00:00:00 GMT')
            self.send_header('Location', '/')
            self.end_headers()
            return
        # Remembered on the handler, so the session_vars property can find the session storage
        self.session_id: str = session_id

        if path in _PAGELET_PATHS:
            # A pagelet function is registered for the requested path
            self._serve_pagelet(method, path, args, session_id, timeout)
        else:
            # No pagelet function for this path, try a static serve instead
            self._serve_static(method, path)

    def _serve_pagelet(self, method: str, path: str, args: RequestArguments,
                       session_id: str, timeout: datetime) -> None:
        """
        Call the pagelet registered for the path and send its result, together with the session cookie.

        :param method: The HTTP request method (GET, POST).
        :param path: The request path; must be registered as a pagelet path.
        :param args: Arguments sent with the request.
        :param session_id: ID of the current session, sent back as session cookie.
        :param timeout: Timeout date of the current session, used as cookie expiry date.
        """
        # Prepare some headers. Those can still be overwritten by the pagelet
        headers: Dict[str, str] = {
            'Content-Type': 'text/html',
            'Cache-Control': 'no-cache'
        }
        # Call the pagelet function
        hsc, data = _PAGELET_PATHS[path](method, path, args, self.session_vars, headers)
        # The pagelet may return None as data as a shorthand for an empty response
        if data is None:
            data = bytes()
        # If the pagelet returns a Python str, convert it to an UTF-8 encoded bytes object
        if isinstance(data, str):
            data = data.encode('utf-8')
        # Send the HTTP status code
        self.send_response(hsc)
        # Format the session cookie timeout string and send the session cookie header
        expires = timeout.strftime("%a, %d %b %Y %H:%M:%S GMT")
        self.send_header('Set-Cookie',
                         f'matemat_session_id={session_id}; expires={expires}')
        # Compute the body length and add the appropriate header
        headers['Content-Length'] = str(len(data))
        # Send all headers set by the pagelet
        for name, value in headers.items():
            self.send_header(name, value)
        # End the header section and write the body
        self.end_headers()
        self.wfile.write(data)

    def _serve_static(self, method: str, path: str) -> None:
        """
        Serve a static file from the webroot directory.

        Replies with 405 for any method other than GET, and with 404 if the path points outside the webroot
        directory or does not name a regular file (this includes directories).

        :param method: The HTTP request method (GET, POST).
        :param path: The request path without GET arguments.
        """
        # Only HTTP GET is allowed, else reply with 'Method Not Allowed'
        if method != 'GET':
            # send_error() already ends the header section and writes an error body; calling end_headers()
            # afterwards would append a stray blank line to the response.
            self.send_error(405)
            return
        # Create the absolute resource path, resolving '..'
        webroot: str = self.server.webroot
        filepath: str = os.path.abspath(os.path.join(webroot, path[1:]))
        # Make sure the file is actually inside the webroot directory and that it is a regular file.
        # Checking for a file (rather than mere existence) keeps directory requests from failing in open().
        inside_webroot = os.path.commonpath([filepath, webroot]) == webroot
        if not (inside_webroot and os.path.isfile(filepath)):
            self.send_response(404)
            self.end_headers()
            return
        # Open and read the file
        with open(filepath, 'rb') as f:
            data = f.read()
        # File read successfully, send 'OK' header
        self.send_response(200)
        # TODO: Guess the MIME type. Unfortunately this call solely relies on the file extension, not ideal?
        mimetype, _ = mimetypes.guess_type(filepath)
        # Fall back to octet-stream type, if unknown
        if mimetype is None:
            mimetype = 'application/octet-stream'
        # Send content type and length header
        self.send_header('Content-Type', mimetype)
        self.send_header('Content-Length', str(len(data)))
        self.end_headers()
        # Send the requested resource as response body
        self.wfile.write(data)

    def _internal_error(self, error: Exception) -> None:
        """
        Print an unexpected error with its traceback and reply with '500 Internal Server Error'.

        :param error: The exception that was raised while handling the request.
        """
        print(error)
        traceback.print_tb(error.__traceback__)
        self.send_response(500, 'Internal Server Error')
        self.end_headers()

    # noinspection PyPep8Naming
    def do_GET(self) -> None:
        """
        Called by BaseHTTPRequestHandler for GET requests.
        """
        try:
            # Parse the request and hand it to the handle function
            path, args = parse_args(self.path)
            self._handle('GET', path, args)
        # Special handling for some errors
        except PermissionError:
            self.send_response(403, 'Forbidden')
            self.end_headers()
        except ValueError:
            self.send_response(400, 'Bad Request')
            self.end_headers()
        # Generic error handling. Only Exception is caught, so that KeyboardInterrupt and SystemExit still
        # reach the server loop instead of being answered with a 500 response.
        except Exception as e:
            self._internal_error(e)

    # noinspection PyPep8Naming
    def do_POST(self) -> None:
        """
        Called by BaseHTTPRequestHandler for POST requests.
        """
        try:
            # Read the POST body, if it exists; a missing Content-Length header means an empty body
            clen: int = int(str(self.headers.get('Content-Length', failobj='0')))
            if clen < 0:
                # A negative length would make rfile.read() block until the client closes the connection
                raise ValueError('Negative Content-Length')
            if clen > _MAX_POST:
                raise ValueError('Request too big')
            ctype: str = self.headers.get('Content-Type', failobj='application/octet-stream')
            post: bytes = self.rfile.read(clen)
            # Parse the request and hand it to the handle function
            path, args = parse_args(self.path, postbody=post, enctype=ctype)
            self._handle('POST', path, args)
        # Special handling for some errors
        except PermissionError:
            self.send_response(403, 'Forbidden')
            self.end_headers()
        except (ValueError, TypeError):
            self.send_response(400, 'Bad Request')
            self.end_headers()
        # Generic error handling. Only Exception is caught, so that KeyboardInterrupt and SystemExit still
        # reach the server loop instead of being answered with a 500 response.
        except Exception as e:
            self._internal_error(e)

    @property
    def session_vars(self) -> Dict[str, Any]:
        """
        Get the session variables for the current session.

        Only available once a session has been started for the request, i.e. after ``session_id`` was set.

        :return: Dictionary of named session variables.
        """
        return self.server.session_vars[self.session_id][1]