icalendar-timeseries-server/icalendar_timeseries_server/query.py

191 lines
7 KiB
Python
Raw Permalink Normal View History

from typing import Dict
import re
import logging
LABEL_MATCH_OPERATORS = [
'=',
'!=',
'=~',
'!~'
]
class Metric:
def __init__(self, name: str, labels: Dict[str, str]):
self._name: str = name
self._labels: Dict[str, str] = dict()
for k, v in labels.items():
self._labels[k] = v
@property
def name(self) -> str:
return self._name
def keys(self):
return self._labels.keys()
def __getitem__(self, item: str) -> str:
return self._labels.get(item, None)
def __getattr__(self, item: str) -> str:
return self._labels.get(item, None)
class LabelFilter:
def __init__(self, op: str, label: str, value: str):
if op not in LABEL_MATCH_OPERATORS:
raise ValueError()
self._op = op
self._label = label
self._value = value
def __call__(self, metric: Metric) -> bool:
if self._label not in metric.keys():
return False
if self._op == '=':
return metric[self._label] == self._value
elif self._op == '!=':
return metric[self._label] != self._value
elif self._op == '=~':
return re.match(self._value, metric[self._label]) is not None
elif self._op == '!~':
return re.match(self._value, metric[self._label]) is None
class MetricQuery:
def __init__(self, q: str) -> None:
self._filters = []
# The query '1+1' is used by Grafana to test the connection.
if q == "1+1":
return
self.__parse(q)
def __parse(self, q: str):
logging.debug(f'Parsing PromQL query string: {q}')
# globalstate:
# 0 = parsing metric name
# 1 = parsing filters
# 2 = fully parsed query
globalstate = 0
self._metric_name = ''
# buffer for the name of the filter that is being parsed
cfiltername = ''
# buffer for the filter operator that is being parsed
coperator = ''
# buffer for the filter value that is being parsed
cfiltervalue = ''
# filterstate:
# 0 = parsing label name
# 1 = parsing operator
# 2 = parsing filter value
filterstate = 0
# Whether currently inside a quoted string
quoted = False
# Whether the last character started an escape sequence
escaping = False
# query string index
i = 0
while i < len(q):
c = q[i]
i += 1
if globalstate == 0:
# Parsing the metric name: append the character to the name buffer,
# or move on to the filter parsing step when "{" is encountered.
if c == '{':
globalstate = 1
else:
self._metric_name += c
elif globalstate == 1:
# Parsing the query's filters, until an unescaped "}" is encountered.
if filterstate == 0:
# Parsing the filter's label name.
# When a character is not accepted, re-loop over it in the next mode.
if c in '_: 0123456789 ' or ('a' <= c <= 'z') or ('A' <= c <= 'Z'):
cfiltername += c
else:
filterstate = 1
i -= 1
elif filterstate == 1:
# Parsing the filter's operator, one of =, !=, =~, !~.
# When a character is not accepted, re-loop over it in the next mode.
if c in '!~= ':
coperator += c
else:
filterstate = 2
i -= 1
elif filterstate == 2:
# Parsing the filter's value.
# When a "," is encountered, start with the next filter. When a "}" is encountered, parsing is done.
if not quoted:
if c.strip() == '':
continue
if c == '"':
quoted = True
elif c == ',' or c == '}':
# Done parsing the filter. Validate label name and operator.
if coperator not in LABEL_MATCH_OPERATORS:
raise ValueError(f'Invalid operator: {coperator.strip()}')
if not re.match('^[a-zA-Z_:][a-zA-Z0-9_:]*$', cfiltername.strip()):
raise ValueError(f'Invalid label name: {cfiltername.strip()}')
self._filters.append(LabelFilter(coperator.strip(), cfiltername.strip(), cfiltervalue))
# Reset filter buffers
cfiltername = ''
coperator = ''
cfiltervalue = ''
filterstate = 0
if c == '}':
globalstate = 2
else:
raise ValueError()
else:
# Quoting and escaping
if c == '"' and not escaping:
quoted = False
else:
if escaping:
escaping = False
cfiltervalue += c
else:
if c == '\\':
escaping = True
else:
cfiltervalue += c
else:
if c.strip() == '':
# Trim remaining whitespace
continue
else:
raise ValueError('Garbage after metric query')
# Match metric name against the regex from the Prometheus documentation
if not re.match('^[a-zA-Z_:][a-zA-Z0-9_:]*$', self._metric_name.strip()):
raise ValueError(f'Invalid metric name: {self._metric_name.strip()}')
if escaping:
raise ValueError('Expected escape sequence, got EOF')
elif quoted:
raise ValueError('Expected \'\"\', got EOF')
elif globalstate == 1:
raise ValueError('Expected \'}\', got EOF')
elif filterstate != 0:
raise ValueError('Unexpected EOF')
def __call__(self, metric: Metric):
"""
Applies the filter deducted from the query string to the given metric.
:param metric: The metric to apply the filter to.
:return: True if the filter matches, False otherwise.
"""
# Never match a metric with a different name
if metric.name != self._metric_name:
return False
# Iterate the single filters, return False if even one does not match
for f in self._filters:
if not f(metric):
return False
# Return True if all filters matched
return True