Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use vector_lib::sensitive_string::SensitiveString;
#[cfg(feature = "aws-core")]
use crate::aws::AwsAuthentication;

use crate::sinks::util::http::ConnectionConfig;
use crate::{
config::ProxyConfig,
internal_events::{http_client, HttpServerRequestReceived, HttpServerResponseSent},
Expand Down Expand Up @@ -92,7 +93,26 @@ where
tls_settings: impl Into<MaybeTlsSettings>,
proxy_config: &ProxyConfig,
) -> Result<HttpClient<B>, HttpError> {
HttpClient::new_with_custom_client(tls_settings, proxy_config, &mut Client::builder())
HttpClient::new_with_connection_config(tls_settings, proxy_config, None)
}

pub fn new_with_connection_config(
tls_settings: impl Into<MaybeTlsSettings>,
proxy_config: &ProxyConfig,
connection_config: Option<ConnectionConfig>,
) -> Result<HttpClient<B>, HttpError> {
let mut builder = Client::builder();

if let Some(config) = connection_config {
if let Some(idle_secs) = config.idle_timeout_secs {
builder.pool_idle_timeout(Duration::from_secs(idle_secs));
}
if let Some(max_idle) = config.pool_idle_per_host {
builder.pool_max_idle_per_host(max_idle);
}
}

HttpClient::new_with_custom_client(tls_settings, proxy_config, &mut builder)
}

pub fn new_with_custom_client(
Expand Down
1 change: 1 addition & 0 deletions src/sinks/axiom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ impl SinkConfig for AxiomConfig {
),
payload_prefix: "".into(), // Always newline delimited JSON
payload_suffix: "".into(), // Always newline delimited JSON
connection: None,
};

