multischleuder/test/report.py

206 lines
7.2 KiB
Python
Executable file

#!/usr/bin/env python3
import json
import mailbox
import os
import subprocess
import deepdiff
import pgpy
def subscribermap(subs):
    """Map each subscriber address to the fingerprint of its key file.

    For every address in ``subs`` the key file ``/tmp/<address>.asc`` is
    loaded with ``pgpy.PGPKey.from_file``; the fingerprint of the first
    element of the result is stored with all spaces removed.

    Returns a dict of ``{address: fingerprint}``.
    """
    fingerprints = {}
    for address in subs:
        loaded = pgpy.PGPKey.from_file(f'/tmp/{address}.asc')
        fingerprints[address] = loaded[0].fingerprint.replace(' ', '')
    return fingerprints
def keymap(subs):
    """Map the fingerprint of each address's key file back to the address.

    For every address in ``subs`` the key file ``/tmp/<address>.asc`` is
    loaded with ``pgpy.PGPKey.from_file``; the fingerprint of the first
    element of the result, with all spaces removed, becomes the dict key.
    If two addresses yield the same fingerprint, the later one wins.

    Returns a dict of ``{fingerprint: address}``.
    """
    owners = {}
    for address in subs:
        loaded = pgpy.PGPKey.from_file(f'/tmp/{address}.asc')
        owners[loaded[0].fingerprint.replace(' ', '')] = address
    return owners
# Test subscribers
_list_address = 'test-global@schleuder.example.org'


