Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/testapp/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
cosmossdk.io/log v1.6.0
github.com/ipfs/go-datastore v0.8.2
github.com/rollkit/rollkit v0.0.0-00010101000000-000000000000
github.com/rollkit/rollkit/core v0.0.0-20250312114929-104787ba1a4c
github.com/rollkit/rollkit/da v0.0.0-00010101000000-000000000000
github.com/rollkit/rollkit/sequencers/single v0.0.0-00010101000000-000000000000
github.com/rs/zerolog v1.34.0
Expand Down Expand Up @@ -147,7 +148,6 @@ require (
github.com/quic-go/quic-go v0.50.1 // indirect
github.com/quic-go/webtransport-go v0.8.1-0.20241018022711-4ac2c9250e66 // indirect
github.com/raulk/go-watchdog v1.3.0 // indirect
github.com/rollkit/rollkit/core v0.0.0-20250312114929-104787ba1a4c // indirect
github.com/sagikazarmark/locafero v0.4.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
Expand Down
7 changes: 7 additions & 0 deletions apps/testapp/kv/kvexecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
"github.com/rollkit/rollkit/core/execution"
"github.com/rollkit/rollkit/pkg/store"
)

Expand Down Expand Up @@ -243,6 +244,12 @@ func (k *KVExecutor) SetFinal(ctx context.Context, blockHeight uint64) error {
return k.db.Put(ctx, ds.NewKey("/finalizedHeight"), []byte(fmt.Sprintf("%d", blockHeight)))
}

// GetExecutionMode returns the execution mode for this executor.
// KVExecutor uses delayed execution mode.
func (k *KVExecutor) GetExecutionMode() execution.ExecutionMode {
return execution.ExecutionModeDelayed
}

// InjectTx adds a transaction to the mempool channel.
// Uses a non-blocking send to avoid blocking the caller if the channel is full.
func (k *KVExecutor) InjectTx(tx []byte) {
Expand Down
149 changes: 114 additions & 35 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@
metrics *Metrics

exec coreexecutor.Executor
// executionMode caches the execution mode from the executor
executionMode coreexecutor.ExecutionMode

// daIncludedHeight is rollkit height at which all blocks have been included
// in the DA
Expand Down Expand Up @@ -351,6 +353,8 @@
daH := atomic.Uint64{}
daH.Store(s.DAHeight)

executionMode := exec.GetExecutionMode()

m := &Manager{
signer: signer,
config: config,
Expand Down Expand Up @@ -380,6 +384,7 @@
metrics: seqMetrics,
sequencer: sequencer,
exec: exec,
executionMode: executionMode,
da: da,
gasPrice: gasPrice,
gasMultiplier: gasMultiplier,
Expand Down Expand Up @@ -800,22 +805,26 @@
}

// // Verify that the header's timestamp is strictly greater than the last block's time
// headerTime := header.Time()
// if header.Height() > 1 && lastState.LastBlockTime.After(headerTime) {
// return fmt.Errorf("block time must be strictly increasing: got %v, last block time was %v",
// headerTime.UnixNano(), lastState.LastBlockTime)
// }

// // Validate that the header's AppHash matches the lastState's AppHash
// // Note: Assumes deferred execution
// if !bytes.Equal(header.AppHash, lastState.AppHash) {
// return fmt.Errorf("app hash mismatch: expected %x, got %x", lastState.AppHash, header.AppHash)
// }
headerTime := header.Time()
if header.Height() > 1 && lastState.LastBlockTime.After(headerTime) {
return fmt.Errorf("block time must be strictly increasing: got %v, last block time was %v",
headerTime.UnixNano(), lastState.LastBlockTime)
}

// Validate AppHash based on execution mode
if m.executionMode == coreexecutor.ExecutionModeDelayed {
// In delayed mode, AppHash should match the last state's AppHash
if !bytes.Equal(header.AppHash, lastState.AppHash) {
return fmt.Errorf("appHash mismatch in delayed mode: expected %x, got %x", lastState.AppHash, header.AppHash)
}
}
// Note: For immediate mode, we can't validate AppHash against lastState because
// it represents the state after executing the current block's transactions

return nil
}

