Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 55 additions & 2 deletions apps/evm/cmd/rollback.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package cmd

import (
"bytes"
"context"
"errors"
"fmt"
"os"

"github.com/ethereum/go-ethereum/common"
ds "github.com/ipfs/go-datastore"
"github.com/spf13/cobra"

goheaderstore "github.com/celestiaorg/go-header/store"
"github.com/evstack/ev-node/execution/evm"
rollcmd "github.com/evstack/ev-node/pkg/cmd"
"github.com/evstack/ev-node/pkg/store"
"github.com/evstack/ev-node/types"
Expand Down Expand Up @@ -42,7 +47,7 @@ func NewRollbackCmd() *cobra.Command {

defer func() {
if closeErr := rawEvolveDB.Close(); closeErr != nil {
fmt.Printf("Warning: failed to close evolve database: %v\n", closeErr)
cmd.Printf("Warning: failed to close evolve database: %v\n", closeErr)
}
}()

Expand All @@ -63,6 +68,17 @@ func NewRollbackCmd() *cobra.Command {
return fmt.Errorf("failed to rollback ev-node state: %w", err)
}

// rollback execution layer via EngineClient
engineClient, err := createRollbackEngineClient(cmd, rawEvolveDB)
if err != nil {
cmd.Printf("Warning: failed to create engine client, skipping EL rollback: %v\n", err)
} else {
if err := engineClient.Rollback(goCtx, height); err != nil {
return fmt.Errorf("failed to rollback execution layer: %w", err)
}
cmd.Printf("Rolled back execution layer to height %d\n", height)
}

// rollback ev-node goheader state
headerStore, err := goheaderstore.NewStore[*types.SignedHeader](
evolveDB,
Expand Down Expand Up @@ -101,7 +117,7 @@ func NewRollbackCmd() *cobra.Command {
errs = errors.Join(errs, fmt.Errorf("failed to rollback data sync service state: %w", err))
}

fmt.Printf("Rolled back ev-node state to height %d\n", height)
cmd.Printf("Rolled back ev-node state to height %d\n", height)
if syncNode {
fmt.Println("Restart the node with the `--evnode.clear_cache` flag")
}
Expand All @@ -113,5 +129,42 @@ func NewRollbackCmd() *cobra.Command {
cmd.Flags().Uint64Var(&height, "height", 0, "rollback to a specific height")
cmd.Flags().BoolVar(&syncNode, "sync-node", false, "sync node (no aggregator)")

// EVM flags for execution layer rollback
cmd.Flags().String(evm.FlagEvmEthURL, "http://localhost:8545", "URL of the Ethereum JSON-RPC endpoint")
cmd.Flags().String(evm.FlagEvmEngineURL, "http://localhost:8551", "URL of the Engine API endpoint")
cmd.Flags().String(evm.FlagEvmJWTSecretFile, "", "Path to file containing the JWT secret for authentication")

return cmd
}

// createRollbackEngineClient builds the EVM engine client used to roll back
// the execution layer, configured from the command's EVM flags.
//
// It reads the eth URL, engine URL and JWT secret file flags, loads the secret
// from that file with surrounding whitespace trimmed, and passes everything to
// evm.NewEngineExecutionClient together with db.
//
// An error is returned when a flag cannot be read, when no secret file path was
// supplied, when the file cannot be read, or when the file holds only whitespace.
func createRollbackEngineClient(cmd *cobra.Command, db ds.Batching) (*evm.EngineClient, error) {
	// stringFlag looks up a string flag and wraps a lookup failure with the flag name.
	stringFlag := func(name string) (string, error) {
		value, lookupErr := cmd.Flags().GetString(name)
		if lookupErr != nil {
			return "", fmt.Errorf("failed to get '%s' flag: %w", name, lookupErr)
		}
		return value, nil
	}

	ethEndpoint, err := stringFlag(evm.FlagEvmEthURL)
	if err != nil {
		return nil, err
	}

	engineEndpoint, err := stringFlag(evm.FlagEvmEngineURL)
	if err != nil {
		return nil, err
	}

	secretPath, err := stringFlag(evm.FlagEvmJWTSecretFile)
	if err != nil {
		return nil, err
	}
	if len(secretPath) == 0 {
		return nil, fmt.Errorf("JWT secret file must be provided via --evm.jwt-secret-file for EL rollback")
	}

	raw, err := os.ReadFile(secretPath)
	if err != nil {
		return nil, fmt.Errorf("failed to read JWT secret from file '%s': %w", secretPath, err)
	}

	// A file containing nothing but whitespace is treated the same as an empty one.
	trimmed := bytes.TrimSpace(raw)
	if len(trimmed) == 0 {
		return nil, fmt.Errorf("JWT secret file '%s' is empty", secretPath)
	}

	// The zero hash, the zero address and false are passed for the remaining arguments.
	return evm.NewEngineExecutionClient(
		ethEndpoint,
		engineEndpoint,
		string(trimmed),
		common.Hash{},
		common.Address{},
		db,
		false,
	)
}
12 changes: 12 additions & 0 deletions core/execution/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,15 @@ type HeightProvider interface {
// - error: Any errors during height retrieval
GetLatestHeight(ctx context.Context) (uint64, error)
}

// Rollbackable is an optional interface that execution clients can implement
// to support automatic rollback when the execution layer is ahead of the target height.
// This enables automatic recovery during rolling restarts when the EL has committed
// blocks that were not replicated to the consensus layer.
//
// Requirements:
// - Only execution layers supporting in-flight rollback should implement this.
type Rollbackable interface {
// Rollback resets the execution layer head to the specified height.
//
// Parameters:
// - ctx: Context for the rollback operation
// - targetHeight: Height the execution layer head should be reset to
//
// Returns:
// - error: Any errors during the rollback
Rollback(ctx context.Context, targetHeight uint64) error
}
119 changes: 102 additions & 17 deletions execution/evm/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ var (
// Compile-time check: ensure EngineClient implements the execution.Executor interface
var _ execution.Executor = (*EngineClient)(nil)

// Compile-time check: ensure EngineClient implements the execution.HeightProvider interface
var _ execution.HeightProvider = (*EngineClient)(nil)

// Compile-time check: ensure EngineClient implements the execution.Rollbackable interface
var _ execution.Rollbackable = (*EngineClient)(nil)

// validatePayloadStatus checks the payload status and returns appropriate errors.
// It implements the Engine API specification's status handling:
// - VALID: Operation succeeded, return nil
Expand Down Expand Up @@ -338,7 +344,7 @@ func (c *EngineClient) GetTxs(ctx context.Context) ([][]byte, error) {
func (c *EngineClient) ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight uint64, timestamp time.Time, prevStateRoot []byte) (updatedStateRoot []byte, maxBytes uint64, err error) {

// 1. Check for idempotent execution
stateRoot, payloadID, found, err := c.checkIdempotency(ctx, blockHeight, timestamp, txs)
stateRoot, payloadID, found, err := c.reconcileExecutionAtHeight(ctx, blockHeight, timestamp, txs)
if err != nil {
c.logger.Warn().Err(err).Uint64("height", blockHeight).Msg("ExecuteTxs: idempotency check failed")
// Continue execution on error, as it might be transient
Expand Down Expand Up @@ -548,6 +554,7 @@ func (c *EngineClient) setFinalWithHeight(ctx context.Context, blockHash common.

// doForkchoiceUpdate performs the actual forkchoice update RPC call with retry logic.
func (c *EngineClient) doForkchoiceUpdate(ctx context.Context, args engine.ForkchoiceStateV1, operation string) error {

// Call forkchoice update with retry logic for SYNCING status
err := retryWithBackoffOnPayloadStatus(ctx, func() error {
forkchoiceResult, err := c.engineClient.ForkchoiceUpdated(ctx, args, nil)
Expand Down Expand Up @@ -690,35 +697,57 @@ func (c *EngineClient) ResumePayload(ctx context.Context, payloadIDBytes []byte)
return stateRoot, err
}

// checkIdempotency checks if the block at the given height and timestamp has already been executed.
// reconcileExecutionAtHeight checks if the block at the given height and timestamp has already been executed.
// It returns:
// - stateRoot: non-nil if block is already promoted/finalized (idempotent success)
// - payloadID: non-nil if block execution was started but not finished (resume needed)
// - found: true if either of the above is true
// - err: error during checks
func (c *EngineClient) checkIdempotency(ctx context.Context, height uint64, timestamp time.Time, txs [][]byte) (stateRoot []byte, payloadID *engine.PayloadID, found bool, err error) {
func (c *EngineClient) reconcileExecutionAtHeight(ctx context.Context, height uint64, timestamp time.Time, txs [][]byte) (stateRoot []byte, payloadID *engine.PayloadID, found bool, err error) {
// 1. Check ExecMeta from store
execMeta, err := c.store.GetExecMeta(ctx, height)
if err == nil && execMeta != nil {
// If we already have a promoted block at this height, return the stored StateRoot
// If we already have a promoted block at this height, verify timestamp matches
// to catch Dual-Store Conflicts where ExecMeta was saved for an old block
// that was later replaced via consensus.
if execMeta.Stage == ExecStagePromoted && len(execMeta.StateRoot) > 0 {
c.logger.Info().
if execMeta.Timestamp == timestamp.Unix() {
c.logger.Info().
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is still a weak check without a hash comparison, but it fixed the problems on restart.

Uint64("height", height).
Str("stage", execMeta.Stage).
Msg("ExecuteTxs: reusing already-promoted execution (idempotent)")
return execMeta.StateRoot, nil, true, nil
}
// Timestamp mismatch - ExecMeta is stale from an old block that was replaced.
// Ignore it and proceed to EL check which will handle rollback if needed.
c.logger.Warn().
Uint64("height", height).
Str("stage", execMeta.Stage).
Msg("ExecuteTxs: reusing already-promoted execution (idempotent)")
return execMeta.StateRoot, nil, true, nil
Int64("execmeta_timestamp", execMeta.Timestamp).
Int64("requested_timestamp", timestamp.Unix()).
Msg("ExecuteTxs: ExecMeta timestamp mismatch, ignoring stale promoted record")
}

// If we have a started execution with a payloadID, return it to resume
// If we have a started execution with a payloadID, validate it still exists before resuming.
// After node restart, the EL's payload cache is ephemeral and the payloadID may be stale.
if execMeta.Stage == ExecStageStarted && len(execMeta.PayloadID) == 8 {
c.logger.Info().
Uint64("height", height).
Str("stage", execMeta.Stage).
Msg("ExecuteTxs: found in-progress execution with payloadID, returning payloadID for resume")

var pid engine.PayloadID
copy(pid[:], execMeta.PayloadID)
return nil, &pid, true, nil

// Validate payload still exists by attempting to retrieve it
if _, err = c.engineClient.GetPayload(ctx, pid); err == nil {
c.logger.Info().
Uint64("height", height).
Str("stage", execMeta.Stage).
Msg("ExecuteTxs: found in-progress execution with payloadID, returning payloadID for resume")
return nil, &pid, true, nil
}
// Payload is stale (expired or node restarted) - proceed with fresh execution
c.logger.Warn().
Uint64("height", height).
Str("payloadID", pid.String()).
Err(err).
Msg("ExecuteTxs: stale ExecMeta payloadID no longer valid in EL, will re-execute")
// Don't return - fall through to fresh execution
}
}

Expand All @@ -744,12 +773,27 @@ func (c *EngineClient) checkIdempotency(ctx context.Context, height uint64, time

return existingStateRoot.Bytes(), nil, true, nil
}
// Timestamp mismatch - log warning but proceed
// We need to rollback the EL to height-1 so it can re-execute
c.logger.Warn().
Uint64("height", height).
Uint64("existingTimestamp", existingTimestamp).
Int64("requestedTimestamp", timestamp.Unix()).
Msg("ExecuteTxs: block exists at height but timestamp differs")
Msg("ExecuteTxs: block exists at height but timestamp differs - rolling back EL to re-sync")

// Rollback to height-1 to allow re-execution with correct timestamp
if height > 0 {
if err := c.Rollback(ctx, height-1); err != nil {
c.logger.Error().Err(err).
Uint64("height", height).
Uint64("rollback_target", height-1).
Msg("ExecuteTxs: failed to rollback EL for timestamp mismatch")
return nil, nil, false, fmt.Errorf("failed to rollback EL for timestamp mismatch at height %d: %w", height, err)
}
c.logger.Info().
Uint64("height", height).
Uint64("rollback_target", height-1).
Msg("ExecuteTxs: EL rolled back successfully, will re-execute with correct timestamp")
}
}

return nil, nil, false, nil
Expand Down Expand Up @@ -907,6 +951,47 @@ func (c *EngineClient) GetLatestHeight(ctx context.Context) (uint64, error) {
return header.Number.Uint64(), nil
}

// Rollback resets the execution layer head to the specified height using forkchoice update.
// This is used for recovery when the EL is ahead of the consensus layer (e.g., during rolling restarts).
//
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this function be called while the node is running?

Copy link
Member

@julienrbrt julienrbrt Jan 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For EVM, yes (as it resends a payload); for the SDK, no (and our DM discussion #2983 (comment) showed that ev-abci won't implement it).
So maybe we should add that only execution layers supporting in-flight rollback should implement this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we want inflight? is that a goal for later on?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have updated the doc to mention in-flight updates on the interface.
This method is used during startup reconcile only.
Another use-case could be within HA when the leader lock is lost during block production. But this is an edge case.

// Implements the execution.Rollbackable interface.
//
// Flow: look up the block at targetHeight via getBlockInfo, overwrite the cached
// head/safe/finalized hashes and the cached head height while holding c.mu, then
// send one forkchoice update whose head, safe and finalized hashes all point at
// that block.
//
// Parameters:
// - ctx: Context passed to the block lookup and the forkchoice update
// - targetHeight: Height whose block becomes the new head, safe and finalized block
//
// Returns:
// - error: Wrapped error if the block lookup or the forkchoice update fails
func (c *EngineClient) Rollback(ctx context.Context, targetHeight uint64) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we wire the command directly then?
This would fix #2967.

For consitency, a good follow-up would be to implement this in ev-abci and use it in the rollback command as well instead of directly calling: https://github.com/evstack/ev-abci/blob/e905b0b/server/rollback_cmd.go#L91-L95.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 I have added it to the rollback cmd.
Not started abci. I will DM you.

// Get block hash at target height
// Only the first result (the block hash) and the error are used; the three
// middle results of getBlockInfo are discarded.
blockHash, _, _, _, err := c.getBlockInfo(ctx, targetHeight)
if err != nil {
return fmt.Errorf("get block at height %d: %w", targetHeight, err)
}

c.logger.Info().
Uint64("target_height", targetHeight).
Str("block_hash", blockHash.Hex()).
Msg("rolling back execution layer via forkchoice update")

// Reset head, safe, and finalized to target block
// This forces the EL to reorg its canonical chain to the target height
// NOTE(review): the cached fields below are overwritten before the forkchoice
// update is sent and are not restored if doForkchoiceUpdate returns an error —
// confirm that leaving them at the target block on failure is intended.
c.mu.Lock()
c.currentHeadBlockHash = blockHash
c.currentHeadHeight = targetHeight
c.currentSafeBlockHash = blockHash
c.currentFinalizedBlockHash = blockHash
args := engine.ForkchoiceStateV1{
HeadBlockHash: blockHash,
SafeBlockHash: blockHash,
FinalizedBlockHash: blockHash,
}
// The lock is released before the forkchoice call, so c.mu is not held during it.
c.mu.Unlock()

// "Rollback" is passed as the operation argument of doForkchoiceUpdate.
if err := c.doForkchoiceUpdate(ctx, args, "Rollback"); err != nil {
return fmt.Errorf("forkchoice update for rollback failed: %w", err)
}

c.logger.Info().
Uint64("target_height", targetHeight).
Msg("execution layer rollback completed")

return nil
}

// decodeSecret decodes a hex-encoded JWT secret string into a byte slice.
func decodeSecret(jwtSecret string) ([]byte, error) {
secret, err := hex.DecodeString(strings.TrimPrefix(jwtSecret, "0x"))
Expand Down
Loading