mirror of
https://github.com/fluencelabs/tendermint
synced 2025-06-28 04:01:40 +00:00
Merge branch 'develop' into jae/aminoify
This commit is contained in:
@ -26,6 +26,8 @@ const (
|
||||
VoteSetBitsChannel = byte(0x23)
|
||||
|
||||
maxConsensusMessageSize = 1048576 // 1MB; NOTE/TODO: keep in sync with types.PartSet sizes.
|
||||
|
||||
blocksToContributeToBecomeGoodPeer = 10000
|
||||
)
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
@ -254,7 +256,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)
|
||||
ps.ApplyProposalPOLMessage(msg)
|
||||
case *BlockPartMessage:
|
||||
ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Index)
|
||||
if numBlocks := ps.RecordBlockPart(msg); numBlocks > 10000 {
|
||||
if numBlocks := ps.RecordBlockPart(msg); numBlocks%blocksToContributeToBecomeGoodPeer == 0 {
|
||||
conR.Switch.MarkPeerAsGood(src)
|
||||
}
|
||||
conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()}
|
||||
@ -276,7 +278,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)
|
||||
ps.EnsureVoteBitArrays(height, valSize)
|
||||
ps.EnsureVoteBitArrays(height-1, lastCommitSize)
|
||||
ps.SetHasVote(msg.Vote)
|
||||
if blocks := ps.RecordVote(msg.Vote); blocks > 10000 {
|
||||
if blocks := ps.RecordVote(msg.Vote); blocks%blocksToContributeToBecomeGoodPeer == 0 {
|
||||
conR.Switch.MarkPeerAsGood(src)
|
||||
}
|
||||
|
||||
@ -372,19 +374,21 @@ func (conR *ConsensusReactor) startBroadcastRoutine() error {
|
||||
}
|
||||
|
||||
go func() {
|
||||
var data interface{}
|
||||
var ok bool
|
||||
for {
|
||||
select {
|
||||
case data, ok := <-stepsCh:
|
||||
case data, ok = <-stepsCh:
|
||||
if ok { // a receive from a closed channel returns the zero value immediately
|
||||
edrs := data.(types.EventDataRoundState)
|
||||
conR.broadcastNewRoundStep(edrs.RoundState.(*cstypes.RoundState))
|
||||
}
|
||||
case data, ok := <-votesCh:
|
||||
case data, ok = <-votesCh:
|
||||
if ok {
|
||||
edv := data.(types.EventDataVote)
|
||||
conR.broadcastHasVoteMessage(edv.Vote)
|
||||
}
|
||||
case data, ok := <-heartbeatsCh:
|
||||
case data, ok = <-heartbeatsCh:
|
||||
if ok {
|
||||
edph := data.(types.EventDataProposalHeartbeat)
|
||||
conR.broadcastProposalHeartbeatMessage(edph)
|
||||
@ -393,6 +397,10 @@ func (conR *ConsensusReactor) startBroadcastRoutine() error {
|
||||
conR.eventBus.UnsubscribeAll(ctx, subscriber)
|
||||
return
|
||||
}
|
||||
if !ok {
|
||||
conR.eventBus.UnsubscribeAll(ctx, subscriber)
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
@ -603,11 +611,9 @@ func (conR *ConsensusReactor) gossipDataForCatchup(logger log.Logger, rs *cstype
|
||||
logger.Debug("Sending block part for catchup failed")
|
||||
}
|
||||
return
|
||||
} else {
|
||||
//logger.Info("No parts to send in catch-up, sleeping")
|
||||
time.Sleep(conR.conS.config.PeerGossipSleep())
|
||||
return
|
||||
}
|
||||
//logger.Info("No parts to send in catch-up, sleeping")
|
||||
time.Sleep(conR.conS.config.PeerGossipSleep())
|
||||
}
|
||||
|
||||
func (conR *ConsensusReactor) gossipVotesRoutine(peer p2p.Peer, ps *PeerState) {
|
||||
@ -853,6 +859,10 @@ type peerStateStats struct {
|
||||
blockParts int
|
||||
}
|
||||
|
||||
func (pss peerStateStats) String() string {
|
||||
return fmt.Sprintf("peerStateStats{votes: %d, blockParts: %d}", pss.votes, pss.blockParts)
|
||||
}
|
||||
|
||||
// NewPeerState returns a new PeerState for the given Peer
|
||||
func NewPeerState(peer p2p.Peer) *PeerState {
|
||||
return &PeerState{
|
||||
@ -1083,27 +1093,46 @@ func (ps *PeerState) RecordVote(vote *types.Vote) int {
|
||||
ps.mtx.Lock()
|
||||
defer ps.mtx.Unlock()
|
||||
|
||||
if ps.stats.lastVoteHeight == vote.Height {
|
||||
if ps.stats.lastVoteHeight >= vote.Height {
|
||||
return ps.stats.votes
|
||||
}
|
||||
ps.stats.lastVoteHeight = vote.Height
|
||||
ps.stats.votes += 1
|
||||
ps.stats.votes++
|
||||
return ps.stats.votes
|
||||
}
|
||||
|
||||
// RecordVote updates internal statistics for this peer by recording the block part.
|
||||
// It returns the total number of block parts (1 per block). This essentially means
|
||||
// the number of blocks for which peer has been sending us block parts.
|
||||
// VotesSent returns the number of blocks for which peer has been sending us
|
||||
// votes.
|
||||
func (ps *PeerState) VotesSent() int {
|
||||
ps.mtx.Lock()
|
||||
defer ps.mtx.Unlock()
|
||||
|
||||
return ps.stats.votes
|
||||
}
|
||||
|
||||
// RecordBlockPart updates internal statistics for this peer by recording the
|
||||
// block part. It returns the total number of block parts (1 per block). This
|
||||
// essentially means the number of blocks for which peer has been sending us
|
||||
// block parts.
|
||||
func (ps *PeerState) RecordBlockPart(bp *BlockPartMessage) int {
|
||||
ps.mtx.Lock()
|
||||
defer ps.mtx.Unlock()
|
||||
|
||||
if ps.stats.lastBlockPartHeight == bp.Height {
|
||||
if ps.stats.lastBlockPartHeight >= bp.Height {
|
||||
return ps.stats.blockParts
|
||||
}
|
||||
|
||||
ps.stats.lastBlockPartHeight = bp.Height
|
||||
ps.stats.blockParts += 1
|
||||
ps.stats.blockParts++
|
||||
return ps.stats.blockParts
|
||||
}
|
||||
|
||||
// BlockPartsSent returns the number of blocks for which peer has been sending
|
||||
// us block parts.
|
||||
func (ps *PeerState) BlockPartsSent() int {
|
||||
ps.mtx.Lock()
|
||||
defer ps.mtx.Unlock()
|
||||
|
||||
return ps.stats.blockParts
|
||||
}
|
||||
|
||||
@ -1253,11 +1282,13 @@ func (ps *PeerState) StringIndented(indent string) string {
|
||||
ps.mtx.Lock()
|
||||
defer ps.mtx.Unlock()
|
||||
return fmt.Sprintf(`PeerState{
|
||||
%s Key %v
|
||||
%s PRS %v
|
||||
%s Key %v
|
||||
%s PRS %v
|
||||
%s Stats %v
|
||||
%s}`,
|
||||
indent, ps.Peer.ID(),
|
||||
indent, ps.PeerRoundState.StringIndented(indent+" "),
|
||||
indent, ps.stats,
|
||||
indent)
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user