matemat/matemat/db/wrapper.py

136 lines
5 KiB
Python

from typing import Any, Optional
from matemat.exceptions import DatabaseConsistencyError
from matemat.db.schemas import SCHEMAS
from matemat.db.migrations import *
class DatabaseTransaction(object):
    """
    Context manager that runs its body inside a SQLite transaction.

    Entering the context issues ``BEGIN EXCLUSIVE`` (or plain ``BEGIN``) on the
    connection and returns a cursor.  Leaving the context commits when the body
    completed without an exception and rolls back otherwise.

    If the connection is already inside a transaction when the context is
    entered, the object acts as a "dummy": it only hands out a fresh cursor and
    neither commits nor rolls back on exit, leaving that decision to whoever
    started the surrounding transaction.
    """

    def __init__(self, db: sqlite3.Connection, exclusive: bool = True) -> None:
        """
        :param db: The SQLite connection to run the transaction on.
        :param exclusive: If True (default) the transaction is started with
            ``BEGIN EXCLUSIVE``, otherwise with ``BEGIN``.
        """
        self._db: sqlite3.Connection = db
        self._cursor: Optional[sqlite3.Cursor] = None
        self._excl = exclusive
        # Set on __enter__; True when a transaction was already running.
        self._is_dummy: bool = False

    def __enter__(self) -> sqlite3.Cursor:
        """
        Begin the transaction unless one is already running.

        :return: A cursor on the wrapped connection.
        """
        if self._db.in_transaction:
            # Nested use: piggyback on the transaction that is already open.
            self._is_dummy = True
            return self._db.cursor()
        self._is_dummy = False
        self._db.execute('BEGIN EXCLUSIVE' if self._excl else 'BEGIN')
        self._cursor = self._db.cursor()
        return self._cursor

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """
        Commit on success, roll back on any exception.

        :raises DatabaseConsistencyError: If the body raised
            ``sqlite3.IntegrityError`` (or a subclass of it).  The original
            exception is attached as ``__cause__``.  Any other exception
            propagates unchanged after the rollback.
        """
        if self._is_dummy:
            # The owner of the surrounding transaction commits or rolls back.
            return
        if exc_type is None:
            self._db.commit()
            return
        self._db.rollback()
        # issubclass rather than ==, so subclasses of IntegrityError are
        # translated as well.
        if issubclass(exc_type, sqlite3.IntegrityError):
            raise DatabaseConsistencyError(str(exc_val)) from exc_val
