feat(shard): Implement shard router #2853

Open
krishvishal wants to merge 5 commits into apache:master from krishvishal:shard-router

Conversation

@krishvishal
Contributor

In the shard router each shard gets an inbox (flume channel) and a vector of senders to all other shards (including itself). When a message arrives at any shard, the router:

  1. Decomposes the generic message to extract the operation and namespace
  2. Resolves the target shard - metadata ops go to shard 0, partition ops are looked up in the ShardsTable, unknown ops default to shard 0
  3. Enqueues a ShardFrame into the target shard's channel

The message pump (run_message_pump) drains the inbox and processes frames sequentially, ensuring all mutations on a given shard are serialized through a single async task.

The ShardsTable trait abstracts partition ownership lookup with two implementations:

  • (): a no-op for single-shard setups (the simulator, for now)
  • PapayaShardsTable: a lock-free concurrent map for multi-shard deployments

@codecov

codecov bot commented Mar 3, 2026

Codecov Report

❌ Patch coverage is 0% with 173 lines in your changes missing coverage. Please review.
✅ Project coverage is 67.79%. Comparing base (be23a35) to head (02683f0).

| Files with missing lines | Patch % | Lines |
|---|---|---|
| core/shard/src/router.rs | 0.00% | 86 Missing ⚠️ |
| core/shard/src/lib.rs | 0.00% | 44 Missing ⚠️ |
| core/shard/src/shards_table.rs | 0.00% | 33 Missing ⚠️ |
| core/common/src/types/consensus/header.rs | 0.00% | 8 Missing ⚠️ |
| core/simulator/src/replica.rs | 0.00% | 2 Missing ⚠️ |

❌ Your patch check has failed because the patch coverage (0.00%) is below the target coverage (50.00%). You can increase the patch coverage or adjust the target coverage.

Additional details and impacted files
@@             Coverage Diff              @@
##             master    #2853      +/-   ##
============================================
- Coverage     67.94%   67.79%   -0.15%     
  Complexity      739      739              
============================================
  Files          1049     1051       +2     
  Lines         84385    84542     +157     
  Branches      60963    61131     +168     
============================================
- Hits          57336    57319      -17     
- Misses        24692    24851     +159     
- Partials       2357     2372      +15     
| Flag | Coverage Δ |
|---|---|
| csharp | 67.43% <ø> (-0.19%) ⬇️ |
| go | 6.33% <ø> (ø) |
| java | 54.83% <ø> (ø) |
| node | 92.26% <ø> (-0.15%) ⬇️ |
| python | 0.00% <ø> (ø) |
| rust | 70.04% <0.00%> (-0.20%) ⬇️ |

Flags with carried forward coverage won't be shown.

| Files with missing lines | Coverage Δ |
|---|---|
| core/simulator/src/replica.rs | 0.00% <0.00%> (ø) |
| core/common/src/types/consensus/header.rs | 23.57% <0.00%> (-0.70%) ⬇️ |
| core/shard/src/shards_table.rs | 0.00% <0.00%> (ø) |
| core/shard/src/lib.rs | 0.00% <0.00%> (ø) |
| core/shard/src/router.rs | 0.00% <0.00%> (ø) |

... and 15 files with indirect coverage changes

@hubcio
Contributor

hubcio commented Mar 3, 2026

Can you use crossfire instead of flume? It's much more battle-tested and seems to have better performance.

@krishvishal
Contributor Author

> can you use crossfire instead of flume? its much more battle-tested than flume and seems to have better performance.

Okay, I will look into it.

Comment on lines +34 to +38
impl<B, J, S, M, T, R> IggyShard<B, J, S, M, T, R>
where
B: MessageBus,
T: ShardsTable,
R: Send,
Contributor Author

@krishvishal krishvishal Mar 3, 2026

@hubcio With crossfire, this code will change to:

impl<B, J, S, M, T, R> IggyShard<B, J, S, M, T, R>
where
    B: MessageBus,
    T: ShardsTable,
    R: Send + 'static,
{

crossfire requires an additional 'static bound, which may be too restrictive. I'm not sure yet whether it would cause problems for us. Let me know what you think.

Contributor

@hubcio hubcio Mar 3, 2026

IMHO it's OK. A shard (and its message router) should live forever in a given thread, so the 'static lifetime fits perfectly. @numinnex any thoughts?

Contributor

I think it's fine, let's move to crossfire.

if planes.0.is_applicable(&request) {
planes.0.on_request(request).await;
let (metadata, (partitions, _)) = self.plane.inner();
if metadata.is_applicable(&request) {
Contributor

Why do we do it this way? The purpose of MuxPlane is to avoid exposing that logic inside the top-level caller.

Contributor Author

@krishvishal krishvishal Mar 3, 2026

The MuxPlane's Plane trait dispatch can't work here because the two planes use different consensus types:

  1. Metadata: VsrConsensus<B> (default LocalPipeline)
  2. Partitions: VsrConsensus<B, NamespacedPipeline>

The variadic Plane impl requires a single C for both, so self.plane.on_request(...) doesn't compile without either:

  • Unifying both planes on NamespacedPipeline which changes metadata's pipeline behavior.
  • Adding a second Plane impl for IggyPartitions with the wrong pipeline type

Both feel like hacky solutions.

/// Dispatch a message and return a receiver that resolves when the target
/// shard has finished processing it.
pub fn dispatch_request(&self, message: Message<GenericHeader>) -> flume::Receiver<R> {
let (operation, namespace, generic) = Self::decompose(&message);
Contributor

Let's get rid of the decompose function; instead, do it inline there and match inline.

Contributor Author

Done.

/// - Partition operations route to the shard that owns the namespace,
/// looked up via the [`ShardsTable`].
/// - Unknown operations fall back to shard 0.
fn resolve(&self, operation: Operation, namespace: IggyNamespace) -> u16 {
Contributor

Inline this as well.

Contributor Author

Done.
