122 lines
3.3 KiB
Python
122 lines
3.3 KiB
Python
|
#!/usr/bin/env python3
|
||
|
|
||
|
import json
|
||
|
import mailbox
|
||
|
import os
|
||
|
import subprocess
|
||
|
|
||
|
import deepdiff
|
||
|
import pgpy
|
||
|
|
||
|
|
||
|
def subscribermap(subs):
|
||
|
return {
|
||
|
s: pgpy.PGPKey.from_file(f'/tmp/{s}.asc')[0].fingerprint.replace(' ', '')
|
||
|
for s in subs
|
||
|
}
|
||
|
|
||
|
|
||
|
def keymap(subs):
|
||
|
return {
|
||
|
pgpy.PGPKey.from_file(f'/tmp/{s}.asc')[0].fingerprint.replace(' ', ''): s
|
||
|
for s in subs
|
||
|
}
|
||
|
|
||
|
# Test subscribers
|
||
|
|
||
|
subsp = subprocess.Popen(['/usr/bin/schleuder-cli', 'subscriptions', 'list', 'test-global@schleuder.example.org'],
|
||
|
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||
|
subsout, _ = subsp.communicate()
|
||
|
keysp = subprocess.Popen(['/usr/bin/schleuder-cli', 'keys', 'list', 'test-global@schleuder.example.org'],
|
||
|
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||
|
keysout, _ = keysp.communicate()
|
||
|
fprp = subprocess.Popen(['/usr/bin/schleuder-cli', 'lists', 'show', 'test-global@schleuder.example.org', 'fingerprint'],
|
||
|
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||
|
fprout, _ = fprp.communicate()
|
||
|
|
||
|
expected_subscribers = subscribermap([
|
||
|
'aaron.example@example.org',
|
||
|
'admin@example.org',
|
||
|
'alex.example@example.org',
|
||
|
'amy.example@example.org',
|
||
|
'andy.example@example.org',
|
||
|
'anna.example@example.org',
|
||
|
'anotherspammer@example.org'
|
||
|
])
|
||
|
|
||
|
actual_subscribers = {}
|
||
|
for s in subsout.decode().splitlines():
|
||
|
if '\t' not in s:
|
||
|
continue
|
||
|
uid, fpr, *_ = s.split('\t', 2)
|
||
|
actual_subscribers[uid] = fpr
|
||
|
|
||
|
subsdiff = deepdiff.DeepDiff(expected_subscribers, actual_subscribers)
|
||
|
if len(subsdiff) > 0:
|
||
|
print(subsdiff)
|
||
|
exit(1)
|
||
|
|
||
|
# Test keys
|
||
|
|
||
|
expected_keys = keymap([
|
||
|
'aaron.example@example.org',
|
||
|
'admin@example.org',
|
||
|
'alex.example@example.org',
|
||
|
'amy.example@example.org',
|
||
|
'andy.example@example.org',
|
||
|
'anna.example@example.org',
|
||
|
'anotherspammer@example.org'
|
||
|
])
|
||
|
expected_keys[fprout.strip().decode()] = 'test-global@schleuder.example.org'
|
||
|
|
||
|
actual_keys = {}
|
||
|
for s in keysout.decode().splitlines():
|
||
|
if ' ' not in s:
|
||
|
continue
|
||
|
fpr, uid = s.split(' ', 1)
|
||
|
actual_keys[fpr] = uid
|
||
|
|
||
|
keysdiff = deepdiff.DeepDiff(expected_keys, actual_keys)
|
||
|
if len(keysdiff) > 0:
|
||
|
print(keysdiff)
|
||
|
exit(1)
|
||
|
|
||
|
# Test mbox
|
||
|
|
||
|
mbox = mailbox.mbox('/var/spool/mail/root')
|
||
|
if len(mbox) != 1:
|
||
|
print(f'Expected 1 message in mbox, got {len(mbox)}')
|
||
|
exit(1)
|
||
|
_, msg = mbox.popitem()
|
||
|
mbox.close()
|
||
|
digest = msg['X-MultiSchleuder-Digest'].strip()
|
||
|
|
||
|
if msg['From'] != 'test-global-owner@schleuder.example.org':
|
||
|
print(f'Expected "From: test-global-owner@schleuder.example.org", got {msg["From"]}')
|
||
|
exit(1)
|
||
|
if msg['To'] != 'andy.example@example.org':
|
||
|
print(f'Expected "To: andy.example@example.org", got {msg["To"]}')
|
||
|
exit(1)
|
||
|
if msg['Auto-Submitted'] != 'auto-generated':
|
||
|
print(f'Expected "Auto-Submitted: auto-generated", got {msg["Auto-Submitted"]}')
|
||
|
exit(1)
|
||
|
if msg['Precedence'] != 'list':
|
||
|
print(f'Expected "Precedence: list", got {msg["Precedence"]}')
|
||
|
exit(1)
|
||
|
|
||
|
|
||
|
# Test conflict statefile
|
||
|
|
||
|
with open('/tmp/conflict.json', 'r') as f:
|
||
|
conflict = json.load(f)
|
||
|
if len(conflict) != 1:
|
||
|
print('Expected 1 entry in conflict statefile, got:')
|
||
|
print(json.dumps(conflict))
|
||
|
exit(1)
|
||
|
if digest not in conflict:
|
||
|
print(f'Expected key "{digest}" in conflict statefile, got:')
|
||
|
print(json.dumps(conflict))
|
||
|
exit(1)
|
||
|
|
||
|
print('All checks passed!')
|