314 lines
15 KiB
Python
314 lines
15 KiB
Python
|
|
import unittest
|
|
|
|
import crypt
|
|
|
|
from matemat.db import MatematDatabase
|
|
from matemat.exceptions import AuthenticationError, DatabaseConsistencyError
|
|
|
|
|
|
class DatabaseTest(unittest.TestCase):
|
|
|
|
def setUp(self) -> None:
|
|
# Create an in-memory database for testing
|
|
self.db = MatematDatabase(':memory:')
|
|
|
|
def test_create_user(self) -> None:
|
|
with self.db as db:
|
|
with db.transaction(exclusive=False) as c:
|
|
db.create_user('testuser', 'supersecurepassword', 'testuser@example.com')
|
|
c.execute("SELECT * FROM users")
|
|
row = c.fetchone()
|
|
self.assertEqual('testuser', row[1])
|
|
self.assertEqual('testuser@example.com', row[2])
|
|
self.assertEqual(0, row[5])
|
|
self.assertEqual(1, row[6])
|
|
with self.assertRaises(ValueError):
|
|
db.create_user('testuser', 'supersecurepassword2', 'testuser2@example.com')
|
|
|
|
def test_list_users(self) -> None:
|
|
with self.db as db:
|
|
users = db.list_users()
|
|
self.assertEqual(0, len(users))
|
|
db.create_user('testuser', 'supersecurepassword', 'testuser@example.com', True, True)
|
|
db.create_user('anothertestuser', 'otherpassword', 'anothertestuser@example.com', False, True)
|
|
db.create_user('yatu', 'igotapasswordtoo', 'yatu@example.com', False, False)
|
|
users = db.list_users()
|
|
self.assertEqual(3, len(users))
|
|
usercheck = {}
|
|
for user in users:
|
|
if user.name == 'testuser':
|
|
self.assertEqual('testuser@example.com', user.email)
|
|
self.assertTrue(user.is_member)
|
|
self.assertTrue(user.is_admin)
|
|
elif user.name == 'anothertestuser':
|
|
self.assertEqual('anothertestuser@example.com', user.email)
|
|
self.assertTrue(user.is_member)
|
|
self.assertFalse(user.is_admin)
|
|
elif user.name == 'yatu':
|
|
self.assertEqual('yatu@example.com', user.email)
|
|
self.assertFalse(user.is_member)
|
|
self.assertFalse(user.is_admin)
|
|
usercheck[user.id] = 1
|
|
self.assertEqual(3, len(usercheck))
|
|
|
|
def test_login(self) -> None:
|
|
with self.db as db:
|
|
with db.transaction() as c:
|
|
u = db.create_user('testuser', 'supersecurepassword', 'testuser@example.com')
|
|
# Add a touchkey without using the provided function
|
|
c.execute('''UPDATE users SET touchkey = :tkhash WHERE user_id = :user_id''', {
|
|
'tkhash': crypt.crypt('0123', crypt.mksalt()),
|
|
'user_id': u.id
|
|
})
|
|
user = db.login('testuser', 'supersecurepassword')
|
|
self.assertEqual(u.id, user.id)
|
|
user = db.login('testuser', touchkey='0123')
|
|
self.assertEqual(u.id, user.id)
|
|
with self.assertRaises(AuthenticationError):
|
|
# Inexistent user should fail
|
|
db.login('nooone', 'supersecurepassword')
|
|
with self.assertRaises(AuthenticationError):
|
|
# Wrong password should fail
|
|
db.login('testuser', 'anothersecurepassword')
|
|
with self.assertRaises(AuthenticationError):
|
|
# Wrong touchkey should fail
|
|
db.login('testuser', touchkey='0124')
|
|
with self.assertRaises(ValueError):
|
|
# No password or touchkey should fail
|
|
db.login('testuser')
|
|
with self.assertRaises(ValueError):
|
|
# Both password and touchkey should fail
|
|
db.login('testuser', password='supersecurepassword', touchkey='0123')
|
|
|
|
def test_change_password(self) -> None:
|
|
with self.db as db:
|
|
user = db.create_user('testuser', 'supersecurepassword', 'testuser@example.com')
|
|
db.login('testuser', 'supersecurepassword')
|
|
# Normal password change should succeed
|
|
db.change_password(user, 'supersecurepassword', 'mynewpassword')
|
|
with self.assertRaises(AuthenticationError):
|
|
db.login('testuser', 'supersecurepassword')
|
|
db.login('testuser', 'mynewpassword')
|
|
with self.assertRaises(AuthenticationError):
|
|
# Should fail due to wrong old password
|
|
db.change_password(user, 'iforgotmypassword', 'mynewpassword')
|
|
db.login('testuser', 'mynewpassword')
|
|
# This should pass even though the old password is not known (admin password reset)
|
|
db.change_password(user, '42', 'adminpasswordreset', verify_password=False)
|
|
with self.assertRaises(AuthenticationError):
|
|
db.login('testuser', 'mynewpassword')
|
|
db.login('testuser', 'adminpasswordreset')
|
|
user.id = -1
|
|
with self.assertRaises(AuthenticationError):
|
|
# Password change for an inexistent user should fail
|
|
db.change_password(user, 'adminpasswordreset', 'passwordwithoutuser')
|
|
|
|
def test_change_touchkey(self) -> None:
|
|
with self.db as db:
|
|
user = db.create_user('testuser', 'supersecurepassword', 'testuser@example.com')
|
|
db.change_touchkey(user, 'supersecurepassword', '0123')
|
|
db.login('testuser', touchkey='0123')
|
|
# Normal touchkey change should succeed
|
|
db.change_touchkey(user, 'supersecurepassword', touchkey='4567')
|
|
with self.assertRaises(AuthenticationError):
|
|
db.login('testuser', touchkey='0123')
|
|
db.login('testuser', touchkey='4567')
|
|
with self.assertRaises(AuthenticationError):
|
|
# Should fail due to wrong old password
|
|
db.change_touchkey(user, 'iforgotmypassword', '89ab')
|
|
db.login('testuser', touchkey='4567')
|
|
# This should pass even though the old password is not known (admin password reset)
|
|
db.change_touchkey(user, '42', '89ab', verify_password=False)
|
|
with self.assertRaises(AuthenticationError):
|
|
db.login('testuser', touchkey='4567')
|
|
db.login('testuser', touchkey='89ab')
|
|
user.id = -1
|
|
with self.assertRaises(AuthenticationError):
|
|
# Touchkey change for an inexistent user should fail
|
|
db.change_touchkey(user, '89ab', '048c')
|
|
|
|
def test_change_user(self) -> None:
|
|
with self.db as db:
|
|
user = db.create_user('testuser', 'supersecurepassword', 'testuser@example.com', True, True)
|
|
user.email = 'newaddress@example.com'
|
|
user.is_admin = False
|
|
user.is_member = False
|
|
db.change_user(user)
|
|
checkuser = db.login('testuser', 'supersecurepassword')
|
|
self.assertEqual('newaddress@example.com', checkuser.email)
|
|
self.assertFalse(checkuser.is_admin)
|
|
self.assertFalse(checkuser.is_member)
|
|
user.id = -1
|
|
with self.assertRaises(DatabaseConsistencyError):
|
|
db.change_user(user)
|
|
|
|
def test_delete_user(self) -> None:
|
|
with self.db as db:
|
|
user = db.create_user('testuser', 'supersecurepassword', 'testuser@example.com', True, True)
|
|
db.login('testuser', 'supersecurepassword')
|
|
db.delete_user(user)
|
|
try:
|
|
# Should fail, as the user does not exist anymore
|
|
db.login('testuser', 'supersecurepassword')
|
|
except AuthenticationError as e:
|
|
self.assertEqual('User does not exist', e.msg)
|
|
with self.assertRaises(DatabaseConsistencyError):
|
|
# Should fail, as the user does not exist anymore
|
|
db.delete_user(user)
|
|
|
|
def test_create_product(self) -> None:
|
|
with self.db as db:
|
|
with db.transaction() as c:
|
|
db.create_product('Club Mate', 200, 200)
|
|
c.execute("SELECT * FROM products")
|
|
row = c.fetchone()
|
|
self.assertEqual('Club Mate', row[1])
|
|
self.assertEqual(0, row[2])
|
|
self.assertEqual(200, row[3])
|
|
self.assertEqual(200, row[4])
|
|
with self.assertRaises(ValueError):
|
|
db.create_product('Club Mate', 250, 250)
|
|
|
|
def test_list_products(self) -> None:
|
|
with self.db as db:
|
|
# Test empty list
|
|
products = db.list_products()
|
|
self.assertEqual(0, len(products))
|
|
db.create_product('Club Mate', 200, 200)
|
|
db.create_product('Flora Power Mate', 200, 200)
|
|
db.create_product('Fritz Mate', 200, 250)
|
|
products = db.list_products()
|
|
self.assertEqual(3, len(products))
|
|
productcheck = {}
|
|
for product in products:
|
|
if product.name == 'Club Mate':
|
|
self.assertEqual(200, product.price_member)
|
|
self.assertEqual(200, product.price_non_member)
|
|
elif product.name == 'Flora Power Mate':
|
|
self.assertEqual(200, product.price_member)
|
|
self.assertEqual(200, product.price_non_member)
|
|
elif product.name == 'Fritz Mate':
|
|
self.assertEqual(200, product.price_member)
|
|
self.assertEqual(250, product.price_non_member)
|
|
productcheck[product.id] = 1
|
|
self.assertEqual(3, len(productcheck))
|
|
|
|
def test_change_product(self) -> None:
|
|
with self.db as db:
|
|
product = db.create_product('Club Mate', 200, 200)
|
|
product.name = 'Flora Power Mate'
|
|
product.price_member = 150
|
|
product.price_non_member = 250
|
|
db.change_product(product)
|
|
checkproduct = db.list_products()[0]
|
|
self.assertEqual('Flora Power Mate', checkproduct.name)
|
|
self.assertEqual(150, checkproduct.price_member)
|
|
self.assertEqual(250, checkproduct.price_non_member)
|
|
product.id = -1
|
|
with self.assertRaises(DatabaseConsistencyError):
|
|
db.change_product(product)
|
|
product2 = db.create_product('Club Mate', 200, 200)
|
|
product2.name = 'Flora Power Mate'
|
|
with self.assertRaises(DatabaseConsistencyError):
|
|
# Should fail, as a product with the same name already exists.
|
|
db.change_product(product2)
|
|
|
|
def test_delete_product(self) -> None:
|
|
with self.db as db:
|
|
product = db.create_product('Club Mate', 200, 200)
|
|
product2 = db.create_product('Flora Power Mate', 200, 200)
|
|
|
|
self.assertEqual(2, len(db.list_products()))
|
|
db.delete_product(product)
|
|
|
|
# Only the other product should remain in the DB
|
|
products = db.list_products()
|
|
self.assertEqual(1, len(products))
|
|
self.assertEqual(product2, products[0])
|
|
|
|
with self.assertRaises(DatabaseConsistencyError):
|
|
# Should fail, as the product does not exist anymore
|
|
db.delete_product(product)
|
|
|
|
def test_deposit(self) -> None:
|
|
with self.db as db:
|
|
with db.transaction() as c:
|
|
user = db.create_user('testuser', 'supersecurepassword', 'testuser@example.com', True, True)
|
|
user2 = db.create_user('testuser2', 'supersecurepassword', 'testuser@example.com', True, True)
|
|
c.execute('''SELECT balance FROM users WHERE user_id = ?''', [user.id])
|
|
self.assertEqual(0, c.fetchone()[0])
|
|
c.execute('''SELECT balance FROM users WHERE user_id = ?''', [user2.id])
|
|
self.assertEqual(0, c.fetchone()[0])
|
|
db.deposit(user, 1337)
|
|
c.execute('''SELECT balance FROM users WHERE user_id = ?''', [user.id])
|
|
self.assertEqual(1337, c.fetchone()[0])
|
|
c.execute('''SELECT balance FROM users WHERE user_id = ?''', [user2.id])
|
|
self.assertEqual(0, c.fetchone()[0])
|
|
with self.assertRaises(ValueError):
|
|
# Should fail, negative amount
|
|
db.deposit(user, -42)
|
|
user.id = -1
|
|
with self.assertRaises(DatabaseConsistencyError):
|
|
# Should fail, user id -1 does not exist
|
|
db.deposit(user, 42)
|
|
|
|
def test_restock(self) -> None:
|
|
with self.db as db:
|
|
with db.transaction() as c:
|
|
product = db.create_product('Club Mate', 200, 200)
|
|
product2 = db.create_product('Flora Power Mate', 200, 200)
|
|
c.execute('''SELECT stock FROM products WHERE product_id = ?''', [product.id])
|
|
self.assertEqual(0, c.fetchone()[0])
|
|
c.execute('''SELECT stock FROM products WHERE product_id = ?''', [product2.id])
|
|
self.assertEqual(0, c.fetchone()[0])
|
|
db.restock(product, 42)
|
|
c.execute('''SELECT stock FROM products WHERE product_id = ?''', [product.id])
|
|
self.assertEqual(42, c.fetchone()[0])
|
|
c.execute('''SELECT stock FROM products WHERE product_id = ?''', [product2.id])
|
|
self.assertEqual(0, c.fetchone()[0])
|
|
product.id = -1
|
|
with self.assertRaises(DatabaseConsistencyError):
|
|
# Should fail, product id -1 does not exist
|
|
db.restock(product, 42)
|
|
|
|
def test_consumption(self) -> None:
|
|
with self.db as db:
|
|
# Set up test case
|
|
user1 = db.create_user('user1', 'supersecurepassword', 'testuser@example.com', member=True)
|
|
user2 = db.create_user('user2', 'supersecurepassword', 'testuser@example.com', member=False)
|
|
user3 = db.create_user('user3', 'supersecurepassword', 'testuser@example.com', member=False)
|
|
db.deposit(user1, 1337)
|
|
db.deposit(user2, 4242)
|
|
db.deposit(user3, 1234)
|
|
clubmate = db.create_product('Club Mate', 200, 200)
|
|
florapowermate = db.create_product('Flora Power Mate', 150, 250)
|
|
fritzmate = db.create_product('Fritz Mate', 200, 200)
|
|
db.restock(clubmate, 50)
|
|
db.restock(florapowermate, 70)
|
|
db.restock(fritzmate, 10)
|
|
|
|
# user1 is somewhat addicted to caffeine
|
|
db.increment_consumption(user1, clubmate, 1)
|
|
db.increment_consumption(user1, clubmate, 2)
|
|
db.increment_consumption(user1, florapowermate, 3)
|
|
|
|
# user2 is reeeally addicted
|
|
db.increment_consumption(user2, clubmate, 7)
|
|
db.increment_consumption(user2, florapowermate, 3)
|
|
db.increment_consumption(user2, florapowermate, 4)
|
|
|
|
with db.transaction(exclusive=False) as c:
|
|
c.execute('''SELECT balance FROM users WHERE user_id = ?''', [user1.id])
|
|
self.assertEqual(1337 - 200 * 3 - 150 * 3, c.fetchone()[0])
|
|
c.execute('''SELECT balance FROM users WHERE user_id = ?''', [user2.id])
|
|
self.assertEqual(4242 - 200 * 7 - 250 * 7, c.fetchone()[0])
|
|
c.execute('''SELECT balance FROM users WHERE user_id = ?''', [user3.id])
|
|
self.assertEqual(1234, c.fetchone()[0])
|
|
|
|
c.execute('''SELECT stock FROM products WHERE product_id = ?''', [clubmate.id])
|
|
self.assertEqual(40, c.fetchone()[0])
|
|
c.execute('''SELECT stock FROM products WHERE product_id = ?''', [florapowermate.id])
|
|
self.assertEqual(60, c.fetchone()[0])
|
|
c.execute('''SELECT stock FROM products WHERE product_id = ?''', [fritzmate.id])
|
|
self.assertEqual(10, c.fetchone()[0])
|