Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions crates/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ once_cell = "1.19"
# Rate limiting
governor = "0.6"

# Concurrency
parking_lot = "0.12"

[dev-dependencies]
tempfile = { workspace = true }
tokio-test = "0.4"
11 changes: 6 additions & 5 deletions crates/rpc/src/adapters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use alloy_primitives::{Address, Bytes, B256, U256};
use alloy_rpc_types_eth::{Block, Filter, Log, Transaction, TransactionReceipt};
use async_trait::async_trait;
use cipherbft_execution::database::Provider;
use parking_lot::RwLock;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tracing::{debug, trace};
Expand All @@ -47,7 +48,7 @@ pub struct ProviderBasedRpcStorage<P: Provider> {
/// Latest known block number (updated by consensus).
latest_block: AtomicU64,
/// Sync status tracking.
sync_state: std::sync::RwLock<SyncStateTracker>,
sync_state: RwLock<SyncStateTracker>,
}

/// Internal sync state tracker.
Expand All @@ -70,7 +71,7 @@ impl<P: Provider> ProviderBasedRpcStorage<P> {
provider,
chain_id,
latest_block: AtomicU64::new(0),
sync_state: std::sync::RwLock::new(SyncStateTracker::default()),
sync_state: RwLock::new(SyncStateTracker::default()),
}
}

Expand All @@ -86,7 +87,7 @@ impl<P: Provider> ProviderBasedRpcStorage<P> {

/// Update sync status (called by sync service).
pub fn set_syncing(&self, starting: u64, current: u64, highest: u64) {
let mut state = self.sync_state.write().expect("sync_state lock poisoned");
let mut state = self.sync_state.write();
state.is_syncing = true;
state.starting_block = starting;
state.current_block = current;
Expand All @@ -95,7 +96,7 @@ impl<P: Provider> ProviderBasedRpcStorage<P> {

/// Mark sync as complete.
pub fn set_synced(&self) {
let mut state = self.sync_state.write().expect("sync_state lock poisoned");
let mut state = self.sync_state.write();
state.is_syncing = false;
}

Expand Down Expand Up @@ -169,7 +170,7 @@ impl<P: Provider + 'static> RpcStorage for ProviderBasedRpcStorage<P> {

async fn sync_status(&self) -> RpcResult<SyncStatus> {
trace!("ProviderBasedRpcStorage::sync_status");
let state = self.sync_state.read().expect("sync_state lock poisoned");
let state = self.sync_state.read();
if state.is_syncing {
Ok(SyncStatus::Syncing {
starting_block: state.starting_block,
Expand Down
10 changes: 6 additions & 4 deletions crates/rpc/src/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
use std::collections::HashMap;
use std::net::IpAddr;
use std::num::NonZeroU32;
use std::sync::{Arc, RwLock};
use std::sync::Arc;
use std::time::{Duration, Instant};

use parking_lot::RwLock;

use governor::{Quota, RateLimiter};
use tracing::{debug, info, warn};

Expand Down Expand Up @@ -46,12 +48,12 @@ impl IpRateLimiter {
pub fn check(&self, ip: IpAddr) -> bool {
// Get or create limiter for this IP
let limiter = {
let limiters = self.limiters.read().expect("RwLock poisoned");
let limiters = self.limiters.read();
if let Some(limiter) = limiters.get(&ip) {
Arc::clone(limiter)
} else {
drop(limiters);
let mut limiters = self.limiters.write().expect("RwLock poisoned");
let mut limiters = self.limiters.write();
// Double-check after acquiring write lock
if let Some(limiter) = limiters.get(&ip) {
Arc::clone(limiter)
Expand All @@ -78,7 +80,7 @@ impl IpRateLimiter {
/// Clean up stale entries (IPs that haven't been seen for a while).
/// This should be called periodically to prevent memory leaks.
pub fn cleanup_stale_entries(&self) {
let mut limiters = self.limiters.write().expect("RwLock poisoned");
let mut limiters = self.limiters.write();
// Simple cleanup: remove entries with refcount == 1 (only our map holds them)
limiters.retain(|_, limiter| Arc::strong_count(limiter) > 1);
}
Expand Down