Files
Aidem-Media-DLL-Analysis/ghidra_scripts/extract_engine_surface.py
Patryk Gensch 67cbc32a2c Support Piklib 6.1/7.1: CMC_Scene::resolve factory + tag-based types
Earlier text-script engines (Piklib 6.1/7.1, added text scripts in 6.1) keep the
type factory on CMC_Scene::resolve, not CMC_ObjectsContainer::resolve — so the
extractor bailed with "resolve not found". find_factory() now tries both anchors.

6.1's factory is also tag-based: each branch is operator==(NAME) -> new(0x74) ->
store tag -> jmp, with the ctor in a separate tag switch (no inline ctor). extract_types
gains a pre-emit: when the next operator== arrives still armed, it records the pending
type by name (size known, ctor/cpp_class not). The 8.x inline-ctor factory clears `armed`
first, so it's untouched (golden pair unchanged).

Per-version reality: 6.1 = 23 types / 0 methods (no prepareMthHashSet yet) / 103 events
/ 80 fields; 7.1 = 26 / 322 / 102 / 86 / 288 dispatch (full); type names line up across
6.1->7.1->8.x so version diffs work.

- snapshots/PIKLib61 + PIKLIB71 added as golden fixtures (evolution chain)
- tests/test_versions.py: 6.1 partial surface, 7.1 full, 61->71 diff -> 38/38

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-05-31 19:53:47 +02:00

923 lines
37 KiB
Python

# Extract the "engine surface" (types/methods/events/fields) from a Piklib/BlooMoo
# engine DLL and emit a snapshot.json for cross-version diffing.
#
# Runs as a Ghidra **headless** post-script. Compatible with both pyghidra (CPython 3)
# and the bundled Jython 2.7, so it avoids f-strings and py3-only APIs.
#
# Usage (headless):
# analyzeHeadless <projDir> <projName> -process PIKLIB8.dll \
# -postScript extract_engine_surface.py /abs/path/out.snapshot.json
#
# Design note: extraction stands on SEMANTIC ANCHORS (call targets, referenced string
# constants, push immediates), never on decompiled-C text. That is what makes the same
# script work across MSVC6 (Piklib) and MSVC8 (BlooMoo) despite very different codegen.
#
# @category AidemMedia
from __future__ import print_function
import json
import hashlib
import os
import re
import tempfile
from ghidra.app.decompiler import DecompInterface
from ghidra.program.model.pcode import PcodeOp
# Names of the helper functions the factory dispatch relies on. These survive demangling
# identically on both compilers.
OP_NEW = "operator_new"  # callee name that arms constructor detection in extract_types
OP_EQ = "operator=="  # callee name of the type-name comparison that opens a factory branch
CMC_PREFIX = "CMC_"  # name prefix used to recognise engine classes and their constructors
# How many preceding instructions to inspect when recovering a PUSH argument.
LOOKBACK = 8
# --------------------------------------------------------------------------- helpers
def find_function_by_qualified(program, class_name, method_name):
    """Return the first function named `method_name` whose direct parent namespace is named
    `class_name`, scanning the program's functions in forward order; None if there is none."""
    for candidate in program.getFunctionManager().getFunctions(True):
        if candidate.getName() != method_name:
            continue
        parent = candidate.getParentNamespace()
        if parent is None:
            continue
        if parent.getName() == class_name:
            return candidate
    return None
def call_target(program, instr):
    """Return the function that starts at one of `instr`'s flow destinations (the first
    destination that has one), or None when no destination is a function entry."""
    function_at = program.getFunctionManager().getFunctionAt
    for destination in instr.getFlows():
        callee = function_at(destination)
        if callee is not None:
            return callee
    return None
def resolve_thunk(func):
    """Follow an incremental-linking (ILT) stub to the function that holds the real body.

    Handles two shapes: a function Ghidra already flags as a thunk (its thunked function is
    returned), and a plain function whose first instruction is an unconditional jump with
    exactly one destination that is itself a function entry (that function is returned).
    In every other case, including `func is None`, `func` is returned unchanged. Looks the
    instruction up through the script-global `currentProgram`."""
    if func is None:
        return func
    if func.isThunk():
        return func.getThunkedFunction(True)
    first = currentProgram.getListing().getInstructionAt(func.getEntryPoint())
    if first is None:
        return func
    flow = first.getFlowType()
    if not flow.isJump() or flow.isConditional():
        return func
    destinations = first.getFlows()
    if len(destinations) != 1:
        return func
    body = currentProgram.getFunctionManager().getFunctionAt(destinations[0])
    if body is None:
        return func
    return body
def _read_cstring(program, addr, maxlen=64):
    """Read printable ASCII bytes from memory starting at `addr`, stopping at a NUL byte or
    after `maxlen` bytes. Used for type-name literals (e.g. "BOOL", "FONT") that are
    referenced but never defined as Data, so getDataAt() has nothing for them.

    Returns the collected text, or None when a byte cannot be read, a byte falls outside
    0x20..0x7e, or no character was collected at all."""
    memory = program.getMemory()
    collected = []
    for offset in range(maxlen):
        try:
            byte = memory.getByte(addr.add(offset)) & 0xff
        except:  # noqa: E722 - Java MemoryAccessException isn't a Python Exception under Jython
            return None
        if byte == 0:
            break
        if not (0x20 <= byte <= 0x7e):
            return None  # not a clean type-name literal
        collected.append(chr(byte))
    if not collected:
        return None
    return "".join(collected)
