166 lines
6.4 KiB
Python
166 lines
6.4 KiB
Python
import sys
|
|
from typing import List, Dict
|
|
|
|
from .crypto import pgp_decrypt
|
|
from .mailing import get_mailing_method
|
|
from .config import Config
|
|
from .files import read_pending_key, write_public_key, remove_pending_key, write_pending_key
|
|
from .types import SubmissionRequest, ConfirmationResponse, PublishResponse, ConfirmationRequest, EasyWksError,\
|
|
XLOOP_HEADER
|
|
|
|
from email.message import MIMEPart, Message
|
|
from email.parser import BytesParser
|
|
from email.policy import default
|
|
from email.utils import getaddresses
|
|
|
|
from pgpy import PGPMessage, PGPKey, PGPUID
|
|
from pgpy.errors import PGPError
|
|
|
|
|
|
def _get_mime_leafs(msg: Message) -> List[MIMEPart]:
    """Collect every non-multipart part reachable from ``msg``.

    The MIME tree is walked iteratively with an explicit stack. Children are
    pushed in document order and popped from the end, so the leaves are
    returned in reverse document order.

    :param msg: Root message (or part) to traverse.
    :return: All leaf parts; ``[msg]`` if ``msg`` itself is not multipart.
    """
    pending = [msg]
    collected = []
    while pending:
        current = pending.pop()
        if not current.is_multipart():
            collected.append(current)
            continue
        # For a multipart container the payload is the list of its sub-parts.
        pending += current.get_payload()
    return collected
|
|
|
|
|
|
def _get_pgp_message(parts: List[MIMEPart]) -> PGPMessage:
    """Return the single encrypted OpenPGP message found among ``parts``.

    Every part's content is tentatively parsed as an OpenPGP message. Parts
    that cannot be read or parsed are skipped rather than aborting the scan,
    because a regular mail may carry arbitrary non-PGP parts.

    :param parts: Leaf MIME parts of the incoming mail.
    :return: The one parsed message whose ``is_encrypted`` flag is set.
    :raises EasyWksError: If no part is encrypted, or more than one is.
    """
    encrypted = None
    for part in parts:
        try:
            candidate: PGPMessage = PGPMessage.from_blob(part.get_content())
        except (ValueError, PGPError, LookupError):
            # ValueError / PGPError: the content is not a parseable OpenPGP
            # message. LookupError: get_content() has no handler for the
            # part's content type (KeyError) or does not know its charset.
            continue
        if not candidate.is_encrypted:
            continue
        if encrypted is not None:
            raise EasyWksError('More than one encrypted message part')
        encrypted = candidate
    if encrypted is None:
        raise EasyWksError('No encrypted message part')
    return encrypted
|
|
|
|
|
|
def _get_pgp_publickey(parts: List[MIMEPart]) -> PGPKey:
    """Return the single OpenPGP public key found among ``parts``.

    Every part's content is tentatively parsed as an OpenPGP key. Parts that
    cannot be read or parsed, and parts holding a non-public key, are skipped.

    :param parts: Leaf MIME parts of the decrypted submission mail.
    :return: The one parsed key whose ``is_public`` flag is set.
    :raises EasyWksError: If no public key is present, or more than one is.
    """
    pubkey = None
    for part in parts:
        # Only the parsing sits inside the try block so that the
        # EasyWksError raised below can never be swallowed by the handler.
        try:
            key, _ = PGPKey.from_blob(part.get_content())
        except (PGPError, ValueError, LookupError):
            # PGPError / ValueError: the content is not a parseable OpenPGP
            # key (e.g. a plain-text part without ASCII armor).
            # LookupError: get_content() has no handler for the part's
            # content type (KeyError) or does not know its charset.
            continue
        if not key.is_public:
            continue
        if pubkey is not None:
            raise EasyWksError('More than one PGP public key in message')
        pubkey = key
    if pubkey is None:
        raise EasyWksError('No PGP public key in message')
    return pubkey
