Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 33 additions & 38 deletions src/tasks/cache/bundle.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
//! Bundler service responsible for fetching bundles and sending them to the simulator.
use crate::config::BuilderConfig;
use init4_bin_base::perms::SharedToken;
use reqwest::{Client, Url};
use signet_tx_cache::types::{TxCacheBundle, TxCacheBundlesResponse};
use init4_bin_base::perms::tx_cache::BuilderTxCache;
use signet_tx_cache::{TxCacheError, types::TxCacheBundle};
use tokio::{
sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
task::JoinHandle,
time::{self, Duration},
};
use tracing::{Instrument, debug, error, trace, trace_span};
use tracing::{Instrument, error, trace, trace_span};

/// Poll interval for the bundle poller in milliseconds.
const POLL_INTERVAL_MS: u64 = 1000;
Expand All @@ -18,10 +17,10 @@ const POLL_INTERVAL_MS: u64 = 1000;
pub struct BundlePoller {
/// The builder configuration values.
config: &'static BuilderConfig,
/// Authentication module that periodically fetches and stores auth tokens.
token: SharedToken,
/// Holds a Reqwest client
client: Client,

/// Client for the tx cache.
tx_cache: BuilderTxCache,

/// Defines the interval at which the bundler polls the tx-pool for bundles.
poll_interval_ms: u64,
}
Expand All @@ -42,34 +41,37 @@ impl BundlePoller {
/// Creates a new BundlePoller from the provided builder config and with the specified poll interval in ms.
pub fn new_with_poll_interval_ms(poll_interval_ms: u64) -> Self {
let config = crate::config();
let token = config.oauth_token();
Self { config, token, client: Client::new(), poll_interval_ms }
}

/// Fetches bundles from the transaction cache and returns them.
pub async fn check_bundle_cache(&mut self) -> eyre::Result<Vec<TxCacheBundle>> {
let bundle_url: Url = self.config.tx_pool_url.join("bundles")?;
let token =
self.token.secret().await.map_err(|e| eyre::eyre!("Failed to read token: {e}"))?;

self.client
.get(bundle_url)
.bearer_auth(token)
.send()
.await?
.error_for_status()?
.json()
.await
.map(|resp: TxCacheBundlesResponse| resp.bundles)
.map_err(Into::into)
let cache = signet_tx_cache::TxCache::new(config.tx_pool_url.clone());
let tx_cache = BuilderTxCache::new(cache, config.oauth_token());
Self { config, tx_cache, poll_interval_ms }
}

/// Returns the poll duration as a [`Duration`].
const fn poll_duration(&self) -> Duration {
Duration::from_millis(self.poll_interval_ms)
}

async fn task_future(mut self, outbound: UnboundedSender<TxCacheBundle>) {
/// Checks the bundle cache for new bundles.
pub async fn check_bundle_cache(&self) -> Result<Vec<TxCacheBundle>, TxCacheError> {
let res = self.tx_cache.get_bundles().await;

match res {
Ok(bundles) => {
trace!(count = ?bundles.len(), "found bundles");
Ok(bundles)
}
Err(TxCacheError::NotOurSlot) => {
trace!("Not our slot to fetch bundles");
Err(TxCacheError::NotOurSlot)
}
Err(err) => {
error!(?err, "Failed to fetch bundles from tx-cache");
Err(err)
}
}
}

async fn task_future(self, outbound: UnboundedSender<TxCacheBundle>) {
loop {
let span = trace_span!("BundlePoller::loop", url = %self.config.tx_pool_url);

Expand All @@ -85,17 +87,10 @@ impl BundlePoller {
// exit the span after the check.
drop(_guard);

if let Ok(bundles) = self
.check_bundle_cache()
.instrument(span.clone())
.await
.inspect_err(|err| debug!(%err, "Error fetching bundles"))
{
let _guard = span.entered();
trace!(count = ?bundles.len(), "found bundles");
if let Ok(bundles) = self.check_bundle_cache().instrument(span.clone()).await {
for bundle in bundles.into_iter() {
if let Err(err) = outbound.send(bundle) {
error!(err = ?err, "Failed to send bundle - channel is dropped");
span_debug!(span, ?err, "Failed to send bundle - channel is dropped");
break;
}
}
Expand Down
12 changes: 6 additions & 6 deletions src/tasks/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,10 +287,10 @@ impl EnvTask {
.expect("valid timestamp");
let sim_slot = self.config.slot_calculator.current_slot().expect("chain has started");

// Create a `SimEnv` span
// Create a `BlockConstruction` span
let span = info_span!(
parent: None,
"SimEnv",
"BlockConstruction",
confirmed.host.number = host_block_number,
confirmed.host.hash = tracing::field::Empty,
confirmed.ru.number = rollup_block_number,
Expand Down Expand Up @@ -321,15 +321,15 @@ impl EnvTask {
Err(QuinceyError::NotOurSlot) => {
span_debug!(
span,
"not our slot according to quincey - skipping block submission"
"not our slot according to quincey - skipping block construction"
);
continue;
}
Err(err) => {
span_error!(
span,
%err,
"error during quincey preflight check - skipping block submission"
"error during quincey preflight check - skipping block construction"
);
continue;
}
Expand All @@ -339,13 +339,13 @@ impl EnvTask {
let host_block_opt = res_unwrap_or_continue!(
host_block_res,
span,
error!("error fetching previous host block - skipping block submission")
error!("error fetching previous host block - skipping block construction")
);

let host_header = opt_unwrap_or_continue!(
host_block_opt,
span,
warn!("previous host block not found - skipping block submission")
warn!("previous host block not found - skipping block construction")
)
.header
.inner;
Expand Down
2 changes: 1 addition & 1 deletion tests/bundle_poller_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ async fn test_bundle_poller_roundtrip() -> Result<()> {
setup_logging();
setup_test_config();

let mut bundle_poller = builder::tasks::cache::BundlePoller::new();
let bundle_poller = builder::tasks::cache::BundlePoller::new();

let _ = bundle_poller.check_bundle_cache().await?;

Expand Down