Closes the chain from a game file to a catalog entry: unpack an ISO/ZIP, content-identify the engine DLL (CMC_ObjectsContainer marker in RTTI, so a renamed file is still found), hash it (sha256 + md5 + optional ssdeep via ppdeep), run Ghidra headless with the extractor, enrich and import the snapshot. - unpack.py: bsdtar (ISO9660 + ZIP) with a pure-Python zipfile fallback - identify.py: content-based engine-DLL picker + hashing - ghidra.py: analyzeHeadless launcher discovery + post-script run - pipeline.py: orchestration with injectable extract_fn; sink db|http|none - cli.py: python -m ams.acquire (incl. --identify-only dry run) - tests: 7 new (forged PE markers + stubbed extractor) -> 18/18 Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
170 lines
5.9 KiB
Python
170 lines
5.9 KiB
Python
"""End-to-end acquisition: a game archive (or loose DLL) → snapshot → catalog.
|
|
|
|
    archive.iso ─▶ unpack ─▶ pick engine DLL ─▶ hash ─▶ Ghidra headless
                                                              │
                                       snapshot.json ◀────────┘
                                             │  enrich (fuzzy/md5/acquisition)
                                             ▼
                             import to DB *or* POST /snapshots
|
|
|
|
Everything heavy is injectable: pass your own `extract_fn(dll, out)->path` to test the
|
|
wiring without Ghidra. The default uses `ghidra.run_extractor`.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import shutil
|
|
import tempfile
|
|
from dataclasses import dataclass, field
|
|
from typing import Callable
|
|
|
|
from . import ghidra
|
|
from .identify import Candidate, FileHashes, find_engine_dlls, hash_file
|
|
from .unpack import detect_kind, unpack
|
|
|
|
# Signature of the extraction step: `acquire` calls it as extract_fn(dll_path, out_path) and then
# reads the snapshot JSON from out_path; the returned string is not used by `acquire`.
ExtractFn = Callable[[str, str], str]
|
|
|
|
|
|
class AcquireError(RuntimeError):
    """Raised when the acquisition pipeline cannot proceed.

    Used in this module for a missing source path, for a search tree that contains no
    engine DLL, and for an HTTP sink that was requested without a target URL.
    """
|
|
|
|
|
|
@dataclass
class AcquireResult:
    """Everything one `acquire` run produced, bundled for the caller."""

    # The input that was acquired (archive, directory or loose DLL).
    source: str
    # Absolute path to the engine DLL that was analysed.
    dll: str
    # The identification candidate that was selected for analysis.
    candidate: Candidate
    # Hashes computed for `dll`.
    hashes: FileHashes
    # Where the enriched snapshot JSON was written.
    snapshot_path: str
    # The enriched snapshot, as loaded from / written to `snapshot_path`.
    snapshot: dict
    # Engine name: the snapshot's own value when it has one, else the candidate's.
    engine: str | None
    # Identifier handed back by the sink; stays None when nothing was imported.
    imported_id: int | None = None
    # Which sink received the snapshot: "db" | "http" | "none".
    sink: str = "none"
    # Free-form additional data; a fresh dict per instance.
    extras: dict = field(default_factory=dict)
|
|
|
|
|
|
def _default_out_dir() -> str:
    """Return the default snapshot output directory as a string.

    The directory is ``snapshots`` under the third ancestor of this file's resolved path
    (…/ams/acquire/pipeline.py -> the directory that contains ``ams``). Nothing is created
    here; the caller is responsible for making the directory.
    """
    # Imported locally so the helper does not depend on a module-level pathlib import.
    from pathlib import Path

    module_path = Path(__file__).resolve()
    # parents[0] is this file's directory, parents[1] its parent, parents[2] one above that.
    return str(module_path.parents[2] / "snapshots")
|
|
|
|
|
|
def _enrich(snapshot: dict, *, dll: str, hashes: FileHashes, candidate: Candidate, source: str) -> None:
|
|
"""Stamp acquisition provenance into the snapshot's binary block (in place)."""
|
|
binary = snapshot.setdefault("binary", {})
|
|
# The extractor reads sha256 from the live program; backfill / cross-check from our own hash.
|
|
binary.setdefault("sha256", hashes.sha256)
|
|
binary["md5"] = hashes.md5
|
|
binary["size"] = hashes.size
|
|
if hashes.fuzzy:
|
|
binary["fuzzy"] = hashes.fuzzy
|
|
binary["acquisition"] = {
|
|
"source": os.path.basename(source),
|
|
"dll_name": os.path.basename(dll),
|
|
"identify_engine": candidate.engine,
|
|
"identify_reason": candidate.reason,
|
|
}
|
|
|
|
|
|
def acquire(
    source: str,
    game_name: str | None = None,
    *,
    out_dir: str | None = None,
    extract_fn: ExtractFn | None = None,
    sink: str = "db",
    post_url: str | None = None,
    keep_workdir: bool = False,
) -> AcquireResult:
    """Run the full pipeline on `source` (an ISO/ZIP, a directory, or a loose DLL).

    sink: "db" imports straight into the catalog DB, "http" POSTs to `post_url`/snapshots,
    "none" just produces the snapshot file. `extract_fn` overrides the Ghidra step.

    Args:
        source: Path to the input; made absolute before use.
        game_name: Optional game name passed through to the sink.
        out_dir: Directory for the snapshot file; defaults to `_default_out_dir()` and is
            created if missing.
        extract_fn: Called as ``extract_fn(dll, out_path)`` and expected to leave the
            snapshot JSON at ``out_path``; defaults to `ghidra.run_extractor`.
        sink: One of "db", "http", "none".
        post_url: Base URL of the catalog API; required when `sink` is "http".
        keep_workdir: Keep the directory an archive was unpacked into instead of deleting it.

    Returns:
        An `AcquireResult` describing the analysed DLL, its hashes, the enriched snapshot
        and, when a sink was used, the id it returned.

    Raises:
        AcquireError: `source` does not exist, `sink` is not a known value, `sink` is
            "http" without `post_url`, or no engine DLL was found.
        Errors from the extractor, from reading/writing the snapshot file and from the
        sink are not caught here and propagate unchanged.
    """
    source = os.path.abspath(source)
    if not os.path.exists(source):
        raise AcquireError("source does not exist: {0}".format(source))

    # Validate the sink configuration before any heavy work: a misspelt sink would otherwise
    # be silently treated like "none", and a missing post_url would only surface after the
    # whole unpack + extractor run had already been paid for.
    if sink not in ("db", "http", "none"):
        raise AcquireError("unknown sink {0!r} (expected 'db', 'http' or 'none')".format(sink))
    if sink == "http" and not post_url:
        raise AcquireError("sink='http' requires post_url (e.g. http://127.0.0.1:8000)")

    extract_fn = extract_fn or ghidra.run_extractor
    out_dir = out_dir or _default_out_dir()

    kind = detect_kind(source)
    workdir: str | None = None
    try:
        # 1. resolve a directory/file tree to search for the engine DLL
        if kind in ("iso", "zip"):
            workdir = unpack(source)
            search_root = workdir
        else:  # dir or pe — search in place
            search_root = source

        # 2. pick the engine DLL (content-based); the first candidate is taken as the best
        cands = find_engine_dlls(search_root)
        if not cands:
            raise AcquireError("no Piklib/BlooMoo engine DLL found under {0}".format(search_root))
        best = cands[0]
        dll = os.path.abspath(best.path)

        # 3. hash it
        hashes = hash_file(dll)

        # 4. run the extractor (Ghidra, or an injected stub)
        os.makedirs(out_dir, exist_ok=True)
        # NOTE(review): the file name is derived from the DLL base name only, so two sources
        # whose engine DLLs share a name write to the same snapshot file in `out_dir`.
        out_path = os.path.join(out_dir, os.path.basename(dll) + ".snapshot.json")
        extract_fn(dll, out_path)
        with open(out_path, "r", encoding="utf-8") as fh:
            snapshot = json.load(fh)

        # 5. enrich with acquisition provenance and rewrite
        _enrich(snapshot, dll=dll, hashes=hashes, candidate=best, source=source)
        with open(out_path, "w", encoding="utf-8") as fh:
            json.dump(snapshot, fh, indent=2, sort_keys=True)

        result = AcquireResult(
            source=source, dll=dll, candidate=best, hashes=hashes,
            snapshot_path=out_path, snapshot=snapshot,
            # Prefer the engine recorded in the snapshot; fall back to the identifier's guess.
            engine=snapshot.get("binary", {}).get("engine") or best.engine,
        )

        # 6. land it in the catalog ("none" leaves the result's default sink untouched)
        if sink == "db":
            result.imported_id = _import_db(snapshot, game_name)
            result.sink = "db"
        elif sink == "http":
            result.imported_id = _post_http(post_url, snapshot, game_name)
            result.sink = "http"
        return result
    finally:
        # Runs on success and on failure; only an unpacked archive has a workdir to remove.
        if workdir and not keep_workdir:
            shutil.rmtree(workdir, ignore_errors=True)
