Skip to content

Commit aa888d0

Browse files
DmitryNaumovDmitry Naumovkprokopenko
authored
Remove race condition in stream_reconnector.go (#1902)
Co-authored-by: Dmitry Naumov <[email protected]> Co-authored-by: Konstantin Prokopenko <[email protected]>
1 parent b51f44f commit aa888d0

File tree

2 files changed

+17
-3
lines changed

2 files changed

+17
-3
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Fixed race in `readerReconnector`
2+
13
## v3.117.1
24
* Fixed scan a column of type `Decimal(precision,scale)` into a struct field of type `types.Decimal{}` using `ScanStruct()`
35
* Fixed race in integration test `TestTopicWriterLogMessagesWithoutData`

internal/topic/topicreaderinternal/stream_reconnector.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -212,9 +212,21 @@ func (r *readerReconnector) CloseWithError(ctx context.Context, reason error) er
212212
r.closeOnce.Do(func() {
213213
closeErr = r.background.Close(ctx, reason)
214214

215-
if r.streamVal != nil {
216-
streamCloseErr := r.streamVal.CloseWithError(ctx, xerrors.WithStackTrace(errReaderClosed))
217-
r.streamContextCancel(errReaderClosed)
215+
// Get references under lock
216+
var streamVal batchedStreamReader
217+
var streamCancel context.CancelCauseFunc
218+
219+
r.m.WithLock(func() {
220+
streamVal = r.streamVal
221+
streamCancel = r.streamContextCancel
222+
})
223+
224+
// Make I/O calls outside the lock
225+
if streamVal != nil {
226+
streamCloseErr := streamVal.CloseWithError(ctx, xerrors.WithStackTrace(errReaderClosed))
227+
if streamCancel != nil {
228+
streamCancel(errReaderClosed)
229+
}
218230
if closeErr == nil {
219231
closeErr = streamCloseErr
220232
}

0 commit comments

Comments
 (0)