Skip to content

Commit 2ad6852

Browse files
committed
Fix COPY OUT race conditions and state corruption (Python 3.9.9 specific)
This commit addresses critical concurrency issues in the `COPY OUT` protocol implementation. These issues were specifically observed in Python 3.9.9 environments (likely due to differences in asyncio event loop scheduling compared to newer versions like 3.13), leading to deadlocks and data corruption. Changes: 1. Fix Data Loss (Backpressure Handling): - Issue: In Python 3.9, `_dispatch_result` could be triggered multiple times while the sink was paused, overwriting `_pending_result`. - Fix: Implemented data merging logic. New incoming data is now appended to the existing pending buffer instead of replacing it. 2. Fix State Pollution on Cancel: - Issue: Cancelling a `COPY` task left residual data in `_pending_result`. Subsequent queries (e.g., `SELECT 1`) would incorrectly consume this stale data, causing `AttributeError: 'tuple' object has no attribute '_init_types'`. - Fix: Added a `try...finally` block in `copy_out` to unconditionally clear `_pending_result` and reset protocol state upon exit. 3. Fix Deadlocks: - Issue: The connection could get stuck in a paused state if execution was interrupted or if `resume_reading` was missed. - Fix: Ensured `self.resume_reading()` is called immediately after consuming buffered data in `_new_waiter` and in the cleanup phase. 4. Fix Logic Errors: - corrected `_new_waiter` to prevent assigning completed Futures to `self.waiter`, avoiding `InternalClientError`. Fixes #8
1 parent 161e685 commit 2ad6852

File tree

2 files changed

+40
-3
lines changed

2 files changed

+40
-3
lines changed

async_gaussdb/protocol/protocol.pxd

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ cdef class BaseProtocol(CoreProtocol):
5151

5252
bint _is_ssl
5353

54+
object _pending_result
55+
5456
PreparedStatementState statement
5557

5658
cdef get_connection(self)

async_gaussdb/protocol/protocol.pyx

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,8 @@ cdef class BaseProtocol(CoreProtocol):
102102

103103
self._is_ssl = False
104104

105+
self._pending_result = None
106+
105107
try:
106108
self.create_future = loop.create_future
107109
except AttributeError:
@@ -414,7 +416,10 @@ cdef class BaseProtocol(CoreProtocol):
414416
self._request_cancel()
415417
# Make asyncio shut up about unretrieved
416418
# QueryCanceledError
417-
waiter.add_done_callback(lambda f: f.exception())
419+
if waiter and not waiter.done():
420+
waiter.cancel()
421+
elif waiter and waiter.done() and not waiter.cancelled():
422+
waiter.exception()
418423
raise
419424

420425
# done will be True upon receipt of CopyDone.
@@ -424,6 +429,7 @@ cdef class BaseProtocol(CoreProtocol):
424429
waiter = self._new_waiter(timer.get_remaining_budget())
425430

426431
finally:
432+
self._pending_result = None
427433
self.resume_reading()
428434

429435
return status_msg
@@ -776,6 +782,14 @@ cdef class BaseProtocol(CoreProtocol):
776782
self.abort()
777783

778784
cdef _new_waiter(self, timeout):
785+
if self._pending_result is not None:
786+
res = self._pending_result
787+
self._pending_result = None
788+
self.resume_reading()
789+
waiter = self.loop.create_future()
790+
waiter.set_result(res)
791+
return waiter
792+
779793
if self.waiter is not None:
780794
raise apg_exc.InterfaceError(
781795
'cannot perform operation: another operation is in progress')
@@ -848,10 +862,31 @@ cdef class BaseProtocol(CoreProtocol):
848862
waiter = self.waiter
849863
self.waiter = None
850864

851-
if PG_DEBUG:
852-
if waiter is None:
865+
if waiter is None:
866+
if PG_DEBUG:
853867
raise apg_exc.InternalClientError('_on_result: waiter is None')
868+
869+
if self.state == PROTOCOL_COPY_OUT_DATA or \
870+
self.state == PROTOCOL_COPY_OUT_DONE:
871+
872+
copy_done = self.state == PROTOCOL_COPY_OUT_DONE
873+
if copy_done:
874+
status_msg = self.result_status_msg.decode(self.encoding)
875+
else:
876+
status_msg = None
877+
878+
self.pause_reading()
879+
if self._pending_result is not None:
880+
old_data, old_done, old_status = self._pending_result
881+
current_data = self.result if self.result is not None else b''
882+
merged_data = (old_data if old_data is not None else b'') + current_data
883+
self._pending_result = (merged_data, copy_done, status_msg)
884+
else:
885+
self._pending_result = (self.result, copy_done, status_msg)
854886

887+
return
888+
else:
889+
return
855890
if waiter.cancelled():
856891
return
857892

0 commit comments

Comments
 (0)