Do not send unencrypted admin reports

This commit is contained in:
s3lph 2022-04-25 23:06:08 +02:00
parent 283641ee5c
commit 612334ae8d
4 changed files with 59 additions and 17 deletions

View file

@ -84,11 +84,14 @@ class MultiList:
logging.info(f'No changes for {self._target}')
else:
for admin in target_admins:
report = AdminReport(self._target, admin.email, self._mail_from,
admin.key.blob if admin.key is not None else None,
to_subscribe, to_unsubscribe, to_update, to_add, to_remove,
conflicts)
self._reporter.add_message(report)
try:
report = AdminReport(self._target, admin.email, self._mail_from,
admin.key.blob if admin.key is not None else None,
to_subscribe, to_unsubscribe, to_update, to_add, to_remove,
conflicts)
self._reporter.add_message(report)
except BaseException:
logging.exception(f'Encryption to {admin.email} failed, not sending report')
logging.info(f'Finished processing: {self._target}')
def _lists_by_name(self) -> Tuple[SchleuderList, List[SchleuderList]]:

View file

@ -27,24 +27,29 @@ class Message(abc.ABC):
mail_from: str,
mail_to: str,
content: str,
encrypt_to: List[str]):
encrypt_to: List[str],
encrypt_may_fail: bool = False):
self._schleuder: str = schleuder
self._from: str = mail_from
self._to: str = mail_to
self._keys: List[str] = encrypt_to
self._mime: email.mime.base.MIMEBase = self._make_mime(content)
self._mime: email.mime.base.MIMEBase = self._make_mime(content, encrypt_may_fail)
@property
def mime(self) -> email.mime.base.MIMEBase:
return self._mime
def _make_mime(self, content: str) -> email.mime.base.MIMEBase:
def _make_mime(self, content: str, encrypt_may_fail: bool) -> email.mime.base.MIMEBase:
# Encrypt to all keys, if possible. Fall back to unencrypted otherwise
try:
self._mime = self._encrypt_message(content)
except Exception:
logging.exception('Encryption failed; falling back to unencrypted message')
self._mime = email.mime.text.MIMEText(content, _subtype='plain', _charset='utf-8')
except Exception as e:
if encrypt_may_fail:
logging.exception('Encryption failed; falling back to unencrypted message')
self._mime = email.mime.text.MIMEText(content, _subtype='plain', _charset='utf-8')
else:
logging.exception('Encryption failed; Not sending this message')
raise e
# Set all the email headers
self._mime['From'] = self._from
self._mime['Reply-To'] = self._from
@ -124,7 +129,8 @@ class KeyConflictMessage(Message):
mail_from=mail_from,
mail_to=chosen.email,
content=content,
encrypt_to=[s.key.blob for s in affected if s.key is not None]
encrypt_to=[s.key.blob for s in affected if s.key is not None],
encrypt_may_fail=True # Permit unencrypted fallback so the user gets notified of the conflict anyway
)
self.mime['Subject'] = f'MultiSchleuder {self._schleuder} - Key Conflict'
self.mime['X-MultiSchleuder-Digest'] = digest
@ -174,7 +180,8 @@ class UserConflictMessage(Message):
mail_from=mail_from,
mail_to=subscriber,
content=content,
encrypt_to=[chosen.key.blob]
encrypt_to=[chosen.key.blob],
encrypt_may_fail=True # Permit unencrypted fallback so the user gets notified of the conflict anyway
)
self.mime['Subject'] = f'MultiSchleuder {self._schleuder} - Subscriber Conflict'
self.mime['X-MultiSchleuder-Digest'] = digest

View file

