multischleuder/multischleuder/reporting.py

318 lines
12 KiB
Python

from typing import Dict, List, Optional, Set
import abc
import email.encoders
import email.mime.base
import email.mime.application
import email.mime.multipart
import email.mime.text
import email.utils
import hashlib
import json
import logging
import struct
from collections import OrderedDict
from datetime import datetime
import pgpy # type: ignore
from multischleuder.types import SchleuderKey, SchleuderSubscriber
class Message(abc.ABC):
    """Common base for every e-mail this module generates.

    The constructor turns the plain text body into a MIME object, PGP/MIME
    encrypted when at least one key is given, and stamps the shared headers
    onto it. The finished object is available through :attr:`mime`.
    """

    def __init__(self,
                 schleuder: str,
                 mail_from: str,
                 mail_to: str,
                 content: str,
                 encrypt_to: List[str],
                 encrypt_may_fail: bool = False):
        """Build the message.

        :param schleuder: List address; used to derive the ``List-Id`` header.
        :param mail_from: Value for both the ``From`` and ``Reply-To`` headers.
        :param mail_to: Value for the ``To`` header.
        :param content: Plain text body.
        :param encrypt_to: Key blobs to encrypt to; an empty list yields an
            unencrypted ``text/plain`` message.
        :param encrypt_may_fail: When true, an encryption error is logged and
            the body is sent unencrypted; when false, the error propagates.
        """
        self._schleuder: str = schleuder
        self._from: str = mail_from
        self._to: str = mail_to
        self._keys: List[str] = encrypt_to
        self._mime: email.mime.base.MIMEBase = self._make_mime(content, encrypt_may_fail)

    @property
    def mime(self) -> email.mime.base.MIMEBase:
        """The assembled MIME message including all headers."""
        return self._mime

    def _make_mime(self, content: str, encrypt_may_fail: bool) -> email.mime.base.MIMEBase:
        """Create the MIME body for ``content`` and attach the common headers.

        Encryption is attempted first. Whether a failure aborts construction
        or degrades to plain text is decided by ``encrypt_may_fail``.
        """
        try:
            message = self._encrypt_message(content)
        except Exception as e:
            if not encrypt_may_fail:
                logging.exception('Encryption failed; Not sending this message')
                raise e
            logging.exception('Encryption failed; falling back to unencrypted message')
            message = email.mime.text.MIMEText(content, _subtype='plain', _charset='utf-8')
        self._mime = message
        # Headers shared by every generated message, in the order they are emitted
        common_headers = (
            ('From', self._from),
            ('Reply-To', self._from),
            ('To', self._to),
            ('Date', email.utils.formatdate()),
            ('Auto-Submitted', 'auto-generated'),
            ('Precedence', 'list'),
            ('List-Id', f'<{self._schleuder.replace("@", ".")}>'),
            ('List-Help', '<https://gitlab.com/s3lph/multischleuder>'),
        )
        for header_name, header_value in common_headers:
            message[header_name] = header_value
        return message

    def _encrypt_message(self, content: str) -> email.mime.base.MIMEBase:
        """Return ``content`` as a ``multipart/encrypted`` MIME container.

        With no keys configured, a plain ``text/plain`` part is returned
        instead. Otherwise the text is encrypted to every key in turn, reusing
        one AES256 session key across all of them.
        """
        if len(self._keys) == 0:
            return email.mime.text.MIMEText(content, _subtype='plain', _charset='utf-8')

        encrypted = pgpy.PGPMessage.new(content)
        algorithm = pgpy.constants.SymmetricKeyAlgorithm.AES256
        session_key = algorithm.gen_key()
        try:
            for blob in self._keys:
                recipient_key, _ = pgpy.PGPKey.from_blob(blob)
                encrypted = recipient_key.encrypt(encrypted, cipher=algorithm, sessionkey=session_key)
        finally:
            # Drop the local reference to the session key as soon as possible
            del session_key

        # Part 1: the PGP/MIME version identification
        version_part = email.mime.application.MIMEApplication(
            'Version: 1',
            _subtype='pgp-encrypted',
            _encoder=email.encoders.encode_noop)
        version_part['Content-Description'] = 'PGP/MIME version identification'
        version_part['Content-Disposition'] = 'attachment'

        # Part 2: the armored ciphertext
        payload_part = email.mime.application.MIMEApplication(
            str(encrypted),
            _subtype='octet-stream',
            _encoder=email.encoders.encode_noop,
            name='encrypted.asc')
        payload_part['Content-Description'] = 'OpenPGP encrypted message'
        payload_part['Content-Disposition'] = 'inline; filename="message.asc"'

        # Root container holding both parts
        container = email.mime.multipart.MIMEMultipart(
            _subtype='encrypted',
            protocol='application/pgp-encrypted')
        container.attach(version_part)
        container.attach(payload_part)
        return container
