From fff34948d2d6b68ec5d005053ad44ada9e773fa1 Mon Sep 17 00:00:00 2001 From: evilchili Date: Sat, 1 Nov 2025 23:48:56 -0700 Subject: [PATCH] remove forms, add validators --- src/ttfrog/app.py | 6 +-- src/ttfrog/bootstrap.py | 16 ++++---- src/ttfrog/forms.py | 83 ----------------------------------------- src/ttfrog/schema.py | 82 ++++++++++++++++++++++++++++++++++------ src/ttfrog/web.py | 52 +++++++++++++++++--------- 5 files changed, 116 insertions(+), 123 deletions(-) delete mode 100644 src/ttfrog/forms.py diff --git a/src/ttfrog/app.py b/src/ttfrog/app.py index 63a5caa..c5543ef 100644 --- a/src/ttfrog/app.py +++ b/src/ttfrog/app.py @@ -165,12 +165,9 @@ API_URI=/_/v1/ else: search_uri = uri - # self.log.debug(f"Searching for page in {table = } with {search_uri = }; its parent is {parent_uri=}") - # self.log.debug("\n".join([f"{p.doc_id}: {p.uri}" for p in table.all()])) page = table.get(where("uri") == search_uri, recurse=False) if not page: # load the parent to check for write permissions - # self.log.debug(f"Page at {search_uri} does not exist, looking for parent at {parent_uri=}") parent_table = table if parent_uri and "/" in parent_uri else self.db.Page parent = None try: @@ -187,6 +184,9 @@ API_URI=/_/v1/ obj = getattr(schema, table.name) page = obj(name=page_name, body=obj.default.format(name=page_name), parent=parent) + # validate the page name before we try to create anything + page._metadata.fields["name"].validate(page, db=self.db) + # self.log.debug(f"Returning {page.doc_id}: {page.uri}") return page diff --git a/src/ttfrog/bootstrap.py b/src/ttfrog/bootstrap.py index 792d459..03a4fd3 100644 --- a/src/ttfrog/bootstrap.py +++ b/src/ttfrog/bootstrap.py @@ -43,26 +43,26 @@ def bootstrap(): app.db.save(schema.User(name="__system__")) # create the top-level pages - root = app.db.save(schema.Page(name=app.config.VIEW_URI, body=b"This is the home page", uri="")) + root = app.db.save(schema.Page(name=app.config.VIEW_URI, body="This is the home page", uri="")) - users = root.add_member(schema.Page(name="User", body=b"# Users\nusers go here.")) - groups = root.add_member(schema.Page(name="Group", body=b"# Groups\ngroups go here.")) - npcs = root.add_member(schema.Page(name="NPC", body=b"# NPCS!")) - wiki = root.add_member(schema.Page(name="Wiki", body=TEMPLATE.encode())) + users = root.add_member(schema.Page(name="User", body="# Users\nusers go here.")) + groups = root.add_member(schema.Page(name="Group", body="# Groups\ngroups go here.")) + npcs = root.add_member(schema.Page(name="NPC", body="# NPCS!")) + wiki = root.add_member(schema.Page(name="Wiki", body=TEMPLATE)) # create the NPCs npcs.add_member(schema.NPC(name="Sabetha", body="")) npcs.add_member(schema.NPC(name="John", body="")) # create the users - guest = users.add_member(schema.User(name="guest", body=b"# guest")) + guest = users.add_member(schema.User(name="guest", body="# guest")) admin = users.add_member( - schema.User(name=app.config.ADMIN_USERNAME, password="fnord", email=app.config.ADMIN_EMAIL, body=b"# fnord") + schema.User(name=app.config.ADMIN_USERNAME, password="fnord", email=app.config.ADMIN_EMAIL, body="# fnord") ) # create the admin user and admins group - admins = groups.add_member(schema.Group(name="administrators", members=[admin], body=b"# administrators")) + admins = groups.add_member(schema.Group(name="administrators", members=[admin], body="# administrators")) # admins get full access root.set_permissions( diff --git a/src/ttfrog/forms.py b/src/ttfrog/forms.py deleted file mode 100644 index 2d32047..0000000 --- a/src/ttfrog/forms.py +++ /dev/null @@ -1,83 +0,0 @@ -import logging -from dataclasses import dataclass, field -from functools import cached_property - -from flask import g -from grung.types import BackReference, Collection, Pointer, Record, Timestamp - -from ttfrog import schema - -READ_ONLY_FIELD_TYPES = [Collection, Pointer, BackReference, Timestamp] - - -logger = logging.getLogger(__name__) - - -@dataclass -class Form: - """ - The base Form controller for the web UI. - """ - - record: Record - data: field(default_factory=dict) - - @cached_property - def read_only(self) -> set: - return [ - name for (name, attr) in self.record._metadata.fields.items() if type(attr) in READ_ONLY_FIELD_TYPES - ] + ["uid", "acl"] - - def prepare(self): - for key, value in self.data.items(): - if key in self.read_only: - continue - if self.record[key] != value: - self.record[key] = value - self.record.author = None if self.record == g.user else g.user - return self.record - - -@dataclass -class Page(Form): - """ - A form for creating and updating Page records. - """ - - record: schema.Page - - -@dataclass -class Wiki(Form): - """ - A form for creating and updating Wiki records. - """ - - record: schema.Page - - -@dataclass -class NPC(Page): - """ - A form for creating and updating Page records. - """ - - record: schema.NPC - - -@dataclass -class User(Page): - """ - A form for creating and updating Page records. - """ - - record: schema.NPC - - -@dataclass -class Group(Page): - """ - A form for creating and updating Page records. - """ - - record: schema.NPC diff --git a/src/ttfrog/schema.py b/src/ttfrog/schema.py index 9e67ce7..b10f259 100644 --- a/src/ttfrog/schema.py +++ b/src/ttfrog/schema.py @@ -1,11 +1,14 @@ from __future__ import annotations +import re from datetime import datetime from enum import StrEnum +from functools import cached_property from textwrap import dedent from typing import List -from grung.types import ( +from flask import g +from grung.objects import ( BackReference, Collection, DateTime, @@ -17,8 +20,11 @@ from grung.types import ( TextFilePointer, Timestamp, ) +from grung.validators import PatternValidator from tinydb import where +from ttfrog.exceptions import MalformedRequestError + def app_context(): import ttfrog.app @@ -27,6 +33,9 @@ def app_context(): return ttfrog.app +READ_ONLY_FIELD_TYPES = [Collection, Pointer, BackReference, Timestamp] + + class Permissions(StrEnum): READ = "r" WRITE = "w" @@ -57,17 +66,59 @@ class Page(Record): # fmt: off return [ *super().fields(), - Field("uri", unique=True), # The URI for the page, relative to the app's VIEW_URI - Field("name"), # The portion of the URI after the last / - Collection("members", Page), # The pages that exist below this page's URI - Pointer("author", value_type=User), # The last user to touch the page. - DateTime("created"), # When the page was created - Timestamp("last_modified"), # The last time the page was modified. + + # The URI for the page, relative to the app's VIEW_URI + Field("uri", unique=True, validators=[ + PatternValidator(re.compile( + r""" + ^ # match from beginning of line + (?: + / # first char can be a / + | # or + (?: + [0-9a-z] # first char is alphanum + (?:/?[0-9a-z]+)* # following by more alphnum, maybe with a leading / + )+ # at least once, possibly many times, + )? # or URI could be entirely blank + $ # until the end of the string + """, + re.IGNORECASE | re.VERBOSE + )) + ]), + + # The portion of the URI after the last / + Field("name", validators=[PatternValidator(re.compile(r'^(?:/|(?:[0-9a-z]+))$', re.IGNORECASE))]), + + # The pages that exist below this page's URI + Collection("members", member_type=Page), + + # The last user to touch the page. + Pointer("author", value_type=User), + + # When the page was created + DateTime("created"), + + # The last time the page was modified. + Timestamp("last_modified"), + + # The main content blob of the page + TextFilePointer("body", extension='.md', default=Page.default), + + # The access control list for this Page. Dict("acl"), - TextFilePointer("body", extension='.md', default=Page.default), # The main content blob of the page ] # fmt: on + @cached_property + def read_only(self) -> set: + return [name for (name, attr) in self._metadata.fields.items() if type(attr) in READ_ONLY_FIELD_TYPES] + [ + "uid", + "acl", + ] + + def update(self, **kwargs): + super().update(**dict((key, value) for key, value in kwargs.items() if key not in self.read_only)) + def parent(self): if self.uri == "": return None @@ -75,7 +126,7 @@ class Page(Record): app = app_context() parent_uri = "" - if "/" in self.uri: + if self.uri and "/" in self.uri: parent_uri = self.uri.rsplit("/", 1)[0] for table_name in app.db.tables(): page = app.db.table(table_name).get(where("uri") == parent_uri, recurse=False) @@ -95,7 +146,7 @@ class Page(Record): super().before_insert(app.db) if not self.author: - self.author = app.db.User.get(where('name') == '__system__') + self.author = app.db.User.get(where("name") == "__system__") now = datetime.utcnow() if not self.doc_id and self.created < now: @@ -192,6 +243,15 @@ class User(Entity): Password("password"), ] + def update(self, **data): + self.author = None if self == g.user else g.user + return super().update(**data) + + def validate(self): + if self.name == "__system__": + raise MalformedRequestError("Invalid name.") + return super().validate() + def check_credentials(self, username: str, password: str) -> bool: return username == self.name and self._metadata.fields["password"].compare(password, self.password) @@ -218,7 +278,7 @@ class Group(Entity): @classmethod def fields(cls): - return super().fields() + [Collection("members", Entity)] + return super().fields() + [Collection("members", member_type=Entity)] class NPC(Page): diff --git a/src/ttfrog/web.py b/src/ttfrog/web.py index 54a1061..2e07c11 100644 --- a/src/ttfrog/web.py +++ b/src/ttfrog/web.py @@ -1,10 +1,12 @@ import json import re +from urllib.parse import unquote from flask import Response, g, jsonify, redirect, render_template, request, session, url_for +from grung.exceptions import ValidationError from tinydb import where -from ttfrog import app, forms, schema +from ttfrog import app, schema from ttfrog.exceptions import MalformedRequestError, RecordNotFoundError, UnauthorizedError @@ -21,6 +23,8 @@ def get_page( except RecordNotFoundError as e: if not create_okay: return None, e + except ValidationError as e: + return None, e return page, None @@ -63,8 +67,12 @@ def breadcrumbs(): def before_request(): g.messages = [] if not request.path.startswith("/static"): - user_id = session.get("user_id", 1) - g.user = app.db.User.get(doc_id=user_id, recurse=False) + user_id = session.get("user_id") + g.user = ( + app.db.User.get(doc_id=user_id, recurse=False) + if user_id + else app.db.User.get(where("name") == "guest", recurse=False) + ) session["user_id"] = user_id session["user"] = dict(g.user.serialize()) @@ -109,9 +117,14 @@ def logout(): return redirect(url_for("index")) -@app.web.route(f"{app.config.VIEW_URI}//", methods=["GET"]) -@app.web.route(f"{app.config.VIEW_URI}/", methods=["GET"], defaults={"table": "Page"}) +@app.web.route(f"{app.config.VIEW_URI}//", methods=["GET"]) +@app.web.route(f"{app.config.VIEW_URI}/", methods=["GET"], defaults={"table": "Page"}) def view(table, path): + clean_table = re.sub(r"[^a-zA-Z0-9]", "", unquote(table)) + clean_path = re.sub(r"[^a-zA-Z0-9]", "", unquote(path)) + if clean_table != table or clean_path != clean_path: + return redirect(url_for("view", table=clean_table, path=clean_path), 302) + page, error = get_page(request.path, table=table, create_okay=True) if error: g.messages.append(str(error)) @@ -121,24 +134,23 @@ def view(table, path): @app.web.route(f"{app.config.API_URI}/put//", methods=["POST"]) @app.web.route(f"{app.config.API_URI}/put/", methods=["POST"], defaults={"table": "Page"}) def put(table, path): - app.log.debug(f"Checking for page at {table}/{path} in {table} space") + clean_table = re.sub(r"[^a-zA-Z0-9]", "", unquote(table)) + clean_path = re.sub(r"[^a-zA-Z0-9]", "", unquote(path)) + if clean_table != table or clean_path != clean_path: + return redirect(url_for("put", table=clean_table, path=clean_path), 302) + page, error = get_page("/".join([table, path]), table=table, create_okay=True) - app.log.debug(f"Found {page.doc_id}") if error: return api_response(error=error) - params = json.loads(request.data.decode())["body"] - save_data = getattr(forms, table)(page, params).prepare() - app.log.debug("Saving form data...") - app.log.debug(f"{save_data=}") - doc = app.db.save(save_data) - app.log.debug(f"Saved {dict(doc)}") + page.update(**json.loads(request.data.decode())["body"]) + updated = app.db.save(page) if not page.doc_id: - print(f"Adding {doc.doc_id} to {page.parent.members}") - page.parent.members = list(set(page.parent.members + [doc])) - app.db.save(page.parent) - app.log.debug(f"Saved: {dict(doc)=}") - return api_response(response=dict(doc)) + parent = page.parent() + if parent: + parent.update(members=list(set(parent.members + [updated]))) + app.db.save(parent) + return api_response(response=dict(updated)) @app.web.route(f"{app.config.API_URI}/search/", methods=["POST"]) @@ -163,6 +175,10 @@ def search(space): @app.web.route(f"{app.config.API_URI}/get//", methods=["GET"]) def get(table, doc_id): + clean_table = re.sub(r"[^a-zA-Z0-9]", "", unquote(table)) + if clean_table != table: + return redirect(url_for("get", table=clean_table, doc_id=doc_id), 302) + app.log.debug(f"API: getting {table}({doc_id})") page, error = get_page(g.user, table=table, doc_id=doc_id) return api_response(response=dict(page), error=error)