Skip to content

Commit b51f44f

Browse files
Copilotasmyasnikovrekby
authored
Fix race condition in TestStreamListener_WorkerCreationAndRouting (#1912)
Co-authored-by: copilot-swe-agent[bot] <[email protected]> Co-authored-by: asmyasnikov <[email protected]> Co-authored-by: Aleksey Myasnikov <[email protected]> Co-authored-by: Timofey Koolin <[email protected]>
1 parent 142ad22 commit b51f44f

File tree

1 file changed

+14
-0
lines changed

1 file changed

+14
-0
lines changed

internal/topic/topiclistenerinternal/stream_listener_test.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,16 @@ func TestStreamListener_WorkerCreationAndRouting(t *testing.T) {
2525
// Initially no workers should exist
2626
require.Empty(t, listener.workers)
2727

28+
// Channel to signal when handler has been called
29+
handlerCalled := make(chan struct{})
30+
2831
// Set up mock expectations - the worker will call OnStartPartitionSessionRequest
2932
EventHandlerMock(e).EXPECT().OnStartPartitionSessionRequest(
3033
gomock.Any(),
3134
gomock.Any(),
3235
).DoAndReturn(func(ctx context.Context, event *PublicEventStartPartitionSession) error {
3336
event.Confirm()
37+
close(handlerCalled)
3438

3539
return nil
3640
})
@@ -56,6 +60,9 @@ func TestStreamListener_WorkerCreationAndRouting(t *testing.T) {
5660
// Should have created a worker
5761
require.Len(t, listener.workers, 1)
5862

63+
// Waiting for add session to internals
64+
xtest.WaitChannelClosed(t, handlerCalled)
65+
5966
// Verify session was added
6067
session, err := listener.sessions.Get(100)
6168
require.NoError(t, err)
@@ -166,12 +173,16 @@ func TestStreamListener_CloseWorkers(t *testing.T) {
166173
ctx := sf.Context(e)
167174
listener := StreamListener(e)
168175

176+
// Channel to signal when handler has been called
177+
handlerCalled := make(chan struct{})
178+
169179
// Set up mock expectations
170180
EventHandlerMock(e).EXPECT().OnStartPartitionSessionRequest(
171181
gomock.Any(),
172182
gomock.Any(),
173183
).DoAndReturn(func(ctx context.Context, event *PublicEventStartPartitionSession) error {
174184
event.Confirm()
185+
close(handlerCalled)
175186

176187
return nil
177188
})
@@ -195,6 +206,9 @@ func TestStreamListener_CloseWorkers(t *testing.T) {
195206
require.NoError(t, err)
196207
require.Len(t, listener.workers, 1)
197208

209+
// Wait for the handler to be called by the worker
210+
xtest.WaitChannelClosed(t, handlerCalled)
211+
198212
// Close the listener - this might fail if background worker is already closed by test cleanup
199213
// That's expected behavior in test environment
200214
_ = listener.Close(ctx, errors.New("test close"))

0 commit comments

Comments
 (0)