Initial commit

This commit is contained in:
s3lph 2021-09-26 08:40:02 +02:00
commit ac660802a4
14 changed files with 995 additions and 0 deletions

8
.gitignore vendored Normal file
View file

@ -0,0 +1,8 @@
**/.idea/
*.iml
**/__pycache__/
*.pyc
**/*.egg-info/
*.coverage
**/.mypy_cache/

16
LICENSE Normal file
View file

@ -0,0 +1,16 @@
Copyright 2021 s3lph
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and
associated documentation files (the "Software"), to deal in the Software without restriction,
including without limitation the rights to use, copy, modify, merge, publish, distribute,
sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or
substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT
NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT
OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

169
README.md Normal file
View file

@ -0,0 +1,169 @@
# EasyWKS
### OpenPGP WKS for Human Beings
---
This is a work-in-progress project. See ROADMAP.md for details
## What is WKD/WKS?
Due to all the issues involved with the PGP key servers we're using today, GnuPG introduced a feature named [**Web Key
Discovery**][wkd] (WKD): Instead of searching for keys on the usual key servers, WKD is taking a **fully**
decentralized and federated approach, where each mail domain is responsible for hosting its users public keys on an
HTTPS web directory. For example, in order to retrieve the key for `john.doe@example.org`, they key can be located at
https://openpgpkey.example.org/.well-known/openpgpkey/example.org/hu/iy9q119eutrkn8s1mk4r39qejnbu3n5q?l=john.doe
In order to get the keys there, GnuPG developed an email-based protocol named [**Web Key Service**][wks] (WKS), which
lets users publish their public keys once they have proven ownership of the key using an email-based challenge-responce
mechanism.
At the time, WKS and WKD aren't yet part of the OpenPGP standard, but there's an [IETF Draft RFC][ietf] that's being
updated from time to time.
## Why EasyWKS?
I've experienced the WKS standard to be extremely cumbersome to conform to. The tools shipped with GnuPG,
`gpg-wks-server` and `gpg-wks-client` are usable well enough as long as you have shell access on a mailserver and pipe
the output of `gpg-wks-client` into `sendmail`. However, I'm usually doing my email via SMTP using a variety of
clients.
No matter which client I tried, I did not manage to get them to send mails in the format required by `gpg-wks-server`.
Even pasting the output of `gpg-wks-client` into `openssl s_client` after manually performing SMTP auth proved to be
difficult in case any line of the PGP-encrypted message happened to start with `Q` or `R`.
So I decided to write a WKS server that's much more lenient regarding the exact format of the mails it receives.
Instead of enforcing the strict format mandated by the standard, EasyWKS only requires:
- For the initial submission request: An unsigned & encrypted PGP/MIME message anywhere in the MIME tree, and the
ASCII-armored PGP-Key anywhere inside the encrypted message.
- For the confirmation response: A signed & encrypted PGP/MIME message anywhere in the MIME tree, and the
confirmation response anywhere inside the encrypted message.
This makes EasyWKS usable with every mail client, no matter whether WKS support is built-in or not.
EasyWKS aims to be a drop-in replacement for gpg-wks-server; see "Bootstrapping" below to learn how to migrate from
gpg-wks-server to EasyWKS.
## Requirements
- Python 3.6 or newer
- PyYAML
- bottle.py
- PGPy
## License
MIT
## Setup
### Installation
TODO
Create a cronjob, e.g. in `/etc/cron.d/easywks`
```crontab
0 3 * * * webkey /path/to/easywks clean
```
### Configuration
Configuration is done in `/etc/easywks.yml` (or any other place as specified by `--config`)
```yaml
---
# EasyWKS works inside this directory. Its PGP keys as well
# as all the submitted and published keys are stored here.
directory: /var/lib/easywks
# Number of seconds after which a pending submission request
# is considered stale and should be removed by easywks clean.
pending_lifetime: 604800
# Port configuration for the webserver. Put this behind a
# HTTPS-terminating reverse proxy!
host: 127.0.0.1
port: 8080
# Every domain served by EasyWKS must be listed here
domains:
example.org:
# Users send their requests to this address. It's up to
# you to make sure that the mails sent their get handed
# to EasyWKS.
submission_address: webkey@example.org
# If you want the PGP key for this domain to be
# password-protected, or if you're supplying your own
# password-protected key, set the passphrase here:
passphrase: "Correct Horse Battery Staple"
# Defaults are gpgwks@<domain> and no password protection.
example.com: {}
```
### Bootstrapping
Run `easywks init` (as the correct user) to initialize all files and directories. This will also generate a PGP key for
each domain, stored in `<workdir>/<domain>/key.pgp`. If you want to, you can replace this key by a key you generated
yourself. If your key is password-protected, you have to supply the passphrase in the config file or remove the
password protection.
If you are migrating from gpg-wks-server to EasyWKS, you can point EasyWKS to gpg-wks-server's working directory
(usually `/var/lib/gnupg/wks`). EasyWKS uses the same directory layout and file formats as gpg-wks-server, so it should
be able to take over where gpg-wks-server stopped. The only thing you need to do is to export the private keys from the
GnuPG keyring and write them to their domain's `key.pgp`.
### Webserver Setup
There are generally two ways to get WKD working:
- The gpg-wks-server approach, i.e. regularly copying the `hu` directories from EasyWKS' working directory to the
webroot.
- Running the EasyWKS web server behind a Reverse Proxy, e.g. using the follwing systemd unit:
```unit file (systemd)
[Unit]
Description=OpenPGP WKS for Human Beings
[Service]
Type=simple
ExecStart=/path/to/easywks webserver
Restart=on-failure
User=webkey
Group=webkey
WorkingDirectory=/var/lib/easywks
[Install]
WantedBy=multi-user.target
```
Once this service is up and running, you can configure your webserver to proxy requests to EasyWKS. E.g. with Apache2:
```
ProxyPass /.well-known/openpgpkey/ http://127.0.0.1:8080/.well-known/openpgpkey/
ProxyPassReverse /.well-known/openpgpkey/ http://127.0.0.1:8080/.well-known/openpgpkey/
```
Note that the webserver built into EasyWKS validates that the `?l=<uid>` matches the uid hash (the last URL component).
### MTA Setup
#### Postfix
Create a small wrapper script, e.g. `/usr/local/bin/postfix-easywks`:
```shell
#!/bin/bash
/path/to/easywks process | /usr/sbin/sendmail -t
```
Add an entry in `/etc/postfix/master.cf`:
```
webkey unix - n n - - pipe
flags=DRhu user=webkey argv=/usr/local/bin/postfix-easywks
```
[wkd]: https://wiki.gnupg.org/WKD
[wks]: https://wiki.gnupg.org/WKS
[ietf]: https://datatracker.ietf.org/doc/html/draft-koch-openpgp-webkey-service-12

