Unit tests

This commit is contained in:
s3lph 2022-04-15 03:25:46 +02:00
parent f18a2a4021
commit efd34b0fd0
10 changed files with 427 additions and 47 deletions

2
.gitignore vendored
View file

@ -4,7 +4,7 @@
**/__pycache__/
*.pyc
**/*.egg-info/
*.coverage
**/.coverage*
**/.mypy_cache/
ca.pem

View file

@ -9,7 +9,7 @@ stages:
before_script:
- pip3 install coverage pycodestyle mypy
- pip3 install coverage pycodestyle mypy aiosmtpd
- export MULTISCHLEUDER_VERSION=$(python -c 'import multischleuder; print(multischleuder.__version__)')

View file

@ -15,7 +15,7 @@ class SchleuderApi:
def __init__(self,
url: str,
token: str,
cafile: str):
cafile: Optional[str] = None):
self._url = url
self._cafile = cafile
self._dry_run = False
@ -42,8 +42,11 @@ class SchleuderApi:
payload: Optional[bytes] = data.encode() if data is not None else None
# Create a custom SSL context that does not validate the hostname, but
# validates against the self-signed schleuder-api-daemon certificate
context = ssl.create_default_context(cafile=self._cafile)
context.check_hostname = False
if self._cafile is not None:
context = ssl.create_default_context(cafile=self._cafile)
context.check_hostname = False
else:
context = None
# Perform the actual request
req = urllib.request.Request(url, data=payload, method=method, headers=self._headers)
resp = urllib.request.urlopen(req, context=context)
@ -56,7 +59,7 @@ class SchleuderApi:
def get_lists(self) -> List['SchleuderList']:
lists = self.__request('lists.json')
return [SchleuderList(**s) for s in lists]
return [SchleuderList.from_api(**s) for s in lists]
# Subscriber Management
@ -64,7 +67,7 @@ class SchleuderApi:
response = self.__request('subscriptions.json', schleuder.id)
subs: List[SchleuderSubscriber] = []
for r in response:
key = self.__request('keys/{}.json', schleuder.id, fmt=[r['fingerprint']])
key = self.get_key(r['fingerprint'], schleuder)
sub = SchleuderSubscriber.from_api(key, **r)
subs.append(sub)
return subs
@ -91,11 +94,15 @@ class SchleuderApi:
# Key Management
def get_key(self, fpr: str, schleuder: 'SchleuderList'):
if self._dry_run:
return
key = self.__request('keys/{}.json', list_id=schleuder.id, fmt=[fpr])
return SchleuderKey.from_api(schleuder.id, **key)
def post_key(self, key: 'SchleuderKey', schleuder: 'SchleuderList'):
if self._dry_run:
return
if not key.blob:
raise ValueError('Cannot upload a key stub')
data = json.dumps({
'keymaterial': key.blob
})

View file

@ -8,9 +8,9 @@ import smtplib
class TlsMode(enum.Enum):
PLAIN = TlsMode('smtp', 25)
SMTPS = TlsMode('smtps', 465)
STARTTLS = TlsMode('smtp+starttls', 587)
PLAIN = 'smtp', 25
SMTPS = 'smtps', 465
STARTTLS = 'smtp+starttls', 587
def __init__(self, proto: str, port: int):
self._proto = proto
@ -27,8 +27,12 @@ class TlsMode(enum.Enum):
class SmtpClient:
def __init__(self, hostname: str, port: int, tls: 'TlsMode',
username: Optional[str], password: Optional[str]):
def __init__(self,
hostname: str = 'localhost',
port: int = 25,
tls: 'TlsMode' = TlsMode.PLAIN,
username: Optional[str] = None,
password: Optional[str] = None):
self._hostname: str = hostname
self._port: int = port
self._tls: TlsMode = tls
@ -78,4 +82,6 @@ class SmtpClient:
return ret
def __str__(self) -> str:
return f'{self._tls.proto}://{self._username}@{self._hostname}:{self._port}'
if self._username is not None and self._password is not None:
return f'{self._tls.proto}://{self._username}@{self._hostname}:{self._port}'
return f'{self._tls.proto}://{self._hostname}:{self._port}'