class KeyConflictMessage(Message):
    """Key conflict notification addressed to the chosen subscriber's e-mail.

    The body is rendered from ``template`` and lists the fingerprint of each
    affected subscription together with the name of its source list.
    """

    def __init__(self,
                 schleuder: str,
                 chosen: SchleuderSubscriber,
                 affected: List[SchleuderSubscriber],
                 digest: str,
                 mail_from: str,
                 template: str,
                 sourcemap: Dict[int, str]):
        """Render the body and build the message.

        :param schleuder: List address the conflict was detected for.
        :param chosen: The subscription that was selected; its e-mail address
            is the recipient.
        :param affected: All subscriptions taking part in the conflict.
        :param digest: Value for the ``X-MultiSchleuder-Digest`` header.
        :param mail_from: Sender address.
        :param template: ``str.format`` template with the placeholders
            ``subscriber``, ``schleuder``, ``chosen`` and ``affected``.
        :param sourcemap: Maps a subscription's ``schleuder`` value to a
            printable list name; unmapped values are shown as ``unknown``.
        """
        chosen_fpr = 'no key' if chosen.key is None else chosen.key.fingerprint
        chosen_line = f'{chosen_fpr} {chosen.email}'

        affected_lines: List[str] = []
        for sub in affected:
            sub_fpr = 'no key' if sub.key is None else sub.key.fingerprint
            origin: str = sourcemap.get(sub.schleuder, 'unknown')
            affected_lines.append(f'{sub_fpr} {origin}\n')

        content = template.format(
            subscriber=chosen.email,
            schleuder=schleuder,
            chosen=chosen_line,
            affected=''.join(affected_lines)
        )

        # Kept for report_str, which is evaluated after construction
        self._chosen: SchleuderSubscriber = chosen
        self._affected: List[SchleuderSubscriber] = affected
        self._template: str = template
        self._sourcemap: Dict[int, str] = sourcemap

        # Encrypt to every key taking part in the conflict
        recipient_blobs: List[str] = []
        for sub in affected:
            if sub.key is not None:
                recipient_blobs.append(sub.key.blob)

        super().__init__(
            schleuder=schleuder,
            mail_from=mail_from,
            mail_to=chosen.email,
            content=content,
            encrypt_to=recipient_blobs,
            # An unencrypted notification is preferred over none at all
            encrypt_may_fail=True
        )
        self.mime['Subject'] = f'MultiSchleuder {self._schleuder} - Key Conflict'
        self.mime['X-MultiSchleuder-Digest'] = digest

    @property
    def report_str(self) -> str:
        """Multi-line summary of this conflict.

        The first line reads ``<email> -> <chosen fingerprint>``, followed by
        one ``- <fingerprint> (<source list>)`` line per affected subscription.
        """
        head_fpr = 'no key' if self._chosen.key is None else self._chosen.key.fingerprint
        lines: List[str] = [f'{self._chosen.email} -> {head_fpr}\n']
        for sub in self._affected:
            sub_fpr = 'no key' if sub.key is None else sub.key.fingerprint
            origin: str = self._sourcemap.get(sub.schleuder, 'unknown')
            lines.append(f'- {sub_fpr} ({origin})\n')
        return ''.join(lines)
