Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 15 additions & 10 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,23 +61,28 @@ lightning-liquidity = { version = "0.1.0", features = ["std"] }
#lightning-transaction-sync = { path = "../rust-lightning/lightning-transaction-sync", features = ["esplora-async-https", "electrum", "time"] }
#lightning-liquidity = { path = "../rust-lightning/lightning-liquidity", features = ["std"] }

bdk_chain = { version = "0.21.1", default-features = false, features = ["std"] }
bdk_esplora = { version = "0.20.1", default-features = false, features = ["async-https-rustls", "tokio"]}
bdk_electrum = { version = "0.20.1", default-features = false, features = ["use-rustls"]}
bdk_wallet = { version = "1.0.0", default-features = false, features = ["std", "keys-bip39"]}
bdk_chain = { version = "0.23.0", default-features = false, features = ["std"] }
bdk_esplora = { version = "0.22.0", default-features = false, features = ["async-https-rustls", "tokio"]}
bdk_electrum = { version = "0.23.0", default-features = false, features = ["use-rustls"]}
bdk_wallet = { version = "2.0.0", default-features = false, features = ["std", "keys-bip39"]}

reqwest = { version = "0.11", default-features = false, features = ["json", "rustls-tls"] }
reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] }
rusqlite = { version = "0.31.0", features = ["bundled"] }
bitcoin = "0.32.2"
bitcoin = "0.32.4"
bip39 = "2.0.0"
bip21 = { version = "0.5", features = ["std"], default-features = false }

base64 = { version = "0.22.1", default-features = false, features = ["std"] }
rand = "0.8.5"
chrono = { version = "0.4", default-features = false, features = ["clock"] }
tokio = { version = "1.37", default-features = false, features = [ "rt-multi-thread", "time", "sync", "macros" ] }
esplora-client = { version = "0.11", default-features = false, features = ["tokio", "async-https-rustls"] }
electrum-client = { version = "0.22.0", default-features = true }
esplora-client = { version = "0.12", default-features = false, features = ["tokio", "async-https-rustls"] }

# FIXME: This was introduced to decouple the `bdk_esplora` and
# `lightning-transaction-sync` APIs. We should drop it as part of the upgrade
# to LDK 0.2.
esplora-client_0_11 = { package = "esplora-client", version = "0.11", default-features = false, features = ["tokio", "async-https-rustls"] }
electrum-client = { version = "0.23.1", default-features = true }
libc = "0.2"
uniffi = { version = "0.27.3", features = ["build"], optional = true }
serde = { version = "1.0.210", default-features = false, features = ["std", "derive"] }
Expand All @@ -98,10 +103,10 @@ proptest = "1.0.0"
regex = "1.5.6"

[target.'cfg(not(no_download))'.dev-dependencies]
electrsd = { version = "0.33.0", default-features = false, features = ["legacy", "esplora_a33e97e1", "corepc-node_27_2"] }
electrsd = { version = "0.34.0", default-features = false, features = ["legacy", "esplora_a33e97e1", "corepc-node_27_2"] }

[target.'cfg(no_download)'.dev-dependencies]
electrsd = { version = "0.33.0", default-features = false, features = ["legacy"] }
electrsd = { version = "0.34.0", default-features = false, features = ["legacy"] }
corepc-node = { version = "0.7.0", default-features = false, features = ["27_2"] }

[target.'cfg(cln_test)'.dev-dependencies]
Expand Down
31 changes: 30 additions & 1 deletion src/chain/bitcoind_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,12 +190,24 @@ impl BitcoindRpcClient {
Ok(())
}

/// Returns two `Vec`s:
/// - mempool transactions, alongside their first-seen unix timestamps.
/// - transactions that have been evicted from the mempool, alongside the last time they were seen absent.
pub(crate) async fn get_updated_mempool_transactions(
&self, best_processed_height: u32, unconfirmed_txids: Vec<Txid>,
) -> std::io::Result<(Vec<(Transaction, u64)>, Vec<(Txid, u64)>)> {
let mempool_txs =
self.get_mempool_transactions_and_timestamp_at_height(best_processed_height).await?;
let evicted_txids = self.get_evicted_mempool_txids_and_timestamp(unconfirmed_txids).await?;
Ok((mempool_txs, evicted_txids))
}

