diff --git a/blockchain/reactor_test.go b/blockchain/reactor_test.go index 1b3aee56..30f9806a 100644 --- a/blockchain/reactor_test.go +++ b/blockchain/reactor_test.go @@ -1,6 +1,7 @@ package blockchain import ( + "fmt" "os" "sort" "testing" @@ -255,6 +256,95 @@ func TestBadBlockStopsPeer(t *testing.T) { assert.True(t, lastReactorPair.reactor.Switch.Peers().Size() < len(reactorPairs)-1) } +func setupReactors( + numReactors int, maxBlockHeight int64, + genDoc *types.GenesisDoc, privVals []types.PrivValidator) ([]BlockchainReactorPair, []*p2p.Switch) { + + defer os.RemoveAll(config.RootDir) + + reactorPairs := make([]BlockchainReactorPair, numReactors) + + var logger = make([]log.Logger, numReactors) + + for i := 0; i < numReactors; i++ { + logger[i] = log.TestingLogger() + height := int64(0) + if i == 0 { + height = maxBlockHeight + } + reactorPairs[i] = newBlockchainReactor(logger[i], genDoc, privVals, height) + } + + switches := p2p.MakeConnectedSwitches(config.P2P, numReactors, func(i int, s *p2p.Switch) *p2p.Switch { + s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor) + return s + + }, p2p.Connect2Switches) + + for i := 0; i < numReactors; i++ { + addr := reactorPairs[i].reactor.Switch.NodeInfo().ID() + moduleName := fmt.Sprintf("blockchain-%v", addr) + reactorPairs[i].reactor.SetLogger(logger[i].With("module", moduleName[:19])) + } + + return reactorPairs, switches +} + +func TestFastSyncMultiNode(t *testing.T) { + + numNodes := 8 + maxHeight := int64(1000) + config = cfg.ResetTestRoot("blockchain_reactor_test") + genDoc, privVals := randGenesisDoc(1, false, 30) + + reactorPairs, switches := setupReactors(numNodes, maxHeight, genDoc, privVals) + + defer func() { + for _, r := range reactorPairs { + _ = r.reactor.Stop() + _ = r.app.Stop() + } + }() + + for { + if reactorPairs[numNodes-1].reactor.pool.IsCaughtUp() || reactorPairs[numNodes-1].reactor.Switch.Peers().Size() == 0 { + break + } + + time.Sleep(1 * time.Second) + } + + //at this time, reactors[0-3] are the newest + assert.Equal(t, numNodes-1, reactorPairs[1].reactor.Switch.Peers().Size()) + + lastLogger := log.TestingLogger() + lastReactorPair := newBlockchainReactor(lastLogger, genDoc, privVals, 0) + reactorPairs = append(reactorPairs, lastReactorPair) + + switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch) *p2p.Switch { + s.AddReactor("BLOCKCHAIN", reactorPairs[len(reactorPairs)-1].reactor) + return s + + }, p2p.Connect2Switches)...) + + addr := lastReactorPair.reactor.Switch.NodeInfo().ID() + moduleName := fmt.Sprintf("blockchain-%v", addr) + lastReactorPair.reactor.SetLogger(lastLogger.With("module", moduleName[:19])) + + for i := 0; i < len(reactorPairs)-1; i++ { + p2p.Connect2Switches(switches, i, len(reactorPairs)-1) + } + + for { + if lastReactorPair.reactor.pool.IsCaughtUp() || lastReactorPair.reactor.Switch.Peers().Size() == 0 { + break + } + + time.Sleep(1 * time.Second) + } + assert.True(t, lastReactorPair.reactor.Switch.Peers().Size() < len(reactorPairs)) +} + //---------------------------------------------- // utility funcs diff --git a/blockchain_new/peer.go b/blockchain_new/peer.go index 1c6e97d0..227d36f8 100644 --- a/blockchain_new/peer.go +++ b/blockchain_new/peer.go @@ -99,7 +99,7 @@ func (peer *bpPeer) onTimeout() { peer.didTimeout = true } -func (peer *bpPeer) isPeerGood() error { +func (peer *bpPeer) isGood() error { if peer.didTimeout { return errNoPeerResponse } diff --git a/blockchain_new/peer_test.go b/blockchain_new/peer_test.go index 68f247ff..8364764c 100644 --- a/blockchain_new/peer_test.go +++ b/blockchain_new/peer_test.go @@ -160,7 +160,7 @@ func TestCanBeRemovedDueToExpiration(t *testing.T) { peer.incrPending() time.Sleep(2 * time.Millisecond) // timer expired, should be able to remove peer - assert.Equal(t, errNoPeerResponse, peer.isPeerGood()) + assert.Equal(t, errNoPeerResponse, peer.isGood()) } func TestCanBeRemovedDueToLowSpeed(t *testing.T) { @@ -187,7 +187,7 @@ func TestCanBeRemovedDueToLowSpeed(t *testing.T) { for i := 0; i < 10; i++ { peer.decrPending(11) time.Sleep(100 * time.Millisecond) - require.Nil(t, peer.isPeerGood()) + require.Nil(t, peer.isGood()) } // slow peer - send a bit less than 10 byes/100msec @@ -196,7 +196,7 @@ func TestCanBeRemovedDueToLowSpeed(t *testing.T) { time.Sleep(100 * time.Millisecond) } // check peer is considered slow - assert.Equal(t, errSlowPeer, peer.isPeerGood()) + assert.Equal(t, errSlowPeer, peer.isGood()) } diff --git a/blockchain_new/reactor.go b/blockchain_new/reactor.go index 758045de..78564d75 100644 --- a/blockchain_new/reactor.go +++ b/blockchain_new/reactor.go @@ -3,13 +3,14 @@ package blockchain_new import ( "errors" "fmt" + "reflect" + "time" + "github.com/tendermint/go-amino" "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/p2p" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" - "reflect" - "time" ) const ( @@ -157,7 +158,7 @@ func (bcR *BlockchainReactor) respondToPeer(msg *bcBlockRequestMessage, return src.TrySend(BlockchainChannel, msgBytes) } - bcR.Logger.Info("Peer asking for a block we don't have", "src", src, "height", msg.Height) + bcR.Logger.Info("peer asking for a block we don't have", "src", src, "height", msg.Height) msgBytes := cdc.MustMarshalBinaryBare(&bcNoBlockResponseMessage{Height: msg.Height}) return src.TrySend(BlockchainChannel, msgBytes) @@ -167,13 +168,13 @@ func (bcR *BlockchainReactor) respondToPeer(msg *bcBlockRequestMessage, func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { msg, err := decodeMsg(msgBytes) if err != nil { - bcR.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes) + bcR.Logger.Error("error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes) bcR.Switch.StopPeerForError(src, err) return } if err = msg.ValidateBasic(); err != nil { - bcR.Logger.Error("Peer sent us invalid msg", "peer", src, "msg", msg, "err", err) + bcR.Logger.Error("peer sent us invalid msg", "peer", src, "msg", msg, "err", err) bcR.Switch.StopPeerForError(src, err) return } @@ -216,7 +217,7 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) sendMessageToFSM(bcR.fsm, msgData) default: - bcR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg))) + bcR.Logger.Error(fmt.Sprintf("unknown message type %v", reflect.TypeOf(msg))) } } @@ -235,7 +236,7 @@ func (bcR *BlockchainReactor) processBlocks(first *types.Block, second *types.Bl chainID, firstID, first.Height, second.LastCommit) if err != nil { - bcR.Logger.Error("Error in validation", "err", err, first.Height, second.Height) + bcR.Logger.Error("error in validation", "err", err, first.Height, second.Height) peerID := bcR.fsm.blocks[first.Height].peerId peer := bcR.Switch.Peers().Get(peerID) if peer != nil { @@ -255,7 +256,7 @@ func (bcR *BlockchainReactor) processBlocks(first *types.Block, second *types.Bl bcR.state, err = bcR.blockExec.ApplyBlock(bcR.state, firstID, first) if err != nil { // TODO This is bad, are we zombie? - panic(fmt.Sprintf("Failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err)) + panic(fmt.Sprintf("failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err)) } bcR.blocksSynced++ diff --git a/blockchain_new/reactor_fsm.go b/blockchain_new/reactor_fsm.go index 434f6e0c..82016e02 100644 --- a/blockchain_new/reactor_fsm.go +++ b/blockchain_new/reactor_fsm.go @@ -20,6 +20,17 @@ type blockData struct { peerId p2p.ID } +func (bd *blockData) String() string { + if bd == nil { + return fmt.Sprintf("blockData nil") + } + if bd.block == nil { + return fmt.Sprintf("block: nil peer: %v", bd.peerId) + } + return fmt.Sprintf("block: %v peer: %v", bd.block.Height, bd.peerId) + +} + // Blockchain Reactor State type bReactorFSMState struct { name string @@ -44,9 +55,8 @@ type bReactorFSM struct { state *bReactorFSMState - blocks map[int64]*blockData - height int64 // processing height - lastRequestHeight int64 + blocks map[int64]*blockData + height int64 // processing height peers map[p2p.ID]*bpPeer maxPeerHeight int64 @@ -54,7 +64,8 @@ type bReactorFSM struct { store *BlockStore // channel to receive messages - messageCh chan bReactorMessageData + messageCh chan bReactorMessageData + processSignalActive bool // interface used to send StatusRequest, BlockRequest, errors bcr sendMessage @@ -266,7 +277,7 @@ func init() { case blockResponseEv: // add block to fsm.blocks - fsm.logger.Info("blockResponseEv", "H", data.block.Height) + fsm.logger.Debug("blockResponseEv", "H", data.block.Height) err := fsm.addBlock(data.peerId, data.block, data.length) if err != nil { // unsolicited, from different peer, already have it.. @@ -275,17 +286,15 @@ func init() { return waitForBlock, err } - if fsm.shouldTryProcessBlock() { - fsm.logger.Info("shouldTryProcessBlock", "first", fsm.height, "second", fsm.height+1) - // try to process block at fsm.height with the help of block at fsm.height+1 - fsm.sendSignalToProcessBlock() - } + fsm.sendSignalToProcessBlock() + if fsm.state.timer != nil { fsm.state.timer.Stop() } return waitForBlock, nil case tryProcessBlockEv: + fsm.logger.Debug("FSM blocks", "blocks", fsm.blocks, "fsm_height", fsm.height) if err := fsm.processBlock(); err != nil { if err == errMissingBlocks { // continue so we ask for more blocks @@ -300,6 +309,7 @@ func init() { } else { delete(fsm.blocks, fsm.height) fsm.height++ + fsm.processSignalActive = false fsm.removeShortPeers() // processed block, check if we are done @@ -316,11 +326,7 @@ func init() { // wait for more peers or state timeout } - if fsm.shouldTryProcessBlock() { - fsm.logger.Info("shouldTryProcessBlock", "first", fsm.height, "second", fsm.height+1) - // try to process block at fsm.height with the help of block at fsm.height+1 - fsm.sendSignalToProcessBlock() - } + fsm.sendSignalToProcessBlock() if fsm.state.timer != nil { fsm.state.timer.Stop() @@ -378,7 +384,7 @@ func NewFSM(store *BlockStore, bcr sendMessage) *bReactorFSM { } func sendMessageToFSM(fsm *bReactorFSM, msg bReactorMessageData) { - fsm.logger.Info("send message to FSM", "msg", msg.String()) + fsm.logger.Debug("send message to FSM", "msg", msg.String()) fsm.messageCh <- msg } @@ -409,7 +415,7 @@ forLoop: for { select { case msg := <-fsm.messageCh: - fsm.logger.Info("FSM Received message", "msg", msg.String()) + fsm.logger.Debug("FSM Received message", "msg", msg.String()) _ = fsm.handle(&msg) if msg.event == stopFSMEv { break forLoop @@ -423,18 +429,21 @@ forLoop: // handle processes messages and events sent to the FSM. func (fsm *bReactorFSM) handle(msg *bReactorMessageData) error { - fsm.logger.Info("Blockchain reactor FSM received event", "event", msg.event, "state", fsm.state.name) + fsm.logger.Debug("FSM received event", "event", msg.event, "state", fsm.state.name) if fsm.state == nil { fsm.state = unknown } next, err := fsm.state.handle(fsm, msg.event, msg.data) if err != nil { - fsm.logger.Error("Blockchain reactor event handler returned", "err", err) + fsm.logger.Error("FSM event handler returned", "err", err, "state", fsm.state.name, "event", msg.event) } + oldState := fsm.state.name fsm.transition(next) - fsm.logger.Info("FSM new state", "state", fsm.state.name) + if oldState != fsm.state.name { + fsm.logger.Info("FSM changed state", "old_state", oldState, "event", msg.event, "new_state", fsm.state.name) + } return err } @@ -442,7 +451,7 @@ func (fsm *bReactorFSM) transition(next *bReactorFSMState) { if next == nil { return } - fsm.logger.Info("Blockchain reactor FSM changes state: ", "old", fsm.state.name, "new", next.name) + fsm.logger.Debug("changes state: ", "old", fsm.state.name, "new", next.name) if fsm.state != next { fsm.state = next @@ -490,7 +499,7 @@ func (fsm *bReactorFSM) resetStateTimer(state *bReactorFSMState) { func (fsm *bReactorFSM) sendRequestBatch() error { // remove slow and timed out peers for _, peer := range fsm.peers { - if err := peer.isPeerGood(); err != nil { + if err := peer.isGood(); err != nil { fsm.logger.Info("Removing bad peer", "peer", peer.id, "err", err) fsm.removePeer(peer.id, err) } @@ -502,6 +511,7 @@ func (fsm *bReactorFSM) sendRequestBatch() error { // request height height := fsm.height + int64(i) if height > fsm.maxPeerHeight { + fsm.logger.Debug("Will not send request for", "height", height) return err } req := fsm.blocks[height] @@ -520,20 +530,27 @@ func (fsm *bReactorFSM) sendRequestBatch() error { func (fsm *bReactorFSM) sendRequest(height int64) error { // make requests // TODO - sort peers in order of goodness + fsm.logger.Debug("try to send request for", "height", height) for _, peer := range fsm.peers { // Send Block Request message to peer - fsm.logger.Info("Try to send request to peer", "peer", peer.id, "height", height) + if peer.height < height { + continue + } + fsm.logger.Debug("Try to send request to peer", "peer", peer.id, "height", height) err := fsm.bcr.sendBlockRequest(peer.id, height) if err == errSendQueueFull { - fsm.logger.Info("cannot send request, queue full", "peer", peer.id, "height", height) + fsm.logger.Error("cannot send request, queue full", "peer", peer.id, "height", height) continue } if err == errNilPeerForBlockRequest { // this peer does not exist in the switch, delete locally - fsm.logger.Info("Peer doesn't exist in the switch", "peer", peer.id) + fsm.logger.Error("peer doesn't exist in the switch", "peer", peer.id) fsm.deletePeer(peer.id) continue } + + fsm.logger.Debug("Sent request to peer", "peer", peer.id, "height", height) + // reserve space for block fsm.blocks[height] = &blockData{peerId: peer.id, block: nil} fsm.peers[peer.id].incrPending() @@ -616,7 +633,7 @@ func (fsm *bReactorFSM) removeShortPeers() { // removes any blocks and requests associated with the peer, deletes the peer and informs the switch if needed. func (fsm *bReactorFSM) removePeer(peerID p2p.ID, err error) { - fsm.logger.Info("removePeer", "peer", peerID, "err", err) + fsm.logger.Debug("removePeer", "peer", peerID, "err", err) // remove all data for blocks waiting for the peer or not processed yet for h, bData := range fsm.blocks { if bData.peerId == peerID { @@ -664,7 +681,7 @@ func (fsm *bReactorFSM) addBlock(peerID p2p.ID, block *types.Block, blockSize in return errBadDataFromPeer } if blockData.block != nil { - fsm.logger.Error("already have a block for height") + fsm.logger.Error("already have a block for height", "height", block.Height) return errBadDataFromPeer } @@ -676,25 +693,27 @@ func (fsm *bReactorFSM) addBlock(peerID p2p.ID, block *types.Block, blockSize in return nil } -func (fsm *bReactorFSM) shouldTryProcessBlock() bool { +func (fsm *bReactorFSM) sendSignalToProcessBlock() { first := fsm.blocks[fsm.height] second := fsm.blocks[fsm.height+1] if first == nil || first.block == nil || second == nil || second.block == nil { // We need both to sync the first block. - return false + fsm.logger.Debug("No need to send signal to process, blocks missing") + return } - return true -} -func (fsm *bReactorFSM) sendSignalToProcessBlock() { - // TODO - add check that this was sent before, currently there are extraneous tryProcessBlockEv - fsm.logger.Info("Send signal to process", "first", fsm.height, "second", fsm.height+1) + fsm.logger.Debug("Send signal to process", "first", fsm.height, "second", fsm.height+1) + if fsm.processSignalActive { + fsm.logger.Debug("..already sent") + return + } msgData := bReactorMessageData{ event: tryProcessBlockEv, data: bReactorEventData{}, } sendMessageToFSM(fsm, msgData) + fsm.processSignalActive = true } // Processes block at height H = fsm.height. Expects both H and H+1 to be available @@ -703,13 +722,14 @@ func (fsm *bReactorFSM) processBlock() error { second := fsm.blocks[fsm.height+1] if first == nil || first.block == nil || second == nil || second.block == nil { // We need both to sync the first block. + fsm.logger.Debug("process blocks doesn't have the blocks", "first", first, "second", second) return errMissingBlocks } - fsm.logger.Info("process blocks", "first", first.block.Height, "second", second.block.Height) - fsm.logger.Info("FSM blocks", "blocks", fsm.blocks) + fsm.logger.Debug("process blocks", "first", first.block.Height, "second", second.block.Height) + fsm.logger.Debug("FSM blocks", "blocks", fsm.blocks) if err := fsm.bcr.processBlocks(first.block, second.block); err != nil { - fsm.logger.Error("Process blocks returned error", "err", err, "first", first.block.Height, "second", second.block.Height) + fsm.logger.Error("process blocks returned error", "err", err, "first", first.block.Height, "second", second.block.Height) fsm.logger.Error("FSM blocks", "blocks", fsm.blocks) return err } diff --git a/blockchain_new/reactor_fsm_test.go b/blockchain_new/reactor_fsm_test.go index 920e195a..5c84cddc 100644 --- a/blockchain_new/reactor_fsm_test.go +++ b/blockchain_new/reactor_fsm_test.go @@ -64,6 +64,16 @@ type fsmStepTestValues struct { expectedLastBlockReq *lastBlockRequestT } +func newTestReactor() *testReactor { + logger := log.TestingLogger() + blockDB := dbm.NewMemDB() + store := NewBlockStore(blockDB) + testBcR := &testReactor{logger: logger} + testBcR.fsm = NewFSM(store, testBcR) + testBcR.fsm.setLogger(logger) + return testBcR +} + // WIP func TestFSMTransitionSequences(t *testing.T) { maxRequestBatchSize = 2 @@ -79,27 +89,19 @@ func TestFSMTransitionSequences(t *testing.T) { } for _, tt := range fsmTransitionSequenceTests { - // Create and start the FSM - testBcR := &testReactor{logger: log.TestingLogger()} - blockDB := dbm.NewMemDB() - store := NewBlockStore(blockDB) - fsm := NewFSM(store, testBcR) - fsm.setLogger(log.TestingLogger()) + // Create test reactor + testBcR := newTestReactor() resetTestValues() - // always start from unknown - fsm.resetStateTimer(unknown) - assert.Equal(t, 1, stateTimerStarts[unknown.name]) - for _, step := range tt { - assert.Equal(t, step.currentState, fsm.state.name) + assert.Equal(t, step.currentState, testBcR.fsm.state.name) failSendStatusRequest = step.failStatusReq failSendBlockRequest = step.failBlockReq oldNumStatusRequests := numStatusRequests oldNumBlockRequests := numBlockRequests - _ = sendEventToFSM(fsm, step.event, step.data) + _ = sendEventToFSM(testBcR.fsm, step.event, step.data) if step.shouldSendStatusReq { assert.Equal(t, oldNumStatusRequests+1, numStatusRequests) } else { @@ -112,7 +114,7 @@ func TestFSMTransitionSequences(t *testing.T) { assert.Equal(t, oldNumBlockRequests, numBlockRequests) } - assert.Equal(t, step.expectedState, fsm.state.name) + assert.Equal(t, step.expectedState, testBcR.fsm.state.name) } } } @@ -120,13 +122,10 @@ func TestFSMTransitionSequences(t *testing.T) { func TestReactorFSMBasic(t *testing.T) { maxRequestBatchSize = 2 + // Create test reactor + testBcR := newTestReactor() resetTestValues() - // Create and start the FSM - testBcR := &testReactor{logger: log.TestingLogger()} - blockDB := dbm.NewMemDB() - store := NewBlockStore(blockDB) - fsm := NewFSM(store, testBcR) - fsm.setLogger(log.TestingLogger()) + fsm := testBcR.fsm if err := fsm.handle(&bReactorMessageData{event: startFSMEv}); err != nil { } @@ -151,11 +150,8 @@ func TestReactorFSMPeerTimeout(t *testing.T) { resetTestValues() peerTimeout = 20 * time.Millisecond // Create and start the FSM - testBcR := &testReactor{logger: log.TestingLogger()} - blockDB := dbm.NewMemDB() - store := NewBlockStore(blockDB) - fsm := NewFSM(store, testBcR) - fsm.setLogger(log.TestingLogger()) + testBcR := newTestReactor() + fsm := testBcR.fsm fsm.start() // Check that FSM sends a status request message diff --git a/blockchain_new/reactor_test.go b/blockchain_new/reactor_test.go index a9e9706c..7da63a5e 100644 --- a/blockchain_new/reactor_test.go +++ b/blockchain_new/reactor_test.go @@ -290,6 +290,100 @@ func TestBadBlockStopsPeer(t *testing.T) { assert.True(t, lastReactorPair.reactor.Switch.Peers().Size() < len(reactorPairs)-1) } +func setupReactors( + numReactors int, maxBlockHeight int64, + genDoc *types.GenesisDoc, privVals []types.PrivValidator) ([]BlockchainReactorPair, []*p2p.Switch) { + + defer os.RemoveAll(config.RootDir) + + reactorPairs := make([]BlockchainReactorPair, numReactors) + + var logger = make([]log.Logger, numReactors) + + for i := 0; i < numReactors; i++ { + logger[i] = log.TestingLogger() + height := int64(0) + if i == 0 { + height = maxBlockHeight + } + reactorPairs[i] = newBlockchainReactor(logger[i], genDoc, privVals, height) + } + + switches := p2p.MakeConnectedSwitches(config.P2P, numReactors, func(i int, s *p2p.Switch) *p2p.Switch { + s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor) + return s + + }, p2p.Connect2Switches) + + for i := 0; i < numReactors; i++ { + addr := reactorPairs[i].reactor.Switch.NodeInfo().ID() + moduleName := fmt.Sprintf("blockchain-%v", addr) + reactorPairs[i].reactor.SetLogger(logger[i].With("module", moduleName[:19])) + } + + return reactorPairs, switches +} + +func TestFastSyncMultiNode(t *testing.T) { + + numNodes := 8 + maxHeight := int64(1000) + peerTimeout = 15 * time.Second + maxRequestBatchSize = 40 + + config = cfg.ResetTestRoot("blockchain_reactor_test") + genDoc, privVals := randGenesisDoc(1, false, 30) + + reactorPairs, switches := setupReactors(numNodes, maxHeight, genDoc, privVals) + + defer func() { + for _, r := range reactorPairs { + _ = r.reactor.Stop() + _ = r.app.Stop() + } + }() + + for { + if reactorPairs[numNodes-1].reactor.fsm.IsFinished() || reactorPairs[3].reactor.Switch.Peers().Size() == 0 { + break + } + + time.Sleep(1 * time.Second) + } + + //at this time, reactors[0-3] are the newest + assert.Equal(t, numNodes-1, reactorPairs[1].reactor.Switch.Peers().Size()) + + lastLogger := log.TestingLogger() + lastReactorPair := newBlockchainReactor(lastLogger, genDoc, privVals, 0) + reactorPairs = append(reactorPairs, lastReactorPair) + + switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch) *p2p.Switch { + s.AddReactor("BLOCKCHAIN", reactorPairs[len(reactorPairs)-1].reactor) + return s + + }, p2p.Connect2Switches)...) + + addr := lastReactorPair.reactor.Switch.NodeInfo().ID() + moduleName := fmt.Sprintf("blockchain-%v", addr) + lastReactorPair.reactor.SetLogger(lastLogger.With("module", moduleName[:19])) + + for i := 0; i < len(reactorPairs)-1; i++ { + p2p.Connect2Switches(switches, i, len(reactorPairs)-1) + } + + for { + if lastReactorPair.reactor.fsm.IsFinished() || lastReactorPair.reactor.Switch.Peers().Size() == 0 { + break + } + + time.Sleep(1 * time.Second) + } + + assert.True(t, lastReactorPair.reactor.Switch.Peers().Size() < len(reactorPairs)) + assert.Equal(t, lastReactorPair.reactor.fsm.maxPeerHeight, lastReactorPair.reactor.fsm.height) +} + //---------------------------------------------- // utility funcs diff --git a/libs/log/testing_logger.go b/libs/log/testing_logger.go index 8914bd81..0cf337be 100644 --- a/libs/log/testing_logger.go +++ b/libs/log/testing_logger.go @@ -30,10 +30,6 @@ func TestingLogger() Logger { // inside a test (not in the init func) because // verbose flag only set at the time of testing. func TestingLoggerWithOutput(w io.Writer) Logger { - if _testingLogger != nil { - return _testingLogger - } - if testing.Verbose() { _testingLogger = NewTMLogger(NewSyncWriter(w)) } else {