prometheus-tlsrpt-exporter/tlsrpt_exporter/__main__.py
s3lph 38fe3a49c1
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
feat: add support for application/tlsrpt+gzip
2023-07-24 01:49:40 +02:00

182 lines
6.8 KiB
Python

import argparse
import dataclasses
import gzip
import json
import os
import zlib
from datetime import datetime

import bottle
import jinja2
def unzip(xs):
    """Transpose an iterable of sequences into a list of tuples.

    The i-th tuple of the result holds the i-th item of every element of
    ``xs``; as with ``zip``, the result is truncated to the shortest element.
    An empty ``xs`` yields an empty list.
    """
    columns = zip(*xs)
    return [*columns]
class Reporter(bottle.Bottle):
    """Bottle sub-application that accepts TLSRPT reports via HTTP POST.

    Accepted reports are written as JSON files below ``<data_dir>/reports``
    and their per-policy session counts are forwarded to ``metrics``.
    The file name encodes ``<unix-ts>+<s,f;s,f;...>+<domain>+<report-id>``.
    """

    # Characters that would let an untrusted value escape the reports
    # directory (or terminate the path) when used as a file name component.
    _PATH_CHARS = '/\\\0'

    def __init__(self, metrics, ns):
        """Set up the receiver.

        :param metrics: Object providing ``add_report(domain, count_s, count_f)``.
        :param ns: Parsed command line namespace; only ``data_dir`` is read.
        """
        super().__init__()
        self.datadir = ns.data_dir
        self.metrics = metrics
        self.route('/', 'POST', self.report)

    @staticmethod
    def _is_safe_name_part(value, forbidden):
        """Return True if ``value`` is a non-empty str without ``forbidden`` chars."""
        return (
            isinstance(value, str)
            and bool(value)
            and not any(ch in value for ch in forbidden)
        )

    def handle(self, data):
        """Validate a decoded report, persist it, and update the metrics.

        Reports that are structurally unusable (not a JSON object, no
        ``report-id``, no policies, no policy domain, non-numeric counts, or
        identifiers that are unsafe as file name components) are silently
        ignored.

        :param data: The decoded JSON document of the report.
        """
        print(data)
        if not isinstance(data, dict) or 'report-id' not in data:
            return
        report_id = data['report-id']
        # The report id is the last file name field, so it may contain '+',
        # but it comes from the network and must not contain path separators.
        if not self._is_safe_name_part(report_id, self._PATH_CHARS):
            return
        policies = data.get('policies', [])
        if not isinstance(policies, list) or not policies:
            return

        entries = []
        try:
            for p in policies:
                policy_domain = p.get('policy', {}).get('policy-domain')
                summary = p.get('summary', {})
                # Coerce to int: the counts end up in the file name, which is
                # parsed back with int() when the reports are reloaded.
                count_s = int(summary.get('total-successful-session-count', 0))
                count_f = int(summary.get('total-failure-session-count', 0))
                entries.append((policy_domain, count_s, count_f))
        except (AttributeError, TypeError, ValueError, OverflowError):
            # Malformed policy entry; ignore the whole report.
            return

        # The first usable policy domain names the report file.
        domain = next((d for d, _, _ in entries if isinstance(d, str)), None)
        # '+' separates the file name fields, so it must not occur in the domain.
        if domain is None or not self._is_safe_name_part(domain, self._PATH_CHARS + '+'):
            return

        # datetime.now().timestamp() yields the real epoch time; calling
        # .timestamp() on the naive result of utcnow() would interpret the
        # UTC wall clock as local time and shift the value by the UTC offset.
        timestamp = int(datetime.now().timestamp())
        stats = ';'.join(f'{count_s},{count_f}' for _, count_s, count_f in entries)
        fname = os.path.join(
            self.datadir, 'reports', f'{timestamp}+{stats}+{domain}+{report_id}'
        )
        with open(fname, 'w') as f:
            json.dump(data, f)

        # Only count the report once it has been stored, so the in-memory
        # metrics match what is rebuilt from the files after a restart.
        for policy_domain, count_s, count_f in entries:
            key = policy_domain if isinstance(policy_domain, str) else domain
            self.metrics.add_report(key, count_s, count_f)

    def report(self):
        """HTTP handler: decode the request body and pass it to ``handle``.

        Responds with status 406 for unsupported content types and 400 for
        bodies that cannot be decompressed or parsed as JSON.
        """
        bottle.response.add_header('Accept', 'application/tlsrpt+gzip')
        bottle.response.add_header('Accept', 'application/tlsrpt+json')
        content_type = bottle.request.content_type
        try:
            if 'application/tlsrpt+gzip' in content_type:
                with gzip.open(bottle.request.body) as stream:
                    data = json.load(stream)
            elif 'application/tlsrpt+json' in content_type:
                data = json.load(bottle.request.body)
            else:
                bottle.response.status = 406
                return
        except (ValueError, OSError, EOFError, zlib.error):
            # Invalid JSON / encoding (ValueError), bad gzip container
            # (OSError), truncated stream (EOFError) or corrupt deflate data.
            bottle.response.status = 400
            return
        self.handle(data)
