Skip to content

Commit 8a59673

Browse files
mihow and Claude authored
feat: fetch jobs for all pipelines in a single API request (#114)
feat: fetch jobs for all pipelines in a single API request (#114)

* feat: fetch jobs for all pipelines in a single API request

  Replace the per-pipeline loop with a single call using the
  pipeline__slug__in filter. This reduces N API requests per poll cycle
  to 1, regardless of how many pipelines the worker handles. The Antenna
  MinimalJobSerializer now returns pipeline_slug so the worker knows
  which pipeline each job belongs to.

  Co-Authored-By: Claude <[email protected]>

* fix: update tests for pipeline__slug__in and add empty guard

  - Guard against empty pipeline_slugs list in get_jobs()
  - Update mock server to accept pipeline__slug__in parameter
  - Include pipeline_slug in AntennaJobListItem construction
  - Update test assertions for new tuple return type

  Co-Authored-By: Claude <[email protected]>

---------

Co-authored-by: Claude <[email protected]>
1 parent 1faa7d0 commit 8a59673

5 files changed

Lines changed: 60 additions & 41 deletions

File tree

trapdata/antenna/client.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,27 +29,29 @@ def get_full_service_name(service_name: str) -> str:
2929
def get_jobs(
3030
base_url: str,
3131
auth_token: str,
32-
pipeline_slug: str,
32+
pipeline_slugs: list[str],
3333
processing_service_name: str,
34-
) -> list[int]:
35-
"""Fetch job ids from the API for the given pipeline.
34+
) -> list[tuple[int, str]]:
35+
"""Fetch job ids from the API for the given pipelines in a single request.
3636
37-
Calls: GET {base_url}/jobs?pipeline__slug=<pipeline>&ids_only=1&processing_service_name=<name>
37+
Calls: GET {base_url}/jobs?pipeline__slug__in=<slugs>&ids_only=1&processing_service_name=<name>
3838
3939
Args:
4040
base_url: Antenna API base URL (e.g., "http://localhost:8000/api/v2")
4141
auth_token: API authentication token
42-
pipeline_slug: Pipeline slug to filter jobs
42+
pipeline_slugs: List of pipeline slugs to filter jobs
4343
processing_service_name: Name of the processing service
4444
4545
Returns:
46-
List of job ids (possibly empty) on success or error.
46+
List of (job_id, pipeline_slug) tuples (possibly empty) on success or error.
4747
"""
4848
with get_http_session(auth_token) as session:
4949
try:
50+
if not pipeline_slugs:
51+
return []
5052
url = f"{base_url.rstrip('/')}/jobs"
5153
params = {
52-
"pipeline__slug": pipeline_slug,
54+
"pipeline__slug__in": ",".join(pipeline_slugs),
5355
"ids_only": 1,
5456
"incomplete_only": 1,
5557
"processing_service_name": processing_service_name,
@@ -61,7 +63,7 @@ def get_jobs(
6163

6264
# Parse and validate response with Pydantic
6365
jobs_response = AntennaJobsListResponse.model_validate(resp.json())
64-
return [job.id for job in jobs_response.results]
66+
return [(job.id, job.pipeline_slug) for job in jobs_response.results]
6567
except requests.RequestException as e:
6668
logger.error(f"Failed to fetch jobs from {base_url}: {e}")
6769
return []

trapdata/antenna/schemas.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ class AntennaJobListItem(pydantic.BaseModel):
2525
"""A single job item from the Antenna jobs list API response."""
2626

2727
id: int
28+
pipeline_slug: str
2829

2930

3031
class AntennaJobsListResponse(pydantic.BaseModel):

trapdata/antenna/tests/antenna_api_server.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,15 @@
2929

3030
@app.get("/api/v2/jobs")
3131
def get_jobs(
32-
pipeline__slug: str,
33-
ids_only: int,
34-
incomplete_only: int,
32+
pipeline__slug__in: str = "",
33+
ids_only: int = 1,
34+
incomplete_only: int = 1,
3535
processing_service_name: str = "",
3636
):
3737
"""Return available job IDs.
3838
3939
Args:
40-
pipeline__slug: Pipeline slug filter
40+
pipeline__slug__in: Comma-separated pipeline slugs filter
4141
ids_only: If 1, return only job IDs
4242
incomplete_only: If 1, return only incomplete jobs
4343
processing_service_name: Name of the processing service making the request
@@ -48,9 +48,17 @@ def get_jobs(
4848
global _last_get_jobs_service_name
4949
_last_get_jobs_service_name = processing_service_name
5050

51+
# Determine pipeline slug for response (use first slug from filter)
52+
slugs = (
53+
[s for s in pipeline__slug__in.split(",") if s] if pipeline__slug__in else []
54+
)
55+
default_slug = slugs[0] if slugs else "test_pipeline"
56+
5157
# Return all jobs in queue (for testing, we return all registered jobs)
5258
job_ids = list(_jobs_queue.keys())
53-
results = [AntennaJobListItem(id=job_id) for job_id in job_ids]
59+
results = [
60+
AntennaJobListItem(id=job_id, pipeline_slug=default_slug) for job_id in job_ids
61+
]
5462
return AntennaJobsListResponse(results=results)
5563

5664

trapdata/antenna/tests/test_worker.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -220,10 +220,13 @@ def test_returns_job_ids(self):
220220

221221
with patch_antenna_api_requests(self.antenna_client):
222222
result = get_jobs(
223-
"http://testserver/api/v2", "test-token", "moths_2024", "Test Worker"
223+
"http://testserver/api/v2",
224+
"test-token",
225+
["moths_2024"],
226+
"Test Worker",
224227
)
225228

226-
assert result == [10, 20, 30]
229+
assert [job_id for job_id, _ in result] == [10, 20, 30]
227230
assert antenna_api_server.get_last_get_jobs_service_name() == "Test Worker"
228231

229232

@@ -460,9 +463,13 @@ def test_full_workflow_with_real_inference(self):
460463
assert success is True
461464

462465
# Step 2: Get jobs
463-
job_ids = get_jobs(
464-
"http://testserver/api/v2", "test-token", pipeline_slug, "Test Worker"
466+
jobs = get_jobs(
467+
"http://testserver/api/v2",
468+
"test-token",
469+
[pipeline_slug],
470+
"Test Worker",
465471
)
472+
job_ids = [job_id for job_id, _ in jobs]
466473
assert 200 in job_ids
467474
assert antenna_api_server.get_last_get_jobs_service_name() == "Test Worker"
468475

trapdata/antenna/worker.py

Lines changed: 25 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -90,32 +90,33 @@ def _worker_loop(gpu_id: int, pipelines: list[str]):
9090
# These should probably come from a dedicated endpoint and should preempt batch jobs under the assumption that they
9191
# would run on the same GPU.
9292
any_jobs = False
93-
for pipeline in pipelines:
94-
logger.info(f"[GPU {gpu_id}] Checking for jobs for pipeline {pipeline}")
95-
jobs = get_jobs(
96-
base_url=settings.antenna_api_base_url,
97-
auth_token=settings.antenna_api_auth_token,
98-
pipeline_slug=pipeline,
99-
processing_service_name=full_service_name,
93+
logger.info(
94+
f"[GPU {gpu_id}] Checking for jobs for pipelines: {', '.join(pipelines)}"
95+
)
96+
jobs = get_jobs(
97+
base_url=settings.antenna_api_base_url,
98+
auth_token=settings.antenna_api_auth_token,
99+
pipeline_slugs=pipelines,
100+
processing_service_name=full_service_name,
101+
)
102+
for job_id, pipeline in jobs:
103+
logger.info(
104+
f"[GPU {gpu_id}] Processing job {job_id} with pipeline {pipeline}"
100105
)
101-
for job_id in jobs:
102-
logger.info(
103-
f"[GPU {gpu_id}] Processing job {job_id} with pipeline {pipeline}"
106+
try:
107+
any_work_done = _process_job(
108+
pipeline=pipeline,
109+
job_id=job_id,
110+
settings=settings,
111+
processing_service_name=full_service_name,
104112
)
105-
try:
106-
any_work_done = _process_job(
107-
pipeline=pipeline,
108-
job_id=job_id,
109-
settings=settings,
110-
processing_service_name=full_service_name,
111-
)
112-
any_jobs = any_jobs or any_work_done
113-
except Exception as e:
114-
logger.error(
115-
f"[GPU {gpu_id}] Failed to process job {job_id} with pipeline {pipeline}: {e}",
116-
exc_info=True,
117-
)
118-
# Continue to next job rather than crashing the worker
113+
any_jobs = any_jobs or any_work_done
114+
except Exception as e:
115+
logger.error(
116+
f"[GPU {gpu_id}] Failed to process job {job_id} with pipeline {pipeline}: {e}",
117+
exc_info=True,
118+
)
119+
# Continue to next job rather than crashing the worker
119120

120121
if not any_jobs:
121122
logger.info(

0 commit comments

Comments (0)