Skip to content
15 changes: 14 additions & 1 deletion sentry-core/src/clientoptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,12 @@ pub struct ClientOptions {
pub session_mode: SessionMode,
/// The user agent that should be reported.
pub user_agent: Cow<'static, str>,
/// The capacity of the transport channel.
///
/// This controls how many envelopes can be queued before the transport
/// starts dropping them. In high-throughput scenarios, increasing this
/// value can reduce the chance of losing events. Defaults to 30.
pub transport_channel_capacity: usize,
Comment thread
mvanhorn marked this conversation as resolved.
Outdated
}

impl ClientOptions {
Expand Down Expand Up @@ -278,7 +284,13 @@ impl fmt::Debug for ClientOptions {
.field("enable_logs", &self.enable_logs)
.field("before_send_log", &before_send_log);

debug_struct.field("user_agent", &self.user_agent).finish()
debug_struct
.field("user_agent", &self.user_agent)
.field(
"transport_channel_capacity",
&self.transport_channel_capacity,
)
.finish()
}
}

Expand Down Expand Up @@ -313,6 +325,7 @@ impl Default for ClientOptions {
session_mode: SessionMode::Application,
user_agent: Cow::Borrowed(USER_AGENT),
max_request_body_size: MaxRequestBodySize::Medium,
transport_channel_capacity: 30,
Comment thread
cursor[bot] marked this conversation as resolved.
Outdated
#[cfg(feature = "logs")]
enable_logs: true,
#[cfg(feature = "logs")]
Expand Down
171 changes: 88 additions & 83 deletions sentry/src/transports/curl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,101 +36,106 @@ impl CurlHttpTransport {
let url = dsn.envelope_api_url().to_string();
let scheme = dsn.scheme();
let accept_invalid_certs = options.accept_invalid_certs;
let channel_capacity = options.transport_channel_capacity;

let mut handle = client;
let thread = TransportThread::new(move |envelope, rl| {
handle.reset();
handle.url(&url).unwrap();
handle.custom_request("POST").unwrap();

if accept_invalid_certs {
handle.ssl_verify_host(false).unwrap();
handle.ssl_verify_peer(false).unwrap();
}

match (scheme, &http_proxy, &https_proxy) {
(Scheme::Https, _, Some(proxy)) => {
if let Err(err) = handle.proxy(proxy) {
sentry_debug!("invalid proxy: {:?}", err);
}
let thread = TransportThread::new(
move |envelope, rl| {
handle.reset();
handle.url(&url).unwrap();
handle.custom_request("POST").unwrap();

if accept_invalid_certs {
handle.ssl_verify_host(false).unwrap();
handle.ssl_verify_peer(false).unwrap();
}
(_, Some(proxy), _) => {
if let Err(err) = handle.proxy(proxy) {
sentry_debug!("invalid proxy: {:?}", err);

match (scheme, &http_proxy, &https_proxy) {
(Scheme::Https, _, Some(proxy)) => {
if let Err(err) = handle.proxy(proxy) {
sentry_debug!("invalid proxy: {:?}", err);
}
}
(_, Some(proxy), _) => {
if let Err(err) = handle.proxy(proxy) {
sentry_debug!("invalid proxy: {:?}", err);
}
}
_ => {}
}
_ => {}
}

let mut body = Vec::new();
envelope.to_writer(&mut body).unwrap();
let mut body = Cursor::new(body);

let mut retry_after = None;
let mut sentry_header = None;
let mut headers = curl::easy::List::new();
headers.append(&format!("X-Sentry-Auth: {auth}")).unwrap();
headers.append("Expect:").unwrap();
handle.http_headers(headers).unwrap();
handle.upload(true).unwrap();
handle.in_filesize(body.get_ref().len() as u64).unwrap();
handle
.read_function(move |buf| Ok(body.read(buf).unwrap_or(0)))
.unwrap();
handle.verbose(true).unwrap();
handle
.debug_function(move |info, data| {
let prefix = match info {
curl::easy::InfoType::HeaderIn => "< ",
curl::easy::InfoType::HeaderOut => "> ",
curl::easy::InfoType::DataOut => "",
_ => return,
};
sentry_debug!("curl: {}{}", prefix, String::from_utf8_lossy(data).trim());
})
.unwrap();

{
let mut handle = handle.transfer();
let retry_after_setter = &mut retry_after;
let sentry_header_setter = &mut sentry_header;

let mut body = Vec::new();
envelope.to_writer(&mut body).unwrap();
let mut body = Cursor::new(body);

let mut retry_after = None;
let mut sentry_header = None;
let mut headers = curl::easy::List::new();
headers.append(&format!("X-Sentry-Auth: {auth}")).unwrap();
headers.append("Expect:").unwrap();
handle.http_headers(headers).unwrap();
handle.upload(true).unwrap();
handle.in_filesize(body.get_ref().len() as u64).unwrap();
handle
.read_function(move |buf| Ok(body.read(buf).unwrap_or(0)))
.unwrap();
handle.verbose(true).unwrap();
handle
.header_function(move |data| {
if let Ok(data) = std::str::from_utf8(data) {
let mut iter = data.split(':');
if let Some(key) = iter.next().map(str::to_lowercase) {
if key == "retry-after" {
*retry_after_setter = iter.next().map(|x| x.trim().to_string());
} else if key == "x-sentry-rate-limits" {
*sentry_header_setter =
iter.next().map(|x| x.trim().to_string());
.debug_function(move |info, data| {
let prefix = match info {
curl::easy::InfoType::HeaderIn => "< ",
curl::easy::InfoType::HeaderOut => "> ",
curl::easy::InfoType::DataOut => "",
_ => return,
};
sentry_debug!("curl: {}{}", prefix, String::from_utf8_lossy(data).trim());
})
.unwrap();

{
let mut handle = handle.transfer();
let retry_after_setter = &mut retry_after;
let sentry_header_setter = &mut sentry_header;
handle
.header_function(move |data| {
if let Ok(data) = std::str::from_utf8(data) {
let mut iter = data.split(':');
if let Some(key) = iter.next().map(str::to_lowercase) {
if key == "retry-after" {
*retry_after_setter =
iter.next().map(|x| x.trim().to_string());
} else if key == "x-sentry-rate-limits" {
*sentry_header_setter =
iter.next().map(|x| x.trim().to_string());
}
}
}
true
})
.unwrap();
handle.perform().ok();
}

match handle.response_code() {
Ok(response_code) => {
if let Some(sentry_header) = sentry_header {
rl.update_from_sentry_header(&sentry_header);
} else if let Some(retry_after) = retry_after {
rl.update_from_retry_after(&retry_after);
} else if response_code == 429 {
rl.update_from_429();
}
if response_code == HTTP_PAYLOAD_TOO_LARGE as u32 {
sentry_debug!("{HTTP_PAYLOAD_TOO_LARGE_MESSAGE}");
}
true
})
.unwrap();
handle.perform().ok();
}

match handle.response_code() {
Ok(response_code) => {
if let Some(sentry_header) = sentry_header {
rl.update_from_sentry_header(&sentry_header);
} else if let Some(retry_after) = retry_after {
rl.update_from_retry_after(&retry_after);
} else if response_code == 429 {
rl.update_from_429();
}
if response_code == HTTP_PAYLOAD_TOO_LARGE as u32 {
sentry_debug!("{HTTP_PAYLOAD_TOO_LARGE_MESSAGE}");
Err(err) => {
sentry_debug!("Failed to send envelope: {}", err);
}
}
Err(err) => {
sentry_debug!("Failed to send envelope: {}", err);
}
}
});
},
channel_capacity,
);
Self { thread }
}
}
Expand Down
80 changes: 42 additions & 38 deletions sentry/src/transports/reqwest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,54 +63,58 @@ impl ReqwestHttpTransport {
let user_agent = options.user_agent.clone();
let auth = dsn.to_auth(Some(&user_agent)).to_string();
let url = dsn.envelope_api_url().to_string();
let channel_capacity = options.transport_channel_capacity;

let thread = TransportThread::new(move |envelope, mut rl| {
let mut body = Vec::new();
envelope.to_writer(&mut body).unwrap();
let request = client.post(&url).header("X-Sentry-Auth", &auth).body(body);
let thread = TransportThread::new(
move |envelope, mut rl| {
let mut body = Vec::new();
envelope.to_writer(&mut body).unwrap();
let request = client.post(&url).header("X-Sentry-Auth", &auth).body(body);

// NOTE: because of lifetime issues, building the request using the
// `client` has to happen outside of this async block.
async move {
match request.send().await {
Ok(response) => {
let headers = response.headers();
// NOTE: because of lifetime issues, building the request using the
// `client` has to happen outside of this async block.
async move {
match request.send().await {
Ok(response) => {
let headers = response.headers();

if let Some(sentry_header) = headers
.get("x-sentry-rate-limits")
.and_then(|x| x.to_str().ok())
{
rl.update_from_sentry_header(sentry_header);
} else if let Some(retry_after) = headers
.get(ReqwestHeaders::RETRY_AFTER)
.and_then(|x| x.to_str().ok())
{
rl.update_from_retry_after(retry_after);
} else if response.status() == StatusCode::TOO_MANY_REQUESTS {
rl.update_from_429();
}
if let Some(sentry_header) = headers
.get("x-sentry-rate-limits")
.and_then(|x| x.to_str().ok())
{
rl.update_from_sentry_header(sentry_header);
} else if let Some(retry_after) = headers
.get(ReqwestHeaders::RETRY_AFTER)
.and_then(|x| x.to_str().ok())
{
rl.update_from_retry_after(retry_after);
} else if response.status() == StatusCode::TOO_MANY_REQUESTS {
rl.update_from_429();
}

let is_payload_too_large =
response.status().as_u16() == HTTP_PAYLOAD_TOO_LARGE;
match response.text().await {
Err(err) => {
sentry_debug!("Failed to read sentry response: {}", err);
let is_payload_too_large =
response.status().as_u16() == HTTP_PAYLOAD_TOO_LARGE;
match response.text().await {
Err(err) => {
sentry_debug!("Failed to read sentry response: {}", err);
}
Ok(text) => {
sentry_debug!("Get response: `{}`", text);
}
}
Ok(text) => {
sentry_debug!("Get response: `{}`", text);
if is_payload_too_large {
sentry_debug!("{HTTP_PAYLOAD_TOO_LARGE_MESSAGE}");
}
}
if is_payload_too_large {
sentry_debug!("{HTTP_PAYLOAD_TOO_LARGE_MESSAGE}");
Err(err) => {
sentry_debug!("Failed to send envelope: {}", err);
}
}
Err(err) => {
sentry_debug!("Failed to send envelope: {}", err);
}
rl
}
rl
}
});
},
channel_capacity,
);
Self { thread }
}
}
Expand Down
4 changes: 2 additions & 2 deletions sentry/src/transports/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ pub struct TransportThread {

impl TransportThread {
/// Spawn a new background thread.
pub fn new<SendFn>(mut send: SendFn) -> Self
pub fn new<SendFn>(mut send: SendFn, channel_capacity: usize) -> Self
where
SendFn: FnMut(Envelope, &mut RateLimiter) + Send + 'static,
{
let (sender, receiver) = sync_channel(30);
let (sender, receiver) = sync_channel(channel_capacity.max(1));
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should honor 0 here instead of clamping it. This is an advanced, opt-in transport API, and sync_channel(0) has defined rendezvous/no-buffer semantics. A capacity of 1 can still drop most events under bursts; it is not a general safeguard, only a different buffering policy. If someone explicitly passes 0, treating that as “no queued buffering” seems reasonable.

I tested this end-to-end with the clamp removed and with_channel_capacity(..., 0): sending 10 rapid events accepted 1 and dropped 9. That matches the channel semantics: zero capacity does not necessarily drop everything; it accepts an envelope when try_send happens while the transport thread is already waiting on the receiver. If we keep support for 0, the doc comment should describe that behavior rather than saying it would silently drop envelopes generally.

let shutdown = Arc::new(AtomicBool::new(false));
let shutdown_worker = shutdown.clone();
let handle = thread::Builder::new()
Comment on lines +46 to 53
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: With channel_capacity=0, flush() and drop() use a blocking send() for control tasks, which can cause the caller to hang if the worker thread is busy.
Severity: MEDIUM

Suggested Fix

Consider using try_send() for control tasks like Task::Flush and Task::Shutdown, similar to how envelopes are handled. This would align with the documented behavior and prevent blocking in flush() and drop() when the channel capacity is zero and the worker is busy.

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent. Verify if this is a real issue. If it is, propose a fix; if not, explain why it's
not valid.

Location: sentry/src/transports/thread.rs#L46-L53

Potential issue: When the transport thread is configured with `channel_capacity=0`, it
creates a rendezvous channel. However, the `flush()` and `drop()` methods use a blocking
`send()` to dispatch control tasks (`Task::Flush`, `Task::Shutdown`). If the worker
thread is occupied with a long-running operation, such as sending an envelope over HTTP,
it cannot receive new tasks. Consequently, the call to `flush()` will block until its
timeout is reached, and more critically, `drop()` will block the calling thread (e.g.,
during application shutdown) until the long-running operation completes. This can lead
to significant delays or hangs during shutdown for users who opt into this advanced
configuration. The documentation is also misleading, as it implies non-blocking behavior
for all sends.

Also affects:

  • sentry/src/transports/tokio_thread.rs:48~55

Expand Down
4 changes: 2 additions & 2 deletions sentry/src/transports/tokio_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ pub struct TransportThread {

impl TransportThread {
/// Spawn a new background thread.
pub fn new<SendFn, SendFuture>(mut send: SendFn) -> Self
pub fn new<SendFn, SendFuture>(mut send: SendFn, channel_capacity: usize) -> Self
where
SendFn: FnMut(Envelope, RateLimiter) -> SendFuture + Send + 'static,
// NOTE: returning RateLimiter here, otherwise we are in borrow hell
SendFuture: std::future::Future<Output = RateLimiter>,
{
let (sender, receiver) = sync_channel(30);
let (sender, receiver) = sync_channel(channel_capacity.max(1));
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same concept applies here; I tested both transport thread variants with the clamp removed, and both accepted 1 of 10 rapid events with capacity 0 rather than dropping everything.

let shutdown = Arc::new(AtomicBool::new(false));
let shutdown_worker = shutdown.clone();
let handle = thread::Builder::new()
Expand Down
Loading
Loading