Files
Patryk Gensch 6797ad5ddb Add ISO/ZIP acquisition pipeline (ams.acquire worker)
Closes the chain from a game file to a catalog entry: unpack an ISO/ZIP,
content-identify the engine DLL (CMC_ObjectsContainer marker in RTTI, so a
renamed file is still found), hash it (sha256 + md5 + optional ssdeep via
ppdeep), run Ghidra headless with the extractor, enrich and import the snapshot.

- unpack.py: bsdtar (ISO9660 + ZIP) with a pure-Python zipfile fallback
- identify.py: content-based engine-DLL picker + hashing
- ghidra.py: analyzeHeadless launcher discovery + post-script run
- pipeline.py: orchestration with injectable extract_fn; sink db|http|none
- cli.py: python -m ams.acquire (incl. --identify-only dry run)
- tests: 7 new (forged PE markers + stubbed extractor) -> 18/18

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-05-31 12:11:56 +02:00

170 lines
5.9 KiB
Python

"""End-to-end acquisition: a game archive (or loose DLL) → snapshot → catalog.
    archive.iso ─▶ unpack ─▶ pick engine DLL ─▶ hash ─▶ Ghidra headless
                                       snapshot.json ◀────────┘
                                             │ enrich (fuzzy/md5/acquisition)
                             import to DB *or* POST /snapshots
Everything heavy is injectable: pass your own `extract_fn(dll, out)->path` to test the
wiring without Ghidra. The default uses `ghidra.run_extractor`.
"""
from __future__ import annotations
import json
import os
import shutil
import tempfile
from dataclasses import dataclass, field
from typing import Callable
from . import ghidra
from .identify import Candidate, FileHashes, find_engine_dlls, hash_file
from .unpack import detect_kind, unpack
# Extractor hook type. `acquire` calls it as extract_fn(dll_path, out_path) and then reads the
# snapshot JSON back from out_path; the returned string is not used by `acquire` itself.
ExtractFn = Callable[[str, str], str]
class AcquireError(RuntimeError):
    """Pipeline failure: missing source, no engine DLL found, or an unusable sink configuration."""
@dataclass
class AcquireResult:
    """Everything one `acquire` run produced, returned to the caller as a plain record."""

    # Absolute path of the input that was acquired (archive, directory or loose DLL).
    source: str
    # Absolute path to the engine DLL that was analysed.
    dll: str
    # The identification candidate that was selected for analysis.
    candidate: Candidate
    # Hashes computed for `dll`.
    hashes: FileHashes
    # Where the snapshot JSON was written.
    snapshot_path: str
    # The snapshot contents after enrichment.
    snapshot: dict
    # Engine name from the snapshot's binary block, falling back to the candidate's engine.
    engine: str | None
    # Identifier reported by the sink; stays None when nothing was imported.
    imported_id: int | None = None
    # Which sink actually ran: "db" | "http" | "none".
    sink: str = "none"
    # Free-form additional data; each instance gets its own dict.
    extras: dict = field(default_factory=dict)
def _default_out_dir() -> str:
    """Return the default snapshot directory: `snapshots/` under the repository root."""
    from pathlib import Path

    # …/ams/acquire/pipeline.py -> parents[0] is acquire/, parents[1] is ams/,
    # parents[2] is the repo root.
    this_file = Path(__file__).resolve()
    repo_root = this_file.parents[2]
    return str(repo_root / "snapshots")
def _enrich(snapshot: dict, *, dll: str, hashes: FileHashes, candidate: Candidate, source: str) -> None:
    """Stamp acquisition provenance into the snapshot's binary block (in place).

    Creates `snapshot["binary"]` when it is missing or null, then records:
      * `sha256` — kept if the extractor already supplied a non-empty value, otherwise
        filled from `hashes.sha256`;
      * `md5` and `size` — always overwritten from `hashes`;
      * `fuzzy` — only when `hashes.fuzzy` is truthy;
      * `acquisition` — base names of `source` and `dll` plus the candidate's engine/reason.

    Returns None; the caller's dict is mutated.
    """
    # A snapshot carrying `"binary": null` is treated like one with no binary block at all.
    if snapshot.get("binary") is None:
        snapshot["binary"] = {}
    binary = snapshot["binary"]

    # The extractor reads sha256 from the live program; keep its value when it has one and
    # backfill from our own hash otherwise. A key that is present but None/empty counts as
    # missing — setdefault() alone would have left that placeholder in place.
    if not binary.get("sha256"):
        binary["sha256"] = hashes.sha256
    binary["md5"] = hashes.md5
    binary["size"] = hashes.size
    if hashes.fuzzy:
        binary["fuzzy"] = hashes.fuzzy
    # Only base names are stored, so local directory layout does not leak into the snapshot.
    binary["acquisition"] = {
        "source": os.path.basename(source),
        "dll_name": os.path.basename(dll),
        "identify_engine": candidate.engine,
        "identify_reason": candidate.reason,
    }
