feat: allow submitting additional revoked keys with the submission request

This commit is contained in:
s3lph 2023-01-31 00:46:23 +01:00
parent fddbea70d9
commit 4e1465cdb2
5 changed files with 96 additions and 33 deletions

View file

@ -8,7 +8,7 @@ from pgpy import PGPKey
from .config import Config
from .crypto import create_pgp_key, privkey_to_pubkey
from .util import hash_user_id
from .util import hash_user_id, armor_keys, split_revoked
def _locked_read(file: str, binary: bool = False):
@ -62,33 +62,34 @@ def init_working_directory():
def read_public_key(domain, user):
hu = hash_user_id(user)
keyfile = os.path.join(Config.working_directory, domain, 'hu', hu)
key, _ = PGPKey.from_blob(_locked_read(keyfile, binary=True))
return key
return read_hashed_public_key(domain, hash_user_id(user))
def read_hashed_public_key(domain, hu):
keyfile = os.path.join(Config.working_directory, domain, 'hu', hu)
key, _ = PGPKey.from_blob(_locked_read(keyfile, binary=True))
return key
_, keys = PGPKey.from_blob(_locked_read(keyfile, binary=True))
key, revoked = split_revoked(keys.values())
return key, revoked
def write_public_key(domain, user, key):
def write_public_key(domain, user, key, revoked):
hu = hash_user_id(user)
keyfile = os.path.join(Config.working_directory, domain, 'hu', hu)
_locked_write(keyfile, bytes(key), binary=True)
joined = bytes(key) + b''.join([bytes(k) for k in revoked])
_locked_write(keyfile, joined, binary=True)
def read_pending_key(domain, nonce):
keyfile = os.path.join(Config.working_directory, domain, 'pending', nonce)
key, _ = PGPKey.from_blob(_locked_read(keyfile))
return key
_, keys = PGPKey.from_blob(_locked_read(keyfile))
key, revoked = split_revoked(keys.values())
return key[0], revoked
def write_pending_key(domain, nonce, key):
def write_pending_key(domain, nonce, key, revoked_keys):
keyfile = os.path.join(Config.working_directory, domain, 'pending', nonce)
_locked_write(keyfile, str(key))
armored = armor_keys([key] + revoked_keys)
_locked_write(keyfile, armored)
def remove_pending_key(domain, nonce):

View file

@ -59,10 +59,10 @@ def advanced_hu(domain: str, userhash: str):
if not userid or hash_user_id(userid) != userhash:
abort(404, 'Not Found')
try:
pubkey = read_hashed_public_key(domain, userhash)
pubkey, revoked = read_hashed_public_key(domain, userhash)
response.add_header('Content-Type', 'application/octet-stream')
response.add_header(*CORS)
return bytes(pubkey)
return bytes(pubkey) + b''.join([bytes(k) for k in revoked])
except FileNotFoundError:
abort(404, 'Not Found')

View file

