vtt/src/ttfrog/web.py
2025-12-25 10:29:46 -08:00

189 lines
6.6 KiB
Python

import json
import re
from urllib.parse import unquote
from flask import Response, g, jsonify, redirect, render_template, request, session, url_for
from grung.exceptions import ValidationError
from tinydb import where
from ttfrog import app, schema
from ttfrog.exceptions import MalformedRequestError, RecordNotFoundError, UnauthorizedError
def get_page(
    path: str, table: str = "Page", doc_id: int | None = None, create_okay: bool = False
) -> tuple[schema.Record | None, Exception | None]:
    """
    Get one page, including its members, but not recursively.

    Delegates the lookup to ``app.get_page()`` on behalf of the current
    request user (``g.user``) and converts the exceptions it raises into a
    ``(page, error)`` pair instead of letting them propagate.

    Args:
        path: Forwarded to ``app.get_page()`` as ``uri``.
        table: Name of the table to look in.
        doc_id: Forwarded to ``app.get_page()`` as ``doc_id``.
        create_okay: When True, a missing record is not reported as an error.

    Returns:
        ``(page, None)`` on success, ``(None, error)`` when the lookup raised
        one of the handled exceptions, or ``(None, None)`` when the record
        does not exist and ``create_okay`` is True.
    """
    # Bind the name up front: the RecordNotFoundError branch may fall through
    # to the final return without app.get_page() ever having assigned it.
    page = None
    try:
        page = app.get_page(g.user, table, doc_id=doc_id, uri=path)
    except (UnauthorizedError, MalformedRequestError) as e:
        return None, e
    except RecordNotFoundError as e:
        if not create_okay:
            return None, e
    except ValidationError as e:
        return None, e
    return page, None
def api_response(response=None, messages=None, error=None):
    """
    Build the JSON envelope returned by the API endpoints.

    Args:
        response: Payload to place under the ``"response"`` key. Defaults to
            an empty dict, and is always replaced by an empty dict when
            ``error`` is set.
        messages: Value placed under the ``"messages"`` key. Defaults to an
            empty list.
        error: Optional exception describing why the request failed. Its type
            selects the status code: UnauthorizedError -> 403,
            MalformedRequestError -> 400, RecordNotFoundError -> 404, anything
            else -> 500.

    Returns:
        A ``(json_response, status_code)`` tuple; the status code is also
        repeated inside the JSON body under ``"code"``.
    """
    # None sentinels instead of mutable default arguments.
    if response is None:
        response = {}
    if messages is None:
        messages = []

    response_code = 200
    if error:
        response_code = 500
        response = {}
        if isinstance(error, UnauthorizedError):
            response_code = 403
        elif isinstance(error, MalformedRequestError):
            # 400 Bad Request (was 4000, which is not a valid HTTP status).
            response_code = 400
        elif isinstance(error, RecordNotFoundError):
            response_code = 404
    return jsonify({"messages": messages, "response": response, "code": response_code}), response_code
def rendered(page: schema.Record, template: str = "page.html"):
    """
    Render ``page`` with the given template, or answer 404 when there is no page.

    The template context carries the page itself, the app, the breadcrumb
    generator, the root page (the page with an empty uri), the current user
    and the request globals.
    """
    if not page:
        return Response("Page not found", status=404)

    # The page is its own root when its uri is empty; otherwise look the root up.
    if page.uri == "":
        root = page
    else:
        root = get_page("")[0]

    context = dict(page=page, app=app, breadcrumbs=breadcrumbs(), root=root, user=g.user, g=g)
    return render_template(template, **context)
def breadcrumbs():
    """
    Yield (uri, name) pairs for each path segment between VIEW_URI and the current request.

    The first occurrence of VIEW_URI is removed from the request path, the
    remainder is split on "/", and each segment is yielded together with the
    uri accumulated up to and including that segment.
    """
    relative = request.path.replace(app.config.VIEW_URI, "", 1).strip("/")
    if not relative:
        relative = "/"

    trail = ""
    for segment in relative.split("/"):
        trail = f"{trail}/{segment}"
        yield (trail, segment)
