switch fast sync to new implementation

This commit is contained in:
Anca Zamfir
2019-04-13 09:23:43 -04:00
parent 4d54cced43
commit 0c84d780d7
18 changed files with 2541 additions and 2541 deletions

View File

@ -8,7 +8,6 @@ import (
"time"
"github.com/stretchr/testify/assert"
abci "github.com/tendermint/tendermint/abci/types"
cfg "github.com/tendermint/tendermint/config"
cmn "github.com/tendermint/tendermint/libs/common"
@ -56,7 +55,7 @@ func makeVote(header *types.Header, blockID types.BlockID, valset *types.Validat
BlockID: blockID,
}
privVal.SignVote(header.ChainID, vote)
_ = privVal.SignVote(header.ChainID, vote)
return vote
}
@ -109,7 +108,7 @@ func newBlockchainReactor(logger log.Logger, genDoc *types.GenesisDoc, privVals
thisBlock := makeBlock(blockHeight, state, lastCommit)
thisParts := thisBlock.MakePartSet(types.BlockPartSizeBytes)
blockID := types.BlockID{thisBlock.Hash(), thisParts.Header()}
blockID := types.BlockID{Hash: thisBlock.Hash(), PartsHeader: thisParts.Header()}
state, err = blockExec.ApplyBlock(state, blockID, thisBlock)
if err != nil {
@ -126,7 +125,11 @@ func newBlockchainReactor(logger log.Logger, genDoc *types.GenesisDoc, privVals
}
func TestFastSyncNoBlockResponse(t *testing.T) {
config = cfg.ResetTestRoot("blockchain_reactor_test")
peerTimeout = 15 * time.Second
maxRequestsPerPeer = 20
maxNumPendingRequests = 100
config = cfg.ResetTestRoot("blockchain_new_reactor_test")
defer os.RemoveAll(config.RootDir)
genDoc, privVals := randGenesisDoc(1, false, 30)
@ -134,19 +137,23 @@ func TestFastSyncNoBlockResponse(t *testing.T) {
reactorPairs := make([]BlockchainReactorPair, 2)
reactorPairs[0] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight)
reactorPairs[1] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0)
logger := log.TestingLogger()
reactorPairs[0] = newBlockchainReactor(logger, genDoc, privVals, maxBlockHeight)
reactorPairs[1] = newBlockchainReactor(logger, genDoc, privVals, 0)
p2p.MakeConnectedSwitches(config.P2P, 2, func(i int, s *p2p.Switch) *p2p.Switch {
s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor)
moduleName := fmt.Sprintf("blockchain-%v", i)
reactorPairs[i].reactor.SetLogger(logger.With("module", moduleName))
return s
}, p2p.Connect2Switches)
defer func() {
for _, r := range reactorPairs {
r.reactor.Stop()
r.app.Stop()
_ = r.reactor.Stop()
_ = r.app.Stop()
}
}()
@ -161,11 +168,10 @@ func TestFastSyncNoBlockResponse(t *testing.T) {
}
for {
if reactorPairs[1].reactor.pool.IsCaughtUp() {
time.Sleep(1 * time.Second)
if reactorPairs[1].reactor.fsm.isCaughtUp() {
break
}
time.Sleep(10 * time.Millisecond)
}
assert.Equal(t, maxBlockHeight, reactorPairs[0].reactor.store.Height())
@ -186,6 +192,12 @@ func TestFastSyncNoBlockResponse(t *testing.T) {
// Alternatively we could actually dial a TCP conn but
// that seems extreme.
func TestFastSyncBadBlockStopsPeer(t *testing.T) {
peerTimeout = 15 * time.Second
maxRequestsPerPeer = 20
maxNumPendingRequests = 400
numNodes := 4
config = cfg.ResetTestRoot("blockchain_reactor_test")
defer os.RemoveAll(config.RootDir)
genDoc, privVals := randGenesisDoc(1, false, 30)
@ -194,49 +206,59 @@ func TestFastSyncBadBlockStopsPeer(t *testing.T) {
otherChain := newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight)
defer func() {
otherChain.reactor.Stop()
otherChain.app.Stop()
_ = otherChain.reactor.Stop()
_ = otherChain.app.Stop()
}()
reactorPairs := make([]BlockchainReactorPair, 4)
reactorPairs := make([]BlockchainReactorPair, numNodes)
reactorPairs[0] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight)
reactorPairs[1] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0)
reactorPairs[2] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0)
reactorPairs[3] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0)
var logger = make([]log.Logger, numNodes)
switches := p2p.MakeConnectedSwitches(config.P2P, 4, func(i int, s *p2p.Switch) *p2p.Switch {
for i := 0; i < numNodes; i++ {
logger[i] = log.TestingLogger()
height := int64(0)
if i == 0 {
height = maxBlockHeight
}
reactorPairs[i] = newBlockchainReactor(logger[i], genDoc, privVals, height)
}
switches := p2p.MakeConnectedSwitches(config.P2P, numNodes, func(i int, s *p2p.Switch) *p2p.Switch {
s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor)
moduleName := fmt.Sprintf("blockchain-%v", i)
reactorPairs[i].reactor.SetLogger(logger[i].With("module", moduleName))
return s
}, p2p.Connect2Switches)
defer func() {
for _, r := range reactorPairs {
r.reactor.Stop()
r.app.Stop()
_ = r.reactor.Stop()
_ = r.app.Stop()
}
}()
for {
if reactorPairs[3].reactor.pool.IsCaughtUp() {
time.Sleep(1 * time.Second)
if reactorPairs[numNodes-1].reactor.fsm.isCaughtUp() || reactorPairs[numNodes-1].reactor.Switch.Peers().Size() == 0 {
break
}
time.Sleep(1 * time.Second)
}
//at this time, reactors[0-3] is the newest
assert.Equal(t, 3, reactorPairs[1].reactor.Switch.Peers().Size())
assert.Equal(t, numNodes-1, reactorPairs[1].reactor.Switch.Peers().Size())
//mark reactorPairs[3] is an invalid peer
reactorPairs[3].reactor.store = otherChain.reactor.store
reactorPairs[numNodes-1].reactor.store = otherChain.reactor.store
lastReactorPair := newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0)
lastLogger := log.TestingLogger()
lastReactorPair := newBlockchainReactor(lastLogger, genDoc, privVals, 0)
reactorPairs = append(reactorPairs, lastReactorPair)
switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch) *p2p.Switch {
s.AddReactor("BLOCKCHAIN", reactorPairs[len(reactorPairs)-1].reactor)
moduleName := fmt.Sprintf("blockchain-%v", len(reactorPairs)-1)
reactorPairs[len(reactorPairs)-1].reactor.SetLogger(lastLogger.With("module", moduleName))
return s
}, p2p.Connect2Switches)...)
@ -246,16 +268,13 @@ func TestFastSyncBadBlockStopsPeer(t *testing.T) {
}
for {
if lastReactorPair.reactor.pool.IsCaughtUp() || lastReactorPair.reactor.Switch.Peers().Size() == 0 {
time.Sleep(1 * time.Second)
if lastReactorPair.reactor.fsm.isCaughtUp() || lastReactorPair.reactor.Switch.Peers().Size() == 0 {
break
}
time.Sleep(1 * time.Second)
}
assert.True(t, lastReactorPair.reactor.Switch.Peers().Size() < len(reactorPairs)-1)
assert.Equal(t, lastReactorPair.reactor.pool.maxPeerHeight, lastReactorPair.reactor.pool.height)
}
func setupReactors(
@ -279,26 +298,27 @@ func setupReactors(
switches := p2p.MakeConnectedSwitches(config.P2P, numReactors, func(i int, s *p2p.Switch) *p2p.Switch {
s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor)
moduleName := fmt.Sprintf("blockchain-%v", i)
reactorPairs[i].reactor.SetLogger(logger[i].With("module", moduleName))
return s
}, p2p.Connect2Switches)
for i := 0; i < numReactors; i++ {
addr := reactorPairs[i].reactor.Switch.NodeInfo().ID()
moduleName := fmt.Sprintf("blockchain-%v", addr)
reactorPairs[i].reactor.SetLogger(logger[i].With("module", moduleName[:19]))
}
return reactorPairs, switches
}
// WIP - used for some scale testing, will remove
func TestFastSyncMultiNode(t *testing.T) {
peerTimeout = 15 * time.Second
numNodes := 8
maxHeight := int64(1000)
//numNodes := 20
//maxHeight := int64(10000)
maxRequestsPerPeer = 40
maxNumPendingRequests = 500
config = cfg.ResetTestRoot("blockchain_reactor_test")
genDoc, privVals := randGenesisDoc(1, false, 30)
@ -317,7 +337,7 @@ outerFor:
for {
i := 0
for i < numNodes {
if !reactorPairs[i].reactor.pool.IsCaughtUp() {
if !reactorPairs[i].reactor.fsm.isCaughtUp() {
break
}
i++
@ -339,14 +359,12 @@ outerFor:
switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch) *p2p.Switch {
s.AddReactor("BLOCKCHAIN", reactorPairs[len(reactorPairs)-1].reactor)
moduleName := fmt.Sprintf("blockchainTEST-%d", len(reactorPairs)-1)
reactorPairs[len(reactorPairs)-1].reactor.SetLogger(lastLogger.With("module", moduleName))
return s
}, p2p.Connect2Switches)...)
addr := lastReactorPair.reactor.Switch.NodeInfo().ID()
moduleName := fmt.Sprintf("blockchain-%v", addr)
lastReactorPair.reactor.SetLogger(lastLogger.With("module", moduleName[:19]))
start = time.Now()
for i := 0; i < len(reactorPairs)-1; i++ {
@ -355,15 +373,14 @@ outerFor:
for {
time.Sleep(1 * time.Second)
if lastReactorPair.reactor.pool.IsCaughtUp() {
if lastReactorPair.reactor.fsm.isCaughtUp() {
fmt.Println("FAST SYNC Duration", time.Since(start))
break
}
}
assert.True(t, lastReactorPair.reactor.Switch.Peers().Size() < len(reactorPairs))
assert.Equal(t, lastReactorPair.reactor.pool.maxPeerHeight, lastReactorPair.reactor.pool.height)
assert.Equal(t, lastReactorPair.reactor.fsm.pool.getMaxPeerHeight(), lastReactorPair.reactor.fsm.pool.height)
}
//----------------------------------------------