Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions internal/consensus/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,10 +431,7 @@ func (cs *State) OnStart(ctx context.Context) error {

// we need the timeoutRoutine for replay so
// we don't block on the tick chan.
// NOTE: we will get a build up of garbage go routines
// firing on the tockChan until the receiveRoutine is started
// to deal with them (by that point, at most one will be valid)
cs.Spawn("timeoutTicker", cs.timeoutTicker.Run)
cs.SpawnCritical("timeoutTicker", cs.timeoutTicker.Run)

// We may have lost some votes if the process crashed; reload from the consensus
// log to catch up.
Expand Down
38 changes: 24 additions & 14 deletions internal/consensus/ticker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,6 @@ import (
"github.com/tendermint/tendermint/libs/utils/scope"
)

var (
tickTockBufferSize = 10
)

// TimeoutTicker is a timer that schedules timeouts
// conditional on the height/round/step in the timeoutInfo.
// The timeoutInfo.Duration may be non-positive.
Expand All @@ -36,7 +32,7 @@ func NewTimeoutTicker(logger log.Logger) TimeoutTicker {
tt := &timeoutTicker{
logger: logger,
tick: utils.NewAtomicWatch(utils.None[timeoutInfo]()),
tockChan: make(chan timeoutInfo, tickTockBufferSize),
tockChan: make(chan timeoutInfo),
}
return tt
}
Expand All @@ -60,19 +56,33 @@ func (t *timeoutTicker) ScheduleTimeout(newti timeoutInfo) {
// timers are interrupted and replaced by new ticks from later steps
// timeouts of 0 on the tickChan will be immediately relayed to the tockChan
func (t *timeoutTicker) Run(ctx context.Context) error {
tock := utils.NewAtomicWatch(utils.None[timeoutInfo]()) // last fired timeout
return scope.Run(ctx, func(ctx context.Context, s scope.Scope) error {
return t.tick.Iter(ctx, func(ctx context.Context, mti utils.Option[timeoutInfo]) error {
ti, ok := mti.Get()
s.Spawn(func() error {
// Task measuring timeouts.
return t.tick.Iter(ctx, func(ctx context.Context, mti utils.Option[timeoutInfo]) error {
ti, ok := mti.Get()
if !ok {
return nil
}
t.logger.Debug("Internal state machine timeout scheduled", "duration", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step)
if err := utils.Sleep(ctx, ti.Duration); err != nil {
return err
}
t.logger.Debug("Internal state machine timeout elapsed ", "duration", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step)
tock.Store(utils.Some(ti))
return nil
})
})
// Task reporting timeouts via channel.
// TODO(gprusak): it would be better to expose t.tock directly,
// however the receiving task doesn't support receiving from AtomicWatch yet.
return tock.Iter(ctx, func(ctx context.Context, mto utils.Option[timeoutInfo]) error {
to, ok := mto.Get()
if !ok {
return nil
}
t.logger.Debug("Internal state machine timeout scheduled", "duration", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step)
if err := utils.Sleep(ctx, ti.Duration); err != nil {
return err
}
t.logger.Debug("Internal state machine timeout elapsed ", "duration", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step)
s.Spawn(func() error { return utils.Send(ctx, t.tockChan, ti) })
return nil
return utils.Send(ctx, t.tockChan, to)
})
})
}
62 changes: 62 additions & 0 deletions internal/consensus/ticker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package consensus

import (
"context"
"fmt"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/utils"
"github.com/tendermint/tendermint/libs/utils/scope"
"testing"
"time"
)

// TestTicker checks that the timeout ticker eventually delivers the most
// recently scheduled timeout on its channel, even when several timeouts were
// scheduled while nothing was receiving from that channel.
func TestTicker(t *testing.T) {
	err := scope.Run(t.Context(), func(ctx context.Context, s scope.Scope) error {
		// Fail loudly if the logger cannot be built instead of running the
		// ticker with whatever value came back alongside the error.
		logger, err := log.NewDefaultLogger("plain", "debug")
		if err != nil {
			return fmt.Errorf("creating logger: %w", err)
		}
		ticker := NewTimeoutTicker(logger)
		ch := ticker.Chan()
		s.SpawnBg(func() error {
			err := ticker.Run(ctx)
			// The ticker is expected to run until the test's context ends;
			// returning earlier (even with a nil error) is a failure.
			if ctx.Err() == nil {
				return fmt.Errorf("ticker terminated with %v, before the test ended", err)
			}
			return utils.IgnoreCancel(err)
		})
		if got := len(ch); got > 0 {
			return fmt.Errorf("expected empty, got len=%v", got)
		}

		t.Log("Fill the channel.")
		h := int64(0)
		// Schedule one immediate timeout per buffer slot and wait for each to
		// land in the channel. With a zero-capacity channel this loop is a no-op.
		for h < int64(cap(ch)) {
			h++
			ticker.ScheduleTimeout(timeoutInfo{Height: h, Duration: 0})
			for len(ch) < int(h) {
				if err := utils.Sleep(ctx, 10*time.Millisecond); err != nil {
					return err
				}
			}
		}
		t.Log("Add a bunch of timeouts blindly.")
		for range 3 {
			h++
			ticker.ScheduleTimeout(timeoutInfo{Height: h, Duration: 0})
			if err := utils.Sleep(ctx, 100*time.Millisecond); err != nil {
				return err
			}
		}
		t.Log("Await the latest timeout")
		// Drain the channel until the timeout for the last scheduled height
		// shows up; earlier timeouts may or may not be delivered first.
		for {
			got, err := utils.Recv(ctx, ch)
			if err != nil {
				return err
			}
			if got.Height == h {
				return nil
			}
		}
	})
	if err != nil {
		t.Fatal(err)
	}
}
22 changes: 13 additions & 9 deletions internal/p2p/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,10 @@ func TestRouter_Channel_Error(t *testing.T) {
})
}

// waitUntilDone is a mock Run hook that blocks until the context.Context
// passed as the call's first argument is done.
func waitUntilDone(args mock.Arguments) {
	ctx := args.Get(0).(context.Context)
	<-ctx.Done()
}

func TestRouter_AcceptPeers(t *testing.T) {
testcases := map[string]struct {
peerInfo types.NodeInfo
Expand Down Expand Up @@ -380,7 +384,7 @@ func TestRouter_AcceptPeers(t *testing.T) {

mockTransport := &mocks.Transport{}
mockTransport.On("Accept", mock.Anything).Once().Return(mockConnection, nil)
mockTransport.On("Accept", mock.Anything).Maybe().Return(nil, context.Canceled)
mockTransport.On("Accept", mock.Anything).Maybe().Run(waitUntilDone).Return(nil, context.Canceled)
mockTransport.On("Run", mock.Anything).Return(nil)

// Set up and start the router.
Expand Down Expand Up @@ -436,7 +440,7 @@ func TestRouter_AcceptPeers_Errors(t *testing.T) {
// Set up a mock transport that returns context.Canceled once, which should prevent
// the router from calling Accept again.
mockTransport := &mocks.Transport{}
mockTransport.On("Accept", mock.Anything).Once().Return(nil, context.Canceled)
mockTransport.On("Accept", mock.Anything).Once().Run(waitUntilDone).Return(nil, context.Canceled)
mockTransport.On("Run", mock.Anything).Return(nil)

// Set up and start the router.
Expand Down Expand Up @@ -488,7 +492,7 @@ func TestRouter_AcceptPeers_HeadOfLineBlocking(t *testing.T) {
mockTransport.On("Accept", mock.Anything).Times(3).Run(func(_ mock.Arguments) {
acceptCh <- true
}).Return(mockConnection, nil)
mockTransport.On("Accept", mock.Anything).Once().Return(nil, context.Canceled)
mockTransport.On("Accept", mock.Anything).Once().Run(waitUntilDone).Return(nil, context.Canceled)
mockTransport.On("Run", mock.Anything).Return(nil)

// Set up and start the router.
Expand Down Expand Up @@ -570,7 +574,7 @@ func TestRouter_DialPeers(t *testing.T) {

mockTransport := &mocks.Transport{}
mockTransport.On("Run", mock.Anything).Return(nil)
mockTransport.On("Accept", mock.Anything).Maybe().Return(nil, context.Canceled)
mockTransport.On("Accept", mock.Anything).Maybe().Run(waitUntilDone).Return(nil, context.Canceled)
if tc.dialErr == nil {
mockTransport.On("Dial", mock.Anything, endpoint).Once().Return(mockConnection, nil)
// This handles the retry when a dialed connection gets closed after ReceiveMessage
Expand Down Expand Up @@ -649,7 +653,7 @@ func TestRouter_DialPeers_Parallel(t *testing.T) {

mockTransport := &mocks.Transport{}
mockTransport.On("Run", mock.Anything).Return(nil)
mockTransport.On("Accept", mock.Anything).Once().Return(nil, context.Canceled)
mockTransport.On("Accept", mock.Anything).Once().Run(waitUntilDone).Return(nil, context.Canceled)
for _, address := range []p2p.NodeAddress{a, b, c} {
endpoint := p2p.Endpoint{Protocol: address.Protocol, Path: string(address.NodeID)}
mockTransport.On("Dial", mock.Anything, endpoint).Run(func(_ mock.Arguments) {
Expand Down Expand Up @@ -736,7 +740,7 @@ func TestRouter_EvictPeers(t *testing.T) {

mockTransport := &mocks.Transport{}
mockTransport.On("Accept", mock.Anything).Once().Return(mockConnection, nil)
mockTransport.On("Accept", mock.Anything).Maybe().Return(nil, context.Canceled)
mockTransport.On("Accept", mock.Anything).Maybe().Run(waitUntilDone).Return(nil, context.Canceled)
mockTransport.On("Run", mock.Anything).Return(nil)

// Set up and start the router.
Expand Down Expand Up @@ -798,7 +802,7 @@ func TestRouter_ChannelCompatability(t *testing.T) {
mockTransport := &mocks.Transport{}
mockTransport.On("Run", mock.Anything).Return(nil)
mockTransport.On("Accept", mock.Anything).Once().Return(mockConnection, nil)
mockTransport.On("Accept", mock.Anything).Once().Return(nil, context.Canceled)
mockTransport.On("Accept", mock.Anything).Once().Run(waitUntilDone).Return(nil, context.Canceled)

// Set up and start the router.
peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}, p2p.NopMetrics())
Expand Down Expand Up @@ -847,7 +851,7 @@ func TestRouter_DontSendOnInvalidChannel(t *testing.T) {
mockTransport := &mocks.Transport{}
mockTransport.On("AddChannelDescriptors", mock.Anything).Return()
mockTransport.On("Accept", mock.Anything).Once().Return(mockConnection, nil)
mockTransport.On("Accept", mock.Anything).Maybe().Return(nil, context.Canceled)
mockTransport.On("Accept", mock.Anything).Maybe().Run(waitUntilDone).Return(nil, context.Canceled)
mockTransport.On("Run", mock.Anything).Return(nil)

// Set up and start the router.
Expand Down Expand Up @@ -912,7 +916,7 @@ func TestRouter_Channel_FilterByID(t *testing.T) {
mockTransport.On("AddChannelDescriptors", mock.Anything).Return()
mockTransport.On("String").Maybe().Return("mock")
mockTransport.On("Accept", mock.Anything).Once().Return(mockConnection, nil)
mockTransport.On("Accept", mock.Anything).Maybe().Return(nil, context.Canceled)
mockTransport.On("Accept", mock.Anything).Maybe().Run(waitUntilDone).Return(nil, context.Canceled)
mockTransport.On("Run", mock.Anything).Return(nil)

peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{}, p2p.NopMetrics())
Expand Down
11 changes: 9 additions & 2 deletions libs/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package service

import (
"context"
"errors"
"fmt"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/utils"
Expand Down Expand Up @@ -165,6 +166,10 @@ func (bs *BaseService) Spawn(name string, task func(ctx context.Context) error)
}()
}

// SpawnCritical spawns a critical task which should run until success OR as long as the service is running.
// It panics in any of the following cases:
// * task returns context.Canceled BEFORE the service is canceled.
// * task returns an error other than context.Canceled.
func (bs *BaseService) SpawnCritical(name string, task func(ctx context.Context) error) {
inner := bs.inner.Load()
if inner == nil {
Expand All @@ -174,8 +179,10 @@ func (bs *BaseService) SpawnCritical(name string, task func(ctx context.Context)
inner.wg.Add(1)
go func() {
defer inner.wg.Done()
if err := utils.IgnoreCancel(task(inner.ctx)); err != nil {
panic(fmt.Sprintf("critical task failed: name=%v, service=%v: %v", name, bs.name, err))
if err := task(inner.ctx); err != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Opinions coming up:
I am honestly against the entire pattern that tendermint seems to follow: the less-than-optimal abstractions like BaseService that only serve to convolute the logic, reduce readability and make it easy for footguns to materialise. This approach is an anti-pattern in Golang as far as I understand the general Go community sentiment. A lot of it seems to have stemmed from "Java Best Practices" that do not translate well to Go.

Go is supposed to be boring to read, simple if and elses.

I hope that at Sei we can move away from this kind of abstraction in favour of simple, boring code that puts readability first.

if !errors.Is(err, context.Canceled) || inner.ctx.Err() == nil {
panic(fmt.Sprintf("critical task failed: name=%v, service=%v: %v", name, bs.name, err))
}
}
}()
}
Expand Down