icalendar-timeseries-server/icalendar_timeseries_server/cal.py

113 lines
4.2 KiB
Python

from typing import Dict, List, Iterable
import sys
import urllib.request
from datetime import datetime, date, timedelta
from threading import Lock, Timer
from dateutil import rrule
from icalendar import cal
from isodate import Duration
from icalendar_timeseries_server import __version__
from icalendar_timeseries_server.config import get_config, CalendarConfig
from icalendar_timeseries_server.event import Event
# Most recent scrape result per calendar name.  Written by the scraper and
# read by get_calendar(); every access goes through _SCRAPE_CACHE_LOCK.
_SCRAPE_CACHE: Dict[str, List[Event]] = {}
_SCRAPE_CACHE_LOCK: Lock = Lock()

# Interpreter version as "major.minor.micro"; only used to build USER_AGENT.
__py_version: str = '.'.join(str(part) for part in sys.version_info[:3])
# NOTE(review): not referenced in this module -- presumably sent as the HTTP
# User-Agent header by code elsewhere in the package; confirm.
USER_AGENT: str = f'icalendar-timeseries-server/{__version__} (Python/{__py_version})'
def _parse_recurring(event: cal.Event, start: datetime, end: datetime, duration: timedelta) -> List[datetime]:
    """Expand a recurring event into the start times relevant to a time window.

    The RRULE, RDATE, EXRULE and EXDATE properties are extracted from the
    serialized event and handed to ``dateutil.rrule.rrulestr``.

    :param event: The VEVENT component to expand.
    :param start: Beginning of the window of interest.
    :param end: End of the window of interest.
    :param duration: Length of one occurrence.  An occurrence that began
        before ``start`` is still returned as long as
        ``occurrence + duration >= start``.
    :return: Start timestamps of the matching occurrences, in the order the
        recurrence generator yields them.  Timestamps that carry no timezone
        (all-day or floating-time events) get ``start.tzinfo`` attached.
    """
    occurrences: List[datetime] = []
    window_tz = start.tzinfo

    evstart = event.get('dtstart').dt
    # All-day events carry a plain ``date``, which cannot be compared with a
    # ``datetime``.  Promote it to (naive) midnight; the timezone is attached
    # to the generated occurrences below, so that date/floating UNTIL values
    # in the rule still match the type of DTSTART.
    if not isinstance(evstart, datetime):
        evstart = datetime(evstart.year, evstart.month, evstart.day)

    first = evstart if evstart.tzinfo is not None else evstart.replace(tzinfo=window_tz)
    # First occurrence lies in the future; no need to process further
    if first >= end:
        return occurrences

    # Extract recurrence rules from ical.  Long content lines are folded by
    # the serializer (CRLF followed by a space or tab); undo the folding
    # first, otherwise the continuation of a long RRULE/EXDATE/RDATE is lost.
    serialized = event.to_ical().decode('utf-8')
    unfolded = serialized.replace('\r\n ', '').replace('\r\n\t', '')
    recurrence = '\n'.join(
        line for line in unfolded.split('\r\n')
        if line.startswith(('RRULE', 'RDATE', 'EXRULE', 'EXDATE')))

    # Create a generator that yields a timestamp for each recurrence
    generator = rrule.rrulestr(recurrence, dtstart=evstart)

    # Collect the occurrences that touch the window
    for dt in generator:
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=window_tz)
        # Skip past occurrences and stop once the event lies too far in the future
        if dt + duration < start:
            continue
        if dt > end:
            break
        occurrences.append(dt)
    return occurrences
def _normalize_timestamp(value: date, tz) -> datetime:
    """Return *value* as a ``datetime`` that is comparable with the scrape window.

    A plain ``date`` becomes midnight in *tz*; a naive ``datetime`` gets *tz*
    attached; an aware ``datetime`` is returned unchanged.
    """
    # ``datetime`` is a subclass of ``date``, so the datetime check must come
    # first -- otherwise every timed event would be truncated to midnight.
    if not isinstance(value, datetime):
        return datetime(value.year, value.month, value.day, tzinfo=tz)
    if value.tzinfo is None:
        return value.replace(tzinfo=tz)
    return value


def _scrape_calendar(name: str, config: CalendarConfig, start: datetime, end: datetime):
    """Download one calendar and cache the events that start inside the window.

    :param name: Key under which the resulting events are stored in the cache.
    :param config: Calendar configuration; supplies the URL and the URL opener.
    :param start: Beginning of the window (inclusive).
    :param end: End of the window (exclusive).
    """
    events = []

    opener: urllib.request.OpenerDirector = config.get_url_opener()
    with opener.open(config.url) as response:
        data = response.read().decode('utf-8')
    calendar = cal.Calendar.from_ical(data)

    for element in calendar.walk():
        if element.name != "VEVENT":
            continue
        dtstart = _normalize_timestamp(element.get('dtstart').dt, start.tzinfo)
        # Process either end timestamp or duration, if present
        if 'dtend' in element:
            evend = _normalize_timestamp(element.get('dtend').dt, start.tzinfo)
            duration = evend - dtstart
        elif 'duration' in element:
            duration = element.get('duration').dt
        else:
            duration = timedelta(0)
        if element.get('rrule') is not None or element.get('rdate') is not None:
            occurrences: Iterable[datetime] = _parse_recurring(element, start, end, duration)
        else:
            occurrences = [dtstart]
        events.extend(
            Event(name, element, occurrence, occurrence + duration)
            for occurrence in occurrences
            if start <= occurrence < end)

    # Publish the complete result in one step so readers never see a
    # partially filled list.
    with _SCRAPE_CACHE_LOCK:
        _SCRAPE_CACHE[name] = events
def scrape_calendar(name: str, config: CalendarConfig):
    """Scrape calendar *name* once and arm a timer for the next scrape.

    The follow-up timer is started before the calendar is downloaded, so an
    exception raised while scraping does not interrupt the schedule.

    :param name: Name of the calendar, used as the cache key.
    :param config: Configuration of the calendar; ``config.interval``
        determines the delay until the next scrape.
    """
    # Current time in the configured timezone
    current_time: datetime = datetime.now(get_config().tz)

    def _next_run():
        scrape_calendar(name, config)

    # Reschedule calendar scraping
    delay_seconds = config.interval.totimedelta(start=current_time).total_seconds()
    Timer(delay_seconds, _next_run).start()

    # Window for which events are collected, relative to the current time
    offset_start: Duration = get_config().start_delta
    offset_end: Duration = get_config().end_delta

    # Scrape and parse the calendar
    _scrape_calendar(name, config, current_time + offset_start, current_time + offset_end)
def start_scrape_calendar(name: str, config: CalendarConfig):
    """Start scraping calendar *name* without blocking the caller.

    The first scrape is dispatched through a zero-delay timer, i.e. it runs
    on a separate thread.
    """
    def _first_run():
        scrape_calendar(name, config)

    Timer(0, _first_run).start()
def get_calendar(name: str):
    """Return the cached events of calendar *name*.

    An empty list is returned when nothing is cached under that name.
    """
    with _SCRAPE_CACHE_LOCK:
        cached = _SCRAPE_CACHE.get(name, [])
    return cached