def main(argv: list[str] | None = None) -> int:
    """Run the ``ams`` command line and return the process exit status.

    Args:
        argv: Argument list without the program name. ``None`` lets argparse
            fall back to ``sys.argv[1:]``.

    Returns:
        ``0`` once the diff has been printed. Usage problems (an unknown
        ``--only`` axis, a snapshot file that cannot be read or parsed) are
        reported through ``ArgumentParser.error``, which prints the message
        to stderr and exits with status 2.
    """
    p = argparse.ArgumentParser(prog="ams", description="Diff two engine-surface snapshots.")
    p.add_argument("old", help="older snapshot.json")
    p.add_argument("new", help="newer snapshot.json")
    p.add_argument("--owner", help="restrict to one class, e.g. CMC_Animo")
    p.add_argument("--only", help="comma-separated axes to show: " + ",".join(_AXES))
    p.add_argument("--json", action="store_true", help="emit machine-readable JSON")
    args = p.parse_args(argv)

    # Validate --only before touching the filesystem so a typo is reported
    # immediately, and is reported even when --json is given.
    only = None
    if args.only:
        only = {a.strip() for a in args.only.split(",") if a.strip()}
        bad = only - set(_AXES)
        if bad:
            p.error("unknown axis: {0} (choose from {1})".format(
                ",".join(sorted(bad)), ",".join(_AXES)))

    # OSError covers missing/unreadable files; ValueError covers invalid JSON
    # (json.JSONDecodeError) and undecodable bytes (UnicodeDecodeError).
    try:
        old = Snapshot.load(args.old)
        new = Snapshot.load(args.new)
    except (OSError, ValueError) as exc:
        p.error("cannot load snapshot: {0}".format(exc))

    diff = compute_diff(old, new)
    if args.owner:
        diff = filter_by_owner(diff, args.owner)

    if args.json:
        # The JSON output always carries every axis; --only shapes the text report only.
        print(json.dumps(diff, indent=2, sort_keys=True))
        return 0

    print(render_text(diff, only=only))
    return 0
def _index(items: list[Item], key: KeyFn) -> dict[Hashable, Item]:
    """Map each item's key to the item; a later item with the same key replaces an earlier one."""
    lookup: dict[Hashable, Item] = {}
    for record in items:
        lookup[key(record)] = record
    return lookup


def keyed_diff(a_items: list[Item], b_items: list[Item], key: KeyFn,
               compare_fields: list[str]) -> dict[str, Any]:
    """Diff two item lists that are matched up by `key`.

    Returns a dict with three lists:
      added   - items whose key occurs only in `b_items`
      removed - items whose key occurs only in `a_items`
      changed - {"item": <b-side item>, "changes": {field: [old, new]}} for keys present on
                both sides where at least one of `compare_fields` differs (a field missing
                from an item is read as None)
    """
    old_by_key = _index(a_items, key)
    new_by_key = _index(b_items, key)
    result: dict[str, Any] = {"added": [], "removed": [], "changed": []}

    for k, new_item in new_by_key.items():
        if k not in old_by_key:
            result["added"].append(new_item)

    for k, old_item in old_by_key.items():
        if k not in new_by_key:
            result["removed"].append(old_item)
            continue
        new_item = new_by_key[k]
        deltas = {}
        for field in compare_fields:
            before, after = old_item.get(field), new_item.get(field)
            if before != after:
                deltas[field] = [before, after]
        if deltas:
            result["changed"].append({"item": new_item, "changes": deltas})
    return result
# --- owner filtering (for `--owner CMC_Animo`) -------------------------------------------------
# Per axis, the record field that names the class an item belongs to.
_OWNER_FIELD = {
    "types": "cpp_class",
    "methods": "owner",
    "events": "owner",
    "fields": "owner",
    "struct_layout": "owner",
    "method_inheritance": "runner",
    "field_inheritance": "class",
}


def _item_owner(axis: str, item: Item) -> str | None:
    """Return the class `item` belongs to on `axis`; None for an unknown axis or a missing field."""
    field = _OWNER_FIELD.get(axis)
    if field is None:
        return None
    return item.get(field)


def filter_by_owner(diff: dict[str, Any], owner: str) -> dict[str, Any]:
    """Restrict a diff to a single class/owner.

    `binary` is passed through as-is. `moved_methods` keeps only the moves that list `owner`
    among their `from_owners` or `to_owners`. Every other axis keeps only the added / removed /
    changed entries whose owning class (see `_item_owner`) equals `owner`.

    Returns a new dict; `diff` itself is not modified.
    """
    def belongs(axis: str, item: Item) -> bool:
        return _item_owner(axis, item) == owner

    out: dict[str, Any] = {"binary": diff["binary"]}
    out["moved_methods"] = [m for m in diff["moved_methods"]
                            if owner in m["from_owners"] or owner in m["to_owners"]]
    for axis, block in diff.items():
        if axis in ("binary", "moved_methods"):
            continue
        out[axis] = {
            "added": [i for i in block["added"] if belongs(axis, i)],
            "removed": [i for i in block["removed"] if belongs(axis, i)],
            # `changed` entries wrap the record: the owner is read from the wrapped item.
            "changed": [c for c in block["changed"] if belongs(axis, c["item"])],
        }
    return out