http_sink_config.build(cx).await
Expand Down
9 changes: 8 additions & 1 deletion src/sinks/elasticsearch/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use super::{
request_builder::ElasticsearchRequestBuilder, ElasticsearchApiVersion, ElasticsearchEncoder,
InvalidHostSnafu, Request, VersionType,
};
use crate::sinks::util::http::ConnectionConfig;
use crate::{
http::{HttpClient, MaybeAuth, QueryParameterValue, QueryParameters},
sinks::{
Expand Down Expand Up @@ -190,6 +191,7 @@ impl ElasticsearchCommon {
&request,
&tls_settings,
proxy_config,
config.connection,
)
.await
{
Expand Down Expand Up @@ -341,6 +343,7 @@ async fn get_version(
request: &RequestConfig,
tls_settings: &TlsSettings,
proxy_config: &ProxyConfig,
connection_config: Option<ConnectionConfig>,
) -> crate::Result<usize> {
#[derive(Deserialize)]
struct Version {
Expand All @@ -351,7 +354,11 @@ async fn get_version(
version: Option<Version>,
}

let client = HttpClient::new(tls_settings.clone(), proxy_config)?;
let client = HttpClient::new_with_connection_config(
tls_settings.clone(),
proxy_config,
connection_config,
)?;
let response = get(
base_url,
auth,
Expand Down
17 changes: 16 additions & 1 deletion src/sinks/elasticsearch/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::{
use futures::{FutureExt, TryFutureExt};
use vector_lib::configurable::configurable_component;

use crate::sinks::util::http::ConnectionConfig;
use crate::{
codecs::Transformer,
config::{AcknowledgementsConfig, DataType, Input, SinkConfig, SinkContext},
Expand Down Expand Up @@ -218,6 +219,15 @@ pub struct ElasticsearchConfig {
)]
#[configurable(derived)]
pub acknowledgements: AcknowledgementsConfig,

/// Connection-level settings for the underlying HTTP client.
///
/// This allows configuring parameters like connection idle timeout and
/// maximum idle connections per host. Useful when running behind load
/// balancers with strict idle policies.
#[configurable(derived)]
#[serde(default)]
pub connection: Option<ConnectionConfig>,
}

fn default_doc_type() -> String {
Expand Down Expand Up @@ -255,6 +265,7 @@ impl Default for ElasticsearchConfig {
data_stream: None,
metrics: None,
acknowledgements: Default::default(),
connection: None,
}
}
}
Expand Down Expand Up @@ -541,7 +552,11 @@ impl SinkConfig for ElasticsearchConfig {
let commons = ElasticsearchCommon::parse_many(self, cx.proxy()).await?;
let common = commons[0].clone();

let client = HttpClient::new(common.tls_settings.clone(), cx.proxy())?;
let client = HttpClient::new_with_connection_config(
common.tls_settings.clone(),
cx.proxy(),
self.connection,
)?;

let request_limits = self.request.tower.into_settings();

Expand Down
40 changes: 39 additions & 1 deletion src/sinks/http/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use super::{
};
#[cfg(feature = "aws-core")]
use crate::aws::AwsAuthentication;
use crate::sinks::util::http::ConnectionConfig;
#[cfg(feature = "aws-core")]
use crate::sinks::util::http::SigV4Config;
use crate::{
Expand Down Expand Up @@ -107,6 +108,10 @@ pub struct HttpSinkConfig {
skip_serializing_if = "crate::serde::is_default"
)]
pub acknowledgements: AcknowledgementsConfig,

#[configurable(derived)]
#[serde(default)]
pub connection: Option<ConnectionConfig>,
}

/// HTTP method.
Expand Down Expand Up @@ -163,7 +168,11 @@ impl From<HttpMethod> for Method {
impl HttpSinkConfig {
fn build_http_client(&self, cx: &SinkContext) -> crate::Result<HttpClient> {
let tls = TlsSettings::from_options(self.tls.as_ref())?;
Ok(HttpClient::new(tls, cx.proxy())?)
Ok(HttpClient::new_with_connection_config(
tls,
cx.proxy(),
self.connection,
)?)
}

pub(super) fn build_encoder(&self) -> crate::Result<Encoder<Framer>> {
Expand Down Expand Up @@ -392,6 +401,7 @@ mod tests {
acknowledgements: AcknowledgementsConfig::default(),
payload_prefix: String::new(),
payload_suffix: String::new(),
connection: None,
};

let external_resource = ExternalResource::new(
Expand All @@ -413,4 +423,32 @@ mod tests {
}

register_validatable_component!(HttpSinkConfig);

#[test]
fn deserialize_connection_config_defaults() {
let cfg: ConnectionConfig = serde_yaml::from_str("{}").unwrap();
// Defaults should be None
assert!(cfg.idle_timeout_secs.is_none());
assert!(cfg.pool_idle_per_host.is_none());
}

#[test]
fn http_sink_config_with_connection() {
let yaml = r#"
uri: "http://example.com"
encoding:
codec: "json"
connection:
idle_timeout_secs: 120
pool_idle_per_host: 20
"#;

let cfg: HttpSinkConfig = serde_yaml::from_str(yaml).unwrap();

assert_eq!(cfg.uri.uri, "http://example.com");
assert!(cfg.connection.is_some());
let conn = cfg.connection.unwrap();
assert_eq!(conn.idle_timeout_secs, Some(120));
assert_eq!(conn.pool_idle_per_host, Some(20));
}
}
1 change: 1 addition & 0 deletions src/sinks/http/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ fn default_cfg(encoding: EncodingConfigWithFraming) -> HttpSinkConfig {
request: Default::default(),
tls: Default::default(),
acknowledgements: Default::default(),
connection: Default::default(),
}
}

Expand Down
1 change: 1 addition & 0 deletions src/sinks/humio/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ impl HumioLogsConfig {
timestamp_key: Some(config_timestamp_key_target_path()),
endpoint_target: EndpointTarget::Event,
auto_extract_timestamp: None,
connection: None,
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/sinks/opentelemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ impl Default for Protocol {
request: Default::default(),
tls: Default::default(),
acknowledgements: Default::default(),
connection: None,
})
}
}
Expand Down
19 changes: 19 additions & 0 deletions src/sinks/splunk_hec/common/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use super::{
service::{HttpRequestBuilder, MetadataFields},
EndpointTarget,
};
use crate::sinks::util::http::ConnectionConfig;
use crate::{
http::HttpClient,
internal_events::TemplateRenderingError,
Expand Down Expand Up @@ -50,6 +51,24 @@ pub fn create_client(
Ok(HttpClient::new(tls_settings, proxy_config)?)
}

pub fn create_client_with_connection_config(
tls: Option<&TlsConfig>,
proxy_config: &ProxyConfig,
connection: Option<ConnectionConfig>,
) -> crate::Result<HttpClient> {
let tls_settings = TlsSettings::from_options(tls)?;

if let Some(conn) = connection {
Ok(HttpClient::new_with_connection_config(
tls_settings,
proxy_config,
Some(conn),
)?)
} else {
Ok(HttpClient::new(tls_settings, proxy_config)?)
}
}

// TODO: `HttpBatchService` has been deprecated for direct use in sinks.
// This sink should undergo a refactor to utilize the `HttpService`
// instead, which extracts much of the boilerplate code for `Service`.
Expand Down
46 changes: 43 additions & 3 deletions src/sinks/splunk_hec/logs/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ use vector_lib::{
sensitive_string::SensitiveString,
};

use super::{encoder::HecLogsEncoder, request_builder::HecLogsRequestBuilder, sink::HecLogsSink};
use crate::sinks::splunk_hec::common::create_client_with_connection_config;
use crate::sinks::util::http::ConnectionConfig;
use crate::{
http::HttpClient,
sinks::{
Expand All @@ -20,8 +23,6 @@ use crate::{
},
};

use super::{encoder::HecLogsEncoder, request_builder::HecLogsRequestBuilder, sink::HecLogsSink};

/// Configuration for the `splunk_hec_logs` sink.
#[configurable_component(sink(
"splunk_hec_logs",
Expand Down Expand Up @@ -152,6 +153,15 @@ pub struct HecLogsSinkConfig {
#[configurable(metadata(docs::advanced))]
#[serde(default = "default_endpoint_target")]
pub endpoint_target: EndpointTarget,

/// Connection-level settings for the underlying HTTP client.
///
/// This allows configuring parameters like connection idle timeout and
/// maximum idle connections per host. Useful when running behind load
/// balancers with strict idle policies.
#[configurable(derived)]
#[serde(default)]
pub connection: Option<ConnectionConfig>,
}

const fn default_endpoint_target() -> EndpointTarget {
Expand All @@ -178,6 +188,7 @@ impl GenerateConfig for HecLogsSinkConfig {
timestamp_key: None,
auto_extract_timestamp: None,
endpoint_target: EndpointTarget::Event,
connection: None,
})
.unwrap()
}
Expand All @@ -191,7 +202,14 @@ impl SinkConfig for HecLogsSinkConfig {
return Err("`auto_extract_timestamp` cannot be set for the `raw` endpoint.".into());
}

let client = create_client(self.tls.as_ref(), cx.proxy())?;
//let client = create_client(self.tls.as_ref(), cx.proxy())?;

let client = if let Some(conn) = &self.connection {
create_client_with_connection_config(self.tls.as_ref(), cx.proxy(), Some(*conn))?
} else {
create_client(self.tls.as_ref(), cx.proxy())?
};

let healthcheck = build_healthcheck(
self.endpoint.clone(),
self.default_token.inner().to_owned(),
Expand Down Expand Up @@ -334,6 +352,7 @@ mod tests {
timestamp_key: None,
auto_extract_timestamp: None,
endpoint_target: EndpointTarget::Raw,
connection: None,
};

let endpoint = format!("{endpoint}/services/collector/raw");
Expand All @@ -360,4 +379,25 @@ mod tests {
}

register_validatable_component!(HecLogsSinkConfig);

#[test]
fn splunk_config_with_connection() {
let yaml = r#"
default_token: "test_token"
endpoint: "http://splunk:8088"
encoding:
codec: "json"
connection:
idle_timeout_secs: 120
pool_idle_per_host: 15
"#;

let cfg: HecLogsSinkConfig = serde_yaml::from_str(yaml).unwrap();
assert_eq!(cfg.default_token.inner(), "test_token");
assert!(cfg.connection.is_some());

let conn = cfg.connection.unwrap();
assert_eq!(conn.idle_timeout_secs, Some(120));
assert_eq!(conn.pool_idle_per_host, Some(15));
}
}
1 change: 1 addition & 0 deletions src/sinks/splunk_hec/logs/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ async fn splunk_passthrough_token() {
timestamp_key: None,
auto_extract_timestamp: None,
endpoint_target: EndpointTarget::Event,
connection: None,
};
let cx = SinkContext::default();

Expand Down
19 changes: 19 additions & 0 deletions src/sinks/util/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,25 @@ impl RequestConfig {
}
}

#[configurable_component]
#[configurable(title = "Configuration for connection behavior in the HTTP client.")]
#[configurable(description = "Configuration for connection behavior in the HTTP client.")]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct ConnectionConfig {
/// Maximum idle time for a connection before it’s closed (seconds)
pub idle_timeout_secs: Option<u64>,

/// Maximum number of idle connections to keep per host
pub pool_idle_per_host: Option<usize>,
}

impl ConnectionConfig {
pub const DEFAULT: Self = Self {
idle_timeout_secs: None,
pool_idle_per_host: None,
};
}

#[derive(Debug, Snafu)]
pub enum HeaderValidationError {
#[snafu(display("{}: {}", source, name))]
Expand Down
1 change: 1 addition & 0 deletions src/sources/splunk_hec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1357,6 +1357,7 @@ mod tests {
timestamp_key: None,
auto_extract_timestamp: None,
endpoint_target: Default::default(),
connection: None,
}
.build(SinkContext::default())
.await
Expand Down