Files
Aidem-Media-DLL-Analysis/ams/acquire/unpack.py
Patryk Gensch 6797ad5ddb Add ISO/ZIP acquisition pipeline (ams.acquire worker)
Closes the chain from a game file to a catalog entry: unpack an ISO/ZIP,
content-identify the engine DLL (CMC_ObjectsContainer marker in RTTI, so a
renamed file is still found), hash it (sha256 + md5 + optional ssdeep via
ppdeep), run Ghidra headless with the extractor, enrich and import the snapshot.

- unpack.py: bsdtar (ISO9660 + ZIP) with a pure-Python zipfile fallback
- identify.py: content-based engine-DLL picker + hashing
- ghidra.py: analyzeHeadless launcher discovery + post-script run
- pipeline.py: orchestration with injectable extract_fn; sink db|http|none
- cli.py: python -m ams.acquire (incl. --identify-only dry run)
- tests: 7 new (forged PE markers + stubbed extractor) -> 18/18

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-05-31 12:11:56 +02:00

90 lines
2.8 KiB
Python

"""Unpack a game archive (ISO9660 or ZIP) into a directory.
`bsdtar` (libarchive — ships with macOS and most Linuxes) reads ISO *and* ZIP, so
it is the primary path. A pure-Python `zipfile` fallback covers ZIPs when bsdtar
is missing. ISOs without bsdtar raise, with a clear hint.
"""
from __future__ import annotations
import os
import shutil
import subprocess
import tempfile
import zipfile
from .identify import is_pe
class UnpackError(RuntimeError):
    """Raised by this module when an input cannot be read, recognised or extracted."""
def detect_kind(path: str) -> str:
    """Classify `path` as 'dir', 'zip', 'iso', 'pe' or 'unknown'.

    A directory is recognised first; anything else is opened and classified by
    its magic bytes. ZIP signatures win over the ISO marker, and the PE check
    (delegated to `is_pe`) only runs when neither matched.

    Raises UnpackError when the file cannot be opened or read.
    """
    if os.path.isdir(path):
        return "dir"

    zip_signatures = (b"PK\x03\x04", b"PK\x05\x06", b"PK\x07\x08")
    # ISO9660 keeps 'CD001' in its primary volume descriptor at offset 0x8001.
    iso_marker_offset = 0x8001

    try:
        with open(path, "rb") as stream:
            prefix = stream.read(4)
            stream.seek(iso_marker_offset)
            volume_id = stream.read(5)
    except OSError as e:
        raise UnpackError("cannot read {0}: {1}".format(path, e))

    # `prefix` is at most four bytes long, so a startswith match is an exact match.
    if prefix.startswith(zip_signatures):
        return "zip"
    if volume_id == b"CD001":
        return "iso"
    return "pe" if is_pe(path) else "unknown"
def _has_bsdtar() -> bool:
    """Report whether a `bsdtar` executable can be located via `shutil.which`."""
    located = shutil.which("bsdtar")
    return located is not None
def _bsdtar_extract(archive: str, dest: str) -> None:
    """Run `bsdtar -x` to extract `archive` into the directory `dest`.

    Raises UnpackError when bsdtar cannot be started, or when it exits with a
    non-zero status (the message then carries its stderr output).
    """
    command = ["bsdtar", "-x", "-f", archive, "-C", dest]
    try:
        result = subprocess.run(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
    except OSError as e:
        raise UnpackError("bsdtar failed to launch: {0}".format(e))

    if result.returncode == 0:
        return

    # Decode leniently so undecodable bytes in the tool's output cannot mask the failure.
    diagnostics = result.stderr.decode("utf-8", "replace").strip()
    raise UnpackError("bsdtar exited {0}: {1}".format(result.returncode, diagnostics))
def unpack(archive: str, dest: str | None = None) -> str:
    """Extract `archive` into `dest` and return the directory holding the contents.

    Directories pass through unchanged. A bare PE/DLL is rejected here — callers that
    accept loose binaries should branch on `detect_kind` first.

    Args:
        archive: Path to an ISO9660 image, a ZIP file, or an existing directory.
        dest: Directory to extract into; it is created if missing. When None
            (or empty) a fresh temp dir with the prefix ``ams_unpack_`` is used.

    Returns:
        `archive` itself when it is a directory, otherwise the extraction directory.

    Raises:
        UnpackError: the input is a bare PE image, is not a recognised archive,
            is an ISO while bsdtar is unavailable, `dest` cannot be created,
            or bsdtar fails.
    """
    kind = detect_kind(archive)
    if kind == "dir":
        return archive
    if kind == "pe":
        raise UnpackError("{0} is a bare PE image, not an archive".format(archive))
    if kind == "unknown":
        raise UnpackError("unrecognised archive (not ISO/ZIP): {0}".format(archive))

    use_bsdtar = _has_bsdtar()
    # Reject the impossible case before any directory is created.
    if kind == "iso" and not use_bsdtar:
        raise UnpackError(
            "ISO extraction needs bsdtar (libarchive); install it or unpack the ISO manually")

    # Only a directory made here is removed on failure. An empty `dest` is
    # treated like None so the temp dir it triggers is cleaned up as well.
    created = not dest
    if created:
        dest = tempfile.mkdtemp(prefix="ams_unpack_")
    else:
        # bsdtar's `-C` needs an existing directory, whereas zipfile creates it
        # on demand; creating it here makes both paths behave the same.
        try:
            os.makedirs(dest, exist_ok=True)
        except OSError as e:
            raise UnpackError("cannot create {0}: {1}".format(dest, e)) from e

    try:
        if use_bsdtar:
            _bsdtar_extract(archive, dest)
        else:
            # Only ZIPs reach this branch; ISOs without bsdtar were rejected above.
            # NOTE(review): zipfile.BadZipFile propagates unwrapped from here,
            # while bsdtar failures surface as UnpackError — consider unifying.
            with zipfile.ZipFile(archive) as zf:
                zf.extractall(dest)
    except BaseException:
        # BaseException so an interrupt does not leave a stray temp dir behind.
        if created:
            shutil.rmtree(dest, ignore_errors=True)
        raise
    return dest