Added lots of unit tests for the database facade, and already fixed some bugs.
This commit is contained in:
parent
c238e8e9c8
commit
01b0b95652
7 changed files with 257 additions and 36 deletions
|
@ -8,4 +8,4 @@ test:
|
|||
- pip3 install wheel
|
||||
- pip3 install -r requirements.txt
|
||||
- python3-coverage run --branch -m unittest discover matemat
|
||||
- python3-coverage report -m
|
||||
- python3-coverage report -m --include 'matemat/*' --omit '*/test_*.py'
|
||||
|
|
|
@ -56,10 +56,6 @@ class DatabaseFacade(object):
|
|||
'admin': admin,
|
||||
'member': member
|
||||
})
|
||||
affected = c.execute('SELECT changes()').fetchone()[0]
|
||||
if affected != 1:
|
||||
raise DatabaseConsistencyError(
|
||||
f'create_user should affect 1 users row, but affected {affected}')
|
||||
c.execute('SELECT last_insert_rowid()')
|
||||
user_id = int(c.fetchone()[0])
|
||||
return User(user_id, username, email, admin, member)
|
||||
|
@ -100,10 +96,6 @@ class DatabaseFacade(object):
|
|||
'user_id': user.id,
|
||||
'pwhash': pwhash
|
||||
})
|
||||
affected = c.execute('SELECT changes()').fetchone()[0]
|
||||
if affected != 1:
|
||||
raise DatabaseConsistencyError(
|
||||
f'change_password should affect 1 users row, but affected {affected}')
|
||||
|
||||
def change_touchkey(self, user: User, password: str, touchkey: Optional[str], verify_password: bool = True):
|
||||
with self.db.transaction() as c:
|
||||
|
@ -122,10 +114,6 @@ class DatabaseFacade(object):
|
|||
'user_id': user.id,
|
||||
'tkhash': tkhash
|
||||
})
|
||||
affected = c.execute('SELECT changes()').fetchone()[0]
|
||||
if affected != 1:
|
||||
raise DatabaseConsistencyError(
|
||||
f'change_touchkey should affect 1 users row, but affected {affected}')
|
||||
|
||||
def change_user(self, user: User):
|
||||
with self.db.transaction() as c:
|
||||
|
@ -162,7 +150,7 @@ class DatabaseFacade(object):
|
|||
products: List[Product] = []
|
||||
with self.db.transaction(exclusive=False) as c:
|
||||
for row in c.execute('''
|
||||
SELECT product_id, name, price_member, price_external
|
||||
SELECT product_id, name, price_member, price_non_member
|
||||
FROM products
|
||||
'''):
|
||||
product_id, name, price_member, price_external = row
|
||||
|
@ -184,16 +172,10 @@ class DatabaseFacade(object):
|
|||
'price_non_member': price_non_member
|
||||
})
|
||||
c.execute('SELECT last_insert_rowid()')
|
||||
affected = c.execute('SELECT changes()').fetchone()[0]
|
||||
if affected != 1:
|
||||
raise DatabaseConsistencyError(
|
||||
f'create_product should affect 1 products row, but affected {affected}')
|
||||
product_id = int(c.fetchone()[0])
|
||||
return Product(product_id, name, price_member, price_non_member)
|
||||
|
||||
def change_product(self, product: Product):
|
||||
if product.id == -1:
|
||||
raise ValueError('Invalid product ID')
|
||||
with self.db.transaction() as c:
|
||||
c.execute('''
|
||||
UPDATE products
|
||||
|
@ -201,9 +183,10 @@ class DatabaseFacade(object):
|
|||
name = :name,
|
||||
price_member = :price_member,
|
||||
price_non_member = :price_non_member
|
||||
WHERE product_id = :product_is
|
||||
WHERE product_id = :product_id
|
||||
''', {
|
||||
'product_id': product.id,
|
||||
'name': product.name,
|
||||
'price_member': product.price_member,
|
||||
'price_non_member': product.price_non_member
|
||||
})
|
||||
|
@ -230,7 +213,10 @@ class DatabaseFacade(object):
|
|||
FROM consumption
|
||||
WHERE user_id = :user_id
|
||||
AND product_id = :product_id
|
||||
''')
|
||||
''', {
|
||||
'user_id': user.id,
|
||||
'product_id': product.id
|
||||
})
|
||||
row = c.fetchone()
|
||||
if row is None:
|
||||
c.execute('''
|
||||
|
@ -280,8 +266,6 @@ class DatabaseFacade(object):
|
|||
f'increment_consumption should affect 1 products row, but affected {affected}')
|
||||
|
||||
def restock(self, product: Product, count: int):
|
||||
if product.id == -1:
|
||||
raise ValueError('Invalid product ID')
|
||||
with self.db.transaction() as c:
|
||||
c.execute('''
|
||||
UPDATE products
|
||||
|
@ -296,6 +280,8 @@ class DatabaseFacade(object):
|
|||
raise DatabaseConsistencyError(f'restock should affect 1 products row, but affected {affected}')
|
||||
|
||||
def deposit(self, user: User, amount: int):
|
||||
if amount < 0:
|
||||
raise ValueError('Cannot deposit a negative value')
|
||||
with self.db.transaction() as c:
|
||||
c.execute('''
|
||||
UPDATE users
|
||||
|
|
|
@ -14,23 +14,43 @@ class DatabaseTest(unittest.TestCase):
|
|||
|
||||
def test_create_user(self):
|
||||
with self.db as db:
|
||||
with db.transaction() as c:
|
||||
with db.transaction(exclusive=False) as c:
|
||||
db.create_user('testuser', 'supersecurepassword', 'testuser@example.com')
|
||||
c.execute("SELECT * FROM users")
|
||||
row = c.fetchone()
|
||||
if row is None:
|
||||
self.fail()
|
||||
self.assertEqual('testuser', row[1])
|
||||
self.assertEqual('testuser@example.com', row[2])
|
||||
self.assertEqual(0, row[5])
|
||||
self.assertEqual(1, row[6])
|
||||
|
||||
def test_create_existing_user(self):
|
||||
with self.db as db:
|
||||
db.create_user('testuser', 'supersecurepassword', 'testuser@example.com')
|
||||
with self.assertRaises(ValueError):
|
||||
db.create_user('testuser', 'supersecurepassword2', 'testuser2@example.com')
|
||||
|
||||
def test_list_users(self):
|
||||
with self.db as db:
|
||||
users = db.list_users()
|
||||
self.assertEqual(0, len(users))
|
||||
db.create_user('testuser', 'supersecurepassword', 'testuser@example.com', True, True)
|
||||
db.create_user('anothertestuser', 'otherpassword', 'anothertestuser@example.com', False, True)
|
||||
db.create_user('yatu', 'igotapasswordtoo', 'yatu@example.com', False, False)
|
||||
users = db.list_users()
|
||||
self.assertEqual(3, len(users))
|
||||
usercheck = {}
|
||||
for user in users:
|
||||
if user.name == 'testuser':
|
||||
self.assertEqual('testuser@example.com', user.email)
|
||||
self.assertTrue(user.is_member)
|
||||
self.assertTrue(user.is_admin)
|
||||
elif user.name == 'anothertestuser':
|
||||
self.assertEqual('anothertestuser@example.com', user.email)
|
||||
self.assertTrue(user.is_member)
|
||||
self.assertFalse(user.is_admin)
|
||||
elif user.name == 'yatu':
|
||||
self.assertEqual('yatu@example.com', user.email)
|
||||
self.assertFalse(user.is_member)
|
||||
self.assertFalse(user.is_admin)
|
||||
usercheck[user.id] = 1
|
||||
self.assertEqual(3, len(usercheck))
|
||||
|
||||
def test_login(self):
|
||||
with self.db as db:
|
||||
with db.transaction() as c:
|
||||
|
@ -60,7 +80,6 @@ class DatabaseTest(unittest.TestCase):
|
|||
# Both password and touchkey should fail
|
||||
db.login('testuser', password='supersecurepassword', touchkey='0123')
|
||||
|
||||
|
||||
def test_change_password(self):
|
||||
with self.db as db:
|
||||
user = db.create_user('testuser', 'supersecurepassword', 'testuser@example.com')
|
||||
|
@ -79,6 +98,10 @@ class DatabaseTest(unittest.TestCase):
|
|||
with self.assertRaises(AuthenticationError):
|
||||
db.login('testuser', 'mynewpassword')
|
||||
db.login('testuser', 'adminpasswordreset')
|
||||
user._user_id = -1
|
||||
with self.assertRaises(AuthenticationError):
|
||||
# Password change for an inexistent user should fail
|
||||
db.change_password(user, 'adminpasswordreset', 'passwordwithoutuser')
|
||||
|
||||
def test_change_touchkey(self):
|
||||
with self.db as db:
|
||||
|
@ -99,3 +122,192 @@ class DatabaseTest(unittest.TestCase):
|
|||
with self.assertRaises(AuthenticationError):
|
||||
db.login('testuser', touchkey='4567')
|
||||
db.login('testuser', touchkey='89ab')
|
||||
user._user_id = -1
|
||||
with self.assertRaises(AuthenticationError):
|
||||
# Touchkey change for an inexistent user should fail
|
||||
db.change_touchkey(user, '89ab', '048c')
|
||||
|
||||
def test_change_user(self):
|
||||
with self.db as db:
|
||||
user = db.create_user('testuser', 'supersecurepassword', 'testuser@example.com', True, True)
|
||||
user.email = 'newaddress@example.com'
|
||||
user.is_admin = False
|
||||
user.is_member = False
|
||||
db.change_user(user)
|
||||
checkuser = db.login('testuser', 'supersecurepassword')
|
||||
self.assertEqual('newaddress@example.com', checkuser.email)
|
||||
self.assertFalse(checkuser.is_admin)
|
||||
self.assertFalse(checkuser.is_member)
|
||||
user._user_id = -1
|
||||
with self.assertRaises(DatabaseConsistencyError):
|
||||
db.change_user(user)
|
||||
|
||||
def test_delete_user(self):
|
||||
with self.db as db:
|
||||
user = db.create_user('testuser', 'supersecurepassword', 'testuser@example.com', True, True)
|
||||
db.login('testuser', 'supersecurepassword')
|
||||
db.delete_user(user)
|
||||
try:
|
||||
# Should fail, as the user does not exist anymore
|
||||
db.login('testuser', 'supersecurepassword')
|
||||
except AuthenticationError as e:
|
||||
self.assertEqual('User does not exist', e.msg)
|
||||
with self.assertRaises(DatabaseConsistencyError):
|
||||
# Should fail, as the user does not exist anymore
|
||||
db.delete_user(user)
|
||||
|
||||
def test_create_product(self):
|
||||
with self.db as db:
|
||||
with db.transaction() as c:
|
||||
db.create_product('Club Mate', 200, 200)
|
||||
c.execute("SELECT * FROM products")
|
||||
row = c.fetchone()
|
||||
self.assertEqual('Club Mate', row[1])
|
||||
self.assertEqual(0, row[2])
|
||||
self.assertEqual(200, row[3])
|
||||
self.assertEqual(200, row[4])
|
||||
with self.assertRaises(ValueError):
|
||||
db.create_product('Club Mate', 250, 250)
|
||||
|
||||
def test_list_products(self):
|
||||
with self.db as db:
|
||||
# Test empty list
|
||||
products = db.list_products()
|
||||
self.assertEqual(0, len(products))
|
||||
db.create_product('Club Mate', 200, 200)
|
||||
db.create_product('Flora Power Mate', 200, 200)
|
||||
db.create_product('Fritz Mate', 200, 250)
|
||||
products = db.list_products()
|
||||
self.assertEqual(3, len(products))
|
||||
productcheck = {}
|
||||
for product in products:
|
||||
if product.name == 'Club Mate':
|
||||
self.assertEqual(200, product.price_member)
|
||||
self.assertEqual(200, product.price_non_member)
|
||||
elif product.name == 'Flora Power Mate':
|
||||
self.assertEqual(200, product.price_member)
|
||||
self.assertEqual(200, product.price_non_member)
|
||||
elif product.name == 'Fritz Mate':
|
||||
self.assertEqual(200, product.price_member)
|
||||
self.assertEqual(250, product.price_non_member)
|
||||
productcheck[product.id] = 1
|
||||
self.assertEqual(3, len(productcheck))
|
||||
|
||||
def test_change_product(self):
|
||||
with self.db as db:
|
||||
product = db.create_product('Club Mate', 200, 200)
|
||||
product.name = 'Flora Power Mate'
|
||||
product.price_member = 150
|
||||
product.price_non_member = 250
|
||||
db.change_product(product)
|
||||
checkproduct = db.list_products()[0]
|
||||
self.assertEqual('Flora Power Mate', checkproduct.name)
|
||||
self.assertEqual(150, checkproduct.price_member)
|
||||
self.assertEqual(250, checkproduct.price_non_member)
|
||||
product._product_id = -1
|
||||
with self.assertRaises(DatabaseConsistencyError):
|
||||
db.change_product(product)
|
||||
product2 = db.create_product('Club Mate', 200, 200)
|
||||
product2.name = 'Flora Power Mate'
|
||||
with self.assertRaises(DatabaseConsistencyError):
|
||||
# Should fail, as a product with the same name already exists.
|
||||
db.change_product(product2)
|
||||
|
||||
def test_delete_product(self):
|
||||
with self.db as db:
|
||||
product = db.create_product('Club Mate', 200, 200)
|
||||
product2 = db.create_product('Flora Power Mate', 200, 200)
|
||||
|
||||
self.assertEqual(2, len(db.list_products()))
|
||||
db.delete_product(product)
|
||||
|
||||
# Only the other product should remain in the DB
|
||||
products = db.list_products()
|
||||
self.assertEqual(1, len(products))
|
||||
self.assertEqual(product2, products[0])
|
||||
|
||||
with self.assertRaises(DatabaseConsistencyError):
|
||||
# Should fail, as the product does not exist anymore
|
||||
db.delete_product(product)
|
||||
|
||||
def test_deposit(self):
|
||||
with self.db as db:
|
||||
with db.transaction() as c:
|
||||
user = db.create_user('testuser', 'supersecurepassword', 'testuser@example.com', True, True)
|
||||
user2 = db.create_user('testuser2', 'supersecurepassword', 'testuser@example.com', True, True)
|
||||
c.execute('''SELECT balance FROM users WHERE user_id = ?''', [user.id])
|
||||
self.assertEqual(0, c.fetchone()[0])
|
||||
c.execute('''SELECT balance FROM users WHERE user_id = ?''', [user2.id])
|
||||
self.assertEqual(0, c.fetchone()[0])
|
||||
db.deposit(user, 1337)
|
||||
c.execute('''SELECT balance FROM users WHERE user_id = ?''', [user.id])
|
||||
self.assertEqual(1337, c.fetchone()[0])
|
||||
c.execute('''SELECT balance FROM users WHERE user_id = ?''', [user2.id])
|
||||
self.assertEqual(0, c.fetchone()[0])
|
||||
with self.assertRaises(ValueError):
|
||||
# Should fail, negative amount
|
||||
db.deposit(user, -42)
|
||||
user._user_id = -1
|
||||
with self.assertRaises(DatabaseConsistencyError):
|
||||
# Should fail, user id -1 does not exist
|
||||
db.deposit(user, 42)
|
||||
|
||||
def test_restock(self):
|
||||
with self.db as db:
|
||||
with db.transaction() as c:
|
||||
product = db.create_product('Club Mate', 200, 200)
|
||||
product2 = db.create_product('Flora Power Mate', 200, 200)
|
||||
c.execute('''SELECT stock FROM products WHERE product_id = ?''', [product.id])
|
||||
self.assertEqual(0, c.fetchone()[0])
|
||||
c.execute('''SELECT stock FROM products WHERE product_id = ?''', [product2.id])
|
||||
self.assertEqual(0, c.fetchone()[0])
|
||||
db.restock(product, 42)
|
||||
c.execute('''SELECT stock FROM products WHERE product_id = ?''', [product.id])
|
||||
self.assertEqual(42, c.fetchone()[0])
|
||||
c.execute('''SELECT stock FROM products WHERE product_id = ?''', [product2.id])
|
||||
self.assertEqual(0, c.fetchone()[0])
|
||||
product._product_id = -1
|
||||
with self.assertRaises(DatabaseConsistencyError):
|
||||
# Should fail, product id -1 does not exist
|
||||
db.restock(product, 42)
|
||||
|
||||
def test_consumption(self):
|
||||
with self.db as db:
|
||||
# Set up test case
|
||||
user1 = db.create_user('user1', 'supersecurepassword', 'testuser@example.com', member=True)
|
||||
user2 = db.create_user('user2', 'supersecurepassword', 'testuser@example.com', member=False)
|
||||
user3 = db.create_user('user3', 'supersecurepassword', 'testuser@example.com', member=False)
|
||||
db.deposit(user1, 1337)
|
||||
db.deposit(user2, 4242)
|
||||
db.deposit(user3, 1234)
|
||||
clubmate = db.create_product('Club Mate', 200, 200)
|
||||
florapowermate = db.create_product('Flora Power Mate', 150, 250)
|
||||
fritzmate = db.create_product('Fritz Mate', 200, 200)
|
||||
db.restock(clubmate, 50)
|
||||
db.restock(florapowermate, 70)
|
||||
db.restock(fritzmate, 10)
|
||||
|
||||
# user1 is somewhat addicted to caffeine
|
||||
db.increment_consumption(user1, clubmate, 1)
|
||||
db.increment_consumption(user1, clubmate, 2)
|
||||
db.increment_consumption(user1, florapowermate, 3)
|
||||
|
||||
# user2 is reeeally addicted
|
||||
db.increment_consumption(user2, clubmate, 7)
|
||||
db.increment_consumption(user2, florapowermate, 3)
|
||||
db.increment_consumption(user2, florapowermate, 4)
|
||||
|
||||
with db.transaction(exclusive=False) as c:
|
||||
c.execute('''SELECT balance FROM users WHERE user_id = ?''', [user1.id])
|
||||
self.assertEqual(1337 - 200 * 3 - 150 * 3, c.fetchone()[0])
|
||||
c.execute('''SELECT balance FROM users WHERE user_id = ?''', [user2.id])
|
||||
self.assertEqual(4242 - 200 * 7 - 250 * 7, c.fetchone()[0])
|
||||
c.execute('''SELECT balance FROM users WHERE user_id = ?''', [user3.id])
|
||||
self.assertEqual(1234, c.fetchone()[0])
|
||||
|
||||
c.execute('''SELECT stock FROM products WHERE product_id = ?''', [clubmate.id])
|
||||
self.assertEqual(40, c.fetchone()[0])
|
||||
c.execute('''SELECT stock FROM products WHERE product_id = ?''', [florapowermate.id])
|
||||
self.assertEqual(60, c.fetchone()[0])
|
||||
c.execute('''SELECT stock FROM products WHERE product_id = ?''', [fritzmate.id])
|
||||
self.assertEqual(10, c.fetchone()[0])
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
|
||||
import apsw
|
||||
|
||||
from matemat.exceptions import DatabaseConsistencyError
|
||||
|
||||
|
||||
class Transaction(object):
|
||||
|
||||
|
@ -33,6 +35,8 @@ class Transaction(object):
|
|||
else:
|
||||
self._cursor.execute('ROLLBACK')
|
||||
self._wrapper._in_transaction = False
|
||||
if exc_type == apsw.ConstraintError:
|
||||
raise DatabaseConsistencyError(str(exc_val))
|
||||
|
||||
|
||||
class DatabaseWrapper(object):
|
||||
|
|
|
@ -5,7 +5,7 @@ class AuthenticationError(BaseException):
|
|||
self._msg = msg
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f'AuthenticationErro: {self._msg}'
|
||||
return f'AuthenticationError: {self._msg}'
|
||||
|
||||
@property
|
||||
def msg(self) -> str:
|
||||
|
|
|
@ -1,4 +1,3 @@
|
|||
|
||||
class Product(object):
|
||||
|
||||
def __init__(self,
|
||||
|
@ -11,6 +10,14 @@ class Product(object):
|
|||
self._price_member: int = price_member
|
||||
self._price_non_member: int = price_non_member
|
||||
|
||||
def __eq__(self, other):
|
||||
if other is None or not isinstance(other, Product):
|
||||
return False
|
||||
return self._product_id == other._product_id \
|
||||
and self._name == other._name \
|
||||
and self._price_member == other._price_member \
|
||||
and self._price_non_member == other._price_non_member
|
||||
|
||||
@property
|
||||
def id(self) -> int:
|
||||
return self._product_id
|
||||
|
@ -19,6 +26,11 @@ class Product(object):
|
|||
def name(self) -> str:
|
||||
return self._name
|
||||
|
||||
@name.setter
|
||||
def name(self, name: str):
|
||||
self._name = name
|
||||
|
||||
|
||||
@property
|
||||
def price_member(self) -> int:
|
||||
return self._price_member
|
||||
|
|
|
@ -10,14 +10,21 @@ class User(object):
|
|||
email: Optional[str] = None,
|
||||
admin: bool = False,
|
||||
member: bool = True):
|
||||
if user_id == -1:
|
||||
raise ValueError('Invalid user ID')
|
||||
self._user_id: int = user_id
|
||||
self._username: str = username
|
||||
self._email: Optional[str] = email
|
||||
self._admin: bool = admin
|
||||
self._member: bool = member
|
||||
|
||||
def __eq__(self, other):
|
||||
if other is None or not isinstance(other, User):
|
||||
return False
|
||||
return self._user_id == other._user_id \
|
||||
and self._username == other._username \
|
||||
and self._email == other._email \
|
||||
and self._admin == other._admin \
|
||||
and self._member == other._member
|
||||
|
||||
@property
|
||||
def id(self) -> int:
|
||||
return self._user_id
|
||||
|
|
Loading…
Reference in a new issue