From af90cb4f2db260fd1bf2cfec583d2fd0f09a6eb9 Mon Sep 17 00:00:00 2001 From: Anca Zamfir Date: Tue, 7 May 2019 21:06:43 -0400 Subject: [PATCH] cleanup --- Gopkg.lock | 8 +- blockchain/peer.go | 16 +-- blockchain/pool.go | 22 ++-- blockchain/reactor.go | 192 ++++++++++++++++++--------------- blockchain/reactor_fsm.go | 33 +++--- blockchain/reactor_fsm_test.go | 46 ++++++-- 6 files changed, 194 insertions(+), 123 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index 530cd89d..56c79ab6 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -170,9 +170,12 @@ revision = "b84e30acd515aadc4b783ad4ff83aff3299bdfe0" [[projects]] - digest = "1:c568d7727aa262c32bdf8a3f7db83614f7af0ed661474b24588de635c20024c7" + digest = "1:53e8c5c79716437e601696140e8b1801aae4204f4ec54a504333702a49572c4f" name = "github.com/magiconair/properties" - packages = ["."] + packages = [ + ".", + "assert", + ] pruneopts = "UT" revision = "c2353362d570a7bfa228149c62842019201cfb71" version = "v1.8.0" @@ -516,6 +519,7 @@ "github.com/golang/protobuf/ptypes/timestamp", "github.com/gorilla/websocket", "github.com/jmhodges/levigo", + "github.com/magiconair/properties/assert", "github.com/pkg/errors", "github.com/prometheus/client_golang/prometheus", "github.com/prometheus/client_golang/prometheus/promhttp", diff --git a/blockchain/peer.go b/blockchain/peer.go index 42d80993..5da5c61c 100644 --- a/blockchain/peer.go +++ b/blockchain/peer.go @@ -30,16 +30,16 @@ var ( ) type bpPeer struct { - id p2p.ID + logger log.Logger + id p2p.ID + + height int64 // the peer reported height + numPending int32 // number of requests still waiting for block responses + blocks map[int64]*types.Block // blocks received or expected to be received from this peer + timeout *time.Timer + didTimeout bool recvMonitor *flow.Monitor - height int64 // the peer reported height - numPending int32 // number of requests pending assignment or block response - blocks map[int64]*types.Block // blocks received or waiting to be received from this peer - timeout *time.Timer - didTimeout bool - - logger log.Logger errFunc func(err error, peerID p2p.ID) // function to call on error } diff --git a/blockchain/pool.go b/blockchain/pool.go index 38ade94f..8b9a0094 100644 --- a/blockchain/pool.go +++ b/blockchain/pool.go @@ -239,7 +239,14 @@ func (pool *blockPool) sendRequest(height int64) bool { if peer.height < height { continue } - pool.logger.Debug("assign request to peer", "peer", peer.id, "height", height) + + // Log with Info if the request is made for current processing height, Debug otherwise + if height == pool.height { + pool.logger.Info("assign request to peer", "peer", peer.id, "height", height) + } else { + pool.logger.Debug("assign request to peer", "peer", peer.id, "height", height) + } + if err := pool.toBcR.sendBlockRequest(peer.id, height); err == errNilPeerForBlockRequest { pool.removePeer(peer.id, err) } @@ -279,7 +286,7 @@ func (pool *blockPool) addBlock(peerID p2p.ID, block *types.Block, blockSize int pool.blocks[block.Height] = peerID pool.numPending-- pool.peers[peerID].decrPending(blockSize) - pool.logger.Debug("added new block", "height", block.Height, "from_peer", peerID, "total", len(pool.blocks)) + pool.logger.Info("added new block", "height", block.Height, "from_peer", peerID, "total", len(pool.blocks)) return nil } @@ -305,9 +312,6 @@ func (pool *blockPool) getNextTwoBlocks() (first, second *blockData, err error) if err == nil { err = err2 } - - pool.logger.Debug("blocks at height and/ or height+1", "first", first, "second", second) - return } @@ -336,10 +340,12 @@ func (pool *blockPool) processedCurrentHeightBlock() { } func (pool *blockPool) removePeerAtCurrentHeight(err error) { - first, err := pool.getBlockAndPeerAtHeight(pool.height) - if err == nil { - pool.removePeer(first.peer.id, err) + peerID := pool.blocks[pool.height] + peer := pool.peers[peerID] + if peer == nil { + return } + pool.removePeer(peer.id, err) } func (pool *blockPool) cleanup() { diff --git a/blockchain/reactor.go b/blockchain/reactor.go index bc9bf3ff..10a22d62 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -21,10 +21,6 @@ const ( trySyncIntervalMS = 10 trySendIntervalMS = 10 - // stop syncing when last block's time is - // within this much of the system time. - // stopSyncingDurationMinutes = 10 - // ask for best height every 10s statusUpdateIntervalSeconds = 10 // check if we should switch to consensus reactor @@ -49,15 +45,6 @@ type consensusReactor interface { SwitchToConsensus(sm.State, int) } -type peerError struct { - err error - peerID p2p.ID -} - -func (e peerError) Error() string { - return fmt.Sprintf("error with peer %v: %s", e.peerID, e.err.Error()) -} - // BlockchainReactor handles long-term catchup syncing. type BlockchainReactor struct { p2p.BaseReactor @@ -75,15 +62,15 @@ type BlockchainReactor struct { blocksSynced int // Receive goroutine forwards messages to this channel to be processed in the context of the poolRoutine. - messagesForFSMCh chan bReactorMessageData + messagesForFSMCh chan bcReactorMessage // Switch goroutine may send RemovePeer to the blockchain reactor. This is an error message that is relayed // to this channel to be processed in the context of the poolRoutine. - errorsForFSMCh chan bReactorMessageData + errorsForFSMCh chan bcReactorMessage // This channel is used by the FSM and indirectly the block pool to report errors to the blockchain reactor and // the switch. - errorsFromFSMCh chan bReactorMessageData + eventsFromFSMCh chan bcFsmMessage } type BlockRequest struct { @@ -91,12 +78,31 @@ type BlockRequest struct { PeerID p2p.ID } -// bReactorMessageData structure is used by the reactor when sending messages to the FSM. -type bReactorMessageData struct { +// bcReactorMessage is used by the reactor to send messages to the FSM. +type bcReactorMessage struct { event bReactorEvent data bReactorEventData } +type bFsmEvent uint + +const ( + // message type events + peerErrorEv = iota + 1 + syncFinishedEv +) + +type bFsmEventData struct { + peerID p2p.ID + err error +} + +// bcFsmMessage is used by the FSM to send messages to the reactor +type bcFsmMessage struct { + event bFsmEvent + data bFsmEventData +} + // NewBlockchainReactor returns new reactor instance. func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *BlockStore, fastSync bool) *BlockchainReactor { @@ -107,9 +113,9 @@ func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *Bl } const capacity = 1000 - errorsFromFSMCh := make(chan bReactorMessageData, capacity) - messagesForFSMCh := make(chan bReactorMessageData, capacity) - errorsForFSMCh := make(chan bReactorMessageData, capacity) + eventsFromFSMCh := make(chan bcFsmMessage, capacity) + messagesForFSMCh := make(chan bcReactorMessage, capacity) + errorsForFSMCh := make(chan bcReactorMessage, capacity) bcR := &BlockchainReactor{ initialState: state, @@ -118,7 +124,7 @@ func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *Bl fastSync: fastSync, store: store, messagesForFSMCh: messagesForFSMCh, - errorsFromFSMCh: errorsFromFSMCh, + eventsFromFSMCh: eventsFromFSMCh, errorsForFSMCh: errorsForFSMCh, } fsm := NewFSM(store.Height()+1, bcR) @@ -162,17 +168,16 @@ func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor { // AddPeer implements Reactor by sending our state to peer. func (bcR *BlockchainReactor) AddPeer(peer p2p.Peer) { msgBytes := cdc.MustMarshalBinaryBare(&bcStatusResponseMessage{bcR.store.Height()}) - if !peer.Send(BlockchainChannel, msgBytes) { + if !peer.TrySend(BlockchainChannel, msgBytes) { // doing nothing, will try later in `poolRoutine` } // peer is added to the pool once we receive the first - // bcStatusResponseMessage from the peer and call pool.SetPeerHeight + // bcStatusResponseMessage from the peer and call pool.updatePeer() } -// respondToPeer loads a block and sends it to the requesting peer, -// if we have it. Otherwise, we'll respond saying we don't have it. -// According to the Tendermint spec, if all nodes are honest, -// no node should be requesting for a block that's non-existent. +// sendBlockToPeer loads a block and sends it to the requesting peer. +// If the block doesn't exist a bcNoBlockResponseMessage is sent. +// If all nodes are honest, no node should be requesting for a block that doesn't exist. func (bcR *BlockchainReactor) sendBlockToPeer(msg *bcBlockRequestMessage, src p2p.Peer) (queued bool) { @@ -193,14 +198,9 @@ func (bcR *BlockchainReactor) sendStatusResponseToPeer(msg *bcStatusRequestMessa return src.TrySend(BlockchainChannel, msgBytes) } -func (bcR *BlockchainReactor) sendMessageToFSMAsync(msg bReactorMessageData) { - bcR.Logger.Debug("send message to FSM for processing", "msg", msg.String()) - bcR.messagesForFSMCh <- msg -} - // RemovePeer implements Reactor by removing peer from the pool. func (bcR *BlockchainReactor) RemovePeer(peer p2p.Peer, reason interface{}) { - msgData := bReactorMessageData{ + msgData := bcReactorMessage{ event: peerRemoveEv, data: bReactorEventData{ peerId: peer.ID(), @@ -242,7 +242,7 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) } case *bcBlockResponseMessage: - msgData := bReactorMessageData{ + msgData := bcReactorMessage{ event: blockResponseEv, data: bReactorEventData{ peerId: src.ID(), @@ -251,11 +251,11 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) length: len(msgBytes), }, } - bcR.sendMessageToFSMAsync(msgData) + bcR.sendMessageToFSM(msgData) case *bcStatusResponseMessage: // Got a peer status. Unverified. - msgData := bReactorMessageData{ + msgData := bcReactorMessage{ event: statusResponseEv, data: bReactorEventData{ peerId: src.ID(), @@ -263,15 +263,19 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) length: len(msgBytes), }, } - bcR.sendMessageToFSMAsync(msgData) + bcR.sendMessageToFSM(msgData) default: bcR.Logger.Error(fmt.Sprintf("unknown message type %v", reflect.TypeOf(msg))) } } -// Handle messages from the poolReactor telling the reactor what to do. -// NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down! +func (bcR *BlockchainReactor) sendMessageToFSM(msg bcReactorMessage) { + bcR.Logger.Debug("send message to FSM for processing", "msg", msg.String()) + bcR.messagesForFSMCh <- msg +} + +// poolRoutine receives and handles messages from the Receive() routine and from the FSM func (bcR *BlockchainReactor) poolRoutine() { bcR.fsm.start() @@ -297,7 +301,7 @@ ForLoop: // - the number of blocks received and waiting to be processed, // - the number of blockResponse messages waiting in messagesForFSMCh, etc. // Currently maxNumPendingRequests value is not changed. - _ = bcR.fsm.handle(&bReactorMessageData{ + _ = bcR.fsm.handle(&bcReactorMessage{ event: makeRequestsEv, data: bReactorEventData{ maxNumRequests: maxNumPendingRequests}}) @@ -325,12 +329,12 @@ ForLoop: } case <-doProcessBlockCh: - err := bcR.processBlocksFromPoolRoutine() + err := bcR.processBlock() if err == errMissingBlocks { - continue ForLoop + continue } // Notify FSM of block processing result. - _ = bcR.fsm.handle(&bReactorMessageData{ + _ = bcR.fsm.handle(&bcReactorMessage{ event: processedBlockEv, data: bReactorEventData{ err: err, @@ -338,7 +342,7 @@ ForLoop: }) if err == errBlockVerificationFailure { - continue ForLoop + continue } doProcessBlockCh <- struct{}{} @@ -357,10 +361,21 @@ ForLoop: case msg := <-bcR.errorsForFSMCh: _ = bcR.fsm.handle(&msg) - case msg := <-bcR.errorsFromFSMCh: - bcR.reportPeerErrorToSwitch(msg.data.err, msg.data.peerId) - if msg.data.err == errNoPeerResponse { - _ = bcR.fsm.handle(&msg) + case msg := <-bcR.eventsFromFSMCh: + switch msg.event { + case peerErrorEv: + bcR.reportPeerErrorToSwitch(msg.data.err, msg.data.peerID) + if msg.data.err == errNoPeerResponse { + _ = bcR.fsm.handle(&bcReactorMessage{ + event: peerRemoveEv, + data: bReactorEventData{ + peerId: msg.data.peerID, + err: msg.data.err, + }, + }) + } + default: + bcR.Logger.Error("Event from FSM not supported", "type", msg.event) } case <-bcR.Quit(): @@ -376,23 +391,8 @@ func (bcR *BlockchainReactor) reportPeerErrorToSwitch(err error, peerID p2p.ID) } } -// Called by FSM and pool: -// - pool calls when it detects slow peer or when peer times out -// - FSM calls when: -// - adding a block (addBlock) fails -// - reactor processing of a block reports failure and FSM sends back the peers of first and second blocks -func (bcR *BlockchainReactor) sendPeerError(err error, peerID p2p.ID) { - msgData := bReactorMessageData{ - event: peerRemoveEv, - data: bReactorEventData{ - peerId: peerID, - err: err, - }, - } - bcR.errorsFromFSMCh <- msgData -} +func (bcR *BlockchainReactor) processBlock() error { -func (bcR *BlockchainReactor) processBlocksFromPoolRoutine() error { firstBP, secondBP, err := bcR.fsm.pool.getNextTwoBlocks() if err != nil { // We need both to sync the first block. @@ -429,31 +429,14 @@ func (bcR *BlockchainReactor) processBlocksFromPoolRoutine() error { return nil } -func (bcR *BlockchainReactor) resetStateTimer(name string, timer **time.Timer, timeout time.Duration) { - if timer == nil { - panic("nil timer pointer parameter") - } - if *timer == nil { - *timer = time.AfterFunc(timeout, func() { - msg := bReactorMessageData{ - event: stateTimeoutEv, - data: bReactorEventData{ - stateName: name, - }, - } - bcR.sendMessageToFSMAsync(msg) - }) - } else { - (*timer).Reset(timeout) - } -} - -// BroadcastStatusRequest broadcasts `BlockStore` height. +// Implements bcRMessageInterface +// sendStatusRequest broadcasts `BlockStore` height. func (bcR *BlockchainReactor) sendStatusRequest() { msgBytes := cdc.MustMarshalBinaryBare(&bcStatusRequestMessage{bcR.store.Height()}) bcR.Switch.Broadcast(BlockchainChannel, msgBytes) } +// Implements bcRMessageInterface // BlockRequest sends `BlockRequest` height. func (bcR *BlockchainReactor) sendBlockRequest(peerID p2p.ID, height int64) error { peer := bcR.Switch.Peers().Get(peerID) @@ -470,15 +453,52 @@ func (bcR *BlockchainReactor) sendBlockRequest(peerID p2p.ID, height int64) erro } func (bcR *BlockchainReactor) switchToConsensus() { - conR, ok := bcR.Switch.Reactor("CONSENSUS").(consensusReactor) if ok { conR.SwitchToConsensus(bcR.state, bcR.blocksSynced) + bcR.eventsFromFSMCh <- bcFsmMessage{event: syncFinishedEv} } else { // Should only happen during testing. } } +// Implements bcRMessageInterface +// Called by FSM and pool: +// - pool calls when it detects slow peer or when peer times out +// - FSM calls when: +// - adding a block (addBlock) fails +// - reactor processing of a block reports failure and FSM sends back the peers of first and second blocks +func (bcR *BlockchainReactor) sendPeerError(err error, peerID p2p.ID) { + msgData := bcFsmMessage{ + event: peerErrorEv, + data: bFsmEventData{ + peerID: peerID, + err: err, + }, + } + bcR.eventsFromFSMCh <- msgData +} + +// Implements bcRMessageInterface +func (bcR *BlockchainReactor) resetStateTimer(name string, timer **time.Timer, timeout time.Duration) { + if timer == nil { + panic("nil timer pointer parameter") + } + if *timer == nil { + *timer = time.AfterFunc(timeout, func() { + msg := bcReactorMessage{ + event: stateTimeoutEv, + data: bReactorEventData{ + stateName: name, + }, + } + bcR.sendMessageToFSM(msg) + }) + } else { + (*timer).Reset(timeout) + } +} + //----------------------------------------------------------------------------- // Messages diff --git a/blockchain/reactor_fsm.go b/blockchain/reactor_fsm.go index 32013939..e63494e0 100644 --- a/blockchain/reactor_fsm.go +++ b/blockchain/reactor_fsm.go @@ -78,7 +78,7 @@ const ( stateTimeoutEv ) -func (msg *bReactorMessageData) String() string { +func (msg *bcReactorMessage) String() string { var dataStr string switch msg.event { @@ -139,10 +139,13 @@ var ( finished *bReactorFSMState ) -// state timers var ( + // timeouts for state timers waitForPeerTimeout = 2 * time.Second waitForBlockAtCurrentHeightTimeout = 5 * time.Second + + // overall timer for fast sync + fastSyncNoProgressTimeout = 5 * time.Second ) // errors @@ -158,7 +161,7 @@ var ( errPeerTooShort = errors.New("peer height too low, old peer removed/ new peer not added") errSlowPeer = errors.New("peer is not sending us data fast enough") errSwitchRemovesPeer = errors.New("switch is removing peer") - errTimeoutEventWrongState = errors.New("timeout event for a different state than the current one") + errTimeoutEventWrongState = errors.New("timeout event for a state different than the current one") ) func init() { @@ -260,9 +263,9 @@ func init() { } else { fsm.pool.processedCurrentHeightBlock() if fsm.pool.reachedMaxHeight() { - fsm.stop() return finished, nil } + // Since we advanced one block reset the state timer fsm.resetStateTimer() } @@ -270,6 +273,9 @@ func init() { case peerRemoveEv: // This event is sent by the switch to remove disconnected and errored peers. + if len(fsm.pool.peers) == 0 { + return waitForPeer, nil + } fsm.pool.removePeer(data.peerId, data.err) return waitForBlock, nil @@ -328,18 +334,18 @@ func (fsm *bReactorFSM) setLogger(l log.Logger) { fsm.pool.setLogger(l) } -// Starts the FSM goroutine. +// Starts the FSM. func (fsm *bReactorFSM) start() { - _ = fsm.handle(&bReactorMessageData{event: startFSMEv}) + _ = fsm.handle(&bcReactorMessage{event: startFSMEv}) } -// Stops the FSM goroutine. +// Stops the FSM. func (fsm *bReactorFSM) stop() { - _ = fsm.handle(&bReactorMessageData{event: stopFSMEv}) + _ = fsm.handle(&bcReactorMessage{event: stopFSMEv}) } // handle processes messages and events sent to the FSM. -func (fsm *bReactorFSM) handle(msg *bReactorMessageData) error { +func (fsm *bReactorFSM) handle(msg *bcReactorMessage) error { fsm.logger.Debug("FSM received", "event", msg, "state", fsm.state) if fsm.state == nil { @@ -380,13 +386,14 @@ func (fsm *bReactorFSM) isCaughtUp() bool { // Some conditions to determine if we're caught up. // Ensures we've either received a block or waited some amount of time, // and that we're synced to the highest known height. - // Note we use maxPeerHeight - 1 because to sync block H requires block H+1 - // to verify the LastCommit. - receivedBlockOrTimedOut := fsm.pool.height > 0 || time.Since(fsm.startTime) > 5*time.Second - ourChainIsLongestAmongPeers := fsm.pool.maxPeerHeight == 0 || fsm.pool.height >= (fsm.pool.maxPeerHeight-1) || fsm.state == finished + // Note we use maxPeerHeight - 1 because to sync block H requires block H+1 to verify the LastCommit. + receivedBlockOrTimedOut := fsm.pool.height > 0 || time.Since(fsm.startTime) > fastSyncNoProgressTimeout + ourChainIsLongestAmongPeers := fsm.pool.maxPeerHeight == 0 || fsm.state == finished isCaughtUp := receivedBlockOrTimedOut && ourChainIsLongestAmongPeers return isCaughtUp + + //return fsm.state == finished } func (fsm *bReactorFSM) makeNextRequests(maxNumPendingRequests int32) { diff --git a/blockchain/reactor_fsm_test.go b/blockchain/reactor_fsm_test.go index d0543e01..84e186cd 100644 --- a/blockchain/reactor_fsm_test.go +++ b/blockchain/reactor_fsm_test.go @@ -20,7 +20,7 @@ type testReactor struct { } func sendEventToFSM(fsm *bReactorFSM, ev bReactorEvent, data bReactorEventData) error { - return fsm.handle(&bReactorMessageData{event: ev, data: data}) + return fsm.handle(&bcReactorMessage{event: ev, data: data}) } var ( @@ -567,7 +567,7 @@ func TestFSMBadBlockFromPeer(t *testing.T) { } } -func TestFSMBlockAtCurrentHeightNeverArrives(t *testing.T) { +func TestFSMBlockAtCurrentHeightDoesNotArriveInTime(t *testing.T) { tests := []struct { name string startingHeight int64 @@ -861,7 +861,7 @@ func TestFSMPeerStateTimeoutEvent(t *testing.T) { steps []fsmStepTestValues }{ { - name: "timeout event in state waitForPeer for state == waitForPeer", + name: "timeout event for state waitForPeer while in state waitForPeer", startingHeight: 1, maxRequestsPerPeer: 3, steps: []fsmStepTestValues{ @@ -870,7 +870,7 @@ func TestFSMPeerStateTimeoutEvent(t *testing.T) { }, }, { - name: "timeout event for state waitForPeer for state != waitForPeer", + name: "timeout event for state waitForPeer while in a state != waitForPeer", startingHeight: 1, maxRequestsPerPeer: 3, steps: []fsmStepTestValues{ @@ -878,6 +878,40 @@ func TestFSMPeerStateTimeoutEvent(t *testing.T) { makeStepStateTimeoutEv("waitForPeer", "waitForPeer", "waitForBlock", errTimeoutEventWrongState), }, }, + { + name: "timeout event for state waitForBlock while in state waitForBlock ", + startingHeight: 1, + maxRequestsPerPeer: 3, + steps: []fsmStepTestValues{ + makeStepStartFSMEv(), + makeStepStatusEv("waitForPeer", "waitForBlock", "P1", 3, nil), + makeStepMakeRequestsEv("waitForBlock", "waitForBlock", maxNumPendingRequests), + makeStepStateTimeoutEv("waitForBlock", "waitForPeer", "waitForBlock", errNoPeerResponse), + }, + }, + { + name: "timeout event for state waitForBlock while in a state != waitForBlock", + startingHeight: 1, + maxRequestsPerPeer: 3, + steps: []fsmStepTestValues{ + makeStepStartFSMEv(), + makeStepStatusEv("waitForPeer", "waitForBlock", "P1", 3, nil), + makeStepMakeRequestsEv("waitForBlock", "waitForBlock", maxNumPendingRequests), + makeStepStateTimeoutEv("waitForBlock", "waitForBlock", "waitForPeer", errTimeoutEventWrongState), + }, + }, + { + name: "timeout event for state waitForBlock with multiple peers", + startingHeight: 1, + maxRequestsPerPeer: 3, + steps: []fsmStepTestValues{ + makeStepStartFSMEv(), + makeStepStatusEv("waitForPeer", "waitForBlock", "P1", 3, nil), + makeStepMakeRequestsEv("waitForBlock", "waitForBlock", maxNumPendingRequests), + makeStepStatusEv("waitForBlock", "waitForBlock", "P2", 3, nil), + makeStepStateTimeoutEv("waitForBlock", "waitForBlock", "waitForBlock", errNoPeerResponse), + }, + }, } for _, tt := range tests { @@ -1085,7 +1119,7 @@ func TestFSMPeerTimeout(t *testing.T) { sendStatusResponse(fsm, peerID, 10) time.Sleep(5 * time.Millisecond) - if err := fsm.handle(&bReactorMessageData{ + if err := fsm.handle(&bcReactorMessage{ event: makeRequestsEv, data: bReactorEventData{maxNumRequests: maxNumPendingRequests}}); err != nil { } @@ -1151,7 +1185,7 @@ func (testR *testReactor) resetStateTimer(name string, timer **time.Timer, timeo func sendStatusResponse(fsm *bReactorFSM, peerID p2p.ID, height int64) { msgBytes := makeStatusResponseMessage(height) - _ = fsm.handle(&bReactorMessageData{ + _ = fsm.handle(&bcReactorMessage{ event: statusResponseEv, data: bReactorEventData{ peerId: peerID,