Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(POC V2) GODRIVER-3173 Complete pending reads on conn checkout #1977

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 19 additions & 14 deletions event/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,17 +75,20 @@ const (

// strings for pool command monitoring types
const (
ConnectionPoolCreated = "ConnectionPoolCreated"
ConnectionPoolReady = "ConnectionPoolReady"
ConnectionPoolCleared = "ConnectionPoolCleared"
ConnectionPoolClosed = "ConnectionPoolClosed"
ConnectionCreated = "ConnectionCreated"
ConnectionReady = "ConnectionReady"
ConnectionClosed = "ConnectionClosed"
ConnectionCheckOutStarted = "ConnectionCheckOutStarted"
ConnectionCheckOutFailed = "ConnectionCheckOutFailed"
ConnectionCheckedOut = "ConnectionCheckedOut"
ConnectionCheckedIn = "ConnectionCheckedIn"
ConnectionPoolCreated = "ConnectionPoolCreated"
ConnectionPoolReady = "ConnectionPoolReady"
ConnectionPoolCleared = "ConnectionPoolCleared"
ConnectionPoolClosed = "ConnectionPoolClosed"
ConnectionCreated = "ConnectionCreated"
ConnectionReady = "ConnectionReady"
ConnectionClosed = "ConnectionClosed"
ConnectionCheckOutStarted = "ConnectionCheckOutStarted"
ConnectionCheckOutFailed = "ConnectionCheckOutFailed"
ConnectionCheckedOut = "ConnectionCheckedOut"
ConnectionCheckedIn = "ConnectionCheckedIn"
ConnectionPendingReadStarted = "ConnectionPendingReadStarted"
ConnectionPendingReadSucceeded = "ConnectionPendingReadSucceeded"
ConnectionPendingReadFailed = "ConnectionPendingReadFailed"
)

// MonitorPoolOptions contains pool options as formatted in pool events
Expand All @@ -105,9 +108,11 @@ type PoolEvent struct {
Reason string `json:"reason"`
// ServiceID is only set if the Type is PoolCleared and the server is deployed behind a load balancer. This field
// can be used to distinguish between individual servers in a load balanced deployment.
ServiceID *bson.ObjectID `json:"serviceId"`
Interruption bool `json:"interruptInUseConnections"`
Error error `json:"error"`
ServiceID *bson.ObjectID `json:"serviceId"`
Interruption bool `json:"interruptInUseConnections"`
Error error `json:"error"`
RequestID int32 `json:"requestId"`
RemainingTime time.Duration `json:"remainingTime"`
}

// PoolMonitor is a function that allows the user to gain access to events occurring in the pool
Expand Down
49 changes: 49 additions & 0 deletions internal/driverutil/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright (C) MongoDB, Inc. 2025-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0

package driverutil

import "context"

// ContextKey is a custom type used for the keys in context values to avoid
// collisions.
type ContextKey string

const (
// ContextKeyHasMaxTimeMS represents a boolean value that indicates if
// maxTimeMS will be set on the wire message for an operation.
ContextKeyHasMaxTimeMS ContextKey = "hasMaxTimeMS"

// ContextKeyRequestID is the requestID for a given operation. This is used to
// propagate the requestID for a pending read during connection check out.
ContextKeyRequestID ContextKey = "requestID"
)

// WithValueHasMaxTimeMS returns a copy of the parent context with an added
// value indicating whether an operation will append maxTimeMS to the wire
// message.
func WithValueHasMaxTimeMS(parentCtx context.Context, val bool) context.Context {
return context.WithValue(parentCtx, ContextKeyHasMaxTimeMS, val)
}

// WithRequestID returns a copy of the parent context with an added request ID
// value.
func WithRequestID(parentCtx context.Context, requestID int32) context.Context {
return context.WithValue(parentCtx, ContextKeyRequestID, requestID)
}

// HasMaxTimeMS checks if the context is for an operation that will append
// maxTimeMS to the wire message.
func HasMaxTimeMS(ctx context.Context) bool {
return ctx.Value(ContextKeyHasMaxTimeMS) != nil
}

// GetRequestID retrieves the request ID from the context if it exists.
func GetRequestID(ctx context.Context) (int32, bool) {
val, ok := ctx.Value(ContextKeyRequestID).(int32)

return val, ok
}
58 changes: 38 additions & 20 deletions internal/integration/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"os"
"reflect"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -675,9 +676,9 @@ func TestClient(t *testing.T) {
},
}

_, err := mt.Coll.InsertOne(context.Background(), bson.D{})
for _, tc := range testCases {
mt.Run(tc.desc, func(mt *mtest.T) {
_, err := mt.Coll.InsertOne(context.Background(), bson.D{})
require.NoError(mt, err)

mt.SetFailPoint(failpoint.FailPoint{
Expand All @@ -692,30 +693,47 @@ func TestClient(t *testing.T) {

mt.ClearEvents()

wg := sync.WaitGroup{}
wg.Add(50)

for i := 0; i < 50; i++ {
// Run 50 operations, each with a timeout of 50ms. Expect
// Run 50 concurrent operations, each with a timeout of 50ms. Expect
// them to all return a timeout error because the failpoint
// blocks find operations for 500ms. Run 50 to increase the
// blocks find operations for 50ms. Run 50 to increase the
// probability that an operation will time out in a way that
// can cause a retry.
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
err = tc.operation(ctx, mt.Coll)
cancel()
assert.ErrorIs(mt, err, context.DeadlineExceeded)
assert.True(mt, mongo.IsTimeout(err), "expected mongo.IsTimeout(err) to be true")

// Assert that each operation reported exactly one command
// started events, which means the operation did not retry
// after the context timeout.
evts := mt.GetAllStartedEvents()
require.Len(mt,
mt.GetAllStartedEvents(),
1,
"expected exactly 1 command started event per operation, but got %d after %d iterations",
len(evts),
i)
mt.ClearEvents()
go func() {
ctx, cancel := context.WithTimeout(context.Background(), 150*time.Millisecond)
err := tc.operation(ctx, mt.Coll)
cancel()
assert.ErrorIs(mt, err, context.DeadlineExceeded)
assert.True(mt, mongo.IsTimeout(err), "expected mongo.IsTimeout(err) to be true")

wg.Done()
}()
}

wg.Wait()

// Since an operation requires checking out a connection and because we
// attempt a pending read for socket timeouts and since the test forces
// 50 concurrent socket timeouts, then it's possible that an
// operation checks out a connection that has a pending read. In this
// case the operation will time out when checking out a connection, and
// a started event will not be propagated. So instead of
// checking that we got exactly 50 started events, we should instead
// ensure that the number of started events is equal to the number of
// unique connections used to process the operations.
pendingReadConns := mt.NumberConnectionsPendingReadStarted()
evts := mt.GetAllStartedEvents()

require.Equal(mt,
len(evts)+pendingReadConns,
50,
"expected exactly 1 command started event per operation (50), but got %d",
len(evts)+pendingReadConns)
mt.ClearEvents()
mt.ClearFailPoints()
})
}
})
Expand Down
Loading
Loading