Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 75 additions & 1 deletion crates/data-chain/src/primary/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::attestation::{AggregatedAttestation, Attestation};
use crate::batch::BatchDigest;
use crate::car::Car;
use crate::cut::Cut;
use crate::error::DclError;
use crate::messages::{DclMessage, PrimaryToWorker, WorkerToPrimary};
use crate::primary::attestation_collector::AttestationCollector;
use crate::primary::config::PrimaryConfig;
Expand Down Expand Up @@ -459,7 +460,37 @@ impl Primary {

DclMessage::CarResponse(car_opt) => {
if let Some(car) = car_opt {
self.handle_received_car(peer, car).await;
debug!(
from = %peer,
proposer = %car.proposer,
position = car.position,
"Received CarResponse for gap recovery"
);

// Clear the pending request tracker
self.state.clear_car_request(&car.proposer, car.position);

// Process the missing Car first
self.handle_received_car(peer, car.clone()).await;

// After processing, check if any queued Cars are now ready
// This handles the case where we had position 2 queued and just received position 1
let ready_cars = self.state.get_cars_ready_after_gap_filled(&car.proposer);
for ready_car in ready_cars {
debug!(
proposer = %ready_car.proposer,
position = ready_car.position,
"Processing queued Car after gap filled"
);
// Process recursively - might trigger more attestations or gap recoveries
self.handle_received_car(ready_car.proposer, ready_car)
.await;
}
} else {
debug!(
from = %peer,
"Received empty CarResponse - peer doesn't have the requested Car"
);
}
}

Expand Down Expand Up @@ -565,6 +596,49 @@ impl Primary {
);
}

Err(DclError::PositionGap {
validator,
expected,
actual,
}) => {
// Position gap detected - the Car arrived out of order
// We need to request the missing predecessor(s) and queue this Car
debug!(
from = %from,
proposer = %validator,
expected,
actual,
"Position gap detected, initiating gap recovery"
);

// Queue the out-of-order Car for later processing
if !self.state.is_awaiting_gap_sync(&validator, actual) {
self.state.queue_car_awaiting_gap(car.clone(), expected);
}

// Request missing Cars (all positions from expected to actual-1)
let missing_positions = self.state.get_missing_positions(&validator, actual);
for pos in missing_positions {
if !self.state.is_car_request_pending(&validator, pos) {
debug!(
validator = %validator,
position = pos,
"Sending CarRequest for missing position"
);

// Track the pending request
self.state.track_car_request(validator, pos);

// Send CarRequest to all peers (one of them should have it)
let request = DclMessage::CarRequest {
validator,
position: pos,
};
self.network.broadcast(&request).await;
}
}
}

