Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 4 additions & 11 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -668,9 +668,9 @@ impl NodeBuilder {
let logger = setup_logger(&self.log_writer_config, &self.config)?;

let runtime = if let Some(handle) = self.runtime_handle.as_ref() {
Arc::new(Runtime::with_handle(handle.clone()))
Arc::new(Runtime::with_handle(handle.clone(), Arc::clone(&logger)))
} else {
Arc::new(Runtime::new().map_err(|e| {
Arc::new(Runtime::new(Arc::clone(&logger)).map_err(|e| {
log_error!(logger, "Failed to setup tokio runtime: {}", e);
BuildError::RuntimeSetupFailed
})?)
Expand Down Expand Up @@ -715,9 +715,9 @@ impl NodeBuilder {
let logger = setup_logger(&self.log_writer_config, &self.config)?;

let runtime = if let Some(handle) = self.runtime_handle.as_ref() {
Arc::new(Runtime::with_handle(handle.clone()))
Arc::new(Runtime::with_handle(handle.clone(), Arc::clone(&logger)))
} else {
Arc::new(Runtime::new().map_err(|e| {
Arc::new(Runtime::new(Arc::clone(&logger)).map_err(|e| {
log_error!(logger, "Failed to setup tokio runtime: {}", e);
BuildError::RuntimeSetupFailed
})?)
Expand Down Expand Up @@ -1668,18 +1668,11 @@ fn build_with_store_internal(
};

let (stop_sender, _) = tokio::sync::watch::channel(());
let background_processor_task = Mutex::new(None);
let background_tasks = Mutex::new(None);
let cancellable_background_tasks = Mutex::new(None);

let is_running = Arc::new(RwLock::new(false));

Ok(Node {
runtime,
stop_sender,
background_processor_task,
background_tasks,
cancellable_background_tasks,
config,
wallet,
chain_source,
Expand Down
4 changes: 2 additions & 2 deletions src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1059,7 +1059,7 @@ where
forwarding_channel_manager.process_pending_htlc_forwards();
};

self.runtime.spawn(future);
self.runtime.spawn_cancellable_background_task(future);
},
LdkEvent::SpendableOutputs { outputs, channel_id } => {
match self.output_sweeper.track_spendable_outputs(outputs, channel_id, true, None) {
Expand Down Expand Up @@ -1441,7 +1441,7 @@ where
}
}
};
self.runtime.spawn(future);
self.runtime.spawn_cancellable_background_task(future);
},
LdkEvent::BumpTransaction(bte) => {
match bte {
Expand Down
2 changes: 1 addition & 1 deletion src/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,6 @@ impl RuntimeSpawner {

impl FutureSpawner for RuntimeSpawner {
fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
self.runtime.spawn(future);
self.runtime.spawn_cancellable_background_task(future);
}
}
186 changes: 36 additions & 150 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,8 @@ pub use builder::NodeBuilder as Builder;

use chain::ChainSource;
use config::{
default_user_config, may_announce_channel, ChannelConfig, Config,
BACKGROUND_TASK_SHUTDOWN_TIMEOUT_SECS, LDK_EVENT_HANDLER_SHUTDOWN_TIMEOUT_SECS,
NODE_ANN_BCAST_INTERVAL, PEER_RECONNECTION_INTERVAL, RGS_SYNC_INTERVAL,
default_user_config, may_announce_channel, ChannelConfig, Config, NODE_ANN_BCAST_INTERVAL,
PEER_RECONNECTION_INTERVAL, RGS_SYNC_INTERVAL,
};
use connection::ConnectionManager;
use event::{EventHandler, EventQueue};
Expand Down Expand Up @@ -181,9 +180,6 @@ uniffi::include_scaffolding!("ldk_node");
pub struct Node {
runtime: Arc<Runtime>,
stop_sender: tokio::sync::watch::Sender<()>,
background_processor_task: Mutex<Option<tokio::task::JoinHandle<()>>>,
background_tasks: Mutex<Option<tokio::task::JoinSet<()>>>,
cancellable_background_tasks: Mutex<Option<tokio::task::JoinSet<()>>>,
config: Arc<Config>,
wallet: Arc<Wallet>,
chain_source: Arc<ChainSource>,
Expand Down Expand Up @@ -226,10 +222,6 @@ impl Node {
return Err(Error::AlreadyRunning);
}

let mut background_tasks = tokio::task::JoinSet::new();
let mut cancellable_background_tasks = tokio::task::JoinSet::new();
let runtime_handle = self.runtime.handle();

log_info!(
self.logger,
"Starting up LDK Node with node ID {} on network: {}",
Expand All @@ -253,27 +245,19 @@ impl Node {
let sync_cman = Arc::clone(&self.channel_manager);
let sync_cmon = Arc::clone(&self.chain_monitor);
let sync_sweeper = Arc::clone(&self.output_sweeper);
background_tasks.spawn_on(
async move {
chain_source
.continuously_sync_wallets(
stop_sync_receiver,
sync_cman,
sync_cmon,
sync_sweeper,
)
.await;
},
runtime_handle,
);
self.runtime.spawn_background_task(async move {
chain_source
.continuously_sync_wallets(stop_sync_receiver, sync_cman, sync_cmon, sync_sweeper)
.await;
});

if self.gossip_source.is_rgs() {
let gossip_source = Arc::clone(&self.gossip_source);
let gossip_sync_store = Arc::clone(&self.kv_store);
let gossip_sync_logger = Arc::clone(&self.logger);
let gossip_node_metrics = Arc::clone(&self.node_metrics);
let mut stop_gossip_sync = self.stop_sender.subscribe();
cancellable_background_tasks.spawn_on(async move {
self.runtime.spawn_cancellable_background_task(async move {
let mut interval = tokio::time::interval(RGS_SYNC_INTERVAL);
loop {
tokio::select! {
Expand Down Expand Up @@ -314,7 +298,7 @@ impl Node {
}
}
}
}, runtime_handle);
});
}

if let Some(listening_addresses) = &self.config.listening_addresses {
Expand All @@ -340,7 +324,7 @@ impl Node {
bind_addrs.extend(resolved_address);
}

cancellable_background_tasks.spawn_on(async move {
self.runtime.spawn_cancellable_background_task(async move {
{
let listener =
tokio::net::TcpListener::bind(&*bind_addrs).await
Expand Down Expand Up @@ -378,7 +362,7 @@ impl Node {
}

listening_indicator.store(false, Ordering::Release);
}, runtime_handle);
});
}

// Regularly reconnect to persisted peers.
Expand All @@ -387,7 +371,7 @@ impl Node {
let connect_logger = Arc::clone(&self.logger);
let connect_peer_store = Arc::clone(&self.peer_store);
let mut stop_connect = self.stop_sender.subscribe();
cancellable_background_tasks.spawn_on(async move {
self.runtime.spawn_cancellable_background_task(async move {
let mut interval = tokio::time::interval(PEER_RECONNECTION_INTERVAL);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
Expand Down Expand Up @@ -415,7 +399,7 @@ impl Node {
}
}
}
}, runtime_handle);
});

// Regularly broadcast node announcements.
let bcast_cm = Arc::clone(&self.channel_manager);
Expand All @@ -427,7 +411,7 @@ impl Node {
let mut stop_bcast = self.stop_sender.subscribe();
let node_alias = self.config.node_alias.clone();
if may_announce_channel(&self.config).is_ok() {
cancellable_background_tasks.spawn_on(async move {
self.runtime.spawn_cancellable_background_task(async move {
// We check every 30 secs whether our last broadcast is NODE_ANN_BCAST_INTERVAL away.
#[cfg(not(test))]
let mut interval = tokio::time::interval(Duration::from_secs(30));
Expand Down Expand Up @@ -498,15 +482,14 @@ impl Node {
}
}
}
}, runtime_handle);
});
}

let stop_tx_bcast = self.stop_sender.subscribe();
let chain_source = Arc::clone(&self.chain_source);
cancellable_background_tasks.spawn_on(
async move { chain_source.continuously_process_broadcast_queue(stop_tx_bcast).await },
runtime_handle,
);
self.runtime.spawn_cancellable_background_task(async move {
chain_source.continuously_process_broadcast_queue(stop_tx_bcast).await
});

let bump_tx_event_handler = Arc::new(BumpTransactionEventHandler::new(
Arc::clone(&self.tx_broadcaster),
Expand Down Expand Up @@ -563,7 +546,7 @@ impl Node {
})
};

let handle = self.runtime.spawn(async move {
self.runtime.spawn_background_processor_task(async move {
process_events_async(
background_persister,
|e| background_event_handler.handle_event(e),
Expand All @@ -584,38 +567,27 @@ impl Node {
panic!("Failed to process events");
});
});
debug_assert!(self.background_processor_task.lock().unwrap().is_none());
*self.background_processor_task.lock().unwrap() = Some(handle);

if let Some(liquidity_source) = self.liquidity_source.as_ref() {
let mut stop_liquidity_handler = self.stop_sender.subscribe();
let liquidity_handler = Arc::clone(&liquidity_source);
let liquidity_logger = Arc::clone(&self.logger);
background_tasks.spawn_on(
async move {
loop {
tokio::select! {
_ = stop_liquidity_handler.changed() => {
log_debug!(
liquidity_logger,
"Stopping processing liquidity events.",
);
return;
}
_ = liquidity_handler.handle_next_event() => {}
self.runtime.spawn_background_task(async move {
loop {
tokio::select! {
_ = stop_liquidity_handler.changed() => {
log_debug!(
liquidity_logger,
"Stopping processing liquidity events.",
);
return;
}
_ = liquidity_handler.handle_next_event() => {}
}
},
runtime_handle,
);
}
});
}

debug_assert!(self.background_tasks.lock().unwrap().is_none());
*self.background_tasks.lock().unwrap() = Some(background_tasks);

debug_assert!(self.cancellable_background_tasks.lock().unwrap().is_none());
*self.cancellable_background_tasks.lock().unwrap() = Some(cancellable_background_tasks);

log_info!(self.logger, "Startup complete.");
*is_running_lock = true;
Ok(())
Expand Down Expand Up @@ -649,15 +621,7 @@ impl Node {
}

// Cancel cancellable background tasks
if let Some(mut tasks) = self.cancellable_background_tasks.lock().unwrap().take() {
let runtime_handle = self.runtime.handle();
tasks.abort_all();
tokio::task::block_in_place(move || {
runtime_handle.block_on(async { while let Some(_) = tasks.join_next().await {} })
});
} else {
debug_assert!(false, "Expected some cancellable background tasks");
};
self.runtime.abort_cancellable_background_tasks();

// Disconnect all peers.
self.peer_manager.disconnect_all_peers();
Expand All @@ -668,91 +632,13 @@ impl Node {
log_debug!(self.logger, "Stopped chain sources.");

// Wait until non-cancellable background tasks (mod LDK's background processor) are done.
let runtime_handle = self.runtime.handle();
if let Some(mut tasks) = self.background_tasks.lock().unwrap().take() {
tokio::task::block_in_place(move || {
runtime_handle.block_on(async {
loop {
let timeout_fut = tokio::time::timeout(
Duration::from_secs(BACKGROUND_TASK_SHUTDOWN_TIMEOUT_SECS),
tasks.join_next_with_id(),
);
match timeout_fut.await {
Ok(Some(Ok((id, _)))) => {
log_trace!(self.logger, "Stopped background task with id {}", id);
},
Ok(Some(Err(e))) => {
tasks.abort_all();
log_trace!(self.logger, "Stopping background task failed: {}", e);
break;
},
Ok(None) => {
log_debug!(self.logger, "Stopped all background tasks");
break;
},
Err(e) => {
tasks.abort_all();
log_error!(
self.logger,
"Stopping background task timed out: {}",
e
);
break;
},
}
}
})
});
} else {
debug_assert!(false, "Expected some background tasks");
};
self.runtime.wait_on_background_tasks();

// Wait until background processing stopped, at least until a timeout is reached.
if let Some(background_processor_task) =
self.background_processor_task.lock().unwrap().take()
{
let abort_handle = background_processor_task.abort_handle();
let timeout_res = tokio::task::block_in_place(move || {
self.runtime.block_on(async {
tokio::time::timeout(
Duration::from_secs(LDK_EVENT_HANDLER_SHUTDOWN_TIMEOUT_SECS),
background_processor_task,
)
.await
})
});

match timeout_res {
Ok(stop_res) => match stop_res {
Ok(()) => log_debug!(self.logger, "Stopped background processing of events."),
Err(e) => {
abort_handle.abort();
log_error!(
self.logger,
"Stopping event handling failed. This should never happen: {}",
e
);
panic!("Stopping event handling failed. This should never happen.");
},
},
Err(e) => {
abort_handle.abort();
log_error!(self.logger, "Stopping event handling timed out: {}", e);
},
}
} else {
debug_assert!(false, "Expected a background processing task");
};
// Finally, wait until background processing stopped, at least until a timeout is reached.
self.runtime.wait_on_background_processor_task();

#[cfg(tokio_unstable)]
{
let runtime_handle = self.runtime.handle();
log_trace!(
self.logger,
"Active runtime tasks left prior to shutdown: {}",
runtime_handle.metrics().active_tasks_count()
);
}
self.runtime.log_metrics();

log_info!(self.logger, "Shutdown complete.");
*is_running_lock = false;
Expand Down
Loading
Loading