From 6797ad5ddb76794a1962822d9e2ebce1eb5f247a Mon Sep 17 00:00:00 2001 From: Patryk Gensch <43010113+patryk025@users.noreply.github.com> Date: Sun, 31 May 2026 12:11:56 +0200 Subject: [PATCH] Add ISO/ZIP acquisition pipeline (ams.acquire worker) Closes the chain from a game file to a catalog entry: unpack an ISO/ZIP, content-identify the engine DLL (CMC_ObjectsContainer marker in RTTI, so a renamed file is still found), hash it (sha256 + md5 + optional ssdeep via ppdeep), run Ghidra headless with the extractor, enrich and import the snapshot. - unpack.py: bsdtar (ISO9660 + ZIP) with a pure-Python zipfile fallback - identify.py: content-based engine-DLL picker + hashing - ghidra.py: analyzeHeadless launcher discovery + post-script run - pipeline.py: orchestration with injectable extract_fn; sink db|http|none - cli.py: python -m ams.acquire (incl. --identify-only dry run) - tests: 7 new (forged PE markers + stubbed extractor) -> 18/18 Co-Authored-By: Claude Opus 4.8 --- README.md | 22 ++++++ ams/acquire/__init__.py | 7 ++ ams/acquire/__main__.py | 6 ++ ams/acquire/cli.py | 87 +++++++++++++++++++++ ams/acquire/ghidra.py | 93 ++++++++++++++++++++++ ams/acquire/identify.py | 139 +++++++++++++++++++++++++++++++++ ams/acquire/pipeline.py | 169 ++++++++++++++++++++++++++++++++++++++++ ams/acquire/unpack.py | 89 +++++++++++++++++++++ pyproject.toml | 3 + tests/test_acquire.py | 159 +++++++++++++++++++++++++++++++++++++ 10 files changed, 774 insertions(+) create mode 100644 ams/acquire/__init__.py create mode 100644 ams/acquire/__main__.py create mode 100644 ams/acquire/cli.py create mode 100644 ams/acquire/ghidra.py create mode 100644 ams/acquire/identify.py create mode 100644 ams/acquire/pipeline.py create mode 100644 ams/acquire/unpack.py create mode 100644 tests/test_acquire.py diff --git a/README.md b/README.md index 021bfef..5afcc51 100644 --- a/README.md +++ b/README.md @@ -48,6 +48,28 @@ analyzeHeadless -process PIKLIB8.dll \ -postScript extract_engine_surface.py 
"$(pwd)/snapshots/PIKLIB8.snapshot.json" ``` +## Akwizycja — ISO/ZIP → katalog + +Worker, który domyka łańcuch od *pliku gry* do wpisu w katalogu: rozpakowuje archiwum, +**sam znajduje DLL silnika** (po markerach w binarce — `CMC_ObjectsContainer` w RTTI — +więc działa nawet po zmianie nazwy pliku), liczy hashe (sha256 + md5 + opcjonalnie ssdeep), +odpala Ghidrę headless z ekstraktorem i ląduje snapshotem w bazie. + +```bash +pip install -e ".[api,acquire]" # acquire = ppdeep (fuzzy hash, opcjonalny) +export GHIDRA_HEADLESS=/path/to/ghidra/support/analyzeHeadless # albo GHIDRA_HOME + +python -m ams.acquire game.iso --game "Reksio i UFO" # ISO/ZIP/katalog/luźny DLL +python -m ams.acquire dump_dir --game "Reksio i UFO" --sink http --post http://127.0.0.1:8000 +python -m ams.acquire PIKLIB8.dll --identify-only # tylko unpack+identify+hash, bez Ghidry +``` + +`--sink db` (domyślnie) importuje wprost do bazy, `--sink http` POST-uje na `/snapshots`, +`--sink none` zostawia sam snapshot. `--identify-only` to suchy bieg do walidacji bez Ghidry. +Rozpakowywanie stoi na `bsdtar` (libarchive — czyta i ISO9660, i ZIP); ZIP ma fallback na +czysty Python. Snapshot dostaje doklejony blok `binary.acquisition` (źródło, nazwa DLL) oraz +`binary.fuzzy/md5/size`. 
+ ## Diff engine (CLI) ```bash diff --git a/ams/acquire/__init__.py b/ams/acquire/__init__.py new file mode 100644 index 0000000..ba298fe --- /dev/null +++ b/ams/acquire/__init__.py @@ -0,0 +1,7 @@ +"""Acquisition pipeline: game archive (ISO/ZIP) or loose DLL → engine snapshot → catalog.""" + +from __future__ import annotations + +from .pipeline import AcquireError, AcquireResult, acquire + +__all__ = ["acquire", "AcquireResult", "AcquireError"] diff --git a/ams/acquire/__main__.py b/ams/acquire/__main__.py new file mode 100644 index 0000000..dbdd066 --- /dev/null +++ b/ams/acquire/__main__.py @@ -0,0 +1,6 @@ +import sys + +from .cli import main + +if __name__ == "__main__": + sys.exit(main()) diff --git a/ams/acquire/cli.py b/ams/acquire/cli.py new file mode 100644 index 0000000..a30837c --- /dev/null +++ b/ams/acquire/cli.py @@ -0,0 +1,87 @@ +"""Acquire a game from an ISO/ZIP (or loose DLL) into the catalog. + + python -m ams.acquire game.iso --game "Reksio i UFO" + python -m ams.acquire dump_dir --game "Reksio i UFO" --sink http --post http://127.0.0.1:8000 + python -m ams.acquire PIKLIB8.dll --identify-only # identify+hash only, skip the heavy pass +""" + +from __future__ import annotations + +import argparse +import sys + +from . import acquire +from .ghidra import GhidraNotFound, find_headless +from .identify import find_engine_dlls, hash_file +from .pipeline import AcquireError +from .unpack import detect_kind, unpack + + +def _cmd_identify(args) -> int: + """Dry run: unpack (if needed), list engine-DLL candidates, hash the best one.""" + kind = detect_kind(args.source) + root = unpack(args.source) if kind in ("iso", "zip") else args.source + cands = find_engine_dlls(root) + if not cands: + print("[!] 
no engine DLL found under {0}".format(root)) + return 1 + for i, c in enumerate(cands): + mark = "*" if i == 0 else " " + print("{0} score={1:3d} engine={2:<8} {3} ({4})".format( + mark, c.score, c.engine or "?", c.path, c.reason)) + h = hash_file(cands[0].path) + print("\nbest: {0}\n sha256 {1}\n md5 {2}\n size {3}\n fuzzy {4}".format( + cands[0].path, h.sha256, h.md5, h.size, h.fuzzy or "(install ppdeep for fuzzy hashing)")) + return 0 + + +def main(argv: list[str] | None = None) -> int: + p = argparse.ArgumentParser(prog="ams-acquire", description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter) + p.add_argument("source", help="ISO/ZIP archive, an unpacked directory, or a loose engine DLL") + p.add_argument("--game", help="link the snapshot to this game (created if missing)") + p.add_argument("--out-dir", help="where to write the snapshot.json (default: repo snapshots/)") + p.add_argument("--sink", choices=("db", "http", "none"), default="db", + help="where the snapshot lands (default: db)") + p.add_argument("--post", dest="post_url", help="base URL for --sink http (e.g. http://127.0.0.1:8000)") + p.add_argument("--keep", action="store_true", help="keep the temp unpack dir") + p.add_argument("--identify-only", action="store_true", + help="just unpack+identify+hash; don't run Ghidra or import") + args = p.parse_args(argv) + + if args.identify_only: + try: + return _cmd_identify(args) + except (AcquireError, OSError) as e: + print("[!] {0}".format(e)) + return 1 + + if args.sink != "none" and find_headless() is None: + print("[i] Ghidra headless not configured (set $GHIDRA_HEADLESS or $GHIDRA_HOME).") + print(" Run with --identify-only to test unpack/identify/hash without Ghidra.") + + try: + r = acquire(args.source, args.game, out_dir=args.out_dir, + sink=args.sink, post_url=args.post_url, keep_workdir=args.keep) + except GhidraNotFound as e: + print("[!] {0}".format(e)) + return 2 + except (AcquireError, OSError) as e: + print("[!] 
{0}".format(e)) + return 1 + + print("[+] DLL: {0} (engine={1}, {2})".format(r.dll, r.engine, r.candidate.reason)) + print(" sha256 {0} size {1} fuzzy {2}".format( + r.hashes.sha256, r.hashes.size, r.hashes.fuzzy or "-")) + print(" snapshot: {0}".format(r.snapshot_path)) + b = r.snapshot.get("binary", {}) + print(" types={0} methods={1} events={2} fields={3}".format( + len(r.snapshot.get("types", [])), len(r.snapshot.get("methods", [])), + len(r.snapshot.get("events", [])), len(r.snapshot.get("fields", [])))) + if r.sink != "none": + print(" → catalog ({0}) id={1} game={2}".format(r.sink, r.imported_id, args.game or "-")) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/ams/acquire/ghidra.py b/ams/acquire/ghidra.py new file mode 100644 index 0000000..acf6d49 --- /dev/null +++ b/ams/acquire/ghidra.py @@ -0,0 +1,93 @@ +"""Drive Ghidra's `analyzeHeadless` to run the engine-surface extractor on a DLL. + +This is the heavy worker step: it imports the binary into a throwaway Ghidra +project, auto-analyses it, then runs `ghidra_scripts/extract_engine_surface.py` +as a post-script that writes the snapshot JSON to a path we pick. + +Ghidra isn't a Python package, so it must be located on disk. Resolution order: + 1. $GHIDRA_HEADLESS — full path to the analyzeHeadless launcher + 2. $GHIDRA_HOME/support/analyzeHeadless + 3. 
`analyzeHeadless` on PATH +""" + +from __future__ import annotations + +import os +import shutil +import subprocess +import tempfile +import uuid +from pathlib import Path + +_SCRIPT_NAME = "extract_engine_surface.py" +# ams/acquire/ghidra.py -> repo root is two parents up +_SCRIPT_DIR = Path(__file__).resolve().parents[2] / "ghidra_scripts" + + +class GhidraNotFound(RuntimeError): + pass + + +class GhidraRunError(RuntimeError): + pass + + +def find_headless() -> str | None: + """Locate the analyzeHeadless launcher, or None if Ghidra isn't configured.""" + env = os.environ.get("GHIDRA_HEADLESS") + if env and os.path.isfile(env): + return env + home = os.environ.get("GHIDRA_HOME") + if home: + for name in ("analyzeHeadless", "analyzeHeadless.bat"): + cand = os.path.join(home, "support", name) + if os.path.isfile(cand): + return cand + return shutil.which("analyzeHeadless") + + +def run_extractor( + dll_path: str, + out_path: str, + *, + headless: str | None = None, + script_dir: str | None = None, + timeout: int = 1800, +) -> str: + """Headless-analyse `dll_path` and write the snapshot to `out_path`; returns `out_path`. 
+ + Raises GhidraNotFound if no launcher is configured, GhidraRunError on failure or if + the script produced no output.""" + headless = headless or find_headless() + if not headless: + raise GhidraNotFound( + "analyzeHeadless not found — set $GHIDRA_HEADLESS or $GHIDRA_HOME (Ghidra's install dir)") + + script_dir = script_dir or str(_SCRIPT_DIR) + out_path = os.path.abspath(out_path) + os.makedirs(os.path.dirname(out_path) or ".", exist_ok=True) + + proj_dir = tempfile.mkdtemp(prefix="ams_ghidra_") + proj_name = "ams_" + uuid.uuid4().hex[:8] + cmd = [ + headless, proj_dir, proj_name, + "-import", dll_path, + "-scriptPath", script_dir, + "-postScript", _SCRIPT_NAME, out_path, + "-deleteProject", + ] + try: + proc = subprocess.run( + cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, timeout=timeout) + except subprocess.TimeoutExpired: + raise GhidraRunError("analyzeHeadless timed out after {0}s".format(timeout)) + except OSError as e: + raise GhidraRunError("failed to launch analyzeHeadless: {0}".format(e)) + finally: + shutil.rmtree(proj_dir, ignore_errors=True) + + if not os.path.isfile(out_path): + tail = proc.stdout.decode("utf-8", "replace")[-2000:] if proc.stdout else "" + raise GhidraRunError( + "extractor produced no snapshot at {0}\n--- headless tail ---\n{1}".format(out_path, tail)) + return out_path diff --git a/ams/acquire/identify.py b/ams/acquire/identify.py new file mode 100644 index 0000000..042a7e3 --- /dev/null +++ b/ams/acquire/identify.py @@ -0,0 +1,139 @@ +"""Find the engine DLL inside an unpacked game tree and hash it. + +Identification is *content-based* first — we scan the file for marker strings that +only a Piklib/BlooMoo engine carries (the factory class name shows up inside the +MSVC RTTI/mangled symbols) — and fall back to filename hints. So a renamed DLL is +still picked correctly, before the expensive Ghidra pass ever runs. 
+""" + +from __future__ import annotations + +import hashlib +import os +from dataclasses import dataclass + +# ASCII substrings that survive inside the PE: the factory class name is the +# decisive "this is an engine DLL" signal (it appears within mangled RTTI names), +# the namespace/product tags only disambiguate which engine. +_MARK_FACTORY = b"CMC_ObjectsContainer" +_MARK_BLOOMOO = b"BlooMoo" +_MARK_PIKLIB = b"Piklib" + +_FILENAME_HINTS = ("piklib", "bloomoo") +_PE_EXT = (".dll", ".exe") + +_SCAN_LIMIT = 64 * 1024 * 1024 # plenty for these engine DLLs, bounds pathological files + + +@dataclass +class Candidate: + """A scored engine-DLL candidate. Higher score = more certain.""" + + path: str + score: int + engine: str | None # "BlooMoo" | "Piklib" | None (factory present, product unclear) + reason: str + + +@dataclass +class FileHashes: + sha256: str + md5: str + size: int + fuzzy: str | None # ssdeep-style, when ppdeep/ssdeep is installed; else None + + +def is_pe(path: str) -> bool: + """True if the file is a Windows PE image (MZ stub + PE\\0\\0 header).""" + try: + with open(path, "rb") as fh: + if fh.read(2) != b"MZ": + return False + fh.seek(0x3C) + off = fh.read(4) + if len(off) < 4: + return False + fh.seek(int.from_bytes(off, "little")) + return fh.read(4) == b"PE\x00\x00" + except OSError: + return False + + +def scan_markers(path: str, limit: int = _SCAN_LIMIT) -> tuple[bool, str | None]: + """Scan up to `limit` bytes for engine markers → (has_factory, engine_guess).""" + try: + with open(path, "rb") as fh: + blob = fh.read(limit) + except OSError: + return (False, None) + engine = "BlooMoo" if _MARK_BLOOMOO in blob else ("Piklib" if _MARK_PIKLIB in blob else None) + return (_MARK_FACTORY in blob, engine) + + +def _score(path: str) -> Candidate | None: + """Score a single file as an engine DLL, or None if it isn't a PE image.""" + if not is_pe(path): + return None + has_factory, engine = scan_markers(path) + score, reasons = 0, [] + if has_factory: + 
score += 100 + reasons.append("factory-marker") + name = os.path.basename(path).lower() + if any(h in name for h in _FILENAME_HINTS): + score += 10 + reasons.append("filename-hint") + if engine: + score += 5 + reasons.append("engine=" + engine) + if score == 0: + return None # a PE with no engine signal at all — not a candidate + return Candidate(path=path, score=score, engine=engine, reason=",".join(reasons)) + + +def find_engine_dlls(root: str) -> list[Candidate]: + """Walk `root` and return engine-DLL candidates, strongest first. + + A single file path is accepted directly. Only PE files with a `.dll`/`.exe` + extension are considered, but the actual decision is content-based.""" + targets: list[str] = [] + if os.path.isfile(root): + targets = [root] + else: + for dirpath, _dirs, files in os.walk(root): + for fn in files: + if fn.lower().endswith(_PE_EXT): + targets.append(os.path.join(dirpath, fn)) + + out = [c for c in (_score(p) for p in targets) if c is not None] + out.sort(key=lambda c: c.score, reverse=True) + return out + + +def fuzzy_hash(path: str) -> str | None: + """Context-triggered piecewise hash (ssdeep format) for near-duplicate detection. 
+ + Uses ppdeep (pure-Python) or ssdeep if importable; returns None otherwise, so the + pipeline never hard-depends on it.""" + for modname in ("ppdeep", "ssdeep"): + try: + mod = __import__(modname) + except ImportError: + continue + try: + return mod.hash_from_file(path) + except Exception: + return None + return None + + +def hash_file(path: str) -> FileHashes: + """sha256 + md5 + size (+ fuzzy when available), streamed so big DLLs don't load whole.""" + sha, md5 = hashlib.sha256(), hashlib.md5() + size = 0 + with open(path, "rb") as fh: + for chunk in iter(lambda: fh.read(1024 * 1024), b""): + sha.update(chunk) + md5.update(chunk) + size += len(chunk) + return FileHashes(sha256=sha.hexdigest(), md5=md5.hexdigest(), size=size, fuzzy=fuzzy_hash(path)) diff --git a/ams/acquire/pipeline.py b/ams/acquire/pipeline.py new file mode 100644 index 0000000..d7d46ea --- /dev/null +++ b/ams/acquire/pipeline.py @@ -0,0 +1,169 @@ +"""End-to-end acquisition: a game archive (or loose DLL) → snapshot → catalog. + + archive.iso ─▶ unpack ─▶ pick engine DLL ─▶ hash ─▶ Ghidra headless + │ + snapshot.json ◀────────┘ + │ enrich (fuzzy/md5/acquisition) + ▼ + import to DB *or* POST /snapshots + +Everything heavy is injectable: pass your own `extract_fn(dll, out)->path` to test the +wiring without Ghidra. The default uses `ghidra.run_extractor`. +""" + +from __future__ import annotations + +import json +import os +import shutil +import tempfile +from dataclasses import dataclass, field +from typing import Callable + +from . 
import ghidra +from .identify import Candidate, FileHashes, find_engine_dlls, hash_file +from .unpack import detect_kind, unpack + +ExtractFn = Callable[[str, str], str] + + +class AcquireError(RuntimeError): + pass + + +@dataclass +class AcquireResult: + source: str + dll: str # absolute path to the engine DLL that was analysed + candidate: Candidate + hashes: FileHashes + snapshot_path: str + snapshot: dict + engine: str | None + imported_id: int | None = None + sink: str = "none" # "db" | "http" | "none" + extras: dict = field(default_factory=dict) + + +def _default_out_dir() -> str: + # repo snapshots/ (…/ams/acquire/pipeline.py -> repo root two levels up) + return str((__import__("pathlib").Path(__file__).resolve().parents[2] / "snapshots")) + + +def _enrich(snapshot: dict, *, dll: str, hashes: FileHashes, candidate: Candidate, source: str) -> None: + """Stamp acquisition provenance into the snapshot's binary block (in place).""" + binary = snapshot.setdefault("binary", {}) + # The extractor reads sha256 from the live program; only backfill ours when it is missing (an existing value is kept, not compared). + binary.setdefault("sha256", hashes.sha256) + binary["md5"] = hashes.md5 + binary["size"] = hashes.size + if hashes.fuzzy: + binary["fuzzy"] = hashes.fuzzy + binary["acquisition"] = { + "source": os.path.basename(source), + "dll_name": os.path.basename(dll), + "identify_engine": candidate.engine, + "identify_reason": candidate.reason, + } + + +def acquire( + source: str, + game_name: str | None = None, + *, + out_dir: str | None = None, + extract_fn: ExtractFn | None = None, + sink: str = "db", + post_url: str | None = None, + keep_workdir: bool = False, +) -> AcquireResult: + """Run the full pipeline on `source` (an ISO/ZIP, a directory, or a loose DLL). + + sink: "db" imports straight into the catalog DB, "http" POSTs to `post_url`/snapshots, + "none" just produces the snapshot file. 
`extract_fn` overrides the Ghidra step.""" + source = os.path.abspath(source) + if not os.path.exists(source): + raise AcquireError("source does not exist: {0}".format(source)) + + extract_fn = extract_fn or ghidra.run_extractor + out_dir = out_dir or _default_out_dir() + + kind = detect_kind(source) + workdir: str | None = None + try: + # 1. resolve a directory/file tree to search for the engine DLL + if kind in ("iso", "zip"): + workdir = unpack(source) + search_root = workdir + else: # dir or pe — search in place + search_root = source + + # 2. pick the engine DLL (content-based) + cands = find_engine_dlls(search_root) + if not cands: + raise AcquireError("no Piklib/BlooMoo engine DLL found under {0}".format(search_root)) + best = cands[0] + dll = os.path.abspath(best.path) + + # 3. hash it + hashes = hash_file(dll) + + # 4. run the extractor (Ghidra, or an injected stub) + os.makedirs(out_dir, exist_ok=True) + out_path = os.path.join(out_dir, os.path.basename(dll) + ".snapshot.json") + extract_fn(dll, out_path) + with open(out_path, "r", encoding="utf-8") as fh: + snapshot = json.load(fh) + + # 5. enrich with acquisition provenance and rewrite + _enrich(snapshot, dll=dll, hashes=hashes, candidate=best, source=source) + with open(out_path, "w", encoding="utf-8") as fh: + json.dump(snapshot, fh, indent=2, sort_keys=True) + + result = AcquireResult( + source=source, dll=dll, candidate=best, hashes=hashes, + snapshot_path=out_path, snapshot=snapshot, + engine=snapshot.get("binary", {}).get("engine") or best.engine, + ) + + # 6. 
land it in the catalog + if sink == "db": + result.imported_id = _import_db(snapshot, game_name) + result.sink = "db" + elif sink == "http": + result.imported_id = _post_http(post_url, snapshot, game_name) + result.sink = "http" + return result + finally: + if workdir and not keep_workdir: + shutil.rmtree(workdir, ignore_errors=True) + + +def _import_db(snapshot: dict, game_name: str | None) -> int: + from ..api.db import get_session, init_db + from ..api.service import import_snapshot + + init_db() + db = get_session() + try: + snap = import_snapshot(db, snapshot, game_name) + return snap.id + finally: + db.close() + + +def _post_http(post_url: str | None, snapshot: dict, game_name: str | None) -> int: + import urllib.parse + import urllib.request + + if not post_url: + raise AcquireError("sink='http' requires post_url (e.g. http://127.0.0.1:8000)") + url = post_url.rstrip("/") + "/snapshots" + if game_name: + url += "?" + urllib.parse.urlencode({"game": game_name}) + req = urllib.request.Request( + url, data=json.dumps(snapshot).encode("utf-8"), + headers={"Content-Type": "application/json"}, method="POST") + with urllib.request.urlopen(req, timeout=30) as resp: + body = json.loads(resp.read().decode("utf-8")) + return body.get("id") diff --git a/ams/acquire/unpack.py b/ams/acquire/unpack.py new file mode 100644 index 0000000..dfbba60 --- /dev/null +++ b/ams/acquire/unpack.py @@ -0,0 +1,89 @@ +"""Unpack a game archive (ISO9660 or ZIP) into a directory. + +`bsdtar` (libarchive — ships with macOS and most Linuxes) reads ISO *and* ZIP, so +it is the primary path. A pure-Python `zipfile` fallback covers ZIPs when bsdtar +is missing. ISOs without bsdtar raise, with a clear hint. 
+""" + +from __future__ import annotations + +import os +import shutil +import subprocess +import tempfile +import zipfile + +from .identify import is_pe + + +class UnpackError(RuntimeError): + pass + + +def detect_kind(path: str) -> str: + """One of: 'dir', 'iso', 'zip', 'pe', 'unknown' — by directory check then magic bytes.""" + if os.path.isdir(path): + return "dir" + try: + with open(path, "rb") as fh: + head = fh.read(4) + # ISO9660: the primary volume descriptor carries 'CD001' at offset 0x8001. + fh.seek(0x8001) + iso_magic = fh.read(5) + except OSError as e: + raise UnpackError("cannot read {0}: {1}".format(path, e)) + if head[:4] in (b"PK\x03\x04", b"PK\x05\x06", b"PK\x07\x08"): + return "zip" + if iso_magic == b"CD001": + return "iso" + if is_pe(path): + return "pe" + return "unknown" + + +def _has_bsdtar() -> bool: + return shutil.which("bsdtar") is not None + + +def _bsdtar_extract(archive: str, dest: str) -> None: + try: + proc = subprocess.run( + ["bsdtar", "-x", "-f", archive, "-C", dest], + stdout=subprocess.PIPE, stderr=subprocess.PIPE, + ) + except OSError as e: + raise UnpackError("bsdtar failed to launch: {0}".format(e)) + if proc.returncode != 0: + raise UnpackError("bsdtar exited {0}: {1}".format( + proc.returncode, proc.stderr.decode("utf-8", "replace").strip())) + + +def unpack(archive: str, dest: str | None = None) -> str: + """Extract `archive` into `dest` (a fresh temp dir if None) and return that directory. + + Directories pass through unchanged. 
A bare PE/DLL is rejected here — callers that + accept loose binaries should branch on `detect_kind` first.""" + kind = detect_kind(archive) + if kind == "dir": + return archive + if kind == "pe": + raise UnpackError("{0} is a bare PE image, not an archive".format(archive)) + if kind == "unknown": + raise UnpackError("unrecognised archive (not ISO/ZIP): {0}".format(archive)) + + created = dest is None + dest = dest or tempfile.mkdtemp(prefix="ams_unpack_") + try: + if _has_bsdtar(): + _bsdtar_extract(archive, dest) + elif kind == "zip": + with zipfile.ZipFile(archive) as zf: + zf.extractall(dest) + else: # iso without bsdtar + raise UnpackError( + "ISO extraction needs bsdtar (libarchive); install it or unpack the ISO manually") + except Exception: + if created: + shutil.rmtree(dest, ignore_errors=True) + raise + return dest diff --git a/pyproject.toml b/pyproject.toml index f263a40..2fbe54d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,6 +16,9 @@ api = [ "sqlalchemy>=2.0", "pydantic>=2.6", ] +acquire = [ + "ppdeep>=20200505", # pure-Python ssdeep, for fuzzy/near-duplicate hashing (optional) +] dev = [ "pytest>=8", "httpx>=0.27", diff --git a/tests/test_acquire.py b/tests/test_acquire.py new file mode 100644 index 0000000..60f7a45 --- /dev/null +++ b/tests/test_acquire.py @@ -0,0 +1,159 @@ +"""Acquisition-pipeline tests. The heavy Ghidra step is stubbed with an injected +extract_fn, so everything here runs without Ghidra. 
We forge a minimal PE image +carrying the engine marker strings to exercise content-based identification.""" + +from __future__ import annotations + +import json +import os +import zipfile + +import pytest + +from ams.acquire import acquire +from ams.acquire.identify import find_engine_dlls, hash_file, is_pe, scan_markers +from ams.acquire.unpack import detect_kind, unpack + + +def _fake_pe(markers: bytes = b"") -> bytes: + """Smallest bytes that pass is_pe(): MZ stub, e_lfanew→0x80, 'PE\\0\\0' there.""" + buf = bytearray(0x200) + buf[0:2] = b"MZ" + buf[0x3C:0x40] = (0x80).to_bytes(4, "little") + buf[0x80:0x84] = b"PE\x00\x00" + if markers: + buf[0x100:0x100 + len(markers)] = markers + return bytes(buf) + + +def _write(path: str, data: bytes) -> str: + with open(path, "wb") as fh: + fh.write(data) + return path + + +# --- identify --------------------------------------------------------------------------------- + +def test_is_pe_and_markers(tmp_path): + engine = _write(str(tmp_path / "x.dll"), _fake_pe(b"CMC_ObjectsContainer ... 
BlooMooDLL")) + plain = _write(str(tmp_path / "y.dll"), _fake_pe(b"nothing here")) + text = _write(str(tmp_path / "z.txt"), b"not a pe") + + assert is_pe(engine) and is_pe(plain) + assert not is_pe(text) + has_factory, eng = scan_markers(engine) + assert has_factory and eng == "BlooMoo" + assert scan_markers(plain) == (False, None) + + +def test_find_engine_dlls_picks_marked(tmp_path): + (tmp_path / "sub").mkdir() + _write(str(tmp_path / "readme.txt"), b"hello") + _write(str(tmp_path / "plain.dll"), _fake_pe(b"boring")) + target = _write(str(tmp_path / "sub" / "PIKLIB8.dll"), + _fake_pe(b"CMC_ObjectsContainer Piklib build")) + + cands = find_engine_dlls(str(tmp_path)) + assert cands, "expected at least one candidate" + assert os.path.samefile(cands[0].path, target) + assert cands[0].engine == "Piklib" + assert cands[0].score >= 100 # factory marker dominates + + +def test_hash_file(tmp_path): + p = _write(str(tmp_path / "a.bin"), b"abc") + h = hash_file(p) + # sha256("abc") + assert h.sha256 == "ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad" + assert h.size == 3 + assert h.fuzzy is None or isinstance(h.fuzzy, str) + + +# --- unpack ----------------------------------------------------------------------------------- + +def test_detect_kind(tmp_path): + z = str(tmp_path / "g.zip") + with zipfile.ZipFile(z, "w") as zf: + zf.writestr("inner/PIKLIB8.dll", _fake_pe(b"CMC_ObjectsContainer")) + assert detect_kind(z) == "zip" + assert detect_kind(str(tmp_path)) == "dir" + pe = _write(str(tmp_path / "p.dll"), _fake_pe()) + assert detect_kind(pe) == "pe" + + +def test_unpack_zip_and_find(tmp_path): + z = str(tmp_path / "game.zip") + with zipfile.ZipFile(z, "w") as zf: + zf.writestr("data/bloomoodll.dll", _fake_pe(b"CMC_ObjectsContainer BlooMooDLL")) + zf.writestr("data/notes.txt", b"x") + dest = unpack(z) + try: + cands = find_engine_dlls(dest) + assert cands and cands[0].engine == "BlooMoo" + finally: + import shutil + shutil.rmtree(dest, 
ignore_errors=True) + + +# --- full pipeline (stubbed extractor) -------------------------------------------------------- + +@pytest.fixture +def golden_snapshot(): + here = os.path.dirname(__file__) + path = os.path.join(here, "..", "snapshots", "PIKLIB8.dll.snapshot.json") + with open(path, "r", encoding="utf-8") as fh: + return json.load(fh) + + +def _stub_extractor(golden): + """Return an extract_fn that writes the golden snapshot to out_path.""" + def _fn(dll_path, out_path): + with open(out_path, "w", encoding="utf-8") as fh: + json.dump(golden, fh) + return out_path + return _fn + + +def test_acquire_zip_no_sink(tmp_path, golden_snapshot): + z = str(tmp_path / "reksio.zip") + with zipfile.ZipFile(z, "w") as zf: + zf.writestr("PIKLIB8.dll", _fake_pe(b"CMC_ObjectsContainer Piklib")) + out_dir = str(tmp_path / "out") + + r = acquire(z, out_dir=out_dir, sink="none", extract_fn=_stub_extractor(golden_snapshot)) + + assert r.engine in ("Piklib", "BlooMoo") + assert os.path.isfile(r.snapshot_path) + # enrichment landed in the binary block + b = r.snapshot["binary"] + assert b["md5"] == r.hashes.md5 + assert b["size"] == r.hashes.size + assert b["acquisition"]["source"] == "reksio.zip" + assert b["acquisition"]["dll_name"] == "PIKLIB8.dll" + # and was persisted to disk, not just the in-memory dict + on_disk = json.load(open(r.snapshot_path, encoding="utf-8")) + assert on_disk["binary"]["acquisition"]["dll_name"] == "PIKLIB8.dll" + assert r.imported_id is None and r.sink == "none" + + +def test_acquire_loose_dll_into_db(tmp_path, golden_snapshot): + from ams.api.db import configure + + configure("sqlite:///" + str(tmp_path / "cat.db")) + dll = _write(str(tmp_path / "PIKLIB8.dll"), _fake_pe(b"CMC_ObjectsContainer Piklib")) + out_dir = str(tmp_path / "out") + + r = acquire(dll, "Reksio i UFO", out_dir=out_dir, sink="db", + extract_fn=_stub_extractor(golden_snapshot)) + + assert r.sink == "db" and isinstance(r.imported_id, int) + + from ams.api.db import get_session + 
from ams.api.models import Game, Snapshot + db = get_session() + try: + snap = db.get(Snapshot, r.imported_id) + assert snap is not None and snap.binary_name + assert snap.game is not None and snap.game.name == "Reksio i UFO" + finally: + db.close()