From b1f7d993c9a27342d5a15912199ed70fc0c8aef9 Mon Sep 17 00:00:00 2001 From: s3lph Date: Wed, 20 Oct 2021 01:23:57 +0200 Subject: [PATCH] Add a small WKS client implementation --- client.py | 623 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 623 insertions(+) create mode 100755 client.py diff --git a/client.py b/client.py new file mode 100755 index 0000000..0d3770f --- /dev/null +++ b/client.py @@ -0,0 +1,623 @@ +#!/usr/bin/env python3 + +import abc +import time +from getpass import getpass +from datetime import datetime +import imaplib +import poplib +import smtplib +import subprocess +import urllib.error +import urllib.parse +import urllib.request +import email +from email.mime.application import MIMEApplication +from email.mime.text import MIMEText +from email.parser import BytesParser +from email.utils import format_datetime +from xml.etree import ElementTree + + +def _get_pgp_message(message: email.message.Message) -> bytes: + pgp = None + for part in message.walk(): + if part.is_multipart(): + continue + p = part.get_content() + if isinstance(p, str): + p = p.encode() + if b'BEGIN PGP MESSAGE' in p: + if pgp is not None: + raise ValueError('More than one encrypted message part') + pgp = p + if pgp is None: + raise ValueError('No encrypted message part') + return pgp + + +class MailServerConfig(abc.ABC): + + def __init__(self, proto, hostname, port, tls, username, password): + self.proto = proto + self.hostname = hostname + self.port = port + self.tls = tls + self.username = username + self.password = password + + def __repr__(self): + proto = self.proto + ('s' if self.tls == 'SSL' else '') + ('+starttls' if self.tls == 'STARTTLS' else '') + return f'{proto}://{self.username}@{self.hostname}:{self.port}' + + def __eq__(self, other): + if not isinstance(other, MailServerConfig): + return False + return self.proto == other.proto \ + and self.hostname == other.hostname \ + and self.port == other.port \ + and self.tls == other.tls \ + and self.username == other.username \ + and self.password == other.password + + def __lt__(self, other): + if not isinstance(other, MailServerConfig): + raise TypeError(f'Cannot compare {type(self)} to {type(other)}') + if self == other: + return False + # SMTP not comparable to IMAP or POP3 + if (self.proto == 'smtp' or other.proto == 'smtp') and self.proto != other.proto: + raise TypeError(f'Cannot compare {type(self)} to {type(other)}') + # IMAP < POP3 + if self.proto == 'imap' and other.proto == 'pop3': + return True + # SSL < STARTTLS < plain + if (self.tls == 'SSL' and other.tls != 'SSL') or (self.tls == 'STARTTLS' and other.tls == 'plain'): + return True + # full email < local part + if '@' in self.username and '@' not in other.username: + return True + + def __enter__(self): + pass + + def __exit__(self, exc_type, exc_val, exc_tb): + pass + + +class SmtpServerConfig(MailServerConfig): + + def __init__(self, hostname, port, tls, username, password): + super().__init__('smtp', hostname, port, tls, username, password) + self._smtp = None + + def __enter__(self): + cls = smtplib.SMTP if self.tls != 'SSL' else smtplib.SMTP_SSL + self._smtp = cls(self.hostname, self.port) + smtp = self._smtp.__enter__() + if self.tls == 'STARTTLS': + smtp.starttls() + smtp.login(self.username, self.password) + return smtp + + def __exit__(self, exc_type, exc_val, exc_tb): + ret = self._smtp.__exit__(exc_type, exc_val, exc_tb) + self._smtp = None + return ret + + def send_message(self, msg): + if self._smtp is None: + raise RuntimeError('SMTP connection is not established') + self._smtp.send_message(msg) + + +class IncomingServerConfig(MailServerConfig, abc.ABC): + + def __init__(self, proto, hostname, port, tls, username, password): + super().__init__(proto, hostname, port, tls, username, password) + + def __enter__(self): + pass + + def __exit__(self, exc_type, exc_val, exc_tb): + pass + + def get_new_messages(self): + pass + + +class ImapServerConfig(IncomingServerConfig): + + def __init__(self, hostname, port, tls, username, password): + super().__init__('imap', hostname, port, tls, username, password) + self._imap = None + self._uidnext = None + + def __enter__(self): + cls = imaplib.IMAP4 if self.tls != 'SSL' else imaplib.IMAP4_SSL + self._imap = cls(self.hostname, self.port) + imap = self._imap.__enter__() + if self.tls == 'STARTTLS': + imap.starttls() + imap.login(self.username, self.password) + imap.select('INBOX', readonly=True) + self._uidnext = int(imap.response('UIDNEXT')[1][0].decode()) + return imap + + def __exit__(self, exc_type, exc_val, exc_tb): + ret = self._imap.__exit__(exc_type, exc_val, exc_tb) + self._imap = None + self._uidnext = None + return ret + + def get_new_messages(self): + if self._imap is None or self._uidnext is None: + raise RuntimeError('IMAP connection is not established') + self._imap.select('INBOX', readonly=True) + u = int(self._imap.response('UIDNEXT')[1][0].decode()) + if u > self._uidnext: + messages = self._imap.uid('fetch', f'{self._uidnext}:{u - 1}', '(RFC822)') + for message in messages[1]: + if not isinstance(message, tuple) or b'RFC822' not in message[0]: + # Not an email message + continue + yield message[1] + self._uidnext = u + + +class Pop3ServerConfig(IncomingServerConfig): + + def __init__(self, hostname, port, tls, username, password): + super().__init__('pop3', hostname, port, tls, username, password) + self._pop3: poplib.POP3 = None + self._messagelist = None + + def __enter__(self): + cls = poplib.POP3 if self.tls != 'SSL' else poplib.POP3_SSL + self._pop3 = cls(self.hostname, self.port) + if self.tls == 'STARTTLS': + self._pop3.stls() + self._pop3.user(self.username) + self._pop3.pass_(self.password) + self._messagelist = set(self._pop3.list()[1]) + return self._pop3 + + def __exit__(self, exc_type, exc_val, exc_tb): + self._messagelist = None + self._pop3.close() + self._pop3 = None + + def get_new_messages(self): + if self._pop3 is None or self._messagelist is None: + raise RuntimeError('POP3 connection is not established') + ml = set(self._pop3.list()[1]) + for msg in ml.difference(self._messagelist): + msgid, _ = msg.split(' ', 1) + message = '\r\n'.join(self._pop3.retr(msgid)[1]) + yield message[1] + self._messagelist = ml + + +def default_port(proto: str, tls: str): + if proto == 'imap': + return 993 if tls == 'SSL' else 143 + if proto == 'pop3': + return 995 if tls == 'SSL' else 110 + if proto == 'smtp': + return 465 if tls == 'SSL' else 587 + raise ValueError(f'Unknown protocol: {proto}') + + +def template_username(template: str, address: str, userinputs): + user, domain = address.split('@', 1) + template = template \ + .replace('%EMAILADDRESS%', address) \ + .replace('%EMAILLOCALPART%', user) \ + .replace('%EMAILDOMAIN%', domain) + for key in userinputs: + if key not in template: + continue + label, value = userinputs[key] + if value is None: + value = getpass(f'Autoconfiguration field "{label}" (will not echo): ') + userinputs[key] = label, value + template = template.replace(key, value) + return template + + +def parse_xml_mailserver(xmle, address: str, password: str, userinputs): + proto = xmle.get('type') + hostname = xmle.find('./hostname').text + tls = xmle.find('./socketType').text + port = int(xmle.find('./port').text or default_port(proto, tls)) + username = template_username(xmle.find('./username').text, address, userinputs) + pw = xmle.find('./password') + if pw: + pw = template_username(pw.text, address, userinputs) + else: + pw = password + if proto == 'smtp': + return SmtpServerConfig(hostname, port, tls, username, pw) + elif proto == 'imap': + return ImapServerConfig(hostname, port, tls, username, pw) + elif proto == 'pop3': + return Pop3ServerConfig(hostname, port, tls, username, pw) + + +def parse_thunderbird_autoconfig(xml: str, address: str, password: str): + user, domain = address.split('@', 1) + root = ElementTree.fromstring(xml) + userinputs = {} + for ui in root.findall('.//userinput'): + k = ui.get('key') + label = ui.get('label') + userinputs[k] = (label, None) + incoming = root.findall(f"./emailProvider/domain[.='{domain}']/../incomingServer") + outgoing = root.findall(f"./emailProvider/domain[.='{domain}']/../outgoingServer") + iconf = [parse_xml_mailserver(i, address, password, userinputs) for i in incoming] + oconf = [parse_xml_mailserver(o, address, password, userinputs) for o in outgoing] + return iconf, oconf + + +def tb_wellknown_autoconfig(address: str, password: str): + user, domain = address.split('@', 1) + subdomain = f'autoconfig.{domain}' + sdurl = urllib.parse.urlunsplit(('http', subdomain, 'mail/config-v1.1.xml', f'emailaddress={address}', '')) + mdurl = urllib.parse.urlunsplit(('http', domain, '.well-known/autoconfig/mail/config-v1.1.xml', + f'emailaddress={address}', '')) + try: + with urllib.request.urlopen(sdurl) as sdresponse: + return parse_thunderbird_autoconfig(sdresponse.read().decode(), address, password) + except urllib.error.URLError: + try: + with urllib.request.urlopen(mdurl) as mdresponse: + return parse_thunderbird_autoconfig(mdresponse.read().decode(), address, password) + except urllib.error.URLError: + return None + + +def tb_ispdb_autoconfig(address: str, password: str): + user, domain = address.split('@', 1) + ispdb = f'https://autoconfig.thunderbird.net/v1.1/{domain}' + try: + with urllib.request.urlopen(ispdb) as response: + return parse_thunderbird_autoconfig(response.read().decode(), address, password) + except urllib.error.URLError: + return None + + +def manual_config(address: str, password: str): + print('Autoconfiguration has failed. Please enter the mail server settings manually.') + host = input('SMTP hostname: ') + tls = input('TLS ("plain", "SSL" or "STARTLS (default: STARTTLS): ') or 'STARTTLS' + p = default_port('smtp', tls) + port = int(input(f'SMTP port (default: {p}): ') or str(p)) + outconfig = SmtpServerConfig(host, port, tls, address, password) + proto = input('Mail Retrieval protocol ("imap" or "pop3", default: "imap"): ') or 'imap' + host = input(f'{proto.upper()} hostname: ') + tls = input('TLS ("plain", "SSL" or "STARTLS (default: SSL): ') or 'SSL' + p = default_port(proto, tls) + port = int(input(f'{proto.upper()} port (default: {p}): ') or str(p)) + if proto == 'imap': + inconfig = ImapServerConfig(host, port, tls, address, password) + else: + inconfig = Pop3ServerConfig(host, port, tls, address, password) + return [inconfig], [outconfig] + + +def parse_dns_srv_record(record, service: str, address: str, password: str): + user, domain = address.split('@', 1) + if service == '_submission': + cls = SmtpServerConfig + elif service == '_imaps' or service == '_imap': + cls = ImapServerConfig + elif service == '_pop3s' or service == '_pop3': + cls = Pop3ServerConfig + else: + raise ValueError(service) + if service.endswith('s'): + tls = ['SSL'] + else: + tls = ['STARTTLS', 'plain'] + configs = [] + for t in tls: + configs.append(cls(record.target, record.port, t, address, password)) + configs.append(cls(record.target, record.port, t, user, password)) + return configs + + +def rfc6186_autoconfig(address: str, password: str): + try: + import dns.resolver + import dns.rdtypes.IN.SRV + except ImportError: + print('"dnspython" dependency missing. Skipping RFC 6186 autoconfig') + return None + user, domain = address.split('@', 1) + smtpdomain = f'_submission._tcp.{domain}.' + imapsdomain = f'_imaps._tcp.{domain}.' + imapdomain = f'_imap._tcp.{domain}.' + pop3sdomain = f'_pop3s._tcp.{domain}.' + pop3domain = f'_pop3._tcp.{domain}.' + osrv = [] + isrv = [] + try: + smtpanswer: dns.resolver.Answer = dns.resolver.resolve(smtpdomain, dns.rdatatype.SRV) + except dns.exception.DNSException: + return None + for srv in smtpanswer: + if srv.rdtype != dns.rdatatype.SRV or srv.rdclass != dns.rdataclass.IN: + continue + osrv.append((smtpanswer.canonical_name.labels[0].decode(), srv)) + for sd in [imapsdomain, imapdomain, pop3sdomain, pop3domain]: + try: + answer: dns.resolver.Answer = dns.resolver.resolve(sd, dns.rdatatype.SRV) + for srv in answer: + if srv.rdtype != dns.rdatatype.SRV or srv.rdclass != dns.rdataclass.IN: + continue + if srv.port == 0 and str(srv.target) == '.': + continue + isrv.append((answer.canonical_name.labels[0].decode(), srv)) + except dns.exception.DNSException: + continue + incoming = [] + outgoing = [] + for (iqname, irec) in isrv: + incoming.extend(parse_dns_srv_record(irec, iqname, address, password)) + for (oqname, orec) in isrv: + outgoing.extend(parse_dns_srv_record(orec, oqname, address, password)) + return incoming, outgoing + + +def _parse_confirmation_request(address, fingerprint, encrypted): + gpg = subprocess.Popen(['/usr/bin/gpg', '--decrypt'], + stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + gpg.stdin.write(encrypted) + gpg.stdin.close() + gpg.wait() + if gpg.returncode != 0: + raise RuntimeError(f'gpg subprocess returned with non-zero exit code; stderr: {gpg.stderr.read()}') + decrypted = gpg.stdout.read().decode() + rdict = {} + for line in decrypted.splitlines(): + if ':' not in line: + continue + key, value = line.split(':', 1) + rdict[key.strip()] = value.strip() + if rdict.get('type', '') != 'confirmation-request': + raise ValueError('Invalid confirmation request: "type" missing or not "confirmation-request"') + if 'sender' not in rdict or 'address' not in rdict or 'fingerprint' not in rdict or 'nonce' not in rdict: + raise ValueError('Invalid confirmation request: a mandatory item is missing from the request') + if rdict['address'] != address: + raise ValueError(f'Confirmation address "{rdict["address"]}" does not match my address "{address}"') + if rdict['fingerprint'].replace(' ', '') != fingerprint.replace(' ', ''): + raise ValueError( + f'Confirmation fingerprint "{rdict["fingerprint"]}" does not match my fingerprint "{fingerprint}"') + print(f'Nonce: {rdict["nonce"]}') + return rdict['sender'], rdict['nonce'] + + +def _create_submission_request(address: str, fingerprint: str, submission_address: str): + gpg = subprocess.Popen([ + '/usr/bin/gpg', '--locate-keys', '--with-colons', submission_address + ], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + gpg.wait() + if gpg.returncode != 0: + raise RuntimeError(f'gpg subprocess returned with non-zero exit code; stderr: {gpg.stderr.read()}') + print('Retrieved submission key') + gpg = subprocess.Popen([ + '/usr/bin/gpg', '--armor', + '--export-options', 'export-minimal', + '--export', fingerprint + ], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + gpg.wait() + if gpg.returncode != 0: + raise RuntimeError(f'gpg subprocess returned with non-zero exit code; stderr: {gpg.stderr.read()}') + print('Retrieved key to publish') + pubkey = gpg.stdout.read() + gpg = subprocess.Popen([ + '/usr/bin/gpg', '--armor', '--with-colons', + '--encrypt', '--trust-model', 'always', + '--local-user', address, + '--recipient', address, + '--recipient', submission_address + ], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + gpg.stdin.write(pubkey) + gpg.stdin.close() + try: + gpg.wait(timeout=2) + except subprocess.TimeoutExpired: + gpg.kill() + print(f'gpg subprocess timed out; stderr: {gpg.stderr.read()}') + raise RuntimeError(f'gpg subprocess timed out; stderr: {gpg.stderr.read()}') + if gpg.returncode != 0: + raise RuntimeError(f'gpg subprocess returned with non-zero exit code; stderr: {gpg.stderr.read()}') + print('Created encrypted message') + encrypted = gpg.stdout.read().decode() + mail = MIMEText(encrypted, _subtype='plain') + mail['Subject'] = 'WKS submission request' + mail['To'] = submission_address + mail['From'] = address + mail['Date'] = format_datetime(datetime.utcnow()) + return mail + + +def _create_confirmation_response(address: str, submission: str, nonce: str, fp: str, content_subtype: str): + response_template = '\r\n'.join([ + 'type: confirmation-response', + f'sender: {address}', + f'nonce: {nonce}', + '' + ]) + payload = MIMEText(response_template, _subtype='plain') + gpg = subprocess.Popen([ + '/usr/bin/gpg', '--armor', '--with-colons', + '--encrypt', '--sign', '--trust-model', 'always', + '--local-user', fp, + '--recipient', address, + '--recipient', submission + ], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + gpg.stdin.write(payload.as_string(policy=email.policy.default).encode()) + gpg.stdin.close() + gpg.wait() + if gpg.returncode != 0: + raise RuntimeError(f'gpg subprocess returned with non-zero exit code; stderr: {gpg.stderr.read()}') + encrypted = gpg.stdout.read().decode() + mail = MIMEApplication(encrypted, _subtype=content_subtype) + mail['Subject'] = 'WKS confirmation response' + mail['To'] = submission + mail['From'] = address + mail['Date'] = format_datetime(datetime.utcnow()) + return mail + + +def handle_incoming_message(address, fingerprint, rfc822, smtp_config: SmtpServerConfig): + msg: email.message.EmailMessage = BytesParser(policy=email.policy.default).parsebytes(rfc822) + if msg.get('wks-phase', '') == 'done': + pgp = _get_pgp_message(msg) + if pgp is None: + print('WKS key submission successful. Congratulations!') + return True + print('Decrypting WKS response. GnuPG may prompt you for your passphrase.') + gpg = subprocess.Popen(['/usr/bin/gpg', '--batch', '--decrypt', '--skip-verify'], + stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + gpg.stdin.write(pgp) + gpg.stdin.close() + gpg.wait() + if gpg.returncode != 0: + print('It seems WKS submission was successful, however decryption of the submission response failed: ' + + gpg.stderr.read().decode()) + return True + decrypted = email.parser.BytesParser(policy=email.policy.default).parsebytes(gpg.stdout.read()) + body = decrypted.get_body(preferencelist=('plain',)) + if body is None: + print('WKS key submission successful. Congratulations!') + else: + print(body.get_content()) + return True + if msg.get('wks-phase', '') == 'error': + body = msg.get_body(preferencelist=('plain',)) + if body is None: + print('WKS key submission failed. However, the WKS server did not return an error description.') + else: + print(body.get_content()) + return True + for leaf in msg.walk(): + if leaf.get_content_type() not in ['application/vnd.gnupg.wkd', 'application/vnd.gnupg.wks']: + continue + print('Received confirmation request') + try: + submission, nonce = _parse_confirmation_request(address, fingerprint, leaf.get_content()) + except BaseException as e: + print(f'Parsing failed: {e}') + continue + print('Creating confirmation response. GnuPG may prompt you for your passphrase.') + response = _create_confirmation_response(address, submission, nonce, fingerprint, leaf.get_content_subtype()) + print('Sending confirmation response') + with smtp_config: + smtp_config.send_message(response) + print('Awaiting publish response') + return False + + +def _gpg_get_uid_fp(address: str): + gpg = subprocess.Popen([ + '/usr/bin/gpg', '--with-colons', '--list-keys', address + ], stdout=subprocess.PIPE) + gpg.wait() + if gpg.returncode != 0: + raise RuntimeError(f'gpg subprocess returned with non-zero exit code; stderr: {gpg.stderr.read()}') + keylist = gpg.stdout.read().decode() + pubs = [] + fprs = [] + for line in keylist.splitlines(): + if line.startswith('pub:'): + pub = line.split(':')[4] + pubs.append(pub) + elif line.startswith('fpr:'): + fpr = line.split(':')[9] + fprs.append(fpr) + if len(pubs) == 0: + raise ValueError(f'No key found for {address}.') + elif len(pubs) > 1: + print(f'Found multiple keys for {address}, please choose:') + for i, pub in enumerate(pubs, start=1): + print(f'{i}: {pub}') + i = int(input('Enter number: ')) - 1 + else: + i = 0 + pub = pubs[i] + fpr = next(filter(lambda x: x.endswith(pub), fprs)) + return fpr + + +def _get_submission_address(address: str): + _, domain = address.split('@', 1) + advanced_url = f'https://openpgpkey.{domain}/.well-known/openpgpkey/{domain}/submission-address' + direct_url = f'https://{domain}/.well-known/openpgpkey/submission-address' + try: + with urllib.request.urlopen(advanced_url) as response: + return response.read().decode().strip() + except urllib.error.URLError: + pass + with urllib.request.urlopen(direct_url) as response: + return response.read().decode().strip() + + +def main(): + ad = input('Enter email: ') + sa = None + try: + sa = _get_submission_address(ad) + except urllib.error.URLError: + print('No WKS submission address found. Does your provider support WKS?') + exit(1) + fp = _gpg_get_uid_fp(ad) + print(f'Chose {fp}') + pw = getpass('Enter IMAP/POP3/SMTP password (will not echo): ') + for fn in [tb_wellknown_autoconfig, rfc6186_autoconfig, tb_ispdb_autoconfig, manual_config]: + autoconf = fn(ad, pw) + if autoconf is not None: + break + if autoconf is None: + raise RuntimeError('No autoconfig available') + incoming, outgoing = autoconf + # Find the first working server configurations + incoming_server = None + outgoing_server = None + for i in sorted(incoming): + try: + with i: + incoming_server = i + except: + continue + if incoming_server is None: + raise RuntimeError('No working IMAP/POP3 server found through autoconfiguration. Please specify manually.') + print(f'Autoconfigured incoming server: {incoming_server}') + for i in sorted(outgoing): + try: + with i: + outgoing_server = i + except: + continue + if outgoing_server is None: + raise RuntimeError('No working SMTP server found through autoconfiguration. Please specify manually.') + print(f'Autoconfigured outgoing server: {outgoing_server}') + with incoming_server: + now = datetime.utcnow() + done = False + request = _create_submission_request(ad, fp, sa) + print('Sending submission request') + with outgoing_server: + outgoing_server.send_message(request) + print('Awaiting response') + while not done and (datetime.utcnow() - now).total_seconds() < 300: + time.sleep(5) + for message in incoming_server.get_new_messages(): + done = handle_incoming_message(ad, fp, message, outgoing_server) + if done: + break + + +if __name__ == '__main__': + main()