def _string_at(program, addr):
    """Return the string located at `addr`, or None.

    Non-memory addresses (stack/register/constant refs, e.g. inline CXString stack buffers)
    and None are rejected up front. A defined Data item with a string value wins; otherwise
    the raw bytes are read as a C string so undefined literals are still recovered."""
    if addr is None:
        return None
    if not addr.isMemoryAddress():
        return None
    data = program.getListing().getDataAt(addr)
    if data is None or not data.hasStringValue():
        return _read_cstring(program, addr)
    return str(data.getValue())
def lookback_string(program, recent):
    """Most recent PUSH that references (or immediately points at) a string constant.

    `recent` is scanned newest-first. For each PUSH the explicit references are tried first;
    if none yields a string, operand 0's immediate is treated as an address in the default
    address space. Returns the first string found, or None."""
    space = program.getAddressFactory().getDefaultAddressSpace()
    for instr in reversed(recent):
        if instr.getMnemonicString() != "PUSH":
            continue
        for ref in instr.getReferencesFrom():
            s = _string_at(program, ref.getToAddress())
            if s is not None:
                return s
        sc = instr.getScalar(0)  # fallback: treat the immediate as an address
        if sc is None:
            continue
        try:
            s = _string_at(program, space.getAddress(sc.getUnsignedValue()))
        except:  # noqa: E722 - a Java exception isn't a Python Exception under Jython
            # An immediate that is not a valid address just means "no string here"; keep
            # scanning older PUSHes instead of aborting the whole extraction.
            continue
        if s is not None:
            return s
    return None
def lookback_scalar(recent):
    """Value of the newest PUSH in `recent` whose operand 0 is an immediate and which carries
    no outgoing references (the operator_new size), as an int; None when there is none."""
    for instr in reversed(recent):
        if instr.getMnemonicString() != "PUSH":
            continue
        immediate = instr.getScalar(0)
        if immediate is None or instr.getReferencesFrom():
            continue
        return int(immediate.getUnsignedValue())
    return None
# Matches an object-field load `[<reg> + 0xNN]` whose base is a general register (EAX, EBX,
# ECX, EDX, ESI, EDI) - deliberately not ESP/EBP, so stack accesses do not count. The
# module-interface dispatch branches fetch their container from a field of `this` this way,
# while the direct branches pass `this` straight from a register. The offset itself is not
# inspected, so the test survives different compilers/struct layouts.
_FIELD_LOAD = re.compile(r"\[E(?:AX|BX|CX|DX|SI|DI) \+ 0x[0-9a-fA-F]+\]")


def _branch_uses_field_load(branch):
    """True if any instruction in `branch` textually contains a `_FIELD_LOAD` operand."""
    return any(_FIELD_LOAD.search(instr.toString()) for instr in branch)
def _imm_string(program, instr):
    """Return the string addressed by the first operand immediate of `instr` that points at
    one, or None.

    Method names are loaded as `MOV reg, <strptr>` and the immediate is the true start of the
    string. Going through references instead breaks under MSVC6, whose inline strcpy
    references string+1 (first char handled separately), truncating the name by one char."""
    space = program.getAddressFactory().getDefaultAddressSpace()
    for index in range(instr.getNumOperands()):
        immediate = instr.getScalar(index)
        if immediate is None:
            continue
        try:
            text = _string_at(program, space.getAddress(immediate.getUnsignedValue()))
        except:  # noqa: E722
            continue
        if text is not None:
            return text
    return None
# --------------------------------------------------------------------------- extractors
def _type_record(script_name, object_size, branch, cpp_class=None, ctor_addr=None,
                 dispatch_addr=None):
    """Build one `types` entry. The constructor-related keys stay None for a tag-based branch,
    where only the script name and allocation size are known."""
    return {
        "script_name": script_name,
        "cpp_class": cpp_class,
        "ctor_addr": ctor_addr,
        "object_size": object_size,
        "dispatch_addr": dispatch_addr,
        "via_module_iface": _branch_uses_field_load(branch),
    }


def extract_types(program, factory):
    """Walk the type factory (CMC_ObjectsContainer::resolve, or CMC_Scene::resolve on
    Piklib 6.1/7.1), recovering the type-dispatch ladder.

    Inline-ctor factory - per branch the engine emits, in order:
        CALL operator==(typeStr, "NAME")  -> script type name
        CALL operator_new(SIZE)           -> object allocation size
        CALL <ctor>(...)                  -> the next call IS the constructor
    The constructor is detected *structurally* (first CALL after operator_new), not by name:
    several ctors are unnamed FUN_xxxx (e.g. CMC_Text, CMC_Movie) and name-gating dropped them.

    Tag-based factory - a branch is `operator==(NAME) -> new(SIZE) -> store tag -> jmp` with
    no inline ctor, so a branch that is still armed when the next operator== arrives (or when
    the walk ends) is recorded by name and size only.

    Returns a list of dicts with the keys script_name, cpp_class, ctor_addr, object_size,
    dispatch_addr and via_module_iface."""
    listing = program.getListing()
    types = []
    recent = []           # sliding window of the last LOOKBACK instructions
    branch = []           # instructions since the current branch's operator== match
    pending_name = None
    pending_size = None
    armed = False         # set by operator_new; the next CALL is the object's constructor
    it = listing.getInstructions(factory.getBody(), True)
    while it.hasNext():
        instr = it.next()
        if instr.getMnemonicString() == "CALL":
            f = call_target(program, instr)
            tname = f.getName() if f is not None else None
            if tname == OP_EQ:
                # Reaching the *next* operator== while still armed means no inline ctor
                # fired: record the pending type by name (size known, ctor not). The
                # inline-ctor factory clears `armed` first, so it never takes this path.
                if armed and pending_name is not None:
                    types.append(_type_record(pending_name, pending_size, branch))
                    pending_name = None
                    pending_size = None
                # A new comparison always starts un-armed, so a stale allocation can never
                # be paired with the name recovered below.
                armed = False
                s = lookback_string(program, recent)
                if s is not None:
                    pending_name = s
                    branch = []
            elif tname == OP_NEW:
                pending_size = lookback_scalar(recent)
                armed = True
            elif armed:
                if pending_name is not None and f is not None:
                    ctor = resolve_thunk(f)
                    cls = f.getName()
                    types.append(_type_record(
                        pending_name, pending_size, branch,
                        cpp_class=cls if cls.startswith(CMC_PREFIX) else None,
                        ctor_addr="0x%x" % ctor.getEntryPoint().getOffset(),
                        dispatch_addr="0x%x" % instr.getAddress().getOffset()))
                pending_name = None
                pending_size = None
                armed = False
        recent.append(instr)
        branch.append(instr)
        if len(recent) > LOOKBACK:
            recent.pop(0)
    # The last tag-based branch has no following operator== to trigger the pre-emit above;
    # flush it here so it is not silently dropped.
    if armed and pending_name is not None:
        types.append(_type_record(pending_name, pending_size, branch))
    return types
