Skip to content

Commit 1864ce8

Browse files
authored
feat: add Streamable HTTP Support to MCP Server (#76)
* feat: add streamable http support to mcp server * chore: update readme * docs: update readme * fix: typo * chore: update sse support flag * fix: test on windows * feat: strengthen protocol compliance * chore: add tracing for client disconnect * fix: stream lifecycle and improve shutdown * fix: build issue * chore: update readme * chore: update dependencies
1 parent c0f4bd5 commit 1864ce8

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+4254
-646
lines changed

Cargo.lock

Lines changed: 114 additions & 84 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

README.md

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,26 @@ By default, it uses the **2025-06-18** version, but earlier versions can be enab
2323

2424

2525

26-
This project currently supports following transports:
27-
- **stdio** (Standard Input/Output)
28-
- **sse** (Server-Sent Events).
29-
26+
This project supports following transports:
27+
- **Stdio** (Standard Input/Output)
28+
- **SSE** (Server-Sent Events).
29+
- **Streamable HTTP**.
3030

3131

3232
🚀 The **rust-mcp-sdk** includes a lightweight [Axum](https://github.com/tokio-rs/axum) based server that handles all core functionality seamlessly. Switching between `stdio` and `sse` is straightforward, requiring minimal code changes. The server is designed to efficiently handle multiple concurrent client connections and offers built-in support for SSL.
3333

34-
**⚠️** **Streamable HTTP** transport and authentication still in progress and not yet available. Project is currently under development and should be used at your own risk.
34+
35+
36+
**MCP Streamable HTTP Support**
37+
- [x] Streamable HTTP Support for MCP Servers
38+
- [x] DNS Rebinding Protection
39+
- [x] Batch Messages
40+
- [x] Streaming & non-streaming JSON responses
41+
- [ ] Streamable HTTP Support for MCP Clients
42+
- [ ] Resumability
43+
- [ ] Authentication / OAuth
44+
45+
**⚠️** Project is currently under development and should be used at your own risk.
3546

3647
## Table of Contents
3748
- [Usage Examples](#usage-examples)

crates/rust-mcp-macros/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ syn = "2.0"
2121
quote = "1.0"
2222
proc-macro2 = "1.0"
2323

24+
2425
[dev-dependencies]
2526
rust-mcp-schema = { workspace = true, default-features = false }
2627

crates/rust-mcp-sdk/README.md

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,26 @@ By default, it uses the **2025-06-18** version, but earlier versions can be enab
2323

2424

2525

26-
This project currently supports following transports:
27-
- **stdio** (Standard Input/Output)
28-
- **sse** (Server-Sent Events).
29-
26+
This project supports following transports:
27+
- **Stdio** (Standard Input/Output)
28+
- **SSE** (Server-Sent Events).
29+
- **Streamable HTTP**.
3030

3131

3232
🚀 The **rust-mcp-sdk** includes a lightweight [Axum](https://github.com/tokio-rs/axum) based server that handles all core functionality seamlessly. Switching between `stdio` and `sse` is straightforward, requiring minimal code changes. The server is designed to efficiently handle multiple concurrent client connections and offers built-in support for SSL.
3333

34-
**⚠️** **Streamable HTTP** transport and authentication still in progress and not yet available. Project is currently under development and should be used at your own risk.
34+
35+
36+
**MCP Streamable HTTP Support**
37+
- [x] Streamable HTTP Support for MCP Servers
38+
- [x] DNS Rebinding Protection
39+
- [x] Batch Messages
40+
- [x] Streaming & non-streaming JSON responses
41+
- [ ] Streamable HTTP Support for MCP Clients
42+
- [ ] Resumability
43+
- [ ] Authentication / OAuth
44+
45+
**⚠️** Project is currently under development and should be used at your own risk.
3546

3647
## Table of Contents
3748
- [Usage Examples](#usage-examples)

crates/rust-mcp-sdk/src/error.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1-
use crate::schema::RpcError;
1+
use crate::schema::{ParseProtocolVersionError, RpcError};
2+
23
use rust_mcp_transport::error::TransportError;
34
use thiserror::Error;
5+
use tokio::task::JoinError;
46

57
#[cfg(feature = "hyper-server")]
68
use crate::hyper_servers::error::TransportServerError;
@@ -16,14 +18,18 @@ pub enum McpSdkError {
1618
#[error("{0}")]
1719
TransportError(#[from] TransportError),
1820
#[error("{0}")]
21+
JoinError(#[from] JoinError),
22+
#[error("{0}")]
1923
AnyError(Box<(dyn std::error::Error + Send + Sync)>),
2024
#[error("{0}")]
2125
SdkError(#[from] crate::schema::schema_utils::SdkError),
2226
#[cfg(feature = "hyper-server")]
2327
#[error("{0}")]
2428
TransportServerError(#[from] TransportServerError),
25-
#[error("Incompatible mcp protocol version: client:{0} server:{1}")]
29+
#[error("Incompatible mcp protocol version: requested:{0} current:{1}")]
2630
IncompatibleProtocolVersion(String, String),
31+
#[error("{0}")]
32+
ParseProtocolVersionError(#[from] ParseProtocolVersionError),
2733
}
2834

2935
impl McpSdkError {

crates/rust-mcp-sdk/src/hyper_servers.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
mod app_state;
22
pub mod error;
3+
pub mod hyper_runtime;
34
pub mod hyper_server;
45
pub mod hyper_server_core;
56
mod middlewares;

crates/rust-mcp-sdk/src/hyper_servers/app_state.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,5 +19,23 @@ pub struct AppState {
1919
pub handler: Arc<dyn McpServerHandler>,
2020
pub ping_interval: Duration,
2121
pub sse_message_endpoint: String,
22+
pub http_streamable_endpoint: String,
2223
pub transport_options: Arc<TransportOptions>,
24+
pub enable_json_response: bool,
25+
/// List of allowed host header values for DNS rebinding protection.
26+
/// If not specified, host validation is disabled.
27+
pub allowed_hosts: Option<Vec<String>>,
28+
/// List of allowed origin header values for DNS rebinding protection.
29+
/// If not specified, origin validation is disabled.
30+
pub allowed_origins: Option<Vec<String>>,
31+
/// Enable DNS rebinding protection (requires allowedHosts and/or allowedOrigins to be configured).
32+
/// Default is false for backwards compatibility.
33+
pub dns_rebinding_protection: bool,
34+
}
35+
36+
impl AppState {
37+
pub fn needs_dns_protection(&self) -> bool {
38+
self.dns_rebinding_protection
39+
&& (self.allowed_hosts.is_some() || self.allowed_origins.is_some())
40+
}
2341
}

crates/rust-mcp-sdk/src/hyper_servers/error.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ pub enum TransportServerError {
2121
InvalidServerOptions(String),
2222
#[error("{0}")]
2323
SslCertError(String),
24+
#[error("{0}")]
25+
TransportError(String),
2426
}
2527

2628
impl IntoResponse for TransportServerError {
Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
use std::{sync::Arc, time::Duration};
2+
3+
use crate::{
4+
mcp_server::HyperServer,
5+
schema::{
6+
schema_utils::{NotificationFromServer, RequestFromServer, ResultFromClient},
7+
CreateMessageRequestParams, CreateMessageResult, LoggingMessageNotificationParams,
8+
PromptListChangedNotificationParams, ResourceListChangedNotificationParams,
9+
ResourceUpdatedNotificationParams, ToolListChangedNotificationParams,
10+
},
11+
McpServer,
12+
};
13+
14+
use axum_server::Handle;
15+
use rust_mcp_transport::SessionId;
16+
use tokio::{sync::Mutex, task::JoinHandle};
17+
18+
use crate::{
19+
error::SdkResult,
20+
hyper_servers::app_state::AppState,
21+
mcp_server::{
22+
error::{TransportServerError, TransportServerResult},
23+
ServerRuntime,
24+
},
25+
};
26+
27+
pub struct HyperRuntime {
28+
pub(crate) state: Arc<AppState>,
29+
pub(crate) server_task: JoinHandle<Result<(), TransportServerError>>,
30+
pub(crate) server_handle: Handle,
31+
}
32+
33+
impl HyperRuntime {
34+
pub async fn create(server: HyperServer) -> SdkResult<Self> {
35+
let addr = server.options.resolve_server_address().await?;
36+
let state = server.state();
37+
38+
let server_handle = server.server_handle();
39+
40+
let server_task = tokio::spawn(async move {
41+
#[cfg(feature = "ssl")]
42+
if server.options.enable_ssl {
43+
server.start_ssl(addr).await
44+
} else {
45+
server.start_http(addr).await
46+
}
47+
48+
#[cfg(not(feature = "ssl"))]
49+
if server.options.enable_ssl {
50+
panic!("SSL requested but the 'ssl' feature is not enabled");
51+
} else {
52+
server.start_http(addr).await
53+
}
54+
});
55+
56+
Ok(Self {
57+
state,
58+
server_task,
59+
server_handle,
60+
})
61+
}
62+
63+
pub fn graceful_shutdown(&self, timeout: Option<Duration>) {
64+
self.server_handle.graceful_shutdown(timeout);
65+
}
66+
67+
pub async fn await_server(self) -> SdkResult<()> {
68+
let result = self.server_task.await?;
69+
result.map_err(|err| err.into())
70+
}
71+
72+
pub async fn runtime_by_session(
73+
&self,
74+
session_id: &SessionId,
75+
) -> TransportServerResult<Arc<Mutex<Arc<ServerRuntime>>>> {
76+
self.state.session_store.get(session_id).await.ok_or(
77+
TransportServerError::SessionIdInvalid(session_id.to_string()),
78+
)
79+
}
80+
81+
pub async fn send_request(
82+
&self,
83+
session_id: &SessionId,
84+
request: RequestFromServer,
85+
timeout: Option<Duration>,
86+
) -> SdkResult<ResultFromClient> {
87+
let runtime = self.runtime_by_session(session_id).await?;
88+
let runtime = runtime.lock().await.to_owned();
89+
runtime.request(request, timeout).await
90+
}
91+
92+
pub async fn send_notification(
93+
&self,
94+
session_id: &SessionId,
95+
notification: NotificationFromServer,
96+
) -> SdkResult<()> {
97+
let runtime = self.runtime_by_session(session_id).await?;
98+
let runtime = runtime.lock().await.to_owned();
99+
runtime.send_notification(notification).await
100+
}
101+
102+
pub async fn send_logging_message(
103+
&self,
104+
session_id: &SessionId,
105+
params: LoggingMessageNotificationParams,
106+
) -> SdkResult<()> {
107+
let runtime = self.runtime_by_session(session_id).await?;
108+
let runtime = runtime.lock().await.to_owned();
109+
runtime.send_logging_message(params).await
110+
}
111+
112+
/// An optional notification from the server to the client, informing it that
113+
/// the list of prompts it offers has changed.
114+
/// This may be issued by servers without any previous subscription from the client.
115+
pub async fn send_prompt_list_changed(
116+
&self,
117+
session_id: &SessionId,
118+
params: Option<PromptListChangedNotificationParams>,
119+
) -> SdkResult<()> {
120+
let runtime = self.runtime_by_session(session_id).await?;
121+
let runtime = runtime.lock().await.to_owned();
122+
runtime.send_prompt_list_changed(params).await
123+
}
124+
125+
/// An optional notification from the server to the client,
126+
/// informing it that the list of resources it can read from has changed.
127+
/// This may be issued by servers without any previous subscription from the client.
128+
pub async fn send_resource_list_changed(
129+
&self,
130+
session_id: &SessionId,
131+
params: Option<ResourceListChangedNotificationParams>,
132+
) -> SdkResult<()> {
133+
let runtime = self.runtime_by_session(session_id).await?;
134+
let runtime = runtime.lock().await.to_owned();
135+
runtime.send_resource_list_changed(params).await
136+
}
137+
138+
/// A notification from the server to the client, informing it that
139+
/// a resource has changed and may need to be read again.
140+
/// This should only be sent if the client previously sent a resources/subscribe request.
141+
pub async fn send_resource_updated(
142+
&self,
143+
session_id: &SessionId,
144+
params: ResourceUpdatedNotificationParams,
145+
) -> SdkResult<()> {
146+
let runtime = self.runtime_by_session(session_id).await?;
147+
let runtime = runtime.lock().await.to_owned();
148+
runtime.send_resource_updated(params).await
149+
}
150+
151+
/// An optional notification from the server to the client, informing it that
152+
/// the list of tools it offers has changed.
153+
/// This may be issued by servers without any previous subscription from the client.
154+
pub async fn send_tool_list_changed(
155+
&self,
156+
session_id: &SessionId,
157+
params: Option<ToolListChangedNotificationParams>,
158+
) -> SdkResult<()> {
159+
let runtime = self.runtime_by_session(session_id).await?;
160+
let runtime = runtime.lock().await.to_owned();
161+
runtime.send_tool_list_changed(params).await
162+
}
163+
164+
/// A ping request to check that the other party is still alive.
165+
/// The receiver must promptly respond, or else may be disconnected.
166+
///
167+
/// This function creates a `PingRequest` with no specific parameters, sends the request and awaits the response
168+
/// Once the response is received, it attempts to convert it into the expected
169+
/// result type.
170+
///
171+
/// # Returns
172+
/// A `SdkResult` containing the `rust_mcp_schema::Result` if the request is successful.
173+
/// If the request or conversion fails, an error is returned.
174+
pub async fn ping(
175+
&self,
176+
session_id: &SessionId,
177+
timeout: Option<Duration>,
178+
) -> SdkResult<crate::schema::Result> {
179+
let runtime = self.runtime_by_session(session_id).await?;
180+
let runtime = runtime.lock().await.to_owned();
181+
runtime.ping(timeout).await
182+
}
183+
184+
/// A request from the server to sample an LLM via the client.
185+
/// The client has full discretion over which model to select.
186+
/// The client should also inform the user before beginning sampling,
187+
/// to allow them to inspect the request (human in the loop)
188+
/// and decide whether to approve it.
189+
pub async fn create_message(
190+
&self,
191+
session_id: &SessionId,
192+
params: CreateMessageRequestParams,
193+
) -> SdkResult<CreateMessageResult> {
194+
let runtime = self.runtime_by_session(session_id).await?;
195+
let runtime = runtime.lock().await.to_owned();
196+
runtime.create_message(params).await
197+
}
198+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
1+
pub(crate) mod protect_dns_rebinding;
12
pub(crate) mod session_id_gen;

0 commit comments

Comments
 (0)