from typing import Any, Dict, List, Optional, Type, Union import base64 import json from datetime import timedelta import ssl import urllib.request import sys import logging import pytz import jinja2 from isodate import Duration, parse_duration from icalendar_timeseries_server import __version__ CONFIG: Optional['Config'] = None JENV: Optional[jinja2.Environment] = None __py_version: str = f'{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}' USER_AGENT: str = f'icalendar-timeseries-server/{__version__} (Python/{__py_version})' class CalendarConfig: def __init__(self, config: Dict[str, Any], config_path: str) -> None: self._url: str = _keycheck('url', config, str, config_path) self._scrape_interval: Duration = _parse_timedelta('interval', config, config_path, default_value='PT15M') self._ca: Optional[str] = _keycheck('ca', config, str, config_path, optional=True) auth: Dict[str, Any] = _keycheck('auth', config, dict, config_path, default_value={'type': 'none'}) self._authtype: str = _keycheck('type', auth, str, f'{config_path}.auth', default_value='none', valid_values=['none', 'basic', 'tls']) self._request_headers: Dict[str, str] = { 'User-Agent': USER_AGENT } if self._authtype == 'none': # No auth, nothing to do pass elif self._authtype == 'basic': # Build the Authorization header required for HTTP Basic Auth username: str = _keycheck('username', auth, str, f'{config_path}.auth') password: str = _keycheck('password', auth, str, f'{config_path}.auth') token: str = base64.b64encode(f'{username}:{password}'.encode()).decode() self._request_headers['Authorization'] = f'Basic {token}' elif self._authtype == 'tls': # Store TLS config for later use self._tls_keyfile: str = _keycheck('keyfile', auth, str, f'{config_path}.auth') self._tls_passphrase: str = _keycheck('passphrase', auth, str, f'{config_path}.auth', optional=True) @property def url(self) -> str: return self._url @property def interval(self) -> Duration: return self._scrape_interval def get_url_opener(self) -> urllib.request.OpenerDirector: if self._authtype == 'tls': # Create an OpenSSL context and load the CA and client certificates context: ssl.SSLContext = ssl.create_default_context(cafile=self._ca) context.load_cert_chain(certfile=self._tls_keyfile, password=self._tls_passphrase) opener: urllib.request.OpenerDirector = \ urllib.request.build_opener(urllib.request.HTTPSHandler(context=context)) elif self._ca is not None: # No TLS Client Certificate, but load the CA certificate context: ssl.SSLContext = ssl.create_default_context(cafile=self._ca) opener = urllib.request.build_opener(urllib.request.HTTPSHandler(context=context)) else: # No TLS config, return default opener opener = urllib.request.build_opener() # Set request headers, at least User-Agent, optionally Authorization opener.addheaders = [(k, v) for k, v in self._request_headers.items()] return opener class Config: def __init__(self, config: Dict[str, Any] = None) -> None: if config is None: config = dict() self._addr: str = _keycheck('addr', config, str, '', default_value='127.0.0.1') self._port: int = _keycheck('port', config, int, '', default_value=8090) self._tz: pytz.tzinfo = _parse_timezone('tz', config, '', default_value='UTC') self._start_delta: Duration = _parse_timedelta('start_delta', config, '', default_value='PT') self._end_delta: Duration = _parse_timedelta('end_delta', config, '', default_value='P30D') self._calendars: Dict[str, CalendarConfig] = self._parse_calendars_config('calendars', config, '') self._key_replace = _parse_key_replace('key_replace', config, '') self._value_replace = _parse_value_replace('value_replace', config, '') @staticmethod def _parse_calendars_config(key: str, config: Dict[str, Any], path: str) -> Dict[str, CalendarConfig]: cdef: Dict[str, Any] = _keycheck(key, config, dict, path, default_value={}) calendars: Dict[str, CalendarConfig] = dict() for name, c in cdef.items(): calendar: CalendarConfig = CalendarConfig(c, f'{path}.{name}') calendars[name] = calendar return calendars @property def addr(self) -> str: return self._addr @property def port(self) -> int: return self._port @property def tz(self) -> pytz.tzinfo: return self._tz @property def start_delta(self) -> Duration: return self._start_delta @property def end_delta(self) -> Duration: return self._end_delta @property def calendars(self) -> Dict[str, CalendarConfig]: return self._calendars @property def key_replace(self) -> Dict[str, str]: return self._key_replace @property def value_replace(self) -> Dict[str, str]: return self._value_replace def _keycheck(key: str, config: Dict[str, Any], typ: Type, path: str, default_value: Any = None, optional: bool = False, valid_values: Optional[List[Any]] = None) -> Any: if key not in config: if default_value is not None or optional: return default_value raise KeyError(f'Expected member "{key}" not found at path {path}') value: Any = config[key] if not isinstance(value, typ): raise TypeError(f'Expected {typ.__name__}, not {type(value).__name__} for path {path}.{key}') if valid_values is not None: if value not in valid_values: raise ValueError(f'Expected one of {", ".join(valid_values)} ({typ}), not {value} for path {path}.{key}') return value def _parse_timedelta(key: str, config: Dict[str, Any], path: str, default_value: Any = None, force_positive: bool = False) -> Duration: tdstring: str = _keycheck(key, config, str, path, default_value=default_value) duration: Union[Duration, timedelta] = parse_duration(tdstring) if isinstance(duration, timedelta): duration = Duration(days=duration.days, seconds=duration.seconds, microseconds=duration.microseconds) if force_positive and duration.total_seconds() < 0: raise ValueError(f'Duration must be positive for path {path}.{key}') return duration def _parse_timezone(key: str, config: Dict[str, Any], path: str, default_value: Any = None) -> Any: zonename: str = _keycheck(key, config, str, path, default_value=default_value) return pytz.timezone(zonename) def _parse_key_replace(key: str, config: Dict[str, Any], path: str) -> Dict[str, str]: key_replace: Dict[str, Any] = _keycheck(key, config, dict, path, default_value={}) for label in key_replace.keys(): _keycheck(label, key_replace, str, f'{path}.{key}') return key_replace def _parse_value_replace(key: str, config: Dict[str, Any], path: str) -> Dict[str, str]: value_replace: Dict[str, Any] = _keycheck(key, config, dict, path, default_value={}) for label in value_replace.keys(): _keycheck(label, value_replace, str, f'{path}.{key}') return value_replace def get_config() -> Config: global CONFIG return CONFIG def get_jenv() -> jinja2.Environment: global JENV return JENV def load_config(filename: str): global CONFIG, JENV try: with open(filename, 'r') as f: json_config = json.loads(f.read()) CONFIG = Config(json_config) JENV = jinja2.Environment() except json.JSONDecodeError as e: logging.exception('Cannot parse config JSON') raise e except Exception as e: logging.error(e) raise e def load_default_config(): global CONFIG, JENV CONFIG = Config({}) JENV = jinja2.Environment()