diff --git a/blockchain/reactor.go b/blockchain/reactor.go index 3c2b1b2e..bc9bf3ff 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -83,7 +83,7 @@ type BlockchainReactor struct { // This channel is used by the FSM and indirectly the block pool to report errors to the blockchain reactor and // the switch. - errorsFromFSMCh chan peerError + errorsFromFSMCh chan bReactorMessageData } type BlockRequest struct { @@ -107,7 +107,7 @@ func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *Bl } const capacity = 1000 - errorsFromFSMCh := make(chan peerError, capacity) + errorsFromFSMCh := make(chan bReactorMessageData, capacity) messagesForFSMCh := make(chan bReactorMessageData, capacity) errorsForFSMCh := make(chan bReactorMessageData, capacity) @@ -198,11 +198,6 @@ func (bcR *BlockchainReactor) sendMessageToFSMAsync(msg bReactorMessageData) { bcR.messagesForFSMCh <- msg } -func (bcR *BlockchainReactor) sendErrorToFSMAsync(msg bReactorMessageData) { - bcR.Logger.Debug("send message to FSM for processing", "msg", msg.String()) - bcR.errorsForFSMCh <- msg -} - // RemovePeer implements Reactor by removing peer from the pool. func (bcR *BlockchainReactor) RemovePeer(peer p2p.Peer, reason interface{}) { msgData := bReactorMessageData{ @@ -212,7 +207,7 @@ func (bcR *BlockchainReactor) RemovePeer(peer p2p.Peer, reason interface{}) { err: errSwitchRemovesPeer, }, } - bcR.sendErrorToFSMAsync(msgData) + bcR.errorsForFSMCh <- msgData } // Receive implements Reactor by handling 4 types of messages (look below). @@ -302,26 +297,10 @@ ForLoop: // - the number of blocks received and waiting to be processed, // - the number of blockResponse messages waiting in messagesForFSMCh, etc. // Currently maxNumPendingRequests value is not changed. - msgData := bReactorMessageData{ + _ = bcR.fsm.handle(&bReactorMessageData{ event: makeRequestsEv, data: bReactorEventData{ - maxNumRequests: maxNumPendingRequests, - }, - } - _ = sendMessageToFSMSync(bcR.fsm, msgData) - - case msg := <-bcR.errorsFromFSMCh: - bcR.reportPeerErrorToSwitch(msg.err, msg.peerID) - if msg.err == errNoPeerResponse { - msgData := bReactorMessageData{ - event: peerRemoveEv, - data: bReactorEventData{ - peerId: msg.peerID, - err: msg.err, - }, - } - _ = sendMessageToFSMSync(bcR.fsm, msgData) - } + maxNumRequests: maxNumPendingRequests}}) case <-statusUpdateTicker.C: // Ask for status updates. @@ -351,13 +330,12 @@ ForLoop: continue ForLoop } // Notify FSM of block processing result. - msgData := bReactorMessageData{ + _ = bcR.fsm.handle(&bReactorMessageData{ event: processedBlockEv, data: bReactorEventData{ err: err, }, - } - _ = sendMessageToFSMSync(bcR.fsm, msgData) + }) if err == errBlockVerificationFailure { continue ForLoop @@ -374,10 +352,16 @@ ForLoop: } case msg := <-bcR.messagesForFSMCh: - _ = sendMessageToFSMSync(bcR.fsm, msg) + _ = bcR.fsm.handle(&msg) case msg := <-bcR.errorsForFSMCh: - _ = sendMessageToFSMSync(bcR.fsm, msg) + _ = bcR.fsm.handle(&msg) + + case msg := <-bcR.errorsFromFSMCh: + bcR.reportPeerErrorToSwitch(msg.data.err, msg.data.peerId) + if msg.data.err == errNoPeerResponse { + _ = bcR.fsm.handle(&msg) + } case <-bcR.Quit(): break ForLoop @@ -395,10 +379,17 @@ func (bcR *BlockchainReactor) reportPeerErrorToSwitch(err error, peerID p2p.ID) // Called by FSM and pool: // - pool calls when it detects slow peer or when peer times out // - FSM calls when: -// - processing a block (addBlock) fails -// - BCR processing of a block reports failure to FSM, FSM sends back the peers of first and second blocks +// - adding a block (addBlock) fails +// - reactor processing of a block reports failure and FSM sends back the peers of first and second blocks func (bcR *BlockchainReactor) sendPeerError(err error, peerID p2p.ID) { - bcR.errorsFromFSMCh <- peerError{err, peerID} + msgData := bReactorMessageData{ + event: peerRemoveEv, + data: bReactorEventData{ + peerId: peerID, + err: err, + }, + } + bcR.errorsFromFSMCh <- msgData } func (bcR *BlockchainReactor) processBlocksFromPoolRoutine() error { diff --git a/blockchain/reactor_fsm.go b/blockchain/reactor_fsm.go index a713aca3..32013939 100644 --- a/blockchain/reactor_fsm.go +++ b/blockchain/reactor_fsm.go @@ -285,6 +285,9 @@ func init() { // We haven't received the block at current height. Remove peer. fsm.pool.removePeerAtCurrentHeight(errNoPeerResponse) fsm.resetStateTimer() + if len(fsm.pool.peers) == 0 { + return waitForPeer, errNoPeerResponse + } return waitForBlock, errNoPeerResponse case stopFSMEv: @@ -325,23 +328,14 @@ func (fsm *bReactorFSM) setLogger(l log.Logger) { fsm.pool.setLogger(l) } -func sendMessageToFSMSync(fsm *bReactorFSM, msg bReactorMessageData) error { - err := fsm.handle(&msg) - return err -} - // Starts the FSM goroutine. func (fsm *bReactorFSM) start() { - _ = sendMessageToFSMSync(fsm, bReactorMessageData{ - event: startFSMEv, - }) + _ = fsm.handle(&bReactorMessageData{event: startFSMEv}) } // Stops the FSM goroutine. func (fsm *bReactorFSM) stop() { - _ = sendMessageToFSMSync(fsm, bReactorMessageData{ - event: stopFSMEv, - }) + _ = fsm.handle(&bReactorMessageData{event: stopFSMEv}) } // handle processes messages and events sent to the FSM. diff --git a/docs/spec/reactors/block_sync/img/bc-reactor-new-datastructs.png b/docs/spec/reactors/block_sync/img/bc-reactor-new-datastructs.png new file mode 100644 index 00000000..dd93e5ab Binary files /dev/null and b/docs/spec/reactors/block_sync/img/bc-reactor-new-datastructs.png differ diff --git a/docs/spec/reactors/block_sync/img/bc-reactor-new-fsm.png b/docs/spec/reactors/block_sync/img/bc-reactor-new-fsm.png new file mode 100644 index 00000000..6eaa5428 Binary files /dev/null and b/docs/spec/reactors/block_sync/img/bc-reactor-new-fsm.png differ diff --git a/docs/spec/reactors/block_sync/img/bc-reactor-new-goroutines.png b/docs/spec/reactors/block_sync/img/bc-reactor-new-goroutines.png new file mode 100644 index 00000000..4201f634 Binary files /dev/null and b/docs/spec/reactors/block_sync/img/bc-reactor-new-goroutines.png differ