forked from s3lph/matemat
636 lines
28 KiB
Python
636 lines
28 KiB
Python
|
|
from typing import Any, Callable, Dict, Set, Tuple, Type, Union
|
|
|
|
import logging
|
|
import os
|
|
import socket
|
|
import mimetypes
|
|
import magic
|
|
from socketserver import TCPServer
|
|
from http.server import HTTPServer, BaseHTTPRequestHandler
|
|
from http.cookies import SimpleCookie
|
|
from uuid import uuid4
|
|
from datetime import datetime, timedelta
|
|
from threading import Event, Timer, Thread
|
|
|
|
import jinja2
|
|
|
|
from matemat import __version__ as matemat_version
|
|
from matemat.exceptions import HttpException
|
|
from matemat.webserver import RequestArguments, PageletResponse, RedirectResponse, TemplateResponse
|
|
from matemat.webserver.util import parse_args
|
|
from matemat.util.currency_format import format_chf
|
|
|
|
#
# Python internal class hacks
#

# Enable IPv6 support (IPv6/IPv4 dual-stack support should be implicitly enabled)
TCPServer.address_family = socket.AF_INET6

# Dictionary to hold registered pagelet paths and their handler functions
_PAGELET_PATHS: Dict[str, Callable[[str,  # HTTP method (GET, POST, ...)
                                    str,  # Request path
                                    RequestArguments,  # HTTP Request arguments
                                    Dict[str, Any],  # Session vars
                                    Dict[str, str],  # Response headers
                                    Dict[str, str]],  # Items from the [Pagelets] section in the config file
                                   Union[  # Return type: either a response body, or a redirect
                                       bytes, str,  # Response body: will assign HTTP/1.0 200 OK
                                       PageletResponse,  # A generic response
                                   ]]] = dict()
# The pagelet initialization functions, to be executed upon startup
_PAGELET_INIT_FUNCTIONS: Set[Callable[[Dict[str, str], logging.Logger], None]] = set()

# Event shared by all cron timers: setting it cancels every pending cron timer at once
_PAGELET_CRON_STATIC_EVENT: Event = Event()
# Callable that executes a cron function. It stays None until the web server installs its runner, which is why
# the annotation explicitly allows None.
_PAGELET_CRON_RUNNER: Union[
    Callable[[Callable[[Dict[str, str], jinja2.Environment, logging.Logger], None]], None],
    None
] = None

# Inactivity timeout for client sessions, in seconds
_SESSION_TIMEOUT: int = 3600
# Upper bound for the Content-Length of POST requests; larger requests are rejected
_MAX_POST: int = 1_000_000
|
|
|
|
|
|
def pagelet(path: str):
    """
    Decorator factory that registers the decorated function as the pagelet serving the given request path.

    The decorated function is stored in the pagelet registry under ``path`` and is handed back unmodified, so it
    can still be called directly.

    A pagelet function is expected to have the following signature:

      (method: str,
       path: str,
       args: RequestArguments,
       session_vars: Dict[str, Any],
       headers: Dict[str, str],
       config: Dict[str, str])
      -> Union[bytes, str, PageletResponse]

      method: The HTTP method (GET, POST) that was used.
      path: The path that was requested.
      args: The arguments that were passed with the request (as GET or POST arguments).
      session_vars: The session storage. May be read from and written to.
      headers: The dictionary of HTTP response headers. Add headers you wish to send with the response.
      config: The dictionary of variables read from the [Pagelets] section of the configuration file.
      returns: One of the following:
        - A HTTP Response body as str or bytes
        - An instance of (a subclass of) matemat.webserver.PageletResponse, e.g. encapsulating a redirect or a
          Jinja2 template.
      raises: HttpException: If a non-200 HTTP status code should be returned

    :param path: The path to register the function for.
    :return: The decorator that performs the registration.
    """

    def register(fun: Callable[[str, str, RequestArguments, Dict[str, Any], Dict[str, str], Dict[str, str]],
                               Union[bytes, str, PageletResponse]]):
        # Remember the handler for this path; a later registration for the same path replaces it
        _PAGELET_PATHS.update({path: fun})
        # The function itself is returned untouched
        return fun

    # A decorator taking arguments must return the actual one-argument decorator
    return register
