Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog.d/socket_tcp_disconnect_mode.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Added a `disconnect_mode` configuration option to the `socket` (TCP mode), `logstash`, `fluent`, `syslog` (TCP mode), and `statsd` (TCP mode) sources. This controls how Vector closes TCP connections on shutdown or when `max_connection_duration_secs` elapses. The `drain` mode maintains the existing graceful shutdown behaviour while the `abort` mode closes connections immediately without waiting for the client to acknowledge the shutdown. This is useful for clients that never read from the socket and therefore cannot detect a graceful shutdown. The `logstash` and `fluent` sources default to `abort` to match the behaviour of Logstash's own Beats input plugin and Fluentd's `in_forward` plugin respectively.

authors: tronboto
18 changes: 17 additions & 1 deletion src/sources/fluent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use vector_lib::{
};
use vrl::value::{Kind, Value, kind::Collection};

use super::util::net::{SocketListenAddr, TcpSource, TcpSourceAck, TcpSourceAcker};
use super::util::net::{DisconnectMode, SocketListenAddr, TcpSource, TcpSourceAck, TcpSourceAcker};
use crate::{
config::{
DataType, GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig,
Expand Down Expand Up @@ -195,6 +195,16 @@ pub struct FluentTcpConfig {
#[configurable(derived)]
#[serde(default, deserialize_with = "bool_or_struct")]
acknowledgements: SourceAcknowledgementsConfig,

#[configurable(derived)]
#[serde(default = "default_disconnect_mode")]
disconnect_mode: DisconnectMode,
}

// Mimic Fluentd's in_forward plugin which sets SO_LINGER=0 by default:
// https://github.com/fluent/fluentd/blob/ead4c3d06685de751d8c4c92e448411c617dded5/lib/fluent/plugin_helper/server.rb#L288-L289
const fn default_disconnect_mode() -> DisconnectMode {
DisconnectMode::Abort
}

impl FluentTcpConfig {
Expand All @@ -220,6 +230,7 @@ impl FluentTcpConfig {
tls_client_metadata_key,
self.receive_buffer_bytes,
None,
self.disconnect_mode,
cx,
self.acknowledgements,
self.connection_limit,
Expand Down Expand Up @@ -283,6 +294,7 @@ impl GenerateConfig for FluentConfig {
receive_buffer_bytes: None,
acknowledgements: Default::default(),
connection_limit: Some(2),
disconnect_mode: default_disconnect_mode(),
}),
log_namespace: None,
})
Expand Down Expand Up @@ -1097,6 +1109,7 @@ mod tests {
receive_buffer_bytes: None,
acknowledgements: true.into(),
connection_limit: None,
disconnect_mode: default_disconnect_mode(),
}),
log_namespace: None,
}
Expand Down Expand Up @@ -1164,6 +1177,7 @@ mod tests {
receive_buffer_bytes: None,
acknowledgements: false.into(),
connection_limit: None,
disconnect_mode: default_disconnect_mode(),
}),
log_namespace: Some(true),
};
Expand Down Expand Up @@ -1222,6 +1236,7 @@ mod tests {
receive_buffer_bytes: None,
acknowledgements: false.into(),
connection_limit: None,
disconnect_mode: default_disconnect_mode(),
}),
log_namespace: None,
};
Expand Down Expand Up @@ -1450,6 +1465,7 @@ mod integration_tests {
receive_buffer_bytes: None,
acknowledgements: false.into(),
connection_limit: None,
disconnect_mode: default_disconnect_mode(),
}),
log_namespace: None,
}
Expand Down
16 changes: 15 additions & 1 deletion src/sources/logstash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use vector_lib::{
};
use vrl::value::{KeyString, Kind, kind::Collection};

