#!/usr/bin/env python3
"""End-to-end checks for the ``test-global`` schleuder list.

The script compares the subscriptions and keys reported by ``schleuder-cli``
with fingerprints read from ``/tmp/<address>.asc``, checks the headers of the
four messages found in ``/var/spool/mail/root``, prints the decrypted bodies
of three of them, and checks the digests recorded in ``/tmp/conflict.json``.

The first failed check prints a diagnostic to stdout and exits with status 1.
"""

import json
import mailbox
import os
import subprocess
import sys

import deepdiff
import pgpy

LIST_ADDRESS = 'test-global@schleuder.example.org'
OWNER_ADDRESS = 'test-global-owner@schleuder.example.org'
SCHLEUDER_CLI = '/usr/bin/schleuder-cli'
GPG = '/usr/bin/gpg'
DIGEST_HEADER = 'X-MultiSchleuder-Digest'
MBOX_PATH = '/var/spool/mail/root'
CONFLICT_STATEFILE = '/tmp/conflict.json'
EXPECTED_MESSAGES = 4


def _fingerprint(address):
    """Return the space-free fingerprint of the key in ``/tmp/<address>.asc``."""
    key = pgpy.PGPKey.from_file(f'/tmp/{address}.asc')[0]
    return key.fingerprint.replace(' ', '')


def subscribermap(subs):
    """Map each address in *subs* to the fingerprint of its key file."""
    return {s: _fingerprint(s) for s in subs}


def keymap(subs):
    """Map the fingerprint of each address's key file to that address."""
    return {_fingerprint(s): s for s in subs}


def fail(*lines):
    """Print each of *lines* to stdout and exit with status 1."""
    for line in lines:
        print(line)
    # sys.exit rather than the builtin exit(): the latter is installed by the
    # site module for interactive use and is absent under ``python -S``.
    sys.exit(1)


