Add snapshot diff engine (ams package) + tests

Standalone CLI that diffs two engine-surface snapshots across all four axes,
the foundation the FastAPI/DB layer will sit on.

- ams.snapshot : typed access to a snapshot.json
- ams.diff     : keyed set-diff per axis (added/removed/changed) + cross-owner
                 method-move detection; types keyed by (script_name,
                 via_module_iface) so the dual MULTIARRAY stays stable;
                 filter_by_owner for per-class focus
- ams.render   : human-readable report (+/-/~), owner-grouped
- ams.cli      : python -m ams OLD NEW [--owner C] [--only ...] [--json]

6 tests pass, incl. an integration test over the committed golden pair
(asserts BlooMoo adds GRBUFFER/INTERNET, MOUSE grows 104->128, Animo gains
GETFPS, Animo script fields unchanged).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Patryk Gensch
2026-05-30 22:18:04 +02:00
parent 91c7a11ba8
commit 6885bbee3d
8 changed files with 466 additions and 0 deletions

7
ams/__init__.py Normal file
View File

@@ -0,0 +1,7 @@
"""ams - Aidem Media engine-surface tooling.
Compares engine-surface snapshots (produced by ghidra_scripts/extract_engine_surface.py)
across versions of the Piklib/BlooMoo engines.
"""
__version__ = "0.1.0"

5
ams/__main__.py Normal file
View File

@@ -0,0 +1,5 @@
# Entry point for `python -m ams`: run the CLI and use the value returned by
# `main()` as the process exit status.
import sys
from .cli import main
sys.exit(main())

49
ams/cli.py Normal file
View File

@@ -0,0 +1,49 @@
"""CLI: diff two engine-surface snapshots.
python -m ams OLD.snapshot.json NEW.snapshot.json [--owner CMC_Animo] [--only types,methods] [--json]
"""
from __future__ import annotations
import argparse
import json
import sys
from .diff import compute_diff, filter_by_owner
from .render import render_text
from .snapshot import Snapshot
# Axis names accepted by --only ("layout" selects the struct_layout section of the report).
_AXES = ["types", "methods", "events", "fields", "layout"]


def main(argv: list[str] | None = None) -> int:
    """Parse the command line, diff the two snapshots and print the result.

    Args:
        argv: Argument list without the program name; ``None`` lets argparse
            fall back to ``sys.argv``.

    Returns:
        0 on success. Invalid arguments leave through ``ArgumentParser.error``
        (``SystemExit`` with status 2).

    ``--only`` narrows the text report only; ``--json`` always emits the whole
    (optionally owner-filtered) diff.
    """
    p = argparse.ArgumentParser(prog="ams", description="Diff two engine-surface snapshots.")
    p.add_argument("old", help="older snapshot.json")
    p.add_argument("new", help="newer snapshot.json")
    p.add_argument("--owner", help="restrict to one class, e.g. CMC_Animo")
    p.add_argument("--only", help="comma-separated axes to show: " + ",".join(_AXES))
    p.add_argument("--json", action="store_true", help="emit machine-readable JSON")
    args = p.parse_args(argv)

    # Validate --only before touching the filesystem: a typo is reported
    # immediately, and it is no longer silently accepted when --json is given.
    only = None
    if args.only:
        only = {a.strip() for a in args.only.split(",") if a.strip()}
        bad = only - set(_AXES)
        if bad:
            p.error("unknown axis: {0} (choose from {1})".format(",".join(sorted(bad)), ",".join(_AXES)))

    old = Snapshot.load(args.old)
    new = Snapshot.load(args.new)
    diff = compute_diff(old, new)
    if args.owner:
        diff = filter_by_owner(diff, args.owner)

    if args.json:
        print(json.dumps(diff, indent=2, sort_keys=True))
        return 0

    print(render_text(diff, only=only))
    return 0
if __name__ == "__main__":
sys.exit(main())

115
ams/diff.py Normal file
View File

