easywks/client.py

647 lines
25 KiB
Python
Raw Permalink Normal View History

2021-10-20 01:23:57 +02:00
#!/usr/bin/env python3
import abc
import time
from getpass import getpass
from datetime import datetime, timezone
import time
2021-10-20 01:23:57 +02:00
import imaplib
import poplib
import smtplib
import subprocess
import urllib.error
import urllib.parse
import urllib.request
import email
from email.mime.application import MIMEApplication
from email.mime.text import MIMEText
from email.parser import BytesParser
from email.utils import format_datetime
from xml.etree import ElementTree
def _get_pgp_message(message: email.message.Message) -> bytes:
    """Return the one ASCII-armored PGP message found in *message*.

    Every non-multipart part is inspected.  Textual content is encoded to
    bytes before it is searched for the ``BEGIN PGP MESSAGE`` marker.

    :raises ValueError: if no part, or more than one part, contains the marker.
    """
    found = None
    leaves = (part for part in message.walk() if not part.is_multipart())
    for leaf in leaves:
        content = leaf.get_content()
        data = content.encode() if isinstance(content, str) else content
        if b'BEGIN PGP MESSAGE' not in data:
            continue
        if found is not None:
            raise ValueError('More than one encrypted message part')
        found = data
    if found is None:
        raise ValueError('No encrypted message part')
    return found
class MailServerConfig(abc.ABC):
    """Connection settings for one mail server, usable as a context manager.

    Two instances are equal when all six settings match.  ``<`` expresses
    preference, so ``sorted()`` puts the most desirable configuration first:
    IMAP before POP3, then SSL before STARTTLS before plain, then a full
    e-mail address as username before a bare local part.
    """

    # Preference ranks used by __lt__; a lower rank sorts first.
    _PROTO_RANK = {'imap': 0, 'pop3': 1}
    _TLS_RANK = {'SSL': 0, 'STARTTLS': 1}

    def __init__(self, proto, hostname, port, tls, username, password):
        self.proto = proto
        self.hostname = hostname
        self.port = port
        self.tls = tls
        self.username = username
        self.password = password

    def __repr__(self):
        # URL-like rendering, e.g. "imaps://user@host:993" or "smtp+starttls://...".
        # The password is deliberately not part of the representation.
        proto = self.proto + ('s' if self.tls == 'SSL' else '') + ('+starttls' if self.tls == 'STARTTLS' else '')
        return f'{proto}://{self.username}@{self.hostname}:{self.port}'

    def __eq__(self, other):
        if not isinstance(other, MailServerConfig):
            return False
        return self.proto == other.proto \
            and self.hostname == other.hostname \
            and self.port == other.port \
            and self.tls == other.tls \
            and self.username == other.username \
            and self.password == other.password

    def _preference_key(self):
        """Return the tuple that ``__lt__`` compares lexicographically."""
        return (
            self._PROTO_RANK.get(self.proto, 0),
            # Anything that is neither SSL nor STARTTLS ranks like "plain".
            self._TLS_RANK.get(self.tls, 2),
            0 if '@' in self.username else 1,
        )

    def __lt__(self, other):
        """Return True if *self* is preferred over *other*.

        The criteria are compared lexicographically so the result is a
        consistent ordering: ``a < b`` and ``b < a`` are never both true.

        :raises TypeError: if *other* is not a MailServerConfig, or if an
            SMTP configuration is compared with a non-SMTP one.
        """
        if not isinstance(other, MailServerConfig):
            raise TypeError(f'Cannot compare {type(self)} to {type(other)}')
        # SMTP not comparable to IMAP or POP3
        if (self.proto == 'smtp' or other.proto == 'smtp') and self.proto != other.proto:
            raise TypeError(f'Cannot compare {type(self)} to {type(other)}')
        return self._preference_key() < other._preference_key()

    def __enter__(self):
        pass

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass
class SmtpServerConfig(MailServerConfig):
    """SMTP settings; entering the context connects and logs in."""

    def __init__(self, hostname, port, tls, username, password):
        super().__init__('smtp', hostname, port, tls, username, password)
        # Live smtplib connection while inside the ``with`` block, else None.
        self._smtp = None

    def __enter__(self):
        """Connect, upgrade via STARTTLS if configured, and authenticate.

        :return: the underlying smtplib connection object.
        """
        cls = smtplib.SMTP_SSL if self.tls == 'SSL' else smtplib.SMTP
        connection = cls(self.hostname, self.port)
        try:
            smtp = connection.__enter__()
            if self.tls == 'STARTTLS':
                smtp.starttls()
            smtp.login(self.username, self.password)
        except BaseException:
            # __exit__ is not invoked when __enter__ raises, so the socket
            # has to be released here to avoid leaking it.
            connection.close()
            raise
        self._smtp = connection
        return smtp

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Clear the attribute first so the object is reusable even if the
        # underlying __exit__ raises.
        connection, self._smtp = self._smtp, None
        if connection is None:
            return None
        return connection.__exit__(exc_type, exc_val, exc_tb)

    def send_message(self, msg):
        """Send *msg* over the open connection.

        :raises RuntimeError: if called outside the ``with`` block.
        """
        if self._smtp is None:
            raise RuntimeError('SMTP connection is not established')
        self._smtp.send_message(msg)
