1
0
Fork 0
forked from s3lph/matemat

Implemented a container for RequestArgument instances, with some more unit tests.

This commit is contained in:
s3lph 2018-06-29 21:56:22 +02:00
parent 21a927046d
commit 2f927cec41
3 changed files with 303 additions and 18 deletions

View file

@ -6,5 +6,5 @@ API that can be used by 'pagelets' - single pages of a web service. If a reques
server will attempt to serve the request with a static resource in a previously configured webroot directory.
"""
from .requestargs import RequestArgument
from .requestargs import RequestArgument, RequestArguments
from .httpd import MatematWebserver, HttpHandler, pagelet

View file

@ -1,5 +1,79 @@
from typing import Iterator, List, Optional, Tuple, Union
from typing import Dict, Iterator, List, Tuple, Union
class RequestArguments(object):
"""
Container for HTTP Request arguments.
Usage:
# Create empty instance
ra = RequestArguments()
# Add an entry for the key 'foo' with the value 'bar' and Content-Type 'text/plain'
ra['foo'].append('text/plain', 'bar')
# Retrieve the value for the key 'foo', as a string...
foo = str(ra.foo)
# ... or as raw bytes
foo = bytes(ra.foo)
"""
def __init__(self) -> None:
"""
Create an empty container instance.
"""
self.__container: Dict[str, RequestArgument] = dict()
def __getitem__(self, key: str) -> 'RequestArgument':
"""
Retrieve the argument for the given name, creating it on the fly, if it doesn't exist.
:param key: Name of the argument to retrieve.
:return: A RequestArgument instance.
:raises TypeError: If key is not a string.
"""
if not isinstance(key, str):
raise TypeError('key must be a str')
# Create empty argument, if it doesn't exist
if key not in self.__container:
self.__container[key] = RequestArgument(key)
# Return the argument for the name
return self.__container[key]
def __getattr__(self, key: str) -> 'RequestArgument':
"""
Syntactic sugar for accessing values with a name that can be used in Python attributes. The value will be
returned as an immutable view.
:param key: Name of the argument to retrieve.
:return: An immutable view of the RequestArgument instance.
"""
return _View.of(self.__container[key])
def __iter__(self) -> Iterator['RequestArguments']:
"""
Returns an iterator over the values in this instance. Values are represented as immutable views.
:return: An iterator that yields immutable views of the values.
"""
for ra in self.__container.values():
# Yield an immutable scalar view for each value
yield _View.of(ra)
def __contains__(self, key: str) -> bool:
"""
Checks whether an argument with a given name exists in the RequestArguments instance.
:param key: The name to check whether it exists.
:return: True, if present, False otherwise.
"""
return key in self.__container
def __len__(self) -> int:
"""
:return: The number of arguments in this instance.
"""
return len(self.__container)
class RequestArgument(object):
@ -16,18 +90,16 @@ class RequestArgument(object):
Usage example:
qsargs = urllib.parse.parse_qs(qs, strict_parsing=True, keep_blank_values=True, errors='strict')
args: Dict[str, RequestArgument] = dict()
args: RequestArguments
for k, vs in qsargs:
args[k] = RequestArgument(k)
args[k].clear()
for v in vs:
# text/plain usually is a sensible choice for values decoded from urlencoded strings
# IF ALREADY IN STRING FORM (which parse_qs does)!
args[k].append('text/plain', v)
if 'username' in args and args['username'].is_scalar:
username: str = args['username'].get_str()
else:
raise ValueError()
if 'username' in args and args.username.is_scalar:
username = str(args.username)
"""
@ -91,12 +163,12 @@ class RequestArgument(object):
:return: An UTF-8 string representation of the requested value.
:raises UnicodeDecodeError: If the value cannot be decoded into an UTF-8 string.
:raises IndexError: If the index is out of bounds.
:raises ValueError: If the index is not an int.
:raises TypeError: If the index is not an int.
:raises TypeError: If the requested value is neither a str nor a bytes object.
"""
if not isinstance(index, int):
# Index must be an int
raise ValueError('index must not be None')
raise TypeError('index must be an int')
# Type hint; access array element
v: Tuple[str, Union[bytes, str]] = self.__value[index]
if isinstance(v[1], str):
@ -107,6 +179,14 @@ class RequestArgument(object):
return v[1].decode('utf-8')
raise TypeError('Value is neither a str nor bytes')
def __str__(self) -> str:
"""
Attempts to return the first value as a string.
:return: An UTF-8 string representation of the first value.
:raises UnicodeDecodeError: If the value cannot be decoded into an UTF-8 string.
"""
return self.get_str()
def get_bytes(self, index: int = 0) -> bytes:
"""
Attempts to return a value as a bytes object. The index defaults to 0.
@ -114,12 +194,12 @@ class RequestArgument(object):
:param index: The index of the value to retrieve. Default: 0.
:return: A bytes object representation of the requested value. Strings will be encoded as UTF-8.
:raises IndexError: If the index is out of bounds.
:raises ValueError: If the index is not an int.
:raises TypeError: If the index is not an int.
:raises TypeError: If the requested value is neither a str nor a bytes object.
"""
if not isinstance(index, int):
# Index must be a int
raise ValueError('index must not be None')
raise TypeError('index must be an int')
# Type hint; access array element
v: Tuple[str, Union[bytes, str]] = self.__value[index]
if isinstance(v[1], bytes):
@ -130,6 +210,13 @@ class RequestArgument(object):
return v[1].encode('utf-8')
raise TypeError('Value is neither a str nor bytes')
def __bytes__(self) -> bytes:
"""
Attempts to return the first value as a bytes object.
:return: A bytes string representation of the first value.
"""
return self.get_bytes()
def get_content_type(self, index: int = 0) -> str:
"""
Attempts to retrieve a value's Content-Type. The index defaults to 0.
@ -137,18 +224,20 @@ class RequestArgument(object):
:param index: The index of the value to retrieve. Default: 0.
:return: The Content-Type of the requested value, as sent by the client. Not necessarily trustworthy.
:raises IndexError: If the index is out of bounds.
:raises ValueError: If the index is not an int.
:raises TypeError: If the index is not an int.
"""
# instance is an array value
if not isinstance(index, int):
# Needs an index for array values
raise ValueError('index must not be None')
raise TypeError('index must be an int')
# Type hint; access array element
va: Tuple[str, Union[bytes, str]] = self.__value[index]
# Return the content type of the requested value
if not isinstance(va[0], str):
raise TypeError('Content-Type is not a str')
return va[0]
def append(self, ctype: str, value: Union[str, bytes]):
def append(self, ctype: str, value: Union[str, bytes]) -> None:
"""
Append a value to this instance. Turns an empty argument into a scalar and a scalar into an array.
@ -161,6 +250,17 @@ class RequestArgument(object):
raise TypeError('A RequestArgument view is immutable!')
self.__value.append((ctype, value))
def clear(self) -> None:
"""
Remove all values from this instance.
:raises TypeError: If called on an immutable view.
"""
if self.is_view:
# This is an immutable view, raise exception
raise TypeError('A RequestArgument view is immutable!')
self.__value.clear()
def __len__(self) -> int:
"""
:return: Number of values for this argument.
@ -178,7 +278,7 @@ class RequestArgument(object):
# Yield an immutable scalar view for each (ctype, value) element in the array
yield _View(self.__name, v)
def __getitem__(self, index: Union[int, slice]):
def __getitem__(self, index: Union[int, slice]) -> 'RequestArgument':
"""
Index the argument with either an int or a slice. The returned values are represented as immutable
RequestArgument views.
@ -206,6 +306,16 @@ class _View(RequestArgument):
"""
super().__init__(name, value)
@staticmethod
def of(argument: 'RequestArgument') ->'RequestArgument':
"""
Create an immutable, unsliced view of an RequestArgument instance.
:param argument: The RequestArgument instance to create a view of.
:return: An immutable view of the provided RequestArgument instance.
"""
return argument[:]
@property
def is_view(self) -> bool:
"""