@@ -0,0 +1,115 @@
"""Compute a structured diff between two engine-surface snapshots.
The result is a plain dict (JSON-serialisable). Each axis is a {added, removed, changed} block
produced by `keyed_diff`; `changed` entries carry the per-field old->new deltas. Methods also get
a cross-owner `moved` pass to surface hierarchy reparenting.
"""
from __future__ import annotations
from typing import Any, Callable, Hashable
from .snapshot import Snapshot
Item = dict
KeyFn = Callable[[Item], Hashable]
def _index(items: list[Item], key: KeyFn) -> dict[Hashable, Item]:
return {key(it): it for it in items}
def keyed_diff(a_items: list[Item], b_items: list[Item], key: KeyFn,
               compare_fields: list[str]) -> dict[str, Any]:
    """Diff two item lists that are matched up by `key`.

    Returns a dict with three lists:
      - "added":   items whose key exists only in `b_items`
      - "removed": items whose key exists only in `a_items`
      - "changed": {"item": <b item>, "changes": {field: [old, new]}} for keys present on
        both sides where at least one of `compare_fields` differs (fields are read with
        `.get`, so a missing field compares as None).
    """
    old_by_key = _index(a_items, key)
    new_by_key = _index(b_items, key)

    added = [item for k, item in new_by_key.items() if k not in old_by_key]
    removed = []
    changed = []
    for k, before in old_by_key.items():
        if k not in new_by_key:
            removed.append(before)
            continue
        after = new_by_key[k]
        deltas = {}
        for field in compare_fields:
            was, now = before.get(field), after.get(field)
            if was != now:
                deltas[field] = [was, now]
        if deltas:
            changed.append({"item": after, "changes": deltas})

    return {"added": added, "removed": removed, "changed": changed}
# --- per-axis keys -----------------------------------------------------------------------------
# Types: script_name + via_module_iface keeps the dual-dispatch MULTIARRAY entries distinct and
# stable across versions (addresses change, this semantic flag does not).
def _type_key(t: Item) -> Hashable:
return (t["script_name"], bool(t.get("via_module_iface")))
def _owner_name_key(x: Item) -> Hashable:
return (x["owner"], x["name"])
def _layout_key(x: Item) -> Hashable:
return (x["owner"], x["offset"])
def _detect_method_moves(old_m: list[Item], new_m: list[Item]) -> list[Item]:
"""A method name that left some owner and appeared under another - i.e. moved in the hierarchy."""
def owners_by_name(items: list[Item]) -> dict[str, set]:
out: dict[str, set] = {}
for m in items:
out.setdefault(m["name"], set()).add(m["owner"])
return out
old_o, new_o = owners_by_name(old_m), owners_by_name(new_m)
moves = []
for name in sorted(set(old_o) & set(new_o)):
lost = old_o[name] - new_o[name]
gained = new_o[name] - old_o[name]
if lost and gained:
moves.append({"name": name, "from_owners": sorted(lost), "to_owners": sorted(gained)})
return moves
def compute_diff(old: Snapshot, new: Snapshot) -> dict[str, Any]:
    """Build the full diff dict between two snapshots.

    The result holds the two `binary` headers, one `keyed_diff` block per axis (types, methods,
    events, fields, struct_layout, method_inheritance, field_inheritance) and the list of
    cross-owner method moves under "moved_methods".
    """
    def runner_key(x: Item) -> Hashable:
        return x["runner"]

    def class_key(x: Item) -> Hashable:
        return x["class"]

    result: dict[str, Any] = {"binary": {"from": old.binary, "to": new.binary}}
    result["types"] = keyed_diff(old.types, new.types, _type_key, ["cpp_class", "object_size"])
    result["methods"] = keyed_diff(old.methods, new.methods, _owner_name_key, ["id"])
    result["events"] = keyed_diff(old.events, new.events, _owner_name_key, ["order"])
    result["fields"] = keyed_diff(old.fields, new.fields, _owner_name_key, ["type"])
    result["struct_layout"] = keyed_diff(
        old.struct_layout, new.struct_layout, _layout_key, ["size", "is_vtable"])
    result["method_inheritance"] = keyed_diff(
        old.method_inheritance, new.method_inheritance, runner_key, ["base_runner"])
    result["field_inheritance"] = keyed_diff(
        old.field_inheritance, new.field_inheritance, class_key, ["base_class"])
    result["moved_methods"] = _detect_method_moves(old.methods, new.methods)
    return result
