Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ configs_derive = { path = "core/configs_derive", version = "0.1.0" }
consensus = { path = "core/consensus" }
console-subscriber = "0.5.0"
crossbeam = "0.8.4"
crossfire = "3.1.4"
ctor = "0.6.3"
ctrlc = { version = "3.5", features = ["termination"] }
cucumber = "0.22"
Expand Down
1 change: 1 addition & 0 deletions DEPENDENCIES.md
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ crossbeam-deque: 0.8.6, "Apache-2.0 OR MIT",
crossbeam-epoch: 0.9.18, "Apache-2.0 OR MIT",
crossbeam-queue: 0.3.12, "Apache-2.0 OR MIT",
crossbeam-utils: 0.8.21, "Apache-2.0 OR MIT",
crossfire: 3.1.4, "Apache-2.0",
crossterm: 0.29.0, "MIT",
crossterm_winapi: 0.9.1, "MIT",
crunchy: 0.2.4, "MIT",
Expand Down
40 changes: 40 additions & 0 deletions core/common/src/types/consensus/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,46 @@ pub enum Operation {
Reserved = 200,
}

impl Operation {
/// Returns `true` for metadata / control-plane operations (streams, topics,
/// users, consumer groups, etc.) that are always handled by shard 0.
#[inline]
pub fn is_metadata(&self) -> bool {
Comment thread
numinnex marked this conversation as resolved.
matches!(
self,
Operation::CreateStream
| Operation::UpdateStream
| Operation::DeleteStream
| Operation::PurgeStream
| Operation::CreateTopic
| Operation::UpdateTopic
| Operation::DeleteTopic
| Operation::PurgeTopic
| Operation::CreatePartitions
| Operation::DeletePartitions
| Operation::CreateConsumerGroup
| Operation::DeleteConsumerGroup
| Operation::CreateUser
| Operation::UpdateUser
| Operation::DeleteUser
| Operation::ChangePassword
| Operation::UpdatePermissions
| Operation::CreatePersonalAccessToken
| Operation::DeletePersonalAccessToken
)
}

/// Returns `true` for data-plane operations that are routed to the shard
/// owning the partition identified by the message's namespace.
#[inline]
pub fn is_partition(&self) -> bool {
matches!(
self,
Operation::SendMessages | Operation::StoreConsumerOffset | Operation::DeleteSegments
)
}
}

#[repr(C)]
#[derive(Debug, Clone, Copy)]
pub struct GenericHeader {
Expand Down
28 changes: 2 additions & 26 deletions core/metadata/src/impls/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ use consensus::{
};
use iggy_common::{
header::{
Command2, ConsensusHeader, GenericHeader, Operation, PrepareHeader, PrepareOkHeader,
RequestHeader,
Command2, ConsensusHeader, GenericHeader, PrepareHeader, PrepareOkHeader, RequestHeader,
},
message::Message,
};
Expand Down Expand Up @@ -260,30 +259,7 @@ where
message.header().command(),
Command2::Request | Command2::Prepare | Command2::PrepareOk
));
let operation = message.header().operation();
// TODO: Use better selection, smth like greater or equal based on op number.
matches!(
operation,
Operation::CreateStream
| Operation::UpdateStream
| Operation::DeleteStream
| Operation::PurgeStream
| Operation::CreateTopic
| Operation::UpdateTopic
| Operation::DeleteTopic
| Operation::PurgeTopic
| Operation::CreatePartitions
| Operation::DeletePartitions
| Operation::CreateConsumerGroup
| Operation::DeleteConsumerGroup
| Operation::CreateUser
| Operation::UpdateUser
| Operation::DeleteUser
| Operation::ChangePassword
| Operation::UpdatePermissions
| Operation::CreatePersonalAccessToken
| Operation::DeletePersonalAccessToken
)
message.header().operation().is_metadata()
}
}

