Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions lib/mongo/cluster.rb
Original file line number Diff line number Diff line change
Expand Up @@ -324,9 +324,7 @@ def self.create(client, monitoring: nil)
# @api private
attr_reader :seeds

# @private
#
# @since 2.5.1
# @api private
attr_reader :session_pool

def_delegators :topology, :replica_set?, :replica_set_name, :sharded?,
Expand Down Expand Up @@ -983,6 +981,16 @@ def validate_session_support!(timeout: nil)
end
end

# Runs the cluster's periodic executor immediately. This is a no-op when the
# cluster has no periodic executor.
#
# @return [ Object, nil ] Whatever the executor's #execute returns, or nil
#   when there is no periodic executor.
#
# @api private
def trigger_periodic_executor!
  @periodic_executor.execute if @periodic_executor
end

private

# @api private
Expand Down
11 changes: 9 additions & 2 deletions lib/mongo/cluster/reapers/cursor_reaper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,15 @@ def kill_cursors
connection_global_id: kill_spec.connection_global_id,
}
if connection = kill_spec.connection
op.execute_with_connection(connection, context: Operation::Context.new(options: options))
connection.connection_pool.check_in(connection)
begin
op.execute_with_connection(connection, context: Operation::Context.new(options: options))
rescue Error::SocketError, Error::SocketTimeoutError, Error::ConnectionPerished
# Connection is dead. do_check_in will detect connection.error?
# and disconnect it; the pool slot becomes free for a new connection.
ensure
connection.unpin(:cursor)
connection.connection_pool.check_in(connection) unless connection.pinned?
end
else
op.execute(server, context: Operation::Context.new(options: options))
end
Expand Down
21 changes: 13 additions & 8 deletions lib/mongo/collection/view/change_stream.rb
Original file line number Diff line number Diff line change
Expand Up @@ -351,14 +351,19 @@ def create_cursor!(timeout_ms = nil)
# In load balanced topology, manually check out a connection
# so it remains checked out and pinned to the cursor.
connection = server.pool.check_out(context: context)
result = send_initial_query(connection, context)

start_at_operation_time = if doc = result.replies.first && result.replies.first.documents.first
doc['operationTime']
else
nil
end
result
begin
result = send_initial_query(connection, context)

start_at_operation_time = if doc = result.replies.first && result.replies.first.documents.first
doc['operationTime']
else
nil
end
result
rescue StandardError
server.pool.check_in(connection)
raise
end
else
server.with_connection do |connection|
result = send_initial_query(connection, context)
Expand Down
2 changes: 1 addition & 1 deletion lib/mongo/cursor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ def get_more
with_overload_retry(context: possibly_refreshed_context) do
process(execute_operation(get_more_operation))
end
rescue Error::SocketError, Error::SocketTimeoutError
rescue Error::SocketError, Error::SocketTimeoutError, Error::ConnectionPerished
@get_more_network_error = true
raise
rescue Error::OperationFailure => e
Expand Down
16 changes: 16 additions & 0 deletions lib/mongo/server/connection_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -862,6 +862,22 @@ def self.finalize(available_connections, pending_connections, _populator)
end
end

# Takes a snapshot of the pool's internal bookkeeping, primarily for testing
# and debugging purposes. The snapshot is built while holding the pool lock.
#
# @return [ Hash ] The sizes of the available, checked out, pending and
#   interrupt connection collections, plus the current value of the
#   connection requests counter.
#
# @api private
def state
  @lock.synchronize do
    sizes = {
      available_connections: @available_connections,
      checked_out_connections: @checked_out_connections,
      pending_connections: @pending_connections,
      interrupt_connections: @interrupt_connections
    }.transform_values(&:length)

    # The request counter is reported as-is rather than as a length.
    sizes.merge(connection_requests: @connection_requests)
  end
end

private

# Returns the next available connection, optionally with given
Expand Down
17 changes: 17 additions & 0 deletions spec/integration/cursor_pinning_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -75,5 +75,22 @@
enum.each { |it| it.nil? }
end
end

context 'change stream' do
  it 'returns the connection to the pool when send_initial_query raises' do
    # Make the initial query of every change stream fail.
    allow_any_instance_of(Mongo::Collection::View::ChangeStream)
      .to receive(:send_initial_query)
      .and_raise(RuntimeError, 'simulated failure')

    # A pool of exactly one connection makes a leaked connection observable.
    single_slot_client = authorized_client.with(max_pool_size: 1, wait_queue_timeout: 1)
    watched = single_slot_client['change_stream_conn_leak']

    expect { watched.watch }.to raise_error(RuntimeError)

    # If the connection had not been checked back in, the pool would be
    # exhausted and this would raise ConnectionCheckOutTimeout after 1 second.
    expect { watched.find.first }.not_to raise_error
  end
