# 2019-08-20 00:24:51 +02:00  (stray timestamp line, commented out so the module parses)
from typing import Any, Dict, List, Optional, Type, Union
import base64
import json
from datetime import timedelta
import ssl
import urllib.request
import sys
# 2019-09-01 23:13:24 +02:00  (stray timestamp line, commented out so the module parses)
import logging
# 2019-08-20 00:24:51 +02:00  (stray timestamp line, commented out so the module parses)
import pytz
import jinja2
from isodate import Duration, parse_duration
from icalendar_timeseries_server import __version__
# Module-wide singletons, handed out by get_config() and get_jenv().
# Both start out as None.
CONFIG: Optional['Config'] = None
JENV: Optional[jinja2.Environment] = None

# Interpreter version as "major.minor.micro"; embedded in USER_AGENT below.
__py_version: str = '.'.join(str(part) for part in sys.version_info[:3])
# Value CalendarConfig places into the User-Agent header of its requests.
USER_AGENT: str = f'icalendar-timeseries-server/{__version__} (Python/{__py_version})'
class CalendarConfig:
    """Settings for a single remote calendar: URL, scrape interval and authentication.

    The ``auth.type`` member selects the authentication scheme and must be one
    of ``none`` (used when no ``auth`` member is given), ``basic`` (HTTP Basic
    Auth via an ``Authorization`` header) or ``tls`` (TLS client certificate).
    """

    # NOTE(review): the accessors below are plain methods in this file.  If
    # call sites use attribute access (``cal.url``), a ``@property`` decorator
    # is missing here -- confirm against the callers.

    def __init__(self, config: Dict[str, Any], config_path: str) -> None:
        """Validate ``config`` and prepare the HTTP request headers.

        :param config: Raw configuration mapping of this calendar.
        :param config_path: Dotted path of this calendar inside the overall
            configuration; only used to build error messages.
        :raises KeyError: If a required member (e.g. ``url``) is missing.
        :raises TypeError: If a member has an unexpected type.
        :raises ValueError: If ``auth.type`` is not a supported value.
        """
        self._url: str = _keycheck('url', config, str, config_path)
        self._scrape_interval: Duration = _parse_timedelta('interval', config, config_path, default_value='PT15M')
        self._ca: Optional[str] = _keycheck('ca', config, str, config_path, optional=True)
        auth: Dict[str, Any] = _keycheck('auth', config, dict, config_path, default_value={'type': 'none'})
        auth_path: str = f'{config_path}.auth'
        self._authtype: str = _keycheck('type', auth, str, auth_path,
                                        valid_values=['none', 'basic', 'tls'])
        self._request_headers: Dict[str, str] = {
            'User-Agent': USER_AGENT
        }
        # Only filled in for auth type "tls"; initialised up front so the
        # attributes exist regardless of the configured auth type.
        self._tls_keyfile: Optional[str] = None
        self._tls_passphrase: Optional[str] = None
        if self._authtype == 'basic':
            # Build the Authorization header required for HTTP Basic Auth
            username: str = _keycheck('username', auth, str, auth_path)
            password: str = _keycheck('password', auth, str, auth_path)
            token: str = base64.b64encode(f'{username}:{password}'.encode()).decode()
            self._request_headers['Authorization'] = f'Basic {token}'
        elif self._authtype == 'tls':
            # Store TLS config for later use; the passphrase may be absent.
            self._tls_keyfile = _keycheck('keyfile', auth, str, auth_path)
            self._tls_passphrase = _keycheck('passphrase', auth, str, auth_path, optional=True)
        # Auth type "none" needs no further setup.

    def url(self) -> str:
        """Return the URL of the calendar."""
        return self._url

    def interval(self) -> Duration:
        """Return the scrape interval (``PT15M`` unless configured otherwise)."""
        return self._scrape_interval

    def get_url_opener(self) -> urllib.request.OpenerDirector:
        """Build an opener carrying this calendar's TLS setup and request headers.

        :return: An opener that uses a custom SSL context when a CA file or a
            TLS client certificate is configured, and the default opener
            otherwise.  The stored request headers (at least ``User-Agent``,
            optionally ``Authorization``) are installed on it.
        """
        opener: urllib.request.OpenerDirector
        use_client_cert: bool = self._authtype == 'tls'
        if use_client_cert or self._ca is not None:
            # Create an SSL context trusting the configured CA (or the system
            # defaults when no CA file is set) ...
            context: ssl.SSLContext = ssl.create_default_context(cafile=self._ca)
            if use_client_cert:
                # ... and additionally present the client certificate.
                context.load_cert_chain(certfile=self._tls_keyfile, password=self._tls_passphrase)
            opener = urllib.request.build_opener(urllib.request.HTTPSHandler(context=context))
        else:
            # No TLS config, return default opener
            opener = urllib.request.build_opener()
        # Set request headers, at least User-Agent, optionally Authorization
        opener.addheaders = list(self._request_headers.items())
        return opener
