fix lint, remove old blockchain reactor and duplicates in fsm tests

Anca Zamfir 2019-06-02 21:41:24 +02:00
parent ac004e49a0
commit 81d121b7d2
8 changed files with 132 additions and 2798 deletions


@@ -2,12 +2,9 @@ package blockchain
import (
"fmt"
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/assert"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/log"
@@ -15,46 +12,29 @@ import (
"github.com/tendermint/tendermint/types"
)
// reactor for FSM testing
type testReactor struct {
logger log.Logger
fsm *bReactorFSM
}
func sendEventToFSM(fsm *bReactorFSM, ev bReactorEvent, data bReactorEventData) error {
return fsm.handle(&bcReactorMessage{event: ev, data: data})
}
var (
testMutex sync.Mutex
numStatusRequests int
numBlockRequests int32
)
type lastBlockRequestT struct {
peerID p2p.ID
height int64
}
var lastBlockRequest lastBlockRequestT
type lastPeerErrorT struct {
peerID p2p.ID
err error
}
var lastPeerError lastPeerErrorT
// reactor for FSM testing
type testReactor struct {
logger log.Logger
fsm *bReactorFSM
numStatusRequests int
numBlockRequests int32
lastBlockRequest lastBlockRequestT
lastPeerError lastPeerErrorT
stateTimerStarts map[string]int
}
var stateTimerStarts map[string]int
func resetTestValues() {
stateTimerStarts = make(map[string]int)
numStatusRequests = 0
numBlockRequests = 0
lastBlockRequest.peerID = ""
lastBlockRequest.height = 0
lastPeerError.peerID = ""
lastPeerError.err = nil
}
func sendEventToFSM(fsm *bReactorFSM, ev bReactorEvent, data bReactorEventData) error {
return fsm.handle(&bcReactorMessage{event: ev, data: data})
}
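With the counters and last-request trackers moved onto testReactor, every test constructs its own reactor and reads its own fields, so the old package-level globals, testMutex, and resetTestValues() are no longer needed and tests cannot interfere with each other. A minimal sketch of the isolation this buys (the test body is illustrative, not part of the diff):

// Illustrative only: each reactor carries its own counters.
func TestCountersAreIsolated(t *testing.T) {
	r1 := newTestReactor(1)
	r2 := newTestReactor(1)
	r1.sendStatusRequest() // bumps only r1's counter
	assert.Equal(t, 1, r1.numStatusRequests)
	assert.Equal(t, 0, r2.numStatusRequests)
}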
type fsmStepTestValues struct {
@@ -148,6 +128,7 @@ func makeStepMakeRequestsEvErrored(current, expected string,
expectedState: expected,
errWanted: err,
peersNotInPool: peersRemoved,
blockReqIncreased: true,
}
}
@@ -201,7 +182,7 @@ func makeStepPeerRemoveEv(current, expected string, peerID p2p.ID, err error, pe
// --------------------------------------------
func newTestReactor(height int64) *testReactor {
testBcR := &testReactor{logger: log.TestingLogger()}
testBcR := &testReactor{logger: log.TestingLogger(), stateTimerStarts: make(map[string]int)}
testBcR.fsm = NewFSM(height, testBcR)
testBcR.fsm.setLogger(testBcR.logger)
return testBcR
@@ -220,27 +201,79 @@ func fixBlockResponseEvStep(step *fsmStepTestValues, testBcR *testReactor) {
}
}
func shouldApplyProcessedBlockEvStep(step *fsmStepTestValues, testBcR *testReactor) bool {
if step.event == processedBlockEv {
_, err := testBcR.fsm.pool.GetBlockAndPeerAtHeight(testBcR.fsm.pool.height)
if err == errMissingBlock {
return false
}
_, err = testBcR.fsm.pool.GetBlockAndPeerAtHeight(testBcR.fsm.pool.height + 1)
if err == errMissingBlock {
return false
}
}
return true
}
func TestFSMBasic(t *testing.T) {
tests := []struct {
type testFields struct {
name string
startingHeight int64
maxRequestsPerPeer int32
maxPendingRequests int32
steps []fsmStepTestValues
}{
}
func executeFSMTests(t *testing.T, tests []testFields, matchRespToReq bool) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Create test reactor
testBcR := newTestReactor(tt.startingHeight)
if tt.maxRequestsPerPeer != 0 {
maxRequestsPerPeer = tt.maxRequestsPerPeer
}
for _, step := range tt.steps {
assert.Equal(t, step.currentState, testBcR.fsm.state.name)
var heightBefore int64
if step.event == processedBlockEv && step.data.err == errBlockVerificationFailure {
heightBefore = testBcR.fsm.pool.height
}
oldNumStatusRequests := testBcR.numStatusRequests
oldNumBlockRequests := testBcR.numBlockRequests
if matchRespToReq {
fixBlockResponseEvStep(&step, testBcR)
}
fsmErr := sendEventToFSM(testBcR.fsm, step.event, step.data)
assert.Equal(t, step.errWanted, fsmErr)
if step.shouldSendStatusReq {
assert.Equal(t, oldNumStatusRequests+1, testBcR.numStatusRequests)
} else {
assert.Equal(t, oldNumStatusRequests, testBcR.numStatusRequests)
}
if step.blockReqIncreased {
assert.True(t, oldNumBlockRequests < testBcR.numBlockRequests)
} else {
assert.Equal(t, oldNumBlockRequests, testBcR.numBlockRequests)
}
for _, height := range step.blocksAdded {
_, err := testBcR.fsm.pool.GetBlockAndPeerAtHeight(height)
assert.Nil(t, err)
}
if step.event == processedBlockEv && step.data.err == errBlockVerificationFailure {
heightAfter := testBcR.fsm.pool.height
assert.Equal(t, heightBefore, heightAfter)
firstAfter, err1 := testBcR.fsm.pool.GetBlockAndPeerAtHeight(testBcR.fsm.pool.height)
secondAfter, err2 := testBcR.fsm.pool.GetBlockAndPeerAtHeight(testBcR.fsm.pool.height + 1)
assert.NotNil(t, err1)
assert.NotNil(t, err2)
assert.Nil(t, firstAfter)
assert.Nil(t, secondAfter)
}
assert.Equal(t, step.expectedState, testBcR.fsm.state.name)
if step.expectedState == "finished" {
assert.True(t, testBcR.fsm.isCaughtUp())
}
}
})
}
}
func TestFSMBasic(t *testing.T) {
tests := []testFields{
{
name: "one block, one peer - TS2",
startingHeight: 1,
@@ -292,59 +325,11 @@ func TestFSMBasic(t *testing.T) {
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Create test reactor
testBcR := newTestReactor(tt.startingHeight)
resetTestValues()
if tt.maxRequestsPerPeer != 0 {
maxRequestsPerPeer = tt.maxRequestsPerPeer
}
for _, step := range tt.steps {
assert.Equal(t, step.currentState, testBcR.fsm.state.name)
oldNumStatusRequests := numStatusRequests
oldNumBlockRequests := numBlockRequests
fixBlockResponseEvStep(&step, testBcR)
fsmErr := sendEventToFSM(testBcR.fsm, step.event, step.data)
assert.Equal(t, step.errWanted, fsmErr)
if step.shouldSendStatusReq {
assert.Equal(t, oldNumStatusRequests+1, numStatusRequests)
} else {
assert.Equal(t, oldNumStatusRequests, numStatusRequests)
}
if step.blockReqIncreased {
assert.True(t, oldNumBlockRequests < numBlockRequests)
} else {
assert.Equal(t, oldNumBlockRequests, numBlockRequests)
}
for _, height := range step.blocksAdded {
_, err := testBcR.fsm.pool.GetBlockAndPeerAtHeight(height)
assert.Nil(t, err)
}
assert.Equal(t, step.expectedState, testBcR.fsm.state.name)
if step.expectedState == "finished" {
assert.True(t, testBcR.fsm.isCaughtUp())
}
}
})
}
executeFSMTests(t, tests, true)
}
func TestFSMBlockVerificationFailure(t *testing.T) {
tests := []struct {
name string
startingHeight int64
maxRequestsPerPeer int32
steps []fsmStepTestValues
}{
tests := []testFields{
{
name: "block verification failure - TS2 variant",
startingHeight: 1,
@@ -388,73 +373,11 @@ func TestFSMBlockVerificationFailure(t *testing.T) {
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Create test reactor
testBcR := newTestReactor(tt.startingHeight)
resetTestValues()
if tt.maxRequestsPerPeer != 0 {
maxRequestsPerPeer = tt.maxRequestsPerPeer
}
for _, step := range tt.steps {
assert.Equal(t, step.currentState, testBcR.fsm.state.name)
oldNumStatusRequests := numStatusRequests
oldNumBlockRequests := numBlockRequests
var heightBefore int64
if step.event == processedBlockEv && step.data.err == errBlockVerificationFailure {
heightBefore = testBcR.fsm.pool.height
}
fsmErr := sendEventToFSM(testBcR.fsm, step.event, step.data)
assert.Equal(t, step.errWanted, fsmErr)
if step.shouldSendStatusReq {
assert.Equal(t, oldNumStatusRequests+1, numStatusRequests)
} else {
assert.Equal(t, oldNumStatusRequests, numStatusRequests)
}
if step.blockReqIncreased {
assert.True(t, oldNumBlockRequests < numBlockRequests)
} else {
assert.Equal(t, oldNumBlockRequests, numBlockRequests)
}
for _, height := range step.blocksAdded {
_, err := testBcR.fsm.pool.GetBlockAndPeerAtHeight(height)
assert.Nil(t, err)
}
if step.event == processedBlockEv && step.data.err == errBlockVerificationFailure {
heightAfter := testBcR.fsm.pool.height
assert.Equal(t, heightBefore, heightAfter)
firstAfter, err1 := testBcR.fsm.pool.GetBlockAndPeerAtHeight(testBcR.fsm.pool.height)
secondAfter, err2 := testBcR.fsm.pool.GetBlockAndPeerAtHeight(testBcR.fsm.pool.height + 1)
assert.NotNil(t, err1)
assert.NotNil(t, err2)
assert.Nil(t, firstAfter)
assert.Nil(t, secondAfter)
}
assert.Equal(t, step.expectedState, testBcR.fsm.state.name)
if step.expectedState == "finished" {
assert.True(t, testBcR.fsm.isCaughtUp())
}
}
})
}
executeFSMTests(t, tests, false)
}
func TestFSMBadBlockFromPeer(t *testing.T) {
tests := []struct {
name string
startingHeight int64
maxRequestsPerPeer int32
steps []fsmStepTestValues
}{
tests := []testFields{
{
name: "block we haven't asked for",
startingHeight: 1,
@@ -529,54 +452,11 @@ func TestFSMBadBlockFromPeer(t *testing.T) {
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Create test reactor
testBcR := newTestReactor(tt.startingHeight)
resetTestValues()
if tt.maxRequestsPerPeer != 0 {
maxRequestsPerPeer = tt.maxRequestsPerPeer
}
for _, step := range tt.steps {
assert.Equal(t, step.currentState, testBcR.fsm.state.name)
oldNumStatusRequests := numStatusRequests
oldNumBlockRequests := numBlockRequests
fsmErr := sendEventToFSM(testBcR.fsm, step.event, step.data)
assert.Equal(t, step.errWanted, fsmErr)
if step.shouldSendStatusReq {
assert.Equal(t, oldNumStatusRequests+1, numStatusRequests)
} else {
assert.Equal(t, oldNumStatusRequests, numStatusRequests)
}
if step.blockReqIncreased {
assert.True(t, oldNumBlockRequests < numBlockRequests)
} else {
assert.Equal(t, oldNumBlockRequests, numBlockRequests)
}
for _, height := range step.blocksAdded {
_, err := testBcR.fsm.pool.GetBlockAndPeerAtHeight(height)
assert.Nil(t, err)
}
assert.Equal(t, step.expectedState, testBcR.fsm.state.name)
}
})
}
executeFSMTests(t, tests, false)
}
func TestFSMBlockAtCurrentHeightDoesNotArriveInTime(t *testing.T) {
tests := []struct {
name string
startingHeight int64
maxRequestsPerPeer int32
steps []fsmStepTestValues
}{
tests := []testFields{
{
name: "block at current height undelivered - TS5",
startingHeight: 1,
@@ -640,54 +520,12 @@ func TestFSMBlockAtCurrentHeightDoesNotArriveInTime(t *testing.T) {
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Create test reactor
testBcR := newTestReactor(tt.startingHeight)
resetTestValues()
if tt.maxRequestsPerPeer != 0 {
maxRequestsPerPeer = tt.maxRequestsPerPeer
}
for _, step := range tt.steps {
assert.Equal(t, step.currentState, testBcR.fsm.state.name)
oldNumStatusRequests := numStatusRequests
oldNumBlockRequests := numBlockRequests
fsmErr := sendEventToFSM(testBcR.fsm, step.event, step.data)
assert.Equal(t, step.errWanted, fsmErr)
if step.shouldSendStatusReq {
assert.Equal(t, oldNumStatusRequests+1, numStatusRequests)
} else {
assert.Equal(t, oldNumStatusRequests, numStatusRequests)
}
if step.blockReqIncreased {
assert.True(t, oldNumBlockRequests < numBlockRequests)
} else {
assert.Equal(t, oldNumBlockRequests, numBlockRequests)
}
for _, height := range step.blocksAdded {
_, err := testBcR.fsm.pool.GetBlockAndPeerAtHeight(height)
assert.Nil(t, err)
}
assert.Equal(t, step.expectedState, testBcR.fsm.state.name)
}
})
}
executeFSMTests(t, tests, true)
}
func TestFSMPeerRelatedEvents(t *testing.T) {
tests := []struct {
name string
startingHeight int64
maxRequestsPerPeer int32
steps []fsmStepTestValues
}{
tests := []testFields{
{
name: "peer remove event with no blocks",
startingHeight: 1,
@@ -734,7 +572,7 @@ func TestFSMPeerRelatedEvents(t *testing.T) {
makeStepBlockRespEv("waitForBlock", "waitForBlock",
"P1", 100, []int64{}),
makeStepBlockRespEv("waitForBlock", "waitForBlock",
"P1", 101, []int64{1}),
"P1", 101, []int64{100}),
// processed block at heights 100 and 101
makeStepProcessedBlockEv("waitForBlock", "waitForBlock", nil),
@@ -761,7 +599,7 @@ func TestFSMPeerRelatedEvents(t *testing.T) {
makeStepBlockRespEv("waitForBlock", "waitForBlock",
"P1", 100, []int64{}),
makeStepBlockRespEv("waitForBlock", "waitForBlock",
"P1", 101, []int64{1}),
"P1", 101, []int64{100}),
// processed block at heights 100 and 101
makeStepProcessedBlockEv("waitForBlock", "waitForBlock", nil),
@@ -820,46 +658,11 @@ func TestFSMPeerRelatedEvents(t *testing.T) {
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Create test reactor
testBcR := newTestReactor(tt.startingHeight)
resetTestValues()
if tt.maxRequestsPerPeer != 0 {
maxRequestsPerPeer = tt.maxRequestsPerPeer
}
for _, step := range tt.steps {
require.Equal(t, step.currentState, testBcR.fsm.state.name)
oldNumStatusRequests := numStatusRequests
fsmErr := sendEventToFSM(testBcR.fsm, step.event, step.data)
assert.Equal(t, step.errWanted, fsmErr)
if step.shouldSendStatusReq {
assert.Equal(t, oldNumStatusRequests+1, numStatusRequests)
} else {
assert.Equal(t, oldNumStatusRequests, numStatusRequests)
}
for _, peerID := range step.peersNotInPool {
_, ok := testBcR.fsm.pool.peers[peerID]
assert.False(t, ok)
}
assert.Equal(t, step.expectedState, testBcR.fsm.state.name)
}
})
}
executeFSMTests(t, tests, true)
}
func TestFSMStopFSM(t *testing.T) {
tests := []struct {
name string
startingHeight int64
steps []fsmStepTestValues
}{
tests := []testFields{
{
name: "stopFSMEv in unknown",
steps: []fsmStepTestValues{
@@ -890,28 +693,11 @@ func TestFSMStopFSM(t *testing.T) {
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Create test reactor
testBcR := newTestReactor(tt.startingHeight)
resetTestValues()
for _, step := range tt.steps {
assert.Equal(t, step.currentState, testBcR.fsm.state.name)
fsmErr := sendEventToFSM(testBcR.fsm, step.event, step.data)
assert.Equal(t, step.errWanted, fsmErr)
assert.Equal(t, step.expectedState, testBcR.fsm.state.name)
}
})
}
executeFSMTests(t, tests, false)
}
func TestFSMUnknownElements(t *testing.T) {
tests := []struct {
name string
startingHeight int64
steps []fsmStepTestValues
}{
tests := []testFields{
{
name: "unknown event for state unknown",
steps: []fsmStepTestValues{
@@ -936,29 +722,11 @@ func TestFSMUnknownElements(t *testing.T) {
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Create test reactor
testBcR := newTestReactor(tt.startingHeight)
resetTestValues()
for _, step := range tt.steps {
assert.Equal(t, step.currentState, testBcR.fsm.state.name)
fsmErr := sendEventToFSM(testBcR.fsm, step.event, step.data)
assert.Equal(t, step.errWanted, fsmErr)
assert.Equal(t, step.expectedState, testBcR.fsm.state.name)
}
})
}
executeFSMTests(t, tests, false)
}
func TestFSMPeerStateTimeoutEvent(t *testing.T) {
tests := []struct {
name string
startingHeight int64
maxRequestsPerPeer int32
steps []fsmStepTestValues
}{
tests := []testFields{
{
name: "timeout event for state waitForPeer while in state waitForPeer - TS1",
startingHeight: 1,
@@ -1013,28 +781,7 @@ func TestFSMPeerStateTimeoutEvent(t *testing.T) {
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Create test reactor
testBcR := newTestReactor(tt.startingHeight)
resetTestValues()
for _, step := range tt.steps {
assert.Equal(t, step.currentState, testBcR.fsm.state.name)
fsmErr := sendEventToFSM(testBcR.fsm, step.event, step.data)
assert.Equal(t, step.errWanted, fsmErr)
assert.Equal(t, step.expectedState, testBcR.fsm.state.name)
}
})
}
}
type testFields struct {
name string
startingHeight int64
maxRequestsPerPeer int32
maxPendingRequests int32
steps []fsmStepTestValues
executeFSMTests(t, tests, false)
}
func makeCorrectTransitionSequence(startingHeight int64, numBlocks int64, numPeers int, randomPeerHeights bool,
@@ -1155,6 +902,20 @@ func makeCorrectTransitionSequenceWithRandomParameters() testFields {
return makeCorrectTransitionSequence(startingHeight, numBlocks, numPeers, true, maxRequestsPerPeer, maxPendingRequests)
}
func shouldApplyProcessedBlockEvStep(step *fsmStepTestValues, testBcR *testReactor) bool {
if step.event == processedBlockEv {
_, err := testBcR.fsm.pool.GetBlockAndPeerAtHeight(testBcR.fsm.pool.height)
if err == errMissingBlock {
return false
}
_, err = testBcR.fsm.pool.GetBlockAndPeerAtHeight(testBcR.fsm.pool.height + 1)
if err == errMissingBlock {
return false
}
}
return true
}
func TestFSMCorrectTransitionSequences(t *testing.T) {
tests := []testFields{
@@ -1166,7 +927,6 @@ func TestFSMCorrectTransitionSequences(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
// Create test reactor
testBcR := newTestReactor(tt.startingHeight)
resetTestValues()
if tt.maxRequestsPerPeer != 0 {
maxRequestsPerPeer = tt.maxRequestsPerPeer
@@ -1175,7 +935,7 @@ func TestFSMCorrectTransitionSequences(t *testing.T) {
for _, step := range tt.steps {
assert.Equal(t, step.currentState, testBcR.fsm.state.name)
oldNumStatusRequests := numStatusRequests
oldNumStatusRequests := testBcR.numStatusRequests
fixBlockResponseEvStep(&step, testBcR)
if !shouldApplyProcessedBlockEvStep(&step, testBcR) {
continue
@@ -1185,9 +945,9 @@ func TestFSMCorrectTransitionSequences(t *testing.T) {
assert.Equal(t, step.errWanted, fsmErr)
if step.shouldSendStatusReq {
assert.Equal(t, oldNumStatusRequests+1, numStatusRequests)
assert.Equal(t, oldNumStatusRequests+1, testBcR.numStatusRequests)
} else {
assert.Equal(t, oldNumStatusRequests, numStatusRequests)
assert.Equal(t, oldNumStatusRequests, testBcR.numStatusRequests)
}
assert.Equal(t, step.expectedState, testBcR.fsm.state.name)
@@ -1203,23 +963,21 @@ func TestFSMCorrectTransitionSequences(t *testing.T) {
// ----------------------------------------
// implements the bcRNotifier
func (testR *testReactor) sendPeerError(err error, peerID p2p.ID) {
testMutex.Lock()
defer testMutex.Unlock()
testR.logger.Info("Reactor received sendPeerError call from FSM", "peer", peerID, "err", err)
lastPeerError.peerID = peerID
lastPeerError.err = err
testR.lastPeerError.peerID = peerID
testR.lastPeerError.err = err
}
func (testR *testReactor) sendStatusRequest() {
testR.logger.Info("Reactor received sendStatusRequest call from FSM")
numStatusRequests++
testR.numStatusRequests++
}
func (testR *testReactor) sendBlockRequest(peerID p2p.ID, height int64) error {
testR.logger.Info("Reactor received sendBlockRequest call from FSM", "peer", peerID, "height", height)
numBlockRequests++
lastBlockRequest.peerID = peerID
lastBlockRequest.height = height
testR.numBlockRequests++
testR.lastBlockRequest.peerID = peerID
testR.lastBlockRequest.height = height
if height == 9999999 {
// simulate switch does not have peer
return errNilPeerForBlockRequest
@@ -1229,15 +987,14 @@ func (testR *testReactor) sendBlockRequest(peerID p2p.ID, height int64) error {
func (testR *testReactor) resetStateTimer(name string, timer **time.Timer, timeout time.Duration) {
testR.logger.Info("Reactor received resetStateTimer call from FSM", "state", name, "timeout", timeout)
if _, ok := stateTimerStarts[name]; !ok {
stateTimerStarts[name] = 1
if _, ok := testR.stateTimerStarts[name]; !ok {
testR.stateTimerStarts[name] = 1
} else {
stateTimerStarts[name]++
testR.stateTimerStarts[name]++
}
}
func (testR *testReactor) switchToConsensus() {
}
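Taken together, these methods are the notifier surface the FSM calls back into; reconstructed from the signatures above, the interface looks roughly like this (the actual bcRNotifier definition lives in the non-test code):

// Reconstructed for reference from the test methods above; illustrative only.
type bcRNotifier interface {
	sendStatusRequest()
	sendBlockRequest(peerID p2p.ID, height int64) error
	sendPeerError(err error, peerID p2p.ID)
	resetStateTimer(name string, timer **time.Timer, timeout time.Duration)
	switchToConsensus()
}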
// ----------------------------------------


@@ -1,630 +0,0 @@
package blockchain_old
import (
"errors"
"fmt"
"math"
"sync"
"sync/atomic"
"time"
cmn "github.com/tendermint/tendermint/libs/common"
flow "github.com/tendermint/tendermint/libs/flowrate"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/types"
)
/*
eg, L = latency = 0.1s
P = num peers = 10
FN = num full nodes
BS = 1kB block size
CB = 1 Mbit/s = 128 kB/s
CB/P = 12.8 kB
B/S = CB/P/BS = 12.8 blocks/s
12.8 * 0.1 = 1.28 blocks on conn
*/
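A quick sanity check of the arithmetic above, with the comment's example values (L, P, BS, CB) as named constants; illustrative only:

// Illustrative: recomputes the back-of-the-envelope numbers above.
func blocksOnConn() float64 {
	const (
		latencySec  = 0.1   // L
		numPeers    = 10.0  // P
		blockSizeKB = 1.0   // BS
		bandwidthKB = 128.0 // CB = 1 Mbit/s = 128 kB/s
	)
	perPeerKBs := bandwidthKB / numPeers     // CB/P = 12.8 kB/s
	blocksPerSec := perPeerKBs / blockSizeKB // B/S = 12.8 blocks/s
	return blocksPerSec * latencySec         // 12.8 * 0.1 = 1.28 blocks on conn
}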
const (
requestIntervalMS = 2
maxTotalRequesters = 600
maxPendingRequests = maxTotalRequesters
maxPendingRequestsPerPeer = 20
// Minimum recv rate to ensure we're receiving blocks from a peer fast
// enough. If a peer is not sending us data at least at that rate, we
// consider them to have timed out and we disconnect.
//
// Assuming a DSL connection (not a good choice) 128 Kbps (upload) ~ 15 KB/s,
// sending data across atlantic ~ 7.5 KB/s.
minRecvRate = 7680
// Maximum difference between current and new block's height.
maxDiffBetweenCurrentAndReceivedBlockHeight = 100
)
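For the record, minRecvRate = 7680 B/s works out to exactly the trans-Atlantic figure quoted in the comment (illustrative check):

// Illustrative: minRecvRate expressed in KB/s.
const minRecvRateKBs = 7680.0 / 1024.0 // = 7.5 KB/s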
var peerTimeout = 15 * time.Second // not const so we can override with tests
/*
Peers self-report their heights when they join the block pool.
Starting from our latest pool.height, we request blocks
in sequence from peers that reported higher heights than ours.
Every so often we ask peers what height they're on so we can keep going.
Requests are continuously made for blocks of higher heights until
the limit is reached. If most of the requests have no available peers, and we
are not at peer limits, we can probably switch to the consensus reactor.
*/
type BlockPool struct {
cmn.BaseService
startTime time.Time
mtx sync.Mutex
// block requests
requesters map[int64]*bpRequester
height int64 // the lowest key in requesters.
// peers
peers map[p2p.ID]*bpPeer
maxPeerHeight int64 // the biggest reported height
// atomic
numPending int32 // number of requests pending assignment or block response
requestsCh chan<- BlockRequest
errorsCh chan<- peerError
}
// NewBlockPool returns a new BlockPool with the height equal to start. Block
// requests and errors will be sent to requestsCh and errorsCh accordingly.
func NewBlockPool(start int64, requestsCh chan<- BlockRequest, errorsCh chan<- peerError) *BlockPool {
bp := &BlockPool{
peers: make(map[p2p.ID]*bpPeer),
requesters: make(map[int64]*bpRequester),
height: start,
numPending: 0,
requestsCh: requestsCh,
errorsCh: errorsCh,
}
bp.BaseService = *cmn.NewBaseService(nil, "BlockPool", bp)
return bp
}
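A minimal wiring of this constructor, mirroring how the reactor and the pool tests use it (sketch only; the start height and channel capacities here are placeholders):

// Sketch: create, start, and feed a BlockPool.
func startPool(startHeight int64) (*BlockPool, chan BlockRequest, chan peerError) {
	requestsCh := make(chan BlockRequest, 1000)
	errorsCh := make(chan peerError, 1000) // must be bigger than peer count
	pool := NewBlockPool(startHeight, requestsCh, errorsCh)
	if err := pool.Start(); err != nil {
		panic(err) // sketch only; real callers return the error
	}
	// Callers then SetPeerHeight as peers report in, serve requestsCh,
	// feed responses via pool.AddBlock, and watch errorsCh.
	return pool, requestsCh, errorsCh
}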
// OnStart implements cmn.Service by spawning requesters routine and recording
// pool's start time.
func (pool *BlockPool) OnStart() error {
go pool.makeRequestersRoutine()
pool.startTime = time.Now()
return nil
}
// spawns requesters as needed
func (pool *BlockPool) makeRequestersRoutine() {
for {
if !pool.IsRunning() {
break
}
_, numPending, lenRequesters := pool.GetStatus()
if numPending >= maxPendingRequests {
// sleep for a bit.
time.Sleep(requestIntervalMS * time.Millisecond)
// check for timed out peers
pool.removeTimedoutPeers()
} else if lenRequesters >= maxTotalRequesters {
// sleep for a bit.
time.Sleep(requestIntervalMS * time.Millisecond)
// check for timed out peers
pool.removeTimedoutPeers()
} else {
// request for more blocks.
pool.makeNextRequester()
}
}
}
func (pool *BlockPool) removeTimedoutPeers() {
pool.mtx.Lock()
defer pool.mtx.Unlock()
for _, peer := range pool.peers {
if !peer.didTimeout && peer.numPending > 0 {
curRate := peer.recvMonitor.Status().CurRate
// curRate can be 0 on start
if curRate != 0 && curRate < minRecvRate {
err := errors.New("peer is not sending us data fast enough")
pool.sendError(err, peer.id)
pool.Logger.Error("SendTimeout", "peer", peer.id,
"reason", err,
"curRate", fmt.Sprintf("%d KB/s", curRate/1024),
"minRate", fmt.Sprintf("%d KB/s", minRecvRate/1024))
peer.didTimeout = true
}
}
if peer.didTimeout {
pool.removePeer(peer.id)
}
}
}
// GetStatus returns pool's height, numPending requests and the number of
// requesters.
func (pool *BlockPool) GetStatus() (height int64, numPending int32, lenRequesters int) {
pool.mtx.Lock()
defer pool.mtx.Unlock()
return pool.height, atomic.LoadInt32(&pool.numPending), len(pool.requesters)
}
// IsCaughtUp returns true if this node is caught up, false otherwise.
// TODO: relax conditions, prevent abuse.
func (pool *BlockPool) IsCaughtUp() bool {
pool.mtx.Lock()
defer pool.mtx.Unlock()
// Need at least 1 peer to be considered caught up.
if len(pool.peers) == 0 {
pool.Logger.Debug("Blockpool has no peers")
return false
}
// Some conditions to determine if we're caught up.
// Ensures we've either received a block or waited some amount of time,
// and that we're synced to the highest known height.
// Note we use maxPeerHeight - 1 because to sync block H requires block H+1
// to verify the LastCommit.
receivedBlockOrTimedOut := pool.height > 0 || time.Since(pool.startTime) > 5*time.Second
ourChainIsLongestAmongPeers := pool.maxPeerHeight == 0 || pool.height >= (pool.maxPeerHeight-1)
isCaughtUp := receivedBlockOrTimedOut && ourChainIsLongestAmongPeers
return isCaughtUp
}
// We need to see the second block's Commit to validate the first block.
// So we peek two blocks at a time.
// The caller will verify the commit.
func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block) {
pool.mtx.Lock()
defer pool.mtx.Unlock()
if r := pool.requesters[pool.height]; r != nil {
first = r.getBlock()
}
if r := pool.requesters[pool.height+1]; r != nil {
second = r.getBlock()
}
return
}
// Pop the first block at pool.height
// It must have been validated by 'second'.Commit from PeekTwoBlocks().
func (pool *BlockPool) PopRequest() {
pool.mtx.Lock()
defer pool.mtx.Unlock()
if r := pool.requesters[pool.height]; r != nil {
/* The block can disappear at any time, due to removePeer().
if r := pool.requesters[pool.height]; r == nil || r.block == nil {
PanicSanity("PopRequest() requires a valid block")
}
*/
r.Stop()
delete(pool.requesters, pool.height)
pool.height++
} else {
panic(fmt.Sprintf("Expected requester to pop, got nothing at height %v", pool.height))
}
}
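The consumption pattern these two methods support, as used by the reactor's poolRoutine and the pool tests (sketch; commit verification elided):

// Sketch: peek two blocks, verify the first with the second's commit, pop one.
func drainPool(pool *BlockPool) {
	for pool.IsRunning() {
		first, second := pool.PeekTwoBlocks()
		if first == nil || second == nil {
			time.Sleep(10 * time.Millisecond) // need both before verifying first
			continue
		}
		// verify first using second.LastCommit, then advance the pool:
		pool.PopRequest()
	}
}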
// Invalidates the block at pool.height,
// removes the peer, and redoes the request from other peers.
// Returns the ID of the removed peer.
func (pool *BlockPool) RedoRequest(height int64) p2p.ID {
pool.mtx.Lock()
defer pool.mtx.Unlock()
request := pool.requesters[height]
peerID := request.getPeerID()
if peerID != p2p.ID("") {
// RemovePeer will redo all requesters associated with this peer.
pool.removePeer(peerID)
}
return peerID
}
// TODO: ensure that blocks come in order for each peer.
func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int) {
pool.mtx.Lock()
defer pool.mtx.Unlock()
requester := pool.requesters[block.Height]
if requester == nil {
pool.Logger.Info("peer sent us a block we didn't expect", "peer", peerID, "curHeight", pool.height, "blockHeight", block.Height)
diff := pool.height - block.Height
if diff < 0 {
diff *= -1
}
if diff > maxDiffBetweenCurrentAndReceivedBlockHeight {
pool.sendError(errors.New("peer sent us a block we didn't expect with a height too far ahead/behind"), peerID)
}
return
}
if requester.setBlock(block, peerID) {
atomic.AddInt32(&pool.numPending, -1)
peer := pool.peers[peerID]
if peer != nil {
peer.decrPending(blockSize)
}
} else {
pool.Logger.Info("invalid peer", "peer", peerID, "blockHeight", block.Height)
pool.sendError(errors.New("invalid peer"), peerID)
}
}
// MaxPeerHeight returns the highest reported height.
func (pool *BlockPool) MaxPeerHeight() int64 {
pool.mtx.Lock()
defer pool.mtx.Unlock()
return pool.maxPeerHeight
}
// SetPeerHeight sets the peer's alleged blockchain height.
func (pool *BlockPool) SetPeerHeight(peerID p2p.ID, height int64) {
pool.mtx.Lock()
defer pool.mtx.Unlock()
peer := pool.peers[peerID]
if peer != nil {
peer.height = height
} else {
peer = newBPPeer(pool, peerID, height)
peer.setLogger(pool.Logger.With("peer", peerID))
pool.peers[peerID] = peer
}
if height > pool.maxPeerHeight {
pool.maxPeerHeight = height
}
}
// RemovePeer removes the peer with peerID from the pool. If there's no peer
// with peerID, function is a no-op.
func (pool *BlockPool) RemovePeer(peerID p2p.ID) {
pool.mtx.Lock()
defer pool.mtx.Unlock()
pool.removePeer(peerID)
}
func (pool *BlockPool) removePeer(peerID p2p.ID) {
for _, requester := range pool.requesters {
if requester.getPeerID() == peerID {
requester.redo(peerID)
}
}
peer, ok := pool.peers[peerID]
if ok {
if peer.timeout != nil {
peer.timeout.Stop()
}
delete(pool.peers, peerID)
// Find a new peer with the biggest height and update maxPeerHeight if the
// peer's height was the biggest.
if peer.height == pool.maxPeerHeight {
pool.updateMaxPeerHeight()
}
}
}
// If no peers are left, maxPeerHeight is set to 0.
func (pool *BlockPool) updateMaxPeerHeight() {
var max int64
for _, peer := range pool.peers {
if peer.height > max {
max = peer.height
}
}
pool.maxPeerHeight = max
}
// Pick an available peer with at least the given minHeight.
// If no peers are available, returns nil.
func (pool *BlockPool) pickIncrAvailablePeer(minHeight int64) *bpPeer {
pool.mtx.Lock()
defer pool.mtx.Unlock()
for _, peer := range pool.peers {
if peer.didTimeout {
pool.removePeer(peer.id)
continue
}
if peer.numPending >= maxPendingRequestsPerPeer {
continue
}
if peer.height < minHeight {
continue
}
peer.incrPending()
return peer
}
return nil
}
func (pool *BlockPool) makeNextRequester() {
pool.mtx.Lock()
defer pool.mtx.Unlock()
nextHeight := pool.height + pool.requestersLen()
if nextHeight > pool.maxPeerHeight {
return
}
request := newBPRequester(pool, nextHeight)
pool.requesters[nextHeight] = request
atomic.AddInt32(&pool.numPending, 1)
err := request.Start()
if err != nil {
request.Logger.Error("Error starting request", "err", err)
}
}
func (pool *BlockPool) requestersLen() int64 {
return int64(len(pool.requesters))
}
func (pool *BlockPool) sendRequest(height int64, peerID p2p.ID) {
if !pool.IsRunning() {
return
}
pool.requestsCh <- BlockRequest{height, peerID}
}
func (pool *BlockPool) sendError(err error, peerID p2p.ID) {
if !pool.IsRunning() {
return
}
pool.errorsCh <- peerError{err, peerID}
}
// for debugging purposes
//nolint:unused
func (pool *BlockPool) debug() string {
pool.mtx.Lock()
defer pool.mtx.Unlock()
str := ""
nextHeight := pool.height + pool.requestersLen()
for h := pool.height; h < nextHeight; h++ {
if pool.requesters[h] == nil {
str += fmt.Sprintf("H(%v):X ", h)
} else {
str += fmt.Sprintf("H(%v):", h)
str += fmt.Sprintf("B?(%v) ", pool.requesters[h].block != nil)
}
}
return str
}
//-------------------------------------
type bpPeer struct {
pool *BlockPool
id p2p.ID
recvMonitor *flow.Monitor
height int64
numPending int32
timeout *time.Timer
didTimeout bool
logger log.Logger
}
func newBPPeer(pool *BlockPool, peerID p2p.ID, height int64) *bpPeer {
peer := &bpPeer{
pool: pool,
id: peerID,
height: height,
numPending: 0,
logger: log.NewNopLogger(),
}
return peer
}
func (peer *bpPeer) setLogger(l log.Logger) {
peer.logger = l
}
func (peer *bpPeer) resetMonitor() {
peer.recvMonitor = flow.New(time.Second, time.Second*40)
initialValue := float64(minRecvRate) * math.E
peer.recvMonitor.SetREMA(initialValue)
}
func (peer *bpPeer) resetTimeout() {
if peer.timeout == nil {
peer.timeout = time.AfterFunc(peerTimeout, peer.onTimeout)
} else {
peer.timeout.Reset(peerTimeout)
}
}
func (peer *bpPeer) incrPending() {
if peer.numPending == 0 {
peer.resetMonitor()
peer.resetTimeout()
}
peer.numPending++
}
func (peer *bpPeer) decrPending(recvSize int) {
peer.numPending--
if peer.numPending == 0 {
peer.timeout.Stop()
} else {
peer.recvMonitor.Update(recvSize)
peer.resetTimeout()
}
}
func (peer *bpPeer) onTimeout() {
peer.pool.mtx.Lock()
defer peer.pool.mtx.Unlock()
err := errors.New("peer did not send us anything")
peer.pool.sendError(err, peer.id)
peer.logger.Error("SendTimeout", "reason", err, "timeout", peerTimeout)
peer.didTimeout = true
}
//-------------------------------------
type bpRequester struct {
cmn.BaseService
pool *BlockPool
height int64
gotBlockCh chan struct{}
redoCh chan p2p.ID // redo may be sent multiple times; add peerID to identify repeats
mtx sync.Mutex
peerID p2p.ID
block *types.Block
}
func newBPRequester(pool *BlockPool, height int64) *bpRequester {
bpr := &bpRequester{
pool: pool,
height: height,
gotBlockCh: make(chan struct{}, 1),
redoCh: make(chan p2p.ID, 1),
peerID: "",
block: nil,
}
bpr.BaseService = *cmn.NewBaseService(nil, "bpRequester", bpr)
return bpr
}
func (bpr *bpRequester) OnStart() error {
go bpr.requestRoutine()
return nil
}
// Returns true if the peer matches and block doesn't already exist.
func (bpr *bpRequester) setBlock(block *types.Block, peerID p2p.ID) bool {
bpr.mtx.Lock()
if bpr.block != nil || bpr.peerID != peerID {
bpr.mtx.Unlock()
return false
}
bpr.block = block
bpr.mtx.Unlock()
select {
case bpr.gotBlockCh <- struct{}{}:
default:
}
return true
}
func (bpr *bpRequester) getBlock() *types.Block {
bpr.mtx.Lock()
defer bpr.mtx.Unlock()
return bpr.block
}
func (bpr *bpRequester) getPeerID() p2p.ID {
bpr.mtx.Lock()
defer bpr.mtx.Unlock()
return bpr.peerID
}
// This is called from the requestRoutine, upon redo().
func (bpr *bpRequester) reset() {
bpr.mtx.Lock()
defer bpr.mtx.Unlock()
if bpr.block != nil {
atomic.AddInt32(&bpr.pool.numPending, 1)
}
bpr.peerID = ""
bpr.block = nil
}
// Tells bpRequester to pick another peer and try again.
// NOTE: Nonblocking, and does nothing if another redo
// was already requested.
func (bpr *bpRequester) redo(peerId p2p.ID) {
select {
case bpr.redoCh <- peerId:
default:
}
}
// Responsible for making more requests as necessary.
// Returns only when a block is found (e.g. AddBlock() is called).
func (bpr *bpRequester) requestRoutine() {
OUTER_LOOP:
for {
// Pick a peer to send request to.
var peer *bpPeer
PICK_PEER_LOOP:
for {
if !bpr.IsRunning() || !bpr.pool.IsRunning() {
return
}
peer = bpr.pool.pickIncrAvailablePeer(bpr.height)
if peer == nil {
//log.Info("No peers available", "height", height)
time.Sleep(requestIntervalMS * time.Millisecond)
continue PICK_PEER_LOOP
}
break PICK_PEER_LOOP
}
bpr.mtx.Lock()
bpr.peerID = peer.id
bpr.mtx.Unlock()
// Send request and wait.
bpr.pool.sendRequest(bpr.height, peer.id)
WAIT_LOOP:
for {
select {
case <-bpr.pool.Quit():
bpr.Stop()
return
case <-bpr.Quit():
return
case peerID := <-bpr.redoCh:
if peerID == bpr.peerID {
bpr.reset()
continue OUTER_LOOP
} else {
continue WAIT_LOOP
}
case <-bpr.gotBlockCh:
// We got a block!
// Continue the for-loop and wait til Quit.
continue WAIT_LOOP
}
}
}
}
//-------------------------------------
type BlockRequest struct {
Height int64
PeerID p2p.ID
}


@@ -1,222 +0,0 @@
package blockchain_old
import (
"fmt"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/types"
)
func init() {
peerTimeout = 2 * time.Second
}
type testPeer struct {
id p2p.ID
height int64
inputChan chan inputData //make sure each peer's data is sequential
}
type inputData struct {
t *testing.T
pool *BlockPool
request BlockRequest
}
func (p testPeer) runInputRoutine() {
go func() {
for input := range p.inputChan {
p.simulateInput(input)
}
}()
}
// When a request comes in, pretend we got the block immediately.
func (p testPeer) simulateInput(input inputData) {
block := &types.Block{Header: types.Header{Height: input.request.Height}}
input.pool.AddBlock(input.request.PeerID, block, 123)
input.t.Logf("Added block from peer %v (height: %v)", input.request.PeerID, input.request.Height)
}
type testPeers map[p2p.ID]testPeer
func (ps testPeers) start() {
for _, v := range ps {
v.runInputRoutine()
}
}
func (ps testPeers) stop() {
for _, v := range ps {
close(v.inputChan)
}
}
func makePeers(numPeers int, minHeight, maxHeight int64) testPeers {
peers := make(testPeers, numPeers)
for i := 0; i < numPeers; i++ {
peerID := p2p.ID(cmn.RandStr(12))
height := minHeight + cmn.RandInt63n(maxHeight-minHeight)
peers[peerID] = testPeer{peerID, height, make(chan inputData, 10)}
}
return peers
}
func TestBlockPoolBasic(t *testing.T) {
start := int64(42)
peers := makePeers(10, start+1, 1000)
errorsCh := make(chan peerError, 1000)
requestsCh := make(chan BlockRequest, 1000)
pool := NewBlockPool(start, requestsCh, errorsCh)
pool.SetLogger(log.TestingLogger())
err := pool.Start()
if err != nil {
t.Error(err)
}
defer pool.Stop()
peers.start()
defer peers.stop()
// Introduce each peer.
go func() {
for _, peer := range peers {
pool.SetPeerHeight(peer.id, peer.height)
}
}()
// Start a goroutine to pull blocks
go func() {
for {
if !pool.IsRunning() {
return
}
first, second := pool.PeekTwoBlocks()
if first != nil && second != nil {
pool.PopRequest()
} else {
time.Sleep(1 * time.Second)
}
}
}()
// Pull from channels
for {
select {
case err := <-errorsCh:
t.Error(err)
case request := <-requestsCh:
t.Logf("Pulled new BlockRequest %v", request)
if request.Height == 300 {
return // Done!
}
peers[request.PeerID].inputChan <- inputData{t, pool, request}
}
}
}
func TestBlockPoolTimeout(t *testing.T) {
start := int64(42)
peers := makePeers(10, start+1, 1000)
errorsCh := make(chan peerError, 1000)
requestsCh := make(chan BlockRequest, 1000)
pool := NewBlockPool(start, requestsCh, errorsCh)
pool.SetLogger(log.TestingLogger())
err := pool.Start()
if err != nil {
t.Error(err)
}
defer pool.Stop()
for _, peer := range peers {
t.Logf("Peer %v", peer.id)
}
// Introduce each peer.
go func() {
for _, peer := range peers {
pool.SetPeerHeight(peer.id, peer.height)
}
}()
// Start a goroutine to pull blocks
go func() {
for {
if !pool.IsRunning() {
return
}
first, second := pool.PeekTwoBlocks()
if first != nil && second != nil {
pool.PopRequest()
} else {
time.Sleep(1 * time.Second)
}
}
}()
// Pull from channels
counter := 0
timedOut := map[p2p.ID]struct{}{}
for {
select {
case err := <-errorsCh:
t.Log(err)
// consider every error to be a timeout here
if _, ok := timedOut[err.peerID]; !ok {
counter++
if counter == len(peers) {
return // Done!
}
}
case request := <-requestsCh:
t.Logf("Pulled new BlockRequest %+v", request)
}
}
}
func TestBlockPoolRemovePeer(t *testing.T) {
peers := make(testPeers, 10)
for i := 0; i < 10; i++ {
peerID := p2p.ID(fmt.Sprintf("%d", i+1))
height := int64(i + 1)
peers[peerID] = testPeer{peerID, height, make(chan inputData)}
}
requestsCh := make(chan BlockRequest)
errorsCh := make(chan peerError)
pool := NewBlockPool(1, requestsCh, errorsCh)
pool.SetLogger(log.TestingLogger())
err := pool.Start()
require.NoError(t, err)
defer pool.Stop()
// add peers
for peerID, peer := range peers {
pool.SetPeerHeight(peerID, peer.height)
}
assert.EqualValues(t, 10, pool.MaxPeerHeight())
// remove a non-existent peer
assert.NotPanics(t, func() { pool.RemovePeer(p2p.ID("Superman")) })
// remove peer with biggest height
pool.RemovePeer(p2p.ID("10"))
assert.EqualValues(t, 9, pool.MaxPeerHeight())
// remove all peers
for peerID := range peers {
pool.RemovePeer(peerID)
}
assert.EqualValues(t, 0, pool.MaxPeerHeight())
}


@@ -1,474 +0,0 @@
package blockchain_old
import (
"errors"
"fmt"
"reflect"
"time"
amino "github.com/tendermint/go-amino"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/p2p"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
)
const (
// BlockchainChannel is a channel for blocks and status updates (`BlockStore` height)
BlockchainChannel = byte(0x40)
trySyncIntervalMS = 10
// stop syncing when last block's time is
// within this much of the system time.
// stopSyncingDurationMinutes = 10
// ask for best height every 10s
statusUpdateIntervalSeconds = 10
// check if we should switch to consensus reactor
switchToConsensusIntervalSeconds = 1
// NOTE: keep up to date with bcBlockResponseMessage
bcBlockResponseMessagePrefixSize = 4
bcBlockResponseMessageFieldKeySize = 1
maxMsgSize = types.MaxBlockSizeBytes +
bcBlockResponseMessagePrefixSize +
bcBlockResponseMessageFieldKeySize
)
type consensusReactor interface {
// for when we switch from blockchain reactor and fast sync to
// the consensus machine
SwitchToConsensus(sm.State, int)
}
type peerError struct {
err error
peerID p2p.ID
}
func (e peerError) Error() string {
return fmt.Sprintf("error with peer %v: %s", e.peerID, e.err.Error())
}
// BlockchainReactor handles long-term catchup syncing.
type BlockchainReactor struct {
p2p.BaseReactor
// immutable
initialState sm.State
blockExec *sm.BlockExecutor
store *BlockStore
pool *BlockPool
fastSync bool
requestsCh <-chan BlockRequest
errorsCh <-chan peerError
}
// NewBlockchainReactor returns new reactor instance.
func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *BlockStore,
fastSync bool) *BlockchainReactor {
if state.LastBlockHeight != store.Height() {
panic(fmt.Sprintf("state (%v) and store (%v) height mismatch", state.LastBlockHeight,
store.Height()))
}
requestsCh := make(chan BlockRequest, maxTotalRequesters)
const capacity = 1000 // must be bigger than peers count
errorsCh := make(chan peerError, capacity) // so we don't block in #Receive#pool.AddBlock
pool := NewBlockPool(
store.Height()+1,
requestsCh,
errorsCh,
)
bcR := &BlockchainReactor{
initialState: state,
blockExec: blockExec,
store: store,
pool: pool,
fastSync: fastSync,
requestsCh: requestsCh,
errorsCh: errorsCh,
}
bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR)
return bcR
}
// SetLogger implements cmn.Service by setting the logger on reactor and pool.
func (bcR *BlockchainReactor) SetLogger(l log.Logger) {
bcR.BaseService.Logger = l
bcR.pool.Logger = l
}
// OnStart implements cmn.Service.
func (bcR *BlockchainReactor) OnStart() error {
if bcR.fastSync {
err := bcR.pool.Start()
if err != nil {
return err
}
go bcR.poolRoutine()
}
return nil
}
// OnStop implements cmn.Service.
func (bcR *BlockchainReactor) OnStop() {
bcR.pool.Stop()
}
// GetChannels implements Reactor
func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
return []*p2p.ChannelDescriptor{
{
ID: BlockchainChannel,
Priority: 10,
SendQueueCapacity: 1000,
RecvBufferCapacity: 50 * 4096,
RecvMessageCapacity: maxMsgSize,
},
}
}
// AddPeer implements Reactor by sending our state to peer.
func (bcR *BlockchainReactor) AddPeer(peer p2p.Peer) {
msgBytes := cdc.MustMarshalBinaryBare(&bcStatusResponseMessage{bcR.store.Height()})
if !peer.Send(BlockchainChannel, msgBytes) {
// doing nothing, will try later in `poolRoutine`
}
// peer is added to the pool once we receive the first
// bcStatusResponseMessage from the peer and call pool.SetPeerHeight
}
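The status exchange this comment describes, end to end (a summary assembled from AddPeer above and Receive below):

// 1. AddPeer: we push our store height to the new peer (bcStatusResponseMessage).
// 2. The peer does the same toward us, so we eventually receive its
//    bcStatusResponseMessage.
// 3. Receive: on that message we call pool.SetPeerHeight, which is the moment
//    the peer actually enters the pool.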
// RemovePeer implements Reactor by removing peer from the pool.
func (bcR *BlockchainReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
bcR.pool.RemovePeer(peer.ID())
}
// respondToPeer loads a block and sends it to the requesting peer,
// if we have it. Otherwise, we'll respond saying we don't have it.
// According to the Tendermint spec, if all nodes are honest,
// no node should be requesting a block that's non-existent.
func (bcR *BlockchainReactor) respondToPeer(msg *bcBlockRequestMessage,
src p2p.Peer) (queued bool) {
block := bcR.store.LoadBlock(msg.Height)
if block != nil {
msgBytes := cdc.MustMarshalBinaryBare(&bcBlockResponseMessage{Block: block})
return src.TrySend(BlockchainChannel, msgBytes)
}
bcR.Logger.Info("Peer asking for a block we don't have", "src", src, "height", msg.Height)
msgBytes := cdc.MustMarshalBinaryBare(&bcNoBlockResponseMessage{Height: msg.Height})
return src.TrySend(BlockchainChannel, msgBytes)
}
// Receive implements Reactor by handling 4 types of messages (look below).
func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
msg, err := decodeMsg(msgBytes)
if err != nil {
bcR.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes)
bcR.Switch.StopPeerForError(src, err)
return
}
if err = msg.ValidateBasic(); err != nil {
bcR.Logger.Error("Peer sent us invalid msg", "peer", src, "msg", msg, "err", err)
bcR.Switch.StopPeerForError(src, err)
return
}
bcR.Logger.Debug("Receive", "src", src, "chID", chID, "msg", msg)
switch msg := msg.(type) {
case *bcBlockRequestMessage:
if queued := bcR.respondToPeer(msg, src); !queued {
// Unfortunately not queued since the queue is full.
}
case *bcBlockResponseMessage:
bcR.pool.AddBlock(src.ID(), msg.Block, len(msgBytes))
case *bcStatusRequestMessage:
// Send peer our state.
msgBytes := cdc.MustMarshalBinaryBare(&bcStatusResponseMessage{bcR.store.Height()})
queued := src.TrySend(BlockchainChannel, msgBytes)
if !queued {
// sorry
}
case *bcStatusResponseMessage:
// Got a peer status. Unverified.
bcR.pool.SetPeerHeight(src.ID(), msg.Height)
default:
bcR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
}
}
// Handle messages from the poolReactor telling the reactor what to do.
// NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
func (bcR *BlockchainReactor) poolRoutine() {
trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond)
statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)
switchToConsensusTicker := time.NewTicker(switchToConsensusIntervalSeconds * time.Second)
blocksSynced := 0
chainID := bcR.initialState.ChainID
state := bcR.initialState
lastHundred := time.Now()
lastRate := 0.0
didProcessCh := make(chan struct{}, 1)
FOR_LOOP:
for {
select {
case request := <-bcR.requestsCh:
peer := bcR.Switch.Peers().Get(request.PeerID)
if peer == nil {
continue FOR_LOOP // Peer has since been disconnected.
}
msgBytes := cdc.MustMarshalBinaryBare(&bcBlockRequestMessage{request.Height})
queued := peer.TrySend(BlockchainChannel, msgBytes)
if !queued {
// We couldn't make the request, send-queue full.
// The pool handles timeouts, just let it go.
continue FOR_LOOP
}
case err := <-bcR.errorsCh:
peer := bcR.Switch.Peers().Get(err.peerID)
if peer != nil {
bcR.Switch.StopPeerForError(peer, err)
}
case <-statusUpdateTicker.C:
// ask for status updates
go bcR.BroadcastStatusRequest() // nolint: errcheck
case <-switchToConsensusTicker.C:
height, numPending, lenRequesters := bcR.pool.GetStatus()
outbound, inbound, _ := bcR.Switch.NumPeers()
bcR.Logger.Debug("Consensus ticker", "numPending", numPending, "total", lenRequesters,
"outbound", outbound, "inbound", inbound)
if bcR.pool.IsCaughtUp() {
bcR.Logger.Info("Time to switch to consensus reactor!", "height", height)
bcR.pool.Stop()
conR, ok := bcR.Switch.Reactor("CONSENSUS").(consensusReactor)
if ok {
conR.SwitchToConsensus(state, blocksSynced)
} else {
// should only happen during testing
}
break FOR_LOOP
}
case <-trySyncTicker.C: // chan time
select {
case didProcessCh <- struct{}{}:
default:
}
case <-didProcessCh:
// NOTE: It is a subtle mistake to process more than a single block
// at a time (e.g. 10) here, because we only TrySend 1 request per
// loop. The ratio mismatch can result in starving of blocks, a
// sudden burst of requests and responses, and repeat.
// Consequently, it is better to split these routines rather than
// coupling them as it's written here. TODO uncouple from request
// routine.
// See if there are any blocks to sync.
first, second := bcR.pool.PeekTwoBlocks()
//bcR.Logger.Info("TrySync peeked", "first", first, "second", second)
if first == nil || second == nil {
// We need both to sync the first block.
continue FOR_LOOP
} else {
// Try again quickly next loop.
didProcessCh <- struct{}{}
}
firstParts := first.MakePartSet(types.BlockPartSizeBytes)
firstPartsHeader := firstParts.Header()
firstID := types.BlockID{Hash: first.Hash(), PartsHeader: firstPartsHeader}
// Finally, verify the first block using the second's commit
// NOTE: we can probably make this more efficient, but note that calling
// first.Hash() doesn't verify the tx contents, so MakePartSet() is
// currently necessary.
err := state.Validators.VerifyCommit(
chainID, firstID, first.Height, second.LastCommit)
if err != nil {
bcR.Logger.Error("Error in validation", "err", err)
peerID := bcR.pool.RedoRequest(first.Height)
peer := bcR.Switch.Peers().Get(peerID)
if peer != nil {
// NOTE: we've already removed the peer's request, but we
// still need to clean up the rest.
bcR.Switch.StopPeerForError(peer, fmt.Errorf("BlockchainReactor validation error: %v", err))
}
peerID2 := bcR.pool.RedoRequest(second.Height)
peer2 := bcR.Switch.Peers().Get(peerID2)
if peer2 != nil && peer2 != peer {
// NOTE: we've already removed the peer's request, but we
// still need to clean up the rest.
bcR.Switch.StopPeerForError(peer2, fmt.Errorf("BlockchainReactor validation error: %v", err))
}
continue FOR_LOOP
} else {
bcR.pool.PopRequest()
// TODO: batch saves so we don't persist to disk every block
bcR.store.SaveBlock(first, firstParts, second.LastCommit)
// TODO: same thing for app - but we would need a way to
// get the hash without persisting the state
var err error
state, err = bcR.blockExec.ApplyBlock(state, firstID, first)
if err != nil {
// TODO This is bad, are we zombie?
panic(fmt.Sprintf("Failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err))
}
blocksSynced++
if blocksSynced%100 == 0 {
lastRate = 0.9*lastRate + 0.1*(100/time.Since(lastHundred).Seconds())
bcR.Logger.Info("Fast Sync Rate", "height", bcR.pool.height,
"max_peer_height", bcR.pool.MaxPeerHeight(), "blocks/s", lastRate)
lastHundred = time.Now()
}
}
continue FOR_LOOP
case <-bcR.Quit():
break FOR_LOOP
}
}
}
// BroadcastStatusRequest broadcasts `BlockStore` height.
func (bcR *BlockchainReactor) BroadcastStatusRequest() error {
msgBytes := cdc.MustMarshalBinaryBare(&bcStatusRequestMessage{bcR.store.Height()})
bcR.Switch.Broadcast(BlockchainChannel, msgBytes)
return nil
}
//-----------------------------------------------------------------------------
// Messages
// BlockchainMessage is a generic message for this reactor.
type BlockchainMessage interface {
ValidateBasic() error
}
func RegisterBlockchainMessages(cdc *amino.Codec) {
cdc.RegisterInterface((*BlockchainMessage)(nil), nil)
cdc.RegisterConcrete(&bcBlockRequestMessage{}, "tendermint/blockchain/BlockRequest", nil)
cdc.RegisterConcrete(&bcBlockResponseMessage{}, "tendermint/blockchain/BlockResponse", nil)
cdc.RegisterConcrete(&bcNoBlockResponseMessage{}, "tendermint/blockchain/NoBlockResponse", nil)
cdc.RegisterConcrete(&bcStatusResponseMessage{}, "tendermint/blockchain/StatusResponse", nil)
cdc.RegisterConcrete(&bcStatusRequestMessage{}, "tendermint/blockchain/StatusRequest", nil)
}
func decodeMsg(bz []byte) (msg BlockchainMessage, err error) {
if len(bz) > maxMsgSize {
return msg, fmt.Errorf("Msg exceeds max size (%d > %d)", len(bz), maxMsgSize)
}
err = cdc.UnmarshalBinaryBare(bz, &msg)
return
}
//-------------------------------------
type bcBlockRequestMessage struct {
Height int64
}
// ValidateBasic performs basic validation.
func (m *bcBlockRequestMessage) ValidateBasic() error {
if m.Height < 0 {
return errors.New("Negative Height")
}
return nil
}
func (m *bcBlockRequestMessage) String() string {
return fmt.Sprintf("[bcBlockRequestMessage %v]", m.Height)
}
type bcNoBlockResponseMessage struct {
Height int64
}
// ValidateBasic performs basic validation.
func (m *bcNoBlockResponseMessage) ValidateBasic() error {
if m.Height < 0 {
return errors.New("Negative Height")
}
return nil
}
func (brm *bcNoBlockResponseMessage) String() string {
return fmt.Sprintf("[bcNoBlockResponseMessage %d]", brm.Height)
}
//-------------------------------------
type bcBlockResponseMessage struct {
Block *types.Block
}
// ValidateBasic performs basic validation.
func (m *bcBlockResponseMessage) ValidateBasic() error {
return m.Block.ValidateBasic()
}
func (m *bcBlockResponseMessage) String() string {
return fmt.Sprintf("[bcBlockResponseMessage %v]", m.Block.Height)
}
//-------------------------------------
type bcStatusRequestMessage struct {
Height int64
}
// ValidateBasic performs basic validation.
func (m *bcStatusRequestMessage) ValidateBasic() error {
if m.Height < 0 {
return errors.New("Negative Height")
}
return nil
}
func (m *bcStatusRequestMessage) String() string {
return fmt.Sprintf("[bcStatusRequestMessage %v]", m.Height)
}
//-------------------------------------
type bcStatusResponseMessage struct {
Height int64
}
// ValidateBasic performs basic validation.
func (m *bcStatusResponseMessage) ValidateBasic() error {
if m.Height < 0 {
return errors.New("Negative Height")
}
return nil
}
func (m *bcStatusResponseMessage) String() string {
return fmt.Sprintf("[bcStatusResponseMessage %v]", m.Height)
}


@@ -1,416 +0,0 @@
package blockchain_old
import (
"fmt"
"os"
"sort"
"testing"
"time"
"github.com/stretchr/testify/assert"
abci "github.com/tendermint/tendermint/abci/types"
cfg "github.com/tendermint/tendermint/config"
cmn "github.com/tendermint/tendermint/libs/common"
dbm "github.com/tendermint/tendermint/libs/db"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/proxy"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
tmtime "github.com/tendermint/tendermint/types/time"
)
var config *cfg.Config
func randGenesisDoc(numValidators int, randPower bool, minPower int64) (*types.GenesisDoc, []types.PrivValidator) {
validators := make([]types.GenesisValidator, numValidators)
privValidators := make([]types.PrivValidator, numValidators)
for i := 0; i < numValidators; i++ {
val, privVal := types.RandValidator(randPower, minPower)
validators[i] = types.GenesisValidator{
PubKey: val.PubKey,
Power: val.VotingPower,
}
privValidators[i] = privVal
}
sort.Sort(types.PrivValidatorsByAddress(privValidators))
return &types.GenesisDoc{
GenesisTime: tmtime.Now(),
ChainID: config.ChainID(),
Validators: validators,
}, privValidators
}
func makeVote(header *types.Header, blockID types.BlockID, valset *types.ValidatorSet, privVal types.PrivValidator) *types.Vote {
addr := privVal.GetPubKey().Address()
idx, _ := valset.GetByAddress(addr)
vote := &types.Vote{
ValidatorAddress: addr,
ValidatorIndex: idx,
Height: header.Height,
Round: 1,
Timestamp: tmtime.Now(),
Type: types.PrecommitType,
BlockID: blockID,
}
privVal.SignVote(header.ChainID, vote)
return vote
}
type BlockchainReactorPair struct {
reactor *BlockchainReactor
app proxy.AppConns
}
func newBlockchainReactor(logger log.Logger, genDoc *types.GenesisDoc, privVals []types.PrivValidator, maxBlockHeight int64) BlockchainReactorPair {
if len(privVals) != 1 {
panic("only support one validator")
}
app := &testApp{}
cc := proxy.NewLocalClientCreator(app)
proxyApp := proxy.NewAppConns(cc)
err := proxyApp.Start()
if err != nil {
panic(cmn.ErrorWrap(err, "error start app"))
}
blockDB := dbm.NewMemDB()
stateDB := dbm.NewMemDB()
blockStore := NewBlockStore(blockDB)
state, err := sm.LoadStateFromDBOrGenesisDoc(stateDB, genDoc)
if err != nil {
panic(cmn.ErrorWrap(err, "error constructing state from genesis file"))
}
// Make the BlockchainReactor itself.
// NOTE we have to create and commit the blocks first because
// pool.height is determined from the store.
fastSync := true
blockExec := sm.NewBlockExecutor(dbm.NewMemDB(), log.TestingLogger(), proxyApp.Consensus(),
sm.MockMempool{}, sm.MockEvidencePool{})
// let's add some blocks in
for blockHeight := int64(1); blockHeight <= maxBlockHeight; blockHeight++ {
lastCommit := types.NewCommit(types.BlockID{}, nil)
if blockHeight > 1 {
lastBlockMeta := blockStore.LoadBlockMeta(blockHeight - 1)
lastBlock := blockStore.LoadBlock(blockHeight - 1)
vote := makeVote(&lastBlock.Header, lastBlockMeta.BlockID, state.Validators, privVals[0]).CommitSig()
lastCommit = types.NewCommit(lastBlockMeta.BlockID, []*types.CommitSig{vote})
}
thisBlock := makeBlock(blockHeight, state, lastCommit)
thisParts := thisBlock.MakePartSet(types.BlockPartSizeBytes)
blockID := types.BlockID{Hash: thisBlock.Hash(), PartsHeader: thisParts.Header()}
state, err = blockExec.ApplyBlock(state, blockID, thisBlock)
if err != nil {
panic(cmn.ErrorWrap(err, "error apply block"))
}
blockStore.SaveBlock(thisBlock, thisParts, lastCommit)
}
bcReactor := NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
bcReactor.SetLogger(logger.With("module", "blockchain"))
return BlockchainReactorPair{bcReactor, proxyApp}
}
func TestFastSyncNoBlockResponse(t *testing.T) {
config = cfg.ResetTestRoot("blockchain_reactor_test")
defer os.RemoveAll(config.RootDir)
genDoc, privVals := randGenesisDoc(1, false, 30)
maxBlockHeight := int64(500)
reactorPairs := make([]BlockchainReactorPair, 2)
reactorPairs[0] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight)
reactorPairs[1] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0)
p2p.MakeConnectedSwitches(config.P2P, 2, func(i int, s *p2p.Switch) *p2p.Switch {
s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor)
return s
}, p2p.Connect2Switches)
defer func() {
for _, r := range reactorPairs {
r.reactor.Stop()
r.app.Stop()
}
}()
tests := []struct {
height int64
existent bool
}{
{maxBlockHeight + 2, false},
{10, true},
{1, true},
{maxBlockHeight + 100, false},
}
for {
if reactorPairs[1].reactor.pool.IsCaughtUp() {
break
}
time.Sleep(10 * time.Millisecond)
}
assert.Equal(t, maxBlockHeight, reactorPairs[0].reactor.store.Height())
for _, tt := range tests {
block := reactorPairs[1].reactor.store.LoadBlock(tt.height)
if tt.existent {
assert.True(t, block != nil)
} else {
assert.True(t, block == nil)
}
}
}
// NOTE: This is too hard to test without
// an easy way to add a test peer to the switch
// or without significant refactoring of the module.
// Alternatively we could actually dial a TCP conn but
// that seems extreme.
func TestFastSyncBadBlockStopsPeer(t *testing.T) {
config = cfg.ResetTestRoot("blockchain_reactor_test")
defer os.RemoveAll(config.RootDir)
genDoc, privVals := randGenesisDoc(1, false, 30)
maxBlockHeight := int64(148)
otherChain := newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight)
defer func() {
otherChain.reactor.Stop()
otherChain.app.Stop()
}()
reactorPairs := make([]BlockchainReactorPair, 4)
reactorPairs[0] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight)
reactorPairs[1] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0)
reactorPairs[2] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0)
reactorPairs[3] = newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0)
switches := p2p.MakeConnectedSwitches(config.P2P, 4, func(i int, s *p2p.Switch) *p2p.Switch {
s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor)
return s
}, p2p.Connect2Switches)
defer func() {
for _, r := range reactorPairs {
r.reactor.Stop()
r.app.Stop()
}
}()
for {
if reactorPairs[3].reactor.pool.IsCaughtUp() {
break
}
time.Sleep(1 * time.Second)
}
// at this time, reactors[0-3] are at the newest height
assert.Equal(t, 3, reactorPairs[1].reactor.Switch.Peers().Size())
// mark reactorPairs[3] as an invalid peer by swapping in the other chain's store
reactorPairs[3].reactor.store = otherChain.reactor.store
lastReactorPair := newBlockchainReactor(log.TestingLogger(), genDoc, privVals, 0)
reactorPairs = append(reactorPairs, lastReactorPair)
switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch) *p2p.Switch {
s.AddReactor("BLOCKCHAIN", reactorPairs[len(reactorPairs)-1].reactor)
return s
}, p2p.Connect2Switches)...)
for i := 0; i < len(reactorPairs)-1; i++ {
p2p.Connect2Switches(switches, i, len(reactorPairs)-1)
}
for {
if lastReactorPair.reactor.pool.IsCaughtUp() || lastReactorPair.reactor.Switch.Peers().Size() == 0 {
break
}
time.Sleep(1 * time.Second)
}
assert.True(t, lastReactorPair.reactor.Switch.Peers().Size() < len(reactorPairs)-1)
assert.Equal(t, lastReactorPair.reactor.pool.maxPeerHeight, lastReactorPair.reactor.pool.height)
}
func setupReactors(
numReactors int, maxBlockHeight int64,
genDoc *types.GenesisDoc, privVals []types.PrivValidator) ([]BlockchainReactorPair, []*p2p.Switch) {
reactorPairs := make([]BlockchainReactorPair, numReactors)
var logger = make([]log.Logger, numReactors)
for i := 0; i < numReactors; i++ {
logger[i] = log.TestingLogger()
height := int64(0)
if i == 0 {
height = maxBlockHeight
}
reactorPairs[i] = newBlockchainReactor(logger[i], genDoc, privVals, height)
}
switches := p2p.MakeConnectedSwitches(config.P2P, numReactors, func(i int, s *p2p.Switch) *p2p.Switch {
s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor)
return s
}, p2p.Connect2Switches)
for i := 0; i < numReactors; i++ {
addr := reactorPairs[i].reactor.Switch.NodeInfo().ID()
moduleName := fmt.Sprintf("blockchain-%v", addr)
reactorPairs[i].reactor.SetLogger(logger[i].With("module", moduleName[:19]))
}
return reactorPairs, switches
}
func TestFastSyncMultiNode(t *testing.T) {
numNodes := 8
maxHeight := int64(1000)
//numNodes := 20
//maxHeight := int64(10000)
config = cfg.ResetTestRoot("blockchain_reactor_test")
defer os.RemoveAll(config.RootDir)
genDoc, privVals := randGenesisDoc(1, false, 30)
start := time.Now()
reactorPairs, switches := setupReactors(numNodes, maxHeight, genDoc, privVals)
defer func() {
for _, r := range reactorPairs {
_ = r.reactor.Stop()
_ = r.app.Stop()
}
}()
outerFor:
for {
time.Sleep(1 * time.Second)
i := 0
for i < numNodes {
if !reactorPairs[i].reactor.pool.IsCaughtUp() {
break
}
i++
}
if i == numNodes {
fmt.Println("SETUP FAST SYNC Duration", time.Since(start))
break outerFor
}
}
// at this point all the reactors are caught up to the newest height
assert.Equal(t, numNodes-1, reactorPairs[0].reactor.Switch.Peers().Size())
lastLogger := log.TestingLogger()
lastReactorPair := newBlockchainReactor(lastLogger, genDoc, privVals, 0)
reactorPairs = append(reactorPairs, lastReactorPair)
switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch) *p2p.Switch {
s.AddReactor("BLOCKCHAIN", reactorPairs[len(reactorPairs)-1].reactor)
return s
}, p2p.Connect2Switches)...)
addr := lastReactorPair.reactor.Switch.NodeInfo().ID()
moduleName := fmt.Sprintf("blockchain-%v", addr)
lastReactorPair.reactor.SetLogger(lastLogger.With("module", moduleName[:19]))
start = time.Now()
for i := 0; i < len(reactorPairs)-1; i++ {
p2p.Connect2Switches(switches, i, len(reactorPairs)-1)
}
for {
time.Sleep(1 * time.Second)
if lastReactorPair.reactor.pool.IsCaughtUp() {
fmt.Println("FAST SYNC Duration", time.Since(start))
break
}
}
assert.True(t, lastReactorPair.reactor.Switch.Peers().Size() < len(reactorPairs))
assert.Equal(t, lastReactorPair.reactor.pool.maxPeerHeight, lastReactorPair.reactor.pool.height)
}
//----------------------------------------------
// utility funcs
func makeTxs(height int64) (txs []types.Tx) {
for i := 0; i < 10; i++ {
txs = append(txs, types.Tx([]byte{byte(height), byte(i)}))
}
return txs
}
func makeBlock(height int64, state sm.State, lastCommit *types.Commit) *types.Block {
block, _ := state.MakeBlock(height, makeTxs(height), lastCommit, nil, state.Validators.GetProposer().Address)
return block
}
type testApp struct {
abci.BaseApplication
}
var _ abci.Application = (*testApp)(nil)
func (app *testApp) Info(req abci.RequestInfo) (resInfo abci.ResponseInfo) {
return abci.ResponseInfo{}
}
func (app *testApp) BeginBlock(req abci.RequestBeginBlock) abci.ResponseBeginBlock {
return abci.ResponseBeginBlock{}
}
func (app *testApp) EndBlock(req abci.RequestEndBlock) abci.ResponseEndBlock {
return abci.ResponseEndBlock{}
}
func (app *testApp) DeliverTx(tx []byte) abci.ResponseDeliverTx {
return abci.ResponseDeliverTx{Tags: []cmn.KVPair{}}
}
func (app *testApp) CheckTx(tx []byte) abci.ResponseCheckTx {
return abci.ResponseCheckTx{}
}
func (app *testApp) Commit() abci.ResponseCommit {
return abci.ResponseCommit{}
}
func (app *testApp) Query(reqQuery abci.RequestQuery) (resQuery abci.ResponseQuery) {
return
}

View File

@ -1,247 +0,0 @@
package blockchain_old
import (
"fmt"
"sync"
cmn "github.com/tendermint/tendermint/libs/common"
dbm "github.com/tendermint/tendermint/libs/db"
"github.com/tendermint/tendermint/types"
)
/*
BlockStore is a simple low level store for blocks.
There are three types of information stored:
- BlockMeta: Meta information about each block
- Block part: Parts of each block, aggregated w/ PartSet
- Commit: The commit part of each block, for gossiping precommit votes
Currently the precommit signatures are duplicated in the Block parts as
well as the Commit. In the future this may change, perhaps by moving
the Commit data outside the Block. (TODO)
// NOTE: BlockStore methods will panic if they encounter errors
// deserializing loaded data, indicating probable corruption on disk.
*/
type BlockStore struct {
db dbm.DB
mtx sync.RWMutex
height int64
}
// NewBlockStore returns a new BlockStore with the given DB,
// initialized to the last height that was committed to the DB.
func NewBlockStore(db dbm.DB) *BlockStore {
bsjson := LoadBlockStoreStateJSON(db)
return &BlockStore{
height: bsjson.Height,
db: db,
}
}
// Height returns the last known contiguous block height.
func (bs *BlockStore) Height() int64 {
bs.mtx.RLock()
defer bs.mtx.RUnlock()
return bs.height
}
// LoadBlock returns the block with the given height.
// If no block is found for that height, it returns nil.
func (bs *BlockStore) LoadBlock(height int64) *types.Block {
var blockMeta = bs.LoadBlockMeta(height)
if blockMeta == nil {
return nil
}
var block = new(types.Block)
buf := []byte{}
for i := 0; i < blockMeta.BlockID.PartsHeader.Total; i++ {
part := bs.LoadBlockPart(height, i)
buf = append(buf, part.Bytes...)
}
err := cdc.UnmarshalBinaryLengthPrefixed(buf, block)
if err != nil {
// NOTE: The existence of meta should imply the existence of the
// block. So, make sure meta is only saved after blocks are saved.
panic(cmn.ErrorWrap(err, "Error reading block"))
}
return block
}
// LoadBlockPart returns the Part at the given index
// from the block at the given height.
// If no part is found for the given height and index, it returns nil.
func (bs *BlockStore) LoadBlockPart(height int64, index int) *types.Part {
var part = new(types.Part)
bz := bs.db.Get(calcBlockPartKey(height, index))
if len(bz) == 0 {
return nil
}
err := cdc.UnmarshalBinaryBare(bz, part)
if err != nil {
panic(cmn.ErrorWrap(err, "Error reading block part"))
}
return part
}
// LoadBlockMeta returns the BlockMeta for the given height.
// If no block is found for the given height, it returns nil.
func (bs *BlockStore) LoadBlockMeta(height int64) *types.BlockMeta {
var blockMeta = new(types.BlockMeta)
bz := bs.db.Get(calcBlockMetaKey(height))
if len(bz) == 0 {
return nil
}
err := cdc.UnmarshalBinaryBare(bz, blockMeta)
if err != nil {
panic(cmn.ErrorWrap(err, "Error reading block meta"))
}
return blockMeta
}
// LoadBlockCommit returns the Commit for the given height.
// This commit consists of the +2/3 and other Precommit-votes for block at `height`,
// and it comes from the block.LastCommit for `height+1`.
// If no commit is found for the given height, it returns nil.
func (bs *BlockStore) LoadBlockCommit(height int64) *types.Commit {
var commit = new(types.Commit)
bz := bs.db.Get(calcBlockCommitKey(height))
if len(bz) == 0 {
return nil
}
err := cdc.UnmarshalBinaryBare(bz, commit)
if err != nil {
panic(cmn.ErrorWrap(err, "Error reading block commit"))
}
return commit
}
// LoadSeenCommit returns the locally seen Commit for the given height.
// This is useful when we've seen a commit, but there has not yet been
// a new block at `height + 1` that includes this commit in its block.LastCommit.
func (bs *BlockStore) LoadSeenCommit(height int64) *types.Commit {
var commit = new(types.Commit)
bz := bs.db.Get(calcSeenCommitKey(height))
if len(bz) == 0 {
return nil
}
err := cdc.UnmarshalBinaryBare(bz, commit)
if err != nil {
panic(cmn.ErrorWrap(err, "Error reading block seen commit"))
}
return commit
}
// SaveBlock persists the given block, blockParts, and seenCommit to the underlying db.
// blockParts: Must be parts of the block
// seenCommit: The +2/3 precommits that were seen which committed at height.
// If all the nodes restart after committing a block,
// we need this to reload the precommits to catch-up nodes to the
// most recent height. Otherwise they'd stall at H-1.
func (bs *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) {
if block == nil {
cmn.PanicSanity("BlockStore can only save a non-nil block")
}
height := block.Height
if g, w := height, bs.Height()+1; g != w {
cmn.PanicSanity(fmt.Sprintf("BlockStore can only save contiguous blocks. Wanted %v, got %v", w, g))
}
if !blockParts.IsComplete() {
cmn.PanicSanity(fmt.Sprintf("BlockStore can only save complete block part sets"))
}
// Save block meta
blockMeta := types.NewBlockMeta(block, blockParts)
metaBytes := cdc.MustMarshalBinaryBare(blockMeta)
bs.db.Set(calcBlockMetaKey(height), metaBytes)
// Save block parts
for i := 0; i < blockParts.Total(); i++ {
part := blockParts.GetPart(i)
bs.saveBlockPart(height, i, part)
}
// Save block commit (duplicate and separate from the Block)
blockCommitBytes := cdc.MustMarshalBinaryBare(block.LastCommit)
bs.db.Set(calcBlockCommitKey(height-1), blockCommitBytes)
// Save seen commit (seen +2/3 precommits for block)
// NOTE: we can delete this at a later height
seenCommitBytes := cdc.MustMarshalBinaryBare(seenCommit)
bs.db.Set(calcSeenCommitKey(height), seenCommitBytes)
// Save new BlockStoreStateJSON descriptor
BlockStoreStateJSON{Height: height}.Save(bs.db)
// Done!
bs.mtx.Lock()
bs.height = height
bs.mtx.Unlock()
// Flush
bs.db.SetSync(nil, nil)
}
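// Round-trip sketch (illustrative, not part of the original file): SaveBlock
// accepts only the next contiguous height, after which LoadBlock returns the
// same block. sm.State, makeBlock and makeTestCommit are assumed to come from
// this package's test files; tmtime is types/time.
func exampleSaveLoadRoundTrip(state sm.State) {
	bs := NewBlockStore(dbm.NewMemDB())
	block := makeBlock(bs.Height()+1, state, new(types.Commit))
	parts := block.MakePartSet(types.BlockPartSizeBytes)
	bs.SaveBlock(block, parts, makeTestCommit(block.Height, tmtime.Now()))
	if bs.LoadBlock(block.Height) == nil {
		panic("expected the saved block to be retrievable")
	}
}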
func (bs *BlockStore) saveBlockPart(height int64, index int, part *types.Part) {
if height != bs.Height()+1 {
cmn.PanicSanity(fmt.Sprintf("BlockStore can only save contiguous blocks. Wanted %v, got %v", bs.Height()+1, height))
}
partBytes := cdc.MustMarshalBinaryBare(part)
bs.db.Set(calcBlockPartKey(height, index), partBytes)
}
//-----------------------------------------------------------------------------
func calcBlockMetaKey(height int64) []byte {
return []byte(fmt.Sprintf("H:%v", height))
}
func calcBlockPartKey(height int64, partIndex int) []byte {
return []byte(fmt.Sprintf("P:%v:%v", height, partIndex))
}
func calcBlockCommitKey(height int64) []byte {
return []byte(fmt.Sprintf("C:%v", height))
}
func calcSeenCommitKey(height int64) []byte {
return []byte(fmt.Sprintf("SC:%v", height))
}
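// Key layout sketch (illustrative): for height 12 and part index 0, the
// functions above yield the following database keys.
//
//	calcBlockMetaKey(12)    -> "H:12"
//	calcBlockPartKey(12, 0) -> "P:12:0"
//	calcBlockCommitKey(11)  -> "C:11"
//	calcSeenCommitKey(12)   -> "SC:12"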
//-----------------------------------------------------------------------------
var blockStoreKey = []byte("blockStore")
type BlockStoreStateJSON struct {
Height int64 `json:"height"`
}
// Save persists the blockStore state to the database as JSON.
func (bsj BlockStoreStateJSON) Save(db dbm.DB) {
bytes, err := cdc.MarshalJSON(bsj)
if err != nil {
cmn.PanicSanity(fmt.Sprintf("Could not marshal state bytes: %v", err))
}
db.SetSync(blockStoreKey, bytes)
}
// LoadBlockStoreStateJSON returns the BlockStoreStateJSON as loaded from disk.
// If no BlockStoreStateJSON was previously persisted, it returns the zero value.
func LoadBlockStoreStateJSON(db dbm.DB) BlockStoreStateJSON {
bytes := db.Get(blockStoreKey)
if len(bytes) == 0 {
return BlockStoreStateJSON{
Height: 0,
}
}
bsj := BlockStoreStateJSON{}
err := cdc.UnmarshalJSON(bytes, &bsj)
if err != nil {
panic(fmt.Sprintf("Could not unmarshal bytes: %X", bytes))
}
return bsj
}
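// Persistence sketch (illustrative, not part of the original file): the
// descriptor round-trips through the DB as JSON under blockStoreKey.
func exampleBlockStoreStateRoundTrip() {
	db := dbm.NewMemDB()
	BlockStoreStateJSON{Height: 42}.Save(db)
	if bsj := LoadBlockStoreStateJSON(db); bsj.Height != 42 {
		panic("expected the persisted height to round-trip")
	}
}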

View File

@ -1,421 +0,0 @@
package blockchain_old
import (
"bytes"
"fmt"
"os"
"runtime/debug"
"strings"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
cfg "github.com/tendermint/tendermint/config"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/db"
dbm "github.com/tendermint/tendermint/libs/db"
"github.com/tendermint/tendermint/libs/log"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
tmtime "github.com/tendermint/tendermint/types/time"
)
// A cleanupFunc cleans up any config / test files created for a particular
// test.
type cleanupFunc func()
// make a Commit with a single vote containing just the height and a timestamp
func makeTestCommit(height int64, timestamp time.Time) *types.Commit {
commitSigs := []*types.CommitSig{{Height: height, Timestamp: timestamp}}
return types.NewCommit(types.BlockID{}, commitSigs)
}
func makeStateAndBlockStore(logger log.Logger) (sm.State, *BlockStore, cleanupFunc) {
config := cfg.ResetTestRoot("blockchain_reactor_test")
// blockDB := dbm.NewDebugDB("blockDB", dbm.NewMemDB())
// stateDB := dbm.NewDebugDB("stateDB", dbm.NewMemDB())
blockDB := dbm.NewMemDB()
stateDB := dbm.NewMemDB()
state, err := sm.LoadStateFromDBOrGenesisFile(stateDB, config.GenesisFile())
if err != nil {
panic(cmn.ErrorWrap(err, "error constructing state from genesis file"))
}
return state, NewBlockStore(blockDB), func() { os.RemoveAll(config.RootDir) }
}
func TestLoadBlockStoreStateJSON(t *testing.T) {
db := db.NewMemDB()
bsj := &BlockStoreStateJSON{Height: 1000}
bsj.Save(db)
retrBSJ := LoadBlockStoreStateJSON(db)
assert.Equal(t, *bsj, retrBSJ, "expected the retrieved DBs to match")
}
func TestNewBlockStore(t *testing.T) {
db := db.NewMemDB()
db.Set(blockStoreKey, []byte(`{"height": "10000"}`))
bs := NewBlockStore(db)
require.Equal(t, int64(10000), bs.Height(), "failed to properly parse blockstore")
panicCausers := []struct {
data []byte
wantErr string
}{
{[]byte("artful-doger"), "not unmarshal bytes"},
{[]byte(" "), "unmarshal bytes"},
}
for i, tt := range panicCausers {
// Expecting a panic here on trying to parse an invalid blockStore
_, _, panicErr := doFn(func() (interface{}, error) {
db.Set(blockStoreKey, tt.data)
_ = NewBlockStore(db)
return nil, nil
})
require.NotNil(t, panicErr, "#%d panicCauser: %q expected a panic", i, tt.data)
assert.Contains(t, fmt.Sprintf("%#v", panicErr), tt.wantErr, "#%d data: %q", i, tt.data)
}
db.Set(blockStoreKey, nil)
bs = NewBlockStore(db)
assert.Equal(t, bs.Height(), int64(0), "expecting nil bytes to be unmarshaled to a zero height")
}
func freshBlockStore() (*BlockStore, db.DB) {
db := db.NewMemDB()
return NewBlockStore(db), db
}
var (
state sm.State
block *types.Block
partSet *types.PartSet
part1 *types.Part
part2 *types.Part
seenCommit1 *types.Commit
)
func TestMain(m *testing.M) {
var cleanup cleanupFunc
state, _, cleanup = makeStateAndBlockStore(log.NewTMLogger(new(bytes.Buffer)))
block = makeBlock(1, state, new(types.Commit))
partSet = block.MakePartSet(2)
part1 = partSet.GetPart(0)
part2 = partSet.GetPart(1)
seenCommit1 = makeTestCommit(10, tmtime.Now())
code := m.Run()
cleanup()
os.Exit(code)
}
// TODO: This test should be simplified ...
func TestBlockStoreSaveLoadBlock(t *testing.T) {
state, bs, cleanup := makeStateAndBlockStore(log.NewTMLogger(new(bytes.Buffer)))
defer cleanup()
require.Equal(t, bs.Height(), int64(0), "initially the height should be zero")
// check there are no blocks at various heights
noBlockHeights := []int64{0, -1, 100, 1000, 2}
for i, height := range noBlockHeights {
if g := bs.LoadBlock(height); g != nil {
t.Errorf("#%d: height(%d) got a block; want nil", i, height)
}
}
// save a block
block := makeBlock(bs.Height()+1, state, new(types.Commit))
validPartSet := block.MakePartSet(2)
seenCommit := makeTestCommit(10, tmtime.Now())
bs.SaveBlock(block, validPartSet, seenCommit)
require.Equal(t, bs.Height(), block.Header.Height, "expecting the new height to be changed")
incompletePartSet := types.NewPartSetFromHeader(types.PartSetHeader{Total: 2})
uncontiguousPartSet := types.NewPartSetFromHeader(types.PartSetHeader{Total: 0})
uncontiguousPartSet.AddPart(part2)
header1 := types.Header{
Height: 1,
NumTxs: 100,
ChainID: "block_test",
Time: tmtime.Now(),
}
header2 := header1
header2.Height = 4
// End of setup, test data
commitAtH10 := makeTestCommit(10, tmtime.Now())
tuples := []struct {
block *types.Block
parts *types.PartSet
seenCommit *types.Commit
wantErr bool
wantPanic string
corruptBlockInDB bool
corruptCommitInDB bool
corruptSeenCommitInDB bool
eraseCommitInDB bool
eraseSeenCommitInDB bool
}{
{
block: newBlock(header1, commitAtH10),
parts: validPartSet,
seenCommit: seenCommit1,
},
{
block: nil,
wantPanic: "only save a non-nil block",
},
{
block: newBlock(header2, commitAtH10),
parts: uncontiguousPartSet,
wantPanic: "only save contiguous blocks", // and incomplete and uncontiguous parts
},
{
block: newBlock(header1, commitAtH10),
parts: incompletePartSet,
wantPanic: "only save complete block", // incomplete parts
},
{
block: newBlock(header1, commitAtH10),
parts: validPartSet,
seenCommit: seenCommit1,
corruptCommitInDB: true, // Corrupt the DB's commit entry
wantPanic: "unmarshal to types.Commit failed",
},
{
block: newBlock(header1, commitAtH10),
parts: validPartSet,
seenCommit: seenCommit1,
wantPanic: "unmarshal to types.BlockMeta failed",
corruptBlockInDB: true, // Corrupt the DB's block entry
},
{
block: newBlock(header1, commitAtH10),
parts: validPartSet,
seenCommit: seenCommit1,
// Expecting no error and we want a nil back
eraseSeenCommitInDB: true,
},
{
block: newBlock(header1, commitAtH10),
parts: validPartSet,
seenCommit: seenCommit1,
corruptSeenCommitInDB: true,
wantPanic: "unmarshal to types.Commit failed",
},
{
block: newBlock(header1, commitAtH10),
parts: validPartSet,
seenCommit: seenCommit1,
// Expecting no error and we want a nil back
eraseCommitInDB: true,
},
}
type quad struct {
block *types.Block
commit *types.Commit
meta *types.BlockMeta
seenCommit *types.Commit
}
for i, tuple := range tuples {
bs, db := freshBlockStore()
// SaveBlock
res, err, panicErr := doFn(func() (interface{}, error) {
bs.SaveBlock(tuple.block, tuple.parts, tuple.seenCommit)
if tuple.block == nil {
return nil, nil
}
if tuple.corruptBlockInDB {
db.Set(calcBlockMetaKey(tuple.block.Height), []byte("block-bogus"))
}
bBlock := bs.LoadBlock(tuple.block.Height)
bBlockMeta := bs.LoadBlockMeta(tuple.block.Height)
if tuple.eraseSeenCommitInDB {
db.Delete(calcSeenCommitKey(tuple.block.Height))
}
if tuple.corruptSeenCommitInDB {
db.Set(calcSeenCommitKey(tuple.block.Height), []byte("bogus-seen-commit"))
}
bSeenCommit := bs.LoadSeenCommit(tuple.block.Height)
commitHeight := tuple.block.Height - 1
if tuple.eraseCommitInDB {
db.Delete(calcBlockCommitKey(commitHeight))
}
if tuple.corruptCommitInDB {
db.Set(calcBlockCommitKey(commitHeight), []byte("foo-bogus"))
}
bCommit := bs.LoadBlockCommit(commitHeight)
return &quad{block: bBlock, seenCommit: bSeenCommit, commit: bCommit,
meta: bBlockMeta}, nil
})
if subStr := tuple.wantPanic; subStr != "" {
if panicErr == nil {
t.Errorf("#%d: want a non-nil panic", i)
} else if got := fmt.Sprintf("%#v", panicErr); !strings.Contains(got, subStr) {
t.Errorf("#%d:\n\tgotErr: %q\nwant substring: %q", i, got, subStr)
}
continue
}
if tuple.wantErr {
if err == nil {
t.Errorf("#%d: got nil error", i)
}
continue
}
assert.Nil(t, panicErr, "#%d: unexpected panic", i)
assert.Nil(t, err, "#%d: expecting a non-nil error", i)
qua, ok := res.(*quad)
if !ok || qua == nil {
t.Errorf("#%d: got nil quad back; gotType=%T", i, res)
continue
}
if tuple.eraseSeenCommitInDB {
assert.Nil(t, qua.seenCommit,
"erased the seenCommit in the DB hence we should get back a nil seenCommit")
}
if tuple.eraseCommitInDB {
assert.Nil(t, qua.commit,
"erased the commit in the DB hence we should get back a nil commit")
}
}
}
func TestLoadBlockPart(t *testing.T) {
bs, db := freshBlockStore()
height, index := int64(10), 1
loadPart := func() (interface{}, error) {
part := bs.LoadBlockPart(height, index)
return part, nil
}
// Initially no contents.
// 1. Requesting a non-existent block part shouldn't fail
res, _, panicErr := doFn(loadPart)
require.Nil(t, panicErr, "a non-existent block part shouldn't cause a panic")
require.Nil(t, res, "a non-existent block part should return nil")
// 2. Next save a corrupted block then try to load it
db.Set(calcBlockPartKey(height, index), []byte("Tendermint"))
res, _, panicErr = doFn(loadPart)
require.NotNil(t, panicErr, "expecting a non-nil panic")
require.Contains(t, panicErr.Error(), "unmarshal to types.Part failed")
// 3. A good block serialized and saved to the DB should be retrievable
db.Set(calcBlockPartKey(height, index), cdc.MustMarshalBinaryBare(part1))
gotPart, _, panicErr := doFn(loadPart)
require.Nil(t, panicErr, "an existent and proper block should not panic")
require.Nil(t, res, "a properly saved block should return a proper block")
require.Equal(t, gotPart.(*types.Part), part1,
"expecting successful retrieval of previously saved block")
}
func TestLoadBlockMeta(t *testing.T) {
bs, db := freshBlockStore()
height := int64(10)
loadMeta := func() (interface{}, error) {
meta := bs.LoadBlockMeta(height)
return meta, nil
}
// Initially no contents.
// 1. Requesting a non-existent blockMeta shouldn't fail
res, _, panicErr := doFn(loadMeta)
require.Nil(t, panicErr, "a non-existent blockMeta shouldn't cause a panic")
require.Nil(t, res, "a non-existent blockMeta should return nil")
// 2. Next save a corrupted blockMeta then try to load it
db.Set(calcBlockMetaKey(height), []byte("Tendermint-Meta"))
res, _, panicErr = doFn(loadMeta)
require.NotNil(t, panicErr, "expecting a non-nil panic")
require.Contains(t, panicErr.Error(), "unmarshal to types.BlockMeta")
// 3. A good blockMeta serialized and saved to the DB should be retrievable
meta := &types.BlockMeta{}
db.Set(calcBlockMetaKey(height), cdc.MustMarshalBinaryBare(meta))
gotMeta, _, panicErr := doFn(loadMeta)
require.Nil(t, panicErr, "an existent and proper block should not panic")
require.Nil(t, res, "a properly saved blockMeta should return a proper blocMeta ")
require.Equal(t, cdc.MustMarshalBinaryBare(meta), cdc.MustMarshalBinaryBare(gotMeta),
"expecting successful retrieval of previously saved blockMeta")
}
func TestBlockFetchAtHeight(t *testing.T) {
state, bs, cleanup := makeStateAndBlockStore(log.NewTMLogger(new(bytes.Buffer)))
defer cleanup()
require.Equal(t, bs.Height(), int64(0), "initially the height should be zero")
block := makeBlock(bs.Height()+1, state, new(types.Commit))
partSet := block.MakePartSet(2)
seenCommit := makeTestCommit(10, tmtime.Now())
bs.SaveBlock(block, partSet, seenCommit)
require.Equal(t, bs.Height(), block.Header.Height, "expecting the new height to be changed")
blockAtHeight := bs.LoadBlock(bs.Height())
bz1 := cdc.MustMarshalBinaryBare(block)
bz2 := cdc.MustMarshalBinaryBare(blockAtHeight)
require.Equal(t, bz1, bz2)
require.Equal(t, block.Hash(), blockAtHeight.Hash(),
"expecting a successful load of the last saved block")
blockAtHeightPlus1 := bs.LoadBlock(bs.Height() + 1)
require.Nil(t, blockAtHeightPlus1, "expecting an unsuccessful load of Height()+1")
blockAtHeightPlus2 := bs.LoadBlock(bs.Height() + 2)
require.Nil(t, blockAtHeightPlus2, "expecting an unsuccessful load of Height()+2")
}
func doFn(fn func() (interface{}, error)) (res interface{}, err error, panicErr error) {
defer func() {
if r := recover(); r != nil {
switch e := r.(type) {
case error:
panicErr = e
case string:
panicErr = fmt.Errorf("%s", e)
default:
if st, ok := r.(fmt.Stringer); ok {
panicErr = fmt.Errorf("%s", st)
} else {
panicErr = fmt.Errorf("%s", debug.Stack())
}
}
}
}()
res, err = fn()
return res, err, panicErr
}
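// Usage sketch (illustrative, not part of the original file): doFn converts a
// panic raised inside fn into a non-nil panicErr, which the tests above then
// assert against.
func exampleDoFn() {
	_, _, panicErr := doFn(func() (interface{}, error) {
		panic("boom")
	})
	if panicErr == nil {
		panic("expected doFn to capture the panic")
	}
}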
func newBlock(hdr types.Header, lastCommit *types.Commit) *types.Block {
return &types.Block{
Header: hdr,
LastCommit: lastCommit,
}
}

View File

@ -1,13 +0,0 @@
package blockchain_old
import (
amino "github.com/tendermint/go-amino"
"github.com/tendermint/tendermint/types"
)
var cdc = amino.NewCodec()
func init() {
RegisterBlockchainMessages(cdc)
types.RegisterBlockAmino(cdc)
}
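// Codec sketch (illustrative, not part of the original file): with the message
// and block types registered in init above, cdc round-trips the structures
// that store.go persists, e.g. a block part.
func exampleCodecRoundTrip() {
	part := &types.Part{}
	bz := cdc.MustMarshalBinaryBare(part)
	back := new(types.Part)
	if err := cdc.UnmarshalBinaryBare(bz, back); err != nil {
		panic(err)
	}
}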