Expand Down
7 changes: 1 addition & 6 deletions core/partitions/src/iggy_partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -514,12 +514,7 @@ where
message.header().command(),
Command2::Request | Command2::Prepare | Command2::PrepareOk
));
let operation = message.header().operation();
// TODO: Use better selection, smth like greater or equal based on op number.
matches!(
operation,
Operation::DeleteSegments | Operation::SendMessages | Operation::StoreConsumerOffset
)
message.header().operation().is_partition()
}
}

Expand Down
5 changes: 5 additions & 0 deletions core/shard/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,13 @@ edition = "2024"

[dependencies]
consensus = { path = "../consensus" }
crossfire = { workspace = true }
futures = { workspace = true }
hash32 = { workspace = true }
iggy_common = { path = "../common" }
journal = { path = "../journal" }
message_bus = { path = "../message_bus" }
metadata = { path = "../metadata" }
papaya = { workspace = true }
partitions = { path = "../partitions" }
tracing = { workspace = true }
125 changes: 100 additions & 25 deletions core/shard/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
// specific language governing permissions and limitations
// under the License.

use consensus::{
MetadataHandle, MuxPlane, NamespacedPipeline, PartitionsHandle, Plane, PlaneIdentity,
VsrConsensus,
};
mod router;
pub mod shards_table;

use consensus::{MuxPlane, NamespacedPipeline, PartitionsHandle, Plane, VsrConsensus};
use iggy_common::header::{GenericHeader, PrepareHeader, PrepareOkHeader, RequestHeader};
use iggy_common::message::{Message, MessageBag};
use iggy_common::sharding::IggyNamespace;
Expand All @@ -28,6 +28,7 @@ use message_bus::MessageBus;
use metadata::IggyMetadata;
use metadata::stm::StateMachine;
use partitions::IggyPartitions;
use shards_table::ShardsTable;

pub type ShardPlane<B, J, S, M> = MuxPlane<
variadic!(
Expand All @@ -36,30 +37,116 @@ pub type ShardPlane<B, J, S, M> = MuxPlane<
),
>;

pub struct IggyShard<B, J, S, M>
/// Bounded mpsc channel sender (blocking send).
pub type Sender<T> = crossfire::MTx<crossfire::mpsc::Array<T>>;

/// Bounded mpsc channel receiver (async recv).
pub type Receiver<T> = crossfire::AsyncRx<crossfire::mpsc::Array<T>>;

/// Create a bounded mpsc channel with a blocking sender and async receiver.
///
/// `capacity` bounds the number of in-flight items. Per the [`Sender`] alias,
/// the send side blocks the calling thread (no async needed on the producer),
/// while the [`Receiver`] side is awaited from async context.
/// NOTE(review): exact full-buffer blocking semantics come from crossfire's
/// `bounded_blocking_async` — confirm against the crate docs.
pub fn channel<T: Send + 'static>(capacity: usize) -> (Sender<T>, Receiver<T>) {
    crossfire::mpsc::bounded_blocking_async(capacity)
}

/// Envelope for inter-shard channel messages.
///
/// Wraps a consensus [`Message`] together with an optional one-shot response
/// channel. Fire-and-forget dispatches leave `response_sender` as `None`;
/// request-response dispatches provide a sender that the message pump will
/// notify once the message has been processed.
///
/// The response type `R` is generic so that higher layers (e.g. HTTP handlers)
/// can carry a response enum while the consensus layer can default to `()`.
pub struct ShardFrame<R: Send + 'static = ()> {
    /// The consensus message being routed between shards.
    pub message: Message<GenericHeader>,
    /// `Some` only for request-response dispatches; the processing shard
    /// sends a single `R` here on completion (see `with_response`).
    pub response_sender: Option<Sender<R>>,
}

