diff --git a/cmd/kelos-spawner/main.go b/cmd/kelos-spawner/main.go index 0f88eb2b..3d413b2d 100644 --- a/cmd/kelos-spawner/main.go +++ b/cmd/kelos-spawner/main.go @@ -136,7 +136,16 @@ func main() { Key: key, Config: cfgArgs, }).SetupWithManager(mgr); err != nil { - log.Error(err, "Unable to create controller") + log.Error(err, "Unable to create spawner controller") + os.Exit(1) + } + + if err := (&taskActivityReconciler{ + Client: cl, + Key: key, + Config: cfgArgs, + }).SetupWithManager(mgr); err != nil { + log.Error(err, "Unable to create task activity controller") os.Exit(1) } @@ -146,25 +155,77 @@ func main() { } } -// runReportingCycle lists all Tasks owned by the given TaskSpawner and runs -// reporting for each one that has GitHub reporting enabled. Running this -// in the same goroutine as the discovery loop avoids races between Task -// creation/deletion and annotation patching. -func runReportingCycle(ctx context.Context, cl client.Client, key types.NamespacedName, reporter *reporting.TaskReporter) error { +// handleTaskActivity is the lightweight reconcile path triggered by Task +// phase changes, deletion timestamp changes, and deletes. It recomputes +// status.activeTasks on the owning TaskSpawner and runs per-Task GitHub +// reporting when enabled, without calling source discovery. +func handleTaskActivity(ctx context.Context, cl client.Client, key types.NamespacedName, cfg spawnerRuntimeConfig) error { + log := ctrl.Log.WithName("spawner") + + var ts kelosv1alpha1.TaskSpawner + if err := cl.Get(ctx, key, &ts); err != nil { + if apierrors.IsNotFound(err) { + return nil + } + return fmt.Errorf("fetching TaskSpawner for task activity: %w", err) + } + + // List all Tasks owned by this TaskSpawner to recompute activeTasks. var taskList kelosv1alpha1.TaskList if err := cl.List(ctx, &taskList, client.InNamespace(key.Namespace), client.MatchingLabels{"kelos.dev/taskspawner": key.Name}, ); err != nil { - return fmt.Errorf("listing tasks for reporting: %w", err) + return fmt.Errorf("listing tasks for activity update: %w", err) } + activeTasks := 0 for i := range taskList.Items { - if err := reporter.ReportTaskStatus(ctx, &taskList.Items[i]); err != nil { - ctrl.Log.WithName("spawner").Error(err, "Reporting task status", "task", taskList.Items[i].Name) - // Continue with remaining tasks rather than aborting the cycle + t := &taskList.Items[i] + if t.Status.Phase != kelosv1alpha1.TaskPhaseSucceeded && t.Status.Phase != kelosv1alpha1.TaskPhaseFailed { + activeTasks++ + } + } + + // Update status.activeTasks if it changed. + if ts.Status.ActiveTasks != activeTasks { + // Re-fetch for latest resource version before status update. + if err := cl.Get(ctx, key, &ts); err != nil { + return fmt.Errorf("re-fetching TaskSpawner for active tasks update: %w", err) + } + ts.Status.ActiveTasks = activeTasks + if err := cl.Status().Update(ctx, &ts); err != nil { + return fmt.Errorf("updating TaskSpawner activeTasks: %w", err) + } + log.Info("Updated active tasks count", "activeTasks", activeTasks) + } + + // Run per-Task reporting when enabled. + if reportingEnabled(&ts) { + token, err := readGitHubToken(cfg.GitHubTokenFile) + if err != nil { + return fmt.Errorf("reading GitHub token for task activity reporting: %w", err) + } + + reporter := &reporting.TaskReporter{ + Client: cl, + Reporter: &reporting.GitHubReporter{ + Owner: cfg.GitHubOwner, + Repo: cfg.GitHubRepo, + Token: token, + TokenFile: cfg.GitHubTokenFile, + BaseURL: cfg.GitHubAPIBaseURL, + Client: cfg.HTTPClient, + }, + } + + for i := range taskList.Items { + if err := reporter.ReportTaskStatus(ctx, &taskList.Items[i]); err != nil { + log.Error(err, "Reporting task status during activity update", "task", taskList.Items[i].Name) + } } } + return nil } diff --git a/cmd/kelos-spawner/main_test.go b/cmd/kelos-spawner/main_test.go index 1638249c..3f3334ef 100644 --- a/cmd/kelos-spawner/main_test.go +++ b/cmd/kelos-spawner/main_test.go @@ -2033,118 +2033,32 @@ func TestReportingEnabled_Jira(t *testing.T) { } } -func TestRunReportingCycle_ReportsForAnnotatedTasks(t *testing.T) { +func TestRunOnce_DoesNotCallReporting(t *testing.T) { ts := newTaskSpawner("spawner", "default", nil) + ts.Spec.Suspend = boolPtr(true) ts.Spec.When.GitHubIssues.Reporting = &kelosv1alpha1.GitHubReporting{Enabled: true} - // Create a task with reporting annotations and a Pending phase - task := kelosv1alpha1.Task{ - ObjectMeta: metav1.ObjectMeta{ - Name: "spawner-1", - Namespace: "default", - Labels: map[string]string{ - "kelos.dev/taskspawner": "spawner", - }, - Annotations: map[string]string{ - reporting.AnnotationGitHubReporting: "enabled", - reporting.AnnotationSourceNumber: "42", - reporting.AnnotationSourceKind: "issue", - }, - }, - Spec: kelosv1alpha1.TaskSpec{ - Type: "claude-code", - Prompt: "test", - Credentials: kelosv1alpha1.Credentials{ - Type: kelosv1alpha1.CredentialTypeOAuth, - SecretRef: &kelosv1alpha1.SecretReference{Name: "creds"}, - }, - }, - Status: kelosv1alpha1.TaskStatus{ - Phase: kelosv1alpha1.TaskPhasePending, - }, - } - - cl, key := setupTest(t, ts, task) - - // Set up a fake GitHub server - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusCreated) - json.NewEncoder(w).Encode(map[string]int64{"id": 999}) - })) - defer server.Close() - - reporter := &reporting.TaskReporter{ - Client: cl, - Reporter: &reporting.GitHubReporter{ - Owner: "owner", - Repo: "repo", - Token: "token", - BaseURL: server.URL, - }, - } - - if err := runReportingCycle(context.Background(), cl, key, reporter); err != nil { - t.Fatalf("Unexpected error: %v", err) - } - - // Verify annotations were updated - var updated kelosv1alpha1.Task - if err := cl.Get(context.Background(), client.ObjectKeyFromObject(&task), &updated); err != nil { - t.Fatalf("Getting updated task: %v", err) - } - if updated.Annotations[reporting.AnnotationGitHubReportPhase] != "accepted" { - t.Errorf("Expected report phase 'accepted', got %q", updated.Annotations[reporting.AnnotationGitHubReportPhase]) - } - if updated.Annotations[reporting.AnnotationGitHubCommentID] == "" { - t.Error("Expected comment ID to be set") - } -} - -func TestRunReportingCycle_SkipsTasksWithoutReporting(t *testing.T) { - ts := newTaskSpawner("spawner", "default", nil) - - // Task without reporting annotations - task := kelosv1alpha1.Task{ - ObjectMeta: metav1.ObjectMeta{ - Name: "spawner-1", - Namespace: "default", - Labels: map[string]string{ - "kelos.dev/taskspawner": "spawner", - }, - }, - Spec: kelosv1alpha1.TaskSpec{ - Type: "claude-code", - Prompt: "test", - Credentials: kelosv1alpha1.Credentials{ - Type: kelosv1alpha1.CredentialTypeOAuth, - SecretRef: &kelosv1alpha1.SecretReference{Name: "creds"}, - }, - }, - Status: kelosv1alpha1.TaskStatus{ - Phase: kelosv1alpha1.TaskPhasePending, - }, + task := newTask("spawner-1", "default", "spawner", kelosv1alpha1.TaskPhasePending) + task.Annotations = map[string]string{ + reporting.AnnotationGitHubReporting: "enabled", + reporting.AnnotationSourceNumber: "42", + reporting.AnnotationSourceKind: "issue", } cl, key := setupTest(t, ts, task) - // Server should never be called server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - t.Error("GitHub API should not be called for tasks without reporting") + t.Error("Poll cycle should not call GitHub reporting API; reporting is handled by task-activity controller") w.WriteHeader(http.StatusOK) })) defer server.Close() - reporter := &reporting.TaskReporter{ - Client: cl, - Reporter: &reporting.GitHubReporter{ - Owner: "owner", - Repo: "repo", - Token: "token", - BaseURL: server.URL, - }, - } - - if err := runReportingCycle(context.Background(), cl, key, reporter); err != nil { + _, err := runOnce(context.Background(), cl, key, spawnerRuntimeConfig{ + GitHubOwner: "owner", + GitHubRepo: "repo", + GitHubAPIBaseURL: server.URL, + }) + if err != nil { t.Fatalf("Unexpected error: %v", err) } } @@ -2173,11 +2087,10 @@ func TestRunOnce_ReturnsPollIntervalForSuspendedTaskSpawner(t *testing.T) { } } -func TestRunOnce_UsesEnvTokenForReporting(t *testing.T) { +func TestHandleTaskActivity_UsesEnvToken(t *testing.T) { t.Setenv("GITHUB_TOKEN", "pat-token") ts := newTaskSpawner("spawner", "default", nil) - ts.Spec.Suspend = boolPtr(true) ts.Spec.When.GitHubIssues.Reporting = &kelosv1alpha1.GitHubReporting{Enabled: true} task := newTask("spawner-1", "default", "spawner", kelosv1alpha1.TaskPhasePending) @@ -2197,7 +2110,7 @@ func TestRunOnce_UsesEnvTokenForReporting(t *testing.T) { })) defer server.Close() - _, err := runOnce(context.Background(), cl, key, spawnerRuntimeConfig{ + err := handleTaskActivity(context.Background(), cl, key, spawnerRuntimeConfig{ GitHubOwner: "owner", GitHubRepo: "repo", GitHubAPIBaseURL: server.URL, @@ -2246,9 +2159,9 @@ func TestSpawnerReconcilerTaskSpawnerPredicate(t *testing.T) { } } -func TestSpawnerReconcilerTaskPredicate(t *testing.T) { +func TestTaskActivityReconcilerTaskPredicate(t *testing.T) { key := types.NamespacedName{Name: "spawner", Namespace: "default"} - r := &spawnerReconciler{Key: key} + r := &taskActivityReconciler{Key: key} p := r.taskPredicate() base := newTask("spawner-1", "default", "spawner", kelosv1alpha1.TaskPhasePending) @@ -2279,9 +2192,9 @@ func TestSpawnerReconcilerTaskPredicate(t *testing.T) { } } -func TestSpawnerReconcilerRequestsForTask(t *testing.T) { +func TestTaskActivityReconcilerRequestsForTask(t *testing.T) { key := types.NamespacedName{Name: "spawner", Namespace: "default"} - r := &spawnerReconciler{Key: key} + r := &taskActivityReconciler{Key: key} task := newTask("spawner-1", "default", "spawner", kelosv1alpha1.TaskPhasePending) requests := r.requestsForTask(context.Background(), task.DeepCopy()) @@ -2293,9 +2206,9 @@ func TestSpawnerReconcilerRequestsForTask(t *testing.T) { } other := newTask("other-1", "default", "other", kelosv1alpha1.TaskPhasePending) - requests = r.requestsForTask(context.Background(), other.DeepCopy()) - if len(requests) != 0 { - t.Fatalf("Expected no requests for non-matching task, got %d", len(requests)) + otherRequests := r.requestsForTask(context.Background(), other.DeepCopy()) + if len(otherRequests) != 0 { + t.Fatalf("Expected no requests for non-matching task, got %d", len(otherRequests)) } } @@ -2415,3 +2328,207 @@ func TestRunOnce_ReturnsSourcePollInterval(t *testing.T) { t.Fatalf("Interval = %v, want %v", interval, 15*time.Second) } } + +func TestHandleTaskActivity_UpdatesActiveTasksCount(t *testing.T) { + ts := newTaskSpawner("spawner", "default", nil) + ts.Status.ActiveTasks = 3 // stale value + + existingTasks := []kelosv1alpha1.Task{ + newTask("spawner-1", "default", "spawner", kelosv1alpha1.TaskPhaseRunning), + newTask("spawner-2", "default", "spawner", kelosv1alpha1.TaskPhaseSucceeded), + newTask("spawner-3", "default", "spawner", kelosv1alpha1.TaskPhasePending), + } + cl, key := setupTest(t, ts, existingTasks...) + + if err := handleTaskActivity(context.Background(), cl, key, spawnerRuntimeConfig{}); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + var updated kelosv1alpha1.TaskSpawner + if err := cl.Get(context.Background(), key, &updated); err != nil { + t.Fatalf("Getting TaskSpawner: %v", err) + } + // 1 running + 1 pending = 2 active (succeeded is excluded) + if updated.Status.ActiveTasks != 2 { + t.Errorf("Expected activeTasks=2, got %d", updated.Status.ActiveTasks) + } +} + +func TestHandleTaskActivity_NoUpdateWhenCountUnchanged(t *testing.T) { + ts := newTaskSpawner("spawner", "default", nil) + ts.Status.ActiveTasks = 1 + + task1 := newTask("spawner-1", "default", "spawner", kelosv1alpha1.TaskPhaseRunning) + task2 := newTask("spawner-2", "default", "spawner", kelosv1alpha1.TaskPhaseSucceeded) + + // Track status updates via interceptor + updateCalled := false + cl := fake.NewClientBuilder(). + WithScheme(newTestScheme()). + WithObjects(ts, &task1, &task2). + WithStatusSubresource(ts). + WithInterceptorFuncs(interceptor.Funcs{ + SubResourceUpdate: func(ctx context.Context, c client.Client, subResourceName string, obj client.Object, opts ...client.SubResourceUpdateOption) error { + if subResourceName == "status" { + if _, ok := obj.(*kelosv1alpha1.TaskSpawner); ok { + updateCalled = true + } + } + return nil + }, + }). + Build() + key := types.NamespacedName{Name: ts.Name, Namespace: ts.Namespace} + + if err := handleTaskActivity(context.Background(), cl, key, spawnerRuntimeConfig{}); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + if updateCalled { + t.Error("Expected no status update when activeTasks count is unchanged") + } +} + +func TestHandleTaskActivity_RunsReportingWhenEnabled(t *testing.T) { + ts := newTaskSpawner("spawner", "default", nil) + ts.Spec.When.GitHubIssues.Reporting = &kelosv1alpha1.GitHubReporting{Enabled: true} + + task := kelosv1alpha1.Task{ + ObjectMeta: metav1.ObjectMeta{ + Name: "spawner-1", + Namespace: "default", + Labels: map[string]string{ + "kelos.dev/taskspawner": "spawner", + }, + Annotations: map[string]string{ + reporting.AnnotationGitHubReporting: "enabled", + reporting.AnnotationSourceNumber: "42", + reporting.AnnotationSourceKind: "issue", + }, + }, + Spec: kelosv1alpha1.TaskSpec{ + Type: "claude-code", + Prompt: "test", + Credentials: kelosv1alpha1.Credentials{ + Type: kelosv1alpha1.CredentialTypeOAuth, + SecretRef: &kelosv1alpha1.SecretReference{Name: "creds"}, + }, + }, + Status: kelosv1alpha1.TaskStatus{ + Phase: kelosv1alpha1.TaskPhasePending, + }, + } + + cl, key := setupTest(t, ts, task) + + apiCalled := false + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + apiCalled = true + w.WriteHeader(http.StatusCreated) + json.NewEncoder(w).Encode(map[string]int64{"id": 999}) + })) + defer server.Close() + + cfg := spawnerRuntimeConfig{ + GitHubOwner: "owner", + GitHubRepo: "repo", + GitHubAPIBaseURL: server.URL, + } + + if err := handleTaskActivity(context.Background(), cl, key, cfg); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + if !apiCalled { + t.Error("Expected GitHub API to be called for reporting") + } + + // Verify annotations were updated on the task + var updated kelosv1alpha1.Task + if err := cl.Get(context.Background(), client.ObjectKeyFromObject(&task), &updated); err != nil { + t.Fatalf("Getting updated task: %v", err) + } + if updated.Annotations[reporting.AnnotationGitHubReportPhase] != "accepted" { + t.Errorf("Expected report phase 'accepted', got %q", updated.Annotations[reporting.AnnotationGitHubReportPhase]) + } +} + +func TestHandleTaskActivity_SkipsReportingWhenDisabled(t *testing.T) { + ts := newTaskSpawner("spawner", "default", nil) + // Reporting not enabled (default) + + task := newTask("spawner-1", "default", "spawner", kelosv1alpha1.TaskPhaseRunning) + cl, key := setupTest(t, ts, task) + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + t.Error("GitHub API should not be called when reporting is disabled") + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + cfg := spawnerRuntimeConfig{ + GitHubOwner: "owner", + GitHubRepo: "repo", + GitHubAPIBaseURL: server.URL, + } + + if err := handleTaskActivity(context.Background(), cl, key, cfg); err != nil { + t.Fatalf("Unexpected error: %v", err) + } +} + +func TestHandleTaskActivity_DoesNotTriggerDiscovery(t *testing.T) { + ts := newTaskSpawner("spawner", "default", nil) + existingTasks := []kelosv1alpha1.Task{ + newTask("spawner-1", "default", "spawner", kelosv1alpha1.TaskPhaseSucceeded), + } + cl, key := setupTest(t, ts, existingTasks...) + + // Record discovery metric before + beforeDiscovery := testutil.ToFloat64(discoveryTotal) + + if err := handleTaskActivity(context.Background(), cl, key, spawnerRuntimeConfig{}); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Discovery metric should not have changed + afterDiscovery := testutil.ToFloat64(discoveryTotal) + if afterDiscovery != beforeDiscovery { + t.Errorf("Expected no discovery cycles to run, but discoveryTotal changed from %v to %v", beforeDiscovery, afterDiscovery) + } +} + +func TestHandleTaskActivity_TaskSpawnerNotFound(t *testing.T) { + // TaskSpawner does not exist - should return nil (no error) + cl := fake.NewClientBuilder(). + WithScheme(newTestScheme()). + Build() + key := types.NamespacedName{Name: "nonexistent", Namespace: "default"} + + if err := handleTaskActivity(context.Background(), cl, key, spawnerRuntimeConfig{}); err != nil { + t.Fatalf("Expected no error for missing TaskSpawner, got: %v", err) + } +} + +func TestHandleTaskActivity_AllTasksTerminal(t *testing.T) { + ts := newTaskSpawner("spawner", "default", nil) + ts.Status.ActiveTasks = 2 // stale value + + existingTasks := []kelosv1alpha1.Task{ + newTask("spawner-1", "default", "spawner", kelosv1alpha1.TaskPhaseSucceeded), + newTask("spawner-2", "default", "spawner", kelosv1alpha1.TaskPhaseFailed), + } + cl, key := setupTest(t, ts, existingTasks...) + + if err := handleTaskActivity(context.Background(), cl, key, spawnerRuntimeConfig{}); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + var updated kelosv1alpha1.TaskSpawner + if err := cl.Get(context.Background(), key, &updated); err != nil { + t.Fatalf("Getting TaskSpawner: %v", err) + } + if updated.Status.ActiveTasks != 0 { + t.Errorf("Expected activeTasks=0 when all tasks are terminal, got %d", updated.Status.ActiveTasks) + } +} diff --git a/cmd/kelos-spawner/reconciler.go b/cmd/kelos-spawner/reconciler.go index dc2eca4e..4a419c2e 100644 --- a/cmd/kelos-spawner/reconciler.go +++ b/cmd/kelos-spawner/reconciler.go @@ -18,7 +18,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" kelosv1alpha1 "github.com/kelos-dev/kelos/api/v1alpha1" - "github.com/kelos-dev/kelos/internal/reporting" ) type spawnerRuntimeConfig struct { @@ -59,11 +58,6 @@ func (r *spawnerReconciler) SetupWithManager(mgr ctrl.Manager) error { Named("taskspawner-loop"). WithOptions(controller.Options{MaxConcurrentReconciles: 1}). For(&kelosv1alpha1.TaskSpawner{}, builder.WithPredicates(r.taskSpawnerPredicate())). - Watches( - &kelosv1alpha1.Task{}, - handler.EnqueueRequestsFromMapFunc(r.requestsForTask), - builder.WithPredicates(r.taskPredicate()), - ). Complete(r) } @@ -77,27 +71,9 @@ func runOnce(ctx context.Context, cl client.Client, key types.NamespacedName, cf return 0, fmt.Errorf("fetching TaskSpawner after cycle: %w", err) } - if reportingEnabled(&ts) { - token, err := readGitHubToken(cfg.GitHubTokenFile) - if err != nil { - return 0, fmt.Errorf("reading GitHub token for reporting: %w", err) - } - - reporter := &reporting.TaskReporter{ - Client: cl, - Reporter: &reporting.GitHubReporter{ - Owner: cfg.GitHubOwner, - Repo: cfg.GitHubRepo, - Token: token, - TokenFile: cfg.GitHubTokenFile, - BaseURL: cfg.GitHubAPIBaseURL, - Client: cfg.HTTPClient, - }, - } - if err := runReportingCycle(ctx, cl, key, reporter); err != nil { - return 0, err - } - } + // Reporting is handled exclusively by the taskActivityReconciler to + // avoid racing with this poll-cycle goroutine. Task phase changes + // trigger the task-activity controller which calls ReportTaskStatus. return resolvedPollInterval(&ts), nil } @@ -121,7 +97,33 @@ func resolvedPollInterval(ts *kelosv1alpha1.TaskSpawner) time.Duration { return parsePollInterval(ts.Spec.PollInterval) } -func (r *spawnerReconciler) requestsForTask(_ context.Context, obj client.Object) []reconcile.Request { +// taskActivityReconciler handles Task phase changes, deletion timestamp +// changes, and deletes for the single managed TaskSpawner. It recomputes +// status.activeTasks and runs per-Task GitHub reporting without triggering +// source discovery. +type taskActivityReconciler struct { + client.Client + Key types.NamespacedName + Config spawnerRuntimeConfig +} + +func (r *taskActivityReconciler) Reconcile(ctx context.Context, _ ctrl.Request) (ctrl.Result, error) { + return ctrl.Result{}, handleTaskActivity(ctx, r.Client, r.Key, r.Config) +} + +func (r *taskActivityReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + Named("task-activity"). + WithOptions(controller.Options{MaxConcurrentReconciles: 1}). + Watches( + &kelosv1alpha1.Task{}, + handler.EnqueueRequestsFromMapFunc(r.requestsForTask), + builder.WithPredicates(r.taskPredicate()), + ). + Complete(r) +} + +func (r *taskActivityReconciler) requestsForTask(_ context.Context, obj client.Object) []reconcile.Request { if !matchesSpawnerTask(obj, r.Key) { return nil } @@ -148,7 +150,7 @@ func (r *spawnerReconciler) taskSpawnerPredicate() predicate.Predicate { } } -func (r *spawnerReconciler) taskPredicate() predicate.Predicate { +func (r *taskActivityReconciler) taskPredicate() predicate.Predicate { return predicate.Funcs{ CreateFunc: func(event.CreateEvent) bool { return false