class Metrics(bottle.Bottle):
    """Bottle sub-application exposing per-domain counters for Prometheus.

    Counters are kept in memory and rebuilt at start-up from the file names
    in ``<data_dir>/reports`` (``<ts>+<s,f;s,f;...>+<domain>+<report-id>``).
    """

    def __init__(self, ns):
        """Load the stored reports and register the ``GET /`` route.

        :param ns: Parsed command line namespace; only ``data_dir`` is read.
        """
        super().__init__()
        self.datadir = ns.data_dir
        self.count = {}    # domain -> number of add_report() calls
        self.count_s = {}  # domain -> sum of successful session counts
        self.count_f = {}  # domain -> sum of failed session counts
        self.load_reports()
        self.route('/', 'GET', self.metrics)

    def add_report(self, domain, count_s, count_f):
        """Add one policy result for ``domain`` to the counters.

        :param domain: Key (label value) the counts are accounted to.
        :param count_s: Number of successful sessions to add.
        :param count_f: Number of failed sessions to add.
        """
        self.count_s[domain] = self.count_s.get(domain, 0) + count_s
        self.count_f[domain] = self.count_f.get(domain, 0) + count_f
        self.count[domain] = self.count.get(domain, 0) + 1

    def load_reports(self):
        """Rebuild the counters from the file names of the stored reports.

        Hidden files and files whose name does not follow the expected
        format are skipped instead of aborting start-up.
        """
        for report in os.listdir(os.path.join(self.datadir, 'reports')):
            if report.startswith('.'):
                continue
            try:
                _, stats, domain, _ = report.split('+', 3)
                # Parse everything first so a bad entry cannot leave the
                # counters half-updated for this file.
                parsed = [
                    (int(s), int(f))
                    for s, f in (p.split(',') for p in stats.split(';'))
                ]
            except ValueError:
                continue
            for s, f in parsed:
                self.add_report(domain, s, f)

    @staticmethod
    def _escape_label(value):
        """Escape ``value`` for use inside a quoted exposition-format label."""
        return (
            str(value)
            .replace('\\', '\\\\')
            .replace('"', '\\"')
            .replace('\n', '\\n')
        )

    def metrics(self):
        """HTTP handler: render all counters in the Prometheus text format."""
        bottle.response.headers['Content-Type'] = 'text/plain; version=0.0.4'
        families = (
            ('tlsrpt_successful', 'Number of successful sessions', self.count_s),
            ('tlsrpt_failed', 'Number of failed sessions', self.count_f),
            ('tlsrpt_count', 'Number of reports', self.count),
        )
        lines = []
        for name, help_text, values in families:
            lines.append(f'# TYPE {name} counter')
            lines.append(f'# HELP {name} {help_text}')
            for domain, count in values.items():
                # Domains originate from received reports; escape them so a
                # crafted value cannot break out of the label and inject lines.
                label = self._escape_label(domain)
                lines.append(f'{name}{{domain="{label}"}} {count}')
        return ''.join(line + '\n' for line in lines)
@dataclasses.dataclass
class ReportInfo:
    # Summary of one stored report, as decoded from its file name by
    # UI.list_reports.

    # Time parsed from the timestamp field of the file name.
    ts: datetime
    # One (successful, failed) session count pair per stats entry.
    stats: list[tuple[int, int]]
    # Report id field of the file name (everything after the third '+').
    report_id: str
    # Complete file name inside the reports directory.
    filename: str