View file

View file

@ -0,0 +1,234 @@
import unittest
from datetime import datetime
from unittest.mock import patch, MagicMock
from dateutil.tz import tzutc
from multischleuder.api import SchleuderApi
from multischleuder.types import SchleuderKey, SchleuderList, SchleuderSubscriber
_LIST_RESPONSE = '''
[
{
"id": 42,
"created_at": "2022-04-15T01:11:12.123Z",
"updated_at": "2022-04-15T01:11:12.123Z",
"email": "test@schleuder.example.org",
"fingerprint": "5DCC2536D27521884A2FC7962DFD8DAAC73CB9FD",
"log_level": "warn",
"subject_prefix": "",
"subject_prefix_in": "",
"subject_prefix_out": "",
"openpgp_header_preference": "signencrypt",
"public_footer": "Just a test",
"headers_to_meta": [
"from",
"to",
"cc",
"date",
"sig",
"enc"
],
"bounces_drop_on_headers": {},
"keywords_admin_only": [
"subscribe",
"unsubscribe",
"delete-key"
],
"keywords_admin_notify": [
"add-key"
],
"send_encrypted_only": true,
"receive_encrypted_only": true,
"receive_signed_only": true,
"receive_authenticated_only": true,
"receive_from_subscribed_emailaddresses_only": false,
"receive_admin_only": false,
"keep_msgid": true,
"bounces_drop_all": false,
"bounces_notify_admins": true,
"include_list_headers": true,
"include_openpgp_header": true,
"max_message_size_kb": 10240,
"language": "en",
"forward_all_incoming_to_admins": false,
"logfiles_to_keep": 2,
"internal_footer": "",
"deliver_selfsent": true,
"include_autocrypt_header": true,
"set_reply_to_to_sender": false,
"munge_from": false
}
]
'''
_SUBSCRIBER_RESPONSE = '''
[
{
"id": 23,
"list_id": 42,
"email": "foo@example.org",
"fingerprint": "2FBBC0DF97FDBF1E4B704EEDE39EF4FAC420BEB6",
"admin": false,
"delivery_enabled": true,
"created_at": "2022-04-15T01:11:12.123Z",
"updated_at": "2022-04-15T01:11:12.123Z"
}
]
'''
_KEY_RESPONSE = '''
{
"fingerprint": "2FBBC0DF97FDBF1E4B704EEDE39EF4FAC420BEB6",
"email": "foo@example.org",
"expiry": null,
"generated_at": "2022-04-14T23:19:24.000Z",
"primary_uid": "Multischleuder Test Key (TEST - DO NOT USE) <foo@example.org>",
"oneline": "0x2FBBC0DF97FDBF1E4B704EEDE39EF4FAC420BEB6 foo@example.org 2022-04-14",
"trust_issues": null,
"description": "pub ed25519 2022-04-14 [SC]\\n 2FBBC0DF97FDBF1E4B704EEDE39EF4FAC420BEB6\\nuid Multischleuder Test Key (TEST - DO NOT USE) <foo@example.org>\\nsub cv25519 2022-04-14 [E]\\n",
"ascii": "pub ed25519 2022-04-14 [SC]\\n 2FBBC0DF97FDBF1E4B704EEDE39EF4FAC420BEB6\\nuid Multischleuder Test Key (TEST - DO NOT USE) <foo@example.org>\\nsub cv25519 2022-04-14 [E]\\n-----BEGIN PGP PUBLIC KEY BLOCK-----\\n\\nmDMEYlirsBYJKwYBBAHaRw8BAQdAGAHsSb3b3x+V6d7XouOXJryqW4mcjn1nDT2z\\nFgf5lEy0PU11bHRpc2NobGV1ZGVyIFRlc3QgS2V5IChURVNUIC0gRE8gTk9UIFVT\\nRSkgPGZvb0BleGFtcGxlLm9yZz6IkAQTFggAOBYhBC+7wN+X/b8eS3BO7eOe9PrE\\nIL62BQJiWKuwAhsDBQsJCAcCBhUKCQgLAgQWAgMBAh4BAheAAAoJEOOe9PrEIL62\\nUZUA/RrRiqhz+eY5PBXWvNzN2FjL0aTsrbPsjQ3fzQ0lThQkAQD+tVjLfx495OXn\\n/Y2NwnZaKtM80FPBsy2C0u0rEd6uALg4BGJYq7ASCisGAQQBl1UBBQEBB0A5xq5f\\nUb1i2Ayvbt+ZFgxx+OL3KT12AkSkLcaAeRjKcQMBCAeIeAQYFggAIBYhBC+7wN+X\\n/b8eS3BO7eOe9PrEIL62BQJiWKuwAhsMAAoJEOOe9PrEIL624lEA/iyn0KNUx8AK\\nrSMLp7JawmsT+uD2pcw1uH3qZPHMja3gAP91/1vKQ8X5tEYzlE9OvVqtf9ESQBLj\\nzxMaMEdub5qiBQ==\\n=RCkT\\n-----END PGP PUBLIC KEY BLOCK-----\\n"
}
''' # noqa E501
class TestSchleuderApi(unittest.TestCase):
def _mock_api(self, mock):
m = MagicMock()
m.getcode.return_value = 200
def read():
url = mock.call_args_list[-1][0][0].get_full_url()
if 'lists.json' in url:
return _LIST_RESPONSE.encode()
if 'subscriptions.json' in url:
return _SUBSCRIBER_RESPONSE.encode()
return _KEY_RESPONSE.encode()
m.read = read
m.__enter__.return_value = m
mock.return_value = m
return SchleuderApi('https://localhost:4443',
'86cf2676d065dc902548e563ab22b57868ed2eb6')
@patch('urllib.request.urlopen')
def test_get_lists(self, mock):
api = self._mock_api(mock)
lists = api.get_lists()
self.assertEqual(1, len(lists))
self.assertEqual('test@schleuder.example.org', lists[0].name)
# Test request data
self.assertEqual('https://localhost:4443/lists.json',
mock.call_args[0][0].get_full_url())
self.assertEqual('Basic c2NobGV1ZGVyOjg2Y2YyNjc2ZDA2NWRjOTAyNTQ4ZTU2M2FiMjJiNTc4NjhlZDJlYjY=',
mock.call_args[0][0].headers.get('Authorization'))
@patch('urllib.request.urlopen')
def test_get_subscribers(self, mock):
api = self._mock_api(mock)
subs = api.get_subscribers(SchleuderList(42, '', ''))
# Test request data
self.assertEqual('https://localhost:4443/subscriptions.json?list_id=42',
mock.call_args_list[0][0][0].get_full_url())
self.assertEqual('https://localhost:4443/keys/2FBBC0DF97FDBF1E4B704EEDE39EF4FAC420BEB6.json?list_id=42',
mock.call_args_list[1][0][0].get_full_url())
self.assertEqual(1, len(subs))
self.assertEqual(23, subs[0].id)
self.assertEqual('foo@example.org', subs[0].email)
self.assertEqual(42, subs[0].schleuder)
self.assertEqual(datetime(2022, 4, 15, 1, 11, 12, 123000, tzinfo=tzutc()),
subs[0].created_at)
self.assertEqual('2FBBC0DF97FDBF1E4B704EEDE39EF4FAC420BEB6', subs[0].key.fingerprint)
self.assertEqual('foo@example.org', subs[0].key.email)
self.assertIn('-----BEGIN PGP PUBLIC KEY BLOCK-----', subs[0].key.blob)
self.assertEqual(42, subs[0].key.schleuder)
@patch('urllib.request.urlopen')
def test_subscribe(self, mock):
api = self._mock_api(mock)
now = datetime.utcnow()
key = SchleuderKey('2FBBC0DF97FDBF1E4B704EEDE39EF4FAC420BEB6', 'foo@example.org', 'verylongpgpkeyblock', 42)
sub = SchleuderSubscriber(23, 'foo@example.org', key, 42, now)
api.subscribe(sub, SchleuderList(42, '', ''))
self.assertEqual('https://localhost:4443/subscriptions.json?list_id=42',
mock.call_args_list[0][0][0].get_full_url())
self.assertEqual('POST', mock.call_args_list[0][0][0].method)
# todo assert request payload
@patch('urllib.request.urlopen')
def test_unsubscribe(self, mock):
api = self._mock_api(mock)
now = datetime.utcnow()
key = SchleuderKey('2FBBC0DF97FDBF1E4B704EEDE39EF4FAC420BEB6', 'foo@example.org', 'verylongpgpkeyblock', 42)
sub = SchleuderSubscriber(23, 'foo@example.org', key, 42, now)
api.unsubscribe(sub)
self.assertEqual('https://localhost:4443/subscriptions/23.json',
mock.call_args_list[0][0][0].get_full_url())
self.assertEqual('DELETE', mock.call_args_list[0][0][0].method)
# todo assert request payload
@patch('urllib.request.urlopen')
def test_update_fingerprint(self, mock):
api = self._mock_api(mock)
now = datetime.utcnow()
key = SchleuderKey('2FBBC0DF97FDBF1E4B704EEDE39EF4FAC420BEB6', 'foo@example.org', 'verylongpgpkeyblock', 42)
sub = SchleuderSubscriber(23, 'foo@example.org', key, 42, now)
api.update_fingerprint(sub)
self.assertEqual('https://localhost:4443/subscriptions/23.json',
mock.call_args_list[0][0][0].get_full_url())
self.assertEqual('PATCH', mock.call_args_list[0][0][0].method)
# todo assert request payload
@patch('urllib.request.urlopen')
def test_get_key(self, mock):
api = self._mock_api(mock)
key = api.get_key('2FBBC0DF97FDBF1E4B704EEDE39EF4FAC420BEB6', SchleuderList(42, '', ''))
self.assertEqual('https://localhost:4443/keys/2FBBC0DF97FDBF1E4B704EEDE39EF4FAC420BEB6.json?list_id=42',
mock.call_args_list[0][0][0].get_full_url())
self.assertEqual('2FBBC0DF97FDBF1E4B704EEDE39EF4FAC420BEB6', key.fingerprint)
self.assertEqual('foo@example.org', key.email)
self.assertEqual(42, key.schleuder)
self.assertIn('-----BEGIN PGP PUBLIC KEY BLOCK-----', key.blob)
@patch('urllib.request.urlopen')
def test_post_key(self, mock):
api = self._mock_api(mock)
key = SchleuderKey('2FBBC0DF97FDBF1E4B704EEDE39EF4FAC420BEB6', 'foo@example.org', 'verylongpgpkeyblock', 42)
api.post_key(key, SchleuderList(42, '', ''))
self.assertEqual('https://localhost:4443/keys.json?list_id=42',
mock.call_args_list[0][0][0].get_full_url())
self.assertEqual('POST', mock.call_args_list[0][0][0].method)
# todo assert request payload
@patch('urllib.request.urlopen')
def test_delete_key(self, mock):
api = self._mock_api(mock)
key = SchleuderKey('2FBBC0DF97FDBF1E4B704EEDE39EF4FAC420BEB6', 'foo@example.org', 'verylongpgpkeyblock', 42)
api.delete_key(key, SchleuderList(42, '', ''))
self.assertEqual('https://localhost:4443/keys/2FBBC0DF97FDBF1E4B704EEDE39EF4FAC420BEB6.json?list_id=42',
mock.call_args_list[0][0][0].get_full_url())
self.assertEqual('DELETE', mock.call_args_list[0][0][0].method)
# todo assert request payload
@patch('urllib.request.urlopen')
def test_dry_run(self, mock):
api = self._mock_api(mock)
api.dry_run()
now = datetime.utcnow()
key = SchleuderKey('2FBBC0DF97FDBF1E4B704EEDE39EF4FAC420BEB6', 'foo@example.org', 'verylongpgpkeyblock', 42)
sub = SchleuderSubscriber(23, 'foo@example.org', key, 42, now)
sch = SchleuderList(42, '', '')
# create, update, delete should be no-ops
api.subscribe(sub, sch)
api.unsubscribe(sub)
api.update_fingerprint(sub)
api.post_key(key, SchleuderList(42, '', ''))
api.delete_key(key, SchleuderList(42, '', ''))
self.assertEqual(0, len(mock.call_args_list))
# only reads should execute
api.get_lists()
api.get_subscribers(sch)
api.get_key('2FBBC0DF97FDBF1E4B704EEDE39EF4FAC420BEB6', sch)
self.assertLess(0, len(mock.call_args_list))

