From d42d45d220ed5b465ec91122ad8f4a2f0aed95b1 Mon Sep 17 00:00:00 2001 From: Jasmine Ahuja Date: Mon, 30 Mar 2026 14:41:34 -0400 Subject: [PATCH 1/4] feat: implement SlackSource with Socket Mode listener Add SlackSource that connects to Slack via Socket Mode and accumulates WorkItems from channel messages. Discover() drains the queue each cycle. Event processing logic (shouldProcess, matchesChannel, matchesUser, buildWorkItem) is extracted as pure functions for testability. The Socket Mode goroutine is a thin glue layer. Filtering: ignores threaded replies, bot messages, and self-messages. Optional trigger command prefix support with stripping. Co-Authored-By: Claude Opus 4.6 --- go.mod | 2 + go.sum | 6 + internal/source/slack.go | 271 ++++++++++++++++++++++++++++++ internal/source/slack_test.go | 301 ++++++++++++++++++++++++++++++++++ 4 files changed, 580 insertions(+) create mode 100644 internal/source/slack.go create mode 100644 internal/source/slack_test.go diff --git a/go.mod b/go.mod index 22d7c3b8..04a87b54 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/prometheus/client_golang v1.23.2 github.com/prometheus/client_model v0.6.2 github.com/robfig/cron/v3 v3.0.1 + github.com/slack-go/slack v0.20.0 github.com/spf13/cobra v1.10.2 github.com/stretchr/testify v1.11.1 go.uber.org/zap v1.27.0 @@ -59,6 +60,7 @@ require ( github.com/google/go-querystring v1.1.0 // indirect github.com/google/pprof v0.0.0-20250403155104-27863c87afa6 // indirect github.com/google/renameio/v2 v2.0.0 // indirect + github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 // indirect github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect github.com/huandu/xstrings v1.5.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect diff --git a/go.sum b/go.sum index 7bd5416c..e2225783 100644 --- a/go.sum +++ b/go.sum @@ -74,6 +74,8 @@ github.com/go-quicktest/qt v1.101.0 h1:O1K29Txy5P2OK0dGo59b7b0LR6wKfIhttaAhHUyn7 github.com/go-quicktest/qt v1.101.0/go.mod h1:14Bz/f7NwaXPtdYEgzsx46kqSxVwTbzVZsDC26tQJow= github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= +github.com/go-test/deep v1.1.1 h1:0r/53hagsehfO4bzD2Pgr/+RgHqhmf+k1Bpse2cTu1U= +github.com/go-test/deep v1.1.1/go.mod h1:5C2ZWiW0ErCdrYzpqxLbTX7MG14M9iiw8DgHncVwcsE= github.com/gobuffalo/flect v1.0.3 h1:xeWBM2nui+qnVvNM4S3foBhCAL2XgPU+a7FdpelbTq4= github.com/gobuffalo/flect v1.0.3/go.mod h1:A5msMlrHtLqh9umBSnvabjsMrCcCpAyzglnDvkbYKHs= github.com/gobwas/glob v0.2.3 h1:A4xDbljILXROh+kObIiy5kIaPYD8e96x1tgBhUI5J+Y= @@ -106,6 +108,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/yamlfmt v0.21.0 h1:9FKApQkDpMKgBjwLFytBHUCgqnQgxaQnci0uiESfbzs= github.com/google/yamlfmt v0.21.0/go.mod h1:q6FYExB+Ueu7jZDjKECJk+EaeDXJzJ6Ne0dxx69GWfI= +github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 h1:JeSE6pjso5THxAzdVpqr6/geYxZytqFMBCOtn/ujyeo= +github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674/go.mod h1:r4w70xmWCQKmi1ONH4KIaBptdivuRPyosB9RmPlGEwA= github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 h1:5ZPtiqj0JL5oKWmcsq4VMaAW5ukBEgSGXEN89zeH1Jo= github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3/go.mod h1:ndYquD05frm2vACXE1nsccT4oJzjhw2arTS2cpUD1PI= github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= @@ -190,6 +194,8 @@ github.com/santhosh-tekuri/jsonschema/v6 v6.0.2 h1:KRzFb2m7YtdldCEkzs6KqmJw4nqEV github.com/santhosh-tekuri/jsonschema/v6 v6.0.2/go.mod h1:JXeL+ps8p7/KNMjDQk3TCwPpBy0wYklyWTfbkIzdIFU= github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k= github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME= +github.com/slack-go/slack v0.20.0 h1:gbDdbee8+Z2o+DWx05Spq3GzbrLLleiRwHUKs+hZLSU= +github.com/slack-go/slack v0.20.0/go.mod h1:K81UmCivcYd/5Jmz8vLBfuyoZ3B4rQC2GHVXHteXiAE= github.com/spf13/afero v1.12.0 h1:UcOPyRBYczmFn6yvphxkn9ZEOY65cpwGKb5mL36mrqs= github.com/spf13/afero v1.12.0/go.mod h1:ZTlWwG4/ahT8W7T0WQ5uYmjI9duaLQGy3Q2OAl4sk/4= github.com/spf13/cast v1.7.0 h1:ntdiHjuueXFgm5nzDRdOS4yfT43P5Fnud6DH50rz/7w= diff --git a/internal/source/slack.go b/internal/source/slack.go new file mode 100644 index 00000000..15036cff --- /dev/null +++ b/internal/source/slack.go @@ -0,0 +1,271 @@ +package source + +import ( + "context" + "fmt" + "strings" + "sync" + + "github.com/slack-go/slack" + "github.com/slack-go/slack/slackevents" + "github.com/slack-go/slack/socketmode" + ctrl "sigs.k8s.io/controller-runtime" +) + +// SlackSource discovers work items from Slack messages via Socket Mode. +// A background goroutine listens for Slack events and accumulates WorkItems +// in an internal queue. Discover() drains the queue on each call. +type SlackSource struct { + // BotToken is the Bot User OAuth Token (xoxb-...). + BotToken string + // AppToken is the App-Level Token for Socket Mode (xapp-...). + AppToken string + // TriggerCommand is an optional slash command or message prefix. + // When empty, every non-threaded message triggers a task. + TriggerCommand string + // Channels restricts listening to specific channel IDs. Empty = all. + Channels []string + // AllowedUsers restricts which user IDs can trigger tasks. Empty = all. + AllowedUsers []string + + mu sync.Mutex + pending []WorkItem + counter int + started bool + selfUserID string + api *slack.Client + cancel context.CancelFunc +} + +// Discover returns accumulated WorkItems since the last call. +// On the first call it starts the Socket Mode listener. +func (s *SlackSource) Discover(ctx context.Context) ([]WorkItem, error) { + if !s.started { + if err := s.Start(ctx); err != nil { + return nil, fmt.Errorf("Starting Slack source: %w", err) + } + s.started = true + } + + s.mu.Lock() + items := s.pending + s.pending = nil + s.mu.Unlock() + + return items, nil +} + +// Start connects to Slack via Socket Mode and begins listening for events. +func (s *SlackSource) Start(ctx context.Context) error { + log := ctrl.Log.WithName("slack-source") + + s.api = slack.New( + s.BotToken, + slack.OptionAppLevelToken(s.AppToken), + ) + + authResp, err := s.api.AuthTestContext(ctx) + if err != nil { + return fmt.Errorf("Slack auth test failed: %w", err) + } + s.selfUserID = authResp.UserID + log.Info("Authenticated with Slack", "botUserID", s.selfUserID) + + sm := socketmode.New(s.api) + + ctx, s.cancel = context.WithCancel(ctx) + + go func() { + if err := sm.RunContext(ctx); err != nil { + log.Error(err, "Socket Mode connection closed") + } + }() + + go func() { + for evt := range sm.Events { + switch evt.Type { + case socketmode.EventTypeEventsAPI: + s.handleEventsAPI(sm, evt) + case socketmode.EventTypeSlashCommand: + s.handleSlashCommand(sm, evt) + } + } + }() + + return nil +} + +// Stop shuts down the Socket Mode listener. +func (s *SlackSource) Stop() { + if s.cancel != nil { + s.cancel() + } +} + +func (s *SlackSource) handleEventsAPI(sm *socketmode.Client, evt socketmode.Event) { + log := ctrl.Log.WithName("slack-source") + + eventsAPIEvent, ok := evt.Data.(slackevents.EventsAPIEvent) + if !ok { + sm.Ack(*evt.Request) + return + } + sm.Ack(*evt.Request) + + innerEvent, ok := eventsAPIEvent.InnerEvent.Data.(*slackevents.MessageEvent) + if !ok { + return + } + + body, ok := shouldProcess(innerEvent.User, innerEvent.SubType, innerEvent.ThreadTimeStamp, innerEvent.Text, s.selfUserID, s.TriggerCommand) + if !ok { + return + } + + if !matchesChannel(innerEvent.Channel, s.Channels) { + return + } + if !matchesUser(innerEvent.User, s.AllowedUsers) { + return + } + + userName := innerEvent.User + if info, err := s.api.GetUserInfo(innerEvent.User); err == nil { + userName = info.RealName + if userName == "" { + userName = info.Name + } + } + + permalink := "" + if link, err := s.api.GetPermalink(&slack.PermalinkParameters{ + Channel: innerEvent.Channel, + Ts: innerEvent.TimeStamp, + }); err == nil { + permalink = link + } + + channelName := innerEvent.Channel + if info, err := s.api.GetConversationInfo(&slack.GetConversationInfoInput{ + ChannelID: innerEvent.Channel, + }); err == nil { + channelName = info.Name + } + + s.mu.Lock() + s.counter++ + item := buildWorkItem(innerEvent.TimeStamp, s.counter, userName, body, permalink, channelName) + s.pending = append(s.pending, item) + s.mu.Unlock() + + log.Info("Queued Slack message as work item", "number", item.Number, "user", userName, "channel", channelName) +} + +func (s *SlackSource) handleSlashCommand(sm *socketmode.Client, evt socketmode.Event) { + log := ctrl.Log.WithName("slack-source") + + cmd, ok := evt.Data.(slack.SlashCommand) + if !ok { + sm.Ack(*evt.Request) + return + } + sm.Ack(*evt.Request) + + if cmd.UserID == s.selfUserID { + return + } + if !matchesChannel(cmd.ChannelID, s.Channels) { + return + } + if !matchesUser(cmd.UserID, s.AllowedUsers) { + return + } + + body := strings.TrimSpace(cmd.Text) + if body == "" { + return + } + + userName := cmd.UserName + channelName := cmd.ChannelName + + s.mu.Lock() + s.counter++ + item := buildWorkItem(cmd.TriggerID, s.counter, userName, body, "", channelName) + s.pending = append(s.pending, item) + s.mu.Unlock() + + log.Info("Queued slash command as work item", "number", item.Number, "user", userName, "channel", channelName) +} + +// shouldProcess decides whether a Slack message should become a WorkItem. +// It returns the processed body text and true if the message should trigger, +// or an empty string and false if it should be ignored. +func shouldProcess(userID, subtype, threadTS, text, selfUserID, triggerCmd string) (string, bool) { + if userID == selfUserID { + return "", false + } + if subtype == "bot_message" { + return "", false + } + if threadTS != "" { + return "", false + } + if text == "" { + return "", false + } + + if triggerCmd != "" { + if !strings.HasPrefix(text, triggerCmd) { + return "", false + } + body := strings.TrimSpace(strings.TrimPrefix(text, triggerCmd)) + if body == "" { + return "", false + } + return body, true + } + + return text, true +} + +// matchesChannel returns true if channelID is in the allowed list, +// or if the allowed list is empty (all channels permitted). +func matchesChannel(channelID string, allowed []string) bool { + if len(allowed) == 0 { + return true + } + for _, id := range allowed { + if id == channelID { + return true + } + } + return false +} + +// matchesUser returns true if userID is in the allowed list, +// or if the allowed list is empty (all users permitted). +func matchesUser(userID string, allowed []string) bool { + if len(allowed) == 0 { + return true + } + for _, id := range allowed { + if id == userID { + return true + } + } + return false +} + +// buildWorkItem constructs a WorkItem from Slack message fields. +func buildWorkItem(id string, number int, userName, body, permalink, channelName string) WorkItem { + return WorkItem{ + ID: id, + Number: number, + Title: userName, + Body: body, + URL: permalink, + Labels: []string{channelName}, + Kind: "SlackMessage", + } +} diff --git a/internal/source/slack_test.go b/internal/source/slack_test.go new file mode 100644 index 00000000..39c88755 --- /dev/null +++ b/internal/source/slack_test.go @@ -0,0 +1,301 @@ +package source + +import ( + "context" + "testing" +) + +func TestShouldProcess(t *testing.T) { + tests := []struct { + name string + userID string + subtype string + threadTS string + text string + selfUserID string + triggerCmd string + wantBody string + wantOK bool + }{ + { + name: "top-level message, no trigger command", + userID: "U001", + text: "fix the login page", + selfUserID: "UBOT", + wantBody: "fix the login page", + wantOK: true, + }, + { + name: "top-level message with trigger prefix", + userID: "U001", + text: "/kelos fix the login page", + selfUserID: "UBOT", + triggerCmd: "/kelos", + wantBody: "fix the login page", + wantOK: true, + }, + { + name: "top-level message with trigger prefix and extra spaces", + userID: "U001", + text: "/kelos fix the login page", + selfUserID: "UBOT", + triggerCmd: "/kelos", + wantBody: "fix the login page", + wantOK: true, + }, + { + name: "message prefix trigger", + userID: "U001", + text: "!fix broken button", + selfUserID: "UBOT", + triggerCmd: "!fix", + wantBody: "broken button", + wantOK: true, + }, + { + name: "trigger prefix only, no body after stripping", + userID: "U001", + text: "/kelos", + selfUserID: "UBOT", + triggerCmd: "/kelos", + wantBody: "", + wantOK: false, + }, + { + name: "does not match trigger prefix", + userID: "U001", + text: "unrelated message", + selfUserID: "UBOT", + triggerCmd: "/kelos", + wantBody: "", + wantOK: false, + }, + { + name: "threaded message ignored", + userID: "U001", + text: "this is a reply", + selfUserID: "UBOT", + threadTS: "1234567890.123456", + wantBody: "", + wantOK: false, + }, + { + name: "message from self ignored", + userID: "UBOT", + text: "my own message", + selfUserID: "UBOT", + wantBody: "", + wantOK: false, + }, + { + name: "bot_message subtype ignored", + userID: "U002", + subtype: "bot_message", + text: "bot says hello", + selfUserID: "UBOT", + wantBody: "", + wantOK: false, + }, + { + name: "empty text ignored", + userID: "U001", + text: "", + selfUserID: "UBOT", + wantBody: "", + wantOK: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + body, ok := shouldProcess(tt.userID, tt.subtype, tt.threadTS, tt.text, tt.selfUserID, tt.triggerCmd) + if ok != tt.wantOK { + t.Errorf("shouldProcess() ok = %v, want %v", ok, tt.wantOK) + } + if body != tt.wantBody { + t.Errorf("shouldProcess() body = %q, want %q", body, tt.wantBody) + } + }) + } +} + +func TestMatchesChannel(t *testing.T) { + tests := []struct { + name string + channelID string + allowed []string + want bool + }{ + { + name: "empty allow list permits all", + channelID: "C123", + allowed: nil, + want: true, + }, + { + name: "matching channel", + channelID: "C123", + allowed: []string{"C123", "C456"}, + want: true, + }, + { + name: "non-matching channel", + channelID: "C789", + allowed: []string{"C123", "C456"}, + want: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := matchesChannel(tt.channelID, tt.allowed) + if got != tt.want { + t.Errorf("matchesChannel(%q, %v) = %v, want %v", tt.channelID, tt.allowed, got, tt.want) + } + }) + } +} + +func TestMatchesUser(t *testing.T) { + tests := []struct { + name string + userID string + allowed []string + want bool + }{ + { + name: "empty allow list permits all", + userID: "U001", + allowed: nil, + want: true, + }, + { + name: "matching user", + userID: "U001", + allowed: []string{"U001", "U002"}, + want: true, + }, + { + name: "non-matching user", + userID: "U003", + allowed: []string{"U001", "U002"}, + want: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := matchesUser(tt.userID, tt.allowed) + if got != tt.want { + t.Errorf("matchesUser(%q, %v) = %v, want %v", tt.userID, tt.allowed, got, tt.want) + } + }) + } +} + +func TestBuildWorkItem(t *testing.T) { + item := buildWorkItem("1234567890.123456", 42, "Jane Doe", "fix the bug", "https://slack.com/link", "test-channel") + + if item.ID != "1234567890.123456" { + t.Errorf("expected ID %q, got %q", "1234567890.123456", item.ID) + } + if item.Number != 42 { + t.Errorf("expected Number 42, got %d", item.Number) + } + if item.Title != "Jane Doe" { + t.Errorf("expected Title %q, got %q", "Jane Doe", item.Title) + } + if item.Body != "fix the bug" { + t.Errorf("expected Body %q, got %q", "fix the bug", item.Body) + } + if item.URL != "https://slack.com/link" { + t.Errorf("expected URL %q, got %q", "https://slack.com/link", item.URL) + } + if len(item.Labels) != 1 || item.Labels[0] != "test-channel" { + t.Errorf("expected Labels [test-channel], got %v", item.Labels) + } + if item.Kind != "SlackMessage" { + t.Errorf("expected Kind %q, got %q", "SlackMessage", item.Kind) + } +} + +func TestDiscoverDrainsPending(t *testing.T) { + s := &SlackSource{ + started: true, // Skip Start() call + } + + // Pre-populate pending items + s.pending = []WorkItem{ + {ID: "1", Title: "User A", Body: "task one"}, + {ID: "2", Title: "User B", Body: "task two"}, + } + + items, err := s.Discover(context.Background()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(items) != 2 { + t.Fatalf("expected 2 items, got %d", len(items)) + } + if items[0].ID != "1" || items[1].ID != "2" { + t.Errorf("unexpected items: %+v", items) + } + + // Pending should be empty now + if len(s.pending) != 0 { + t.Errorf("expected pending to be empty, got %d items", len(s.pending)) + } +} + +func TestDiscoverEmpty(t *testing.T) { + s := &SlackSource{ + started: true, // Skip Start() call + } + + items, err := s.Discover(context.Background()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(items) != 0 { + t.Fatalf("expected 0 items, got %d", len(items)) + } +} + +func TestDiscoverMultipleCalls(t *testing.T) { + s := &SlackSource{ + started: true, + } + + // First batch + s.pending = []WorkItem{{ID: "1", Body: "first"}} + + items, err := s.Discover(context.Background()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(items) != 1 || items[0].ID != "1" { + t.Errorf("first drain: expected [{ID:1}], got %+v", items) + } + + // Second batch + s.pending = []WorkItem{{ID: "2", Body: "second"}, {ID: "3", Body: "third"}} + + items, err = s.Discover(context.Background()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(items) != 2 || items[0].ID != "2" || items[1].ID != "3" { + t.Errorf("second drain: expected [{ID:2},{ID:3}], got %+v", items) + } + + // Empty after drain + items, err = s.Discover(context.Background()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(items) != 0 { + t.Errorf("expected empty after drain, got %d items", len(items)) + } +} From 38af305d384658713fd8eaf7a9a9ae6c7eef491b Mon Sep 17 00:00:00 2001 From: Jasmine Ahuja Date: Mon, 30 Mar 2026 15:07:06 -0400 Subject: [PATCH 2/4] fix: address review feedback for SlackSource - Replace started bool with sync.Once to fix potential data race - Use context.Background() for Socket Mode goroutine lifetime - Add 5-second timeout for enrichment API calls - Use composite key for slash command work item ID - Filter message_changed, message_deleted, message_replied subtypes - Add test cases for new filtered subtypes Towards AIE-17 Co-Authored-By: Claude Opus 4.6 --- internal/source/slack.go | 34 +++++++++++++++---------- internal/source/slack_test.go | 47 ++++++++++++++++++++++++++++------- 2 files changed, 59 insertions(+), 22 deletions(-) diff --git a/internal/source/slack.go b/internal/source/slack.go index 15036cff..cfb16647 100644 --- a/internal/source/slack.go +++ b/internal/source/slack.go @@ -5,6 +5,7 @@ import ( "fmt" "strings" "sync" + "time" "github.com/slack-go/slack" "github.com/slack-go/slack/slackevents" @@ -31,7 +32,8 @@ type SlackSource struct { mu sync.Mutex pending []WorkItem counter int - started bool + startOnce sync.Once + startErr error selfUserID string api *slack.Client cancel context.CancelFunc @@ -40,11 +42,11 @@ type SlackSource struct { // Discover returns accumulated WorkItems since the last call. // On the first call it starts the Socket Mode listener. func (s *SlackSource) Discover(ctx context.Context) ([]WorkItem, error) { - if !s.started { - if err := s.Start(ctx); err != nil { - return nil, fmt.Errorf("Starting Slack source: %w", err) - } - s.started = true + s.startOnce.Do(func() { + s.startErr = s.Start(ctx) + }) + if s.startErr != nil { + return nil, fmt.Errorf("Starting Slack source: %w", s.startErr) } s.mu.Lock() @@ -73,10 +75,11 @@ func (s *SlackSource) Start(ctx context.Context) error { sm := socketmode.New(s.api) - ctx, s.cancel = context.WithCancel(ctx) + bgCtx, cancel := context.WithCancel(context.Background()) + s.cancel = cancel go func() { - if err := sm.RunContext(ctx); err != nil { + if err := sm.RunContext(bgCtx); err != nil { log.Error(err, "Socket Mode connection closed") } }() @@ -130,7 +133,10 @@ func (s *SlackSource) handleEventsAPI(sm *socketmode.Client, evt socketmode.Even } userName := innerEvent.User - if info, err := s.api.GetUserInfo(innerEvent.User); err == nil { + enrichCtx, enrichCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer enrichCancel() + + if info, err := s.api.GetUserInfoContext(enrichCtx, innerEvent.User); err == nil { userName = info.RealName if userName == "" { userName = info.Name @@ -138,7 +144,7 @@ func (s *SlackSource) handleEventsAPI(sm *socketmode.Client, evt socketmode.Even } permalink := "" - if link, err := s.api.GetPermalink(&slack.PermalinkParameters{ + if link, err := s.api.GetPermalinkContext(enrichCtx, &slack.PermalinkParameters{ Channel: innerEvent.Channel, Ts: innerEvent.TimeStamp, }); err == nil { @@ -146,7 +152,7 @@ func (s *SlackSource) handleEventsAPI(sm *socketmode.Client, evt socketmode.Even } channelName := innerEvent.Channel - if info, err := s.api.GetConversationInfo(&slack.GetConversationInfoInput{ + if info, err := s.api.GetConversationInfoContext(enrichCtx, &slack.GetConversationInfoInput{ ChannelID: innerEvent.Channel, }); err == nil { channelName = info.Name @@ -191,7 +197,8 @@ func (s *SlackSource) handleSlashCommand(sm *socketmode.Client, evt socketmode.E s.mu.Lock() s.counter++ - item := buildWorkItem(cmd.TriggerID, s.counter, userName, body, "", channelName) + itemID := fmt.Sprintf("%s:%s:%s", cmd.ChannelID, cmd.Command, cmd.TriggerID) + item := buildWorkItem(itemID, s.counter, userName, body, "", channelName) s.pending = append(s.pending, item) s.mu.Unlock() @@ -205,7 +212,8 @@ func shouldProcess(userID, subtype, threadTS, text, selfUserID, triggerCmd strin if userID == selfUserID { return "", false } - if subtype == "bot_message" { + switch subtype { + case "bot_message", "message_changed", "message_deleted", "message_replied": return "", false } if threadTS != "" { diff --git a/internal/source/slack_test.go b/internal/source/slack_test.go index 39c88755..02b025de 100644 --- a/internal/source/slack_test.go +++ b/internal/source/slack_test.go @@ -96,6 +96,33 @@ func TestShouldProcess(t *testing.T) { wantBody: "", wantOK: false, }, + { + name: "message_changed subtype ignored", + userID: "U001", + subtype: "message_changed", + text: "edited message", + selfUserID: "UBOT", + wantBody: "", + wantOK: false, + }, + { + name: "message_deleted subtype ignored", + userID: "U001", + subtype: "message_deleted", + text: "deleted message", + selfUserID: "UBOT", + wantBody: "", + wantOK: false, + }, + { + name: "message_replied subtype ignored", + userID: "U001", + subtype: "message_replied", + text: "reply notification", + selfUserID: "UBOT", + wantBody: "", + wantOK: false, + }, { name: "empty text ignored", userID: "U001", @@ -219,10 +246,16 @@ func TestBuildWorkItem(t *testing.T) { } } +// newStartedSlackSource returns a SlackSource where the startOnce has +// already fired (so Discover won't call Start). +func newStartedSlackSource() *SlackSource { + s := &SlackSource{} + s.startOnce.Do(func() {}) // Mark as started without actually connecting + return s +} + func TestDiscoverDrainsPending(t *testing.T) { - s := &SlackSource{ - started: true, // Skip Start() call - } + s := newStartedSlackSource() // Pre-populate pending items s.pending = []WorkItem{ @@ -249,9 +282,7 @@ func TestDiscoverDrainsPending(t *testing.T) { } func TestDiscoverEmpty(t *testing.T) { - s := &SlackSource{ - started: true, // Skip Start() call - } + s := newStartedSlackSource() items, err := s.Discover(context.Background()) if err != nil { @@ -264,9 +295,7 @@ func TestDiscoverEmpty(t *testing.T) { } func TestDiscoverMultipleCalls(t *testing.T) { - s := &SlackSource{ - started: true, - } + s := newStartedSlackSource() // First batch s.pending = []WorkItem{{ID: "1", Body: "first"}} From 1148eee5a6c5201ad0737688bf77e4ea4b59e4b2 Mon Sep 17 00:00:00 2001 From: Jasmine Ahuja Date: Mon, 30 Mar 2026 15:51:23 -0400 Subject: [PATCH 3/4] fix: guard s.cancel with mutex to prevent data race in Stop() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit s.cancel is written in Start() and read in Stop() — without synchronization this is a race if Stop() is called while Start() is still running inside startOnce.Do. Towards AIE-17 Co-Authored-By: Claude Opus 4.6 --- internal/source/slack.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/internal/source/slack.go b/internal/source/slack.go index cfb16647..6c60637b 100644 --- a/internal/source/slack.go +++ b/internal/source/slack.go @@ -76,7 +76,9 @@ func (s *SlackSource) Start(ctx context.Context) error { sm := socketmode.New(s.api) bgCtx, cancel := context.WithCancel(context.Background()) + s.mu.Lock() s.cancel = cancel + s.mu.Unlock() go func() { if err := sm.RunContext(bgCtx); err != nil { @@ -100,8 +102,11 @@ func (s *SlackSource) Start(ctx context.Context) error { // Stop shuts down the Socket Mode listener. func (s *SlackSource) Stop() { - if s.cancel != nil { - s.cancel() + s.mu.Lock() + cancel := s.cancel + s.mu.Unlock() + if cancel != nil { + cancel() } } From 25aa82bc6433c390cda9252dd02de30631769385 Mon Sep 17 00:00:00 2001 From: jkahuja <49356678+jkahuja@users.noreply.github.com> Date: Tue, 31 Mar 2026 11:16:04 -0400 Subject: [PATCH 4/4] feat: wire Slack source into deployment builder and spawner (#37) * feat: wire Slack source into deployment builder and spawner - Add Slack block to buildPodParts() mounting SLACK_BOT_TOKEN and SLACK_APP_TOKEN from SecretRef, passing optional CLI args for trigger command, channels, and allowed users - Add CLI flags and spawnerRuntimeConfig fields for Slack config - Add Slack case to buildSource() and resolvedPollInterval() - Add parseCSV helper for comma-separated CLI flag values - Add TestDeploymentBuilder_Slack and TestDeploymentBuilder_SlackMinimal Towards AIE-17 Co-Authored-By: Claude Opus 4.6 * test: add TestBuildSource_Slack for buildSource coverage Verifies the Slack branch in buildSource() correctly reads env vars for bot/app tokens and parses CSV CLI args for trigger command, channels, and allowed users. Towards AIE-17 Co-Authored-By: Claude Opus 4.6 * feat: add Slack status feedback as thread replies (#38) * feat: add Slack status feedback as thread replies When a Slack message triggers a Task, the bot now posts thread replies to the originating message as the Task progresses: - "Working on your request..." when accepted - "Done!" when succeeded - "Failed." when failed Implementation: - SlackReporter posts/updates thread replies via slack-go - SlackTaskReporter mirrors GitHubTaskReporter pattern using Slack-specific annotations for channel, thread_ts, reply_ts - sourceAnnotations() stamps Slack metadata on Tasks at creation - reportingEnabled() returns true for Slack sources (always-on) - buildWorkItem() now carries channel ID for annotation stamping Towards AIE-17 Co-Authored-By: Claude Opus 4.6 * fix: enrich Slack status messages and address review feedback - Include PR URL in succeeded messages and error details in failed messages - Extract SlackMessenger interface for testability (enables fake injection) - Reuse slack.Client instead of creating one per API call - Validate SLACK_BOT_TOKEN is set before constructing reporter - Add full post+persist and update path tests for SlackTaskReporter Co-Authored-By: Claude Opus 4.6 * fix: skip thread_ts annotation for slash command work items Slash command item IDs are compound strings (e.g. "C123:/cmd:trigger") that are not valid Slack message timestamps. Using them as thread_ts would cause PostThreadReply to fail with invalid_arguments. Only set the annotation when the ID is a valid Slack timestamp (digits.digits). Co-Authored-By: Claude Opus 4.6 --------- Co-authored-by: Claude Opus 4.6 --------- Co-authored-by: Claude Opus 4.6 --- cmd/kelos-spawner/main.go | 123 ++++++-- cmd/kelos-spawner/main_test.go | 144 +++++++++- cmd/kelos-spawner/reconciler.go | 76 +++-- .../taskspawner_deployment_builder.go | 37 +++ .../taskspawner_deployment_builder_test.go | 139 +++++++++ internal/reporting/slack.go | 69 +++++ internal/reporting/slack_test.go | 80 ++++++ internal/reporting/watcher.go | 130 +++++++++ internal/reporting/watcher_test.go | 271 ++++++++++++++++++ internal/source/slack.go | 8 +- internal/source/slack_test.go | 6 +- 11 files changed, 1020 insertions(+), 63 deletions(-) create mode 100644 internal/reporting/slack.go create mode 100644 internal/reporting/slack_test.go diff --git a/cmd/kelos-spawner/main.go b/cmd/kelos-spawner/main.go index 59df50e8..c68b6a45 100644 --- a/cmd/kelos-spawner/main.go +++ b/cmd/kelos-spawner/main.go @@ -48,6 +48,9 @@ func main() { var jiraBaseURL string var jiraProject string var jiraJQL string + var slackTriggerCommand string + var slackChannels string + var slackAllowedUsers string var oneShot bool flag.StringVar(&name, "taskspawner-name", "", "Name of the TaskSpawner to manage") @@ -59,6 +62,9 @@ func main() { flag.StringVar(&jiraBaseURL, "jira-base-url", "", "Jira instance base URL (e.g. https://mycompany.atlassian.net)") flag.StringVar(&jiraProject, "jira-project", "", "Jira project key") flag.StringVar(&jiraJQL, "jira-jql", "", "Optional JQL filter for Jira issues") + flag.StringVar(&slackTriggerCommand, "slack-trigger-command", "", "Slack trigger command or message prefix") + flag.StringVar(&slackChannels, "slack-channels", "", "Comma-separated list of Slack channel IDs to listen in") + flag.StringVar(&slackAllowedUsers, "slack-allowed-users", "", "Comma-separated list of allowed Slack user IDs") flag.BoolVar(&oneShot, "one-shot", false, "Run a single discovery cycle and exit (used by CronJob)") opts, applyVerbosity := logging.SetupZapOptions(flag.CommandLine) @@ -106,15 +112,18 @@ func main() { httpClient := &http.Client{Transport: transport} cfgArgs := spawnerRuntimeConfig{ - GitHubOwner: githubOwner, - GitHubRepo: githubRepo, - GitHubAPIBaseURL: githubAPIBaseURL, - GHProxyURL: ghProxyURL, - GitHubTokenFile: githubTokenFile, - JiraBaseURL: jiraBaseURL, - JiraProject: jiraProject, - JiraJQL: jiraJQL, - HTTPClient: httpClient, + GitHubOwner: githubOwner, + GitHubRepo: githubRepo, + GitHubAPIBaseURL: githubAPIBaseURL, + GHProxyURL: ghProxyURL, + GitHubTokenFile: githubTokenFile, + JiraBaseURL: jiraBaseURL, + JiraProject: jiraProject, + JiraJQL: jiraJQL, + SlackTriggerCommand: slackTriggerCommand, + SlackChannels: slackChannels, + SlackAllowedUsers: slackAllowedUsers, + HTTPClient: httpClient, } if oneShot { @@ -177,9 +186,26 @@ func runReportingCycle(ctx context.Context, cl client.Client, key types.Namespac return nil } -func runCycle(ctx context.Context, cl client.Client, key types.NamespacedName, githubOwner, githubRepo, githubAPIBaseURL, githubTokenFile, jiraBaseURL, jiraProject, jiraJQL string, httpClient *http.Client) error { +func runSlackReportingCycle(ctx context.Context, cl client.Client, key types.NamespacedName, reporter *reporting.SlackTaskReporter) error { + var taskList kelosv1alpha1.TaskList + if err := cl.List(ctx, &taskList, + client.InNamespace(key.Namespace), + client.MatchingLabels{"kelos.dev/taskspawner": key.Name}, + ); err != nil { + return fmt.Errorf("listing tasks for Slack reporting: %w", err) + } + + for i := range taskList.Items { + if err := reporter.ReportTaskStatus(ctx, &taskList.Items[i]); err != nil { + ctrl.Log.WithName("spawner").Error(err, "Reporting Slack task status", "task", taskList.Items[i].Name) + } + } + return nil +} + +func runCycle(ctx context.Context, cl client.Client, key types.NamespacedName, githubOwner, githubRepo, githubAPIBaseURL, githubTokenFile, jiraBaseURL, jiraProject, jiraJQL, slackTriggerCommand, slackChannels, slackAllowedUsers string, httpClient *http.Client) error { start := time.Now() - err := runCycleCore(ctx, cl, key, githubOwner, githubRepo, githubAPIBaseURL, githubTokenFile, jiraBaseURL, jiraProject, jiraJQL, httpClient) + err := runCycleCore(ctx, cl, key, githubOwner, githubRepo, githubAPIBaseURL, githubTokenFile, jiraBaseURL, jiraProject, jiraJQL, slackTriggerCommand, slackChannels, slackAllowedUsers, httpClient) discoveryDurationSeconds.Observe(time.Since(start).Seconds()) if err != nil { discoveryErrorsTotal.Inc() @@ -187,13 +213,13 @@ func runCycle(ctx context.Context, cl client.Client, key types.NamespacedName, g return err } -func runCycleCore(ctx context.Context, cl client.Client, key types.NamespacedName, githubOwner, githubRepo, githubAPIBaseURL, githubTokenFile, jiraBaseURL, jiraProject, jiraJQL string, httpClient *http.Client) error { +func runCycleCore(ctx context.Context, cl client.Client, key types.NamespacedName, githubOwner, githubRepo, githubAPIBaseURL, githubTokenFile, jiraBaseURL, jiraProject, jiraJQL, slackTriggerCommand, slackChannels, slackAllowedUsers string, httpClient *http.Client) error { var ts kelosv1alpha1.TaskSpawner if err := cl.Get(ctx, key, &ts); err != nil { return fmt.Errorf("fetching TaskSpawner: %w", err) } - src, err := buildSource(&ts, githubOwner, githubRepo, githubAPIBaseURL, githubTokenFile, jiraBaseURL, jiraProject, jiraJQL, httpClient) + src, err := buildSource(&ts, githubOwner, githubRepo, githubAPIBaseURL, githubTokenFile, jiraBaseURL, jiraProject, jiraJQL, slackTriggerCommand, slackChannels, slackAllowedUsers, httpClient) if err != nil { return fmt.Errorf("building source: %w", err) } @@ -517,10 +543,24 @@ func renderTaskTemplateMetadata(ts *kelosv1alpha1.TaskSpawner, item source.WorkI return labels, annotations, nil } -// sourceAnnotations returns annotations that stamp GitHub source metadata -// onto a spawned Task. These annotations enable downstream consumers (such -// as the reporting watcher) to identify the originating issue or PR. +// sourceAnnotations returns annotations that stamp source metadata onto a +// spawned Task. These annotations enable downstream consumers (such as the +// reporting watcher) to identify the originating issue, PR, or Slack message. func sourceAnnotations(ts *kelosv1alpha1.TaskSpawner, item source.WorkItem) map[string]string { + if ts.Spec.When.Slack != nil && len(item.Labels) >= 2 { + annotations := map[string]string{ + reporting.AnnotationSlackReporting: "enabled", + reporting.AnnotationSlackChannel: item.Labels[1], + } + // Only set thread_ts when the item ID is a valid Slack message + // timestamp (e.g. "1234567890.123456"). Slash command IDs are + // compound strings containing colons and are not valid timestamps. + if isSlackTimestamp(item.ID) { + annotations[reporting.AnnotationSlackThreadTS] = item.ID + } + return annotations + } + if ts.Spec.When.GitHubIssues == nil && ts.Spec.When.GitHubPullRequests == nil { return nil } @@ -542,9 +582,12 @@ func sourceAnnotations(ts *kelosv1alpha1.TaskSpawner, item source.WorkItem) map[ return annotations } -// reportingEnabled returns true when GitHub reporting is configured and enabled +// reportingEnabled returns true when reporting is configured and enabled // on the TaskSpawner. func reportingEnabled(ts *kelosv1alpha1.TaskSpawner) bool { + if ts.Spec.When.Slack != nil { + return true + } if ts.Spec.When.GitHubIssues != nil && ts.Spec.When.GitHubIssues.Reporting != nil { return ts.Spec.When.GitHubIssues.Reporting.Enabled } @@ -596,7 +639,7 @@ func resolveGitHubCommentPolicy(policy *kelosv1alpha1.GitHubCommentPolicy, legac }, nil } -func buildSource(ts *kelosv1alpha1.TaskSpawner, owner, repo, apiBaseURL, tokenFile, jiraBaseURL, jiraProject, jiraJQL string, httpClient *http.Client) (source.Source, error) { +func buildSource(ts *kelosv1alpha1.TaskSpawner, owner, repo, apiBaseURL, tokenFile, jiraBaseURL, jiraProject, jiraJQL, slackTriggerCommand, slackChannels, slackAllowedUsers string, httpClient *http.Client) (source.Source, error) { if ts.Spec.When.GitHubIssues != nil { gh := ts.Spec.When.GitHubIssues token, err := readGitHubToken(tokenFile) @@ -673,6 +716,18 @@ func buildSource(ts *kelosv1alpha1.TaskSpawner, owner, repo, apiBaseURL, tokenFi }, nil } + if ts.Spec.When.Slack != nil { + botToken := os.Getenv("SLACK_BOT_TOKEN") + appToken := os.Getenv("SLACK_APP_TOKEN") + return &source.SlackSource{ + BotToken: botToken, + AppToken: appToken, + TriggerCommand: slackTriggerCommand, + Channels: parseCSV(slackChannels), + AllowedUsers: parseCSV(slackAllowedUsers), + }, nil + } + if ts.Spec.When.Cron != nil { var lastDiscovery time.Time if ts.Status.LastDiscoveryTime != nil { @@ -689,6 +744,20 @@ func buildSource(ts *kelosv1alpha1.TaskSpawner, owner, repo, apiBaseURL, tokenFi return nil, fmt.Errorf("no source configured in TaskSpawner %s/%s", ts.Namespace, ts.Name) } +func parseCSV(s string) []string { + if s == "" { + return nil + } + var result []string + for _, item := range strings.Split(s, ",") { + item = strings.TrimSpace(item) + if item != "" { + result = append(result, item) + } + } + return result +} + func readGitHubToken(tokenFile string) (string, error) { token := os.Getenv("GITHUB_TOKEN") if tokenFile == "" { @@ -769,6 +838,24 @@ func parseOwnerRepo(repoURL string) (string, string) { return "", "" } +// isSlackTimestamp returns true when s looks like a Slack message timestamp +// (e.g. "1234567890.123456"). Slash command work-item IDs are compound +// strings like "C123:/cmd:trigger" and must not be used as thread_ts. +func isSlackTimestamp(s string) bool { + parts := strings.SplitN(s, ".", 2) + if len(parts) != 2 || parts[0] == "" || parts[1] == "" { + return false + } + for _, p := range parts { + for _, c := range p { + if c < '0' || c > '9' { + return false + } + } + } + return true +} + func parsePollInterval(s string) time.Duration { if s == "" { return 5 * time.Minute diff --git a/cmd/kelos-spawner/main_test.go b/cmd/kelos-spawner/main_test.go index 1638249c..f19d39da 100644 --- a/cmd/kelos-spawner/main_test.go +++ b/cmd/kelos-spawner/main_test.go @@ -138,7 +138,7 @@ func newTask(name, namespace, spawnerName string, phase kelosv1alpha1.TaskPhase) func TestBuildSource_GitHubIssuesWithBaseURL(t *testing.T) { ts := newTaskSpawner("spawner", "default", nil) - src, err := buildSource(ts, "my-org", "my-repo", "https://github.example.com/api/v3", "", "", "", "", nil) + src, err := buildSource(ts, "my-org", "my-repo", "https://github.example.com/api/v3", "", "", "", "", "", "", "", nil) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -161,7 +161,7 @@ func TestBuildSource_GitHubIssuesWithBaseURL(t *testing.T) { func TestBuildSource_GitHubIssuesDefaultBaseURL(t *testing.T) { ts := newTaskSpawner("spawner", "default", nil) - src, err := buildSource(ts, "kelos-dev", "kelos", "", "", "", "", "", nil) + src, err := buildSource(ts, "kelos-dev", "kelos", "", "", "", "", "", "", "", "", nil) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -187,7 +187,7 @@ func TestBuildSource_GitHubPullRequests(t *testing.T) { }, } - src, err := buildSource(ts, "kelos-dev", "kelos", "https://github.example.com/api/v3", "", "", "", "", nil) + src, err := buildSource(ts, "kelos-dev", "kelos", "https://github.example.com/api/v3", "", "", "", "", "", "", "", nil) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -247,7 +247,7 @@ func TestBuildSource_Jira(t *testing.T) { t.Setenv("JIRA_USER", "user@example.com") t.Setenv("JIRA_TOKEN", "jira-api-token") - src, err := buildSource(ts, "", "", "", "", "https://mycompany.atlassian.net", "PROJ", "status = Open", nil) + src, err := buildSource(ts, "", "", "", "", "https://mycompany.atlassian.net", "PROJ", "status = Open", "", "", "", nil) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -273,6 +273,46 @@ func TestBuildSource_Jira(t *testing.T) { } } +func TestBuildSource_Slack(t *testing.T) { + t.Setenv("SLACK_BOT_TOKEN", "xoxb-test-token") + t.Setenv("SLACK_APP_TOKEN", "xapp-test-token") + + ts := &kelosv1alpha1.TaskSpawner{ + Spec: kelosv1alpha1.TaskSpawnerSpec{ + When: kelosv1alpha1.When{ + Slack: &kelosv1alpha1.Slack{ + SecretRef: kelosv1alpha1.SecretReference{Name: "slack-creds"}, + }, + }, + }, + } + + src, err := buildSource(ts, "", "", "", "", "", "", "", "/kelos", "C123,C456", "U001,U002", nil) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + slackSrc, ok := src.(*source.SlackSource) + if !ok { + t.Fatalf("Expected *source.SlackSource, got %T", src) + } + if slackSrc.BotToken != "xoxb-test-token" { + t.Errorf("BotToken = %q, want %q", slackSrc.BotToken, "xoxb-test-token") + } + if slackSrc.AppToken != "xapp-test-token" { + t.Errorf("AppToken = %q, want %q", slackSrc.AppToken, "xapp-test-token") + } + if slackSrc.TriggerCommand != "/kelos" { + t.Errorf("TriggerCommand = %q, want %q", slackSrc.TriggerCommand, "/kelos") + } + if len(slackSrc.Channels) != 2 || slackSrc.Channels[0] != "C123" || slackSrc.Channels[1] != "C456" { + t.Errorf("Channels = %v, want [C123 C456]", slackSrc.Channels) + } + if len(slackSrc.AllowedUsers) != 2 || slackSrc.AllowedUsers[0] != "U001" || slackSrc.AllowedUsers[1] != "U002" { + t.Errorf("AllowedUsers = %v, want [U001 U002]", slackSrc.AllowedUsers) + } +} + func TestRunCycleWithSource_NoMaxConcurrency(t *testing.T) { ts := newTaskSpawner("spawner", "default", nil) cl, key := setupTest(t, ts) @@ -535,7 +575,7 @@ func TestRunCycle_BuildSourceFailureCountsDiscoveryErrorAndDuration(t *testing.T beforeErrors := testutil.ToFloat64(discoveryErrorsTotal) beforeDurationCount := histogramSampleCount(t, discoveryDurationSeconds) - err := runCycle(context.Background(), cl, key, "owner", "repo", "", "", "", "", "", nil) + err := runCycle(context.Background(), cl, key, "owner", "repo", "", "", "", "", "", "", "", "", nil) if err == nil { t.Fatal("Expected buildSource error") } @@ -1103,7 +1143,7 @@ func TestBuildSource_PriorityLabelsPassedToSource(t *testing.T) { "priority/imporant-soon", } - src, err := buildSource(ts, "owner", "repo", "", "", "", "", "", nil) + src, err := buildSource(ts, "owner", "repo", "", "", "", "", "", "", "", "", nil) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -1130,7 +1170,7 @@ func TestRunCycleWithSource_CommentFieldsPassedToSource(t *testing.T) { ExcludeComments: []string{"/kelos needs-input"}, } - src, err := buildSource(ts, "owner", "repo", "", "", "", "", "", nil) + src, err := buildSource(ts, "owner", "repo", "", "", "", "", "", "", "", "", nil) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -1159,7 +1199,7 @@ func TestBuildSource_CommentPolicyPassedToIssueSource(t *testing.T) { }, } - src, err := buildSource(ts, "owner", "repo", "", "", "", "", "", nil) + src, err := buildSource(ts, "owner", "repo", "", "", "", "", "", "", "", "", nil) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -1199,7 +1239,7 @@ func TestBuildSource_CommentPolicyPassedToPullRequestSource(t *testing.T) { }, } - src, err := buildSource(ts, "owner", "repo", "", "", "", "", "", nil) + src, err := buildSource(ts, "owner", "repo", "", "", "", "", "", "", "", "", nil) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -1264,7 +1304,7 @@ func TestBuildSource_CommentPolicyRejectsMixedConfig(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, err := buildSource(tt.ts, "owner", "repo", "", "", "", "", "", nil) + _, err := buildSource(tt.ts, "owner", "repo", "", "", "", "", "", "", "", "", nil) if err == nil { t.Fatal("Expected error for mixed legacy and commentPolicy config") } @@ -1799,6 +1839,90 @@ func TestSourceAnnotations_ReportingEnabledPR(t *testing.T) { } } +func TestSourceAnnotations_SlackMessage(t *testing.T) { + ts := &kelosv1alpha1.TaskSpawner{ + Spec: kelosv1alpha1.TaskSpawnerSpec{ + When: kelosv1alpha1.When{ + Slack: &kelosv1alpha1.Slack{}, + }, + }, + } + + item := source.WorkItem{ + ID: "1234567890.123456", + Kind: "SlackMessage", + Labels: []string{"general", "C123ABC"}, + } + + annotations := sourceAnnotations(ts, item) + if annotations == nil { + t.Fatal("Expected annotations, got nil") + } + if annotations[reporting.AnnotationSlackReporting] != "enabled" { + t.Errorf("Expected slack-reporting 'enabled', got %q", annotations[reporting.AnnotationSlackReporting]) + } + if annotations[reporting.AnnotationSlackChannel] != "C123ABC" { + t.Errorf("Expected slack-channel 'C123ABC', got %q", annotations[reporting.AnnotationSlackChannel]) + } + if annotations[reporting.AnnotationSlackThreadTS] != "1234567890.123456" { + t.Errorf("Expected slack-thread-ts '1234567890.123456', got %q", annotations[reporting.AnnotationSlackThreadTS]) + } +} + +func TestSourceAnnotations_SlackSlashCommand(t *testing.T) { + ts := &kelosv1alpha1.TaskSpawner{ + Spec: kelosv1alpha1.TaskSpawnerSpec{ + When: kelosv1alpha1.When{ + Slack: &kelosv1alpha1.Slack{}, + }, + }, + } + + // Slash command IDs are compound strings, not valid Slack timestamps + item := source.WorkItem{ + ID: "C123ABC:/kelos:trigger-id-abc", + Kind: "SlackMessage", + Labels: []string{"general", "C123ABC"}, + } + + annotations := sourceAnnotations(ts, item) + if annotations == nil { + t.Fatal("Expected annotations, got nil") + } + if annotations[reporting.AnnotationSlackReporting] != "enabled" { + t.Errorf("Expected slack-reporting 'enabled', got %q", annotations[reporting.AnnotationSlackReporting]) + } + if annotations[reporting.AnnotationSlackChannel] != "C123ABC" { + t.Errorf("Expected slack-channel 'C123ABC', got %q", annotations[reporting.AnnotationSlackChannel]) + } + if _, ok := annotations[reporting.AnnotationSlackThreadTS]; ok { + t.Errorf("Expected no slack-thread-ts for slash command, got %q", annotations[reporting.AnnotationSlackThreadTS]) + } +} + +func TestIsSlackTimestamp(t *testing.T) { + tests := []struct { + input string + want bool + }{ + {"1234567890.123456", true}, + {"0.0", true}, + {"C123ABC:/kelos:trigger-id-abc", false}, + {"not-a-timestamp", false}, + {"", false}, + {"1234567890", false}, + {".123456", false}, + {"1234567890.", false}, + } + for _, tt := range tests { + t.Run(tt.input, func(t *testing.T) { + if got := isSlackTimestamp(tt.input); got != tt.want { + t.Errorf("isSlackTimestamp(%q) = %v, want %v", tt.input, got, tt.want) + } + }) + } +} + func TestSourceAnnotations_NonGitHub(t *testing.T) { ts := &kelosv1alpha1.TaskSpawner{ Spec: kelosv1alpha1.TaskSpawnerSpec{ diff --git a/cmd/kelos-spawner/reconciler.go b/cmd/kelos-spawner/reconciler.go index 8c2cab09..ca969ec2 100644 --- a/cmd/kelos-spawner/reconciler.go +++ b/cmd/kelos-spawner/reconciler.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net/http" + "os" "time" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -22,15 +23,18 @@ import ( ) type spawnerRuntimeConfig struct { - GitHubOwner string - GitHubRepo string - GitHubAPIBaseURL string - GHProxyURL string - GitHubTokenFile string - JiraBaseURL string - JiraProject string - JiraJQL string - HTTPClient *http.Client + GitHubOwner string + GitHubRepo string + GitHubAPIBaseURL string + GHProxyURL string + GitHubTokenFile string + JiraBaseURL string + JiraProject string + JiraJQL string + SlackTriggerCommand string + SlackChannels string + SlackAllowedUsers string + HTTPClient *http.Client } type spawnerReconciler struct { @@ -69,7 +73,7 @@ func (r *spawnerReconciler) SetupWithManager(mgr ctrl.Manager) error { } func runOnce(ctx context.Context, cl client.Client, key types.NamespacedName, cfg spawnerRuntimeConfig) (time.Duration, error) { - if err := runCycle(ctx, cl, key, cfg.GitHubOwner, cfg.GitHubRepo, cfg.GHProxyURL, cfg.GitHubTokenFile, cfg.JiraBaseURL, cfg.JiraProject, cfg.JiraJQL, cfg.HTTPClient); err != nil { + if err := runCycle(ctx, cl, key, cfg.GitHubOwner, cfg.GitHubRepo, cfg.GHProxyURL, cfg.GitHubTokenFile, cfg.JiraBaseURL, cfg.JiraProject, cfg.JiraJQL, cfg.SlackTriggerCommand, cfg.SlackChannels, cfg.SlackAllowedUsers, cfg.HTTPClient); err != nil { return 0, err } @@ -79,25 +83,39 @@ func runOnce(ctx context.Context, cl client.Client, key types.NamespacedName, cf } if reportingEnabled(&ts) { - token, err := readGitHubToken(cfg.GitHubTokenFile) - if err != nil { - return 0, fmt.Errorf("reading GitHub token for reporting: %w", err) - } + if ts.Spec.When.Slack != nil { + botToken := os.Getenv("SLACK_BOT_TOKEN") + if botToken == "" { + return 0, fmt.Errorf("SLACK_BOT_TOKEN environment variable is required for Slack reporting") + } + slackReporter := &reporting.SlackTaskReporter{ + Client: cl, + Reporter: &reporting.SlackReporter{BotToken: botToken}, + } + if err := runSlackReportingCycle(ctx, cl, key, slackReporter); err != nil { + return 0, err + } + } else { + token, err := readGitHubToken(cfg.GitHubTokenFile) + if err != nil { + return 0, fmt.Errorf("reading GitHub token for reporting: %w", err) + } - // Reporting always uses the direct API base URL (writes bypass the proxy). - reporter := &reporting.TaskReporter{ - Client: cl, - Reporter: &reporting.GitHubReporter{ - Owner: cfg.GitHubOwner, - Repo: cfg.GitHubRepo, - Token: token, - TokenFile: cfg.GitHubTokenFile, - BaseURL: cfg.GitHubAPIBaseURL, - Client: cfg.HTTPClient, - }, - } - if err := runReportingCycle(ctx, cl, key, reporter); err != nil { - return 0, err + // Reporting always uses the direct API base URL (writes bypass the proxy). + reporter := &reporting.TaskReporter{ + Client: cl, + Reporter: &reporting.GitHubReporter{ + Owner: cfg.GitHubOwner, + Repo: cfg.GitHubRepo, + Token: token, + TokenFile: cfg.GitHubTokenFile, + BaseURL: cfg.GitHubAPIBaseURL, + Client: cfg.HTTPClient, + }, + } + if err := runReportingCycle(ctx, cl, key, reporter); err != nil { + return 0, err + } } } @@ -116,6 +134,8 @@ func resolvedPollInterval(ts *kelosv1alpha1.TaskSpawner) time.Duration { sourceInterval = ts.Spec.When.GitHubPullRequests.PollInterval case ts.Spec.When.Jira != nil: sourceInterval = ts.Spec.When.Jira.PollInterval + case ts.Spec.When.Slack != nil: + sourceInterval = ts.Spec.When.Slack.PollInterval } if sourceInterval != "" { return parsePollInterval(sourceInterval) diff --git a/internal/controller/taskspawner_deployment_builder.go b/internal/controller/taskspawner_deployment_builder.go index 04d9273c..794d460f 100644 --- a/internal/controller/taskspawner_deployment_builder.go +++ b/internal/controller/taskspawner_deployment_builder.go @@ -233,6 +233,43 @@ func (b *DeploymentBuilder) buildPodParts(ts *kelosv1alpha1.TaskSpawner, workspa ) } + if ts.Spec.When.Slack != nil { + slack := ts.Spec.When.Slack + if slack.TriggerCommand != "" { + args = append(args, "--slack-trigger-command="+slack.TriggerCommand) + } + if len(slack.Channels) > 0 { + args = append(args, "--slack-channels="+strings.Join(slack.Channels, ",")) + } + if len(slack.AllowedUsers) > 0 { + args = append(args, "--slack-allowed-users="+strings.Join(slack.AllowedUsers, ",")) + } + envVars = append(envVars, + corev1.EnvVar{ + Name: "SLACK_BOT_TOKEN", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: slack.SecretRef.Name, + }, + Key: "SLACK_BOT_TOKEN", + }, + }, + }, + corev1.EnvVar{ + Name: "SLACK_APP_TOKEN", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: slack.SecretRef.Name, + }, + Key: "SLACK_APP_TOKEN", + }, + }, + }, + ) + } + labels := map[string]string{ "kelos.dev/name": "kelos", "kelos.dev/component": "spawner", diff --git a/internal/controller/taskspawner_deployment_builder_test.go b/internal/controller/taskspawner_deployment_builder_test.go index 8cecf5de..a3a1799e 100644 --- a/internal/controller/taskspawner_deployment_builder_test.go +++ b/internal/controller/taskspawner_deployment_builder_test.go @@ -3012,3 +3012,142 @@ func TestReconcileDeployment_KeepsDeploymentWithNewLabels(t *testing.T) { t.Errorf("expected kelos.dev/component label in selector") } } + +func TestDeploymentBuilder_Slack(t *testing.T) { + builder := NewDeploymentBuilder() + ts := &kelosv1alpha1.TaskSpawner{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-spawner", + Namespace: "default", + }, + Spec: kelosv1alpha1.TaskSpawnerSpec{ + When: kelosv1alpha1.When{ + Slack: &kelosv1alpha1.Slack{ + SecretRef: kelosv1alpha1.SecretReference{Name: "slack-creds"}, + TriggerCommand: "/kelos", + Channels: []string{"C123", "C456"}, + AllowedUsers: []string{"U001", "U002"}, + }, + }, + TaskTemplate: kelosv1alpha1.TaskTemplate{ + Type: "claude-code", + }, + }, + } + + deploy := builder.Build(ts, nil, false) + + if len(deploy.Spec.Template.Spec.Containers) != 1 { + t.Fatalf("expected 1 container, got %d", len(deploy.Spec.Template.Spec.Containers)) + } + + spawner := deploy.Spec.Template.Spec.Containers[0] + + // Check Slack args + foundTriggerCommand := false + foundChannels := false + foundAllowedUsers := false + for _, arg := range spawner.Args { + switch { + case arg == "--slack-trigger-command=/kelos": + foundTriggerCommand = true + case arg == "--slack-channels=C123,C456": + foundChannels = true + case arg == "--slack-allowed-users=U001,U002": + foundAllowedUsers = true + } + } + if !foundTriggerCommand { + t.Errorf("expected --slack-trigger-command arg, got args: %v", spawner.Args) + } + if !foundChannels { + t.Errorf("expected --slack-channels arg, got args: %v", spawner.Args) + } + if !foundAllowedUsers { + t.Errorf("expected --slack-allowed-users arg, got args: %v", spawner.Args) + } + + // Check env vars + envMap := make(map[string]corev1.EnvVar) + for _, env := range spawner.Env { + envMap[env.Name] = env + } + + botToken, ok := envMap["SLACK_BOT_TOKEN"] + if !ok { + t.Fatal("expected SLACK_BOT_TOKEN env var") + } + if botToken.ValueFrom == nil || botToken.ValueFrom.SecretKeyRef == nil { + t.Fatal("expected SLACK_BOT_TOKEN to reference a secret") + } + if botToken.ValueFrom.SecretKeyRef.Name != "slack-creds" { + t.Errorf("SLACK_BOT_TOKEN secret name = %q, want %q", botToken.ValueFrom.SecretKeyRef.Name, "slack-creds") + } + if botToken.ValueFrom.SecretKeyRef.Key != "SLACK_BOT_TOKEN" { + t.Errorf("SLACK_BOT_TOKEN secret key = %q, want %q", botToken.ValueFrom.SecretKeyRef.Key, "SLACK_BOT_TOKEN") + } + + appToken, ok := envMap["SLACK_APP_TOKEN"] + if !ok { + t.Fatal("expected SLACK_APP_TOKEN env var") + } + if appToken.ValueFrom == nil || appToken.ValueFrom.SecretKeyRef == nil { + t.Fatal("expected SLACK_APP_TOKEN to reference a secret") + } + if appToken.ValueFrom.SecretKeyRef.Name != "slack-creds" { + t.Errorf("SLACK_APP_TOKEN secret name = %q, want %q", appToken.ValueFrom.SecretKeyRef.Name, "slack-creds") + } + if appToken.ValueFrom.SecretKeyRef.Key != "SLACK_APP_TOKEN" { + t.Errorf("SLACK_APP_TOKEN secret key = %q, want %q", appToken.ValueFrom.SecretKeyRef.Key, "SLACK_APP_TOKEN") + } +} + +func TestDeploymentBuilder_SlackMinimal(t *testing.T) { + builder := NewDeploymentBuilder() + ts := &kelosv1alpha1.TaskSpawner{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-spawner", + Namespace: "default", + }, + Spec: kelosv1alpha1.TaskSpawnerSpec{ + When: kelosv1alpha1.When{ + Slack: &kelosv1alpha1.Slack{ + SecretRef: kelosv1alpha1.SecretReference{Name: "slack-creds"}, + }, + }, + TaskTemplate: kelosv1alpha1.TaskTemplate{ + Type: "claude-code", + }, + }, + } + + deploy := builder.Build(ts, nil, false) + + if len(deploy.Spec.Template.Spec.Containers) != 1 { + t.Fatalf("expected 1 container, got %d", len(deploy.Spec.Template.Spec.Containers)) + } + + spawner := deploy.Spec.Template.Spec.Containers[0] + + // Verify no optional args are present + for _, arg := range spawner.Args { + if strings.HasPrefix(arg, "--slack-trigger-command") || + strings.HasPrefix(arg, "--slack-channels") || + strings.HasPrefix(arg, "--slack-allowed-users") { + t.Errorf("unexpected Slack arg in minimal config: %s", arg) + } + } + + // Env vars should still be present (tokens are always required) + envMap := make(map[string]corev1.EnvVar) + for _, env := range spawner.Env { + envMap[env.Name] = env + } + + if _, ok := envMap["SLACK_BOT_TOKEN"]; !ok { + t.Error("expected SLACK_BOT_TOKEN env var even in minimal config") + } + if _, ok := envMap["SLACK_APP_TOKEN"]; !ok { + t.Error("expected SLACK_APP_TOKEN env var even in minimal config") + } +} diff --git a/internal/reporting/slack.go b/internal/reporting/slack.go new file mode 100644 index 00000000..ab4f9e59 --- /dev/null +++ b/internal/reporting/slack.go @@ -0,0 +1,69 @@ +package reporting + +import ( + "context" + "fmt" + + "github.com/slack-go/slack" +) + +// SlackReporter posts and updates thread replies in Slack channels. +type SlackReporter struct { + // BotToken is the Bot User OAuth Token (xoxb-...). + BotToken string + client *slack.Client +} + +func (r *SlackReporter) api() *slack.Client { + if r.client == nil { + r.client = slack.New(r.BotToken) + } + return r.client +} + +// PostThreadReply posts a new message as a thread reply and returns the +// reply's message timestamp. +func (r *SlackReporter) PostThreadReply(ctx context.Context, channel, threadTS, text string) (string, error) { + _, ts, err := r.api().PostMessageContext(ctx, channel, + slack.MsgOptionText(text, false), + slack.MsgOptionTS(threadTS), + ) + if err != nil { + return "", fmt.Errorf("posting Slack thread reply: %w", err) + } + return ts, nil +} + +// UpdateMessage updates an existing Slack message in place. +func (r *SlackReporter) UpdateMessage(ctx context.Context, channel, messageTS, text string) error { + _, _, _, err := r.api().UpdateMessageContext(ctx, channel, messageTS, + slack.MsgOptionText(text, false), + ) + if err != nil { + return fmt.Errorf("updating Slack message: %w", err) + } + return nil +} + +// FormatSlackAccepted returns the thread reply text for an accepted task. +func FormatSlackAccepted(taskName string) string { + return fmt.Sprintf("Working on your request... (Task: %s)", taskName) +} + +// FormatSlackSucceeded returns the thread reply text for a succeeded task. +// When results contain a PR URL, it is included in the message. +func FormatSlackSucceeded(taskName string, results map[string]string) string { + if pr := results["pr"]; pr != "" { + return fmt.Sprintf("Done! PR: %s (Task: %s)", pr, taskName) + } + return fmt.Sprintf("Done! (Task: %s)", taskName) +} + +// FormatSlackFailed returns the thread reply text for a failed task. +// When a status message is available, it is included in the reply. +func FormatSlackFailed(taskName, message string) string { + if message != "" { + return fmt.Sprintf("Failed: %s (Task: %s)", message, taskName) + } + return fmt.Sprintf("Failed. (Task: %s)", taskName) +} diff --git a/internal/reporting/slack_test.go b/internal/reporting/slack_test.go new file mode 100644 index 00000000..d4a6fe11 --- /dev/null +++ b/internal/reporting/slack_test.go @@ -0,0 +1,80 @@ +package reporting + +import ( + "context" + "testing" +) + +func TestFormatSlackMessages(t *testing.T) { + t.Run("accepted", func(t *testing.T) { + got := FormatSlackAccepted("spawner-1234567890.123456") + want := "Working on your request... (Task: spawner-1234567890.123456)" + if got != want { + t.Errorf("got %q, want %q", got, want) + } + }) + + t.Run("succeeded with PR", func(t *testing.T) { + results := map[string]string{"pr": "https://github.com/org/repo/pull/42"} + got := FormatSlackSucceeded("spawner-1234567890.123456", results) + want := "Done! PR: https://github.com/org/repo/pull/42 (Task: spawner-1234567890.123456)" + if got != want { + t.Errorf("got %q, want %q", got, want) + } + }) + + t.Run("succeeded without results", func(t *testing.T) { + got := FormatSlackSucceeded("spawner-1234567890.123456", nil) + want := "Done! (Task: spawner-1234567890.123456)" + if got != want { + t.Errorf("got %q, want %q", got, want) + } + }) + + t.Run("succeeded with empty results", func(t *testing.T) { + got := FormatSlackSucceeded("spawner-1234567890.123456", map[string]string{}) + want := "Done! (Task: spawner-1234567890.123456)" + if got != want { + t.Errorf("got %q, want %q", got, want) + } + }) + + t.Run("failed with message", func(t *testing.T) { + got := FormatSlackFailed("spawner-1234567890.123456", "pod OOMKilled") + want := "Failed: pod OOMKilled (Task: spawner-1234567890.123456)" + if got != want { + t.Errorf("got %q, want %q", got, want) + } + }) + + t.Run("failed without message", func(t *testing.T) { + got := FormatSlackFailed("spawner-1234567890.123456", "") + want := "Failed. (Task: spawner-1234567890.123456)" + if got != want { + t.Errorf("got %q, want %q", got, want) + } + }) +} + +func TestSlackReporterConstruction(t *testing.T) { + reporter := &SlackReporter{BotToken: "xoxb-test-token"} + if reporter.BotToken != "xoxb-test-token" { + t.Errorf("BotToken = %q, want %q", reporter.BotToken, "xoxb-test-token") + } +} + +func TestSlackReporter_PostThreadReplyError(t *testing.T) { + reporter := &SlackReporter{BotToken: "xoxb-invalid"} + _, err := reporter.PostThreadReply(context.Background(), "C123", "1234.5678", "test") + if err == nil { + t.Error("expected error with invalid token, got nil") + } +} + +func TestSlackReporter_UpdateMessageError(t *testing.T) { + reporter := &SlackReporter{BotToken: "xoxb-invalid"} + err := reporter.UpdateMessage(context.Background(), "C123", "1234.5678", "test") + if err == nil { + t.Error("expected error with invalid token, got nil") + } +} diff --git a/internal/reporting/watcher.go b/internal/reporting/watcher.go index 2c7b059a..28cf87cf 100644 --- a/internal/reporting/watcher.go +++ b/internal/reporting/watcher.go @@ -32,6 +32,26 @@ const ( // AnnotationGitHubReportPhase records the last Task phase that was // reported to GitHub, preventing duplicate API calls on re-list. AnnotationGitHubReportPhase = "kelos.dev/github-report-phase" + + // AnnotationSlackReporting indicates that Slack reporting is enabled + // for this Task. + AnnotationSlackReporting = "kelos.dev/slack-reporting" + + // AnnotationSlackChannel records the Slack channel ID where the + // originating message was posted. + AnnotationSlackChannel = "kelos.dev/slack-channel" + + // AnnotationSlackThreadTS records the originating message timestamp, + // used as thread_ts for posting replies. + AnnotationSlackThreadTS = "kelos.dev/slack-thread-ts" + + // AnnotationSlackReplyTS stores the message timestamp of the status + // reply so subsequent updates edit the same message. + AnnotationSlackReplyTS = "kelos.dev/slack-reply-ts" + + // AnnotationSlackReportPhase records the last Task phase that was + // reported to Slack, preventing duplicate API calls on re-list. + AnnotationSlackReportPhase = "kelos.dev/slack-report-phase" ) // TaskReporter watches Tasks and reports status changes to GitHub. @@ -156,3 +176,113 @@ func (tr *TaskReporter) persistReportingState(ctx context.Context, task *kelosv1 return nil } + +// SlackMessenger is the interface for posting and updating Slack messages. +type SlackMessenger interface { + PostThreadReply(ctx context.Context, channel, threadTS, text string) (string, error) + UpdateMessage(ctx context.Context, channel, messageTS, text string) error +} + +// SlackTaskReporter watches Tasks and reports status changes to Slack +// as thread replies on the originating message. +type SlackTaskReporter struct { + Client client.Client + Reporter SlackMessenger +} + +// ReportTaskStatus checks a Task's current phase against its last reported +// phase and creates or updates the Slack thread reply as needed. +func (tr *SlackTaskReporter) ReportTaskStatus(ctx context.Context, task *kelosv1alpha1.Task) error { + log := ctrl.Log.WithName("slack-reporter") + + annotations := task.Annotations + if annotations == nil { + return nil + } + + if annotations[AnnotationSlackReporting] != "enabled" { + return nil + } + + channel := annotations[AnnotationSlackChannel] + threadTS := annotations[AnnotationSlackThreadTS] + if channel == "" || threadTS == "" { + return nil + } + + var desiredPhase string + switch task.Status.Phase { + case kelosv1alpha1.TaskPhasePending, kelosv1alpha1.TaskPhaseRunning, kelosv1alpha1.TaskPhaseWaiting: + desiredPhase = "accepted" + case kelosv1alpha1.TaskPhaseSucceeded: + desiredPhase = "succeeded" + case kelosv1alpha1.TaskPhaseFailed: + desiredPhase = "failed" + default: + return nil + } + + if annotations[AnnotationSlackReportPhase] == desiredPhase { + return nil + } + + var body string + switch desiredPhase { + case "accepted": + body = FormatSlackAccepted(task.Name) + case "succeeded": + body = FormatSlackSucceeded(task.Name, task.Status.Results) + case "failed": + body = FormatSlackFailed(task.Name, task.Status.Message) + } + + replyTS := annotations[AnnotationSlackReplyTS] + if replyTS == "" { + log.Info("Posting Slack thread reply", "task", task.Name, "channel", channel, "phase", desiredPhase) + newTS, err := tr.Reporter.PostThreadReply(ctx, channel, threadTS, body) + if err != nil { + return fmt.Errorf("posting Slack reply for task %s: %w", task.Name, err) + } + replyTS = newTS + } else { + log.Info("Updating Slack thread reply", "task", task.Name, "channel", channel, "phase", desiredPhase) + if err := tr.Reporter.UpdateMessage(ctx, channel, replyTS, body); err != nil { + return fmt.Errorf("updating Slack reply for task %s: %w", task.Name, err) + } + } + + if err := tr.persistSlackReportingState(ctx, task, replyTS, desiredPhase); err != nil { + return err + } + + return nil +} + +func (tr *SlackTaskReporter) persistSlackReportingState(ctx context.Context, task *kelosv1alpha1.Task, replyTS, desiredPhase string) error { + if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + var current kelosv1alpha1.Task + if err := tr.Client.Get(ctx, client.ObjectKeyFromObject(task), ¤t); err != nil { + return err + } + + if current.Annotations == nil { + current.Annotations = make(map[string]string) + } + current.Annotations[AnnotationSlackReplyTS] = replyTS + current.Annotations[AnnotationSlackReportPhase] = desiredPhase + + if err := tr.Client.Update(ctx, ¤t); err != nil { + return err + } + + task.Annotations = current.Annotations + return nil + }); err != nil { + if apierrors.IsNotFound(err) { + return fmt.Errorf("persisting Slack reporting annotations on task %s: task no longer exists", task.Name) + } + return fmt.Errorf("persisting Slack reporting annotations on task %s: %w", task.Name, err) + } + + return nil +} diff --git a/internal/reporting/watcher_test.go b/internal/reporting/watcher_test.go index 4c6b471f..c832b33b 100644 --- a/internal/reporting/watcher_test.go +++ b/internal/reporting/watcher_test.go @@ -576,3 +576,274 @@ func TestReportTaskStatus_NilAnnotations(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } } + +func TestSlackTaskReporter_PostsThreadReply(t *testing.T) { + task := &kelosv1alpha1.Task{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-task", + Namespace: "default", + Annotations: map[string]string{ + AnnotationSlackReporting: "enabled", + AnnotationSlackChannel: "C123ABC", + AnnotationSlackThreadTS: "1234567890.123456", + }, + }, + Spec: kelosv1alpha1.TaskSpec{ + Type: "claude-code", + Prompt: "test", + Credentials: kelosv1alpha1.Credentials{ + Type: kelosv1alpha1.CredentialTypeOAuth, + SecretRef: &kelosv1alpha1.SecretReference{Name: "creds"}, + }, + }, + Status: kelosv1alpha1.TaskStatus{ + Phase: kelosv1alpha1.TaskPhasePending, + }, + } + + cl := fake.NewClientBuilder().WithScheme(newTestScheme()).WithObjects(task).Build() + + var posted []slackReplyRecord + reporter := &fakeSlackReporter{ + postFn: func(ctx context.Context, channel, threadTS, text string) (string, error) { + posted = append(posted, slackReplyRecord{method: "post", channel: channel, threadTS: threadTS, text: text}) + return "1234567890.999999", nil + }, + } + + tr := &SlackTaskReporter{Client: cl, Reporter: reporter} + + if err := tr.ReportTaskStatus(context.Background(), task); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(posted) != 1 { + t.Fatalf("expected 1 post, got %d", len(posted)) + } + if posted[0].channel != "C123ABC" { + t.Errorf("channel = %q, want C123ABC", posted[0].channel) + } + if posted[0].threadTS != "1234567890.123456" { + t.Errorf("threadTS = %q, want 1234567890.123456", posted[0].threadTS) + } + + // Verify annotations were persisted + var updated kelosv1alpha1.Task + if err := cl.Get(context.Background(), client.ObjectKeyFromObject(task), &updated); err != nil { + t.Fatalf("getting updated task: %v", err) + } + if updated.Annotations[AnnotationSlackReportPhase] != "accepted" { + t.Errorf("report phase = %q, want accepted", updated.Annotations[AnnotationSlackReportPhase]) + } + if updated.Annotations[AnnotationSlackReplyTS] != "1234567890.999999" { + t.Errorf("reply ts = %q, want 1234567890.999999", updated.Annotations[AnnotationSlackReplyTS]) + } +} + +func TestSlackTaskReporter_UpdatesExistingReply(t *testing.T) { + task := &kelosv1alpha1.Task{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-task", + Namespace: "default", + Annotations: map[string]string{ + AnnotationSlackReporting: "enabled", + AnnotationSlackChannel: "C123ABC", + AnnotationSlackThreadTS: "1234567890.123456", + AnnotationSlackReplyTS: "1234567890.999999", + AnnotationSlackReportPhase: "accepted", + }, + }, + Spec: kelosv1alpha1.TaskSpec{ + Type: "claude-code", + Prompt: "test", + Credentials: kelosv1alpha1.Credentials{ + Type: kelosv1alpha1.CredentialTypeOAuth, + SecretRef: &kelosv1alpha1.SecretReference{Name: "creds"}, + }, + }, + Status: kelosv1alpha1.TaskStatus{ + Phase: kelosv1alpha1.TaskPhaseSucceeded, + Results: map[string]string{"pr": "https://github.com/org/repo/pull/42"}, + }, + } + + cl := fake.NewClientBuilder().WithScheme(newTestScheme()).WithObjects(task).Build() + + var updated []slackReplyRecord + reporter := &fakeSlackReporter{ + updateFn: func(ctx context.Context, channel, messageTS, text string) error { + updated = append(updated, slackReplyRecord{method: "update", channel: channel, threadTS: messageTS, text: text}) + return nil + }, + } + + tr := &SlackTaskReporter{Client: cl, Reporter: reporter} + + if err := tr.ReportTaskStatus(context.Background(), task); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(updated) != 1 { + t.Fatalf("expected 1 update, got %d", len(updated)) + } + if updated[0].channel != "C123ABC" { + t.Errorf("channel = %q, want C123ABC", updated[0].channel) + } + // Verify the message includes the PR URL + wantText := FormatSlackSucceeded(task.Name, task.Status.Results) + if updated[0].text != wantText { + t.Errorf("text = %q, want %q", updated[0].text, wantText) + } +} + +func TestSlackTaskReporter_SkipPaths(t *testing.T) { + baseTask := &kelosv1alpha1.Task{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-task", + Namespace: "default", + Annotations: map[string]string{ + AnnotationSlackReporting: "enabled", + AnnotationSlackChannel: "C123ABC", + AnnotationSlackThreadTS: "1234567890.123456", + }, + }, + Status: kelosv1alpha1.TaskStatus{ + Phase: kelosv1alpha1.TaskPhasePending, + }, + } + + tests := []struct { + name string + mutate func(t *kelosv1alpha1.Task) + }{ + { + name: "no reporting annotation", + mutate: func(t *kelosv1alpha1.Task) { + delete(t.Annotations, AnnotationSlackReporting) + }, + }, + { + name: "already reported same phase", + mutate: func(t *kelosv1alpha1.Task) { + t.Annotations[AnnotationSlackReportPhase] = "accepted" + }, + }, + { + name: "nil annotations", + mutate: func(t *kelosv1alpha1.Task) { + t.Annotations = nil + }, + }, + { + name: "missing channel", + mutate: func(t *kelosv1alpha1.Task) { + delete(t.Annotations, AnnotationSlackChannel) + }, + }, + { + name: "empty phase", + mutate: func(t *kelosv1alpha1.Task) { + t.Status.Phase = "" + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + task := baseTask.DeepCopy() + tt.mutate(task) + + called := false + reporter := &fakeSlackReporter{ + postFn: func(ctx context.Context, channel, threadTS, text string) (string, error) { + called = true + return "", nil + }, + } + + cl := fake.NewClientBuilder().WithScheme(newTestScheme()).WithObjects(task).Build() + tr := &SlackTaskReporter{Client: cl, Reporter: reporter} + + if err := tr.ReportTaskStatus(context.Background(), task); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if called { + t.Error("expected reporter to not be called") + } + }) + } +} + +type slackReplyRecord struct { + method string + channel string + threadTS string + text string +} + +type fakeSlackReporter struct { + postFn func(ctx context.Context, channel, threadTS, text string) (string, error) + updateFn func(ctx context.Context, channel, messageTS, text string) error +} + +func (f *fakeSlackReporter) PostThreadReply(ctx context.Context, channel, threadTS, text string) (string, error) { + if f.postFn != nil { + return f.postFn(ctx, channel, threadTS, text) + } + return "fake-reply-ts", nil +} + +func (f *fakeSlackReporter) UpdateMessage(ctx context.Context, channel, messageTS, text string) error { + if f.updateFn != nil { + return f.updateFn(ctx, channel, messageTS, text) + } + return nil +} + +func TestSlackTaskReporter_PhaseMapping(t *testing.T) { + tests := []struct { + name string + phase kelosv1alpha1.TaskPhase + wantDesired string + shouldProcess bool + }{ + {"pending", kelosv1alpha1.TaskPhasePending, "accepted", true}, + {"running", kelosv1alpha1.TaskPhaseRunning, "accepted", true}, + {"waiting", kelosv1alpha1.TaskPhaseWaiting, "accepted", true}, + {"succeeded", kelosv1alpha1.TaskPhaseSucceeded, "succeeded", true}, + {"failed", kelosv1alpha1.TaskPhaseFailed, "failed", true}, + {"empty", "", "", false}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + task := &kelosv1alpha1.Task{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-task", + Namespace: "default", + Annotations: map[string]string{ + AnnotationSlackReporting: "enabled", + AnnotationSlackChannel: "C123", + AnnotationSlackThreadTS: "1234.5678", + }, + }, + Status: kelosv1alpha1.TaskStatus{ + Phase: tt.phase, + }, + } + + if tt.shouldProcess { + // Mark as already reported to verify skip logic + task.Annotations[AnnotationSlackReportPhase] = tt.wantDesired + } + + cl := fake.NewClientBuilder().WithScheme(newTestScheme()).WithObjects(task).Build() + tr := &SlackTaskReporter{Client: cl, Reporter: &SlackReporter{BotToken: "xoxb-test"}} + + // Should not error — either skips (empty phase) or skips (already reported) + if err := tr.ReportTaskStatus(context.Background(), task); err != nil { + t.Fatalf("unexpected error: %v", err) + } + }) + } +} diff --git a/internal/source/slack.go b/internal/source/slack.go index 6c60637b..8f9877b9 100644 --- a/internal/source/slack.go +++ b/internal/source/slack.go @@ -165,7 +165,7 @@ func (s *SlackSource) handleEventsAPI(sm *socketmode.Client, evt socketmode.Even s.mu.Lock() s.counter++ - item := buildWorkItem(innerEvent.TimeStamp, s.counter, userName, body, permalink, channelName) + item := buildWorkItem(innerEvent.TimeStamp, s.counter, userName, body, permalink, channelName, innerEvent.Channel) s.pending = append(s.pending, item) s.mu.Unlock() @@ -203,7 +203,7 @@ func (s *SlackSource) handleSlashCommand(sm *socketmode.Client, evt socketmode.E s.mu.Lock() s.counter++ itemID := fmt.Sprintf("%s:%s:%s", cmd.ChannelID, cmd.Command, cmd.TriggerID) - item := buildWorkItem(itemID, s.counter, userName, body, "", channelName) + item := buildWorkItem(itemID, s.counter, userName, body, "", channelName, cmd.ChannelID) s.pending = append(s.pending, item) s.mu.Unlock() @@ -271,14 +271,14 @@ func matchesUser(userID string, allowed []string) bool { } // buildWorkItem constructs a WorkItem from Slack message fields. -func buildWorkItem(id string, number int, userName, body, permalink, channelName string) WorkItem { +func buildWorkItem(id string, number int, userName, body, permalink, channelName, channelID string) WorkItem { return WorkItem{ ID: id, Number: number, Title: userName, Body: body, URL: permalink, - Labels: []string{channelName}, + Labels: []string{channelName, channelID}, Kind: "SlackMessage", } } diff --git a/internal/source/slack_test.go b/internal/source/slack_test.go index 02b025de..b985ffcd 100644 --- a/internal/source/slack_test.go +++ b/internal/source/slack_test.go @@ -221,7 +221,7 @@ func TestMatchesUser(t *testing.T) { } func TestBuildWorkItem(t *testing.T) { - item := buildWorkItem("1234567890.123456", 42, "Jane Doe", "fix the bug", "https://slack.com/link", "test-channel") + item := buildWorkItem("1234567890.123456", 42, "Jane Doe", "fix the bug", "https://slack.com/link", "test-channel", "C123ABC") if item.ID != "1234567890.123456" { t.Errorf("expected ID %q, got %q", "1234567890.123456", item.ID) @@ -238,8 +238,8 @@ func TestBuildWorkItem(t *testing.T) { if item.URL != "https://slack.com/link" { t.Errorf("expected URL %q, got %q", "https://slack.com/link", item.URL) } - if len(item.Labels) != 1 || item.Labels[0] != "test-channel" { - t.Errorf("expected Labels [test-channel], got %v", item.Labels) + if len(item.Labels) != 2 || item.Labels[0] != "test-channel" || item.Labels[1] != "C123ABC" { + t.Errorf("expected Labels [test-channel C123ABC], got %v", item.Labels) } if item.Kind != "SlackMessage" { t.Errorf("expected Kind %q, got %q", "SlackMessage", item.Kind)