Skip to content

Commit 93ff6c9

Browse files
joostjager and claude
committed
Defer MonitorUpdatingPersister writes to flush()
Update MonitorUpdatingPersister and MonitorUpdatingPersisterAsync to queue persist operations in memory instead of writing immediately to disk. The Persist trait methods now return ChannelMonitorUpdateStatus::InProgress and the actual writes happen when flush() is called. This fixes a race condition that could cause channel force closures: previously, if the node crashed after writing channel monitors but before writing the channel manager, the monitors would be ahead of the manager on restart. By deferring monitor writes until after the channel manager is persisted (via flush()), we ensure the manager is always at least as up-to-date as the monitors. The flush() method takes an optional count parameter to flush only a specific number of queued writes. The background processor captures the queue size before persisting the channel manager, then flushes exactly that many writes afterward. This prevents flushing monitor updates that arrived after the manager state was captured. Key changes: - Add PendingWrite enum to represent queued write/remove operations - Add pending_writes queue to MonitorUpdatingPersisterAsyncInner - Add pending_write_count() and flush(count) to Persist trait and ChainMonitor - ChainMonitor::flush() calls channel_monitor_updated for each completed write - Update Persist impl to queue writes and return InProgress - Call flush() in background processor after channel manager persistence - Remove unused event_notifier from AsyncPersister Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent b4fb555 commit 93ff6c9

3 files changed

Lines changed: 257 additions & 213 deletions

File tree

lightning-background-processor/src/lib.rs

Lines changed: 35 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1152,6 +1152,11 @@ where
11521152

11531153
let mut futures = Joiner::new();
11541154

1155+
// Capture the number of pending monitor writes before persisting the channel manager.
1156+
// We'll only flush this many writes after the manager is persisted, to avoid flushing
1157+
// monitor updates that arrived after the manager state was captured.
1158+
let pending_monitor_writes = chain_monitor.pending_write_count();
1159+
11551160
if channel_manager.get_cm().get_and_clear_needs_persistence() {
11561161
log_trace!(logger, "Persisting ChannelManager...");
11571162

@@ -1349,6 +1354,14 @@ where
13491354
res?;
13501355
}
13511356

