Add ISO/ZIP acquisition pipeline (ams.acquire worker)
Closes the chain from a game file to a catalog entry: unpack an ISO/ZIP, content-identify the engine DLL (CMC_ObjectsContainer marker in RTTI, so a renamed file is still found), hash it (sha256 + md5 + optional ssdeep via ppdeep), run Ghidra headless with the extractor, enrich and import the snapshot.

- unpack.py: bsdtar (ISO9660 + ZIP) with a pure-Python zipfile fallback
- identify.py: content-based engine-DLL picker + hashing
- ghidra.py: analyzeHeadless launcher discovery + post-script run
- pipeline.py: orchestration with injectable extract_fn; sink db|http|none
- cli.py: python -m ams.acquire (incl. --identify-only dry run)
- tests: 7 new (forged PE markers + stubbed extractor) -> 18/18

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
93
ams/acquire/ghidra.py
Normal file
93
ams/acquire/ghidra.py
Normal file
@@ -0,0 +1,93 @@
|
||||
"""Drive Ghidra's `analyzeHeadless` to run the engine-surface extractor on a DLL.
|
||||
|
||||
This is the heavy worker step: it imports the binary into a throwaway Ghidra
|
||||
project, auto-analyses it, then runs `ghidra_scripts/extract_engine_surface.py`
|
||||
as a post-script that writes the snapshot JSON to a path we pick.
|
||||
|
||||
Ghidra isn't a Python package, so it must be located on disk. Resolution order:
|
||||
1. $GHIDRA_HEADLESS — full path to the analyzeHeadless launcher (skipped unless it names an existing file)
|
||||
2. $GHIDRA_HOME/support/analyzeHeadless (or analyzeHeadless.bat)
|
||||
3. `analyzeHeadless` on PATH
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
import tempfile
|
||||
import uuid
|
||||
from pathlib import Path
|
||||
|
||||
# Basename of the Ghidra post-script that performs the extraction.
_SCRIPT_NAME = "extract_engine_surface.py"
# This module sits at <repo>/ams/acquire/ghidra.py, so parents[2] of the resolved
# path is the repository root; the post-scripts live in its ghidra_scripts/ folder.
_SCRIPT_DIR = Path(__file__).resolve().parents[2].joinpath("ghidra_scripts")
|
||||
|
||||
|
||||
class GhidraNotFound(RuntimeError):
    """Raised when no analyzeHeadless launcher was supplied or could be located."""
|
||||
|
||||
|
||||
class GhidraRunError(RuntimeError):
    """Raised when the analyzeHeadless run fails to launch, times out, or yields no snapshot."""
|
||||
|
||||
|
||||
def find_headless() -> str | None:
    """Return the path of Ghidra's analyzeHeadless launcher, or None if none is found.

    Candidates are checked in order and the first hit wins:

    1. ``$GHIDRA_HEADLESS``, when it names an existing file;
    2. ``analyzeHeadless``, then ``analyzeHeadless.bat``, under ``$GHIDRA_HOME/support``;
    3. ``analyzeHeadless`` as resolved on ``PATH`` by ``shutil.which``.
    """
    explicit = os.environ.get("GHIDRA_HEADLESS")
    if explicit and os.path.isfile(explicit):
        return explicit

    install_root = os.environ.get("GHIDRA_HOME")
    if install_root:
        support_dir = os.path.join(install_root, "support")
        launchers = (
            os.path.join(support_dir, launcher)
            for launcher in ("analyzeHeadless", "analyzeHeadless.bat")
        )
        # First launcher that exists on disk, keeping the plain name ahead of .bat.
        found = next((path for path in launchers if os.path.isfile(path)), None)
        if found is not None:
            return found

    return shutil.which("analyzeHeadless")
|
||||
|
||||
|
||||
def run_extractor(
    dll_path: str,
    out_path: str,
    *,
    headless: str | None = None,
    script_dir: str | None = None,
    timeout: int = 1800,
) -> str:
    """Headless-analyse `dll_path` and write the snapshot to `out_path`; returns `out_path`.

    Args:
        dll_path: binary handed to analyzeHeadless via ``-import``.
        out_path: where the post-script is told to write the snapshot; it is made
            absolute and its parent directory is created if missing. Any file
            already at this path is removed before the run.
        headless: launcher to execute; defaults to whatever `find_headless()` locates.
        script_dir: directory passed as ``-scriptPath``; defaults to the module-level
            ``_SCRIPT_DIR``.
        timeout: seconds to wait for analyzeHeadless before giving up.

    Returns:
        The absolute form of `out_path`.

    Raises:
        GhidraNotFound: no launcher was given and none could be located.
        GhidraRunError: a stale snapshot could not be cleared, the launcher could not
            be started or timed out, or no snapshot file exists after the run.
    """
    headless = headless or find_headless()
    if not headless:
        raise GhidraNotFound(
            "analyzeHeadless not found — set $GHIDRA_HEADLESS or $GHIDRA_HOME (Ghidra's install dir)")

    script_dir = script_dir or str(_SCRIPT_DIR)
    out_path = os.path.abspath(out_path)
    os.makedirs(os.path.dirname(out_path) or ".", exist_ok=True)

    # Success is judged solely by the snapshot file existing afterwards, so a file
    # left over from an earlier run must not be allowed to mask a failed run.
    if os.path.isfile(out_path):
        try:
            os.remove(out_path)
        except OSError as e:
            raise GhidraRunError(
                "cannot remove stale snapshot {0}: {1}".format(out_path, e)) from e

    # Throwaway project: a fresh directory and a random name per invocation.
    proj_dir = tempfile.mkdtemp(prefix="ams_ghidra_")
    proj_name = "ams_" + uuid.uuid4().hex[:8]
    cmd = [
        headless, proj_dir, proj_name,
        "-import", dll_path,
        "-scriptPath", script_dir,
        "-postScript", _SCRIPT_NAME, out_path,
        "-deleteProject",
    ]
    try:
        # stderr is folded into stdout so the failure tail below shows both streams.
        proc = subprocess.run(
            cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, timeout=timeout)
    except subprocess.TimeoutExpired as e:
        raise GhidraRunError("analyzeHeadless timed out after {0}s".format(timeout)) from e
    except OSError as e:
        raise GhidraRunError("failed to launch analyzeHeadless: {0}".format(e)) from e
    finally:
        # Always drop the project directory, whatever -deleteProject managed to do.
        shutil.rmtree(proj_dir, ignore_errors=True)

    if not os.path.isfile(out_path):
        # Only the last 2000 characters of output are kept, to bound the message size.
        tail = proc.stdout.decode("utf-8", "replace")[-2000:] if proc.stdout else ""
        raise GhidraRunError(
            "extractor produced no snapshot at {0}\n--- headless tail ---\n{1}".format(out_path, tail))
    return out_path
|
||||
Reference in New Issue
Block a user