Err(e) => {
warn!(
from = %from,
Expand Down
155 changes: 154 additions & 1 deletion crates/data-chain/src/primary/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,22 @@ pub struct CarAwaitingBatches {
pub requested_at: Instant,
}

/// Car awaiting gap synchronization (received out of order).
///
/// When we receive a Car at position N but expect position M (where M < N),
/// we queue the Car here and request the missing predecessors via CarRequest.
///
/// Entries are stored in `PrimaryState::cars_awaiting_gap_sync`, keyed by the
/// Car's `(proposer, position)`, and are dropped by the stale-data cleanup once
/// `received_at` is older than the cleanup timeout.
#[derive(Clone, Debug)]
pub struct CarAwaitingGapSync {
/// The Car that arrived out of order; returned to the caller for
/// processing once its position becomes the expected one.
pub car: Car,
/// The position that was expected when the gap was detected
/// (i.e. the first missing position; the gap starts here).
pub expected_position: u64,
/// When this out-of-order Car was first queued; used to expire stale entries.
pub received_at: Instant,
/// Number of retry attempts for requesting missing Cars.
/// Initialised to 0 when the Car is queued.
// NOTE(review): no visible code increments this counter yet - confirm whether
// a retry path elsewhere is expected to maintain it.
pub retry_count: u32,
}

/// Pipeline stage for tracking consensus progress (T111)
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PipelineStage {
Expand Down Expand Up @@ -90,6 +106,12 @@ pub struct PrimaryState {
pub pending_cars: HashMap<Hash, PendingCar>,
/// Cars awaiting batch synchronization (keyed by Car hash)
pub cars_awaiting_batches: HashMap<Hash, CarAwaitingBatches>,
/// Cars awaiting gap synchronization (keyed by (validator, position))
/// These are Cars received out of order, waiting for predecessors
pub cars_awaiting_gap_sync: HashMap<(ValidatorId, u64), CarAwaitingGapSync>,
/// Pending CarRequest tracking (validator, position) -> request time
/// Used to avoid sending duplicate requests and implement backoff
pub pending_car_requests: HashMap<(ValidatorId, u64), Instant>,
/// Highest attested Car per validator (ready for Cut inclusion)
/// Stores the Car and its aggregated attestation (with aggregated BLS signature)
pub attested_cars: HashMap<ValidatorId, (Car, AggregatedAttestation)>,
Expand Down Expand Up @@ -134,6 +156,8 @@ impl PrimaryState {
last_seen_car_hashes: HashMap::new(),
pending_cars: HashMap::new(),
cars_awaiting_batches: HashMap::new(),
cars_awaiting_gap_sync: HashMap::new(),
pending_car_requests: HashMap::new(),
attested_cars: HashMap::new(),
last_attested_idx: 0,
equivocations: HashMap::new(),
Expand Down Expand Up @@ -335,6 +359,132 @@ impl PrimaryState {
self.cars_awaiting_batches.contains_key(car_hash)
}

// =========================================================
// Gap recovery for out-of-order Cars (Issue #106)
// =========================================================

/// Queue a Car that arrived out of order (position gap detected).
///
/// When we receive a Car at position N but expect position M < N, the Car is
/// parked here until the missing predecessors have been obtained via
/// CarRequest/CarResponse.
///
/// The entry is keyed by `(car.proposer, car.position)`. If a Car is already
/// queued under that key the call is a no-op: the existing entry (including
/// its original `received_at` timestamp) is kept and `car` is dropped.
///
/// # Arguments
/// * `car` - the out-of-order Car to park.
/// * `expected_position` - the position that was expected when the gap was
///   detected (the first missing position).
pub fn queue_car_awaiting_gap(&mut self, car: Car, expected_position: u64) {
    // The entry API performs a single hash lookup; the closure (and with it
    // the timestamp) is only evaluated when the slot is vacant, so an
    // already-queued Car is never overwritten or re-timestamped.
    self.cars_awaiting_gap_sync
        .entry((car.proposer, car.position))
        .or_insert_with(|| CarAwaitingGapSync {
            car,
            expected_position,
            received_at: Instant::now(),
            retry_count: 0,
        });
}

/// Report whether a Car from `validator` at `position` is currently parked
/// in the gap-sync queue.
pub fn is_awaiting_gap_sync(&self, validator: &ValidatorId, position: u64) -> bool {
    let key = (*validator, position);
    self.cars_awaiting_gap_sync.contains_key(&key)
}

/// Take the queued Car (if any) that became processable after a gap was filled.
///
/// After a missing Car has been processed, the validator's expected position
/// advances. This method looks up the queued Car whose position equals the
/// current expected position for `validator`, removes it from the gap-sync
/// queue and hands it back to the caller.
///
/// Because the queue is keyed by `(validator, position)`, at most one queued
/// Car can match the expected position, so the returned vector contains zero
/// or one element. Cars queued at later positions stay in the queue until a
/// subsequent call finds them at the then-expected position.
///
/// # Returns
/// A vector holding the ready Car, or an empty vector when nothing is queued
/// at the expected position.
pub fn get_cars_ready_after_gap_filled(&mut self, validator: &ValidatorId) -> Vec<Car> {
    let expected = self.expected_position(validator);

    // Direct keyed removal: no full-map scan and no clone of the Car, which
    // is moved out of the queue entry instead.
    self.cars_awaiting_gap_sync
        .remove(&(*validator, expected))
        .map(|awaiting| awaiting.car)
        .into_iter()
        .collect()
}

/// Record that a CarRequest for `(validator, position)` has been issued.
///
/// The request is stamped with the current time; recording the same pair
/// again replaces the previous timestamp.
pub fn track_car_request(&mut self, validator: ValidatorId, position: u64) {
    let key = (validator, position);
    let requested_at = Instant::now();
    self.pending_car_requests.insert(key, requested_at);
}

/// Report whether a CarRequest for `(validator, position)` is currently
/// being tracked as pending.
pub fn is_car_request_pending(&self, validator: &ValidatorId, position: u64) -> bool {
    let key = (*validator, position);
    self.pending_car_requests.contains_key(&key)
}

/// Stop tracking the CarRequest for `(validator, position)`, e.g. once the
/// matching response has arrived. Does nothing if no such request is tracked.
pub fn clear_car_request(&mut self, validator: &ValidatorId, position: u64) {
    let key = (*validator, position);
    self.pending_car_requests.remove(&key);
}

/// Collect the pending CarRequests that have outlived `timeout`.
///
/// A request counts as stale when strictly more than `timeout` has elapsed
/// since it was tracked. The pending-request table itself is not modified.
///
/// # Returns
/// The `(validator, position)` pairs of all stale requests, in no
/// particular order.
pub fn get_stale_car_requests(&self, timeout: std::time::Duration) -> Vec<(ValidatorId, u64)> {
    // Sample the clock once so every entry is judged against the same instant.
    let now = Instant::now();
    let mut stale = Vec::new();
    for (&(validator, position), &requested_at) in &self.pending_car_requests {
        if now.duration_since(requested_at) > timeout {
            stale.push((validator, position));
        }
    }
    stale
}

/// List the positions of `validator` that still need a CarRequest.
///
/// Covers every position from the validator's currently expected position up
/// to (but excluding) `received_position`, skipping positions for which a
/// request is already pending. Yields an empty vector when
/// `received_position` does not lie beyond the expected position.
///
/// # Returns
/// The positions to request, in ascending order.
pub fn get_missing_positions(
    &self,
    validator: &ValidatorId,
    received_position: u64,
) -> Vec<u64> {
    let expected = self.expected_position(validator);
    let mut missing = Vec::new();
    if received_position > expected {
        for pos in expected..received_position {
            // Only positions without an in-flight request are reported.
            if !self.pending_car_requests.contains_key(&(*validator, pos)) {
                missing.push(pos);
            }
        }
    }
    missing
}

/// Drop gap-sync bookkeeping that is at least `timeout` old.
///
/// Removes both queued out-of-order Cars (by their `received_at` time) and
/// tracked CarRequests (by their request time). Entries strictly younger
/// than `timeout` are kept.
fn cleanup_stale_gap_sync_data(&mut self, timeout: std::time::Duration) {
    // One clock sample shared by both sweeps.
    let now = Instant::now();
    let is_fresh = |stamp: Instant| now.duration_since(stamp) < timeout;

    // Expire tracked CarRequests.
    self.pending_car_requests
        .retain(|_, requested_at| is_fresh(*requested_at));

    // Expire Cars parked while waiting for their predecessors.
    self.cars_awaiting_gap_sync
        .retain(|_, awaiting| is_fresh(awaiting.received_at));
}

// =========================================================
// Pipeline state management (T111-T113)
// =========================================================
Expand Down Expand Up @@ -516,7 +666,7 @@ impl PrimaryState {
referenced
}

/// Cleanup stale pending data (pending cars, cars awaiting batches).
/// Cleanup stale pending data (pending cars, cars awaiting batches, gap sync).
///
/// Removes entries that have been pending for too long (likely orphaned).
/// Uses a timeout-based approach: entries older than 5 minutes are removed.
Expand All @@ -532,6 +682,9 @@ impl PrimaryState {
// Cleanup old cars awaiting batches
self.cars_awaiting_batches
.retain(|_, awaiting| awaiting.requested_at.elapsed() < STALE_TIMEOUT);

// Cleanup old gap sync data
self.cleanup_stale_gap_sync_data(STALE_TIMEOUT);
}

/// Perform full memory cleanup.
Expand Down