#!/usr/bin/env python3 from typing import Any, Dict, List, Optional import argparse import base64 import email import json import logging import os import re import urllib.request import smtplib import ssl import sys import hashlib import yaml import pgpy from dataclasses import dataclass, field, Field from datetime import datetime from dateutil.parser import isoparse LOG: logging.Logger = logging.getLogger('multischleuder') class SmtpServerConfig: def __init__(self, hostname: str, port: int, tls: bool, username: str, password: str): self.hostname = hostname self.port = port self.tls = tls self.username = username self.password = password self._smtp = None @staticmethod def parse(config: Dict[str, Any]) -> 'SmtpServerConfig': tls = config.get('tls', 'PLAIN') port = { 'SSL': 465, 'STARTTLS': 587 }.get(tls, 25) return SmtpServerConfig( hostname=config['hostname'], port=config.get('port', port), tls=tls, username=config.get('username'), password=config.get('password') ) def __enter__(self): cls = smtplib.SMTP if self.tls != 'SSL' else smtplib.SMTP_SSL self._smtp = cls(self.hostname, self.port) smtp = self._smtp.__enter__() if self.tls == 'STARTTLS': smtp.starttls() if self.username is not None and self.password is not None: smtp.login(self.username, self.password) return smtp def __exit__(self, exc_type, exc_val, exc_tb): ret = self._smtp.__exit__(exc_type, exc_val, exc_tb) self._smtp = None return ret def send_message(self, msg): if self._smtp is None: raise RuntimeError('SMTP connection is not established') self._smtp.send_message(msg) @dataclass class SchleuderSubscriber: email: str key: 'SchleuderKey' sub_id: int schleuder: 'SchleuderList' created_at: datetime def __str__(self) -> str: return f'{self.email}' def __hash__(self) -> int: return hash(self.email) def __eq__(self, o) -> bool: if not isinstance(o, SchleuderSubscriber): return False return self.email == o.email @dataclass class SchleuderKey: fingerprint: str email: str blob: Optional[str] = field(repr=False, default=None) source_list: int def __hash__(self) -> int: return hash((self.fingerprint, self.email)) def __str__(self) -> str: return f'{self.fingerprint} ({self.email})' @dataclass class SchleuderList: id: int name: str fingerprint: str def __hash__(self) -> int: return hash(self.id) class ConflictMessage: def __init__(self, multilist: 'MultiList', chosen: 'SchleuderSubscriber', affected: List['SchleuderSubscriber']): self.multlist = multilist self.chosen = chosen self.affected = affected # Generate a SHA1 digest that only changes when the subscription list changes self.affected.sort(key=lambda x: x.created_at) digest = hashlib.new('sha1') digest.update(self.chosen.email) digest.update(self.chosen.scheluder.name) for affected in self.affected: digest.update(affected.scheluder.name) digest.update(affected.key.blob) self.now = datetime.utcnow() self.digest = digest.hexdigest() def to_mime(self): _chosen = f'{self.chosen.key.fingerprint} {self.chosen.email}' for affected in self.affected: _affected += f'{affected.key.fingerprint} {affected.schleuder.name}\n' msg = self.multilist.conflict_message.format( subscriber=self.chosen.email, schleuder=self.multilist.list, chosen=_chosen, affected=_affected ) pgp = pgpy.PGPMessage.new(msg) # Encrypt the message to all keys cipher = pgpy.constants.SymmetricKeyAlgorithm.AES256 sessionkey = cipher.gen_key() try: for affected in self.affected: key, _ = pgpy.PGPKey.from_blob(self.affected.key.blob) key._require_usage_flags = False pgp = key.encrypt(pgp, cipher=cipher, sessionkey=sessionkey) finally: del sessionkey # Build the MIME message mp1 = email.mime.application.MIMEApplication('Version: 1', _subtype='pgp-encrypted') mp1['Content-Description'] = 'PGP/MIME version identification' mp1['Content-Disposition'] = 'attachment' mp2 = email.mime.application.MIMEApplication(str(pgp), _subtype='octet-stream', name='encrypted.asc') mp2['Content-Description'] = 'OpenPGP encrypted message' mp2['Content-Disposition'] = 'inline; filename="message.asc"' mp0 = email.mime.multipart.MIMEMultipart(_subtype='encrypted', protocol='application/pgp-encrypted') mp0.attach(mp1) mp0.attach(mp2) mp0['Subject'] = f'MultiSchleuder {self.multilist.name} - Key Conflict' mp0['From'] = self.multilist.conflict_from mp0['Reply-To'] = self.multilist.conflict_from mp0['To'] = self.chosen.address mp0['Date'] = email.util.formatdate(self.no) mp0['Auto-Submitted'] = 'auto-generated' mp0['Precedence'] = 'list' mp0['List-Id'] = f'<{self.multilist.list.replace("@", ".")}>' mp0['List-Help'] = '' mp0['X-MultiSchleuder-Digest'] = self.digest return mp0 class MultiList: def __init__(self, ns: argparse.Namespace, list: str, source: List[str], unmanaged: List[str], banned: List[str], from_address: str, api_url: str, api_token: str, api_cafile: str, conflict_state_file: str, conflict_message: str, conflict_interval: int, conflict_smtp: SmtpServerConfig): self.ns: argparse.Namespace = ns self.list: str = list self.source: List[str] = source self.unmanaged: List[str] = unmanaged self.banned: List[str] = banned self.api_url: str = api_url self.basic_auth = {'Authorization': b'Basic ' + base64.b64encode(f'schleuder:{api_token}'.encode())} self.api_cafile: str = api_cafile self.conflict_messages = [] self.conflict_from = from_address self.conflict_state_file: str = conflict_state_file self.conflict_message: str = conflict_message self.conflict_interval: int = conflict_interval self.conflict_smtp: SmtpServerConfig = conflict_smtp def request(self, path: str, list_id: Optional[int] = None, data: Optional[str] = None, method: str = 'GET', fmt=[]): if fmt: path = path.format(*fmt) url = os.path.join(self.api_url, path) if list_id is not None: url += f'?list_id={list_id}' payload: Optional[bytes] = data.encode() if data is not None else None context = ssl.create_default_context(cafile=self.api_cafile) context.check_hostname = False req = urllib.request.Request(url, data=payload, method=method, headers=self.basic_auth) resp = urllib.request.urlopen(req, context=context) return json.loads(resp.read().decode()) def resolve_lists(self) -> Dict[str, 'SchleuderList']: response = self.request('lists.json') lists = {} for r in response: l = SchleuderList(r['id'], r['email'], r['fingerprint']) if l.name == self.list or l.name in self.source: lists[l.name] = l return lists def get_list_subscribers(self, list: 'SchleuderList') -> Dict[str, 'SchleuderSubscriber']: response = self.request('subscriptions.json', list.id) subs = {} for r in response: key = SchleuderKey(r['fingerprint'], r['email'], list.id) sub = SchleuderSubscriber(r['email'], key, isoparse(r['created_at']), list, r['id']) subs[sub.email] = sub return subs def get_key(self, key: 'SchleuderKey') -> 'SchleuderKey': if key.blob is not None: return key r = self.request('keys/{}.json', list_id=key.source_list, fmt=[key.fingerprint]) key.blob = r['ascii'] return key def post_key(self, key: 'SchleuderKey', list: 'SchleuderList'): if self.ns.dry_run: return if not key.blob: raise RuntimeError('Key blob needs to be retrieved first') data = json.dumps({ 'keymaterial': key.blob }) self.request('keys.json', list_id=list.id, data=data, method='POST') def delete_key(self, key: SchleuderKey, list: 'SchleuderList'): if self.ns.dry_run: return self.request('keys/{}.json', list_id=list.id, fmt=[key.fingerprint], method='DELETE') def subscribe(self, sub: 'SchleuderSubscriber', list: 'SchleuderList'): if self.ns.dry_run: return data = json.dumps({ 'email': sub.email, 'fingerprint': sub.key.fingerprint }) self.request('subscriptions.json', list_id=list.id, data=data, method='POST') def unsubscribe(self, sub: 'SchleuderSubscriber'): if self.ns.dry_run: return self.request('subscriptions/{}.json', fmt=[sub.sub_id], method='DELETE') def resolve_subscriber_conflicts(self, subscriptions: List['SchleuderSubscription']) -> 'SchleuderSubscription': if len(subscriptions) == 1: return subscriptions[0] earliest = min(subscriptions, key=lambda x: x.created_at) LOG.debug(f'Key Conflict for {earliest.email} in lists, chose {earliest.schleuder.name}:') for s in subscriptions: LOG.debug(f' - {s.schleuder.name}: {s.key.fingerprint}') for s in subscriptions: self.get_key(s.key, s.list.id) self.conflict_messages.append(ConflictMessage(earliest, subscriptions)) return earliest def send_conflict_messages(self): now = datetime.utcnow().timestamp() with open(self.conflict_state_file, 'a+') as f: f.seek(0) state = json.load(f) # Remove all state entries older than conflict_interval state = {k: v for k, v in state.items() if now-v < self.conflict_interval} # Remove all messages not already sent recently msgs = [m for m in self.conflict_messages if m.digest not in state] # Add all remaining messages to state dict for m in msgs: state[m.digest] = now if not self.ns.dry_run: f.seek(0) f.truncate() json.dump(state) # Finally send the mails for m in msgs: LOG.info(f'Sending key conflict message to {m["To"]}') LOG.debug(f'MIME Message:\n{str(m)}') if not self.ns.dry_run: with self.smtp_config as smtp: smtp.send_message(m) # Clear conflict messages self.conflict_messages = [] def process(self): LOG.info(f'Processing: {self.list} {"DRY RUN" if self.ns.dry_run else ""}') # todo: conflict handling - what if a user is subscribed through 2 lists (and possibly with different keys?) lists: Dict[str, SchleuderList] = self.resolve_lists() target_list = lists[self.list] # Get current subs, except for unmanaged adresses current_subs = {s for s in self.get_list_subscribers(target_list).values() if s.email not in self.unmanaged} current_keys = {s.key for s in current_subs} intended_subs = set() intended_keys = set() all_subs = dict() # This loop may return multiple subscriptions for some users if they are subscribed on multiple sub-lists for source in self.source: for sub in self.get_list_subscribers(lists[source]).values(): # Don't include banned or unmanaged adresses if sub.email in self.banned or s.email in self.unmanaged: continue all_subs.setdefault(sub.email, []).append(sub) # Remove for subs in all_subs.values(): sub = self.resolve_subscriber_conflicts(subs) intended_subs.add(sub) intended_keys.add(sub.key) # Determine the change set to_subscribe = intended_subs.difference(current_subs) to_unsubscribe = current_subs.difference(intended_subs) to_remove = current_keys.difference(intended_keys) to_add = intended_keys.difference(current_keys) # Retrieve actual key blobs for key in to_add: self.get_key(key) # Perform the actual list modifications in an order where nothing breaks for key in to_add: self.post_key(key, target_list_id) LOG.info(f'Added key: {key}') for email in to_subscribe: self.subscribe(email, target_list_id) LOG.info(f'Subscribed user: {email}') for sub_id in to_unsubscribe: self.unsubscribe(sub_id) LOG.info(f'Unsubscribed user: {sub_id}') for key in to_remove: self.delete_key(key, target_list_id) LOG.info(f'Removed key: {key}') self.send_conflict_messages() if len(to_add) + len(to_subscribe) + len(to_unsubscribe) + len(to_remove) == 0: LOG.info(f'No changes for {self.list}') else: LOG.info(f'Finished processing: {self.list}') def parse_config(ns: argparse.Namespace) -> List['MultiList']: with open(ns.config, 'r') as f: y = yaml.safe_load(f) lists = [] for l in y['lists']: lc = MultiList(ns, l['name'], l['source'], l.get('unmanaged', []), l.get('banned', []), l.get('from'), y['api']['url'], y['api']['token'], y['api']['cafile'], y['conflict']['state'], y['conflict']['message'], y['conflict']['interval'], SmtpServerConfig.parse(y['conflict']['smtp'])) lists.append(lc) return lists def main(): ap = argparse.ArgumentParser() ap.add_argument('--config', '-c', type=str, default='/etc/multischleuder/config.yml') ap.add_argument('--dry-run', '-n', action='store_true', default=False) ap.add_argument('--verbose', '-v', action='store_true', default=False) ns = ap.parse_args(sys.argv[1:]) if ns.verbose: LOG.setLevel('DEBUG') LOG.debug('Enabled verbose logging') lists = parse_config(ns) for l in lists: l.process() if __name__ == '__main__': main()