icalendar-timeseries-server/icalendar_timeseries_server/event.py

66 lines
2 KiB
Python

from typing import Any, Dict, List, Set
import icalendar
import jinja2
from datetime import datetime
from icalendar_timeseries_server.config import get_config, get_jenv
from icalendar_timeseries_server.query import Metric
_ATTRIBUTES: List[str] = [
'attachs',
'categories',
'class',
'comment',
'description',
'geo',
'location',
'percent-complete',
'priority',
'resources',
'status',
'summary'
]
class Event(Metric):
    """One calendar event occurrence, represented as a metric named ``event``.

    The labels handed to ``Metric`` are derived from the calendar name, the
    occurrence's start/end and the event properties listed in ``_ATTRIBUTES``,
    after applying the configured key and value replacements.
    """

    def __init__(self, cname: str, event: icalendar.cal.Event, start: datetime, end: datetime):
        """Collect the labels for a single event occurrence.

        Args:
            cname: Name of the calendar the event was read from.
            event: The iCalendar event whose properties become labels.
            start: Start of this occurrence; also used for the sample
                timestamp and the uid.
            end: End of this occurrence.
        """
        self.start: datetime = start
        self.calendar: str = cname

        # Look the configuration up once instead of on every loop iteration.
        config = get_config()
        key_replace = config.key_replace
        value_replace = config.value_replace

        # Values made available to the value_replace templates.
        context: Dict[str, Any] = {
            'calendar': cname,
            'start': start,
            'end': end,
        }
        for attr in _ATTRIBUTES:
            context[attr] = event.get(attr, '')

        # Ordered and de-duplicated, so that when several attributes are
        # mapped onto the same replacement key the outcome is the same on
        # every run (the last one in this order wins).
        substitution_keys: List[str] = list(dict.fromkeys(
            [*context.keys(), *key_replace.keys(), *value_replace.keys()]
        ))

        attributes: Dict[str, str] = {}
        for attr in substitution_keys:
            newkey: str = key_replace.get(attr, attr)
            if attr in value_replace:
                # Only templates coming from the configuration are rendered.
                jtemplate: jinja2.Template = get_jenv().from_string(value_replace[attr])
                attributes[newkey] = jtemplate.render(**context)
            else:
                # Text taken from the calendar is copied as-is and never
                # compiled as a template: '{{' or '{%' inside e.g. a summary
                # must neither be evaluated nor raise a template syntax error.
                attributes[newkey] = str(context.get(attr, ''))

        self.uid: str = f'{cname}-{start.strftime("%Y%m%dT%H%M%S%Z")}'
        super().__init__('event', attributes)

    def serialize(self) -> Dict[str, Any]:
        """Return the event as a dict with a ``metric`` and a ``value`` entry.

        ``metric`` starts with ``__name__`` and ``calendar`` and is then
        updated with ``self._labels``, so labels of the same name take
        precedence. ``value`` is ``[start timestamp, 1]``.
        """
        # NOTE(review): self._labels is not set in this class; presumably
        # Metric.__init__ stores the attributes there - confirm.
        sample_time = self.start.timestamp()
        metric: Dict[str, Any] = {
            '__name__': 'event',
            'calendar': self.calendar,
        }
        metric.update(self._labels)
        return {
            'metric': metric,
            'value': [sample_time, 1],
        }