Merge pull request #1244 from tendermint/1147-using-mark-good-and-stop-peer-for-error

Using MarkGood and StopPeerForError
This commit is contained in:
Ethan Buchman 2018-03-15 18:58:29 +01:00 committed by GitHub
commit 68e049d3af
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 172 additions and 68 deletions

View File

@ -1,6 +1,7 @@
package blockchain package blockchain
import ( import (
"errors"
"fmt" "fmt"
"math" "math"
"sync" "sync"
@ -39,9 +40,12 @@ const (
// Assuming a DSL connection (not a good choice) 128 Kbps (upload) ~ 15 KB/s, // Assuming a DSL connection (not a good choice) 128 Kbps (upload) ~ 15 KB/s,
// sending data across atlantic ~ 7.5 KB/s. // sending data across atlantic ~ 7.5 KB/s.
minRecvRate = 7680 minRecvRate = 7680
// Maximum difference between current and new block's height.
maxDiffBetweenCurrentAndReceivedBlockHeight = 100
) )
var peerTimeoutSeconds = time.Duration(15) // not const so we can override with tests var peerTimeout = 15 * time.Second // not const so we can override with tests
/* /*
Peers self report their heights when we join the block pool. Peers self report their heights when we join the block pool.
@ -68,10 +72,10 @@ type BlockPool struct {
maxPeerHeight int64 maxPeerHeight int64
requestsCh chan<- BlockRequest requestsCh chan<- BlockRequest
timeoutsCh chan<- p2p.ID errorsCh chan<- peerError
} }
func NewBlockPool(start int64, requestsCh chan<- BlockRequest, timeoutsCh chan<- p2p.ID) *BlockPool { func NewBlockPool(start int64, requestsCh chan<- BlockRequest, errorsCh chan<- peerError) *BlockPool {
bp := &BlockPool{ bp := &BlockPool{
peers: make(map[p2p.ID]*bpPeer), peers: make(map[p2p.ID]*bpPeer),
@ -80,7 +84,7 @@ func NewBlockPool(start int64, requestsCh chan<- BlockRequest, timeoutsCh chan<-
numPending: 0, numPending: 0,
requestsCh: requestsCh, requestsCh: requestsCh,
timeoutsCh: timeoutsCh, errorsCh: errorsCh,
} }
bp.BaseService = *cmn.NewBaseService(nil, "BlockPool", bp) bp.BaseService = *cmn.NewBaseService(nil, "BlockPool", bp)
return bp return bp
@ -128,9 +132,10 @@ func (pool *BlockPool) removeTimedoutPeers() {
curRate := peer.recvMonitor.Status().CurRate curRate := peer.recvMonitor.Status().CurRate
// curRate can be 0 on start // curRate can be 0 on start
if curRate != 0 && curRate < minRecvRate { if curRate != 0 && curRate < minRecvRate {
pool.sendTimeout(peer.id) err := errors.New("peer is not sending us data fast enough")
pool.sendError(err, peer.id)
pool.Logger.Error("SendTimeout", "peer", peer.id, pool.Logger.Error("SendTimeout", "peer", peer.id,
"reason", "peer is not sending us data fast enough", "reason", err,
"curRate", fmt.Sprintf("%d KB/s", curRate/1024), "curRate", fmt.Sprintf("%d KB/s", curRate/1024),
"minRate", fmt.Sprintf("%d KB/s", minRecvRate/1024)) "minRate", fmt.Sprintf("%d KB/s", minRecvRate/1024))
peer.didTimeout = true peer.didTimeout = true
@ -199,7 +204,7 @@ func (pool *BlockPool) PopRequest() {
delete(pool.requesters, pool.height) delete(pool.requesters, pool.height)
pool.height++ pool.height++
} else { } else {
cmn.PanicSanity(cmn.Fmt("Expected requester to pop, got nothing at height %v", pool.height)) panic(fmt.Sprintf("Expected requester to pop, got nothing at height %v", pool.height))
} }
} }
@ -213,8 +218,9 @@ func (pool *BlockPool) RedoRequest(height int64) p2p.ID {
request := pool.requesters[height] request := pool.requesters[height]
if request.block == nil { if request.block == nil {
cmn.PanicSanity("Expected block to be non-nil") panic("Expected block to be non-nil")
} }
// RemovePeer will redo all requesters associated with this peer. // RemovePeer will redo all requesters associated with this peer.
pool.removePeer(request.peerID) pool.removePeer(request.peerID)
return request.peerID return request.peerID
@ -227,8 +233,14 @@ func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int
requester := pool.requesters[block.Height] requester := pool.requesters[block.Height]
if requester == nil { if requester == nil {
// a block we didn't expect. pool.Logger.Info("peer sent us a block we didn't expect", "peer", peerID, "curHeight", pool.height, "blockHeight", block.Height)
// TODO:if height is too far ahead, punish peer diff := pool.height - block.Height
if diff < 0 {
diff *= -1
}
if diff > maxDiffBetweenCurrentAndReceivedBlockHeight {
pool.sendError(errors.New("peer sent us a block we didn't expect with a height too far ahead/behind"), peerID)
}
return return
} }
@ -339,11 +351,11 @@ func (pool *BlockPool) sendRequest(height int64, peerID p2p.ID) {
pool.requestsCh <- BlockRequest{height, peerID} pool.requestsCh <- BlockRequest{height, peerID}
} }
func (pool *BlockPool) sendTimeout(peerID p2p.ID) { func (pool *BlockPool) sendError(err error, peerID p2p.ID) {
if !pool.IsRunning() { if !pool.IsRunning() {
return return
} }
pool.timeoutsCh <- peerID pool.errorsCh <- peerError{err, peerID}
} }
// unused by tendermint; left for debugging purposes // unused by tendermint; left for debugging purposes
@ -402,9 +414,9 @@ func (peer *bpPeer) resetMonitor() {
func (peer *bpPeer) resetTimeout() { func (peer *bpPeer) resetTimeout() {
if peer.timeout == nil { if peer.timeout == nil {
peer.timeout = time.AfterFunc(time.Second*peerTimeoutSeconds, peer.onTimeout) peer.timeout = time.AfterFunc(peerTimeout, peer.onTimeout)
} else { } else {
peer.timeout.Reset(time.Second * peerTimeoutSeconds) peer.timeout.Reset(peerTimeout)
} }
} }
@ -430,8 +442,9 @@ func (peer *bpPeer) onTimeout() {
peer.pool.mtx.Lock() peer.pool.mtx.Lock()
defer peer.pool.mtx.Unlock() defer peer.pool.mtx.Unlock()
peer.pool.sendTimeout(peer.id) err := errors.New("peer did not send us anything")
peer.logger.Error("SendTimeout", "reason", "onTimeout") peer.pool.sendError(err, peer.id)
peer.logger.Error("SendTimeout", "reason", err, "timeout", peerTimeout)
peer.didTimeout = true peer.didTimeout = true
} }

View File

@ -13,7 +13,7 @@ import (
) )
func init() { func init() {
peerTimeoutSeconds = time.Duration(2) peerTimeout = 2 * time.Second
} }
type testPeer struct { type testPeer struct {
@ -34,9 +34,9 @@ func makePeers(numPeers int, minHeight, maxHeight int64) map[p2p.ID]testPeer {
func TestBasic(t *testing.T) { func TestBasic(t *testing.T) {
start := int64(42) start := int64(42)
peers := makePeers(10, start+1, 1000) peers := makePeers(10, start+1, 1000)
timeoutsCh := make(chan p2p.ID, 100) errorsCh := make(chan peerError, 1000)
requestsCh := make(chan BlockRequest, 100) requestsCh := make(chan BlockRequest, 1000)
pool := NewBlockPool(start, requestsCh, timeoutsCh) pool := NewBlockPool(start, requestsCh, errorsCh)
pool.SetLogger(log.TestingLogger()) pool.SetLogger(log.TestingLogger())
err := pool.Start() err := pool.Start()
@ -71,8 +71,8 @@ func TestBasic(t *testing.T) {
// Pull from channels // Pull from channels
for { for {
select { select {
case peerID := <-timeoutsCh: case err := <-errorsCh:
t.Errorf("timeout: %v", peerID) t.Error(err)
case request := <-requestsCh: case request := <-requestsCh:
t.Logf("Pulled new BlockRequest %v", request) t.Logf("Pulled new BlockRequest %v", request)
if request.Height == 300 { if request.Height == 300 {
@ -91,9 +91,9 @@ func TestBasic(t *testing.T) {
func TestTimeout(t *testing.T) { func TestTimeout(t *testing.T) {
start := int64(42) start := int64(42)
peers := makePeers(10, start+1, 1000) peers := makePeers(10, start+1, 1000)
timeoutsCh := make(chan p2p.ID, 100) errorsCh := make(chan peerError, 1000)
requestsCh := make(chan BlockRequest, 100) requestsCh := make(chan BlockRequest, 1000)
pool := NewBlockPool(start, requestsCh, timeoutsCh) pool := NewBlockPool(start, requestsCh, errorsCh)
pool.SetLogger(log.TestingLogger()) pool.SetLogger(log.TestingLogger())
err := pool.Start() err := pool.Start()
if err != nil { if err != nil {
@ -132,9 +132,10 @@ func TestTimeout(t *testing.T) {
timedOut := map[p2p.ID]struct{}{} timedOut := map[p2p.ID]struct{}{}
for { for {
select { select {
case peerID := <-timeoutsCh: case err := <-errorsCh:
t.Logf("Peer %v timeouted", peerID) t.Log(err)
if _, ok := timedOut[peerID]; !ok { // consider error to be always timeout here
if _, ok := timedOut[err.peerID]; !ok {
counter++ counter++
if counter == len(peers) { if counter == len(peers) {
return // Done! return // Done!

View File

@ -22,8 +22,7 @@ const (
// BlockchainChannel is a channel for blocks and status updates (`BlockStore` height) // BlockchainChannel is a channel for blocks and status updates (`BlockStore` height)
BlockchainChannel = byte(0x40) BlockchainChannel = byte(0x40)
defaultChannelCapacity = 1000 trySyncIntervalMS = 50
trySyncIntervalMS = 50
// stop syncing when last block's time is // stop syncing when last block's time is
// within this much of the system time. // within this much of the system time.
// stopSyncingDurationMinutes = 10 // stopSyncingDurationMinutes = 10
@ -40,6 +39,15 @@ type consensusReactor interface {
SwitchToConsensus(sm.State, int) SwitchToConsensus(sm.State, int)
} }
// peerError couples an error with the ID of the peer it is attributed to.
// NOTE: field order is significant; the block pool builds values
// positionally as peerError{err, peerID}.
type peerError struct {
err error // underlying cause
peerID p2p.ID // peer the error is attributed to
}
// Error implements the error interface. The message names the peer first and
// then appends the text of the wrapped error.
func (e peerError) Error() string {
	cause := e.err.Error()
	return fmt.Sprintf("error with peer %v: %s", e.peerID, cause)
}
// BlockchainReactor handles long-term catchup syncing. // BlockchainReactor handles long-term catchup syncing.
type BlockchainReactor struct { type BlockchainReactor struct {
p2p.BaseReactor p2p.BaseReactor
@ -56,7 +64,7 @@ type BlockchainReactor struct {
fastSync bool fastSync bool
requestsCh <-chan BlockRequest requestsCh <-chan BlockRequest
timeoutsCh <-chan p2p.ID errorsCh <-chan peerError
} }
// NewBlockchainReactor returns new reactor instance. // NewBlockchainReactor returns new reactor instance.
@ -64,17 +72,20 @@ func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *Bl
fastSync bool) *BlockchainReactor { fastSync bool) *BlockchainReactor {
if state.LastBlockHeight != store.Height() { if state.LastBlockHeight != store.Height() {
cmn.PanicSanity(cmn.Fmt("state (%v) and store (%v) height mismatch", state.LastBlockHeight, panic(fmt.Sprintf("state (%v) and store (%v) height mismatch", state.LastBlockHeight,
store.Height())) store.Height()))
} }
requestsCh := make(chan BlockRequest, defaultChannelCapacity) const cap = 1000 // must be bigger than peers count
timeoutsCh := make(chan p2p.ID, defaultChannelCapacity) requestsCh := make(chan BlockRequest, cap)
errorsCh := make(chan peerError, cap) // so we don't block in #Receive#pool.AddBlock
pool := NewBlockPool( pool := NewBlockPool(
store.Height()+1, store.Height()+1,
requestsCh, requestsCh,
timeoutsCh, errorsCh,
) )
bcR := &BlockchainReactor{ bcR := &BlockchainReactor{
params: state.ConsensusParams, params: state.ConsensusParams,
initialState: state, initialState: state,
@ -83,7 +94,7 @@ func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *Bl
pool: pool, pool: pool,
fastSync: fastSync, fastSync: fastSync,
requestsCh: requestsCh, requestsCh: requestsCh,
timeoutsCh: timeoutsCh, errorsCh: errorsCh,
} }
bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR) bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR)
return bcR return bcR
@ -166,7 +177,8 @@ func (bcR *BlockchainReactor) respondToPeer(msg *bcBlockRequestMessage,
func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
_, msg, err := DecodeMessage(msgBytes, bcR.maxMsgSize()) _, msg, err := DecodeMessage(msgBytes, bcR.maxMsgSize())
if err != nil { if err != nil {
bcR.Logger.Error("Error decoding message", "err", err) bcR.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes)
bcR.Switch.StopPeerForError(src, err)
return return
} }
@ -230,7 +242,7 @@ func (bcR *BlockchainReactor) poolRoutine() {
FOR_LOOP: FOR_LOOP:
for { for {
select { select {
case request := <-bcR.requestsCh: // chan BlockRequest case request := <-bcR.requestsCh:
peer := bcR.Switch.Peers().Get(request.PeerID) peer := bcR.Switch.Peers().Get(request.PeerID)
if peer == nil { if peer == nil {
continue FOR_LOOP // Peer has since been disconnected. continue FOR_LOOP // Peer has since been disconnected.
@ -242,11 +254,10 @@ FOR_LOOP:
// The pool handles timeouts, just let it go. // The pool handles timeouts, just let it go.
continue FOR_LOOP continue FOR_LOOP
} }
case peerID := <-bcR.timeoutsCh: // chan string case err := <-bcR.errorsCh:
// Peer timed out. peer := bcR.Switch.Peers().Get(err.peerID)
peer := bcR.Switch.Peers().Get(peerID)
if peer != nil { if peer != nil {
bcR.Switch.StopPeerForError(peer, errors.New("BlockchainReactor Timeout")) bcR.Switch.StopPeerForError(peer, err)
} }
case <-statusUpdateTicker.C: case <-statusUpdateTicker.C:
// ask for status updates // ask for status updates

View File

@ -76,7 +76,7 @@ func (bs *BlockStore) LoadBlock(height int64) *types.Block {
} }
blockMeta := wire.ReadBinary(&types.BlockMeta{}, r, 0, &n, &err).(*types.BlockMeta) blockMeta := wire.ReadBinary(&types.BlockMeta{}, r, 0, &n, &err).(*types.BlockMeta)
if err != nil { if err != nil {
cmn.PanicCrisis(cmn.Fmt("Error reading block meta: %v", err)) panic(fmt.Sprintf("Error reading block meta: %v", err))
} }
bytez := []byte{} bytez := []byte{}
for i := 0; i < blockMeta.BlockID.PartsHeader.Total; i++ { for i := 0; i < blockMeta.BlockID.PartsHeader.Total; i++ {
@ -85,7 +85,7 @@ func (bs *BlockStore) LoadBlock(height int64) *types.Block {
} }
block := wire.ReadBinary(&types.Block{}, bytes.NewReader(bytez), 0, &n, &err).(*types.Block) block := wire.ReadBinary(&types.Block{}, bytes.NewReader(bytez), 0, &n, &err).(*types.Block)
if err != nil { if err != nil {
cmn.PanicCrisis(cmn.Fmt("Error reading block: %v", err)) panic(fmt.Sprintf("Error reading block: %v", err))
} }
return block return block
} }
@ -102,7 +102,7 @@ func (bs *BlockStore) LoadBlockPart(height int64, index int) *types.Part {
} }
part := wire.ReadBinary(&types.Part{}, r, 0, &n, &err).(*types.Part) part := wire.ReadBinary(&types.Part{}, r, 0, &n, &err).(*types.Part)
if err != nil { if err != nil {
cmn.PanicCrisis(cmn.Fmt("Error reading block part: %v", err)) panic(fmt.Sprintf("Error reading block part: %v", err))
} }
return part return part
} }
@ -118,7 +118,7 @@ func (bs *BlockStore) LoadBlockMeta(height int64) *types.BlockMeta {
} }
blockMeta := wire.ReadBinary(&types.BlockMeta{}, r, 0, &n, &err).(*types.BlockMeta) blockMeta := wire.ReadBinary(&types.BlockMeta{}, r, 0, &n, &err).(*types.BlockMeta)
if err != nil { if err != nil {
cmn.PanicCrisis(cmn.Fmt("Error reading block meta: %v", err)) panic(fmt.Sprintf("Error reading block meta: %v", err))
} }
return blockMeta return blockMeta
} }
@ -136,7 +136,7 @@ func (bs *BlockStore) LoadBlockCommit(height int64) *types.Commit {
} }
commit := wire.ReadBinary(&types.Commit{}, r, 0, &n, &err).(*types.Commit) commit := wire.ReadBinary(&types.Commit{}, r, 0, &n, &err).(*types.Commit)
if err != nil { if err != nil {
cmn.PanicCrisis(cmn.Fmt("Error reading commit: %v", err)) panic(fmt.Sprintf("Error reading commit: %v", err))
} }
return commit return commit
} }
@ -153,7 +153,7 @@ func (bs *BlockStore) LoadSeenCommit(height int64) *types.Commit {
} }
commit := wire.ReadBinary(&types.Commit{}, r, 0, &n, &err).(*types.Commit) commit := wire.ReadBinary(&types.Commit{}, r, 0, &n, &err).(*types.Commit)
if err != nil { if err != nil {
cmn.PanicCrisis(cmn.Fmt("Error reading commit: %v", err)) panic(fmt.Sprintf("Error reading commit: %v", err))
} }
return commit return commit
} }
@ -262,7 +262,7 @@ func LoadBlockStoreStateJSON(db dbm.DB) BlockStoreStateJSON {
bsj := BlockStoreStateJSON{} bsj := BlockStoreStateJSON{}
err := json.Unmarshal(bytes, &bsj) err := json.Unmarshal(bytes, &bsj)
if err != nil { if err != nil {
cmn.PanicCrisis(cmn.Fmt("Could not unmarshal bytes: %X", bytes)) panic(fmt.Sprintf("Could not unmarshal bytes: %X", bytes))
} }
return bsj return bsj
} }

View File

@ -46,9 +46,9 @@ func TestByzantine(t *testing.T) {
eventChans := make([]chan interface{}, N) eventChans := make([]chan interface{}, N)
reactors := make([]p2p.Reactor, N) reactors := make([]p2p.Reactor, N)
for i := 0; i < N; i++ { for i := 0; i < N; i++ {
// make first val byzantine
if i == 0 { if i == 0 {
css[i].privValidator = NewByzantinePrivValidator(css[i].privValidator) css[i].privValidator = NewByzantinePrivValidator(css[i].privValidator)
// make byzantine
css[i].decideProposal = func(j int) func(int64, int) { css[i].decideProposal = func(j int) func(int64, int) {
return func(height int64, round int) { return func(height int64, round int) {
byzantineDecideProposalFunc(t, height, round, css[j], switches[j]) byzantineDecideProposalFunc(t, height, round, css[j], switches[j])
@ -74,9 +74,11 @@ func TestByzantine(t *testing.T) {
var conRI p2p.Reactor // nolint: gotype, gosimple var conRI p2p.Reactor // nolint: gotype, gosimple
conRI = conR conRI = conR
// make first val byzantine
if i == 0 { if i == 0 {
conRI = NewByzantineReactor(conR) conRI = NewByzantineReactor(conR)
} }
reactors[i] = conRI reactors[i] = conRI
} }
@ -115,19 +117,19 @@ func TestByzantine(t *testing.T) {
// and the other block to peers[1] and peers[2]. // and the other block to peers[1] and peers[2].
// note peers and switches order don't match. // note peers and switches order don't match.
peers := switches[0].Peers().List() peers := switches[0].Peers().List()
// partition A
ind0 := getSwitchIndex(switches, peers[0]) ind0 := getSwitchIndex(switches, peers[0])
// partition B
ind1 := getSwitchIndex(switches, peers[1]) ind1 := getSwitchIndex(switches, peers[1])
ind2 := getSwitchIndex(switches, peers[2]) ind2 := getSwitchIndex(switches, peers[2])
// connect the 2 peers in the larger partition
p2p.Connect2Switches(switches, ind1, ind2) p2p.Connect2Switches(switches, ind1, ind2)
// wait for someone in the big partition to make a block // wait for someone in the big partition (B) to make a block
<-eventChans[ind2] <-eventChans[ind2]
t.Log("A block has been committed. Healing partition") t.Log("A block has been committed. Healing partition")
// connect the partitions
p2p.Connect2Switches(switches, ind0, ind1) p2p.Connect2Switches(switches, ind0, ind1)
p2p.Connect2Switches(switches, ind0, ind2) p2p.Connect2Switches(switches, ind0, ind2)

View File

@ -179,7 +179,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)
_, msg, err := DecodeMessage(msgBytes) _, msg, err := DecodeMessage(msgBytes)
if err != nil { if err != nil {
conR.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes) conR.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes)
// TODO punish peer? conR.Switch.StopPeerForError(src, err)
return return
} }
conR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg) conR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg)
@ -251,6 +251,9 @@ func (conR *ConsensusReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)
ps.ApplyProposalPOLMessage(msg) ps.ApplyProposalPOLMessage(msg)
case *BlockPartMessage: case *BlockPartMessage:
ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Index) ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Index)
if numBlocks := ps.RecordBlockPart(msg); numBlocks > 10000 {
conR.Switch.MarkPeerAsGood(src)
}
conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()} conR.conS.peerMsgQueue <- msgInfo{msg, src.ID()}
default: default:
conR.Logger.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg))) conR.Logger.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
@ -270,6 +273,9 @@ func (conR *ConsensusReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)
ps.EnsureVoteBitArrays(height, valSize) ps.EnsureVoteBitArrays(height, valSize)
ps.EnsureVoteBitArrays(height-1, lastCommitSize) ps.EnsureVoteBitArrays(height-1, lastCommitSize)
ps.SetHasVote(msg.Vote) ps.SetHasVote(msg.Vote)
if blocks := ps.RecordVote(msg.Vote); blocks > 10000 {
conR.Switch.MarkPeerAsGood(src)
}
cs.peerMsgQueue <- msgInfo{msg, src.ID()} cs.peerMsgQueue <- msgInfo{msg, src.ID()}
@ -831,6 +837,17 @@ type PeerState struct {
mtx sync.Mutex mtx sync.Mutex
cstypes.PeerRoundState cstypes.PeerRoundState
stats *peerStateStats
}
// peerStateStats holds internal statistics for a peer.
type peerStateStats struct {
lastVoteHeight int64
votes int
lastBlockPartHeight int64
blockParts int
} }
// NewPeerState returns a new PeerState for the given Peer // NewPeerState returns a new PeerState for the given Peer
@ -844,6 +861,7 @@ func NewPeerState(peer p2p.Peer) *PeerState {
LastCommitRound: -1, LastCommitRound: -1,
CatchupCommitRound: -1, CatchupCommitRound: -1,
}, },
stats: &peerStateStats{},
} }
} }
@ -1055,6 +1073,37 @@ func (ps *PeerState) ensureVoteBitArrays(height int64, numValidators int) {
} }
} }
// RecordVote updates this peer's internal statistics with the given vote and
// returns the running vote tally. A vote whose height equals the most
// recently recorded vote height is not counted again, so the tally
// approximates the number of blocks for which the peer has been sending us
// votes.
func (ps *PeerState) RecordVote(vote *types.Vote) int {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()

	// Only the first vote seen after the height changes bumps the counter.
	if ps.stats.lastVoteHeight != vote.Height {
		ps.stats.lastVoteHeight = vote.Height
		ps.stats.votes++
	}
	return ps.stats.votes
}
// RecordBlockPart updates this peer's internal statistics with the given
// block part message and returns the running block part tally. A part whose
// height equals the most recently recorded block part height is not counted
// again, so the tally approximates the number of blocks for which the peer
// has been sending us block parts.
func (ps *PeerState) RecordBlockPart(bp *BlockPartMessage) int {
	ps.mtx.Lock()
	defer ps.mtx.Unlock()

	if ps.stats.lastBlockPartHeight == bp.Height {
		// A part for this height was already counted.
		return ps.stats.blockParts
	}

	ps.stats.lastBlockPartHeight = bp.Height
	ps.stats.blockParts++
	return ps.stats.blockParts
}
// SetHasVote sets the given vote as known by the peer // SetHasVote sets the given vote as known by the peer
func (ps *PeerState) SetHasVote(vote *types.Vote) { func (ps *PeerState) SetHasVote(vote *types.Vote) {
ps.mtx.Lock() ps.mtx.Lock()

View File

@ -583,6 +583,10 @@ func (cs *ConsensusState) handleMsg(mi msgInfo) {
err := cs.tryAddVote(msg.Vote, peerID) err := cs.tryAddVote(msg.Vote, peerID)
if err == ErrAddingVote { if err == ErrAddingVote {
// TODO: punish peer // TODO: punish peer
// We probably don't want to stop the peer here. The vote does not
// necessarily come from a malicious peer; it may simply have been
// broadcast by a typical peer.
// https://github.com/tendermint/tendermint/issues/1281
} }
// NOTE: the vote is broadcast to peers by the reactor listening // NOTE: the vote is broadcast to peers by the reactor listening

View File

@ -1,6 +1,7 @@
package types package types
import ( import (
"errors"
"fmt" "fmt"
"strings" "strings"
"sync" "sync"
@ -15,6 +16,10 @@ type RoundVoteSet struct {
Precommits *types.VoteSet Precommits *types.VoteSet
} }
var (
// GotVoteFromUnwantedRoundError is the sentinel error HeightVoteSet.AddVote
// sets when it refuses a peer's vote for a round it is not tracking and the
// peer is not granted another catchup round.
// NOTE(review): Go convention would name this ErrGotVoteFromUnwantedRound
// and use a lowercase message; both are left as-is because callers compare
// against this identifier and the text is emitted at runtime.
GotVoteFromUnwantedRoundError = errors.New("Peer has sent a vote that does not match our round for more than one round")
)
/* /*
Keeps track of all VoteSets from round 0 to round 'round'. Keeps track of all VoteSets from round 0 to round 'round'.
@ -117,10 +122,8 @@ func (hvs *HeightVoteSet) AddVote(vote *types.Vote, peerID p2p.ID) (added bool,
voteSet = hvs.getVoteSet(vote.Round, vote.Type) voteSet = hvs.getVoteSet(vote.Round, vote.Type)
hvs.peerCatchupRounds[peerID] = append(rndz, vote.Round) hvs.peerCatchupRounds[peerID] = append(rndz, vote.Round)
} else { } else {
// Peer has sent a vote that does not match our round, // punish peer
// for more than one round. Bad peer! err = GotVoteFromUnwantedRoundError
// TODO punish peer.
// log.Warn("Deal with peer giving votes from unwanted rounds")
return return
} }
} }

View File

@ -34,8 +34,8 @@ func TestPeerCatchupRounds(t *testing.T) {
vote1001_0 := makeVoteHR(t, 1, 1001, privVals, 0) vote1001_0 := makeVoteHR(t, 1, 1001, privVals, 0)
added, err = hvs.AddVote(vote1001_0, "peer1") added, err = hvs.AddVote(vote1001_0, "peer1")
if err != nil { if err != GotVoteFromUnwantedRoundError {
t.Error("AddVote error", err) t.Errorf("Expected GotVoteFromUnwantedRoundError, but got %v", err)
} }
if added { if added {
t.Error("Expected to *not* add vote from peer, too many catchup rounds.") t.Error("Expected to *not* add vote from peer, too many catchup rounds.")

View File

@ -84,7 +84,8 @@ func (evR *EvidenceReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
func (evR *EvidenceReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { func (evR *EvidenceReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
_, msg, err := DecodeMessage(msgBytes) _, msg, err := DecodeMessage(msgBytes)
if err != nil { if err != nil {
evR.Logger.Error("Error decoding message", "err", err) evR.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes)
evR.Switch.StopPeerForError(src, err)
return return
} }
evR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg) evR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg)
@ -95,7 +96,8 @@ func (evR *EvidenceReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
err := evR.evpool.AddEvidence(ev) err := evR.evpool.AddEvidence(ev)
if err != nil { if err != nil {
evR.Logger.Info("Evidence is not valid", "evidence", msg.Evidence, "err", err) evR.Logger.Info("Evidence is not valid", "evidence", msg.Evidence, "err", err)
// TODO: punish peer // punish peer
evR.Switch.StopPeerForError(src, err)
} }
} }
default: default:

View File

@ -73,7 +73,8 @@ func (memR *MempoolReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
func (memR *MempoolReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { func (memR *MempoolReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
_, msg, err := DecodeMessage(msgBytes) _, msg, err := DecodeMessage(msgBytes)
if err != nil { if err != nil {
memR.Logger.Error("Error decoding message", "err", err) memR.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes)
memR.Switch.StopPeerForError(src, err)
return return
} }
memR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg) memR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg)

View File

@ -287,6 +287,8 @@ func NewNode(config *cfg.Config,
sw.AddReactor("PEX", pexReactor) sw.AddReactor("PEX", pexReactor)
} }
sw.SetAddrBook(addrBook)
// Filter peers by addr or pubkey with an ABCI query. // Filter peers by addr or pubkey with an ABCI query.
// If the query return code is OK, add peer. // If the query return code is OK, add peer.
// XXX: Query format subject to change // XXX: Query format subject to change

View File

@ -358,7 +358,7 @@ func createMConnection(conn net.Conn, p *peer, reactorsByCh map[byte]Reactor, ch
onReceive := func(chID byte, msgBytes []byte) { onReceive := func(chID byte, msgBytes []byte) {
reactor := reactorsByCh[chID] reactor := reactorsByCh[chID]
if reactor == nil { if reactor == nil {
cmn.PanicSanity(cmn.Fmt("Unknown channel %X", chID)) onPeerError(p, fmt.Errorf("Unknown channel %X", chID))
} }
reactor.Receive(chID, p, msgBytes) reactor.Receive(chID, p, msgBytes)
} }

View File

@ -167,7 +167,8 @@ func (r *PEXReactor) RemovePeer(p Peer, reason interface{}) {
func (r *PEXReactor) Receive(chID byte, src Peer, msgBytes []byte) { func (r *PEXReactor) Receive(chID byte, src Peer, msgBytes []byte) {
_, msg, err := DecodeMessage(msgBytes) _, msg, err := DecodeMessage(msgBytes)
if err != nil { if err != nil {
r.Logger.Error("Error decoding message", "err", err) r.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes)
r.Switch.StopPeerForError(src, err)
return return
} }
r.Logger.Debug("Received message", "src", src, "chId", chID, "msg", msg) r.Logger.Debug("Received message", "src", src, "chId", chID, "msg", msg)

View File

@ -35,6 +35,7 @@ const (
type AddrBook interface { type AddrBook interface {
AddAddress(addr *NetAddress, src *NetAddress) error AddAddress(addr *NetAddress, src *NetAddress) error
MarkGood(*NetAddress)
Save() Save()
} }
@ -57,6 +58,7 @@ type Switch struct {
dialing *cmn.CMap dialing *cmn.CMap
nodeInfo NodeInfo // our node info nodeInfo NodeInfo // our node info
nodeKey *NodeKey // our node privkey nodeKey *NodeKey // our node privkey
addrBook AddrBook
filterConnByAddr func(net.Addr) error filterConnByAddr func(net.Addr) error
filterConnByID func(ID) error filterConnByID func(ID) error
@ -317,6 +319,19 @@ func (sw *Switch) reconnectToPeer(peer Peer) {
sw.Logger.Error("Failed to reconnect to peer. Giving up", "peer", peer, "elapsed", time.Since(start)) sw.Logger.Error("Failed to reconnect to peer. Giving up", "peer", peer, "elapsed", time.Since(start))
} }
// SetAddrBook sets the address book used by the Switch. MarkPeerAsGood
// reports good peers to this book; until one is set, that call does nothing.
func (sw *Switch) SetAddrBook(addrBook AddrBook) {
	sw.addrBook = addrBook
}
// MarkPeerAsGood flags the given peer's network address as good in the
// Switch's address book. Callers use it once a peer has done something
// useful, such as contributing to consensus. It is a no-op when no address
// book has been set.
func (sw *Switch) MarkPeerAsGood(peer Peer) {
	if sw.addrBook == nil {
		return
	}
	addr := peer.NodeInfo().NetAddress()
	sw.addrBook.MarkGood(addr)
}
//--------------------------------------------------------------------- //---------------------------------------------------------------------
// Dialing // Dialing

View File

@ -6,7 +6,7 @@ import (
"fmt" "fmt"
"time" "time"
"github.com/tendermint/go-crypto" crypto "github.com/tendermint/go-crypto"
"github.com/tendermint/tendermint/wire" "github.com/tendermint/tendermint/wire"
cmn "github.com/tendermint/tmlibs/common" cmn "github.com/tendermint/tmlibs/common"
) )