Brings up the documented target architecture as a docker-compose stack — a modular monolith with the Ghidra step split into its own async worker. - worker/: RQ queue (lazy redis import) + run_acquisition task (Job status queued→started→finished/failed, drives ams.acquire with sink=db) - Job model + JobOut schema; Snapshot.data is JSONB on Postgres - POST/GET /jobs: stream an upload to a shared volume, enqueue, poll status - docker/api.Dockerfile (slim) + docker/worker.Dockerfile (JDK21 + Ghidra fetched at build, overridable via GHIDRA_URL) + docker-compose.yml - ghidra.py: AMS_GHIDRA_SCRIPTS override for in-container script path - pyproject: [worker] extra (rq/redis/psycopg), python-multipart in [api] - tests: 4 new (task success/failure + endpoint enqueue/503) -> 22/22 Verified: API image builds, container serves /health + /ui + /jobs; compose config validates. Worker image (downloads ~1 GB Ghidra) not built here. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
45 lines
1.4 KiB
Python
45 lines
1.4 KiB
Python
"""RQ queue handle. Redis/RQ are imported lazily so the API (and the test suite)
|
|
can import this module without those packages installed — they're only needed when
|
|
something is actually enqueued or a worker runs."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
|
|
QUEUE_NAME = "acquire"
|
|
DEFAULT_REDIS_URL = "redis://localhost:6379/0"
|
|
|
|
TASK_PATH = "ams.worker.tasks.run_acquisition" # importable by the RQ worker process
|
|
|
|
|
|
def redis_url() -> str:
|
|
return os.environ.get("REDIS_URL", DEFAULT_REDIS_URL)
|
|
|
|
|
|
def get_queue():
|
|
"""Build an RQ Queue bound to Redis from $REDIS_URL (raises if rq/redis missing)."""
|
|
from redis import Redis # lazy
|
|
from rq import Queue
|
|
|
|
return Queue(QUEUE_NAME, connection=Redis.from_url(redis_url()))
|
|
|
|
|
|
def enqueue_acquisition(source_path: str, game_name: str | None, job_id: int) -> str:
|
|
"""Enqueue one acquisition and return the RQ job id.
|
|
|
|
The DB Job row (`job_id`) is the durable record; the worker re-opens it to report
|
|
progress. We pass the API's DATABASE_URL through so the worker writes to the same DB."""
|
|
q = get_queue()
|
|
rq_job = q.enqueue(
|
|
TASK_PATH,
|
|
kwargs={
|
|
"job_id": job_id,
|
|
"source_path": source_path,
|
|
"game_name": game_name,
|
|
"database_url": os.environ.get("DATABASE_URL"),
|
|
},
|
|
job_timeout=int(os.environ.get("AMS_JOB_TIMEOUT", "3600")),
|
|
result_ttl=86400,
|
|
)
|
|
return rq_job.id
|