View file

@ -0,0 +1,109 @@
import asyncio
import unittest
from email.mime.text import MIMEText
from aiosmtpd.controller import Controller
from aiosmtpd.smtp import AuthResult, SMTP
from multischleuder.smtp import SmtpClient, TlsMode
class MockSmtpHandler:
def __init__(self):
self.rcpt = None
async def handle_RCPT(self, server, session, envelope, address, rcpt_options):
self.rcpt = address
envelope.rcpt_tos.append(address)
return '250 OK'
class MockController(Controller):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.received_user = None
self.received_pass = None
def _auth(self, server, session, envelope, mechanism, auth_data):
self.received_user = auth_data.login.decode()
self.received_pass = auth_data.password.decode()
return AuthResult(success=True)
def factory(self):
return SMTP(handler=self.handler,
loop=self.loop,
auth_require_tls=False,
authenticator=self._auth)
class TestSmtpClient(unittest.TestCase):
def test_empty(self):
s = 'smtp://localhost:25'
self.assertEqual(s, str(SmtpClient()))
def test_parse_empty(self):
s = 'smtp://localhost:25'
self.assertEqual(s, str(SmtpClient.parse({})))
def test_full(self):
s = 'smtp+starttls://example@example.org:10025'
c = SmtpClient(
hostname='example.org',
port=10025,
tls=TlsMode.STARTTLS,
username='example',
password='supersecurepassword'
)
self.assertEqual(s, str(c))
def test_full_parse(self):
s = 'smtp+starttls://example@example.org:10025'
c = SmtpClient.parse({
'hostname': 'example.org',
'port': 10025,
'tls': 'STARTTLS',
'username': 'example',
'password': 'supersecurepassword'
})
self.assertEqual(s, str(c))
def test_default_ports(self):
self.assertEqual(25, SmtpClient.parse({'tls': 'PLAIN'})._port)
self.assertEqual(465, SmtpClient.parse({'tls': 'SMTPS'})._port)
self.assertEqual(587, SmtpClient.parse({'tls': 'STARTTLS'})._port)
def test_send_message_auth(self):
ctrl = MockController(handler=MockSmtpHandler(), hostname='127.0.0.1', port=10025)
ctrl.start()
client = SmtpClient(
hostname='127.0.0.1',
port=ctrl.port,
username='example',
password='supersecurepassword')
msg = MIMEText('Hello World!')
msg['From'] = 'foo@example.org'
msg['To'] = 'bar@example.org'
with client:
client.send_message(msg)
ctrl.stop()
self.assertEqual('example', ctrl.received_user)
self.assertEqual('supersecurepassword', ctrl.received_pass)
self.assertEqual('bar@example.org', ctrl.handler.rcpt)
def test_send_message(self):
ctrl = MockController(handler=MockSmtpHandler(), hostname='127.0.0.1', port=10026)
ctrl.start()
client = SmtpClient(hostname='127.0.0.1', port=ctrl.port)
msg = MIMEText('Hello World!')
msg['From'] = 'foo@example.org'
msg['To'] = 'baz@example.org'
with client:
client.send_message(msg)
ctrl.stop()
self.assertIsNone(ctrl.received_user)
self.assertIsNone(ctrl.received_pass)
self.assertEqual('baz@example.org', ctrl.handler.rcpt)

