Files
Aidem-Media-DLL-Analysis/ams/acquire/ghidra.py
Patryk Gensch f4aa7caaa9 Containerise: Postgres + Redis/RQ + API + Ghidra worker
Brings up the documented target architecture as a docker-compose stack — a
modular monolith with the Ghidra step split into its own async worker.

- worker/: RQ queue (lazy redis import) + run_acquisition task (Job status
  queued→started→finished/failed, drives ams.acquire with sink=db)
- Job model + JobOut schema; Snapshot.data is JSONB on Postgres
- POST/GET /jobs: stream an upload to a shared volume, enqueue, poll status
- docker/api.Dockerfile (slim) + docker/worker.Dockerfile (JDK21 + Ghidra
  fetched at build, overridable via GHIDRA_URL) + docker-compose.yml
- ghidra.py: AMS_GHIDRA_SCRIPTS override for in-container script path
- pyproject: [worker] extra (rq/redis/psycopg), python-multipart in [api]
- tests: 4 new (task success/failure + endpoint enqueue/503) -> 22/22

Verified: API image builds, container serves /health + /ui + /jobs; compose
config validates. Worker image (downloads ~1 GB Ghidra) not built here.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-05-31 12:24:47 +02:00

94 lines
3.1 KiB
Python

"""Drive Ghidra's `analyzeHeadless` to run the engine-surface extractor on a DLL.
This is the heavy worker step: it imports the binary into a throwaway Ghidra
project, auto-analyses it, then runs `ghidra_scripts/extract_engine_surface.py`
as a post-script that writes the snapshot JSON to a path we pick.
Ghidra isn't a Python package, so it must be located on disk. Resolution order:
1. $GHIDRA_HEADLESS — full path to the analyzeHeadless launcher
2. $GHIDRA_HOME/support/analyzeHeadless (or analyzeHeadless.bat)
3. `analyzeHeadless` on PATH
"""
from __future__ import annotations
import os
import shutil
import subprocess
import tempfile
import uuid
from pathlib import Path
# File name of the extractor that analyzeHeadless runs as its post-script.
_SCRIPT_NAME = "extract_engine_surface.py"
# This module lives at ams/acquire/ghidra.py, so parents[2] of its resolved
# path is the repo root, which holds the ghidra_scripts/ directory.
_REPO_ROOT = Path(__file__).resolve().parents[2]
_SCRIPT_DIR = _REPO_ROOT / "ghidra_scripts"
class GhidraNotFound(RuntimeError):
    """Raised when no analyzeHeadless launcher can be located."""
class GhidraRunError(RuntimeError):
    """Raised when analyzeHeadless times out, cannot be launched, or leaves no snapshot."""
def find_headless() -> str | None:
"""Locate the analyzeHeadless launcher, or None if Ghidra isn't configured."""
env = os.environ.get("GHIDRA_HEADLESS")
if env and os.path.isfile(env):
return env
home = os.environ.get("GHIDRA_HOME")
if home:
for name in ("analyzeHeadless", "analyzeHeadless.bat"):
cand = os.path.join(home, "support", name)
if os.path.isfile(cand):
return cand
return shutil.which("analyzeHeadless")
def run_extractor(
    dll_path: str,
    out_path: str,
    *,
    headless: str | None = None,
    script_dir: str | None = None,
    timeout: int = 1800,
) -> str:
    """Headless-analyse `dll_path` and write the snapshot to `out_path`; returns `out_path`.

    Args:
        dll_path: Binary handed to analyzeHeadless via `-import`.
        out_path: Where the post-script should write the snapshot. It is made
            absolute, its parent directory is created if needed, and any file
            already present there is removed before the run.
        headless: Launcher to execute; defaults to `find_headless()`.
        script_dir: Value for `-scriptPath`; defaults to $AMS_GHIDRA_SCRIPTS,
            then to the module's `_SCRIPT_DIR`.
        timeout: Seconds allowed for the analyzeHeadless process.

    Returns:
        The absolute form of `out_path`.

    Raises:
        GhidraNotFound: No launcher was given or could be located.
        GhidraRunError: A stale snapshot could not be cleared, the launcher
            timed out or could not be started, or no snapshot file exists at
            `out_path` after the run.
    """
    headless = headless or find_headless()
    if not headless:
        raise GhidraNotFound(
            "analyzeHeadless not found — set $GHIDRA_HEADLESS or $GHIDRA_HOME (Ghidra's install dir)")

    script_dir = script_dir or os.environ.get("AMS_GHIDRA_SCRIPTS") or str(_SCRIPT_DIR)
    out_path = os.path.abspath(out_path)
    os.makedirs(os.path.dirname(out_path) or ".", exist_ok=True)

    # Success is detected purely by the snapshot file existing afterwards, so a
    # file left over from an earlier run would make a failed analysis look
    # successful. Clear it first so only this run can produce the output.
    try:
        os.remove(out_path)
    except FileNotFoundError:
        pass
    except OSError as e:
        raise GhidraRunError(f"cannot clear stale snapshot at {out_path}: {e}") from e

    # Throwaway project location; removed in the `finally` below in addition
    # to the `-deleteProject` flag passed to the launcher.
    proj_dir = tempfile.mkdtemp(prefix="ams_ghidra_")
    proj_name = "ams_" + uuid.uuid4().hex[:8]
    cmd = [
        headless, proj_dir, proj_name,
        "-import", dll_path,
        "-scriptPath", script_dir,
        "-postScript", _SCRIPT_NAME, out_path,
        "-deleteProject",
    ]
    try:
        # stderr is folded into stdout so the failure tail below has one stream.
        proc = subprocess.run(
            cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, timeout=timeout)
    except subprocess.TimeoutExpired as e:
        raise GhidraRunError(f"analyzeHeadless timed out after {timeout}s") from e
    except OSError as e:
        raise GhidraRunError(f"failed to launch analyzeHeadless: {e}") from e
    finally:
        shutil.rmtree(proj_dir, ignore_errors=True)

    if not os.path.isfile(out_path):
        # Keep only the last 2000 characters of launcher output in the message.
        tail = proc.stdout.decode("utf-8", "replace")[-2000:] if proc.stdout else ""
        raise GhidraRunError(
            f"extractor produced no snapshot at {out_path}\n--- headless tail ---\n{tail}")
    return out_path