[blockchain] replace timeoutsCh with more abstract errorsCh

This commit is contained in:
Anton Kaliaev
2018-02-26 17:35:01 +04:00
parent 87ce804b4a
commit 0c7e871ef0
3 changed files with 47 additions and 36 deletions

View File

@ -1,6 +1,7 @@
package blockchain package blockchain
import ( import (
"errors"
"fmt" "fmt"
"math" "math"
"sync" "sync"
@ -41,7 +42,7 @@ const (
minRecvRate = 7680 minRecvRate = 7680
) )
var peerTimeoutSeconds = time.Duration(15) // not const so we can override with tests var peerTimeout = 15 * time.Second // not const so we can override with tests
/* /*
Peers self report their heights when we join the block pool. Peers self report their heights when we join the block pool.
@ -68,10 +69,10 @@ type BlockPool struct {
maxPeerHeight int64 maxPeerHeight int64
requestsCh chan<- BlockRequest requestsCh chan<- BlockRequest
timeoutsCh chan<- p2p.ID errorsCh chan<- peerError
} }
func NewBlockPool(start int64, requestsCh chan<- BlockRequest, timeoutsCh chan<- p2p.ID) *BlockPool { func NewBlockPool(start int64, requestsCh chan<- BlockRequest, errorsCh chan<- peerError) *BlockPool {
bp := &BlockPool{ bp := &BlockPool{
peers: make(map[p2p.ID]*bpPeer), peers: make(map[p2p.ID]*bpPeer),
@ -80,7 +81,7 @@ func NewBlockPool(start int64, requestsCh chan<- BlockRequest, timeoutsCh chan<-
numPending: 0, numPending: 0,
requestsCh: requestsCh, requestsCh: requestsCh,
timeoutsCh: timeoutsCh, errorsCh: errorsCh,
} }
bp.BaseService = *cmn.NewBaseService(nil, "BlockPool", bp) bp.BaseService = *cmn.NewBaseService(nil, "BlockPool", bp)
return bp return bp
@ -128,9 +129,10 @@ func (pool *BlockPool) removeTimedoutPeers() {
curRate := peer.recvMonitor.Status().CurRate curRate := peer.recvMonitor.Status().CurRate
// curRate can be 0 on start // curRate can be 0 on start
if curRate != 0 && curRate < minRecvRate { if curRate != 0 && curRate < minRecvRate {
pool.sendTimeout(peer.id) err := errors.New("peer is not sending us data fast enough")
pool.sendError(err, peer.id)
pool.Logger.Error("SendTimeout", "peer", peer.id, pool.Logger.Error("SendTimeout", "peer", peer.id,
"reason", "peer is not sending us data fast enough", "reason", err,
"curRate", fmt.Sprintf("%d KB/s", curRate/1024), "curRate", fmt.Sprintf("%d KB/s", curRate/1024),
"minRate", fmt.Sprintf("%d KB/s", minRecvRate/1024)) "minRate", fmt.Sprintf("%d KB/s", minRecvRate/1024))
peer.didTimeout = true peer.didTimeout = true
@ -340,11 +342,11 @@ func (pool *BlockPool) sendRequest(height int64, peerID p2p.ID) {
pool.requestsCh <- BlockRequest{height, peerID} pool.requestsCh <- BlockRequest{height, peerID}
} }
func (pool *BlockPool) sendTimeout(peerID p2p.ID) { func (pool *BlockPool) sendError(err error, peerID p2p.ID) {
if !pool.IsRunning() { if !pool.IsRunning() {
return return
} }
pool.timeoutsCh <- peerID pool.errorsCh <- peerError{err, peerID}
} }
// unused by tendermint; left for debugging purposes // unused by tendermint; left for debugging purposes
@ -403,9 +405,9 @@ func (peer *bpPeer) resetMonitor() {
func (peer *bpPeer) resetTimeout() { func (peer *bpPeer) resetTimeout() {
if peer.timeout == nil { if peer.timeout == nil {
peer.timeout = time.AfterFunc(time.Second*peerTimeoutSeconds, peer.onTimeout) peer.timeout = time.AfterFunc(peerTimeout, peer.onTimeout)
} else { } else {
peer.timeout.Reset(time.Second * peerTimeoutSeconds) peer.timeout.Reset(peerTimeout)
} }
} }
@ -431,8 +433,9 @@ func (peer *bpPeer) onTimeout() {
peer.pool.mtx.Lock() peer.pool.mtx.Lock()
defer peer.pool.mtx.Unlock() defer peer.pool.mtx.Unlock()
peer.pool.sendTimeout(peer.id) err := errors.New("peer did not send us anything")
peer.logger.Error("SendTimeout", "reason", "onTimeout") peer.pool.sendError(err, peer.id)
peer.logger.Error("SendTimeout", "reason", err, "timeout", peerTimeout)
peer.didTimeout = true peer.didTimeout = true
} }

View File

@ -13,7 +13,7 @@ import (
) )
func init() { func init() {
peerTimeoutSeconds = time.Duration(2) peerTimeout = 2 * time.Second
} }
type testPeer struct { type testPeer struct {
@ -34,9 +34,9 @@ func makePeers(numPeers int, minHeight, maxHeight int64) map[p2p.ID]testPeer {
func TestBasic(t *testing.T) { func TestBasic(t *testing.T) {
start := int64(42) start := int64(42)
peers := makePeers(10, start+1, 1000) peers := makePeers(10, start+1, 1000)
timeoutsCh := make(chan p2p.ID, 100) errorsCh := make(chan peerError)
requestsCh := make(chan BlockRequest, 100) requestsCh := make(chan BlockRequest)
pool := NewBlockPool(start, requestsCh, timeoutsCh) pool := NewBlockPool(start, requestsCh, errorsCh)
pool.SetLogger(log.TestingLogger()) pool.SetLogger(log.TestingLogger())
err := pool.Start() err := pool.Start()
@ -71,8 +71,8 @@ func TestBasic(t *testing.T) {
// Pull from channels // Pull from channels
for { for {
select { select {
case peerID := <-timeoutsCh: case err := <-errorsCh:
t.Errorf("timeout: %v", peerID) t.Error(err)
case request := <-requestsCh: case request := <-requestsCh:
t.Logf("Pulled new BlockRequest %v", request) t.Logf("Pulled new BlockRequest %v", request)
if request.Height == 300 { if request.Height == 300 {
@ -91,9 +91,9 @@ func TestBasic(t *testing.T) {
func TestTimeout(t *testing.T) { func TestTimeout(t *testing.T) {
start := int64(42) start := int64(42)
peers := makePeers(10, start+1, 1000) peers := makePeers(10, start+1, 1000)
timeoutsCh := make(chan p2p.ID, 100) errorsCh := make(chan peerError)
requestsCh := make(chan BlockRequest, 100) requestsCh := make(chan BlockRequest)
pool := NewBlockPool(start, requestsCh, timeoutsCh) pool := NewBlockPool(start, requestsCh, errorsCh)
pool.SetLogger(log.TestingLogger()) pool.SetLogger(log.TestingLogger())
err := pool.Start() err := pool.Start()
if err != nil { if err != nil {
@ -132,9 +132,10 @@ func TestTimeout(t *testing.T) {
timedOut := map[p2p.ID]struct{}{} timedOut := map[p2p.ID]struct{}{}
for { for {
select { select {
case peerID := <-timeoutsCh: case err := <-errorsCh:
t.Logf("Peer %v timeouted", peerID) t.Log(err)
if _, ok := timedOut[peerID]; !ok { // consider error to be always timeout here
if _, ok := timedOut[err.peerID]; !ok {
counter++ counter++
if counter == len(peers) { if counter == len(peers) {
return // Done! return // Done!

View File

@ -22,7 +22,6 @@ const (
// BlockchainChannel is a channel for blocks and status updates (`BlockStore` height) // BlockchainChannel is a channel for blocks and status updates (`BlockStore` height)
BlockchainChannel = byte(0x40) BlockchainChannel = byte(0x40)
defaultChannelCapacity = 1000
trySyncIntervalMS = 50 trySyncIntervalMS = 50
// stop syncing when last block's time is // stop syncing when last block's time is
// within this much of the system time. // within this much of the system time.
@ -40,6 +39,15 @@ type consensusReactor interface {
SwitchToConsensus(sm.State, int) SwitchToConsensus(sm.State, int)
} }
type peerError struct {
err error
peerID p2p.ID
}
func (e peerError) Error() string {
return fmt.Sprintf("error with peer %v: %s", e.peerID, e.err.Error())
}
// BlockchainReactor handles long-term catchup syncing. // BlockchainReactor handles long-term catchup syncing.
type BlockchainReactor struct { type BlockchainReactor struct {
p2p.BaseReactor p2p.BaseReactor
@ -56,7 +64,7 @@ type BlockchainReactor struct {
fastSync bool fastSync bool
requestsCh <-chan BlockRequest requestsCh <-chan BlockRequest
timeoutsCh <-chan p2p.ID errorsCh <-chan peerError
} }
// NewBlockchainReactor returns new reactor instance. // NewBlockchainReactor returns new reactor instance.
@ -68,12 +76,12 @@ func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *Bl
store.Height())) store.Height()))
} }
requestsCh := make(chan BlockRequest, defaultChannelCapacity) requestsCh := make(chan BlockRequest)
timeoutsCh := make(chan p2p.ID, defaultChannelCapacity) errorsCh := make(chan peerError)
pool := NewBlockPool( pool := NewBlockPool(
store.Height()+1, store.Height()+1,
requestsCh, requestsCh,
timeoutsCh, errorsCh,
) )
bcR := &BlockchainReactor{ bcR := &BlockchainReactor{
params: state.ConsensusParams, params: state.ConsensusParams,
@ -83,7 +91,7 @@ func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *Bl
pool: pool, pool: pool,
fastSync: fastSync, fastSync: fastSync,
requestsCh: requestsCh, requestsCh: requestsCh,
timeoutsCh: timeoutsCh, errorsCh: errorsCh,
} }
bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR) bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR)
return bcR return bcR
@ -230,7 +238,7 @@ func (bcR *BlockchainReactor) poolRoutine() {
FOR_LOOP: FOR_LOOP:
for { for {
select { select {
case request := <-bcR.requestsCh: // chan BlockRequest case request := <-bcR.requestsCh:
peer := bcR.Switch.Peers().Get(request.PeerID) peer := bcR.Switch.Peers().Get(request.PeerID)
if peer == nil { if peer == nil {
continue FOR_LOOP // Peer has since been disconnected. continue FOR_LOOP // Peer has since been disconnected.
@ -242,11 +250,10 @@ FOR_LOOP:
// The pool handles timeouts, just let it go. // The pool handles timeouts, just let it go.
continue FOR_LOOP continue FOR_LOOP
} }
case peerID := <-bcR.timeoutsCh: // chan string case err := <-bcR.errorsCh:
// Peer timed out. peer := bcR.Switch.Peers().Get(err.peerID)
peer := bcR.Switch.Peers().Get(peerID)
if peer != nil { if peer != nil {
bcR.Switch.StopPeerForError(peer, errors.New("BlockchainReactor Timeout")) bcR.Switch.StopPeerForError(peer, err)
} }
case <-statusUpdateTicker.C: case <-statusUpdateTicker.C:
// ask for status updates // ask for status updates