This commit is contained in:
Anca Zamfir
2019-04-15 23:26:07 -04:00
parent 0541ecb243
commit bb36184aa2
6 changed files with 120 additions and 140 deletions

View File

@ -186,6 +186,78 @@ func (pool *blockPool) removeShortPeers() {
} }
} }
func (pool *blockPool) removeBadPeers() {
pool.removeShortPeers()
for _, peer := range pool.peers {
if err := peer.isGood(); err != nil {
pool.removePeer(peer.id, err)
if err == errSlowPeer {
peer.errFunc(errSlowPeer, peer.id)
}
}
}
}
func (pool *blockPool) makeRequestBatch(maxNumPendingRequests int32) []int {
pool.removeBadPeers()
// If running low on planned requests, make more.
numNeeded := int32(cmn.MinInt(int(maxNumPendingRequests), len(pool.peers)*int(maxRequestsPerPeer))) - pool.numPending
for int32(len(pool.requests)) < numNeeded {
if pool.nextRequestHeight > pool.maxPeerHeight {
break
}
pool.requests[pool.nextRequestHeight] = true
pool.nextRequestHeight++
}
heights := make([]int, 0, len(pool.requests))
for k := range pool.requests {
heights = append(heights, int(k))
}
sort.Ints(heights)
return heights
}
func (pool *blockPool) makeNextRequests(maxNumPendingRequests int32) {
heights := pool.makeRequestBatch(maxNumPendingRequests)
pool.logger.Debug("makeNextRequests will make following requests", "number", len(heights), "heights", heights)
for _, height := range heights {
h := int64(height)
if err := pool.sendRequest(h); err != nil {
// Errors from sendRequest() are handled by this function
return
}
delete(pool.requests, h)
}
}
func (pool *blockPool) sendRequest(height int64) error {
for _, peer := range pool.peers {
if peer.numPending >= int32(maxRequestsPerPeer) {
continue
}
if peer.height < height {
continue
}
pool.logger.Debug("assign request to peer", "peer", peer.id, "height", height)
if err := pool.toBcR.sendBlockRequest(peer.id, height); err == errNilPeerForBlockRequest {
pool.removePeer(peer.id, err)
pool.toBcR.sendPeerError(err, peer.id)
}
pool.blocks[height] = peer.id
pool.numPending++
peer.blocks[height] = nil
peer.incrPending()
return nil
}
pool.logger.Error("could not find peer to send request for block at height", "height", height)
return errNoPeerFoundForHeight
}
// Validates that the block comes from the peer it was expected from and stores it in the 'blocks' map. // Validates that the block comes from the peer it was expected from and stores it in the 'blocks' map.
func (pool *blockPool) addBlock(peerID p2p.ID, block *types.Block, blockSize int) error { func (pool *blockPool) addBlock(peerID p2p.ID, block *types.Block, blockSize int) error {
if _, ok := pool.peers[peerID]; !ok { if _, ok := pool.peers[peerID]; !ok {
@ -267,74 +339,10 @@ func (pool *blockPool) processedCurrentHeightBlock() {
pool.removeShortPeers() pool.removeShortPeers()
} }
func (pool *blockPool) removeBadPeers() { func (pool *blockPool) cleanup() {
pool.removeShortPeers()
for _, peer := range pool.peers { for _, peer := range pool.peers {
if err := peer.isGood(); err != nil { if peer.timeout != nil {
pool.removePeer(peer.id, err) peer.timeout.Stop()
if err == errSlowPeer {
peer.errFunc(errSlowPeer, peer.id)
}
} }
} }
} }
func (pool *blockPool) makeRequestBatch(maxNumPendingRequests int32) []int {
pool.removeBadPeers()
// If running low on planned requests, make more.
numNeeded := int32(cmn.MinInt(int(maxNumPendingRequests), len(pool.peers)*int(maxRequestsPerPeer))) - pool.numPending
for int32(len(pool.requests)) < numNeeded {
if pool.nextRequestHeight > pool.maxPeerHeight {
break
}
pool.requests[pool.nextRequestHeight] = true
pool.nextRequestHeight++
}
heights := make([]int, 0, len(pool.requests))
for k := range pool.requests {
heights = append(heights, int(k))
}
sort.Ints(heights)
return heights
}
func (pool *blockPool) makeNextRequests(maxNumPendingRequests int32) {
heights := pool.makeRequestBatch(maxNumPendingRequests)
pool.logger.Debug("makeNextRequests will make following requests", "number", len(heights), "heights", heights)
for _, height := range heights {
h := int64(height)
if err := pool.sendRequest(h); err != nil {
// Errors from sendRequest() are handled by this function
return
}
delete(pool.requests, h)
}
}
func (pool *blockPool) sendRequest(height int64) error {
for _, peer := range pool.peers {
if peer.numPending >= int32(maxRequestsPerPeer) {
continue
}
if peer.height < height {
continue
}
pool.logger.Debug("assign request to peer", "peer", peer.id, "height", height)
if err := pool.toBcR.sendBlockRequest(peer.id, height); err == errNilPeerForBlockRequest {
pool.removePeer(peer.id, err)
pool.toBcR.sendPeerError(err, peer.id)
}
pool.blocks[height] = peer.id
pool.numPending++
peer.blocks[height] = nil
peer.incrPending()
return nil
}
pool.logger.Error("could not find peer to send request for block at height", "height", height)
return errNoPeerFoundForHeight
}

View File

@ -39,8 +39,8 @@ const (
) )
var ( var (
maxRequestsPerPeer int32 = 20 maxRequestsPerPeer int32 = 40
maxNumPendingRequests int32 = 600 maxNumPendingRequests int32 = 500
) )
type consensusReactor interface { type consensusReactor interface {
@ -402,7 +402,7 @@ func (bcR *BlockchainReactor) reportPeerErrorToSwitch(err error, peerID p2p.ID)
// - pool calls when it detects slow peer or when peer times out // - pool calls when it detects slow peer or when peer times out
// - FSM calls when: // - FSM calls when:
// - processing a block (addBlock) fails // - processing a block (addBlock) fails
// - BCR process of block reports failure to FSM, FSM sends back the peers of first and second // - BCR processing of a block reports failure to FSM, FSM sends back the peers of first and second blocks
func (bcR *BlockchainReactor) sendPeerError(err error, peerID p2p.ID) { func (bcR *BlockchainReactor) sendPeerError(err error, peerID p2p.ID) {
bcR.errorsFromFSMCh <- peerError{err, peerID} bcR.errorsFromFSMCh <- peerError{err, peerID}
} }

View File

@ -376,10 +376,10 @@ func (fsm *bReactorFSM) isCaughtUp() bool {
return isCaughtUp return isCaughtUp
} }
func (fsm *bReactorFSM) cleanup() {
// TODO
}
func (fsm *bReactorFSM) makeNextRequests(maxNumPendingRequests int32) { func (fsm *bReactorFSM) makeNextRequests(maxNumPendingRequests int32) {
fsm.pool.makeNextRequests(maxNumPendingRequests) fsm.pool.makeNextRequests(maxNumPendingRequests)
} }
func (fsm *bReactorFSM) cleanup() {
fsm.pool.cleanup()
}

View File

@ -26,10 +26,9 @@ func sendEventToFSM(fsm *bReactorFSM, ev bReactorEvent, data bReactorEventData)
var ( var (
testFSMmtx sync.Mutex testFSMmtx sync.Mutex
failSendStatusRequest bool failSendBlockRequest bool
failSendBlockRequest bool numStatusRequests int
numStatusRequests int numBlockRequests int32
numBlockRequests int32
) )
type lastBlockRequestT struct { type lastBlockRequestT struct {
@ -51,7 +50,6 @@ var stateTimerStarts map[string]int
func resetTestValues() { func resetTestValues() {
stateTimerStarts = make(map[string]int) stateTimerStarts = make(map[string]int)
failSendBlockRequest = false failSendBlockRequest = false
failSendStatusRequest = false
numStatusRequests = 0 numStatusRequests = 0
numBlockRequests = 0 numBlockRequests = 0
lastBlockRequest.peerID = "" lastBlockRequest.peerID = ""
@ -68,13 +66,11 @@ type fsmStepTestValues struct {
errWanted error errWanted error
expectedState string expectedState string
failStatusReq bool
shouldSendStatusReq bool shouldSendStatusReq bool
failBlockReq bool
failBlockReq bool blockReqIncreased bool
blockReqIncreased bool blocksAdded []int64
blocksAdded []int64 peersNotInPool []p2p.ID
peersNotInPool []p2p.ID
expectedLastBlockReq *lastBlockRequestT expectedLastBlockReq *lastBlockRequestT
} }
@ -170,6 +166,20 @@ func fixBlockResponseEvStep(step *fsmStepTestValues, testBcR *testReactor) {
} }
} }
func shouldApplyProcessedBlockEvStep(step *fsmStepTestValues, testBcR *testReactor) bool {
if step.event == processedBlockEv {
_, err := testBcR.fsm.pool.getBlockAndPeerAtHeight(testBcR.fsm.pool.height)
if err == errMissingBlocks {
return false
}
_, err = testBcR.fsm.pool.getBlockAndPeerAtHeight(testBcR.fsm.pool.height + 1)
if err == errMissingBlocks {
return false
}
}
return true
}
func TestFSMBasic(t *testing.T) { func TestFSMBasic(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
@ -237,8 +247,6 @@ func TestFSMBasic(t *testing.T) {
for _, step := range tt.steps { for _, step := range tt.steps {
assert.Equal(t, step.currentState, testBcR.fsm.state.name) assert.Equal(t, step.currentState, testBcR.fsm.state.name)
failSendStatusRequest = step.failStatusReq
failSendBlockRequest = step.failBlockReq
oldNumStatusRequests := numStatusRequests oldNumStatusRequests := numStatusRequests
oldNumBlockRequests := numBlockRequests oldNumBlockRequests := numBlockRequests
@ -329,8 +337,6 @@ func TestFSMBlockVerificationFailure(t *testing.T) {
for _, step := range tt.steps { for _, step := range tt.steps {
assert.Equal(t, step.currentState, testBcR.fsm.state.name) assert.Equal(t, step.currentState, testBcR.fsm.state.name)
failSendStatusRequest = step.failStatusReq
failSendBlockRequest = step.failBlockReq
oldNumStatusRequests := numStatusRequests oldNumStatusRequests := numStatusRequests
oldNumBlockRequests := numBlockRequests oldNumBlockRequests := numBlockRequests
@ -418,8 +424,6 @@ func TestFSMPeerRemoveEvent(t *testing.T) {
for _, step := range tt.steps { for _, step := range tt.steps {
assert.Equal(t, step.currentState, testBcR.fsm.state.name) assert.Equal(t, step.currentState, testBcR.fsm.state.name)
failSendStatusRequest = step.failStatusReq
failSendBlockRequest = step.failBlockReq
oldNumStatusRequests := numStatusRequests oldNumStatusRequests := numStatusRequests
@ -446,8 +450,8 @@ const (
maxStartingHeightTest = 100 maxStartingHeightTest = 100
maxRequestsPerPeerTest = 40 maxRequestsPerPeerTest = 40
maxTotalPendingRequestsTest = 600 maxTotalPendingRequestsTest = 600
maxNumPeersTest = 1000 maxNumPeersTest = 500
maxNumBlocksInChainTest = 50000 maxNumBlocksInChainTest = 10000
) )
type testFields struct { type testFields struct {
@ -461,7 +465,7 @@ type testFields struct {
func makeCorrectTransitionSequence(startingHeight int64, numBlocks int64, numPeers int, randomPeerHeights bool, func makeCorrectTransitionSequence(startingHeight int64, numBlocks int64, numPeers int, randomPeerHeights bool,
maxRequestsPerPeer int32, maxPendingRequests int32) testFields { maxRequestsPerPeer int32, maxPendingRequests int32) testFields {
// Generate numPeers peers with random or numBlocks heights according to the randomPeerHeights flag // Generate numPeers peers with random or numBlocks heights according to the randomPeerHeights flag.
peerHeights := make([]int64, numPeers) peerHeights := make([]int64, numPeers)
for i := 0; i < numPeers; i++ { for i := 0; i < numPeers; i++ {
if i == 0 { if i == 0 {
@ -501,7 +505,7 @@ func makeCorrectTransitionSequence(startingHeight int64, numBlocks int64, numPee
forLoop: forLoop:
for i := 0; i < int(numBlocks); i++ { for i := 0; i < int(numBlocks); i++ {
// Add the makeRequestEv step periodically // Add the makeRequestEv step periodically.
if i%int(maxRequestsPerPeer) == 0 { if i%int(maxRequestsPerPeer) == 0 {
testSteps = append(testSteps, waitForBlock_MakeRequestEv_WantWaitForBlock(maxPendingRequests)) testSteps = append(testSteps, waitForBlock_MakeRequestEv_WantWaitForBlock(maxPendingRequests))
} }
@ -512,10 +516,10 @@ forLoop:
height++ height++
numBlocksReceived++ numBlocksReceived++
// Add the processedBlockEv step periodically // Add the processedBlockEv step periodically.
if numBlocksReceived >= int(maxRequestsPerPeer) || height >= numBlocks { if numBlocksReceived >= int(maxRequestsPerPeer) || height >= numBlocks {
for j := int(height) - numBlocksReceived; j < int(height); j++ { for j := int(height) - numBlocksReceived; j < int(height); j++ {
if j >= int(numBlocks)-1 { if j >= int(numBlocks) {
// This is the last block that is processed, we should be in "finished" state. // This is the last block that is processed, we should be in "finished" state.
testSteps = append(testSteps, waitForBlock_ProcessedBlockEv_WantFinished) testSteps = append(testSteps, waitForBlock_ProcessedBlockEv_WantFinished)
break forLoop break forLoop
@ -531,24 +535,25 @@ forLoop:
name: testName, name: testName,
startingHeight: startingHeight, startingHeight: startingHeight,
maxRequestsPerPeer: maxRequestsPerPeer, maxRequestsPerPeer: maxRequestsPerPeer,
maxPendingRequests: maxPendingRequests,
steps: testSteps, steps: testSteps,
} }
} }
func makeCorrectTransitionSequenceWithRandomParameters() testFields { func makeCorrectTransitionSequenceWithRandomParameters() testFields {
// generate a starting height for fast sync // Generate a starting height for fast sync.
startingHeight := int64(cmn.RandIntn(maxStartingHeightTest) + 1) startingHeight := int64(cmn.RandIntn(maxStartingHeightTest) + 1)
// generate the number of requests per peer // Generate the number of requests per peer.
maxRequestsPerPeer := int32(cmn.RandIntn(maxRequestsPerPeerTest) + 1) maxRequestsPerPeer := int32(cmn.RandIntn(maxRequestsPerPeerTest) + 1)
// generate the maximum number of total pending requests // Generate the maximum number of total pending requests, >= maxRequestsPerPeer.
maxPendingRequests := int32(cmn.RandIntn(maxTotalPendingRequestsTest-int(maxRequestsPerPeer))) + maxRequestsPerPeer maxPendingRequests := int32(cmn.RandIntn(maxTotalPendingRequestsTest-int(maxRequestsPerPeer))) + maxRequestsPerPeer
// generate the number of blocks to be synced // Generate the number of blocks to be synced.
numBlocks := int64(cmn.RandIntn(maxNumBlocksInChainTest)) + startingHeight numBlocks := int64(cmn.RandIntn(maxNumBlocksInChainTest)) + startingHeight
// generate a number of peers and their heights // Generate a number of peers.
numPeers := cmn.RandIntn(maxNumPeersTest) + 1 numPeers := cmn.RandIntn(maxNumPeersTest) + 1
return makeCorrectTransitionSequence(startingHeight, numBlocks, numPeers, true, maxRequestsPerPeer, maxPendingRequests) return makeCorrectTransitionSequence(startingHeight, numBlocks, numPeers, true, maxRequestsPerPeer, maxPendingRequests)
@ -573,11 +578,12 @@ func TestFSMCorrectTransitionSequences(t *testing.T) {
for _, step := range tt.steps { for _, step := range tt.steps {
assert.Equal(t, step.currentState, testBcR.fsm.state.name) assert.Equal(t, step.currentState, testBcR.fsm.state.name)
failSendStatusRequest = step.failStatusReq
failSendBlockRequest = step.failBlockReq
oldNumStatusRequests := numStatusRequests oldNumStatusRequests := numStatusRequests
fixBlockResponseEvStep(&step, testBcR) fixBlockResponseEvStep(&step, testBcR)
if !shouldApplyProcessedBlockEvStep(&step, testBcR) {
continue
}
fsmErr := sendEventToFSM(testBcR.fsm, step.event, step.data) fsmErr := sendEventToFSM(testBcR.fsm, step.event, step.data)
assert.Equal(t, step.errWanted, fsmErr) assert.Equal(t, step.errWanted, fsmErr)
@ -631,6 +637,7 @@ func TestFSMPeerTimeout(t *testing.T) {
assert.Equal(t, errNoPeerResponse, lastPeerError.err) assert.Equal(t, errNoPeerResponse, lastPeerError.err)
peerTimeout = 15 * time.Second peerTimeout = 15 * time.Second
maxRequestsPerPeer = 40
} }
@ -685,29 +692,6 @@ func sendStatusResponse(fsm *bReactorFSM, peerID p2p.ID, height int64) {
_ = sendMessageToFSMSync(fsm, msgData) _ = sendMessageToFSMSync(fsm, msgData)
} }
func sendStatusResponse2(fsm *bReactorFSM, peerID p2p.ID, height int64) {
msgBytes := makeStatusResponseMessage(height)
msgData := &bReactorMessageData{
event: statusResponseEv,
data: bReactorEventData{
peerId: peerID,
height: height,
length: len(msgBytes),
},
}
_ = fsm.handle(msgData)
}
func sendStateTimeout(fsm *bReactorFSM, name string) {
msgData := &bReactorMessageData{
event: stateTimeoutEv,
data: bReactorEventData{
stateName: name,
},
}
_ = fsm.handle(msgData)
}
// ------------------------------------------------------- // -------------------------------------------------------
// ---------------------------------------------------- // ----------------------------------------------------

View File

@ -142,10 +142,6 @@ func (conR *consensusReactorTest) SwitchToConsensus(state sm.State, blocksSynced
func TestFastSyncNoBlockResponse(t *testing.T) { func TestFastSyncNoBlockResponse(t *testing.T) {
peerTimeout = 15 * time.Second
maxRequestsPerPeer = 20
maxNumPendingRequests = 100
config = cfg.ResetTestRoot("blockchain_new_reactor_test") config = cfg.ResetTestRoot("blockchain_new_reactor_test")
defer os.RemoveAll(config.RootDir) defer os.RemoveAll(config.RootDir)
genDoc, privVals := randGenesisDoc(1, false, 30) genDoc, privVals := randGenesisDoc(1, false, 30)
@ -213,9 +209,6 @@ func TestFastSyncNoBlockResponse(t *testing.T) {
// Alternatively we could actually dial a TCP conn but // Alternatively we could actually dial a TCP conn but
// that seems extreme. // that seems extreme.
func TestFastSyncBadBlockStopsPeer(t *testing.T) { func TestFastSyncBadBlockStopsPeer(t *testing.T) {
maxRequestsPerPeer = 20
maxNumPendingRequests = 400
numNodes := 4 numNodes := 4
maxBlockHeight := int64(148) maxBlockHeight := int64(148)
@ -349,16 +342,11 @@ func setupReactors(
// WIP - used for some scale testing, will remove // WIP - used for some scale testing, will remove
func TestFastSyncMultiNode(t *testing.T) { func TestFastSyncMultiNode(t *testing.T) {
peerTimeout = 15 * time.Second
numNodes := 8 numNodes := 8
maxHeight := int64(1000) maxHeight := int64(1000)
//numNodes := 20 //numNodes := 20
//maxHeight := int64(10000) //maxHeight := int64(10000)
maxRequestsPerPeer = 40
maxNumPendingRequests = 500
config = cfg.ResetTestRoot("blockchain_reactor_test") config = cfg.ResetTestRoot("blockchain_reactor_test")
genDoc, privVals := randGenesisDoc(1, false, 30) genDoc, privVals := randGenesisDoc(1, false, 30)

View File

@ -315,6 +315,7 @@ func TestFastSyncMultiNode(t *testing.T) {
outerFor: outerFor:
for { for {
time.Sleep(1 * time.Second)
i := 0 i := 0
for i < numNodes { for i < numNodes {
if !reactorPairs[i].reactor.pool.IsCaughtUp() { if !reactorPairs[i].reactor.pool.IsCaughtUp() {
@ -326,7 +327,6 @@ outerFor:
fmt.Println("SETUP FAST SYNC Duration", time.Since(start)) fmt.Println("SETUP FAST SYNC Duration", time.Since(start))
break outerFor break outerFor
} else { } else {
time.Sleep(1 * time.Second)
} }
} }