|
|
|
|
|
|
def _parse_submission_request(pgp: PGPMessage, submission: str, sender: str):
    """Build a ``SubmissionRequest`` from a decrypted submission mail.

    The decrypted payload is parsed as a MIME message, the single public key
    contained in it is extracted, and the key is required to carry a user ID
    whose e-mail address equals ``sender``.

    :param pgp: Decrypted OpenPGP message; its ``message`` attribute holds
        the MIME payload.
    :param submission: Submission address the mail was sent to.
    :param sender: E-mail address of the mail's sender.
    :return: ``SubmissionRequest(sender, submission, pubkey)``.
    :raises EasyWksError: If the key lookup fails (zero or several public
        keys) or the key has no UID matching ``sender``.
    """
    # NOTE(review): parsebytes() needs a bytes-like object; confirm that
    # pgp.message is never a str for the messages handled here.
    mime_tree = BytesParser(policy=default).parsebytes(pgp.message)
    pubkey = _get_pgp_publickey(_get_mime_leafs(mime_tree))
    sender_uid: PGPUID = pubkey.get_uid(sender)
    uid_matches = sender_uid is not None and sender_uid.email == sender
    if not uid_matches:
        raise EasyWksError(f'Key has no UID that matches {sender}')
    return SubmissionRequest(sender, submission, pubkey)
|
|
|
|
|
|
def _find_confirmation_response(parts: List[MIMEPart]) -> str:
    """Locate the textual confirmation response among ``parts``.

    A part that carries a ``type: confirmation-response`` header is returned
    immediately in serialized form. Otherwise the content of every part is
    inspected and the last one containing the text ``confirmation-response``
    is returned. Parts whose content cannot be read or decoded are skipped.

    :param parts: Leaf MIME parts of the decrypted confirmation mail.
    :return: The text holding the confirmation response.
    :raises EasyWksError: If no part contains a confirmation response.
    """
    response = None
    for part in parts:
        if part.get('type', '') == 'confirmation-response':
            # the message wasn't a MIME message; the response lines were
            # parsed as headers, so return the serialized part as-is
            return str(part)
        try:
            content = part.get_content()
        except LookupError:
            # No content handler for this content type (KeyError) or an
            # unknown charset: not our part, and it must not abort the scan.
            continue
        if isinstance(content, bytes):
            try:
                content = content.decode()
            except UnicodeDecodeError:
                # obviously not our part
                continue
        # Anything that is still not text cannot be our part either.
        if isinstance(content, str) and 'confirmation-response' in content:
            response = content
    if response is None:
        raise EasyWksError('No confirmation response found in message')
    return response
|
|
|
|
|
|
def _parse_confirmation_response(pgp: PGPMessage, submission: str, sender: str):
    """Build a ``ConfirmationResponse`` from a decrypted confirmation mail.

    The response text is read as ``key: value`` lines (lines without a colon
    are ignored, later duplicates override earlier ones) and validated: the
    ``type`` must be ``confirmation-response``, ``sender`` and ``nonce`` must
    be present, and ``sender`` must equal the mail's sender.

    :param pgp: Decrypted OpenPGP message; its ``message`` attribute holds
        the MIME payload.
    :param submission: Submission address the mail was sent to.
    :param sender: E-mail address of the mail's sender.
    :return: ``ConfirmationResponse(sender, submission, nonce, pgp)``.
    :raises EasyWksError: If no response is found or validation fails.
    """
    # NOTE(review): parsebytes() needs a bytes-like object; confirm that
    # pgp.message is never a str for the messages handled here.
    mime_tree = BytesParser(policy=default).parsebytes(pgp.message)
    response = _find_confirmation_response(_get_mime_leafs(mime_tree))

    fields: Dict[str, str] = {}
    for line in response.splitlines():
        name, separator, value = line.partition(':')
        if not separator:
            continue
        fields[name.strip()] = value.strip()

    if fields.get('type', '') != 'confirmation-response':
        raise EasyWksError('Invalid confirmation response: "type" missing or not "confirmation-response"')
    if not {'sender', 'nonce'}.issubset(fields):
        raise EasyWksError('Invalid confirmation response: "sender" or "nonce" missing from confirmation response')
    if fields['sender'] != sender:
        raise EasyWksError('Confirmation sender does not match message sender')
    return ConfirmationResponse(fields['sender'], submission, fields['nonce'], pgp)
