2019-08-20 00:24:51 +02:00
|
|
|
|
|
|
|
import unittest
|
|
|
|
|
|
|
|
import json
|
|
|
|
import pytz
|
|
|
|
from datetime import timedelta
|
|
|
|
|
|
|
|
from isodate.duration import Duration
|
|
|
|
|
|
|
|
from icalendar_timeseries_server.config import _keycheck, _parse_timedelta, _parse_timezone, Config
|
|
|
|
|
|
|
|
|
|
|
|
# Complete, valid configuration document used as the fixture for the
# full-config tests.  It defines three calendars:
#   * "private":      basic auth, no explicit "interval"
#   * "public":       no "auth" section, daily interval
#   * "confidential": TLS client auth with a custom CA, 5-minute interval
# The literal must contain nothing but JSON, because it is fed straight into
# json.loads().
_CONFIG_VALID = """
{
    "addr": "127.0.0.1",
    "port": 8090,
    "start_delta": "-PT3H",
    "end_delta": "P30D",
    "tz": "Europe/Zurich",
    "calendars": {
        "private": {
            "url": "https://example.cloud/dav/me/private.ics",
            "auth": {
                "type": "basic",
                "username": "me",
                "password": "mysupersecurepassword"
            }
        },
        "public": {
            "url": "https://example.cloud/dav/me/public.ics",
            "interval": "P1D"
        },
        "confidential": {
            "url": "https://example.cloud/dav/me/confidential.ics",
            "interval": "PT5M",
            "ca": "/etc/ssl/ca.pem",
            "auth": {
                "type": "tls",
                "keyfile": "/etc/ssl/client.pem",
                "passphrase": "mysupersecurepassword"
            }
        }
    },
    "key_replace": {
        "summary": "01_summary",
        "description": "02_description"
    },
    "value_replace": {
        "summary": "{{ summary|truncate(100) }}",
        "description": "{{ description|truncate(100) }}"
    }
}
"""
|
|
|
|
|
|
|
|
|
|
|
|
class ConfigTest(unittest.TestCase):
    """Tests for the config parsing helpers and the ``Config`` class."""

    def test_keycheck_valid(self) -> None:
        """``_keycheck`` hands back stored values, defaults and ``None`` for optional keys."""
        sample = {
            'foo': 'bar',
            'bar': 42,
            'baz': [],
            'qux': {}
        }
        # Present string key, constrained to a list of permitted values.
        self.assertEqual(_keycheck('foo', sample, str, '', valid_values=['foo', 'bar', 'baz']), 'bar')
        # Absent key with a default: the default is returned.
        self.assertEqual(_keycheck('fooo', sample, str, '', default_value='baar'), 'baar')
        # Absent key flagged as optional: None is returned.
        self.assertIsNone(_keycheck('foooo', sample, str, '', optional=True))
        # Present keys of the remaining types exercised here.
        self.assertEqual(_keycheck('bar', sample, int, ''), 42)
        self.assertEqual(_keycheck('baz', sample, list, ''), [])
        self.assertEqual(_keycheck('qux', sample, dict, ''), {})

    def test_keycheck_missing(self) -> None:
        """An absent key without default or ``optional`` raises ``KeyError``."""
        sample = {
            'foo': 'bar',
        }
        with self.assertRaises(KeyError):
            _keycheck('baz', sample, str, '')

    def test_keycheck_type(self) -> None:
        """A value of a different type than requested raises ``TypeError``."""
        sample = {
            'foo': 'bar',
        }
        with self.assertRaises(TypeError):
            _keycheck('foo', sample, int, '')

    def test_keycheck_value(self) -> None:
        """A value not contained in ``valid_values`` raises ``ValueError``."""
        sample = {
            'foo': '1337',
        }
        with self.assertRaises(ValueError):
            _keycheck('foo', sample, str, '', valid_values=['foo', 'bar', 'baz'])

    def test_parse_timedelta(self) -> None:
        """``_parse_timedelta`` parses signed ISO 8601 durations and enforces ``force_positive``."""
        durations = {
            'pos': 'PT42S',
            'neg': '-PT42S'
        }
        forty_two = timedelta(seconds=42)
        # 'zero' is not in the dict, so the default 'PT' is what gets parsed.
        self.assertEqual(_parse_timedelta('zero', durations, '', default_value='PT'), timedelta(seconds=0))
        self.assertEqual(_parse_timedelta('pos', durations, ''), forty_two)
        self.assertEqual(_parse_timedelta('neg', durations, ''), timedelta(seconds=-42))
        self.assertEqual(_parse_timedelta('pos', durations, '', force_positive=True), forty_two)
        # A negative duration is rejected once force_positive is set.
        with self.assertRaises(ValueError):
            _parse_timedelta('neg', durations, '', force_positive=True)

    def test_parse_timezone(self) -> None:
        """``_parse_timezone`` resolves zone names via pytz and rejects unknown ones."""
        zones = {
            'tz': 'Europe/Zurich',
            'notz': 'North/Winterfell'
        }
        self.assertEqual(_parse_timezone('tz', zones, ''), pytz.timezone('Europe/Zurich'))
        # 'def' is not in the dict, so the default zone name is resolved instead.
        berlin = pytz.timezone('Europe/Berlin')
        self.assertEqual(_parse_timezone('def', zones, '', default_value='Europe/Berlin'), berlin)
        with self.assertRaises(pytz.exceptions.UnknownTimeZoneError):
            _parse_timezone('notz', zones, '')

    def test_parse_full_config_valid(self):
        """The top-level scalar settings of ``_CONFIG_VALID`` end up on the ``Config`` object."""
        parsed = Config(json.loads(_CONFIG_VALID))
        self.assertEqual(parsed.addr, '127.0.0.1')
        self.assertEqual(parsed.port, 8090)
        self.assertEqual(parsed.start_delta, Duration(hours=-3))
        self.assertEqual(parsed.end_delta, Duration(days=30))
        self.assertEqual(parsed.tz, pytz.timezone('Europe/Zurich'))

    def test_parse_calendars(self):
        """Each calendar entry of ``_CONFIG_VALID`` is parsed with its URL, interval and auth data."""
        calendars = Config(json.loads(_CONFIG_VALID)).calendars
        self.assertEqual({'public', 'private', 'confidential'}, calendars.keys())

        # "public" has no auth section; the expected auth type is 'none'.
        public = calendars['public']
        self.assertEqual('https://example.cloud/dav/me/public.ics', public.url)
        self.assertEqual(Duration(days=1), public.interval)
        self.assertEqual('none', public._authtype)

        # "private" configures no interval; 15 minutes is the expected value.
        private = calendars['private']
        self.assertEqual('https://example.cloud/dav/me/private.ics', private.url)
        self.assertEqual(Duration(minutes=15), private.interval)
        self.assertEqual('basic', private._authtype)
        self.assertEqual('Basic bWU6bXlzdXBlcnNlY3VyZXBhc3N3b3Jk', private._request_headers['Authorization'])

        # "confidential" uses TLS client authentication.
        confidential = calendars['confidential']
        self.assertEqual('https://example.cloud/dav/me/confidential.ics', confidential.url)
        self.assertEqual(Duration(minutes=5), confidential.interval)
        self.assertEqual('tls', confidential._authtype)
        self.assertEqual('/etc/ssl/client.pem', confidential._tls_keyfile)
        self.assertEqual('mysupersecurepassword', confidential._tls_passphrase)