This commit is contained in:
Anca Zamfir 2019-03-26 14:51:37 +01:00
parent 6428cba446
commit fd9b10f6fe
8 changed files with 274 additions and 77 deletions

View File

@ -1,6 +1,7 @@
package blockchain package blockchain
import ( import (
"fmt"
"os" "os"
"sort" "sort"
"testing" "testing"
@ -255,6 +256,95 @@ func TestBadBlockStopsPeer(t *testing.T) {
assert.True(t, lastReactorPair.reactor.Switch.Peers().Size() < len(reactorPairs)-1) assert.True(t, lastReactorPair.reactor.Switch.Peers().Size() < len(reactorPairs)-1)
} }
func setupReactors(
numReactors int, maxBlockHeight int64,
genDoc *types.GenesisDoc, privVals []types.PrivValidator) ([]BlockchainReactorPair, []*p2p.Switch) {
defer os.RemoveAll(config.RootDir)
reactorPairs := make([]BlockchainReactorPair, numReactors)
var logger = make([]log.Logger, numReactors)
for i := 0; i < numReactors; i++ {
logger[i] = log.TestingLogger()
height := int64(0)
if i == 0 {
height = maxBlockHeight
}
reactorPairs[i] = newBlockchainReactor(logger[i], genDoc, privVals, height)
}
switches := p2p.MakeConnectedSwitches(config.P2P, numReactors, func(i int, s *p2p.Switch) *p2p.Switch {
s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor)
return s
}, p2p.Connect2Switches)
for i := 0; i < numReactors; i++ {
addr := reactorPairs[i].reactor.Switch.NodeInfo().ID()
moduleName := fmt.Sprintf("blockchain-%v", addr)
reactorPairs[i].reactor.SetLogger(logger[i].With("module", moduleName[:19]))
}
return reactorPairs, switches
}
func TestFastSyncMultiNode(t *testing.T) {
numNodes := 8
maxHeight := int64(1000)
config = cfg.ResetTestRoot("blockchain_reactor_test")
genDoc, privVals := randGenesisDoc(1, false, 30)
reactorPairs, switches := setupReactors(numNodes, maxHeight, genDoc, privVals)
defer func() {
for _, r := range reactorPairs {
_ = r.reactor.Stop()
_ = r.app.Stop()
}
}()
for {
if reactorPairs[numNodes-1].reactor.pool.IsCaughtUp() || reactorPairs[numNodes-1].reactor.Switch.Peers().Size() == 0 {
break
}
time.Sleep(1 * time.Second)
}
//at this time, reactors[0-3] are the newest
assert.Equal(t, numNodes-1, reactorPairs[1].reactor.Switch.Peers().Size())
lastLogger := log.TestingLogger()
lastReactorPair := newBlockchainReactor(lastLogger, genDoc, privVals, 0)
reactorPairs = append(reactorPairs, lastReactorPair)
switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch) *p2p.Switch {
s.AddReactor("BLOCKCHAIN", reactorPairs[len(reactorPairs)-1].reactor)
return s
}, p2p.Connect2Switches)...)
addr := lastReactorPair.reactor.Switch.NodeInfo().ID()
moduleName := fmt.Sprintf("blockchain-%v", addr)
lastReactorPair.reactor.SetLogger(lastLogger.With("module", moduleName[:19]))
for i := 0; i < len(reactorPairs)-1; i++ {
p2p.Connect2Switches(switches, i, len(reactorPairs)-1)
}
for {
if lastReactorPair.reactor.pool.IsCaughtUp() || lastReactorPair.reactor.Switch.Peers().Size() == 0 {
break
}
time.Sleep(1 * time.Second)
}
assert.True(t, lastReactorPair.reactor.Switch.Peers().Size() < len(reactorPairs))
}
//---------------------------------------------- //----------------------------------------------
// utility funcs // utility funcs

View File

