From 4fcd7b86acd2f00db8656ecf36bba8e01f2f4108 Mon Sep 17 00:00:00 2001 From: Anca Zamfir Date: Thu, 4 Apr 2019 00:13:32 +0200 Subject: [PATCH] changes to the design added block requests under peer moved the request trigger in the reactor poolRoutine, triggered now by a ticker in general moved everything required for making block requests smarter in the poolRoutine added a simple map of heights to keep track of what will need to be requested next added a few more tests --- blockchain/reactor_test.go | 45 +++- blockchain_new/peer.go | 7 +- blockchain_new/peer_test.go | 2 - blockchain_new/pool.go | 354 +++++++++++++------------ blockchain_new/pool_test.go | 282 +++++++++++--------- blockchain_new/reactor.go | 297 +++++++++++++++------ blockchain_new/reactor_fsm.go | 400 ++++++++++------------------- blockchain_new/reactor_fsm_test.go | 52 ++-- blockchain_new/reactor_test.go | 113 ++++---- 9 files changed, 811 insertions(+), 741 deletions(-) diff --git a/blockchain/reactor_test.go b/blockchain/reactor_test.go index f4682862..c515d65e 100644 --- a/blockchain/reactor_test.go +++ b/blockchain/reactor_test.go @@ -125,7 +125,7 @@ func newBlockchainReactor(logger log.Logger, genDoc *types.GenesisDoc, privVals return BlockchainReactorPair{bcReactor, proxyApp} } -func TestNoBlockResponse(t *testing.T) { +func TestFastSyncNoBlockResponse(t *testing.T) { config = cfg.ResetTestRoot("blockchain_reactor_test") defer os.RemoveAll(config.RootDir) genDoc, privVals := randGenesisDoc(1, false, 30) @@ -185,12 +185,12 @@ func TestNoBlockResponse(t *testing.T) { // or without significant refactoring of the module. // Alternatively we could actually dial a TCP conn but // that seems extreme. -func TestBadBlockStopsPeer(t *testing.T) { +func TestFastSyncBadBlockStopsPeer(t *testing.T) { config = cfg.ResetTestRoot("blockchain_reactor_test") defer os.RemoveAll(config.RootDir) genDoc, privVals := randGenesisDoc(1, false, 30) - maxBlockHeight := int64(500) + maxBlockHeight := int64(148) otherChain := newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight) defer func() { @@ -254,6 +254,8 @@ func TestBadBlockStopsPeer(t *testing.T) { } assert.True(t, lastReactorPair.reactor.Switch.Peers().Size() < len(reactorPairs)-1) + assert.Equal(t, lastReactorPair.reactor.pool.maxPeerHeight, lastReactorPair.reactor.pool.height) + } func setupReactors( @@ -294,9 +296,14 @@ func TestFastSyncMultiNode(t *testing.T) { numNodes := 8 maxHeight := int64(1000) + //numNodes := 20 + //maxHeight := int64(10000) + config = cfg.ResetTestRoot("blockchain_reactor_test") genDoc, privVals := randGenesisDoc(1, false, 30) + start := time.Now() + reactorPairs, switches := setupReactors(numNodes, maxHeight, genDoc, privVals) defer func() { @@ -306,23 +313,30 @@ func TestFastSyncMultiNode(t *testing.T) { } }() +outerFor: for { - if reactorPairs[numNodes-1].reactor.pool.IsCaughtUp() || reactorPairs[numNodes-1].reactor.Switch.Peers().Size() == 0 { - break + i := 0 + for i < numNodes { + if !reactorPairs[i].reactor.pool.IsCaughtUp() { + break + } + i++ + } + if i == numNodes { + fmt.Println("SETUP FAST SYNC Duration", time.Since(start)) + break outerFor + } else { + time.Sleep(1 * time.Second) } - - time.Sleep(1 * time.Second) } //at this time, reactors[0-3] are the newest - assert.Equal(t, numNodes-1, reactorPairs[1].reactor.Switch.Peers().Size()) + assert.Equal(t, numNodes-1, reactorPairs[0].reactor.Switch.Peers().Size()) lastLogger := log.TestingLogger() lastReactorPair := newBlockchainReactor(lastLogger, genDoc, privVals, 0) reactorPairs = append(reactorPairs, lastReactorPair) - start := time.Now() - switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch) *p2p.Switch { s.AddReactor("BLOCKCHAIN", reactorPairs[len(reactorPairs)-1].reactor) return s @@ -333,20 +347,23 @@ func TestFastSyncMultiNode(t *testing.T) { moduleName := fmt.Sprintf("blockchain-%v", addr) lastReactorPair.reactor.SetLogger(lastLogger.With("module", moduleName[:19])) + start = time.Now() + for i := 0; i < len(reactorPairs)-1; i++ { p2p.Connect2Switches(switches, i, len(reactorPairs)-1) } for { - if lastReactorPair.reactor.pool.IsCaughtUp() || lastReactorPair.reactor.Switch.Peers().Size() == 0 { + time.Sleep(1 * time.Second) + if lastReactorPair.reactor.pool.IsCaughtUp() { + fmt.Println("FAST SYNC Duration", time.Since(start)) break } - - time.Sleep(1 * time.Second) } - fmt.Println(time.Since(start)) assert.True(t, lastReactorPair.reactor.Switch.Peers().Size() < len(reactorPairs)) + assert.Equal(t, lastReactorPair.reactor.pool.maxPeerHeight, lastReactorPair.reactor.pool.height) + } //---------------------------------------------- diff --git a/blockchain_new/peer.go b/blockchain_new/peer.go index ac830c44..ff2c3e04 100644 --- a/blockchain_new/peer.go +++ b/blockchain_new/peer.go @@ -8,6 +8,7 @@ import ( flow "github.com/tendermint/tendermint/libs/flowrate" "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/p2p" + "github.com/tendermint/tendermint/types" ) //-------- @@ -32,8 +33,9 @@ type bpPeer struct { id p2p.ID recvMonitor *flow.Monitor - height int64 - numPending int32 + height int64 // the peer reported height + numPending int32 // number of requests pending assignment or block response + blocks map[int64]*types.Block // blocks received or waiting to be received from this peer timeout *time.Timer didTimeout bool @@ -46,6 +48,7 @@ func newBPPeer( peer := &bpPeer{ id: peerID, height: height, + blocks: make(map[int64]*types.Block, maxRequestsPerPeer), numPending: 0, logger: log.NewNopLogger(), errFunc: errFunc, diff --git a/blockchain_new/peer_test.go b/blockchain_new/peer_test.go index f616fcf5..e277887e 100644 --- a/blockchain_new/peer_test.go +++ b/blockchain_new/peer_test.go @@ -43,7 +43,6 @@ func checkByStoppingPeerTimer(t *testing.T, peer *bpPeer, running bool) { } func TestPeerResetMonitor(t *testing.T) { - peer := &bpPeer{ id: p2p.ID(cmn.RandStr(12)), height: 10, @@ -198,7 +197,6 @@ func TestCanBeRemovedDueToLowSpeed(t *testing.T) { } func TestCleanupPeer(t *testing.T) { - var mtx sync.Mutex peer := &bpPeer{ id: p2p.ID(cmn.RandStr(12)), diff --git a/blockchain_new/pool.go b/blockchain_new/pool.go index 8adb2617..65b1ef16 100644 --- a/blockchain_new/pool.go +++ b/blockchain_new/pool.go @@ -2,6 +2,7 @@ package blockchain_new import ( "fmt" + "sort" "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/p2p" @@ -9,8 +10,8 @@ import ( ) type blockData struct { - block *types.Block - peerId p2p.ID + block *types.Block + peer *bpPeer } func (bd *blockData) String() string { @@ -18,26 +19,37 @@ func (bd *blockData) String() string { return fmt.Sprintf("blockData nil") } if bd.block == nil { - return fmt.Sprintf("block: nil peer: %v", bd.peerId) + if bd.peer == nil { + return fmt.Sprintf("block: nil peer: nil") + } + return fmt.Sprintf("block: nil peer: %v", bd.peer.id) } - return fmt.Sprintf("block: %v peer: %v", bd.block.Height, bd.peerId) - + return fmt.Sprintf("block: %v peer: %v", bd.block.Height, bd.peer.id) } type blockPool struct { - logger log.Logger - peers map[p2p.ID]*bpPeer - blocks map[int64]*blockData + logger log.Logger + peers map[p2p.ID]*bpPeer + blocks map[int64]p2p.ID + + requests map[int64]bool // list of blocks to be assigned peers for blockRequest + nextRequestHeight int64 // next request to be added to requests + height int64 // processing height - maxPeerHeight int64 + maxPeerHeight int64 // maximum height of all peers + numPending int32 // total numPending across peers + toBcR bcRMessageInterface } -func newBlockPool(height int64) *blockPool { +func newBlockPool(height int64, toBcR bcRMessageInterface) *blockPool { return &blockPool{ - peers: make(map[p2p.ID]*bpPeer), - maxPeerHeight: 0, - blocks: make(map[int64]*blockData), - height: height, + peers: make(map[p2p.ID]*bpPeer), + maxPeerHeight: 0, + blocks: make(map[int64]p2p.ID), + requests: make(map[int64]bool), + nextRequestHeight: height, + height: height, + toBcR: toBcR, } } @@ -53,19 +65,40 @@ func (pool *blockPool) setLogger(l log.Logger) { pool.logger = l } +// GetStatus returns pool's height, numPending requests and the number of +// requests ready to be send in the future. +func (pool *blockPool) getStatus() (height int64, numPending int32, maxPeerHeight int64) { + return pool.height, pool.numPending, pool.maxPeerHeight +} + func (pool blockPool) getMaxPeerHeight() int64 { return pool.maxPeerHeight } func (pool *blockPool) reachedMaxHeight() bool { - return pool.height >= pool.maxPeerHeight + return pool.maxPeerHeight == 0 || pool.height >= pool.maxPeerHeight +} + +func (pool *blockPool) rescheduleRequest(peerID p2p.ID, height int64) { + pool.requests[height] = true + delete(pool.blocks, height) + delete(pool.peers[peerID].blocks, height) +} + +// Updates the pool's max height. If no peers are left maxPeerHeight is set to 0. +func (pool *blockPool) updateMaxPeerHeight() { + var max int64 + for _, peer := range pool.peers { + if peer.height > max { + max = peer.height + } + } + pool.maxPeerHeight = max } // Adds a new peer or updates an existing peer with a new height. -// If the peer is too short it is removed -// Should change function name?? -func (pool *blockPool) updatePeer(peerID p2p.ID, height int64, errFunc func(err error, peerID p2p.ID)) error { - +// If the peer is too short it is removed. +func (pool *blockPool) updatePeer(peerID p2p.ID, height int64) error { peer := pool.peers[peerID] if height < pool.height { @@ -80,88 +113,69 @@ func (pool *blockPool) updatePeer(peerID p2p.ID, height int64, errFunc func(err } if peer == nil { - peer = newBPPeer(peerID, height, errFunc) + // Add new peer. + peer = newBPPeer(peerID, height, pool.toBcR.sendPeerError) peer.setLogger(pool.logger.With("peer", peerID)) pool.peers[peerID] = peer } else { - // remove any requests made for heights in (height, peer.height] - for blockHeight, bData := range pool.blocks { - if bData.peerId == peerID && blockHeight > height { - delete(pool.blocks, blockHeight) + // Update existing peer. + // Remove any requests made for heights in (height, peer.height]. + for h, block := range pool.peers[peerID].blocks { + if h <= height { + continue } + // Reschedule the requests for all blocks waiting for the peer, or received and not processed yet. + if block == nil { + // Since block was not yet received it is counted in numPending, decrement. + pool.numPending-- + pool.peers[peerID].numPending-- + } + pool.rescheduleRequest(peerID, h) } + peer.height = height } - oldH := peer.height - pool.logger.Info("setting peer height to", "peer", peerID, "height", height) - - peer.height = height - - if oldH == pool.maxPeerHeight && height < pool.maxPeerHeight { - // peer was at max height, update max if not other high peers - pool.updateMaxPeerHeight() - } - - if height > pool.maxPeerHeight { - // peer increased height over maxPeerHeight - pool.maxPeerHeight = height - pool.logger.Info("setting maxPeerHeight", "max", pool.maxPeerHeight) - } + pool.updateMaxPeerHeight() return nil } -// If no peers are left, maxPeerHeight is set to 0. -func (pool *blockPool) updateMaxPeerHeight() { - var max int64 - for _, peer := range pool.peers { - if peer.height > max { - max = peer.height +// Stops the peer timer and deletes the peer. Recomputes the max peer height. +func (pool *blockPool) deletePeer(peerID p2p.ID) { + if p, ok := pool.peers[peerID]; ok { + if p.timeout != nil { + p.timeout.Stop() + } + delete(pool.peers, peerID) + + if p.height == pool.maxPeerHeight { + pool.updateMaxPeerHeight() } } - pool.maxPeerHeight = max } -// Stops the peer timer and deletes the peer. Recomputes the max peer height -func (pool *blockPool) deletePeer(peerID p2p.ID) { - p, exist := pool.peers[peerID] - - if !exist { +// Removes any blocks and requests associated with the peer and deletes the peer. +// Also triggers new requests if blocks have been removed. +func (pool *blockPool) removePeer(peerID p2p.ID, err error) { + peer := pool.peers[peerID] + if peer == nil { return } - - if p.timeout != nil { - p.timeout.Stop() - } - - delete(pool.peers, peerID) - - if p.height == pool.maxPeerHeight { - pool.updateMaxPeerHeight() - } - -} - -// removes any blocks and requests associated with the peer, deletes the peer and informs the switch if needed. -func (pool *blockPool) removePeer(peerID p2p.ID, err error) { - pool.logger.Debug("removePeer", "peer", peerID, "err", err) - // remove all data for blocks waiting for the peer or not processed yet - for h, bData := range pool.blocks { - if bData.peerId == peerID { - if h == pool.height { - } - delete(pool.blocks, h) + // Reschedule the requests for all blocks waiting for the peer, or received and not processed yet. + for h, block := range pool.peers[peerID].blocks { + if block == nil { + pool.numPending-- } + pool.rescheduleRequest(peerID, h) } - // delete peer pool.deletePeer(peerID) + } -// called every time FSM advances its height +// Called every time FSM advances its height. func (pool *blockPool) removeShortPeers() { for _, peer := range pool.peers { if peer.height < pool.height { - pool.logger.Info("removeShortPeers", "peer", peer.id) pool.removePeer(peer.id, nil) } } @@ -169,138 +183,150 @@ func (pool *blockPool) removeShortPeers() { // Validates that the block comes from the peer it was expected from and stores it in the 'blocks' map. func (pool *blockPool) addBlock(peerID p2p.ID, block *types.Block, blockSize int) error { - - blockData := pool.blocks[block.Height] - - if blockData == nil { - pool.logger.Error("peer sent us a block we didn't expect", "peer", peerID, "curHeight", pool.height, "blockHeight", block.Height) + if _, ok := pool.peers[peerID]; !ok { + pool.logger.Error("peer doesn't exist", "peer", peerID, "block_receieved", block.Height) return errBadDataFromPeer } - - if blockData.peerId != peerID { - pool.logger.Error("invalid peer", "peer", peerID, "blockHeight", block.Height) + b, ok := pool.peers[peerID].blocks[block.Height] + if !ok { + pool.logger.Error("peer sent us a block we didn't expect", "peer", peerID, "blockHeight", block.Height) + if expPeerID, pok := pool.blocks[block.Height]; pok { + pool.logger.Error("expected this block from peer", "peer", expPeerID) + } return errBadDataFromPeer } - if blockData.block != nil { + if b != nil { pool.logger.Error("already have a block for height", "height", block.Height) return errBadDataFromPeer } - pool.blocks[block.Height].block = block - peer := pool.peers[peerID] - if peer != nil { - peer.decrPending(blockSize) - } + pool.peers[peerID].blocks[block.Height] = block + pool.blocks[block.Height] = peerID + pool.numPending-- + pool.peers[peerID].decrPending(blockSize) + pool.logger.Debug("added new block", "height", block.Height, "from_peer", peerID, "total", len(pool.blocks)) return nil } +func (pool *blockPool) getBlockAndPeerAtHeight(height int64) (bData *blockData, err error) { + peerID := pool.blocks[height] + peer := pool.peers[peerID] + if peer == nil { + return &blockData{}, errMissingBlocks + } + + block, ok := peer.blocks[height] + if !ok || block == nil { + return &blockData{}, errMissingBlocks + } + + return &blockData{peer: peer, block: block}, nil + +} + func (pool *blockPool) getNextTwoBlocks() (first, second *blockData, err error) { - - var block1, block2 *types.Block - - if first = pool.blocks[pool.height]; first != nil { - block1 = first.block - } - if second = pool.blocks[pool.height+1]; second != nil { - block2 = second.block + first, err = pool.getBlockAndPeerAtHeight(pool.height) + second, err2 := pool.getBlockAndPeerAtHeight(pool.height + 1) + if err == nil { + err = err2 } - if block1 == nil || block2 == nil { + if err == errMissingBlocks { // We need both to sync the first block. - pool.logger.Debug("process blocks doesn't have the blocks", "first", block1, "second", block2) - err = errMissingBlocks + pool.logger.Error("missing first two blocks from height", "height", pool.height) } return } -// remove peers that sent us the first two blocks, blocks will also be removed by removePeer() +// Remove peers that sent us the first two blocks, blocks will also be removed by removePeer(). func (pool *blockPool) invalidateFirstTwoBlocks(err error) { - if first, ok := pool.blocks[pool.height]; ok { - pool.removePeer(first.peerId, err) + first, err1 := pool.getBlockAndPeerAtHeight(pool.height) + second, err2 := pool.getBlockAndPeerAtHeight(pool.height + 1) + + if err1 == nil { + pool.removePeer(first.peer.id, err) } - if second, ok := pool.blocks[pool.height+1]; ok { - pool.removePeer(second.peerId, err) + if err2 == nil { + pool.removePeer(second.peer.id, err) } } func (pool *blockPool) processedCurrentHeightBlock() { + peerID := pool.blocks[pool.height] + delete(pool.peers[peerID].blocks, pool.height) delete(pool.blocks, pool.height) pool.height++ pool.removeShortPeers() } -// WIP -// TODO - pace the requests to peers -func (pool *blockPool) sendRequestBatch(sendFunc func(peerID p2p.ID, height int64) error) error { - if len(pool.blocks) > 30 { - return nil +func (pool *blockPool) makeRequestBatch() []int { + pool.removeUselessPeers() + // If running low on planned requests, make more. + for height := pool.nextRequestHeight; pool.numPending < int32(maxNumPendingRequests); height++ { + if pool.nextRequestHeight > pool.maxPeerHeight { + break + } + pool.requests[height] = true + pool.nextRequestHeight++ } - // remove slow and timed out peers + + heights := make([]int, 0, len(pool.requests)) + for k := range pool.requests { + heights = append(heights, int(k)) + } + sort.Ints(heights) + return heights +} + +func (pool *blockPool) removeUselessPeers() { + pool.removeShortPeers() for _, peer := range pool.peers { if err := peer.isGood(); err != nil { - pool.logger.Info("Removing bad peer", "peer", peer.id, "err", err) pool.removePeer(peer.id, err) if err == errSlowPeer { peer.errFunc(errSlowPeer, peer.id) } } } - - var err error - // make requests - for i := 0; i < maxRequestBatchSize; i++ { - // request height - height := pool.height + int64(i) - if height > pool.maxPeerHeight { - pool.logger.Debug("Will not send request for", "height", height, "max", pool.maxPeerHeight) - return err - } - req := pool.blocks[height] - if req == nil { - // make new request - peerId, err := pool.getBestPeer(height) - if err != nil { - // couldn't find a good peer or couldn't communicate with it - continue - } - - pool.logger.Debug("Try to send request to peer", "peer", peerId, "height", height) - err = sendFunc(peerId, height) - if err == errSendQueueFull { - pool.logger.Error("cannot send request, queue full", "peer", peerId, "height", height) - continue - } - if err == errNilPeerForBlockRequest { - // this peer does not exist in the switch, delete locally - pool.logger.Error("peer doesn't exist in the switch", "peer", peerId) - pool.removePeer(peerId, errNilPeerForBlockRequest) - continue - } - - pool.logger.Debug("Sent request to peer", "peer", peerId, "height", height) - } - } - return nil } -func (pool *blockPool) getBestPeer(height int64) (peerId p2p.ID, err error) { - // make requests - // TODO - sort peers in order of goodness - pool.logger.Debug("try to find peer for", "height", height) +func (pool *blockPool) makeNextRequests(maxNumPendingRequests int32) (err error) { + pool.removeUselessPeers() + heights := pool.makeRequestBatch() + + for _, height := range heights { + h := int64(height) + if pool.numPending >= int32(len(pool.peers))*maxRequestsPerPeer { + break + } + if err = pool.sendRequest(h); err == errNoPeerFoundForHeight { + break + } + delete(pool.requests, h) + } + + return err +} + +func (pool *blockPool) sendRequest(height int64) error { for _, peer := range pool.peers { - // Send Block Request message to peer + if peer.numPending >= int32(maxRequestsPerPeer) { + continue + } if peer.height < height { continue } - if peer.numPending > int32(maxRequestBatchSize/len(pool.peers)) { - continue - } - // reserve space for block - pool.blocks[height] = &blockData{peerId: peer.id, block: nil} - pool.peers[peer.id].incrPending() - return peer.id, nil - } + pool.logger.Debug("assign request to peer", "peer", peer.id, "height", height) + _ = pool.toBcR.sendBlockRequest(peer.id, height) - pool.logger.Debug("List of peers", "peers", pool.peers) - return "", errNoPeerFoundForRequest + pool.blocks[height] = peer.id + pool.numPending++ + + peer.blocks[height] = nil + peer.incrPending() + + return nil + } + pool.logger.Error("could not find peer to send request for block at height", "height", height) + return errNoPeerFoundForHeight } diff --git a/blockchain_new/pool_test.go b/blockchain_new/pool_test.go index 67f8ce7f..9b9b4491 100644 --- a/blockchain_new/pool_test.go +++ b/blockchain_new/pool_test.go @@ -1,9 +1,11 @@ package blockchain_new import ( - "github.com/stretchr/testify/assert" "reflect" "testing" + "time" + + "github.com/stretchr/testify/assert" "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/p2p" @@ -13,7 +15,7 @@ import ( type fields struct { logger log.Logger peers map[p2p.ID]*bpPeer - blocks map[int64]*blockData + blocks map[int64]p2p.ID height int64 maxPeerHeight int64 } @@ -23,15 +25,55 @@ type testPeer struct { height int64 } -func testErrFunc(err error, peerID p2p.ID) { +type testPeerResult struct { + id p2p.ID + height int64 + numPending int32 + blocks map[int64]*types.Block } -func makeUpdateFields(log log.Logger, height int64, peers []testPeer, generateBlocks bool) fields { - ufields := fields{ - logger: log, +type testBcR struct { + logger log.Logger +} + +type testValues struct { + numRequestsSent int32 +} + +var testResults testValues + +func resetPoolTestResults() { + testResults.numRequestsSent = 0 +} + +func (testR *testBcR) sendPeerError(err error, peerID p2p.ID) { +} + +func (testR *testBcR) sendStatusRequest() { +} + +func (testR *testBcR) sendBlockRequest(peerID p2p.ID, height int64) error { + testResults.numRequestsSent++ + return nil +} + +func (testR *testBcR) resetStateTimer(name string, timer *time.Timer, timeout time.Duration, f func()) { +} + +func (testR *testBcR) switchToConsensus() { +} + +func newTestBcR() *testBcR { + testBcR := &testBcR{logger: log.TestingLogger()} + return testBcR +} + +func makeUpdateFields(bcr *testBcR, height int64, peers []bpPeer, generateBlocks bool) fields { + uFields := fields{ + logger: bcr.logger, height: height, peers: make(map[p2p.ID]*bpPeer), - blocks: make(map[int64]*blockData), + blocks: make(map[int64]p2p.ID), } var maxH int64 @@ -39,31 +81,25 @@ func makeUpdateFields(log log.Logger, height int64, peers []testPeer, generateBl if p.height > maxH { maxH = p.height } - ufields.peers[p.id] = newBPPeer(p.id, p.height, testErrFunc) + uFields.peers[p.id] = newBPPeer(p.id, p.height, bcr.sendPeerError) + uFields.peers[p.id].setLogger(bcr.logger) + } - ufields.maxPeerHeight = maxH - return ufields + uFields.maxPeerHeight = maxH + return uFields } func poolCopy(pool *blockPool) *blockPool { return &blockPool{ peers: peersCopy(pool.peers), logger: pool.logger, - blocks: blocksCopy(pool.blocks), + blocks: pool.blocks, height: pool.height, maxPeerHeight: pool.maxPeerHeight, + toBcR: pool.toBcR, } } -func blocksCopy(blocks map[int64]*blockData) map[int64]*blockData { - blockCopy := make(map[int64]*blockData) - for _, b := range blocks { - blockCopy[b.block.Height] = &blockData{peerId: b.peerId, block: b.block} - - } - return blockCopy -} - func peersCopy(peers map[p2p.ID]*bpPeer) map[p2p.ID]*bpPeer { peerCopy := make(map[p2p.ID]*bpPeer) for _, p := range peers { @@ -72,18 +108,13 @@ func peersCopy(peers map[p2p.ID]*bpPeer) map[p2p.ID]*bpPeer { return peerCopy } -func TestBlockPoolUpdatePeer(t *testing.T) { - l := log.TestingLogger() - type args struct { - peerID p2p.ID - height int64 - errFunc func(err error, peerID p2p.ID) - } +func TestBlockPoolUpdateEmptyPeer(t *testing.T) { + testBcR := newTestBcR() tests := []struct { name string fields fields - args args + args testPeer errWanted error addWanted bool delWanted bool @@ -92,34 +123,34 @@ func TestBlockPoolUpdatePeer(t *testing.T) { { name: "add a first short peer", - fields: makeUpdateFields(l, 100, []testPeer{}, false), - args: args{"P1", 50, func(err error, peerId p2p.ID) {}}, + fields: makeUpdateFields(testBcR, 100, []bpPeer{}, false), + args: testPeer{"P1", 50}, errWanted: errPeerTooShort, maxHeightWanted: int64(0), }, { name: "add a first good peer", - fields: makeUpdateFields(l, 100, []testPeer{}, false), - args: args{"P1", 101, func(err error, peerId p2p.ID) {}}, + fields: makeUpdateFields(testBcR, 100, []bpPeer{}, false), + args: testPeer{"P1", 101}, addWanted: true, maxHeightWanted: int64(101), }, { name: "increase the height of P1 from 120 to 123", - fields: makeUpdateFields(l, 100, []testPeer{{"P1", 120}}, false), - args: args{"P1", 123, func(err error, peerId p2p.ID) {}}, + fields: makeUpdateFields(testBcR, 100, []bpPeer{{id: "P1", height: 120}}, false), + args: testPeer{"P1", 123}, maxHeightWanted: int64(123), }, { name: "decrease the height of P1 from 120 to 110", - fields: makeUpdateFields(l, 100, []testPeer{{"P1", 120}}, false), - args: args{"P1", 110, func(err error, peerId p2p.ID) {}}, + fields: makeUpdateFields(testBcR, 100, []bpPeer{{id: "P1", height: 120}}, false), + args: testPeer{"P1", 110}, maxHeightWanted: int64(110), }, { name: "decrease the height of P1 from 120 to 90", - fields: makeUpdateFields(l, 100, []testPeer{{"P1", 120}}, false), - args: args{"P1", 90, func(err error, peerId p2p.ID) {}}, + fields: makeUpdateFields(testBcR, 100, []bpPeer{{id: "P1", height: 120}}, false), + args: testPeer{"P1", 90}, delWanted: true, errWanted: errPeerTooShort, maxHeightWanted: int64(0), @@ -134,10 +165,11 @@ func TestBlockPoolUpdatePeer(t *testing.T) { blocks: tt.fields.blocks, height: tt.fields.height, maxPeerHeight: tt.fields.maxPeerHeight, + toBcR: testBcR, } beforePool := poolCopy(pool) - err := pool.updatePeer(tt.args.peerID, tt.args.height, tt.args.errFunc) + err := pool.updatePeer(tt.args.id, tt.args.height) if err != tt.errWanted { t.Errorf("blockPool.updatePeer() error = %v, wantErr %v", err, tt.errWanted) } @@ -161,21 +193,21 @@ func TestBlockPoolUpdatePeer(t *testing.T) { } // both add and update - assert.Equal(t, pool.peers[tt.args.peerID].height, tt.args.height) + assert.Equal(t, pool.peers[tt.args.id].height, tt.args.height) assert.Equal(t, tt.maxHeightWanted, pool.maxPeerHeight) }) } } -func TestBlockPoolRemovePeer(t *testing.T) { +func TestBlockPoolRemoveEmptyPeer(t *testing.T) { + testBcR := newTestBcR() + type args struct { peerID p2p.ID err error } - l := log.TestingLogger() - tests := []struct { name string fields fields @@ -184,25 +216,25 @@ func TestBlockPoolRemovePeer(t *testing.T) { }{ { name: "attempt to delete non-existing peer", - fields: makeUpdateFields(l, 100, []testPeer{{"P1", 120}}, false), + fields: makeUpdateFields(testBcR, 100, []bpPeer{{id: "P1", height: 120}}, false), args: args{"P99", nil}, maxHeightWanted: int64(120), }, { name: "delete the only peer", - fields: makeUpdateFields(l, 100, []testPeer{{"P1", 120}}, false), + fields: makeUpdateFields(testBcR, 100, []bpPeer{{id: "P1", height: 120}}, false), args: args{"P1", nil}, maxHeightWanted: int64(0), }, { - name: "delete shorter of two peers", - fields: makeUpdateFields(l, 100, []testPeer{{"P1", 100}, {"P2", 120}}, false), + name: "delete the shortest of two peers", + fields: makeUpdateFields(testBcR, 100, []bpPeer{{id: "P1", height: 100}, {id: "P2", height: 120}}, false), args: args{"P1", nil}, maxHeightWanted: int64(120), }, { - name: "delete taller of two peers", - fields: makeUpdateFields(l, 100, []testPeer{{"P1", 100}, {"P2", 120}}, false), + name: "delete the tallest of two peers", + fields: makeUpdateFields(testBcR, 100, []bpPeer{{id: "P1", height: 100}, {id: "P2", height: 120}}, false), args: args{"P2", nil}, maxHeightWanted: int64(100), }, @@ -225,9 +257,8 @@ func TestBlockPoolRemovePeer(t *testing.T) { } } -func TestBlockPoolRemoveShortPeers(t *testing.T) { - - l := log.TestingLogger() +func TestBlockPoolRemoveShortEmptyPeers(t *testing.T) { + testBcR := newTestBcR() tests := []struct { name string @@ -237,23 +268,23 @@ func TestBlockPoolRemoveShortPeers(t *testing.T) { }{ { name: "no short peers", - fields: makeUpdateFields(l, 100, - []testPeer{{"P1", 100}, {"P2", 110}, {"P3", 120}}, + fields: makeUpdateFields(testBcR, 100, + []bpPeer{{id: "P1", height: 100}, {id: "P2", height: 110}, {id: "P3", height: 120}}, false), maxHeightWanted: int64(120), noChange: true, }, { name: "one short peers", - fields: makeUpdateFields(l, 100, - []testPeer{{"P1", 100}, {"P2", 90}, {"P3", 120}}, + fields: makeUpdateFields(testBcR, 100, + []bpPeer{{id: "P1", height: 100}, {id: "P2", height: 90}, {id: "P3", height: 120}}, false), maxHeightWanted: int64(120), }, { name: "all short peers", - fields: makeUpdateFields(l, 100, - []testPeer{{"P1", 90}, {"P2", 91}, {"P3", 92}}, + fields: makeUpdateFields(testBcR, 100, + []bpPeer{{id: "P1", height: 90}, {id: "P2", height: 91}, {id: "P3", height: 92}}, false), maxHeightWanted: int64(0), }, @@ -288,6 +319,77 @@ func TestBlockPoolRemoveShortPeers(t *testing.T) { } } +func TestBlockPoolSendRequestBatch(t *testing.T) { + testBcR := newTestBcR() + tests := []struct { + name string + fields fields + maxRequestsPerPeer int32 + expRequests map[int64]bool + expPeerResults []testPeerResult + expNumPending int32 + }{ + { + name: "one peer - send up to maxRequestsPerPeer block requests", + fields: makeUpdateFields(testBcR, 10, []bpPeer{{id: "P1", height: 100}}, false), + maxRequestsPerPeer: 2, + expRequests: map[int64]bool{10: true, 11: true}, + expPeerResults: []testPeerResult{{id: "P1", height: 100, numPending: 2, blocks: map[int64]*types.Block{10: nil, 11: nil}}}, + expNumPending: 2, + }, + { + name: "n peers - send n*maxRequestsPerPeer block requests", + fields: makeUpdateFields(testBcR, 10, []bpPeer{{id: "P1", height: 100}, {id: "P2", height: 100}}, false), + maxRequestsPerPeer: 2, + expRequests: map[int64]bool{10: true, 11: true}, + expPeerResults: []testPeerResult{ + {id: "P1", height: 100, numPending: 2, blocks: map[int64]*types.Block{10: nil, 11: nil}}, + {id: "P2", height: 100, numPending: 2, blocks: map[int64]*types.Block{12: nil, 13: nil}}}, + expNumPending: 4, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + resetPoolTestResults() + + pool := &blockPool{ + logger: tt.fields.logger, + peers: tt.fields.peers, + blocks: tt.fields.blocks, + requests: make(map[int64]bool), + height: tt.fields.height, + nextRequestHeight: tt.fields.height, + maxPeerHeight: tt.fields.maxPeerHeight, + toBcR: testBcR, + } + + maxRequestsPerPeer = int32(tt.maxRequestsPerPeer) + //beforePool := poolCopy(pool) + err := pool.makeNextRequests(10) + assert.Nil(t, err) + assert.Equal(t, tt.expNumPending, pool.numPending) + assert.Equal(t, testResults.numRequestsSent, maxRequestsPerPeer*int32(len(pool.peers))) + + for _, tPeer := range tt.expPeerResults { + peer := pool.peers[tPeer.id] + assert.NotNil(t, peer) + assert.Equal(t, tPeer.numPending, peer.numPending) + /* + fmt.Println("tt", tt.name, "peer", peer.id, "expected:", tPeer.blocks, "actual:", peer.blocks) + assert.Equal(t, tPeer.blocks, peer.blocks) + for h, tBl := range tPeer.blocks { + block := peer.blocks[h] + assert.Equal(t, tBl, block) + } + */ + } + assert.Equal(t, testResults.numRequestsSent, maxRequestsPerPeer*int32(len(pool.peers))) + + }) + } +} + func TestBlockPoolAddBlock(t *testing.T) { type args struct { peerID p2p.ID @@ -400,67 +502,3 @@ func TestBlockPoolProcessedCurrentHeightBlock(t *testing.T) { }) } } - -func TestBlockPoolSendRequestBatch(t *testing.T) { - - type args struct { - sendFunc func(peerID p2p.ID, height int64) error - } - tests := []struct { - name string - fields fields - args args - wantErr bool - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - pool := &blockPool{ - logger: tt.fields.logger, - peers: tt.fields.peers, - blocks: tt.fields.blocks, - height: tt.fields.height, - maxPeerHeight: tt.fields.maxPeerHeight, - } - if err := pool.sendRequestBatch(tt.args.sendFunc); (err != nil) != tt.wantErr { - t.Errorf("blockPool.sendRequestBatch() error = %v, wantErr %v", err, tt.wantErr) - } - }) - } -} - -func TestBlockPoolGetBestPeer(t *testing.T) { - - type args struct { - height int64 - } - tests := []struct { - name string - fields fields - args args - wantPeerId p2p.ID - wantErr bool - }{ - // TODO: Add test cases. - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - pool := &blockPool{ - logger: tt.fields.logger, - peers: tt.fields.peers, - blocks: tt.fields.blocks, - height: tt.fields.height, - maxPeerHeight: tt.fields.maxPeerHeight, - } - gotPeerId, err := pool.getBestPeer(tt.args.height) - if (err != nil) != tt.wantErr { - t.Errorf("blockPool.getBestPeer() error = %v, wantErr %v", err, tt.wantErr) - return - } - if !reflect.DeepEqual(gotPeerId, tt.wantPeerId) { - t.Errorf("blockPool.getBestPeer() = %v, want %v", gotPeerId, tt.wantPeerId) - } - }) - } -} diff --git a/blockchain_new/reactor.go b/blockchain_new/reactor.go index fd07fdbd..2db64b4a 100644 --- a/blockchain_new/reactor.go +++ b/blockchain_new/reactor.go @@ -6,18 +6,29 @@ import ( "reflect" "time" - "github.com/tendermint/go-amino" + amino "github.com/tendermint/go-amino" + "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/p2p" sm "github.com/tendermint/tendermint/state" + "github.com/tendermint/tendermint/types" ) const ( // BlockchainChannel is a channel for blocks and status updates (`BlockStore` height) BlockchainChannel = byte(0x40) + trySyncIntervalMS = 10 + trySendIntervalMS = 10 - maxTotalMessages = 1000 + // stop syncing when last block's time is + // within this much of the system time. + // stopSyncingDurationMinutes = 10 + + // ask for best height every 10s + statusUpdateIntervalSeconds = 10 + // check if we should switch to consensus reactor + switchToConsensusIntervalSeconds = 1 // NOTE: keep up to date with bcBlockResponseMessage bcBlockResponseMessagePrefixSize = 4 @@ -27,6 +38,11 @@ const ( bcBlockResponseMessageFieldKeySize ) +var ( + maxRequestsPerPeer int32 = 20 + maxNumPendingRequests int32 = 600 +) + type consensusReactor interface { // for when we switch from blockchain reactor and fast sync to // the consensus machine @@ -42,21 +58,6 @@ func (e peerError) Error() string { return fmt.Sprintf("error with peer %v: %s", e.peerID, e.err.Error()) } -type bReactorMsgFromFSM uint - -// message types -const ( - // message type events - errorMsg = iota + 1 - //blockRequestMsg - //statusRequestMsg -) - -type msgFromFSM struct { - msgType bReactorMsgFromFSM - error peerError -} - // BlockchainReactor handles long-term catchup syncing. type BlockchainReactor struct { p2p.BaseReactor @@ -72,10 +73,20 @@ type BlockchainReactor struct { fsm *bReactorFSM blocksSynced int - lastHundred time.Time - lastRate float64 - msgFromFSMCh chan msgFromFSM + errorsCh chan peerError + messageForFSMCh chan bReactorMessageData // received in poolRoutine from Receive routine +} + +type BlockRequest struct { + Height int64 + PeerID p2p.ID +} + +// bReactorMessageData structure is used by the reactor when sending messages to the FSM. +type bReactorMessageData struct { + event bReactorEvent + data bReactorEventData } // NewBlockchainReactor returns new reactor instance. @@ -87,12 +98,18 @@ func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *Bl store.Height())) } + const capacity = 1000 // must be bigger than peers count + errorsCh := make(chan peerError, capacity) // so we don't block in #Receive#pool.AddBlock + messageForFSMCh := make(chan bReactorMessageData, capacity) // so we don't block in #Receive#pool.AddBlock + bcR := &BlockchainReactor{ - initialState: state, - state: state, - blockExec: blockExec, - fastSync: fastSync, - store: store, + initialState: state, + state: state, + blockExec: blockExec, + fastSync: fastSync, + store: store, + errorsCh: errorsCh, + messageForFSMCh: messageForFSMCh, } fsm := NewFSM(store.Height()+1, bcR) bcR.fsm = fsm @@ -109,7 +126,6 @@ func (bcR *BlockchainReactor) SetLogger(l log.Logger) { // OnStart implements cmn.Service. func (bcR *BlockchainReactor) OnStart() error { if bcR.fastSync { - bcR.fsm.start() go bcR.poolRoutine() } return nil @@ -117,7 +133,7 @@ func (bcR *BlockchainReactor) OnStart() error { // OnStop implements cmn.Service. func (bcR *BlockchainReactor) OnStop() { - bcR.fsm.stop() + bcR.Stop() } // GetChannels implements Reactor @@ -143,16 +159,11 @@ func (bcR *BlockchainReactor) AddPeer(peer p2p.Peer) { // bcStatusResponseMessage from the peer and call pool.SetPeerHeight } -// RemovePeer implements Reactor by removing peer from the pool. -func (bcR *BlockchainReactor) RemovePeer(peer p2p.Peer, reason interface{}) { - bcR.fsm.RemovePeer(peer.ID()) -} - // respondToPeer loads a block and sends it to the requesting peer, // if we have it. Otherwise, we'll respond saying we don't have it. // According to the Tendermint spec, if all nodes are honest, // no node should be requesting for a block that's non-existent. -func (bcR *BlockchainReactor) respondToPeer(msg *bcBlockRequestMessage, +func (bcR *BlockchainReactor) sendBlockToPeer(msg *bcBlockRequestMessage, src p2p.Peer) (queued bool) { block := bcR.store.LoadBlock(msg.Height) @@ -167,6 +178,31 @@ func (bcR *BlockchainReactor) respondToPeer(msg *bcBlockRequestMessage, return src.TrySend(BlockchainChannel, msgBytes) } +func (bcR *BlockchainReactor) sendStatusResponseToPeer(msg *bcStatusRequestMessage, src p2p.Peer) (queued bool) { + msgBytes := cdc.MustMarshalBinaryBare(&bcStatusResponseMessage{bcR.store.Height()}) + return src.TrySend(BlockchainChannel, msgBytes) +} + +func (bcR *BlockchainReactor) sendMessageToFSMAsync(msg bReactorMessageData) { + bcR.Logger.Error("send message to FSM for processing", "msg", msg.String()) + bcR.messageForFSMCh <- msg +} + +func (bcR *BlockchainReactor) sendRemovePeerToFSM(peerID p2p.ID) { + msgData := bReactorMessageData{ + event: peerRemoveEv, + data: bReactorEventData{ + peerId: peerID, + }, + } + bcR.sendMessageToFSMAsync(msgData) +} + +// RemovePeer implements Reactor by removing peer from the pool. +func (bcR *BlockchainReactor) RemovePeer(peer p2p.Peer, reason interface{}) { + bcR.sendRemovePeerToFSM(peer.ID()) +} + // Receive implements Reactor by handling 4 types of messages (look below). func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { msg, err := decodeMsg(msgBytes) @@ -186,9 +222,18 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) switch msg := msg.(type) { case *bcBlockRequestMessage: - if queued := bcR.respondToPeer(msg, src); !queued { + if queued := bcR.sendBlockToPeer(msg, src); !queued { // Unfortunately not queued since the queue is full. + bcR.Logger.Error("Could not send block message to peer", "src", src, "height", msg.Height) } + + case *bcStatusRequestMessage: + // Send peer our state. + if queued := bcR.sendStatusResponseToPeer(msg, src); !queued { + // Unfortunately not queued since the queue is full. + bcR.Logger.Error("Could not send status message to peer", "src", src) + } + case *bcBlockResponseMessage: msgData := bReactorMessageData{ event: blockResponseEv, @@ -198,15 +243,8 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) length: len(msgBytes), }, } - sendMessageToFSM(bcR.fsm, msgData) + bcR.sendMessageToFSMAsync(msgData) - case *bcStatusRequestMessage: - // Send peer our state. - msgBytes := cdc.MustMarshalBinaryBare(&bcStatusResponseMessage{bcR.store.Height()}) - queued := src.TrySend(BlockchainChannel, msgBytes) - if !queued { - // sorry - } case *bcStatusResponseMessage: // Got a peer status. Unverified. msgData := bReactorMessageData{ @@ -217,14 +255,141 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) length: len(msgBytes), }, } - sendMessageToFSM(bcR.fsm, msgData) + bcR.sendMessageToFSMAsync(msgData) default: bcR.Logger.Error(fmt.Sprintf("unknown message type %v", reflect.TypeOf(msg))) } } -func (bcR *BlockchainReactor) processBlocks(first *types.Block, second *types.Block) error { +// Handle messages from the poolReactor telling the reactor what to do. +// NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down! +func (bcR *BlockchainReactor) poolRoutine() { + + bcR.fsm.start() + + trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond) + trySendTicker := time.NewTicker(trySendIntervalMS * time.Millisecond) + + statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second) + switchToConsensusTicker := time.NewTicker(switchToConsensusIntervalSeconds * time.Second) + + lastHundred := time.Now() + lastRate := 0.0 + + doProcessCh := make(chan struct{}, 1) + doSendCh := make(chan struct{}, 1) + +FOR_LOOP: + for { + select { + + case <-trySendTicker.C: // chan time + select { + case doSendCh <- struct{}{}: + default: + } + continue FOR_LOOP + + case <-doSendCh: + msgData := bReactorMessageData{ + event: makeRequestsEv, + data: bReactorEventData{ + maxNumRequests: maxNumPendingRequests, + }, + } + _ = sendMessageToFSMSync(bcR.fsm, msgData) + + case err := <-bcR.errorsCh: + bcR.reportPeerErrorToSwitch(err.err, err.peerID) + + case <-statusUpdateTicker.C: + // Ask for status updates. + go bcR.sendStatusRequest() + + case <-switchToConsensusTicker.C: + height, numPending, maxPeerHeight := bcR.fsm.pool.getStatus() + outbound, inbound, _ := bcR.Switch.NumPeers() + bcR.Logger.Debug("Consensus ticker", "numPending", numPending, "maxPeerHeight", maxPeerHeight, + "outbound", outbound, "inbound", inbound) + if bcR.fsm.isCaughtUp() { + bcR.Logger.Info("Time to switch to consensus reactor!", "height", height) + bcR.fsm.stop() + bcR.switchToConsensus() + break FOR_LOOP + } + + case <-trySyncTicker.C: // chan time + select { + case doProcessCh <- struct{}{}: + default: + } + //continue FOR_LOOP + + case <-doProcessCh: + err := bcR.processBlocksFromPoolRoutine() + if err == errMissingBlocks { + continue FOR_LOOP + } + + // Notify FSM of block processing result. + msgData := bReactorMessageData{ + event: processedBlockEv, + data: bReactorEventData{ + err: err, + }, + } + _ = sendMessageToFSMSync(bcR.fsm, msgData) + + if err == errBlockVerificationFailure { + continue FOR_LOOP + } + + doProcessCh <- struct{}{} + + bcR.blocksSynced++ + if bcR.blocksSynced%100 == 0 { + lastRate = 0.9*lastRate + 0.1*(100/time.Since(lastHundred).Seconds()) + bcR.Logger.Info("Fast Sync Rate", "height", bcR.fsm.pool.height, + "max_peer_height", bcR.fsm.pool.getMaxPeerHeight(), "blocks/s", lastRate) + lastHundred = time.Now() + } + continue FOR_LOOP + + case msg := <-bcR.messageForFSMCh: + _ = sendMessageToFSMSync(bcR.fsm, msg) + + case <-bcR.Quit(): + break FOR_LOOP + } + } +} + +func (bcR *BlockchainReactor) reportPeerErrorToSwitch(err error, peerID p2p.ID) { + peer := bcR.Switch.Peers().Get(peerID) + if peer != nil { + bcR.Switch.StopPeerForError(peer, err) + } +} + +// Called by FSM and pool: +// - pool calls when it detects slow peer or when peer times out +// - FSM calls when: +// - processing a block (addBlock) fails +// - BCR process of block reports failure to FSM, FSM sends back the peers of first and second +func (bcR *BlockchainReactor) sendPeerError(err error, peerID p2p.ID) { + bcR.errorsCh <- peerError{err, peerID} +} + +func (bcR *BlockchainReactor) processBlocksFromPoolRoutine() error { + firstBP, secondBP, err := bcR.fsm.pool.getNextTwoBlocks() + if err != nil { + // We need both to sync the first block. + return err + } + + first := firstBP.block + second := secondBP.block chainID := bcR.initialState.ChainID @@ -235,7 +400,7 @@ func (bcR *BlockchainReactor) processBlocks(first *types.Block, second *types.Bl // NOTE: we can probably make this more efficient, but note that calling // first.Hash() doesn't verify the tx contents, so MakePartSet() is // currently necessary. - err := bcR.state.Validators.VerifyCommit( + err = bcR.state.Validators.VerifyCommit( chainID, firstID, first.Height, second.LastCommit) if err != nil { @@ -245,50 +410,15 @@ func (bcR *BlockchainReactor) processBlocks(first *types.Block, second *types.Bl bcR.store.SaveBlock(first, firstParts, second.LastCommit) - // get the hash without persisting the state + // Get the hash without persisting the state. bcR.state, err = bcR.blockExec.ApplyBlock(bcR.state, firstID, first) if err != nil { // TODO This is bad, are we zombie? panic(fmt.Sprintf("failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err)) } - bcR.blocksSynced++ - - if bcR.blocksSynced%100 == 0 { - bcR.lastRate = 0.9*bcR.lastRate + 0.1*(100/time.Since(bcR.lastHundred).Seconds()) - bcR.Logger.Info("Fast Sync Rate", "height", bcR.fsm.pool.height, - "max_peer_height", bcR.fsm.pool.getMaxPeerHeight(), "blocks/s", bcR.lastRate) - bcR.lastHundred = time.Now() - } return nil } -// Handle messages from the poolReactor telling the reactor what to do. -// NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down! -func (bcR *BlockchainReactor) poolRoutine() { - - for { - select { - case fromFSM := <-bcR.msgFromFSMCh: - switch fromFSM.msgType { - case errorMsg: - peer := bcR.Switch.Peers().Get(fromFSM.error.peerID) - if peer != nil { - bcR.Switch.StopPeerForError(peer, fromFSM.error.err) - } - default: - } - } - } -} - -// TODO - send the error on msgFromFSMCh to process it above -func (bcR *BlockchainReactor) sendPeerError(err error, peerID p2p.ID) { - peer := bcR.Switch.Peers().Get(peerID) - if peer != nil { - bcR.Switch.StopPeerForError(peer, err) - } -} - func (bcR *BlockchainReactor) resetStateTimer(name string, timer *time.Timer, timeout time.Duration, f func()) { if timer == nil { timer = time.AfterFunc(timeout, f) @@ -298,10 +428,9 @@ func (bcR *BlockchainReactor) resetStateTimer(name string, timer *time.Timer, ti } // BroadcastStatusRequest broadcasts `BlockStore` height. -func (bcR *BlockchainReactor) sendStatusRequest() error { +func (bcR *BlockchainReactor) sendStatusRequest() { msgBytes := cdc.MustMarshalBinaryBare(&bcStatusRequestMessage{bcR.store.Height()}) bcR.Switch.Broadcast(BlockchainChannel, msgBytes) - return nil } // BlockRequest sends `BlockRequest` height. @@ -326,7 +455,7 @@ func (bcR *BlockchainReactor) switchToConsensus() { if ok { conR.SwitchToConsensus(bcR.state, bcR.blocksSynced) } else { - // should only happen during testing + // Should only happen during testing. } } diff --git a/blockchain_new/reactor_fsm.go b/blockchain_new/reactor_fsm.go index 74258f4f..7db96e62 100644 --- a/blockchain_new/reactor_fsm.go +++ b/blockchain_new/reactor_fsm.go @@ -10,14 +10,10 @@ import ( "github.com/tendermint/tendermint/types" ) -var ( - // should be >= 2 - maxRequestBatchSize = 40 -) - // Blockchain Reactor State type bReactorFSMState struct { name string + // called when transitioning out of current state handle func(*bReactorFSM, bReactorEvent, bReactorEventData) (next *bReactorFSMState, err error) // called when entering the state @@ -32,39 +28,56 @@ func (s *bReactorFSMState) String() string { return s.name } +// Interface used by FSM for sending Block and Status requests, +// informing of peer errors and state timeouts +// Implemented by BlockchainReactor and tests +type bcRMessageInterface interface { + sendStatusRequest() + sendBlockRequest(peerID p2p.ID, height int64) error + sendPeerError(err error, peerID p2p.ID) + resetStateTimer(name string, timer *time.Timer, timeout time.Duration, f func()) + switchToConsensus() +} + // Blockchain Reactor State Machine type bReactorFSM struct { logger log.Logger startTime time.Time state *bReactorFSMState + pool *blockPool - pool *blockPool - - // channel to receive messages - messageCh chan bReactorMessageData - processSignalActive bool - - // interface used to send StatusRequest, BlockRequest, errors - bcr sendMessage + // interface used to call the Blockchain reactor to send StatusRequest, BlockRequest, reporting errors, etc. + toBcR bcRMessageInterface } -// bReactorEventData is part of the message sent by the reactor to the FSM and used by the state handlers +// bReactorEventData is part of the message sent by the reactor to the FSM and used by the state handlers. type bReactorEventData struct { - peerId p2p.ID - err error // for peer error: timeout, slow, - height int64 // for status response - block *types.Block // for block response - stateName string // for state timeout events - length int // for block response to detect slow peers - + peerId p2p.ID + err error // for peer error: timeout, slow; for processed block event if error occurred + height int64 // for status response; for processed block event + block *types.Block // for block response + stateName string // for state timeout events + length int // for block response event, length of received block, used to detect slow peers + maxNumRequests int32 // for request needed event, maximum number of pending requests } -// bReactorMessageData structure is used by the reactor when sending messages to the FSM. -type bReactorMessageData struct { - event bReactorEvent - data bReactorEventData -} +// Blockchain Reactor Events (the input to the state machine) +type bReactorEvent uint + +const ( + // message type events + startFSMEv = iota + 1 + statusResponseEv + blockResponseEv + processedBlockEv + makeRequestsEv + stopFSMEv + + // other events + peerRemoveEv = iota + 256 + stateTimeoutEv +) func (msg *bReactorMessageData) String() string { var dataStr string @@ -76,14 +89,17 @@ func (msg *bReactorMessageData) String() string { dataStr = fmt.Sprintf("peer: %v height: %v", msg.data.peerId, msg.data.height) case blockResponseEv: dataStr = fmt.Sprintf("peer: %v block.height: %v lenght: %v", msg.data.peerId, msg.data.block.Height, msg.data.length) - case tryProcessBlockEv: - dataStr = "" + case processedBlockEv: + dataStr = fmt.Sprintf("block processing returned following error: %v", msg.data.err) + case makeRequestsEv: + dataStr = fmt.Sprintf("new requests needed") case stopFSMEv: dataStr = "" - case peerErrEv: - dataStr = fmt.Sprintf("peer: %v err: %v", msg.data.peerId, msg.data.err) + case peerRemoveEv: + dataStr = fmt.Sprintf("peer: %v is being removed by the switch", msg.data.peerId) case stateTimeoutEv: dataStr = fmt.Sprintf("state: %v", msg.data.stateName) + default: dataStr = fmt.Sprintf("cannot interpret message data") return "event unknown" @@ -92,22 +108,6 @@ func (msg *bReactorMessageData) String() string { return fmt.Sprintf("event: %v %v", msg.event, dataStr) } -// Blockchain Reactor Events (the input to the state machine). -type bReactorEvent uint - -const ( - // message type events - startFSMEv = iota + 1 - statusResponseEv - blockResponseEv - tryProcessBlockEv - stopFSMEv - - // other events - peerErrEv = iota + 256 - stateTimeoutEv -) - func (ev bReactorEvent) String() string { switch ev { case startFSMEv: @@ -116,12 +116,14 @@ func (ev bReactorEvent) String() string { return "statusResponseEv" case blockResponseEv: return "blockResponseEv" - case tryProcessBlockEv: - return "tryProcessBlockEv" + case processedBlockEv: + return "processedBlockEv" + case makeRequestsEv: + return "makeRequestsEv" case stopFSMEv: return "stopFSMEv" - case peerErrEv: - return "peerErrEv" + case peerRemoveEv: + return "peerRemoveEv" case stateTimeoutEv: return "stateTimeoutEv" default: @@ -140,26 +142,22 @@ var ( // state timers var ( - waitForPeerTimeout = 20 * time.Second - waitForBlockTimeout = 30 * time.Second // > peerTimeout which is 15 sec + waitForPeerTimeout = 2 * time.Second ) // errors var ( - // errors - errInvalidEvent = errors.New("invalid event in current state") errNoErrorFinished = errors.New("FSM is finished") + errInvalidEvent = errors.New("invalid event in current state") errNoPeerResponse = errors.New("FSM timed out on peer response") - errNoPeerFoundForRequest = errors.New("cannot use peer") errBadDataFromPeer = errors.New("received from wrong peer or bad block") errMissingBlocks = errors.New("missing blocks") errBlockVerificationFailure = errors.New("block verification failure, redo") errNilPeerForBlockRequest = errors.New("nil peer for block request") errSendQueueFull = errors.New("block request not made, send-queue is full") - errPeerTooShort = errors.New("peer height too low, peer was either not added " + - "or removed after status update") - errSwitchPeerErr = errors.New("switch detected peer error") - errSlowPeer = errors.New("peer is not sending us data fast enough") + errPeerTooShort = errors.New("peer height too low, old peer removed/ new peer not added") + errSlowPeer = errors.New("peer is not sending us data fast enough") + errNoPeerFoundForHeight = errors.New("could not found peer for block request") ) func init() { @@ -169,14 +167,13 @@ func init() { switch ev { case startFSMEv: // Broadcast Status message. Currently doesn't return non-nil error. - _ = fsm.bcr.sendStatusRequest() + fsm.toBcR.sendStatusRequest() if fsm.state.timer != nil { fsm.state.timer.Stop() } return waitForPeer, nil case stopFSMEv: - // cleanup return finished, errNoErrorFinished default: @@ -189,13 +186,13 @@ func init() { name: "waitForPeer", timeout: waitForPeerTimeout, enter: func(fsm *bReactorFSM) { - // stop when leaving the state + // Stop when leaving the state. fsm.resetStateTimer(waitForPeer) }, handle: func(fsm *bReactorFSM, ev bReactorEvent, data bReactorEventData) (*bReactorFSMState, error) { switch ev { case stateTimeoutEv: - // no statusResponse received from any peer + // There was no statusResponse received from any peer. // Should we send status request again? if fsm.state.timer != nil { fsm.state.timer.Stop() @@ -203,26 +200,14 @@ func init() { return finished, errNoPeerResponse case statusResponseEv: - // update peer - if err := fsm.pool.updatePeer(data.peerId, data.height, fsm.processPeerError); err != nil { + if err := fsm.pool.updatePeer(data.peerId, data.height); err != nil { if len(fsm.pool.peers) == 0 { return waitForPeer, err } } - - // send first block requests - err := fsm.pool.sendRequestBatch(fsm.bcr.sendBlockRequest) - if err != nil { - // wait for more peers or state timeout - return waitForPeer, err - } - if fsm.state.timer != nil { - fsm.state.timer.Stop() - } return waitForBlock, nil case stopFSMEv: - // cleanup return finished, errNoErrorFinished default: @@ -232,106 +217,65 @@ func init() { } waitForBlock = &bReactorFSMState{ - name: "waitForBlock", - timeout: waitForBlockTimeout, - enter: func(fsm *bReactorFSM) { - // stop when leaving the state or receiving a block - fsm.resetStateTimer(waitForBlock) - }, + name: "waitForBlock", handle: func(fsm *bReactorFSM, ev bReactorEvent, data bReactorEventData) (*bReactorFSMState, error) { switch ev { - case stateTimeoutEv: - // no blockResponse - // Should we send status request again? Switch to consensus? - // Note that any unresponsive peers have been already removed by their timer expiry handler. - return finished, errNoPeerResponse case statusResponseEv: - err := fsm.pool.updatePeer(data.peerId, data.height, fsm.processPeerError) + err := fsm.pool.updatePeer(data.peerId, data.height) if len(fsm.pool.peers) == 0 { - _ = fsm.bcr.sendStatusRequest() + fsm.toBcR.sendStatusRequest() if fsm.state.timer != nil { fsm.state.timer.Stop() } return waitForPeer, err } - if fsm.state.timer != nil { - fsm.state.timer.Stop() - } return waitForBlock, err case blockResponseEv: fsm.logger.Debug("blockResponseEv", "H", data.block.Height) err := fsm.pool.addBlock(data.peerId, data.block, data.length) if err != nil { - // unsolicited, from different peer, already have it.. + // A block was received that was unsolicited, from unexpected peer, or that we already have it. + // Ignore block, remove peer and send error to switch. fsm.pool.removePeer(data.peerId, err) - // ignore block - // send error to switch if err != nil { - fsm.bcr.sendPeerError(err, data.peerId) + fsm.toBcR.sendPeerError(err, data.peerId) } - fsm.processSignalActive = false - if fsm.state.timer != nil { - fsm.state.timer.Stop() - } - return waitForBlock, err } - fsm.sendSignalToProcessBlock() - - if fsm.state.timer != nil { - fsm.state.timer.Stop() - } - return waitForBlock, nil - - case tryProcessBlockEv: - fsm.logger.Debug("FSM blocks", "blocks", fsm.pool.blocks, "fsm_height", fsm.pool.height) - // process block, it detects errors and deals with them - fsm.processBlock() - - // processed block, check if we are done - if fsm.pool.reachedMaxHeight() { - // TODO should we wait for more status responses in case a high peer is slow? - fsm.logger.Info("Switching to consensus!!!!") - fsm.bcr.switchToConsensus() - return finished, nil - } - - // get other block(s) - err := fsm.pool.sendRequestBatch(fsm.bcr.sendBlockRequest) - if err != nil { - // TBD on what to do here... - // wait for more peers or state timeout - } - fsm.sendSignalToProcessBlock() - - if fsm.state.timer != nil { - fsm.state.timer.Stop() - } return waitForBlock, err - case peerErrEv: - // This event is sent by: - // - the switch to report disconnected and errored peers - // - when the FSM pool peer times out + case processedBlockEv: + fsm.logger.Debug("processedBlockEv", "err", data.err) + first, second, _ := fsm.pool.getNextTwoBlocks() + if data.err != nil { + fsm.logger.Error("process blocks returned error", "err", data.err, "first", first.block.Height, "second", second.block.Height) + fsm.logger.Error("send peer error for", "peer", first.peer.id) + fsm.toBcR.sendPeerError(data.err, first.peer.id) + fsm.logger.Error("send peer error for", "peer", second.peer.id) + fsm.toBcR.sendPeerError(data.err, second.peer.id) + fsm.pool.invalidateFirstTwoBlocks(data.err) + } else { + fsm.pool.processedCurrentHeightBlock() + if fsm.pool.reachedMaxHeight() { + fsm.stop() + return finished, nil + } + } + + return waitForBlock, data.err + + case peerRemoveEv: + // This event is sent by the switch to remove disconnected and errored peers. fsm.pool.removePeer(data.peerId, data.err) - if data.err != nil && data.err != errSwitchPeerErr { - // not sent by the switch so report it - fsm.bcr.sendPeerError(data.err, data.peerId) - } - err := fsm.pool.sendRequestBatch(fsm.bcr.sendBlockRequest) - if err != nil { - // TBD on what to do here... - // wait for more peers or state timeout - } - if fsm.state.timer != nil { - fsm.state.timer.Stop() - } + return waitForBlock, nil + + case makeRequestsEv: + err := fsm.makeNextRequests(data.maxNumRequests) return waitForBlock, err case stopFSMEv: - // cleanup return finished, errNoErrorFinished default: @@ -343,69 +287,45 @@ func init() { finished = &bReactorFSMState{ name: "finished", enter: func(fsm *bReactorFSM) { - // cleanup + fsm.cleanup() }, handle: func(fsm *bReactorFSM, ev bReactorEvent, data bReactorEventData) (*bReactorFSMState, error) { return nil, nil }, } - } -func NewFSM(height int64, bcr sendMessage) *bReactorFSM { - messageCh := make(chan bReactorMessageData, maxTotalMessages) - +func NewFSM(height int64, toBcR bcRMessageInterface) *bReactorFSM { return &bReactorFSM{ state: unknown, - pool: newBlockPool(height), - bcr: bcr, - messageCh: messageCh, + startTime: time.Now(), + pool: newBlockPool(height, toBcR), + toBcR: toBcR, } } -func sendMessageToFSM(fsm *bReactorFSM, msg bReactorMessageData) { - fsm.logger.Debug("send message to FSM", "msg", msg.String()) - fsm.messageCh <- msg -} - func (fsm *bReactorFSM) setLogger(l log.Logger) { fsm.logger = l fsm.pool.setLogger(l) } -// starts the FSM go routine +func sendMessageToFSMSync(fsm *bReactorFSM, msg bReactorMessageData) error { + err := fsm.handle(&msg) + return err +} + +// Starts the FSM goroutine. func (fsm *bReactorFSM) start() { - go fsm.startRoutine() - fsm.startTime = time.Now() + _ = sendMessageToFSMSync(fsm, bReactorMessageData{ + event: startFSMEv, + }) } -// stops the FSM go routine +// Stops the FSM goroutine. func (fsm *bReactorFSM) stop() { - msg := bReactorMessageData{ + _ = sendMessageToFSMSync(fsm, bReactorMessageData{ event: stopFSMEv, - } - sendMessageToFSM(fsm, msg) -} - -// start the FSM -func (fsm *bReactorFSM) startRoutine() { - - _ = fsm.handle(&bReactorMessageData{event: startFSMEv}) - -forLoop: - for { - select { - case msg := <-fsm.messageCh: - fsm.logger.Debug("FSM Received message", "msg", msg.String()) - _ = fsm.handle(&msg) - if msg.event == stopFSMEv { - break forLoop - } - // TODO - stop also on some errors returned by handle - - default: - } - } + }) } // handle processes messages and events sent to the FSM. @@ -432,8 +352,6 @@ func (fsm *bReactorFSM) transition(next *bReactorFSMState) { if next == nil { return } - fsm.logger.Debug("changes state: ", "old", fsm.state.name, "new", next.name) - if fsm.state != next { fsm.state = next if next.enter != nil { @@ -442,17 +360,6 @@ func (fsm *bReactorFSM) transition(next *bReactorFSMState) { } } -// Interface for sending Block and Status requests -// Implemented by BlockchainReactor and tests -type sendMessage interface { - sendStatusRequest() error - sendBlockRequest(peerID p2p.ID, height int64) error - sendPeerError(err error, peerID p2p.ID) - processBlocks(first *types.Block, second *types.Block) error - resetStateTimer(name string, timer *time.Timer, timeout time.Duration, f func()) - switchToConsensus() -} - // FSM state timeout handler func (fsm *bReactorFSM) sendStateTimeoutEvent(stateName string) { // Check that the timeout is for the state we are currently in to prevent wrong transitions. @@ -463,84 +370,35 @@ func (fsm *bReactorFSM) sendStateTimeoutEvent(stateName string) { stateName: stateName, }, } - sendMessageToFSM(fsm, msg) + _ = sendMessageToFSMSync(fsm, msg) } } -// This is called when entering an FSM state in order to detect lack of progress in the state machine. -// Note the use of the 'bcr' interface to facilitate testing without timer running. +// Called when entering an FSM state in order to detect lack of progress in the state machine. +// Note the use of the 'bcr' interface to facilitate testing without timer expiring. func (fsm *bReactorFSM) resetStateTimer(state *bReactorFSMState) { - fsm.bcr.resetStateTimer(state.name, state.timer, state.timeout, func() { + fsm.toBcR.resetStateTimer(state.name, state.timer, state.timeout, func() { fsm.sendStateTimeoutEvent(state.name) }) } -// called by the switch on peer error -func (fsm *bReactorFSM) RemovePeer(peerID p2p.ID) { - fsm.logger.Info("Switch removes peer", "peer", peerID, "fsm_height", fsm.pool.height) - fsm.processPeerError(errSwitchPeerErr, peerID) +func (fsm *bReactorFSM) isCaughtUp() bool { + // Some conditions to determine if we're caught up. + // Ensures we've either received a block or waited some amount of time, + // and that we're synced to the highest known height. + // Note we use maxPeerHeight - 1 because to sync block H requires block H+1 + // to verify the LastCommit. + receivedBlockOrTimedOut := fsm.pool.height > 0 || time.Since(fsm.startTime) > 5*time.Second + ourChainIsLongestAmongPeers := fsm.pool.maxPeerHeight == 0 || fsm.pool.height >= (fsm.pool.maxPeerHeight-1) || fsm.state == finished + isCaughtUp := receivedBlockOrTimedOut && ourChainIsLongestAmongPeers + + return isCaughtUp } -func (fsm *bReactorFSM) sendSignalToProcessBlock() { - _, _, err := fsm.pool.getNextTwoBlocks() - if err != nil { - fsm.logger.Debug("No need to send signal, blocks missing") - return - } - - fsm.logger.Debug("Send signal to process blocks") - if fsm.processSignalActive { - fsm.logger.Debug("..already sent") - return - } - - msgData := bReactorMessageData{ - event: tryProcessBlockEv, - data: bReactorEventData{}, - } - sendMessageToFSM(fsm, msgData) - fsm.processSignalActive = true +func (fsm *bReactorFSM) cleanup() { + // TODO } -// Processes block at height H = fsm.height. Expects both H and H+1 to be available -func (fsm *bReactorFSM) processBlock() { - first, second, err := fsm.pool.getNextTwoBlocks() - if err != nil { - fsm.processSignalActive = false - return - } - fsm.logger.Debug("process blocks", "first", first.block.Height, "second", second.block.Height) - fsm.logger.Debug("FSM blocks", "blocks", fsm.pool.blocks) - - if err = fsm.bcr.processBlocks(first.block, second.block); err != nil { - fsm.logger.Error("process blocks returned error", "err", err, "first", first.block.Height, "second", second.block.Height) - fsm.logger.Error("FSM blocks", "blocks", fsm.pool.blocks) - fsm.pool.invalidateFirstTwoBlocks(err) - fsm.bcr.sendPeerError(err, first.peerId) - fsm.bcr.sendPeerError(err, second.peerId) - - } else { - fsm.pool.processedCurrentHeightBlock() - } - - fsm.processSignalActive = false -} - -func (fsm *bReactorFSM) IsFinished() bool { - return fsm.state == finished -} - -// called from: -// - the switch from its go routing -// - when peer times out from the timer go routine. -// Send message to FSM -func (fsm *bReactorFSM) processPeerError(err error, peerID p2p.ID) { - msgData := bReactorMessageData{ - event: peerErrEv, - data: bReactorEventData{ - err: err, - peerId: peerID, - }, - } - sendMessageToFSM(fsm, msgData) +func (fsm *bReactorFSM) makeNextRequests(maxNumPendingRequests int32) error { + return fsm.pool.makeNextRequests(maxNumPendingRequests) } diff --git a/blockchain_new/reactor_fsm_test.go b/blockchain_new/reactor_fsm_test.go index b061a990..b2a865cf 100644 --- a/blockchain_new/reactor_fsm_test.go +++ b/blockchain_new/reactor_fsm_test.go @@ -1,21 +1,20 @@ package blockchain_new import ( + "testing" + "time" + "github.com/stretchr/testify/assert" cmn "github.com/tendermint/tendermint/libs/common" "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/p2p" - "github.com/tendermint/tendermint/types" - - "testing" - "time" ) var ( failSendStatusRequest bool failSendBlockRequest bool numStatusRequests int - numBlockRequests int + numBlockRequests int32 ) type lastBlockRequestT struct { @@ -72,13 +71,16 @@ func newTestReactor() *testReactor { // WIP func TestFSMTransitionSequences(t *testing.T) { - maxRequestBatchSize = 2 + maxRequestsPerPeer = 2 fsmTransitionSequenceTests := [][]fsmStepTestValues{ { {currentState: "unknown", event: startFSMEv, shouldSendStatusReq: true, expectedState: "waitForPeer"}, {currentState: "waitForPeer", event: statusResponseEv, - data: bReactorEventData{peerId: "P1", height: 10}, + data: bReactorEventData{peerId: "P1", height: 10}, + expectedState: "waitForBlock"}, + {currentState: "waitForBlock", event: makeRequestsEv, + data: bReactorEventData{maxNumRequests: maxNumPendingRequests}, blockReqIncreased: true, expectedState: "waitForBlock"}, }, @@ -98,6 +100,7 @@ func TestFSMTransitionSequences(t *testing.T) { oldNumBlockRequests := numBlockRequests _ = sendEventToFSM(testBcR.fsm, step.event, step.data) + if step.shouldSendStatusReq { assert.Equal(t, oldNumStatusRequests+1, numStatusRequests) } else { @@ -105,7 +108,7 @@ func TestFSMTransitionSequences(t *testing.T) { } if step.blockReqIncreased { - assert.Equal(t, oldNumBlockRequests+maxRequestBatchSize, numBlockRequests) + assert.Equal(t, oldNumBlockRequests+maxRequestsPerPeer, numBlockRequests) } else { assert.Equal(t, oldNumBlockRequests, numBlockRequests) } @@ -116,7 +119,7 @@ func TestFSMTransitionSequences(t *testing.T) { } func TestReactorFSMBasic(t *testing.T) { - maxRequestBatchSize = 2 + maxRequestsPerPeer = 2 // Create test reactor testBcR := newTestReactor() @@ -134,15 +137,19 @@ func TestReactorFSMBasic(t *testing.T) { peerID := p2p.ID(cmn.RandStr(12)) sendStatusResponse2(fsm, peerID, 10) + if err := fsm.handle(&bReactorMessageData{ + event: makeRequestsEv, + data: bReactorEventData{maxNumRequests: maxNumPendingRequests}}); err != nil { + } // Check that FSM sends a block request message and... - assert.Equal(t, maxRequestBatchSize, numBlockRequests) + assert.Equal(t, maxRequestsPerPeer, numBlockRequests) // ... the block request has the expected height - assert.Equal(t, int64(maxRequestBatchSize), lastBlockRequest.height) + assert.Equal(t, maxRequestsPerPeer, int32(len(fsm.pool.peers[peerID].blocks))) assert.Equal(t, waitForBlock.name, fsm.state.name) } func TestReactorFSMPeerTimeout(t *testing.T) { - maxRequestBatchSize = 2 + maxRequestsPerPeer = 2 resetTestValues() peerTimeout = 20 * time.Millisecond // Create and start the FSM @@ -159,10 +166,14 @@ func TestReactorFSMPeerTimeout(t *testing.T) { sendStatusResponse(fsm, peerID, 10) time.Sleep(5 * time.Millisecond) + if err := fsm.handle(&bReactorMessageData{ + event: makeRequestsEv, + data: bReactorEventData{maxNumRequests: maxNumPendingRequests}}); err != nil { + } // Check that FSM sends a block request message and... - assert.Equal(t, maxRequestBatchSize, numBlockRequests) + assert.Equal(t, maxRequestsPerPeer, numBlockRequests) // ... the block request has the expected height and peer - assert.Equal(t, int64(maxRequestBatchSize), lastBlockRequest.height) + assert.Equal(t, maxRequestsPerPeer, int32(len(fsm.pool.peers[peerID].blocks))) assert.Equal(t, peerID, lastBlockRequest.peerID) // let FSM timeout on the block response message @@ -190,13 +201,9 @@ func (testR *testReactor) sendPeerError(err error, peerID p2p.ID) { lastPeerError.err = err } -func (testR *testReactor) sendStatusRequest() error { +func (testR *testReactor) sendStatusRequest() { testR.logger.Info("Reactor received sendStatusRequest call from FSM") numStatusRequests++ - if failSendStatusRequest { - return errSendQueueFull - } - return nil } func (testR *testReactor) sendBlockRequest(peerID p2p.ID, height int64) error { @@ -216,11 +223,6 @@ func (testR *testReactor) resetStateTimer(name string, timer *time.Timer, timeou } } -func (testR *testReactor) processBlocks(first *types.Block, second *types.Block) error { - testR.logger.Info("Reactor received processBlocks call from FSM", "first", first.Height, "second", second.Height) - return nil -} - func (testR *testReactor) switchToConsensus() { testR.logger.Info("Reactor received switchToConsensus call from FSM") @@ -241,7 +243,7 @@ func sendStatusResponse(fsm *bReactorFSM, peerID p2p.ID, height int64) { }, } - sendMessageToFSM(fsm, msgData) + _ = sendMessageToFSMSync(fsm, msgData) } func sendStatusResponse2(fsm *bReactorFSM, peerID p2p.ID, height int64) { diff --git a/blockchain_new/reactor_test.go b/blockchain_new/reactor_test.go index d0f59279..d0d535fa 100644 --- a/blockchain_new/reactor_test.go +++ b/blockchain_new/reactor_test.go @@ -124,9 +124,10 @@ func newBlockchainReactor(logger log.Logger, genDoc *types.GenesisDoc, privVals return BlockchainReactorPair{bcReactor, proxyApp} } -func TestNoBlockResponse(t *testing.T) { +func TestFastSyncNoBlockResponse(t *testing.T) { peerTimeout = 15 * time.Second - maxRequestBatchSize = 40 + maxRequestsPerPeer = 20 + maxNumPendingRequests = 100 config = cfg.ResetTestRoot("blockchain_new_reactor_test") defer os.RemoveAll(config.RootDir) @@ -136,24 +137,19 @@ func TestNoBlockResponse(t *testing.T) { reactorPairs := make([]BlockchainReactorPair, 2) - logger1 := log.TestingLogger() - reactorPairs[0] = newBlockchainReactor(logger1, genDoc, privVals, maxBlockHeight) - logger2 := log.TestingLogger() - reactorPairs[1] = newBlockchainReactor(logger2, genDoc, privVals, 0) + logger := log.TestingLogger() + reactorPairs[0] = newBlockchainReactor(logger, genDoc, privVals, maxBlockHeight) + reactorPairs[1] = newBlockchainReactor(logger, genDoc, privVals, 0) p2p.MakeConnectedSwitches(config.P2P, 2, func(i int, s *p2p.Switch) *p2p.Switch { s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor) + moduleName := fmt.Sprintf("blockchain-%v", i) + reactorPairs[i].reactor.SetLogger(logger.With("module", moduleName)) + return s }, p2p.Connect2Switches) - addr0 := reactorPairs[0].reactor.Switch.NodeInfo().ID() - moduleName := fmt.Sprintf("blockchain-%v", addr0) - reactorPairs[0].reactor.SetLogger(logger1.With("module", moduleName[:19])) - addr1 := reactorPairs[1].reactor.Switch.NodeInfo().ID() - moduleName = fmt.Sprintf("blockchain-%v", addr1) - reactorPairs[1].reactor.SetLogger(logger1.With("module", moduleName[:19])) - defer func() { for _, r := range reactorPairs { _ = r.reactor.Stop() @@ -172,11 +168,10 @@ func TestNoBlockResponse(t *testing.T) { } for { - if reactorPairs[1].reactor.fsm.IsFinished() { + time.Sleep(1 * time.Second) + if reactorPairs[1].reactor.fsm.isCaughtUp() { break } - - time.Sleep(10 * time.Millisecond) } assert.Equal(t, maxBlockHeight, reactorPairs[0].reactor.store.Height()) @@ -196,14 +191,17 @@ func TestNoBlockResponse(t *testing.T) { // or without significant refactoring of the module. // Alternatively we could actually dial a TCP conn but // that seems extreme. -func TestBadBlockStopsPeer(t *testing.T) { +func TestFastSyncBadBlockStopsPeer(t *testing.T) { peerTimeout = 15 * time.Second - maxRequestBatchSize = 40 + + maxRequestsPerPeer = 20 + maxNumPendingRequests = 20 + config = cfg.ResetTestRoot("blockchain_reactor_test") defer os.RemoveAll(config.RootDir) genDoc, privVals := randGenesisDoc(1, false, 30) - maxBlockHeight := int64(500) + maxBlockHeight := int64(148) otherChain := newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight) defer func() { @@ -226,16 +224,12 @@ func TestBadBlockStopsPeer(t *testing.T) { switches := p2p.MakeConnectedSwitches(config.P2P, 4, func(i int, s *p2p.Switch) *p2p.Switch { s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor) + moduleName := fmt.Sprintf("blockchain-%v", i) + reactorPairs[i].reactor.SetLogger(logger[i].With("module", moduleName)) return s }, p2p.Connect2Switches) - for i := 0; i < 4; i++ { - addr := reactorPairs[i].reactor.Switch.NodeInfo().ID() - moduleName := fmt.Sprintf("blockchain-%v", addr) - reactorPairs[i].reactor.SetLogger(logger[i].With("module", moduleName[:19])) - } - defer func() { for _, r := range reactorPairs { _ = r.reactor.Stop() @@ -244,11 +238,10 @@ func TestBadBlockStopsPeer(t *testing.T) { }() for { - if reactorPairs[3].reactor.fsm.IsFinished() || reactorPairs[3].reactor.Switch.Peers().Size() == 0 { + time.Sleep(1 * time.Second) + if reactorPairs[3].reactor.fsm.isCaughtUp() || reactorPairs[3].reactor.Switch.Peers().Size() == 0 { break } - - time.Sleep(1 * time.Second) } //at this time, reactors[0-3] is the newest @@ -263,24 +256,21 @@ func TestBadBlockStopsPeer(t *testing.T) { switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch) *p2p.Switch { s.AddReactor("BLOCKCHAIN", reactorPairs[len(reactorPairs)-1].reactor) + moduleName := fmt.Sprintf("blockchain-%v", len(reactorPairs)-1) + reactorPairs[len(reactorPairs)-1].reactor.SetLogger(lastLogger.With("module", moduleName)) return s }, p2p.Connect2Switches)...) - addr := lastReactorPair.reactor.Switch.NodeInfo().ID() - moduleName := fmt.Sprintf("blockchain-%v", addr) - lastReactorPair.reactor.SetLogger(lastLogger.With("module", moduleName[:19])) - for i := 0; i < len(reactorPairs)-1; i++ { p2p.Connect2Switches(switches, i, len(reactorPairs)-1) } for { - if lastReactorPair.reactor.fsm.IsFinished() || lastReactorPair.reactor.Switch.Peers().Size() == 0 { + time.Sleep(1 * time.Second) + if lastReactorPair.reactor.fsm.isCaughtUp() || lastReactorPair.reactor.Switch.Peers().Size() == 0 { break } - - time.Sleep(1 * time.Second) } assert.True(t, lastReactorPair.reactor.Switch.Peers().Size() < len(reactorPairs)-1) @@ -307,29 +297,32 @@ func setupReactors( switches := p2p.MakeConnectedSwitches(config.P2P, numReactors, func(i int, s *p2p.Switch) *p2p.Switch { s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor) + moduleName := fmt.Sprintf("blockchain-%v", i) + reactorPairs[i].reactor.SetLogger(logger[i].With("module", moduleName)) return s }, p2p.Connect2Switches) - for i := 0; i < numReactors; i++ { - addr := reactorPairs[i].reactor.Switch.NodeInfo().ID() - moduleName := fmt.Sprintf("blockchain-%v", addr) - reactorPairs[i].reactor.SetLogger(logger[i].With("module", moduleName[:19])) - } - return reactorPairs, switches } +// WIP - used for some scale testing, will remove func TestFastSyncMultiNode(t *testing.T) { + peerTimeout = 15 * time.Second numNodes := 8 maxHeight := int64(1000) - peerTimeout = 15 * time.Second - maxRequestBatchSize = 80 + //numNodes := 20 + //maxHeight := int64(10000) + + maxRequestsPerPeer = 40 + maxNumPendingRequests = 500 config = cfg.ResetTestRoot("blockchain_reactor_test") genDoc, privVals := randGenesisDoc(1, false, 30) + start := time.Now() + reactorPairs, switches := setupReactors(numNodes, maxHeight, genDoc, privVals) defer func() { @@ -339,16 +332,25 @@ func TestFastSyncMultiNode(t *testing.T) { } }() +outerFor: for { - if reactorPairs[numNodes-1].reactor.fsm.IsFinished() || reactorPairs[numNodes-1].reactor.Switch.Peers().Size() == 0 { - break + i := 0 + for i < numNodes { + if !reactorPairs[i].reactor.fsm.isCaughtUp() { + break + } + i++ + } + if i == numNodes { + fmt.Println("SETUP FAST SYNC Duration", time.Since(start)) + break outerFor + } else { + time.Sleep(1 * time.Second) } - - time.Sleep(1 * time.Second) } //at this time, reactors[0-3] are the newest - assert.Equal(t, numNodes-1, reactorPairs[1].reactor.Switch.Peers().Size()) + assert.Equal(t, numNodes-1, reactorPairs[0].reactor.Switch.Peers().Size()) lastLogger := log.TestingLogger() lastReactorPair := newBlockchainReactor(lastLogger, genDoc, privVals, 0) @@ -356,29 +358,26 @@ func TestFastSyncMultiNode(t *testing.T) { switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch) *p2p.Switch { s.AddReactor("BLOCKCHAIN", reactorPairs[len(reactorPairs)-1].reactor) + moduleName := fmt.Sprintf("blockchainTEST-%d", len(reactorPairs)-1) + reactorPairs[len(reactorPairs)-1].reactor.SetLogger(lastLogger.With("module", moduleName)) return s }, p2p.Connect2Switches)...) - addr := lastReactorPair.reactor.Switch.NodeInfo().ID() - moduleName := fmt.Sprintf("blockchain-%v", addr) - lastReactorPair.reactor.SetLogger(lastLogger.With("module", moduleName[:19])) - - start := time.Now() + start = time.Now() for i := 0; i < len(reactorPairs)-1; i++ { p2p.Connect2Switches(switches, i, len(reactorPairs)-1) } for { - if lastReactorPair.reactor.fsm.IsFinished() || lastReactorPair.reactor.Switch.Peers().Size() == 0 { + time.Sleep(1 * time.Second) + if lastReactorPair.reactor.fsm.isCaughtUp() { + fmt.Println("FAST SYNC Duration", time.Since(start)) break } - - time.Sleep(1 * time.Second) } - fmt.Println(time.Since(start)) assert.True(t, lastReactorPair.reactor.Switch.Peers().Size() < len(reactorPairs)) assert.Equal(t, lastReactorPair.reactor.fsm.pool.getMaxPeerHeight(), lastReactorPair.reactor.fsm.pool.height) }