1357+
// Flush the monitor writes that were pending before we persisted the channel manager.
1358+
// Any writes that arrived after are left in the queue for the next iteration.
1359+
if pending_monitor_writes > 0 {
1360+
if let Err(e) = chain_monitor.flush(Some(pending_monitor_writes)) {
1361+
log_error!(logger, "Failed to flush chain monitor: {}", e);
1362+
}
1363+
}
1364+
13521365
match check_and_reset_sleeper(&mut last_onion_message_handler_call, || {
13531366
sleeper(ONION_MESSAGE_HANDLER_TIMER)
13541367
}) {
@@ -1413,6 +1426,12 @@ where
14131426
channel_manager.get_cm().encode(),
14141427
)
14151428
.await?;
1429+
1430+
// Flush all pending monitor writes after final channel manager persistence.
1431+
if let Err(e) = chain_monitor.flush(None) {
1432+
log_error!(logger, "Failed to flush chain monitor: {}", e);
1433+
}
1434+
14161435
if let Some(ref scorer) = scorer {
14171436
kv_store
14181437
.write(
@@ -1722,6 +1741,9 @@ impl BackgroundProcessor {
17221741
channel_manager.get_cm().timer_tick_occurred();
17231742
last_freshness_call = Instant::now();
17241743
}
1744+
// Capture the number of pending monitor writes before persisting the channel manager.
1745+
let pending_monitor_writes = chain_monitor.pending_write_count();
1746+
17251747
if channel_manager.get_cm().get_and_clear_needs_persistence() {
17261748
log_trace!(logger, "Persisting ChannelManager...");
17271749
(kv_store.write(
@@ -1733,6 +1755,13 @@ impl BackgroundProcessor {
17331755
log_trace!(logger, "Done persisting ChannelManager.");
17341756
}
17351757

1758+
// Flush the monitor writes that were pending before we persisted the channel manager.
1759+
if pending_monitor_writes > 0 {
1760+
if let Err(e) = chain_monitor.flush(Some(pending_monitor_writes)) {
1761+
log_error!(logger, "Failed to flush chain monitor: {}", e);
1762+
}
1763+
}
1764+
17361765
if let Some(liquidity_manager) = liquidity_manager.as_ref() {
17371766
log_trace!(logger, "Persisting LiquidityManager...");
17381767
let _ = liquidity_manager.get_lm().persist().map_err(|e| {
@@ -1853,6 +1882,12 @@ impl BackgroundProcessor {
18531882
CHANNEL_MANAGER_PERSISTENCE_KEY,
18541883
channel_manager.get_cm().encode(),
18551884
)?;
1885+
1886+
// Flush all pending monitor writes after final channel manager persistence.
1887+
if let Err(e) = chain_monitor.flush(None) {
1888+
log_error!(logger, "Failed to flush chain monitor: {}", e);
1889+
}
1890+
18561891
if let Some(ref scorer) = scorer {
18571892
kv_store.write(
18581893
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,

lightning/src/chain/chainmonitor.rs

Lines changed: 55 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -39,6 +39,7 @@ use crate::chain::channelmonitor::{
3939
use crate::chain::transaction::{OutPoint, TransactionData};
4040
use crate::chain::{BestBlock, ChannelMonitorUpdateStatus, Filter, WatchedOutput};
4141
use crate::events::{self, Event, EventHandler, ReplayEvent};
42+
use crate::io;
4243
use crate::ln::channel_state::ChannelDetails;
4344
#[cfg(peer_storage)]
4445
use crate::ln::msgs::PeerStorage;
@@ -198,16 +199,25 @@ pub trait Persist<ChannelSigner: EcdsaChannelSigner> {
198199
/// the monitor already exists in the archive.
199200
fn archive_persisted_channel(&self, monitor_name: MonitorName);
200201

201-
/// Fetches the set of [`ChannelMonitorUpdate`]s, previously persisted with
202-
/// [`Self::update_persisted_channel`], which have completed.
202+
/// Returns the number of pending writes in the queue.
203203
///
204-
/// Returning an update here is equivalent to calling
205-
/// [`ChainMonitor::channel_monitor_updated`]. Because of this, this method is defaulted and
206-
/// hidden in the docs.
207-
#[doc(hidden)]
208-
fn get_and_clear_completed_updates(&self) -> Vec<(ChannelId, u64)> {
209-
Vec::new()
204+
/// This can be used to capture the queue size before persisting the channel manager,
205+
/// then pass that count to [`Self::flush`] to only flush those specific updates.
206+
fn pending_write_count(&self) -> usize {
207+
0
210208
}
209+
210+
/// Flushes pending writes to the underlying storage.
211+
///
212+
/// If `count` is `Some(n)`, only the first `n` pending writes are flushed.
213+
/// If `count` is `None`, all pending writes are flushed.
214+
///
215+
/// For implementations that queue writes (returning [`ChannelMonitorUpdateStatus::InProgress`]
216+
/// from persist methods), this method should write queued data to storage.
217+
///
218+
/// Returns the list of completed updates (channel_id, update_id) on success, or an error if
219+
/// any write failed.
220+
fn flush(&self, count: Option<usize>) -> Result<Vec<(ChannelId, u64)>, io::Error>;
211221
}
212222

213223
struct MonitorHolder<ChannelSigner: EcdsaChannelSigner> {
@@ -272,7 +282,6 @@ pub struct AsyncPersister<
272282
FE::Target: FeeEstimator,
273283
{
274284
persister: MonitorUpdatingPersisterAsync<K, S, L, ES, SP, BI, FE>,
275-
event_notifier: Arc<Notifier>,
276285
}
277286

278287
impl<
@@ -320,26 +329,28 @@ where
320329
&self, monitor_name: MonitorName,
321330
monitor: &ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>,
322331
) -> ChannelMonitorUpdateStatus {
323-
let notifier = Arc::clone(&self.event_notifier);
324-
self.persister.spawn_async_persist_new_channel(monitor_name, monitor, notifier);
332+
self.persister.queue_new_channel(monitor_name, monitor);
325333
ChannelMonitorUpdateStatus::InProgress
326334
}
327335

328336
fn update_persisted_channel(
329337
&self, monitor_name: MonitorName, monitor_update: Option<&ChannelMonitorUpdate>,
330338
monitor: &ChannelMonitor<<SP::Target as SignerProvider>::EcdsaSigner>,
331339
) -> ChannelMonitorUpdateStatus {
332-
let notifier = Arc::clone(&self.event_notifier);
333-
self.persister.spawn_async_update_channel(monitor_name, monitor_update, monitor, notifier);
340+
self.persister.queue_channel_update(monitor_name, monitor_update, monitor);
334341
ChannelMonitorUpdateStatus::InProgress
335342
}
336343

337344
fn archive_persisted_channel(&self, monitor_name: MonitorName) {
338345
self.persister.spawn_async_archive_persisted_channel(monitor_name);
339346
}
340347

341-
fn get_and_clear_completed_updates(&self) -> Vec<(ChannelId, u64)> {
342-
self.persister.get_and_clear_completed_updates()
348+
fn pending_write_count(&self) -> usize {
349+
self.persister.pending_write_count()
350+
}
351+
352+
fn flush(&self, count: Option<usize>) -> Result<Vec<(ChannelId, u64)>, io::Error> {
353+
crate::util::persist::poll_sync_future(self.persister.flush(count))
343354
}
344355
}
345356

@@ -440,7 +451,6 @@ impl<
440451
persister: MonitorUpdatingPersisterAsync<K, S, L, ES, SP, T, F>, _entropy_source: ES,
441452
_our_peerstorage_encryption_key: PeerStorageKey,
442453
) -> Self {
443-
let event_notifier = Arc::new(Notifier::new());
444454
Self {
445455
monitors: RwLock::new(new_hash_map()),
446456
chain_source,
@@ -450,8 +460,8 @@ impl<
450460
_entropy_source,
451461
pending_monitor_events: Mutex::new(Vec::new()),
452462
highest_chain_height: AtomicUsize::new(0),
453-
event_notifier: Arc::clone(&event_notifier),
454-
persister: AsyncPersister { persister, event_notifier },
463+
event_notifier: Arc::new(Notifier::new()),
464+
persister: AsyncPersister { persister },
455465
pending_send_only_events: Mutex::new(Vec::new()),
456466
#[cfg(peer_storage)]
457467
our_peerstorage_encryption_key: _our_peerstorage_encryption_key,
@@ -742,6 +752,33 @@ where
742752
.collect()
743753
}
744754

755+
/// Returns the number of pending writes in the persister queue.
756+
///
757+
/// This can be used to capture the queue size before persisting the channel manager,
758+
/// then pass that count to [`Self::flush`] to only flush those specific updates.
759+
pub fn pending_write_count(&self) -> usize {
760+
self.persister.pending_write_count()
761+
}
762+
763+
/// Flushes pending writes to the underlying storage.
764+
///
765+
/// If `count` is `Some(n)`, only the first `n` pending writes are flushed.
766+
/// If `count` is `None`, all pending writes are flushed.
767+
///
768+
/// For persisters that queue writes (returning [`ChannelMonitorUpdateStatus::InProgress`]
769+
/// from persist methods), this method writes queued data to storage and signals
770+
/// completion to the channel manager via [`Self::channel_monitor_updated`].
771+
///
772+
/// Returns the list of completed updates (channel_id, update_id) on success, or an error if
773+
/// any write failed. Note that even if an error is returned, some writes may have succeeded.
774+
pub fn flush(&self, count: Option<usize>) -> Result<Vec<(ChannelId, u64)>, io::Error> {
775+
let completed = self.persister.flush(count)?;
776+
for (channel_id, update_id) in &completed {
777+
let _ = self.channel_monitor_updated(*channel_id, *update_id);
778+
}
779+
Ok(completed)
780+
}
781+
745782
#[cfg(any(test, feature = "_test_utils"))]
746783
pub fn remove_monitor(&self, channel_id: &ChannelId) -> ChannelMonitor<ChannelSigner> {
747784
self.monitors.write().unwrap().remove(channel_id).unwrap().monitor
@@ -1497,9 +1534,6 @@ where
14971534
fn release_pending_monitor_events(
14981535
&self,
14991536
) -> Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, PublicKey)> {
1500-
for (channel_id, update_id) in self.persister.get_and_clear_completed_updates() {
1501-
let _ = self.channel_monitor_updated(channel_id, update_id);
1502-
}
15031537
let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0);
15041538
for monitor_state in self.monitors.read().unwrap().values() {
15051539
let monitor_events = monitor_state.monitor.get_and_clear_pending_monitor_events();

0 commit comments

Comments
 (0)