6
ROADMAP.md Normal file
View file

@ -0,0 +1,6 @@
# EasyWKS Roadmap
- [ ] Don't rely on sendmail to send out responses. Use e.g. `smtplib`.
- [ ] Get rid of postfix-easywks wrapper script.
- [ ] LMTP server mode, e.g. using the `smtpd` module.
- [ ] Figure out whether file locking in the working directory is necessary to avoid races.

2
easywks/__init__.py Normal file
View file

@ -0,0 +1,2 @@
__version__ = '0.1'

91
easywks/config.py Normal file
View file

@ -0,0 +1,91 @@
import string
import yaml
def _validate_policy_flags(value):
alphabet = string.ascii_lowercase + string.digits + '-._'
if not isinstance(value, dict):
return 'policy_flags must be a map'
for flag, v in value.items():
if not isinstance(flag, str) or len(flag) == 0:
return 'policy_flags has non-string or empty members'
if flag[0] not in string.ascii_lowercase:
return 'policy_flags must start with a lowercase letter'
for c in flag:
if c not in alphabet:
return f'policy_flags has invalid key {flag}'
class _ConfigOption:
def __init__(self, key, typ, default, validator=None):
self._key = key
self._typ = typ
self._default = default
self._validator = validator
self._value = default
@property
def value(self):
return self._value
def load(self, conf: dict):
self._value = conf.get(self._key, self._default)
if not isinstance(self._value, self._typ):
raise TypeError(f'Option {self._key} must be {self._typ}, got {type(self._value)}')
if self._validator:
response = self._validator(self._value)
if response:
raise ValueError(f'Invalid value for option {self._key}: {response}')
class _Config:
def __init__(self, **kwargs):
self._options = kwargs
def __getattr__(self, item):
if item in self._options:
return self._options[item].value
raise AttributeError()
class _GlobalConfig(_Config):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.__domains = {}
def __make_domain(self, domain):
self.__domains[domain] = _Config(
submission_address=_ConfigOption('address', str, f'gpgwks@{domain}'),
passphrase=_ConfigOption('passphrase', str, ''),
policy_flags=_ConfigOption('policy_flags', dict, {}, validator=_validate_policy_flags)
)
def __getitem__(self, item):
return self.__domains.get(item)
@property
def domains(self) -> list:
return list(self.__domains.keys())
def load_config(self, path: str = '/etc/easywks.yml'):
with open(path, 'r') as f:
conf = yaml.safe_load(f)
for co in self._options.values():
co.load(conf)
self.__domains = {}
for domain, dconf in conf['domains'].items():
self.__make_domain(domain)
for co in self.__domains[domain]._options.values():
co.load(conf)
Config = _GlobalConfig(
working_directory=_ConfigOption('directory', str, '/var/lib/easywks'),
host=_ConfigOption('host', str, '127.0.0.1'),
port=_ConfigOption('port', int, 8080),
pending_lifetime=_ConfigOption('pending_lifetime', int, 604800)
)

