Changes to the design:

added block request tracking under each peer

moved the request trigger into the reactor's poolRoutine; requests are now triggered by a ticker

in general, moved everything needed to make block requests smarter into the poolRoutine

added a simple map of heights to keep track of what will need to be requested next

added a few more tests
Anca Zamfir 2019-04-04 00:13:32 +02:00
parent 70c703860e
commit 4fcd7b86ac
9 changed files with 811 additions and 741 deletions
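For orientation, here is a minimal sketch of the new bookkeeping the commit message describes: block requests are tracked under each peer, and a simple map of heights tracks what still needs to be requested. This is an assumed simplification, not code from this commit; the field names follow the new types in the diff below, but the real types carry extra state such as flow-rate monitors and timers.

package blockchain_new

import (
	"github.com/tendermint/tendermint/p2p"
	"github.com/tendermint/tendermint/types"
)

// peerSketch: block requests tracked under the peer. A nil map value means
// the request was sent but the block has not been received yet.
type peerSketch struct {
	height     int64
	numPending int32
	blocks     map[int64]*types.Block
}

// poolSketch: which peer each height was assigned to, plus the set of
// heights still waiting to be assigned a peer for a blockRequest.
type poolSketch struct {
	peers             map[p2p.ID]*peerSketch
	blocks            map[int64]p2p.ID
	requests          map[int64]bool
	nextRequestHeight int64
}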

View File

@ -125,7 +125,7 @@ func newBlockchainReactor(logger log.Logger, genDoc *types.GenesisDoc, privVals
return BlockchainReactorPair{bcReactor, proxyApp}
}
func TestNoBlockResponse(t *testing.T) {
func TestFastSyncNoBlockResponse(t *testing.T) {
config = cfg.ResetTestRoot("blockchain_reactor_test")
defer os.RemoveAll(config.RootDir)
genDoc, privVals := randGenesisDoc(1, false, 30)
@ -185,12 +185,12 @@ func TestNoBlockResponse(t *testing.T) {
// or without significant refactoring of the module.
// Alternatively we could actually dial a TCP conn but
// that seems extreme.
func TestBadBlockStopsPeer(t *testing.T) {
func TestFastSyncBadBlockStopsPeer(t *testing.T) {
config = cfg.ResetTestRoot("blockchain_reactor_test")
defer os.RemoveAll(config.RootDir)
genDoc, privVals := randGenesisDoc(1, false, 30)
maxBlockHeight := int64(500)
maxBlockHeight := int64(148)
otherChain := newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight)
defer func() {
@ -254,6 +254,8 @@ func TestBadBlockStopsPeer(t *testing.T) {
}
assert.True(t, lastReactorPair.reactor.Switch.Peers().Size() < len(reactorPairs)-1)
assert.Equal(t, lastReactorPair.reactor.pool.maxPeerHeight, lastReactorPair.reactor.pool.height)
}
func setupReactors(
@ -294,9 +296,14 @@ func TestFastSyncMultiNode(t *testing.T) {
numNodes := 8
maxHeight := int64(1000)
//numNodes := 20
//maxHeight := int64(10000)
config = cfg.ResetTestRoot("blockchain_reactor_test")
genDoc, privVals := randGenesisDoc(1, false, 30)
start := time.Now()
reactorPairs, switches := setupReactors(numNodes, maxHeight, genDoc, privVals)
defer func() {
@ -306,23 +313,30 @@ func TestFastSyncMultiNode(t *testing.T) {
}
}()
outerFor:
for {
if reactorPairs[numNodes-1].reactor.pool.IsCaughtUp() || reactorPairs[numNodes-1].reactor.Switch.Peers().Size() == 0 {
break
i := 0
for i < numNodes {
if !reactorPairs[i].reactor.pool.IsCaughtUp() {
break
}
i++
}
if i == numNodes {
fmt.Println("SETUP FAST SYNC Duration", time.Since(start))
break outerFor
} else {
time.Sleep(1 * time.Second)
}
time.Sleep(1 * time.Second)
}
//at this time, reactors[0-3] are the newest
assert.Equal(t, numNodes-1, reactorPairs[1].reactor.Switch.Peers().Size())
assert.Equal(t, numNodes-1, reactorPairs[0].reactor.Switch.Peers().Size())
lastLogger := log.TestingLogger()
lastReactorPair := newBlockchainReactor(lastLogger, genDoc, privVals, 0)
reactorPairs = append(reactorPairs, lastReactorPair)
start := time.Now()
switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch) *p2p.Switch {
s.AddReactor("BLOCKCHAIN", reactorPairs[len(reactorPairs)-1].reactor)
return s
@ -333,20 +347,23 @@ func TestFastSyncMultiNode(t *testing.T) {
moduleName := fmt.Sprintf("blockchain-%v", addr)
lastReactorPair.reactor.SetLogger(lastLogger.With("module", moduleName[:19]))
start = time.Now()
for i := 0; i < len(reactorPairs)-1; i++ {
p2p.Connect2Switches(switches, i, len(reactorPairs)-1)
}
for {
if lastReactorPair.reactor.pool.IsCaughtUp() || lastReactorPair.reactor.Switch.Peers().Size() == 0 {
time.Sleep(1 * time.Second)
if lastReactorPair.reactor.pool.IsCaughtUp() {
fmt.Println("FAST SYNC Duration", time.Since(start))
break
}
time.Sleep(1 * time.Second)
}
fmt.Println(time.Since(start))
assert.True(t, lastReactorPair.reactor.Switch.Peers().Size() < len(reactorPairs))
assert.Equal(t, lastReactorPair.reactor.pool.maxPeerHeight, lastReactorPair.reactor.pool.height)
}
//----------------------------------------------

View File

@ -8,6 +8,7 @@ import (
flow "github.com/tendermint/tendermint/libs/flowrate"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/types"
)
//--------
@ -32,8 +33,9 @@ type bpPeer struct {
id p2p.ID
recvMonitor *flow.Monitor
height int64
numPending int32
height int64 // the peer reported height
numPending int32 // number of requests pending assignment or block response
blocks map[int64]*types.Block // blocks received or waiting to be received from this peer
timeout *time.Timer
didTimeout bool
@ -46,6 +48,7 @@ func newBPPeer(
peer := &bpPeer{
id: peerID,
height: height,
blocks: make(map[int64]*types.Block, maxRequestsPerPeer),
numPending: 0,
logger: log.NewNopLogger(),
errFunc: errFunc,

View File

@ -43,7 +43,6 @@ func checkByStoppingPeerTimer(t *testing.T, peer *bpPeer, running bool) {
}
func TestPeerResetMonitor(t *testing.T) {
peer := &bpPeer{
id: p2p.ID(cmn.RandStr(12)),
height: 10,
@ -198,7 +197,6 @@ func TestCanBeRemovedDueToLowSpeed(t *testing.T) {
}
func TestCleanupPeer(t *testing.T) {
var mtx sync.Mutex
peer := &bpPeer{
id: p2p.ID(cmn.RandStr(12)),

View File

@ -2,6 +2,7 @@ package blockchain_new
import (
"fmt"
"sort"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/p2p"
@ -9,8 +10,8 @@ import (
)
type blockData struct {
block *types.Block
peerId p2p.ID
block *types.Block
peer *bpPeer
}
func (bd *blockData) String() string {
@ -18,26 +19,37 @@ func (bd *blockData) String() string {
return fmt.Sprintf("blockData nil")
}
if bd.block == nil {
return fmt.Sprintf("block: nil peer: %v", bd.peerId)
if bd.peer == nil {
return fmt.Sprintf("block: nil peer: nil")
}
return fmt.Sprintf("block: nil peer: %v", bd.peer.id)
}
return fmt.Sprintf("block: %v peer: %v", bd.block.Height, bd.peerId)
return fmt.Sprintf("block: %v peer: %v", bd.block.Height, bd.peer.id)
}
type blockPool struct {
logger log.Logger
peers map[p2p.ID]*bpPeer
blocks map[int64]*blockData
logger log.Logger
peers map[p2p.ID]*bpPeer
blocks map[int64]p2p.ID
requests map[int64]bool // list of blocks to be assigned peers for blockRequest
nextRequestHeight int64 // next request to be added to requests
height int64 // processing height
maxPeerHeight int64
maxPeerHeight int64 // maximum height of all peers
numPending int32 // total numPending across peers
toBcR bcRMessageInterface
}
func newBlockPool(height int64) *blockPool {
func newBlockPool(height int64, toBcR bcRMessageInterface) *blockPool {
return &blockPool{
peers: make(map[p2p.ID]*bpPeer),
maxPeerHeight: 0,
blocks: make(map[int64]*blockData),
height: height,
peers: make(map[p2p.ID]*bpPeer),
maxPeerHeight: 0,
blocks: make(map[int64]p2p.ID),
requests: make(map[int64]bool),
nextRequestHeight: height,
height: height,
toBcR: toBcR,
}
}
@ -53,19 +65,40 @@ func (pool *blockPool) setLogger(l log.Logger) {
pool.logger = l
}
// getStatus returns the pool's height, the number of requests pending a
// response, and the maximum reported peer height.
func (pool *blockPool) getStatus() (height int64, numPending int32, maxPeerHeight int64) {
return pool.height, pool.numPending, pool.maxPeerHeight
}
func (pool blockPool) getMaxPeerHeight() int64 {
return pool.maxPeerHeight
}
func (pool *blockPool) reachedMaxHeight() bool {
return pool.height >= pool.maxPeerHeight
return pool.maxPeerHeight == 0 || pool.height >= pool.maxPeerHeight
}
func (pool *blockPool) rescheduleRequest(peerID p2p.ID, height int64) {
pool.requests[height] = true
delete(pool.blocks, height)
delete(pool.peers[peerID].blocks, height)
}
// Updates the pool's max height. If no peers are left maxPeerHeight is set to 0.
func (pool *blockPool) updateMaxPeerHeight() {
var max int64
for _, peer := range pool.peers {
if peer.height > max {
max = peer.height
}
}
pool.maxPeerHeight = max
}
// Adds a new peer or updates an existing peer with a new height.
// If the peer is too short it is removed
// Should change function name??
func (pool *blockPool) updatePeer(peerID p2p.ID, height int64, errFunc func(err error, peerID p2p.ID)) error {
// If the peer is too short it is removed.
func (pool *blockPool) updatePeer(peerID p2p.ID, height int64) error {
peer := pool.peers[peerID]
if height < pool.height {
@ -80,88 +113,69 @@ func (pool *blockPool) updatePeer(peerID p2p.ID, height int64, errFunc func(err
}
if peer == nil {
peer = newBPPeer(peerID, height, errFunc)
// Add new peer.
peer = newBPPeer(peerID, height, pool.toBcR.sendPeerError)
peer.setLogger(pool.logger.With("peer", peerID))
pool.peers[peerID] = peer
} else {
// remove any requests made for heights in (height, peer.height]
for blockHeight, bData := range pool.blocks {
if bData.peerId == peerID && blockHeight > height {
delete(pool.blocks, blockHeight)
// Update existing peer.
// Remove any requests made for heights in (height, peer.height].
for h, block := range pool.peers[peerID].blocks {
if h <= height {
continue
}
// Reschedule the requests for all blocks waiting for the peer, or received and not processed yet.
if block == nil {
// Since block was not yet received it is counted in numPending, decrement.
pool.numPending--
pool.peers[peerID].numPending--
}
pool.rescheduleRequest(peerID, h)
}
peer.height = height
}
oldH := peer.height
pool.logger.Info("setting peer height to", "peer", peerID, "height", height)
peer.height = height
if oldH == pool.maxPeerHeight && height < pool.maxPeerHeight {
// peer was at max height; update the max if no other peer is as high
pool.updateMaxPeerHeight()
}
if height > pool.maxPeerHeight {
// peer increased height over maxPeerHeight
pool.maxPeerHeight = height
pool.logger.Info("setting maxPeerHeight", "max", pool.maxPeerHeight)
}
pool.updateMaxPeerHeight()
return nil
}
// If no peers are left, maxPeerHeight is set to 0.
func (pool *blockPool) updateMaxPeerHeight() {
var max int64
for _, peer := range pool.peers {
if peer.height > max {
max = peer.height
// Stops the peer timer and deletes the peer. Recomputes the max peer height.
func (pool *blockPool) deletePeer(peerID p2p.ID) {
if p, ok := pool.peers[peerID]; ok {
if p.timeout != nil {
p.timeout.Stop()
}
delete(pool.peers, peerID)
if p.height == pool.maxPeerHeight {
pool.updateMaxPeerHeight()
}
}
pool.maxPeerHeight = max
}
// Stops the peer timer and deletes the peer. Recomputes the max peer height
func (pool *blockPool) deletePeer(peerID p2p.ID) {
p, exist := pool.peers[peerID]
if !exist {
// Removes any blocks and requests associated with the peer and deletes the peer.
// Also triggers new requests if blocks have been removed.
func (pool *blockPool) removePeer(peerID p2p.ID, err error) {
peer := pool.peers[peerID]
if peer == nil {
return
}
if p.timeout != nil {
p.timeout.Stop()
}
delete(pool.peers, peerID)
if p.height == pool.maxPeerHeight {
pool.updateMaxPeerHeight()
}
}
// removes any blocks and requests associated with the peer, deletes the peer and informs the switch if needed.
func (pool *blockPool) removePeer(peerID p2p.ID, err error) {
pool.logger.Debug("removePeer", "peer", peerID, "err", err)
// remove all data for blocks waiting for the peer or not processed yet
for h, bData := range pool.blocks {
if bData.peerId == peerID {
if h == pool.height {
}
delete(pool.blocks, h)
// Reschedule the requests for all blocks waiting for the peer, or received and not processed yet.
for h, block := range pool.peers[peerID].blocks {
if block == nil {
pool.numPending--
}
pool.rescheduleRequest(peerID, h)
}
// delete peer
pool.deletePeer(peerID)
}
// called every time FSM advances its height
// Called every time FSM advances its height.
func (pool *blockPool) removeShortPeers() {
for _, peer := range pool.peers {
if peer.height < pool.height {
pool.logger.Info("removeShortPeers", "peer", peer.id)
pool.removePeer(peer.id, nil)
}
}
@ -169,138 +183,150 @@ func (pool *blockPool) removeShortPeers() {
// Validates that the block comes from the peer it was expected from and stores it in the 'blocks' map.
func (pool *blockPool) addBlock(peerID p2p.ID, block *types.Block, blockSize int) error {
blockData := pool.blocks[block.Height]
if blockData == nil {
pool.logger.Error("peer sent us a block we didn't expect", "peer", peerID, "curHeight", pool.height, "blockHeight", block.Height)
if _, ok := pool.peers[peerID]; !ok {
pool.logger.Error("peer doesn't exist", "peer", peerID, "block_receieved", block.Height)
return errBadDataFromPeer
}
if blockData.peerId != peerID {
pool.logger.Error("invalid peer", "peer", peerID, "blockHeight", block.Height)
b, ok := pool.peers[peerID].blocks[block.Height]
if !ok {
pool.logger.Error("peer sent us a block we didn't expect", "peer", peerID, "blockHeight", block.Height)
if expPeerID, pok := pool.blocks[block.Height]; pok {
pool.logger.Error("expected this block from peer", "peer", expPeerID)
}
return errBadDataFromPeer
}
if blockData.block != nil {
if b != nil {
pool.logger.Error("already have a block for height", "height", block.Height)
return errBadDataFromPeer
}
pool.blocks[block.Height].block = block
peer := pool.peers[peerID]
if peer != nil {
peer.decrPending(blockSize)
}
pool.peers[peerID].blocks[block.Height] = block
pool.blocks[block.Height] = peerID
pool.numPending--
pool.peers[peerID].decrPending(blockSize)
pool.logger.Debug("added new block", "height", block.Height, "from_peer", peerID, "total", len(pool.blocks))
return nil
}
func (pool *blockPool) getBlockAndPeerAtHeight(height int64) (bData *blockData, err error) {
peerID := pool.blocks[height]
peer := pool.peers[peerID]
if peer == nil {
return &blockData{}, errMissingBlocks
}
block, ok := peer.blocks[height]
if !ok || block == nil {
return &blockData{}, errMissingBlocks
}
return &blockData{peer: peer, block: block}, nil
}
func (pool *blockPool) getNextTwoBlocks() (first, second *blockData, err error) {
var block1, block2 *types.Block
if first = pool.blocks[pool.height]; first != nil {
block1 = first.block
}
if second = pool.blocks[pool.height+1]; second != nil {
block2 = second.block
first, err = pool.getBlockAndPeerAtHeight(pool.height)
second, err2 := pool.getBlockAndPeerAtHeight(pool.height + 1)
if err == nil {
err = err2
}
if block1 == nil || block2 == nil {
if err == errMissingBlocks {
// We need both to sync the first block.
pool.logger.Debug("process blocks doesn't have the blocks", "first", block1, "second", block2)
err = errMissingBlocks
pool.logger.Error("missing first two blocks from height", "height", pool.height)
}
return
}
// remove peers that sent us the first two blocks, blocks will also be removed by removePeer()
// Remove the peers that sent us the first two blocks; the blocks themselves are also removed by removePeer().
func (pool *blockPool) invalidateFirstTwoBlocks(err error) {
if first, ok := pool.blocks[pool.height]; ok {
pool.removePeer(first.peerId, err)
first, err1 := pool.getBlockAndPeerAtHeight(pool.height)
second, err2 := pool.getBlockAndPeerAtHeight(pool.height + 1)
if err1 == nil {
pool.removePeer(first.peer.id, err)
}
if second, ok := pool.blocks[pool.height+1]; ok {
pool.removePeer(second.peerId, err)
if err2 == nil {
pool.removePeer(second.peer.id, err)
}
}
func (pool *blockPool) processedCurrentHeightBlock() {
peerID := pool.blocks[pool.height]
delete(pool.peers[peerID].blocks, pool.height)
delete(pool.blocks, pool.height)
pool.height++
pool.removeShortPeers()
}
// WIP
// TODO - pace the requests to peers
func (pool *blockPool) sendRequestBatch(sendFunc func(peerID p2p.ID, height int64) error) error {
if len(pool.blocks) > 30 {
return nil
func (pool *blockPool) makeRequestBatch() []int {
pool.removeUselessPeers()
// If running low on planned requests, make more.
for height := pool.nextRequestHeight; pool.numPending < int32(maxNumPendingRequests); height++ {
if pool.nextRequestHeight > pool.maxPeerHeight {
break
}
pool.requests[height] = true
pool.nextRequestHeight++
}
// remove slow and timed out peers
heights := make([]int, 0, len(pool.requests))
for k := range pool.requests {
heights = append(heights, int(k))
}
sort.Ints(heights)
return heights
}
func (pool *blockPool) removeUselessPeers() {
pool.removeShortPeers()
for _, peer := range pool.peers {
if err := peer.isGood(); err != nil {
pool.logger.Info("Removing bad peer", "peer", peer.id, "err", err)
pool.removePeer(peer.id, err)
if err == errSlowPeer {
peer.errFunc(errSlowPeer, peer.id)
}
}
}
var err error
// make requests
for i := 0; i < maxRequestBatchSize; i++ {
// request height
height := pool.height + int64(i)
if height > pool.maxPeerHeight {
pool.logger.Debug("Will not send request for", "height", height, "max", pool.maxPeerHeight)
return err
}
req := pool.blocks[height]
if req == nil {
// make new request
peerId, err := pool.getBestPeer(height)
if err != nil {
// couldn't find a good peer or couldn't communicate with it
continue
}
pool.logger.Debug("Try to send request to peer", "peer", peerId, "height", height)
err = sendFunc(peerId, height)
if err == errSendQueueFull {
pool.logger.Error("cannot send request, queue full", "peer", peerId, "height", height)
continue
}
if err == errNilPeerForBlockRequest {
// this peer does not exist in the switch, delete locally
pool.logger.Error("peer doesn't exist in the switch", "peer", peerId)
pool.removePeer(peerId, errNilPeerForBlockRequest)
continue
}
pool.logger.Debug("Sent request to peer", "peer", peerId, "height", height)
}
}
return nil
}
func (pool *blockPool) getBestPeer(height int64) (peerId p2p.ID, err error) {
// make requests
// TODO - sort peers in order of goodness
pool.logger.Debug("try to find peer for", "height", height)
func (pool *blockPool) makeNextRequests(maxNumPendingRequests int32) (err error) {
pool.removeUselessPeers()
heights := pool.makeRequestBatch()
for _, height := range heights {
h := int64(height)
if pool.numPending >= int32(len(pool.peers))*maxRequestsPerPeer {
break
}
if err = pool.sendRequest(h); err == errNoPeerFoundForHeight {
break
}
delete(pool.requests, h)
}
return err
}
func (pool *blockPool) sendRequest(height int64) error {
for _, peer := range pool.peers {
// Send Block Request message to peer
if peer.numPending >= int32(maxRequestsPerPeer) {
continue
}
if peer.height < height {
continue
}
if peer.numPending > int32(maxRequestBatchSize/len(pool.peers)) {
continue
}
// reserve space for block
pool.blocks[height] = &blockData{peerId: peer.id, block: nil}
pool.peers[peer.id].incrPending()
return peer.id, nil
}
pool.logger.Debug("assign request to peer", "peer", peer.id, "height", height)
_ = pool.toBcR.sendBlockRequest(peer.id, height)
pool.logger.Debug("List of peers", "peers", pool.peers)
return "", errNoPeerFoundForRequest
pool.blocks[height] = peer.id
pool.numPending++
peer.blocks[height] = nil
peer.incrPending()
return nil
}
pool.logger.Error("could not find peer to send request for block at height", "height", height)
return errNoPeerFoundForHeight
}
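Taken together, makeNextRequests above schedules work in two phases: makeRequestBatch tops up the requests set until numPending reaches the cap and returns the heights in ascending order, then sendRequest assigns each height to the first peer that is tall enough and still has request capacity. Below is a self-contained sketch of that assignment loop, under assumptions: plain structs, no error handling, and hypothetical names (assignHeights, sketchPeer) that do not appear in this commit.

package main

import (
	"fmt"
	"sort"
)

const maxRequestsPerPeer = 2 // mirrors the reactor-level knob

type sketchPeer struct {
	id         string
	height     int64
	numPending int32
}

// assignHeights walks the wanted heights in ascending order and assigns each
// to the first peer that can serve it and still has spare request capacity.
func assignHeights(heights []int64, peers []*sketchPeer) map[int64]string {
	sort.Slice(heights, func(i, j int) bool { return heights[i] < heights[j] })
	assigned := make(map[int64]string)
	for _, h := range heights {
		for _, p := range peers {
			if p.height < h || p.numPending >= maxRequestsPerPeer {
				continue
			}
			p.numPending++
			assigned[h] = p.id
			break
		}
	}
	return assigned
}

func main() {
	peers := []*sketchPeer{{id: "P1", height: 100}, {id: "P2", height: 100}}
	// Matches the expectations in TestBlockPoolSendRequestBatch below:
	// P1 gets heights 10 and 11, P2 gets 12 and 13.
	fmt.Println(assignHeights([]int64{10, 11, 12, 13}, peers))
}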

View File

@ -1,9 +1,11 @@
package blockchain_new
import (
"github.com/stretchr/testify/assert"
"reflect"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/p2p"
@ -13,7 +15,7 @@ import (
type fields struct {
logger log.Logger
peers map[p2p.ID]*bpPeer
blocks map[int64]*blockData
blocks map[int64]p2p.ID
height int64
maxPeerHeight int64
}
@ -23,15 +25,55 @@ type testPeer struct {
height int64
}
func testErrFunc(err error, peerID p2p.ID) {
type testPeerResult struct {
id p2p.ID
height int64
numPending int32
blocks map[int64]*types.Block
}
func makeUpdateFields(log log.Logger, height int64, peers []testPeer, generateBlocks bool) fields {
ufields := fields{
logger: log,
type testBcR struct {
logger log.Logger
}
type testValues struct {
numRequestsSent int32
}
var testResults testValues
func resetPoolTestResults() {
testResults.numRequestsSent = 0
}
func (testR *testBcR) sendPeerError(err error, peerID p2p.ID) {
}
func (testR *testBcR) sendStatusRequest() {
}
func (testR *testBcR) sendBlockRequest(peerID p2p.ID, height int64) error {
testResults.numRequestsSent++
return nil
}
func (testR *testBcR) resetStateTimer(name string, timer *time.Timer, timeout time.Duration, f func()) {
}
func (testR *testBcR) switchToConsensus() {
}
func newTestBcR() *testBcR {
testBcR := &testBcR{logger: log.TestingLogger()}
return testBcR
}
func makeUpdateFields(bcr *testBcR, height int64, peers []bpPeer, generateBlocks bool) fields {
uFields := fields{
logger: bcr.logger,
height: height,
peers: make(map[p2p.ID]*bpPeer),
blocks: make(map[int64]*blockData),
blocks: make(map[int64]p2p.ID),
}
var maxH int64
@ -39,31 +81,25 @@ func makeUpdateFields(log log.Logger, height int64, peers []testPeer, generateBl
if p.height > maxH {
maxH = p.height
}
ufields.peers[p.id] = newBPPeer(p.id, p.height, testErrFunc)
uFields.peers[p.id] = newBPPeer(p.id, p.height, bcr.sendPeerError)
uFields.peers[p.id].setLogger(bcr.logger)
}
ufields.maxPeerHeight = maxH
return ufields
uFields.maxPeerHeight = maxH
return uFields
}
func poolCopy(pool *blockPool) *blockPool {
return &blockPool{
peers: peersCopy(pool.peers),
logger: pool.logger,
blocks: blocksCopy(pool.blocks),
blocks: pool.blocks,
height: pool.height,
maxPeerHeight: pool.maxPeerHeight,
toBcR: pool.toBcR,
}
}
func blocksCopy(blocks map[int64]*blockData) map[int64]*blockData {
blockCopy := make(map[int64]*blockData)
for _, b := range blocks {
blockCopy[b.block.Height] = &blockData{peerId: b.peerId, block: b.block}
}
return blockCopy
}
func peersCopy(peers map[p2p.ID]*bpPeer) map[p2p.ID]*bpPeer {
peerCopy := make(map[p2p.ID]*bpPeer)
for _, p := range peers {
@ -72,18 +108,13 @@ func peersCopy(peers map[p2p.ID]*bpPeer) map[p2p.ID]*bpPeer {
return peerCopy
}
func TestBlockPoolUpdatePeer(t *testing.T) {
l := log.TestingLogger()
type args struct {
peerID p2p.ID
height int64
errFunc func(err error, peerID p2p.ID)
}
func TestBlockPoolUpdateEmptyPeer(t *testing.T) {
testBcR := newTestBcR()
tests := []struct {
name string
fields fields
args args
args testPeer
errWanted error
addWanted bool
delWanted bool
@ -92,34 +123,34 @@ func TestBlockPoolUpdatePeer(t *testing.T) {
{
name: "add a first short peer",
fields: makeUpdateFields(l, 100, []testPeer{}, false),
args: args{"P1", 50, func(err error, peerId p2p.ID) {}},
fields: makeUpdateFields(testBcR, 100, []bpPeer{}, false),
args: testPeer{"P1", 50},
errWanted: errPeerTooShort,
maxHeightWanted: int64(0),
},
{
name: "add a first good peer",
fields: makeUpdateFields(l, 100, []testPeer{}, false),
args: args{"P1", 101, func(err error, peerId p2p.ID) {}},
fields: makeUpdateFields(testBcR, 100, []bpPeer{}, false),
args: testPeer{"P1", 101},
addWanted: true,
maxHeightWanted: int64(101),
},
{
name: "increase the height of P1 from 120 to 123",
fields: makeUpdateFields(l, 100, []testPeer{{"P1", 120}}, false),
args: args{"P1", 123, func(err error, peerId p2p.ID) {}},
fields: makeUpdateFields(testBcR, 100, []bpPeer{{id: "P1", height: 120}}, false),
args: testPeer{"P1", 123},
maxHeightWanted: int64(123),
},
{
name: "decrease the height of P1 from 120 to 110",
fields: makeUpdateFields(l, 100, []testPeer{{"P1", 120}}, false),
args: args{"P1", 110, func(err error, peerId p2p.ID) {}},
fields: makeUpdateFields(testBcR, 100, []bpPeer{{id: "P1", height: 120}}, false),
args: testPeer{"P1", 110},
maxHeightWanted: int64(110),
},
{
name: "decrease the height of P1 from 120 to 90",
fields: makeUpdateFields(l, 100, []testPeer{{"P1", 120}}, false),
args: args{"P1", 90, func(err error, peerId p2p.ID) {}},
fields: makeUpdateFields(testBcR, 100, []bpPeer{{id: "P1", height: 120}}, false),
args: testPeer{"P1", 90},
delWanted: true,
errWanted: errPeerTooShort,
maxHeightWanted: int64(0),
@ -134,10 +165,11 @@ func TestBlockPoolUpdatePeer(t *testing.T) {
blocks: tt.fields.blocks,
height: tt.fields.height,
maxPeerHeight: tt.fields.maxPeerHeight,
toBcR: testBcR,
}
beforePool := poolCopy(pool)
err := pool.updatePeer(tt.args.peerID, tt.args.height, tt.args.errFunc)
err := pool.updatePeer(tt.args.id, tt.args.height)
if err != tt.errWanted {
t.Errorf("blockPool.updatePeer() error = %v, wantErr %v", err, tt.errWanted)
}
@ -161,21 +193,21 @@ func TestBlockPoolUpdatePeer(t *testing.T) {
}
// both add and update
assert.Equal(t, pool.peers[tt.args.peerID].height, tt.args.height)
assert.Equal(t, pool.peers[tt.args.id].height, tt.args.height)
assert.Equal(t, tt.maxHeightWanted, pool.maxPeerHeight)
})
}
}
func TestBlockPoolRemovePeer(t *testing.T) {
func TestBlockPoolRemoveEmptyPeer(t *testing.T) {
testBcR := newTestBcR()
type args struct {
peerID p2p.ID
err error
}
l := log.TestingLogger()
tests := []struct {
name string
fields fields
@ -184,25 +216,25 @@ func TestBlockPoolRemovePeer(t *testing.T) {
}{
{
name: "attempt to delete non-existing peer",
fields: makeUpdateFields(l, 100, []testPeer{{"P1", 120}}, false),
fields: makeUpdateFields(testBcR, 100, []bpPeer{{id: "P1", height: 120}}, false),
args: args{"P99", nil},
maxHeightWanted: int64(120),
},
{
name: "delete the only peer",
fields: makeUpdateFields(l, 100, []testPeer{{"P1", 120}}, false),
fields: makeUpdateFields(testBcR, 100, []bpPeer{{id: "P1", height: 120}}, false),
args: args{"P1", nil},
maxHeightWanted: int64(0),
},
{
name: "delete shorter of two peers",
fields: makeUpdateFields(l, 100, []testPeer{{"P1", 100}, {"P2", 120}}, false),
name: "delete the shortest of two peers",
fields: makeUpdateFields(testBcR, 100, []bpPeer{{id: "P1", height: 100}, {id: "P2", height: 120}}, false),
args: args{"P1", nil},
maxHeightWanted: int64(120),
},
{
name: "delete taller of two peers",
fields: makeUpdateFields(l, 100, []testPeer{{"P1", 100}, {"P2", 120}}, false),
name: "delete the tallest of two peers",
fields: makeUpdateFields(testBcR, 100, []bpPeer{{id: "P1", height: 100}, {id: "P2", height: 120}}, false),
args: args{"P2", nil},
maxHeightWanted: int64(100),
},
@ -225,9 +257,8 @@ func TestBlockPoolRemovePeer(t *testing.T) {
}
}
func TestBlockPoolRemoveShortPeers(t *testing.T) {
l := log.TestingLogger()
func TestBlockPoolRemoveShortEmptyPeers(t *testing.T) {
testBcR := newTestBcR()
tests := []struct {
name string
@ -237,23 +268,23 @@ func TestBlockPoolRemoveShortPeers(t *testing.T) {
}{
{
name: "no short peers",
fields: makeUpdateFields(l, 100,
[]testPeer{{"P1", 100}, {"P2", 110}, {"P3", 120}},
fields: makeUpdateFields(testBcR, 100,
[]bpPeer{{id: "P1", height: 100}, {id: "P2", height: 110}, {id: "P3", height: 120}},
false),
maxHeightWanted: int64(120),
noChange: true,
},
{
name: "one short peers",
fields: makeUpdateFields(l, 100,
[]testPeer{{"P1", 100}, {"P2", 90}, {"P3", 120}},
fields: makeUpdateFields(testBcR, 100,
[]bpPeer{{id: "P1", height: 100}, {id: "P2", height: 90}, {id: "P3", height: 120}},
false),
maxHeightWanted: int64(120),
},
{
name: "all short peers",
fields: makeUpdateFields(l, 100,
[]testPeer{{"P1", 90}, {"P2", 91}, {"P3", 92}},
fields: makeUpdateFields(testBcR, 100,
[]bpPeer{{id: "P1", height: 90}, {id: "P2", height: 91}, {id: "P3", height: 92}},
false),
maxHeightWanted: int64(0),
},
@ -288,6 +319,77 @@ func TestBlockPoolRemoveShortPeers(t *testing.T) {
}
}
func TestBlockPoolSendRequestBatch(t *testing.T) {
testBcR := newTestBcR()
tests := []struct {
name string
fields fields
maxRequestsPerPeer int32
expRequests map[int64]bool
expPeerResults []testPeerResult
expNumPending int32
}{
{
name: "one peer - send up to maxRequestsPerPeer block requests",
fields: makeUpdateFields(testBcR, 10, []bpPeer{{id: "P1", height: 100}}, false),
maxRequestsPerPeer: 2,
expRequests: map[int64]bool{10: true, 11: true},
expPeerResults: []testPeerResult{{id: "P1", height: 100, numPending: 2, blocks: map[int64]*types.Block{10: nil, 11: nil}}},
expNumPending: 2,
},
{
name: "n peers - send n*maxRequestsPerPeer block requests",
fields: makeUpdateFields(testBcR, 10, []bpPeer{{id: "P1", height: 100}, {id: "P2", height: 100}}, false),
maxRequestsPerPeer: 2,
expRequests: map[int64]bool{10: true, 11: true},
expPeerResults: []testPeerResult{
{id: "P1", height: 100, numPending: 2, blocks: map[int64]*types.Block{10: nil, 11: nil}},
{id: "P2", height: 100, numPending: 2, blocks: map[int64]*types.Block{12: nil, 13: nil}}},
expNumPending: 4,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
resetPoolTestResults()
pool := &blockPool{
logger: tt.fields.logger,
peers: tt.fields.peers,
blocks: tt.fields.blocks,
requests: make(map[int64]bool),
height: tt.fields.height,
nextRequestHeight: tt.fields.height,
maxPeerHeight: tt.fields.maxPeerHeight,
toBcR: testBcR,
}
maxRequestsPerPeer = int32(tt.maxRequestsPerPeer)
//beforePool := poolCopy(pool)
err := pool.makeNextRequests(10)
assert.Nil(t, err)
assert.Equal(t, tt.expNumPending, pool.numPending)
assert.Equal(t, testResults.numRequestsSent, maxRequestsPerPeer*int32(len(pool.peers)))
for _, tPeer := range tt.expPeerResults {
peer := pool.peers[tPeer.id]
assert.NotNil(t, peer)
assert.Equal(t, tPeer.numPending, peer.numPending)
/*
fmt.Println("tt", tt.name, "peer", peer.id, "expected:", tPeer.blocks, "actual:", peer.blocks)
assert.Equal(t, tPeer.blocks, peer.blocks)
for h, tBl := range tPeer.blocks {
block := peer.blocks[h]
assert.Equal(t, tBl, block)
}
*/
}
assert.Equal(t, testResults.numRequestsSent, maxRequestsPerPeer*int32(len(pool.peers)))
})
}
}
func TestBlockPoolAddBlock(t *testing.T) {
type args struct {
peerID p2p.ID
@ -400,67 +502,3 @@ func TestBlockPoolProcessedCurrentHeightBlock(t *testing.T) {
})
}
}
func TestBlockPoolSendRequestBatch(t *testing.T) {
type args struct {
sendFunc func(peerID p2p.ID, height int64) error
}
tests := []struct {
name string
fields fields
args args
wantErr bool
}{
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
pool := &blockPool{
logger: tt.fields.logger,
peers: tt.fields.peers,
blocks: tt.fields.blocks,
height: tt.fields.height,
maxPeerHeight: tt.fields.maxPeerHeight,
}
if err := pool.sendRequestBatch(tt.args.sendFunc); (err != nil) != tt.wantErr {
t.Errorf("blockPool.sendRequestBatch() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
func TestBlockPoolGetBestPeer(t *testing.T) {
type args struct {
height int64
}
tests := []struct {
name string
fields fields
args args
wantPeerId p2p.ID
wantErr bool
}{
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
pool := &blockPool{
logger: tt.fields.logger,
peers: tt.fields.peers,
blocks: tt.fields.blocks,
height: tt.fields.height,
maxPeerHeight: tt.fields.maxPeerHeight,
}
gotPeerId, err := pool.getBestPeer(tt.args.height)
if (err != nil) != tt.wantErr {
t.Errorf("blockPool.getBestPeer() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(gotPeerId, tt.wantPeerId) {
t.Errorf("blockPool.getBestPeer() = %v, want %v", gotPeerId, tt.wantPeerId)
}
})
}
}

View File

@ -6,18 +6,29 @@ import (
"reflect"
"time"
"github.com/tendermint/go-amino"
amino "github.com/tendermint/go-amino"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/p2p"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
)
const (
// BlockchainChannel is a channel for blocks and status updates (`BlockStore` height)
BlockchainChannel = byte(0x40)
trySyncIntervalMS = 10
trySendIntervalMS = 10
maxTotalMessages = 1000
// stop syncing when last block's time is
// within this much of the system time.
// stopSyncingDurationMinutes = 10
// ask for best height every 10s
statusUpdateIntervalSeconds = 10
// check if we should switch to consensus reactor
switchToConsensusIntervalSeconds = 1
// NOTE: keep up to date with bcBlockResponseMessage
bcBlockResponseMessagePrefixSize = 4
@ -27,6 +38,11 @@ const (
bcBlockResponseMessageFieldKeySize
)
var (
maxRequestsPerPeer int32 = 20
maxNumPendingRequests int32 = 600
)
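These two knobs bound outstanding work from both directions: maxRequestsPerPeer keeps any single peer from being overloaded, while maxNumPendingRequests caps the total backlog the pool will plan for. For example, with the defaults above and 8 peers, at most min(8 * 20, 600) = 160 block requests can be in flight at once.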
type consensusReactor interface {
// for when we switch from blockchain reactor and fast sync to
// the consensus machine
@ -42,21 +58,6 @@ func (e peerError) Error() string {
return fmt.Sprintf("error with peer %v: %s", e.peerID, e.err.Error())
}
type bReactorMsgFromFSM uint
// message types
const (
// message type events
errorMsg = iota + 1
//blockRequestMsg
//statusRequestMsg
)
type msgFromFSM struct {
msgType bReactorMsgFromFSM
error peerError
}
// BlockchainReactor handles long-term catchup syncing.
type BlockchainReactor struct {
p2p.BaseReactor
@ -72,10 +73,20 @@ type BlockchainReactor struct {
fsm *bReactorFSM
blocksSynced int
lastHundred time.Time
lastRate float64
msgFromFSMCh chan msgFromFSM
errorsCh chan peerError
messageForFSMCh chan bReactorMessageData // received in poolRoutine from Receive routine
}
type BlockRequest struct {
Height int64
PeerID p2p.ID
}
// bReactorMessageData structure is used by the reactor when sending messages to the FSM.
type bReactorMessageData struct {
event bReactorEvent
data bReactorEventData
}
// NewBlockchainReactor returns new reactor instance.
@ -87,12 +98,18 @@ func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *Bl
store.Height()))
}
const capacity = 1000 // must be bigger than peers count
errorsCh := make(chan peerError, capacity) // so we don't block in #Receive#pool.AddBlock
messageForFSMCh := make(chan bReactorMessageData, capacity) // so we don't block in #Receive#pool.AddBlock
bcR := &BlockchainReactor{
initialState: state,
state: state,
blockExec: blockExec,
fastSync: fastSync,
store: store,
initialState: state,
state: state,
blockExec: blockExec,
fastSync: fastSync,
store: store,
errorsCh: errorsCh,
messageForFSMCh: messageForFSMCh,
}
fsm := NewFSM(store.Height()+1, bcR)
bcR.fsm = fsm
@ -109,7 +126,6 @@ func (bcR *BlockchainReactor) SetLogger(l log.Logger) {
// OnStart implements cmn.Service.
func (bcR *BlockchainReactor) OnStart() error {
if bcR.fastSync {
bcR.fsm.start()
go bcR.poolRoutine()
}
return nil
@ -117,7 +133,7 @@ func (bcR *BlockchainReactor) OnStart() error {
// OnStop implements cmn.Service.
func (bcR *BlockchainReactor) OnStop() {
bcR.fsm.stop()
bcR.Stop()
}
// GetChannels implements Reactor
@ -143,16 +159,11 @@ func (bcR *BlockchainReactor) AddPeer(peer p2p.Peer) {
// bcStatusResponseMessage from the peer and call pool.SetPeerHeight
}
// RemovePeer implements Reactor by removing peer from the pool.
func (bcR *BlockchainReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
bcR.fsm.RemovePeer(peer.ID())
}
// respondToPeer loads a block and sends it to the requesting peer,
// if we have it. Otherwise, we'll respond saying we don't have it.
// According to the Tendermint spec, if all nodes are honest,
// no node should be requesting for a block that's non-existent.
func (bcR *BlockchainReactor) respondToPeer(msg *bcBlockRequestMessage,
func (bcR *BlockchainReactor) sendBlockToPeer(msg *bcBlockRequestMessage,
src p2p.Peer) (queued bool) {
block := bcR.store.LoadBlock(msg.Height)
@ -167,6 +178,31 @@ func (bcR *BlockchainReactor) respondToPeer(msg *bcBlockRequestMessage,
return src.TrySend(BlockchainChannel, msgBytes)
}
func (bcR *BlockchainReactor) sendStatusResponseToPeer(msg *bcStatusRequestMessage, src p2p.Peer) (queued bool) {
msgBytes := cdc.MustMarshalBinaryBare(&bcStatusResponseMessage{bcR.store.Height()})
return src.TrySend(BlockchainChannel, msgBytes)
}
func (bcR *BlockchainReactor) sendMessageToFSMAsync(msg bReactorMessageData) {
bcR.Logger.Error("send message to FSM for processing", "msg", msg.String())
bcR.messageForFSMCh <- msg
}
func (bcR *BlockchainReactor) sendRemovePeerToFSM(peerID p2p.ID) {
msgData := bReactorMessageData{
event: peerRemoveEv,
data: bReactorEventData{
peerId: peerID,
},
}
bcR.sendMessageToFSMAsync(msgData)
}
// RemovePeer implements Reactor by removing peer from the pool.
func (bcR *BlockchainReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
bcR.sendRemovePeerToFSM(peer.ID())
}
// Receive implements Reactor by handling 4 types of messages (look below).
func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
msg, err := decodeMsg(msgBytes)
@ -186,9 +222,18 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)
switch msg := msg.(type) {
case *bcBlockRequestMessage:
if queued := bcR.respondToPeer(msg, src); !queued {
if queued := bcR.sendBlockToPeer(msg, src); !queued {
// Unfortunately not queued since the queue is full.
bcR.Logger.Error("Could not send block message to peer", "src", src, "height", msg.Height)
}
case *bcStatusRequestMessage:
// Send peer our state.
if queued := bcR.sendStatusResponseToPeer(msg, src); !queued {
// Unfortunately not queued since the queue is full.
bcR.Logger.Error("Could not send status message to peer", "src", src)
}
case *bcBlockResponseMessage:
msgData := bReactorMessageData{
event: blockResponseEv,
@ -198,15 +243,8 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)
length: len(msgBytes),
},
}
sendMessageToFSM(bcR.fsm, msgData)
bcR.sendMessageToFSMAsync(msgData)
case *bcStatusRequestMessage:
// Send peer our state.
msgBytes := cdc.MustMarshalBinaryBare(&bcStatusResponseMessage{bcR.store.Height()})
queued := src.TrySend(BlockchainChannel, msgBytes)
if !queued {
// sorry
}
case *bcStatusResponseMessage:
// Got a peer status. Unverified.
msgData := bReactorMessageData{
@ -217,14 +255,141 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)
length: len(msgBytes),
},
}
sendMessageToFSM(bcR.fsm, msgData)
bcR.sendMessageToFSMAsync(msgData)
default:
bcR.Logger.Error(fmt.Sprintf("unknown message type %v", reflect.TypeOf(msg)))
}
}
func (bcR *BlockchainReactor) processBlocks(first *types.Block, second *types.Block) error {
// Handle messages from the poolReactor telling the reactor what to do.
// NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
func (bcR *BlockchainReactor) poolRoutine() {
bcR.fsm.start()
trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond)
trySendTicker := time.NewTicker(trySendIntervalMS * time.Millisecond)
statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)
switchToConsensusTicker := time.NewTicker(switchToConsensusIntervalSeconds * time.Second)
lastHundred := time.Now()
lastRate := 0.0
doProcessCh := make(chan struct{}, 1)
doSendCh := make(chan struct{}, 1)
FOR_LOOP:
for {
select {
case <-trySendTicker.C: // chan time
select {
case doSendCh <- struct{}{}:
default:
}
continue FOR_LOOP
case <-doSendCh:
msgData := bReactorMessageData{
event: makeRequestsEv,
data: bReactorEventData{
maxNumRequests: maxNumPendingRequests,
},
}
_ = sendMessageToFSMSync(bcR.fsm, msgData)
case err := <-bcR.errorsCh:
bcR.reportPeerErrorToSwitch(err.err, err.peerID)
case <-statusUpdateTicker.C:
// Ask for status updates.
go bcR.sendStatusRequest()
case <-switchToConsensusTicker.C:
height, numPending, maxPeerHeight := bcR.fsm.pool.getStatus()
outbound, inbound, _ := bcR.Switch.NumPeers()
bcR.Logger.Debug("Consensus ticker", "numPending", numPending, "maxPeerHeight", maxPeerHeight,
"outbound", outbound, "inbound", inbound)
if bcR.fsm.isCaughtUp() {
bcR.Logger.Info("Time to switch to consensus reactor!", "height", height)
bcR.fsm.stop()
bcR.switchToConsensus()
break FOR_LOOP
}
case <-trySyncTicker.C: // chan time
select {
case doProcessCh <- struct{}{}:
default:
}
//continue FOR_LOOP
case <-doProcessCh:
err := bcR.processBlocksFromPoolRoutine()
if err == errMissingBlocks {
continue FOR_LOOP
}
// Notify FSM of block processing result.
msgData := bReactorMessageData{
event: processedBlockEv,
data: bReactorEventData{
err: err,
},
}
_ = sendMessageToFSMSync(bcR.fsm, msgData)
if err == errBlockVerificationFailure {
continue FOR_LOOP
}
doProcessCh <- struct{}{}
bcR.blocksSynced++
if bcR.blocksSynced%100 == 0 {
lastRate = 0.9*lastRate + 0.1*(100/time.Since(lastHundred).Seconds())
bcR.Logger.Info("Fast Sync Rate", "height", bcR.fsm.pool.height,
"max_peer_height", bcR.fsm.pool.getMaxPeerHeight(), "blocks/s", lastRate)
lastHundred = time.Now()
}
continue FOR_LOOP
case msg := <-bcR.messageForFSMCh:
_ = sendMessageToFSMSync(bcR.fsm, msg)
case <-bcR.Quit():
break FOR_LOOP
}
}
}
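The trySendTicker/doSendCh pairing above (and likewise trySyncTicker/doProcessCh) is a tick-coalescing pattern: the ticker performs a non-blocking send into a buffered channel of capacity 1, so at most one "make requests" trigger is ever pending no matter how many ticks fire while the FSM is busy. A minimal standalone sketch of the same pattern, with the work stubbed out by a print:

package main

import (
	"fmt"
	"time"
)

func main() {
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	doWork := make(chan struct{}, 1) // capacity 1: extra ticks are dropped, not queued
	done := time.After(100 * time.Millisecond)

	for {
		select {
		case <-ticker.C:
			select {
			case doWork <- struct{}{}: // schedule a trigger if none is pending
			default: // a trigger is already queued; coalesce this tick into it
			}
		case <-doWork:
			fmt.Println("make block requests") // stands in for sending makeRequestsEv to the FSM
		case <-done:
			return
		}
	}
}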
func (bcR *BlockchainReactor) reportPeerErrorToSwitch(err error, peerID p2p.ID) {
peer := bcR.Switch.Peers().Get(peerID)
if peer != nil {
bcR.Switch.StopPeerForError(peer, err)
}
}
// Called by FSM and pool:
// - pool calls when it detects slow peer or when peer times out
// - FSM calls when:
// - processing a block (addBlock) fails
// - BCR process of block reports failure to FSM, FSM sends back the peers of first and second
func (bcR *BlockchainReactor) sendPeerError(err error, peerID p2p.ID) {
bcR.errorsCh <- peerError{err, peerID}
}
func (bcR *BlockchainReactor) processBlocksFromPoolRoutine() error {
firstBP, secondBP, err := bcR.fsm.pool.getNextTwoBlocks()
if err != nil {
// We need both to sync the first block.
return err
}
first := firstBP.block
second := secondBP.block
chainID := bcR.initialState.ChainID
@ -235,7 +400,7 @@ func (bcR *BlockchainReactor) processBlocks(first *types.Block, second *types.Bl
// NOTE: we can probably make this more efficient, but note that calling
// first.Hash() doesn't verify the tx contents, so MakePartSet() is
// currently necessary.
err := bcR.state.Validators.VerifyCommit(
err = bcR.state.Validators.VerifyCommit(
chainID, firstID, first.Height, second.LastCommit)
if err != nil {
@ -245,50 +410,15 @@ func (bcR *BlockchainReactor) processBlocks(first *types.Block, second *types.Bl
bcR.store.SaveBlock(first, firstParts, second.LastCommit)
// get the hash without persisting the state
// Get the hash without persisting the state.
bcR.state, err = bcR.blockExec.ApplyBlock(bcR.state, firstID, first)
if err != nil {
// TODO This is bad, are we zombie?
panic(fmt.Sprintf("failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err))
}
bcR.blocksSynced++
if bcR.blocksSynced%100 == 0 {
bcR.lastRate = 0.9*bcR.lastRate + 0.1*(100/time.Since(bcR.lastHundred).Seconds())
bcR.Logger.Info("Fast Sync Rate", "height", bcR.fsm.pool.height,
"max_peer_height", bcR.fsm.pool.getMaxPeerHeight(), "blocks/s", bcR.lastRate)
bcR.lastHundred = time.Now()
}
return nil
}
// Handle messages from the poolReactor telling the reactor what to do.
// NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
func (bcR *BlockchainReactor) poolRoutine() {
for {
select {
case fromFSM := <-bcR.msgFromFSMCh:
switch fromFSM.msgType {
case errorMsg:
peer := bcR.Switch.Peers().Get(fromFSM.error.peerID)
if peer != nil {
bcR.Switch.StopPeerForError(peer, fromFSM.error.err)
}
default:
}
}
}
}
// TODO - send the error on msgFromFSMCh to process it above
func (bcR *BlockchainReactor) sendPeerError(err error, peerID p2p.ID) {
peer := bcR.Switch.Peers().Get(peerID)
if peer != nil {
bcR.Switch.StopPeerForError(peer, err)
}
}
func (bcR *BlockchainReactor) resetStateTimer(name string, timer *time.Timer, timeout time.Duration, f func()) {
if timer == nil {
timer = time.AfterFunc(timeout, f)
@ -298,10 +428,9 @@ func (bcR *BlockchainReactor) resetStateTimer(name string, timer *time.Timer, ti
}
// BroadcastStatusRequest broadcasts `BlockStore` height.
func (bcR *BlockchainReactor) sendStatusRequest() error {
func (bcR *BlockchainReactor) sendStatusRequest() {
msgBytes := cdc.MustMarshalBinaryBare(&bcStatusRequestMessage{bcR.store.Height()})
bcR.Switch.Broadcast(BlockchainChannel, msgBytes)
return nil
}
// BlockRequest sends `BlockRequest` height.
@ -326,7 +455,7 @@ func (bcR *BlockchainReactor) switchToConsensus() {
if ok {
conR.SwitchToConsensus(bcR.state, bcR.blocksSynced)
} else {
// should only happen during testing
// Should only happen during testing.
}
}

View File

@ -10,14 +10,10 @@ import (
"github.com/tendermint/tendermint/types"
)
var (
// should be >= 2
maxRequestBatchSize = 40
)
// Blockchain Reactor State
type bReactorFSMState struct {
name string
// called when transitioning out of current state
handle func(*bReactorFSM, bReactorEvent, bReactorEventData) (next *bReactorFSMState, err error)
// called when entering the state
@ -32,39 +28,56 @@ func (s *bReactorFSMState) String() string {
return s.name
}
// Interface used by FSM for sending Block and Status requests,
// informing of peer errors and state timeouts
// Implemented by BlockchainReactor and tests
type bcRMessageInterface interface {
sendStatusRequest()
sendBlockRequest(peerID p2p.ID, height int64) error
sendPeerError(err error, peerID p2p.ID)
resetStateTimer(name string, timer *time.Timer, timeout time.Duration, f func())
switchToConsensus()
}
// Blockchain Reactor State Machine
type bReactorFSM struct {
logger log.Logger
startTime time.Time
state *bReactorFSMState
pool *blockPool
pool *blockPool
// channel to receive messages
messageCh chan bReactorMessageData
processSignalActive bool
// interface used to send StatusRequest, BlockRequest, errors
bcr sendMessage
// interface used to call the Blockchain reactor to send StatusRequest, BlockRequest, reporting errors, etc.
toBcR bcRMessageInterface
}
// bReactorEventData is part of the message sent by the reactor to the FSM and used by the state handlers
// bReactorEventData is part of the message sent by the reactor to the FSM and used by the state handlers.
type bReactorEventData struct {
peerId p2p.ID
err error // for peer error: timeout, slow,
height int64 // for status response
block *types.Block // for block response
stateName string // for state timeout events
length int // for block response to detect slow peers
peerId p2p.ID
err error // for peer error: timeout, slow; for processed block event if error occurred
height int64 // for status response; for processed block event
block *types.Block // for block response
stateName string // for state timeout events
length int // for block response event, length of received block, used to detect slow peers
maxNumRequests int32 // for request needed event, maximum number of pending requests
}
// bReactorMessageData structure is used by the reactor when sending messages to the FSM.
type bReactorMessageData struct {
event bReactorEvent
data bReactorEventData
}
// Blockchain Reactor Events (the input to the state machine)
type bReactorEvent uint
const (
// message type events
startFSMEv = iota + 1
statusResponseEv
blockResponseEv
processedBlockEv
makeRequestsEv
stopFSMEv
// other events
peerRemoveEv = iota + 256
stateTimeoutEv
)
func (msg *bReactorMessageData) String() string {
var dataStr string
@ -76,14 +89,17 @@ func (msg *bReactorMessageData) String() string {
dataStr = fmt.Sprintf("peer: %v height: %v", msg.data.peerId, msg.data.height)
case blockResponseEv:
dataStr = fmt.Sprintf("peer: %v block.height: %v lenght: %v", msg.data.peerId, msg.data.block.Height, msg.data.length)
case tryProcessBlockEv:
dataStr = ""
case processedBlockEv:
dataStr = fmt.Sprintf("block processing returned following error: %v", msg.data.err)
case makeRequestsEv:
dataStr = fmt.Sprintf("new requests needed")
case stopFSMEv:
dataStr = ""
case peerErrEv:
dataStr = fmt.Sprintf("peer: %v err: %v", msg.data.peerId, msg.data.err)
case peerRemoveEv:
dataStr = fmt.Sprintf("peer: %v is being removed by the switch", msg.data.peerId)
case stateTimeoutEv:
dataStr = fmt.Sprintf("state: %v", msg.data.stateName)
default:
dataStr = fmt.Sprintf("cannot interpret message data")
return "event unknown"
@ -92,22 +108,6 @@ func (msg *bReactorMessageData) String() string {
return fmt.Sprintf("event: %v %v", msg.event, dataStr)
}
// Blockchain Reactor Events (the input to the state machine).
type bReactorEvent uint
const (
// message type events
startFSMEv = iota + 1
statusResponseEv
blockResponseEv
tryProcessBlockEv
stopFSMEv
// other events
peerErrEv = iota + 256
stateTimeoutEv
)
func (ev bReactorEvent) String() string {
switch ev {
case startFSMEv:
@ -116,12 +116,14 @@ func (ev bReactorEvent) String() string {
return "statusResponseEv"
case blockResponseEv:
return "blockResponseEv"
case tryProcessBlockEv:
return "tryProcessBlockEv"
case processedBlockEv:
return "processedBlockEv"
case makeRequestsEv:
return "makeRequestsEv"
case stopFSMEv:
return "stopFSMEv"
case peerErrEv:
return "peerErrEv"
case peerRemoveEv:
return "peerRemoveEv"
case stateTimeoutEv:
return "stateTimeoutEv"
default:
@ -140,26 +142,22 @@ var (
// state timers
var (
waitForPeerTimeout = 20 * time.Second
waitForBlockTimeout = 30 * time.Second // > peerTimeout which is 15 sec
waitForPeerTimeout = 2 * time.Second
)
// errors
var (
// errors
errInvalidEvent = errors.New("invalid event in current state")
errNoErrorFinished = errors.New("FSM is finished")
errInvalidEvent = errors.New("invalid event in current state")
errNoPeerResponse = errors.New("FSM timed out on peer response")
errNoPeerFoundForRequest = errors.New("cannot use peer")
errBadDataFromPeer = errors.New("received from wrong peer or bad block")
errMissingBlocks = errors.New("missing blocks")
errBlockVerificationFailure = errors.New("block verification failure, redo")
errNilPeerForBlockRequest = errors.New("nil peer for block request")
errSendQueueFull = errors.New("block request not made, send-queue is full")
errPeerTooShort = errors.New("peer height too low, peer was either not added " +
"or removed after status update")
errSwitchPeerErr = errors.New("switch detected peer error")
errSlowPeer = errors.New("peer is not sending us data fast enough")
errPeerTooShort = errors.New("peer height too low, old peer removed / new peer not added")
errSlowPeer = errors.New("peer is not sending us data fast enough")
errNoPeerFoundForHeight = errors.New("could not find peer for block request")
)
func init() {
@ -169,14 +167,13 @@ func init() {
switch ev {
case startFSMEv:
// Broadcast Status message. Currently doesn't return non-nil error.
_ = fsm.bcr.sendStatusRequest()
fsm.toBcR.sendStatusRequest()
if fsm.state.timer != nil {
fsm.state.timer.Stop()
}
return waitForPeer, nil
case stopFSMEv:
// cleanup
return finished, errNoErrorFinished
default:
@ -189,13 +186,13 @@ func init() {
name: "waitForPeer",
timeout: waitForPeerTimeout,
enter: func(fsm *bReactorFSM) {
// stop when leaving the state
// Stop when leaving the state.
fsm.resetStateTimer(waitForPeer)
},
handle: func(fsm *bReactorFSM, ev bReactorEvent, data bReactorEventData) (*bReactorFSMState, error) {
switch ev {
case stateTimeoutEv:
// no statusResponse received from any peer
// There was no statusResponse received from any peer.
// Should we send status request again?
if fsm.state.timer != nil {
fsm.state.timer.Stop()
@ -203,26 +200,14 @@ func init() {
return finished, errNoPeerResponse
case statusResponseEv:
// update peer
if err := fsm.pool.updatePeer(data.peerId, data.height, fsm.processPeerError); err != nil {
if err := fsm.pool.updatePeer(data.peerId, data.height); err != nil {
if len(fsm.pool.peers) == 0 {
return waitForPeer, err
}
}
// send first block requests
err := fsm.pool.sendRequestBatch(fsm.bcr.sendBlockRequest)
if err != nil {
// wait for more peers or state timeout
return waitForPeer, err
}
if fsm.state.timer != nil {
fsm.state.timer.Stop()
}
return waitForBlock, nil
case stopFSMEv:
// cleanup
return finished, errNoErrorFinished
default:
@ -232,106 +217,65 @@ func init() {
}
waitForBlock = &bReactorFSMState{
name: "waitForBlock",
timeout: waitForBlockTimeout,
enter: func(fsm *bReactorFSM) {
// stop when leaving the state or receiving a block
fsm.resetStateTimer(waitForBlock)
},
name: "waitForBlock",
handle: func(fsm *bReactorFSM, ev bReactorEvent, data bReactorEventData) (*bReactorFSMState, error) {
switch ev {
case stateTimeoutEv:
// no blockResponse
// Should we send status request again? Switch to consensus?
// Note that any unresponsive peers have been already removed by their timer expiry handler.
return finished, errNoPeerResponse
case statusResponseEv:
err := fsm.pool.updatePeer(data.peerId, data.height, fsm.processPeerError)
err := fsm.pool.updatePeer(data.peerId, data.height)
if len(fsm.pool.peers) == 0 {
_ = fsm.bcr.sendStatusRequest()
fsm.toBcR.sendStatusRequest()
if fsm.state.timer != nil {
fsm.state.timer.Stop()
}
return waitForPeer, err
}
if fsm.state.timer != nil {
fsm.state.timer.Stop()
}
return waitForBlock, err
case blockResponseEv:
fsm.logger.Debug("blockResponseEv", "H", data.block.Height)
err := fsm.pool.addBlock(data.peerId, data.block, data.length)
if err != nil {
// unsolicited, from different peer, already have it..
// The block was unsolicited, came from an unexpected peer, or we already have it.
// Ignore block, remove peer and send error to switch.
fsm.pool.removePeer(data.peerId, err)
// ignore block
// send error to switch
if err != nil {
fsm.bcr.sendPeerError(err, data.peerId)
fsm.toBcR.sendPeerError(err, data.peerId)
}
fsm.processSignalActive = false
if fsm.state.timer != nil {
fsm.state.timer.Stop()
}
return waitForBlock, err
}
fsm.sendSignalToProcessBlock()
if fsm.state.timer != nil {
fsm.state.timer.Stop()
}
return waitForBlock, nil
case tryProcessBlockEv:
fsm.logger.Debug("FSM blocks", "blocks", fsm.pool.blocks, "fsm_height", fsm.pool.height)
// process block, it detects errors and deals with them
fsm.processBlock()
// processed block, check if we are done
if fsm.pool.reachedMaxHeight() {
// TODO should we wait for more status responses in case a high peer is slow?
fsm.logger.Info("Switching to consensus!!!!")
fsm.bcr.switchToConsensus()
return finished, nil
}
// get other block(s)
err := fsm.pool.sendRequestBatch(fsm.bcr.sendBlockRequest)
if err != nil {
// TBD on what to do here...
// wait for more peers or state timeout
}
fsm.sendSignalToProcessBlock()
if fsm.state.timer != nil {
fsm.state.timer.Stop()
}
return waitForBlock, err
case peerErrEv:
// This event is sent by:
// - the switch to report disconnected and errored peers
// - when the FSM pool peer times out
case processedBlockEv:
fsm.logger.Debug("processedBlockEv", "err", data.err)
first, second, _ := fsm.pool.getNextTwoBlocks()
if data.err != nil {
fsm.logger.Error("process blocks returned error", "err", data.err, "first", first.block.Height, "second", second.block.Height)
fsm.logger.Error("send peer error for", "peer", first.peer.id)
fsm.toBcR.sendPeerError(data.err, first.peer.id)
fsm.logger.Error("send peer error for", "peer", second.peer.id)
fsm.toBcR.sendPeerError(data.err, second.peer.id)
fsm.pool.invalidateFirstTwoBlocks(data.err)
} else {
fsm.pool.processedCurrentHeightBlock()
if fsm.pool.reachedMaxHeight() {
fsm.stop()
return finished, nil
}
}
return waitForBlock, data.err
case peerRemoveEv:
// This event is sent by the switch to remove disconnected and errored peers.
fsm.pool.removePeer(data.peerId, data.err)
if data.err != nil && data.err != errSwitchPeerErr {
// not sent by the switch so report it
fsm.bcr.sendPeerError(data.err, data.peerId)
}
err := fsm.pool.sendRequestBatch(fsm.bcr.sendBlockRequest)
if err != nil {
// TBD on what to do here...
// wait for more peers or state timeout
}
if fsm.state.timer != nil {
fsm.state.timer.Stop()
}
return waitForBlock, nil
case makeRequestsEv:
err := fsm.makeNextRequests(data.maxNumRequests)
return waitForBlock, err
case stopFSMEv:
// cleanup
return finished, errNoErrorFinished
default:
@ -343,69 +287,45 @@ func init() {
finished = &bReactorFSMState{
name: "finished",
enter: func(fsm *bReactorFSM) {
// cleanup
fsm.cleanup()
},
handle: func(fsm *bReactorFSM, ev bReactorEvent, data bReactorEventData) (*bReactorFSMState, error) {
return nil, nil
},
}
}
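To summarize the states defined in init() above, the main transitions are as follows (events not listed either keep the current state or return errInvalidEvent):

unknown:      startFSMEv       -> waitForPeer (broadcast status request)
waitForPeer:  statusResponseEv -> waitForBlock (first peer registered)
waitForPeer:  stateTimeoutEv   -> finished (errNoPeerResponse)
waitForBlock: makeRequestsEv   -> waitForBlock (schedule block requests)
waitForBlock: blockResponseEv  -> waitForBlock (add block, or drop peer on bad data)
waitForBlock: processedBlockEv -> waitForBlock, or finished once pool.reachedMaxHeight()
waitForBlock: statusResponseEv -> waitForBlock, or back to waitForPeer if no peers remain
waitForBlock: peerRemoveEv     -> waitForBlock (pool drops the peer, requests rescheduled)
any state:    stopFSMEv        -> finished (errNoErrorFinished)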
func NewFSM(height int64, bcr sendMessage) *bReactorFSM {
messageCh := make(chan bReactorMessageData, maxTotalMessages)
func NewFSM(height int64, toBcR bcRMessageInterface) *bReactorFSM {
return &bReactorFSM{
state: unknown,
pool: newBlockPool(height),
bcr: bcr,
messageCh: messageCh,
startTime: time.Now(),
pool: newBlockPool(height, toBcR),
toBcR: toBcR,
}
}
func sendMessageToFSM(fsm *bReactorFSM, msg bReactorMessageData) {
fsm.logger.Debug("send message to FSM", "msg", msg.String())
fsm.messageCh <- msg
}
func (fsm *bReactorFSM) setLogger(l log.Logger) {
fsm.logger = l
fsm.pool.setLogger(l)
}
// starts the FSM go routine
func sendMessageToFSMSync(fsm *bReactorFSM, msg bReactorMessageData) error {
err := fsm.handle(&msg)
return err
}
// Starts the FSM goroutine.
func (fsm *bReactorFSM) start() {
go fsm.startRoutine()
fsm.startTime = time.Now()
_ = sendMessageToFSMSync(fsm, bReactorMessageData{
event: startFSMEv,
})
}
// stops the FSM go routine
// Stops the FSM goroutine.
func (fsm *bReactorFSM) stop() {
msg := bReactorMessageData{
_ = sendMessageToFSMSync(fsm, bReactorMessageData{
event: stopFSMEv,
}
sendMessageToFSM(fsm, msg)
}
// start the FSM
func (fsm *bReactorFSM) startRoutine() {
_ = fsm.handle(&bReactorMessageData{event: startFSMEv})
forLoop:
for {
select {
case msg := <-fsm.messageCh:
fsm.logger.Debug("FSM Received message", "msg", msg.String())
_ = fsm.handle(&msg)
if msg.event == stopFSMEv {
break forLoop
}
// TODO - stop also on some errors returned by handle
default:
}
}
})
}
// handle processes messages and events sent to the FSM.
@ -432,8 +352,6 @@ func (fsm *bReactorFSM) transition(next *bReactorFSMState) {
if next == nil {
return
}
fsm.logger.Debug("changes state: ", "old", fsm.state.name, "new", next.name)
if fsm.state != next {
fsm.state = next
if next.enter != nil {
@ -442,17 +360,6 @@ func (fsm *bReactorFSM) transition(next *bReactorFSMState) {
}
}
// Interface for sending Block and Status requests
// Implemented by BlockchainReactor and tests
type sendMessage interface {
sendStatusRequest() error
sendBlockRequest(peerID p2p.ID, height int64) error
sendPeerError(err error, peerID p2p.ID)
processBlocks(first *types.Block, second *types.Block) error
resetStateTimer(name string, timer *time.Timer, timeout time.Duration, f func())
switchToConsensus()
}
// FSM state timeout handler
func (fsm *bReactorFSM) sendStateTimeoutEvent(stateName string) {
// Check that the timeout is for the state we are currently in to prevent wrong transitions.
@ -463,84 +370,35 @@ func (fsm *bReactorFSM) sendStateTimeoutEvent(stateName string) {
stateName: stateName,
},
}
sendMessageToFSM(fsm, msg)
_ = sendMessageToFSMSync(fsm, msg)
}
}
// This is called when entering an FSM state in order to detect lack of progress in the state machine.
// Note the use of the 'bcr' interface to facilitate testing without timer running.
// Called when entering an FSM state in order to detect lack of progress in the state machine.
// Note the use of the 'toBcR' interface to facilitate testing without the timer expiring.
func (fsm *bReactorFSM) resetStateTimer(state *bReactorFSMState) {
fsm.bcr.resetStateTimer(state.name, state.timer, state.timeout, func() {
fsm.toBcR.resetStateTimer(state.name, state.timer, state.timeout, func() {
fsm.sendStateTimeoutEvent(state.name)
})
}
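For reference, one plausible reactor-side implementation matching the signature the test reactor uses further down; this is a sketch, and note that with a plain *time.Timer argument a timer created here cannot be handed back to the FSM, which a real implementation would have to address:
func (bcR *BlockchainReactor) resetStateTimerSketch(name string, timer *time.Timer, timeout time.Duration, f func()) {
	if timer == nil {
		// f sends the state-timeout event carrying 'name' back to the FSM.
		_ = time.AfterFunc(timeout, f)
		return
	}
	timer.Reset(timeout)
}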
// called by the switch on peer error
func (fsm *bReactorFSM) RemovePeer(peerID p2p.ID) {
fsm.logger.Info("Switch removes peer", "peer", peerID, "fsm_height", fsm.pool.height)
fsm.processPeerError(errSwitchPeerErr, peerID)
func (fsm *bReactorFSM) isCaughtUp() bool {
// Some conditions to determine if we're caught up.
// Ensures we've either received a block or waited some amount of time,
// and that we're synced to the highest known height.
// Note we use maxPeerHeight - 1 because syncing block H requires block H+1
// to verify the LastCommit.
receivedBlockOrTimedOut := fsm.pool.height > 0 || time.Since(fsm.startTime) > 5*time.Second
ourChainIsLongestAmongPeers := fsm.pool.maxPeerHeight == 0 || fsm.pool.height >= (fsm.pool.maxPeerHeight-1) || fsm.state == finished
isCaughtUp := receivedBlockOrTimedOut && ourChainIsLongestAmongPeers
return isCaughtUp
}
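A worked example with illustrative numbers: if the best peer reports maxPeerHeight = 100, block 99 is the highest block we can fully verify, because block 100's LastCommit arrives only inside block 101:
func exampleIsCaughtUp() bool {
	var (
		height        int64 = 99  // pool height, illustrative
		maxPeerHeight int64 = 100 // highest height reported by any peer
	)
	startTime := time.Now().Add(-10 * time.Second) // started 10s ago
	receivedBlockOrTimedOut := height > 0 || time.Since(startTime) > 5*time.Second // true: 99 > 0
	ourChainIsLongestAmongPeers := maxPeerHeight == 0 || height >= maxPeerHeight-1 // true: 99 >= 99
	return receivedBlockOrTimedOut && ourChainIsLongestAmongPeers // caught up
}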
func (fsm *bReactorFSM) sendSignalToProcessBlock() {
_, _, err := fsm.pool.getNextTwoBlocks()
if err != nil {
fsm.logger.Debug("No need to send signal, blocks missing")
return
}
fsm.logger.Debug("Send signal to process blocks")
if fsm.processSignalActive {
fsm.logger.Debug("..already sent")
return
}
msgData := bReactorMessageData{
event: tryProcessBlockEv,
data: bReactorEventData{},
}
sendMessageToFSM(fsm, msgData)
fsm.processSignalActive = true
func (fsm *bReactorFSM) cleanup() {
// TODO
}
// Processes block at height H = fsm.height. Expects both H and H+1 to be available
func (fsm *bReactorFSM) processBlock() {
first, second, err := fsm.pool.getNextTwoBlocks()
if err != nil {
fsm.processSignalActive = false
return
}
fsm.logger.Debug("process blocks", "first", first.block.Height, "second", second.block.Height)
fsm.logger.Debug("FSM blocks", "blocks", fsm.pool.blocks)
if err = fsm.bcr.processBlocks(first.block, second.block); err != nil {
fsm.logger.Error("process blocks returned error", "err", err, "first", first.block.Height, "second", second.block.Height)
fsm.logger.Error("FSM blocks", "blocks", fsm.pool.blocks)
fsm.pool.invalidateFirstTwoBlocks(err)
fsm.bcr.sendPeerError(err, first.peerId)
fsm.bcr.sendPeerError(err, second.peerId)
} else {
fsm.pool.processedCurrentHeightBlock()
}
fsm.processSignalActive = false
}
func (fsm *bReactorFSM) IsFinished() bool {
return fsm.state == finished
}
// called from:
// - the switch, from its goroutine
// - when a peer times out, from the timer goroutine.
// Send message to FSM
func (fsm *bReactorFSM) processPeerError(err error, peerID p2p.ID) {
msgData := bReactorMessageData{
event: peerErrEv,
data: bReactorEventData{
err: err,
peerId: peerID,
},
}
sendMessageToFSM(fsm, msgData)
func (fsm *bReactorFSM) makeNextRequests(maxNumPendingRequests int32) error {
return fsm.pool.makeNextRequests(maxNumPendingRequests)
}
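The pool-side scheduler that makeNextRequests delegates to is outside this hunk; the sketch below captures the intent, spreading requests across peers up to maxRequestsPerPeer each and maxNumPendingRequests overall. The helpers nextRequestHeight, numPendingRequests, and pickAvailablePeer are hypothetical names:
func (pool *blockPool) makeNextRequestsSketch(maxNumPendingRequests int32) error {
	for h := pool.nextRequestHeight; h <= pool.maxPeerHeight && pool.numPendingRequests() < maxNumPendingRequests; h++ {
		peer := pool.pickAvailablePeer(h) // skips peers already at maxRequestsPerPeer
		if peer == nil {
			// No capacity right now; the next makeRequestsEv tick will retry.
			return nil
		}
		if err := pool.toBcR.sendBlockRequest(peer.id, h); err != nil {
			return err
		}
		peer.blocks[h] = nil // mark response pending for this height
	}
	return nil
}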
View File
@ -1,21 +1,20 @@
package blockchain_new
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/types"
"testing"
"time"
)
var (
failSendStatusRequest bool
failSendBlockRequest bool
numStatusRequests int
numBlockRequests int
numBlockRequests int32
)
type lastBlockRequestT struct {
@ -72,13 +71,16 @@ func newTestReactor() *testReactor {
// WIP
func TestFSMTransitionSequences(t *testing.T) {
maxRequestBatchSize = 2
maxRequestsPerPeer = 2
fsmTransitionSequenceTests := [][]fsmStepTestValues{
{
{currentState: "unknown", event: startFSMEv, shouldSendStatusReq: true,
expectedState: "waitForPeer"},
{currentState: "waitForPeer", event: statusResponseEv,
data: bReactorEventData{peerId: "P1", height: 10},
data: bReactorEventData{peerId: "P1", height: 10},
expectedState: "waitForBlock"},
{currentState: "waitForBlock", event: makeRequestsEv,
data: bReactorEventData{maxNumRequests: maxNumPendingRequests},
blockReqIncreased: true,
expectedState: "waitForBlock"},
},
@ -98,6 +100,7 @@ func TestFSMTransitionSequences(t *testing.T) {
oldNumBlockRequests := numBlockRequests
_ = sendEventToFSM(testBcR.fsm, step.event, step.data)
if step.shouldSendStatusReq {
assert.Equal(t, oldNumStatusRequests+1, numStatusRequests)
} else {
@ -105,7 +108,7 @@ func TestFSMTransitionSequences(t *testing.T) {
}
if step.blockReqIncreased {
assert.Equal(t, oldNumBlockRequests+maxRequestBatchSize, numBlockRequests)
assert.Equal(t, oldNumBlockRequests+maxRequestsPerPeer, numBlockRequests)
} else {
assert.Equal(t, oldNumBlockRequests, numBlockRequests)
}
@ -116,7 +119,7 @@ func TestFSMTransitionSequences(t *testing.T) {
}
func TestReactorFSMBasic(t *testing.T) {
maxRequestBatchSize = 2
maxRequestsPerPeer = 2
// Create test reactor
testBcR := newTestReactor()
@ -134,15 +137,19 @@ func TestReactorFSMBasic(t *testing.T) {
peerID := p2p.ID(cmn.RandStr(12))
sendStatusResponse2(fsm, peerID, 10)
if err := fsm.handle(&bReactorMessageData{
event: makeRequestsEv,
data: bReactorEventData{maxNumRequests: maxNumPendingRequests}}); err != nil {
}
// Check that FSM sends a block request message and...
assert.Equal(t, maxRequestBatchSize, numBlockRequests)
assert.Equal(t, maxRequestsPerPeer, numBlockRequests)
// ... the block request has the expected height
assert.Equal(t, int64(maxRequestBatchSize), lastBlockRequest.height)
assert.Equal(t, maxRequestsPerPeer, int32(len(fsm.pool.peers[peerID].blocks)))
assert.Equal(t, waitForBlock.name, fsm.state.name)
}
func TestReactorFSMPeerTimeout(t *testing.T) {
maxRequestBatchSize = 2
maxRequestsPerPeer = 2
resetTestValues()
peerTimeout = 20 * time.Millisecond
// Create and start the FSM
@ -159,10 +166,14 @@ func TestReactorFSMPeerTimeout(t *testing.T) {
sendStatusResponse(fsm, peerID, 10)
time.Sleep(5 * time.Millisecond)
if err := fsm.handle(&bReactorMessageData{
event: makeRequestsEv,
data: bReactorEventData{maxNumRequests: maxNumPendingRequests}}); err != nil {
}
// Check that FSM sends a block request message and...
assert.Equal(t, maxRequestBatchSize, numBlockRequests)
assert.Equal(t, maxRequestsPerPeer, numBlockRequests)
// ... the block request has the expected height and peer
assert.Equal(t, int64(maxRequestBatchSize), lastBlockRequest.height)
assert.Equal(t, maxRequestsPerPeer, int32(len(fsm.pool.peers[peerID].blocks)))
assert.Equal(t, peerID, lastBlockRequest.peerID)
// let FSM timeout on the block response message
@ -190,13 +201,9 @@ func (testR *testReactor) sendPeerError(err error, peerID p2p.ID) {
lastPeerError.err = err
}
func (testR *testReactor) sendStatusRequest() error {
func (testR *testReactor) sendStatusRequest() {
testR.logger.Info("Reactor received sendStatusRequest call from FSM")
numStatusRequests++
if failSendStatusRequest {
return errSendQueueFull
}
return nil
}
func (testR *testReactor) sendBlockRequest(peerID p2p.ID, height int64) error {
@ -216,11 +223,6 @@ func (testR *testReactor) resetStateTimer(name string, timer *time.Timer, timeou
}
}
func (testR *testReactor) processBlocks(first *types.Block, second *types.Block) error {
testR.logger.Info("Reactor received processBlocks call from FSM", "first", first.Height, "second", second.Height)
return nil
}
func (testR *testReactor) switchToConsensus() {
testR.logger.Info("Reactor received switchToConsensus call from FSM")
@ -241,7 +243,7 @@ func sendStatusResponse(fsm *bReactorFSM, peerID p2p.ID, height int64) {
},
}
sendMessageToFSM(fsm, msgData)
_ = sendMessageToFSMSync(fsm, msgData)
}
func sendStatusResponse2(fsm *bReactorFSM, peerID p2p.ID, height int64) {
View File
@ -124,9 +124,10 @@ func newBlockchainReactor(logger log.Logger, genDoc *types.GenesisDoc, privVals
return BlockchainReactorPair{bcReactor, proxyApp}
}
func TestNoBlockResponse(t *testing.T) {
func TestFastSyncNoBlockResponse(t *testing.T) {
peerTimeout = 15 * time.Second
maxRequestBatchSize = 40
maxRequestsPerPeer = 20
maxNumPendingRequests = 100
config = cfg.ResetTestRoot("blockchain_new_reactor_test")
defer os.RemoveAll(config.RootDir)
@ -136,24 +137,19 @@ func TestNoBlockResponse(t *testing.T) {
reactorPairs := make([]BlockchainReactorPair, 2)
logger1 := log.TestingLogger()
reactorPairs[0] = newBlockchainReactor(logger1, genDoc, privVals, maxBlockHeight)
logger2 := log.TestingLogger()
reactorPairs[1] = newBlockchainReactor(logger2, genDoc, privVals, 0)
logger := log.TestingLogger()
reactorPairs[0] = newBlockchainReactor(logger, genDoc, privVals, maxBlockHeight)
reactorPairs[1] = newBlockchainReactor(logger, genDoc, privVals, 0)
p2p.MakeConnectedSwitches(config.P2P, 2, func(i int, s *p2p.Switch) *p2p.Switch {
s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor)
moduleName := fmt.Sprintf("blockchain-%v", i)
reactorPairs[i].reactor.SetLogger(logger.With("module", moduleName))
return s
}, p2p.Connect2Switches)
addr0 := reactorPairs[0].reactor.Switch.NodeInfo().ID()
moduleName := fmt.Sprintf("blockchain-%v", addr0)
reactorPairs[0].reactor.SetLogger(logger1.With("module", moduleName[:19]))
addr1 := reactorPairs[1].reactor.Switch.NodeInfo().ID()
moduleName = fmt.Sprintf("blockchain-%v", addr1)
reactorPairs[1].reactor.SetLogger(logger1.With("module", moduleName[:19]))
defer func() {
for _, r := range reactorPairs {
_ = r.reactor.Stop()
@ -172,11 +168,10 @@ func TestNoBlockResponse(t *testing.T) {
}
for {
if reactorPairs[1].reactor.fsm.IsFinished() {
time.Sleep(1 * time.Second)
if reactorPairs[1].reactor.fsm.isCaughtUp() {
break
}
time.Sleep(10 * time.Millisecond)
}
assert.Equal(t, maxBlockHeight, reactorPairs[0].reactor.store.Height())
@ -196,14 +191,17 @@ func TestNoBlockResponse(t *testing.T) {
// or without significant refactoring of the module.
// Alternatively we could actually dial a TCP conn but
// that seems extreme.
func TestBadBlockStopsPeer(t *testing.T) {
func TestFastSyncBadBlockStopsPeer(t *testing.T) {
peerTimeout = 15 * time.Second
maxRequestBatchSize = 40
maxRequestsPerPeer = 20
maxNumPendingRequests = 20
config = cfg.ResetTestRoot("blockchain_reactor_test")
defer os.RemoveAll(config.RootDir)
genDoc, privVals := randGenesisDoc(1, false, 30)
maxBlockHeight := int64(500)
maxBlockHeight := int64(148)
otherChain := newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight)
defer func() {
@ -226,16 +224,12 @@ func TestBadBlockStopsPeer(t *testing.T) {
switches := p2p.MakeConnectedSwitches(config.P2P, 4, func(i int, s *p2p.Switch) *p2p.Switch {
s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor)
moduleName := fmt.Sprintf("blockchain-%v", i)
reactorPairs[i].reactor.SetLogger(logger[i].With("module", moduleName))
return s
}, p2p.Connect2Switches)
for i := 0; i < 4; i++ {
addr := reactorPairs[i].reactor.Switch.NodeInfo().ID()
moduleName := fmt.Sprintf("blockchain-%v", addr)
reactorPairs[i].reactor.SetLogger(logger[i].With("module", moduleName[:19]))
}
defer func() {
for _, r := range reactorPairs {
_ = r.reactor.Stop()
@ -244,11 +238,10 @@ func TestBadBlockStopsPeer(t *testing.T) {
}()
for {
if reactorPairs[3].reactor.fsm.IsFinished() || reactorPairs[3].reactor.Switch.Peers().Size() == 0 {
time.Sleep(1 * time.Second)
if reactorPairs[3].reactor.fsm.isCaughtUp() || reactorPairs[3].reactor.Switch.Peers().Size() == 0 {
break
}
time.Sleep(1 * time.Second)
}
//at this time, reactors[0-3] are the newest
@ -263,24 +256,21 @@ func TestBadBlockStopsPeer(t *testing.T) {
switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch) *p2p.Switch {
s.AddReactor("BLOCKCHAIN", reactorPairs[len(reactorPairs)-1].reactor)
moduleName := fmt.Sprintf("blockchain-%v", len(reactorPairs)-1)
reactorPairs[len(reactorPairs)-1].reactor.SetLogger(lastLogger.With("module", moduleName))
return s
}, p2p.Connect2Switches)...)
addr := lastReactorPair.reactor.Switch.NodeInfo().ID()
moduleName := fmt.Sprintf("blockchain-%v", addr)
lastReactorPair.reactor.SetLogger(lastLogger.With("module", moduleName[:19]))
for i := 0; i < len(reactorPairs)-1; i++ {
p2p.Connect2Switches(switches, i, len(reactorPairs)-1)
}
for {
if lastReactorPair.reactor.fsm.IsFinished() || lastReactorPair.reactor.Switch.Peers().Size() == 0 {
time.Sleep(1 * time.Second)
if lastReactorPair.reactor.fsm.isCaughtUp() || lastReactorPair.reactor.Switch.Peers().Size() == 0 {
break
}
time.Sleep(1 * time.Second)
}
assert.True(t, lastReactorPair.reactor.Switch.Peers().Size() < len(reactorPairs)-1)
@ -307,29 +297,32 @@ func setupReactors(
switches := p2p.MakeConnectedSwitches(config.P2P, numReactors, func(i int, s *p2p.Switch) *p2p.Switch {
s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor)
moduleName := fmt.Sprintf("blockchain-%v", i)
reactorPairs[i].reactor.SetLogger(logger[i].With("module", moduleName))
return s
}, p2p.Connect2Switches)
for i := 0; i < numReactors; i++ {
addr := reactorPairs[i].reactor.Switch.NodeInfo().ID()
moduleName := fmt.Sprintf("blockchain-%v", addr)
reactorPairs[i].reactor.SetLogger(logger[i].With("module", moduleName[:19]))
}
return reactorPairs, switches
}
// WIP - used for some scale testing, will remove
func TestFastSyncMultiNode(t *testing.T) {
peerTimeout = 15 * time.Second
numNodes := 8
maxHeight := int64(1000)
peerTimeout = 15 * time.Second
maxRequestBatchSize = 80
//numNodes := 20
//maxHeight := int64(10000)
maxRequestsPerPeer = 40
maxNumPendingRequests = 500
config = cfg.ResetTestRoot("blockchain_reactor_test")
genDoc, privVals := randGenesisDoc(1, false, 30)
start := time.Now()
reactorPairs, switches := setupReactors(numNodes, maxHeight, genDoc, privVals)
defer func() {
@ -339,16 +332,25 @@ func TestFastSyncMultiNode(t *testing.T) {
}
}()
outerFor:
for {
if reactorPairs[numNodes-1].reactor.fsm.IsFinished() || reactorPairs[numNodes-1].reactor.Switch.Peers().Size() == 0 {
break
i := 0
for i < numNodes {
if !reactorPairs[i].reactor.fsm.isCaughtUp() {
break
}
i++
}
if i == numNodes {
fmt.Println("SETUP FAST SYNC Duration", time.Since(start))
break outerFor
} else {
time.Sleep(1 * time.Second)
}
time.Sleep(1 * time.Second)
}
//at this time, reactors[0..numNodes-1] are the newest
assert.Equal(t, numNodes-1, reactorPairs[1].reactor.Switch.Peers().Size())
assert.Equal(t, numNodes-1, reactorPairs[0].reactor.Switch.Peers().Size())
lastLogger := log.TestingLogger()
lastReactorPair := newBlockchainReactor(lastLogger, genDoc, privVals, 0)
@ -356,29 +358,26 @@ func TestFastSyncMultiNode(t *testing.T) {
switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch) *p2p.Switch {
s.AddReactor("BLOCKCHAIN", reactorPairs[len(reactorPairs)-1].reactor)
moduleName := fmt.Sprintf("blockchainTEST-%d", len(reactorPairs)-1)
reactorPairs[len(reactorPairs)-1].reactor.SetLogger(lastLogger.With("module", moduleName))
return s
}, p2p.Connect2Switches)...)
addr := lastReactorPair.reactor.Switch.NodeInfo().ID()
moduleName := fmt.Sprintf("blockchain-%v", addr)
lastReactorPair.reactor.SetLogger(lastLogger.With("module", moduleName[:19]))
start := time.Now()
start = time.Now()
for i := 0; i < len(reactorPairs)-1; i++ {
p2p.Connect2Switches(switches, i, len(reactorPairs)-1)
}
for {
if lastReactorPair.reactor.fsm.IsFinished() || lastReactorPair.reactor.Switch.Peers().Size() == 0 {
time.Sleep(1 * time.Second)
if lastReactorPair.reactor.fsm.isCaughtUp() {
fmt.Println("FAST SYNC Duration", time.Since(start))
break
}
time.Sleep(1 * time.Second)
}
fmt.Println(time.Since(start))
assert.True(t, lastReactorPair.reactor.Switch.Peers().Size() < len(reactorPairs))
assert.Equal(t, lastReactorPair.reactor.fsm.pool.getMaxPeerHeight(), lastReactorPair.reactor.fsm.pool.height)
}