use super::util::net::{SocketListenAddr, TcpSource, TcpSourceAck, TcpSourceAcker};
use super::util::net::{DisconnectMode, SocketListenAddr, TcpSource, TcpSourceAck, TcpSourceAcker};
use crate::{
config::{
DataType, GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig,
Expand Down Expand Up @@ -62,6 +62,10 @@ pub struct LogstashConfig {
#[configurable(metadata(docs::advanced))]
connection_limit: Option<u32>,

#[configurable(derived)]
#[serde(default = "default_disconnect_mode")]
disconnect_mode: DisconnectMode,

#[configurable(derived)]
#[serde(default, deserialize_with = "bool_or_struct")]
acknowledgements: SourceAcknowledgementsConfig,
Expand Down Expand Up @@ -115,6 +119,12 @@ impl LogstashConfig {
}
}

// Mimic Logstash's Beats input plugin which sets SO_LINGER=0 by default:
// https://github.com/logstash-plugins/logstash-input-beats/blob/4898d37c63255c109bea73f3c7eccdd0421f532f/src/main/java/org/logstash/beats/Server.java#L72
const fn default_disconnect_mode() -> DisconnectMode {
DisconnectMode::Abort
}

impl Default for LogstashConfig {
fn default() -> Self {
Self {
Expand All @@ -125,6 +135,7 @@ impl Default for LogstashConfig {
receive_buffer_bytes: None,
acknowledgements: Default::default(),
connection_limit: None,
disconnect_mode: default_disconnect_mode(),
log_namespace: None,
}
}
Expand Down Expand Up @@ -163,6 +174,7 @@ impl SourceConfig for LogstashConfig {
tls_client_metadata_key,
self.receive_buffer_bytes,
None,
self.disconnect_mode,
cx,
self.acknowledgements,
self.connection_limit,
Expand Down Expand Up @@ -755,6 +767,7 @@ mod test {
receive_buffer_bytes: None,
acknowledgements: true.into(),
connection_limit: None,
disconnect_mode: default_disconnect_mode(),
log_namespace: None,
}
.build(SourceContext::new_test(sender, None))
Expand Down Expand Up @@ -1016,6 +1029,7 @@ mod integration_tests {
receive_buffer_bytes: None,
acknowledgements: false.into(),
connection_limit: None,
disconnect_mode: default_disconnect_mode(),
log_namespace: None,
}
.build(SourceContext::new_test(sender, None))
Expand Down
34 changes: 34 additions & 0 deletions src/sources/socket/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ impl SourceConfig for SocketConfig {
tls_client_metadata_key,
config.receive_buffer_bytes(),
config.max_connection_duration_secs(),
config.disconnect_mode(),
cx,
false.into(),
config.connection_limit,
Expand Down Expand Up @@ -367,6 +368,7 @@ mod test {
};

use super::{SocketConfig, tcp::TcpConfig, udp::UdpConfig};
use crate::sources::util::net::DisconnectMode;
use crate::{
SourceSender,
config::{ComponentKey, GlobalOptions, SourceConfig, SourceContext, log_schema},
Expand Down Expand Up @@ -923,6 +925,38 @@ mod test {
}
}

#[tokio::test]
async fn tcp_disconnect_mode_abort_on_shutdown() {
let source_id = ComponentKey::from("tcp_disconnect_mode_abort_on_shutdown");
let (tx, _) = SourceSender::new_test();
let (guard, addr) = next_addr();
let (cx, mut shutdown) = SourceContext::new_shutdown(&source_id, tx);

let mut source_config = TcpConfig::from_address(addr.into());
source_config.set_disconnect_mode(DisconnectMode::Abort);
let source_task = SocketConfig::from(source_config).build(cx).await.unwrap();

drop(tokio::spawn(source_task));
wait_for_tcp_and_release(guard, addr).await;

let mut stream = TcpStream::connect(addr)
.await
.expect("stream should be able to connect");
let mut buffer = [0u8; 10];

let deadline = Instant::now() + Duration::from_secs(10);
tokio::spawn(shutdown.shutdown_source(&source_id, deadline));

let read_result = tokio::time::timeout(Duration::from_secs(5), stream.read(&mut buffer))
.await
.expect("timed out waiting for connection to close");

match read_result {
Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::ConnectionReset),
Ok(n) => panic!("expected connection reset, got Ok({n})"),
}
}

//////// UDP TESTS ////////
async fn send_lines_udp(to: SocketAddr, lines: impl IntoIterator<Item = String>) -> UdpSocket {
send_lines_udp_from(bind_unused_udp(), to, lines)
Expand Down
16 changes: 15 additions & 1 deletion src/sources/socket/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::{
codecs::Decoder,
event::Event,
serde::default_decoding,
sources::util::net::{SocketListenAddr, TcpNullAcker, TcpSource},
sources::util::net::{DisconnectMode, SocketListenAddr, TcpNullAcker, TcpSource},
tcp::TcpKeepaliveConfig,
tls::TlsSourceConfig,
};
Expand Down Expand Up @@ -79,6 +79,10 @@ pub struct TcpConfig {
#[configurable(metadata(docs::type_unit = "connections"))]
pub connection_limit: Option<u32>,

#[configurable(derived)]
#[serde(default)]
disconnect_mode: DisconnectMode,

#[configurable(derived)]
pub(super) framing: Option<FramingConfig>,

Expand Down Expand Up @@ -115,6 +119,7 @@ impl TcpConfig {
framing: None,
decoding: default_decoding(),
connection_limit: None,
disconnect_mode: DisconnectMode::Drain,
log_namespace: None,
}
}
Expand Down Expand Up @@ -159,11 +164,20 @@ impl TcpConfig {
self.max_connection_duration_secs
}

pub const fn disconnect_mode(&self) -> DisconnectMode {
self.disconnect_mode
}

pub const fn set_max_connection_duration_secs(&mut self, val: Option<u64>) -> &mut Self {
self.max_connection_duration_secs = val;
self
}

pub const fn set_disconnect_mode(&mut self, val: DisconnectMode) -> &mut Self {
self.disconnect_mode = val;
self
}

pub const fn set_shutdown_timeout_secs(&mut self, val: u64) -> &mut Self {
self.shutdown_timeout_secs = Duration::from_secs(val);
self
Expand Down
10 changes: 9 additions & 1 deletion src/sources/statsd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ use vector_lib::{
};

use self::parser::ParseError;
use super::util::net::{SocketListenAddr, TcpNullAcker, TcpSource, try_bind_udp_socket};
use super::util::net::{
DisconnectMode, SocketListenAddr, TcpNullAcker, TcpSource, try_bind_udp_socket,
};
use crate::{
SourceSender,
codecs::Decoder,
Expand Down Expand Up @@ -139,6 +141,10 @@ pub struct TcpConfig {
#[configurable(metadata(docs::type_unit = "connections"))]
connection_limit: Option<u32>,

#[configurable(derived)]
#[serde(default)]
disconnect_mode: DisconnectMode,

/// Whether or not to sanitize incoming statsd key names. When "true", keys are sanitized by:
/// - "/" is replaced with "-"
/// - All whitespace is replaced with "_"
Expand All @@ -163,6 +169,7 @@ impl TcpConfig {
shutdown_timeout_secs: default_shutdown_timeout_secs(),
receive_buffer_bytes: None,
connection_limit: None,
disconnect_mode: DisconnectMode::Drain,
sanitize: default_sanitize(),
convert_to: default_convert_to(),
}
Expand Down Expand Up @@ -222,6 +229,7 @@ impl SourceConfig for StatsdConfig {
tls_client_metadata_key,
config.receive_buffer_bytes,
None,
config.disconnect_mode,
cx,
false.into(),
config.connection_limit,
Expand Down
13 changes: 12 additions & 1 deletion src/sources/syslog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ use crate::{
},
net,
shutdown::ShutdownSignal,
sources::util::net::{SocketListenAddr, TcpNullAcker, TcpSource, try_bind_udp_socket},
sources::util::net::{
DisconnectMode, SocketListenAddr, TcpNullAcker, TcpSource, try_bind_udp_socket,
},
tcp::TcpKeepaliveConfig,
tls::{MaybeTlsSettings, TlsSourceConfig},
};
Expand Down Expand Up @@ -100,6 +102,10 @@ pub enum Mode {

/// The maximum number of TCP connections that are allowed at any given time.
connection_limit: Option<u32>,

#[configurable(derived)]
#[serde(default)]
disconnect_mode: DisconnectMode,
},

/// Listen on UDP.
Expand Down Expand Up @@ -155,6 +161,7 @@ impl Default for SyslogConfig {
tls: None,
receive_buffer_bytes: None,
connection_limit: None,
disconnect_mode: DisconnectMode::Drain,
},
host_key: None,
max_length: crate::serde::default_max_length(),
Expand Down Expand Up @@ -188,6 +195,7 @@ impl SourceConfig for SyslogConfig {
tls,
receive_buffer_bytes,
connection_limit,
disconnect_mode,
} => {
let source = SyslogTcpSource {
max_length: self.max_length,
Expand All @@ -209,6 +217,7 @@ impl SourceConfig for SyslogConfig {
tls_client_metadata_key,
receive_buffer_bytes,
None,
disconnect_mode,
cx,
false.into(),
connection_limit,
Expand Down Expand Up @@ -1146,6 +1155,7 @@ mod test {
tls: None,
receive_buffer_bytes: None,
connection_limit: None,
disconnect_mode: DisconnectMode::Drain,
});

let key = ComponentKey::from("in");
Expand Down Expand Up @@ -1357,6 +1367,7 @@ mod test {
tls: None,
receive_buffer_bytes: None,
connection_limit: None,
disconnect_mode: DisconnectMode::Drain,
});

let key = ComponentKey::from("in");
Expand Down
4 changes: 2 additions & 2 deletions src/sources/util/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ use vector_lib::configurable::configurable_component;

#[cfg(feature = "sources-utils-net-tcp")]
pub use self::tcp::{
MAX_IN_FLIGHT_EVENTS_TARGET, TcpNullAcker, TcpSource, TcpSourceAck, TcpSourceAcker,
request_limiter::RequestLimiter, try_bind_tcp_listener,
DisconnectMode, MAX_IN_FLIGHT_EVENTS_TARGET, TcpNullAcker, TcpSource, TcpSourceAck,
TcpSourceAcker, request_limiter::RequestLimiter, try_bind_tcp_listener,
};
#[cfg(feature = "sources-utils-net-udp")]
pub use self::udp::try_bind_udp_socket;
Expand Down
Loading
Loading