-
Notifications
You must be signed in to change notification settings - Fork 2
Stop user RPC on local DB too big #175
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
55aa648
548e42d
80d5818
7d69282
dc00c4a
acdcef3
62dbf47
3871ee2
a5311fc
47a4f91
490d562
0c68418
4243732
a91ebae
af292c4
ce5067e
d568db3
7d784ec
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -1,6 +1,14 @@ | ||||||||||||||||||||||||||||||||||||||
| //! Indexing functionality powered by Clickhouse. | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| use std::{fmt::Debug, time::Duration}; | ||||||||||||||||||||||||||||||||||||||
| use std::{ | ||||||||||||||||||||||||||||||||||||||
| fmt::Debug, | ||||||||||||||||||||||||||||||||||||||
| marker::PhantomData, | ||||||||||||||||||||||||||||||||||||||
| sync::{ | ||||||||||||||||||||||||||||||||||||||
| LazyLock, | ||||||||||||||||||||||||||||||||||||||
| atomic::{AtomicU64, Ordering}, | ||||||||||||||||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||||||||||||||||
| time::Duration, | ||||||||||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| use crate::{ | ||||||||||||||||||||||||||||||||||||||
| cli::ClickhouseArgs, | ||||||||||||||||||||||||||||||||||||||
|
|
@@ -34,9 +42,59 @@ fn config_from_clickhouse_args(args: &ClickhouseArgs, validation: bool) -> Click | |||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| struct MetricsWrapper; | ||||||||||||||||||||||||||||||||||||||
| /// little global (puaj) info to easily get the current clickhouse disk size. | ||||||||||||||||||||||||||||||||||||||
| #[derive(Default)] | ||||||||||||||||||||||||||||||||||||||
| pub(crate) struct ClickhouseLocalBackupDiskSize { | ||||||||||||||||||||||||||||||||||||||
| bundles_size: AtomicU64, | ||||||||||||||||||||||||||||||||||||||
| bundle_receipts_size: AtomicU64, | ||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| impl ClickhouseLocalBackupDiskSize { | ||||||||||||||||||||||||||||||||||||||
| pub(crate) fn set_bundles_size(&self, size: u64) { | ||||||||||||||||||||||||||||||||||||||
| self.bundles_size.store(size, Ordering::Relaxed); | ||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||
| pub(crate) fn set_bundle_receipts_size(&self, size: u64) { | ||||||||||||||||||||||||||||||||||||||
| self.bundle_receipts_size.store(size, Ordering::Relaxed); | ||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||
| pub(crate) fn disk_size(&self) -> u64 { | ||||||||||||||||||||||||||||||||||||||
| self.bundles_size.load(Ordering::Relaxed) + | ||||||||||||||||||||||||||||||||||||||
| self.bundle_receipts_size.load(Ordering::Relaxed) | ||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| /// We store here the current disk size of the backup database to avoid querying the metrics since | ||||||||||||||||||||||||||||||||||||||
| /// that would include a string map access. | ||||||||||||||||||||||||||||||||||||||
| pub(crate) static CLICKHOUSE_LOCAL_BACKUP_DISK_SIZE: LazyLock<ClickhouseLocalBackupDiskSize> = | ||||||||||||||||||||||||||||||||||||||
| LazyLock::new(ClickhouseLocalBackupDiskSize::default); | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| /// Callback invoked when disk backup size is set. Implement this trait to observe size updates. | ||||||||||||||||||||||||||||||||||||||
| pub(crate) trait DiskBackupSizeCallback: Send + Sync { | ||||||||||||||||||||||||||||||||||||||
| fn on_disk_backup_size(size_bytes: u64); | ||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| struct UpdateBundleSizeCallback; | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| impl DiskBackupSizeCallback for UpdateBundleSizeCallback { | ||||||||||||||||||||||||||||||||||||||
| fn on_disk_backup_size(size_bytes: u64) { | ||||||||||||||||||||||||||||||||||||||
| CLICKHOUSE_LOCAL_BACKUP_DISK_SIZE.set_bundles_size(size_bytes); | ||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| struct UpdateBundleReceiptsSizeCallback; | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| impl DiskBackupSizeCallback for UpdateBundleReceiptsSizeCallback { | ||||||||||||||||||||||||||||||||||||||
| fn on_disk_backup_size(size_bytes: u64) { | ||||||||||||||||||||||||||||||||||||||
| CLICKHOUSE_LOCAL_BACKUP_DISK_SIZE.set_bundle_receipts_size(size_bytes); | ||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| struct MetricsWrapper<F>(PhantomData<F>) | ||||||||||||||||||||||||||||||||||||||
| where | ||||||||||||||||||||||||||||||||||||||
| F: DiskBackupSizeCallback; | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| impl rbuilder_utils::clickhouse::backup::metrics::Metrics for MetricsWrapper { | ||||||||||||||||||||||||||||||||||||||
| impl<F: DiskBackupSizeCallback> rbuilder_utils::clickhouse::backup::metrics::Metrics | ||||||||||||||||||||||||||||||||||||||
| for MetricsWrapper<F> | ||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||
| fn increment_write_failures(err: String) { | ||||||||||||||||||||||||||||||||||||||
| CLICKHOUSE_METRICS.write_failures(err).inc(); | ||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||
|
|
@@ -60,6 +118,7 @@ impl rbuilder_utils::clickhouse::backup::metrics::Metrics for MetricsWrapper { | |||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| fn set_disk_backup_size(size_bytes: u64, batches: usize, order: &'static str) { | ||||||||||||||||||||||||||||||||||||||
| F::on_disk_backup_size(size_bytes); | ||||||||||||||||||||||||||||||||||||||
| CLICKHOUSE_METRICS.backup_size_bytes(order, "disk").set(size_bytes); | ||||||||||||||||||||||||||||||||||||||
| CLICKHOUSE_METRICS.backup_size_batches(order, "disk").set(batches); | ||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||
|
|
@@ -120,25 +179,27 @@ impl ClickhouseIndexer { | |||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| let send_timeout = Duration::from_millis(args.send_timeout_ms); | ||||||||||||||||||||||||||||||||||||||
| let end_timeout = Duration::from_millis(args.end_timeout_ms); | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| let bundle_inserter_join_handle = | ||||||||||||||||||||||||||||||||||||||
| spawn_clickhouse_inserter_and_backup::<SystemBundle, BundleRow, MetricsWrapper>( | ||||||||||||||||||||||||||||||||||||||
| &client, | ||||||||||||||||||||||||||||||||||||||
| receivers.bundle_rx, | ||||||||||||||||||||||||||||||||||||||
| &task_executor, | ||||||||||||||||||||||||||||||||||||||
| args.bundles_table_name, | ||||||||||||||||||||||||||||||||||||||
| builder_name.clone(), | ||||||||||||||||||||||||||||||||||||||
| disk_backup.clone(), | ||||||||||||||||||||||||||||||||||||||
| args.backup_memory_max_size_bytes, | ||||||||||||||||||||||||||||||||||||||
| send_timeout, | ||||||||||||||||||||||||||||||||||||||
| end_timeout, | ||||||||||||||||||||||||||||||||||||||
| TARGET_INDEXER, | ||||||||||||||||||||||||||||||||||||||
| ); | ||||||||||||||||||||||||||||||||||||||
| let bundle_inserter_join_handle = spawn_clickhouse_inserter_and_backup::< | ||||||||||||||||||||||||||||||||||||||
| SystemBundle, | ||||||||||||||||||||||||||||||||||||||
| BundleRow, | ||||||||||||||||||||||||||||||||||||||
| MetricsWrapper<UpdateBundleSizeCallback>, | ||||||||||||||||||||||||||||||||||||||
| >( | ||||||||||||||||||||||||||||||||||||||
| &client, | ||||||||||||||||||||||||||||||||||||||
| receivers.bundle_rx, | ||||||||||||||||||||||||||||||||||||||
| &task_executor, | ||||||||||||||||||||||||||||||||||||||
| args.bundles_table_name, | ||||||||||||||||||||||||||||||||||||||
| builder_name.clone(), | ||||||||||||||||||||||||||||||||||||||
| disk_backup.clone(), | ||||||||||||||||||||||||||||||||||||||
| args.backup_memory_max_size_bytes, | ||||||||||||||||||||||||||||||||||||||
| send_timeout, | ||||||||||||||||||||||||||||||||||||||
| end_timeout, | ||||||||||||||||||||||||||||||||||||||
| TARGET_INDEXER, | ||||||||||||||||||||||||||||||||||||||
| ); | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| let bundle_receipt_inserter_join_handle = spawn_clickhouse_inserter_and_backup::< | ||||||||||||||||||||||||||||||||||||||
| BundleReceipt, | ||||||||||||||||||||||||||||||||||||||
| BundleReceiptRow, | ||||||||||||||||||||||||||||||||||||||
| MetricsWrapper, | ||||||||||||||||||||||||||||||||||||||
| MetricsWrapper<UpdateBundleReceiptsSizeCallback>, | ||||||||||||||||||||||||||||||||||||||
| >( | ||||||||||||||||||||||||||||||||||||||
| &client, | ||||||||||||||||||||||||||||||||||||||
| receivers.bundle_receipt_rx, | ||||||||||||||||||||||||||||||||||||||
|
|
@@ -160,18 +221,29 @@ pub(crate) mod tests { | |||||||||||||||||||||||||||||||||||||
| use std::{borrow::Cow, collections::BTreeMap, fs, time::Duration}; | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| use crate::{ | ||||||||||||||||||||||||||||||||||||||
| cli::ClickhouseArgs, | ||||||||||||||||||||||||||||||||||||||
| cli::{ClickhouseArgs, IndexerArgs, OrderflowIngressArgs}, | ||||||||||||||||||||||||||||||||||||||
| consts::FLASHBOTS_SIGNATURE_HEADER, | ||||||||||||||||||||||||||||||||||||||
| indexer::{ | ||||||||||||||||||||||||||||||||||||||
| BUNDLE_RECEIPTS_TABLE_NAME, BUNDLE_TABLE_NAME, OrderSenders, TARGET_INDEXER, | ||||||||||||||||||||||||||||||||||||||
| BUNDLE_RECEIPTS_TABLE_NAME, BUNDLE_TABLE_NAME, Indexer, OrderSenders, TARGET_INDEXER, | ||||||||||||||||||||||||||||||||||||||
| click::{ | ||||||||||||||||||||||||||||||||||||||
| ClickhouseClientConfig, ClickhouseIndexer, | ||||||||||||||||||||||||||||||||||||||
| models::{BundleReceiptRow, BundleRow}, | ||||||||||||||||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||||||||||||||||
| tests::{bundle_receipt_example, system_bundle_example}, | ||||||||||||||||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||||||||||||||||
| utils::{SHUTDOWN_TIMEOUT, wait_for_critical_tasks}, | ||||||||||||||||||||||||||||||||||||||
| jsonrpc::{JSONRPC_VERSION_2, JsonRpcError, JsonRpcResponse, JsonRpcResponseTy}, | ||||||||||||||||||||||||||||||||||||||
| utils::{ | ||||||||||||||||||||||||||||||||||||||
| SHUTDOWN_TIMEOUT, | ||||||||||||||||||||||||||||||||||||||
| testutils::{Random, random_raw_bundle_with_tx_count_and_input_size}, | ||||||||||||||||||||||||||||||||||||||
| wait_for_critical_tasks, | ||||||||||||||||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||||||||||
| use alloy_primitives::keccak256; | ||||||||||||||||||||||||||||||||||||||
| use alloy_signer::Signer; | ||||||||||||||||||||||||||||||||||||||
| use alloy_signer_local::PrivateKeySigner; | ||||||||||||||||||||||||||||||||||||||
| use axum::{Json, http::StatusCode, routing::post}; | ||||||||||||||||||||||||||||||||||||||
| use clickhouse::{Client as ClickhouseClient, error::Result as ClickhouseResult}; | ||||||||||||||||||||||||||||||||||||||
| use rbuilder_primitives::serialize::RawBundle; | ||||||||||||||||||||||||||||||||||||||
| use rbuilder_utils::{ | ||||||||||||||||||||||||||||||||||||||
| clickhouse::{ | ||||||||||||||||||||||||||||||||||||||
| Quantities, | ||||||||||||||||||||||||||||||||||||||
|
|
@@ -180,14 +252,16 @@ pub(crate) mod tests { | |||||||||||||||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||||||||||||||||
| tasks::TaskManager, | ||||||||||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||||||||||
| use serde_json::json; | ||||||||||||||||||||||||||||||||||||||
| use testcontainers::{ | ||||||||||||||||||||||||||||||||||||||
| ContainerAsync, Image, | ||||||||||||||||||||||||||||||||||||||
| core::{ | ||||||||||||||||||||||||||||||||||||||
| ContainerPort, WaitFor, error::Result as TestcontainersResult, wait::HttpWaitStrategy, | ||||||||||||||||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||||||||||||||||
| runners::AsyncRunner as _, | ||||||||||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||||||||||
| use tokio::{runtime::Handle, sync::mpsc}; | ||||||||||||||||||||||||||||||||||||||
| use tokio::{net::TcpListener, runtime::Handle, sync::mpsc}; | ||||||||||||||||||||||||||||||||||||||
| use tokio_util::sync::CancellationToken; | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| // Uncomment to enable logging during tests. | ||||||||||||||||||||||||||||||||||||||
| // use tracing::level_filters::LevelFilter; | ||||||||||||||||||||||||||||||||||||||
|
|
@@ -552,4 +626,182 @@ pub(crate) mod tests { | |||||||||||||||||||||||||||||||||||||
| drop(image); | ||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| /// Integration test: when backup DB exceeds disk_max_size_to_accept_user_rpc_mb, user RPC | ||||||||||||||||||||||||||||||||||||||
| /// returns DiskFull; after fixing ClickHouse and draining backup, RPC accepts again. | ||||||||||||||||||||||||||||||||||||||
| /// A little long func.. consider improving this. | ||||||||||||||||||||||||||||||||||||||
| #[tokio::test(flavor = "multi_thread")] | ||||||||||||||||||||||||||||||||||||||
| async fn disk_full_rpc_rejects_then_accepts_after_drain() { | ||||||||||||||||||||||||||||||||||||||
| const BACKUP_DISK_MAX_SIZE_BYTES: u64 = 5; | ||||||||||||||||||||||||||||||||||||||
| const BUNDLE_TX_COUNT: usize = 10; | ||||||||||||||||||||||||||||||||||||||
| const BUNDLE_TX_INPUT_SIZE: usize = 1024; | ||||||||||||||||||||||||||||||||||||||
| const DISK_MAX_SIZE_TO_ACCEPT_USER_RPC_MB: u64 = 1; | ||||||||||||||||||||||||||||||||||||||
| // We need to fill DISK_MAX_SIZE_TO_ACCEPT_USER_RPC_MB with bundles. | ||||||||||||||||||||||||||||||||||||||
| // 2 is to play it safe. | ||||||||||||||||||||||||||||||||||||||
| const BUNDLE_COUNT_TO_FILL_DISK: usize = 2 * | ||||||||||||||||||||||||||||||||||||||
| (DISK_MAX_SIZE_TO_ACCEPT_USER_RPC_MB * 1024 * 1024) as usize / | ||||||||||||||||||||||||||||||||||||||
| (BUNDLE_TX_COUNT * BUNDLE_TX_INPUT_SIZE); | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| const FLOWPROXY_START_DELAY_MS: Duration = Duration::from_millis(800); | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
| // Assume 100ms per bundle to clickhouse (it's a LOT) | ||||||||||||||||||||||||||||||||||||||
| const DRAIN_TIMEOUT: Duration = | ||||||||||||||||||||||||||||||||||||||
| Duration::from_millis(100 * BUNDLE_COUNT_TO_FILL_DISK as u64); | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| let mut rng = rand::rng(); | ||||||||||||||||||||||||||||||||||||||
| let task_manager = TaskManager::new(tokio::runtime::Handle::current()); | ||||||||||||||||||||||||||||||||||||||
| let task_executor = task_manager.executor(); | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| // 1. Start ClickHouse without tables so inserts fail and go to backup. | ||||||||||||||||||||||||||||||||||||||
| let (image, client, config) = create_test_clickhouse_client(false).await.unwrap(); | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| let temp_dir = tempfile::tempdir().unwrap(); | ||||||||||||||||||||||||||||||||||||||
| let backup_path = temp_dir.path().join("clickhouse-backup.db"); | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| let mut clickhouse_args: ClickhouseArgs = config.clone().into(); | ||||||||||||||||||||||||||||||||||||||
| clickhouse_args.backup_disk_database_path = backup_path.to_string_lossy().to_string(); | ||||||||||||||||||||||||||||||||||||||
| clickhouse_args.backup_disk_max_size_bytes = BACKUP_DISK_MAX_SIZE_BYTES * 1024 * 1024; | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| // 2. Mock builder so forwarder requests complete. | ||||||||||||||||||||||||||||||||||||||
| let builder_listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); | ||||||||||||||||||||||||||||||||||||||
| let builder_port = builder_listener.local_addr().unwrap().port(); | ||||||||||||||||||||||||||||||||||||||
| let builder_url = format!("http://127.0.0.1:{builder_port}"); | ||||||||||||||||||||||||||||||||||||||
| let app = | ||||||||||||||||||||||||||||||||||||||
| axum::Router::new().route("/", post(|| async { (StatusCode::OK, Json(json!({}))) })); | ||||||||||||||||||||||||||||||||||||||
| tokio::spawn(async move { axum::serve(builder_listener, app).await.unwrap() }); | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| let mut args = OrderflowIngressArgs::default().gzip_enabled().disable_builder_hub(); | ||||||||||||||||||||||||||||||||||||||
| args.peer_update_interval_s = 5; | ||||||||||||||||||||||||||||||||||||||
| args.indexing = IndexerArgs { clickhouse: Some(clickhouse_args), parquet: None }; | ||||||||||||||||||||||||||||||||||||||
| args.disk_max_size_to_accept_user_rpc_mb = DISK_MAX_SIZE_TO_ACCEPT_USER_RPC_MB; // 1 MiB threshold | ||||||||||||||||||||||||||||||||||||||
| args.builder_url = Some(builder_url.clone()); | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| let user_listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); | ||||||||||||||||||||||||||||||||||||||
| let user_port = user_listener.local_addr().unwrap().port(); | ||||||||||||||||||||||||||||||||||||||
| let user_url = format!("http://127.0.0.1:{user_port}"); | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| let (indexer_handle, _indexer_join_handles) = | ||||||||||||||||||||||||||||||||||||||
| Indexer::run(args.indexing.clone(), args.builder_name.clone(), task_executor.clone()); | ||||||||||||||||||||||||||||||||||||||
| let cancellation_token = CancellationToken::new(); | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| tokio::spawn(async move { | ||||||||||||||||||||||||||||||||||||||
| crate::run_with_listeners( | ||||||||||||||||||||||||||||||||||||||
| args, | ||||||||||||||||||||||||||||||||||||||
| user_listener, | ||||||||||||||||||||||||||||||||||||||
| None, | ||||||||||||||||||||||||||||||||||||||
| task_executor, | ||||||||||||||||||||||||||||||||||||||
| indexer_handle, | ||||||||||||||||||||||||||||||||||||||
| cancellation_token, | ||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||
| .await | ||||||||||||||||||||||||||||||||||||||
| .unwrap(); | ||||||||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| tokio::time::sleep(FLOWPROXY_START_DELAY_MS).await; | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
| let reqwest_client = reqwest::Client::default(); | ||||||||||||||||||||||||||||||||||||||
| let signer = PrivateKeySigner::random(); | ||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+699
to
+703
|
||||||||||||||||||||||||||||||||||||||
| tokio::time::sleep(FLOWPROXY_START_DELAY_MS).await; | |
| let reqwest_client = reqwest::Client::default(); | |
| let signer = PrivateKeySigner::random(); | |
| let reqwest_client = reqwest::Client::default(); | |
| let signer = PrivateKeySigner::random(); | |
| let readiness_client = reqwest_client.clone(); | |
| tokio::time::timeout(FLOWPROXY_START_DELAY_MS, async move { | |
| loop { | |
| match readiness_client.get(&user_url).send().await { | |
| Ok(_) => break, | |
| Err(_) => tokio::time::sleep(Duration::from_millis(50)).await, | |
| } | |
| } | |
| }) | |
| .await | |
| .expect("flowproxy did not become ready in time"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The flag is documented as 'MB' but the runtime conversion uses
1024 * 1024(MiB). Either update docs/flag naming to MiB (recommended for clarity) or change the conversion to use decimal MB (1_000_000) to match the current documentation. Also consider aligning the env var name with the unit (e.g.,..._MBor..._MIB) to avoid ambiguity.