69
easywks/crypto.py Normal file
View file

@ -0,0 +1,69 @@
import os
import stat
from pgpy import PGPKey, PGPUID, PGPMessage, PGPSignature
from pgpy.constants import PubKeyAlgorithm, KeyFlags, SymmetricKeyAlgorithm, HashAlgorithm, CompressionAlgorithm
from .config import Config
def _keyfile(domain: str) -> str:
return os.path.join(Config.working_directory, domain, 'key.pgp')
class PrivateKey:
def __init__(self, domain: str):
key, _ = PGPKey.from_file(_keyfile(domain))
self.key: PGPKey = key
self.passphrase: str = Config[domain].passphrase
self.context = None
def __enter__(self):
if self.key.is_protected:
self.context = self.key.unlock(self.passphrase)
self.context.__enter__()
return self.key
def __exit__(self, exc_type, exc_val, exc_tb):
if self.context:
self.context.__exit__(exc_type, exc_val, exc_tb)
self.context = None
def create_pgp_key(domain: str):
keyfile = _keyfile(domain)
if os.path.exists(keyfile):
return
# Generate RSA key and UID
key = PGPKey.new(PubKeyAlgorithm.RSAEncryptOrSign, 3072)
subkey = PGPKey.new(PubKeyAlgorithm.RSAEncryptOrSign, 3072)
uid = PGPUID.new(pn='', comment='', email=Config[domain].submission_address)
key.add_uid(uid, selfsign=True, primary=True, key_expiration=None,
usage={KeyFlags.Sign, KeyFlags.Certify},
hashes=[HashAlgorithm.SHA512, HashAlgorithm.SHA256],
ciphers=[SymmetricKeyAlgorithm.AES256, SymmetricKeyAlgorithm.Camellia256],
compression=[CompressionAlgorithm.BZ2, CompressionAlgorithm.Uncompressed])
key.add_subkey(subkey, usage={KeyFlags.EncryptCommunications})
passphrase = Config[domain].passphrase
if passphrase is not None and len(passphrase) > 0:
key.protect(passphrase, SymmetricKeyAlgorithm.AES256, HashAlgorithm.SHA256)
with open(keyfile, 'wb') as f:
os.chmod(keyfile, stat.S_IRUSR | stat.S_IWUSR)
f.write(bytes(key))
def privkey_to_pubkey(domain: str) -> PGPKey:
keyfile = _keyfile(domain)
key, _ = PGPKey.from_file(keyfile)
return key.pubkey
def pgp_decrypt(domain: str, message: PGPMessage) -> PGPMessage:
with PrivateKey(domain) as key:
return key.decrypt(message)
def pgp_sign(domain: str, message: PGPMessage) -> PGPSignature:
with PrivateKey(domain) as key:
return key.sign(message)