/// Get mempool transactions, alongside their first-seen unix timestamps.
///
/// This method is an adapted version of `bdk_bitcoind_rpc::Emitter::mempool`. It emits each
/// transaction only once, unless we cannot assume the transaction's ancestors are already
/// emitted.
pub(crate) async fn get_mempool_transactions_and_timestamp_at_height(
async fn get_mempool_transactions_and_timestamp_at_height(
&self, best_processed_height: u32,
) -> std::io::Result<Vec<(Transaction, u64)>> {
let prev_mempool_time = self.latest_mempool_timestamp.load(Ordering::Relaxed);
Expand Down Expand Up @@ -252,6 +264,23 @@ impl BitcoindRpcClient {
}
Ok(txs_to_emit)
}

// Retrieve a list of Txids that have been evicted from the mempool.
//
// To this end, we first update our local mempool_entries_cache and then return all unconfirmed
// wallet `Txid`s that don't appear in the mempool still.
async fn get_evicted_mempool_txids_and_timestamp(
&self, unconfirmed_txids: Vec<Txid>,
) -> std::io::Result<Vec<(Txid, u64)>> {
let latest_mempool_timestamp = self.latest_mempool_timestamp.load(Ordering::Relaxed);
let mempool_entries_cache = self.mempool_entries_cache.lock().await;
let evicted_txids = unconfirmed_txids
.into_iter()
.filter(|txid| mempool_entries_cache.contains_key(txid))
.map(|txid| (txid, latest_mempool_timestamp))
.collect();
Ok(evicted_txids)
}
}

