icalendar-timeseries-server/icalendar_timeseries_server/cal.py

93 lines
3.7 KiB
Python
Raw Normal View History

from typing import Dict, List, Iterable, Tuple
import sys
import urllib.request
from datetime import datetime, date, timedelta
from dateutil import rrule
from icalendar import cal
from icalendar_timeseries_server import __version__
from icalendar_timeseries_server.config import get_config, CalendarConfig
from icalendar_timeseries_server.event import Event
# Per-calendar scrape cache: calendar name -> (expiry timestamp, events parsed at scrape time).
_SCRAPE_CACHE: Dict[str, Tuple[datetime, List[Event]]] = {}

# "major.minor.micro" of the running interpreter, embedded in the User-Agent string below.
__py_version: str = '.'.join(str(part) for part in sys.version_info[:3])

# User-Agent string identifying this server and the Python version it runs on.
USER_AGENT: str = f'icalendar-timeseries-server/{__version__} (Python/{__py_version})'
def _parse_recurring(event: cal.Event, start: datetime, end: datetime, duration: timedelta) -> List[datetime]:
    """Expand a recurring event into the start timestamps relevant for a time window.

    :param event: The VEVENT component carrying RRULE/RDATE/EXRULE/EXDATE properties.
    :param start: Beginning of the window of interest.
    :param end: End of the window of interest.
    :param duration: Length of a single occurrence of the event.
    :return: Start timestamps of all occurrences that have not yet ended at ``start``
        (``dt + duration >= start``) and do not begin after ``end``.  The list is
        empty if the first occurrence begins at or after ``end``.
    """
    occurences: List[datetime] = []
    evstart = event.get('dtstart').dt
    # All-day events carry a plain date (datetime is a subclass of date, so test
    # for datetime).  Keep the value naive here so that it matches a date-valued
    # or floating UNTIL in the recurrence rule.
    if not isinstance(evstart, datetime):
        evstart = datetime(evstart.year, evstart.month, evstart.day)

    def _localize(dt: datetime) -> datetime:
        # Floating (naive) timestamps are interpreted in the timezone of the
        # query window so they can be compared with start and end.
        return dt.replace(tzinfo=start.tzinfo) if dt.tzinfo is None else dt

    # First occurrence lies in the future; no need to process further
    if _localize(evstart) >= end:
        return occurences
    # Extract recurrence rules from ical.  to_ical() folds long content lines
    # (CRLF followed by a single space or tab); unfold them first, otherwise the
    # continuation of a long RRULE/EXDATE line would be silently dropped.
    ical_text = event.to_ical().decode('utf-8')
    ical_text = ical_text.replace('\r\n ', '').replace('\r\n\t', '')
    recurrence = '\n'.join(
        line for line in ical_text.split('\r\n')
        if line.startswith(('RRULE', 'RDATE', 'EXRULE', 'EXDATE')))
    # Create a generator that yields a timestamp for each recurrence
    generator = rrule.rrulestr(recurrence, dtstart=evstart)
    # Generate an entry for each occurrence of the event
    for dt in generator:
        dt = _localize(dt)
        # Skip occurrences that already ended before the window
        if dt + duration < start:
            continue
        # Stop once the occurrences lie beyond the window; this also terminates
        # iteration over unbounded recurrence rules
        if dt > end:
            break
        occurences.append(dt)
    return occurences
def _as_datetime(value: date, tz) -> datetime:
    """Coerce an iCalendar DATE or DATE-TIME value into a datetime comparable with the query window.

    :param value: A ``date`` (all-day value) or ``datetime``.
    :param tz: The tzinfo applied to values that carry no timezone of their own.
    :return: Midnight of that day in ``tz`` for plain dates, the value with ``tz``
        attached for naive datetimes, and the value unchanged for aware datetimes.
    """
    # datetime is a subclass of date, so a plain isinstance(value, date) check
    # would also match (and truncate) real timestamps.
    if not isinstance(value, datetime):
        return datetime(value.year, value.month, value.day, tzinfo=tz)
    if value.tzinfo is None:
        return value.replace(tzinfo=tz)
    return value


def _parse_calendar(name: str, calendar: cal.Calendar, start: datetime, end: datetime) -> List[Event]:
    """Collect all event occurrences of a calendar that begin inside ``[start, end)``.

    :param name: Calendar name, passed through to every created Event.
    :param calendar: The parsed calendar whose VEVENT components are evaluated.
    :param start: Inclusive lower bound for occurrence start timestamps.
    :param end: Exclusive upper bound for occurrence start timestamps.
    :return: One Event per occurrence whose start timestamp lies in the window.
    """
    events: List[Event] = []
    for element in calendar.walk():
        if element.name != "VEVENT":
            continue
        dtstart = _as_datetime(element.get('dtstart').dt, start.tzinfo)
        # Process either end timestamp or duration, if present
        if 'dtend' in element:
            evend = _as_datetime(element.get('dtend').dt, start.tzinfo)
            duration = evend - dtstart
        elif 'duration' in element:
            duration = element.get('duration').dt
        else:
            duration = timedelta(0)
        # Recurring events are expanded into their individual occurrences
        if element.get('rrule') is not None or element.get('rdate') is not None:
            occurences: Iterable[datetime] = _parse_recurring(element, start, end, duration)
        else:
            occurences = [dtstart]
        events.extend(
            Event(name, element, occurence, occurence + duration)
            for occurence in occurences
            if start <= occurence < end
        )
    return events
def scrape_calendar(name: str, config: CalendarConfig, start: datetime, end: datetime) -> List[Event]:
    """Fetch a calendar and return its events for the given window, using the scrape cache.

    :param name: Calendar name; also the key into the scrape cache.
    :param config: Calendar configuration providing the URL and the URL opener.
    :param start: Start of the window handed to the calendar parser.
    :param end: End of the window handed to the calendar parser.
    :return: The parsed events, or the cached list if an unexpired cache entry exists.

    NOTE: the cache is keyed by ``name`` only; on a cache hit ``start`` and ``end``
    are not consulted, so the events parsed for the window of the original scrape
    are returned.
    """
    now: datetime = datetime.now(tz=get_config().tz)
    caching_enabled = get_config().cache.total_seconds() > 0
    if caching_enabled and name in _SCRAPE_CACHE:
        expires_at, cached_events = _SCRAPE_CACHE[name]
        if now < expires_at:
            print('serving cached')
            return cached_events

    print('doing request')
    opener: urllib.request.OpenerDirector = config.get_url_opener()
    with opener.open(config.url) as response:
        payload = response.read().decode('utf-8')
    parsed: List[Event] = _parse_calendar(name, cal.Calendar.from_ical(payload), start, end)
    # Remember the result together with the point in time at which it expires
    _SCRAPE_CACHE[name] = (now + get_config().cache, parsed)
    return parsed