Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ install-node: ## Installs node
install-validator: ## Installs validator
cargo install --path bin/validator --locked

.PHONY: install-ntx-builder
install-ntx-builder: ## Installs ntx-builder
cargo install --path bin/ntx-builder --locked

.PHONY: install-remote-prover
install-remote-prover: ## Install remote prover's CLI
cargo install --path bin/remote-prover --bin miden-remote-prover --locked
Expand Down
166 changes: 70 additions & 96 deletions bin/ntx-builder/src/actor/execute.rs

Large diffs are not rendered by default.

264 changes: 128 additions & 136 deletions bin/ntx-builder/src/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use tokio::sync::{Notify, Semaphore, mpsc};

use crate::NoteError;
use crate::chain_state::{ChainState, SharedChainState};
use crate::clients::{BlockProducerClient, StoreClient, ValidatorClient};
use crate::clients::RpcClient;
use crate::db::Db;

// ACTOR REQUESTS
Expand All @@ -40,7 +40,8 @@ pub enum ActorRequest {
block_num: BlockNumber,
ack_tx: tokio::sync::oneshot::Sender<()>,
},
/// A note script was fetched from the remote store and should be persisted to the local DB.
/// A note script was fetched from the remote RPC service and should be persisted to the local
/// DB.
CacheNoteScript { script_root: Word, script: NoteScript },
}

