Closes the chain from a game file to a catalog entry: unpack an ISO/ZIP, content-identify the engine DLL (CMC_ObjectsContainer marker in RTTI, so a renamed file is still found), hash it (sha256 + md5 + optional ssdeep via ppdeep), run Ghidra headless with the extractor, enrich and import the snapshot. - unpack.py: bsdtar (ISO9660 + ZIP) with a pure-Python zipfile fallback - identify.py: content-based engine-DLL picker + hashing - ghidra.py: analyzeHeadless launcher discovery + post-script run - pipeline.py: orchestration with injectable extract_fn; sink db|http|none - cli.py: python -m ams.acquire (incl. --identify-only dry run) - tests: 7 new (forged PE markers + stubbed extractor) -> 18/18 Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
90 lines
2.8 KiB
Python
90 lines
2.8 KiB
Python
"""Unpack a game archive (ISO9660 or ZIP) into a directory.
|
|
|
|
`bsdtar` (libarchive — ships with macOS and most Linuxes) reads ISO *and* ZIP, so
|
|
it is the primary path. A pure-Python `zipfile` fallback covers ZIPs when bsdtar
|
|
is missing. ISOs without bsdtar raise, with a clear hint.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import shutil
|
|
import subprocess
|
|
import tempfile
|
|
import zipfile
|
|
|
|
from .identify import is_pe
|
|
|
|
|
|
class UnpackError(RuntimeError):
    """Error raised by this module when an input cannot be read, recognised, or extracted."""
|
|
|
|
|
|
def detect_kind(path: str) -> str:
    """Classify `path` as 'dir', 'zip', 'iso', 'pe' or 'unknown'.

    A directory is reported straight away. Anything else is opened and sniffed:
    the first four bytes are compared against the ZIP signatures, then the five
    bytes at offset 0x8001 against the ISO9660 'CD001' tag, and finally `is_pe`
    is asked about the path. The checks run in that order and the first match wins.

    Raises:
        UnpackError: if the file cannot be opened, read, or seeked.
    """
    if os.path.isdir(path):
        return "dir"

    try:
        with open(path, "rb") as stream:
            prefix = stream.read(4)
            # ISO9660: the primary volume descriptor carries 'CD001' at offset 0x8001.
            stream.seek(0x8001)
            volume_tag = stream.read(5)
    except OSError as exc:
        raise UnpackError("cannot read {0}: {1}".format(path, exc))

    # Local-file header, end-of-central-directory (empty archive), and spanned marker.
    zip_signatures = (b"PK\x03\x04", b"PK\x05\x06", b"PK\x07\x08")
    if prefix in zip_signatures:
        return "zip"
    if volume_tag == b"CD001":
        return "iso"
    return "pe" if is_pe(path) else "unknown"
|
|
|
|
|
|
def _has_bsdtar() -> bool:
    """Report whether a `bsdtar` executable can be located via `shutil.which`."""
    located = shutil.which("bsdtar")
    return located is not None
|
|
|
|
|
|
def _bsdtar_extract(archive: str, dest: str) -> None:
    """Run `bsdtar -x` on `archive`, extracting into the directory `dest`.

    Both output streams are captured; only stderr is used, and only to build the
    error message when the process exits non-zero.

    Raises:
        UnpackError: if the process cannot be started, or exits with a non-zero
            status (the message carries the exit code and the stripped stderr).
    """
    command = ["bsdtar", "-x", "-f", archive, "-C", dest]
    try:
        result = subprocess.run(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
    except OSError as exc:
        raise UnpackError("bsdtar failed to launch: {0}".format(exc))

    if result.returncode == 0:
        return

    # Decode leniently: stderr may contain archive member names in any encoding.
    stderr_text = result.stderr.decode("utf-8", "replace").strip()
    raise UnpackError("bsdtar exited {0}: {1}".format(result.returncode, stderr_text))
|
|
|
|
|
|
def unpack(archive: str, dest: str | None = None) -> str:
    """Extract `archive` into `dest` and return the directory holding the contents.

    Directories pass through unchanged (`dest` is ignored for them). A bare PE/DLL
    is rejected here — callers that accept loose binaries should branch on
    `detect_kind` first.

    Args:
        archive: Path to an ISO9660 image, a ZIP file, or an already-unpacked directory.
        dest: Target directory. When omitted (or empty) a fresh temp dir is created;
            otherwise the directory is created if it does not exist yet.

    Returns:
        `archive` itself for a directory, else the directory extracted into.

    Raises:
        UnpackError: for a bare PE, an unrecognised file, a `dest` that cannot be
            created, a bsdtar failure, or an ISO when bsdtar is not available.
        Other exceptions from the `zipfile` fallback propagate unchanged.
    """
    kind = detect_kind(archive)
    if kind == "dir":
        return archive
    if kind == "pe":
        raise UnpackError("{0} is a bare PE image, not an archive".format(archive))
    if kind == "unknown":
        raise UnpackError("unrecognised archive (not ISO/ZIP): {0}".format(archive))

    # Use one truthiness test for both "do we own the directory" and "which directory",
    # so an empty-string `dest` (which also gets a temp dir) is cleaned up on failure.
    created = not dest
    if created:
        dest = tempfile.mkdtemp(prefix="ams_unpack_")
    else:
        # `bsdtar -C` requires an existing directory while ZipFile.extractall creates
        # one; create it up front so both extraction paths accept a not-yet-existing dest.
        try:
            os.makedirs(dest, exist_ok=True)
        except OSError as e:
            raise UnpackError("cannot create {0}: {1}".format(dest, e)) from e

    try:
        if _has_bsdtar():
            _bsdtar_extract(archive, dest)
        elif kind == "zip":
            with zipfile.ZipFile(archive) as zf:
                zf.extractall(dest)
        else:  # iso without bsdtar
            raise UnpackError(
                "ISO extraction needs bsdtar (libarchive); install it or unpack the ISO manually")
    except BaseException:
        # BaseException so an interrupted extraction (Ctrl-C) does not leak the temp
        # dir either. Only directories created here are removed; a caller-supplied
        # dest is left alone.
        if created:
            shutil.rmtree(dest, ignore_errors=True)
        raise
    return dest
|