90
easywks/files.py Normal file
View file

@ -0,0 +1,90 @@
import os
import stat
from datetime import datetime, timedelta
from pgpy import PGPKey
from .config import Config
from .crypto import create_pgp_key, privkey_to_pubkey
from .util import hash_user_id
def make_submission_address_file(domain: str):
return Config[domain].submission_address + '\n'
def make_policy_file(domain: str):
content = f'submission-address: {Config[domain].submission_address}\n'
for flag, value in Config[domain].policy_flags.items():
if not value or len(value) == 0:
content += f'{flag}: {value}\n'
else:
content += flag + '\n'
return content
def init_working_directory():
wdir = Config.working_directory
os.makedirs(wdir, exist_ok=True)
os.chmod(wdir, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
for domain in Config.domains:
# Create necessary files and directories
os.makedirs(os.path.join(wdir, domain, 'hu'), exist_ok=True)
os.makedirs(os.path.join(wdir, domain, 'pending'), exist_ok=True)
with open(os.path.join(wdir, domain, 'submission-address'), 'w') as saf:
saf.write(make_submission_address_file(domain))
with open(os.path.join(wdir, domain, 'policy'), 'w') as polf:
polf.write(make_policy_file(domain))
# Create PGP key if it doesn't exist yet
create_pgp_key(domain)
# Export submission key to hu dir
key = privkey_to_pubkey(domain)
uid = hash_user_id(Config[domain].submission_address)
with open(os.path.join(wdir, domain, 'hu', uid), 'wb') as hu:
hu.write(bytes(key))
def read_public_key(domain, user):
hu = hash_user_id(user)
keyfile = os.path.join(Config.working_directory, domain, 'hu', hu)
key, _ = PGPKey.from_file(keyfile)
return key
def write_public_key(domain, user, key):
hu = hash_user_id(user)
keyfile = os.path.join(Config.working_directory, domain, 'hu', hu)
with open(keyfile, 'wb') as f:
f.write(bytes(key))
def read_pending_key(domain, nonce):
keyfile = os.path.join(Config.working_directory, domain, 'pending', nonce)
key, _ = PGPKey.from_file(keyfile)
return key
def write_pending_key(domain, nonce, key):
keyfile = os.path.join(Config.working_directory, domain, 'pending', nonce)
with open(keyfile, 'w') as f:
f.write(str(key))
def remove_pending_key(domain, nonce):
keyfile = os.path.join(Config.working_directory, domain, 'pending', nonce)
os.unlink(keyfile)
def clean_stale_requests():
stale = (datetime.utcnow() - timedelta(seconds=Config.pending_lifetime)).timestamp()
for domain in Config.domains:
pending_dir = os.path.join(Config.working_directory, domain, 'pending')
for file in os.listdir(pending_dir):
try:
absfile = os.path.join(pending_dir, file)
if os.stat(absfile).st_mtime < stale:
os.unlink(absfile)
except BaseException as e:
print(e)
continue

43
easywks/main.py Normal file
View file

@ -0,0 +1,43 @@
from .config import Config
from .files import init_working_directory, clean_stale_requests
from .process import process_mail
from .server import run_server
import sys
import argparse
def parse_arguments():
ap = argparse.ArgumentParser(description='OpenPGP WKS for Human Beings')
ap.add_argument('--config', '-c', metavar='/path/to/config.yml', type=str, nargs=1, default='/etc/easywks.yaml')
sp = ap.add_subparsers(description='EasyWKS understands the following commands:', required=True)
init = sp.add_parser('init', help='Initialize the EasyWKS working directory and generate the PGP Key'
'Also called automatically by the other commands.')
init.set_defaults(fn=None)
clean = sp.add_parser('clean', help='Clean up stale pending requests. Call this in a cronjob.')
clean.set_defaults(fn=clean_stale_requests)
process = sp.add_parser('process', help='Read an incoming mail from stdin and write the response to stdout. '
'Hook this up to your MTA.')
process.set_defaults(fn=process_mail)
server = sp.add_parser('webserver', help='Run a WKD web server. Put this behind a HTTPS-terminating reverse proxy.')
server.set_defaults(fn=run_server)
return ap.parse_args(sys.argv[1:])
def main():
args = parse_arguments()
Config.load_config(args.config[0])
init_working_directory()
if args.fn:
args.fn()
if __name__ == '__main__':
main()

154
easywks/process.py Normal file
View file

@ -0,0 +1,154 @@
import sys
from typing import List, Dict
from .crypto import pgp_decrypt
from .util import xloop_header
from .config import Config
from .files import read_pending_key, write_public_key, remove_pending_key, write_pending_key
from .types import SubmissionRequest, ConfirmationResponse, PublishResponse, ConfirmationRequest
from email.message import MIMEPart, Message
from email.parser import BytesParser
from email.policy import default, SMTP
from email.utils import getaddresses
from pgpy import PGPMessage, PGPKey, PGPUID
from pgpy.errors import PGPError
def _get_mime_leafs(msg: Message) -> List[MIMEPart]:
stack = [msg]
leafs = []
while len(stack) > 0:
node = stack.pop()
if node.is_multipart():
stack.extend(node.get_payload())
else:
leafs.append(node)
return leafs
def _get_pgp_message(parts: List[MIMEPart]) -> PGPMessage:
pgp = None
for part in parts:
try:
p: PGPMessage = PGPMessage.from_blob(part.get_content())
except ValueError:
continue
if p.is_encrypted:
if pgp is not None:
raise ValueError('More than one encrypted message part')
pgp = p
if pgp is None:
raise ValueError('No encrypted message part')
return pgp
def _get_pgp_publickey(parts: List[MIMEPart]) -> PGPKey:
pubkey = None
for part in parts:
try:
key, _ = PGPKey.from_blob(part.get_content())
if key.is_public:
if pubkey:
raise ValueError('More than one key in message')
pubkey = key
except PGPError:
pass
if not pubkey:
raise ValueError('No pubkey in message')
return pubkey
def _parse_submission_request(pgp: PGPMessage, submission: str, sender: str):
payload = BytesParser(policy=default).parsebytes(pgp.message)
leafs = _get_mime_leafs(payload)
pubkey = _get_pgp_publickey(leafs)
sender_uid: PGPUID = pubkey.get_uid(sender)
if sender_uid is None or sender_uid.email != sender:
raise ValueError(f'Key has no UID that matches {sender}')
return SubmissionRequest(sender, submission, pubkey)
def _find_confirmation_response(parts: List[MIMEPart]) -> str:
response = None
for part in parts:
if part.get('type', '') == 'confirmation-response':
# the message wasn't a MIME message; return content as-is
return str(part)
c = part.get_content()
if isinstance(c, bytes):
try:
c = c.decode()
except UnicodeDecodeError:
# obviously not our part
continue
if not isinstance(c, str):
# not our part either
continue
if 'confirmation-response' in c:
response = c
continue
if not response:
raise ValueError('No confirmation response found in message')
return response
def _parse_confirmation_response(pgp: PGPMessage, submission: str, sender: str):
payload = BytesParser(policy=default).parsebytes(pgp.message)
parts = _get_mime_leafs(payload)
response = _find_confirmation_response(parts)
rdict: Dict[str, str] = {}
for line in response.splitlines():
if ':' not in line:
continue
key, value = line.split(':', 1)
rdict[key.strip()] = value.strip()
if 'sender' not in rdict or 'nonce' not in rdict or rdict.get('type', '') != 'confirmation-response':
raise ValueError('Message is not a valid confirmation response')
if rdict['sender'] != sender:
raise ValueError('Confirmation sender does not match message sender')
return ConfirmationResponse(rdict['sender'], submission, rdict['nonce'], pgp)
def process_mail(mail: bytes = None):
if not mail:
mail = sys.stdin.read().encode()
msg: Message = BytesParser(policy=default).parsebytes(mail)
_, sender_mail = getaddresses([msg['from']])[0]
local, sender_domain = sender_mail.split('@')
if sender_domain not in Config.domains:
raise KeyError(f'Domain {sender_domain} not supported')
if msg.get('x-loop', '') == xloop_header(sender_domain) or 'auto-submitted' in msg:
# Mail has somehow looped back to us, discard
return
submission_address: str = Config[sender_domain].submission_address
rcpt = getaddresses(msg.get_all('to', []) + msg.get_all('cc', []))
if len(rcpt) != 1:
raise ValueError('Message has more than one recipients')
_, rcpt_mail = rcpt[0]
if rcpt_mail != submission_address:
raise ValueError(f'Message not addressed to submission address {submission_address} for domain {sender_domain}')
leafs = _get_mime_leafs(msg)
pgp: PGPMessage = _get_pgp_message(leafs)
decrypted = pgp_decrypt(sender_domain, pgp)
if decrypted.is_signed:
request: ConfirmationResponse = _parse_confirmation_response(decrypted, submission_address, sender_mail)
try:
key = read_pending_key(sender_domain, request.nonce)
except FileNotFoundError:
# silently ignore non-existing requests
return
# this throws an error if signature verification fails
response: PublishResponse = request.verify_signature(key)
rmsg = response.create_signed_message().as_string(policy=SMTP)
write_public_key(sender_domain, sender_mail, key)
remove_pending_key(sender_domain, request.nonce)
print(rmsg)
else:
request: SubmissionRequest = _parse_submission_request(decrypted, submission_address, sender_mail)
response: ConfirmationRequest = request.confirmation_request()
rmsg = response.create_signed_message().as_string(policy=SMTP)
write_pending_key(sender_domain, response.nonce, request.key)
print(rmsg)

46
easywks/server.py Normal file
View file

@ -0,0 +1,46 @@
from .config import Config
from .files import read_public_key, make_submission_address_file, make_policy_file
from .util import hash_user_id
from bottle import get, run, abort, response, request
@get('/.well-known/openpgpkey/<domain>/submission-address')
def submission_address(domain: str):
if domain not in Config.domains:
abort(404, 'Not Found')
response.add_header('Content-Type', 'text/plain')
return make_submission_address_file(domain)
@get('/.well-known/openpgpkey/<domain>/policy')
def policy(domain: str):
if domain not in Config.domains:
abort(404, 'Not Found')
response.add_header('Content-Type', 'text/plain')
return make_policy_file(domain)
@get('/.well-known/openpgpkey/<domain>/hu/<userhash>')
def hu(domain: str, userhash: str):
if domain not in Config.domains:
abort(404, 'Not Found')
userid = request.query.l
print(userid, userhash, hash_user_id(userid))
if not userid or hash_user_id(userid) != userhash:
abort(404, 'Not Found')
try:
pubkey = read_public_key(domain, userid)
response.add_header('Content-Type', 'application/octet-stream')
return bytes(pubkey)
except FileNotFoundError:
abort(404, 'Not Found')
def run_server():
run(host=Config.host, port=Config.port)
if __name__ == '__main__':
run_server()

231
easywks/types.py Normal file
View file

@ -0,0 +1,231 @@
from datetime import datetime
from email.policy import default
from email.utils import format_datetime
from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication
from email.mime.text import MIMEText
from pgpy import PGPKey, PGPMessage, PGPUID
from pgpy.types import SignatureVerification
from .crypto import pgp_sign
from .util import create_nonce, fingerprint, xloop_header
class SubmissionRequest:
def __init__(self, submitter_addr: str, submission_addr: str, key: PGPKey):
self._submitter_addr = submitter_addr
self._submission_addr = submission_addr
self._key = key
def confirmation_request(self) -> 'ConfirmationRequest':
return ConfirmationRequest(self._submitter_addr, self. _submission_addr, self._key)
@property
def submitter_address(self):
return self._submitter_addr
@property
def submission_address(self):
return self._submission_addr
@property
def key(self):
return self._key
class ConfirmationRequest:
MAIL_TEXT = '''Hi there!
This is the EasyWKS system at {domain}.
You appear to have submitted your key for publication in the Web Key
Directory. There's one more step you need to complete. If you did not
request this, you can simply ignore this message.
If your email client doesn't automatically complete this challenge, you
can perform this step manually: Please verify that you can decrypt the
second part of this message and that the fingerprint listed in the
encrypted part matches your key. If everything looks ok, please reply
to this message with an **encrypted and signed PGP/MIME message** with
the following content (without the <> brackets)
type: confirmation-response
sender: <your email address>
nonce: <copy the nonce from the encrypted part of this message>
For more information on WKD and WKS see:
https://gnupg.org/faq/wkd.html
https://gnupg.org/faq/wks.html
Regards
EasyWKS
--
Dance like nobody is watching.
Encrypt live everybody is.
'''
def __init__(self, submitter_addr: str, submission_addr: str, key: PGPKey, nonce: str = None):
self._domain = submitter_addr.split('@')[1]
self._submitter_addr = submitter_addr
self._submission_addr = submission_addr
self._key = key
self._nonce = nonce or create_nonce()
@property
def domain(self):
return self._domain
@property
def submitter_address(self):
return self._submitter_addr
@property
def submission_address(self):
return self._submission_addr
@property
def key(self):
return self._key
@property
def nonce(self):
return self._nonce
def create_signed_message(self):
mpplain = MIMEText(ConfirmationRequest.MAIL_TEXT.format(domain=self.domain), _subtype='plain')
ps = '\r\n'.join([
'type: confirmation-request',
f'sender: {self._submission_addr}',
f'address: {self._submitter_addr}',
f'fingerprint: {fingerprint(self._key)}',
f'nonce: {self._nonce}',
''
])
payload = MIMEText(ps, _subtype='plain')
to_encrypt = PGPMessage.new(payload.as_string(policy=default))
encrypted = self._key.encrypt(to_encrypt)
mpenc = MIMEApplication(str(encrypted), _subtype='vnd.gnupg.wks')
mixed = MIMEMultipart(_subtype='mixed', _subparts=[mpplain, mpenc])
to_sign = PGPMessage.new(mixed.as_string(policy=default))
sig = pgp_sign(self.domain, to_sign)
mpsig = MIMEApplication(str(sig), _subtype='pgp-signature')
email = MIMEMultipart(_subtype='signed', _subparts=[mixed, mpsig], policy=default,
protocol='application/pgp-signature')
email['Subject'] = 'Confirm your key publication'
email['To'] = self._submitter_addr
email['From'] = self._submission_addr
email['Date'] = format_datetime(datetime.utcnow())
email['Wks-Draft-Version'] = '3'
email['Wks-Phase'] = 'confirm'
email['X-Loop'] = xloop_header(self.domain)
email['Auto-Submitted'] = 'auto-replied'
return email
class ConfirmationResponse:
def __init__(self, submitter_addr: str, submission_addr: str, nonce: str, msg: PGPMessage):
self._domain = submitter_addr.split('@')[1]
self._submitter_addr = submitter_addr
self._submission_addr = submission_addr
self._nonce = nonce
self._msg = msg
@property
def domain(self):
return self._domain
@property
def submitter_address(self):
return self._submitter_addr
@property
def submission_address(self):
return self._submission_addr
@property
def nonce(self):
return self._nonce
def verify_signature(self, key: PGPKey) -> 'PublishResponse':
uid: PGPUID = key.get_uid(self._submitter_addr)
if uid is None or uid.email != self._submitter_addr:
raise ValueError(f'UID {self._submitter_addr} not found in PGP key')
verification: SignatureVerification = key.verify(self._msg)
for verified, by, sig, subject in verification.good_signatures:
if fingerprint(key) == fingerprint(by):
return PublishResponse(self._submitter_addr, self._submission_addr, key)
raise ValueError('Signature could not be verified')
class PublishResponse:
MAIL_TEXT = '''Hi there!
This is the EasyWKS system at {domain}.
Your key has been published to the Web Key Directory.
You can test WKD key retrieval e.g. with:
gpg --auto-key-locate=wkd,nodefault --locate-key {uid}
For more information on WKD and WKS see:
https://gnupg.org/faq/wkd.html
https://gnupg.org/faq/wks.html
Regards
EasyWKS
--
Dance like nobody is watching.
Encrypt live everybody is.
'''
def __init__(self, submitter_addr: str, submission_addr: str, key: PGPKey):
self._domain = submitter_addr.split('@')[1]
self._submitter_addr = submitter_addr
self._submission_addr = submission_addr
self._key = key
@property
def submitter_address(self):
return self._submitter_addr
@property
def submission_address(self):
return self._submission_addr
@property
def key(self):
return self._key
@property
def domain(self):
return self._domain
def create_signed_message(self):
mpplain = MIMEText(ConfirmationRequest.MAIL_TEXT.format(domain=self.domain, uid=self.submitter_address),
_subtype='plain')
to_encrypt = PGPMessage.new(mpplain.as_string(policy=default))
encrypted: PGPMessage = self.key.encrypt(to_encrypt)
encrypted |= pgp_sign(self.domain, encrypted)
payload = MIMEApplication(str(encrypted), _subtype='octet-stream')
mpenc = MIMEApplication(payload.as_string(policy=default), _subtype='pgp-encrypted')
email = MIMEMultipart(_subtype='encrypted', _subparts=[mpenc], policy=default,
protocol='application/pgp-encrypted')
email['Subject'] = 'Your key has been published'
email['To'] = self.submitter_address
email['From'] = self.submission_address
email['Date'] = format_datetime(datetime.utcnow())
email['Wks-Draft-Version'] = '3'
email['Wks-Phase'] = 'done'
email['X-Loop'] = xloop_header(self.domain)
email['Auto-Submitted'] = 'auto-replied'
return email

44
easywks/util.py Normal file
View file

@ -0,0 +1,44 @@
import hashlib
import secrets
import string
from pgpy import PGPKey
def _zrtp_base32(sha1: bytes) -> str:
# https://datatracker.ietf.org/doc/html/rfc6189#section-5.1.6
alphabet = 'ybndrfg8ejkmcpqxot1uwisza345h769'
encoded = ''
bits = int.from_bytes(sha1, 'big')
shift = 155
for j in range(32):
n = (bits >> shift) & 31
encoded += alphabet[n]
shift -= 5
return encoded
def hash_user_id(uid: str) -> str:
if '@' in uid:
uid, _ = uid.split('@', 1)
# Only ASCII must be lowercased...
lower = ''.join(c.lower() if c in string.ascii_uppercase else c for c in uid)
digest = hashlib.sha1(lower.encode()).digest()
return _zrtp_base32(digest)
def create_nonce(n: int = 32) -> str:
alphabet = string.ascii_letters + string.digits
nonce = ''.join(secrets.choice(alphabet) for _ in range(n))
return nonce
def fingerprint(key: PGPKey) -> str:
return key.fingerprint.upper().replace(' ', '')
def xloop_header(domain: str) -> str:
components = list(reversed(domain.split('.')))
components.append('easywks')
return '.'.join(components)

26
setup.py Normal file
View file

@ -0,0 +1,26 @@
from setuptools import setup, find_packages
from easywks import __version__
setup(
name='easywks',
version=__version__,
author='s3lph',
author_email='account-gitlab-ideynizv@kernelpanic.lol',
description='OpenPGP WKS for Human Beings',
license='MIT',
keywords='pgp,wks',
url='https://gitlab.com/s3lph/easywks',
packages=find_packages(exclude=['*.test']),
install_requires=[
'bottle',
'PyYAML',
'PGPy',
],
entry_points={
'console_scripts': [
'easywks = easywks.main:main'
]
},
)