@ -1,5 +1,5 @@
import sys
from typing import List, Dict
from typing import List, Dict, Tuple
from .crypto import pgp_decrypt
from .mailing import get_mailing_method
@ -8,6 +8,7 @@ from .files import read_pending_key, write_public_key, remove_pending_key, write
from .types import SubmissionRequest, ConfirmationResponse, PublishResponse, ConfirmationRequest, EasyWksError,\
XLOOP_HEADER
from .types import fingerprint
from .util import split_revoked
from email.message import MIMEPart, Message
from email.parser import BytesParser
@ -46,30 +47,43 @@ def _get_pgp_message(parts: List[MIMEPart]) -> PGPMessage:
return pgp
def _get_pgp_publickey(parts: List[MIMEPart]) -> PGPKey:
pubkey = None
def _get_pgp_publickeys(parts: List[MIMEPart]) -> Tuple[PGPKey, List[PGPKey]]:
pubkeys: Dict[str, PGPKey] = {}
for part in parts:
try:
key, _ = PGPKey.from_blob(part.get_content())
if key.is_public:
if pubkey:
raise EasyWksError('More than one PGP public key in message. Only submit a single key at once.')
pubkey = key
_, keys = PGPKey.from_blob(part.get_content())
for (_, public), key in keys.items():
if not public:
continue
fpr = fingerprint(key.fingerprint)
if fpr in pubkeys:
raise EasyWksError(f'Key with fingerprint {fpr} appears multiple times in submission request.')
pubkeys[fpr] = key
except PGPError:
pass
if not pubkey:
if len(pubkeys) == 0:
raise EasyWksError('No PGP public key found in the encrypted message part.')
return pubkey
key, revoked_keys = split_revoked(pubkeys.values())
if len(key) < 1:
raise EasyWksError('All of the submitted keys appear to be revoked.')
elif len(key) > 1:
fprs = ' '.join([fingerprint(k) for k in key])
raise EasyWksError(f'More than one non-revoked key was submitted: {fprs}')
return key[0], revoked_keys
def _parse_submission_request(pgp: PGPMessage, submission: str, sender: str):
payload = BytesParser(policy=default).parsebytes(pgp.message)
leafs = _get_mime_leafs(payload)
pubkey = _get_pgp_publickey(leafs)
sender_uid: PGPUID = pubkey.get_uid(sender)
valid_key, revoked_keys = _get_pgp_publickeys(leafs)
sender_uid: PGPUID = valid_key.get_uid(sender)
if sender_uid is None or sender_uid.email != sender:
raise EasyWksError(f'Key has no UID that matches {sender}')
return SubmissionRequest(sender, submission, pubkey)
for key in revoked_keys:
sender_uid: PGPUID = key.get_uid(sender)
if sender_uid is None or sender_uid.email != sender:
raise EasyWksError(f'Revoked key {fingerprint(key)} has no UID that matches {sender}')
return SubmissionRequest(sender, submission, valid_key, revoked_keys)
def _find_confirmation_response(parts: List[MIMEPart]) -> str:
@ -152,7 +166,7 @@ def process_mail(mail: bytes):
if confirmation_response is not None:
request: ConfirmationResponse = _parse_confirmation_response(decrypted, submission_address, sender_mail)
try:
key = read_pending_key(sender_domain, request.nonce)
key, revoked_keys = read_pending_key(sender_domain, request.nonce)
except FileNotFoundError:
raise EasyWksError('There is no submission request for this email address, or it has expired. '
'Please resubmit your submission request.')
@ -160,13 +174,13 @@ def process_mail(mail: bytes):
request.verify_signature(key)
response: PublishResponse = request.get_publish_response(key)
rmsg = response.create_signed_message()
write_public_key(sender_domain, sender_mail, key)
write_public_key(sender_domain, sender_mail, key, revoked_keys)
remove_pending_key(sender_domain, request.nonce)
else:
request: SubmissionRequest = _parse_submission_request(decrypted, submission_address, sender_mail)
response: ConfirmationRequest = request.confirmation_request()
rmsg = response.create_signed_message()
write_pending_key(sender_domain, response.nonce, request.key)
write_pending_key(sender_domain, response.nonce, request.key, request.revoked_keys)
except EasyWksError as e:
rmsg = e.create_message(sender_mail, submission_address)
method = get_mailing_method(Config.mailing_method)
@ -200,5 +214,5 @@ def process_key_from_stdin(args):
print(f'Skipping foreign email {uid.email}')
continue
# All checks passed, importing key
write_public_key(domain, uid.email, pubkey)
write_public_key(domain, uid.email, pubkey, [])
print(f'Imported key {fingerprint(pubkey)} for email {uid.email}')

View file

@ -1,4 +1,6 @@
from typing import List
from datetime import datetime
from email.encoders import encode_noop
from email.policy import default
@ -20,10 +22,11 @@ XLOOP_HEADER = 'EasyWKS'
class SubmissionRequest:
def __init__(self, submitter_addr: str, submission_addr: str, key: PGPKey):
def __init__(self, submitter_addr: str, submission_addr: str, key: PGPKey, revoked_keys: List[PGPKey]):
self._submitter_addr = submitter_addr
self._submission_addr = submission_addr
self._key = key
self._revoked_keys = revoked_keys
def confirmation_request(self) -> 'ConfirmationRequest':
return ConfirmationRequest(self._submitter_addr, self. _submission_addr, self._key)
@ -40,6 +43,10 @@ class SubmissionRequest:
def key(self):
return self._key
@property
def revoked_keys(self):
return list(self._revoked_keys)
class ConfirmationRequest:

View file

@ -1,9 +1,14 @@
from typing import Iterable, List, Tuple
import base64
import hashlib
import secrets
import string
import textwrap
from pgpy import PGPKey
from pgpy.constants import SignatureType
def _zrtp_base32(sha1: bytes) -> str:
@ -36,3 +41,39 @@ def create_nonce(n: int = 32) -> str:
def fingerprint(key: PGPKey) -> str:
return key.fingerprint.upper().replace(' ', '')
def crc24(data: bytes) -> bytes:
# https://www.rfc-editor.org/rfc/rfc4880#section-6.1
crc = 0xB704CE
for b in data:
crc ^= (b << 16)
for _ in range(8):
crc <<= 1
if crc & 0x1000000:
crc ^= 0x1864CFB
return bytes([(crc & 0xff0000) >> 16, (crc & 0xff00) >> 8, (crc & 0xff)])
def armor_keys(keys: List[PGPKey]) -> str:
joined = b''.join([bytes(k) for k in keys])
armored = base64.b64encode(joined).decode()
wrapped = '\n'.join(textwrap.wrap(armored, 64))
checksum = base64.b64encode(crc24(joined)).decode()
armored = '-----BEGIN PGP PUBLIC KEY BLOCK-----\n\n' +\
wrapped + '\n=' + checksum +\
'\n-----END PGP PUBLIC KEY BLOCK-----\n'
return armored
def split_revoked(keys: Iterable[PGPKey]) -> Tuple[List[PGPKey], List[PGPKey]]:
revoked_keys = set()
for key in keys:
if len(key.revocation_signatures) == 0:
continue
for rsig in key.revocation_signatures:
if rsig.type == SignatureType.KeyRevocation:
revoked_keys.add(key)
break
key = [k for k in keys if k not in revoked_keys]
return key, list(revoked_keys)