Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 14 additions & 120 deletions bin/ntx-builder/src/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use std::time::Duration;
use allowlist::{NoteScriptNotAllowlisted, partition_by_allowlist};
use anyhow::Context;
use candidate::TransactionCandidate;
use futures::FutureExt;
use miden_node_utils::ErrorReport;
use miden_node_utils::lru_cache::LruCache;
use miden_protocol::Word;
Expand Down Expand Up @@ -231,93 +230,29 @@ impl AccountActor {
///
/// The return value signals the shutdown category to the coordinator:
///
/// - `Ok(())`: intentional shutdown (idle timeout or account removal).
/// - `Err(_)`: crash (database error, semaphore failure, or any other bug).
pub async fn run(self, semaphore: Arc<Semaphore>) -> anyhow::Result<()> {
/// - `Ok(())`: intentional shutdown (idle timeout or account not committed in time).
/// - `Err(_)`: crash (database error or any other bug).
pub async fn run(self, _semaphore: Arc<Semaphore>) -> anyhow::Result<()> {
let account_id = self.account_id;

// Wait for the account to be committed to the DB. For newly created accounts, the creation
// transaction must be committed before we start processing notes.
// transaction must be committed before the actor becomes active.
if !self.wait_for_committed_account(account_id).await? {
return Ok(());
}

// Determine initial mode by checking DB for available notes.
let block_num = self.state.chain.chain_tip_block_number();
let has_notes = self
.state
.db
.has_available_notes(account_id, block_num, self.config.max_note_attempts)
.await
.context("failed to check for available notes")?;

let mut mode = if has_notes {
ActorMode::NotesAvailable
} else {
ActorMode::NoViableNotes
};

loop {
// Enable or disable transaction execution based on actor mode.
let tx_permit_acquisition = match mode {
// Disable transaction execution.
ActorMode::NoViableNotes | ActorMode::TransactionInflight(_) => {
std::future::pending().boxed()
},
// Enable transaction execution.
ActorMode::NotesAvailable => semaphore.acquire().boxed(),
};

// Idle timeout timer: only ticks when in NoViableNotes mode. Mode changes cause the
// next loop iteration to create a fresh sleep or pending.
let idle_timeout_sleep = match mode {
ActorMode::NoViableNotes => tokio::time::sleep(self.config.idle_timeout).boxed(),
_ => std::future::pending().boxed(),
};

tokio::select! {
// Handle coordinator notifications. On notification, re-evaluate state from DB.
// A committed block touched this account (or the coordinator woke everyone). PR 3
// reconnects transaction execution here.
_ = self.notify.notified() => {
match mode {
ActorMode::TransactionInflight(awaited_id) => {
// Check DB: is the inflight tx still pending?
let exists = self
.state
.db
.transaction_exists(awaited_id)
.await
.context("failed to check transaction status")?;
if exists {
mode = ActorMode::NotesAvailable;
}
},
_ => {
mode = ActorMode::NotesAvailable;
}
}
},
// Execute transactions.
permit = tx_permit_acquisition => {
let _permit = permit.context("semaphore closed")?;

// Read the chain state.
let chain_state = self.state.chain.get_cloned();

// Query DB for latest account and available notes.
let tx_candidate = self.select_candidate_from_db(
account_id,
chain_state,
).await?;

if let Some(tx_candidate) = tx_candidate {
mode = self.execute_transactions(account_id, tx_candidate).await;
} else {
// No transactions to execute, wait for notifications.
mode = ActorMode::NoViableNotes;
}
tracing::debug!(
%account_id,
"actor notified; transaction execution reconnects in PR 3",
);
}
// Idle timeout: actor has been idle too long, deactivate account.
_ = idle_timeout_sleep => {
// Idle timeout: actor has been idle too long, deactivate.
() = tokio::time::sleep(self.config.idle_timeout) => {
tracing::info!(%account_id, "Account actor deactivated due to idle timeout");
return Ok(());
}
Expand Down Expand Up @@ -385,8 +320,8 @@ impl AccountActor {
/// For accounts that are being created by an inflight transaction, this will idle
/// until the transaction is committed. Returns `true` when the account is ready, or
/// `false` if no commit arrived within [`ActorConfig::idle_timeout`] — in which case
/// the coordinator will respawn a new actor when committed-chain processing or the account
/// loader observes the account again.
/// the coordinator will respawn a new actor when a later committed block targets the
/// account again.
async fn wait_for_committed_account(&self, account_id: AccountId) -> anyhow::Result<bool> {
// Check if the account is already committed.
if self
Expand Down Expand Up @@ -573,47 +508,6 @@ fn log_failed_notes(failed: Vec<FailedNote>) -> Vec<(Nullifier, NoteError)> {

#[cfg(test)]
mod tests {

use std::sync::Arc;

use tokio::sync::{Notify, mpsc};

use super::*;

const OTHER_NOTE_SCRIPT: &str = "\
@note_script
pub proc main
push.1 drop
end";

async fn ack_actor_requests(mut rx: mpsc::Receiver<ActorRequest>, db: Db) {
while let Some(request) = rx.recv().await {
match request {
ActorRequest::NotesFailed { failed_notes, block_num, ack_tx } => {
db.notes_failed(failed_notes, block_num)
.await
.expect("test DB write should succeed");
let _ = ack_tx.send(());
},
ActorRequest::CacheNoteScript { .. } => {},
}
}
}

fn actor_with_request_handler(
db: &Db,
account_id: AccountId,
) -> (AccountActor, AccountActorContext) {
let (request_tx, request_rx) = mpsc::channel(8);
let mut context = AccountActorContext::test(db);
context.request_tx = request_tx;
tokio::spawn(ack_actor_requests(request_rx, db.clone()));

let actor = AccountActor::new(account_id, &context, Arc::new(Notify::new()));

(actor, context)
}

#[tokio::test]
#[ignore = "wip refactor"]
async fn select_candidate_keeps_allowlisted_notes() {
Expand Down
Loading
Loading