@app.web.before_request
def before_request():
    """
    Prepare per-request state: an empty message list and the current user.

    Requests whose path starts with "/static" only get the message list.
    For every other request the user is loaded from the session's "user_id"
    (falling back to the user named "guest"), and the session is refreshed
    with that id and the serialized user.
    """
    g.messages = []
    if request.path.startswith("/static"):
        return

    user_id = session.get("user_id")
    if user_id:
        g.user = app.db.User.get(doc_id=user_id, recurse=False)
    else:
        g.user = app.db.User.get(where("name") == "guest", recurse=False)

    session["user_id"] = user_id
    session["user"] = dict(g.user.serialize())
@app.web.after_request
def add_header(r):
    """
    Set the Cache-Control, Pragma and Expires headers on every response.

    Returns the same response object it was given.
    """
    cache_headers = (
        ("Cache-Control", "no-cache, no-store, must-revalidate, public, max-age=0"),
        ("Pragma", "no-cache"),
        ("Expires", "0"),
    )
    for header, value in cache_headers:
        r.headers[header] = value
    return r
@app.web.route(app.config.VIEW_URI)
def index():
    """
    Render the page found at the empty path.

    Any lookup error is added to the request's message list before rendering.
    """
    page, error = get_page("")
    if not error:
        return rendered(page)
    g.messages.append(str(error))
    return rendered(page)
@app.web.route("/login", methods=["GET", "POST"])
def login():
    """
    Show the login form and handle its submission.

    The session is regenerated on every call. On POST the submitted
    credentials are passed to app.authenticate(); on success the user is
    stored in ``g`` and the session and the client is redirected to the index,
    otherwise a failure message is queued and the form is rendered again.
    """
    app.web.session_interface.regenerate(session)

    if request.method == "POST":
        username = request.form.get("username")
        password = request.form.get("password")
        user = app.authenticate(username, password)
        if not user:
            g.messages.append(f"Invalid login for {username}")
        else:
            g.user = user
            session["user_id"] = g.user.doc_id
            session["user"] = dict(g.user.serialize())
            return redirect(url_for("index"))

    login_page = schema.Page(name="Login")
    return rendered(login_page, "login.html")
@app.web.route("/logout")
def logout():
    """
    Drop the logged-in user from the session and from ``g``, then redirect to the index.
    """
    has_session_user = "user_id" in session
    if has_session_user:
        del session["user_id"]
        del g.user
    target = url_for("index")
    return redirect(target)
@app.web.route(f"{app.config.VIEW_URI}/<string:table>/<string:path>", methods=["GET"])
@app.web.route(f"{app.config.VIEW_URI}/<string:path>", methods=["GET"], defaults={"table": "Page"})
def view(table, path):
    """
    Render the page identified by ``table`` and ``path``.

    Both values are sanitized by URL-unquoting them and stripping every
    character that is not an ASCII letter or digit. If either value changes
    as a result, the client is redirected (302) to the sanitized URL instead
    of being served the page.

    Args:
        table: Table name from the URL; "Page" when the short route is used.
        path: Page path from the URL.

    Returns:
        A redirect to the sanitized URL, or the rendered page. Lookup errors
        are logged and added to the request's message list before rendering.
    """
    clean_table = re.sub(r"[^a-zA-Z0-9]", "", unquote(table))
    clean_path = re.sub(r"[^a-zA-Z0-9]", "", unquote(path))
    # Compare each sanitized value against its raw input. The path check
    # previously compared clean_path to itself, so it could never trigger.
    if clean_table != table or clean_path != path:
        app.log.warning(f"Invalid table/path: {table=}, {path=}. Redirecting to {clean_table}/{clean_path}")
        return redirect(url_for("view", table=clean_table, path=clean_path), 302)
    app.log.debug(f"Looking for {table=}, {path=}")
    page, error = get_page(request.path, table=table, create_okay=True)
    if error:
        app.log.error(error)
        g.messages.append(str(error))
    return rendered(page)
