Initial commit

This commit is contained in:
s3lph 2022-08-19 23:06:05 +02:00
commit 8672128dd3
3 changed files with 161 additions and 0 deletions

38
README.md Normal file
View file

@ -0,0 +1,38 @@
# Prometheus to InfluxDB Pushgateway
## Configuration
```yaml
---
influxdb:
url: http://influx.example.org:8086
user: changeme
pass: changeme
prometheus:
url: http://prometheus.example.org:9090
queries:
- prom_query: 'max(irate(node_network_receive_bytes_total{job="node",device="em2"}[5m]) * 8) without (instance)'
influx_db: sensors
influx_labels:
location: Uplink
name: Downstream
influx_retention: rp_week
influx_series: network_traffic
- prom_query: 'max(irate(node_network_transmit_bytes_total{job="node",device="em2"}[5m]) * 8) without (instance)'
influx_db: sensors
influx_labels:
location: Uplink
name: Upstream
influx_retention: rp_week
influx_series: network_traffic
```
## Run
Put e.g. the following into a cronjob like this:
```cron
* * * * * nobody /usr/bin/python3 prometheus2influxdb.py /path/to/config.yaml
```

24
config.yaml Normal file
View file

@ -0,0 +1,24 @@
---
influxdb:
url: http://influx.example.org:8086
user: changeme
pass: changeme
prometheus:
url: http://prometheus.example.org:9090
queries:
- prom_query: 'max(irate(node_network_receive_bytes_total{job="node",device="em2"}[5m]) * 8) without (instance)'
influx_db: sensors
influx_labels:
location: Uplink
name: Downstream
influx_retention: rp_week
influx_series: network_traffic
- prom_query: 'max(irate(node_network_transmit_bytes_total{job="node",device="em2"}[5m]) * 8) without (instance)'
influx_db: sensors
influx_labels:
location: Uplink
name: Upstream
influx_retention: rp_week
influx_series: network_traffic

99
prometheus2influxdb.py Normal file
View file

@ -0,0 +1,99 @@
#!/usr/bin/python3
import json
import sys
from urllib import request, parse
import yaml
def _get_unicode(data, force=False):
'''
This function is taken as-is from
https://github.com/AxiomExergy/influx-client/blob/master/influx/line_protocol.py
Licensed by Axiom Exergy under the terms of the Apache License 2.0
'''
"""Try to return a text aka unicode object from the given data."""
if isinstance(data, bytes):
return data.decode('utf-8')
elif data is None:
return ''
elif force:
return str(data)
else:
return data
def _escape_tag(tag):
'''
This function is taken as-is from
https://github.com/AxiomExergy/influx-client/blob/master/influx/line_protocol.py
Licensed by Axiom Exergy under the terms of the Apache License 2.0
'''
tag = _get_unicode(tag, force=True)
return tag.replace(
"\\", "\\\\"
).replace(
" ", "\\ "
).replace(
",", "\\,"
).replace(
"=", "\\="
)
def _escape_tag_value(value):
'''
This function is taken as-is from
https://github.com/AxiomExergy/influx-client/blob/master/influx/line_protocol.py
Licensed by Axiom Exergy under the terms of the Apache License 2.0
'''
ret = _escape_tag(value)
if ret.endswith('\\'):
ret += ' '
return ret
def run_query(query, config):
encoded = parse.quote(query['prom_query'])
url = config['prometheus']['url'] + '/api/v1/query?query=' + encoded
res = request.urlopen(url)
response = json.loads(res.read().decode())
if 'status' not in response or 'data' not in response:
raise ValueError()
if response['status'] != 'success':
raise ValueError()
if response['data']['resultType'] != 'vector':
raise ValueError()
lines = []
for result in response['data']['result']:
time = int(result['value'][0])
value = float(result['value'][1])
line = _escape_tag(query['influx_series'])
for lkey, lvalue in query['influx_labels'].items():
lkey = _escape_tag(lkey)
lvalue = _escape_tag_value(lvalue)
line += f',{lkey}={lvalue}'
line += f' value={_escape_tag_value(value)} {time}'
lines.append(line)
ib = config['influxdb']['url'] + '/write'
user = parse.quote(config['influxdb']['user'])
pw = parse.quote(config['influxdb']['pass'])
db = parse.quote(query['influx_db'])
rp = parse.quote(query['influx_retention'])
iq = f'u={user}&p={pw}&db={db}&rp={rp}&precision=s'
url = f'{ib}?{iq}'
body = '\n'.join(lines).encode()
request.urlopen(url, body)
if __name__ == '__main__':
with open(sys.argv[1], 'r') as c:
config = yaml.safe_load(c)
for query in config['queries']:
run_query(query, config)