Skip to content

Commit 3ed3a9a

Browse files
gjkim42 and claude committed
Decouple Task activity from source discovery in spawner
Remove the Task watch from the main spawner reconciler that triggered full source discovery (GitHub/Jira API calls) on every Task phase change, deletion timestamp change, or delete event. Add a separate lightweight task-activity controller that watches Task events and only recomputes status.activeTasks and runs per-Task GitHub reporting when enabled, without calling source discovery. The next GitHub/Jira item is only considered on the next scheduled poll. This eliminates 100% of GitHub/Jira requests caused by Task-driven rediscovery, which could amount to thousands of unnecessary API calls per hour in busy environments. Co-Authored-By: Claude Opus 4.6 <[email protected]>
1 parent db6028c commit 3ed3a9a

3 files changed

Lines changed: 323 additions & 15 deletions

File tree

cmd/kelos-spawner/main.go

Lines changed: 84 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,16 @@ func main() {
136136
Key: key,
137137
Config: cfgArgs,
138138
}).SetupWithManager(mgr); err != nil {
139-
log.Error(err, "Unable to create controller")
139+
log.Error(err, "Unable to create spawner controller")
140+
os.Exit(1)
141+
}
142+
143+
if err := (&taskActivityReconciler{
144+
Client: cl,
145+
Key: key,
146+
Config: cfgArgs,
147+
}).SetupWithManager(mgr); err != nil {
148+
log.Error(err, "Unable to create task activity controller")
140149
os.Exit(1)
141150
}
142151

@@ -168,6 +177,80 @@ func runReportingCycle(ctx context.Context, cl client.Client, key types.Namespac
168177
return nil
169178
}
170179

180+
// handleTaskActivity is the lightweight reconcile path triggered by Task
181+
// phase changes, deletion timestamp changes, and deletes. It recomputes
182+
// status.activeTasks on the owning TaskSpawner and runs per-Task GitHub
183+
// reporting when enabled, without calling source discovery.
184+
func handleTaskActivity(ctx context.Context, cl client.Client, key types.NamespacedName, cfg spawnerRuntimeConfig) error {
185+
log := ctrl.Log.WithName("spawner")
186+
187+
var ts kelosv1alpha1.TaskSpawner
188+
if err := cl.Get(ctx, key, &ts); err != nil {
189+
if apierrors.IsNotFound(err) {
190+
return nil
191+
}
192+
return fmt.Errorf("fetching TaskSpawner for task activity: %w", err)
193+
}
194+
195+
// List all Tasks owned by this TaskSpawner to recompute activeTasks.
196+
var taskList kelosv1alpha1.TaskList
197+
if err := cl.List(ctx, &taskList,
198+
client.InNamespace(key.Namespace),
199+
client.MatchingLabels{"kelos.dev/taskspawner": key.Name},
200+
); err != nil {
201+
return fmt.Errorf("listing tasks for activity update: %w", err)
202+
}
203+
204+
activeTasks := 0
205+
for i := range taskList.Items {
206+
t := &taskList.Items[i]
207+
if t.Status.Phase != kelosv1alpha1.TaskPhaseSucceeded && t.Status.Phase != kelosv1alpha1.TaskPhaseFailed {
208+
activeTasks++
209+
}
210+
}
211+
212+
// Update status.activeTasks if it changed.
213+
if ts.Status.ActiveTasks != activeTasks {
214+
// Re-fetch for latest resource version before status update.
215+
if err := cl.Get(ctx, key, &ts); err != nil {
216+
return fmt.Errorf("re-fetching TaskSpawner for active tasks update: %w", err)
217+
}
218+
ts.Status.ActiveTasks = activeTasks
219+
if err := cl.Status().Update(ctx, &ts); err != nil {
220+
return fmt.Errorf("updating TaskSpawner activeTasks: %w", err)
221+
}
222+
log.Info("Updated active tasks count", "activeTasks", activeTasks)
223+
}
224+
225+
// Run per-Task reporting when enabled.
226+
if reportingEnabled(&ts) {
227+
token, err := readGitHubToken(cfg.GitHubTokenFile)
228+
if err != nil {
229+
return fmt.Errorf("reading GitHub token for task activity reporting: %w", err)
230+
}
231+
232+
reporter := &reporting.TaskReporter{
233+
Client: cl,
234+
Reporter: &reporting.GitHubReporter{
235+
Owner: cfg.GitHubOwner,
236+
Repo: cfg.GitHubRepo,
237+
Token: token,
238+
TokenFile: cfg.GitHubTokenFile,
239+
BaseURL: cfg.GitHubAPIBaseURL,
240+
Client: cfg.HTTPClient,
241+
},
242+
}
243+
244+
for i := range taskList.Items {
245+
if err := reporter.ReportTaskStatus(ctx, &taskList.Items[i]); err != nil {
246+
log.Error(err, "Reporting task status during activity update", "task", taskList.Items[i].Name)
247+
}
248+
}
249+
}
250+
251+
return nil
252+
}
253+
171254
func runCycle(ctx context.Context, cl client.Client, key types.NamespacedName, githubOwner, githubRepo, githubAPIBaseURL, githubTokenFile, jiraBaseURL, jiraProject, jiraJQL string, httpClient *http.Client) error {
172255
start := time.Now()
173256
err := runCycleCore(ctx, cl, key, githubOwner, githubRepo, githubAPIBaseURL, githubTokenFile, jiraBaseURL, jiraProject, jiraJQL, httpClient)

cmd/kelos-spawner/main_test.go

Lines changed: 211 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2246,9 +2246,9 @@ func TestSpawnerReconcilerTaskSpawnerPredicate(t *testing.T) {
22462246
}
22472247
}
22482248

