forked from s3lph/matemat
5d710f0c18
- Enabled foreign key constraints (!!!) - Fixed a facade bug exposed by foreign key constraints
126 lines
4.6 KiB
Python
126 lines
4.6 KiB
Python
|
|
from __future__ import annotations
|
|
from typing import Any, Optional
|
|
|
|
import sqlite3
|
|
|
|
from matemat.exceptions import DatabaseConsistencyError
|
|
from matemat.db.schemas import SCHEMAS
|
|
from matemat.db.migrations import migrate_schema_1_to_2, migrate_schema_2_to_3
|
|
|
|
|
|
class Transaction(object):
    """
    Context manager that runs a block of statements inside a SQLite transaction.

    Entering the context yields a cursor on the wrapped connection.  Leaving it commits when the block
    finished without an exception and rolls back otherwise.  If the connection is already inside a
    transaction when the context is entered, the instance becomes a "dummy": it only hands out a cursor
    and leaves committing / rolling back to whoever opened the surrounding transaction.
    """

    def __init__(self, db: sqlite3.Connection, exclusive: bool = True) -> None:
        """
        :param db: The SQLite connection to run the transaction on.
        :param exclusive: If True, the transaction is started with 'BEGIN EXCLUSIVE', otherwise with 'BEGIN'.
        """
        self._db: sqlite3.Connection = db
        # Cursor handed out by a real (non-dummy) transaction; None until entered.
        self._cursor: Optional[sqlite3.Cursor] = None
        self._excl = exclusive
        # True while this instance is nested inside an already running transaction.
        self._is_dummy: bool = False

    def __enter__(self) -> sqlite3.Cursor:
        """
        Begin a transaction, unless one is already running on the connection.

        :return: A cursor on the wrapped connection.
        """
        if self._db.in_transaction:
            # Nested use: piggyback on the surrounding transaction.
            self._is_dummy = True
            return self._db.cursor()
        self._is_dummy = False
        self._db.execute('BEGIN EXCLUSIVE' if self._excl else 'BEGIN')
        self._cursor = self._db.cursor()
        return self._cursor

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """
        Commit or roll back the transaction; a dummy transaction does neither.

        :raises DatabaseConsistencyError: If the block, or the commit itself, failed with a
            sqlite3.IntegrityError.  Any other exception raised by the block propagates unchanged.
        """
        if self._is_dummy:
            return
        if exc_type is None:
            try:
                self._db.commit()
            except sqlite3.IntegrityError as e:
                # Constraint violations may only surface at COMMIT time (e.g. deferred foreign keys).
                # A failed COMMIT leaves the transaction open, which would turn every subsequent
                # Transaction on this connection into a dummy, so roll back explicitly.
                self._db.rollback()
                raise DatabaseConsistencyError(str(e)) from e
            return
        self._db.rollback()
        # issubclass rather than ==, so subclasses of IntegrityError are translated as well.
        if issubclass(exc_type, sqlite3.IntegrityError):
            raise DatabaseConsistencyError(str(exc_val)) from exc_val