class IncomingServerConfig(MailServerConfig, abc.ABC):
    """Common base for servers that mail is retrieved from.

    Every hook defined here is a no-op returning None; the IMAP and POP3
    subclasses in this file override them.
    """

    def __init__(self, proto, hostname, port, tls, username, password):
        super().__init__(proto, hostname, port, tls, username, password)

    def __enter__(self):
        """Open the connection (no-op in this base class)."""
        return None

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the connection (no-op in this base class)."""
        return None

    def get_new_messages(self):
        """Return newly arrived messages (no-op in this base class)."""
        return None
class ImapServerConfig(IncomingServerConfig):
    """IMAP settings; entering the context logs in and selects INBOX read-only."""

    def __init__(self, hostname, port, tls, username, password):
        super().__init__('imap', hostname, port, tls, username, password)
        # Live imaplib connection while inside the ``with`` block, else None.
        self._imap = None
        # UIDNEXT value seen at the last check; messages from this UID
        # onwards count as new.
        self._uidnext = None

    @staticmethod
    def _read_uidnext(imap):
        """Return the UIDNEXT value reported by the most recent SELECT.

        :raises RuntimeError: if the server response carried no UIDNEXT.
        """
        data = imap.response('UIDNEXT')[1]
        if not data or data[0] is None:
            raise RuntimeError('IMAP server did not report UIDNEXT for INBOX')
        return int(data[0].decode())

    def __enter__(self):
        """Connect, authenticate, select INBOX and record the current UIDNEXT.

        :return: the underlying imaplib connection object.
        """
        cls = imaplib.IMAP4_SSL if self.tls == 'SSL' else imaplib.IMAP4
        connection = cls(self.hostname, self.port)
        try:
            imap = connection.__enter__()
            if self.tls == 'STARTTLS':
                imap.starttls()
            imap.login(self.username, self.password)
            imap.select('INBOX', readonly=True)
            uidnext = self._read_uidnext(imap)
        except BaseException:
            # __exit__ is not invoked when __enter__ raises, so the socket
            # has to be released here to avoid leaking it.
            connection.shutdown()
            raise
        self._imap = connection
        self._uidnext = uidnext
        return imap

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Reset state first so the object is reusable even if the
        # underlying __exit__ raises.
        connection, self._imap = self._imap, None
        self._uidnext = None
        if connection is None:
            return None
        return connection.__exit__(exc_type, exc_val, exc_tb)

    def get_new_messages(self):
        """Yield the raw RFC822 bytes of each message that arrived since the last check.

        This is a generator: the stored UIDNEXT only advances once the
        caller has consumed it completely.

        :raises RuntimeError: if called outside the ``with`` block.
        """
        if self._imap is None or self._uidnext is None:
            raise RuntimeError('IMAP connection is not established')
        # Re-select the mailbox to obtain a fresh UIDNEXT.
        self._imap.select('INBOX', readonly=True)
        u = self._read_uidnext(self._imap)
        if u > self._uidnext:
            messages = self._imap.uid('fetch', f'{self._uidnext}:{u - 1}', '(RFC822)')
            for message in messages[1]:
                if not isinstance(message, tuple) or b'RFC822' not in message[0]:
                    # Not an email message
                    continue
                yield message[1]
        self._uidnext = u
class Pop3ServerConfig(IncomingServerConfig):
    """POP3 settings; entering the context logs in and snapshots the message list."""

    def __init__(self, hostname, port, tls, username, password):
        super().__init__('pop3', hostname, port, tls, username, password)
        # Live poplib connection while inside the ``with`` block, else None.
        self._pop3: poplib.POP3 = None
        # Set of LIST entries (bytes) seen at the last check.
        self._messagelist = None

    def __enter__(self):
        """Connect, authenticate and remember which messages already exist.

        :return: the underlying poplib connection object.
        """
        cls = poplib.POP3_SSL if self.tls == 'SSL' else poplib.POP3
        connection = cls(self.hostname, self.port)
        try:
            if self.tls == 'STARTTLS':
                connection.stls()
            connection.user(self.username)
            connection.pass_(self.password)
            messagelist = set(connection.list()[1])
        except BaseException:
            # __exit__ is not invoked when __enter__ raises, so the socket
            # has to be released here to avoid leaking it.
            connection.close()
            raise
        self._pop3 = connection
        self._messagelist = messagelist
        return connection

    def __exit__(self, exc_type, exc_val, exc_tb):
        connection, self._pop3 = self._pop3, None
        self._messagelist = None
        if connection is not None:
            connection.close()

    def get_new_messages(self):
        """Yield the raw bytes of each message not present at the last check.

        This is a generator: the stored message list only advances once the
        caller has consumed it completely.

        NOTE(review): POP3 servers commonly fix the maildrop contents when
        the session is authenticated, so new mail may never show up without
        reconnecting; confirm against the servers this is used with.

        :raises RuntimeError: if called outside the ``with`` block.
        """
        if self._pop3 is None or self._messagelist is None:
            raise RuntimeError('POP3 connection is not established')
        ml = set(self._pop3.list()[1])
        # poplib returns LIST entries as bytes of the form b'<msgnum> <octets>'.
        new_numbers = sorted(int(entry.split(b' ', 1)[0]) for entry in ml.difference(self._messagelist))
        for msgnum in new_numbers:
            # RETR returns the message as a list of byte lines without line endings.
            yield b'\r\n'.join(self._pop3.retr(msgnum)[1])
        self._messagelist = ml
def default_port(proto: str, tls: str):
    """Return the default TCP port for *proto* under the given *tls* mode.

    The first port of each pair applies when *tls* is ``'SSL'``, the second
    for every other value.

    :raises ValueError: if *proto* is not ``'imap'``, ``'pop3'`` or ``'smtp'``.
    """
    ports = {
        'imap': (993, 143),
        'pop3': (995, 110),
        'smtp': (465, 587),
    }
    if proto not in ports:
        raise ValueError(f'Unknown protocol: {proto}')
    ssl_port, other_port = ports[proto]
    if tls == 'SSL':
        return ssl_port
    return other_port
def template_username(template: str, address: str, userinputs):
    """Expand autoconfig placeholders in *template*.

    ``%EMAILADDRESS%``, ``%EMAILLOCALPART%`` and ``%EMAILDOMAIN%`` are
    replaced from *address*.  Each key of *userinputs* that occurs in the
    template is replaced by its value; a value of None is first requested
    from the user via ``getpass`` and stored back into *userinputs*.

    :param userinputs: dict mapping placeholder key to ``(label, value)``.
    :return: the expanded template.
    """
    local_part, domain = address.split('@', 1)
    builtin_placeholders = (
        ('%EMAILADDRESS%', address),
        ('%EMAILLOCALPART%', local_part),
        ('%EMAILDOMAIN%', domain),
    )
    for placeholder, replacement in builtin_placeholders:
        template = template.replace(placeholder, replacement)
    for key in userinputs:
        if key in template:
            label, value = userinputs[key]
            if value is None:
                value = getpass(f'Autoconfiguration field "{label}" (will not echo): ')
                # Cache the answer so the user is asked only once per key.
                userinputs[key] = label, value
            template = template.replace(key, value)
    return template
def parse_xml_mailserver(xmle, address: str, password: str, userinputs):
    """Build a server configuration from one autoconfig server element.

    :param xmle: the ``incomingServer``/``outgoingServer`` XML element.
    :param address: e-mail address used to expand placeholders.
    :param password: fallback password when the element carries no
        non-empty ``password`` child.
    :param userinputs: dict of user-supplied placeholder values, passed on
        to ``template_username``.
    :return: an Smtp/Imap/Pop3 server config, or None when the element's
        ``type`` attribute is none of ``smtp``, ``imap`` or ``pop3``.
    """
    proto = xmle.get('type')
    if proto == 'smtp':
        cls = SmtpServerConfig
    elif proto == 'imap':
        cls = ImapServerConfig
    elif proto == 'pop3':
        cls = Pop3ServerConfig
    else:
        # Unsupported server type: nothing to configure.
        return None
    hostname = xmle.find('./hostname').text
    tls = xmle.find('./socketType').text
    port_element = xmle.find('./port')
    port_text = port_element.text if port_element is not None else None
    port = int(port_text or default_port(proto, tls))
    username = template_username(xmle.find('./username').text, address, userinputs)
    pw_element = xmle.find('./password')
    # Compare against None explicitly: an Element without children is falsy
    # even when it exists and carries text.
    if pw_element is not None and pw_element.text:
        pw = template_username(pw_element.text, address, userinputs)
    else:
        pw = password
    return cls(hostname, port, tls, username, pw)
def parse_thunderbird_autoconfig(xml: str, address: str, password: str):
    """Extract server configurations for *address* from an autoconfig XML document.

    Only ``emailProvider`` elements that list the address's domain in a
    ``domain`` child are considered.  Server elements of an unsupported
    type are skipped.

    :return: tuple ``(incoming_configs, outgoing_configs)`` of lists.
    """
    _, domain = address.split('@', 1)
    root = ElementTree.fromstring(xml)
    userinputs = {ui.get('key'): (ui.get('label'), None) for ui in root.findall('.//userinput')}
    incoming = []
    outgoing = []
    # Match the domain in Python instead of interpolating it into an XPath
    # expression, which breaks as soon as the domain contains a quote.
    for provider in root.findall('./emailProvider'):
        if any(''.join(d.itertext()) == domain for d in provider.findall('./domain')):
            incoming.extend(provider.findall('./incomingServer'))
            outgoing.extend(provider.findall('./outgoingServer'))
    iconf = [parse_xml_mailserver(i, address, password, userinputs) for i in incoming]
    oconf = [parse_xml_mailserver(o, address, password, userinputs) for o in outgoing]
    # parse_xml_mailserver yields None for server types it does not know;
    # such entries cannot be sorted or connected to, so drop them.
    iconf = [c for c in iconf if c is not None]
    oconf = [c for c in oconf if c is not None]
    return iconf, oconf
def tb_wellknown_autoconfig(address: str, password: str):
    """Try the provider-hosted Thunderbird autoconfig locations for *address*.

    ``http://autoconfig.<domain>/mail/config-v1.1.xml`` is tried first, then
    ``http://<domain>/.well-known/autoconfig/mail/config-v1.1.xml``.

    :return: ``(incoming_configs, outgoing_configs)`` from the first location
        that delivers a parseable document, or None if neither does.
    """
    _, domain = address.split('@', 1)
    # URL-encode the address so characters such as '+' or '&' survive.
    query = urllib.parse.urlencode({'emailaddress': address})
    candidates = [
        urllib.parse.urlunsplit(('http', f'autoconfig.{domain}', 'mail/config-v1.1.xml', query, '')),
        urllib.parse.urlunsplit(('http', domain, '.well-known/autoconfig/mail/config-v1.1.xml', query, '')),
    ]
    for url in candidates:
        try:
            with urllib.request.urlopen(url) as response:
                document = response.read().decode()
            return parse_thunderbird_autoconfig(document, address, password)
        except (urllib.error.URLError, UnicodeDecodeError, ElementTree.ParseError):
            # Unreachable, or the server answered with something that is not
            # an autoconfig document: fall through to the next location.
            continue
    return None
def tb_ispdb_autoconfig(address: str, password: str):
    """Look up the domain of *address* in the Thunderbird ISP database.

    :return: ``(incoming_configs, outgoing_configs)``, or None if the lookup
        fails or the response is not a parseable document.
    """
    _, domain = address.split('@', 1)
    ispdb = f'https://autoconfig.thunderbird.net/v1.1/{urllib.parse.quote(domain)}'
    try:
        with urllib.request.urlopen(ispdb) as response:
            document = response.read().decode()
        return parse_thunderbird_autoconfig(document, address, password)
    except (urllib.error.URLError, UnicodeDecodeError, ElementTree.ParseError):
        return None
def manual_config(address: str, password: str):
    """Interactively ask for the SMTP and IMAP/POP3 server settings.

    Invalid answers are rejected and asked again, so a mistyped TLS mode
    cannot silently turn into an unencrypted connection.

    :return: ``([incoming_config], [outgoing_config])``; *address* is used as
        the username for both.
    """
    tls_modes = {'plain': 'plain', 'ssl': 'SSL', 'starttls': 'STARTTLS'}

    def ask_tls(default):
        # Accept any capitalisation but hand back the canonical spelling
        # that the server config classes compare against.
        while True:
            answer = input(f'TLS ("plain", "SSL" or "STARTTLS", default: "{default}"): ') or default
            if answer.lower() in tls_modes:
                return tls_modes[answer.lower()]
            print(f'Unknown TLS mode: {answer}')

    def ask_port(label, default):
        while True:
            answer = input(f'{label} port (default: {default}): ') or str(default)
            try:
                return int(answer)
            except ValueError:
                print(f'Not a valid port number: {answer}')

    print('Autoconfiguration has failed. Please enter the mail server settings manually.')
    host = input('SMTP hostname: ')
    tls = ask_tls('STARTTLS')
    port = ask_port('SMTP', default_port('smtp', tls))
    outconfig = SmtpServerConfig(host, port, tls, address, password)

    while True:
        proto = (input('Mail Retrieval protocol ("imap" or "pop3", default: "imap"): ') or 'imap').lower()
        if proto in ('imap', 'pop3'):
            break
        print(f'Unknown protocol: {proto}')
    host = input(f'{proto.upper()} hostname: ')
    tls = ask_tls('SSL')
    port = ask_port(proto.upper(), default_port(proto, tls))
    if proto == 'imap':
        inconfig = ImapServerConfig(host, port, tls, address, password)
    else:
        inconfig = Pop3ServerConfig(host, port, tls, address, password)
    return [inconfig], [outconfig]
def parse_dns_srv_record(record, service: str, address: str, password: str):
    """Turn one DNS SRV record into candidate server configurations.

    For each applicable TLS mode two candidates are produced: one that logs
    in with the full *address* and one with only its local part.

    :param record: SRV record object providing ``target`` and ``port``.
    :param service: service label, e.g. ``'_imaps'`` or ``'_submission'``.
        A label ending in ``s`` means implicit TLS (``'SSL'``); otherwise
        ``'STARTTLS'`` and ``'plain'`` are both offered.
    :raises ValueError: if *service* is not a known service label.
    """
    user, _ = address.split('@', 1)
    if service in ('_submission', '_submissions'):
        cls = SmtpServerConfig
    elif service in ('_imaps', '_imap'):
        cls = ImapServerConfig
    elif service in ('_pop3s', '_pop3'):
        cls = Pop3ServerConfig
    else:
        raise ValueError(service)
    tls_modes = ['SSL'] if service.endswith('s') else ['STARTTLS', 'plain']
    # The SRV target is a DNS name object, not a str; the network libraries
    # need a plain hostname without the trailing root dot.
    hostname = str(record.target).rstrip('.')
    configs = []
    for t in tls_modes:
        configs.append(cls(hostname, record.port, t, address, password))
        configs.append(cls(hostname, record.port, t, user, password))
    return configs
def rfc6186_autoconfig(address: str, password: str):
    """Discover mail servers for *address* through DNS SRV records (RFC 6186).

    Requires the optional ``dnspython`` package.

    :return: ``(incoming_configs, outgoing_configs)``, or None when dnspython
        is missing or the ``_submission`` lookup fails.
    """
    try:
        import dns.exception
        import dns.rdataclass
        import dns.rdatatype
        import dns.resolver
        import dns.rdtypes.IN.SRV
    except ImportError:
        print('"dnspython" dependency missing. Skipping RFC 6186 autoconfig')
        return None
    _, domain = address.split('@', 1)

    def usable_records(service):
        # Resolve the SRV records of one service and drop those that are not
        # IN/SRV or that use port 0 with target ".".
        answer = dns.resolver.resolve(f'{service}._tcp.{domain}.', dns.rdatatype.SRV)
        return [
            srv for srv in answer
            if srv.rdtype == dns.rdatatype.SRV
            and srv.rdclass == dns.rdataclass.IN
            and not (srv.port == 0 and str(srv.target) == '.')
        ]

    # Records are tagged with the service label that was queried, not with a
    # label taken from the answer's canonical name, which may differ when the
    # name is an alias.
    try:
        osrv = [('_submission', srv) for srv in usable_records('_submission')]
    except dns.exception.DNSException:
        return None
    isrv = []
    for service in ('_imaps', '_imap', '_pop3s', '_pop3'):
        try:
            isrv.extend((service, srv) for srv in usable_records(service))
        except dns.exception.DNSException:
            continue
    incoming = []
    outgoing = []
    for iqname, irec in isrv:
        incoming.extend(parse_dns_srv_record(irec, iqname, address, password))
    # Outgoing servers come from the submission records.
    for oqname, orec in osrv:
        outgoing.extend(parse_dns_srv_record(orec, oqname, address, password))
    return incoming, outgoing
def _parse_confirmation_request(address, fingerprint, encrypted):
    """Decrypt and validate a WKS confirmation request.

    :param address: the e-mail address the request must refer to.
    :param fingerprint: the key fingerprint the request must refer to
        (spaces are ignored when comparing).
    :param encrypted: the encrypted request as bytes or str.
    :return: tuple ``(sender, nonce)`` taken from the request.
    :raises RuntimeError: if gpg exits with a non-zero status.
    :raises ValueError: if the request is malformed or does not match
        *address* / *fingerprint*.
    """
    if isinstance(encrypted, str):
        encrypted = encrypted.encode()
    # subprocess.run feeds stdin and drains both pipes concurrently; waiting
    # for the process before reading can deadlock once a pipe buffer fills.
    gpg = subprocess.run(['/usr/bin/gpg', '--decrypt'], input=encrypted,
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if gpg.returncode != 0:
        raise RuntimeError(f'gpg subprocess returned with non-zero exit code; stderr: {gpg.stderr}')
    decrypted = gpg.stdout.decode()
    # The request is a list of "key: value" lines; other lines are ignored.
    rdict = {}
    for line in decrypted.splitlines():
        if ':' not in line:
            continue
        key, value = line.split(':', 1)
        rdict[key.strip()] = value.strip()
    if rdict.get('type', '') != 'confirmation-request':
        raise ValueError('Invalid confirmation request: "type" missing or not "confirmation-request"')
    if any(item not in rdict for item in ('sender', 'address', 'fingerprint', 'nonce')):
        raise ValueError('Invalid confirmation request: a mandatory item is missing from the request')
    if rdict['address'] != address:
        raise ValueError(f'Confirmation address "{rdict["address"]}" does not match my address "{address}"')
    if rdict['fingerprint'].replace(' ', '') != fingerprint.replace(' ', ''):
        raise ValueError(
            f'Confirmation fingerprint "{rdict["fingerprint"]}" does not match my fingerprint "{fingerprint}"')
    print(f'Nonce: {rdict["nonce"]}')
    return rdict['sender'], rdict['nonce']
def _create_submission_request(address: str, submission_address: str, fingerprint: str, revoked_fingerprints):
    """Build the WKS submission mail carrying the key(s) to publish.

    The submission key is located with gpg, the selected keys are exported
    and the export is encrypted to *address* and *submission_address*.

    :param revoked_fingerprints: fingerprints of revoked keys to export
        together with *fingerprint*.
    :return: a ``MIMEText`` message addressed to *submission_address*.
    :raises RuntimeError: if a gpg call fails or the encryption times out.
    """
    # All gpg calls go through subprocess.run, which drains stdout/stderr
    # while the process runs; waiting before reading can deadlock once the
    # output exceeds the pipe buffer.
    gpg = subprocess.run([
        '/usr/bin/gpg', '--locate-keys', '--with-colons', submission_address
    ], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if gpg.returncode != 0:
        raise RuntimeError(f'gpg subprocess returned with non-zero exit code; stderr: {gpg.stderr}')
    print('Retrieved submission key')

    gpg = subprocess.run([
        '/usr/bin/gpg', '--armor',
        '--export-options', 'export-minimal',
        '--export', fingerprint,
        *revoked_fingerprints,
    ], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if gpg.returncode != 0:
        raise RuntimeError(f'gpg subprocess returned with non-zero exit code; stderr: {gpg.stderr}')
    print('Retrieved key to publish')
    pubkey = gpg.stdout

    try:
        gpg = subprocess.run([
            '/usr/bin/gpg', '--armor', '--with-colons',
            '--encrypt', '--trust-model', 'always',
            '--local-user', address,
            '--recipient', address,
            '--recipient', submission_address
        ], input=pubkey, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=2)
    except subprocess.TimeoutExpired as e:
        # Build the message once so the printed and the raised text both
        # carry whatever stderr output was captured.
        message = f'gpg subprocess timed out; stderr: {e.stderr}'
        print(message)
        raise RuntimeError(message) from e
    if gpg.returncode != 0:
        raise RuntimeError(f'gpg subprocess returned with non-zero exit code; stderr: {gpg.stderr}')
    print('Created encrypted message')
    encrypted = gpg.stdout.decode()

    mail = MIMEText(encrypted, _subtype='plain')
    mail['Subject'] = 'WKS submission request'
    mail['To'] = submission_address
    mail['From'] = address
    mail['Date'] = format_datetime(datetime.now(timezone.utc))
    return mail
def _create_confirmation_response(address: str, submission: str, nonce: str, fp: str, content_subtype: str):
    """Build the signed and encrypted WKS confirmation response mail.

    :param address: own e-mail address (sender of the response).
    :param submission: address the response is sent and encrypted to.
    :param nonce: nonce echoed back from the confirmation request.
    :param fp: fingerprint of the key used for signing.
    :param content_subtype: MIME subtype for the ``application/*`` part.
    :return: a ``MIMEApplication`` message addressed to *submission*.
    :raises RuntimeError: if gpg exits with a non-zero status.
    """
    response_template = '\r\n'.join([
        'type: confirmation-response',
        f'sender: {address}',
        f'nonce: {nonce}',
        ''
    ])
    payload = MIMEText(response_template, _subtype='plain')
    # subprocess.run feeds stdin and drains both pipes concurrently; waiting
    # for the process before reading can deadlock once a pipe buffer fills.
    gpg = subprocess.run([
        '/usr/bin/gpg', '--armor', '--with-colons',
        '--encrypt', '--sign', '--trust-model', 'always',
        '--local-user', fp,
        '--recipient', address,
        '--recipient', submission
    ], input=payload.as_string(policy=email.policy.default).encode(),
        stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if gpg.returncode != 0:
        raise RuntimeError(f'gpg subprocess returned with non-zero exit code; stderr: {gpg.stderr}')
    encrypted = gpg.stdout.decode()
    mail = MIMEApplication(encrypted, _subtype=content_subtype)
    mail['Subject'] = 'WKS confirmation response'
    mail['To'] = submission
    mail['From'] = address
    mail['Date'] = format_datetime(datetime.now(timezone.utc))
    return mail
def handle_incoming_message(address, fingerprint, rfc822, smtp_config: SmtpServerConfig):
    """Process one received mail as part of the WKS exchange.

    A message whose ``wks-phase`` header is ``done`` or ``error`` ends the
    exchange.  Otherwise every part with a WKS content type is treated as a
    confirmation request and answered through *smtp_config*.

    :param rfc822: the raw message as bytes.
    :return: True when the exchange is finished (successfully or not),
        False when further messages are expected.
    """
    msg: email.message.EmailMessage = BytesParser(policy=email.policy.default).parsebytes(rfc822)
    phase = msg.get('wks-phase', '')
    if phase == 'done':
        try:
            pgp = _get_pgp_message(msg)
        except ValueError:
            # _get_pgp_message raises instead of returning None when there is
            # no (single) encrypted part; the phase header alone still
            # signals success.
            print('WKS key submission successful. Congratulations!')
            return True
        print('Decrypting WKS response. GnuPG may prompt you for your passphrase.')
        # subprocess.run drains both pipes while gpg runs; waiting for the
        # process before reading can deadlock once a pipe buffer fills.
        gpg = subprocess.run(['/usr/bin/gpg', '--batch', '--decrypt', '--skip-verify'], input=pgp,
                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        if gpg.returncode != 0:
            print('It seems WKS submission was successful, however decryption of the submission response failed: ' +
                  gpg.stderr.decode())
            return True
        decrypted = email.parser.BytesParser(policy=email.policy.default).parsebytes(gpg.stdout)
        body = decrypted.get_body(preferencelist=('plain',))
        if body is None:
            print('WKS key submission successful. Congratulations!')
        else:
            print(body.get_content())
        return True
    if phase == 'error':
        body = msg.get_body(preferencelist=('plain',))
        if body is None:
            print('WKS key submission failed. However, the WKS server did not return an error description.')
        else:
            print(body.get_content())
        return True
    for leaf in msg.walk():
        if leaf.get_content_type() not in ['application/vnd.gnupg.wkd', 'application/vnd.gnupg.wks']:
            continue
        print('Received confirmation request')
        try:
            submission, nonce = _parse_confirmation_request(address, fingerprint, leaf.get_content())
        except Exception as e:
            # Exception rather than BaseException, so Ctrl-C and SystemExit
            # are not swallowed here.
            print(f'Parsing failed: {e}')
            continue
        print('Creating confirmation response. GnuPG may prompt you for your passphrase.')
        response = _create_confirmation_response(address, submission, nonce, fingerprint, leaf.get_content_subtype())
        print('Sending confirmation response')
        with smtp_config:
            smtp_config.send_message(response)
        print('Awaiting publish response')
    return False
def _parse_colon_keylist(keylist: str):
    """Split gpg ``--with-colons`` key listing output into (valid, revoked) primary fingerprints."""
    valid = []
    revoked = []
    # Revocation flag of a "pub" record that still waits for its "fpr"
    # record; None while no primary key is pending, so fingerprints that do
    # not directly belong to a "pub" record are ignored.
    pending = None
    for line in keylist.splitlines():
        if line.startswith('pub:'):
            pending = line.split(':')[1] == 'r'
        elif line.startswith('fpr:') and pending is not None:
            (revoked if pending else valid).append(line.split(':')[9])
            pending = None
    return valid, revoked


def _gpg_get_uid_fp(address: str):
    """Select the key to publish for *address*, plus optional revoked keys.

    The user is asked to choose when several non-revoked keys exist, and
    which revoked keys (if any) to upload as well.

    :return: tuple ``(fingerprint, revoked_fingerprints)``.
    :raises RuntimeError: if gpg exits with a non-zero status.
    :raises ValueError: if no valid key exists or a selection is out of range.
    """
    # stderr must be piped to be reportable, and subprocess.run drains the
    # pipes while gpg runs, so a long key listing cannot block the process.
    gpg = subprocess.run([
        '/usr/bin/gpg', '--with-colons', '--list-keys', address
    ], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if gpg.returncode != 0:
        raise RuntimeError(f'gpg subprocess returned with non-zero exit code; stderr: {gpg.stderr}')
    valid, revoked = _parse_colon_keylist(gpg.stdout.decode())

    def pick(number: int, choices):
        # Reject 0 and negatives explicitly: a plain index of -1 would
        # silently select the last entry.
        if not 1 <= number <= len(choices):
            raise ValueError(f'Invalid selection: {number}')
        return choices[number - 1]

    if not valid:
        raise ValueError(f'No valid key found for {address}.')
    if len(valid) > 1:
        print(f'Found multiple keys for {address}, please choose:')
        for i, f in enumerate(valid, start=1):
            print(f'{i}: {f}')
        fpr = pick(int(input('Enter number: ')), valid)
    else:
        fpr = valid[0]
    if revoked:
        print(f'There are revoked keys for {address}. Please choose which to upload (separate multiple by spaces): ')
        for i, f in enumerate(revoked, start=1):
            print(f'{i}: {f}')
        rfprs = [pick(int(n), revoked) for n in input('Enter number(s): ').split()]
    else:
        rfprs = []
    return fpr, rfprs
def _get_submission_address(address: str):
    """Fetch the WKS submission address published for the domain of *address*.

    The ``openpgpkey.<domain>`` location is tried first.  If that fails with
    ``URLError`` the location directly below ``<domain>`` is used, and its
    errors propagate to the caller.
    """
    domain = address.split('@', 1)[1]

    def fetch(url):
        with urllib.request.urlopen(url) as response:
            return response.read().decode().strip()

    try:
        return fetch(f'https://openpgpkey.{domain}/.well-known/openpgpkey/{domain}/submission-address')
    except urllib.error.URLError:
        pass
    return fetch(f'https://{domain}/.well-known/openpgpkey/submission-address')
def main():
    """Run the interactive WKS key submission.

    Steps: ask for the address, look up the submission address, pick the
    key(s), configure the mail servers, send the submission request, then
    poll the inbox for up to 300 seconds and answer the confirmation request.
    """
    ad = input('Enter email: ')
    try:
        sa = _get_submission_address(ad)
    except urllib.error.URLError:
        print('No WKS submission address found. Does your provider support WKS?')
        # SystemExit instead of exit(): the latter is only provided by the
        # site module and is not guaranteed to exist.
        raise SystemExit(1)
    fp, rfprs = _gpg_get_uid_fp(ad)
    print(f'Chose {fp}')
    for rfpr in rfprs:
        print(f'Chose revoked key {rfpr}')
    pw = getpass('Enter IMAP/POP3/SMTP password (will not echo): ')

    autoconf = None
    for fn in [tb_wellknown_autoconfig, rfc6186_autoconfig, tb_ispdb_autoconfig, manual_config]:
        candidate = fn(ad, pw)
        # A method that found nothing usable for either direction must not
        # stop the search; otherwise the later methods (including manual
        # entry) are never reached.
        if candidate is not None and candidate[0] and candidate[1]:
            autoconf = candidate
            break
    if autoconf is None:
        raise RuntimeError('No autoconfig available')
    incoming, outgoing = autoconf

    def first_working(candidates):
        # Return the most preferred configuration that can be connected to.
        for config in sorted(candidates):
            try:
                with config:
                    return config
            except Exception as e:
                # Connection or login failure: report it and try the next
                # candidate.  Ctrl-C is deliberately not caught.
                print(f'{config} failed: {e}')
        return None

    # Find the first working server configurations
    incoming_server = first_working(incoming)
    if incoming_server is None:
        raise RuntimeError('No working IMAP/POP3 server found through autoconfiguration. Please specify manually.')
    print(f'Autoconfigured incoming server: {incoming_server}')
    outgoing_server = first_working(outgoing)
    if outgoing_server is None:
        raise RuntimeError('No working SMTP server found through autoconfiguration. Please specify manually.')
    print(f'Autoconfigured outgoing server: {outgoing_server}')
    confirm = input('Please confirm: [Y/n] ')
    if confirm.lower() not in ['', 'y', 'yes']:
        print('Aborted')
        raise SystemExit(1)

    # The inbox is opened before the request is sent so that the reply
    # counts as a new message.
    with incoming_server:
        now = time.monotonic()
        done = False
        request = _create_submission_request(ad, sa, fp, rfprs)
        print('Sending submission request')
        with outgoing_server:
            outgoing_server.send_message(request)
        print('Awaiting response')
        while not done and time.monotonic() - now < 300:
            time.sleep(5)
            for message in incoming_server.get_new_messages():
                done = handle_incoming_message(ad, fp, message, outgoing_server)
                if done:
                    break
        if not done:
            print('Timed out waiting for a response from the WKS server')
# Run the interactive client only when this file is executed as a script.
if __name__ == '__main__':
    main()