Description
Problem
The /jobs/{id}/tasks/ endpoint uses GET but has destructive side effects — it calls manager.reserve_task() which consumes messages from the NATS JetStream queue. Each call removes tasks from the queue and returns different results, violating both the safety and idempotency guarantees of HTTP GET.
Both /tasks and /result also lack proper DRF serializers for incoming data — batch is validated with a raw IntegerField().clean() and results are manually looped through PipelineTaskResult(**item). The processing_service_name parameter introduced in #1117 is passed as a query param (including on the POST /result endpoint) and read with unsanitized query_params.get().
Relevant code: ami/jobs/views.py — JobViewSet.tasks() and JobViewSet.result() actions
Proposed changes
1. Change /tasks from GET to POST
The run, retry, and cancel actions on the same ViewSet already use POST for similar non-CRUD operations.
Before:

```http
GET /api/v2/jobs/60/tasks/?batch=4&project_id=18&processing_service_name=ADC
```

After:

```http
POST /api/v2/jobs/60/tasks/?project_id=18
Content-Type: application/json

{"batch": 4, "processing_service_name": "AMI Data Companion (hostname)"}
```
2. Add DRF serializers for both endpoints
Replace raw query param reading and manual Pydantic validation with proper serializers:
`/tasks` — new `TasksRequestSerializer`:

```python
class TasksRequestSerializer(serializers.Serializer):
    batch = serializers.IntegerField(min_value=1)
    processing_service_name = serializers.CharField(required=False, max_length=200)
```

`/result` — new `ResultRequestSerializer` wrapping the bare list:

```python
class ResultRequestSerializer(serializers.Serializer):
    processing_service_name = serializers.CharField(required=False, max_length=200)
    results = SchemaField(schema=list[PipelineTaskResult])
```

This follows the same pattern as `PipelineRegistrationSerializer`, which already has `processing_service_name = serializers.CharField()`.
3. Clean up #1117 artifacts
Once both endpoints use serializers:
- Remove the `processing_service_name_param` OpenApiParameter from `ami/jobs/schemas.py`
- Remove the `_log_processing_service_name()` helper from `ami/jobs/views.py`
- Replace per-action logging boilerplate with reads from `serializer.validated_data` (see the sketch after this list)
- Add proper response schemas (replace `responses={200: dict}` and hand-crafted dicts)
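A hedged sketch of what the `tasks()` action body could look like after this cleanup, assuming the `TasksRequestSerializer` from step 2; the reserve loop, the `manager` attribute, and the response shape are inferred from the description above and may not match the real code:

```python
import logging

from rest_framework.decorators import action
from rest_framework.response import Response

logger = logging.getLogger(__name__)


# Lives on JobViewSet; TasksRequestSerializer is the serializer from step 2.
@action(detail=True, methods=["post"])
def tasks(self, request, pk=None):
    job = self.get_object()

    serializer = TasksRequestSerializer(data=request.data)
    serializer.is_valid(raise_exception=True)
    batch = serializer.validated_data["batch"]
    # Step 4 proposes a shared validate_processing_service_name() helper;
    # only the IP fallback is shown inline here.
    service_name = (
        serializer.validated_data.get("processing_service_name")
        or request.META.get("REMOTE_ADDR", "unknown")
    )

    # Illustrative reserve loop: the current handler consumes queue messages
    # via manager.reserve_task(); the exact attribute and signature may differ.
    reserved = []
    for _ in range(batch):
        task = job.manager.reserve_task()
        if task is None:
            break
        reserved.append(task)

    logger.info("Reserved %d task(s) for %s on job %s", len(reserved), service_name, job.pk)
    # A dedicated response serializer would replace responses={200: dict}.
    return Response({"tasks": reserved})
```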
4. Sanitize and fall back to caller IP
Add a shared validate_processing_service_name that strips control characters, enforces max length, and falls back to the caller's IP when the field is blank or absent.
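A minimal sketch of such a helper, assuming the 200-character limit from the serializers above; the exact sanitization rules and the way the caller IP is obtained are assumptions:

```python
import re

MAX_SERVICE_NAME_LENGTH = 200  # matches the serializer max_length above


def validate_processing_service_name(value: str | None, request) -> str:
    """Strip control characters, enforce a max length, and fall back to the
    caller's IP address when the value is blank or absent."""
    cleaned = re.sub(r"[\x00-\x1f\x7f]", "", value or "").strip()
    if not cleaned:
        # Behind a reverse proxy this may need to consult X-Forwarded-For
        # instead of REMOTE_ADDR.
        return request.META.get("REMOTE_ADDR", "unknown")
    return cleaned[:MAX_SERVICE_NAME_LENGTH]
```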
Context
This endpoint is used by external processing services (e.g., AMI Data Companion) to pull tasks for async ML processing. The flow is:
- `POST /jobs/{id}/tasks/` — grab tasks to process
- Worker processes tasks
- `POST /jobs/{id}/result/` — submit results
Knowing which service grabbed which tasks is important for debugging — see #1117 and #1112.
Also consider having ADC set a descriptive User-Agent header on its HTTP session — free metadata in access logs with zero application code.
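For reference, the worker-side flow with the new request format and a descriptive User-Agent could look roughly like the sketch below; the base URL, payload shapes, and the run_inference() placeholder are assumptions for illustration only:

```python
import requests

API = "https://antenna.example.org/api/v2"  # placeholder base URL
JOB_ID = 60


def run_inference(task: dict) -> dict:
    """Placeholder for the worker's actual ML processing."""
    return {"task_id": task.get("id"), "detections": []}


session = requests.Session()
# Descriptive User-Agent: free metadata in access logs, no server changes needed.
session.headers["User-Agent"] = "ami-data-companion/1.2.3 (hostname)"

# 1. Grab a batch of tasks to process.
resp = session.post(
    f"{API}/jobs/{JOB_ID}/tasks/",
    params={"project_id": 18},
    json={"batch": 4, "processing_service_name": "AMI Data Companion (hostname)"},
)
resp.raise_for_status()
tasks = resp.json().get("tasks", [])  # assumed response shape

# 2. Process the tasks locally.
results = [run_inference(task) for task in tasks]

# 3. Submit the results.
session.post(
    f"{API}/jobs/{JOB_ID}/result/",
    json={
        "processing_service_name": "AMI Data Companion (hostname)",
        "results": results,
    },
).raise_for_status()
```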
Impact
This is a breaking change for the ADC worker. The corresponding changes need to be made in ami-data-companion.
Checklist
- [ ] `/tasks` changed from GET to POST
- [ ] `TasksRequestSerializer` with `batch` + `processing_service_name`
- [ ] `ResultRequestSerializer` wrapping `list[PipelineTaskResult]` + `processing_service_name`
- [ ] Proper response schemas for both endpoints
- [ ] Shared `processing_service_name` validation with sanitization + IP fallback
- [ ] Remove `processing_service_name_param` and `_log_processing_service_name` from #1117 (Handle processing_service_name parameters in requests from workers)
- [ ] Corresponding ADC worker PR for request format changes