Implemented cron jobs
This commit is contained in:
parent
22d4bd2cd5
commit
2056b0fb81
6 changed files with 213 additions and 12 deletions
2
doc
2
doc
|
@ -1 +1 @@
|
|||
Subproject commit 0cf3d59c8b37f84e915f5e30e7447f0611cc1238
|
||||
Subproject commit cb222b6a7131c24b958740bd5974fca26b450654
|
|
@ -8,5 +8,5 @@ server will attempt to serve the request with a static resource in a previously
|
|||
|
||||
from .requestargs import RequestArgument, RequestArguments
|
||||
from .responses import PageletResponse, RedirectResponse, TemplateResponse
|
||||
from .httpd import MatematWebserver, HttpHandler, pagelet, pagelet_init
|
||||
from .httpd import MatematWebserver, HttpHandler, pagelet, pagelet_init, pagelet_cron
|
||||
from .config import parse_config_file
|
||||
|
|
|
@ -11,6 +11,7 @@ from http.server import HTTPServer, BaseHTTPRequestHandler
|
|||
from http.cookies import SimpleCookie
|
||||
from uuid import uuid4
|
||||
from datetime import datetime, timedelta
|
||||
from threading import Event, Timer, Thread
|
||||
|
||||
import jinja2
|
||||
|
||||
|
@ -41,6 +42,9 @@ _PAGELET_PATHS: Dict[str, Callable[[str, # HTTP method (GET, POST, ...)
|
|||
# The pagelet initialization functions, to be executed upon startup
|
||||
_PAGELET_INIT_FUNCTIONS: Set[Callable[[Dict[str, str], logging.Logger], None]] = set()
|
||||
|
||||
_PAGELET_CRON_STATIC_EVENT: Event = Event()
|
||||
_PAGELET_CRON_RUNNER: Callable[[Callable[[Dict[str, str], jinja2.Environment, logging.Logger], None]], None] = None
|
||||
|
||||
# Inactivity timeout for client sessions
|
||||
_SESSION_TIMEOUT: int = 3600
|
||||
_MAX_POST: int = 1_000_000
|
||||
|
@ -118,6 +122,90 @@ def pagelet_init(fun: Callable[[Dict[str, str], logging.Logger], None]):
|
|||
_PAGELET_INIT_FUNCTIONS.add(fun)
|
||||
|
||||
|
||||
class _GlobalEventTimer(Thread):
|
||||
"""
|
||||
A timer similar to threading.Timer, except that waits on an externally supplied threading.Event instance,
|
||||
therefore allowing all timers waiting on the same event to be cancelled at once.
|
||||
"""
|
||||
|
||||
def __init__(self, interval: float, event: Event, fun, *args, **kwargs):
|
||||
"""
|
||||
Create a new _GlobalEventTimer.
|
||||
:param interval: The delay after which to run the function.
|
||||
:param event: The external threading.Event to wait on.
|
||||
:param fun: The function to call.
|
||||
:param args: The positional arguments to pass to the function.
|
||||
:param kwargs: The keyword arguments to pass to the function.
|
||||
"""
|
||||
Thread.__init__(self)
|
||||
self.interval = interval
|
||||
self.fun = fun
|
||||
self.args = args if args is not None else []
|
||||
self.kwargs = kwargs if kwargs is not None else {}
|
||||
self.event = event
|
||||
|
||||
def run(self):
|
||||
self.event.wait(self.interval)
|
||||
if not self.event.is_set():
|
||||
self.fun(*self.args, **self.kwargs)
|
||||
# Do NOT call event.set(), as done in threading.Timer, as that would cancel all other timers
|
||||
|
||||
|
||||
def pagelet_cron(weeks: int = 0,
|
||||
days: int = 0,
|
||||
hours: int = 0,
|
||||
seconds: int = 0,
|
||||
minutes: int = 0,
|
||||
milliseconds: int = 0,
|
||||
microseconds: int = 0):
|
||||
"""
|
||||
Annotate a function to act as a pagelet cron function. The function will be called in a regular interval, defined
|
||||
by the arguments passed to the decorator, which are passed to a timedelta object.
|
||||
|
||||
The function must have the following signature:
|
||||
|
||||
(config: Dict[str, str], jinja_env: jinja2.Environment, logger: logging.Logger) -> None
|
||||
|
||||
config: The mutable dictionary of variables read from the [Pagelets] section of the configuration file.
|
||||
jinja_env: The Jinja2 environment used by the web server.
|
||||
logger: The server's logger instance.
|
||||
returns: Nothing.
|
||||
|
||||
:param weeks: Number of weeks in the interval.
|
||||
:param days: Number of days in the interval.
|
||||
:param hours: Number of hours in the interval.
|
||||
:param seconds: Number of seconds in the interval.
|
||||
:param minutes: Number of minutes in the interval.
|
||||
:param milliseconds: Number of milliseconds in the interval.
|
||||
:param microseconds: Number of microseconds in the interval.
|
||||
"""
|
||||
|
||||
def cron_wrapper(fun: Callable[[Dict[str, str], jinja2.Environment, logging.Logger], None]):
|
||||
# Create the timedelta object
|
||||
delta: timedelta = timedelta(weeks=weeks,
|
||||
days=days,
|
||||
hours=hours,
|
||||
seconds=seconds,
|
||||
minutes=minutes,
|
||||
milliseconds=milliseconds,
|
||||
microseconds=microseconds)
|
||||
|
||||
# This function is called once in the specified interval
|
||||
def cron():
|
||||
# Set a new timer
|
||||
t: Timer = _GlobalEventTimer(delta.total_seconds(), _PAGELET_CRON_STATIC_EVENT, cron)
|
||||
t.start()
|
||||
# Have the cron job be picked up by the cron runner provided by the web server
|
||||
if _PAGELET_CRON_RUNNER is not None:
|
||||
_PAGELET_CRON_RUNNER(fun)
|
||||
|
||||
# Set a timer to run the cron job after the specified interval
|
||||
timer: Timer = _GlobalEventTimer(delta.total_seconds(), _PAGELET_CRON_STATIC_EVENT, cron)
|
||||
timer.start()
|
||||
|
||||
return cron_wrapper
|
||||
|
||||
|
||||
class MatematHTTPServer(HTTPServer):
|
||||
"""
|
||||
A http.server.HTTPServer subclass that acts as a container for data that must be persistent between requests.
|
||||
|
@ -212,17 +300,29 @@ class MatematWebserver(object):
|
|||
running. If any exception is raised in the initialization phase, the program is terminated with a non-zero
|
||||
exit code.
|
||||
"""
|
||||
global _PAGELET_CRON_RUNNER
|
||||
try:
|
||||
# Run all pagelet initialization functions
|
||||
for fun in _PAGELET_INIT_FUNCTIONS:
|
||||
fun(self._httpd.pagelet_variables, self._httpd.logger)
|
||||
except BaseException as e:
|
||||
# If an error occurs, log it and terminate
|
||||
self._httpd.logger.exception(e)
|
||||
self._httpd.logger.critical('An initialization pagelet raised an error. Stopping.')
|
||||
raise e
|
||||
# If pagelet initialization went fine, start the HTTP server
|
||||
self._httpd.serve_forever()
|
||||
try:
|
||||
# Run all pagelet initialization functions
|
||||
for fun in _PAGELET_INIT_FUNCTIONS:
|
||||
fun(self._httpd.pagelet_variables, self._httpd.logger)
|
||||
# Set pagelet cron runner to self
|
||||
_PAGELET_CRON_RUNNER = self._cron_runner
|
||||
except BaseException as e:
|
||||
# If an error occurs, log it and terminate
|
||||
self._httpd.logger.exception(e)
|
||||
self._httpd.logger.critical('An initialization pagelet raised an error. Stopping.')
|
||||
raise e
|
||||
# If pagelet initialization went fine, start the HTTP server
|
||||
self._httpd.serve_forever()
|
||||
finally:
|
||||
# Cancel all cron timers at once when the webserver is shutting down
|
||||
_PAGELET_CRON_STATIC_EVENT.set()
|
||||
|
||||
def _cron_runner(self, fun: Callable[[Dict[str, str], jinja2.Environment, logging.Logger], None]):
|
||||
fun(self._httpd.pagelet_variables,
|
||||
self._httpd.jinja_env,
|
||||
self._httpd.logger)
|
||||
|
||||
|
||||
class HttpHandler(BaseHTTPRequestHandler):
|
||||
|
|
0
matemat/webserver/pagelets/cron.py
Normal file
0
matemat/webserver/pagelets/cron.py
Normal file
36
matemat/webserver/pagelets/receipt.py
Normal file
36
matemat/webserver/pagelets/receipt.py
Normal file
|
@ -0,0 +1,36 @@
|
|||
from typing import Any, Dict, List, Union
|
||||
|
||||
from matemat.webserver import pagelet, RequestArguments, PageletResponse, RedirectResponse, TemplateResponse
|
||||
from matemat.db import MatematDatabase
|
||||
from matemat.db.primitives import Receipt
|
||||
from matemat.util.currency_format import format_chf
|
||||
|
||||
|
||||
@pagelet('/receipt')
|
||||
def show_receipt(method: str,
|
||||
path: str,
|
||||
args: RequestArguments,
|
||||
session_vars: Dict[str, Any],
|
||||
headers: Dict[str, str],
|
||||
config: Dict[str, str]) \
|
||||
-> Union[str, bytes, PageletResponse]:
|
||||
if 'authenticated_user' not in session_vars:
|
||||
return RedirectResponse('/')
|
||||
# Connect to the database
|
||||
with MatematDatabase(config['DatabaseFile']) as db:
|
||||
# Fetch the authenticated user from the database
|
||||
uid: int = session_vars['authenticated_user']
|
||||
user = db.get_user(uid)
|
||||
receipt: Receipt = db.create_receipt(user)
|
||||
headers['Content-Type'] = 'text/plain; charset=utf-8'
|
||||
fdate: str = receipt.from_date.strftime('%d.%m.%Y, %H:%M')
|
||||
tdate: str = receipt.to_date.strftime('%d.%m.%Y, %H:%M')
|
||||
username: str = receipt.user.name.rjust(40)
|
||||
if len(receipt.transactions) == 0:
|
||||
fbal: str = format_chf(receipt.user.balance).rjust(12)
|
||||
else:
|
||||
fbal = format_chf(receipt.transactions[0].old_balance).rjust(12)
|
||||
tbal: str = format_chf(receipt.user.balance).rjust(12)
|
||||
return TemplateResponse('receipt.txt',
|
||||
fdate=fdate, tdate=tdate, user=username, fbal=fbal, tbal=tbal,
|
||||
transactions=receipt.transactions, instance_name=config['InstanceName'])
|
65
matemat/webserver/test/test_pagelet_cron.py
Normal file
65
matemat/webserver/test/test_pagelet_cron.py
Normal file
|
@ -0,0 +1,65 @@
|
|||
from typing import Dict
|
||||
|
||||
import unittest
|
||||
|
||||
import logging
|
||||
from threading import Lock, Thread, Timer
|
||||
from time import sleep
|
||||
import jinja2
|
||||
|
||||
from matemat.webserver import MatematWebserver, pagelet_cron
|
||||
|
||||
lock: Lock = Lock()
|
||||
|
||||
cron1called: int = 0
|
||||
cron2called: int = 0
|
||||
|
||||
|
||||
@pagelet_cron(seconds=4)
|
||||
def cron1(config: Dict[str, str],
|
||||
jinja_env: jinja2.Environment,
|
||||
logger: logging.Logger) -> None:
|
||||
global cron1called
|
||||
with lock:
|
||||
cron1called += 1
|
||||
|
||||
|
||||
@pagelet_cron(seconds=3)
|
||||
def cron2(config: Dict[str, str],
|
||||
jinja_env: jinja2.Environment,
|
||||
logger: logging.Logger) -> None:
|
||||
global cron2called
|
||||
with lock:
|
||||
cron2called += 1
|
||||
|
||||
|
||||
class TestPageletCron(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.srv = MatematWebserver('::1', 0, '/nonexistent', '/nonexistent', {}, {},
|
||||
logging.NOTSET, logging.NullHandler())
|
||||
self.srv_port = int(self.srv._httpd.socket.getsockname()[1])
|
||||
self.timer = Timer(10.0, self.srv._httpd.shutdown)
|
||||
self.timer.start()
|
||||
|
||||
def tearDown(self):
|
||||
self.timer.cancel()
|
||||
if self.srv is not None:
|
||||
self.srv._httpd.socket.close()
|
||||
|
||||
def test_cron(self):
|
||||
"""
|
||||
Test that the cron functions are called properly.
|
||||
"""
|
||||
thread = Thread(target=self.srv.start)
|
||||
thread.start()
|
||||
sleep(12)
|
||||
self.srv._httpd.shutdown()
|
||||
with lock:
|
||||
self.assertEqual(2, cron1called)
|
||||
self.assertEqual(3, cron2called)
|
||||
# Make sure the cron threads were stopped
|
||||
sleep(5)
|
||||
with lock:
|
||||
self.assertEqual(2, cron1called)
|
||||
self.assertEqual(3, cron2called)
|
Loading…
Reference in a new issue