"""Collect TLSRPT reports and expose them as Prometheus metrics.

Three bottle applications are mounted by :func:`main`:

* ``/report``  -- accepts POSTed reports and stores them on disk,
* ``/metrics`` -- per-domain counters in the Prometheus text format,
* ``/ui``      -- an HTML front end for browsing the stored reports.

Stored reports are named ``<timestamp>+<stats>+<domain>+<report-id>`` where
``<stats>`` is a ``;``-separated list of ``<successful>,<failed>`` pairs, one
pair per policy in the report.
"""

import argparse
import os
import dataclasses
import gzip
import json
from datetime import datetime

import bottle
import jinja2


# Characters that must never end up in a stored report's file name because
# they would let a submitted value escape the reports directory.
_FORBIDDEN_NAME_CHARS = ('/', '\\', '\0')


def unzip(xs):
    """Transpose an iterable of tuples; registered as a jinja2 filter."""
    return list(zip(*xs))


def _is_safe_name_part(value):
    """Return True if *value* is a string usable inside a report file name."""
    return isinstance(value, str) and not any(
        ch in value for ch in _FORBIDDEN_NAME_CHARS
    )


def _is_count(value):
    """Return True if *value* is a non-negative integer (booleans excluded)."""
    return (
        isinstance(value, int)
        and not isinstance(value, bool)
        and value >= 0
    )


def _policy_entry(policy):
    """Extract ``(domain, successful, failed)`` from one policy object.

    Returns ``None`` when the policy does not have the expected structure,
    so that callers can reject the report instead of crashing on it.
    """
    if not isinstance(policy, dict):
        return None
    meta = policy.get('policy', {})
    summary = policy.get('summary', {})
    if not isinstance(meta, dict) or not isinstance(summary, dict):
        return None
    policy_domain = meta.get('policy-domain')
    if policy_domain is not None and not isinstance(policy_domain, str):
        return None
    count_s = summary.get('total-successful-session-count', 0)
    count_f = summary.get('total-failure-session-count', 0)
    if not (_is_count(count_s) and _is_count(count_f)):
        return None
    return policy_domain, count_s, count_f


def _parse_report_name(name):
    """Split a stored report's file name into its components.

    Returns ``(ts, stats, domain, report_id)`` where *ts* is a ``datetime``
    and *stats* a list of ``(successful, failed)`` integer pairs, or ``None``
    if *name* is a dotfile or does not follow the storage format.
    """
    if name.startswith('.'):
        return None
    parts = name.split('+', 3)
    if len(parts) != 4:
        return None
    raw_ts, raw_stats, domain, report_id = parts
    try:
        ts = datetime.fromtimestamp(int(float(raw_ts)))
        stats = []
        for pair in raw_stats.split(';'):
            successful, failed = pair.split(',')
            stats.append((int(successful), int(failed)))
    except (ValueError, OverflowError, OSError):
        return None
    return ts, stats, domain, report_id


def _escape_label(value):
    """Escape *value* for use as a Prometheus label value."""
    return (
        str(value)
        .replace('\\', '\\\\')
        .replace('"', '\\"')
        .replace('\n', '\\n')
    )


