Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 31 additions & 49 deletions src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ use crate::commands::*;
use crate::error::BDKCliError as Error;
#[cfg(any(feature = "sqlite", feature = "redb"))]
use crate::persister::Persister;
#[cfg(feature = "cbf")]
use crate::utils::BlockchainClient::KyotoClient;
use crate::utils::*;
#[cfg(feature = "redb")]
use bdk_redb::Store as RedbStore;
Expand Down Expand Up @@ -45,8 +47,6 @@ use bdk_wallet::{
};
use cli_table::{Cell, CellStruct, Style, Table, format::Justify};
use serde_json::json;
#[cfg(feature = "cbf")]
use {crate::utils::BlockchainClient::KyotoClient, bdk_kyoto::LightClient, tokio::select};

#[cfg(feature = "electrum")]
use crate::utils::BlockchainClient::Electrum;
Expand Down Expand Up @@ -602,7 +602,7 @@ pub fn handle_offline_wallet_subcommand(
))]
pub(crate) async fn handle_online_wallet_subcommand(
wallet: &mut Wallet,
client: BlockchainClient,
client: &mut BlockchainClient,
online_subcommand: OnlineWalletSubCommand,
) -> Result<String, Error> {
match online_subcommand {
Expand All @@ -629,7 +629,7 @@ pub(crate) async fn handle_online_wallet_subcommand(
client
.populate_tx_cache(wallet.tx_graph().full_txs().map(|tx_node| tx_node.tx));

let update = client.full_scan(request, _stop_gap, batch_size, false)?;
let update = client.full_scan(request, _stop_gap, *batch_size, false)?;
wallet.apply_update(update)?;
}
#[cfg(feature = "esplora")]
Expand All @@ -638,7 +638,7 @@ pub(crate) async fn handle_online_wallet_subcommand(
parallel_requests,
} => {
let update = client
.full_scan(request, _stop_gap, parallel_requests)
.full_scan(request, _stop_gap, *parallel_requests)
.await
.map_err(|e| *e)?;
wallet.apply_update(update)?;
Expand All @@ -655,7 +655,7 @@ pub(crate) async fn handle_online_wallet_subcommand(
hash: genesis_block.block_hash(),
});
let mut emitter = Emitter::new(
&*client,
&**client,
genesis_cp.clone(),
genesis_cp.height(),
NO_EXPECTED_MEMPOOL_TXS,
Expand Down Expand Up @@ -986,11 +986,12 @@ pub(crate) async fn handle_command(cli_opts: CliOpts) -> Result<String, Error> {
};

let mut wallet = new_persisted_wallet(network, &mut persister, wallet_opts)?;
let blockchain_client = new_blockchain_client(wallet_opts, &wallet, database_path)?;
let mut blockchain_client =
new_blockchain_client(wallet_opts, &wallet, database_path)?;

let result = handle_online_wallet_subcommand(
&mut wallet,
blockchain_client,
&mut blockchain_client,
online_subcommand,
)
.await?;
Expand All @@ -1000,11 +1001,15 @@ pub(crate) async fn handle_command(cli_opts: CliOpts) -> Result<String, Error> {
#[cfg(not(any(feature = "sqlite", feature = "redb")))]
let result = {
let wallet = new_wallet(network, wallet_opts)?;
let blockchain_client =
let mut blockchain_client =
crate::utils::new_blockchain_client(wallet_opts, &wallet, database_path)?;
let mut wallet = new_wallet(network, wallet_opts)?;
handle_online_wallet_subcommand(&mut wallet, blockchain_client, online_subcommand)
.await?
handle_online_wallet_subcommand(
&mut wallet,
&mut blockchain_client,
online_subcommand,
)
.await?
};
Ok(result)
}
Expand Down Expand Up @@ -1183,9 +1188,9 @@ async fn respond(
ReplSubCommand::Wallet {
subcommand: WalletSubCommand::OnlineWalletSubCommand(online_subcommand),
} => {
let blockchain =
let mut blockchain =
new_blockchain_client(wallet_opts, wallet, _datadir).map_err(|e| e.to_string())?;
let value = handle_online_wallet_subcommand(wallet, blockchain, online_subcommand)
let value = handle_online_wallet_subcommand(wallet, &mut blockchain, online_subcommand)
.await
.map_err(|e| e.to_string())?;
Some(value)
Expand Down Expand Up @@ -1228,7 +1233,7 @@ async fn respond(
feature = "rpc"
))]
/// Syncs a given wallet using the blockchain client.
pub async fn sync_wallet(client: BlockchainClient, wallet: &mut Wallet) -> Result<(), Error> {
pub async fn sync_wallet(client: &mut BlockchainClient, wallet: &mut Wallet) -> Result<(), Error> {
#[cfg(any(feature = "electrum", feature = "esplora"))]
let request = wallet
.start_sync_with_revealed_spks()
Expand All @@ -1243,7 +1248,7 @@ pub async fn sync_wallet(client: BlockchainClient, wallet: &mut Wallet) -> Resul
// already have.
client.populate_tx_cache(wallet.tx_graph().full_txs().map(|tx_node| tx_node.tx));

let update = client.sync(request, batch_size, false)?;
let update = client.sync(request, *batch_size, false)?;
wallet
.apply_update(update)
.map_err(|e| Error::Generic(e.to_string()))
Expand All @@ -1254,7 +1259,7 @@ pub async fn sync_wallet(client: BlockchainClient, wallet: &mut Wallet) -> Resul
parallel_requests,
} => {
let update = client
.sync(request, parallel_requests)
.sync(request, *parallel_requests)
.await
.map_err(|e| *e)?;
wallet
Expand All @@ -1269,7 +1274,7 @@ pub async fn sync_wallet(client: BlockchainClient, wallet: &mut Wallet) -> Resul
// reload the last 200 blocks in case of a reorg
let emitter_height = wallet_cp.height().saturating_sub(200);
let mut emitter = Emitter::new(
&*client,
&**client,
wallet_cp,
emitter_height,
wallet
Expand Down Expand Up @@ -1320,7 +1325,7 @@ pub async fn sync_wallet(client: BlockchainClient, wallet: &mut Wallet) -> Resul
))]
/// Broadcasts a given transaction using the blockchain client.
pub async fn broadcast_transaction(
client: BlockchainClient,
client: &mut BlockchainClient,
tx: Transaction,
) -> Result<Txid, Error> {
match client {
Expand All @@ -1347,38 +1352,15 @@ pub async fn broadcast_transaction(

#[cfg(feature = "cbf")]
KyotoClient { client } => {
let LightClient {
requester,
mut info_subscriber,
mut warning_subscriber,
update_subscriber: _,
node,
} = *client;

let subscriber = tracing_subscriber::FmtSubscriber::new();
tracing::subscriber::set_global_default(subscriber)
.map_err(|e| Error::Generic(format!("SetGlobalDefault error: {e}")))?;

tokio::task::spawn(async move { node.run().await });
tokio::task::spawn(async move {
select! {
info = info_subscriber.recv() => {
if let Some(info) = info {
tracing::info!("{info}");
}
},
warn = warning_subscriber.recv() => {
if let Some(warn) = warn {
tracing::warn!("{warn}");
}
}
}
});
let txid = tx.compute_txid();
let wtxid = requester.broadcast_random(tx.clone()).await.map_err(|_| {
tracing::warn!("Broadcast was unsuccessful");
Error::Generic("Transaction broadcast timed out after 30 seconds".into())
})?;
let wtxid = client
.requester
.broadcast_random(tx.clone())
.await
.map_err(|_| {
tracing::warn!("Broadcast was unsuccessful");
Error::Generic("Transaction broadcast timed out after 30 seconds".into())
})?;
tracing::info!("Successfully broadcast WTXID: {wtxid}");
Ok(txid)
}
Expand Down
Loading
Loading