mirror of
https://github.com/fluencelabs/tendermint
synced 2025-05-29 14:11:21 +00:00
Merge pull request #231 from tendermint/blockpool_race_fix_jae
Blockpool race fix
This commit is contained in:
commit
574d735a81
3
Makefile
3
Makefile
@ -20,6 +20,9 @@ build_race:
|
|||||||
|
|
||||||
test: build
|
test: build
|
||||||
go test `${NOVENDOR}`
|
go test `${NOVENDOR}`
|
||||||
|
|
||||||
|
test_race: build
|
||||||
|
go test -race `${NOVENDOR}`
|
||||||
|
|
||||||
test100: build
|
test100: build
|
||||||
for i in {1..100}; do make test; done
|
for i in {1..100}; do make test; done
|
||||||
|
@ -35,15 +35,13 @@ type BlockPool struct {
|
|||||||
QuitService
|
QuitService
|
||||||
startTime time.Time
|
startTime time.Time
|
||||||
|
|
||||||
|
mtx sync.Mutex
|
||||||
// block requests
|
// block requests
|
||||||
mtx sync.Mutex
|
|
||||||
requesters map[int]*bpRequester
|
requesters map[int]*bpRequester
|
||||||
height int // the lowest key in requesters.
|
height int // the lowest key in requesters.
|
||||||
numPending int32 // number of requests pending assignment or block response
|
numPending int32 // number of requests pending assignment or block response
|
||||||
|
|
||||||
// peers
|
// peers
|
||||||
peersMtx sync.Mutex
|
peers map[string]*bpPeer
|
||||||
peers map[string]*bpPeer
|
|
||||||
|
|
||||||
requestsCh chan<- BlockRequest
|
requestsCh chan<- BlockRequest
|
||||||
timeoutsCh chan<- string
|
timeoutsCh chan<- string
|
||||||
@ -81,13 +79,13 @@ func (pool *BlockPool) makeRequestersRoutine() {
|
|||||||
if !pool.IsRunning() {
|
if !pool.IsRunning() {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
_, numPending := pool.GetStatus()
|
_, numPending, lenRequesters := pool.GetStatus()
|
||||||
if numPending >= maxPendingRequests {
|
if numPending >= maxPendingRequests {
|
||||||
// sleep for a bit.
|
// sleep for a bit.
|
||||||
time.Sleep(requestIntervalMS * time.Millisecond)
|
time.Sleep(requestIntervalMS * time.Millisecond)
|
||||||
// check for timed out peers
|
// check for timed out peers
|
||||||
pool.removeTimedoutPeers()
|
pool.removeTimedoutPeers()
|
||||||
} else if len(pool.requesters) >= maxTotalRequesters {
|
} else if lenRequesters >= maxTotalRequesters {
|
||||||
// sleep for a bit.
|
// sleep for a bit.
|
||||||
time.Sleep(requestIntervalMS * time.Millisecond)
|
time.Sleep(requestIntervalMS * time.Millisecond)
|
||||||
// check for timed out peers
|
// check for timed out peers
|
||||||
@ -100,6 +98,9 @@ func (pool *BlockPool) makeRequestersRoutine() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (pool *BlockPool) removeTimedoutPeers() {
|
func (pool *BlockPool) removeTimedoutPeers() {
|
||||||
|
pool.mtx.Lock()
|
||||||
|
defer pool.mtx.Unlock()
|
||||||
|
|
||||||
for _, peer := range pool.peers {
|
for _, peer := range pool.peers {
|
||||||
if !peer.didTimeout && peer.numPending > 0 {
|
if !peer.didTimeout && peer.numPending > 0 {
|
||||||
curRate := peer.recvMonitor.Status().CurRate
|
curRate := peer.recvMonitor.Status().CurRate
|
||||||
@ -111,25 +112,24 @@ func (pool *BlockPool) removeTimedoutPeers() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if peer.didTimeout {
|
if peer.didTimeout {
|
||||||
pool.peersMtx.Lock() // Lock
|
|
||||||
pool.removePeer(peer.id)
|
pool.removePeer(peer.id)
|
||||||
pool.peersMtx.Unlock()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pool *BlockPool) GetStatus() (height int, numPending int32) {
|
func (pool *BlockPool) GetStatus() (height int, numPending int32, lenRequesters int) {
|
||||||
pool.mtx.Lock() // Lock
|
pool.mtx.Lock()
|
||||||
defer pool.mtx.Unlock()
|
defer pool.mtx.Unlock()
|
||||||
|
|
||||||
return pool.height, pool.numPending
|
return pool.height, pool.numPending, len(pool.requesters)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: relax conditions, prevent abuse.
|
// TODO: relax conditions, prevent abuse.
|
||||||
func (pool *BlockPool) IsCaughtUp() bool {
|
func (pool *BlockPool) IsCaughtUp() bool {
|
||||||
pool.mtx.Lock()
|
pool.mtx.Lock()
|
||||||
|
defer pool.mtx.Unlock()
|
||||||
|
|
||||||
height := pool.height
|
height := pool.height
|
||||||
pool.mtx.Unlock()
|
|
||||||
|
|
||||||
// Need at least 1 peer to be considered caught up.
|
// Need at least 1 peer to be considered caught up.
|
||||||
if len(pool.peers) == 0 {
|
if len(pool.peers) == 0 {
|
||||||
@ -137,12 +137,10 @@ func (pool *BlockPool) IsCaughtUp() bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
pool.peersMtx.Lock()
|
|
||||||
maxPeerHeight := 0
|
maxPeerHeight := 0
|
||||||
for _, peer := range pool.peers {
|
for _, peer := range pool.peers {
|
||||||
maxPeerHeight = MaxInt(maxPeerHeight, peer.height)
|
maxPeerHeight = MaxInt(maxPeerHeight, peer.height)
|
||||||
}
|
}
|
||||||
pool.peersMtx.Unlock()
|
|
||||||
|
|
||||||
isCaughtUp := (height > 0 || time.Now().Sub(pool.startTime) > 5*time.Second) && (maxPeerHeight == 0 || height >= maxPeerHeight)
|
isCaughtUp := (height > 0 || time.Now().Sub(pool.startTime) > 5*time.Second) && (maxPeerHeight == 0 || height >= maxPeerHeight)
|
||||||
log.Notice(Fmt("IsCaughtUp: %v", isCaughtUp), "height", height, "maxPeerHeight", maxPeerHeight)
|
log.Notice(Fmt("IsCaughtUp: %v", isCaughtUp), "height", height, "maxPeerHeight", maxPeerHeight)
|
||||||
@ -153,7 +151,7 @@ func (pool *BlockPool) IsCaughtUp() bool {
|
|||||||
// So we peek two blocks at a time.
|
// So we peek two blocks at a time.
|
||||||
// The caller will verify the commit.
|
// The caller will verify the commit.
|
||||||
func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block) {
|
func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block) {
|
||||||
pool.mtx.Lock() // Lock
|
pool.mtx.Lock()
|
||||||
defer pool.mtx.Unlock()
|
defer pool.mtx.Unlock()
|
||||||
|
|
||||||
if r := pool.requesters[pool.height]; r != nil {
|
if r := pool.requesters[pool.height]; r != nil {
|
||||||
@ -168,7 +166,7 @@ func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block)
|
|||||||
// Pop the first block at pool.height
|
// Pop the first block at pool.height
|
||||||
// It must have been validated by 'second'.Commit from PeekTwoBlocks().
|
// It must have been validated by 'second'.Commit from PeekTwoBlocks().
|
||||||
func (pool *BlockPool) PopRequest() {
|
func (pool *BlockPool) PopRequest() {
|
||||||
pool.mtx.Lock() // Lock
|
pool.mtx.Lock()
|
||||||
defer pool.mtx.Unlock()
|
defer pool.mtx.Unlock()
|
||||||
|
|
||||||
if r := pool.requesters[pool.height]; r != nil {
|
if r := pool.requesters[pool.height]; r != nil {
|
||||||
@ -188,21 +186,21 @@ func (pool *BlockPool) PopRequest() {
|
|||||||
// Invalidates the block at pool.height,
|
// Invalidates the block at pool.height,
|
||||||
// Remove the peer and redo request from others.
|
// Remove the peer and redo request from others.
|
||||||
func (pool *BlockPool) RedoRequest(height int) {
|
func (pool *BlockPool) RedoRequest(height int) {
|
||||||
pool.mtx.Lock() // Lock
|
pool.mtx.Lock()
|
||||||
defer pool.mtx.Unlock()
|
|
||||||
|
|
||||||
request := pool.requesters[height]
|
request := pool.requesters[height]
|
||||||
|
pool.mtx.Unlock()
|
||||||
|
|
||||||
if request.block == nil {
|
if request.block == nil {
|
||||||
PanicSanity("Expected block to be non-nil")
|
PanicSanity("Expected block to be non-nil")
|
||||||
}
|
}
|
||||||
// RemovePeer will redo all requesters associated with this peer.
|
// RemovePeer will redo all requesters associated with this peer.
|
||||||
// TODO: record this malfeasance
|
// TODO: record this malfeasance
|
||||||
pool.RemovePeer(request.peerID) // Lock on peersMtx.
|
pool.RemovePeer(request.peerID)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: ensure that blocks come in order for each peer.
|
// TODO: ensure that blocks come in order for each peer.
|
||||||
func (pool *BlockPool) AddBlock(peerID string, block *types.Block, blockSize int) {
|
func (pool *BlockPool) AddBlock(peerID string, block *types.Block, blockSize int) {
|
||||||
pool.mtx.Lock() // Lock
|
pool.mtx.Lock()
|
||||||
defer pool.mtx.Unlock()
|
defer pool.mtx.Unlock()
|
||||||
|
|
||||||
requester := pool.requesters[block.Height]
|
requester := pool.requesters[block.Height]
|
||||||
@ -212,7 +210,7 @@ func (pool *BlockPool) AddBlock(peerID string, block *types.Block, blockSize int
|
|||||||
|
|
||||||
if requester.setBlock(block, peerID) {
|
if requester.setBlock(block, peerID) {
|
||||||
pool.numPending--
|
pool.numPending--
|
||||||
peer := pool.getPeer(peerID)
|
peer := pool.peers[peerID]
|
||||||
peer.decrPending(blockSize)
|
peer.decrPending(blockSize)
|
||||||
} else {
|
} else {
|
||||||
// Bad peer?
|
// Bad peer?
|
||||||
@ -221,8 +219,8 @@ func (pool *BlockPool) AddBlock(peerID string, block *types.Block, blockSize int
|
|||||||
|
|
||||||
// Sets the peer's alleged blockchain height.
|
// Sets the peer's alleged blockchain height.
|
||||||
func (pool *BlockPool) SetPeerHeight(peerID string, height int) {
|
func (pool *BlockPool) SetPeerHeight(peerID string, height int) {
|
||||||
pool.peersMtx.Lock() // Lock
|
pool.mtx.Lock()
|
||||||
defer pool.peersMtx.Unlock()
|
defer pool.mtx.Unlock()
|
||||||
|
|
||||||
peer := pool.peers[peerID]
|
peer := pool.peers[peerID]
|
||||||
if peer != nil {
|
if peer != nil {
|
||||||
@ -234,8 +232,8 @@ func (pool *BlockPool) SetPeerHeight(peerID string, height int) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (pool *BlockPool) RemovePeer(peerID string) {
|
func (pool *BlockPool) RemovePeer(peerID string) {
|
||||||
pool.peersMtx.Lock() // Lock
|
pool.mtx.Lock()
|
||||||
defer pool.peersMtx.Unlock()
|
defer pool.mtx.Unlock()
|
||||||
|
|
||||||
pool.removePeer(peerID)
|
pool.removePeer(peerID)
|
||||||
}
|
}
|
||||||
@ -250,22 +248,14 @@ func (pool *BlockPool) removePeer(peerID string) {
|
|||||||
delete(pool.peers, peerID)
|
delete(pool.peers, peerID)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pool *BlockPool) getPeer(peerID string) *bpPeer {
|
|
||||||
pool.peersMtx.Lock() // Lock
|
|
||||||
defer pool.peersMtx.Unlock()
|
|
||||||
|
|
||||||
peer := pool.peers[peerID]
|
|
||||||
return peer
|
|
||||||
}
|
|
||||||
|
|
||||||
// Pick an available peer with at least the given minHeight.
|
// Pick an available peer with at least the given minHeight.
|
||||||
// If no peers are available, returns nil.
|
// If no peers are available, returns nil.
|
||||||
func (pool *BlockPool) pickIncrAvailablePeer(minHeight int) *bpPeer {
|
func (pool *BlockPool) pickIncrAvailablePeer(minHeight int) *bpPeer {
|
||||||
pool.peersMtx.Lock()
|
pool.mtx.Lock()
|
||||||
defer pool.peersMtx.Unlock()
|
defer pool.mtx.Unlock()
|
||||||
|
|
||||||
for _, peer := range pool.peers {
|
for _, peer := range pool.peers {
|
||||||
if peer.isBad() {
|
if peer.didTimeout {
|
||||||
pool.removePeer(peer.id)
|
pool.removePeer(peer.id)
|
||||||
continue
|
continue
|
||||||
} else {
|
} else {
|
||||||
@ -283,7 +273,7 @@ func (pool *BlockPool) pickIncrAvailablePeer(minHeight int) *bpPeer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (pool *BlockPool) makeNextRequester() {
|
func (pool *BlockPool) makeNextRequester() {
|
||||||
pool.mtx.Lock() // Lock
|
pool.mtx.Lock()
|
||||||
defer pool.mtx.Unlock()
|
defer pool.mtx.Unlock()
|
||||||
|
|
||||||
nextHeight := pool.height + len(pool.requesters)
|
nextHeight := pool.height + len(pool.requesters)
|
||||||
@ -330,11 +320,13 @@ func (pool *BlockPool) debug() string {
|
|||||||
type bpPeer struct {
|
type bpPeer struct {
|
||||||
pool *BlockPool
|
pool *BlockPool
|
||||||
id string
|
id string
|
||||||
height int
|
|
||||||
numPending int32
|
|
||||||
recvMonitor *flow.Monitor
|
recvMonitor *flow.Monitor
|
||||||
timeout *time.Timer
|
|
||||||
didTimeout bool
|
mtx sync.Mutex
|
||||||
|
height int
|
||||||
|
numPending int32
|
||||||
|
timeout *time.Timer
|
||||||
|
didTimeout bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func newBPPeer(pool *BlockPool, peerID string, height int) *bpPeer {
|
func newBPPeer(pool *BlockPool, peerID string, height int) *bpPeer {
|
||||||
@ -380,15 +372,14 @@ func (peer *bpPeer) decrPending(recvSize int) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (peer *bpPeer) onTimeout() {
|
func (peer *bpPeer) onTimeout() {
|
||||||
|
peer.pool.mtx.Lock()
|
||||||
|
defer peer.pool.mtx.Unlock()
|
||||||
|
|
||||||
peer.pool.sendTimeout(peer.id)
|
peer.pool.sendTimeout(peer.id)
|
||||||
log.Warn("SendTimeout", "peer", peer.id, "reason", "onTimeout")
|
log.Warn("SendTimeout", "peer", peer.id, "reason", "onTimeout")
|
||||||
peer.didTimeout = true
|
peer.didTimeout = true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (peer *bpPeer) isBad() bool {
|
|
||||||
return peer.didTimeout
|
|
||||||
}
|
|
||||||
|
|
||||||
//-------------------------------------
|
//-------------------------------------
|
||||||
|
|
||||||
type bpRequester struct {
|
type bpRequester struct {
|
||||||
|
@ -196,7 +196,7 @@ FOR_LOOP:
|
|||||||
// ask for status updates
|
// ask for status updates
|
||||||
go bcR.BroadcastStatusRequest()
|
go bcR.BroadcastStatusRequest()
|
||||||
case _ = <-switchToConsensusTicker.C:
|
case _ = <-switchToConsensusTicker.C:
|
||||||
height, numPending := bcR.pool.GetStatus()
|
height, numPending, _ := bcR.pool.GetStatus()
|
||||||
outbound, inbound, _ := bcR.Switch.NumPeers()
|
outbound, inbound, _ := bcR.Switch.NumPeers()
|
||||||
log.Info("Consensus ticker", "numPending", numPending, "total", len(bcR.pool.requesters),
|
log.Info("Consensus ticker", "numPending", numPending, "total", len(bcR.pool.requesters),
|
||||||
"outbound", outbound, "inbound", inbound)
|
"outbound", outbound, "inbound", inbound)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user