mirror of
https://github.com/fluencelabs/tendermint
synced 2025-07-05 23:51:41 +00:00
fixed data race in tests
This commit is contained in:
@ -98,8 +98,6 @@ func (peer *bpPeer) decrPending(recvSize int) {
|
|||||||
|
|
||||||
func (peer *bpPeer) onTimeout() {
|
func (peer *bpPeer) onTimeout() {
|
||||||
peer.errFunc(errNoPeerResponse, peer.id)
|
peer.errFunc(errNoPeerResponse, peer.id)
|
||||||
peer.logger.Error("SendTimeout", "reason", errNoPeerResponse, "timeout", peerTimeout)
|
|
||||||
peer.didTimeout = true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (peer *bpPeer) isGood() error {
|
func (peer *bpPeer) isGood() error {
|
||||||
|
@ -16,14 +16,19 @@ import (
|
|||||||
var (
|
var (
|
||||||
numErrFuncCalls int
|
numErrFuncCalls int
|
||||||
lastErr error
|
lastErr error
|
||||||
|
peerTestMtx sync.Mutex
|
||||||
)
|
)
|
||||||
|
|
||||||
func resetErrors() {
|
func resetErrors() {
|
||||||
|
peerTestMtx.Lock()
|
||||||
|
defer peerTestMtx.Unlock()
|
||||||
numErrFuncCalls = 0
|
numErrFuncCalls = 0
|
||||||
lastErr = nil
|
lastErr = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func errFunc(err error, peerID p2p.ID) {
|
func errFunc(err error, peerID p2p.ID) {
|
||||||
|
peerTestMtx.Lock()
|
||||||
|
defer peerTestMtx.Unlock()
|
||||||
_ = peerID
|
_ = peerID
|
||||||
lastErr = err
|
lastErr = err
|
||||||
numErrFuncCalls++
|
numErrFuncCalls++
|
||||||
@ -86,16 +91,17 @@ func TestPeerTimer(t *testing.T) {
|
|||||||
time.Sleep(3 * time.Millisecond)
|
time.Sleep(3 * time.Millisecond)
|
||||||
checkByStoppingPeerTimer(t, peer, false)
|
checkByStoppingPeerTimer(t, peer, false)
|
||||||
|
|
||||||
|
peerTestMtx.Lock()
|
||||||
// ... check an error has been sent, error is peerNonResponsive
|
// ... check an error has been sent, error is peerNonResponsive
|
||||||
assert.Equal(t, 1, numErrFuncCalls)
|
assert.Equal(t, 1, numErrFuncCalls)
|
||||||
assert.Equal(t, lastErr, errNoPeerResponse)
|
assert.Equal(t, lastErr, errNoPeerResponse)
|
||||||
assert.True(t, peer.didTimeout)
|
peerTestMtx.Unlock()
|
||||||
|
|
||||||
// Restore the peerTimeout to its original value
|
// Restore the peerTimeout to its original value
|
||||||
peerTimeout = 15 * time.Second
|
peerTimeout = 15 * time.Second
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestIncrPending(t *testing.T) {
|
func TestPeerIncrPending(t *testing.T) {
|
||||||
peerTimeout = 2 * time.Millisecond
|
peerTimeout = 2 * time.Millisecond
|
||||||
|
|
||||||
peer := &bpPeer{
|
peer := &bpPeer{
|
||||||
@ -119,7 +125,7 @@ func TestIncrPending(t *testing.T) {
|
|||||||
peerTimeout = 15 * time.Second
|
peerTimeout = 15 * time.Second
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDecrPending(t *testing.T) {
|
func TestPeerDecrPending(t *testing.T) {
|
||||||
peerTimeout = 2 * time.Millisecond
|
peerTimeout = 2 * time.Millisecond
|
||||||
|
|
||||||
peer := &bpPeer{
|
peer := &bpPeer{
|
||||||
@ -151,7 +157,7 @@ func TestDecrPending(t *testing.T) {
|
|||||||
peerTimeout = 15 * time.Second
|
peerTimeout = 15 * time.Second
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCanBeRemovedDueToExpiration(t *testing.T) {
|
func TestPeerCanBeRemovedDueToExpiration(t *testing.T) {
|
||||||
minRecvRate = int64(100) // 100 bytes/sec exponential moving average
|
minRecvRate = int64(100) // 100 bytes/sec exponential moving average
|
||||||
|
|
||||||
peer := &bpPeer{
|
peer := &bpPeer{
|
||||||
@ -165,14 +171,16 @@ func TestCanBeRemovedDueToExpiration(t *testing.T) {
|
|||||||
peer.incrPending()
|
peer.incrPending()
|
||||||
time.Sleep(2 * time.Millisecond)
|
time.Sleep(2 * time.Millisecond)
|
||||||
// timer expired, should be able to remove peer
|
// timer expired, should be able to remove peer
|
||||||
assert.Equal(t, errNoPeerResponse, peer.isGood())
|
peerTestMtx.Lock()
|
||||||
|
assert.Equal(t, errNoPeerResponse, lastErr)
|
||||||
|
peerTestMtx.Unlock()
|
||||||
|
|
||||||
// Restore the peerTimeout to its original value
|
// Restore the peerTimeout to its original value
|
||||||
peerTimeout = 15 * time.Second
|
peerTimeout = 15 * time.Second
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCanBeRemovedDueToLowSpeed(t *testing.T) {
|
func TestPeerCanBeRemovedDueToLowSpeed(t *testing.T) {
|
||||||
minRecvRate = int64(100) // 100 bytes/sec exponential moving average
|
minRecvRate = int64(100) // 100 bytes/sec exponential moving average
|
||||||
|
|
||||||
peer := &bpPeer{
|
peer := &bpPeer{
|
||||||
@ -209,8 +217,7 @@ func TestCanBeRemovedDueToLowSpeed(t *testing.T) {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCleanupPeer(t *testing.T) {
|
func TestPeerCleanup(t *testing.T) {
|
||||||
var mtx sync.Mutex
|
|
||||||
peer := &bpPeer{
|
peer := &bpPeer{
|
||||||
id: p2p.ID(cmn.RandStr(12)),
|
id: p2p.ID(cmn.RandStr(12)),
|
||||||
height: 10,
|
height: 10,
|
||||||
@ -224,9 +231,9 @@ func TestCleanupPeer(t *testing.T) {
|
|||||||
peer.resetTimeout()
|
peer.resetTimeout()
|
||||||
assert.NotNil(t, peer.timeout)
|
assert.NotNil(t, peer.timeout)
|
||||||
|
|
||||||
mtx.Lock()
|
peerTestMtx.Lock()
|
||||||
peer.cleanup()
|
peer.cleanup()
|
||||||
mtx.Unlock()
|
peerTestMtx.Unlock()
|
||||||
|
|
||||||
checkByStoppingPeerTimer(t, peer, false)
|
checkByStoppingPeerTimer(t, peer, false)
|
||||||
// Restore the peerTimeout to its original value
|
// Restore the peerTimeout to its original value
|
||||||
|
@ -49,10 +49,7 @@ func (testR *testBcR) sendBlockRequest(peerID p2p.ID, height int64) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (testR *testBcR) resetStateTimer(name string, timer *time.Timer, timeout time.Duration, f func()) {
|
func (testR *testBcR) resetStateTimer(name string, timer **time.Timer, timeout time.Duration) {
|
||||||
}
|
|
||||||
|
|
||||||
func (testR *testBcR) switchToConsensus() {
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func newTestBcR() *testBcR {
|
func newTestBcR() *testBcR {
|
||||||
|
@ -356,7 +356,6 @@ ForLoop:
|
|||||||
if err == errMissingBlocks {
|
if err == errMissingBlocks {
|
||||||
continue ForLoop
|
continue ForLoop
|
||||||
}
|
}
|
||||||
|
|
||||||
// Notify FSM of block processing result.
|
// Notify FSM of block processing result.
|
||||||
msgData := bReactorMessageData{
|
msgData := bReactorMessageData{
|
||||||
event: processedBlockEv,
|
event: processedBlockEv,
|
||||||
@ -445,11 +444,22 @@ func (bcR *BlockchainReactor) processBlocksFromPoolRoutine() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bcR *BlockchainReactor) resetStateTimer(name string, timer *time.Timer, timeout time.Duration, f func()) {
|
func (bcR *BlockchainReactor) resetStateTimer(name string, timer **time.Timer, timeout time.Duration) {
|
||||||
if timer == nil {
|
if timer == nil {
|
||||||
timer = time.AfterFunc(timeout, f)
|
panic("nil timer pointer parameter")
|
||||||
|
}
|
||||||
|
if *timer == nil {
|
||||||
|
*timer = time.AfterFunc(timeout, func() {
|
||||||
|
msg := bReactorMessageData{
|
||||||
|
event: stateTimeoutEv,
|
||||||
|
data: bReactorEventData{
|
||||||
|
stateName: name,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
bcR.sendMessageToFSMAsync(msg)
|
||||||
|
})
|
||||||
} else {
|
} else {
|
||||||
timer.Reset(timeout)
|
(*timer).Reset(timeout)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -19,8 +19,8 @@ type bReactorFSMState struct {
|
|||||||
// called when entering the state
|
// called when entering the state
|
||||||
enter func(fsm *bReactorFSM)
|
enter func(fsm *bReactorFSM)
|
||||||
|
|
||||||
// timer to ensure FSM is not stuck in a state forever
|
// timeout to ensure FSM is not stuck in a state forever
|
||||||
timer *time.Timer
|
// the timer is owned and run by the fsm instance
|
||||||
timeout time.Duration
|
timeout time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -35,8 +35,7 @@ type bcRMessageInterface interface {
|
|||||||
sendStatusRequest()
|
sendStatusRequest()
|
||||||
sendBlockRequest(peerID p2p.ID, height int64) error
|
sendBlockRequest(peerID p2p.ID, height int64) error
|
||||||
sendPeerError(err error, peerID p2p.ID)
|
sendPeerError(err error, peerID p2p.ID)
|
||||||
resetStateTimer(name string, timer *time.Timer, timeout time.Duration, f func())
|
resetStateTimer(name string, timer **time.Timer, timeout time.Duration)
|
||||||
switchToConsensus()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Blockchain Reactor State Machine
|
// Blockchain Reactor State Machine
|
||||||
@ -45,6 +44,7 @@ type bReactorFSM struct {
|
|||||||
startTime time.Time
|
startTime time.Time
|
||||||
|
|
||||||
state *bReactorFSMState
|
state *bReactorFSMState
|
||||||
|
stateTimer *time.Timer
|
||||||
pool *blockPool
|
pool *blockPool
|
||||||
|
|
||||||
// interface used to call the Blockchain reactor to send StatusRequest, BlockRequest, reporting errors, etc.
|
// interface used to call the Blockchain reactor to send StatusRequest, BlockRequest, reporting errors, etc.
|
||||||
@ -169,9 +169,6 @@ func init() {
|
|||||||
case startFSMEv:
|
case startFSMEv:
|
||||||
// Broadcast Status message. Currently doesn't return non-nil error.
|
// Broadcast Status message. Currently doesn't return non-nil error.
|
||||||
fsm.toBcR.sendStatusRequest()
|
fsm.toBcR.sendStatusRequest()
|
||||||
if fsm.state.timer != nil {
|
|
||||||
fsm.state.timer.Stop()
|
|
||||||
}
|
|
||||||
return waitForPeer, nil
|
return waitForPeer, nil
|
||||||
|
|
||||||
case stopFSMEv:
|
case stopFSMEv:
|
||||||
@ -188,16 +185,17 @@ func init() {
|
|||||||
timeout: waitForPeerTimeout,
|
timeout: waitForPeerTimeout,
|
||||||
enter: func(fsm *bReactorFSM) {
|
enter: func(fsm *bReactorFSM) {
|
||||||
// Stop when leaving the state.
|
// Stop when leaving the state.
|
||||||
fsm.resetStateTimer(waitForPeer)
|
fsm.resetStateTimer()
|
||||||
},
|
},
|
||||||
handle: func(fsm *bReactorFSM, ev bReactorEvent, data bReactorEventData) (*bReactorFSMState, error) {
|
handle: func(fsm *bReactorFSM, ev bReactorEvent, data bReactorEventData) (*bReactorFSMState, error) {
|
||||||
switch ev {
|
switch ev {
|
||||||
case stateTimeoutEv:
|
case stateTimeoutEv:
|
||||||
|
if data.stateName != "waitForPeer" {
|
||||||
|
fsm.logger.Error("received a state timeout event for different state", "state", data.stateName)
|
||||||
|
return waitForPeer, nil
|
||||||
|
}
|
||||||
// There was no statusResponse received from any peer.
|
// There was no statusResponse received from any peer.
|
||||||
// Should we send status request again?
|
// Should we send status request again?
|
||||||
if fsm.state.timer != nil {
|
|
||||||
fsm.state.timer.Stop()
|
|
||||||
}
|
|
||||||
return finished, errNoPeerResponse
|
return finished, errNoPeerResponse
|
||||||
|
|
||||||
case statusResponseEv:
|
case statusResponseEv:
|
||||||
@ -209,6 +207,9 @@ func init() {
|
|||||||
return waitForBlock, nil
|
return waitForBlock, nil
|
||||||
|
|
||||||
case stopFSMEv:
|
case stopFSMEv:
|
||||||
|
if fsm.stateTimer != nil {
|
||||||
|
fsm.stateTimer.Stop()
|
||||||
|
}
|
||||||
return finished, errNoErrorFinished
|
return finished, errNoErrorFinished
|
||||||
|
|
||||||
default:
|
default:
|
||||||
@ -225,10 +226,6 @@ func init() {
|
|||||||
case statusResponseEv:
|
case statusResponseEv:
|
||||||
err := fsm.pool.updatePeer(data.peerId, data.height)
|
err := fsm.pool.updatePeer(data.peerId, data.height)
|
||||||
if len(fsm.pool.peers) == 0 {
|
if len(fsm.pool.peers) == 0 {
|
||||||
fsm.toBcR.sendStatusRequest()
|
|
||||||
if fsm.state.timer != nil {
|
|
||||||
fsm.state.timer.Stop()
|
|
||||||
}
|
|
||||||
return waitForPeer, err
|
return waitForPeer, err
|
||||||
}
|
}
|
||||||
return waitForBlock, err
|
return waitForBlock, err
|
||||||
@ -360,26 +357,10 @@ func (fsm *bReactorFSM) transition(next *bReactorFSMState) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// FSM state timeout handler
|
|
||||||
func (fsm *bReactorFSM) sendStateTimeoutEvent(stateName string) {
|
|
||||||
// Check that the timeout is for the state we are currently in to prevent wrong transitions.
|
|
||||||
if stateName == fsm.state.name {
|
|
||||||
msg := bReactorMessageData{
|
|
||||||
event: stateTimeoutEv,
|
|
||||||
data: bReactorEventData{
|
|
||||||
stateName: stateName,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
_ = sendMessageToFSMSync(fsm, msg)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Called when entering an FSM state in order to detect lack of progress in the state machine.
|
// Called when entering an FSM state in order to detect lack of progress in the state machine.
|
||||||
// Note the use of the 'bcr' interface to facilitate testing without timer expiring.
|
// Note the use of the 'bcr' interface to facilitate testing without timer expiring.
|
||||||
func (fsm *bReactorFSM) resetStateTimer(state *bReactorFSMState) {
|
func (fsm *bReactorFSM) resetStateTimer() {
|
||||||
fsm.toBcR.resetStateTimer(state.name, state.timer, state.timeout, func() {
|
fsm.toBcR.resetStateTimer(fsm.state.name, &fsm.stateTimer, fsm.state.timeout)
|
||||||
fsm.sendStateTimeoutEvent(state.name)
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fsm *bReactorFSM) isCaughtUp() bool {
|
func (fsm *bReactorFSM) isCaughtUp() bool {
|
||||||
|
@ -2,6 +2,7 @@ package blockchain
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -23,6 +24,8 @@ func sendEventToFSM(fsm *bReactorFSM, ev bReactorEvent, data bReactorEventData)
|
|||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
testFSMmtx sync.Mutex
|
||||||
|
|
||||||
failSendStatusRequest bool
|
failSendStatusRequest bool
|
||||||
failSendBlockRequest bool
|
failSendBlockRequest bool
|
||||||
numStatusRequests int
|
numStatusRequests int
|
||||||
@ -444,7 +447,7 @@ const (
|
|||||||
maxRequestsPerPeerTest = 40
|
maxRequestsPerPeerTest = 40
|
||||||
maxTotalPendingRequestsTest = 600
|
maxTotalPendingRequestsTest = 600
|
||||||
maxNumPeersTest = 1000
|
maxNumPeersTest = 1000
|
||||||
maxNumBlocksInChainTest = 100000
|
maxNumBlocksInChainTest = 50000
|
||||||
)
|
)
|
||||||
|
|
||||||
type testFields struct {
|
type testFields struct {
|
||||||
@ -540,7 +543,7 @@ func makeCorrectTransitionSequenceWithRandomParameters() testFields {
|
|||||||
maxRequestsPerPeer := int32(cmn.RandIntn(maxRequestsPerPeerTest) + 1)
|
maxRequestsPerPeer := int32(cmn.RandIntn(maxRequestsPerPeerTest) + 1)
|
||||||
|
|
||||||
// generate the maximum number of total pending requests
|
// generate the maximum number of total pending requests
|
||||||
maxPendingRequests := int32(cmn.RandIntn(maxTotalPendingRequestsTest) + 1)
|
maxPendingRequests := int32(cmn.RandIntn(maxTotalPendingRequestsTest-int(maxRequestsPerPeer))) + maxRequestsPerPeer
|
||||||
|
|
||||||
// generate the number of blocks to be synced
|
// generate the number of blocks to be synced
|
||||||
numBlocks := int64(cmn.RandIntn(maxNumBlocksInChainTest)) + startingHeight
|
numBlocks := int64(cmn.RandIntn(maxNumBlocksInChainTest)) + startingHeight
|
||||||
@ -585,10 +588,6 @@ func TestFSMCorrectTransitionSequences(t *testing.T) {
|
|||||||
assert.Equal(t, oldNumStatusRequests, numStatusRequests)
|
assert.Equal(t, oldNumStatusRequests, numStatusRequests)
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, height := range step.blocksAdded {
|
|
||||||
_, err := testBcR.fsm.pool.getBlockAndPeerAtHeight(height)
|
|
||||||
assert.Nil(t, err)
|
|
||||||
}
|
|
||||||
assert.Equal(t, step.expectedState, testBcR.fsm.state.name)
|
assert.Equal(t, step.expectedState, testBcR.fsm.state.name)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -596,7 +595,7 @@ func TestFSMCorrectTransitionSequences(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestReactorFSMPeerTimeout(t *testing.T) {
|
func TestFSMPeerTimeout(t *testing.T) {
|
||||||
maxRequestsPerPeer = 2
|
maxRequestsPerPeer = 2
|
||||||
resetTestValues()
|
resetTestValues()
|
||||||
peerTimeout = 20 * time.Millisecond
|
peerTimeout = 20 * time.Millisecond
|
||||||
@ -626,6 +625,8 @@ func TestReactorFSMPeerTimeout(t *testing.T) {
|
|||||||
|
|
||||||
// let FSM timeout on the block response message
|
// let FSM timeout on the block response message
|
||||||
time.Sleep(100 * time.Millisecond)
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
testFSMmtx.Lock()
|
||||||
|
defer testFSMmtx.Unlock()
|
||||||
assert.Equal(t, peerID, lastPeerError.peerID)
|
assert.Equal(t, peerID, lastPeerError.peerID)
|
||||||
assert.Equal(t, errNoPeerResponse, lastPeerError.err)
|
assert.Equal(t, errNoPeerResponse, lastPeerError.err)
|
||||||
|
|
||||||
@ -637,6 +638,8 @@ func TestReactorFSMPeerTimeout(t *testing.T) {
|
|||||||
// implementation for the test reactor APIs
|
// implementation for the test reactor APIs
|
||||||
|
|
||||||
func (testR *testReactor) sendPeerError(err error, peerID p2p.ID) {
|
func (testR *testReactor) sendPeerError(err error, peerID p2p.ID) {
|
||||||
|
testFSMmtx.Lock()
|
||||||
|
defer testFSMmtx.Unlock()
|
||||||
testR.logger.Info("Reactor received sendPeerError call from FSM", "peer", peerID, "err", err)
|
testR.logger.Info("Reactor received sendPeerError call from FSM", "peer", peerID, "err", err)
|
||||||
lastPeerError.peerID = peerID
|
lastPeerError.peerID = peerID
|
||||||
lastPeerError.err = err
|
lastPeerError.err = err
|
||||||
@ -655,7 +658,7 @@ func (testR *testReactor) sendBlockRequest(peerID p2p.ID, height int64) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (testR *testReactor) resetStateTimer(name string, timer *time.Timer, timeout time.Duration, f func()) {
|
func (testR *testReactor) resetStateTimer(name string, timer **time.Timer, timeout time.Duration) {
|
||||||
testR.logger.Info("Reactor received resetStateTimer call from FSM", "state", name, "timeout", timeout)
|
testR.logger.Info("Reactor received resetStateTimer call from FSM", "state", name, "timeout", timeout)
|
||||||
if _, ok := stateTimerStarts[name]; !ok {
|
if _, ok := stateTimerStarts[name]; !ok {
|
||||||
stateTimerStarts[name] = 1
|
stateTimerStarts[name] = 1
|
||||||
@ -664,11 +667,6 @@ func (testR *testReactor) resetStateTimer(name string, timer *time.Timer, timeou
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (testR *testReactor) switchToConsensus() {
|
|
||||||
testR.logger.Info("Reactor received switchToConsensus call from FSM")
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
// ----------------------------------------
|
// ----------------------------------------
|
||||||
|
|
||||||
// -------------------------------------------------------
|
// -------------------------------------------------------
|
||||||
|
@ -4,6 +4,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"sort"
|
"sort"
|
||||||
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -61,8 +62,8 @@ func makeVote(header *types.Header, blockID types.BlockID, valset *types.Validat
|
|||||||
}
|
}
|
||||||
|
|
||||||
type BlockchainReactorPair struct {
|
type BlockchainReactorPair struct {
|
||||||
reactor *BlockchainReactor
|
bcR *BlockchainReactor
|
||||||
app proxy.AppConns
|
conR *consensusReactorTest
|
||||||
}
|
}
|
||||||
|
|
||||||
func newBlockchainReactor(logger log.Logger, genDoc *types.GenesisDoc, privVals []types.PrivValidator, maxBlockHeight int64) BlockchainReactorPair {
|
func newBlockchainReactor(logger log.Logger, genDoc *types.GenesisDoc, privVals []types.PrivValidator, maxBlockHeight int64) BlockchainReactorPair {
|
||||||
@ -121,10 +122,26 @@ func newBlockchainReactor(logger log.Logger, genDoc *types.GenesisDoc, privVals
|
|||||||
bcReactor := NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
|
bcReactor := NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
|
||||||
bcReactor.SetLogger(logger.With("module", "blockchain"))
|
bcReactor.SetLogger(logger.With("module", "blockchain"))
|
||||||
|
|
||||||
return BlockchainReactorPair{bcReactor, proxyApp}
|
consensusReactor := &consensusReactorTest{}
|
||||||
|
consensusReactor.BaseReactor = *p2p.NewBaseReactor("Consensus reactor", consensusReactor)
|
||||||
|
|
||||||
|
return BlockchainReactorPair{bcReactor, consensusReactor}
|
||||||
|
}
|
||||||
|
|
||||||
|
type consensusReactorTest struct {
|
||||||
|
p2p.BaseReactor // BaseService + p2p.Switch
|
||||||
|
switchedToConsensus bool
|
||||||
|
mtx sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func (conR *consensusReactorTest) SwitchToConsensus(state sm.State, blocksSynced int) {
|
||||||
|
conR.mtx.Lock()
|
||||||
|
defer conR.mtx.Unlock()
|
||||||
|
conR.switchedToConsensus = true
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestFastSyncNoBlockResponse(t *testing.T) {
|
func TestFastSyncNoBlockResponse(t *testing.T) {
|
||||||
|
|
||||||
peerTimeout = 15 * time.Second
|
peerTimeout = 15 * time.Second
|
||||||
maxRequestsPerPeer = 20
|
maxRequestsPerPeer = 20
|
||||||
maxNumPendingRequests = 100
|
maxNumPendingRequests = 100
|
||||||
@ -142,9 +159,10 @@ func TestFastSyncNoBlockResponse(t *testing.T) {
|
|||||||
reactorPairs[1] = newBlockchainReactor(logger, genDoc, privVals, 0)
|
reactorPairs[1] = newBlockchainReactor(logger, genDoc, privVals, 0)
|
||||||
|
|
||||||
p2p.MakeConnectedSwitches(config.P2P, 2, func(i int, s *p2p.Switch) *p2p.Switch {
|
p2p.MakeConnectedSwitches(config.P2P, 2, func(i int, s *p2p.Switch) *p2p.Switch {
|
||||||
s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor)
|
s.AddReactor("BLOCKCHAIN", reactorPairs[i].bcR)
|
||||||
|
s.AddReactor("CONSENSUS", reactorPairs[i].conR)
|
||||||
moduleName := fmt.Sprintf("blockchain-%v", i)
|
moduleName := fmt.Sprintf("blockchain-%v", i)
|
||||||
reactorPairs[i].reactor.SetLogger(logger.With("module", moduleName))
|
reactorPairs[i].bcR.SetLogger(logger.With("module", moduleName))
|
||||||
|
|
||||||
return s
|
return s
|
||||||
|
|
||||||
@ -152,8 +170,8 @@ func TestFastSyncNoBlockResponse(t *testing.T) {
|
|||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
for _, r := range reactorPairs {
|
for _, r := range reactorPairs {
|
||||||
_ = r.reactor.Stop()
|
_ = r.bcR.Stop()
|
||||||
_ = r.app.Stop()
|
_ = r.conR.Stop()
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
@ -169,15 +187,18 @@ func TestFastSyncNoBlockResponse(t *testing.T) {
|
|||||||
|
|
||||||
for {
|
for {
|
||||||
time.Sleep(1 * time.Second)
|
time.Sleep(1 * time.Second)
|
||||||
if reactorPairs[1].reactor.fsm.isCaughtUp() {
|
reactorPairs[1].conR.mtx.Lock()
|
||||||
|
if reactorPairs[1].conR.switchedToConsensus {
|
||||||
|
reactorPairs[1].conR.mtx.Unlock()
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
reactorPairs[1].conR.mtx.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
assert.Equal(t, maxBlockHeight, reactorPairs[0].reactor.store.Height())
|
assert.Equal(t, maxBlockHeight, reactorPairs[0].bcR.store.Height())
|
||||||
|
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
block := reactorPairs[1].reactor.store.LoadBlock(tt.height)
|
block := reactorPairs[1].bcR.store.LoadBlock(tt.height)
|
||||||
if tt.existent {
|
if tt.existent {
|
||||||
assert.True(t, block != nil)
|
assert.True(t, block != nil)
|
||||||
} else {
|
} else {
|
||||||
@ -192,27 +213,24 @@ func TestFastSyncNoBlockResponse(t *testing.T) {
|
|||||||
// Alternatively we could actually dial a TCP conn but
|
// Alternatively we could actually dial a TCP conn but
|
||||||
// that seems extreme.
|
// that seems extreme.
|
||||||
func TestFastSyncBadBlockStopsPeer(t *testing.T) {
|
func TestFastSyncBadBlockStopsPeer(t *testing.T) {
|
||||||
peerTimeout = 15 * time.Second
|
|
||||||
|
|
||||||
maxRequestsPerPeer = 20
|
maxRequestsPerPeer = 20
|
||||||
maxNumPendingRequests = 400
|
maxNumPendingRequests = 400
|
||||||
numNodes := 4
|
numNodes := 4
|
||||||
|
maxBlockHeight := int64(148)
|
||||||
|
|
||||||
config = cfg.ResetTestRoot("blockchain_reactor_test")
|
config = cfg.ResetTestRoot("blockchain_reactor_test")
|
||||||
defer os.RemoveAll(config.RootDir)
|
defer os.RemoveAll(config.RootDir)
|
||||||
genDoc, privVals := randGenesisDoc(1, false, 30)
|
genDoc, privVals := randGenesisDoc(1, false, 30)
|
||||||
|
|
||||||
maxBlockHeight := int64(148)
|
|
||||||
|
|
||||||
otherChain := newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight)
|
otherChain := newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight)
|
||||||
defer func() {
|
defer func() {
|
||||||
_ = otherChain.reactor.Stop()
|
_ = otherChain.bcR.Stop()
|
||||||
_ = otherChain.app.Stop()
|
_ = otherChain.conR.Stop()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
reactorPairs := make([]BlockchainReactorPair, numNodes)
|
reactorPairs := make([]BlockchainReactorPair, numNodes)
|
||||||
|
logger := make([]log.Logger, numNodes)
|
||||||
var logger = make([]log.Logger, numNodes)
|
|
||||||
|
|
||||||
for i := 0; i < numNodes; i++ {
|
for i := 0; i < numNodes; i++ {
|
||||||
logger[i] = log.TestingLogger()
|
logger[i] = log.TestingLogger()
|
||||||
@ -224,41 +242,54 @@ func TestFastSyncBadBlockStopsPeer(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
switches := p2p.MakeConnectedSwitches(config.P2P, numNodes, func(i int, s *p2p.Switch) *p2p.Switch {
|
switches := p2p.MakeConnectedSwitches(config.P2P, numNodes, func(i int, s *p2p.Switch) *p2p.Switch {
|
||||||
s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor)
|
reactorPairs[i].conR.mtx.Lock()
|
||||||
|
s.AddReactor("BLOCKCHAIN", reactorPairs[i].bcR)
|
||||||
|
s.AddReactor("CONSENSUS", reactorPairs[i].conR)
|
||||||
moduleName := fmt.Sprintf("blockchain-%v", i)
|
moduleName := fmt.Sprintf("blockchain-%v", i)
|
||||||
reactorPairs[i].reactor.SetLogger(logger[i].With("module", moduleName))
|
reactorPairs[i].bcR.SetLogger(logger[i].With("module", moduleName))
|
||||||
|
reactorPairs[i].conR.mtx.Unlock()
|
||||||
return s
|
return s
|
||||||
|
|
||||||
}, p2p.Connect2Switches)
|
}, p2p.Connect2Switches)
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
for _, r := range reactorPairs {
|
for _, r := range reactorPairs {
|
||||||
_ = r.reactor.Stop()
|
_ = r.bcR.Stop()
|
||||||
_ = r.app.Stop()
|
_ = r.conR.Stop()
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
time.Sleep(1 * time.Second)
|
time.Sleep(1 * time.Second)
|
||||||
if reactorPairs[numNodes-1].reactor.fsm.isCaughtUp() || reactorPairs[numNodes-1].reactor.Switch.Peers().Size() == 0 {
|
reactorPairs[1].conR.mtx.Lock()
|
||||||
|
if reactorPairs[1].conR.switchedToConsensus {
|
||||||
|
reactorPairs[1].conR.mtx.Unlock()
|
||||||
|
break
|
||||||
|
}
|
||||||
|
reactorPairs[1].conR.mtx.Unlock()
|
||||||
|
|
||||||
|
if reactorPairs[numNodes-1].bcR.Switch.Peers().Size() == 0 {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//at this time, reactors[0-3] is the newest
|
//at this time, reactors[0-3] is the newest
|
||||||
assert.Equal(t, numNodes-1, reactorPairs[1].reactor.Switch.Peers().Size())
|
assert.Equal(t, numNodes-1, reactorPairs[1].bcR.Switch.Peers().Size())
|
||||||
|
|
||||||
//mark reactorPairs[3] is an invalid peer
|
//mark last reactorPair as an invalid peer
|
||||||
reactorPairs[numNodes-1].reactor.store = otherChain.reactor.store
|
reactorPairs[numNodes-1].conR.mtx.Lock()
|
||||||
|
reactorPairs[numNodes-1].bcR.store = otherChain.bcR.store
|
||||||
|
reactorPairs[numNodes-1].conR.mtx.Unlock()
|
||||||
|
|
||||||
lastLogger := log.TestingLogger()
|
lastLogger := log.TestingLogger()
|
||||||
lastReactorPair := newBlockchainReactor(lastLogger, genDoc, privVals, 0)
|
lastReactorPair := newBlockchainReactor(lastLogger, genDoc, privVals, 0)
|
||||||
reactorPairs = append(reactorPairs, lastReactorPair)
|
reactorPairs = append(reactorPairs, lastReactorPair)
|
||||||
|
|
||||||
switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch) *p2p.Switch {
|
switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch) *p2p.Switch {
|
||||||
s.AddReactor("BLOCKCHAIN", reactorPairs[len(reactorPairs)-1].reactor)
|
s.AddReactor("BLOCKCHAIN", reactorPairs[len(reactorPairs)-1].bcR)
|
||||||
|
s.AddReactor("CONSENSUS", reactorPairs[len(reactorPairs)-1].conR)
|
||||||
moduleName := fmt.Sprintf("blockchain-%v", len(reactorPairs)-1)
|
moduleName := fmt.Sprintf("blockchain-%v", len(reactorPairs)-1)
|
||||||
reactorPairs[len(reactorPairs)-1].reactor.SetLogger(lastLogger.With("module", moduleName))
|
reactorPairs[len(reactorPairs)-1].bcR.SetLogger(lastLogger.With("module", moduleName))
|
||||||
return s
|
return s
|
||||||
|
|
||||||
}, p2p.Connect2Switches)...)
|
}, p2p.Connect2Switches)...)
|
||||||
@ -269,12 +300,20 @@ func TestFastSyncBadBlockStopsPeer(t *testing.T) {
|
|||||||
|
|
||||||
for {
|
for {
|
||||||
time.Sleep(1 * time.Second)
|
time.Sleep(1 * time.Second)
|
||||||
if lastReactorPair.reactor.fsm.isCaughtUp() || lastReactorPair.reactor.Switch.Peers().Size() == 0 {
|
|
||||||
|
lastReactorPair.conR.mtx.Lock()
|
||||||
|
if lastReactorPair.conR.switchedToConsensus {
|
||||||
|
lastReactorPair.conR.mtx.Unlock()
|
||||||
|
break
|
||||||
|
}
|
||||||
|
lastReactorPair.conR.mtx.Unlock()
|
||||||
|
|
||||||
|
if lastReactorPair.bcR.Switch.Peers().Size() == 0 {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
assert.True(t, lastReactorPair.reactor.Switch.Peers().Size() < len(reactorPairs)-1)
|
assert.True(t, lastReactorPair.bcR.Switch.Peers().Size() < len(reactorPairs)-1)
|
||||||
}
|
}
|
||||||
|
|
||||||
func setupReactors(
|
func setupReactors(
|
||||||
@ -297,9 +336,10 @@ func setupReactors(
|
|||||||
}
|
}
|
||||||
|
|
||||||
switches := p2p.MakeConnectedSwitches(config.P2P, numReactors, func(i int, s *p2p.Switch) *p2p.Switch {
|
switches := p2p.MakeConnectedSwitches(config.P2P, numReactors, func(i int, s *p2p.Switch) *p2p.Switch {
|
||||||
s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor)
|
s.AddReactor("BLOCKCHAIN", reactorPairs[i].bcR)
|
||||||
|
s.AddReactor("CONSENSUS", reactorPairs[i].conR)
|
||||||
moduleName := fmt.Sprintf("blockchain-%v", i)
|
moduleName := fmt.Sprintf("blockchain-%v", i)
|
||||||
reactorPairs[i].reactor.SetLogger(logger[i].With("module", moduleName))
|
reactorPairs[i].bcR.SetLogger(logger[i].With("module", moduleName))
|
||||||
return s
|
return s
|
||||||
|
|
||||||
}, p2p.Connect2Switches)
|
}, p2p.Connect2Switches)
|
||||||
@ -328,39 +368,40 @@ func TestFastSyncMultiNode(t *testing.T) {
|
|||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
for _, r := range reactorPairs {
|
for _, r := range reactorPairs {
|
||||||
_ = r.reactor.Stop()
|
_ = r.bcR.Stop()
|
||||||
_ = r.app.Stop()
|
_ = r.conR.Stop()
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
outerFor:
|
outerFor:
|
||||||
for {
|
for {
|
||||||
i := 0
|
time.Sleep(1 * time.Second)
|
||||||
for i < numNodes {
|
|
||||||
if !reactorPairs[i].reactor.fsm.isCaughtUp() {
|
for i := 0; i < numNodes; i++ {
|
||||||
break
|
reactorPairs[i].conR.mtx.Lock()
|
||||||
|
if !reactorPairs[i].conR.switchedToConsensus {
|
||||||
|
reactorPairs[i].conR.mtx.Unlock()
|
||||||
|
continue outerFor
|
||||||
}
|
}
|
||||||
i++
|
reactorPairs[i].conR.mtx.Unlock()
|
||||||
}
|
}
|
||||||
if i == numNodes {
|
|
||||||
fmt.Println("SETUP FAST SYNC Duration", time.Since(start))
|
fmt.Println("SETUP FAST SYNC Duration", time.Since(start))
|
||||||
break outerFor
|
break outerFor
|
||||||
} else {
|
|
||||||
time.Sleep(1 * time.Second)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
//at this time, reactors[0-3] are the newest
|
//at this time, reactors[0-3] are the newest
|
||||||
assert.Equal(t, numNodes-1, reactorPairs[0].reactor.Switch.Peers().Size())
|
assert.Equal(t, numNodes-1, reactorPairs[0].bcR.Switch.Peers().Size())
|
||||||
|
|
||||||
lastLogger := log.TestingLogger()
|
lastLogger := log.TestingLogger()
|
||||||
lastReactorPair := newBlockchainReactor(lastLogger, genDoc, privVals, 0)
|
lastReactorPair := newBlockchainReactor(lastLogger, genDoc, privVals, 0)
|
||||||
reactorPairs = append(reactorPairs, lastReactorPair)
|
reactorPairs = append(reactorPairs, lastReactorPair)
|
||||||
|
|
||||||
switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch) *p2p.Switch {
|
switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch) *p2p.Switch {
|
||||||
s.AddReactor("BLOCKCHAIN", reactorPairs[len(reactorPairs)-1].reactor)
|
s.AddReactor("BLOCKCHAIN", reactorPairs[len(reactorPairs)-1].bcR)
|
||||||
|
s.AddReactor("CONSENSUS", reactorPairs[len(reactorPairs)-1].conR)
|
||||||
moduleName := fmt.Sprintf("blockchainTEST-%d", len(reactorPairs)-1)
|
moduleName := fmt.Sprintf("blockchainTEST-%d", len(reactorPairs)-1)
|
||||||
reactorPairs[len(reactorPairs)-1].reactor.SetLogger(lastLogger.With("module", moduleName))
|
reactorPairs[len(reactorPairs)-1].bcR.SetLogger(lastLogger.With("module", moduleName))
|
||||||
return s
|
return s
|
||||||
|
|
||||||
}, p2p.Connect2Switches)...)
|
}, p2p.Connect2Switches)...)
|
||||||
@ -373,14 +414,20 @@ outerFor:
|
|||||||
|
|
||||||
for {
|
for {
|
||||||
time.Sleep(1 * time.Second)
|
time.Sleep(1 * time.Second)
|
||||||
if lastReactorPair.reactor.fsm.isCaughtUp() {
|
lastReactorPair.conR.mtx.Lock()
|
||||||
|
if lastReactorPair.conR.switchedToConsensus {
|
||||||
fmt.Println("FAST SYNC Duration", time.Since(start))
|
fmt.Println("FAST SYNC Duration", time.Since(start))
|
||||||
|
lastReactorPair.conR.mtx.Unlock()
|
||||||
|
break
|
||||||
|
}
|
||||||
|
lastReactorPair.conR.mtx.Unlock()
|
||||||
|
|
||||||
|
if lastReactorPair.bcR.Switch.Peers().Size() == 0 {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
assert.True(t, lastReactorPair.reactor.Switch.Peers().Size() < len(reactorPairs))
|
assert.True(t, lastReactorPair.bcR.Switch.Peers().Size() < len(reactorPairs))
|
||||||
assert.Equal(t, lastReactorPair.reactor.fsm.pool.getMaxPeerHeight(), lastReactorPair.reactor.fsm.pool.height)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
//----------------------------------------------
|
//----------------------------------------------
|
||||||
@ -401,33 +448,3 @@ func makeBlock(height int64, state sm.State, lastCommit *types.Commit) *types.Bl
|
|||||||
type testApp struct {
|
type testApp struct {
|
||||||
abci.BaseApplication
|
abci.BaseApplication
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ abci.Application = (*testApp)(nil)
|
|
||||||
|
|
||||||
func (app *testApp) Info(req abci.RequestInfo) (resInfo abci.ResponseInfo) {
|
|
||||||
return abci.ResponseInfo{}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (app *testApp) BeginBlock(req abci.RequestBeginBlock) abci.ResponseBeginBlock {
|
|
||||||
return abci.ResponseBeginBlock{}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (app *testApp) EndBlock(req abci.RequestEndBlock) abci.ResponseEndBlock {
|
|
||||||
return abci.ResponseEndBlock{}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (app *testApp) DeliverTx(tx []byte) abci.ResponseDeliverTx {
|
|
||||||
return abci.ResponseDeliverTx{Tags: []cmn.KVPair{}}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (app *testApp) CheckTx(tx []byte) abci.ResponseCheckTx {
|
|
||||||
return abci.ResponseCheckTx{}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (app *testApp) Commit() abci.ResponseCommit {
|
|
||||||
return abci.ResponseCommit{}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (app *testApp) Query(reqQuery abci.RequestQuery) (resQuery abci.ResponseQuery) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
Reference in New Issue
Block a user