Fix blockchain pool tests

This commit is contained in:
Jae Kwon 2015-09-09 21:44:48 -07:00
parent 625f23af13
commit 3a5741f70f
3 changed files with 37 additions and 14 deletions

View File

@ -72,6 +72,13 @@ func (m *Monitor) Update(n int) int {
return n return n
} }
// Hack to set the current rEMA.
func (m *Monitor) SetREMA(rEMA float64) {
m.mu.Lock()
m.rEMA = rEMA
m.mu.Unlock()
}
// IO is a convenience method intended to wrap io.Reader and io.Writer method // IO is a convenience method intended to wrap io.Reader and io.Writer method
// execution. It calls m.Update(n) and then returns (n, err) unmodified. // execution. It calls m.Update(n) and then returns (n, err) unmodified.
func (m *Monitor) IO(n int, err error) (int, error) { func (m *Monitor) IO(n int, err error) (int, error) {

View File

@ -82,9 +82,13 @@ func (pool *BlockPool) makeRequestsRoutine() {
if numPending >= maxPendingRequests { if numPending >= maxPendingRequests {
// sleep for a bit. // sleep for a bit.
time.Sleep(requestIntervalMS * time.Millisecond) time.Sleep(requestIntervalMS * time.Millisecond)
// check for timed out peers
pool.removeTimedoutPeers()
} else if len(pool.requests) >= maxTotalRequests { } else if len(pool.requests) >= maxTotalRequests {
// sleep for a bit. // sleep for a bit.
time.Sleep(requestIntervalMS * time.Millisecond) time.Sleep(requestIntervalMS * time.Millisecond)
// check for timed out peers
pool.removeTimedoutPeers()
} else { } else {
// request for more blocks. // request for more blocks.
pool.makeNextRequest() pool.makeNextRequest()
@ -92,6 +96,22 @@ func (pool *BlockPool) makeRequestsRoutine() {
} }
} }
func (pool *BlockPool) removeTimedoutPeers() {
for _, peer := range pool.peers {
if !peer.didTimeout && peer.numPending > 0 {
curRate := peer.recvMonitor.Status().CurRate
// XXX remove curRate != 0
if curRate != 0 && curRate < minRecvRate {
pool.sendTimeout(peer.id)
peer.didTimeout = true
}
}
if peer.didTimeout {
pool.removePeer(peer.id)
}
}
}
func (pool *BlockPool) GetStatus() (height int, numPending int32) { func (pool *BlockPool) GetStatus() (height int, numPending int32) {
pool.mtx.Lock() // Lock pool.mtx.Lock() // Lock
defer pool.mtx.Unlock() defer pool.mtx.Unlock()
@ -228,6 +248,7 @@ func (pool *BlockPool) pickIncrAvailablePeer(minHeight int) *bpPeer {
if peer.isBad() { if peer.isBad() {
pool.removePeer(peer.id) pool.removePeer(peer.id)
continue continue
} else {
} }
if peer.numPending >= maxPendingRequestsPerPeer { if peer.numPending >= maxPendingRequestsPerPeer {
continue continue
@ -309,7 +330,7 @@ func newBPPeer(pool *BlockPool, peerID string, height int) *bpPeer {
func (bpp *bpPeer) resetMonitor() { func (bpp *bpPeer) resetMonitor() {
bpp.recvMonitor = flow.New(time.Second, time.Second*40) bpp.recvMonitor = flow.New(time.Second, time.Second*40)
var initialValue = float64(minRecvRate) * math.E var initialValue = float64(minRecvRate) * math.E
bpp.recvMonitor.Update(int(initialValue)) bpp.recvMonitor.SetREMA(initialValue)
} }
func (bpp *bpPeer) resetTimeout() { func (bpp *bpPeer) resetTimeout() {
@ -339,20 +360,12 @@ func (bpp *bpPeer) decrPending(recvSize int) {
} }
func (bpp *bpPeer) onTimeout() { func (bpp *bpPeer) onTimeout() {
bpp.pool.sendTimeout(bpp.id)
bpp.didTimeout = true bpp.didTimeout = true
} }
func (bpp *bpPeer) isBad() bool { func (bpp *bpPeer) isBad() bool {
if bpp.didTimeout { return bpp.didTimeout
bpp.pool.sendTimeout(bpp.id)
return true
}
if bpp.numPending == 0 {
return false
} else {
bpp.pool.sendTimeout(bpp.id)
return bpp.recvMonitor.Status().CurRate < minRecvRate
}
} }
//------------------------------------- //-------------------------------------
@ -426,7 +439,6 @@ func (bpr *bpRequester) redo() {
func (bpr *bpRequester) requestRoutine() { func (bpr *bpRequester) requestRoutine() {
OUTER_LOOP: OUTER_LOOP:
for { for {
// Pick a peer to send request to. // Pick a peer to send request to.
var peer *bpPeer = nil var peer *bpPeer = nil
PICK_PEER_LOOP: PICK_PEER_LOOP:

View File

@ -25,8 +25,8 @@ func makePeers(numPeers int, minHeight, maxHeight int) map[string]testPeer {
} }
func TestBasic(t *testing.T) { func TestBasic(t *testing.T) {
peers := makePeers(10, 0, 1000)
start := 42 start := 42
peers := makePeers(10, start+1, 1000)
timeoutsCh := make(chan string, 100) timeoutsCh := make(chan string, 100)
requestsCh := make(chan BlockRequest, 100) requestsCh := make(chan BlockRequest, 100)
pool := NewBlockPool(start, requestsCh, timeoutsCh) pool := NewBlockPool(start, requestsCh, timeoutsCh)
@ -77,13 +77,17 @@ func TestBasic(t *testing.T) {
} }
func TestTimeout(t *testing.T) { func TestTimeout(t *testing.T) {
peers := makePeers(10, 0, 1000)
start := 42 start := 42
peers := makePeers(10, start+1, 1000)
timeoutsCh := make(chan string, 100) timeoutsCh := make(chan string, 100)
requestsCh := make(chan BlockRequest, 100) requestsCh := make(chan BlockRequest, 100)
pool := NewBlockPool(start, requestsCh, timeoutsCh) pool := NewBlockPool(start, requestsCh, timeoutsCh)
pool.Start() pool.Start()
for _, peer := range peers {
log.Info("Peer", peer.id)
}
// Introduce each peer. // Introduce each peer.
go func() { go func() {
for _, peer := range peers { for _, peer := range peers {