# --- owner filtering (for `--owner CMC_Animo`) -------------------------------------------------
def _item_owner(axis: str, item: Item) -> str | None:
if axis == "types":
return item.get("cpp_class")
if axis in ("methods", "events", "fields", "struct_layout"):
return item.get("owner")
if axis == "method_inheritance":
return item.get("runner")
if axis == "field_inheritance":
return item.get("class")
return None
def filter_by_owner(diff: dict[str, Any], owner: str) -> dict[str, Any]:
    """Return a copy of `diff` narrowed to a single class/owner.

    `binary` is passed through unchanged. `moved_methods` keeps only the moves that name
    `owner` on either side. Every other axis keeps the added/removed/changed entries whose
    owning class (see `_item_owner`) equals `owner`.
    """
    def owned(axis: str, item: Item) -> bool:
        return _item_owner(axis, item) == owner

    narrowed: dict[str, Any] = {
        "binary": diff["binary"],
        "moved_methods": [
            move for move in diff["moved_methods"]
            if owner in move["from_owners"] or owner in move["to_owners"]
        ],
    }
    for axis, block in diff.items():
        if axis == "binary" or axis == "moved_methods":
            continue
        narrowed[axis] = {
            "added": [item for item in block["added"] if owned(axis, item)],
            "removed": [item for item in block["removed"] if owned(axis, item)],
            "changed": [entry for entry in block["changed"] if owned(axis, entry["item"])],
        }
    return narrowed

120
ams/render.py Normal file
View File

@@ -0,0 +1,120 @@
"""Human-readable rendering of a snapshot diff (see diff.compute_diff)."""
from __future__ import annotations
from typing import Any, Callable
def _counts(block: dict[str, Any]) -> str:
return "+{0} -{1} ~{2}".format(
len(block["added"]), len(block["removed"]), len(block["changed"]))
def _fmt_changes(changes: dict[str, list]) -> str:
return "; ".join("{0}: {1} -> {2}".format(f, v[0], v[1]) for f, v in sorted(changes.items()))
def _group_by(items: list[dict], owner_of: Callable[[dict], str]) -> dict[str, list[dict]]:
out: dict[str, list[dict]] = {}
for it in items:
out.setdefault(owner_of(it) or "?", []).append(it)
return out
# --- per-axis item formatting ------------------------------------------------------------------
def _fmt_type(t: dict) -> str:
tag = " [via module iface]" if t.get("via_module_iface") else ""
size = t.get("object_size")
cls = t.get("cpp_class") or "?"
return "{0} -> {1} (size {2}){3}".format(t["script_name"], cls, size, tag)
def _fmt_method(m: dict) -> str:
return "{0} (id {1})".format(m["name"], m.get("id"))
def _fmt_event(e: dict) -> str:
return "{0} (#{1})".format(e["name"], e.get("order"))
def _fmt_field(f: dict) -> str:
return "{0}: {1}".format(f["name"], f.get("type"))
def _fmt_layout(x: dict) -> str:
vt = " vtable" if x.get("is_vtable") else ""
return "@{0:#x} size {1}{2}".format(x["offset"], x.get("size"), vt)
# --- section renderers -------------------------------------------------------------------------
def _section_flat(out: list[str], title: str, block: dict, fmt: Callable[[dict], str],
                  name_of: Callable[[dict], str]) -> None:
    """Append an ungrouped report section for `block` to `out`.

    Emits a blank line, a "TITLE +a -r ~c" header, then the added items (via `fmt`), the
    removed items (name only) and the changed items (name plus field deltas), each group
    sorted by `name_of`.
    """
    out.append("")
    out.append("{0:<16} {1}".format(title, _counts(block)))

    for item in sorted(block["added"], key=name_of):
        out.append(" + {0}".format(fmt(item)))

    for item in sorted(block["removed"], key=name_of):
        out.append(" - {0}".format(name_of(item)))

    def changed_name(entry: dict) -> str:
        return name_of(entry["item"])

    for entry in sorted(block["changed"], key=changed_name):
        out.append(" ~ {0:<22} {1}".format(changed_name(entry), _fmt_changes(entry["changes"])))
