Commit c6bb096 (1 parent: bacd80e)

Prefer legacy forward maps on manager read

We are working on removing the requirement of regularly persisting the ChannelManager. As part of that effort, a recent PR (see cb398f6 and its parent commits) began reconstructing the manager's forwards maps from Channel data on startup. At the time, we implemented ChannelManager::read to prefer the newly reconstructed maps, partly to ensure test coverage of the new maps' usage. That choice required a lot of code to deduplicate HTLCs that were also present in the old maps, to avoid redundant HTLC handling and duplicate forwards, adding extra complexity. Instead, prefer the old maps when they are present (which, for now, will always be the case in production), but skip writing the legacy maps in test builds so tests always exercise the new paths.

1 file changed: lightning/src/ln/channelmanager.rs (+91 −175 lines)
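The heart of the change is a compile-time flag: production builds keep writing (and, on read, preferring) the legacy maps, while test builds skip writing them, forcing every test through the reconstruction paths. A minimal standalone sketch of the cfg pattern (illustrative only; the real flag is a local in `ChannelManager::write`, as the second hunk below shows):

```rust
// Sketch of the cfg-gating used in this commit: the flag is false in
// release builds and true in test builds, flipping which serialization
// path runs without any runtime configuration.
#[cfg(not(any(test, feature = "_test_utils")))]
const RECONSTRUCT_MANAGER_FROM_MONITORS: bool = false;
#[cfg(any(test, feature = "_test_utils"))]
const RECONSTRUCT_MANAGER_FROM_MONITORS: bool = true;

fn main() {
    if RECONSTRUCT_MANAGER_FROM_MONITORS {
        // Test builds: legacy maps are elided, so reads must reconstruct.
        println!("legacy maps elided; state rebuilt from Channel data on read");
    } else {
        // Production builds: keep persisting the legacy maps as before.
        println!("legacy maps written and preferred on read");
    }
}
```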
@@ -11758,6 +11758,10 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
 
     if !new_intercept_events.is_empty() {
         let mut events = self.pending_events.lock().unwrap();
+        // It's possible we processed this intercept forward, generated an event, then re-processed
+        // it here after restart, in which case the intercept event should not be pushed
+        // redundantly.
+        new_intercept_events.retain(|ev| !events.contains(ev));
         events.append(&mut new_intercept_events);
     }
 }
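This first hunk prevents re-queuing an `Event::HTLCIntercepted` that was already generated before a restart. The mechanism is just `retain` + `contains` against the already-queued events; a tiny self-contained illustration (the `append_deduped` helper is hypothetical, not LDK API):

```rust
// Drop any newly generated event that is already queued, then append
// the remainder -- the same retain-based dedup as in the hunk above.
fn append_deduped<T: PartialEq>(events: &mut Vec<T>, new_events: &mut Vec<T>) {
    new_events.retain(|ev| !events.contains(ev));
    events.append(new_events);
}

fn main() {
    let mut queued = vec!["intercept-1", "claim-2"];
    let mut new = vec!["intercept-1", "intercept-3"];
    append_deduped(&mut queued, &mut new);
    // "intercept-1" was already queued and is not pushed redundantly.
    assert_eq!(queued, vec!["intercept-1", "claim-2", "intercept-3"]);
}
```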
@@ -16681,7 +16685,18 @@ where
         }
     }
 
-    {
+    // In LDK 0.2 and below, the `ChannelManager` would track all payments and HTLCs internally and
+    // persist that state, relying on it being up-to-date on restart. Newer versions are moving
+    // towards reducing this reliance on regular persistence of the `ChannelManager`, and instead
+    // reconstruct HTLC/payment state based on `Channel{Monitor}` data if
+    // `reconstruct_manager_from_monitors` is set on read. In tests, we want to always use the new
+    // codepaths.
+    #[cfg(not(any(test, feature = "_test_utils")))]
+    let reconstruct_manager_from_monitors = false;
+    #[cfg(any(test, feature = "_test_utils"))]
+    let reconstruct_manager_from_monitors = true;
+
+    if !reconstruct_manager_from_monitors {
         let forward_htlcs = self.forward_htlcs.lock().unwrap();
         (forward_htlcs.len() as u64).write(writer)?;
         for (short_channel_id, pending_forwards) in forward_htlcs.iter() {
@@ -16691,12 +16706,16 @@ where
                 forward.write(writer)?;
             }
         }
+    } else {
+        0u64.write(writer)?;
     }
 
     let mut decode_update_add_htlcs_opt = None;
-    let decode_update_add_htlcs = self.decode_update_add_htlcs.lock().unwrap();
-    if !decode_update_add_htlcs.is_empty() {
-        decode_update_add_htlcs_opt = Some(decode_update_add_htlcs);
+    if !reconstruct_manager_from_monitors {
+        let decode_update_add_htlcs = self.decode_update_add_htlcs.lock().unwrap();
+        if !decode_update_add_htlcs.is_empty() {
+            decode_update_add_htlcs_opt = Some(decode_update_add_htlcs);
+        }
     }
 
     let claimable_payments = self.claimable_payments.lock().unwrap();
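Each legacy map gets the same write-side treatment: serialize as before in production, write an empty map under the flag so the on-disk format stays parseable. A hypothetical simplification (raw bytes stand in for LDK's `HTLCForwardInfo` serialization):

```rust
use std::collections::HashMap;
use std::io::{self, Write};

// Simplified model of the gated legacy-map write: when the flag is set,
// an empty map (length 0) is written so readers see a valid stream and
// fall back to reconstruction from Channel data.
fn write_forward_htlcs<W: Write>(
    writer: &mut W, forward_htlcs: &HashMap<u64, Vec<u8>>, reconstruct_from_monitors: bool,
) -> io::Result<()> {
    if !reconstruct_from_monitors {
        writer.write_all(&(forward_htlcs.len() as u64).to_be_bytes())?;
        for (short_channel_id, pending_forwards) in forward_htlcs.iter() {
            writer.write_all(&short_channel_id.to_be_bytes())?;
            writer.write_all(&(pending_forwards.len() as u64).to_be_bytes())?;
            writer.write_all(pending_forwards)?;
        }
    } else {
        writer.write_all(&0u64.to_be_bytes())?;
    }
    Ok(())
}

fn main() -> io::Result<()> {
    let mut buf = Vec::new();
    let map = HashMap::from([(42u64, vec![1, 2, 3])]);
    write_forward_htlcs(&mut buf, &map, true)?;
    assert_eq!(buf, 0u64.to_be_bytes()); // empty map written in "test" mode
    Ok(())
}
```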
@@ -16842,9 +16861,11 @@ where
     }
 
     let mut pending_intercepted_htlcs = None;
-    let our_pending_intercepts = self.pending_intercepted_htlcs.lock().unwrap();
-    if our_pending_intercepts.len() != 0 {
-        pending_intercepted_htlcs = Some(our_pending_intercepts);
+    if !reconstruct_manager_from_monitors {
+        let our_pending_intercepts = self.pending_intercepted_htlcs.lock().unwrap();
+        if our_pending_intercepts.len() != 0 {
+            pending_intercepted_htlcs = Some(our_pending_intercepts);
+        }
     }
 
     let mut pending_claiming_payments = Some(&claimable_payments.pending_claiming_payments);
@@ -16885,6 +16906,7 @@ where
         (17, in_flight_monitor_updates, option),
         (19, peer_storage_dir, optional_vec),
         (21, WithoutLength(&self.flow.writeable_async_receive_offer_cache()), required),
+        (23, reconstruct_manager_from_monitors, required),
     });
 
     // Remove the SpliceFailed events added earlier.
@@ -17597,9 +17619,10 @@ where
         };
     }
 
-    // Some maps are read but may no longer be used because we attempt to rebuild the pending HTLC
-    // set from the `Channel{Monitor}`s instead, as a step towards removing the requirement of
-    // regularly persisting the `ChannelManager`.
+    // In LDK versions >0.2, we are taking steps to remove the requirement of regularly persisting
+    // the `ChannelManager`. To that end, if `reconstruct_manager_from_monitors` is set below, we
+    // will rebuild the pending HTLC set using data from the `Channel{Monitor}`s instead and ignore
+    // these legacy maps.
     let mut pending_intercepted_htlcs_legacy: Option<HashMap<InterceptId, PendingAddHTLCInfo>> =
         None;
     let mut decode_update_add_htlcs_legacy: Option<HashMap<u64, Vec<msgs::UpdateAddHTLC>>> =
@@ -17629,6 +17652,7 @@ where
     let mut inbound_payment_id_secret = None;
     let mut peer_storage_dir: Option<Vec<(PublicKey, Vec<u8>)>> = None;
     let mut async_receive_offer_cache: AsyncReceiveOfferCache = AsyncReceiveOfferCache::new();
+    let mut reconstruct_manager_from_monitors = false;
     read_tlv_fields!(reader, {
         (1, pending_outbound_payments_no_retry, option),
         (2, pending_intercepted_htlcs_legacy, option),
@@ -17647,6 +17671,7 @@ where
         (17, in_flight_monitor_updates, option),
         (19, peer_storage_dir, optional_vec),
         (21, async_receive_offer_cache, (default_value, async_receive_offer_cache)),
+        (23, reconstruct_manager_from_monitors, (default_value, false)),
     });
     let mut decode_update_add_htlcs_legacy =
         decode_update_add_htlcs_legacy.unwrap_or_else(|| new_hash_map());
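The flag travels as TLV type 23 with `(default_value, false)` on read. Odd TLV types are optional under LDK's serialization convention, so older readers should skip the unknown field, and streams written by older versions leave the flag unset so the legacy maps stay preferred. A toy model of the default behavior (not the real macro internals):

```rust
// An absent TLV field (e.g. a manager serialized by an older LDK, which
// never wrote type 23) leaves the flag at its default of false.
fn read_reconstruct_flag(tlv_value: Option<bool>) -> bool {
    tlv_value.unwrap_or(false)
}

fn main() {
    assert!(!read_reconstruct_flag(None)); // old serialization: prefer legacy maps
    assert!(read_reconstruct_flag(Some(true))); // test-build serialization
}
```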
@@ -17964,18 +17989,20 @@ where
     let mut peer_state_lock = peer_state_mtx.lock().unwrap();
     let peer_state = &mut *peer_state_lock;
     is_channel_closed = !peer_state.channel_by_id.contains_key(channel_id);
-    if let Some(chan) = peer_state.channel_by_id.get(channel_id) {
-        if let Some(funded_chan) = chan.as_funded() {
-            let inbound_committed_update_adds =
-                funded_chan.get_inbound_committed_update_adds();
-            if !inbound_committed_update_adds.is_empty() {
-                // Reconstruct `ChannelManager::decode_update_add_htlcs` from the serialized
-                // `Channel`, as part of removing the requirement to regularly persist the
-                // `ChannelManager`.
-                decode_update_add_htlcs.insert(
-                    funded_chan.context.outbound_scid_alias(),
-                    inbound_committed_update_adds,
-                );
+    if reconstruct_manager_from_monitors {
+        if let Some(chan) = peer_state.channel_by_id.get(channel_id) {
+            if let Some(funded_chan) = chan.as_funded() {
+                let inbound_committed_update_adds =
+                    funded_chan.get_inbound_committed_update_adds();
+                if !inbound_committed_update_adds.is_empty() {
+                    // Reconstruct `ChannelManager::decode_update_add_htlcs` from the serialized
+                    // `Channel`, as part of removing the requirement to regularly persist the
+                    // `ChannelManager`.
+                    decode_update_add_htlcs.insert(
+                        funded_chan.context.outbound_scid_alias(),
+                        inbound_committed_update_adds,
+                    );
+                }
             }
         }
     }
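When the flag is set, `decode_update_add_htlcs` is rebuilt from each funded channel's committed inbound update_adds instead of the serialized map. A standalone sketch with hypothetical simplified types (plain HTLC ids stand in for `msgs::UpdateAddHTLC`):

```rust
use std::collections::HashMap;

// Hypothetical stand-in for a funded channel's relevant state.
struct FundedChannel {
    outbound_scid_alias: u64,
    inbound_committed_update_adds: Vec<u64>, // HTLC ids
}

// Collect each channel's committed inbound update_adds, keyed by the
// channel's outbound SCID alias -- only when reconstruction is enabled.
fn reconstruct_decode_update_add_htlcs(
    channels: &[FundedChannel], reconstruct_from_monitors: bool,
) -> HashMap<u64, Vec<u64>> {
    let mut map = HashMap::new();
    if reconstruct_from_monitors {
        for chan in channels {
            if !chan.inbound_committed_update_adds.is_empty() {
                map.insert(chan.outbound_scid_alias, chan.inbound_committed_update_adds.clone());
            }
        }
    }
    map
}

fn main() {
    let chans = [FundedChannel { outbound_scid_alias: 7, inbound_committed_update_adds: vec![0, 1] }];
    assert!(reconstruct_decode_update_add_htlcs(&chans, false).is_empty());
    assert_eq!(reconstruct_decode_update_add_htlcs(&chans, true)[&7], vec![0, 1]);
}
```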
@@ -18030,15 +18057,18 @@ where
         info.prev_funding_outpoint == prev_hop_data.outpoint
             && info.prev_htlc_id == prev_hop_data.htlc_id
     };
-    // We always add all inbound committed HTLCs to `decode_update_add_htlcs` in the above
-    // loop, but we need to prune from those added HTLCs if they were already forwarded to
-    // the outbound edge. Otherwise, we'll double-forward.
-    dedup_decode_update_add_htlcs(
-        &mut decode_update_add_htlcs,
-        &prev_hop_data,
-        "HTLC was forwarded to the closed channel",
-        &args.logger,
-    );
+    // If `reconstruct_manager_from_monitors` is set, we always add all inbound committed
+    // HTLCs to `decode_update_add_htlcs` in the above loop, but we need to prune from
+    // those added HTLCs if they were already forwarded to the outbound edge. Otherwise,
+    // we'll double-forward.
+    if reconstruct_manager_from_monitors {
+        dedup_decode_update_add_htlcs(
+            &mut decode_update_add_htlcs,
+            &prev_hop_data,
+            "HTLC was forwarded to the closed channel",
+            &args.logger,
+        );
+    }
     if is_channel_closed {
         // The ChannelMonitor is now responsible for this HTLC's
         // failure/success and will let us know what its outcome is. If we
@@ -18547,101 +18577,49 @@ where
         }
     }
 
-    // De-duplicate HTLCs that are present in both `failed_htlcs` and `decode_update_add_htlcs`.
-    // Omitting this de-duplication could lead to redundant HTLC processing and/or bugs.
-    for (src, _, _, _, _, _) in failed_htlcs.iter() {
-        if let HTLCSource::PreviousHopData(prev_hop_data) = src {
-            dedup_decode_update_add_htlcs(
-                &mut decode_update_add_htlcs,
-                prev_hop_data,
-                "HTLC was failed backwards during manager read",
-                &args.logger,
-            );
-        }
-    }
-
-    // See above comment on `failed_htlcs`.
-    for htlcs in claimable_payments.values().map(|pmt| &pmt.htlcs) {
-        for prev_hop_data in htlcs.iter().map(|h| &h.prev_hop) {
-            dedup_decode_update_add_htlcs(
-                &mut decode_update_add_htlcs,
-                prev_hop_data,
-                "HTLC was already decoded and marked as a claimable payment",
-                &args.logger,
-            );
-        }
-    }
-
-    // Remove HTLCs from `forward_htlcs` if they are also present in `decode_update_add_htlcs`.
-    //
-    // In the future, the full set of pending HTLCs will be pulled from `Channel{Monitor}` data and
-    // placed in `ChannelManager::decode_update_add_htlcs` on read, to be handled on the next call
-    // to `process_pending_htlc_forwards`. This is part of a larger effort to remove the requirement
-    // of regularly persisting the `ChannelManager`. The new pipeline is supported for HTLC forwards
-    // received on LDK 0.3+ but not <= 0.2, so prune non-legacy HTLCs from `forward_htlcs`.
-    forward_htlcs_legacy.retain(|scid, pending_fwds| {
-        for fwd in pending_fwds {
-            let (prev_scid, prev_htlc_id) = match fwd {
-                HTLCForwardInfo::AddHTLC(htlc) => {
-                    (htlc.prev_outbound_scid_alias, htlc.prev_htlc_id)
-                },
-                HTLCForwardInfo::FailHTLC { htlc_id, .. }
-                | HTLCForwardInfo::FailMalformedHTLC { htlc_id, .. } => (*scid, *htlc_id),
-            };
-            if let Some(pending_update_adds) = decode_update_add_htlcs.get_mut(&prev_scid) {
-                if pending_update_adds
-                    .iter()
-                    .any(|update_add| update_add.htlc_id == prev_htlc_id)
-                {
-                    return false;
-                }
+    if reconstruct_manager_from_monitors {
+        // De-duplicate HTLCs that are present in both `failed_htlcs` and `decode_update_add_htlcs`.
+        // Omitting this de-duplication could lead to redundant HTLC processing and/or bugs.
+        for (src, _, _, _, _, _) in failed_htlcs.iter() {
+            if let HTLCSource::PreviousHopData(prev_hop_data) = src {
+                dedup_decode_update_add_htlcs(
+                    &mut decode_update_add_htlcs,
+                    prev_hop_data,
+                    "HTLC was failed backwards during manager read",
+                    &args.logger,
+                );
            }
        }
-        true
-    });
-    // Remove intercepted HTLC forwards if they are also present in `decode_update_add_htlcs`. See
-    // the above comment.
-    pending_intercepted_htlcs_legacy.retain(|id, fwd| {
-        let prev_scid = fwd.prev_outbound_scid_alias;
-        if let Some(pending_update_adds) = decode_update_add_htlcs.get_mut(&prev_scid) {
-            if pending_update_adds
-                .iter()
-                .any(|update_add| update_add.htlc_id == fwd.prev_htlc_id)
-            {
-                pending_events_read.retain(
-                    |(ev, _)| !matches!(ev, Event::HTLCIntercepted { intercept_id, .. } if intercept_id == id),
+
+        // See above comment on `failed_htlcs`.
+        for htlcs in claimable_payments.values().map(|pmt| &pmt.htlcs) {
+            for prev_hop_data in htlcs.iter().map(|h| &h.prev_hop) {
+                dedup_decode_update_add_htlcs(
+                    &mut decode_update_add_htlcs,
+                    prev_hop_data,
+                    "HTLC was already decoded and marked as a claimable payment",
+                    &args.logger,
                );
-                return false;
            }
        }
+    }
+
+    // If we have a pending intercept HTLC present but no corresponding event, add that now rather
+    // than relying on the user having persisted the event prior to shutdown.
+    for (id, intercept) in pending_intercepted_htlcs_legacy.iter() {
        if !pending_events_read.iter().any(
            |(ev, _)| matches!(ev, Event::HTLCIntercepted { intercept_id, .. } if intercept_id == id),
        ) {
-            match create_htlc_intercepted_event(*id, &fwd) {
+            match create_htlc_intercepted_event(*id, intercept) {
                Ok(ev) => pending_events_read.push_back((ev, None)),
                Err(()) => debug_assert!(false),
            }
        }
-        true
-    });
-    // Add legacy update_adds that were received on LDK <= 0.2 that are not present in the
-    // `decode_update_add_htlcs` map that was rebuilt from `Channel{Monitor}` data, see above
-    // comment.
-    for (scid, legacy_update_adds) in decode_update_add_htlcs_legacy.drain() {
-        match decode_update_add_htlcs.entry(scid) {
-            hash_map::Entry::Occupied(mut update_adds) => {
-                for legacy_update_add in legacy_update_adds {
-                    if !update_adds.get().contains(&legacy_update_add) {
-                        update_adds.get_mut().push(legacy_update_add);
-                    }
-                }
-            },
-            hash_map::Entry::Vacant(entry) => {
-                entry.insert(legacy_update_adds);
-            },
-        }
    }
 
+    if !reconstruct_manager_from_monitors {
+        decode_update_add_htlcs = decode_update_add_htlcs_legacy;
+    }
    let best_block = BestBlock::new(best_block_hash, best_block_height);
    let flow = OffersMessageFlow::new(
        chain_hash,
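The `dedup_decode_update_add_htlcs` helper that the gated blocks call is not shown in this diff. As a rough mental model only (simplified types; per the call sites, the real helper also takes a reason string and logs through `args.logger`), it prunes a rebuilt update_add matching an already-handled previous-hop (SCID, HTLC id) pair:

```rust
use std::collections::HashMap;

// Remove an update_add for the given previous-hop (SCID, HTLC id) pair,
// since that HTLC was already handled elsewhere (failed back, claimed,
// or forwarded) and must not be processed a second time.
fn dedup(map: &mut HashMap<u64, Vec<u64>>, prev_scid: u64, prev_htlc_id: u64) {
    if let Some(update_adds) = map.get_mut(&prev_scid) {
        update_adds.retain(|htlc_id| *htlc_id != prev_htlc_id);
        if update_adds.is_empty() {
            map.remove(&prev_scid);
        }
    }
}

fn main() {
    let mut map = HashMap::from([(42u64, vec![0u64, 1])]);
    dedup(&mut map, 42, 1); // HTLC 1 was already failed back: drop it
    assert_eq!(map[&42], vec![0]);
}
```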
@@ -19006,12 +18984,11 @@
 mod tests {
     use crate::events::{ClosureReason, Event, HTLCHandlingFailureType};
     use crate::ln::channelmanager::{
-        create_recv_pending_htlc_info, inbound_payment, HTLCForwardInfo, InterceptId, PaymentId,
+        create_recv_pending_htlc_info, inbound_payment, InterceptId, PaymentId,
         RecipientOnionFields,
     };
     use crate::ln::functional_test_utils::*;
     use crate::ln::msgs::{self, BaseMessageHandler, ChannelMessageHandler, MessageSendEvent};
-    use crate::ln::onion_utils::AttributionData;
     use crate::ln::onion_utils::{self, LocalHTLCFailureReason};
     use crate::ln::outbound_payment::Retry;
     use crate::ln::types::ChannelId;
@@ -19021,7 +18998,6 @@ mod tests {
     use crate::types::payment::{PaymentHash, PaymentPreimage, PaymentSecret};
     use crate::util::config::{ChannelConfig, ChannelConfigUpdate};
     use crate::util::errors::APIError;
-    use crate::util::ser::Writeable;
     use crate::util::test_utils;
     use bitcoin::secp256k1::ecdh::SharedSecret;
     use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey};
@@ -20079,66 +20055,6 @@ mod tests {
             check_spends!(txn[0], funding_tx);
         }
     }
-
-    #[test]
-    #[rustfmt::skip]
-    fn test_malformed_forward_htlcs_ser() {
-        // Ensure that `HTLCForwardInfo::FailMalformedHTLC`s are (de)serialized properly.
-        let chanmon_cfg = create_chanmon_cfgs(1);
-        let node_cfg = create_node_cfgs(1, &chanmon_cfg);
-        let persister;
-        let chain_monitor;
-        let chanmgrs = create_node_chanmgrs(1, &node_cfg, &[None]);
-        let deserialized_chanmgr;
-        let mut nodes = create_network(1, &node_cfg, &chanmgrs);
-
-        let dummy_failed_htlc = |htlc_id| {
-            HTLCForwardInfo::FailHTLC { htlc_id, err_packet: msgs::OnionErrorPacket { data: vec![42], attribution_data: Some(AttributionData::new()) } }
-        };
-        let dummy_malformed_htlc = |htlc_id| {
-            HTLCForwardInfo::FailMalformedHTLC {
-                htlc_id,
-                failure_code: LocalHTLCFailureReason::InvalidOnionPayload.failure_code(),
-                sha256_of_onion: [0; 32],
-            }
-        };
-
-        let dummy_htlcs_1: Vec<HTLCForwardInfo> = (1..10).map(|htlc_id| {
-            if htlc_id % 2 == 0 {
-                dummy_failed_htlc(htlc_id)
-            } else {
-                dummy_malformed_htlc(htlc_id)
-            }
-        }).collect();
-
-        let dummy_htlcs_2: Vec<HTLCForwardInfo> = (1..10).map(|htlc_id| {
-            if htlc_id % 2 == 1 {
-                dummy_failed_htlc(htlc_id)
-            } else {
-                dummy_malformed_htlc(htlc_id)
-            }
-        }).collect();
-
-
-        let (scid_1, scid_2) = (42, 43);
-        let mut forward_htlcs = new_hash_map();
-        forward_htlcs.insert(scid_1, dummy_htlcs_1.clone());
-        forward_htlcs.insert(scid_2, dummy_htlcs_2.clone());
-
-        let mut chanmgr_fwd_htlcs = nodes[0].node.forward_htlcs.lock().unwrap();
-        *chanmgr_fwd_htlcs = forward_htlcs.clone();
-        core::mem::drop(chanmgr_fwd_htlcs);
-
-        reload_node!(nodes[0], nodes[0].node.encode(), &[], persister, chain_monitor, deserialized_chanmgr);
-
-        let mut deserialized_fwd_htlcs = nodes[0].node.forward_htlcs.lock().unwrap();
-        for scid in [scid_1, scid_2].iter() {
-            let deserialized_htlcs = deserialized_fwd_htlcs.remove(scid).unwrap();
-            assert_eq!(forward_htlcs.remove(scid).unwrap(), deserialized_htlcs);
-        }
-        assert!(deserialized_fwd_htlcs.is_empty());
-        core::mem::drop(deserialized_fwd_htlcs);
-    }
 }
 
 #[cfg(ldk_bench)]
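The deleted `test_malformed_forward_htlcs_ser` exercised (de)serialization of the legacy `forward_htlcs` map, which test builds no longer write after this change, so the test no longer has a meaningful round-trip to check and is removed rather than rewritten.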