|
|
|
|
|
|
def pagelet_init(fun: Callable[[Dict[str, str], logging.Logger], None]):
    """
    Annotate a function to act as a pagelet initialization function.  The function will be called when the webserver is
    started.  The function should set up everything required for pagelets to operate correctly, e.g. providing default
    values for configuration items, or performing database schema migrations.

    Any exception thrown by an initialization function will cause the web server to cease operation.  Multiple
    initialization functions can be used.

    The function must have the following signature:

      (config: Dict[str, str], logger: logging.Logger) -> None

      config: The mutable dictionary of variables read from the [Pagelets] section of the configuration file.
      logger: The server's logger instance.
      returns: Nothing.

    :param fun: The function to annotate
    :return: The very same function, so that the decorated name keeps referring to it.
    """
    _PAGELET_INIT_FUNCTIONS.add(fun)
    # Hand the function back unchanged; without this the decorated module-level name would be rebound to None
    return fun
|
|
|
|
|
|
class _GlobalEventTimer(Thread):
|
|
"""
|
|
A timer similar to threading.Timer, except that waits on an externally supplied threading.Event instance,
|
|
therefore allowing all timers waiting on the same event to be cancelled at once.
|
|
"""
|
|
|
|
def __init__(self, interval: float, event: Event, fun, *args, **kwargs):
|
|
"""
|
|
Create a new _GlobalEventTimer.
|
|
:param interval: The delay after which to run the function.
|
|
:param event: The external threading.Event to wait on.
|
|
:param fun: The function to call.
|
|
:param args: The positional arguments to pass to the function.
|
|
:param kwargs: The keyword arguments to pass to the function.
|
|
"""
|
|
Thread.__init__(self)
|
|
self.interval = interval
|
|
self.fun = fun
|
|
self.args = args if args is not None else []
|
|
self.kwargs = kwargs if kwargs is not None else {}
|
|
self.event = event
|
|
|
|
def run(self):
|
|
self.event.wait(self.interval)
|
|
if not self.event.is_set():
|
|
self.fun(*self.args, **self.kwargs)
|
|
# Do NOT call event.set(), as done in threading.Timer, as that would cancel all other timers
|
|
|
|
|
|
def pagelet_cron(weeks: int = 0,
                 days: int = 0,
                 hours: int = 0,
                 seconds: int = 0,
                 minutes: int = 0,
                 milliseconds: int = 0,
                 microseconds: int = 0):
    """
    Annotate a function to act as a pagelet cron function.  The function will be called in a regular interval, defined
    by the arguments passed to the decorator, which are passed to a timedelta object.

    The first timer is armed as soon as the decorator is applied.  Whenever a timer fires, the next one is armed
    before the job is handed to the cron runner; the job is skipped while no cron runner is installed.

    The function must have the following signature:

      (config: Dict[str, str], jinja_env: jinja2.Environment, logger: logging.Logger) -> None

      config: The mutable dictionary of variables read from the [Pagelets] section of the configuration file.
      jinja_env: The Jinja2 environment used by the web server.
      logger: The server's logger instance.
      returns: Nothing.

    :param weeks: Number of weeks in the interval.
    :param days: Number of days in the interval.
    :param hours: Number of hours in the interval.
    :param seconds: Number of seconds in the interval.
    :param minutes: Number of minutes in the interval.
    :param milliseconds: Number of milliseconds in the interval.
    :param microseconds: Number of microseconds in the interval.
    :return: The decorator, which returns the decorated function unchanged.
    """

    def cron_wrapper(fun: Callable[[Dict[str, str], jinja2.Environment, logging.Logger], None]):
        # Create the timedelta object
        delta: timedelta = timedelta(weeks=weeks,
                                     days=days,
                                     hours=hours,
                                     seconds=seconds,
                                     minutes=minutes,
                                     milliseconds=milliseconds,
                                     microseconds=microseconds)

        # This function is called once in the specified interval
        def cron():
            # Set a new timer first, so a long-running job does not delay the next invocation
            t: _GlobalEventTimer = _GlobalEventTimer(delta.total_seconds(), _PAGELET_CRON_STATIC_EVENT, cron)
            t.start()
            # Have the cron job be picked up by the cron runner provided by the web server
            if _PAGELET_CRON_RUNNER is not None:
                _PAGELET_CRON_RUNNER(fun)

        # Set a timer to run the cron job after the specified interval
        timer: _GlobalEventTimer = _GlobalEventTimer(delta.total_seconds(), _PAGELET_CRON_STATIC_EVENT, cron)
        timer.start()

        # Hand the function back unchanged; without this the decorated module-level name would be rebound to None
        return fun

    return cron_wrapper