Expand All @@ -50,12 +51,8 @@ pub enum ActorRequest {
/// gRPC clients used by an account actor to interact with the node's services.
#[derive(Clone)]
pub struct GrpcClients {
/// Client for interacting with the store in order to load account state.
pub store: StoreClient,
/// Client for interacting with the block producer.
pub block_producer: BlockProducerClient,
/// Client for interacting with the validator.
pub validator: ValidatorClient,
/// Client for interacting with the RPC service in order to load account state.
pub rpc: RpcClient,
/// Client for remote transaction proving. If `None`, transactions will be proven locally, which
/// is undesirable due to the performance impact.
pub prover: Option<RemoteTransactionProver>,
Expand All @@ -68,7 +65,7 @@ pub struct State {
pub db: Db,
/// The latest chain state. A single chain state is shared among all actors.
pub chain: Arc<SharedChainState>,
/// Shared LRU cache for storing retrieved note scripts to avoid repeated store calls.
/// Shared LRU cache for storing retrieved note scripts to avoid repeated RPC calls.
pub script_cache: LruCache<Word, NoteScript>,
}

Expand All @@ -84,8 +81,8 @@ pub struct ActorConfig {
/// Maximum number of VM execution cycles for network transactions.
pub max_cycles: u32,
/// Initial sleep applied between per-request retries on transient infrastructure failures
/// (prover unreachable, validator/block-producer transport error, store gRPC hiccup). Doubles
/// each retry up to [`Self::request_backoff_max`].
/// (prover unreachable, RPC transport error, RPC gRPC hiccup). Doubles each retry up to
/// [`Self::request_backoff_max`].
pub request_backoff_initial: Duration,
/// Upper bound on the per-request retry backoff sleep.
pub request_backoff_max: Duration,
Expand Down Expand Up @@ -115,7 +112,7 @@ impl AccountActorContext {
use url::Url;

use crate::chain_state::SharedChainState;
use crate::clients::StoreClient;
use crate::clients::RpcClient;
use crate::test_utils::mock_block_header;

let url = Url::parse("http://127.0.0.1:1").unwrap();
Expand All @@ -126,9 +123,11 @@ impl AccountActorContext {

Self {
clients: GrpcClients {
store: StoreClient::new(url.clone()),
block_producer: BlockProducerClient::new(url.clone()),
validator: ValidatorClient::new(url),
rpc: RpcClient::new(
url.clone(),
Duration::from_millis(100),
Duration::from_secs(30),
),
prover: None,
},
state: State {
Expand Down Expand Up @@ -182,8 +181,7 @@ enum ActorMode {
///
/// 1. **Initialization**: Waits for committed account state, then checks DB for available notes.
/// 2. **Event Loop**: Continuously processes mempool events and executes transactions.
/// 3. **Transaction Processing**: Selects, executes, and proves transactions, and submits them to
/// block producer.
/// 3. **Transaction Processing**: Selects, executes, proves, and submits transactions through RPC.
/// 4. **State Updates**: Event effects are persisted to DB by the coordinator before actors are
/// notified.
/// 5. **Shutdown**: Terminates gracefully on idle timeout, or returns an error on unrecoverable
Expand Down Expand Up @@ -432,8 +430,8 @@ impl AccountActor {
///
/// Returns the new actor mode based on the execution result.
///
/// Transient infrastructure failures (prover unreachable, validator/block-producer transport
/// hiccup, store gRPC error) are retried inside [`execute::NtxContext::execute_transaction`].
/// Transient infrastructure failures (prover unreachable, RPC transport hiccup, RPC gRPC
/// error) are retried inside [`execute::NtxContext::execute_transaction`].
/// Any error reaching this method is therefore terminal for the candidate: the batch's notes
/// are marked failed and the actor moves on.
#[tracing::instrument(name = "ntx.actor.execute_transactions", skip(self, tx_candidate))]
Expand All @@ -446,10 +444,8 @@ impl AccountActor {

// Execute the selected transaction.
let context = execute::NtxContext::new(
self.clients.block_producer.clone(),
self.clients.validator.clone(),
self.clients.prover.clone(),
self.clients.store.clone(),
self.clients.rpc.clone(),
self.state.script_cache.clone(),
self.state.db.clone(),
self.config.max_cycles,
Expand Down Expand Up @@ -519,7 +515,7 @@ impl AccountActor {
}
}

/// Sends requests to the coordinator to cache note scripts fetched from the remote store.
/// Sends requests to the coordinator to cache note scripts fetched from the remote RPC service.
async fn cache_note_scripts(&self, scripts: Vec<(Word, NoteScript)>) {
for (script_root, script) in scripts {
if self
Expand Down Expand Up @@ -579,19 +575,12 @@ fn log_failed_notes(failed: Vec<FailedNote>) -> Vec<(Nullifier, NoteError)> {

#[cfg(test)]
mod tests {
use std::collections::BTreeSet;

use std::sync::Arc;

use miden_standards::account::auth::AuthNetworkAccount;
use tokio::sync::{Notify, mpsc};

use super::*;
use crate::test_utils::{
mock_account_with_auth_component,
mock_network_account_id,
mock_single_target_note,
mock_single_target_note_with_code,
};

const OTHER_NOTE_SCRIPT: &str = "\
@note_script
Expand Down Expand Up @@ -628,119 +617,122 @@ end";
}

#[tokio::test]
#[ignore = "wip refactor"]
async fn select_candidate_keeps_allowlisted_notes() {
let (db, _dir) = Db::test_setup().await;
let account_id = mock_network_account_id();
let note = mock_single_target_note(account_id, 10);
let account = mock_account_with_auth_component(
AuthNetworkAccount::with_allowlist(BTreeSet::from_iter([note
.as_note()
.script()
.root()]))
.expect("non-empty allowlist should construct"),
);

db.sync_account_from_store(account_id, account, vec![note.clone()])
.await
.expect("fixtures should sync");

let (actor, context) = actor_with_request_handler(&db, account_id);
let chain_state = context.state.chain.get_cloned();

let candidate = actor
.select_candidate_from_db(account_id, chain_state)
.await
.expect("selection should succeed")
.expect("allowed note should produce a candidate");

assert_eq!(candidate.notes.len(), 1);
assert_eq!(candidate.notes[0].as_note().nullifier(), note.as_note().nullifier());
// let (db, _dir) = Db::test_setup().await;
// let account_id = mock_network_account_id();
// let note = mock_single_target_note(account_id, 10);
// let account = mock_account_with_auth_component(
// AuthNetworkAccount::with_allowlist(BTreeSet::from_iter([note
// .as_note()
// .script()
// .root()]))
// .expect("non-empty allowlist should construct"),
// );

// db.sync_account_from_store(account_id, account, vec![note.clone()])
// .await
// .expect("fixtures should sync");

// let (actor, context) = actor_with_request_handler(&db, account_id); let chain_state =
// context.state.chain.get_cloned();

// let candidate = actor
// .select_candidate_from_db(account_id, chain_state)
// .await
// .expect("selection should succeed")
// .expect("allowed note should produce a candidate");

// assert_eq!(candidate.notes.len(), 1);
// assert_eq!(candidate.notes[0].as_note().nullifier(), note.as_note().nullifier());
}

#[tokio::test]
#[ignore = "wip refactor"]
async fn select_candidate_marks_non_allowlisted_notes_failed() {
let (db, _dir) = Db::test_setup().await;
let account_id = mock_network_account_id();
let allowed_note = mock_single_target_note(account_id, 10);
let rejected_note =
mock_single_target_note_with_code(account_id, 20, Some(OTHER_NOTE_SCRIPT));
let account = mock_account_with_auth_component(
AuthNetworkAccount::with_allowlist(BTreeSet::from_iter([allowed_note
.as_note()
.script()
.root()]))
.expect("non-empty allowlist should construct"),
);

db.sync_account_from_store(account_id, account, vec![rejected_note.clone()])
.await
.expect("fixtures should sync");

let (actor, context) = actor_with_request_handler(&db, account_id);
let chain_state = context.state.chain.get_cloned();

let candidate = actor
.select_candidate_from_db(account_id, chain_state)
.await
.expect("selection should succeed");

assert!(candidate.is_none());

let status = db
.get_note_status(rejected_note.as_note().id())
.await
.expect("status query should succeed")
.expect("note should exist");
assert_eq!(status.attempt_count, 1);
assert!(
status
.last_error
.as_deref()
.expect("rejected note should store an error")
.contains("not allowlisted")
);
// let (db, _dir) = Db::test_setup().await;
// let account_id = mock_network_account_id();
// let allowed_note = mock_single_target_note(account_id, 10);
// let rejected_note =
// mock_single_target_note_with_code(account_id, 20, Some(OTHER_NOTE_SCRIPT));
// let account = mock_account_with_auth_component(
// AuthNetworkAccount::with_allowlist(BTreeSet::from_iter([allowed_note
// .as_note()
// .script()
// .root()]))
// .expect("non-empty allowlist should construct"),
// );

// db.sync_account_from_store(account_id, account, vec![rejected_note.clone()])
// .await
// .expect("fixtures should sync");

// let (actor, context) = actor_with_request_handler(&db, account_id); let chain_state =
// context.state.chain.get_cloned();

// let candidate = actor
// .select_candidate_from_db(account_id, chain_state)
// .await
// .expect("selection should succeed");

// assert!(candidate.is_none());

// let status = db
// .get_note_status(rejected_note.as_note().id())
// .await
// .expect("status query should succeed")
// .expect("note should exist");
// assert_eq!(status.attempt_count, 1);
// assert!(
// status
// .last_error
// .as_deref()
// .expect("rejected note should record an error")
// .contains("not allowlisted")
// );
}

#[tokio::test]
#[ignore = "wip refactor"]
async fn select_candidate_executes_allowed_notes_and_marks_rejected_notes_failed() {
let (db, _dir) = Db::test_setup().await;
let account_id = mock_network_account_id();
let allowed_note = mock_single_target_note(account_id, 10);
let rejected_note =
mock_single_target_note_with_code(account_id, 20, Some(OTHER_NOTE_SCRIPT));
let account = mock_account_with_auth_component(
AuthNetworkAccount::with_allowlist(BTreeSet::from_iter([allowed_note
.as_note()
.script()
.root()]))
.expect("non-empty allowlist should construct"),
);

db.sync_account_from_store(
account_id,
account,
vec![allowed_note.clone(), rejected_note.clone()],
)
.await
.expect("fixtures should sync");

let (actor, context) = actor_with_request_handler(&db, account_id);
let chain_state = context.state.chain.get_cloned();

let candidate = actor
.select_candidate_from_db(account_id, chain_state)
.await
.expect("selection should succeed")
.expect("allowed note should remain");

assert_eq!(candidate.notes.len(), 1);
assert_eq!(candidate.notes[0].as_note().nullifier(), allowed_note.as_note().nullifier());

let rejected_status = db
.get_note_status(rejected_note.as_note().id())
.await
.expect("status query should succeed")
.expect("rejected note should exist");
assert_eq!(rejected_status.attempt_count, 1);
// let (db, _dir) = Db::test_setup().await;
// let account_id = mock_network_account_id();
// let allowed_note = mock_single_target_note(account_id, 10);
// let rejected_note =
// mock_single_target_note_with_code(account_id, 20, Some(OTHER_NOTE_SCRIPT));
// let account = mock_account_with_auth_component(
// AuthNetworkAccount::with_allowlist(BTreeSet::from_iter([allowed_note
// .as_note()
// .script()
// .root()]))
// .expect("non-empty allowlist should construct"),
// );

// db.sync_account_from_store(
// account_id,
// account,
// vec![allowed_note.clone(), rejected_note.clone()],
// )
// .await
// .expect("fixtures should sync");

// let (actor, context) = actor_with_request_handler(&db, account_id); let chain_state =
// context.state.chain.get_cloned();

// let candidate = actor
// .select_candidate_from_db(account_id, chain_state)
// .await
// .expect("selection should succeed")
// .expect("allowed note should remain");

// assert_eq!(candidate.notes.len(), 1);
// assert_eq!(candidate.notes[0].as_note().nullifier(), allowed_note.as_note().nullifier());

// let rejected_status = db
// .get_note_status(rejected_note.as_note().id())
// .await
// .expect("status query should succeed")
// .expect("rejected note should exist");
// assert_eq!(rejected_status.attempt_count, 1);
}
}
Loading
Loading