From a805a43d8f1c3d9c4f9491123967142f32edcb8c Mon Sep 17 00:00:00 2001
From: Preston Vasquez <prestonvasquez@icloud.com>
Date: Wed, 5 Mar 2025 16:21:48 -0700
Subject: [PATCH 1/3] GODRIVER-3173 Complete pending reads on conn checkout

---
 event/monitoring.go                           |  33 +-
 internal/driverutil/context.go                |  49 ++
 internal/integration/client_test.go           |  58 +-
 internal/integration/csot_test.go             | 153 +++---
 internal/integration/mtest/mongotest.go       |  25 +-
 internal/logger/component.go                  |   3 +
 .../pending-reads.json                        | 519 ++++++++++++++++++
 .../pending-reads.yml                         | 312 +++++++++++
 x/mongo/driver/operation.go                   |   8 +
 x/mongo/driver/topology/connection.go         |  45 +-
 x/mongo/driver/topology/pool.go               | 235 +++++---
 x/mongo/driver/topology/pool_test.go          | 221 ++++----
 12 files changed, 1388 insertions(+), 273 deletions(-)
 create mode 100644 internal/driverutil/context.go
 create mode 100644 testdata/client-side-operations-timeout/pending-reads.json
 create mode 100644 testdata/client-side-operations-timeout/pending-reads.yml

diff --git a/event/monitoring.go b/event/monitoring.go
index 2ca98969d7..4965a55528 100644
--- a/event/monitoring.go
+++ b/event/monitoring.go
@@ -75,17 +75,20 @@ const (
 
 // strings for pool command monitoring types
 const (
-	ConnectionPoolCreated     = "ConnectionPoolCreated"
-	ConnectionPoolReady       = "ConnectionPoolReady"
-	ConnectionPoolCleared     = "ConnectionPoolCleared"
-	ConnectionPoolClosed      = "ConnectionPoolClosed"
-	ConnectionCreated         = "ConnectionCreated"
-	ConnectionReady           = "ConnectionReady"
-	ConnectionClosed          = "ConnectionClosed"
-	ConnectionCheckOutStarted = "ConnectionCheckOutStarted"
-	ConnectionCheckOutFailed  = "ConnectionCheckOutFailed"
-	ConnectionCheckedOut      = "ConnectionCheckedOut"
-	ConnectionCheckedIn       = "ConnectionCheckedIn"
+	ConnectionPoolCreated          = "ConnectionPoolCreated"
+	ConnectionPoolReady            = "ConnectionPoolReady"
+	ConnectionPoolCleared          = "ConnectionPoolCleared"
+	ConnectionPoolClosed           = "ConnectionPoolClosed"
+	ConnectionCreated              = "ConnectionCreated"
+	ConnectionReady                = "ConnectionReady"
+	ConnectionClosed               = "ConnectionClosed"
+	ConnectionCheckOutStarted      = "ConnectionCheckOutStarted"
+	ConnectionCheckOutFailed       = "ConnectionCheckOutFailed"
+	ConnectionCheckedOut           = "ConnectionCheckedOut"
+	ConnectionCheckedIn            = "ConnectionCheckedIn"
+	ConnectionPendingReadStarted   = "ConnectionPendingReadStarted"
+	ConnectionPendingReadSucceeded = "ConnectionPendingReadSucceeded"
+	ConnectionPendingReadFailed    = "ConnectionPendingReadFailed"
 )
 
 // MonitorPoolOptions contains pool options as formatted in pool events
@@ -105,9 +108,11 @@ type PoolEvent struct {
 	Reason       string              `json:"reason"`
 	// ServiceID is only set if the Type is PoolCleared and the server is deployed behind a load balancer. This field
 	// can be used to distinguish between individual servers in a load balanced deployment.
-	ServiceID    *bson.ObjectID `json:"serviceId"`
-	Interruption bool           `json:"interruptInUseConnections"`
-	Error        error          `json:"error"`
+	ServiceID     *bson.ObjectID `json:"serviceId"`
+	Interruption  bool           `json:"interruptInUseConnections"`
+	Error         error          `json:"error"`
+	RequestID     int32          `json:"requestId"`
+	RemainingTime time.Duration  `json:"remainingTime"`
 }
 
 // PoolMonitor is a function that allows the user to gain access to events occurring in the pool
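
Reviewer note: a minimal sketch of consuming the three new pool events through the
existing PoolMonitor hook. RequestID, RemainingTime, and the three event type
constants come from this patch; ConnectionID and Duration are pre-existing
PoolEvent fields. The helper name and print statements are illustrative only.

```go
package example

import (
	"fmt"

	"go.mongodb.org/mongo-driver/v2/event"
)

// newPendingReadMonitor returns a PoolMonitor that logs the pending-read
// lifecycle events introduced by this patch.
func newPendingReadMonitor() *event.PoolMonitor {
	return &event.PoolMonitor{
		Event: func(evt *event.PoolEvent) {
			switch evt.Type {
			case event.ConnectionPendingReadStarted:
				fmt.Printf("pending read started: conn=%d requestID=%d\n",
					evt.ConnectionID, evt.RequestID)
			case event.ConnectionPendingReadSucceeded:
				fmt.Printf("pending read succeeded: conn=%d took=%s\n",
					evt.ConnectionID, evt.Duration)
			case event.ConnectionPendingReadFailed:
				fmt.Printf("pending read failed: conn=%d remaining=%s err=%v\n",
					evt.ConnectionID, evt.RemainingTime, evt.Error)
			}
		},
	}
}
```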
diff --git a/internal/driverutil/context.go b/internal/driverutil/context.go
new file mode 100644
index 0000000000..5b8cd54b3a
--- /dev/null
+++ b/internal/driverutil/context.go
@@ -0,0 +1,49 @@
+// Copyright (C) MongoDB, Inc. 2025-present.
+//
+// Licensed under the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License. You may obtain
+// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+
+package driverutil
+
+import "context"
+
+// ContextKey is a custom type used for the keys in context values to avoid
+// collisions.
+type ContextKey string
+
+const (
+	// ContextKeyHasMaxTimeMS represents a boolean value that indicates if
+	// maxTimeMS will be set on the wire message for an operation.
+	ContextKeyHasMaxTimeMS ContextKey = "hasMaxTimeMS"
+
+	// ContextKeyRequestID is the requestID for a given operation. This is used to
+	// propagate the requestID for a pending read during connection check out.
+	ContextKeyRequestID ContextKey = "requestID"
+)
+
+// WithValueHasMaxTimeMS returns a copy of the parent context with an added
+// value indicating whether an operation will append maxTimeMS to the wire
+// message.
+func WithValueHasMaxTimeMS(parentCtx context.Context, val bool) context.Context {
+	return context.WithValue(parentCtx, ContextKeyHasMaxTimeMS, val)
+}
+
+// WithRequestID returns a copy of the parent context with an added request ID
+// value.
+func WithRequestID(parentCtx context.Context, requestID int32) context.Context {
+	return context.WithValue(parentCtx, ContextKeyRequestID, requestID)
+}
+
+// HasMaxTimeMS checks if the context is for an operation that will append
+// maxTimeMS to the wire message. It checks for the key's presence rather
+// than its value, so callers should only set the key when the value is true.
+func HasMaxTimeMS(ctx context.Context) bool {
+	return ctx.Value(ContextKeyHasMaxTimeMS) != nil
+}
+
+// GetRequestID retrieves the request ID from the context if it exists.
+func GetRequestID(ctx context.Context) (int32, bool) {
+	val, ok := ctx.Value(ContextKeyRequestID).(int32)
+
+	return val, ok
+}
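
Reviewer note: a self-contained sketch of how these helpers compose. Because
driverutil is an internal package, this only compiles inside the driver module;
the literal values are illustrative.

```go
package driverutil_test

import (
	"context"
	"fmt"

	"go.mongodb.org/mongo-driver/v2/internal/driverutil"
)

func Example() {
	ctx := context.Background()

	// Operation.Execute sets both values before the round trip whenever
	// maxTimeMS is appended to the wire message (see operation.go below).
	ctx = driverutil.WithValueHasMaxTimeMS(ctx, true)
	ctx = driverutil.WithRequestID(ctx, 42)

	// The connection layer inspects the same context on a socket timeout to
	// decide whether to record a pending read, and under which request ID.
	fmt.Println(driverutil.HasMaxTimeMS(ctx))
	id, ok := driverutil.GetRequestID(ctx)
	fmt.Println(id, ok)
	// Output:
	// true
	// 42 true
}
```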
diff --git a/internal/integration/client_test.go b/internal/integration/client_test.go
index 6f18d9f146..7db256c070 100644
--- a/internal/integration/client_test.go
+++ b/internal/integration/client_test.go
@@ -13,6 +13,7 @@ import (
 	"os"
 	"reflect"
 	"strings"
+	"sync"
 	"testing"
 	"time"
 
@@ -675,9 +676,9 @@ func TestClient(t *testing.T) {
 			},
 		}
 
+		_, err := mt.Coll.InsertOne(context.Background(), bson.D{})
 		for _, tc := range testCases {
 			mt.Run(tc.desc, func(mt *mtest.T) {
-				_, err := mt.Coll.InsertOne(context.Background(), bson.D{})
 				require.NoError(mt, err)
 
 				mt.SetFailPoint(failpoint.FailPoint{
@@ -692,30 +693,47 @@ func TestClient(t *testing.T) {
 
 				mt.ClearEvents()
 
+				wg := sync.WaitGroup{}
+				wg.Add(50)
+
 				for i := 0; i < 50; i++ {
-					// Run 50 operations, each with a timeout of 50ms. Expect
+					// Run 50 concurrent operations, each with a timeout of 150ms. Expect
 					// them to all return a timeout error because the failpoint
 					// blocks find operations for 500ms. Run 50 to increase the
 					// probability that an operation will time out in a way that
 					// can cause a retry.
-					ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
-					err = tc.operation(ctx, mt.Coll)
-					cancel()
-					assert.ErrorIs(mt, err, context.DeadlineExceeded)
-					assert.True(mt, mongo.IsTimeout(err), "expected mongo.IsTimeout(err) to be true")
-
-					// Assert that each operation reported exactly one command
-					// started events, which means the operation did not retry
-					// after the context timeout.
-					evts := mt.GetAllStartedEvents()
-					require.Len(mt,
-						mt.GetAllStartedEvents(),
-						1,
-						"expected exactly 1 command started event per operation, but got %d after %d iterations",
-						len(evts),
-						i)
-					mt.ClearEvents()
+					go func() {
+						ctx, cancel := context.WithTimeout(context.Background(), 150*time.Millisecond)
+						err := tc.operation(ctx, mt.Coll)
+						cancel()
+						assert.ErrorIs(mt, err, context.DeadlineExceeded)
+						assert.True(mt, mongo.IsTimeout(err), "expected mongo.IsTimeout(err) to be true")
+
+						wg.Done()
+					}()
 				}
+
+				wg.Wait()
+
+				// Each operation must check out a connection, and the driver
+				// attempts a pending read during checkout when a connection
+				// was returned after a socket timeout. Because this test
+				// forces 50 concurrent socket timeouts, an operation may
+				// check out a connection that still has a pending read. In
+				// that case the checkout itself times out and no command
+				// started event is published. So instead of expecting exactly
+				// 50 started events, assert that started events plus
+				// pending-read attempts account for all 50 operations.
+				pendingReadConns := mt.NumberConnectionsPendingReadStarted()
+				evts := mt.GetAllStartedEvents()
+
+				require.Equal(mt,
+					50,
+					len(evts)+pendingReadConns,
+					"expected started events and pending reads to account for all 50 operations, but got %d",
+					len(evts)+pendingReadConns)
+				mt.ClearEvents()
+				mt.ClearFailPoints()
 			})
 		}
 	})
diff --git a/internal/integration/csot_test.go b/internal/integration/csot_test.go
index 6808efb2a4..877f5e0341 100644
--- a/internal/integration/csot_test.go
+++ b/internal/integration/csot_test.go
@@ -38,12 +38,13 @@ func TestCSOT_maxTimeMS(t *testing.T) {
 	mt := mtest.New(t, mtest.NewOptions().CreateClient(false))
 
 	testCases := []struct {
-		desc           string
-		commandName    string
-		setup          func(coll *mongo.Collection) error
-		operation      func(ctx context.Context, coll *mongo.Collection) error
-		sendsMaxTimeMS bool
-		topologies     []mtest.TopologyKind
+		desc                             string
+		commandName                      string
+		setup                            func(coll *mongo.Collection) error
+		operation                        func(ctx context.Context, coll *mongo.Collection) error
+		sendsMaxTimeMS                   bool
+		topologies                       []mtest.TopologyKind
+		preventsConnClosureWithTimeoutMS bool
 	}{
 		{
 			desc:        "FindOne",
@@ -55,7 +56,8 @@ func TestCSOT_maxTimeMS(t *testing.T) {
 			operation: func(ctx context.Context, coll *mongo.Collection) error {
 				return coll.FindOne(ctx, bson.D{}).Err()
 			},
-			sendsMaxTimeMS: true,
+			sendsMaxTimeMS:                   true,
+			preventsConnClosureWithTimeoutMS: true,
 		},
 		{
 			desc:        "Find",
@@ -68,7 +70,8 @@ func TestCSOT_maxTimeMS(t *testing.T) {
 				_, err := coll.Find(ctx, bson.D{})
 				return err
 			},
-			sendsMaxTimeMS: false,
+			sendsMaxTimeMS:                   false,
+			preventsConnClosureWithTimeoutMS: false,
 		},
 		{
 			desc:        "FindOneAndDelete",
@@ -80,7 +83,8 @@ func TestCSOT_maxTimeMS(t *testing.T) {
 			operation: func(ctx context.Context, coll *mongo.Collection) error {
 				return coll.FindOneAndDelete(ctx, bson.D{}).Err()
 			},
-			sendsMaxTimeMS: true,
+			sendsMaxTimeMS:                   true,
+			preventsConnClosureWithTimeoutMS: true,
 		},
 		{
 			desc:        "FindOneAndUpdate",
@@ -92,7 +96,8 @@ func TestCSOT_maxTimeMS(t *testing.T) {
 			operation: func(ctx context.Context, coll *mongo.Collection) error {
 				return coll.FindOneAndUpdate(ctx, bson.D{}, bson.M{"$set": bson.M{"key": "value"}}).Err()
 			},
-			sendsMaxTimeMS: true,
+			sendsMaxTimeMS:                   true,
+			preventsConnClosureWithTimeoutMS: true,
 		},
 		{
 			desc:        "FindOneAndReplace",
@@ -104,7 +109,8 @@ func TestCSOT_maxTimeMS(t *testing.T) {
 			operation: func(ctx context.Context, coll *mongo.Collection) error {
 				return coll.FindOneAndReplace(ctx, bson.D{}, bson.D{}).Err()
 			},
-			sendsMaxTimeMS: true,
+			sendsMaxTimeMS:                   true,
+			preventsConnClosureWithTimeoutMS: true,
 		},
 		{
 			desc:        "InsertOne",
@@ -113,7 +119,8 @@ func TestCSOT_maxTimeMS(t *testing.T) {
 				_, err := coll.InsertOne(ctx, bson.D{})
 				return err
 			},
-			sendsMaxTimeMS: true,
+			sendsMaxTimeMS:                   true,
+			preventsConnClosureWithTimeoutMS: true,
 		},
 		{
 			desc:        "InsertMany",
@@ -122,7 +129,8 @@ func TestCSOT_maxTimeMS(t *testing.T) {
 				_, err := coll.InsertMany(ctx, []interface{}{bson.D{}})
 				return err
 			},
-			sendsMaxTimeMS: true,
+			sendsMaxTimeMS:                   true,
+			preventsConnClosureWithTimeoutMS: true,
 		},
 		{
 			desc:        "UpdateOne",
@@ -131,7 +139,8 @@ func TestCSOT_maxTimeMS(t *testing.T) {
 				_, err := coll.UpdateOne(ctx, bson.D{}, bson.M{"$set": bson.M{"key": "value"}})
 				return err
 			},
-			sendsMaxTimeMS: true,
+			sendsMaxTimeMS:                   true,
+			preventsConnClosureWithTimeoutMS: true,
 		},
 		{
 			desc:        "UpdateMany",
@@ -140,7 +149,8 @@ func TestCSOT_maxTimeMS(t *testing.T) {
 				_, err := coll.UpdateMany(ctx, bson.D{}, bson.M{"$set": bson.M{"key": "value"}})
 				return err
 			},
-			sendsMaxTimeMS: true,
+			sendsMaxTimeMS:                   true,
+			preventsConnClosureWithTimeoutMS: true,
 		},
 		{
 			desc:        "ReplaceOne",
@@ -149,7 +159,8 @@ func TestCSOT_maxTimeMS(t *testing.T) {
 				_, err := coll.ReplaceOne(ctx, bson.D{}, bson.D{})
 				return err
 			},
-			sendsMaxTimeMS: true,
+			sendsMaxTimeMS:                   true,
+			preventsConnClosureWithTimeoutMS: true,
 		},
 		{
 			desc:        "DeleteOne",
@@ -158,7 +169,8 @@ func TestCSOT_maxTimeMS(t *testing.T) {
 				_, err := coll.DeleteOne(ctx, bson.D{})
 				return err
 			},
-			sendsMaxTimeMS: true,
+			sendsMaxTimeMS:                   true,
+			preventsConnClosureWithTimeoutMS: true,
 		},
 		{
 			desc:        "DeleteMany",
@@ -168,6 +180,8 @@ func TestCSOT_maxTimeMS(t *testing.T) {
 				return err
 			},
-			sendsMaxTimeMS: true,
+			sendsMaxTimeMS:                   true,
+			preventsConnClosureWithTimeoutMS: true,
 		},
 		{
 			desc:        "Distinct",
@@ -175,7 +189,8 @@ func TestCSOT_maxTimeMS(t *testing.T) {
 			operation: func(ctx context.Context, coll *mongo.Collection) error {
 				return coll.Distinct(ctx, "name", bson.D{}).Err()
 			},
-			sendsMaxTimeMS: true,
+			sendsMaxTimeMS:                   true,
+			preventsConnClosureWithTimeoutMS: true,
 		},
 		{
 			desc:        "Aggregate",
@@ -184,7 +199,8 @@ func TestCSOT_maxTimeMS(t *testing.T) {
 				_, err := coll.Aggregate(ctx, mongo.Pipeline{})
 				return err
 			},
-			sendsMaxTimeMS: false,
+			sendsMaxTimeMS:                   false,
+			preventsConnClosureWithTimeoutMS: false,
 		},
 		{
 			desc:        "Watch",
@@ -196,7 +212,8 @@ func TestCSOT_maxTimeMS(t *testing.T) {
 				}
 				return err
 			},
-			sendsMaxTimeMS: true,
+			sendsMaxTimeMS:                   true,
+			preventsConnClosureWithTimeoutMS: false,
 			// Change Streams aren't supported on standalone topologies.
 			topologies: []mtest.TopologyKind{
 				mtest.ReplicaSet,
@@ -218,7 +235,8 @@ func TestCSOT_maxTimeMS(t *testing.T) {
 				var res []bson.D
 				return cursor.All(ctx, &res)
 			},
-			sendsMaxTimeMS: false,
+			sendsMaxTimeMS:                   false,
+			preventsConnClosureWithTimeoutMS: false,
 		},
 	}
 
@@ -348,56 +366,57 @@ func TestCSOT_maxTimeMS(t *testing.T) {
 					assertMaxTimeMSNotSet(mt, evt.Command)
 				}
 			})
+			if tc.preventsConnClosureWithTimeoutMS {
+				opts := mtest.NewOptions().
+					// Blocking failpoints don't work on pre-4.2 and sharded
+					// clusters.
+					Topologies(mtest.Single, mtest.ReplicaSet).
+					MinServerVersion("4.2")
+				mt.RunOpts("prevents connection closure", opts, func(mt *mtest.T) {
+					if tc.setup != nil {
+						err := tc.setup(mt.Coll)
+						require.NoError(mt, err)
+					}
 
-			opts := mtest.NewOptions().
-				// Blocking failpoints don't work on pre-4.2 and sharded
-				// clusters.
-				Topologies(mtest.Single, mtest.ReplicaSet).
-				MinServerVersion("4.2")
-			mt.RunOpts("prevents connection closure", opts, func(mt *mtest.T) {
-				if tc.setup != nil {
-					err := tc.setup(mt.Coll)
-					require.NoError(mt, err)
-				}
-
-				mt.SetFailPoint(failpoint.FailPoint{
-					ConfigureFailPoint: "failCommand",
-					Mode:               failpoint.ModeAlwaysOn,
-					Data: failpoint.Data{
-						FailCommands:    []string{tc.commandName},
-						BlockConnection: true,
-						// Note that some operations (currently Find and
-						// Aggregate) do not send maxTimeMS by default, meaning
-						// that the server will only respond after BlockTimeMS
-						// is elapsed. If the amount of time that the driver
-						// waits for responses after a timeout is significantly
-						// lower than BlockTimeMS, this test will start failing
-						// for those operations.
-						BlockTimeMS: 500,
-					},
-				})
-
-				tpm := eventtest.NewTestPoolMonitor()
-				mt.ResetClient(options.Client().
-					SetPoolMonitor(tpm.PoolMonitor))
-
-				// Run 5 operations that time out, then assert that no
-				// connections were closed.
-				for i := 0; i < 5; i++ {
-					ctx, cancel := context.WithTimeout(context.Background(), 15*time.Millisecond)
-					err := tc.operation(ctx, mt.Coll)
-					cancel()
-
-					if !mongo.IsTimeout(err) {
-						t.Logf("Operation %d returned a non-timeout error: %v", i, err)
+					mt.SetFailPoint(failpoint.FailPoint{
+						ConfigureFailPoint: "failCommand",
+						Mode:               failpoint.ModeAlwaysOn,
+						Data: failpoint.Data{
+							FailCommands:    []string{tc.commandName},
+							BlockConnection: true,
+							// Note that some operations (currently Find and
+							// Aggregate) do not send maxTimeMS by default, meaning
+							// that the server will only respond after BlockTimeMS
+							// is elapsed. If the amount of time that the driver
+							// waits for responses after a timeout is significantly
+							// lower than BlockTimeMS, this test will start failing
+							// for those operations.
+							BlockTimeMS: 500,
+						},
+					})
+
+					tpm := eventtest.NewTestPoolMonitor()
+					mt.ResetClient(options.Client().
+						SetPoolMonitor(tpm.PoolMonitor))
+
+					// Run 5 operations that time out, then assert that no
+					// connections were closed.
+					for i := 0; i < 5; i++ {
+						ctx, cancel := context.WithTimeout(context.Background(), 15*time.Millisecond)
+						err := tc.operation(ctx, mt.Coll)
+						cancel()
+
+						if !mongo.IsTimeout(err) {
+							t.Logf("Operation %d returned a non-timeout error: %v", i, err)
+						}
 					}
-				}
 
-				closedEvents := tpm.Events(func(pe *event.PoolEvent) bool {
-					return pe.Type == event.ConnectionClosed
+					closedEvents := tpm.Events(func(pe *event.PoolEvent) bool {
+						return pe.Type == event.ConnectionClosed
+					})
+					assert.Len(mt, closedEvents, 0, "expected no connection closed event")
 				})
-				assert.Len(mt, closedEvents, 0, "expected no connection closed event")
-			})
+			}
 		})
 	}
 
diff --git a/internal/integration/mtest/mongotest.go b/internal/integration/mtest/mongotest.go
index 3967bf7f82..901d71a3ff 100644
--- a/internal/integration/mtest/mongotest.go
+++ b/internal/integration/mtest/mongotest.go
@@ -55,7 +55,10 @@ type T struct {
 	// It must be accessed using the atomic package and should be at the beginning of the struct.
 	// - atomic bug: https://pkg.go.dev/sync/atomic#pkg-note-BUG
 	// - suggested layout: https://go101.org/article/memory-layout.html
-	connsCheckedOut int64
+	connsCheckedOut          int64
+	connPendingReadStarted   int64
+	connPendingReadSucceeded int64
+	connPendingReadFailed    int64
 
 	*testing.T
 
@@ -348,6 +351,20 @@ func (t *T) NumberConnectionsCheckedOut() int {
 	return int(atomic.LoadInt64(&t.connsCheckedOut))
 }
 
+// NumberConnectionsPendingReadStarted returns the number of connections that have
+// started a pending read.
+func (t *T) NumberConnectionsPendingReadStarted() int {
+	return int(atomic.LoadInt64(&t.connPendingReadStarted))
+}
+
+// NumberConnectionsPendingReadSucceeded returns the number of connections
+// that have successfully completed a pending read.
+func (t *T) NumberConnectionsPendingReadSucceeded() int {
+	return int(atomic.LoadInt64(&t.connPendingReadSucceeded))
+}
+
+// NumberConnectionsPendingReadFailed returns the number of connections that
+// have failed to complete a pending read.
+func (t *T) NumberConnectionsPendingReadFailed() int {
+	return int(atomic.LoadInt64(&t.connPendingReadFailed))
+}
+
 // ClearEvents clears the existing command monitoring events.
 func (t *T) ClearEvents() {
 	t.started = t.started[:0]
@@ -640,6 +657,12 @@ func (t *T) createTestClient() {
 					atomic.AddInt64(&t.connsCheckedOut, 1)
 				case event.ConnectionCheckedIn:
 					atomic.AddInt64(&t.connsCheckedOut, -1)
+				case event.ConnectionPendingReadStarted:
+					atomic.AddInt64(&t.connPendingReadStarted, 1)
+				case event.ConnectionPendingReadSucceeded:
+					atomic.AddInt64(&t.connPendingReadSucceeded, 1)
+				case event.ConnectionPendingReadFailed:
+					atomic.AddInt64(&t.connPendingReadFailed, 1)
 				}
 			},
 		})
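
Reviewer note: with the ConnectionPendingReadFailed case fixed above, the three
counters balance once all in-flight operations have finished, since
awaitPendingRead always ends the read it begins with either a succeeded or a
failed event. A hypothetical helper (the function and package names are not
part of this patch):

```go
package mtestexample

import (
	"go.mongodb.org/mongo-driver/v2/internal/assert"
	"go.mongodb.org/mongo-driver/v2/internal/integration/mtest"
)

// assertPendingReadsResolved checks that every pending read that started was
// eventually resolved, one way or the other.
func assertPendingReadsResolved(mt *mtest.T) {
	mt.Helper()

	started := mt.NumberConnectionsPendingReadStarted()
	resolved := mt.NumberConnectionsPendingReadSucceeded() +
		mt.NumberConnectionsPendingReadFailed()
	assert.Equal(mt, started, resolved,
		"every started pending read should succeed or fail")
}
```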
diff --git a/internal/logger/component.go b/internal/logger/component.go
index a601707cbf..5abc3f5f79 100644
--- a/internal/logger/component.go
+++ b/internal/logger/component.go
@@ -28,6 +28,9 @@ const (
 	ConnectionCheckoutFailed         = "Connection checkout failed"
 	ConnectionCheckedOut             = "Connection checked out"
 	ConnectionCheckedIn              = "Connection checked in"
+	ConnectionPendingReadStarted     = "Pending read started"
+	ConnectionPendingReadSucceeded   = "Pending read succeeded"
+	ConnectionPendingReadFailed      = "Pending read failed"
 	ServerSelectionFailed            = "Server selection failed"
 	ServerSelectionStarted           = "Server selection started"
 	ServerSelectionSucceeded         = "Server selection succeeded"
diff --git a/testdata/client-side-operations-timeout/pending-reads.json b/testdata/client-side-operations-timeout/pending-reads.json
new file mode 100644
index 0000000000..43b2745c7c
--- /dev/null
+++ b/testdata/client-side-operations-timeout/pending-reads.json
@@ -0,0 +1,519 @@
+{
+  "description": "Operation timeouts do not cause connection churn",
+  "schemaVersion": "1.9",
+  "runOnRequirements": [
+    {
+      "minServerVersion": "4.4",
+      "topologies": [
+        "standalone",
+        "replicaset"
+      ]
+    }
+  ],
+  "createEntities": [
+    {
+      "client": {
+        "id": "failPointClient",
+        "useMultipleMongoses": false
+      }
+    },
+    {
+      "client": {
+        "id": "client",
+        "uriOptions": {
+          "maxPoolSize": 1
+        },
+        "useMultipleMongoses": false,
+        "observeEvents": [
+          "commandFailedEvent",
+          "commandSucceededEvent",
+          "connectionCheckedOutEvent",
+          "connectionCheckedInEvent",
+          "connectionClosedEvent"
+        ]
+      }
+    },
+    {
+      "database": {
+        "id": "test",
+        "client": "client",
+        "databaseName": "test"
+      }
+    },
+    {
+      "collection": {
+        "id": "coll",
+        "database": "test",
+        "collectionName": "coll"
+      }
+    }
+  ],
+  "initialData": [
+    {
+      "collectionName": "coll",
+      "databaseName": "test",
+      "documents": []
+    }
+  ],
+  "tests": [
+    {
+      "description": "Write operation with successful pending read",
+      "operations": [
+        {
+          "name": "failPoint",
+          "object": "testRunner",
+          "arguments": {
+            "client": "failPointClient",
+            "failPoint": {
+              "configureFailPoint": "failCommand",
+              "mode": {
+                "times": 1
+              },
+              "data": {
+                "failCommands": [
+                  "insert"
+                ],
+                "blockConnection": true,
+                "blockTimeMS": 750
+              }
+            }
+          }
+        },
+        {
+          "name": "insertOne",
+          "object": "coll",
+          "arguments": {
+            "timeoutMS": 50,
+            "document": {
+              "_id": 3,
+              "x": 1
+            }
+          },
+          "expectError": {
+            "isTimeoutError": true
+          }
+        },
+        {
+          "name": "findOne",
+          "object": "coll",
+          "arguments": {
+            "filter": {
+              "_id": 1
+            }
+          }
+        }
+      ],
+      "expectEvents": [
+        {
+          "client": "client",
+          "events": [
+            {
+              "commandFailedEvent": {
+                "commandName": "insert"
+              }
+            },
+            {
+              "commandSucceededEvent": {
+                "commandName": "find"
+              }
+            }
+          ]
+        },
+        {
+          "client": "client",
+          "eventType": "cmap",
+          "events": [
+            {
+              "connectionCheckedOutEvent": {}
+            },
+            {
+              "connectionCheckedInEvent": {}
+            },
+            {
+              "connectionCheckedOutEvent": {}
+            },
+            {
+              "connectionCheckedInEvent": {}
+            }
+          ]
+        }
+      ]
+    },
+    {
+      "description": "Concurrent write operation with successful pending read",
+      "operations": [
+        {
+          "name": "failPoint",
+          "object": "testRunner",
+          "arguments": {
+            "client": "failPointClient",
+            "failPoint": {
+              "configureFailPoint": "failCommand",
+              "mode": {
+                "times": 1
+              },
+              "data": {
+                "failCommands": [
+                  "insert"
+                ],
+                "blockConnection": true,
+                "blockTimeMS": 750
+              }
+            }
+          }
+        },
+        {
+          "name": "createEntities",
+          "object": "testRunner",
+          "arguments": {
+            "entities": [
+              {
+                "thread": {
+                  "id": "thread0"
+                }
+              },
+              {
+                "thread": {
+                  "id": "thread1"
+                }
+              }
+            ]
+          }
+        },
+        {
+          "name": "runOnThread",
+          "object": "testRunner",
+          "arguments": {
+            "thread": "thread0",
+            "operation": {
+              "name": "insertOne",
+              "object": "coll",
+              "arguments": {
+                "timeoutMS": 500,
+                "document": {
+                  "_id": 2
+                }
+              }
+            },
+            "expectError": {
+              "isTimeoutError": true
+            }
+          }
+        },
+        {
+          "name": "waitForEvent",
+          "object": "testRunner",
+          "arguments": {
+            "client": "client",
+            "event": {
+              "connectionCheckedOutEvent": {}
+            },
+            "count": 1
+          }
+        },
+        {
+          "name": "runOnThread",
+          "object": "testRunner",
+          "arguments": {
+            "thread": "thread1",
+            "operation": {
+              "name": "insertOne",
+              "object": "coll",
+              "arguments": {
+                "document": {
+                  "_id": 3
+                }
+              }
+            }
+          }
+        },
+        {
+          "name": "waitForThread",
+          "object": "testRunner",
+          "arguments": {
+            "thread": "thread1"
+          }
+        }
+      ],
+      "expectEvents": [
+        {
+          "client": "client",
+          "events": [
+            {
+              "commandFailedEvent": {
+                "commandName": "insert"
+              }
+            },
+            {
+              "commandSucceededEvent": {
+                "commandName": "insert"
+              }
+            }
+          ]
+        },
+        {
+          "client": "client",
+          "eventType": "cmap",
+          "events": [
+            {
+              "connectionCheckedOutEvent": {}
+            },
+            {
+              "connectionCheckedInEvent": {}
+            },
+            {
+              "connectionCheckedOutEvent": {}
+            },
+            {
+              "connectionCheckedInEvent": {}
+            }
+          ]
+        }
+      ]
+    },
+    {
+      "description": "Write operation with unsuccessful pending read",
+      "operations": [
+        {
+          "name": "failPoint",
+          "object": "testRunner",
+          "arguments": {
+            "client": "failPointClient",
+            "failPoint": {
+              "configureFailPoint": "failCommand",
+              "mode": {
+                "times": 1
+              },
+              "data": {
+                "failCommands": [
+                  "insert"
+                ],
+                "blockConnection": true,
+                "blockTimeMS": 2100
+              }
+            }
+          }
+        },
+        {
+          "name": "insertOne",
+          "object": "coll",
+          "arguments": {
+            "timeoutMS": 50,
+            "document": {
+              "_id": 3,
+              "x": 1
+            }
+          },
+          "expectError": {
+            "isTimeoutError": true
+          }
+        },
+        {
+          "name": "insertOne",
+          "object": "coll",
+          "arguments": {
+            "timeoutMS": 2000,
+            "document": {
+              "_id": 3,
+              "x": 1
+            }
+          },
+          "expectError": {
+            "isTimeoutError": true
+          }
+        }
+      ],
+      "expectEvents": [
+        {
+          "client": "client",
+          "events": [
+            {
+              "commandFailedEvent": {
+                "commandName": "insert"
+              }
+            }
+          ]
+        },
+        {
+          "client": "client",
+          "eventType": "cmap",
+          "events": [
+            {
+              "connectionCheckedOutEvent": {}
+            },
+            {
+              "connectionCheckedInEvent": {}
+            },
+            {
+              "connectionClosedEvent": {
+                "reason": "error"
+              }
+            }
+          ]
+        }
+      ]
+    },
+    {
+      "description": "Read operation with successful pending read",
+      "operations": [
+        {
+          "name": "failPoint",
+          "object": "testRunner",
+          "arguments": {
+            "client": "failPointClient",
+            "failPoint": {
+              "configureFailPoint": "failCommand",
+              "mode": {
+                "times": 1
+              },
+              "data": {
+                "failCommands": [
+                  "find"
+                ],
+                "blockConnection": true,
+                "blockTimeMS": 750
+              }
+            }
+          }
+        },
+        {
+          "name": "findOne",
+          "object": "coll",
+          "arguments": {
+            "timeoutMS": 50,
+            "filter": {
+              "_id": 1
+            }
+          },
+          "expectError": {
+            "isTimeoutError": true
+          }
+        },
+        {
+          "name": "findOne",
+          "object": "coll",
+          "arguments": {
+            "filter": {
+              "_id": 1
+            }
+          }
+        }
+      ],
+      "expectEvents": [
+        {
+          "client": "client",
+          "events": [
+            {
+              "commandFailedEvent": {
+                "commandName": "find"
+              }
+            },
+            {
+              "commandSucceededEvent": {
+                "commandName": "find"
+              }
+            }
+          ]
+        },
+        {
+          "client": "client",
+          "eventType": "cmap",
+          "events": [
+            {
+              "connectionCheckedOutEvent": {}
+            },
+            {
+              "connectionCheckedInEvent": {}
+            },
+            {
+              "connectionCheckedOutEvent": {}
+            },
+            {
+              "connectionCheckedInEvent": {}
+            }
+          ]
+        }
+      ]
+    },
+    {
+      "description": "Read operation with unsuccessful pending read",
+      "operations": [
+        {
+          "name": "failPoint",
+          "object": "testRunner",
+          "arguments": {
+            "client": "failPointClient",
+            "failPoint": {
+              "configureFailPoint": "failCommand",
+              "mode": {
+                "times": 1
+              },
+              "data": {
+                "failCommands": [
+                  "find"
+                ],
+                "blockConnection": true,
+                "blockTimeMS": 2100
+              }
+            }
+          }
+        },
+        {
+          "name": "findOne",
+          "object": "coll",
+          "arguments": {
+            "timeoutMS": 50,
+            "filter": {
+              "_id": 1
+            }
+          },
+          "expectError": {
+            "isTimeoutError": true
+          }
+        },
+        {
+          "name": "findOne",
+          "object": "coll",
+          "arguments": {
+            "timeoutMS": 2000,
+            "filter": {
+              "_id": 1
+            }
+          },
+          "expectError": {
+            "isTimeoutError": true
+          }
+        }
+      ],
+      "expectEvents": [
+        {
+          "client": "client",
+          "events": [
+            {
+              "commandFailedEvent": {
+                "commandName": "find"
+              }
+            }
+          ]
+        },
+        {
+          "client": "client",
+          "eventType": "cmap",
+          "events": [
+            {
+              "connectionCheckedOutEvent": {}
+            },
+            {
+              "connectionCheckedInEvent": {}
+            },
+            {
+              "connectionClosedEvent": {
+                "reason": "error"
+              }
+            }
+          ]
+        }
+      ]
+    }
+  ]
+}
diff --git a/testdata/client-side-operations-timeout/pending-reads.yml b/testdata/client-side-operations-timeout/pending-reads.yml
new file mode 100644
index 0000000000..c07728009e
--- /dev/null
+++ b/testdata/client-side-operations-timeout/pending-reads.yml
@@ -0,0 +1,312 @@
+description: "Operation timeouts do not cause connection churn"
+
+schemaVersion: "1.9"
+
+runOnRequirements:
+  - minServerVersion: "4.4"
+    # TODO(SERVER-96344): When using failpoints, mongos returns MaxTimeMSExpired 
+    # after maxTimeMS, whereas mongod returns it after 
+    # max(blockTimeMS, maxTimeMS).  Until this ticket is resolved, these tests 
+    # will not pass on sharded clusters.
+    topologies: ["standalone", "replicaset"]
+
+createEntities:
+  - client:
+      id: &failPointClient failPointClient
+      useMultipleMongoses: false
+  - client:
+      id: &client client
+      uriOptions:
+        maxPoolSize: 1
+      useMultipleMongoses: false
+      observeEvents:
+        - commandFailedEvent
+        - commandSucceededEvent
+        - connectionCheckedOutEvent
+        - connectionCheckedInEvent
+        - connectionClosedEvent
+  - database:
+      id: &database test
+      client: *client
+      databaseName: *database
+  - collection:
+      id: &collection coll
+      database: *database
+      collectionName: *collection
+
+initialData:
+  - collectionName: *collection
+    databaseName: *database
+    documents: []
+
+tests:
+  - description: "Write operation with successful pending read"
+    operations:
+      # Create a failpoint to block the first operation
+      - name: failPoint
+        object: testRunner
+        arguments:
+          client: *failPointClient
+          failPoint:
+            configureFailPoint: failCommand
+            mode: { times: 1 }
+            data:
+              failCommands: ["insert"]
+              blockConnection: true
+              blockTimeMS: 750
+
+      # Execute operation with timeout less than block time
+      - name: insertOne
+        object: *collection
+        arguments:
+          timeoutMS: 50
+          document: { _id: 3, x: 1 }
+        expectError:
+          isTimeoutError: true
+
+      # Execute a subsequent operation to complete the read
+      - name: findOne
+        object: *collection
+        arguments:
+          filter: { _id: 1 }
+
+    expectEvents:
+      - client: *client
+        events:
+          - commandFailedEvent:
+              commandName: insert
+          - commandSucceededEvent:
+              commandName: find
+      - client: *client
+        eventType: cmap
+        events:
+          - connectionCheckedOutEvent: {} # insert
+          - connectionCheckedInEvent: {}  # insert fails
+          - connectionCheckedOutEvent: {} # find
+          - connectionCheckedInEvent: {}  # find succeeds
+
+  - description: "Concurrent write operation with successful pending read"
+    operations:
+      # Create a failpoint to block the first operation
+      - name: failPoint
+        object: testRunner
+        arguments:
+          client: *failPointClient
+          failPoint:
+            configureFailPoint: failCommand
+            mode: { times: 1 }
+            data:
+              failCommands: ["insert"]
+              blockConnection: true
+              blockTimeMS: 750
+
+      # Start threads.
+      - name: createEntities
+        object: testRunner
+        arguments:
+          entities:
+            - thread:
+                id: &thread0 thread0
+            - thread:
+                id: &thread1 thread1
+
+      # Run an insert in two threads. We expect the first to time out and the 
+      # second to finish the pending read from the first and complete 
+      # successfully.
+      - name: runOnThread
+        object: testRunner
+        arguments:
+          thread: *thread0
+          operation:
+            name: insertOne
+            object: *collection
+            arguments:
+              timeoutMS: 500
+              document:
+                _id: 2
+          expectError:
+            isTimeoutError: true
+
+      # Ensure the first thread checks out a connection before executing the 
+      # operation in the second thread. This maintains concurrent behavior but 
+      # presents the worst case scenario.
+      - name: waitForEvent
+        object: testRunner
+        arguments:
+          client: *client
+          event:
+            connectionCheckedOutEvent: {}
+          count: 1
+
+      - name: runOnThread
+        object: testRunner
+        arguments:
+          thread: *thread1
+          operation:
+            name: insertOne
+            object: *collection
+            arguments:
+              document:
+                _id: 3
+
+      # Stop threads.
+      - name: waitForThread
+        object: testRunner
+        arguments:
+          thread: *thread1
+
+    expectEvents:
+      - client: *client
+        events:
+          - commandFailedEvent:
+              commandName: insert
+          - commandSucceededEvent:
+              commandName: insert
+      - client: *client
+        eventType: cmap
+        events:
+          - connectionCheckedOutEvent: {} # first insert
+          - connectionCheckedInEvent: {}  # first insert fails
+          - connectionCheckedOutEvent: {} # second insert
+          - connectionCheckedInEvent: {}  # second insert succeeds
+
+  - description: "Write operation with unsuccessful pending read"
+    operations:
+      # Create a failpoint to block the first operation
+      - name: failPoint
+        object: testRunner
+        arguments:
+          client: *failPointClient
+          failPoint:
+            configureFailPoint: failCommand
+            mode: { times: 1 }
+            data:
+              failCommands: ["insert"]
+              blockConnection: true
+              blockTimeMS: 2100
+
+      # Execute operation with timeout less than block time
+      - name: insertOne
+        object: *collection
+        arguments:
+          timeoutMS: 50
+          document: { _id: 3, x: 1 }
+        expectError:
+          isTimeoutError: true
+
+      # The pending read should fail
+      - name: insertOne
+        object: *collection
+        arguments:
+          timeoutMS: 2000
+          document: { _id: 3, x: 1 }
+        expectError:
+          isTimeoutError: true
+
+    expectEvents:
+      - client: *client
+        events:
+          - commandFailedEvent:
+              commandName: insert
+          # No second failed event since we timed out attempting to check out 
+          # the connection for the second operation
+      - client: *client
+        eventType: cmap
+        events:
+          - connectionCheckedOutEvent: {} # first insert
+          - connectionCheckedInEvent: {}  # first insert fails
+          - connectionClosedEvent:        # second insert times out pending read in checkout, closes
+              reason: error
+
+  - description: "Read operation with successful pending read"
+    operations:
+      # Create a failpoint to block the first operation
+      - name: failPoint
+        object: testRunner
+        arguments:
+          client: *failPointClient
+          failPoint:
+            configureFailPoint: failCommand
+            mode: { times: 1 }
+            data:
+              failCommands: ["find"]
+              blockConnection: true
+              blockTimeMS: 750
+
+      # Execute operation with timeout less than block time
+      - name: findOne
+        object: *collection
+        arguments:
+          timeoutMS: 50
+          filter: { _id: 1 }
+        expectError:
+          isTimeoutError: true
+
+      # Execute a subsequent operation to complete the read
+      - name: findOne
+        object: *collection
+        arguments:
+          filter: { _id: 1 }
+
+    expectEvents:
+      - client: *client
+        events:
+          - commandFailedEvent:
+              commandName: find
+          - commandSucceededEvent:
+              commandName: find
+      - client: *client
+        eventType: cmap
+        events:
+          - connectionCheckedOutEvent: {} # first find
+          - connectionCheckedInEvent: {}  # first find fails
+          - connectionCheckedOutEvent: {} # second find
+          - connectionCheckedInEvent: {}  # second find succeeds
+
+  - description: "Read operation with unsuccessful pending read"
+    operations:
+      # Create a failpoint to block the first operation
+      - name: failPoint
+        object: testRunner
+        arguments:
+          client: *failPointClient
+          failPoint:
+            configureFailPoint: failCommand
+            mode: { times: 1 }
+            data:
+              failCommands: ["find"]
+              blockConnection: true
+              blockTimeMS: 2100
+
+      # Execute operation with timeout less than block time
+      - name: findOne
+        object: *collection
+        arguments:
+          timeoutMS: 50
+          filter: { _id: 1 }
+        expectError:
+          isTimeoutError: true
+
+      # The pending read should fail
+      - name: findOne
+        object: *collection
+        arguments:
+          timeoutMS: 2000
+          filter: { _id: 1 }
+        expectError:
+          isTimeoutError: true
+
+    expectEvents:
+      - client: *client
+        events:
+          - commandFailedEvent:
+              commandName: find
+          # No second failed event since we timed out attempting to check out 
+          # the connection for the second operation
+      - client: *client
+        eventType: cmap
+        events:
+          - connectionCheckedOutEvent: {} # first find
+          - connectionCheckedInEvent: {}  # first find fails
+          - connectionClosedEvent:        # second find times out pending read in checkout, closes
+              reason: error
diff --git a/x/mongo/driver/operation.go b/x/mongo/driver/operation.go
index 2597a5de66..cc9f631a99 100644
--- a/x/mongo/driver/operation.go
+++ b/x/mongo/driver/operation.go
@@ -785,6 +785,14 @@ func (op Operation) Execute(ctx context.Context) error {
 			if moreToCome {
 				roundTrip = op.moreToComeRoundTrip
 			}
+
+			// Set context values to handle a pending read in case of a socket
+			// timeout.
+			if maxTimeMS != 0 {
+				ctx = driverutil.WithValueHasMaxTimeMS(ctx, true)
+				ctx = driverutil.WithRequestID(ctx, startedInfo.requestID)
+			}
+
 			res, err = roundTrip(ctx, conn, *wm)
 
 			if ep, ok := srvr.(ErrorProcessor); ok {
diff --git a/x/mongo/driver/topology/connection.go b/x/mongo/driver/topology/connection.go
index 24ad6a3a51..85d2aecf9c 100644
--- a/x/mongo/driver/topology/connection.go
+++ b/x/mongo/driver/topology/connection.go
@@ -47,6 +47,12 @@ var (
 
 func nextConnectionID() uint64 { return atomic.AddUint64(&globalConnectionID, 1) }
 
+type pendingReadState struct {
+	remainingBytes int32
+	requestID      int32
+	remainingTime  *time.Duration
+}
+
 type connection struct {
 	// state must be accessed using the atomic package and should be at the beginning of the struct.
 	// - atomic bug: https://pkg.go.dev/sync/atomic#pkg-note-BUG
@@ -82,9 +88,11 @@ type connection struct {
 	// accessTokens in the OIDC authenticator cache.
 	oidcTokenGenID uint64
 
-	// awaitRemainingBytes indicates the size of server response that was not completely
-	// read before returning the connection to the pool.
-	awaitRemainingBytes *int32
+	// pendingReadState contains information required to attempt a pending read
+	// in the event of a socket timeout for an operation that has appended
+	// maxTimeMS to the wire message.
+	pendingReadState *pendingReadState
+	pendingReadMu    sync.Mutex
 }
 
 // newConnection handles the creation of a connection. It does not connect the connection.
@@ -407,11 +415,14 @@ func (c *connection) readWireMessage(ctx context.Context) ([]byte, error) {
 
 	dst, errMsg, err := c.read(ctx)
 	if err != nil {
-		if c.awaitRemainingBytes == nil {
-			// If the connection was not marked as awaiting response, close the
-			// connection because we don't know what the connection state is.
+		c.pendingReadMu.Lock()
+		if c.pendingReadState == nil {
+			// If there is no pending read on the connection, use the pre-CSOT
+			// behavior and close the connection because we don't know if there are
+			// other bytes left to read.
 			c.close()
 		}
+		c.pendingReadMu.Unlock()
 		message := errMsg
 		if errors.Is(err, io.EOF) {
 			message = "socket was unexpectedly closed"
@@ -476,8 +487,15 @@ func (c *connection) read(ctx context.Context) (bytesRead []byte, errMsg string,
 	// reading messages from an exhaust cursor.
 	n, err := io.ReadFull(c.nc, sizeBuf[:])
 	if err != nil {
-		if l := int32(n); l == 0 && isCSOTTimeout(err) {
-			c.awaitRemainingBytes = &l
+		if l := int32(n); l == 0 && isCSOTTimeout(err) && driverutil.HasMaxTimeMS(ctx) {
+			requestID, _ := driverutil.GetRequestID(ctx)
+
+			c.pendingReadMu.Lock()
+			c.pendingReadState = &pendingReadState{
+				remainingBytes: l,
+				requestID:      requestID,
+			}
+			c.pendingReadMu.Unlock()
 		}
 		return nil, "incomplete read of message header", err
 	}
@@ -492,8 +510,15 @@ func (c *connection) read(ctx context.Context) (bytesRead []byte, errMsg string,
 	n, err = io.ReadFull(c.nc, dst[4:])
 	if err != nil {
 		remainingBytes := size - 4 - int32(n)
-		if remainingBytes > 0 && isCSOTTimeout(err) {
-			c.awaitRemainingBytes = &remainingBytes
+		if remainingBytes > 0 && isCSOTTimeout(err) && driverutil.HasMaxTimeMS(ctx) {
+			requestID, _ := driverutil.GetRequestID(ctx)
+
+			c.pendingReadMu.Lock()
+			c.pendingReadState = &pendingReadState{
+				remainingBytes: remainingBytes,
+				requestID:      requestID,
+			}
+			c.pendingReadMu.Unlock()
 		}
 		return dst, "incomplete read of full message", err
 	}
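
Reviewer note: the remaining-bytes arithmetic above (size - 4 - n) relies on
MongoDB wire framing, in which every message begins with a 4-byte little-endian
length that counts the entire message, prefix included. A worked sketch with
illustrative numbers:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	// A 100-byte wire message: the prefix encodes the total length,
	// including the 4 prefix bytes themselves.
	var sizeBuf [4]byte
	binary.LittleEndian.PutUint32(sizeBuf[:], 100)
	size := int32(binary.LittleEndian.Uint32(sizeBuf[:]))

	// Suppose the read deadline expired after 60 body bytes arrived.
	n := 60

	// Bytes the server still owes on this connection; this is what
	// pendingReadState.remainingBytes records for the next checkout.
	fmt.Println(size - 4 - int32(n)) // 36
}
```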
diff --git a/x/mongo/driver/topology/pool.go b/x/mongo/driver/topology/pool.go
index d6568e844f..7a596cc77f 100644
--- a/x/mongo/driver/topology/pool.go
+++ b/x/mongo/driver/topology/pool.go
@@ -8,6 +8,7 @@ package topology
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"io"
 	"net"
@@ -18,6 +19,7 @@ import (
 	"go.mongodb.org/mongo-driver/v2/bson"
 	"go.mongodb.org/mongo-driver/v2/event"
 	"go.mongodb.org/mongo-driver/v2/internal/logger"
+	"go.mongodb.org/mongo-driver/v2/internal/ptrutil"
 	"go.mongodb.org/mongo-driver/v2/mongo/address"
 	"go.mongodb.org/mongo-driver/v2/x/mongo/driver"
 )
@@ -576,6 +578,10 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) {
 			return nil, w.err
 		}
 
+		if err := awaitPendingRead(ctx, p, w.conn); err != nil {
+			return nil, err
+		}
+
 		duration = time.Since(start)
 		if mustLogPoolMessage(p) {
 			keysAndValues := logger.KeyValues{
@@ -632,6 +638,10 @@ func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) {
 			return nil, w.err
 		}
 
+		if err := awaitPendingRead(ctx, p, w.conn); err != nil {
+			return nil, err
+		}
+
 		duration := time.Since(start)
 		if mustLogPoolMessage(p) {
 			keysAndValues := logger.KeyValues{
@@ -771,82 +781,190 @@ func (p *pool) removeConnection(conn *connection, reason reason, err error) erro
 	return nil
 }
 
-var (
-	// BGReadTimeout is the maximum amount of the to wait when trying to read
-	// the server reply on a connection after an operation timed out. The
-	// default is 400ms.
-	//
-	// Deprecated: BGReadTimeout is intended for internal use only and may be
-	// removed or modified at any time.
-	BGReadTimeout = 400 * time.Millisecond
+// PendingReadTimeout is the maximum amount of time to wait when trying to
+// read the server reply on a connection after an operation timed out. The
+// default is 2 seconds.
+//
+// Deprecated: PendingReadTimeout is intended for internal use only and may be
+// removed or modified at any time.
+var PendingReadTimeout = 2000 * time.Millisecond
+
+// awaitPendingRead sets a new read deadline on the provided connection and
+// tries to read any bytes left over from a previously timed-out reply. On
+// failure the connection is checked back into the pool so the read can be
+// retried on a later checkout, unless the remaining time budget has been
+// exhausted, in which case the connection is closed.
+func awaitPendingRead(ctx context.Context, pool *pool, conn *connection) error {
+	conn.pendingReadMu.Lock()
+	defer conn.pendingReadMu.Unlock()
+
+	// If there are no bytes pending read, do nothing.
+	if conn.pendingReadState == nil {
+		return nil
+	}
 
-	// BGReadCallback is a callback for monitoring the behavior of the
-	// background-read-on-timeout connection preserving mechanism.
-	//
-	// Deprecated: BGReadCallback is intended for internal use only and may be
-	// removed or modified at any time.
-	BGReadCallback func(addr string, start, read time.Time, errs []error, connClosed bool)
-)
+	prs := conn.pendingReadState
+	if prs.remainingTime == nil {
+		prs.remainingTime = ptrutil.Ptr(PendingReadTimeout)
+	}
 
-// bgRead sets a new read deadline on the provided connection and tries to read
-// any bytes returned by the server. If successful, it checks the connection
-// into the provided pool. If there are any errors, it closes the connection.
-//
-// It calls the package-global BGReadCallback function, if set, with the
-// address, timings, and any errors that occurred.
-func bgRead(pool *pool, conn *connection, size int32) {
-	var err error
-	start := time.Now()
+	if mustLogPoolMessage(pool) {
+		keysAndValues := logger.KeyValues{
+			logger.KeyDriverConnectionID, conn.driverConnectionID,
+			logger.KeyRequestID, prs.requestID,
+		}
+
+		logPoolMessage(pool, logger.ConnectionPendingReadStarted, keysAndValues...)
+	}
+
+	if pool.monitor != nil {
+		event := &event.PoolEvent{
+			Type:         event.ConnectionPendingReadStarted,
+			ConnectionID: conn.driverConnectionID,
+			RequestID:    prs.requestID,
+		}
+
+		pool.monitor.Event(event)
+	}
+
+	size := prs.remainingBytes
+
+	checkIn := false
+	var someErr error
 
 	defer func() {
-		read := time.Now()
-		errs := make([]error, 0)
-		connClosed := false
-		if err != nil {
-			errs = append(errs, err)
-			connClosed = true
-			err = conn.close()
-			if err != nil {
-				errs = append(errs, fmt.Errorf("error closing conn after reading: %w", err))
+		if mustLogPoolMessage(pool) && someErr != nil {
+			keysAndValues := logger.KeyValues{
+				logger.KeyDriverConnectionID, conn.driverConnectionID,
+				logger.KeyRequestID, prs.requestID,
+				logger.KeyReason, someErr.Error(),
+				logger.KeyRemainingTimeMS, *prs.remainingTime,
 			}
+
+			logPoolMessage(pool, logger.ConnectionPendingReadFailed, keysAndValues...)
+		}
+
+		if pool.monitor != nil && someErr != nil {
+			event := &event.PoolEvent{
+				Type:          event.ConnectionPendingReadFailed,
+				Address:       pool.address.String(),
+				ConnectionID:  conn.driverConnectionID,
+				RequestID:     prs.requestID,
+				RemainingTime: *prs.remainingTime,
+				Reason:        someErr.Error(),
+				Error:         someErr,
+			}
+
+			pool.monitor.Event(event)
+		}
+
+		// If we have exceeded the time limit, then close the connection.
+		if prs.remainingTime != nil && *prs.remainingTime < 0 {
+			// Closing is best-effort here; a close error must not panic
+			// inside the connection pool.
+			_ = conn.close()
+
+			return
+		}
+
+		if !checkIn {
+			return
 		}
 
 		// No matter what happens, always check the connection back into the
 		// pool, which will either make it available for other operations or
 		// remove it from the pool if it was closed.
-		err = pool.checkInNoEvent(conn)
-		if err != nil {
-			errs = append(errs, fmt.Errorf("error checking in: %w", err))
-		}
+		//
+		// TODO(GODRIVER-3385): Figure out how to handle this error. It's possible
+		// that a single connection can be checked out to handle multiple concurrent
+		// operations. This is likely a bug in the Go Driver. So it's possible that
+		// the connection is idle at the point of check-in.
+		_ = pool.checkInNoEvent(conn)
+	}()
 
-		if BGReadCallback != nil {
-			BGReadCallback(conn.addr.String(), start, read, errs, connClosed)
+	dl, contextDeadlineUsed := ctx.Deadline()
+	if !contextDeadlineUsed {
+		// Without an operation deadline, fall back to the remaining
+		// pending-read budget if one was recorded, or else the static
+		// PendingReadTimeout. This covers the case where the first attempt
+		// ran with a context deadline, failed, and a later attempt provides
+		// no deadline at all.
+		if prs.remainingTime != nil {
+			dl = time.Now().Add(*prs.remainingTime)
+		} else {
+			dl = time.Now().Add(PendingReadTimeout)
 		}
-	}()
+	}
 
-	err = conn.nc.SetReadDeadline(time.Now().Add(BGReadTimeout))
+	err := conn.nc.SetReadDeadline(dl)
 	if err != nil {
-		err = fmt.Errorf("error setting a read deadline: %w", err)
-		return
+		checkIn = true
+
+		someErr = fmt.Errorf("error setting a read deadline: %w", err)
+
+		return someErr
 	}
 
-	if size == 0 {
+	st := time.Now()
+
+	// A remaining size of zero means the timeout occurred before the 4-byte
+	// message header was read, so read and parse the header first.
+	if size == 0 {
 		var sizeBuf [4]byte
-		_, err = io.ReadFull(conn.nc, sizeBuf[:])
-		if err != nil {
-			err = fmt.Errorf("error reading the message size: %w", err)
-			return
+		if _, err := io.ReadFull(conn.nc, sizeBuf[:]); err != nil {
+			prs.remainingTime = ptrutil.Ptr(*prs.remainingTime - time.Since(st))
+			checkIn = true
+
+			err = transformNetworkError(ctx, err, contextDeadlineUsed)
+			someErr = fmt.Errorf("error reading the message size: %w", err)
+
+			return someErr
 		}
 		size, err = conn.parseWmSizeBytes(sizeBuf)
 		if err != nil {
-			return
+			checkIn = true
+			someErr = transformNetworkError(ctx, err, contextDeadlineUsed)
+
+			return someErr
 		}
 		size -= 4
 	}
-	_, err = io.CopyN(io.Discard, conn.nc, int64(size))
+
+	n, err := io.CopyN(io.Discard, conn.nc, int64(size))
 	if err != nil {
-		err = fmt.Errorf("error discarding %d byte message: %w", size, err)
+		// If the read times out, record the bytes and time still remaining
+		// so a later checkout can resume where this attempt stopped.
+		nerr := net.Error(nil)
+		if errors.As(err, &nerr) && nerr.Timeout() {
+			prs.remainingBytes = size - int32(n)
+			prs.remainingTime = ptrutil.Ptr(*prs.remainingTime - time.Since(st))
+		}
+
+		checkIn = true
+
+		err = transformNetworkError(ctx, err, contextDeadlineUsed)
+		someErr = fmt.Errorf("error discarding %d byte message: %w", size, err)
+
+		return someErr
 	}
+
+	if mustLogPoolMessage(pool) {
+		keysAndValues := logger.KeyValues{
+			logger.KeyDriverConnectionID, conn.driverConnectionID,
+			logger.KeyRequestID, prs.requestID,
+		}
+
+		logPoolMessage(pool, logger.ConnectionPendingReadSucceeded, keysAndValues...)
+	}
+
+	if pool.monitor != nil {
+		event := &event.PoolEvent{
+			Type:         event.ConnectionPendingReadSucceeded,
+			Address:      pool.address.String(),
+			ConnectionID: conn.driverConnectionID,
+			Duration:     time.Since(st),
+		}
+
+		pool.monitor.Event(event)
+	}
+
+	conn.pendingReadState = nil
+
+	return nil
 }
 
 // checkIn returns an idle connection to the pool. If the connection is perished or the pool is
@@ -888,21 +1006,6 @@ func (p *pool) checkInNoEvent(conn *connection) error {
 		return ErrWrongPool
 	}
 
-	// If the connection has an awaiting server response, try to read the
-	// response in another goroutine before checking it back into the pool.
-	//
-	// Do this here because we want to publish checkIn events when the operation
-	// is done with the connection, not when it's ready to be used again. That
-	// means that connections in "awaiting response" state are checked in but
-	// not usable, which is not covered by the current pool events. We may need
-	// to add pool event information in the future to communicate that.
-	if conn.awaitRemainingBytes != nil {
-		size := *conn.awaitRemainingBytes
-		conn.awaitRemainingBytes = nil
-		go bgRead(p, conn, size)
-		return nil
-	}
-
 	// Bump the connection idle start time here because we're about to make the
 	// connection "available". The idle start time is used to determine how long
 	// a connection has been idle and when it has reached its max idle time and
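Note on the deadline fallback above: in awaitPendingRead the effective deadline is the caller's context deadline when present, otherwise the remaining time carried over from a previous attempt, otherwise the static PendingReadTimeout. A minimal sketch of that selection (the chooseDeadline helper is illustrative only, not part of this patch):

    package topology

    import (
        "context"
        "time"
    )

    // chooseDeadline mirrors the fallback in awaitPendingRead: prefer the
    // caller's context deadline, then any budget remaining from a prior
    // attempt, then the static PendingReadTimeout.
    func chooseDeadline(ctx context.Context, remaining *time.Duration) time.Time {
        if dl, ok := ctx.Deadline(); ok {
            return dl
        }
        if remaining != nil {
            return time.Now().Add(*remaining)
        }
        return time.Now().Add(PendingReadTimeout)
    }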
diff --git a/x/mongo/driver/topology/pool_test.go b/x/mongo/driver/topology/pool_test.go
index 3d270de2e0..45640667f8 100644
--- a/x/mongo/driver/topology/pool_test.go
+++ b/x/mongo/driver/topology/pool_test.go
@@ -18,6 +18,7 @@ import (
 	"go.mongodb.org/mongo-driver/v2/event"
 	"go.mongodb.org/mongo-driver/v2/internal/assert"
 	"go.mongodb.org/mongo-driver/v2/internal/csot"
+	"go.mongodb.org/mongo-driver/v2/internal/driverutil"
 	"go.mongodb.org/mongo-driver/v2/internal/eventtest"
 	"go.mongodb.org/mongo-driver/v2/internal/require"
 	"go.mongodb.org/mongo-driver/v2/mongo/address"
@@ -1233,24 +1234,10 @@ func TestPool_maintain(t *testing.T) {
 	})
 }
 
-func TestBackgroundRead(t *testing.T) {
+func TestAwaitPendingRead(t *testing.T) {
 	t.Parallel()
 
-	newBGReadCallback := func(errsCh chan []error) func(string, time.Time, time.Time, []error, bool) {
-		return func(_ string, _, _ time.Time, errs []error, _ bool) {
-			errsCh <- errs
-			close(errsCh)
-		}
-	}
-
 	t.Run("incomplete read of message header", func(t *testing.T) {
-		errsCh := make(chan []error)
-		var originalCallback func(string, time.Time, time.Time, []error, bool)
-		originalCallback, BGReadCallback = BGReadCallback, newBGReadCallback(errsCh)
-		t.Cleanup(func() {
-			BGReadCallback = originalCallback
-		})
-
 		timeout := 10 * time.Millisecond
 
 		cleanup := make(chan struct{})
@@ -1274,24 +1261,21 @@ func TestBackgroundRead(t *testing.T) {
 
 		conn, err := p.checkOut(context.Background())
 		require.NoError(t, err)
+
 		ctx, cancel := csot.WithTimeout(context.Background(), &timeout)
 		defer cancel()
+
+		ctx = driverutil.WithValueHasMaxTimeMS(ctx, true)
+		ctx = driverutil.WithRequestID(ctx, -1)
+
 		_, err = conn.readWireMessage(ctx)
 		regex := regexp.MustCompile(
 			`^connection\(.*\[-\d+\]\) incomplete read of message header: context deadline exceeded: read tcp 127.0.0.1:.*->127.0.0.1:.*: i\/o timeout$`,
 		)
 		assert.True(t, regex.MatchString(err.Error()), "error %q does not match pattern %q", err, regex)
-		assert.Nil(t, conn.awaitRemainingBytes, "conn.awaitRemainingBytes should be nil")
-		close(errsCh) // this line causes a double close if BGReadCallback is ever called.
+		assert.Nil(t, conn.pendingReadState, "conn.pendingReadState should be nil")
 	})
 	t.Run("timeout reading message header, successful background read", func(t *testing.T) {
-		errsCh := make(chan []error)
-		var originalCallback func(string, time.Time, time.Time, []error, bool)
-		originalCallback, BGReadCallback = BGReadCallback, newBGReadCallback(errsCh)
-		t.Cleanup(func() {
-			BGReadCallback = originalCallback
-		})
-
 		timeout := 10 * time.Millisecond
 
 		addr := bootstrapConnections(t, 1, func(nc net.Conn) {
@@ -1305,8 +1289,20 @@ func TestBackgroundRead(t *testing.T) {
 			require.NoError(t, err)
 		})
 
+		var pendingReadError error
+		monitor := &event.PoolMonitor{
+			Event: func(pe *event.PoolEvent) {
+				if pe.Type == event.ConnectionPendingReadFailed {
+					pendingReadError = pe.Error
+				}
+			},
+		}
+
 		p := newPool(
-			poolConfig{Address: address.Address(addr.String())},
+			poolConfig{
+				Address:     address.Address(addr.String()),
+				PoolMonitor: monitor,
+			},
 		)
 		defer p.close(context.Background())
 		err := p.ready()
@@ -1314,8 +1310,13 @@ func TestBackgroundRead(t *testing.T) {
 
 		conn, err := p.checkOut(context.Background())
 		require.NoError(t, err)
+
 		ctx, cancel := csot.WithTimeout(context.Background(), &timeout)
 		defer cancel()
+
+		ctx = driverutil.WithValueHasMaxTimeMS(ctx, true)
+		ctx = driverutil.WithRequestID(ctx, -1)
+
 		_, err = conn.readWireMessage(ctx)
 		regex := regexp.MustCompile(
 			`^connection\(.*\[-\d+\]\) incomplete read of message header: context deadline exceeded: read tcp 127.0.0.1:.*->127.0.0.1:.*: i\/o timeout$`,
@@ -1323,22 +1324,13 @@ func TestBackgroundRead(t *testing.T) {
 		assert.True(t, regex.MatchString(err.Error()), "error %q does not match pattern %q", err, regex)
 		err = p.checkIn(conn)
 		require.NoError(t, err)
-		var bgErrs []error
-		select {
-		case bgErrs = <-errsCh:
-		case <-time.After(3 * time.Second):
-			assert.Fail(t, "did not receive expected error after waiting for 3 seconds")
-		}
-		require.Len(t, bgErrs, 0, "expected no error from bgRead()")
+
+		_, err = p.checkOut(context.Background())
+		require.NoError(t, err)
+
+		require.NoError(t, pendingReadError)
 	})
 	t.Run("timeout reading message header, incomplete head during background read", func(t *testing.T) {
-		errsCh := make(chan []error)
-		var originalCallback func(string, time.Time, time.Time, []error, bool)
-		originalCallback, BGReadCallback = BGReadCallback, newBGReadCallback(errsCh)
-		t.Cleanup(func() {
-			BGReadCallback = originalCallback
-		})
-
 		timeout := 10 * time.Millisecond
 
 		addr := bootstrapConnections(t, 1, func(nc net.Conn) {
@@ -1352,8 +1344,20 @@ func TestBackgroundRead(t *testing.T) {
 			require.NoError(t, err)
 		})
 
+		var pendingReadError error
+		monitor := &event.PoolMonitor{
+			Event: func(pe *event.PoolEvent) {
+				if pe.Type == event.ConnectionPendingReadFailed {
+					pendingReadError = pe.Error
+				}
+			},
+		}
+
 		p := newPool(
-			poolConfig{Address: address.Address(addr.String())},
+			poolConfig{
+				Address:     address.Address(addr.String()),
+				PoolMonitor: monitor,
+			},
 		)
 		defer p.close(context.Background())
 		err := p.ready()
@@ -1361,8 +1365,13 @@ func TestBackgroundRead(t *testing.T) {
 
 		conn, err := p.checkOut(context.Background())
 		require.NoError(t, err)
+
 		ctx, cancel := csot.WithTimeout(context.Background(), &timeout)
 		defer cancel()
+
+		ctx = driverutil.WithValueHasMaxTimeMS(ctx, true)
+		ctx = driverutil.WithRequestID(ctx, -1)
+
 		_, err = conn.readWireMessage(ctx)
 		regex := regexp.MustCompile(
 			`^connection\(.*\[-\d+\]\) incomplete read of message header: context deadline exceeded: read tcp 127.0.0.1:.*->127.0.0.1:.*: i\/o timeout$`,
@@ -1370,23 +1379,13 @@ func TestBackgroundRead(t *testing.T) {
 		assert.True(t, regex.MatchString(err.Error()), "error %q does not match pattern %q", err, regex)
 		err = p.checkIn(conn)
 		require.NoError(t, err)
-		var bgErrs []error
-		select {
-		case bgErrs = <-errsCh:
-		case <-time.After(3 * time.Second):
-			assert.Fail(t, "did not receive expected error after waiting for 3 seconds")
-		}
-		require.Len(t, bgErrs, 1, "expected 1 error from bgRead()")
-		assert.EqualError(t, bgErrs[0], "error reading the message size: unexpected EOF")
+
+		_, err = p.checkOut(context.Background())
+		require.Error(t, err)
+
+		assert.EqualError(t, pendingReadError, "error reading the message size: unexpected EOF")
 	})
 	t.Run("timeout reading message header, background read timeout", func(t *testing.T) {
-		errsCh := make(chan []error)
-		var originalCallback func(string, time.Time, time.Time, []error, bool)
-		originalCallback, BGReadCallback = BGReadCallback, newBGReadCallback(errsCh)
-		t.Cleanup(func() {
-			BGReadCallback = originalCallback
-		})
-
 		timeout := 10 * time.Millisecond
 
 		cleanup := make(chan struct{})
@@ -1404,17 +1403,35 @@ func TestBackgroundRead(t *testing.T) {
 			require.NoError(t, err)
 		})
 
+		var pendingReadError error
+		monitor := &event.PoolMonitor{
+			Event: func(pe *event.PoolEvent) {
+				if pe.Type == event.ConnectionPendingReadFailed {
+					pendingReadError = pe.Error
+				}
+			},
+		}
+
 		p := newPool(
-			poolConfig{Address: address.Address(addr.String())},
+			poolConfig{
+				Address:     address.Address(addr.String()),
+				PoolMonitor: monitor,
+			},
 		)
+
 		defer p.close(context.Background())
 		err := p.ready()
 		require.NoError(t, err)
 
 		conn, err := p.checkOut(context.Background())
 		require.NoError(t, err)
+
 		ctx, cancel := csot.WithTimeout(context.Background(), &timeout)
 		defer cancel()
+
+		ctx = driverutil.WithValueHasMaxTimeMS(ctx, true)
+		ctx = driverutil.WithRequestID(ctx, -1)
+
 		_, err = conn.readWireMessage(ctx)
 		regex := regexp.MustCompile(
 			`^connection\(.*\[-\d+\]\) incomplete read of message header: context deadline exceeded: read tcp 127.0.0.1:.*->127.0.0.1:.*: i\/o timeout$`,
@@ -1422,26 +1439,16 @@ func TestBackgroundRead(t *testing.T) {
 		assert.True(t, regex.MatchString(err.Error()), "error %q does not match pattern %q", err, regex)
 		err = p.checkIn(conn)
 		require.NoError(t, err)
-		var bgErrs []error
-		select {
-		case bgErrs = <-errsCh:
-		case <-time.After(3 * time.Second):
-			assert.Fail(t, "did not receive expected error after waiting for 3 seconds")
-		}
-		require.Len(t, bgErrs, 1, "expected 1 error from bgRead()")
+
+		_, err = p.checkOut(context.Background())
+		require.Error(t, err)
+
 		wantErr := regexp.MustCompile(
 			`^error discarding 6 byte message: read tcp 127.0.0.1:.*->127.0.0.1:.*: i\/o timeout$`,
 		)
-		assert.True(t, wantErr.MatchString(bgErrs[0].Error()), "error %q does not match pattern %q", bgErrs[0], wantErr)
+		assert.True(t, wantErr.MatchString(pendingReadError.Error()), "error %q does not match pattern %q", pendingReadError, wantErr)
 	})
 	t.Run("timeout reading full message, successful background read", func(t *testing.T) {
-		errsCh := make(chan []error)
-		var originalCallback func(string, time.Time, time.Time, []error, bool)
-		originalCallback, BGReadCallback = BGReadCallback, newBGReadCallback(errsCh)
-		t.Cleanup(func() {
-			BGReadCallback = originalCallback
-		})
-
 		timeout := 10 * time.Millisecond
 
 		addr := bootstrapConnections(t, 1, func(nc net.Conn) {
@@ -1458,17 +1465,35 @@ func TestBackgroundRead(t *testing.T) {
 			require.NoError(t, err)
 		})
 
+		var pendingReadError error
+		monitor := &event.PoolMonitor{
+			Event: func(pe *event.PoolEvent) {
+				if pe.Type == event.ConnectionPendingReadFailed {
+					pendingReadError = pe.Error
+				}
+			},
+		}
+
 		p := newPool(
-			poolConfig{Address: address.Address(addr.String())},
+			poolConfig{
+				Address:     address.Address(addr.String()),
+				PoolMonitor: monitor,
+			},
 		)
+
 		defer p.close(context.Background())
 		err := p.ready()
 		require.NoError(t, err)
 
 		conn, err := p.checkOut(context.Background())
 		require.NoError(t, err)
+
 		ctx, cancel := csot.WithTimeout(context.Background(), &timeout)
 		defer cancel()
+
+		ctx = driverutil.WithValueHasMaxTimeMS(ctx, true)
+		ctx = driverutil.WithRequestID(ctx, -1)
+
 		_, err = conn.readWireMessage(ctx)
 		regex := regexp.MustCompile(
 			`^connection\(.*\[-\d+\]\) incomplete read of full message: context deadline exceeded: read tcp 127.0.0.1:.*->127.0.0.1:.*: i\/o timeout$`,
@@ -1476,22 +1501,13 @@ func TestBackgroundRead(t *testing.T) {
 		assert.True(t, regex.MatchString(err.Error()), "error %q does not match pattern %q", err, regex)
 		err = p.checkIn(conn)
 		require.NoError(t, err)
-		var bgErrs []error
-		select {
-		case bgErrs = <-errsCh:
-		case <-time.After(3 * time.Second):
-			assert.Fail(t, "did not receive expected error after waiting for 3 seconds")
-		}
-		require.Len(t, bgErrs, 0, "expected no error from bgRead()")
+
+		_, err = p.checkOut(context.Background())
+		require.NoError(t, err)
+
+		require.NoError(t, pendingReadError)
 	})
 	t.Run("timeout reading full message, background read EOF", func(t *testing.T) {
-		errsCh := make(chan []error)
-		var originalCallback func(string, time.Time, time.Time, []error, bool)
-		originalCallback, BGReadCallback = BGReadCallback, newBGReadCallback(errsCh)
-		t.Cleanup(func() {
-			BGReadCallback = originalCallback
-		})
-
 		timeout := 10 * time.Millisecond
 
 		addr := bootstrapConnections(t, 1, func(nc net.Conn) {
@@ -1508,17 +1524,35 @@ func TestBackgroundRead(t *testing.T) {
 			require.NoError(t, err)
 		})
 
+		var pendingReadError error
+		monitor := &event.PoolMonitor{
+			Event: func(pe *event.PoolEvent) {
+				if pe.Type == event.ConnectionPendingReadFailed {
+					pendingReadError = pe.Error
+				}
+			},
+		}
+
 		p := newPool(
-			poolConfig{Address: address.Address(addr.String())},
+			poolConfig{
+				Address:     address.Address(addr.String()),
+				PoolMonitor: monitor,
+			},
 		)
+
 		defer p.close(context.Background())
 		err := p.ready()
 		require.NoError(t, err)
 
 		conn, err := p.checkOut(context.Background())
 		require.NoError(t, err)
+
 		ctx, cancel := csot.WithTimeout(context.Background(), &timeout)
 		defer cancel()
+
+		ctx = driverutil.WithValueHasMaxTimeMS(ctx, true)
+		ctx = driverutil.WithRequestID(ctx, -1)
+
 		_, err = conn.readWireMessage(ctx)
 		regex := regexp.MustCompile(
 			`^connection\(.*\[-\d+\]\) incomplete read of full message: context deadline exceeded: read tcp 127.0.0.1:.*->127.0.0.1:.*: i\/o timeout$`,
@@ -1526,14 +1560,11 @@ func TestBackgroundRead(t *testing.T) {
 		assert.True(t, regex.MatchString(err.Error()), "error %q does not match pattern %q", err, regex)
 		err = p.checkIn(conn)
 		require.NoError(t, err)
-		var bgErrs []error
-		select {
-		case bgErrs = <-errsCh:
-		case <-time.After(3 * time.Second):
-			assert.Fail(t, "did not receive expected error after waiting for 3 seconds")
-		}
-		require.Len(t, bgErrs, 1, "expected 1 error from bgRead()")
-		assert.EqualError(t, bgErrs[0], "error discarding 3 byte message: EOF")
+
+		_, err = p.checkOut(context.Background())
+		require.Error(t, err)
+
+		assert.EqualError(t, pendingReadError, "error discarding 3 byte message: EOF")
 	})
 }
 

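The tests above exercise state that the connection carries between attempts. As implied by the fields referenced in awaitPendingRead (remainingBytes, remainingTime, requestID), the struct looks roughly like the sketch below; the definition itself lives in connection.go, which this excerpt does not show, and the field types are inferred:

    // pendingReadState records what is left of a server reply whose read
    // timed out, so that a later checkout can finish draining the connection.
    type pendingReadState struct {
        remainingBytes int32          // reply bytes not yet read off the wire
        remainingTime  *time.Duration // drain budget carried over, if any
        requestID      int32          // requestID of the timed-out operation
    }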
From 70cfd39c34a11d323e5a708c26084b480d1c87a5 Mon Sep 17 00:00:00 2001
From: Preston Vasquez <prestonvasquez@icloud.com>
Date: Thu, 6 Mar 2025 19:18:30 -0700
Subject: [PATCH 2/3] GODRIVER-3173 Update unified spec tests

---
 internal/integration/unified/event.go         | 15 ++++++++
 .../integration/unified/event_verification.go | 17 ++++++++-
 .../pending-reads.json                        | 35 ++++++++++++++++++-
 .../pending-reads.yml                         | 21 ++++++++---
 x/mongo/driver/topology/pool.go               |  1 +
 5 files changed, 83 insertions(+), 6 deletions(-)

diff --git a/internal/integration/unified/event.go b/internal/integration/unified/event.go
index 9e6d6d5a35..7792ab949a 100644
--- a/internal/integration/unified/event.go
+++ b/internal/integration/unified/event.go
@@ -30,6 +30,9 @@ const (
 	connectionCheckOutFailedEvent   monitoringEventType = "ConnectionCheckOutFailedEvent"
 	connectionCheckedOutEvent       monitoringEventType = "ConnectionCheckedOutEvent"
 	connectionCheckedInEvent        monitoringEventType = "ConnectionCheckedInEvent"
+	connectionPendingReadStarted    monitoringEventType = "ConnectionPendingReadStarted"
+	connectionPendingReadSucceeded  monitoringEventType = "ConnectionPendingReadSucceeded"
+	connectionPendingReadFailed     monitoringEventType = "ConnectionPendingReadFailed"
 	serverDescriptionChangedEvent   monitoringEventType = "ServerDescriptionChangedEvent"
 	serverHeartbeatFailedEvent      monitoringEventType = "ServerHeartbeatFailedEvent"
 	serverHeartbeatStartedEvent     monitoringEventType = "ServerHeartbeatStartedEvent"
@@ -67,6 +70,12 @@ func monitoringEventTypeFromString(eventStr string) (monitoringEventType, bool)
 		return connectionCheckedOutEvent, true
 	case "connectioncheckedinevent":
 		return connectionCheckedInEvent, true
+	case "connectionpendingreadstarted":
+		return connectionPendingReadStarted, true
+	case "connectionpendingreadsucceeded":
+		return connectionPendingReadSucceeded, true
+	case "connectionpendingreadfailed":
+		return connectionPendingReadFailed, true
 	case "serverdescriptionchangedevent":
 		return serverDescriptionChangedEvent, true
 	case "serverheartbeatfailedevent":
@@ -106,6 +115,12 @@ func monitoringEventTypeFromPoolEvent(evt *event.PoolEvent) monitoringEventType
 		return connectionCheckedOutEvent
 	case event.ConnectionCheckedIn:
 		return connectionCheckedInEvent
+	case event.ConnectionPendingReadStarted:
+		return connectionPendingReadStarted
+	case event.ConnectionPendingReadSucceeded:
+		return connectionPendingReadSucceeded
+	case event.ConnectionPendingReadFailed:
+		return connectionPendingReadFailed
 	default:
 		return ""
 	}
diff --git a/internal/integration/unified/event_verification.go b/internal/integration/unified/event_verification.go
index ebdb0b19c3..c488fe0465 100644
--- a/internal/integration/unified/event_verification.go
+++ b/internal/integration/unified/event_verification.go
@@ -56,7 +56,10 @@ type cmapEvent struct {
 		Reason *string `bson:"reason"`
 	} `bson:"connectionCheckOutFailedEvent"`
 
-	ConnectionCheckedInEvent *struct{} `bson:"connectionCheckedInEvent"`
+	ConnectionCheckedInEvent       *struct{} `bson:"connectionCheckedInEvent"`
+	ConnectionPendingReadStarted   *struct{} `bson:"connectionPendingReadStarted"`
+	ConnectionPendingReadSucceeded *struct{} `bson:"connectionPendingReadSucceeded"`
+	ConnectionPendingReadFailed    *struct{} `bson:"connectionPendingReadFailed"`
 
 	PoolClearedEvent *struct {
 		HasServiceID              *bool `bson:"hasServiceId"`
@@ -350,6 +353,18 @@ func verifyCMAPEvents(client *clientEntity, expectedEvents *expectedEvents) erro
 			if _, pooled, err = getNextPoolEvent(pooled, event.ConnectionCheckedIn); err != nil {
 				return newEventVerificationError(idx, client, "failed to get next pool event: %v", err.Error())
 			}
+		case evt.ConnectionPendingReadStarted != nil:
+			if _, pooled, err = getNextPoolEvent(pooled, event.ConnectionPendingReadStarted); err != nil {
+				return newEventVerificationError(idx, client, "failed to get next pool event: %v", err.Error())
+			}
+		case evt.ConnectionPendingReadSucceeded != nil:
+			if _, pooled, err = getNextPoolEvent(pooled, event.ConnectionPendingReadSucceeded); err != nil {
+				return newEventVerificationError(idx, client, "failed to get next pool event: %v", err.Error())
+			}
+		case evt.ConnectionPendingReadFailed != nil:
+			if _, pooled, err = getNextPoolEvent(pooled, event.ConnectionPendingReadFailed); err != nil {
+				return newEventVerificationError(idx, client, "failed to get next pool event: %v", err.Error())
+			}
 		case evt.PoolClearedEvent != nil:
 			var actual *event.PoolEvent
 			if actual, pooled, err = getNextPoolEvent(pooled, event.ConnectionPoolCleared); err != nil {
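Outside the unified test runner, the new event types are observable through the standard PoolMonitor hook. A brief usage sketch (ordinary client wiring, not part of this patch; RequestID, Duration, and Error are the PoolEvent fields this patch populates):

    monitor := &event.PoolMonitor{
        Event: func(pe *event.PoolEvent) {
            switch pe.Type {
            case event.ConnectionPendingReadStarted:
                log.Printf("draining connection %d (request %d)", pe.ConnectionID, pe.RequestID)
            case event.ConnectionPendingReadSucceeded:
                log.Printf("connection %d drained in %s", pe.ConnectionID, pe.Duration)
            case event.ConnectionPendingReadFailed:
                log.Printf("drain of connection %d failed: %v", pe.ConnectionID, pe.Error)
            }
        },
    }
    client, err := mongo.Connect(options.Client().
        ApplyURI("mongodb://localhost:27017").
        SetPoolMonitor(monitor))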
diff --git a/testdata/client-side-operations-timeout/pending-reads.json b/testdata/client-side-operations-timeout/pending-reads.json
index 43b2745c7c..3fd2e8b2ef 100644
--- a/testdata/client-side-operations-timeout/pending-reads.json
+++ b/testdata/client-side-operations-timeout/pending-reads.json
@@ -29,7 +29,10 @@
           "commandSucceededEvent",
           "connectionCheckedOutEvent",
           "connectionCheckedInEvent",
-          "connectionClosedEvent"
+          "connectionClosedEvent",
+          "connectionPendingReadSucceeded",
+          "connectionPendingReadStarted",
+          "connectionPendingReadFailed"
         ]
       }
     },
@@ -129,6 +132,12 @@
             {
               "connectionCheckedInEvent": {}
             },
+            {
+              "connectionPendingReadStarted": {}
+            },
+            {
+              "connectionPendingReadSucceeded": {}
+            },
             {
               "connectionCheckedOutEvent": {}
             },
@@ -261,6 +270,12 @@
             {
               "connectionCheckedInEvent": {}
             },
+            {
+              "connectionPendingReadStarted": {}
+            },
+            {
+              "connectionPendingReadSucceeded": {}
+            },
             {
               "connectionCheckedOutEvent": {}
             },
@@ -344,6 +359,12 @@
             {
               "connectionCheckedInEvent": {}
             },
+            {
+              "connectionPendingReadStarted": {}
+            },
+            {
+              "connectionPendingReadFailed": {}
+            },
             {
               "connectionClosedEvent": {
                 "reason": "error"
@@ -425,6 +446,12 @@
             {
               "connectionCheckedInEvent": {}
             },
+            {
+              "connectionPendingReadStarted": {}
+            },
+            {
+              "connectionPendingReadSucceeded": {}
+            },
             {
               "connectionCheckedOutEvent": {}
             },
@@ -506,6 +533,12 @@
             {
               "connectionCheckedInEvent": {}
             },
+            {
+              "connectionPendingReadStarted": {}
+            },
+            {
+              "connectionPendingReadFailed": {}
+            },
             {
               "connectionClosedEvent": {
                 "reason": "error"
diff --git a/testdata/client-side-operations-timeout/pending-reads.yml b/testdata/client-side-operations-timeout/pending-reads.yml
index c07728009e..fe5344292e 100644
--- a/testdata/client-side-operations-timeout/pending-reads.yml
+++ b/testdata/client-side-operations-timeout/pending-reads.yml
@@ -25,6 +25,9 @@ createEntities:
         - connectionCheckedOutEvent
         - connectionCheckedInEvent
         - connectionClosedEvent
+        - connectionPendingReadSucceeded
+        - connectionPendingReadStarted
+        - connectionPendingReadFailed
   - database:
       id: &database test
       client: *client
@@ -80,10 +83,12 @@ tests:
       - client: *client
         eventType: cmap
         events:
-          - connectionCheckedOutEvent: {} # insert
-          - connectionCheckedInEvent: {}  # insert fails
-          - connectionCheckedOutEvent: {} # find
-          - connectionCheckedInEvent: {}  # find succeeds
+          - connectionCheckedOutEvent: {}
+          - connectionCheckedInEvent: {}        # insert fails
+          - connectionPendingReadStarted: {}
+          - connectionPendingReadSucceeded: {}  # find op drains conn
+          - connectionCheckedOutEvent: {}
+          - connectionCheckedInEvent: {}        # find succeeds
 
   - description: "Concurrent write operation with successful pending read"
     operations:
@@ -167,6 +172,8 @@ tests:
         events:
           - connectionCheckedOutEvent: {} # insert
           - connectionCheckedInEvent: {}  # insert fails
+          - connectionPendingReadStarted: {}
+          - connectionPendingReadSucceeded: {}
           - connectionCheckedOutEvent: {} # find
           - connectionCheckedInEvent: {}  # find succeeds
 
@@ -215,6 +222,8 @@ tests:
         events:
           - connectionCheckedOutEvent: {} # first insert
           - connectionCheckedInEvent: {}  # first insert fails
+          - connectionPendingReadStarted: {}
+          - connectionPendingReadFailed: {}
           - connectionClosedEvent:        # second insert times out pending read in checkout, closes
               reason: error
 
@@ -260,6 +269,8 @@ tests:
         events:
           - connectionCheckedOutEvent: {} # first find
           - connectionCheckedInEvent: {}  # first find fails
+          - connectionPendingReadStarted: {}
+          - connectionPendingReadSucceeded: {}
           - connectionCheckedOutEvent: {} # second find
           - connectionCheckedInEvent: {}  # second find succeeds
 
@@ -308,5 +319,7 @@ tests:
         events:
           - connectionCheckedOutEvent: {} # first find
           - connectionCheckedInEvent: {}  # first find fails
+          - connectionPendingReadStarted: {}
+          - connectionPendingReadFailed: {}
           - connectionClosedEvent:        # second find times out pending read in checkout, closes
               reason: error
diff --git a/x/mongo/driver/topology/pool.go b/x/mongo/driver/topology/pool.go
index 7a596cc77f..228773cebc 100644
--- a/x/mongo/driver/topology/pool.go
+++ b/x/mongo/driver/topology/pool.go
@@ -818,6 +818,7 @@ func awaitPendingRead(ctx context.Context, pool *pool, conn *connection) error {
 	if pool.monitor != nil {
 		event := &event.PoolEvent{
 			Type:         event.ConnectionPendingReadStarted,
+			Address:      pool.address.String(),
 			ConnectionID: conn.driverConnectionID,
 			RequestID:    prs.requestID,
 		}

From f060a090e49c4300f869877ad80a9a02c8c07191 Mon Sep 17 00:00:00 2001
From: Preston Vasquez <prestonvasquez@icloud.com>
Date: Wed, 19 Mar 2025 13:19:43 -0600
Subject: [PATCH 3/3] GODRIVER-3173 Refresh pending read every 4KB

---
 .../pending-reads.json                        |  8 +--
 .../pending-reads.yml                         |  8 +--
 x/mongo/driver/topology/pool.go               | 60 +++++++++++++++----
 3 files changed, 55 insertions(+), 21 deletions(-)

diff --git a/testdata/client-side-operations-timeout/pending-reads.json b/testdata/client-side-operations-timeout/pending-reads.json
index 3fd2e8b2ef..ef7c20dedb 100644
--- a/testdata/client-side-operations-timeout/pending-reads.json
+++ b/testdata/client-side-operations-timeout/pending-reads.json
@@ -304,7 +304,7 @@
                   "insert"
                 ],
                 "blockConnection": true,
-                "blockTimeMS": 2100
+                "blockTimeMS": 500
               }
             }
           }
@@ -327,7 +327,7 @@
           "name": "insertOne",
           "object": "coll",
           "arguments": {
-            "timeoutMS": 2000,
+            "timeoutMS": 400,
             "document": {
               "_id": 3,
               "x": 1
@@ -480,7 +480,7 @@
                   "find"
                 ],
                 "blockConnection": true,
-                "blockTimeMS": 2100
+                "blockTimeMS": 500
               }
             }
           }
@@ -502,7 +502,7 @@
           "name": "findOne",
           "object": "coll",
           "arguments": {
-            "timeoutMS": 2000,
+            "timeoutMS": 400,
             "filter": {
               "_id": 1
             }
diff --git a/testdata/client-side-operations-timeout/pending-reads.yml b/testdata/client-side-operations-timeout/pending-reads.yml
index fe5344292e..c76f6bf1e2 100644
--- a/testdata/client-side-operations-timeout/pending-reads.yml
+++ b/testdata/client-side-operations-timeout/pending-reads.yml
@@ -190,7 +190,7 @@ tests:
             data:
               failCommands: ["insert"]
               blockConnection: true
-              blockTimeMS: 2100
+              blockTimeMS: 500
 
       # Execute operation with timeout less than block time
       - name: insertOne
@@ -205,7 +205,7 @@ tests:
       - name: insertOne
         object: *collection
         arguments:
-          timeoutMS: 2000
+          timeoutMS: 400
           document: { _id: 3, x: 1 }
         expectError:
           isTimeoutError: true
@@ -287,7 +287,7 @@ tests:
             data:
               failCommands: ["find"]
               blockConnection: true
-              blockTimeMS: 2100
+              blockTimeMS: 500
 
       # Execute operation with timeout less than block time
       - name: findOne
@@ -302,7 +302,7 @@ tests:
       - name: findOne
         object: *collection
         arguments:
-          timeoutMS: 2000
+          timeoutMS: 400
           filter: { _id: 1 }
         expectError:
           isTimeoutError: true
diff --git a/x/mongo/driver/topology/pool.go b/x/mongo/driver/topology/pool.go
index 228773cebc..735f7fcede 100644
--- a/x/mongo/driver/topology/pool.go
+++ b/x/mongo/driver/topology/pool.go
@@ -783,11 +783,12 @@ func (p *pool) removeConnection(conn *connection, reason reason, err error) erro
 
 // PendingReadTimeout is the maximum amount of time to wait when trying to read
 // the server reply on a connection after an operation timed out. The
-// default is 1 second.
+// default is 400 milliseconds. This value is refreshed for every 4KB read from
+// the TCP stream.
 //
 // Deprecated: PendingReadTimeout is intended for internal use only and may be
 // removed or modified at any time.
-var PendingReadTimeout = 2000 * time.Millisecond
+var PendingReadTimeout = 400 * time.Millisecond
 
 // awaitPendingRead sets a new read deadline on the provided connection and
 // tries to read any bytes returned by the server. If there are any errors, the
@@ -926,21 +927,54 @@ func awaitPendingRead(ctx context.Context, pool *pool, conn *connection) error {
 		size -= 4
 	}
 
-	n, err := io.CopyN(io.Discard, conn.nc, int64(size))
-	if err != nil {
-		// If the read times out, record the bytes left to read before exiting.
-		nerr := net.Error(nil)
-		if l := int32(n); l == 0 && errors.As(err, &nerr) && nerr.Timeout() {
-			prs.remainingBytes = l + prs.remainingBytes
-			prs.remainingTime = ptrutil.Ptr(*prs.remainingTime - time.Since(st))
+	const bufSize = 4096
+	buf := make([]byte, bufSize)
+
+	var totalRead int64
+
+	// Read the TCP stream in 4KB chunks, refreshing remainingTime after each
+	// successful read to avoid timing out while draining large (up to 16MiB)
+	// response messages.
+	for totalRead < int64(size) {
+		newDeadline := time.Now().Add(*prs.remainingTime)
+		if err := conn.nc.SetReadDeadline(newDeadline); err != nil {
+			checkIn = true
+			someErr = fmt.Errorf("error renewing read deadline: %w", err)
+
+			return someErr
 		}
 
-		checkIn = true
+		remaining := int64(size) - totalRead
 
-		err = transformNetworkError(ctx, err, contextDeadlineUsed)
-		someErr = fmt.Errorf("error discarding %d byte message: %w", size, err)
+		readSize := bufSize
+		if int64(readSize) > remaining {
+			readSize = int(remaining)
+		}
 
-		return someErr
+		n, err := conn.nc.Read(buf[:readSize])
+		if n > 0 {
+			totalRead += int64(n)
+
+			// Refresh the remaining time if we are receiving data.
+			prs.remainingTime = ptrutil.Ptr(PendingReadTimeout)
+		}
+
+		if err != nil {
+			// If the read times out, record the bytes left to read and reduce
+			// the remaining time before exiting.
+			nerr := net.Error(nil)
+			if l := int32(n); l == 0 && errors.As(err, &nerr) && nerr.Timeout() {
+				prs.remainingBytes = l + prs.remainingBytes
+				prs.remainingTime = ptrutil.Ptr(*prs.remainingTime - time.Since(st))
+			}
+
+			checkIn = true
+
+			err = transformNetworkError(ctx, err, contextDeadlineUsed)
+			someErr = fmt.Errorf("error discarding %d byte message: %w", size, err)
+
+			return someErr
+		}
 	}
 
 	if mustLogPoolMessage(pool) {
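Taken on its own, the loop above is a chunked drain that renews the socket deadline whenever a read makes progress, so a slow but live stream is not cut off mid-message. A standalone sketch of the technique under that assumption (illustrative names, not the patch code):

    package drain

    import (
        "net"
        "time"
    )

    // drainInChunks discards n bytes from conn in 4KB chunks, renewing the
    // read deadline after each chunk so continued progress keeps the budget
    // alive.
    func drainInChunks(conn net.Conn, n int64, budget time.Duration) error {
        buf := make([]byte, 4096)
        for n > 0 {
            if err := conn.SetReadDeadline(time.Now().Add(budget)); err != nil {
                return err
            }
            chunk := int64(len(buf))
            if chunk > n {
                chunk = n
            }
            read, err := conn.Read(buf[:chunk])
            n -= int64(read)
            if err != nil {
                return err
            }
        }
        return nil
    }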