View file

@ -0,0 +1,36 @@
import datetime
import json
import unittest
from dateutil.tz import tzutc
from multischleuder.types import SchleuderKey, SchleuderList, SchleuderSubscriber
from multischleuder.test.test_api import _KEY_RESPONSE,_LIST_RESPONSE, _SUBSCRIBER_RESPONSE
class TestSchleuderTypes(unittest.TestCase):
def test_parse_list(self):
s = SchleuderList.from_api(**json.loads(_LIST_RESPONSE)[0])
self.assertEqual(42, s.id)
self.assertEqual('test@schleuder.example.org', s.name)
self.assertEqual('5DCC2536D27521884A2FC7962DFD8DAAC73CB9FD', s.fingerprint)
def test_parse_key(self):
k = SchleuderKey.from_api(42, **json.loads(_KEY_RESPONSE))
self.assertEqual('2FBBC0DF97FDBF1E4B704EEDE39EF4FAC420BEB6', k.fingerprint)
self.assertEqual('foo@example.org', k.email)
self.assertEqual(42, k.schleuder)
self.assertIn('-----BEGIN PGP PUBLIC KEY BLOCK-----', k.blob)
self.assertIn('2FBBC0DF97FDBF1E4B704EEDE39EF4FAC420BEB6 (foo@example.org)', str(k))
def test_parse_subscriber(self):
k = SchleuderKey.from_api(42, **json.loads(_KEY_RESPONSE))
s = SchleuderSubscriber.from_api(k, **json.loads(_SUBSCRIBER_RESPONSE)[0])
self.assertEqual(23, s.id)
self.assertEqual('foo@example.org', str(s))
self.assertEqual('foo@example.org', s.email)
self.assertEqual(42, s.schleuder)
self.assertEqual(datetime.datetime(2022, 4, 15, 1, 11, 12, 123000, tzinfo=tzutc()),
s.created_at)
self.assertEqual('2FBBC0DF97FDBF1E4B704EEDE39EF4FAC420BEB6', s.key.fingerprint)

