Skip to content
189 changes: 103 additions & 86 deletions sentry/src/transports/curl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,28 @@ pub struct CurlHttpTransport {
impl CurlHttpTransport {
/// Creates a new Transport.
pub fn new(options: &ClientOptions) -> Self {
Self::new_internal(options, None)
Self::new_internal(options, None, 30)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

m: As we use the number 30 as the default channel capacity in all the transports, we should extract it to a constant that we reuse in all of them.

}

/// Creates a new Transport that uses the specified [`CurlClient`].
pub fn with_client(options: &ClientOptions, client: CurlClient) -> Self {
Self::new_internal(options, Some(client))
Self::new_internal(options, Some(client), 30)
Comment thread
mvanhorn marked this conversation as resolved.
Outdated
}

fn new_internal(options: &ClientOptions, client: Option<CurlClient>) -> Self {
/// Creates a new Transport with a custom transport channel capacity.
///
/// The channel capacity bounds how many envelopes may be queued before
/// `send_envelope` blocks. A higher capacity reduces the chance of
/// dropped events in high-throughput scenarios at the cost of memory.
pub fn with_channel_capacity(options: &ClientOptions, channel_capacity: usize) -> Self {
Self::new_internal(options, None, channel_capacity)
}

fn new_internal(
options: &ClientOptions,
client: Option<CurlClient>,
channel_capacity: usize,
) -> Self {
let client = client.unwrap_or_else(CurlClient::new);
let http_proxy = options.http_proxy.as_ref().map(ToString::to_string);
let https_proxy = options.https_proxy.as_ref().map(ToString::to_string);
Expand All @@ -38,99 +51,103 @@ impl CurlHttpTransport {
let accept_invalid_certs = options.accept_invalid_certs;

let mut handle = client;
let thread = TransportThread::new(move |envelope, rl| {
handle.reset();
handle.url(&url).unwrap();
handle.custom_request("POST").unwrap();

if accept_invalid_certs {
handle.ssl_verify_host(false).unwrap();
handle.ssl_verify_peer(false).unwrap();
}

match (scheme, &http_proxy, &https_proxy) {
(Scheme::Https, _, Some(proxy)) => {
if let Err(err) = handle.proxy(proxy) {
sentry_debug!("invalid proxy: {:?}", err);
}
let thread = TransportThread::with_capacity(
move |envelope, rl| {
handle.reset();
handle.url(&url).unwrap();
handle.custom_request("POST").unwrap();

if accept_invalid_certs {
handle.ssl_verify_host(false).unwrap();
handle.ssl_verify_peer(false).unwrap();
}
(_, Some(proxy), _) => {
if let Err(err) = handle.proxy(proxy) {
sentry_debug!("invalid proxy: {:?}", err);

match (scheme, &http_proxy, &https_proxy) {
(Scheme::Https, _, Some(proxy)) => {
if let Err(err) = handle.proxy(proxy) {
sentry_debug!("invalid proxy: {:?}", err);
}
}
(_, Some(proxy), _) => {
if let Err(err) = handle.proxy(proxy) {
sentry_debug!("invalid proxy: {:?}", err);
}
}
_ => {}
}
_ => {}
}

let mut body = Vec::new();
envelope.to_writer(&mut body).unwrap();
let mut body = Cursor::new(body);

let mut retry_after = None;
let mut sentry_header = None;
let mut headers = curl::easy::List::new();
headers.append(&format!("X-Sentry-Auth: {auth}")).unwrap();
headers.append("Expect:").unwrap();
handle.http_headers(headers).unwrap();
handle.upload(true).unwrap();
handle.in_filesize(body.get_ref().len() as u64).unwrap();
handle
.read_function(move |buf| Ok(body.read(buf).unwrap_or(0)))
.unwrap();
handle.verbose(true).unwrap();
handle
.debug_function(move |info, data| {
let prefix = match info {
curl::easy::InfoType::HeaderIn => "< ",
curl::easy::InfoType::HeaderOut => "> ",
curl::easy::InfoType::DataOut => "",
_ => return,
};
sentry_debug!("curl: {}{}", prefix, String::from_utf8_lossy(data).trim());
})
.unwrap();

{
let mut handle = handle.transfer();
let retry_after_setter = &mut retry_after;
let sentry_header_setter = &mut sentry_header;

let mut body = Vec::new();
envelope.to_writer(&mut body).unwrap();
let mut body = Cursor::new(body);

let mut retry_after = None;
let mut sentry_header = None;
let mut headers = curl::easy::List::new();
headers.append(&format!("X-Sentry-Auth: {auth}")).unwrap();
headers.append("Expect:").unwrap();
handle.http_headers(headers).unwrap();
handle.upload(true).unwrap();
handle.in_filesize(body.get_ref().len() as u64).unwrap();
handle
.read_function(move |buf| Ok(body.read(buf).unwrap_or(0)))
.unwrap();
handle.verbose(true).unwrap();
handle
.header_function(move |data| {
if let Ok(data) = std::str::from_utf8(data) {
let mut iter = data.split(':');
if let Some(key) = iter.next().map(str::to_lowercase) {
if key == "retry-after" {
*retry_after_setter = iter.next().map(|x| x.trim().to_string());
} else if key == "x-sentry-rate-limits" {
*sentry_header_setter =
iter.next().map(|x| x.trim().to_string());
.debug_function(move |info, data| {
let prefix = match info {
curl::easy::InfoType::HeaderIn => "< ",
curl::easy::InfoType::HeaderOut => "> ",
curl::easy::InfoType::DataOut => "",
_ => return,
};
sentry_debug!("curl: {}{}", prefix, String::from_utf8_lossy(data).trim());
})
.unwrap();

{
let mut handle = handle.transfer();
let retry_after_setter = &mut retry_after;
let sentry_header_setter = &mut sentry_header;
handle
.header_function(move |data| {
if let Ok(data) = std::str::from_utf8(data) {
let mut iter = data.split(':');
if let Some(key) = iter.next().map(str::to_lowercase) {
if key == "retry-after" {
*retry_after_setter =
iter.next().map(|x| x.trim().to_string());
} else if key == "x-sentry-rate-limits" {
*sentry_header_setter =
iter.next().map(|x| x.trim().to_string());
}
}
}
true
})
.unwrap();
handle.perform().ok();
}

match handle.response_code() {
Ok(response_code) => {
if let Some(sentry_header) = sentry_header {
rl.update_from_sentry_header(&sentry_header);
} else if let Some(retry_after) = retry_after {
rl.update_from_retry_after(&retry_after);
} else if response_code == 429 {
rl.update_from_429();
}
if response_code == HTTP_PAYLOAD_TOO_LARGE as u32 {
sentry_debug!("{HTTP_PAYLOAD_TOO_LARGE_MESSAGE}");
}
true
})
.unwrap();
handle.perform().ok();
}

match handle.response_code() {
Ok(response_code) => {
if let Some(sentry_header) = sentry_header {
rl.update_from_sentry_header(&sentry_header);
} else if let Some(retry_after) = retry_after {
rl.update_from_retry_after(&retry_after);
} else if response_code == 429 {
rl.update_from_429();
}
if response_code == HTTP_PAYLOAD_TOO_LARGE as u32 {
sentry_debug!("{HTTP_PAYLOAD_TOO_LARGE_MESSAGE}");
Err(err) => {
sentry_debug!("Failed to send envelope: {}", err);
}
}
Err(err) => {
sentry_debug!("Failed to send envelope: {}", err);
}
}
});
},
channel_capacity,
);
Self { thread }
}
}
Expand Down
98 changes: 57 additions & 41 deletions sentry/src/transports/reqwest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,28 @@ pub struct ReqwestHttpTransport {
impl ReqwestHttpTransport {
/// Creates a new Transport.
pub fn new(options: &ClientOptions) -> Self {
Self::new_internal(options, None)
Self::new_internal(options, None, 30)
Comment thread
mvanhorn marked this conversation as resolved.
Outdated
}

/// Creates a new Transport that uses the specified [`ReqwestClient`].
pub fn with_client(options: &ClientOptions, client: ReqwestClient) -> Self {
Self::new_internal(options, Some(client))
Self::new_internal(options, Some(client), 30)
}

fn new_internal(options: &ClientOptions, client: Option<ReqwestClient>) -> Self {
/// Creates a new Transport with a custom transport channel capacity.
///
/// The channel capacity bounds how many envelopes may be queued before
/// `send_envelope` blocks. A higher capacity reduces the chance of
/// dropped events in high-throughput scenarios at the cost of memory.
pub fn with_channel_capacity(options: &ClientOptions, channel_capacity: usize) -> Self {
Self::new_internal(options, None, channel_capacity)
}

fn new_internal(
options: &ClientOptions,
client: Option<ReqwestClient>,
channel_capacity: usize,
) -> Self {
let client = client.unwrap_or_else(|| {
let mut builder = reqwest::Client::builder();
if options.accept_invalid_certs {
Expand Down Expand Up @@ -64,53 +77,56 @@ impl ReqwestHttpTransport {
let auth = dsn.to_auth(Some(&user_agent)).to_string();
let url = dsn.envelope_api_url().to_string();

let thread = TransportThread::new(move |envelope, mut rl| {
let mut body = Vec::new();
envelope.to_writer(&mut body).unwrap();
let request = client.post(&url).header("X-Sentry-Auth", &auth).body(body);
let thread = TransportThread::with_capacity(
move |envelope, mut rl| {
let mut body = Vec::new();
envelope.to_writer(&mut body).unwrap();
let request = client.post(&url).header("X-Sentry-Auth", &auth).body(body);

// NOTE: because of lifetime issues, building the request using the
// `client` has to happen outside of this async block.
async move {
match request.send().await {
Ok(response) => {
let headers = response.headers();
// NOTE: because of lifetime issues, building the request using the
// `client` has to happen outside of this async block.
async move {
match request.send().await {
Ok(response) => {
let headers = response.headers();

if let Some(sentry_header) = headers
.get("x-sentry-rate-limits")
.and_then(|x| x.to_str().ok())
{
rl.update_from_sentry_header(sentry_header);
} else if let Some(retry_after) = headers
.get(ReqwestHeaders::RETRY_AFTER)
.and_then(|x| x.to_str().ok())
{
rl.update_from_retry_after(retry_after);
} else if response.status() == StatusCode::TOO_MANY_REQUESTS {
rl.update_from_429();
}
if let Some(sentry_header) = headers
.get("x-sentry-rate-limits")
.and_then(|x| x.to_str().ok())
{
rl.update_from_sentry_header(sentry_header);
} else if let Some(retry_after) = headers
.get(ReqwestHeaders::RETRY_AFTER)
.and_then(|x| x.to_str().ok())
{
rl.update_from_retry_after(retry_after);
} else if response.status() == StatusCode::TOO_MANY_REQUESTS {
rl.update_from_429();
}

let is_payload_too_large =
response.status().as_u16() == HTTP_PAYLOAD_TOO_LARGE;
match response.text().await {
Err(err) => {
sentry_debug!("Failed to read sentry response: {}", err);
let is_payload_too_large =
response.status().as_u16() == HTTP_PAYLOAD_TOO_LARGE;
match response.text().await {
Err(err) => {
sentry_debug!("Failed to read sentry response: {}", err);
}
Ok(text) => {
sentry_debug!("Get response: `{}`", text);
}
}
Ok(text) => {
sentry_debug!("Get response: `{}`", text);
if is_payload_too_large {
sentry_debug!("{HTTP_PAYLOAD_TOO_LARGE_MESSAGE}");
}
}
if is_payload_too_large {
sentry_debug!("{HTTP_PAYLOAD_TOO_LARGE_MESSAGE}");
Err(err) => {
sentry_debug!("Failed to send envelope: {}", err);
}
}
Err(err) => {
sentry_debug!("Failed to send envelope: {}", err);
}
rl
}
rl
}
});
},
channel_capacity,
);
Self { thread }
}
}
Expand Down
19 changes: 16 additions & 3 deletions sentry/src/transports/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,25 @@ pub struct TransportThread {
}

impl TransportThread {
/// Spawn a new background thread.
pub fn new<SendFn>(mut send: SendFn) -> Self
/// Spawn a new background thread with the default channel capacity of 30.
pub fn new<SendFn>(send: SendFn) -> Self
where
SendFn: FnMut(Envelope, &mut RateLimiter) + Send + 'static,
{
let (sender, receiver) = sync_channel(30);
Self::with_capacity(send, 30)
}

/// Spawn a new background thread with a custom channel capacity.
///
/// The channel capacity bounds how many envelopes may be queued before
/// `send` blocks. `channel_capacity` is clamped to a minimum of 1 to
/// avoid a rendezvous channel, which would silently drop envelopes under
/// `try_send`.
pub fn with_capacity<SendFn>(mut send: SendFn, channel_capacity: usize) -> Self
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

m: Let's make this pub(crate) for now to limit public API surface. If folks want to have this as a public API in the future, we can expose it at that time.

Suggested change
pub fn with_capacity<SendFn>(mut send: SendFn, channel_capacity: usize) -> Self
pub(crate) fn with_capacity<SendFn>(mut send: SendFn, channel_capacity: usize) -> Self

where
SendFn: FnMut(Envelope, &mut RateLimiter) + Send + 'static,
{
let (sender, receiver) = sync_channel(channel_capacity.max(1));
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should honor 0 here instead of clamping it. This is an advanced, opt-in transport API, and sync_channel(0) has defined rendezvous/no-buffer semantics. A capacity of 1 can still drop most events under bursts; it is not a general safeguard, only a different buffering policy. If someone explicitly passes 0, treating that as “no queued buffering” seems reasonable.

I tested this end-to-end with the clamp removed and with_channel_capacity(..., 0): sending 10 rapid events accepted 1 and dropped 9. That matches the channel semantics: zero capacity does not necessarily drop everything; it accepts an envelope when try_send happens while the transport thread is already waiting on the receiver. If we keep support for 0, the doc comment should describe that behavior rather than saying it would silently drop envelopes generally.

let shutdown = Arc::new(AtomicBool::new(false));
let shutdown_worker = shutdown.clone();
let handle = thread::Builder::new()
Comment on lines +46 to 53
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: With channel_capacity=0, flush() and drop() use a blocking send() for control tasks, which can cause the caller to hang if the worker thread is busy.
Severity: MEDIUM

Suggested Fix

Consider using try_send() for control tasks like Task::Flush and Task::Shutdown, similar to how envelopes are handled. This would align with the documented behavior and prevent blocking in flush() and drop() when the channel capacity is zero and the worker is busy.

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent. Verify if this is a real issue. If it is, propose a fix; if not, explain why it's
not valid.

Location: sentry/src/transports/thread.rs#L46-L53

Potential issue: When the transport thread is configured with `channel_capacity=0`, it
creates a rendezvous channel. However, the `flush()` and `drop()` methods use a blocking
`send()` to dispatch control tasks (`Task::Flush`, `Task::Shutdown`). If the worker
thread is occupied with a long-running operation, such as sending an envelope over HTTP,
it cannot receive new tasks. Consequently, the call to `flush()` will block until its
timeout is reached, and more critically, `drop()` will block the calling thread (e.g.,
during application shutdown) until the long-running operation completes. This can lead
to significant delays or hangs during shutdown for users who opt into this advanced
configuration. The documentation is also misleading, as it implies non-blocking behavior
for all sends.

Also affects:

  • sentry/src/transports/tokio_thread.rs:48~55

Expand Down
Loading