Skip to content

Commit 257b3e0

Browse files
committed
WIP DEBUG
1 parent 13b2491 commit 257b3e0

File tree

4 files changed

+102
-26
lines changed

4 files changed

+102
-26
lines changed

Cargo.toml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,8 @@ bip21 = { version = "0.5", features = ["std"], default-features = false }
9191
base64 = { version = "0.22.1", default-features = false, features = ["std"] }
9292
rand = "0.8.5"
9393
chrono = { version = "0.4", default-features = false, features = ["clock"] }
94-
tokio = { version = "1.37", default-features = false, features = [ "rt-multi-thread", "time", "sync", "macros" ] }
94+
tokio = { version = "1.37", default-features = false, features = [ "rt-multi-thread", "time", "sync", "macros", "tracing"] }
95+
console-subscriber = "0.4.1"
9596
esplora-client = { version = "0.12", default-features = false, features = ["tokio", "async-https-rustls"] }
9697
electrum-client = { version = "0.24.0", default-features = false, features = ["proxy", "use-rustls-ring"] }
9798
libc = "0.2"
@@ -100,7 +101,8 @@ serde = { version = "1.0.210", default-features = false, features = ["std", "der
100101
serde_json = { version = "1.0.128", default-features = false, features = ["std"] }
101102
log = { version = "0.4.22", default-features = false, features = ["std"]}
102103

103-
vss-client = "0.3"
104+
#vss-client = "0.3"
105+
vss-client = { path = "../vss-rust-client" }
104106
prost = { version = "0.11.6", default-features = false}
105107

106108
[target.'cfg(windows)'.dependencies]

src/io/vss_store.rs

Lines changed: 55 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ pub struct VssStore {
5151
// operations aren't sensitive to the order of execution.
5252
next_version: AtomicU64,
5353
runtime: Arc<Runtime>,
54+
inner_runtime: Arc<tokio::runtime::Runtime>,
5455
}
5556

5657
impl VssStore {
@@ -60,7 +61,8 @@ impl VssStore {
6061
) -> Self {
6162
let inner = Arc::new(VssStoreInner::new(base_url, store_id, vss_seed, header_provider));
6263
let next_version = AtomicU64::new(1);
63-
Self { inner, next_version, runtime }
64+
let inner_runtime = Arc::new(tokio::runtime::Runtime::new().unwrap());
65+
Self { inner, next_version, runtime, inner_runtime }
6466
}
6567

6668
// Same logic as for the obfuscated keys below, but just for locking, using the plaintext keys
@@ -103,16 +105,31 @@ impl KVStoreSync for VssStore {
103105
) -> io::Result<()> {
104106
let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key);
105107
let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone());
106-
let fut = self.inner.write_internal(
107-
inner_lock_ref,
108-
locking_key,
109-
version,
110-
primary_namespace,
111-
secondary_namespace,
112-
key,
113-
buf,
114-
);
115-
self.runtime.block_on(fut)
108+
let write_id: u64 = rand::random();
109+
println!("WRITE {} IS SYNC", write_id);
110+
let primary_namespace = primary_namespace.to_string();
111+
let secondary_namespace = secondary_namespace.to_string();
112+
let key = key.to_string();
113+
let inner = Arc::clone(&self.inner);
114+
let fut = async move {
115+
inner
116+
.write_internal(
117+
inner_lock_ref,
118+
locking_key,
119+
version,
120+
&primary_namespace,
121+
&secondary_namespace,
122+
&key,
123+
buf,
124+
write_id,
125+
)
126+
.await
127+
};
128+
// let spawned_fut = self.inner_runtime.spawn(fut);
129+
// self
130+
// .runtime
131+
// .block_on( async { spawned_fut.await.unwrap() })
132+
self.runtime.spawn_block_on(async { fut.await }, write_id)
116133
}
117134

118135
fn remove(
@@ -158,6 +175,8 @@ impl KVStore for VssStore {
158175
let secondary_namespace = secondary_namespace.to_string();
159176
let key = key.to_string();
160177
let inner = Arc::clone(&self.inner);
178+
let write_id: u64 = rand::random();
179+
println!("WRITE {} IS ASYNC", write_id);
161180
Box::pin(async move {
162181
inner
163182
.write_internal(
@@ -168,6 +187,7 @@ impl KVStore for VssStore {
168187
&secondary_namespace,
169188
&key,
170189
buf,
190+
write_id,
171191
)
172192
.await
173193
})
@@ -332,7 +352,7 @@ impl VssStoreInner {
332352

333353
async fn write_internal(
334354
&self, inner_lock_ref: Arc<tokio::sync::Mutex<u64>>, locking_key: String, version: u64,
335-
primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
355+
primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>, write_id: u64,
336356
) -> io::Result<()> {
337357
check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "write")?;
338358

@@ -345,21 +365,39 @@ impl VssStoreInner {
345365
store_id: self.store_id.clone(),
346366
global_version: None,
347367
transaction_items: vec![KeyValue {
348-
key: obfuscated_key,
368+
key: obfuscated_key.clone(),
349369
version: vss_version,
350370
value: storable.encode_to_vec(),
351371
}],
352372
delete_items: vec![],
353373
};
354374

355-
self.client.put_object(&request).await.map_err(|e| {
375+
println!(
376+
"WRITE {}: {}/{}/{} ({})",
377+
write_id, primary_namespace, secondary_namespace, key, obfuscated_key
378+
);
379+
let fut = self.client.put_object(&request);
380+
// let res =
381+
// tokio::time::timeout(Duration::from_secs(5), fut).await.unwrap().map_err(|e| {
382+
// let msg = format!(
383+
// "Failed to write to key {}/{}/{}: {}",
384+
// primary_namespace, secondary_namespace, key, e
385+
// );
386+
// Error::new(ErrorKind::Other, msg)
387+
// });
388+
let res = fut.await.map_err(|e| {
356389
let msg = format!(
357390
"Failed to write to key {}/{}/{}: {}",
358391
primary_namespace, secondary_namespace, key, e
359392
);
360393
Error::new(ErrorKind::Other, msg)
361-
})?;
394+
});
395+
println!(
396+
"WRITE DONE {}: {}/{}/{} ({})",
397+
write_id, primary_namespace, secondary_namespace, key, obfuscated_key
398+
);
362399

400+
res?;
363401
Ok(())
364402
})
365403
.await
@@ -417,7 +455,9 @@ impl VssStoreInner {
417455
callback: FN,
418456
) -> Result<(), lightning::io::Error> {
419457
let res = {
458+
println!("BEFORE TAKING THE LOCK");
420459
let mut last_written_version = inner_lock_ref.lock().await;
460+
println!("AFTER TAKING THE LOCK");
421461

422462
// Check if we already have a newer version written/removed. This is used in async contexts to realize eventual
423463
// consistency.

src/runtime.rs

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,16 @@ impl Runtime {
2929
let mode = match tokio::runtime::Handle::try_current() {
3030
Ok(handle) => RuntimeMode::Handle(handle),
3131
Err(_) => {
32-
let rt = tokio::runtime::Builder::new_multi_thread().enable_all().build()?;
32+
let rt = tokio::runtime::Builder::new_multi_thread()
33+
.thread_name_fn(|| {
34+
"MY-CUSTOM".to_owned()
35+
})
36+
.worker_threads(5)
37+
.max_blocking_threads(20)
38+
.on_thread_start(|| {
39+
println!("THREAD started");
40+
})
41+
.enable_all().build()?;
3342
RuntimeMode::Owned(rt)
3443
},
3544
};
@@ -67,7 +76,7 @@ impl Runtime {
6776
{
6877
let mut background_tasks = self.background_tasks.lock().unwrap();
6978
let runtime_handle = self.handle();
70-
background_tasks.spawn_on(future, runtime_handle);
79+
background_tasks.spawn_on(async { future.await }, runtime_handle);
7180
}
7281

7382
pub fn spawn_cancellable_background_task<F>(&self, future: F)
@@ -76,7 +85,7 @@ impl Runtime {
7685
{
7786
let mut cancellable_background_tasks = self.cancellable_background_tasks.lock().unwrap();
7887
let runtime_handle = self.handle();
79-
cancellable_background_tasks.spawn_on(future, runtime_handle);
88+
cancellable_background_tasks.spawn_on(async { future.await }, runtime_handle);
8089
}
8190

8291
pub fn spawn_background_processor_task<F>(&self, future: F)
@@ -87,7 +96,7 @@ impl Runtime {
8796
debug_assert!(background_processor_task.is_none(), "Expected no background processor_task");
8897

8998
let runtime_handle = self.handle();
90-
let handle = runtime_handle.spawn(future);
99+
let handle = runtime_handle.spawn(async { future.await });
91100
*background_processor_task = Some(handle);
92101
}
93102

@@ -106,21 +115,41 @@ impl Runtime {
106115
// during `block_on`, as this is the context `block_in_place` would operate on. So we try
107116
// to detect the outer context here, and otherwise use whatever was set during
108117
// initialization.
109-
let handle = tokio::runtime::Handle::try_current().unwrap_or(self.handle().clone());
110-
tokio::task::block_in_place(move || handle.block_on(future))
118+
let runtime_handle = tokio::runtime::Handle::try_current().unwrap_or(self.handle().clone());
119+
tokio::task::block_in_place(move || runtime_handle.block_on(async { future.await }))
120+
}
121+
122+
pub fn spawn_block_on<F: Future + Send + 'static>(&self, future: F, write_id: u64) -> F::Output
123+
where
124+
<F as std::future::Future>::Output: Send + std::fmt::Debug,
125+
{
126+
// While we generally decided not to overthink via which call graph users would enter our
127+
// runtime context, we'd still try to reuse whatever current context would be present
128+
// during `block_on`, as this is the context `block_in_place` would operate on. So we try
129+
// to detect the outer context here, and otherwise use whatever was set during
130+
// initialization.
131+
let runtime_handle = tokio::runtime::Handle::try_current().unwrap_or(self.handle().clone());
132+
let (tx, rx) = tokio::sync::oneshot::channel();
133+
134+
println!("WRITE {} is TASK {}", write_id, handle.id());
135+
println!("RUNTIME STATS BEFORE: {} workers, {} blocking queue depth", runtime_handle.metrics().num_workers(), runtime_handle.metrics().blocking_queue_depth());
136+
let res = tokio::task::block_in_place(move || runtime_handle.block_on(async { future.await }));
137+
println!("WRITE {} AFTER block_in_place", write_id);
138+
println!("RUNTIME STATS AFTER: {} workers, {} blocking queue depth", runtime_handle.metrics().num_workers(), runtime_handle.metrics().blocking_queue_depth());
139+
res
111140
}
112141

113142
pub fn abort_cancellable_background_tasks(&self) {
114143
let mut tasks = core::mem::take(&mut *self.cancellable_background_tasks.lock().unwrap());
115144
debug_assert!(tasks.len() > 0, "Expected some cancellable background_tasks");
116145
tasks.abort_all();
117-
self.block_on(async { while let Some(_) = tasks.join_next().await {} })
146+
self.block_on(async move { while let Some(_) = tasks.join_next().await {} })
118147
}
119148

120149
pub fn wait_on_background_tasks(&self) {
121150
let mut tasks = core::mem::take(&mut *self.background_tasks.lock().unwrap());
122151
debug_assert!(tasks.len() > 0, "Expected some background_tasks");
123-
self.block_on(async {
152+
self.block_on(async move {
124153
loop {
125154
let timeout_fut = tokio::time::timeout(
126155
Duration::from_secs(BACKGROUND_TASK_SHUTDOWN_TIMEOUT_SECS),
@@ -154,7 +183,7 @@ impl Runtime {
154183
self.background_processor_task.lock().unwrap().take()
155184
{
156185
let abort_handle = background_processor_task.abort_handle();
157-
let timeout_res = self.block_on(async {
186+
let timeout_res = self.block_on(async move {
158187
tokio::time::timeout(
159188
Duration::from_secs(LDK_EVENT_HANDLER_SHUTDOWN_TIMEOUT_SECS),
160189
background_processor_task,

tests/integration_tests_vss.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,20 @@ mod common;
1111

1212
use std::collections::HashMap;
1313

14+
use ldk_node::logger::LogLevel;
1415
use ldk_node::Builder;
1516

1617
#[test]
1718
fn channel_full_cycle_with_vss_store() {
19+
console_subscriber::init();
1820
let (bitcoind, electrsd) = common::setup_bitcoind_and_electrsd();
1921
println!("== Node A ==");
2022
let esplora_url = format!("http://{}", electrsd.esplora_url.as_ref().unwrap());
2123
let config_a = common::random_config(true);
2224
let mut builder_a = Builder::from_config(config_a.node_config);
2325
builder_a.set_chain_source_esplora(esplora_url.clone(), None);
26+
builder_a.set_filesystem_logger(None, Some(LogLevel::Trace));
27+
builder_a.set_entropy_seed_bytes([42u8; 64]);
2428
let vss_base_url = std::env::var("TEST_VSS_BASE_URL").unwrap();
2529
let node_a = builder_a
2630
.build_with_vss_store_and_fixed_headers(
@@ -35,6 +39,7 @@ fn channel_full_cycle_with_vss_store() {
3539
let config_b = common::random_config(true);
3640
let mut builder_b = Builder::from_config(config_b.node_config);
3741
builder_b.set_chain_source_esplora(esplora_url.clone(), None);
42+
builder_b.set_entropy_seed_bytes([43u8; 64]);
3843
let node_b = builder_b
3944
.build_with_vss_store_and_fixed_headers(
4045
vss_base_url,

0 commit comments

Comments
 (0)