Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 126 additions & 2 deletions src/faucet/rate_limiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,20 @@ impl<S: RateLimiterStorage> RateLimiterCore<S> {
Ok((faucet_info, id))
}

/// Same shape as [`parse_request_path`] but expects a trailing `/refund` segment.
fn parse_refund_path(path: &str) -> Result<(FaucetInfo, String)> {
let mut path_info = path.split('/');
if path_info.next_back() != Some("refund") {
return Err(Error::RustError(
"rate limiter refund path must end with /refund".into(),
));
}
let id = path_info.next_back().unwrap_or_default().to_string();
let faucet_info = FaucetInfo::from_str(path_info.next_back().unwrap_or_default())
.map_err(|e| Error::RustError(e.to_string()))?;
Ok((faucet_info, id))
}

async fn get_rate_limit(
&self,
faucet_info: &FaucetInfo,
Expand Down Expand Up @@ -184,6 +198,35 @@ impl<S: RateLimiterStorage> RateLimiterCore<S> {
Ok(())
}

async fn refund_last_drip(&self, faucet_info: &FaucetInfo, id: &str) -> Result<()> {
let drip_amount = faucet_info.drip_amount();
let dripped = self
.storage
.get::<DripAmount>("dripped")
.await
.ok()
.flatten()
.unwrap_or_else(|| DripAmount::zero(faucet_info.token_type()));
let claimed_key = format!("claimed_{id}");
let claimed = self
.storage
.get::<DripAmount>(&claimed_key)
.await
.ok()
.flatten()
.unwrap_or_else(|| DripAmount::zero(faucet_info.token_type()));
let updated_dripped = dripped.saturating_sub(&drip_amount);
let updated_claimed = claimed.saturating_sub(&drip_amount);
self.storage.put("dripped", updated_dripped.clone()).await?;
self.storage
.put(&claimed_key, updated_claimed.clone())
.await?;
log::info!(
"{faucet_info} Rate limiter refund for {id}: claimed={updated_claimed:?}, dripped={updated_dripped:?}"
);
Ok(())
}

#[allow(dead_code)]
async fn handle_request(&self, path: &str, now: DateTime<Utc>) -> Result<Option<i64>> {
let (faucet_info, id) = Self::parse_request_path(path)?;
Expand All @@ -208,13 +251,37 @@ impl<S: RateLimiterStorage> RateLimiterCore<S> {
pub struct RateLimiter {
#[cfg(not(test))]
state: State,
env: Env,
}

#[cfg(not(test))]
impl RateLimiter {
fn parse_request_path(path: &str) -> Result<(FaucetInfo, String)> {
RateLimiterCore::<DurableObjectStorage>::parse_request_path(path)
}

fn parse_refund_path(path: &str) -> Result<(FaucetInfo, String)> {
RateLimiterCore::<DurableObjectStorage>::parse_refund_path(path)
}

fn refund_authorized(req: &Request, env: &Env) -> Result<bool> {
let Ok(expected) = env
.secret("RATE_LIMITER_REFUND_SECRET")
.map(|s| s.to_string())
else {
return Ok(false);
};
let expected = expected.trim();
if expected.is_empty() {
return Ok(false);
}
let bearer = req
.headers()
.get("Authorization")?
.and_then(|h| h.strip_prefix("Bearer ").map(|t| t.trim().to_string()))
.unwrap_or_default();
Ok(bearer == expected)
}
fn create_core(&self) -> RateLimiterCore<DurableObjectStorage<'_>> {
RateLimiterCore::new(DurableObjectStorage::new(&self.state))
}
Expand All @@ -240,15 +307,28 @@ impl RateLimiter {
.update_rate_limit(faucet_info, id, now, claimed, dripped)
.await
}

async fn refund_rate_limit_for_path(&self, path: &str) -> Result<()> {
let (faucet_info, id) = Self::parse_refund_path(path)?;
self.create_core().refund_last_drip(&faucet_info, &id).await
}
}

#[cfg(not(test))]
impl DurableObject for RateLimiter {
fn new(state: State, _env: Env) -> Self {
Self { state }
fn new(state: State, env: Env) -> Self {
Self { state, env }
}

async fn fetch(&self, req: Request) -> Result<Response> {
if req.method() == Method::Post {
if !Self::refund_authorized(&req, &self.env)? {
return Response::error("Unauthorized", 401);
}
self.refund_rate_limit_for_path(&req.path()).await?;
return Response::ok("");
}

let now = Utc::now();
let (faucet_info, id) = Self::parse_request_path(&req.path())?;
let (is_allowed, retry_after, claimed, dripped) =
Expand Down Expand Up @@ -455,6 +535,50 @@ mod tests {
assert_eq!(id, "test_wallet_123");
}

#[tokio::test]
async fn test_parse_refund_path_ok() {
let path = "http://do/rate_limiter/CalibnetFIL/test_wallet_123/refund";
let (faucet_info, id) =
RateLimiterCore::<MockRateLimiterStorage>::parse_refund_path(path).unwrap();
assert_eq!(faucet_info, FaucetInfo::CalibnetFIL);
assert_eq!(id, "test_wallet_123");
}

#[tokio::test]
async fn test_refund_subtracts_last_drip() {
use crate::utils::drip_amount::TokenType;
use mockall::predicate::eq;

let wallet_id = "w1";
let faucet_info = FaucetInfo::CalibnetFIL;
let drip = faucet_info.drip_amount();
let drip_for_claimed = drip.clone();
let zero = DripAmount::zero(TokenType::Native);

let mut mock_storage = MockRateLimiterStorage::new();
mock_storage
.expect_get::<DripAmount>()
.with(eq("dripped"))
.return_once(move |_| Ok(Some(drip.clone())));
mock_storage
.expect_get::<DripAmount>()
.with(eq(format!("claimed_{wallet_id}")))
.return_once(move |_| Ok(Some(drip_for_claimed.clone())));
mock_storage
.expect_put::<DripAmount>()
.with(eq("dripped"), eq(zero.clone()))
.return_once(|_, _| Ok(()));
mock_storage
.expect_put::<DripAmount>()
.with(eq(format!("claimed_{wallet_id}")), eq(zero))
.return_once(|_, _| Ok(()));

let core = RateLimiterCore::new(mock_storage);
core.refund_last_drip(&faucet_info, wallet_id)
.await
.unwrap();
}

/// Checks path parsing with an invalid path.
#[tokio::test]
async fn test_parse_request_path_invalid_faucet() {
Expand Down
47 changes: 47 additions & 0 deletions src/faucet/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,53 @@ async fn query_rate_limiter(
.await
}

/// Undoes one drip allocation in the rate limiter DO after a failed on-chain submission.
/// Server-internal: do not call from a browser-facing server function (would defeat rate limiting).
pub async fn refund_rate_limit_by_key(
faucet_info: FaucetInfo,
wallet_addr: AnyAddress,
) -> Result<(), ServerFnError> {
SendWrapper::new(async move {
let Extension(env): Extension<Arc<Env>> = extract().await?;
if env
.secret("RATE_LIMITER_DISABLED")
.map(|v| v.to_string().to_lowercase() == "true")
.unwrap_or(false)
{
return Ok(());
}
let token = match env.secret("RATE_LIMITER_REFUND_SECRET") {
Ok(s) if !s.to_string().trim().is_empty() => s.to_string(),
_ => {
log::warn!("RATE_LIMITER_REFUND_SECRET unset/empty: skipping rate limit refund");
return Ok(());
}
};
let stub = env
.durable_object("RATE_LIMITER")?
.id_from_name(&faucet_info.to_string())?
.get_stub()?;
let headers = Headers::new();
headers.set("Authorization", &format!("Bearer {}", token.trim()))?;
let mut init = RequestInit::new();
init.with_method(Method::Post).with_headers(headers);
let request = Request::new_with_init(
&format!("http://do/rate_limiter/{faucet_info}/{wallet_addr}/refund"),
&init,
)?;
let status = stub
.fetch_with_request(request)
.await
.map_err(ServerFnError::new)?
.status_code();
if !(200..300).contains(&status) {
log::warn!("rate limit refund DO returned HTTP {status} (faucet={faucet_info})");
}
Ok(())
})
.await
}

/// Checks if the request can proceed based on the rate limit for the given faucet.
pub async fn check_rate_limit(
faucet_info: FaucetInfo,
Expand Down
40 changes: 25 additions & 15 deletions src/faucet/server_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use alloy::{sol, sol_types::SolCall};

#[cfg(feature = "ssr")]
use super::server::{
check_rate_limit, read_faucet_secret, secret_key, sign_with_eth_secret_key,
sign_with_secret_key,
check_rate_limit, read_faucet_secret, refund_rate_limit_by_key, secret_key,
sign_with_eth_secret_key, sign_with_secret_key,
};

#[cfg(feature = "ssr")]
Expand Down Expand Up @@ -439,14 +439,20 @@ async fn handle_native_claim(
)
.await
{
Ok(LotusJson(smsg)) => {
let cid = rpc.mpool_push(smsg).await.map_err(ServerFnError::new)?;
let tx_hash = rpc
Ok(LotusJson(smsg)) => match rpc.mpool_push(smsg).await.map_err(ServerFnError::new) {
Ok(cid) => rpc
.eth_get_transaction_hash_by_cid(cid)
.await
.map_err(ServerFnError::new)?;
Ok(tx_hash)
}
.map_err(ServerFnError::new),
Err(e) => {
let _ = refund_rate_limit_by_key(
faucet_info,
AnyAddress::Filecoin(LotusJson(id_address)),
)
.await;
Err(e)
}
},
Err(err) => Err(handle_faucet_error(err)),
}
}
Expand All @@ -469,13 +475,17 @@ async fn handle_erc20_claim(
let gas_price = rpc.gas_price().await.map_err(ServerFnError::new)?;

match signed_erc20_transfer(eth_to, nonce, gas_price, faucet_info).await {
Ok(signed) => {
let tx_hash = rpc
.send_eth_transaction_signed(&signed)
.await
.map_err(ServerFnError::new)?;
Ok(tx_hash)
}
Ok(signed) => match rpc
.send_eth_transaction_signed(&signed)
.await
.map_err(ServerFnError::new)
{
Ok(tx_hash) => Ok(tx_hash),
Err(e) => {
let _ = refund_rate_limit_by_key(faucet_info, AnyAddress::Ethereum(eth_to)).await;
Err(e)
}
},
Err(err) => Err(handle_faucet_error(err)),
}
}
Expand Down
16 changes: 15 additions & 1 deletion src/utils/drip_amount.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use fvm_shared::{bigint::Zero, econ::TokenAmount, sector::StoragePower};
use fvm_shared::{econ::TokenAmount, sector::StoragePower};
use num_traits::Zero as _;
use serde::{Deserialize, Serialize};
use std::ops::{Add, AddAssign, Mul};

Expand Down Expand Up @@ -27,6 +28,19 @@ impl DripAmount {
TokenType::Datacap => DripAmount::Storage(StoragePower::zero()),
}
}

/// Subtract `rhs` from self, saturating at the appropriate zero representation.
pub fn saturating_sub(&self, rhs: &DripAmount) -> DripAmount {
match (self, rhs) {
(DripAmount::Token(a), DripAmount::Token(b)) => {
DripAmount::Token(if *a <= *b { TokenAmount::zero() } else { a - b })
}
(DripAmount::Storage(a), DripAmount::Storage(b)) => {
DripAmount::Storage(if a <= b { StoragePower::zero() } else { a - b })
}
_ => unreachable!("DripAmount variant mismatch"),
}
}
}

impl Add<&DripAmount> for &DripAmount {
Expand Down
Loading