318 lines
12 KiB
Python
318 lines
12 KiB
Python
|
|
from typing import Dict, List, Optional, Set
|
|
|
|
import abc
|
|
import email.encoders
|
|
import email.mime.base
|
|
import email.mime.application
|
|
import email.mime.multipart
|
|
import email.mime.text
|
|
import email.utils
|
|
import hashlib
|
|
import json
|
|
import logging
|
|
import struct
|
|
from collections import OrderedDict
|
|
from datetime import datetime
|
|
|
|
import pgpy # type: ignore
|
|
|
|
from multischleuder.types import SchleuderKey, SchleuderSubscriber
|
|
|
|
|
|
class Message(abc.ABC):
    """Base class for every mail generated on behalf of a MultiSchleuder list.

    On construction the text body is turned into a MIME object: encrypted
    as PGP/MIME to all configured keys where possible, and decorated with
    the common set of mail headers.  Subclasses add their own headers
    (e.g. ``Subject``) through the :attr:`mime` property afterwards.
    """

    def __init__(self,
                 schleuder: str,
                 mail_from: str,
                 mail_to: str,
                 content: str,
                 encrypt_to: List[str],
                 encrypt_may_fail: bool = False):
        """Build the MIME representation of the message.

        :param schleuder: Address of the list; used for the ``List-Id`` header.
        :param mail_from: Value of the ``From`` and ``Reply-To`` headers.
        :param mail_to: Value of the ``To`` header.
        :param content: Plain-text message body.
        :param encrypt_to: Key blobs to encrypt to; an empty list yields an
            unencrypted plain-text message.
        :param encrypt_may_fail: If true, a failed encryption results in an
            unencrypted message instead of an exception.
        """
        self._schleuder: str = schleuder
        self._from: str = mail_from
        self._to: str = mail_to
        self._keys: List[str] = encrypt_to
        self._mime: email.mime.base.MIMEBase = self._make_mime(content, encrypt_may_fail)

    @property
    def mime(self) -> email.mime.base.MIMEBase:
        """The MIME object built for this message."""
        return self._mime

    @staticmethod
    def _plaintext(content: str) -> email.mime.base.MIMEBase:
        """Wrap ``content`` in an unencrypted UTF-8 ``text/plain`` part."""
        return email.mime.text.MIMEText(content, _subtype='plain', _charset='utf-8')

    def _make_mime(self, content: str, encrypt_may_fail: bool) -> email.mime.base.MIMEBase:
        """Create the (preferably encrypted) MIME object and set its headers.

        Any exception raised while encrypting is logged.  It is then either
        re-raised, or - when ``encrypt_may_fail`` is true - replaced by an
        unencrypted plain-text message.
        """
        try:
            message = self._encrypt_message(content)
        except Exception as err:
            if not encrypt_may_fail:
                logging.exception('Encryption failed; Not sending this message')
                raise err
            logging.exception('Encryption failed; falling back to unencrypted message')
            message = self._plaintext(content)
        # Headers shared by all generated mails, in the order they are emitted
        headers = (
            ('From', self._from),
            ('Reply-To', self._from),
            ('To', self._to),
            ('Date', email.utils.formatdate()),
            ('Auto-Submitted', 'auto-generated'),
            ('Precedence', 'list'),
            ('List-Id', f'<{self._schleuder.replace("@", ".")}>'),
            ('List-Help', '<https://gitlab.com/s3lph/multischleuder>'),
        )
        for name, value in headers:
            message[name] = value
        return message

    def _encrypt_message(self, content: str) -> email.mime.base.MIMEBase:
        """Encrypt ``content`` to all keys and wrap it in a PGP/MIME container.

        Without any keys the content is returned as a plain-text part.
        """
        if len(self._keys) == 0:
            return self._plaintext(content)
        encrypted = pgpy.PGPMessage.new(content)
        # One shared session key, so a single ciphertext is readable by every recipient
        algorithm = pgpy.constants.SymmetricKeyAlgorithm.AES256
        session_key = algorithm.gen_key()
        try:
            for blob in self._keys:
                recipient, _ = pgpy.PGPKey.from_blob(blob)
                encrypted = recipient.encrypt(encrypted, cipher=algorithm, sessionkey=session_key)
        finally:
            # Drop the reference to the session key as soon as it is no longer needed
            del session_key
        # The small "version" part ...
        version_part = email.mime.application.MIMEApplication(
            'Version: 1',
            _subtype='pgp-encrypted',
            _encoder=email.encoders.encode_noop)
        version_part['Content-Description'] = 'PGP/MIME version identification'
        version_part['Content-Disposition'] = 'attachment'
        # ... the ASCII-armored encrypted payload ...
        payload_part = email.mime.application.MIMEApplication(
            str(encrypted),
            _subtype='octet-stream',
            _encoder=email.encoders.encode_noop,
            name='encrypted.asc')
        payload_part['Content-Description'] = 'OpenPGP encrypted message'
        payload_part['Content-Disposition'] = 'inline; filename="message.asc"'
        # ... and the multipart/encrypted root container holding both
        container = email.mime.multipart.MIMEMultipart(
            _subtype='encrypted',
            protocol='application/pgp-encrypted')
        for part in (version_part, payload_part):
            container.attach(part)
        return container
