The Cosmos Evaluator service framework provides building blocks for wrapping evaluation checks as REST API microservices. Each service follows a consistent pattern: receive a request, validate it, run the check, and return formatted results.
┌─────────────────────────────────────────────────────┐
│ FastAPI Application │
│ │
│ GET /health POST /process GET /config │
└────────┬──────────────┬──────────────────┬──────────┘
│ │ │
│ ┌─────────▼──────────┐ │
│ │ RequestHandler │ │
│ │ (parse & validate)│ │
│ └─────────┬──────────┘ │
│ │ │
│ ┌─────────▼──────────┐ │
│ │ ServiceBase │ │
│ │(validate & process)│ │
│ └─────────┬──────────┘ │
│ │ │
│ ┌─────────▼──────────┐ │
│ │ ResponseFormatter │ │
│ │ (format response) │ │
│ └────────────────────┘ │
│ │
┌────▼─────────────────────────────────▼──┐
│ StorageProvider │
│ (upload results, download inputs) │
└─────────────────────────────────────────┘
The abstract base class that all checker services inherit from. Defines the contract every service must implement.
from services.framework.service_base import ServiceBase
class MyService(ServiceBase[MyRequest, MyResult]):
async def validate_input(self, request: MyRequest) -> bool:
"""Validate that the request data is processable.
Return True if valid. Raise an exception with details if not."""
...
async def process(self, request: MyRequest) -> MyResult:
"""Run the evaluation logic. Returns the result."""
...ServiceBase is generic over RequestType and ResponseType — both should be Pydantic BaseModel subclasses.
It also provides:
- A pre-configured
self.loggerfor structured logging cleanup(output_dir)to remove temporary directories after processing
The framework defines three protocol interfaces. These are runtime-checkable Python protocols, so you can implement them however you like as long as the method signatures match.
Parses and validates incoming HTTP requests into your domain model.
class RequestHandler(Protocol[RequestType]):
async def parse(self, raw_request: Any) -> RequestType: ...
async def validate(self, request_data: RequestType) -> bool: ...
def get_content_type(self) -> str: ...Built-in implementation: JsonRequestHandler — Parses JSON request bodies into Pydantic models.
Formats service responses into a consistent envelope.
class ResponseFormatter(Protocol[ResponseType]):
async def format_success(self, data: Any, metadata: dict | None = None) -> ResponseType: ...
async def format_error(self, error: Exception, status_code: int = 500) -> ResponseType: ...
async def format_progress(self, progress: float, message: str = "") -> ResponseType: ...
def get_content_type(self) -> str: ...Built-in implementation: JsonResponseFormatter — Wraps responses in a standard JSON envelope:
Success response:
{
"success": true,
"data": { ... },
"metadata": { ... },
"timestamp": "2025-01-01T00:00:00Z"
}Error response:
{
"success": false,
"error": {
"type": "ValueError",
"message": "Invalid input",
"code": null,
"details": null
},
"timestamp": "2025-01-01T00:00:00Z"
}Additional methods on JsonResponseFormatter:
format_validation_error(validation_errors, status_code=422)— For Pydantic validation failuresformat_paginated_response(items, total, page, page_size)— For paginated list endpoints
Cloud-agnostic interface for storing and retrieving files and data.
class StorageProvider(Protocol):
async def store(self, data: Any, key: str, metadata: dict | None = None) -> str: ...
async def retrieve(self, key: str) -> Any: ...
async def delete(self, key: str) -> bool: ...
async def exists(self, key: str) -> bool: ...
def list_keys(self, prefix: str = "") -> AsyncIterator[str]: ...
async def store_file(self, file_path: Path, key: str, ...) -> StorageUrls: ...
async def generate_presigned_url(self, key: str, ...) -> str: ...
async def download_from_url(self, url: str, destination: Path) -> Path: ...Built-in implementations:
LocalStorageProvider— File system storage (good for development)S3StorageProvider— AWS S3 with presigned URL supportStorageProviderFactory— Creates the right provider based on settings
StorageUrls is a dataclass with raw (permanent) and presigned (temporary access) URL fields.
Base class for service configuration using Pydantic Settings:
from services.settings_base import SettingsBase
class Settings(SettingsBase):
my_custom_setting: str = Field(default="value")
model_config = SettingsConfigDict(
env_prefix="MYCHECKER_",
...
)Common settings inherited from SettingsBase:
| Setting | Env Variable | Default | Description |
|---|---|---|---|
env |
COSMOS_EVALUATOR_ENV |
local |
Environment: local for development |
log_level |
COSMOS_EVALUATOR_LOG_LEVEL |
INFO |
Logging level |
storage_type |
COSMOS_EVALUATOR_STORAGE_TYPE |
s3 |
Storage backend: s3 or local |
storage_bucket |
COSMOS_EVALUATOR_STORAGE_BUCKET |
— | S3 bucket name |
storage_region |
COSMOS_EVALUATOR_STORAGE_REGION |
— | AWS region |
storage_access_key |
COSMOS_EVALUATOR_STORAGE_ACCESS_KEY |
— | AWS access key |
storage_secret_key |
COSMOS_EVALUATOR_STORAGE_SECRET_KEY |
— | AWS secret key |
In local environment, credentials are automatically loaded from ~/.cosmos_evaluator/.env.
Every Cosmos Evaluator service exposes three endpoints:
Returns service status for monitoring and service discovery.
{
"success": true,
"data": {
"service": "obstacle_correspondence",
"status": "healthy",
"version": "1.15.0",
"environment": "local"
}
}The main processing endpoint. Accepts a request, runs the check, and returns results.
The request and response models are service-specific (see individual check documentation). The response is always wrapped in the standard envelope.
Returns the default configuration for the checker, useful for clients to discover available options.
{
"success": true,
"data": {
"default_config": {
"av.obstacle": { ... }
}
}
}# All services together
dazel run //services:rest_apis
# Individual services
dazel run //services/obstacle_correspondence:rest_api
dazel run //services/vlm:rest_api
# With custom port
dazel run //services/obstacle_correspondence:rest_api -- --port 9000Default ports (local development):
| Service | Port |
|---|---|
| Obstacle Correspondence | 8082 |
| VLM | 8083 |
# Check health
curl http://localhost:8082/health
# Get default config
curl http://localhost:8082/config
# Process a request (example uses repo sample data; mount with -v $(pwd)/checks/sample_data/cosmos_public:/data for local mode)
curl -X POST http://localhost:8082/process \
-H "Content-Type: application/json" \
-d '{
"clip_id": "01ce78ad-9e9a-4df9-95d1-1d50e41a04ce_764657799000_764677799000",
"rds_hq_url": "/data/rds_hq.zip",
"augmented_video_url": "/data/01ce78ad-9e9a-4df9-95d1-1d50e41a04ce_764657799000_764677799000_0_Morning.30fps.mp4",
"camera_name": "camera_front_wide_120fov"
}'See the Custom Checker Tutorial for a complete walkthrough of building a new checker service from scratch.
The general steps are:
- Define models — Pydantic request/response models
- Implement service — Subclass
ServiceBasewith your check logic - Configure settings — Subclass
SettingsBasefor service-specific config - Build REST API — FastAPI app with
/health,/process,/configendpoints - Create BUILD file — Bazel targets for library, binary, and tests
The Obstacle and VLM services in services/obstacle_correspondence/ and services/vlm/ serve as reference implementations.