From 2f927cec41d282a9b6e1485b80b8f5a015fde42b Mon Sep 17 00:00:00 2001 From: s3lph Date: Fri, 29 Jun 2018 21:56:22 +0200 Subject: [PATCH] Implemented a container for RequestArgument instances, with some more unit tests. --- matemat/webserver/__init__.py | 2 +- matemat/webserver/requestargs.py | 140 ++++++++++++++-- matemat/webserver/test/test_requestargs.py | 179 ++++++++++++++++++++- 3 files changed, 303 insertions(+), 18 deletions(-) diff --git a/matemat/webserver/__init__.py b/matemat/webserver/__init__.py index 1b4ab06..c52368e 100644 --- a/matemat/webserver/__init__.py +++ b/matemat/webserver/__init__.py @@ -6,5 +6,5 @@ API that can be used by 'pagelets' - single pages of a web service. If a reques server will attempt to serve the request with a static resource in a previously configured webroot directory. """ -from .requestargs import RequestArgument +from .requestargs import RequestArgument, RequestArguments from .httpd import MatematWebserver, HttpHandler, pagelet diff --git a/matemat/webserver/requestargs.py b/matemat/webserver/requestargs.py index dcad518..2150b31 100644 --- a/matemat/webserver/requestargs.py +++ b/matemat/webserver/requestargs.py @@ -1,5 +1,79 @@ -from typing import Iterator, List, Optional, Tuple, Union +from typing import Dict, Iterator, List, Tuple, Union + + +class RequestArguments(object): + """ + Container for HTTP Request arguments. + + Usage: + + # Create empty instance + ra = RequestArguments() + # Add an entry for the key 'foo' with the value 'bar' and Content-Type 'text/plain' + ra['foo'].append('text/plain', 'bar') + # Retrieve the value for the key 'foo', as a string... + foo = str(ra.foo) + # ... or as raw bytes + foo = bytes(ra.foo) + """ + + def __init__(self) -> None: + """ + Create an empty container instance. + """ + self.__container: Dict[str, RequestArgument] = dict() + + def __getitem__(self, key: str) -> 'RequestArgument': + """ + Retrieve the argument for the given name, creating it on the fly, if it doesn't exist. + + :param key: Name of the argument to retrieve. + :return: A RequestArgument instance. + :raises TypeError: If key is not a string. + """ + if not isinstance(key, str): + raise TypeError('key must be a str') + # Create empty argument, if it doesn't exist + if key not in self.__container: + self.__container[key] = RequestArgument(key) + # Return the argument for the name + return self.__container[key] + + def __getattr__(self, key: str) -> 'RequestArgument': + """ + Syntactic sugar for accessing values with a name that can be used in Python attributes. The value will be + returned as an immutable view. + + :param key: Name of the argument to retrieve. + :return: An immutable view of the RequestArgument instance. + """ + return _View.of(self.__container[key]) + + def __iter__(self) -> Iterator['RequestArguments']: + """ + Returns an iterator over the values in this instance. Values are represented as immutable views. + + :return: An iterator that yields immutable views of the values. + """ + for ra in self.__container.values(): + # Yield an immutable scalar view for each value + yield _View.of(ra) + + def __contains__(self, key: str) -> bool: + """ + Checks whether an argument with a given name exists in the RequestArguments instance. + + :param key: The name to check whether it exists. + :return: True, if present, False otherwise. + """ + return key in self.__container + + def __len__(self) -> int: + """ + :return: The number of arguments in this instance. + """ + return len(self.__container) class RequestArgument(object): @@ -16,18 +90,16 @@ class RequestArgument(object): Usage example: qsargs = urllib.parse.parse_qs(qs, strict_parsing=True, keep_blank_values=True, errors='strict') - args: Dict[str, RequestArgument] = dict() + args: RequestArguments for k, vs in qsargs: - args[k] = RequestArgument(k) + args[k].clear() for v in vs: # text/plain usually is a sensible choice for values decoded from urlencoded strings # IF ALREADY IN STRING FORM (which parse_qs does)! args[k].append('text/plain', v) - if 'username' in args and args['username'].is_scalar: - username: str = args['username'].get_str() - else: - raise ValueError() + if 'username' in args and args.username.is_scalar: + username = str(args.username) """ @@ -91,12 +163,12 @@ class RequestArgument(object): :return: An UTF-8 string representation of the requested value. :raises UnicodeDecodeError: If the value cannot be decoded into an UTF-8 string. :raises IndexError: If the index is out of bounds. - :raises ValueError: If the index is not an int. + :raises TypeError: If the index is not an int. :raises TypeError: If the requested value is neither a str nor a bytes object. """ if not isinstance(index, int): # Index must be an int - raise ValueError('index must not be None') + raise TypeError('index must be an int') # Type hint; access array element v: Tuple[str, Union[bytes, str]] = self.__value[index] if isinstance(v[1], str): @@ -107,6 +179,14 @@ class RequestArgument(object): return v[1].decode('utf-8') raise TypeError('Value is neither a str nor bytes') + def __str__(self) -> str: + """ + Attempts to return the first value as a string. + :return: An UTF-8 string representation of the first value. + :raises UnicodeDecodeError: If the value cannot be decoded into an UTF-8 string. + """ + return self.get_str() + def get_bytes(self, index: int = 0) -> bytes: """ Attempts to return a value as a bytes object. The index defaults to 0. @@ -114,12 +194,12 @@ class RequestArgument(object): :param index: The index of the value to retrieve. Default: 0. :return: A bytes object representation of the requested value. Strings will be encoded as UTF-8. :raises IndexError: If the index is out of bounds. - :raises ValueError: If the index is not an int. + :raises TypeError: If the index is not an int. :raises TypeError: If the requested value is neither a str nor a bytes object. """ if not isinstance(index, int): # Index must be a int - raise ValueError('index must not be None') + raise TypeError('index must be an int') # Type hint; access array element v: Tuple[str, Union[bytes, str]] = self.__value[index] if isinstance(v[1], bytes): @@ -130,6 +210,13 @@ class RequestArgument(object): return v[1].encode('utf-8') raise TypeError('Value is neither a str nor bytes') + def __bytes__(self) -> bytes: + """ + Attempts to return the first value as a bytes object. + :return: A bytes string representation of the first value. + """ + return self.get_bytes() + def get_content_type(self, index: int = 0) -> str: """ Attempts to retrieve a value's Content-Type. The index defaults to 0. @@ -137,18 +224,20 @@ class RequestArgument(object): :param index: The index of the value to retrieve. Default: 0. :return: The Content-Type of the requested value, as sent by the client. Not necessarily trustworthy. :raises IndexError: If the index is out of bounds. - :raises ValueError: If the index is not an int. + :raises TypeError: If the index is not an int. """ # instance is an array value if not isinstance(index, int): # Needs an index for array values - raise ValueError('index must not be None') + raise TypeError('index must be an int') # Type hint; access array element va: Tuple[str, Union[bytes, str]] = self.__value[index] # Return the content type of the requested value + if not isinstance(va[0], str): + raise TypeError('Content-Type is not a str') return va[0] - def append(self, ctype: str, value: Union[str, bytes]): + def append(self, ctype: str, value: Union[str, bytes]) -> None: """ Append a value to this instance. Turns an empty argument into a scalar and a scalar into an array. @@ -161,6 +250,17 @@ class RequestArgument(object): raise TypeError('A RequestArgument view is immutable!') self.__value.append((ctype, value)) + def clear(self) -> None: + """ + Remove all values from this instance. + + :raises TypeError: If called on an immutable view. + """ + if self.is_view: + # This is an immutable view, raise exception + raise TypeError('A RequestArgument view is immutable!') + self.__value.clear() + def __len__(self) -> int: """ :return: Number of values for this argument. @@ -178,7 +278,7 @@ class RequestArgument(object): # Yield an immutable scalar view for each (ctype, value) element in the array yield _View(self.__name, v) - def __getitem__(self, index: Union[int, slice]): + def __getitem__(self, index: Union[int, slice]) -> 'RequestArgument': """ Index the argument with either an int or a slice. The returned values are represented as immutable RequestArgument views. @@ -206,6 +306,16 @@ class _View(RequestArgument): """ super().__init__(name, value) + @staticmethod + def of(argument: 'RequestArgument') ->'RequestArgument': + """ + Create an immutable, unsliced view of an RequestArgument instance. + + :param argument: The RequestArgument instance to create a view of. + :return: An immutable view of the provided RequestArgument instance. + """ + return argument[:] + @property def is_view(self) -> bool: """ diff --git a/matemat/webserver/test/test_requestargs.py b/matemat/webserver/test/test_requestargs.py index 3383863..3e093a2 100644 --- a/matemat/webserver/test/test_requestargs.py +++ b/matemat/webserver/test/test_requestargs.py @@ -1,9 +1,10 @@ -from typing import List +from typing import Dict, List, Set, Tuple import unittest +import urllib.parse -from matemat.webserver import RequestArgument +from matemat.webserver import RequestArgument, RequestArguments # noinspection PyProtectedMember from matemat.webserver.requestargs import _View @@ -228,6 +229,56 @@ class TestRequestArguments(unittest.TestCase): self.assertEqual(b'\x00\x80\xff\xfe', ra.get_bytes(1)) self.assertEqual(b'Hello, World!', ra.get_bytes(2)) + def test_clear_empty(self): + """ + Test clearing an empty RequestArgument. + """ + # Initialize the empty RequestArgument + ra = RequestArgument('foo') + self.assertEqual(0, len(ra)) + self.assertFalse(ra.is_scalar) + ra.clear() + # Clearing an empty RequestArgument shouldn't have any effect + self.assertEqual('foo', ra.name) + self.assertEqual(0, len(ra)) + self.assertFalse(ra.is_scalar) + + def test_clear_scalar(self): + """ + Test clearing a scalar RequestArgument. + """ + # Initialize the scalar RequestArgument + ra = RequestArgument('foo', ('text/plain', 'bar')) + self.assertEqual(1, len(ra)) + self.assertTrue(ra.is_scalar) + ra.clear() + # Clearing a scalar RequestArgument should reduce its size to 0 + self.assertEqual('foo', ra.name) + self.assertEqual(0, len(ra)) + self.assertFalse(ra.is_scalar) + with self.assertRaises(IndexError): + ra.get_str() + + def test_clear_array(self): + """ + Test clearing an array RequestArgument. + """ + # Initialize the array RequestArgument + ra = RequestArgument('foo', [ + ('text/plain', 'bar'), + ('application/octet-stream', b'\x00\x80\xff\xfe'), + ('text/plain', 'baz'), + ]) + self.assertEqual(3, len(ra)) + self.assertFalse(ra.is_scalar) + ra.clear() + # Clearing an array RequestArgument should reduce its size to 0 + self.assertEqual('foo', ra.name) + self.assertEqual(0, len(ra)) + self.assertFalse(ra.is_scalar) + with self.assertRaises(IndexError): + ra.get_str() + def test_iterate_empty(self): """ Test iterating an empty RequestArgument. @@ -352,3 +403,127 @@ class TestRequestArguments(unittest.TestCase): # Make sure the returned value is immutable with self.assertRaises(TypeError): it.append('foo', 'bar') + with self.assertRaises(TypeError): + it.clear() + + def test_str_shorthand(self): + """ + Test the shorthand for get_str(0). + """ + ra = RequestArgument('foo', ('bar', 'baz')) + self.assertEqual('baz', str(ra)) + + def test_bytes_shorthand(self): + """ + Test the shorthand for get_bytes(0). + """ + ra = RequestArgument('foo', ('bar', b'\x00\x80\xff\xfe')) + self.assertEqual(b'\x00\x80\xff\xfe', bytes(ra)) + + # noinspection PyTypeChecker + def test_insert_garbage(self): + """ + Test proper handling with non-int indices and non-str/non-bytes data + :return: + """ + ra = RequestArgument('foo', 42) + with self.assertRaises(TypeError): + str(ra) + ra = RequestArgument('foo', (None, 42)) + with self.assertRaises(TypeError): + str(ra) + with self.assertRaises(TypeError): + bytes(ra) + with self.assertRaises(TypeError): + ra.get_content_type() + with self.assertRaises(TypeError): + ra.get_str('foo') + with self.assertRaises(TypeError): + ra.get_bytes('foo') + with self.assertRaises(TypeError): + ra.get_content_type('foo') + + def test_requestarguments_index(self): + """ + Make sure indexing a RequestArguments instance creates a new entry on the fly. + """ + ra = RequestArguments() + self.assertEqual(0, len(ra)) + self.assertFalse('foo' in ra) + # Create new entry + _ = ra['foo'] + self.assertEqual(1, len(ra)) + self.assertTrue('foo' in ra) + # Already exists, no new entry created + _ = ra['foo'] + self.assertEqual(1, len(ra)) + # Entry must be empty and mutable, and have the correct name + self.assertFalse(ra['foo'].is_view) + self.assertEqual(0, len(ra['foo'])) + self.assertEqual('foo', ra['foo'].name) + # Key must be a string + with self.assertRaises(TypeError): + # noinspection PyTypeChecker + _ = ra[42] + + def test_requestarguments_attr(self): + """ + Test attribute access syntactic sugar. + """ + ra = RequestArguments() + # Attribute should not exist yet + with self.assertRaises(KeyError): + _ = ra.foo + # Create entry + _ = ra['foo'] + # Creating entry should have created the attribute + self.assertEqual('foo', ra.foo.name) + # Attribute access should yield an immutable view + self.assertTrue(ra.foo.is_view) + + def test_requestarguments_iterate(self): + """ + Test iterating a RequestArguments instance. + """ + # Create an instance with some values + ra = RequestArguments() + ra['foo'].append('a', 'b') + ra['bar'].append('c', 'd') + ra['foo'].append('e', 'f') + # Container for test values (name, value) + items: Set[Tuple[str, str]] = set() + # Iterate RequestArguments instance, adding the name and value of each to the set + for a in ra: + items.add((a.name, str(a))) + # Compare result with expected value + self.assertEqual(2, len(items)) + self.assertIn(('foo', 'b'), items) + self.assertIn(('bar', 'd'), items) + + def test_requestarguments_full_use_case(self): + """ + Simulate a minimal RequestArguments use case. + """ + # Create empty RequestArguments instance + ra = RequestArguments() + # Parse GET request + getargs: Dict[str, List[str]] = urllib.parse.parse_qs('foo=42&bar=1337&foo=43&baz=Hello,%20World!') + # Insert GET arguments into RequestArguments + for k, vs in getargs.items(): + for v in vs: + ra[k].append('text/plain', v) + # Parse POST request + postargs: Dict[str, List[str]] = urllib.parse.parse_qs('foo=postfoo&postbar=42&foo=postfoo') + # Insert POST arguments into RequestArguments + for k, vs in postargs.items(): + # In this implementation, POST args replace GET args + ra[k].clear() + for v in vs: + ra[k].append('text/plain', v) + + # Someplace else: Use the RequestArguments instance. + self.assertEqual('1337', ra.bar.get_str()) + self.assertEqual('Hello, World!', ra.baz.get_str()) + self.assertEqual('42', ra.postbar.get_str()) + for a in ra.foo: + self.assertEqual('postfoo', a.get_str())