View file

@ -1,9 +1,10 @@
from typing import List
from typing import Dict, List, Set, Tuple
import unittest
import urllib.parse
from matemat.webserver import RequestArgument
from matemat.webserver import RequestArgument, RequestArguments
# noinspection PyProtectedMember
from matemat.webserver.requestargs import _View
@ -228,6 +229,56 @@ class TestRequestArguments(unittest.TestCase):
self.assertEqual(b'\x00\x80\xff\xfe', ra.get_bytes(1))
self.assertEqual(b'Hello, World!', ra.get_bytes(2))
def test_clear_empty(self):
"""
Test clearing an empty RequestArgument.
"""
# Initialize the empty RequestArgument
ra = RequestArgument('foo')
self.assertEqual(0, len(ra))
self.assertFalse(ra.is_scalar)
ra.clear()
# Clearing an empty RequestArgument shouldn't have any effect
self.assertEqual('foo', ra.name)
self.assertEqual(0, len(ra))
self.assertFalse(ra.is_scalar)
def test_clear_scalar(self):
"""
Test clearing a scalar RequestArgument.
"""
# Initialize the scalar RequestArgument
ra = RequestArgument('foo', ('text/plain', 'bar'))
self.assertEqual(1, len(ra))
self.assertTrue(ra.is_scalar)
ra.clear()
# Clearing a scalar RequestArgument should reduce its size to 0
self.assertEqual('foo', ra.name)
self.assertEqual(0, len(ra))
self.assertFalse(ra.is_scalar)
with self.assertRaises(IndexError):
ra.get_str()
def test_clear_array(self):
"""
Test clearing an array RequestArgument.
"""
# Initialize the array RequestArgument
ra = RequestArgument('foo', [
('text/plain', 'bar'),
('application/octet-stream', b'\x00\x80\xff\xfe'),
('text/plain', 'baz'),
])
self.assertEqual(3, len(ra))
self.assertFalse(ra.is_scalar)
ra.clear()
# Clearing an array RequestArgument should reduce its size to 0
self.assertEqual('foo', ra.name)
self.assertEqual(0, len(ra))
self.assertFalse(ra.is_scalar)
with self.assertRaises(IndexError):
ra.get_str()
def test_iterate_empty(self):
"""
Test iterating an empty RequestArgument.
@ -352,3 +403,127 @@ class TestRequestArguments(unittest.TestCase):
# Make sure the returned value is immutable
with self.assertRaises(TypeError):
it.append('foo', 'bar')
with self.assertRaises(TypeError):
it.clear()
def test_str_shorthand(self):
"""
Test the shorthand for get_str(0).
"""
ra = RequestArgument('foo', ('bar', 'baz'))
self.assertEqual('baz', str(ra))
def test_bytes_shorthand(self):
"""
Test the shorthand for get_bytes(0).
"""
ra = RequestArgument('foo', ('bar', b'\x00\x80\xff\xfe'))
self.assertEqual(b'\x00\x80\xff\xfe', bytes(ra))
# noinspection PyTypeChecker
def test_insert_garbage(self):
"""
Test proper handling with non-int indices and non-str/non-bytes data
:return:
"""
ra = RequestArgument('foo', 42)
with self.assertRaises(TypeError):
str(ra)
ra = RequestArgument('foo', (None, 42))
with self.assertRaises(TypeError):
str(ra)
with self.assertRaises(TypeError):
bytes(ra)
with self.assertRaises(TypeError):
ra.get_content_type()
with self.assertRaises(TypeError):
ra.get_str('foo')
with self.assertRaises(TypeError):
ra.get_bytes('foo')
with self.assertRaises(TypeError):
ra.get_content_type('foo')
def test_requestarguments_index(self):
"""
Make sure indexing a RequestArguments instance creates a new entry on the fly.
"""
ra = RequestArguments()
self.assertEqual(0, len(ra))
self.assertFalse('foo' in ra)
# Create new entry
_ = ra['foo']
self.assertEqual(1, len(ra))
self.assertTrue('foo' in ra)
# Already exists, no new entry created
_ = ra['foo']
self.assertEqual(1, len(ra))
# Entry must be empty and mutable, and have the correct name
self.assertFalse(ra['foo'].is_view)
self.assertEqual(0, len(ra['foo']))
self.assertEqual('foo', ra['foo'].name)
# Key must be a string
with self.assertRaises(TypeError):
# noinspection PyTypeChecker
_ = ra[42]
def test_requestarguments_attr(self):
"""
Test attribute access syntactic sugar.
"""
ra = RequestArguments()
# Attribute should not exist yet
with self.assertRaises(KeyError):
_ = ra.foo
# Create entry
_ = ra['foo']
# Creating entry should have created the attribute
self.assertEqual('foo', ra.foo.name)
# Attribute access should yield an immutable view
self.assertTrue(ra.foo.is_view)
def test_requestarguments_iterate(self):
"""
Test iterating a RequestArguments instance.
"""
# Create an instance with some values
ra = RequestArguments()
ra['foo'].append('a', 'b')
ra['bar'].append('c', 'd')
ra['foo'].append('e', 'f')
# Container for test values (name, value)
items: Set[Tuple[str, str]] = set()
# Iterate RequestArguments instance, adding the name and value of each to the set
for a in ra:
items.add((a.name, str(a)))
# Compare result with expected value
self.assertEqual(2, len(items))
self.assertIn(('foo', 'b'), items)
self.assertIn(('bar', 'd'), items)
def test_requestarguments_full_use_case(self):
"""
Simulate a minimal RequestArguments use case.
"""
# Create empty RequestArguments instance
ra = RequestArguments()
# Parse GET request
getargs: Dict[str, List[str]] = urllib.parse.parse_qs('foo=42&bar=1337&foo=43&baz=Hello,%20World!')
# Insert GET arguments into RequestArguments
for k, vs in getargs.items():
for v in vs:
ra[k].append('text/plain', v)
# Parse POST request
postargs: Dict[str, List[str]] = urllib.parse.parse_qs('foo=postfoo&postbar=42&foo=postfoo')
# Insert POST arguments into RequestArguments
for k, vs in postargs.items():
# In this implementation, POST args replace GET args
ra[k].clear()
for v in vs:
ra[k].append('text/plain', v)
# Someplace else: Use the RequestArguments instance.
self.assertEqual('1337', ra.bar.get_str())
self.assertEqual('Hello, World!', ra.baz.get_str())
self.assertEqual('42', ra.postbar.get_str())
for a in ra.foo:
self.assertEqual('postfoo', a.get_str())