#!/usr/bin/env python3
"""Interactive OpenPGP Web Key Service (WKS) submission client.

The script asks for an email address, looks up the provider's WKS submission
address, discovers (or asks for) the mail server settings, sends the public
key to the submission address and answers the confirmation request that the
WKS server sends back.  All OpenPGP operations are delegated to the ``gpg``
binary.
"""

import abc
import email
import email.message
import email.policy
import imaplib
import poplib
import smtplib
import subprocess
import time
import urllib.error
import urllib.parse
import urllib.request
from datetime import datetime, timezone
from email.mime.application import MIMEApplication
from email.mime.text import MIMEText
from email.parser import BytesParser
from email.utils import format_datetime
from getpass import getpass
from xml.etree import ElementTree

# Absolute path of the GnuPG binary used for every OpenPGP operation.
_GPG = '/usr/bin/gpg'


def _run_gpg(args, stdin=None, timeout=None, check=True):
    """Run gpg with *args* and return the ``subprocess.CompletedProcess``.

    ``subprocess.run`` drains stdout and stderr while waiting, so large
    outputs cannot fill a pipe and dead-lock the child (which a bare
    ``Popen.wait()`` with pipes can).

    :param args: command line arguments placed after the gpg binary.
    :param stdin: optional ``bytes`` written to gpg's standard input; when
        ``None`` the child inherits the script's standard input.
    :param timeout: optional number of seconds after which gpg is killed.
    :param check: raise on a non-zero exit code when true.
    :raises RuntimeError: if gpg times out, or exits non-zero while *check*
        is true.
    """
    try:
        proc = subprocess.run(
            [_GPG, *args],
            input=stdin,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired as exc:
        raise RuntimeError(f'gpg subprocess timed out; stderr: {exc.stderr}') from exc
    if check and proc.returncode != 0:
        raise RuntimeError(f'gpg subprocess returned with non-zero exit code; stderr: {proc.stderr}')
    return proc


def _get_pgp_message(message: email.message.Message) -> bytes:
    """Return the single ASCII-armored PGP message contained in *message*.

    Every non-multipart part is inspected; string content is encoded to
    bytes before searching for the ``BEGIN PGP MESSAGE`` marker.

    :raises ValueError: if no part, or more than one part, is encrypted.
    """
    pgp = None
    for part in message.walk():
        if part.is_multipart():
            continue
        p = part.get_content()
        if isinstance(p, str):
            p = p.encode()
        if b'BEGIN PGP MESSAGE' in p:
            if pgp is not None:
                raise ValueError('More than one encrypted message part')
            pgp = p
    if pgp is None:
        raise ValueError('No encrypted message part')
    return pgp


class MailServerConfig(abc.ABC):
    """Connection settings for one mail server.

    Instances compare by preference (``a < b`` means *a* should be tried
    before *b*), which lets callers simply ``sorted()`` a list of candidates.
    Defining ``__eq__`` without ``__hash__`` makes instances unhashable.
    """

    # Lower rank == preferred.  Unknown values rank last / first respectively.
    _PROTO_RANK = {'imap': 0, 'pop3': 1}
    _TLS_RANK = {'SSL': 0, 'STARTTLS': 1}

    def __init__(self, proto, hostname, port, tls, username, password):
        self.proto = proto
        self.hostname = hostname
        self.port = port
        self.tls = tls
        self.username = username
        self.password = password

    def __repr__(self):
        proto = self.proto + ('s' if self.tls == 'SSL' else '') + ('+starttls' if self.tls == 'STARTTLS' else '')
        return f'{proto}://{self.username}@{self.hostname}:{self.port}'

    def __eq__(self, other):
        if not isinstance(other, MailServerConfig):
            return False
        return self.proto == other.proto \
            and self.hostname == other.hostname \
            and self.port == other.port \
            and self.tls == other.tls \
            and self.username == other.username \
            and self.password == other.password

    def _preference_key(self):
        """Return the tuple that orders configurations by preference.

        Compared lexicographically: IMAP before POP3, then SSL before
        STARTTLS before anything else, then a full email address as
        username before a bare local part.
        """
        return (
            self._PROTO_RANK.get(self.proto, 0),
            self._TLS_RANK.get(self.tls, 2),
            0 if '@' in self.username else 1,
        )

    def __lt__(self, other):
        """Return True if this configuration is preferred over *other*.

        :raises TypeError: if *other* is not a MailServerConfig, or if an
            SMTP configuration is compared with a non-SMTP one.
        """
        if not isinstance(other, MailServerConfig):
            raise TypeError(f'Cannot compare {type(self)} to {type(other)}')
        if self == other:
            return False
        # SMTP not comparable to IMAP or POP3
        if (self.proto == 'smtp' or other.proto == 'smtp') and self.proto != other.proto:
            raise TypeError(f'Cannot compare {type(self)} to {type(other)}')
        # A single lexicographic key keeps the ordering consistent (never
        # both a < b and b < a) and always yields a real bool.
        return self._preference_key() < other._preference_key()

    def __enter__(self):
        pass

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass


class SmtpServerConfig(MailServerConfig):
    """SMTP submission server; usable as a context manager."""

    def __init__(self, hostname, port, tls, username, password):
        super().__init__('smtp', hostname, port, tls, username, password)
        self._smtp = None

    def __enter__(self):
        """Connect, optionally upgrade to TLS, log in and return the client."""
        cls = smtplib.SMTP_SSL if self.tls == 'SSL' else smtplib.SMTP
        smtp = cls(self.hostname, self.port)
        try:
            if self.tls == 'STARTTLS':
                smtp.starttls()
            smtp.login(self.username, self.password)
        except BaseException:
            # __exit__ is not invoked when __enter__ raises, so close the
            # socket here instead of leaking it.
            smtp.close()
            raise
        self._smtp = smtp
        return smtp

    def __exit__(self, exc_type, exc_val, exc_tb):
        ret = self._smtp.__exit__(exc_type, exc_val, exc_tb)
        self._smtp = None
        return ret

    def send_message(self, msg):
        """Send *msg* over the open connection.

        :raises RuntimeError: if called outside a ``with`` block.
        """
        if self._smtp is None:
            raise RuntimeError('SMTP connection is not established')
        self._smtp.send_message(msg)


class IncomingServerConfig(MailServerConfig, abc.ABC):
    """Base class for servers that mail can be retrieved from."""

    def __init__(self, proto, hostname, port, tls, username, password):
        super().__init__(proto, hostname, port, tls, username, password)

    def __enter__(self):
        pass

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass

    @abc.abstractmethod
    def get_new_messages(self):
        """Yield the raw bytes of every message that arrived since the last check."""


class ImapServerConfig(IncomingServerConfig):
    """IMAP server; tracks the INBOX ``UIDNEXT`` value to detect new mail."""

    def __init__(self, hostname, port, tls, username, password):
        super().__init__('imap', hostname, port, tls, username, password)
        self._imap = None
        self._uidnext = None

    def __enter__(self):
        """Connect, log in, select INBOX read-only and remember ``UIDNEXT``."""
        cls = imaplib.IMAP4_SSL if self.tls == 'SSL' else imaplib.IMAP4
        imap = cls(self.hostname, self.port)
        try:
            if self.tls == 'STARTTLS':
                imap.starttls()
            imap.login(self.username, self.password)
            imap.select('INBOX', readonly=True)
            uidnext = int(imap.response('UIDNEXT')[1][0].decode())
        except BaseException:
            # __exit__ is not invoked when __enter__ raises; drop the socket.
            imap.shutdown()
            raise
        self._imap = imap
        self._uidnext = uidnext
        return imap

    def __exit__(self, exc_type, exc_val, exc_tb):
        ret = self._imap.__exit__(exc_type, exc_val, exc_tb)
        self._imap = None
        self._uidnext = None
        return ret

    def get_new_messages(self):
        """Yield the RFC 822 bytes of messages whose UID is >= the stored ``UIDNEXT``.

        This is a generator: the connection check runs on first iteration,
        and the stored ``UIDNEXT`` is only advanced once the generator has
        been consumed completely.

        :raises RuntimeError: if called outside a ``with`` block.
        """
        if self._imap is None or self._uidnext is None:
            raise RuntimeError('IMAP connection is not established')
        # Re-selecting the mailbox makes the server report a fresh UIDNEXT.
        self._imap.select('INBOX', readonly=True)
        u = int(self._imap.response('UIDNEXT')[1][0].decode())
        if u > self._uidnext:
            messages = self._imap.uid('fetch', f'{self._uidnext}:{u - 1}', '(RFC822)')
            for message in messages[1]:
                if not isinstance(message, tuple) or b'RFC822' not in message[0]:
                    # Not an email message
                    continue
                yield message[1]
            self._uidnext = u


class Pop3ServerConfig(IncomingServerConfig):
    """POP3 server; compares message listings to detect new mail."""

    def __init__(self, hostname, port, tls, username, password):
        super().__init__('pop3', hostname, port, tls, username, password)
        self._pop3: poplib.POP3 = None
        self._messagelist = None

    def __enter__(self):
        """Connect, optionally upgrade to TLS, log in and snapshot the listing."""
        cls = poplib.POP3_SSL if self.tls == 'SSL' else poplib.POP3
        pop3 = cls(self.hostname, self.port)
        try:
            if self.tls == 'STARTTLS':
                pop3.stls()
            pop3.user(self.username)
            pop3.pass_(self.password)
            messagelist = set(pop3.list()[1])
        except BaseException:
            # __exit__ is not invoked when __enter__ raises; drop the socket.
            pop3.close()
            raise
        self._pop3 = pop3
        self._messagelist = messagelist
        return pop3

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._messagelist = None
        self._pop3.close()
        self._pop3 = None

    def get_new_messages(self):
        """Yield the raw bytes of messages that were not in the previous listing.

        This is a generator: the connection check runs on first iteration,
        and the stored listing is only replaced once the generator has been
        consumed completely.

        :raises RuntimeError: if called outside a ``with`` block.
        """
        if self._pop3 is None or self._messagelist is None:
            raise RuntimeError('POP3 connection is not established')
        ml = set(self._pop3.list()[1])
        # poplib returns listing entries as bytes of the form b'<number> <size>'.
        new_entries = sorted(ml.difference(self._messagelist), key=lambda entry: int(entry.split()[0]))
        for entry in new_entries:
            msgid = entry.split(b' ', 1)[0].decode()
            # retr() returns the message as a list of byte lines without line endings.
            yield b'\r\n'.join(self._pop3.retr(msgid)[1])
        self._messagelist = ml


def default_port(proto: str, tls: str):
    """Return the conventional port for *proto* ('imap', 'pop3' or 'smtp').

    :raises ValueError: if *proto* is not one of the three known protocols.
    """
    if proto == 'imap':
        return 993 if tls == 'SSL' else 143
    if proto == 'pop3':
        return 995 if tls == 'SSL' else 110
    if proto == 'smtp':
        return 465 if tls == 'SSL' else 587
    raise ValueError(f'Unknown protocol: {proto}')


def template_username(template: str, address: str, userinputs):
    """Expand autoconfig placeholders in *template*.

    ``%EMAILADDRESS%``, ``%EMAILLOCALPART%`` and ``%EMAILDOMAIN%`` are
    replaced from *address*.  Every key of *userinputs* (mapping key ->
    ``(label, value)``) that occurs in the template is replaced by its value;
    a missing value is asked for once via ``getpass`` and stored back into
    *userinputs* so later calls reuse it.
    """
    user, domain = address.split('@', 1)
    template = template \
        .replace('%EMAILADDRESS%', address) \
        .replace('%EMAILLOCALPART%', user) \
        .replace('%EMAILDOMAIN%', domain)
    for key in userinputs:
        if key not in template:
            continue
        label, value = userinputs[key]
        if value is None:
            value = getpass(f'Autoconfiguration field "{label}" (will not echo): ')
            userinputs[key] = label, value
        template = template.replace(key, value)
    return template


def parse_xml_mailserver(xmle, address: str, password: str, userinputs):
    """Build a server configuration from an autoconfig server element.

    :param xmle: an ``incomingServer`` / ``outgoingServer`` XML element.
    :returns: a Smtp/Imap/Pop3 server configuration, or ``None`` when the
        element's ``type`` is none of 'smtp', 'imap' or 'pop3'.
    """
    proto = xmle.get('type')
    cls = {
        'smtp': SmtpServerConfig,
        'imap': ImapServerConfig,
        'pop3': Pop3ServerConfig,
    }.get(proto)
    if cls is None:
        # Unsupported server type: skip before prompting for any user input.
        return None
    hostname = xmle.find('./hostname').text
    tls = xmle.find('./socketType').text
    port = int(xmle.find('./port').text or default_port(proto, tls))
    username = template_username(xmle.find('./username').text, address, userinputs)
    pw = xmle.find('./password')
    # An Element without children is falsy, so test for None explicitly.
    if pw is not None and pw.text:
        pw = template_username(pw.text, address, userinputs)
    else:
        pw = password
    return cls(hostname, port, tls, username, pw)


def parse_thunderbird_autoconfig(xml: str, address: str, password: str):
    """Parse an autoconfig XML document for the domain of *address*.

    :returns: ``(incoming, outgoing)`` lists of server configurations;
        servers of unsupported types are left out.
    """
    user, domain = address.split('@', 1)
    root = ElementTree.fromstring(xml)
    userinputs = {}
    for ui in root.findall('.//userinput'):
        userinputs[ui.get('key')] = (ui.get('label'), None)
    incoming = root.findall(f"./emailProvider/domain[.='{domain}']/../incomingServer")
    outgoing = root.findall(f"./emailProvider/domain[.='{domain}']/../outgoingServer")
    iconf = [parse_xml_mailserver(i, address, password, userinputs) for i in incoming]
    oconf = [parse_xml_mailserver(o, address, password, userinputs) for o in outgoing]
    # parse_xml_mailserver() returns None for server types it does not support.
    iconf = [c for c in iconf if c is not None]
    oconf = [c for c in oconf if c is not None]
    return iconf, oconf


def _fetch_autoconfig(url: str, address: str, password: str):
    """Download and parse one autoconfig document.

    :returns: the ``(incoming, outgoing)`` result, or ``None`` if the URL
        cannot be fetched or its content is not decodable, well-formed XML.
    """
    try:
        with urllib.request.urlopen(url) as response:
            return parse_thunderbird_autoconfig(response.read().decode(), address, password)
    except (urllib.error.URLError, ElementTree.ParseError, UnicodeDecodeError):
        return None


def tb_wellknown_autoconfig(address: str, password: str):
    """Try the provider-hosted autoconfig URLs for the domain of *address*.

    The ``autoconfig.<domain>`` host is tried first, then the
    ``.well-known`` path on the domain itself.

    :returns: ``(incoming, outgoing)`` or ``None`` if neither URL works.
    """
    user, domain = address.split('@', 1)
    subdomain = f'autoconfig.{domain}'
    sdurl = urllib.parse.urlunsplit(('http', subdomain, 'mail/config-v1.1.xml', f'emailaddress={address}', ''))
    mdurl = urllib.parse.urlunsplit(('http', domain, '.well-known/autoconfig/mail/config-v1.1.xml',
                                     f'emailaddress={address}', ''))
    for url in (sdurl, mdurl):
        result = _fetch_autoconfig(url, address, password)
        if result is not None:
            return result
    return None


def tb_ispdb_autoconfig(address: str, password: str):
    """Look the domain of *address* up at autoconfig.thunderbird.net.

    :returns: ``(incoming, outgoing)`` or ``None`` if the lookup fails.
    """
    user, domain = address.split('@', 1)
    ispdb = f'https://autoconfig.thunderbird.net/v1.1/{domain}'
    return _fetch_autoconfig(ispdb, address, password)


def manual_config(address: str, password: str):
    """Ask the user for the SMTP and IMAP/POP3 settings interactively.

    :returns: ``([incoming], [outgoing])`` with one configuration each.
    """
    print('Autoconfiguration has failed. Please enter the mail server settings manually.')
    host = input('SMTP hostname: ')
    tls = input('TLS ("plain", "SSL" or "STARTTLS", default: STARTTLS): ') or 'STARTTLS'
    p = default_port('smtp', tls)
    port = int(input(f'SMTP port (default: {p}): ') or str(p))
    outconfig = SmtpServerConfig(host, port, tls, address, password)
    proto = input('Mail Retrieval protocol ("imap" or "pop3", default: "imap"): ') or 'imap'
    host = input(f'{proto.upper()} hostname: ')
    tls = input('TLS ("plain", "SSL" or "STARTTLS", default: SSL): ') or 'SSL'
    p = default_port(proto, tls)
    port = int(input(f'{proto.upper()} port (default: {p}): ') or str(p))
    if proto == 'imap':
        inconfig = ImapServerConfig(host, port, tls, address, password)
    else:
        inconfig = Pop3ServerConfig(host, port, tls, address, password)
    return [inconfig], [outconfig]


def parse_dns_srv_record(record, service: str, address: str, password: str):
    """Turn one DNS SRV record into candidate server configurations.

    :param record: object with ``target`` and ``port`` attributes.
    :param service: SRV service label: '_submission', '_imap', '_imaps',
        '_pop3' or '_pop3s'.
    :returns: list of configurations, one per TLS mode and username form
        (full address, then local part only).
    :raises ValueError: if *service* is not one of the labels above.
    """
    user, domain = address.split('@', 1)
    if service == '_submission':
        cls = SmtpServerConfig
    elif service in ('_imaps', '_imap'):
        cls = ImapServerConfig
    elif service in ('_pop3s', '_pop3'):
        cls = Pop3ServerConfig
    else:
        raise ValueError(service)
    # Only the '...s' service labels denote implicit TLS.
    tls = ['SSL'] if service.endswith('s') else ['STARTTLS', 'plain']
    # The target may be a DNS name object with a trailing root dot; the mail
    # client libraries expect a plain host name string.
    host = str(record.target).rstrip('.')
    configs = []
    for t in tls:
        configs.append(cls(host, record.port, t, address, password))
        configs.append(cls(host, record.port, t, user, password))
    return configs


def rfc6186_autoconfig(address: str, password: str):
    """Discover mail servers through DNS SRV records.

    Requires the optional ``dnspython`` package.

    :returns: ``(incoming, outgoing)`` or ``None`` if dnspython is missing
        or the submission SRV lookup fails.
    """
    try:
        import dns.exception
        import dns.rdataclass
        import dns.rdatatype
        import dns.rdtypes.IN.SRV
        import dns.resolver
    except ImportError:
        print('"dnspython" dependency missing. Skipping RFC 6186 autoconfig')
        return None

    user, domain = address.split('@', 1)
    smtpdomain = f'_submission._tcp.{domain}.'
    imapsdomain = f'_imaps._tcp.{domain}.'
    imapdomain = f'_imap._tcp.{domain}.'
    pop3sdomain = f'_pop3s._tcp.{domain}.'
    pop3domain = f'_pop3._tcp.{domain}.'

    osrv = []
    isrv = []
    try:
        smtpanswer: dns.resolver.Answer = dns.resolver.resolve(smtpdomain, dns.rdatatype.SRV)
    except dns.exception.DNSException:
        return None
    for srv in smtpanswer:
        if srv.rdtype != dns.rdatatype.SRV or srv.rdclass != dns.rdataclass.IN:
            continue
        # Same "port 0, target '.'" filter as applied to the incoming records below.
        if srv.port == 0 and str(srv.target) == '.':
            continue
        osrv.append((smtpanswer.canonical_name.labels[0].decode(), srv))
    for sd in [imapsdomain, imapdomain, pop3sdomain, pop3domain]:
        try:
            answer: dns.resolver.Answer = dns.resolver.resolve(sd, dns.rdatatype.SRV)
        except dns.exception.DNSException:
            continue
        for srv in answer:
            if srv.rdtype != dns.rdatatype.SRV or srv.rdclass != dns.rdataclass.IN:
                continue
            if srv.port == 0 and str(srv.target) == '.':
                continue
            isrv.append((answer.canonical_name.labels[0].decode(), srv))

    incoming = []
    outgoing = []
    for (iqname, irec) in isrv:
        incoming.extend(parse_dns_srv_record(irec, iqname, address, password))
    # Outgoing servers come from the submission records, not the incoming ones.
    for (oqname, orec) in osrv:
        outgoing.extend(parse_dns_srv_record(orec, oqname, address, password))
    return incoming, outgoing


def _parse_confirmation_request(address, fingerprint, encrypted):
    """Decrypt and validate a WKS confirmation request.

    :param encrypted: the encrypted request as bytes.
    :returns: ``(sender, nonce)`` taken from the request.
    :raises RuntimeError: if gpg fails to decrypt.
    :raises ValueError: if the request is malformed or is not meant for
        *address* / *fingerprint*.
    """
    decrypted = _run_gpg(['--decrypt'], stdin=encrypted).stdout.decode()
    rdict = {}
    for line in decrypted.splitlines():
        if ':' not in line:
            continue
        key, value = line.split(':', 1)
        rdict[key.strip()] = value.strip()
    if rdict.get('type', '') != 'confirmation-request':
        raise ValueError('Invalid confirmation request: "type" missing or not "confirmation-request"')
    if 'sender' not in rdict or 'address' not in rdict or 'fingerprint' not in rdict or 'nonce' not in rdict:
        raise ValueError('Invalid confirmation request: a mandatory item is missing from the request')
    if rdict['address'] != address:
        raise ValueError(f'Confirmation address "{rdict["address"]}" does not match my address "{address}"')
    if rdict['fingerprint'].replace(' ', '') != fingerprint.replace(' ', ''):
        raise ValueError(
            f'Confirmation fingerprint "{rdict["fingerprint"]}" does not match my fingerprint "{fingerprint}"')
    print(f'Nonce: {rdict["nonce"]}')
    return rdict['sender'], rdict['nonce']


def _create_submission_request(address: str, submission_address: str, fingerprint: str, revoked_fingerprints):
    """Build the mail that submits the public key to the WKS server.

    The submission key is located first, then the key to publish (plus the
    chosen revoked keys) is exported and encrypted to both *address* and
    *submission_address*.

    :returns: a ``MIMEText`` message ready to be sent.
    :raises RuntimeError: if any gpg invocation fails or times out.
    """
    _run_gpg(['--locate-keys', '--with-colons', submission_address])
    print('Retrieved submission key')
    pubkey = _run_gpg([
        '--armor',
        '--export-options', 'export-minimal',
        '--export', fingerprint, *revoked_fingerprints,
    ]).stdout
    print('Retrieved key to publish')
    encrypted = _run_gpg([
        '--armor',
        '--with-colons',
        '--encrypt',
        '--trust-model', 'always',
        '--local-user', address,
        '--recipient', address,
        '--recipient', submission_address,
    ], stdin=pubkey, timeout=2).stdout.decode()
    print('Created encrypted message')
    mail = MIMEText(encrypted, _subtype='plain')
    mail['Subject'] = 'WKS submission request'
    mail['To'] = submission_address
    mail['From'] = address
    mail['Date'] = format_datetime(datetime.now(timezone.utc))
    return mail


def _create_confirmation_response(address: str, submission: str, nonce: str, fp: str, content_subtype: str):
    """Build the signed and encrypted reply to a confirmation request.

    :param submission: address the response is sent (and encrypted) to.
    :param fp: fingerprint of the key used for signing.
    :param content_subtype: MIME subtype of the request, reused for the reply.
    :returns: a ``MIMEApplication`` message ready to be sent.
    :raises RuntimeError: if gpg fails.
    """
    response_template = '\r\n'.join([
        'type: confirmation-response',
        f'sender: {address}',
        f'nonce: {nonce}',
        ''
    ])
    payload = MIMEText(response_template, _subtype='plain')
    encrypted = _run_gpg([
        '--armor',
        '--with-colons',
        '--encrypt',
        '--sign',
        '--trust-model', 'always',
        '--local-user', fp,
        '--recipient', address,
        '--recipient', submission,
    ], stdin=payload.as_string(policy=email.policy.default).encode()).stdout.decode()
    mail = MIMEApplication(encrypted, _subtype=content_subtype)
    mail['Subject'] = 'WKS confirmation response'
    mail['To'] = submission
    mail['From'] = address
    mail['Date'] = format_datetime(datetime.now(timezone.utc))
    return mail


def handle_incoming_message(address, fingerprint, rfc822, smtp_config: SmtpServerConfig):
    """Process one received mail as part of the WKS exchange.

    * ``wks-phase: done`` - print the (decrypted, if possible) success text.
    * ``wks-phase: error`` - print the server's error text.
    * otherwise - answer every confirmation request part via *smtp_config*.

    :param rfc822: the raw message bytes.
    :returns: True once the exchange is finished (success or error), False
        if further mail from the WKS server is still expected.
    """
    msg: email.message.EmailMessage = BytesParser(policy=email.policy.default).parsebytes(rfc822)

    if msg.get('wks-phase', '') == 'done':
        try:
            pgp = _get_pgp_message(msg)
        except ValueError:
            # _get_pgp_message() raises rather than returning None when there
            # is no (single) encrypted part, so there is nothing to decrypt.
            print('WKS key submission successful. Congratulations!')
            return True
        print('Decrypting WKS response. GnuPG may prompt you for your passphrase.')
        gpg = _run_gpg(['--batch', '--decrypt', '--skip-verify'], stdin=pgp, check=False)
        if gpg.returncode != 0:
            print('It seems WKS submission was successful, however decryption of the submission response failed: '
                  + gpg.stderr.decode())
            return True
        decrypted = email.parser.BytesParser(policy=email.policy.default).parsebytes(gpg.stdout)
        body = decrypted.get_body(preferencelist=('plain',))
        if body is None:
            print('WKS key submission successful. Congratulations!')
        else:
            print(body.get_content())
        return True

    if msg.get('wks-phase', '') == 'error':
        body = msg.get_body(preferencelist=('plain',))
        if body is None:
            print('WKS key submission failed. However, the WKS server did not return an error description.')
        else:
            print(body.get_content())
        return True

    for leaf in msg.walk():
        if leaf.get_content_type() not in ['application/vnd.gnupg.wkd', 'application/vnd.gnupg.wks']:
            continue
        print('Received confirmation request')
        try:
            submission, nonce = _parse_confirmation_request(address, fingerprint, leaf.get_content())
        except Exception as e:
            # Exception rather than BaseException so that Ctrl-C still aborts.
            print(f'Parsing failed: {e}')
            continue
        print('Creating confirmation response. GnuPG may prompt you for your passphrase.')
        response = _create_confirmation_response(address, submission, nonce, fingerprint,
                                                 leaf.get_content_subtype())
        print('Sending confirmation response')
        with smtp_config:
            smtp_config.send_message(response)
        print('Awaiting publish response')
    return False


def _gpg_get_uid_fp(address: str):
    """Let the user pick the key(s) to publish for *address*.

    :returns: ``(fingerprint, revoked_fingerprints)`` - the fingerprint of
        the chosen non-revoked key and a list of chosen revoked ones.
    :raises RuntimeError: if gpg fails to list the keys.
    :raises ValueError: if there is no non-revoked key for *address*.
    """
    keylist = _run_gpg(['--with-colons', '--list-keys', address]).stdout.decode()
    pubs = []
    revoked = []
    fprs = []
    for line in keylist.splitlines():
        fields = line.split(':')
        if line.startswith('pub:'):
            pubs.append(fields[4])
            revoked.append(fields[1] == 'r')
        elif line.startswith('fpr:'):
            fprs.append(fields[9])
    # Map each primary key's fingerprint (matched by key-id suffix) to its key id.
    valid = {next((f for f in fprs if f.endswith(pub))): pub
             for pub, is_revoked in zip(pubs, revoked) if not is_revoked}
    revoked = {next((f for f in fprs if f.endswith(pub))): pub
               for pub, is_revoked in zip(pubs, revoked) if is_revoked}

    if len(valid) == 0:
        raise ValueError(f'No valid key found for {address}.')
    elif len(valid) > 1:
        print(f'Found multiple keys for {address}, please choose:')
        fpridx = list(valid.keys())
        for i, f in enumerate(fpridx, start=1):
            print(f'{i}: {f}')
        i = int(input('Enter number: ')) - 1
        fpr = fpridx[i]
    else:
        fpr = list(valid.keys())[0]

    if len(revoked) > 0:
        print(f'There are revoked keys for {address}. Please choose which to upload (separate multiple by spaces): ')
        revidx = list(revoked.keys())
        for i, f in enumerate(revidx, start=1):
            print(f'{i}: {f}')
        rids = [int(i) - 1 for i in input('Enter number(s): ').split()]
        rfprs = [revidx[i] for i in rids]
    else:
        rfprs = []
    return fpr, rfprs


def _get_submission_address(address: str):
    """Fetch the WKS submission address for the domain of *address*.

    The ``openpgpkey.<domain>`` URL is tried first, then the URL on the
    domain itself.

    :raises urllib.error.URLError: if the second URL cannot be fetched
        either.
    """
    _, domain = address.split('@', 1)
    advanced_url = f'https://openpgpkey.{domain}/.well-known/openpgpkey/{domain}/submission-address'
    direct_url = f'https://{domain}/.well-known/openpgpkey/submission-address'
    try:
        with urllib.request.urlopen(advanced_url) as response:
            return response.read().decode().strip()
    except urllib.error.URLError:
        pass
    with urllib.request.urlopen(direct_url) as response:
        return response.read().decode().strip()


def _first_working(configs):
    """Return the most preferred configuration that can connect and log in, or None."""
    for cfg in sorted(configs):
        try:
            with cfg:
                return cfg
        except Exception:
            # Best-effort probing: any failure just means "try the next one".
            continue
    return None


def main():
    """Run the interactive WKS submission workflow."""
    ad = input('Enter email: ')
    try:
        sa = _get_submission_address(ad)
    except urllib.error.URLError:
        print('No WKS submission address found. Does your provider support WKS?')
        raise SystemExit(1)
    fp, rfprs = _gpg_get_uid_fp(ad)
    print(f'Chose {fp}')
    for rfpr in rfprs:
        print(f'Chose revoked key {rfpr}')
    pw = getpass('Enter IMAP/POP3/SMTP password (will not echo): ')

    autoconf = None
    for fn in [tb_wellknown_autoconfig, rfc6186_autoconfig, tb_ispdb_autoconfig, manual_config]:
        candidate = fn(ad, pw)
        # A result without both an incoming and an outgoing server is of no
        # use; fall through to the next discovery method.
        if candidate is not None and candidate[0] and candidate[1]:
            autoconf = candidate
            break
    if autoconf is None:
        raise RuntimeError('No autoconfig available')
    incoming, outgoing = autoconf

    # Find the first working server configurations
    incoming_server = _first_working(incoming)
    if incoming_server is None:
        raise RuntimeError('No working IMAP/POP3 server found through autoconfiguration. Please specify manually.')
    print(f'Autoconfigured incoming server: {incoming_server}')
    outgoing_server = _first_working(outgoing)
    if outgoing_server is None:
        raise RuntimeError('No working SMTP server found through autoconfiguration. Please specify manually.')
    print(f'Autoconfigured outgoing server: {outgoing_server}')
    confirm = input('Please confirm: [Y/n] ')
    if confirm.lower() not in ['', 'y', 'yes']:
        print('Aborted')
        raise SystemExit(1)

    # The incoming connection is opened before the request is sent so that
    # only mail arriving afterwards counts as new.
    with incoming_server:
        now = time.monotonic()
        done = False
        request = _create_submission_request(ad, sa, fp, rfprs)
        print('Sending submission request')
        with outgoing_server:
            outgoing_server.send_message(request)
        print('Awaiting response')
        # Poll every 5 seconds for at most 5 minutes.
        while not done and time.monotonic() - now < 300:
            time.sleep(5)
            for message in incoming_server.get_new_messages():
                done = handle_incoming_message(ad, fp, message, outgoing_server)
                if done:
                    break
        if not done:
            print('Timed out waiting for a response from the WKS server.')


if __name__ == '__main__':
    main()