def _section_flat(out: list[str], title: str, block: dict, fmt: Callable[[dict], str],
                  name_of: Callable[[dict], str]) -> None:
    """Append one ungrouped report section for `block` to `out`.

    Emits a blank line and a header (`title` plus the +/-/~ counts), then the added items
    (formatted with `fmt`), the removed items (name only) and the changed items (name plus
    their old -> new deltas). Each group is sorted by `name_of`.
    """
    header = "{0:<16} {1}".format(title, _counts(block))
    out.append("")
    out.append(header)

    out.extend(" + {0}".format(fmt(entry)) for entry in sorted(block["added"], key=name_of))
    out.extend(" - {0}".format(name_of(entry)) for entry in sorted(block["removed"], key=name_of))

    by_item_name = sorted(block["changed"], key=lambda c: name_of(c["item"]))
    for record in by_item_name:
        label = name_of(record["item"])
        out.append(" ~ {0:<22} {1}".format(label, _fmt_changes(record["changes"])))
def render_text(diff: dict[str, Any], only: set[str] | None = None) -> str:
    """Render a diff dict (see diff.compute_diff) as a plain-text report.

    Args:
        diff: Must contain `binary`, the `types` / `methods` / `events` / `fields` /
            `struct_layout` blocks, and `moved_methods` whenever the methods axis is shown.
            The `method_inheritance` and `field_inheritance` blocks are optional.
        only: Axis names to show (`types`, `methods`, `events`, `fields`, `layout`); None
            shows everything. Moved methods and method-inheritance changes are shown with
            `methods`; field-inheritance changes are shown with `fields`.

    Returns:
        The report as one newline-joined string. Empty sections are omitted. When no section
        is printed at all the report says so, distinguishing "the snapshots do not differ"
        from "they differ, but not on the selected axes".
    """
    b = diff["binary"]
    out: list[str] = ["Engine surface diff"]
    out.append(" from: {0} [{1}/{2}]".format(
        b["from"].get("name", "?"), b["from"].get("engine", "?"), b["from"].get("compiler", "?")))
    out.append(" to: {0} [{1}/{2}]".format(
        b["to"].get("name", "?"), b["to"].get("engine", "?"), b["to"].get("compiler", "?")))
    header_len = len(out)  # anything appended past this point is a rendered section

    def want(axis: str) -> bool:
        return only is None or axis in only

    if want("types") and not _is_empty(diff["types"]):
        _section_flat(out, "TYPES", diff["types"], _fmt_type, lambda t: t["script_name"])
    if want("methods") and not _is_empty(diff["methods"]):
        _section_owned(out, "METHODS", diff["methods"], _fmt_method,
                       lambda m: m["owner"], lambda m: m["name"])
    if want("events") and not _is_empty(diff["events"]):
        _section_owned(out, "EVENTS", diff["events"], _fmt_event,
                       lambda e: e["owner"], lambda e: e["name"])
    if want("fields") and not _is_empty(diff["fields"]):
        _section_owned(out, "FIELDS", diff["fields"], _fmt_field,
                       lambda f: f["owner"], lambda f: f["name"])
    if want("layout") and not _is_empty(diff["struct_layout"]):
        _section_owned(out, "STRUCT LAYOUT", diff["struct_layout"], _fmt_layout,
                       lambda x: x["owner"], lambda x: "@{0:#x}".format(x["offset"]))
    if want("methods") and diff["moved_methods"]:
        out.append("")
        out.append("MOVED METHODS {0}".format(len(diff["moved_methods"])))
        for m in sorted(diff["moved_methods"], key=lambda x: x["name"]):
            out.append(" {0}: {1} -> {2}".format(
                m["name"], ",".join(m["from_owners"]), ",".join(m["to_owners"])))

    # Inheritance changes used to be computed but never shown, so a pure reparenting was
    # reported as "(no differences)". They go last so the existing sections keep their order.
    no_changes: dict[str, Any] = {"added": [], "removed": [], "changed": []}
    method_bases = diff.get("method_inheritance") or no_changes
    field_bases = diff.get("field_inheritance") or no_changes
    if want("methods") and not _is_empty(method_bases):
        _section_flat(out, "METHOD BASES", method_bases,
                      lambda x: "{0} : {1}".format(x.get("runner"), x.get("base_runner")),
                      lambda x: str(x.get("runner")))
    if want("fields") and not _is_empty(field_bases):
        _section_flat(out, "FIELD BASES", field_bases,
                      lambda x: "{0} : {1}".format(x.get("class"), x.get("base_class")),
                      lambda x: str(x.get("class")))

    if len(out) == header_len:
        core = ("types", "methods", "events", "fields", "struct_layout")
        differs = (
            any(not _is_empty(diff[a]) for a in core)
            or not _is_empty(method_bases)
            or not _is_empty(field_bases)
            or bool(diff.get("moved_methods"))
        )
        out.append("")
        # Nothing printed although the snapshots differ means `only` hid every change.
        out.append("(no differences in the selected axes)" if differs else "(no differences)")
    return "\n".join(out)
