From f4aa7caaa9a098d3b3d8d8467348d26ffd37d5d2 Mon Sep 17 00:00:00 2001 From: Patryk Gensch <43010113+patryk025@users.noreply.github.com> Date: Sun, 31 May 2026 12:24:47 +0200 Subject: [PATCH] Containerise: Postgres + Redis/RQ + API + Ghidra worker MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Brings up the documented target architecture as a docker-compose stack — a modular monolith with the Ghidra step split into its own async worker. - worker/: RQ queue (lazy redis import) + run_acquisition task (Job status queued→started→finished/failed, drives ams.acquire with sink=db) - Job model + JobOut schema; Snapshot.data is JSONB on Postgres - POST/GET /jobs: stream an upload to a shared volume, enqueue, poll status - docker/api.Dockerfile (slim) + docker/worker.Dockerfile (JDK21 + Ghidra fetched at build, overridable via GHIDRA_URL) + docker-compose.yml - ghidra.py: AMS_GHIDRA_SCRIPTS override for in-container script path - pyproject: [worker] extra (rq/redis/psycopg), python-multipart in [api] - tests: 4 new (task success/failure + endpoint enqueue/503) -> 22/22 Verified: API image builds, container serves /health + /ui + /jobs; compose config validates. Worker image (downloads ~1 GB Ghidra) not built here. Co-Authored-By: Claude Opus 4.8 --- .dockerignore | 15 +++++ README.md | 27 +++++++++ ams/acquire/ghidra.py | 2 +- ams/api/app.py | 3 +- ams/api/models.py | 33 ++++++++++- ams/api/routes/jobs.py | 70 +++++++++++++++++++++++ ams/api/schemas.py | 14 +++++ ams/worker/__init__.py | 1 + ams/worker/queue.py | 44 +++++++++++++++ ams/worker/tasks.py | 55 ++++++++++++++++++ docker-compose.yml | 72 ++++++++++++++++++++++++ docker/api.Dockerfile | 19 +++++++ docker/worker.Dockerfile | 35 ++++++++++++ pyproject.toml | 6 ++ tests/test_worker.py | 118 +++++++++++++++++++++++++++++++++++++++ 15 files changed, 511 insertions(+), 3 deletions(-) create mode 100644 .dockerignore create mode 100644 ams/api/routes/jobs.py create mode 100644 ams/worker/__init__.py create mode 100644 ams/worker/queue.py create mode 100644 ams/worker/tasks.py create mode 100644 docker-compose.yml create mode 100644 docker/api.Dockerfile create mode 100644 docker/worker.Dockerfile create mode 100644 tests/test_worker.py diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..2f15c28 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,15 @@ +.git +.venv +__pycache__ +*.pyc +*.db +.pytest_cache +.DS_Store +# game binaries / archives never belong in an image +*.dll +*.exe +*.iso +*.zip +*.gpr +*.rep +uploads/ diff --git a/README.md b/README.md index 5afcc51..0a42389 100644 --- a/README.md +++ b/README.md @@ -48,6 +48,33 @@ analyzeHeadless -process PIKLIB8.dll \ -postScript extract_engine_surface.py "$(pwd)/snapshots/PIKLIB8.snapshot.json" ``` +## Uruchomienie w kontenerach (docker compose) + +Pełny stack — modularny monolit + **wydzielony worker Ghidry** — czterema usługami: + +``` +db (Postgres) ── api (FastAPI + UI) ──┐ + ├── redis (kolejka) +worker (Ghidra headless) ─────────────┘ +``` + +```bash +docker compose up --build # api na http://localhost:8000 +``` + +`api` i `worker` współdzielą wolumen `uploads`: API streamuje wgrane archiwum na dysk, +worker czyta je po ścieżce (przez Redisa leci tylko ścieżka, nie bajty). Obraz workera +**pobiera Ghidrę (~1 GB) przy pierwszym buildzie** — wersję nadpiszesz przez +`--build-arg GHIDRA_URL=…`. Postgres trzyma katalog trwale (wolumen `pgdata`). + +Asynchroniczna akwizycja przez API (zlecenie → kolejka → worker → snapshot w bazie): + +```bash +curl -F file=@game.iso -F game="Reksio i UFO" http://localhost:8000/jobs # → 202 {id, status:queued} +curl http://localhost:8000/jobs/1 # poll: queued→started→finished +``` +Endpointy joba: `POST /jobs` (upload+enqueue), `GET /jobs`, `GET /jobs/{id}` (status, `snapshot_id`, `error`). + ## Akwizycja — ISO/ZIP → katalog Worker, który domyka łańcuch od *pliku gry* do wpisu w katalogu: rozpakowuje archiwum, diff --git a/ams/acquire/ghidra.py b/ams/acquire/ghidra.py index acf6d49..ece2eb6 100644 --- a/ams/acquire/ghidra.py +++ b/ams/acquire/ghidra.py @@ -63,7 +63,7 @@ def run_extractor( raise GhidraNotFound( "analyzeHeadless not found — set $GHIDRA_HEADLESS or $GHIDRA_HOME (Ghidra's install dir)") - script_dir = script_dir or str(_SCRIPT_DIR) + script_dir = script_dir or os.environ.get("AMS_GHIDRA_SCRIPTS") or str(_SCRIPT_DIR) out_path = os.path.abspath(out_path) os.makedirs(os.path.dirname(out_path) or ".", exist_ok=True) diff --git a/ams/api/app.py b/ams/api/app.py index ee5ac64..0f9a601 100644 --- a/ams/api/app.py +++ b/ams/api/app.py @@ -15,7 +15,7 @@ from fastapi.staticfiles import StaticFiles from .. import __version__ from .db import configure, init_db -from .routes import diff, games, snapshots +from .routes import diff, games, jobs, snapshots _STATIC = Path(__file__).parent / "static" @@ -28,6 +28,7 @@ def create_app(database_url: str | None = None) -> FastAPI: app.include_router(games.router) app.include_router(snapshots.router) app.include_router(diff.router) + app.include_router(jobs.router) @app.get("/health", tags=["meta"]) def health() -> dict[str, str]: diff --git a/ams/api/models.py b/ams/api/models.py index dd50b18..8077b17 100644 --- a/ams/api/models.py +++ b/ams/api/models.py @@ -7,10 +7,14 @@ from __future__ import annotations from datetime import datetime, timezone from sqlalchemy import ForeignKey, JSON, String, UniqueConstraint +from sqlalchemy.dialects.postgresql import JSONB from sqlalchemy.orm import Mapped, mapped_column, relationship from .db import Base +# JSONB on Postgres (indexable, typed), plain JSON elsewhere (e.g. SQLite in dev/tests). +_JSON = JSON().with_variant(JSONB, "postgresql") + def _utcnow() -> datetime: return datetime.now(timezone.utc) @@ -46,6 +50,33 @@ class Snapshot(Base): n_fields: Mapped[int] = mapped_column(default=0) created_at: Mapped[datetime] = mapped_column(default=_utcnow) - data: Mapped[dict] = mapped_column(JSON) + data: Mapped[dict] = mapped_column(_JSON) game: Mapped["Game | None"] = relationship(back_populates="snapshots") + + +class Job(Base): + """An acquisition job: an uploaded archive/DLL handed to the Ghidra worker. + + The API row is the durable source of truth (survives Redis); `rq_id` links to the + transient RQ job. The worker walks status queued → started → finished/failed and, + on success, points `snapshot_id` at the catalog entry it produced.""" + + __tablename__ = "jobs" + + id: Mapped[int] = mapped_column(primary_key=True) + rq_id: Mapped[str | None] = mapped_column(String, default=None, index=True) + status: Mapped[str] = mapped_column(String, default="queued", index=True) + + source_name: Mapped[str] = mapped_column(String) # original upload filename + source_path: Mapped[str] = mapped_column(String) # path on the shared volume + game_name: Mapped[str | None] = mapped_column(String, default=None) + + snapshot_id: Mapped[int | None] = mapped_column(ForeignKey("snapshots.id"), default=None) + dll_name: Mapped[str | None] = mapped_column(String, default=None) + error: Mapped[str | None] = mapped_column(String, default=None) + + created_at: Mapped[datetime] = mapped_column(default=_utcnow) + updated_at: Mapped[datetime] = mapped_column(default=_utcnow, onupdate=_utcnow) + + snapshot: Mapped["Snapshot | None"] = relationship() diff --git a/ams/api/routes/jobs.py b/ams/api/routes/jobs.py new file mode 100644 index 0000000..60d86c2 --- /dev/null +++ b/ams/api/routes/jobs.py @@ -0,0 +1,70 @@ +"""Acquisition jobs: upload a game archive/DLL, hand it to the Ghidra worker, poll status. + +The upload is streamed to a shared volume ($AMS_UPLOAD_DIR) that the worker container also +mounts; only the path travels through Redis, not the bytes.""" + +from __future__ import annotations + +import os +import shutil +import uuid + +from fastapi import APIRouter, Depends, File, Form, HTTPException, UploadFile +from sqlalchemy import select +from sqlalchemy.orm import Session + +from .. import models, schemas +from ..db import get_db + +router = APIRouter(prefix="/jobs", tags=["jobs"]) + +UPLOAD_DIR = os.environ.get("AMS_UPLOAD_DIR", "./uploads") + + +def _save_upload(upload: UploadFile) -> tuple[str, str]: + """Stream the upload to the shared dir under a unique name; return (path, original_name).""" + os.makedirs(UPLOAD_DIR, exist_ok=True) + original = os.path.basename(upload.filename or "upload.bin") + dest = os.path.join(UPLOAD_DIR, "{0}_{1}".format(uuid.uuid4().hex[:8], original)) + with open(dest, "wb") as fh: + shutil.copyfileobj(upload.file, fh) + return dest, original + + +@router.post("", response_model=schemas.JobOut, status_code=202) +def create_job( + file: UploadFile = File(..., description="an ISO/ZIP archive or a loose engine DLL"), + game: str | None = Form(None, description="link the resulting snapshot to this game"), + db: Session = Depends(get_db), +) -> models.Job: + path, original = _save_upload(file) + job = models.Job(source_name=original, source_path=path, game_name=game, status="queued") + db.add(job) + db.commit() + db.refresh(job) + + try: + from ...worker.queue import enqueue_acquisition + + job.rq_id = enqueue_acquisition(path, game, job.id) + db.commit() + db.refresh(job) + except Exception as exc: # Redis/RQ down or missing — surface, don't leave a phantom job + job.status = "failed" + job.error = "enqueue failed: {0}".format(exc) + db.commit() + raise HTTPException(503, job.error) + return job + + +@router.get("", response_model=list[schemas.JobOut]) +def list_jobs(db: Session = Depends(get_db)) -> list[models.Job]: + return list(db.scalars(select(models.Job).order_by(models.Job.id.desc()))) + + +@router.get("/{job_id}", response_model=schemas.JobOut) +def get_job(job_id: int, db: Session = Depends(get_db)) -> models.Job: + job = db.get(models.Job, job_id) + if job is None: + raise HTTPException(404, "job not found") + return job diff --git a/ams/api/schemas.py b/ams/api/schemas.py index 8e39fac..c86c675 100644 --- a/ams/api/schemas.py +++ b/ams/api/schemas.py @@ -41,3 +41,17 @@ class SnapshotDetail(SnapshotOut): class GameDetail(GameOut): snapshots: list[SnapshotOut] = [] + + +class JobOut(BaseModel): + model_config = ConfigDict(from_attributes=True) + id: int + rq_id: str | None + status: str + source_name: str + game_name: str | None + snapshot_id: int | None + dll_name: str | None + error: str | None + created_at: datetime + updated_at: datetime diff --git a/ams/worker/__init__.py b/ams/worker/__init__.py new file mode 100644 index 0000000..ef724e5 --- /dev/null +++ b/ams/worker/__init__.py @@ -0,0 +1 @@ +"""Async acquisition worker: an RQ queue feeding a Ghidra-headless container.""" diff --git a/ams/worker/queue.py b/ams/worker/queue.py new file mode 100644 index 0000000..a0752b3 --- /dev/null +++ b/ams/worker/queue.py @@ -0,0 +1,44 @@ +"""RQ queue handle. Redis/RQ are imported lazily so the API (and the test suite) +can import this module without those packages installed — they're only needed when +something is actually enqueued or a worker runs.""" + +from __future__ import annotations + +import os + +QUEUE_NAME = "acquire" +DEFAULT_REDIS_URL = "redis://localhost:6379/0" + +TASK_PATH = "ams.worker.tasks.run_acquisition" # importable by the RQ worker process + + +def redis_url() -> str: + return os.environ.get("REDIS_URL", DEFAULT_REDIS_URL) + + +def get_queue(): + """Build an RQ Queue bound to Redis from $REDIS_URL (raises if rq/redis missing).""" + from redis import Redis # lazy + from rq import Queue + + return Queue(QUEUE_NAME, connection=Redis.from_url(redis_url())) + + +def enqueue_acquisition(source_path: str, game_name: str | None, job_id: int) -> str: + """Enqueue one acquisition and return the RQ job id. + + The DB Job row (`job_id`) is the durable record; the worker re-opens it to report + progress. We pass the API's DATABASE_URL through so the worker writes to the same DB.""" + q = get_queue() + rq_job = q.enqueue( + TASK_PATH, + kwargs={ + "job_id": job_id, + "source_path": source_path, + "game_name": game_name, + "database_url": os.environ.get("DATABASE_URL"), + }, + job_timeout=int(os.environ.get("AMS_JOB_TIMEOUT", "3600")), + result_ttl=86400, + ) + return rq_job.id diff --git a/ams/worker/tasks.py b/ams/worker/tasks.py new file mode 100644 index 0000000..a460110 --- /dev/null +++ b/ams/worker/tasks.py @@ -0,0 +1,55 @@ +"""The RQ task body. Runs inside the Ghidra-equipped worker container. + +It re-points SQLAlchemy at the shared DATABASE_URL (the worker is a separate process +from the API), drives the acquisition pipeline, and walks the Job row through its +status transitions so the API/UI can poll progress.""" + +from __future__ import annotations + +import os +import traceback + +from ..acquire import acquire +from ..api.db import configure, get_session, init_db +from ..api.models import Job + + +def _set(job_id: int, **fields): + """Patch a Job row in its own short-lived session (worker may run far from the API).""" + db = get_session() + try: + job = db.get(Job, job_id) + if job is None: + return None + for k, v in fields.items(): + setattr(job, k, v) + db.commit() + return job + finally: + db.close() + + +def run_acquisition( + job_id: int, + source_path: str, + game_name: str | None = None, + database_url: str | None = None, +) -> dict: + """Acquire `source_path` into the catalog, updating Job #`job_id` as it goes. + + Returns a small result dict (also stored by RQ). Errors are recorded on the Job row + and re-raised so RQ marks the job failed.""" + configure(database_url) + init_db() + + _set(job_id, status="started", error=None) + try: + result = acquire(source_path, game_name, sink="db") + except Exception as exc: # record then re-raise so RQ sees the failure too + _set(job_id, status="failed", error="{0}: {1}".format(type(exc).__name__, exc)) + traceback.print_exc() + raise + + _set(job_id, status="finished", snapshot_id=result.imported_id, + dll_name=os.path.basename(result.dll), error=None) + return {"snapshot_id": result.imported_id, "engine": result.engine, "dll": result.dll} diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..2780fe4 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,72 @@ +# Aidem Media Playground — full stack. +# +# db Postgres (durable catalog) +# redis job queue broker +# api FastAPI + Command Center UI (http://localhost:8000) +# worker Ghidra-headless acquisition worker (drains the 'acquire' queue) +# +# api and worker share the `uploads` volume: the API streams an uploaded archive there, +# the worker reads it back by path. Bring it up with: docker compose up --build +# +# NOTE: the worker image downloads Ghidra (~1 GB) on first build — that layer is slow but cached. + +services: + db: + image: postgres:16-alpine + environment: + POSTGRES_USER: ams + POSTGRES_PASSWORD: ams + POSTGRES_DB: ams + volumes: + - pgdata:/var/lib/postgresql/data + healthcheck: + test: ["CMD-SHELL", "pg_isready -U ams"] + interval: 5s + timeout: 3s + retries: 10 + + redis: + image: redis:7-alpine + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 5s + timeout: 3s + retries: 10 + + api: + build: + context: . + dockerfile: docker/api.Dockerfile + environment: + DATABASE_URL: postgresql+psycopg://ams:ams@db:5432/ams + REDIS_URL: redis://redis:6379/0 + AMS_UPLOAD_DIR: /data/uploads + ports: + - "8000:8000" + volumes: + - uploads:/data/uploads + depends_on: + db: + condition: service_healthy + redis: + condition: service_healthy + + worker: + build: + context: . + dockerfile: docker/worker.Dockerfile + environment: + DATABASE_URL: postgresql+psycopg://ams:ams@db:5432/ams + REDIS_URL: redis://redis:6379/0 + AMS_UPLOAD_DIR: /data/uploads + volumes: + - uploads:/data/uploads + depends_on: + db: + condition: service_healthy + redis: + condition: service_healthy + +volumes: + pgdata: + uploads: diff --git a/docker/api.Dockerfile b/docker/api.Dockerfile new file mode 100644 index 0000000..350d25e --- /dev/null +++ b/docker/api.Dockerfile @@ -0,0 +1,19 @@ +# API + Command Center UI. Stays slim — the heavy Ghidra lifting lives in the worker image. +FROM python:3.12-slim + +WORKDIR /app + +# Copy metadata first for layer caching, then the source. +COPY pyproject.toml README.md ./ +COPY ams ./ams +COPY ghidra_scripts ./ghidra_scripts +COPY snapshots ./snapshots + +# Editable install keeps ams + ghidra_scripts co-located (the worker resolves the script +# path relative to the package). The API needs the queue client too, to enqueue jobs. +RUN pip install --no-cache-dir -e ".[api]" rq redis "psycopg[binary]>=3.1" + +ENV AMS_UPLOAD_DIR=/data/uploads +EXPOSE 8000 + +CMD ["uvicorn", "ams.api.app:create_app", "--factory", "--host", "0.0.0.0", "--port", "8000"] diff --git a/docker/worker.Dockerfile b/docker/worker.Dockerfile new file mode 100644 index 0000000..62224c2 --- /dev/null +++ b/docker/worker.Dockerfile @@ -0,0 +1,35 @@ +# Ghidra-equipped acquisition worker. Self-contained: bundles JDK 21 + a pinned Ghidra +# release so `docker compose up` just works (at the cost of a heavy, slow-to-build image). +# +# Override the Ghidra build without editing this file: +# docker build --build-arg GHIDRA_URL=https://github.com/.../ghidra_X_PUBLIC_DATE.zip ... +FROM eclipse-temurin:21-jdk-jammy + +ARG GHIDRA_URL=https://github.com/NationalSecurityAgency/ghidra/releases/download/Ghidra_11.3_build/ghidra_11.3_PUBLIC_20250205.zip + +# Runtime deps: python (the package), unzip/wget (fetch Ghidra), libarchive-tools (bsdtar: +# unpacks ISO9660 + ZIP game archives). +RUN apt-get update && apt-get install -y --no-install-recommends \ + python3 python3-pip unzip wget ca-certificates libarchive-tools \ + && rm -rf /var/lib/apt/lists/* + +# Fetch + unpack Ghidra into /opt/ghidra (strip the versioned top-level dir). +RUN wget -q "$GHIDRA_URL" -O /tmp/ghidra.zip \ + && unzip -q /tmp/ghidra.zip -d /opt \ + && mv /opt/ghidra_* /opt/ghidra \ + && rm /tmp/ghidra.zip + +ENV GHIDRA_HOME=/opt/ghidra +ENV AMS_GHIDRA_SCRIPTS=/app/ghidra_scripts +ENV AMS_UPLOAD_DIR=/data/uploads + +WORKDIR /app +COPY pyproject.toml README.md ./ +COPY ams ./ams +COPY ghidra_scripts ./ghidra_scripts +COPY snapshots ./snapshots + +RUN pip3 install --no-cache-dir -e ".[api,acquire,worker]" + +# Drain the 'acquire' queue. Shell form so $REDIS_URL expands at runtime. +CMD rq worker --url "${REDIS_URL:-redis://redis:6379/0}" acquire diff --git a/pyproject.toml b/pyproject.toml index 2fbe54d..15fbc7a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -15,10 +15,16 @@ api = [ "uvicorn[standard]>=0.27", "sqlalchemy>=2.0", "pydantic>=2.6", + "python-multipart>=0.0.9", # file uploads for the acquisition endpoint ] acquire = [ "ppdeep>=20200505", # pure-Python ssdeep, for fuzzy/near-duplicate hashing (optional) ] +worker = [ + "rq>=1.16", + "redis>=5.0", + "psycopg[binary]>=3.1", # Postgres driver for the shared catalog DB +] dev = [ "pytest>=8", "httpx>=0.27", diff --git a/tests/test_worker.py b/tests/test_worker.py new file mode 100644 index 0000000..c9f16e1 --- /dev/null +++ b/tests/test_worker.py @@ -0,0 +1,118 @@ +"""Async-acquisition layer: the RQ task body and the upload/enqueue endpoint. +Redis, RQ and Ghidra are all stubbed, so this runs in the plain dev venv.""" + +from __future__ import annotations + +from types import SimpleNamespace + +import pytest + +from ams.api.db import configure, get_session, init_db +from ams.api.models import Job + + +def _make_job(tmp_path, **kw) -> int: + configure("sqlite:///{0}/w.db".format(tmp_path)) + init_db() + db = get_session() + try: + job = Job(source_name="game.zip", source_path=str(tmp_path / "game.zip"), + game_name=kw.get("game"), status="queued") + db.add(job) + db.commit() + return job.id + finally: + db.close() + + +# --- task body -------------------------------------------------------------------------------- + +def test_run_acquisition_success(tmp_path, monkeypatch): + job_id = _make_job(tmp_path, game="Reksio i UFO") + + fake = SimpleNamespace(imported_id=42, engine="Piklib", dll=str(tmp_path / "PIKLIB8.dll")) + monkeypatch.setattr("ams.worker.tasks.acquire", lambda *a, **k: fake) + + from ams.worker.tasks import run_acquisition + out = run_acquisition(job_id, str(tmp_path / "game.zip"), "Reksio i UFO", + database_url="sqlite:///{0}/w.db".format(tmp_path)) + assert out["snapshot_id"] == 42 + + db = get_session() + try: + job = db.get(Job, job_id) + assert job.status == "finished" + assert job.snapshot_id == 42 and job.dll_name == "PIKLIB8.dll" + assert job.error is None + finally: + db.close() + + +def test_run_acquisition_failure_records_error(tmp_path, monkeypatch): + job_id = _make_job(tmp_path) + + def boom(*a, **k): + raise RuntimeError("no engine DLL found") + monkeypatch.setattr("ams.worker.tasks.acquire", boom) + + from ams.worker.tasks import run_acquisition + with pytest.raises(RuntimeError): + run_acquisition(job_id, str(tmp_path / "game.zip"), + database_url="sqlite:///{0}/w.db".format(tmp_path)) + + db = get_session() + try: + job = db.get(Job, job_id) + assert job.status == "failed" + assert "no engine DLL found" in job.error + finally: + db.close() + + +# --- upload + enqueue endpoint ---------------------------------------------------------------- + +@pytest.fixture() +def client(tmp_path, monkeypatch): + pytest.importorskip("multipart") # python-multipart, needed for file uploads + from fastapi.testclient import TestClient + + from ams.api.app import create_app + from ams.api.routes import jobs + + monkeypatch.setattr(jobs, "UPLOAD_DIR", str(tmp_path / "uploads")) + app = create_app(database_url="sqlite:///{0}/api.db".format(tmp_path)) + return TestClient(app) + + +def test_create_job_enqueues(client, monkeypatch): + calls = {} + + def fake_enqueue(path, game, job_id): + calls["path"], calls["game"], calls["job_id"] = path, game, job_id + return "rq-deadbeef" + monkeypatch.setattr("ams.worker.queue.enqueue_acquisition", fake_enqueue) + + r = client.post("/jobs", files={"file": ("reksio.zip", b"PK\x03\x04 fake", "application/zip")}, + data={"game": "Reksio i UFO"}) + assert r.status_code == 202 + body = r.json() + assert body["status"] == "queued" and body["rq_id"] == "rq-deadbeef" + assert body["game_name"] == "Reksio i UFO" and body["source_name"] == "reksio.zip" + # the bytes were streamed to the shared upload dir and that path was enqueued + assert calls["job_id"] == body["id"] + import os + assert os.path.isfile(calls["path"]) + + listed = client.get("/jobs").json() + assert [j["id"] for j in listed] == [body["id"]] + assert client.get("/jobs/{0}".format(body["id"])).json()["status"] == "queued" + + +def test_create_job_enqueue_failure_returns_503(client, monkeypatch): + def boom(*a, **k): + raise RuntimeError("redis down") + monkeypatch.setattr("ams.worker.queue.enqueue_acquisition", boom) + + r = client.post("/jobs", files={"file": ("x.dll", b"MZ", "application/octet-stream")}) + assert r.status_code == 503 + assert "redis down" in r.json()["detail"]