def _section_owned(out: list[str], title: str, block: dict, fmt: Callable[[dict], str],
                   owner_of: Callable[[dict], str], name_of: Callable[[dict], str]) -> None:
    """Append an owner-grouped report section for `block` to `out`.

    Emits a blank line and a "TITLE +a -r ~c" header, then one sub-heading per owner
    (sorted), each followed by that owner's added, removed and changed items sorted by
    `name_of`.
    """
    out.append("")
    out.append("{0:<16} {1}".format(title, _counts(block)))

    def changed_owner(entry: dict) -> str:
        return owner_of(entry["item"])

    def changed_name(entry: dict) -> str:
        return name_of(entry["item"])

    added_by_owner = _group_by(block["added"], owner_of)
    removed_by_owner = _group_by(block["removed"], owner_of)
    # Group the change entries themselves so each delta stays attached to its item.
    changed_by_owner = _group_by(block["changed"], changed_owner)

    owners = sorted(added_by_owner.keys() | removed_by_owner.keys() | changed_by_owner.keys())
    for owner in owners:
        out.append(" {0}".format(owner))
        for item in sorted(added_by_owner.get(owner, []), key=name_of):
            out.append(" + {0}".format(fmt(item)))
        for item in sorted(removed_by_owner.get(owner, []), key=name_of):
            out.append(" - {0}".format(name_of(item)))
        for entry in sorted(changed_by_owner.get(owner, []), key=changed_name):
            out.append(" ~ {0:<22} {1}".format(
                changed_name(entry), _fmt_changes(entry["changes"])))
def _is_empty(block: dict) -> bool:
    """Return True when a diff block has no added, removed or changed entries."""
    return not (block["added"] or block["removed"] or block["changed"])


def render_text(diff: dict[str, Any], only: set[str] | None = None) -> str:
    """Render a diff (as produced by diff.compute_diff) as a plain-text report.

    Args:
        diff: The diff dict; must carry "binary", the five rendered axes and "moved_methods".
        only: Optional set of axis names ("types", "methods", "events", "fields", "layout")
            to show; None shows every axis. "methods" also controls the MOVED METHODS section.

    Returns:
        The report as one newline-joined string. When no section was rendered the report
        ends with "(no differences)", or with "(no differences in selected axes)" if the
        diff does contain changes that `only` filtered out.
    """
    b = diff["binary"]
    out: list[str] = ["Engine surface diff"]
    out.append(" from: {0} [{1}/{2}]".format(
        b["from"].get("name", "?"), b["from"].get("engine", "?"), b["from"].get("compiler", "?")))
    out.append(" to: {0} [{1}/{2}]".format(
        b["to"].get("name", "?"), b["to"].get("engine", "?"), b["to"].get("compiler", "?")))
    header_len = len(out)

    def want(axis: str) -> bool:
        return only is None or axis in only

    if want("types") and not _is_empty(diff["types"]):
        _section_flat(out, "TYPES", diff["types"], _fmt_type, lambda t: t["script_name"])
    if want("methods") and not _is_empty(diff["methods"]):
        _section_owned(out, "METHODS", diff["methods"], _fmt_method,
                       lambda m: m["owner"], lambda m: m["name"])
    if want("events") and not _is_empty(diff["events"]):
        _section_owned(out, "EVENTS", diff["events"], _fmt_event,
                       lambda e: e["owner"], lambda e: e["name"])
    if want("fields") and not _is_empty(diff["fields"]):
        _section_owned(out, "FIELDS", diff["fields"], _fmt_field,
                       lambda f: f["owner"], lambda f: f["name"])
    if want("layout") and not _is_empty(diff["struct_layout"]):
        _section_owned(out, "STRUCT LAYOUT", diff["struct_layout"], _fmt_layout,
                       lambda x: x["owner"], lambda x: "@{0:#x}".format(x["offset"]))
    if want("methods") and diff["moved_methods"]:
        out.append("")
        out.append("MOVED METHODS {0}".format(len(diff["moved_methods"])))
        for m in sorted(diff["moved_methods"], key=lambda x: x["name"]):
            out.append(" {0}: {1} -> {2}".format(
                m["name"], ",".join(m["from_owners"]), ",".join(m["to_owners"])))

    # Decide the marker from what was actually rendered, so a filtered report never ends
    # as a bare header and the marker never follows a rendered section.
    if len(out) == header_len:
        nothing_changed = not diff["moved_methods"] and all(
            _is_empty(diff[a]) for a in ("types", "methods", "events", "fields", "struct_layout"))
        out.append("")
        out.append("(no differences)" if nothing_changed else "(no differences in selected axes)")
    return "\n".join(out)

