mirror of
https://github.com/fluencelabs/tendermint
synced 2025-04-25 14:52:17 +00:00
Add stats related channel between consensus state and reactor (#2388)
This commit is contained in:
parent
f11db8c1b0
commit
f99e4010f2
@ -33,4 +33,5 @@ IMPROVEMENTS:
|
|||||||
|
|
||||||
BUG FIXES:
|
BUG FIXES:
|
||||||
- [node] \#2294 Delay starting node until Genesis time
|
- [node] \#2294 Delay starting node until Genesis time
|
||||||
|
- [consensus] \#2048 Correct peer statistics for marking peer as good
|
||||||
- [rpc] \#2460 StartHTTPAndTLSServer() now passes StartTLS() errors back to the caller rather than hanging forever.
|
- [rpc] \#2460 StartHTTPAndTLSServer() now passes StartTLS() errors back to the caller rather than hanging forever.
|
@ -59,8 +59,8 @@ func initFilesWithConfig(config *cfg.Config) error {
|
|||||||
}
|
}
|
||||||
genDoc.Validators = []types.GenesisValidator{{
|
genDoc.Validators = []types.GenesisValidator{{
|
||||||
Address: pv.GetPubKey().Address(),
|
Address: pv.GetPubKey().Address(),
|
||||||
PubKey: pv.GetPubKey(),
|
PubKey: pv.GetPubKey(),
|
||||||
Power: 10,
|
Power: 10,
|
||||||
}}
|
}}
|
||||||
|
|
||||||
if err := genDoc.SaveAs(genFile); err != nil {
|
if err := genDoc.SaveAs(genFile); err != nil {
|
||||||
|
@ -92,9 +92,9 @@ func testnetFiles(cmd *cobra.Command, args []string) error {
|
|||||||
pv := privval.LoadFilePV(pvFile)
|
pv := privval.LoadFilePV(pvFile)
|
||||||
genVals[i] = types.GenesisValidator{
|
genVals[i] = types.GenesisValidator{
|
||||||
Address: pv.GetPubKey().Address(),
|
Address: pv.GetPubKey().Address(),
|
||||||
PubKey: pv.GetPubKey(),
|
PubKey: pv.GetPubKey(),
|
||||||
Power: 1,
|
Power: 1,
|
||||||
Name: nodeDirName,
|
Name: nodeDirName,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -85,7 +85,6 @@ func PrometheusMetrics() *Metrics {
|
|||||||
Help: "Total power of the byzantine validators.",
|
Help: "Total power of the byzantine validators.",
|
||||||
}, []string{}),
|
}, []string{}),
|
||||||
|
|
||||||
|
|
||||||
BlockIntervalSeconds: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
|
BlockIntervalSeconds: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
|
||||||
Subsystem: "consensus",
|
Subsystem: "consensus",
|
||||||
Name: "block_interval_seconds",
|
Name: "block_interval_seconds",
|
||||||
|
@ -29,6 +29,7 @@ const (
|
|||||||
maxMsgSize = 1048576 // 1MB; NOTE/TODO: keep in sync with types.PartSet sizes.
|
maxMsgSize = 1048576 // 1MB; NOTE/TODO: keep in sync with types.PartSet sizes.
|
||||||
|
|
||||||
blocksToContributeToBecomeGoodPeer = 10000
|
blocksToContributeToBecomeGoodPeer = 10000
|
||||||
|
votesToContributeToBecomeGoodPeer = 10000
|
||||||
)
|
)
|
||||||
|
|
||||||
//-----------------------------------------------------------------------------
|
//-----------------------------------------------------------------------------
|
||||||
@ -60,6 +61,9 @@ func NewConsensusReactor(consensusState *ConsensusState, fastSync bool) *Consens
|
|||||||
func (conR *ConsensusReactor) OnStart() error {
|
func (conR *ConsensusReactor) OnStart() error {
|
||||||
conR.Logger.Info("ConsensusReactor ", "fastSync", conR.FastSync())
|
conR.Logger.Info("ConsensusReactor ", "fastSync", conR.FastSync())
|
||||||
|
|
||||||
|
// start routine that computes peer statistics for evaluating peer quality
|
||||||
|
go conR.peerStatsRoutine()
|
||||||
|
|
||||||
conR.subscribeToBroadcastEvents()
|
conR.subscribeToBroadcastEvents()
|
||||||
|
|
||||||
if !conR.FastSync() {
|
if !conR.FastSync() {
|
||||||
@ -258,9 +262,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)
|
|||||||
ps.ApplyProposalPOLMessage(msg)
|
ps.ApplyProposalPOLMessage(msg)
|
||||||
case *BlockPartMessage:
|
case *BlockPartMessage:
|
||||||
ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Index)
|
ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Index)
|
||||||
if numBlocks := ps.RecordBlockPart(msg); numBlocks%blocksToContributeToBecomeGoodPeer == 0 {
|
|
||||||
conR.Switch.MarkPeerAsGood(src)
|
|
||||||
}
|
|
||||||
conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()}
|
conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()}
|
||||||
default:
|
default:
|
||||||
conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
|
conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
|
||||||
@ -280,9 +282,6 @@ func (conR *ConsensusReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)
|
|||||||
ps.EnsureVoteBitArrays(height, valSize)
|
ps.EnsureVoteBitArrays(height, valSize)
|
||||||
ps.EnsureVoteBitArrays(height-1, lastCommitSize)
|
ps.EnsureVoteBitArrays(height-1, lastCommitSize)
|
||||||
ps.SetHasVote(msg.Vote)
|
ps.SetHasVote(msg.Vote)
|
||||||
if blocks := ps.RecordVote(msg.Vote); blocks%blocksToContributeToBecomeGoodPeer == 0 {
|
|
||||||
conR.Switch.MarkPeerAsGood(src)
|
|
||||||
}
|
|
||||||
|
|
||||||
cs.peerMsgQueue <- msgInfo{msg, src.ID()}
|
cs.peerMsgQueue <- msgInfo{msg, src.ID()}
|
||||||
|
|
||||||
@ -794,6 +793,43 @@ OUTER_LOOP:
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (conR *ConsensusReactor) peerStatsRoutine() {
|
||||||
|
for {
|
||||||
|
if !conR.IsRunning() {
|
||||||
|
conR.Logger.Info("Stopping peerStatsRoutine")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case msg := <-conR.conS.statsMsgQueue:
|
||||||
|
// Get peer
|
||||||
|
peer := conR.Switch.Peers().Get(msg.PeerID)
|
||||||
|
if peer == nil {
|
||||||
|
conR.Logger.Debug("Attempt to update stats for non-existent peer",
|
||||||
|
"peer", msg.PeerID)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// Get peer state
|
||||||
|
ps := peer.Get(types.PeerStateKey).(*PeerState)
|
||||||
|
switch msg.Msg.(type) {
|
||||||
|
case *VoteMessage:
|
||||||
|
if numVotes := ps.RecordVote(); numVotes%votesToContributeToBecomeGoodPeer == 0 {
|
||||||
|
conR.Switch.MarkPeerAsGood(peer)
|
||||||
|
}
|
||||||
|
case *BlockPartMessage:
|
||||||
|
if numParts := ps.RecordBlockPart(); numParts%blocksToContributeToBecomeGoodPeer == 0 {
|
||||||
|
conR.Switch.MarkPeerAsGood(peer)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case <-conR.conS.Quit():
|
||||||
|
return
|
||||||
|
|
||||||
|
case <-conR.Quit():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// String returns a string representation of the ConsensusReactor.
|
// String returns a string representation of the ConsensusReactor.
|
||||||
// NOTE: For now, it is just a hard-coded string to avoid accessing unprotected shared variables.
|
// NOTE: For now, it is just a hard-coded string to avoid accessing unprotected shared variables.
|
||||||
// TODO: improve!
|
// TODO: improve!
|
||||||
@ -836,15 +872,13 @@ type PeerState struct {
|
|||||||
|
|
||||||
// peerStateStats holds internal statistics for a peer.
|
// peerStateStats holds internal statistics for a peer.
|
||||||
type peerStateStats struct {
|
type peerStateStats struct {
|
||||||
LastVoteHeight int64 `json:"last_vote_height"`
|
Votes int `json:"votes"`
|
||||||
Votes int `json:"votes"`
|
BlockParts int `json:"block_parts"`
|
||||||
LastBlockPartHeight int64 `json:"last_block_part_height"`
|
|
||||||
BlockParts int `json:"block_parts"`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pss peerStateStats) String() string {
|
func (pss peerStateStats) String() string {
|
||||||
return fmt.Sprintf("peerStateStats{lvh: %d, votes: %d, lbph: %d, blockParts: %d}",
|
return fmt.Sprintf("peerStateStats{votes: %d, blockParts: %d}",
|
||||||
pss.LastVoteHeight, pss.Votes, pss.LastBlockPartHeight, pss.BlockParts)
|
pss.Votes, pss.BlockParts)
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewPeerState returns a new PeerState for the given Peer
|
// NewPeerState returns a new PeerState for the given Peer
|
||||||
@ -1080,18 +1114,14 @@ func (ps *PeerState) ensureVoteBitArrays(height int64, numValidators int) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// RecordVote updates internal statistics for this peer by recording the vote.
|
// RecordVote increments internal votes related statistics for this peer.
|
||||||
// It returns the total number of votes (1 per block). This essentially means
|
// It returns the total number of added votes.
|
||||||
// the number of blocks for which peer has been sending us votes.
|
func (ps *PeerState) RecordVote() int {
|
||||||
func (ps *PeerState) RecordVote(vote *types.Vote) int {
|
|
||||||
ps.mtx.Lock()
|
ps.mtx.Lock()
|
||||||
defer ps.mtx.Unlock()
|
defer ps.mtx.Unlock()
|
||||||
|
|
||||||
if ps.Stats.LastVoteHeight >= vote.Height {
|
|
||||||
return ps.Stats.Votes
|
|
||||||
}
|
|
||||||
ps.Stats.LastVoteHeight = vote.Height
|
|
||||||
ps.Stats.Votes++
|
ps.Stats.Votes++
|
||||||
|
|
||||||
return ps.Stats.Votes
|
return ps.Stats.Votes
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1104,25 +1134,17 @@ func (ps *PeerState) VotesSent() int {
|
|||||||
return ps.Stats.Votes
|
return ps.Stats.Votes
|
||||||
}
|
}
|
||||||
|
|
||||||
// RecordBlockPart updates internal statistics for this peer by recording the
|
// RecordBlockPart increments internal block part related statistics for this peer.
|
||||||
// block part. It returns the total number of block parts (1 per block). This
|
// It returns the total number of added block parts.
|
||||||
// essentially means the number of blocks for which peer has been sending us
|
func (ps *PeerState) RecordBlockPart() int {
|
||||||
// block parts.
|
|
||||||
func (ps *PeerState) RecordBlockPart(bp *BlockPartMessage) int {
|
|
||||||
ps.mtx.Lock()
|
ps.mtx.Lock()
|
||||||
defer ps.mtx.Unlock()
|
defer ps.mtx.Unlock()
|
||||||
|
|
||||||
if ps.Stats.LastBlockPartHeight >= bp.Height {
|
|
||||||
return ps.Stats.BlockParts
|
|
||||||
}
|
|
||||||
|
|
||||||
ps.Stats.LastBlockPartHeight = bp.Height
|
|
||||||
ps.Stats.BlockParts++
|
ps.Stats.BlockParts++
|
||||||
return ps.Stats.BlockParts
|
return ps.Stats.BlockParts
|
||||||
}
|
}
|
||||||
|
|
||||||
// BlockPartsSent returns the number of blocks for which peer has been sending
|
// BlockPartsSent returns the number of useful block parts the peer has sent us.
|
||||||
// us block parts.
|
|
||||||
func (ps *PeerState) BlockPartsSent() int {
|
func (ps *PeerState) BlockPartsSent() int {
|
||||||
ps.mtx.Lock()
|
ps.mtx.Lock()
|
||||||
defer ps.mtx.Unlock()
|
defer ps.mtx.Unlock()
|
||||||
|
@ -11,20 +11,16 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
abcicli "github.com/tendermint/tendermint/abci/client"
|
"github.com/tendermint/tendermint/abci/client"
|
||||||
"github.com/tendermint/tendermint/abci/example/kvstore"
|
"github.com/tendermint/tendermint/abci/example/kvstore"
|
||||||
abci "github.com/tendermint/tendermint/abci/types"
|
abci "github.com/tendermint/tendermint/abci/types"
|
||||||
bc "github.com/tendermint/tendermint/blockchain"
|
bc "github.com/tendermint/tendermint/blockchain"
|
||||||
cmn "github.com/tendermint/tendermint/libs/common"
|
cfg "github.com/tendermint/tendermint/config"
|
||||||
dbm "github.com/tendermint/tendermint/libs/db"
|
dbm "github.com/tendermint/tendermint/libs/db"
|
||||||
"github.com/tendermint/tendermint/libs/log"
|
"github.com/tendermint/tendermint/libs/log"
|
||||||
mempl "github.com/tendermint/tendermint/mempool"
|
mempl "github.com/tendermint/tendermint/mempool"
|
||||||
sm "github.com/tendermint/tendermint/state"
|
|
||||||
tmtime "github.com/tendermint/tendermint/types/time"
|
|
||||||
|
|
||||||
cfg "github.com/tendermint/tendermint/config"
|
|
||||||
"github.com/tendermint/tendermint/p2p"
|
"github.com/tendermint/tendermint/p2p"
|
||||||
p2pdummy "github.com/tendermint/tendermint/p2p/dummy"
|
sm "github.com/tendermint/tendermint/state"
|
||||||
"github.com/tendermint/tendermint/types"
|
"github.com/tendermint/tendermint/types"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
@ -246,110 +242,25 @@ func TestReactorProposalHeartbeats(t *testing.T) {
|
|||||||
}, css)
|
}, css)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test we record block parts from other peers
|
// Test we record stats about votes and block parts from other peers.
|
||||||
func TestReactorRecordsBlockParts(t *testing.T) {
|
func TestReactorRecordsVotesAndBlockParts(t *testing.T) {
|
||||||
// create dummy peer
|
N := 4
|
||||||
peer := p2pdummy.NewPeer()
|
css := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newCounter)
|
||||||
ps := NewPeerState(peer).SetLogger(log.TestingLogger())
|
reactors, eventChans, eventBuses := startConsensusNet(t, css, N)
|
||||||
peer.Set(types.PeerStateKey, ps)
|
defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses)
|
||||||
|
|
||||||
// create reactor
|
// wait till everyone makes the first new block
|
||||||
css := randConsensusNet(1, "consensus_reactor_records_block_parts_test", newMockTickerFunc(true), newPersistentKVStore)
|
timeoutWaitGroup(t, N, func(j int) {
|
||||||
reactor := NewConsensusReactor(css[0], false) // so we dont start the consensus states
|
<-eventChans[j]
|
||||||
reactor.SetEventBus(css[0].eventBus)
|
}, css)
|
||||||
reactor.SetLogger(log.TestingLogger())
|
|
||||||
sw := p2p.MakeSwitch(cfg.DefaultP2PConfig(), 1, "testing", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { return sw })
|
|
||||||
reactor.SetSwitch(sw)
|
|
||||||
err := reactor.Start()
|
|
||||||
require.NoError(t, err)
|
|
||||||
defer reactor.Stop()
|
|
||||||
|
|
||||||
// 1) new block part
|
// Get peer
|
||||||
parts := types.NewPartSetFromData(cmn.RandBytes(100), 10)
|
peer := reactors[1].Switch.Peers().List()[0]
|
||||||
msg := &BlockPartMessage{
|
// Get peer state
|
||||||
Height: 2,
|
ps := peer.Get(types.PeerStateKey).(*PeerState)
|
||||||
Round: 0,
|
|
||||||
Part: parts.GetPart(0),
|
|
||||||
}
|
|
||||||
bz, err := cdc.MarshalBinaryBare(msg)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
reactor.Receive(DataChannel, peer, bz)
|
assert.Equal(t, true, ps.VotesSent() > 0, "number of votes sent should have increased")
|
||||||
require.Equal(t, 1, ps.BlockPartsSent(), "number of block parts sent should have increased by 1")
|
assert.Equal(t, true, ps.BlockPartsSent() > 0, "number of votes sent should have increased")
|
||||||
|
|
||||||
// 2) block part with the same height, but different round
|
|
||||||
msg.Round = 1
|
|
||||||
|
|
||||||
bz, err = cdc.MarshalBinaryBare(msg)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
reactor.Receive(DataChannel, peer, bz)
|
|
||||||
require.Equal(t, 1, ps.BlockPartsSent(), "number of block parts sent should stay the same")
|
|
||||||
|
|
||||||
// 3) block part from earlier height
|
|
||||||
msg.Height = 1
|
|
||||||
msg.Round = 0
|
|
||||||
|
|
||||||
bz, err = cdc.MarshalBinaryBare(msg)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
reactor.Receive(DataChannel, peer, bz)
|
|
||||||
require.Equal(t, 1, ps.BlockPartsSent(), "number of block parts sent should stay the same")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test we record votes from other peers.
|
|
||||||
func TestReactorRecordsVotes(t *testing.T) {
|
|
||||||
// Create dummy peer.
|
|
||||||
peer := p2pdummy.NewPeer()
|
|
||||||
ps := NewPeerState(peer).SetLogger(log.TestingLogger())
|
|
||||||
peer.Set(types.PeerStateKey, ps)
|
|
||||||
|
|
||||||
// Create reactor.
|
|
||||||
css := randConsensusNet(1, "consensus_reactor_records_votes_test", newMockTickerFunc(true), newPersistentKVStore)
|
|
||||||
reactor := NewConsensusReactor(css[0], false) // so we dont start the consensus states
|
|
||||||
reactor.SetEventBus(css[0].eventBus)
|
|
||||||
reactor.SetLogger(log.TestingLogger())
|
|
||||||
sw := p2p.MakeSwitch(cfg.DefaultP2PConfig(), 1, "testing", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { return sw })
|
|
||||||
reactor.SetSwitch(sw)
|
|
||||||
err := reactor.Start()
|
|
||||||
require.NoError(t, err)
|
|
||||||
defer reactor.Stop()
|
|
||||||
_, val := css[0].state.Validators.GetByIndex(0)
|
|
||||||
|
|
||||||
// 1) new vote
|
|
||||||
vote := &types.Vote{
|
|
||||||
ValidatorIndex: 0,
|
|
||||||
ValidatorAddress: val.Address,
|
|
||||||
Height: 2,
|
|
||||||
Round: 0,
|
|
||||||
Timestamp: tmtime.Now(),
|
|
||||||
Type: types.VoteTypePrevote,
|
|
||||||
BlockID: types.BlockID{},
|
|
||||||
}
|
|
||||||
bz, err := cdc.MarshalBinaryBare(&VoteMessage{vote})
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
reactor.Receive(VoteChannel, peer, bz)
|
|
||||||
assert.Equal(t, 1, ps.VotesSent(), "number of votes sent should have increased by 1")
|
|
||||||
|
|
||||||
// 2) vote with the same height, but different round
|
|
||||||
vote.Round = 1
|
|
||||||
|
|
||||||
bz, err = cdc.MarshalBinaryBare(&VoteMessage{vote})
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
reactor.Receive(VoteChannel, peer, bz)
|
|
||||||
assert.Equal(t, 1, ps.VotesSent(), "number of votes sent should stay the same")
|
|
||||||
|
|
||||||
// 3) vote from earlier height
|
|
||||||
vote.Height = 1
|
|
||||||
vote.Round = 0
|
|
||||||
|
|
||||||
bz, err = cdc.MarshalBinaryBare(&VoteMessage{vote})
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
reactor.Receive(VoteChannel, peer, bz)
|
|
||||||
assert.Equal(t, 1, ps.VotesSent(), "number of votes sent should stay the same")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
//-------------------------------------------------------------
|
//-------------------------------------------------------------
|
||||||
|
@ -91,6 +91,10 @@ type ConsensusState struct {
|
|||||||
internalMsgQueue chan msgInfo
|
internalMsgQueue chan msgInfo
|
||||||
timeoutTicker TimeoutTicker
|
timeoutTicker TimeoutTicker
|
||||||
|
|
||||||
|
// information about about added votes and block parts are written on this channel
|
||||||
|
// so statistics can be computed by reactor
|
||||||
|
statsMsgQueue chan msgInfo
|
||||||
|
|
||||||
// we use eventBus to trigger msg broadcasts in the reactor,
|
// we use eventBus to trigger msg broadcasts in the reactor,
|
||||||
// and to notify external subscribers, eg. through a websocket
|
// and to notify external subscribers, eg. through a websocket
|
||||||
eventBus *types.EventBus
|
eventBus *types.EventBus
|
||||||
@ -141,6 +145,7 @@ func NewConsensusState(
|
|||||||
peerMsgQueue: make(chan msgInfo, msgQueueSize),
|
peerMsgQueue: make(chan msgInfo, msgQueueSize),
|
||||||
internalMsgQueue: make(chan msgInfo, msgQueueSize),
|
internalMsgQueue: make(chan msgInfo, msgQueueSize),
|
||||||
timeoutTicker: NewTimeoutTicker(),
|
timeoutTicker: NewTimeoutTicker(),
|
||||||
|
statsMsgQueue: make(chan msgInfo, msgQueueSize),
|
||||||
done: make(chan struct{}),
|
done: make(chan struct{}),
|
||||||
doWALCatchup: true,
|
doWALCatchup: true,
|
||||||
wal: nilWAL{},
|
wal: nilWAL{},
|
||||||
@ -639,7 +644,11 @@ func (cs *ConsensusState) handleMsg(mi msgInfo) {
|
|||||||
err = cs.setProposal(msg.Proposal)
|
err = cs.setProposal(msg.Proposal)
|
||||||
case *BlockPartMessage:
|
case *BlockPartMessage:
|
||||||
// if the proposal is complete, we'll enterPrevote or tryFinalizeCommit
|
// if the proposal is complete, we'll enterPrevote or tryFinalizeCommit
|
||||||
_, err = cs.addProposalBlockPart(msg, peerID)
|
added, err := cs.addProposalBlockPart(msg, peerID)
|
||||||
|
if added {
|
||||||
|
cs.statsMsgQueue <- mi
|
||||||
|
}
|
||||||
|
|
||||||
if err != nil && msg.Round != cs.Round {
|
if err != nil && msg.Round != cs.Round {
|
||||||
cs.Logger.Debug("Received block part from wrong round", "height", cs.Height, "csRound", cs.Round, "blockRound", msg.Round)
|
cs.Logger.Debug("Received block part from wrong round", "height", cs.Height, "csRound", cs.Round, "blockRound", msg.Round)
|
||||||
err = nil
|
err = nil
|
||||||
@ -647,7 +656,11 @@ func (cs *ConsensusState) handleMsg(mi msgInfo) {
|
|||||||
case *VoteMessage:
|
case *VoteMessage:
|
||||||
// attempt to add the vote and dupeout the validator if its a duplicate signature
|
// attempt to add the vote and dupeout the validator if its a duplicate signature
|
||||||
// if the vote gives us a 2/3-any or 2/3-one, we transition
|
// if the vote gives us a 2/3-any or 2/3-one, we transition
|
||||||
err := cs.tryAddVote(msg.Vote, peerID)
|
added, err := cs.tryAddVote(msg.Vote, peerID)
|
||||||
|
if added {
|
||||||
|
cs.statsMsgQueue <- mi
|
||||||
|
}
|
||||||
|
|
||||||
if err == ErrAddingVote {
|
if err == ErrAddingVote {
|
||||||
// TODO: punish peer
|
// TODO: punish peer
|
||||||
// We probably don't want to stop the peer here. The vote does not
|
// We probably don't want to stop the peer here. The vote does not
|
||||||
@ -1454,7 +1467,7 @@ func (cs *ConsensusState) addProposalBlockPart(msg *BlockPartMessage, peerID p2p
|
|||||||
int64(cs.state.ConsensusParams.BlockSize.MaxBytes),
|
int64(cs.state.ConsensusParams.BlockSize.MaxBytes),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return true, err
|
return added, err
|
||||||
}
|
}
|
||||||
// NOTE: it's possible to receive complete proposal blocks for future rounds without having the proposal
|
// NOTE: it's possible to receive complete proposal blocks for future rounds without having the proposal
|
||||||
cs.Logger.Info("Received complete proposal block", "height", cs.ProposalBlock.Height, "hash", cs.ProposalBlock.Hash())
|
cs.Logger.Info("Received complete proposal block", "height", cs.ProposalBlock.Height, "hash", cs.ProposalBlock.Hash())
|
||||||
@ -1484,35 +1497,35 @@ func (cs *ConsensusState) addProposalBlockPart(msg *BlockPartMessage, peerID p2p
|
|||||||
// If we're waiting on the proposal block...
|
// If we're waiting on the proposal block...
|
||||||
cs.tryFinalizeCommit(height)
|
cs.tryFinalizeCommit(height)
|
||||||
}
|
}
|
||||||
return true, nil
|
return added, nil
|
||||||
}
|
}
|
||||||
return added, nil
|
return added, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Attempt to add the vote. if its a duplicate signature, dupeout the validator
|
// Attempt to add the vote. if its a duplicate signature, dupeout the validator
|
||||||
func (cs *ConsensusState) tryAddVote(vote *types.Vote, peerID p2p.ID) error {
|
func (cs *ConsensusState) tryAddVote(vote *types.Vote, peerID p2p.ID) (bool, error) {
|
||||||
_, err := cs.addVote(vote, peerID)
|
added, err := cs.addVote(vote, peerID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// If the vote height is off, we'll just ignore it,
|
// If the vote height is off, we'll just ignore it,
|
||||||
// But if it's a conflicting sig, add it to the cs.evpool.
|
// But if it's a conflicting sig, add it to the cs.evpool.
|
||||||
// If it's otherwise invalid, punish peer.
|
// If it's otherwise invalid, punish peer.
|
||||||
if err == ErrVoteHeightMismatch {
|
if err == ErrVoteHeightMismatch {
|
||||||
return err
|
return added, err
|
||||||
} else if voteErr, ok := err.(*types.ErrVoteConflictingVotes); ok {
|
} else if voteErr, ok := err.(*types.ErrVoteConflictingVotes); ok {
|
||||||
if bytes.Equal(vote.ValidatorAddress, cs.privValidator.GetAddress()) {
|
if bytes.Equal(vote.ValidatorAddress, cs.privValidator.GetAddress()) {
|
||||||
cs.Logger.Error("Found conflicting vote from ourselves. Did you unsafe_reset a validator?", "height", vote.Height, "round", vote.Round, "type", vote.Type)
|
cs.Logger.Error("Found conflicting vote from ourselves. Did you unsafe_reset a validator?", "height", vote.Height, "round", vote.Round, "type", vote.Type)
|
||||||
return err
|
return added, err
|
||||||
}
|
}
|
||||||
cs.evpool.AddEvidence(voteErr.DuplicateVoteEvidence)
|
cs.evpool.AddEvidence(voteErr.DuplicateVoteEvidence)
|
||||||
return err
|
return added, err
|
||||||
} else {
|
} else {
|
||||||
// Probably an invalid signature / Bad peer.
|
// Probably an invalid signature / Bad peer.
|
||||||
// Seems this can also err sometimes with "Unexpected step" - perhaps not from a bad peer ?
|
// Seems this can also err sometimes with "Unexpected step" - perhaps not from a bad peer ?
|
||||||
cs.Logger.Error("Error attempting to add vote", "err", err)
|
cs.Logger.Error("Error attempting to add vote", "err", err)
|
||||||
return ErrAddingVote
|
return added, ErrAddingVote
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return added, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
//-----------------------------------------------------------------------------
|
//-----------------------------------------------------------------------------
|
||||||
|
@ -7,9 +7,13 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
cstypes "github.com/tendermint/tendermint/consensus/types"
|
cstypes "github.com/tendermint/tendermint/consensus/types"
|
||||||
|
cmn "github.com/tendermint/tendermint/libs/common"
|
||||||
"github.com/tendermint/tendermint/libs/log"
|
"github.com/tendermint/tendermint/libs/log"
|
||||||
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
|
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
|
||||||
|
p2pdummy "github.com/tendermint/tendermint/p2p/dummy"
|
||||||
"github.com/tendermint/tendermint/types"
|
"github.com/tendermint/tendermint/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -1081,6 +1085,80 @@ func TestStateHalt1(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestStateOutputsBlockPartsStats(t *testing.T) {
|
||||||
|
// create dummy peer
|
||||||
|
cs, _ := randConsensusState(1)
|
||||||
|
peer := p2pdummy.NewPeer()
|
||||||
|
|
||||||
|
// 1) new block part
|
||||||
|
parts := types.NewPartSetFromData(cmn.RandBytes(100), 10)
|
||||||
|
msg := &BlockPartMessage{
|
||||||
|
Height: 1,
|
||||||
|
Round: 0,
|
||||||
|
Part: parts.GetPart(0),
|
||||||
|
}
|
||||||
|
|
||||||
|
cs.ProposalBlockParts = types.NewPartSetFromHeader(parts.Header())
|
||||||
|
cs.handleMsg(msgInfo{msg, peer.ID()})
|
||||||
|
|
||||||
|
statsMessage := <-cs.statsMsgQueue
|
||||||
|
require.Equal(t, msg, statsMessage.Msg, "")
|
||||||
|
require.Equal(t, peer.ID(), statsMessage.PeerID, "")
|
||||||
|
|
||||||
|
// sending the same part from different peer
|
||||||
|
cs.handleMsg(msgInfo{msg, "peer2"})
|
||||||
|
|
||||||
|
// sending the part with the same height, but different round
|
||||||
|
msg.Round = 1
|
||||||
|
cs.handleMsg(msgInfo{msg, peer.ID()})
|
||||||
|
|
||||||
|
// sending the part from the smaller height
|
||||||
|
msg.Height = 0
|
||||||
|
cs.handleMsg(msgInfo{msg, peer.ID()})
|
||||||
|
|
||||||
|
// sending the part from the bigger height
|
||||||
|
msg.Height = 3
|
||||||
|
cs.handleMsg(msgInfo{msg, peer.ID()})
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-cs.statsMsgQueue:
|
||||||
|
t.Errorf("Should not output stats message after receiving the known block part!")
|
||||||
|
case <-time.After(50 * time.Millisecond):
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStateOutputVoteStats(t *testing.T) {
|
||||||
|
cs, vss := randConsensusState(2)
|
||||||
|
// create dummy peer
|
||||||
|
peer := p2pdummy.NewPeer()
|
||||||
|
|
||||||
|
vote := signVote(vss[1], types.VoteTypePrecommit, []byte("test"), types.PartSetHeader{})
|
||||||
|
|
||||||
|
voteMessage := &VoteMessage{vote}
|
||||||
|
cs.handleMsg(msgInfo{voteMessage, peer.ID()})
|
||||||
|
|
||||||
|
statsMessage := <-cs.statsMsgQueue
|
||||||
|
require.Equal(t, voteMessage, statsMessage.Msg, "")
|
||||||
|
require.Equal(t, peer.ID(), statsMessage.PeerID, "")
|
||||||
|
|
||||||
|
// sending the same part from different peer
|
||||||
|
cs.handleMsg(msgInfo{&VoteMessage{vote}, "peer2"})
|
||||||
|
|
||||||
|
// sending the vote for the bigger height
|
||||||
|
incrementHeight(vss[1])
|
||||||
|
vote = signVote(vss[1], types.VoteTypePrecommit, []byte("test"), types.PartSetHeader{})
|
||||||
|
|
||||||
|
cs.handleMsg(msgInfo{&VoteMessage{vote}, peer.ID()})
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-cs.statsMsgQueue:
|
||||||
|
t.Errorf("Should not output stats message after receiving the known vote or vote from bigger height")
|
||||||
|
case <-time.After(50 * time.Millisecond):
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
// subscribe subscribes test client to the given query and returns a channel with cap = 1.
|
// subscribe subscribes test client to the given query and returns a channel with cap = 1.
|
||||||
func subscribe(eventBus *types.EventBus, q tmpubsub.Query) <-chan interface{} {
|
func subscribe(eventBus *types.EventBus, q tmpubsub.Query) <-chan interface{} {
|
||||||
out := make(chan interface{}, 1)
|
out := make(chan interface{}, 1)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user