def acquire(
    source: str,
    game_name: str | None = None,
    *,
    out_dir: str | None = None,
    extract_fn: ExtractFn | None = None,
    sink: str = "db",
    post_url: str | None = None,
    keep_workdir: bool = False,
) -> AcquireResult:
    """Run the full pipeline on `source` (an ISO/ZIP, a directory, or a loose DLL).

    sink: "db" imports straight into the catalog DB, "http" POSTs to `post_url`/snapshots,
    "none" just produces the snapshot file. Any other value is treated like "none".
    `extract_fn` overrides the Ghidra step; it is called as `extract_fn(dll, out_path)` and
    must leave the snapshot JSON at `out_path`.

    The snapshot is written to `<out_dir>/<dll basename>.snapshot.json` (`out_dir` defaults
    to `_default_out_dir()`) and rewritten in place after enrichment. When `source` is an
    archive, the unpack directory is removed on exit unless `keep_workdir` is set.

    Raises AcquireError if `source` does not exist, if sink="http" is requested without a
    `post_url`, or if no engine DLL is found. Errors raised by the extractor, by JSON
    parsing and by the sinks propagate unchanged.
    """
    source = os.path.abspath(source)
    if not os.path.exists(source):
        raise AcquireError(f"source does not exist: {source}")
    # Fail fast: without this the missing URL is only noticed by the HTTP sink, after the
    # archive was unpacked, the extractor ran and the snapshot file was already written.
    if sink == "http" and not post_url:
        raise AcquireError("sink='http' requires post_url (e.g. http://127.0.0.1:8000)")

    extract_fn = extract_fn or ghidra.run_extractor
    out_dir = out_dir or _default_out_dir()
    kind = detect_kind(source)
    workdir: str | None = None
    try:
        # 1. resolve a directory/file tree to search for the engine DLL
        if kind in ("iso", "zip"):
            workdir = unpack(source)
            search_root = workdir
        else:  # dir or pe — search in place
            search_root = source

        # 2. pick the engine DLL (content-based)
        cands = find_engine_dlls(search_root)
        if not cands:
            raise AcquireError(f"no Piklib/BlooMoo engine DLL found under {search_root}")
        # The first candidate is used; the ordering is whatever find_engine_dlls returns.
        best = cands[0]
        dll = os.path.abspath(best.path)

        # 3. hash it
        hashes = hash_file(dll)

        # 4. run the extractor (Ghidra, or an injected stub)
        os.makedirs(out_dir, exist_ok=True)
        out_path = os.path.join(out_dir, os.path.basename(dll) + ".snapshot.json")
        extract_fn(dll, out_path)
        with open(out_path, "r", encoding="utf-8") as fh:
            snapshot = json.load(fh)

        # 5. enrich with acquisition provenance and rewrite
        _enrich(snapshot, dll=dll, hashes=hashes, candidate=best, source=source)
        with open(out_path, "w", encoding="utf-8") as fh:
            json.dump(snapshot, fh, indent=2, sort_keys=True)

        result = AcquireResult(
            source=source,
            dll=dll,
            candidate=best,
            hashes=hashes,
            snapshot_path=out_path,
            snapshot=snapshot,
            # Prefer the engine recorded in the snapshot; fall back to the identifier's guess.
            engine=snapshot.get("binary", {}).get("engine") or best.engine,
        )

        # 6. land it in the catalog
        if sink == "db":
            result.imported_id = _import_db(snapshot, game_name)
            result.sink = "db"
        elif sink == "http":
            result.imported_id = _post_http(post_url, snapshot, game_name)
            result.sink = "http"
        return result
    finally:
        # Only a directory created by unpack() is removed; a directory or DLL passed in as
        # `source` is never touched.
        if workdir and not keep_workdir:
            shutil.rmtree(workdir, ignore_errors=True)
def _import_db(snapshot: dict, game_name: str | None) -> int:
    """Import `snapshot` into the catalog DB and return the `id` of what `import_snapshot` yields.

    The API modules are imported lazily, inside the function, so they are only loaded when
    the "db" sink is actually used. `init_db()` is called before a session is opened, and the
    session is closed whether or not the import succeeds.
    """
    from ..api.db import get_session, init_db
    from ..api.service import import_snapshot

    init_db()
    session = get_session()
    try:
        # Read `.id` while the session is still open.
        return import_snapshot(session, snapshot, game_name).id
    finally:
        session.close()
def _post_http(post_url: str | None, snapshot: dict, game_name: str | None) -> int | None:
    """POST `snapshot` as JSON to `<post_url>/snapshots` and return the `id` from the reply.

    `game_name`, when given, is sent as the `game` query parameter. The request uses a
    30-second timeout. Returns None when the reply has no body, is not a JSON object, or
    carries no `id` — the upload itself succeeded in all of those cases.

    Raises AcquireError when `post_url` is empty. Errors raised by urllib (connection
    failures, non-2xx statuses) and by JSON parsing of a non-empty body propagate unchanged.
    """
    import urllib.parse
    import urllib.request

    if not post_url:
        raise AcquireError("sink='http' requires post_url (e.g. http://127.0.0.1:8000)")

    url = post_url.rstrip("/") + "/snapshots"
    if game_name:
        url += "?" + urllib.parse.urlencode({"game": game_name})
    req = urllib.request.Request(
        url,
        data=json.dumps(snapshot).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=30) as resp:
        raw = resp.read()

    # The snapshot has been accepted at this point; an empty reply (e.g. 204 No Content) or
    # a non-object JSON reply must not turn a successful upload into a crash.
    if not raw.strip():
        return None
    body = json.loads(raw.decode("utf-8"))
    return body.get("id") if isinstance(body, dict) else None