feat: Implement stateful retry and resumable stream logic for TopicMessageQuery#2171
feat: Implement stateful retry and resumable stream logic for TopicMessageQuery#2171manishdait wants to merge 7 commits into
Codecov Report
❌ Patch coverage is
@@            Coverage Diff             @@
##             main    #2171      +/-   ##
==========================================
+ Coverage   93.99%   95.03%   +1.04%
==========================================
  Files         163      163
  Lines       10442    10504      +62
==========================================
+ Hits         9815     9983     +168
+ Misses        627      521     -106
Hi @manishdait, This pull request has had no commit activity for 10 days. Are you still working on it?
If you're no longer working on this, please comment
Reach out on Discord or join our office hours if you need assistance.
From the Python SDK Team
Hi @manishdait, this is CronInactivityBot 👋
This pull request has had no new commits for 21 days, so I'm closing it and unassigning you from the linked issue to keep the backlog healthy.
If you're no longer interested, no action is needed.
Tip: You can comment
If you'd like to continue working on this later, feel free to comment
83d14a9 to 631d51e
Walkthrough
TopicMessageQuery subscription now tracks state across retries, resumes from the last received message, handles chunked responses, applies gRPC-aware retry decisions with exponential backoff, and coordinates thread-safe call cancellation via SubscriptionHandle.
Changes
Topic Message Query Subscription Reliability
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks | ✅ 5 passed
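The retry behaviour summarized in the walkthrough can be sketched roughly as follows. This is an illustration only, not the SDK's code: the helper names `should_retry` and `backoff_delay`, the 0.25 s base delay, and the 8 s cap are assumptions, and status codes are modelled as plain strings so the snippet runs without grpc installed. The retryable codes are the ones named in the PR description.

```python
# Hypothetical sketch: status codes are plain strings here so the example
# runs without grpc installed; the PR inspects real gRPC errors instead.
RETRYABLE_CODES = {"UNAVAILABLE", "RESOURCE_EXHAUSTED"}


def should_retry(code: str, details: str = "") -> bool:
    """Return True when an error looks transient and worth retrying."""
    if code in RETRYABLE_CODES:
        return True
    # INTERNAL is only retried when it carries an RST_STREAM reset.
    return code == "INTERNAL" and "rst_stream" in details.lower()


def backoff_delay(attempt: int, base: float = 0.25, max_backoff: float = 8.0) -> float:
    """Delay before retry number `attempt` (1-based), doubling up to a cap."""
    return min(base * (2 ** (attempt - 1)), max_backoff)
```

Capping the delay keeps a long outage from pushing the wait between attempts beyond a bound the caller chose, which is presumably what `set_max_backoff` controls in the PR.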
Actionable comments posted: 6
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: ea309a58-41ae-4f9a-9ef9-8bb283f770f5
📒 Files selected for processing (4)
src/hiero_sdk_python/query/topic_message_query.py
src/hiero_sdk_python/utils/subscription_handle.py
tests/unit/subscription_handle_test.py
tests/unit/topic_message_query_test.py
631d51e to 43f9b2c
Actionable comments posted: 4
def _set_call(self, call: Any):
    """Sets the active gRPC call so it can be cancelled."""
    with self._lock:
        self._call = call

        if self._cancelled.is_set():
            self._call.cancel()
Avoid calling cancel() while holding _lock.
Line 26 and Line 34 invoke cancel() inside the critical section. If that call blocks or re-enters related paths, this can stall or deadlock cancellation. Capture the call reference while locked, then cancel outside the lock. Also use is not None at Line 33.
Suggested fix
 def _set_call(self, call: Any):
     """Sets the active gRPC call so it can be cancelled."""
+    call_to_cancel = None
     with self._lock:
         self._call = call
         if self._cancelled.is_set():
-            self._call.cancel()
+            call_to_cancel = call
+    if call_to_cancel is not None:
+        call_to_cancel.cancel()
 def cancel(self):
     """Signals to cancel the subscription."""
+    call_to_cancel = None
     with self._lock:
         self._cancelled.set()
-        if self._call:
-            self._call.cancel()
+        if self._call is not None:
+            call_to_cancel = self._call
+    if call_to_cancel is not None:
+        call_to_cancel.cancel()
Also applies to: 30-34
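As a self-contained illustration of the suggestion above (capture the call under the lock, cancel after releasing it), here is a minimal sketch. The class mirrors the names used in the review, but its constructor and the `FakeCall` stand-in are assumptions made for the example; the SDK's real `SubscriptionHandle` may differ.

```python
import threading
from typing import Any, Optional


class SubscriptionHandle:
    """Illustrative handle; the SDK's real class may differ in detail."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._cancelled = threading.Event()
        self._call: Optional[Any] = None

    def _set_call(self, call: Any) -> None:
        """Register the active call; cancel it if cancel() already ran."""
        call_to_cancel = None
        with self._lock:
            self._call = call
            if self._cancelled.is_set():
                call_to_cancel = call
        # The lock is released here, so a slow or re-entrant cancel()
        # on the call object cannot block other threads on self._lock.
        if call_to_cancel is not None:
            call_to_cancel.cancel()

    def cancel(self) -> None:
        """Signal cancellation and cancel the active call, if any."""
        with self._lock:
            self._cancelled.set()
            call_to_cancel = self._call
        if call_to_cancel is not None:
            call_to_cancel.cancel()

    def is_cancelled(self) -> bool:
        return self._cancelled.is_set()


class FakeCall:
    """Hypothetical stand-in for a gRPC call that records cancellation."""

    def __init__(self) -> None:
        self.cancelled = False

    def cancel(self) -> None:
        self.cancelled = True
```

A call registered after `cancel()` has already run is still cancelled, because `_set_call` checks the event while holding the lock and acts on the captured reference afterwards.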
43f9b2c to 45c80fb
Actionable comments posted: 1
♻️ Duplicate comments (4)
src/hiero_sdk_python/query/topic_message_query.py (1)
61-62: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win
Type annotation mismatch: _error_handler declared as Callable[[], None] but used with Exception argument.
Line 62 declares _error_handler as Callable[[], None], but:
- set_error_handler (line 85) expects Callable[[Exception], None]
- _on_error (line 118) takes err: Exception
- The handler is invoked with self._error_handler(e) at line 243
This inconsistency means type checkers will report errors, and users following the type hint at line 62 would provide incompatible handlers.
🐛 Proposed fix
 self._completion_handler: Callable[[], None] | None = self._on_complete
-self._error_handler: Callable[[], None] | None = self._on_error
+self._error_handler: Callable[[Exception], None] | None = self._on_error
tests/unit/topic_message_query_test.py (3)
152-154: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win
Chunk ordering is not verified: the test would pass even if chunks are assembled in the wrong order.
Using in only checks presence, not position. If the implementation incorrectly orders chunks, this test won't detect it.
🐛 Proposed fix to verify correct ordering
     assert len(received_messages) == 1
-    assert b"chunk-1" in received_messages[0].contents
-    assert b"chunk-2" in received_messages[0].contents
+    # Verify chunks are assembled in correct order
+    contents = received_messages[0].contents
+    chunk1_pos = contents.find(b"chunk-1")
+    chunk2_pos = contents.find(b"chunk-2")
+    assert chunk1_pos != -1, "chunk-1 not found in assembled message"
+    assert chunk2_pos != -1, "chunk-2 not found in assembled message"
+    assert chunk1_pos < chunk2_pos, "Chunks assembled in wrong order: chunk-1 should come before chunk-2"
As per coding guidelines, tests should catch regressions in chunk ordering behavior.
167-178: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win
Missing assertion that on_error is invoked after retries exhaust.
When max_attempts are exhausted with retryable errors, on_error should be called. The test creates an on_error=MagicMock() but never verifies it was invoked, so regressions in terminal error handling would go undetected.
🐛 Proposed fix
 def test_retry_logic_on_retryable_error(mock_client, error):
     """Test that the query retries on retryable errors but stops after max_attempts."""
     query = TopicMessageQuery(topic_id="0.0.123").set_max_attempts(2).set_max_backoff(0.5)
     mock_client.mirror_stub.subscribeTopic.side_effect = [error, error]
-    handle = query.subscribe(mock_client, on_message=MagicMock(), on_error=MagicMock())
+    on_error = MagicMock()
+    handle = query.subscribe(mock_client, on_message=MagicMock(), on_error=on_error)
     handle._thread.join(timeout=2.0)
+    assert not handle._thread.is_alive(), "Thread should have terminated after retries exhausted"
     assert mock_client.mirror_stub.subscribeTopic.call_count == 2
+    on_error.assert_called_once_with(error)
As per coding guidelines, "Tests must provide useful error messages when they fail for future debugging."
220-224: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win
Timing-dependent sleep can cause flaky tests in CI.
time.sleep(0.2) followed by assert handle._thread.is_alive() is scheduler-dependent. In slow CI environments, the thread may not have started processing yet, or conversely, the sleep may be insufficient. Use an event to synchronize on first message receipt before cancellation.
🐛 Proposed fix using event synchronization
+import threading
+
 def test_subscription_cancellation(mock_client):
     """Test that cancelling a handle stops the subscription thread."""
     query = TopicMessageQuery(topic_id="0.0.123")
     def infinite_stream():
         while True:
             yield mirror_proto.ConsensusTopicResponse(message=b"ping")
             time.sleep(0.1)
     mock_call = MagicMock()
     mock_call.__iter__.return_value = infinite_stream()
     mock_client.mirror_stub.subscribeTopic.return_value = mock_call
-    on_message = MagicMock()
+    first_message_seen = threading.Event()
+    on_message = MagicMock(side_effect=lambda _: first_message_seen.set())
     handle = query.subscribe(mock_client, on_message=on_message)
-    time.sleep(0.2)
-    assert handle._thread.is_alive()
+    assert first_message_seen.wait(timeout=2.0), "Expected at least one streamed message before cancellation"
+    assert handle._thread.is_alive(), "Subscription thread should be alive before cancel"
     handle.cancel()
As per coding guidelines, "No timing-dependent or unseeded random assertions."
Signed-off-by: Manish Dait <daitmanish88@gmail.com>
45c80fb to bb49b14
Actionable comments posted: 2
♻️ Duplicate comments (4)
tests/unit/topic_message_query_test.py (4)
383-408: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Replace timing-dependent sleep with event-based synchronization.
The time.sleep(0.2) at line 400 followed by the is_alive() assertion at line 401 is scheduler-dependent and can flap in CI under high load or slow schedulers. Use an event to deterministically wait for the first message delivery before asserting thread liveness and performing cancellation.
⏱️ Proposed fix using threading.Event
+import threading
 import time
 from datetime import datetime, timezone
@@
 def test_subscription_cancellation(mock_client):
     """Test that cancelling a handle stops the subscription thread."""
     query = TopicMessageQuery(topic_id="0.0.123")
     def infinite_stream():
         while True:
             yield mirror_proto.ConsensusTopicResponse(message=b"ping")
             time.sleep(0.1)
     mock_call = MagicMock()
     mock_call.__iter__.return_value = infinite_stream()
     mock_client.mirror_stub.subscribeTopic.return_value = mock_call
-    on_message = MagicMock()
+    first_message_seen = threading.Event()
+    on_message = MagicMock(side_effect=lambda _: first_message_seen.set())
     handle = query.subscribe(mock_client, on_message=on_message)
-    time.sleep(0.2)
-    assert handle._thread.is_alive()
+    assert first_message_seen.wait(timeout=1.0), "Expected at least one message before cancellation"
+    assert handle._thread.is_alive(), "Subscription thread should be alive before cancel()"
     handle.cancel()
     handle._thread.join(timeout=1.0)
     assert not handle._thread.is_alive()
     mock_call.cancel.assert_called()
As per coding guidelines, "No timing-dependent or unseeded random assertions."
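The event-based synchronization proposed above can be tried in isolation with the standard library alone. The sketch below is not taken from the test suite: `stream`, `run_demo`, and the `stop` event are hypothetical names, and a plain generator stands in for the mocked gRPC stream.

```python
import threading
import time


def stream(stop: threading.Event):
    """Hypothetical endless source, standing in for the mocked gRPC stream."""
    while not stop.is_set():
        yield b"ping"
        time.sleep(0.01)


def run_demo() -> bool:
    """Start a worker, wait for its first message, stop it, report shutdown."""
    first_message_seen = threading.Event()
    stop = threading.Event()

    def worker() -> None:
        for _ in stream(stop):
            first_message_seen.set()

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()
    # Block until the worker reports progress instead of sleeping blindly.
    assert first_message_seen.wait(timeout=2.0), "no message before timeout"
    assert thread.is_alive(), "worker should still be running"
    stop.set()
    thread.join(timeout=2.0)
    return not thread.is_alive()
```

`Event.wait` returns as soon as the first message arrives, so the check is both faster than a fixed sleep on a quick machine and more tolerant on a slow one.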
54-80: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win
Verify fluent interface contract: setters must return self.
The test validates internal state but doesn't assert that each setter returns self, which is required for method chaining. If a setter is accidentally changed to return None, chaining would break but this test would pass.
🔗 Proposed fix
 def test_topic_message_query_initialization():
     """Test initializing the query with various parameter types and setters."""
     start = datetime(2023, 1, 1, tzinfo=timezone.utc)
     def mock_complete():
         pass
     def mock_error(e):
         pass
-    query = (
-        TopicMessageQuery()
-        .set_topic_id("0.0.123")
-        .set_start_time(start)
-        .set_limit(5)
-        .set_chunking_enabled(True)
-        .set_completion_handler(mock_complete)
-        .set_error_handler(mock_error)
-    )
+    query = TopicMessageQuery()
+    assert query.set_topic_id("0.0.123") is query, "set_topic_id should return self"
+    assert query.set_start_time(start) is query, "set_start_time should return self"
+    assert query.set_limit(5) is query, "set_limit should return self"
+    assert query.set_chunking_enabled(True) is query, "set_chunking_enabled should return self"
+    assert query.set_completion_handler(mock_complete) is query, "set_completion_handler should return self"
+    assert query.set_error_handler(mock_error) is query, "set_error_handler should return self"
     assert query._topic_id.topicNum == 123
     assert query._start_time.seconds == int(start.timestamp())
     assert query._limit == 5
     assert query._chunking_enabled is True
     assert query._completion_handler == mock_complete
     assert query._error_handler == mock_error
As per coding guidelines, "Assert fluent setters return self".
304-306: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win
Chunk ordering is not verified: the test passes even if chunks are assembled backwards.
The test uses in at lines 305-306, which only checks presence. If the implementation concatenates chunk-2 before chunk-1, this assertion would still pass.
🔍 Proposed fix to verify correct ordering
     assert len(received_messages) == 1
-    assert b"chunk-1" in received_messages[0].contents
-    assert b"chunk-2" in received_messages[0].contents
+    # Verify chunks are assembled in correct order
+    contents = received_messages[0].contents
+    chunk1_pos = contents.find(b"chunk-1")
+    chunk2_pos = contents.find(b"chunk-2")
+    assert chunk1_pos != -1, "chunk-1 not found in assembled message"
+    assert chunk2_pos != -1, "chunk-2 not found in assembled message"
+    assert chunk1_pos < chunk2_pos, "Chunks assembled in wrong order: chunk-1 must precede chunk-2"
As per coding guidelines, tests must catch regressions in ordering behavior.
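If the ordering check is needed in more than one test, it could be folded into a small helper. The function below is a hypothetical sketch (the name `assert_in_order` does not appear in the PR); it generalizes the position comparison to any number of chunks.

```python
def assert_in_order(contents: bytes, *parts: bytes) -> None:
    """Assert each part occurs in `contents`, each after the previous one."""
    cursor = 0
    for part in parts:
        # Search only past the previous match so order is enforced.
        position = contents.find(part, cursor)
        assert position != -1, f"{part!r} missing or out of order in {contents!r}"
        cursor = position + len(part)
```

Usage in a test would look like `assert_in_order(received_messages[0].contents, b"chunk-1", b"chunk-2")`.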
345-355: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win
Verify terminal error callback and thread termination after retries exhaust.
The test passes on_error=MagicMock() at line 350 but never asserts it's invoked when max_attempts are exhausted. The implementation should call on_error with the final error before stopping. Additionally, join(timeout=2.0) at line 352 should be followed by an assertion that the thread actually terminated; if it times out, the subscription count assertion at line 354 may pass spuriously.
🧪 Proposed fix
 def test_retry_logic_on_retryable_error(mock_client, error):
     """Test that the query retries on retryable errors but stops after max_attempts."""
     query = TopicMessageQuery(topic_id="0.0.123").set_max_attempts(2).set_max_backoff(0.5)
     mock_client.mirror_stub.subscribeTopic.side_effect = [error, error]
-    handle = query.subscribe(mock_client, on_message=MagicMock(), on_error=MagicMock())
+    on_error = MagicMock()
+    handle = query.subscribe(mock_client, on_message=MagicMock(), on_error=on_error)
     handle._thread.join(timeout=2.0)
+    assert not handle._thread.is_alive(), "Subscription thread should have terminated after retries exhausted"
     assert mock_client.mirror_stub.subscribeTopic.call_count == 2
+    on_error.assert_called_once_with(error)
As per coding guidelines, "Tests must provide useful error messages when they fail for future debugging" and should verify error handler contracts.
if self._limit > 0:
    request.limit = max(0, self._limit - state.count)
Count emitted TopicMessages before decrementing limit.
- File and line: src/hiero_sdk_python/query/topic_message_query.py, Lines 176-177 and 185-207
- Proto field: ConsensusTopicQuery.limit (#4). (raw.githubusercontent.com)
- Issue type: Wrong default
- Description: state.count is subtracted from the next request's limit, but it is incremented for every ConsensusTopicResponse, including intermediate chunks. On retries with chunking enabled, that makes the remaining limit track wire chunks instead of delivered TopicMessages. Once the subtraction reaches 0, the retry request serializes limit = 0, and the schema defines zero/unset as "receive indefinitely", so a resumed subscription can either stop before enough logical messages are emitted or run past the caller's cap. (raw.githubusercontent.com)
- Suggested fix: Increment state.count only after on_message() runs for a complete logical message, and stop retrying once state.count >= self._limit instead of issuing another request with limit = 0.
🐛 Proposed fix
 def _build_query_request(self, state: SubscriptionState) -> mirror_proto.ConsensusTopicQuery:
     """Build the request object based on current subscription state."""
     request = mirror_proto.ConsensusTopicQuery(topicID=self._topic_id)
     if self._end_time is not None:
         request.consensusEndTime.CopyFrom(self._end_time)
     if state.last_message is not None:
         last_message_time = state.last_message.consensusTimestamp
@@
         request.consensusStartTime.seconds = seconds
         request.consensusStartTime.nanos = nanos
         if self._limit > 0:
-            request.limit = max(0, self._limit - state.count)
+            request.limit = self._limit - state.count
     else:
         if self._start_time is not None:
             request.consensusStartTime.CopyFrom(self._start_time)
         request.limit = self._limit
     return request
 def _handle_response(self, response, state: SubscriptionState, on_message: Callable[[TopicMessage], None]) -> None:
     """Handles single or chunked messages."""
-    state.count += 1
     state.last_message = response
     if not self._chunking_enabled or not response.HasField("chunkInfo") or response.chunkInfo.total <= 1:
         message = TopicMessage.of_single(response)
         on_message(message)
+        state.count += 1
         return
@@
     if len(chunks) == response.chunkInfo.total:
         del state.pending_messages[initial_tx_id]
         message = TopicMessage.of_many(chunks)
         on_message(message)
+        state.count += 1
 def subscribe(
@@
     def run_stream():
         while state.attempt < self._max_attempts and not subscription_handle.is_cancelled():
+            if self._limit > 0 and state.count >= self._limit:
+                if self._completion_handler:
+                    self._completion_handler()
+                return
+
             state.attempt += 1
             request = self._build_query_request(state)
As per coding guidelines, "Compare the SDK class against the proto schema" and "Ensure Query code remains: Backward-compatible."
Also applies to: 185-207
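The limit arithmetic behind this comment can be isolated into a few lines. The sketch below is an assumption-laden illustration, not the PR's code: `remaining_limit` is a hypothetical helper and `SubscriptionState` is trimmed to the single field that matters here, the count of delivered logical messages.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class SubscriptionState:
    """Hypothetical, trimmed-down state: only the delivered-message count."""

    count: int = 0


def remaining_limit(limit: int, state: SubscriptionState) -> Optional[int]:
    """Return the limit for the next request, or None when the cap is reached.

    A limit of 0 means "no cap" (the schema treats zero/unset as receive
    indefinitely), so it is passed through unchanged.
    """
    if limit <= 0:
        return 0
    remaining = limit - state.count
    # Never send 0 for a capped query: that would mean "unlimited".
    return remaining if remaining > 0 else None
```

Returning `None` rather than `0` forces the caller to stop retrying explicitly instead of accidentally issuing an unbounded request.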
received_messages = []
handle = query.subscribe(mock_client, on_message=lambda m: received_messages.append(m))

handle._thread.join(timeout=1.0)
Add thread termination assertions after join to detect timeout failures early.
If join(timeout=1.0) times out and the thread remains alive, subsequent assertions will pass or fail unpredictably without a clear diagnostic message. Explicitly assert that the thread terminated.
🧵 Proposed fix
 # In test_chunk_message_handling:
 handle._thread.join(timeout=1.0)
+assert not handle._thread.is_alive(), "Subscription thread should have terminated after consuming all chunks"
 assert len(received_messages) == 1
Apply the same pattern in test_chunk_message_handling_when_chunking_is_disabled at line 328.
As per coding guidelines, "Tests must provide useful error messages when they fail for future debugging."
Also applies to: 328-328
Description:
This PR introduces fixes for flaky TopicMessageQuery e2e tests by implementing retry logic and error handling.
Changes Made:
- SubscriptionState to track last_message and count. On retry, the query resumes from last_message.consensusTimestamp + 1ns to prevent message loss or duplication.
- _should_retry logic to identify retryable gRPC errors (e.g., UNAVAILABLE, RESOURCE_EXHAUSTED) and specific RST_STREAM internal errors.
- SubscriptionHandle to safely manage gRPC call cancellation across threads.
Related issue(s):
Fixes #1796
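The "resume from last_message.consensusTimestamp + 1ns" step in the change list involves a carry when the nanosecond field is at its maximum. A minimal sketch, assuming timestamps are handled as separate seconds and nanos integers (as in the protobuf Timestamp fields shown in the review); the helper name `next_start_time` is hypothetical:

```python
from typing import Tuple

NANOS_PER_SECOND = 1_000_000_000


def next_start_time(seconds: int, nanos: int) -> Tuple[int, int]:
    """Return (seconds, nanos) one nanosecond after the given timestamp."""
    nanos += 1
    if nanos >= NANOS_PER_SECOND:
        # Carry into the seconds field at a second boundary.
        seconds += 1
        nanos -= NANOS_PER_SECOND
    return seconds, nanos
```

Starting the resumed query strictly after the last delivered timestamp is what avoids replaying that message while still not skipping the next one.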
Notes for reviewer:
Checklist