matemat/matemat/db/wrapper.py

137 lines
4.9 KiB
Python

from typing import Any, Optional
import sqlite3
from matemat.exceptions import DatabaseConsistencyError
import matemat.db.migrations
class DatabaseTransaction(object):
def __init__(self, db: sqlite3.Connection, exclusive: bool = True) -> None:
self._db: sqlite3.Connection = db
self._cursor: Optional[sqlite3.Cursor] = None
self._excl = exclusive
self._is_dummy: bool = False
def __enter__(self) -> sqlite3.Cursor:
if self._db.in_transaction:
self._is_dummy = True
return self._db.cursor()
else:
self._is_dummy = False
if self._excl:
self._db.execute('BEGIN EXCLUSIVE')
else:
self._db.execute('BEGIN')
self._cursor = self._db.cursor()
return self._cursor
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
if self._is_dummy:
return
if exc_type is None:
self._db.commit()
else:
self._db.rollback()
if exc_type == sqlite3.IntegrityError:
raise DatabaseConsistencyError(str(exc_val))
class DatabaseWrapper(object):
def __init__(self, filename: str) -> None:
self._filename: str = filename
self._sqlite_db: Optional[sqlite3.Connection] = None
def __enter__(self) -> 'DatabaseWrapper':
self.connect()
return self
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
self.close()
def transaction(self, exclusive: bool = True) -> DatabaseTransaction:
if self._sqlite_db is None:
raise RuntimeError(f'Database connection to {self._filename} is not established.')
return DatabaseTransaction(self._sqlite_db, exclusive)
def _setup(self) -> None:
# Create or update schemas if necessary
version: int = self._user_version
if version < self.schema_version:
self._upgrade(from_version=version, to_version=self.schema_version)
self._user_version = self.schema_version
elif version > self.schema_version:
raise RuntimeError('Database schema is newer than supported by this version of Matemat.')
# Enable foreign key enforcement
cursor = self._sqlite_db.cursor()
cursor.execute('PRAGMA foreign_keys=ON')
def _upgrade(self, from_version: int, to_version: int) -> None:
if from_version >= to_version:
return
# Create backup before migration
if self._filename != ':memory:':
bakfile = f'{self._filename}_{from_version}_{to_version}.bak'
bak = sqlite3.connect(bakfile)
with bak:
self._sqlite_db.backup(bak, pages=1)
bak.close()
# Iterate through migrations, executing them one by one
with self.transaction() as c:
c.execute('PRAGMA foreign_keys=OFF')
c.execute('PRAGMA legacy_alter_table=ON')
for i in range(from_version+1, to_version+1):
migration = getattr(matemat.db.migrations, f'migrate_schema_{i}')
migration(c)
c.execute('PRAGMA foreign_key_check')
c.execute('PRAGMA foreign_keys=ON')
def connect(self) -> None:
if self.is_connected():
raise RuntimeError(f'Database connection to {self._filename} is already established.')
self._sqlite_db = sqlite3.connect(self._filename)
self._setup()
def close(self) -> None:
if self._sqlite_db is None:
raise RuntimeError(f'Database connection to {self._filename} is not established.')
if self.in_transaction():
raise RuntimeError(f'A transaction is still ongoing.')
self._sqlite_db.close()
self._sqlite_db = None
def in_transaction(self) -> bool:
return self._sqlite_db is not None and self._sqlite_db.in_transaction
def is_connected(self) -> bool:
return self._sqlite_db is not None
@property
def _user_version(self) -> int:
if self._sqlite_db is None:
raise RuntimeError(f'Database connection to {self._filename} is not established.')
cursor = self._sqlite_db.cursor()
cursor.execute('PRAGMA user_version')
version = int(cursor.fetchone()[0])
return version
@_user_version.setter
def _user_version(self, version: int) -> None:
if self._sqlite_db is None:
raise RuntimeError(f'Database connection to {self._filename} is not established.')
cursor = self._sqlite_db.cursor()
cursor.execute(f'PRAGMA user_version = {version}')
@property
def schema_version(self) -> int:
max_migration = 0
for name in dir(matemat.db.migrations):
if not name.startswith('migrate_schema_'):
continue
migration = int(name.split('_')[2])
max_migration = max(max_migration, migration)
return max_migration