View file

@ -6,9 +6,9 @@ from dateutil.parser import isoparse
@dataclass
class SchleuderList:
id: int
name: str
fingerprint: str
id: int = field(compare=True)
name: str = field(compare=False)
fingerprint: str = field(compare=False)
@staticmethod
def from_api(id: int,
@ -17,52 +17,43 @@ class SchleuderList:
*args, **kwargs) -> 'SchleuderList':
return SchleuderList(id, email, fingerprint)
def __hash__(self) -> int:
return hash(self.id)
@dataclass
class SchleuderSubscriber:
id: int
email: str
key: 'SchleuderKey'
schleuder: int
created_at: datetime
id: int = field(compare=False)
email: str = field(compare=True)
key: 'SchleuderKey' = field(compare=False)
schleuder: int = field(compare=False)
created_at: datetime = field(compare=False)
@staticmethod
def from_api(key: 'SchleuderKey',
id: int,
list_id: int,
email: str,
fingerprint: str,
admin: bool,
delivery_enabled: bool,
created_at: str,
updated_at: str) -> 'SchleuderSubscriber':
*args, **kwargs) -> 'SchleuderSubscriber':
created = isoparse(created_at)
return SchleuderSubscriber(id, email, key, list_id, created)
def __str__(self) -> str:
return f'{self.email}'
def __hash__(self) -> int:
return hash(self.email)
def __eq__(self, o) -> bool:
if not isinstance(o, SchleuderSubscriber):
return False
return self.email == o.email
@dataclass
class SchleuderKey:
fingerprint: str
email: str
blob: str = field(repr=False)
schleuder: int
fingerprint: str = field(compare=True)
email: str = field(compare=True)
blob: str = field(compare=True, hash=False, repr=False)
schleuder: int = field(compare=False)
def __hash__(self) -> int:
return hash((self.fingerprint, self.email))
@staticmethod
def from_api(schleuder: int,
fingerprint: str,
email: str,
ascii: str,
*args, **kwargs) -> 'SchleuderKey':
return SchleuderKey(fingerprint, email, ascii, schleuder)
def __str__(self) -> str:
return f'{self.fingerprint} ({self.email})'

View file

@ -18,9 +18,6 @@ setup(
'PyYAML',
'PGPy',
],
test_requirements=[
'mypy'
],
entry_points={
'console_scripts': [
'multischleuder = multischleuder.main:main'