from __future__ import annotations import re from datetime import datetime from enum import StrEnum from functools import cached_property from textwrap import dedent from typing import List from flask import g from grung.objects import ( BackReference, Collection, DateTime, Dict, Field, Password, Pointer, Record, TextFilePointer, Timestamp, ) from grung.validators import PatternValidator from tinydb import where from ttfrog.exceptions import MalformedRequestError def app_context(): import ttfrog.app ttfrog.app.check_state() return ttfrog.app READ_ONLY_FIELD_TYPES = [Collection, Pointer, BackReference, Timestamp] class Permissions(StrEnum): READ = "r" WRITE = "w" DELETE = "d" class Page(Record): """ A page in the wiki. Just about everything in the databse is either a Page or a subclass of a Page. """ default = dedent( """ # {name} *Overview of this page* ## Section 1 *Organize your text into logically separted sections.* """ ) @classmethod def fields(cls): # fmt: off return [ *super().fields(), # The URI for the page, relative to the app's VIEW_URI Field("uri", unique=True, validators=[ PatternValidator(re.compile( r""" ^ # match from beginning of line (?: / # first char can be a / | # or (?: [0-9a-z] # first char is alphanum (?:/?[0-9a-z]+)* # following by more alphnum, maybe with a leading / )+ # at least once, possibly many times, )? # or URI could be entirely blank $ # until the end of the string """, re.IGNORECASE | re.VERBOSE )) ]), # The portion of the URI after the last / Field("name", validators=[PatternValidator(re.compile(r'^(?:/|(?:[0-9a-z]+))$', re.IGNORECASE))]), # The pages that exist below this page's URI Collection("members", member_type=Page), # The last user to touch the page. Pointer("author", value_type=User), # When the page was created DateTime("created"), # The last time the page was modified. Timestamp("last_modified"), # The main content blob of the page TextFilePointer("body", extension='.md', default=Page.default), # The access control list for this Page. Dict("acl"), ] # fmt: on @cached_property def read_only(self) -> set: return [name for (name, attr) in self._metadata.fields.items() if type(attr) in READ_ONLY_FIELD_TYPES] + [ "uid", "acl", ] def update(self, **kwargs): super().update(**dict((key, value) for key, value in kwargs.items() if key not in self.read_only)) def parent(self): if self.uri == "": return None app = app_context() parent_uri = "" if self.uri and "/" in self.uri: parent_uri = self.uri.rsplit("/", 1)[0] for table_name in app.db.tables(): page = app.db.table(table_name).get(where("uri") == parent_uri, recurse=False) if page: return page def before_insert(self, *args, **kwargs): """ Make the following adjustments before saving this record: * Derive the URI from the hierarchy of the parent. """ if not self.name: raise Exception("Must provide a name") app = app_context() super().before_insert(app.db) if not self.author: self.author = app.db.User.get(where("name") == "__system__") now = datetime.utcnow() if not self.doc_id and self.created < now: self.created = now def add_member(self, child: Record): app = app_context() prefix = (self.uri + "/") if self.uri else "" new_uri = f"{prefix}{child.name}" if child.uri != new_uri: app.log.debug(f"Moving {child._metadata.table}[{child.doc_id}] from {child.uri} to {new_uri}") child.uri = new_uri self.members = list(set(self.members + [app.db.save(child)])) app.db.save(self) return self.get_child(child) def get_child(self, obj: Record): for page in self.members: if page[page._metadata.primary_key] == obj[obj._metadata.primary_key]: return page return None def set_permissions(self, entity: Entity, permissions: List) -> str: app = app_context() perms = "".join(permissions) self.acl[entity.reference] = perms app.db.save(self) return perms def get_acl_for_entity(self, entity) -> (str, str | None): """ Search upward through the page hierarchy looking for one with an ACL that either has a grant for the entity we care about, or at least one group in which the entity is a member. """ app = app_context() def find_acl(obj): if hasattr(obj, "acl"): if entity.reference in obj.acl: return {entity.reference: obj.acl[entity.reference]} group_grants = {} for ref, grant in obj.acl.items(): (table_name, pkey, pval) = ref.split("::") if table_name == "Group": group = app.db.Group.get(where(pkey) == pval, recurse=False) if entity.reference in group.members: group_grants[ref] = grant if group_grants: return group_grants if hasattr(obj, "parent"): return find_acl(obj.parent()) return {"": ""} return find_acl(self) class Entity(Page): @classmethod def fields(cls): inherited = [field for field in super().fields() if field.name not in ("members", "uid")] return inherited + [ Field("name", primary_key=True), ] def has_permission(self, record: Record, requested: str) -> bool | None: for entity, grants in record.get_acl_for_entity(self).items(): if requested in grants: return True return False def can_read(self, record: Record): return self.has_permission(record, Permissions.READ) def can_write(self, record: Record): return self.has_permission(record, Permissions.WRITE) def can_delete(self, record: Record): return self.has_permission(record, Permissions.DELETE) class User(Entity): """ A website user, editable as a wiki page. """ @classmethod def fields(cls): return super().fields() + [ Field("email", unique=True), Password("password"), ] def update(self, **data): self.author = None if self == g.user else g.user return super().update(**data) def validate(self): if self.name == "__system__": raise MalformedRequestError("Invalid name.") return super().validate() def check_credentials(self, username: str, password: str) -> bool: return username == self.name and self._metadata.fields["password"].compare(password, self.password) def after_insert(self, *args, **kwargs): """ After saving this record, ensure that any page in the members collection is updated with the correct URI. This ensures that if a page is moved from one collection to another, the URI is updated. """ app = app_context() super().after_insert(app.db) if not hasattr(self, "members"): return for child in self.members: obj = BackReference.dereference(child, app.db, recurse=False) obj.uri = f"{self.uri}/{obj.name}" child = app.db.save(obj) class Group(Entity): """ A set of users, editable as a wiki page. """ @classmethod def fields(cls): return super().fields() + [Collection("members", member_type=Entity)] class NPC(Page): """ An NPC, editable as a wiki page. """ default = dedent( """ # {name} *[Ancestry] [Class]* | AC | HP | STR | DEX | CON | INT | WIS | CHA |----|----|-----|-----|-----|-----|-----|------ | 10 | 10 | +0 | +0 | +0 | +0 | +0 | +0 **{name} (they/they)** [description] * Personality: **[keywords]** * Flaw: **[flaw]** * Goal: **[goal]** """ )