@ -99,7 +99,7 @@ func (peer *bpPeer) onTimeout() {
peer.didTimeout = true peer.didTimeout = true
} }
func (peer *bpPeer) isPeerGood() error { func (peer *bpPeer) isGood() error {
if peer.didTimeout { if peer.didTimeout {
return errNoPeerResponse return errNoPeerResponse
} }

View File

@ -160,7 +160,7 @@ func TestCanBeRemovedDueToExpiration(t *testing.T) {
peer.incrPending() peer.incrPending()
time.Sleep(2 * time.Millisecond) time.Sleep(2 * time.Millisecond)
// timer expired, should be able to remove peer // timer expired, should be able to remove peer
assert.Equal(t, errNoPeerResponse, peer.isPeerGood()) assert.Equal(t, errNoPeerResponse, peer.isGood())
} }
func TestCanBeRemovedDueToLowSpeed(t *testing.T) { func TestCanBeRemovedDueToLowSpeed(t *testing.T) {
@ -187,7 +187,7 @@ func TestCanBeRemovedDueToLowSpeed(t *testing.T) {
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
peer.decrPending(11) peer.decrPending(11)
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
require.Nil(t, peer.isPeerGood()) require.Nil(t, peer.isGood())
} }
// slow peer - send a bit less than 10 byes/100msec // slow peer - send a bit less than 10 byes/100msec
@ -196,7 +196,7 @@ func TestCanBeRemovedDueToLowSpeed(t *testing.T) {
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
} }
// check peer is considered slow // check peer is considered slow
assert.Equal(t, errSlowPeer, peer.isPeerGood()) assert.Equal(t, errSlowPeer, peer.isGood())
} }

View File

