Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/release-notes/change-log.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

### Fixed

- Server: per-block execution timeouts (`--substreams-block-execution-timeout`) are no longer silently swallowed when a WASM host-function panic (e.g. wasmtime) coincides with the deadline. Previously, `recoverExecutionPanic` would return `nil` instead of `CodeDeadlineExceeded`, causing the offending block to be skipped and the stream to complete successfully.
- CI: Docker image login, build and push are now skipped for fork PRs; image is still built (without push) to validate the Dockerfile.

## v1.18.5
Expand Down
18 changes: 11 additions & 7 deletions pipeline/process_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,16 +663,18 @@ func recoverExecutionPanic(ctx context.Context, executionError error, recovered
recoveredErr = fmt.Errorf("%v", recovered)
}

// If the context deadline was exceeded, return a deadline-exceeded RPC error.
// This must be checked before context.Canceled, because the catch-all `ctx.Err() != nil`
// below would otherwise swallow deadline-exceeded panics and silently drop the block.
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
return connect.NewError(connect.CodeDeadlineExceeded, fmt.Errorf("execution timed out at block %s: %w", blockRef, recoveredErr))
}

// If recovering from context cancel, simply return the execution error as-is
if ctx.Err() != nil || errors.Is(recoveredErr, context.Canceled) {
return executionError
}

// If the context deadline was exceeded, return a deadline exceeded error RPC error
if ctx.Err() == context.DeadlineExceeded {
return connect.NewError(connect.CodeDeadlineExceeded, fmt.Errorf("execution timed out at block %s: %w", blockRef, recoveredErr))
}

// Otherwise, log the panic and return a generic error
if PrintStack {
debug.PrintStack()
Expand Down Expand Up @@ -721,8 +723,10 @@ func (p *Pipeline) execute(ctx context.Context, executor exec.ModuleExecutor, ex
return
}

// Move the panic up so it's handled at a higher level by those who call execute()
panic(fmt.Errorf("unknown error: %s", r))
// Move the panic up so it's handled at a higher level by those who call execute().
// Wrap with %w (not %s) to preserve the error chain so that callers can
// errors.Is(err, context.DeadlineExceeded) on the result.
panic(fmt.Errorf("unknown error: %w", recoveredErr))
}
}()

Expand Down
99 changes: 99 additions & 0 deletions pipeline/process_block_recover_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package pipeline

import (
"context"
"errors"
"fmt"
"testing"
"time"

"connectrpc.com/connect"
"github.com/streamingfast/bstream"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// TestRecoverExecutionPanic_DeadlineExceeded guards against the silent-swallow
// regression: when the recovered panic value arrives together with an expired
// per-block execution deadline, recoverExecutionPanic must hand back a
// connect error carrying CodeDeadlineExceeded — never nil — so that the
// stream terminates instead of skipping the block.
func TestRecoverExecutionPanic_DeadlineExceeded(t *testing.T) {
	expiredCtx, release := context.WithTimeout(context.Background(), time.Millisecond)
	defer release()

	// Block until the timeout has actually fired so ctx.Err() is populated.
	<-expiredCtx.Done()
	require.ErrorIs(t, expiredCtx.Err(), context.DeadlineExceeded)

	ref := bstream.NewBlockRef("blk-id", 42)

	// Panic value shaped like a host-function failure that wraps the deadline error.
	hostCallErr := fmt.Errorf("timeout while doing eth_call: %w", context.DeadlineExceeded)
	panicValue := fmt.Errorf("running wasm extension %q: %w", "eth::eth_call", hostCallErr)

	result := recoverExecutionPanic(expiredCtx, nil, panicValue, ref)
	require.NotNil(t, result, "deadline-exceeded panic must not be silently swallowed")

	var connectErr *connect.Error
	require.ErrorAs(t, result, &connectErr, "result must be a connect.Error")
	assert.Equal(t, connect.CodeDeadlineExceeded, connectErr.Code())

	// The wrapped chain has to survive so callers can still inspect the cause.
	assert.ErrorIs(t, result, context.DeadlineExceeded)
	assert.Contains(t, result.Error(), "blk-id")
}

// TestRecoverExecutionPanic_ContextCanceled exercises the plain cancellation
// path (no deadline involved): with an already-canceled context, the recover
// helper is expected to return the caller-supplied executionError untouched
// rather than fabricate a new error.
func TestRecoverExecutionPanic_ContextCanceled(t *testing.T) {
	canceledCtx, cancelFn := context.WithCancel(context.Background())
	cancelFn()
	require.ErrorIs(t, canceledCtx.Err(), context.Canceled)

	ref := bstream.NewBlockRef("blk-id", 42)
	priorErr := errors.New("some prior error")
	panicValue := errors.New("panicked")

	result := recoverExecutionPanic(canceledCtx, priorErr, panicValue, ref)

	// Pointer identity: the very same error value must come back.
	assert.Same(t, priorErr, result, "canceled ctx should return executionError unchanged")
}

// TestRecoverExecutionPanic_RecoveredIsCanceled checks the situation in which
// the context is still live, yet the recovered panic value wraps
// context.Canceled. The expected outcome is the unchanged executionError.
func TestRecoverExecutionPanic_RecoveredIsCanceled(t *testing.T) {
	liveCtx := context.Background()
	ref := bstream.NewBlockRef("blk-id", 42)
	priorErr := errors.New("some prior error")

	// Cancellation is only visible through the wrapped error chain.
	panicValue := fmt.Errorf("wrapped: %w", context.Canceled)

	result := recoverExecutionPanic(liveCtx, priorErr, panicValue, ref)
	assert.Same(t, priorErr, result)
}

// TestRecoverExecutionPanic_GenericPanic covers the fallback branch: with a
// live context and a panic value unrelated to cancellation, the returned
// error must be non-nil, wrap the recovered value, and mention the block id.
func TestRecoverExecutionPanic_GenericPanic(t *testing.T) {
	liveCtx := context.Background()
	ref := bstream.NewBlockRef("blk-id", 42)
	panicValue := errors.New("nil pointer dereference")

	result := recoverExecutionPanic(liveCtx, nil, panicValue, ref)
	require.NotNil(t, result)

	assert.ErrorIs(t, result, panicValue)
	assert.Contains(t, result.Error(), "blk-id")
}

// TestRecoverExecutionPanic_NonErrorRecovered feeds a panic value that is not
// an `error` (a plain string) and expects a non-nil result whose message
// includes both the panic text and the block id.
func TestRecoverExecutionPanic_NonErrorRecovered(t *testing.T) {
	ref := bstream.NewBlockRef("blk-id", 42)

	result := recoverExecutionPanic(context.Background(), nil, "something broke", ref)
	require.NotNil(t, result)

	message := result.Error()
	assert.Contains(t, message, "something broke")
	assert.Contains(t, message, "blk-id")
}
Loading