1 change: 1 addition & 0 deletions fuzz/src/chanmon_consistency.rs
@@ -285,6 +285,7 @@ impl TestChainMonitor {
Arc::clone(&persister),
Arc::clone(&keys),
keys.get_peer_storage_key(),
false,
)),
logger,
keys,
1 change: 1 addition & 0 deletions fuzz/src/full_stack.rs
@@ -599,6 +599,7 @@ pub fn do_test(mut data: &[u8], logger: &Arc<dyn Logger>) {
Arc::new(TestPersister { update_ret: Mutex::new(ChannelMonitorUpdateStatus::Completed) }),
Arc::clone(&keys_manager),
keys_manager.get_peer_storage_key(),
false,
));

let network = Network::Bitcoin;
1 change: 1 addition & 0 deletions fuzz/src/lsps_message.rs
@@ -59,6 +59,7 @@ pub fn do_test(data: &[u8]) {
Arc::clone(&kv_store),
Arc::clone(&keys_manager),
keys_manager.get_peer_storage_key(),
false,
));
let best_block = BestBlock::from_network(network);
let params = ChainParameters { network, best_block };
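The new trailing boolean passed to the monitor constructor in the fuzz targets above (`false`, with the background-processor tests below passing `true`) presumably opts the monitor into deferring its writes so the background processor can drain them explicitly via the `pending_operation_count()` / `flush()` calls added in the next file. A minimal, self-contained sketch of that capture-then-flush ordering — toy types only, not the actual LDK `ChainMonitor` API:

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

/// Toy stand-in for a monitor whose writes are queued instead of applied immediately.
struct DeferredWrites {
    queue: Mutex<VecDeque<String>>,
}

impl DeferredWrites {
    fn new() -> Self {
        Self { queue: Mutex::new(VecDeque::new()) }
    }

    /// Queue a write instead of persisting it right away.
    fn queue_write(&self, op: &str) {
        self.queue.lock().unwrap().push_back(op.to_string());
    }

    /// Number of writes currently queued (mirrors `pending_operation_count()` in the diff).
    fn pending_operation_count(&self) -> usize {
        self.queue.lock().unwrap().len()
    }

    /// Drain at most `count` writes (mirrors `flush(count, &logger)` in the diff). Writes
    /// queued after the count was captured stay queued for the next iteration.
    fn flush(&self, count: usize) {
        let mut queue = self.queue.lock().unwrap();
        for _ in 0..count {
            if let Some(op) = queue.pop_front() {
                println!("persisting deferred write: {op}");
            }
        }
    }
}

fn main() {
    let monitor_writes = DeferredWrites::new();
    monitor_writes.queue_write("monitor update A");
    monitor_writes.queue_write("monitor update B");

    // Capture the pending count *before* persisting the manager snapshot...
    let pending = monitor_writes.pending_operation_count();
    println!("persisting manager snapshot");
    // ...a write arriving during or after the snapshot is not part of this flush...
    monitor_writes.queue_write("monitor update C");
    // ...so flushing exactly `pending` writes leaves update C for the next iteration.
    monitor_writes.flush(pending);
    assert_eq!(monitor_writes.pending_operation_count(), 1);
}
```

In the change below, the same snapshot-then-flush ordering is repeated once more after the processor's main loop exits, so the final ChannelManager write is also followed by a flush of the monitor writes that preceded it.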
72 changes: 71 additions & 1 deletion lightning-background-processor/src/lib.rs
@@ -1120,6 +1120,10 @@ where

let mut futures = Joiner::new();

// Capture the pending count before persisting. Only this many writes will be
// flushed afterward, so that updates arriving after the persist aren't included.
let pending_monitor_writes = chain_monitor.get_cm().pending_operation_count();

if channel_manager.get_cm().get_and_clear_needs_persistence() {
log_trace!(logger, "Persisting ChannelManager...");

@@ -1317,6 +1321,10 @@ where
res?;
}

// Flush monitor writes that were pending before we persisted. New updates that
// arrived afterwards are left for the next iteration.
chain_monitor.get_cm().flush(pending_monitor_writes, &logger);

match check_and_reset_sleeper(&mut last_onion_message_handler_call, || {
sleeper(ONION_MESSAGE_HANDLER_TIMER)
}) {
@@ -1373,6 +1381,7 @@ where
// After we exit, ensure we persist the ChannelManager one final time - this avoids
// some races where users quit while channel updates were in-flight, with
// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
let pending_monitor_writes = chain_monitor.get_cm().pending_operation_count();
kv_store
.write(
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
@@ -1381,6 +1390,10 @@
channel_manager.get_cm().encode(),
)
.await?;

// Flush monitor writes that were pending before final persistence.
chain_monitor.get_cm().flush(pending_monitor_writes, &logger);

if let Some(ref scorer) = scorer {
kv_store
.write(
@@ -1684,6 +1697,11 @@ impl BackgroundProcessor {
channel_manager.get_cm().timer_tick_occurred();
last_freshness_call = Instant::now();
}

// Capture the pending count before persisting. Only this many writes will be
// flushed afterward, so that updates arriving after the persist aren't included.
let pending_monitor_writes = chain_monitor.get_cm().pending_operation_count();

if channel_manager.get_cm().get_and_clear_needs_persistence() {
log_trace!(logger, "Persisting ChannelManager...");
(kv_store.write(
@@ -1695,6 +1713,10 @@
log_trace!(logger, "Done persisting ChannelManager.");
}

// Flush monitor writes that were pending before we persisted. New updates that
// arrived afterwards are left for the next iteration.
chain_monitor.get_cm().flush(pending_monitor_writes, &logger);

if let Some(liquidity_manager) = liquidity_manager.as_ref() {
log_trace!(logger, "Persisting LiquidityManager...");
let _ = liquidity_manager.get_lm().persist().map_err(|e| {
@@ -1809,12 +1831,17 @@ impl BackgroundProcessor {
// After we exit, ensure we persist the ChannelManager one final time - this avoids
// some races where users quit while channel updates were in-flight, with
// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
let pending_monitor_writes = chain_monitor.get_cm().pending_operation_count();
kv_store.write(
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
CHANNEL_MANAGER_PERSISTENCE_KEY,
channel_manager.get_cm().encode(),
)?;

// Flush monitor writes that were pending before final persistence.
chain_monitor.get_cm().flush(pending_monitor_writes, &logger);

if let Some(ref scorer) = scorer {
kv_store.write(
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
@@ -1896,9 +1923,10 @@ mod tests {
use bitcoin::transaction::{Transaction, TxOut};
use bitcoin::{Amount, ScriptBuf, Txid};
use core::sync::atomic::{AtomicBool, Ordering};
use lightning::chain::chainmonitor;
use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
use lightning::chain::transaction::OutPoint;
use lightning::chain::{chainmonitor, BestBlock, Confirm, Filter};
use lightning::chain::{BestBlock, Confirm, Filter};
use lightning::events::{Event, PathFailure, ReplayEvent};
use lightning::ln::channelmanager;
use lightning::ln::channelmanager::{
@@ -2444,6 +2472,7 @@ mod tests {
Arc::clone(&kv_store),
Arc::clone(&keys_manager),
keys_manager.get_peer_storage_key(),
true,
));
let best_block = BestBlock::from_network(network);
let params = ChainParameters { network, best_block };
@@ -2565,6 +2594,8 @@ mod tests {
(persist_dir, nodes)
}

/// Opens a channel between two nodes without a running `BackgroundProcessor`,
/// so deferred monitor operations are flushed manually at each step.
macro_rules! open_channel {
($node_a: expr, $node_b: expr, $channel_value: expr) => {{
begin_open_channel!($node_a, $node_b, $channel_value);
@@ -2580,19 +2611,31 @@
tx.clone(),
)
.unwrap();
// funding_transaction_generated does not call watch_channel, so no
// deferred op is queued and FundingCreated is available immediately.
let msg_a = get_event_msg!(
$node_a,
MessageSendEvent::SendFundingCreated,
$node_b.node.get_our_node_id()
);
$node_b.node.handle_funding_created($node_a.node.get_our_node_id(), &msg_a);
// Flush node_b's new monitor (watch_channel) so it releases the
// FundingSigned message.
$node_b
.chain_monitor
.flush($node_b.chain_monitor.pending_operation_count(), &$node_b.logger);
get_event!($node_b, Event::ChannelPending);
let msg_b = get_event_msg!(
$node_b,
MessageSendEvent::SendFundingSigned,
$node_a.node.get_our_node_id()
);
$node_a.node.handle_funding_signed($node_b.node.get_our_node_id(), &msg_b);
// Flush node_a's new monitor (watch_channel) queued by
// handle_funding_signed.
$node_a
.chain_monitor
.flush($node_a.chain_monitor.pending_operation_count(), &$node_a.logger);
get_event!($node_a, Event::ChannelPending);
tx
}};
@@ -2699,6 +2742,20 @@ mod tests {
confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
}

/// Waits until the background processor has flushed all pending deferred monitor
/// operations for the given node. Panics if the pending count does not reach zero
/// within `EVENT_DEADLINE`.
fn wait_for_flushed(chain_monitor: &ChainMonitor) {
let start = std::time::Instant::now();
while chain_monitor.pending_operation_count() > 0 {
assert!(
start.elapsed() < EVENT_DEADLINE,
"Pending monitor operations were not flushed within deadline"
);
std::thread::sleep(Duration::from_millis(10));
}
}
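The helper above busy-polls the pending count against a fixed deadline. The same poll-with-deadline shape generalizes to any condition; a small sketch with a hypothetical `wait_until` helper (not part of this change):

```rust
use std::time::{Duration, Instant};

/// Polls `cond` until it returns true, panicking if `deadline` elapses first.
fn wait_until(mut cond: impl FnMut() -> bool, deadline: Duration, what: &str) {
    let start = Instant::now();
    while !cond() {
        assert!(start.elapsed() < deadline, "{what} not satisfied within deadline");
        std::thread::sleep(Duration::from_millis(10));
    }
}

fn main() {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    // Example: wait for a pending-operation counter drained by another thread.
    let pending = Arc::new(AtomicUsize::new(3));
    let worker = Arc::clone(&pending);
    std::thread::spawn(move || {
        for _ in 0..3 {
            std::thread::sleep(Duration::from_millis(20));
            worker.fetch_sub(1, Ordering::SeqCst);
        }
    });
    wait_until(|| pending.load(Ordering::SeqCst) == 0, Duration::from_secs(1), "pending ops");
}
```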

#[test]
fn test_background_processor() {
// Test that when a new channel is created, the ChannelManager needs to be re-persisted with
@@ -3039,11 +3096,21 @@
.node
.funding_transaction_generated(temporary_channel_id, node_1_id, funding_tx.clone())
.unwrap();
// funding_transaction_generated does not call watch_channel, so no deferred op is
// queued and the FundingCreated message is available immediately.
let msg_0 = get_event_msg!(nodes[0], MessageSendEvent::SendFundingCreated, node_1_id);
nodes[1].node.handle_funding_created(node_0_id, &msg_0);
// Node 1 has no bg processor; flush its new monitor (watch_channel) manually so
// events and FundingSigned are released.
nodes[1]
.chain_monitor
.flush(nodes[1].chain_monitor.pending_operation_count(), &nodes[1].logger);
get_event!(nodes[1], Event::ChannelPending);
let msg_1 = get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, node_0_id);
nodes[0].node.handle_funding_signed(node_1_id, &msg_1);
// Wait for the bg processor to flush the new monitor (watch_channel) queued by
// handle_funding_signed.
wait_for_flushed(&nodes[0].chain_monitor);
channel_pending_recv
.recv_timeout(EVENT_DEADLINE)
.expect("ChannelPending not handled within deadline");
@@ -3104,6 +3171,9 @@
error_message.to_string(),
)
.unwrap();
// Wait for the bg processor to flush the monitor update triggered by force close
// so the commitment tx is broadcast.
wait_for_flushed(&nodes[0].chain_monitor);
let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
