120 changes: 114 additions & 6 deletions crates/consensus/src/host.rs
@@ -20,7 +20,7 @@ use async_trait::async_trait;
use informalsystems_malachitebft_app::streaming::StreamContent;
use informalsystems_malachitebft_app::types::ProposedValue;
use informalsystems_malachitebft_core_consensus::LocallyProposedValue;
-use informalsystems_malachitebft_core_types::{CommitCertificate, Round, Validity};
+use informalsystems_malachitebft_core_types::{CommitCertificate, Validity};
use informalsystems_malachitebft_engine::host::{HostMsg, HostRef, Next};
use informalsystems_malachitebft_sync::RawDecidedValue;
use ractor::{async_trait as ractor_async_trait, Actor, ActorProcessingErr, ActorRef};
@@ -377,6 +377,7 @@ impl Actor for CipherBftHost {
if proposal_part.first && proposal_part.last {
let height = proposal_part.height;
let round = proposal_part.round;
let valid_round = proposal_part.valid_round;
let proposer = proposal_part.proposer;
let value = ConsensusValue(proposal_part.cut);

@@ -385,15 +386,18 @@ impl Actor for CipherBftHost {
from = %from,
height = height.0,
round = %round,
valid_round = %valid_round,
proposer = %proposer,
"Assembled complete proposal from single part"
);

// Build ProposedValue for the received proposal
// Use the actual valid_round from the proposal (Round::Nil for fresh
// proposals, or the POL round for re-proposals)
let proposed_value = ProposedValue {
height,
round,
-                    valid_round: Round::Nil,
+                    valid_round,
proposer,
value,
validity: Validity::Valid,
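
The hard-coded `Round::Nil` that this hunk removes meant a re-proposal's proof-of-lock (POL) round was discarded on the receiving side. A minimal sketch of the distinction the fix preserves, using a simplified stand-in type rather than the real malachite `Round` API:

// Illustration only, not part of this diff. A simplified stand-in for the
// Round type, mirroring only the Nil / numbered-round distinction.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Round {
    Nil,       // fresh proposal: no prior proof-of-lock
    Some(u32), // re-proposal: round where 2f+1 prevotes were gathered
}

fn describe(valid_round: Round) -> &'static str {
    match valid_round {
        Round::Nil => "fresh proposal",
        Round::Some(_) => "re-proposal carrying a POL round",
    }
}

fn main() {
    assert_eq!(describe(Round::Nil), "fresh proposal");
    assert_eq!(describe(Round::Some(2)), "re-proposal carrying a POL round");
}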
@@ -754,16 +758,49 @@ impl ChannelValueBuilder {
}

/// Store a cut for consensus requests.
///
/// The cut is stored by its consensus height and can be retrieved by
/// `build_value()` when consensus requests a proposal for that height.
pub async fn store_cut(&self, height: ConsensusHeight, cut: Cut) {
// Log the cut being stored with diagnostic info
let cut_dcl_height = cut.height;

// Check for height mismatch between consensus height and DCL cut height
if height.0 != cut_dcl_height {
warn!(
"ChannelValueBuilder: Height mismatch - storing cut at consensus height {} \
but cut's DCL height is {}. This may indicate DCL/consensus synchronization issues.",
height.0, cut_dcl_height
);
}

debug!(
"ChannelValueBuilder: Storing Cut for height {} with {} cars",
height,
"ChannelValueBuilder: Storing Cut for consensus height {} (DCL height: {}) with {} cars",
height.0,
cut_dcl_height,
cut.cars.len()
);

{
let mut pending = self.pending_cuts.write().await;

// Log if we're overwriting an existing cut (shouldn't happen normally)
if pending.contains_key(&height) {
warn!(
"ChannelValueBuilder: Overwriting existing cut at height {}. \
This may indicate duplicate cut production.",
height.0
);
}

pending.insert(height, cut.clone());

// Log available heights for debugging
let available: Vec<u64> = pending.keys().map(|h| h.0).collect();
trace!(
"ChannelValueBuilder: Pending cuts available at heights: {:?}",
available
);
}

// Clean up old pending cuts (keep only last N heights based on config)
@@ -773,12 +810,30 @@
let heights: Vec<_> = pending.keys().cloned().collect();
let max_height = heights.iter().max().copied().unwrap_or(height);
let cutoff = max_height.0.saturating_sub(retention as u64);
            let count_before = pending.len();
            pending.retain(|h, _| h.0 >= cutoff);
            let removed = count_before - pending.len();
if removed > 0 {
debug!(
"ChannelValueBuilder: Cleaned up {} old cuts, retaining heights >= {}",
removed, cutoff
);
}
}

// Notify any waiters
self.cut_notify.notify_waiters();
}

/// Get a snapshot of available pending cut heights.
///
/// Useful for debugging height synchronization issues.
pub async fn available_heights(&self) -> Vec<u64> {
let pending = self.pending_cuts.read().await;
let mut heights: Vec<u64> = pending.keys().map(|h| h.0).collect();
heights.sort_unstable();
heights
}
}
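
The retention rule in `store_cut` above is small enough to check in isolation. A self-contained sketch of the same pruning logic, with `Cut` stubbed as a `String` and `retention` passed as a plain argument (the real code reads it from config):

use std::collections::HashMap;

// Sketch of the pruning rule: keep only heights within `retention` of the
// highest stored height; return how many entries were dropped.
fn prune(pending: &mut HashMap<u64, String>, retention: u64) -> usize {
    let max_height = pending.keys().max().copied().unwrap_or(0);
    let cutoff = max_height.saturating_sub(retention);
    let before = pending.len();
    pending.retain(|h, _| *h >= cutoff);
    before - pending.len()
}

fn main() {
    let mut pending: HashMap<u64, String> =
        (1..=10).map(|h| (h, format!("cut-{h}"))).collect();
    let removed = prune(&mut pending, 3);
    assert_eq!(removed, 6); // heights 1..=6 dropped, 7..=10 kept
    assert!(pending.keys().all(|h| *h >= 7));
}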

#[async_trait]
@@ -791,6 +846,7 @@ impl ValueBuilder for ChannelValueBuilder {
// Wait for a cut at this height with timeout
let timeout = Duration::from_secs(30);
let start = std::time::Instant::now();
let mut logged_waiting = false;

loop {
// Check if we have a cut
@@ -800,6 +856,11 @@
let value = ConsensusValue::from(cut.clone());
let value_id = informalsystems_malachitebft_core_types::Value::id(&value);

debug!(
"ChannelValueBuilder: Found cut for height {}, building proposal",
height.0
);

// Store by value_id for later lookup
self.cuts_by_value_id
.write()
@@ -812,16 +873,63 @@
value,
));
}

// Log available heights on first wait (helpful for debugging)
if !logged_waiting {
let available: Vec<u64> = pending.keys().map(|h| h.0).collect();
if available.is_empty() {
debug!(
"ChannelValueBuilder: Waiting for cut at height {}. \
No cuts currently available.",
height.0
);
} else {
debug!(
"ChannelValueBuilder: Waiting for cut at height {}. \
Available heights: {:?}",
height.0, available
);
}
logged_waiting = true;
}
}

// Check timeout
if start.elapsed() > timeout {
// Provide detailed error message for debugging
let available = self.available_heights().await;
let closest = if available.is_empty() {
"none".to_string()
} else {
// Find the closest available height
let closest_height = available
.iter()
.min_by_key(|h| (**h as i64 - height.0 as i64).abs())
.copied()
.unwrap_or(0);
format!("{} (available: {:?})", closest_height, available)
};

return Err(ConsensusError::Other(format!(
"Timeout: No cut available for height {} after {:?}",
height, timeout
"Timeout waiting for cut at height {} after {:?}. \
Closest available height: {}. \
This indicates DCL may be behind consensus or cuts are not being produced. \
Check DCL primary logs for batch/attestation activity.",
height.0, timeout, closest
)));
}

// Log progress periodically (every 5 seconds)
let elapsed = start.elapsed();
if elapsed.as_secs() > 0 && elapsed.as_secs().is_multiple_of(5) {
let available = self.available_heights().await;
warn!(
"ChannelValueBuilder: Still waiting for cut at height {} ({:?} elapsed). \
Available heights: {:?}",
height.0, elapsed, available
);
}

// Wait for notification or timeout
tokio::select! {
_ = self.cut_notify.notified() => {}
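
For reviewers, the control flow of `build_value` reduces to: poll a shared map, sleep on a `Notify`, and fail after a deadline. A compilable sketch of that pattern under tokio, with illustrative names and `Cut` again stubbed as a `String`; the `closest` helper mirrors the `min_by_key` diagnostic above, using `u64::abs_diff` instead of signed casts:

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{Notify, RwLock};

// Sketch of build_value's wait loop: check the map, and between checks sleep
// on the Notify (with a short cap so the deadline is re-checked regularly).
async fn wait_for(
    pending: Arc<RwLock<HashMap<u64, String>>>,
    notify: Arc<Notify>,
    height: u64,
    deadline: Duration,
) -> Result<String, String> {
    let start = tokio::time::Instant::now();
    loop {
        if let Some(cut) = pending.read().await.get(&height).cloned() {
            return Ok(cut);
        }
        if start.elapsed() > deadline {
            return Err(format!("timeout waiting for cut at height {height}"));
        }
        // Wake on the next store_cut() notification, or re-check every second.
        let _ = tokio::time::timeout(Duration::from_secs(1), notify.notified()).await;
    }
}

// Closest-height diagnostic, equivalent to the min_by_key in the timeout path.
fn closest(available: &[u64], target: u64) -> Option<u64> {
    available.iter().copied().min_by_key(|h| h.abs_diff(target))
}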
33 changes: 33 additions & 0 deletions crates/consensus/src/proposal.rs
@@ -109,6 +109,10 @@ impl MalachiteProposal<CipherBftContext> for CutProposal
pub struct CutProposalPart {
pub height: ConsensusHeight,
pub round: Round,
/// The round in which the proposer received 2f+1 prevotes for this value.
/// This is `Round::Nil` for fresh proposals, or a valid round number for
/// re-proposals (POL - Proof-of-Lock round).
pub valid_round: Round,
pub proposer: ConsensusAddress,
pub cut: Cut,
pub first: bool,
@@ -120,6 +124,8 @@ impl BorshSerialize for CutProposalPart {
// Serialize metadata
self.height.serialize(writer)?;
(self.round.as_i64() as u32).serialize(writer)?;
// Serialize valid_round: use i64 to handle Round::Nil (-1) properly
(self.valid_round.as_i64()).serialize(writer)?;
self.proposer.serialize(writer)?;

// Use bincode for Cut (contains HashMap which doesn't implement borsh)
@@ -139,6 +145,13 @@ impl BorshDeserialize for CutProposalPart {
let height = ConsensusHeight::deserialize_reader(reader)?;
let round_val: u32 = BorshDeserialize::deserialize_reader(reader)?;
let round = Round::new(round_val);
// Deserialize valid_round: stored as i64 to handle Round::Nil (-1)
let valid_round_val: i64 = BorshDeserialize::deserialize_reader(reader)?;
let valid_round = if valid_round_val < 0 {
Round::Nil
} else {
Round::new(valid_round_val as u32)
};
let proposer = ConsensusAddress::deserialize_reader(reader)?;

// Deserialize cut
@@ -152,6 +165,7 @@
Ok(Self {
height,
round,
valid_round,
proposer,
cut,
first,
@@ -166,15 +180,26 @@ impl CutProposalPart {
/// Since the Malachite StreamMessage no longer carries height/round/proposer,
/// we embed this metadata directly in the proposal part for reconstruction
/// on the receiving end.
///
/// # Arguments
///
/// * `height` - The consensus height for this proposal
/// * `round` - The round in which this proposal is being made
/// * `valid_round` - The round in which the proposer received 2f+1 prevotes
/// for this value. Use `Round::Nil` for fresh proposals (not re-proposals).
/// * `proposer` - The address of the proposing validator
/// * `cut` - The Cut data being proposed
pub fn single(
height: ConsensusHeight,
round: Round,
valid_round: Round,
proposer: ConsensusAddress,
cut: Cut,
) -> Self {
Self {
height,
round,
valid_round,
proposer,
cut,
first: true,
@@ -192,6 +217,14 @@ impl CutProposalPart {
self.round
}

/// Accessor for valid_round (POL round).
///
/// Returns `Round::Nil` for fresh proposals, or the round number where
/// the proposer received 2f+1 prevotes for this value (re-proposals).
pub fn valid_round(&self) -> Round {
self.valid_round
}

/// Accessor for proposer address
pub fn proposer(&self) -> &ConsensusAddress {
&self.proposer
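
The new serialize/deserialize pair is easiest to sanity-check as a pure `i64` mapping: `Round::Nil` encodes to `-1`, any concrete round to its number. A self-contained round-trip sketch with a simplified local `Round` (no borsh dependency; the actual code writes the `i64` through `BorshSerialize`):

// Sketch of the valid_round wire encoding introduced above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Round {
    Nil,
    Some(u32),
}

fn encode(round: Round) -> i64 {
    match round {
        Round::Nil => -1,
        Round::Some(r) => r as i64,
    }
}

fn decode(v: i64) -> Round {
    if v < 0 { Round::Nil } else { Round::Some(v as u32) }
}

fn main() {
    for r in [Round::Nil, Round::Some(0), Round::Some(7)] {
        assert_eq!(decode(encode(r)), r); // round-trips, including Nil
    }
}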
72 changes: 32 additions & 40 deletions crates/data-chain/src/attestation.rs
@@ -99,47 +99,39 @@ pub struct AggregatedAttestation {
}

impl AggregatedAttestation {
-    /// Create from individual attestations
+    /// Create from individual attestations without validator indices.
///
-    /// # Panics
-    /// Panics if attestations is empty or attestations have different car_hash
-    pub fn aggregate(attestations: &[Attestation], validator_count: usize) -> Option<Self> {
-        if attestations.is_empty() {
-            return None;
-        }
-
-        let first = &attestations[0];
-        let car_hash = first.car_hash;
-        let car_position = first.car_position;
-        let car_proposer = first.car_proposer;
-
-        // Verify all attestations are for the same Car
-        for att in attestations {
-            if att.car_hash != car_hash {
-                return None;
-            }
-        }
-
-        // Build validator bitmap and collect signatures
-        // Note: Without index mapping, we cannot populate the bitmap correctly
-        // Use aggregate_with_indices() for proper bitmap population
-        let validators = bitvec![u8, Lsb0; 0; validator_count];
-        let mut sigs: Vec<&BlsSignature> = Vec::with_capacity(attestations.len());
-
-        for att in attestations {
-            sigs.push(&att.signature);
-        }
-
-        // Aggregate signatures
-        let aggregated_signature = BlsAggregateSignature::aggregate(&sigs).ok()?;
-
-        Some(Self {
-            car_hash,
-            car_position,
-            car_proposer,
-            validators,
-            aggregated_signature,
-        })
+    /// # Deprecated
+    ///
+    /// This method is **deprecated** and will always return `None`. The method cannot
+    /// correctly populate the validator bitmap without knowing each attestation's
+    /// validator index, which means the resulting `AggregatedAttestation` would fail
+    /// verification since `verify()` relies on the bitmap to select public keys.
+    ///
+    /// Use [`aggregate_with_indices`](Self::aggregate_with_indices) instead, which
+    /// takes `(Attestation, usize)` pairs where the `usize` is the validator's index
+    /// in the validator set.
+    ///
+    /// For proposers creating self-attestations, use
+    /// [`aggregate_with_self`](Self::aggregate_with_self) which handles the proposer's
+    /// own attestation separately.
+    ///
+    /// # Returns
+    ///
+    /// Always returns `None`. Use `aggregate_with_indices()` for proper aggregation.
+    #[deprecated(
+        since = "0.1.0",
+        note = "Cannot populate bitmap without validator indices. Use aggregate_with_indices() instead."
+    )]
+    pub fn aggregate(_attestations: &[Attestation], _validator_count: usize) -> Option<Self> {
+        // This method cannot work correctly without validator index mapping.
+        // The bitmap must be populated with the correct validator indices for
+        // verify() to work, but we don't have that information here.
+        //
+        // Previously this returned an AggregatedAttestation with an all-zero bitmap,
+        // which would always fail verification. Now we return None to make the
+        // failure explicit at aggregation time rather than verification time.
+        None
+    }

/// Create from attestations with validator index mapping
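
The reasoning behind the deprecation is clearest from the verifier's side: `verify()` selects public keys by bitmap position, so an all-zero bitmap can never verify. A stand-alone sketch of the index-to-bitmap invariant that `aggregate_with_indices` must maintain (plain `Vec<bool>` instead of the crate's bitvec field; types are illustrative stand-ins, and signatures/keys are elided):

// Each signer's bit must be set at that validator's index in the set;
// out-of-range or duplicate indices are rejected.
fn build_bitmap(signer_indices: &[usize], validator_count: usize) -> Option<Vec<bool>> {
    let mut bitmap = vec![false; validator_count];
    for &i in signer_indices {
        if i >= validator_count || bitmap[i] {
            return None; // out-of-range or duplicate signer
        }
        bitmap[i] = true;
    }
    Some(bitmap)
}

fn main() {
    // With indices, verification can recover exactly which keys signed.
    let bitmap = build_bitmap(&[0, 2, 3], 4).expect("valid signer set");
    assert_eq!(bitmap, vec![true, false, true, true]);
    // The deprecated aggregate() had no indices, so its bitmap stayed all-zero
    // and verify() could never succeed, hence it now returns None up front.
    assert!(build_bitmap(&[9], 4).is_none()); // out-of-range index rejected
}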