|
|
|
|
|
|
def _import_db(snapshot: dict, game_name: str | None) -> int:
    """Import `snapshot` into the catalog database and return the ``id`` of the imported row.

    The API modules are imported inside the function rather than at module load
    (presumably so the acquisition code can be imported without the API stack — confirm).
    The session is closed whether or not the import succeeds.
    """
    from ..api.db import get_session, init_db
    from ..api.service import import_snapshot

    init_db()
    session = get_session()
    try:
        return import_snapshot(session, snapshot, game_name).id
    finally:
        session.close()
|
|
|
|
|
|
def _post_http(post_url: str | None, snapshot: dict, game_name: str | None) -> int:
    """POST `snapshot` as JSON to ``<post_url>/snapshots`` and return the reply's ``id``.

    A trailing slash on `post_url` is dropped before the path is appended. When `game_name`
    is truthy it is sent as the URL-encoded ``game`` query parameter. The reply body is
    decoded as UTF-8 JSON; its ``id`` entry is returned, which is None when the reply has
    no such key. The request uses a 30-second timeout.

    Raises:
        AcquireError: `post_url` is empty or None.
    """
    import urllib.parse
    import urllib.request

    if not post_url:
        raise AcquireError("sink='http' requires post_url (e.g. http://127.0.0.1:8000)")

    endpoint = "{0}/snapshots".format(post_url.rstrip("/"))
    if game_name:
        query = urllib.parse.urlencode({"game": game_name})
        endpoint = "{0}?{1}".format(endpoint, query)

    payload = json.dumps(snapshot).encode("utf-8")
    request = urllib.request.Request(
        endpoint,
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=30) as response:
        raw = response.read()
    reply = json.loads(raw.decode("utf-8"))
    return reply.get("id")
|