impl<R: Send + 'static> ShardFrame<R> {
    /// Build a frame with no completion channel — no caller awaits the result.
    pub fn fire_and_forget(message: Message<GenericHeader>) -> Self {
        Self {
            message,
            response_sender: None,
        }
    }

    /// Build a frame whose processing the caller wants to observe. Returns
    /// the frame plus a receiver the caller can await; the message pump
    /// signals it once the message has been handled.
    pub fn with_response(message: Message<GenericHeader>) -> (Self, Receiver<R>) {
        // Capacity 1: exactly one completion notification is ever sent.
        let (response_sender, response_receiver) = channel(1);
        let frame = Self {
            message,
            response_sender: Some(response_sender),
        };
        (frame, response_receiver)
    }
}

pub struct IggyShard<B, J, S, M, T = (), R: Send + 'static = ()>
where
B: MessageBus,
{
pub id: u8,
pub id: u16,
pub name: String,
pub plane: ShardPlane<B, J, S, M>,

/// Channel senders to every shard, indexed by shard id.
/// Includes a sender to self so that local routing goes through the
/// same channel path as remote routing.
senders: Vec<Sender<ShardFrame<R>>>,

/// Receiver end of this shard's inbox. Peer shards (and self) send
/// messages here via the corresponding sender.
inbox: Receiver<ShardFrame<R>>,

/// Partition namespace -> owning shard lookup.
shards_table: T,
}

impl<B, J, S, M> IggyShard<B, J, S, M>
impl<B, J, S, M, T, R: Send + 'static> IggyShard<B, J, S, M, T, R>
where
B: MessageBus,
T: ShardsTable,
{
/// Create a new shard from pre-built metadata and partition planes.
/// Create a new shard with channel links and a shards table.
///
/// * `senders` - one sender per shard in the cluster (indexed by shard id).
/// * `inbox` - the receiver that this shard drains in its message pump.
/// * `shards_table` - namespace -> shard routing table.
pub fn new(
id: u8,
id: u16,
name: String,
metadata: IggyMetadata<VsrConsensus<B>, J, S, M>,
partitions: IggyPartitions<VsrConsensus<B, NamespacedPipeline>>,
senders: Vec<Sender<ShardFrame<R>>>,
inbox: Receiver<ShardFrame<R>>,
shards_table: T,
) -> Self {
let plane = MuxPlane::new(variadic!(metadata, partitions));
Self { id, name, plane }
Self {
id,
name,
plane,
senders,
inbox,
shards_table,
}
}

pub fn shards_table(&self) -> &T {
&self.shards_table
}
}

/// Local message processing — these methods handle messages that have been
/// routed to this shard via the message pump.
impl<B, J, S, M, T, R: Send + 'static> IggyShard<B, J, S, M, T, R>
where
B: MessageBus,
{
/// Dispatch an incoming network message to the appropriate consensus plane.
///
/// Routes requests, replication messages, and acks to either the metadata
Expand Down Expand Up @@ -93,11 +180,7 @@ where
>,
M: StateMachine<Input = Message<PrepareHeader>>,
{
if self.plane.metadata().is_applicable(&request) {
self.plane.metadata().on_request(request).await;
} else {
self.plane.partitions().on_request(request).await;
}
self.plane.on_request(request).await;
}

pub async fn on_replicate(&self, prepare: Message<PrepareHeader>)
Expand All @@ -111,11 +194,7 @@ where
>,
M: StateMachine<Input = Message<PrepareHeader>>,
{
if self.plane.metadata().is_applicable(&prepare) {
self.plane.metadata().on_replicate(prepare).await;
} else {
self.plane.partitions().on_replicate(prepare).await;
}
self.plane.on_replicate(prepare).await;
}

pub async fn on_ack(&self, prepare_ok: Message<PrepareOkHeader>)
Expand All @@ -129,11 +208,7 @@ where
>,
M: StateMachine<Input = Message<PrepareHeader>>,
{
if self.plane.metadata().is_applicable(&prepare_ok) {
self.plane.metadata().on_ack(prepare_ok).await;
} else {
self.plane.partitions().on_ack(prepare_ok).await;
}
self.plane.on_ack(prepare_ok).await;
}

/// Drain and dispatch loopback messages for each consensus plane.
Expand Down
Loading
Loading