"""Acquisition-pipeline tests. The heavy Ghidra step is stubbed with an injected extract_fn, so everything here runs without Ghidra. We forge a minimal PE image carrying the engine marker strings to exercise content-based identification.""" from __future__ import annotations import json import os import zipfile import pytest from ams.acquire import acquire from ams.acquire.identify import find_engine_dlls, hash_file, is_pe, scan_markers from ams.acquire.unpack import detect_kind, unpack def _fake_pe(markers: bytes = b"") -> bytes: """Smallest bytes that pass is_pe(): MZ stub, e_lfanew→0x80, 'PE\\0\\0' there.""" buf = bytearray(0x200) buf[0:2] = b"MZ" buf[0x3C:0x40] = (0x80).to_bytes(4, "little") buf[0x80:0x84] = b"PE\x00\x00" if markers: buf[0x100:0x100 + len(markers)] = markers return bytes(buf) def _write(path: str, data: bytes) -> str: with open(path, "wb") as fh: fh.write(data) return path # --- identify --------------------------------------------------------------------------------- def test_is_pe_and_markers(tmp_path): engine = _write(str(tmp_path / "x.dll"), _fake_pe(b"CMC_ObjectsContainer ... 
BlooMooDLL")) plain = _write(str(tmp_path / "y.dll"), _fake_pe(b"nothing here")) text = _write(str(tmp_path / "z.txt"), b"not a pe") assert is_pe(engine) and is_pe(plain) assert not is_pe(text) has_factory, eng = scan_markers(engine) assert has_factory and eng == "BlooMoo" assert scan_markers(plain) == (False, None) def test_find_engine_dlls_picks_marked(tmp_path): (tmp_path / "sub").mkdir() _write(str(tmp_path / "readme.txt"), b"hello") _write(str(tmp_path / "plain.dll"), _fake_pe(b"boring")) target = _write(str(tmp_path / "sub" / "PIKLIB8.dll"), _fake_pe(b"CMC_ObjectsContainer Piklib build")) cands = find_engine_dlls(str(tmp_path)) assert cands, "expected at least one candidate" assert os.path.samefile(cands[0].path, target) assert cands[0].engine == "Piklib" assert cands[0].score >= 100 # factory marker dominates def test_hash_file(tmp_path): p = _write(str(tmp_path / "a.bin"), b"abc") h = hash_file(p) # sha256("abc") assert h.sha256 == "ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad" assert h.size == 3 assert h.fuzzy is None or isinstance(h.fuzzy, str) # --- unpack ----------------------------------------------------------------------------------- def test_detect_kind(tmp_path): z = str(tmp_path / "g.zip") with zipfile.ZipFile(z, "w") as zf: zf.writestr("inner/PIKLIB8.dll", _fake_pe(b"CMC_ObjectsContainer")) assert detect_kind(z) == "zip" assert detect_kind(str(tmp_path)) == "dir" pe = _write(str(tmp_path / "p.dll"), _fake_pe()) assert detect_kind(pe) == "pe" def test_unpack_zip_and_find(tmp_path): z = str(tmp_path / "game.zip") with zipfile.ZipFile(z, "w") as zf: zf.writestr("data/bloomoodll.dll", _fake_pe(b"CMC_ObjectsContainer BlooMooDLL")) zf.writestr("data/notes.txt", b"x") dest = unpack(z) try: cands = find_engine_dlls(dest) assert cands and cands[0].engine == "BlooMoo" finally: import shutil shutil.rmtree(dest, ignore_errors=True) # --- full pipeline (stubbed extractor) -------------------------------------------------------- 
@pytest.fixture
def golden_snapshot():
    """Load the golden PIKLIB8.dll snapshot JSON from ../snapshots next to this file."""
    here = os.path.dirname(__file__)
    path = os.path.join(here, "..", "snapshots", "PIKLIB8.dll.snapshot.json")
    with open(path, "r", encoding="utf-8") as fh:
        return json.load(fh)


def _stub_extractor(golden):
    """Return an extract_fn that writes the golden snapshot to out_path.

    The returned callable takes ``(dll_path, out_path)``; ``dll_path`` is ignored,
    *golden* is serialized as JSON to ``out_path``, and ``out_path`` is returned.
    """
    def _fn(dll_path, out_path):
        with open(out_path, "w", encoding="utf-8") as fh:
            json.dump(golden, fh)
        return out_path

    return _fn


def test_acquire_zip_no_sink(tmp_path, golden_snapshot):
    """acquire() on a zip with sink="none" enriches and persists the snapshot only."""
    z = str(tmp_path / "reksio.zip")
    with zipfile.ZipFile(z, "w") as zf:
        zf.writestr("PIKLIB8.dll", _fake_pe(b"CMC_ObjectsContainer Piklib"))
    out_dir = str(tmp_path / "out")

    r = acquire(z, out_dir=out_dir, sink="none", extract_fn=_stub_extractor(golden_snapshot))

    assert r.engine in ("Piklib", "BlooMoo")
    assert os.path.isfile(r.snapshot_path)

    # enrichment landed in the binary block
    b = r.snapshot["binary"]
    assert b["md5"] == r.hashes.md5
    assert b["size"] == r.hashes.size
    assert b["acquisition"]["source"] == "reksio.zip"
    assert b["acquisition"]["dll_name"] == "PIKLIB8.dll"

    # and was persisted to disk, not just the in-memory dict
    # (read through a context manager so the handle is closed deterministically)
    with open(r.snapshot_path, encoding="utf-8") as fh:
        on_disk = json.load(fh)
    assert on_disk["binary"]["acquisition"]["dll_name"] == "PIKLIB8.dll"

    assert r.imported_id is None and r.sink == "none"


def test_acquire_loose_dll_into_db(tmp_path, golden_snapshot):
    """acquire() on a loose DLL with sink="db" stores a Snapshot linked to the game."""
    # Imported inside the test so the DB layer is only loaded when this test runs.
    from ams.api.db import configure

    configure("sqlite:///" + str(tmp_path / "cat.db"))
    dll = _write(str(tmp_path / "PIKLIB8.dll"), _fake_pe(b"CMC_ObjectsContainer Piklib"))
    out_dir = str(tmp_path / "out")

    r = acquire(dll, "Reksio i UFO", out_dir=out_dir, sink="db",
                extract_fn=_stub_extractor(golden_snapshot))

    assert r.sink == "db" and isinstance(r.imported_id, int)

    from ams.api.db import get_session
    from ams.api.models import Snapshot

    db = get_session()
    try:
        snap = db.get(Snapshot, r.imported_id)
        assert snap is not None and snap.binary_name
        assert snap.game is not None and snap.game.name == "Reksio i UFO"
    finally:
        db.close()