Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ miden-node-store = { path = "crates/store", version = "0.15" }
miden-node-test-macro = { path = "crates/test-macro" }
miden-node-utils = { path = "crates/utils", version = "0.15" }
miden-remote-prover-client = { path = "crates/remote-prover-client", version = "0.15" }
miden-validator = { path = "bin/validator", version = "0.15" }
# Temporary workaround until <https://github.com/rust-rocksdb/rust-rocksdb/pull/1029>
# is part of `rocksdb-rust` release
miden-node-rocksdb-cxx-linkage-fix = { path = "crates/rocksdb-cxx-linkage-fix", version = "0.15" }
Expand Down
27 changes: 13 additions & 14 deletions bin/stress-test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,16 @@ version.workspace = true
workspace = true

[dependencies]
clap = { features = ["derive"], workspace = true }
fs-err = { workspace = true }
futures = { workspace = true }
miden-node-block-producer = { workspace = true }
miden-node-proto = { workspace = true }
miden-node-store = { workspace = true }
miden-node-utils = { workspace = true }
miden-protocol = { workspace = true }
miden-standards = { workspace = true }
rand = { workspace = true }
rayon = { workspace = true }
tokio = { workspace = true }
tonic = { default-features = true, workspace = true }
url = { workspace = true }
clap = { features = ["derive"], workspace = true }
fs-err = { workspace = true }
futures = { workspace = true }
miden-node-proto = { workspace = true }
miden-node-store = { workspace = true }
miden-node-utils = { workspace = true }
miden-protocol = { workspace = true }
miden-standards = { workspace = true }
rand = { workspace = true }
rayon = { workspace = true }
tokio = { workspace = true }
tonic = { default-features = true, workspace = true }
url = { workspace = true }
4 changes: 2 additions & 2 deletions bin/stress-test/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,14 @@ async fn main() {
vault_entries,
account_update_blocks,
} => {
seed_store(
Box::pin(seed_store(
data_directory,
num_accounts,
public_accounts_percentage,
storage_map_entries,
vault_entries,
account_update_blocks,
)
))
.await;
},
Command::BenchmarkStore {
Expand Down
88 changes: 46 additions & 42 deletions bin/stress-test/src/seeding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ use std::sync::{Arc, Mutex};
use std::time::Instant;

use metrics::SeedingMetrics;
use miden_node_block_producer::store::StoreClient;
use miden_node_proto::domain::batch::BatchInputs;
use miden_node_proto::generated::store::rpc_client::RpcClient;
use miden_node_store::state::State;
use miden_node_store::{DataDirectory, GenesisState, Store, StoreMode};
use miden_node_utils::clap::{GrpcOptionsInternal, StorageOptions};
use miden_node_utils::tracing::grpc::OtelInterceptor;
Expand Down Expand Up @@ -131,9 +131,7 @@ pub async fn seed_store(
.expect("genesis block should be created");
Store::bootstrap(genesis_block, &data_directory).expect("store should bootstrap");

// start the store
let (_, store_url) = start_store(data_directory.clone()).await;
let store_client = StoreClient::new(store_url);
let store_state = load_state(data_directory.clone()).await;

// start generating blocks
let accounts_filepath = data_directory.join(ACCOUNTS_FILENAME);
Expand All @@ -145,7 +143,7 @@ pub async fn seed_store(
public_accounts_percentage,
faucet,
genesis_header,
&store_client,
&store_state,
data_directory,
accounts_filepath,
&signer,
Expand All @@ -171,7 +169,7 @@ async fn generate_blocks(
public_accounts_percentage: u8,
mut faucet: Account,
genesis_block: SignedBlock,
store_client: &StoreClient,
store_state: &Arc<State>,
data_directory: DataDirectory,
accounts_filepath: PathBuf,
signer: &EcdsaSecretKey,
Expand Down Expand Up @@ -260,11 +258,11 @@ async fn generate_blocks(
.collect();

// create the block and send it to the store
let block_inputs = get_block_inputs(store_client, &batches, &mut metrics).await;
let block_inputs = get_block_inputs(store_state, &batches, &mut metrics).await;

// update blocks
prev_block_header =
apply_block(batches, block_inputs, store_client, &mut metrics, signer).await;
apply_block(batches, block_inputs, store_state, &mut metrics, signer).await;
account_states
.extend(pending_consumed_accounts.into_iter().map(|account| (account.id(), account)));
if current_anchor_header.block_epoch() != prev_block_header.block_epoch() {
Expand All @@ -273,7 +271,7 @@ async fn generate_blocks(

// create the consume notes txs to be used in the next block
let batch_inputs =
get_batch_inputs(store_client, &prev_block_header, &notes, &mut metrics).await;
get_batch_inputs(store_state, &prev_block_header, &notes, &mut metrics).await;
(pending_consumed_accounts, consume_notes_txs) = create_consume_note_txs(
&prev_block_header,
accounts,
Expand Down Expand Up @@ -317,18 +315,18 @@ async fn generate_blocks(
.map(|txs| create_batch(txs, &prev_block_header))
.collect();

let block_inputs = get_block_inputs(store_client, &batches, &mut metrics).await;
let block_inputs = get_block_inputs(store_state, &batches, &mut metrics).await;

prev_block_header =
apply_block(batches, block_inputs, store_client, &mut metrics, signer).await;
apply_block(batches, block_inputs, store_state, &mut metrics, signer).await;
account_states
.extend(pending_consumed_accounts.into_iter().map(|account| (account.id(), account)));
if current_anchor_header.block_epoch() != prev_block_header.block_epoch() {
current_anchor_header = prev_block_header.clone();
}

let batch_inputs =
get_batch_inputs(store_client, &prev_block_header, &notes, &mut metrics).await;
get_batch_inputs(store_state, &prev_block_header, &notes, &mut metrics).await;
let accounts = selected_account_ids
.iter()
.filter_map(|account_id| account_states.get(account_id).cloned())
Expand Down Expand Up @@ -358,30 +356,33 @@ async fn generate_blocks(
metrics
}

/// Given a list of batches and block inputs, creates a `ProvenBlock` and sends it to the store.
/// Given a list of batches and block inputs, creates a `ProvenBlock` and applies it to the store.
/// Tracks the insertion time on the metrics.
///
/// Returns the the inserted block.
async fn apply_block(
batches: Vec<ProvenBatch>,
block_inputs: BlockInputs,
store_client: &StoreClient,
store_state: &Arc<State>,
metrics: &mut SeedingMetrics,
signer: &EcdsaSecretKey,
) -> BlockHeader {
let proposed_block = ProposedBlock::new(block_inputs, batches).unwrap();
let proposed_block = ProposedBlock::new(block_inputs.clone(), batches).unwrap();
let (header, body) = proposed_block.clone().into_header_and_body().unwrap();
let block_size: usize = header.to_bytes().len() + body.to_bytes().len();
let signature = signer.sign(header.commitment());
// SAFETY: The header, body, and signature are known to correspond to each other.
let signed_block = SignedBlock::new_unchecked(header, body, signature);
let header = signed_block.header().clone();
let ordered_batches = proposed_block.batches().clone();

let start = Instant::now();
store_client.apply_block(&ordered_batches, &signed_block).await.unwrap();
store_state
.apply_block_with_proving_inputs(ordered_batches, block_inputs, signed_block)
.await
.unwrap();
metrics.track_block_insertion(start.elapsed(), block_size);

let (header, ..) = signed_block.into_parts();
header
}

Expand Down Expand Up @@ -792,18 +793,18 @@ fn create_emit_note_tx(

/// Gets the batch inputs from the store and tracks the query time on the metrics.
async fn get_batch_inputs(
store_client: &StoreClient,
store_state: &Arc<State>,
block_ref: &BlockHeader,
notes: &[Note],
metrics: &mut SeedingMetrics,
) -> BatchInputs {
let start = Instant::now();
// Mark every note as unauthenticated, so that the store returns the inclusion proofs for all of
// them
let batch_inputs = store_client
let batch_inputs = store_state
.get_batch_inputs(
vec![(block_ref.block_num(), block_ref.commitment())].into_iter(),
notes.iter().map(Note::commitment),
[block_ref.block_num()].into_iter().collect(),
notes.iter().map(Note::commitment).collect(),
)
.await
.unwrap();
Expand All @@ -813,22 +814,25 @@ async fn get_batch_inputs(

/// Gets the block inputs from the store and tracks the query time on the metrics.
async fn get_block_inputs(
store_client: &StoreClient,
store_state: &Arc<State>,
batches: &[ProvenBatch],
metrics: &mut SeedingMetrics,
) -> BlockInputs {
let start = Instant::now();
let inputs = store_client
let inputs = store_state
.get_block_inputs(
batches.iter().flat_map(ProvenBatch::updated_accounts),
batches.iter().flat_map(ProvenBatch::created_nullifiers),
batches.iter().flat_map(|batch| {
batch
.input_notes()
.into_iter()
.filter_map(|note| note.header().map(NoteHeader::to_commitment))
}),
batches.iter().map(ProvenBatch::reference_block_num),
batches.iter().flat_map(ProvenBatch::updated_accounts).collect(),
batches.iter().flat_map(ProvenBatch::created_nullifiers).collect(),
batches
.iter()
.flat_map(|batch| {
batch
.input_notes()
.into_iter()
.filter_map(|note| note.header().map(NoteHeader::to_commitment))
})
.collect(),
batches.iter().map(ProvenBatch::reference_block_num).collect(),
)
.await
.unwrap();
Expand All @@ -848,20 +852,13 @@ pub async fn start_store(
let rpc_listener = TcpListener::bind("127.0.0.1:0")
.await
.expect("Failed to bind store RPC gRPC endpoint");
let block_producer_listener = TcpListener::bind("127.0.0.1:0")
.await
.expect("Failed to bind store block-producer gRPC endpoint");
let store_addr = rpc_listener.local_addr().expect("Failed to get store RPC address");
let store_block_producer_addr = block_producer_listener
.local_addr()
.expect("Failed to get store block-producer address");
let dir = data_directory.clone();

task::spawn(async move {
Store {
rpc_listener,
mode: StoreMode::BlockProducer {
block_producer_listener,
mode: StoreMode::Sequencer {
block_prover_url: None,
max_concurrent_proofs: miden_node_store::DEFAULT_MAX_CONCURRENT_PROOFS,
},
Expand All @@ -881,7 +878,14 @@ pub async fn start_store(
.await
.expect("Failed to connect to store");

// SAFETY: The store_block_producer_addr is always valid as it is created from a `SocketAddr`.
let store_url = Url::parse(&format!("http://{store_block_producer_addr}")).unwrap();
let store_url = Url::parse(&format!("http://{store_addr}")).unwrap();
(RpcClient::with_interceptor(channel, OtelInterceptor), store_url)
}

async fn load_state(data_directory: PathBuf) -> Arc<State> {
let (termination_ask, _termination_signal) = tokio::sync::mpsc::channel(1);
let (state, _) = State::load(&data_directory, StorageOptions::bench(), termination_ask)
.await
.expect("store state should load");
Arc::new(state)
}
3 changes: 1 addition & 2 deletions crates/block-producer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ anyhow = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
miden-node-proto = { workspace = true }
miden-node-store = { workspace = true }
miden-node-utils = { features = ["testing"], workspace = true }
miden-protocol = { default-features = true, workspace = true }
miden-remote-prover-client = { features = ["batch-prover", "block-prover"], workspace = true }
Expand All @@ -39,11 +40,9 @@ url = { workspace = true }

[dev-dependencies]
assert_matches = { workspace = true }
miden-node-store = { workspace = true }
miden-node-utils = { features = ["testing"], workspace = true }
miden-protocol = { default-features = true, features = ["testing"], workspace = true }
miden-standards = { features = ["testing"], workspace = true }
miden-validator = { workspace = true }
pretty_assertions = { workspace = true }
rand_chacha = { default-features = false, workspace = true }
serial_test = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion crates/block-producer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Contains code defining the [Miden node's block-producer](/README.md#architecture
ordering transactions into blocks and submitting these for inclusion in the blockchain.

It exposes an in-process API which the node's RPC component uses to submit new transactions. In turn, the
`block-producer` uses the store's gRPC API to submit blocks and query chain state.
`block-producer` uses the store's in-process state to submit blocks and query chain state.

For more information on the installation and operation of this component, please see the [node's readme](../../README.md).

Expand Down
16 changes: 10 additions & 6 deletions crates/block-producer/src/batch_builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::time::Duration;

use futures::TryFutureExt;
use miden_node_proto::domain::batch::BatchInputs;
use miden_node_store::state::State;
use miden_node_utils::spawn::spawn_blocking_in_current_span;
use miden_node_utils::tracing::OpenTelemetrySpanExt;
use miden_protocol::MIN_PROOF_SECURITY_LEVEL;
Expand All @@ -19,9 +20,8 @@ use url::Url;

use crate::domain::batch::SelectedBatch;
use crate::domain::transaction::AuthenticatedTransaction;
use crate::errors::BuildBatchError;
use crate::errors::{BuildBatchError, StoreError};
use crate::mempool::SharedMempool;
use crate::store::StoreClient;
use crate::{COMPONENT, TelemetryInjectorExt};

// BATCH BUILDER
Expand Down Expand Up @@ -49,7 +49,7 @@ pub struct BatchBuilder {
///
/// Note: this _must_ be sign positive and less than 1.0.
failure_rate: f64,
store: StoreClient,
store: Arc<State>,
}

impl BatchBuilder {
Expand All @@ -58,7 +58,7 @@ impl BatchBuilder {
///
/// If no batch prover URL is provided, a local batch prover is used instead.
pub fn new(
store: StoreClient,
store: Arc<State>,
num_workers: NonZeroUsize,
batch_prover_url: Option<Url>,
batch_interval: Duration,
Expand Down Expand Up @@ -161,7 +161,7 @@ struct BatchJob {
///
/// Note: this _must_ be sign positive and less than 1.0.
failure_rate: f64,
store: StoreClient,
store: Arc<State>,
batch_prover: BatchProver,
mempool: SharedMempool,
}
Expand Down Expand Up @@ -224,8 +224,12 @@ impl BatchJob {
.flat_map(AuthenticatedTransaction::unauthenticated_note_commitments);

self.store
.get_batch_inputs(block_references, unauthenticated_notes)
.get_batch_inputs(
block_references.map(|(block_num, _)| block_num).collect(),
unauthenticated_notes.collect(),
)
.await
.map_err(StoreError::GetBatchInputsFailed)
.map_err(BuildBatchError::FetchBatchInputsFailed)
.map(|inputs| (batch, inputs))
}
Expand Down
Loading
Loading