Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions eth/bft/bft_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,9 @@ func (b *Bfter) Vote(peer string, vote *types.Vote) error {
}

if verified {
b.broadcastCh <- vote
if !b.enqueueForBroadcast(vote) {
return nil
}
err = b.consensus.voteHandler(b.blockChainReader, vote)
if err != nil {
if _, ok := err.(*utils.ErrIncomingMessageRoundTooFarFromCurrentRound); ok {
Expand Down Expand Up @@ -129,7 +131,9 @@ func (b *Bfter) Timeout(peer string, timeout *types.Timeout) error {
log.Debug("[Timeout] Received Timeout", "gap", gapNum, "hash", timeout.Hash().Hex(), "round", timeout.Round, "signer", timeout.GetSigner().Hex()) //get signer after verifyTimeout

if verified {
b.broadcastCh <- timeout
if !b.enqueueForBroadcast(timeout) {
return nil
}
err = b.consensus.timeoutHandler(b.blockChainReader, timeout)
if err != nil {
if _, ok := err.(*utils.ErrIncomingMessageRoundNotEqualCurrentRound); ok {
Expand Down Expand Up @@ -170,7 +174,9 @@ func (b *Bfter) SyncInfo(peer string, syncInfo *types.SyncInfo) error {

// Process only if verified and qualified
if verified {
b.broadcastCh <- syncInfo
if !b.enqueueForBroadcast(syncInfo) {
return nil
}
err = b.consensus.syncInfoHandler(b.blockChainReader, syncInfo)
if err != nil {
log.Error("[SyncInfo] handle BFT SyncInfo", "error", err)
Expand All @@ -180,6 +186,15 @@ func (b *Bfter) SyncInfo(peer string, syncInfo *types.SyncInfo) error {
return nil
}

func (b *Bfter) enqueueForBroadcast(msg interface{}) bool {
select {
case <-b.quit:
return false
case b.broadcastCh <- msg:
return true
}
}

// Start Bft receiver
func (b *Bfter) Start() {
go b.loop()
Expand Down
75 changes: 75 additions & 0 deletions eth/bft/bft_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,3 +393,78 @@ func TestTooFarSyncInfo(t *testing.T) {
t.Fatalf("count mismatch: have %v on verify, have %v on handler, %v on broadcast, want %v", verifyCounter, handlerCounter, broadcastCounter, targetSyncInfo)
}
}

func TestVoteReturnsAfterBftStop(t *testing.T) {
tester := newTester()
tester.bfter.consensus.verifyVote = func(chain consensus.ChainReader, vote *types.Vote) (bool, error) {
return true, nil
}
tester.bfter.consensus.voteHandler = func(chain consensus.ChainReader, vote *types.Vote) error {
return nil
}

tester.bfter.Stop()

vote := types.Vote{ProposedBlockInfo: &types.BlockInfo{Number: big.NewInt(1350)}}
done := make(chan struct{})
go func() {
_ = tester.bfter.Vote(peerID, &vote)
close(done)
}()

select {
case <-done:
case <-time.After(300 * time.Millisecond):
t.Fatal("Vote blocks after bft loop is stopped")
}
}

func TestTimeoutReturnsAfterBftStop(t *testing.T) {
tester := newTester()
tester.bfter.consensus.verifyTimeout = func(chain consensus.ChainReader, timeout *types.Timeout) (bool, error) {
return true, nil
}
tester.bfter.consensus.timeoutHandler = func(chain consensus.ChainReader, timeout *types.Timeout) error {
return nil
}

tester.bfter.Stop()

timeoutMsg := types.Timeout{GapNumber: 450}
done := make(chan struct{})
go func() {
_ = tester.bfter.Timeout(peerID, &timeoutMsg)
close(done)
}()

select {
case <-done:
case <-time.After(300 * time.Millisecond):
t.Fatal("Timeout blocks after bft loop is stopped")
}
}

func TestSyncInfoReturnsAfterBftStop(t *testing.T) {
tester := newTester()
tester.bfter.consensus.verifySyncInfo = func(chain consensus.ChainReader, syncInfo *types.SyncInfo) (bool, error) {
return true, nil
}
tester.bfter.consensus.syncInfoHandler = func(chain consensus.ChainReader, syncInfo *types.SyncInfo) error {
return nil
}

tester.bfter.Stop()

syncInfo := types.SyncInfo{HighestQuorumCert: &types.QuorumCert{ProposedBlockInfo: &types.BlockInfo{Number: big.NewInt(1350)}}}
done := make(chan struct{})
go func() {
_ = tester.bfter.SyncInfo(peerID, &syncInfo)
close(done)
}()

select {
case <-done:
case <-time.After(300 * time.Millisecond):
t.Fatal("SyncInfo blocks after bft loop is stopped")
}
}
Loading