130 lines
3.1 KiB
Go
Raw Normal View History

2019-04-13 09:23:43 -04:00
package blockchain
2019-03-26 09:58:30 +01:00
import (
"fmt"
"math"
"time"
flow "github.com/tendermint/tendermint/libs/flowrate"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/types"
2019-03-26 09:58:30 +01:00
)
//--------
// Peer
var (
	// peerTimeout is how long to wait for a peer before its timeout timer
	// fires. It is a var rather than a const so tests can override it.
	peerTimeout = 15 * time.Second

	// minRecvRate is the minimum receive rate required to ensure we are
	// receiving blocks from a peer fast enough. If a peer is not sending data
	// at least at that rate, we consider it to have timed out and disconnect.
	//
	// Assuming a DSL connection (not a good choice) 128 Kbps (upload) ~ 15 KB/s,
	// sending data across the Atlantic ~ 7.5 KB/s (7680 = 7.5 * 1024).
	minRecvRate = int64(7680)

	// Monitor parameters: sampling interval and averaging window handed to
	// the flow-rate monitor.
	peerSampleRate = time.Second
	peerWindowSize = 40 * peerSampleRate
)
type bpPeer struct {
2019-05-07 21:06:43 -04:00
logger log.Logger
id p2p.ID
height int64 // the peer reported height
numPending int32 // number of requests still waiting for block responses
blocks map[int64]*types.Block // blocks received or expected to be received from this peer
timeout *time.Timer
2019-03-26 09:58:30 +01:00
recvMonitor *flow.Monitor
errFunc func(err error, peerID p2p.ID) // function to call on error
}
func newBPPeer(
peerID p2p.ID, height int64, errFunc func(err error, peerID p2p.ID)) *bpPeer {
peer := &bpPeer{
id: peerID,
height: height,
blocks: make(map[int64]*types.Block, maxRequestsPerPeer),
2019-03-26 09:58:30 +01:00
numPending: 0,
logger: log.NewNopLogger(),
errFunc: errFunc,
}
return peer
}
// setLogger replaces the logger this peer writes to.
func (peer *bpPeer) setLogger(l log.Logger) {
	peer.logger = l
}
// resetMonitor discards any previous receive-rate monitor and installs a new
// one built from peerSampleRate and peerWindowSize. The new monitor's REMA is
// seeded with minRecvRate * e, i.e. a value above the minimum rate.
func (peer *bpPeer) resetMonitor() {
	monitor := flow.New(peerSampleRate, peerWindowSize)
	monitor.SetREMA(float64(minRecvRate) * math.E)
	peer.recvMonitor = monitor
}
// resetTimeout (re)arms the peer's timeout for peerTimeout from now. An
// existing timer is reset; otherwise a new one is created that calls
// onTimeout when it fires.
func (peer *bpPeer) resetTimeout() {
	if peer.timeout != nil {
		peer.timeout.Reset(peerTimeout)
		return
	}
	peer.timeout = time.AfterFunc(peerTimeout, peer.onTimeout)
}
// incrPending records one more outstanding block request. When the peer had
// no pending requests, the receive monitor and the timeout are restarted
// before the counter is bumped.
func (peer *bpPeer) incrPending() {
	wasIdle := peer.numPending == 0
	if wasIdle {
		peer.resetMonitor()
		peer.resetTimeout()
	}
	peer.numPending++
}
// decrPending records that one outstanding request has been answered with
// recvSize bytes. It panics if the peer has no pending requests.
//
// If requests are still outstanding afterwards, recvSize is fed to the
// receive monitor and the timeout is re-armed; once the count reaches zero
// the timeout is simply stopped.
func (peer *bpPeer) decrPending(recvSize int) {
	if peer.numPending == 0 {
		panic("cannot decrement, peer does not have pending requests")
	}

	peer.numPending--
	if peer.numPending != 0 {
		peer.recvMonitor.Update(recvSize)
		peer.resetTimeout()
		return
	}
	peer.timeout.Stop()
}
// onTimeout is the timer callback installed by resetTimeout; it reports
// errNoPeerResponse for this peer through errFunc.
func (peer *bpPeer) onTimeout() {
	peer.errFunc(errNoPeerResponse, peer.id)
}
2019-03-26 14:51:37 +01:00
func (peer *bpPeer) isGood() error {
2019-03-26 09:58:30 +01:00
2019-05-15 09:28:50 -04:00
if peer.numPending > 0 {
2019-03-26 09:58:30 +01:00
curRate := peer.recvMonitor.Status().CurRate
// curRate can be 0 on start
if curRate != 0 && curRate < minRecvRate {
err := errSlowPeer
peer.logger.Error("SendTimeout", "peer", peer.id,
"reason", err,
"curRate", fmt.Sprintf("%d KB/s", curRate/1024),
"minRate", fmt.Sprintf("%d KB/s", minRecvRate/1024))
// consider the peer timedout
return errSlowPeer
}
}
return nil
}
// cleanup stops the peer's timeout timer if one was ever created.
func (peer *bpPeer) cleanup() {
	if timer := peer.timeout; timer != nil {
		timer.Stop()
	}
}
2019-03-27 19:54:08 +01:00
// String renders the peer's id, reported height and pending-request count.
func (peer *bpPeer) String() string {
	const layout = "peer: %v height: %v pending: %v"
	return fmt.Sprintf(layout, peer.id, peer.height, peer.numPending)
}