Apparently PGPy cant encrypt to ed25519 keys :( Fall back to unencrypted conflict message if any of the encryptions failed

This commit is contained in:
s3lph 2022-04-17 00:51:41 +02:00
parent c236b6825a
commit 1c517bd8a7
3 changed files with 39 additions and 24 deletions

View file

@ -1,8 +1,10 @@
from typing import Dict, List, Optional from typing import Dict, List, Optional
import email.mime.base
import email.mime.application import email.mime.application
import email.mime.multipart import email.mime.multipart
import email.mime.text
import email.utils import email.utils
import hashlib import hashlib
import json import json
@ -56,7 +58,7 @@ class ConflictMessage:
return self._digest return self._digest
@property @property
def mime(self) -> email.mime.multipart.MIMEMultipart: def mime(self) -> email.mime.base.MIMEBase:
# Render the message body # Render the message body
fpr = 'N/A' if self._chosen.key is None else self._chosen.key.fingerprint fpr = 'N/A' if self._chosen.key is None else self._chosen.key.fingerprint
_chosen = f'{fpr} {self._chosen.email}' _chosen = f'{fpr} {self._chosen.email}'
@ -70,6 +72,26 @@ class ConflictMessage:
chosen=_chosen, chosen=_chosen,
affected=_affected affected=_affected
) )
# Encrypt to all keys, if possible. Fall back to unencrypted otherwise - PGPy does not
# support every possible key algorithm yet, esp. it can't encrypt to ed25519 keys.
try:
mime: email.mime.base.MIMEBase = self._encrypt_message(msg)
except pgpy.errors.PGPEncryptionError:
mime = email.mime.text.MIMEText(msg, _subtype='plain', _charset='utf-8')
# Set all the email headers
mime['Subject'] = f'MultiSchleuder {self._schleuder} - Key Conflict'
mime['From'] = self._from
mime['Reply-To'] = self._from
mime['To'] = self._chosen.email
mime['Date'] = email.utils.formatdate()
mime['Auto-Submitted'] = 'auto-generated'
mime['Precedence'] = 'list'
mime['List-Id'] = f'<{self._schleuder.replace("@", ".")}>'
mime['List-Help'] = '<https://gitlab.com/s3lph/multischleuder>'
mime['X-MultiSchleuder-Digest'] = self._digest
return mime
def _encrypt_message(self, msg: str) -> email.mime.base.MIMEBase:
pgp = pgpy.PGPMessage.new(msg) pgp = pgpy.PGPMessage.new(msg)
# Encrypt the message to all keys # Encrypt the message to all keys
cipher = pgpy.constants.SymmetricKeyAlgorithm.AES256 cipher = pgpy.constants.SymmetricKeyAlgorithm.AES256
@ -95,17 +117,6 @@ class ConflictMessage:
mp0 = email.mime.multipart.MIMEMultipart(_subtype='encrypted', protocol='application/pgp-encrypted') mp0 = email.mime.multipart.MIMEMultipart(_subtype='encrypted', protocol='application/pgp-encrypted')
mp0.attach(mp1) mp0.attach(mp1)
mp0.attach(mp2) mp0.attach(mp2)
# Set all the email headers
mp0['Subject'] = f'MultiSchleuder {self._schleuder} - Key Conflict'
mp0['From'] = self._from
mp0['Reply-To'] = self._from
mp0['To'] = self._chosen.email
mp0['Date'] = email.utils.formatdate()
mp0['Auto-Submitted'] = 'auto-generated'
mp0['Precedence'] = 'list'
mp0['List-Id'] = f'<{self._schleuder.replace("@", ".")}>'
mp0['List-Help'] = '<https://gitlab.com/s3lph/multischleuder>'
mp0['X-MultiSchleuder-Digest'] = self._digest
return mp0 return mp0
@ -165,7 +176,8 @@ class KeyConflictResolution:
f.seek(0) f.seek(0)
try: try:
state: Dict[str, int] = json.load(f) state: Dict[str, int] = json.load(f)
except: except BaseException:
self._logger.exception('Cannot read statefile. WARNING: This could lead to spamming')
# TODO: This could lead to a situation where multischleuder becomes a spammer # TODO: This could lead to a situation where multischleuder becomes a spammer
state = {} state = {}
# Remove all state entries older than conflict_interval # Remove all state entries older than conflict_interval

View file

@ -203,7 +203,8 @@ class TestSchleuderApi(unittest.TestCase):
def test_subscribe(self, mock): def test_subscribe(self, mock):
api = self._mock_api(mock) api = self._mock_api(mock)
now = datetime.utcnow() now = datetime.utcnow()
key = SchleuderKey('ADB9BC679FF53CC8EF66FAC39348FDAB7A7663F9', 'andy.example@example.org', 'verylongpgpkeyblock', 42) key = SchleuderKey('ADB9BC679FF53CC8EF66FAC39348FDAB7A7663F9',
'andy.example@example.org', 'verylongpgpkeyblock', 42)
sub = SchleuderSubscriber(23, 'andy.example@example.org', key, 42, now) sub = SchleuderSubscriber(23, 'andy.example@example.org', key, 42, now)
api.subscribe(sub, SchleuderList(42, '', '')) api.subscribe(sub, SchleuderList(42, '', ''))
self.assertEqual('https://localhost:4443/subscriptions.json?list_id=42', self.assertEqual('https://localhost:4443/subscriptions.json?list_id=42',
@ -223,7 +224,8 @@ class TestSchleuderApi(unittest.TestCase):
def test_unsubscribe(self, mock): def test_unsubscribe(self, mock):
api = self._mock_api(mock) api = self._mock_api(mock)
now = datetime.utcnow() now = datetime.utcnow()
key = SchleuderKey('ADB9BC679FF53CC8EF66FAC39348FDAB7A7663F9', 'andy.example@example.org', 'verylongpgpkeyblock', 42) key = SchleuderKey('ADB9BC679FF53CC8EF66FAC39348FDAB7A7663F9',
'andy.example@example.org', 'verylongpgpkeyblock', 42)
sub = SchleuderSubscriber(23, 'andy.example@example.org', key, 42, now) sub = SchleuderSubscriber(23, 'andy.example@example.org', key, 42, now)
api.unsubscribe(sub, SchleuderList(42, '', '')) api.unsubscribe(sub, SchleuderList(42, '', ''))
self.assertEqual('https://localhost:4443/subscriptions/23.json', self.assertEqual('https://localhost:4443/subscriptions/23.json',
@ -235,7 +237,8 @@ class TestSchleuderApi(unittest.TestCase):
def test_update_fingerprint(self, mock): def test_update_fingerprint(self, mock):
api = self._mock_api(mock) api = self._mock_api(mock)
now = datetime.utcnow() now = datetime.utcnow()
key = SchleuderKey('ADB9BC679FF53CC8EF66FAC39348FDAB7A7663F9', 'andy.example@example.org', 'verylongpgpkeyblock', 42) key = SchleuderKey('ADB9BC679FF53CC8EF66FAC39348FDAB7A7663F9',
'andy.example@example.org', 'verylongpgpkeyblock', 42)
sub = SchleuderSubscriber(23, 'andy.example@example.org', key, 42, now) sub = SchleuderSubscriber(23, 'andy.example@example.org', key, 42, now)
api.update_fingerprint(sub, SchleuderList(42, '', '')) api.update_fingerprint(sub, SchleuderList(42, '', ''))
self.assertEqual('https://localhost:4443/subscriptions/23.json', self.assertEqual('https://localhost:4443/subscriptions/23.json',
@ -265,7 +268,8 @@ class TestSchleuderApi(unittest.TestCase):
@patch('urllib.request.urlopen') @patch('urllib.request.urlopen')
def test_post_key(self, mock): def test_post_key(self, mock):
api = self._mock_api(mock) api = self._mock_api(mock)
key = SchleuderKey('ADB9BC679FF53CC8EF66FAC39348FDAB7A7663F9', 'andy.example@example.org', 'verylongpgpkeyblock', 42) key = SchleuderKey('ADB9BC679FF53CC8EF66FAC39348FDAB7A7663F9',
'andy.example@example.org', 'verylongpgpkeyblock', 42)
api.post_key(key, SchleuderList(42, '', '')) api.post_key(key, SchleuderList(42, '', ''))
self.assertEqual('https://localhost:4443/keys.json?list_id=42', self.assertEqual('https://localhost:4443/keys.json?list_id=42',
mock.call_args_list[0][0][0].get_full_url()) mock.call_args_list[0][0][0].get_full_url())
@ -275,7 +279,8 @@ class TestSchleuderApi(unittest.TestCase):
@patch('urllib.request.urlopen') @patch('urllib.request.urlopen')
def test_delete_key(self, mock): def test_delete_key(self, mock):
api = self._mock_api(mock) api = self._mock_api(mock)
key = SchleuderKey('ADB9BC679FF53CC8EF66FAC39348FDAB7A7663F9', 'andy.example@example.org', 'verylongpgpkeyblock', 42) key = SchleuderKey('ADB9BC679FF53CC8EF66FAC39348FDAB7A7663F9',
'andy.example@example.org', 'verylongpgpkeyblock', 42)
api.delete_key(key, SchleuderList(42, '', '')) api.delete_key(key, SchleuderList(42, '', ''))
self.assertEqual('https://localhost:4443/keys/ADB9BC679FF53CC8EF66FAC39348FDAB7A7663F9.json?list_id=42', self.assertEqual('https://localhost:4443/keys/ADB9BC679FF53CC8EF66FAC39348FDAB7A7663F9.json?list_id=42',
mock.call_args_list[0][0][0].get_full_url()) mock.call_args_list[0][0][0].get_full_url())
@ -287,7 +292,8 @@ class TestSchleuderApi(unittest.TestCase):
api = self._mock_api(mock) api = self._mock_api(mock)
api.dry_run() api.dry_run()
now = datetime.utcnow() now = datetime.utcnow()
key = SchleuderKey('ADB9BC679FF53CC8EF66FAC39348FDAB7A7663F9', 'andy.example@example.org', 'verylongpgpkeyblock', 42) key = SchleuderKey('ADB9BC679FF53CC8EF66FAC39348FDAB7A7663F9',
'andy.example@example.org', 'verylongpgpkeyblock', 42)
sub = SchleuderSubscriber(23, 'andy.example@example.org', key, 42, now) sub = SchleuderSubscriber(23, 'andy.example@example.org', key, 42, now)
sch = SchleuderList(42, '', '') sch = SchleuderList(42, '', '')
# create, update, delete should be no-ops; 5 requests for retrieving the keys & subscriptions in unsub & update # create, update, delete should be no-ops; 5 requests for retrieving the keys & subscriptions in unsub & update

View file

@ -23,12 +23,9 @@ class TestSchleuderTypes(unittest.TestCase):
self.assertEqual('ADB9BC679FF53CC8EF66FAC39348FDAB7A7663F9', k.fingerprint) self.assertEqual('ADB9BC679FF53CC8EF66FAC39348FDAB7A7663F9', k.fingerprint)
self.assertEqual('andy.example@example.org', k.email) self.assertEqual('andy.example@example.org', k.email)
self.assertEqual(42, k.schleuder) self.assertEqual(42, k.schleuder)
self.assertIn('-----BEGIN PGP PUBLIC KEY BLOCK-----', k.blob) self.assertTrue(k.blob.strip().startswith('-----BEGIN PGP PUBLIC KEY BLOCK-----'))
self.assertTrue(k.blob.strip().endswith('-----END PGP PUBLIC KEY BLOCK-----'))
self.assertIn('ADB9BC679FF53CC8EF66FAC39348FDAB7A7663F9 (andy.example@example.org)', str(k)) self.assertIn('ADB9BC679FF53CC8EF66FAC39348FDAB7A7663F9 (andy.example@example.org)', str(k))
# Make sure the key can be used by PGPy
key, _ = pgpy.PGPKey.from_blob(k.blob)
msg = pgpy.PGPMessage.new('Hello World')
key.encrypt(msg)
def test_parse_subscriber(self): def test_parse_subscriber(self):
k = SchleuderKey.from_api(42, **json.loads(_KEY_RESPONSE)) k = SchleuderKey.from_api(42, **json.loads(_KEY_RESPONSE))