Add a small WKS client implementation
This commit is contained in:
parent
f081578c50
commit
b1f7d993c9
1 changed files with 623 additions and 0 deletions
623
client.py
Executable file
623
client.py
Executable file
|
@ -0,0 +1,623 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
import abc
|
||||||
|
import time
|
||||||
|
from getpass import getpass
|
||||||
|
from datetime import datetime
|
||||||
|
import imaplib
|
||||||
|
import poplib
|
||||||
|
import smtplib
|
||||||
|
import subprocess
|
||||||
|
import urllib.error
|
||||||
|
import urllib.parse
|
||||||
|
import urllib.request
|
||||||
|
import email
|
||||||
|
from email.mime.application import MIMEApplication
|
||||||
|
from email.mime.text import MIMEText
|
||||||
|
from email.parser import BytesParser
|
||||||
|
from email.utils import format_datetime
|
||||||
|
from xml.etree import ElementTree
|
||||||
|
|
||||||
|
|
||||||
|
def _get_pgp_message(message: email.message.Message) -> bytes:
|
||||||
|
pgp = None
|
||||||
|
for part in message.walk():
|
||||||
|
if part.is_multipart():
|
||||||
|
continue
|
||||||
|
p = part.get_content()
|
||||||
|
if isinstance(p, str):
|
||||||
|
p = p.encode()
|
||||||
|
if b'BEGIN PGP MESSAGE' in p:
|
||||||
|
if pgp is not None:
|
||||||
|
raise ValueError('More than one encrypted message part')
|
||||||
|
pgp = p
|
||||||
|
if pgp is None:
|
||||||
|
raise ValueError('No encrypted message part')
|
||||||
|
return pgp
|
||||||
|
|
||||||
|
|
||||||
|
class MailServerConfig(abc.ABC):
|
||||||
|
|
||||||
|
def __init__(self, proto, hostname, port, tls, username, password):
|
||||||
|
self.proto = proto
|
||||||
|
self.hostname = hostname
|
||||||
|
self.port = port
|
||||||
|
self.tls = tls
|
||||||
|
self.username = username
|
||||||
|
self.password = password
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
proto = self.proto + ('s' if self.tls == 'SSL' else '') + ('+starttls' if self.tls == 'STARTTLS' else '')
|
||||||
|
return f'{proto}://{self.username}@{self.hostname}:{self.port}'
|
||||||
|
|
||||||
|
def __eq__(self, other):
|
||||||
|
if not isinstance(other, MailServerConfig):
|
||||||
|
return False
|
||||||
|
return self.proto == other.proto \
|
||||||
|
and self.hostname == other.hostname \
|
||||||
|
and self.port == other.port \
|
||||||
|
and self.tls == other.tls \
|
||||||
|
and self.username == other.username \
|
||||||
|
and self.password == other.password
|
||||||
|
|
||||||
|
def __lt__(self, other):
|
||||||
|
if not isinstance(other, MailServerConfig):
|
||||||
|
raise TypeError(f'Cannot compare {type(self)} to {type(other)}')
|
||||||
|
if self == other:
|
||||||
|
return False
|
||||||
|
# SMTP not comparable to IMAP or POP3
|
||||||
|
if (self.proto == 'smtp' or other.proto == 'smtp') and self.proto != other.proto:
|
||||||
|
raise TypeError(f'Cannot compare {type(self)} to {type(other)}')
|
||||||
|
# IMAP < POP3
|
||||||
|
if self.proto == 'imap' and other.proto == 'pop3':
|
||||||
|
return True
|
||||||
|
# SSL < STARTTLS < plain
|
||||||
|
if (self.tls == 'SSL' and other.tls != 'SSL') or (self.tls == 'STARTTLS' and other.tls == 'plain'):
|
||||||
|
return True
|
||||||
|
# full email < local part
|
||||||
|
if '@' in self.username and '@' not in other.username:
|
||||||
|
return True
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class SmtpServerConfig(MailServerConfig):
|
||||||
|
|
||||||
|
def __init__(self, hostname, port, tls, username, password):
|
||||||
|
super().__init__('smtp', hostname, port, tls, username, password)
|
||||||
|
self._smtp = None
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
cls = smtplib.SMTP if self.tls != 'SSL' else smtplib.SMTP_SSL
|
||||||
|
self._smtp = cls(self.hostname, self.port)
|
||||||
|
smtp = self._smtp.__enter__()
|
||||||
|
if self.tls == 'STARTTLS':
|
||||||
|
smtp.starttls()
|
||||||
|
smtp.login(self.username, self.password)
|
||||||
|
return smtp
|
||||||
|
|
||||||
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||||
|
ret = self._smtp.__exit__(exc_type, exc_val, exc_tb)
|
||||||
|
self._smtp = None
|
||||||
|
return ret
|
||||||
|
|
||||||
|
def send_message(self, msg):
|
||||||
|
if self._smtp is None:
|
||||||
|
raise RuntimeError('SMTP connection is not established')
|
||||||
|
self._smtp.send_message(msg)
|
||||||
|
|
||||||
|
|
||||||
|
class IncomingServerConfig(MailServerConfig, abc.ABC):
|
||||||
|
|
||||||
|
def __init__(self, proto, hostname, port, tls, username, password):
|
||||||
|
super().__init__(proto, hostname, port, tls, username, password)
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def get_new_messages(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class ImapServerConfig(IncomingServerConfig):
|
||||||
|
|
||||||
|
def __init__(self, hostname, port, tls, username, password):
|
||||||
|
super().__init__('imap', hostname, port, tls, username, password)
|
||||||
|
self._imap = None
|
||||||
|
self._uidnext = None
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
cls = imaplib.IMAP4 if self.tls != 'SSL' else imaplib.IMAP4_SSL
|
||||||
|
self._imap = cls(self.hostname, self.port)
|
||||||
|
imap = self._imap.__enter__()
|
||||||
|
if self.tls == 'STARTTLS':
|
||||||
|
imap.starttls()
|
||||||
|
imap.login(self.username, self.password)
|
||||||
|
imap.select('INBOX', readonly=True)
|
||||||
|
self._uidnext = int(imap.response('UIDNEXT')[1][0].decode())
|
||||||
|
return imap
|
||||||
|
|
||||||
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||||
|
ret = self._imap.__exit__(exc_type, exc_val, exc_tb)
|
||||||
|
self._imap = None
|
||||||
|
self._uidnext = None
|
||||||
|
return ret
|
||||||
|
|
||||||
|
def get_new_messages(self):
|
||||||
|
if self._imap is None or self._uidnext is None:
|
||||||
|
raise RuntimeError('IMAP connection is not established')
|
||||||
|
self._imap.select('INBOX', readonly=True)
|
||||||
|
u = int(self._imap.response('UIDNEXT')[1][0].decode())
|
||||||
|
if u > self._uidnext:
|
||||||
|
messages = self._imap.uid('fetch', f'{self._uidnext}:{u - 1}', '(RFC822)')
|
||||||
|
for message in messages[1]:
|
||||||
|
if not isinstance(message, tuple) or b'RFC822' not in message[0]:
|
||||||
|
# Not an email message
|
||||||
|
continue
|
||||||
|
yield message[1]
|
||||||
|
self._uidnext = u
|
||||||
|
|
||||||
|
|
||||||
|
class Pop3ServerConfig(IncomingServerConfig):
|
||||||
|
|
||||||
|
def __init__(self, hostname, port, tls, username, password):
|
||||||
|
super().__init__('pop3', hostname, port, tls, username, password)
|
||||||
|
self._pop3: poplib.POP3 = None
|
||||||
|
self._messagelist = None
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
cls = poplib.POP3 if self.tls != 'SSL' else poplib.POP3_SSL
|
||||||
|
self._pop3 = cls(self.hostname, self.port)
|
||||||
|
if self.tls == 'STARTTLS':
|
||||||
|
self._pop3.stls()
|
||||||
|
self._pop3.user(self.username)
|
||||||
|
self._pop3.pass_(self.password)
|
||||||
|
self._messagelist = set(self._pop3.list()[1])
|
||||||
|
return self._pop3
|
||||||
|
|
||||||
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||||
|
self._messagelist = None
|
||||||
|
self._pop3.close()
|
||||||
|
self._pop3 = None
|
||||||
|
|
||||||
|
def get_new_messages(self):
|
||||||
|
if self._pop3 is None or self._messagelist is None:
|
||||||
|
raise RuntimeError('POP3 connection is not established')
|
||||||
|
ml = set(self._pop3.list()[1])
|
||||||
|
for msg in ml.difference(self._messagelist):
|
||||||
|
msgid, _ = msg.split(' ', 1)
|
||||||
|
message = '\r\n'.join(self._pop3.retr(msgid)[1])
|
||||||
|
yield message[1]
|
||||||
|
self._messagelist = ml
|
||||||
|
|
||||||
|
|
||||||
|
def default_port(proto: str, tls: str):
|
||||||
|
if proto == 'imap':
|
||||||
|
return 993 if tls == 'SSL' else 143
|
||||||
|
if proto == 'pop3':
|
||||||
|
return 995 if tls == 'SSL' else 110
|
||||||
|
if proto == 'smtp':
|
||||||
|
return 465 if tls == 'SSL' else 587
|
||||||
|
raise ValueError(f'Unknown protocol: {proto}')
|
||||||
|
|
||||||
|
|
||||||
|
def template_username(template: str, address: str, userinputs):
|
||||||
|
user, domain = address.split('@', 1)
|
||||||
|
template = template \
|
||||||
|
.replace('%EMAILADDRESS%', address) \
|
||||||
|
.replace('%EMAILLOCALPART%', user) \
|
||||||
|
.replace('%EMAILDOMAIN%', domain)
|
||||||
|
for key in userinputs:
|
||||||
|
if key not in template:
|
||||||
|
continue
|
||||||
|
label, value = userinputs[key]
|
||||||
|
if value is None:
|
||||||
|
value = getpass(f'Autoconfiguration field "{label}" (will not echo): ')
|
||||||
|
userinputs[key] = label, value
|
||||||
|
template = template.replace(key, value)
|
||||||
|
return template
|
||||||
|
|
||||||
|
|
||||||
|
def parse_xml_mailserver(xmle, address: str, password: str, userinputs):
|
||||||
|
proto = xmle.get('type')
|
||||||
|
hostname = xmle.find('./hostname').text
|
||||||
|
tls = xmle.find('./socketType').text
|
||||||
|
port = int(xmle.find('./port').text or default_port(proto, tls))
|
||||||
|
username = template_username(xmle.find('./username').text, address, userinputs)
|
||||||
|
pw = xmle.find('./password')
|
||||||
|
if pw:
|
||||||
|
pw = template_username(pw.text, address, userinputs)
|
||||||
|
else:
|
||||||
|
pw = password
|
||||||
|
if proto == 'smtp':
|
||||||
|
return SmtpServerConfig(hostname, port, tls, username, pw)
|
||||||
|
elif proto == 'imap':
|
||||||
|
return ImapServerConfig(hostname, port, tls, username, pw)
|
||||||
|
elif proto == 'pop3':
|
||||||
|
return Pop3ServerConfig(hostname, port, tls, username, pw)
|
||||||
|
|
||||||
|
|
||||||
|
def parse_thunderbird_autoconfig(xml: str, address: str, password: str):
|
||||||
|
user, domain = address.split('@', 1)
|
||||||
|
root = ElementTree.fromstring(xml)
|
||||||
|
userinputs = {}
|
||||||
|
for ui in root.findall('.//userinput'):
|
||||||
|
k = ui.get('key')
|
||||||
|
label = ui.get('label')
|
||||||
|
userinputs[k] = (label, None)
|
||||||
|
incoming = root.findall(f"./emailProvider/domain[.='{domain}']/../incomingServer")
|
||||||
|
outgoing = root.findall(f"./emailProvider/domain[.='{domain}']/../outgoingServer")
|
||||||
|
iconf = [parse_xml_mailserver(i, address, password, userinputs) for i in incoming]
|
||||||
|
oconf = [parse_xml_mailserver(o, address, password, userinputs) for o in outgoing]
|
||||||
|
return iconf, oconf
|
||||||
|
|
||||||
|
|
||||||
|
def tb_wellknown_autoconfig(address: str, password: str):
|
||||||
|
user, domain = address.split('@', 1)
|
||||||
|
subdomain = f'autoconfig.{domain}'
|
||||||
|
sdurl = urllib.parse.urlunsplit(('http', subdomain, 'mail/config-v1.1.xml', f'emailaddress={address}', ''))
|
||||||
|
mdurl = urllib.parse.urlunsplit(('http', domain, '.well-known/autoconfig/mail/config-v1.1.xml',
|
||||||
|
f'emailaddress={address}', ''))
|
||||||
|
try:
|
||||||
|
with urllib.request.urlopen(sdurl) as sdresponse:
|
||||||
|
return parse_thunderbird_autoconfig(sdresponse.read().decode(), address, password)
|
||||||
|
except urllib.error.URLError:
|
||||||
|
try:
|
||||||
|
with urllib.request.urlopen(mdurl) as mdresponse:
|
||||||
|
return parse_thunderbird_autoconfig(mdresponse.read().decode(), address, password)
|
||||||
|
except urllib.error.URLError:
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def tb_ispdb_autoconfig(address: str, password: str):
|
||||||
|
user, domain = address.split('@', 1)
|
||||||
|
ispdb = f'https://autoconfig.thunderbird.net/v1.1/{domain}'
|
||||||
|
try:
|
||||||
|
with urllib.request.urlopen(ispdb) as response:
|
||||||
|
return parse_thunderbird_autoconfig(response.read().decode(), address, password)
|
||||||
|
except urllib.error.URLError:
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def manual_config(address: str, password: str):
|
||||||
|
print('Autoconfiguration has failed. Please enter the mail server settings manually.')
|
||||||
|
host = input('SMTP hostname: ')
|
||||||
|
tls = input('TLS ("plain", "SSL" or "STARTLS (default: STARTTLS): ') or 'STARTTLS'
|
||||||
|
p = default_port('smtp', tls)
|
||||||
|
port = int(input(f'SMTP port (default: {p}): ') or str(p))
|
||||||
|
outconfig = SmtpServerConfig(host, port, tls, address, password)
|
||||||
|
proto = input('Mail Retrieval protocol ("imap" or "pop3", default: "imap"): ') or 'imap'
|
||||||
|
host = input(f'{proto.upper()} hostname: ')
|
||||||
|
tls = input('TLS ("plain", "SSL" or "STARTLS (default: SSL): ') or 'SSL'
|
||||||
|
p = default_port(proto, tls)
|
||||||
|
port = int(input(f'{proto.upper()} port (default: {p}): ') or str(p))
|
||||||
|
if proto == 'imap':
|
||||||
|
inconfig = ImapServerConfig(host, port, tls, address, password)
|
||||||
|
else:
|
||||||
|
inconfig = Pop3ServerConfig(host, port, tls, address, password)
|
||||||
|
return [inconfig], [outconfig]
|
||||||
|
|
||||||
|
|
||||||
|
def parse_dns_srv_record(record, service: str, address: str, password: str):
|
||||||
|
user, domain = address.split('@', 1)
|
||||||
|
if service == '_submission':
|
||||||
|
cls = SmtpServerConfig
|
||||||
|
elif service == '_imaps' or service == '_imap':
|
||||||
|
cls = ImapServerConfig
|
||||||
|
elif service == '_pop3s' or service == '_pop3':
|
||||||
|
cls = Pop3ServerConfig
|
||||||
|
else:
|
||||||
|
raise ValueError(service)
|
||||||
|
if service.endswith('s'):
|
||||||
|
tls = ['SSL']
|
||||||
|
else:
|
||||||
|
tls = ['STARTTLS', 'plain']
|
||||||
|
configs = []
|
||||||
|
for t in tls:
|
||||||
|
configs.append(cls(record.target, record.port, t, address, password))
|
||||||
|
configs.append(cls(record.target, record.port, t, user, password))
|
||||||
|
return configs
|
||||||
|
|
||||||
|
|
||||||
|
def rfc6186_autoconfig(address: str, password: str):
|
||||||
|
try:
|
||||||
|
import dns.resolver
|
||||||
|
import dns.rdtypes.IN.SRV
|
||||||
|
except ImportError:
|
||||||
|
print('"dnspython" dependency missing. Skipping RFC 6186 autoconfig')
|
||||||
|
return None
|
||||||
|
user, domain = address.split('@', 1)
|
||||||
|
smtpdomain = f'_submission._tcp.{domain}.'
|
||||||
|
imapsdomain = f'_imaps._tcp.{domain}.'
|
||||||
|
imapdomain = f'_imap._tcp.{domain}.'
|
||||||
|
pop3sdomain = f'_pop3s._tcp.{domain}.'
|
||||||
|
pop3domain = f'_pop3._tcp.{domain}.'
|
||||||
|
osrv = []
|
||||||
|
isrv = []
|
||||||
|
try:
|
||||||
|
smtpanswer: dns.resolver.Answer = dns.resolver.resolve(smtpdomain, dns.rdatatype.SRV)
|
||||||
|
except dns.exception.DNSException:
|
||||||
|
return None
|
||||||
|
for srv in smtpanswer:
|
||||||
|
if srv.rdtype != dns.rdatatype.SRV or srv.rdclass != dns.rdataclass.IN:
|
||||||
|
continue
|
||||||
|
osrv.append((smtpanswer.canonical_name.labels[0].decode(), srv))
|
||||||
|
for sd in [imapsdomain, imapdomain, pop3sdomain, pop3domain]:
|
||||||
|
try:
|
||||||
|
answer: dns.resolver.Answer = dns.resolver.resolve(sd, dns.rdatatype.SRV)
|
||||||
|
for srv in answer:
|
||||||
|
if srv.rdtype != dns.rdatatype.SRV or srv.rdclass != dns.rdataclass.IN:
|
||||||
|
continue
|
||||||
|
if srv.port == 0 and str(srv.target) == '.':
|
||||||
|
continue
|
||||||
|
isrv.append((answer.canonical_name.labels[0].decode(), srv))
|
||||||
|
except dns.exception.DNSException:
|
||||||
|
continue
|
||||||
|
incoming = []
|
||||||
|
outgoing = []
|
||||||
|
for (iqname, irec) in isrv:
|
||||||
|
incoming.extend(parse_dns_srv_record(irec, iqname, address, password))
|
||||||
|
for (oqname, orec) in isrv:
|
||||||
|
outgoing.extend(parse_dns_srv_record(orec, oqname, address, password))
|
||||||
|
return incoming, outgoing
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_confirmation_request(address, fingerprint, encrypted):
|
||||||
|
gpg = subprocess.Popen(['/usr/bin/gpg', '--decrypt'],
|
||||||
|
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||||
|
gpg.stdin.write(encrypted)
|
||||||
|
gpg.stdin.close()
|
||||||
|
gpg.wait()
|
||||||
|
if gpg.returncode != 0:
|
||||||
|
raise RuntimeError(f'gpg subprocess returned with non-zero exit code; stderr: {gpg.stderr.read()}')
|
||||||
|
decrypted = gpg.stdout.read().decode()
|
||||||
|
rdict = {}
|
||||||
|
for line in decrypted.splitlines():
|
||||||
|
if ':' not in line:
|
||||||
|
continue
|
||||||
|
key, value = line.split(':', 1)
|
||||||
|
rdict[key.strip()] = value.strip()
|
||||||
|
if rdict.get('type', '') != 'confirmation-request':
|
||||||
|
raise ValueError('Invalid confirmation request: "type" missing or not "confirmation-request"')
|
||||||
|
if 'sender' not in rdict or 'address' not in rdict or 'fingerprint' not in rdict or 'nonce' not in rdict:
|
||||||
|
raise ValueError('Invalid confirmation request: a mandatory item is missing from the request')
|
||||||
|
if rdict['address'] != address:
|
||||||
|
raise ValueError(f'Confirmation address "{rdict["address"]}" does not match my address "{address}"')
|
||||||
|
if rdict['fingerprint'].replace(' ', '') != fingerprint.replace(' ', ''):
|
||||||
|
raise ValueError(
|
||||||
|
f'Confirmation fingerprint "{rdict["fingerprint"]}" does not match my fingerprint "{fingerprint}"')
|
||||||
|
print(f'Nonce: {rdict["nonce"]}')
|
||||||
|
return rdict['sender'], rdict['nonce']
|
||||||
|
|
||||||
|
|
||||||
|
def _create_submission_request(address: str, fingerprint: str, submission_address: str):
|
||||||
|
gpg = subprocess.Popen([
|
||||||
|
'/usr/bin/gpg', '--locate-keys', '--with-colons', submission_address
|
||||||
|
], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||||
|
gpg.wait()
|
||||||
|
if gpg.returncode != 0:
|
||||||
|
raise RuntimeError(f'gpg subprocess returned with non-zero exit code; stderr: {gpg.stderr.read()}')
|
||||||
|
print('Retrieved submission key')
|
||||||
|
gpg = subprocess.Popen([
|
||||||
|
'/usr/bin/gpg', '--armor',
|
||||||
|
'--export-options', 'export-minimal',
|
||||||
|
'--export', fingerprint
|
||||||
|
], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||||
|
gpg.wait()
|
||||||
|
if gpg.returncode != 0:
|
||||||
|
raise RuntimeError(f'gpg subprocess returned with non-zero exit code; stderr: {gpg.stderr.read()}')
|
||||||
|
print('Retrieved key to publish')
|
||||||
|
pubkey = gpg.stdout.read()
|
||||||
|
gpg = subprocess.Popen([
|
||||||
|
'/usr/bin/gpg', '--armor', '--with-colons',
|
||||||
|
'--encrypt', '--trust-model', 'always',
|
||||||
|
'--local-user', address,
|
||||||
|
'--recipient', address,
|
||||||
|
'--recipient', submission_address
|
||||||
|
], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||||
|
gpg.stdin.write(pubkey)
|
||||||
|
gpg.stdin.close()
|
||||||
|
try:
|
||||||
|
gpg.wait(timeout=2)
|
||||||
|
except subprocess.TimeoutExpired:
|
||||||
|
gpg.kill()
|
||||||
|
print(f'gpg subprocess timed out; stderr: {gpg.stderr.read()}')
|
||||||
|
raise RuntimeError(f'gpg subprocess timed out; stderr: {gpg.stderr.read()}')
|
||||||
|
if gpg.returncode != 0:
|
||||||
|
raise RuntimeError(f'gpg subprocess returned with non-zero exit code; stderr: {gpg.stderr.read()}')
|
||||||
|
print('Created encrypted message')
|
||||||
|
encrypted = gpg.stdout.read().decode()
|
||||||
|
mail = MIMEText(encrypted, _subtype='plain')
|
||||||
|
mail['Subject'] = 'WKS submission request'
|
||||||
|
mail['To'] = submission_address
|
||||||
|
mail['From'] = address
|
||||||
|
mail['Date'] = format_datetime(datetime.utcnow())
|
||||||
|
return mail
|
||||||
|
|
||||||
|
|
||||||
|
def _create_confirmation_response(address: str, submission: str, nonce: str, fp: str, content_subtype: str):
|
||||||
|
response_template = '\r\n'.join([
|
||||||
|
'type: confirmation-response',
|
||||||
|
f'sender: {address}',
|
||||||
|
f'nonce: {nonce}',
|
||||||
|
''
|
||||||
|
])
|
||||||
|
payload = MIMEText(response_template, _subtype='plain')
|
||||||
|
gpg = subprocess.Popen([
|
||||||
|
'/usr/bin/gpg', '--armor', '--with-colons',
|
||||||
|
'--encrypt', '--sign', '--trust-model', 'always',
|
||||||
|
'--local-user', fp,
|
||||||
|
'--recipient', address,
|
||||||
|
'--recipient', submission
|
||||||
|
], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||||
|
gpg.stdin.write(payload.as_string(policy=email.policy.default).encode())
|
||||||
|
gpg.stdin.close()
|
||||||
|
gpg.wait()
|
||||||
|
if gpg.returncode != 0:
|
||||||
|
raise RuntimeError(f'gpg subprocess returned with non-zero exit code; stderr: {gpg.stderr.read()}')
|
||||||
|
encrypted = gpg.stdout.read().decode()
|
||||||
|
mail = MIMEApplication(encrypted, _subtype=content_subtype)
|
||||||
|
mail['Subject'] = 'WKS confirmation response'
|
||||||
|
mail['To'] = submission
|
||||||
|
mail['From'] = address
|
||||||
|
mail['Date'] = format_datetime(datetime.utcnow())
|
||||||
|
return mail
|
||||||
|
|
||||||
|
|
||||||
|
def handle_incoming_message(address, fingerprint, rfc822, smtp_config: SmtpServerConfig):
|
||||||
|
msg: email.message.EmailMessage = BytesParser(policy=email.policy.default).parsebytes(rfc822)
|
||||||
|
if msg.get('wks-phase', '') == 'done':
|
||||||
|
pgp = _get_pgp_message(msg)
|
||||||
|
if pgp is None:
|
||||||
|
print('WKS key submission successful. Congratulations!')
|
||||||
|
return True
|
||||||
|
print('Decrypting WKS response. GnuPG may prompt you for your passphrase.')
|
||||||
|
gpg = subprocess.Popen(['/usr/bin/gpg', '--batch', '--decrypt', '--skip-verify'],
|
||||||
|
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||||
|
gpg.stdin.write(pgp)
|
||||||
|
gpg.stdin.close()
|
||||||
|
gpg.wait()
|
||||||
|
if gpg.returncode != 0:
|
||||||
|
print('It seems WKS submission was successful, however decryption of the submission response failed: ' +
|
||||||
|
gpg.stderr.read().decode())
|
||||||
|
return True
|
||||||
|
decrypted = email.parser.BytesParser(policy=email.policy.default).parsebytes(gpg.stdout.read())
|
||||||
|
body = decrypted.get_body(preferencelist=('plain',))
|
||||||
|
if body is None:
|
||||||
|
print('WKS key submission successful. Congratulations!')
|
||||||
|
else:
|
||||||
|
print(body.get_content())
|
||||||
|
return True
|
||||||
|
if msg.get('wks-phase', '') == 'error':
|
||||||
|
body = msg.get_body(preferencelist=('plain',))
|
||||||
|
if body is None:
|
||||||
|
print('WKS key submission failed. However, the WKS server did not return an error description.')
|
||||||
|
else:
|
||||||
|
print(body.get_content())
|
||||||
|
return True
|
||||||
|
for leaf in msg.walk():
|
||||||
|
if leaf.get_content_type() not in ['application/vnd.gnupg.wkd', 'application/vnd.gnupg.wks']:
|
||||||
|
continue
|
||||||
|
print('Received confirmation request')
|
||||||
|
try:
|
||||||
|
submission, nonce = _parse_confirmation_request(address, fingerprint, leaf.get_content())
|
||||||
|
except BaseException as e:
|
||||||
|
print(f'Parsing failed: {e}')
|
||||||
|
continue
|
||||||
|
print('Creating confirmation response. GnuPG may prompt you for your passphrase.')
|
||||||
|
response = _create_confirmation_response(address, submission, nonce, fingerprint, leaf.get_content_subtype())
|
||||||
|
print('Sending confirmation response')
|
||||||
|
with smtp_config:
|
||||||
|
smtp_config.send_message(response)
|
||||||
|
print('Awaiting publish response')
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def _gpg_get_uid_fp(address: str):
|
||||||
|
gpg = subprocess.Popen([
|
||||||
|
'/usr/bin/gpg', '--with-colons', '--list-keys', address
|
||||||
|
], stdout=subprocess.PIPE)
|
||||||
|
gpg.wait()
|
||||||
|
if gpg.returncode != 0:
|
||||||
|
raise RuntimeError(f'gpg subprocess returned with non-zero exit code; stderr: {gpg.stderr.read()}')
|
||||||
|
keylist = gpg.stdout.read().decode()
|
||||||
|
pubs = []
|
||||||
|
fprs = []
|
||||||
|
for line in keylist.splitlines():
|
||||||
|
if line.startswith('pub:'):
|
||||||
|
pub = line.split(':')[4]
|
||||||
|
pubs.append(pub)
|
||||||
|
elif line.startswith('fpr:'):
|
||||||
|
fpr = line.split(':')[9]
|
||||||
|
fprs.append(fpr)
|
||||||
|
if len(pubs) == 0:
|
||||||
|
raise ValueError(f'No key found for {address}.')
|
||||||
|
elif len(pubs) > 1:
|
||||||
|
print(f'Found multiple keys for {address}, please choose:')
|
||||||
|
for i, pub in enumerate(pubs, start=1):
|
||||||
|
print(f'{i}: {pub}')
|
||||||
|
i = int(input('Enter number: ')) - 1
|
||||||
|
else:
|
||||||
|
i = 0
|
||||||
|
pub = pubs[i]
|
||||||
|
fpr = next(filter(lambda x: x.endswith(pub), fprs))
|
||||||
|
return fpr
|
||||||
|
|
||||||
|
|
||||||
|
def _get_submission_address(address: str):
|
||||||
|
_, domain = address.split('@', 1)
|
||||||
|
advanced_url = f'https://openpgpkey.{domain}/.well-known/openpgpkey/{domain}/submission-address'
|
||||||
|
direct_url = f'https://{domain}/.well-known/openpgpkey/submission-address'
|
||||||
|
try:
|
||||||
|
with urllib.request.urlopen(advanced_url) as response:
|
||||||
|
return response.read().decode().strip()
|
||||||
|
except urllib.error.URLError:
|
||||||
|
pass
|
||||||
|
with urllib.request.urlopen(direct_url) as response:
|
||||||
|
return response.read().decode().strip()
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
ad = input('Enter email: ')
|
||||||
|
sa = None
|
||||||
|
try:
|
||||||
|
sa = _get_submission_address(ad)
|
||||||
|
except urllib.error.URLError:
|
||||||
|
print('No WKS submission address found. Does your provider support WKS?')
|
||||||
|
exit(1)
|
||||||
|
fp = _gpg_get_uid_fp(ad)
|
||||||
|
print(f'Chose {fp}')
|
||||||
|
pw = getpass('Enter IMAP/POP3/SMTP password (will not echo): ')
|
||||||
|
for fn in [tb_wellknown_autoconfig, rfc6186_autoconfig, tb_ispdb_autoconfig, manual_config]:
|
||||||
|
autoconf = fn(ad, pw)
|
||||||
|
if autoconf is not None:
|
||||||
|
break
|
||||||
|
if autoconf is None:
|
||||||
|
raise RuntimeError('No autoconfig available')
|
||||||
|
incoming, outgoing = autoconf
|
||||||
|
# Find the first working server configurations
|
||||||
|
incoming_server = None
|
||||||
|
outgoing_server = None
|
||||||
|
for i in sorted(incoming):
|
||||||
|
try:
|
||||||
|
with i:
|
||||||
|
incoming_server = i
|
||||||
|
except:
|
||||||
|
continue
|
||||||
|
if incoming_server is None:
|
||||||
|
raise RuntimeError('No working IMAP/POP3 server found through autoconfiguration. Please specify manually.')
|
||||||
|
print(f'Autoconfigured incoming server: {incoming_server}')
|
||||||
|
for i in sorted(outgoing):
|
||||||
|
try:
|
||||||
|
with i:
|
||||||
|
outgoing_server = i
|
||||||
|
except:
|
||||||
|
continue
|
||||||
|
if outgoing_server is None:
|
||||||
|
raise RuntimeError('No working SMTP server found through autoconfiguration. Please specify manually.')
|
||||||
|
print(f'Autoconfigured outgoing server: {outgoing_server}')
|
||||||
|
with incoming_server:
|
||||||
|
now = datetime.utcnow()
|
||||||
|
done = False
|
||||||
|
request = _create_submission_request(ad, fp, sa)
|
||||||
|
print('Sending submission request')
|
||||||
|
with outgoing_server:
|
||||||
|
outgoing_server.send_message(request)
|
||||||
|
print('Awaiting response')
|
||||||
|
while not done and (datetime.utcnow() - now).total_seconds() < 300:
|
||||||
|
time.sleep(5)
|
||||||
|
for message in incoming_server.get_new_messages():
|
||||||
|
done = handle_incoming_message(ad, fp, message, outgoing_server)
|
||||||
|
if done:
|
||||||
|
break
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
main()
|
Loading…
Reference in a new issue