diff --git a/blockchain_new/peer_test.go b/blockchain_new/peer_test.go index e277887e..ffe61626 100644 --- a/blockchain_new/peer_test.go +++ b/blockchain_new/peer_test.go @@ -90,6 +90,9 @@ func TestPeerTimer(t *testing.T) { assert.Equal(t, 1, numErrFuncCalls) assert.Equal(t, lastErr, errNoPeerResponse) assert.True(t, peer.didTimeout) + + // Restore the peerTimeout to its original value + peerTimeout = 15 * time.Second } func TestIncrPending(t *testing.T) { @@ -111,6 +114,9 @@ func TestIncrPending(t *testing.T) { assert.NotNil(t, peer.recvMonitor) assert.NotNil(t, peer.timeout) assert.Equal(t, int32(2), peer.numPending) + + // Restore the peerTimeout to its original value + peerTimeout = 15 * time.Second } func TestDecrPending(t *testing.T) { @@ -140,6 +146,9 @@ func TestDecrPending(t *testing.T) { assert.Equal(t, int32(1), peer.numPending) // make sure timer is running and stop it checkByStoppingPeerTimer(t, peer, true) + + // Restore the peerTimeout to its original value + peerTimeout = 15 * time.Second } func TestCanBeRemovedDueToExpiration(t *testing.T) { @@ -157,6 +166,10 @@ func TestCanBeRemovedDueToExpiration(t *testing.T) { time.Sleep(2 * time.Millisecond) // timer expired, should be able to remove peer assert.Equal(t, errNoPeerResponse, peer.isGood()) + + // Restore the peerTimeout to its original value + peerTimeout = 15 * time.Second + } func TestCanBeRemovedDueToLowSpeed(t *testing.T) { @@ -216,5 +229,6 @@ func TestCleanupPeer(t *testing.T) { mtx.Unlock() checkByStoppingPeerTimer(t, peer, false) - + // Restore the peerTimeout to its original value + peerTimeout = 15 * time.Second } diff --git a/blockchain_new/pool.go b/blockchain_new/pool.go index 65b1ef16..7c9cc157 100644 --- a/blockchain_new/pool.go +++ b/blockchain_new/pool.go @@ -4,6 +4,7 @@ import ( "fmt" "sort" + cmn "github.com/tendermint/tendermint/libs/common" "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/types" @@ -80,6 +81,7 @@ func (pool *blockPool) reachedMaxHeight() bool { } func (pool *blockPool) rescheduleRequest(peerID p2p.ID, height int64) { + pool.logger.Debug("reschedule requests made to peer for height ", "peerID", peerID, "height", height) pool.requests[height] = true delete(pool.blocks, height) delete(pool.peers[peerID].blocks, height) @@ -99,6 +101,7 @@ func (pool *blockPool) updateMaxPeerHeight() { // Adds a new peer or updates an existing peer with a new height. // If the peer is too short it is removed. func (pool *blockPool) updatePeer(peerID p2p.ID, height int64) error { + pool.logger.Debug("updatePeer", "peerID", peerID, "height", height) peer := pool.peers[peerID] if height < pool.height { @@ -157,6 +160,8 @@ func (pool *blockPool) deletePeer(peerID p2p.ID) { // Removes any blocks and requests associated with the peer and deletes the peer. // Also triggers new requests if blocks have been removed. func (pool *blockPool) removePeer(peerID p2p.ID, err error) { + pool.logger.Debug("removing peer", "peerID", peerID) + peer := pool.peers[peerID] if peer == nil { return @@ -252,33 +257,17 @@ func (pool *blockPool) invalidateFirstTwoBlocks(err error) { } func (pool *blockPool) processedCurrentHeightBlock() { - peerID := pool.blocks[pool.height] - delete(pool.peers[peerID].blocks, pool.height) + peerID, peerOk := pool.blocks[pool.height] + if peerOk { + delete(pool.peers[peerID].blocks, pool.height) + } delete(pool.blocks, pool.height) + pool.logger.Debug("processed and removed block at height", "height", pool.height) pool.height++ pool.removeShortPeers() } -func (pool *blockPool) makeRequestBatch() []int { - pool.removeUselessPeers() - // If running low on planned requests, make more. - for height := pool.nextRequestHeight; pool.numPending < int32(maxNumPendingRequests); height++ { - if pool.nextRequestHeight > pool.maxPeerHeight { - break - } - pool.requests[height] = true - pool.nextRequestHeight++ - } - - heights := make([]int, 0, len(pool.requests)) - for k := range pool.requests { - heights = append(heights, int(k)) - } - sort.Ints(heights) - return heights -} - -func (pool *blockPool) removeUselessPeers() { +func (pool *blockPool) removeBadPeers() { pool.removeShortPeers() for _, peer := range pool.peers { if err := peer.isGood(); err != nil { @@ -290,22 +279,38 @@ func (pool *blockPool) removeUselessPeers() { } } -func (pool *blockPool) makeNextRequests(maxNumPendingRequests int32) (err error) { - pool.removeUselessPeers() - heights := pool.makeRequestBatch() +func (pool *blockPool) makeRequestBatch(maxNumPendingRequests int32) []int { + pool.removeBadPeers() + // If running low on planned requests, make more. + numNeeded := int32(cmn.MinInt(int(maxNumPendingRequests), len(pool.peers)*int(maxRequestsPerPeer))) - pool.numPending + for int32(len(pool.requests)) < numNeeded { + if pool.nextRequestHeight > pool.maxPeerHeight { + break + } + pool.requests[pool.nextRequestHeight] = true + pool.nextRequestHeight++ + } + + heights := make([]int, 0, len(pool.requests)) + for k := range pool.requests { + heights = append(heights, int(k)) + } + sort.Ints(heights) + return heights +} + +func (pool *blockPool) makeNextRequests(maxNumPendingRequests int32) { + heights := pool.makeRequestBatch(maxNumPendingRequests) + pool.logger.Debug("makeNextRequests will make following requests", "number", len(heights), "heights", heights) for _, height := range heights { h := int64(height) - if pool.numPending >= int32(len(pool.peers))*maxRequestsPerPeer { - break - } - if err = pool.sendRequest(h); err == errNoPeerFoundForHeight { - break + if err := pool.sendRequest(h); err != nil { + // Errors from sendRequest() are handled by this function + return } delete(pool.requests, h) } - - return err } func (pool *blockPool) sendRequest(height int64) error { @@ -317,7 +322,10 @@ func (pool *blockPool) sendRequest(height int64) error { continue } pool.logger.Debug("assign request to peer", "peer", peer.id, "height", height) - _ = pool.toBcR.sendBlockRequest(peer.id, height) + if err := pool.toBcR.sendBlockRequest(peer.id, height); err == errNilPeerForBlockRequest { + pool.removePeer(peer.id, err) + pool.toBcR.sendPeerError(err, peer.id) + } pool.blocks[height] = peer.id pool.numPending++ diff --git a/blockchain_new/pool_test.go b/blockchain_new/pool_test.go index 4a9bc591..0d6b17a3 100644 --- a/blockchain_new/pool_test.go +++ b/blockchain_new/pool_test.go @@ -340,8 +340,7 @@ func TestBlockPoolSendRequestBatch(t *testing.T) { pool := tt.pool maxRequestsPerPeer = int32(tt.maxRequestsPerPeer) - err := pool.makeNextRequests(10) - assert.Nil(t, err) + pool.makeNextRequests(10) assert.Equal(t, tt.expNumPending, pool.numPending) assert.Equal(t, testResults.numRequestsSent, maxRequestsPerPeer*int32(len(pool.peers))) diff --git a/blockchain_new/reactor.go b/blockchain_new/reactor.go index 0c79ea1a..0324d389 100644 --- a/blockchain_new/reactor.go +++ b/blockchain_new/reactor.go @@ -249,6 +249,7 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) event: blockResponseEv, data: bReactorEventData{ peerId: src.ID(), + height: msg.Block.Height, block: msg.Block, length: len(msgBytes), }, @@ -299,9 +300,13 @@ ForLoop: case doSendCh <- struct{}{}: default: } - //continue ForLoop case <-doSendCh: + // Tell FSM to make more requests. + // The maxNumPendingRequests may be changed based on low/ high watermark thresholds for + // - the number of blocks received and waiting to be processed, + // - the number of blockResponse messages waiting in messagesForFSMCh, etc. + // Currently maxNumPendingRequests value is not changed. msgData := bReactorMessageData{ event: makeRequestsEv, data: bReactorEventData{ @@ -334,7 +339,6 @@ ForLoop: case doProcessCh <- struct{}{}: default: } - //continue FOR_LOOP case <-doProcessCh: err := bcR.processBlocksFromPoolRoutine() @@ -364,7 +368,6 @@ ForLoop: "max_peer_height", bcR.fsm.pool.getMaxPeerHeight(), "blocks/s", lastRate) lastHundred = time.Now() } - //continue ForLoop case msg := <-bcR.messagesForFSMCh: _ = sendMessageToFSMSync(bcR.fsm, msg) @@ -448,7 +451,6 @@ func (bcR *BlockchainReactor) sendStatusRequest() { // BlockRequest sends `BlockRequest` height. func (bcR *BlockchainReactor) sendBlockRequest(peerID p2p.ID, height int64) error { peer := bcR.Switch.Peers().Get(peerID) - if peer == nil { return errNilPeerForBlockRequest } diff --git a/blockchain_new/reactor_fsm.go b/blockchain_new/reactor_fsm.go index 7db96e62..a5a62e80 100644 --- a/blockchain_new/reactor_fsm.go +++ b/blockchain_new/reactor_fsm.go @@ -157,7 +157,7 @@ var ( errSendQueueFull = errors.New("block request not made, send-queue is full") errPeerTooShort = errors.New("peer height too low, old peer removed/ new peer not added") errSlowPeer = errors.New("peer is not sending us data fast enough") - errNoPeerFoundForHeight = errors.New("could not found peer for block request") + errNoPeerFoundForHeight = errors.New("could not find peer for block request") ) func init() { @@ -239,9 +239,7 @@ func init() { // A block was received that was unsolicited, from unexpected peer, or that we already have it. // Ignore block, remove peer and send error to switch. fsm.pool.removePeer(data.peerId, err) - if err != nil { - fsm.toBcR.sendPeerError(err, data.peerId) - } + fsm.toBcR.sendPeerError(err, data.peerId) } return waitForBlock, err @@ -255,6 +253,7 @@ func init() { fsm.toBcR.sendPeerError(data.err, first.peer.id) fsm.logger.Error("send peer error for", "peer", second.peer.id) fsm.toBcR.sendPeerError(data.err, second.peer.id) + // Remove the first two blocks. This will also remove the peers fsm.pool.invalidateFirstTwoBlocks(data.err) } else { fsm.pool.processedCurrentHeightBlock() @@ -272,8 +271,8 @@ func init() { return waitForBlock, nil case makeRequestsEv: - err := fsm.makeNextRequests(data.maxNumRequests) - return waitForBlock, err + fsm.makeNextRequests(data.maxNumRequests) + return waitForBlock, nil case stopFSMEv: return finished, errNoErrorFinished @@ -399,6 +398,6 @@ func (fsm *bReactorFSM) cleanup() { // TODO } -func (fsm *bReactorFSM) makeNextRequests(maxNumPendingRequests int32) error { - return fsm.pool.makeNextRequests(maxNumPendingRequests) +func (fsm *bReactorFSM) makeNextRequests(maxNumPendingRequests int32) { + fsm.pool.makeNextRequests(maxNumPendingRequests) } diff --git a/blockchain_new/reactor_fsm_test.go b/blockchain_new/reactor_fsm_test.go index b2a865cf..11481b3e 100644 --- a/blockchain_new/reactor_fsm_test.go +++ b/blockchain_new/reactor_fsm_test.go @@ -1,6 +1,7 @@ package blockchain_new import ( + "fmt" "testing" "time" @@ -8,8 +9,19 @@ import ( cmn "github.com/tendermint/tendermint/libs/common" "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/p2p" + "github.com/tendermint/tendermint/types" ) +// reactor for FSM testing +type testReactor struct { + logger log.Logger + fsm *bReactorFSM +} + +func sendEventToFSM(fsm *bReactorFSM, ev bReactorEvent, data bReactorEventData) error { + return fsm.handle(&bReactorMessageData{event: ev, data: data}) +} + var ( failSendStatusRequest bool failSendBlockRequest bool @@ -46,106 +58,463 @@ func resetTestValues() { } type fsmStepTestValues struct { - currentState string - event bReactorEvent - expectedState string + startingHeight int64 + currentState string + event bReactorEvent + data bReactorEventData + errWanted error + expectedState string - // input failStatusReq bool shouldSendStatusReq bool failBlockReq bool blockReqIncreased bool - - data bReactorEventData + blocksAdded []int64 expectedLastBlockReq *lastBlockRequestT } -func newTestReactor() *testReactor { +var ( + unknown_StartFSMEv_WantWaitForPeer = fsmStepTestValues{ + currentState: "unknown", event: startFSMEv, + expectedState: "waitForPeer", + shouldSendStatusReq: true} + waitForBlock_ProcessedBlockEv_WantWaitForBlock = fsmStepTestValues{ + currentState: "waitForBlock", event: processedBlockEv, + data: bReactorEventData{err: nil}, + expectedState: "waitForBlock", + } + waitForBlock_ProcessedBlockEv_Err_WantWaitForBlock = fsmStepTestValues{ + currentState: "waitForBlock", event: processedBlockEv, + data: bReactorEventData{err: errBlockVerificationFailure}, + expectedState: "waitForBlock", + errWanted: errBlockVerificationFailure, + } + waitForBlock_ProcessedBlockEv_WantFinished = fsmStepTestValues{ + currentState: "waitForBlock", event: processedBlockEv, + data: bReactorEventData{err: nil}, + expectedState: "finished", + } +) + +func waitForBlock_MakeRequestEv_WantWaitForBlock(maxPendingRequests int32) fsmStepTestValues { + return fsmStepTestValues{ + currentState: "waitForBlock", event: makeRequestsEv, + data: bReactorEventData{maxNumRequests: maxPendingRequests}, + expectedState: "waitForBlock", + blockReqIncreased: true, + } +} + +func waitForPeer_StatusEv_WantWaitForBlock(peerID p2p.ID, height int64) fsmStepTestValues { + return fsmStepTestValues{ + currentState: "waitForPeer", event: statusResponseEv, + data: bReactorEventData{peerId: peerID, height: height}, + expectedState: "waitForBlock"} +} + +func waitForBlock_StatusEv_WantWaitForBlock(peerID p2p.ID, height int64) fsmStepTestValues { + return fsmStepTestValues{ + currentState: "waitForBlock", event: statusResponseEv, + data: bReactorEventData{peerId: peerID, height: height}, + expectedState: "waitForBlock"} +} + +func waitForBlock_BlockRespEv_WantWaitForBlock(peerID p2p.ID, height int64, prevBlocks []int64) fsmStepTestValues { + txs := []types.Tx{types.Tx("foo"), types.Tx("bar")} + + return fsmStepTestValues{ + currentState: "waitForBlock", event: blockResponseEv, + data: bReactorEventData{ + peerId: peerID, + height: height, + block: types.MakeBlock(int64(height), txs, nil, nil)}, + expectedState: "waitForBlock", + blocksAdded: append(prevBlocks, height), + } +} + +func newTestReactor(height int64) *testReactor { testBcR := &testReactor{logger: log.TestingLogger()} - testBcR.fsm = NewFSM(1, testBcR) + testBcR.fsm = NewFSM(height, testBcR) testBcR.fsm.setLogger(testBcR.logger) return testBcR } -// WIP -func TestFSMTransitionSequences(t *testing.T) { - maxRequestsPerPeer = 2 - fsmTransitionSequenceTests := [][]fsmStepTestValues{ - { - {currentState: "unknown", event: startFSMEv, shouldSendStatusReq: true, - expectedState: "waitForPeer"}, - {currentState: "waitForPeer", event: statusResponseEv, - data: bReactorEventData{peerId: "P1", height: 10}, - expectedState: "waitForBlock"}, - {currentState: "waitForBlock", event: makeRequestsEv, - data: bReactorEventData{maxNumRequests: maxNumPendingRequests}, - blockReqIncreased: true, - expectedState: "waitForBlock"}, - }, - } - - for _, tt := range fsmTransitionSequenceTests { - // Create test reactor - testBcR := newTestReactor() - resetTestValues() - - for _, step := range tt { - assert.Equal(t, step.currentState, testBcR.fsm.state.name) - failSendStatusRequest = step.failStatusReq - failSendBlockRequest = step.failBlockReq - - oldNumStatusRequests := numStatusRequests - oldNumBlockRequests := numBlockRequests - - _ = sendEventToFSM(testBcR.fsm, step.event, step.data) - - if step.shouldSendStatusReq { - assert.Equal(t, oldNumStatusRequests+1, numStatusRequests) - } else { - assert.Equal(t, oldNumStatusRequests, numStatusRequests) - } - - if step.blockReqIncreased { - assert.Equal(t, oldNumBlockRequests+maxRequestsPerPeer, numBlockRequests) - } else { - assert.Equal(t, oldNumBlockRequests, numBlockRequests) - } - - assert.Equal(t, step.expectedState, testBcR.fsm.state.name) +func fixBlockResponseEvStep(step *fsmStepTestValues, testBcR *testReactor) { + // There is no good way to know to which peer a block request was sent. + // So before we simulate a block response we cheat and look where it is expected from. + if step.event == blockResponseEv { + height := step.data.height + peerID, ok := testBcR.fsm.pool.blocks[height] + if ok { + step.data.peerId = peerID } } } -func TestReactorFSMBasic(t *testing.T) { - maxRequestsPerPeer = 2 - - // Create test reactor - testBcR := newTestReactor() - resetTestValues() - fsm := testBcR.fsm - - if err := fsm.handle(&bReactorMessageData{event: startFSMEv}); err != nil { +func TestFSMBasic(t *testing.T) { + tests := []struct { + name string + startingHeight int64 + maxRequestsPerPeer int32 + steps []fsmStepTestValues + }{ + { + name: "one block, one peer", + startingHeight: 1, + maxRequestsPerPeer: 2, + steps: []fsmStepTestValues{ + // startFSMEv + unknown_StartFSMEv_WantWaitForPeer, + // statusResponseEv + waitForPeer_StatusEv_WantWaitForBlock("P1", 2), + // makeRequestEv + waitForBlock_MakeRequestEv_WantWaitForBlock(maxNumPendingRequests), + // blockResponseEv for height 1 + waitForBlock_BlockRespEv_WantWaitForBlock("P1", 1, []int64{}), + // blockResponseEv for height 2 + waitForBlock_BlockRespEv_WantWaitForBlock("P1", 2, []int64{1}), + // processedBlockEv + waitForBlock_ProcessedBlockEv_WantFinished, + }, + }, + { + name: "multi block, multi peer", + startingHeight: 1, + maxRequestsPerPeer: 2, + steps: []fsmStepTestValues{ + // startFSMEv + unknown_StartFSMEv_WantWaitForPeer, + // statusResponseEv from P1 + waitForPeer_StatusEv_WantWaitForBlock("P1", 4), + // statusResponseEv from P2 + waitForBlock_StatusEv_WantWaitForBlock("P2", 4), + // makeRequestEv + waitForBlock_MakeRequestEv_WantWaitForBlock(maxNumPendingRequests), + // blockResponseEv for height 1 + waitForBlock_BlockRespEv_WantWaitForBlock("P1", 1, []int64{}), + // blockResponseEv for height 2 + waitForBlock_BlockRespEv_WantWaitForBlock("P1", 2, []int64{1}), + // blockResponseEv for height 3 + waitForBlock_BlockRespEv_WantWaitForBlock("P2", 3, []int64{1, 2}), + // blockResponseEv for height 4 + waitForBlock_BlockRespEv_WantWaitForBlock("P2", 4, []int64{1, 2, 3}), + // processedBlockEv + waitForBlock_ProcessedBlockEv_WantWaitForBlock, + waitForBlock_ProcessedBlockEv_WantWaitForBlock, + waitForBlock_ProcessedBlockEv_WantFinished, + }, + }, } - // Check that FSM sends a status request message - assert.Equal(t, 1, numStatusRequests) - assert.Equal(t, waitForPeer.name, fsm.state.name) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Create test reactor + testBcR := newTestReactor(tt.startingHeight) + resetTestValues() - // Send a status response message to FSM - peerID := p2p.ID(cmn.RandStr(12)) - sendStatusResponse2(fsm, peerID, 10) + if tt.maxRequestsPerPeer != 0 { + maxRequestsPerPeer = tt.maxRequestsPerPeer + } - if err := fsm.handle(&bReactorMessageData{ - event: makeRequestsEv, - data: bReactorEventData{maxNumRequests: maxNumPendingRequests}}); err != nil { + for _, step := range tt.steps { + assert.Equal(t, step.currentState, testBcR.fsm.state.name) + failSendStatusRequest = step.failStatusReq + failSendBlockRequest = step.failBlockReq + + oldNumStatusRequests := numStatusRequests + oldNumBlockRequests := numBlockRequests + + fixBlockResponseEvStep(&step, testBcR) + fsmErr := sendEventToFSM(testBcR.fsm, step.event, step.data) + assert.Equal(t, step.errWanted, fsmErr) + + if step.shouldSendStatusReq { + assert.Equal(t, oldNumStatusRequests+1, numStatusRequests) + } else { + assert.Equal(t, oldNumStatusRequests, numStatusRequests) + } + + if step.blockReqIncreased { + assert.True(t, oldNumBlockRequests < numBlockRequests) + } else { + assert.Equal(t, oldNumBlockRequests, numBlockRequests) + } + + for _, height := range step.blocksAdded { + _, err := testBcR.fsm.pool.getBlockAndPeerAtHeight(height) + assert.Nil(t, err) + } + assert.Equal(t, step.expectedState, testBcR.fsm.state.name) + } + + }) + } +} + +func TestFSMBlockVerificationFailure(t *testing.T) { + tests := []struct { + name string + startingHeight int64 + maxRequestsPerPeer int32 + steps []fsmStepTestValues + }{ + { + name: "block verification failure", + startingHeight: 1, + maxRequestsPerPeer: 2, + steps: []fsmStepTestValues{ + // startFSMEv + unknown_StartFSMEv_WantWaitForPeer, + // statusResponseEv from P1 + waitForPeer_StatusEv_WantWaitForBlock("P1", 3), + // statusResponseEv from P2 + waitForBlock_StatusEv_WantWaitForBlock("P2", 3), + // makeRequestEv + waitForBlock_MakeRequestEv_WantWaitForBlock(maxNumPendingRequests), + // blockResponseEv for height 1 + waitForBlock_BlockRespEv_WantWaitForBlock("P1", 1, []int64{}), + // blockResponseEv for height 2 + waitForBlock_BlockRespEv_WantWaitForBlock("P1", 2, []int64{1}), + // blockResponseEv for height 3 + waitForBlock_BlockRespEv_WantWaitForBlock("P2", 3, []int64{1, 2}), + + // processedBlockEv with Error + waitForBlock_ProcessedBlockEv_Err_WantWaitForBlock, + + // makeRequestEv + waitForBlock_MakeRequestEv_WantWaitForBlock(maxNumPendingRequests), + // blockResponseEv for height 1 + waitForBlock_BlockRespEv_WantWaitForBlock("P2", 1, []int64{3}), + // blockResponseEv for height 2 + waitForBlock_BlockRespEv_WantWaitForBlock("P2", 2, []int64{1, 3}), + + waitForBlock_ProcessedBlockEv_WantWaitForBlock, + waitForBlock_ProcessedBlockEv_WantFinished, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Create test reactor + testBcR := newTestReactor(tt.startingHeight) + resetTestValues() + + if tt.maxRequestsPerPeer != 0 { + maxRequestsPerPeer = tt.maxRequestsPerPeer + } + + for _, step := range tt.steps { + assert.Equal(t, step.currentState, testBcR.fsm.state.name) + failSendStatusRequest = step.failStatusReq + failSendBlockRequest = step.failBlockReq + + oldNumStatusRequests := numStatusRequests + oldNumBlockRequests := numBlockRequests + + var heightBefore int64 + if step.event == processedBlockEv && step.data.err == errBlockVerificationFailure { + heightBefore = testBcR.fsm.pool.height + } + + fixBlockResponseEvStep(&step, testBcR) + + fsmErr := sendEventToFSM(testBcR.fsm, step.event, step.data) + assert.Equal(t, step.errWanted, fsmErr) + + if step.shouldSendStatusReq { + assert.Equal(t, oldNumStatusRequests+1, numStatusRequests) + } else { + assert.Equal(t, oldNumStatusRequests, numStatusRequests) + } + + if step.blockReqIncreased { + assert.True(t, oldNumBlockRequests < numBlockRequests) + } else { + assert.Equal(t, oldNumBlockRequests, numBlockRequests) + } + + for _, height := range step.blocksAdded { + _, err := testBcR.fsm.pool.getBlockAndPeerAtHeight(height) + assert.Nil(t, err) + } + + if step.event == processedBlockEv && step.data.err == errBlockVerificationFailure { + heightAfter := testBcR.fsm.pool.height + assert.Equal(t, heightBefore, heightAfter) + firstAfter, err1 := testBcR.fsm.pool.getBlockAndPeerAtHeight(testBcR.fsm.pool.height) + secondAfter, err2 := testBcR.fsm.pool.getBlockAndPeerAtHeight(testBcR.fsm.pool.height + 1) + assert.NotNil(t, err1) + assert.NotNil(t, err2) + assert.Nil(t, firstAfter.block) + assert.Nil(t, firstAfter.peer) + assert.Nil(t, secondAfter.block) + assert.Nil(t, secondAfter.peer) + } + assert.Equal(t, step.expectedState, testBcR.fsm.state.name) + } + }) + } +} + +const ( + maxStartingHeightTest = 100 + maxRequestsPerPeerTest = 40 + maxTotalPendingRequestsTest = 600 + maxNumPeersTest = 1000 + maxNumBlocksInChainTest = 100000 +) + +type testFields struct { + name string + startingHeight int64 + maxRequestsPerPeer int32 + maxPendingRequests int32 + steps []fsmStepTestValues +} + +func makeCorrectTransitionSequence(startingHeight int64, numBlocks int64, numPeers int, randomPeerHeights bool, + maxRequestsPerPeer int32, maxPendingRequests int32) testFields { + + // Generate numPeers peers with random or numBlocks heights according to the randomPeerHeights flag + peerHeights := make([]int64, numPeers) + for i := 0; i < numPeers; i++ { + if i == 0 { + peerHeights[0] = numBlocks + continue + } + if randomPeerHeights { + peerHeights[i] = int64(cmn.MaxInt(cmn.RandIntn(int(numBlocks)), int(startingHeight)+1)) + } else { + peerHeights[i] = numBlocks + } + } + + // Approximate the slice capacity to save time for appends. + testSteps := make([]fsmStepTestValues, 0, 3*numBlocks+int64(numPeers)) + + testName := fmt.Sprintf("%v-blocks %v-startingHeight %v-peers %v-maxRequestsPerPeer %v-maxNumPendingRequests", + numBlocks, startingHeight, numPeers, maxRequestsPerPeer, maxPendingRequests) + + // Add startFSMEv step. + testSteps = append(testSteps, unknown_StartFSMEv_WantWaitForPeer) + + // For each peer, add statusResponseEv step. + for i := 0; i < numPeers; i++ { + peerName := fmt.Sprintf("P%d", i) + if i == 0 { + testSteps = append(testSteps, waitForPeer_StatusEv_WantWaitForBlock(p2p.ID(peerName), peerHeights[i])) + } else { + testSteps = append(testSteps, waitForBlock_StatusEv_WantWaitForBlock(p2p.ID(peerName), peerHeights[i])) + } + } + + height := startingHeight + numBlocksReceived := 0 + prevBlocks := make([]int64, 0, maxPendingRequests) + +forLoop: + for i := 0; i < int(numBlocks); i++ { + + // Add the makeRequestEv step periodically + if i%int(maxRequestsPerPeer) == 0 { + testSteps = append(testSteps, waitForBlock_MakeRequestEv_WantWaitForBlock(maxPendingRequests)) + } + + // Add the blockRespEv step + testSteps = append(testSteps, waitForBlock_BlockRespEv_WantWaitForBlock("P0", height, prevBlocks)) + prevBlocks = append(prevBlocks, height) + height++ + numBlocksReceived++ + + // Add the processedBlockEv step periodically + if numBlocksReceived >= int(maxRequestsPerPeer) || height >= numBlocks { + for j := int(height) - numBlocksReceived; j < int(height); j++ { + if j >= int(numBlocks)-1 { + // This is the last block that is processed, we should be in "finished" state. + testSteps = append(testSteps, waitForBlock_ProcessedBlockEv_WantFinished) + break forLoop + } + testSteps = append(testSteps, waitForBlock_ProcessedBlockEv_WantWaitForBlock) + } + numBlocksReceived = 0 + prevBlocks = make([]int64, 0, maxPendingRequests) + } + } + + return testFields{ + name: testName, + startingHeight: startingHeight, + maxRequestsPerPeer: maxRequestsPerPeer, + steps: testSteps, + } +} + +func makeCorrectTransitionSequenceWithRandomParameters() testFields { + // generate a starting height for fast sync + startingHeight := int64(cmn.RandIntn(maxStartingHeightTest) + 1) + + // generate the number of requests per peer + maxRequestsPerPeer := int32(cmn.RandIntn(maxRequestsPerPeerTest) + 1) + + // generate the maximum number of total pending requests + maxPendingRequests := int32(cmn.RandIntn(maxTotalPendingRequestsTest) + 1) + + // generate the number of blocks to be synced + numBlocks := int64(cmn.RandIntn(maxNumBlocksInChainTest)) + startingHeight + + // generate a number of peers and their heights + numPeers := cmn.RandIntn(maxNumPeersTest) + 1 + + return makeCorrectTransitionSequence(startingHeight, numBlocks, numPeers, true, maxRequestsPerPeer, maxPendingRequests) +} + +func TestFSMCorrectTransitionSequences(t *testing.T) { + + tests := []testFields{ + makeCorrectTransitionSequence(1, 100, 10, true, 10, 40), + makeCorrectTransitionSequenceWithRandomParameters(), + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Create test reactor + testBcR := newTestReactor(tt.startingHeight) + resetTestValues() + + if tt.maxRequestsPerPeer != 0 { + maxRequestsPerPeer = tt.maxRequestsPerPeer + } + + for _, step := range tt.steps { + assert.Equal(t, step.currentState, testBcR.fsm.state.name) + failSendStatusRequest = step.failStatusReq + failSendBlockRequest = step.failBlockReq + + oldNumStatusRequests := numStatusRequests + fixBlockResponseEvStep(&step, testBcR) + + fsmErr := sendEventToFSM(testBcR.fsm, step.event, step.data) + assert.Equal(t, step.errWanted, fsmErr) + + if step.shouldSendStatusReq { + assert.Equal(t, oldNumStatusRequests+1, numStatusRequests) + } else { + assert.Equal(t, oldNumStatusRequests, numStatusRequests) + } + + for _, height := range step.blocksAdded { + _, err := testBcR.fsm.pool.getBlockAndPeerAtHeight(height) + assert.Nil(t, err) + } + assert.Equal(t, step.expectedState, testBcR.fsm.state.name) + } + + }) } - // Check that FSM sends a block request message and... - assert.Equal(t, maxRequestsPerPeer, numBlockRequests) - // ... the block request has the expected height - assert.Equal(t, maxRequestsPerPeer, int32(len(fsm.pool.peers[peerID].blocks))) - assert.Equal(t, waitForBlock.name, fsm.state.name) } func TestReactorFSMPeerTimeout(t *testing.T) { @@ -153,7 +522,7 @@ func TestReactorFSMPeerTimeout(t *testing.T) { resetTestValues() peerTimeout = 20 * time.Millisecond // Create and start the FSM - testBcR := newTestReactor() + testBcR := newTestReactor(1) fsm := testBcR.fsm fsm.start() @@ -181,17 +550,6 @@ func TestReactorFSMPeerTimeout(t *testing.T) { } -// reactor for FSM testing -type testReactor struct { - logger log.Logger - fsm *bReactorFSM - testCh chan bReactorMessageData -} - -func sendEventToFSM(fsm *bReactorFSM, ev bReactorEvent, data bReactorEventData) error { - return fsm.handle(&bReactorMessageData{event: ev, data: data}) -} - // ---------------------------------------- // implementation for the test reactor APIs