feat: implement SlackSource with Socket Mode listener #36
jkahuja wants to merge 3 commits into AIE-17-slack-api-types
Add SlackSource that connects to Slack via Socket Mode and accumulates WorkItems from channel messages. Discover() drains the queue each cycle.

Event processing logic (shouldProcess, matchesChannel, matchesUser, buildWorkItem) is extracted as pure functions for testability. The Socket Mode goroutine is a thin glue layer.

Filtering: ignores threaded replies, bot messages, and self-messages. Optional trigger command prefix support with stripping.

Co-Authored-By: Claude Opus 4.6 <[email protected]>
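The filtering rules above can be illustrated with a minimal sketch. The PR's real types and signatures are not shown in this conversation, so `messageEvent`, its fields, the parameter lists, and `matchesAllowList` are hypothetical stand-ins. Treating any non-empty thread timestamp as a threaded reply is also a simplification.

```go
package main

import "strings"

// messageEvent is a hypothetical, trimmed-down stand-in for a Slack message
// event. The PR's actual event type is not shown here.
type messageEvent struct {
	User            string // ID of the posting user
	BotID           string // non-empty when a bot posted the message
	ThreadTimestamp string // non-empty for threaded replies (simplified)
	Text            string
}

// shouldProcess reports whether a message should become a WorkItem and
// returns the text with the trigger prefix stripped. An empty trigger
// disables prefix matching, so every remaining message is accepted.
func shouldProcess(ev messageEvent, selfUserID, trigger string) (string, bool) {
	if ev.ThreadTimestamp != "" { // threaded reply
		return "", false
	}
	if ev.BotID != "" { // bot message
		return "", false
	}
	if ev.User == selfUserID { // our own message
		return "", false
	}
	if trigger == "" {
		return ev.Text, true
	}
	if !strings.HasPrefix(ev.Text, trigger) {
		return "", false
	}
	// Strip the trigger and any whitespace that separated it from the body.
	return strings.TrimSpace(strings.TrimPrefix(ev.Text, trigger)), true
}

// matchesAllowList implements "empty list = allow all" for channel or user IDs.
func matchesAllowList(allowed []string, id string) bool {
	if len(allowed) == 0 {
		return true
	}
	for _, a := range allowed {
		if a == id {
			return true
		}
	}
	return false
}
```

Because both functions take plain values and return plain values, they can be covered by table-driven tests without a Slack connection, which is the testability argument the commit message makes.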
Greptile Summary
This PR introduces
Key changes:
Remaining concerns:
Confidence Score: 4/5
Safe to merge once the shared enrichment timeout and event-loop goroutine-leak concerns are addressed; all prior P0/P1 races are resolved.
All previously flagged data races (started flag, cancel field) are fixed. The three remaining findings are P2: none will cause incorrect WorkItems or data loss, but the goroutine-leak risk on Stop() and the stalled event loop are reliability concerns worth fixing before production. The score of 4 rather than 5 reflects that these are real reliability issues, not purely cosmetic.
internal/source/slack.go: enrichment timeout sharing (lines 141-164) and event-loop goroutine exit path (lines 89-98).
Important Files Changed
Sequence Diagram
sequenceDiagram
participant Spawner
participant SlackSource
participant SocketMode as socketmode.Client
participant SlackAPI as Slack API
Spawner->>SlackSource: Discover(ctx)
note over SlackSource: startOnce.Do fires on first call
SlackSource->>SlackAPI: AuthTestContext(ctx)
SlackAPI-->>SlackSource: selfUserID
SlackSource->>SocketMode: RunContext(bgCtx) [goroutine]
SlackSource->>SocketMode: range sm.Events [goroutine]
SlackSource-->>Spawner: [] (empty first cycle)
SlackAPI-->>SocketMode: WebSocket event (message)
SocketMode-->>SlackSource: EventTypeEventsAPI
SlackSource->>SlackSource: shouldProcess() / matchesChannel() / matchesUser()
SlackSource->>SlackAPI: GetUserInfoContext(enrichCtx)
SlackSource->>SlackAPI: GetPermalinkContext(enrichCtx)
SlackSource->>SlackAPI: GetConversationInfoContext(enrichCtx)
SlackSource->>SlackSource: mu.Lock → append(pending, item)
Spawner->>SlackSource: Discover(ctx)
SlackSource->>SlackSource: mu.Lock → drain pending
SlackSource-->>Spawner: [WorkItem{Kind:"SlackMessage"}]
Spawner->>SlackSource: Stop()
SlackSource->>SlackSource: mu.Lock → cancel()
SocketMode-->>SocketMode: RunContext returns
Reviews (3): Last reviewed commit: "fix: guard s.cancel with mutex to preven..."
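The append-then-drain steps in the diagram can be sketched as follows. The type and method names (`workItem`, `queue`, `push`, `drain`) are hypothetical and only illustrate the pattern; the PR's actual fields beyond `mu` and `pending` are not shown in this conversation.

```go
package main

import "sync"

// workItem is a hypothetical stand-in for the project's WorkItem type.
type workItem struct{ ID string }

// queue accumulates items from the event goroutine until the next drain.
type queue struct {
	mu      sync.Mutex
	pending []workItem
}

// push is what the event handler would call after filtering and enrichment.
func (q *queue) push(item workItem) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.pending = append(q.pending, item)
}

// drain hands the accumulated items to the caller and resets the queue.
// Swapping the slice out under the lock keeps the critical section short
// and guarantees each item is returned exactly once.
func (q *queue) drain() []workItem {
	q.mu.Lock()
	defer q.mu.Unlock()
	items := q.pending
	q.pending = nil
	return items
}
```

A Discover()-style method would call `drain()` once per cycle, which is why the first cycle in the diagram returns an empty result: nothing has been pushed yet.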
internal/source/slack.go
Outdated
if !s.started {
	if err := s.Start(ctx); err != nil {
		return nil, fmt.Errorf("Starting Slack source: %w", err)
	}
	s.started = true
}
s.started is read and written in Discover() without holding s.mu. If Discover() is ever called concurrently (e.g., in tests or if the spawner loop is parallelised later), two goroutines could both see started == false, call Start() twice, and create duplicate goroutines and double-initialise s.api, s.selfUserID, and s.cancel. The Go race detector will flag this.
The pending/counter state is properly guarded by s.mu, but the start-up guard is not. Either extend the mutex coverage or use sync.Once for lazy initialisation:
// Option A – guard with sync.Once (idiomatic Go lazy-init)
type SlackSource struct {
	...
	startOnce sync.Once
	startErr  error
	...
}

func (s *SlackSource) Discover(ctx context.Context) ([]WorkItem, error) {
	s.startOnce.Do(func() {
		s.startErr = s.Start(ctx)
	})
	if s.startErr != nil {
		return nil, fmt.Errorf("Starting Slack source: %w", s.startErr)
	}
	...
}
Fixed in 38af305 — replaced started bool with sync.Once as suggested.
internal/source/slack.go
Outdated
sm := socketmode.New(s.api)

ctx, s.cancel = context.WithCancel(ctx)
Socket Mode lifetime tied to first Discover() call's context
context.WithCancel(ctx) is derived from the context passed into Start(), which is the same context the caller passed to the very first Discover() call. If that context is ever cancelled (e.g., during a reconcile timeout, a test cleanup, or controller restart), sm.RunContext will return and the background Socket Mode connection will terminate. Because s.started is already true at that point, subsequent calls to Discover() will skip Start() and silently return no items — the source becomes permanently dead without any error surfaced to the caller.
Consider basing the Socket Mode goroutine on a longer-lived context (e.g., the operator manager's lifecycle context, passed separately) rather than re-using the per-call Discover context:
// Start receives a dedicated lifecycle context, not the per-Discover ctx.
func (s *SlackSource) Start(lifecycleCtx context.Context) error {
	...
	// := cannot assign to a struct field, so bind a local first.
	lctx, cancel := context.WithCancel(lifecycleCtx)
	s.cancel = cancel
	go func() { sm.RunContext(lctx) }()
	...
}

Alternatively, detect that the context has been cancelled and reset started so the next Discover() call re-initialises the connection.
Fixed in 38af305 — Socket Mode goroutine now uses context.Background() as its base. The Discover ctx is only used for the initial AuthTestContext call.
- Replace started bool with sync.Once to fix potential data race
- Use context.Background() for Socket Mode goroutine lifetime
- Add 5-second timeout for enrichment API calls
- Use composite key for slash command work item ID
- Filter message_changed, message_deleted, message_replied subtypes
- Add test cases for new filtered subtypes

Towards AIE-17

Co-Authored-By: Claude Opus 4.6 <[email protected]>
@greptile re-review
// Stop shuts down the Socket Mode listener.
func (s *SlackSource) Stop() {
	if s.cancel != nil {
		s.cancel()
	}
}
Data race on s.cancel in Stop()
s.cancel is written in Start() (inside startOnce.Do) but read in Stop() without any synchronisation. If Stop() is called concurrently while Discover() is still inside startOnce.Do (i.e., Start() hasn't yet returned), the Go race detector will flag a concurrent read/write on s.cancel. The previous race on s.started was fixed with sync.Once, but s.cancel still has the same problem.
A minimal fix is to guard access through the existing mutex:
func (s *SlackSource) Stop() {
	s.mu.Lock()
	cancel := s.cancel
	s.mu.Unlock()
	if cancel != nil {
		cancel()
	}
}

And symmetrically in Start(), assign s.cancel while holding s.mu:

bgCtx, bgCancel := context.WithCancel(context.Background())
s.mu.Lock()
s.cancel = bgCancel
s.mu.Unlock()
Fixed in 1148eee — both Start() and Stop() now access s.cancel under s.mu.
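The review summary's remaining concern about the event-loop goroutine exit path could be approached along these lines. This is a sketch under an assumption: that the loop reads from an events channel that may never be closed, so it needs to watch the context as well. The names (`eventLoop`, `source`, `start`, `stop`) are hypothetical, and the mutex handling mirrors the fix above.

```go
package main

import (
	"context"
	"sync"
)

// eventLoop consumes events until ctx is cancelled or the channel closes.
// Selecting on ctx.Done() lets it exit even if events is never closed.
func eventLoop(ctx context.Context, events <-chan string, handle func(string)) {
	for {
		select {
		case <-ctx.Done():
			return
		case ev, ok := <-events:
			if !ok {
				return
			}
			handle(ev)
		}
	}
}

// source is a hypothetical stand-in showing the guarded cancel plus a
// WaitGroup so stop can confirm the loop goroutine has exited.
type source struct {
	mu     sync.Mutex
	cancel context.CancelFunc
	wg     sync.WaitGroup
}

func (s *source) start(events <-chan string, handle func(string)) {
	ctx, cancel := context.WithCancel(context.Background())
	s.mu.Lock()
	s.cancel = cancel
	s.mu.Unlock()
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		eventLoop(ctx, events, handle)
	}()
}

// stop cancels the loop and blocks until its goroutine has returned.
func (s *source) stop() {
	s.mu.Lock()
	cancel := s.cancel
	s.mu.Unlock()
	if cancel != nil {
		cancel()
	}
	s.wg.Wait()
}
```

Waiting in `stop` makes shutdown observable in tests, at the price of blocking the caller while an in-flight `handle` call finishes.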
s.cancel is written in Start() and read in Stop(); without synchronization this is a race if Stop() is called while Start() is still running inside startOnce.Do.

Towards AIE-17

Co-Authored-By: Claude Opus 4.6 <[email protected]>
@greptile re-review
What type of PR is this?
/kind feature
What this PR does / why we need it:
Implements SlackSource, the core Go code that connects to Slack via Socket Mode and produces WorkItems through the existing Source interface.

Architecture: A background goroutine maintains a persistent WebSocket connection to Slack. Incoming messages are filtered and accumulated in a queue. Discover() drains the queue each cycle, matching the existing spawner loop pattern.

Event processing is split into pure, testable functions:
- shouldProcess(): filters out threaded replies, bot messages, self-messages, and applies trigger command prefix matching
- matchesChannel() / matchesUser(): allow-list filtering (empty list = allow all)
- buildWorkItem(): constructs WorkItem with Kind="SlackMessage"

Key behaviors:
- AuthTest() on startup to filter self-messages
- Discover() call

Which issue(s) this PR is related to:
Part of kelos-dev#595
Towards AIE-17
Special notes for your reviewer:
This is PR 2 of 4 for the Slack integration, stacked on PR #35 (API types).
The Socket Mode glue layer (Start(), event handlers) is intentionally thin and not unit-tested; it will be validated via integration/e2e testing after PR 3 wires everything together. All filtering and WorkItem construction logic is tested via the extracted pure functions.

Does this PR introduce a user-facing change?