269 lines
9.3 KiB
Python
269 lines
9.3 KiB
Python
|
|
import string
|
|
import yaml
|
|
|
|
|
|
POLICY_MAILBOX_ONLY = 'mailbox-only'
|
|
POLICY_AUTH_SUBMIT = 'auth-submit'
|
|
POLICY_PROTOCOL_VERSION = 'protocol-version'
|
|
STANDARD_POLICY_FLAGS = {
|
|
POLICY_MAILBOX_ONLY: bool,
|
|
# 'dane-only': bool, # deprecated
|
|
POLICY_AUTH_SUBMIT: bool,
|
|
POLICY_PROTOCOL_VERSION: int,
|
|
}
|
|
|
|
EASYWKS_POLICY_PREFIX = 'me.s3lph.easywks_'
|
|
EWP_PERMIT_UNSIGNED_RESPONSE = EASYWKS_POLICY_PREFIX + 'permit-unsigned-response'
|
|
EWP_STRIP_UNVERIFIED_UIDS = EASYWKS_POLICY_PREFIX + 'strip-unverified-uids'
|
|
EWP_STRIP_UA_UIDS = EASYWKS_POLICY_PREFIX + 'strip-ua-uids'
|
|
EWP_STRIP_3RDPARTY_SIGNATURES = EASYWKS_POLICY_PREFIX + 'strip-3rdparty-signatures'
|
|
EWP_MAX_REVOKED_KEYS = EASYWKS_POLICY_PREFIX + 'max-revoked-keys'
|
|
EWP_MINIMIZE_REVOKED_KEYS = EASYWKS_POLICY_PREFIX + 'minimize-revoked-keys'
|
|
EASYWKS_POLICY_FLAGS = {
|
|
EWP_PERMIT_UNSIGNED_RESPONSE: bool,
|
|
EWP_STRIP_UNVERIFIED_UIDS: bool,
|
|
EWP_STRIP_UA_UIDS: bool,
|
|
EWP_STRIP_3RDPARTY_SIGNATURES: bool,
|
|
EWP_MAX_REVOKED_KEYS: int,
|
|
EWP_MINIMIZE_REVOKED_KEYS: bool,
|
|
}
|
|
|
|
|
|
def _validate_mailing_method(value):
|
|
methods = ['stdout', 'smtp']
|
|
if value not in methods:
|
|
return f'permitted values: {methods}, got {value}'
|
|
|
|
|
|
def _validate_smtp_config(value):
|
|
if not isinstance(value, dict):
|
|
return f'must be a map, got {type(value)}'
|
|
if not isinstance(value['host'], str):
|
|
return f'host must be a str, got {type(value["host"])}'
|
|
if not isinstance(value['port'], int):
|
|
return f'port must be a int, got {type(value["port"])}'
|
|
if 'tls' in value:
|
|
if not isinstance(value['tls'], bool):
|
|
return f'tls must be a bool, got {type(value["tls"])}'
|
|
else:
|
|
value['tls'] = False
|
|
if 'starttls' in value:
|
|
if not isinstance(value['starttls'], bool):
|
|
return f'starttls must be a bool, got {type(value["starttls"])}'
|
|
else:
|
|
value['starttls'] = False
|
|
if 'username' in value and 'password' in value:
|
|
if not isinstance(value['username'], str):
|
|
return f'username must be a str, got {type(value["username"])}'
|
|
if not isinstance(value['password'], str):
|
|
return f'password must be a str, got {type(value["password"])}'
|
|
else:
|
|
value['username'] = None
|
|
value['password'] = None
|
|
|
|
|
|
def _validate_httpd_config(value):
|
|
if not isinstance(value, dict):
|
|
return f'must be a map, got {type(value)}'
|
|
if 'host' in value:
|
|
if not isinstance(value['host'], str):
|
|
return f'host must be a str, got {type(value["host"])}'
|
|
else:
|
|
value['host'] = 'localhost'
|
|
if 'port' in value:
|
|
if not isinstance(value['port'], int):
|
|
return f'port must be a int, got {type(value["port"])}'
|
|
else:
|
|
value['port'] = 8080
|
|
if 'require_user_urlparam' in value:
|
|
if not isinstance(value['require_user_urlparam'], bool):
|
|
return f'port must be a bool, got {type(value["require_user_urlparam"])}'
|
|
else:
|
|
value['require_user_urlparam'] = True
|
|
|
|
|
|
def _validate_lmtpd_config(value):
|
|
if not isinstance(value, dict):
|
|
return f'must be a map, got {type(value)}'
|
|
if not isinstance(value['host'], str):
|
|
return f'host must be a str, got {type(value["host"])}'
|
|
if not isinstance(value['port'], int):
|
|
return f'port must be a int, got {type(value["port"])}'
|
|
|
|
|
|
def _validate_policy_flags(value):
|
|
alphabet = string.ascii_lowercase + string.digits + '-._'
|
|
if not isinstance(value, dict):
|
|
return f'must be a map, got {type(value)}'
|
|
for flag, v in value.items():
|
|
if not isinstance(flag, str) or len(flag) == 0:
|
|
return 'has non-string or empty members'
|
|
if flag[0] not in string.ascii_lowercase:
|
|
return 'must start with a lowercase letter'
|
|
for c in flag:
|
|
if c not in alphabet:
|
|
return f'has invalid key {flag}'
|
|
if '_' in flag:
|
|
if flag.startswith(EASYWKS_POLICY_PREFIX):
|
|
cls = EASYWKS_POLICY_FLAGS.get(flag)
|
|
if flag not in EASYWKS_POLICY_FLAGS:
|
|
return f'unknown policy flag {flag}'
|
|
if not isinstance(v, cls):
|
|
return f'invalid type {v.__class__.__name__} for flag {flag}'
|
|
else:
|
|
cls = STANDARD_POLICY_FLAGS.get(flag)
|
|
if flag not in STANDARD_POLICY_FLAGS:
|
|
return f'unknown policy flag {flag}'
|
|
if not isinstance(v, cls):
|
|
return f'invalid type {v.__class__.__name__} for flag {flag}'
|
|
|
|
|
|
def _validate_responses(value):
|
|
if not isinstance(value, dict):
|
|
return f'must be a map, got {type(value)}'
|
|
if 'header' not in value:
|
|
value['header'] = '''Hi there!
|
|
|
|
This is the EasyWKS system at {domain}.
|
|
'''
|
|
if 'footer' not in value:
|
|
value['footer'] = '''For more information on WKD and WKS see:
|
|
|
|
https://gnupg.org/faq/wkd.html
|
|
https://gnupg.org/faq/wks.html
|
|
|
|
|
|
Regards
|
|
EasyWKS
|
|
|
|
--
|
|
Dance like nobody is watching.
|
|
Encrypt live everybody is.
|
|
'''
|
|
if 'confirm' not in value:
|
|
value['confirm'] = '''You appear to have submitted your key for publication in the Web Key
|
|
Directory. There's one more step you need to complete. If you did not
|
|
request this, you can simply ignore this message.
|
|
|
|
If your email client doesn't automatically complete this challenge, you
|
|
can perform this step manually: Please verify that you can decrypt the
|
|
second part of this message and that the fingerprint listed in the
|
|
encrypted part matches your key. If everything looks ok, please reply
|
|
to this message with an **encrypted and signed PGP/MIME message** with
|
|
the following content (without the <> brackets)
|
|
|
|
type: confirmation-response
|
|
sender: <your email address>
|
|
nonce: <copy the nonce from the encrypted part of this message>
|
|
'''
|
|
if 'done' not in value:
|
|
value['done'] = '''Your key has been published to the Web Key Directory.
|
|
You can test WKD key retrieval e.g. with:
|
|
|
|
gpg --auto-key-locate=wkd,nodefault --locate-key {sender}
|
|
'''
|
|
if 'error' not in value:
|
|
value['error'] = '''An error has occurred while processing your request:
|
|
|
|
{error}
|
|
|
|
If this error persists, please contact your administrator for help.'''
|
|
|
|
|
|
class _ConfigOption:
|
|
|
|
def __init__(self, key, typ, default, validator=None):
|
|
self._key = key
|
|
self._typ = typ
|
|
self._default = default
|
|
self._validator = validator
|
|
self._value = default
|
|
|
|
@property
|
|
def value(self):
|
|
return self._value
|
|
|
|
def load(self, conf: dict):
|
|
self._value = conf.get(self._key, self._default)
|
|
if not isinstance(self._value, self._typ):
|
|
raise TypeError(f'Option {self._key} must be {self._typ}, got {type(self._value)}')
|
|
if self._validator:
|
|
response = self._validator(self._value)
|
|
if response:
|
|
raise ValueError(f'Invalid value for option {self._key}: {response}')
|
|
|
|
|
|
class _Config:
|
|
|
|
def __init__(self, **kwargs):
|
|
self._options = kwargs
|
|
|
|
def __getattr__(self, item):
|
|
if item in self._options:
|
|
return self._options[item].value
|
|
raise AttributeError()
|
|
|
|
|
|
class _GlobalConfig(_Config):
|
|
|
|
def __init__(self, **kwargs):
|
|
super().__init__(**kwargs)
|
|
self.__domains = {}
|
|
|
|
def __make_domain(self, domain):
|
|
self.__domains[domain] = _Config(
|
|
submission_address=_ConfigOption('submission_address', str, f'gpgwks@{domain}'),
|
|
passphrase=_ConfigOption('passphrase', str, ''),
|
|
policy_flags=_ConfigOption('policy_flags', dict, {}, validator=_validate_policy_flags)
|
|
)
|
|
|
|
def __getitem__(self, item):
|
|
return self.__domains.get(item)
|
|
|
|
@property
|
|
def domains(self) -> list:
|
|
return list(self.__domains.keys())
|
|
|
|
def load_config(self, path: str = '/etc/easywks.yml'):
|
|
with open(path, 'r') as f:
|
|
conf = yaml.safe_load(f)
|
|
for co in self._options.values():
|
|
co.load(conf)
|
|
self.__domains = {}
|
|
for domain, dconf in conf['domains'].items():
|
|
self.__make_domain(domain)
|
|
for co in self.__domains[domain]._options.values():
|
|
co.load(dconf)
|
|
|
|
|
|
Config = _GlobalConfig(
|
|
working_directory=_ConfigOption('directory', str, '/var/lib/easywks'),
|
|
pending_lifetime=_ConfigOption('pending_lifetime', int, 604800),
|
|
mailing_method=_ConfigOption('mailing_method', str, 'stdout', validator=_validate_mailing_method),
|
|
# TODO: permit_unsigned_response is deprecated, but for now retained for backwards compatibility
|
|
permit_unsigned_response=_ConfigOption('permit_unsigned_response', bool, False),
|
|
httpd=_ConfigOption('httpd', dict, {
|
|
'host': 'localhost',
|
|
'port': 8080,
|
|
'require_user_urlparam': True
|
|
}, validator=_validate_httpd_config),
|
|
smtp=_ConfigOption('smtp', dict, {
|
|
'host': 'localhost',
|
|
'port': 25,
|
|
'tls': False,
|
|
'starttls': False
|
|
}, validator=_validate_smtp_config),
|
|
lmtpd=_ConfigOption('lmtpd', dict, {
|
|
'host': 'localhost',
|
|
'port': 25,
|
|
}, validator=_validate_lmtpd_config),
|
|
responses=_ConfigOption('responses', dict, {}, validator=_validate_responses),
|
|
)
|
|
|
|
|
|
def render_message(key, **kwargs):
|
|
header = Config.responses['header'].format(**kwargs)
|
|
content = Config.responses[key].format(**kwargs)
|
|
footer = Config.responses['footer'].format(**kwargs)
|
|
return f'{header}\n{content}\n{footer}'
|