end
end
end
70 changes: 70 additions & 0 deletions spec/integration/cursor_reaping_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -131,4 +131,74 @@ def abandon_cursors
expect(event.reply['cursorsUnknown']).to be_empty
end
end

context 'load-balanced topology' do
require_topology :load_balanced

# Number of connections in the pool; the example opens this many cursors.
let(:pool_size) { 3 }

# Dedicated client with a small pool and a short wait queue timeout.
let(:client) { authorized_client.with(max_pool_size: pool_size, wait_queue_timeout: 2) }

let(:collection) { client['cursor_reaper_lb_leak'] }

# Start every example from a freshly populated 200-document collection.
before do
  seeded = authorized_client['cursor_reaper_lb_leak']
  seeded.drop
  200.times { |index| seeded.insert_one(name: "doc_#{index}") }
end

# Force-closes the raw socket underneath every connection that is currently
# checked out of the given pool. Reaches into private instance variables and
# is strictly best effort: a failure on one connection is ignored so that the
# remaining connections are still processed.
#
# @param [ Object ] pool The pool whose @checked_out_connections are visited.
#
# @return [ Object ] The collection of checked out connections.
def kill_checked_out_sockets(pool)
  checked_out = pool.instance_variable_get(:@checked_out_connections)

  checked_out.each do |connection|
    wrapper = connection.instance_variable_get(:@socket)
    # The raw IO is stored in @tcp_socket or, failing that, in @socket.
    io = wrapper&.instance_variable_get(:@tcp_socket) ||
         wrapper&.instance_variable_get(:@socket)
    next if io.nil? || io.closed?

    io.close
  rescue StandardError
    # Deliberately ignored; see the method comment.
  end
end

# Regression example: connections pinned to open cursors must be checked back
# into the pool by the cursor reaper even when killCursors fails because the
# underlying socket has already been closed.
it 'releases pinned connections when the socket dies before killCursors runs' do
server = client.cluster.next_primary
pool = server.pool

# Open pool_size cursors with cursor_id != 0. batch_size: 1 ensures the
# server never exhausts the cursor in the first batch, so each cursor pins
# its connection until the reaper runs.
pool_size.times do
scope = collection.find({}, batch_size: 1)
# Read only the first document; the rest of the cursor is left unread.
scope.each.first
end

# All connections should be pinned (checked out) and none available.
expect(pool.available_count).to eq(0)
expect(pool.state[:checked_out_connections]).to eq(pool_size)

# Simulate a network failure by force-closing the underlying TCP sockets
# while the connections are still pinned to their open cursors.
kill_checked_out_sockets(pool)

# The block-local `scope` variables are now eligible for GC. Force
# collection so finalizers run and KillSpecs are enqueued.
# NOTE(review): this depends on GC.start actually finalizing the abandoned
# cursors within the 1-second sleep, which may be timing-sensitive on some
# runtimes — confirm this is stable in CI.
GC.start
sleep 1

# Force the cursor reaper (via the periodic executor) to process the
# queued KillSpecs. Without the fix, execute_with_connection raises
# SocketError on each dead connection and check_in is never called;
# the rescue here prevents that exception from failing the test for the
# wrong reason.
begin
client.cluster.trigger_periodic_executor!
rescue StandardError
nil
end

# Every pinned connection should have been checked back into the pool,
# even though the killCursors command failed on the dead socket. Without
# the fix, @checked_out_connections still holds all pool_size connections
# and this expectation fails.
expect(pool.state[:checked_out_connections]).to eq(0)
end
end
end
27 changes: 27 additions & 0 deletions spec/mongo/cursor_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,33 @@
end
end

context 'when a getMore raises ConnectionPerished' do
  # Stand-in for the getMore operation; its execution is stubbed to fail.
  let(:op) { double('operation') }

  before do
    allow(cursor).to receive(:get_more_operation).and_return(op)

    # Load-balanced runs stub execute_with_connection; all other runs stub
    # execute.
    load_balanced = SpecConfig.instance.connect_options[:connect] == :load_balanced
    failing_call = load_balanced ? :execute_with_connection : :execute
    allow(op).to receive(failing_call).and_raise(Mongo::Error::ConnectionPerished)

    # Iterate the cursor with the failing operation in place. The
    # ConnectionPerished error is expected here, so it is swallowed and the
    # examples below inspect the cursor afterwards.
    begin
      cursor.to_a
    rescue Mongo::Error::ConnectionPerished
      nil
    end
  end

  it 'sets @get_more_network_error' do
    network_error_flag = cursor.instance_variable_get(:@get_more_network_error)
    expect(network_error_flag).to be true
  end

  it 'does not attempt killCursors on close' do
    expect(Mongo::Operation::KillCursors).not_to receive(:new)
    cursor.close
  end
end

context 'when no errors occur' do
it 'returns the correct amount' do
expect(cursor.to_a.size).to eq(102)
Expand Down
Loading