def _owner_from_runner(runner_name):
    """Strip a trailing "_Runner" from a runner class name; other names pass through.
    CMC_Animo_Runner -> CMC_Animo ; CMC_Runner -> CMC (the base that holds global methods)."""
    suffix = "_Runner"
    if not runner_name.endswith(suffix):
        return runner_name
    return runner_name[:len(runner_name) - len(suffix)]
def _is_method_name(s):
    """True for a non-empty string made only of alphanumeric characters and underscores."""
    if not s:
        return False
    for ch in s:
        if ch != "_" and not ch.isalnum():
            return False
    return True
def _extract_methods_from(program, runner_func):
    """Recover the methods one CMC_*_Runner::prepareMthHashSet registers directly.

    Per method the engine emits: new CInteger(ID) ; new CStringHashCode("NAME") ;
    CHashtable::put. The name is *loaded* (MOV), not pushed, so the newest method-like string
    immediate seen on any instruction is tracked instead of scanning PUSH operands. The first
    call to another prepareMthHashSet whose namespace is known gives the inheritance link.

    Returns (methods, runner, base_runner): `methods` is a list of
    {"owner", "runner", "name", "id"} dicts, `runner` is the enclosing namespace name ("?" if
    there is none) and `base_runner` is the chained-to runner name or None."""
    parent = runner_func.getParentNamespace()
    runner = "?" if parent is None else parent.getName()
    owner = _owner_from_runner(runner)
    registered = []
    base_runner = None
    newest_literal = None   # latest method-like string immediate
    newest_id = None        # latest id pushed to a CInteger constructor
    name_for_put = None     # name captured at CStringHashCode, consumed by CHashtable::put
    window = []             # last LOOKBACK instructions, for recovering the pushed id
    body = resolve_thunk(runner_func).getBody()
    for instr in program.getListing().getInstructions(body, True):
        literal = _imm_string(program, instr)
        if literal is not None and _is_method_name(literal):
            newest_literal = literal
        if instr.getMnemonicString() == "CALL":
            callee = call_target(program, instr)
            if callee is None:
                callee_name = None
                callee_ns = None
            else:
                callee_name = callee.getName()
                callee_ns = callee.getParentNamespace()
            ns_name = None if callee_ns is None else callee_ns.getName()
            if callee_name == "prepareMthHashSet":
                if base_runner is None:
                    base_runner = ns_name
            elif callee_name == "CInteger":
                newest_id = lookback_scalar(window)
            elif callee_name == "CStringHashCode":
                name_for_put = newest_literal
            elif callee_name == "put" and ns_name == "CHashtable":
                if name_for_put is not None:
                    registered.append({"owner": owner, "runner": runner,
                                       "name": name_for_put, "id": newest_id})
                name_for_put = None
        window.append(instr)
        if len(window) > LOOKBACK:
            del window[0]
    return registered, runner, base_runner
def extract_methods(program):
    """Collect method registrations from every function named prepareMthHashSet.

    Returns (methods, inheritance). `methods` lists each Runner's directly-registered
    methods; `inheritance` holds one {"runner", "base_runner"} dict per runner that chains to
    a base runner, so the consumer can compose the full (incl. global) method set per type.
    Method id -> vtable address correlation is a later step."""
    all_methods = []
    links = []
    for func in program.getFunctionManager().getFunctions(True):
        if func.getName() != "prepareMthHashSet":
            continue
        direct, runner, base = _extract_methods_from(program, func)
        all_methods.extend(direct)
        if base is None:
            continue
        links.append({"runner": runner, "base_runner": base})
    return all_methods, links
# Both patterns capture the hex displacement of a `[<reg> + 0xNN]` memory operand. They are
# textually identical and kept under separate names for their two consumers: _VTBL_OFF is
# applied to an indirect CALL (_call_anchor), _MEM_OFF to a MOV whose destination register
# may be called later (_walk_calls).
_VTBL_OFF = re.compile(r"\[\w+ \+ (0x[0-9a-fA-F]+)\]")
_MEM_OFF = re.compile(r"\[\w+ \+ (0x[0-9a-fA-F]+)\]")
def _is_generic_name(name):
    """True for a placeholder rather than a real symbol: an empty/None name, or one that
    starts with "FUN_", "thunk_" or "LAB_"."""
    if not name:
        return True
    return name.startswith(("FUN_", "thunk_", "LAB_"))
def _executable(program, addr):
    """True if `addr` falls inside a memory block flagged executable - a sanity gate so a
    switch shape we don't model (or a table over-read) can never emit a non-code 'case' as a
    method. An address outside every block is not executable."""
    block = program.getMemory().getBlock(addr)
    if block is None:
        return False
    return block.isExecute()
