icalendar-timeseries-server/icalendar_timeseries_server/api.py

55 lines
1.6 KiB
Python
Raw Permalink Normal View History

from typing import List
import json
import logging
import bottle
from icalendar_timeseries_server.config import get_config
from icalendar_timeseries_server.event import Metric
from icalendar_timeseries_server.cal import get_calendar
from icalendar_timeseries_server.query import MetricQuery
def _json_response(payload: dict) -> str:
    """Mark the current bottle response as JSON and return the encoded body."""
    bottle.response.add_header('Content-Type', 'application/json')
    return json.dumps(payload)


def _error_response(error_type: str, message: str, status: int) -> str:
    """Set the HTTP status and return a JSON error body.

    The body has the keys ``status`` (always ``'error'``), ``errorType``
    and ``error``.
    """
    bottle.response.status = status
    return _json_response({
        'status': 'error',
        'errorType': error_type,
        'error': message
    })


@bottle.route('/api/v1/query')
@bottle.route('/api/v1/query_range')
def prometheus_api():
    """Handle ``/api/v1/query`` and ``/api/v1/query_range`` requests.

    Reads the ``query`` request parameter, parses it into a ``MetricQuery``,
    collects the events of every configured calendar, keeps those the query
    accepts, orders them by their ``start`` attribute and returns them
    serialized as a JSON document with ``resultType`` ``vector``.

    Returns:
        The JSON-encoded response body as a string. On failure the body
        describes the error and the HTTP status is set to 400 (missing or
        unparsable query) or 500 (any other error while building the result).
    """
    # Use .get() so that a request without the parameter yields a clean 400
    # instead of an uncaught KeyError.
    raw_query = bottle.request.query.get('query')
    if raw_query is None:
        logging.warning('Request is missing the "query" parameter')
        return _error_response(
            'bad_data', 'Missing required parameter "query"', 400)

    try:
        q = MetricQuery(raw_query)
    except ValueError as e:
        logging.exception('Cannot parse PromQL query')
        return _error_response('bad_data', str(e), 400)

    try:
        events: List[Metric] = []
        for name in get_config().calendars.keys():
            events.extend(get_calendar(name))
        # The parsed query object is itself the filter predicate.
        matching = sorted(filter(q, events), key=lambda e: e.start)
        response = {
            'status': 'success',
            'data': {
                'resultType': 'vector',
                'result': [e.serialize() for e in matching]
            }
        }
    except Exception:
        # Deliberately not BaseException: KeyboardInterrupt / SystemExit must
        # propagate so the server can shut down instead of answering 500.
        logging.exception('An internal error occurred')
        return _error_response('internal', 'An internal error occurred.', 500)

    return _json_response(response)