Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ members = [
"crates/data-source",
"crates/dataset",
"crates/hotblocks",
"crates/hotblocks-retain",
"crates/polars",
"crates/primitives",
"crates/query",
Expand Down
10 changes: 10 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,16 @@ COPY --from=hotblocks-builder /app/target/release/sqd-hotblocks .
ENTRYPOINT ["/app/sqd-hotblocks"]


# Build the sqd-hotblocks-retain binary in a dedicated builder stage.
FROM builder AS hotblocks-retain-builder
RUN cargo build -p sqd-hotblocks-retain --release


# Runtime image for the retain service: only the compiled binary is copied in.
FROM rust AS hotblocks-retain
WORKDIR /app
COPY --from=hotblocks-retain-builder /app/target/release/sqd-hotblocks-retain .
ENTRYPOINT ["/app/sqd-hotblocks-retain"]


FROM builder AS archive-builder
RUN cargo build -p sqd-archive --release

Expand Down
17 changes: 17 additions & 0 deletions crates/hotblocks-retain/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
[package]
name = "sqd-hotblocks-retain"
version = "0.1.0"
edition = "2024"

[dependencies]
anyhow = { workspace = true }
clap = { workspace = true, features = ["derive", "env"] }
reqwest = { workspace = true, features = ["gzip", "json", "stream"] }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
serde_yaml = { workspace = true }
sqd-primitives = { path = "../primitives" }
tokio = { workspace = true, features = ["full"] }
tracing = { workspace = true }
tracing-subscriber = { workspace = true, features = ["env-filter", "json"] }
url = { workspace = true }
32 changes: 32 additions & 0 deletions crates/hotblocks-retain/src/cli.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use std::path::PathBuf;

use clap::Parser;
use url::Url;

/// Command-line arguments for the Hotblocks retain service.
#[derive(Parser, Debug)]
#[command(version, about = "Hotblocks retain service", long_about = None)]
pub struct Cli {
    /// URL of the Hotblocks service to send dataset information to
    #[arg(long)]
    pub hotblocks_url: Url,

    /// URL of the status endpoint to poll for dataset updates
    #[arg(long)]
    pub status_url: Url,

    /// Path to the YAML config file listing datasets to track
    #[arg(long)]
    pub datasets_config: PathBuf,

    /// URL of the datasets YAML file listing available network datasets
    #[arg(long)]
    pub datasets_url: Url,

    /// Interval in seconds between refreshing the datasets list
    // The range(1..) parser rejects 0, which would make the refresh loop spin.
    #[arg(long, default_value = "3600", value_parser = clap::value_parser!(u64).range(1..))]
    pub datasets_update_interval_secs: u64,

    /// Additional delay in seconds after effective_from before applying retention
    #[arg(long, default_value = "0")]
    pub retain_delay_secs: u64,
}
40 changes: 40 additions & 0 deletions crates/hotblocks-retain/src/datasets.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
use crate::types::DatasetId;
use serde::Deserialize;
use std::collections::HashMap;
use url::Url;

/// One entry in the downloaded datasets manifest: a human-readable name and
/// the dataset's network identifier.
#[derive(Deserialize)]
struct Dataset {
    name: String,
    id: DatasetId,
}

/// Top-level shape of the datasets manifest YAML file; the list lives under
/// the `sqd-network-datasets` key.
#[derive(Deserialize)]
struct DatasetsFile {
    #[serde(rename = "sqd-network-datasets")]
    sqd_network_datasets: Vec<Dataset>,
}

/// Downloads the datasets manifest from `url` and returns a map from dataset
/// name to dataset ID.
///
/// # Errors
/// Fails if the request errors, the server returns a non-success status, or
/// the body is not valid YAML of the expected shape.
pub async fn get_name_to_id(
    client: &reqwest::Client,
    url: &Url,
) -> anyhow::Result<HashMap<String, DatasetId>> {
    let bytes = client
        .get(url.as_str())
        .send()
        .await?
        .error_for_status()?
        .bytes()
        .await?;

    let file: DatasetsFile = serde_yaml::from_slice(&bytes)?;

    // Keyed by name: callers resolve configured dataset names to network ids.
    let map = file
        .sqd_network_datasets
        .into_iter()
        .map(|d| (d.name, d.id))
        .collect();

    Ok(map)
}
21 changes: 21 additions & 0 deletions crates/hotblocks-retain/src/hotblocks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use reqwest::Client;
use sqd_primitives::BlockNumber;
use url::Url;

/// Tells the Hotblocks service to retain `dataset` starting from `from_block`
/// by POSTing a `{"FromBlock": {"number": ...}}` body to its
/// `/datasets/{dataset}/retention` endpoint.
///
/// # Errors
/// Fails on URL-join errors, request/connection errors, or a non-success
/// HTTP status.
pub async fn set_retention(
    client: &Client,
    base_url: &Url,
    dataset: &str,
    from_block: BlockNumber,
) -> anyhow::Result<()> {
    // NOTE(review): the leading '/' makes `Url::join` discard any path segment
    // already present on `base_url` — confirm the service is always mounted at
    // the URL root.
    let retention_url = base_url.join(&format!("/datasets/{dataset}/retention"))?;

    client
        .post(retention_url)
        .json(&serde_json::json!({"FromBlock": {"number": from_block}}))
        .send()
        .await?
        .error_for_status()?;

    Ok(())
}
216 changes: 216 additions & 0 deletions crates/hotblocks-retain/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
mod cli;
mod datasets;
mod hotblocks;
mod status;
mod types;

use anyhow::Context;
use clap::Parser;
use cli::Cli;
use std::collections::HashMap;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::time::Instant;
use types::{DatasetId, DatasetsConfig};
use url::Url;

/// Entry point: parse CLI arguments, load the datasets config from disk,
/// initialize tracing, and run the retain loop on a multi-threaded Tokio
/// runtime until it errors.
fn main() -> anyhow::Result<()> {
    let args = Cli::parse();

    // Read and parse the YAML config up front so a bad config fails fast,
    // with the file path attached to the error.
    let datasets: DatasetsConfig = {
        let contents = std::fs::read_to_string(&args.datasets_config)
            .with_context(|| format!("failed to read {}", args.datasets_config.display()))?;
        serde_yaml::from_str(&contents)
            .with_context(|| format!("failed to parse {}", args.datasets_config.display()))?
    };

    init_tracing();

    // The service loop only returns on error, which `?` propagates to exit.
    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()?
        .block_on(
            HotblocksRetain::new(
                args.hotblocks_url,
                args.status_url,
                args.datasets_url,
                datasets,
                Duration::from_secs(args.datasets_update_interval_secs),
                Duration::from_secs(args.retain_delay_secs),
            )
            .run(),
        )?;

    Ok(())
}

/// Long-running service state: HTTP client, endpoint configuration, and the
/// bookkeeping needed to decide when to refresh the datasets manifest and
/// when a new `effective_from` must be applied.
struct HotblocksRetain {
    client: reqwest::Client,
    /// Base URL of the Hotblocks service that receives retention updates.
    hotblocks_url: Url,
    /// Status endpoint polled for `effective_from` and dataset heights.
    status_url: Url,
    /// Location of the datasets manifest (name -> id mapping).
    datasets_url: Url,
    /// Datasets to track, as loaded from the local YAML config.
    datasets: DatasetsConfig,
    /// Minimum time between manifest refreshes.
    datasets_update_interval: Duration,
    /// Extra delay applied after `effective_from` before setting retention.
    retain_delay: Duration,
    /// Cached manifest mapping dataset name to dataset id.
    name_to_id: HashMap<String, DatasetId>,
    /// When the manifest was last refreshed successfully.
    last_datasets_refresh: Instant,
    /// The `effective_from` value that was last applied successfully.
    last_effective_from: Option<u64>,
}

impl HotblocksRetain {
    /// Builds a new service instance. `last_datasets_refresh` is back-dated by
    /// one full update interval so the first loop iteration refreshes the
    /// manifest immediately.
    ///
    /// NOTE(review): `Instant::now() - datasets_update_interval` panics if the
    /// subtraction underflows the earliest representable instant (possible
    /// shortly after boot with a large interval) — consider `checked_sub` or
    /// an `Option<Instant>` field; confirm this is acceptable.
    fn new(
        hotblocks_url: Url,
        status_url: Url,
        datasets_url: Url,
        datasets: DatasetsConfig,
        datasets_update_interval: Duration,
        retain_delay: Duration,
    ) -> Self {
        Self {
            client: reqwest::Client::new(),
            hotblocks_url,
            status_url,
            datasets_url,
            datasets,
            datasets_update_interval,
            retain_delay,
            name_to_id: HashMap::new(),
            last_datasets_refresh: Instant::now() - datasets_update_interval,
            last_effective_from: None,
        }
    }

    /// Main control loop: poll the status endpoint, wait until the effective
    /// time (plus the configured delay), then push retention updates to the
    /// Hotblocks service. Loops forever under normal operation.
    async fn run(&mut self) -> anyhow::Result<()> {
        loop {
            // Refresh the name -> id manifest if the update interval elapsed.
            self.maybe_refresh_datasets().await;

            let status = match status::get_status(&self.client, self.status_url.as_str()).await {
                Ok(status) => status,
                Err(err) => {
                    tracing::warn!(error = ?err, "failed to fetch status");
                    tokio::time::sleep(Duration::from_secs(1)).await;
                    continue;
                }
            };

            // Nothing new to apply — back off before re-polling.
            if self.last_effective_from == Some(status.effective_from) {
                tracing::info!("effective_from unchanged, re-checking in 5 minutes");
                tokio::time::sleep(Duration::from_secs(300)).await;
                continue;
            }

            // `effective_from` is compared against wall-clock seconds here, so
            // it is treated as a UNIX timestamp in seconds.
            let now = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_secs();

            let apply_at = status.effective_from + self.retain_delay.as_secs();
            if now < apply_at {
                let wait_secs = apply_at - now;
                tracing::info!(
                    wait_secs,
                    effective_from = status.effective_from,
                    retain_delay_secs = self.retain_delay.as_secs(),
                    "waiting for effective time + retain delay"
                );
                // NOTE(review): the status fetched above may be stale by the
                // time this sleep ends — confirm whether a re-fetch after
                // waking is needed before applying retention.
                tokio::time::sleep(Duration::from_secs(wait_secs)).await;
            }

            if self.apply_retention(&status).await {
                self.last_effective_from = Some(status.effective_from);
            } else {
                // At least one update failed — retry the whole cycle shortly.
                tokio::time::sleep(Duration::from_secs(30)).await;
            }
        }
    }

    /// Re-downloads the datasets manifest if the update interval has elapsed.
    /// Failures are logged and retried on the next loop iteration; the stale
    /// mapping stays in use in the meantime.
    async fn maybe_refresh_datasets(&mut self) {
        if self.last_datasets_refresh.elapsed() < self.datasets_update_interval {
            return;
        }

        match datasets::get_name_to_id(&self.client, &self.datasets_url).await {
            Ok(map) => {
                tracing::info!("refreshed datasets manifest");
                self.name_to_id = map;
                self.last_datasets_refresh = Instant::now();
            }
            Err(err) => {
                tracing::warn!(error = ?err, "failed to refresh datasets manifest");
            }
        }
    }

    /// Applies the retention policy for every configured dataset, using each
    /// dataset's reported height from `status` as the new from-block.
    /// Returns `true` only if every attempted update succeeded; datasets that
    /// are skipped or have no reported height do not count as failures.
    async fn apply_retention(&self, status: &status::SchedulingStatus) -> bool {
        // Index reported heights by dataset id for O(1) lookups below.
        let statuses = status
            .datasets
            .iter()
            .map(|dataset| (dataset.id.as_str(), dataset.height))
            .collect::<HashMap<_, _>>();

        let mut all_success = true;

        for (dataset, props) in &self.datasets {
            // Prefer an explicitly configured id; otherwise resolve the
            // configured name through the downloaded manifest.
            let dataset_id = if let Some(id) = props.as_ref().and_then(|p| p.id.as_deref()) {
                id
            } else {
                match self.name_to_id.get(dataset) {
                    Some(id) => id.as_str(),
                    None => {
                        tracing::warn!(dataset, "dataset not found in manifest, skipping");
                        continue;
                    }
                }
            };

            match statuses.get(dataset_id) {
                // A height was reported — push the retention update.
                Some(Some(height)) => {
                    match hotblocks::set_retention(
                        &self.client,
                        &self.hotblocks_url,
                        dataset,
                        *height,
                    )
                    .await
                    {
                        Ok(()) => {
                            tracing::info!(dataset, height, "applied retention policy");
                        }
                        Err(err) => {
                            all_success = false;
                            tracing::warn!(dataset, height, error = ?err, "failed to apply retention");
                        }
                    }
                }
                Some(None) => {
                    tracing::info!(dataset, "dataset has no reported height yet");
                }
                None => {
                    tracing::warn!(dataset, "dataset not found in status");
                }
            }
        }

        all_success
    }
}

/// Initializes the global tracing subscriber: compact human-readable output
/// when stdout is a terminal, JSON lines otherwise. The filter is read from
/// the standard `RUST_LOG` environment variable, defaulting to `info`.
fn init_tracing() {
    use std::io::IsTerminal;

    let env_filter = tracing_subscriber::EnvFilter::builder().parse_lossy(
        // `unwrap_or_else` avoids allocating the default string when the
        // environment variable is set (clippy: or_fun_call).
        std::env::var(tracing_subscriber::EnvFilter::DEFAULT_ENV)
            .unwrap_or_else(|_| "info".to_string()),
    );

    if std::io::stdout().is_terminal() {
        // Interactive session: compact, human-oriented output.
        tracing_subscriber::fmt()
            .with_env_filter(env_filter)
            .compact()
            .init();
    } else {
        // Non-interactive (e.g. container logs): structured JSON lines.
        tracing_subscriber::fmt()
            .with_env_filter(env_filter)
            .json()
            .with_current_span(false)
            .init();
    }
}
Loading
Loading