|
|
|
|
|
|
class KeyConflictMessage(Message):
    """Notification sent to a subscriber whose subscriptions disagree on the key.

    The mail goes to the chosen subscription's address and is encrypted to
    every key found among the affected subscriptions.  An unencrypted
    fallback is permitted.
    """

    def __init__(self,
                 schleuder: str,
                 chosen: SchleuderSubscriber,
                 affected: List[SchleuderSubscriber],
                 digest: str,
                 mail_from: str,
                 template: str,
                 sourcemap: Dict[int, str]):
        """Render the template and build the mail.

        :param schleuder: Address of the list the conflict was found for.
        :param chosen: The subscription that was selected.
        :param affected: All subscriptions taking part in the conflict.
        :param digest: Value of the ``X-MultiSchleuder-Digest`` header.
        :param mail_from: Sender address.
        :param template: ``str.format`` template; receives the fields
            ``subscriber``, ``schleuder``, ``chosen`` and ``affected``.
        :param sourcemap: Maps a subscription's ``schleuder`` attribute to a
            display name; missing entries are rendered as ``unknown``.
        """
        # Render the message body
        affected_lines = ''.join(
            f'{self._fingerprint_of(sub)} {self._source_of(sub, sourcemap)}\n'
            for sub in affected
        )
        body = template.format(
            subscriber=chosen.email,
            schleuder=schleuder,
            chosen=f'{self._fingerprint_of(chosen)} {chosen.email}',
            affected=affected_lines
        )
        self._chosen: SchleuderSubscriber = chosen
        self._affected: List[SchleuderSubscriber] = affected
        self._template: str = template
        self._sourcemap: Dict[int, str] = sourcemap
        recipient_keys = [sub.key.blob for sub in affected if sub.key is not None]
        super().__init__(
            schleuder=schleuder,
            mail_from=mail_from,
            mail_to=chosen.email,
            content=body,
            encrypt_to=recipient_keys,
            encrypt_may_fail=True  # Permit unencrypted fallback so the user gets notified of the conflict anyway
        )
        self.mime['Subject'] = f'MultiSchleuder {self._schleuder} - Key Conflict'
        self.mime['X-MultiSchleuder-Digest'] = digest

    @staticmethod
    def _fingerprint_of(subscriber: SchleuderSubscriber) -> str:
        """Return the subscriber's key fingerprint, or ``no key`` if there is none."""
        if subscriber.key is None:
            return 'no key'
        return subscriber.key.fingerprint

    @staticmethod
    def _source_of(subscriber: SchleuderSubscriber, sourcemap: Dict[int, str]) -> str:
        """Look up the display name of the subscriber's source list."""
        return sourcemap.get(subscriber.schleuder, 'unknown')

    @property
    def report_str(self) -> str:
        """Multi-line summary of this conflict, as embedded in the admin report."""
        header = f'{self._chosen.email} -> {self._fingerprint_of(self._chosen)}\n'
        entries = [
            f'- {self._fingerprint_of(sub)} ({self._source_of(sub, self._sourcemap)})\n'
            for sub in self._affected
        ]
        return header + ''.join(entries)
