From 0541ecb243e425c72f214c673256a82b70350dc5 Mon Sep 17 00:00:00 2001 From: Anca Zamfir Date: Sun, 14 Apr 2019 22:53:24 -0400 Subject: [PATCH] fixed data race in tests --- blockchain/peer.go | 2 - blockchain/peer_test.go | 27 +++-- blockchain/pool_test.go | 5 +- blockchain/reactor.go | 18 +++- blockchain/reactor_fsm.go | 51 +++------- blockchain/reactor_fsm_test.go | 24 ++--- blockchain/reactor_test.go | 177 ++++++++++++++++++--------------- 7 files changed, 156 insertions(+), 148 deletions(-) diff --git a/blockchain/peer.go b/blockchain/peer.go index 73377545..42d80993 100644 --- a/blockchain/peer.go +++ b/blockchain/peer.go @@ -98,8 +98,6 @@ func (peer *bpPeer) decrPending(recvSize int) { func (peer *bpPeer) onTimeout() { peer.errFunc(errNoPeerResponse, peer.id) - peer.logger.Error("SendTimeout", "reason", errNoPeerResponse, "timeout", peerTimeout) - peer.didTimeout = true } func (peer *bpPeer) isGood() error { diff --git a/blockchain/peer_test.go b/blockchain/peer_test.go index 57092537..876ab355 100644 --- a/blockchain/peer_test.go +++ b/blockchain/peer_test.go @@ -16,14 +16,19 @@ import ( var ( numErrFuncCalls int lastErr error + peerTestMtx sync.Mutex ) func resetErrors() { + peerTestMtx.Lock() + defer peerTestMtx.Unlock() numErrFuncCalls = 0 lastErr = nil } func errFunc(err error, peerID p2p.ID) { + peerTestMtx.Lock() + defer peerTestMtx.Unlock() _ = peerID lastErr = err numErrFuncCalls++ @@ -86,16 +91,17 @@ func TestPeerTimer(t *testing.T) { time.Sleep(3 * time.Millisecond) checkByStoppingPeerTimer(t, peer, false) + peerTestMtx.Lock() // ... check an error has been sent, error is peerNonResponsive assert.Equal(t, 1, numErrFuncCalls) assert.Equal(t, lastErr, errNoPeerResponse) - assert.True(t, peer.didTimeout) + peerTestMtx.Unlock() // Restore the peerTimeout to its original value peerTimeout = 15 * time.Second } -func TestIncrPending(t *testing.T) { +func TestPeerIncrPending(t *testing.T) { peerTimeout = 2 * time.Millisecond peer := &bpPeer{ @@ -119,7 +125,7 @@ func TestIncrPending(t *testing.T) { peerTimeout = 15 * time.Second } -func TestDecrPending(t *testing.T) { +func TestPeerDecrPending(t *testing.T) { peerTimeout = 2 * time.Millisecond peer := &bpPeer{ @@ -151,7 +157,7 @@ func TestDecrPending(t *testing.T) { peerTimeout = 15 * time.Second } -func TestCanBeRemovedDueToExpiration(t *testing.T) { +func TestPeerCanBeRemovedDueToExpiration(t *testing.T) { minRecvRate = int64(100) // 100 bytes/sec exponential moving average peer := &bpPeer{ @@ -165,14 +171,16 @@ func TestCanBeRemovedDueToExpiration(t *testing.T) { peer.incrPending() time.Sleep(2 * time.Millisecond) // timer expired, should be able to remove peer - assert.Equal(t, errNoPeerResponse, peer.isGood()) + peerTestMtx.Lock() + assert.Equal(t, errNoPeerResponse, lastErr) + peerTestMtx.Unlock() // Restore the peerTimeout to its original value peerTimeout = 15 * time.Second } -func TestCanBeRemovedDueToLowSpeed(t *testing.T) { +func TestPeerCanBeRemovedDueToLowSpeed(t *testing.T) { minRecvRate = int64(100) // 100 bytes/sec exponential moving average peer := &bpPeer{ @@ -209,8 +217,7 @@ func TestCanBeRemovedDueToLowSpeed(t *testing.T) { } -func TestCleanupPeer(t *testing.T) { - var mtx sync.Mutex +func TestPeerCleanup(t *testing.T) { peer := &bpPeer{ id: p2p.ID(cmn.RandStr(12)), height: 10, @@ -224,9 +231,9 @@ func TestCleanupPeer(t *testing.T) { peer.resetTimeout() assert.NotNil(t, peer.timeout) - mtx.Lock() + peerTestMtx.Lock() peer.cleanup() - mtx.Unlock() + peerTestMtx.Unlock() checkByStoppingPeerTimer(t, peer, false) // Restore the peerTimeout to its original value diff --git a/blockchain/pool_test.go b/blockchain/pool_test.go index 1630376a..f1b11366 100644 --- a/blockchain/pool_test.go +++ b/blockchain/pool_test.go @@ -49,10 +49,7 @@ func (testR *testBcR) sendBlockRequest(peerID p2p.ID, height int64) error { return nil } -func (testR *testBcR) resetStateTimer(name string, timer *time.Timer, timeout time.Duration, f func()) { -} - -func (testR *testBcR) switchToConsensus() { +func (testR *testBcR) resetStateTimer(name string, timer **time.Timer, timeout time.Duration) { } func newTestBcR() *testBcR { diff --git a/blockchain/reactor.go b/blockchain/reactor.go index 65ac9087..2136589d 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -356,7 +356,6 @@ ForLoop: if err == errMissingBlocks { continue ForLoop } - // Notify FSM of block processing result. msgData := bReactorMessageData{ event: processedBlockEv, @@ -445,11 +444,22 @@ func (bcR *BlockchainReactor) processBlocksFromPoolRoutine() error { return nil } -func (bcR *BlockchainReactor) resetStateTimer(name string, timer *time.Timer, timeout time.Duration, f func()) { +func (bcR *BlockchainReactor) resetStateTimer(name string, timer **time.Timer, timeout time.Duration) { if timer == nil { - timer = time.AfterFunc(timeout, f) + panic("nil timer pointer parameter") + } + if *timer == nil { + *timer = time.AfterFunc(timeout, func() { + msg := bReactorMessageData{ + event: stateTimeoutEv, + data: bReactorEventData{ + stateName: name, + }, + } + bcR.sendMessageToFSMAsync(msg) + }) } else { - timer.Reset(timeout) + (*timer).Reset(timeout) } } diff --git a/blockchain/reactor_fsm.go b/blockchain/reactor_fsm.go index ada3adfb..0eaf6153 100644 --- a/blockchain/reactor_fsm.go +++ b/blockchain/reactor_fsm.go @@ -19,8 +19,8 @@ type bReactorFSMState struct { // called when entering the state enter func(fsm *bReactorFSM) - // timer to ensure FSM is not stuck in a state forever - timer *time.Timer + // timeout to ensure FSM is not stuck in a state forever + // the timer is owned and run by the fsm instance timeout time.Duration } @@ -35,8 +35,7 @@ type bcRMessageInterface interface { sendStatusRequest() sendBlockRequest(peerID p2p.ID, height int64) error sendPeerError(err error, peerID p2p.ID) - resetStateTimer(name string, timer *time.Timer, timeout time.Duration, f func()) - switchToConsensus() + resetStateTimer(name string, timer **time.Timer, timeout time.Duration) } // Blockchain Reactor State Machine @@ -44,8 +43,9 @@ type bReactorFSM struct { logger log.Logger startTime time.Time - state *bReactorFSMState - pool *blockPool + state *bReactorFSMState + stateTimer *time.Timer + pool *blockPool // interface used to call the Blockchain reactor to send StatusRequest, BlockRequest, reporting errors, etc. toBcR bcRMessageInterface @@ -169,9 +169,6 @@ func init() { case startFSMEv: // Broadcast Status message. Currently doesn't return non-nil error. fsm.toBcR.sendStatusRequest() - if fsm.state.timer != nil { - fsm.state.timer.Stop() - } return waitForPeer, nil case stopFSMEv: @@ -188,16 +185,17 @@ func init() { timeout: waitForPeerTimeout, enter: func(fsm *bReactorFSM) { // Stop when leaving the state. - fsm.resetStateTimer(waitForPeer) + fsm.resetStateTimer() }, handle: func(fsm *bReactorFSM, ev bReactorEvent, data bReactorEventData) (*bReactorFSMState, error) { switch ev { case stateTimeoutEv: + if data.stateName != "waitForPeer" { + fsm.logger.Error("received a state timeout event for different state", "state", data.stateName) + return waitForPeer, nil + } // There was no statusResponse received from any peer. // Should we send status request again? - if fsm.state.timer != nil { - fsm.state.timer.Stop() - } return finished, errNoPeerResponse case statusResponseEv: @@ -209,6 +207,9 @@ func init() { return waitForBlock, nil case stopFSMEv: + if fsm.stateTimer != nil { + fsm.stateTimer.Stop() + } return finished, errNoErrorFinished default: @@ -225,10 +226,6 @@ func init() { case statusResponseEv: err := fsm.pool.updatePeer(data.peerId, data.height) if len(fsm.pool.peers) == 0 { - fsm.toBcR.sendStatusRequest() - if fsm.state.timer != nil { - fsm.state.timer.Stop() - } return waitForPeer, err } return waitForBlock, err @@ -360,26 +357,10 @@ func (fsm *bReactorFSM) transition(next *bReactorFSMState) { } } -// FSM state timeout handler -func (fsm *bReactorFSM) sendStateTimeoutEvent(stateName string) { - // Check that the timeout is for the state we are currently in to prevent wrong transitions. - if stateName == fsm.state.name { - msg := bReactorMessageData{ - event: stateTimeoutEv, - data: bReactorEventData{ - stateName: stateName, - }, - } - _ = sendMessageToFSMSync(fsm, msg) - } -} - // Called when entering an FSM state in order to detect lack of progress in the state machine. // Note the use of the 'bcr' interface to facilitate testing without timer expiring. -func (fsm *bReactorFSM) resetStateTimer(state *bReactorFSMState) { - fsm.toBcR.resetStateTimer(state.name, state.timer, state.timeout, func() { - fsm.sendStateTimeoutEvent(state.name) - }) +func (fsm *bReactorFSM) resetStateTimer() { + fsm.toBcR.resetStateTimer(fsm.state.name, &fsm.stateTimer, fsm.state.timeout) } func (fsm *bReactorFSM) isCaughtUp() bool { diff --git a/blockchain/reactor_fsm_test.go b/blockchain/reactor_fsm_test.go index 75b35dab..d3b27c70 100644 --- a/blockchain/reactor_fsm_test.go +++ b/blockchain/reactor_fsm_test.go @@ -2,6 +2,7 @@ package blockchain import ( "fmt" + "sync" "testing" "time" @@ -23,6 +24,8 @@ func sendEventToFSM(fsm *bReactorFSM, ev bReactorEvent, data bReactorEventData) } var ( + testFSMmtx sync.Mutex + failSendStatusRequest bool failSendBlockRequest bool numStatusRequests int @@ -444,7 +447,7 @@ const ( maxRequestsPerPeerTest = 40 maxTotalPendingRequestsTest = 600 maxNumPeersTest = 1000 - maxNumBlocksInChainTest = 100000 + maxNumBlocksInChainTest = 50000 ) type testFields struct { @@ -540,7 +543,7 @@ func makeCorrectTransitionSequenceWithRandomParameters() testFields { maxRequestsPerPeer := int32(cmn.RandIntn(maxRequestsPerPeerTest) + 1) // generate the maximum number of total pending requests - maxPendingRequests := int32(cmn.RandIntn(maxTotalPendingRequestsTest) + 1) + maxPendingRequests := int32(cmn.RandIntn(maxTotalPendingRequestsTest-int(maxRequestsPerPeer))) + maxRequestsPerPeer // generate the number of blocks to be synced numBlocks := int64(cmn.RandIntn(maxNumBlocksInChainTest)) + startingHeight @@ -585,10 +588,6 @@ func TestFSMCorrectTransitionSequences(t *testing.T) { assert.Equal(t, oldNumStatusRequests, numStatusRequests) } - for _, height := range step.blocksAdded { - _, err := testBcR.fsm.pool.getBlockAndPeerAtHeight(height) - assert.Nil(t, err) - } assert.Equal(t, step.expectedState, testBcR.fsm.state.name) } @@ -596,7 +595,7 @@ func TestFSMCorrectTransitionSequences(t *testing.T) { } } -func TestReactorFSMPeerTimeout(t *testing.T) { +func TestFSMPeerTimeout(t *testing.T) { maxRequestsPerPeer = 2 resetTestValues() peerTimeout = 20 * time.Millisecond @@ -626,6 +625,8 @@ func TestReactorFSMPeerTimeout(t *testing.T) { // let FSM timeout on the block response message time.Sleep(100 * time.Millisecond) + testFSMmtx.Lock() + defer testFSMmtx.Unlock() assert.Equal(t, peerID, lastPeerError.peerID) assert.Equal(t, errNoPeerResponse, lastPeerError.err) @@ -637,6 +638,8 @@ func TestReactorFSMPeerTimeout(t *testing.T) { // implementation for the test reactor APIs func (testR *testReactor) sendPeerError(err error, peerID p2p.ID) { + testFSMmtx.Lock() + defer testFSMmtx.Unlock() testR.logger.Info("Reactor received sendPeerError call from FSM", "peer", peerID, "err", err) lastPeerError.peerID = peerID lastPeerError.err = err @@ -655,7 +658,7 @@ func (testR *testReactor) sendBlockRequest(peerID p2p.ID, height int64) error { return nil } -func (testR *testReactor) resetStateTimer(name string, timer *time.Timer, timeout time.Duration, f func()) { +func (testR *testReactor) resetStateTimer(name string, timer **time.Timer, timeout time.Duration) { testR.logger.Info("Reactor received resetStateTimer call from FSM", "state", name, "timeout", timeout) if _, ok := stateTimerStarts[name]; !ok { stateTimerStarts[name] = 1 @@ -664,11 +667,6 @@ func (testR *testReactor) resetStateTimer(name string, timer *time.Timer, timeou } } -func (testR *testReactor) switchToConsensus() { - testR.logger.Info("Reactor received switchToConsensus call from FSM") - -} - // ---------------------------------------- // ------------------------------------------------------- diff --git a/blockchain/reactor_test.go b/blockchain/reactor_test.go index 46d52b77..dea972b6 100644 --- a/blockchain/reactor_test.go +++ b/blockchain/reactor_test.go @@ -4,6 +4,7 @@ import ( "fmt" "os" "sort" + "sync" "testing" "time" @@ -61,8 +62,8 @@ func makeVote(header *types.Header, blockID types.BlockID, valset *types.Validat } type BlockchainReactorPair struct { - reactor *BlockchainReactor - app proxy.AppConns + bcR *BlockchainReactor + conR *consensusReactorTest } func newBlockchainReactor(logger log.Logger, genDoc *types.GenesisDoc, privVals []types.PrivValidator, maxBlockHeight int64) BlockchainReactorPair { @@ -121,10 +122,26 @@ func newBlockchainReactor(logger log.Logger, genDoc *types.GenesisDoc, privVals bcReactor := NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync) bcReactor.SetLogger(logger.With("module", "blockchain")) - return BlockchainReactorPair{bcReactor, proxyApp} + consensusReactor := &consensusReactorTest{} + consensusReactor.BaseReactor = *p2p.NewBaseReactor("Consensus reactor", consensusReactor) + + return BlockchainReactorPair{bcReactor, consensusReactor} +} + +type consensusReactorTest struct { + p2p.BaseReactor // BaseService + p2p.Switch + switchedToConsensus bool + mtx sync.Mutex +} + +func (conR *consensusReactorTest) SwitchToConsensus(state sm.State, blocksSynced int) { + conR.mtx.Lock() + defer conR.mtx.Unlock() + conR.switchedToConsensus = true } func TestFastSyncNoBlockResponse(t *testing.T) { + peerTimeout = 15 * time.Second maxRequestsPerPeer = 20 maxNumPendingRequests = 100 @@ -142,9 +159,10 @@ func TestFastSyncNoBlockResponse(t *testing.T) { reactorPairs[1] = newBlockchainReactor(logger, genDoc, privVals, 0) p2p.MakeConnectedSwitches(config.P2P, 2, func(i int, s *p2p.Switch) *p2p.Switch { - s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor) + s.AddReactor("BLOCKCHAIN", reactorPairs[i].bcR) + s.AddReactor("CONSENSUS", reactorPairs[i].conR) moduleName := fmt.Sprintf("blockchain-%v", i) - reactorPairs[i].reactor.SetLogger(logger.With("module", moduleName)) + reactorPairs[i].bcR.SetLogger(logger.With("module", moduleName)) return s @@ -152,8 +170,8 @@ func TestFastSyncNoBlockResponse(t *testing.T) { defer func() { for _, r := range reactorPairs { - _ = r.reactor.Stop() - _ = r.app.Stop() + _ = r.bcR.Stop() + _ = r.conR.Stop() } }() @@ -169,15 +187,18 @@ func TestFastSyncNoBlockResponse(t *testing.T) { for { time.Sleep(1 * time.Second) - if reactorPairs[1].reactor.fsm.isCaughtUp() { + reactorPairs[1].conR.mtx.Lock() + if reactorPairs[1].conR.switchedToConsensus { + reactorPairs[1].conR.mtx.Unlock() break } + reactorPairs[1].conR.mtx.Unlock() } - assert.Equal(t, maxBlockHeight, reactorPairs[0].reactor.store.Height()) + assert.Equal(t, maxBlockHeight, reactorPairs[0].bcR.store.Height()) for _, tt := range tests { - block := reactorPairs[1].reactor.store.LoadBlock(tt.height) + block := reactorPairs[1].bcR.store.LoadBlock(tt.height) if tt.existent { assert.True(t, block != nil) } else { @@ -192,27 +213,24 @@ func TestFastSyncNoBlockResponse(t *testing.T) { // Alternatively we could actually dial a TCP conn but // that seems extreme. func TestFastSyncBadBlockStopsPeer(t *testing.T) { - peerTimeout = 15 * time.Second maxRequestsPerPeer = 20 maxNumPendingRequests = 400 numNodes := 4 + maxBlockHeight := int64(148) config = cfg.ResetTestRoot("blockchain_reactor_test") defer os.RemoveAll(config.RootDir) genDoc, privVals := randGenesisDoc(1, false, 30) - maxBlockHeight := int64(148) - otherChain := newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight) defer func() { - _ = otherChain.reactor.Stop() - _ = otherChain.app.Stop() + _ = otherChain.bcR.Stop() + _ = otherChain.conR.Stop() }() reactorPairs := make([]BlockchainReactorPair, numNodes) - - var logger = make([]log.Logger, numNodes) + logger := make([]log.Logger, numNodes) for i := 0; i < numNodes; i++ { logger[i] = log.TestingLogger() @@ -224,41 +242,54 @@ func TestFastSyncBadBlockStopsPeer(t *testing.T) { } switches := p2p.MakeConnectedSwitches(config.P2P, numNodes, func(i int, s *p2p.Switch) *p2p.Switch { - s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor) + reactorPairs[i].conR.mtx.Lock() + s.AddReactor("BLOCKCHAIN", reactorPairs[i].bcR) + s.AddReactor("CONSENSUS", reactorPairs[i].conR) moduleName := fmt.Sprintf("blockchain-%v", i) - reactorPairs[i].reactor.SetLogger(logger[i].With("module", moduleName)) + reactorPairs[i].bcR.SetLogger(logger[i].With("module", moduleName)) + reactorPairs[i].conR.mtx.Unlock() return s }, p2p.Connect2Switches) defer func() { for _, r := range reactorPairs { - _ = r.reactor.Stop() - _ = r.app.Stop() + _ = r.bcR.Stop() + _ = r.conR.Stop() } }() for { time.Sleep(1 * time.Second) - if reactorPairs[numNodes-1].reactor.fsm.isCaughtUp() || reactorPairs[numNodes-1].reactor.Switch.Peers().Size() == 0 { + reactorPairs[1].conR.mtx.Lock() + if reactorPairs[1].conR.switchedToConsensus { + reactorPairs[1].conR.mtx.Unlock() + break + } + reactorPairs[1].conR.mtx.Unlock() + + if reactorPairs[numNodes-1].bcR.Switch.Peers().Size() == 0 { break } } //at this time, reactors[0-3] is the newest - assert.Equal(t, numNodes-1, reactorPairs[1].reactor.Switch.Peers().Size()) + assert.Equal(t, numNodes-1, reactorPairs[1].bcR.Switch.Peers().Size()) - //mark reactorPairs[3] is an invalid peer - reactorPairs[numNodes-1].reactor.store = otherChain.reactor.store + //mark last reactorPair as an invalid peer + reactorPairs[numNodes-1].conR.mtx.Lock() + reactorPairs[numNodes-1].bcR.store = otherChain.bcR.store + reactorPairs[numNodes-1].conR.mtx.Unlock() lastLogger := log.TestingLogger() lastReactorPair := newBlockchainReactor(lastLogger, genDoc, privVals, 0) reactorPairs = append(reactorPairs, lastReactorPair) switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch) *p2p.Switch { - s.AddReactor("BLOCKCHAIN", reactorPairs[len(reactorPairs)-1].reactor) + s.AddReactor("BLOCKCHAIN", reactorPairs[len(reactorPairs)-1].bcR) + s.AddReactor("CONSENSUS", reactorPairs[len(reactorPairs)-1].conR) moduleName := fmt.Sprintf("blockchain-%v", len(reactorPairs)-1) - reactorPairs[len(reactorPairs)-1].reactor.SetLogger(lastLogger.With("module", moduleName)) + reactorPairs[len(reactorPairs)-1].bcR.SetLogger(lastLogger.With("module", moduleName)) return s }, p2p.Connect2Switches)...) @@ -269,12 +300,20 @@ func TestFastSyncBadBlockStopsPeer(t *testing.T) { for { time.Sleep(1 * time.Second) - if lastReactorPair.reactor.fsm.isCaughtUp() || lastReactorPair.reactor.Switch.Peers().Size() == 0 { + + lastReactorPair.conR.mtx.Lock() + if lastReactorPair.conR.switchedToConsensus { + lastReactorPair.conR.mtx.Unlock() + break + } + lastReactorPair.conR.mtx.Unlock() + + if lastReactorPair.bcR.Switch.Peers().Size() == 0 { break } } - assert.True(t, lastReactorPair.reactor.Switch.Peers().Size() < len(reactorPairs)-1) + assert.True(t, lastReactorPair.bcR.Switch.Peers().Size() < len(reactorPairs)-1) } func setupReactors( @@ -297,9 +336,10 @@ func setupReactors( } switches := p2p.MakeConnectedSwitches(config.P2P, numReactors, func(i int, s *p2p.Switch) *p2p.Switch { - s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor) + s.AddReactor("BLOCKCHAIN", reactorPairs[i].bcR) + s.AddReactor("CONSENSUS", reactorPairs[i].conR) moduleName := fmt.Sprintf("blockchain-%v", i) - reactorPairs[i].reactor.SetLogger(logger[i].With("module", moduleName)) + reactorPairs[i].bcR.SetLogger(logger[i].With("module", moduleName)) return s }, p2p.Connect2Switches) @@ -328,39 +368,40 @@ func TestFastSyncMultiNode(t *testing.T) { defer func() { for _, r := range reactorPairs { - _ = r.reactor.Stop() - _ = r.app.Stop() + _ = r.bcR.Stop() + _ = r.conR.Stop() } }() outerFor: for { - i := 0 - for i < numNodes { - if !reactorPairs[i].reactor.fsm.isCaughtUp() { - break + time.Sleep(1 * time.Second) + + for i := 0; i < numNodes; i++ { + reactorPairs[i].conR.mtx.Lock() + if !reactorPairs[i].conR.switchedToConsensus { + reactorPairs[i].conR.mtx.Unlock() + continue outerFor } - i++ - } - if i == numNodes { - fmt.Println("SETUP FAST SYNC Duration", time.Since(start)) - break outerFor - } else { - time.Sleep(1 * time.Second) + reactorPairs[i].conR.mtx.Unlock() } + + fmt.Println("SETUP FAST SYNC Duration", time.Since(start)) + break outerFor } //at this time, reactors[0-3] are the newest - assert.Equal(t, numNodes-1, reactorPairs[0].reactor.Switch.Peers().Size()) + assert.Equal(t, numNodes-1, reactorPairs[0].bcR.Switch.Peers().Size()) lastLogger := log.TestingLogger() lastReactorPair := newBlockchainReactor(lastLogger, genDoc, privVals, 0) reactorPairs = append(reactorPairs, lastReactorPair) switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch) *p2p.Switch { - s.AddReactor("BLOCKCHAIN", reactorPairs[len(reactorPairs)-1].reactor) + s.AddReactor("BLOCKCHAIN", reactorPairs[len(reactorPairs)-1].bcR) + s.AddReactor("CONSENSUS", reactorPairs[len(reactorPairs)-1].conR) moduleName := fmt.Sprintf("blockchainTEST-%d", len(reactorPairs)-1) - reactorPairs[len(reactorPairs)-1].reactor.SetLogger(lastLogger.With("module", moduleName)) + reactorPairs[len(reactorPairs)-1].bcR.SetLogger(lastLogger.With("module", moduleName)) return s }, p2p.Connect2Switches)...) @@ -373,14 +414,20 @@ outerFor: for { time.Sleep(1 * time.Second) - if lastReactorPair.reactor.fsm.isCaughtUp() { + lastReactorPair.conR.mtx.Lock() + if lastReactorPair.conR.switchedToConsensus { fmt.Println("FAST SYNC Duration", time.Since(start)) + lastReactorPair.conR.mtx.Unlock() + break + } + lastReactorPair.conR.mtx.Unlock() + + if lastReactorPair.bcR.Switch.Peers().Size() == 0 { break } } - assert.True(t, lastReactorPair.reactor.Switch.Peers().Size() < len(reactorPairs)) - assert.Equal(t, lastReactorPair.reactor.fsm.pool.getMaxPeerHeight(), lastReactorPair.reactor.fsm.pool.height) + assert.True(t, lastReactorPair.bcR.Switch.Peers().Size() < len(reactorPairs)) } //---------------------------------------------- @@ -401,33 +448,3 @@ func makeBlock(height int64, state sm.State, lastCommit *types.Commit) *types.Bl type testApp struct { abci.BaseApplication } - -var _ abci.Application = (*testApp)(nil) - -func (app *testApp) Info(req abci.RequestInfo) (resInfo abci.ResponseInfo) { - return abci.ResponseInfo{} -} - -func (app *testApp) BeginBlock(req abci.RequestBeginBlock) abci.ResponseBeginBlock { - return abci.ResponseBeginBlock{} -} - -func (app *testApp) EndBlock(req abci.RequestEndBlock) abci.ResponseEndBlock { - return abci.ResponseEndBlock{} -} - -func (app *testApp) DeliverTx(tx []byte) abci.ResponseDeliverTx { - return abci.ResponseDeliverTx{Tags: []cmn.KVPair{}} -} - -func (app *testApp) CheckTx(tx []byte) abci.ResponseCheckTx { - return abci.ResponseCheckTx{} -} - -func (app *testApp) Commit() abci.ResponseCommit { - return abci.ResponseCommit{} -} - -func (app *testApp) Query(reqQuery abci.RequestQuery) (resQuery abci.ResponseQuery) { - return -}