class Config:
    """Top-level server configuration with defaults for every member.

    Missing members fall back to: ``addr=''``, ``port=8090``, ``tz='UTC'``,
    ``start_delta='PT'``, ``end_delta='P30D'`` and empty mappings for
    ``calendars``, ``key_replace`` and ``value_replace``.
    """

    # NOTE(review): the accessors below are plain methods in this file.  If
    # call sites use attribute access (``cfg.port``), a ``@property`` decorator
    # is missing here -- confirm against the callers.
    # NOTE(review): ``pytz.tzinfo`` looks like a module rather than a class;
    # ``datetime.tzinfo`` is probably the intended annotation -- confirm.

    def __init__(self, config: Optional[Dict[str, Any]] = None) -> None:
        """Validate ``config`` and store the parsed values.

        :param config: Parsed configuration mapping; ``None`` is treated like
            an empty mapping, i.e. every default applies.
        :raises KeyError: If a nested required member is missing.
        :raises TypeError: If a member has an unexpected type.
        """
        if config is None:
            config = {}
        self._addr: str = _keycheck('addr', config, str, '', default_value='')
        self._port: int = _keycheck('port', config, int, '', default_value=8090)
        self._tz: pytz.tzinfo = _parse_timezone('tz', config, '', default_value='UTC')
        self._start_delta: Duration = _parse_timedelta('start_delta', config, '', default_value='PT')
        self._end_delta: Duration = _parse_timedelta('end_delta', config, '', default_value='P30D')
        self._calendars: Dict[str, CalendarConfig] = self._parse_calendars_config('calendars', config, '')
        self._key_replace: Dict[str, str] = _parse_key_replace('key_replace', config, '')
        self._value_replace: Dict[str, str] = _parse_value_replace('value_replace', config, '')

    # Declared static: the function takes no ``self`` yet is invoked through
    # the instance in __init__, which would otherwise pass one argument too many.
    @staticmethod
    def _parse_calendars_config(key: str,
                                config: Dict[str, Any],
                                path: str) -> Dict[str, CalendarConfig]:
        """Build one :class:`CalendarConfig` per entry of the mapping at ``key``.

        :param key: Name of the member holding the calendar definitions.
        :param config: Mapping to look ``key`` up in.
        :param path: Dotted path of ``config``, used for error messages.
        :return: Calendar name mapped to its parsed configuration; empty when
            ``key`` is absent.
        """
        cdef: Dict[str, Any] = _keycheck(key, config, dict, path, default_value={})
        return {name: CalendarConfig(c, f'{path}.{name}') for name, c in cdef.items()}

    def addr(self) -> str:
        """Return the configured ``addr`` value."""
        return self._addr

    def port(self) -> int:
        """Return the configured ``port`` value."""
        return self._port

    def tz(self) -> pytz.tzinfo:
        """Return the timezone object created from the ``tz`` member."""
        return self._tz

    def start_delta(self) -> Duration:
        """Return the parsed ``start_delta`` duration."""
        return self._start_delta

    def end_delta(self) -> Duration:
        """Return the parsed ``end_delta`` duration."""
        return self._end_delta

    def calendars(self) -> Dict[str, CalendarConfig]:
        """Return the calendar configurations keyed by calendar name."""
        return self._calendars

    def key_replace(self) -> Dict[str, str]:
        """Return the ``key_replace`` mapping."""
        return self._key_replace

    def value_replace(self) -> Dict[str, str]:
        """Return the ``value_replace`` mapping."""
        return self._value_replace
