from typing import List
import json
import logging

import bottle

from icalendar_timeseries_server.config import get_config
from icalendar_timeseries_server.event import Metric
from icalendar_timeseries_server.cal import get_calendar
from icalendar_timeseries_server.query import MetricQuery


def _json_response(payload: dict) -> str:
    """Mark the current bottle response as JSON and return the encoded body."""
    bottle.response.add_header('Content-Type', 'application/json')
    return json.dumps(payload)


@bottle.route('/api/v1/query')
@bottle.route('/api/v1/query_range')
def prometheus_api():
    """Handle the ``/api/v1/query`` and ``/api/v1/query_range`` routes.

    Reads the ``query`` request parameter, builds a ``MetricQuery`` from it,
    collects the entries of every configured calendar, keeps those accepted
    by the query, sorts them by their ``start`` attribute and returns them
    serialized as a JSON document.

    Returns:
        A JSON string. On success it has ``status`` ``'success'`` and a
        ``data`` object with ``resultType`` ``'vector'``. If the ``query``
        parameter is missing or ``MetricQuery`` rejects it with a
        ``ValueError``, the HTTP status is set to 400 and the body carries
        ``errorType`` ``'bad_data'``. Any other exception raised while
        gathering or serializing events results in HTTP status 500 with
        ``errorType`` ``'internal'``.
    """
    # Use .get() so that a missing parameter is reported as a client error
    # in the same JSON format as a malformed query, instead of escaping the
    # handler as an uncaught KeyError.
    raw_query = bottle.request.query.get('query')
    if raw_query is None:
        bottle.response.status = 400
        logging.error('Missing "query" parameter')
        return _json_response({
            'status': 'error',
            'errorType': 'bad_data',
            'error': 'Missing required parameter "query"'
        })

    try:
        q = MetricQuery(raw_query)
    except ValueError as e:
        response = {
            'status': 'error',
            'errorType': 'bad_data',
            'error': str(e)
        }
        bottle.response.status = 400
        logging.exception('Cannot parse PromQL query')
        return _json_response(response)

    events: List[Metric] = []
    try:
        for name in get_config().calendars.keys():
            events.extend(get_calendar(name))
        # The query object is used directly as the filter predicate.
        events = list(filter(q, events))
        events.sort(key=lambda e: e.start)
        response = {
            'status': 'success',
            'data': {
                'resultType': 'vector',
                'result': [e.serialize() for e in events]
            }
        }
    except Exception:
        # Deliberately not BaseException: KeyboardInterrupt and SystemExit
        # must propagate so the server can shut down instead of being
        # converted into a 500 response.
        response = {
            'status': 'error',
            'errorType': 'internal',
            'error': 'An internal error occurred.'
        }
        bottle.response.status = 500
        logging.exception('An internal error occurred')

    return _json_response(response)