def _qualified(f):
    """Render a function as "Namespace::name", or just "name" when it has no parent
    namespace or the parent is named "Global". None passes through as None."""
    if f is None:
        return None
    name = f.getName()
    parent = f.getParentNamespace()
    if parent is None or parent.getName() == "Global":
        return name
    return parent.getName() + "::" + name
def _call_anchor(program, instr):
    """A normalisable, compiler-tolerant fingerprint of one CALL inside a switch case.

    Direct call -> "Namespace::name". On MSVC8 the symbol sits on the ILT *stub* while the
    body is an unnamed FUN_, so the stub's own name is preferred and the thunk-resolved body
    is consulted only when the direct name is a placeholder; if that is a placeholder too,
    the direct callee's name is used after all. Indirect virtual call -> "vtbl+0xNN" built
    from the displacement in the operand text, which abstracts away the register holding
    `this`. Returns None when neither form applies."""
    callee = call_target(program, instr)
    if callee is None:
        hit = _VTBL_OFF.search(instr.toString())
        if hit is None:
            return None
        return "vtbl+" + hit.group(1)
    if _is_generic_name(callee.getName()):
        body = resolve_thunk(callee)
        if body is not None and not _is_generic_name(body.getName()):
            return _qualified(body)
    return _qualified(callee)
def _walk_calls(program, start_addr, stops, limit=80):
    """Walk a straight-line block from `start_addr`, returning (anchors, funcs):
    * `anchors` - ordered CALL fingerprints (see `_call_anchor`), additionally recovering the
      `MOV reg,[base+0xNN]` / `CALL reg` virtual-call idiom (MSVC8) as `vtbl+0xNN`, so it
      matches the `CALL [reg+0xNN]` form (MSVC6).
    * `funcs` - the (anchor, entry) of each *direct* call to a real function, used to detect a
      thin wrapper case that just forwards to a named/unnamed submethod.
    Stops at a RET, an unconditional jump, a `stops` address, or after `limit` instructions.

    `stops` is any container supporting `in` on instruction addresses; the starting
    instruction itself is never treated as a stop."""
    listing = program.getListing()
    instr = listing.getInstructionAt(start_addr)
    anchors = []
    funcs = []
    regoff = {}  # register -> vtable offset most recently loaded into it
    n = 0  # instructions visited so far
    while instr is not None and n < limit:
        # `n > 0`: the walk begins on a case target, which is usually itself in `stops`.
        if n > 0 and instr.getAddress() in stops:
            break
        n += 1
        mn = instr.getMnemonicString()
        if mn == "MOV" and instr.getNumOperands() >= 2:
            dst = instr.getDefaultOperandRepresentation(0)
            m = _MEM_OFF.search(instr.toString())
            if m is not None:
                regoff[dst] = m.group(1)
            else:
                # Any other MOV overwrites the register, so a remembered offset is stale.
                regoff.pop(dst, None)
        elif mn == "CALL":
            a = _call_anchor(program, instr)
            if a is None:  # CALL reg -> use the offset last loaded into that register
                op0 = instr.getDefaultOperandRepresentation(0)
                if op0 in regoff:
                    a = "vtbl+" + regoff[op0]
            if a is not None:
                anchors.append(a)
            cf = call_target(program, instr)
            if cf is not None:
                body = resolve_thunk(cf)
                if body is not None:
                    funcs.append((a, body.getEntryPoint()))
        # The terminating instruction is still processed above before the walk ends.
        ft = instr.getFlowType()
        if ft.isTerminal() or (ft.isJump() and not ft.isConditional()):
            break
        instr = instr.getNext()
    return anchors, funcs
# The computed jump of a dense switch: `[<idxreg>*0x4 + <tableaddr>]` (register, table address).
_SWITCH_JMP = re.compile(r"\[(\w+)\*0x4 \+ (0x[0-9a-fA-F]+)\]")
# The `[base + disp]` operand of a LEA, with an optional leading minus on the displacement.
_LEA_DISP = re.compile(r"\[\w+ \+ (-?0x[0-9a-fA-F]+)\]")
# A two-level switch's byte-index-table load: `byte ptr [<boundsreg> + <tableaddr>]`. The 5+ hex
# digits distinguish a table address (0x100xxxxx) from a small struct-field offset.
_BYTE_TABLE = re.compile(r"byte ptr \[(\w+) \+ (0x[0-9a-fA-F]{5,})\]")


def _lea_disp(instr):
    """Displacement of a `LEA reg,[base + disp]` parsed from the instruction text (for when
    getScalar misses it). Returns the hex value as an int, negative only when the text has a
    leading minus, or None when the text has no such operand."""
    hit = _LEA_DISP.search(instr.toString())
    if hit is None:
        return None
    return int(hit.group(1), 16)
def _s32(v):
    """Reinterpret the low 32 bits of `v` as a signed two's-complement integer.

    Ghidra's getScalar() hands back the raw unsigned immediate, so e.g. `ADD idx, 0xFFFFFEFF`
    (really -257, a switch starting at id 257) must be sign-extended or the recovered ids
    overflow to nonsense."""
    low = int(v) & 0xffffffff
    if low & 0x80000000:
        return low - 0x100000000
    return low
