"""End-to-end acquisition: a game archive (or loose DLL) → snapshot → catalog. archive.iso ─▶ unpack ─▶ pick engine DLL ─▶ hash ─▶ Ghidra headless │ snapshot.json ◀────────┘ │ enrich (fuzzy/md5/acquisition) ▼ import to DB *or* POST /snapshots Everything heavy is injectable: pass your own `extract_fn(dll, out)->path` to test the wiring without Ghidra. The default uses `ghidra.run_extractor`. """ from __future__ import annotations import json import os import shutil import tempfile from dataclasses import dataclass, field from typing import Callable from . import ghidra from .identify import Candidate, FileHashes, find_engine_dlls, hash_file from .unpack import detect_kind, unpack ExtractFn = Callable[[str, str], str] class AcquireError(RuntimeError): pass @dataclass class AcquireResult: source: str dll: str # absolute path to the engine DLL that was analysed candidate: Candidate hashes: FileHashes snapshot_path: str snapshot: dict engine: str | None imported_id: int | None = None sink: str = "none" # "db" | "http" | "none" extras: dict = field(default_factory=dict) def _default_out_dir() -> str: # repo snapshots/ (…/ams/acquire/pipeline.py -> repo root two levels up) return str((__import__("pathlib").Path(__file__).resolve().parents[2] / "snapshots")) def _enrich(snapshot: dict, *, dll: str, hashes: FileHashes, candidate: Candidate, source: str) -> None: """Stamp acquisition provenance into the snapshot's binary block (in place).""" binary = snapshot.setdefault("binary", {}) # The extractor reads sha256 from the live program; backfill / cross-check from our own hash. binary.setdefault("sha256", hashes.sha256) binary["md5"] = hashes.md5 binary["size"] = hashes.size if hashes.fuzzy: binary["fuzzy"] = hashes.fuzzy binary["acquisition"] = { "source": os.path.basename(source), "dll_name": os.path.basename(dll), "identify_engine": candidate.engine, "identify_reason": candidate.reason, } def acquire( source: str, game_name: str | None = None, *, out_dir: str | None = None, extract_fn: ExtractFn | None = None, sink: str = "db", post_url: str | None = None, keep_workdir: bool = False, ) -> AcquireResult: """Run the full pipeline on `source` (an ISO/ZIP, a directory, or a loose DLL). sink: "db" imports straight into the catalog DB, "http" POSTs to `post_url`/snapshots, "none" just produces the snapshot file. `extract_fn` overrides the Ghidra step.""" source = os.path.abspath(source) if not os.path.exists(source): raise AcquireError("source does not exist: {0}".format(source)) extract_fn = extract_fn or ghidra.run_extractor out_dir = out_dir or _default_out_dir() kind = detect_kind(source) workdir: str | None = None try: # 1. resolve a directory/file tree to search for the engine DLL if kind in ("iso", "zip"): workdir = unpack(source) search_root = workdir else: # dir or pe — search in place search_root = source # 2. pick the engine DLL (content-based) cands = find_engine_dlls(search_root) if not cands: raise AcquireError("no Piklib/BlooMoo engine DLL found under {0}".format(search_root)) best = cands[0] dll = os.path.abspath(best.path) # 3. hash it hashes = hash_file(dll) # 4. run the extractor (Ghidra, or an injected stub) os.makedirs(out_dir, exist_ok=True) out_path = os.path.join(out_dir, os.path.basename(dll) + ".snapshot.json") extract_fn(dll, out_path) with open(out_path, "r", encoding="utf-8") as fh: snapshot = json.load(fh) # 5. enrich with acquisition provenance and rewrite _enrich(snapshot, dll=dll, hashes=hashes, candidate=best, source=source) with open(out_path, "w", encoding="utf-8") as fh: json.dump(snapshot, fh, indent=2, sort_keys=True) result = AcquireResult( source=source, dll=dll, candidate=best, hashes=hashes, snapshot_path=out_path, snapshot=snapshot, engine=snapshot.get("binary", {}).get("engine") or best.engine, ) # 6. land it in the catalog if sink == "db": result.imported_id = _import_db(snapshot, game_name) result.sink = "db" elif sink == "http": result.imported_id = _post_http(post_url, snapshot, game_name) result.sink = "http" return result finally: if workdir and not keep_workdir: shutil.rmtree(workdir, ignore_errors=True) def _import_db(snapshot: dict, game_name: str | None) -> int: from ..api.db import get_session, init_db from ..api.service import import_snapshot init_db() db = get_session() try: snap = import_snapshot(db, snapshot, game_name) return snap.id finally: db.close() def _post_http(post_url: str | None, snapshot: dict, game_name: str | None) -> int: import urllib.parse import urllib.request if not post_url: raise AcquireError("sink='http' requires post_url (e.g. http://127.0.0.1:8000)") url = post_url.rstrip("/") + "/snapshots" if game_name: url += "?" + urllib.parse.urlencode({"game": game_name}) req = urllib.request.Request( url, data=json.dumps(snapshot).encode("utf-8"), headers={"Content-Type": "application/json"}, method="POST") with urllib.request.urlopen(req, timeout=30) as resp: body = json.loads(resp.read().decode("utf-8")) return body.get("id")