import inspect import re from functools import reduce from operator import ior from pathlib import Path from typing import List from tinydb import Query, TinyDB, table from tinydb.storages import MemoryStorage from tinydb.table import Document from grung.exceptions import UniqueConstraintError from grung.types import Record class RecordTable(table.Table): """ Wrapper around tinydb Tables that handles Records instead of dicts. """ def __init__(self, name: str, db: TinyDB, document_class: Document = Record, **kwargs): self.document_class = document_class self.db = db super().__init__(db.storage, name, **kwargs) def insert(self, document): document.before_insert(self.db) doc = document.serialize() self._check_constraints(doc) if doc.doc_id: last_insert_id = super().upsert(doc)[0] else: last_insert_id = super().insert(dict(doc)) doc.doc_id = last_insert_id doc.after_insert(self.db) return doc.deserialize(self.db) def get(self, *args, doc_id: int = None, recurse: bool = False, **kwargs): """ Return exactly zero or one records from the database matching the supplied criteria. If more than one records match the criteria, return the first one. Criteria are ignored if doc_id is specified. Usage: Table.get(doc_id=1) Table.get(where("uid") == "abcdef") """ if doc_id: document = super().get(doc_id=doc_id) if document: return document.deserialize(self.db, recurse=recurse) matches = self.search(*args, recurse=recurse, **kwargs) if matches: return matches[0] def search(self, *args, recurse: bool = False, **kwargs) -> List[Record]: results = super().search(*args, **kwargs) return [r.deserialize(self.db, recurse=recurse) for r in results] def remove(self, document): if document.doc_id: super().remove(doc_ids=[document.doc_id]) def _check_constraints(self, document) -> bool: self._check_unique(document) def _check_unique(self, document) -> bool: matches = [] queries = reduce( ior, [ Query()[field.name].matches(f"^{document[field.name]}$", flags=re.IGNORECASE) for field in document._metadata.fields.values() if field.unique ], ) matches = [dict(match) for match in super().search(queries) if match.doc_id != document.doc_id] if matches != []: raise UniqueConstraintError(document, queries, matches) class GrungDB(TinyDB): """ A TinyDB database instance that uses RecordTable instances for each table and Record instances for each document in the table. """ default_table_name = "Record" _tables = {} def __init__(self, path: Path, *args, **kwargs): self.path = path if kwargs["storage"] != MemoryStorage: args = (path,) + args super().__init__(*args, **kwargs) self.create_table(Record) def table(self, name: str) -> RecordTable: if name not in self._tables: raise RuntimeError(f"No such table: {name}") return self._tables[name] def create_table(self, table_class): name = table_class.__name__ if name not in self._tables: self._tables[name] = RecordTable(name, db=self, document_class=table_class) return self.table(name) def save(self, record): """ Create or update a record in its table. """ return self.table(record._metadata.table).insert(record) def delete(self, record): return self.table(record._metadata.table).remove(record) def __getattr__(self, attr_name): """ Make tables attributes of the instance. """ if attr_name in self._tables: return self.table(attr_name) return super().__getattr__(attr_name) @classmethod def with_schema(cls, schema_module, path: Path | None, *args, **kwargs): db = GrungDB(path=path, *args, **kwargs) for name, obj in inspect.getmembers(schema_module): if type(obj) == type and issubclass(obj, Record): db.create_table(obj) return db