Skip to content
194 changes: 107 additions & 87 deletions sentry/src/transports/curl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ use std::time::Duration;

use curl::easy::Easy as CurlClient;

use super::{thread::TransportThread, HTTP_PAYLOAD_TOO_LARGE, HTTP_PAYLOAD_TOO_LARGE_MESSAGE};
use super::{
thread::TransportThread, DEFAULT_CHANNEL_CAPACITY, HTTP_PAYLOAD_TOO_LARGE,
HTTP_PAYLOAD_TOO_LARGE_MESSAGE,
};

use crate::{sentry_debug, types::Scheme, ClientOptions, Envelope, Transport};

Expand All @@ -18,15 +21,28 @@ pub struct CurlHttpTransport {
impl CurlHttpTransport {
/// Creates a new Transport.
pub fn new(options: &ClientOptions) -> Self {
Self::new_internal(options, None)
Self::new_internal(options, None, DEFAULT_CHANNEL_CAPACITY)
}

/// Creates a new Transport that uses the specified [`CurlClient`].
pub fn with_client(options: &ClientOptions, client: CurlClient) -> Self {
Self::new_internal(options, Some(client))
Self::new_internal(options, Some(client), DEFAULT_CHANNEL_CAPACITY)
}

fn new_internal(options: &ClientOptions, client: Option<CurlClient>) -> Self {
/// Creates a new Transport with a custom transport channel capacity.
///
/// The channel capacity bounds how many envelopes may be queued before
/// `send_envelope` blocks. A higher capacity reduces the chance of
/// dropped events in high-throughput scenarios at the cost of memory.
pub fn with_channel_capacity(options: &ClientOptions, channel_capacity: usize) -> Self {
Self::new_internal(options, None, channel_capacity)
}

fn new_internal(
options: &ClientOptions,
client: Option<CurlClient>,
channel_capacity: usize,
) -> Self {
let client = client.unwrap_or_else(CurlClient::new);
let http_proxy = options.http_proxy.as_ref().map(ToString::to_string);
let https_proxy = options.https_proxy.as_ref().map(ToString::to_string);
Expand All @@ -38,99 +54,103 @@ impl CurlHttpTransport {
let accept_invalid_certs = options.accept_invalid_certs;

let mut handle = client;
let thread = TransportThread::new(move |envelope, rl| {
handle.reset();
handle.url(&url).unwrap();
handle.custom_request("POST").unwrap();

if accept_invalid_certs {
handle.ssl_verify_host(false).unwrap();
handle.ssl_verify_peer(false).unwrap();
}

match (scheme, &http_proxy, &https_proxy) {
(Scheme::Https, _, Some(proxy)) => {
if let Err(err) = handle.proxy(proxy) {
sentry_debug!("invalid proxy: {:?}", err);
}
let thread = TransportThread::with_capacity(
move |envelope, rl| {
handle.reset();
handle.url(&url).unwrap();
handle.custom_request("POST").unwrap();

if accept_invalid_certs {
handle.ssl_verify_host(false).unwrap();
handle.ssl_verify_peer(false).unwrap();
}
(_, Some(proxy), _) => {
if let Err(err) = handle.proxy(proxy) {
sentry_debug!("invalid proxy: {:?}", err);

match (scheme, &http_proxy, &https_proxy) {
(Scheme::Https, _, Some(proxy)) => {
if let Err(err) = handle.proxy(proxy) {
sentry_debug!("invalid proxy: {:?}", err);
}
}
(_, Some(proxy), _) => {
if let Err(err) = handle.proxy(proxy) {
sentry_debug!("invalid proxy: {:?}", err);
}
}
_ => {}
}
_ => {}
}

let mut body = Vec::new();
envelope.to_writer(&mut body).unwrap();
let mut body = Cursor::new(body);

let mut retry_after = None;
let mut sentry_header = None;
let mut headers = curl::easy::List::new();
headers.append(&format!("X-Sentry-Auth: {auth}")).unwrap();
headers.append("Expect:").unwrap();
handle.http_headers(headers).unwrap();
handle.upload(true).unwrap();
handle.in_filesize(body.get_ref().len() as u64).unwrap();
handle
.read_function(move |buf| Ok(body.read(buf).unwrap_or(0)))
.unwrap();
handle.verbose(true).unwrap();
handle
.debug_function(move |info, data| {
let prefix = match info {
curl::easy::InfoType::HeaderIn => "< ",
curl::easy::InfoType::HeaderOut => "> ",
curl::easy::InfoType::DataOut => "",
_ => return,
};
sentry_debug!("curl: {}{}", prefix, String::from_utf8_lossy(data).trim());
})
.unwrap();

{
let mut handle = handle.transfer();
let retry_after_setter = &mut retry_after;
let sentry_header_setter = &mut sentry_header;

let mut body = Vec::new();
envelope.to_writer(&mut body).unwrap();
let mut body = Cursor::new(body);

let mut retry_after = None;
let mut sentry_header = None;
let mut headers = curl::easy::List::new();
headers.append(&format!("X-Sentry-Auth: {auth}")).unwrap();
headers.append("Expect:").unwrap();
handle.http_headers(headers).unwrap();
handle.upload(true).unwrap();
handle.in_filesize(body.get_ref().len() as u64).unwrap();
handle
.read_function(move |buf| Ok(body.read(buf).unwrap_or(0)))
.unwrap();
handle.verbose(true).unwrap();
handle
.header_function(move |data| {
if let Ok(data) = std::str::from_utf8(data) {
let mut iter = data.split(':');
if let Some(key) = iter.next().map(str::to_lowercase) {
if key == "retry-after" {
*retry_after_setter = iter.next().map(|x| x.trim().to_string());
} else if key == "x-sentry-rate-limits" {
*sentry_header_setter =
iter.next().map(|x| x.trim().to_string());
.debug_function(move |info, data| {
let prefix = match info {
curl::easy::InfoType::HeaderIn => "< ",
curl::easy::InfoType::HeaderOut => "> ",
curl::easy::InfoType::DataOut => "",
_ => return,
};
sentry_debug!("curl: {}{}", prefix, String::from_utf8_lossy(data).trim());
})
.unwrap();

{
let mut handle = handle.transfer();
let retry_after_setter = &mut retry_after;
let sentry_header_setter = &mut sentry_header;
handle
.header_function(move |data| {
if let Ok(data) = std::str::from_utf8(data) {
let mut iter = data.split(':');
if let Some(key) = iter.next().map(str::to_lowercase) {
if key == "retry-after" {
*retry_after_setter =
iter.next().map(|x| x.trim().to_string());
} else if key == "x-sentry-rate-limits" {
*sentry_header_setter =
iter.next().map(|x| x.trim().to_string());
}
}
}
true
})
.unwrap();
handle.perform().ok();
}

match handle.response_code() {
Ok(response_code) => {
if let Some(sentry_header) = sentry_header {
rl.update_from_sentry_header(&sentry_header);
} else if let Some(retry_after) = retry_after {
rl.update_from_retry_after(&retry_after);
} else if response_code == 429 {
rl.update_from_429();
}
if response_code == HTTP_PAYLOAD_TOO_LARGE as u32 {
sentry_debug!("{HTTP_PAYLOAD_TOO_LARGE_MESSAGE}");
}
true
})
.unwrap();
handle.perform().ok();
}

match handle.response_code() {
Ok(response_code) => {
if let Some(sentry_header) = sentry_header {
rl.update_from_sentry_header(&sentry_header);
} else if let Some(retry_after) = retry_after {
rl.update_from_retry_after(&retry_after);
} else if response_code == 429 {
rl.update_from_429();
}
if response_code == HTTP_PAYLOAD_TOO_LARGE as u32 {
sentry_debug!("{HTTP_PAYLOAD_TOO_LARGE_MESSAGE}");
Err(err) => {
sentry_debug!("Failed to send envelope: {}", err);
}
}
Err(err) => {
sentry_debug!("Failed to send envelope: {}", err);
}
}
});
},
channel_capacity,
);
Self { thread }
}
}
Expand Down
3 changes: 3 additions & 0 deletions sentry/src/transports/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ pub(crate) const HTTP_PAYLOAD_TOO_LARGE: u16 = 413;
pub(crate) const HTTP_PAYLOAD_TOO_LARGE_MESSAGE: &str =
"Envelope was discarded due to size limits (HTTP 413).";

#[cfg(sentry_any_http_transport)]
pub(crate) const DEFAULT_CHANNEL_CAPACITY: usize = 30;

#[cfg(feature = "reqwest")]
type DefaultTransport = ReqwestHttpTransport;

Expand Down
101 changes: 59 additions & 42 deletions sentry/src/transports/reqwest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ use std::time::Duration;
use reqwest::{header as ReqwestHeaders, Client as ReqwestClient, Proxy, StatusCode};

use super::{
tokio_thread::TransportThread, HTTP_PAYLOAD_TOO_LARGE, HTTP_PAYLOAD_TOO_LARGE_MESSAGE,
tokio_thread::TransportThread, DEFAULT_CHANNEL_CAPACITY, HTTP_PAYLOAD_TOO_LARGE,
HTTP_PAYLOAD_TOO_LARGE_MESSAGE,
};

use crate::{sentry_debug, ClientOptions, Envelope, Transport};
Expand All @@ -21,15 +22,28 @@ pub struct ReqwestHttpTransport {
impl ReqwestHttpTransport {
/// Creates a new Transport.
pub fn new(options: &ClientOptions) -> Self {
Self::new_internal(options, None)
Self::new_internal(options, None, DEFAULT_CHANNEL_CAPACITY)
}

/// Creates a new Transport that uses the specified [`ReqwestClient`].
pub fn with_client(options: &ClientOptions, client: ReqwestClient) -> Self {
Self::new_internal(options, Some(client))
Self::new_internal(options, Some(client), DEFAULT_CHANNEL_CAPACITY)
}

fn new_internal(options: &ClientOptions, client: Option<ReqwestClient>) -> Self {
/// Creates a new Transport with a custom transport channel capacity.
///
/// The channel capacity bounds how many envelopes may be queued before
/// `send_envelope` blocks. A higher capacity reduces the chance of
/// dropped events in high-throughput scenarios at the cost of memory.
pub fn with_channel_capacity(options: &ClientOptions, channel_capacity: usize) -> Self {
Self::new_internal(options, None, channel_capacity)
}

fn new_internal(
options: &ClientOptions,
client: Option<ReqwestClient>,
channel_capacity: usize,
) -> Self {
let client = client.unwrap_or_else(|| {
let mut builder = reqwest::Client::builder();
if options.accept_invalid_certs {
Expand Down Expand Up @@ -64,53 +78,56 @@ impl ReqwestHttpTransport {
let auth = dsn.to_auth(Some(&user_agent)).to_string();
let url = dsn.envelope_api_url().to_string();

let thread = TransportThread::new(move |envelope, mut rl| {
let mut body = Vec::new();
envelope.to_writer(&mut body).unwrap();
let request = client.post(&url).header("X-Sentry-Auth", &auth).body(body);
let thread = TransportThread::with_capacity(
move |envelope, mut rl| {
let mut body = Vec::new();
envelope.to_writer(&mut body).unwrap();
let request = client.post(&url).header("X-Sentry-Auth", &auth).body(body);

// NOTE: because of lifetime issues, building the request using the
// `client` has to happen outside of this async block.
async move {
match request.send().await {
Ok(response) => {
let headers = response.headers();
// NOTE: because of lifetime issues, building the request using the
// `client` has to happen outside of this async block.
async move {
match request.send().await {
Ok(response) => {
let headers = response.headers();

if let Some(sentry_header) = headers
.get("x-sentry-rate-limits")
.and_then(|x| x.to_str().ok())
{
rl.update_from_sentry_header(sentry_header);
} else if let Some(retry_after) = headers
.get(ReqwestHeaders::RETRY_AFTER)
.and_then(|x| x.to_str().ok())
{
rl.update_from_retry_after(retry_after);
} else if response.status() == StatusCode::TOO_MANY_REQUESTS {
rl.update_from_429();
}
if let Some(sentry_header) = headers
.get("x-sentry-rate-limits")
.and_then(|x| x.to_str().ok())
{
rl.update_from_sentry_header(sentry_header);
} else if let Some(retry_after) = headers
.get(ReqwestHeaders::RETRY_AFTER)
.and_then(|x| x.to_str().ok())
{
rl.update_from_retry_after(retry_after);
} else if response.status() == StatusCode::TOO_MANY_REQUESTS {
rl.update_from_429();
}

let is_payload_too_large =
response.status().as_u16() == HTTP_PAYLOAD_TOO_LARGE;
match response.text().await {
Err(err) => {
sentry_debug!("Failed to read sentry response: {}", err);
let is_payload_too_large =
response.status().as_u16() == HTTP_PAYLOAD_TOO_LARGE;
match response.text().await {
Err(err) => {
sentry_debug!("Failed to read sentry response: {}", err);
}
Ok(text) => {
sentry_debug!("Get response: `{}`", text);
}
}
Ok(text) => {
sentry_debug!("Get response: `{}`", text);
if is_payload_too_large {
sentry_debug!("{HTTP_PAYLOAD_TOO_LARGE_MESSAGE}");
}
}
if is_payload_too_large {
sentry_debug!("{HTTP_PAYLOAD_TOO_LARGE_MESSAGE}");
Err(err) => {
sentry_debug!("Failed to send envelope: {}", err);
}
}
Err(err) => {
sentry_debug!("Failed to send envelope: {}", err);
}
rl
}
rl
}
});
},
channel_capacity,
);
Self { thread }
}
}
Expand Down
Loading
Loading