Skip to content

Fix pubsub reconnect failure handling#1243

Merged
GabrielePicco merged 9 commits into
masterfrom
fix/pubsub-reconnect-failures
May 27, 2026
Merged

Fix pubsub reconnect failure handling#1243
GabrielePicco merged 9 commits into
masterfrom
fix/pubsub-reconnect-failures

Conversation

@GabrielePicco
Copy link
Copy Markdown
Collaborator

@GabrielePicco GabrielePicco commented May 25, 2026

Summary

  • make pubsub reconnect return Result so WARN logs include the concrete reconnect or resubscribe failure
  • track connected SubMux clients and send new subscribe/unsubscribe operations only to connected clients while a failed client is reconnecting
  • add regression coverage for skipping a disconnected client during reconnect

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented May 25, 2026

Warning

Review limit reached

@GabrielePicco, we couldn't start this review because you've reached your PR review rate limit.

More reviews will be available in 53 minutes and 55 seconds. Learn how PR review limits work.

Your organization has run out of usage credits. Purchase more in the billing tab.

⌛ How to resolve this issue?

After more reviews become available, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans include higher PR review limits than trial, open-source, and free plans. In all cases, reviews become available again over time. During sustained high-volume PR review activity, CodeRabbit may temporarily slow when the next review becomes available.

Please see our Fair Usage Limits Policy for further information.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

Run ID: e0dd8c8e-c9a0-4cf2-bf20-b50b52456087

📥 Commits

Reviewing files that changed from the base of the PR and between ef4fae3 and 82899ca.

📒 Files selected for processing (2)
  • magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rs
  • magicblock-chainlink/src/submux/mod.rs
📝 Walkthrough

Walkthrough

This PR adds connection tracking to SubMuxClient using an Arc-wrapped HashSet to identify which inner pubsub clients are currently connected. The connected set is initialized from all configured clients, updated during reconnect cycles (removed on disconnect, reinserted on successful reconnect), and passed through the reconnector spawner. The reconnect_client_with_backoff method was refactored to call reconnect_client returning Result<()> instead of bool, enabling cleaner error propagation and metric updates. Subscription routing in subscribe, subscribe_program, and unsubscribe now dispatches tasks only to connected clients via a snapshot filter instead of all configured clients. A new test verifies subscriptions skip disconnected clients during reconnection. A small accessor method was also added to expose the subscribe_attempts counter on ChainPubsubClientMock.

Suggested reviewers

  • thlorenz
  • bmuddha
✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch fix/pubsub-reconnect-failures

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@GabrielePicco GabrielePicco marked this pull request as ready for review May 25, 2026 15:30
@GabrielePicco GabrielePicco requested a review from thlorenz May 25, 2026 15:30
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
magicblock-chainlink/src/submux/mod.rs (1)

1019-1025: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Use one connected-client snapshot for both quorum and dispatch.

required_*_subscription_confirmations() is computed before connected_clients_snapshot(). If a disconnect lands between those reads, process() can require confirmations from clients that are no longer in the routed set, causing transient subscribe failures during reconnect churn.

Compute quorum from the same snapshot
+    fn required_account_subscription_confirmations_for_clients(
+        clients: &[Arc<T>],
+    ) -> usize {
+        let n = clients.iter().filter(|c| c.subs_immediately()).count();
+        cmp::max(1, (n * 2) / 3)
+    }
+
+    fn required_program_subscription_confirmations_for_clients(
+        clients: &[Arc<T>],
+    ) -> usize {
+        let n = clients.iter().filter(|c| c.subs_immediately()).count();
+        cmp::max(1, n / 3)
+    }
+
     async fn subscribe(
         &self,
         pubkey: Pubkey,
         retries: Option<usize>,
     ) -> RemoteAccountProviderResult<()> {
+        let connected_clients = self.connected_clients_snapshot();
         AccountSubscriptionTask::Subscribe(
             pubkey,
             retries,
-            self.required_account_subscription_confirmations(),
+            Self::required_account_subscription_confirmations_for_clients(
+                &connected_clients,
+            ),
         )
-        .process(self.connected_clients_snapshot())
+        .process(connected_clients)
         .await
     }
 
     async fn subscribe_program(
         &self,
         program_id: Pubkey,
     ) -> RemoteAccountProviderResult<()> {
+        let connected_clients = self.connected_clients_snapshot();
         AccountSubscriptionTask::SubscribeProgram(
             program_id,
-            self.required_program_subscription_confirmations(),
+            Self::required_program_subscription_confirmations_for_clients(
+                &connected_clients,
+            ),
         )
-        .process(self.connected_clients_snapshot())
+        .process(connected_clients)
         .await
     }