|
|
|
|
|
|
def process_mail(mail: bytes):
    """Process one incoming mail and send the resulting reply.

    The sender address and domain are taken from the ``From`` header. Mails
    that carry this service's ``X-Loop`` value or any ``Auto-Submitted``
    header are silently discarded. The mail must be addressed to exactly one
    recipient, the submission address configured for the sender's domain.
    Its encrypted part is decrypted; a signed payload is handled as a
    confirmation response (the pending key is published and then removed),
    an unsigned one as a submission request (the key is stored as pending).

    :param mail: Raw bytes of the incoming mail.
    :raises EasyWksError: If the sender address is invalid or its domain is
        not configured. Errors raised later are not propagated; they are
        turned into a reply message for the sender instead.
    """
    try:
        msg: Message = BytesParser(policy=default).parsebytes(mail)
        # A missing From header is read as '' so that it ends up in the same
        # error path as a malformed address; depending on the Python version
        # this shows up as an empty address list or as an empty address.
        _, sender_mail = getaddresses([msg.get('from', '')])[0]
        _, sender_domain = sender_mail.split('@', 1)
    except (ValueError, IndexError) as e:
        raise EasyWksError('Sender mail is not a valid mail address') from e
    if sender_domain not in Config.domains:
        raise EasyWksError(f'Domain {sender_domain} not supported')
    if msg.get('x-loop', '') == XLOOP_HEADER or 'auto-submitted' in msg:
        # Mail has somehow looped back to us, discard
        return
    submission_address: str = Config[sender_domain].submission_address
    try:
        rcpt = getaddresses(msg.get_all('to', []) + msg.get_all('cc', []))
        if not rcpt:
            raise EasyWksError('Message has no recipient')
        if len(rcpt) != 1:
            raise EasyWksError('Message has more than one recipients')
        _, rcpt_mail = rcpt[0]
        if rcpt_mail != submission_address:
            raise EasyWksError(f'Message not addressed to submission address {submission_address} '
                               f'for domain {sender_domain}')
        leafs = _get_mime_leafs(msg)
        pgp: PGPMessage = _get_pgp_message(leafs)
        decrypted = pgp_decrypt(sender_domain, pgp)
        if decrypted.is_signed:
            # Signed payload: the sender confirms an earlier submission.
            confirmation: ConfirmationResponse = _parse_confirmation_response(
                decrypted, submission_address, sender_mail)
            try:
                key = read_pending_key(sender_domain, confirmation.nonce)
            except FileNotFoundError as e:
                raise EasyWksError('There is no submission request for this email address, or it has expired. '
                                   'Please resubmit your submission request.') from e
            # this throws an error if signature verification fails
            publish: PublishResponse = confirmation.verify_signature(key)
            rmsg = publish.create_signed_message()
            # Publish first, then drop the pending copy.
            write_public_key(sender_domain, sender_mail, key)
            remove_pending_key(sender_domain, confirmation.nonce)
        else:
            # Unsigned payload: a new key submission that must be confirmed.
            submission: SubmissionRequest = _parse_submission_request(
                decrypted, submission_address, sender_mail)
            challenge: ConfirmationRequest = submission.confirmation_request()
            rmsg = challenge.create_signed_message()
            write_pending_key(sender_domain, challenge.nonce, submission.key)
    except EasyWksError as e:
        # Report the failure back to the sender instead of raising.
        rmsg = e.create_message(sender_mail, submission_address)
    method = get_mailing_method(Config.mailing_method)
    method(rmsg)
|
|
|
|
|
|
def process_mail_from_stdin():
    """Read one raw mail from standard input and hand it to ``process_mail``.

    The mail is read as bytes from the binary layer of stdin, so that 8-bit
    content and line endings reach the parser unmodified instead of being
    decoded with the locale encoding and re-encoded.
    """
    binary_stdin = getattr(sys.stdin, 'buffer', None)
    if binary_stdin is not None:
        mail = binary_stdin.read()
    else:
        # stdin has been replaced by a text-only stream without a binary
        # layer (e.g. io.StringIO); fall back to encoding the text.
        mail = sys.stdin.read().encode()
    process_mail(mail)
|