Skip to content

chore: Add AsyncCloseable interface. #95

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -370,11 +370,6 @@ public Mono<Void> closeGracefully() {
return Mono.fromRunnable(sink::complete);
}

@Override
public void close() {
sink.complete();
}

}

public static Builder builder() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ void testCreateMessageWithoutSamplingCapabilities(String clientType) {
.hasMessage("Client must be configured with sampling capabilities");
}
}
server.close();
server.closeGracefully().block();
}

@ParameterizedTest(name = "{0} : {displayName} ")
Expand Down Expand Up @@ -189,7 +189,7 @@ void testCreateMessageSuccess(String clientType) {
assertThat(result.stopReason()).isEqualTo(CreateMessageResult.StopReason.STOP_SEQUENCE);
});
}
mcpServer.closeGracefully().block();
mcpServer.closeGracefully();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This won't close the server since the Mono is not subscribed to. As this is an async server it is required to subscribe. I think the same issue has been introduced in multiple other places.

  1. Please review all the code changes for this error.
  2. Please do not force-push as it makes reviewing difficult.

Thanks!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. We should always return the same instance.
  2. We should do the close/subscribe when invoking.

But , these are not consistent across the codebase.

In akka/pekko, there is a whenTerminate, which will return the same instance too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wdyt about this @chemicL , I'm a little busy at work, but I can update this pr with your input.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure - is this in reference to my comment above? I only wanted to indicate that the mcpServer instance is an McpAsyncServer, which means closeGracefully() returns a Mono, which needs to be subscribed to in order to trigger closing, otherwise nothing happens.

That comment does not touch upon returning the same instance of Mono, just the fact that it won't close anything.

For returning the same instance - I am not sure whether we need that - do you think we do? I imagine in most cases the double closing should be idempotent so perhaps there is no need. If we do have issues with double closing, let's open a new issue to address this and think of solutions, WDYT?

}

@ParameterizedTest(name = "{0} : {displayName} ")
Expand Down Expand Up @@ -267,7 +267,7 @@ void testCreateMessageWithRequestTimeoutSuccess(String clientType) throws Interr
});
}

mcpServer.closeGracefully().block();
mcpServer.closeGracefully();
}

@ParameterizedTest(name = "{0} : {displayName} ")
Expand Down Expand Up @@ -326,7 +326,7 @@ void testCreateMessageWithRequestTimeoutFail(String clientType) throws Interrupt

}

mcpServer.closeGracefully().block();
mcpServer.closeGracefully();
}

// ---------------------------------------
Expand Down Expand Up @@ -376,7 +376,7 @@ void testRootsSuccess(String clientType) {
});
}

mcpServer.close();
mcpServer.closeGracefully();
}

@ParameterizedTest(name = "{0} : {displayName} ")
Expand Down Expand Up @@ -410,7 +410,7 @@ void testRootsWithoutCapability(String clientType) {
}
}

mcpServer.close();
mcpServer.closeGracefully();
}

@ParameterizedTest(name = "{0} : {displayName} ")
Expand All @@ -437,7 +437,7 @@ void testRootsNotificationWithEmptyRootsList(String clientType) {
});
}

mcpServer.close();
mcpServer.closeGracefully();
}

@ParameterizedTest(name = "{0} : {displayName} ")
Expand Down Expand Up @@ -471,7 +471,7 @@ void testRootsWithMultipleHandlers(String clientType) {
});
}

mcpServer.close();
mcpServer.closeGracefully();
}

@ParameterizedTest(name = "{0} : {displayName} ")
Expand Down Expand Up @@ -502,7 +502,7 @@ void testRootsServerCloseWithActiveSubscription(String clientType) {
});
}

mcpServer.close();
mcpServer.closeGracefully();
}

// ---------------------------------------
Expand Down Expand Up @@ -554,7 +554,7 @@ void testToolCallSuccess(String clientType) {
assertThat(response).isEqualTo(callResponse);
}

mcpServer.close();
mcpServer.closeGracefully();
}

@ParameterizedTest(name = "{0} : {displayName} ")
Expand Down Expand Up @@ -626,7 +626,7 @@ void testToolListChangeHandlingSuccess(String clientType) {
});
}

mcpServer.close();
mcpServer.closeGracefully();
}

@ParameterizedTest(name = "{0} : {displayName} ")
Expand All @@ -642,7 +642,7 @@ void testInitialize(String clientType) {
assertThat(initResult).isNotNull();
}

mcpServer.close();
mcpServer.closeGracefully();
}

// ---------------------------------------
Expand Down Expand Up @@ -750,7 +750,7 @@ void testLoggingNotification(String clientType) {
assertThat(notificationMap.get("Another error message").data()).isEqualTo("Another error message");
});
}
mcpServer.close();
mcpServer.closeGracefully();
}

// ---------------------------------------
Expand Down Expand Up @@ -799,7 +799,7 @@ void testCompletionShouldReturnExpectedSuggestions(String clientType) {
assertThat(samplingRequest.get().ref().type()).isEqualTo("ref/prompt");
}

mcpServer.close();
mcpServer.closeGracefully();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -401,20 +401,6 @@ public Mono<Void> closeGracefully() {
});
}

/**
* Closes the transport immediately.
*/
@Override
public void close() {
try {
sseBuilder.complete();
logger.debug("Successfully completed SSE builder for session {}", sessionId);
}
catch (Exception e) {
logger.warn("Failed to complete SSE builder for session {}: {}", sessionId, e.getMessage());
}
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ void testCreateMessageWithoutSamplingCapabilities() {
.hasMessage("Client must be configured with sampling capabilities");
}
}
server.close();
server.closeGracefully().block();
}

