"""Business logic shared by the HTTP routes and the bulk importer.""" from __future__ import annotations import hashlib import json from typing import Any from sqlalchemy import select from sqlalchemy.orm import Session from . import models def _content_sha(data: dict[str, Any]) -> str: return hashlib.sha256(json.dumps(data, sort_keys=True).encode("utf-8")).hexdigest() def looks_like_snapshot(data: Any) -> bool: return isinstance(data, dict) and "binary" in data and "types" in data def _apply_metadata(snap: models.Snapshot, data: dict[str, Any]) -> None: binary = data.get("binary", {}) snap.binary_name = binary.get("name", "?") snap.engine = binary.get("engine") snap.compiler = binary.get("compiler") snap.schema_version = data.get("schema_version") snap.n_types = len(data.get("types", [])) snap.n_methods = len(data.get("methods", [])) snap.n_events = len(data.get("events", [])) snap.n_fields = len(data.get("fields", [])) snap.data = data def _get_or_create_game(db: Session, name: str) -> models.Game: game = db.scalar(select(models.Game).where(models.Game.name == name)) if game is None: game = models.Game(name=name) db.add(game) db.flush() return game def similar_snapshots( db: Session, snapshot_id: int, minimum: int = 0 ) -> list[tuple[models.Snapshot, dict]]: """Rank every other catalogued snapshot against #snapshot_id by surface similarity. Returns (snapshot, score) pairs (score = ams.similarity report) sorted by `overall` desc, dropping anything below `minimum`. Returns None if the target doesn't exist.""" from ..similarity import similarity from ..snapshot import Snapshot as Surface target = db.get(models.Snapshot, snapshot_id) if target is None: return None t_surface = Surface(target.data) hits: list[tuple[models.Snapshot, dict]] = [] for other in db.scalars(select(models.Snapshot).where(models.Snapshot.id != snapshot_id)): score = similarity(t_surface, Surface(other.data)) if score["overall"] >= minimum: hits.append((other, score)) hits.sort(key=lambda pair: pair[1]["overall"], reverse=True) return hits def import_snapshot(db: Session, data: dict[str, Any], game_name: str | None = None) -> models.Snapshot: """Upsert a snapshot, deduped by the binary's sha256 (falling back to a content hash).""" sha = data.get("binary", {}).get("sha256") or _content_sha(data) snap = db.scalar(select(models.Snapshot).where(models.Snapshot.sha256 == sha)) game = _get_or_create_game(db, game_name) if game_name else None if snap is None: snap = models.Snapshot(sha256=sha) db.add(snap) _apply_metadata(snap, data) if game is not None: snap.game_id = game.id db.commit() db.refresh(snap) return snap