[MCP Server] Support Streamable HTTP and deprecate SSE in MCP server #4162
plugin/src/main/java/org/opensearch/ml/plugin/MachineLearningPlugin.java
The failing ITs are caused by the use of deprecated Bedrock models.
I will raise another PR to fix them.
import lombok.extern.log4j.Log4j2;

@Log4j2
public class RestMcpStatelessStreamingAction extends BaseRestHandler {
What does stateless mean? Do we also have or plan to build a stateful action?
The MCP protocol also supports Streamable HTTP in a stateful manner, but that would require streaming to be enabled.
We have no plans for that yet, but naming this endpoint "stateless" keeps the door open for potential future use cases. It also makes the underlying approach clear to readers and keeps it consistent with the MCP Java SDK.
For future extension, should we create a new API or extend the current one? If we need to extend the current one, the "stateless" name does not seem reasonable.
From https://modelcontextprotocol.io/specification/2025-03-26/basic/transports:
"The server MUST provide a single HTTP endpoint path (hereafter referred to as the MCP endpoint) that supports both POST and GET methods. For example, this could be a URL like https://example.com/mcp."
Does that mean we should use one HTTP endpoint for both stateless and stateful?
Renamed the Rest and Transport Actions to remove "stateless"
return channel -> {
    try {
        if (request.content() == null) {
            sendErrorResponse(channel, null, -32700, "Parse error: empty body");
What does the magic number -32700 mean? Could we define these JSON-RPC error codes as constants?
These are defined by the JSON-RPC spec, which the MCP protocol uses. The numbers indicate different errors to the client.
Used constants for JSON-RPC error codes
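As a rough illustration of what such constants could look like, here is a minimal sketch. The numeric values are the standard pre-defined error codes from the JSON-RPC 2.0 specification; the class name `JsonRpcErrorCodes` and the `describe` helper are hypothetical and not taken from this PR.

```java
// Sketch only: standard JSON-RPC 2.0 error codes as named constants.
// The class and method names are illustrative, not the ones used in the PR.
final class JsonRpcErrorCodes {
    static final int PARSE_ERROR = -32700;      // invalid JSON was received
    static final int INVALID_REQUEST = -32600;  // JSON is not a valid Request object
    static final int METHOD_NOT_FOUND = -32601; // method does not exist
    static final int INVALID_PARAMS = -32602;   // invalid method parameters
    static final int INTERNAL_ERROR = -32603;   // internal JSON-RPC error

    private JsonRpcErrorCodes() {}

    // Maps a code to the short message used by the JSON-RPC 2.0 specification.
    static String describe(int code) {
        switch (code) {
            case PARSE_ERROR: return "Parse error";
            case INVALID_REQUEST: return "Invalid Request";
            case METHOD_NOT_FOUND: return "Method not found";
            case INVALID_PARAMS: return "Invalid params";
            case INTERNAL_ERROR: return "Internal error";
            default: return "Unknown error";
        }
    }

    public static void main(String[] args) {
        System.out.println(PARSE_ERROR + ": " + describe(PARSE_ERROR));
    }
}
```

With constants like these, the call in the snippet above would read `sendErrorResponse(channel, null, JsonRpcErrorCodes.PARSE_ERROR, "Parse error: empty body")`.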
.async(mcpStatelessServerTransportProvider)
.serverInfo("OpenSearch-MCP-Stateless-Server", "0.1.0")
.capabilities(serverCapabilities)
.instructions("OpenSearch MCP Stateless Server - provides access to ML tools without sessions")
"OpenSearch MCP Stateless Server": this name looks confusing. Do we have another type of server that is stateful?
The description here does not affect the functionality. But I believe it would not hurt to have an explicit mention of the server being stateless.
Can the client side see or retrieve the instructions and serverInfo?
Yes
If they can see this information, let's design it carefully.
Will remove "stateless".
Updated the server desc to remove "stateless"
OpenSearchMcpStatelessServerTransportProvider transportProvider = McpStatelessServerHolder
    .getMcpStatelessServerTransportProvider();
if (transportProvider == null || !transportProvider.isHandlerReady()) {
    log.error("MCP transport provider not ready - server may not be properly initialized");
Is this an error? Should it be changed to warn level?
Yes, this is an error and we are throwing an exception in the next lines of the code.
I see McpStatelessServerHolder.java line 72 uses a warn-level log. Let's keep this consistent.
Removed the warning logging since we do not need to check isHandlerReady anymore
// Requests: capture id for any downstream error mapping
final Object id = (msg instanceof McpSchema.JSONRPCRequest) ? ((McpSchema.JSONRPCRequest) msg).id() : null;

transportProvider.handleRequest(requestBody).subscribe(response -> {
Line 86 already called deserializeJsonRpcMessage, so why not pass msg to the handleRequest method?
Good point, I will refactor the method to accept the McpSchema.JSONRPCMessage instead of the whole requestBody
We need a transport action to control permission on API level.
Added a transport Action
Signed-off-by: rithin-pullela-aws <[email protected]>
Codecov Report
@@            Coverage Diff             @@
##              main    #4162     +/-   ##
============================================
+ Coverage     82.02%   82.17%   +0.15%
+ Complexity     9195     9186       -9
============================================
  Files           789      788       -1
  Lines         39541    39437     -104
  Branches       4387     4400      +13
============================================
- Hits          32432    32407      -25
+ Misses         5219     5139      -80
- Partials       1890     1891       +1
Signed-off-by: Rithin Pullela <[email protected]>
Signed-off-by: rithin-pullela-aws <[email protected]>
    .prompts(false) // We don't support prompts
    .build();

log.info("Building MCP server without pre-loaded tools (dynamic loading)...");
What does "pre-loaded tools" mean?
        error -> log.error("Initial tool loading failed", error)
    )
);
initialized = true;
I think we should move this line inside the success branch of autoLoadAllMcpTools. If loading the MCP tools fails, we should not set initialized to true, right?
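The suggestion above can be sketched as follows: the flag is flipped only from the success callback, so a failed load leaves the holder uninitialized. This is a simplified stand-in, not the PR's code; `ToolLoaderSketch` and `AsyncLoader` are hypothetical names, and the real loader is asynchronous and uses OpenSearch listeners.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Sketch only: set the "initialized" flag in the success branch of the load.
final class ToolLoaderSketch {
    private final AtomicBoolean initialized = new AtomicBoolean(false);

    // Hypothetical stand-in for the asynchronous tool-loading call.
    interface AsyncLoader {
        void load(Runnable onSuccess, Consumer<Exception> onFailure);
    }

    void start(AsyncLoader loader) {
        loader.load(
            () -> initialized.set(true), // only reached when loading succeeded
            error -> System.err.println("Initial tool loading failed: " + error.getMessage())
        );
    }

    boolean isInitialized() {
        return initialized.get();
    }

    public static void main(String[] args) {
        ToolLoaderSketch ok = new ToolLoaderSketch();
        ok.start((onSuccess, onFailure) -> onSuccess.run());
        System.out.println("after success: " + ok.isInitialized());

        ToolLoaderSketch failed = new ToolLoaderSketch();
        failed.start((onSuccess, onFailure) -> onFailure.accept(new RuntimeException("index missing")));
        System.out.println("after failure: " + failed.isInitialized());
    }
}
```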
    .wrap(r -> { log.debug("Auto reload mcp tools schedule job run successfully!"); }, e -> {
        log.error(e.getMessage(), e);
    });
threadPool
Use a dedicated thread pool? Refer to MLExecuteTaskRunner:
threadPool.executor(EXECUTE_THREAD_POOL)
toolNames.forEach(toolName -> queryBuilder.should(QueryBuilders.matchQuery("name", toolName)));
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(queryBuilder);
searchRequest.source(searchSourceBuilder);
Add a size?
searchRequest.source().size(MAX_TOOL_NUMBER);
    McpToolRegisterInput mcpTool = parseMcpTool(x.getSourceAsString());
    mcpTools.add(mcpTool);
} catch (IOException e) {
    listener.onFailure(e);
If parsing fails for multiple MCP tools, this will call listener.onFailure(e) multiple times. Is that OK? I remember that triggering some exception. Have you tested this?
Should we use a List to track all exceptions, then send them all back with a single listener.onFailure(...) call?
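One way to read this suggestion is sketched below: collect every parse failure, then notify the listener exactly once, attaching the individual failures as suppressed exceptions. The `Listener` and `Parser` interfaces are hypothetical stand-ins for illustration, not OpenSearch's `ActionListener` or the PR's `parseMcpTool`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch only: aggregate parse failures and call the listener a single time.
final class ParseAllSketch {
    // Hypothetical single-shot callback, loosely modelled on an action listener.
    interface Listener<T> {
        void onResponse(T result);
        void onFailure(Exception e);
    }

    // Hypothetical parser that may throw for a malformed source.
    interface Parser<T> {
        T parse(String source) throws Exception;
    }

    static <T> void parseAll(List<String> sources, Parser<T> parser, Listener<List<T>> listener) {
        List<T> parsed = new ArrayList<>();
        List<Exception> failures = new ArrayList<>();
        for (String source : sources) {
            try {
                parsed.add(parser.parse(source));
            } catch (Exception e) {
                failures.add(e); // remember the failure, keep going
            }
        }
        if (failures.isEmpty()) {
            listener.onResponse(parsed);
            return;
        }
        Exception combined = new IllegalStateException(
            failures.size() + " of " + sources.size() + " tools failed to parse");
        failures.forEach(combined::addSuppressed);
        listener.onFailure(combined); // exactly one failure notification
    }

    public static void main(String[] args) {
        ParseAllSketch.<Integer>parseAll(Arrays.asList("1", "x", "y"), Integer::parseInt, new Listener<List<Integer>>() {
            public void onResponse(List<Integer> result) { System.out.println("parsed " + result); }
            public void onFailure(Exception e) {
                System.out.println(e.getMessage() + ", suppressed: " + e.getSuppressed().length);
            }
        });
    }
}
```

Whether to fail the whole batch or return the tools that did parse is a design choice this sketch does not settle; it fails the batch, matching the wording of the comment.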
public void searchToolsWithPrimaryTermAndSeqNo(List<String> toolNames, ActionListener<SearchResponse> listener) {
    SearchRequest searchRequest = buildSearchRequest(toolNames);
    searchRequest.source().seqNoAndPrimaryTerm(true);
Add a size?
searchRequest.source().size(MAX_TOOL_NUMBER);
    .getMcpAsyncServerInstance()
// Use putIfAbsent to make check-and-act atomic
Long previousVersion = McpStatelessServerHolder.IN_MEMORY_MCP_TOOLS.putIfAbsent(tool.getName(), tool.getVersion());
if (previousVersion == null) {
What if previousVersion != null? I think we should delete the old version and add the new version, right?
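A minimal sketch of the behavior asked about here, assuming tool versions only increase: `ConcurrentHashMap.compute` keeps the check-and-act atomic while also handling the case where an older version is already present. `ToolVersionRegistrySketch` and its `Action` enum are hypothetical; in the real code the caller would remove the old tool from the MCP server and add the new one when the result is `REPLACED`.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Sketch only: atomically add a tool version or replace an older one.
final class ToolVersionRegistrySketch {
    enum Action { ADDED, REPLACED, UNCHANGED }

    private final ConcurrentMap<String, Long> versions = new ConcurrentHashMap<>();

    Action register(String name, long version) {
        // Single-element array so the lambda can report what it decided.
        Action[] outcome = { Action.UNCHANGED };
        versions.compute(name, (key, current) -> {
            if (current == null) {
                outcome[0] = Action.ADDED;
                return version;
            }
            if (version > current) { // assumption: a larger number means a newer tool
                outcome[0] = Action.REPLACED;
                return version;
            }
            return current; // same or older version: keep what we have
        });
        return outcome[0];
    }

    Long currentVersion(String name) {
        return versions.get(name);
    }

    public static void main(String[] args) {
        ToolVersionRegistrySketch registry = new ToolVersionRegistrySketch();
        System.out.println(registry.register("example_tool", 1L));
        System.out.println(registry.register("example_tool", 1L));
        System.out.println(registry.register("example_tool", 2L));
    }
}
```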
The current design starts only one MCP server inside the OpenSearch cluster, with one custom set of tools. That is not flexible if multiple users want different MCP servers with different sets of tools. No need to change the design in this PR, but here is a draft idea for supporting multiple MCP servers:
Register MCP server
Sample response
Initialize connection
Description
This PR deprecates the SSE server and provides a Streamable HTTP server instead.
The new endpoint:
http://localhost:9200/_plugins/_ml/mcp/stream
0.12.1 version
NOTE: The MCP client (connectors) for the new protocol is not implemented yet. It will be added in a separate PR to avoid cluttering the current code.
Testing:
Enable MCP Server:
Register a Tool:
Mock Client behavior via Rest Calls:
1. Initialize Connection:
Expected response:
2. Initialization Complete Notification
Expected response:
3. List Available Tools
Expected response:
4. Call a Tool
Expected Response:
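The four steps above can be sketched as JSON-RPC 2.0 requests posted to the new endpoint. The method names (`initialize`, `notifications/initialized`, `tools/list`, `tools/call`) come from the MCP specification; the protocol version string, client name, tool name, and arguments below are placeholder assumptions, and the sketch only builds the requests without sending them.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Sketch only: builds the four MCP requests; nothing is sent over the network.
final class McpRequestSketch {
    static final String ENDPOINT = "http://localhost:9200/_plugins/_ml/mcp/stream";

    // Step 1: initialize (protocolVersion and clientInfo values are assumptions).
    static final String INITIALIZE =
        "{\"jsonrpc\":\"2.0\",\"id\":1,\"method\":\"initialize\",\"params\":{"
            + "\"protocolVersion\":\"2025-03-26\",\"capabilities\":{},"
            + "\"clientInfo\":{\"name\":\"example-client\",\"version\":\"0.0.1\"}}}";

    // Step 2: notification, so it carries no id.
    static final String INITIALIZED = "{\"jsonrpc\":\"2.0\",\"method\":\"notifications/initialized\"}";

    // Step 3: list the registered tools.
    static final String LIST_TOOLS = "{\"jsonrpc\":\"2.0\",\"id\":2,\"method\":\"tools/list\"}";

    // Step 4: call a tool; toolName and argumentsJson are supplied by the caller.
    static String callTool(int id, String toolName, String argumentsJson) {
        return "{\"jsonrpc\":\"2.0\",\"id\":" + id + ",\"method\":\"tools/call\",\"params\":{"
            + "\"name\":\"" + toolName + "\",\"arguments\":" + argumentsJson + "}}";
    }

    static HttpRequest post(String body) {
        return HttpRequest.newBuilder(URI.create(ENDPOINT))
            .header("Content-Type", "application/json")
            .header("Accept", "application/json, text/event-stream")
            .POST(HttpRequest.BodyPublishers.ofString(body))
            .build();
    }

    public static void main(String[] args) {
        String[] bodies = { INITIALIZE, INITIALIZED, LIST_TOOLS, callTool(3, "ExampleTool", "{}") };
        for (String body : bodies) {
            HttpRequest request = post(body);
            System.out.println(request.method() + " " + request.uri() + " " + body);
        }
    }
}
```

To actually send a request against a running cluster, pass it to `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`; any authentication the cluster needs is not covered here.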
Other ways of Testing:
You can also test with various MCP clients
1. Fast MCP client:
2. Postman MCP client:
In the latest versions of Postman, we can test an MCP server directly by creating an MCP request.
Related Issues
Resolves #3813 since we don't need the extra variable in streamable_http protocol.
Check List
--signoff
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following the Developer Certificate of Origin and signing off your commits, please check here.