113 lines
4.2 KiB
Python
113 lines
4.2 KiB
Python
from typing import Dict, List, Iterable
|
|
|
|
import sys
|
|
import urllib.request
|
|
from datetime import datetime, date, timedelta
|
|
from threading import Lock, Timer
|
|
|
|
from dateutil import rrule
|
|
from icalendar import cal
|
|
from isodate import Duration
|
|
|
|
from icalendar_timeseries_server import __version__
|
|
from icalendar_timeseries_server.config import get_config, CalendarConfig
|
|
from icalendar_timeseries_server.event import Event
|
|
|
|
|
|
# Events of the most recent scrape, keyed by calendar name.
# Reads and writes in this module happen while holding _SCRAPE_CACHE_LOCK.
_SCRAPE_CACHE: Dict[str, List[Event]] = {}
_SCRAPE_CACHE_LOCK: Lock = Lock()

# Version of the running interpreter, formatted as "major.minor.micro"
__py_version: str = '.'.join(str(part) for part in sys.version_info[:3])
# Identifies this server and the Python version it runs on
# (presumably sent as the HTTP User-Agent header - confirm against the config module)
USER_AGENT: str = f'icalendar-timeseries-server/{__version__} (Python/{__py_version})'
|
|
|
|
|
|
def _parse_recurring(event: cal.Event, start: datetime, end: datetime, duration: timedelta) -> List[datetime]:
    """
    Expand a recurring event into the start times of its occurrences around the window from start to end.

    :param event: The VEVENT component; its DTSTART and its RRULE/RDATE/EXRULE/EXDATE lines are read.
    :param start: Beginning of the window. Occurrences whose start plus duration lies before it are skipped.
    :param end: End of the window. Expansion stops at the first occurrence starting after it.
    :param duration: Length of a single occurrence.
    :return: Occurrence start times, in the order the recurrence rule yields them.
    """
    occurrences: List[datetime] = []

    evstart = event.get('dtstart').dt
    # All-day events carry a plain date, which cannot be compared to a datetime.
    # datetime is a subclass of date, so it has to be excluded explicitly.
    if isinstance(evstart, date) and not isinstance(evstart, datetime):
        evstart = datetime(evstart.year, evstart.month, evstart.day, tzinfo=start.tzinfo)
    # First occurrence lies in the future; no need to process further
    if evstart >= end:
        return occurrences

    # Extract recurrence rules from ical.  Long content lines are folded in the
    # serialized form (CRLF followed by one space), so unfold them before
    # splitting; otherwise the tail of a long RRULE/EXDATE line would be lost.
    ical_text = event.to_ical().decode('utf-8').replace('\r\n ', '')
    recurrence = '\n'.join(
        line for line in ical_text.split('\r\n')
        if line.startswith(('RRULE', 'RDATE', 'EXRULE', 'EXDATE')))
    # Create a generator that yields a timestamp for each recurrence
    generator = rrule.rrulestr(recurrence, dtstart=evstart)

    # Collect the start time of each occurrence of the event
    for dt in generator:
        # Skip past occurrences and break once the event lies too far in the future
        if dt + duration < start:
            continue
        if dt > end:
            break
        occurrences.append(dt)
    return occurrences
|
|
|
|
|
|
def _date_to_datetime(value, tzinfo):
    """Promote a plain date to midnight of that day in tzinfo; return datetimes unchanged."""
    # datetime is a subclass of date, so it must be excluded explicitly.
    # Otherwise timed events would be truncated to midnight as well.
    if isinstance(value, date) and not isinstance(value, datetime):
        return datetime(value.year, value.month, value.day, tzinfo=tzinfo)
    return value


def _scrape_calendar(name: str, config: CalendarConfig, start: datetime, end: datetime):
    """
    Download one calendar, expand its events and cache those that start within the window.

    The resulting list replaces the previous cache entry for this calendar.

    :param name: Key under which the events are cached; also passed to every Event.
    :param config: Calendar configuration providing the URL and the opener used to fetch it.
    :param start: Inclusive lower bound for occurrence start times.
    :param end: Exclusive upper bound for occurrence start times.
    """
    events = []

    opener: urllib.request.OpenerDirector = config.get_url_opener()
    with opener.open(config.url) as response:
        data = response.read().decode('utf-8')
    calendar = cal.Calendar.from_ical(data)

    for element in calendar.walk():
        if element.name != "VEVENT":
            continue
        # All-day events use plain dates; turn them into datetimes so they can
        # be compared with the window bounds
        dtstart = _date_to_datetime(element.get('dtstart').dt, start.tzinfo)
        # Process either end timestamp or duration, if present
        if 'dtend' in element:
            evend = _date_to_datetime(element.get('dtend').dt, start.tzinfo)
            duration = evend - dtstart
        elif 'duration' in element:
            duration = element.get('duration').dt
        else:
            duration = timedelta(0)
        if element.get('rrule') is not None or element.get('rdate') is not None:
            occurrences: Iterable[datetime] = _parse_recurring(element, start, end, duration)
        else:
            occurrences = [dtstart]
        # Keep only the occurrences that start inside the window
        for occurrence in occurrences:
            if start <= occurrence < end:
                events.append(Event(name, element, occurrence, occurrence + duration))

    # Swap in the complete result at once so readers never see a partial list
    with _SCRAPE_CACHE_LOCK:
        _SCRAPE_CACHE[name] = events
|
|
|
|
|
|
def scrape_calendar(name: str, config: CalendarConfig):
    """
    Scrape one calendar now and schedule the following scrape.

    :param name: Name of the calendar, passed on to the scraper and to the next run.
    :param config: Configuration of the calendar; its interval determines the delay until the next run.
    """
    def _rerun() -> None:
        scrape_calendar(name, config)

    # Current time in the configured timezone
    now: datetime = datetime.now(get_config().tz)

    # The timer for the next run is started before scraping, so it is already
    # armed by the time the calendar is fetched.
    # NOTE(review): interval looks like an isodate Duration - confirm in CalendarConfig
    delay_seconds = config.interval.totimedelta(start=now).total_seconds()
    next_run = Timer(delay_seconds, _rerun)
    next_run.start()

    # Window for which events are collected, relative to the current time
    window_start: datetime = now + get_config().start_delta
    window_end: datetime = now + get_config().end_delta

    # Fetch and parse the calendar
    _scrape_calendar(name, config, window_start, window_end)
|
|
|
|
|
|
def start_scrape_calendar(name: str, config: CalendarConfig):
    """
    Kick off periodic scraping of a calendar.

    :param name: Name of the calendar.
    :param config: Configuration of the calendar.
    """
    def _first_run() -> None:
        scrape_calendar(name, config)

    # A zero-delay timer runs the first scrape on the timer's thread rather than the caller's
    initial = Timer(0, _first_run)
    initial.start()
|
|
|
|
|
|
def get_calendar(name: str):
    """
    Look up the cached events of a calendar.

    :param name: Name of the calendar.
    :return: The list stored in the cache for this calendar, or an empty list if there is no entry.
    """
    with _SCRAPE_CACHE_LOCK:
        if name in _SCRAPE_CACHE:
            return _SCRAPE_CACHE[name]
        return []
|