1 change: 0 additions & 1 deletion internal/blocksync/pool.go
@@ -241,7 +241,6 @@ func (pool *BlockPool) PeekTwoBlocks() (first, second *types.Block) {

// PopRequest pops the first block at pool.height.
// It must have been validated by the second Commit from PeekTwoBlocks.
// TODO(thane): (?) and its corresponding ExtendedCommit.
func (pool *BlockPool) PopRequest() {
pool.mtx.Lock()
defer pool.mtx.Unlock()
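Editorial note: the comment above captures the contract between these two methods — callers peek two blocks, validate the first against the commit carried by the second, and only then pop. The following is a sketch of that calling pattern, not code from this PR; the verify callback is a hypothetical stand-in for the reactor's actual commit verification, and the snippet assumes the surrounding blocksync package context.

// Sketch only: illustrates the PeekTwoBlocks/PopRequest contract documented above.
func popIfValidated(pool *BlockPool, verify func(*types.Block, *types.Commit) error) {
	first, second := pool.PeekTwoBlocks()
	if first == nil || second == nil {
		return // fewer than two buffered blocks; nothing to validate yet
	}
	// second.LastCommit is the commit for first, per the PopRequest comment.
	if err := verify(first, second.LastCommit); err != nil {
		return // do not pop a block that failed validation
	}
	pool.PopRequest()
}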
27 changes: 16 additions & 11 deletions internal/blocksync/reactor_test.go
@@ -61,9 +61,10 @@ func setup(
require.True(t, numNodes >= 1,
"must specify at least one block height (nodes)")

logger, _ := log.NewDefaultLogger("plain", "info")
rts := &reactorTestSuite{
logger: log.NewNopLogger().With("module", "block_sync", "testCase", t.Name()),
network: p2ptest.MakeNetwork(ctx, t, p2ptest.NetworkOptions{NumNodes: numNodes}),
logger: logger.With("module", "block_sync", "testCase", t.Name()),
network: p2ptest.MakeNetwork(t, p2ptest.NetworkOptions{NumNodes: numNodes}),
nodes: make([]types.NodeID, 0, numNodes),
reactors: make(map[types.NodeID]*Reactor, numNodes),
app: make(map[types.NodeID]abciclient.Client, numNodes),
@@ -72,11 +73,15 @@
peerUpdates: make(map[types.NodeID]*p2p.PeerUpdates, numNodes),
}

chDesc := &p2p.ChannelDescriptor{ID: BlockSyncChannel, MessageType: new(bcproto.Message)}
chDesc := &p2p.ChannelDescriptor{
ID: BlockSyncChannel,
MessageType: new(bcproto.Message),
RecvBufferCapacity: 32,
}
rts.blockSyncChannels = rts.network.MakeChannelsNoCleanup(t, chDesc)

i := 0
for nodeID := range rts.network.Nodes {
for _, nodeID := range rts.network.NodeIDs() {
rts.addNode(ctx, t, nodeID, genDoc, privVal, maxBlockHeights[i])
rts.reactors[nodeID].SetChannel(rts.blockSyncChannels[nodeID])
i++
@@ -183,7 +188,7 @@ func (rts *reactorTestSuite) addNode(

rts.peerChans[nodeID] = make(chan p2p.PeerUpdate)
rts.peerUpdates[nodeID] = p2p.NewPeerUpdates(rts.peerChans[nodeID], 1)
rts.network.Nodes[nodeID].PeerManager.Register(ctx, rts.peerUpdates[nodeID])
rts.network.Node(nodeID).PeerManager.Register(ctx, rts.peerUpdates[nodeID])

peerEvents := func(ctx context.Context) *p2p.PeerUpdates { return rts.peerUpdates[nodeID] }
restartChan := make(chan struct{})
@@ -195,7 +200,7 @@
t,
genDoc,
peerEvents,
rts.network.Nodes[nodeID].PeerManager,
rts.network.Node(nodeID).PeerManager,
restartChan,
config.DefaultSelfRemediationConfig(),
)
@@ -253,9 +258,9 @@ func makeNextBlock(ctx context.Context,
return block, blockID, partSet, seenCommit
}

func (rts *reactorTestSuite) start(ctx context.Context, t *testing.T) {
func (rts *reactorTestSuite) start(t *testing.T) {
t.Helper()
rts.network.Start(ctx, t)
rts.network.Start(t)
require.Len(t,
rts.network.RandomNode().PeerManager.Peers(),
len(rts.nodes)-1,
@@ -277,7 +282,7 @@ func TestReactor_AbruptDisconnect(t *testing.T) {

require.Equal(t, maxBlockHeight, rts.reactors[rts.nodes[0]].store.Height())

rts.start(ctx, t)
rts.start(t)

secondaryPool := rts.reactors[rts.nodes[1]].pool

@@ -298,7 +303,7 @@
Status: p2p.PeerStatusDown,
NodeID: rts.nodes[0],
}
rts.network.Nodes[rts.nodes[1]].PeerManager.Disconnected(ctx, rts.nodes[0])
rts.network.Node(rts.nodes[1]).PeerManager.Disconnected(ctx, rts.nodes[0])
}

func TestReactor_SyncTime(t *testing.T) {
@@ -314,7 +319,7 @@

rts := setup(ctx, t, genDoc, privVals[0], []int64{maxBlockHeight, 0})
require.Equal(t, maxBlockHeight, rts.reactors[rts.nodes[0]].store.Height())
rts.start(ctx, t)
rts.start(t)

require.Eventually(
t,
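Editorial note: the recurring change in this file is the p2ptest plumbing — MakeNetwork and Start no longer take a context, nodes are iterated via NodeIDs(), and individual nodes are reached through the Node(id) accessor rather than the Nodes map. Below is a condensed sketch of that wiring, not part of the PR; the package clause and imports are omitted because the module path is not shown in this diff, and the two-node count is arbitrary.

// Sketch only: minimal wiring using the p2ptest helpers as they appear in this diff.
func TestNetworkWiringSketch(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Network construction no longer threads a context through.
	network := p2ptest.MakeNetwork(t, p2ptest.NetworkOptions{NumNodes: 2})

	for _, nodeID := range network.NodeIDs() { // accessor instead of ranging over the Nodes map
		peerUpdates := p2p.NewPeerUpdates(make(chan p2p.PeerUpdate), 1)
		network.Node(nodeID).PeerManager.Register(ctx, peerUpdates) // Node(id) replaces Nodes[id]
	}

	network.Start(t) // Start drops the context argument as well
}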
22 changes: 12 additions & 10 deletions internal/consensus/invalid_test.go
@@ -21,23 +21,26 @@ import (
)

func TestReactorInvalidPrecommit(t *testing.T) {
t.Skip("test doesn't check anything useful")
ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second)
defer cancel()

config := configSetup(t)

const n = 2
const n = 4
states, cleanup := makeConsensusState(ctx, t,
config, n, "consensus_reactor_test",
newMockTickerFunc(true))
t.Cleanup(cleanup)

for i := 0; i < n; i++ {
for i := range n {
ticker := NewTimeoutTicker(states[i].logger)
states[i].SetTimeoutTicker(ticker)
}

rts := setup(ctx, t, n, states, 100) // buffer must be large enough to not deadlock
t.Logf("setup()")
rts := setup(ctx, t, n, states, 1) // buffer must be large enough to not deadlock
t.Logf("setup() done")

for _, reactor := range rts.reactors {
state := reactor.state.GetState()
@@ -58,16 +61,15 @@ func TestReactorInvalidPrecommit(t *testing.T) {
privVal := byzState.privValidator
byzState.doPrevote = func(ctx context.Context, height int64, round int32) {
defer close(signal)
invalidDoPrevoteFunc(ctx, t, height, round, byzState, byzReactor, rts.voteChannels[node.NodeID], privVal)
invalidDoPrevoteFunc(ctx, t, byzState, byzReactor, rts.voteChannels[node.NodeID], privVal)
}
byzState.mtx.Unlock()

// wait for a bunch of blocks
//
t.Log("wait for a bunch of blocks")
// TODO: Make this tighter by ensuring the halt happens by block 2.
var wg sync.WaitGroup

for i := 0; i < 10; i++ {
for range 10 {
for _, sub := range rts.subs {
wg.Add(1)

@@ -77,6 +79,7 @@
if ctx.Err() != nil {
return
}
t.Log("BLOCK")
if !assert.NoError(t, err) {
cancel() // cancel other subscribers on failure
}
@@ -104,8 +107,6 @@
func invalidDoPrevoteFunc(
ctx context.Context,
t *testing.T,
height int64,
round int32,
cs *State,
r *Reactor,
voteCh *p2p.Channel,
@@ -136,7 +137,8 @@ func invalidDoPrevoteFunc(
Type: tmproto.PrecommitType,
BlockID: types.BlockID{
Hash: blockHash,
PartSetHeader: types.PartSetHeader{Total: 1, Hash: tmrand.Bytes(32)}},
PartSetHeader: types.PartSetHeader{Total: 1, Hash: tmrand.Bytes(32)},
},
}

p := precommit.ToProto()
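Editorial note: this file also moves simple counting loops to Go 1.22's integer range form (for i := range n, for range 10). The self-contained program below illustrates the equivalence; it is independent of the tendermint code and is not part of the PR.

package main

import "fmt"

func main() {
	n := 4

	// Classic counting loop.
	for i := 0; i < n; i++ {
		fmt.Println("classic", i)
	}

	// Go 1.22+ integer range: same count, i still runs 0 through n-1.
	for i := range n {
		fmt.Println("range", i)
	}

	// When the index is unused, it can be dropped entirely.
	for range n {
		fmt.Println("tick")
	}
}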
7 changes: 4 additions & 3 deletions internal/consensus/reactor_test.go
@@ -74,7 +74,7 @@ func setup(
t.Helper()

rts := &reactorTestSuite{
network: p2ptest.MakeNetwork(ctx, t, p2ptest.NetworkOptions{NumNodes: numNodes}),
network: p2ptest.MakeNetwork(t, p2ptest.NetworkOptions{NumNodes: numNodes}),
states: make(map[types.NodeID]*State),
reactors: make(map[types.NodeID]*Reactor, numNodes),
subs: make(map[types.NodeID]eventbus.Subscription, numNodes),
@@ -87,7 +87,8 @@
rts.voteSetBitsChannels = rts.network.MakeChannelsNoCleanup(t, chDesc(VoteSetBitsChannel, size))

i := 0
for nodeID, node := range rts.network.Nodes {
for _, node := range rts.network.Nodes() {
nodeID := node.NodeID
state := states[i]

reactor := NewReactor(
@@ -139,7 +140,7 @@ func setup(
require.Len(t, rts.reactors, numNodes)

// start the in-memory network and connect all peers with each other
rts.network.Start(ctx, t)
rts.network.Start(t)

t.Cleanup(leaktest.Check(t))

39 changes: 20 additions & 19 deletions internal/evidence/reactor_test.go
@@ -56,7 +56,7 @@ func setup(ctx context.Context, t *testing.T, stateStores []sm.Store) *reactorTe
rts := &reactorTestSuite{
numStateStores: numStateStores,
logger: log.NewNopLogger().With("testCase", t.Name()),
network: p2ptest.MakeNetwork(ctx, t, p2ptest.NetworkOptions{NumNodes: numStateStores}),
network: p2ptest.MakeNetwork(t, p2ptest.NetworkOptions{NumNodes: numStateStores}),
reactors: make(map[types.NodeID]*evidence.Reactor, numStateStores),
pools: make(map[types.NodeID]*evidence.Pool, numStateStores),
peerUpdates: make(map[types.NodeID]*p2p.PeerUpdates, numStateStores),
@@ -74,7 +74,8 @@
idx := 0
evidenceTime := time.Date(2019, 1, 1, 0, 0, 0, 0, time.UTC)

for nodeID := range rts.network.Nodes {
for _, node := range rts.network.Nodes() {
nodeID := node.NodeID
logger := rts.logger.With("validator", idx)
evidenceDB := dbm.NewMemDB()
blockStore := &mocks.BlockStore{}
@@ -97,8 +98,8 @@
rts.peerChans[nodeID] = make(chan p2p.PeerUpdate)
pu := p2p.NewPeerUpdates(rts.peerChans[nodeID], 1)
rts.peerUpdates[nodeID] = pu
rts.network.Nodes[nodeID].PeerManager.Register(ctx, pu)
rts.nodes = append(rts.nodes, rts.network.Nodes[nodeID])
node.PeerManager.Register(ctx, pu)
rts.nodes = append(rts.nodes, node)

rts.reactors[nodeID] = evidence.NewReactor(
logger,
@@ -127,8 +128,8 @@ func setup(ctx context.Context, t *testing.T, stateStores []sm.Store) *reactorTe
return rts
}

func (rts *reactorTestSuite) start(ctx context.Context, t *testing.T) {
rts.network.Start(ctx, t)
func (rts *reactorTestSuite) start(t *testing.T) {
rts.network.Start(t)
require.Len(t,
rts.network.RandomNode().PeerManager.Peers(),
rts.numStateStores-1,
@@ -215,7 +216,7 @@ func createEvidenceList(

evList := make([]types.Evidence, numEvidence)

for i := 0; i < numEvidence; i++ {
for i := range numEvidence {
ev, err := types.NewMockDuplicateVoteEvidenceWithValidator(
ctx,
int64(i+1),
@@ -251,7 +252,7 @@ func TestReactorMultiDisconnect(t *testing.T) {

require.Equal(t, primary.PeerManager.Status(secondary.NodeID), p2p.PeerStatusDown)

rts.start(ctx, t)
rts.start(t)

require.Equal(t, primary.PeerManager.Status(secondary.NodeID), p2p.PeerStatusUp)
// Ensure "disconnecting" the secondary peer from the primary more than once
Expand Down Expand Up @@ -283,13 +284,13 @@ func TestReactorBroadcastEvidence(t *testing.T) {
// We need all validators saved for heights at least as high as we have
// evidence for.
height := int64(numEvidence) + 10
for i := 0; i < numPeers; i++ {
for i := range numPeers {
stateDBs[i] = initializeValidatorState(ctx, t, val, height)
}

rts := setup(ctx, t, stateDBs)

rts.start(ctx, t)
rts.start(t)

// Create a series of fixtures where each suite contains a reactor and
// evidence pool. In addition, we mark a primary suite and the rest are
@@ -299,13 +300,13 @@
primary := rts.network.RandomNode()
secondaries := make([]*p2ptest.Node, 0, len(rts.network.NodeIDs())-1)
secondaryIDs := make([]types.NodeID, 0, cap(secondaries))
for id := range rts.network.Nodes {
if id == primary.NodeID {
for _, node := range rts.network.Nodes() {
if node.NodeID == primary.NodeID {
continue
}

secondaries = append(secondaries, rts.network.Nodes[id])
secondaryIDs = append(secondaryIDs, id)
secondaries = append(secondaries, node)
secondaryIDs = append(secondaryIDs, node.NodeID)
}

evList := createEvidenceList(ctx, t, rts.pools[primary.NodeID], val, numEvidence)
@@ -345,7 +346,7 @@ func TestReactorBroadcastEvidence_Lagging(t *testing.T) {
stateDB2 := initializeValidatorState(ctx, t, val, height2)

rts := setup(ctx, t, []sm.Store{stateDB1, stateDB2})
rts.start(ctx, t)
rts.start(t)

primary := rts.nodes[0]
secondary := rts.nodes[1]
@@ -393,7 +394,7 @@ func TestReactorBroadcastEvidence_Pending(t *testing.T) {
// the secondary should have half the evidence as pending
require.Equal(t, numEvidence/2, int(rts.pools[secondary.NodeID].Size()))

rts.start(ctx, t)
rts.start(t)

// The secondary reactor should have received all the evidence ignoring the
// already pending evidence.
@@ -446,7 +447,7 @@ func TestReactorBroadcastEvidence_Committed(t *testing.T) {
require.Equal(t, 0, int(rts.pools[secondary.NodeID].Size()))

// start the network and ensure it's configured
rts.start(ctx, t)
rts.start(t)

// The secondary reactor should have received all the evidence ignoring the
// already committed evidence.
@@ -471,12 +472,12 @@ func TestReactorBroadcastEvidence_FullyConnected(t *testing.T) {
// We need all validators saved for heights at least as high as we have
// evidence for.
height := int64(numEvidence) + 10
for i := 0; i < numPeers; i++ {
for i := range numPeers {
stateDBs[i] = initializeValidatorState(ctx, t, val, height)
}

rts := setup(ctx, t, stateDBs)
rts.start(ctx, t)
rts.start(t)

evList := createEvidenceList(ctx, t, rts.pools[rts.network.RandomNode().NodeID], val, numEvidence)
