Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/prometheus/client_golang v1.23.2
github.com/prometheus/client_model v0.6.2
github.com/robfig/cron/v3 v3.0.1
github.com/slack-go/slack v0.20.0
github.com/spf13/cobra v1.10.2
github.com/stretchr/testify v1.11.1
go.uber.org/zap v1.27.0
Expand Down Expand Up @@ -59,6 +60,7 @@ require (
github.com/google/go-querystring v1.1.0 // indirect
github.com/google/pprof v0.0.0-20250403155104-27863c87afa6 // indirect
github.com/google/renameio/v2 v2.0.0 // indirect
github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
github.com/huandu/xstrings v1.5.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ github.com/go-quicktest/qt v1.101.0 h1:O1K29Txy5P2OK0dGo59b7b0LR6wKfIhttaAhHUyn7
github.com/go-quicktest/qt v1.101.0/go.mod h1:14Bz/f7NwaXPtdYEgzsx46kqSxVwTbzVZsDC26tQJow=
github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI=
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
github.com/go-test/deep v1.1.1 h1:0r/53hagsehfO4bzD2Pgr/+RgHqhmf+k1Bpse2cTu1U=
github.com/go-test/deep v1.1.1/go.mod h1:5C2ZWiW0ErCdrYzpqxLbTX7MG14M9iiw8DgHncVwcsE=
github.com/gobuffalo/flect v1.0.3 h1:xeWBM2nui+qnVvNM4S3foBhCAL2XgPU+a7FdpelbTq4=
github.com/gobuffalo/flect v1.0.3/go.mod h1:A5msMlrHtLqh9umBSnvabjsMrCcCpAyzglnDvkbYKHs=
github.com/gobwas/glob v0.2.3 h1:A4xDbljILXROh+kObIiy5kIaPYD8e96x1tgBhUI5J+Y=
Expand Down Expand Up @@ -106,6 +108,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/yamlfmt v0.21.0 h1:9FKApQkDpMKgBjwLFytBHUCgqnQgxaQnci0uiESfbzs=
github.com/google/yamlfmt v0.21.0/go.mod h1:q6FYExB+Ueu7jZDjKECJk+EaeDXJzJ6Ne0dxx69GWfI=
github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 h1:JeSE6pjso5THxAzdVpqr6/geYxZytqFMBCOtn/ujyeo=
github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674/go.mod h1:r4w70xmWCQKmi1ONH4KIaBptdivuRPyosB9RmPlGEwA=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 h1:5ZPtiqj0JL5oKWmcsq4VMaAW5ukBEgSGXEN89zeH1Jo=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3/go.mod h1:ndYquD05frm2vACXE1nsccT4oJzjhw2arTS2cpUD1PI=
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
Expand Down Expand Up @@ -190,6 +194,8 @@ github.com/santhosh-tekuri/jsonschema/v6 v6.0.2 h1:KRzFb2m7YtdldCEkzs6KqmJw4nqEV
github.com/santhosh-tekuri/jsonschema/v6 v6.0.2/go.mod h1:JXeL+ps8p7/KNMjDQk3TCwPpBy0wYklyWTfbkIzdIFU=
github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k=
github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME=
github.com/slack-go/slack v0.20.0 h1:gbDdbee8+Z2o+DWx05Spq3GzbrLLleiRwHUKs+hZLSU=
github.com/slack-go/slack v0.20.0/go.mod h1:K81UmCivcYd/5Jmz8vLBfuyoZ3B4rQC2GHVXHteXiAE=
github.com/spf13/afero v1.12.0 h1:UcOPyRBYczmFn6yvphxkn9ZEOY65cpwGKb5mL36mrqs=
github.com/spf13/afero v1.12.0/go.mod h1:ZTlWwG4/ahT8W7T0WQ5uYmjI9duaLQGy3Q2OAl4sk/4=
github.com/spf13/cast v1.7.0 h1:ntdiHjuueXFgm5nzDRdOS4yfT43P5Fnud6DH50rz/7w=
Expand Down
271 changes: 271 additions & 0 deletions internal/source/slack.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,271 @@
package source

import (
"context"
"fmt"
"strings"
"sync"

"github.com/slack-go/slack"
"github.com/slack-go/slack/slackevents"
"github.com/slack-go/slack/socketmode"
ctrl "sigs.k8s.io/controller-runtime"
)

// SlackSource discovers work items from Slack messages via Socket Mode.
// A background goroutine listens for Slack events and accumulates WorkItems
// in an internal queue. Discover() drains the queue on each call.
type SlackSource struct {
// BotToken is the Bot User OAuth Token (xoxb-...).
BotToken string
// AppToken is the App-Level Token for Socket Mode (xapp-...).
AppToken string
// TriggerCommand is an optional slash command or message prefix.
// When empty, every non-threaded message triggers a task.
TriggerCommand string
// Channels restricts listening to specific channel IDs. Empty = all.
Channels []string
// AllowedUsers restricts which user IDs can trigger tasks. Empty = all.
AllowedUsers []string

mu sync.Mutex
pending []WorkItem
counter int
started bool
selfUserID string
api *slack.Client
cancel context.CancelFunc
}

// Discover returns accumulated WorkItems since the last call.
// On the first call it starts the Socket Mode listener.
func (s *SlackSource) Discover(ctx context.Context) ([]WorkItem, error) {
if !s.started {
if err := s.Start(ctx); err != nil {
return nil, fmt.Errorf("Starting Slack source: %w", err)
}
s.started = true
}

s.mu.Lock()
items := s.pending
s.pending = nil
s.mu.Unlock()

return items, nil
}

// Start connects to Slack via Socket Mode and begins listening for events.
func (s *SlackSource) Start(ctx context.Context) error {
log := ctrl.Log.WithName("slack-source")

s.api = slack.New(
s.BotToken,
slack.OptionAppLevelToken(s.AppToken),
)

authResp, err := s.api.AuthTestContext(ctx)
if err != nil {
return fmt.Errorf("Slack auth test failed: %w", err)
}
s.selfUserID = authResp.UserID
log.Info("Authenticated with Slack", "botUserID", s.selfUserID)

sm := socketmode.New(s.api)

ctx, s.cancel = context.WithCancel(ctx)

go func() {
if err := sm.RunContext(ctx); err != nil {
log.Error(err, "Socket Mode connection closed")
}
}()

go func() {
for evt := range sm.Events {
switch evt.Type {
case socketmode.EventTypeEventsAPI:
s.handleEventsAPI(sm, evt)
case socketmode.EventTypeSlashCommand:
s.handleSlashCommand(sm, evt)
}
}
}()

return nil
}

// Stop shuts down the Socket Mode listener.
func (s *SlackSource) Stop() {
if s.cancel != nil {
s.cancel()
}
}

func (s *SlackSource) handleEventsAPI(sm *socketmode.Client, evt socketmode.Event) {
log := ctrl.Log.WithName("slack-source")

eventsAPIEvent, ok := evt.Data.(slackevents.EventsAPIEvent)
if !ok {
sm.Ack(*evt.Request)
return
}
sm.Ack(*evt.Request)

innerEvent, ok := eventsAPIEvent.InnerEvent.Data.(*slackevents.MessageEvent)
if !ok {
return
}

body, ok := shouldProcess(innerEvent.User, innerEvent.SubType, innerEvent.ThreadTimeStamp, innerEvent.Text, s.selfUserID, s.TriggerCommand)
if !ok {
return
}

if !matchesChannel(innerEvent.Channel, s.Channels) {
return
}
if !matchesUser(innerEvent.User, s.AllowedUsers) {
return
}

userName := innerEvent.User
if info, err := s.api.GetUserInfo(innerEvent.User); err == nil {
userName = info.RealName
if userName == "" {
userName = info.Name
}
}

permalink := ""
if link, err := s.api.GetPermalink(&slack.PermalinkParameters{
Channel: innerEvent.Channel,
Ts: innerEvent.TimeStamp,
}); err == nil {
permalink = link
}

channelName := innerEvent.Channel
if info, err := s.api.GetConversationInfo(&slack.GetConversationInfoInput{
ChannelID: innerEvent.Channel,
}); err == nil {
channelName = info.Name
}

s.mu.Lock()
s.counter++
item := buildWorkItem(innerEvent.TimeStamp, s.counter, userName, body, permalink, channelName)
s.pending = append(s.pending, item)
s.mu.Unlock()

log.Info("Queued Slack message as work item", "number", item.Number, "user", userName, "channel", channelName)
}

func (s *SlackSource) handleSlashCommand(sm *socketmode.Client, evt socketmode.Event) {
log := ctrl.Log.WithName("slack-source")

cmd, ok := evt.Data.(slack.SlashCommand)
if !ok {
sm.Ack(*evt.Request)
return
}
sm.Ack(*evt.Request)

if cmd.UserID == s.selfUserID {
return
}
if !matchesChannel(cmd.ChannelID, s.Channels) {
return
}
if !matchesUser(cmd.UserID, s.AllowedUsers) {
return
}

body := strings.TrimSpace(cmd.Text)
if body == "" {
return
}

userName := cmd.UserName
channelName := cmd.ChannelName

s.mu.Lock()
s.counter++
item := buildWorkItem(cmd.TriggerID, s.counter, userName, body, "", channelName)
s.pending = append(s.pending, item)
s.mu.Unlock()

log.Info("Queued slash command as work item", "number", item.Number, "user", userName, "channel", channelName)
}

// shouldProcess decides whether a Slack message should become a WorkItem.
// It returns the processed body text and true if the message should trigger,
// or an empty string and false if it should be ignored.
func shouldProcess(userID, subtype, threadTS, text, selfUserID, triggerCmd string) (string, bool) {
if userID == selfUserID {
return "", false
}
if subtype == "bot_message" {
return "", false
}
if threadTS != "" {
return "", false
}
if text == "" {
return "", false
}

if triggerCmd != "" {
if !strings.HasPrefix(text, triggerCmd) {
return "", false
}
body := strings.TrimSpace(strings.TrimPrefix(text, triggerCmd))
if body == "" {
return "", false
}
return body, true
}

return text, true
}

// matchesChannel returns true if channelID is in the allowed list,
// or if the allowed list is empty (all channels permitted).
func matchesChannel(channelID string, allowed []string) bool {
if len(allowed) == 0 {
return true
}
for _, id := range allowed {
if id == channelID {
return true
}
}
return false
}

// matchesUser returns true if userID is in the allowed list,
// or if the allowed list is empty (all users permitted).
func matchesUser(userID string, allowed []string) bool {
if len(allowed) == 0 {
return true
}
for _, id := range allowed {
if id == userID {
return true
}
}
return false
}

// buildWorkItem constructs a WorkItem from Slack message fields.
func buildWorkItem(id string, number int, userName, body, permalink, channelName string) WorkItem {
return WorkItem{
ID: id,
Number: number,
Title: userName,
Body: body,
URL: permalink,
Labels: []string{channelName},
Kind: "SlackMessage",
}
}
Loading
Loading