|
|
|
|
|
|
class MatematHTTPServer(HTTPServer):
    """
    HTTPServer subclass carrying the state that has to survive individual requests: the static webroot, the session
    store, the pagelet configuration, the static response headers, the Jinja2 environment and the logger.
    """

    def __init__(self,
                 server_address: Any,
                 handler: Type[BaseHTTPRequestHandler],
                 staticroot: str,
                 templateroot: str,
                 pagelet_variables: Dict[str, str],
                 static_headers: Dict[str, str],
                 log_level: int,
                 log_handler: logging.Handler,
                 bind_and_activate: bool = True) -> None:
        """
        Create the server and all per-server state.

        :param server_address: The address to listen on, passed through to HTTPServer.
        :param handler: The request handler class, passed through to HTTPServer.
        :param staticroot: Path to the directory static resources are served from.
        :param templateroot: Path to the directory Jinja2 templates are loaded from.
        :param pagelet_variables: Dictionary of variables handed to pagelet functions.
        :param static_headers: Dictionary of statically configured HTTP response headers.
        :param log_level: The log level applied to the server's logger.
        :param log_handler: The logging handler attached to the server's logger.
        :param bind_and_activate: Passed through to HTTPServer.
        """
        super().__init__(server_address, handler, bind_and_activate)

        # Absolute path of the static webroot directory
        self.webroot = os.path.abspath(staticroot)
        # Session store: session ID -> (timeout date, session variables)
        self.session_vars: Dict[str, Tuple[datetime, Dict[str, Any]]] = {}
        # Variables passed to every pagelet call
        self.pagelet_variables = pagelet_variables
        # Headers added to every response unless overwritten
        self.static_headers = static_headers

        # Jinja2 environment: templates come from the template root, autoescaping is on by default
        template_dir = os.path.abspath(templateroot)
        environment = jinja2.Environment(
            loader=jinja2.FileSystemLoader(template_dir),
            autoescape=jinja2.select_autoescape(default=True),
        )
        environment.filters['chf'] = format_chf
        self.jinja_env: jinja2.Environment = environment

        # Logger: apply the configured level and attach the configured handler with the server's format string
        log_handler.setFormatter(logging.Formatter('%(asctime)s %(name)s [%(levelname)s]: %(message)s'))
        server_logger = logging.getLogger('matemat.webserver')
        server_logger.setLevel(log_level)
        server_logger.addHandler(log_handler)
        self.logger: logging.Logger = server_logger