@app.web.route(f"{app.config.API_URI}/put/<path:table>/<path:path>", methods=["POST"])
@app.web.route(f"{app.config.API_URI}/put/<path:path>", methods=["POST"], defaults={"table": "Page"})
def put(table, path):
    """
    Create or update the page identified by ``table`` and ``path``.

    Both values are sanitized by URL-unquoting them and stripping every
    character that is not an ASCII letter or digit; if either value changes,
    the client is redirected (302) to the sanitized URL. Otherwise the page is
    looked up, updated with the ``"body"`` member of the JSON request payload,
    and saved. When the page had no ``doc_id`` and has a parent, the saved
    record is also added to the parent's members.

    Args:
        table: Table name from the URL; "Page" when the short route is used.
        path: Page path from the URL.

    Returns:
        A redirect, or the ``api_response()`` result carrying either the saved
        record or the lookup error.
    """
    clean_table = re.sub(r"[^a-zA-Z0-9]", "", unquote(table))
    clean_path = re.sub(r"[^a-zA-Z0-9]", "", unquote(path))
    # Compare each sanitized value against its raw input. The path check
    # previously compared clean_path to itself, so it could never trigger.
    # NOTE(review): this route only accepts POST; clients may follow a 302 with
    # a GET -- confirm a redirect is the desired response for unclean input.
    if clean_table != table or clean_path != path:
        return redirect(url_for("put", table=clean_table, path=clean_path), 302)
    page, error = get_page("/".join([table, path]), table=table, create_okay=True)
    if error:
        return api_response(error=error)
    page.update(**json.loads(request.data.decode())["body"])
    updated = app.db.save(page)
    # A page without a doc_id is attached to its parent's member list.
    if not page.doc_id:
        parent = page.parent()
        if parent:
            parent.update(members=list(set(parent.members + [updated])))
            app.db.save(parent)
    app.log.debug(f"Saved page at uri {updated.uri}")
    return api_response(response=dict(updated))
@app.web.route(f"{app.config.API_URI}/search/<string:space>", methods=["POST"])
@app.web.route(f"{app.config.API_URI}/search/", methods=["POST"], defaults={"space": None})
def search(space):
    """
    Search record names for the query given in the request body.

    The query is read from the ``"body"`` member of the JSON payload and
    matched case-insensitively against each record's "name". When ``space``
    is given only that table (name capitalized) is searched, otherwise every
    table is. Only records the current user is authorized to read are
    returned; an empty result is reported as a RecordNotFoundError.
    """
    tables = app.db.tables()
    if space:
        tables = [space.lower().capitalize()]

    query = json.loads(request.data.decode())["body"]
    app.log.debug(f"Searching for records matching query {query}")

    matches = []
    for table_name in tables:
        candidates = app.db.table(table_name).search(where("name").matches(query, re.IGNORECASE), recurse=False)
        for record in candidates:
            if not app.authorize(g.user, record, schema.Permissions.READ):
                continue
            app.log.debug(f"Adding search result {dict(record)}")
            matches.append(dict(record))

    if matches:
        error = None
    else:
        error = RecordNotFoundError(f"No records matching '{query}'")
    return api_response(response=matches, error=error)
@app.web.route(f"{app.config.API_URI}/get/<path:table>/<int:doc_id>", methods=["GET"])
def get(table, doc_id):
    """
    Return the record with the given ``doc_id`` from ``table`` as JSON.

    The table name is sanitized by URL-unquoting it and stripping every
    character that is not an ASCII letter or digit; if that changes it, the
    client is redirected (302) to the sanitized URL.

    Args:
        table: Table name from the URL.
        doc_id: Integer document id from the URL.

    Returns:
        A redirect, or the ``api_response()`` result carrying either the
        record or the lookup error.
    """
    clean_table = re.sub(r"[^a-zA-Z0-9]", "", unquote(table))
    if clean_table != table:
        return redirect(url_for("get", table=clean_table, doc_id=doc_id), 302)
    app.log.debug(f"API: getting {table}({doc_id})")
    # NOTE(review): g.user is passed as get_page()'s `path` argument, which is
    # annotated as str -- confirm what path should be for a doc_id lookup.
    page, error = get_page(g.user, table=table, doc_id=doc_id)
    # On failure get_page() returns (None, error); report the error instead of
    # evaluating dict(None), which would raise TypeError.
    if error:
        return api_response(error=error)
    return api_response(response=dict(page))