icalendar-timeseries-server/icalendar_timeseries_server/cal.py

101 lines
3.7 KiB
Python
Raw Normal View History

from typing import Dict, List, Iterable
import sys
import urllib.request
from datetime import datetime, date, timedelta
from threading import Timer
from dateutil import rrule
from icalendar import cal
from isodate import Duration
from icalendar_timeseries_server import __version__
from icalendar_timeseries_server.config import get_config, CalendarConfig
from icalendar_timeseries_server.event import Event
_SCRAPE_CACHE: Dict[str, List[Event]] = dict()
__py_version: str = f'{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}'
USER_AGENT: str = f'icalendar-timeseries-server/{__version__} (Python/{__py_version})'
def _parse_recurring(event: cal.Event, start: datetime, end: datetime, duration: timedelta) -> List[datetime]:
occurences: List[datetime] = []
evstart = event.get('dtstart').dt
# First occurence lies in the future; no need to process further
if evstart >= end:
return occurences
# Extract recurrence rules from ical
ical_lines = event.to_ical().decode('utf-8').split('\r\n')
recurrence = '\n'.join(
[x for x in ical_lines
if x.startswith('RRULE') or x.startswith('RDATE') or x.startswith('EXRULE') or x.startswith('EXDATE')])
# Create a generator that yields a timestamp for each recurrence
generator = rrule.rrulestr(recurrence, dtstart=evstart)
# Generate an event entry for each occurence of the event
for dt in generator:
# Skip past occurences and break once the the event lies too far in the future
if dt + duration < start:
continue
if dt > end:
break
# Create an event entry
occurences.append(dt)
return occurences
def _scrape_calendar(name: str, config: CalendarConfig, start: datetime, end: datetime):
global _SCRAPE_CACHE
events = []
opener: urllib.request.OpenerDirector = config.get_url_opener()
with opener.open(config.url) as response:
data = response.read().decode('utf-8')
calendar = cal.Calendar.from_ical(data)
for element in calendar.walk():
if element.name == "VEVENT":
dtstart = element.get('dtstart').dt
if isinstance(dtstart, date):
dtstart = datetime(dtstart.year, dtstart.month, dtstart.day, tzinfo=start.tzinfo)
# Process either end timestamp or duration, if present
if 'dtend' in element:
evend = element.get('dtend').dt
if isinstance(evend, date):
evend = datetime(evend.year, evend.month, evend.day, tzinfo=start.tzinfo)
duration = evend - dtstart
elif 'duration' in element:
duration = element.get('duration').dt
else:
duration = timedelta(0)
if element.get('rrule') is not None or element.get('rdate') is not None:
occurences: Iterable[datetime] = _parse_recurring(element, start, end, duration)
else:
occurences = [dtstart]
for occurence in occurences:
if start <= occurence < end:
events.append(Event(name, element, occurence, occurence + duration))
_SCRAPE_CACHE[name] = events
def scrape_calendar(name: str, config: CalendarConfig):
tz = get_config().tz
now: datetime = datetime.now(tz)
start_delta: Duration = get_config().start_delta
end_delta: Duration = get_config().end_delta
start: datetime = now + start_delta
end: datetime = now + end_delta
_scrape_calendar(name, config, start, end)
cron = Timer(config.interval.totimedelta(start=now).total_seconds(),
lambda: scrape_calendar(name, config))
cron.start()
def get_calendar(name: str):
global _SCRAPE_CACHE
return _SCRAPE_CACHE.get(name, [])