
Conversation

@sergeytimoshin (Contributor) commented Dec 18, 2025

Forester v2 processing pipeline

  • remove legacy polling, supervisors, and deprecated v1/v2 paths
  • split processor into explicit stages: job building, proof generation, tx sending
  • introduce address staging tree
  • add detailed metrics for proof latency and throughput

Prover

  • precompute and cache circuit inputs per batch
  • add proof cache keyed by batch + root
  • parallelize proof generation with bounded workers
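
A minimal sketch of the bounded-worker and cache ideas in the two bullets above, assuming hypothetical types (ProofInput, Proof) and a placeholder prove function; the real forester/prover wiring differs.

use std::{collections::HashMap, sync::Arc};
use tokio::sync::{Mutex, Semaphore};

// Hypothetical stand-ins for the real circuit-input and proof types.
#[derive(Clone)]
struct ProofInput {
    batch_index: u64,
    root: [u8; 32],
}
#[derive(Clone)]
struct Proof(Vec<u8>);

// Cache keyed by (batch index, root) so identical requests are proved once.
type ProofCache = Arc<Mutex<HashMap<(u64, [u8; 32]), Proof>>>;

async fn prove(input: &ProofInput) -> Proof {
    // Placeholder for the actual prover call.
    Proof(input.root.to_vec())
}

async fn prove_all(inputs: Vec<ProofInput>, max_workers: usize, cache: ProofCache) -> Vec<Proof> {
    let sem = Arc::new(Semaphore::new(max_workers)); // bound concurrent proof generation
    let mut handles = Vec::new();
    for input in inputs {
        let sem = sem.clone();
        let cache = cache.clone();
        handles.push(tokio::spawn(async move {
            let _permit = sem.acquire_owned().await.expect("semaphore closed");
            let key = (input.batch_index, input.root);
            if let Some(hit) = cache.lock().await.get(&key).cloned() {
                return hit; // cache hit: reuse the previously generated proof
            }
            let proof = prove(&input).await;
            cache.lock().await.insert(key, proof.clone());
            proof
        }));
    }
    let mut proofs = Vec::with_capacity(handles.len());
    for handle in handles {
        proofs.push(handle.await.expect("proof task panicked"));
    }
    proofs
}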

Misc

  • add log-based plotting scripts for pipeline analysis
  • clean up dependencies and workspace versions
  • improve error messages and debug logging across forester

Summary by CodeRabbit

  • New Features

    • Multi-tree processing support with configurable queue polling modes (indexer-based or on-chain).
    • Address lookup table integration for optimized transaction structure.
    • Proof result caching with deduplication to reduce redundant computations.
    • New performance analytics tools for monitoring throughput and latency metrics.
  • Improvements

    • Enhanced transaction confirmation with configurable polling intervals and retry limits.
    • Memory-aware concurrency management for improved resource utilization.
    • Strengthened error handling with detailed validation across configuration and processing paths.
    • Improved health check diagnostics and error messaging.
  • Chores

    • Updated dependencies and consolidated workspace-managed versions.


@coderabbitai bot (Contributor) commented Dec 18, 2025

Caution

Review failed

The pull request is closed.

Code Review Summary

Walkthrough

This PR performs a substantial architectural overhaul of Forester's proof processing pipeline, introduces address lookup table support for versioned transactions, removes the queue polling actor system, consolidates error handling patterns, and adds memory-driven concurrency controls to the prover system.

Changes

Cohort / File(s) | Summary
V2 Processor Architecture Rewrite
forester/src/processor/v2/*, forester/src/processor/v2/strategy/*
Complete restructuring of proof batching, job building, and transaction sending. Introduces QueueProcessor, TreeStrategy trait system, BatchJobBuilder pattern, proof caching with warmup phases, and per-tree processors. Removes old state supervisor and address batch processing. Adds ProcessingResult and CircuitMetrics for detailed timing. A rough sketch of the strategy trait follows this table.
Configuration & CLI Expansion
forester/src/cli.rs, forester/src/config.rs
Single tree_id replaced with tree_ids (Vec). New QueuePollingMode enum (Indexer/OnChain). Adds lookup_table_address, confirmation_max_attempts, confirmation_poll_interval_ms. Updates ForesterConfig constructors and validation with InvalidArguments error consolidation.
Queue Polling System Removal
forester/src/polling/*, forester/src/lib.rs
Entirely removes queue_poller actor and polling module exports. Removes TreeFinder component. Updates run_pipeline to remove associated polling logic.
Error Handling Hardening
forester/src/errors.rs, forester/src/forester_status.rs, forester/src/health_check.rs, forester/src/metrics.rs, forester/src/utils.rs, forester/src/rollover/operations.rs
Consolidates error variants (InvalidKeypair/InvalidPubkey → InvalidArguments). Replaces expect/unwrap patterns with ? operator and contextualized error handling via anyhow::Context. Updates metric creation and status fetching to log errors instead of panic.
Epoch Manager & Metrics
forester/src/epoch_manager.rs
Adds ProcessingMetrics and CircuitMetrics structs. Adds per-epoch metrics tracking and per-tree processor maps (StateBatchProcessorMap, AddressBatchProcessorMap). Integrates lookup table support. Updates WorkReport to include metrics.
Shutdown & Control Flow
forester/src/main.rs, forester/src/lib.rs, forester/src/slot_tracker.rs
Introduces unified spawn_shutdown_handler for Ctrl+C handling with force-exit on second press. Reworks SlotTracker to use Instant-based timing instead of SystemTime. Improves slot_tracker resilience with retry logic and explicit shutdown handling.
Prover System Enhancements
prover/server/server/queue.go, prover/server/server/queue_job.go, prover/server/server/server.go
Adds Redis-backed fair queuing per tree and per-input-hash deduplication with caching. Introduces memory-driven concurrency controls (MemoryPerProofGB, calculateConcurrency). Extends ProofJob with TreeID and BatchIndex. Implements job metadata storage and in-flight marker management.
Prover Client & Types
prover/client/src/*, prover/server/prover/common/circuit_utils.go
Introduces SubmitProofResult enum (Queued/Immediate). Updates ProofResult structure with proof_duration_ms field. Adds batch proof helpers returning ProofResult. Introduces ProofWithTiming for server responses. Adds new error variants (InvalidHexString, InvalidProofData, HashchainMismatch, ProofPatchFailed).
Testing Infrastructure
forester/tests/*, sdk-libs/program-test/*
Updates test helpers and e2e tests to reflect config changes (tree_ids, queue_polling_mode, lookup_table_address). Adds get_next_active_phase_with_time utility. Updates wait_for_work_report and execute_test_transactions signatures.
Proof Type Enhancements
prover/client/src/proof_types/batch_*/
Adds tree_id and batch_index optional fields to BatchAppendInputsJson, BatchAddressAppendInputsJson, BatchUpdateProofInputsJson with builder methods. Updates input constructors to accept slices instead of owned Vecs. Introduces proof patching and hashchain validation in address append proofs.
Indexer & SDK Updates
sdk-libs/client/src/indexer/*, sdk-libs/client/src/rpc/*, sdk-libs/photon-api/src/models/*
Removes low_element_proofs and queue_indices from AddressQueueData. Adds versioned transaction support (create_and_send_versioned_transaction) with address lookup table integration. Adds new Photon API models for V2 queue elements.
Build & Deployment
prover/server/publish_*.sh, forester/Cargo.toml, forester/.gitignore
Consolidates dependencies to workspace management (base64, serde_json, tokio, etc.). Adds publish_podman.sh and publish_prover.sh scripts. Adds .env.devnet to gitignore. Updates Go toolchain to 1.25.5.
Analysis & Diagnostics Scripts
forester/scripts/plot*.py
Introduces three new Python plotting scripts: plot.py (basic per-tx visualization), plot_enhanced.py (comprehensive latency/throughput analysis), plot_proof_pipeline.py (proof lifecycle Gantt charts).
Misc Robustness
forester/src/processor/tx_cache.rs, forester/src/processor/v1/helpers.rs, forester/src/queue_helpers.rs, forester/src/tree_data_sync.rs, forester/src/compressible/*
Adds periodic cleanup for tx_cache with deferred scheduling. Adds bounds checking in get_capped_priority_fee. Adds explicit offset calculation in queue_helpers. Adds filtered tree fetching with discriminators. Simplifies base64 decoding paths.
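
For orientation, a hedged sketch of the kind of trait split the first cohort describes; the names (TreeStrategy, ProofJob, BatchInstructionData) and signatures are invented here and will not match the actual forester/src/processor/v2 API.

// Invented job and instruction types; the real ones live in forester/src/processor/v2.
pub struct ProofJob {
    pub batch_index: u64,
    pub inputs: Vec<[u8; 32]>,
}
pub struct BatchInstructionData(pub Vec<u8>);

// Each tree kind (state, address) supplies its own job building and
// instruction encoding; a generic queue processor drives the shared
// fetch -> prove -> send pipeline through this trait.
pub trait TreeStrategy {
    fn build_jobs(&self, queue_len: u64, zkp_batch_size: u64) -> Vec<ProofJob>;
    fn encode_instruction(&self, job: &ProofJob, proof: &[u8]) -> BatchInstructionData;
}

pub struct StateStrategy;

impl TreeStrategy for StateStrategy {
    fn build_jobs(&self, queue_len: u64, zkp_batch_size: u64) -> Vec<ProofJob> {
        // One job per full zkp batch; partial batches wait for the next tick.
        let full_batches = queue_len / zkp_batch_size.max(1);
        (0..full_batches)
            .map(|i| ProofJob { batch_index: i, inputs: Vec::new() })
            .collect()
    }

    fn encode_instruction(&self, job: &ProofJob, proof: &[u8]) -> BatchInstructionData {
        let mut data = proof.to_vec();
        data.extend_from_slice(&job.batch_index.to_le_bytes());
        BatchInstructionData(data)
    }
}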

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120+ minutes

Complexity drivers:

  • Processor V2 architectural rewrite: The complete restructuring of proof batching, job building, transaction sending, and the introduction of the TreeStrategy trait system requires careful review of correctness, ordering, and data flow consistency.

  • High heterogeneity: This PR combines architectural refactoring (processor v2), feature removal (queue polling), feature addition (versioned transactions, fair queuing, deduplication), error handling consolidation, and CLI changes—each requiring distinct reasoning.

  • New public APIs and traits: Multiple new public types (QueueProcessor, TreeStrategy, BatchJobBuilder, ProcessingMetrics, V2Error, ProofWithTiming, DeduplicationResult) introduce new contract points requiring validation.

  • Distributed logic changes: Changes span Rust (Forester, client, SDK) and Go (prover server), requiring cross-language understanding of deduplication, caching, and queuing semantics.

  • Error handling pattern migration: Widespread replacement of panics with contextualized error propagation requires verification that error paths don't mask or lose diagnostic information.
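
As a hedged illustration of the last bullet's panic-to-context migration (the function and path are hypothetical, not taken from the diff):

use anyhow::{Context, Result};
use std::fs;

// Before: any read failure aborts the process with a generic panic.
fn read_keypair_panicking(path: &str) -> Vec<u8> {
    fs::read(path).expect("failed to read keypair")
}

// After: the failure propagates as an error carrying the offending path,
// which callers can log or map into a ForesterError-style variant.
fn read_keypair(path: &str) -> Result<Vec<u8>> {
    fs::read(path).with_context(|| format!("failed to read keypair file {path}"))
}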

Areas requiring extra attention:

  • forester/src/processor/v2/processor.rs — The core QueueProcessor orchestration: verify root reconciliation logic, batch enqueuing, and caching strategies handle all edge cases (overlaps, gaps, stale data).
  • forester/src/processor/v2/strategy/state.rs and address.rs — TreeStrategy implementations: validate proof job building, alignment checks, and interleaving logic.
  • forester/src/epoch_manager.rs — Per-epoch metrics accumulation and lookup table threading: ensure metrics are properly tagged and isolated per epoch; verify lookup table propagation.
  • prover/server/server/queue.go — Fair queuing logic and deduplication: verify Redis operations are atomic where required; check for race conditions in in-flight marker transitions.
  • forester/src/config.rs — Config construction and validation: ensure all new fields are properly validated and errors are mapped consistently.
  • forester/src/slot_tracker.rs — Timing model migration: verify Instant-based calculations are correct and don't suffer from clock skew issues (a minimal sketch of the Instant-based model follows this list).
  • Cross-file consistency: Ensure v2Error, ProcessingMetrics, and CircuitMetrics are used consistently across processor, epoch_manager, and tx_sender.
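
A minimal sketch of the Instant-based timing model flagged for forester/src/slot_tracker.rs above, with invented field names; the point is that slot estimates derive from a monotonic clock rather than SystemTime:

use std::time::{Duration, Instant};

// Estimates the current slot from a last-known (slot, Instant) anchor.
// Instant is monotonic, so the estimate cannot jump backwards if the wall
// clock is adjusted, which was the risk with SystemTime.
struct SlotTracker {
    anchor_slot: u64,
    anchor_time: Instant,
    slot_duration: Duration,
}

impl SlotTracker {
    fn new(anchor_slot: u64, slot_duration: Duration) -> Self {
        Self { anchor_slot, anchor_time: Instant::now(), slot_duration }
    }

    fn update(&mut self, slot: u64) {
        self.anchor_slot = slot;
        self.anchor_time = Instant::now();
    }

    fn estimated_current_slot(&self) -> u64 {
        let elapsed = self.anchor_time.elapsed();
        self.anchor_slot + (elapsed.as_millis() / self.slot_duration.as_millis().max(1)) as u64
    }
}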

Possibly related PRs

Suggested labels

ai-review, architecture, prover-server, processor-v2

Suggested reviewers

  • SwenSchaeferjohann

Poem

🔧 From queues polled once, now strategies bloom,
Versioned transactions light the room,
Metrics gathered, proofs cached with care,
Fair queues balance the proof-work air,
Errors now graceful, no panics to fear,
The pipeline evolves crystal clear! ✨

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 44.90%, which is insufficient; the required threshold is 70.00%. | You can run @coderabbitai generate docstrings to improve docstring coverage. |

✅ Passed checks (2 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title accurately describes a major refactor and optimization of the forester processing pipeline, including removal of legacy code paths, restructured processors, and new metrics/caching mechanisms. |

📜 Recent review details

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 9dddeb8 and 9f72ef4.

⛔ Files ignored due to path filters (3)
  • Cargo.lock is excluded by !**/*.lock and included by none
  • Cargo.toml is excluded by none and included by none
  • xtask/src/fetch_accounts.rs is excluded by none and included by none
📒 Files selected for processing (11)
  • forester/README.md (1 hunks)
  • forester/src/cli.rs (4 hunks)
  • forester/src/compressible/bootstrap.rs (1 hunks)
  • forester/src/compressible/subscriber.rs (1 hunks)
  • forester/src/config.rs (16 hunks)
  • forester/src/epoch_manager.rs (42 hunks)
  • forester/src/processor/v2/processor.rs (1 hunks)
  • forester/src/tree_data_sync.rs (3 hunks)
  • sdk-libs/client/src/indexer/types.rs (0 hunks)
  • sdk-libs/program-test/src/program_test/rpc.rs (2 hunks)
  • sdk-libs/program-test/src/utils/load_accounts.rs (2 hunks)


@sergeytimoshin changed the title from "refactor: forester v2" to "refactor: optimize forester processing" on Dec 18, 2025
@sergeytimoshin marked this pull request as ready for review on December 18, 2025 02:01

@coderabbitai bot (Contributor) left a comment

Actionable comments posted: 75

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (12)
prover/server/prover/common/proof_request_meta.go (1)

38-42: Fix: Incorrect comment.

The comment on line 38 says "Extract AddressTreeHeight" but this code block extracts treeHeight, not addressTreeHeight.

🔎 Apply this diff to correct the comment:
-	// Extract AddressTreeHeight
+	// Extract TreeHeight
 	treeHeight := uint32(0)
 	if height, ok := rawInput["treeHeight"].(float64); ok && height > 0 {
 		treeHeight = uint32(height)
prover/client/src/proof_types/batch_address_append/proof_inputs.rs (1)

186-571: Consider extracting validation logic into helper functions.

The get_batch_address_append_circuit_inputs function is 385 lines long and handles multiple responsibilities: hashchain validation, proof patching, low element updates, new element insertion, and multiple validation checks. This complexity makes it harder to test and maintain.

Consider extracting logical blocks into helper functions:

  • validate_hashchain(...) -> Result<(), ProverClientError>
  • process_low_element_update(...) -> Result<IntermediateRoot, ProverClientError>
  • process_new_element_insertion(...) -> Result<NewRoot, ProverClientError>
  • validate_address_ordering(...) -> Result<(), ProverClientError>

This would improve readability, testability, and make the main function's flow clearer.
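
A sketch of how the decomposition could read, using the helper names from the list above; the input, state, and error types here are placeholders, not the real batch-address-append structures or ProverClientError:

// Placeholder types; the real ones are the batch-address-append inputs and
// ProverClientError from prover/client.
struct CircuitInputs;
struct IntermediateState;
#[derive(Debug)]
struct Error(String);

fn validate_hashchain(_inputs: &CircuitInputs) -> Result<(), Error> {
    // recompute the leaves hashchain and compare against the expected value
    Ok(())
}

fn validate_address_ordering(_inputs: &CircuitInputs) -> Result<(), Error> {
    Ok(())
}

fn process_low_element_update(_inputs: &CircuitInputs) -> Result<IntermediateState, Error> {
    Ok(IntermediateState)
}

fn process_new_element_insertion(_state: IntermediateState) -> Result<[u8; 32], Error> {
    Ok([0u8; 32])
}

// The 385-line function then reduces to a readable sequence of named steps.
fn get_batch_address_append_circuit_inputs(inputs: &CircuitInputs) -> Result<[u8; 32], Error> {
    validate_hashchain(inputs)?;
    validate_address_ordering(inputs)?;
    let state = process_low_element_update(inputs)?;
    process_new_element_insertion(state)
}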

forester/src/rollover/operations.rs (2)

102-107: Same issue: get_indexed_merkle_tree lacks error handling.

This call mirrors the issue at lines 63-68. The AddressV1 account fetch is guarded, but the subsequent tree parsing could panic on malformed data. For consistency with the V2 paths, this should return an error rather than panic.


398-399: Address TODO: verify cpi_context assignment.

This TODO has been here through the refactor. If there's uncertainty about whether old_cpi_context_pubkey is the correct value for cpi_context, it should be validated or tracked as an issue before this code handles production rollovers.

Would you like me to open an issue to track this verification, or can you confirm the correctness of this assignment?

program-libs/indexed-merkle-tree/src/reference.rs (2)

168-182: Another debug println! to remove.

🔎 Suggested fix
     pub fn append(
         &mut self,
         value: &BigUint,
         indexed_array: &mut IndexedArray<H, I>,
     ) -> Result<(), IndexedReferenceMerkleTreeError> {
-        println!("appending {:?}", value);
         let nullifier_bundle = indexed_array.append(value).unwrap();
         self.update(
             &nullifier_bundle.new_low_element,
             &nullifier_bundle.new_element,
             &nullifier_bundle.new_element_next_value,
         )?;
 
         Ok(())
     }

237-250: Comment may be misleading for V2 schema.

The doc comment on line 240 states the leaf hash is H(leaf_lower_range_value, leaf_next_index, leaf_higher_value), which is only true for V1. When V2 is used, next_index isn't included in the hash.

🔎 Suggested fix
 // TODO: check why next_index is usize while index is I
 /// We prove non-inclusion by:
 /// 1. Showing that value is greater than leaf_lower_range_value and less than leaf_higher_range_value
-/// 2. Showing that the leaf_hash H(leaf_lower_range_value, leaf_next_index, leaf_higher_value) is included in the root (Merkle tree)
+/// 2. Showing that the leaf_hash is included in the root (Merkle tree)
+///    - V1 schema: H(leaf_lower_range_value, leaf_next_index, leaf_higher_value)
+///    - V2 schema: H(leaf_lower_range_value, leaf_higher_value)
 #[derive(Debug)]
 pub struct NonInclusionProof {
forester/src/processor/tx_cache.rs (1)

90-103: Clarify the warning condition logic.

The warning at line 94 fires when an entry expires within 30 seconds. The condition !should_keep && age < Duration::from_secs(30) means:

  • The entry has exceeded its timeout (expired normally)
  • But it expired in less than 30 seconds

If the intent is to detect overly short timeouts, the message "after {:?} timeout" is misleading—the entry did time out as configured. This could spam logs if short timeouts are intentional.

🔎 Consider making the intent explicit:
             let should_keep = age < entry.timeout;
-            if !should_keep && age < Duration::from_secs(30) {
+            if !should_keep && entry.timeout < Duration::from_secs(30) {
                 warn!(
-                    "Removing cache entry {} after {:?} timeout (was: {:?})",
+                    "Cache entry {} has unusually short timeout: {:?}",
-                    hash, age, entry.timeout
+                    hash, entry.timeout
                 );
             }

Or if the current logic is intentional, clarify the message to indicate you're warning about short-lived entries specifically.

forester/src/processor/v1/helpers.rs (2)

349-364: Validate configuration at parse time rather than runtime.

The change from panic to warn+clamp improves resilience in the processing loop. However, silently clamping invalid configuration (max < min) at runtime can mask configuration errors. Users might not notice their settings are being adjusted, leading to unexpected fee behavior in production.

Add validation when parsing CapConfig to reject invalid configurations early during startup, before entering the processing loop. This maintains the defensive clamping here while ensuring configuration errors are caught immediately.

Example validation at config parse time

In your config parsing/construction code (outside this file), add:

impl CapConfig {
    pub fn validate(&self) -> Result<(), ConfigError> {
        if self.max_fee_lamports < self.min_fee_lamports {
            return Err(ConfigError::InvalidFeeRange {
                max: self.max_fee_lamports,
                min: self.min_fee_lamports,
            });
        }
        Ok(())
    }
}

Then call config.validate()? during initialization.


172-260: Remove dead code or convert to proper debug configuration.

The _debug variable is hardcoded to false, making this entire 88-line block unreachable. This includes validation logic (lines 251-259) that checks whether the indexer root exists in the onchain root history—a potentially valuable safety check.

Consider one of these approaches:

  1. Remove the block entirely if this validation is no longer needed
  2. Enable the validation permanently if it catches real issues (remove the if _debug wrapper)
  3. Convert to a runtime config flag (e.g., config.enable_root_validation) if optional debug logging is desired
forester/src/cli.rs (1)

363-416: Consider adding tests for QueuePollingMode parsing.

The existing tests cover ProcessorMode parsing but not the new QueuePollingMode. For completeness, consider adding similar tests to validate parsing of --queue-polling-mode indexer and --queue-polling-mode onchain.

Would you like me to generate test cases for QueuePollingMode parsing?

prover/client/src/proof_client.rs (1)

326-336: Duplicate constraint error check.

Lines 326-330 and 332-336 contain identical constraint error checks. This appears to be an accidental duplication:

🔎 Suggested fix
     fn should_retry(&self, error: &ProverClientError, retries: u32, elapsed: Duration) -> bool {
         let error_str = error.to_string();
 
         let is_constraint_error =
             error_str.contains("constraint") || error_str.contains("is not satisfied");
         if is_constraint_error {
             return false;
         }
 
-        let is_constraint_error =
-            error_str.contains("constraint") || error_str.contains("is not satisfied");
-        if is_constraint_error {
-            return false;
-        }
-
         let is_retryable_error = error_str.contains("job_not_found")
forester/src/epoch_manager.rs (1)

2040-2152: Handling of v2 processing errors and cache invalidation is mostly correct, but constraint errors fully drop processors

The new process_v2 integrates state and address QueueProcessors and associates each tree with a SharedProofCache. Constraint errors (via V2Error::is_constraint) drop the processor and its proof cache for that tree, forcing a full rebuild next tick. Hashchain mismatches simply clear the processor’s internal cache and continue, which is the right level of reaction. Other errors are logged and treated as transient, returning a zeroed ProcessingResult. This policy is reasonable, but be aware that dropping the processor on constraint errors also discards its cached staging-tree state; if that becomes frequent in production, we might want a narrower recovery strategy (e.g., per-batch invalidation) instead of full processor reset.

♻️ Duplicate comments (1)
sdk-libs/photon-api/src/models/_get_queue_elements_v2_post_request.rs (1)

44-54: Verify OpenAPI spec: Id enum has only a test variant.

Same issue as in the response model - the Id enum contains only TestAccount (serialized as "test-account"). This appears to be example/test code from the OpenAPI specification and may not be suitable for production use.


@coderabbitai bot (Contributor) left a comment

Review continued from previous batch...

…_condvar` for improved signaling of background task completion

@coderabbitai bot (Contributor) left a comment

Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
forester/src/epoch_manager.rs (1)

2741-2958: LUT loading path can abort the service on transient RPC failures

In run_service, the initial tree fetch has a robust retry loop that handles rpc_pool.get_connection() and fetch_trees failures with bounded backoff. By contrast, the lookup table loading block:

let address_lookup_tables = {
    let rpc = rpc_pool.get_connection().await?;
    match load_lookup_table_async(&*rpc, LOOKUP_TABLE_ADDRESS).await {
        Ok(lut) => { ... }
        Err(e) => { ... fallback to empty vec ... }
    }
};

uses ? on get_connection(). Any transient RPC‑pool failure here will bubble out of run_service immediately, bypassing your outer retry loop and bringing the forester down, even though you can operate in “legacy transactions” mode when the LUT is unavailable.

Consider treating connection errors here the same way you treat load_lookup_table_async errors: log and fall back to an empty LUT set instead of aborting, e.g.:

Proposed change to make LUT loading resilient
-            let address_lookup_tables = {
-                let rpc = rpc_pool.get_connection().await?;
-                match load_lookup_table_async(&*rpc, LOOKUP_TABLE_ADDRESS).await
-                {
+            let address_lookup_tables = {
+                match rpc_pool.get_connection().await {
+                    Ok(rpc) => match load_lookup_table_async(&*rpc, LOOKUP_TABLE_ADDRESS).await {
                         Ok(lut) => {
@@
-                        }
-                        Err(e) => {
+                        }
+                        Err(e) => {
                             warn!(
                                 "Failed to load lookup table {}: {:?}. Falling back to legacy transactions.",
                                 LOOKUP_TABLE_ADDRESS, e
                             );
                             Arc::new(Vec::new())
                         }
-                    }
+                    },
+                    Err(e) => {
+                        warn!(
+                            "Failed to get RPC connection for lookup table {}: {:?}. Falling back to legacy transactions.",
+                            LOOKUP_TABLE_ADDRESS, e
+                        );
+                        Arc::new(Vec::new())
+                    }
                 };

This keeps startup behavior consistent: inability to use LUTs should degrade gracefully rather than aborting the entire service.

♻️ Duplicate comments (9)
prover/server/server/metrics.go (2)

159-163: Consider exposing negative memory deltas instead of flooring to zero.

Flooring negative deltas to zero hides garbage collection activity. For capacity planning and debugging memory issues, knowing when GC reclaimed memory between start and end could be valuable. Consider either:

  • Recording the actual delta (positive or negative)
  • Adding a separate metric for GC-induced reductions
🔎 Option: Record actual delta without flooring
-	// Calculate memory delta (may be negative due to GC, use max with 0)
-	memDelta := int64(memStats.HeapAlloc) - int64(t.startHeapAlloc)
-	if memDelta < 0 {
-		memDelta = 0
-	}
+	// Calculate memory delta (may be negative due to GC)
+	memDelta := int64(memStats.HeapAlloc) - int64(t.startHeapAlloc)

Note: You'll need to adjust histogram buckets to include negative values if taking this approach.


168-172: Incomplete ProofPeakMemory tracking - gauge is never set.

The ProofPeakMemory gauge is retrieved but never updated with a value. The comment mentions "track via histogram max" but that doesn't update this gauge. Either implement the peak tracking or remove the dead code.

🔎 Option: Actually track peak memory
 	// Update peak memory if this is higher
-	currentPeak, _ := ProofPeakMemory.GetMetricWithLabelValues(t.circuitType)
-	if currentPeak != nil {
-		// Note: Gauge doesn't have a Get method, so we track via histogram max
-	}
+	// Track the current heap allocation as peak for this circuit type
+	// Note: This represents the heap size at proof completion, useful for capacity planning
+	ProofPeakMemory.WithLabelValues(t.circuitType).Set(float64(memStats.HeapAlloc))
forester/src/epoch_manager.rs (1)

2042-2157: Hashchain mismatch still doesn’t flush the shared proof cache

On both TreeType::StateV2 and TreeType::AddressV2 branches, the hashchain‑mismatch path logs “Clearing cache and retrying” and calls proc.clear_cache().await, but it never clears the corresponding SharedProofCache entry in self.proof_caches. That means any pre‑warmed or previously saved proofs for the old hashchain can still be pulled by try_send_cached_proofs, risking retries of obviously stale proofs for this tree.

Given you now have an explicit per‑tree SharedProofCache and a cached‑proof sending path, this asymmetry is more dangerous than before.

Suggestion: in both hashchain‑mismatch arms, clear the shared cache as well:

Proposed fix: clear SharedProofCache on hashchain mismatch
-                        } else if is_v2_error(&e, V2Error::is_hashchain_mismatch) {
+                        } else if is_v2_error(&e, V2Error::is_hashchain_mismatch) {
                             warn!(
@@
-                            proc.clear_cache().await;
+                            // Clear local processor state and the shared proof cache
+                            proc.clear_cache().await;
+                            if let Some(cache) = self.proof_caches.get(&tree_accounts.merkle_tree) {
+                                cache.clear().await;
+                            }
                             Ok(ProcessingResult::default())
@@
-                        } else if is_v2_error(&e, V2Error::is_hashchain_mismatch) {
+                        } else if is_v2_error(&e, V2Error::is_hashchain_mismatch) {
@@
-                            proc.clear_cache().await;
+                            proc.clear_cache().await;
+                            if let Some(cache) = self.proof_caches.get(&tree_accounts.merkle_tree) {
+                                cache.clear().await;
+                            }
                             Ok(ProcessingResult::default())
forester/src/processor/v2/tx_sender.rs (4)

101-167: OrderedProofBuffer refactor removes O(n) rotation and looks correct

The new OrderedProofBuffer replaces the previous rotate_left(1) scheme with a head index and logical base_seq. insert and pop_next now operate in O(1) time, and expected_seq() correctly tracks the next sequence number.

The pattern of calling pop_next() and then using expected_seq() - 1 to recover the just‑popped seq is safe: base_seq is incremented only when an item is present, so underflow can’t occur. This addresses the earlier performance concern around buffer rotation without introducing ordering bugs.


503-549: Proof caching on epoch end is sensible, but base root may lag the last sent tx

save_proofs_to_cache uses self.last_seen_root as the base root for cache.start_warming and for logging. That value is the initial root passed into TxSender::spawn; it isn’t updated from the sender task’s sender_last_root, which tracks the last root returned by the registry instructions.

Functionally this is fine because the cache tracks its own internal chain of (old_root, new_root) per proof, but it does mean:

  • The warm‑up root used for chain validation may be slightly stale compared to the last actually updated on‑chain root.
  • Logs like "Saved {} proofs to cache ... (root: {:?})" will show that stale base root, which might be confusing during investigations.

If you want the cache’s base root to reflect the latest processed root, consider wiring a small shared state (e.g., Arc<Mutex<[u8; 32]>> or similar) from the sender task back into TxSender and updating last_seen_root whenever sender_last_root is advanced.
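
A small sketch of that wiring using tokio::sync::watch (an Arc<Mutex<_>> works equally well); the variable names are illustrative only:

use tokio::sync::watch;

// The sender task publishes the last root it actually advanced on-chain;
// save_proofs_to_cache then reads the freshest value instead of the initial
// root passed into TxSender::spawn.
fn example() {
    let initial_root = [0u8; 32];
    let (root_tx, root_rx) = watch::channel(initial_root);

    // In the sender task, after each confirmed batch:
    let new_root = [1u8; 32]; // placeholder for sender_last_root
    let _ = root_tx.send(new_root);

    // In save_proofs_to_cache, use the latest published root as the warm-up base:
    let base_root: [u8; 32] = *root_rx.borrow();
    assert_eq!(base_root, new_root);
}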


252-318: _address_append_count is unused and can be dropped

_address_append_count is incremented in the BatchInstruction::AddressAppend arm, but never read and its leading underscore just suppresses warnings:

let mut _address_append_count = 0usize;
...
BatchInstruction::AddressAppend(proofs) => {
    _address_append_count += 1;
    ...
}

Since instr_type already falls back to "AddressAppend" when both append_count and nullify_count are zero, this counter isn’t contributing to behavior. You can remove it and simplify the loop a bit:

Simplify by removing the unused counter
-                let mut append_count = 0usize;
-                let mut nullify_count = 0usize;
-                let mut _address_append_count = 0usize;
+                let mut append_count = 0usize;
+                let mut nullify_count = 0usize;
@@
-                        BatchInstruction::AddressAppend(proofs) => {
-                            _address_append_count += 1;
+                        BatchInstruction::AddressAppend(proofs) => {
                             let ix_res = proofs

252-376: Sender early‑exit on NotInActivePhase loses “why” but preserves counts; consider also caching outstanding proofs

The spawned sender task returns (sender_processed, total_tx_sending_duration) as Ok even when it hits ForesterError::NotInActivePhase from send_transaction_batch, and the outer run method only sees the numbers, not the exit reason. That’s acceptable for now because run separately checks eligibility and handles the active‑phase end by caching proofs via save_proofs_to_cache.

Given this, the main functional issue is just style (and CI):

  • The turbofish Ok::<_, anyhow::Error> plus tuple return is what the lint is complaining about.
  • You don’t need the explicit turbofish here; type inference is clear from the surrounding Err(e).

A small style change should unblock lint:

Proposed fix for the lint failure and minor simplification
-                        warn!("tx error {} epoch {}", e, sender_context.epoch);
-                        if let Some(ForesterError::NotInActivePhase) =
-                            e.downcast_ref::<ForesterError>()
-                        {
-                            warn!("Active phase ended while sending tx, stopping sender loop");
-                            return Ok::<_, anyhow::Error>((sender_processed, total_tx_sending_duration));
-                        } else {
-                            return Err(e);
-                        }
+                        warn!("tx error {} epoch {}", e, sender_context.epoch);
+                        if let Some(ForesterError::NotInActivePhase) =
+                            e.downcast_ref::<ForesterError>()
+                        {
+                            warn!("Active phase ended while sending tx, stopping sender loop");
+                            return Ok((sender_processed, total_tx_sending_duration));
+                        }
+                        return Err(e);

This keeps behavior the same while satisfying the linter. If you later decide you also want to cache outstanding proofs in this path (similar to the explicit !is_still_eligible_at branch), you may want to propagate an explicit “exit reason” alongside the counts in TxSenderResult.

Also applies to: 481-500

forester/src/processor/v2/processor.rs (1)

521-548: Consider removing or documenting the unused _queue_type parameter.

As noted in a previous review, _queue_type is unused. If it's reserved for future differentiation of prewarm behavior by queue type, a brief doc comment would clarify intent. Otherwise, removing it simplifies the API.

forester/src/processor/v2/helpers.rs (1)

546-565: Missing zkp_batch_size == 0 guard causes division by zero.

While fetch_paginated_batches has a guard for zkp_batch_size == 0 (line 128), this function lacks the same protection. Line 555 computes page_size_elements = ADDRESS_PAGE_SIZE_BATCHES * zkp_batch_size which is safe, but line 561 logs total_elements / zkp_batch_size which will panic if zkp_batch_size is 0.

🔎 Suggested fix
 pub async fn fetch_streaming_address_batches<R: Rpc + 'static>(
     context: &BatchContext<R>,
     total_elements: u64,
     zkp_batch_size: u64,
 ) -> crate::Result<Option<StreamingAddressQueue>> {
     if total_elements == 0 {
         return Ok(None);
     }
+    if zkp_batch_size == 0 {
+        return Err(anyhow!("zkp_batch_size cannot be zero"));
+    }

     let page_size_elements = ADDRESS_PAGE_SIZE_BATCHES * zkp_batch_size;

@coderabbitai bot (Contributor) left a comment

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
forester/src/epoch_manager.rs (1)

1431-1453: Cached‑proof send path is now eligibility‑aware and root‑validated

The new V2 slot processing and cached‑proof pipeline hangs together well:

  • process_light_slot_v2 now:

    • Waits until the configured slot window,
    • Tries cached proofs first, timing their send and feeding the duration into update_metrics_and_counts,
    • Then falls back to polling the indexer via dispatch_tree_processing with a bounded poll interval.
  • try_send_cached_proofs:

    • Checks consecutive_eligibility_end against the current slot, so you no longer send cached work past the eligibility window.
    • Uses fetch_current_root plus SharedProofCache::take_if_valid so only root‑consistent, contiguous proofs are consumed.
    • Skips work while the cache is still warming.
  • send_cached_proofs_as_transactions:

    • Correctly uses proof.items to accumulate total_items,
    • Reconstructs the three batch instruction variants with try_to_vec and the same CPI helpers used elsewhere, so semantics stay aligned with the live path.

One nuance to be aware of: even when create_and_send_transaction returns an error, those proofs are permanently consumed and still counted in total_items. That’s acceptable if you interpret processed_items as “attempted work”, but if downstream consumers rely on it as “confirmed work”, you may eventually want to gate the increment on send success or introduce a separate metric for failed cached sends.

Also applies to: 1462-1477, 1481-1483, 1513-1538, 2334-2401, 2432-2519

forester/tests/e2e_test.rs (1)

423-441: New work‑report waiting and iteration‑driven test load are simple but slightly decouple from batch sizing

The updated flow:

  • Fetches protocol_config once and uses it to derive registration/active phase slots (good reuse vs re‑implementing phase math).
  • Drives load via a fixed iterations: usize = 100, passing this into execute_test_transactions and then into wait_for_work_report as expected_minimum_processed_items.
  • execute_test_transactions now just returns iterations, and wait_for_work_report stops as soon as the accumulated WorkReport.processed_items reaches or exceeds that threshold.

This is functionally fine—the test asserts that at least some reasonable amount of work was processed—but note that expected_minimum_processed_items is now decoupled from the actual tree batch sizes (output_queue_batch_size / *_zkp_batch_size). Given that a single iteration can generate multiple queue items across several trees, the threshold is fairly conservative.

If you want the e2e to more directly validate that the v2 pipeline can process at least one full output batch, you could instead derive expected_minimum_processed_items from state_tree_params (as in the legacy helper) or from the per‑tree zkp_batch_size exposed by the new v2 processor.

Also applies to: 463-487, 805-857, 859-880, 979-980

♻️ Duplicate comments (2)
forester/src/processor/v2/tx_sender.rs (1)

270-270: Remove unused variable tracking.

The _address_append_count variable is declared, incremented at line 308, but never read. Since it serves no purpose, remove both the declaration and the increment to clean up the code.

🔎 Proposed fix
-                let mut _address_append_count = 0usize;
...
                        BatchInstruction::AddressAppend(proofs) => {
-                            _address_append_count += 1;
                            let ix_res = proofs
forester/src/epoch_manager.rs (1)

2053-2166: Hashchain mismatch still doesn’t flush the shared proof cache

On both the StateV2 and AddressV2 branches in process_v2, the V2Error::is_hashchain_mismatch arm logs “Clearing cache and retrying.” but only calls proc.clear_cache().await. The per-tree SharedProofCache entry is left untouched:

} else if is_v2_error(&e, V2Error::is_hashchain_mismatch) {
    warn!(...);
    proc.clear_cache().await;
    Ok(ProcessingResult::default())
}

Given that:

  • You attach a SharedProofCache per tree via set_proof_cache, and
  • try_send_cached_proofs later pulls proofs from that same cache independent of the processor,

stale proofs built against the old hashchain can still be sent after a mismatch, even though the processor’s internal state was flushed. That’s exactly the scenario where those proofs are no longer valid.

You likely want to flush the shared proof cache as well in this branch. A minimal change would be:

Suggested fix: clear `SharedProofCache` on hashchain mismatch
-                        } else if is_v2_error(&e, V2Error::is_hashchain_mismatch) {
+                        } else if is_v2_error(&e, V2Error::is_hashchain_mismatch) {
                             warn!(
                                 "State processing hit hashchain mismatch for tree {}: {}. Clearing cache and retrying.",
                                 tree_accounts.merkle_tree,
                                 e
                             );
-                            proc.clear_cache().await;
+                            // Clear both the processor's internal cache and the shared proof cache
+                            proc.clear_cache().await;
+                            self.proof_caches.remove(&tree_accounts.merkle_tree);
                             Ok(ProcessingResult::default())

and analogously for the AddressV2 branch.

Alternatively, keep the SharedProofCache entry but call cache.clear().await instead of removing the map entry.


pub struct ProofResult {
    pub proof: ProofCompressed,
    pub public_inputs: Vec<[u8; 32]>,

Contributor:

if we don't return the public inputs we cannot verify the proof

Contributor:

Or debug the public inputs in case verification fails.

Contributor (Author):

  1. but we already have them in the request, and if we request a proof for a zkp batch of 1000 it wouldn't be very efficient to transfer the same data back and forth
  2. by the way, the previous version of ProofResult was dead code; it was never used

@coderabbitai bot (Contributor) left a comment

Actionable comments posted: 5

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
forester/tests/e2e_test.rs (1)

464-487: Minor design question on returned iteration count.

The refactoring to use explicit iteration counts improves clarity. However, execute_test_transactions always returns the input iterations value (see line 980) since there are no early-exit paths. Consider whether the return value adds value, or if the function could be async fn execute_test_transactions(...) -> () for simplicity.

Optional simplification

If there's no plan for partial execution or early exits, consider simplifying:

 async fn execute_test_transactions<R: Rpc>(
     iterations: usize,
     // ... other params
-) -> usize {
+) {
     println!("Executing {} test transactions", iterations);
     // ... rest of implementation
-
-    iterations
 }

And update the call site:

-    let test_iterations = execute_test_transactions(
+    execute_test_transactions(
         iterations,
         // ... other params
     )
     .await;
 
     wait_for_work_report(
         &mut work_report_receiver,
         &state_tree_params,
-        test_iterations,
+        iterations,
     )
     .await;
forester/src/epoch_manager.rs (1)

1042-1149: Finalize-registration guard correctly avoids late finalization, but errors are surfaced as generic anyhow

The additional check in wait_for_active_phase that re-reads current_slot and bails out if the active phase already ended before sending FinalizeRegistration avoids spurious failures late in the epoch, which is important for stability. You return an anyhow! error with a descriptive message; this is acceptable but note that callers have no structured way to distinguish “too late to finalize” from other errors. If you later want to react differently to this condition (e.g., treat as a clean skip rather than error), consider introducing a dedicated error variant instead of a bare anyhow!.
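
If that distinction becomes useful, a dedicated variant (sketched here with thiserror; the enum and variant names are invented) would let callers treat this case as a clean skip:

use thiserror::Error;

#[derive(Debug, Error)]
enum EpochPhaseError {
    // Invented variant: the active phase ended before FinalizeRegistration was sent.
    #[error("active phase for epoch {epoch} ended at slot {phase_end} (current slot {current_slot})")]
    TooLateToFinalize { epoch: u64, phase_end: u64, current_slot: u64 },
}

fn handle(err: &EpochPhaseError) {
    match err {
        // Callers can now skip cleanly instead of treating this as a hard failure.
        EpochPhaseError::TooLateToFinalize { epoch, .. } => {
            eprintln!("skipping finalize for epoch {epoch}");
        }
    }
}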

♻️ Duplicate comments (1)
forester/src/epoch_manager.rs (1)

2050-2155: Hashchain-mismatch should also clear SharedProofCache to avoid stale proofs

In process_v2’s hashchain-mismatch branches (both StateV2 and AddressV2), you log “Clearing cache and retrying.” Currently this calls only proc.clear_cache().await on the QueueProcessor, which drops cached staging state but leaves the corresponding SharedProofCache entry untouched.

Because that shared cache can still hold proofs built against the old hashchain, try_send_cached_proofs may later consider them when the on-chain root realigns, risking reuse of stale proofs for a logically different queue history.

You already have the cache handle in scope; you should clear it alongside the processor-local cache.

The concern builds on the earlier review note that highlighted the same gap.
Suggested fix: clear `SharedProofCache` on hashchain mismatch
@@ async fn process_v2(
-                        } else if is_v2_error(&e, V2Error::is_hashchain_mismatch) {
+                        } else if is_v2_error(&e, V2Error::is_hashchain_mismatch) {
                             warn!(
                                 "State processing hit hashchain mismatch for tree {}: {}. Clearing cache and retrying.",
                                 tree_accounts.merkle_tree,
                                 e
                             );
-                            proc.clear_cache().await;
+                            // Clear both processor-local state and the shared proof cache so that
+                            // no proofs tied to the old hashchain can be reused.
+                            proc.clear_cache().await;
+                            cache.clear().await;
                             Ok(ProcessingResult::default())
@@
-                        } else if is_v2_error(&e, V2Error::is_hashchain_mismatch) {
+                        } else if is_v2_error(&e, V2Error::is_hashchain_mismatch) {
                             warn!(
                                 "Address processing hit hashchain mismatch for tree {}: {}. Clearing cache and retrying.",
                                 tree_accounts.merkle_tree,
                                 e
                             );
-                            proc.clear_cache().await;
+                            proc.clear_cache().await;
+                            cache.clear().await;
                             Ok(ProcessingResult::default())

@coderabbitai bot (Contributor) left a comment

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
forester/src/cli.rs (1)

386-440: Unit tests cover ProcessorMode but not QueuePollingMode.

The existing tests validate ProcessorMode parsing. Consider adding similar tests for QueuePollingMode to ensure the CLI parsing works correctly.

🔎 Suggested test additions
#[test]
fn test_queue_polling_mode_parsing() {
    // Test indexer mode
    let args = StartArgs::try_parse_from([
        "forester",
        "--queue-polling-mode", "indexer",
        "--rpc-url", "http://test.com",
        "--payer", "[1,2,3]",
        "--derivation", "[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32]"
    ]).unwrap();
    assert_eq!(args.queue_polling_mode, QueuePollingMode::Indexer);

    // Test onchain mode
    let args = StartArgs::try_parse_from([
        "forester",
        "--queue-polling-mode", "onchain",
        "--rpc-url", "http://test.com",
        "--payer", "[1,2,3]",
        "--derivation", "[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32]"
    ]).unwrap();
    assert_eq!(args.queue_polling_mode, QueuePollingMode::OnChain);

    // Test default (indexer)
    let args = StartArgs::try_parse_from([
        "forester",
        "--rpc-url", "http://test.com",
        "--payer", "[1,2,3]",
        "--derivation", "[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32]"
    ]).unwrap();
    assert_eq!(args.queue_polling_mode, QueuePollingMode::Indexer);
}

#[test]
fn test_queue_polling_mode_display() {
    assert_eq!(QueuePollingMode::Indexer.to_string(), "indexer");
    assert_eq!(QueuePollingMode::OnChain.to_string(), "onchain");
}
forester/tests/e2e_test.rs (1)

1312-1323: Add retry logic and batch readiness check before compress operations.

Error 0x37dd (14301, BatchNotReady) occurs when a batch isn't in the correct state for ZKP computation or tree insertion. The batch state machine requires: Fill (accepting elements) → Full (ready for ZKP) → Inserted (ZKP processed). The compress call occurs before the forester has computed ZKP batches—ensure num_full_zkp_batches > num_inserted_zkp_batches is satisfied. Implement retry logic with delays (max 3 attempts, 5s interval) before compress, similar to the known mitigation for batched tree transfer test flakiness referenced in CLAUDE.md.
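
A hedged sketch of that mitigation; compress_accounts and is_batch_not_ready are placeholders for the test's actual helpers and error inspection:

use std::time::Duration;

// Placeholder error check: in the real test this would inspect the
// transaction error for custom program error 0x37dd (14301, BatchNotReady).
fn is_batch_not_ready(err: &anyhow::Error) -> bool {
    err.to_string().contains("0x37dd")
}

async fn compress_with_retry<F, Fut>(mut compress_accounts: F) -> anyhow::Result<()>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = anyhow::Result<()>>,
{
    const MAX_ATTEMPTS: usize = 3;
    const RETRY_DELAY: Duration = Duration::from_secs(5);

    for attempt in 1..=MAX_ATTEMPTS {
        match compress_accounts().await {
            Ok(()) => return Ok(()),
            Err(e) if is_batch_not_ready(&e) && attempt < MAX_ATTEMPTS => {
                // Batch not yet Full/Inserted; give the forester time to catch up.
                tokio::time::sleep(RETRY_DELAY).await;
            }
            Err(e) => return Err(e),
        }
    }
    unreachable!("loop either returns Ok or Err")
}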

♻️ Duplicate comments (3)
forester/src/processor/v2/processor.rs (1)

520-547: Unused _queue_type parameter.

This was flagged in a previous review. The parameter is marked unused but may be intended for future queue-type-specific prewarm behavior. If this is intentional, add a brief comment explaining the planned use.

forester/src/epoch_manager.rs (2)

489-577: Forward-scanning epoch registration logic is correct.

The loop properly handles all three scenarios: waiting for registration to start, catching open registration windows, and advancing past closed windows. The REGISTRATION_BUFFER_SLOTS = 30 provides adequate margin for transaction propagation.

As noted in a past review, this complex state machine would benefit from targeted unit tests covering edge cases like joining after missing multiple epochs. That remains a good suggestion for test coverage.


2088-2095: Hashchain mismatch handling: SharedProofCache still not explicitly cleared.

As flagged in a previous review, the hashchain mismatch branches call proc.clear_cache().await but don't clear the SharedProofCache (cache). While try_send_cached_proofs does validate cached proofs against the current on-chain root via take_if_valid(&current_root), which provides some protection, explicitly clearing the proof cache here would be more defensive:

 } else if is_v2_error(&e, V2Error::is_hashchain_mismatch) {
     warn!(
         "State processing hit hashchain mismatch for tree {}: {}. Clearing cache and retrying.",
         tree_accounts.merkle_tree,
         e
     );
     proc.clear_cache().await;
+    // Also clear proof cache to avoid reusing proofs built against old hashchain
+    if let Some(proof_cache) = self.proof_caches.get(&tree_accounts.merkle_tree) {
+        proof_cache.clear().await;
+    }
     Ok(ProcessingResult::default())

The root validation in take_if_valid should catch most stale proofs, but an explicit clear removes any ambiguity.

Also applies to: 2138-2145

@coderabbitai bot (Contributor) left a comment

Actionable comments posted: 7

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (4)
forester/src/epoch_manager.rs (3)

191-214: Work reporting separation (channel vs on‑chain) is a good design choice.

Storing per‑epoch metrics in processing_metrics_per_epoch, then always emitting a WorkReport over work_report_sender while gating report_work_onchain on the phase window, cleanly decouples observability/testing from on‑chain side effects. The additional early exits when work is already reported avoid duplicate on‑chain ixs.

I’d only consider one refinement: on report_work_onchain failure paths where you map registry errors into WorkReportError, think about whether you also want to send a final WorkReport that flags the failure explicitly (e.g., via logs/metrics) so operators can correlate missed on‑chain reports with off‑chain processing.

Also applies to: 225-238, 242-279, 777-789, 2558-2655


461-583: Epoch monitoring and registration flow improvements look solid but remain timing‑sensitive.

The updated monitor_epochs / process_current_and_previous_epochs logic plus register_for_epoch_with_retry’s pre‑registration wait address the earlier concerns about skipping epochs and racing the registration window. The forward scan over target_epoch with a registration buffer and the explicit “already registered” check in process_current_and_previous_epochs both reduce spurious errors.

One nuance: in the forward‑scan loop you still rely on the original slot value for some decisions (slot < target_phases.registration.end), so in extreme cases the registration window could close between get_current_slot_and_epoch and that check, leading to a late attempt that will simply be rejected downstream. That’s not a correctness bug (the downstream path handles RegistrationPhaseEnded), but if you want to be extra defensive you could re‑read the current slot before sending and skip epochs whose registration has already closed.

Also applies to: 632-692, 838-846, 1067-1068


1430-1474: V2 light‑slot processing loop and cached‑proof sending are generally robust.

The V2 process_light_slot_v2 flow now:

  • Waits until the slot’s start.
  • Attempts to send cached proofs first (guarded by consecutive_eligibility_end and cache.is_warming()).
  • Uses a short poll interval and eligibility checks via check_forester_eligibility.
  • Delegates actual queue work to dispatch_tree_processing, then updates metrics and item counts.

This is a good structure: cached proofs are opportunistic but bounded by eligibility, and live processing reuses the same dispatch_tree_processing abstraction as v1. The metrics updates for both cached and live sends use real elapsed durations, so latency/throughput stats should be meaningful now.

One thing to keep in mind: try_send_cached_proofs only checks the eligibility window at the start; if a cached‑send chunk runs long you may technically cross consecutive_eligibility_end mid‑flight. Given Solana timing and your batching size this is probably acceptable, but if eligibility is extremely strict you could also recheck before each tx send in send_cached_proofs_as_transactions.

Also applies to: 1481-1545, 1600-1637

forester/src/config.rs (1)

85-115: GeneralConfig’s multi‑tree and queue polling mode support are well‑structured.

Switching from a single tree_id to tree_ids: Vec<Pubkey> in GeneralConfig, with:

  • StartArgs.tree_ids: Vec<String>
  • Validation in new_for_start that partitions valid vs. invalid IDs and fails fast on any invalid input
  • run_service filtering fetched trees against general_config.tree_ids

is a big improvement over the previous silent‑drop behavior for malformed keys. Likewise, carrying queue_polling_mode: QueuePollingMode into config and defaulting to Indexer for start and OnChain for status should make future selection of polling backends straightforward.

Nice touch keeping default/test constructors (default, test_address_v2, test_state_v2, new_for_status) all initializing tree_ids to empty and queue polling modes explicitly, so tests don’t rely on implicit defaults.

Also applies to: 118-150, 298-328, 409-420
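
A minimal sketch of the partition-and-fail-fast validation described above, using solana_sdk::pubkey::Pubkey; the function and error type are invented here, while the real entry point is ForesterConfig::new_for_start:

use std::str::FromStr;
use solana_sdk::pubkey::Pubkey;

#[derive(Debug)]
struct InvalidArguments(String);

// Parse all --tree-id values, collecting every invalid entry so the error
// message names them all instead of silently dropping malformed keys.
fn parse_tree_ids(raw: &[String]) -> Result<Vec<Pubkey>, InvalidArguments> {
    let (valid, invalid): (Vec<_>, Vec<_>) = raw
        .iter()
        .map(|s| (s.clone(), Pubkey::from_str(s)))
        .partition(|(_, res)| res.is_ok());

    if !invalid.is_empty() {
        let bad: Vec<String> = invalid.into_iter().map(|(s, _)| s).collect();
        return Err(InvalidArguments(format!("invalid tree ids: {}", bad.join(", "))));
    }
    Ok(valid.into_iter().map(|(_, res)| res.unwrap()).collect())
}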

♻️ Duplicate comments (5)
sdk-libs/program-test/src/program_test/rpc.rs (2)

343-343: Add documentation explaining the LiteSVM limitation.

While the unimplemented!() approach correctly reflects that this test harness cannot support versioned transactions, a doc comment would clarify why and guide users who need this functionality.

🔎 Proposed documentation
+    /// Note: LiteSVM does not support V0 message compilation or address lookup tables.
+    /// Tests requiring versioned transactions must use `LightClient` against a real RPC endpoint.
     async fn create_and_send_versioned_transaction<'a>(

350-352: Fix the formatting to satisfy the linter.

The multi-line formatting of unimplemented!() is causing pipeline failures. The linter requires this macro call to be on a single line.

🔎 Proposed fix
-        unimplemented!(
-            "create_and_send_versioned_transaction is unimplemented for LightProgramTest"
-        );
+        unimplemented!("create_and_send_versioned_transaction is unimplemented for LightProgramTest")
forester/src/processor/v2/processor.rs (1)

527-527: Unused _queue_type parameter remains

The _queue_type parameter is still unused. This was flagged in a previous review. If it's reserved for future queue-specific prewarm behavior, a brief comment would clarify intent. Otherwise, consider removing it.

forester/src/epoch_manager.rs (1)

1882-1943: V2 processor reuse and error handling are mostly correct; hashchain mismatch still leaves proof cache stale.

The new build_batch_context + get_or_create_state_processor / get_or_create_address_processor helpers are well‑structured:

  • Reuse QueueProcessor instances across epochs keyed by tree pubkey, updating the epoch/phases in place to preserve cached state.
  • Cache zkp_batch_size per tree for threshold filtering elsewhere.
  • Avoid holding DashMap guards across .await points by cloning the Arc before locking.

process_v2 correctly wires processors and proof caches per tree and distinguishes:

  • Constraint errors → drop processor + proof cache entry, forcing a clean rebuild.
  • Hashchain mismatches → clear the processor’s internal cache and treat this tick as a no‑op.
  • Other errors → log + retry on the next tick without tearing down state.

One remaining gap (called out previously) is that hashchain mismatch branches only call proc.clear_cache().await and do not clear the shared SharedProofCache entry for that tree. While try_send_cached_proofs now validates proofs against the current root and will skip stale chains, leaving them in the cache:

  • Wastes memory for proofs that are no longer usable.
  • Can lead to surprising behavior if future code ever relaxes the root check or adds different cache consumers.

Consider also clearing the shared proof cache on hashchain mismatch:

} else if is_v2_error(&e, V2Error::is_hashchain_mismatch) {
    warn!(/* existing log */);
    proc.clear_cache().await;
    self.proof_caches.remove(&tree_accounts.merkle_tree);
    Ok(ProcessingResult::default())
}

(or cache.clear().await if you prefer to keep the entry but drop contents).

This keeps the per‑tree state fully in sync after a detected inconsistency.

Also applies to: 1945-2047, 2050-2165
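
For reference, a minimal sketch of the per‑tree processor reuse pattern (illustrative only — QueueProcessorStub and the struct/field names are stand‑ins for the real epoch_manager types):

use std::sync::Arc;

use dashmap::DashMap;
use solana_sdk::pubkey::Pubkey;
use tokio::sync::Mutex;

struct QueueProcessorStub; // stand-in for the real per-tree QueueProcessor

struct ProcessorCacheSketch {
    processors: DashMap<Pubkey, Arc<Mutex<QueueProcessorStub>>>,
}

impl ProcessorCacheSketch {
    async fn process_tree(&self, tree: Pubkey) {
        // Clone the Arc out of the map inside a short scope so the DashMap
        // guard is dropped before the first .await.
        let proc = {
            let entry = self
                .processors
                .entry(tree)
                .or_insert_with(|| Arc::new(Mutex::new(QueueProcessorStub)));
            entry.value().clone()
        };

        let _proc = proc.lock().await; // safe: no map guard is held across this await
        // ... update epoch/phases in place and drive this tick ...
    }
}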

forester/README.md (1)

63-74: enable‑compressible is now documented; consider also exposing the env var.

The new --enable-compressible row correctly reflects the default (false) and the requirement for --ws-rpc-url, matching the CLI and config gating.

To fully address operator discoverability (and align with the earlier review), I’d recommend also mentioning the corresponding environment variable, e.g.:

| `--enable-compressible` | `FORESTER_ENABLE_COMPRESSIBLE` | Enable compressible account tracking and compression (requires `--ws-rpc-url`, default: false) |

either in the “Required Configuration” style table or a short note in the Environment Variables section.

Comment on lines +131 to +145
    #[arg(
        long,
        env = "FORESTER_CONFIRMATION_MAX_ATTEMPTS",
        default_value = "60",
        help = "Maximum attempts to confirm a transaction before timing out"
    )]
    pub confirmation_max_attempts: u32,

    #[arg(
        long,
        env = "FORESTER_CONFIRMATION_POLL_INTERVAL_MS",
        default_value = "500",
        help = "Interval between confirmation polling attempts in milliseconds"
    )]
    pub confirmation_poll_interval_ms: u64,
Copy link
Contributor


🧹 Nitpick | 🔵 Trivial

Confirmation tuning knobs look correct; consider documenting defaults.

The new confirmation_max_attempts / confirmation_poll_interval_ms flags are wired cleanly (types match config/BatchContext), and the defaults (60 attempts, 500ms) are reasonable. I’d suggest adding them to the README’s configuration table so operators understand how to tune confirmation latency vs. timeout for v2 sends.

🤖 Prompt for AI Agents
In forester/src/cli.rs around lines 131 to 145, add documentation for the two
new CLI flags to the README configuration table: include entries for
confirmation_max_attempts (default 60) and confirmation_poll_interval_ms
(default 500 ms), describe their meanings (max polling attempts before timeout
and poll interval in milliseconds) and note how to tune them to trade off
confirmation latency vs overall timeout; ensure the README shows the env var
names (FORESTER_CONFIRMATION_MAX_ATTEMPTS,
FORESTER_CONFIRMATION_POLL_INTERVAL_MS), types, and example values.
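
For context, a sketch of how the two knobs typically bound a confirmation loop (illustrative only — check_confirmed is a placeholder, not the forester sender API). With the defaults, 60 attempts × 500 ms gives roughly a 30 s ceiling:

use std::time::Duration;

use tokio::time::sleep;

// Illustrative confirmation loop driven by the two CLI knobs.
async fn wait_for_confirmation<F, Fut>(
    mut check_confirmed: F,
    confirmation_max_attempts: u32,
    confirmation_poll_interval_ms: u64,
) -> Result<(), String>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = bool>,
{
    for attempt in 1..=confirmation_max_attempts {
        if check_confirmed().await {
            return Ok(());
        }
        if attempt < confirmation_max_attempts {
            sleep(Duration::from_millis(confirmation_poll_interval_ms)).await;
        }
    }
    Err(format!(
        "transaction not confirmed after {} attempts",
        confirmation_max_attempts
    ))
}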

Comment on lines 247 to +276
     #[arg(
         long,
-        env = "FORESTER_TREE_ID",
-        help = "Process only the specified tree (Pubkey). If specified, forester will process only this tree and ignore all others"
+        env = "FORESTER_QUEUE_POLLING_MODE",
+        default_value_t = QueuePollingMode::Indexer,
+        help = "Queue polling mode: indexer (poll indexer API, requires indexer_url), onchain (read queue status directly from RPC)"
     )]
+    pub queue_polling_mode: QueuePollingMode,
+
+    #[arg(
+        long = "tree-id",
+        env = "FORESTER_TREE_IDS",
+        help = "Process only the specified trees (Pubkeys). Can be specified multiple times. If specified, forester will process only these trees and ignore all others",
+        value_delimiter = ','
+    )]
-    pub tree_id: Option<String>,
+    pub tree_ids: Vec<String>,

     #[arg(
         long,
         env = "FORESTER_ENABLE_COMPRESSIBLE",
-        help = "Enable compressible account tracking and compression using ws_rpc_url",
+        help = "Enable compressible account tracking and compression using ws_rpc_url (requires --ws-rpc-url)",
         default_value = "false"
     )]
     pub enable_compressible: bool,

+    #[arg(
+        long,
+        env = "FORESTER_LOOKUP_TABLE_ADDRESS",
+        help = "Address lookup table pubkey for versioned transactions. If not provided, legacy transactions will be used."
+    )]
+    pub lookup_table_address: Option<String>,
Copy link
Contributor


🧹 Nitpick | 🔵 Trivial

Tree selection and lookup table CLI surface is solid; double‑check UX around FORESTER_TREE_IDS.

The tree_ids argument + FORESTER_TREE_IDS env (multi‑use + comma‑delimited) combined with lookup_table_address gives a nice, explicit control surface. Given config now hard‑fails on invalid tree IDs, this is safer than the previous silent‑drop behavior.

One small UX suggestion: call out FORESTER_TREE_IDS and FORESTER_LOOKUP_TABLE_ADDRESS explicitly in the README so operators don’t have to infer env var names from the generic FORESTER_<OPTION> rule.

🤖 Prompt for AI Agents
In forester/src/cli.rs around lines 247 to 276: update the project README (or
operator docs) to explicitly document the FORESTER_TREE_IDS and
FORESTER_LOOKUP_TABLE_ADDRESS environment variables (rather than relying on the
generic FORESTER_<OPTION> rule); note that FORESTER_TREE_IDS accepts multiple
values either via repeated flags or a comma-delimited list, and that invalid
tree IDs now cause a hard-fail; also document FORESTER_LOOKUP_TABLE_ADDRESS
purpose (optional Address Lookup Table pubkey, falls back to legacy transactions
when unset) so operators can find these settings without inferring names.
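
A small clap sketch (toy struct, not the real StartArgs) showing that value_delimiter = ',' accepts both repeated --tree-id flags and a single comma‑delimited value:

use clap::Parser;

#[derive(Parser, Debug)]
struct ToyArgs {
    // Mirrors the flag shape described above; the FORESTER_TREE_IDS env var
    // behaves the same way when clap's `env` feature is enabled.
    #[arg(long = "tree-id", value_delimiter = ',')]
    tree_ids: Vec<String>,
}

fn main() {
    // Repeated flags:
    let a = ToyArgs::parse_from(["toy", "--tree-id", "Tree1", "--tree-id", "Tree2"]);
    assert_eq!(a.tree_ids, vec!["Tree1", "Tree2"]);

    // Comma-delimited, single flag — same result:
    let b = ToyArgs::parse_from(["toy", "--tree-id", "Tree1,Tree2"]);
    assert_eq!(b.tree_ids, vec!["Tree1", "Tree2"]);
}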

Comment on lines +355 to +365
/// Queue polling mode determines how the forester discovers pending queue items.
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, ValueEnum)]
pub enum QueuePollingMode {
    /// Poll the indexer API for queue status (requires indexer_url)
    #[clap(name = "indexer")]
    #[default]
    Indexer,
    /// Read queue status directly from on-chain accounts via RPC
    #[clap(name = "onchain")]
    OnChain,
}
Copy link
Contributor


🧹 Nitpick | 🔵 Trivial

QueuePollingMode enum is well‑designed; add minimal parsing tests.

The QueuePollingMode ValueEnum + Display impls are idiomatic and match the CLI help text (indexer / onchain) and GeneralConfig defaults. To prevent regressions, it’d be useful to add a small test (similar to test_processor_mode_parsing) that asserts:

  • --queue-polling-mode indexer → QueuePollingMode::Indexer
  • --queue-polling-mode onchain → QueuePollingMode::OnChain
  • Default (no flag) → Indexer
  • QueuePollingMode::Indexer.to_string() == "indexer" etc.

This catches any accidental renames or clap attribute changes.

Also applies to: 377-384

🤖 Prompt for AI Agents
In forester/src/cli.rs around lines 355-365 (and also apply same approach for
377-384), add a unit test that verifies parsing and Display for
QueuePollingMode: assert that parsing CLI args `--queue-polling-mode indexer`
yields QueuePollingMode::Indexer, `--queue-polling-mode onchain` yields
QueuePollingMode::OnChain, the default (no flag) yields
QueuePollingMode::Indexer, and that QueuePollingMode::Indexer.to_string() ==
"indexer" and QueuePollingMode::OnChain.to_string() == "onchain"; implement the
test similarly to the existing test_processor_mode_parsing by constructing the
CLI parser (or using clap::Command/try_get_matches_from or the crate's Parser)
and asserting the enum value and its string representation.
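
A sketch of such a test, wrapping the enum in a tiny parser instead of StartArgs to avoid its other required arguments (the Display assertions assume the impl referenced above):

#[cfg(test)]
mod queue_polling_mode_tests {
    use clap::Parser;

    use super::QueuePollingMode;

    // Minimal wrapper so only this flag is exercised; a real test could target
    // StartArgs directly, as test_processor_mode_parsing presumably does.
    #[derive(Parser)]
    struct TestCli {
        #[arg(long, value_enum, default_value_t = QueuePollingMode::Indexer)]
        queue_polling_mode: QueuePollingMode,
    }

    #[test]
    fn parses_queue_polling_mode() {
        assert_eq!(
            TestCli::parse_from(["t", "--queue-polling-mode", "indexer"]).queue_polling_mode,
            QueuePollingMode::Indexer
        );
        assert_eq!(
            TestCli::parse_from(["t", "--queue-polling-mode", "onchain"]).queue_polling_mode,
            QueuePollingMode::OnChain
        );
        // Default (no flag):
        assert_eq!(
            TestCli::parse_from(["t"]).queue_polling_mode,
            QueuePollingMode::Indexer
        );
        // Display output matches the clap names:
        assert_eq!(QueuePollingMode::Indexer.to_string(), "indexer");
        assert_eq!(QueuePollingMode::OnChain.to_string(), "onchain");
    }
}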

Comment on lines +61 to +63
    let client = reqwest::Client::builder()
        .timeout(std::time::Duration::from_secs(30))
        .build()?;
Copy link
Contributor


🧹 Nitpick | 🔵 Trivial

Consider making RPC timeout configurable.

The 30-second timeout may be too aggressive for high-latency networks or rate-limited RPC endpoints. Consider extracting this as a configuration parameter.

💡 Example: configurable timeout
pub async fn fetch_trees_filtered_with_timeout(
    rpc_url: &str, 
    timeout_secs: u64
) -> Result<Vec<TreeAccounts>> {
    let client = reqwest::Client::builder()
        .timeout(std::time::Duration::from_secs(timeout_secs))
        .build()?;
    // ... rest of implementation
}

Then in the main fetch_trees_filtered:

pub async fn fetch_trees_filtered(rpc_url: &str) -> Result<Vec<TreeAccounts>> {
    const DEFAULT_TIMEOUT_SECS: u64 = 30;
    fetch_trees_filtered_with_timeout(rpc_url, DEFAULT_TIMEOUT_SECS).await
}
🤖 Prompt for AI Agents
In forester/src/tree_data_sync.rs around lines 61 to 63, the reqwest client is
built with a hardcoded 30-second timeout which should be configurable; change
the API to accept a timeout parameter (or read from a config struct) and use
that value when constructing Client::builder().timeout(...), provide a
convenience wrapper that keeps the existing function signature by calling the
new function with a DEFAULT_TIMEOUT_SECS constant, and update all call sites to
pass a configurable timeout or use the wrapper so high-latency or rate-limited
RPC endpoints can increase the timeout without changing the code.

Comment on lines +82 to +92
    match batched_result {
        Ok(accounts) => {
            debug!("Fetched {} batched tree accounts", accounts.len());
            for (pubkey, mut account) in accounts {
                // Try state first, then address
                if let Ok(tree) = process_batch_state_account(&mut account, pubkey) {
                    all_trees.push(tree);
                } else if let Ok(tree) = process_batch_address_account(&mut account, pubkey) {
                    all_trees.push(tree);
                }
            }
Copy link
Contributor


⚠️ Potential issue | 🟠 Major

Silent deserialization failures may hide data corruption.

When processing batched trees, if both process_batch_state_account and process_batch_address_account fail, the account is silently skipped with no logging. This differs from the V1 processing paths (lines 105-108, 121-123) and could mask issues like:

  • Data corruption in on-chain accounts
  • Version mismatches between client and program
  • Incorrect discriminator filtering
🔎 Add debug logging for failed batch accounts
             for (pubkey, mut account) in accounts {
                 // Try state first, then address
                 if let Ok(tree) = process_batch_state_account(&mut account, pubkey) {
                     all_trees.push(tree);
                 } else if let Ok(tree) = process_batch_address_account(&mut account, pubkey) {
                     all_trees.push(tree);
+                } else {
+                    debug!(
+                        "Failed to deserialize batched tree account {} as either state or address",
+                        pubkey
+                    );
                 }
             }
🤖 Prompt for AI Agents
In forester/src/tree_data_sync.rs around lines 82 to 92, the batched-account
processing currently silently skips accounts when both
process_batch_state_account and process_batch_address_account fail; add a debug
(or warn) log when both deserializations fail that includes the account pubkey
and both error messages (or a combined error) so failures are visible; modify
the match arm to capture the Err results from each attempt and log a clear
message like "failed to deserialize batched account" with pubkey and error
details before continuing.

Comment on lines +158 to +161
            {
                "encoding": "base64",
                "commitment": "confirmed",
                "filters": [
Copy link
Contributor


🧹 Nitpick | 🔵 Trivial

Document rationale for "confirmed" commitment level.

The choice of "confirmed" over "finalized" trades finality guarantees for faster response times. Consider adding a comment explaining this trade-off, especially since foresters process on-chain state that could potentially be reorganized.

💡 Example comment
             {
                 "encoding": "base64",
+                // Use "confirmed" for faster responses; tree accounts are
+                // relatively static and reorg risk is acceptable for discovery
                 "commitment": "confirmed",
🤖 Prompt for AI Agents
In forester/src/tree_data_sync.rs around lines 158 to 161, the RPC subscription
uses the "confirmed" commitment but lacks any comment explaining the trade-off;
add a concise inline comment above this configuration stating that "confirmed"
was chosen to reduce latency at the expense of finality (risk of chain reorgs),
and list the mitigation/operational choices (e.g., re-checking or reprocessing
on "finalized" notifications, waiting additional confirmations, or accepting
occasional rollbacks) so future readers understand why confirmed is used and how
reorgs are handled or can be handled.

@sergeytimoshin sergeytimoshin merged commit 0c4e241 into main Dec 19, 2025
44 checks passed
@sergeytimoshin sergeytimoshin deleted the sergey/forester-r3 branch December 19, 2025 12:30