def _parse_switch(program, func):
    """Recover the dense jump-table switch of a `run` function at the disassembly level
    (decompiler-independent, so it survives the big inline-heavy runners). Both MSVC6 and
    MSVC8 emit the same shape:
        LEA idx,[reg - base] ; CMP idx, range ; JA default ; JMP [idx*4 + TABLE]
    Returns {table, base, count, default, byte_table} or None when no computed
    `JMP [reg*4 + TABLE]` is found. `id = table_index + base`; `count = range + 1` (None if no
    CMP on the bounds register was seen). `byte_table` is the address of the byte index table
    of a two-level switch, else None.
    `default` is the out-of-range target of the `JA default` bounds check - the jump table also
    routes its *holes* (ids the runner doesn't implement) there, so cases pointing at it are not
    real methods and must be dropped."""
    listing = program.getListing()
    space = program.getAddressFactory().getDefaultAddressSpace()
    # Materialise the body once: it is scanned three times below.
    instrs = []
    it = listing.getInstructions(func.getBody(), True)
    while it.hasNext():
        instrs.append(it.next())
    # Pass 1: locate the computed JMP, remembering the last conditional jump seen before it.
    idx_reg = table = jmp_addr = default = None
    for instr in instrs:
        ft = instr.getFlowType()
        if ft.isJump() and ft.isConditional():  # the `JA default` bounds check (last one wins)
            flows = instr.getFlows()
            if len(flows) > 0:
                default = flows[0]
        if instr.getMnemonicString() == "JMP" and ft.isComputed():
            m = _SWITCH_JMP.search(instr.toString())
            if m is not None:
                idx_reg = m.group(1)
                table = space.getAddress(int(m.group(2), 16))
                jmp_addr = instr.getAddress()
                break
    if table is None:
        return None
    # Two-level switch: the index is itself looked up in a byte table - `MOVZX r, byte[i+bt]`
    # (MSVC8) or `XOR r,r; MOV rl, byte[i+bt]` (MSVC6). Targets are ptrTable[byteTable[i]]. The
    # *bounds* register (the `i` indexing the byte table) is what LEA/CMP constrain - which on
    # MSVC6 differs from the JMP's index register - so recover it from the byte-table load.
    # Pass 2: first byte-table load that precedes the JMP, if any.
    byte_table = None
    bounds_reg = idx_reg
    for instr in instrs:
        if instr.getAddress().equals(jmp_addr):
            break
        mbt = _BYTE_TABLE.search(instr.toString())
        if mbt is not None:
            bounds_reg = mbt.group(1)
            byte_table = space.getAddress(int(mbt.group(2), 16))
            break
    # Pass 3: among instructions before the JMP whose first operand is the bounds register,
    # read the range from CMP and the id bias from LEA/SUB/ADD/DEC. Each match overwrites the
    # previous value (nothing is accumulated), so the last such instruction wins.
    base = 0
    count = None
    for instr in instrs:
        if instr.getAddress().equals(jmp_addr):
            break
        if instr.getNumOperands() == 0 or instr.getDefaultOperandRepresentation(0) != bounds_reg:
            continue
        mn = instr.getMnemonicString()
        s = instr.getScalar(1)
        if mn == "CMP" and s is not None:
            count = _s32(s.getValue()) + 1
        elif mn == "LEA":  # LEA idx,[reg - k] -> id = index + k
            raw = s.getValue() if s is not None else _lea_disp(instr)
            if raw is not None:    # _s32 also fixes the text path: Ghidra prints a
                base = -_s32(raw)  # big displacement unsigned ("0xfffffeff" = -257)
        elif mn == "SUB" and s is not None:
            base = _s32(s.getValue())
        elif mn == "ADD" and s is not None:
            base = -_s32(s.getValue())
        elif mn == "DEC":  # idx = id - 1 -> id = index + 1
            base = 1
    return {"table": table, "base": base, "count": count, "default": default,
            "byte_table": byte_table}
def extract_method_dispatch(program):
    """For each CMC_*_Runner::run, recover how method ids map to their implementation.

    `run(int id, ...)` is a `switch(id)` (vtable slot 17, overridden per runner) whose every
    `case id:` is the method body - either a tail-call to a named submethod (BlooMoo/MSVC8
    keeps show()/load()/... as separate functions) or inline code whose leaves are virtual
    calls on the wrapped object (Piklib/MSVC6). We fingerprint each case by its ordered CALL
    anchors, so a later pass can diff method *bodies* by (owner, id). Join names via `methods`.

    Only functions named `run` whose parent namespace name ends in "_Runner" are considered.
    A runner whose analysis fails is reported on stdout and skipped. Returns the concatenated
    rows produced by `_dispatch_from_run`."""
    import sys  # local import: only needed to fetch the in-flight exception below

    out = []
    for f in program.getFunctionManager().getFunctions(True):
        if f.getName() != "run":
            continue
        ns = f.getParentNamespace()
        runner = ns.getName() if ns is not None else "?"
        if not runner.endswith("_Runner"):
            continue
        try:
            out.extend(_dispatch_from_run(program, f, _owner_from_runner(runner), runner))
        except (KeyboardInterrupt, SystemExit):
            raise
        except:  # noqa: E722 - Java exceptions aren't Python Exceptions under Jython
            # one malformed runner shouldn't sink the whole axis
            print("[!] method_dispatch %s: %s" % (runner, sys.exc_info()[1]))
    return out