|
|
|
|
|
|
class UserConflictMessage(Message):
    """Notification sent when one key is used by several subscriptions.

    The mail goes to ``subscriber`` and is encrypted to the chosen
    subscription's key.  An unencrypted fallback is permitted.
    """

    def __init__(self,
                 schleuder: str,
                 subscriber: str,
                 chosen: SchleuderSubscriber,
                 affected: List[SchleuderSubscriber],
                 digest: str,
                 mail_from: str,
                 template: str,
                 sourcemap: Dict[int, str]):
        """Render the template and build the mail.

        :param schleuder: Address of the list the conflict was found for.
        :param subscriber: Recipient address of this notification.
        :param chosen: The subscription that was selected; must have a key.
        :param affected: All subscriptions taking part in the conflict.
        :param digest: Value of the ``X-MultiSchleuder-Digest`` header.
        :param mail_from: Sender address.
        :param template: ``str.format`` template; receives the fields
            ``subscriber``, ``fingerprint``, ``schleuder``, ``chosen`` and
            ``affected``.
        :param sourcemap: Maps a subscription's ``schleuder`` attribute to a
            display name; missing entries are rendered as ``unknown``.
        """
        # Render the message body
        assert chosen.key is not None
        affected_lines = ''.join(
            f'{sub.email} {sourcemap.get(sub.schleuder, "unknown")}\n'
            for sub in affected
        )
        body = template.format(
            subscriber=subscriber,
            fingerprint=chosen.key.fingerprint,
            schleuder=schleuder,
            chosen=chosen.email,
            affected=affected_lines
        )
        self._chosen: SchleuderSubscriber = chosen
        self._affected: List[SchleuderSubscriber] = affected
        self._template: str = template
        self._sourcemap: Dict[int, str] = sourcemap
        super().__init__(
            schleuder=schleuder,
            mail_from=mail_from,
            mail_to=subscriber,
            content=body,
            encrypt_to=[chosen.key.blob],
            encrypt_may_fail=True  # Permit unencrypted fallback so the user gets notified of the conflict anyway
        )
        self.mime['Subject'] = f'MultiSchleuder {self._schleuder} - Subscriber Conflict'
        self.mime['X-MultiSchleuder-Digest'] = digest

    @property
    def report_str(self) -> str:
        """Multi-line summary of this conflict, as embedded in the admin report."""
        assert self._chosen.key is not None
        header = f'{self._chosen.key.fingerprint} -> {self._chosen.email}\n'
        entries = [
            f'- {sub.email} ({self._sourcemap.get(sub.schleuder, "unknown")})\n'
            for sub in self._affected
        ]
        return header + ''.join(entries)
