Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 71 additions & 10 deletions cmd/kelos-spawner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,16 @@ func main() {
Key: key,
Config: cfgArgs,
}).SetupWithManager(mgr); err != nil {
log.Error(err, "Unable to create controller")
log.Error(err, "Unable to create spawner controller")
os.Exit(1)
}

if err := (&taskActivityReconciler{
Client: cl,
Key: key,
Config: cfgArgs,
}).SetupWithManager(mgr); err != nil {
log.Error(err, "Unable to create task activity controller")
os.Exit(1)
}

Expand All @@ -146,25 +155,77 @@ func main() {
}
}

// runReportingCycle lists all Tasks owned by the given TaskSpawner and runs
// reporting for each one that has GitHub reporting enabled. Running this
// in the same goroutine as the discovery loop avoids races between Task
// creation/deletion and annotation patching.
func runReportingCycle(ctx context.Context, cl client.Client, key types.NamespacedName, reporter *reporting.TaskReporter) error {
// handleTaskActivity is the lightweight reconcile path triggered by Task
// phase changes, deletion timestamp changes, and deletes. It recomputes
// status.activeTasks on the owning TaskSpawner and runs per-Task GitHub
// reporting when enabled, without calling source discovery.
func handleTaskActivity(ctx context.Context, cl client.Client, key types.NamespacedName, cfg spawnerRuntimeConfig) error {
log := ctrl.Log.WithName("spawner")

var ts kelosv1alpha1.TaskSpawner
if err := cl.Get(ctx, key, &ts); err != nil {
if apierrors.IsNotFound(err) {
return nil
}
return fmt.Errorf("fetching TaskSpawner for task activity: %w", err)
}

// List all Tasks owned by this TaskSpawner to recompute activeTasks.
var taskList kelosv1alpha1.TaskList
if err := cl.List(ctx, &taskList,
client.InNamespace(key.Namespace),
client.MatchingLabels{"kelos.dev/taskspawner": key.Name},
); err != nil {
return fmt.Errorf("listing tasks for reporting: %w", err)
return fmt.Errorf("listing tasks for activity update: %w", err)
}

activeTasks := 0
for i := range taskList.Items {
if err := reporter.ReportTaskStatus(ctx, &taskList.Items[i]); err != nil {
ctrl.Log.WithName("spawner").Error(err, "Reporting task status", "task", taskList.Items[i].Name)
// Continue with remaining tasks rather than aborting the cycle
t := &taskList.Items[i]
if t.Status.Phase != kelosv1alpha1.TaskPhaseSucceeded && t.Status.Phase != kelosv1alpha1.TaskPhaseFailed {
activeTasks++
}
}

// Update status.activeTasks if it changed.
if ts.Status.ActiveTasks != activeTasks {
// Re-fetch for latest resource version before status update.
if err := cl.Get(ctx, key, &ts); err != nil {
return fmt.Errorf("re-fetching TaskSpawner for active tasks update: %w", err)
}
ts.Status.ActiveTasks = activeTasks
if err := cl.Status().Update(ctx, &ts); err != nil {
return fmt.Errorf("updating TaskSpawner activeTasks: %w", err)
}
log.Info("Updated active tasks count", "activeTasks", activeTasks)
}

// Run per-Task reporting when enabled.
if reportingEnabled(&ts) {
token, err := readGitHubToken(cfg.GitHubTokenFile)
if err != nil {
return fmt.Errorf("reading GitHub token for task activity reporting: %w", err)
}

reporter := &reporting.TaskReporter{
Client: cl,
Reporter: &reporting.GitHubReporter{
Owner: cfg.GitHubOwner,
Repo: cfg.GitHubRepo,
Token: token,
TokenFile: cfg.GitHubTokenFile,
BaseURL: cfg.GitHubAPIBaseURL,
Client: cfg.HTTPClient,
},
}

for i := range taskList.Items {
if err := reporter.ReportTaskStatus(ctx, &taskList.Items[i]); err != nil {
log.Error(err, "Reporting task status during activity update", "task", taskList.Items[i].Name)
}
}
}

return nil
}

Expand Down
Loading
Loading