updating docs and cleanup

This commit is contained in:
Anca Zamfir 2019-04-30 21:46:27 -04:00
parent 8ef2f6b5f2
commit 02c34994fc
5 changed files with 30 additions and 45 deletions

View File

@ -83,7 +83,7 @@ type BlockchainReactor struct {
// This channel is used by the FSM and indirectly the block pool to report errors to the blockchain reactor and // This channel is used by the FSM and indirectly the block pool to report errors to the blockchain reactor and
// the switch. // the switch.
errorsFromFSMCh chan peerError errorsFromFSMCh chan bReactorMessageData
} }
type BlockRequest struct { type BlockRequest struct {
@ -107,7 +107,7 @@ func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *Bl
} }
const capacity = 1000 const capacity = 1000
errorsFromFSMCh := make(chan peerError, capacity) errorsFromFSMCh := make(chan bReactorMessageData, capacity)
messagesForFSMCh := make(chan bReactorMessageData, capacity) messagesForFSMCh := make(chan bReactorMessageData, capacity)
errorsForFSMCh := make(chan bReactorMessageData, capacity) errorsForFSMCh := make(chan bReactorMessageData, capacity)
@ -198,11 +198,6 @@ func (bcR *BlockchainReactor) sendMessageToFSMAsync(msg bReactorMessageData) {
bcR.messagesForFSMCh <- msg bcR.messagesForFSMCh <- msg
} }
func (bcR *BlockchainReactor) sendErrorToFSMAsync(msg bReactorMessageData) {
bcR.Logger.Debug("send message to FSM for processing", "msg", msg.String())
bcR.errorsForFSMCh <- msg
}
// RemovePeer implements Reactor by removing peer from the pool. // RemovePeer implements Reactor by removing peer from the pool.
func (bcR *BlockchainReactor) RemovePeer(peer p2p.Peer, reason interface{}) { func (bcR *BlockchainReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
msgData := bReactorMessageData{ msgData := bReactorMessageData{
@ -212,7 +207,7 @@ func (bcR *BlockchainReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
err: errSwitchRemovesPeer, err: errSwitchRemovesPeer,
}, },
} }
bcR.sendErrorToFSMAsync(msgData) bcR.errorsForFSMCh <- msgData
} }
// Receive implements Reactor by handling 4 types of messages (look below). // Receive implements Reactor by handling 4 types of messages (look below).
@ -302,26 +297,10 @@ ForLoop:
// - the number of blocks received and waiting to be processed, // - the number of blocks received and waiting to be processed,
// - the number of blockResponse messages waiting in messagesForFSMCh, etc. // - the number of blockResponse messages waiting in messagesForFSMCh, etc.
// Currently maxNumPendingRequests value is not changed. // Currently maxNumPendingRequests value is not changed.
msgData := bReactorMessageData{ _ = bcR.fsm.handle(&bReactorMessageData{
event: makeRequestsEv, event: makeRequestsEv,
data: bReactorEventData{ data: bReactorEventData{
maxNumRequests: maxNumPendingRequests, maxNumRequests: maxNumPendingRequests}})
},
}
_ = sendMessageToFSMSync(bcR.fsm, msgData)
case msg := <-bcR.errorsFromFSMCh:
bcR.reportPeerErrorToSwitch(msg.err, msg.peerID)
if msg.err == errNoPeerResponse {
msgData := bReactorMessageData{
event: peerRemoveEv,
data: bReactorEventData{
peerId: msg.peerID,
err: msg.err,
},
}
_ = sendMessageToFSMSync(bcR.fsm, msgData)
}
case <-statusUpdateTicker.C: case <-statusUpdateTicker.C:
// Ask for status updates. // Ask for status updates.
@ -351,13 +330,12 @@ ForLoop:
continue ForLoop continue ForLoop
} }
// Notify FSM of block processing result. // Notify FSM of block processing result.
msgData := bReactorMessageData{ _ = bcR.fsm.handle(&bReactorMessageData{
event: processedBlockEv, event: processedBlockEv,
data: bReactorEventData{ data: bReactorEventData{
err: err, err: err,
}, },
} })
_ = sendMessageToFSMSync(bcR.fsm, msgData)
if err == errBlockVerificationFailure { if err == errBlockVerificationFailure {
continue ForLoop continue ForLoop
@ -374,10 +352,16 @@ ForLoop:
} }
case msg := <-bcR.messagesForFSMCh: case msg := <-bcR.messagesForFSMCh:
_ = sendMessageToFSMSync(bcR.fsm, msg) _ = bcR.fsm.handle(&msg)
case msg := <-bcR.errorsForFSMCh: case msg := <-bcR.errorsForFSMCh:
_ = sendMessageToFSMSync(bcR.fsm, msg) _ = bcR.fsm.handle(&msg)
case msg := <-bcR.errorsFromFSMCh:
bcR.reportPeerErrorToSwitch(msg.data.err, msg.data.peerId)
if msg.data.err == errNoPeerResponse {
_ = bcR.fsm.handle(&msg)
}
case <-bcR.Quit(): case <-bcR.Quit():
break ForLoop break ForLoop
@ -395,10 +379,17 @@ func (bcR *BlockchainReactor) reportPeerErrorToSwitch(err error, peerID p2p.ID)
// Called by FSM and pool: // Called by FSM and pool:
// - pool calls when it detects slow peer or when peer times out // - pool calls when it detects slow peer or when peer times out
// - FSM calls when: // - FSM calls when:
// - processing a block (addBlock) fails // - adding a block (addBlock) fails
// - BCR processing of a block reports failure to FSM, FSM sends back the peers of first and second blocks // - reactor processing of a block reports failure and FSM sends back the peers of first and second blocks
func (bcR *BlockchainReactor) sendPeerError(err error, peerID p2p.ID) { func (bcR *BlockchainReactor) sendPeerError(err error, peerID p2p.ID) {
bcR.errorsFromFSMCh <- peerError{err, peerID} msgData := bReactorMessageData{
event: peerRemoveEv,
data: bReactorEventData{
peerId: peerID,
err: err,
},
}
bcR.errorsFromFSMCh <- msgData
} }
func (bcR *BlockchainReactor) processBlocksFromPoolRoutine() error { func (bcR *BlockchainReactor) processBlocksFromPoolRoutine() error {

View File

@ -285,6 +285,9 @@ func init() {
// We haven't received the block at current height. Remove peer. // We haven't received the block at current height. Remove peer.
fsm.pool.removePeerAtCurrentHeight(errNoPeerResponse) fsm.pool.removePeerAtCurrentHeight(errNoPeerResponse)
fsm.resetStateTimer() fsm.resetStateTimer()
if len(fsm.pool.peers) == 0 {
return waitForPeer, errNoPeerResponse
}
return waitForBlock, errNoPeerResponse return waitForBlock, errNoPeerResponse
case stopFSMEv: case stopFSMEv:
@ -325,23 +328,14 @@ func (fsm *bReactorFSM) setLogger(l log.Logger) {
fsm.pool.setLogger(l) fsm.pool.setLogger(l)
} }
func sendMessageToFSMSync(fsm *bReactorFSM, msg bReactorMessageData) error {
err := fsm.handle(&msg)
return err
}
// Starts the FSM goroutine. // Starts the FSM goroutine.
func (fsm *bReactorFSM) start() { func (fsm *bReactorFSM) start() {
_ = sendMessageToFSMSync(fsm, bReactorMessageData{ _ = fsm.handle(&bReactorMessageData{event: startFSMEv})
event: startFSMEv,
})
} }
// Stops the FSM goroutine. // Stops the FSM goroutine.
func (fsm *bReactorFSM) stop() { func (fsm *bReactorFSM) stop() {
_ = sendMessageToFSMSync(fsm, bReactorMessageData{ _ = fsm.handle(&bReactorMessageData{event: stopFSMEv})
event: stopFSMEv,
})
} }
// handle processes messages and events sent to the FSM. // handle processes messages and events sent to the FSM.

Binary file not shown.

After

Width:  |  Height:  |  Size: 73 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 63 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 158 KiB