impl BlockSource for BitcoindRpcClient {
Expand Down
25 changes: 19 additions & 6 deletions src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,11 +237,18 @@ impl ChainSource {
kv_store: Arc<DynStore>, config: Arc<Config>, logger: Arc<Logger>,
node_metrics: Arc<RwLock<NodeMetrics>>,
) -> Self {
// FIXME / TODO: We introduced this to make `bdk_esplora` work separately without updating
// `lightning-transaction-sync`. We should revert this as part of of the upgrade to LDK 0.2.
let mut client_builder_0_11 = esplora_client_0_11::Builder::new(&server_url);
client_builder_0_11 = client_builder_0_11.timeout(DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS);
let esplora_client_0_11 = client_builder_0_11.build_async().unwrap();
let tx_sync =
Arc::new(EsploraSyncClient::from_client(esplora_client_0_11, Arc::clone(&logger)));

let mut client_builder = esplora_client::Builder::new(&server_url);
client_builder = client_builder.timeout(DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS);
let esplora_client = client_builder.build_async().unwrap();
let tx_sync =
Arc::new(EsploraSyncClient::from_client(esplora_client.clone(), Arc::clone(&logger)));

let onchain_wallet_sync_status = Mutex::new(WalletSyncStatus::Completed);
let lightning_wallet_sync_status = Mutex::new(WalletSyncStatus::Completed);
Self::Esplora {
Expand Down Expand Up @@ -1088,18 +1095,24 @@ impl ChainSource {
let cur_height = channel_manager.current_best_block().height;

let now = SystemTime::now();
let unconfirmed_txids = onchain_wallet.get_unconfirmed_txids();
match bitcoind_rpc_client
.get_mempool_transactions_and_timestamp_at_height(cur_height)
.get_updated_mempool_transactions(cur_height, unconfirmed_txids)
.await
{
Ok(unconfirmed_txs) => {
Ok((unconfirmed_txs, evicted_txids)) => {
log_trace!(
logger,
"Finished polling mempool of size {} in {}ms",
"Finished polling mempool of size {} and {} evicted transactions in {}ms",
unconfirmed_txs.len(),
evicted_txids.len(),
now.elapsed().unwrap().as_millis()
);
let _ = onchain_wallet.apply_unconfirmed_txs(unconfirmed_txs);
onchain_wallet
.apply_mempool_txs(unconfirmed_txs, evicted_txids)
.unwrap_or_else(|e| {
log_error!(logger, "Failed to apply mempool transactions: {:?}", e);
});
},
Err(e) => {
log_error!(logger, "Failed to poll for mempool transactions: {:?}", e);
Expand Down
15 changes: 13 additions & 2 deletions src/wallet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,16 @@ where
self.inner.lock().unwrap().tx_graph().full_txs().map(|tx_node| tx_node.tx).collect()
}

pub(crate) fn get_unconfirmed_txids(&self) -> Vec<Txid> {
self.inner
.lock()
.unwrap()
.transactions()
.filter(|t| t.chain_position.is_unconfirmed())
.map(|t| t.tx_node.txid)
.collect()
}

pub(crate) fn current_best_block(&self) -> BestBlock {
let checkpoint = self.inner.lock().unwrap().latest_checkpoint();
BestBlock { block_hash: checkpoint.hash(), height: checkpoint.height() }
Expand Down Expand Up @@ -136,11 +146,12 @@ where
}
}

pub(crate) fn apply_unconfirmed_txs(
&self, unconfirmed_txs: Vec<(Transaction, u64)>,
pub(crate) fn apply_mempool_txs(
&self, unconfirmed_txs: Vec<(Transaction, u64)>, evicted_txids: Vec<(Txid, u64)>,
) -> Result<(), Error> {
let mut locked_wallet = self.inner.lock().unwrap();
locked_wallet.apply_unconfirmed_txs(unconfirmed_txs);
locked_wallet.apply_evicted_txs(evicted_txids);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It wasn't clear from the bdk_wallet docs or release page that this is a new requirement. When upgrading, how do you typically see what changes are needed?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case we knew that we were waiting on exactly that change, but I also went through all the commit history of all the BDK crates since the last upgrade to double-check I'm not overlooking something fundamental. So yeah, I agree the changelog could do a better job listing all the changes that need to be considered (cc @notmandatory).


let mut locked_persister = self.persister.lock().unwrap();
locked_wallet.persist(&mut locked_persister).map_err(|e| {
Expand Down
15 changes: 14 additions & 1 deletion src/wallet/ser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,9 @@ impl<'a> Writeable for ChangeSetSerWrapper<'a, BdkTxGraphChangeSet<ConfirmationB

encode_tlv_stream!(writer, {
(0, ChangeSetSerWrapper(&self.0.txs), required),
(1, Some(&self.0.first_seen), option),
(2, self.0.txouts, required),
(3, Some(&self.0.last_evicted), option),
(4, ChangeSetSerWrapper(&self.0.anchors), required),
(6, self.0.last_seen, required),
});
Expand All @@ -129,10 +131,14 @@ impl Readable for ChangeSetDeserWrapper<BdkTxGraphChangeSet<ConfirmationBlockTim
ChangeSetDeserWrapper<BTreeSet<(ConfirmationBlockTime, Txid)>>,
> = RequiredWrapper(None);
let mut last_seen: RequiredWrapper<BTreeMap<Txid, u64>> = RequiredWrapper(None);
let mut first_seen = None;
let mut last_evicted = None;

decode_tlv_stream!(reader, {
(0, txs, required),
(1, first_seen, option),
(2, txouts, required),
(3, last_evicted, option),
(4, anchors, required),
(6, last_seen, required),
});
Expand All @@ -142,6 +148,8 @@ impl Readable for ChangeSetDeserWrapper<BdkTxGraphChangeSet<ConfirmationBlockTim
txouts: txouts.0.unwrap(),
anchors: anchors.0.unwrap().0,
last_seen: last_seen.0.unwrap(),
first_seen: first_seen.unwrap_or_default(),
last_evicted: last_evicted.unwrap_or_default(),
}))
}
}
Expand Down Expand Up @@ -260,6 +268,7 @@ impl<'a> Writeable for ChangeSetSerWrapper<'a, BdkIndexerChangeSet> {
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), lightning::io::Error> {
CHANGESET_SERIALIZATION_VERSION.write(writer)?;

// Note we don't persist/use the optional spk_cache currently.
encode_tlv_stream!(writer, { (0, ChangeSetSerWrapper(&self.0.last_revealed), required) });
Ok(())
}
Expand All @@ -275,9 +284,13 @@ impl Readable for ChangeSetDeserWrapper<BdkIndexerChangeSet> {
let mut last_revealed: RequiredWrapper<ChangeSetDeserWrapper<BTreeMap<DescriptorId, u32>>> =
RequiredWrapper(None);

// Note we don't persist/use the optional spk_cache currently.
decode_tlv_stream!(reader, { (0, last_revealed, required) });

Ok(Self(BdkIndexerChangeSet { last_revealed: last_revealed.0.unwrap().0 }))
Ok(Self(BdkIndexerChangeSet {
last_revealed: last_revealed.0.unwrap().0,
spk_cache: Default::default(),
}))
}
}

Expand Down
Loading