def run_command(argv, stdin=None):
    """Run *argv* and return the bytes it wrote to stdout.

    When *stdin* is not None it is fed to the process on standard input.
    A non-zero exit status is reported on stderr but is not fatal; the
    checks performed on the returned output decide whether the test fails.
    """
    proc = subprocess.run(
        argv,
        input=stdin,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    if proc.returncode != 0:
        errors = proc.stderr.decode(errors='replace').strip()
        print(
            f'{argv[0]} exited with status {proc.returncode}: {errors}',
            file=sys.stderr,
        )
    return proc.stdout


def parse_subscriptions(output):
    """Parse tab-separated lines into a ``{first field: second field}`` dict.

    Lines without a tab are skipped; fields after the second are ignored.
    """
    subscribers = {}
    for line in output.splitlines():
        if '\t' not in line:
            continue
        uid, fpr, *_ = line.split('\t', 2)
        subscribers[uid] = fpr
    return subscribers


def parse_keys(output):
    """Parse ``<fingerprint> <uid>`` lines into a ``{fingerprint: uid}`` dict.

    Lines without a space are skipped; the uid is everything after the
    first space.
    """
    keys = {}
    for line in output.splitlines():
        if ' ' not in line:
            continue
        fpr, uid = line.split(' ', 1)
        keys[fpr] = uid
    return keys


def find_message(messages, recipient):
    """Return the first message whose ``To`` header equals *recipient*.

    Exits with status 1 and a diagnostic when no such message exists.
    """
    for msg in messages:
        if msg['To'] == recipient:
            return msg
    fail(f'Expected a message addressed to {recipient} in mbox, found none')


def require_digest(msg, label):
    """Return the stripped digest header of *msg*, failing when it is absent."""
    if DIGEST_HEADER not in msg:
        fail(f'{label} message should have a {DIGEST_HEADER} header, missing')
    return msg[DIGEST_HEADER].strip()


def check_headers(msg, recipient):
    """Fail unless *msg* carries the expected From/To/automation headers."""
    expected = (
        ('From', OWNER_ADDRESS),
        ('To', recipient),
        ('Auto-Submitted', 'auto-generated'),
        ('Precedence', 'list'),
    )
    for header, value in expected:
        if msg[header] != value:
            fail(f'Expected "{header}: {value}", got {msg[header]}')


def print_decrypted(label, msg):
    """Pipe the second MIME part of *msg* through ``gpg -d`` and print it."""
    payload = msg.get_payload()[1].get_payload(decode=True)
    # An empty byte string (rather than None) keeps gpg's stdin a pipe, so it
    # sees EOF instead of inheriting this process's stdin.
    plaintext = run_command([GPG, '-d'], stdin=payload or b'')
    print(f'\n{label} message (decrypted):\n{plaintext.decode()}')


def main():
    """Run all checks in order; exit with status 1 on the first failure."""
    # Test subscribers

    subsout = run_command(
        [SCHLEUDER_CLI, 'subscriptions', 'list', LIST_ADDRESS])
    keysout = run_command([SCHLEUDER_CLI, 'keys', 'list', LIST_ADDRESS])
    fprout = run_command(
        [SCHLEUDER_CLI, 'lists', 'show', LIST_ADDRESS, 'fingerprint'])

    expected_subscribers = subscribermap([
        'aaron.example@example.org',
        'admin@example.org',
        'alex.example@example.org',
        'alice.example@example.net',
        'amy.example@example.org',
        'andy.example@example.org',
        'anna.example@example.org',
        'anotherspammer@example.org',
    ])
    actual_subscribers = parse_subscriptions(subsout.decode())
    subsdiff = deepdiff.DeepDiff(expected_subscribers, actual_subscribers)
    if len(subsdiff) > 0:
        fail(subsdiff)

    # Test keys

    expected_keys = keymap([
        'aaron.example@example.org',
        'admin@example.org',
        'alex.example@example.org',
        'alice.example@example.org',  # schleuder returns the primary UID
        'amy.example@example.org',
        'andy.example@example.org',
        'anna.example@example.org',
        'anotherspammer@example.org',
    ])
    # The list's own key is expected in the key listing as well.
    expected_keys[fprout.strip().decode()] = LIST_ADDRESS
    actual_keys = parse_keys(keysout.decode())
    keysdiff = deepdiff.DeepDiff(expected_keys, actual_keys)
    if len(keysdiff) > 0:
        fail(keysdiff)

    # Test mbox

    mbox = mailbox.mbox(MBOX_PATH)
    if len(mbox) != EXPECTED_MESSAGES:
        fail(f'Expected {EXPECTED_MESSAGES} messages in mbox, got {len(mbox)}')
    # popitem() also deletes each returned message from the mailbox.
    messages = [mbox.popitem()[1] for _ in range(EXPECTED_MESSAGES)]
    mbox.close()

    msg1 = find_message(messages, 'andy.example@example.org')
    msg2 = find_message(messages, 'admin@example.org')
    msg3 = find_message(messages, 'alice.example@example.org')
    msg4 = find_message(messages, 'alice.example@example.net')

    digest1 = require_digest(msg1, 'Key conflict')
    check_headers(msg1, 'andy.example@example.org')

    if DIGEST_HEADER in msg2:
        fail(
            f'Admin report message should not have a {DIGEST_HEADER} header, '
            f'got {msg2[DIGEST_HEADER]}'
        )
    check_headers(msg2, 'admin@example.org')

    digest3 = require_digest(msg3, 'User conflict')
    check_headers(msg3, 'alice.example@example.org')

    digest4 = require_digest(msg4, 'User conflict')
    check_headers(msg4, 'alice.example@example.net')

    if digest3 != digest4:
        fail(
            'User conflict messages should have the same digest, '
            f'got: "{digest3}" vs "{digest4}"'
        )

    print_decrypted('Key conflict', msg1)
    print_decrypted('Admin report', msg2)
    print_decrypted('User conflict', msg3)

    # Test conflict statefile

    with open(CONFLICT_STATEFILE, 'r', encoding='utf-8') as f:
        conflict = json.load(f)
    if len(conflict) != 2:
        fail(
            'Expected 2 entries in conflict statefile, got:',
            json.dumps(conflict),
        )
    for digest in (digest1, digest3):
        if digest not in conflict:
            fail(
                f'Expected key "{digest}" in conflict statefile, got:',
                json.dumps(conflict),
            )

    print('All checks passed!')


if __name__ == '__main__':
    main()