"""Periodic scraping of iCalendar feeds into an in-memory event cache."""

from typing import Dict, List, Iterable, Optional

import sys
import urllib.request
from datetime import datetime, date, timedelta, tzinfo
from threading import Timer

from dateutil import rrule
from icalendar import cal
from isodate import Duration

from icalendar_timeseries_server import __version__
from icalendar_timeseries_server.config import get_config, CalendarConfig
from icalendar_timeseries_server.event import Event


# Most recently scraped events, keyed by calendar name.
_SCRAPE_CACHE: Dict[str, List[Event]] = dict()

__py_version: str = f'{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}'
USER_AGENT: str = f'icalendar-timeseries-server/{__version__} (Python/{__py_version})'

# iCalendar properties that together describe an event's recurrence set.
_RECURRENCE_PROPERTIES = ('RRULE', 'RDATE', 'EXRULE', 'EXDATE')


def _as_datetime(value: date, tz: Optional[tzinfo]) -> datetime:
    """Normalize an iCalendar DATE or DATE-TIME value to a ``datetime``.

    :param value: A ``date`` or ``datetime`` as found in a ``dtstart``/``dtend`` property.
    :param tz: Timezone to attach to values that carry none.
    :return: ``value`` itself if it is an aware ``datetime``; a naive ``datetime`` with ``tz``
        attached; or, for a plain ``date``, midnight of that day in ``tz``.
    """
    # ``datetime`` is a subclass of ``date``, so the more specific type has to be tested first.
    # Testing for ``date`` would also match timestamps and throw away their time of day.
    if isinstance(value, datetime):
        if value.tzinfo is None and tz is not None:
            # Naive and aware datetimes cannot be compared with each other
            return value.replace(tzinfo=tz)
        return value
    return datetime(value.year, value.month, value.day, tzinfo=tz)


def _parse_recurring(event: cal.Event, start: datetime, end: datetime, duration: timedelta) -> List[datetime]:
    """Expand the recurrence rules of an event into start timestamps.

    :param event: The VEVENT component to expand.
    :param start: Start of the interval of interest.
    :param end: End of the interval of interest.
    :param duration: Duration of a single occurrence of the event.
    :return: The start timestamps of all occurrences that have not ended before ``start`` and
        do not begin after ``end``, in the order the recurrence rules yield them.
    """
    occurences: List[datetime] = []

    # Normalize so that all-day and floating events can be compared against the interval bounds
    evstart = _as_datetime(event.get('dtstart').dt, start.tzinfo)
    # First occurrence lies in the future; no need to process further
    if evstart >= end:
        return occurences
    # Extract recurrence rules from ical. Long content lines are folded onto continuation lines
    # starting with a single whitespace character; unfold them first so that no rule is truncated.
    ical_text = event.to_ical().decode('utf-8').replace('\r\n ', '').replace('\r\n\t', '')
    recurrence = '\n'.join(
        line for line in ical_text.split('\r\n') if line.startswith(_RECURRENCE_PROPERTIES))
    # Create a generator that yields a timestamp for each recurrence
    generator = rrule.rrulestr(recurrence, dtstart=evstart)

    # Generate an entry for each occurrence of the event
    for dt in generator:
        # Skip past occurrences and break once the event lies too far in the future
        if dt + duration < start:
            continue
        if dt > end:
            break
        occurences.append(dt)
    return occurences


def _scrape_calendar(name: str, config: CalendarConfig, start: datetime, end: datetime):
    """Download and parse one calendar and replace its entry in the scrape cache.

    :param name: Name of the calendar; used as the cache key and passed on to each ``Event``.
    :param config: Calendar configuration providing the URL and the URL opener.
    :param start: Start (inclusive) of the interval for which to keep events.
    :param end: End (exclusive) of the interval for which to keep events.
    """
    events = []

    opener: urllib.request.OpenerDirector = config.get_url_opener()
    with opener.open(config.url) as response:
        data = response.read().decode('utf-8')
    calendar = cal.Calendar.from_ical(data)

    for element in calendar.walk():
        if element.name != "VEVENT":
            continue
        dtstart = _as_datetime(element.get('dtstart').dt, start.tzinfo)
        # Process either end timestamp or duration, if present
        if 'dtend' in element:
            evend = _as_datetime(element.get('dtend').dt, start.tzinfo)
            duration = evend - dtstart
        elif 'duration' in element:
            duration = element.get('duration').dt
        else:
            duration = timedelta(0)
        if element.get('rrule') is not None or element.get('rdate') is not None:
            occurences: Iterable[datetime] = _parse_recurring(element, start, end, duration)
        else:
            occurences = [dtstart]
        # Only occurrences starting inside [start, end) are kept
        for occurence in occurences:
            if start <= occurence < end:
                events.append(Event(name, element, occurence, occurence + duration))
    # Swap in the complete list at once; a failed scrape leaves the previous entry untouched
    _SCRAPE_CACHE[name] = events


def scrape_calendar(name: str, config: CalendarConfig):
    """Scrape a calendar once and schedule the next scrape.

    :param name: Name of the calendar to scrape.
    :param config: Configuration of the calendar to scrape.
    """
    # Get current time in configured timezone
    tz = get_config().tz
    now: datetime = datetime.now(tz)
    # Reschedule calendar scraping. This happens before the scrape itself, so an exception
    # raised while scraping does not stop subsequent scrapes.
    cron = Timer(config.interval.totimedelta(start=now).total_seconds(), scrape_calendar, args=(name, config))
    cron.start()
    # Compute interval for which to return events
    start_delta: Duration = get_config().start_delta
    end_delta: Duration = get_config().end_delta
    start: datetime = now + start_delta
    end: datetime = now + end_delta
    # Scrape and parse the calendar
    _scrape_calendar(name, config, start, end)


def start_scrape_calendar(name: str, config: CalendarConfig):
    """Start periodic scraping of a calendar on a timer thread.

    :param name: Name of the calendar to scrape.
    :param config: Configuration of the calendar to scrape.
    """
    # Schedule first calendar scraping; a zero delay runs it on a timer thread right away
    cron = Timer(0, scrape_calendar, args=(name, config))
    cron.start()


def get_calendar(name: str) -> List[Event]:
    """Return the cached events of a calendar.

    :param name: Name of the calendar.
    :return: The events stored by the most recent scrape of the calendar, or an empty list if
        nothing is cached under ``name``.
    """
    return _SCRAPE_CACHE.get(name, [])