Skip to content

Commit c8b647a

Browse files
author
Ian Driver
committed
Add send_keepalive_packet option to out_forward plugin
This adds the send_keepalive_packet configuration parameter to enable TCP keepalive (SO_KEEPALIVE) on forward output connections, similar to the implementation in in_forward (PR #2352). The existing 'keepalive' parameter only enables connection pooling, not TCP-level keepalive probes. Signed-off-by: Ian Driver <idriver@williamhill.us>
1 parent 1327bed commit c8b647a

2 files changed

Lines changed: 35 additions & 3 deletions

File tree

lib/fluent/plugin/out_forward.rb

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,8 @@ class ForwardOutput < Output
119119
config_param :keepalive, :bool, default: false
120120
desc "Expired time of keepalive. Default value is nil, which means to keep connection as long as possible"
121121
config_param :keepalive_timeout, :time, default: nil
122+
desc 'Check the remote connection is still available by sending a keepalive packet if this value is true.'
123+
config_param :send_keepalive_packet, :bool, default: false
122124

123125
config_section :security, required: false, multi: false do
124126
desc 'The hostname'
@@ -270,6 +272,10 @@ def configure(conf)
270272
log.warn('The value of keepalive_timeout is ignored. if you want to use keepalive, please add `keepalive true` to your conf.')
271273
end
272274

275+
if @send_keepalive_packet && !@keepalive
276+
raise Fluent::ConfigError, "'send_keepalive_packet' is enabled but 'keepalive' is not. Enable 'keepalive' to use TCP keepalive."
277+
end
278+
273279
raise Fluent::ConfigError, "ack_response_timeout must be a positive integer" if @ack_response_timeout < 1
274280

275281
if @compress == :zstd
@@ -414,6 +420,7 @@ def create_transfer_socket(host, port, hostname, &block)
414420
send_timeout: @send_timeout,
415421
recv_timeout: @ack_response_timeout,
416422
connect_timeout: @connect_timeout,
423+
send_keepalive_packet: @send_keepalive_packet,
417424
&block
418425
)
419426
when :tcp
@@ -423,6 +430,7 @@ def create_transfer_socket(host, port, hostname, &block)
423430
send_timeout: @send_timeout,
424431
recv_timeout: @ack_response_timeout,
425432
connect_timeout: @connect_timeout,
433+
send_keepalive_packet: @send_keepalive_packet,
426434
&block
427435
)
428436
else

test/plugin/test_out_forward.rb

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,27 @@ def try_write(chunk)
153153
end
154154
end
155155

156+
test 'send_keepalive_packet is disabled by default' do
157+
@d = d = create_driver(config)
158+
assert_false d.instance.send_keepalive_packet
159+
end
160+
161+
test 'send_keepalive_packet can be enabled' do
162+
@d = d = create_driver(config + %[
163+
keepalive true
164+
send_keepalive_packet true
165+
])
166+
assert_true d.instance.send_keepalive_packet
167+
end
168+
169+
test 'send_keepalive_packet without keepalive raises error' do
170+
assert_raise(Fluent::ConfigError.new("'send_keepalive_packet' is enabled but 'keepalive' is not. Enable 'keepalive' to use TCP keepalive.")) do
171+
create_driver(config + %[
172+
send_keepalive_packet true
173+
])
174+
end
175+
end
176+
156177
test 'configure with ignore_network_errors_at_startup' do
157178
normal_conf = config_element('match', '**', {}, [
158179
config_element('server', '', {'name' => 'test', 'host' => 'unexisting.yaaaaaaaaaaaaaay.host.example.com'})
@@ -1303,7 +1324,8 @@ def plugin_id_for_test?
13031324
linger_timeout: anything,
13041325
send_timeout: anything,
13051326
recv_timeout: anything,
1306-
connect_timeout: anything
1327+
connect_timeout: anything,
1328+
send_keepalive_packet: anything
13071329
) { |sock| mock(sock).close.once; sock }.twice
13081330

13091331
target_input_driver.run(timeout: 15) do
@@ -1343,7 +1365,8 @@ def plugin_id_for_test?
13431365
linger_timeout: anything,
13441366
send_timeout: anything,
13451367
recv_timeout: anything,
1346-
connect_timeout: anything
1368+
connect_timeout: anything,
1369+
send_keepalive_packet: anything
13471370
) { |sock| mock(sock).close.once; sock }.once
13481371

13491372
target_input_driver.run(timeout: 15) do
@@ -1380,7 +1403,8 @@ def plugin_id_for_test?
13801403
linger_timeout: anything,
13811404
send_timeout: anything,
13821405
recv_timeout: anything,
1383-
connect_timeout: anything) { |sock|
1406+
connect_timeout: anything,
1407+
send_keepalive_packet: anything) { |sock|
13841408
mock(sock).close.once; sock
13851409
}.twice
13861410

0 commit comments

Comments
 (0)