58
ams/snapshot.py Normal file
View File

@@ -0,0 +1,58 @@
"""Loading and light typed access to an engine-surface snapshot.json."""
from __future__ import annotations
import json
from dataclasses import dataclass
from typing import Any
@dataclass
class Snapshot:
    """Read-only view over a parsed snapshot.json.

    Every axis property returns the plain list of dicts stored under the same key in `raw`
    (an empty list when the key is absent), so records round-trip to JSON unchanged.
    """

    # The parsed JSON document exactly as loaded.
    raw: dict[str, Any]

    @classmethod
    def load(cls, path: str) -> "Snapshot":
        """Read the UTF-8 JSON file at `path` and wrap it."""
        with open(path, "r", encoding="utf-8") as fh:
            parsed = json.load(fh)
        return cls(parsed)

    def _axis(self, name: str) -> list[dict]:
        """Return the record list stored under `name`, or a new empty list."""
        return self.raw.get(name, [])

    @property
    def binary(self) -> dict[str, Any]:
        """The "binary" header dict, or an empty dict when missing."""
        return self.raw.get("binary", {})

    @property
    def types(self) -> list[dict]:
        return self._axis("types")

    @property
    def methods(self) -> list[dict]:
        return self._axis("methods")

    @property
    def events(self) -> list[dict]:
        return self._axis("events")

    @property
    def fields(self) -> list[dict]:
        return self._axis("fields")

    @property
    def struct_layout(self) -> list[dict]:
        return self._axis("struct_layout")

    @property
    def method_inheritance(self) -> list[dict]:
        return self._axis("method_inheritance")

    @property
    def field_inheritance(self) -> list[dict]:
        return self._axis("field_inheritance")

    @property
    def label(self) -> str:
        """One-line "name [engine/compiler]" description; missing parts show as "?"."""
        header = self.binary
        name, engine, compiler = (header.get(part, "?") for part in ("name", "engine", "compiler"))
        return "{0} [{1}/{2}]".format(name, engine, compiler)

9
pyproject.toml Normal file
View File

@@ -0,0 +1,9 @@
[project]
name = "ams"
version = "0.1.0"
description = "Aidem Media engine-surface snapshot diffing (Piklib/BlooMoo)"
requires-python = ">=3.9"
dependencies = []
[tool.pytest.ini_options]
testpaths = ["tests"]

103
tests/test_diff.py Normal file
View File

@@ -0,0 +1,103 @@
"""Unit tests for the diff engine + an integration test over the committed golden pair."""
from __future__ import annotations
from pathlib import Path
import pytest
from ams.diff import compute_diff, filter_by_owner
from ams.render import render_text
from ams.snapshot import Snapshot
SNAP_DIR = Path(__file__).resolve().parents[1] / "snapshots"
def _snap(**axes) -> Snapshot:
    """Build a Snapshot with a stub binary header and empty axes, overridden by `axes`."""
    axis_names = ("types", "methods", "events", "fields",
                  "struct_layout", "method_inheritance", "field_inheritance")
    payload = {"binary": {"name": "x", "engine": "e", "compiler": "c"}}
    for axis in axis_names:
        payload[axis] = []
    payload.update(axes)
    return Snapshot(payload)
# --- unit --------------------------------------------------------------------------------------
def test_types_added_removed_changed():
    """One type changes size, one disappears and one appears between the snapshots."""
    def type_rec(script_name, cpp_class, object_size):
        return {"script_name": script_name, "cpp_class": cpp_class,
                "object_size": object_size, "via_module_iface": False}

    before = _snap(types=[type_rec("ANIMO", "CMC_Animo", 108), type_rec("OLD", "CMC_Old", 10)])
    after = _snap(types=[type_rec("ANIMO", "CMC_Animo", 128), type_rec("NEW", "CMC_New", 20)])

    types_diff = compute_diff(before, after)["types"]
    added_names = [t["script_name"] for t in types_diff["added"]]
    removed_names = [t["script_name"] for t in types_diff["removed"]]

    assert added_names == ["NEW"]
    assert removed_names == ["OLD"]
    assert len(types_diff["changed"]) == 1
    assert types_diff["changed"][0]["changes"]["object_size"] == [108, 128]