@ -9,6 +9,7 @@ from dateutil.tz import tzutc
from multischleuder.processor import MultiList
from multischleuder.reporting import Message
from multischleuder.test.test_conflict import _PRIVKEY_1
from multischleuder.types import SchleuderKey, SchleuderList, SchleuderSubscriber
@ -38,7 +39,7 @@ def _list_lists():
def _get_key(fpr: str, schleuder: SchleuderList):
key1 = SchleuderKey('966842467B3254143F994D5E5C408C012D216471',
'admin@example.org', 'BEGIN PGP 2D216471', schleuder.id)
'admin@example.org', str(_PRIVKEY_1.pubkey), schleuder.id)
key2 = SchleuderKey('6449FFB6EE68187962FA013B5CA2F4F51791BAF6',
'ada.lovelace@example.org', 'BEGIN PGP 1791BAF6', schleuder.id)
key3 = SchleuderKey('414D3960D34730F63C74D5190EBC5A16716DEC79',
@ -72,7 +73,7 @@ def _get_admins(schleuder: SchleuderList):
if schleuder.id != 2:
return []
key = SchleuderKey('966842467B3254143F994D5E5C408C012D216471',
'admin@example.org', 'BEGIN PGP 2D216471', schleuder.id)
'admin@example.org', str(_PRIVKEY_1.pubkey), schleuder.id)
date = datetime(2022, 4, 15, 5, 23, 42, 0, tzinfo=tzutc())
admin = SchleuderSubscriber(0, 'admin@example.org', key, schleuder.id, date)
return [admin]
@ -80,7 +81,7 @@ def _get_admins(schleuder: SchleuderList):
def _get_subs(schleuder: SchleuderList):
key1 = SchleuderKey('966842467B3254143F994D5E5C408C012D216471',
'admin@example.org', 'BEGIN PGP 2D216471', schleuder.id)
'admin@example.org', str(_PRIVKEY_1.pubkey), schleuder.id)
key2 = SchleuderKey('6449FFB6EE68187962FA013B5CA2F4F51791BAF6',
'ada.lovelace@example.org', 'BEGIN PGP 1791BAF6', schleuder.id)
key3 = SchleuderKey('414D3960D34730F63C74D5190EBC5A16716DEC79',

View file

@ -3,11 +3,27 @@ import unittest
from datetime import datetime
import pgpy.errors
from multischleuder.reporting import KeyConflictMessage, AdminReport, Reporter, UserConflictMessage
from multischleuder.types import SchleuderKey, SchleuderList, SchleuderSubscriber
from multischleuder.test.test_conflict import _PRIVKEY_1
BROKENKEY = '''
-----BEGIN PGP PUBLIC KEY BLOCK-----
mDMEYmcMbxYJKwYBBAHaRw8BAQdAKUohRdnuTSldKwawfLdwwUvOJjz/pHx3fXS2
v2dUQx+0SU11bHRpc2NobGV1ZGVyIEJyb2tlbiBBZG1pbiBLZXkgKFRFU1QgS0VZ
IERPIE5PVCBVU0UpIDxhZG1pbkBleGFtcGxlLm9yZz6IkAQTFggAOBYhBGtuFOnz
PJOCOdfv6OuAwhfh1Uj8BQJiZwxvAhsBBQsJCAcCBhUKCQgLAgQWAgMBAh4BAheA
AAoJEOuAwhfh1Uj8PnkBAM6PfYUZbvvYEkSdwzmZXDwhPRsSA0bhjL5aVwIeCCdp
AQDeImNI6czSLVAuwObKv8FnpmbFi3HxTNzakp44DoD8Aw==
=JtdI
-----END PGP PUBLIC KEY BLOCK-----
'''
def one_of_each_kind():
sub = SchleuderSubscriber(1, 'foo@example.org', None, 1, datetime.utcnow())
key = SchleuderKey(_PRIVKEY_1.fingerprint.replace(' ', ''), 'foo@example.org', str(_PRIVKEY_1.pubkey), 1)
@ -24,7 +40,7 @@ def one_of_each_kind():
schleuder='test@example.org',
mail_to='admin@example.org',
mail_from='test-owner@example.org',
encrypt_to=None,
encrypt_to=str(_PRIVKEY_1.pubkey),
subscribed={},
unsubscribed={sub},
updated={},
@ -94,3 +110,18 @@ class TestReporting(unittest.TestCase):
r.add_messages([None])
self.assertEqual(0, len(Reporter.get_messages()))
Reporter.clear_messages()
def test_admin_report_nokey(self):
sub = SchleuderSubscriber(1, 'foo@example.org', None, 1, datetime.utcnow())
with self.assertRaises(pgpy.errors.PGPError):
AdminReport(
schleuder='test@example.org',
mail_to='admin@example.org',
mail_from='test-owner@example.org',
encrypt_to=BROKENKEY,
subscribed={sub},
unsubscribed={},
updated={},
added={},
removed={},
conflicts=[])