Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 9 additions & 10 deletions epoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func NewRound(block VerifiedBlock) *Round {

type EpochConfig struct {
MaxProposalWait time.Duration
MaxRoundWindow uint64
MaxRebroadcastWait time.Duration
FinalizeRebroadcastTimeout time.Duration
QCDeserializer QCDeserializer
Expand Down Expand Up @@ -96,7 +97,6 @@ type Epoch struct {
oldestNotFinalizedNotarization NotarizationTime
futureMessages messagesFromNode
round uint64 // The current round we notarize
maxRoundWindow uint64
monitor *Monitor
haltedError error
cancelWaitForBlockNotarization context.CancelFunc
Expand Down Expand Up @@ -195,11 +195,10 @@ func (e *Epoch) init() error {
e.timedOutRounds = make(map[uint16]uint64, len(e.nodes))
e.redeemedRounds = make(map[uint16]uint64, len(e.nodes))
e.rounds = make(map[uint64]*Round)
e.maxRoundWindow = DefaultMaxRoundWindow
e.emptyVotes = make(map[uint64]*EmptyVoteSet)
e.eligibleNodeIDs = make(map[string]struct{}, len(e.nodes))
e.futureMessages = make(messagesFromNode, len(e.nodes))
e.replicationState = NewReplicationState(e.Logger, e.Comm, e.ID, e.maxRoundWindow, e.ReplicationEnabled, e.StartTime, &e.lock)
e.replicationState = NewReplicationState(e.Logger, e.Comm, e.ID, e.MaxRoundWindow, e.ReplicationEnabled, e.StartTime, &e.lock)
e.timeoutHandler = NewTimeoutHandler(e.Logger, "emptyVoteRebroadcast", e.StartTime, e.MaxRebroadcastWait, e.emptyVoteTimeoutTaskRunner)

for _, node := range e.nodes {
Expand Down Expand Up @@ -2563,7 +2562,7 @@ func (e *Epoch) voteOnBlock(block VerifiedBlock) (Vote, error) {
// deletesRounds deletes all the rounds before [round] in the rounds map.
func (e *Epoch) deleteRounds(round uint64) {
for i, r := range e.rounds {
if r.num+e.maxRoundWindow < round {
if r.num+e.MaxRoundWindow < round {
delete(e.rounds, i)
}
}
Expand Down Expand Up @@ -2753,12 +2752,12 @@ func (e *Epoch) handleReplicationRequest(req *ReplicationRequest, from NodeID) e
}
response := &VerifiedReplicationResponse{}

if len(req.Seqs) > int(e.maxRoundWindow) || len(req.Rounds) > int(e.maxRoundWindow) {
if len(req.Seqs) > int(e.MaxRoundWindow) || len(req.Rounds) > int(e.MaxRoundWindow) {
e.Logger.Info("Replication request exceeds maximum allowed seqs and rounds",
zap.Stringer("from", from),
zap.Int("num seqs", len(req.Seqs)),
zap.Int("num rounds", len(req.Rounds)),
zap.Uint64("max round window", e.maxRoundWindow))
zap.Uint64("max round window", e.MaxRoundWindow))
return nil
}

Expand Down Expand Up @@ -2928,14 +2927,14 @@ func (e *Epoch) handleReplicationResponse(resp *ReplicationResponse, from NodeID
nextSeqToCommit := e.nextSeqToCommit()

for _, data := range resp.Data {
if data.Finalization != nil && data.GetSequence() > nextSeqToCommit+e.maxRoundWindow {
if data.Finalization != nil && data.GetSequence() > nextSeqToCommit+e.MaxRoundWindow {
e.Logger.Debug("Received quorum round for a seq that is too far ahead", zap.Uint64("seq", data.GetSequence()))
// we are too far behind, we should ignore this message
continue
}

// We may be really far behind, so we shouldn't process sequences unless they are the nextSequenceToCommit
if data.GetRound() > e.round+e.maxRoundWindow && data.GetSequence() != nextSeqToCommit {
if data.GetRound() > e.round+e.MaxRoundWindow && data.GetSequence() != nextSeqToCommit {
e.Logger.Debug("Received quorum round for a round that is too far ahead", zap.Uint64("round", data.GetRound()))
// we are too far behind, we should ignore this message
continue
Expand Down Expand Up @@ -3150,12 +3149,12 @@ func (e *Epoch) getLatestVerifiedQuorumRound() *VerifiedQuorumRound {

// isRoundTooFarAhead returns true if [round] is more than `maxRoundWindow` rounds ahead of the current round.
func (e *Epoch) isRoundTooFarAhead(round uint64) bool {
return round > e.round+e.maxRoundWindow
return round > e.round+e.MaxRoundWindow
}

// isWithinMaxRoundWindow checks if [round] is within `maxRoundWindow` rounds ahead of the current round.
func (e *Epoch) isWithinMaxRoundWindow(round uint64) bool {
return e.round < round && round-e.round < e.maxRoundWindow
return e.round < round && round-e.round < e.MaxRoundWindow
}

func (e *Epoch) retrieveBlockOrHalt(seq uint64) (VerifiedBlock, Finalization, bool) {
Expand Down
61 changes: 61 additions & 0 deletions long_running_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package simplex_test

import (
"math"
"testing"

"github.com/ava-labs/simplex/testutil"
)

func TestLongRunningSimple(t *testing.T) {
net := testutil.NewDefaultLongRunningNetwork(t, 5)

net.StartInstances()
net.WaitForNodesToEnterRound(40)
net.StopAndAssert(false)
}

func TestLongRunningReplication(t *testing.T) {
net := testutil.NewDefaultLongRunningNetwork(t, 10)
for _, instance := range net.Instances {
instance.SilenceExceptKeywords("Received replication response", "Resending replication requests for missing rounds/sequences")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand why we're doing this. There is nothing in the test that I see that requires intercepting the log, so why do we care that it's printed?

Can you explain?

}
net.StartInstances()

net.WaitForNodesToEnterRound(40)
net.NoMoreBlocks()
net.DisconnectNodes(2)
net.ContinueBlocks()
net.WaitForNodesToEnterRound(70, 1, 3, 4, 5, 6)
net.DisconnectNodes(4)
net.WaitForNodesToEnterRound(90, 1, 3, 5, 6, 7, 8, 9)
net.ConnectNodes(2, 4)
net.WaitForNodesToEnterRound(150)
net.StopAndAssert(false)
}

func TestLongRunningCrash(t *testing.T) {
net := testutil.NewDefaultLongRunningNetwork(t, 10)
for i, instance := range net.Instances {
if i == 3 {
instance.SilenceExceptKeywords("WAL")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand why we're silencing the logs. Is that to make the test less flaky or something?

And why aren't we silencing the WAL pattern here?

continue
}
instance.Silence()
}

net.StartInstances()
net.WaitForNodesToEnterRound(30)
net.CrashNodes(3)
crashedNodeLatestBlock := net.Instances[3].Storage.NumBlocks()

net.WaitForNodesToEnterRound(80, 1, 2, 4, 5, 6, 7, 8, 9)
net.RestartNodes(3)

waitForRound := math.Max(float64(crashedNodeLatestBlock*2), 150)
net.WaitForNodesToEnterRound(uint64(waitForRound))
net.StopAndAssert(false)
}
4 changes: 4 additions & 0 deletions testutil/block_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,7 @@ func (t *testControlledBlockBuilder) BuildBlock(ctx context.Context, metadata si
}
return t.TestBlockBuilder.BuildBlock(ctx, metadata, blacklist)
}

func (t *testControlledBlockBuilder) ShouldBlockBeBuilt() bool {
return len(t.BlockShouldBeBuilt) > 0
}
56 changes: 56 additions & 0 deletions testutil/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package testutil

import (
"fmt"
"os"
"strings"
"testing"
Expand All @@ -16,6 +17,25 @@ import (
type TestLogger struct {
*zap.Logger
traceVerboseLogger *zap.Logger
panicOnError bool
panicOnWarn bool
}

// keywordFilterCore is a zapcore.Core wrapper that only logs entries whose
// message contains at least one of the configured keywords.
type keywordFilterCore struct {
zapcore.Core
keywords []string
}

func (k keywordFilterCore) Check(ent zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry {
for _, kw := range k.keywords {
if strings.Contains(ent.Message, kw) {
return k.Core.Check(ent, ce)
}
}
// If no keyword matches, drop the entry by returning ce unchanged.
return ce
}

func (t *TestLogger) Intercept(hook func(entry zapcore.Entry) error) {
Expand All @@ -30,6 +50,26 @@ func (t *TestLogger) Silence() {
t.traceVerboseLogger = zap.New(core, zap.AddCaller(), zap.IncreaseLevel(atomicLevel))
}

// SilenceExceptKeywords silences all logs EXCEPT those whose message contains
// at least one of the provided keywords.
func (t *TestLogger) SilenceExceptKeywords(keywords ...string) {
core := t.Logger.Core()
filteredCore := keywordFilterCore{
Core: core,
keywords: keywords,
}
t.Logger = zap.New(filteredCore, zap.AddCaller())
t.traceVerboseLogger = zap.New(filteredCore, zap.AddCaller())
}

func (t *TestLogger) SetPanicOnError(panicOnError bool) {
t.panicOnError = panicOnError
}

func (t *TestLogger) SetPanicOnWarn(panicOnWarn bool) {
t.panicOnWarn = panicOnWarn
}

func (tl *TestLogger) Trace(msg string, fields ...zap.Field) {
tl.traceVerboseLogger.Log(zapcore.DebugLevel, msg, fields...)
}
Expand All @@ -38,6 +78,22 @@ func (tl *TestLogger) Verbo(msg string, fields ...zap.Field) {
tl.traceVerboseLogger.Log(zapcore.DebugLevel, msg, fields...)
}

func (tl *TestLogger) Warn(msg string, fields ...zap.Field) {
if tl.panicOnWarn {
panicMsg := fmt.Sprintf("WARN: %s", msg)
panic(panicMsg)
}
tl.Logger.Warn(msg, fields...)
}

func (tl *TestLogger) Error(msg string, fields ...zap.Field) {
if tl.panicOnError {
panicMsg := fmt.Sprintf("ERROR: %s", msg)
panic(panicMsg)
}
tl.Logger.Error(msg, fields...)
}

func MakeLogger(t *testing.T, node ...int) *TestLogger {
defaultEncoderConfig := zapcore.EncoderConfig{
TimeKey: "timestamp",
Expand Down
Loading