From 81d121b7d25cddaea6f35cacd5864583f1dada11 Mon Sep 17 00:00:00 2001 From: Anca Zamfir Date: Sun, 2 Jun 2019 21:41:24 +0200 Subject: [PATCH] fix lint, remove old blockchain reactor and duplicates in fsm tests --- blockchain/reactor_fsm_test.go | 507 +++++++------------------- blockchain_old/pool.go | 630 --------------------------------- blockchain_old/pool_test.go | 222 ------------ blockchain_old/reactor.go | 474 ------------------------- blockchain_old/reactor_test.go | 416 ---------------------- blockchain_old/store.go | 247 ------------- blockchain_old/store_test.go | 421 ---------------------- blockchain_old/wire.go | 13 - 8 files changed, 132 insertions(+), 2798 deletions(-) delete mode 100644 blockchain_old/pool.go delete mode 100644 blockchain_old/pool_test.go delete mode 100644 blockchain_old/reactor.go delete mode 100644 blockchain_old/reactor_test.go delete mode 100644 blockchain_old/store.go delete mode 100644 blockchain_old/store_test.go delete mode 100644 blockchain_old/wire.go diff --git a/blockchain/reactor_fsm_test.go b/blockchain/reactor_fsm_test.go index 89a01331..eca31b09 100644 --- a/blockchain/reactor_fsm_test.go +++ b/blockchain/reactor_fsm_test.go @@ -2,12 +2,9 @@ package blockchain import ( "fmt" - "sync" "testing" "time" - "github.com/stretchr/testify/require" - "github.com/stretchr/testify/assert" cmn "github.com/tendermint/tendermint/libs/common" "github.com/tendermint/tendermint/libs/log" @@ -15,46 +12,29 @@ import ( "github.com/tendermint/tendermint/types" ) -// reactor for FSM testing -type testReactor struct { - logger log.Logger - fsm *bReactorFSM -} - -func sendEventToFSM(fsm *bReactorFSM, ev bReactorEvent, data bReactorEventData) error { - return fsm.handle(&bcReactorMessage{event: ev, data: data}) -} - -var ( - testMutex sync.Mutex - numStatusRequests int - numBlockRequests int32 -) - type lastBlockRequestT struct { peerID p2p.ID height int64 } -var lastBlockRequest lastBlockRequestT - type lastPeerErrorT struct { peerID p2p.ID err error } -var lastPeerError lastPeerErrorT +// reactor for FSM testing +type testReactor struct { + logger log.Logger + fsm *bReactorFSM + numStatusRequests int + numBlockRequests int32 + lastBlockRequest lastBlockRequestT + lastPeerError lastPeerErrorT + stateTimerStarts map[string]int +} -var stateTimerStarts map[string]int - -func resetTestValues() { - stateTimerStarts = make(map[string]int) - numStatusRequests = 0 - numBlockRequests = 0 - lastBlockRequest.peerID = "" - lastBlockRequest.height = 0 - lastPeerError.peerID = "" - lastPeerError.err = nil +func sendEventToFSM(fsm *bReactorFSM, ev bReactorEvent, data bReactorEventData) error { + return fsm.handle(&bcReactorMessage{event: ev, data: data}) } type fsmStepTestValues struct { @@ -142,12 +122,13 @@ func makeStepMakeRequestsEv(current, expected string, maxPendingRequests int32) func makeStepMakeRequestsEvErrored(current, expected string, maxPendingRequests int32, err error, peersRemoved []p2p.ID) fsmStepTestValues { return fsmStepTestValues{ - currentState: current, - event: makeRequestsEv, - data: bReactorEventData{maxNumRequests: maxPendingRequests}, - expectedState: expected, - errWanted: err, - peersNotInPool: peersRemoved, + currentState: current, + event: makeRequestsEv, + data: bReactorEventData{maxNumRequests: maxPendingRequests}, + expectedState: expected, + errWanted: err, + peersNotInPool: peersRemoved, + blockReqIncreased: true, } } @@ -201,7 +182,7 @@ func makeStepPeerRemoveEv(current, expected string, peerID p2p.ID, err error, pe // -------------------------------------------- func newTestReactor(height int64) *testReactor { - testBcR := &testReactor{logger: log.TestingLogger()} + testBcR := &testReactor{logger: log.TestingLogger(), stateTimerStarts: make(map[string]int)} testBcR.fsm = NewFSM(height, testBcR) testBcR.fsm.setLogger(testBcR.logger) return testBcR @@ -220,27 +201,79 @@ func fixBlockResponseEvStep(step *fsmStepTestValues, testBcR *testReactor) { } } -func shouldApplyProcessedBlockEvStep(step *fsmStepTestValues, testBcR *testReactor) bool { - if step.event == processedBlockEv { - _, err := testBcR.fsm.pool.GetBlockAndPeerAtHeight(testBcR.fsm.pool.height) - if err == errMissingBlock { - return false - } - _, err = testBcR.fsm.pool.GetBlockAndPeerAtHeight(testBcR.fsm.pool.height + 1) - if err == errMissingBlock { - return false - } +type testFields struct { + name string + startingHeight int64 + maxRequestsPerPeer int32 + maxPendingRequests int32 + steps []fsmStepTestValues +} + +func executeFSMTests(t *testing.T, tests []testFields, matchRespToReq bool) { + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Create test reactor + testBcR := newTestReactor(tt.startingHeight) + + if tt.maxRequestsPerPeer != 0 { + maxRequestsPerPeer = tt.maxRequestsPerPeer + } + + for _, step := range tt.steps { + assert.Equal(t, step.currentState, testBcR.fsm.state.name) + + var heightBefore int64 + if step.event == processedBlockEv && step.data.err == errBlockVerificationFailure { + heightBefore = testBcR.fsm.pool.height + } + oldNumStatusRequests := testBcR.numStatusRequests + oldNumBlockRequests := testBcR.numBlockRequests + if matchRespToReq { + fixBlockResponseEvStep(&step, testBcR) + } + + fsmErr := sendEventToFSM(testBcR.fsm, step.event, step.data) + assert.Equal(t, step.errWanted, fsmErr) + + if step.shouldSendStatusReq { + assert.Equal(t, oldNumStatusRequests+1, testBcR.numStatusRequests) + } else { + assert.Equal(t, oldNumStatusRequests, testBcR.numStatusRequests) + } + + if step.blockReqIncreased { + assert.True(t, oldNumBlockRequests < testBcR.numBlockRequests) + } else { + assert.Equal(t, oldNumBlockRequests, testBcR.numBlockRequests) + } + + for _, height := range step.blocksAdded { + _, err := testBcR.fsm.pool.GetBlockAndPeerAtHeight(height) + assert.Nil(t, err) + } + if step.event == processedBlockEv && step.data.err == errBlockVerificationFailure { + heightAfter := testBcR.fsm.pool.height + assert.Equal(t, heightBefore, heightAfter) + firstAfter, err1 := testBcR.fsm.pool.GetBlockAndPeerAtHeight(testBcR.fsm.pool.height) + secondAfter, err2 := testBcR.fsm.pool.GetBlockAndPeerAtHeight(testBcR.fsm.pool.height + 1) + assert.NotNil(t, err1) + assert.NotNil(t, err2) + assert.Nil(t, firstAfter) + assert.Nil(t, secondAfter) + } + + assert.Equal(t, step.expectedState, testBcR.fsm.state.name) + + if step.expectedState == "finished" { + assert.True(t, testBcR.fsm.isCaughtUp()) + } + } + }) } - return true } func TestFSMBasic(t *testing.T) { - tests := []struct { - name string - startingHeight int64 - maxRequestsPerPeer int32 - steps []fsmStepTestValues - }{ + tests := []testFields{ { name: "one block, one peer - TS2", startingHeight: 1, @@ -292,59 +325,11 @@ func TestFSMBasic(t *testing.T) { }, } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - // Create test reactor - testBcR := newTestReactor(tt.startingHeight) - resetTestValues() - - if tt.maxRequestsPerPeer != 0 { - maxRequestsPerPeer = tt.maxRequestsPerPeer - } - - for _, step := range tt.steps { - assert.Equal(t, step.currentState, testBcR.fsm.state.name) - - oldNumStatusRequests := numStatusRequests - oldNumBlockRequests := numBlockRequests - - fixBlockResponseEvStep(&step, testBcR) - fsmErr := sendEventToFSM(testBcR.fsm, step.event, step.data) - assert.Equal(t, step.errWanted, fsmErr) - - if step.shouldSendStatusReq { - assert.Equal(t, oldNumStatusRequests+1, numStatusRequests) - } else { - assert.Equal(t, oldNumStatusRequests, numStatusRequests) - } - - if step.blockReqIncreased { - assert.True(t, oldNumBlockRequests < numBlockRequests) - } else { - assert.Equal(t, oldNumBlockRequests, numBlockRequests) - } - - for _, height := range step.blocksAdded { - _, err := testBcR.fsm.pool.GetBlockAndPeerAtHeight(height) - assert.Nil(t, err) - } - assert.Equal(t, step.expectedState, testBcR.fsm.state.name) - if step.expectedState == "finished" { - assert.True(t, testBcR.fsm.isCaughtUp()) - } - } - - }) - } + executeFSMTests(t, tests, true) } func TestFSMBlockVerificationFailure(t *testing.T) { - tests := []struct { - name string - startingHeight int64 - maxRequestsPerPeer int32 - steps []fsmStepTestValues - }{ + tests := []testFields{ { name: "block verification failure - TS2 variant", startingHeight: 1, @@ -388,73 +373,11 @@ func TestFSMBlockVerificationFailure(t *testing.T) { }, } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - // Create test reactor - testBcR := newTestReactor(tt.startingHeight) - resetTestValues() - - if tt.maxRequestsPerPeer != 0 { - maxRequestsPerPeer = tt.maxRequestsPerPeer - } - - for _, step := range tt.steps { - assert.Equal(t, step.currentState, testBcR.fsm.state.name) - - oldNumStatusRequests := numStatusRequests - oldNumBlockRequests := numBlockRequests - - var heightBefore int64 - if step.event == processedBlockEv && step.data.err == errBlockVerificationFailure { - heightBefore = testBcR.fsm.pool.height - } - - fsmErr := sendEventToFSM(testBcR.fsm, step.event, step.data) - assert.Equal(t, step.errWanted, fsmErr) - - if step.shouldSendStatusReq { - assert.Equal(t, oldNumStatusRequests+1, numStatusRequests) - } else { - assert.Equal(t, oldNumStatusRequests, numStatusRequests) - } - - if step.blockReqIncreased { - assert.True(t, oldNumBlockRequests < numBlockRequests) - } else { - assert.Equal(t, oldNumBlockRequests, numBlockRequests) - } - - for _, height := range step.blocksAdded { - _, err := testBcR.fsm.pool.GetBlockAndPeerAtHeight(height) - assert.Nil(t, err) - } - - if step.event == processedBlockEv && step.data.err == errBlockVerificationFailure { - heightAfter := testBcR.fsm.pool.height - assert.Equal(t, heightBefore, heightAfter) - firstAfter, err1 := testBcR.fsm.pool.GetBlockAndPeerAtHeight(testBcR.fsm.pool.height) - secondAfter, err2 := testBcR.fsm.pool.GetBlockAndPeerAtHeight(testBcR.fsm.pool.height + 1) - assert.NotNil(t, err1) - assert.NotNil(t, err2) - assert.Nil(t, firstAfter) - assert.Nil(t, secondAfter) - } - assert.Equal(t, step.expectedState, testBcR.fsm.state.name) - if step.expectedState == "finished" { - assert.True(t, testBcR.fsm.isCaughtUp()) - } - } - }) - } + executeFSMTests(t, tests, false) } func TestFSMBadBlockFromPeer(t *testing.T) { - tests := []struct { - name string - startingHeight int64 - maxRequestsPerPeer int32 - steps []fsmStepTestValues - }{ + tests := []testFields{ { name: "block we haven't asked for", startingHeight: 1, @@ -529,54 +452,11 @@ func TestFSMBadBlockFromPeer(t *testing.T) { }, } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - // Create test reactor - testBcR := newTestReactor(tt.startingHeight) - resetTestValues() - - if tt.maxRequestsPerPeer != 0 { - maxRequestsPerPeer = tt.maxRequestsPerPeer - } - - for _, step := range tt.steps { - assert.Equal(t, step.currentState, testBcR.fsm.state.name) - - oldNumStatusRequests := numStatusRequests - oldNumBlockRequests := numBlockRequests - - fsmErr := sendEventToFSM(testBcR.fsm, step.event, step.data) - assert.Equal(t, step.errWanted, fsmErr) - - if step.shouldSendStatusReq { - assert.Equal(t, oldNumStatusRequests+1, numStatusRequests) - } else { - assert.Equal(t, oldNumStatusRequests, numStatusRequests) - } - - if step.blockReqIncreased { - assert.True(t, oldNumBlockRequests < numBlockRequests) - } else { - assert.Equal(t, oldNumBlockRequests, numBlockRequests) - } - - for _, height := range step.blocksAdded { - _, err := testBcR.fsm.pool.GetBlockAndPeerAtHeight(height) - assert.Nil(t, err) - } - assert.Equal(t, step.expectedState, testBcR.fsm.state.name) - } - }) - } + executeFSMTests(t, tests, false) } func TestFSMBlockAtCurrentHeightDoesNotArriveInTime(t *testing.T) { - tests := []struct { - name string - startingHeight int64 - maxRequestsPerPeer int32 - steps []fsmStepTestValues - }{ + tests := []testFields{ { name: "block at current height undelivered - TS5", startingHeight: 1, @@ -640,54 +520,12 @@ func TestFSMBlockAtCurrentHeightDoesNotArriveInTime(t *testing.T) { }, }, } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - // Create test reactor - testBcR := newTestReactor(tt.startingHeight) - resetTestValues() - if tt.maxRequestsPerPeer != 0 { - maxRequestsPerPeer = tt.maxRequestsPerPeer - } - - for _, step := range tt.steps { - assert.Equal(t, step.currentState, testBcR.fsm.state.name) - - oldNumStatusRequests := numStatusRequests - oldNumBlockRequests := numBlockRequests - - fsmErr := sendEventToFSM(testBcR.fsm, step.event, step.data) - assert.Equal(t, step.errWanted, fsmErr) - - if step.shouldSendStatusReq { - assert.Equal(t, oldNumStatusRequests+1, numStatusRequests) - } else { - assert.Equal(t, oldNumStatusRequests, numStatusRequests) - } - - if step.blockReqIncreased { - assert.True(t, oldNumBlockRequests < numBlockRequests) - } else { - assert.Equal(t, oldNumBlockRequests, numBlockRequests) - } - - for _, height := range step.blocksAdded { - _, err := testBcR.fsm.pool.GetBlockAndPeerAtHeight(height) - assert.Nil(t, err) - } - assert.Equal(t, step.expectedState, testBcR.fsm.state.name) - } - }) - } + executeFSMTests(t, tests, true) } func TestFSMPeerRelatedEvents(t *testing.T) { - tests := []struct { - name string - startingHeight int64 - maxRequestsPerPeer int32 - steps []fsmStepTestValues - }{ + tests := []testFields{ { name: "peer remove event with no blocks", startingHeight: 1, @@ -734,7 +572,7 @@ func TestFSMPeerRelatedEvents(t *testing.T) { makeStepBlockRespEv("waitForBlock", "waitForBlock", "P1", 100, []int64{}), makeStepBlockRespEv("waitForBlock", "waitForBlock", - "P1", 101, []int64{1}), + "P1", 101, []int64{100}), // processed block at heights 1 and 2 makeStepProcessedBlockEv("waitForBlock", "waitForBlock", nil), @@ -761,7 +599,7 @@ func TestFSMPeerRelatedEvents(t *testing.T) { makeStepBlockRespEv("waitForBlock", "waitForBlock", "P1", 100, []int64{}), makeStepBlockRespEv("waitForBlock", "waitForBlock", - "P1", 101, []int64{1}), + "P1", 101, []int64{100}), // processed block at heights 1 and 2 makeStepProcessedBlockEv("waitForBlock", "waitForBlock", nil), @@ -820,46 +658,11 @@ func TestFSMPeerRelatedEvents(t *testing.T) { }, } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - // Create test reactor - testBcR := newTestReactor(tt.startingHeight) - resetTestValues() - - if tt.maxRequestsPerPeer != 0 { - maxRequestsPerPeer = tt.maxRequestsPerPeer - } - - for _, step := range tt.steps { - require.Equal(t, step.currentState, testBcR.fsm.state.name) - - oldNumStatusRequests := numStatusRequests - - fsmErr := sendEventToFSM(testBcR.fsm, step.event, step.data) - assert.Equal(t, step.errWanted, fsmErr) - - if step.shouldSendStatusReq { - assert.Equal(t, oldNumStatusRequests+1, numStatusRequests) - } else { - assert.Equal(t, oldNumStatusRequests, numStatusRequests) - } - - for _, peerID := range step.peersNotInPool { - _, ok := testBcR.fsm.pool.peers[peerID] - assert.False(t, ok) - } - assert.Equal(t, step.expectedState, testBcR.fsm.state.name) - } - }) - } + executeFSMTests(t, tests, true) } func TestFSMStopFSM(t *testing.T) { - tests := []struct { - name string - startingHeight int64 - steps []fsmStepTestValues - }{ + tests := []testFields{ { name: "stopFSMEv in unknown", steps: []fsmStepTestValues{ @@ -890,28 +693,11 @@ func TestFSMStopFSM(t *testing.T) { }, } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - // Create test reactor - testBcR := newTestReactor(tt.startingHeight) - resetTestValues() - - for _, step := range tt.steps { - assert.Equal(t, step.currentState, testBcR.fsm.state.name) - fsmErr := sendEventToFSM(testBcR.fsm, step.event, step.data) - assert.Equal(t, step.errWanted, fsmErr) - assert.Equal(t, step.expectedState, testBcR.fsm.state.name) - } - }) - } + executeFSMTests(t, tests, false) } func TestFSMUnknownElements(t *testing.T) { - tests := []struct { - name string - startingHeight int64 - steps []fsmStepTestValues - }{ + tests := []testFields{ { name: "unknown event for state unknown", steps: []fsmStepTestValues{ @@ -936,29 +722,11 @@ func TestFSMUnknownElements(t *testing.T) { }, } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - // Create test reactor - testBcR := newTestReactor(tt.startingHeight) - resetTestValues() - - for _, step := range tt.steps { - assert.Equal(t, step.currentState, testBcR.fsm.state.name) - fsmErr := sendEventToFSM(testBcR.fsm, step.event, step.data) - assert.Equal(t, step.errWanted, fsmErr) - assert.Equal(t, step.expectedState, testBcR.fsm.state.name) - } - }) - } + executeFSMTests(t, tests, false) } func TestFSMPeerStateTimeoutEvent(t *testing.T) { - tests := []struct { - name string - startingHeight int64 - maxRequestsPerPeer int32 - steps []fsmStepTestValues - }{ + tests := []testFields{ { name: "timeout event for state waitForPeer while in state waitForPeer - TS1", startingHeight: 1, @@ -1013,28 +781,7 @@ func TestFSMPeerStateTimeoutEvent(t *testing.T) { }, } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - // Create test reactor - testBcR := newTestReactor(tt.startingHeight) - resetTestValues() - - for _, step := range tt.steps { - assert.Equal(t, step.currentState, testBcR.fsm.state.name) - fsmErr := sendEventToFSM(testBcR.fsm, step.event, step.data) - assert.Equal(t, step.errWanted, fsmErr) - assert.Equal(t, step.expectedState, testBcR.fsm.state.name) - } - }) - } -} - -type testFields struct { - name string - startingHeight int64 - maxRequestsPerPeer int32 - maxPendingRequests int32 - steps []fsmStepTestValues + executeFSMTests(t, tests, false) } func makeCorrectTransitionSequence(startingHeight int64, numBlocks int64, numPeers int, randomPeerHeights bool, @@ -1155,6 +902,20 @@ func makeCorrectTransitionSequenceWithRandomParameters() testFields { return makeCorrectTransitionSequence(startingHeight, numBlocks, numPeers, true, maxRequestsPerPeer, maxPendingRequests) } +func shouldApplyProcessedBlockEvStep(step *fsmStepTestValues, testBcR *testReactor) bool { + if step.event == processedBlockEv { + _, err := testBcR.fsm.pool.GetBlockAndPeerAtHeight(testBcR.fsm.pool.height) + if err == errMissingBlock { + return false + } + _, err = testBcR.fsm.pool.GetBlockAndPeerAtHeight(testBcR.fsm.pool.height + 1) + if err == errMissingBlock { + return false + } + } + return true +} + func TestFSMCorrectTransitionSequences(t *testing.T) { tests := []testFields{ @@ -1166,7 +927,6 @@ func TestFSMCorrectTransitionSequences(t *testing.T) { t.Run(tt.name, func(t *testing.T) { // Create test reactor testBcR := newTestReactor(tt.startingHeight) - resetTestValues() if tt.maxRequestsPerPeer != 0 { maxRequestsPerPeer = tt.maxRequestsPerPeer @@ -1175,7 +935,7 @@ func TestFSMCorrectTransitionSequences(t *testing.T) { for _, step := range tt.steps { assert.Equal(t, step.currentState, testBcR.fsm.state.name) - oldNumStatusRequests := numStatusRequests + oldNumStatusRequests := testBcR.numStatusRequests fixBlockResponseEvStep(&step, testBcR) if !shouldApplyProcessedBlockEvStep(&step, testBcR) { continue @@ -1185,9 +945,9 @@ func TestFSMCorrectTransitionSequences(t *testing.T) { assert.Equal(t, step.errWanted, fsmErr) if step.shouldSendStatusReq { - assert.Equal(t, oldNumStatusRequests+1, numStatusRequests) + assert.Equal(t, oldNumStatusRequests+1, testBcR.numStatusRequests) } else { - assert.Equal(t, oldNumStatusRequests, numStatusRequests) + assert.Equal(t, oldNumStatusRequests, testBcR.numStatusRequests) } assert.Equal(t, step.expectedState, testBcR.fsm.state.name) @@ -1203,23 +963,21 @@ func TestFSMCorrectTransitionSequences(t *testing.T) { // ---------------------------------------- // implements the bcRNotifier func (testR *testReactor) sendPeerError(err error, peerID p2p.ID) { - testMutex.Lock() - defer testMutex.Unlock() testR.logger.Info("Reactor received sendPeerError call from FSM", "peer", peerID, "err", err) - lastPeerError.peerID = peerID - lastPeerError.err = err + testR.lastPeerError.peerID = peerID + testR.lastPeerError.err = err } func (testR *testReactor) sendStatusRequest() { testR.logger.Info("Reactor received sendStatusRequest call from FSM") - numStatusRequests++ + testR.numStatusRequests++ } func (testR *testReactor) sendBlockRequest(peerID p2p.ID, height int64) error { testR.logger.Info("Reactor received sendBlockRequest call from FSM", "peer", peerID, "height", height) - numBlockRequests++ - lastBlockRequest.peerID = peerID - lastBlockRequest.height = height + testR.numBlockRequests++ + testR.lastBlockRequest.peerID = peerID + testR.lastBlockRequest.height = height if height == 9999999 { // simulate switch does not have peer return errNilPeerForBlockRequest @@ -1229,15 +987,14 @@ func (testR *testReactor) sendBlockRequest(peerID p2p.ID, height int64) error { func (testR *testReactor) resetStateTimer(name string, timer **time.Timer, timeout time.Duration) { testR.logger.Info("Reactor received resetStateTimer call from FSM", "state", name, "timeout", timeout) - if _, ok := stateTimerStarts[name]; !ok { - stateTimerStarts[name] = 1 + if _, ok := testR.stateTimerStarts[name]; !ok { + testR.stateTimerStarts[name] = 1 } else { - stateTimerStarts[name]++ + testR.stateTimerStarts[name]++ } } func (testR *testReactor) switchToConsensus() { - } // ---------------------------------------- diff --git a/blockchain_old/pool.go b/blockchain_old/pool.go deleted file mode 100644 index 0fa65f65..00000000 --- a/blockchain_old/pool.go +++ /dev/null @@ -1,630 +0,0 @@ -package blockchain_old - -import ( - "errors" - "fmt" - "math" - "sync" - "sync/atomic" - "time" - - cmn "github.com/tendermint/tendermint/libs/common" - flow "github.com/tendermint/tendermint/libs/flowrate" - "github.com/tendermint/tendermint/libs/log" - - "github.com/tendermint/tendermint/p2p" - "github.com/tendermint/tendermint/types" -) - -/* -eg, L = latency = 0.1s - P = num peers = 10 - FN = num full nodes - BS = 1kB block size - CB = 1 Mbit/s = 128 kB/s - CB/P = 12.8 kB - B/S = CB/P/BS = 12.8 blocks/s - - 12.8 * 0.1 = 1.28 blocks on conn -*/ - -const ( - requestIntervalMS = 2 - maxTotalRequesters = 600 - maxPendingRequests = maxTotalRequesters - maxPendingRequestsPerPeer = 20 - - // Minimum recv rate to ensure we're receiving blocks from a peer fast - // enough. If a peer is not sending us data at at least that rate, we - // consider them to have timedout and we disconnect. - // - // Assuming a DSL connection (not a good choice) 128 Kbps (upload) ~ 15 KB/s, - // sending data across atlantic ~ 7.5 KB/s. - minRecvRate = 7680 - - // Maximum difference between current and new block's height. - maxDiffBetweenCurrentAndReceivedBlockHeight = 100 -) - -var peerTimeout = 15 * time.Second // not const so we can override with tests - -/* - Peers self report their heights when we join the block pool. - Starting from our latest pool.height, we request blocks - in sequence from peers that reported higher heights than ours. - Every so often we ask peers what height they're on so we can keep going. - - Requests are continuously made for blocks of higher heights until - the limit is reached. If most of the requests have no available peers, and we - are not at peer limits, we can probably switch to consensus reactor -*/ - -type BlockPool struct { - cmn.BaseService - startTime time.Time - - mtx sync.Mutex - // block requests - requesters map[int64]*bpRequester - height int64 // the lowest key in requesters. - // peers - peers map[p2p.ID]*bpPeer - maxPeerHeight int64 // the biggest reported height - - // atomic - numPending int32 // number of requests pending assignment or block response - - requestsCh chan<- BlockRequest - errorsCh chan<- peerError -} - -// NewBlockPool returns a new BlockPool with the height equal to start. Block -// requests and errors will be sent to requestsCh and errorsCh accordingly. -func NewBlockPool(start int64, requestsCh chan<- BlockRequest, errorsCh chan<- peerError) *BlockPool { - bp := &BlockPool{ - peers: make(map[p2p.ID]*bpPeer), - - requesters: make(map[int64]*bpRequester), - height: start, - numPending: 0, - - requestsCh: requestsCh, - errorsCh: errorsCh, - } - bp.BaseService = *cmn.NewBaseService(nil, "BlockPool", bp) - return bp -} - -// OnStart implements cmn.Service by spawning requesters routine and recording -// pool's start time. -func (pool *BlockPool) OnStart() error { - go pool.makeRequestersRoutine() - pool.startTime = time.Now() - return nil -} - -// spawns requesters as needed -func (pool *BlockPool) makeRequestersRoutine() { - for { - if !pool.IsRunning() { - break - } - - _, numPending, lenRequesters := pool.GetStatus() - if numPending >= maxPendingRequests { - // sleep for a bit. - time.Sleep(requestIntervalMS * time.Millisecond) - // check for timed out peers - pool.removeTimedoutPeers() - } else if lenRequesters >= maxTotalRequesters { - // sleep for a bit. - time.Sleep(requestIntervalMS * time.Millisecond) - // check for timed out peers - pool.removeTimedoutPeers() - } else { - // request for more blocks. - pool.makeNextRequester() - } - } -} - -func (pool *BlockPool) removeTimedoutPeers() { - pool.mtx.Lock() - defer pool.mtx.Unlock() - - for _, peer := range pool.peers { - if !peer.didTimeout && peer.numPending > 0 { - curRate := peer.recvMonitor.Status().CurRate - // curRate can be 0 on start - if curRate != 0 && curRate < minRecvRate { - err := errors.New("peer is not sending us data fast enough") - pool.sendError(err, peer.id) - pool.Logger.Error("SendTimeout", "peer", peer.id, - "reason", err, - "curRate", fmt.Sprintf("%d KB/s", curRate/1024), - "minRate", fmt.Sprintf("%d KB/s", minRecvRate/1024)) - peer.didTimeout = true - } - } - if peer.didTimeout { - pool.removePeer(peer.id) - } - } -} - -// GetStatus returns pool's height, numPending requests and the number of -// requesters. -func (pool *BlockPool) GetStatus() (height int64, numPending int32, lenRequesters int) { - pool.mtx.Lock() - defer pool.mtx.Unlock() - - return pool.height, atomic.LoadInt32(&pool.numPending), len(pool.requesters) -} - -// IsCaughtUp returns true if this node is caught up, false - otherwise. -// TODO: relax conditions, prevent abuse. -func (pool *BlockPool) IsCaughtUp() bool { - pool.mtx.Lock() - defer pool.mtx.Unlock() - - // Need at least 1 peer to be considered caught up. - if len(pool.peers) == 0 { - pool.Logger.Debug("Blockpool has no peers") - return false - } - - // Some conditions to determine if we're caught up. - // Ensures we've either received a block or waited some amount of time, - // and that we're synced to the highest known height. - // Note we use maxPeerHeight - 1 because to sync block H requires block H+1 - // to verify the LastCommit. - receivedBlockOrTimedOut := pool.height > 0 || time.Since(pool.startTime) > 5*time.Second - ourChainIsLongestAmongPeers := pool.maxPeerHeight == 0 || pool.height >= (pool.maxPeerHeight-1) - isCaughtUp := receivedBlockOrTimedOut && ourChainIsLongestAmongPeers - return isCaughtUp -} - -// We need to see the second block's Commit to validate the first block. -// So we peek two blocks at a time. -// The caller will verify the commit. -func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block) { - pool.mtx.Lock() - defer pool.mtx.Unlock() - - if r := pool.requesters[pool.height]; r != nil { - first = r.getBlock() - } - if r := pool.requesters[pool.height+1]; r != nil { - second = r.getBlock() - } - return -} - -// Pop the first block at pool.height -// It must have been validated by 'second'.Commit from PeekTwoBlocks(). -func (pool *BlockPool) PopRequest() { - pool.mtx.Lock() - defer pool.mtx.Unlock() - - if r := pool.requesters[pool.height]; r != nil { - /* The block can disappear at any time, due to removePeer(). - if r := pool.requesters[pool.height]; r == nil || r.block == nil { - PanicSanity("PopRequest() requires a valid block") - } - */ - r.Stop() - delete(pool.requesters, pool.height) - pool.height++ - } else { - panic(fmt.Sprintf("Expected requester to pop, got nothing at height %v", pool.height)) - } -} - -// Invalidates the block at pool.height, -// Remove the peer and redo request from others. -// Returns the ID of the removed peer. -func (pool *BlockPool) RedoRequest(height int64) p2p.ID { - pool.mtx.Lock() - defer pool.mtx.Unlock() - - request := pool.requesters[height] - peerID := request.getPeerID() - if peerID != p2p.ID("") { - // RemovePeer will redo all requesters associated with this peer. - pool.removePeer(peerID) - } - return peerID -} - -// TODO: ensure that blocks come in order for each peer. -func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int) { - pool.mtx.Lock() - defer pool.mtx.Unlock() - - requester := pool.requesters[block.Height] - if requester == nil { - pool.Logger.Info("peer sent us a block we didn't expect", "peer", peerID, "curHeight", pool.height, "blockHeight", block.Height) - diff := pool.height - block.Height - if diff < 0 { - diff *= -1 - } - if diff > maxDiffBetweenCurrentAndReceivedBlockHeight { - pool.sendError(errors.New("peer sent us a block we didn't expect with a height too far ahead/behind"), peerID) - } - return - } - - if requester.setBlock(block, peerID) { - atomic.AddInt32(&pool.numPending, -1) - peer := pool.peers[peerID] - if peer != nil { - peer.decrPending(blockSize) - } - } else { - pool.Logger.Info("invalid peer", "peer", peerID, "blockHeight", block.Height) - pool.sendError(errors.New("invalid peer"), peerID) - } -} - -// MaxPeerHeight returns the highest reported height. -func (pool *BlockPool) MaxPeerHeight() int64 { - pool.mtx.Lock() - defer pool.mtx.Unlock() - return pool.maxPeerHeight -} - -// SetPeerHeight sets the peer's alleged blockchain height. -func (pool *BlockPool) SetPeerHeight(peerID p2p.ID, height int64) { - pool.mtx.Lock() - defer pool.mtx.Unlock() - - peer := pool.peers[peerID] - if peer != nil { - peer.height = height - } else { - peer = newBPPeer(pool, peerID, height) - peer.setLogger(pool.Logger.With("peer", peerID)) - pool.peers[peerID] = peer - } - - if height > pool.maxPeerHeight { - pool.maxPeerHeight = height - } -} - -// RemovePeer removes the peer with peerID from the pool. If there's no peer -// with peerID, function is a no-op. -func (pool *BlockPool) RemovePeer(peerID p2p.ID) { - pool.mtx.Lock() - defer pool.mtx.Unlock() - - pool.removePeer(peerID) -} - -func (pool *BlockPool) removePeer(peerID p2p.ID) { - for _, requester := range pool.requesters { - if requester.getPeerID() == peerID { - requester.redo(peerID) - } - } - - peer, ok := pool.peers[peerID] - if ok { - if peer.timeout != nil { - peer.timeout.Stop() - } - - delete(pool.peers, peerID) - - // Find a new peer with the biggest height and update maxPeerHeight if the - // peer's height was the biggest. - if peer.height == pool.maxPeerHeight { - pool.updateMaxPeerHeight() - } - } -} - -// If no peers are left, maxPeerHeight is set to 0. -func (pool *BlockPool) updateMaxPeerHeight() { - var max int64 - for _, peer := range pool.peers { - if peer.height > max { - max = peer.height - } - } - pool.maxPeerHeight = max -} - -// Pick an available peer with at least the given minHeight. -// If no peers are available, returns nil. -func (pool *BlockPool) pickIncrAvailablePeer(minHeight int64) *bpPeer { - pool.mtx.Lock() - defer pool.mtx.Unlock() - - for _, peer := range pool.peers { - if peer.didTimeout { - pool.removePeer(peer.id) - continue - } - if peer.numPending >= maxPendingRequestsPerPeer { - continue - } - if peer.height < minHeight { - continue - } - peer.incrPending() - return peer - } - return nil -} - -func (pool *BlockPool) makeNextRequester() { - pool.mtx.Lock() - defer pool.mtx.Unlock() - - nextHeight := pool.height + pool.requestersLen() - if nextHeight > pool.maxPeerHeight { - return - } - - request := newBPRequester(pool, nextHeight) - - pool.requesters[nextHeight] = request - atomic.AddInt32(&pool.numPending, 1) - - err := request.Start() - if err != nil { - request.Logger.Error("Error starting request", "err", err) - } -} - -func (pool *BlockPool) requestersLen() int64 { - return int64(len(pool.requesters)) -} - -func (pool *BlockPool) sendRequest(height int64, peerID p2p.ID) { - if !pool.IsRunning() { - return - } - pool.requestsCh <- BlockRequest{height, peerID} -} - -func (pool *BlockPool) sendError(err error, peerID p2p.ID) { - if !pool.IsRunning() { - return - } - pool.errorsCh <- peerError{err, peerID} -} - -// for debugging purposes -//nolint:unused -func (pool *BlockPool) debug() string { - pool.mtx.Lock() - defer pool.mtx.Unlock() - - str := "" - nextHeight := pool.height + pool.requestersLen() - for h := pool.height; h < nextHeight; h++ { - if pool.requesters[h] == nil { - str += fmt.Sprintf("H(%v):X ", h) - } else { - str += fmt.Sprintf("H(%v):", h) - str += fmt.Sprintf("B?(%v) ", pool.requesters[h].block != nil) - } - } - return str -} - -//------------------------------------- - -type bpPeer struct { - pool *BlockPool - id p2p.ID - recvMonitor *flow.Monitor - - height int64 - numPending int32 - timeout *time.Timer - didTimeout bool - - logger log.Logger -} - -func newBPPeer(pool *BlockPool, peerID p2p.ID, height int64) *bpPeer { - peer := &bpPeer{ - pool: pool, - id: peerID, - height: height, - numPending: 0, - logger: log.NewNopLogger(), - } - return peer -} - -func (peer *bpPeer) setLogger(l log.Logger) { - peer.logger = l -} - -func (peer *bpPeer) resetMonitor() { - peer.recvMonitor = flow.New(time.Second, time.Second*40) - initialValue := float64(minRecvRate) * math.E - peer.recvMonitor.SetREMA(initialValue) -} - -func (peer *bpPeer) resetTimeout() { - if peer.timeout == nil { - peer.timeout = time.AfterFunc(peerTimeout, peer.onTimeout) - } else { - peer.timeout.Reset(peerTimeout) - } -} - -func (peer *bpPeer) incrPending() { - if peer.numPending == 0 { - peer.resetMonitor() - peer.resetTimeout() - } - peer.numPending++ -} - -func (peer *bpPeer) decrPending(recvSize int) { - peer.numPending-- - if peer.numPending == 0 { - peer.timeout.Stop() - } else { - peer.recvMonitor.Update(recvSize) - peer.resetTimeout() - } -} - -func (peer *bpPeer) onTimeout() { - peer.pool.mtx.Lock() - defer peer.pool.mtx.Unlock() - - err := errors.New("peer did not send us anything") - peer.pool.sendError(err, peer.id) - peer.logger.Error("SendTimeout", "reason", err, "timeout", peerTimeout) - peer.didTimeout = true -} - -//------------------------------------- - -type bpRequester struct { - cmn.BaseService - pool *BlockPool - height int64 - gotBlockCh chan struct{} - redoCh chan p2p.ID //redo may send multitime, add peerId to identify repeat - - mtx sync.Mutex - peerID p2p.ID - block *types.Block -} - -func newBPRequester(pool *BlockPool, height int64) *bpRequester { - bpr := &bpRequester{ - pool: pool, - height: height, - gotBlockCh: make(chan struct{}, 1), - redoCh: make(chan p2p.ID, 1), - - peerID: "", - block: nil, - } - bpr.BaseService = *cmn.NewBaseService(nil, "bpRequester", bpr) - return bpr -} - -func (bpr *bpRequester) OnStart() error { - go bpr.requestRoutine() - return nil -} - -// Returns true if the peer matches and block doesn't already exist. -func (bpr *bpRequester) setBlock(block *types.Block, peerID p2p.ID) bool { - bpr.mtx.Lock() - if bpr.block != nil || bpr.peerID != peerID { - bpr.mtx.Unlock() - return false - } - bpr.block = block - bpr.mtx.Unlock() - - select { - case bpr.gotBlockCh <- struct{}{}: - default: - } - return true -} - -func (bpr *bpRequester) getBlock() *types.Block { - bpr.mtx.Lock() - defer bpr.mtx.Unlock() - return bpr.block -} - -func (bpr *bpRequester) getPeerID() p2p.ID { - bpr.mtx.Lock() - defer bpr.mtx.Unlock() - return bpr.peerID -} - -// This is called from the requestRoutine, upon redo(). -func (bpr *bpRequester) reset() { - bpr.mtx.Lock() - defer bpr.mtx.Unlock() - - if bpr.block != nil { - atomic.AddInt32(&bpr.pool.numPending, 1) - } - - bpr.peerID = "" - bpr.block = nil -} - -// Tells bpRequester to pick another peer and try again. -// NOTE: Nonblocking, and does nothing if another redo -// was already requested. -func (bpr *bpRequester) redo(peerId p2p.ID) { - select { - case bpr.redoCh <- peerId: - default: - } -} - -// Responsible for making more requests as necessary -// Returns only when a block is found (e.g. AddBlock() is called) -func (bpr *bpRequester) requestRoutine() { -OUTER_LOOP: - for { - // Pick a peer to send request to. - var peer *bpPeer - PICK_PEER_LOOP: - for { - if !bpr.IsRunning() || !bpr.pool.IsRunning() { - return - } - peer = bpr.pool.pickIncrAvailablePeer(bpr.height) - if peer == nil { - //log.Info("No peers available", "height", height) - time.Sleep(requestIntervalMS * time.Millisecond) - continue PICK_PEER_LOOP - } - break PICK_PEER_LOOP - } - bpr.mtx.Lock() - bpr.peerID = peer.id - bpr.mtx.Unlock() - - // Send request and wait. - bpr.pool.sendRequest(bpr.height, peer.id) - WAIT_LOOP: - for { - select { - case <-bpr.pool.Quit(): - bpr.Stop() - return - case <-bpr.Quit(): - return - case peerID := <-bpr.redoCh: - if peerID == bpr.peerID { - bpr.reset() - continue OUTER_LOOP - } else { - continue WAIT_LOOP - } - case <-bpr.gotBlockCh: - // We got a block! - // Continue the for-loop and wait til Quit. - continue WAIT_LOOP - } - } - } -} - -//------------------------------------- - -type BlockRequest struct { - Height int64 - PeerID p2p.ID -} diff --git a/blockchain_old/pool_test.go b/blockchain_old/pool_test.go deleted file mode 100644 index e453c096..00000000 --- a/blockchain_old/pool_test.go +++ /dev/null @@ -1,222 +0,0 @@ -package blockchain_old - -import ( - "fmt" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - cmn "github.com/tendermint/tendermint/libs/common" - "github.com/tendermint/tendermint/libs/log" - "github.com/tendermint/tendermint/p2p" - "github.com/tendermint/tendermint/types" -) - -func init() { - peerTimeout = 2 * time.Second -} - -type testPeer struct { - id p2p.ID - height int64 - inputChan chan inputData //make sure each peer's data is sequential -} - -type inputData struct { - t *testing.T - pool *BlockPool - request BlockRequest -} - -func (p testPeer) runInputRoutine() { - go func() { - for input := range p.inputChan { - p.simulateInput(input) - } - }() -} - -// Request desired, pretend like we got the block immediately. -func (p testPeer) simulateInput(input inputData) { - block := &types.Block{Header: types.Header{Height: input.request.Height}} - input.pool.AddBlock(input.request.PeerID, block, 123) - input.t.Logf("Added block from peer %v (height: %v)", input.request.PeerID, input.request.Height) -} - -type testPeers map[p2p.ID]testPeer - -func (ps testPeers) start() { - for _, v := range ps { - v.runInputRoutine() - } -} - -func (ps testPeers) stop() { - for _, v := range ps { - close(v.inputChan) - } -} - -func makePeers(numPeers int, minHeight, maxHeight int64) testPeers { - peers := make(testPeers, numPeers) - for i := 0; i < numPeers; i++ { - peerID := p2p.ID(cmn.RandStr(12)) - height := minHeight + cmn.RandInt63n(maxHeight-minHeight) - peers[peerID] = testPeer{peerID, height, make(chan inputData, 10)} - } - return peers -} - -func TestBlockPoolBasic(t *testing.T) { - start := int64(42) - peers := makePeers(10, start+1, 1000) - errorsCh := make(chan peerError, 1000) - requestsCh := make(chan BlockRequest, 1000) - pool := NewBlockPool(start, requestsCh, errorsCh) - pool.SetLogger(log.TestingLogger()) - - err := pool.Start() - if err != nil { - t.Error(err) - } - - defer pool.Stop() - - peers.start() - defer peers.stop() - - // Introduce each peer. - go func() { - for _, peer := range peers { - pool.SetPeerHeight(peer.id, peer.height) - } - }() - - // Start a goroutine to pull blocks - go func() { - for { - if !pool.IsRunning() { - return - } - first, second := pool.PeekTwoBlocks() - if first != nil && second != nil { - pool.PopRequest() - } else { - time.Sleep(1 * time.Second) - } - } - }() - - // Pull from channels - for { - select { - case err := <-errorsCh: - t.Error(err) - case request := <-requestsCh: - t.Logf("Pulled new BlockRequest %v", request) - if request.Height == 300 { - return // Done! - } - - peers[request.PeerID].inputChan <- inputData{t, pool, request} - } - } -} - -func TestBlockPoolTimeout(t *testing.T) { - start := int64(42) - peers := makePeers(10, start+1, 1000) - errorsCh := make(chan peerError, 1000) - requestsCh := make(chan BlockRequest, 1000) - pool := NewBlockPool(start, requestsCh, errorsCh) - pool.SetLogger(log.TestingLogger()) - err := pool.Start() - if err != nil { - t.Error(err) - } - defer pool.Stop() - - for _, peer := range peers { - t.Logf("Peer %v", peer.id) - } - - // Introduce each peer. - go func() { - for _, peer := range peers { - pool.SetPeerHeight(peer.id, peer.height) - } - }() - - // Start a goroutine to pull blocks - go func() { - for { - if !pool.IsRunning() { - return - } - first, second := pool.PeekTwoBlocks() - if first != nil && second != nil { - pool.PopRequest() - } else { - time.Sleep(1 * time.Second) - } - } - }() - - // Pull from channels - counter := 0 - timedOut := map[p2p.ID]struct{}{} - for { - select { - case err := <-errorsCh: - t.Log(err) - // consider error to be always timeout here - if _, ok := timedOut[err.peerID]; !ok { - counter++ - if counter == len(peers) { - return // Done! - } - } - case request := <-requestsCh: - t.Logf("Pulled new BlockRequest %+v", request) - } - } -} - -func TestBlockPoolRemovePeer(t *testing.T) { - peers := make(testPeers, 10) - for i := 0; i < 10; i++ { - peerID := p2p.ID(fmt.Sprintf("%d", i+1)) - height := int64(i + 1) - peers[peerID] = testPeer{peerID, height, make(chan inputData)} - } - requestsCh := make(chan BlockRequest) - errorsCh := make(chan peerError) - - pool := NewBlockPool(1, requestsCh, errorsCh) - pool.SetLogger(log.TestingLogger()) - err := pool.Start() - require.NoError(t, err) - defer pool.Stop() - - // add peers - for peerID, peer := range peers { - pool.SetPeerHeight(peerID, peer.height) - } - assert.EqualValues(t, 10, pool.MaxPeerHeight()) - - // remove not-existing peer - assert.NotPanics(t, func() { pool.RemovePeer(p2p.ID("Superman")) }) - - // remove peer with biggest height - pool.RemovePeer(p2p.ID("10")) - assert.EqualValues(t, 9, pool.MaxPeerHeight()) - - // remove all peers - for peerID := range peers { - pool.RemovePeer(peerID) - } - - assert.EqualValues(t, 0, pool.MaxPeerHeight()) -} diff --git a/blockchain_old/reactor.go b/blockchain_old/reactor.go deleted file mode 100644 index 92984f81..00000000 --- a/blockchain_old/reactor.go +++ /dev/null @@ -1,474 +0,0 @@ -package blockchain_old - -import ( - "errors" - "fmt" - "reflect" - "time" - - amino "github.com/tendermint/go-amino" - - "github.com/tendermint/tendermint/libs/log" - "github.com/tendermint/tendermint/p2p" - sm "github.com/tendermint/tendermint/state" - "github.com/tendermint/tendermint/types" -) - -const ( - // BlockchainChannel is a channel for blocks and status updates (`BlockStore` height) - BlockchainChannel = byte(0x40) - - trySyncIntervalMS = 10 - - // stop syncing when last block's time is - // within this much of the system time. - // stopSyncingDurationMinutes = 10 - - // ask for best height every 10s - statusUpdateIntervalSeconds = 10 - // check if we should switch to consensus reactor - switchToConsensusIntervalSeconds = 1 - - // NOTE: keep up to date with bcBlockResponseMessage - bcBlockResponseMessagePrefixSize = 4 - bcBlockResponseMessageFieldKeySize = 1 - maxMsgSize = types.MaxBlockSizeBytes + - bcBlockResponseMessagePrefixSize + - bcBlockResponseMessageFieldKeySize -) - -type consensusReactor interface { - // for when we switch from blockchain reactor and fast sync to - // the consensus machine - SwitchToConsensus(sm.State, int) -} - -type peerError struct { - err error - peerID p2p.ID -} - -func (e peerError) Error() string { - return fmt.Sprintf("error with peer %v: %s", e.peerID, e.err.Error()) -} - -// BlockchainReactor handles long-term catchup syncing. -type BlockchainReactor struct { - p2p.BaseReactor - - // immutable - initialState sm.State - - blockExec *sm.BlockExecutor - store *BlockStore - pool *BlockPool - fastSync bool - - requestsCh <-chan BlockRequest - errorsCh <-chan peerError -} - -// NewBlockchainReactor returns new reactor instance. -func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *BlockStore, - fastSync bool) *BlockchainReactor { - - if state.LastBlockHeight != store.Height() { - panic(fmt.Sprintf("state (%v) and store (%v) height mismatch", state.LastBlockHeight, - store.Height())) - } - - requestsCh := make(chan BlockRequest, maxTotalRequesters) - - const capacity = 1000 // must be bigger than peers count - errorsCh := make(chan peerError, capacity) // so we don't block in #Receive#pool.AddBlock - - pool := NewBlockPool( - store.Height()+1, - requestsCh, - errorsCh, - ) - - bcR := &BlockchainReactor{ - initialState: state, - blockExec: blockExec, - store: store, - pool: pool, - fastSync: fastSync, - requestsCh: requestsCh, - errorsCh: errorsCh, - } - bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR) - return bcR -} - -// SetLogger implements cmn.Service by setting the logger on reactor and pool. -func (bcR *BlockchainReactor) SetLogger(l log.Logger) { - bcR.BaseService.Logger = l - bcR.pool.Logger = l -} - -// OnStart implements cmn.Service. -func (bcR *BlockchainReactor) OnStart() error { - if bcR.fastSync { - err := bcR.pool.Start() - if err != nil { - return err - } - go bcR.poolRoutine() - } - return nil -} - -// OnStop implements cmn.Service. -func (bcR *BlockchainReactor) OnStop() { - bcR.pool.Stop() -} - -// GetChannels implements Reactor -func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor { - return []*p2p.ChannelDescriptor{ - { - ID: BlockchainChannel, - Priority: 10, - SendQueueCapacity: 1000, - RecvBufferCapacity: 50 * 4096, - RecvMessageCapacity: maxMsgSize, - }, - } -} - -// AddPeer implements Reactor by sending our state to peer. -func (bcR *BlockchainReactor) AddPeer(peer p2p.Peer) { - msgBytes := cdc.MustMarshalBinaryBare(&bcStatusResponseMessage{bcR.store.Height()}) - if !peer.Send(BlockchainChannel, msgBytes) { - // doing nothing, will try later in `poolRoutine` - } - // peer is added to the pool once we receive the first - // bcStatusResponseMessage from the peer and call pool.SetPeerHeight -} - -// RemovePeer implements Reactor by removing peer from the pool. -func (bcR *BlockchainReactor) RemovePeer(peer p2p.Peer, reason interface{}) { - bcR.pool.RemovePeer(peer.ID()) -} - -// respondToPeer loads a block and sends it to the requesting peer, -// if we have it. Otherwise, we'll respond saying we don't have it. -// According to the Tendermint spec, if all nodes are honest, -// no node should be requesting for a block that's non-existent. -func (bcR *BlockchainReactor) respondToPeer(msg *bcBlockRequestMessage, - src p2p.Peer) (queued bool) { - - block := bcR.store.LoadBlock(msg.Height) - if block != nil { - msgBytes := cdc.MustMarshalBinaryBare(&bcBlockResponseMessage{Block: block}) - return src.TrySend(BlockchainChannel, msgBytes) - } - - bcR.Logger.Info("Peer asking for a block we don't have", "src", src, "height", msg.Height) - - msgBytes := cdc.MustMarshalBinaryBare(&bcNoBlockResponseMessage{Height: msg.Height}) - return src.TrySend(BlockchainChannel, msgBytes) -} - -// Receive implements Reactor by handling 4 types of messages (look below). -func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { - msg, err := decodeMsg(msgBytes) - if err != nil { - bcR.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes) - bcR.Switch.StopPeerForError(src, err) - return - } - - if err = msg.ValidateBasic(); err != nil { - bcR.Logger.Error("Peer sent us invalid msg", "peer", src, "msg", msg, "err", err) - bcR.Switch.StopPeerForError(src, err) - return - } - - bcR.Logger.Debug("Receive", "src", src, "chID", chID, "msg", msg) - - switch msg := msg.(type) { - case *bcBlockRequestMessage: - if queued := bcR.respondToPeer(msg, src); !queued { - // Unfortunately not queued since the queue is full. - } - case *bcBlockResponseMessage: - bcR.pool.AddBlock(src.ID(), msg.Block, len(msgBytes)) - case *bcStatusRequestMessage: - // Send peer our state. - msgBytes := cdc.MustMarshalBinaryBare(&bcStatusResponseMessage{bcR.store.Height()}) - queued := src.TrySend(BlockchainChannel, msgBytes) - if !queued { - // sorry - } - case *bcStatusResponseMessage: - // Got a peer status. Unverified. - bcR.pool.SetPeerHeight(src.ID(), msg.Height) - default: - bcR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg))) - } -} - -// Handle messages from the poolReactor telling the reactor what to do. -// NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down! -func (bcR *BlockchainReactor) poolRoutine() { - - trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond) - statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second) - switchToConsensusTicker := time.NewTicker(switchToConsensusIntervalSeconds * time.Second) - - blocksSynced := 0 - - chainID := bcR.initialState.ChainID - state := bcR.initialState - - lastHundred := time.Now() - lastRate := 0.0 - - didProcessCh := make(chan struct{}, 1) - -FOR_LOOP: - for { - select { - case request := <-bcR.requestsCh: - peer := bcR.Switch.Peers().Get(request.PeerID) - if peer == nil { - continue FOR_LOOP // Peer has since been disconnected. - } - msgBytes := cdc.MustMarshalBinaryBare(&bcBlockRequestMessage{request.Height}) - queued := peer.TrySend(BlockchainChannel, msgBytes) - if !queued { - // We couldn't make the request, send-queue full. - // The pool handles timeouts, just let it go. - continue FOR_LOOP - } - - case err := <-bcR.errorsCh: - peer := bcR.Switch.Peers().Get(err.peerID) - if peer != nil { - bcR.Switch.StopPeerForError(peer, err) - } - - case <-statusUpdateTicker.C: - // ask for status updates - go bcR.BroadcastStatusRequest() // nolint: errcheck - - case <-switchToConsensusTicker.C: - height, numPending, lenRequesters := bcR.pool.GetStatus() - outbound, inbound, _ := bcR.Switch.NumPeers() - bcR.Logger.Debug("Consensus ticker", "numPending", numPending, "total", lenRequesters, - "outbound", outbound, "inbound", inbound) - if bcR.pool.IsCaughtUp() { - bcR.Logger.Info("Time to switch to consensus reactor!", "height", height) - bcR.pool.Stop() - - conR, ok := bcR.Switch.Reactor("CONSENSUS").(consensusReactor) - if ok { - conR.SwitchToConsensus(state, blocksSynced) - } else { - // should only happen during testing - } - - break FOR_LOOP - } - - case <-trySyncTicker.C: // chan time - select { - case didProcessCh <- struct{}{}: - default: - } - - case <-didProcessCh: - // NOTE: It is a subtle mistake to process more than a single block - // at a time (e.g. 10) here, because we only TrySend 1 request per - // loop. The ratio mismatch can result in starving of blocks, a - // sudden burst of requests and responses, and repeat. - // Consequently, it is better to split these routines rather than - // coupling them as it's written here. TODO uncouple from request - // routine. - - // See if there are any blocks to sync. - first, second := bcR.pool.PeekTwoBlocks() - //bcR.Logger.Info("TrySync peeked", "first", first, "second", second) - if first == nil || second == nil { - // We need both to sync the first block. - continue FOR_LOOP - } else { - // Try again quickly next loop. - didProcessCh <- struct{}{} - } - - firstParts := first.MakePartSet(types.BlockPartSizeBytes) - firstPartsHeader := firstParts.Header() - firstID := types.BlockID{Hash: first.Hash(), PartsHeader: firstPartsHeader} - // Finally, verify the first block using the second's commit - // NOTE: we can probably make this more efficient, but note that calling - // first.Hash() doesn't verify the tx contents, so MakePartSet() is - // currently necessary. - err := state.Validators.VerifyCommit( - chainID, firstID, first.Height, second.LastCommit) - if err != nil { - bcR.Logger.Error("Error in validation", "err", err) - peerID := bcR.pool.RedoRequest(first.Height) - peer := bcR.Switch.Peers().Get(peerID) - if peer != nil { - // NOTE: we've already removed the peer's request, but we - // still need to clean up the rest. - bcR.Switch.StopPeerForError(peer, fmt.Errorf("BlockchainReactor validation error: %v", err)) - } - peerID2 := bcR.pool.RedoRequest(second.Height) - peer2 := bcR.Switch.Peers().Get(peerID2) - if peer2 != nil && peer2 != peer { - // NOTE: we've already removed the peer's request, but we - // still need to clean up the rest. - bcR.Switch.StopPeerForError(peer2, fmt.Errorf("BlockchainReactor validation error: %v", err)) - } - continue FOR_LOOP - } else { - bcR.pool.PopRequest() - - // TODO: batch saves so we dont persist to disk every block - bcR.store.SaveBlock(first, firstParts, second.LastCommit) - - // TODO: same thing for app - but we would need a way to - // get the hash without persisting the state - var err error - state, err = bcR.blockExec.ApplyBlock(state, firstID, first) - if err != nil { - // TODO This is bad, are we zombie? - panic(fmt.Sprintf("Failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err)) - } - blocksSynced++ - - if blocksSynced%100 == 0 { - lastRate = 0.9*lastRate + 0.1*(100/time.Since(lastHundred).Seconds()) - bcR.Logger.Info("Fast Sync Rate", "height", bcR.pool.height, - "max_peer_height", bcR.pool.MaxPeerHeight(), "blocks/s", lastRate) - lastHundred = time.Now() - } - } - continue FOR_LOOP - - case <-bcR.Quit(): - break FOR_LOOP - } - } -} - -// BroadcastStatusRequest broadcasts `BlockStore` height. -func (bcR *BlockchainReactor) BroadcastStatusRequest() error { - msgBytes := cdc.MustMarshalBinaryBare(&bcStatusRequestMessage{bcR.store.Height()}) - bcR.Switch.Broadcast(BlockchainChannel, msgBytes) - return nil -} - -//----------------------------------------------------------------------------- -// Messages - -// BlockchainMessage is a generic message for this reactor. -type BlockchainMessage interface { - ValidateBasic() error -} - -func RegisterBlockchainMessages(cdc *amino.Codec) { - cdc.RegisterInterface((*BlockchainMessage)(nil), nil) - cdc.RegisterConcrete(&bcBlockRequestMessage{}, "tendermint/blockchain/BlockRequest", nil) - cdc.RegisterConcrete(&bcBlockResponseMessage{}, "tendermint/blockchain/BlockResponse", nil) - cdc.RegisterConcrete(&bcNoBlockResponseMessage{}, "tendermint/blockchain/NoBlockResponse", nil) - cdc.RegisterConcrete(&bcStatusResponseMessage{}, "tendermint/blockchain/StatusResponse", nil) - cdc.RegisterConcrete(&bcStatusRequestMessage{}, "tendermint/blockchain/StatusRequest", nil) -} - -func decodeMsg(bz []byte) (msg BlockchainMessage, err error) { - if len(bz) > maxMsgSize { - return msg, fmt.Errorf("Msg exceeds max size (%d > %d)", len(bz), maxMsgSize) - } - err = cdc.UnmarshalBinaryBare(bz, &msg) - return -} - -//------------------------------------- - -type bcBlockRequestMessage struct { - Height int64 -} - -// ValidateBasic performs basic validation. -func (m *bcBlockRequestMessage) ValidateBasic() error { - if m.Height < 0 { - return errors.New("Negative Height") - } - return nil -} - -func (m *bcBlockRequestMessage) String() string { - return fmt.Sprintf("[bcBlockRequestMessage %v]", m.Height) -} - -type bcNoBlockResponseMessage struct { - Height int64 -} - -// ValidateBasic performs basic validation. -func (m *bcNoBlockResponseMessage) ValidateBasic() error { - if m.Height < 0 { - return errors.New("Negative Height") - } - return nil -} - -func (brm *bcNoBlockResponseMessage) String() string { - return fmt.Sprintf("[bcNoBlockResponseMessage %d]", brm.Height) -} - -//------------------------------------- - -type bcBlockResponseMessage struct { - Block *types.Block -} - -// ValidateBasic performs basic validation. -func (m *bcBlockResponseMessage) ValidateBasic() error { - return m.Block.ValidateBasic() -} - -func (m *bcBlockResponseMessage) String() string { - return fmt.Sprintf("[bcBlockResponseMessage %v]", m.Block.Height) -} - -//------------------------------------- - -type bcStatusRequestMessage struct { - Height int64 -} - -// ValidateBasic performs basic validation. -func (m *bcStatusRequestMessage) ValidateBasic() error { - if m.Height < 0 { - return errors.New("Negative Height") - } - return nil -} - -func (m *bcStatusRequestMessage) String() string { - return fmt.Sprintf("[bcStatusRequestMessage %v]", m.Height) -} - -//------------------------------------- - -type bcStatusResponseMessage struct { - Height int64 -} - -// ValidateBasic performs basic validation. -func (m *bcStatusResponseMessage) ValidateBasic() error { - if m.Height < 0 { - return errors.New("Negative Height") - } - return nil -} - -func (m *bcStatusResponseMessage) String() string { - return fmt.Sprintf("[bcStatusResponseMessage %v]", m.Height) -} diff --git a/blockchain_old/reactor_test.go b/blockchain_old/reactor_test.go deleted file mode 100644 index 2a889a5a..00000000 --- a/blockchain_old/reactor_test.go +++ /dev/null @@ -1,416 +0,0 @@ -package blockchain_old - -import ( - "fmt" - "os" - "sort" - "testing" - "time" - - "github.com/stretchr/testify/assert" - - abci "github.com/tendermint/tendermint/abci/types" - cfg "github.com/tendermint/tendermint/config" - cmn "github.com/tendermint/tendermint/libs/common" - dbm "github.com/tendermint/tendermint/libs/db" - "github.com/tendermint/tendermint/libs/log" - "github.com/tendermint/tendermint/p2p" - "github.com/tendermint/tendermint/proxy" - sm "github.com/tendermint/tendermint/state" - "github.com/tendermint/tendermint/types" - tmtime "github.com/tendermint/tendermint/types/time" -) - -var config *cfg.Config - -func randGenesisDoc(numValidators int, randPower bool, minPower int64) (*types.GenesisDoc, []types.PrivValidator) { - validators := make([]types.GenesisValidator, numValidators) - privValidators := make([]types.PrivValidator, numValidators) - for i := 0; i < numValidators; i++ { - val, privVal := types.RandValidator(randPower, minPower) - validators[i] = types.GenesisValidator{ - PubKey: val.PubKey, - Power: val.VotingPower, - } - privValidators[i] = privVal - } - sort.Sort(types.PrivValidatorsByAddress(privValidators)) - - return &types.GenesisDoc{ - GenesisTime: tmtime.Now(), - ChainID: config.ChainID(), - Validators: validators, - }, privValidators -} - -func makeVote(header *types.Header, blockID types.BlockID, valset *types.ValidatorSet, privVal types.PrivValidator) *types.Vote { - addr := privVal.GetPubKey().Address() - idx, _ := valset.GetByAddress(addr) - vote := &types.Vote{ - ValidatorAddress: addr, - ValidatorIndex: idx, - Height: header.Height, - Round: 1, - Timestamp: tmtime.Now(), - Type: types.PrecommitType, - BlockID: blockID, - } - - privVal.SignVote(header.ChainID, vote) - - return vote -} - -type BlockchainReactorPair struct { - reactor *BlockchainReactor - app proxy.AppConns -} - -func newBlockchainReactor(logger log.Logger, genDoc *types.GenesisDoc, privVals []types.PrivValidator, maxBlockHeight int64) BlockchainReactorPair { - if len(privVals) != 1 { - panic("only support one validator") - } - - app := &testApp{} - cc := proxy.NewLocalClientCreator(app) - proxyApp := proxy.NewAppConns(cc) - err := proxyApp.Start() - if err != nil { - panic(cmn.ErrorWrap(err, "error start app")) - } - - blockDB := dbm.NewMemDB() - stateDB := dbm.NewMemDB() - blockStore := NewBlockStore(blockDB) - - state, err := sm.LoadStateFromDBOrGenesisDoc(stateDB, genDoc) - if err != nil { - panic(cmn.ErrorWrap(err, "error constructing state from genesis file")) - } - - // Make the BlockchainReactor itself. - // NOTE we have to create and commit the blocks first because - // pool.height is determined from the store. - fastSync := true - blockExec := sm.NewBlockExecutor(dbm.NewMemDB(), log.TestingLogger(), proxyApp.Consensus(), - sm.MockMempool{}, sm.MockEvidencePool{}) - - // let's add some blocks in - for blockHeight := int64(1); blockHeight <= maxBlockHeight; blockHeight++ { - lastCommit := types.NewCommit(types.BlockID{}, nil) - if blockHeight > 1 { - lastBlockMeta := blockStore.LoadBlockMeta(blockHeight - 1) - lastBlock := blockStore.LoadBlock(blockHeight - 1) - - vote := makeVote(&lastBlock.Header, lastBlockMeta.BlockID, state.Validators, privVals[0]).CommitSig() - lastCommit = types.NewCommit(lastBlockMeta.BlockID, []*types.CommitSig{vote}) - } - - thisBlock := makeBlock(blockHeight, state, lastCommit) - - thisParts := thisBlock.MakePartSet(types.BlockPartSizeBytes) - blockID := types.BlockID{Hash: thisBlock.Hash(), PartsHeader: thisParts.Header()} - - state, err = blockExec.ApplyBlock(state, blockID, thisBlock) - if err != nil { - panic(cmn.ErrorWrap(err, "error apply block")) - } - - blockStore.SaveBlock(thisBlock, thisParts, lastCommit) - } - - bcReactor := NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync) - bcReactor.SetLogger(logger.With("module", "blockchain")) - - return BlockchainReactorPair{bcReactor, proxyApp} -} - -func TestFastSyncNoBlockResponse(t *testing.T) { - config = cfg.ResetTestRoot("blockchain_reactor_test") - defer os.RemoveAll(config.RootDir) - genDoc, privVals := randGenesisDoc(1, false, 30) - - maxBlockHeight := int64(500) - - reactorPairs := make([]BlockchainReactorPair, 2) - - reactorPairs[0] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight) - reactorPairs[1] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0) - - p2p.MakeConnectedSwitches(config.P2P, 2, func(i int, s *p2p.Switch) *p2p.Switch { - s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor) - return s - - }, p2p.Connect2Switches) - - defer func() { - for _, r := range reactorPairs { - r.reactor.Stop() - r.app.Stop() - } - }() - - tests := []struct { - height int64 - existent bool - }{ - {maxBlockHeight + 2, false}, - {10, true}, - {1, true}, - {maxBlockHeight + 100, false}, - } - - for { - if reactorPairs[1].reactor.pool.IsCaughtUp() { - break - } - - time.Sleep(10 * time.Millisecond) - } - - assert.Equal(t, maxBlockHeight, reactorPairs[0].reactor.store.Height()) - - for _, tt := range tests { - block := reactorPairs[1].reactor.store.LoadBlock(tt.height) - if tt.existent { - assert.True(t, block != nil) - } else { - assert.True(t, block == nil) - } - } -} - -// NOTE: This is too hard to test without -// an easy way to add test peer to switch -// or without significant refactoring of the module. -// Alternatively we could actually dial a TCP conn but -// that seems extreme. -func TestFastSyncBadBlockStopsPeer(t *testing.T) { - config = cfg.ResetTestRoot("blockchain_reactor_test") - defer os.RemoveAll(config.RootDir) - genDoc, privVals := randGenesisDoc(1, false, 30) - - maxBlockHeight := int64(148) - - otherChain := newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight) - defer func() { - otherChain.reactor.Stop() - otherChain.app.Stop() - }() - - reactorPairs := make([]BlockchainReactorPair, 4) - - reactorPairs[0] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight) - reactorPairs[1] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0) - reactorPairs[2] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0) - reactorPairs[3] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0) - - switches := p2p.MakeConnectedSwitches(config.P2P, 4, func(i int, s *p2p.Switch) *p2p.Switch { - s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor) - return s - - }, p2p.Connect2Switches) - - defer func() { - for _, r := range reactorPairs { - r.reactor.Stop() - r.app.Stop() - } - }() - - for { - if reactorPairs[3].reactor.pool.IsCaughtUp() { - break - } - - time.Sleep(1 * time.Second) - } - - //at this time, reactors[0-3] is the newest - assert.Equal(t, 3, reactorPairs[1].reactor.Switch.Peers().Size()) - - //mark reactorPairs[3] is an invalid peer - reactorPairs[3].reactor.store = otherChain.reactor.store - - lastReactorPair := newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0) - reactorPairs = append(reactorPairs, lastReactorPair) - - switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch) *p2p.Switch { - s.AddReactor("BLOCKCHAIN", reactorPairs[len(reactorPairs)-1].reactor) - return s - - }, p2p.Connect2Switches)...) - - for i := 0; i < len(reactorPairs)-1; i++ { - p2p.Connect2Switches(switches, i, len(reactorPairs)-1) - } - - for { - if lastReactorPair.reactor.pool.IsCaughtUp() || lastReactorPair.reactor.Switch.Peers().Size() == 0 { - break - } - - time.Sleep(1 * time.Second) - } - - assert.True(t, lastReactorPair.reactor.Switch.Peers().Size() < len(reactorPairs)-1) - assert.Equal(t, lastReactorPair.reactor.pool.maxPeerHeight, lastReactorPair.reactor.pool.height) - -} - -func setupReactors( - numReactors int, maxBlockHeight int64, - genDoc *types.GenesisDoc, privVals []types.PrivValidator) ([]BlockchainReactorPair, []*p2p.Switch) { - - defer os.RemoveAll(config.RootDir) - - reactorPairs := make([]BlockchainReactorPair, numReactors) - - var logger = make([]log.Logger, numReactors) - - for i := 0; i < numReactors; i++ { - logger[i] = log.TestingLogger() - height := int64(0) - if i == 0 { - height = maxBlockHeight - } - reactorPairs[i] = newBlockchainReactor(logger[i], genDoc, privVals, height) - } - - switches := p2p.MakeConnectedSwitches(config.P2P, numReactors, func(i int, s *p2p.Switch) *p2p.Switch { - s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor) - return s - - }, p2p.Connect2Switches) - - for i := 0; i < numReactors; i++ { - addr := reactorPairs[i].reactor.Switch.NodeInfo().ID() - moduleName := fmt.Sprintf("blockchain-%v", addr) - reactorPairs[i].reactor.SetLogger(logger[i].With("module", moduleName[:19])) - } - - return reactorPairs, switches -} - -func TestFastSyncMultiNode(t *testing.T) { - - numNodes := 8 - maxHeight := int64(1000) - //numNodes := 20 - //maxHeight := int64(10000) - - config = cfg.ResetTestRoot("blockchain_reactor_test") - genDoc, privVals := randGenesisDoc(1, false, 30) - - start := time.Now() - - reactorPairs, switches := setupReactors(numNodes, maxHeight, genDoc, privVals) - - defer func() { - for _, r := range reactorPairs { - _ = r.reactor.Stop() - _ = r.app.Stop() - } - }() - -outerFor: - for { - time.Sleep(1 * time.Second) - i := 0 - for i < numNodes { - if !reactorPairs[i].reactor.pool.IsCaughtUp() { - break - } - i++ - } - if i == numNodes { - fmt.Println("SETUP FAST SYNC Duration", time.Since(start)) - break outerFor - } else { - } - } - - //at this time, reactors[0-3] are the newest - assert.Equal(t, numNodes-1, reactorPairs[0].reactor.Switch.Peers().Size()) - - lastLogger := log.TestingLogger() - lastReactorPair := newBlockchainReactor(lastLogger, genDoc, privVals, 0) - reactorPairs = append(reactorPairs, lastReactorPair) - - switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch) *p2p.Switch { - s.AddReactor("BLOCKCHAIN", reactorPairs[len(reactorPairs)-1].reactor) - return s - - }, p2p.Connect2Switches)...) - - addr := lastReactorPair.reactor.Switch.NodeInfo().ID() - moduleName := fmt.Sprintf("blockchain-%v", addr) - lastReactorPair.reactor.SetLogger(lastLogger.With("module", moduleName[:19])) - - start = time.Now() - - for i := 0; i < len(reactorPairs)-1; i++ { - p2p.Connect2Switches(switches, i, len(reactorPairs)-1) - } - - for { - time.Sleep(1 * time.Second) - if lastReactorPair.reactor.pool.IsCaughtUp() { - fmt.Println("FAST SYNC Duration", time.Since(start)) - break - } - } - - assert.True(t, lastReactorPair.reactor.Switch.Peers().Size() < len(reactorPairs)) - assert.Equal(t, lastReactorPair.reactor.pool.maxPeerHeight, lastReactorPair.reactor.pool.height) - -} - -//---------------------------------------------- -// utility funcs - -func makeTxs(height int64) (txs []types.Tx) { - for i := 0; i < 10; i++ { - txs = append(txs, types.Tx([]byte{byte(height), byte(i)})) - } - return txs -} - -func makeBlock(height int64, state sm.State, lastCommit *types.Commit) *types.Block { - block, _ := state.MakeBlock(height, makeTxs(height), lastCommit, nil, state.Validators.GetProposer().Address) - return block -} - -type testApp struct { - abci.BaseApplication -} - -var _ abci.Application = (*testApp)(nil) - -func (app *testApp) Info(req abci.RequestInfo) (resInfo abci.ResponseInfo) { - return abci.ResponseInfo{} -} - -func (app *testApp) BeginBlock(req abci.RequestBeginBlock) abci.ResponseBeginBlock { - return abci.ResponseBeginBlock{} -} - -func (app *testApp) EndBlock(req abci.RequestEndBlock) abci.ResponseEndBlock { - return abci.ResponseEndBlock{} -} - -func (app *testApp) DeliverTx(tx []byte) abci.ResponseDeliverTx { - return abci.ResponseDeliverTx{Tags: []cmn.KVPair{}} -} - -func (app *testApp) CheckTx(tx []byte) abci.ResponseCheckTx { - return abci.ResponseCheckTx{} -} - -func (app *testApp) Commit() abci.ResponseCommit { - return abci.ResponseCommit{} -} - -func (app *testApp) Query(reqQuery abci.RequestQuery) (resQuery abci.ResponseQuery) { - return -} diff --git a/blockchain_old/store.go b/blockchain_old/store.go deleted file mode 100644 index 712172e6..00000000 --- a/blockchain_old/store.go +++ /dev/null @@ -1,247 +0,0 @@ -package blockchain_old - -import ( - "fmt" - "sync" - - cmn "github.com/tendermint/tendermint/libs/common" - dbm "github.com/tendermint/tendermint/libs/db" - - "github.com/tendermint/tendermint/types" -) - -/* -BlockStore is a simple low level store for blocks. - -There are three types of information stored: - - BlockMeta: Meta information about each block - - Block part: Parts of each block, aggregated w/ PartSet - - Commit: The commit part of each block, for gossiping precommit votes - -Currently the precommit signatures are duplicated in the Block parts as -well as the Commit. In the future this may change, perhaps by moving -the Commit data outside the Block. (TODO) - -// NOTE: BlockStore methods will panic if they encounter errors -// deserializing loaded data, indicating probable corruption on disk. -*/ -type BlockStore struct { - db dbm.DB - - mtx sync.RWMutex - height int64 -} - -// NewBlockStore returns a new BlockStore with the given DB, -// initialized to the last height that was committed to the DB. -func NewBlockStore(db dbm.DB) *BlockStore { - bsjson := LoadBlockStoreStateJSON(db) - return &BlockStore{ - height: bsjson.Height, - db: db, - } -} - -// Height returns the last known contiguous block height. -func (bs *BlockStore) Height() int64 { - bs.mtx.RLock() - defer bs.mtx.RUnlock() - return bs.height -} - -// LoadBlock returns the block with the given height. -// If no block is found for that height, it returns nil. -func (bs *BlockStore) LoadBlock(height int64) *types.Block { - var blockMeta = bs.LoadBlockMeta(height) - if blockMeta == nil { - return nil - } - - var block = new(types.Block) - buf := []byte{} - for i := 0; i < blockMeta.BlockID.PartsHeader.Total; i++ { - part := bs.LoadBlockPart(height, i) - buf = append(buf, part.Bytes...) - } - err := cdc.UnmarshalBinaryLengthPrefixed(buf, block) - if err != nil { - // NOTE: The existence of meta should imply the existence of the - // block. So, make sure meta is only saved after blocks are saved. - panic(cmn.ErrorWrap(err, "Error reading block")) - } - return block -} - -// LoadBlockPart returns the Part at the given index -// from the block at the given height. -// If no part is found for the given height and index, it returns nil. -func (bs *BlockStore) LoadBlockPart(height int64, index int) *types.Part { - var part = new(types.Part) - bz := bs.db.Get(calcBlockPartKey(height, index)) - if len(bz) == 0 { - return nil - } - err := cdc.UnmarshalBinaryBare(bz, part) - if err != nil { - panic(cmn.ErrorWrap(err, "Error reading block part")) - } - return part -} - -// LoadBlockMeta returns the BlockMeta for the given height. -// If no block is found for the given height, it returns nil. -func (bs *BlockStore) LoadBlockMeta(height int64) *types.BlockMeta { - var blockMeta = new(types.BlockMeta) - bz := bs.db.Get(calcBlockMetaKey(height)) - if len(bz) == 0 { - return nil - } - err := cdc.UnmarshalBinaryBare(bz, blockMeta) - if err != nil { - panic(cmn.ErrorWrap(err, "Error reading block meta")) - } - return blockMeta -} - -// LoadBlockCommit returns the Commit for the given height. -// This commit consists of the +2/3 and other Precommit-votes for block at `height`, -// and it comes from the block.LastCommit for `height+1`. -// If no commit is found for the given height, it returns nil. -func (bs *BlockStore) LoadBlockCommit(height int64) *types.Commit { - var commit = new(types.Commit) - bz := bs.db.Get(calcBlockCommitKey(height)) - if len(bz) == 0 { - return nil - } - err := cdc.UnmarshalBinaryBare(bz, commit) - if err != nil { - panic(cmn.ErrorWrap(err, "Error reading block commit")) - } - return commit -} - -// LoadSeenCommit returns the locally seen Commit for the given height. -// This is useful when we've seen a commit, but there has not yet been -// a new block at `height + 1` that includes this commit in its block.LastCommit. -func (bs *BlockStore) LoadSeenCommit(height int64) *types.Commit { - var commit = new(types.Commit) - bz := bs.db.Get(calcSeenCommitKey(height)) - if len(bz) == 0 { - return nil - } - err := cdc.UnmarshalBinaryBare(bz, commit) - if err != nil { - panic(cmn.ErrorWrap(err, "Error reading block seen commit")) - } - return commit -} - -// SaveBlock persists the given block, blockParts, and seenCommit to the underlying db. -// blockParts: Must be parts of the block -// seenCommit: The +2/3 precommits that were seen which committed at height. -// If all the nodes restart after committing a block, -// we need this to reload the precommits to catch-up nodes to the -// most recent height. Otherwise they'd stall at H-1. -func (bs *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) { - if block == nil { - cmn.PanicSanity("BlockStore can only save a non-nil block") - } - height := block.Height - if g, w := height, bs.Height()+1; g != w { - cmn.PanicSanity(fmt.Sprintf("BlockStore can only save contiguous blocks. Wanted %v, got %v", w, g)) - } - if !blockParts.IsComplete() { - cmn.PanicSanity(fmt.Sprintf("BlockStore can only save complete block part sets")) - } - - // Save block meta - blockMeta := types.NewBlockMeta(block, blockParts) - metaBytes := cdc.MustMarshalBinaryBare(blockMeta) - bs.db.Set(calcBlockMetaKey(height), metaBytes) - - // Save block parts - for i := 0; i < blockParts.Total(); i++ { - part := blockParts.GetPart(i) - bs.saveBlockPart(height, i, part) - } - - // Save block commit (duplicate and separate from the Block) - blockCommitBytes := cdc.MustMarshalBinaryBare(block.LastCommit) - bs.db.Set(calcBlockCommitKey(height-1), blockCommitBytes) - - // Save seen commit (seen +2/3 precommits for block) - // NOTE: we can delete this at a later height - seenCommitBytes := cdc.MustMarshalBinaryBare(seenCommit) - bs.db.Set(calcSeenCommitKey(height), seenCommitBytes) - - // Save new BlockStoreStateJSON descriptor - BlockStoreStateJSON{Height: height}.Save(bs.db) - - // Done! - bs.mtx.Lock() - bs.height = height - bs.mtx.Unlock() - - // Flush - bs.db.SetSync(nil, nil) -} - -func (bs *BlockStore) saveBlockPart(height int64, index int, part *types.Part) { - if height != bs.Height()+1 { - cmn.PanicSanity(fmt.Sprintf("BlockStore can only save contiguous blocks. Wanted %v, got %v", bs.Height()+1, height)) - } - partBytes := cdc.MustMarshalBinaryBare(part) - bs.db.Set(calcBlockPartKey(height, index), partBytes) -} - -//----------------------------------------------------------------------------- - -func calcBlockMetaKey(height int64) []byte { - return []byte(fmt.Sprintf("H:%v", height)) -} - -func calcBlockPartKey(height int64, partIndex int) []byte { - return []byte(fmt.Sprintf("P:%v:%v", height, partIndex)) -} - -func calcBlockCommitKey(height int64) []byte { - return []byte(fmt.Sprintf("C:%v", height)) -} - -func calcSeenCommitKey(height int64) []byte { - return []byte(fmt.Sprintf("SC:%v", height)) -} - -//----------------------------------------------------------------------------- - -var blockStoreKey = []byte("blockStore") - -type BlockStoreStateJSON struct { - Height int64 `json:"height"` -} - -// Save persists the blockStore state to the database as JSON. -func (bsj BlockStoreStateJSON) Save(db dbm.DB) { - bytes, err := cdc.MarshalJSON(bsj) - if err != nil { - cmn.PanicSanity(fmt.Sprintf("Could not marshal state bytes: %v", err)) - } - db.SetSync(blockStoreKey, bytes) -} - -// LoadBlockStoreStateJSON returns the BlockStoreStateJSON as loaded from disk. -// If no BlockStoreStateJSON was previously persisted, it returns the zero value. -func LoadBlockStoreStateJSON(db dbm.DB) BlockStoreStateJSON { - bytes := db.Get(blockStoreKey) - if len(bytes) == 0 { - return BlockStoreStateJSON{ - Height: 0, - } - } - bsj := BlockStoreStateJSON{} - err := cdc.UnmarshalJSON(bytes, &bsj) - if err != nil { - panic(fmt.Sprintf("Could not unmarshal bytes: %X", bytes)) - } - return bsj -} diff --git a/blockchain_old/store_test.go b/blockchain_old/store_test.go deleted file mode 100644 index 24740a84..00000000 --- a/blockchain_old/store_test.go +++ /dev/null @@ -1,421 +0,0 @@ -package blockchain_old - -import ( - "bytes" - "fmt" - "os" - "runtime/debug" - "strings" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - cfg "github.com/tendermint/tendermint/config" - cmn "github.com/tendermint/tendermint/libs/common" - "github.com/tendermint/tendermint/libs/db" - dbm "github.com/tendermint/tendermint/libs/db" - "github.com/tendermint/tendermint/libs/log" - sm "github.com/tendermint/tendermint/state" - - "github.com/tendermint/tendermint/types" - tmtime "github.com/tendermint/tendermint/types/time" -) - -// A cleanupFunc cleans up any config / test files created for a particular -// test. -type cleanupFunc func() - -// make a Commit with a single vote containing just the height and a timestamp -func makeTestCommit(height int64, timestamp time.Time) *types.Commit { - commitSigs := []*types.CommitSig{{Height: height, Timestamp: timestamp}} - return types.NewCommit(types.BlockID{}, commitSigs) -} - -func makeStateAndBlockStore(logger log.Logger) (sm.State, *BlockStore, cleanupFunc) { - config := cfg.ResetTestRoot("blockchain_reactor_test") - // blockDB := dbm.NewDebugDB("blockDB", dbm.NewMemDB()) - // stateDB := dbm.NewDebugDB("stateDB", dbm.NewMemDB()) - blockDB := dbm.NewMemDB() - stateDB := dbm.NewMemDB() - state, err := sm.LoadStateFromDBOrGenesisFile(stateDB, config.GenesisFile()) - if err != nil { - panic(cmn.ErrorWrap(err, "error constructing state from genesis file")) - } - return state, NewBlockStore(blockDB), func() { os.RemoveAll(config.RootDir) } -} - -func TestLoadBlockStoreStateJSON(t *testing.T) { - db := db.NewMemDB() - - bsj := &BlockStoreStateJSON{Height: 1000} - bsj.Save(db) - - retrBSJ := LoadBlockStoreStateJSON(db) - - assert.Equal(t, *bsj, retrBSJ, "expected the retrieved DBs to match") -} - -func TestNewBlockStore(t *testing.T) { - db := db.NewMemDB() - db.Set(blockStoreKey, []byte(`{"height": "10000"}`)) - bs := NewBlockStore(db) - require.Equal(t, int64(10000), bs.Height(), "failed to properly parse blockstore") - - panicCausers := []struct { - data []byte - wantErr string - }{ - {[]byte("artful-doger"), "not unmarshal bytes"}, - {[]byte(" "), "unmarshal bytes"}, - } - - for i, tt := range panicCausers { - // Expecting a panic here on trying to parse an invalid blockStore - _, _, panicErr := doFn(func() (interface{}, error) { - db.Set(blockStoreKey, tt.data) - _ = NewBlockStore(db) - return nil, nil - }) - require.NotNil(t, panicErr, "#%d panicCauser: %q expected a panic", i, tt.data) - assert.Contains(t, fmt.Sprintf("%#v", panicErr), tt.wantErr, "#%d data: %q", i, tt.data) - } - - db.Set(blockStoreKey, nil) - bs = NewBlockStore(db) - assert.Equal(t, bs.Height(), int64(0), "expecting nil bytes to be unmarshaled alright") -} - -func freshBlockStore() (*BlockStore, db.DB) { - db := db.NewMemDB() - return NewBlockStore(db), db -} - -var ( - state sm.State - block *types.Block - partSet *types.PartSet - part1 *types.Part - part2 *types.Part - seenCommit1 *types.Commit -) - -func TestMain(m *testing.M) { - var cleanup cleanupFunc - state, _, cleanup = makeStateAndBlockStore(log.NewTMLogger(new(bytes.Buffer))) - block = makeBlock(1, state, new(types.Commit)) - partSet = block.MakePartSet(2) - part1 = partSet.GetPart(0) - part2 = partSet.GetPart(1) - seenCommit1 = makeTestCommit(10, tmtime.Now()) - code := m.Run() - cleanup() - os.Exit(code) -} - -// TODO: This test should be simplified ... - -func TestBlockStoreSaveLoadBlock(t *testing.T) { - state, bs, cleanup := makeStateAndBlockStore(log.NewTMLogger(new(bytes.Buffer))) - defer cleanup() - require.Equal(t, bs.Height(), int64(0), "initially the height should be zero") - - // check there are no blocks at various heights - noBlockHeights := []int64{0, -1, 100, 1000, 2} - for i, height := range noBlockHeights { - if g := bs.LoadBlock(height); g != nil { - t.Errorf("#%d: height(%d) got a block; want nil", i, height) - } - } - - // save a block - block := makeBlock(bs.Height()+1, state, new(types.Commit)) - validPartSet := block.MakePartSet(2) - seenCommit := makeTestCommit(10, tmtime.Now()) - bs.SaveBlock(block, partSet, seenCommit) - require.Equal(t, bs.Height(), block.Header.Height, "expecting the new height to be changed") - - incompletePartSet := types.NewPartSetFromHeader(types.PartSetHeader{Total: 2}) - uncontiguousPartSet := types.NewPartSetFromHeader(types.PartSetHeader{Total: 0}) - uncontiguousPartSet.AddPart(part2) - - header1 := types.Header{ - Height: 1, - NumTxs: 100, - ChainID: "block_test", - Time: tmtime.Now(), - } - header2 := header1 - header2.Height = 4 - - // End of setup, test data - - commitAtH10 := makeTestCommit(10, tmtime.Now()) - tuples := []struct { - block *types.Block - parts *types.PartSet - seenCommit *types.Commit - wantErr bool - wantPanic string - - corruptBlockInDB bool - corruptCommitInDB bool - corruptSeenCommitInDB bool - eraseCommitInDB bool - eraseSeenCommitInDB bool - }{ - { - block: newBlock(header1, commitAtH10), - parts: validPartSet, - seenCommit: seenCommit1, - }, - - { - block: nil, - wantPanic: "only save a non-nil block", - }, - - { - block: newBlock(header2, commitAtH10), - parts: uncontiguousPartSet, - wantPanic: "only save contiguous blocks", // and incomplete and uncontiguous parts - }, - - { - block: newBlock(header1, commitAtH10), - parts: incompletePartSet, - wantPanic: "only save complete block", // incomplete parts - }, - - { - block: newBlock(header1, commitAtH10), - parts: validPartSet, - seenCommit: seenCommit1, - corruptCommitInDB: true, // Corrupt the DB's commit entry - wantPanic: "unmarshal to types.Commit failed", - }, - - { - block: newBlock(header1, commitAtH10), - parts: validPartSet, - seenCommit: seenCommit1, - wantPanic: "unmarshal to types.BlockMeta failed", - corruptBlockInDB: true, // Corrupt the DB's block entry - }, - - { - block: newBlock(header1, commitAtH10), - parts: validPartSet, - seenCommit: seenCommit1, - - // Expecting no error and we want a nil back - eraseSeenCommitInDB: true, - }, - - { - block: newBlock(header1, commitAtH10), - parts: validPartSet, - seenCommit: seenCommit1, - - corruptSeenCommitInDB: true, - wantPanic: "unmarshal to types.Commit failed", - }, - - { - block: newBlock(header1, commitAtH10), - parts: validPartSet, - seenCommit: seenCommit1, - - // Expecting no error and we want a nil back - eraseCommitInDB: true, - }, - } - - type quad struct { - block *types.Block - commit *types.Commit - meta *types.BlockMeta - - seenCommit *types.Commit - } - - for i, tuple := range tuples { - bs, db := freshBlockStore() - // SaveBlock - res, err, panicErr := doFn(func() (interface{}, error) { - bs.SaveBlock(tuple.block, tuple.parts, tuple.seenCommit) - if tuple.block == nil { - return nil, nil - } - - if tuple.corruptBlockInDB { - db.Set(calcBlockMetaKey(tuple.block.Height), []byte("block-bogus")) - } - bBlock := bs.LoadBlock(tuple.block.Height) - bBlockMeta := bs.LoadBlockMeta(tuple.block.Height) - - if tuple.eraseSeenCommitInDB { - db.Delete(calcSeenCommitKey(tuple.block.Height)) - } - if tuple.corruptSeenCommitInDB { - db.Set(calcSeenCommitKey(tuple.block.Height), []byte("bogus-seen-commit")) - } - bSeenCommit := bs.LoadSeenCommit(tuple.block.Height) - - commitHeight := tuple.block.Height - 1 - if tuple.eraseCommitInDB { - db.Delete(calcBlockCommitKey(commitHeight)) - } - if tuple.corruptCommitInDB { - db.Set(calcBlockCommitKey(commitHeight), []byte("foo-bogus")) - } - bCommit := bs.LoadBlockCommit(commitHeight) - return &quad{block: bBlock, seenCommit: bSeenCommit, commit: bCommit, - meta: bBlockMeta}, nil - }) - - if subStr := tuple.wantPanic; subStr != "" { - if panicErr == nil { - t.Errorf("#%d: want a non-nil panic", i) - } else if got := fmt.Sprintf("%#v", panicErr); !strings.Contains(got, subStr) { - t.Errorf("#%d:\n\tgotErr: %q\nwant substring: %q", i, got, subStr) - } - continue - } - - if tuple.wantErr { - if err == nil { - t.Errorf("#%d: got nil error", i) - } - continue - } - - assert.Nil(t, panicErr, "#%d: unexpected panic", i) - assert.Nil(t, err, "#%d: expecting a non-nil error", i) - qua, ok := res.(*quad) - if !ok || qua == nil { - t.Errorf("#%d: got nil quad back; gotType=%T", i, res) - continue - } - if tuple.eraseSeenCommitInDB { - assert.Nil(t, qua.seenCommit, - "erased the seenCommit in the DB hence we should get back a nil seenCommit") - } - if tuple.eraseCommitInDB { - assert.Nil(t, qua.commit, - "erased the commit in the DB hence we should get back a nil commit") - } - } -} - -func TestLoadBlockPart(t *testing.T) { - bs, db := freshBlockStore() - height, index := int64(10), 1 - loadPart := func() (interface{}, error) { - part := bs.LoadBlockPart(height, index) - return part, nil - } - - // Initially no contents. - // 1. Requesting for a non-existent block shouldn't fail - res, _, panicErr := doFn(loadPart) - require.Nil(t, panicErr, "a non-existent block part shouldn't cause a panic") - require.Nil(t, res, "a non-existent block part should return nil") - - // 2. Next save a corrupted block then try to load it - db.Set(calcBlockPartKey(height, index), []byte("Tendermint")) - res, _, panicErr = doFn(loadPart) - require.NotNil(t, panicErr, "expecting a non-nil panic") - require.Contains(t, panicErr.Error(), "unmarshal to types.Part failed") - - // 3. A good block serialized and saved to the DB should be retrievable - db.Set(calcBlockPartKey(height, index), cdc.MustMarshalBinaryBare(part1)) - gotPart, _, panicErr := doFn(loadPart) - require.Nil(t, panicErr, "an existent and proper block should not panic") - require.Nil(t, res, "a properly saved block should return a proper block") - require.Equal(t, gotPart.(*types.Part), part1, - "expecting successful retrieval of previously saved block") -} - -func TestLoadBlockMeta(t *testing.T) { - bs, db := freshBlockStore() - height := int64(10) - loadMeta := func() (interface{}, error) { - meta := bs.LoadBlockMeta(height) - return meta, nil - } - - // Initially no contents. - // 1. Requesting for a non-existent blockMeta shouldn't fail - res, _, panicErr := doFn(loadMeta) - require.Nil(t, panicErr, "a non-existent blockMeta shouldn't cause a panic") - require.Nil(t, res, "a non-existent blockMeta should return nil") - - // 2. Next save a corrupted blockMeta then try to load it - db.Set(calcBlockMetaKey(height), []byte("Tendermint-Meta")) - res, _, panicErr = doFn(loadMeta) - require.NotNil(t, panicErr, "expecting a non-nil panic") - require.Contains(t, panicErr.Error(), "unmarshal to types.BlockMeta") - - // 3. A good blockMeta serialized and saved to the DB should be retrievable - meta := &types.BlockMeta{} - db.Set(calcBlockMetaKey(height), cdc.MustMarshalBinaryBare(meta)) - gotMeta, _, panicErr := doFn(loadMeta) - require.Nil(t, panicErr, "an existent and proper block should not panic") - require.Nil(t, res, "a properly saved blockMeta should return a proper blocMeta ") - require.Equal(t, cdc.MustMarshalBinaryBare(meta), cdc.MustMarshalBinaryBare(gotMeta), - "expecting successful retrieval of previously saved blockMeta") -} - -func TestBlockFetchAtHeight(t *testing.T) { - state, bs, cleanup := makeStateAndBlockStore(log.NewTMLogger(new(bytes.Buffer))) - defer cleanup() - require.Equal(t, bs.Height(), int64(0), "initially the height should be zero") - block := makeBlock(bs.Height()+1, state, new(types.Commit)) - - partSet := block.MakePartSet(2) - seenCommit := makeTestCommit(10, tmtime.Now()) - bs.SaveBlock(block, partSet, seenCommit) - require.Equal(t, bs.Height(), block.Header.Height, "expecting the new height to be changed") - - blockAtHeight := bs.LoadBlock(bs.Height()) - bz1 := cdc.MustMarshalBinaryBare(block) - bz2 := cdc.MustMarshalBinaryBare(blockAtHeight) - require.Equal(t, bz1, bz2) - require.Equal(t, block.Hash(), blockAtHeight.Hash(), - "expecting a successful load of the last saved block") - - blockAtHeightPlus1 := bs.LoadBlock(bs.Height() + 1) - require.Nil(t, blockAtHeightPlus1, "expecting an unsuccessful load of Height()+1") - blockAtHeightPlus2 := bs.LoadBlock(bs.Height() + 2) - require.Nil(t, blockAtHeightPlus2, "expecting an unsuccessful load of Height()+2") -} - -func doFn(fn func() (interface{}, error)) (res interface{}, err error, panicErr error) { - defer func() { - if r := recover(); r != nil { - switch e := r.(type) { - case error: - panicErr = e - case string: - panicErr = fmt.Errorf("%s", e) - default: - if st, ok := r.(fmt.Stringer); ok { - panicErr = fmt.Errorf("%s", st) - } else { - panicErr = fmt.Errorf("%s", debug.Stack()) - } - } - } - }() - - res, err = fn() - return res, err, panicErr -} - -func newBlock(hdr types.Header, lastCommit *types.Commit) *types.Block { - return &types.Block{ - Header: hdr, - LastCommit: lastCommit, - } -} diff --git a/blockchain_old/wire.go b/blockchain_old/wire.go deleted file mode 100644 index fd46b34f..00000000 --- a/blockchain_old/wire.go +++ /dev/null @@ -1,13 +0,0 @@ -package blockchain_old - -import ( - amino "github.com/tendermint/go-amino" - "github.com/tendermint/tendermint/types" -) - -var cdc = amino.NewCodec() - -func init() { - RegisterBlockchainMessages(cdc) - types.RegisterBlockAmino(cdc) -}