|
|
|
|
|
|
class AdminReport(Message):
    """Summary mail listing everything that changed on a MultiSchleuder list."""

    def __init__(self,
                 schleuder: str,
                 mail_to: str,
                 mail_from: str,
                 encrypt_to: Optional[str],
                 subscribed: Set[SchleuderSubscriber],
                 unsubscribed: Set[SchleuderSubscriber],
                 updated: Set[SchleuderSubscriber],
                 added: Set[SchleuderKey],
                 removed: Set[SchleuderKey],
                 conflicts: List[Optional[Message]],
                 sourcemap: Dict[int, str]):
        """Assemble the report text and build the mail.

        :param schleuder: Address of the list the report is about.
        :param mail_to: Recipient address.
        :param mail_from: Sender address.
        :param encrypt_to: Key blob to encrypt to, or ``None`` to send the
            report unencrypted.
        :param subscribed: Newly subscribed subscribers.
        :param unsubscribed: Removed subscribers.
        :param updated: Subscribers whose key changed.
        :param added: Keys that were added.
        :param removed: Keys that were deleted.
        :param conflicts: Conflict messages to summarize; ``None`` entries and
            messages of other types are ignored.
        :param sourcemap: Maps a subscription's ``schleuder`` attribute to a
            display name; missing entries are rendered as ``unknown``.
        :raises ValueError: If all of the change collections are empty.
        """
        changes = (subscribed, unsubscribed, removed, added, updated, conflicts)
        if not any(len(change) > 0 for change in changes):
            raise ValueError('No changes, not creating admin report')
        key_conflicts: List[KeyConflictMessage] = \
            [msg for msg in conflicts if isinstance(msg, KeyConflictMessage)]
        # Make sure the user conflicts are unique: keep the first message per fingerprint
        unique_user_conflicts: Dict[str, UserConflictMessage] = {}
        for msg in conflicts:
            if not isinstance(msg, UserConflictMessage):
                continue
            assert msg._chosen.key is not None
            unique_user_conflicts.setdefault(msg._chosen.key.fingerprint, msg)
        user_conflicts: List[UserConflictMessage] = list(unique_user_conflicts.values())
        # Assemble the content: one (heading, lines) pair per section, in output order
        sections = [
            ('\n>>> Subscribed:\n',
             [f'{sub.email} ({sourcemap.get(sub.schleuder, "unknown")})\n' for sub in subscribed]),
            ('\n>>> Unsubscribed:\n',
             [f'{sub.email}\n' for sub in unsubscribed]),
            ('\n>>> Subscriber keys changed:\n',
             ['{} ({})\n'.format(sub.email, 'no key' if sub.key is None else sub.key.fingerprint)
              for sub in updated]),
            ('\n>>> Keys added:\n',
             [f'{key.fingerprint} ({key.email})\n' for key in added]),
            ('\n>>> Keys deleted:\n',
             [f'{key.fingerprint} ({key.email})\n' for key in removed]),
            ('\n>>> Keys conflicts:\n',
             [f'{conflict.report_str}\n' for conflict in key_conflicts]),
            ('\n>>> User conflicts:\n',
             [f'{conflict.report_str}\n' for conflict in user_conflicts]),
        ]
        pieces: List[str] = [f'\n== Admin Report for MultiSchleuder {schleuder} ==\n']
        for heading, lines in sections:
            # Sections without any entries are left out entirely
            if lines:
                pieces.append(heading)
                pieces.extend(lines)
        content = ''.join(pieces)

        recipient_keys: List[str] = [] if encrypt_to is None else [encrypt_to]
        super().__init__(
            schleuder=schleuder,
            mail_from=mail_from,
            mail_to=mail_to,
            content=content,
            encrypt_to=recipient_keys
        )
        self.mime['Subject'] = f'MultiSchleuder Admin Report: {self._schleuder}'
|
|
|
|
|
|
class Reporter:
    """Collects generated messages, filtered by what is configured to be sent.

    The collected messages live in a single class-level list, so they are
    shared between all ``Reporter`` instances and accessed through the
    class methods below.
    """

    # Shared across all instances on purpose; see get_messages()/clear_messages()
    _messages: List['Message'] = []

    def __init__(self,
                 send_conflict_messages: bool,
                 send_admin_reports: bool):
        """Configure which kinds of message this reporter accepts.

        :param send_conflict_messages: Accept key and user conflict messages.
        :param send_admin_reports: Accept admin reports.
        """
        self._send_conflict_messages: bool = send_conflict_messages
        self._send_admin_reports: bool = send_admin_reports

    def add_message(self, message: Optional[Message]):
        """Queue ``message`` unless it is ``None`` or its kind is disabled."""
        if message is None:
            return
        suppressed = (
            (not self._send_conflict_messages
             and isinstance(message, (KeyConflictMessage, UserConflictMessage)))
            or (not self._send_admin_reports and isinstance(message, AdminReport))
        )
        if suppressed:
            return
        self.__class__._messages.append(message)

    def add_messages(self, messages: List[Optional[Message]]):
        """Queue each of ``messages``, applying the same filtering as :meth:`add_message`."""
        for candidate in messages:
            self.add_message(candidate)

    @classmethod
    def get_messages(cls) -> List['Message']:
        """Return a copy of the list of queued messages."""
        return [*cls._messages]

    @classmethod
    def clear_messages(cls):
        """Remove all queued messages."""
        del cls._messages[:]
|