From b25cfb0e0bae591e00c1c4a3d5a869fd16f5dee1 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Tue, 28 Jun 2016 18:02:27 -0700 Subject: [PATCH] Unify blockpool mtxs --- blockchain/pool.go | 105 +++++++++++++++++---------------------------- 1 file changed, 40 insertions(+), 65 deletions(-) diff --git a/blockchain/pool.go b/blockchain/pool.go index 3c06d669..86f67296 100644 --- a/blockchain/pool.go +++ b/blockchain/pool.go @@ -35,15 +35,13 @@ type BlockPool struct { QuitService startTime time.Time + mtx sync.Mutex // block requests - mtx sync.Mutex requesters map[int]*bpRequester height int // the lowest key in requesters. numPending int32 // number of requests pending assignment or block response - // peers - peersMtx sync.Mutex - peers map[string]*bpPeer + peers map[string]*bpPeer requestsCh chan<- BlockRequest timeoutsCh chan<- string @@ -100,8 +98,9 @@ func (pool *BlockPool) makeRequestersRoutine() { } func (pool *BlockPool) removeTimedoutPeers() { - pool.peersMtx.Lock() // Lock - defer pool.peersMtx.Unlock() + pool.mtx.Lock() + defer pool.mtx.Unlock() + for _, peer := range pool.peers { if !peer.didTimeout && peer.numPending > 0 { curRate := peer.recvMonitor.Status().CurRate @@ -119,7 +118,7 @@ func (pool *BlockPool) removeTimedoutPeers() { } func (pool *BlockPool) GetStatus() (height int, numPending int32, lenRequesters int) { - pool.mtx.Lock() // Lock + pool.mtx.Lock() defer pool.mtx.Unlock() return pool.height, pool.numPending, len(pool.requesters) @@ -127,10 +126,10 @@ func (pool *BlockPool) GetStatus() (height int, numPending int32, lenRequesters // TODO: relax conditions, prevent abuse. func (pool *BlockPool) IsCaughtUp() bool { - height, _, _ := pool.GetStatus() + pool.mtx.Lock() + defer pool.mtx.Unlock() - pool.peersMtx.Lock() - defer pool.peersMtx.Unlock() + height := pool.height // Need at least 1 peer to be considered caught up. if len(pool.peers) == 0 { @@ -152,7 +151,7 @@ func (pool *BlockPool) IsCaughtUp() bool { // So we peek two blocks at a time. // The caller will verify the commit. func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block) { - pool.mtx.Lock() // Lock + pool.mtx.Lock() defer pool.mtx.Unlock() if r := pool.requesters[pool.height]; r != nil { @@ -167,7 +166,7 @@ func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block) // Pop the first block at pool.height // It must have been validated by 'second'.Commit from PeekTwoBlocks(). func (pool *BlockPool) PopRequest() { - pool.mtx.Lock() // Lock + pool.mtx.Lock() defer pool.mtx.Unlock() if r := pool.requesters[pool.height]; r != nil { @@ -187,7 +186,7 @@ func (pool *BlockPool) PopRequest() { // Invalidates the block at pool.height, // Remove the peer and redo request from others. func (pool *BlockPool) RedoRequest(height int) { - pool.mtx.Lock() // Lock + pool.mtx.Lock() request := pool.requesters[height] pool.mtx.Unlock() @@ -196,12 +195,12 @@ func (pool *BlockPool) RedoRequest(height int) { } // RemovePeer will redo all requesters associated with this peer. // TODO: record this malfeasance - pool.RemovePeer(request.peerID) // Lock on peersMtx and mtx + pool.RemovePeer(request.peerID) } // TODO: ensure that blocks come in order for each peer. func (pool *BlockPool) AddBlock(peerID string, block *types.Block, blockSize int) { - pool.mtx.Lock() // Lock + pool.mtx.Lock() defer pool.mtx.Unlock() requester := pool.requesters[block.Height] @@ -211,11 +210,8 @@ func (pool *BlockPool) AddBlock(peerID string, block *types.Block, blockSize int if requester.setBlock(block, peerID) { pool.numPending-- - peer := pool.getPeer(peerID) - - pool.peersMtx.Lock() - peer.decrPending(blockSize, pool.onTimeout(peer)) - pool.peersMtx.Unlock() + peer := pool.peers[peerID] + peer.decrPending(blockSize) } else { // Bad peer? } @@ -223,8 +219,8 @@ func (pool *BlockPool) AddBlock(peerID string, block *types.Block, blockSize int // Sets the peer's alleged blockchain height. func (pool *BlockPool) SetPeerHeight(peerID string, height int) { - pool.peersMtx.Lock() // Lock - defer pool.peersMtx.Unlock() + pool.mtx.Lock() + defer pool.mtx.Unlock() peer := pool.peers[peerID] if peer != nil { @@ -236,18 +232,13 @@ func (pool *BlockPool) SetPeerHeight(peerID string, height int) { } func (pool *BlockPool) RemovePeer(peerID string) { - pool.peersMtx.Lock() // Lock - defer pool.peersMtx.Unlock() + pool.mtx.Lock() + defer pool.mtx.Unlock() pool.removePeer(peerID) } func (pool *BlockPool) removePeer(peerID string) { - // need to lock pool to access requesters and numPending. - // peersMtx should be locked by caller - pool.mtx.Lock() - defer pool.mtx.Unlock() - for _, requester := range pool.requesters { if requester.getPeerID() == peerID { pool.numPending++ @@ -257,22 +248,14 @@ func (pool *BlockPool) removePeer(peerID string) { delete(pool.peers, peerID) } -func (pool *BlockPool) getPeer(peerID string) *bpPeer { - pool.peersMtx.Lock() // Lock - defer pool.peersMtx.Unlock() - - peer := pool.peers[peerID] - return peer -} - // Pick an available peer with at least the given minHeight. // If no peers are available, returns nil. func (pool *BlockPool) pickIncrAvailablePeer(minHeight int) *bpPeer { - pool.peersMtx.Lock() - defer pool.peersMtx.Unlock() + pool.mtx.Lock() + defer pool.mtx.Unlock() for _, peer := range pool.peers { - if peer.isBad() { + if peer.didTimeout { pool.removePeer(peer.id) continue } else { @@ -283,14 +266,14 @@ func (pool *BlockPool) pickIncrAvailablePeer(minHeight int) *bpPeer { if peer.height < minHeight { continue } - peer.incrPending(pool.onTimeout(peer)) + peer.incrPending() return peer } return nil } func (pool *BlockPool) makeNextRequester() { - pool.mtx.Lock() // Lock + pool.mtx.Lock() defer pool.mtx.Unlock() nextHeight := pool.height + len(pool.requesters) @@ -316,14 +299,6 @@ func (pool *BlockPool) sendTimeout(peerID string) { pool.timeoutsCh <- peerID } -func (pool *BlockPool) onTimeout(peer *bpPeer) func() { - return func() { - pool.peersMtx.Lock() - defer pool.peersMtx.Unlock() - peer.onTimeout() - } -} - func (pool *BlockPool) debug() string { pool.mtx.Lock() // Lock defer pool.mtx.Unlock() @@ -345,11 +320,13 @@ func (pool *BlockPool) debug() string { type bpPeer struct { pool *BlockPool id string - height int - numPending int32 recvMonitor *flow.Monitor - timeout *time.Timer - didTimeout bool + + mtx sync.Mutex + height int + numPending int32 + timeout *time.Timer + didTimeout bool } func newBPPeer(pool *BlockPool, peerID string, height int) *bpPeer { @@ -368,43 +345,41 @@ func (peer *bpPeer) resetMonitor() { peer.recvMonitor.SetREMA(initialValue) } -// needs the closure so we can lock the peersMtx -func (peer *bpPeer) resetTimeout(callback func()) { +func (peer *bpPeer) resetTimeout() { if peer.timeout == nil { - peer.timeout = time.AfterFunc(time.Second*peerTimeoutSeconds, callback) + peer.timeout = time.AfterFunc(time.Second*peerTimeoutSeconds, peer.onTimeout) } else { peer.timeout.Reset(time.Second * peerTimeoutSeconds) } } -func (peer *bpPeer) incrPending(onTimeout func()) { +func (peer *bpPeer) incrPending() { if peer.numPending == 0 { peer.resetMonitor() - peer.resetTimeout(onTimeout) + peer.resetTimeout() } peer.numPending++ } -func (peer *bpPeer) decrPending(recvSize int, onTimeout func()) { +func (peer *bpPeer) decrPending(recvSize int) { peer.numPending-- if peer.numPending == 0 { peer.timeout.Stop() } else { peer.recvMonitor.Update(recvSize) - peer.resetTimeout(onTimeout) + peer.resetTimeout() } } func (peer *bpPeer) onTimeout() { + peer.pool.mtx.Lock() + defer peer.pool.mtx.Unlock() + peer.pool.sendTimeout(peer.id) log.Warn("SendTimeout", "peer", peer.id, "reason", "onTimeout") peer.didTimeout = true } -func (peer *bpPeer) isBad() bool { - return peer.didTimeout -} - //------------------------------------- type bpRequester struct {