"""RQ queue handle. Redis/RQ are imported lazily so the API (and the test suite) can import this module without those packages installed — they're only needed when something is actually enqueued or a worker runs.""" from __future__ import annotations import os QUEUE_NAME = "acquire" DEFAULT_REDIS_URL = "redis://localhost:6379/0" TASK_PATH = "ams.worker.tasks.run_acquisition" # importable by the RQ worker process def redis_url() -> str: return os.environ.get("REDIS_URL", DEFAULT_REDIS_URL) def get_queue(): """Build an RQ Queue bound to Redis from $REDIS_URL (raises if rq/redis missing).""" from redis import Redis # lazy from rq import Queue return Queue(QUEUE_NAME, connection=Redis.from_url(redis_url())) def enqueue_acquisition(source_path: str, game_name: str | None, job_id: int) -> str: """Enqueue one acquisition and return the RQ job id. The DB Job row (`job_id`) is the durable record; the worker re-opens it to report progress. We pass the API's DATABASE_URL through so the worker writes to the same DB.""" q = get_queue() rq_job = q.enqueue( TASK_PATH, kwargs={ "job_id": job_id, "source_path": source_path, "game_name": game_name, "database_url": os.environ.get("DATABASE_URL"), }, job_timeout=int(os.environ.get("AMS_JOB_TIMEOUT", "3600")), result_ttl=86400, ) return rq_job.id