def _dispatch_from_run(program, run_func, owner, runner):
    """Turn one runner's `run` switch into per-id dispatch rows.

    Reads the jump table described by `_parse_switch` (through the byte index table for a
    two-level switch) and emits one row per case that targets executable code other than the
    switch default. Each row has the keys owner, runner, id, case_addr, impl, impl_addr and
    calls. Returns [] when no switch is recovered or its case count is missing or outside
    1..4096; a table entry that cannot be read ends the table early."""
    run_func = resolve_thunk(run_func)
    sw = _parse_switch(program, run_func)
    if sw is None:
        return []
    count = sw["count"]
    if count is None or count < 1 or count > 4096:
        return []
    mem = program.getMemory()
    space = program.getAddressFactory().getDefaultAddressSpace()
    ptr = sw["table"]
    byte_table = sw.get("byte_table")
    targets = []
    for i in range(count):
        try:
            # Two-level switch: index the pointer table through the byte index table.
            slot = (mem.getByte(byte_table.add(i)) & 0xff) if byte_table is not None else i
            val = mem.getInt(ptr.add(slot * 4)) & 0xffffffff
        except:  # noqa: E722 - Java MemoryAccessException isn't a Python Exception under Jython
            break
        targets.append(space.getAddress(val))
    # Every code-bearing case start bounds the walk of the case before it.
    stops = set(t for t in targets if _executable(program, t))
    default = sw.get("default")
    rows = []
    for i, target in enumerate(targets):
        mid = i + sw["base"]
        if mid < 0 or mid > 0xffff:
            continue  # nonsensical id -> the switch base wasn't recovered cleanly; don't emit garbage
        if not _executable(program, target):
            continue  # target isn't code (unsupported switch shape / over-read) -> skip, never emit garbage
        if default is not None and target.equals(default):
            continue  # a switch hole (unimplemented id) routed to the base-runner default
        anchors, funcs = _walk_calls(program, target, stops)
        # A thin wrapper case forwards to one submethod: the real body (and its leaf anchors)
        # live in that function. Expanding one level makes MSVC8 (separate show()/load()) line up
        # with MSVC6 (inline), so `calls` is a compiler-tolerant body fingerprint.
        if len(anchors) == 1 and len(funcs) == 1 and funcs[0][0] == anchors[0]:
            impl = funcs[0][0]
            impl_entry = funcs[0][1]
            impl_addr = "0x%x" % impl_entry.getOffset()
            calls, _ = _walk_calls(program, impl_entry, set())
        else:
            impl = None
            impl_addr = "0x%x" % target.getOffset()  # body is inline in the case block
            calls = anchors
        rows.append({
            "owner": owner, "runner": runner, "id": mid,
            "case_addr": "0x%x" % target.getOffset(),
            "impl": impl, "impl_addr": impl_addr, "calls": calls,
        })
    return rows
def extract_events(program):
    """Per function named getBehavioursList, collect the ordered event-name literals
    (ONINIT, ONDONE, ...) as {"owner", "name", "order"} dicts.

    The function builds a flat CXString[] with each name inlined; there is no base-class call,
    so each class's list is self-contained (no inheritance chain, unlike methods). Each
    literal is loaded twice per entry (strlen + memcpy), so a literal equal to the one
    recorded just before it is skipped. `order` counts from 0 within each owner."""
    listing = program.getListing()
    collected = []
    for func in program.getFunctionManager().getFunctions(True):
        if func.getName() != "getBehavioursList":
            continue
        parent = func.getParentNamespace()
        owner = "?" if parent is None else parent.getName()
        previous = None
        position = 0
        for instr in listing.getInstructions(resolve_thunk(func).getBody(), True):
            literal = _imm_string(program, instr)
            if literal is None or literal == previous:
                continue
            if not _is_method_name(literal):
                continue
            collected.append({"owner": owner, "name": literal, "order": position})
            position += 1
            previous = literal
    return collected
def _is_cmc_ctor(func):
    """True if `func` looks like an engine-class constructor: it has a parent namespace, is
    named exactly like that namespace, and the name starts with CMC_PREFIX."""
    parent = func.getParentNamespace()
    if parent is None:
        return False
    name = func.getName()
    return name == parent.getName() and name.startswith(CMC_PREFIX)
def _this_varnodes(high):
    """Set of varnode instances belonging to the `this` parameter, i.e. the parameter symbol
    with category index 0. Symbols without a high variable contribute nothing."""
    found = set()
    for symbol in high.getLocalSymbolMap().getSymbols():
        if not symbol.isParameter() or symbol.getCategoryIndex() != 0:
            continue
        variable = symbol.getHighVariable()
        if variable is None:
            continue
        found.update(variable.getInstances())
    return found
def _trace_this_offset(vn, this_vns, depth=0):
    """If `vn` is `this + constant`, return the constant; else None. Walks the def chain through
    the address arithmetic the decompiler emits (this abstracts away which register held `this`).

    `this_vns` is the set of varnodes standing for `this`; a direct hit yields offset 0.
    `depth` is the recursion guard: chains longer than 12 steps give up with None."""
    if vn is None or depth > 12:
        return None
    if vn in this_vns:
        return 0
    d = vn.getDef()
    if d is None:
        return None  # no defining op to look through
    op = d.getOpcode()
    ins = d.getInputs()
    # NOTE(review): the constants below are taken from getOffset() as-is; a negative
    # displacement presumably arrives as a large unsigned value and would then pass the
    # caller's `off < 0` filter - confirm whether sign extension is needed here.
    if op in (PcodeOp.INT_ADD, PcodeOp.PTRADD):
        a, b = ins[0], ins[1]
        if b.isConstant():
            base = _trace_this_offset(a, this_vns, depth + 1)
            if base is not None:
                # PTRADD carries a third input that scales the constant; INT_ADD does not.
                step = ins[2].getOffset() if (op == PcodeOp.PTRADD and len(ins) > 2) else 1
                return base + b.getOffset() * step
        if a.isConstant():
            # Constant on the left-hand side: added unscaled.
            base = _trace_this_offset(b, this_vns, depth + 1)
            if base is not None:
                return base + a.getOffset()
    elif op == PcodeOp.PTRSUB:
        b = ins[1]
        if b.isConstant():
            base = _trace_this_offset(ins[0], this_vns, depth + 1)
            if base is not None:
                return base + b.getOffset()
    elif op in (PcodeOp.COPY, PcodeOp.CAST, PcodeOp.INT_ZEXT, PcodeOp.INT_SEXT):
        # Value-preserving wrappers: look straight through to the single input.
        return _trace_this_offset(ins[0], this_vns, depth + 1)
    return None
