Merge pull request #765 from tendermint/762-blockchain-reactor-timeout

blockchain reactor timeout
This commit is contained in:
Ethan Buchman 2017-10-24 09:13:26 -04:00 committed by GitHub
commit 38fc351532
4 changed files with 46 additions and 28 deletions

View File

@ -11,11 +11,25 @@ import (
"github.com/tendermint/tmlibs/log" "github.com/tendermint/tmlibs/log"
) )
/*
eg, L = latency = 0.1s
P = num peers = 10
FN = num full nodes
BS = 1kB block size
CB = 1 Mbit/s = 128 kB/s
CB/P = 12.8 kB
B/S = CB/P/BS = 12.8 blocks/s
12.8 * 0.1 = 1.28 blocks on conn
*/
const ( const (
requestIntervalMS = 250 requestIntervalMS = 250
maxTotalRequesters = 300 maxTotalRequesters = 300
maxPendingRequests = maxTotalRequesters maxPendingRequests = maxTotalRequesters
maxPendingRequestsPerPeer = 75 maxPendingRequestsPerPeer = 10
minRecvRate = 10240 // 10Kb/s minRecvRate = 10240 // 10Kb/s
) )
@ -69,9 +83,7 @@ func (pool *BlockPool) OnStart() error {
return nil return nil
} }
func (pool *BlockPool) OnStop() { func (pool *BlockPool) OnStop() {}
pool.BaseService.OnStop()
}
// Run spawns requesters as needed. // Run spawns requesters as needed.
func (pool *BlockPool) makeRequestersRoutine() { func (pool *BlockPool) makeRequestersRoutine() {
@ -188,15 +200,16 @@ func (pool *BlockPool) PopRequest() {
// Remove the peer and redo request from others. // Remove the peer and redo request from others.
func (pool *BlockPool) RedoRequest(height int) { func (pool *BlockPool) RedoRequest(height int) {
pool.mtx.Lock() pool.mtx.Lock()
defer pool.mtx.Unlock()
request := pool.requesters[height] request := pool.requesters[height]
pool.mtx.Unlock()
if request.block == nil { if request.block == nil {
cmn.PanicSanity("Expected block to be non-nil") cmn.PanicSanity("Expected block to be non-nil")
} }
// RemovePeer will redo all requesters associated with this peer. // RemovePeer will redo all requesters associated with this peer.
// TODO: record this malfeasance // TODO: record this malfeasance
pool.RemovePeer(request.peerID) pool.removePeer(request.peerID)
} }
// TODO: ensure that blocks come in order for each peer. // TODO: ensure that blocks come in order for each peer.
@ -206,13 +219,17 @@ func (pool *BlockPool) AddBlock(peerID string, block *types.Block, blockSize int
requester := pool.requesters[block.Height] requester := pool.requesters[block.Height]
if requester == nil { if requester == nil {
// a block we didn't expect.
// TODO:if height is too far ahead, punish peer
return return
} }
if requester.setBlock(block, peerID) { if requester.setBlock(block, peerID) {
pool.numPending-- pool.numPending--
peer := pool.peers[peerID] peer := pool.peers[peerID]
peer.decrPending(blockSize) if peer != nil {
peer.decrPending(blockSize)
}
} else { } else {
// Bad peer? // Bad peer?
} }

View File

@ -12,6 +12,7 @@ import (
sm "github.com/tendermint/tendermint/state" sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
cmn "github.com/tendermint/tmlibs/common" cmn "github.com/tendermint/tmlibs/common"
"github.com/tendermint/tmlibs/log"
) )
const ( const (
@ -79,7 +80,13 @@ func NewBlockchainReactor(state *sm.State, proxyAppConn proxy.AppConnConsensus,
return bcR return bcR
} }
// OnStart implements BaseService // SetLogger implements cmn.Service by setting the logger on reactor and pool.
func (bcR *BlockchainReactor) SetLogger(l log.Logger) {
bcR.BaseService.Logger = l
bcR.pool.Logger = l
}
// OnStart implements cmn.Service.
func (bcR *BlockchainReactor) OnStart() error { func (bcR *BlockchainReactor) OnStart() error {
bcR.BaseReactor.OnStart() bcR.BaseReactor.OnStart()
if bcR.fastSync { if bcR.fastSync {
@ -92,7 +99,7 @@ func (bcR *BlockchainReactor) OnStart() error {
return nil return nil
} }
// OnStop implements BaseService // OnStop implements cmn.Service.
func (bcR *BlockchainReactor) OnStop() { func (bcR *BlockchainReactor) OnStop() {
bcR.BaseReactor.OnStop() bcR.BaseReactor.OnStop()
bcR.pool.Stop() bcR.pool.Stop()
@ -214,9 +221,9 @@ FOR_LOOP:
// ask for status updates // ask for status updates
go bcR.BroadcastStatusRequest() go bcR.BroadcastStatusRequest()
case <-switchToConsensusTicker.C: case <-switchToConsensusTicker.C:
height, numPending, _ := bcR.pool.GetStatus() height, numPending, lenRequesters := bcR.pool.GetStatus()
outbound, inbound, _ := bcR.Switch.NumPeers() outbound, inbound, _ := bcR.Switch.NumPeers()
bcR.Logger.Info("Consensus ticker", "numPending", numPending, "total", len(bcR.pool.requesters), bcR.Logger.Info("Consensus ticker", "numPending", numPending, "total", lenRequesters,
"outbound", outbound, "inbound", inbound) "outbound", outbound, "inbound", inbound)
if bcR.pool.IsCaughtUp() { if bcR.pool.IsCaughtUp() {
bcR.Logger.Info("Time to switch to consensus reactor!", "height", height) bcR.Logger.Info("Time to switch to consensus reactor!", "height", height)

View File

@ -1,12 +1,11 @@
package blockchain package blockchain
import ( import (
"bytes"
"testing" "testing"
wire "github.com/tendermint/go-wire" wire "github.com/tendermint/go-wire"
cmn "github.com/tendermint/tmlibs/common" cmn "github.com/tendermint/tmlibs/common"
"github.com/tendermint/tmlibs/db" dbm "github.com/tendermint/tmlibs/db"
"github.com/tendermint/tmlibs/log" "github.com/tendermint/tmlibs/log"
cfg "github.com/tendermint/tendermint/config" cfg "github.com/tendermint/tendermint/config"
@ -15,28 +14,24 @@ import (
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
func newBlockchainReactor(logger log.Logger, maxBlockHeight int) *BlockchainReactor { func newBlockchainReactor(maxBlockHeight int) *BlockchainReactor {
config := cfg.ResetTestRoot("node_node_test") logger := log.TestingLogger()
config := cfg.ResetTestRoot("blockchain_reactor_test")
blockStoreDB := db.NewDB("blockstore", config.DBBackend, config.DBDir()) blockStore := NewBlockStore(dbm.NewMemDB())
blockStore := NewBlockStore(blockStoreDB)
stateLogger := logger.With("module", "state")
// Get State // Get State
stateDB := db.NewDB("state", config.DBBackend, config.DBDir()) state, _ := sm.GetState(dbm.NewMemDB(), config.GenesisFile())
state, _ := sm.GetState(stateDB, config.GenesisFile()) state.SetLogger(logger.With("module", "state"))
state.SetLogger(stateLogger)
state.Save() state.Save()
// Make the blockchainReactor itself // Make the blockchainReactor itself
fastSync := true fastSync := true
bcReactor := NewBlockchainReactor(state.Copy(), nil, blockStore, fastSync) bcReactor := NewBlockchainReactor(state.Copy(), nil, blockStore, fastSync)
bcReactor.SetLogger(logger.With("module", "blockchain"))
// Next: we need to set a switch in order for peers to be added in // Next: we need to set a switch in order for peers to be added in
bcReactor.Switch = p2p.NewSwitch(cfg.DefaultP2PConfig()) bcReactor.Switch = p2p.NewSwitch(cfg.DefaultP2PConfig())
bcReactor.SetLogger(logger.With("module", "blockchain"))
// Lastly: let's add some blocks in // Lastly: let's add some blocks in
for blockHeight := 1; blockHeight <= maxBlockHeight; blockHeight++ { for blockHeight := 1; blockHeight <= maxBlockHeight; blockHeight++ {
@ -50,12 +45,10 @@ func newBlockchainReactor(logger log.Logger, maxBlockHeight int) *BlockchainReac
} }
func TestNoBlockMessageResponse(t *testing.T) { func TestNoBlockMessageResponse(t *testing.T) {
logBuf := new(bytes.Buffer)
logger := log.NewTMLogger(logBuf)
maxBlockHeight := 20 maxBlockHeight := 20
bcr := newBlockchainReactor(logger, maxBlockHeight) bcr := newBlockchainReactor(maxBlockHeight)
go bcr.OnStart() bcr.Start()
defer bcr.Stop() defer bcr.Stop()
// Add some peers in // Add some peers in

View File

@ -139,6 +139,7 @@ func (r *PEXReactor) Receive(chID byte, src Peer, msgBytes []byte) {
switch msg := msg.(type) { switch msg := msg.(type) {
case *pexRequestMessage: case *pexRequestMessage:
// src requested some peers. // src requested some peers.
// NOTE: we might send an empty selection
r.SendAddrs(src, r.book.GetSelection()) r.SendAddrs(src, r.book.GetSelection())
case *pexAddrsMessage: case *pexAddrsMessage:
// We received some peer addresses from src. // We received some peer addresses from src.