|
|
|
|
|
|
class DatabaseWrapper(object):
    """
    Wrapper around a SQLite database file that manages the connection and the schema version.

    Usable as a context manager: entering connects (creating or migrating the schema as needed),
    leaving closes the connection.
    """

    # Schema version this code expects; stored in the database via 'PRAGMA user_version'.
    SCHEMA_VERSION = 3

    def __init__(self, filename: str) -> None:
        """
        :param filename: Name of the SQLite database file, as passed to sqlite3.connect().
        """
        self._filename: str = filename
        # None while disconnected.
        self._sqlite_db: Optional[sqlite3.Connection] = None

    def __enter__(self) -> DatabaseWrapper:
        """
        Connect to the database and return this wrapper.
        """
        self.connect()
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """
        Close the database connection.
        """
        self.close()

    def transaction(self, exclusive: bool = True) -> Transaction:
        """
        Create a transaction context manager on the current connection.

        :param exclusive: Passed through to Transaction.
        :return: A Transaction bound to the current connection.
        :raises RuntimeError: If the database is not connected.
        """
        if self._sqlite_db is None:
            raise RuntimeError(f'Database connection to {self._filename} is not established.')
        return Transaction(self._sqlite_db, exclusive)

    def _setup(self) -> None:
        """
        Enable foreign key enforcement and bring the schema to SCHEMA_VERSION.

        :raises RuntimeError: If the database's schema version is newer than SCHEMA_VERSION.
        """
        # Enable foreign key enforcement
        cursor = self._sqlite_db.cursor()
        cursor.execute('PRAGMA foreign_keys = 1')

        # Create or update schemas if necessary
        with self.transaction() as c:
            # Defer foreign key enforcement in the setup transaction
            c.execute('PRAGMA defer_foreign_keys = 1')
            version: int = self._user_version
            if version < 1:
                # Don't use executescript, as it issues a COMMIT first
                for command in SCHEMAS[self.SCHEMA_VERSION]:
                    c.execute(command)
            elif version < self.SCHEMA_VERSION:
                self._upgrade(from_version=version, to_version=self.SCHEMA_VERSION)
            elif version > self.SCHEMA_VERSION:
                raise RuntimeError('Database schema is newer than supported by this version of Matemat.')
            self._user_version = self.SCHEMA_VERSION

    def _upgrade(self, from_version: int, to_version: int) -> None:
        """
        Run the schema migrations needed to get from from_version to to_version.

        :param from_version: Schema version currently stored in the database.
        :param to_version: Schema version to migrate to.
        """
        with self.transaction() as c:
            # Note to future s3lph: If there are further migrations, also consider upgrades like 1 -> 3
            if from_version == 1 and to_version >= 2:
                migrate_schema_1_to_2(c)
            # Deliberately a second 'if', so that a database at version 1 runs both migrations in order.
            if from_version <= 2 and to_version >= 3:
                migrate_schema_2_to_3(c)

    def connect(self) -> None:
        """
        Open the database connection and set up the schema.

        If the setup fails, the connection is closed again before the error is re-raised, so the
        wrapper is left disconnected rather than holding a half-initialized connection.

        :raises RuntimeError: If the connection is already established, or if the schema is too new.
        """
        if self.is_connected():
            raise RuntimeError(f'Database connection to {self._filename} is already established.')
        self._sqlite_db = sqlite3.connect(self._filename)
        try:
            self._setup()
        except BaseException:
            # __enter__ failing means __exit__ is never called, so nobody else would close this
            # connection.  Close it directly: self.close() may refuse while in a transaction.
            self._sqlite_db.close()
            self._sqlite_db = None
            raise

    def close(self) -> None:
        """
        Close the database connection.

        :raises RuntimeError: If the database is not connected, or a transaction is still ongoing.
        """
        if self._sqlite_db is None:
            raise RuntimeError(f'Database connection to {self._filename} is not established.')
        if self.in_transaction():
            raise RuntimeError('A transaction is still ongoing.')
        self._sqlite_db.close()
        self._sqlite_db = None

    def in_transaction(self) -> bool:
        """
        :return: True if connected and a transaction is currently open on the connection.
        """
        return self._sqlite_db is not None and self._sqlite_db.in_transaction

    def is_connected(self) -> bool:
        """
        :return: True if a database connection is currently held.
        """
        return self._sqlite_db is not None

    @property
    def _user_version(self) -> int:
        """
        The schema version stored in the database, read via 'PRAGMA user_version'.

        :raises RuntimeError: If the database is not connected.
        """
        if self._sqlite_db is None:
            raise RuntimeError(f'Database connection to {self._filename} is not established.')
        cursor = self._sqlite_db.cursor()
        cursor.execute('PRAGMA user_version')
        version = int(cursor.fetchone()[0])
        return version

    @_user_version.setter
    def _user_version(self, version: int) -> None:
        """
        Store the schema version in the database via 'PRAGMA user_version'.

        :param version: The schema version to store.
        :raises RuntimeError: If the database is not connected.
        """
        if self._sqlite_db is None:
            raise RuntimeError(f'Database connection to {self._filename} is not established.')
        cursor = self._sqlite_db.cursor()
        cursor.execute(f'PRAGMA user_version = {version}')
|