Commit 52c1aee

feat: forester: grpc processing for v2 trees (#2024)
* feat(forester): gRPC-based event-driven processing for V2 trees
* chore: add protobuf-compiler to setup-and-build
* chore: update PHOTON_COMMIT version in versions.sh
* feat: add grpc_port config to cli
* cleanup
* cleanup
* cleanup
* wait for indexer in rpc-interop.test.ts
* wait for indexer in rpc-interop.test.ts
* bump photon version
* cleanup
* cleanup
1 parent 897f8f7 commit 52c1aee

41 files changed, +1695 -141 lines

Cargo.lock

Lines changed: 372 additions & 0 deletions
(generated lockfile; diff not rendered)

cli/src/commands/test-validator/index.ts

Lines changed: 7 additions & 0 deletions
@@ -75,6 +75,12 @@ class SetupCommand extends Command {
       default: 3001,
       exclusive: ["skip-prover"],
     }),
+    "grpc-port": Flags.integer({
+      description: "Enable Photon indexer gRPC on this port.",
+      required: false,
+      default: 50051,
+      exclusive: ["skip-indexer"],
+    }),
     "limit-ledger-size": Flags.integer({
       description: "Keep this amount of shreds in root slots.",
       required: false,
@@ -194,6 +200,7 @@ class SetupCommand extends Command {
       rpcPort: flags["rpc-port"],
       gossipHost: flags["gossip-host"],
       indexerPort: flags["indexer-port"],
+      grpcPort: flags["grpc-port"],
       proverPort: flags["prover-port"],
       prover: !flags["skip-prover"],
       skipSystemAccounts: flags["skip-system-accounts"],

cli/src/utils/initTestEnv.ts

Lines changed: 3 additions & 0 deletions
@@ -87,6 +87,7 @@ export async function initTestEnv({
   prover = true,
   rpcPort = 8899,
   indexerPort = 8784,
+  grpcPort = 50051,
   proverPort = 3001,
   gossipHost = "127.0.0.1",
   checkPhotonVersion = true,
@@ -101,6 +102,7 @@ export async function initTestEnv({
   prover: boolean;
   rpcPort?: number;
   indexerPort?: number;
+  grpcPort?: number;
   proverPort?: number;
   gossipHost?: string;
   checkPhotonVersion?: boolean;
@@ -131,6 +133,7 @@ export async function initTestEnv({
     indexerPort,
     checkPhotonVersion,
     photonDatabaseUrl,
+    grpcPort,
   );
 }

cli/src/utils/processPhotonIndexer.ts

Lines changed: 3 additions & 0 deletions
@@ -41,6 +41,7 @@ export async function startIndexer(
   indexerPort: number,
   checkPhotonVersion: boolean = true,
   photonDatabaseUrl?: string,
+  grpcPort: number = 50051,
 ) {
   await killIndexer();
   const resolvedOrNull = which.sync("photon", { nothrow: true });
@@ -57,6 +58,8 @@ export async function startIndexer(
     indexerPort.toString(),
     "--rpc-url",
     rpcUrl,
+    "--grpc-port",
+    grpcPort.toString(),
   ];
   if (photonDatabaseUrl) {
     args.push("--db-url", photonDatabaseUrl);

forester-utils/Cargo.toml

Lines changed: 4 additions & 0 deletions
@@ -47,3 +47,7 @@ num-traits = { workspace = true }
 bb8 = { workspace = true }
 async-trait = { workspace = true }
 governor = { workspace = true }
+
+[dev-dependencies]
+tokio-postgres = "0.7"
+bs58 = { workspace = true }

forester-utils/src/instructions/address_batch_update.rs

Lines changed: 80 additions & 55 deletions
@@ -2,10 +2,7 @@ use std::{pin::Pin, sync::Arc, time::Duration};
 
 use account_compression::processor::initialize_address_merkle_tree::Pubkey;
 use async_stream::stream;
-use futures::{
-    stream::{FuturesOrdered, Stream},
-    StreamExt,
-};
+use futures::stream::Stream;
 use light_batched_merkle_tree::{
     constants::DEFAULT_BATCH_ADDRESS_TREE_HEIGHT, merkle_tree::InstructionDataAddressAppendInputs,
 };
@@ -23,8 +20,8 @@ use tracing::{debug, error, info, warn};
 
 use crate::{error::ForesterUtilsError, rpc_pool::SolanaRpcPool, utils::wait_for_indexer};
 
-const MAX_PHOTON_ELEMENTS_PER_CALL: usize = 500;
-const MAX_PROOFS_PER_TX: usize = 3;
+const MAX_PHOTON_ELEMENTS_PER_CALL: usize = 1000;
+const MAX_PROOFS_PER_TX: usize = 4;
 
 pub struct AddressUpdateConfig<R: Rpc> {
     pub rpc_pool: Arc<SolanaRpcPool<R>>,
@@ -54,13 +51,14 @@ async fn stream_instruction_data<'a, R: Rpc>(
     let max_zkp_batches_per_call = calculate_max_zkp_batches_per_call(zkp_batch_size);
     let total_chunks = leaves_hash_chains.len().div_ceil(max_zkp_batches_per_call);
 
+    let mut next_queue_index: Option<u64> = None;
+
     for chunk_idx in 0..total_chunks {
         let chunk_start = chunk_idx * max_zkp_batches_per_call;
         let chunk_end = std::cmp::min(chunk_start + max_zkp_batches_per_call, leaves_hash_chains.len());
         let chunk_hash_chains = &leaves_hash_chains[chunk_start..chunk_end];
 
         let elements_for_chunk = chunk_hash_chains.len() * zkp_batch_size as usize;
-        let processed_items_offset = chunk_start * zkp_batch_size as usize;
 
         {
             if chunk_idx > 0 {
@@ -76,11 +74,15 @@ async fn stream_instruction_data<'a, R: Rpc>(
         let indexer_update_info = {
             let mut connection = rpc_pool.get_connection().await?;
             let indexer = connection.indexer_mut()?;
+            debug!(
+                "Requesting {} addresses from Photon for chunk {} with start_queue_index={:?}",
+                elements_for_chunk, chunk_idx, next_queue_index
+            );
             match indexer
                 .get_address_queue_with_proofs(
                     &merkle_tree_pubkey,
                     elements_for_chunk as u16,
-                    Some(processed_items_offset as u64),
+                    next_queue_index,
                     None,
                 )
                 .await {
@@ -92,6 +94,26 @@ async fn stream_instruction_data<'a, R: Rpc>(
             }
         };
 
+        // Log Photon response details
+        debug!(
+            "Photon response for chunk {}: received {} addresses, batch_start_index={}, first_queue_index={:?}, last_queue_index={:?}",
+            chunk_idx,
+            indexer_update_info.value.addresses.len(),
+            indexer_update_info.value.batch_start_index,
+            indexer_update_info.value.addresses.first().map(|a| a.queue_index),
+            indexer_update_info.value.addresses.last().map(|a| a.queue_index)
+        );
+
+        // Update next_queue_index for the next chunk based on the last address returned
+        if let Some(last_address) = indexer_update_info.value.addresses.last() {
+            next_queue_index = Some(last_address.queue_index + 1);
+            debug!(
+                "Setting next_queue_index={} for chunk {}",
+                next_queue_index.unwrap(),
+                chunk_idx + 1
+            );
+        }
+
         if chunk_idx == 0 {
             if let Some(first_proof) = indexer_update_info.value.non_inclusion_proofs.first() {
                 if first_proof.root != current_root {
@@ -121,56 +143,23 @@ async fn stream_instruction_data<'a, R: Rpc>(
         };
         current_root = new_current_root;
 
-        info!("Generating {} ZK proofs with hybrid approach for chunk {}", all_inputs.len(), chunk_idx + 1);
-
-        let mut futures_ordered = FuturesOrdered::new();
-        let mut proof_buffer = Vec::new();
-        let mut pending_count = 0;
+        info!("Generating {} zk proofs for batch_address chunk {} (parallel)", all_inputs.len(), chunk_idx + 1);
 
-        for (i, inputs) in all_inputs.into_iter().enumerate() {
+        // Generate ALL proofs in parallel using join_all
+        let proof_futures: Vec<_> = all_inputs.into_iter().enumerate().map(|(i, inputs)| {
             let client = Arc::clone(&proof_client);
-            futures_ordered.push_back(async move {
+            async move {
                 let result = client.generate_batch_address_append_proof(inputs).await;
                 (i, result)
-            });
-            pending_count += 1;
-
-            if pending_count >= MAX_PROOFS_PER_TX {
-                for _ in 0..MAX_PROOFS_PER_TX.min(pending_count) {
-                    if let Some((idx, result)) = futures_ordered.next().await {
-                        match result {
-                            Ok((compressed_proof, new_root)) => {
-                                let instruction_data = InstructionDataAddressAppendInputs {
-                                    new_root,
-                                    compressed_proof: CompressedProof {
-                                        a: compressed_proof.a,
-                                        b: compressed_proof.b,
-                                        c: compressed_proof.c,
-                                    },
-                                };
-                                proof_buffer.push(instruction_data);
-                            },
-                            Err(e) => {
-                                error!("Address proof failed to generate at index {}: {:?}", idx, e);
-                                yield Err(ForesterUtilsError::Prover(format!(
-                                    "Address proof generation failed at batch {} in chunk {}: {}",
-                                    idx, chunk_idx, e
-                                )));
-                                return;
-                            }
-                        }
-                        pending_count -= 1;
-                    }
-                }
-
-                if !proof_buffer.is_empty() {
-                    yield Ok(proof_buffer.clone());
-                    proof_buffer.clear();
-                }
             }
-        }
+        }).collect();
+
+        // Wait for all proofs to complete in parallel
+        let proof_results = futures::future::join_all(proof_futures).await;
 
-        while let Some((idx, result)) = futures_ordered.next().await {
+        // Process results and batch them into groups of MAX_PROOFS_PER_TX
+        let mut proof_buffer = Vec::new();
+        for (idx, result) in proof_results {
             match result {
                 Ok((compressed_proof, new_root)) => {
                     let instruction_data = InstructionDataAddressAppendInputs {
@@ -183,6 +172,7 @@ async fn stream_instruction_data<'a, R: Rpc>(
                     };
                     proof_buffer.push(instruction_data);
 
+                    // Yield when we have MAX_PROOFS_PER_TX proofs ready
                     if proof_buffer.len() >= MAX_PROOFS_PER_TX {
                         yield Ok(proof_buffer.clone());
                         proof_buffer.clear();
@@ -199,6 +189,7 @@ async fn stream_instruction_data<'a, R: Rpc>(
             }
         }
 
+        // Yield any remaining proofs
        if !proof_buffer.is_empty() {
            yield Ok(proof_buffer);
        }
@@ -249,27 +240,60 @@ fn get_all_circuit_inputs_for_chunk(
     for (batch_idx, leaves_hash_chain) in chunk_hash_chains.iter().enumerate() {
         let start_idx = batch_idx * batch_size as usize;
         let end_idx = start_idx + batch_size as usize;
+
+        let addresses_len = indexer_update_info.value.addresses.len();
+        if start_idx >= addresses_len {
+            return Err(ForesterUtilsError::Indexer(format!(
+                "Insufficient addresses: batch {} requires start_idx {} but only {} addresses available",
+                batch_idx, start_idx, addresses_len
+            )));
+        }
+        let safe_end_idx = std::cmp::min(end_idx, addresses_len);
+        if safe_end_idx - start_idx != batch_size as usize {
+            return Err(ForesterUtilsError::Indexer(format!(
+                "Insufficient addresses: batch {} requires {} addresses (indices {}..{}) but only {} available",
+                batch_idx, batch_size, start_idx, end_idx, safe_end_idx - start_idx
+            )));
+        }
+
         let batch_addresses: Vec<[u8; 32]> = indexer_update_info.value.addresses
-            [start_idx..end_idx]
+            [start_idx..safe_end_idx]
             .iter()
             .map(|x| x.address)
            .collect();
 
+        let proofs_len = indexer_update_info.value.non_inclusion_proofs.len();
+        if start_idx >= proofs_len {
+            return Err(ForesterUtilsError::Indexer(format!(
+                "Insufficient non-inclusion proofs: batch {} requires start_idx {} but only {} proofs available",
+                batch_idx, start_idx, proofs_len
+            )));
+        }
+        let safe_proofs_end_idx = std::cmp::min(end_idx, proofs_len);
+        if safe_proofs_end_idx - start_idx != batch_size as usize {
+            return Err(ForesterUtilsError::Indexer(format!(
+                "Insufficient non-inclusion proofs: batch {} requires {} proofs (indices {}..{}) but only {} available",
+                batch_idx, batch_size, start_idx, end_idx, safe_proofs_end_idx - start_idx
+            )));
+        }
+
         let mut low_element_values = Vec::new();
         let mut low_element_next_values = Vec::new();
         let mut low_element_indices = Vec::new();
         let mut low_element_next_indices = Vec::new();
         let mut low_element_proofs = Vec::new();
 
-        for proof in &indexer_update_info.value.non_inclusion_proofs[start_idx..end_idx] {
+        for proof in &indexer_update_info.value.non_inclusion_proofs[start_idx..safe_proofs_end_idx]
+        {
             low_element_values.push(proof.low_address_value);
             low_element_indices.push(proof.low_address_index as usize);
             low_element_next_indices.push(proof.low_address_next_index as usize);
             low_element_next_values.push(proof.low_address_next_value);
             low_element_proofs.push(proof.low_address_proof.to_vec());
         }
 
-        if create_hash_chain_from_slice(&batch_addresses)? != *leaves_hash_chain {
+        let computed_hash_chain = create_hash_chain_from_slice(&batch_addresses)?;
+        if computed_hash_chain != *leaves_hash_chain {
             return Err(ForesterUtilsError::Prover(
                 "Addresses hash chain does not match".into(),
             ));
@@ -323,6 +347,7 @@ pub async fn get_address_update_instruction_stream<'a, R: Rpc>(
     let (current_root, leaves_hash_chains, start_index, zkp_batch_size) = (
         merkle_tree_data.current_root,
         merkle_tree_data.leaves_hash_chains,
+        // merkle_tree_data.batch_start_index,
        merkle_tree_data.next_index,
        merkle_tree_data.zkp_batch_size,
    );
forester-utils/src/lib.rs

Lines changed: 1 addition & 0 deletions
@@ -22,6 +22,7 @@ pub struct ParsedMerkleTreeData {
     pub pending_batch_index: u32,
     pub num_inserted_zkps: u64,
     pub current_zkp_batch_index: u64,
+    pub batch_start_index: u64,
     pub leaves_hash_chains: Vec<[u8; 32]>,
 }

forester/Cargo.toml

Lines changed: 11 additions & 0 deletions
@@ -12,6 +12,7 @@ solana-client = { workspace = true }
 solana-account-decoder = { workspace = true }
 solana-program = { workspace = true }
 account-compression = { workspace = true }
+light-account-checks = { workspace = true }
 light-batched-merkle-tree = { workspace = true }
 light-compressed-account = { workspace = true, features = ["std"] }
 light-system-program-anchor = { workspace = true, features = ["cpi"] }
@@ -52,6 +53,16 @@ scopeguard = "1.2.0"
 itertools = "0.14.0"
 num-bigint = { workspace = true }
 
+# gRPC client for Photon queue subscriptions (match Photon versions)
+tonic = "0.14.2"
+prost = "0.14.1"
+prost-types = "0.14.1"
+tonic-prost = "0.14.2"
+tokio-stream = { version = "0.1", features = ["sync"] }
+
+[build-dependencies]
+tonic-prost-build = "0.14.2"
+
 [dev-dependencies]
 serial_test = { workspace = true }
 light-prover-client = { workspace = true, features = ["devenv"] }
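The tonic/prost/tokio-stream additions above are what turn the forester into a gRPC client for Photon queue subscriptions, so V2 tree work can be triggered by pushed queue updates rather than a polling timer (per the commit title). The sketch below shows only that event-driven consumption pattern: a local channel stands in for the tonic streaming response, and QueueUpdate is a hypothetical event type — the real message and client types are generated from proto/photon.proto at build time.

use tokio::sync::mpsc;
use tokio_stream::{wrappers::ReceiverStream, StreamExt};

// Hypothetical event shape; the real messages come from proto/photon.proto.
#[derive(Debug)]
struct QueueUpdate {
    tree: [u8; 32],
    queue_index: u64,
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel::<QueueUpdate>(16);

    // Producer standing in for the gRPC subscription: in the forester this
    // would be the streaming response returned by the generated Photon client.
    tokio::spawn(async move {
        for queue_index in 0u64..3 {
            let _ = tx
                .send(QueueUpdate { tree: [0u8; 32], queue_index })
                .await;
        }
        // Dropping tx ends the stream.
    });

    // Event-driven consumer: react to each queue update as it arrives.
    let mut events = ReceiverStream::new(rx);
    while let Some(update) = events.next().await {
        println!(
            "queue update for tree {:?}..., index {}",
            &update.tree[..4],
            update.queue_index
        );
        // -> here the forester would schedule batch processing for this tree
    }
}

tokio-stream's wrapper types make a channel receiver usable with the same StreamExt combinators a tonic streaming response exposes, which keeps the consumption loop identical whether events come from a local channel in tests or from the live gRPC subscription.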

forester/build.rs

Lines changed: 6 additions & 0 deletions
@@ -0,0 +1,6 @@
+fn main() -> Result<(), Box<dyn std::error::Error>> {
+    tonic_prost_build::configure().compile_protos(&["proto/photon.proto"], &["proto"])?;
+    println!("cargo:rerun-if-changed=proto/photon.proto");
+
+    Ok(())
+}
