more tests

This commit is contained in:
Anca Zamfir
2019-04-12 22:02:30 -04:00
parent fa9137af93
commit 5787f78841
6 changed files with 516 additions and 136 deletions

View File

@ -90,6 +90,9 @@ func TestPeerTimer(t *testing.T) {
assert.Equal(t, 1, numErrFuncCalls) assert.Equal(t, 1, numErrFuncCalls)
assert.Equal(t, lastErr, errNoPeerResponse) assert.Equal(t, lastErr, errNoPeerResponse)
assert.True(t, peer.didTimeout) assert.True(t, peer.didTimeout)
// Restore the peerTimeout to its original value
peerTimeout = 15 * time.Second
} }
func TestIncrPending(t *testing.T) { func TestIncrPending(t *testing.T) {
@ -111,6 +114,9 @@ func TestIncrPending(t *testing.T) {
assert.NotNil(t, peer.recvMonitor) assert.NotNil(t, peer.recvMonitor)
assert.NotNil(t, peer.timeout) assert.NotNil(t, peer.timeout)
assert.Equal(t, int32(2), peer.numPending) assert.Equal(t, int32(2), peer.numPending)
// Restore the peerTimeout to its original value
peerTimeout = 15 * time.Second
} }
func TestDecrPending(t *testing.T) { func TestDecrPending(t *testing.T) {
@ -140,6 +146,9 @@ func TestDecrPending(t *testing.T) {
assert.Equal(t, int32(1), peer.numPending) assert.Equal(t, int32(1), peer.numPending)
// make sure timer is running and stop it // make sure timer is running and stop it
checkByStoppingPeerTimer(t, peer, true) checkByStoppingPeerTimer(t, peer, true)
// Restore the peerTimeout to its original value
peerTimeout = 15 * time.Second
} }
func TestCanBeRemovedDueToExpiration(t *testing.T) { func TestCanBeRemovedDueToExpiration(t *testing.T) {
@ -157,6 +166,10 @@ func TestCanBeRemovedDueToExpiration(t *testing.T) {
time.Sleep(2 * time.Millisecond) time.Sleep(2 * time.Millisecond)
// timer expired, should be able to remove peer // timer expired, should be able to remove peer
assert.Equal(t, errNoPeerResponse, peer.isGood()) assert.Equal(t, errNoPeerResponse, peer.isGood())
// Restore the peerTimeout to its original value
peerTimeout = 15 * time.Second
} }
func TestCanBeRemovedDueToLowSpeed(t *testing.T) { func TestCanBeRemovedDueToLowSpeed(t *testing.T) {
@ -216,5 +229,6 @@ func TestCleanupPeer(t *testing.T) {
mtx.Unlock() mtx.Unlock()
checkByStoppingPeerTimer(t, peer, false) checkByStoppingPeerTimer(t, peer, false)
// Restore the peerTimeout to its original value
peerTimeout = 15 * time.Second
} }

View File

@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"sort" "sort"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
@ -80,6 +81,7 @@ func (pool *blockPool) reachedMaxHeight() bool {
} }
func (pool *blockPool) rescheduleRequest(peerID p2p.ID, height int64) { func (pool *blockPool) rescheduleRequest(peerID p2p.ID, height int64) {
pool.logger.Debug("reschedule requests made to peer for height ", "peerID", peerID, "height", height)
pool.requests[height] = true pool.requests[height] = true
delete(pool.blocks, height) delete(pool.blocks, height)
delete(pool.peers[peerID].blocks, height) delete(pool.peers[peerID].blocks, height)
@ -99,6 +101,7 @@ func (pool *blockPool) updateMaxPeerHeight() {
// Adds a new peer or updates an existing peer with a new height. // Adds a new peer or updates an existing peer with a new height.
// If the peer is too short it is removed. // If the peer is too short it is removed.
func (pool *blockPool) updatePeer(peerID p2p.ID, height int64) error { func (pool *blockPool) updatePeer(peerID p2p.ID, height int64) error {
pool.logger.Debug("updatePeer", "peerID", peerID, "height", height)
peer := pool.peers[peerID] peer := pool.peers[peerID]
if height < pool.height { if height < pool.height {
@ -157,6 +160,8 @@ func (pool *blockPool) deletePeer(peerID p2p.ID) {
// Removes any blocks and requests associated with the peer and deletes the peer. // Removes any blocks and requests associated with the peer and deletes the peer.
// Also triggers new requests if blocks have been removed. // Also triggers new requests if blocks have been removed.
func (pool *blockPool) removePeer(peerID p2p.ID, err error) { func (pool *blockPool) removePeer(peerID p2p.ID, err error) {
pool.logger.Debug("removing peer", "peerID", peerID)
peer := pool.peers[peerID] peer := pool.peers[peerID]
if peer == nil { if peer == nil {
return return
@ -252,33 +257,17 @@ func (pool *blockPool) invalidateFirstTwoBlocks(err error) {
} }
func (pool *blockPool) processedCurrentHeightBlock() { func (pool *blockPool) processedCurrentHeightBlock() {
peerID := pool.blocks[pool.height] peerID, peerOk := pool.blocks[pool.height]
if peerOk {
delete(pool.peers[peerID].blocks, pool.height) delete(pool.peers[peerID].blocks, pool.height)
}
delete(pool.blocks, pool.height) delete(pool.blocks, pool.height)
pool.logger.Debug("processed and removed block at height", "height", pool.height)
pool.height++ pool.height++
pool.removeShortPeers() pool.removeShortPeers()
} }
func (pool *blockPool) makeRequestBatch() []int { func (pool *blockPool) removeBadPeers() {
pool.removeUselessPeers()
// If running low on planned requests, make more.
for height := pool.nextRequestHeight; pool.numPending < int32(maxNumPendingRequests); height++ {
if pool.nextRequestHeight > pool.maxPeerHeight {
break
}
pool.requests[height] = true
pool.nextRequestHeight++
}
heights := make([]int, 0, len(pool.requests))
for k := range pool.requests {
heights = append(heights, int(k))
}
sort.Ints(heights)
return heights
}
func (pool *blockPool) removeUselessPeers() {
pool.removeShortPeers() pool.removeShortPeers()
for _, peer := range pool.peers { for _, peer := range pool.peers {
if err := peer.isGood(); err != nil { if err := peer.isGood(); err != nil {
@ -290,22 +279,38 @@ func (pool *blockPool) removeUselessPeers() {
} }
} }
func (pool *blockPool) makeNextRequests(maxNumPendingRequests int32) (err error) { func (pool *blockPool) makeRequestBatch(maxNumPendingRequests int32) []int {
pool.removeUselessPeers() pool.removeBadPeers()
heights := pool.makeRequestBatch() // If running low on planned requests, make more.
numNeeded := int32(cmn.MinInt(int(maxNumPendingRequests), len(pool.peers)*int(maxRequestsPerPeer))) - pool.numPending
for int32(len(pool.requests)) < numNeeded {
if pool.nextRequestHeight > pool.maxPeerHeight {
break
}
pool.requests[pool.nextRequestHeight] = true
pool.nextRequestHeight++
}
heights := make([]int, 0, len(pool.requests))
for k := range pool.requests {
heights = append(heights, int(k))
}
sort.Ints(heights)
return heights
}
func (pool *blockPool) makeNextRequests(maxNumPendingRequests int32) {
heights := pool.makeRequestBatch(maxNumPendingRequests)
pool.logger.Debug("makeNextRequests will make following requests", "number", len(heights), "heights", heights)
for _, height := range heights { for _, height := range heights {
h := int64(height) h := int64(height)
if pool.numPending >= int32(len(pool.peers))*maxRequestsPerPeer { if err := pool.sendRequest(h); err != nil {
break // Errors from sendRequest() are handled by this function
} return
if err = pool.sendRequest(h); err == errNoPeerFoundForHeight {
break
} }
delete(pool.requests, h) delete(pool.requests, h)
} }
return err
} }
func (pool *blockPool) sendRequest(height int64) error { func (pool *blockPool) sendRequest(height int64) error {
@ -317,7 +322,10 @@ func (pool *blockPool) sendRequest(height int64) error {
continue continue
} }
pool.logger.Debug("assign request to peer", "peer", peer.id, "height", height) pool.logger.Debug("assign request to peer", "peer", peer.id, "height", height)
_ = pool.toBcR.sendBlockRequest(peer.id, height) if err := pool.toBcR.sendBlockRequest(peer.id, height); err == errNilPeerForBlockRequest {
pool.removePeer(peer.id, err)
pool.toBcR.sendPeerError(err, peer.id)
}
pool.blocks[height] = peer.id pool.blocks[height] = peer.id
pool.numPending++ pool.numPending++

View File

@ -340,8 +340,7 @@ func TestBlockPoolSendRequestBatch(t *testing.T) {
pool := tt.pool pool := tt.pool
maxRequestsPerPeer = int32(tt.maxRequestsPerPeer) maxRequestsPerPeer = int32(tt.maxRequestsPerPeer)
err := pool.makeNextRequests(10) pool.makeNextRequests(10)
assert.Nil(t, err)
assert.Equal(t, tt.expNumPending, pool.numPending) assert.Equal(t, tt.expNumPending, pool.numPending)
assert.Equal(t, testResults.numRequestsSent, maxRequestsPerPeer*int32(len(pool.peers))) assert.Equal(t, testResults.numRequestsSent, maxRequestsPerPeer*int32(len(pool.peers)))

View File

@ -249,6 +249,7 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)
event: blockResponseEv, event: blockResponseEv,
data: bReactorEventData{ data: bReactorEventData{
peerId: src.ID(), peerId: src.ID(),
height: msg.Block.Height,
block: msg.Block, block: msg.Block,
length: len(msgBytes), length: len(msgBytes),
}, },
@ -299,9 +300,13 @@ ForLoop:
case doSendCh <- struct{}{}: case doSendCh <- struct{}{}:
default: default:
} }
//continue ForLoop
case <-doSendCh: case <-doSendCh:
// Tell FSM to make more requests.
// The maxNumPendingRequests may be changed based on low/ high watermark thresholds for
// - the number of blocks received and waiting to be processed,
// - the number of blockResponse messages waiting in messagesForFSMCh, etc.
// Currently maxNumPendingRequests value is not changed.
msgData := bReactorMessageData{ msgData := bReactorMessageData{
event: makeRequestsEv, event: makeRequestsEv,
data: bReactorEventData{ data: bReactorEventData{
@ -334,7 +339,6 @@ ForLoop:
case doProcessCh <- struct{}{}: case doProcessCh <- struct{}{}:
default: default:
} }
//continue FOR_LOOP
case <-doProcessCh: case <-doProcessCh:
err := bcR.processBlocksFromPoolRoutine() err := bcR.processBlocksFromPoolRoutine()
@ -364,7 +368,6 @@ ForLoop:
"max_peer_height", bcR.fsm.pool.getMaxPeerHeight(), "blocks/s", lastRate) "max_peer_height", bcR.fsm.pool.getMaxPeerHeight(), "blocks/s", lastRate)
lastHundred = time.Now() lastHundred = time.Now()
} }
//continue ForLoop
case msg := <-bcR.messagesForFSMCh: case msg := <-bcR.messagesForFSMCh:
_ = sendMessageToFSMSync(bcR.fsm, msg) _ = sendMessageToFSMSync(bcR.fsm, msg)
@ -448,7 +451,6 @@ func (bcR *BlockchainReactor) sendStatusRequest() {
// BlockRequest sends `BlockRequest` height. // BlockRequest sends `BlockRequest` height.
func (bcR *BlockchainReactor) sendBlockRequest(peerID p2p.ID, height int64) error { func (bcR *BlockchainReactor) sendBlockRequest(peerID p2p.ID, height int64) error {
peer := bcR.Switch.Peers().Get(peerID) peer := bcR.Switch.Peers().Get(peerID)
if peer == nil { if peer == nil {
return errNilPeerForBlockRequest return errNilPeerForBlockRequest
} }

View File

@ -157,7 +157,7 @@ var (
errSendQueueFull = errors.New("block request not made, send-queue is full") errSendQueueFull = errors.New("block request not made, send-queue is full")
errPeerTooShort = errors.New("peer height too low, old peer removed/ new peer not added") errPeerTooShort = errors.New("peer height too low, old peer removed/ new peer not added")
errSlowPeer = errors.New("peer is not sending us data fast enough") errSlowPeer = errors.New("peer is not sending us data fast enough")
errNoPeerFoundForHeight = errors.New("could not found peer for block request") errNoPeerFoundForHeight = errors.New("could not find peer for block request")
) )
func init() { func init() {
@ -239,10 +239,8 @@ func init() {
// A block was received that was unsolicited, from unexpected peer, or that we already have it. // A block was received that was unsolicited, from unexpected peer, or that we already have it.
// Ignore block, remove peer and send error to switch. // Ignore block, remove peer and send error to switch.
fsm.pool.removePeer(data.peerId, err) fsm.pool.removePeer(data.peerId, err)
if err != nil {
fsm.toBcR.sendPeerError(err, data.peerId) fsm.toBcR.sendPeerError(err, data.peerId)
} }
}
return waitForBlock, err return waitForBlock, err
@ -255,6 +253,7 @@ func init() {
fsm.toBcR.sendPeerError(data.err, first.peer.id) fsm.toBcR.sendPeerError(data.err, first.peer.id)
fsm.logger.Error("send peer error for", "peer", second.peer.id) fsm.logger.Error("send peer error for", "peer", second.peer.id)
fsm.toBcR.sendPeerError(data.err, second.peer.id) fsm.toBcR.sendPeerError(data.err, second.peer.id)
// Remove the first two blocks. This will also remove the peers
fsm.pool.invalidateFirstTwoBlocks(data.err) fsm.pool.invalidateFirstTwoBlocks(data.err)
} else { } else {
fsm.pool.processedCurrentHeightBlock() fsm.pool.processedCurrentHeightBlock()
@ -272,8 +271,8 @@ func init() {
return waitForBlock, nil return waitForBlock, nil
case makeRequestsEv: case makeRequestsEv:
err := fsm.makeNextRequests(data.maxNumRequests) fsm.makeNextRequests(data.maxNumRequests)
return waitForBlock, err return waitForBlock, nil
case stopFSMEv: case stopFSMEv:
return finished, errNoErrorFinished return finished, errNoErrorFinished
@ -399,6 +398,6 @@ func (fsm *bReactorFSM) cleanup() {
// TODO // TODO
} }
func (fsm *bReactorFSM) makeNextRequests(maxNumPendingRequests int32) error { func (fsm *bReactorFSM) makeNextRequests(maxNumPendingRequests int32) {
return fsm.pool.makeNextRequests(maxNumPendingRequests) fsm.pool.makeNextRequests(maxNumPendingRequests)
} }

View File

@ -1,6 +1,7 @@
package blockchain_new package blockchain_new
import ( import (
"fmt"
"testing" "testing"
"time" "time"
@ -8,8 +9,19 @@ import (
cmn "github.com/tendermint/tendermint/libs/common" cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/types"
) )
// reactor for FSM testing
type testReactor struct {
logger log.Logger
fsm *bReactorFSM
}
func sendEventToFSM(fsm *bReactorFSM, ev bReactorEvent, data bReactorEventData) error {
return fsm.handle(&bReactorMessageData{event: ev, data: data})
}
var ( var (
failSendStatusRequest bool failSendStatusRequest bool
failSendBlockRequest bool failSendBlockRequest bool
@ -46,52 +58,168 @@ func resetTestValues() {
} }
type fsmStepTestValues struct { type fsmStepTestValues struct {
startingHeight int64
currentState string currentState string
event bReactorEvent event bReactorEvent
data bReactorEventData
errWanted error
expectedState string expectedState string
// input
failStatusReq bool failStatusReq bool
shouldSendStatusReq bool shouldSendStatusReq bool
failBlockReq bool failBlockReq bool
blockReqIncreased bool blockReqIncreased bool
blocksAdded []int64
data bReactorEventData
expectedLastBlockReq *lastBlockRequestT expectedLastBlockReq *lastBlockRequestT
} }
func newTestReactor() *testReactor { var (
unknown_StartFSMEv_WantWaitForPeer = fsmStepTestValues{
currentState: "unknown", event: startFSMEv,
expectedState: "waitForPeer",
shouldSendStatusReq: true}
waitForBlock_ProcessedBlockEv_WantWaitForBlock = fsmStepTestValues{
currentState: "waitForBlock", event: processedBlockEv,
data: bReactorEventData{err: nil},
expectedState: "waitForBlock",
}
waitForBlock_ProcessedBlockEv_Err_WantWaitForBlock = fsmStepTestValues{
currentState: "waitForBlock", event: processedBlockEv,
data: bReactorEventData{err: errBlockVerificationFailure},
expectedState: "waitForBlock",
errWanted: errBlockVerificationFailure,
}
waitForBlock_ProcessedBlockEv_WantFinished = fsmStepTestValues{
currentState: "waitForBlock", event: processedBlockEv,
data: bReactorEventData{err: nil},
expectedState: "finished",
}
)
func waitForBlock_MakeRequestEv_WantWaitForBlock(maxPendingRequests int32) fsmStepTestValues {
return fsmStepTestValues{
currentState: "waitForBlock", event: makeRequestsEv,
data: bReactorEventData{maxNumRequests: maxPendingRequests},
expectedState: "waitForBlock",
blockReqIncreased: true,
}
}
func waitForPeer_StatusEv_WantWaitForBlock(peerID p2p.ID, height int64) fsmStepTestValues {
return fsmStepTestValues{
currentState: "waitForPeer", event: statusResponseEv,
data: bReactorEventData{peerId: peerID, height: height},
expectedState: "waitForBlock"}
}
func waitForBlock_StatusEv_WantWaitForBlock(peerID p2p.ID, height int64) fsmStepTestValues {
return fsmStepTestValues{
currentState: "waitForBlock", event: statusResponseEv,
data: bReactorEventData{peerId: peerID, height: height},
expectedState: "waitForBlock"}
}
func waitForBlock_BlockRespEv_WantWaitForBlock(peerID p2p.ID, height int64, prevBlocks []int64) fsmStepTestValues {
txs := []types.Tx{types.Tx("foo"), types.Tx("bar")}
return fsmStepTestValues{
currentState: "waitForBlock", event: blockResponseEv,
data: bReactorEventData{
peerId: peerID,
height: height,
block: types.MakeBlock(int64(height), txs, nil, nil)},
expectedState: "waitForBlock",
blocksAdded: append(prevBlocks, height),
}
}
func newTestReactor(height int64) *testReactor {
testBcR := &testReactor{logger: log.TestingLogger()} testBcR := &testReactor{logger: log.TestingLogger()}
testBcR.fsm = NewFSM(1, testBcR) testBcR.fsm = NewFSM(height, testBcR)
testBcR.fsm.setLogger(testBcR.logger) testBcR.fsm.setLogger(testBcR.logger)
return testBcR return testBcR
} }
// WIP func fixBlockResponseEvStep(step *fsmStepTestValues, testBcR *testReactor) {
func TestFSMTransitionSequences(t *testing.T) { // There is no good way to know to which peer a block request was sent.
maxRequestsPerPeer = 2 // So before we simulate a block response we cheat and look where it is expected from.
fsmTransitionSequenceTests := [][]fsmStepTestValues{ if step.event == blockResponseEv {
height := step.data.height
peerID, ok := testBcR.fsm.pool.blocks[height]
if ok {
step.data.peerId = peerID
}
}
}
func TestFSMBasic(t *testing.T) {
tests := []struct {
name string
startingHeight int64
maxRequestsPerPeer int32
steps []fsmStepTestValues
}{
{ {
{currentState: "unknown", event: startFSMEv, shouldSendStatusReq: true, name: "one block, one peer",
expectedState: "waitForPeer"}, startingHeight: 1,
{currentState: "waitForPeer", event: statusResponseEv, maxRequestsPerPeer: 2,
data: bReactorEventData{peerId: "P1", height: 10}, steps: []fsmStepTestValues{
expectedState: "waitForBlock"}, // startFSMEv
{currentState: "waitForBlock", event: makeRequestsEv, unknown_StartFSMEv_WantWaitForPeer,
data: bReactorEventData{maxNumRequests: maxNumPendingRequests}, // statusResponseEv
blockReqIncreased: true, waitForPeer_StatusEv_WantWaitForBlock("P1", 2),
expectedState: "waitForBlock"}, // makeRequestEv
waitForBlock_MakeRequestEv_WantWaitForBlock(maxNumPendingRequests),
// blockResponseEv for height 1
waitForBlock_BlockRespEv_WantWaitForBlock("P1", 1, []int64{}),
// blockResponseEv for height 2
waitForBlock_BlockRespEv_WantWaitForBlock("P1", 2, []int64{1}),
// processedBlockEv
waitForBlock_ProcessedBlockEv_WantFinished,
},
},
{
name: "multi block, multi peer",
startingHeight: 1,
maxRequestsPerPeer: 2,
steps: []fsmStepTestValues{
// startFSMEv
unknown_StartFSMEv_WantWaitForPeer,
// statusResponseEv from P1
waitForPeer_StatusEv_WantWaitForBlock("P1", 4),
// statusResponseEv from P2
waitForBlock_StatusEv_WantWaitForBlock("P2", 4),
// makeRequestEv
waitForBlock_MakeRequestEv_WantWaitForBlock(maxNumPendingRequests),
// blockResponseEv for height 1
waitForBlock_BlockRespEv_WantWaitForBlock("P1", 1, []int64{}),
// blockResponseEv for height 2
waitForBlock_BlockRespEv_WantWaitForBlock("P1", 2, []int64{1}),
// blockResponseEv for height 3
waitForBlock_BlockRespEv_WantWaitForBlock("P2", 3, []int64{1, 2}),
// blockResponseEv for height 4
waitForBlock_BlockRespEv_WantWaitForBlock("P2", 4, []int64{1, 2, 3}),
// processedBlockEv
waitForBlock_ProcessedBlockEv_WantWaitForBlock,
waitForBlock_ProcessedBlockEv_WantWaitForBlock,
waitForBlock_ProcessedBlockEv_WantFinished,
},
}, },
} }
for _, tt := range fsmTransitionSequenceTests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Create test reactor // Create test reactor
testBcR := newTestReactor() testBcR := newTestReactor(tt.startingHeight)
resetTestValues() resetTestValues()
for _, step := range tt { if tt.maxRequestsPerPeer != 0 {
maxRequestsPerPeer = tt.maxRequestsPerPeer
}
for _, step := range tt.steps {
assert.Equal(t, step.currentState, testBcR.fsm.state.name) assert.Equal(t, step.currentState, testBcR.fsm.state.name)
failSendStatusRequest = step.failStatusReq failSendStatusRequest = step.failStatusReq
failSendBlockRequest = step.failBlockReq failSendBlockRequest = step.failBlockReq
@ -99,7 +227,9 @@ func TestFSMTransitionSequences(t *testing.T) {
oldNumStatusRequests := numStatusRequests oldNumStatusRequests := numStatusRequests
oldNumBlockRequests := numBlockRequests oldNumBlockRequests := numBlockRequests
_ = sendEventToFSM(testBcR.fsm, step.event, step.data) fixBlockResponseEvStep(&step, testBcR)
fsmErr := sendEventToFSM(testBcR.fsm, step.event, step.data)
assert.Equal(t, step.errWanted, fsmErr)
if step.shouldSendStatusReq { if step.shouldSendStatusReq {
assert.Equal(t, oldNumStatusRequests+1, numStatusRequests) assert.Equal(t, oldNumStatusRequests+1, numStatusRequests)
@ -108,44 +238,283 @@ func TestFSMTransitionSequences(t *testing.T) {
} }
if step.blockReqIncreased { if step.blockReqIncreased {
assert.Equal(t, oldNumBlockRequests+maxRequestsPerPeer, numBlockRequests) assert.True(t, oldNumBlockRequests < numBlockRequests)
} else { } else {
assert.Equal(t, oldNumBlockRequests, numBlockRequests) assert.Equal(t, oldNumBlockRequests, numBlockRequests)
} }
for _, height := range step.blocksAdded {
_, err := testBcR.fsm.pool.getBlockAndPeerAtHeight(height)
assert.Nil(t, err)
}
assert.Equal(t, step.expectedState, testBcR.fsm.state.name) assert.Equal(t, step.expectedState, testBcR.fsm.state.name)
} }
})
} }
} }
func TestReactorFSMBasic(t *testing.T) { func TestFSMBlockVerificationFailure(t *testing.T) {
maxRequestsPerPeer = 2 tests := []struct {
name string
startingHeight int64
maxRequestsPerPeer int32
steps []fsmStepTestValues
}{
{
name: "block verification failure",
startingHeight: 1,
maxRequestsPerPeer: 2,
steps: []fsmStepTestValues{
// startFSMEv
unknown_StartFSMEv_WantWaitForPeer,
// statusResponseEv from P1
waitForPeer_StatusEv_WantWaitForBlock("P1", 3),
// statusResponseEv from P2
waitForBlock_StatusEv_WantWaitForBlock("P2", 3),
// makeRequestEv
waitForBlock_MakeRequestEv_WantWaitForBlock(maxNumPendingRequests),
// blockResponseEv for height 1
waitForBlock_BlockRespEv_WantWaitForBlock("P1", 1, []int64{}),
// blockResponseEv for height 2
waitForBlock_BlockRespEv_WantWaitForBlock("P1", 2, []int64{1}),
// blockResponseEv for height 3
waitForBlock_BlockRespEv_WantWaitForBlock("P2", 3, []int64{1, 2}),
// processedBlockEv with Error
waitForBlock_ProcessedBlockEv_Err_WantWaitForBlock,
// makeRequestEv
waitForBlock_MakeRequestEv_WantWaitForBlock(maxNumPendingRequests),
// blockResponseEv for height 1
waitForBlock_BlockRespEv_WantWaitForBlock("P2", 1, []int64{3}),
// blockResponseEv for height 2
waitForBlock_BlockRespEv_WantWaitForBlock("P2", 2, []int64{1, 3}),
waitForBlock_ProcessedBlockEv_WantWaitForBlock,
waitForBlock_ProcessedBlockEv_WantFinished,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Create test reactor // Create test reactor
testBcR := newTestReactor() testBcR := newTestReactor(tt.startingHeight)
resetTestValues() resetTestValues()
fsm := testBcR.fsm
if err := fsm.handle(&bReactorMessageData{event: startFSMEv}); err != nil { if tt.maxRequestsPerPeer != 0 {
maxRequestsPerPeer = tt.maxRequestsPerPeer
} }
// Check that FSM sends a status request message for _, step := range tt.steps {
assert.Equal(t, 1, numStatusRequests) assert.Equal(t, step.currentState, testBcR.fsm.state.name)
assert.Equal(t, waitForPeer.name, fsm.state.name) failSendStatusRequest = step.failStatusReq
failSendBlockRequest = step.failBlockReq
// Send a status response message to FSM oldNumStatusRequests := numStatusRequests
peerID := p2p.ID(cmn.RandStr(12)) oldNumBlockRequests := numBlockRequests
sendStatusResponse2(fsm, peerID, 10)
if err := fsm.handle(&bReactorMessageData{ var heightBefore int64
event: makeRequestsEv, if step.event == processedBlockEv && step.data.err == errBlockVerificationFailure {
data: bReactorEventData{maxNumRequests: maxNumPendingRequests}}); err != nil { heightBefore = testBcR.fsm.pool.height
}
fixBlockResponseEvStep(&step, testBcR)
fsmErr := sendEventToFSM(testBcR.fsm, step.event, step.data)
assert.Equal(t, step.errWanted, fsmErr)
if step.shouldSendStatusReq {
assert.Equal(t, oldNumStatusRequests+1, numStatusRequests)
} else {
assert.Equal(t, oldNumStatusRequests, numStatusRequests)
}
if step.blockReqIncreased {
assert.True(t, oldNumBlockRequests < numBlockRequests)
} else {
assert.Equal(t, oldNumBlockRequests, numBlockRequests)
}
for _, height := range step.blocksAdded {
_, err := testBcR.fsm.pool.getBlockAndPeerAtHeight(height)
assert.Nil(t, err)
}
if step.event == processedBlockEv && step.data.err == errBlockVerificationFailure {
heightAfter := testBcR.fsm.pool.height
assert.Equal(t, heightBefore, heightAfter)
firstAfter, err1 := testBcR.fsm.pool.getBlockAndPeerAtHeight(testBcR.fsm.pool.height)
secondAfter, err2 := testBcR.fsm.pool.getBlockAndPeerAtHeight(testBcR.fsm.pool.height + 1)
assert.NotNil(t, err1)
assert.NotNil(t, err2)
assert.Nil(t, firstAfter.block)
assert.Nil(t, firstAfter.peer)
assert.Nil(t, secondAfter.block)
assert.Nil(t, secondAfter.peer)
}
assert.Equal(t, step.expectedState, testBcR.fsm.state.name)
}
})
}
}
const (
maxStartingHeightTest = 100
maxRequestsPerPeerTest = 40
maxTotalPendingRequestsTest = 600
maxNumPeersTest = 1000
maxNumBlocksInChainTest = 100000
)
type testFields struct {
name string
startingHeight int64
maxRequestsPerPeer int32
maxPendingRequests int32
steps []fsmStepTestValues
}
func makeCorrectTransitionSequence(startingHeight int64, numBlocks int64, numPeers int, randomPeerHeights bool,
maxRequestsPerPeer int32, maxPendingRequests int32) testFields {
// Generate numPeers peers with random or numBlocks heights according to the randomPeerHeights flag
peerHeights := make([]int64, numPeers)
for i := 0; i < numPeers; i++ {
if i == 0 {
peerHeights[0] = numBlocks
continue
}
if randomPeerHeights {
peerHeights[i] = int64(cmn.MaxInt(cmn.RandIntn(int(numBlocks)), int(startingHeight)+1))
} else {
peerHeights[i] = numBlocks
}
}
// Approximate the slice capacity to save time for appends.
testSteps := make([]fsmStepTestValues, 0, 3*numBlocks+int64(numPeers))
testName := fmt.Sprintf("%v-blocks %v-startingHeight %v-peers %v-maxRequestsPerPeer %v-maxNumPendingRequests",
numBlocks, startingHeight, numPeers, maxRequestsPerPeer, maxPendingRequests)
// Add startFSMEv step.
testSteps = append(testSteps, unknown_StartFSMEv_WantWaitForPeer)
// For each peer, add statusResponseEv step.
for i := 0; i < numPeers; i++ {
peerName := fmt.Sprintf("P%d", i)
if i == 0 {
testSteps = append(testSteps, waitForPeer_StatusEv_WantWaitForBlock(p2p.ID(peerName), peerHeights[i]))
} else {
testSteps = append(testSteps, waitForBlock_StatusEv_WantWaitForBlock(p2p.ID(peerName), peerHeights[i]))
}
}
height := startingHeight
numBlocksReceived := 0
prevBlocks := make([]int64, 0, maxPendingRequests)
forLoop:
for i := 0; i < int(numBlocks); i++ {
// Add the makeRequestEv step periodically
if i%int(maxRequestsPerPeer) == 0 {
testSteps = append(testSteps, waitForBlock_MakeRequestEv_WantWaitForBlock(maxPendingRequests))
}
// Add the blockRespEv step
testSteps = append(testSteps, waitForBlock_BlockRespEv_WantWaitForBlock("P0", height, prevBlocks))
prevBlocks = append(prevBlocks, height)
height++
numBlocksReceived++
// Add the processedBlockEv step periodically
if numBlocksReceived >= int(maxRequestsPerPeer) || height >= numBlocks {
for j := int(height) - numBlocksReceived; j < int(height); j++ {
if j >= int(numBlocks)-1 {
// This is the last block that is processed, we should be in "finished" state.
testSteps = append(testSteps, waitForBlock_ProcessedBlockEv_WantFinished)
break forLoop
}
testSteps = append(testSteps, waitForBlock_ProcessedBlockEv_WantWaitForBlock)
}
numBlocksReceived = 0
prevBlocks = make([]int64, 0, maxPendingRequests)
}
}
return testFields{
name: testName,
startingHeight: startingHeight,
maxRequestsPerPeer: maxRequestsPerPeer,
steps: testSteps,
}
}
func makeCorrectTransitionSequenceWithRandomParameters() testFields {
// generate a starting height for fast sync
startingHeight := int64(cmn.RandIntn(maxStartingHeightTest) + 1)
// generate the number of requests per peer
maxRequestsPerPeer := int32(cmn.RandIntn(maxRequestsPerPeerTest) + 1)
// generate the maximum number of total pending requests
maxPendingRequests := int32(cmn.RandIntn(maxTotalPendingRequestsTest) + 1)
// generate the number of blocks to be synced
numBlocks := int64(cmn.RandIntn(maxNumBlocksInChainTest)) + startingHeight
// generate a number of peers and their heights
numPeers := cmn.RandIntn(maxNumPeersTest) + 1
return makeCorrectTransitionSequence(startingHeight, numBlocks, numPeers, true, maxRequestsPerPeer, maxPendingRequests)
}
func TestFSMCorrectTransitionSequences(t *testing.T) {
tests := []testFields{
makeCorrectTransitionSequence(1, 100, 10, true, 10, 40),
makeCorrectTransitionSequenceWithRandomParameters(),
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Create test reactor
testBcR := newTestReactor(tt.startingHeight)
resetTestValues()
if tt.maxRequestsPerPeer != 0 {
maxRequestsPerPeer = tt.maxRequestsPerPeer
}
for _, step := range tt.steps {
assert.Equal(t, step.currentState, testBcR.fsm.state.name)
failSendStatusRequest = step.failStatusReq
failSendBlockRequest = step.failBlockReq
oldNumStatusRequests := numStatusRequests
fixBlockResponseEvStep(&step, testBcR)
fsmErr := sendEventToFSM(testBcR.fsm, step.event, step.data)
assert.Equal(t, step.errWanted, fsmErr)
if step.shouldSendStatusReq {
assert.Equal(t, oldNumStatusRequests+1, numStatusRequests)
} else {
assert.Equal(t, oldNumStatusRequests, numStatusRequests)
}
for _, height := range step.blocksAdded {
_, err := testBcR.fsm.pool.getBlockAndPeerAtHeight(height)
assert.Nil(t, err)
}
assert.Equal(t, step.expectedState, testBcR.fsm.state.name)
}
})
} }
// Check that FSM sends a block request message and...
assert.Equal(t, maxRequestsPerPeer, numBlockRequests)
// ... the block request has the expected height
assert.Equal(t, maxRequestsPerPeer, int32(len(fsm.pool.peers[peerID].blocks)))
assert.Equal(t, waitForBlock.name, fsm.state.name)
} }
func TestReactorFSMPeerTimeout(t *testing.T) { func TestReactorFSMPeerTimeout(t *testing.T) {
@ -153,7 +522,7 @@ func TestReactorFSMPeerTimeout(t *testing.T) {
resetTestValues() resetTestValues()
peerTimeout = 20 * time.Millisecond peerTimeout = 20 * time.Millisecond
// Create and start the FSM // Create and start the FSM
testBcR := newTestReactor() testBcR := newTestReactor(1)
fsm := testBcR.fsm fsm := testBcR.fsm
fsm.start() fsm.start()
@ -181,17 +550,6 @@ func TestReactorFSMPeerTimeout(t *testing.T) {
} }
// reactor for FSM testing
type testReactor struct {
logger log.Logger
fsm *bReactorFSM
testCh chan bReactorMessageData
}
func sendEventToFSM(fsm *bReactorFSM, ev bReactorEvent, data bReactorEventData) error {
return fsm.handle(&bReactorMessageData{event: ev, data: data})
}
// ---------------------------------------- // ----------------------------------------
// implementation for the test reactor APIs // implementation for the test reactor APIs