forked from s3lph/matemat
Initial commit; database facade mostly implemented, but only partially tested
This commit is contained in:
commit
c91f77702a
15 changed files with 655 additions and 0 deletions
8
.gitignore
vendored
Normal file
8
.gitignore
vendored
Normal file
|
@ -0,0 +1,8 @@
|
||||||
|
**/.idea/
|
||||||
|
*.iml
|
||||||
|
|
||||||
|
**/__pycache__/
|
||||||
|
*.pyc
|
||||||
|
|
||||||
|
*.sqlite3
|
||||||
|
*.db
|
2
matemat/__init__.py
Normal file
2
matemat/__init__.py
Normal file
|
@ -0,0 +1,2 @@
|
||||||
|
|
||||||
|
__version__ = '2.0'
|
2
matemat/db/__init__.py
Normal file
2
matemat/db/__init__.py
Normal file
|
@ -0,0 +1,2 @@
|
||||||
|
|
||||||
|
from .database import Database
|
364
matemat/db/database.py
Normal file
364
matemat/db/database.py
Normal file
|
@ -0,0 +1,364 @@
|
||||||
|
from typing import List, Optional
|
||||||
|
|
||||||
|
import apsw
|
||||||
|
import bcrypt
|
||||||
|
|
||||||
|
from matemat.primitives import User, Product
|
||||||
|
from matemat.exceptions import AuthenticationException
|
||||||
|
|
||||||
|
|
||||||
|
class Transaction(object):
|
||||||
|
|
||||||
|
def __init__(self, db: apsw.Connection, wrapper: 'Database', exclusive: bool = True):
|
||||||
|
self._db: apsw.Connection = db
|
||||||
|
self._cursor = None
|
||||||
|
self._excl = exclusive
|
||||||
|
self._wrapper: Database = wrapper
|
||||||
|
self._is_dummy: bool = False
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
if self._wrapper._in_transaction:
|
||||||
|
self._is_dummy = True
|
||||||
|
return self._db.cursor()
|
||||||
|
else:
|
||||||
|
self._is_dummy = False
|
||||||
|
self._cursor = self._db.cursor()
|
||||||
|
self._wrapper._in_transaction = True
|
||||||
|
if self._excl:
|
||||||
|
self._cursor.execute('BEGIN EXCLUSIVE')
|
||||||
|
else:
|
||||||
|
self._cursor.execute('BEGIN')
|
||||||
|
return self._cursor
|
||||||
|
|
||||||
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||||
|
if self._is_dummy:
|
||||||
|
return
|
||||||
|
if exc_type is None:
|
||||||
|
self._cursor.execute('COMMIT')
|
||||||
|
else:
|
||||||
|
self._cursor.execute('ROLLBACK')
|
||||||
|
self._wrapper._in_transaction = False
|
||||||
|
|
||||||
|
|
||||||
|
class Database(object):
|
||||||
|
SCHEMA_VERSION = 1
|
||||||
|
|
||||||
|
SCHEMA = '''
|
||||||
|
CREATE TABLE users (
|
||||||
|
user_id INTEGER PRIMARY KEY,
|
||||||
|
username TEXT NOT NULL,
|
||||||
|
email TEXT DEFAULT NULL,
|
||||||
|
password TEXT NOT NULL,
|
||||||
|
touchkey TEXT DEFAULT NULL,
|
||||||
|
is_admin INTEGER(1) NOT NULL DEFAULT 0,
|
||||||
|
is_member INTEGER(1) NOT NULL DEFAULT 1,
|
||||||
|
balance INTEGER(8) NOT NULL DEFAULT 0,
|
||||||
|
lastchange INTEGER(8) NOT NULL DEFAULT 0
|
||||||
|
);
|
||||||
|
CREATE TABLE products (
|
||||||
|
product_id INTEGER PRIMARY KEY,
|
||||||
|
name TEXT UNIQUE NOT NULL,
|
||||||
|
stock INTEGER(8) NOT NULL DEFAULT 0,
|
||||||
|
price_member INTEGER(8) NOT NULL,
|
||||||
|
price_non_member INTEGER(8) NOT NULL
|
||||||
|
);
|
||||||
|
CREATE TABLE consumption (
|
||||||
|
user_id INTEGER NOT NULL,
|
||||||
|
product_id INTEGER NOT NULL,
|
||||||
|
count INTEGER(8) NOT NULL DEFAULT 0,
|
||||||
|
PRIMARY KEY (user_id, product_id),
|
||||||
|
FOREIGN KEY (user_id) REFERENCES users(user_id)
|
||||||
|
ON DELETE CASCADE ON UPDATE CASCADE,
|
||||||
|
FOREIGN KEY (product_id) REFERENCES products(product_id)
|
||||||
|
ON DELETE CASCADE ON UPDATE CASCADE
|
||||||
|
)
|
||||||
|
'''
|
||||||
|
|
||||||
|
def __init__(self, filename: str):
|
||||||
|
self._filename: str = filename
|
||||||
|
self._sqlite_db: apsw.Connection = None
|
||||||
|
self._in_transaction: bool = False
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
self.connect()
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||||
|
self.close()
|
||||||
|
|
||||||
|
def transaction(self, exclusive: bool = True) -> Transaction:
|
||||||
|
return Transaction(self._sqlite_db, self, exclusive)
|
||||||
|
|
||||||
|
def _setup(self):
|
||||||
|
with self.transaction() as c:
|
||||||
|
version: int = self._user_version
|
||||||
|
if version < 1:
|
||||||
|
c.execute(self.SCHEMA)
|
||||||
|
elif version < self.SCHEMA_VERSION:
|
||||||
|
self._upgrade(old=version, new=self.SCHEMA_VERSION)
|
||||||
|
self._user_version = self.SCHEMA_VERSION
|
||||||
|
|
||||||
|
def _upgrade(self, old: int, new: int):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def connect(self):
|
||||||
|
if self.is_connected():
|
||||||
|
raise RuntimeError(f'Database connection to {self._filename} is already established.')
|
||||||
|
self._sqlite_db = apsw.Connection(self._filename)
|
||||||
|
self._setup()
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
if self._sqlite_db is None:
|
||||||
|
raise RuntimeError(f'Database connection to {self._filename} is not established.')
|
||||||
|
if self.in_transaction():
|
||||||
|
raise RuntimeError(f'A transaction is still ongoing.')
|
||||||
|
self._sqlite_db.close()
|
||||||
|
self._sqlite_db = None
|
||||||
|
|
||||||
|
def in_transaction(self) -> bool:
|
||||||
|
return self._in_transaction
|
||||||
|
|
||||||
|
def is_connected(self) -> bool:
|
||||||
|
return self._sqlite_db is not None
|
||||||
|
|
||||||
|
@property
|
||||||
|
def _user_version(self) -> int:
|
||||||
|
cursor = self._sqlite_db.cursor()
|
||||||
|
cursor.execute('PRAGMA user_version')
|
||||||
|
version = int(cursor.fetchone()[0])
|
||||||
|
return version
|
||||||
|
|
||||||
|
@_user_version.setter
|
||||||
|
def _user_version(self, version: int):
|
||||||
|
cursor = self._sqlite_db.cursor()
|
||||||
|
cursor.execute(f'PRAGMA user_version = {version}')
|
||||||
|
|
||||||
|
def list_users(self) -> List[User]:
|
||||||
|
users: List[User] = []
|
||||||
|
with self.transaction(exclusive=False) as c:
|
||||||
|
for row in c.execute('''
|
||||||
|
SELECT user_id, username, email, is_admin, is_member
|
||||||
|
FROM users
|
||||||
|
'''):
|
||||||
|
user_id, username, email, is_admin, is_member = row
|
||||||
|
users.append(User(user_id, username, email, is_admin, is_member))
|
||||||
|
return users
|
||||||
|
|
||||||
|
def create_user(self,
|
||||||
|
username: str,
|
||||||
|
password: str,
|
||||||
|
email: Optional[str] = None,
|
||||||
|
admin: bool = False,
|
||||||
|
member: bool = True) -> User:
|
||||||
|
pwhash: str = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt(12))
|
||||||
|
user_id: int = -1
|
||||||
|
with self.transaction() as c:
|
||||||
|
c.execute('SELECT user_id FROM users WHERE username = ?', [username])
|
||||||
|
if c.fetchone() is not None:
|
||||||
|
raise ValueError(f'A user with the name \'{username}\' already exists.')
|
||||||
|
c.execute('''
|
||||||
|
INSERT INTO users (username, email, password, balance, is_admin, is_member, lastchange)
|
||||||
|
VALUES (:username, :email, :pwhash, 0, :admin, :member, STRFTIME('%s', 'now'))
|
||||||
|
''', {
|
||||||
|
'username': username,
|
||||||
|
'email': email,
|
||||||
|
'pwhash': pwhash,
|
||||||
|
'admin': admin,
|
||||||
|
'member': member
|
||||||
|
})
|
||||||
|
c.execute('SELECT last_insert_rowid()')
|
||||||
|
user_id = int(c.fetchone()[0])
|
||||||
|
return User(user_id, username, email, admin, member)
|
||||||
|
|
||||||
|
def login(self, username: str, password: str) -> Optional[User]:
|
||||||
|
with self.transaction(exclusive=False) as c:
|
||||||
|
c.execute('''
|
||||||
|
SELECT user_id, username, email, password, is_admin, is_member
|
||||||
|
FROM users
|
||||||
|
WHERE username = ?
|
||||||
|
''', [username])
|
||||||
|
row = c.fetchone()
|
||||||
|
if row is None:
|
||||||
|
return None
|
||||||
|
user_id, username, email, pwhash, admin, member = row
|
||||||
|
if not bcrypt.checkpw(password.encode('utf-8'), pwhash):
|
||||||
|
return None
|
||||||
|
return User(user_id, username, email, admin, member)
|
||||||
|
|
||||||
|
def change_password(self, user: User, oldpass: str, newpass: str, newpass2: str, verify_password: bool = True):
|
||||||
|
if newpass != newpass2:
|
||||||
|
raise ValueError('New passwords don\'t match.')
|
||||||
|
with self.transaction() as c:
|
||||||
|
c.execute('''
|
||||||
|
SELECT password FROM users WHERE user_id = ?
|
||||||
|
''', [user.id])
|
||||||
|
row = c.fetchone()
|
||||||
|
if row is None:
|
||||||
|
raise AuthenticationException('User does not exist in database.')
|
||||||
|
if verify_password and not bcrypt.checkpw(oldpass.encode('utf-8'), row[0]):
|
||||||
|
raise AuthenticationException('Old password does not match.')
|
||||||
|
pwhash: str = bcrypt.hashpw(newpass.encode('utf-8'), bcrypt.gensalt(12))
|
||||||
|
c.execute('''
|
||||||
|
UPDATE users SET password = :pwhash, lastchange = STRFTIME('%s', 'now') WHERE user_id = :user_id
|
||||||
|
''', {
|
||||||
|
'user_id': user.id,
|
||||||
|
'pwhash': pwhash
|
||||||
|
})
|
||||||
|
|
||||||
|
def change_touchkey(self, user: User, password: str, touchkey: Optional[str], verify_password: bool = True):
|
||||||
|
with self.transaction() as c:
|
||||||
|
c.execute('''
|
||||||
|
SELECT password FROM users WHERE user_id = ?
|
||||||
|
''', [user.id])
|
||||||
|
row = c.fetchone()
|
||||||
|
if row is None:
|
||||||
|
raise AuthenticationException('User does not exist in database.')
|
||||||
|
if verify_password and not bcrypt.checkpw(password.encode('utf-8'), row[0]):
|
||||||
|
raise AuthenticationException('Password does not match.')
|
||||||
|
tkhash: str = bcrypt.hashpw(touchkey.encode('utf-8'), bcrypt.gensalt(12)) if touchkey is not None else None
|
||||||
|
c.execute('''
|
||||||
|
UPDATE users SET touchkey = :tkhash, lastchange = STRFTIME('%s', 'now') WHERE user_id = :user_id
|
||||||
|
''', {
|
||||||
|
'user_id': user.id,
|
||||||
|
'tkhash': tkhash
|
||||||
|
})
|
||||||
|
|
||||||
|
def change_user(self, user: User):
|
||||||
|
with self.transaction() as c:
|
||||||
|
c.execute('''
|
||||||
|
UPDATE users SET
|
||||||
|
email = :email,
|
||||||
|
is_admin = :is_admin,
|
||||||
|
is_member = :is_member,
|
||||||
|
lastchange = STRFTIME('%s', 'now')
|
||||||
|
WHERE user_id = :user_id
|
||||||
|
''', {
|
||||||
|
'user_id': user.id,
|
||||||
|
'email': user.email,
|
||||||
|
'is_admin': user.is_admin,
|
||||||
|
'is_member': user.is_member
|
||||||
|
})
|
||||||
|
|
||||||
|
def delete_user(self, user: User):
|
||||||
|
with self.transaction() as c:
|
||||||
|
c.execute('''
|
||||||
|
DELETE FROM users
|
||||||
|
WHERE user_id = ?
|
||||||
|
''', [user.id])
|
||||||
|
|
||||||
|
def list_products(self) -> List[Product]:
|
||||||
|
products: List[Product] = []
|
||||||
|
with self.transaction(exclusive=False) as c:
|
||||||
|
for row in c.execute('''
|
||||||
|
SELECT product_id, name, price_member, price_external
|
||||||
|
FROM products
|
||||||
|
'''):
|
||||||
|
product_id, name, price_member, price_external = row
|
||||||
|
products.append(Product(product_id, name, price_member, price_external))
|
||||||
|
return products
|
||||||
|
|
||||||
|
def create_product(self, name: str, price_member: int, price_non_member: int) -> Product:
|
||||||
|
product_id: int = -1
|
||||||
|
with self.transaction() as c:
|
||||||
|
c.execute('SELECT product_id FROM products WHERE name = ?', [name])
|
||||||
|
if c.fetchone() is not None:
|
||||||
|
raise ValueError(f'A product with the name \'{name}\' already exists.')
|
||||||
|
c.execute('''
|
||||||
|
INSERT INTO products (name, price_member, price_non_member, stock)
|
||||||
|
VALUES (:name, :price_member, :price_non_member, 0)
|
||||||
|
''', {
|
||||||
|
'name': name,
|
||||||
|
'price_member': price_member,
|
||||||
|
'price_non_member': price_non_member
|
||||||
|
})
|
||||||
|
c.execute('SELECT last_insert_rowid()')
|
||||||
|
product_id = int(c.fetchone()[0])
|
||||||
|
return Product(product_id, name, price_member, price_non_member)
|
||||||
|
|
||||||
|
def change_product(self, product: Product):
|
||||||
|
with self.transaction() as c:
|
||||||
|
c.execute('''
|
||||||
|
UPDATE products
|
||||||
|
SET
|
||||||
|
name = :name,
|
||||||
|
price_member = :price_member,
|
||||||
|
price_non_member = :price_non_member
|
||||||
|
WHERE product_id = :product_is
|
||||||
|
''', {
|
||||||
|
'product_id': product.id,
|
||||||
|
'price_member': product.price_member,
|
||||||
|
'price_non_member': product.price_non_member
|
||||||
|
})
|
||||||
|
|
||||||
|
def delete_product(self, product: Product):
|
||||||
|
with self.transaction() as c:
|
||||||
|
c.execute('''
|
||||||
|
DELETE FROM products
|
||||||
|
WHERE product_id = ?
|
||||||
|
''', [product.id])
|
||||||
|
|
||||||
|
def increment_consumption(self, user: User, product: Product, count: int = 1):
|
||||||
|
with self.transaction() as c:
|
||||||
|
c.execute('''
|
||||||
|
SELECT count
|
||||||
|
FROM consumption
|
||||||
|
WHERE user_id = :user_id
|
||||||
|
AND product_id = :product_id
|
||||||
|
''')
|
||||||
|
row = c.fetchone()
|
||||||
|
if row is None:
|
||||||
|
c.execute('''
|
||||||
|
INSERT INTO consumption (user_id, product_id, count)
|
||||||
|
VALUES (:user_id, :product_id, :count)
|
||||||
|
''', {
|
||||||
|
'user_id': user.id,
|
||||||
|
'product_id': product.id,
|
||||||
|
'count': count
|
||||||
|
})
|
||||||
|
else:
|
||||||
|
c.execute('''
|
||||||
|
UPDATE consumption
|
||||||
|
SET count = count + :count
|
||||||
|
WHERE user_id = :user_id AND product_id = :product_id
|
||||||
|
''', {
|
||||||
|
'user_id': user.id,
|
||||||
|
'product_id': product.id,
|
||||||
|
'count': count
|
||||||
|
})
|
||||||
|
c.execute('''
|
||||||
|
UPDATE users
|
||||||
|
SET balance = balance - :cost
|
||||||
|
WHERE user_id = :user_id''', {
|
||||||
|
'user_id': user.id,
|
||||||
|
'cost': count * product.price_member if user.is_member else count * product.price_non_member
|
||||||
|
})
|
||||||
|
c.execute('''
|
||||||
|
UPDATE products
|
||||||
|
SET stock = stock - :count
|
||||||
|
WHERE product_id = :product_id
|
||||||
|
''', {
|
||||||
|
'product_id': product.id,
|
||||||
|
'count': count
|
||||||
|
})
|
||||||
|
|
||||||
|
def restock(self, product: Product, count: int):
|
||||||
|
with self.transaction() as c:
|
||||||
|
c.execute('''
|
||||||
|
UPDATE products
|
||||||
|
SET stock = stock + :count
|
||||||
|
WHERE product_id = :product_id
|
||||||
|
''', {
|
||||||
|
'product_id': product.id,
|
||||||
|
'count': count
|
||||||
|
})
|
||||||
|
|
||||||
|
def deposit(self, user: User, amount: int):
|
||||||
|
with self.transaction() as c:
|
||||||
|
c.execute('''
|
||||||
|
UPDATE users
|
||||||
|
SET balance = balance + :amount
|
||||||
|
WHERE user_id = :user_id
|
||||||
|
''', {
|
||||||
|
'user_id': user.id,
|
||||||
|
'amount': amount
|
||||||
|
})
|
0
matemat/db/test/__init__.py
Normal file
0
matemat/db/test/__init__.py
Normal file
93
matemat/db/test/test_database.py
Normal file
93
matemat/db/test/test_database.py
Normal file
|
@ -0,0 +1,93 @@
|
||||||
|
|
||||||
|
import unittest
|
||||||
|
|
||||||
|
from matemat.db import Database
|
||||||
|
|
||||||
|
|
||||||
|
class DatabaseTest(unittest.TestCase):
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
self.db = Database(':memory:')
|
||||||
|
|
||||||
|
def test_create_schema(self):
|
||||||
|
"""
|
||||||
|
Test creation of database schema in an empty database
|
||||||
|
"""
|
||||||
|
with self.db as db:
|
||||||
|
self.assertEqual(Database.SCHEMA_VERSION, db._user_version)
|
||||||
|
|
||||||
|
def test_in_transaction(self):
|
||||||
|
"""
|
||||||
|
Test transaction tracking
|
||||||
|
"""
|
||||||
|
with self.db as db:
|
||||||
|
self.assertFalse(db.in_transaction())
|
||||||
|
with self.db.transaction():
|
||||||
|
self.assertTrue(db.in_transaction())
|
||||||
|
self.assertFalse(db.in_transaction())
|
||||||
|
|
||||||
|
def test_transaction_nesting(self):
|
||||||
|
"""
|
||||||
|
Inner transactions should not do anything
|
||||||
|
"""
|
||||||
|
with self.db as db:
|
||||||
|
self.assertFalse(db.in_transaction())
|
||||||
|
t1 = self.db.transaction()
|
||||||
|
with t1:
|
||||||
|
self.assertTrue(db.in_transaction())
|
||||||
|
self.assertFalse(t1._is_dummy)
|
||||||
|
t2 = self.db.transaction()
|
||||||
|
with t2:
|
||||||
|
self.assertTrue(db.in_transaction())
|
||||||
|
self.assertTrue(t2._is_dummy)
|
||||||
|
self.assertTrue(db.in_transaction())
|
||||||
|
self.assertFalse(db.in_transaction())
|
||||||
|
|
||||||
|
def test_transaction_commit(self):
|
||||||
|
"""
|
||||||
|
If no error occurs, actions in a transaction should be committed.
|
||||||
|
"""
|
||||||
|
with self.db as db:
|
||||||
|
with db.transaction() as c:
|
||||||
|
c.execute("INSERT INTO users VALUES (1, 'testuser', NULL, 'supersecurepassword', NULL, 1, 1, 0, NULL)")
|
||||||
|
c = db._sqlite_db.cursor()
|
||||||
|
c.execute("SELECT * FROM users")
|
||||||
|
user = c.fetchone()
|
||||||
|
self.assertEqual((1, 'testuser', None, 'supersecurepassword', None, 1, 1, 0, None), user)
|
||||||
|
|
||||||
|
def test_transaction_rollback(self):
|
||||||
|
"""
|
||||||
|
If an error occurs in a transaction, actions should be rolled back.
|
||||||
|
"""
|
||||||
|
with self.db as db:
|
||||||
|
try:
|
||||||
|
with db.transaction() as c:
|
||||||
|
c.execute("""
|
||||||
|
INSERT INTO users VALUES (1, 'testuser', NULL, 'supersecurepassword', NULL, 1, 1, 0, NULL)
|
||||||
|
""")
|
||||||
|
raise ValueError('This should trigger a rollback')
|
||||||
|
except ValueError as e:
|
||||||
|
if str(e) != 'This should trigger a rollback':
|
||||||
|
raise e
|
||||||
|
c = db._sqlite_db.cursor()
|
||||||
|
c.execute("SELECT * FROM users")
|
||||||
|
self.assertIsNone(c.fetchone())
|
||||||
|
|
||||||
|
def test_create_user(self):
|
||||||
|
with self.db as db:
|
||||||
|
with db.transaction() as c:
|
||||||
|
db.create_user('testuser', 'supersecurepassword', 'testuser@example.com')
|
||||||
|
c.execute("SELECT * FROM users")
|
||||||
|
row = c.fetchone()
|
||||||
|
if row is None:
|
||||||
|
self.fail()
|
||||||
|
self.assertEqual('testuser', row[1])
|
||||||
|
self.assertEqual('testuser@example.com', row[2])
|
||||||
|
self.assertEqual(0, row[5])
|
||||||
|
self.assertEqual(1, row[6])
|
||||||
|
|
||||||
|
def test_create_existing_user(self):
|
||||||
|
with self.db as db:
|
||||||
|
db.create_user('testuser', 'supersecurepassword', 'testuser@example.com')
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
db.create_user('testuser', 'supersecurepassword2', 'testuser2@example.com')
|
12
matemat/exceptions/AuthenticatonException.py
Normal file
12
matemat/exceptions/AuthenticatonException.py
Normal file
|
@ -0,0 +1,12 @@
|
||||||
|
|
||||||
|
class AuthenticationException(BaseException):
|
||||||
|
|
||||||
|
def __init__(self, msg: str = None):
|
||||||
|
self._msg = msg
|
||||||
|
|
||||||
|
def __str__(self) -> str:
|
||||||
|
return f'AuthenticationException: {self._msg}'
|
||||||
|
|
||||||
|
@property
|
||||||
|
def msg(self) -> str:
|
||||||
|
return self._msg
|
2
matemat/exceptions/__init__.py
Normal file
2
matemat/exceptions/__init__.py
Normal file
|
@ -0,0 +1,2 @@
|
||||||
|
|
||||||
|
from .AuthenticatonException import AuthenticationException
|
38
matemat/primitives/Product.py
Normal file
38
matemat/primitives/Product.py
Normal file
|
@ -0,0 +1,38 @@
|
||||||
|
|
||||||
|
class Product(object):
|
||||||
|
|
||||||
|
def __init__(self,
|
||||||
|
product_id: int,
|
||||||
|
name: str,
|
||||||
|
price_member: int,
|
||||||
|
price_non_member: int):
|
||||||
|
if product_id == -1:
|
||||||
|
raise ValueError('Invalid product ID')
|
||||||
|
self._product_id: int = product_id
|
||||||
|
self._name: str = name
|
||||||
|
self._price_member: int = price_member
|
||||||
|
self._price_non_member: int = price_non_member
|
||||||
|
|
||||||
|
@property
|
||||||
|
def id(self) -> int:
|
||||||
|
return self._product_id
|
||||||
|
|
||||||
|
@property
|
||||||
|
def name(self) -> str:
|
||||||
|
return self._name
|
||||||
|
|
||||||
|
@property
|
||||||
|
def price_member(self) -> int:
|
||||||
|
return self._price_member
|
||||||
|
|
||||||
|
@price_member.setter
|
||||||
|
def price_member(self, price: int):
|
||||||
|
self._price_member = price
|
||||||
|
|
||||||
|
@property
|
||||||
|
def price_non_member(self) -> int:
|
||||||
|
return self._price_non_member
|
||||||
|
|
||||||
|
@price_non_member.setter
|
||||||
|
def price_non_member(self, price: int):
|
||||||
|
self._price_non_member = price
|
55
matemat/primitives/User.py
Normal file
55
matemat/primitives/User.py
Normal file
|
@ -0,0 +1,55 @@
|
||||||
|
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
|
||||||
|
class User(object):
|
||||||
|
|
||||||
|
def __init__(self,
|
||||||
|
user_id: int,
|
||||||
|
username: str,
|
||||||
|
email: Optional[str] = None,
|
||||||
|
admin: bool = False,
|
||||||
|
member: bool = True):
|
||||||
|
if user_id == -1:
|
||||||
|
raise ValueError('Invalid user ID')
|
||||||
|
self._user_id: int = user_id
|
||||||
|
self._username: str = username
|
||||||
|
self._email: Optional[str] = email
|
||||||
|
self._admin: bool = admin
|
||||||
|
self._member: bool = member
|
||||||
|
|
||||||
|
@property
|
||||||
|
def id(self) -> int:
|
||||||
|
return self._user_id
|
||||||
|
|
||||||
|
@property
|
||||||
|
def name(self) -> str:
|
||||||
|
return self._username
|
||||||
|
|
||||||
|
@property
|
||||||
|
def email(self) -> str:
|
||||||
|
return self._email
|
||||||
|
|
||||||
|
@email.setter
|
||||||
|
def email(self, email: str):
|
||||||
|
self._email = email
|
||||||
|
|
||||||
|
@email.setter
|
||||||
|
def email(self, email: str):
|
||||||
|
self._email = email
|
||||||
|
|
||||||
|
@property
|
||||||
|
def is_admin(self) -> bool:
|
||||||
|
return self._admin
|
||||||
|
|
||||||
|
@is_admin.setter
|
||||||
|
def is_admin(self, admin: bool):
|
||||||
|
self._admin = admin
|
||||||
|
|
||||||
|
@property
|
||||||
|
def is_member(self) -> bool:
|
||||||
|
return self._member
|
||||||
|
|
||||||
|
@is_member.setter
|
||||||
|
def is_member(self, member: bool):
|
||||||
|
self._member = member
|
3
matemat/primitives/__init__.py
Normal file
3
matemat/primitives/__init__.py
Normal file
|
@ -0,0 +1,3 @@
|
||||||
|
|
||||||
|
from .User import User
|
||||||
|
from .Product import Product
|
0
matemat/webserver/__init__.py
Normal file
0
matemat/webserver/__init__.py
Normal file
62
matemat/webserver/httpd.py
Normal file
62
matemat/webserver/httpd.py
Normal file
|
@ -0,0 +1,62 @@
|
||||||
|
|
||||||
|
from typing import Tuple, Dict, Optional
|
||||||
|
|
||||||
|
import socket
|
||||||
|
from http.server import HTTPServer, BaseHTTPRequestHandler
|
||||||
|
from http.cookies import SimpleCookie
|
||||||
|
from uuid import uuid4
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
|
||||||
|
from matemat import __version__ as matemat_version
|
||||||
|
|
||||||
|
|
||||||
|
class MatematWebserver(object):
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self._httpd = HTTPServer(('', 8080), HttpHandler)
|
||||||
|
|
||||||
|
def start(self):
|
||||||
|
self._httpd.serve_forever()
|
||||||
|
|
||||||
|
|
||||||
|
class HttpHandler(BaseHTTPRequestHandler):
|
||||||
|
|
||||||
|
def __init__(self, request: socket.socket, client_address: Tuple[str, int], server: HTTPServer):
|
||||||
|
super().__init__(request, client_address, server)
|
||||||
|
self._session_vars: Dict[str, Tuple[datetime, Dict[str, object]]] = dict()
|
||||||
|
print(self._session_vars)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def server_version(self) -> str:
|
||||||
|
return f'matemat/{matemat_version}'
|
||||||
|
|
||||||
|
def start_session(self) -> Optional[Tuple[str, datetime]]:
|
||||||
|
now = datetime.utcnow()
|
||||||
|
cookiestring = '\n'.join(self.headers.get_all('Cookie', failobj=[]))
|
||||||
|
cookie = SimpleCookie()
|
||||||
|
cookie.load(cookiestring)
|
||||||
|
session_id = cookie['matemat_session_id'] if 'matemat_session_id' in cookie else str(uuid4())
|
||||||
|
|
||||||
|
if session_id in self._session_vars and self._session_vars[session_id][0] < now:
|
||||||
|
self.end_session(session_id)
|
||||||
|
raise TimeoutError('Session timed out')
|
||||||
|
elif session_id not in self._session_vars:
|
||||||
|
self._session_vars[session_id] = (now + timedelta(hours=1)), dict()
|
||||||
|
return session_id
|
||||||
|
|
||||||
|
def end_session(self, session_id: str):
|
||||||
|
if session_id in self._session_vars:
|
||||||
|
del self._session_vars[session_id]
|
||||||
|
|
||||||
|
def do_GET(self):
|
||||||
|
try:
|
||||||
|
session_id, timeout = self.start_session()
|
||||||
|
except TimeoutError:
|
||||||
|
self.send_header('Set-Cookie', 'matemat_session_id=; expires=Thu, 01 Jan 1970 00:00:00 GMT')
|
||||||
|
self.send_error(599, 'Session Timed Out.', 'Please log in again.')
|
||||||
|
return
|
||||||
|
self.send_response(200, 'Welcome!')
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
MatematWebserver().start()
|
2
requirements.txt
Normal file
2
requirements.txt
Normal file
|
@ -0,0 +1,2 @@
|
||||||
|
bcrypt
|
||||||
|
apsw
|
12
setup.py
Normal file
12
setup.py
Normal file
|
@ -0,0 +1,12 @@
|
||||||
|
from setuptools import setup
|
||||||
|
|
||||||
|
setup(
|
||||||
|
name='matemat',
|
||||||
|
version='2.00',
|
||||||
|
packages=['matemat'],
|
||||||
|
url='',
|
||||||
|
license='',
|
||||||
|
author='s3lph',
|
||||||
|
author_email='',
|
||||||
|
description='Replacement for the original Ruby matemat software.'
|
||||||
|
)
|
Loading…
Reference in a new issue