class Reporter(bottle.Bottle):
    """Bottle app that receives reports via POST and stores them on disk."""

    def __init__(self, metrics, ns):
        """Bind to the shared *metrics* app and the data dir from *ns*."""
        super().__init__()
        self.datadir = ns.data_dir
        self.metrics = metrics
        self.route('/', 'POST', self.report)

    def handle(self, data):
        """Update the metrics from a decoded report and persist it.

        Reports that lack a ``report-id`` or policies, that are structurally
        malformed, or whose identifiers could not be stored safely are
        silently dropped. Nothing is counted or written for a rejected
        report, so the metrics and the stored files cannot diverge because
        of bad input.
        """
        print(data)
        if not isinstance(data, dict) or 'report-id' not in data:
            return
        report_id = data['report-id']
        policies = data.get('policies') or []
        if not isinstance(policies, list) or not policies:
            return

        entries = [_policy_entry(policy) for policy in policies]
        if any(entry is None for entry in entries):
            return

        # The file is stored under the first policy domain that is present.
        domain = next(
            (entry[0] for entry in entries if entry[0] is not None), None
        )

        # Both values come from the network and become part of a file name:
        # refuse path separators, and '+' in the domain since it is the
        # field delimiter of the storage format.
        if not _is_safe_name_part(report_id):
            return
        if domain is not None and (
            not _is_safe_name_part(domain) or '+' in domain
        ):
            return

        for policy_domain, count_s, count_f in entries:
            self.metrics.add_report(policy_domain, count_s, count_f)
        if domain is None:
            return

        stats = ';'.join(f'{count_s},{count_f}' for _, count_s, count_f in entries)
        # datetime.now().timestamp() yields the real epoch time; calling
        # .timestamp() on utcnow() would treat the UTC value as local time.
        timestamp = int(datetime.now().timestamp())
        fname = os.path.join(
            self.datadir, 'reports', f'{timestamp}+{stats}+{domain}+{report_id}'
        )
        with open(fname, 'w', encoding='utf-8') as f:
            json.dump(data, f)

    def report(self):
        """Handle a POSTed report in gzip-compressed or plain JSON form.

        Responds with status 406 for any other content type and with 400
        when the body cannot be decompressed or parsed.
        """
        bottle.response.add_header('Accept', 'application/tlsrpt+gzip')
        bottle.response.add_header('Accept', 'application/tlsrpt+json')
        content_type = bottle.request.content_type
        try:
            if 'application/tlsrpt+gzip' in content_type:
                with gzip.open(bottle.request.body) as stream:
                    data = json.load(stream)
            elif 'application/tlsrpt+json' in content_type:
                data = json.load(bottle.request.body)
            else:
                bottle.response.status = 406
                return
        except (ValueError, OSError, EOFError):
            # Invalid JSON / encoding (ValueError) or a corrupt or truncated
            # gzip stream (OSError, EOFError).
            bottle.response.status = 400
            return
        self.handle(data)


class Metrics(bottle.Bottle):
    """Bottle app that serves per-domain counters in Prometheus text format."""

    def __init__(self, ns):
        """Initialise the counters from the reports already stored on disk."""
        super().__init__()
        self.datadir = ns.data_dir
        self.count = {}
        self.count_s = {}
        self.count_f = {}
        self.load_reports()
        self.route('/', 'GET', self.metrics)

    def add_report(self, domain, count_s, count_f):
        """Add one policy's session counts to the totals for *domain*."""
        self.count_s[domain] = self.count_s.get(domain, 0) + count_s
        self.count_f[domain] = self.count_f.get(domain, 0) + count_f
        self.count[domain] = self.count.get(domain, 0) + 1

    def load_reports(self):
        """Rebuild the counters from the file names of the stored reports.

        Files whose names do not follow the storage format are skipped.
        """
        for name in os.listdir(os.path.join(self.datadir, 'reports')):
            parsed = _parse_report_name(name)
            if parsed is None:
                continue
            _, stats, domain, _ = parsed
            for count_s, count_f in stats:
                self.add_report(domain, count_s, count_f)

    def metrics(self):
        """Render all counters in the Prometheus text exposition format."""
        bottle.response.headers['Content-Type'] = 'text/plain; version=0.0.4'
        families = (
            ('tlsrpt_successful', 'Number of successful sessions', self.count_s),
            ('tlsrpt_failed', 'Number of failed sessions', self.count_f),
            ('tlsrpt_count', 'Number of reports', self.count),
        )
        lines = []
        for name, help_text, values in families:
            lines.append(f'# TYPE {name} counter')
            lines.append(f'# HELP {name} {help_text}')
            for domain, count in values.items():
                # Domains originate from submitted reports; escape them so
                # they cannot break the exposition format.
                label = _escape_label(domain)
                lines.append(f'{name}{{domain="{label}"}} {count}')
        return '\n'.join(lines) + '\n'


@dataclasses.dataclass
class ReportInfo:
    """Summary of one stored report, derived from its file name."""

    ts: datetime  # time encoded in the file name
    stats: list[tuple[int, int]]  # (successful, failed) per policy
    report_id: str
    filename: str


