"""Async-acquisition layer: the RQ task body and the upload/enqueue endpoint.

Redis, RQ and Ghidra are all stubbed, so this runs in the plain dev venv.
"""
from __future__ import annotations

from types import SimpleNamespace

import pytest

from ams.api.db import configure, get_session, init_db
from ams.api.models import Job


def _worker_db_url(tmp_path) -> str:
    """Return the SQLite URL of the worker test database located under *tmp_path*."""
    return "sqlite:///{0}/w.db".format(tmp_path)


def _make_job(tmp_path, **kw) -> int:
    """Configure and initialise the worker test database, insert one queued job, return its id.

    Only the ``game`` keyword is read from *kw*; it is stored as the job's
    ``game_name`` (``None`` when omitted).
    """
    configure(_worker_db_url(tmp_path))
    init_db()
    session = get_session()
    try:
        queued = Job(
            source_name="game.zip",
            source_path=str(tmp_path / "game.zip"),
            game_name=kw.get("game"),
            status="queued",
        )
        session.add(queued)
        session.commit()
        return queued.id
    finally:
        session.close()


# --- task body --------------------------------------------------------------------------------


def test_run_acquisition_success(tmp_path, monkeypatch):
    """With ``acquire`` stubbed to succeed, the job is expected to end up ``finished``."""
    job_id = _make_job(tmp_path, game="Reksio i UFO")
    stub_result = SimpleNamespace(
        imported_id=42,
        engine="Piklib",
        dll=str(tmp_path / "PIKLIB8.dll"),
    )

    def fake_acquire(*args, **kwargs):
        return stub_result

    # Patch before importing the task so the stub is in place when it runs.
    monkeypatch.setattr("ams.worker.tasks.acquire", fake_acquire)
    from ams.worker.tasks import run_acquisition

    result = run_acquisition(
        job_id,
        str(tmp_path / "game.zip"),
        "Reksio i UFO",
        database_url=_worker_db_url(tmp_path),
    )
    assert result["snapshot_id"] == 42

    session = get_session()
    try:
        stored = session.get(Job, job_id)
        assert stored.status == "finished"
        assert stored.snapshot_id == 42
        assert stored.dll_name == "PIKLIB8.dll"
        assert stored.error is None
    finally:
        session.close()


def test_run_acquisition_failure_records_error(tmp_path, monkeypatch):
    """With ``acquire`` stubbed to raise, the error propagates and is recorded on the job."""
    job_id = _make_job(tmp_path)

    def boom(*args, **kwargs):
        raise RuntimeError("no engine DLL found")

    monkeypatch.setattr("ams.worker.tasks.acquire", boom)
    from ams.worker.tasks import run_acquisition

    with pytest.raises(RuntimeError):
        run_acquisition(
            job_id,
            str(tmp_path / "game.zip"),
            database_url=_worker_db_url(tmp_path),
        )

    session = get_session()
    try:
        stored = session.get(Job, job_id)
        assert stored.status == "failed"
        assert "no engine DLL found" in stored.error
    finally:
        session.close()


# --- upload + enqueue endpoint ----------------------------------------------------------------


@pytest.fixture()
def client(tmp_path, monkeypatch):
    """Provide a ``TestClient`` for an app on a temp SQLite DB, with uploads sent to *tmp_path*."""
    pytest.importorskip("multipart")  # python-multipart, needed for file uploads
    from fastapi.testclient import TestClient

    from ams.api.app import create_app
    from ams.api.routes import jobs

    monkeypatch.setattr(jobs, "UPLOAD_DIR", str(tmp_path / "uploads"))
    api_db_url = "sqlite:///{0}/api.db".format(tmp_path)
    return TestClient(create_app(database_url=api_db_url))


def test_create_job_enqueues(client, monkeypatch):
    """POSTing a file to ``/jobs`` is expected to store it, enqueue it and list the job as queued."""
    import os

    calls = {}

    def fake_enqueue(path, game, job_id):
        # Record what the endpoint handed to the queue layer.
        calls.update(path=path, game=game, job_id=job_id)
        return "rq-deadbeef"

    monkeypatch.setattr("ams.worker.queue.enqueue_acquisition", fake_enqueue)

    upload = {"file": ("reksio.zip", b"PK\x03\x04 fake", "application/zip")}
    r = client.post("/jobs", files=upload, data={"game": "Reksio i UFO"})
    assert r.status_code == 202

    body = r.json()
    assert body["status"] == "queued"
    assert body["rq_id"] == "rq-deadbeef"
    assert body["game_name"] == "Reksio i UFO"
    assert body["source_name"] == "reksio.zip"

    # the bytes were streamed to the shared upload dir and that path was enqueued
    assert calls["job_id"] == body["id"]
    assert os.path.isfile(calls["path"])

    listed = client.get("/jobs").json()
    assert [j["id"] for j in listed] == [body["id"]]

    detail = client.get("/jobs/{0}".format(body["id"])).json()
    assert detail["status"] == "queued"


def test_create_job_enqueue_failure_returns_503(client, monkeypatch):
    """When enqueueing raises, the endpoint is expected to answer 503 with the error text."""

    def boom(*args, **kwargs):
        raise RuntimeError("redis down")

    monkeypatch.setattr("ams.worker.queue.enqueue_acquisition", boom)

    upload = {"file": ("x.dll", b"MZ", "application/octet-stream")}
    r = client.post("/jobs", files=upload)
    assert r.status_code == 503
    assert "redis down" in r.json()["detail"]