From bb36184aa2450532070d1e2e0239cdfdfd838ab6 Mon Sep 17 00:00:00 2001 From: Anca Zamfir Date: Mon, 15 Apr 2019 23:26:07 -0400 Subject: [PATCH] cleanup --- blockchain/pool.go | 142 +++++++++++++++++---------------- blockchain/reactor.go | 6 +- blockchain/reactor_fsm.go | 8 +- blockchain/reactor_fsm_test.go | 90 +++++++++------------ blockchain/reactor_test.go | 12 --- blockchain_old/reactor_test.go | 2 +- 6 files changed, 120 insertions(+), 140 deletions(-) diff --git a/blockchain/pool.go b/blockchain/pool.go index afe7422f..7a8411c9 100644 --- a/blockchain/pool.go +++ b/blockchain/pool.go @@ -186,6 +186,78 @@ func (pool *blockPool) removeShortPeers() { } } +func (pool *blockPool) removeBadPeers() { + pool.removeShortPeers() + for _, peer := range pool.peers { + if err := peer.isGood(); err != nil { + pool.removePeer(peer.id, err) + if err == errSlowPeer { + peer.errFunc(errSlowPeer, peer.id) + } + } + } +} + +func (pool *blockPool) makeRequestBatch(maxNumPendingRequests int32) []int { + pool.removeBadPeers() + // If running low on planned requests, make more. + numNeeded := int32(cmn.MinInt(int(maxNumPendingRequests), len(pool.peers)*int(maxRequestsPerPeer))) - pool.numPending + for int32(len(pool.requests)) < numNeeded { + if pool.nextRequestHeight > pool.maxPeerHeight { + break + } + pool.requests[pool.nextRequestHeight] = true + pool.nextRequestHeight++ + } + + heights := make([]int, 0, len(pool.requests)) + for k := range pool.requests { + heights = append(heights, int(k)) + } + sort.Ints(heights) + return heights +} + +func (pool *blockPool) makeNextRequests(maxNumPendingRequests int32) { + heights := pool.makeRequestBatch(maxNumPendingRequests) + pool.logger.Debug("makeNextRequests will make following requests", "number", len(heights), "heights", heights) + + for _, height := range heights { + h := int64(height) + if err := pool.sendRequest(h); err != nil { + // Errors from sendRequest() are handled by this function + return + } + delete(pool.requests, h) + } +} + +func (pool *blockPool) sendRequest(height int64) error { + for _, peer := range pool.peers { + if peer.numPending >= int32(maxRequestsPerPeer) { + continue + } + if peer.height < height { + continue + } + pool.logger.Debug("assign request to peer", "peer", peer.id, "height", height) + if err := pool.toBcR.sendBlockRequest(peer.id, height); err == errNilPeerForBlockRequest { + pool.removePeer(peer.id, err) + pool.toBcR.sendPeerError(err, peer.id) + } + + pool.blocks[height] = peer.id + pool.numPending++ + + peer.blocks[height] = nil + peer.incrPending() + + return nil + } + pool.logger.Error("could not find peer to send request for block at height", "height", height) + return errNoPeerFoundForHeight +} + // Validates that the block comes from the peer it was expected from and stores it in the 'blocks' map. func (pool *blockPool) addBlock(peerID p2p.ID, block *types.Block, blockSize int) error { if _, ok := pool.peers[peerID]; !ok { @@ -267,74 +339,10 @@ func (pool *blockPool) processedCurrentHeightBlock() { pool.removeShortPeers() } -func (pool *blockPool) removeBadPeers() { - pool.removeShortPeers() +func (pool *blockPool) cleanup() { for _, peer := range pool.peers { - if err := peer.isGood(); err != nil { - pool.removePeer(peer.id, err) - if err == errSlowPeer { - peer.errFunc(errSlowPeer, peer.id) - } + if peer.timeout != nil { + peer.timeout.Stop() } } } - -func (pool *blockPool) makeRequestBatch(maxNumPendingRequests int32) []int { - pool.removeBadPeers() - // If running low on planned requests, make more. - numNeeded := int32(cmn.MinInt(int(maxNumPendingRequests), len(pool.peers)*int(maxRequestsPerPeer))) - pool.numPending - for int32(len(pool.requests)) < numNeeded { - if pool.nextRequestHeight > pool.maxPeerHeight { - break - } - pool.requests[pool.nextRequestHeight] = true - pool.nextRequestHeight++ - } - - heights := make([]int, 0, len(pool.requests)) - for k := range pool.requests { - heights = append(heights, int(k)) - } - sort.Ints(heights) - return heights -} - -func (pool *blockPool) makeNextRequests(maxNumPendingRequests int32) { - heights := pool.makeRequestBatch(maxNumPendingRequests) - pool.logger.Debug("makeNextRequests will make following requests", "number", len(heights), "heights", heights) - - for _, height := range heights { - h := int64(height) - if err := pool.sendRequest(h); err != nil { - // Errors from sendRequest() are handled by this function - return - } - delete(pool.requests, h) - } -} - -func (pool *blockPool) sendRequest(height int64) error { - for _, peer := range pool.peers { - if peer.numPending >= int32(maxRequestsPerPeer) { - continue - } - if peer.height < height { - continue - } - pool.logger.Debug("assign request to peer", "peer", peer.id, "height", height) - if err := pool.toBcR.sendBlockRequest(peer.id, height); err == errNilPeerForBlockRequest { - pool.removePeer(peer.id, err) - pool.toBcR.sendPeerError(err, peer.id) - } - - pool.blocks[height] = peer.id - pool.numPending++ - - peer.blocks[height] = nil - peer.incrPending() - - return nil - } - pool.logger.Error("could not find peer to send request for block at height", "height", height) - return errNoPeerFoundForHeight -} diff --git a/blockchain/reactor.go b/blockchain/reactor.go index 2136589d..90ad5ff2 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -39,8 +39,8 @@ const ( ) var ( - maxRequestsPerPeer int32 = 20 - maxNumPendingRequests int32 = 600 + maxRequestsPerPeer int32 = 40 + maxNumPendingRequests int32 = 500 ) type consensusReactor interface { @@ -402,7 +402,7 @@ func (bcR *BlockchainReactor) reportPeerErrorToSwitch(err error, peerID p2p.ID) // - pool calls when it detects slow peer or when peer times out // - FSM calls when: // - processing a block (addBlock) fails -// - BCR process of block reports failure to FSM, FSM sends back the peers of first and second +// - BCR processing of a block reports failure to FSM, FSM sends back the peers of first and second blocks func (bcR *BlockchainReactor) sendPeerError(err error, peerID p2p.ID) { bcR.errorsFromFSMCh <- peerError{err, peerID} } diff --git a/blockchain/reactor_fsm.go b/blockchain/reactor_fsm.go index 0eaf6153..0bb9600e 100644 --- a/blockchain/reactor_fsm.go +++ b/blockchain/reactor_fsm.go @@ -376,10 +376,10 @@ func (fsm *bReactorFSM) isCaughtUp() bool { return isCaughtUp } -func (fsm *bReactorFSM) cleanup() { - // TODO -} - func (fsm *bReactorFSM) makeNextRequests(maxNumPendingRequests int32) { fsm.pool.makeNextRequests(maxNumPendingRequests) } + +func (fsm *bReactorFSM) cleanup() { + fsm.pool.cleanup() +} diff --git a/blockchain/reactor_fsm_test.go b/blockchain/reactor_fsm_test.go index d3b27c70..87747942 100644 --- a/blockchain/reactor_fsm_test.go +++ b/blockchain/reactor_fsm_test.go @@ -26,10 +26,9 @@ func sendEventToFSM(fsm *bReactorFSM, ev bReactorEvent, data bReactorEventData) var ( testFSMmtx sync.Mutex - failSendStatusRequest bool - failSendBlockRequest bool - numStatusRequests int - numBlockRequests int32 + failSendBlockRequest bool + numStatusRequests int + numBlockRequests int32 ) type lastBlockRequestT struct { @@ -51,7 +50,6 @@ var stateTimerStarts map[string]int func resetTestValues() { stateTimerStarts = make(map[string]int) failSendBlockRequest = false - failSendStatusRequest = false numStatusRequests = 0 numBlockRequests = 0 lastBlockRequest.peerID = "" @@ -68,13 +66,11 @@ type fsmStepTestValues struct { errWanted error expectedState string - failStatusReq bool shouldSendStatusReq bool - - failBlockReq bool - blockReqIncreased bool - blocksAdded []int64 - peersNotInPool []p2p.ID + failBlockReq bool + blockReqIncreased bool + blocksAdded []int64 + peersNotInPool []p2p.ID expectedLastBlockReq *lastBlockRequestT } @@ -170,6 +166,20 @@ func fixBlockResponseEvStep(step *fsmStepTestValues, testBcR *testReactor) { } } +func shouldApplyProcessedBlockEvStep(step *fsmStepTestValues, testBcR *testReactor) bool { + if step.event == processedBlockEv { + _, err := testBcR.fsm.pool.getBlockAndPeerAtHeight(testBcR.fsm.pool.height) + if err == errMissingBlocks { + return false + } + _, err = testBcR.fsm.pool.getBlockAndPeerAtHeight(testBcR.fsm.pool.height + 1) + if err == errMissingBlocks { + return false + } + } + return true +} + func TestFSMBasic(t *testing.T) { tests := []struct { name string @@ -237,8 +247,6 @@ func TestFSMBasic(t *testing.T) { for _, step := range tt.steps { assert.Equal(t, step.currentState, testBcR.fsm.state.name) - failSendStatusRequest = step.failStatusReq - failSendBlockRequest = step.failBlockReq oldNumStatusRequests := numStatusRequests oldNumBlockRequests := numBlockRequests @@ -329,8 +337,6 @@ func TestFSMBlockVerificationFailure(t *testing.T) { for _, step := range tt.steps { assert.Equal(t, step.currentState, testBcR.fsm.state.name) - failSendStatusRequest = step.failStatusReq - failSendBlockRequest = step.failBlockReq oldNumStatusRequests := numStatusRequests oldNumBlockRequests := numBlockRequests @@ -418,8 +424,6 @@ func TestFSMPeerRemoveEvent(t *testing.T) { for _, step := range tt.steps { assert.Equal(t, step.currentState, testBcR.fsm.state.name) - failSendStatusRequest = step.failStatusReq - failSendBlockRequest = step.failBlockReq oldNumStatusRequests := numStatusRequests @@ -446,8 +450,8 @@ const ( maxStartingHeightTest = 100 maxRequestsPerPeerTest = 40 maxTotalPendingRequestsTest = 600 - maxNumPeersTest = 1000 - maxNumBlocksInChainTest = 50000 + maxNumPeersTest = 500 + maxNumBlocksInChainTest = 10000 ) type testFields struct { @@ -461,7 +465,7 @@ type testFields struct { func makeCorrectTransitionSequence(startingHeight int64, numBlocks int64, numPeers int, randomPeerHeights bool, maxRequestsPerPeer int32, maxPendingRequests int32) testFields { - // Generate numPeers peers with random or numBlocks heights according to the randomPeerHeights flag + // Generate numPeers peers with random or numBlocks heights according to the randomPeerHeights flag. peerHeights := make([]int64, numPeers) for i := 0; i < numPeers; i++ { if i == 0 { @@ -501,7 +505,7 @@ func makeCorrectTransitionSequence(startingHeight int64, numBlocks int64, numPee forLoop: for i := 0; i < int(numBlocks); i++ { - // Add the makeRequestEv step periodically + // Add the makeRequestEv step periodically. if i%int(maxRequestsPerPeer) == 0 { testSteps = append(testSteps, waitForBlock_MakeRequestEv_WantWaitForBlock(maxPendingRequests)) } @@ -512,10 +516,10 @@ forLoop: height++ numBlocksReceived++ - // Add the processedBlockEv step periodically + // Add the processedBlockEv step periodically. if numBlocksReceived >= int(maxRequestsPerPeer) || height >= numBlocks { for j := int(height) - numBlocksReceived; j < int(height); j++ { - if j >= int(numBlocks)-1 { + if j >= int(numBlocks) { // This is the last block that is processed, we should be in "finished" state. testSteps = append(testSteps, waitForBlock_ProcessedBlockEv_WantFinished) break forLoop @@ -531,24 +535,25 @@ forLoop: name: testName, startingHeight: startingHeight, maxRequestsPerPeer: maxRequestsPerPeer, + maxPendingRequests: maxPendingRequests, steps: testSteps, } } func makeCorrectTransitionSequenceWithRandomParameters() testFields { - // generate a starting height for fast sync + // Generate a starting height for fast sync. startingHeight := int64(cmn.RandIntn(maxStartingHeightTest) + 1) - // generate the number of requests per peer + // Generate the number of requests per peer. maxRequestsPerPeer := int32(cmn.RandIntn(maxRequestsPerPeerTest) + 1) - // generate the maximum number of total pending requests + // Generate the maximum number of total pending requests, >= maxRequestsPerPeer. maxPendingRequests := int32(cmn.RandIntn(maxTotalPendingRequestsTest-int(maxRequestsPerPeer))) + maxRequestsPerPeer - // generate the number of blocks to be synced + // Generate the number of blocks to be synced. numBlocks := int64(cmn.RandIntn(maxNumBlocksInChainTest)) + startingHeight - // generate a number of peers and their heights + // Generate a number of peers. numPeers := cmn.RandIntn(maxNumPeersTest) + 1 return makeCorrectTransitionSequence(startingHeight, numBlocks, numPeers, true, maxRequestsPerPeer, maxPendingRequests) @@ -573,11 +578,12 @@ func TestFSMCorrectTransitionSequences(t *testing.T) { for _, step := range tt.steps { assert.Equal(t, step.currentState, testBcR.fsm.state.name) - failSendStatusRequest = step.failStatusReq - failSendBlockRequest = step.failBlockReq oldNumStatusRequests := numStatusRequests fixBlockResponseEvStep(&step, testBcR) + if !shouldApplyProcessedBlockEvStep(&step, testBcR) { + continue + } fsmErr := sendEventToFSM(testBcR.fsm, step.event, step.data) assert.Equal(t, step.errWanted, fsmErr) @@ -631,6 +637,7 @@ func TestFSMPeerTimeout(t *testing.T) { assert.Equal(t, errNoPeerResponse, lastPeerError.err) peerTimeout = 15 * time.Second + maxRequestsPerPeer = 40 } @@ -685,29 +692,6 @@ func sendStatusResponse(fsm *bReactorFSM, peerID p2p.ID, height int64) { _ = sendMessageToFSMSync(fsm, msgData) } -func sendStatusResponse2(fsm *bReactorFSM, peerID p2p.ID, height int64) { - msgBytes := makeStatusResponseMessage(height) - msgData := &bReactorMessageData{ - event: statusResponseEv, - data: bReactorEventData{ - peerId: peerID, - height: height, - length: len(msgBytes), - }, - } - _ = fsm.handle(msgData) -} - -func sendStateTimeout(fsm *bReactorFSM, name string) { - msgData := &bReactorMessageData{ - event: stateTimeoutEv, - data: bReactorEventData{ - stateName: name, - }, - } - _ = fsm.handle(msgData) -} - // ------------------------------------------------------- // ---------------------------------------------------- diff --git a/blockchain/reactor_test.go b/blockchain/reactor_test.go index dea972b6..bae4cc08 100644 --- a/blockchain/reactor_test.go +++ b/blockchain/reactor_test.go @@ -142,10 +142,6 @@ func (conR *consensusReactorTest) SwitchToConsensus(state sm.State, blocksSynced func TestFastSyncNoBlockResponse(t *testing.T) { - peerTimeout = 15 * time.Second - maxRequestsPerPeer = 20 - maxNumPendingRequests = 100 - config = cfg.ResetTestRoot("blockchain_new_reactor_test") defer os.RemoveAll(config.RootDir) genDoc, privVals := randGenesisDoc(1, false, 30) @@ -213,9 +209,6 @@ func TestFastSyncNoBlockResponse(t *testing.T) { // Alternatively we could actually dial a TCP conn but // that seems extreme. func TestFastSyncBadBlockStopsPeer(t *testing.T) { - - maxRequestsPerPeer = 20 - maxNumPendingRequests = 400 numNodes := 4 maxBlockHeight := int64(148) @@ -349,16 +342,11 @@ func setupReactors( // WIP - used for some scale testing, will remove func TestFastSyncMultiNode(t *testing.T) { - peerTimeout = 15 * time.Second - numNodes := 8 maxHeight := int64(1000) //numNodes := 20 //maxHeight := int64(10000) - maxRequestsPerPeer = 40 - maxNumPendingRequests = 500 - config = cfg.ResetTestRoot("blockchain_reactor_test") genDoc, privVals := randGenesisDoc(1, false, 30) diff --git a/blockchain_old/reactor_test.go b/blockchain_old/reactor_test.go index c4bb79ff..96df2b98 100644 --- a/blockchain_old/reactor_test.go +++ b/blockchain_old/reactor_test.go @@ -315,6 +315,7 @@ func TestFastSyncMultiNode(t *testing.T) { outerFor: for { + time.Sleep(1 * time.Second) i := 0 for i < numNodes { if !reactorPairs[i].reactor.pool.IsCaughtUp() { @@ -326,7 +327,6 @@ outerFor: fmt.Println("SETUP FAST SYNC Duration", time.Since(start)) break outerFor } else { - time.Sleep(1 * time.Second) } }