Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions cassandra/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -1087,9 +1087,15 @@ def handle_pushed(self, response):

def send_msg(self, msg, request_id, cb, encoder=ProtocolHandler.encode_message, decoder=ProtocolHandler.decode_message, result_metadata=None):
if self.is_defunct:
raise ConnectionShutdown("Connection to %s is defunct" % self.endpoint)
msg = "Connection to %s is defunct" % self.endpoint
if self.last_error:
msg += ": %s" % (self.last_error,)
raise ConnectionShutdown(msg)
elif self.is_closed:
raise ConnectionShutdown("Connection to %s is closed" % self.endpoint)
msg = "Connection to %s is closed" % self.endpoint
if self.last_error:
msg += ": %s" % (self.last_error,)
raise ConnectionShutdown(msg)
elif not self._socket_writable:
raise ConnectionBusy("Connection %s is overloaded" % self.endpoint)

Expand Down Expand Up @@ -1120,7 +1126,10 @@ def wait_for_responses(self, *msgs, **kwargs):
failed, the corresponding Exception will be raised.
"""
if self.is_closed or self.is_defunct:
raise ConnectionShutdown("Connection %s is already closed" % (self, ))
msg = "Connection %s is already closed" % (self,)
if self.last_error:
msg += ": %s" % (self.last_error,)
raise ConnectionShutdown(msg)
timeout = kwargs.get('timeout')
fail_on_error = kwargs.get('fail_on_error', True)
waiter = ResponseWaiter(self, len(msgs), fail_on_error)
Expand Down
6 changes: 4 additions & 2 deletions cassandra/io/asyncioreactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,10 @@ async def _close(self):
log.debug("Closed socket to %s" % (self.endpoint,))

if not self.is_defunct:
self.error_all_requests(
ConnectionShutdown("Connection to %s was closed" % self.endpoint))
msg = "Connection to %s was closed" % self.endpoint
if self.last_error:
msg += ": %s" % (self.last_error,)
self.error_all_requests(ConnectionShutdown(msg))
# don't leave in-progress operations hanging
self.connected_event.set()

Expand Down
8 changes: 5 additions & 3 deletions cassandra/io/asyncorereactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -385,12 +385,14 @@ def close(self):
log.debug("Closed socket to %s", self.endpoint)

if not self.is_defunct:
self.error_all_requests(
ConnectionShutdown("Connection to %s was closed" % self.endpoint))
msg = "Connection to %s was closed" % self.endpoint
if self.last_error:
msg += ": %s" % (self.last_error,)
self.error_all_requests(ConnectionShutdown(msg))

#This happens when the connection is shutdown while waiting for the ReadyMessage
if not self.connected_event.is_set():
self.last_error = ConnectionShutdown("Connection to %s was closed" % self.endpoint)
self.last_error = ConnectionShutdown(msg)

# don't leave in-progress operations hanging
self.connected_event.set()
Expand Down
6 changes: 4 additions & 2 deletions cassandra/io/eventletreactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,10 @@ def close(self):
log.debug("Closed socket to %s" % (self.endpoint,))

if not self.is_defunct:
self.error_all_requests(
ConnectionShutdown("Connection to %s was closed" % self.endpoint))
msg = "Connection to %s was closed" % self.endpoint
if self.last_error:
msg += ": %s" % (self.last_error,)
self.error_all_requests(ConnectionShutdown(msg))
# don't leave in-progress operations hanging
self.connected_event.set()

Expand Down
6 changes: 4 additions & 2 deletions cassandra/io/geventreactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,10 @@ def close(self):
log.debug("Closed socket to %s" % (self.endpoint,))

if not self.is_defunct:
self.error_all_requests(
ConnectionShutdown("Connection to %s was closed" % self.endpoint))
msg = "Connection to %s was closed" % self.endpoint
if self.last_error:
msg += ": %s" % (self.last_error,)
self.error_all_requests(ConnectionShutdown(msg))
# don't leave in-progress operations hanging
self.connected_event.set()

Expand Down
6 changes: 4 additions & 2 deletions cassandra/io/libevreactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,10 @@ def close(self):

# don't leave in-progress operations hanging
if not self.is_defunct:
self.error_all_requests(
ConnectionShutdown("Connection to %s was closed" % self.endpoint))
msg = "Connection to %s was closed" % self.endpoint
if self.last_error:
msg += ": %s" % (self.last_error,)
self.error_all_requests(ConnectionShutdown(msg))
self.connected_event.set()

def handle_write(self, watcher, revents, errno=None):
Expand Down
6 changes: 4 additions & 2 deletions cassandra/io/twistedreactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,10 @@ def close(self):
log.debug("Closed socket to %s", self.endpoint)

if not self.is_defunct:
self.error_all_requests(
ConnectionShutdown("Connection to %s was closed" % self.endpoint))
msg = "Connection to %s was closed" % self.endpoint
if self.last_error:
msg += ": %s" % (self.last_error,)
self.error_all_requests(ConnectionShutdown(msg))
# don't leave in-progress operations hanging
self.connected_event.set()

Expand Down
70 changes: 69 additions & 1 deletion tests/unit/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from cassandra.cluster import Cluster
from cassandra.connection import (Connection, HEADER_DIRECTION_TO_CLIENT, ProtocolError,
locally_supported_compressions, ConnectionHeartbeat, _Frame, Timer, TimerManager,
ConnectionException, DefaultEndPoint, ShardAwarePortGenerator)
ConnectionException, ConnectionShutdown, DefaultEndPoint, ShardAwarePortGenerator)
from cassandra.marshal import uint8_pack, uint32_pack, int32_pack
from cassandra.protocol import (write_stringmultimap, write_int, write_string,
SupportedMessage, ProtocolHandler)
Expand Down Expand Up @@ -260,6 +260,74 @@ def test_set_connection_class(self):
cluster = Cluster(connection_class='test')
assert 'test' == cluster.connection_class

def test_connection_shutdown_includes_last_error(self):
"""
Test that ConnectionShutdown exceptions include the last_error when available.
This helps debug issues like "Bad file descriptor" by showing the original cause.
See https://github.com/scylladb/python-driver/issues/614
"""
c = self.make_connection()
c.lock = Lock()
c._requests = {}

# Simulate the connection becoming defunct with a specific error
original_error = OSError(9, "Bad file descriptor")
c.is_defunct = True
c.last_error = original_error

# send_msg should raise ConnectionShutdown that includes the last_error
with pytest.raises(ConnectionShutdown) as exc_info:
c.send_msg(Mock(), 1, Mock())

# Verify the error message includes the original error
error_message = str(exc_info.value)
assert "is defunct" in error_message
assert "Bad file descriptor" in error_message

def test_connection_shutdown_closed_includes_last_error(self):
"""
Test that ConnectionShutdown exceptions for closed connections include last_error.
"""
c = self.make_connection()
c.lock = Lock()
c._requests = {}

# Simulate the connection being closed with a specific error
original_error = OSError(9, "Bad file descriptor")
c.is_closed = True
c.last_error = original_error

# send_msg should raise ConnectionShutdown that includes the last_error
with pytest.raises(ConnectionShutdown) as exc_info:
c.send_msg(Mock(), 1, Mock())

# Verify the error message includes the original error
error_message = str(exc_info.value)
assert "is closed" in error_message
assert "Bad file descriptor" in error_message

def test_wait_for_responses_shutdown_includes_last_error(self):
"""
Test that wait_for_responses raises ConnectionShutdown with last_error.
"""
c = self.make_connection()
c.lock = Lock()
c._requests = {}

# Simulate the connection being defunct with a specific error
original_error = OSError(9, "Bad file descriptor")
c.is_defunct = True
c.last_error = original_error

# wait_for_responses should raise ConnectionShutdown that includes the last_error
with pytest.raises(ConnectionShutdown) as exc_info:
c.wait_for_responses(Mock())

# Verify the error message includes the original error
error_message = str(exc_info.value)
assert "already closed" in error_message
assert "Bad file descriptor" in error_message


@patch('cassandra.connection.ConnectionHeartbeat._raise_if_stopped')
class ConnectionHeartbeatTest(unittest.TestCase):
Expand Down
Loading