diff --git a/blockchain/pool.go b/blockchain/pool.go index cfff67fd..42587313 100644 --- a/blockchain/pool.go +++ b/blockchain/pool.go @@ -67,10 +67,6 @@ func (pool *blockPool) setLogger(l log.Logger) { pool.logger = l } -func (pool blockPool) getMaxPeerHeight() int64 { - return pool.maxPeerHeight -} - func (pool *blockPool) reachedMaxHeight() bool { return pool.height >= pool.maxPeerHeight } diff --git a/blockchain/reactor.go b/blockchain/reactor.go index 219027d2..77c98419 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -37,7 +37,7 @@ var ( // have not been received. maxRequestsPerPeer int32 = 20 // Maximum number of block requests for the reactor, pending or for which blocks have been received. - maxNumRequests int32 = 500 + maxNumRequests int32 = 64 ) type consensusReactor interface { @@ -78,6 +78,37 @@ type BlockRequest struct { PeerID p2p.ID } +// NewBlockchainReactor returns new reactor instance. +func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *BlockStore, + fastSync bool) *BlockchainReactor { + + if state.LastBlockHeight != store.Height() { + panic(fmt.Sprintf("state (%v) and store (%v) height mismatch", state.LastBlockHeight, + store.Height())) + } + + const capacity = 1000 + eventsFromFSMCh := make(chan bcFsmMessage, capacity) + messagesForFSMCh := make(chan bcReactorMessage, capacity) + errorsForFSMCh := make(chan bcReactorMessage, capacity) + + startHeight := store.Height() + 1 + bcR := &BlockchainReactor{ + initialState: state, + state: state, + blockExec: blockExec, + fastSync: fastSync, + store: store, + messagesForFSMCh: messagesForFSMCh, + eventsFromFSMCh: eventsFromFSMCh, + errorsForFSMCh: errorsForFSMCh, + } + fsm := NewFSM(startHeight, bcR) + bcR.fsm = fsm + bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR) + return bcR +} + // bcReactorMessage is used by the reactor to send messages to the FSM. type bcReactorMessage struct { event bReactorEvent @@ -103,36 +134,6 @@ type bcFsmMessage struct { data bFsmEventData } -// NewBlockchainReactor returns new reactor instance. -func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *BlockStore, - fastSync bool) *BlockchainReactor { - - if state.LastBlockHeight != store.Height() { - panic(fmt.Sprintf("state (%v) and store (%v) height mismatch", state.LastBlockHeight, - store.Height())) - } - - const capacity = 1000 - eventsFromFSMCh := make(chan bcFsmMessage, capacity) - messagesForFSMCh := make(chan bcReactorMessage, capacity) - errorsForFSMCh := make(chan bcReactorMessage, capacity) - - bcR := &BlockchainReactor{ - initialState: state, - state: state, - blockExec: blockExec, - fastSync: fastSync, - store: store, - messagesForFSMCh: messagesForFSMCh, - eventsFromFSMCh: eventsFromFSMCh, - errorsForFSMCh: errorsForFSMCh, - } - fsm := NewFSM(store.Height()+1, bcR) - bcR.fsm = fsm - bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR) - return bcR -} - // SetLogger implements cmn.Service by setting the logger on reactor and pool. func (bcR *BlockchainReactor) SetLogger(l log.Logger) { bcR.BaseService.Logger = l @@ -283,11 +284,60 @@ func (bcR *BlockchainReactor) poolRoutine() { lastHundred := time.Now() lastRate := 0.0 + stopProcessing := make(chan struct{}, 1) + + go func(stopProcessing chan struct{}) { + ForLoop: + for { + select { + case <-stopProcessing: + bcR.Logger.Info("finishing block execution") + break ForLoop + case <-processReceivedBlockTicker.C: // try to execute blocks + select { + case doProcessBlockCh <- struct{}{}: + default: + } + case <-doProcessBlockCh: + for { + err := bcR.processBlock() + if err == errMissingBlocks { + break + } + // Notify FSM of block processing result. + msgForFSM := bcReactorMessage{ + event: processedBlockEv, + data: bReactorEventData{ + err: err, + }, + } + _ = bcR.fsm.handle(&msgForFSM) + + if err != nil { + break + } + + bcR.blocksSynced++ + if bcR.blocksSynced%100 == 0 { + lastRate = 0.9*lastRate + 0.1*(100/time.Since(lastHundred).Seconds()) + height, maxPeerHeight := bcR.fsm.getStatus() + bcR.Logger.Info("Fast Sync Rate", "height", height, + "max_peer_height", maxPeerHeight, "blocks/s", lastRate) + lastHundred = time.Now() + } + } + } + } + }(stopProcessing) + ForLoop: for { select { case <-sendBlockRequestTicker.C: + if !bcR.fsm.needsBlocks() { + continue + } _ = bcR.fsm.handle(&bcReactorMessage{ event: makeRequestsEv, data: bReactorEventData{ @@ -297,39 +347,6 @@ ForLoop: // Ask for status updates. go bcR.sendStatusRequest() - case <-processReceivedBlockTicker.C: // chan time - select { - case doProcessBlockCh <- struct{}{}: - default: - } - - case <-doProcessBlockCh: - err := bcR.processBlock() - if err == errMissingBlocks { - continue - } - // Notify FSM of block processing result. - _ = bcR.fsm.handle(&bcReactorMessage{ - event: processedBlockEv, - data: bReactorEventData{ - err: err, - }, - }) - - if err == errBlockVerificationFailure { - continue - } - - doProcessBlockCh <- struct{}{} - - bcR.blocksSynced++ - if bcR.blocksSynced%100 == 0 { - lastRate = 0.9*lastRate + 0.1*(100/time.Since(lastHundred).Seconds()) - bcR.Logger.Info("Fast Sync Rate", "height", bcR.fsm.pool.height, - "max_peer_height", bcR.fsm.pool.getMaxPeerHeight(), "blocks/s", lastRate) - lastHundred = time.Now() - } - case msg := <-bcR.messagesForFSMCh: // Sent from the Receive() routine when status (statusResponseEv) and // block (blockResponseEv) response events are received @@ -343,6 +360,7 @@ ForLoop: case msg := <-bcR.eventsFromFSMCh: switch msg.event { case syncFinishedEv: + stopProcessing <- struct{}{} // Sent from the FSM when it enters finished state. break ForLoop case peerErrorEv: @@ -380,7 +398,7 @@ func (bcR *BlockchainReactor) reportPeerErrorToSwitch(err error, peerID p2p.ID) func (bcR *BlockchainReactor) processBlock() error { - firstBP, secondBP, err := bcR.fsm.pool.getNextTwoBlocks() + firstBP, secondBP, err := bcR.fsm.getNextTwoBlocks() if err != nil { // We need both to sync the first block. return err @@ -457,6 +475,7 @@ func (bcR *BlockchainReactor) switchToConsensus() { // - adding a block (addBlock) fails // - reactor processing of a block reports failure and FSM sends back the peers of first and second blocks func (bcR *BlockchainReactor) sendPeerError(err error, peerID p2p.ID) { + bcR.Logger.Info("sendPeerError:", "peer", peerID, "error", err) msgData := bcFsmMessage{ event: peerErrorEv, data: bFsmEventData{ diff --git a/blockchain/reactor_fsm.go b/blockchain/reactor_fsm.go index 0c06f085..d26fff2d 100644 --- a/blockchain/reactor_fsm.go +++ b/blockchain/reactor_fsm.go @@ -3,6 +3,7 @@ package blockchain import ( "errors" "fmt" + "sync" "time" "github.com/tendermint/tendermint/libs/log" @@ -30,7 +31,9 @@ func (s *bReactorFSMState) String() string { // Blockchain Reactor State Machine type bReactorFSM struct { - logger log.Logger + logger log.Logger + mtx sync.Mutex + startTime time.Time state *bReactorFSMState @@ -325,7 +328,7 @@ func init() { finished = &bReactorFSMState{ name: "finished", enter: func(fsm *bReactorFSM) { - fsm.logger.Error("Time to switch to consensus reactor!", "height", fsm.pool.height) + fsm.logger.Info("Time to switch to consensus reactor!", "height", fsm.pool.height) fsm.toBcR.switchToConsensus() fsm.cleanup() }, @@ -358,6 +361,8 @@ func (fsm *bReactorFSM) start() { // handle processes messages and events sent to the FSM. func (fsm *bReactorFSM) handle(msg *bcReactorMessage) error { + fsm.mtx.Lock() + defer fsm.mtx.Unlock() fsm.logger.Debug("FSM received", "event", msg, "state", fsm.state) if fsm.state == nil { @@ -405,3 +410,21 @@ func (fsm *bReactorFSM) makeNextRequests(maxNumRequests int32) { func (fsm *bReactorFSM) cleanup() { fsm.pool.cleanup() } + +func (fsm *bReactorFSM) needsBlocks() bool { + fsm.mtx.Lock() + defer fsm.mtx.Unlock() + return fsm.state.name == "waitForBlock" +} + +func (fsm *bReactorFSM) getNextTwoBlocks() (first, second *blockData, err error) { + fsm.mtx.Lock() + defer fsm.mtx.Unlock() + return fsm.pool.getNextTwoBlocks() +} + +func (fsm *bReactorFSM) getStatus() (height, maxPeerHeight int64) { + fsm.mtx.Lock() + defer fsm.mtx.Unlock() + return fsm.pool.height, fsm.pool.maxPeerHeight +}