The .py extractor runs fine under PyGhidra in the GUI; only `analyzeHeadless` doesn't init PyGhidra. Add an env-gated CPython path so modern Ghidra works headless:

- ghidra.run_extractor_pyghidra(): runs the same GhidraScript via pyghidra.run_script (boots Ghidra in-process, imports+analyses, getScriptArgs()=[out_path]); run_extractor dispatches to it when AMS_USE_PYGHIDRA is set. No script changes needed.
- worker image installs pyghidra + sets GHIDRA_INSTALL_DIR; compose exposes AMS_USE_PYGHIDRA (default off). Jython path stays the default and untouched.
- README documents both variants (Jython <=11.3.x vs PyGhidra 11.4+/12.x).
- test: AMS_USE_PYGHIDRA routes to the PyGhidra back-end (clear error if pkg missing).

35/35 tests pass.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
172 lines
6.1 KiB
Python
172 lines
6.1 KiB
Python
"""Acquisition-pipeline tests. The heavy Ghidra step is stubbed with an injected
extract_fn, so everything here runs without Ghidra. We forge a minimal PE image
carrying the engine marker strings to exercise content-based identification."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import zipfile
|
|
|
|
import pytest
|
|
|
|
from ams.acquire import acquire
|
|
from ams.acquire.identify import find_engine_dlls, hash_file, is_pe, scan_markers
|
|
from ams.acquire.unpack import detect_kind, unpack
|
|
|
|
|
|
def _fake_pe(markers: bytes = b"") -> bytes:
    """Build a 0x200-byte buffer shaped like a minimal PE image.

    Layout: ``MZ`` at offset 0, a little-endian ``e_lfanew`` of 0x80 at
    offset 0x3C, and the ``PE\\0\\0`` signature at 0x80. When *markers* is
    non-empty it is copied in starting at offset 0x100.
    """
    pe_header_offset = 0x80
    marker_offset = 0x100

    image = bytearray(0x200)
    image[:2] = b"MZ"
    image[0x3C:0x40] = pe_header_offset.to_bytes(4, "little")
    image[pe_header_offset:pe_header_offset + 4] = b"PE\x00\x00"
    if markers:
        image[marker_offset:marker_offset + len(markers)] = markers
    return bytes(image)
|
|
|
|
|
|
def _write(path: str, data: bytes) -> str:
    """Write *data* to *path* in binary mode and return *path* for chaining."""
    with open(path, mode="wb") as sink:
        sink.write(data)
    return path
|
|
|
|
|
|
# --- identify ---------------------------------------------------------------------------------
|
|
|
|
def test_is_pe_and_markers(tmp_path):
    """Check is_pe() on forged images and scan_markers() on marked/unmarked DLLs."""
    marked_dll = _write(
        str(tmp_path / "x.dll"), _fake_pe(b"CMC_ObjectsContainer ... BlooMooDLL")
    )
    unmarked_dll = _write(str(tmp_path / "y.dll"), _fake_pe(b"nothing here"))
    text_file = _write(str(tmp_path / "z.txt"), b"not a pe")

    # Both forged images carry the MZ/PE signatures; the text file does not.
    assert is_pe(marked_dll)
    assert is_pe(unmarked_dll)
    assert not is_pe(text_file)

    factory_found, engine_name = scan_markers(marked_dll)
    assert factory_found
    assert engine_name == "BlooMoo"
    assert scan_markers(unmarked_dll) == (False, None)
|
|
|
|
|
|
def test_find_engine_dlls_picks_marked(tmp_path):
    """find_engine_dlls() must list the marker-carrying DLL first in a mixed tree."""
    nested = tmp_path / "sub"
    nested.mkdir()

    # Decoys: a non-PE file and a PE image without any engine markers.
    _write(str(tmp_path / "readme.txt"), b"hello")
    _write(str(tmp_path / "plain.dll"), _fake_pe(b"boring"))
    marked_image = _fake_pe(b"CMC_ObjectsContainer Piklib build")
    target = _write(str(nested / "PIKLIB8.dll"), marked_image)

    cands = find_engine_dlls(str(tmp_path))
    assert cands, "expected at least one candidate"

    best = cands[0]
    assert os.path.samefile(best.path, target)
    assert best.engine == "Piklib"
    assert best.score >= 100  # factory marker dominates
|
|
|
|
|
|
def test_hash_file(tmp_path):
    """hash_file() reports the SHA-256 and size of a 3-byte file; fuzzy is optional."""
    sample = _write(str(tmp_path / "a.bin"), b"abc")
    digest = hash_file(sample)

    # Well-known SHA-256 digest of the ASCII bytes "abc".
    expected_sha256 = "ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad"
    assert digest.sha256 == expected_sha256
    assert digest.size == 3
    # The fuzzy hash may be absent (None); when present it must be a string.
    assert digest.fuzzy is None or isinstance(digest.fuzzy, str)
|
|
|
|
|
|
# --- unpack -----------------------------------------------------------------------------------
|
|
|
|
def test_detect_kind(tmp_path):
    """detect_kind() classifies a zip archive, a directory and a loose PE file."""
    archive = str(tmp_path / "g.zip")
    with zipfile.ZipFile(archive, mode="w") as bundle:
        bundle.writestr("inner/PIKLIB8.dll", _fake_pe(b"CMC_ObjectsContainer"))

    assert detect_kind(archive) == "zip"
    assert detect_kind(str(tmp_path)) == "dir"

    loose_dll = _write(str(tmp_path / "p.dll"), _fake_pe())
    assert detect_kind(loose_dll) == "pe"
|
|
|
|
|
|
def test_unpack_zip_and_find(tmp_path):
    """unpack() extracts a zip so find_engine_dlls() can locate the engine DLL inside."""
    import shutil

    archive = str(tmp_path / "game.zip")
    with zipfile.ZipFile(archive, mode="w") as bundle:
        bundle.writestr(
            "data/bloomoodll.dll", _fake_pe(b"CMC_ObjectsContainer BlooMooDLL")
        )
        bundle.writestr("data/notes.txt", b"x")

    extracted = unpack(archive)
    try:
        cands = find_engine_dlls(extracted)
        assert cands
        assert cands[0].engine == "BlooMoo"
    finally:
        # Remove the directory returned by unpack() whether or not the asserts passed.
        shutil.rmtree(extracted, ignore_errors=True)
|
|
|
|
|
|
# --- full pipeline (stubbed extractor) --------------------------------------------------------
|
|
|
|
@pytest.fixture
def golden_snapshot():
    """Load ``../snapshots/PIKLIB8.dll.snapshot.json`` (relative to this file) as parsed JSON."""
    snapshot_file = os.path.join(
        os.path.dirname(__file__), "..", "snapshots", "PIKLIB8.dll.snapshot.json"
    )
    with open(snapshot_file, encoding="utf-8") as source:
        return json.load(source)
|
|
|
|
|
|
def _stub_extractor(golden):
    """Build an extract_fn stand-in that dumps *golden* as JSON to the requested path."""

    def _dump_golden(dll_path, out_path):
        # dll_path is accepted for signature compatibility only; it is never read.
        with open(out_path, mode="w", encoding="utf-8") as sink:
            json.dump(golden, sink)
        return out_path

    return _dump_golden
|
|
|
|
|
|
def test_acquire_zip_no_sink(tmp_path, golden_snapshot):
    """Run acquire() on a zip with sink="none" and a stubbed extractor.

    Verifies that the result names an engine, that the snapshot file exists,
    that hash and acquisition metadata are merged into the snapshot's
    ``binary`` block, and that the merged data is present in the file on disk
    as well as in the in-memory result.
    """
    z = str(tmp_path / "reksio.zip")
    with zipfile.ZipFile(z, "w") as zf:
        zf.writestr("PIKLIB8.dll", _fake_pe(b"CMC_ObjectsContainer Piklib"))
    out_dir = str(tmp_path / "out")

    r = acquire(z, out_dir=out_dir, sink="none", extract_fn=_stub_extractor(golden_snapshot))

    assert r.engine in ("Piklib", "BlooMoo")
    assert os.path.isfile(r.snapshot_path)
    # enrichment landed in the binary block
    b = r.snapshot["binary"]
    assert b["md5"] == r.hashes.md5
    assert b["size"] == r.hashes.size
    assert b["acquisition"]["source"] == "reksio.zip"
    assert b["acquisition"]["dll_name"] == "PIKLIB8.dll"
    # and was persisted to disk, not just the in-memory dict;
    # read through a context manager so the handle is closed deterministically
    with open(r.snapshot_path, encoding="utf-8") as fh:
        on_disk = json.load(fh)
    assert on_disk["binary"]["acquisition"]["dll_name"] == "PIKLIB8.dll"
    assert r.imported_id is None and r.sink == "none"
|
|
|
|
|
|
def test_pyghidra_dispatch_without_dep(tmp_path, monkeypatch):
    """With AMS_USE_PYGHIDRA set and pyghidra absent, run_extractor raises GhidraNotFound."""
    import importlib.util

    from ams.acquire import ghidra

    pyghidra_available = importlib.util.find_spec("pyghidra") is not None
    if pyghidra_available:
        pytest.skip("pyghidra is installed; this exercises the missing-dependency path")

    monkeypatch.setenv("AMS_USE_PYGHIDRA", "1")
    dll_path = str(tmp_path / "x.dll")
    out_path = str(tmp_path / "out.json")
    # The error text must mention the missing package by name.
    with pytest.raises(ghidra.GhidraNotFound, match="pyghidra"):
        ghidra.run_extractor(dll_path, out_path)
|
|
|
|
|
|
def test_acquire_loose_dll_into_db(tmp_path, golden_snapshot):
    """Run acquire() on a loose DLL with sink="db" and check the stored rows.

    The database is a SQLite file under ``tmp_path``. After acquisition the
    result must carry an integer ``imported_id`` that resolves to a Snapshot
    row with a binary name, linked to a game named "Reksio i UFO".
    """
    from ams.api.db import configure

    # Point the DB layer at a per-test SQLite file before anything is imported into it.
    configure("sqlite:///" + str(tmp_path / "cat.db"))
    dll = _write(str(tmp_path / "PIKLIB8.dll"), _fake_pe(b"CMC_ObjectsContainer Piklib"))
    out_dir = str(tmp_path / "out")

    r = acquire(dll, "Reksio i UFO", out_dir=out_dir, sink="db",
                extract_fn=_stub_extractor(golden_snapshot))

    assert r.sink == "db" and isinstance(r.imported_id, int)

    from ams.api.db import get_session
    from ams.api.models import Snapshot

    db = get_session()
    try:
        snap = db.get(Snapshot, r.imported_id)
        assert snap is not None and snap.binary_name
        assert snap.game is not None and snap.game.name == "Reksio i UFO"
    finally:
        # Always release the session, even when an assertion above fails.
        db.close()
|