Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 34 additions & 28 deletions server/index-node/src/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use graph::components::versions::VERSIONS;
use graph::data::graphql::{object, IntoValue, ObjectOrInterface, ValueMap};
use graph::data::subgraph::{status, DeploymentFeatures};
use graph::data::value::Object;
use graph::futures03::TryFutureExt;
use graph::futures03::{future, TryFutureExt};
use graph::prelude::*;
use graph_graphql::prelude::{a, ExecutionContext, Resolver};

Expand Down Expand Up @@ -352,7 +352,10 @@ where
))
}

fn resolve_proof_of_indexing(&self, field: &a::Field) -> Result<r::Value, QueryExecutionError> {
async fn resolve_proof_of_indexing(
&self,
field: &a::Field,
) -> Result<r::Value, QueryExecutionError> {
let deployment_id = field
.get_required::<DeploymentHash>("subgraph")
.expect("Valid subgraphId required");
Expand Down Expand Up @@ -381,7 +384,7 @@ where
let poi_fut = self
.store
.get_proof_of_indexing(&deployment_id, &indexer, block.clone());
let poi = match graph::futures03::executor::block_on(poi_fut) {
let poi = match poi_fut.await {
Ok(Some(poi)) => r::Value::String(format!("0x{}", hex::encode(poi))),
Ok(None) => r::Value::Null,
Err(e) => {
Expand Down Expand Up @@ -414,28 +417,29 @@ where
return Err(QueryExecutionError::TooExpensive);
}

let mut public_poi_results = vec![];
for request in requests {
let (poi_result, request) = match self
.store
.get_public_proof_of_indexing(&request.deployment, request.block_number, self)
.await
{
Ok(Some(poi)) => (Some(poi), request),
Ok(None) => (None, request),
Err(e) => {
error!(
self.logger,
"Failed to query public proof of indexing";
"subgraph" => &request.deployment,
"block" => format!("{}", request.block_number),
"error" => format!("{:?}", e)
);
(None, request)
}
};
// Process all POI requests in parallel for better throughput
let poi_futures: Vec<_> = requests
.into_iter()
.map(|request| async move {
let poi_result = match self
.store
.get_public_proof_of_indexing(&request.deployment, request.block_number, self)
.await
{
Ok(Some(poi)) => Some(poi),
Ok(None) => None,
Err(e) => {
error!(
self.logger,
"Failed to query public proof of indexing";
"subgraph" => &request.deployment,
"block" => format!("{}", request.block_number),
"error" => format!("{:?}", e)
);
None
}
};

public_poi_results.push(
PublicProofOfIndexingResult {
deployment: request.deployment,
block: match poi_result {
Expand All @@ -444,9 +448,11 @@ where
},
proof_of_indexing: poi_result.map(|(_, poi)| poi),
}
.into_value(),
)
}
.into_value()
})
.collect();

let public_poi_results = future::join_all(poi_futures).await;

Ok(r::Value::List(public_poi_results))
}
Expand Down Expand Up @@ -791,7 +797,7 @@ where
field.name.as_str(),
scalar_type.name.as_str(),
) {
("Query", "proofOfIndexing", "Bytes") => self.resolve_proof_of_indexing(field),
("Query", "proofOfIndexing", "Bytes") => self.resolve_proof_of_indexing(field).await,
("Query", "blockData", "JSONObject") => self.resolve_block_data(field).await,
("Query", "blockHashFromNumber", "Bytes") => {
self.resolve_block_hash_from_number(field).await
Expand Down