Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 7 additions & 41 deletions consensus/XDPoS/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,19 +119,7 @@ const (
statusObservernode AccountRewardStatus = "ObserverNode"
)

type MessageStatus map[string]map[string]interface{}

type SyncInfoTypes struct {
Hash common.Hash `json:"hash"`
QCSigners int `json:"qcSigners"`
TCSigners int `json:"tcSigners"`
}

type PoolStatus struct {
Vote map[string]SignerTypes `json:"vote"`
Timeout map[string]SignerTypes `json:"timeout"`
SyncInfo map[string]SyncInfoTypes `json:"syncInfo"`
}
type MessageStatus map[string]map[string]SignerTypes

// GetSnapshot retrieves the state snapshot at a given block.
func (api *API) GetSnapshot(number *rpc.BlockNumber) (*utils.PublicApiSnapshot, error) {
Expand Down Expand Up @@ -242,40 +230,18 @@ func (api *API) GetMasternodesByNumber(number *rpc.BlockNumber) MasternodesStatu
}

// Get current vote pool and timeout pool content and missing messages
func (api *API) GetLatestPoolStatus() PoolStatus {
func (api *API) GetLatestPoolStatus() MessageStatus {
header := api.chain.CurrentHeader()
masternodes := api.XDPoS.EngineV2.GetMasternodes(api.chain, header)

receivedVotes := api.XDPoS.EngineV2.ReceivedVotes()
receivedTimeouts := api.XDPoS.EngineV2.ReceivedTimeouts()
receivedSyncInfo := api.XDPoS.EngineV2.ReceivedSyncInfo()

info := PoolStatus{}
info.Vote = make(map[string]SignerTypes)
info.Timeout = make(map[string]SignerTypes)
info.SyncInfo = make(map[string]SyncInfoTypes)

calculateSigners(info.Vote, receivedVotes, masternodes)
calculateSigners(info.Timeout, receivedTimeouts, masternodes)
info := make(MessageStatus)
info["vote"] = make(map[string]SignerTypes)
info["timeout"] = make(map[string]SignerTypes)

for name, objs := range receivedSyncInfo {
for _, obj := range objs {
syncInfo := obj.(*types.SyncInfo)
hash := syncInfo.Hash()
key := name + ":" + hash.Hex()

qcSigners := len(syncInfo.HighestQuorumCert.Signatures)
tcSigners := 0
if syncInfo.HighestTimeoutCert != nil {
tcSigners = len(syncInfo.HighestTimeoutCert.Signatures)
}
info.SyncInfo[key] = SyncInfoTypes{
Hash: hash,
QCSigners: qcSigners,
TCSigners: tcSigners,
}
}
}
calculateSigners(info["vote"], receivedVotes, masternodes)
calculateSigners(info["timeout"], receivedTimeouts, masternodes)

return info
Comment on lines 232 to 246
}
Expand Down
209 changes: 193 additions & 16 deletions consensus/XDPoS/engines/engine_v2/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package engine_v2

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"math/big"
"os"
"path/filepath"
"runtime"
"strconv"
"sync"
"time"
Expand All @@ -27,6 +29,7 @@ import (
"github.com/XinFinOrg/XDPoSChain/log"
"github.com/XinFinOrg/XDPoSChain/params"
"github.com/XinFinOrg/XDPoSChain/trie"
"golang.org/x/sync/errgroup"
)

type XDPoS_v2 struct {
Expand Down Expand Up @@ -60,7 +63,6 @@ type XDPoS_v2 struct {

timeoutPool *utils.Pool
votePool *utils.Pool
syncInfoPool *utils.Pool
currentRound types.Round
highestSelfMinedRound types.Round
highestVotedRound types.Round
Expand Down Expand Up @@ -92,7 +94,6 @@ func New(chainConfig *params.ChainConfig, db ethdb.Database, minePeriodCh chan i

timeoutPool := utils.NewPool()
votePool := utils.NewPool()
syncInfoPool := utils.NewPool()
engine := &XDPoS_v2{
chainConfig: chainConfig,

Expand All @@ -112,9 +113,8 @@ func New(chainConfig *params.ChainConfig, db ethdb.Database, minePeriodCh chan i

round2epochBlockInfo: lru.NewCache[types.Round, *types.BlockInfo](utils.InMemoryRound2Epochs),

timeoutPool: timeoutPool,
votePool: votePool,
syncInfoPool: syncInfoPool,
timeoutPool: timeoutPool,
votePool: votePool,

highestSelfMinedRound: types.Round(0),

Expand Down Expand Up @@ -647,6 +647,147 @@ func (x *XDPoS_v2) VerifyHeaders(chain consensus.ChainReader, headers []*types.H
}()
}

/*
SyncInfo workflow
*/
// Verify syncInfo and trigger process QC or TC if successful
func (x *XDPoS_v2) VerifySyncInfoMessage(chain consensus.ChainReader, syncInfo *types.SyncInfo) (bool, error) {
/*
1. Check QC and TC against highest QC TC. Skip if none of them need to be updated
2. Verify items including:
- verifyQC
- verifyTC
3. Broadcast(Not part of consensus)
*/

if (x.highestQuorumCert.ProposedBlockInfo.Round >= syncInfo.HighestQuorumCert.ProposedBlockInfo.Round) && (x.highestTimeoutCert.Round >= syncInfo.HighestTimeoutCert.Round) {
log.Debug("[VerifySyncInfoMessage] Round from incoming syncInfo message is no longer qualified", "Highest QC Round", x.highestQuorumCert.ProposedBlockInfo.Round, "Incoming SyncInfo QC Round", syncInfo.HighestQuorumCert.ProposedBlockInfo.Round, "highestTimeoutCert Round", x.highestTimeoutCert.Round, "Incoming syncInfo TC Round", syncInfo.HighestTimeoutCert.Round)
return false, nil
}

err := x.verifyQC(chain, syncInfo.HighestQuorumCert, nil)
if err != nil {
log.Warn("[VerifySyncInfoMessage] SyncInfo message verification failed due to QC", "blockNum", syncInfo.HighestQuorumCert.ProposedBlockInfo.Number, "round", syncInfo.HighestQuorumCert.ProposedBlockInfo.Round, "error", err)
return false, err
}
err = x.verifyTC(chain, syncInfo.HighestTimeoutCert)
if err != nil {
log.Warn("[VerifySyncInfoMessage] SyncInfo message verification failed due to TC", "gapNum", syncInfo.HighestTimeoutCert.GapNumber, "round", syncInfo.HighestTimeoutCert.Round, "error", err)
return false, err
Comment on lines +663 to +676
}
return true, nil
}

func (x *XDPoS_v2) SyncInfoHandler(chain consensus.ChainReader, syncInfo *types.SyncInfo) error {
x.lock.Lock()
defer x.lock.Unlock()
/*
1. processQC
2. processTC
*/
err := x.processQC(chain, syncInfo.HighestQuorumCert)
if err != nil {
return err
}
return x.processTC(chain, syncInfo.HighestTimeoutCert)
}

/*
Vote workflow
*/
func (x *XDPoS_v2) VerifyVoteMessage(chain consensus.ChainReader, vote *types.Vote) (bool, error) {
/*
1. Check vote round with current round for fast fail(disqualifed)
2. Get masterNode list from snapshot by using vote.GapNumber
3. Check signature:
- Use ecRecover to get the public key
- Use the above public key to find out the xdc address
- Use the above xdc address to check against the master node list from step 1(For the running epoch)
4. Broadcast(Not part of consensus)
*/
if vote.ProposedBlockInfo.Round < x.currentRound {
log.Debug("[VerifyVoteMessage] Disqualified vote message as the proposed round does not match currentRound", "voteHash", vote.Hash(), "voteProposedBlockInfoRound", vote.ProposedBlockInfo.Round, "currentRound", x.currentRound)
return false, nil
}

snapshot, err := x.getSnapshot(chain, vote.GapNumber, true)
if err != nil {
log.Error("[VerifyVoteMessage] fail to get snapshot for a vote message", "blockNum", vote.ProposedBlockInfo.Number, "blockHash", vote.ProposedBlockInfo.Hash, "voteHash", vote.Hash(), "error", err.Error())
return false, err
}
verified, signer, err := x.verifyMsgSignature(types.VoteSigHash(&types.VoteForSign{
ProposedBlockInfo: vote.ProposedBlockInfo,
GapNumber: vote.GapNumber,
}), vote.Signature, snapshot.NextEpochCandidates)
if err != nil {
for i, mn := range snapshot.NextEpochCandidates {
log.Warn("[VerifyVoteMessage] Master node list item", "index", i, "Master node", mn.Hex())
}
log.Warn("[VerifyVoteMessage] Error while verifying vote message", "votedBlockNum", vote.ProposedBlockInfo.Number.Uint64(), "votedBlockHash", vote.ProposedBlockInfo.Hash.Hex(), "voteHash", vote.Hash(), "error", err.Error())
return false, err
}
vote.SetSigner(signer)

return verified, nil
}

// Consensus entry point for processing vote message to produce QC
func (x *XDPoS_v2) VoteHandler(chain consensus.ChainReader, voteMsg *types.Vote) error {
x.lock.Lock()
defer x.lock.Unlock()
return x.voteHandler(chain, voteMsg)
}

/*
Timeout workflow
*/
// Verify timeout message type from peers in bft.go
/*
1. Get master node list by timeout msg round
2. Check signature:
- Use ecRecover to get the public key
- Use the above public key to find out the xdc address
- Use the above xdc address to check against the master node list from step 1(For the running epoch)
3. Broadcast(Not part of consensus)
*/
func (x *XDPoS_v2) VerifyTimeoutMessage(chain consensus.ChainReader, timeoutMsg *types.Timeout) (bool, error) {
if timeoutMsg.Round < x.currentRound {
log.Debug("[VerifyTimeoutMessage] Disqualified timeout message as the proposed round does not match currentRound", "timeoutHash", timeoutMsg.Hash(), "timeoutRound", timeoutMsg.Round, "currentRound", x.currentRound)
return false, nil
}
snap, err := x.getSnapshot(chain, timeoutMsg.GapNumber, true)
if err != nil || snap == nil {
log.Error("[VerifyTimeoutMessage] Fail to get snapshot when verifying timeout message!", "messageGapNumber", timeoutMsg.GapNumber, "err", err)
return false, err
}
if len(snap.NextEpochCandidates) == 0 {
log.Error("[VerifyTimeoutMessage] cannot find NextEpochCandidates from snapshot", "messageGapNumber", timeoutMsg.GapNumber)
return false, errors.New("empty master node lists from snapshot")
}

verified, signer, err := x.verifyMsgSignature(types.TimeoutSigHash(&types.TimeoutForSign{
Round: timeoutMsg.Round,
GapNumber: timeoutMsg.GapNumber,
}), timeoutMsg.Signature, snap.NextEpochCandidates)

if err != nil {
log.Warn("[VerifyTimeoutMessage] cannot verify timeout signature", "err", err)
return false, err
}

timeoutMsg.SetSigner(signer)
return verified, nil
}

/*
Entry point for handling timeout message to process below:
*/
func (x *XDPoS_v2) TimeoutHandler(blockChainReader consensus.ChainReader, timeout *types.Timeout) error {
x.lock.Lock()
defer x.lock.Unlock()
return x.timeoutHandler(blockChainReader, timeout)
}

/*
Proposed Block workflow
*/
Expand Down Expand Up @@ -756,23 +897,59 @@ func (x *XDPoS_v2) verifyQC(blockChainReader consensus.ChainReader, quorumCert *
ProposedBlockInfo: quorumCert.ProposedBlockInfo,
GapNumber: quorumCert.GapNumber,
})
start := time.Now()
numValidSignatures, err := x.countValidSignatures(signedVoteObj, quorumCert.Signatures, epochInfo.Masternodes)
elapsed := time.Since(start)
log.Debug("[verifyQC] time verify message signatures of qc", "elapsed", elapsed)

signatures, duplicates, err := RecoverUniqueSigners(signedVoteObj, quorumCert.Signatures)
if err != nil {
log.Error("[verifyQC] Error while verifying QC message signatures", "Error", err)
log.Error("[verifyQC] Error while getting unique signatures from QC", "qcBlockNum", quorumCert.ProposedBlockInfo.Number, "qcRound", quorumCert.ProposedBlockInfo.Round, "qcBlockHash", quorumCert.ProposedBlockInfo.Hash, "qcSignLen", len(quorumCert.Signatures), "error", err)
return err
}

if len(duplicates) != 0 {
for _, d := range duplicates {
log.Warn("[verifyQC] duplicated signature in QC", "duplicate", common.Bytes2Hex(d))
}
}

qcRound := quorumCert.ProposedBlockInfo.Round
certThreshold := x.config.V2.Config(uint64(qcRound)).CertThreshold
if (qcRound > 0) && (float64(numValidSignatures) < float64(epochInfo.MasternodesLen)*certThreshold) {
if (qcRound > 0) && (signatures == nil || float64(len(signatures)) < float64(epochInfo.MasternodesLen)*certThreshold) {
//First V2 Block QC, QC Signatures is initial nil
log.Warn("[verifyHeader] Invalid QC Signature is nil or less then config", "QCNumber", quorumCert.ProposedBlockInfo.Number, "numValidSignatures", numValidSignatures, "CertThreshold", float64(epochInfo.MasternodesLen)*certThreshold)
log.Warn("[verifyHeader] Invalid QC Signature is nil or less then config", "QCNumber", quorumCert.ProposedBlockInfo.Number, "LenSignatures", len(signatures), "CertThreshold", float64(epochInfo.MasternodesLen)*certThreshold)
return utils.ErrInvalidQCSignatures
}
start := time.Now()

eg, ctx := errgroup.WithContext(context.Background())
eg.SetLimit(runtime.NumCPU())
for _, sig := range signatures {
eg.Go(func() error {
select {
case <-ctx.Done():
return ctx.Err()
default:
verified, _, err := x.verifyMsgSignature(types.VoteSigHash(&types.VoteForSign{
ProposedBlockInfo: quorumCert.ProposedBlockInfo,
GapNumber: quorumCert.GapNumber,
}), sig, epochInfo.Masternodes)
if err != nil {
log.Error("[verifyQC] Error while verfying QC message signatures", "Error", err)
return errors.New("error while verfying QC message signatures")
}
if !verified {
log.Warn("[verifyQC] Signature not verified doing QC verification", "QC", quorumCert)
return errors.New("fail to verify QC due to signature mis-match")
}
return nil
}
})
}
err = eg.Wait()

elapsed := time.Since(start)
log.Debug("[verifyQC] time verify message signatures of qc", "elapsed", elapsed)
if err != nil {
return err
}
epochSwitchNumber := epochInfo.EpochSwitchBlockInfo.Number.Uint64()
gapNumber := epochSwitchNumber - epochSwitchNumber%x.config.Epoch
if gapNumber > x.config.Gap {
Expand All @@ -790,7 +967,7 @@ func (x *XDPoS_v2) verifyQC(blockChainReader consensus.ChainReader, quorumCert *

// Update local QC variables including highestQC & lockQuorumCert, as well as commit the blocks that satisfy the algorithm requirements
func (x *XDPoS_v2) processQC(blockChainReader consensus.ChainReader, incomingQuorumCert *types.QuorumCert) error {
log.Debug("[processQC][Before]", "HighQC", x.highestQuorumCert.ProposedBlockInfo.Round)
log.Trace("[processQC][Before]", "HighQC", x.highestQuorumCert)
// 1. Update HighestQC
if incomingQuorumCert.ProposedBlockInfo.Round > x.highestQuorumCert.ProposedBlockInfo.Round {
log.Debug("[processQC] update x.highestQuorumCert", "blockNum", incomingQuorumCert.ProposedBlockInfo.Number, "round", incomingQuorumCert.ProposedBlockInfo.Round, "hash", incomingQuorumCert.ProposedBlockInfo.Hash)
Expand Down Expand Up @@ -824,7 +1001,7 @@ func (x *XDPoS_v2) processQC(blockChainReader consensus.ChainReader, incomingQuo
if incomingQuorumCert.ProposedBlockInfo.Round >= x.currentRound {
x.setNewRound(blockChainReader, incomingQuorumCert.ProposedBlockInfo.Round+1)
}
log.Debug("[processQC][After]", "HighQC", x.highestQuorumCert.ProposedBlockInfo.Round)
log.Trace("[processQC][After]", "HighQC", x.highestQuorumCert)
return nil
}

Expand All @@ -839,7 +1016,8 @@ func (x *XDPoS_v2) setNewRound(blockChainReader consensus.ChainReader, round typ
x.currentRound = round
x.timeoutCount = 0
x.timeoutWorker.Reset(blockChainReader, x.currentRound, x.highestQuorumCert.ProposedBlockInfo.Round)
// don't need to clean pool, we have other process to clean and it's not good to clean here, some edge case may break
x.timeoutPool.Clear()
// don't need to clean vote pool, we have other process to clean and it's not good to clean here, some edge case may break
// for example round gets bump during collecting vote, so we have to keep vote.

// send signal to newRoundCh, but if full don't send
Expand Down Expand Up @@ -1072,7 +1250,6 @@ func (x *XDPoS_v2) periodicJob() {
<-ticker.C
x.hygieneVotePool()
x.hygieneTimeoutPool()
x.hygieneSyncInfoPool()
}
}()
}
Expand Down
Loading
Loading