diff --git a/api/v1alpha1/taskspawner_types.go b/api/v1alpha1/taskspawner_types.go index b8ad27f2..cc5068ec 100644 --- a/api/v1alpha1/taskspawner_types.go +++ b/api/v1alpha1/taskspawner_types.go @@ -44,6 +44,12 @@ type When struct { // LinearWebhook triggers task spawning on Linear webhook events. // +optional LinearWebhook *LinearWebhook `json:"linearWebhook,omitempty"` + + // Slack discovers work items from Slack messages via Socket Mode. + // The spawner connects to Slack via an outbound WebSocket (no ingress + // required) and listens for messages in the channels the bot is invited to. + // +optional + Slack *Slack `json:"slack,omitempty"` } // Cron triggers task spawning on a cron schedule. @@ -394,6 +400,50 @@ type LinearWebhookFilter struct { ExcludeLabels []string `json:"excludeLabels,omitempty"` } +// Slack discovers work items from Slack messages via Socket Mode. +// The spawner connects to Slack using an App-Level Token (Socket Mode) and +// listens for messages in configured channels. No ingress, LoadBalancer, or +// public URL is required — the connection is outbound only. +// +// Authentication is provided via a Secret that must contain two keys: +// - SLACK_BOT_TOKEN: Bot User OAuth Token (xoxb-...) +// - SLACK_APP_TOKEN: App-Level Token for Socket Mode (xapp-...) +// +// The bot must be invited to each channel it should listen in. +type Slack struct { + // SecretRef references a Secret containing "SLACK_BOT_TOKEN" and + // "SLACK_APP_TOKEN" keys. + // +kubebuilder:validation:Required + SecretRef SecretReference `json:"secretRef"` + + // TriggerCommand is an optional slash command or message prefix that + // triggers task creation (e.g., "/kelos", "!fix"). When set, only + // messages starting with this prefix trigger tasks and the prefix is + // stripped from the prompt. When empty, every non-threaded message in + // the channel triggers a task. + // +optional + TriggerCommand string `json:"triggerCommand,omitempty"` + + // Channels optionally restricts which Slack channels the bot listens in. + // Values are channel IDs (e.g., "C0123456789"). When empty, the bot + // listens in every channel it has been invited to. + // +optional + Channels []string `json:"channels,omitempty"` + + // AllowedUsers optionally restricts which Slack users can trigger tasks. + // Values are Slack user IDs (e.g., "U0123456789"). When empty, any user + // in the channel can trigger tasks. + // +optional + AllowedUsers []string `json:"allowedUsers,omitempty"` + + // PollInterval overrides spec.pollInterval for this source (e.g., "30s", "5m"). + // Slack uses Socket Mode (real-time), but Discover() is still called on + // this interval to drain accumulated events. When empty, spec.pollInterval + // is used. + // +optional + PollInterval string `json:"pollInterval,omitempty"` +} + // TaskTemplateMetadata holds optional labels and annotations for spawned Tasks. type TaskTemplateMetadata struct { // Labels are merged into the spawned Task's labels. Values support Go diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 53452af0..2b5ff015 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -642,6 +642,32 @@ func (in *SkillsShSpec) DeepCopy() *SkillsShSpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Slack) DeepCopyInto(out *Slack) { + *out = *in + out.SecretRef = in.SecretRef + if in.Channels != nil { + in, out := &in.Channels, &out.Channels + *out = make([]string, len(*in)) + copy(*out, *in) + } + if in.AllowedUsers != nil { + in, out := &in.AllowedUsers, &out.AllowedUsers + *out = make([]string, len(*in)) + copy(*out, *in) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Slack. +func (in *Slack) DeepCopy() *Slack { + if in == nil { + return nil + } + out := new(Slack) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Task) DeepCopyInto(out *Task) { *out = *in @@ -1002,6 +1028,11 @@ func (in *When) DeepCopyInto(out *When) { *out = new(LinearWebhook) (*in).DeepCopyInto(*out) } + if in.Slack != nil { + in, out := &in.Slack, &out.Slack + *out = new(Slack) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new When. diff --git a/cmd/kelos-spawner/main.go b/cmd/kelos-spawner/main.go index 59df50e8..c68b6a45 100644 --- a/cmd/kelos-spawner/main.go +++ b/cmd/kelos-spawner/main.go @@ -48,6 +48,9 @@ func main() { var jiraBaseURL string var jiraProject string var jiraJQL string + var slackTriggerCommand string + var slackChannels string + var slackAllowedUsers string var oneShot bool flag.StringVar(&name, "taskspawner-name", "", "Name of the TaskSpawner to manage") @@ -59,6 +62,9 @@ func main() { flag.StringVar(&jiraBaseURL, "jira-base-url", "", "Jira instance base URL (e.g. https://mycompany.atlassian.net)") flag.StringVar(&jiraProject, "jira-project", "", "Jira project key") flag.StringVar(&jiraJQL, "jira-jql", "", "Optional JQL filter for Jira issues") + flag.StringVar(&slackTriggerCommand, "slack-trigger-command", "", "Slack trigger command or message prefix") + flag.StringVar(&slackChannels, "slack-channels", "", "Comma-separated list of Slack channel IDs to listen in") + flag.StringVar(&slackAllowedUsers, "slack-allowed-users", "", "Comma-separated list of allowed Slack user IDs") flag.BoolVar(&oneShot, "one-shot", false, "Run a single discovery cycle and exit (used by CronJob)") opts, applyVerbosity := logging.SetupZapOptions(flag.CommandLine) @@ -106,15 +112,18 @@ func main() { httpClient := &http.Client{Transport: transport} cfgArgs := spawnerRuntimeConfig{ - GitHubOwner: githubOwner, - GitHubRepo: githubRepo, - GitHubAPIBaseURL: githubAPIBaseURL, - GHProxyURL: ghProxyURL, - GitHubTokenFile: githubTokenFile, - JiraBaseURL: jiraBaseURL, - JiraProject: jiraProject, - JiraJQL: jiraJQL, - HTTPClient: httpClient, + GitHubOwner: githubOwner, + GitHubRepo: githubRepo, + GitHubAPIBaseURL: githubAPIBaseURL, + GHProxyURL: ghProxyURL, + GitHubTokenFile: githubTokenFile, + JiraBaseURL: jiraBaseURL, + JiraProject: jiraProject, + JiraJQL: jiraJQL, + SlackTriggerCommand: slackTriggerCommand, + SlackChannels: slackChannels, + SlackAllowedUsers: slackAllowedUsers, + HTTPClient: httpClient, } if oneShot { @@ -177,9 +186,26 @@ func runReportingCycle(ctx context.Context, cl client.Client, key types.Namespac return nil } -func runCycle(ctx context.Context, cl client.Client, key types.NamespacedName, githubOwner, githubRepo, githubAPIBaseURL, githubTokenFile, jiraBaseURL, jiraProject, jiraJQL string, httpClient *http.Client) error { +func runSlackReportingCycle(ctx context.Context, cl client.Client, key types.NamespacedName, reporter *reporting.SlackTaskReporter) error { + var taskList kelosv1alpha1.TaskList + if err := cl.List(ctx, &taskList, + client.InNamespace(key.Namespace), + client.MatchingLabels{"kelos.dev/taskspawner": key.Name}, + ); err != nil { + return fmt.Errorf("listing tasks for Slack reporting: %w", err) + } + + for i := range taskList.Items { + if err := reporter.ReportTaskStatus(ctx, &taskList.Items[i]); err != nil { + ctrl.Log.WithName("spawner").Error(err, "Reporting Slack task status", "task", taskList.Items[i].Name) + } + } + return nil +} + +func runCycle(ctx context.Context, cl client.Client, key types.NamespacedName, githubOwner, githubRepo, githubAPIBaseURL, githubTokenFile, jiraBaseURL, jiraProject, jiraJQL, slackTriggerCommand, slackChannels, slackAllowedUsers string, httpClient *http.Client) error { start := time.Now() - err := runCycleCore(ctx, cl, key, githubOwner, githubRepo, githubAPIBaseURL, githubTokenFile, jiraBaseURL, jiraProject, jiraJQL, httpClient) + err := runCycleCore(ctx, cl, key, githubOwner, githubRepo, githubAPIBaseURL, githubTokenFile, jiraBaseURL, jiraProject, jiraJQL, slackTriggerCommand, slackChannels, slackAllowedUsers, httpClient) discoveryDurationSeconds.Observe(time.Since(start).Seconds()) if err != nil { discoveryErrorsTotal.Inc() @@ -187,13 +213,13 @@ func runCycle(ctx context.Context, cl client.Client, key types.NamespacedName, g return err } -func runCycleCore(ctx context.Context, cl client.Client, key types.NamespacedName, githubOwner, githubRepo, githubAPIBaseURL, githubTokenFile, jiraBaseURL, jiraProject, jiraJQL string, httpClient *http.Client) error { +func runCycleCore(ctx context.Context, cl client.Client, key types.NamespacedName, githubOwner, githubRepo, githubAPIBaseURL, githubTokenFile, jiraBaseURL, jiraProject, jiraJQL, slackTriggerCommand, slackChannels, slackAllowedUsers string, httpClient *http.Client) error { var ts kelosv1alpha1.TaskSpawner if err := cl.Get(ctx, key, &ts); err != nil { return fmt.Errorf("fetching TaskSpawner: %w", err) } - src, err := buildSource(&ts, githubOwner, githubRepo, githubAPIBaseURL, githubTokenFile, jiraBaseURL, jiraProject, jiraJQL, httpClient) + src, err := buildSource(&ts, githubOwner, githubRepo, githubAPIBaseURL, githubTokenFile, jiraBaseURL, jiraProject, jiraJQL, slackTriggerCommand, slackChannels, slackAllowedUsers, httpClient) if err != nil { return fmt.Errorf("building source: %w", err) } @@ -517,10 +543,24 @@ func renderTaskTemplateMetadata(ts *kelosv1alpha1.TaskSpawner, item source.WorkI return labels, annotations, nil } -// sourceAnnotations returns annotations that stamp GitHub source metadata -// onto a spawned Task. These annotations enable downstream consumers (such -// as the reporting watcher) to identify the originating issue or PR. +// sourceAnnotations returns annotations that stamp source metadata onto a +// spawned Task. These annotations enable downstream consumers (such as the +// reporting watcher) to identify the originating issue, PR, or Slack message. func sourceAnnotations(ts *kelosv1alpha1.TaskSpawner, item source.WorkItem) map[string]string { + if ts.Spec.When.Slack != nil && len(item.Labels) >= 2 { + annotations := map[string]string{ + reporting.AnnotationSlackReporting: "enabled", + reporting.AnnotationSlackChannel: item.Labels[1], + } + // Only set thread_ts when the item ID is a valid Slack message + // timestamp (e.g. "1234567890.123456"). Slash command IDs are + // compound strings containing colons and are not valid timestamps. + if isSlackTimestamp(item.ID) { + annotations[reporting.AnnotationSlackThreadTS] = item.ID + } + return annotations + } + if ts.Spec.When.GitHubIssues == nil && ts.Spec.When.GitHubPullRequests == nil { return nil } @@ -542,9 +582,12 @@ func sourceAnnotations(ts *kelosv1alpha1.TaskSpawner, item source.WorkItem) map[ return annotations } -// reportingEnabled returns true when GitHub reporting is configured and enabled +// reportingEnabled returns true when reporting is configured and enabled // on the TaskSpawner. func reportingEnabled(ts *kelosv1alpha1.TaskSpawner) bool { + if ts.Spec.When.Slack != nil { + return true + } if ts.Spec.When.GitHubIssues != nil && ts.Spec.When.GitHubIssues.Reporting != nil { return ts.Spec.When.GitHubIssues.Reporting.Enabled } @@ -596,7 +639,7 @@ func resolveGitHubCommentPolicy(policy *kelosv1alpha1.GitHubCommentPolicy, legac }, nil } -func buildSource(ts *kelosv1alpha1.TaskSpawner, owner, repo, apiBaseURL, tokenFile, jiraBaseURL, jiraProject, jiraJQL string, httpClient *http.Client) (source.Source, error) { +func buildSource(ts *kelosv1alpha1.TaskSpawner, owner, repo, apiBaseURL, tokenFile, jiraBaseURL, jiraProject, jiraJQL, slackTriggerCommand, slackChannels, slackAllowedUsers string, httpClient *http.Client) (source.Source, error) { if ts.Spec.When.GitHubIssues != nil { gh := ts.Spec.When.GitHubIssues token, err := readGitHubToken(tokenFile) @@ -673,6 +716,18 @@ func buildSource(ts *kelosv1alpha1.TaskSpawner, owner, repo, apiBaseURL, tokenFi }, nil } + if ts.Spec.When.Slack != nil { + botToken := os.Getenv("SLACK_BOT_TOKEN") + appToken := os.Getenv("SLACK_APP_TOKEN") + return &source.SlackSource{ + BotToken: botToken, + AppToken: appToken, + TriggerCommand: slackTriggerCommand, + Channels: parseCSV(slackChannels), + AllowedUsers: parseCSV(slackAllowedUsers), + }, nil + } + if ts.Spec.When.Cron != nil { var lastDiscovery time.Time if ts.Status.LastDiscoveryTime != nil { @@ -689,6 +744,20 @@ func buildSource(ts *kelosv1alpha1.TaskSpawner, owner, repo, apiBaseURL, tokenFi return nil, fmt.Errorf("no source configured in TaskSpawner %s/%s", ts.Namespace, ts.Name) } +func parseCSV(s string) []string { + if s == "" { + return nil + } + var result []string + for _, item := range strings.Split(s, ",") { + item = strings.TrimSpace(item) + if item != "" { + result = append(result, item) + } + } + return result +} + func readGitHubToken(tokenFile string) (string, error) { token := os.Getenv("GITHUB_TOKEN") if tokenFile == "" { @@ -769,6 +838,24 @@ func parseOwnerRepo(repoURL string) (string, string) { return "", "" } +// isSlackTimestamp returns true when s looks like a Slack message timestamp +// (e.g. "1234567890.123456"). Slash command work-item IDs are compound +// strings like "C123:/cmd:trigger" and must not be used as thread_ts. +func isSlackTimestamp(s string) bool { + parts := strings.SplitN(s, ".", 2) + if len(parts) != 2 || parts[0] == "" || parts[1] == "" { + return false + } + for _, p := range parts { + for _, c := range p { + if c < '0' || c > '9' { + return false + } + } + } + return true +} + func parsePollInterval(s string) time.Duration { if s == "" { return 5 * time.Minute diff --git a/cmd/kelos-spawner/main_test.go b/cmd/kelos-spawner/main_test.go index 1638249c..f19d39da 100644 --- a/cmd/kelos-spawner/main_test.go +++ b/cmd/kelos-spawner/main_test.go @@ -138,7 +138,7 @@ func newTask(name, namespace, spawnerName string, phase kelosv1alpha1.TaskPhase) func TestBuildSource_GitHubIssuesWithBaseURL(t *testing.T) { ts := newTaskSpawner("spawner", "default", nil) - src, err := buildSource(ts, "my-org", "my-repo", "https://github.example.com/api/v3", "", "", "", "", nil) + src, err := buildSource(ts, "my-org", "my-repo", "https://github.example.com/api/v3", "", "", "", "", "", "", "", nil) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -161,7 +161,7 @@ func TestBuildSource_GitHubIssuesWithBaseURL(t *testing.T) { func TestBuildSource_GitHubIssuesDefaultBaseURL(t *testing.T) { ts := newTaskSpawner("spawner", "default", nil) - src, err := buildSource(ts, "kelos-dev", "kelos", "", "", "", "", "", nil) + src, err := buildSource(ts, "kelos-dev", "kelos", "", "", "", "", "", "", "", "", nil) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -187,7 +187,7 @@ func TestBuildSource_GitHubPullRequests(t *testing.T) { }, } - src, err := buildSource(ts, "kelos-dev", "kelos", "https://github.example.com/api/v3", "", "", "", "", nil) + src, err := buildSource(ts, "kelos-dev", "kelos", "https://github.example.com/api/v3", "", "", "", "", "", "", "", nil) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -247,7 +247,7 @@ func TestBuildSource_Jira(t *testing.T) { t.Setenv("JIRA_USER", "user@example.com") t.Setenv("JIRA_TOKEN", "jira-api-token") - src, err := buildSource(ts, "", "", "", "", "https://mycompany.atlassian.net", "PROJ", "status = Open", nil) + src, err := buildSource(ts, "", "", "", "", "https://mycompany.atlassian.net", "PROJ", "status = Open", "", "", "", nil) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -273,6 +273,46 @@ func TestBuildSource_Jira(t *testing.T) { } } +func TestBuildSource_Slack(t *testing.T) { + t.Setenv("SLACK_BOT_TOKEN", "xoxb-test-token") + t.Setenv("SLACK_APP_TOKEN", "xapp-test-token") + + ts := &kelosv1alpha1.TaskSpawner{ + Spec: kelosv1alpha1.TaskSpawnerSpec{ + When: kelosv1alpha1.When{ + Slack: &kelosv1alpha1.Slack{ + SecretRef: kelosv1alpha1.SecretReference{Name: "slack-creds"}, + }, + }, + }, + } + + src, err := buildSource(ts, "", "", "", "", "", "", "", "/kelos", "C123,C456", "U001,U002", nil) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + slackSrc, ok := src.(*source.SlackSource) + if !ok { + t.Fatalf("Expected *source.SlackSource, got %T", src) + } + if slackSrc.BotToken != "xoxb-test-token" { + t.Errorf("BotToken = %q, want %q", slackSrc.BotToken, "xoxb-test-token") + } + if slackSrc.AppToken != "xapp-test-token" { + t.Errorf("AppToken = %q, want %q", slackSrc.AppToken, "xapp-test-token") + } + if slackSrc.TriggerCommand != "/kelos" { + t.Errorf("TriggerCommand = %q, want %q", slackSrc.TriggerCommand, "/kelos") + } + if len(slackSrc.Channels) != 2 || slackSrc.Channels[0] != "C123" || slackSrc.Channels[1] != "C456" { + t.Errorf("Channels = %v, want [C123 C456]", slackSrc.Channels) + } + if len(slackSrc.AllowedUsers) != 2 || slackSrc.AllowedUsers[0] != "U001" || slackSrc.AllowedUsers[1] != "U002" { + t.Errorf("AllowedUsers = %v, want [U001 U002]", slackSrc.AllowedUsers) + } +} + func TestRunCycleWithSource_NoMaxConcurrency(t *testing.T) { ts := newTaskSpawner("spawner", "default", nil) cl, key := setupTest(t, ts) @@ -535,7 +575,7 @@ func TestRunCycle_BuildSourceFailureCountsDiscoveryErrorAndDuration(t *testing.T beforeErrors := testutil.ToFloat64(discoveryErrorsTotal) beforeDurationCount := histogramSampleCount(t, discoveryDurationSeconds) - err := runCycle(context.Background(), cl, key, "owner", "repo", "", "", "", "", "", nil) + err := runCycle(context.Background(), cl, key, "owner", "repo", "", "", "", "", "", "", "", "", nil) if err == nil { t.Fatal("Expected buildSource error") } @@ -1103,7 +1143,7 @@ func TestBuildSource_PriorityLabelsPassedToSource(t *testing.T) { "priority/imporant-soon", } - src, err := buildSource(ts, "owner", "repo", "", "", "", "", "", nil) + src, err := buildSource(ts, "owner", "repo", "", "", "", "", "", "", "", "", nil) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -1130,7 +1170,7 @@ func TestRunCycleWithSource_CommentFieldsPassedToSource(t *testing.T) { ExcludeComments: []string{"/kelos needs-input"}, } - src, err := buildSource(ts, "owner", "repo", "", "", "", "", "", nil) + src, err := buildSource(ts, "owner", "repo", "", "", "", "", "", "", "", "", nil) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -1159,7 +1199,7 @@ func TestBuildSource_CommentPolicyPassedToIssueSource(t *testing.T) { }, } - src, err := buildSource(ts, "owner", "repo", "", "", "", "", "", nil) + src, err := buildSource(ts, "owner", "repo", "", "", "", "", "", "", "", "", nil) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -1199,7 +1239,7 @@ func TestBuildSource_CommentPolicyPassedToPullRequestSource(t *testing.T) { }, } - src, err := buildSource(ts, "owner", "repo", "", "", "", "", "", nil) + src, err := buildSource(ts, "owner", "repo", "", "", "", "", "", "", "", "", nil) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -1264,7 +1304,7 @@ func TestBuildSource_CommentPolicyRejectsMixedConfig(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, err := buildSource(tt.ts, "owner", "repo", "", "", "", "", "", nil) + _, err := buildSource(tt.ts, "owner", "repo", "", "", "", "", "", "", "", "", nil) if err == nil { t.Fatal("Expected error for mixed legacy and commentPolicy config") } @@ -1799,6 +1839,90 @@ func TestSourceAnnotations_ReportingEnabledPR(t *testing.T) { } } +func TestSourceAnnotations_SlackMessage(t *testing.T) { + ts := &kelosv1alpha1.TaskSpawner{ + Spec: kelosv1alpha1.TaskSpawnerSpec{ + When: kelosv1alpha1.When{ + Slack: &kelosv1alpha1.Slack{}, + }, + }, + } + + item := source.WorkItem{ + ID: "1234567890.123456", + Kind: "SlackMessage", + Labels: []string{"general", "C123ABC"}, + } + + annotations := sourceAnnotations(ts, item) + if annotations == nil { + t.Fatal("Expected annotations, got nil") + } + if annotations[reporting.AnnotationSlackReporting] != "enabled" { + t.Errorf("Expected slack-reporting 'enabled', got %q", annotations[reporting.AnnotationSlackReporting]) + } + if annotations[reporting.AnnotationSlackChannel] != "C123ABC" { + t.Errorf("Expected slack-channel 'C123ABC', got %q", annotations[reporting.AnnotationSlackChannel]) + } + if annotations[reporting.AnnotationSlackThreadTS] != "1234567890.123456" { + t.Errorf("Expected slack-thread-ts '1234567890.123456', got %q", annotations[reporting.AnnotationSlackThreadTS]) + } +} + +func TestSourceAnnotations_SlackSlashCommand(t *testing.T) { + ts := &kelosv1alpha1.TaskSpawner{ + Spec: kelosv1alpha1.TaskSpawnerSpec{ + When: kelosv1alpha1.When{ + Slack: &kelosv1alpha1.Slack{}, + }, + }, + } + + // Slash command IDs are compound strings, not valid Slack timestamps + item := source.WorkItem{ + ID: "C123ABC:/kelos:trigger-id-abc", + Kind: "SlackMessage", + Labels: []string{"general", "C123ABC"}, + } + + annotations := sourceAnnotations(ts, item) + if annotations == nil { + t.Fatal("Expected annotations, got nil") + } + if annotations[reporting.AnnotationSlackReporting] != "enabled" { + t.Errorf("Expected slack-reporting 'enabled', got %q", annotations[reporting.AnnotationSlackReporting]) + } + if annotations[reporting.AnnotationSlackChannel] != "C123ABC" { + t.Errorf("Expected slack-channel 'C123ABC', got %q", annotations[reporting.AnnotationSlackChannel]) + } + if _, ok := annotations[reporting.AnnotationSlackThreadTS]; ok { + t.Errorf("Expected no slack-thread-ts for slash command, got %q", annotations[reporting.AnnotationSlackThreadTS]) + } +} + +func TestIsSlackTimestamp(t *testing.T) { + tests := []struct { + input string + want bool + }{ + {"1234567890.123456", true}, + {"0.0", true}, + {"C123ABC:/kelos:trigger-id-abc", false}, + {"not-a-timestamp", false}, + {"", false}, + {"1234567890", false}, + {".123456", false}, + {"1234567890.", false}, + } + for _, tt := range tests { + t.Run(tt.input, func(t *testing.T) { + if got := isSlackTimestamp(tt.input); got != tt.want { + t.Errorf("isSlackTimestamp(%q) = %v, want %v", tt.input, got, tt.want) + } + }) + } +} + func TestSourceAnnotations_NonGitHub(t *testing.T) { ts := &kelosv1alpha1.TaskSpawner{ Spec: kelosv1alpha1.TaskSpawnerSpec{ diff --git a/cmd/kelos-spawner/reconciler.go b/cmd/kelos-spawner/reconciler.go index 8c2cab09..ca969ec2 100644 --- a/cmd/kelos-spawner/reconciler.go +++ b/cmd/kelos-spawner/reconciler.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net/http" + "os" "time" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -22,15 +23,18 @@ import ( ) type spawnerRuntimeConfig struct { - GitHubOwner string - GitHubRepo string - GitHubAPIBaseURL string - GHProxyURL string - GitHubTokenFile string - JiraBaseURL string - JiraProject string - JiraJQL string - HTTPClient *http.Client + GitHubOwner string + GitHubRepo string + GitHubAPIBaseURL string + GHProxyURL string + GitHubTokenFile string + JiraBaseURL string + JiraProject string + JiraJQL string + SlackTriggerCommand string + SlackChannels string + SlackAllowedUsers string + HTTPClient *http.Client } type spawnerReconciler struct { @@ -69,7 +73,7 @@ func (r *spawnerReconciler) SetupWithManager(mgr ctrl.Manager) error { } func runOnce(ctx context.Context, cl client.Client, key types.NamespacedName, cfg spawnerRuntimeConfig) (time.Duration, error) { - if err := runCycle(ctx, cl, key, cfg.GitHubOwner, cfg.GitHubRepo, cfg.GHProxyURL, cfg.GitHubTokenFile, cfg.JiraBaseURL, cfg.JiraProject, cfg.JiraJQL, cfg.HTTPClient); err != nil { + if err := runCycle(ctx, cl, key, cfg.GitHubOwner, cfg.GitHubRepo, cfg.GHProxyURL, cfg.GitHubTokenFile, cfg.JiraBaseURL, cfg.JiraProject, cfg.JiraJQL, cfg.SlackTriggerCommand, cfg.SlackChannels, cfg.SlackAllowedUsers, cfg.HTTPClient); err != nil { return 0, err } @@ -79,25 +83,39 @@ func runOnce(ctx context.Context, cl client.Client, key types.NamespacedName, cf } if reportingEnabled(&ts) { - token, err := readGitHubToken(cfg.GitHubTokenFile) - if err != nil { - return 0, fmt.Errorf("reading GitHub token for reporting: %w", err) - } + if ts.Spec.When.Slack != nil { + botToken := os.Getenv("SLACK_BOT_TOKEN") + if botToken == "" { + return 0, fmt.Errorf("SLACK_BOT_TOKEN environment variable is required for Slack reporting") + } + slackReporter := &reporting.SlackTaskReporter{ + Client: cl, + Reporter: &reporting.SlackReporter{BotToken: botToken}, + } + if err := runSlackReportingCycle(ctx, cl, key, slackReporter); err != nil { + return 0, err + } + } else { + token, err := readGitHubToken(cfg.GitHubTokenFile) + if err != nil { + return 0, fmt.Errorf("reading GitHub token for reporting: %w", err) + } - // Reporting always uses the direct API base URL (writes bypass the proxy). - reporter := &reporting.TaskReporter{ - Client: cl, - Reporter: &reporting.GitHubReporter{ - Owner: cfg.GitHubOwner, - Repo: cfg.GitHubRepo, - Token: token, - TokenFile: cfg.GitHubTokenFile, - BaseURL: cfg.GitHubAPIBaseURL, - Client: cfg.HTTPClient, - }, - } - if err := runReportingCycle(ctx, cl, key, reporter); err != nil { - return 0, err + // Reporting always uses the direct API base URL (writes bypass the proxy). + reporter := &reporting.TaskReporter{ + Client: cl, + Reporter: &reporting.GitHubReporter{ + Owner: cfg.GitHubOwner, + Repo: cfg.GitHubRepo, + Token: token, + TokenFile: cfg.GitHubTokenFile, + BaseURL: cfg.GitHubAPIBaseURL, + Client: cfg.HTTPClient, + }, + } + if err := runReportingCycle(ctx, cl, key, reporter); err != nil { + return 0, err + } } } @@ -116,6 +134,8 @@ func resolvedPollInterval(ts *kelosv1alpha1.TaskSpawner) time.Duration { sourceInterval = ts.Spec.When.GitHubPullRequests.PollInterval case ts.Spec.When.Jira != nil: sourceInterval = ts.Spec.When.Jira.PollInterval + case ts.Spec.When.Slack != nil: + sourceInterval = ts.Spec.When.Slack.PollInterval } if sourceInterval != "" { return parsePollInterval(sourceInterval) diff --git a/go.mod b/go.mod index 22d7c3b8..04a87b54 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/prometheus/client_golang v1.23.2 github.com/prometheus/client_model v0.6.2 github.com/robfig/cron/v3 v3.0.1 + github.com/slack-go/slack v0.20.0 github.com/spf13/cobra v1.10.2 github.com/stretchr/testify v1.11.1 go.uber.org/zap v1.27.0 @@ -59,6 +60,7 @@ require ( github.com/google/go-querystring v1.1.0 // indirect github.com/google/pprof v0.0.0-20250403155104-27863c87afa6 // indirect github.com/google/renameio/v2 v2.0.0 // indirect + github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 // indirect github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect github.com/huandu/xstrings v1.5.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect diff --git a/go.sum b/go.sum index 7bd5416c..e2225783 100644 --- a/go.sum +++ b/go.sum @@ -74,6 +74,8 @@ github.com/go-quicktest/qt v1.101.0 h1:O1K29Txy5P2OK0dGo59b7b0LR6wKfIhttaAhHUyn7 github.com/go-quicktest/qt v1.101.0/go.mod h1:14Bz/f7NwaXPtdYEgzsx46kqSxVwTbzVZsDC26tQJow= github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= +github.com/go-test/deep v1.1.1 h1:0r/53hagsehfO4bzD2Pgr/+RgHqhmf+k1Bpse2cTu1U= +github.com/go-test/deep v1.1.1/go.mod h1:5C2ZWiW0ErCdrYzpqxLbTX7MG14M9iiw8DgHncVwcsE= github.com/gobuffalo/flect v1.0.3 h1:xeWBM2nui+qnVvNM4S3foBhCAL2XgPU+a7FdpelbTq4= github.com/gobuffalo/flect v1.0.3/go.mod h1:A5msMlrHtLqh9umBSnvabjsMrCcCpAyzglnDvkbYKHs= github.com/gobwas/glob v0.2.3 h1:A4xDbljILXROh+kObIiy5kIaPYD8e96x1tgBhUI5J+Y= @@ -106,6 +108,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/yamlfmt v0.21.0 h1:9FKApQkDpMKgBjwLFytBHUCgqnQgxaQnci0uiESfbzs= github.com/google/yamlfmt v0.21.0/go.mod h1:q6FYExB+Ueu7jZDjKECJk+EaeDXJzJ6Ne0dxx69GWfI= +github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 h1:JeSE6pjso5THxAzdVpqr6/geYxZytqFMBCOtn/ujyeo= +github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674/go.mod h1:r4w70xmWCQKmi1ONH4KIaBptdivuRPyosB9RmPlGEwA= github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 h1:5ZPtiqj0JL5oKWmcsq4VMaAW5ukBEgSGXEN89zeH1Jo= github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3/go.mod h1:ndYquD05frm2vACXE1nsccT4oJzjhw2arTS2cpUD1PI= github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= @@ -190,6 +194,8 @@ github.com/santhosh-tekuri/jsonschema/v6 v6.0.2 h1:KRzFb2m7YtdldCEkzs6KqmJw4nqEV github.com/santhosh-tekuri/jsonschema/v6 v6.0.2/go.mod h1:JXeL+ps8p7/KNMjDQk3TCwPpBy0wYklyWTfbkIzdIFU= github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k= github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME= +github.com/slack-go/slack v0.20.0 h1:gbDdbee8+Z2o+DWx05Spq3GzbrLLleiRwHUKs+hZLSU= +github.com/slack-go/slack v0.20.0/go.mod h1:K81UmCivcYd/5Jmz8vLBfuyoZ3B4rQC2GHVXHteXiAE= github.com/spf13/afero v1.12.0 h1:UcOPyRBYczmFn6yvphxkn9ZEOY65cpwGKb5mL36mrqs= github.com/spf13/afero v1.12.0/go.mod h1:ZTlWwG4/ahT8W7T0WQ5uYmjI9duaLQGy3Q2OAl4sk/4= github.com/spf13/cast v1.7.0 h1:ntdiHjuueXFgm5nzDRdOS4yfT43P5Fnud6DH50rz/7w= diff --git a/internal/controller/taskspawner_deployment_builder.go b/internal/controller/taskspawner_deployment_builder.go index 04d9273c..794d460f 100644 --- a/internal/controller/taskspawner_deployment_builder.go +++ b/internal/controller/taskspawner_deployment_builder.go @@ -233,6 +233,43 @@ func (b *DeploymentBuilder) buildPodParts(ts *kelosv1alpha1.TaskSpawner, workspa ) } + if ts.Spec.When.Slack != nil { + slack := ts.Spec.When.Slack + if slack.TriggerCommand != "" { + args = append(args, "--slack-trigger-command="+slack.TriggerCommand) + } + if len(slack.Channels) > 0 { + args = append(args, "--slack-channels="+strings.Join(slack.Channels, ",")) + } + if len(slack.AllowedUsers) > 0 { + args = append(args, "--slack-allowed-users="+strings.Join(slack.AllowedUsers, ",")) + } + envVars = append(envVars, + corev1.EnvVar{ + Name: "SLACK_BOT_TOKEN", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: slack.SecretRef.Name, + }, + Key: "SLACK_BOT_TOKEN", + }, + }, + }, + corev1.EnvVar{ + Name: "SLACK_APP_TOKEN", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: slack.SecretRef.Name, + }, + Key: "SLACK_APP_TOKEN", + }, + }, + }, + ) + } + labels := map[string]string{ "kelos.dev/name": "kelos", "kelos.dev/component": "spawner", diff --git a/internal/controller/taskspawner_deployment_builder_test.go b/internal/controller/taskspawner_deployment_builder_test.go index 8cecf5de..a3a1799e 100644 --- a/internal/controller/taskspawner_deployment_builder_test.go +++ b/internal/controller/taskspawner_deployment_builder_test.go @@ -3012,3 +3012,142 @@ func TestReconcileDeployment_KeepsDeploymentWithNewLabels(t *testing.T) { t.Errorf("expected kelos.dev/component label in selector") } } + +func TestDeploymentBuilder_Slack(t *testing.T) { + builder := NewDeploymentBuilder() + ts := &kelosv1alpha1.TaskSpawner{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-spawner", + Namespace: "default", + }, + Spec: kelosv1alpha1.TaskSpawnerSpec{ + When: kelosv1alpha1.When{ + Slack: &kelosv1alpha1.Slack{ + SecretRef: kelosv1alpha1.SecretReference{Name: "slack-creds"}, + TriggerCommand: "/kelos", + Channels: []string{"C123", "C456"}, + AllowedUsers: []string{"U001", "U002"}, + }, + }, + TaskTemplate: kelosv1alpha1.TaskTemplate{ + Type: "claude-code", + }, + }, + } + + deploy := builder.Build(ts, nil, false) + + if len(deploy.Spec.Template.Spec.Containers) != 1 { + t.Fatalf("expected 1 container, got %d", len(deploy.Spec.Template.Spec.Containers)) + } + + spawner := deploy.Spec.Template.Spec.Containers[0] + + // Check Slack args + foundTriggerCommand := false + foundChannels := false + foundAllowedUsers := false + for _, arg := range spawner.Args { + switch { + case arg == "--slack-trigger-command=/kelos": + foundTriggerCommand = true + case arg == "--slack-channels=C123,C456": + foundChannels = true + case arg == "--slack-allowed-users=U001,U002": + foundAllowedUsers = true + } + } + if !foundTriggerCommand { + t.Errorf("expected --slack-trigger-command arg, got args: %v", spawner.Args) + } + if !foundChannels { + t.Errorf("expected --slack-channels arg, got args: %v", spawner.Args) + } + if !foundAllowedUsers { + t.Errorf("expected --slack-allowed-users arg, got args: %v", spawner.Args) + } + + // Check env vars + envMap := make(map[string]corev1.EnvVar) + for _, env := range spawner.Env { + envMap[env.Name] = env + } + + botToken, ok := envMap["SLACK_BOT_TOKEN"] + if !ok { + t.Fatal("expected SLACK_BOT_TOKEN env var") + } + if botToken.ValueFrom == nil || botToken.ValueFrom.SecretKeyRef == nil { + t.Fatal("expected SLACK_BOT_TOKEN to reference a secret") + } + if botToken.ValueFrom.SecretKeyRef.Name != "slack-creds" { + t.Errorf("SLACK_BOT_TOKEN secret name = %q, want %q", botToken.ValueFrom.SecretKeyRef.Name, "slack-creds") + } + if botToken.ValueFrom.SecretKeyRef.Key != "SLACK_BOT_TOKEN" { + t.Errorf("SLACK_BOT_TOKEN secret key = %q, want %q", botToken.ValueFrom.SecretKeyRef.Key, "SLACK_BOT_TOKEN") + } + + appToken, ok := envMap["SLACK_APP_TOKEN"] + if !ok { + t.Fatal("expected SLACK_APP_TOKEN env var") + } + if appToken.ValueFrom == nil || appToken.ValueFrom.SecretKeyRef == nil { + t.Fatal("expected SLACK_APP_TOKEN to reference a secret") + } + if appToken.ValueFrom.SecretKeyRef.Name != "slack-creds" { + t.Errorf("SLACK_APP_TOKEN secret name = %q, want %q", appToken.ValueFrom.SecretKeyRef.Name, "slack-creds") + } + if appToken.ValueFrom.SecretKeyRef.Key != "SLACK_APP_TOKEN" { + t.Errorf("SLACK_APP_TOKEN secret key = %q, want %q", appToken.ValueFrom.SecretKeyRef.Key, "SLACK_APP_TOKEN") + } +} + +func TestDeploymentBuilder_SlackMinimal(t *testing.T) { + builder := NewDeploymentBuilder() + ts := &kelosv1alpha1.TaskSpawner{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-spawner", + Namespace: "default", + }, + Spec: kelosv1alpha1.TaskSpawnerSpec{ + When: kelosv1alpha1.When{ + Slack: &kelosv1alpha1.Slack{ + SecretRef: kelosv1alpha1.SecretReference{Name: "slack-creds"}, + }, + }, + TaskTemplate: kelosv1alpha1.TaskTemplate{ + Type: "claude-code", + }, + }, + } + + deploy := builder.Build(ts, nil, false) + + if len(deploy.Spec.Template.Spec.Containers) != 1 { + t.Fatalf("expected 1 container, got %d", len(deploy.Spec.Template.Spec.Containers)) + } + + spawner := deploy.Spec.Template.Spec.Containers[0] + + // Verify no optional args are present + for _, arg := range spawner.Args { + if strings.HasPrefix(arg, "--slack-trigger-command") || + strings.HasPrefix(arg, "--slack-channels") || + strings.HasPrefix(arg, "--slack-allowed-users") { + t.Errorf("unexpected Slack arg in minimal config: %s", arg) + } + } + + // Env vars should still be present (tokens are always required) + envMap := make(map[string]corev1.EnvVar) + for _, env := range spawner.Env { + envMap[env.Name] = env + } + + if _, ok := envMap["SLACK_BOT_TOKEN"]; !ok { + t.Error("expected SLACK_BOT_TOKEN env var even in minimal config") + } + if _, ok := envMap["SLACK_APP_TOKEN"]; !ok { + t.Error("expected SLACK_APP_TOKEN env var even in minimal config") + } +} diff --git a/internal/manifests/charts/kelos/templates/crds/taskspawner-crd.yaml b/internal/manifests/charts/kelos/templates/crds/taskspawner-crd.yaml index e40ebc20..12833398 100644 --- a/internal/manifests/charts/kelos/templates/crds/taskspawner-crd.yaml +++ b/internal/manifests/charts/kelos/templates/crds/taskspawner-crd.yaml @@ -955,6 +955,57 @@ spec: required: - types type: object + slack: + description: |- + Slack discovers work items from Slack messages via Socket Mode. + The spawner connects to Slack via an outbound WebSocket (no ingress + required) and listens for messages in the channels the bot is invited to. + properties: + allowedUsers: + description: |- + AllowedUsers optionally restricts which Slack users can trigger tasks. + Values are Slack user IDs (e.g., "U0123456789"). When empty, any user + in the channel can trigger tasks. + items: + type: string + type: array + channels: + description: |- + Channels optionally restricts which Slack channels the bot listens in. + Values are channel IDs (e.g., "C0123456789"). When empty, the bot + listens in every channel it has been invited to. + items: + type: string + type: array + pollInterval: + description: |- + PollInterval overrides spec.pollInterval for this source (e.g., "30s", "5m"). + Slack uses Socket Mode (real-time), but Discover() is still called on + this interval to drain accumulated events. When empty, spec.pollInterval + is used. + type: string + secretRef: + description: |- + SecretRef references a Secret containing "SLACK_BOT_TOKEN" and + "SLACK_APP_TOKEN" keys. + properties: + name: + description: Name is the name of the secret. + type: string + required: + - name + type: object + triggerCommand: + description: |- + TriggerCommand is an optional slash command or message prefix that + triggers task creation (e.g., "/kelos", "!fix"). When set, only + messages starting with this prefix trigger tasks and the prefix is + stripped from the prompt. When empty, every non-threaded message in + the channel triggers a task. + type: string + required: + - secretRef + type: object type: object required: - taskTemplate diff --git a/internal/manifests/install-crd.yaml b/internal/manifests/install-crd.yaml index f87f063e..fc9972de 100644 --- a/internal/manifests/install-crd.yaml +++ b/internal/manifests/install-crd.yaml @@ -1634,6 +1634,57 @@ spec: required: - types type: object + slack: + description: |- + Slack discovers work items from Slack messages via Socket Mode. + The spawner connects to Slack via an outbound WebSocket (no ingress + required) and listens for messages in the channels the bot is invited to. + properties: + allowedUsers: + description: |- + AllowedUsers optionally restricts which Slack users can trigger tasks. + Values are Slack user IDs (e.g., "U0123456789"). When empty, any user + in the channel can trigger tasks. + items: + type: string + type: array + channels: + description: |- + Channels optionally restricts which Slack channels the bot listens in. + Values are channel IDs (e.g., "C0123456789"). When empty, the bot + listens in every channel it has been invited to. + items: + type: string + type: array + pollInterval: + description: |- + PollInterval overrides spec.pollInterval for this source (e.g., "30s", "5m"). + Slack uses Socket Mode (real-time), but Discover() is still called on + this interval to drain accumulated events. When empty, spec.pollInterval + is used. + type: string + secretRef: + description: |- + SecretRef references a Secret containing "SLACK_BOT_TOKEN" and + "SLACK_APP_TOKEN" keys. + properties: + name: + description: Name is the name of the secret. + type: string + required: + - name + type: object + triggerCommand: + description: |- + TriggerCommand is an optional slash command or message prefix that + triggers task creation (e.g., "/kelos", "!fix"). When set, only + messages starting with this prefix trigger tasks and the prefix is + stripped from the prompt. When empty, every non-threaded message in + the channel triggers a task. + type: string + required: + - secretRef + type: object type: object required: - taskTemplate diff --git a/internal/reporting/slack.go b/internal/reporting/slack.go new file mode 100644 index 00000000..ab4f9e59 --- /dev/null +++ b/internal/reporting/slack.go @@ -0,0 +1,69 @@ +package reporting + +import ( + "context" + "fmt" + + "github.com/slack-go/slack" +) + +// SlackReporter posts and updates thread replies in Slack channels. +type SlackReporter struct { + // BotToken is the Bot User OAuth Token (xoxb-...). + BotToken string + client *slack.Client +} + +func (r *SlackReporter) api() *slack.Client { + if r.client == nil { + r.client = slack.New(r.BotToken) + } + return r.client +} + +// PostThreadReply posts a new message as a thread reply and returns the +// reply's message timestamp. +func (r *SlackReporter) PostThreadReply(ctx context.Context, channel, threadTS, text string) (string, error) { + _, ts, err := r.api().PostMessageContext(ctx, channel, + slack.MsgOptionText(text, false), + slack.MsgOptionTS(threadTS), + ) + if err != nil { + return "", fmt.Errorf("posting Slack thread reply: %w", err) + } + return ts, nil +} + +// UpdateMessage updates an existing Slack message in place. +func (r *SlackReporter) UpdateMessage(ctx context.Context, channel, messageTS, text string) error { + _, _, _, err := r.api().UpdateMessageContext(ctx, channel, messageTS, + slack.MsgOptionText(text, false), + ) + if err != nil { + return fmt.Errorf("updating Slack message: %w", err) + } + return nil +} + +// FormatSlackAccepted returns the thread reply text for an accepted task. +func FormatSlackAccepted(taskName string) string { + return fmt.Sprintf("Working on your request... (Task: %s)", taskName) +} + +// FormatSlackSucceeded returns the thread reply text for a succeeded task. +// When results contain a PR URL, it is included in the message. +func FormatSlackSucceeded(taskName string, results map[string]string) string { + if pr := results["pr"]; pr != "" { + return fmt.Sprintf("Done! PR: %s (Task: %s)", pr, taskName) + } + return fmt.Sprintf("Done! (Task: %s)", taskName) +} + +// FormatSlackFailed returns the thread reply text for a failed task. +// When a status message is available, it is included in the reply. +func FormatSlackFailed(taskName, message string) string { + if message != "" { + return fmt.Sprintf("Failed: %s (Task: %s)", message, taskName) + } + return fmt.Sprintf("Failed. (Task: %s)", taskName) +} diff --git a/internal/reporting/slack_test.go b/internal/reporting/slack_test.go new file mode 100644 index 00000000..d4a6fe11 --- /dev/null +++ b/internal/reporting/slack_test.go @@ -0,0 +1,80 @@ +package reporting + +import ( + "context" + "testing" +) + +func TestFormatSlackMessages(t *testing.T) { + t.Run("accepted", func(t *testing.T) { + got := FormatSlackAccepted("spawner-1234567890.123456") + want := "Working on your request... (Task: spawner-1234567890.123456)" + if got != want { + t.Errorf("got %q, want %q", got, want) + } + }) + + t.Run("succeeded with PR", func(t *testing.T) { + results := map[string]string{"pr": "https://github.com/org/repo/pull/42"} + got := FormatSlackSucceeded("spawner-1234567890.123456", results) + want := "Done! PR: https://github.com/org/repo/pull/42 (Task: spawner-1234567890.123456)" + if got != want { + t.Errorf("got %q, want %q", got, want) + } + }) + + t.Run("succeeded without results", func(t *testing.T) { + got := FormatSlackSucceeded("spawner-1234567890.123456", nil) + want := "Done! (Task: spawner-1234567890.123456)" + if got != want { + t.Errorf("got %q, want %q", got, want) + } + }) + + t.Run("succeeded with empty results", func(t *testing.T) { + got := FormatSlackSucceeded("spawner-1234567890.123456", map[string]string{}) + want := "Done! (Task: spawner-1234567890.123456)" + if got != want { + t.Errorf("got %q, want %q", got, want) + } + }) + + t.Run("failed with message", func(t *testing.T) { + got := FormatSlackFailed("spawner-1234567890.123456", "pod OOMKilled") + want := "Failed: pod OOMKilled (Task: spawner-1234567890.123456)" + if got != want { + t.Errorf("got %q, want %q", got, want) + } + }) + + t.Run("failed without message", func(t *testing.T) { + got := FormatSlackFailed("spawner-1234567890.123456", "") + want := "Failed. (Task: spawner-1234567890.123456)" + if got != want { + t.Errorf("got %q, want %q", got, want) + } + }) +} + +func TestSlackReporterConstruction(t *testing.T) { + reporter := &SlackReporter{BotToken: "xoxb-test-token"} + if reporter.BotToken != "xoxb-test-token" { + t.Errorf("BotToken = %q, want %q", reporter.BotToken, "xoxb-test-token") + } +} + +func TestSlackReporter_PostThreadReplyError(t *testing.T) { + reporter := &SlackReporter{BotToken: "xoxb-invalid"} + _, err := reporter.PostThreadReply(context.Background(), "C123", "1234.5678", "test") + if err == nil { + t.Error("expected error with invalid token, got nil") + } +} + +func TestSlackReporter_UpdateMessageError(t *testing.T) { + reporter := &SlackReporter{BotToken: "xoxb-invalid"} + err := reporter.UpdateMessage(context.Background(), "C123", "1234.5678", "test") + if err == nil { + t.Error("expected error with invalid token, got nil") + } +} diff --git a/internal/reporting/watcher.go b/internal/reporting/watcher.go index 2c7b059a..28cf87cf 100644 --- a/internal/reporting/watcher.go +++ b/internal/reporting/watcher.go @@ -32,6 +32,26 @@ const ( // AnnotationGitHubReportPhase records the last Task phase that was // reported to GitHub, preventing duplicate API calls on re-list. AnnotationGitHubReportPhase = "kelos.dev/github-report-phase" + + // AnnotationSlackReporting indicates that Slack reporting is enabled + // for this Task. + AnnotationSlackReporting = "kelos.dev/slack-reporting" + + // AnnotationSlackChannel records the Slack channel ID where the + // originating message was posted. + AnnotationSlackChannel = "kelos.dev/slack-channel" + + // AnnotationSlackThreadTS records the originating message timestamp, + // used as thread_ts for posting replies. + AnnotationSlackThreadTS = "kelos.dev/slack-thread-ts" + + // AnnotationSlackReplyTS stores the message timestamp of the status + // reply so subsequent updates edit the same message. + AnnotationSlackReplyTS = "kelos.dev/slack-reply-ts" + + // AnnotationSlackReportPhase records the last Task phase that was + // reported to Slack, preventing duplicate API calls on re-list. + AnnotationSlackReportPhase = "kelos.dev/slack-report-phase" ) // TaskReporter watches Tasks and reports status changes to GitHub. @@ -156,3 +176,113 @@ func (tr *TaskReporter) persistReportingState(ctx context.Context, task *kelosv1 return nil } + +// SlackMessenger is the interface for posting and updating Slack messages. +type SlackMessenger interface { + PostThreadReply(ctx context.Context, channel, threadTS, text string) (string, error) + UpdateMessage(ctx context.Context, channel, messageTS, text string) error +} + +// SlackTaskReporter watches Tasks and reports status changes to Slack +// as thread replies on the originating message. +type SlackTaskReporter struct { + Client client.Client + Reporter SlackMessenger +} + +// ReportTaskStatus checks a Task's current phase against its last reported +// phase and creates or updates the Slack thread reply as needed. +func (tr *SlackTaskReporter) ReportTaskStatus(ctx context.Context, task *kelosv1alpha1.Task) error { + log := ctrl.Log.WithName("slack-reporter") + + annotations := task.Annotations + if annotations == nil { + return nil + } + + if annotations[AnnotationSlackReporting] != "enabled" { + return nil + } + + channel := annotations[AnnotationSlackChannel] + threadTS := annotations[AnnotationSlackThreadTS] + if channel == "" || threadTS == "" { + return nil + } + + var desiredPhase string + switch task.Status.Phase { + case kelosv1alpha1.TaskPhasePending, kelosv1alpha1.TaskPhaseRunning, kelosv1alpha1.TaskPhaseWaiting: + desiredPhase = "accepted" + case kelosv1alpha1.TaskPhaseSucceeded: + desiredPhase = "succeeded" + case kelosv1alpha1.TaskPhaseFailed: + desiredPhase = "failed" + default: + return nil + } + + if annotations[AnnotationSlackReportPhase] == desiredPhase { + return nil + } + + var body string + switch desiredPhase { + case "accepted": + body = FormatSlackAccepted(task.Name) + case "succeeded": + body = FormatSlackSucceeded(task.Name, task.Status.Results) + case "failed": + body = FormatSlackFailed(task.Name, task.Status.Message) + } + + replyTS := annotations[AnnotationSlackReplyTS] + if replyTS == "" { + log.Info("Posting Slack thread reply", "task", task.Name, "channel", channel, "phase", desiredPhase) + newTS, err := tr.Reporter.PostThreadReply(ctx, channel, threadTS, body) + if err != nil { + return fmt.Errorf("posting Slack reply for task %s: %w", task.Name, err) + } + replyTS = newTS + } else { + log.Info("Updating Slack thread reply", "task", task.Name, "channel", channel, "phase", desiredPhase) + if err := tr.Reporter.UpdateMessage(ctx, channel, replyTS, body); err != nil { + return fmt.Errorf("updating Slack reply for task %s: %w", task.Name, err) + } + } + + if err := tr.persistSlackReportingState(ctx, task, replyTS, desiredPhase); err != nil { + return err + } + + return nil +} + +func (tr *SlackTaskReporter) persistSlackReportingState(ctx context.Context, task *kelosv1alpha1.Task, replyTS, desiredPhase string) error { + if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + var current kelosv1alpha1.Task + if err := tr.Client.Get(ctx, client.ObjectKeyFromObject(task), ¤t); err != nil { + return err + } + + if current.Annotations == nil { + current.Annotations = make(map[string]string) + } + current.Annotations[AnnotationSlackReplyTS] = replyTS + current.Annotations[AnnotationSlackReportPhase] = desiredPhase + + if err := tr.Client.Update(ctx, ¤t); err != nil { + return err + } + + task.Annotations = current.Annotations + return nil + }); err != nil { + if apierrors.IsNotFound(err) { + return fmt.Errorf("persisting Slack reporting annotations on task %s: task no longer exists", task.Name) + } + return fmt.Errorf("persisting Slack reporting annotations on task %s: %w", task.Name, err) + } + + return nil +} diff --git a/internal/reporting/watcher_test.go b/internal/reporting/watcher_test.go index 4c6b471f..c832b33b 100644 --- a/internal/reporting/watcher_test.go +++ b/internal/reporting/watcher_test.go @@ -576,3 +576,274 @@ func TestReportTaskStatus_NilAnnotations(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } } + +func TestSlackTaskReporter_PostsThreadReply(t *testing.T) { + task := &kelosv1alpha1.Task{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-task", + Namespace: "default", + Annotations: map[string]string{ + AnnotationSlackReporting: "enabled", + AnnotationSlackChannel: "C123ABC", + AnnotationSlackThreadTS: "1234567890.123456", + }, + }, + Spec: kelosv1alpha1.TaskSpec{ + Type: "claude-code", + Prompt: "test", + Credentials: kelosv1alpha1.Credentials{ + Type: kelosv1alpha1.CredentialTypeOAuth, + SecretRef: &kelosv1alpha1.SecretReference{Name: "creds"}, + }, + }, + Status: kelosv1alpha1.TaskStatus{ + Phase: kelosv1alpha1.TaskPhasePending, + }, + } + + cl := fake.NewClientBuilder().WithScheme(newTestScheme()).WithObjects(task).Build() + + var posted []slackReplyRecord + reporter := &fakeSlackReporter{ + postFn: func(ctx context.Context, channel, threadTS, text string) (string, error) { + posted = append(posted, slackReplyRecord{method: "post", channel: channel, threadTS: threadTS, text: text}) + return "1234567890.999999", nil + }, + } + + tr := &SlackTaskReporter{Client: cl, Reporter: reporter} + + if err := tr.ReportTaskStatus(context.Background(), task); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(posted) != 1 { + t.Fatalf("expected 1 post, got %d", len(posted)) + } + if posted[0].channel != "C123ABC" { + t.Errorf("channel = %q, want C123ABC", posted[0].channel) + } + if posted[0].threadTS != "1234567890.123456" { + t.Errorf("threadTS = %q, want 1234567890.123456", posted[0].threadTS) + } + + // Verify annotations were persisted + var updated kelosv1alpha1.Task + if err := cl.Get(context.Background(), client.ObjectKeyFromObject(task), &updated); err != nil { + t.Fatalf("getting updated task: %v", err) + } + if updated.Annotations[AnnotationSlackReportPhase] != "accepted" { + t.Errorf("report phase = %q, want accepted", updated.Annotations[AnnotationSlackReportPhase]) + } + if updated.Annotations[AnnotationSlackReplyTS] != "1234567890.999999" { + t.Errorf("reply ts = %q, want 1234567890.999999", updated.Annotations[AnnotationSlackReplyTS]) + } +} + +func TestSlackTaskReporter_UpdatesExistingReply(t *testing.T) { + task := &kelosv1alpha1.Task{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-task", + Namespace: "default", + Annotations: map[string]string{ + AnnotationSlackReporting: "enabled", + AnnotationSlackChannel: "C123ABC", + AnnotationSlackThreadTS: "1234567890.123456", + AnnotationSlackReplyTS: "1234567890.999999", + AnnotationSlackReportPhase: "accepted", + }, + }, + Spec: kelosv1alpha1.TaskSpec{ + Type: "claude-code", + Prompt: "test", + Credentials: kelosv1alpha1.Credentials{ + Type: kelosv1alpha1.CredentialTypeOAuth, + SecretRef: &kelosv1alpha1.SecretReference{Name: "creds"}, + }, + }, + Status: kelosv1alpha1.TaskStatus{ + Phase: kelosv1alpha1.TaskPhaseSucceeded, + Results: map[string]string{"pr": "https://github.com/org/repo/pull/42"}, + }, + } + + cl := fake.NewClientBuilder().WithScheme(newTestScheme()).WithObjects(task).Build() + + var updated []slackReplyRecord + reporter := &fakeSlackReporter{ + updateFn: func(ctx context.Context, channel, messageTS, text string) error { + updated = append(updated, slackReplyRecord{method: "update", channel: channel, threadTS: messageTS, text: text}) + return nil + }, + } + + tr := &SlackTaskReporter{Client: cl, Reporter: reporter} + + if err := tr.ReportTaskStatus(context.Background(), task); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(updated) != 1 { + t.Fatalf("expected 1 update, got %d", len(updated)) + } + if updated[0].channel != "C123ABC" { + t.Errorf("channel = %q, want C123ABC", updated[0].channel) + } + // Verify the message includes the PR URL + wantText := FormatSlackSucceeded(task.Name, task.Status.Results) + if updated[0].text != wantText { + t.Errorf("text = %q, want %q", updated[0].text, wantText) + } +} + +func TestSlackTaskReporter_SkipPaths(t *testing.T) { + baseTask := &kelosv1alpha1.Task{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-task", + Namespace: "default", + Annotations: map[string]string{ + AnnotationSlackReporting: "enabled", + AnnotationSlackChannel: "C123ABC", + AnnotationSlackThreadTS: "1234567890.123456", + }, + }, + Status: kelosv1alpha1.TaskStatus{ + Phase: kelosv1alpha1.TaskPhasePending, + }, + } + + tests := []struct { + name string + mutate func(t *kelosv1alpha1.Task) + }{ + { + name: "no reporting annotation", + mutate: func(t *kelosv1alpha1.Task) { + delete(t.Annotations, AnnotationSlackReporting) + }, + }, + { + name: "already reported same phase", + mutate: func(t *kelosv1alpha1.Task) { + t.Annotations[AnnotationSlackReportPhase] = "accepted" + }, + }, + { + name: "nil annotations", + mutate: func(t *kelosv1alpha1.Task) { + t.Annotations = nil + }, + }, + { + name: "missing channel", + mutate: func(t *kelosv1alpha1.Task) { + delete(t.Annotations, AnnotationSlackChannel) + }, + }, + { + name: "empty phase", + mutate: func(t *kelosv1alpha1.Task) { + t.Status.Phase = "" + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + task := baseTask.DeepCopy() + tt.mutate(task) + + called := false + reporter := &fakeSlackReporter{ + postFn: func(ctx context.Context, channel, threadTS, text string) (string, error) { + called = true + return "", nil + }, + } + + cl := fake.NewClientBuilder().WithScheme(newTestScheme()).WithObjects(task).Build() + tr := &SlackTaskReporter{Client: cl, Reporter: reporter} + + if err := tr.ReportTaskStatus(context.Background(), task); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if called { + t.Error("expected reporter to not be called") + } + }) + } +} + +type slackReplyRecord struct { + method string + channel string + threadTS string + text string +} + +type fakeSlackReporter struct { + postFn func(ctx context.Context, channel, threadTS, text string) (string, error) + updateFn func(ctx context.Context, channel, messageTS, text string) error +} + +func (f *fakeSlackReporter) PostThreadReply(ctx context.Context, channel, threadTS, text string) (string, error) { + if f.postFn != nil { + return f.postFn(ctx, channel, threadTS, text) + } + return "fake-reply-ts", nil +} + +func (f *fakeSlackReporter) UpdateMessage(ctx context.Context, channel, messageTS, text string) error { + if f.updateFn != nil { + return f.updateFn(ctx, channel, messageTS, text) + } + return nil +} + +func TestSlackTaskReporter_PhaseMapping(t *testing.T) { + tests := []struct { + name string + phase kelosv1alpha1.TaskPhase + wantDesired string + shouldProcess bool + }{ + {"pending", kelosv1alpha1.TaskPhasePending, "accepted", true}, + {"running", kelosv1alpha1.TaskPhaseRunning, "accepted", true}, + {"waiting", kelosv1alpha1.TaskPhaseWaiting, "accepted", true}, + {"succeeded", kelosv1alpha1.TaskPhaseSucceeded, "succeeded", true}, + {"failed", kelosv1alpha1.TaskPhaseFailed, "failed", true}, + {"empty", "", "", false}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + task := &kelosv1alpha1.Task{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-task", + Namespace: "default", + Annotations: map[string]string{ + AnnotationSlackReporting: "enabled", + AnnotationSlackChannel: "C123", + AnnotationSlackThreadTS: "1234.5678", + }, + }, + Status: kelosv1alpha1.TaskStatus{ + Phase: tt.phase, + }, + } + + if tt.shouldProcess { + // Mark as already reported to verify skip logic + task.Annotations[AnnotationSlackReportPhase] = tt.wantDesired + } + + cl := fake.NewClientBuilder().WithScheme(newTestScheme()).WithObjects(task).Build() + tr := &SlackTaskReporter{Client: cl, Reporter: &SlackReporter{BotToken: "xoxb-test"}} + + // Should not error — either skips (empty phase) or skips (already reported) + if err := tr.ReportTaskStatus(context.Background(), task); err != nil { + t.Fatalf("unexpected error: %v", err) + } + }) + } +} diff --git a/internal/source/slack.go b/internal/source/slack.go new file mode 100644 index 00000000..8f9877b9 --- /dev/null +++ b/internal/source/slack.go @@ -0,0 +1,284 @@ +package source + +import ( + "context" + "fmt" + "strings" + "sync" + "time" + + "github.com/slack-go/slack" + "github.com/slack-go/slack/slackevents" + "github.com/slack-go/slack/socketmode" + ctrl "sigs.k8s.io/controller-runtime" +) + +// SlackSource discovers work items from Slack messages via Socket Mode. +// A background goroutine listens for Slack events and accumulates WorkItems +// in an internal queue. Discover() drains the queue on each call. +type SlackSource struct { + // BotToken is the Bot User OAuth Token (xoxb-...). + BotToken string + // AppToken is the App-Level Token for Socket Mode (xapp-...). + AppToken string + // TriggerCommand is an optional slash command or message prefix. + // When empty, every non-threaded message triggers a task. + TriggerCommand string + // Channels restricts listening to specific channel IDs. Empty = all. + Channels []string + // AllowedUsers restricts which user IDs can trigger tasks. Empty = all. + AllowedUsers []string + + mu sync.Mutex + pending []WorkItem + counter int + startOnce sync.Once + startErr error + selfUserID string + api *slack.Client + cancel context.CancelFunc +} + +// Discover returns accumulated WorkItems since the last call. +// On the first call it starts the Socket Mode listener. +func (s *SlackSource) Discover(ctx context.Context) ([]WorkItem, error) { + s.startOnce.Do(func() { + s.startErr = s.Start(ctx) + }) + if s.startErr != nil { + return nil, fmt.Errorf("Starting Slack source: %w", s.startErr) + } + + s.mu.Lock() + items := s.pending + s.pending = nil + s.mu.Unlock() + + return items, nil +} + +// Start connects to Slack via Socket Mode and begins listening for events. +func (s *SlackSource) Start(ctx context.Context) error { + log := ctrl.Log.WithName("slack-source") + + s.api = slack.New( + s.BotToken, + slack.OptionAppLevelToken(s.AppToken), + ) + + authResp, err := s.api.AuthTestContext(ctx) + if err != nil { + return fmt.Errorf("Slack auth test failed: %w", err) + } + s.selfUserID = authResp.UserID + log.Info("Authenticated with Slack", "botUserID", s.selfUserID) + + sm := socketmode.New(s.api) + + bgCtx, cancel := context.WithCancel(context.Background()) + s.mu.Lock() + s.cancel = cancel + s.mu.Unlock() + + go func() { + if err := sm.RunContext(bgCtx); err != nil { + log.Error(err, "Socket Mode connection closed") + } + }() + + go func() { + for evt := range sm.Events { + switch evt.Type { + case socketmode.EventTypeEventsAPI: + s.handleEventsAPI(sm, evt) + case socketmode.EventTypeSlashCommand: + s.handleSlashCommand(sm, evt) + } + } + }() + + return nil +} + +// Stop shuts down the Socket Mode listener. +func (s *SlackSource) Stop() { + s.mu.Lock() + cancel := s.cancel + s.mu.Unlock() + if cancel != nil { + cancel() + } +} + +func (s *SlackSource) handleEventsAPI(sm *socketmode.Client, evt socketmode.Event) { + log := ctrl.Log.WithName("slack-source") + + eventsAPIEvent, ok := evt.Data.(slackevents.EventsAPIEvent) + if !ok { + sm.Ack(*evt.Request) + return + } + sm.Ack(*evt.Request) + + innerEvent, ok := eventsAPIEvent.InnerEvent.Data.(*slackevents.MessageEvent) + if !ok { + return + } + + body, ok := shouldProcess(innerEvent.User, innerEvent.SubType, innerEvent.ThreadTimeStamp, innerEvent.Text, s.selfUserID, s.TriggerCommand) + if !ok { + return + } + + if !matchesChannel(innerEvent.Channel, s.Channels) { + return + } + if !matchesUser(innerEvent.User, s.AllowedUsers) { + return + } + + userName := innerEvent.User + enrichCtx, enrichCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer enrichCancel() + + if info, err := s.api.GetUserInfoContext(enrichCtx, innerEvent.User); err == nil { + userName = info.RealName + if userName == "" { + userName = info.Name + } + } + + permalink := "" + if link, err := s.api.GetPermalinkContext(enrichCtx, &slack.PermalinkParameters{ + Channel: innerEvent.Channel, + Ts: innerEvent.TimeStamp, + }); err == nil { + permalink = link + } + + channelName := innerEvent.Channel + if info, err := s.api.GetConversationInfoContext(enrichCtx, &slack.GetConversationInfoInput{ + ChannelID: innerEvent.Channel, + }); err == nil { + channelName = info.Name + } + + s.mu.Lock() + s.counter++ + item := buildWorkItem(innerEvent.TimeStamp, s.counter, userName, body, permalink, channelName, innerEvent.Channel) + s.pending = append(s.pending, item) + s.mu.Unlock() + + log.Info("Queued Slack message as work item", "number", item.Number, "user", userName, "channel", channelName) +} + +func (s *SlackSource) handleSlashCommand(sm *socketmode.Client, evt socketmode.Event) { + log := ctrl.Log.WithName("slack-source") + + cmd, ok := evt.Data.(slack.SlashCommand) + if !ok { + sm.Ack(*evt.Request) + return + } + sm.Ack(*evt.Request) + + if cmd.UserID == s.selfUserID { + return + } + if !matchesChannel(cmd.ChannelID, s.Channels) { + return + } + if !matchesUser(cmd.UserID, s.AllowedUsers) { + return + } + + body := strings.TrimSpace(cmd.Text) + if body == "" { + return + } + + userName := cmd.UserName + channelName := cmd.ChannelName + + s.mu.Lock() + s.counter++ + itemID := fmt.Sprintf("%s:%s:%s", cmd.ChannelID, cmd.Command, cmd.TriggerID) + item := buildWorkItem(itemID, s.counter, userName, body, "", channelName, cmd.ChannelID) + s.pending = append(s.pending, item) + s.mu.Unlock() + + log.Info("Queued slash command as work item", "number", item.Number, "user", userName, "channel", channelName) +} + +// shouldProcess decides whether a Slack message should become a WorkItem. +// It returns the processed body text and true if the message should trigger, +// or an empty string and false if it should be ignored. +func shouldProcess(userID, subtype, threadTS, text, selfUserID, triggerCmd string) (string, bool) { + if userID == selfUserID { + return "", false + } + switch subtype { + case "bot_message", "message_changed", "message_deleted", "message_replied": + return "", false + } + if threadTS != "" { + return "", false + } + if text == "" { + return "", false + } + + if triggerCmd != "" { + if !strings.HasPrefix(text, triggerCmd) { + return "", false + } + body := strings.TrimSpace(strings.TrimPrefix(text, triggerCmd)) + if body == "" { + return "", false + } + return body, true + } + + return text, true +} + +// matchesChannel returns true if channelID is in the allowed list, +// or if the allowed list is empty (all channels permitted). +func matchesChannel(channelID string, allowed []string) bool { + if len(allowed) == 0 { + return true + } + for _, id := range allowed { + if id == channelID { + return true + } + } + return false +} + +// matchesUser returns true if userID is in the allowed list, +// or if the allowed list is empty (all users permitted). +func matchesUser(userID string, allowed []string) bool { + if len(allowed) == 0 { + return true + } + for _, id := range allowed { + if id == userID { + return true + } + } + return false +} + +// buildWorkItem constructs a WorkItem from Slack message fields. +func buildWorkItem(id string, number int, userName, body, permalink, channelName, channelID string) WorkItem { + return WorkItem{ + ID: id, + Number: number, + Title: userName, + Body: body, + URL: permalink, + Labels: []string{channelName, channelID}, + Kind: "SlackMessage", + } +} diff --git a/internal/source/slack_test.go b/internal/source/slack_test.go new file mode 100644 index 00000000..b985ffcd --- /dev/null +++ b/internal/source/slack_test.go @@ -0,0 +1,330 @@ +package source + +import ( + "context" + "testing" +) + +func TestShouldProcess(t *testing.T) { + tests := []struct { + name string + userID string + subtype string + threadTS string + text string + selfUserID string + triggerCmd string + wantBody string + wantOK bool + }{ + { + name: "top-level message, no trigger command", + userID: "U001", + text: "fix the login page", + selfUserID: "UBOT", + wantBody: "fix the login page", + wantOK: true, + }, + { + name: "top-level message with trigger prefix", + userID: "U001", + text: "/kelos fix the login page", + selfUserID: "UBOT", + triggerCmd: "/kelos", + wantBody: "fix the login page", + wantOK: true, + }, + { + name: "top-level message with trigger prefix and extra spaces", + userID: "U001", + text: "/kelos fix the login page", + selfUserID: "UBOT", + triggerCmd: "/kelos", + wantBody: "fix the login page", + wantOK: true, + }, + { + name: "message prefix trigger", + userID: "U001", + text: "!fix broken button", + selfUserID: "UBOT", + triggerCmd: "!fix", + wantBody: "broken button", + wantOK: true, + }, + { + name: "trigger prefix only, no body after stripping", + userID: "U001", + text: "/kelos", + selfUserID: "UBOT", + triggerCmd: "/kelos", + wantBody: "", + wantOK: false, + }, + { + name: "does not match trigger prefix", + userID: "U001", + text: "unrelated message", + selfUserID: "UBOT", + triggerCmd: "/kelos", + wantBody: "", + wantOK: false, + }, + { + name: "threaded message ignored", + userID: "U001", + text: "this is a reply", + selfUserID: "UBOT", + threadTS: "1234567890.123456", + wantBody: "", + wantOK: false, + }, + { + name: "message from self ignored", + userID: "UBOT", + text: "my own message", + selfUserID: "UBOT", + wantBody: "", + wantOK: false, + }, + { + name: "bot_message subtype ignored", + userID: "U002", + subtype: "bot_message", + text: "bot says hello", + selfUserID: "UBOT", + wantBody: "", + wantOK: false, + }, + { + name: "message_changed subtype ignored", + userID: "U001", + subtype: "message_changed", + text: "edited message", + selfUserID: "UBOT", + wantBody: "", + wantOK: false, + }, + { + name: "message_deleted subtype ignored", + userID: "U001", + subtype: "message_deleted", + text: "deleted message", + selfUserID: "UBOT", + wantBody: "", + wantOK: false, + }, + { + name: "message_replied subtype ignored", + userID: "U001", + subtype: "message_replied", + text: "reply notification", + selfUserID: "UBOT", + wantBody: "", + wantOK: false, + }, + { + name: "empty text ignored", + userID: "U001", + text: "", + selfUserID: "UBOT", + wantBody: "", + wantOK: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + body, ok := shouldProcess(tt.userID, tt.subtype, tt.threadTS, tt.text, tt.selfUserID, tt.triggerCmd) + if ok != tt.wantOK { + t.Errorf("shouldProcess() ok = %v, want %v", ok, tt.wantOK) + } + if body != tt.wantBody { + t.Errorf("shouldProcess() body = %q, want %q", body, tt.wantBody) + } + }) + } +} + +func TestMatchesChannel(t *testing.T) { + tests := []struct { + name string + channelID string + allowed []string + want bool + }{ + { + name: "empty allow list permits all", + channelID: "C123", + allowed: nil, + want: true, + }, + { + name: "matching channel", + channelID: "C123", + allowed: []string{"C123", "C456"}, + want: true, + }, + { + name: "non-matching channel", + channelID: "C789", + allowed: []string{"C123", "C456"}, + want: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := matchesChannel(tt.channelID, tt.allowed) + if got != tt.want { + t.Errorf("matchesChannel(%q, %v) = %v, want %v", tt.channelID, tt.allowed, got, tt.want) + } + }) + } +} + +func TestMatchesUser(t *testing.T) { + tests := []struct { + name string + userID string + allowed []string + want bool + }{ + { + name: "empty allow list permits all", + userID: "U001", + allowed: nil, + want: true, + }, + { + name: "matching user", + userID: "U001", + allowed: []string{"U001", "U002"}, + want: true, + }, + { + name: "non-matching user", + userID: "U003", + allowed: []string{"U001", "U002"}, + want: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := matchesUser(tt.userID, tt.allowed) + if got != tt.want { + t.Errorf("matchesUser(%q, %v) = %v, want %v", tt.userID, tt.allowed, got, tt.want) + } + }) + } +} + +func TestBuildWorkItem(t *testing.T) { + item := buildWorkItem("1234567890.123456", 42, "Jane Doe", "fix the bug", "https://slack.com/link", "test-channel", "C123ABC") + + if item.ID != "1234567890.123456" { + t.Errorf("expected ID %q, got %q", "1234567890.123456", item.ID) + } + if item.Number != 42 { + t.Errorf("expected Number 42, got %d", item.Number) + } + if item.Title != "Jane Doe" { + t.Errorf("expected Title %q, got %q", "Jane Doe", item.Title) + } + if item.Body != "fix the bug" { + t.Errorf("expected Body %q, got %q", "fix the bug", item.Body) + } + if item.URL != "https://slack.com/link" { + t.Errorf("expected URL %q, got %q", "https://slack.com/link", item.URL) + } + if len(item.Labels) != 2 || item.Labels[0] != "test-channel" || item.Labels[1] != "C123ABC" { + t.Errorf("expected Labels [test-channel C123ABC], got %v", item.Labels) + } + if item.Kind != "SlackMessage" { + t.Errorf("expected Kind %q, got %q", "SlackMessage", item.Kind) + } +} + +// newStartedSlackSource returns a SlackSource where the startOnce has +// already fired (so Discover won't call Start). +func newStartedSlackSource() *SlackSource { + s := &SlackSource{} + s.startOnce.Do(func() {}) // Mark as started without actually connecting + return s +} + +func TestDiscoverDrainsPending(t *testing.T) { + s := newStartedSlackSource() + + // Pre-populate pending items + s.pending = []WorkItem{ + {ID: "1", Title: "User A", Body: "task one"}, + {ID: "2", Title: "User B", Body: "task two"}, + } + + items, err := s.Discover(context.Background()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(items) != 2 { + t.Fatalf("expected 2 items, got %d", len(items)) + } + if items[0].ID != "1" || items[1].ID != "2" { + t.Errorf("unexpected items: %+v", items) + } + + // Pending should be empty now + if len(s.pending) != 0 { + t.Errorf("expected pending to be empty, got %d items", len(s.pending)) + } +} + +func TestDiscoverEmpty(t *testing.T) { + s := newStartedSlackSource() + + items, err := s.Discover(context.Background()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(items) != 0 { + t.Fatalf("expected 0 items, got %d", len(items)) + } +} + +func TestDiscoverMultipleCalls(t *testing.T) { + s := newStartedSlackSource() + + // First batch + s.pending = []WorkItem{{ID: "1", Body: "first"}} + + items, err := s.Discover(context.Background()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(items) != 1 || items[0].ID != "1" { + t.Errorf("first drain: expected [{ID:1}], got %+v", items) + } + + // Second batch + s.pending = []WorkItem{{ID: "2", Body: "second"}, {ID: "3", Body: "third"}} + + items, err = s.Discover(context.Background()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(items) != 2 || items[0].ID != "2" || items[1].ID != "3" { + t.Errorf("second drain: expected [{ID:2},{ID:3}], got %+v", items) + } + + // Empty after drain + items, err = s.Discover(context.Background()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(items) != 0 { + t.Errorf("expected empty after drain, got %d items", len(items)) + } +}