Move block execution in separate goroutine

This commit is contained in:
Anca Zamfir 2019-05-20 23:20:18 -04:00
parent 60799a4a2c
commit 0bfc3724f7
No known key found for this signature in database
GPG Key ID: 61B09A558E4B6A7D
3 changed files with 109 additions and 71 deletions

View File

@ -67,10 +67,6 @@ func (pool *blockPool) setLogger(l log.Logger) {
pool.logger = l pool.logger = l
} }
func (pool blockPool) getMaxPeerHeight() int64 {
return pool.maxPeerHeight
}
func (pool *blockPool) reachedMaxHeight() bool { func (pool *blockPool) reachedMaxHeight() bool {
return pool.height >= pool.maxPeerHeight return pool.height >= pool.maxPeerHeight
} }

View File

@ -37,7 +37,7 @@ var (
// have not been received. // have not been received.
maxRequestsPerPeer int32 = 20 maxRequestsPerPeer int32 = 20
// Maximum number of block requests for the reactor, pending or for which blocks have been received. // Maximum number of block requests for the reactor, pending or for which blocks have been received.
maxNumRequests int32 = 500 maxNumRequests int32 = 64
) )
type consensusReactor interface { type consensusReactor interface {
@ -78,6 +78,37 @@ type BlockRequest struct {
PeerID p2p.ID PeerID p2p.ID
} }
// NewBlockchainReactor returns new reactor instance.
func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *BlockStore,
fastSync bool) *BlockchainReactor {
if state.LastBlockHeight != store.Height() {
panic(fmt.Sprintf("state (%v) and store (%v) height mismatch", state.LastBlockHeight,
store.Height()))
}
const capacity = 1000
eventsFromFSMCh := make(chan bcFsmMessage, capacity)
messagesForFSMCh := make(chan bcReactorMessage, capacity)
errorsForFSMCh := make(chan bcReactorMessage, capacity)
startHeight := store.Height() + 1
bcR := &BlockchainReactor{
initialState: state,
state: state,
blockExec: blockExec,
fastSync: fastSync,
store: store,
messagesForFSMCh: messagesForFSMCh,
eventsFromFSMCh: eventsFromFSMCh,
errorsForFSMCh: errorsForFSMCh,
}
fsm := NewFSM(startHeight, bcR)
bcR.fsm = fsm
bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR)
return bcR
}
// bcReactorMessage is used by the reactor to send messages to the FSM. // bcReactorMessage is used by the reactor to send messages to the FSM.
type bcReactorMessage struct { type bcReactorMessage struct {
event bReactorEvent event bReactorEvent
@ -103,36 +134,6 @@ type bcFsmMessage struct {
data bFsmEventData data bFsmEventData
} }
// NewBlockchainReactor returns new reactor instance.
func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *BlockStore,
fastSync bool) *BlockchainReactor {
if state.LastBlockHeight != store.Height() {
panic(fmt.Sprintf("state (%v) and store (%v) height mismatch", state.LastBlockHeight,
store.Height()))
}
const capacity = 1000
eventsFromFSMCh := make(chan bcFsmMessage, capacity)
messagesForFSMCh := make(chan bcReactorMessage, capacity)
errorsForFSMCh := make(chan bcReactorMessage, capacity)
bcR := &BlockchainReactor{
initialState: state,
state: state,
blockExec: blockExec,
fastSync: fastSync,
store: store,
messagesForFSMCh: messagesForFSMCh,
eventsFromFSMCh: eventsFromFSMCh,
errorsForFSMCh: errorsForFSMCh,
}
fsm := NewFSM(store.Height()+1, bcR)
bcR.fsm = fsm
bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR)
return bcR
}
// SetLogger implements cmn.Service by setting the logger on reactor and pool. // SetLogger implements cmn.Service by setting the logger on reactor and pool.
func (bcR *BlockchainReactor) SetLogger(l log.Logger) { func (bcR *BlockchainReactor) SetLogger(l log.Logger) {
bcR.BaseService.Logger = l bcR.BaseService.Logger = l
@ -283,11 +284,60 @@ func (bcR *BlockchainReactor) poolRoutine() {
lastHundred := time.Now() lastHundred := time.Now()
lastRate := 0.0 lastRate := 0.0
stopProcessing := make(chan struct{}, 1)
go func(stopProcessing chan struct{}) {
ForLoop:
for {
select {
case <-stopProcessing:
bcR.Logger.Info("finishing block execution")
break ForLoop
case <-processReceivedBlockTicker.C: // try to execute blocks
select {
case doProcessBlockCh <- struct{}{}:
default:
}
case <-doProcessBlockCh:
for {
err := bcR.processBlock()
if err == errMissingBlocks {
break
}
// Notify FSM of block processing result.
msgForFSM := bcReactorMessage{
event: processedBlockEv,
data: bReactorEventData{
err: err,
},
}
_ = bcR.fsm.handle(&msgForFSM)
if err != nil {
break
}
bcR.blocksSynced++
if bcR.blocksSynced%100 == 0 {
lastRate = 0.9*lastRate + 0.1*(100/time.Since(lastHundred).Seconds())
height, maxPeerHeight := bcR.fsm.getStatus()
bcR.Logger.Info("Fast Sync Rate", "height", height,
"max_peer_height", maxPeerHeight, "blocks/s", lastRate)
lastHundred = time.Now()
}
}
}
}
}(stopProcessing)
ForLoop: ForLoop:
for { for {
select { select {
case <-sendBlockRequestTicker.C: case <-sendBlockRequestTicker.C:
if !bcR.fsm.needsBlocks() {
continue
}
_ = bcR.fsm.handle(&bcReactorMessage{ _ = bcR.fsm.handle(&bcReactorMessage{
event: makeRequestsEv, event: makeRequestsEv,
data: bReactorEventData{ data: bReactorEventData{
@ -297,39 +347,6 @@ ForLoop:
// Ask for status updates. // Ask for status updates.
go bcR.sendStatusRequest() go bcR.sendStatusRequest()
case <-processReceivedBlockTicker.C: // chan time
select {
case doProcessBlockCh <- struct{}{}:
default:
}
case <-doProcessBlockCh:
err := bcR.processBlock()
if err == errMissingBlocks {
continue
}
// Notify FSM of block processing result.
_ = bcR.fsm.handle(&bcReactorMessage{
event: processedBlockEv,
data: bReactorEventData{
err: err,
},
})
if err == errBlockVerificationFailure {
continue
}
doProcessBlockCh <- struct{}{}
bcR.blocksSynced++
if bcR.blocksSynced%100 == 0 {
lastRate = 0.9*lastRate + 0.1*(100/time.Since(lastHundred).Seconds())
bcR.Logger.Info("Fast Sync Rate", "height", bcR.fsm.pool.height,
"max_peer_height", bcR.fsm.pool.getMaxPeerHeight(), "blocks/s", lastRate)
lastHundred = time.Now()
}
case msg := <-bcR.messagesForFSMCh: case msg := <-bcR.messagesForFSMCh:
// Sent from the Receive() routine when status (statusResponseEv) and // Sent from the Receive() routine when status (statusResponseEv) and
// block (blockResponseEv) response events are received // block (blockResponseEv) response events are received
@ -343,6 +360,7 @@ ForLoop:
case msg := <-bcR.eventsFromFSMCh: case msg := <-bcR.eventsFromFSMCh:
switch msg.event { switch msg.event {
case syncFinishedEv: case syncFinishedEv:
stopProcessing <- struct{}{}
// Sent from the FSM when it enters finished state. // Sent from the FSM when it enters finished state.
break ForLoop break ForLoop
case peerErrorEv: case peerErrorEv:
@ -380,7 +398,7 @@ func (bcR *BlockchainReactor) reportPeerErrorToSwitch(err error, peerID p2p.ID)
func (bcR *BlockchainReactor) processBlock() error { func (bcR *BlockchainReactor) processBlock() error {
firstBP, secondBP, err := bcR.fsm.pool.getNextTwoBlocks() firstBP, secondBP, err := bcR.fsm.getNextTwoBlocks()
if err != nil { if err != nil {
// We need both to sync the first block. // We need both to sync the first block.
return err return err
@ -457,6 +475,7 @@ func (bcR *BlockchainReactor) switchToConsensus() {
// - adding a block (addBlock) fails // - adding a block (addBlock) fails
// - reactor processing of a block reports failure and FSM sends back the peers of first and second blocks // - reactor processing of a block reports failure and FSM sends back the peers of first and second blocks
func (bcR *BlockchainReactor) sendPeerError(err error, peerID p2p.ID) { func (bcR *BlockchainReactor) sendPeerError(err error, peerID p2p.ID) {
bcR.Logger.Info("sendPeerError:", "peer", peerID, "error", err)
msgData := bcFsmMessage{ msgData := bcFsmMessage{
event: peerErrorEv, event: peerErrorEv,
data: bFsmEventData{ data: bFsmEventData{

View File

@ -3,6 +3,7 @@ package blockchain
import ( import (
"errors" "errors"
"fmt" "fmt"
"sync"
"time" "time"
"github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/log"
@ -31,6 +32,8 @@ func (s *bReactorFSMState) String() string {
// Blockchain Reactor State Machine // Blockchain Reactor State Machine
type bReactorFSM struct { type bReactorFSM struct {
logger log.Logger logger log.Logger
mtx sync.Mutex
startTime time.Time startTime time.Time
state *bReactorFSMState state *bReactorFSMState
@ -325,7 +328,7 @@ func init() {
finished = &bReactorFSMState{ finished = &bReactorFSMState{
name: "finished", name: "finished",
enter: func(fsm *bReactorFSM) { enter: func(fsm *bReactorFSM) {
fsm.logger.Error("Time to switch to consensus reactor!", "height", fsm.pool.height) fsm.logger.Info("Time to switch to consensus reactor!", "height", fsm.pool.height)
fsm.toBcR.switchToConsensus() fsm.toBcR.switchToConsensus()
fsm.cleanup() fsm.cleanup()
}, },
@ -358,6 +361,8 @@ func (fsm *bReactorFSM) start() {
// handle processes messages and events sent to the FSM. // handle processes messages and events sent to the FSM.
func (fsm *bReactorFSM) handle(msg *bcReactorMessage) error { func (fsm *bReactorFSM) handle(msg *bcReactorMessage) error {
fsm.mtx.Lock()
defer fsm.mtx.Unlock()
fsm.logger.Debug("FSM received", "event", msg, "state", fsm.state) fsm.logger.Debug("FSM received", "event", msg, "state", fsm.state)
if fsm.state == nil { if fsm.state == nil {
@ -405,3 +410,21 @@ func (fsm *bReactorFSM) makeNextRequests(maxNumRequests int32) {
func (fsm *bReactorFSM) cleanup() { func (fsm *bReactorFSM) cleanup() {
fsm.pool.cleanup() fsm.pool.cleanup()
} }
func (fsm *bReactorFSM) needsBlocks() bool {
fsm.mtx.Lock()
defer fsm.mtx.Unlock()
return fsm.state.name == "waitForBlock"
}
func (fsm *bReactorFSM) getNextTwoBlocks() (first, second *blockData, err error) {
fsm.mtx.Lock()
defer fsm.mtx.Unlock()
return fsm.pool.getNextTwoBlocks()
}
func (fsm *bReactorFSM) getStatus() (height, maxPeerHeight int64) {
fsm.mtx.Lock()
defer fsm.mtx.Unlock()
return fsm.pool.height, fsm.pool.maxPeerHeight
}