@Test
Expand Down Expand Up @@ -211,7 +211,7 @@ void testCreateMessageSuccess() {

assertThat(response).isNotNull().isEqualTo(callResponse);
}
mcpServer.close();
mcpServer.closeGracefully();
}

@Test
Expand Down Expand Up @@ -283,7 +283,7 @@ void testCreateMessageWithRequestTimeoutSuccess() throws InterruptedException {
assertThat(response).isEqualTo(callResponse);

mcpClient.close();
mcpServer.close();
mcpServer.closeGracefully();
}

@Test
Expand Down Expand Up @@ -354,7 +354,7 @@ void testCreateMessageWithRequestTimeoutFail() throws InterruptedException {
}).withMessageContaining("Timeout");

mcpClient.close();
mcpServer.close();
mcpServer.closeGracefully();
}

// ---------------------------------------
Expand Down Expand Up @@ -401,7 +401,7 @@ void testRootsSuccess() {
});
}

mcpServer.close();
mcpServer.closeGracefully();
}

@Test
Expand Down Expand Up @@ -434,7 +434,7 @@ void testRootsWithoutCapability() {
}
}

mcpServer.close();
mcpServer.closeGracefully();
}

@Test
Expand All @@ -459,7 +459,7 @@ void testRootsNotificationWithEmptyRootsList() {
});
}

mcpServer.close();
mcpServer.closeGracefully();
}

@Test
Expand Down Expand Up @@ -488,7 +488,7 @@ void testRootsWithMultipleHandlers() {
});
}

mcpServer.close();
mcpServer.closeGracefully();
}

@Test
Expand All @@ -515,7 +515,7 @@ void testRootsServerCloseWithActiveSubscription() {
});
}

mcpServer.close();
mcpServer.closeGracefully();
}

// ---------------------------------------
Expand Down Expand Up @@ -563,7 +563,7 @@ void testToolCallSuccess() {
assertThat(response).isNotNull().isEqualTo(callResponse);
}

mcpServer.close();
mcpServer.closeGracefully();
}

@Test
Expand Down Expand Up @@ -632,7 +632,7 @@ void testToolListChangeHandlingSuccess() {
});
}

mcpServer.close();
mcpServer.closeGracefully();
}

@Test
Expand All @@ -646,7 +646,7 @@ void testInitialize() {
assertThat(initResult).isNotNull();
}

mcpServer.close();
mcpServer.closeGracefully();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ void testGracefulShutdown() {
void testImmediateClose() {
var mcpAsyncServer = McpServer.async(createMcpTransportProvider()).serverInfo("test-server", "1.0.0").build();

assertThatCode(() -> mcpAsyncServer.close()).doesNotThrowAnyException();
assertThatCode(() -> mcpAsyncServer.closeGracefully().block()).doesNotThrowAnyException();
}

// ---------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ void testGracefulShutdown() {
void testImmediateClose() {
var mcpSyncServer = McpServer.sync(createMcpTransportProvider()).serverInfo("test-server", "1.0.0").build();

assertThatCode(() -> mcpSyncServer.close()).doesNotThrowAnyException();
assertThatCode(() -> mcpSyncServer.closeGracefully()).doesNotThrowAnyException();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,13 +287,6 @@ public McpSchema.Implementation getClientInfo() {
return this.clientInfo;
}

/**
* Closes the client connection immediately.
*/
public void close() {
this.mcpSession.close();
}

/**
* Gracefully closes the client connection.
* @return A Mono that completes when the connection is closed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public McpSchema.Implementation getClientInfo() {

@Override
public void close() {
this.delegate.close();
this.closeGracefully();
}

public boolean closeGracefully() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,6 @@ public Mono<Void> closeGracefully() {
return this.delegate.closeGracefully();
}

/**
* Close the server immediately.
*/
public void close() {
this.delegate.close();
}

// ---------------------------------------
// Tool Management
// ---------------------------------------
Expand Down Expand Up @@ -390,11 +383,6 @@ public Mono<Void> closeGracefully() {
return this.mcpTransportProvider.closeGracefully();
}

@Override
public void close() {
this.mcpTransportProvider.close();
}

private McpServerSession.NotificationHandler asyncRootsListChangedNotificationHandler(
List<BiFunction<McpAsyncServerExchange, List<McpSchema.Root>, Mono<Void>>> rootsChangeConsumers) {
return (exchange, params) -> exchange.listRoots()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,13 +170,6 @@ public void closeGracefully() {
this.asyncServer.closeGracefully().block();
}

/**
* Close the server immediately.
*/
public void close() {
this.asyncServer.close();
}

/**
* Get the underlying async server instance.
* @return The wrapped async server
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -433,21 +433,6 @@ public Mono<Void> closeGracefully() {
});
}

/**
* Closes the transport immediately.
*/
@Override
public void close() {
try {
sessions.remove(sessionId);
asyncContext.complete();
logger.debug("Successfully completed async context for session {}", sessionId);
}
catch (Exception e) {
logger.warn("Failed to complete async context for session {}: {}", sessionId, e.getMessage());
}
}

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,12 +173,6 @@ public Mono<Void> closeGracefully() {
});
}

@Override
public void close() {
isClosing.set(true);
logger.debug("Session transport closed");
}

private void initProcessing() {
handleIncomingMessages();
startInboundProcessing();
Expand Down Expand Up @@ -239,7 +233,7 @@ private void startInboundProcessing() {
finally {
isClosing.set(true);
if (session != null) {
session.close();
session.closeGracefully().block();
}
inboundSink.tryEmitComplete();
}
Expand Down
Loading