Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions core/consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,6 @@ iggy_common = { workspace = true }
message_bus = { workspace = true }
rand = { workspace = true }
rand_xoshiro = { workspace = true }

[dev-dependencies]
futures = { workspace = true }
76 changes: 60 additions & 16 deletions core/consensus/src/impls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ use crate::{
};
use bit_set::BitSet;
use iggy_common::header::{
Command2, ConsensusHeader, DoViewChangeHeader, PrepareHeader, PrepareOkHeader, RequestHeader,
StartViewChangeHeader, StartViewHeader,
Command2, ConsensusHeader, DoViewChangeHeader, GenericHeader, PrepareHeader, PrepareOkHeader,
RequestHeader, StartViewChangeHeader, StartViewHeader,
};
use iggy_common::message::Message;
use message_bus::IggyMessageBus;
Expand Down Expand Up @@ -435,7 +435,7 @@ where
pipeline: RefCell<P>,

message_bus: B,
// TODO: Add loopback_queue for messages to self
loopback_queue: RefCell<VecDeque<Message<GenericHeader>>>,
/// Tracks start view change messages received from all replicas (including self)
start_view_change_from_all_replicas: RefCell<BitSet<u32>>,

Expand Down Expand Up @@ -484,6 +484,7 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
last_prepare_checksum: Cell::new(0),
pipeline: RefCell::new(pipeline),
message_bus,
loopback_queue: RefCell::new(VecDeque::with_capacity(PIPELINE_PREPARE_QUEUE_MAX)),
start_view_change_from_all_replicas: RefCell::new(BitSet::with_capacity(REPLICAS_MAX)),
do_view_change_from_all_replicas: RefCell::new(dvc_quorum_array_empty()),
do_view_change_quorum: Cell::new(false),
Expand Down Expand Up @@ -621,12 +622,18 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
self.do_view_change_quorum.set(false);
}

/// Reset all view change state for a new view.
fn reset_view_change_state(&self) {
/// Wipes the per-view-change bookkeeping as the replica moves to a new view.
///
/// In order, this resets the SVC and DVC quorum state (via `reset_svc_quorum`
/// and `reset_dvc_quorum`), lowers both "sent own message" flags, and empties
/// the loopback queue.
///
/// The loopback queue is emptied because a PrepareOk still waiting there was
/// produced in the old view and points at pipeline entries that no longer
/// exist. Processing it would achieve nothing (`handle_prepare_ok` ignores
/// unknown ops), and the primary does not need its own self-ack for quorum.
pub(crate) fn reset_view_change_state(&self) {
    // Quorum trackers first, exactly as before.
    self.reset_svc_quorum();
    self.reset_dvc_quorum();

    // This replica has not yet sent its own SVC / DVC in the new view.
    self.sent_own_do_view_change.set(false);
    self.sent_own_start_view_change.set(false);

    // Drop any self-addressed messages left over from the previous view.
    self.loopback_queue.borrow_mut().clear();
}

/// Process one tick. Call this periodically (e.g., every 10ms).
Expand Down Expand Up @@ -1061,6 +1068,11 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
// Stale pipeline entries from the old view are invalid in the new view.
// Log reconciliation replays from the journal, not the pipeline.
self.pipeline.borrow_mut().clear();
// Stale PrepareOk messages from the old view must not leak into the new view.
// `reset_view_change_state` handles this for view-number advances (SVC/DVC/SV),
// but this path fires within the current view after DVC quorum -- so we clear
// the loopback queue directly.
self.loopback_queue.borrow_mut().clear();

// Update timeouts for normal primary operation
{
Expand All @@ -1079,12 +1091,10 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
}]
}

/// Handle a prepare_ok message from a follower.
/// Called on the primary when a follower acknowledges a prepare.
/// Handle a PrepareOk message from a replica.
///
/// Returns true if quorum was just reached for this op.
/// Handle a PrepareOk message. Returns true if quorum was reached.
/// Note: Caller (on_ack) should validate is_primary and status before calling.
/// Returns `true` if quorum was just reached for this op.
/// Caller (`on_ack`) should validate `is_primary` and status before calling.
pub fn handle_prepare_ok(&self, header: &PrepareOkHeader) -> bool {
assert_eq!(header.command, Command2::PrepareOk);
assert!(
Expand Down Expand Up @@ -1139,6 +1149,42 @@ impl<B: MessageBus, P: Pipeline<Entry = PipelineEntry>> VsrConsensus<B, P> {
false
}

/// Appends a self-addressed message to the back of the loopback queue so it
/// is picked up by the next loopback drain.
///
/// At present only PrepareOk messages arrive here, routed by `send_or_loopback`.
///
/// # Panics
///
/// Panics if the queue already holds `PIPELINE_PREPARE_QUEUE_MAX` (or more)
/// messages when called.
// TODO: Route SVC/DVC self-messages through loopback once VsrAction dispatch is implemented.
pub(crate) fn push_loopback(&self, message: Message<GenericHeader>) {
    // Snapshot the length under a short-lived shared borrow; the guard is
    // released before the mutable borrow below is taken.
    let queued = self.loopback_queue.borrow().len();
    assert!(
        queued < PIPELINE_PREPARE_QUEUE_MAX,
        "loopback queue overflow: {} items",
        queued
    );

    let mut queue = self.loopback_queue.borrow_mut();
    queue.push_back(message);
}

/// Moves every queued loopback message onto the end of `buf`, in queue order,
/// and leaves the loopback queue empty.
///
/// Nothing is dispatched here: the caller is responsible for handing each
/// drained message to the appropriate handler.
pub fn drain_loopback_into(&self, buf: &mut Vec<Message<GenericHeader>>) {
    // Hold the mutable borrow only for the duration of the drain.
    let mut pending = self.loopback_queue.borrow_mut();
    buf.extend(pending.drain(..));
}

/// Delivers `message` to replica `target`.
///
/// When `target` is this replica, the message is placed on the loopback queue
/// (see `push_loopback`) and the message bus is not touched. Otherwise it is
/// sent over the message bus with `send_to_replica`.
///
/// # Panics
///
/// Panics if the message bus reports an error for the send, and, for
/// self-addressed messages, under the same conditions as `push_loopback`.
pub(crate) async fn send_or_loopback(&self, target: u8, message: Message<GenericHeader>)
where
    B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
{
    // Self-addressed: short-circuit through the local queue.
    if target == self.replica {
        self.push_loopback(message);
        return;
    }

    // TODO: Propagate send errors instead of panicking; requires bus error design.
    let outcome = self.message_bus.send_to_replica(target, message).await;
    outcome.unwrap();
}

/// Returns a shared reference to the message bus stored in this instance.
pub fn message_bus(&self) -> &B {
&self.message_bus
}
Expand Down Expand Up @@ -1222,12 +1268,10 @@ where
type Sequencer = LocalSequencer;
type Pipeline = P;

// TODO(hubcio): maybe we could record the primary's own ack here
// (entry.add_ack(self.replica)) instead of round-tripping through
// the message bus via send_prepare_ok.
// This avoids serialization/queuing overhead and would also allow
// reordering to WAL-first (on_replicate before pipeline_message)
// without risking lost self-acks from dispatch timing.
// The primary's self-ack is delivered via the loopback queue
// (push_loopback / drain_loopback_into) rather than inline here,
// so that WAL persistence can happen between pipeline insertion
// and ack recording.
fn pipeline_message(&self, message: Self::Message<Self::ReplicateHeader>) {
assert!(self.is_primary(), "only primary can pipeline messages");

Expand Down
Loading
Loading