Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions block/internal/reaping/reaper.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
coreexecutor "github.com/evstack/ev-node/core/execution"
coresequencer "github.com/evstack/ev-node/core/sequencer"
"github.com/evstack/ev-node/pkg/genesis"
"github.com/evstack/ev-node/pkg/sequencers/solo"
)

const (
Expand Down Expand Up @@ -193,6 +194,17 @@ func (r *Reaper) drainMempool(cleanupCh <-chan time.Time) (bool, error) {
Id: []byte(r.chainID),
Batch: &coresequencer.Batch{Transactions: newTxs},
})
if errors.Is(err, solo.ErrQueueFull) {
// Sequencer queue is full — backpressure signal. Mark the
// batch as "seen" so we don't waste cycles re-hashing the
// same dropped txs every reaper tick, and surface the drop
// as a warning rather than tearing down the daemon. The
// loadgen sees lower acceptance via /tx flow control once
// the executor's own mempool fills up.
r.cache.SetTxsSeen(newHashes)
r.logger.Warn().Int("dropped", len(newTxs)).Msg("sequencer queue full, dropping txs (backpressure)")
break
}
if err != nil {
return totalSubmitted > 0, fmt.Errorf("failed to submit txs to sequencer: %w", err)
}
Expand Down
66 changes: 46 additions & 20 deletions block/internal/submitting/submitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,18 @@ type Submitter struct {
// DA state
daIncludedHeight *atomic.Uint64

// Submission state to prevent concurrent submissions
headerSubmissionMtx sync.Mutex
dataSubmissionMtx sync.Mutex
// Submission concurrency: each semaphore is a buffered channel
// sized to MaxPendingHeadersAndData. A zero config value (unbounded
// pending) is clamped to cap 1 — one in-flight submission per type —
// so callers that opt out of the pending cap don't get
// unbounded fan-out. Tickets are acquired non-blocking via
// `select` and released by the goroutine that started the
// submission. Replaces the previous single-flight Mutex which
// pinned data-upload throughput at the latency of a single
// gRPC round-trip — under sustained load that capped DA at
// ~20 MB/s even though Fibre's per-blob upload took ≤1.5 s.
headerSubmissionSem chan struct{}
dataSubmissionSem chan struct{}

// Batching strategy state
lastHeaderSubmit atomic.Int64 // stores Unix nanoseconds
Expand Down Expand Up @@ -95,20 +104,31 @@ func NewSubmitter(
strategy = NewTimeBasedStrategy(config.DA.BlockTime.Duration, 0, 1)
}

// Pool size = pending-cap. Each pending blob gets up to one
// in-flight submission; if the cap is 0 (unbounded pending) we
// keep at least one slot so we don't reintroduce single-flight
// behavior accidentally.
poolSize := int(config.Node.MaxPendingHeadersAndData)
if poolSize <= 0 {
poolSize = 1
}

submitter := &Submitter{
store: store,
exec: exec,
cache: cache,
metrics: metrics,
config: config,
genesis: genesis,
daSubmitter: daSubmitter,
sequencer: sequencer,
signer: signer,
daIncludedHeight: &atomic.Uint64{},
batchingStrategy: strategy,
errorCh: errorCh,
logger: submitterLogger,
store: store,
exec: exec,
cache: cache,
metrics: metrics,
config: config,
genesis: genesis,
daSubmitter: daSubmitter,
sequencer: sequencer,
signer: signer,
daIncludedHeight: &atomic.Uint64{},
batchingStrategy: strategy,
errorCh: errorCh,
logger: submitterLogger,
headerSubmissionSem: make(chan struct{}, poolSize),
dataSubmissionSem: make(chan struct{}, poolSize),
}

now := time.Now().UnixNano()
Expand Down Expand Up @@ -194,12 +214,13 @@ func (s *Submitter) daSubmissionLoop() {

// For strategy decision, we need to estimate the size
// We'll fetch headers to check, but only submit if strategy approves
if s.headerSubmissionMtx.TryLock() {
select {
case s.headerSubmissionSem <- struct{}{}:
s.logger.Debug().Time("t", time.Now()).Uint64("headers", headersNb).Msg("Header submission in progress")
s.wg.Add(1)
go func() {
defer func() {
s.headerSubmissionMtx.Unlock()
<-s.headerSubmissionSem
s.logger.Debug().Time("t", time.Now()).Uint64("headers", headersNb).Msg("Header submission completed")
s.wg.Done()
}()
Expand Down Expand Up @@ -266,6 +287,8 @@ func (s *Submitter) daSubmissionLoop() {
s.logger.Error().Err(err).Msg("failed to enqueue header submission")
}
}()
default:
// All header workers busy; try again on the next tick.
}
}

Expand All @@ -274,12 +297,13 @@ func (s *Submitter) daSubmissionLoop() {
if dataNb > 0 {
lastSubmitNanos := s.lastDataSubmit.Load()
timeSinceLastSubmit := time.Since(time.Unix(0, lastSubmitNanos))
if s.dataSubmissionMtx.TryLock() {
select {
case s.dataSubmissionSem <- struct{}{}:
s.logger.Debug().Time("t", time.Now()).Uint64("data", dataNb).Msg("Data submission in progress")
s.wg.Add(1)
go func() {
defer func() {
s.dataSubmissionMtx.Unlock()
<-s.dataSubmissionSem
s.logger.Debug().Time("t", time.Now()).Uint64("data", dataNb).Msg("Data submission completed")
s.wg.Done()
}()
Expand Down Expand Up @@ -346,6 +370,8 @@ func (s *Submitter) daSubmissionLoop() {
s.logger.Error().Err(err).Msg("failed to enqueue data submission")
}
}()
default:
// All data workers busy; try again on the next tick.
}
}

Expand Down
24 changes: 13 additions & 11 deletions block/internal/submitting/submitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,17 +361,19 @@ func TestSubmitter_daSubmissionLoop(t *testing.T) {
require.NoError(t, err)

s := &Submitter{
store: st,
exec: exec,
cache: cm,
metrics: metrics,
config: cfg,
genesis: genesis.Genesis{},
daSubmitter: fakeDA,
signer: &fakeSigner{},
daIncludedHeight: &atomic.Uint64{},
batchingStrategy: batchingStrategy,
logger: zerolog.Nop(),
store: st,
exec: exec,
cache: cm,
metrics: metrics,
config: cfg,
genesis: genesis.Genesis{},
daSubmitter: fakeDA,
signer: &fakeSigner{},
daIncludedHeight: &atomic.Uint64{},
batchingStrategy: batchingStrategy,
logger: zerolog.Nop(),
headerSubmissionSem: make(chan struct{}, 1),
dataSubmissionSem: make(chan struct{}, 1),
}

// Set last submit times far in past so strategy allows submission
Expand Down
7 changes: 6 additions & 1 deletion pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,12 @@ func (c *Config) ApplyFiberDefaults() {
}

c.DA.BlockTime = DurationWrapper{Duration: 1 * time.Second}
c.Node.MaxPendingHeadersAndData = 50
// Tighter pending cap (was 50). At 50, a Fibre upload stall lets the
// submitter accumulate 50 × ~32 MiB blob copies + their per-validator
// retry buffers; under load that exceeded c6in.8xlarge's 64 GiB and
// OOM-killed evnode at 63.8 GiB. 10 keeps the in-flight footprint
// bounded while still letting healthy uploads pipeline.
c.Node.MaxPendingHeadersAndData = 10
}

// GetNamespace returns the namespace for header submissions.
Expand Down
67 changes: 64 additions & 3 deletions pkg/sequencers/solo/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,15 @@ import (
coresequencer "github.com/evstack/ev-node/core/sequencer"
)

var ErrInvalidID = errors.New("invalid chain id")
var (
	// ErrInvalidID is returned when a request's chain ID does not
	// match the ID this sequencer was configured with.
	ErrInvalidID = errors.New("invalid chain id")
	// ErrQueueFull is returned from SubmitBatchTxs when admitting the
	// incoming batch would push the in-memory queue past its byte cap
	// (see SetMaxQueueBytes). Callers should treat this as transient
	// backpressure (drop or retry); the reaper bridging executor
	// mempool → sequencer matches it via errors.Is and downgrades
	// the failure to a warning instead of tearing down the node.
	ErrQueueFull = errors.New("sequencer queue full")
)

var (
emptyBatch = &coresequencer.Batch{}
Expand All @@ -27,15 +35,26 @@ var _ coresequencer.Sequencer = (*SoloSequencer)(nil)
// SoloSequencer is a single-leader sequencer without forced inclusion
// support. It maintains a simple in-memory queue of mempool transactions and
// produces batches on demand.
//
// The queue can be bounded in bytes via SetMaxQueueBytes. A bound is
// strongly recommended in any high-throughput configuration: under
// sustained ingest above the block-production drain rate the queue
// otherwise grows monotonically until OOM. With a bound set,
// SubmitBatchTxs rejects wholesale any batch that would overflow the
// cap and returns ErrQueueFull (no partial admission), so callers can
// surface backpressure (e.g. via HTTP 503) instead of silently
// retaining bytes.
type SoloSequencer struct {
logger zerolog.Logger
id []byte
executor execution.Executor

daHeight atomic.Uint64

mu sync.Mutex
queue [][]byte
mu sync.Mutex
queue [][]byte
queueBytes uint64
maxQueueBytes uint64 // 0 = unbounded (legacy default)
}

func NewSoloSequencer(
Expand All @@ -51,6 +70,16 @@ func NewSoloSequencer(
}
}

// SetMaxQueueBytes sets a byte cap on the sequencer's in-memory tx
// queue. With a cap in place, SubmitBatchTxs is all-or-nothing: a
// batch that would push the queue past the cap is rejected wholesale
// with ErrQueueFull and no part of it is admitted, so callers can
// simply retry the same batch later once the queue drains. A zero
// value disables the cap. Intended to be called once at startup.
func (s *SoloSequencer) SetMaxQueueBytes(n uint64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.maxQueueBytes = n
}

// isValid reports whether the supplied chain ID matches the ID this
// sequencer was configured with.
func (s *SoloSequencer) isValid(id []byte) bool {
	return bytes.Equal(id, s.id)
}
Expand All @@ -67,7 +96,30 @@ func (s *SoloSequencer) SubmitBatchTxs(ctx context.Context, req coresequencer.Su
s.mu.Lock()
defer s.mu.Unlock()

if s.maxQueueBytes == 0 {
// Unbounded path (legacy). Suitable for tests and small
// deployments; in production use SetMaxQueueBytes.
s.queue = append(s.queue, req.Batch.Transactions...)
return submitBatchResp, nil
}

// All-or-nothing: if the whole incoming batch doesn't fit, reject
// it untouched. Partial admission would force the caller (e.g.
// the reaper bridging executor mempool → sequencer) to reason
// about which prefix was admitted and re-feed only the suffix on
// retry, which it doesn't currently do — leading to duplicate-tx
// resubmission on each retry. Rejecting the whole batch lets the
// reaper just retry with the same batch later when the queue has
// drained.
var batchBytes uint64
for _, tx := range req.Batch.Transactions {
batchBytes += uint64(len(tx))
}
if s.queueBytes+batchBytes > s.maxQueueBytes {
return submitBatchResp, ErrQueueFull
}
s.queue = append(s.queue, req.Batch.Transactions...)
s.queueBytes += batchBytes
return submitBatchResp, nil
}

Expand All @@ -79,6 +131,7 @@ func (s *SoloSequencer) GetNextBatch(ctx context.Context, req coresequencer.GetN
s.mu.Lock()
txs := s.queue
s.queue = nil
s.queueBytes = 0
s.mu.Unlock()

if len(txs) == 0 {
Expand Down Expand Up @@ -122,6 +175,14 @@ func (s *SoloSequencer) GetNextBatch(ctx context.Context, req coresequencer.GetN
if len(postponedTxs) > 0 {
s.mu.Lock()
s.queue = append(postponedTxs, s.queue...)
// Postponed txs were already in the queue's byte count when
// SubmitBatchTxs admitted them. We zeroed queueBytes on drain
// above, so re-queuing requires re-counting whatever survived.
var bytes uint64
for _, tx := range postponedTxs {
bytes += uint64(len(tx))
}
s.queueBytes += bytes
s.mu.Unlock()
}

Expand Down
Loading
Loading