@dataclass
class Snapshot:
    """Thin wrapper over a parsed snapshot.json.

    Axes are returned as plain lists of dicts so they round-trip cleanly to JSON; the diff
    engine works directly on those records. An axis that is absent from the file, or present
    as JSON null, reads as empty.
    """

    # The parsed top-level JSON object of the snapshot file.
    raw: dict[str, Any]

    @classmethod
    def load(cls, path: str) -> "Snapshot":
        """Read and parse the snapshot at `path` (UTF-8 JSON).

        Raises:
            OSError: the file cannot be opened or read.
            ValueError: the file is not valid JSON (json.JSONDecodeError) or its top-level
                value is not a JSON object.
        """
        with open(path, "r", encoding="utf-8") as fh:
            data = json.load(fh)
        if not isinstance(data, dict):
            # Fail here with a clear message instead of an AttributeError deep in the diff.
            raise ValueError("{0}: expected a JSON object at top level, got {1}".format(
                path, type(data).__name__))
        return cls(data)

    def _records(self, axis: str) -> list[dict]:
        """Return the record list stored under `axis`; [] when it is missing or null."""
        return self.raw.get(axis) or []

    @property
    def binary(self) -> dict[str, Any]:
        """The `binary` metadata object; {} when it is missing or null."""
        return self.raw.get("binary") or {}

    @property
    def types(self) -> list[dict]:
        return self._records("types")

    @property
    def methods(self) -> list[dict]:
        return self._records("methods")

    @property
    def events(self) -> list[dict]:
        return self._records("events")

    @property
    def fields(self) -> list[dict]:
        return self._records("fields")

    @property
    def struct_layout(self) -> list[dict]:
        return self._records("struct_layout")

    @property
    def method_inheritance(self) -> list[dict]:
        return self._records("method_inheritance")

    @property
    def field_inheritance(self) -> list[dict]:
        return self._records("field_inheritance")

    @property
    def label(self) -> str:
        """One-line description "name [engine/compiler]", with "?" for any missing part."""
        b = self.binary
        return "{0} [{1}/{2}]".format(
            b.get("name", "?"), b.get("engine", "?"), b.get("compiler", "?"))
def _snap(**axes) -> Snapshot:
    """Build an in-memory Snapshot: fixed binary metadata, every axis empty unless overridden.

    Keyword arguments replace the matching top-level entries (e.g. `_snap(types=[...])`).
    """
    raw = {"binary": {"name": "x", "engine": "e", "compiler": "c"}}
    for axis in ("types", "methods", "events", "fields",
                 "struct_layout", "method_inheritance", "field_inheritance"):
        raw[axis] = []
    raw.update(axes)
    return Snapshot(raw)
def test_method_id_change_and_move():
    """A changed method id shows up under `changed`; a method that switches owner is a move."""
    def method(owner, name, ident):
        return {"owner": owner, "name": name, "id": ident}

    before = _snap(methods=[
        method("CMC_Animo", "SHOW", 1),
        method("CMC_Animo", "PING", 9),
    ])
    after = _snap(methods=[
        method("CMC_Animo", "SHOW", 2),  # id changed
        method("CMC", "PING", 9),  # moved Animo -> base CMC
    ])
    result = compute_diff(before, after)

    id_delta = result["methods"]["changed"][0]["changes"]["id"]
    assert id_delta == [1, 2]

    expected_move = {"name": "PING", "from_owners": ["CMC_Animo"], "to_owners": ["CMC"]}
    assert result["moved_methods"] == [expected_move]
d["types"]["changed"]} + assert size_changes.get("MOUSE") == [104, 128] + + # BlooMoo added Animo methods; none removed for Animo + animo = filter_by_owner(d, "CMC_Animo") + assert "GETFPS" in {m["name"] for m in animo["methods"]["added"]} + assert animo["fields"]["added"] == [] # Animo's script fields are identical across the pair + + # rendering must not raise and must mention the new types + text = render_text(d) + assert "GRBUFFER" in text and "MOUSE" in text