class DatabaseWrapper(object):
    """
    Wrapper around a SQLite database file that manages the connection and
    keeps the database schema at ``SCHEMA_VERSION``.

    The schema version is tracked in SQLite's ``PRAGMA user_version``.  On
    connect, an empty database (version < 1) gets the current schema created,
    an older one is migrated step by step, and a newer one is rejected.

    Usable as a context manager: entering connects, leaving closes.
    """

    # Schema version created by, and migrated to by, this code.
    SCHEMA_VERSION = 10

    def __init__(self, filename: str) -> None:
        """
        :param filename: Name of the SQLite database file; passed to
            ``sqlite3.connect`` when connecting.
        """
        self._filename: str = filename
        self._sqlite_db: Optional[sqlite3.Connection] = None

    def __enter__(self) -> 'DatabaseWrapper':
        self.connect()
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        self.close()

    def transaction(self, exclusive: bool = True) -> 'DatabaseTransaction':
        """
        Create a transaction context manager on the open connection.

        :param exclusive: Passed through to ``DatabaseTransaction``.
        :return: A ``DatabaseTransaction`` bound to this connection.
        :raises RuntimeError: If the database is not connected.
        """
        if self._sqlite_db is None:
            raise RuntimeError(f'Database connection to {self._filename} is not established.')
        return DatabaseTransaction(self._sqlite_db, exclusive)

    def _setup(self) -> None:
        """
        Create or upgrade the schema, then enable foreign key enforcement.

        :raises RuntimeError: If the database's schema version is newer than
            ``SCHEMA_VERSION``.
        """
        # Create or update schemas if necessary
        with self.transaction() as c:
            version: int = self._user_version
            if version < 1:
                # Don't use executescript, as it issues a COMMIT first
                for command in SCHEMAS[self.SCHEMA_VERSION]:
                    c.execute(command)
            elif version < self.SCHEMA_VERSION:
                self._upgrade(from_version=version, to_version=self.SCHEMA_VERSION)
            elif version > self.SCHEMA_VERSION:
                raise RuntimeError('Database schema is newer than supported by this version of Matemat.')
            self._user_version = self.SCHEMA_VERSION
        # Enable foreign key enforcement.  Done after the transaction above
        # has ended: per the SQLite documentation this pragma is a no-op
        # while a transaction is open.
        cursor = self._sqlite_db.cursor()
        cursor.execute('PRAGMA foreign_keys = 1')

    def _upgrade(self, from_version: int, to_version: int) -> None:
        """
        Run the migration steps leading from one schema version to another.

        Steps are applied in ascending order, each at most once, inside a
        transaction obtained from ``self.transaction()``.

        :param from_version: Schema version the database currently has.
        :param to_version: Schema version to migrate to.
        """
        # Entry i (counting from 1) migrates schema version i to i + 1.
        migrations = (
            migrate_schema_1_to_2,
            migrate_schema_2_to_3,
            migrate_schema_3_to_4,
            migrate_schema_4_to_5,
            migrate_schema_5_to_6,
            migrate_schema_6_to_7,
            migrate_schema_7_to_8,
            migrate_schema_8_to_9,
            migrate_schema_9_to_10,
        )
        with self.transaction() as c:
            for source_version, migrate in enumerate(migrations, start=1):
                # Same test as "from_version <= n and to_version >= n + 1".
                if from_version <= source_version < to_version:
                    migrate(c)

    def connect(self) -> None:
        """
        Open the database file and bring its schema up to date.

        If the schema setup fails, the freshly opened connection is closed
        again and the wrapper is left in the disconnected state, so that no
        handle is leaked and a later ``connect()`` is possible.

        :raises RuntimeError: If already connected, or if the database schema
            is newer than supported.
        """
        if self.is_connected():
            raise RuntimeError(f'Database connection to {self._filename} is already established.')
        connection = sqlite3.connect(self._filename)
        self._sqlite_db = connection
        try:
            self._setup()
        except BaseException:
            # Cleanup only; the exception is re-raised unchanged.
            self._sqlite_db = None
            connection.close()
            raise

    def close(self) -> None:
        """
        Close the database connection.

        :raises RuntimeError: If not connected, or if a transaction is still
            in progress.
        """
        if self._sqlite_db is None:
            raise RuntimeError(f'Database connection to {self._filename} is not established.')
        if self.in_transaction():
            raise RuntimeError('A transaction is still ongoing.')
        self._sqlite_db.close()
        self._sqlite_db = None

    def in_transaction(self) -> bool:
        """
        :return: True if connected and the connection has a transaction in
            progress, False otherwise.
        """
        return self._sqlite_db is not None and self._sqlite_db.in_transaction

    def is_connected(self) -> bool:
        """
        :return: True if a connection is currently held, False otherwise.
        """
        return self._sqlite_db is not None

    @property
    def _user_version(self) -> int:
        """
        The schema version stored in the database's ``PRAGMA user_version``.

        :raises RuntimeError: If the database is not connected.
        """
        if self._sqlite_db is None:
            raise RuntimeError(f'Database connection to {self._filename} is not established.')
        cursor = self._sqlite_db.cursor()
        cursor.execute('PRAGMA user_version')
        version = int(cursor.fetchone()[0])
        return version

    @_user_version.setter
    def _user_version(self, version: int) -> None:
        if self._sqlite_db is None:
            raise RuntimeError(f'Database connection to {self._filename} is not established.')
        cursor = self._sqlite_db.cursor()
        # The value is interpolated into the statement text rather than bound
        # as a parameter; callers in this class only pass integers.
        cursor.execute(f'PRAGMA user_version = {version}')