Add FastAPI catalog backend (games/snapshots/diff) + tests

Modular-monolith backend over SQLAlchemy (SQLite by default, Postgres-ready
via DATABASE_URL). The full snapshot.json is stored verbatim; diffing reads it
back through the ams.diff engine, so the DB never mirrors the snapshot schema.

- ams.api.db/models/schemas/service : Game 1-N Snapshot, sha256-deduped upsert
- routes: POST/GET /games, POST/GET /snapshots (import, deduped), GET /diff
          (?old&new[&owner]) running compute_diff on stored snapshots, /health
- ams.api.importer : bulk CLI loader (python -m ams.api.importer --game ...)
- run: uvicorn ams.api.app:create_app --factory

11 tests pass (6 diff + 5 API via TestClient over the golden pair). Smoke-tested
live on uvicorn: import -> /snapshots -> /diff returns the BlooMoo deltas.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Patryk Gensch
2026-05-30 22:27:24 +02:00
parent 6885bbee3d
commit 8386196653
14 changed files with 481 additions and 2 deletions

3
.gitignore vendored
View File

@@ -15,3 +15,6 @@ __pycache__/
# macOS
.DS_Store
# Local SQLite catalog DB
*.db

1
ams/api/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""FastAPI backend: catalog of games/engine versions + snapshot import + diff endpoint."""

30
ams/api/app.py Normal file
View File

@@ -0,0 +1,30 @@
"""FastAPI application factory.
Run with uvicorn's factory mode (no import-time DB side effects):
uvicorn ams.api.app:create_app --factory --reload
"""
from __future__ import annotations
from fastapi import FastAPI
from .. import __version__
from .db import configure, init_db
from .routes import diff, games, snapshots
def create_app(database_url: str | None = None) -> FastAPI:
configure(database_url)
init_db()
app = FastAPI(title="ams — engine surface catalog", version=__version__)
app.include_router(games.router)
app.include_router(snapshots.router)
app.include_router(diff.router)
@app.get("/health", tags=["meta"])
def health() -> dict[str, str]:
return {"status": "ok", "version": __version__}
return app

53
ams/api/db.py Normal file
View File

@@ -0,0 +1,53 @@
"""Database engine/session wiring. SQLite by default (zero setup), Postgres-ready via DATABASE_URL.
The engine/session factory is process-global and (re)built by `configure()`, so tests can point it
at a temporary database before creating the app.
"""
from __future__ import annotations
import os
from collections.abc import Iterator
from sqlalchemy import create_engine
from sqlalchemy.orm import DeclarativeBase, Session, sessionmaker
DEFAULT_URL = "sqlite:///./ams.db"
class Base(DeclarativeBase):
pass
_engine = None
_SessionLocal: sessionmaker | None = None
def configure(url: str | None = None) -> None:
global _engine, _SessionLocal
url = url or os.environ.get("DATABASE_URL", DEFAULT_URL)
connect_args = {"check_same_thread": False} if url.startswith("sqlite") else {}
_engine = create_engine(url, connect_args=connect_args, future=True)
_SessionLocal = sessionmaker(bind=_engine, autoflush=False, expire_on_commit=False, class_=Session)
def init_db() -> None:
from . import models # noqa: F401 - register mappers before create_all
if _engine is None:
configure()
Base.metadata.create_all(_engine)
def get_session() -> Session:
if _SessionLocal is None:
configure()
return _SessionLocal()
def get_db() -> Iterator[Session]:
"""FastAPI dependency yielding a request-scoped session."""
db = get_session()
try:
yield db
finally:
db.close()

41
ams/api/importer.py Normal file
View File

@@ -0,0 +1,41 @@
"""Bulk-import snapshot.json files straight into the DB (no HTTP server needed).
python -m ams.api.importer [--game "Reksio i UFO"] snapshots/*.json
"""
from __future__ import annotations
import argparse
import json
import sys
from .db import get_session, init_db
from .service import import_snapshot, looks_like_snapshot
def main(argv: list[str] | None = None) -> int:
p = argparse.ArgumentParser(prog="ams-import", description="Import snapshots into the catalog DB.")
p.add_argument("files", nargs="+", help="snapshot.json files")
p.add_argument("--game", help="link all imported snapshots to this game (created if missing)")
args = p.parse_args(argv)
init_db()
db = get_session()
try:
for path in args.files:
with open(path, "r", encoding="utf-8") as fh:
data = json.load(fh)
if not looks_like_snapshot(data):
print("[!] skip (not a snapshot): {0}".format(path))
continue
snap = import_snapshot(db, data, args.game)
print("[+] #{0} {1} [{2}/{3}] types={4} methods={5} events={6} fields={7}".format(
snap.id, snap.binary_name, snap.engine, snap.compiler,
snap.n_types, snap.n_methods, snap.n_events, snap.n_fields))
finally:
db.close()
return 0
if __name__ == "__main__":
sys.exit(main())

51
ams/api/models.py Normal file
View File

@@ -0,0 +1,51 @@
"""ORM models: a Game has many Snapshots. The full snapshot.json is stored verbatim in `data`
(JSON / JSONB); axis counts are denormalised for cheap listing. Diffing reads `data` back through
the existing ams.diff engine, so the DB never has to mirror the snapshot schema."""
from __future__ import annotations
from datetime import datetime, timezone
from sqlalchemy import ForeignKey, JSON, String, UniqueConstraint
from sqlalchemy.orm import Mapped, mapped_column, relationship
from .db import Base
def _utcnow() -> datetime:
return datetime.now(timezone.utc)
class Game(Base):
__tablename__ = "games"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String, unique=True, index=True)
notes: Mapped[str | None] = mapped_column(String, default=None)
snapshots: Mapped[list["Snapshot"]] = relationship(
back_populates="game", cascade="all, delete-orphan")
class Snapshot(Base):
__tablename__ = "snapshots"
__table_args__ = (UniqueConstraint("sha256", name="uq_snapshot_sha256"),)
id: Mapped[int] = mapped_column(primary_key=True)
game_id: Mapped[int | None] = mapped_column(ForeignKey("games.id"), default=None, index=True)
binary_name: Mapped[str] = mapped_column(String)
sha256: Mapped[str] = mapped_column(String, index=True)
engine: Mapped[str | None] = mapped_column(String, default=None)
compiler: Mapped[str | None] = mapped_column(String, default=None)
schema_version: Mapped[int | None] = mapped_column(default=None)
n_types: Mapped[int] = mapped_column(default=0)
n_methods: Mapped[int] = mapped_column(default=0)
n_events: Mapped[int] = mapped_column(default=0)
n_fields: Mapped[int] = mapped_column(default=0)
created_at: Mapped[datetime] = mapped_column(default=_utcnow)
data: Mapped[dict] = mapped_column(JSON)
game: Mapped["Game | None"] = relationship(back_populates="snapshots")

View File

30
ams/api/routes/diff.py Normal file
View File

@@ -0,0 +1,30 @@
from __future__ import annotations
from typing import Any
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from ...diff import compute_diff, filter_by_owner
from ...snapshot import Snapshot
from .. import models
from ..db import get_db
router = APIRouter(tags=["diff"])
@router.get("/diff")
def get_diff(
old: int = Query(..., description="older snapshot id"),
new: int = Query(..., description="newer snapshot id"),
owner: str | None = Query(None, description="restrict to one class, e.g. CMC_Animo"),
db: Session = Depends(get_db),
) -> dict[str, Any]:
a = db.get(models.Snapshot, old)
b = db.get(models.Snapshot, new)
if a is None or b is None:
raise HTTPException(404, "snapshot not found")
diff = compute_diff(Snapshot(a.data), Snapshot(b.data))
if owner:
diff = filter_by_owner(diff, owner)
return diff

37
ams/api/routes/games.py Normal file
View File

@@ -0,0 +1,37 @@
from __future__ import annotations
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy import select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session
from .. import models, schemas
from ..db import get_db
router = APIRouter(prefix="/games", tags=["games"])
@router.post("", response_model=schemas.GameOut, status_code=201)
def create_game(body: schemas.GameCreate, db: Session = Depends(get_db)) -> models.Game:
game = models.Game(name=body.name, notes=body.notes)
db.add(game)
try:
db.commit()
except IntegrityError:
db.rollback()
raise HTTPException(409, "game with that name already exists")
db.refresh(game)
return game
@router.get("", response_model=list[schemas.GameOut])
def list_games(db: Session = Depends(get_db)) -> list[models.Game]:
return list(db.scalars(select(models.Game).order_by(models.Game.name)))
@router.get("/{game_id}", response_model=schemas.GameDetail)
def get_game(game_id: int, db: Session = Depends(get_db)) -> models.Game:
game = db.get(models.Game, game_id)
if game is None:
raise HTTPException(404, "game not found")
return game

View File

@@ -0,0 +1,41 @@
from __future__ import annotations
from typing import Any
from fastapi import APIRouter, Body, Depends, HTTPException, Query
from sqlalchemy import select
from sqlalchemy.orm import Session
from .. import models, schemas, service
from ..db import get_db
router = APIRouter(prefix="/snapshots", tags=["snapshots"])
@router.post("", response_model=schemas.SnapshotOut, status_code=201)
def create_snapshot(
data: dict[str, Any] = Body(..., description="a full engine-surface snapshot.json"),
game: str | None = Query(None, description="link to / create this game by name"),
db: Session = Depends(get_db),
) -> models.Snapshot:
if not service.looks_like_snapshot(data):
raise HTTPException(422, "body is not an engine-surface snapshot (missing binary/types)")
return service.import_snapshot(db, data, game)
@router.get("", response_model=list[schemas.SnapshotOut])
def list_snapshots(
game_id: int | None = Query(None), db: Session = Depends(get_db)
) -> list[models.Snapshot]:
q = select(models.Snapshot)
if game_id is not None:
q = q.where(models.Snapshot.game_id == game_id)
return list(db.scalars(q.order_by(models.Snapshot.id)))
@router.get("/{snapshot_id}", response_model=schemas.SnapshotDetail)
def get_snapshot(snapshot_id: int, db: Session = Depends(get_db)) -> models.Snapshot:
snap = db.get(models.Snapshot, snapshot_id)
if snap is None:
raise HTTPException(404, "snapshot not found")
return snap

43
ams/api/schemas.py Normal file
View File

@@ -0,0 +1,43 @@
"""Pydantic request/response models."""
from __future__ import annotations
from datetime import datetime
from pydantic import BaseModel, ConfigDict
class GameCreate(BaseModel):
name: str
notes: str | None = None
class GameOut(BaseModel):
model_config = ConfigDict(from_attributes=True)
id: int
name: str
notes: str | None = None
class SnapshotOut(BaseModel):
model_config = ConfigDict(from_attributes=True)
id: int
game_id: int | None
binary_name: str
sha256: str
engine: str | None
compiler: str | None
schema_version: int | None
n_types: int
n_methods: int
n_events: int
n_fields: int
created_at: datetime
class SnapshotDetail(SnapshotOut):
data: dict
class GameDetail(GameOut):
snapshots: list[SnapshotOut] = []

60
ams/api/service.py Normal file
View File

@@ -0,0 +1,60 @@
"""Business logic shared by the HTTP routes and the bulk importer."""
from __future__ import annotations
import hashlib
import json
from typing import Any
from sqlalchemy import select
from sqlalchemy.orm import Session
from . import models
def _content_sha(data: dict[str, Any]) -> str:
return hashlib.sha256(json.dumps(data, sort_keys=True).encode("utf-8")).hexdigest()
def looks_like_snapshot(data: Any) -> bool:
return isinstance(data, dict) and "binary" in data and "types" in data
def _apply_metadata(snap: models.Snapshot, data: dict[str, Any]) -> None:
binary = data.get("binary", {})
snap.binary_name = binary.get("name", "?")
snap.engine = binary.get("engine")
snap.compiler = binary.get("compiler")
snap.schema_version = data.get("schema_version")
snap.n_types = len(data.get("types", []))
snap.n_methods = len(data.get("methods", []))
snap.n_events = len(data.get("events", []))
snap.n_fields = len(data.get("fields", []))
snap.data = data
def _get_or_create_game(db: Session, name: str) -> models.Game:
game = db.scalar(select(models.Game).where(models.Game.name == name))
if game is None:
game = models.Game(name=name)
db.add(game)
db.flush()
return game
def import_snapshot(db: Session, data: dict[str, Any], game_name: str | None = None) -> models.Snapshot:
"""Upsert a snapshot, deduped by the binary's sha256 (falling back to a content hash)."""
sha = data.get("binary", {}).get("sha256") or _content_sha(data)
snap = db.scalar(select(models.Snapshot).where(models.Snapshot.sha256 == sha))
game = _get_or_create_game(db, game_name) if game_name else None
if snap is None:
snap = models.Snapshot(sha256=sha)
db.add(snap)
_apply_metadata(snap, data)
if game is not None:
snap.game_id = game.id
db.commit()
db.refresh(snap)
return snap

View File

@@ -1,9 +1,28 @@
[build-system]
requires = ["setuptools>=68"]
build-backend = "setuptools.build_meta"
[project]
name = "ams"
version = "0.1.0"
description = "Aidem Media engine-surface snapshot diffing (Piklib/BlooMoo)"
requires-python = ">=3.9"
description = "Aidem Media engine-surface snapshot diffing + catalog (Piklib/BlooMoo)"
requires-python = ">=3.10"
dependencies = []
[project.optional-dependencies]
api = [
"fastapi>=0.110",
"uvicorn[standard]>=0.27",
"sqlalchemy>=2.0",
"pydantic>=2.6",
]
dev = [
"pytest>=8",
"httpx>=0.27",
]
[tool.setuptools.packages.find]
include = ["ams*"]
[tool.pytest.ini_options]
testpaths = ["tests"]

70
tests/test_api.py Normal file
View File

@@ -0,0 +1,70 @@
"""API tests against a temp SQLite DB, exercising import -> list -> diff over the golden pair."""
from __future__ import annotations
import json
from pathlib import Path
import pytest
pytest.importorskip("fastapi")
from fastapi.testclient import TestClient # noqa: E402
from ams.api.app import create_app # noqa: E402
SNAP_DIR = Path(__file__).resolve().parents[1] / "snapshots"
PIKLIB = SNAP_DIR / "PIKLIB8.dll.snapshot.json"
BLOOMOO = SNAP_DIR / "bloomoodll.dll.snapshot.json"
pytestmark = pytest.mark.skipif(not PIKLIB.exists(), reason="golden snapshots not present")
@pytest.fixture()
def client(tmp_path) -> TestClient:
app = create_app(database_url="sqlite:///{0}/api.db".format(tmp_path))
return TestClient(app)
def _load(path: Path) -> dict:
with open(path, "r", encoding="utf-8") as fh:
return json.load(fh)
def test_health(client: TestClient):
assert client.get("/health").json()["status"] == "ok"
def test_import_list_and_dedup(client: TestClient):
r = client.post("/snapshots", params={"game": "Reksio i UFO"}, json=_load(PIKLIB))
assert r.status_code == 201
first_id = r.json()["id"]
assert r.json()["engine"] == "Piklib" and r.json()["n_types"] == 40
# re-importing the same binary (same sha256) updates in place, not a duplicate row
r2 = client.post("/snapshots", json=_load(PIKLIB))
assert r2.json()["id"] == first_id
assert len(client.get("/snapshots").json()) == 1
# the game was auto-created and linked
games = client.get("/games").json()
assert [g["name"] for g in games] == ["Reksio i UFO"]
def test_invalid_snapshot_rejected(client: TestClient):
assert client.post("/snapshots", json={"not": "a snapshot"}).status_code == 422
def test_diff_endpoint(client: TestClient):
a = client.post("/snapshots", params={"game": "Reksio i UFO"}, json=_load(PIKLIB)).json()["id"]
b = client.post("/snapshots", params={"game": "Reksio i Kapitan Nemo"}, json=_load(BLOOMOO)).json()["id"]
diff = client.get("/diff", params={"old": a, "new": b}).json()
added = {t["script_name"] for t in diff["types"]["added"]}
assert {"GRBUFFER", "INTERNET"} <= added
animo = client.get("/diff", params={"old": a, "new": b, "owner": "CMC_Animo"}).json()
assert "GETFPS" in {m["name"] for m in animo["methods"]["added"]}
def test_diff_missing_snapshot_404(client: TestClient):
assert client.get("/diff", params={"old": 999, "new": 998}).status_code == 404