Fix pubsub reconnect failure handling #1243
📝 Walkthrough
This PR adds connection tracking to `SubMuxClient` using an `Arc`-wrapped `HashSet` to identify which inner pubsub clients are currently connected. The connected set is initialized from all configured clients, updated during reconnect cycles (removed on disconnect, reinserted on successful reconnect), and passed through the reconnector spawner.
The `reconnect_client_with_backoff` method was refactored to call `reconnect_client` returning `Result<()>` instead of `bool`, enabling cleaner error propagation and metric updates.
Subscription routing in `subscribe`, `subscribe_program`, and `unsubscribe` now dispatches tasks only to connected clients via a snapshot filter instead of all configured clients.
A new test verifies that subscriptions skip disconnected clients during reconnection. A small accessor method was also added to expose the `subscribe_attempts` counter on `ChainPubsubClientMock`.
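The connected-set tracking described in the walkthrough can be sketched roughly as follows. This is a minimal illustration, not the PR's code: the `Client` and `Mux` types, the `id` field used as the set key, and the `mark_*` method names are all assumptions made for the example (how the real key is derived is not shown on this page).

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for an inner pubsub client. The `id` field is an
// assumed stable key, not necessarily how the PR derives it.
struct Client {
    id: usize,
}

// Hypothetical mux holding all configured clients plus the connected set.
struct Mux {
    clients: Vec<Arc<Client>>,
    connected_ids: Arc<Mutex<HashSet<usize>>>,
}

impl Mux {
    fn new(clients: Vec<Arc<Client>>) -> Self {
        // Every configured client starts out marked as connected.
        let ids: HashSet<usize> = clients.iter().map(|c| c.id).collect();
        Self {
            clients,
            connected_ids: Arc::new(Mutex::new(ids)),
        }
    }

    // Called when a client drops its connection.
    fn mark_disconnected(&self, id: usize) {
        self.connected_ids
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .remove(&id);
    }

    // Called after a successful reconnect.
    fn mark_connected(&self, id: usize) {
        self.connected_ids
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .insert(id);
    }

    // Snapshot filter: only clients currently in the connected set.
    fn connected_clients_snapshot(&self) -> Vec<Arc<Client>> {
        let ids = self.connected_ids.lock().unwrap_or_else(|e| e.into_inner());
        self.clients
            .iter()
            .filter(|c| ids.contains(&c.id))
            .cloned()
            .collect()
    }
}
```

Routing a subscription then amounts to taking one snapshot and dispatching to exactly the clients it contains.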
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
magicblock-chainlink/src/submux/mod.rs (1)
1019-1025: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Use one connected-client snapshot for both quorum and dispatch.
`required_*_subscription_confirmations()` is computed before `connected_clients_snapshot()`. If a disconnect lands between those reads, `process()` can require confirmations from clients that are no longer in the routed set, causing transient subscribe failures during reconnect churn.
Compute quorum from the same snapshot:
    +    fn required_account_subscription_confirmations_for_clients(
    +        clients: &[Arc<T>],
    +    ) -> usize {
    +        let n = clients.iter().filter(|c| c.subs_immediately()).count();
    +        cmp::max(1, (n * 2) / 3)
    +    }
    +
    +    fn required_program_subscription_confirmations_for_clients(
    +        clients: &[Arc<T>],
    +    ) -> usize {
    +        let n = clients.iter().filter(|c| c.subs_immediately()).count();
    +        cmp::max(1, n / 3)
    +    }
    +
         async fn subscribe(
             &self,
             pubkey: Pubkey,
             retries: Option<usize>,
         ) -> RemoteAccountProviderResult<()> {
    +        let connected_clients = self.connected_clients_snapshot();
             AccountSubscriptionTask::Subscribe(
                 pubkey,
                 retries,
    -            self.required_account_subscription_confirmations(),
    +            Self::required_account_subscription_confirmations_for_clients(
    +                &connected_clients,
    +            ),
             )
    -        .process(self.connected_clients_snapshot())
    +        .process(connected_clients)
             .await
         }

         async fn subscribe_program(
             &self,
             program_id: Pubkey,
         ) -> RemoteAccountProviderResult<()> {
    +        let connected_clients = self.connected_clients_snapshot();
             AccountSubscriptionTask::SubscribeProgram(
                 program_id,
    -            self.required_program_subscription_confirmations(),
    +            Self::required_program_subscription_confirmations_for_clients(
    +                &connected_clients,
    +            ),
             )
    -        .process(self.connected_clients_snapshot())
    +        .process(connected_clients)
             .await
         }

Also applies to: 1043-1048
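To make the quorum arithmetic in the suggestion concrete, here is a small standalone sketch. It reuses the two formulas from the diff above, but the `Client` struct, its `subs_immediately` field, and the `plan_subscribe` helper are hypothetical simplifications introduced only for illustration.

```rust
use std::cmp;
use std::sync::Arc;

// Hypothetical client type: only the flag used by the quorum rule is modelled.
struct Client {
    subs_immediately: bool,
}

// Two-thirds quorum (integer division, at least 1) over the given snapshot.
fn required_account_confirmations(snapshot: &[Arc<Client>]) -> usize {
    let n = snapshot.iter().filter(|c| c.subs_immediately).count();
    cmp::max(1, (n * 2) / 3)
}

// One-third quorum (integer division, at least 1) over the given snapshot.
fn required_program_confirmations(snapshot: &[Arc<Client>]) -> usize {
    let n = snapshot.iter().filter(|c| c.subs_immediately).count();
    cmp::max(1, n / 3)
}

// Hypothetical helper: derives the quorum and the number of clients that
// will be dispatched to from the SAME snapshot, so the two cannot disagree.
fn plan_subscribe(snapshot: Vec<Arc<Client>>) -> (usize, usize) {
    let required = required_account_confirmations(&snapshot);
    (required, snapshot.len())
}
```

With three immediately subscribing clients the account quorum is 2; with one it is 1. If the quorum were computed while three clients were connected but the dispatch snapshot were taken after two of them dropped, the task would wait for 2 confirmations from a single client, which is the transient failure the comment describes.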
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@magicblock-chainlink/src/submux/mod.rs` around lines 1019 - 1025, AccountSubscriptionTask::Subscribe currently calls required_account_subscription_confirmations() before reading connected_clients_snapshot(), so a disconnect between those calls can make process() expect confirmations from clients no longer in the snapshot; fix by calling connected_clients_snapshot() once into a local variable, compute required_account_subscription_confirmations() (and any other required_*_subscription_confirmations()) using that snapshot, and then call .process() with the same snapshot variable; apply the same change to the other similar block (the subscribe call around the 1043-1048 equivalent) so quorum and dispatch use the identical connected_clients_snapshot().
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@magicblock-chainlink/src/submux/mod.rs`:
- Around line 325-330: The code is calling .expect() on
connected_client_ids.lock() which will panic on a poisoned Mutex; replace these
panicking paths with a non-panicking lock recovery pattern (match on
connected_client_ids.lock() and use Err(poison).into_inner() to recover) or a
small helper like lock_or_recover(&connected_client_ids) that returns a
MutexGuard even when poisoned; apply this change to the occurrences using
connected_client_ids.lock() (e.g., the block that computes was_connected using
Self::client_key(&client) and the other listed spots) so the mux handles
poisoned locks gracefully instead of crashing.
- Around line 2001-2006: The test currently uses sleep_ms(100) after calling
client1.disable_reconnect(), client1.simulate_disconnect(), and
aborts[0].send(), which makes the assertion on
mux.connected_clients.load(Ordering::SeqCst) timing-sensitive; replace the fixed
sleep with a bounded wait that polls the shared state until it becomes 1 (or
times out). Implement a small loop using tokio::time::timeout combined with a
short delay (e.g., tokio::time::sleep for a few ms or tokio::task::yield_now)
that repeatedly checks mux.connected_clients.load(Ordering::SeqCst) and only
proceeds when it observes 1 (failing the test if the timeout elapses), keeping
the calls to client1.disable_reconnect(), client1.simulate_disconnect(), and
aborts[0].send() unchanged.
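The lock recovery pattern mentioned in the first inline comment could look something like the sketch below. `lock_or_recover` is the name the comment itself suggests; its exact signature and the `mark_connected` usage function are assumptions for illustration.

```rust
use std::collections::HashSet;
use std::sync::{Mutex, MutexGuard};

// Returns the guard even when the mutex is poisoned, i.e. when another
// thread panicked while holding it. `PoisonError::into_inner` hands back
// the guard so the caller can keep working with the protected data.
fn lock_or_recover<T>(mutex: &Mutex<T>) -> MutexGuard<'_, T> {
    match mutex.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    }
}

// Hypothetical usage: mark a client id as connected without panicking on
// a poisoned lock. Returns true if the id was newly inserted.
fn mark_connected(ids: &Mutex<HashSet<usize>>, id: usize) -> bool {
    lock_or_recover(ids).insert(id)
}
```

Recovering is reasonable here on the assumption that the set holds only plain ids, so a panic mid-update cannot leave it in a logically torn state.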
---
Outside diff comments:
In `@magicblock-chainlink/src/submux/mod.rs`:
- Around line 1019-1025: AccountSubscriptionTask::Subscribe currently calls
required_account_subscription_confirmations() before reading
connected_clients_snapshot(), so a disconnect between those calls can make
process() expect confirmations from clients no longer in the snapshot; fix by
calling connected_clients_snapshot() once into a local variable, compute
required_account_subscription_confirmations() (and any other
required_*_subscription_confirmations()) using that snapshot, and then call
.process() with the same snapshot variable; apply the same change to the other
similar block (the subscribe call around the 1043-1048 equivalent) so quorum and
dispatch use the identical connected_clients_snapshot().
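For the timing-sensitive test noted in the inline comments, the bounded wait could be shaped like the following. This is a synchronous, std-only analog so that it runs without an async runtime; in the actual async test the review suggests `tokio::time::timeout` with a short `tokio::time::sleep` instead. The `wait_for_value` name and the 5 ms poll interval are assumptions.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use std::time::{Duration, Instant};

// Polls `counter` until it equals `expected` or `timeout` elapses.
// Returns true if the expected value was observed in time.
fn wait_for_value(counter: &AtomicUsize, expected: usize, timeout: Duration) -> bool {
    let deadline = Instant::now() + timeout;
    loop {
        if counter.load(Ordering::SeqCst) == expected {
            return true;
        }
        if Instant::now() >= deadline {
            return false;
        }
        // Short delay between polls; assumed value, tune as needed.
        thread::sleep(Duration::from_millis(5));
    }
}
```

A test would then assert on the return value rather than sleeping for a fixed 100 ms, so it passes as soon as the state changes and fails only when the timeout elapses.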
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: 1e7f2940-8dd2-40fb-b44d-6217e9dc3898
📒 Files selected for processing (2)
magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rs
magicblock-chainlink/src/submux/mod.rs
thlorenz
left a comment
LGTM aside from how the id is derived.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 3d7a6cf489
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 82899cadaf
    Self::connected_client_ids_lock(&self.connected_client_ids)
        .insert(Self::client_key(&client));
Reconcile deferred clients before marking them connected
When a deferred pubsub endpoint is attached, add_client snapshots program_subs before replaying them, but the client is excluded from connected_clients_snapshot() until this insert. RemoteAccountProvider::subscribe_program does not take subscription_transition_lock, so a program subscription added while add_client is between its initial snapshot and this insertion is recorded in program_subs and sent only to the already-connected clients; there is no second replay in add_client, leaving the newly attached client unsubscribed for that program until a future reconnect.
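One possible way to close the window described above is a second, differential replay after the client has been marked connected. The sketch below is a simplified model, not the project's code: program ids are plain `u64` values, `Client` and `Mux` are hypothetical types, and the `between` closure stands in for a concurrent `subscribe_program` call landing between the initial replay and the insert. It also assumes that subscribers record a program in `program_subs` before taking their connected-clients snapshot, and that subscribing twice is harmless.

```rust
use std::collections::HashSet;
use std::sync::Mutex;

// Hypothetical newly attached client: records every program id it was
// asked to subscribe to.
#[derive(Default)]
struct Client {
    subscribed: Mutex<HashSet<u64>>,
}

impl Client {
    fn subscribe_program(&self, program_id: u64) {
        self.subscribed.lock().unwrap().insert(program_id);
    }
}

// Hypothetical mux state: the recorded program subscriptions and whether
// the new client is already routable.
#[derive(Default)]
struct Mux {
    program_subs: Mutex<HashSet<u64>>,
    connected: Mutex<bool>,
}

impl Mux {
    fn add_client(&self, client: &Client, between: impl FnOnce(&Self)) {
        // 1. Initial snapshot and replay, as in the flow described above.
        let initial: HashSet<u64> = self.program_subs.lock().unwrap().clone();
        for id in &initial {
            client.subscribe_program(*id);
        }

        // Stand-in for a racing subscription added by another task.
        between(self);

        // 2. Make the client routable for new subscriptions.
        *self.connected.lock().unwrap() = true;

        // 3. Reconcile: replay anything recorded since the first snapshot.
        let current: HashSet<u64> = self.program_subs.lock().unwrap().clone();
        for id in current.difference(&initial) {
            client.subscribe_program(*id);
        }
    }
}
```

Under the stated ordering assumption, a subscription recorded before step 3 is picked up by the differential replay, and one recorded after it is routed to the new client directly because the client is already marked connected.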
Co-authored-by: Thorsten Lorenz <thlorenz@gmx.de>
Summary
Result so WARN logs include the concrete reconnect or resubscribe failure