from typing import List import json from urllib.error import HTTPError import traceback import bottle from icalendar_timeseries_server.config import get_config from icalendar_timeseries_server.event import Metric from icalendar_timeseries_server.cal import get_calendar from icalendar_timeseries_server.query import MetricQuery @bottle.route('/api/v1/query') @bottle.route('/api/v1/query_range') def prometheus_api(): events: List[Metric] = [] try: q = MetricQuery(bottle.request.query['query']) except ValueError as e: response = { 'status': 'error', 'errorType': 'bad_data', 'error': str(e) } bottle.response.status = 400 traceback.print_exc() bottle.response.add_header('Content-Type', 'application/json') return json.dumps(response) try: for name in get_config().calendars.keys(): events.extend(get_calendar(name)) events = list(filter(q, events)) events.sort(key=lambda e: e.start) response = { 'status': 'success', 'data': { 'resultType': 'vector', 'result': [e.serialize() for e in events] } } except HTTPError as e: response = { 'status': 'error', 'errorType': 'internal', 'error': str(e) } bottle.response.status = 500 traceback.print_exc() except BaseException: response = { 'status': 'error', 'errorType': 'internal', 'error': 'An internal error occurred.' } bottle.response.status = 500 traceback.print_exc() bottle.response.add_header('Content-Type', 'application/json') return json.dumps(response)