diff --git a/blockchain/peer.go b/blockchain/peer.go index 5da5c61c..e9377615 100644 --- a/blockchain/peer.go +++ b/blockchain/peer.go @@ -17,7 +17,7 @@ var ( peerTimeout = 15 * time.Second // not const so we can override with tests // Minimum recv rate to ensure we're receiving blocks from a peer fast - // enough. If a peer is not sending us data at at least that rate, we + // enough. If a peer is not sending data at at least that rate, we // consider them to have timedout and we disconnect. // // Assuming a DSL connection (not a good choice) 128 Kbps (upload) ~ 15 KB/s, diff --git a/blockchain/pool.go b/blockchain/pool.go index 1c4fb88b..893543ee 100644 --- a/blockchain/pool.go +++ b/blockchain/pool.go @@ -83,23 +83,26 @@ func (pool *blockPool) rescheduleRequest(peerID p2p.ID, height int64) { // Updates the pool's max height. If no peers are left maxPeerHeight is set to 0. func (pool *blockPool) updateMaxPeerHeight() { - var max int64 + var newMax int64 for _, peer := range pool.peers { - if peer.height > max { - max = peer.height + if peer.height > newMax { + newMax = peer.height } } - if max < pool.maxPeerHeight { - // Remove any planned requests for heights over the new maxPeerHeight + if newMax < pool.maxPeerHeight { + // Remove any planned requests for heights over the new maxPeerHeight. + // This may happen if a peer has updated with lower height. for h := range pool.requests { - if h > max { + if h > newMax { delete(pool.requests, h) } } } - - pool.maxPeerHeight = max + if pool.nextRequestHeight > newMax { + pool.nextRequestHeight = newMax + 1 + } + pool.maxPeerHeight = newMax } // Adds a new peer or updates an existing peer with a new height. 
diff --git a/blockchain/reactor.go b/blockchain/reactor.go index 247aba9c..9622e7e9 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -240,7 +240,7 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) } case *bcBlockResponseMessage: - msgData := bcReactorMessage{ + msgForFSM := bcReactorMessage{ event: blockResponseEv, data: bReactorEventData{ peerId: src.ID(), @@ -249,11 +249,11 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) length: len(msgBytes), }, } - bcR.sendMessageToFSM(msgData) + bcR.messagesForFSMCh <- msgForFSM case *bcStatusResponseMessage: // Got a peer status. Unverified. - msgData := bcReactorMessage{ + msgForFSM := bcReactorMessage{ event: statusResponseEv, data: bReactorEventData{ peerId: src.ID(), @@ -261,18 +261,13 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) length: len(msgBytes), }, } - bcR.sendMessageToFSM(msgData) + bcR.messagesForFSMCh <- msgForFSM default: bcR.Logger.Error(fmt.Sprintf("unknown message type %v", reflect.TypeOf(msg))) } } -func (bcR *BlockchainReactor) sendMessageToFSM(msg bcReactorMessage) { - bcR.Logger.Debug("send message to FSM for processing", "msg", msg.String()) - bcR.messagesForFSMCh <- msg -} - // poolRoutine receives and handles messages from the Receive() routine and from the FSM func (bcR *BlockchainReactor) poolRoutine() { @@ -280,14 +275,12 @@ func (bcR *BlockchainReactor) poolRoutine() { processReceivedBlockTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond) sendBlockRequestTicker := time.NewTicker(trySendIntervalMS * time.Millisecond) - statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second) + doProcessBlockCh := make(chan struct{}, 1) lastHundred := time.Now() lastRate := 0.0 - doProcessBlockCh := make(chan struct{}, 1) - ForLoop: for { select { @@ -341,18 +334,25 @@ ForLoop: } case msg := <-bcR.messagesForFSMCh: + // Sent from the Receive() routine when 
status (statusResponseEv) and + // block (blockResponseEv) response events are received _ = bcR.fsm.handle(&msg) case msg := <-bcR.errorsForFSMCh: + // Sent from the switch.RemovePeer() routine (RemovePeerEv) and + // FSM state timer expiry routine (stateTimeoutEv). _ = bcR.fsm.handle(&msg) case msg := <-bcR.eventsFromFSMCh: switch msg.event { case syncFinishedEv: + // Sent from the FSM when it enters finished state. break ForLoop case peerErrorEv: + // Sent from the FSM when it detects peer error bcR.reportPeerErrorToSwitch(msg.data.err, msg.data.peerID) if msg.data.err == errNoPeerResponse { + // Sent from the peer timeout handler routine _ = bcR.fsm.handle(&bcReactorMessage{ event: peerRemoveEv, data: bReactorEventData{ @@ -360,6 +360,9 @@ ForLoop: err: msg.data.err, }, }) + } else { + // For slow peers, or errors due to blocks received from wrong peer + // the FSM had already removed the peers } default: bcR.Logger.Error("Event from FSM not supported", "type", msg.event) @@ -480,7 +483,7 @@ func (bcR *BlockchainReactor) resetStateTimer(name string, timer **time.Timer, t stateName: name, }, } - bcR.sendMessageToFSM(msg) + bcR.errorsForFSMCh <- msg }) } else { (*timer).Reset(timeout) diff --git a/docs/architecture/adr-042-blockchain-reactor-impl.md b/docs/architecture/adr-042-blockchain-reactor-impl.md index 560b358f..8f212460 100644 --- a/docs/architecture/adr-042-blockchain-reactor-impl.md +++ b/docs/architecture/adr-042-blockchain-reactor-impl.md @@ -1,4 +1,4 @@ -# ADR 041: Blockchain Reactor Design +# ADR 042: Blockchain Reactor Design Author: @ancaz @@ -187,19 +187,19 @@ H: The FSM times out in `waitForPeer` state, moves to `finished` state where it #### 2. Typical Fast Sync S: A node fast syncs blocks from honest peers and eventually downloads and executes the penultimate block. + H: The FSM in `waitForBlock` state will receive the processedBlockEv from the reactor and detect that the termination height is achieved. -#### 3. Peer Fakes "Big" Height +#### 3. 
Peer Claims Big Height but no Blocks S: In this scenario a faulty peer claims a big height (for which there are no blocks). -H: The requests for the non-existing block will timeout, the peer removed and the pool's maxPeerHeight updated. FSM checks if the termination height is achieved when peers are removed. -S': A variation of this scenario is a faulty peer that claims a big height and after receiving the Block request for an non-existing block sends an unsolicited Status response with a lower height. -H': FSM updates the peer height and check if the termination height is achieved. +H: The requests for the non-existing block will time out, the peer will be removed and the pool's maxPeerHeight will be updated. FSM checks if the termination height is achieved when peers are removed. #### 4. Highest Peer Removed or Updated to Short S: The fast sync node is caught up with all peers except one tall peer. The tall peer is removed or it sends Status response with low height. + H: FSM checks termination condition on peer removal and updates. #### 5. Block At Current Height Delayed