Streams: Re-evaluate subscribe_if predicate on catalog updates#9
Merged
karim-agha merged 3 commits intoflashbots:mainfrom Feb 17, 2026
Merged
Conversation
Motivation: In architectures where BFT consensus (e.g. Simplex, CometBFT) runs at the outer layer with rotating leaders, and each validator runs an internal Mosaik Raft group for high availability, stream consumers need to dynamically route orderflow to whichever validator is the current BFT leader. The natural pattern is for leader nodes to tag themselves (e.g. "bft-role:leader") and for consumers to use subscribe_if predicates that filter on those tags. However, this only works if the consumer re-evaluates the predicate when tags change on existing peers -- not just when new peers appear. Previously, the consumer worker only evaluated the subscribe_if predicate when discovering new producers. Active connections were never re-checked, so producers whose tags changed to no longer match the predicate would remain connected indefinitely, accumulating stale connections over time. This adds dynamic predicate re-evaluation: on each catalog update, active connections are checked against the current predicate and producers that no longer satisfy it (or have left the catalog) are disconnected via per-receiver cancellation tokens. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Three matching engine nodes (Raft group) with two trader nodes demonstrating typed streams for order submission and fill notification. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Contributor
|
Thank you! |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
subscribe_ifpredicates when the discovery catalog updates, disconnecting producers whose tags (or other peer entry attributes) no longer satisfy the predicatedynamic_tag_reevaluation) that verifies connect/disconnect/reconnect cycle when producer tags change at runtimeMotivation
In architectures where BFT consensus (e.g. Simplex, CometBFT) runs at the outer layer with rotating leaders, and each validator runs an internal Mosaik Raft group for high availability, stream consumers need to dynamically route orderflow to whichever validator is the current BFT leader. The natural pattern is for leader nodes to tag themselves (e.g.
bft-role:leader) and for consumers to usesubscribe_ifpredicates that filter on those tags.Previously, the consumer worker only evaluated
subscribe_ifwhen discovering new producers. Active connections were never re-checked, so producers whose tags changed to no longer match the predicate remained connected indefinitely -- accumulating stale connections over leader rotations.Changes
src/streams/consumer/worker.rsreceiver_cancels: HashMap<Digest, CancellationToken>to track per-receiver cancel tokenson_catalog_update, re-evaluate the predicate for all active connections and disconnect those that no longer matchon_receiver_state_updateandon_terminatedtests/streams/consumer/auth.rsdynamic_tag_reevaluationtest: sets up a consumer with a tag-based predicate, simulates tag removal viadiscovery().feed()of a re-signed entry, verifies disconnection, then simulates a different producer gaining the tag and verifies reconnectionTest plan
streams::consumer::auth::by_tag-- existing test still passesstreams::consumer::auth::dynamic_tag_reevaluation-- new test passesstreams::consumer::when::smoke-- existing test still passescargo checkclean