import io
import sys
from pathlib import Path
from types import SimpleNamespace

from dotenv import dotenv_values
from flask import Flask
from flask_session import Session
from grung.db import GrungDB
from tinydb import where
from tinydb.storages import MemoryStorage

from ttfrog import schema
from ttfrog.exceptions import (
    ApplicationNotInitializedError,
    MalformedRequestError,
    RecordNotFoundError,
    UnauthorizedError,
)


class ApplicationContext:
    """
    The global context for the application, this class provides access to the Flask app instance, the GrungDB
    instance, and the loaded configuration.

    To prevent multiple contexts from being created, the class is instantiated at import time and replaces the
    module in the symbol table. The first time it is imported, callers should call both .load_config() and
    .initialize(); this is typically done at program start.

    After being initialized, callers can import ttfrog.app and interact with the ApplicationContext instance
    directly:

    >>> from ttfrog import app
    >>> print(app.config.NAME)
    ttfrog
    """

    # Parsed with dotenv_values(), so this must stay in dotenv (KEY=value per line) format.
    CONFIG_DEFAULTS = """
# ttfrog Defaults

NAME=ttfrog
LOG_LEVEL=INFO
SECRET_KEY=fnord
IN_MEMORY_DB=
DATA_ROOT=~/.dnd/ttfrog/
ADMIN_USERNAME=admin
ADMIN_EMAIL=admin@telisar

THEME=default

VIEW_URI=/
API_URI=/_/v1/
"""

    def __init__(self):
        # Populated by load_config().
        self.config: SimpleNamespace | None = None
        self.path: SimpleNamespace | None = None

        # Populated by initialize().
        self.theme: Path | None = None
        self.web: Flask | None = None
        self.db: GrungDB | None = None
        self.log = None

        self._initialized = False

    def load_config(self, defaults: Path | None = Path("~/.dnd/ttfrog/defaults"), **overrides) -> None:
        """
        Load the user configuration from the following sources, in order:

            1. ApplicationContext.CONFIG_DEFAULTS
            2. The user's configuration defaults file, if any
            3. Overrides specified by the caller, if any

        Later sources take precedence over earlier ones. Once the configuration is loaded, the path attribute
        is also configured.

        Args:
            defaults: Path to a dotenv-format file of user defaults; pass None to skip reading a file.
            **overrides: Configuration values that take precedence over every other source.
        """
        config_file = defaults.expanduser() if defaults else None

        # The dicts are merged left to right, so a key in a later source replaces the earlier value.
        self.config = SimpleNamespace(
            **{
                **dotenv_values(stream=io.StringIO(ApplicationContext.CONFIG_DEFAULTS)),
                **(dotenv_values(config_file) if config_file else {}),
                **overrides,
            }
        )

        data_root = Path(self.config.DATA_ROOT).expanduser()
        self.path = SimpleNamespace(
            config=config_file,
            data_root=data_root,
            database=data_root / f"{self.config.NAME}.json",
            sessions=data_root / "session_cache",
        )

    def initialize(self, db: GrungDB | None = None, force: bool = False) -> None:
        """
        Instantiate both the database and the flask application.

        This is a no-op if the context is already initialized, unless force is set.

        Args:
            db: An existing database to use instead of creating one from the configuration.
            force: Re-create the database and the flask application even if already initialized.

        Raises:
            ApplicationNotInitializedError: If load_config() has not been called yet.
        """
        if self._initialized and not force:
            return

        if self.config is None:
            raise ApplicationNotInitializedError("load_config() must be called before initialize().")

        if db:
            self.db = db
        elif self.config.IN_MEMORY_DB:
            # Any truthy value selects the in-memory store; the shipped default is the empty string.
            self.db = GrungDB.with_schema(schema, path=None, storage=MemoryStorage)
        else:
            self.db = GrungDB.with_schema(
                schema, path=self.path.database, sort_keys=True, indent=4, separators=(",", ": ")
            )

        # Templates and static assets are both served from the selected theme directory.
        self.theme = Path(__file__).parent / "themes" / self.config.THEME

        self.web = Flask(self.config.NAME, template_folder=self.theme, static_folder=self.theme / "static")
        self.web.config["SECRET_KEY"] = self.config.SECRET_KEY
        self.web.config["SEND_FILE_MAX_AGE_DEFAULT"] = 0
        # NOTE(review): DEBUG is unconditionally enabled here -- confirm this is never used for a production deployment.
        self.web.config["DEBUG"] = True
        self.web.config["SESSION_TYPE"] = "filesystem"
        self.web.config["SESSION_REFRESH_EACH_REQUEST"] = True
        self.web.config["SESSION_FILE_DIR"] = self.path.sessions
        Session(self.web)

        self.log = self.web.logger
        self._initialized = True

    def check_state(self) -> None:
        """
        Raise ApplicationNotInitializedError unless initialize() has completed.
        """
        if not self._initialized:
            raise ApplicationNotInitializedError("This action requires the application to be initialized.")

    def authenticate(self, username: str, password: str) -> "schema.User | None":
        """
        Returns the User record matching the given username and password, or None if either value is missing,
        no such user exists, or the credentials do not match.

        Raises:
            ApplicationNotInitializedError: If the application has not been initialized.
        """
        # Fail with a descriptive error instead of an AttributeError on the unset logger/database.
        self.check_state()

        if not (username and password):
            self.log.debug("Need both username and password to login")
            return None

        user = self.db.User.get(where("name") == username)
        if not user:
            self.log.debug(f"No user matching {username}")
            return None

        if not user.check_credentials(username, password):
            self.log.debug(f"Invalid credentials for {username}")
            return None

        return user

    def authorize(self, user, record, requested):
        """
        Return the result of asking the user whether it holds the requested permission on the record.
        """
        return user.has_permission(record, requested)

    def _get_or_create_page_by_uri(self, user, table, uri):
        """
        Get a page by URI. If it doesn't exist, create a new one if and only if the user has permission to
        write on its parent.

        The new page is only instantiated here; this method does not save it.

        Raises:
            MalformedRequestError: If neither the page nor its parent could be loaded.
            UnauthorizedError: If the page does not exist and the user may not write to its parent.
        """
        uri = uri.replace(" ", "").strip("/")

        # Drop the view prefix, if present. Both sides are compared with their surrounding slashes removed and
        # on a path-segment boundary: the URI was stripped above, so a prefix with a leading slash could
        # otherwise never match. With the default VIEW_URI of "/" the prefix is empty and nothing is removed.
        view_prefix = self.config.VIEW_URI.strip("/")
        if view_prefix and (uri == view_prefix or uri.startswith(f"{view_prefix}/")):
            uri = uri.removeprefix(view_prefix).strip("/")

        # Defaults describe the root page, which is what an empty URI refers to.
        parent_uri = ""
        search_uri = "/"
        page_name = "/"

        if "/" in uri:
            (parent_uri, page_name) = uri.rsplit("/", 1)
            if parent_uri == "Page":
                # A direct child of "Page" is looked up by its bare name, with the root as its parent.
                parent_uri = "/"
                search_uri = page_name
            else:
                search_uri = uri
        elif uri:
            # A single path segment is a direct child of the root.
            parent_uri = "/"
            search_uri = uri
            page_name = uri

        self.log.debug(f"Searching for page in {table = } with {search_uri = }; its parent is {parent_uri=}")

        page = table.get(where("uri") == search_uri, recurse=False)
        if not page:
            # load the parent to check for write permissions
            self.log.debug(f"Page at {search_uri} does not exist, looking for parent at {parent_uri=}")
            parent_table = table if "/" in parent_uri else self.db.Page
            parent = None
            try:
                self.log.debug(f"Loading parent with {parent_uri}")
                parent = self.get_page(user, parent_table.name, uri=parent_uri)
            except Exception as e:
                # Best effort: any failure to load the parent is reported below as a missing parent.
                self.log.debug(f"Error loading parent: {e}")

            if not parent:
                raise MalformedRequestError("Page does not exist and neither does its parent.")

            if not self.authorize(user, parent, schema.Permissions.WRITE):
                raise UnauthorizedError(f"User {user.doc_id} does not have permission to create under {parent_uri}.")

            # Instantiate the schema class that shares its name with the table being searched.
            page = getattr(schema, table.name)(
                name=page_name, body=f"# {page_name}\nThis page does not exist", parent=parent
            )

        self.log.debug(f"Returning {page.doc_id}: {page.uri}")
        return page

    def get_page(self, user, table_name, doc_id=None, uri=None):
        """
        Get a page by doc_id or by URI, if and only if the user is allowed to read it. A new Record instance
        will be returned if the requested page does not exist but the user has permission to create it.

        When both doc_id and uri are given, doc_id takes precedence.

        Raises:
            ApplicationNotInitializedError: If the application has not been initialized.
            MalformedRequestError: If the user has no doc_id, or neither doc_id nor uri was specified.
            RecordNotFoundError: If no record with the given doc_id exists.
            UnauthorizedError: If the user may not read the page.
        """
        # Fail with a descriptive error instead of an AttributeError on the unset logger/database.
        self.check_state()

        if not user.doc_id:
            self.log.error(f"Invalid user: {user}")
            raise MalformedRequestError("User does not exist.")

        try:
            table = self.db.table(table_name)
        except RuntimeError:
            # An unknown table name is not fatal: fall back to the Page table and log the problem.
            table = self.db.Page
            self.log.error(f"Invalid table_name: {table_name}, will use Page")

        if doc_id:
            page = table.get(doc_id=doc_id)
            if not page:
                raise RecordNotFoundError(f"No record with {doc_id=} was found.")
        elif uri:
            page = self._get_or_create_page_by_uri(user, table, uri)
        else:
            self.log.error("No doc_id or uri.")
            raise MalformedRequestError("Either a doc_id or a uri must be specified.")

        if not self.authorize(user, page, schema.Permissions.READ):
            self.log.error(f"No permission for {user.name} on {page}")
            raise UnauthorizedError(f"User {user.doc_id} does not have permission to read {table_name} {page.doc_id}.")

        # resolve the pointers to subpages so we can render things like nav elements.
        if hasattr(page, "members"):
            subpages = []
            for pointer in page.members:
                # Each pointer is split into three "::"-separated fields: table name, key and value.
                member_table, pkey, pval = pointer.split("::")
                subpages += self.db.table(member_table).search(where(pkey) == pval, recurse=False)
            page.members = subpages

        return page


# Replace this module with a single shared context instance, so importers receive the instance itself.
sys.modules[__name__] = ApplicationContext()