class UserConflictMessage(Message):
    """Subscriber conflict notification sent to ``subscriber``.

    The body is rendered from ``template`` and lists the e-mail address of
    each affected subscription together with the name of its source list.
    """

    def __init__(self,
                 schleuder: str,
                 subscriber: str,
                 chosen: SchleuderSubscriber,
                 affected: List[SchleuderSubscriber],
                 digest: str,
                 mail_from: str,
                 template: str,
                 sourcemap: Dict[int, str]):
        """Render the body and build the message.

        :param schleuder: List address the conflict was detected for.
        :param subscriber: Recipient address of this notification.
        :param chosen: The subscription that was selected; it must carry a
            key, and the message is encrypted to that key.
        :param affected: All subscriptions taking part in the conflict.
        :param digest: Value for the ``X-MultiSchleuder-Digest`` header.
        :param mail_from: Sender address.
        :param template: ``str.format`` template with the placeholders
            ``subscriber``, ``fingerprint``, ``schleuder``, ``chosen`` and
            ``affected``.
        :param sourcemap: Maps a subscription's ``schleuder`` value to a
            printable list name; unmapped values are shown as ``unknown``.
        """
        assert chosen.key is not None

        affected_lines: List[str] = []
        for sub in affected:
            origin: str = sourcemap.get(sub.schleuder, 'unknown')
            affected_lines.append(f'{sub.email} {origin}\n')

        content = template.format(
            subscriber=subscriber,
            fingerprint=chosen.key.fingerprint,
            schleuder=schleuder,
            chosen=chosen.email,
            affected=''.join(affected_lines)
        )

        # Kept for report_str, which is evaluated after construction
        self._chosen: SchleuderSubscriber = chosen
        self._affected: List[SchleuderSubscriber] = affected
        self._template: str = template
        self._sourcemap: Dict[int, str] = sourcemap

        super().__init__(
            schleuder=schleuder,
            mail_from=mail_from,
            mail_to=subscriber,
            content=content,
            encrypt_to=[chosen.key.blob],
            # An unencrypted notification is preferred over none at all
            encrypt_may_fail=True
        )
        self.mime['Subject'] = f'MultiSchleuder {self._schleuder} - Subscriber Conflict'
        self.mime['X-MultiSchleuder-Digest'] = digest

    @property
    def report_str(self) -> str:
        """Multi-line summary of this conflict.

        The first line reads ``<fingerprint> -> <chosen email>``, followed by
        one ``- <email> (<source list>)`` line per affected subscription.
        """
        assert self._chosen.key is not None
        lines: List[str] = [f'{self._chosen.key.fingerprint} -> {self._chosen.email}\n']
        for sub in self._affected:
            origin: str = self._sourcemap.get(sub.schleuder, 'unknown')
            lines.append(f'- {sub.email} ({origin})\n')
        return ''.join(lines)