def _is_vtable_value(program, vn):
    """True if `vn` is a constant whose value, read as an address in the default address
    space, carries a primary symbol with "vftable" in its name (case-insensitive). A constant
    that cannot be converted to an address counts as not-a-vtable."""
    if not vn.isConstant():
        return False
    try:
        space = program.getAddressFactory().getDefaultAddressSpace()
        target = space.getAddress(vn.getOffset())
    except:  # noqa: E722
        return False
    symbol = program.getSymbolTable().getPrimarySymbol(target)
    if symbol is None:
        return False
    return "vftable" in symbol.getName().lower()
def _base_ctor(program, ctor):
    """Name of the direct base class, taken as the first CALL in the constructor's
    (thunk-resolved) body that targets a CMC_* constructor with a different name than `ctor`
    itself. Returns None when the body has no such call."""
    own_name = ctor.getName()
    body = resolve_thunk(ctor).getBody()
    for instr in program.getListing().getInstructions(body, True):
        if instr.getMnemonicString() != "CALL":
            continue
        callee = call_target(program, instr)
        if callee is None or not _is_cmc_ctor(callee):
            continue
        if callee.getName() != own_name:
            return callee.getName()
    return None
def _extract_fields_from(program, ifc, ctor):
    """Recover this-relative STORE offsets from one CMC_* constructor via decompiler P-code.

    Decompiles `ctor` with a 60-second timeout (using the script-global `monitor`) and, for
    every STORE whose address traces back to `this + offset` with a non-negative offset,
    keeps one record per offset - the widest store seen. Returns (fields, base_class) where
    each field is {"owner", "offset", "size", "is_vtable", "confidence"}; returns ([], None)
    when decompilation fails or no `this` varnode is found."""
    result = ifc.decompileFunction(ctor, 60, monitor)
    if result is None or not result.decompileCompleted():
        return [], None
    high = result.getHighFunction()
    if high is None:
        return [], None
    this_vns = _this_varnodes(high)
    if not this_vns:
        return [], None
    owner = ctor.getParentNamespace().getName()
    widest = {}  # offset -> widest store record seen so far
    for op in high.getPcodeOps():
        if op.getOpcode() != PcodeOp.STORE:
            continue
        offset = _trace_this_offset(op.getInput(1), this_vns)
        if offset is None or offset < 0:
            continue
        stored = op.getInput(2)
        record = {"owner": owner, "offset": offset, "size": stored.getSize(),
                  "is_vtable": _is_vtable_value(program, stored), "confidence": "high"}
        known = widest.get(offset)
        if known is not None and record["size"] <= known["size"]:
            continue
        widest[offset] = record
    return list(widest.values()), _base_ctor(program, ctor)
# (substring of the getter name, script field type), tried in this order; first hit wins.
_PROP_TYPE = [("Bool", "bool"), ("Int", "int"), ("Double", "double"), ("Float", "double"),
              ("List", "list"), ("Point", "point"), ("Size", "size"), ("Rect", "rect")]


def _prop_type(getter):
    """Map a CMElement::getProperty<T>Value getter name to the script field type; a name that
    matches no entry of _PROP_TYPE (plain getPropertyValue) maps to "string"."""
    matches = (field_type for needle, field_type in _PROP_TYPE if needle in getter)
    return next(matches, "string")
def extract_script_fields(program):
    """The script-visible named fields each type exposes (FILENAME, FPS, PRELOAD, VISIBLE, ...).

    The CMC_* constructor reads them from the element: it builds the property-name literal
    and calls CMElement::getProperty<T>Value(...). Each call to a function whose name starts
    with "getProperty" is the anchor; the newest method-like string literal seen before it is
    the field name (and is consumed by that call), and the getter name gives the field type.
    Returns a flat list of {"owner", "name", "type", "order"} dicts, deduped per
    (owner, name) and sorted by owner, then first-seen order."""
    listing = program.getListing()
    records = {}   # (owner, name) -> record
    counters = {}  # owner -> next order index
    for func in program.getFunctionManager().getFunctions(True):
        if not _is_cmc_ctor(func):
            continue
        owner = func.getParentNamespace().getName()
        literal = None
        for instr in listing.getInstructions(resolve_thunk(func).getBody(), True):
            text = _imm_string(program, instr)
            if text is not None and _is_method_name(text):
                literal = text
            if instr.getMnemonicString() != "CALL":
                continue
            callee = call_target(program, instr)
            getter = None if callee is None else callee.getName()
            if getter is None or literal is None:
                continue
            if not getter.startswith("getProperty"):
                continue
            key = (owner, literal)
            if key not in records:
                position = counters.get(owner, 0)
                records[key] = {"owner": owner, "name": literal,
                                "type": _prop_type(getter), "order": position}
                counters[owner] = position + 1
            literal = None
    return sorted(records.values(), key=lambda rec: (rec["owner"], rec["order"]))
