From bc024ea82d682c44fc81c85f1dff9b83099c1d4d Mon Sep 17 00:00:00 2001
From: s3lph
Date: Tue, 20 Aug 2019 15:53:51 +0200
Subject: [PATCH 1/5] WIP: Implement regular scraping of the calendar sources
 in the background, instead of blocking in the foreground process

---
 README.md                             |  4 +--
 icalendar-timeseries-server.json      |  9 +++--
 icalendar_timeseries_server/api.py    | 18 +++-------
 icalendar_timeseries_server/cal.py    | 48 ++++++++++++++++-----------
 icalendar_timeseries_server/config.py | 10 +++---
 icalendar_timeseries_server/event.py  |  2 +-
 icalendar_timeseries_server/main.py   |  6 +++-
 7 files changed, 52 insertions(+), 45 deletions(-)

diff --git a/README.md b/README.md
index 8bb43fb..41bb59c 100644
--- a/README.md
+++ b/README.md
@@ -92,7 +92,6 @@ Configuration is done through a JSON config file:
   "port": 8090,
   "start_delta": "-PT3H",
   "end_delta": "P30D",
-  "cache": "PT15M",
   "tz": "Europe/Zurich",
   "calendars": {
     "private": {
@@ -104,6 +103,7 @@ Configuration is done through a JSON config file:
       }
     },
     "public": {
+      "interval": "P1D",
       "url": "https://example.cloud/dav/me/public.ics"
     },
     "confidential": {
@@ -136,11 +136,11 @@ Configuration is done through a JSON config file:
 | `port` | int | The port to listen on. |
 | `start_delta` | string | A signed ISO 8601 duration string, describing the event range start offset relative to the current time. |
 | `end_delta` | string | An unsigned ISO 8601 duration string, describing the event range end offset relative to the current time. |
-| `cache` | string | An unsigned ISO 8601 duration string, describing the cache timeout duration. |
 | `tz` | string | The local timezone. |
 | `calendars` | dict | The calendars to scrape. |
 | `keys(calendars)` | string | Name of the calendar. |
 | `calendars.*.url` | string | The HTTP or HTTPS URL to scrape. |
+| `calendars.*.interval` | string | An unsigned ISO 8601 duration string, describing the scrape interval for this calendar. |
 | `calendars.*.ca` | string | Path to the CA certificate file to validate the server's TLS certificate against, in PEM format (optional). |
 | `calendars.*.auth` | dict | Authorization config for the calendar. |
 | `calendars.*.auth[].type` | string | Authorization type, one of `none` (no authorization), `basic` (HTTP Basic Authentication), `tls` (TLS client certificate). |
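For illustration only, not part of the patch set: the new `calendars.*.interval` option, like `start_delta` and `end_delta`, is a plain ISO 8601 duration string, and the `isodate` package the project already depends on can turn it into seconds for a `threading.Timer`. The helper name `interval_seconds` below is made up for this sketch.

```python
# Sketch: converting ISO 8601 duration strings (e.g. "PT15M", "P1D") into
# seconds. isodate.parse_duration() returns a datetime.timedelta, or an
# isodate.Duration when calendar units (years/months) need an anchor date.
from datetime import datetime

import isodate


def interval_seconds(iso_duration: str, now: datetime) -> float:
    parsed = isodate.parse_duration(iso_duration)
    if isinstance(parsed, isodate.Duration):
        parsed = parsed.totimedelta(start=now)
    return parsed.total_seconds()


if __name__ == '__main__':
    now = datetime.now()
    print(interval_seconds('PT15M', now))  # 900.0
    print(interval_seconds('P1D', now))    # 86400.0
```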
diff --git a/icalendar-timeseries-server.json b/icalendar-timeseries-server.json
index d24809c..847ee7d 100644
--- a/icalendar-timeseries-server.json
+++ b/icalendar-timeseries-server.json
@@ -7,23 +7,26 @@
   "tz": "Europe/Zurich",
   "calendars": {
     "tlstest": {
+      "interval": "PT5M",
       "url": "https://localhost/private.ics",
       "ca": "/home/sebastian/tlstest/ca/ca/ca.crt",
       "auth": {
         "type": "tls",
         "keyfile": "/home/sebastian/tlstest/client/combined.pem"
       }
+    },
+    "filetest": {
+      "interval": "PT1M",
+      "url": "file:///srv/http/private.ics"
     }
   },
   "key_replace": {
     "summary": "a_summary",
-    "description": "b_description",
-    "calendar": "c_calendar"
+    "description": "b_description"
   },
   "value_replace": {
     "summary": "{{ summary|truncate(100, end=' \\N{HORIZONTAL ELLIPSIS}') }}",
     "description": "{{ description|truncate(100, end=' \\N{HORIZONTAL ELLIPSIS}') }}",
-    "calendar": "{{ 0 if calendar == 'private' else 1 }}",
     "useless_metric": "{{ start.timestamp() + end.timestamp() }}"
   }
 }
diff --git a/icalendar_timeseries_server/api.py b/icalendar_timeseries_server/api.py
index 0132574..5674f0e 100644
--- a/icalendar_timeseries_server/api.py
+++ b/icalendar_timeseries_server/api.py
@@ -1,29 +1,21 @@
 from typing import List
 
 import json
-from datetime import datetime
 from urllib.error import HTTPError
 import traceback
 
 import bottle
-from isodate import Duration
 
 from icalendar_timeseries_server.config import get_config
-from icalendar_timeseries_server.event import Event
-from icalendar_timeseries_server.cal import scrape_calendar
+from icalendar_timeseries_server.event import Metric
+from icalendar_timeseries_server.cal import get_calendar
 from icalendar_timeseries_server.query import MetricQuery
 
 
 @bottle.route('/api/v1/query')
 @bottle.route('/api/v1/query_range')
 def prometheus_api():
-    tz = get_config().tz
-    now: datetime = datetime.now(tz)
-    start_delta: Duration = get_config().start_delta
-    end_delta: Duration = get_config().end_delta
-    start: datetime = now + start_delta
-    end: datetime = now + end_delta
-    events: List[Event] = []
+    events: List[Metric] = []
 
     try:
         q = MetricQuery(bottle.request.query['query'])
@@ -39,8 +31,8 @@ def prometheus_api():
         return json.dumps(response)
 
     try:
-        for name, caldef in get_config().calendars.items():
-            events.extend(scrape_calendar(name, caldef, start, end))
+        for name in get_config().calendars.keys():
+            events.extend(get_calendar(name))
         events = list(filter(q, events))
         events.sort(key=lambda e: e.start)
         response = {
diff --git a/icalendar_timeseries_server/cal.py b/icalendar_timeseries_server/cal.py
index c17dd77..76838e3 100644
--- a/icalendar_timeseries_server/cal.py
+++ b/icalendar_timeseries_server/cal.py
@@ -1,18 +1,20 @@
-from typing import Dict, List, Iterable, Tuple
+from typing import Dict, List, Iterable
 
 import sys
 import urllib.request
 from datetime import datetime, date, timedelta
+from threading import Timer
 
 from dateutil import rrule
 from icalendar import cal
+from isodate import Duration
 
 from icalendar_timeseries_server import __version__
 from icalendar_timeseries_server.config import get_config, CalendarConfig
 from icalendar_timeseries_server.event import Event
 
 
-_SCRAPE_CACHE: Dict[str, Tuple[datetime, List[Event]]] = dict()
+_SCRAPE_CACHE: Dict[str, List[Event]] = dict()
 
 __py_version: str = f'{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}'
 USER_AGENT: str = f'icalendar-timeseries-server/{__version__} (Python/{__py_version})'
@@ -46,8 +48,15 @@ def _parse_recurring(event: cal.Event, start: datetime, end: datetime, duration:
     return occurences
 
 
-def _parse_calendar(name: str, calendar: cal.Calendar, start: datetime, end: datetime) -> List[Event]:
+def _scrape_calendar(name: str, config: CalendarConfig, start: datetime, end: datetime):
+    global _SCRAPE_CACHE
     events = []
+
+    opener: urllib.request.OpenerDirector = config.get_url_opener()
+    with opener.open(config.url) as response:
+        data = response.read().decode('utf-8')
+    calendar = cal.Calendar.from_ical(data)
+
     for element in calendar.walk():
         if element.name == "VEVENT":
             dtstart = element.get('dtstart').dt
@@ -70,23 +79,22 @@ def _parse_calendar(name: str, calendar: cal.Calendar, start: datetime, end: dat
             for occurence in occurences:
                 if start <= occurence < end:
                     events.append(Event(name, element, occurence, occurence + duration))
-    return events
+    _SCRAPE_CACHE[name] = events
 
 
-def scrape_calendar(name: str, config: CalendarConfig, start: datetime, end: datetime) -> List[Event]:
+def scrape_calendar(name: str, config: CalendarConfig):
+    tz = get_config().tz
+    now: datetime = datetime.now(tz)
+    start_delta: Duration = get_config().start_delta
+    end_delta: Duration = get_config().end_delta
+    start: datetime = now + start_delta
+    end: datetime = now + end_delta
+    _scrape_calendar(name, config, start, end)
+    cron = Timer(config.interval.totimedelta(start=now).total_seconds(),
+                 lambda: scrape_calendar(name, config))
+    cron.start()
+
+
+def get_calendar(name: str):
     global _SCRAPE_CACHE
-    now: datetime = datetime.now(tz=get_config().tz)
-    if get_config().cache.total_seconds() > 0 and name in _SCRAPE_CACHE:
-        cache_timeout, cached = _SCRAPE_CACHE[name]
-        if now < cache_timeout:
-            print('serving cached')
-            return cached
-    print('doing request')
-
-    opener: urllib.request.OpenerDirector = config.get_url_opener()
-    with opener.open(config.url) as response:
-        data = response.read().decode('utf-8')
-    calendar = cal.Calendar.from_ical(data)
-    parsed: List[Event] = _parse_calendar(name, calendar, start, end)
-    _SCRAPE_CACHE[name] = now + get_config().cache, parsed
-    return parsed
+    return _SCRAPE_CACHE.get(name, [])
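For illustration only, not part of the patch set: the new `scrape_calendar` above re-arms a `threading.Timer` after each run, so every calendar keeps being fetched in the background at its own interval. A condensed sketch of that pattern with placeholder names (`fetch`, `INTERVAL_SECONDS`):

```python
# Sketch of the self-rescheduling Timer pattern used by scrape_calendar above.
# fetch() and INTERVAL_SECONDS are placeholders, not project names.
from threading import Timer

INTERVAL_SECONDS = 300.0


def fetch() -> None:
    print('scraping calendar ...')


def scrape_periodically() -> None:
    fetch()
    # Schedule the next run; each Timer runs its callback on a new thread.
    Timer(INTERVAL_SECONDS, scrape_periodically).start()


# Runs until interrupted: Timer threads are not daemonic by default.
scrape_periodically()
```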
diff --git a/icalendar_timeseries_server/config.py b/icalendar_timeseries_server/config.py
index 5ce34ea..bfbde2c 100644
--- a/icalendar_timeseries_server/config.py
+++ b/icalendar_timeseries_server/config.py
@@ -27,6 +27,7 @@ class CalendarConfig:
 
     def __init__(self, config: Dict[str, Any], config_path: str) -> None:
         self._url: str = _keycheck('url', config, str, config_path)
+        self._scrape_interval: Duration = _parse_timedelta('interval', config, config_path, default_value='PT15M')
         self._ca: Optional[str] = _keycheck('ca', config, str, config_path, optional=True)
         auth: Dict[str, Any] = _keycheck('auth', config, dict, config_path, default_value={'type': 'none'})
         self._authtype: str = _keycheck('type', auth, str, f'{config_path}.auth',
@@ -56,6 +57,10 @@ class CalendarConfig:
     def url(self) -> str:
         return self._url
 
+    @property
+    def interval(self) -> Duration:
+        return self._scrape_interval
+
     def get_url_opener(self) -> urllib.request.OpenerDirector:
 
         if self._authtype == 'tls':
@@ -89,7 +94,6 @@ class Config:
         self._tz: pytz.tzinfo = _parse_timezone('tz', config, '', default_value='UTC')
         self._start_delta: Duration = _parse_timedelta('start_delta', config, '', default_value='PT')
         self._end_delta: Duration = _parse_timedelta('end_delta', config, '', default_value='P30D')
-        self._cache: Duration = _parse_timedelta('cache', config, '', default_value='PT', force_positive=True)
         self._calendars: Dict[str, CalendarConfig] = self._parse_calendars_config('calendars', config, '')
         self._key_replace = _parse_key_replace('key_replace', config, '')
         self._value_replace = _parse_value_replace('value_replace', config, '')
@@ -125,10 +129,6 @@ class Config:
     def end_delta(self) -> Duration:
         return self._end_delta
 
-    @property
-    def cache(self) -> Duration:
-        return self._cache
-
     @property
     def calendars(self) -> Dict[str, CalendarConfig]:
         return self._calendars
diff --git a/icalendar_timeseries_server/event.py b/icalendar_timeseries_server/event.py
index dff8a9e..934b4f7 100644
--- a/icalendar_timeseries_server/event.py
+++ b/icalendar_timeseries_server/event.py
@@ -38,7 +38,7 @@ class Event(Metric):
         for attr in _ATTRIBUTES:
             tmp[attr] = event.get(attr, '')
         substitution_keys = set(_ATTRIBUTES)
-        substitution_keys.update(['start', 'end'])
+        substitution_keys.update(tmp.keys())
         substitution_keys.update(get_config().key_replace.keys())
         substitution_keys.update(get_config().value_replace.keys())
         for attr in substitution_keys:
diff --git a/icalendar_timeseries_server/main.py b/icalendar_timeseries_server/main.py
index cf549cb..2456a2f 100644
--- a/icalendar_timeseries_server/main.py
+++ b/icalendar_timeseries_server/main.py
@@ -2,6 +2,7 @@ import sys
 
 import bottle
 
+from icalendar_timeseries_server.cal import scrape_calendar
 from icalendar_timeseries_server.config import load_config, load_default_config, get_config
 
 # Contains decorated bottle handler function for /api/v1/query
@@ -17,7 +18,10 @@ def main():
     else:
         print(f'Can only read one config file, got "{" ".join(sys.argv[1:])}"')
         exit(1)
-    bottle.run(host=get_config().addr, port=get_config().port)
+    config = get_config()
+    for calname in config.calendars.keys():
+        scrape_calendar(calname, config.calendars[calname])
+    bottle.run(host=config.addr, port=get_config().port)
 
 
 if __name__ == '__main__':

From ffc720cabe61070b323518a290b5550661b83ad8 Mon Sep 17 00:00:00 2001
From: s3lph
Date: Wed, 21 Aug 2019 13:40:16 +0200
Subject: [PATCH 2/5] Update unit tests to work with per-calendar scrape
 intervals.

---
 icalendar_timeseries_server/cal.py | 10 ++++---
 .../test/test_config.py            | 26 ++++++++++++++++---
 2 files changed, 30 insertions(+), 6 deletions(-)

diff --git a/icalendar_timeseries_server/cal.py b/icalendar_timeseries_server/cal.py
index 76838e3..accff89 100644
--- a/icalendar_timeseries_server/cal.py
+++ b/icalendar_timeseries_server/cal.py
@@ -83,16 +83,20 @@ def _scrape_calendar(name: str, config: CalendarConfig, start: datetime, end: da
 
 
 def scrape_calendar(name: str, config: CalendarConfig):
+    # Get current time in configured timezone
     tz = get_config().tz
     now: datetime = datetime.now(tz)
+    # Reschedule calendar scraping
+    cron = Timer(config.interval.totimedelta(start=now).total_seconds(),
+                 lambda: scrape_calendar(name, config))
+    cron.start()
+    # Compute interval for which to return events
    start_delta: Duration = get_config().start_delta
     end_delta: Duration = get_config().end_delta
     start: datetime = now + start_delta
     end: datetime = now + end_delta
+    # Scrape and parse the calendar
     _scrape_calendar(name, config, start, end)
-    cron = Timer(config.interval.totimedelta(start=now).total_seconds(),
-                 lambda: scrape_calendar(name, config))
-    cron.start()
 
 
 def get_calendar(name: str):
diff --git a/icalendar_timeseries_server/test/test_config.py b/icalendar_timeseries_server/test/test_config.py
index 3543e0d..c0f7601 100644
--- a/icalendar_timeseries_server/test/test_config.py
+++ b/icalendar_timeseries_server/test/test_config.py
@@ -16,7 +16,6 @@ _CONFIG_VALID = """
   "port": 8090,
   "start_delta": "-PT3H",
   "end_delta": "P30D",
-  "cache": "PT15M",
   "tz": "Europe/Zurich",
   "calendars": {
     "private": {
@@ -28,10 +27,12 @@
       }
     },
     "public": {
-      "url": "https://example.cloud/dav/me/public.ics"
+      "url": "https://example.cloud/dav/me/public.ics",
+      "interval": "P1D"
     },
     "confidential": {
       "url": "https://example.cloud/dav/me/confidential.ics",
+      "interval": "PT5M",
       "ca": "/etc/ssl/ca.pem",
       "auth": {
         "type": "tls",
@@ -124,5 +125,24 @@ class ConfigTest(unittest.TestCase):
         self.assertEqual(config.port, 8090)
         self.assertEqual(config.start_delta, Duration(hours=-3))
         self.assertEqual(config.end_delta, Duration(days=30))
-        self.assertEqual(config.cache, Duration(minutes=15))
         self.assertEqual(config.tz, pytz.timezone('Europe/Zurich'))
+
+    def test_parse_calendars(self):
+        config = Config(json.loads(_CONFIG_VALID))
+        self.assertEqual({'public', 'private', 'confidential'}, config.calendars.keys())
+
+        self.assertEqual('https://example.cloud/dav/me/public.ics', config.calendars['public'].url)
+        self.assertEqual(Duration(days=1), config.calendars['public'].interval)
+        self.assertEqual('none', config.calendars['public']._authtype)
+
+        self.assertEqual('https://example.cloud/dav/me/private.ics', config.calendars['private'].url)
+        self.assertEqual(Duration(minutes=15), config.calendars['private'].interval)
+        self.assertEqual('basic', config.calendars['private']._authtype)
+        self.assertEqual('Basic bWU6bXlzdXBlcnNlY3VyZXBhc3N3b3Jk',
+                         config.calendars['private']._request_headers['Authorization'])
+
+        self.assertEqual('https://example.cloud/dav/me/confidential.ics', config.calendars['confidential'].url)
+        self.assertEqual(Duration(minutes=5), config.calendars['confidential'].interval)
+        self.assertEqual('tls', config.calendars['confidential']._authtype)
+        self.assertEqual('/etc/ssl/client.pem', config.calendars['confidential']._tls_keyfile)
+        self.assertEqual('mysupersecurepassword', config.calendars['confidential']._tls_passphrase)
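For illustration only, not part of the patch set: besides updating the tests, the cal.py hunk above re-arms the next `Timer` before fetching, which means a failed or slow scrape cannot push back or cancel later runs. A tiny standalone demonstration of that property, with a made-up name (`flaky_fetch`):

```python
# Demo: the next Timer is armed before any work is done, so an exception in
# one run does not cancel the following runs.
from threading import Timer


def flaky_fetch(run: int) -> None:
    if run < 4:
        Timer(1.0, flaky_fetch, args=(run + 1,)).start()
    if run == 2:
        raise RuntimeError('scrape failed')  # raised in a Timer thread
    print(f'run {run} succeeded')


flaky_fetch(1)  # runs 1, 3 and 4 succeed; run 2 only prints a traceback
```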

From 8d3e28a11d8152fa2d7182fa2a73c8b731ff346f Mon Sep 17 00:00:00 2001
From: s3lph
Date: Wed, 21 Aug 2019 13:51:43 +0200
Subject: [PATCH 3/5] Move first scrape from foreground to background

---
 icalendar_timeseries_server/cal.py  | 9 +++++++++
 icalendar_timeseries_server/main.py | 6 ++++--
 2 files changed, 13 insertions(+), 2 deletions(-)

diff --git a/icalendar_timeseries_server/cal.py b/icalendar_timeseries_server/cal.py
index accff89..fb5269d 100644
--- a/icalendar_timeseries_server/cal.py
+++ b/icalendar_timeseries_server/cal.py
@@ -99,6 +99,15 @@ def scrape_calendar(name: str, config: CalendarConfig):
     _scrape_calendar(name, config, start, end)
 
 
+def start_scrape_calendar(name: str, config: CalendarConfig):
+    # Get current time in configured timezone
+    tz = get_config().tz
+    now: datetime = datetime.now(tz)
+    # Schedule first calendar scraping
+    cron = Timer(0, lambda: scrape_calendar(name, config))
+    cron.start()
+
+
 def get_calendar(name: str):
     global _SCRAPE_CACHE
     return _SCRAPE_CACHE.get(name, [])
diff --git a/icalendar_timeseries_server/main.py b/icalendar_timeseries_server/main.py
index 2456a2f..a1168f0 100644
--- a/icalendar_timeseries_server/main.py
+++ b/icalendar_timeseries_server/main.py
@@ -2,7 +2,7 @@ import sys
 
 import bottle
 
-from icalendar_timeseries_server.cal import scrape_calendar
+from icalendar_timeseries_server.cal import start_scrape_calendar
 from icalendar_timeseries_server.config import load_config, load_default_config, get_config
 
 # Contains decorated bottle handler function for /api/v1/query
@@ -19,8 +19,10 @@ def main():
         print(f'Can only read one config file, got "{" ".join(sys.argv[1:])}"')
         exit(1)
     config = get_config()
+    # Schedule calendar scraping in the background
     for calname in config.calendars.keys():
-        scrape_calendar(calname, config.calendars[calname])
+        start_scrape_calendar(calname, config.calendars[calname])
+    # Start the Bottle HTTP server
     bottle.run(host=config.addr, port=get_config().port)
 
 
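For illustration only, not part of the patch set: `Timer(0, ...)` fires its callback almost immediately, but on a worker thread, so `main()` no longer blocks on the first fetch before `bottle.run()` can start. A tiny demo of that behaviour:

```python
# Demo: Timer(0, ...) runs the callback on a background thread right away,
# so the caller is not blocked by slow work such as a network fetch.
import time
from threading import Timer


def first_scrape() -> None:
    time.sleep(0.5)  # stands in for the HTTP request
    print('first scrape done')


Timer(0, first_scrape).start()
print('HTTP server could start here without waiting')
```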

From 4cc8adc7b924d101eec1767235a7b613dcbe3336 Mon Sep 17 00:00:00 2001
From: s3lph
Date: Wed, 21 Aug 2019 13:52:25 +0200
Subject: [PATCH 4/5] Secure all critical sections using _SCRAPE_CACHE with a
 lock.

---
 icalendar_timeseries_server/cal.py | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/icalendar_timeseries_server/cal.py b/icalendar_timeseries_server/cal.py
index fb5269d..0e582ec 100644
--- a/icalendar_timeseries_server/cal.py
+++ b/icalendar_timeseries_server/cal.py
@@ -3,7 +3,7 @@ from typing import Dict, List, Iterable
 import sys
 import urllib.request
 from datetime import datetime, date, timedelta
-from threading import Timer
+from threading import Lock, Timer
 
 from dateutil import rrule
 from icalendar import cal
@@ -15,6 +15,7 @@ from icalendar_timeseries_server.event import Event
 
 
 _SCRAPE_CACHE: Dict[str, List[Event]] = dict()
+_SCRAPE_CACHE_LOCK: Lock = Lock()
 
 __py_version: str = f'{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}'
 USER_AGENT: str = f'icalendar-timeseries-server/{__version__} (Python/{__py_version})'
@@ -49,7 +50,7 @@ def _parse_recurring(event: cal.Event, start: datetime, end: datetime, duration:
 
 
 def _scrape_calendar(name: str, config: CalendarConfig, start: datetime, end: datetime):
-    global _SCRAPE_CACHE
+    global _SCRAPE_CACHE, _SCRAPE_CACHE_LOCK
     events = []
 
     opener: urllib.request.OpenerDirector = config.get_url_opener()
@@ -79,7 +80,8 @@ def _scrape_calendar(name: str, config: CalendarConfig, start: datetime, end: da
             for occurence in occurences:
                 if start <= occurence < end:
                     events.append(Event(name, element, occurence, occurence + duration))
-    _SCRAPE_CACHE[name] = events
+    with _SCRAPE_CACHE_LOCK:
+        _SCRAPE_CACHE[name] = events
 
 
 def scrape_calendar(name: str, config: CalendarConfig):
@@ -110,4 +112,5 @@ def start_scrape_calendar(name: str, config: CalendarConfig):
 
 def get_calendar(name: str):
     global _SCRAPE_CACHE
-    return _SCRAPE_CACHE.get(name, [])
+    with _SCRAPE_CACHE_LOCK:
+        return _SCRAPE_CACHE.get(name, [])

From 96064eec1220f9ccfab777fb8ad4ab90f7929592 Mon Sep 17 00:00:00 2001
From: s3lph
Date: Wed, 21 Aug 2019 13:57:38 +0200
Subject: [PATCH 5/5] Remove unnecessary code for initial calendar scrape.

---
 icalendar_timeseries_server/cal.py | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/icalendar_timeseries_server/cal.py b/icalendar_timeseries_server/cal.py
index 0e582ec..a2e2200 100644
--- a/icalendar_timeseries_server/cal.py
+++ b/icalendar_timeseries_server/cal.py
@@ -102,9 +102,6 @@ def scrape_calendar(name: str, config: CalendarConfig):
 
 
 def start_scrape_calendar(name: str, config: CalendarConfig):
-    # Get current time in configured timezone
-    tz = get_config().tz
-    now: datetime = datetime.now(tz)
     # Schedule first calendar scraping
     cron = Timer(0, lambda: scrape_calendar(name, config))
     cron.start()
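For illustration only, not part of the patch set: the pattern introduced in PATCH 4/5 is a single module-level `Lock` guarding every read and write of the shared cache, so background scraper threads and Bottle request handlers never interleave on the dict. Reduced to its essentials, with illustrative names:

```python
# Sketch of the guarded-cache pattern from PATCH 4/5 (names are illustrative).
from threading import Lock
from typing import Dict, List

_cache: Dict[str, List[str]] = {}
_cache_lock = Lock()


def publish(name: str, items: List[str]) -> None:
    # Writers (background scraper threads) replace the cached list under the lock.
    with _cache_lock:
        _cache[name] = items


def lookup(name: str) -> List[str]:
    # Readers (HTTP request handlers) get the latest published list, or [].
    with _cache_lock:
        return _cache.get(name, [])
```

Holding the lock only around the dict access, and not around scraping or parsing, keeps request handlers from ever waiting on the network.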