Description
Discussed in #11
Originally posted by ghost November 18, 2025
Problem
Current Pipeline Implementation Complexity
Asya🎭's envelope protocol supports multi-actor pipelines through the route.actors array, but implementing pipelines requires significant manual configuration:
Current workflow to create a pipeline:
1. Deploy AsyncActor CRDs for each stage (e.g., image-resizer, image-optimizer, thumbnail-generator)
2. Manually create gateway tool configuration in src/asya-gateway/config/tools.yaml
3. Write tool logic to construct the envelope with the correct route.actors array
4. Restart the gateway to load the new tool configuration
5. Manually document the pipeline flow for team members
6. Repeat steps 2-4 for every pipeline change
Problems this creates:
- Configuration sprawl: Pipeline definition is split across AsyncActor CRDs and gateway tool configs
- No validation: Gateway tools can reference non-existent actors, causing runtime failures
- Poor discoverability: No single place to see all available pipelines
- Manual synchronization: Changing pipeline requires updating multiple files and restarting gateway
- No version control linkage: Pipeline changes in AsyncActors don't automatically update gateway tools
- Operational overhead: Teams must understand envelope protocol, route construction, and gateway tool API
Motivating Scenarios
Scenario 1: ML Inference Pipeline
User wants: image → preprocessing → model-inference → postprocessing → results
Current implementation:
1. Create 3 AsyncActor CRDs (preprocessing-actor, inference-actor, postprocessing-actor)
2. Edit tools.yaml to add "run_inference" tool
3. Write route construction: {"actors": ["preprocessing-actor", "inference-actor", "postprocessing-actor"], "current": 0}
4. Document pipeline in README or Confluence
5. When adding new model, edit AsyncActor, edit tool, restart gateway
Desired: A single AsyncTool CRD that declares the pipeline and auto-creates the MCP tool
Scenario 2: Multi-Stage Data Processing
Team has 5 different data pipelines:
- ETL pipeline: extract → transform → load (3 actors)
- Analytics pipeline: collect → aggregate → analyze → visualize (4 actors)
- Validation pipeline: sanitize → validate → enrich (3 actors)
- Alert pipeline: detect → classify → notify (3 actors)
- Backup pipeline: snapshot → compress → upload (3 actors)
Current approach: 16 AsyncActors + 5 manual gateway tool configs
Problem: No clear view of which actors belong to which pipeline, and tools.yaml becomes unmaintainable
Scenario 3: Pipeline Evolution
Pipeline v1: text-extractor → summarizer
Pipeline v2: text-extractor → translator → summarizer
Pipeline v3: text-extractor → classifier → summarizer (conditional routing in future)
Current approach: Edit tools.yaml, update route construction, restart gateway, update docs
Problem: Pipeline changes require coordinated edits across multiple files, no audit trail
Motivation
Key Insights
- Pipelines are first-class concepts: Multi-actor workflows are common enough to deserve declarative support, not just protocol-level primitives.
- Gateway tools should be generated, not written: Most MCP tools just submit envelopes with predefined routes; this is boilerplate that should be auto-generated.
- Namespace is the natural boundary: Tools belong to a namespace alongside the actors they chain together.
- Configuration should be centralized: A single AsyncTool CRD should declare actors, tool schema, and documentation instead of scattering them across files.
- Validation at deploy time beats runtime errors: The AsyncTool controller can validate that actors exist before creating gateway tools, preventing broken pipelines.
Goals
- Simplify pipeline creation: Declare pipeline in single YAML, get working MCP tool automatically
- Auto-generate gateway tools: AsyncTool CRD creates corresponding MCP tool in gateway without manual config
- Centralize pipeline definition: AsyncTool spec includes actors, input schema, description - no separate tool config
- Enable discoverability: kubectl get asynctools (or kubectl get asyt) shows all available pipelines
- Support versioning: AsyncTool changes are version-controlled CRD updates, not manual edits
- Validate at creation time: AsyncTool controller validates actors exist before creating gateway tools
Non-Goals
- Replace envelope protocol: AsyncTools compile to envelope route.actors arrays and don't change sidecar behavior
- Conditional routing: Initial version supports linear pipelines only (conditional/branching is future work)
- Deployment orchestration: AsyncTools don't deploy actors, just chain existing ones
- Health monitoring: AsyncTool status shows existence, not runtime health (separate concern)
Proposed Solution
AsyncTool CRD
Introduce a namespace-scoped CRD that declares actor pipelines and auto-generates gateway MCP tools.
API Design:
apiVersion: asya.sh/v1alpha1
kind: AsyncTool
metadata:
  name: image-processing-pipeline
  namespace: production
spec:
  # Ordered list of actors forming the pipeline
  # Compiled to envelope route.actors array
  actors:
    - image-resizer
    - image-optimizer
    - thumbnail-generator
  # MCP tool configuration
  # If set, gateway auto-creates a tool with this spec
  tool:
    # Tool name exposed via MCP
    # Must be unique within namespace
    name: process_image
    # Human-readable description shown in MCP tool list
    description: "Resize, optimize, and generate thumbnails for uploaded images"
    # JSON Schema for tool input
    # Gateway validates input against this schema
    # Validated input is passed to the first actor in the pipeline as envelope payload
    inputSchema:
      type: object
      required:
        - image_url
      properties:
        image_url:
          type: string
          format: uri
          description: "URL of the image to process"
        max_width:
          type: integer
          default: 1920
          description: "Maximum width in pixels"
        max_height:
          type: integer
          default: 1080
          description: "Maximum height in pixels"
        quality:
          type: integer
          minimum: 1
          maximum: 100
          default: 85
          description: "JPEG quality (1-100)"
  # Optional: Additional envelope headers
  # Merged into all envelopes submitted via this tool
  headers:
    priority: high
    team: media-processing
  # Optional: Result tracking configuration
  # Controls how gateway tracks envelopes through the pipeline
  tracking:
    # Enable SSE streaming for this tool's envelopes
    streamResults: true
    # Timeout for pipeline execution
    timeout: 5m
status:
  # Validation results
  conditions:
    - type: ActorsReady
      status: "True"
      reason: AllActorsExist
      message: "All 3 actors exist in namespace"
      lastTransitionTime: "2025-11-11T10:30:00Z"
    - type: ToolReady
      status: "True"
      reason: ToolRegistered
      message: "MCP tool 'process_image' registered in gateway"
      lastTransitionTime: "2025-11-11T10:30:15Z"
    - type: Ready
      status: "True"
      reason: ToolHealthy
      message: "Tool is ready for use"
      lastTransitionTime: "2025-11-11T10:30:15Z"
  # Per-actor validation status
  actors:
    - name: image-resizer
      exists: true
      ready: true
      lastChecked: "2025-11-11T10:35:00Z"
    - name: image-optimizer
      exists: true
      ready: true
      lastChecked: "2025-11-11T10:35:00Z"
    - name: thumbnail-generator
      exists: true
      ready: true
      lastChecked: "2025-11-11T10:35:00Z"
  # Gateway tool registration status
  tool:
    name: process_image
    registered: true
    gatewayPod: asya-gateway-7d4c5f8b9-x2j4k
    lastUpdated: "2025-11-11T10:30:15Z"
  # Summary metrics
  summary:
    totalActors: 3
    readyActors: 3
    toolRegistered: true
    lastUpdated: "2025-11-11T10:35:00Z"
Controller Architecture
New controller: ToolReconciler
Runs cluster-wide, performs namespace-scoped reconciliation.
Responsibilities:
- Watch AsyncTool resources in all namespaces
- Validate that all actors in spec.actors exist in the namespace
- Notify the gateway to register/update the MCP tool based on spec.tool
- Update AsyncTool status with validation results and tool registration status
- Handle AsyncTool deletion by unregistering its tool from the gateway
Does NOT:
- Create or manage AsyncActor resources
- Deploy infrastructure
- Track runtime envelope processing
- Implement health monitoring
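The status conditions in the API design (ActorsReady, ToolReady, Ready) summarize the per-actor checks. A minimal sketch of how the `updateToolStatus` helper called in the reconciler might derive them, assuming simplified stand-in types (`Condition` here is not `metav1.Condition`) and hypothetical reasons `ActorsMissing`, `ActorsNotReady`, and `NotRegistered` for the failure cases:

```go
package main

import "fmt"

// ActorStatus mirrors one entry of status.actors.
type ActorStatus struct {
	Name   string
	Exists bool
	Ready  bool
}

// Condition is a simplified, hypothetical stand-in for metav1.Condition
// (no lastTransitionTime or observedGeneration).
type Condition struct {
	Type    string
	Status  string // "True" or "False"
	Reason  string
	Message string
}

// deriveConditions computes ActorsReady, ToolReady and Ready from the
// per-actor validation results and the gateway registration outcome.
func deriveConditions(actors []ActorStatus, toolName string, registered bool) []Condition {
	missing, notReady := 0, 0
	for _, a := range actors {
		if !a.Exists {
			missing++
		} else if !a.Ready {
			notReady++
		}
	}

	actorsReady := Condition{Type: "ActorsReady", Status: "True", Reason: "AllActorsExist",
		Message: fmt.Sprintf("All %d actors exist in namespace", len(actors))}
	switch {
	case missing > 0:
		actorsReady.Status, actorsReady.Reason = "False", "ActorsMissing"
		actorsReady.Message = fmt.Sprintf("%d of %d actors not found", missing, len(actors))
	case notReady > 0:
		actorsReady.Status, actorsReady.Reason = "False", "ActorsNotReady"
		actorsReady.Message = fmt.Sprintf("%d of %d actors not ready", notReady, len(actors))
	}

	toolReady := Condition{Type: "ToolReady", Status: "False", Reason: "NotRegistered",
		Message: fmt.Sprintf("MCP tool '%s' not registered in gateway", toolName)}
	if registered {
		toolReady.Status, toolReady.Reason = "True", "ToolRegistered"
		toolReady.Message = fmt.Sprintf("MCP tool '%s' registered in gateway", toolName)
	}

	// Ready is the conjunction of the other two conditions.
	ready := Condition{Type: "Ready", Status: "False", Reason: "NotReady", Message: "Tool is not ready for use"}
	if actorsReady.Status == "True" && toolReady.Status == "True" {
		ready = Condition{Type: "Ready", Status: "True", Reason: "ToolHealthy", Message: "Tool is ready for use"}
	}
	return []Condition{actorsReady, toolReady, ready}
}
```

Keeping Ready as a pure function of the other two conditions means a single place decides whether the tool is usable.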
Reconciliation Logic:
import (
	"context"
	"time"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/meta"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	// plus the operator API package (src/asya-operator/api/v1alpha1), imported as asyav1alpha1
)

func (r *ToolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	var tool asyav1alpha1.AsyncTool
	if err := r.Get(ctx, req.NamespacedName, &tool); err != nil {
		if apierrors.IsNotFound(err) {
			// AsyncTool deleted: unregister its tool from the gateway
			return ctrl.Result{}, r.unregisterTool(ctx, req.NamespacedName)
		}
		return ctrl.Result{}, err
	}

	// Validate that all referenced actors exist
	actorStatus := r.validateActors(ctx, tool.Namespace, tool.Spec.Actors)

	// Check whether all actors are ready
	allActorsReady := true
	for _, status := range actorStatus {
		if !status.Exists || !status.Ready {
			allActorsReady = false
			break
		}
	}

	// Register/update the tool in the gateway only if all actors are ready
	var toolStatus ToolStatus
	if allActorsReady && tool.Spec.Tool.Name != "" {
		toolStatus = r.registerOrUpdateTool(ctx, &tool)
	}

	// Update status; return the error so the request is retried (e.g. on conflict)
	if err := r.updateToolStatus(ctx, &tool, actorStatus, toolStatus); err != nil {
		return ctrl.Result{}, err
	}

	// Requeue for periodic validation
	return ctrl.Result{RequeueAfter: 30 * time.Second}, nil
}

func (r *ToolReconciler) validateActors(ctx context.Context, namespace string, actorNames []string) []ActorStatus {
	statuses := make([]ActorStatus, 0, len(actorNames))
	for _, name := range actorNames {
		actor := &asyav1alpha1.AsyncActor{}
		err := r.Get(ctx, client.ObjectKey{Name: name, Namespace: namespace}, actor)
		if err != nil {
			// Actor missing or lookup failed
			statuses = append(statuses, ActorStatus{
				Name:   name,
				Exists: false,
				Ready:  false,
				Error:  err.Error(),
			})
			continue
		}
		ready := meta.IsStatusConditionTrue(actor.Status.Conditions, "Ready")
		statuses = append(statuses, ActorStatus{
			Name:   name,
			Exists: true,
			Ready:  ready,
		})
	}
	return statuses
}

func (r *ToolReconciler) registerOrUpdateTool(ctx context.Context, tool *asyav1alpha1.AsyncTool) ToolStatus {
	// Construct tool registration request; Route matches the "route" object in tools.json
	toolSpec := GatewayToolSpec{
		Name:        tool.Spec.Tool.Name,
		Description: tool.Spec.Tool.Description,
		InputSchema: tool.Spec.Tool.InputSchema,
		Route: RouteInfo{
			Actors:  tool.Spec.Actors,
			Headers: tool.Spec.Headers,
		},
		Namespace: tool.Namespace,
	}
	// Send to gateway via HTTP API or shared ConfigMap
	// (implementation detail: gateway watches a ConfigMap or provides a registration endpoint)
	err := r.gatewayClient.RegisterTool(ctx, toolSpec)
	return ToolStatus{
		Name:       tool.Spec.Tool.Name,
		Registered: err == nil,
		Error:      err,
	}
}
Gateway Integration
Gateway changes required:
- Tool registration endpoint/ConfigMap watching
  - Gateway watches a ConfigMap containing AsyncTool-generated tool specs
  - Or: Gateway provides an HTTP endpoint for tool registration (the operator calls it)
- Dynamic tool loading
  - Gateway loads tool specs from the ConfigMap at startup and on changes
  - No need to restart the gateway when AsyncTools are created/updated
- Tool execution
  - When a tool is called, the gateway constructs an envelope:
    {
      "id": "<generated-id>",
      "route": {
        "actors": ["actor1", "actor2", "actor3"],  // from AsyncTool spec.actors
        "current": 0
      },
      "headers": {...},  // from AsyncTool spec.headers + tool invocation metadata
      "payload": {...}   // from tool input, validated against inputSchema
    }
  - Submits the envelope to the first actor's queue (asya-actor1)
  - Tracks the envelope through the pipeline if tracking.streamResults: true
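The envelope construction for a tool call can be sketched in Go as follows. This assumes string-valued headers, a random hex ID, and that invocation metadata overrides spec.headers on key clashes; none of these details are fixed by the proposal, and `buildEnvelope` is a hypothetical helper name.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"errors"
)

// Route holds the ordered actor list and the index of the actor to run next.
type Route struct {
	Actors  []string `json:"actors"`
	Current int      `json:"current"`
}

// Envelope mirrors the fields in the example envelope: id, route, headers, payload.
type Envelope struct {
	ID      string            `json:"id"`
	Route   Route             `json:"route"`
	Headers map[string]string `json:"headers"`
	Payload map[string]any    `json:"payload"`
}

// newEnvelopeID returns a random 128-bit ID as 32 hex characters
// (the ID format is an assumption).
func newEnvelopeID() (string, error) {
	b := make([]byte, 16)
	if _, err := rand.Read(b); err != nil {
		return "", err
	}
	return hex.EncodeToString(b), nil
}

// buildEnvelope builds the envelope for one tool call and returns the queue of
// the first actor. Invocation metadata overrides spec headers on key clashes
// (an assumed precedence).
func buildEnvelope(actors []string, specHeaders, invocationMeta map[string]string, input map[string]any) (Envelope, string, error) {
	if len(actors) == 0 {
		return Envelope{}, "", errors.New("tool has no actors")
	}
	id, err := newEnvelopeID()
	if err != nil {
		return Envelope{}, "", err
	}
	headers := make(map[string]string, len(specHeaders)+len(invocationMeta))
	for k, v := range specHeaders {
		headers[k] = v
	}
	for k, v := range invocationMeta {
		headers[k] = v
	}
	env := Envelope{
		ID:      id,
		Route:   Route{Actors: append([]string(nil), actors...), Current: 0},
		Headers: headers,
		Payload: input,
	}
	// Queue naming follows the asya-<actor> convention from the text.
	return env, "asya-" + actors[0], nil
}
```

Copying the actor slice keeps later edits to the cached tool spec from mutating envelopes that are already in flight.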
ConfigMap approach (simpler, no new gateway API):
apiVersion: v1
kind: ConfigMap
metadata:
  name: asya-tools
  namespace: production
data:
  tools.json: |
    [
      {
        "name": "process_image",
        "description": "Resize, optimize, and generate thumbnails",
        "inputSchema": {...},
        "route": {
          "actors": ["image-resizer", "image-optimizer", "thumbnail-generator"],
          "headers": {"priority": "high"}
        }
      }
    ]
The AsyncTool controller updates this ConfigMap; the gateway watches for changes and reloads tools.
Usage Workflow
Step 1: Deploy actors
kubectl apply -f - <<EOF
apiVersion: asya.sh/v1alpha1
kind: AsyncActor
metadata:
  name: image-resizer
  namespace: production
spec:
  image: myregistry/image-resizer:v1.0
  transport: rabbitmq
---
apiVersion: asya.sh/v1alpha1
kind: AsyncActor
metadata:
  name: image-optimizer
  namespace: production
spec:
  image: myregistry/image-optimizer:v1.0
  transport: rabbitmq
---
apiVersion: asya.sh/v1alpha1
kind: AsyncActor
metadata:
  name: thumbnail-generator
  namespace: production
spec:
  image: myregistry/thumbnail-generator:v1.0
  transport: rabbitmq
EOF
Step 2: Create AsyncTool (declares pipeline, auto-creates MCP tool)
kubectl apply -f - <<EOF
apiVersion: asya.sh/v1alpha1
kind: AsyncTool
metadata:
  name: image-processing-pipeline
  namespace: production
spec:
  actors:
    - image-resizer
    - image-optimizer
    - thumbnail-generator
  tool:
    name: process_image
    description: "Resize, optimize, and generate thumbnails for uploaded images"
    inputSchema:
      type: object
      required: [image_url]
      properties:
        image_url:
          type: string
          description: "URL of the image to process"
  tracking:
    streamResults: true
EOF
Step 3: Verify AsyncTool status
kubectl get asynctool image-processing-pipeline -n production
# NAME                        ACTORS   READY   TOOL            AGE
# image-processing-pipeline   3/3      True    process_image   30s
kubectl describe asynctool image-processing-pipeline -n production
# Status:
#   Conditions:
#     Type          Status   Reason           Message
#     ----          ------   ------           -------
#     ActorsReady   True     AllActorsExist   All 3 actors exist
#     ToolReady     True     ToolRegistered   Tool 'process_image' registered
#     Ready         True     ToolHealthy      Tool is ready
Step 4: Use auto-generated MCP tool
# Tool appears automatically in MCP tools list
# No manual gateway configuration needed
Benefits
For developers:
- ✅ Single YAML declares entire pipeline - no scattered configuration
- ✅ Tool auto-generated - no manual tools.yaml editing
- ✅ Input schema validated - prevents invalid tool calls
- ✅ Version control friendly - pipeline changes are Git diffs
- ✅ Self-documenting - AsyncTool spec shows actors, schema, description
For operators/SREs:
- ✅ kubectl get asynctools shows all pipelines at a glance
- ✅ AsyncTool status shows validation errors before runtime
- ✅ No gateway restarts needed - tools load dynamically
- ✅ Namespace isolation - teams manage their own tools
- ✅ Clear ownership - each AsyncTool lives with the actors it chains
For platform teams:
- ✅ Reduces gateway configuration complexity
- ✅ Standardizes pipeline declaration across teams
- ✅ Enables discoverability - tools are Kubernetes resources
- ✅ Supports GitOps workflows - tools are declarative CRDs
- ✅ Optional adoption - existing tools.yaml configs still work
Implementation Phases
Phase 1: CRD Definition (2-3 hours)
- Define AsyncTool CRD schema
- Add validation rules (unique tool names, actor list non-empty, etc.)
- Generate CRD manifests with make manifests
- Write CRD documentation
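The CRD schema and the single-object validation rules could start as Go types like the following. The field names follow the YAML in the API design; the Go types, the `+kubebuilder:validation:MinItems=1` marker placement, and the snake_case tool-name pattern are assumptions. Rules that span several objects, such as unique tool names, need a webhook or controller check instead.

```go
package main

import (
	"errors"
	"fmt"
	"regexp"
)

// AsyncToolSpec mirrors spec in the API design (Go types are a sketch).
type AsyncToolSpec struct {
	// +kubebuilder:validation:MinItems=1
	Actors   []string          `json:"actors"`
	Tool     ToolSpec          `json:"tool"`
	Headers  map[string]string `json:"headers,omitempty"`
	Tracking *TrackingSpec     `json:"tracking,omitempty"`
}

// ToolSpec mirrors spec.tool.
type ToolSpec struct {
	Name        string         `json:"name"`
	Description string         `json:"description,omitempty"`
	InputSchema map[string]any `json:"inputSchema,omitempty"`
}

// TrackingSpec mirrors spec.tracking.
type TrackingSpec struct {
	StreamResults bool   `json:"streamResults,omitempty"`
	Timeout       string `json:"timeout,omitempty"` // e.g. "30s", "5m"
}

// toolNamePattern is an assumed rule matching names like process_image, run_etl.
var toolNamePattern = regexp.MustCompile(`^[a-z][a-z0-9_]*$`)

// validateSpec checks rules that need only a single AsyncTool object and
// returns all violations joined into one error.
func validateSpec(spec AsyncToolSpec) error {
	var errs []error
	if len(spec.Actors) == 0 {
		errs = append(errs, errors.New("spec.actors must not be empty"))
	}
	for i, a := range spec.Actors {
		if a == "" {
			errs = append(errs, fmt.Errorf("spec.actors[%d] must not be empty", i))
		}
	}
	// An empty tool name is allowed: the reconciler then skips registration.
	if spec.Tool.Name != "" && !toolNamePattern.MatchString(spec.Tool.Name) {
		errs = append(errs, fmt.Errorf("spec.tool.name %q must match %s", spec.Tool.Name, toolNamePattern))
	}
	return errors.Join(errs...)
}
```

Returning every violation at once (errors.Join, Go 1.20+) lets the status or webhook response list all problems instead of only the first.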
Phase 2: Basic Controller (4-5 hours)
- Implement ToolReconciler
- Actor existence validation logic
- Status update logic
- Event emission for status changes
Phase 3: Gateway Integration - ConfigMap Approach (3-4 hours)
- AsyncTool controller updates ConfigMap with tool specs
- Gateway watches ConfigMap and loads tools dynamically
- Tool execution: construct envelope from AsyncTool spec
- Testing: verify tools work end-to-end
Phase 4: Tool Schema Validation (2-3 hours)
- Gateway validates tool input against inputSchema
- Return clear errors for invalid inputs
- Unit tests for schema validation
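A hand-rolled sketch of the gateway-side check, covering only `required`, `type`, `enum`, `minimum`, `maximum`, and `default`. Other keywords (for example `format` or `minLength`) are ignored here, so a production gateway would more likely use a complete JSON Schema library. `validateInput` is a hypothetical name, and the sketch assumes schema and input were both decoded by encoding/json, which makes every number a float64.

```go
package main

import (
	"fmt"
	"math"
	"reflect"
)

// validateInput checks decoded tool arguments against a subset of JSON Schema
// and fills in defaults for absent properties (it mutates input).
func validateInput(schema, input map[string]any) error {
	if required, ok := schema["required"].([]any); ok {
		for _, r := range required {
			name, _ := r.(string)
			if _, present := input[name]; !present {
				return fmt.Errorf("missing required property %q", name)
			}
		}
	}
	props, _ := schema["properties"].(map[string]any)
	for name, raw := range props {
		prop, _ := raw.(map[string]any)
		val, present := input[name]
		if !present {
			if def, ok := prop["default"]; ok {
				input[name] = def // apply schema default
			}
			continue
		}
		if !hasType(prop["type"], val) {
			return fmt.Errorf("property %q: expected type %v", name, prop["type"])
		}
		if enum, ok := prop["enum"].([]any); ok && !inEnum(enum, val) {
			return fmt.Errorf("property %q: %v is not one of %v", name, val, enum)
		}
		if n, ok := val.(float64); ok {
			if lo, ok := prop["minimum"].(float64); ok && n < lo {
				return fmt.Errorf("property %q: %v is below minimum %v", name, n, lo)
			}
			if hi, ok := prop["maximum"].(float64); ok && n > hi {
				return fmt.Errorf("property %q: %v is above maximum %v", name, n, hi)
			}
		}
	}
	return nil
}

// hasType reports whether val matches a JSON Schema primitive type name;
// a missing or unknown type name is accepted.
func hasType(typ, val any) bool {
	switch typ {
	case "string":
		_, ok := val.(string)
		return ok
	case "integer":
		n, ok := val.(float64)
		return ok && n == math.Trunc(n)
	case "number":
		_, ok := val.(float64)
		return ok
	case "boolean":
		_, ok := val.(bool)
		return ok
	case "object":
		_, ok := val.(map[string]any)
		return ok
	case "array":
		_, ok := val.([]any)
		return ok
	}
	return true
}

// inEnum uses reflect.DeepEqual so non-comparable values cannot panic.
func inEnum(enum []any, val any) bool {
	for _, e := range enum {
		if reflect.DeepEqual(e, val) {
			return true
		}
	}
	return false
}
```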
Phase 5: Tracking Integration (2-3 hours)
- Implement tracking.streamResults flag
- Gateway tracks tool-based envelopes via SSE
- Add timeout support from tracking.timeout
Phase 6: Documentation & Examples (2-3 hours)
- Example AsyncTool manifests
- Migration guide from tools.yaml to AsyncTools
- Best practices documentation
- Update architecture docs
Total effort: ~15-21 hours
Examples
Example 1: Simple Data Pipeline
apiVersion: asya.sh/v1alpha1
kind: AsyncTool
metadata:
  name: etl-pipeline
  namespace: analytics
spec:
  actors:
    - data-extractor
    - data-transformer
    - data-loader
  tool:
    name: run_etl
    description: "Extract, transform, and load data from source to warehouse"
    inputSchema:
      type: object
      required: [source_url, destination_table]
      properties:
        source_url:
          type: string
          description: "URL of data source"
        destination_table:
          type: string
          description: "Target table name"
        batch_size:
          type: integer
          default: 1000
  tracking:
    streamResults: true
    timeout: 10m
Example 2: ML Inference Pipeline
apiVersion: asya.sh/v1alpha1
kind: AsyncTool
metadata:
  name: text-analysis-pipeline
  namespace: ml-models
spec:
  actors:
    - text-preprocessor
    - sentiment-classifier
    - entity-extractor
    - result-formatter
  tool:
    name: analyze_text
    description: "Analyze text sentiment and extract named entities"
    inputSchema:
      type: object
      required: [text]
      properties:
        text:
          type: string
          minLength: 1
          maxLength: 10000
          description: "Text to analyze"
        language:
          type: string
          enum: [en, es, fr, de]
          default: en
          description: "Text language"
  headers:
    model_version: "v2.5"
    team: ml-inference
  tracking:
    streamResults: true
    timeout: 30s
Example 3: Multi-Stage Video Processing
apiVersion: asya.sh/v1alpha1
kind: AsyncTool
metadata:
  name: video-processing-pipeline
  namespace: media
spec:
  actors:
    - video-downloader
    - video-transcoder
    - thumbnail-extractor
    - quality-analyzer
    - cdn-uploader
  tool:
    name: process_video
    description: "Download, transcode, analyze, and upload video to CDN"
    inputSchema:
      type: object
      required: [video_url, output_format]
      properties:
        video_url:
          type: string
          format: uri
          description: "Source video URL"
        output_format:
          type: string
          enum: [mp4, webm, hls]
          description: "Target video format"
        resolution:
          type: string
          enum: ["720p", "1080p", "4k"]
          default: "1080p"
        bitrate:
          type: integer
          description: "Target bitrate in kbps"
  tracking:
    streamResults: true
    timeout: 30m
Alternatives Considered
Alternative 1: Enhanced tools.yaml configuration
Approach: Extend gateway tools.yaml to support tool declarations inline.
Rejected because:
- Tools.yaml is not version-controlled with actors
- No validation that actors exist
- No Kubernetes-native discoverability
- Requires gateway restart for changes
- Doesn't integrate with kubectl/GitOps workflows
Alternative 2: Annotation-based pipeline declaration
Approach: Use AsyncActor annotations to declare "next actor" in pipeline.
metadata:
  annotations:
    asya.sh/next-actor: image-optimizer
Rejected because:
- No central view of complete pipeline
- Hard to trace multi-actor flows
- Circular dependency detection is complex
- Tool generation requires scanning all actors
- Unclear ownership of pipeline definition
Alternative 3: Gateway-managed tools via API
Approach: Gateway provides REST API to create tools, stores them internally.
Rejected because:
- Not Kubernetes-native (no kubectl integration)
- State management complexity in gateway
- Doesn't integrate with GitOps workflows
- No validation that actors exist
- Scaling/HA challenges for stateful gateway
Alternative 4: Extend AsyncActor CRD with tools field
Approach: Add spec.tools to AsyncActor to declare pipelines it participates in.
Rejected because:
- Tool is multi-actor concept, shouldn't live in single actor
- Duplicated configuration across multiple actors
- Unclear which actor "owns" the tool
- Complicates AsyncActor API surface
Migration Strategy
AsyncTools are fully optional and coexist with existing tools.yaml configuration:
Existing deployments (tools.yaml only):
- Continue working exactly as before
- Gateway loads tools from config/tools.yaml
- No changes required
New deployments (AsyncTools):
- Create AsyncTool CRDs for declarative pipelines
- Gateway loads tools from the AsyncTool-generated ConfigMap
- Use kubectl to manage pipelines
Hybrid approach (both tools.yaml and AsyncTools):
- Gateway loads tools from both sources
- Tool name conflicts: AsyncTool-generated tools take precedence (configurable)
- Gradual migration: create AsyncTools for new pipelines, keep existing tools.yaml entries
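The hybrid precedence rule could look like the sketch below, where `preferAsyncTool` plays the role of the `--prefer-tool-tools` flag proposed under Open Questions. `ToolDef` and `mergeTools` are hypothetical; returning the conflicting names lets the gateway log which tools.yaml entries are shadowed.

```go
package main

import "sort"

// ToolDef is a minimal, hypothetical stand-in for a gateway tool definition.
type ToolDef struct {
	Name   string
	Source string // "tools.yaml" or "asynctool"
}

// mergeTools combines tools from tools.yaml and from AsyncTools. On a name
// conflict, preferAsyncTool decides which definition wins; the sorted list of
// conflicting names is returned for logging.
func mergeTools(fromYAML, fromAsyncTools []ToolDef, preferAsyncTool bool) (map[string]ToolDef, []string) {
	merged := make(map[string]ToolDef, len(fromYAML)+len(fromAsyncTools))
	for _, t := range fromYAML {
		merged[t.Name] = t
	}
	var conflicts []string
	for _, t := range fromAsyncTools {
		if _, exists := merged[t.Name]; exists {
			conflicts = append(conflicts, t.Name)
			if !preferAsyncTool {
				continue // keep the tools.yaml definition
			}
		}
		merged[t.Name] = t
	}
	sort.Strings(conflicts)
	return merged, conflicts
}
```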
Migration path:
- Deploy the AsyncTool controller alongside the operator
- Existing tools.yaml configs continue working
- New pipelines use AsyncTool CRDs
- Gradually migrate tools.yaml entries to AsyncTools
- Eventually deprecate tools.yaml (optional, not required)
Future Enhancements
Conditional routing (beyond Phase 1):
spec:
  actors:
    - classifier
  conditionalTools:
    - condition: "payload.category == 'urgent'"
      actors: [urgent-processor, notifier]
    - condition: "payload.category == 'standard'"
      actors: [standard-processor]
Tool composition (nested tools):
spec:
  actors:
    - preprocessor
    - tool://image-processing-pipeline  # Reference another AsyncTool
    - postprocessor
Rate limiting per tool:
spec:
  rateLimit:
    requestsPerSecond: 100
    burstSize: 20
Tool-level autoscaling hints:
spec:
  autoscaling:
    suggestedMinReplicas: 2
    suggestedMaxReplicas: 10
Open Questions
- Tool name conflicts: What happens if two AsyncTools in the same namespace declare the same tool name?
  - Recommendation: A validating webhook rejects duplicate tool names in the same namespace
- Tool updates: How does the gateway handle AsyncTool updates (actor list changes, schema changes)?
  - Recommendation: The controller updates the ConfigMap and the gateway reloads tools; in-flight envelopes complete with the route they were submitted with
- Tool execution priority: Do AsyncTool-generated tools take precedence over tools.yaml tools?
  - Recommendation: Yes, with configurable flag --prefer-tool-tools=true (default true)
- Cross-namespace tools: Should AsyncTools reference actors in different namespaces?
  - Recommendation: No, keep AsyncTools namespace-scoped for Phase 1 (future enhancement if needed)
- Tool deletion: What happens to in-flight envelopes when an AsyncTool is deleted?
  - Recommendation: The tool is unregistered immediately; in-flight envelopes complete normally (actors still exist)
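The duplicate-name check recommended for the validating webhook is sketched below. `ToolRef` and `findConflict` are hypothetical; an update to the same AsyncTool object is not treated as a conflict. Because two concurrent creates can both pass a webhook, the controller would likely need to repeat the check before registering a tool.

```go
package main

import "fmt"

// ToolRef identifies an AsyncTool object and the MCP tool name it declares.
type ToolRef struct {
	Namespace string
	Object    string // AsyncTool metadata.name
	ToolName  string // spec.tool.name
}

// findConflict returns an error if candidate declares a tool name already used
// by a different AsyncTool in the same namespace.
func findConflict(existing []ToolRef, candidate ToolRef) error {
	if candidate.ToolName == "" {
		return nil // no MCP tool declared, nothing to collide with
	}
	for _, e := range existing {
		if e.Namespace == candidate.Namespace && e.ToolName == candidate.ToolName && e.Object != candidate.Object {
			return fmt.Errorf("tool name %q is already declared by AsyncTool %s/%s",
				candidate.ToolName, e.Namespace, e.Object)
		}
	}
	return nil
}
```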
References
- AsyncActor CRD: src/asya-operator/api/v1alpha1/asyncactor_types.go
- Envelope protocol: CLAUDE.md section "Envelope Protocol"
- Gateway tool configuration: src/asya-gateway/config/README.md
- MCP protocol: Gateway MCP implementation
Next Steps
- Review proposal with maintainers
- Get feedback on AsyncTool API design and gateway integration approach
- Decide between ConfigMap vs HTTP API for tool registration
- Create GitHub issue for implementation tracking
- Break down implementation into subtasks
- Assign to milestone (likely v0.3.0)