icalendar-timeseries-server/icalendar_timeseries_server/api.py
2021-05-24 05:33:32 +02:00

60 lines
1.9 KiB
Python

from typing import List
import json
import logging
import bottle
from icalendar_timeseries_server.config import get_config
from icalendar_timeseries_server.event import Metric
from icalendar_timeseries_server.cal import get_calendar_events, get_calendar_todos
from icalendar_timeseries_server.query import MetricQuery
@bottle.route('/api/v1/query')
@bottle.route('/api/v1/query_range')
def prometheus_api():
events: List[Metric] = []
try:
q = MetricQuery(bottle.request.query['query'])
except ValueError as e:
response = {
'status': 'error',
'errorType': 'bad_data',
'error': str(e)
}
bottle.response.status = 400
logging.exception('Cannot parse PromQL query')
bottle.response.add_header('Content-Type', 'application/json')
return json.dumps(response)
try:
for name in get_config().calendars.keys():
if q.name == 'event':
events.extend(get_calendar_events(name))
events = list(filter(q, events))
events.sort(key=lambda e: e.start)
elif q.name == 'todo':
events.extend(get_calendar_todos(name))
events = list(filter(q, events))
# Sort by due date and priority
events.sort(key=lambda e: (e.due is None, e.due, e.priority))
response = {
'status': 'success',
'data': {
'resultType': 'vector',
'result': [e.serialize() for e in events]
}
}
except BaseException:
response = {
'status': 'error',
'errorType': 'internal',
'error': 'An internal error occurred.'
}
bottle.response.status = 500
logging.exception('An internal error occurred')
bottle.response.add_header('Content-Type', 'application/json')
return json.dumps(response)