Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 39 additions & 6 deletions crates/hotblocks/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ pub fn build_api(app: App) -> Router {
Router::new()
.route("/", get(|| async { "Welcome to SQD hot block data service!" }))
.route("/datasets/{id}/stream", post(stream))
.route("/datasets/{id}/finalized-stream", post(finalized_stream))
.route("/datasets/{id}/head", get(get_head))
.route("/datasets/{id}/finalized-head", get(get_finalized_head))
.route("/datasets/{id}/retention", get(get_retention).post(set_retention))
Expand All @@ -66,25 +67,57 @@ async fn stream(
Path(dataset_id): Path<DatasetId>,
Json(query): Json<Query>
) -> Response
{
stream_internal(app, dataset_id, query, false).await
}


async fn finalized_stream(
Extension(app): Extension<AppRef>,
Path(dataset_id): Path<DatasetId>,
Json(query): Json<Query>
) -> Response
{
stream_internal(app, dataset_id, query, true).await
}


async fn stream_internal(
app: AppRef,
dataset_id: DatasetId,
query: Query,
finalized: bool
) -> Response
{
let dataset = get_dataset!(app, dataset_id);

if let Err(err) = query.validate() {
return text!(StatusCode::BAD_REQUEST, "{}", err)
}

match app.query_service.query(&dataset, query).await {
let query_result = if finalized {
app.query_service.query_finalized(&dataset, query).await
} else {
app.query_service.query(&dataset, query).await
};

match query_result {
Ok(stream) => {
let mut res = Response::builder()
.status(200)
.header("content-type", "text/plain")
.header("content-encoding", "gzip");

if let Some(head) = stream.finalized_head() {
let head_block = head.number.max(dataset.get_head_block_number().unwrap_or(0));
res = res.header("x-sqd-head-number", head_block);
res = res.header("x-sqd-finalized-head-number", head.number);
res = res.header("x-sqd-finalized-head-hash", head.hash.as_str());
if let Some(finalized_head) = stream.finalized_head() {
if finalized {
// For finalized stream, use the finalized head as the head
res = res.header("x-sqd-head-number", finalized_head.number);
} else {
let head_block = finalized_head.number.max(dataset.get_head_block_number().unwrap_or(0));
res = res.header("x-sqd-head-number", head_block);
}
res = res.header("x-sqd-finalized-head-number", finalized_head.number);
res = res.header("x-sqd-finalized-head-hash", finalized_head.hash.as_str());
} else if let Some(head_block) = dataset.get_head_block_number() {
res = res.header("x-sqd-head-number", head_block);
}
Expand Down
12 changes: 12 additions & 0 deletions crates/hotblocks/src/dataset_controller/dataset_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,18 @@ impl DatasetController {
recv.changed().await.unwrap()
}
}

pub async fn wait_for_finalized_block(&self, block_number: BlockNumber) -> BlockNumber {
let mut recv = self.finalized_head_receiver.clone();
loop {
if let Some(block) = recv.borrow_and_update().as_ref() {
if block.number >= block_number {
return block.number
}
}
recv.changed().await.unwrap()
}
}
}


Expand Down
3 changes: 2 additions & 1 deletion crates/hotblocks/src/query/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ impl QueryResponse {
db: DBRef,
dataset_id: DatasetId,
query: Query,
only_finalized: bool,
) -> anyhow::Result<Self>
{
let Some(slot) = executor.get_slot() else {
Expand All @@ -33,7 +34,7 @@ impl QueryResponse {
let start = Instant::now();

let mut runner = slot.run(move |slot| -> anyhow::Result<_> {
let mut runner = RunningQuery::new(db, dataset_id, &query).map(Box::new)?;
let mut runner = RunningQuery::new(db, dataset_id, &query, only_finalized).map(Box::new)?;
next_run(&mut runner, slot)?;
Ok(runner)
}).await?;
Expand Down
20 changes: 18 additions & 2 deletions crates/hotblocks/src/query/running.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ impl RunningQuery {
pub fn new(
db: DBRef,
dataset_id: DatasetId,
query: &Query
query: &Query,
only_finalized: bool,
) -> anyhow::Result<Self>
{
let snapshot = StaticSnapshot::new(db);
Expand Down Expand Up @@ -95,9 +96,24 @@ impl RunningQuery {
query.compile()
};

let last_block = if only_finalized {
// Cap the query's last_block to the finalized head
if let Some(finalized_head) = &finalized_head {
let capped_last = query
.last_block()
.map(|end| end.min(finalized_head.number))
.or(Some(finalized_head.number));
capped_last
Comment on lines +102 to +106
Copy link

Copilot AI Nov 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The variable capped_last is unnecessary and reduces clarity. Directly return the result of the expression on line 102-105.

Suggested change
let capped_last = query
.last_block()
.map(|end| end.min(finalized_head.number))
.or(Some(finalized_head.number));
capped_last
query
.last_block()
.map(|end| end.min(finalized_head.number))
.or(Some(finalized_head.number))

Copilot uses AI. Check for mistakes.
} else {
anyhow::bail!("Finalized head is not available yet");
Copy link

Copilot AI Nov 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error message 'Finalized head is not available yet' should be more specific about the context. Consider including that this is for a finalized query, e.g., 'Cannot execute finalized query: finalized head is not available yet'.

Suggested change
anyhow::bail!("Finalized head is not available yet");
anyhow::bail!("Cannot execute finalized query: finalized head is not available yet");

Copilot uses AI. Check for mistakes.
}
} else {
query.last_block()
};

Ok(Self {
plan,
last_block: query.last_block(),
last_block,
left_over: None,
next_chunk: Some(Ok(first_chunk)),
chunk_iterator,
Expand Down
49 changes: 40 additions & 9 deletions crates/hotblocks/src/query/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,28 @@ impl QueryService {
QueryServiceBuilder::new(db)
}

pub async fn query(&self, dataset: &DatasetController, query: Query) -> anyhow::Result<QueryResponse> {
pub async fn query(
&self,
dataset: &DatasetController,
query: Query,
) -> anyhow::Result<QueryResponse> {
self.query_internal(dataset, query, false).await
}

pub async fn query_finalized(
&self,
dataset: &DatasetController,
query: Query,
) -> anyhow::Result<QueryResponse> {
self.query_internal(dataset, query, true).await
}

async fn query_internal(
&self,
dataset: &DatasetController,
query: Query,
finalized: bool,
) -> anyhow::Result<QueryResponse> {
ensure!(
dataset.dataset_kind() == DatasetKind::from_query(&query),
QueryKindMismatch {
Expand All @@ -86,7 +107,13 @@ impl QueryService {
}
);

let should_wait = match dataset.get_head() {
let target_head = if finalized {
dataset.get_finalized_head()
} else {
dataset.get_head()
};

let should_wait = match target_head {
Some(head) if head.number >= query.first_block() => false,
Some(head) if head.number + 1 == query.first_block() => {
if let Some(parent_hash) = query.parent_block_hash() {
Expand All @@ -107,21 +134,25 @@ impl QueryService {
let Some(_wait_slot) = self.wait_slots.get() else {
bail!(Busy)
};
tokio::time::timeout(
Duration::from_secs(5),
dataset.wait_for_block(query.first_block())
).await.map_err(|_| {
QueryIsAboveTheHead {
finalized_head: None
tokio::time::timeout(Duration::from_secs(5), async {
if finalized {
dataset.wait_for_finalized_block(query.first_block()).await
} else {
dataset.wait_for_block(query.first_block()).await
}
})
.await
.map_err(|_| QueryIsAboveTheHead {
finalized_head: None
})?;
}

QueryResponse::new(
self.executor.clone(),
self.db.clone(),
dataset.dataset_id(),
query
query,
finalized,
).await
}
}
Expand Down