func (m *Manager) execCreateBlock(_ context.Context, height uint64, lastSignature *types.Signature, lastHeaderHash types.Hash, _ types.State, batchData *BatchData) (*types.SignedHeader, *types.Data, error) {
func (m *Manager) execCreateBlock(ctx context.Context, height uint64, lastSignature *types.Signature, lastHeaderHash types.Hash, lastState types.State, batchData *BatchData) (*types.SignedHeader, *types.Data, error) {
// Use when batchData is set to data IDs from the DA layer
// batchDataIDs := convertBatchDataToBytes(batchData.Data)

Expand All @@ -840,21 +849,54 @@
// Determine if this is an empty block
isEmpty := batchData.Batch == nil || len(batchData.Transactions) == 0

// Create block data with appropriate transactions
blockData := &types.Data{
Txs: make(types.Txs, 0), // Start with empty transaction list
}

// Only add transactions if this is not an empty block
if !isEmpty {
blockData.Txs = make(types.Txs, len(batchData.Transactions))
for i := range batchData.Transactions {
blockData.Txs[i] = types.Tx(batchData.Transactions[i])
}
}

// Determine AppHash based on execution mode
var appHash []byte
if m.executionMode == coreexecutor.ExecutionModeImmediate && !isEmpty {
// For immediate execution, execute transactions now to get the new state root
rawTxs := make([][]byte, len(blockData.Txs))
for i := range blockData.Txs {
rawTxs[i] = blockData.Txs[i]
}

Check warning on line 872 in block/manager.go

View check run for this annotation

Codecov / codecov/patch

block/manager.go#L868-L872

Added lines #L868 - L872 were not covered by tests

// Execute transactions
newStateRoot, _, err := m.exec.ExecuteTxs(ctx, rawTxs, height, batchData.Time, lastState.AppHash)
if err != nil {
return nil, nil, fmt.Errorf("failed to execute transactions for immediate mode: %w", err)
}
appHash = newStateRoot

Check warning on line 879 in block/manager.go

View check run for this annotation

Codecov / codecov/patch

block/manager.go#L875-L879

Added lines #L875 - L879 were not covered by tests
} else {
// For delayed execution or empty blocks, use the app hash from last state
appHash = lastState.AppHash
}

header := &types.SignedHeader{
Header: types.Header{
Version: types.Version{
Block: m.lastState.Version.Block,
App: m.lastState.Version.App,
Block: lastState.Version.Block,
App: lastState.Version.App,
},
BaseHeader: types.BaseHeader{
ChainID: m.lastState.ChainID,
ChainID: lastState.ChainID,
Height: height,
Time: uint64(batchData.UnixNano()), //nolint:gosec // why is time unix? (tac0turtle)
},
LastHeaderHash: lastHeaderHash,
// DataHash is set at the end of the function
// DataHash is set below
ConsensusHash: make(types.Hash, 32),
AppHash: m.lastState.AppHash,
AppHash: appHash,
ProposerAddress: m.genesis.ProposerAddress,
},
Signature: *lastSignature,
Expand All @@ -864,17 +906,8 @@
},
}

// Create block data with appropriate transactions
blockData := &types.Data{
Txs: make(types.Txs, 0), // Start with empty transaction list
}

// Only add transactions if this is not an empty block
// Set DataHash
if !isEmpty {
blockData.Txs = make(types.Txs, len(batchData.Transactions))
for i := range batchData.Transactions {
blockData.Txs[i] = types.Tx(batchData.Transactions[i])
}
header.DataHash = blockData.DACommitment()
} else {
header.DataHash = dataHashForEmptyTxs
Expand All @@ -884,15 +917,61 @@
}

func (m *Manager) execApplyBlock(ctx context.Context, lastState types.State, header *types.SignedHeader, data *types.Data) (types.State, error) {
rawTxs := make([][]byte, len(data.Txs))
for i := range data.Txs {
rawTxs[i] = data.Txs[i]
}
var newStateRoot []byte

// Check if we need to execute transactions
if m.executionMode == coreexecutor.ExecutionModeImmediate {
// For immediate mode, the aggregator already executed during block creation
// But syncing nodes need to execute to verify the AppHash
// Check if we're the block creator by verifying we signed this block
isBlockCreator := false
if m.signer != nil {
myAddress, err := m.signer.GetAddress()
if err == nil && bytes.Equal(header.ProposerAddress, myAddress) {
// Also verify this is the current height we're producing
currentHeight, err := m.store.Height(ctx)
if err == nil && header.Height() == currentHeight+1 {
isBlockCreator = true
}

Check warning on line 935 in block/manager.go

View check run for this annotation

Codecov / codecov/patch

block/manager.go#L924-L935

Added lines #L924 - L935 were not covered by tests
}
}

ctx = context.WithValue(ctx, types.SignedHeaderContextKey, header)
newStateRoot, _, err := m.exec.ExecuteTxs(ctx, rawTxs, header.Height(), header.Time(), lastState.AppHash)
if err != nil {
return types.State{}, fmt.Errorf("failed to execute transactions: %w", err)
if isBlockCreator {
// We created this block, so we already executed transactions in execCreateBlock
// Just use the AppHash from the header
newStateRoot = header.AppHash
} else {
// We're syncing this block, need to execute to verify AppHash
rawTxs := make([][]byte, len(data.Txs))
for i := range data.Txs {
rawTxs[i] = data.Txs[i]
}

Check warning on line 948 in block/manager.go

View check run for this annotation

Codecov / codecov/patch

block/manager.go#L939-L948

Added lines #L939 - L948 were not covered by tests

ctx = context.WithValue(ctx, types.SignedHeaderContextKey, header)
computedStateRoot, _, err := m.exec.ExecuteTxs(ctx, rawTxs, header.Height(), header.Time(), lastState.AppHash)
if err != nil {
return types.State{}, fmt.Errorf("failed to execute transactions: %w", err)
}

Check warning on line 954 in block/manager.go

View check run for this annotation

Codecov / codecov/patch

block/manager.go#L950-L954

Added lines #L950 - L954 were not covered by tests

// Verify the AppHash matches
if !bytes.Equal(header.AppHash, computedStateRoot) {
return types.State{}, fmt.Errorf("AppHash mismatch in immediate mode: expected %x, got %x", computedStateRoot, header.AppHash)
}
newStateRoot = computedStateRoot

Check warning on line 960 in block/manager.go

View check run for this annotation

Codecov / codecov/patch

block/manager.go#L957-L960

Added lines #L957 - L960 were not covered by tests
}
} else {
// For delayed mode, always execute transactions
rawTxs := make([][]byte, len(data.Txs))
for i := range data.Txs {
rawTxs[i] = data.Txs[i]
}

ctx = context.WithValue(ctx, types.SignedHeaderContextKey, header)
var err error
newStateRoot, _, err = m.exec.ExecuteTxs(ctx, rawTxs, header.Height(), header.Time(), lastState.AppHash)
if err != nil {
return types.State{}, fmt.Errorf("failed to execute transactions: %w", err)
}
}
Comment on lines +920 to 975
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Potential logic issue in block creator detection.

The block creator detection logic has a potential race condition or timing issue. The check header.Height() == currentHeight+1 might not be reliable in all scenarios, especially during concurrent operations or if there are delays in processing.

Consider simplifying the block creator detection or adding additional safeguards.

-		// Check if we're the block creator by verifying we signed this block
-		isBlockCreator := false
-		if m.signer != nil {
-			myAddress, err := m.signer.GetAddress()
-			if err == nil && bytes.Equal(header.ProposerAddress, myAddress) {
-				// Also verify this is the current height we're producing
-				currentHeight, err := m.store.Height(ctx)
-				if err == nil && header.Height() == currentHeight+1 {
-					isBlockCreator = true
-				}
-			}
-		}
+		// Check if we're the block creator by verifying we signed this block
+		isBlockCreator := false
+		if m.signer != nil {
+			myAddress, err := m.signer.GetAddress()
+			if err == nil && bytes.Equal(header.ProposerAddress, myAddress) {
+				isBlockCreator = true
+			}
+		}

The height check might be unnecessary since the proposer address check should be sufficient to determine if we created the block.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
var newStateRoot []byte
// Check if we need to execute transactions
if m.executionMode == coreexecutor.ExecutionModeImmediate {
// For immediate mode, the aggregator already executed during block creation
// But syncing nodes need to execute to verify the AppHash
// Check if we're the block creator by verifying we signed this block
isBlockCreator := false
if m.signer != nil {
myAddress, err := m.signer.GetAddress()
if err == nil && bytes.Equal(header.ProposerAddress, myAddress) {
// Also verify this is the current height we're producing
currentHeight, err := m.store.Height(ctx)
if err == nil && header.Height() == currentHeight+1 {
isBlockCreator = true
}
}
}
ctx = context.WithValue(ctx, types.SignedHeaderContextKey, header)
newStateRoot, _, err := m.exec.ExecuteTxs(ctx, rawTxs, header.Height(), header.Time(), lastState.AppHash)
if err != nil {
return types.State{}, fmt.Errorf("failed to execute transactions: %w", err)
if isBlockCreator {
// We created this block, so we already executed transactions in execCreateBlock
// Just use the AppHash from the header
newStateRoot = header.AppHash
} else {
// We're syncing this block, need to execute to verify AppHash
rawTxs := make([][]byte, len(data.Txs))
for i := range data.Txs {
rawTxs[i] = data.Txs[i]
}
ctx = context.WithValue(ctx, types.SignedHeaderContextKey, header)
computedStateRoot, _, err := m.exec.ExecuteTxs(ctx, rawTxs, header.Height(), header.Time(), lastState.AppHash)
if err != nil {
return types.State{}, fmt.Errorf("failed to execute transactions: %w", err)
}
// Verify the AppHash matches
if !bytes.Equal(header.AppHash, computedStateRoot) {
return types.State{}, fmt.Errorf("AppHash mismatch in immediate mode: expected %x, got %x", computedStateRoot, header.AppHash)
}
newStateRoot = computedStateRoot
}
} else {
// For delayed mode, always execute transactions
rawTxs := make([][]byte, len(data.Txs))
for i := range data.Txs {
rawTxs[i] = data.Txs[i]
}
ctx = context.WithValue(ctx, types.SignedHeaderContextKey, header)
var err error
newStateRoot, _, err = m.exec.ExecuteTxs(ctx, rawTxs, header.Height(), header.Time(), lastState.AppHash)
if err != nil {
return types.State{}, fmt.Errorf("failed to execute transactions: %w", err)
}
}
var newStateRoot []byte
// Check if we need to execute transactions
if m.executionMode == coreexecutor.ExecutionModeImmediate {
// For immediate mode, the aggregator already executed during block creation
// But syncing nodes need to execute to verify the AppHash
// Check if we're the block creator by verifying we signed this block
isBlockCreator := false
if m.signer != nil {
myAddress, err := m.signer.GetAddress()
if err == nil && bytes.Equal(header.ProposerAddress, myAddress) {
isBlockCreator = true
}
}
if isBlockCreator {
// We created this block, so we already executed transactions in execCreateBlock
// Just use the AppHash from the header
newStateRoot = header.AppHash
} else {
// We're syncing this block, need to execute to verify AppHash
rawTxs := make([][]byte, len(data.Txs))
for i := range data.Txs {
rawTxs[i] = data.Txs[i]
}
ctx = context.WithValue(ctx, types.SignedHeaderContextKey, header)
computedStateRoot, _, err := m.exec.ExecuteTxs(ctx, rawTxs, header.Height(), header.Time(), lastState.AppHash)
if err != nil {
return types.State{}, fmt.Errorf("failed to execute transactions: %w", err)
}
// Verify the AppHash matches
if !bytes.Equal(header.AppHash, computedStateRoot) {
return types.State{}, fmt.Errorf("AppHash mismatch in immediate mode: expected %x, got %x", computedStateRoot, header.AppHash)
}
newStateRoot = computedStateRoot
}
} else {
// For delayed mode, always execute transactions
rawTxs := make([][]byte, len(data.Txs))
for i := range data.Txs {
rawTxs[i] = data.Txs[i]
}
ctx = context.WithValue(ctx, types.SignedHeaderContextKey, header)
var err error
newStateRoot, _, err = m.exec.ExecuteTxs(ctx, rawTxs, header.Height(), header.Time(), lastState.AppHash)
if err != nil {
return types.State{}, fmt.Errorf("failed to execute transactions: %w", err)
}
}
🤖 Prompt for AI Agents
In block/manager.go around lines 929 to 984, the block creator detection logic
uses a height check that may cause race conditions or timing issues. To fix
this, remove the height comparison and rely solely on verifying if the signer’s
address matches the header’s proposer address to determine block creator status.
This simplifies the logic and avoids potential concurrency problems.


s, err := lastState.NextState(header, newStateRoot)
Expand Down
42 changes: 20 additions & 22 deletions block/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,28 +575,26 @@ func TestManager_execValidate(t *testing.T) {
require.NoError(err)
})

// TODO: https://github.com/rollkit/rollkit/issues/2250

// t.Run("non-monotonic block time with height > 1", func(t *testing.T) {
// state, header, data, privKey := makeValid()
// state.LastBlockTime = time.Now().Add(time.Minute)
// state.LastBlockHeight = 1
// header.BaseHeader.Height = state.LastBlockHeight + 1
// data.Metadata.Height = state.LastBlockHeight + 1
// signer, err := noopsigner.NewNoopSigner(privKey)
// require.NoError(err)
// header.Signature, err = types.GetSignature(header.Header, signer)
// require.NoError(err)
// err = m.execValidate(state, header, data)
// require.ErrorContains(err, "block time must be strictly increasing")
// })

// t.Run("app hash mismatch", func(t *testing.T) {
// state, header, data, _ := makeValid()
// state.AppHash = []byte("different")
// err := m.execValidate(state, header, data)
// require.ErrorContains(err, "app hash mismatch")
// })
t.Run("non-monotonic block time with height > 1", func(t *testing.T) {
state, header, data, privKey := makeValid()
state.LastBlockTime = time.Now().Add(time.Minute)
state.LastBlockHeight = 1
header.BaseHeader.Height = state.LastBlockHeight + 1
data.Metadata.Height = state.LastBlockHeight + 1
signer, err := noopsigner.NewNoopSigner(privKey)
require.NoError(err)
header.Signature, err = types.GetSignature(header.Header, signer)
require.NoError(err)
err = m.execValidate(state, header, data)
require.ErrorContains(err, "block time must be strictly increasing")
})

t.Run("app hash mismatch", func(t *testing.T) {
state, header, data, _ := makeValid()
state.AppHash = []byte("different")
err := m.execValidate(state, header, data)
require.ErrorContains(err, "appHash mismatch in delayed mode")
})
}

// TestGetterMethods tests simple getter methods for the Manager
Expand Down
5 changes: 5 additions & 0 deletions block/publish_block_p2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

coreexecution "github.com/rollkit/rollkit/core/execution"
coresequencer "github.com/rollkit/rollkit/core/sequencer"
"github.com/rollkit/rollkit/pkg/config"
genesispkg "github.com/rollkit/rollkit/pkg/genesis"
Expand Down Expand Up @@ -236,6 +237,10 @@ func (m mockExecutor) SetFinal(ctx context.Context, blockHeight uint64) error {
return nil
}

func (m mockExecutor) GetExecutionMode() coreexecution.ExecutionMode {
return coreexecution.ExecutionModeDelayed
}

var rnd = rand.New(rand.NewSource(1)) //nolint:gosec // test code only

func bytesN(n int) []byte {
Expand Down
6 changes: 6 additions & 0 deletions core/execution/dummy.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@
return fmt.Errorf("cannot set finalized block at height %d", blockHeight)
}

// GetExecutionMode returns the execution mode for this executor.
// DummyExecutor uses delayed execution mode.
func (e *DummyExecutor) GetExecutionMode() ExecutionMode {
return ExecutionModeDelayed

Check warning on line 97 in core/execution/dummy.go

View check run for this annotation

Codecov / codecov/patch

core/execution/dummy.go#L96-L97

Added lines #L96 - L97 were not covered by tests
}

func (e *DummyExecutor) removeExecutedTxs(txs [][]byte) {
e.injectedTxs = slices.DeleteFunc(e.injectedTxs, func(tx []byte) bool {
return slices.ContainsFunc(txs, func(t []byte) bool { return bytes.Equal(tx, t) })
Expand Down
31 changes: 31 additions & 0 deletions core/execution/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,23 @@ import (
"time"
)

// ExecutionMode defines when state transitions are applied relative to block creation
type ExecutionMode int

const (
// ExecutionModeDelayed means the AppHash in block N represents the state after executing block N-1
// This is the traditional approach used by CometBFT/Tendermint
ExecutionModeDelayed ExecutionMode = iota

// ExecutionModeImmediate means the AppHash in block N represents the state after executing block N
// Transactions are executed during block creation and the resulting state root is included
ExecutionModeImmediate

// ExecutionModeOptimistic means transactions are executed optimistically and finalized later
// This allows for faster block creation but requires a finalization step to confirm state
// ExecutionModeOptimistic
)

// Executor defines the interface that execution clients must implement to be compatible with Rollkit.
// This interface enables the separation between consensus and execution layers, allowing for modular
// and pluggable execution environments.
Expand Down Expand Up @@ -53,6 +70,7 @@ type Executor interface {
// - Must maintain deterministic execution
// - Must respect context cancellation/timeout
// - The rest of the rules are defined by the specific execution layer
// - Must verify state root after execution
//
// Parameters:
// - ctx: Context for timeout/cancellation control
Expand Down Expand Up @@ -82,4 +100,17 @@ type Executor interface {
// Returns:
// - error: Any errors during finalization
SetFinal(ctx context.Context, blockHeight uint64) error

// GetExecutionMode returns the execution mode supported by this executor.
// This determines when state transitions are applied:
// - Delayed: AppHash in block N represents state after executing block N-1
// - Immediate: AppHash in block N represents state after executing block N
//
// Requirements:
// - Must return consistent value across all calls
// - Mode cannot change after initialization
//
// Returns:
// - ExecutionMode: The execution mode supported by this executor
GetExecutionMode() ExecutionMode
}
7 changes: 7 additions & 0 deletions execution/evm/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,13 @@ func (c *EngineClient) SetFinal(ctx context.Context, blockHeight uint64) error {
return c.setFinal(ctx, blockHash, true)
}

// GetExecutionMode returns the execution mode for this executor.
// EngineClient uses immediate execution mode since it calculates the state root
// during transaction execution via engine_getPayloadV4.
func (c *EngineClient) GetExecutionMode() execution.ExecutionMode {
return execution.ExecutionModeImmediate
}

func (c *EngineClient) derivePrevRandao(blockHeight uint64) common.Hash {
return common.BigToHash(new(big.Int).SetUint64(blockHeight))
}
Expand Down
Loading
Loading