from typing import Dict, Optional
import re
import logging

# Label matching operators accepted inside a selector's "{...}" section.
LABEL_MATCH_OPERATORS = ['=', '!=', '=~', '!~']

# Syntax check applied to both metric names and label names.
_NAME_PATTERN = re.compile('^[a-zA-Z_:][a-zA-Z0-9_:]*$')


class Metric:
    """A named metric together with its label set.

    Labels can be read with item access (``metric['job']``) or attribute
    access (``metric.job``); both return ``None`` for a label that is not set.
    """

    def __init__(self, name: str, labels: Dict[str, str]):
        """
        :param name: The metric name.
        :param labels: Mapping of label names to label values; it is copied.
        """
        self._name: str = name
        # Copy the mapping so later changes by the caller do not leak in.
        self._labels: Dict[str, str] = dict(labels.items())

    @property
    def name(self) -> str:
        """The metric name."""
        return self._name

    def keys(self):
        """Return a view of the label names set on this metric."""
        return self._labels.keys()

    def __getitem__(self, item: str) -> Optional[str]:
        """Return the value of label ``item``, or ``None`` if it is not set."""
        return self._labels.get(item, None)

    def __getattr__(self, item: str) -> Optional[str]:
        """Return the value of label ``item``, or ``None`` if it is not set.

        Only invoked when normal attribute lookup fails.

        :raises AttributeError: For ``_labels`` itself and for dunder names.
        """
        # Without this guard, looking up ``_labels`` on an instance that has
        # not been initialised yet (copy/pickle create such instances) would
        # re-enter __getattr__ forever. Dunder names are protocol probes
        # (e.g. ``__setstate__``) and must report "missing" rather than None.
        if item == '_labels' or (item.startswith('__') and item.endswith('__')):
            raise AttributeError(item)
        return self._labels.get(item, None)


class LabelFilter:
    """A single label matcher: ``<label> <op> "<value>"``."""

    def __init__(self, op: str, label: str, value: str):
        """
        :param op: One of LABEL_MATCH_OPERATORS.
        :param label: Name of the label to test.
        :param value: Literal value ('=' / '!=') or regular expression
            ('=~' / '!~') to compare the label value against.
        :raises ValueError: If ``op`` is not a supported operator.
        """
        if op not in LABEL_MATCH_OPERATORS:
            raise ValueError(f'Invalid operator: {op}')
        self._op = op
        self._label = label
        self._value = value

    def __call__(self, metric: Metric) -> bool:
        """Test the given metric against this matcher.

        :param metric: The metric to test.
        :return: True if the metric's label satisfies the matcher.
        """
        # NOTE(review): a metric lacking the label never matches, even for
        # '!=' and '!~'. Prometheus treats a missing label like an empty
        # value - confirm whether this difference is intended.
        if self._label not in metric.keys():
            return False
        actual = metric[self._label]
        if self._op == '=':
            return actual == self._value
        if self._op == '!=':
            return actual != self._value
        # Regex matchers are fully anchored: the pattern has to match the
        # whole label value, not just a prefix of it.
        if self._op == '=~':
            return re.fullmatch(self._value, actual) is not None
        if self._op == '!~':
            return re.fullmatch(self._value, actual) is None
        # Not reachable for operators accepted by __init__.
        return False


