11 changes: 11 additions & 0 deletions src/cli.rs
@@ -266,6 +266,16 @@ pub struct OrderflowIngressArgs {
#[clap(long = "http.enable-gzip", default_value_t = false)]
pub gzip_enabled: bool,

/// Maximum local ClickHouse backup disk size in MB above which user RPC (e.g. eth_sendBundle)
/// is rejected with disk full. Defaults to 1024 MB (1 GiB).
#[clap(
long = "disk-max-size-to-accept-user-rpc-mb",
default_value_t = 1024,
env = "DISK_MAX_SIZE_TO_ACCEPT_USER_RPC",
id = "DISK_MAX_SIZE_TO_ACCEPT_USER_RPC"
)]
pub disk_max_size_to_accept_user_rpc_mb: u64,
Comment on lines +269 to +277

Copilot AI Feb 11, 2026


The flag is documented as 'MB' but the runtime conversion uses 1024 * 1024 (MiB). Either update docs/flag naming to MiB (recommended for clarity) or change the conversion to use decimal MB (1_000_000) to match the current documentation. Also consider aligning the env var name with the unit (e.g., ..._MB or ..._MIB) to avoid ambiguity.
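
As a hedged illustration of the discrepancy the comment describes (the constants and helper below are illustrative only, not code from this PR):

const BYTES_PER_MIB: u64 = 1024 * 1024; // binary units, what the runtime conversion currently uses
const BYTES_PER_MB: u64 = 1_000_000; // decimal units, what the flag documentation currently says

// Hypothetical helper: converts the flag value into a byte threshold under either reading.
fn threshold_bytes(flag_value: u64, binary_units: bool) -> u64 {
    if binary_units { flag_value * BYTES_PER_MIB } else { flag_value * BYTES_PER_MB }
}

fn main() {
    // With the default of 1024, the two readings differ by roughly 5%.
    assert_eq!(threshold_bytes(1024, true), 1_073_741_824); // 1 GiB
    assert_eq!(threshold_bytes(1024, false), 1_024_000_000); // ~0.95 GiB
}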


/// The interval in seconds to update the peer list from BuilderHub.
#[clap(
long = "peer.update-interval-s",
@@ -342,6 +352,7 @@ impl Default for OrderflowIngressArgs {
score_bucket_s: 4,
log_json: false,
gzip_enabled: false,
disk_max_size_to_accept_user_rpc_mb: 1024,
tcp_small_clients: NonZero::new(1).expect("non-zero"),
tcp_big_clients: 0,
io_threads: 4,
296 changes: 274 additions & 22 deletions src/indexer/click/mod.rs
@@ -1,6 +1,14 @@
//! Indexing functionality powered by Clickhouse.

use std::{fmt::Debug, time::Duration};
use std::{
fmt::Debug,
marker::PhantomData,
sync::{
LazyLock,
atomic::{AtomicU64, Ordering},
},
time::Duration,
};

use crate::{
cli::ClickhouseArgs,
@@ -34,9 +42,59 @@ fn config_from_clickhouse_args(args: &ClickhouseArgs, validation: bool) -> Click
}
}

struct MetricsWrapper;
/// Little global (ugh) info to easily get the current ClickHouse disk size.
#[derive(Default)]
pub(crate) struct ClickhouseLocalBackupDiskSize {
bundles_size: AtomicU64,
bundle_receipts_size: AtomicU64,
}

impl ClickhouseLocalBackupDiskSize {
pub(crate) fn set_bundles_size(&self, size: u64) {
self.bundles_size.store(size, Ordering::Relaxed);
}
pub(crate) fn set_bundle_receipts_size(&self, size: u64) {
self.bundle_receipts_size.store(size, Ordering::Relaxed);
}
pub(crate) fn disk_size(&self) -> u64 {
self.bundles_size.load(Ordering::Relaxed) +
self.bundle_receipts_size.load(Ordering::Relaxed)
}
}

/// We store the current disk size of the backup database here to avoid querying the metrics,
/// since that would require a string map lookup.
pub(crate) static CLICKHOUSE_LOCAL_BACKUP_DISK_SIZE: LazyLock<ClickhouseLocalBackupDiskSize> =
LazyLock::new(ClickhouseLocalBackupDiskSize::default);

/// Callback invoked when disk backup size is set. Implement this trait to observe size updates.
pub(crate) trait DiskBackupSizeCallback: Send + Sync {
fn on_disk_backup_size(size_bytes: u64);
}

struct UpdateBundleSizeCallback;

impl DiskBackupSizeCallback for UpdateBundleSizeCallback {
fn on_disk_backup_size(size_bytes: u64) {
CLICKHOUSE_LOCAL_BACKUP_DISK_SIZE.set_bundles_size(size_bytes);
}
}

struct UpdateBundleReceiptsSizeCallback;

impl DiskBackupSizeCallback for UpdateBundleReceiptsSizeCallback {
fn on_disk_backup_size(size_bytes: u64) {
CLICKHOUSE_LOCAL_BACKUP_DISK_SIZE.set_bundle_receipts_size(size_bytes);
}
}

struct MetricsWrapper<F>(PhantomData<F>)
where
F: DiskBackupSizeCallback;

impl rbuilder_utils::clickhouse::backup::metrics::Metrics for MetricsWrapper {
impl<F: DiskBackupSizeCallback> rbuilder_utils::clickhouse::backup::metrics::Metrics
for MetricsWrapper<F>
{
fn increment_write_failures(err: String) {
CLICKHOUSE_METRICS.write_failures(err).inc();
}
@@ -60,6 +118,7 @@ impl rbuilder_utils::clickhouse::backup::metrics::Metrics for MetricsWrapper {
}

fn set_disk_backup_size(size_bytes: u64, batches: usize, order: &'static str) {
F::on_disk_backup_size(size_bytes);
CLICKHOUSE_METRICS.backup_size_bytes(order, "disk").set(size_bytes);
CLICKHOUSE_METRICS.backup_size_batches(order, "disk").set(batches);
}
@@ -120,25 +179,27 @@ impl ClickhouseIndexer {

let send_timeout = Duration::from_millis(args.send_timeout_ms);
let end_timeout = Duration::from_millis(args.end_timeout_ms);

let bundle_inserter_join_handle =
spawn_clickhouse_inserter_and_backup::<SystemBundle, BundleRow, MetricsWrapper>(
&client,
receivers.bundle_rx,
&task_executor,
args.bundles_table_name,
builder_name.clone(),
disk_backup.clone(),
args.backup_memory_max_size_bytes,
send_timeout,
end_timeout,
TARGET_INDEXER,
);
let bundle_inserter_join_handle = spawn_clickhouse_inserter_and_backup::<
SystemBundle,
BundleRow,
MetricsWrapper<UpdateBundleSizeCallback>,
>(
&client,
receivers.bundle_rx,
&task_executor,
args.bundles_table_name,
builder_name.clone(),
disk_backup.clone(),
args.backup_memory_max_size_bytes,
send_timeout,
end_timeout,
TARGET_INDEXER,
);

let bundle_receipt_inserter_join_handle = spawn_clickhouse_inserter_and_backup::<
BundleReceipt,
BundleReceiptRow,
MetricsWrapper,
MetricsWrapper<UpdateBundleReceiptsSizeCallback>,
>(
&client,
receivers.bundle_receipt_rx,
Expand All @@ -160,18 +221,29 @@ pub(crate) mod tests {
use std::{borrow::Cow, collections::BTreeMap, fs, time::Duration};

use crate::{
cli::ClickhouseArgs,
cli::{ClickhouseArgs, IndexerArgs, OrderflowIngressArgs},
consts::FLASHBOTS_SIGNATURE_HEADER,
indexer::{
BUNDLE_RECEIPTS_TABLE_NAME, BUNDLE_TABLE_NAME, OrderSenders, TARGET_INDEXER,
BUNDLE_RECEIPTS_TABLE_NAME, BUNDLE_TABLE_NAME, Indexer, OrderSenders, TARGET_INDEXER,
click::{
ClickhouseClientConfig, ClickhouseIndexer,
models::{BundleReceiptRow, BundleRow},
},
tests::{bundle_receipt_example, system_bundle_example},
},
utils::{SHUTDOWN_TIMEOUT, wait_for_critical_tasks},
jsonrpc::{JSONRPC_VERSION_2, JsonRpcError, JsonRpcResponse, JsonRpcResponseTy},
utils::{
SHUTDOWN_TIMEOUT,
testutils::{Random, random_raw_bundle_with_tx_count_and_input_size},
wait_for_critical_tasks,
},
};
use alloy_primitives::keccak256;
use alloy_signer::Signer;
use alloy_signer_local::PrivateKeySigner;
use axum::{Json, http::StatusCode, routing::post};
use clickhouse::{Client as ClickhouseClient, error::Result as ClickhouseResult};
use rbuilder_primitives::serialize::RawBundle;
use rbuilder_utils::{
clickhouse::{
Quantities,
@@ -180,14 +252,16 @@
},
tasks::TaskManager,
};
use serde_json::json;
use testcontainers::{
ContainerAsync, Image,
core::{
ContainerPort, WaitFor, error::Result as TestcontainersResult, wait::HttpWaitStrategy,
},
runners::AsyncRunner as _,
};
use tokio::{runtime::Handle, sync::mpsc};
use tokio::{net::TcpListener, runtime::Handle, sync::mpsc};
use tokio_util::sync::CancellationToken;

// Uncomment to enable logging during tests.
// use tracing::level_filters::LevelFilter;
@@ -552,4 +626,182 @@ pub(crate) mod tests {
drop(image);
}
}

/// Integration test: when backup DB exceeds disk_max_size_to_accept_user_rpc_mb, user RPC
/// returns DiskFull; after fixing ClickHouse and draining backup, RPC accepts again.
/// This function is a bit long; consider splitting it up.
#[tokio::test(flavor = "multi_thread")]
async fn disk_full_rpc_rejects_then_accepts_after_drain() {
const BACKUP_DISK_MAX_SIZE_BYTES: u64 = 5;
const BUNDLE_TX_COUNT: usize = 10;
const BUNDLE_TX_INPUT_SIZE: usize = 1024;
const DISK_MAX_SIZE_TO_ACCEPT_USER_RPC_MB: u64 = 1;
// We need enough bundles to fill DISK_MAX_SIZE_TO_ACCEPT_USER_RPC_MB.
// The factor of 2 is a safety margin.
const BUNDLE_COUNT_TO_FILL_DISK: usize = 2 *
(DISK_MAX_SIZE_TO_ACCEPT_USER_RPC_MB * 1024 * 1024) as usize /
(BUNDLE_TX_COUNT * BUNDLE_TX_INPUT_SIZE);

const FLOWPROXY_START_DELAY_MS: Duration = Duration::from_millis(800);

Copilot AI Feb 11, 2026


The test uses a fixed sleep to wait for the server to start, which is a common source of flaky CI failures. Prefer polling a readiness condition (e.g., hitting /readyz or retrying the RPC until it responds) with a bounded timeout instead of sleeping a fixed duration.
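
A minimal sketch of such a readiness poll with a bounded timeout (the helper name and the idea of probing the user RPC listener are assumptions, not code from this PR):

use std::time::Duration;

// Hypothetical helper: retry a cheap request until the listener responds or the timeout elapses.
async fn wait_until_ready(client: &reqwest::Client, url: &str, timeout: Duration) {
    tokio::time::timeout(timeout, async {
        loop {
            // Any response, even a non-2xx status, proves the server is accepting connections.
            if client.get(url).send().await.is_ok() {
                return;
            }
            tokio::time::sleep(Duration::from_millis(50)).await;
        }
    })
    .await
    .expect("server did not become ready in time");
}

The test could then call wait_until_ready(&reqwest_client, &user_url, FLOWPROXY_START_DELAY_MS).await in place of the fixed sleep.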

// Assume 100ms per bundle to ClickHouse (a very generous estimate).
const DRAIN_TIMEOUT: Duration =
Duration::from_millis(100 * BUNDLE_COUNT_TO_FILL_DISK as u64);

let mut rng = rand::rng();
let task_manager = TaskManager::new(tokio::runtime::Handle::current());
let task_executor = task_manager.executor();

// 1. Start ClickHouse without tables so inserts fail and go to backup.
let (image, client, config) = create_test_clickhouse_client(false).await.unwrap();

let temp_dir = tempfile::tempdir().unwrap();
let backup_path = temp_dir.path().join("clickhouse-backup.db");

let mut clickhouse_args: ClickhouseArgs = config.clone().into();
clickhouse_args.backup_disk_database_path = backup_path.to_string_lossy().to_string();
clickhouse_args.backup_disk_max_size_bytes = BACKUP_DISK_MAX_SIZE_BYTES * 1024 * 1024;

// 2. Mock builder so forwarder requests complete.
let builder_listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let builder_port = builder_listener.local_addr().unwrap().port();
let builder_url = format!("http://127.0.0.1:{builder_port}");
let app =
axum::Router::new().route("/", post(|| async { (StatusCode::OK, Json(json!({}))) }));
tokio::spawn(async move { axum::serve(builder_listener, app).await.unwrap() });

let mut args = OrderflowIngressArgs::default().gzip_enabled().disable_builder_hub();
args.peer_update_interval_s = 5;
args.indexing = IndexerArgs { clickhouse: Some(clickhouse_args), parquet: None };
args.disk_max_size_to_accept_user_rpc_mb = DISK_MAX_SIZE_TO_ACCEPT_USER_RPC_MB; // 1 MiB threshold
args.builder_url = Some(builder_url.clone());

let user_listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let user_port = user_listener.local_addr().unwrap().port();
let user_url = format!("http://127.0.0.1:{user_port}");

let (indexer_handle, _indexer_join_handles) =
Indexer::run(args.indexing.clone(), args.builder_name.clone(), task_executor.clone());
let cancellation_token = CancellationToken::new();

tokio::spawn(async move {
crate::run_with_listeners(
args,
user_listener,
None,
task_executor,
indexer_handle,
cancellation_token,
)
.await
.unwrap();
});

tokio::time::sleep(FLOWPROXY_START_DELAY_MS).await;

let reqwest_client = reqwest::Client::default();
let signer = PrivateKeySigner::random();

Comment on lines +699 to +703

Copilot AI Feb 11, 2026


The test uses a fixed sleep to wait for the server to start, which is a common source of flaky CI failures. Prefer polling a readiness condition (e.g., hitting /readyz or retrying the RPC until it responds) with a bounded timeout instead of sleeping a fixed duration.

Suggested change
tokio::time::sleep(FLOWPROXY_START_DELAY_MS).await;
let reqwest_client = reqwest::Client::default();
let signer = PrivateKeySigner::random();
let reqwest_client = reqwest::Client::default();
let signer = PrivateKeySigner::random();
let readiness_client = reqwest_client.clone();
tokio::time::timeout(FLOWPROXY_START_DELAY_MS, async move {
loop {
match readiness_client.get(&user_url).send().await {
Ok(_) => break,
Err(_) => tokio::time::sleep(Duration::from_millis(50)).await,
}
}
})
.await
.expect("flowproxy did not become ready in time");

// 3. Phase 1: send bundles until backup exceeds threshold and we get DiskFull.
// Delay between requests so the indexer can commit batches (which fail and go to backup).
let mut got_disk_full = false;
for _ in 0..BUNDLE_COUNT_TO_FILL_DISK {
let bundle = random_raw_bundle_with_tx_count_and_input_size(
&mut rng,
BUNDLE_TX_COUNT,
Some(BUNDLE_TX_INPUT_SIZE),
);
let response = send_bundle_req(&reqwest_client, &user_url, &signer, &bundle).await;
if is_disk_full(response).await {
got_disk_full = true;
break;
}
tokio::time::sleep(Duration::from_millis(30)).await;
}
assert!(got_disk_full, "expected RPC to eventually return DiskFull");

// 4. Phase 2: create tables so backup can drain.
create_clickhouse_bundles_table(&client).await.unwrap();
create_clickhouse_bundle_receipts_table(&client).await.unwrap();

// 5. Poll until RPC accepts again (backup drained).
let poll_interval = Duration::from_millis(500);
let mut elapsed = Duration::ZERO;
while elapsed < DRAIN_TIMEOUT {
tokio::time::sleep(poll_interval).await;
elapsed += poll_interval;
let bundle = RawBundle::random(&mut rng);
let response = send_bundle_req(&reqwest_client, &user_url, &signer, &bundle).await;
if response.status().is_success() {
let body = response.bytes().await.unwrap();
let parsed: JsonRpcResponse<serde_json::Value> =
serde_json::from_slice(body.as_ref()).expect("valid json");
if matches!(parsed.result_or_error, JsonRpcResponseTy::Result(_)) {
break;
}
}
}
assert!(elapsed < DRAIN_TIMEOUT, "RPC did not accept again within {:?}", DRAIN_TIMEOUT);

// 6. Phase 3: one more successful eth_sendBundle.
let bundle = RawBundle::random(&mut rng);
let response = send_bundle_req(&reqwest_client, &user_url, &signer, &bundle).await;
assert!(
response.status().is_success(),
"expected success after drain, got {}",
response.text().await.unwrap_or_default()
);
let body = response.bytes().await.unwrap();
let parsed: JsonRpcResponse<serde_json::Value> =
serde_json::from_slice(body.as_ref()).unwrap();
assert!(
matches!(parsed.result_or_error, JsonRpcResponseTy::Result(_)),
"expected result, got {:?}",
parsed.result_or_error
);

drop(image);
}

async fn is_disk_full(response: reqwest::Response) -> bool {
let status = response.status();
let body = response.bytes().await.unwrap();
if !status.is_success() {
let parsed: JsonRpcResponse<()> = match serde_json::from_slice(body.as_ref()) {
Ok(p) => p,
Err(_) => return false,
};
matches!(
parsed.result_or_error,
JsonRpcResponseTy::Error { code: -32603, message: JsonRpcError::DiskFull }
)
} else {
false
}
}

async fn send_bundle_req(
client: &reqwest::Client,
url: &str,
signer: &PrivateKeySigner,
bundle: &RawBundle,
) -> reqwest::Response {
let body = json!({
"id": 0,
"jsonrpc": JSONRPC_VERSION_2,
"method": "eth_sendBundle",
"params": [bundle]
});
let body_bytes = serde_json::to_vec(&body).unwrap();
let sighash = format!("{:?}", keccak256(&body_bytes));
let sig = signer.sign_message(sighash.as_bytes()).await.unwrap();
let signature_header = format!("{:?}:{}", signer.address(), sig);
client
.post(url)
.header(reqwest::header::CONTENT_TYPE, "application/json")
.header(FLASHBOTS_SIGNATURE_HEADER, signature_header)
.body(body_bytes)
.send()
.await
.unwrap()
}
}