Also applies to: 1043-1048

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@magicblock-chainlink/src/submux/mod.rs` around lines 1019 - 1025,
AccountSubscriptionTask::Subscribe currently calls
required_account_subscription_confirmations() before reading
connected_clients_snapshot(), so a disconnect between those calls can make
process() expect confirmations from clients no longer in the snapshot; fix by
calling connected_clients_snapshot() once into a local variable, compute
required_account_subscription_confirmations() (and any other
required_*_subscription_confirmations()) using that snapshot, and then call
.process() with the same snapshot variable; apply the same change to the other
similar block (the subscribe call around the 1043-1048 equivalent) so quorum and
dispatch use the identical connected_clients_snapshot().
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@magicblock-chainlink/src/submux/mod.rs`:
- Around line 325-330: The code is calling .expect() on
connected_client_ids.lock() which will panic on a poisoned Mutex; replace these
panicking paths with a non-panicking lock recovery pattern (match on
connected_client_ids.lock() and use Err(poison).into_inner() to recover) or a
small helper like lock_or_recover(&connected_client_ids) that returns a
MutexGuard even when poisoned; apply this change to the occurrences using
connected_client_ids.lock() (e.g., the block that computes was_connected using
Self::client_key(&client) and the other listed spots) so the mux handles
poisoned locks gracefully instead of crashing.
- Around line 2001-2006: The test currently uses sleep_ms(100) after calling
client1.disable_reconnect(), client1.simulate_disconnect(), and
aborts[0].send(), which makes the assertion on
mux.connected_clients.load(Ordering::SeqCst) timing-sensitive; replace the fixed
sleep with a bounded wait that polls the shared state until it becomes 1 (or
times out). Implement a small loop using tokio::time::timeout combined with a
short delay (e.g., tokio::time::sleep for a few ms or tokio::task::yield_now)
that repeatedly checks mux.connected_clients.load(Ordering::SeqCst) and only
proceeds when it observes 1 (failing the test if the timeout elapses), keeping
the calls to client1.disable_reconnect(), client1.simulate_disconnect(), and
aborts[0].send() unchanged.

---

Outside diff comments:
In `@magicblock-chainlink/src/submux/mod.rs`:
- Around line 1019-1025: AccountSubscriptionTask::Subscribe currently calls
required_account_subscription_confirmations() before reading
connected_clients_snapshot(), so a disconnect between those calls can make
process() expect confirmations from clients no longer in the snapshot; fix by
calling connected_clients_snapshot() once into a local variable, compute
required_account_subscription_confirmations() (and any other
required_*_subscription_confirmations()) using that snapshot, and then call
.process() with the same snapshot variable; apply the same change to the other
similar block (the subscribe call around the 1043-1048 equivalent) so quorum and
dispatch use the identical connected_clients_snapshot().
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

Run ID: 1e7f2940-8dd2-40fb-b44d-6217e9dc3898

📥 Commits

Reviewing files that changed from the base of the PR and between 2277af6 and ef4fae3.

📒 Files selected for processing (2)
  • magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rs
  • magicblock-chainlink/src/submux/mod.rs

Comment thread magicblock-chainlink/src/submux/mod.rs
Comment thread magicblock-chainlink/src/submux/mod.rs
Copy link
Copy Markdown
Collaborator

@thlorenz thlorenz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM aside from how the id is derived.

Comment thread magicblock-chainlink/src/submux/mod.rs
Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 3d7a6cf489

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment thread magicblock-chainlink/src/submux/mod.rs Outdated
@GabrielePicco GabrielePicco requested a review from thlorenz May 27, 2026 13:21
@GabrielePicco GabrielePicco enabled auto-merge (squash) May 27, 2026 13:21
@GabrielePicco GabrielePicco disabled auto-merge May 27, 2026 13:57
@GabrielePicco GabrielePicco merged commit fe96f25 into master May 27, 2026
6 of 7 checks passed
@GabrielePicco GabrielePicco deleted the fix/pubsub-reconnect-failures branch May 27, 2026 13:57
Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 82899cadaf

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment on lines +449 to +450
Self::connected_client_ids_lock(&self.connected_client_ids)
.insert(Self::client_key(&client));
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Reconcile deferred clients before marking them connected

When a deferred pubsub endpoint is attached, add_client snapshots program_subs before replaying them, but the client is excluded from connected_clients_snapshot() until this insert. RemoteAccountProvider::subscribe_program does not take subscription_transition_lock, so a program subscription added while add_client is between its initial snapshot and this insertion is recorded in program_subs and sent only to the already-connected clients; there is no second replay in add_client, leaving the newly attached client unsubscribed for that program until a future reconnect.

Useful? React with 👍 / 👎.

GabrielePicco added a commit that referenced this pull request May 27, 2026
Co-authored-by: Thorsten Lorenz <thlorenz@gmx.de>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants