Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion apps/evm/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ var RunCmd = &cobra.Command{
}

// Create sequencer based on configuration
sequencer, err := createSequencer(logger, datastore, nodeConfig, genesis, daClient)
sequencer, err := createSequencer(logger, datastore, nodeConfig, genesis, daClient, executor)
if err != nil {
return err
}
Expand Down Expand Up @@ -160,6 +160,7 @@ func createSequencer(
nodeConfig config.Config,
genesis genesis.Genesis,
daClient block.FullDAClient,
executor execution.Executor,
) (coresequencer.Sequencer, error) {
if nodeConfig.Node.BasedSequencer {
// Based sequencer mode - fetch transactions only from DA
Expand Down Expand Up @@ -193,6 +194,9 @@ func createSequencer(
return nil, fmt.Errorf("failed to create single sequencer: %w", err)
}

// Configure executor for DA transaction gas-based filtering
sequencer.SetExecutor(executor)

logger.Info().
Str("forced_inclusion_namespace", nodeConfig.DA.GetForcedInclusionNamespace()).
Msg("single sequencer initialized")
Expand Down
6 changes: 3 additions & 3 deletions apps/testapp/kv/http_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func TestHandleKV_Get(t *testing.T) {
// Create and execute the transaction directly
tx := []byte(fmt.Sprintf("%s=%s", tt.key, tt.value))
ctx := context.Background()
_, _, err := exec.ExecuteTxs(ctx, [][]byte{tx}, 1, time.Now(), []byte(""))
_, err := exec.ExecuteTxs(ctx, [][]byte{tx}, 1, time.Now(), []byte(""))
if err != nil {
t.Fatalf("Failed to execute setup transaction: %v", err)
}
Expand Down Expand Up @@ -287,13 +287,13 @@ func TestHTTPIntegration_GetKVWithMultipleHeights(t *testing.T) {

// Execute transactions at different heights for the same key
txsHeight1 := [][]byte{[]byte("testkey=original_value")}
_, _, err = exec.ExecuteTxs(ctx, txsHeight1, 1, time.Now(), []byte(""))
_, err = exec.ExecuteTxs(ctx, txsHeight1, 1, time.Now(), []byte(""))
if err != nil {
t.Fatalf("ExecuteTxs failed for height 1: %v", err)
}

txsHeight2 := [][]byte{[]byte("testkey=updated_value")}
_, _, err = exec.ExecuteTxs(ctx, txsHeight2, 2, time.Now(), []byte(""))
_, err = exec.ExecuteTxs(ctx, txsHeight2, 2, time.Now(), []byte(""))
if err != nil {
t.Fatalf("ExecuteTxs failed for height 2: %v", err)
}
Expand Down
64 changes: 46 additions & 18 deletions apps/testapp/kv/kvexecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strings"
"time"

"github.com/evstack/ev-node/core/execution"
"github.com/evstack/ev-node/pkg/store"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
Expand Down Expand Up @@ -141,52 +142,52 @@ func (k *KVExecutor) computeStateRoot(ctx context.Context) ([]byte, error) {
// InitChain initializes the chain state with genesis parameters.
// It checks the database to see if genesis was already performed.
// If not, it computes the state root from the current DB state and persists genesis info.
func (k *KVExecutor) InitChain(ctx context.Context, genesisTime time.Time, initialHeight uint64, chainID string) ([]byte, uint64, error) {
func (k *KVExecutor) InitChain(ctx context.Context, genesisTime time.Time, initialHeight uint64, chainID string) ([]byte, error) {
select {
case <-ctx.Done():
return nil, 0, ctx.Err()
return nil, ctx.Err()
default:
}

initialized, err := k.db.Has(ctx, genesisInitializedKey)
if err != nil {
return nil, 0, fmt.Errorf("failed to check genesis initialization status: %w", err)
return nil, fmt.Errorf("failed to check genesis initialization status: %w", err)
}

if initialized {
genesisRoot, err := k.db.Get(ctx, genesisStateRootKey)
if err != nil {
return nil, 0, fmt.Errorf("genesis initialized but failed to retrieve state root: %w", err)
return nil, fmt.Errorf("genesis initialized but failed to retrieve state root: %w", err)
}
return genesisRoot, 1024, nil // Assuming 1024 is a constant gas value
return genesisRoot, nil
}

// Genesis not initialized. Compute state root from the current DB state.
// Note: The DB might not be empty if restarting, this reflects the state *at genesis time*.
stateRoot, err := k.computeStateRoot(ctx)
if err != nil {
return nil, 0, fmt.Errorf("failed to compute initial state root for genesis: %w", err)
return nil, fmt.Errorf("failed to compute initial state root for genesis: %w", err)
}

// Persist genesis state root and initialized flag
batch, err := k.db.Batch(ctx)
if err != nil {
return nil, 0, fmt.Errorf("failed to create batch for genesis persistence: %w", err)
return nil, fmt.Errorf("failed to create batch for genesis persistence: %w", err)
}
err = batch.Put(ctx, genesisStateRootKey, stateRoot)
if err != nil {
return nil, 0, fmt.Errorf("failed to put genesis state root in batch: %w", err)
return nil, fmt.Errorf("failed to put genesis state root in batch: %w", err)
}
err = batch.Put(ctx, genesisInitializedKey, []byte("true")) // Store a marker value
if err != nil {
return nil, 0, fmt.Errorf("failed to put genesis initialized flag in batch: %w", err)
return nil, fmt.Errorf("failed to put genesis initialized flag in batch: %w", err)
}
err = batch.Commit(ctx)
if err != nil {
return nil, 0, fmt.Errorf("failed to commit genesis persistence batch: %w", err)
return nil, fmt.Errorf("failed to commit genesis persistence batch: %w", err)
}

return stateRoot, 1024, nil // Assuming 1024 is a constant gas value
return stateRoot, nil
}

// GetTxs retrieves available transactions from the mempool channel.
Expand Down Expand Up @@ -222,16 +223,16 @@ func (k *KVExecutor) GetTxs(ctx context.Context) ([][]byte, error) {
// ExecuteTxs processes each transaction assumed to be in the format "key=value".
// It updates the database accordingly using a batch and removes the executed transactions from the mempool.
// Invalid transactions are filtered out and logged, but execution continues.
func (k *KVExecutor) ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight uint64, timestamp time.Time, prevStateRoot []byte) ([]byte, uint64, error) {
func (k *KVExecutor) ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight uint64, timestamp time.Time, prevStateRoot []byte) ([]byte, error) {
select {
case <-ctx.Done():
return nil, 0, ctx.Err()
return nil, ctx.Err()
default:
}

batch, err := k.db.Batch(ctx)
if err != nil {
return nil, 0, fmt.Errorf("failed to create database batch: %w", err)
return nil, fmt.Errorf("failed to create database batch: %w", err)
}

validTxCount := 0
Expand Down Expand Up @@ -274,7 +275,7 @@ func (k *KVExecutor) ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight u
err = batch.Put(ctx, dsKey, []byte(value))
if err != nil {
// This error is unlikely for Put unless the context is cancelled.
return nil, 0, fmt.Errorf("failed to stage put operation in batch for key '%s': %w", key, err)
return nil, fmt.Errorf("failed to stage put operation in batch for key '%s': %w", key, err)
}
validTxCount++
}
Expand All @@ -287,18 +288,18 @@ func (k *KVExecutor) ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight u
// Commit the batch to apply all changes atomically
err = batch.Commit(ctx)
if err != nil {
return nil, 0, fmt.Errorf("failed to commit transaction batch: %w", err)
return nil, fmt.Errorf("failed to commit transaction batch: %w", err)
}

// Compute the new state root *after* successful commit
stateRoot, err := k.computeStateRoot(ctx)
if err != nil {
// This is problematic, state was changed but root calculation failed.
// May need more robust error handling or recovery logic.
return nil, 0, fmt.Errorf("failed to compute state root after executing transactions: %w", err)
return nil, fmt.Errorf("failed to compute state root after executing transactions: %w", err)
}

return stateRoot, 1024, nil
return stateRoot, nil
}

// SetFinal marks a block as finalized at the specified height.
Expand Down Expand Up @@ -422,3 +423,30 @@ func (k *KVExecutor) Rollback(ctx context.Context, height uint64) error {
func getTxKey(height uint64, txKey string) ds.Key {
return heightKeyPrefix.Child(ds.NewKey(fmt.Sprintf("%d/%s", height, txKey)))
}

// GetExecutionInfo returns execution layer parameters.
// For KVExecutor, returns MaxGas=0 indicating no gas-based filtering.
func (k *KVExecutor) GetExecutionInfo(ctx context.Context, height uint64) (execution.ExecutionInfo, error) {
return execution.ExecutionInfo{MaxGas: 0}, nil
}

// FilterDATransactions validates and filters force-included transactions from DA.
// For KVExecutor, all transactions are considered valid (no gas-based filtering).
// Invalid transactions (not in key=value format) are filtered out.
func (k *KVExecutor) FilterDATransactions(ctx context.Context, txs [][]byte, maxGas uint64) ([][]byte, [][]byte, error) {
// KVExecutor doesn't do gas filtering but does basic validation
validTxs := make([][]byte, 0, len(txs))
for _, tx := range txs {
if len(tx) == 0 {
continue // Skip empty transactions
}
// Basic format validation: must be key=value
parts := strings.SplitN(string(tx), "=", 2)
if len(parts) != 2 || strings.TrimSpace(parts[0]) == "" {
continue // Filter out malformed transactions
}
validTxs = append(validTxs, tx)
}
// No gas-based filtering, so no remaining transactions
return validTxs, nil, nil
}
63 changes: 21 additions & 42 deletions apps/testapp/kv/kvexecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,19 @@ func TestInitChain_Idempotency(t *testing.T) {
chainID := "test-chain"

// First call initializes genesis state
stateRoot1, maxBytes1, err := exec.InitChain(ctx, genesisTime, initialHeight, chainID)
stateRoot1, err := exec.InitChain(ctx, genesisTime, initialHeight, chainID)
if err != nil {
t.Fatalf("InitChain failed on first call: %v", err)
}
if maxBytes1 != 1024 {
t.Errorf("Expected maxBytes 1024, got %d", maxBytes1)
}

// Second call should return the same genesis state root
stateRoot2, maxBytes2, err := exec.InitChain(ctx, genesisTime, initialHeight, chainID)
stateRoot2, err := exec.InitChain(ctx, genesisTime, initialHeight, chainID)
if err != nil {
t.Fatalf("InitChain failed on second call: %v", err)
}
if !bytes.Equal(stateRoot1, stateRoot2) {
t.Errorf("Genesis state roots do not match: %s vs %s", stateRoot1, stateRoot2)
}
if maxBytes2 != 1024 {
t.Errorf("Expected maxBytes 1024, got %d", maxBytes2)
}
}

func TestGetTxs(t *testing.T) {
Expand All @@ -58,43 +52,34 @@ func TestGetTxs(t *testing.T) {
// though for buffered channels it should be immediate unless full.
time.Sleep(10 * time.Millisecond)

// First call to GetTxs should retrieve the injected transactions
txs, err := exec.GetTxs(ctx)
if err != nil {
t.Fatalf("GetTxs returned error on first call: %v", err)
t.Fatalf("GetTxs returned error: %v", err)
}
if len(txs) != 2 {
t.Errorf("Expected 2 transactions on first call, got %d", len(txs))
t.Errorf("Expected 2 transactions, got %d", len(txs))
}

// Verify the content (order might not be guaranteed depending on channel internals, but likely FIFO here)
foundTx1 := false
foundTx2 := false
for _, tx := range txs {
if reflect.DeepEqual(tx, tx1) {
foundTx1 = true
}
if reflect.DeepEqual(tx, tx2) {
foundTx2 = true
}
if !reflect.DeepEqual(txs[0], tx1) {
t.Errorf("Expected first tx 'a=1', got %s", string(txs[0]))
}
if !foundTx1 || !foundTx2 {
t.Errorf("Did not retrieve expected transactions. Got: %v", txs)
if !reflect.DeepEqual(txs[1], tx2) {
t.Errorf("Expected second tx 'b=2', got %s", string(txs[1]))
}

// Second call to GetTxs should return no transactions as the channel was drained
txsAfterDrain, err := exec.GetTxs(ctx)
// GetTxs should drain the channel, so a second call should return empty or nil
txsAgain, err := exec.GetTxs(ctx)
if err != nil {
t.Fatalf("GetTxs returned error on second call: %v", err)
t.Fatalf("GetTxs (second call) returned error: %v", err)
}
if len(txsAfterDrain) != 0 {
t.Errorf("Expected 0 transactions after drain, got %d", len(txsAfterDrain))
if len(txsAgain) != 0 {
t.Errorf("Expected 0 transactions on second call (drained), got %d", len(txsAgain))
}

// Test injecting again after draining
// Inject another transaction and verify it's available
tx3 := []byte("c=3")
exec.InjectTx(tx3)
time.Sleep(10 * time.Millisecond)

txsAfterReinject, err := exec.GetTxs(ctx)
if err != nil {
t.Fatalf("GetTxs returned error after re-inject: %v", err)
Expand All @@ -120,13 +105,10 @@ func TestExecuteTxs_Valid(t *testing.T) {
[]byte("key2=value2"),
}

stateRoot, maxBytes, err := exec.ExecuteTxs(ctx, txs, 1, time.Now(), []byte(""))
stateRoot, err := exec.ExecuteTxs(ctx, txs, 1, time.Now(), []byte(""))
if err != nil {
t.Fatalf("ExecuteTxs failed: %v", err)
}
if maxBytes != 1024 {
t.Errorf("Expected maxBytes 1024, got %d", maxBytes)
}

// Check that stateRoot contains the updated key-value pairs
rootStr := string(stateRoot)
Expand All @@ -152,13 +134,10 @@ func TestExecuteTxs_Invalid(t *testing.T) {
[]byte(""),
}

stateRoot, maxBytes, err := exec.ExecuteTxs(ctx, txs, 1, time.Now(), []byte(""))
stateRoot, err := exec.ExecuteTxs(ctx, txs, 1, time.Now(), []byte(""))
if err != nil {
t.Fatalf("ExecuteTxs should handle gibberish gracefully, got error: %v", err)
}
if maxBytes != 1024 {
t.Errorf("Expected maxBytes 1024, got %d", maxBytes)
}

// State root should still be computed (empty block is valid)
if stateRoot == nil {
Expand All @@ -173,7 +152,7 @@ func TestExecuteTxs_Invalid(t *testing.T) {
[]byte(""),
}

stateRoot2, _, err := exec.ExecuteTxs(ctx, mixedTxs, 2, time.Now(), stateRoot)
stateRoot2, err := exec.ExecuteTxs(ctx, mixedTxs, 2, time.Now(), stateRoot)
if err != nil {
t.Fatalf("ExecuteTxs should filter invalid transactions and process valid ones, got error: %v", err)
}
Expand Down Expand Up @@ -213,7 +192,7 @@ func TestReservedKeysExcludedFromAppHash(t *testing.T) {
ctx := context.Background()

// Initialize chain to set up genesis state (this writes genesis reserved keys)
_, _, err = exec.InitChain(ctx, time.Now(), 1, "test-chain")
_, err = exec.InitChain(ctx, time.Now(), 1, "test-chain")
if err != nil {
t.Fatalf("Failed to initialize chain: %v", err)
}
Expand All @@ -223,7 +202,7 @@ func TestReservedKeysExcludedFromAppHash(t *testing.T) {
[]byte("user/key1=value1"),
[]byte("user/key2=value2"),
}
_, _, err = exec.ExecuteTxs(ctx, txs, 1, time.Now(), []byte(""))
_, err = exec.ExecuteTxs(ctx, txs, 1, time.Now(), []byte(""))
if err != nil {
t.Fatalf("Failed to execute transactions: %v", err)
}
Expand Down Expand Up @@ -279,7 +258,7 @@ func TestReservedKeysExcludedFromAppHash(t *testing.T) {
moreTxs := [][]byte{
[]byte("user/key3=value3"),
}
_, _, err = exec.ExecuteTxs(ctx, moreTxs, 2, time.Now(), stateRootAfterReservedKeyWrite)
_, err = exec.ExecuteTxs(ctx, moreTxs, 2, time.Now(), stateRootAfterReservedKeyWrite)
if err != nil {
t.Fatalf("Failed to execute more transactions: %v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions block/components_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func TestExecutor_RealExecutionClientFailure_StopsNode(t *testing.T) {

// Mock InitChain to succeed initially
mockExec.On("InitChain", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return([]byte("state-root"), uint64(1024), nil).Once()
Return([]byte("state-root"), nil).Once()

// Mock SetDAHeight to be called during initialization
mockSeq.On("SetDAHeight", uint64(0)).Return().Once()
Expand All @@ -220,7 +220,7 @@ func TestExecutor_RealExecutionClientFailure_StopsNode(t *testing.T) {
// Mock ExecuteTxs to fail with a critical error
criticalError := errors.New("execution client RPC connection failed")
mockExec.On("ExecuteTxs", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(nil, uint64(0), criticalError).Maybe()
Return(nil, criticalError).Maybe()

// Create aggregator node
components, err := NewAggregatorComponents(
Expand Down
2 changes: 1 addition & 1 deletion block/internal/common/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (s *Replayer) replayBlock(ctx context.Context, height uint64) error {
Int("tx_count", len(rawTxs)).
Msg("executing transactions on execution layer")

newAppHash, _, err := s.exec.ExecuteTxs(ctx, rawTxs, height, header.Time(), prevState.AppHash)
newAppHash, err := s.exec.ExecuteTxs(ctx, rawTxs, height, header.Time(), prevState.AppHash)
if err != nil {
return fmt.Errorf("failed to execute transactions: %w", err)
}
Expand Down
Loading
Loading