|
|
|
|
|
|
class MatematWebserver(object):
    """
    The main webserver class, internally uses Python's http.server.

    The server will serve a pagelet, if one is defined for a request path, else it will attempt to serve a static
    resource from the webroot.

    Usage:

        # Listen on all interfaces on port 80 (dual-stack IPv6/IPv4)
        server = MatematWebserver('::', 80, staticroot, templateroot, pagelet_variables, headers,
                                  log_level, log_handler)
        # Start the server.  This call blocks while the server is running.
        server.start()
    """

    def __init__(self,
                 listen: str,
                 port: int,
                 staticroot: str,
                 templateroot: str,
                 pagelet_variables: Dict[str, str],
                 headers: Dict[str, str],
                 log_level: int,
                 log_handler: logging.Handler) -> None:
        """
        Instantiate a MatematWebserver.

        :param listen: The IPv4 or IPv6 address to listen on.
        :param port: The TCP port to listen on.
        :param staticroot: Path to the static webroot directory.
        :param templateroot: Path to the Jinja2 templates root directory.
        :param pagelet_variables: Dictionary of variables to pass to pagelet functions.
        :param headers: Dictionary of statically configured headers.
        :param log_level: The log level, as defined in the builtin logging module.
        :param log_handler: The logging handler.
        """
        # Heuristic: an address containing dots but no colons is taken to be an IPv4 address
        looks_like_ipv4 = '.' in listen and ':' not in listen
        if looks_like_ipv4:
            # Rewrite IPv4 address to IPv6-mapped form
            listen = f'::ffff:{listen}'
        # Create the http server
        address = (listen, port)
        self._httpd = MatematHTTPServer(address,
                                        HttpHandler,
                                        staticroot,
                                        templateroot,
                                        pagelet_variables,
                                        headers,
                                        log_level,
                                        log_handler)

    def start(self) -> None:
        """
        Call all pagelet initialization functions and start the web server.  This call blocks while the server is
        running.  An exception raised during the initialization phase is logged and re-raised.
        """
        try:
            self._initialize_pagelets()
            # Pagelet initialization went fine, start the HTTP server
            self._httpd.serve_forever()
        finally:
            # Cancel all cron timers at once when the webserver is shutting down
            _PAGELET_CRON_STATIC_EVENT.set()

    def _initialize_pagelets(self) -> None:
        """
        Run every registered pagelet initialization function, then install this server as the cron runner.
        """
        global _PAGELET_CRON_RUNNER
        httpd = self._httpd
        try:
            for init_fun in _PAGELET_INIT_FUNCTIONS:
                init_fun(httpd.pagelet_variables, httpd.logger)
            # From now on, cron jobs are executed through this server
            _PAGELET_CRON_RUNNER = self._cron_runner
        except BaseException as e:
            # Log the error and pass it on to the caller
            httpd.logger.exception(e)
            httpd.logger.critical('An initialization pagelet raised an error.  Stopping.')
            raise e

    def _cron_runner(self, fun: Callable[[Dict[str, str], jinja2.Environment, logging.Logger], None]):
        """
        Execute a cron function with the server's pagelet variables, Jinja2 environment and logger.  Any error
        raised by the cron function is logged and not propagated.

        :param fun: The cron function to execute.
        """
        log = self._httpd.logger
        log.info('Executing cron job "%s"', fun.__name__)
        try:
            fun(self._httpd.pagelet_variables, self._httpd.jinja_env, self._httpd.logger)
            log.info('Completed cron job "%s"', fun.__name__)
        except BaseException as e:
            log.exception('Cron job "%s" failed:', fun.__name__, exc_info=e)