2249-
func TestSpawnerReconcilerTaskPredicate(t *testing.T) {
2249+
func TestTaskActivityReconcilerTaskPredicate(t *testing.T) {
22502250
key := types.NamespacedName{Name: "spawner", Namespace: "default"}
2251-
r := &spawnerReconciler{Key: key}
2251+
r := &taskActivityReconciler{Key: key}
22522252
p := r.taskPredicate()
22532253

22542254
base := newTask("spawner-1", "default", "spawner", kelosv1alpha1.TaskPhasePending)
@@ -2279,9 +2279,9 @@ func TestSpawnerReconcilerTaskPredicate(t *testing.T) {
22792279
}
22802280
}
22812281

2282-
func TestSpawnerReconcilerRequestsForTask(t *testing.T) {
2282+
func TestTaskActivityReconcilerRequestsForTask(t *testing.T) {
22832283
key := types.NamespacedName{Name: "spawner", Namespace: "default"}
2284-
r := &spawnerReconciler{Key: key}
2284+
r := &taskActivityReconciler{Key: key}
22852285

22862286
task := newTask("spawner-1", "default", "spawner", kelosv1alpha1.TaskPhasePending)
22872287
requests := r.requestsForTask(context.Background(), task.DeepCopy())
@@ -2293,9 +2293,9 @@ func TestSpawnerReconcilerRequestsForTask(t *testing.T) {
22932293
}
22942294

22952295
other := newTask("other-1", "default", "other", kelosv1alpha1.TaskPhasePending)
2296-
requests = r.requestsForTask(context.Background(), other.DeepCopy())
2297-
if len(requests) != 0 {
2298-
t.Fatalf("Expected no requests for non-matching task, got %d", len(requests))
2296+
otherRequests := r.requestsForTask(context.Background(), other.DeepCopy())
2297+
if len(otherRequests) != 0 {
2298+
t.Fatalf("Expected no requests for non-matching task, got %d", len(otherRequests))
22992299
}
23002300
}
23012301

@@ -2415,3 +2415,207 @@ func TestRunOnce_ReturnsSourcePollInterval(t *testing.T) {
24152415
t.Fatalf("Interval = %v, want %v", interval, 15*time.Second)
24162416
}
24172417
}
2418+
2419+
func TestHandleTaskActivity_UpdatesActiveTasksCount(t *testing.T) {
2420+
ts := newTaskSpawner("spawner", "default", nil)
2421+
ts.Status.ActiveTasks = 3 // stale value
2422+
2423+
existingTasks := []kelosv1alpha1.Task{
2424+
newTask("spawner-1", "default", "spawner", kelosv1alpha1.TaskPhaseRunning),
2425+
newTask("spawner-2", "default", "spawner", kelosv1alpha1.TaskPhaseSucceeded),
2426+
newTask("spawner-3", "default", "spawner", kelosv1alpha1.TaskPhasePending),
2427+
}
2428+
cl, key := setupTest(t, ts, existingTasks...)
2429+
2430+
if err := handleTaskActivity(context.Background(), cl, key, spawnerRuntimeConfig{}); err != nil {
2431+
t.Fatalf("Unexpected error: %v", err)
2432+
}
2433+
2434+
var updated kelosv1alpha1.TaskSpawner
2435+
if err := cl.Get(context.Background(), key, &updated); err != nil {
2436+
t.Fatalf("Getting TaskSpawner: %v", err)
2437+
}
2438+
// 1 running + 1 pending = 2 active (succeeded is excluded)
2439+
if updated.Status.ActiveTasks != 2 {
2440+
t.Errorf("Expected activeTasks=2, got %d", updated.Status.ActiveTasks)
2441+
}
2442+
}
2443+
2444+
func TestHandleTaskActivity_NoUpdateWhenCountUnchanged(t *testing.T) {
2445+
ts := newTaskSpawner("spawner", "default", nil)
2446+
ts.Status.ActiveTasks = 1
2447+
2448+
task1 := newTask("spawner-1", "default", "spawner", kelosv1alpha1.TaskPhaseRunning)
2449+
task2 := newTask("spawner-2", "default", "spawner", kelosv1alpha1.TaskPhaseSucceeded)
2450+
2451+
// Track status updates via interceptor
2452+
updateCalled := false
2453+
cl := fake.NewClientBuilder().
2454+
WithScheme(newTestScheme()).
2455+
WithObjects(ts, &task1, &task2).
2456+
WithStatusSubresource(ts).
2457+
WithInterceptorFuncs(interceptor.Funcs{
2458+
SubResourceUpdate: func(ctx context.Context, c client.Client, subResourceName string, obj client.Object, opts ...client.SubResourceUpdateOption) error {
2459+
if subResourceName == "status" {
2460+
if _, ok := obj.(*kelosv1alpha1.TaskSpawner); ok {
2461+
updateCalled = true
2462+
}
2463+
}
2464+
return nil
2465+
},
2466+
}).
2467+
Build()
2468+
key := types.NamespacedName{Name: ts.Name, Namespace: ts.Namespace}
2469+
2470+
if err := handleTaskActivity(context.Background(), cl, key, spawnerRuntimeConfig{}); err != nil {
2471+
t.Fatalf("Unexpected error: %v", err)
2472+
}
2473+
2474+
if updateCalled {
2475+
t.Error("Expected no status update when activeTasks count is unchanged")
2476+
}
2477+
}
2478+
2479+
func TestHandleTaskActivity_RunsReportingWhenEnabled(t *testing.T) {
2480+
ts := newTaskSpawner("spawner", "default", nil)
2481+
ts.Spec.When.GitHubIssues.Reporting = &kelosv1alpha1.GitHubReporting{Enabled: true}
2482+
2483+
task := kelosv1alpha1.Task{
2484+
ObjectMeta: metav1.ObjectMeta{
2485+
Name: "spawner-1",
2486+
Namespace: "default",
2487+
Labels: map[string]string{
2488+
"kelos.dev/taskspawner": "spawner",
2489+
},
2490+
Annotations: map[string]string{
2491+
reporting.AnnotationGitHubReporting: "enabled",
2492+
reporting.AnnotationSourceNumber: "42",
2493+
reporting.AnnotationSourceKind: "issue",
2494+
},
2495+
},
2496+
Spec: kelosv1alpha1.TaskSpec{
2497+
Type: "claude-code",
2498+
Prompt: "test",
2499+
Credentials: kelosv1alpha1.Credentials{
2500+
Type: kelosv1alpha1.CredentialTypeOAuth,
2501+
SecretRef: &kelosv1alpha1.SecretReference{Name: "creds"},
2502+
},
2503+
},
2504+
Status: kelosv1alpha1.TaskStatus{
2505+
Phase: kelosv1alpha1.TaskPhasePending,
2506+
},
2507+
}
2508+
2509+
cl, key := setupTest(t, ts, task)
2510+
2511+
apiCalled := false
2512+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
2513+
apiCalled = true
2514+
w.WriteHeader(http.StatusCreated)
2515+
json.NewEncoder(w).Encode(map[string]int64{"id": 999})
2516+
}))
2517+
defer server.Close()
2518+
2519+
cfg := spawnerRuntimeConfig{
2520+
GitHubOwner: "owner",
2521+
GitHubRepo: "repo",
2522+
GitHubAPIBaseURL: server.URL,
2523+
}
2524+
2525+
if err := handleTaskActivity(context.Background(), cl, key, cfg); err != nil {
2526+
t.Fatalf("Unexpected error: %v", err)
2527+
}
2528+
2529+
if !apiCalled {
2530+
t.Error("Expected GitHub API to be called for reporting")
2531+
}
2532+
2533+
// Verify annotations were updated on the task
2534+
var updated kelosv1alpha1.Task
2535+
if err := cl.Get(context.Background(), client.ObjectKeyFromObject(&task), &updated); err != nil {
2536+
t.Fatalf("Getting updated task: %v", err)
2537+
}
2538+
if updated.Annotations[reporting.AnnotationGitHubReportPhase] != "accepted" {
2539+
t.Errorf("Expected report phase 'accepted', got %q", updated.Annotations[reporting.AnnotationGitHubReportPhase])
2540+
}
2541+
}
2542+
2543+
func TestHandleTaskActivity_SkipsReportingWhenDisabled(t *testing.T) {
2544+
ts := newTaskSpawner("spawner", "default", nil)
2545+
// Reporting not enabled (default)
2546+
2547+
task := newTask("spawner-1", "default", "spawner", kelosv1alpha1.TaskPhaseRunning)
2548+
cl, key := setupTest(t, ts, task)
2549+
2550+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
2551+
t.Error("GitHub API should not be called when reporting is disabled")
2552+
w.WriteHeader(http.StatusOK)
2553+
}))
2554+
defer server.Close()
2555+
2556+
cfg := spawnerRuntimeConfig{
2557+
GitHubOwner: "owner",
2558+
GitHubRepo: "repo",
2559+
GitHubAPIBaseURL: server.URL,
2560+
}
2561+
2562+
if err := handleTaskActivity(context.Background(), cl, key, cfg); err != nil {
2563+
t.Fatalf("Unexpected error: %v", err)
2564+
}
2565+
}
2566+
2567+
func TestHandleTaskActivity_DoesNotTriggerDiscovery(t *testing.T) {
2568+
ts := newTaskSpawner("spawner", "default", nil)
2569+
existingTasks := []kelosv1alpha1.Task{
2570+
newTask("spawner-1", "default", "spawner", kelosv1alpha1.TaskPhaseSucceeded),
2571+
}
2572+
cl, key := setupTest(t, ts, existingTasks...)
2573+
2574+
// Record discovery metric before
2575+
beforeDiscovery := testutil.ToFloat64(discoveryTotal)
2576+
2577+
if err := handleTaskActivity(context.Background(), cl, key, spawnerRuntimeConfig{}); err != nil {
2578+
t.Fatalf("Unexpected error: %v", err)
2579+
}
2580+
2581+
// Discovery metric should not have changed
2582+
afterDiscovery := testutil.ToFloat64(discoveryTotal)
2583+
if afterDiscovery != beforeDiscovery {
2584+
t.Errorf("Expected no discovery cycles to run, but discoveryTotal changed from %v to %v", beforeDiscovery, afterDiscovery)
2585+
}
2586+
}
2587+
2588+
func TestHandleTaskActivity_TaskSpawnerNotFound(t *testing.T) {
2589+
// TaskSpawner does not exist - should return nil (no error)
2590+
cl := fake.NewClientBuilder().
2591+
WithScheme(newTestScheme()).
2592+
Build()
2593+
key := types.NamespacedName{Name: "nonexistent", Namespace: "default"}
2594+
2595+
if err := handleTaskActivity(context.Background(), cl, key, spawnerRuntimeConfig{}); err != nil {
2596+
t.Fatalf("Expected no error for missing TaskSpawner, got: %v", err)
2597+
}
2598+
}
2599+
2600+
func TestHandleTaskActivity_AllTasksTerminal(t *testing.T) {
2601+
ts := newTaskSpawner("spawner", "default", nil)
2602+
ts.Status.ActiveTasks = 2 // stale value
2603+
2604+
existingTasks := []kelosv1alpha1.Task{
2605+
newTask("spawner-1", "default", "spawner", kelosv1alpha1.TaskPhaseSucceeded),
2606+
newTask("spawner-2", "default", "spawner", kelosv1alpha1.TaskPhaseFailed),
2607+
}
2608+
cl, key := setupTest(t, ts, existingTasks...)
2609+
2610+
if err := handleTaskActivity(context.Background(), cl, key, spawnerRuntimeConfig{}); err != nil {
2611+
t.Fatalf("Unexpected error: %v", err)
2612+
}
2613+
2614+
var updated kelosv1alpha1.TaskSpawner
2615+
if err := cl.Get(context.Background(), key, &updated); err != nil {
2616+
t.Fatalf("Getting TaskSpawner: %v", err)
2617+
}
2618+
if updated.Status.ActiveTasks != 0 {
2619+
t.Errorf("Expected activeTasks=0 when all tasks are terminal, got %d", updated.Status.ActiveTasks)
2620+
}
2621+
}

