mirror of
https://github.com/fluencelabs/tendermint
synced 2025-06-13 13:21:20 +00:00
Merge pull request #1333 from tendermint/1244-follow-up
consensus: fix tracking for MarkGood
This commit is contained in:
@ -76,9 +76,9 @@ func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *Bl
|
|||||||
store.Height()))
|
store.Height()))
|
||||||
}
|
}
|
||||||
|
|
||||||
const cap = 1000 // must be bigger than peers count
|
const capacity = 1000 // must be bigger than peers count
|
||||||
requestsCh := make(chan BlockRequest, cap)
|
requestsCh := make(chan BlockRequest, capacity)
|
||||||
errorsCh := make(chan peerError, cap) // so we don't block in #Receive#pool.AddBlock
|
errorsCh := make(chan peerError, capacity) // so we don't block in #Receive#pool.AddBlock
|
||||||
|
|
||||||
pool := NewBlockPool(
|
pool := NewBlockPool(
|
||||||
store.Height()+1,
|
store.Height()+1,
|
||||||
|
@ -27,6 +27,8 @@ const (
|
|||||||
VoteSetBitsChannel = byte(0x23)
|
VoteSetBitsChannel = byte(0x23)
|
||||||
|
|
||||||
maxConsensusMessageSize = 1048576 // 1MB; NOTE/TODO: keep in sync with types.PartSet sizes.
|
maxConsensusMessageSize = 1048576 // 1MB; NOTE/TODO: keep in sync with types.PartSet sizes.
|
||||||
|
|
||||||
|
blocksToContributeToBecomeGoodPeer = 10000
|
||||||
)
|
)
|
||||||
|
|
||||||
//-----------------------------------------------------------------------------
|
//-----------------------------------------------------------------------------
|
||||||
@ -251,7 +253,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)
|
|||||||
ps.ApplyProposalPOLMessage(msg)
|
ps.ApplyProposalPOLMessage(msg)
|
||||||
case *BlockPartMessage:
|
case *BlockPartMessage:
|
||||||
ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Index)
|
ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Index)
|
||||||
if numBlocks := ps.RecordBlockPart(msg); numBlocks > 10000 {
|
if numBlocks := ps.RecordBlockPart(msg); numBlocks%blocksToContributeToBecomeGoodPeer == 0 {
|
||||||
conR.Switch.MarkPeerAsGood(src)
|
conR.Switch.MarkPeerAsGood(src)
|
||||||
}
|
}
|
||||||
conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()}
|
conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()}
|
||||||
@ -273,7 +275,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)
|
|||||||
ps.EnsureVoteBitArrays(height, valSize)
|
ps.EnsureVoteBitArrays(height, valSize)
|
||||||
ps.EnsureVoteBitArrays(height-1, lastCommitSize)
|
ps.EnsureVoteBitArrays(height-1, lastCommitSize)
|
||||||
ps.SetHasVote(msg.Vote)
|
ps.SetHasVote(msg.Vote)
|
||||||
if blocks := ps.RecordVote(msg.Vote); blocks > 10000 {
|
if blocks := ps.RecordVote(msg.Vote); blocks%blocksToContributeToBecomeGoodPeer == 0 {
|
||||||
conR.Switch.MarkPeerAsGood(src)
|
conR.Switch.MarkPeerAsGood(src)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -850,6 +852,10 @@ type peerStateStats struct {
|
|||||||
blockParts int
|
blockParts int
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (pss peerStateStats) String() string {
|
||||||
|
return fmt.Sprintf("peerStateStats{votes: %d, blockParts: %d}", pss.votes, pss.blockParts)
|
||||||
|
}
|
||||||
|
|
||||||
// NewPeerState returns a new PeerState for the given Peer
|
// NewPeerState returns a new PeerState for the given Peer
|
||||||
func NewPeerState(peer p2p.Peer) *PeerState {
|
func NewPeerState(peer p2p.Peer) *PeerState {
|
||||||
return &PeerState{
|
return &PeerState{
|
||||||
@ -1077,10 +1083,7 @@ func (ps *PeerState) ensureVoteBitArrays(height int64, numValidators int) {
|
|||||||
// It returns the total number of votes (1 per block). This essentially means
|
// It returns the total number of votes (1 per block). This essentially means
|
||||||
// the number of blocks for which peer has been sending us votes.
|
// the number of blocks for which peer has been sending us votes.
|
||||||
func (ps *PeerState) RecordVote(vote *types.Vote) int {
|
func (ps *PeerState) RecordVote(vote *types.Vote) int {
|
||||||
ps.mtx.Lock()
|
if ps.stats.lastVoteHeight >= vote.Height {
|
||||||
defer ps.mtx.Unlock()
|
|
||||||
|
|
||||||
if ps.stats.lastVoteHeight == vote.Height {
|
|
||||||
return ps.stats.votes
|
return ps.stats.votes
|
||||||
}
|
}
|
||||||
ps.stats.lastVoteHeight = vote.Height
|
ps.stats.lastVoteHeight = vote.Height
|
||||||
@ -1088,14 +1091,17 @@ func (ps *PeerState) RecordVote(vote *types.Vote) int {
|
|||||||
return ps.stats.votes
|
return ps.stats.votes
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// VotesSent returns the number of blocks for which peer has been sending us
|
||||||
|
// votes.
|
||||||
|
func (ps *PeerState) VotesSent() int {
|
||||||
|
return ps.stats.votes
|
||||||
|
}
|
||||||
|
|
||||||
// RecordVote updates internal statistics for this peer by recording the block part.
|
// RecordVote updates internal statistics for this peer by recording the block part.
|
||||||
// It returns the total number of block parts (1 per block). This essentially means
|
// It returns the total number of block parts (1 per block). This essentially means
|
||||||
// the number of blocks for which peer has been sending us block parts.
|
// the number of blocks for which peer has been sending us block parts.
|
||||||
func (ps *PeerState) RecordBlockPart(bp *BlockPartMessage) int {
|
func (ps *PeerState) RecordBlockPart(bp *BlockPartMessage) int {
|
||||||
ps.mtx.Lock()
|
if ps.stats.lastBlockPartHeight >= bp.Height {
|
||||||
defer ps.mtx.Unlock()
|
|
||||||
|
|
||||||
if ps.stats.lastBlockPartHeight == bp.Height {
|
|
||||||
return ps.stats.blockParts
|
return ps.stats.blockParts
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1104,6 +1110,12 @@ func (ps *PeerState) RecordBlockPart(bp *BlockPartMessage) int {
|
|||||||
return ps.stats.blockParts
|
return ps.stats.blockParts
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// BlockPartsSent returns the number of blocks for which peer has been sending
|
||||||
|
// us block parts.
|
||||||
|
func (ps *PeerState) BlockPartsSent() int {
|
||||||
|
return ps.stats.blockParts
|
||||||
|
}
|
||||||
|
|
||||||
// SetHasVote sets the given vote as known by the peer
|
// SetHasVote sets the given vote as known by the peer
|
||||||
func (ps *PeerState) SetHasVote(vote *types.Vote) {
|
func (ps *PeerState) SetHasVote(vote *types.Vote) {
|
||||||
ps.mtx.Lock()
|
ps.mtx.Lock()
|
||||||
@ -1250,11 +1262,13 @@ func (ps *PeerState) StringIndented(indent string) string {
|
|||||||
ps.mtx.Lock()
|
ps.mtx.Lock()
|
||||||
defer ps.mtx.Unlock()
|
defer ps.mtx.Unlock()
|
||||||
return fmt.Sprintf(`PeerState{
|
return fmt.Sprintf(`PeerState{
|
||||||
%s Key %v
|
%s Key %v
|
||||||
%s PRS %v
|
%s PRS %v
|
||||||
|
%s Stats %v
|
||||||
%s}`,
|
%s}`,
|
||||||
indent, ps.Peer.ID(),
|
indent, ps.Peer.ID(),
|
||||||
indent, ps.PeerRoundState.StringIndented(indent+" "),
|
indent, ps.PeerRoundState.StringIndented(indent+" "),
|
||||||
|
indent, ps.stats,
|
||||||
indent)
|
indent)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -11,10 +11,13 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/tendermint/abci/example/kvstore"
|
"github.com/tendermint/abci/example/kvstore"
|
||||||
|
wire "github.com/tendermint/tendermint/wire"
|
||||||
|
cmn "github.com/tendermint/tmlibs/common"
|
||||||
"github.com/tendermint/tmlibs/log"
|
"github.com/tendermint/tmlibs/log"
|
||||||
|
|
||||||
cfg "github.com/tendermint/tendermint/config"
|
cfg "github.com/tendermint/tendermint/config"
|
||||||
"github.com/tendermint/tendermint/p2p"
|
"github.com/tendermint/tendermint/p2p"
|
||||||
|
p2pdummy "github.com/tendermint/tendermint/p2p/dummy"
|
||||||
"github.com/tendermint/tendermint/types"
|
"github.com/tendermint/tendermint/types"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
@ -121,6 +124,112 @@ func TestReactorProposalHeartbeats(t *testing.T) {
|
|||||||
}, css)
|
}, css)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Test we record block parts from other peers
|
||||||
|
func TestReactorRecordsBlockParts(t *testing.T) {
|
||||||
|
// create dummy peer
|
||||||
|
peer := p2pdummy.NewPeer()
|
||||||
|
ps := NewPeerState(peer).SetLogger(log.TestingLogger())
|
||||||
|
peer.Set(types.PeerStateKey, ps)
|
||||||
|
|
||||||
|
// create reactor
|
||||||
|
css := randConsensusNet(1, "consensus_reactor_records_block_parts_test", newMockTickerFunc(true), newPersistentKVStore)
|
||||||
|
reactor := NewConsensusReactor(css[0], false) // so we dont start the consensus states
|
||||||
|
reactor.SetEventBus(css[0].eventBus)
|
||||||
|
reactor.SetLogger(log.TestingLogger())
|
||||||
|
sw := p2p.MakeSwitch(cfg.DefaultP2PConfig(), 1, "testing", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { return sw })
|
||||||
|
reactor.SetSwitch(sw)
|
||||||
|
err := reactor.Start()
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer reactor.Stop()
|
||||||
|
|
||||||
|
// 1) new block part
|
||||||
|
parts := types.NewPartSetFromData(cmn.RandBytes(100), 10)
|
||||||
|
msg := &BlockPartMessage{
|
||||||
|
Height: 2,
|
||||||
|
Round: 0,
|
||||||
|
Part: parts.GetPart(0),
|
||||||
|
}
|
||||||
|
bz, err := wire.MarshalBinary(struct{ ConsensusMessage }{msg})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
reactor.Receive(DataChannel, peer, bz)
|
||||||
|
assert.Equal(t, 1, ps.BlockPartsSent(), "number of block parts sent should have increased by 1")
|
||||||
|
|
||||||
|
// 2) block part with the same height, but different round
|
||||||
|
msg.Round = 1
|
||||||
|
|
||||||
|
bz, err = wire.MarshalBinary(struct{ ConsensusMessage }{msg})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
reactor.Receive(DataChannel, peer, bz)
|
||||||
|
assert.Equal(t, 1, ps.BlockPartsSent(), "number of block parts sent should stay the same")
|
||||||
|
|
||||||
|
// 3) block part from earlier height
|
||||||
|
msg.Height = 1
|
||||||
|
msg.Round = 0
|
||||||
|
|
||||||
|
bz, err = wire.MarshalBinary(struct{ ConsensusMessage }{msg})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
reactor.Receive(DataChannel, peer, bz)
|
||||||
|
assert.Equal(t, 1, ps.BlockPartsSent(), "number of block parts sent should stay the same")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test we record votes from other peers
|
||||||
|
func TestReactorRecordsVotes(t *testing.T) {
|
||||||
|
// create dummy peer
|
||||||
|
peer := p2pdummy.NewPeer()
|
||||||
|
ps := NewPeerState(peer).SetLogger(log.TestingLogger())
|
||||||
|
peer.Set(types.PeerStateKey, ps)
|
||||||
|
|
||||||
|
// create reactor
|
||||||
|
css := randConsensusNet(1, "consensus_reactor_records_votes_test", newMockTickerFunc(true), newPersistentKVStore)
|
||||||
|
reactor := NewConsensusReactor(css[0], false) // so we dont start the consensus states
|
||||||
|
reactor.SetEventBus(css[0].eventBus)
|
||||||
|
reactor.SetLogger(log.TestingLogger())
|
||||||
|
sw := p2p.MakeSwitch(cfg.DefaultP2PConfig(), 1, "testing", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { return sw })
|
||||||
|
reactor.SetSwitch(sw)
|
||||||
|
err := reactor.Start()
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer reactor.Stop()
|
||||||
|
_, val := css[0].state.Validators.GetByIndex(0)
|
||||||
|
|
||||||
|
// 1) new vote
|
||||||
|
vote := &types.Vote{
|
||||||
|
ValidatorIndex: 0,
|
||||||
|
ValidatorAddress: val.Address,
|
||||||
|
Height: 2,
|
||||||
|
Round: 0,
|
||||||
|
Timestamp: time.Now().UTC(),
|
||||||
|
Type: types.VoteTypePrevote,
|
||||||
|
BlockID: types.BlockID{},
|
||||||
|
}
|
||||||
|
bz, err := wire.MarshalBinary(struct{ ConsensusMessage }{&VoteMessage{vote}})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
reactor.Receive(VoteChannel, peer, bz)
|
||||||
|
assert.Equal(t, 1, ps.VotesSent(), "number of votes sent should have increased by 1")
|
||||||
|
|
||||||
|
// 2) vote with the same height, but different round
|
||||||
|
vote.Round = 1
|
||||||
|
|
||||||
|
bz, err = wire.MarshalBinary(struct{ ConsensusMessage }{&VoteMessage{vote}})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
reactor.Receive(VoteChannel, peer, bz)
|
||||||
|
assert.Equal(t, 1, ps.VotesSent(), "number of votes sent should stay the same")
|
||||||
|
|
||||||
|
// 3) vote from earlier height
|
||||||
|
vote.Height = 1
|
||||||
|
vote.Round = 0
|
||||||
|
|
||||||
|
bz, err = wire.MarshalBinary(struct{ ConsensusMessage }{&VoteMessage{vote}})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
reactor.Receive(VoteChannel, peer, bz)
|
||||||
|
assert.Equal(t, 1, ps.VotesSent(), "number of votes sent should stay the same")
|
||||||
|
}
|
||||||
|
|
||||||
//-------------------------------------------------------------
|
//-------------------------------------------------------------
|
||||||
// ensure we can make blocks despite cycling a validator set
|
// ensure we can make blocks despite cycling a validator set
|
||||||
|
|
||||||
|
72
p2p/dummy/peer.go
Normal file
72
p2p/dummy/peer.go
Normal file
@ -0,0 +1,72 @@
|
|||||||
|
package dummy
|
||||||
|
|
||||||
|
import (
|
||||||
|
p2p "github.com/tendermint/tendermint/p2p"
|
||||||
|
tmconn "github.com/tendermint/tendermint/p2p/conn"
|
||||||
|
cmn "github.com/tendermint/tmlibs/common"
|
||||||
|
)
|
||||||
|
|
||||||
|
type peer struct {
|
||||||
|
cmn.BaseService
|
||||||
|
kv map[string]interface{}
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ p2p.Peer = (*peer)(nil)
|
||||||
|
|
||||||
|
// NewPeer creates new dummy peer.
|
||||||
|
func NewPeer() *peer {
|
||||||
|
p := &peer{
|
||||||
|
kv: make(map[string]interface{}),
|
||||||
|
}
|
||||||
|
p.BaseService = *cmn.NewBaseService(nil, "peer", p)
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
|
||||||
|
// ID always returns dummy.
|
||||||
|
func (p *peer) ID() p2p.ID {
|
||||||
|
return p2p.ID("dummy")
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsOutbound always returns false.
|
||||||
|
func (p *peer) IsOutbound() bool {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsPersistent always returns false.
|
||||||
|
func (p *peer) IsPersistent() bool {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// NodeInfo always returns empty node info.
|
||||||
|
func (p *peer) NodeInfo() p2p.NodeInfo {
|
||||||
|
return p2p.NodeInfo{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Status always returns empry connection status.
|
||||||
|
func (p *peer) Status() tmconn.ConnectionStatus {
|
||||||
|
return tmconn.ConnectionStatus{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send does not do anything and just returns true.
|
||||||
|
func (p *peer) Send(byte, interface{}) bool {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// TrySend does not do anything and just returns true.
|
||||||
|
func (p *peer) TrySend(byte, interface{}) bool {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set records value under key specified in the map.
|
||||||
|
func (p *peer) Set(key string, value interface{}) {
|
||||||
|
p.kv[key] = value
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get returns a value associated with the key. Nil is returned if no value
|
||||||
|
// found.
|
||||||
|
func (p *peer) Get(key string) interface{} {
|
||||||
|
if value, ok := p.kv[key]; ok {
|
||||||
|
return value
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
@ -358,7 +358,9 @@ func createMConnection(conn net.Conn, p *peer, reactorsByCh map[byte]Reactor, ch
|
|||||||
onReceive := func(chID byte, msgBytes []byte) {
|
onReceive := func(chID byte, msgBytes []byte) {
|
||||||
reactor := reactorsByCh[chID]
|
reactor := reactorsByCh[chID]
|
||||||
if reactor == nil {
|
if reactor == nil {
|
||||||
onPeerError(p, fmt.Errorf("Unknown channel %X", chID))
|
// Note that its ok to panic here as it's caught in the conn._recover,
|
||||||
|
// which does onPeerError.
|
||||||
|
panic(cmn.Fmt("Unknown channel %X", chID))
|
||||||
}
|
}
|
||||||
reactor.Receive(chID, p, msgBytes)
|
reactor.Receive(chID, p, msgBytes)
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user