def _keycheck(key: str,
config: Dict[str, Any],
typ: Type,
path: str,
default_value: Any = None,
optional: bool = False,
valid_values: Optional[List[Any]] = None) -> Any:
if key not in config:
if default_value is not None or optional:
return default_value
raise KeyError(f'Expected member "{key}" not found at path {path}')
value: Any = config[key]
if not isinstance(value, typ):
2019-09-01 23:13:24 +02:00
raise TypeError(f'Expected {typ.__name__}, not {type(value).__name__} for path {path}.{key}')
2019-08-20 00:24:51 +02:00
if valid_values is not None:
if value not in valid_values:
raise ValueError(f'Expected one of {", ".join(valid_values)} ({typ}), not {value} for path {path}.{key}')
return value
def _parse_timedelta(key: str,
                     config: Dict[str, Any],
                     path: str,
                     default_value: Any = None,
                     force_positive: bool = False) -> Duration:
    """Read the string at ``config[key]`` and parse it with ``parse_duration``.

    :param key: Name of the member holding the duration string.
    :param config: Mapping to look the member up in.
    :param path: Dotted path of ``config``, used for error messages.
    :param default_value: Duration string used when ``key`` is absent.
    :param force_positive: When true, reject durations whose total number of
        seconds is negative.
    :return: The parsed duration, always as a ``Duration`` instance.
    :raises KeyError: If ``key`` is absent and no default is given.
    :raises TypeError: If the member is not a string.
    :raises ValueError: If ``force_positive`` is set and the duration is negative.
    """
    tdstring: str = _keycheck(key, config, str, path, default_value=default_value)
    duration: Union[Duration, timedelta] = parse_duration(tdstring)
    if isinstance(duration, timedelta):
        # parse_duration may hand back a plain timedelta; wrap it so callers
        # always get a Duration.  All three timedelta components are carried
        # over so no precision is lost.
        duration = Duration(days=duration.days,
                            seconds=duration.seconds,
                            microseconds=duration.microseconds)
    if force_positive and duration.total_seconds() < 0:
        raise ValueError(f'Duration must be positive for path {path}.{key}')
    return duration
def _parse_timezone(key: str,
                    config: Dict[str, Any],
                    path: str,
                    default_value: Any = None) -> Any:
    """Look up the timezone name stored at ``config[key]`` and resolve it via pytz.

    :param key: Name of the member holding the timezone name.
    :param config: Mapping to look the member up in.
    :param path: Dotted path of ``config``, used for error messages.
    :param default_value: Timezone name used when ``key`` is absent.
    :return: Whatever ``pytz.timezone`` returns for the name.
    """
    return pytz.timezone(_keycheck(key, config, str, path, default_value=default_value))
def _parse_key_replace(key: str,
                       config: Dict[str, Any],
                       path: str) -> Dict[str, str]:
    """Return the mapping at ``config[key]`` after checking every value is a string.

    :param key: Name of the member holding the replacement mapping.
    :param config: Mapping to look the member up in.
    :param path: Dotted path of ``config``, used for error messages.
    :return: The mapping itself, or an empty dict when ``key`` is absent.
    """
    replacements: Dict[str, Any] = _keycheck(key, config, dict, path, default_value={})
    entry_path: str = f'{path}.{key}'
    for name in replacements:
        # Called for validation only; the returned value is not needed.
        _keycheck(name, replacements, str, entry_path)
    return replacements
def _parse_value_replace(key: str,
                         config: Dict[str, Any],
                         path: str) -> Dict[str, str]:
    """Return the mapping at ``config[key]`` after checking every value is a string.

    :param key: Name of the member holding the replacement mapping.
    :param config: Mapping to look the member up in.
    :param path: Dotted path of ``config``, used for error messages.
    :return: The mapping itself, or an empty dict when ``key`` is absent.
    """
    replacements: Dict[str, Any] = _keycheck(key, config, dict, path, default_value={})
    entry_path: str = f'{path}.{key}'
    for name in replacements:
        # Called for validation only; the returned value is not needed.
        _keycheck(name, replacements, str, entry_path)
    return replacements
def get_config() -> Config:
    """Return the module-level ``CONFIG`` object (``None`` until it has been set)."""
    # Reading a module global needs no ``global`` declaration.
    return CONFIG
def get_jenv() -> jinja2.Environment:
    """Return the module-level ``JENV`` environment (``None`` until it has been set)."""
    # Reading a module global needs no ``global`` declaration.
    return JENV
def load_config(filename: str) -> None:
    """Load the JSON file ``filename`` and install it as the module-level config.

    On success ``CONFIG`` holds the parsed :class:`Config` and ``JENV`` a
    fresh ``jinja2.Environment``.  On any failure both are left untouched and
    the exception propagates to the caller.

    :param filename: Path of the JSON configuration file.
    :raises json.JSONDecodeError: If the file is not valid JSON (also logged).
    :raises OSError: If the file cannot be opened or read.
    """
    # Without this declaration the assignments below would only create
    # function locals and the module-level state would never change.
    global CONFIG, JENV
    try:
        # JSON text is UTF-8; do not depend on the platform's default encoding.
        with open(filename, 'r', encoding='utf-8') as f:
            json_config = json.load(f)
    except json.JSONDecodeError:
        logging.exception('Cannot parse config JSON')
        raise
    CONFIG = Config(json_config)
    JENV = jinja2.Environment()
# 2019-08-20 00:24:51 +02:00  (stray timestamp line, commented out so the module parses)
def load_default_config() -> None:
    """Install a :class:`Config` built from an empty mapping plus a fresh Jinja2 environment."""
    # Without this declaration the assignments below would only create
    # function locals and the module-level state would never change.
    global CONFIG, JENV
    CONFIG = Config({})
    JENV = jinja2.Environment()