from typing import Dict, List, Iterable
import sys
import urllib.request
import logging
from datetime import datetime, date, timedelta
from threading import Lock, Timer

from dateutil import rrule
from icalendar import cal
from isodate import Duration

from icalendar_timeseries_server import __version__
from icalendar_timeseries_server.config import get_config, CalendarConfig
from icalendar_timeseries_server.event import Event


_SCRAPE_CACHE: Dict[str, List[Event]] = dict()
_SCRAPE_CACHE_LOCK: Lock = Lock()
__py_version: str = f'{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}'
USER_AGENT: str = f'icalendar-timeseries-server/{__version__} (Python/{__py_version})'


def _parse_recurring(event: cal.Event, start: datetime, end: datetime, duration: timedelta) -> List[datetime]:
    occurrences: List[datetime] = []
    evstart = event.get('dtstart').dt
    if isinstance(evstart, date) and not isinstance(evstart, datetime):
        evstart = datetime(evstart.year, evstart.month, evstart.day, tzinfo=start.tzinfo)
    # First occurrence lies in the future; no need to process further
    if evstart >= end:
        return occurrences
    # Extract recurrence rules from the iCalendar source
    ical_lines = event.to_ical().decode('utf-8').split('\r\n')
    recurrence = '\n'.join(
        [x for x in ical_lines
         if x.startswith('RRULE') or x.startswith('RDATE')
         or x.startswith('EXRULE') or x.startswith('EXDATE')])
    # Create a generator that yields a timestamp for each recurrence
    generator = rrule.rrulestr(recurrence, dtstart=evstart)
    # Generate an event entry for each occurrence of the event
    for dt in generator:
        # Skip past occurrences and break once the event lies too far in the future
        if dt + duration < start:
            continue
        if dt > end:
            break
        # Create an event entry
        occurrences.append(dt)
    return occurrences


def _scrape_calendar(name: str, config: CalendarConfig, start: datetime, end: datetime):
    global _SCRAPE_CACHE, _SCRAPE_CACHE_LOCK
    events = []
    opener: urllib.request.OpenerDirector = config.get_url_opener()
    try:
        with opener.open(config.url) as response:
            data = response.read().decode('utf-8')
    except BaseException:
        logging.exception(f'An error occurred while scraping the calendar endpoint "{name}" ({config.url})')
        return

    calendar = cal.Calendar.from_ical(data)
    for element in calendar.walk():
        if element.name == "VEVENT":
            dtstart = element.get('dtstart').dt
            # datetime is a subclass of date, so date-only values need an explicit check
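            # (Note: date-only DTSTART values, as used for all-day events, carry no
            # time-of-day or timezone; they are normalized below to midnight in the
            # timezone of the scrape window so they can be compared against it.)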
            if isinstance(dtstart, date) and not isinstance(dtstart, datetime):
                dtstart = datetime(dtstart.year, dtstart.month, dtstart.day, tzinfo=start.tzinfo)
            # Process either the end timestamp or the duration, if present
            if 'dtend' in element:
                evend = element.get('dtend').dt
                if isinstance(evend, date) and not isinstance(evend, datetime):
                    evend = datetime(evend.year, evend.month, evend.day, tzinfo=start.tzinfo)
                duration = evend - dtstart
            elif 'duration' in element:
                duration = element.get('duration').dt
            else:
                duration = timedelta(0)
            # Expand recurring events; single events contribute one occurrence
            if element.get('rrule') is not None or element.get('rdate') is not None:
                occurrences: Iterable[datetime] = _parse_recurring(element, start, end, duration)
            else:
                occurrences = [dtstart]
            # Keep every occurrence that overlaps the scrape window
            for occurrence in occurrences:
                if start <= occurrence + duration and occurrence < end:
                    events.append(Event(name, element, occurrence, occurrence + duration))
    with _SCRAPE_CACHE_LOCK:
        _SCRAPE_CACHE[name] = events


def scrape_calendar(name: str, config: CalendarConfig):
    # Get the current time in the configured timezone
    tz = get_config().tz
    now: datetime = datetime.now(tz)
    # Reschedule calendar scraping
    cron = Timer(config.interval.totimedelta(start=now).total_seconds(),
                 lambda: scrape_calendar(name, config))
    cron.start()
    # Compute the interval for which to return events
    start_delta: Duration = get_config().start_delta
    end_delta: Duration = get_config().end_delta
    start: datetime = now + start_delta
    end: datetime = now + end_delta
    # Scrape and parse the calendar
    _scrape_calendar(name, config, start, end)


def start_scrape_calendar(name: str, config: CalendarConfig):
    # Schedule the first calendar scraping
    cron = Timer(0, lambda: scrape_calendar(name, config))
    cron.start()


def get_calendar(name: str):
    global _SCRAPE_CACHE
    with _SCRAPE_CACHE_LOCK:
        return _SCRAPE_CACHE.get(name, [])
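

# A minimal usage sketch (not part of the module above): it assumes the global
# configuration has already been loaded and exposes the configured calendars as
# a name -> CalendarConfig mapping (the `calendars` attribute shown here is a
# hypothetical example, not a confirmed part of the config API). Each calendar
# then re-schedules its own scrape via the Timer chain in scrape_calendar(),
# and readers query the cached results through get_calendar():
#
#     for cal_name, cal_config in get_config().calendars.items():
#         start_scrape_calendar(cal_name, cal_config)
#     ...
#     upcoming = get_calendar('team-events')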