Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions trapdata/antenna/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,27 +29,27 @@ def get_full_service_name(service_name: str) -> str:
def get_jobs(
base_url: str,
auth_token: str,
pipeline_slug: str,
pipeline_slugs: list[str],
processing_service_name: str,
) -> list[int]:
"""Fetch job ids from the API for the given pipeline.
) -> list[tuple[int, str]]:
"""Fetch job ids from the API for the given pipelines in a single request.

Calls: GET {base_url}/jobs?pipeline__slug=<pipeline>&ids_only=1&processing_service_name=<name>
Calls: GET {base_url}/jobs?pipeline__slug__in=<slugs>&ids_only=1&processing_service_name=<name>

Args:
base_url: Antenna API base URL (e.g., "http://localhost:8000/api/v2")
auth_token: API authentication token
pipeline_slug: Pipeline slug to filter jobs
pipeline_slugs: List of pipeline slugs to filter jobs
processing_service_name: Name of the processing service

Returns:
List of job ids (possibly empty) on success or error.
List of (job_id, pipeline_slug) tuples (possibly empty) on success or error.
"""
with get_http_session(auth_token) as session:
try:
url = f"{base_url.rstrip('/')}/jobs"
params = {
"pipeline__slug": pipeline_slug,
"pipeline__slug__in": ",".join(pipeline_slugs),
"ids_only": 1,
"incomplete_only": 1,
"processing_service_name": processing_service_name,
Expand All @@ -61,7 +61,7 @@ def get_jobs(

# Parse and validate response with Pydantic
jobs_response = AntennaJobsListResponse.model_validate(resp.json())
return [job.id for job in jobs_response.results]
return [(job.id, job.pipeline_slug) for job in jobs_response.results]
except requests.RequestException as e:
logger.error(f"Failed to fetch jobs from {base_url}: {e}")
return []
Expand Down
1 change: 1 addition & 0 deletions trapdata/antenna/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class AntennaJobListItem(pydantic.BaseModel):
"""A single job item from the Antenna jobs list API response."""

id: int
pipeline_slug: str


class AntennaJobsListResponse(pydantic.BaseModel):
Expand Down
49 changes: 25 additions & 24 deletions trapdata/antenna/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,32 +90,33 @@ def _worker_loop(gpu_id: int, pipelines: list[str]):
# These should probably come from a dedicated endpoint and should preempt batch jobs under the assumption that they
# would run on the same GPU.
any_jobs = False
for pipeline in pipelines:
logger.info(f"[GPU {gpu_id}] Checking for jobs for pipeline {pipeline}")
jobs = get_jobs(
base_url=settings.antenna_api_base_url,
auth_token=settings.antenna_api_auth_token,
pipeline_slug=pipeline,
processing_service_name=full_service_name,
logger.info(
f"[GPU {gpu_id}] Checking for jobs for pipelines: {', '.join(pipelines)}"
)
jobs = get_jobs(
base_url=settings.antenna_api_base_url,
auth_token=settings.antenna_api_auth_token,
pipeline_slugs=pipelines,
processing_service_name=full_service_name,
)
for job_id, pipeline in jobs:
logger.info(
f"[GPU {gpu_id}] Processing job {job_id} with pipeline {pipeline}"
)
for job_id in jobs:
logger.info(
f"[GPU {gpu_id}] Processing job {job_id} with pipeline {pipeline}"
try:
any_work_done = _process_job(
pipeline=pipeline,
job_id=job_id,
settings=settings,
processing_service_name=full_service_name,
)
try:
any_work_done = _process_job(
pipeline=pipeline,
job_id=job_id,
settings=settings,
processing_service_name=full_service_name,
)
any_jobs = any_jobs or any_work_done
except Exception as e:
logger.error(
f"[GPU {gpu_id}] Failed to process job {job_id} with pipeline {pipeline}: {e}",
exc_info=True,
)
# Continue to next job rather than crashing the worker
any_jobs = any_jobs or any_work_done
except Exception as e:
logger.error(
f"[GPU {gpu_id}] Failed to process job {job_id} with pipeline {pipeline}: {e}",
exc_info=True,
)
# Continue to next job rather than crashing the worker

if not any_jobs:
logger.info(
Expand Down
Loading