Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
1ef71a2
add pool indexer crate, focused on subgraph seeding
AryanGodara May 18, 2026
632f916
fix CI: truncate all tables in one statement
AryanGodara May 18, 2026
f21e002
fix metrics label, and failing CI job
AryanGodara May 18, 2026
3239628
refactor: acc review comments, remove explainer doc-comments, update …
AryanGodara May 20, 2026
f68a76f
docs: add README for pool-indexer crate detailing setup and usage
AryanGodara May 20, 2026
17d1f3e
add new tests for pool indexer functionality
AryanGodara May 20, 2026
3bfee07
docs: clarify pool-indexer module comment
AryanGodara May 20, 2026
3671eb8
fix: derive pool liquidity from Mint/Burn events instead of historica…
AryanGodara May 22, 2026
61efadd
fix: pin V3 pool snapshot to indexer's actual block via wait_until ba…
AryanGodara May 22, 2026
d1bd61a
fix: supervise pool-indexer backfill tasks via JoinSet so panics exit…
AryanGodara May 22, 2026
d79e58e
fix: probe subgraph factory before seeding to catch URL/factory misma…
AryanGodara May 22, 2026
f6c1877
fix: gate UPSERT with total_delta <> 0 to close INSERT-with-zero leak…
AryanGodara May 23, 2026
9fb748a
fix: aggregate decimals-missing pool drops into single DEBUG per resp…
AryanGodara May 23, 2026
c5bef57
fix: probe Factory entity (not Pool.factory) for subgraph attestation
AryanGodara May 26, 2026
f119dab
fix: catch_up checkpoint at seed_block to avoid re-applying Mint/Burn…
AryanGodara May 26, 2026
ce32bc1
fix: tolerate indexer bootstrap 503 in driver wait_until barrier
AryanGodara May 26, 2026
c9d8acd
fix: silently skip apply_position_delta for unknown pools (foreign-fa…
AryanGodara May 26, 2026
bfc5f39
chore: drop subgraph-bearer-token; use public subgraph endpoints
AryanGodara May 27, 2026
31f4e16
refactor: model uniswap-v3 pool source as enum, not two Options
AryanGodara May 27, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ order-validation = { path = "crates/order-validation" }
orderbook = { path = "crates/orderbook" }
paste = "1.0"
pin-project-lite = "0.2.14"
pool-indexer = { path = "crates/pool-indexer" }
prettyplease = "0.2.37"
price-estimation = { path = "crates/price-estimation" }
proc-macro2 = "1.0.103"
Expand Down
10 changes: 8 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@ RUN rustup install stable && rustup default stable
COPY . .
RUN --mount=type=cache,target=/usr/local/cargo/registry --mount=type=cache,target=/src/target \
CARGO_PROFILE_RELEASE_DEBUG=1 RUSTFLAGS="${RUSTFLAGS}" cargo build --release \
-p autopilot -p driver -p orderbook -p refunder -p solvers \
-p autopilot -p driver -p orderbook -p refunder -p solvers -p pool-indexer \
${CARGO_BUILD_FEATURES} && \
cp target/release/autopilot / && \
cp target/release/driver / && \
cp target/release/orderbook / && \
cp target/release/refunder / && \
cp target/release/solvers /
cp target/release/solvers / && \
cp target/release/pool-indexer /

# Create an intermediate image to extract the binaries
FROM docker.io/debian:bookworm-slim AS intermediate
Expand Down Expand Up @@ -53,6 +54,10 @@ FROM intermediate AS solvers
COPY --from=cargo-build /solvers /usr/local/bin/solvers
ENTRYPOINT [ "solvers" ]

FROM intermediate AS pool-indexer
COPY --from=cargo-build /pool-indexer /usr/local/bin/pool-indexer
ENTRYPOINT [ "pool-indexer" ]

# Extract Binary
FROM intermediate

Expand All @@ -62,5 +67,6 @@ COPY --from=cargo-build /driver /usr/local/bin/driver
COPY --from=cargo-build /orderbook /usr/local/bin/orderbook
COPY --from=cargo-build /refunder /usr/local/bin/refunder
COPY --from=cargo-build /solvers /usr/local/bin/solvers
COPY --from=cargo-build /pool-indexer /usr/local/bin/pool-indexer

ENTRYPOINT ["/usr/bin/tini", "-s", "--"]
33 changes: 3 additions & 30 deletions crates/autopilot/src/domain/settlement/observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ use {
anyhow::{Context, Result, anyhow},
eth_domain_types as eth,
futures::StreamExt,
rand::Rng as _,
std::time::Duration,
shared::retry::retry_with_sleep,
};

#[derive(Clone)]
Expand All @@ -45,9 +44,7 @@ impl Observer {
/// prod vs. staging).
pub async fn post_process_outstanding_settlement_transactions(&self) {
let settlements =
match Self::retry_with_sleep(|| self.persistence.get_settlements_without_auction())
.await
{
match retry_with_sleep(|| self.persistence.get_settlements_without_auction()).await {
Ok(settlements) => settlements,
Err(errs) => {
tracing::warn!(?errs, "failed to fetch unprocessed settlements");
Expand All @@ -67,7 +64,7 @@ impl Observer {
futures::stream::iter(settlements)
.for_each_concurrent(MAX_CONCURRENCY, |settlement| async move {
tracing::debug!(tx = ?settlement.transaction, "start post processing of settlement");
match Self::retry_with_sleep(|| self.post_process_settlement(settlement)).await {
match retry_with_sleep(|| self.post_process_settlement(settlement)).await {
Ok(_) => tracing::debug!(
tx = ?settlement.transaction,
"successfully post-processed settlement"
Expand Down Expand Up @@ -160,28 +157,4 @@ impl Observer {
}
}
}

async fn retry_with_sleep<F, OK, ERR>(future: impl Fn() -> F) -> Result<OK, Vec<ERR>>
where
F: Future<Output = Result<OK, ERR>>,
ERR: std::fmt::Debug,
{
const MAX_RETRIES: usize = 5;

let mut errors = Vec::new();
let mut tries = 0;
while tries < MAX_RETRIES {
match future().await {
Ok(res) => return Ok(res),
Err(err) => {
errors.push(err);
tries += 1;
// wait a little to give temporary errors a chance to resolve themselves
let timeout_with_jitter = 50u64 + rand::rng().random_range(0..=50);
tokio::time::sleep(Duration::from_millis(timeout_with_jitter)).await;
}
}
}
Err(errors)
}
}
20 changes: 20 additions & 0 deletions crates/chain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,26 @@ impl Chain {
}
}

/// Kebab-case slug used in URLs and per-network configs (pool-indexer API
/// routes, DB database names, etc). Stable — other services parse it.
pub fn as_str(&self) -> &'static str {
match &self {
Self::Mainnet => "mainnet",
Self::Gnosis => "gnosis",
Self::Sepolia => "sepolia",
Self::ArbitrumOne => "arbitrum-one",
Self::Base => "base",
Self::Hardhat => "hardhat",
Self::Bnb => "bnb",
Self::Avalanche => "avalanche",
Self::Optimism => "optimism",
Self::Polygon => "polygon",
Self::Linea => "linea",
Self::Plasma => "plasma",
Self::Ink => "ink",
}
}

/// The default amount in native tokens atoms to use for price estimation
pub fn default_amount_to_estimate_native_prices_with(&self) -> U256 {
match &self {
Expand Down
34 changes: 22 additions & 12 deletions crates/configs/src/deserialize_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ fn invalid_value_unable_to_parse_url<E: serde::de::Error>(err: ParseError) -> E

/// Deserializes an URL from *either* an environment variable — with the format
/// `%<ENV_VAR_NAME>` — or interpreting a String as a URL.
pub(crate) fn deserialize_url_from_env<'de, D>(deserializer: D) -> Result<Url, D::Error>
pub fn deserialize_url_from_env<'de, D>(deserializer: D) -> Result<Url, D::Error>
where
D: Deserializer<'de>,
{
Expand All @@ -51,9 +51,9 @@ where
Url::from_str(&raw_url).map_err(invalid_value_unable_to_parse_url)
}

/// Deserializes an optional String from *either* an environment variable —
/// Deserializes a String from *either* an environment variable —
/// with the format `%<ENV_VAR_NAME>` — or directly from the field value.
pub(crate) fn deserialize_string_from_env<'de, D>(deserializer: D) -> Result<String, D::Error>
pub fn deserialize_string_from_env<'de, D>(deserializer: D) -> Result<String, D::Error>
where
D: Deserializer<'de>,
{
Expand All @@ -69,10 +69,17 @@ where
}

/// Deserializes an optional String from *either* an environment variable —
/// with the format `%<ENV_VAR_NAME>` — or directly from the field value. A
/// missing env var is treated as `None` rather than an error, matching
/// [`deserialize_optional_url_from_env`].
pub(crate) fn deserialize_optional_string_from_env<'de, D>(
/// with the format `%<ENV_VAR_NAME>` — or directly from the field value.
/// Missing field or missing env var (when referenced) → `None`.
///
/// When the field text references an env var (starts with `%`) but the env
/// var is unset, we log a `warn!` and return `None`. The deserializer can't
/// fail here because some callers legitimately want the optional-with-fallback
/// semantics (e.g. an unauthenticated subgraph endpoint is fine), but the
/// warning surfaces the misconfiguration — without it, a typo in the env var
/// name would silently disable a feature that the operator thought was on,
/// and the symptom (e.g. 401s on a subgraph) would appear far from its cause.
pub fn deserialize_optional_string_from_env<'de, D>(
Comment thread
AryanGodara marked this conversation as resolved.
deserializer: D,
) -> Result<Option<String>, D::Error>
where
Expand All @@ -82,17 +89,20 @@ where
return Ok(None);
};
match value.strip_prefix(ENV_VAR_PREFIX) {
// In the case of optional variables, we assume a missing env var as empty
Some(env_var_name) => Ok(std::env::var(env_var_name).ok()),
Some(env_var_name) => Ok(std::env::var(env_var_name).ok().or_else(|| {
tracing::warn!(
%env_var_name,
"optional env var referenced in config but not set; field is None",
);
None
})),
None => Ok(Some(value)),
}
}

/// Deserializes an optional URL from *either* an environment variable — with
/// the format `%<ENV_VAR_NAME>` — or interpreting a String as a URL.
pub(crate) fn deserialize_optional_url_from_env<'de, D>(
deserializer: D,
) -> Result<Option<Url>, D::Error>
pub fn deserialize_optional_url_from_env<'de, D>(deserializer: D) -> Result<Option<Url>, D::Error>
where
D: Deserializer<'de>,
{
Expand Down
2 changes: 1 addition & 1 deletion crates/configs/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
pub mod autopilot;
pub mod banned_users;
pub mod database;
pub(crate) mod deserialize_env;
Comment thread
AryanGodara marked this conversation as resolved.
pub mod deserialize_env;
pub mod fee_factor;
pub mod gas_price_estimation;
pub mod http_client;
Expand Down
14 changes: 11 additions & 3 deletions crates/database/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ pub const TABLES: &[&str] = &[
"last_indexed_blocks",
"onchain_order_invalidations",
"onchain_placed_orders",
"pool_indexer_checkpoints",
"presignature_events",
"proposed_jit_orders",
"quotes",
Expand All @@ -71,6 +72,8 @@ pub const TABLES: &[&str] = &[
"solver_competitions",
"surplus_capturing_jit_order_owners",
"trades",
"uniswap_v3_pool_states",
"uniswap_v3_pools",
];

/// The names of potentially big volume tables we use in the db.
Expand All @@ -85,18 +88,23 @@ pub const LARGE_TABLES: &[&str] = &[
"order_quotes",
"proposed_solutions",
"proposed_trade_executions",
"uniswap_v3_ticks",
];

pub fn all_tables() -> impl Iterator<Item = &'static str> {
TABLES.iter().copied().chain(LARGE_TABLES.iter().copied())
}

/// Delete all data in the database. Only used by tests.
///
/// Truncates all tables in a single statement so Postgres accepts foreign-key
/// cycles between listed tables (e.g. `uniswap_v3_pool_states` →
/// `uniswap_v3_pools`). Individual per-table `TRUNCATE`s error out when any
/// other listed table references the one being truncated.
#[expect(non_snake_case)]
pub async fn clear_DANGER_(ex: &mut PgTransaction<'_>) -> sqlx::Result<()> {
for table in all_tables() {
ex.execute(format!("TRUNCATE {table};").as_str()).await?;
}
let tables = all_tables().collect::<Vec<_>>().join(", ");
ex.execute(format!("TRUNCATE {tables};").as_str()).await?;
Ok(())
}

Expand Down
44 changes: 40 additions & 4 deletions crates/driver/src/boundary/liquidity/uniswap/v3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@ use {
anyhow::Context,
eth_domain_types as eth,
event_indexing::{block_retriever::BlockRetrieving, maintenance::ServiceMaintenance},
liquidity_sources::uniswap_v3::pool_fetching::UniswapV3PoolFetcher,
liquidity_sources::uniswap_v3::{
V3PoolDataSource,
graph_api::UniV3SubgraphClient,
pool_fetching::UniswapV3PoolFetcher,
pool_indexer::PoolIndexerClient,
},
shared::{http_solver::model::TokenAmount, interaction::Interaction},
solver::{
liquidity::{
Expand Down Expand Up @@ -114,15 +119,14 @@ async fn init_liquidity(
config: &infra::liquidity::config::UniswapV3,
) -> anyhow::Result<impl LiquidityCollecting + use<>> {
let web3 = eth.web3().clone();
let source = build_pool_data_source(eth, config).await?;

let pool_fetcher = Arc::new(
UniswapV3PoolFetcher::new(
&config.graph_url,
source,
web3.clone(),
boundary::liquidity::http_client(),
block_retriever,
config.max_pools_to_initialize,
config.max_pools_per_tick_query,
)
.await
.context("failed to initialise UniswapV3 liquidity")?,
Expand All @@ -138,3 +142,35 @@ async fn init_liquidity(
pool_fetcher,
))
}

/// Picks the V3 pool data source based on the configured pool source variant.
async fn build_pool_data_source(
eth: &Ethereum,
config: &infra::liquidity::config::UniswapV3,
) -> anyhow::Result<Arc<dyn V3PoolDataSource>> {
use infra::liquidity::config::UniswapV3PoolSource;
let http = boundary::liquidity::http_client();

match &config.pool_source {
UniswapV3PoolSource::PoolIndexer(url) => {
tracing::info!(%url, "uniswap v3: using pool-indexer as data source");
Ok(Arc::new(PoolIndexerClient::new(
url.clone(),
eth.chain(),
http,
)))
}
UniswapV3PoolSource::Subgraph(subgraph) => {
tracing::info!(url = %subgraph.url, "uniswap v3: using subgraph as data source");
Ok(Arc::new(
UniV3SubgraphClient::from_subgraph_url(
&subgraph.url,
http,
subgraph.max_pools_per_tick_query,
)
.await
.context("failed to construct UniV3 subgraph client")?,
))
}
}
}
Loading
Loading