def extract_struct_layout(program):
    """Bonus/fuzzy axis: per CMC_* constructor, recover the this-relative C++ field stores
    (offset, size, is_vtable) and the base-class link.

    Offsets come from decompiler P-code, so they are normalised across compilers. Records
    are deduped per (owner, offset) across overloaded ctors, keeping the widest store; the
    first base class found for a class name is kept. The decompiler interface is always
    disposed. Returns (layout, field_inheritance), with `layout` sorted by owner and offset."""
    fm = program.getFunctionManager()
    decompiler = DecompInterface()
    decompiler.openProgram(program)
    try:
        widest = {}  # (owner, offset) -> field record
        bases = {}   # class -> base_class
        for func in fm.getFunctions(True):
            if not _is_cmc_ctor(func):
                continue
            records, base = _extract_fields_from(program, decompiler, func)
            for record in records:
                key = (record["owner"], record["offset"])
                previous = widest.get(key)
                if previous is None or record["size"] > previous["size"]:
                    widest[key] = record
            if base is not None:
                bases.setdefault(func.getName(), base)
        layout = sorted(widest.values(), key=lambda rec: (rec["owner"], rec["offset"]))
        field_inheritance = [{"class": cls, "base_class": base_cls}
                             for cls, base_cls in bases.items()]
        return layout, field_inheritance
    finally:
        decompiler.dispose()
# --------------------------------------------------------------------------- metadata
def detect_engine(factory):
    """(engine, compiler) guessed from the factory's namespace: ("BlooMoo", "MSVC8") when the
    namespace enclosing the factory's class is named "BlooMooDLL", otherwise
    ("Piklib", "MSVC6"). The compiler part is a heuristic."""
    owner = factory.getParentNamespace()
    outer = None if owner is None else owner.getParentNamespace()
    if outer is None or outer.getName() != "BlooMooDLL":
        return ("Piklib", "MSVC6")
    return ("BlooMoo", "MSVC8")
def default_out_path(program):
    """Default snapshot path: <repo>/snapshots/<program name>.snapshot.json, where <repo> is
    the parent of the directory holding this script (derived from getSourceFile()).

    The snapshots directory is created if missing. Falls back to the OS temp dir when the
    layout is unexpected (any error while locating or creating the directory) or when the
    directory exists but is not writable."""
    name = program.getName() + ".snapshot.json"
    try:
        scripts_dir = os.path.dirname(getSourceFile().getAbsolutePath())  # .../ghidra_scripts
        out_dir = os.path.join(os.path.dirname(scripts_dir), "snapshots")
        if not os.path.isdir(out_dir):
            os.makedirs(out_dir)
        # An existing but read-only directory would only fail later, when the snapshot is
        # written; detect it here so the documented fallback actually happens.
        if os.access(out_dir, os.W_OK):
            return os.path.join(out_dir, name)
    except Exception:
        pass  # deliberate best-effort: any failure above means "use the temp dir"
    return os.path.join(tempfile.gettempdir(), name)
def sha256_of(program):
    """Hex SHA-256 of the file at the program's executable path, read in one go; None when
    the path cannot be obtained, opened or read."""
    try:
        with open(program.getExecutablePath(), "rb") as fh:
            return hashlib.sha256(fh.read()).hexdigest()
    except Exception:
        return None
# --------------------------------------------------------------------------- main
def find_factory(program):
    """Locate the type-dispatch factory (the operator==("NAME") -> new -> ctor ladder).

    Its home class moved across versions: the script factory lived on CMC_Scene in early
    text-script Piklib (6.1 / 7.1), then was hoisted into CMC_ObjectsContainer from Piklib
    8.x onward (and BlooMoo). The newer location is tried first; returns None if neither
    class has a `resolve` function."""
    anchors = [
        ("CMC_ObjectsContainer", "resolve"),  # Piklib 8.x / BlooMoo
        ("CMC_Scene", "resolve"),             # Piklib 6.1 / 7.1
    ]
    for owner, method in anchors:
        hit = find_function_by_qualified(program, owner, method)
        if hit is not None:
            return hit
    return None
def run():
    """Script entry point: locate the factory, run every extractor, and write the snapshot
    as sorted, 2-space-indented JSON to the path given as the first script argument (or to
    `default_out_path`). Prints a one-line summary, or a diagnostic if no factory is found."""
    program = currentProgram  # GhidraScript/pyghidra inject this global, not `program`
    factory = find_factory(program)
    if factory is None:
        print("[!] factory not found (CMC_ObjectsContainer::resolve / CMC_Scene::resolve)"
              " - is this a Piklib/BlooMoo DLL with text-script support? (added in Piklib 6.1)")
        return
    # Read the namespace before resolving: it lives on the symbol/stub, and MSVC8 exposes
    # the symbol as a JMP stub in front of the real body.
    engine, compiler = detect_engine(factory)
    factory = resolve_thunk(factory)

    types = extract_types(program, factory)
    methods, method_inheritance = extract_methods(program)
    events = extract_events(program)
    fields = extract_script_fields(program)
    struct_layout, field_inheritance = extract_struct_layout(program)
    method_dispatch = extract_method_dispatch(program)

    binary = {
        "name": program.getName(),
        "sha256": sha256_of(program),
        "engine": engine,
        "compiler": compiler,
        "factory_addr": "0x%x" % factory.getEntryPoint().getOffset(),
    }
    snapshot = {
        "schema_version": 4,
        "binary": binary,
        "types": types,
        "methods": methods,
        "method_inheritance": method_inheritance,
        "events": events,
        "fields": fields,
        "field_inheritance": field_inheritance,
        "struct_layout": struct_layout,
        "method_dispatch": method_dispatch,
    }

    script_args = getScriptArgs()
    if len(script_args) == 0:
        out_path = default_out_path(program)
    else:
        out_path = script_args[0]
    with open(out_path, "w") as fh:
        fh.write(json.dumps(snapshot, indent=2, sort_keys=True))

    summary = (program.getName(), engine, compiler, len(types), len(methods),
               len(events), len(fields), len(struct_layout), len(method_dispatch), out_path)
    print("[+] %s [%s/%s]: %d types, %d methods, %d events, %d fields (%d layout, %d dispatch) -> %s" % summary)


run()