class UI(bottle.Bottle):
    """Bottle sub-application rendering the stored reports as HTML pages.

    Reports are read from ``<data_dir>/reports``; their file names follow
    ``<unix-ts>+<s,f;s,f;...>+<domain>+<report-id>``.
    """

    def __init__(self, ns):
        """Register the routes and set up the template environment.

        :param ns: Parsed command line namespace; ``data_dir``, ``base_url``,
            ``host``, ``port`` and ``template_dir`` are read.
        """
        super().__init__()
        self.datadir = ns.data_dir
        if ns.base_url:
            self.baseurl = ns.base_url
        else:
            host = ns.host
            # IPv6 literals (such as the default '::') must be bracketed
            # inside a URL, otherwise the port separator is ambiguous.
            if ':' in host and not host.startswith('['):
                host = f'[{host}]'
            self.baseurl = f'//{host}:{ns.port}'
        self.route('/', 'GET', self.list_reports)
        self.route('/report/<report_id>', 'GET', self.report_details)
        self.route('/report/<report_id>/json', 'GET', self.report_download)
        self.route('/style.css', 'GET', self.style)
        # NOTE(review): autoescaping is left disabled as before. The templates
        # render data taken from received reports, so unless they escape every
        # value themselves this permits HTML injection - confirm against the
        # templates and consider enabling autoescape for HTML templates.
        self.env = jinja2.Environment(
            loader=jinja2.FileSystemLoader([ns.template_dir])
        )
        self.env.filters['unzip'] = unzip

    def style(self):
        """HTTP handler: serve the ``style.css`` template as a stylesheet."""
        bottle.response.headers['Content-Type'] = 'text/css'
        return self.env.get_template('style.css').render()

    def list_reports(self):
        """HTTP handler: render the index of all stored reports per domain.

        Hidden files and files whose name does not follow the expected
        format are skipped.
        """
        reports = {}
        for report in os.listdir(os.path.join(self.datadir, 'reports')):
            if report.startswith('.'):
                continue
            try:
                ts_field, stats_field, domain, report_id = report.split('+', 3)
                ts = datetime.fromtimestamp(int(float(ts_field)))
                stats = [
                    (int(s), int(f))
                    for s, f in (p.split(',', 1) for p in stats_field.split(';'))
                ]
            except (ValueError, OverflowError, OSError):
                # Not a report file written in the expected name format.
                continue
            reports.setdefault(domain, []).append(
                ReportInfo(ts, stats, report_id, report)
            )
        for entries in reports.values():
            entries.sort(key=lambda info: info.ts)
        return self.env.get_template('index.html').render(baseurl=self.baseurl, reports=reports)

    def get_report(self, report_id: str):
        """Return the raw JSON text of the report with the given id.

        :param report_id: Report id as found in the last file name field.
        :returns: The file content, or ``None`` if no stored report matches.
        """
        reports_dir = os.path.join(self.datadir, 'reports')
        for name in os.listdir(reports_dir):
            tokens = name.split('+', 3)
            if len(tokens) != 4 or tokens[3] != report_id:
                continue
            # Stop at the first match instead of reading every duplicate.
            with open(os.path.join(reports_dir, name), 'r') as f:
                return f.read()
        return None

    def report_details(self, report_id: str):
        """HTTP handler: render the detail page of one report (404 if unknown)."""
        report = self.get_report(report_id)
        if report is None:
            bottle.abort(404, 'Report not found')
        return self.env.get_template('report.html').render(baseurl=self.baseurl, report=json.loads(report), json=json)

    def report_download(self, report_id: str):
        """HTTP handler: send the stored JSON of one report (404 if unknown)."""
        report = self.get_report(report_id)
        if report is None:
            bottle.abort(404, 'Report not found')
        bottle.response.headers['Content-Type'] = 'application/tlsrpt+json'
        bottle.response.headers['Content-Disposition'] = f'attachment; filename={report_id}.json'
        return report
def main():
    """Parse the command line, wire up the sub-applications and serve them.

    Ensures ``<data-dir>/reports`` exists, then mounts the report receiver
    at ``/report``, the metrics endpoint at ``/metrics`` and the web UI at
    ``/ui`` before starting the bottle server on the requested host and port.
    """
    parser = argparse.ArgumentParser(prog='prometheus-tlsrpt-exporter')
    options = (
        (('--host', '-H'),
         dict(type=str, default='::', help='Address to bind to')),
        (('--port', '-p'),
         dict(type=int, default=9123, help='Port to bind to')),
        (('--base-url', '-B'),
         dict(type=str, default=None, help='Base URL')),
        (('--data-dir', '-d'),
         dict(type=str, default='/tmp/prometheus-tlsrpt-exporter',
              help='Directory to save reports to')),
        (('--template-dir', '-t'),
         dict(type=str, default='templates',
              help='Directory to load templates from')),
    )
    for flags, settings in options:
        parser.add_argument(*flags, **settings)
    args = parser.parse_args()

    # The reports directory must exist before Metrics scans it.
    reports_dir = os.path.join(args.data_dir, 'reports')
    os.makedirs(reports_dir, exist_ok=True)

    collector = Metrics(args)
    bottle.mount('/report', Reporter(collector, args))
    bottle.mount('/metrics', collector)
    bottle.mount('/ui', UI(args))
    bottle.run(host=args.host, port=args.port)
# Start the exporter only when this module is executed as a script.
if __name__ == '__main__':
    main()