@ -3,13 +3,14 @@ package blockchain_new
import ( import (
"errors" "errors"
"fmt" "fmt"
"reflect"
"time"
"github.com/tendermint/go-amino" "github.com/tendermint/go-amino"
"github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/p2p"
sm "github.com/tendermint/tendermint/state" sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
"reflect"
"time"
) )
const ( const (
@ -157,7 +158,7 @@ func (bcR *BlockchainReactor) respondToPeer(msg *bcBlockRequestMessage,
return src.TrySend(BlockchainChannel, msgBytes) return src.TrySend(BlockchainChannel, msgBytes)
} }
bcR.Logger.Info("Peer asking for a block we don't have", "src", src, "height", msg.Height) bcR.Logger.Info("peer asking for a block we don't have", "src", src, "height", msg.Height)
msgBytes := cdc.MustMarshalBinaryBare(&bcNoBlockResponseMessage{Height: msg.Height}) msgBytes := cdc.MustMarshalBinaryBare(&bcNoBlockResponseMessage{Height: msg.Height})
return src.TrySend(BlockchainChannel, msgBytes) return src.TrySend(BlockchainChannel, msgBytes)
@ -167,13 +168,13 @@ func (bcR *BlockchainReactor) respondToPeer(msg *bcBlockRequestMessage,
func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
msg, err := decodeMsg(msgBytes) msg, err := decodeMsg(msgBytes)
if err != nil { if err != nil {
bcR.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes) bcR.Logger.Error("error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes)
bcR.Switch.StopPeerForError(src, err) bcR.Switch.StopPeerForError(src, err)
return return
} }
if err = msg.ValidateBasic(); err != nil { if err = msg.ValidateBasic(); err != nil {
bcR.Logger.Error("Peer sent us invalid msg", "peer", src, "msg", msg, "err", err) bcR.Logger.Error("peer sent us invalid msg", "peer", src, "msg", msg, "err", err)
bcR.Switch.StopPeerForError(src, err) bcR.Switch.StopPeerForError(src, err)
return return
} }
@ -216,7 +217,7 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)
sendMessageToFSM(bcR.fsm, msgData) sendMessageToFSM(bcR.fsm, msgData)
default: default:
bcR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg))) bcR.Logger.Error(fmt.Sprintf("unknown message type %v", reflect.TypeOf(msg)))
} }
} }
@ -235,7 +236,7 @@ func (bcR *BlockchainReactor) processBlocks(first *types.Block, second *types.Bl
chainID, firstID, first.Height, second.LastCommit) chainID, firstID, first.Height, second.LastCommit)
if err != nil { if err != nil {
bcR.Logger.Error("Error in validation", "err", err, first.Height, second.Height) bcR.Logger.Error("error in validation", "err", err, first.Height, second.Height)
peerID := bcR.fsm.blocks[first.Height].peerId peerID := bcR.fsm.blocks[first.Height].peerId
peer := bcR.Switch.Peers().Get(peerID) peer := bcR.Switch.Peers().Get(peerID)
if peer != nil { if peer != nil {
@ -255,7 +256,7 @@ func (bcR *BlockchainReactor) processBlocks(first *types.Block, second *types.Bl
bcR.state, err = bcR.blockExec.ApplyBlock(bcR.state, firstID, first) bcR.state, err = bcR.blockExec.ApplyBlock(bcR.state, firstID, first)
if err != nil { if err != nil {
// TODO This is bad, are we zombie? // TODO This is bad, are we zombie?
panic(fmt.Sprintf("Failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err)) panic(fmt.Sprintf("failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err))
} }
bcR.blocksSynced++ bcR.blocksSynced++

View File

@ -20,6 +20,17 @@ type blockData struct {
peerId p2p.ID peerId p2p.ID
} }
func (bd *blockData) String() string {
if bd == nil {
return fmt.Sprintf("blockData nil")
}
if bd.block == nil {
return fmt.Sprintf("block: nil peer: %v", bd.peerId)
}
return fmt.Sprintf("block: %v peer: %v", bd.block.Height, bd.peerId)
}
// Blockchain Reactor State // Blockchain Reactor State
type bReactorFSMState struct { type bReactorFSMState struct {
name string name string
@ -46,7 +57,6 @@ type bReactorFSM struct {
blocks map[int64]*blockData blocks map[int64]*blockData
height int64 // processing height height int64 // processing height
lastRequestHeight int64
peers map[p2p.ID]*bpPeer peers map[p2p.ID]*bpPeer
maxPeerHeight int64 maxPeerHeight int64
@ -55,6 +65,7 @@ type bReactorFSM struct {
// channel to receive messages // channel to receive messages
messageCh chan bReactorMessageData messageCh chan bReactorMessageData
processSignalActive bool
// interface used to send StatusRequest, BlockRequest, errors // interface used to send StatusRequest, BlockRequest, errors
bcr sendMessage bcr sendMessage
@ -266,7 +277,7 @@ func init() {
case blockResponseEv: case blockResponseEv:
// add block to fsm.blocks // add block to fsm.blocks
fsm.logger.Info("blockResponseEv", "H", data.block.Height) fsm.logger.Debug("blockResponseEv", "H", data.block.Height)
err := fsm.addBlock(data.peerId, data.block, data.length) err := fsm.addBlock(data.peerId, data.block, data.length)
if err != nil { if err != nil {
// unsolicited, from different peer, already have it.. // unsolicited, from different peer, already have it..
@ -275,17 +286,15 @@ func init() {
return waitForBlock, err return waitForBlock, err
} }
if fsm.shouldTryProcessBlock() {
fsm.logger.Info("shouldTryProcessBlock", "first", fsm.height, "second", fsm.height+1)
// try to process block at fsm.height with the help of block at fsm.height+1
fsm.sendSignalToProcessBlock() fsm.sendSignalToProcessBlock()
}
if fsm.state.timer != nil { if fsm.state.timer != nil {
fsm.state.timer.Stop() fsm.state.timer.Stop()
} }
return waitForBlock, nil return waitForBlock, nil
case tryProcessBlockEv: case tryProcessBlockEv:
fsm.logger.Debug("FSM blocks", "blocks", fsm.blocks, "fsm_height", fsm.height)
if err := fsm.processBlock(); err != nil { if err := fsm.processBlock(); err != nil {
if err == errMissingBlocks { if err == errMissingBlocks {
// continue so we ask for more blocks // continue so we ask for more blocks
@ -300,6 +309,7 @@ func init() {
} else { } else {
delete(fsm.blocks, fsm.height) delete(fsm.blocks, fsm.height)
fsm.height++ fsm.height++
fsm.processSignalActive = false
fsm.removeShortPeers() fsm.removeShortPeers()
// processed block, check if we are done // processed block, check if we are done
@ -316,11 +326,7 @@ func init() {
// wait for more peers or state timeout // wait for more peers or state timeout
} }
if fsm.shouldTryProcessBlock() {
fsm.logger.Info("shouldTryProcessBlock", "first", fsm.height, "second", fsm.height+1)
// try to process block at fsm.height with the help of block at fsm.height+1
fsm.sendSignalToProcessBlock() fsm.sendSignalToProcessBlock()
}
if fsm.state.timer != nil { if fsm.state.timer != nil {
fsm.state.timer.Stop() fsm.state.timer.Stop()
@ -378,7 +384,7 @@ func NewFSM(store *BlockStore, bcr sendMessage) *bReactorFSM {
} }
func sendMessageToFSM(fsm *bReactorFSM, msg bReactorMessageData) { func sendMessageToFSM(fsm *bReactorFSM, msg bReactorMessageData) {
fsm.logger.Info("send message to FSM", "msg", msg.String()) fsm.logger.Debug("send message to FSM", "msg", msg.String())
fsm.messageCh <- msg fsm.messageCh <- msg
} }
@ -409,7 +415,7 @@ forLoop:
for { for {
select { select {
case msg := <-fsm.messageCh: case msg := <-fsm.messageCh:
fsm.logger.Info("FSM Received message", "msg", msg.String()) fsm.logger.Debug("FSM Received message", "msg", msg.String())
_ = fsm.handle(&msg) _ = fsm.handle(&msg)
if msg.event == stopFSMEv { if msg.event == stopFSMEv {
break forLoop break forLoop
@ -423,18 +429,21 @@ forLoop:
// handle processes messages and events sent to the FSM. // handle processes messages and events sent to the FSM.
func (fsm *bReactorFSM) handle(msg *bReactorMessageData) error { func (fsm *bReactorFSM) handle(msg *bReactorMessageData) error {
fsm.logger.Info("Blockchain reactor FSM received event", "event", msg.event, "state", fsm.state.name) fsm.logger.Debug("FSM received event", "event", msg.event, "state", fsm.state.name)
if fsm.state == nil { if fsm.state == nil {
fsm.state = unknown fsm.state = unknown
} }
next, err := fsm.state.handle(fsm, msg.event, msg.data) next, err := fsm.state.handle(fsm, msg.event, msg.data)
if err != nil { if err != nil {
fsm.logger.Error("Blockchain reactor event handler returned", "err", err) fsm.logger.Error("FSM event handler returned", "err", err, "state", fsm.state.name, "event", msg.event)
} }
oldState := fsm.state.name
fsm.transition(next) fsm.transition(next)
fsm.logger.Info("FSM new state", "state", fsm.state.name) if oldState != fsm.state.name {
fsm.logger.Info("FSM changed state", "old_state", oldState, "event", msg.event, "new_state", fsm.state.name)
}
return err return err
} }
@ -442,7 +451,7 @@ func (fsm *bReactorFSM) transition(next *bReactorFSMState) {
if next == nil { if next == nil {
return return
} }
fsm.logger.Info("Blockchain reactor FSM changes state: ", "old", fsm.state.name, "new", next.name) fsm.logger.Debug("changes state: ", "old", fsm.state.name, "new", next.name)
if fsm.state != next { if fsm.state != next {
fsm.state = next fsm.state = next
@ -490,7 +499,7 @@ func (fsm *bReactorFSM) resetStateTimer(state *bReactorFSMState) {
func (fsm *bReactorFSM) sendRequestBatch() error { func (fsm *bReactorFSM) sendRequestBatch() error {
// remove slow and timed out peers // remove slow and timed out peers
for _, peer := range fsm.peers { for _, peer := range fsm.peers {
if err := peer.isPeerGood(); err != nil { if err := peer.isGood(); err != nil {
fsm.logger.Info("Removing bad peer", "peer", peer.id, "err", err) fsm.logger.Info("Removing bad peer", "peer", peer.id, "err", err)
fsm.removePeer(peer.id, err) fsm.removePeer(peer.id, err)
} }
@ -502,6 +511,7 @@ func (fsm *bReactorFSM) sendRequestBatch() error {
// request height // request height
height := fsm.height + int64(i) height := fsm.height + int64(i)
if height > fsm.maxPeerHeight { if height > fsm.maxPeerHeight {
fsm.logger.Debug("Will not send request for", "height", height)
return err return err
} }
req := fsm.blocks[height] req := fsm.blocks[height]
@ -520,20 +530,27 @@ func (fsm *bReactorFSM) sendRequestBatch() error {
func (fsm *bReactorFSM) sendRequest(height int64) error { func (fsm *bReactorFSM) sendRequest(height int64) error {
// make requests // make requests
// TODO - sort peers in order of goodness // TODO - sort peers in order of goodness
fsm.logger.Debug("try to send request for", "height", height)
for _, peer := range fsm.peers { for _, peer := range fsm.peers {
// Send Block Request message to peer // Send Block Request message to peer
fsm.logger.Info("Try to send request to peer", "peer", peer.id, "height", height) if peer.height < height {
continue
}
fsm.logger.Debug("Try to send request to peer", "peer", peer.id, "height", height)
err := fsm.bcr.sendBlockRequest(peer.id, height) err := fsm.bcr.sendBlockRequest(peer.id, height)
if err == errSendQueueFull { if err == errSendQueueFull {
fsm.logger.Info("cannot send request, queue full", "peer", peer.id, "height", height) fsm.logger.Error("cannot send request, queue full", "peer", peer.id, "height", height)
continue continue
} }
if err == errNilPeerForBlockRequest { if err == errNilPeerForBlockRequest {
// this peer does not exist in the switch, delete locally // this peer does not exist in the switch, delete locally
fsm.logger.Info("Peer doesn't exist in the switch", "peer", peer.id) fsm.logger.Error("peer doesn't exist in the switch", "peer", peer.id)
fsm.deletePeer(peer.id) fsm.deletePeer(peer.id)
continue continue
} }
fsm.logger.Debug("Sent request to peer", "peer", peer.id, "height", height)
// reserve space for block // reserve space for block
fsm.blocks[height] = &blockData{peerId: peer.id, block: nil} fsm.blocks[height] = &blockData{peerId: peer.id, block: nil}
fsm.peers[peer.id].incrPending() fsm.peers[peer.id].incrPending()
@ -616,7 +633,7 @@ func (fsm *bReactorFSM) removeShortPeers() {
// removes any blocks and requests associated with the peer, deletes the peer and informs the switch if needed. // removes any blocks and requests associated with the peer, deletes the peer and informs the switch if needed.
func (fsm *bReactorFSM) removePeer(peerID p2p.ID, err error) { func (fsm *bReactorFSM) removePeer(peerID p2p.ID, err error) {
fsm.logger.Info("removePeer", "peer", peerID, "err", err) fsm.logger.Debug("removePeer", "peer", peerID, "err", err)
// remove all data for blocks waiting for the peer or not processed yet // remove all data for blocks waiting for the peer or not processed yet
for h, bData := range fsm.blocks { for h, bData := range fsm.blocks {
if bData.peerId == peerID { if bData.peerId == peerID {
@ -664,7 +681,7 @@ func (fsm *bReactorFSM) addBlock(peerID p2p.ID, block *types.Block, blockSize in
return errBadDataFromPeer return errBadDataFromPeer
} }
if blockData.block != nil { if blockData.block != nil {
fsm.logger.Error("already have a block for height") fsm.logger.Error("already have a block for height", "height", block.Height)
return errBadDataFromPeer return errBadDataFromPeer
} }
@ -676,25 +693,27 @@ func (fsm *bReactorFSM) addBlock(peerID p2p.ID, block *types.Block, blockSize in
return nil return nil
} }
func (fsm *bReactorFSM) shouldTryProcessBlock() bool { func (fsm *bReactorFSM) sendSignalToProcessBlock() {
first := fsm.blocks[fsm.height] first := fsm.blocks[fsm.height]
second := fsm.blocks[fsm.height+1] second := fsm.blocks[fsm.height+1]
if first == nil || first.block == nil || second == nil || second.block == nil { if first == nil || first.block == nil || second == nil || second.block == nil {
// We need both to sync the first block. // We need both to sync the first block.
return false fsm.logger.Debug("No need to send signal to process, blocks missing")
return
} }
return true
}
func (fsm *bReactorFSM) sendSignalToProcessBlock() { fsm.logger.Debug("Send signal to process", "first", fsm.height, "second", fsm.height+1)
// TODO - add check that this was sent before, currently there are extraneous tryProcessBlockEv if fsm.processSignalActive {
fsm.logger.Info("Send signal to process", "first", fsm.height, "second", fsm.height+1) fsm.logger.Debug("..already sent")
return
}
msgData := bReactorMessageData{ msgData := bReactorMessageData{
event: tryProcessBlockEv, event: tryProcessBlockEv,
data: bReactorEventData{}, data: bReactorEventData{},
} }
sendMessageToFSM(fsm, msgData) sendMessageToFSM(fsm, msgData)
fsm.processSignalActive = true
} }
// Processes block at height H = fsm.height. Expects both H and H+1 to be available // Processes block at height H = fsm.height. Expects both H and H+1 to be available
@ -703,13 +722,14 @@ func (fsm *bReactorFSM) processBlock() error {
second := fsm.blocks[fsm.height+1] second := fsm.blocks[fsm.height+1]
if first == nil || first.block == nil || second == nil || second.block == nil { if first == nil || first.block == nil || second == nil || second.block == nil {
// We need both to sync the first block. // We need both to sync the first block.
fsm.logger.Debug("process blocks doesn't have the blocks", "first", first, "second", second)
return errMissingBlocks return errMissingBlocks
} }
fsm.logger.Info("process blocks", "first", first.block.Height, "second", second.block.Height) fsm.logger.Debug("process blocks", "first", first.block.Height, "second", second.block.Height)
fsm.logger.Info("FSM blocks", "blocks", fsm.blocks) fsm.logger.Debug("FSM blocks", "blocks", fsm.blocks)
if err := fsm.bcr.processBlocks(first.block, second.block); err != nil { if err := fsm.bcr.processBlocks(first.block, second.block); err != nil {
fsm.logger.Error("Process blocks returned error", "err", err, "first", first.block.Height, "second", second.block.Height) fsm.logger.Error("process blocks returned error", "err", err, "first", first.block.Height, "second", second.block.Height)
fsm.logger.Error("FSM blocks", "blocks", fsm.blocks) fsm.logger.Error("FSM blocks", "blocks", fsm.blocks)
return err return err
} }

View File

@ -64,6 +64,16 @@ type fsmStepTestValues struct {
expectedLastBlockReq *lastBlockRequestT expectedLastBlockReq *lastBlockRequestT
} }
func newTestReactor() *testReactor {
logger := log.TestingLogger()
blockDB := dbm.NewMemDB()
store := NewBlockStore(blockDB)
testBcR := &testReactor{logger: logger}
testBcR.fsm = NewFSM(store, testBcR)
testBcR.fsm.setLogger(logger)
return testBcR
}
// WIP // WIP
func TestFSMTransitionSequences(t *testing.T) { func TestFSMTransitionSequences(t *testing.T) {
maxRequestBatchSize = 2 maxRequestBatchSize = 2
@ -79,27 +89,19 @@ func TestFSMTransitionSequences(t *testing.T) {
} }
for _, tt := range fsmTransitionSequenceTests { for _, tt := range fsmTransitionSequenceTests {
// Create and start the FSM // Create test reactor
testBcR := &testReactor{logger: log.TestingLogger()} testBcR := newTestReactor()
blockDB := dbm.NewMemDB()
store := NewBlockStore(blockDB)
fsm := NewFSM(store, testBcR)
fsm.setLogger(log.TestingLogger())
resetTestValues() resetTestValues()
// always start from unknown
fsm.resetStateTimer(unknown)
assert.Equal(t, 1, stateTimerStarts[unknown.name])
for _, step := range tt { for _, step := range tt {
assert.Equal(t, step.currentState, fsm.state.name) assert.Equal(t, step.currentState, testBcR.fsm.state.name)
failSendStatusRequest = step.failStatusReq failSendStatusRequest = step.failStatusReq
failSendBlockRequest = step.failBlockReq failSendBlockRequest = step.failBlockReq
oldNumStatusRequests := numStatusRequests oldNumStatusRequests := numStatusRequests
oldNumBlockRequests := numBlockRequests oldNumBlockRequests := numBlockRequests
_ = sendEventToFSM(fsm, step.event, step.data) _ = sendEventToFSM(testBcR.fsm, step.event, step.data)
if step.shouldSendStatusReq { if step.shouldSendStatusReq {
assert.Equal(t, oldNumStatusRequests+1, numStatusRequests) assert.Equal(t, oldNumStatusRequests+1, numStatusRequests)
} else { } else {
@ -112,7 +114,7 @@ func TestFSMTransitionSequences(t *testing.T) {
assert.Equal(t, oldNumBlockRequests, numBlockRequests) assert.Equal(t, oldNumBlockRequests, numBlockRequests)
} }
assert.Equal(t, step.expectedState, fsm.state.name) assert.Equal(t, step.expectedState, testBcR.fsm.state.name)
} }
} }
} }
@ -120,13 +122,10 @@ func TestFSMTransitionSequences(t *testing.T) {
func TestReactorFSMBasic(t *testing.T) { func TestReactorFSMBasic(t *testing.T) {
maxRequestBatchSize = 2 maxRequestBatchSize = 2
// Create test reactor
testBcR := newTestReactor()
resetTestValues() resetTestValues()
// Create and start the FSM fsm := testBcR.fsm
testBcR := &testReactor{logger: log.TestingLogger()}
blockDB := dbm.NewMemDB()
store := NewBlockStore(blockDB)
fsm := NewFSM(store, testBcR)
fsm.setLogger(log.TestingLogger())
if err := fsm.handle(&bReactorMessageData{event: startFSMEv}); err != nil { if err := fsm.handle(&bReactorMessageData{event: startFSMEv}); err != nil {
} }
@ -151,11 +150,8 @@ func TestReactorFSMPeerTimeout(t *testing.T) {
resetTestValues() resetTestValues()
peerTimeout = 20 * time.Millisecond peerTimeout = 20 * time.Millisecond
// Create and start the FSM // Create and start the FSM
testBcR := &testReactor{logger: log.TestingLogger()} testBcR := newTestReactor()
blockDB := dbm.NewMemDB() fsm := testBcR.fsm
store := NewBlockStore(blockDB)
fsm := NewFSM(store, testBcR)
fsm.setLogger(log.TestingLogger())
fsm.start() fsm.start()
// Check that FSM sends a status request message // Check that FSM sends a status request message

View File

@ -290,6 +290,100 @@ func TestBadBlockStopsPeer(t *testing.T) {
assert.True(t, lastReactorPair.reactor.Switch.Peers().Size() < len(reactorPairs)-1) assert.True(t, lastReactorPair.reactor.Switch.Peers().Size() < len(reactorPairs)-1)
} }
func setupReactors(
numReactors int, maxBlockHeight int64,
genDoc *types.GenesisDoc, privVals []types.PrivValidator) ([]BlockchainReactorPair, []*p2p.Switch) {
defer os.RemoveAll(config.RootDir)
reactorPairs := make([]BlockchainReactorPair, numReactors)
var logger = make([]log.Logger, numReactors)
for i := 0; i < numReactors; i++ {
logger[i] = log.TestingLogger()
height := int64(0)
if i == 0 {
height = maxBlockHeight
}
reactorPairs[i] = newBlockchainReactor(logger[i], genDoc, privVals, height)
}
switches := p2p.MakeConnectedSwitches(config.P2P, numReactors, func(i int, s *p2p.Switch) *p2p.Switch {
s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor)
return s
}, p2p.Connect2Switches)
for i := 0; i < numReactors; i++ {
addr := reactorPairs[i].reactor.Switch.NodeInfo().ID()
moduleName := fmt.Sprintf("blockchain-%v", addr)
reactorPairs[i].reactor.SetLogger(logger[i].With("module", moduleName[:19]))
}
return reactorPairs, switches
}
func TestFastSyncMultiNode(t *testing.T) {
numNodes := 8
maxHeight := int64(1000)
peerTimeout = 15 * time.Second
maxRequestBatchSize = 40
config = cfg.ResetTestRoot("blockchain_reactor_test")
genDoc, privVals := randGenesisDoc(1, false, 30)
reactorPairs, switches := setupReactors(numNodes, maxHeight, genDoc, privVals)
defer func() {
for _, r := range reactorPairs {
_ = r.reactor.Stop()
_ = r.app.Stop()
}
}()
for {
if reactorPairs[numNodes-1].reactor.fsm.IsFinished() || reactorPairs[3].reactor.Switch.Peers().Size() == 0 {
break
}
time.Sleep(1 * time.Second)
}
//at this time, reactors[0-3] are the newest
assert.Equal(t, numNodes-1, reactorPairs[1].reactor.Switch.Peers().Size())
lastLogger := log.TestingLogger()
lastReactorPair := newBlockchainReactor(lastLogger, genDoc, privVals, 0)
reactorPairs = append(reactorPairs, lastReactorPair)
switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch) *p2p.Switch {
s.AddReactor("BLOCKCHAIN", reactorPairs[len(reactorPairs)-1].reactor)
return s
}, p2p.Connect2Switches)...)
addr := lastReactorPair.reactor.Switch.NodeInfo().ID()
moduleName := fmt.Sprintf("blockchain-%v", addr)
lastReactorPair.reactor.SetLogger(lastLogger.With("module", moduleName[:19]))
for i := 0; i < len(reactorPairs)-1; i++ {
p2p.Connect2Switches(switches, i, len(reactorPairs)-1)
}
for {
if lastReactorPair.reactor.fsm.IsFinished() || lastReactorPair.reactor.Switch.Peers().Size() == 0 {
break
}
time.Sleep(1 * time.Second)
}
assert.True(t, lastReactorPair.reactor.Switch.Peers().Size() < len(reactorPairs))
assert.Equal(t, lastReactorPair.reactor.fsm.maxPeerHeight, lastReactorPair.reactor.fsm.height)
}
//---------------------------------------------- //----------------------------------------------
// utility funcs // utility funcs

View File

@ -30,10 +30,6 @@ func TestingLogger() Logger {
// inside a test (not in the init func) because // inside a test (not in the init func) because
// verbose flag only set at the time of testing. // verbose flag only set at the time of testing.
func TestingLoggerWithOutput(w io.Writer) Logger { func TestingLoggerWithOutput(w io.Writer) Logger {
if _testingLogger != nil {
return _testingLogger
}
if testing.Verbose() { if testing.Verbose() {
_testingLogger = NewTMLogger(NewSyncWriter(w)) _testingLogger = NewTMLogger(NewSyncWriter(w))
} else { } else {