def test_dual_multiarray_kept_distinct():
    """Two MULTIARRAY records differing only in via_module_iface must not collapse into one key."""
    pair = [
        {"script_name": "MULTIARRAY", "cpp_class": "CMC_MultiArray", "object_size": 88,
         "via_module_iface": flag}
        for flag in (False, True)
    ]
    types_diff = compute_diff(_snap(types=pair), _snap(types=pair))["types"]
    # identical input on both sides -> nothing reported
    assert types_diff == {"added": [], "removed": [], "changed": []}
def test_method_id_change_and_move():
    """SHOW keeps its owner but changes id; PING moves from CMC_Animo to the base CMC."""
    def method(owner, name, method_id):
        return {"owner": owner, "name": name, "id": method_id}

    before = _snap(methods=[method("CMC_Animo", "SHOW", 1), method("CMC_Animo", "PING", 9)])
    after = _snap(methods=[method("CMC_Animo", "SHOW", 2), method("CMC", "PING", 9)])

    result = compute_diff(before, after)
    first_change = result["methods"]["changed"][0]
    expected_move = {"name": "PING", "from_owners": ["CMC_Animo"], "to_owners": ["CMC"]}

    assert first_change["changes"]["id"] == [1, 2]
    assert result["moved_methods"] == [expected_move]
def test_field_type_change_and_owner_filter():
    """A field type change on CMC_Animo survives owner filtering; nothing is added or removed."""
    def field(owner, name, type_name):
        return {"owner": owner, "name": name, "type": type_name, "order": 0}

    volume = ("CMC_Sound", "VOLUME", "int")
    before = _snap(fields=[field("CMC_Animo", "FPS", "int"), field(*volume)])
    after = _snap(fields=[field("CMC_Animo", "FPS", "double"), field(*volume)])

    animo_fields = filter_by_owner(compute_diff(before, after), "CMC_Animo")["fields"]

    assert animo_fields["changed"][0]["changes"]["type"] == ["int", "double"]
    assert animo_fields["added"] == [] and animo_fields["removed"] == []
def test_render_no_diff():
    """Rendering the diff of two empty snapshots reports that nothing differs."""
    empty_diff = compute_diff(_snap(), _snap())
    report = render_text(empty_diff)
    assert "(no differences)" in report
# --- integration over the committed golden pair ------------------------------------------------
@pytest.mark.skipif(not (SNAP_DIR / "PIKLIB8.dll.snapshot.json").exists(),
                    reason="golden snapshots not present")
def test_golden_pair_piklib_to_bloomoo():
    """Diff the committed PIKLIB8 -> bloomoodll snapshots and check a few known deltas."""
    piklib = Snapshot.load(str(SNAP_DIR / "PIKLIB8.dll.snapshot.json"))  # VS6
    bloomoo = Snapshot.load(str(SNAP_DIR / "bloomoodll.dll.snapshot.json"))  # VS8
    full = compute_diff(piklib, bloomoo)
    types_diff = full["types"]

    new_type_names = set()
    for added_type in types_diff["added"]:
        new_type_names.add(added_type["script_name"])
    assert new_type_names.issuperset({"GRBUFFER", "INTERNET"})
    assert types_diff["removed"] == []

    size_by_type = {}
    for entry in types_diff["changed"]:
        size_by_type[entry["item"]["script_name"]] = entry["changes"].get("object_size")
    assert size_by_type.get("MOUSE") == [104, 128]

    # the Animo-only view gains GETFPS and has no added script fields
    animo = filter_by_owner(full, "CMC_Animo")
    animo_added_methods = {m["name"] for m in animo["methods"]["added"]}
    assert "GETFPS" in animo_added_methods
    assert animo["fields"]["added"] == []

    # rendering must not raise and must mention the new types
    report = render_text(full)
    assert "GRBUFFER" in report and "MOUSE" in report