Recovers how a script method id maps to its implementation, the foundation for body-level normalisation. Each CMC_*_Runner::run is a switch(id) (vtable slot 17); every case is the method body — inline (MSVC6) or a tail-call to a separate show()/load() (MSVC8). The extractor parses the jump table at the disassembly level (Ghidra's decompiler jump-table recovery silently dropped the big runners), fingerprints each case by its ordered CALL anchors (Class::method / vtbl+0xNN), and expands thin wrappers one level so MSVC8 lines up with MSVC6. Validated on the golden pair: Animo SHOW..RESUME (id 1-4) yield identical leaves (getAnimo + vtbl+0xa0/0xa4/0x4c/0x50) across both compilers. Coverage 30/32 runners; Piklib 475 / BlooMoo 619 dispatch rows. - extract_engine_surface.py: extract_method_dispatch (schema_version -> 4) - snapshots regenerated with the method_dispatch axis - ams: Snapshot.method_dispatch; diff axis keyed (owner,id) on [impl,calls] with method-name join; render METHOD BODIES section; cli --only dispatch; owner filter - UI: "Ciała metod" diff axis + browse tab - tests: body-change unit + cross-compiler vtbl assertion -> 29/29 Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
837 lines
32 KiB
Python
837 lines
32 KiB
Python
# Extract the "engine surface" (types/methods/events/fields) from a Piklib/BlooMoo
|
|
# engine DLL and emit a snapshot.json for cross-version diffing.
|
|
#
|
|
# Runs as a Ghidra **headless** post-script. Compatible with both pyghidra (CPython 3)
|
|
# and the bundled Jython 2.7, so it avoids f-strings and py3-only APIs.
|
|
#
|
|
# Usage (headless):
|
|
# analyzeHeadless <projDir> <projName> -process PIKLIB8.dll \
|
|
# -postScript extract_engine_surface.py /abs/path/out.snapshot.json
|
|
#
|
|
# Design note: extraction stands on SEMANTIC ANCHORS (call targets, referenced string
|
|
# constants, push immediates), never on decompiled-C text. That is what makes the same
|
|
# script work across MSVC6 (Piklib) and MSVC8 (BlooMoo) despite very different codegen.
|
|
#
|
|
# @category AidemMedia
|
|
from __future__ import print_function
|
|
|
|
import json
|
|
import hashlib
|
|
import os
|
|
import re
|
|
import tempfile
|
|
|
|
from ghidra.app.decompiler import DecompInterface
|
|
from ghidra.program.model.pcode import PcodeOp
|
|
|
|
# Demangled names of the helper routines the factory's type-dispatch ladder calls, plus
# the class-name prefix used to recognise engine classes. Both compilers yield the same
# demangled spelling for these.
OP_NEW = "operator_new"
OP_EQ = "operator=="
CMC_PREFIX = "CMC_"

# Size of the sliding window of preceding instructions searched for a PUSH argument.
LOOKBACK = 8
|
|
|
|
|
|
# --------------------------------------------------------------------------- helpers
|
|
|
|
def find_function_by_qualified(program, class_name, method_name):
    """Return the first function named `method_name` declared directly in `class_name`.

    Only the immediate parent namespace is compared; outer namespaces are ignored.
    Functions are visited in the function manager's forward iteration order.
    Returns None when nothing matches.
    """
    candidates = program.getFunctionManager().getFunctions(True)
    while candidates.hasNext():
        func = candidates.next()
        if func.getName() == method_name:
            parent = func.getParentNamespace()
            if parent is not None and parent.getName() == class_name:
                return func
    return None
|
|
|
|
|
|
def call_target(program, instr):
    """Return the function a direct CALL lands on, or None.

    Each flow address of `instr` is looked up in the function manager and the first
    one that is a function entry point wins. The result may itself be a thunk/stub;
    no thunk resolution happens here. Instructions with no flow to a function entry
    yield None.
    """
    functions = program.getFunctionManager()
    for flow_addr in instr.getFlows():
        hit = functions.getFunctionAt(flow_addr)
        if hit is None:
            continue
        return hit
    return None
|
|
|
|
|
|
def resolve_thunk(func):
    """Map an incremental-linking (ILT) stub to the function that holds the real body.

    MSVC8 puts a `JMP real` stub at the symbol address while the body lives elsewhere.
    Two shapes are handled:

    * Ghidra modelled the stub as a thunk function: the fully thunked function is
      returned.
    * Ghidra did not: if the instruction at the entry point is an unconditional jump
      with exactly one flow, and that flow is a function entry, that function is
      returned.

    Anything else, including None, is returned unchanged. Reads the listing and
    function manager of the script-global `currentProgram`.
    """
    if func is None:
        return func
    if func.isThunk():
        return func.getThunkedFunction(True)

    head = currentProgram.getListing().getInstructionAt(func.getEntryPoint())
    if head is None:
        return func
    flow = head.getFlowType()
    if not flow.isJump() or flow.isConditional():
        return func
    dests = head.getFlows()
    if len(dests) != 1:
        return func
    real = currentProgram.getFunctionManager().getFunctionAt(dests[0])
    return real if real is not None else func
|
|
|
|
|
|
def _read_cstring(program, addr, maxlen=64):
|
|
"""Read a NUL-terminated printable-ASCII string straight from memory. Needed because
|
|
some type-name literals (e.g. "BOOL", "FONT") are referenced but never defined as Data,
|
|
so getDataAt() returns nothing for them."""
|
|
mem = program.getMemory()
|
|
chars = []
|
|
i = 0
|
|
while i < maxlen:
|
|
try:
|
|
b = mem.getByte(addr.add(i)) & 0xff
|
|
except: # noqa: E722 - Java MemoryAccessException isn't a Python Exception under Jython
|
|
return None
|
|
if b == 0:
|
|
break
|
|
if b < 0x20 or b > 0x7e:
|
|
return None # not a clean type-name literal
|
|
chars.append(chr(b))
|
|
i += 1
|
|
return "".join(chars) if chars else None
|
|
|
|
|
|
def _string_at(program, addr):
|
|
if addr is None or not addr.isMemoryAddress():
|
|
return None # skip stack/register/constant refs (e.g. inline CXString stack buffers)
|
|
d = program.getListing().getDataAt(addr)
|
|
if d is not None and d.hasStringValue():
|
|
return str(d.getValue())
|
|
return _read_cstring(program, addr) # fall back to raw memory for undefined literals
|
|
|
|
|
|
def lookback_string(program, recent):
    """Most recent PUSH that references (or immediately points at) a string constant.

    `recent` is scanned newest-first. For each PUSH, every outgoing reference is tried
    as a string address; failing that, the operand-0 immediate is reinterpreted as an
    address in the default address space. Returns the first string found, or None.

    The immediate-as-address fallback is best-effort, so any failure there skips to
    the next candidate. A bare `except` is used because Java exceptions are not Python
    `Exception`s under Jython; `except Exception` would let them abort the extraction.
    """
    space = program.getAddressFactory().getDefaultAddressSpace()
    for instr in reversed(recent):
        if instr.getMnemonicString() != "PUSH":
            continue
        for ref in instr.getReferencesFrom():
            found = _string_at(program, ref.getToAddress())
            if found is not None:
                return found
        imm = instr.getScalar(0)  # fallback: treat the immediate as an address
        if imm is None:
            continue
        try:
            addr = space.getAddress(imm.getUnsignedValue())
            found = _string_at(program, addr)
        except:  # noqa: E722 - Java exceptions aren't Python Exceptions under Jython
            continue
        if found is not None:
            return found
    return None
|
|
|
|
|
|
def lookback_scalar(recent):
    """Most recent PUSH of a plain integer immediate (the operator_new size).

    `recent` is scanned newest-first. A PUSH qualifies when operand 0 is a scalar and
    the instruction has no outgoing references (so it is not an address). Returns the
    unsigned value as an int, or None if no PUSH qualifies.
    """
    for instr in reversed(recent):
        if instr.getMnemonicString() != "PUSH":
            continue
        imm = instr.getScalar(0)
        if imm is None or instr.getReferencesFrom():
            continue
        return int(imm.getUnsignedValue())
    return None
|
|
|
|
|
|
# A load from an object field: `[<reg> + 0xNN]` with a general register base (not the stack
|
|
# pointer/base ESP/EBP). The module-interface dispatch branches fetch their container from a
|
|
# field of `this` this way, while the direct branches pass `this` straight from a register.
|
|
# Offset-agnostic, so it survives different compilers/struct layouts.
|
|
_FIELD_LOAD = re.compile(r"\[E(?:AX|BX|CX|DX|SI|DI) \+ 0x[0-9a-fA-F]+\]")
|
|
|
|
|
|
def _branch_uses_field_load(branch):
|
|
for instr in branch:
|
|
if _FIELD_LOAD.search(instr.toString()):
|
|
return True
|
|
return False
|
|
|
|
|
|
def _imm_string(program, instr):
|
|
"""If any operand is an immediate that points at a printable string, return that string.
|
|
|
|
Used for method names: they are loaded as `MOV reg, <strptr>`, and the immediate is the
|
|
true string start. Relying on references instead breaks under MSVC6, whose inline strcpy
|
|
references string+1 (first char handled separately), truncating the name by one char."""
|
|
af = program.getAddressFactory().getDefaultAddressSpace()
|
|
for opi in range(instr.getNumOperands()):
|
|
sc = instr.getScalar(opi)
|
|
if sc is None:
|
|
continue
|
|
try:
|
|
s = _string_at(program, af.getAddress(sc.getUnsignedValue()))
|
|
except: # noqa: E722
|
|
continue
|
|
if s is not None:
|
|
return s
|
|
return None
|
|
|
|
|
|
# --------------------------------------------------------------------------- extractors
|
|
|
|
def extract_types(program, factory):
    """Walk CMC_ObjectsContainer::resolve and recover its type-dispatch ladder.

    For every branch the engine emits, in order:
        CALL operator==(typeStr, "NAME")   -> script type name
        CALL operator_new(SIZE)            -> object allocation size
        CALL <ctor>(...)                   -> the next call IS the constructor

    The constructor is recognised *structurally* (first CALL after operator_new), not
    by name: several ctors are unnamed FUN_xxxx (e.g. CMC_Text, CMC_Movie) and gating
    on the name dropped them.

    `factory` must already be the function holding the real body. Returns a list of
    dicts with keys script_name, cpp_class, ctor_addr, object_size, dispatch_addr and
    via_module_iface.
    """
    found = []
    window = []        # the last LOOKBACK instructions, used to recover PUSH arguments
    since_match = []   # instructions since the current branch's operator== match
    script_name = None
    alloc_size = None
    expect_ctor = False  # armed by operator_new; the next CALL is the constructor

    walker = program.getListing().getInstructions(factory.getBody(), True)
    while walker.hasNext():
        instr = walker.next()
        if instr.getMnemonicString() == "CALL":
            callee = call_target(program, instr)
            callee_name = None if callee is None else callee.getName()
            if callee_name == OP_EQ:
                literal = lookback_string(program, window)
                if literal is not None:
                    script_name = literal
                    since_match = []
            elif callee_name == OP_NEW:
                alloc_size = lookback_scalar(window)
                expect_ctor = True
            elif expect_ctor:
                if script_name is not None and callee is not None:
                    # The address comes from the resolved body, the class name from
                    # the directly-called function.
                    ctor_body = resolve_thunk(callee)
                    stub_name = callee.getName()
                    found.append({
                        "script_name": script_name,
                        "cpp_class": stub_name if stub_name.startswith(CMC_PREFIX) else None,
                        "ctor_addr": "0x%x" % ctor_body.getEntryPoint().getOffset(),
                        "object_size": alloc_size,
                        "dispatch_addr": "0x%x" % instr.getAddress().getOffset(),
                        "via_module_iface": _branch_uses_field_load(since_match),
                    })
                script_name = None
                alloc_size = None
                expect_ctor = False
        window.append(instr)
        since_match.append(instr)
        if len(window) > LOOKBACK:
            window.pop(0)
    return found
|
|
|
|
|
|
def _owner_from_runner(runner_name):
|
|
"""CMC_Animo_Runner -> CMC_Animo ; CMC_Runner -> CMC (the base that holds global methods)."""
|
|
if runner_name.endswith("_Runner"):
|
|
return runner_name[:-len("_Runner")]
|
|
return runner_name
|
|
|
|
|
|
def _is_method_name(s):
|
|
return bool(s) and all(c.isalnum() or c == "_" for c in s)
|
|
|
|
|
|
def _extract_methods_from(program, runner_func):
    """Walk one CMC_*_Runner::prepareMthHashSet and recover its directly-registered methods.

    Per method the engine emits:
        new CInteger(ID) ; new CStringHashCode("NAME") ; CHashtable::put

    The method name is *loaded* (MOV), not pushed, so the most recent method-name-like
    immediate string seen on any instruction is tracked rather than scanning PUSH
    operands. The first call to another prepareMthHashSet supplies the inheritance link
    (inherited/global methods).

    Returns (methods, runner, base_runner): `methods` is a list of dicts with keys
    owner, runner, name and id; `runner` is the parent namespace name of `runner_func`
    ("?" if it has none); `base_runner` is the namespace name of the first
    prepareMthHashSet callee, or None.
    """
    scope = runner_func.getParentNamespace()
    runner = "?" if scope is None else scope.getName()
    owner = _owner_from_runner(runner)

    methods = []
    base_runner = None
    name_literal = None  # latest method-name-like string seen on any instruction
    method_id = None     # id recovered at the latest CInteger call
    hashed_name = None   # name captured at the latest CStringHashCode call
    window = []          # the last LOOKBACK instructions

    body = resolve_thunk(runner_func).getBody()
    walker = program.getListing().getInstructions(body, True)
    while walker.hasNext():
        instr = walker.next()
        literal = _imm_string(program, instr)
        if literal is not None and _is_method_name(literal):
            name_literal = literal
        if instr.getMnemonicString() == "CALL":
            callee = call_target(program, instr)
            callee_name = None
            callee_scope_name = None
            if callee is not None:
                callee_name = callee.getName()
                callee_scope = callee.getParentNamespace()
                if callee_scope is not None:
                    callee_scope_name = callee_scope.getName()
            if callee_name == "prepareMthHashSet":
                if base_runner is None:
                    base_runner = callee_scope_name
            elif callee_name == "CInteger":
                method_id = lookback_scalar(window)
            elif callee_name == "CStringHashCode":
                hashed_name = name_literal
            elif callee_name == "put" and callee_scope_name == "CHashtable":
                if hashed_name is not None:
                    methods.append({"owner": owner, "runner": runner,
                                    "name": hashed_name, "id": method_id})
                    hashed_name = None
        window.append(instr)
        if len(window) > LOOKBACK:
            window.pop(0)
    return methods, runner, base_runner
|
|
|
|
|
|
def extract_methods(program):
    """Collect the script methods registered by every prepareMthHashSet in the program.

    Returns (methods, inheritance). `methods` concatenates each Runner's
    directly-registered methods; `inheritance` holds one {"runner", "base_runner"}
    entry per runner that chains to a base, so the consumer can compose the full
    (incl. global) method set per type. Method id -> vtable address correlation is
    done elsewhere.
    """
    all_methods = []
    links = []
    functions = program.getFunctionManager().getFunctions(True)
    while functions.hasNext():
        func = functions.next()
        if func.getName() == "prepareMthHashSet":
            registered, runner, base = _extract_methods_from(program, func)
            all_methods.extend(registered)
            if base is not None:
                links.append({"runner": runner, "base_runner": base})
    return all_methods, links
|
|
|
|
|
|
_VTBL_OFF = re.compile(r"\[\w+ \+ (0x[0-9a-fA-F]+)\]")
|
|
_MEM_OFF = re.compile(r"\[\w+ \+ (0x[0-9a-fA-F]+)\]")
|
|
|
|
|
|
def _is_generic_name(name):
|
|
"""A compiler-assigned placeholder, not a real symbol."""
|
|
return (not name) or name.startswith("FUN_") or name.startswith("thunk_") or name.startswith("LAB_")
|
|
|
|
|
|
def _qualified(f):
|
|
if f is None:
|
|
return None
|
|
ns = f.getParentNamespace()
|
|
nm = f.getName()
|
|
return (ns.getName() + "::" + nm) if (ns is not None and ns.getName() != "Global") else nm
|
|
|
|
|
|
def _call_anchor(program, instr):
    """A normalisable, compiler-tolerant fingerprint of one CALL inside a switch case.

    Direct call -> "Namespace::name". On MSVC8 the symbol sits on the ILT *stub* while
    the body is an unnamed FUN_, so the called function's own name is kept and the
    thunk-resolved body's name is used only when the direct name is a placeholder and
    the body's is not.

    Indirect virtual call -> "vtbl+0xNN" taken from the displacement in the
    instruction text, which abstracts away the register holding `this`.

    Returns None when neither form applies (e.g. a plain `CALL reg`).
    """
    direct = call_target(program, instr)
    if direct is None:
        hit = _VTBL_OFF.search(instr.toString())
        return None if hit is None else "vtbl+" + hit.group(1)
    if _is_generic_name(direct.getName()):
        body = resolve_thunk(direct)
        if body is not None and not _is_generic_name(body.getName()):
            return _qualified(body)
    return _qualified(direct)
|
|
|
|
|
|
def _walk_calls(program, start_addr, stops, limit=80):
    """Walk a straight-line block from `start_addr` and return (anchors, funcs).

    * `anchors` - ordered CALL fingerprints (see `_call_anchor`). The MSVC8 virtual-call
      idiom `MOV reg,[base+0xNN]` / `CALL reg` is additionally recovered as `vtbl+0xNN`
      so it matches the MSVC6 form `CALL [reg+0xNN]`.
    * `funcs` - an (anchor, entry point) pair for each *direct* call to a function,
      with the entry point taken after thunk resolution. Used to spot a thin wrapper
      case that merely forwards to a submethod.

    The walk follows fall-through order and ends at a terminal instruction (RET), an
    unconditional jump, any address in `stops` other than the first instruction, or
    after `limit` instructions.
    """
    cursor = program.getListing().getInstructionAt(start_addr)
    anchors = []
    funcs = []
    loaded = {}  # operand-0 text -> displacement most recently MOV-loaded into it
    seen = 0
    while cursor is not None and seen < limit:
        # Reaching another case's entry ends this block; the first instruction is exempt.
        if seen > 0 and cursor.getAddress() in stops:
            break
        seen += 1
        mnemonic = cursor.getMnemonicString()
        if mnemonic == "CALL":
            anchor = _call_anchor(program, cursor)
            if anchor is None:
                # CALL reg -> use the offset last loaded into that register
                via = cursor.getDefaultOperandRepresentation(0)
                if via in loaded:
                    anchor = "vtbl+" + loaded[via]
            if anchor is not None:
                anchors.append(anchor)
            callee = call_target(program, cursor)
            if callee is not None:
                impl = resolve_thunk(callee)
                if impl is not None:
                    funcs.append((anchor, impl.getEntryPoint()))
        elif mnemonic == "MOV" and cursor.getNumOperands() >= 2:
            dest = cursor.getDefaultOperandRepresentation(0)
            disp = _MEM_OFF.search(cursor.toString())
            if disp is None:
                loaded.pop(dest, None)  # overwritten by something that is not a field load
            else:
                loaded[dest] = disp.group(1)
        flow = cursor.getFlowType()
        if flow.isTerminal() or (flow.isJump() and not flow.isConditional()):
            break
        cursor = cursor.getNext()
    return anchors, funcs
|
|
|
|
|
|
_SWITCH_JMP = re.compile(r"\[(\w+)\*0x4 \+ (0x[0-9a-fA-F]+)\]")
|
|
_LEA_DISP = re.compile(r"\[\w+ \+ (-?0x[0-9a-fA-F]+)\]")
|
|
|
|
|
|
def _lea_disp(instr):
|
|
"""Signed displacement of a `LEA reg,[base + disp]`, parsed from text when getScalar misses."""
|
|
m = _LEA_DISP.search(instr.toString())
|
|
return int(m.group(1), 16) if m is not None else None
|
|
|
|
|
|
def _parse_switch(program, func):
    """Recover the dense jump-table switch of a `run` function at the disassembly level
    (decompiler-independent, so it survives the big inline-heavy runners). Both MSVC6
    and MSVC8 emit the same shape:

        LEA idx,[reg - base] ; CMP idx, range ; JA default ; JMP [idx*4 + TABLE]

    The first computed JMP whose text matches `[idx*0x4 + TABLE]` is taken as the
    switch. Every instruction ahead of it in listing order whose first operand is the
    index register is then inspected: CMP sets the count, while LEA/SUB/ADD/DEC set the
    base. A later instruction overrides an earlier one.

    Returns {table, base, count} or None when no such JMP exists.
    `id = table_index + base`; `count = range + 1` (None if no CMP was seen).
    """
    body_iter = program.getListing().getInstructions(func.getBody(), True)
    instrs = []
    while body_iter.hasNext():
        instrs.append(body_iter.next())

    # Pass 1: locate the table jump.
    idx_reg = None
    table = None
    jmp_addr = None
    for instr in instrs:
        if instr.getMnemonicString() != "JMP" or not instr.getFlowType().isComputed():
            continue
        shape = _SWITCH_JMP.search(instr.toString())
        if shape is None:
            continue
        idx_reg = shape.group(1)
        space = program.getAddressFactory().getDefaultAddressSpace()
        table = space.getAddress(int(shape.group(2), 16))
        jmp_addr = instr.getAddress()
        break
    if table is None:
        return None

    # Pass 2: read the index normalisation and bound that precede the jump.
    base = 0
    count = None
    for instr in instrs:
        if instr.getAddress().equals(jmp_addr):
            break
        if instr.getNumOperands() == 0:
            continue
        if instr.getDefaultOperandRepresentation(0) != idx_reg:
            continue
        mnemonic = instr.getMnemonicString()
        imm = instr.getScalar(1)
        if mnemonic == "LEA":  # LEA idx,[reg - k] -> id = index + k
            disp = _lea_disp(instr) if imm is None else int(imm.getValue())
            if disp is not None:
                base = -disp
        elif mnemonic == "DEC":
            base = 1
        elif imm is None:
            continue
        elif mnemonic == "CMP":
            count = int(imm.getValue()) + 1
        elif mnemonic == "SUB":
            base = int(imm.getValue())
        elif mnemonic == "ADD":
            base = -int(imm.getValue())
    return {"table": table, "base": base, "count": count}
|
|
|
|
|
|
def extract_method_dispatch(program):
    """For each CMC_*_Runner::run, recover how method ids map to their implementation.

    `run(int id, ...)` is a `switch(id)` (vtable slot 17, overridden per runner) whose
    every `case id:` is the method body - either a tail-call to a named submethod
    (BlooMoo/MSVC8 keeps show()/load()/... as separate functions) or inline code whose
    leaves are virtual calls on the wrapped object (Piklib/MSVC6). Each case is
    fingerprinted by its ordered CALL anchors, so a later pass can diff method *bodies*
    by (owner, id). Names are joined in via `methods`.

    Only functions named "run" whose parent namespace name ends in "_Runner" are
    processed. A Python exception raised for one runner is printed and that runner is
    skipped. Returns the concatenated dispatch rows.
    """
    rows = []
    functions = program.getFunctionManager().getFunctions(True)
    while functions.hasNext():
        func = functions.next()
        if func.getName() != "run":
            continue
        scope = func.getParentNamespace()
        runner = "?" if scope is None else scope.getName()
        if not runner.endswith("_Runner"):
            continue
        try:
            owner = _owner_from_runner(runner)
            rows.extend(_dispatch_from_run(program, func, owner, runner))
        except Exception as e:  # one malformed runner shouldn't sink the whole axis
            print("[!] method_dispatch %s: %s" % (runner, e))
    return rows
|
|
|
|
|
|
def _dispatch_from_run(program, run_func, owner, runner):
    """Build the dispatch rows for one runner's `run` function.

    The jump table located by `_parse_switch` is read as `count` little 4-byte
    entries; each entry is a case address. Every case is fingerprinted with
    `_walk_calls`, using the set of all case addresses as stop points so one case does
    not run into the next.

    Returns a list of dicts with keys owner, runner, id, case_addr, impl, impl_addr and
    calls. Returns [] when no switch is found or its count is missing or outside
    1..4096. If a table entry cannot be read, the table is truncated at that entry.
    """
    run_func = resolve_thunk(run_func)
    sw = _parse_switch(program, run_func)
    if sw is None:
        return []
    count = sw["count"]
    if count is None or count < 1 or count > 4096:
        return []

    mem = program.getMemory()
    space = program.getAddressFactory().getDefaultAddressSpace()
    targets = []
    for index in range(count):
        try:
            val = mem.getInt(sw["table"].add(index * 4)) & 0xffffffff
        except:  # noqa: E722 - Java MemoryAccessException isn't a Python Exception under Jython
            break
        targets.append(space.getAddress(val))

    stops = set(targets)
    rows = []
    for index, case_addr in enumerate(targets):
        anchors, funcs = _walk_calls(program, case_addr, stops)
        # A thin wrapper case forwards to one submethod: the real body (and its leaf
        # anchors) live in that function. Expanding one level makes MSVC8 (separate
        # show()/load()) line up with MSVC6 (inline), so `calls` is a compiler-tolerant
        # body fingerprint.
        if len(anchors) == 1 and len(funcs) == 1 and funcs[0][0] == anchors[0]:
            impl, impl_entry = funcs[0]
            impl_addr = "0x%x" % impl_entry.getOffset()
            calls, _ = _walk_calls(program, impl_entry, set())
        else:
            impl = None
            impl_addr = "0x%x" % case_addr.getOffset()  # body is inline in the case block
            calls = anchors
        rows.append({
            "owner": owner, "runner": runner, "id": index + sw["base"],
            "case_addr": "0x%x" % case_addr.getOffset(),
            "impl": impl, "impl_addr": impl_addr, "calls": calls,
        })
    return rows
|
|
|
|
|
|
def extract_events(program):
    """Per CMC_*::getBehavioursList, collect the ordered event-name literals (ONINIT, ONDONE, ...).

    The function builds a flat CXString[] with each name inlined; there is no
    base-class call, so each class's list is self-contained (no inheritance chain,
    unlike methods). Each literal is loaded twice per entry (strlen + memcpy), so a
    name equal to the previously recorded one is skipped.

    Returns a list of {"owner", "name", "order"} dicts, where `order` counts from 0
    within each getBehavioursList function.
    """
    listing = program.getListing()
    events = []
    functions = program.getFunctionManager().getFunctions(True)
    while functions.hasNext():
        func = functions.next()
        if func.getName() != "getBehavioursList":
            continue
        scope = func.getParentNamespace()
        owner = "?" if scope is None else scope.getName()
        position = 0
        previous = None
        walker = listing.getInstructions(resolve_thunk(func).getBody(), True)
        while walker.hasNext():
            literal = _imm_string(program, walker.next())
            if literal is None or not _is_method_name(literal) or literal == previous:
                continue
            events.append({"owner": owner, "name": literal, "order": position})
            position += 1
            previous = literal
    return events
|
|
|
|
|
|
def _is_cmc_ctor(func):
|
|
ns = func.getParentNamespace()
|
|
return ns is not None and func.getName() == ns.getName() and func.getName().startswith(CMC_PREFIX)
|
|
|
|
|
|
def _this_varnodes(high):
|
|
"""Varnodes that represent the `this` parameter (parameter category index 0)."""
|
|
out = set()
|
|
syms = high.getLocalSymbolMap().getSymbols()
|
|
while syms.hasNext():
|
|
s = syms.next()
|
|
if s.isParameter() and s.getCategoryIndex() == 0:
|
|
hv = s.getHighVariable()
|
|
if hv is not None:
|
|
for vn in hv.getInstances():
|
|
out.add(vn)
|
|
return out
|
|
|
|
|
|
def _trace_this_offset(vn, this_vns, depth=0):
|
|
"""If `vn` is `this + constant`, return the constant; else None. Walks the def chain through
|
|
the address arithmetic the decompiler emits (this abstracts away which register held `this`)."""
|
|
if vn is None or depth > 12:
|
|
return None
|
|
if vn in this_vns:
|
|
return 0
|
|
d = vn.getDef()
|
|
if d is None:
|
|
return None
|
|
op = d.getOpcode()
|
|
ins = d.getInputs()
|
|
if op in (PcodeOp.INT_ADD, PcodeOp.PTRADD):
|
|
a, b = ins[0], ins[1]
|
|
if b.isConstant():
|
|
base = _trace_this_offset(a, this_vns, depth + 1)
|
|
if base is not None:
|
|
step = ins[2].getOffset() if (op == PcodeOp.PTRADD and len(ins) > 2) else 1
|
|
return base + b.getOffset() * step
|
|
if a.isConstant():
|
|
base = _trace_this_offset(b, this_vns, depth + 1)
|
|
if base is not None:
|
|
return base + a.getOffset()
|
|
elif op == PcodeOp.PTRSUB:
|
|
b = ins[1]
|
|
if b.isConstant():
|
|
base = _trace_this_offset(ins[0], this_vns, depth + 1)
|
|
if base is not None:
|
|
return base + b.getOffset()
|
|
elif op in (PcodeOp.COPY, PcodeOp.CAST, PcodeOp.INT_ZEXT, PcodeOp.INT_SEXT):
|
|
return _trace_this_offset(ins[0], this_vns, depth + 1)
|
|
return None
|
|
|
|
|
|
def _is_vtable_value(program, vn):
|
|
"""True if the stored value is a constant pointer to a *vftable* symbol."""
|
|
if not vn.isConstant():
|
|
return False
|
|
try:
|
|
a = program.getAddressFactory().getDefaultAddressSpace().getAddress(vn.getOffset())
|
|
except: # noqa: E722
|
|
return False
|
|
sym = program.getSymbolTable().getPrimarySymbol(a)
|
|
return sym is not None and "vftable" in sym.getName().lower()
|
|
|
|
|
|
def _base_ctor(program, ctor):
    """Direct base class = the first CMC_* constructor this ctor calls (conventionally
    first). Calls to a constructor with the ctor's own name are ignored. Returns the
    base constructor's name, or None if the body calls none."""
    own_name = ctor.getName()
    walker = program.getListing().getInstructions(resolve_thunk(ctor).getBody(), True)
    while walker.hasNext():
        instr = walker.next()
        if instr.getMnemonicString() != "CALL":
            continue
        callee = call_target(program, instr)
        if callee is None or not _is_cmc_ctor(callee):
            continue
        if callee.getName() != own_name:
            return callee.getName()
    return None
|
|
|
|
|
|
def _extract_fields_from(program, ifc, ctor):
    """Recover this-relative STORE offsets from one CMC_* constructor via decompiler P-code.

    The function that holds the real body is decompiled: on MSVC8 the named
    constructor symbol can be a `JMP real` stub, and decompiling the stub itself
    would show none of the stores. The owner name and the base-class lookup still use
    `ctor`, since the namespace lives on the symbol.

    Returns (fields, base_class). `fields` has one dict per distinct non-negative
    offset with keys owner, offset, size, is_vtable and confidence; when several
    stores hit the same offset the widest one is kept. Returns ([], None) if
    decompilation fails, yields no high function, or no `this` parameter is found.
    The decompile call uses a timeout argument of 60 and the script-global `monitor`.
    """
    body = resolve_thunk(ctor)
    res = ifc.decompileFunction(body, 60, monitor)
    if res is None or not res.decompileCompleted():
        return [], None
    high = res.getHighFunction()
    if high is None:
        return [], None
    this_vns = _this_varnodes(high)
    if not this_vns:
        return [], None

    owner = ctor.getParentNamespace().getName()
    fields = {}  # offset -> widest record seen at that offset
    ops = high.getPcodeOps()
    while ops.hasNext():
        op = ops.next()
        if op.getOpcode() != PcodeOp.STORE:
            continue
        # STORE inputs: [1] is the destination pointer, [2] the value being written.
        off = _trace_this_offset(op.getInput(1), this_vns)
        if off is None or off < 0:
            continue
        val = op.getInput(2)
        rec = {"owner": owner, "offset": off, "size": val.getSize(),
               "is_vtable": _is_vtable_value(program, val), "confidence": "high"}
        cur = fields.get(off)
        if cur is None or rec["size"] > cur["size"]:
            fields[off] = rec
    return list(fields.values()), _base_ctor(program, ctor)
|
|
|
|
|
|
_PROP_TYPE = [("Bool", "bool"), ("Int", "int"), ("Double", "double"), ("Float", "double"),
|
|
("List", "list"), ("Point", "point"), ("Size", "size"), ("Rect", "rect")]
|
|
|
|
|
|
def _prop_type(getter):
|
|
"""Map a CMElement::getProperty<T>Value getter name to the script field type."""
|
|
for needle, ty in _PROP_TYPE:
|
|
if needle in getter:
|
|
return ty
|
|
return "string" # plain getPropertyValue
|
|
|
|
|
|
def extract_script_fields(program):
    """The script-visible named fields each type exposes (FILENAME, FPS, PRELOAD, VISIBLE, ...).

    The CMC_* constructor reads them from the element: it builds the property-name
    literal and calls CMElement::getProperty<T>Value(...). The extraction anchors on
    any call whose target name starts with "getProperty", takes the most recent
    method-name-like string literal before it as the field name, and derives the field
    type from the getter name. The literal is consumed by the call.

    Returns a flat list of {"owner", "name", "type", "order"} dicts, deduplicated per
    (owner, name) with the first occurrence kept, sorted by (owner, order).
    """
    listing = program.getListing()
    seen = {}      # (owner, name) -> record
    counters = {}  # owner -> next order index
    functions = program.getFunctionManager().getFunctions(True)
    while functions.hasNext():
        ctor = functions.next()
        if not _is_cmc_ctor(ctor):
            continue
        owner = ctor.getParentNamespace().getName()
        field_name = None
        walker = listing.getInstructions(resolve_thunk(ctor).getBody(), True)
        while walker.hasNext():
            instr = walker.next()
            literal = _imm_string(program, instr)
            if literal is not None and _is_method_name(literal):
                field_name = literal
            if instr.getMnemonicString() != "CALL":
                continue
            callee = call_target(program, instr)
            getter = None if callee is None else callee.getName()
            if getter is None or not getter.startswith("getProperty") or field_name is None:
                continue
            key = (owner, field_name)
            if key not in seen:
                position = counters.get(owner, 0)
                seen[key] = {"owner": owner, "name": field_name,
                             "type": _prop_type(getter), "order": position}
                counters[owner] = position + 1
            field_name = None
    return sorted(seen.values(), key=lambda r: (r["owner"], r["order"]))
|
|
|
|
|
|
def extract_struct_layout(program):
    """Bonus/fuzzy axis: per CMC_* constructor, recover the this-relative C++ field stores
    (offset, size, is_vtable) and the base-class link. Offsets come from decompiler
    P-code, so they are normalised across compilers.

    Returns (layout, field_inheritance). `layout` is deduplicated per (owner, offset)
    across overloaded constructors, keeping the widest store, and sorted by
    (owner, offset). `field_inheritance` lists {"class", "base_class"} pairs, using the
    first base found for each class name. The decompiler interface is disposed on exit.
    """
    decompiler = DecompInterface()
    decompiler.openProgram(program)
    try:
        widest = {}  # (owner, offset) -> widest field record seen
        bases = {}   # class name -> base class name
        functions = program.getFunctionManager().getFunctions(True)
        while functions.hasNext():
            ctor = functions.next()
            if not _is_cmc_ctor(ctor):
                continue
            stores, base = _extract_fields_from(program, decompiler, ctor)
            for rec in stores:
                key = (rec["owner"], rec["offset"])
                best = widest.get(key)
                if best is None or rec["size"] > best["size"]:
                    widest[key] = rec
            if base is not None:
                bases.setdefault(ctor.getName(), base)
        fields = sorted(widest.values(), key=lambda r: (r["owner"], r["offset"]))
        field_inheritance = [{"class": cls, "base_class": parent}
                             for cls, parent in bases.items()]
        return fields, field_inheritance
    finally:
        decompiler.dispose()
|
|
|
|
|
|
# --------------------------------------------------------------------------- metadata
|
|
|
|
def detect_engine(factory):
    """Return (engine, compiler) judged from the factory's enclosing namespace.

    If the namespace that contains the factory's class is named "BlooMooDLL" the result
    is ("BlooMoo", "MSVC8"); in every other case ("Piklib", "MSVC6"). The compiler
    value is a heuristic tied to the engine, not measured.
    """
    scope = factory.getParentNamespace()
    outer = None if scope is None else scope.getParentNamespace()
    is_bloomoo = outer is not None and outer.getName() == "BlooMooDLL"
    return ("BlooMoo", "MSVC8") if is_bloomoo else ("Piklib", "MSVC6")
|
|
|
|
|
|
def default_out_path(program):
    """Return the default snapshot path, `<program name>.snapshot.json`.

    The file goes into <repo>/snapshots/, where <repo> is the parent of the directory
    holding this script; the directory is created if missing. If that cannot be worked
    out or created, the OS temp directory is used instead.
    """
    name = program.getName() + ".snapshot.json"
    try:
        script_path = getSourceFile().getAbsolutePath()  # .../ghidra_scripts/<script>
        repo_root = os.path.dirname(os.path.dirname(script_path))
        out_dir = os.path.join(repo_root, "snapshots")
        if not os.path.isdir(out_dir):
            os.makedirs(out_dir)
        return os.path.join(out_dir, name)
    except Exception:
        return os.path.join(tempfile.gettempdir(), name)
|
|
|
|
|
|
def sha256_of(program):
    """Return the hex SHA-256 of the file at the program's executable path, or None if
    the path cannot be opened or read."""
    try:
        with open(program.getExecutablePath(), "rb") as fh:
            return hashlib.sha256(fh.read()).hexdigest()
    except Exception:
        return None
|
|
|
|
|
|
# --------------------------------------------------------------------------- main
|
|
|
|
def run():
    """Script entry point: extract every axis from `currentProgram` and write the snapshot.

    Locates CMC_ObjectsContainer::resolve (printing a message and returning if absent),
    runs all extractors, then writes the snapshot as indented, key-sorted JSON. The
    output path is the first script argument if given, else `default_out_path`. A
    one-line summary is printed at the end.
    """
    program = currentProgram  # GhidraScript/pyghidra inject this global, not `program`
    factory = find_function_by_qualified(program, "CMC_ObjectsContainer", "resolve")
    if factory is None:
        print("[!] CMC_ObjectsContainer::resolve not found - is this a Piklib/BlooMoo DLL?")
        return

    # The namespace lives on the symbol/stub, so detect the engine before resolving.
    engine, compiler = detect_engine(factory)
    factory = resolve_thunk(factory)  # MSVC8 exposes the symbol as a JMP stub

    types = extract_types(program, factory)
    methods, method_inheritance = extract_methods(program)
    events = extract_events(program)
    fields = extract_script_fields(program)
    struct_layout, field_inheritance = extract_struct_layout(program)
    method_dispatch = extract_method_dispatch(program)

    binary = {
        "name": program.getName(),
        "sha256": sha256_of(program),
        "engine": engine,
        "compiler": compiler,
        "factory_addr": "0x%x" % factory.getEntryPoint().getOffset(),
    }
    snapshot = {
        "schema_version": 4,
        "binary": binary,
        "types": types,
        "methods": methods,
        "method_inheritance": method_inheritance,
        "events": events,
        "fields": fields,
        "field_inheritance": field_inheritance,
        "struct_layout": struct_layout,
        "method_dispatch": method_dispatch,
    }

    args = getScriptArgs()
    out_path = args[0] if len(args) > 0 else default_out_path(program)
    with open(out_path, "w") as fh:
        fh.write(json.dumps(snapshot, indent=2, sort_keys=True))

    summary = (program.getName(), engine, compiler, len(types), len(methods),
               len(events), len(fields), len(struct_layout), len(method_dispatch), out_path)
    print("[+] %s [%s/%s]: %d types, %d methods, %d events, %d fields (%d layout, %d dispatch) -> %s" % summary)


run()
|