def _schleuder_cli(*args):
    """Run ``/usr/bin/schleuder-cli`` with ``args`` and return its raw stdout.

    stderr is captured and discarded; the exit status is not inspected.
    """
    finished = subprocess.run(
        ['/usr/bin/schleuder-cli', *args],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    return finished.stdout


# All three queries run up front; the key list and the list fingerprint are
# consumed by the key checks further down.
subsout = _schleuder_cli('subscriptions', 'list', _list_address)
keysout = _schleuder_cli('keys', 'list', _list_address)
fprout = _schleuder_cli('lists', 'show', _list_address, 'fingerprint')

expected_subscribers = subscribermap([
    'aaron.example@example.org',
    'admin@example.org',
    'alex.example@example.org',
    'alice.example@example.net',
    'amy.example@example.org',
    'andy.example@example.org',
    'anna.example@example.org',
    'anotherspammer@example.org'
])

# Each useful output line is tab-separated: first field is the subscriber
# address, second its fingerprint, anything after the second tab is ignored.
# Lines without a tab are skipped.
actual_subscribers = {}
for line in subsout.decode().splitlines():
    fields = line.split('\t', 2)
    if len(fields) > 1:
        actual_subscribers[fields[0]] = fields[1]

# Any difference between expected and actual subscriptions fails the run.
subsdiff = deepdiff.DeepDiff(expected_subscribers, actual_subscribers)
if len(subsdiff) > 0:
    print(subsdiff)
    exit(1)
# Test keys
_key_owners = [
    'aaron.example@example.org',
    'admin@example.org',
    'alex.example@example.org',
    'alice.example@example.org',  # schleuder returns the primary UID
    'amy.example@example.org',
    'andy.example@example.org',
    'anna.example@example.org',
    'anotherspammer@example.org'
]
expected_keys = keymap(_key_owners)
# The list's own key is expected in the keyring too, under the fingerprint
# reported by schleuder-cli.
expected_keys[fprout.strip().decode()] = 'test-global@schleuder.example.org'

# Each useful output line is "<fingerprint> <uid>", split at the first space.
# Lines without a space are skipped.
actual_keys = {}
for line in keysout.decode().splitlines():
    fingerprint, separator, owner = line.partition(' ')
    if separator:
        actual_keys[fingerprint] = owner

# Any difference between expected and actual keys fails the run.
keysdiff = deepdiff.DeepDiff(expected_keys, actual_keys)
if len(keysdiff) > 0:
    print(keysdiff)
    exit(1)
# Test mbox
_DIGEST_HEADER = 'X-MultiSchleuder-Digest'
_LIST_OWNER = 'test-global-owner@schleuder.example.org'


def _fail(reason):
    """Print ``reason`` and terminate the script with exit status 1."""
    print(reason)
    exit(1)


def _message_for(candidates, recipient):
    """Return the first message in ``candidates`` whose To header is ``recipient``.

    Exits with a readable error (status 1) instead of an IndexError
    traceback when no such message exists.
    """
    for candidate in candidates:
        if candidate['To'] == recipient:
            return candidate
    _fail(f'Expected a message with "To: {recipient}" in mbox, found none')


def _check_headers(msg, label, recipient, expect_digest):
    """Validate the headers of one report message; exit with status 1 on mismatch.

    ``label`` names the message kind in error output. When ``expect_digest``
    is true the digest header must be present and its stripped value is
    returned; otherwise the header must be absent and None is returned.
    """
    has_digest = _DIGEST_HEADER in msg
    if expect_digest and not has_digest:
        _fail(f'{label} message should have a {_DIGEST_HEADER} header, missing')
    if has_digest and not expect_digest:
        _fail(f'{label} message should not have a {_DIGEST_HEADER} header, got {msg[_DIGEST_HEADER]}')
    digest = msg[_DIGEST_HEADER].strip() if expect_digest else None

    # Checked in this order so the first reported mismatch stays the same as
    # with the previous hand-written checks.
    expected_headers = (
        ('From', _LIST_OWNER),
        ('To', recipient),
        ('Auto-Submitted', 'auto-generated'),
        ('Precedence', 'list'),
    )
    for header, expected in expected_headers:
        if msg[header] != expected:
            _fail(f'Expected "{header}: {expected}", got {msg[header]}')
    return digest


def _print_decrypted(label, msg):
    """Pipe the second payload part of ``msg`` through ``gpg -d`` and print it.

    Purely informational: a gpg failure does not abort the script, but its
    exit status and stderr are printed so an empty body is explainable.
    """
    # presumably the second MIME part is the encrypted body — confirm
    ciphertext = msg.get_payload()[1].get_payload(decode=True)
    gpg = subprocess.run(
        ['/usr/bin/gpg', '-d'],
        input=ciphertext,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    print(f'\n{label} message (decrypted):\n{gpg.stdout.decode()}')
    if gpg.returncode != 0:
        gpg_errors = gpg.stderr.decode(errors='replace').strip()
        print(f'gpg exited with status {gpg.returncode}: {gpg_errors}')


mbox = mailbox.mbox('/var/spool/mail/root')
if len(mbox) != 4:
    _fail(f'Expected 4 messages in mbox, got {len(mbox)}')
# popitem() removes each message from the mailbox; close() flushes that
# removal, so the spool is emptied by this step.
messages = [mbox.popitem()[1] for _ in range(4)]
mbox.close()

msg1 = _message_for(messages, 'andy.example@example.org')
msg2 = _message_for(messages, 'admin@example.org')
msg3 = _message_for(messages, 'alice.example@example.org')
msg4 = _message_for(messages, 'alice.example@example.net')

digest1 = _check_headers(msg1, 'Key conflict', 'andy.example@example.org', expect_digest=True)
_check_headers(msg2, 'Admin report', 'admin@example.org', expect_digest=False)
digest3 = _check_headers(msg3, 'User conflict', 'alice.example@example.org', expect_digest=True)
digest4 = _check_headers(msg4, 'User conflict', 'alice.example@example.net', expect_digest=True)

# Both user conflict messages must carry the same digest.
if digest3 != digest4:
    _fail(f'User conflict messages should have the same digest, got: "{digest3}" vs "{digest4}"')

# NOTE(review): msg4 is not decrypted or printed here, as before — confirm
# that this is intentional.
for shown_label, shown_msg in (
    ('Key conflict', msg1),
    ('Admin report', msg2),
    ('User conflict', msg3),
):
    _print_decrypted(shown_label, shown_msg)
# Test conflict statefile
with open('/tmp/conflict.json', 'r') as f:
    conflict = json.load(f)

# Exactly two entries are expected: one keyed by the key conflict digest and
# one keyed by the digest shared by both user conflict messages.
if len(conflict) != 2:
    # The message now matches the condition; it used to say "1 entry".
    print('Expected 2 entries in conflict statefile, got:')
    print(json.dumps(conflict))
    exit(1)

for expected_digest in (digest1, digest3):
    if expected_digest not in conflict:
        print(f'Expected key "{expected_digest}" in conflict statefile, got:')
        print(json.dumps(conflict))
        exit(1)

print('All checks passed!')