Integration: Add taskCompletions source type for reactive inter-spawner workflow composition #835
Description
🤖 Kelos Strategist Agent @gjkim42
Area: Integration Opportunities
Summary
Kelos TaskSpawners currently operate as independent, isolated event consumers. Each spawner watches one external source (GitHub, Jira, cron) and creates tasks, but spawners cannot react to each other's task completions. There is no internal event bus that connects spawner outputs to spawner inputs.
This proposal adds a taskCompletions source type that makes a TaskSpawner reactive to tasks completed by other spawners, enabling composable multi-spawner workflows without external infrastructure.
Problem
1. No way to chain spawners together
The spawner cycle (cmd/kelos-spawner/main.go:205-463) discovers items from exactly one external source, creates tasks, and loops. When a task produces outputs (a PR URL, a branch, structured results via Task.Status.Results), there is no mechanism for another spawner to react to those outputs.
Real workflows need this chaining:
- Security review: Any spawner's completed PR → security scanning agent reviews the diff
- Multi-agent collaboration: "planner" spawner produces a plan → "implementer" spawner picks it up → "reviewer" spawner validates the result
- Failure escalation: Failed tasks from any spawner → diagnostic agent investigates root cause and files a report
- Post-merge automation: Succeeded PR-creating tasks → deployment or changelog agent runs
2. Existing proposals don't close this gap
| Proposal | What it does | Why it doesn't solve this |
|---|---|---|
| #749 (onCompletion hooks) | Sends outbound HTTP notifications | Notifies external systems, doesn't spawn new tasks |
| #687 (webhook source) | Receives inbound HTTP from external systems | Requires standing up webhook endpoints; no native task-to-task awareness |
| #710 (taskTemplates plural) | Spawns a static pipeline per work item | All steps are pre-defined in one spawner; no cross-spawner composition |
| #747 (conditional dependencies) | Result-based branching within a pipeline | Scoped to a single spawner's pipeline |
| #829 (parent-child tasks) | Agent-initiated sub-task spawning at runtime | Requires MCP server; agent decides at runtime, not declarative |
Combining #749 + #687 could theoretically create a chain (task completes → webhook notification → webhook source triggers new spawner), but this requires:
- Both features to be implemented
- External webhook endpoint infrastructure or an in-cluster receiver
- Manual wiring of webhook URLs, HMAC secrets, and field mappings
- Giving up type safety: arbitrary JSON payloads instead of native Task data
A native taskCompletions source avoids this overhead: it needs no webhook endpoints, HMAC secrets, or field mappings, and it works directly with native Task data.
3. Kelos's own self-development demonstrates the need
Kelos uses 7+ separate TaskSpawners (workers, reviewer, PR responder, config updater, etc.) that operate independently. Today, coordination between them relies on GitHub as an intermediary: a worker creates a PR, and a reviewer discovers the PR via the githubPullRequests source. This only works because GitHub is the shared state. For non-GitHub workflows (Jira, cron, future sources), there is no shared intermediary.
Proposed API
Add a TaskCompletions field to the When struct in api/v1alpha1/taskspawner_types.go:
// TaskCompletions discovers completed tasks from other spawners as work items.
type TaskCompletions struct {
	// SpawnerSelector identifies which spawners' tasks to watch.
	// When empty, watches all tasks in the namespace regardless of spawner.
	// +optional
	SpawnerSelector *TaskCompletionsSelector `json:"spawnerSelector,omitempty"`

	// Phases specifies which terminal phases to discover.
	// Defaults to ["Succeeded"].
	// +kubebuilder:validation:Items:Enum=Succeeded;Failed
	// +kubebuilder:default={"Succeeded"}
	// +optional
	Phases []TaskPhase `json:"phases,omitempty"`

	// RequiredResults filters to only tasks that have all specified keys
	// in their status.results map. Useful for gating on tasks that
	// produced specific outputs (e.g., a PR URL).
	// +optional
	RequiredResults []string `json:"requiredResults,omitempty"`

	// LabelSelector filters tasks by Kubernetes labels.
	// +optional
	LabelSelector map[string]string `json:"labelSelector,omitempty"`

	// PollInterval overrides spec.pollInterval for this source.
	// +optional
	PollInterval string `json:"pollInterval,omitempty"`
}

// TaskCompletionsSelector identifies task sources.
type TaskCompletionsSelector struct {
	// Names lists specific TaskSpawner names whose tasks are watched.
	// +optional
	Names []string `json:"names,omitempty"`
}
Added to When:
type When struct {
	// ... existing fields ...

	// TaskCompletions discovers completed tasks from other spawners.
	// +optional
	TaskCompletions *TaskCompletions `json:"taskCompletions,omitempty"`
}
Template Variables
The taskCompletions source maps completed Task data to WorkItem fields:
| Variable | Source | Example |
|---|---|---|
| `{{.ID}}` | Task name | "code-worker-42" |
| `{{.Title}}` | Task prompt (first 100 chars) | "Fix the login bug..." |
| `{{.Body}}` | Full task prompt | Full text |
| `{{.Kind}}` | Fixed string | "TaskCompletion" |
| `{{.Labels}}` | Task labels (comma-separated) | "kelos.dev/taskspawner=code-worker" |
| `{{.URL}}` | First PR URL from outputs (if any) | "https://github.com/org/repo/pull/123" |
| `{{.Branch}}` | Branch from results | "kelos-task-42" |
| `{{.Results}}` | Full results map (new field on WorkItem) | {"branch":"...", "pr":"...", "cost-usd":"0.12"} |
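The mapping in the table can be sketched in Go as follows. This is a minimal illustration, not the actual Kelos code: `Task` and `WorkItem` here are trimmed-down stand-in types, `toWorkItem`, `truncate`, `joinLabels`, and `renderPrompt` are hypothetical helpers, and it assumes the PR URL and branch are stored under the `pr` and `branch` keys of the results map and that labels are rendered as a single comma-separated string.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
	"text/template"
)

// Task is a hypothetical, trimmed-down stand-in for the Kelos Task resource.
type Task struct {
	Name    string
	Prompt  string
	Labels  map[string]string
	Results map[string]string // mirrors status.results
}

// WorkItem is a stand-in for the spawner's WorkItem type.
// Results is the new field proposed in the table above.
type WorkItem struct {
	ID, Title, Body, Kind, Labels, URL, Branch string
	Results                                    map[string]string
}

// truncate returns at most n runes of s.
func truncate(s string, n int) string {
	r := []rune(s)
	if len(r) <= n {
		return s
	}
	return string(r[:n])
}

// joinLabels renders labels as sorted, comma-separated key=value pairs.
func joinLabels(labels map[string]string) string {
	pairs := make([]string, 0, len(labels))
	for k, v := range labels {
		pairs = append(pairs, k+"="+v)
	}
	sort.Strings(pairs)
	return strings.Join(pairs, ",")
}

// toWorkItem maps a completed Task to the template variables listed above.
// Assumption: the PR URL and branch live under the "pr" and "branch" result keys.
func toWorkItem(t Task) WorkItem {
	return WorkItem{
		ID:      t.Name,
		Title:   truncate(t.Prompt, 100),
		Body:    t.Prompt,
		Kind:    "TaskCompletion",
		Labels:  joinLabels(t.Labels),
		URL:     t.Results["pr"],
		Branch:  t.Results["branch"],
		Results: t.Results,
	}
}

// renderPrompt expands a promptTemplate string against a WorkItem.
func renderPrompt(tmpl string, item WorkItem) (string, error) {
	t, err := template.New("prompt").Parse(tmpl)
	if err != nil {
		return "", err
	}
	var sb strings.Builder
	if err := t.Execute(&sb, item); err != nil {
		return "", err
	}
	return sb.String(), nil
}

func main() {
	item := toWorkItem(Task{
		Name:    "code-worker-42",
		Prompt:  "Fix the login bug",
		Labels:  map[string]string{"kelos.dev/taskspawner": "code-worker"},
		Results: map[string]string{"pr": "https://github.com/org/repo/pull/123", "branch": "kelos-task-42"},
	})
	out, err := renderPrompt("Review branch {{.Branch}} for the PR at {{.URL}}.", item)
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
}
```

Sorting the label pairs keeps the rendered `{{.Labels}}` value stable across poll cycles, which matters if the rendered prompt is ever compared or hashed.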
Example: Security Review Pipeline
# Step 1: Workers pick up issues and create PRs
apiVersion: kelos.dev/v1alpha1
kind: TaskSpawner
metadata:
  name: code-worker
spec:
  when:
    githubIssues:
      labels: ["agent"]
      commentPolicy:
        triggerComment: "/kelos pick-up"
  taskTemplate:
    type: claude-code
    workspaceRef:
      name: my-repo
    credentials:
      type: oauth
      secretRef:
        name: claude-creds
    branch: "kelos-task-{{.Number}}"
    promptTemplate: "Fix issue #{{.Number}}: {{.Title}}\n{{.Body}}"
    ttlSecondsAfterFinished: 3600
---
# Step 2: Security reviewer reacts to successful worker tasks that produced PRs
apiVersion: kelos.dev/v1alpha1
kind: TaskSpawner
metadata:
  name: security-reviewer
spec:
  when:
    taskCompletions:
      spawnerSelector:
        names: ["code-worker"]
      phases: ["Succeeded"]
      requiredResults: ["pr"]
  maxConcurrency: 2
  taskTemplate:
    type: claude-code
    workspaceRef:
      name: my-repo
    credentials:
      type: oauth
      secretRef:
        name: claude-creds
    branch: "{{.Branch}}"
    promptTemplate: |
      You are a security reviewer. Review the changes on branch {{.Branch}}
      for the PR at {{.URL}}.
      Check for: SQL injection, XSS, command injection, secrets in code,
      and OWASP top-10 vulnerabilities. Post your findings as a PR review.
    ttlSecondsAfterFinished: 1800
Example: Failure Diagnostician
apiVersion: kelos.dev/v1alpha1
kind: TaskSpawner
metadata:
  name: failure-diagnostician
spec:
  when:
    taskCompletions:
      phases: ["Failed"]
      labelSelector:
        team: backend
  maxConcurrency: 1
  taskTemplate:
    type: claude-code
    workspaceRef:
      name: my-repo
    credentials:
      type: oauth
      secretRef:
        name: claude-creds
    promptTemplate: |
      A Kelos agent task failed. Investigate the failure and file a diagnostic report.
      Failed task: {{.ID}}
      Original prompt (truncated): {{.Title}}
      Task labels: {{.Labels}}
      Check the git state, review recent changes, and determine:
      1. What went wrong
      2. Whether this is a flaky issue or a real problem
      3. Suggested remediation steps
      Create a GitHub issue with your findings.
Implementation Path
Phase 1: Source implementation (minimal, no CRD change needed for prototype)
The Source interface (internal/source/source.go:37-39) requires only Discover(ctx) ([]WorkItem, error). A TaskCompletionsSource would:
- Accept a Kubernetes client (unlike other sources that use HTTP clients)
- List Tasks matching the selector criteria:
  var taskList kelosv1alpha1.TaskList
  if err := cl.List(ctx, &taskList,
      client.InNamespace(namespace),
      client.MatchingLabels(selector),
  ); err != nil {
      return nil, err
  }
- Filter by phase and required results
- Map each matching Task to a WorkItem
The spawner already has a Kubernetes client (cmd/kelos-spawner/main.go:85-89) and already lists tasks in its cycle (main.go:254-260). The buildSource function (main.go:590-681) would add a new branch for TaskCompletions.
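The filtering step of the Phase 1 source could look roughly like the sketch below. It operates on an already-listed slice of tasks so that it runs without a cluster. `Task` and `Filter` are hypothetical stand-ins for the real API types, `matches` and `discover` are illustrative names, and using the `kelos.dev/taskspawner` label to identify the owning spawner is an assumption based on the label shown in the template variables table.

```go
package main

import "fmt"

// Task is a hypothetical, trimmed-down stand-in for the Kelos Task resource.
type Task struct {
	Name    string
	Phase   string
	Labels  map[string]string
	Results map[string]string
}

// Filter mirrors the fields of the proposed TaskCompletions struct.
type Filter struct {
	SpawnerNames    []string
	Phases          []string
	RequiredResults []string
	LabelSelector   map[string]string
}

// spawnerLabel is assumed to record which TaskSpawner created a Task.
const spawnerLabel = "kelos.dev/taskspawner"

func contains(list []string, s string) bool {
	for _, v := range list {
		if v == s {
			return true
		}
	}
	return false
}

// matches reports whether a task passes every configured filter.
func matches(t Task, f Filter) bool {
	phases := f.Phases
	if len(phases) == 0 {
		phases = []string{"Succeeded"} // proposed default
	}
	if !contains(phases, t.Phase) {
		return false
	}
	// An empty spawner selector watches all tasks in the namespace.
	if len(f.SpawnerNames) > 0 && !contains(f.SpawnerNames, t.Labels[spawnerLabel]) {
		return false
	}
	for k, v := range f.LabelSelector {
		if t.Labels[k] != v {
			return false
		}
	}
	// Every required key must be present in the results map.
	for _, key := range f.RequiredResults {
		if _, ok := t.Results[key]; !ok {
			return false
		}
	}
	return true
}

// discover returns the names of tasks that should become work items.
func discover(tasks []Task, f Filter) []string {
	var ids []string
	for _, t := range tasks {
		if matches(t, f) {
			ids = append(ids, t.Name)
		}
	}
	return ids
}

func main() {
	tasks := []Task{
		{Name: "code-worker-1", Phase: "Succeeded",
			Labels:  map[string]string{spawnerLabel: "code-worker"},
			Results: map[string]string{"pr": "https://github.com/org/repo/pull/123"}},
		{Name: "code-worker-2", Phase: "Failed",
			Labels: map[string]string{spawnerLabel: "code-worker"}},
	}
	f := Filter{SpawnerNames: []string{"code-worker"}, RequiredResults: []string{"pr"}}
	fmt.Println(discover(tasks, f))
}
```

In a real source, the label selector would more likely be pushed into the List call (as in the snippet above) so the API server does that part of the filtering. The phase and required-results checks would still run client-side.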
Phase 2: CRD extension
Add TaskCompletions to the When struct. This is additive and backward-compatible: existing spawners are unaffected.
Phase 3: Deduplication
The existing spawner dedup logic (taskName = fmt.Sprintf("%s-%s", ts.Name, item.ID) at main.go:274) already prevents duplicate tasks per work item. For taskCompletions, the item ID would be the source task's name, so a completed task only triggers one downstream task.
Retriggering would work naturally: if a source task is deleted (TTL cleanup) and re-created (retrigger), the new completion produces a new item ID via the new task name.
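The naming-based dedup described above can be illustrated with a small sketch. The name format is the one quoted from main.go. `downstreamTaskName` and `newTaskNames` are hypothetical helpers, and the `existing` set stands in for the tasks the spawner already lists each cycle.

```go
package main

import "fmt"

// downstreamTaskName applies the spawner's existing naming rule.
func downstreamTaskName(spawnerName, itemID string) string {
	return fmt.Sprintf("%s-%s", spawnerName, itemID)
}

// newTaskNames returns the task names that still need to be created,
// skipping any name already present in existing.
func newTaskNames(spawnerName string, itemIDs []string, existing map[string]bool) []string {
	var out []string
	for _, id := range itemIDs {
		name := downstreamTaskName(spawnerName, id)
		if existing[name] {
			continue // this completion already triggered a downstream task
		}
		existing[name] = true
		out = append(out, name)
	}
	return out
}

func main() {
	existing := map[string]bool{"security-reviewer-code-worker-41": true}
	completed := []string{"code-worker-41", "code-worker-42"}
	fmt.Println(newTaskNames("security-reviewer", completed, existing))
}
```

Under this rule, a source task that is re-created with the same name maps to the same downstream name, so in this sketch it would only trigger again once the earlier downstream task no longer exists.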
Phase 4: Deployment model
Unlike GitHub/Jira sources that need external API access, taskCompletions only needs cluster-internal access. The spawner Deployment already has RBAC to list Tasks (internal/controller/taskspawner_controller.go sets up ServiceAccount + RoleBinding). No additional secrets, tokens, or sidecars are needed.
Why This Matters
This is the missing "reactive glue" between spawners. Today Kelos can fan-out (one spawner, many tasks) but not fan-in or chain (spawner A's outputs feed spawner B's inputs). With taskCompletions:
- Composable workflows: Teams can assemble complex multi-agent pipelines by connecting independent spawners, each owned by different teams
- Cross-concern automation: A security team deploys a "security-reviewer" spawner that automatically reviews every PR any agent creates, without modifying the original spawners
- Zero infrastructure: No webhooks, no message queues, no external services; purely Kubernetes-native, using the same API the spawner already accesses
- Incremental adoption: Existing spawners continue working unchanged; taskCompletions spawners are added alongside them
/kind feature