class AdminReport(Message):
    """Plain text change summary for one list, sent to ``mail_to``.

    The report contains one section per non-empty change set (subscribed,
    unsubscribed, changed keys, added keys, deleted keys) plus the summaries
    of all key and user conflicts.
    """

    def __init__(self,
                 schleuder: str,
                 mail_to: str,
                 mail_from: str,
                 encrypt_to: Optional[str],
                 subscribed: Set[SchleuderSubscriber],
                 unsubscribed: Set[SchleuderSubscriber],
                 updated: Set[SchleuderSubscriber],
                 added: Set[SchleuderKey],
                 removed: Set[SchleuderKey],
                 conflicts: List[Optional[Message]],
                 sourcemap: Dict[int, str]):
        """Assemble the report body and build the message.

        :param schleuder: List address the report is about.
        :param mail_to: Recipient address.
        :param mail_from: Sender address.
        :param encrypt_to: Key blob to encrypt the report to, or ``None`` to
            send it unencrypted.
        :param subscribed: Subscriptions that were added.
        :param unsubscribed: Subscriptions that were removed.
        :param updated: Subscriptions whose key changed.
        :param added: Keys that were added.
        :param removed: Keys that were deleted.
        :param conflicts: Conflict messages to summarise; ``None`` entries and
            messages of other types are ignored.
        :param sourcemap: Maps a subscription's ``schleuder`` value to a
            printable list name; unmapped values are shown as ``unknown``.
        :raises ValueError: If all change sets and ``conflicts`` are empty.
        """
        if not (subscribed or unsubscribed or removed or added or updated or conflicts):
            raise ValueError('No changes, not creating admin report')

        key_conflicts: List[KeyConflictMessage] = \
            [m for m in conflicts if isinstance(m, KeyConflictMessage)]

        # Report each user conflict only once per chosen fingerprint; the
        # first message seen for a fingerprint wins.
        unique_user_conflicts: Dict[str, UserConflictMessage] = {}
        for m in conflicts:
            if not isinstance(m, UserConflictMessage):
                continue
            assert m._chosen.key is not None
            unique_user_conflicts.setdefault(m._chosen.key.fingerprint, m)
        user_conflicts: List[UserConflictMessage] = list(unique_user_conflicts.values())

        # Assemble the content
        parts: List[str] = [f'\n== Admin Report for MultiSchleuder {schleuder} ==\n']
        if subscribed:
            parts.append('\n>>> Subscribed:\n')
            for s in subscribed:
                # Use a separate name here: rebinding ``schleuder`` would
                # replace the constructor argument and make the List-Id and
                # Subject headers below name the source list instead.
                source: str = sourcemap.get(s.schleuder, 'unknown')
                parts.append(f'{s.email} ({source})\n')
        if unsubscribed:
            parts.append('\n>>> Unsubscribed:\n')
            for s in unsubscribed:
                parts.append(f'{s.email}\n')
        if updated:
            parts.append('\n>>> Subscriber keys changed:\n')
            for s in updated:
                fpr = 'no key' if s.key is None else s.key.fingerprint
                parts.append(f'{s.email} ({fpr})\n')
        if added:
            parts.append('\n>>> Keys added:\n')
            for k in added:
                parts.append(f'{k.fingerprint} ({k.email})\n')
        if removed:
            parts.append('\n>>> Keys deleted:\n')
            for k in removed:
                parts.append(f'{k.fingerprint} ({k.email})\n')
        if key_conflicts:
            parts.append('\n>>> Keys conflicts:\n')
            for c in key_conflicts:
                parts.append(f'{c.report_str}\n')
        if user_conflicts:
            parts.append('\n>>> User conflicts:\n')
            for u in user_conflicts:
                parts.append(f'{u.report_str}\n')
        content = ''.join(parts)

        super().__init__(
            schleuder=schleuder,
            mail_from=mail_from,
            mail_to=mail_to,
            content=content,
            encrypt_to=[encrypt_to] if encrypt_to is not None else []
        )
        self.mime['Subject'] = f'MultiSchleuder Admin Report: {self._schleuder}'
class Reporter:
    """Collects generated messages in a single list stored on the class.

    Each instance carries its own filter flags, but every accepted message is
    appended to the same class-level list, which is read and reset through
    the class methods.
    """

    # Class-level storage; appended to by add_message
    _messages: List['Message'] = []

    def __init__(self,
                 send_conflict_messages: bool,
                 send_admin_reports: bool):
        """Configure which message kinds this reporter accepts.

        :param send_conflict_messages: Accept key and user conflict messages.
        :param send_admin_reports: Accept admin reports.
        """
        self._send_conflict_messages: bool = send_conflict_messages
        self._send_admin_reports: bool = send_admin_reports

    def add_message(self, message: Optional[Message]):
        """Queue ``message`` unless it is ``None`` or its kind is disabled."""
        if message is None:
            return
        is_conflict = isinstance(message, (KeyConflictMessage, UserConflictMessage))
        if is_conflict and not self._send_conflict_messages:
            return
        if isinstance(message, AdminReport) and not self._send_admin_reports:
            return
        type(self)._messages.append(message)

    def add_messages(self, messages: List[Optional[Message]]):
        """Pass every entry of ``messages`` through :meth:`add_message`."""
        for candidate in messages:
            self.add_message(candidate)

    @classmethod
    def get_messages(cls) -> List['Message']:
        """Return a copy of the queued messages."""
        return cls._messages.copy()

    @classmethod
    def clear_messages(cls):
        """Empty the queue in place."""
        del cls._messages[:]