Skip to content

Streams: Re-evaluate subscribe_if predicate on catalog updates#9

Merged
karim-agha merged 3 commits intoflashbots:mainfrom
zmanian:feature/dynamic-predicate-reevaluation
Feb 17, 2026
Merged

Streams: Re-evaluate subscribe_if predicate on catalog updates#9
karim-agha merged 3 commits intoflashbots:mainfrom
zmanian:feature/dynamic-predicate-reevaluation

Conversation

@zmanian
Copy link
Copy Markdown
Contributor

@zmanian zmanian commented Feb 16, 2026

Summary

  • Consumer stream workers now dynamically re-evaluate subscribe_if predicates when the discovery catalog updates, disconnecting producers whose tags (or other peer entry attributes) no longer satisfy the predicate
  • Adds per-receiver cancellation tokens so individual receivers can be terminated without affecting other active subscriptions
  • Includes integration test (dynamic_tag_reevaluation) that verifies connect/disconnect/reconnect cycle when producer tags change at runtime

Motivation

In architectures where BFT consensus (e.g. Simplex, CometBFT) runs at the outer layer with rotating leaders, and each validator runs an internal Mosaik Raft group for high availability, stream consumers need to dynamically route orderflow to whichever validator is the current BFT leader. The natural pattern is for leader nodes to tag themselves (e.g. bft-role:leader) and for consumers to use subscribe_if predicates that filter on those tags.

Previously, the consumer worker only evaluated subscribe_if when discovering new producers. Active connections were never re-checked, so producers whose tags changed to no longer match the predicate remained connected indefinitely -- accumulating stale connections over leader rotations.

Changes

src/streams/consumer/worker.rs

  • Added receiver_cancels: HashMap<Digest, CancellationToken> to track per-receiver cancel tokens
  • When spawning a receiver, create a child cancellation token and pass it (instead of the worker's token) so individual receivers can be cancelled
  • After connecting new producers in on_catalog_update, re-evaluate the predicate for all active connections and disconnect those that no longer match
  • Clean up cancel tokens in on_receiver_state_update and on_terminated

tests/streams/consumer/auth.rs

  • New dynamic_tag_reevaluation test: sets up a consumer with a tag-based predicate, simulates tag removal via discovery().feed() of a re-signed entry, verifies disconnection, then simulates a different producer gaining the tag and verifies reconnection

Test plan

  • streams::consumer::auth::by_tag -- existing test still passes
  • streams::consumer::auth::dynamic_tag_reevaluation -- new test passes
  • streams::consumer::when::smoke -- existing test still passes
  • cargo check clean

zmanian and others added 3 commits February 15, 2026 18:56
Motivation: In architectures where BFT consensus (e.g. Simplex, CometBFT)
runs at the outer layer with rotating leaders, and each validator runs an
internal Mosaik Raft group for high availability, stream consumers need to
dynamically route orderflow to whichever validator is the current BFT
leader. The natural pattern is for leader nodes to tag themselves (e.g.
"bft-role:leader") and for consumers to use subscribe_if predicates that
filter on those tags. However, this only works if the consumer re-evaluates
the predicate when tags change on existing peers -- not just when new peers
appear.

Previously, the consumer worker only evaluated the subscribe_if predicate
when discovering new producers. Active connections were never re-checked,
so producers whose tags changed to no longer match the predicate would
remain connected indefinitely, accumulating stale connections over time.

This adds dynamic predicate re-evaluation: on each catalog update, active
connections are checked against the current predicate and producers that
no longer satisfy it (or have left the catalog) are disconnected via
per-receiver cancellation tokens.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Three matching engine nodes (Raft group) with two trader nodes
demonstrating typed streams for order submission and fill
notification.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@karim-agha karim-agha merged commit 40e348a into flashbots:main Feb 17, 2026
@karim-agha
Copy link
Copy Markdown
Contributor

Thank you!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants