Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
a19be26
Rename operationTimeout
katcharov Mar 12, 2025
489d6d9
Remove doWithResetTimeout
katcharov Mar 13, 2025
fedc764
Remove resetTimeoutIfPresent from CommandBatchCursor
katcharov Mar 14, 2025
3a81c61
Remove Timeout from RetryState
katcharov Mar 14, 2025
ad5937c
Decouple OperationContext from BindingContext.
vbabanin Jul 16, 2025
29750df
Merge branch 'main' into JAVA-5640
vbabanin Aug 10, 2025
353099b
Update logback-test.xml
vbabanin Aug 12, 2025
2b33520
Fix compilation errors.
vbabanin Aug 12, 2025
cddd29a
Merge branch 'main' into JAVA-5640
vbabanin Aug 12, 2025
c742581
Add javadoc, remove comments.
vbabanin Aug 12, 2025
ab49280
Remove redundant tests.
vbabanin Aug 13, 2025
3c6040a
Enhance Javadoc comments and improve method signatures for clarity.
vbabanin Aug 13, 2025
c21a67f
Remove comments.
vbabanin Aug 13, 2025
863293f
Rename methods.
vbabanin Aug 13, 2025
6059fc4
Add support for awaiting CommandStartedEvent in Unified Test Runner.
vbabanin Aug 15, 2025
53c339f
Revert "Add support for awaiting CommandStartedEvent in Unified Test …
vbabanin Aug 15, 2025
e245820
Merge branch 'main' into JAVA-5640
rozza Aug 18, 2025
e52aa5e
Update driver-core/src/main/com/mongodb/internal/connection/Operation…
vbabanin Sep 6, 2025
7fa7e1f
Update driver-core/src/main/com/mongodb/internal/operation/CommandRea…
vbabanin Sep 7, 2025
7ed97d8
Rename cursor interfaces.
vbabanin Sep 8, 2025
889adb9
Merge branch 'main' into JAVA-5640
vbabanin Sep 8, 2025
58a9619
Consolidate connection establishment SessionContext creation.
vbabanin Sep 17, 2025
0b6d041
Refactor ChangeStreamBatchCursor to use initialOperationContext.
vbabanin Sep 17, 2025
0e4accd
Update driver-core/src/main/com/mongodb/internal/operation/AsyncChang…
vbabanin Sep 23, 2025
3f7991e
Update driver-core/src/main/com/mongodb/internal/operation/AsyncChang…
vbabanin Sep 23, 2025
4509d9c
Update driver-core/src/main/com/mongodb/internal/operation/AsyncChang…
vbabanin Sep 23, 2025
9e3324a
Update driver-core/src/main/com/mongodb/internal/operation/ChangeStre…
vbabanin Sep 23, 2025
3d00c7e
Update driver-core/src/main/com/mongodb/internal/operation/AsyncChang…
vbabanin Sep 23, 2025
3d5df6a
Improve clarity in variable naming.
vbabanin Sep 23, 2025
392fce3
Address TODOs.
vbabanin Sep 23, 2025
67a4be0
Merge branch 'main' into JAVA-5640
vbabanin Sep 23, 2025
59dc254
Update driver-core/src/main/com/mongodb/internal/operation/AsyncChang…
rozza Sep 29, 2025
c483fb3
Merge branch 'main' into JAVA-5640
vbabanin Oct 2, 2025
56025da
Fix Atlas tests in evergreen
vbabanin Oct 2, 2025
27ff43e
Merge branch 'main' into JAVA-5640
vbabanin Oct 6, 2025
6089137
Merge branch 'main' into JAVA-5640
vbabanin Oct 13, 2025
af62b2a
Merge remote-tracking branch 'ross/JAVA-5961' into JAVA-5640
vbabanin Oct 15, 2025
6a2a68b
Merge branch 'main' into JAVA-5640
vbabanin Oct 16, 2025
5f06003
Remove unsued imports.
vbabanin Oct 17, 2025
7c55f5a
Merge branch 'main' into JAVA-5640
vbabanin Oct 18, 2025
2985aa4
Correct javadoc after merge.
vbabanin Oct 20, 2025
b8eb987
Add constant.
vbabanin Oct 21, 2025
f6f372b
Merge branch 'main' into JAVA-5640
vbabanin Oct 21, 2025
90f92f4
Update driver-core/src/main/com/mongodb/internal/operation/ClientBulk…
vbabanin Oct 21, 2025
11b6b6f
Merge branch 'main' into JAVA-5640
rozza Oct 21, 2025
e37bd46
Merge branch 'main' into JAVA-5640
vbabanin Oct 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
378 changes: 210 additions & 168 deletions driver-core/src/main/com/mongodb/internal/TimeoutContext.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
*<p>This class is not part of the public API and may be removed or changed at any time</p>
*/
public interface SingleResultCallback<T> {
SingleResultCallback<Void> THEN_DO_NOTHING = (r, t) -> {};

/**
* Called when the function completes. This method must not complete abruptly, see {@link AsyncCallbackFunction} for more details.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2008-present MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.mongodb.internal.async.function;

import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.lang.Nullable;

/**
* An {@linkplain AsyncCallbackFunction asynchronous callback-based function} of three parameters.
* This class is a callback-based.
*
* <p>This class is not part of the public API and may be removed or changed at any time</p>
*
* @param <P1> The type of the first parameter to the function.
* @param <P2> The type of the second parameter to the function.
* @param <R> See {@link AsyncCallbackFunction}
* @see AsyncCallbackFunction
*/
@FunctionalInterface
public interface AsyncCallbackTriFunction<P1, P2, P3, R> {
/**
* @param p1 The first {@code @}{@link Nullable} argument of the asynchronous function.
* @param p2 The second {@code @}{@link Nullable} argument of the asynchronous function.
* @see AsyncCallbackFunction#apply(Object, SingleResultCallback)
*/
void apply(P1 p1, P2 p2, P3 p3, SingleResultCallback<R> callback);
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
@NotThreadSafe
public final class RetryState {
public static final int RETRIES = 1;
private static final int INFINITE_ATTEMPTS = Integer.MAX_VALUE;
public static final int INFINITE_ATTEMPTS = Integer.MAX_VALUE;

private final LoopState loopState;
private final int attempts;
Expand All @@ -67,19 +67,16 @@ public final class RetryState {
* </p>
*
* @param retries A positive number of allowed retries. {@link Integer#MAX_VALUE} is a special value interpreted as being unlimited.
* @param timeoutContext A timeout context that will be used to determine if the operation has timed out.
* @param retryUntilTimeoutThrowsException // TODO-JAVA-5640 shouldn't a timeout always stop retries?
* @see #attempts()
*/
public static RetryState withRetryableState(final int retries, final TimeoutContext timeoutContext) {
public static RetryState withRetryableState(final int retries, final boolean retryUntilTimeoutThrowsException) {
assertTrue(retries > 0);
if (timeoutContext.hasTimeoutMS()){
return new RetryState(INFINITE_ATTEMPTS, timeoutContext);
}
return new RetryState(retries, null);
return new RetryState(retries, retryUntilTimeoutThrowsException);
}

public static RetryState withNonRetryableState() {
return new RetryState(0, null);
return new RetryState(0, false);
}

/**
Expand All @@ -94,19 +91,19 @@ public static RetryState withNonRetryableState() {
* @see #attempts()
*/
public RetryState(final TimeoutContext timeoutContext) {
this(INFINITE_ATTEMPTS, timeoutContext);
this(INFINITE_ATTEMPTS, timeoutContext.hasTimeoutMS());
}

/**
* @param retries A non-negative number of allowed retries. {@link Integer#MAX_VALUE} is a special value interpreted as being unlimited.
* @param timeoutContext A timeout context that will be used to determine if the operation has timed out.
* @param retryUntilTimeoutThrowsException
* @see #attempts()
*/
private RetryState(final int retries, @Nullable final TimeoutContext timeoutContext) {
private RetryState(final int retries, final boolean retryUntilTimeoutThrowsException) {
assertTrue(retries >= 0);
loopState = new LoopState();
attempts = retries == INFINITE_ATTEMPTS ? INFINITE_ATTEMPTS : retries + 1;
this.retryUntilTimeoutThrowsException = timeoutContext != null && timeoutContext.hasTimeoutMS();
this.retryUntilTimeoutThrowsException = retryUntilTimeoutThrowsException;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.mongodb.ServerAddress;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.connection.OperationContext;

/**
* <p>This class is not part of the public API and may be removed or changed at any time</p>
Expand All @@ -30,7 +31,7 @@ public interface AsyncClusterAwareReadWriteBinding extends AsyncReadWriteBinding
* @param serverAddress the server address
* @param callback the to be passed the connection source
*/
void getConnectionSource(ServerAddress serverAddress, SingleResultCallback<AsyncConnectionSource> callback);
void getConnectionSource(ServerAddress serverAddress, OperationContext operationContext, SingleResultCallback<AsyncConnectionSource> callback);

@Override
AsyncClusterAwareReadWriteBinding retain();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.mongodb.internal.connection.AsyncConnection;
import com.mongodb.internal.connection.Cluster;
import com.mongodb.internal.connection.OperationContext;
import com.mongodb.internal.connection.ReadConcernAwareNoOpSessionContext;
import com.mongodb.internal.connection.Server;
import com.mongodb.internal.selector.ReadPreferenceServerSelector;
import com.mongodb.internal.selector.ReadPreferenceWithFallbackServerSelector;
Expand All @@ -33,7 +34,6 @@
import com.mongodb.selector.ServerSelector;

import static com.mongodb.assertions.Assertions.notNull;
import static java.util.concurrent.TimeUnit.NANOSECONDS;

/**
* A simple ReadWriteBinding implementation that supplies write connection sources bound to a possibly different primary each time, and a
Expand All @@ -44,24 +44,17 @@
public class AsyncClusterBinding extends AbstractReferenceCounted implements AsyncClusterAwareReadWriteBinding {
private final Cluster cluster;
private final ReadPreference readPreference;
private final ReadConcern readConcern;
private final OperationContext operationContext;

/**
* Creates an instance.
*
* @param cluster a non-null Cluster which will be used to select a server to bind to
* @param readPreference a non-null ReadPreference for read operations
* @param readConcern a non-null read concern
* @param operationContext the operation context
* <p>This class is not part of the public API and may be removed or changed at any time</p>
*/
public AsyncClusterBinding(final Cluster cluster, final ReadPreference readPreference, final ReadConcern readConcern,
final OperationContext operationContext) {
public AsyncClusterBinding(final Cluster cluster, final ReadPreference readPreference) {
this.cluster = notNull("cluster", cluster);
this.readPreference = notNull("readPreference", readPreference);
this.readConcern = notNull("readConcern", readConcern);
this.operationContext = notNull("operationContext", operationContext);
}

@Override
Expand All @@ -76,21 +69,18 @@ public ReadPreference getReadPreference() {
}

@Override
public OperationContext getOperationContext() {
return operationContext;
}

@Override
public void getReadConnectionSource(final SingleResultCallback<AsyncConnectionSource> callback) {
getAsyncClusterBindingConnectionSource(new ReadPreferenceServerSelector(readPreference), callback);
public void getReadConnectionSource(final OperationContext operationContext,
final SingleResultCallback<AsyncConnectionSource> callback) {
getAsyncClusterBindingConnectionSource(new ReadPreferenceServerSelector(readPreference), operationContext, callback);
}

@Override
public void getReadConnectionSource(final int minWireVersion, final ReadPreference fallbackReadPreference,
final OperationContext operationContext,
final SingleResultCallback<AsyncConnectionSource> callback) {
// Assume 5.0+ for load-balanced mode
if (cluster.getSettings().getMode() == ClusterConnectionMode.LOAD_BALANCED) {
getReadConnectionSource(callback);
getReadConnectionSource(operationContext, callback);
} else {
ReadPreferenceWithFallbackServerSelector readPreferenceWithFallbackServerSelector
= new ReadPreferenceWithFallbackServerSelector(readPreference, minWireVersion, fallbackReadPreference);
Expand All @@ -106,16 +96,19 @@ public void getReadConnectionSource(final int minWireVersion, final ReadPreferen
}

@Override
public void getWriteConnectionSource(final SingleResultCallback<AsyncConnectionSource> callback) {
getAsyncClusterBindingConnectionSource(new WritableServerSelector(), callback);
public void getWriteConnectionSource(final OperationContext operationContext,
final SingleResultCallback<AsyncConnectionSource> callback) {
getAsyncClusterBindingConnectionSource(new WritableServerSelector(), operationContext, callback);
}

@Override
public void getConnectionSource(final ServerAddress serverAddress, final SingleResultCallback<AsyncConnectionSource> callback) {
getAsyncClusterBindingConnectionSource(new ServerAddressSelector(serverAddress), callback);
public void getConnectionSource(final ServerAddress serverAddress, final OperationContext operationContext,
final SingleResultCallback<AsyncConnectionSource> callback) {
getAsyncClusterBindingConnectionSource(new ServerAddressSelector(serverAddress), operationContext, callback);
}

private void getAsyncClusterBindingConnectionSource(final ServerSelector serverSelector,
final OperationContext operationContext,
final SingleResultCallback<AsyncConnectionSource> callback) {
cluster.selectServerAsync(serverSelector, operationContext, (result, t) -> {
if (t != null) {
Expand All @@ -132,12 +125,12 @@ private final class AsyncClusterBindingConnectionSource extends AbstractReferenc
private final ServerDescription serverDescription;
private final ReadPreference appliedReadPreference;

private AsyncClusterBindingConnectionSource(final Server server, final ServerDescription serverDescription,
final ReadPreference appliedReadPreference) {
private AsyncClusterBindingConnectionSource(final Server server,
final ServerDescription serverDescription,
final ReadPreference appliedReadPreference) {
this.server = server;
this.serverDescription = serverDescription;
this.appliedReadPreference = appliedReadPreference;
operationContext.getTimeoutContext().minRoundTripTimeMS(NANOSECONDS.toMillis(serverDescription.getMinRoundTripTimeNanos()));
AsyncClusterBinding.this.retain();
}

Expand All @@ -146,19 +139,18 @@ public ServerDescription getServerDescription() {
return serverDescription;
}

@Override
public OperationContext getOperationContext() {
return operationContext;
}

@Override
public ReadPreference getReadPreference() {
return appliedReadPreference;
}

@Override
public void getConnection(final SingleResultCallback<AsyncConnection> callback) {
server.getConnectionAsync(operationContext, callback);
public void getConnection(final OperationContext operationContext, final SingleResultCallback<AsyncConnection> callback) {
// The first read in a causally consistent session MUST not send afterClusterTime to the server
// (because the operationTime has not yet been determined). Therefore, we use ReadConcernAwareNoOpSessionContext to
// so that we do not advance clusterTime on ClientSession in given operationContext because it might not be yet set.
ReadConcern readConcern = operationContext.getSessionContext().getReadConcern();
server.getConnectionAsync(operationContext.withSessionContext(new ReadConcernAwareNoOpSessionContext(readConcern)), callback);
}

public AsyncConnectionSource retain() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.mongodb.connection.ServerDescription;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.connection.AsyncConnection;
import com.mongodb.internal.connection.OperationContext;

/**
* A source of connections to a single MongoDB server.
Expand All @@ -42,12 +43,7 @@ public interface AsyncConnectionSource extends BindingContext, ReferenceCounted
*/
ReadPreference getReadPreference();

/**
* Gets a connection from this source.
*
* @param callback the to be passed the connection
*/
void getConnection(SingleResultCallback<AsyncConnection> callback);
void getConnection(OperationContext operationContext, SingleResultCallback<AsyncConnection> callback);

@Override
AsyncConnectionSource retain();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.mongodb.ReadPreference;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.connection.OperationContext;

/**
* An asynchronous factory of connection sources to servers that can be read from and that satisfy the specified read preference.
Expand All @@ -35,7 +36,7 @@ public interface AsyncReadBinding extends BindingContext, ReferenceCounted {
* Returns a connection source to a server that satisfies the read preference with which this instance is configured.
* @param callback the to be passed the connection source
*/
void getReadConnectionSource(SingleResultCallback<AsyncConnectionSource> callback);
void getReadConnectionSource(OperationContext operationContext, SingleResultCallback<AsyncConnectionSource> callback);

/**
* Return a connection source that satisfies the read preference with which this instance is configured, if all connected servers have
Expand All @@ -48,6 +49,7 @@ public interface AsyncReadBinding extends BindingContext, ReferenceCounted {
* @see com.mongodb.internal.operation.AggregateToCollectionOperation
*/
void getReadConnectionSource(int minWireVersion, ReadPreference fallbackReadPreference,
OperationContext operationContext,
SingleResultCallback<AsyncConnectionSource> callback);

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.mongodb.internal.binding;

import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.connection.OperationContext;

/**
* An asynchronous factory of connection sources to servers that can be written to, e.g, a standalone, a mongos, or a replica set primary.
Expand All @@ -30,7 +31,7 @@ public interface AsyncWriteBinding extends BindingContext, ReferenceCounted {
*
* @param callback the to be passed the connection source
*/
void getWriteConnectionSource(SingleResultCallback<AsyncConnectionSource> callback);
void getWriteConnectionSource(OperationContext operationContext, SingleResultCallback<AsyncConnectionSource> callback);

@Override
AsyncWriteBinding retain();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@

package com.mongodb.internal.binding;

import com.mongodb.internal.connection.OperationContext;


/**
* <p>This class is not part of the public API and may be removed or changed at any time</p>
*/
Expand All @@ -29,5 +26,5 @@ public interface BindingContext {
*
* @return the operation context for the binding context.
*/
OperationContext getOperationContext();
// OperationContext getOperationContext();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.mongodb.internal.binding;

import com.mongodb.ServerAddress;
import com.mongodb.internal.connection.OperationContext;

/**
* This interface is not part of the public API and may be removed or changed at any time.
Expand All @@ -27,5 +28,5 @@ public interface ClusterAwareReadWriteBinding extends ReadWriteBinding {
* Returns a connection source to the specified server address.
* @return the connection source
*/
ConnectionSource getConnectionSource(ServerAddress serverAddress);
ConnectionSource getConnectionSource(ServerAddress serverAddress, OperationContext operationContext);
}
Loading