|
|
|
|
|
|
class HttpHandler(BaseHTTPRequestHandler):
    """
    HTTP Request handler.

    This class parses HTTP requests, and calls the appropriate pagelets, or fetches a static resource from the webroot
    directory.
    """

    def __init__(self, request: bytes, client_address: Tuple[str, int], server: HTTPServer) -> None:
        super().__init__(request, client_address, server)
        # Annotation only: tells type checkers which server class this handler is used with
        self.server: MatematHTTPServer

    @property
    def server_version(self) -> str:
        """
        The server software string, built from the matemat version.
        """
        return f'matemat/{matemat_version}'

    def _start_session(self) -> Tuple[str, datetime]:
        """
        Start a new session, or resume the session identified by the session cookie sent in the HTTP request.

        :return: A tuple consisting of the session ID (a UUID string), and the session timeout date.
        :raises TimeoutError: If the session identified by the cookie has timed out.  The session is destroyed.
        """
        # Reference date for session timeout
        now = datetime.utcnow()
        # Parse cookies sent by the client
        cookiestring = '\n'.join(self.headers.get_all('Cookie', failobj=[]))
        cookie = SimpleCookie()
        cookie.load(cookiestring)
        # Read the client's session ID, if any
        session_id = str(cookie['matemat_session_id'].value) if 'matemat_session_id' in cookie else None
        # If there is no active session, create a new session ID
        if session_id is None or session_id not in self.server.session_vars:
            session_id = str(uuid4())
            self.server.logger.debug('Started session %s', session_id)

        # Check for session timeout
        if session_id in self.server.session_vars and self.server.session_vars[session_id][0] < now:
            self._end_session(session_id)
            self.server.logger.debug('Session %s timed out', session_id)
            raise TimeoutError('Session timed out.')
        # Update or initialize the session timeout
        new_timeout = now + timedelta(seconds=_SESSION_TIMEOUT)
        if session_id not in self.server.session_vars:
            self.server.session_vars[session_id] = new_timeout, dict()
        else:
            self.server.session_vars[session_id] = new_timeout, self.server.session_vars[session_id][1]
        # Return the session ID and timeout
        return session_id, self.server.session_vars[session_id][0]

    def _end_session(self, session_id: str) -> None:
        """
        Destroy a session identified by the session ID.  Unknown session IDs are ignored.

        :param session_id: ID of the session to destroy.
        """
        if session_id in self.server.session_vars:
            del self.server.session_vars[session_id]

    def _parse_pagelet_result(self,
                              pagelet_res: Union[bytes,  # Response body as bytes
                                                 str,  # Response body as str
                                                 PageletResponse],  # Encapsulated or unresolved response body
                              headers: Dict[str, str]) \
            -> Tuple[int, bytes]:
        """
        Process the return value of a pagelet function call.

        :param pagelet_res: The pagelet return value.
        :param headers: The dict of HTTP response headers, needed for setting the redirect header.
        :return: The HTTP Response status code (an int) and body (a bytes).
        :raises TypeError: If the pagelet result was not in the expected form.
        """
        # The HTTP Response Status Code, defaults to 200 OK
        hsc: int = 200
        # The HTTP Response body, defaults to empty
        data: bytes = bytes()

        # If the response is a bytes object, it is used without further modification
        if isinstance(pagelet_res, bytes):
            data = pagelet_res

        # If the response is a str object, it is encoded into a bytes object
        elif isinstance(pagelet_res, str):
            data = pagelet_res.encode('utf-8')

        # If the response is a PageletResponse object, the status code is extracted.  Generation of the body depends
        # on the subtype
        elif isinstance(pagelet_res, PageletResponse):
            hsc = pagelet_res.status

            # If the object is a RedirectResponse instance, no body is needed
            if isinstance(pagelet_res, RedirectResponse):
                headers['Location'] = pagelet_res.location
            # If the object is a TemplateResponse instance, pass the Jinja2 environment instance for rendering
            elif isinstance(pagelet_res, TemplateResponse):
                # noinspection PyProtectedMember
                data = pagelet_res._render(self.server.jinja_env)
            # else: Empty body

        else:
            raise TypeError(f'Return value of pagelet not understood: {pagelet_res}')

        # Return the resulting status code and body
        return hsc, data

    @staticmethod
    def _content_type_header(mimetype: str, charset: str) -> str:
        """
        Build a Content-Type header value.  The charset is only appended if it is not "binary".
        """
        if charset == 'binary':
            return mimetype
        return f'{mimetype}; charset={charset}'

    def _handle(self, method: str, path: str, args: RequestArguments) -> None:
        """
        Handle a HTTP request by either dispatching it to the appropriate pagelet or by serving a static resource.

        :param method: The HTTP request method (GET, POST).
        :param path: The request path without GET arguments.
        :param args: Arguments sent with the request.  This includes GET and POST arguments, where the POST arguments
            take precedence.
        """
        # Start or resume a session; redirect to / on session timeout
        try:
            session_id, timeout = self._start_session()
        except TimeoutError:
            self.send_response(302)
            self.send_header('Set-Cookie', 'matemat_session_id=; expires=Thu, 01 Jan 1970 00:00:00 GMT')
            self.send_header('Location', '/')
            self.end_headers()
            return
        self.session_id: str = session_id

        # Set all static headers.  These can still be overwritten by a pagelet
        headers: Dict[str, str] = dict(self.server.static_headers)

        if path in _PAGELET_PATHS:
            # A pagelet function is registered for the requested path
            self._serve_pagelet(method, path, args, session_id, timeout, headers)
        else:
            # No pagelet function for this path, try a static serve instead
            self._serve_static(method, path, headers)

    def _serve_pagelet(self,
                       method: str,
                       path: str,
                       args: RequestArguments,
                       session_id: str,
                       timeout: datetime,
                       headers: Dict[str, str]) -> None:
        """
        Call the pagelet registered for the path and send its result as the HTTP response.

        :param method: The HTTP request method.
        :param path: The request path; must be registered as a pagelet path.
        :param args: Arguments sent with the request.
        :param session_id: The ID of the current session, sent back as session cookie.
        :param timeout: The timeout date of the current session, used as cookie expiry date.
        :param headers: The response headers collected so far; passed to the pagelet, which may add to them.
        """
        # Call the pagelet function
        pagelet_res = _PAGELET_PATHS[path](method,
                                           path,
                                           args,
                                           self.session_vars,
                                           headers,
                                           self.server.pagelet_variables)
        # Parse the pagelet's return value, yielding a HTTP status code and a response body
        hsc, data = self._parse_pagelet_result(pagelet_res, headers)
        # Send the HTTP status code
        self.send_response(hsc)
        # Format the session cookie timeout string and send the session cookie header
        expires = timeout.strftime('%a, %d %b %Y %H:%M:%S GMT')
        headers['Set-Cookie'] = f'matemat_session_id={session_id}; expires={expires}'
        # Disable caching
        headers['Cache-Control'] = 'no-cache'
        # Compute the body length and add the appropriate header
        headers['Content-Length'] = str(len(data))
        # If the pagelet did not set its own Content-Type header, use libmagic to guess an appropriate one
        if 'Content-Type' not in headers:
            try:
                filemagic: magic.FileMagic = magic.detect_from_content(data)
                mimetype: str = filemagic.mime_type
                charset: str = filemagic.encoding
            except ValueError:
                mimetype = 'application/octet-stream'
                charset = 'binary'
            headers['Content-Type'] = self._content_type_header(mimetype, charset)
        # Send all headers set by the pagelet
        for name, value in headers.items():
            self.send_header(name, value)
        # End the header section and write the body
        self.end_headers()
        self.wfile.write(data)

    def _serve_static(self, method: str, path: str, headers: Dict[str, str]) -> None:
        """
        Serve a static resource from the webroot directory.

        Replies with 405 for any method other than GET, with 404 if the path does not resolve to a regular file
        inside the webroot, and with 304 if the file is not newer than the client's If-Modified-Since date.

        :param method: The HTTP request method.
        :param path: The request path, resolved relative to the webroot.
        :param headers: The response headers collected so far.
        """
        # Only HTTP GET is allowed, else reply with a 'Method Not Allowed' error page
        if method != 'GET':
            # send_error() already ends the header section and writes the error body; calling end_headers() again
            # would append a stray blank line after the body
            self.send_error(405)
            return
        # Create the absolute resource path, resolving '..'
        filepath: str = os.path.abspath(os.path.join(self.server.webroot, path[1:]))
        # Make sure the file is actually inside the webroot directory and that it is a regular file.  Directories
        # are rejected here as well, since they cannot be opened and read below.
        inside_webroot = os.path.commonpath([filepath, self.server.webroot]) == self.server.webroot
        if not inside_webroot or not os.path.isfile(filepath):
            # File does not exist or path points outside the webroot directory
            self.send_response(404)
            self.end_headers()
            return

        # Parse the If-Modified-Since header to check whether the browser can reuse cached content
        epoch = 'Thu, 01 Jan 1970 00:00:00 GMT'
        datestr: str = self.headers.get('If-Modified-Since', epoch)
        try:
            maxage: datetime = datetime.strptime(datestr, '%a, %d %b %Y %H:%M:%S %Z')
        except ValueError:
            # An unparseable date is ignored, i.e. treated like an absent header
            maxage = datetime.strptime(epoch, '%a, %d %b %Y %H:%M:%S %Z')
        # Get file modification time
        filestat: int = int(os.path.getmtime(filepath))
        # Create UTC datetime object from mtime
        fileage: datetime = datetime.utcfromtimestamp(filestat)

        # If the file has not been replaced by a newer version than requested by the client, send a 304 response
        if fileage <= maxage:
            self.send_response(304, 'Not Modified')
            self.send_header('Content-Length', '0')
            self.end_headers()
            return

        # Open and read the file
        with open(filepath, 'rb') as f:
            data = f.read()
        # File read successfully, send 'OK' header
        self.send_response(200)
        # Guess the MIME type by file extension, or use libmagic as fallback
        # Use libmagic to guess the charset
        try:
            exttype: str = mimetypes.guess_type(filepath)[0]
            filemagic: magic.FileMagic = magic.detect_from_filename(filepath)
            mimetype: str = exttype if exttype is not None else filemagic.mime_type
            charset: str = filemagic.encoding
        except ValueError:
            mimetype = 'application/octet-stream'
            charset = 'binary'
        # Set content type and length header
        headers['Content-Type'] = self._content_type_header(mimetype, charset)
        headers['Content-Length'] = str(len(data))
        headers['Last-Modified'] = fileage.strftime('%a, %d %b %Y %H:%M:%S GMT')
        headers['Cache-Control'] = 'max-age=1'
        # Send all headers
        for name, value in headers.items():
            self.send_header(name, value)
        self.end_headers()
        # Send the requested resource as response body
        self.wfile.write(data)

    def _handle_request(self, method: str, path: str, args: RequestArguments):
        """
        Run the request through _handle and translate exceptions into HTTP error responses.

        :param method: The HTTP request method (GET, POST).
        :param path: The request path without GET arguments.
        :param args: Arguments sent with the request.
        """
        try:
            self._handle(method, path, args)
        # Special handling for some errors
        except HttpException as e:
            # Exactly one error response is sent per request
            self.send_error(e.status, e.title, e.message)
            if 500 <= e.status < 600:
                # Server-side errors are logged with full traceback
                self.server.logger.exception('', exc_info=e)
            else:
                self.server.logger.debug('', exc_info=e)
        except PermissionError as e:
            self.send_error(403, 'Forbidden')
            self.server.logger.debug('', exc_info=e)
        except ValueError as e:
            self.send_error(400, 'Bad Request')
            self.server.logger.debug('', exc_info=e)
        except BaseException as e:
            # Generic error handling
            self.send_error(500, 'Internal Server Error')
            # The exception must be passed as exc_info; positional arguments would be treated as %-format
            # arguments of the (empty) message
            self.server.logger.exception('', exc_info=e)

    # noinspection PyPep8Naming
    def do_GET(self) -> None:
        """
        Called by BaseHTTPRequestHandler for GET requests.
        """
        # Parse the request and hand it to the handle function
        try:
            path, args = parse_args(self.path)
            self._handle_request('GET', path, args)
        except ValueError as e:
            self.send_error(400, 'Bad Request')
            self.server.logger.debug('', exc_info=e)

    # noinspection PyPep8Naming
    def do_POST(self) -> None:
        """
        Called by BaseHTTPRequestHandler for POST requests.
        """
        try:
            # Read the declared length of the POST body; a missing header means an empty body
            clen: int = int(str(self.headers.get('Content-Length', failobj='0')))
            if clen < 0:
                # A negative length would make the read below block until the client closes the connection
                raise ValueError('Negative Content-Length')
            if clen > _MAX_POST:
                # Return a 413 error page if the request size exceeds boundaries
                self.send_error(413, 'Payload Too Large')
                self.server.logger.debug('', exc_info=HttpException(413, 'Payload Too Large'))
                return
            ctype: str = self.headers.get('Content-Type', failobj='application/octet-stream')
            post: bytes = self.rfile.read(clen)
            # Parse the request and hand it to the handle function
            path, args = parse_args(self.path, postbody=post, enctype=ctype)
            self._handle_request('POST', path, args)
        except ValueError as e:
            self.send_error(400, 'Bad Request')
            self.server.logger.debug('', exc_info=e)

    @property
    def session_vars(self) -> Dict[str, Any]:
        """
        Get the session variables for the current session.

        :return: Dictionary of named session variables.
        """
        return self.server.session_vars[self.session_id][1]

    def log_request(self, code='-', size='-'):
        """
        Log a handled request at INFO level through the server's logger.
        """
        self.server.logger.info('%s %s from %s', self.requestline, code, self.client_address)

    def log_error(self, fmt: str, *args):
        """
        Log an error at WARNING level through the server's logger, prefixed with request line and client address.
        """
        # The request line is client-controlled and may contain '%' (e.g. URL-encoded paths), so it must be passed
        # as a format argument rather than being embedded into the format string.  It may also not be set yet if the
        # error occurs before the request line was parsed.
        requestline = getattr(self, 'requestline', '')
        self.server.logger.warning('%s from %s: ' + fmt, requestline, self.client_address, *args)

    def log_message(self, fmt, *args):
        """
        Log an arbitrary message at INFO level through the server's logger.
        """
        self.server.logger.info(fmt, *args)
|