class UI(bottle.Bottle):
    """Bottle app that renders stored reports through jinja2 templates."""

    def __init__(self, ns):
        """Register the UI routes and set up the template environment."""
        super().__init__()
        self.datadir = ns.data_dir
        self.baseurl = ns.base_url or f'//{ns.host}:{ns.port}'
        self.route('/', 'GET', self.list_reports)
        # Both handlers take a ``report_id`` argument, so the routes need
        # the matching wildcard to supply it.
        self.route('/report/<report_id>', 'GET', self.report_details)
        self.route('/report/<report_id>/json', 'GET', self.report_download)
        self.route('/style.css', 'GET', self.style)
        # NOTE(review): autoescaping is off, yet the templates receive data
        # taken from submitted reports. Confirm the templates escape it, or
        # enable ``autoescape`` here.
        self.env = jinja2.Environment(
            loader=jinja2.FileSystemLoader([ns.template_dir])
        )
        self.env.filters['unzip'] = unzip

    def style(self):
        """Serve the rendered ``style.css`` template."""
        bottle.response.headers['Content-Type'] = 'text/css'
        return self.env.get_template('style.css').render()

    def list_reports(self):
        """Render the index page: stored reports grouped by domain.

        Each domain's reports are sorted by timestamp. Files whose names do
        not follow the storage format are skipped.
        """
        reports = {}
        for name in os.listdir(os.path.join(self.datadir, 'reports')):
            parsed = _parse_report_name(name)
            if parsed is None:
                continue
            ts, stats, domain, report_id = parsed
            reports.setdefault(domain, []).append(
                ReportInfo(ts, stats, report_id, name)
            )
        for infos in reports.values():
            infos.sort(key=lambda info: info.ts)
        return self.env.get_template('index.html').render(
            baseurl=self.baseurl, reports=reports
        )

    def get_report(self, report_id: str):
        """Return the stored JSON text for *report_id*, or ``None``.

        The id is only compared against the names returned by
        ``os.listdir``; it is never used to build a path.
        """
        reports_dir = os.path.join(self.datadir, 'reports')
        for name in os.listdir(reports_dir):
            tokens = name.split('+', 3)
            if len(tokens) != 4 or tokens[3] != report_id:
                continue
            with open(os.path.join(reports_dir, name), 'r', encoding='utf-8') as f:
                return f.read()
        return None

    def report_details(self, report_id: str):
        """Render the detail page for one report; 404 if it is unknown."""
        report = self.get_report(report_id)
        if report is None:
            bottle.abort(404, 'Report not found')
        return self.env.get_template('report.html').render(
            baseurl=self.baseurl, report=json.loads(report), json=json
        )

    def report_download(self, report_id: str):
        """Return one report's raw JSON as a download; 404 if unknown."""
        report = self.get_report(report_id)
        if report is None:
            bottle.abort(404, 'Report not found')
        bottle.response.headers['Content-Type'] = 'application/tlsrpt+json'
        bottle.response.headers['Content-Disposition'] = f'attachment; filename={report_id}.json'
        return report


def main():
    """Parse the command line, mount the three apps and run the server."""
    ap = argparse.ArgumentParser('prometheus-tlsrpt-exporter')
    ap.add_argument('--host', '-H', type=str, default='::',
                    help='Address to bind to')
    ap.add_argument('--port', '-p', type=int, default=9123,
                    help='Port to bind to')
    ap.add_argument('--base-url', '-B', type=str, default=None,
                    help='Base URL')
    ap.add_argument('--data-dir', '-d', type=str,
                    default='/tmp/prometheus-tlsrpt-exporter',
                    help='Directory to save reports to')
    ap.add_argument('--template-dir', '-t', type=str, default='templates',
                    help='Directory to load templates from')
    ns = ap.parse_args()

    # The reports directory must exist before Metrics scans it.
    os.makedirs(os.path.join(ns.data_dir, 'reports'), exist_ok=True)

    metrics = Metrics(ns)
    bottle.mount('/report', Reporter(metrics, ns))
    bottle.mount('/metrics', metrics)
    bottle.mount('/ui', UI(ns))
    bottle.run(host=ns.host, port=ns.port)


if __name__ == '__main__':
    main()