class MetricQuery:
    """A parsed selector of the form ``name{label<op>"value", ...}``.

    Instances are callable and report whether a Metric is selected.
    """

    def __init__(self, q: str) -> None:
        """
        :param q: The query string to parse.
        :raises ValueError: If the query string is malformed.
        """
        self._filters = []
        # Set up front so `name` and `__call__` also work for the
        # connection-test query below, which skips parsing.
        self._metric_name = ''
        # The query '1+1' is used by Grafana to test the connection.
        if q == "1+1":
            return
        self.__parse(q)

    def __parse(self, q: str):
        """Parse the query string into a metric name and a list of filters.

        A character-level state machine: the metric name is read up to "{",
        then each filter is read as label name, operator and quoted value,
        until an unquoted "}" ends the selector.

        :param q: The query string to parse.
        :raises ValueError: If the query string is malformed.
        """
        logging.debug('Parsing PromQL query string: %s', q)

        # Top-level parser states.
        parse_name, parse_filters, parse_done = 0, 1, 2
        # States within a single filter.
        in_label, in_operator, in_value = 0, 1, 2

        globalstate = parse_name
        filterstate = in_label
        self._metric_name = ''
        metric_name = ''
        # Buffers for the filter currently being parsed.
        cfiltername = ''
        coperator = ''
        cfiltervalue = ''
        # Whether currently inside a quoted string.
        quoted = False
        # Whether the previous character started an escape sequence.
        escaping = False

        i = 0
        while i < len(q):
            c = q[i]
            i += 1
            if globalstate == parse_name:
                # Everything up to "{" belongs to the metric name.
                if c == '{':
                    globalstate = parse_filters
                else:
                    metric_name += c
            elif globalstate == parse_filters:
                if filterstate == in_label:
                    # A character that is not accepted is looked at again
                    # in the next state (hence the index rewind).
                    if c in '_: 0123456789 ' or ('a' <= c <= 'z') or ('A' <= c <= 'Z'):
                        cfiltername += c
                    else:
                        filterstate = in_operator
                        i -= 1
                elif filterstate == in_operator:
                    if c in '!~= ':
                        coperator += c
                    else:
                        filterstate = in_value
                        i -= 1
                elif quoted:
                    # Inside the quoted value: a backslash makes the next
                    # character literal, an unescaped quote ends the string.
                    if escaping:
                        escaping = False
                        cfiltervalue += c
                    elif c == '"':
                        quoted = False
                    elif c == '\\':
                        escaping = True
                    else:
                        cfiltervalue += c
                elif c.strip() == '':
                    # Whitespace around the quoted value is ignored.
                    continue
                elif c == '"':
                    quoted = True
                elif c == ',' or c == '}':
                    # Filter complete. Surrounding spaces were collected
                    # into the buffers, so strip before validating.
                    operator = coperator.strip()
                    label = cfiltername.strip()
                    if operator not in LABEL_MATCH_OPERATORS:
                        raise ValueError(f'Invalid operator: {operator}')
                    if not _NAME_PATTERN.match(label):
                        raise ValueError(f'Invalid label name: {label}')
                    self._filters.append(LabelFilter(operator, label, cfiltervalue))
                    # Reset the buffers for the next filter.
                    cfiltername = ''
                    coperator = ''
                    cfiltervalue = ''
                    filterstate = in_label
                    if c == '}':
                        globalstate = parse_done
                else:
                    raise ValueError(f'Unexpected character in filter value: {c}')
            else:
                # After the closing "}" only whitespace may follow.
                if c.strip() == '':
                    continue
                raise ValueError('Garbage after metric query')

        # Match metric name against the regex from the Prometheus documentation
        metric_name = metric_name.strip()
        if not _NAME_PATTERN.match(metric_name):
            raise ValueError(f'Invalid metric name: {metric_name}')
        if escaping:
            raise ValueError('Expected escape sequence, got EOF')
        elif quoted:
            raise ValueError('Expected \'\"\', got EOF')
        elif globalstate == parse_filters:
            raise ValueError('Expected \'}\', got EOF')
        elif filterstate != in_label:
            raise ValueError('Unexpected EOF')
        # Store the stripped name, so that it is the same string that was
        # validated and compares equal to the metric's actual name.
        self._metric_name = metric_name

    def __call__(self, metric: Metric) -> bool:
        """
        Applies the filter deduced from the query string to the given metric.

        :param metric: The metric to apply the filter to.
        :return: True if the filter matches, False otherwise.
        """
        # Never match a metric with a different name
        if metric.name != self._metric_name:
            return False
        # All label filters have to match
        return all(f(metric) for f in self._filters)

    @property
    def name(self) -> str:
        """The metric name the query selects ('' for the '1+1' test query)."""
        return self._metric_name