cmd/kelos-spawner/reconciler.go

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,6 @@ func (r *spawnerReconciler) SetupWithManager(mgr ctrl.Manager) error {
5959
Named("taskspawner-loop").
6060
WithOptions(controller.Options{MaxConcurrentReconciles: 1}).
6161
For(&kelosv1alpha1.TaskSpawner{}, builder.WithPredicates(r.taskSpawnerPredicate())).
62-
Watches(
63-
&kelosv1alpha1.Task{},
64-
handler.EnqueueRequestsFromMapFunc(r.requestsForTask),
65-
builder.WithPredicates(r.taskPredicate()),
66-
).
6762
Complete(r)
6863
}
6964

@@ -121,7 +116,33 @@ func resolvedPollInterval(ts *kelosv1alpha1.TaskSpawner) time.Duration {
121116
return parsePollInterval(ts.Spec.PollInterval)
122117
}
123118

124-
func (r *spawnerReconciler) requestsForTask(_ context.Context, obj client.Object) []reconcile.Request {
119+
// taskActivityReconciler handles Task phase changes, deletion timestamp
120+
// changes, and deletes for the single managed TaskSpawner. It recomputes
121+
// status.activeTasks and runs per-Task GitHub reporting without triggering
122+
// source discovery.
123+
type taskActivityReconciler struct {
124+
client.Client
125+
Key types.NamespacedName
126+
Config spawnerRuntimeConfig
127+
}
128+
129+
func (r *taskActivityReconciler) Reconcile(ctx context.Context, _ ctrl.Request) (ctrl.Result, error) {
130+
return ctrl.Result{}, handleTaskActivity(ctx, r.Client, r.Key, r.Config)
131+
}
132+
133+
func (r *taskActivityReconciler) SetupWithManager(mgr ctrl.Manager) error {
134+
return ctrl.NewControllerManagedBy(mgr).
135+
Named("task-activity").
136+
WithOptions(controller.Options{MaxConcurrentReconciles: 1}).
137+
Watches(
138+
&kelosv1alpha1.Task{},
139+
handler.EnqueueRequestsFromMapFunc(r.requestsForTask),
140+
builder.WithPredicates(r.taskPredicate()),
141+
).
142+
Complete(r)
143+
}
144+
145+
func (r *taskActivityReconciler) requestsForTask(_ context.Context, obj client.Object) []reconcile.Request {
125146
if !matchesSpawnerTask(obj, r.Key) {
126147
return nil
127148
}
@@ -148,7 +169,7 @@ func (r *spawnerReconciler) taskSpawnerPredicate() predicate.Predicate {
148169
}
149170
}
150171

151-
func (r *spawnerReconciler) taskPredicate() predicate.Predicate {
172+
func (r *taskActivityReconciler) taskPredicate() predicate.Predicate {
152173
return predicate.Funcs{
153174
CreateFunc: func(event.CreateEvent) bool {
154175
return false

0 commit comments

Comments
 (0)