feat(shard): Implement shard router#2853
Conversation
Codecov Report
❌ Your patch check has failed because the patch coverage (0.00%) is below the target coverage (50.00%). You can increase the patch coverage or adjust the target coverage.
@@ Coverage Diff @@
## master #2853 +/- ##
============================================
- Coverage 67.94% 67.79% -0.15%
Complexity 739 739
============================================
Files 1049 1051 +2
Lines 84385 84542 +157
Branches 60963 61131 +168
============================================
- Hits 57336 57319 -17
- Misses 24692 24851 +159
- Partials 2357 2372 +15
Flags with carried forward coverage won't be shown.
can you use
Okay, I will look into it.
impl<B, J, S, M, T, R> IggyShard<B, J, S, M, T, R>
where
    B: MessageBus,
    T: ShardsTable,
    R: Send,
@hubcio with crossfire, this code will change to

impl<B, J, S, M, T, R> IggyShard<B, J, S, M, T, R>
where
    B: MessageBus,
    T: ShardsTable,
    R: Send + 'static,
{

crossfire requires an additional 'static bound, which may be too restrictive. I'm thinking about whether it would cause problems for us or not. Let me know what you think.
IMHO it's OK. The shard (and message router) should live forever in a given thread, so a 'static lifetime fits perfectly. @numinnex any thoughts?
I think it's fine, let's move to crossfire.
if planes.0.is_applicable(&request) {
    planes.0.on_request(request).await;
let (metadata, (partitions, _)) = self.plane.inner();
if metadata.is_applicable(&request) {
Why do we do it this way? That's the purpose of MuxPlane: to not have that logic exposed inside the top-level caller.
The MuxPlane's Plane trait dispatch can't work here because the two planes use different consensus types:

- Metadata: VsrConsensus<B> (default LocalPipeline)
- Partitions: VsrConsensus<B, NamespacedPipeline>

The variadic Plane impl requires a single C for both, so self.plane.on_request(...) doesn't compile without either:

- Unifying both planes on NamespacedPipeline, which changes metadata's pipeline behavior.
- Adding a second Plane impl for IggyPartitions with the wrong pipeline type.

Both feel like hacky solutions.
core/shard/src/router.rs
/// Dispatch a message and return a receiver that resolves when the target
/// shard has finished processing it.
pub fn dispatch_request(&self, message: Message<GenericHeader>) -> flume::Receiver<R> {
    let (operation, namespace, generic) = Self::decompose(&message);
Let's get rid of the decompose function; instead, do it inline there and match inline.
core/shard/src/router.rs
/// - Partition operations route to the shard that owns the namespace,
///   looked up via the [`ShardsTable`].
/// - Unknown operations fall back to shard 0.
fn resolve(&self, operation: Operation, namespace: IggyNamespace) -> u16 {
In the shard router each shard gets an inbox (flume channel) and a vector of senders to all other shards (including itself). When a message arrives at any shard, the router resolves the target shard for the operation and forwards the frame to that shard's inbox.
The message pump (run_message_pump) drains the inbox and processes frames sequentially, ensuring all mutations on a given shard are serialized through a single async task.
The ShardsTable trait abstracts partition ownership lookup and has two implementations.