From 10ff7bd3425aeb361e8828e6dd161ba007b4119f Mon Sep 17 00:00:00 2001 From: Sean Braithwaite Date: Fri, 19 Jul 2019 02:17:49 +0300 Subject: [PATCH] Peer pruning --- blockchain/v2/scheduler.go | 34 ++++++++++++++++++++++++++-------- 1 file changed, 26 insertions(+), 8 deletions(-) diff --git a/blockchain/v2/scheduler.go b/blockchain/v2/scheduler.go index c1d89372..27280747 100644 --- a/blockchain/v2/scheduler.go +++ b/blockchain/v2/scheduler.go @@ -380,9 +380,16 @@ func (sc *schedule) selectPeer(peers []*scPeer) *scPeer { return peers[r.Intn(len(peers))] } -func (sc *schedule) prunablePeers(time time.Time, minSpeed int) []p2p.ID { - // TODO - return []p2p.ID{} +// XXX: this duplicates the logic of peersInactiveSince and peersSlowerThan +func (sc *schedule) prunablePeers(peerTimout time.Duration, minRecvRate int64, now time.Time) []p2p.ID { + prunable := []p2p.ID{} + for peerID, peer := range sc.peers { + if now.Sub(peer.lastTouched) > peerTimout || peer.lastRate < minRecvRate { + prunable = append(prunable, peerID) + } + } + + return prunable } func (sc *schedule) numBlockInState(targetState blockState) uint32 { @@ -399,12 +406,16 @@ type Scheduler struct { sc *schedule targetPending uint32 // the number of blocks we want in blockStatePending targetReceived uint32 // the number of blocks we want in blockStateReceived - + minRecvRate int64 + peerTimeout time.Duration } -func NewScheduler(minHeight int64, targetPending uint32, targetReceived uint32) *Scheduler { +func NewScheduler(minHeight int64, minRecvRate int64, peerTimeout time.Duration, + targetPending uint32, targetReceived uint32) *Scheduler { return &Scheduler{ sc: newSchedule(minHeight), + minRecvRate: minRecvRate, + peerTimeout: peerTimeout, targetPending: targetPending, targetReceived: targetReceived, } @@ -499,9 +510,16 @@ func (sdr *Scheduler) handleBlockProcessError(peerID p2p.ID, height int64) Event return Skip{} } -func (sdr *Scheduler) handleTimeCheck(peerID p2p.ID, now time.Time) interface{} { - // prune peers - // TODO +// XXX: Probably split this into two methods +// + handlePurgePeers +// + handleNextSchehedule +func (sdr *Scheduler) handleTimeCheck(now time.Time) interface{} { + prunablePeers := sdr.sc.prunablePeers(sdr.peerTimeout, sdr.minRecvRate, now) + + for _, peerID := range prunablePeers { + // XXX: Emit these events + _ = sdr.sc.removePeer(peerID) + } // produce new schedule events := []scBlockRequestMessage{}