Initial commit

This commit is contained in:
Anca Zamfir 2019-03-26 09:58:30 +01:00
parent eb1573c683
commit 6428cba446
9 changed files with 2827 additions and 0 deletions

129
blockchain_new/peer.go Normal file
View File

@ -0,0 +1,129 @@
package blockchain_new
import (
"fmt"
"math"
"time"
flow "github.com/tendermint/tendermint/libs/flowrate"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/p2p"
)
//--------
// Peer
var (
peerTimeout = 15 * time.Second // not const so we can override with tests
// Minimum recv rate to ensure we're receiving blocks from a peer fast
// enough. If a peer is not sending us data at at least that rate, we
// consider them to have timedout and we disconnect.
//
// Assuming a DSL connection (not a good choice) 128 Kbps (upload) ~ 15 KB/s,
// sending data across atlantic ~ 7.5 KB/s.
minRecvRate = int64(7680)
// Monitor parameters
peerSampleRate = time.Second
peerWindowSize = 40 * peerSampleRate
)
type bpPeer struct {
id p2p.ID
recvMonitor *flow.Monitor
height int64
numPending int32
timeout *time.Timer
didTimeout bool
logger log.Logger
errFunc func(err error, peerID p2p.ID) // function to call on error
}
func newBPPeer(
peerID p2p.ID, height int64, errFunc func(err error, peerID p2p.ID)) *bpPeer {
peer := &bpPeer{
id: peerID,
height: height,
numPending: 0,
logger: log.NewNopLogger(),
errFunc: errFunc,
}
return peer
}
func (peer *bpPeer) setLogger(l log.Logger) {
peer.logger = l
}
func (peer *bpPeer) resetMonitor() {
peer.recvMonitor = flow.New(peerSampleRate, peerWindowSize)
initialValue := float64(minRecvRate) * math.E
peer.recvMonitor.SetREMA(initialValue)
}
func (peer *bpPeer) resetTimeout() {
if peer.timeout == nil {
peer.timeout = time.AfterFunc(peerTimeout, peer.onTimeout)
} else {
peer.timeout.Reset(peerTimeout)
}
}
func (peer *bpPeer) incrPending() {
if peer.numPending == 0 {
peer.resetMonitor()
peer.resetTimeout()
}
peer.numPending++
}
func (peer *bpPeer) decrPending(recvSize int) {
if peer.numPending == 0 {
panic("cannot decrement, peer does not have pending requests")
}
peer.numPending--
if peer.numPending == 0 {
peer.timeout.Stop()
} else {
peer.recvMonitor.Update(recvSize)
peer.resetTimeout()
}
}
func (peer *bpPeer) onTimeout() {
peer.errFunc(errNoPeerResponse, peer.id)
peer.logger.Error("SendTimeout", "reason", errNoPeerResponse, "timeout", peerTimeout)
peer.didTimeout = true
}
func (peer *bpPeer) isPeerGood() error {
if peer.didTimeout {
return errNoPeerResponse
}
if !peer.didTimeout && peer.numPending > 0 {
curRate := peer.recvMonitor.Status().CurRate
// curRate can be 0 on start
if curRate != 0 && curRate < minRecvRate {
err := errSlowPeer
peer.logger.Error("SendTimeout", "peer", peer.id,
"reason", err,
"curRate", fmt.Sprintf("%d KB/s", curRate/1024),
"minRate", fmt.Sprintf("%d KB/s", minRecvRate/1024))
// consider the peer timedout
peer.didTimeout = true
return errSlowPeer
}
}
return nil
}
func (peer *bpPeer) cleanup() {
if peer.timeout != nil {
peer.timeout.Stop()
}
}

224
blockchain_new/peer_test.go Normal file
View File

@ -0,0 +1,224 @@
package blockchain_new
import (
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/p2p"
)
var (
numErrFuncCalls int
lastErr error
testLog = log.TestingLogger()
mtx sync.Mutex
)
func resetErrors() {
numErrFuncCalls = 0
lastErr = nil
}
func errFunc(err error, peerID p2p.ID) {
_ = peerID
lastErr = err
numErrFuncCalls++
}
// check if peer timer is running or not (a running timer can be successfully stopped)
// Note: it does stop the timer!
func checkByStoppingPeerTimer(t *testing.T, peer *bpPeer, running bool) {
assert.NotPanics(t, func() {
stopped := peer.timeout.Stop()
if running {
assert.True(t, stopped)
} else {
assert.False(t, stopped)
}
})
}
func TestPeerResetMonitor(t *testing.T) {
peer := &bpPeer{
id: p2p.ID(cmn.RandStr(12)),
height: 10,
logger: testLog,
errFunc: errFunc,
}
peer.resetMonitor()
assert.NotNil(t, peer.recvMonitor)
}
func TestPeerTimer(t *testing.T) {
peerTimeout = 2 * time.Millisecond
peer := &bpPeer{
id: p2p.ID(cmn.RandStr(12)),
height: 10,
logger: testLog,
errFunc: errFunc,
}
assert.Nil(t, peer.timeout)
// initial reset call with peer having a nil timer
peer.resetTimeout()
assert.NotNil(t, peer.timeout)
// make sure timer is running and stop it
checkByStoppingPeerTimer(t, peer, true)
// reset with non nil expired timer
peer.resetTimeout()
assert.NotNil(t, peer.timeout)
// make sure timer is running and stop it
checkByStoppingPeerTimer(t, peer, true)
resetErrors()
// reset with running timer (started above)
time.Sleep(time.Millisecond)
peer.resetTimeout()
assert.NotNil(t, peer.timeout)
// let the timer expire and ...
time.Sleep(3 * time.Millisecond)
checkByStoppingPeerTimer(t, peer, false)
// ... check an error has been sent, error is peerNonResponsive
assert.Equal(t, 1, numErrFuncCalls)
assert.Equal(t, lastErr, errNoPeerResponse)
assert.True(t, peer.didTimeout)
}
func TestIncrPending(t *testing.T) {
peerTimeout = 2 * time.Millisecond
peer := &bpPeer{
id: p2p.ID(cmn.RandStr(12)),
height: 10,
logger: testLog,
errFunc: errFunc,
}
peer.incrPending()
assert.NotNil(t, peer.recvMonitor)
assert.NotNil(t, peer.timeout)
assert.Equal(t, int32(1), peer.numPending)
peer.incrPending()
assert.NotNil(t, peer.recvMonitor)
assert.NotNil(t, peer.timeout)
assert.Equal(t, int32(2), peer.numPending)
}
func TestDecrPending(t *testing.T) {
peerTimeout = 2 * time.Millisecond
peer := &bpPeer{
id: p2p.ID(cmn.RandStr(12)),
height: 10,
logger: testLog,
errFunc: errFunc,
}
// panic if numPending is 0 and try to decrement it
assert.Panics(t, func() { peer.decrPending(10) })
// decrement to zero
peer.incrPending()
peer.decrPending(10)
assert.Equal(t, int32(0), peer.numPending)
// make sure timer is not running
checkByStoppingPeerTimer(t, peer, false)
// decrement to non zero
peer.incrPending()
peer.incrPending()
peer.decrPending(10)
assert.Equal(t, int32(1), peer.numPending)
// make sure timer is running and stop it
checkByStoppingPeerTimer(t, peer, true)
}
func TestCanBeRemovedDueToExpiration(t *testing.T) {
minRecvRate = int64(100) // 100 bytes/sec exponential moving average
peer := &bpPeer{
id: p2p.ID(cmn.RandStr(12)),
height: 10,
errFunc: errFunc,
logger: testLog,
}
peerTimeout = time.Millisecond
peer.incrPending()
time.Sleep(2 * time.Millisecond)
// timer expired, should be able to remove peer
assert.Equal(t, errNoPeerResponse, peer.isPeerGood())
}
func TestCanBeRemovedDueToLowSpeed(t *testing.T) {
minRecvRate = int64(100) // 100 bytes/sec exponential moving average
peer := &bpPeer{
id: p2p.ID(cmn.RandStr(12)),
height: 10,
errFunc: errFunc,
logger: testLog,
}
peerTimeout = time.Second
peerSampleRate = 0
peerWindowSize = 0
peer.incrPending()
peer.numPending = 100
// monitor starts with a higher rEMA (~ 2*minRecvRate), wait for it to go down
time.Sleep(900 * time.Millisecond)
// normal peer - send a bit more than 100 byes/sec, > 10 byes/100msec, check peer is not considered slow
for i := 0; i < 10; i++ {
peer.decrPending(11)
time.Sleep(100 * time.Millisecond)
require.Nil(t, peer.isPeerGood())
}
// slow peer - send a bit less than 10 byes/100msec
for i := 0; i < 10; i++ {
peer.decrPending(9)
time.Sleep(100 * time.Millisecond)
}
// check peer is considered slow
assert.Equal(t, errSlowPeer, peer.isPeerGood())
}
func TestCleanupPeer(t *testing.T) {
peer := &bpPeer{
id: p2p.ID(cmn.RandStr(12)),
height: 10,
logger: testLog,
errFunc: errFunc,
}
peerTimeout = 2 * time.Millisecond
assert.Nil(t, peer.timeout)
// initial reset call with peer having a nil timer
peer.resetTimeout()
assert.NotNil(t, peer.timeout)
mtx.Lock()
peer.cleanup()
mtx.Unlock()
checkByStoppingPeerTimer(t, peer, false)
}

445
blockchain_new/reactor.go Normal file
View File

@ -0,0 +1,445 @@
package blockchain_new
import (
"errors"
"fmt"
"github.com/tendermint/go-amino"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/p2p"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
"reflect"
"time"
)
const (
// BlockchainChannel is a channel for blocks and status updates (`BlockStore` height)
BlockchainChannel = byte(0x40)
maxTotalMessages = 1000
// NOTE: keep up to date with bcBlockResponseMessage
bcBlockResponseMessagePrefixSize = 4
bcBlockResponseMessageFieldKeySize = 1
maxMsgSize = types.MaxBlockSizeBytes +
bcBlockResponseMessagePrefixSize +
bcBlockResponseMessageFieldKeySize
)
type consensusReactor interface {
// for when we switch from blockchain reactor and fast sync to
// the consensus machine
SwitchToConsensus(sm.State, int)
}
type peerError struct {
err error
peerID p2p.ID
}
func (e peerError) Error() string {
return fmt.Sprintf("error with peer %v: %s", e.peerID, e.err.Error())
}
type bReactorMsgFromFSM uint
// message types
const (
// message type events
errorMsg = iota + 1
//blockRequestMsg
//statusRequestMsg
)
type msgFromFSM struct {
msgType bReactorMsgFromFSM
error peerError
}
// BlockchainReactor handles long-term catchup syncing.
type BlockchainReactor struct {
p2p.BaseReactor
// immutable
initialState sm.State
state sm.State
blockExec *sm.BlockExecutor
fastSync bool
fsm *bReactorFSM
blocksSynced int
lastHundred time.Time
lastRate float64
msgFromFSMCh chan msgFromFSM
}
// NewBlockchainReactor returns new reactor instance.
func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *BlockStore,
fastSync bool) *BlockchainReactor {
if state.LastBlockHeight != store.Height() {
panic(fmt.Sprintf("state (%v) and store (%v) height mismatch", state.LastBlockHeight,
store.Height()))
}
bcR := &BlockchainReactor{
initialState: state,
state: state,
blockExec: blockExec,
fastSync: fastSync,
}
fsm := NewFSM(store, bcR)
bcR.fsm = fsm
bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR)
return bcR
}
// SetLogger implements cmn.Service by setting the logger on reactor and pool.
func (bcR *BlockchainReactor) SetLogger(l log.Logger) {
bcR.BaseService.Logger = l
bcR.fsm.setLogger(l)
}
// OnStart implements cmn.Service.
func (bcR *BlockchainReactor) OnStart() error {
if bcR.fastSync {
bcR.fsm.start()
go bcR.poolRoutine()
}
return nil
}
// OnStop implements cmn.Service.
func (bcR *BlockchainReactor) OnStop() {
bcR.fsm.stop()
}
// GetChannels implements Reactor
func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
return []*p2p.ChannelDescriptor{
{
ID: BlockchainChannel,
Priority: 10,
SendQueueCapacity: 1000,
RecvBufferCapacity: 50 * 4096,
RecvMessageCapacity: maxMsgSize,
},
}
}
// AddPeer implements Reactor by sending our state to peer.
func (bcR *BlockchainReactor) AddPeer(peer p2p.Peer) {
msgBytes := cdc.MustMarshalBinaryBare(&bcStatusResponseMessage{bcR.fsm.store.Height()})
if !peer.Send(BlockchainChannel, msgBytes) {
// doing nothing, will try later in `poolRoutine`
}
// peer is added to the pool once we receive the first
// bcStatusResponseMessage from the peer and call pool.SetPeerHeight
}
// RemovePeer implements Reactor by removing peer from the pool.
func (bcR *BlockchainReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
bcR.fsm.RemovePeer(peer.ID())
}
// respondToPeer loads a block and sends it to the requesting peer,
// if we have it. Otherwise, we'll respond saying we don't have it.
// According to the Tendermint spec, if all nodes are honest,
// no node should be requesting for a block that's non-existent.
func (bcR *BlockchainReactor) respondToPeer(msg *bcBlockRequestMessage,
src p2p.Peer) (queued bool) {
block := bcR.fsm.store.LoadBlock(msg.Height)
if block != nil {
msgBytes := cdc.MustMarshalBinaryBare(&bcBlockResponseMessage{Block: block})
return src.TrySend(BlockchainChannel, msgBytes)
}
bcR.Logger.Info("Peer asking for a block we don't have", "src", src, "height", msg.Height)
msgBytes := cdc.MustMarshalBinaryBare(&bcNoBlockResponseMessage{Height: msg.Height})
return src.TrySend(BlockchainChannel, msgBytes)
}
// Receive implements Reactor by handling 4 types of messages (look below).
func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
msg, err := decodeMsg(msgBytes)
if err != nil {
bcR.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes)
bcR.Switch.StopPeerForError(src, err)
return
}
if err = msg.ValidateBasic(); err != nil {
bcR.Logger.Error("Peer sent us invalid msg", "peer", src, "msg", msg, "err", err)
bcR.Switch.StopPeerForError(src, err)
return
}
bcR.Logger.Debug("Receive", "src", src, "chID", chID, "msg", msg)
switch msg := msg.(type) {
case *bcBlockRequestMessage:
if queued := bcR.respondToPeer(msg, src); !queued {
// Unfortunately not queued since the queue is full.
}
case *bcBlockResponseMessage:
msgData := bReactorMessageData{
event: blockResponseEv,
data: bReactorEventData{
peerId: src.ID(),
block: msg.Block,
length: len(msgBytes),
},
}
sendMessageToFSM(bcR.fsm, msgData)
case *bcStatusRequestMessage:
// Send peer our state.
msgBytes := cdc.MustMarshalBinaryBare(&bcStatusResponseMessage{bcR.fsm.store.Height()})
queued := src.TrySend(BlockchainChannel, msgBytes)
if !queued {
// sorry
}
case *bcStatusResponseMessage:
// Got a peer status. Unverified.
msgData := bReactorMessageData{
event: statusResponseEv,
data: bReactorEventData{
peerId: src.ID(),
height: msg.Height,
length: len(msgBytes),
},
}
sendMessageToFSM(bcR.fsm, msgData)
default:
bcR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
}
}
func (bcR *BlockchainReactor) processBlocks(first *types.Block, second *types.Block) error {
chainID := bcR.initialState.ChainID
firstParts := first.MakePartSet(types.BlockPartSizeBytes)
firstPartsHeader := firstParts.Header()
firstID := types.BlockID{Hash: first.Hash(), PartsHeader: firstPartsHeader}
// Finally, verify the first block using the second's commit
// NOTE: we can probably make this more efficient, but note that calling
// first.Hash() doesn't verify the tx contents, so MakePartSet() is
// currently necessary.
err := bcR.state.Validators.VerifyCommit(
chainID, firstID, first.Height, second.LastCommit)
if err != nil {
bcR.Logger.Error("Error in validation", "err", err, first.Height, second.Height)
peerID := bcR.fsm.blocks[first.Height].peerId
peer := bcR.Switch.Peers().Get(peerID)
if peer != nil {
bcR.Switch.StopPeerForError(peer, fmt.Errorf("BlockchainReactor validation error: %v", err))
}
peerID = bcR.fsm.blocks[second.Height].peerId
peer = bcR.Switch.Peers().Get(peerID)
if peer != nil {
bcR.Switch.StopPeerForError(peer, fmt.Errorf("BlockchainReactor validation error: %v", err))
}
return errBlockVerificationFailure
}
bcR.fsm.store.SaveBlock(first, firstParts, second.LastCommit)
// get the hash without persisting the state
bcR.state, err = bcR.blockExec.ApplyBlock(bcR.state, firstID, first)
if err != nil {
// TODO This is bad, are we zombie?
panic(fmt.Sprintf("Failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err))
}
bcR.blocksSynced++
if bcR.blocksSynced%100 == 0 {
bcR.lastRate = 0.9*bcR.lastRate + 0.1*(100/time.Since(bcR.lastHundred).Seconds())
bcR.Logger.Info("Fast Sync Rate", "height", bcR.fsm.height,
"max_peer_height", bcR.fsm.getMaxPeerHeight(), "blocks/s", bcR.lastRate)
bcR.lastHundred = time.Now()
}
return nil
}
// Handle messages from the poolReactor telling the reactor what to do.
// NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
func (bcR *BlockchainReactor) poolRoutine() {
for {
select {
case fromFSM := <-bcR.msgFromFSMCh:
switch fromFSM.msgType {
case errorMsg:
peer := bcR.Switch.Peers().Get(fromFSM.error.peerID)
if peer != nil {
bcR.Switch.StopPeerForError(peer, fromFSM.error.err)
}
}
}
}
}
func (bcR *BlockchainReactor) sendPeerError(err error, peerID p2p.ID) {
peer := bcR.Switch.Peers().Get(peerID)
if peer != nil {
bcR.Switch.StopPeerForError(peer, err)
}
}
func (bcR *BlockchainReactor) resetStateTimer(name string, timer *time.Timer, timeout time.Duration, f func()) {
if timer == nil {
timer = time.AfterFunc(timeout, f)
} else {
timer.Reset(timeout)
}
}
// BroadcastStatusRequest broadcasts `BlockStore` height.
func (bcR *BlockchainReactor) sendStatusRequest() error {
msgBytes := cdc.MustMarshalBinaryBare(&bcStatusRequestMessage{bcR.fsm.store.Height()})
bcR.Switch.Broadcast(BlockchainChannel, msgBytes)
return nil
}
// BlockRequest sends `BlockRequest` height.
func (bcR *BlockchainReactor) sendBlockRequest(peerID p2p.ID, height int64) error {
peer := bcR.Switch.Peers().Get(peerID)
if peer == nil {
return errNilPeerForBlockRequest
}
msgBytes := cdc.MustMarshalBinaryBare(&bcBlockRequestMessage{height})
queued := peer.TrySend(BlockchainChannel, msgBytes)
if !queued {
return errSendQueueFull
}
return nil
}
func (bcR *BlockchainReactor) switchToConsensus() {
conR, ok := bcR.Switch.Reactor("CONSENSUS").(consensusReactor)
if ok {
conR.SwitchToConsensus(bcR.state, bcR.blocksSynced)
} else {
// should only happen during testing
}
}
//-----------------------------------------------------------------------------
// Messages
// BlockchainMessage is a generic message for this reactor.
type BlockchainMessage interface {
ValidateBasic() error
}
func RegisterBlockchainMessages(cdc *amino.Codec) {
cdc.RegisterInterface((*BlockchainMessage)(nil), nil)
cdc.RegisterConcrete(&bcBlockRequestMessage{}, "tendermint/blockchain/BlockRequest", nil)
cdc.RegisterConcrete(&bcBlockResponseMessage{}, "tendermint/blockchain/BlockResponse", nil)
cdc.RegisterConcrete(&bcNoBlockResponseMessage{}, "tendermint/blockchain/NoBlockResponse", nil)
cdc.RegisterConcrete(&bcStatusResponseMessage{}, "tendermint/blockchain/StatusResponse", nil)
cdc.RegisterConcrete(&bcStatusRequestMessage{}, "tendermint/blockchain/StatusRequest", nil)
}
func decodeMsg(bz []byte) (msg BlockchainMessage, err error) {
if len(bz) > maxMsgSize {
return msg, fmt.Errorf("msg exceeds max size (%d > %d)", len(bz), maxMsgSize)
}
err = cdc.UnmarshalBinaryBare(bz, &msg)
return
}
//-------------------------------------
type bcBlockRequestMessage struct {
Height int64
}
// ValidateBasic performs basic validation.
func (m *bcBlockRequestMessage) ValidateBasic() error {
if m.Height < 0 {
return errors.New("negative Height")
}
return nil
}
func (m *bcBlockRequestMessage) String() string {
return fmt.Sprintf("[bcBlockRequestMessage %v]", m.Height)
}
type bcNoBlockResponseMessage struct {
Height int64
}
// ValidateBasic performs basic validation.
func (m *bcNoBlockResponseMessage) ValidateBasic() error {
if m.Height < 0 {
return errors.New("negative Height")
}
return nil
}
func (m *bcNoBlockResponseMessage) String() string {
return fmt.Sprintf("[bcNoBlockResponseMessage %d]", m.Height)
}
//-------------------------------------
type bcBlockResponseMessage struct {
Block *types.Block
}
// ValidateBasic performs basic validation.
func (m *bcBlockResponseMessage) ValidateBasic() error {
return m.Block.ValidateBasic()
}
func (m *bcBlockResponseMessage) String() string {
return fmt.Sprintf("[bcBlockResponseMessage %v]", m.Block.Height)
}
//-------------------------------------
type bcStatusRequestMessage struct {
Height int64
}
// ValidateBasic performs basic validation.
func (m *bcStatusRequestMessage) ValidateBasic() error {
if m.Height < 0 {
return errors.New("negative Height")
}
return nil
}
func (m *bcStatusRequestMessage) String() string {
return fmt.Sprintf("[bcStatusRequestMessage %v]", m.Height)
}
//-------------------------------------
type bcStatusResponseMessage struct {
Height int64
}
// ValidateBasic performs basic validation.
func (m *bcStatusResponseMessage) ValidateBasic() error {
if m.Height < 0 {
return errors.New("negative Height")
}
return nil
}
func (m *bcStatusResponseMessage) String() string {
return fmt.Sprintf("[bcStatusResponseMessage %v]", m.Height)
}

View File

@ -0,0 +1,721 @@
package blockchain_new
import (
"errors"
"fmt"
"time"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/types"
)
var (
// should be >= 2
maxRequestBatchSize = 40
)
type blockData struct {
block *types.Block
peerId p2p.ID
}
// Blockchain Reactor State
type bReactorFSMState struct {
name string
// called when transitioning out of current state
handle func(*bReactorFSM, bReactorEvent, bReactorEventData) (next *bReactorFSMState, err error)
// called when entering the state
enter func(fsm *bReactorFSM)
// timer to ensure FSM is not stuck in a state forever
timer *time.Timer
timeout time.Duration
}
func (s *bReactorFSMState) String() string {
return s.name
}
// Blockchain Reactor State Machine
type bReactorFSM struct {
logger log.Logger
startTime time.Time
state *bReactorFSMState
blocks map[int64]*blockData
height int64 // processing height
lastRequestHeight int64
peers map[p2p.ID]*bpPeer
maxPeerHeight int64
store *BlockStore
// channel to receive messages
messageCh chan bReactorMessageData
// interface used to send StatusRequest, BlockRequest, errors
bcr sendMessage
}
// bReactorEventData is part of the message sent by the reactor to the FSM and used by the state handlers
type bReactorEventData struct {
peerId p2p.ID
err error // for peer error: timeout, slow,
height int64 // for status response
block *types.Block // for block response
stateName string // for state timeout events
length int // for block response to detect slow peers
}
// bReactorMessageData structure is used by the reactor when sending messages to the FSM.
type bReactorMessageData struct {
event bReactorEvent
data bReactorEventData
}
func (msg *bReactorMessageData) String() string {
var dataStr string
switch msg.event {
case startFSMEv:
dataStr = ""
case statusResponseEv:
dataStr = fmt.Sprintf("peer: %v height: %v", msg.data.peerId, msg.data.height)
case blockResponseEv:
dataStr = fmt.Sprintf("peer: %v block.height: %v lenght: %v", msg.data.peerId, msg.data.block.Height, msg.data.length)
case tryProcessBlockEv:
dataStr = ""
case stopFSMEv:
dataStr = ""
case peerErrEv:
dataStr = fmt.Sprintf("peer: %v err: %v", msg.data.peerId, msg.data.err)
case stateTimeoutEv:
dataStr = fmt.Sprintf("state: %v", msg.data.stateName)
default:
dataStr = fmt.Sprintf("cannot interpret message data")
return "event unknown"
}
return fmt.Sprintf("event: %v %v", msg.event, dataStr)
}
// Blockchain Reactor Events (the input to the state machine).
type bReactorEvent uint
const (
// message type events
startFSMEv = iota + 1
statusResponseEv
blockResponseEv
tryProcessBlockEv
stopFSMEv
// other events
peerErrEv = iota + 256
stateTimeoutEv
)
func (ev bReactorEvent) String() string {
switch ev {
case startFSMEv:
return "startFSMEv"
case statusResponseEv:
return "statusResponseEv"
case blockResponseEv:
return "blockResponseEv"
case tryProcessBlockEv:
return "tryProcessBlockEv"
case stopFSMEv:
return "stopFSMEv"
case peerErrEv:
return "peerErrEv"
case stateTimeoutEv:
return "stateTimeoutEv"
default:
return "event unknown"
}
}
// states
var (
unknown *bReactorFSMState
waitForPeer *bReactorFSMState
waitForBlock *bReactorFSMState
finished *bReactorFSMState
)
// state timers
var (
waitForPeerTimeout = 20 * time.Second
waitForBlockTimeout = 30 * time.Second // > peerTimeout which is 15 sec
)
// errors
var (
// errors
errInvalidEvent = errors.New("invalid event in current state")
errNoErrorFinished = errors.New("FSM is finished")
errNoPeerResponse = errors.New("FSM timed out on peer response")
errNoPeerFoundForRequest = errors.New("cannot use peer")
errBadDataFromPeer = errors.New("received from wrong peer or bad block")
errMissingBlocks = errors.New("missing blocks")
errBlockVerificationFailure = errors.New("block verification failure, redo")
errNilPeerForBlockRequest = errors.New("nil peer for block request")
errSendQueueFull = errors.New("block request not made, send-queue is full")
errPeerTooShort = errors.New("peer height too low, peer was either not added " +
"or removed after status update")
errSwitchPeerErr = errors.New("switch detected peer error")
errSlowPeer = errors.New("peer is not sending us data fast enough")
)
func init() {
unknown = &bReactorFSMState{
name: "unknown",
handle: func(fsm *bReactorFSM, ev bReactorEvent, data bReactorEventData) (*bReactorFSMState, error) {
switch ev {
case startFSMEv:
// Broadcast Status message. Currently doesn't return non-nil error.
_ = fsm.bcr.sendStatusRequest()
if fsm.state.timer != nil {
fsm.state.timer.Stop()
}
return waitForPeer, nil
case stopFSMEv:
// cleanup
return finished, errNoErrorFinished
default:
return unknown, errInvalidEvent
}
},
}
waitForPeer = &bReactorFSMState{
name: "waitForPeer",
timeout: waitForPeerTimeout,
enter: func(fsm *bReactorFSM) {
// stop when leaving the state
fsm.resetStateTimer(waitForPeer)
},
handle: func(fsm *bReactorFSM, ev bReactorEvent, data bReactorEventData) (*bReactorFSMState, error) {
switch ev {
case stateTimeoutEv:
// no statusResponse received from any peer
// Should we send status request again?
if fsm.state.timer != nil {
fsm.state.timer.Stop()
}
return finished, errNoPeerResponse
case statusResponseEv:
// update peer
if err := fsm.setPeerHeight(data.peerId, data.height); err != nil {
if len(fsm.peers) == 0 {
return waitForPeer, err
}
}
// send first block requests
err := fsm.sendRequestBatch()
if err != nil {
// wait for more peers or state timeout
return waitForPeer, err
}
if fsm.state.timer != nil {
fsm.state.timer.Stop()
}
return waitForBlock, nil
case stopFSMEv:
// cleanup
return finished, errNoErrorFinished
default:
return waitForPeer, errInvalidEvent
}
},
}
waitForBlock = &bReactorFSMState{
name: "waitForBlock",
timeout: waitForBlockTimeout,
enter: func(fsm *bReactorFSM) {
// stop when leaving the state or receiving a block
fsm.resetStateTimer(waitForBlock)
},
handle: func(fsm *bReactorFSM, ev bReactorEvent, data bReactorEventData) (*bReactorFSMState, error) {
switch ev {
case stateTimeoutEv:
// no blockResponse
// Should we send status request again? Switch to consensus?
// Note that any unresponsive peers have been already removed by their timer expiry handler.
if fsm.state.timer != nil {
fsm.state.timer.Stop()
}
return finished, errNoPeerResponse
case statusResponseEv:
err := fsm.setPeerHeight(data.peerId, data.height)
return waitForBlock, err
case blockResponseEv:
// add block to fsm.blocks
fsm.logger.Info("blockResponseEv", "H", data.block.Height)
err := fsm.addBlock(data.peerId, data.block, data.length)
if err != nil {
// unsolicited, from different peer, already have it..
fsm.removePeer(data.peerId, err)
// ignore block
return waitForBlock, err
}
if fsm.shouldTryProcessBlock() {
fsm.logger.Info("shouldTryProcessBlock", "first", fsm.height, "second", fsm.height+1)
// try to process block at fsm.height with the help of block at fsm.height+1
fsm.sendSignalToProcessBlock()
}
if fsm.state.timer != nil {
fsm.state.timer.Stop()
}
return waitForBlock, nil
case tryProcessBlockEv:
if err := fsm.processBlock(); err != nil {
if err == errMissingBlocks {
// continue so we ask for more blocks
}
if err == errBlockVerificationFailure {
// remove peers that sent us those blocks, blocks will also be removed
first := fsm.blocks[fsm.height].peerId
fsm.removePeer(first, err)
second := fsm.blocks[fsm.height+1].peerId
fsm.removePeer(second, err)
}
} else {
delete(fsm.blocks, fsm.height)
fsm.height++
fsm.removeShortPeers()
// processed block, check if we are done
if fsm.height >= fsm.maxPeerHeight {
// TODO should we wait for more status responses in case a high peer is slow?
fsm.bcr.switchToConsensus()
return finished, nil
}
}
// get other block(s)
err := fsm.sendRequestBatch()
if err != nil {
// TBD on what to do here...
// wait for more peers or state timeout
}
if fsm.shouldTryProcessBlock() {
fsm.logger.Info("shouldTryProcessBlock", "first", fsm.height, "second", fsm.height+1)
// try to process block at fsm.height with the help of block at fsm.height+1
fsm.sendSignalToProcessBlock()
}
if fsm.state.timer != nil {
fsm.state.timer.Stop()
}
return waitForBlock, err
case peerErrEv:
fsm.removePeer(data.peerId, data.err)
err := fsm.sendRequestBatch()
if err != nil {
// TBD on what to do here...
// wait for more peers or state timeout
}
if fsm.state.timer != nil {
fsm.state.timer.Stop()
}
return waitForBlock, err
case stopFSMEv:
// cleanup
return finished, errNoErrorFinished
default:
return waitForBlock, errInvalidEvent
}
},
}
finished = &bReactorFSMState{
name: "finished",
enter: func(fsm *bReactorFSM) {
// cleanup
},
handle: func(fsm *bReactorFSM, ev bReactorEvent, data bReactorEventData) (*bReactorFSMState, error) {
return nil, nil
},
}
}
func NewFSM(store *BlockStore, bcr sendMessage) *bReactorFSM {
messageCh := make(chan bReactorMessageData, maxTotalMessages)
return &bReactorFSM{
state: unknown,
store: store,
blocks: make(map[int64]*blockData),
peers: make(map[p2p.ID]*bpPeer),
height: store.Height() + 1,
bcr: bcr,
messageCh: messageCh,
}
}
func sendMessageToFSM(fsm *bReactorFSM, msg bReactorMessageData) {
fsm.logger.Info("send message to FSM", "msg", msg.String())
fsm.messageCh <- msg
}
func (fsm *bReactorFSM) setLogger(l log.Logger) {
fsm.logger = l
}
// starts the FSM go routine
func (fsm *bReactorFSM) start() {
go fsm.startRoutine()
fsm.startTime = time.Now()
}
// stops the FSM go routine
func (fsm *bReactorFSM) stop() {
msg := bReactorMessageData{
event: stopFSMEv,
}
sendMessageToFSM(fsm, msg)
}
// start the FSM
func (fsm *bReactorFSM) startRoutine() {
_ = fsm.handle(&bReactorMessageData{event: startFSMEv})
forLoop:
for {
select {
case msg := <-fsm.messageCh:
fsm.logger.Info("FSM Received message", "msg", msg.String())
_ = fsm.handle(&msg)
if msg.event == stopFSMEv {
break forLoop
}
// TODO - stop also on some errors returned by handle
default:
}
}
}
// handle processes messages and events sent to the FSM.
func (fsm *bReactorFSM) handle(msg *bReactorMessageData) error {
fsm.logger.Info("Blockchain reactor FSM received event", "event", msg.event, "state", fsm.state.name)
if fsm.state == nil {
fsm.state = unknown
}
next, err := fsm.state.handle(fsm, msg.event, msg.data)
if err != nil {
fsm.logger.Error("Blockchain reactor event handler returned", "err", err)
}
fsm.transition(next)
fsm.logger.Info("FSM new state", "state", fsm.state.name)
return err
}
func (fsm *bReactorFSM) transition(next *bReactorFSMState) {
if next == nil {
return
}
fsm.logger.Info("Blockchain reactor FSM changes state: ", "old", fsm.state.name, "new", next.name)
if fsm.state != next {
fsm.state = next
if next.enter != nil {
next.enter(fsm)
}
}
}
// Interface for sending Block and Status requests
// Implemented by BlockchainReactor and tests
type sendMessage interface {
sendStatusRequest() error
sendBlockRequest(peerID p2p.ID, height int64) error
sendPeerError(err error, peerID p2p.ID)
processBlocks(first *types.Block, second *types.Block) error
resetStateTimer(name string, timer *time.Timer, timeout time.Duration, f func())
switchToConsensus()
}
// FSM state timeout handler
func (fsm *bReactorFSM) sendStateTimeoutEvent(stateName string) {
// Check that the timeout is for the state we are currently in to prevent wrong transitions.
if stateName == fsm.state.name {
msg := bReactorMessageData{
event: stateTimeoutEv,
data: bReactorEventData{
stateName: stateName,
},
}
sendMessageToFSM(fsm, msg)
}
}
// This is called when entering an FSM state in order to detect lack of progress in the state machine.
// Note the use of the 'bcr' interface to facilitate testing without timer running.
func (fsm *bReactorFSM) resetStateTimer(state *bReactorFSMState) {
fsm.bcr.resetStateTimer(state.name, state.timer, state.timeout, func() {
fsm.sendStateTimeoutEvent(state.name)
})
}
// WIP
// TODO - pace the requests to peers
func (fsm *bReactorFSM) sendRequestBatch() error {
// remove slow and timed out peers
for _, peer := range fsm.peers {
if err := peer.isPeerGood(); err != nil {
fsm.logger.Info("Removing bad peer", "peer", peer.id, "err", err)
fsm.removePeer(peer.id, err)
}
}
var err error
// make requests
for i := 0; i < maxRequestBatchSize; i++ {
// request height
height := fsm.height + int64(i)
if height > fsm.maxPeerHeight {
return err
}
req := fsm.blocks[height]
if req == nil {
// make new request
err = fsm.sendRequest(height)
if err != nil {
// couldn't find a good peer or couldn't communicate with it
}
}
}
return nil
}
func (fsm *bReactorFSM) sendRequest(height int64) error {
// make requests
// TODO - sort peers in order of goodness
for _, peer := range fsm.peers {
// Send Block Request message to peer
fsm.logger.Info("Try to send request to peer", "peer", peer.id, "height", height)
err := fsm.bcr.sendBlockRequest(peer.id, height)
if err == errSendQueueFull {
fsm.logger.Info("cannot send request, queue full", "peer", peer.id, "height", height)
continue
}
if err == errNilPeerForBlockRequest {
// this peer does not exist in the switch, delete locally
fsm.logger.Info("Peer doesn't exist in the switch", "peer", peer.id)
fsm.deletePeer(peer.id)
continue
}
// reserve space for block
fsm.blocks[height] = &blockData{peerId: peer.id, block: nil}
fsm.peers[peer.id].incrPending()
return nil
}
return errNoPeerFoundForRequest
}
// Sets the peer's blockchain height.
func (fsm *bReactorFSM) setPeerHeight(peerID p2p.ID, height int64) error {
peer := fsm.peers[peerID]
if height < fsm.height {
fsm.logger.Info("Peer height too small", "peer", peerID, "height", height, "fsm_height", fsm.height)
// Don't add or update a peer that is not useful.
if peer != nil {
fsm.logger.Info("remove short peer", "peer", peerID, "height", height, "fsm_height", fsm.height)
fsm.removePeer(peerID, errPeerTooShort)
}
return errPeerTooShort
}
if peer == nil {
peer = newBPPeer(peerID, height, fsm.processPeerError)
peer.setLogger(fsm.logger.With("peer", peerID))
fsm.peers[peerID] = peer
} else {
// remove any requests made for heights in (height, peer.height]
for blockHeight, bData := range fsm.blocks {
if bData.peerId == peerID && blockHeight > height {
delete(fsm.blocks, blockHeight)
}
}
}
peer.height = height
if height > fsm.maxPeerHeight {
fsm.maxPeerHeight = height
}
return nil
}
func (fsm *bReactorFSM) getMaxPeerHeight() int64 {
return fsm.maxPeerHeight
}
// called from:
// - the switch from its go routing
// - when peer times out from the timer go routine.
// Send message to FSM
func (fsm *bReactorFSM) processPeerError(err error, peerID p2p.ID) {
msgData := bReactorMessageData{
event: peerErrEv,
data: bReactorEventData{
err: err,
peerId: peerID,
},
}
sendMessageToFSM(fsm, msgData)
}
// called by the switch on peer error
func (fsm *bReactorFSM) RemovePeer(peerID p2p.ID) {
fsm.logger.Info("Switch removes peer", "peer", peerID, "fsm_height", fsm.height)
fsm.processPeerError(errSwitchPeerErr, peerID)
}
// called every time FSM advances its height
func (fsm *bReactorFSM) removeShortPeers() {
for _, peer := range fsm.peers {
if peer.height < fsm.height {
fsm.logger.Info("removeShortPeers", "peer", peer.id)
fsm.removePeer(peer.id, nil)
}
}
}
// removes any blocks and requests associated with the peer, deletes the peer and informs the switch if needed.
func (fsm *bReactorFSM) removePeer(peerID p2p.ID, err error) {
fsm.logger.Info("removePeer", "peer", peerID, "err", err)
// remove all data for blocks waiting for the peer or not processed yet
for h, bData := range fsm.blocks {
if bData.peerId == peerID {
delete(fsm.blocks, h)
}
}
// delete peer
fsm.deletePeer(peerID)
// recompute maxPeerHeight
fsm.maxPeerHeight = 0
for _, peer := range fsm.peers {
if peer.height > fsm.maxPeerHeight {
fsm.maxPeerHeight = peer.height
}
}
// send error to switch if not coming from it
if err != nil && err != errSwitchPeerErr {
fsm.bcr.sendPeerError(err, peerID)
}
}
// stops the peer timer and deletes the peer
func (fsm *bReactorFSM) deletePeer(peerID p2p.ID) {
if p, exist := fsm.peers[peerID]; exist && p.timeout != nil {
p.timeout.Stop()
}
delete(fsm.peers, peerID)
}
// Validates that the block comes from the peer it was expected from and stores it in the 'blocks' map.
func (fsm *bReactorFSM) addBlock(peerID p2p.ID, block *types.Block, blockSize int) error {
blockData := fsm.blocks[block.Height]
if blockData == nil {
fsm.logger.Error("peer sent us a block we didn't expect", "peer", peerID, "curHeight", fsm.height, "blockHeight", block.Height)
return errBadDataFromPeer
}
if blockData.peerId != peerID {
fsm.logger.Error("invalid peer", "peer", peerID, "blockHeight", block.Height)
return errBadDataFromPeer
}
if blockData.block != nil {
fsm.logger.Error("already have a block for height")
return errBadDataFromPeer
}
fsm.blocks[block.Height].block = block
peer := fsm.peers[peerID]
if peer != nil {
peer.decrPending(blockSize)
}
return nil
}
func (fsm *bReactorFSM) shouldTryProcessBlock() bool {
first := fsm.blocks[fsm.height]
second := fsm.blocks[fsm.height+1]
if first == nil || first.block == nil || second == nil || second.block == nil {
// We need both to sync the first block.
return false
}
return true
}
func (fsm *bReactorFSM) sendSignalToProcessBlock() {
// TODO - add check that this was sent before, currently there are extraneous tryProcessBlockEv
fsm.logger.Info("Send signal to process", "first", fsm.height, "second", fsm.height+1)
msgData := bReactorMessageData{
event: tryProcessBlockEv,
data: bReactorEventData{},
}
sendMessageToFSM(fsm, msgData)
}
// Processes block at height H = fsm.height. Expects both H and H+1 to be available
func (fsm *bReactorFSM) processBlock() error {
first := fsm.blocks[fsm.height]
second := fsm.blocks[fsm.height+1]
if first == nil || first.block == nil || second == nil || second.block == nil {
// We need both to sync the first block.
return errMissingBlocks
}
fsm.logger.Info("process blocks", "first", first.block.Height, "second", second.block.Height)
fsm.logger.Info("FSM blocks", "blocks", fsm.blocks)
if err := fsm.bcr.processBlocks(first.block, second.block); err != nil {
fsm.logger.Error("Process blocks returned error", "err", err, "first", first.block.Height, "second", second.block.Height)
fsm.logger.Error("FSM blocks", "blocks", fsm.blocks)
return err
}
return nil
}
func (fsm *bReactorFSM) IsFinished() bool {
return fsm.state == finished
}

View File

@ -0,0 +1,287 @@
package blockchain_new
import (
"github.com/stretchr/testify/assert"
cmn "github.com/tendermint/tendermint/libs/common"
dbm "github.com/tendermint/tendermint/libs/db"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/types"
"testing"
"time"
)
var (
failSendStatusRequest bool
failSendBlockRequest bool
numStatusRequests int
numBlockRequests int
)
type lastBlockRequestT struct {
peerID p2p.ID
height int64
}
var lastBlockRequest lastBlockRequestT
type lastPeerErrorT struct {
peerID p2p.ID
err error
}
var lastPeerError lastPeerErrorT
var stateTimerStarts map[string]int
func resetTestValues() {
stateTimerStarts = make(map[string]int)
failSendBlockRequest = false
failSendStatusRequest = false
numStatusRequests = 0
numBlockRequests = 0
lastBlockRequest.peerID = ""
lastBlockRequest.height = 0
lastPeerError.peerID = ""
lastPeerError.err = nil
}
type fsmStepTestValues struct {
currentState string
event bReactorEvent
expectedState string
// input
failStatusReq bool
shouldSendStatusReq bool
failBlockReq bool
blockReqIncreased bool
data bReactorEventData
expectedLastBlockReq *lastBlockRequestT
}
// WIP
func TestFSMTransitionSequences(t *testing.T) {
maxRequestBatchSize = 2
fsmTransitionSequenceTests := [][]fsmStepTestValues{
{
{currentState: "unknown", event: startFSMEv, shouldSendStatusReq: true,
expectedState: "waitForPeer"},
{currentState: "waitForPeer", event: statusResponseEv,
data: bReactorEventData{peerId: "P1", height: 10},
blockReqIncreased: true,
expectedState: "waitForBlock"},
},
}
for _, tt := range fsmTransitionSequenceTests {
// Create and start the FSM
testBcR := &testReactor{logger: log.TestingLogger()}
blockDB := dbm.NewMemDB()
store := NewBlockStore(blockDB)
fsm := NewFSM(store, testBcR)
fsm.setLogger(log.TestingLogger())
resetTestValues()
// always start from unknown
fsm.resetStateTimer(unknown)
assert.Equal(t, 1, stateTimerStarts[unknown.name])
for _, step := range tt {
assert.Equal(t, step.currentState, fsm.state.name)
failSendStatusRequest = step.failStatusReq
failSendBlockRequest = step.failBlockReq
oldNumStatusRequests := numStatusRequests
oldNumBlockRequests := numBlockRequests
_ = sendEventToFSM(fsm, step.event, step.data)
if step.shouldSendStatusReq {
assert.Equal(t, oldNumStatusRequests+1, numStatusRequests)
} else {
assert.Equal(t, oldNumStatusRequests, numStatusRequests)
}
if step.blockReqIncreased {
assert.Equal(t, oldNumBlockRequests+maxRequestBatchSize, numBlockRequests)
} else {
assert.Equal(t, oldNumBlockRequests, numBlockRequests)
}
assert.Equal(t, step.expectedState, fsm.state.name)
}
}
}
func TestReactorFSMBasic(t *testing.T) {
maxRequestBatchSize = 2
resetTestValues()
// Create and start the FSM
testBcR := &testReactor{logger: log.TestingLogger()}
blockDB := dbm.NewMemDB()
store := NewBlockStore(blockDB)
fsm := NewFSM(store, testBcR)
fsm.setLogger(log.TestingLogger())
if err := fsm.handle(&bReactorMessageData{event: startFSMEv}); err != nil {
}
// Check that FSM sends a status request message
assert.Equal(t, 1, numStatusRequests)
assert.Equal(t, waitForPeer.name, fsm.state.name)
// Send a status response message to FSM
peerID := p2p.ID(cmn.RandStr(12))
sendStatusResponse2(fsm, peerID, 10)
// Check that FSM sends a block request message and...
assert.Equal(t, maxRequestBatchSize, numBlockRequests)
// ... the block request has the expected height
assert.Equal(t, int64(maxRequestBatchSize), lastBlockRequest.height)
assert.Equal(t, waitForBlock.name, fsm.state.name)
}
func TestReactorFSMPeerTimeout(t *testing.T) {
maxRequestBatchSize = 2
resetTestValues()
peerTimeout = 20 * time.Millisecond
// Create and start the FSM
testBcR := &testReactor{logger: log.TestingLogger()}
blockDB := dbm.NewMemDB()
store := NewBlockStore(blockDB)
fsm := NewFSM(store, testBcR)
fsm.setLogger(log.TestingLogger())
fsm.start()
// Check that FSM sends a status request message
time.Sleep(time.Millisecond)
assert.Equal(t, 1, numStatusRequests)
// Send a status response message to FSM
peerID := p2p.ID(cmn.RandStr(12))
sendStatusResponse(fsm, peerID, 10)
time.Sleep(5 * time.Millisecond)
// Check that FSM sends a block request message and...
assert.Equal(t, maxRequestBatchSize, numBlockRequests)
// ... the block request has the expected height and peer
assert.Equal(t, int64(maxRequestBatchSize), lastBlockRequest.height)
assert.Equal(t, peerID, lastBlockRequest.peerID)
// let FSM timeout on the block response message
time.Sleep(100 * time.Millisecond)
}
// reactor for FSM testing
type testReactor struct {
logger log.Logger
fsm *bReactorFSM
testCh chan bReactorMessageData
}
func sendEventToFSM(fsm *bReactorFSM, ev bReactorEvent, data bReactorEventData) error {
return fsm.handle(&bReactorMessageData{event: ev, data: data})
}
// ----------------------------------------
// implementation for the test reactor APIs
func (testR *testReactor) sendPeerError(err error, peerID p2p.ID) {
testR.logger.Info("Reactor received sendPeerError call from FSM", "peer", peerID, "err", err)
lastPeerError.peerID = peerID
lastPeerError.err = err
}
func (testR *testReactor) sendStatusRequest() error {
testR.logger.Info("Reactor received sendStatusRequest call from FSM")
numStatusRequests++
if failSendStatusRequest {
return errSendQueueFull
}
return nil
}
func (testR *testReactor) sendBlockRequest(peerID p2p.ID, height int64) error {
testR.logger.Info("Reactor received sendBlockRequest call from FSM", "peer", peerID, "height", height)
numBlockRequests++
lastBlockRequest.peerID = peerID
lastBlockRequest.height = height
return nil
}
func (testR *testReactor) resetStateTimer(name string, timer *time.Timer, timeout time.Duration, f func()) {
testR.logger.Info("Reactor received resetStateTimer call from FSM", "state", name, "timeout", timeout)
if _, ok := stateTimerStarts[name]; !ok {
stateTimerStarts[name] = 1
} else {
stateTimerStarts[name]++
}
}
func (testR *testReactor) processBlocks(first *types.Block, second *types.Block) error {
testR.logger.Info("Reactor received processBlocks call from FSM", "first", first.Height, "second", second.Height)
return nil
}
func (testR *testReactor) switchToConsensus() {
testR.logger.Info("Reactor received switchToConsensus call from FSM")
}
// ----------------------------------------
// -------------------------------------------------------
// helper functions for tests to simulate different events
func sendStatusResponse(fsm *bReactorFSM, peerID p2p.ID, height int64) {
msgBytes := makeStatusResponseMessage(height)
msgData := bReactorMessageData{
event: statusResponseEv,
data: bReactorEventData{
peerId: peerID,
height: height,
length: len(msgBytes),
},
}
sendMessageToFSM(fsm, msgData)
}
func sendStatusResponse2(fsm *bReactorFSM, peerID p2p.ID, height int64) {
msgBytes := makeStatusResponseMessage(height)
msgData := &bReactorMessageData{
event: statusResponseEv,
data: bReactorEventData{
peerId: peerID,
height: height,
length: len(msgBytes),
},
}
_ = fsm.handle(msgData)
}
func sendStateTimeout(fsm *bReactorFSM, name string) {
msgData := &bReactorMessageData{
event: stateTimeoutEv,
data: bReactorEventData{
stateName: name,
},
}
_ = fsm.handle(msgData)
}
// -------------------------------------------------------
// ----------------------------------------------------
// helper functions to make blockchain reactor messages
func makeStatusResponseMessage(height int64) []byte {
msgBytes := cdc.MustMarshalBinaryBare(&bcStatusResponseMessage{height})
return msgBytes
}
// ----------------------------------------------------

View File

@ -0,0 +1,340 @@
package blockchain_new
import (
"fmt"
"os"
"sort"
"testing"
"time"
"github.com/stretchr/testify/assert"
abci "github.com/tendermint/tendermint/abci/types"
cfg "github.com/tendermint/tendermint/config"
cmn "github.com/tendermint/tendermint/libs/common"
dbm "github.com/tendermint/tendermint/libs/db"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/proxy"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
tmtime "github.com/tendermint/tendermint/types/time"
)
var config *cfg.Config
func randGenesisDoc(numValidators int, randPower bool, minPower int64) (*types.GenesisDoc, []types.PrivValidator) {
validators := make([]types.GenesisValidator, numValidators)
privValidators := make([]types.PrivValidator, numValidators)
for i := 0; i < numValidators; i++ {
val, privVal := types.RandValidator(randPower, minPower)
validators[i] = types.GenesisValidator{
PubKey: val.PubKey,
Power: val.VotingPower,
}
privValidators[i] = privVal
}
sort.Sort(types.PrivValidatorsByAddress(privValidators))
return &types.GenesisDoc{
GenesisTime: tmtime.Now(),
ChainID: config.ChainID(),
Validators: validators,
}, privValidators
}
func makeVote(header *types.Header, blockID types.BlockID, valset *types.ValidatorSet, privVal types.PrivValidator) *types.Vote {
addr := privVal.GetPubKey().Address()
idx, _ := valset.GetByAddress(addr)
vote := &types.Vote{
ValidatorAddress: addr,
ValidatorIndex: idx,
Height: header.Height,
Round: 1,
Timestamp: tmtime.Now(),
Type: types.PrecommitType,
BlockID: blockID,
}
_ = privVal.SignVote(header.ChainID, vote)
return vote
}
type BlockchainReactorPair struct {
reactor *BlockchainReactor
app proxy.AppConns
}
func newBlockchainReactor(logger log.Logger, genDoc *types.GenesisDoc, privVals []types.PrivValidator, maxBlockHeight int64) BlockchainReactorPair {
if len(privVals) != 1 {
panic("only support one validator")
}
app := &testApp{}
cc := proxy.NewLocalClientCreator(app)
proxyApp := proxy.NewAppConns(cc)
err := proxyApp.Start()
if err != nil {
panic(cmn.ErrorWrap(err, "error start app"))
}
blockDB := dbm.NewMemDB()
stateDB := dbm.NewMemDB()
blockStore := NewBlockStore(blockDB)
state, err := sm.LoadStateFromDBOrGenesisDoc(stateDB, genDoc)
if err != nil {
panic(cmn.ErrorWrap(err, "error constructing state from genesis file"))
}
// Make the BlockchainReactor itself.
// NOTE we have to create and commit the blocks first because
// pool.height is determined from the store.
fastSync := true
blockExec := sm.NewBlockExecutor(dbm.NewMemDB(), log.TestingLogger(), proxyApp.Consensus(),
sm.MockMempool{}, sm.MockEvidencePool{})
// let's add some blocks in
for blockHeight := int64(1); blockHeight <= maxBlockHeight; blockHeight++ {
lastCommit := types.NewCommit(types.BlockID{}, nil)
if blockHeight > 1 {
lastBlockMeta := blockStore.LoadBlockMeta(blockHeight - 1)
lastBlock := blockStore.LoadBlock(blockHeight - 1)
vote := makeVote(&lastBlock.Header, lastBlockMeta.BlockID, state.Validators, privVals[0]).CommitSig()
lastCommit = types.NewCommit(lastBlockMeta.BlockID, []*types.CommitSig{vote})
}
thisBlock := makeBlock(blockHeight, state, lastCommit)
thisParts := thisBlock.MakePartSet(types.BlockPartSizeBytes)
blockID := types.BlockID{Hash: thisBlock.Hash(), PartsHeader: thisParts.Header()}
state, err = blockExec.ApplyBlock(state, blockID, thisBlock)
if err != nil {
panic(cmn.ErrorWrap(err, "error apply block"))
}
blockStore.SaveBlock(thisBlock, thisParts, lastCommit)
}
bcReactor := NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
/*
addr := bcReactor.Switch.NodeInfo().ID()
moduleName := fmt.Sprintf("blockchain-%v", addr)
bcReactor.SetLogger(logger.With("module", moduleName))
*/
bcReactor.SetLogger(logger.With("module", "blockchain"))
return BlockchainReactorPair{bcReactor, proxyApp}
}
func TestNoBlockResponse(t *testing.T) {
peerTimeout = 15 * time.Second
maxRequestBatchSize = 40
config = cfg.ResetTestRoot("blockchain_new_reactor_test")
defer os.RemoveAll(config.RootDir)
genDoc, privVals := randGenesisDoc(1, false, 30)
maxBlockHeight := int64(65)
reactorPairs := make([]BlockchainReactorPair, 2)
logger1 := log.TestingLogger()
reactorPairs[0] = newBlockchainReactor(logger1, genDoc, privVals, maxBlockHeight)
logger2 := log.TestingLogger()
reactorPairs[1] = newBlockchainReactor(logger2, genDoc, privVals, 0)
p2p.MakeConnectedSwitches(config.P2P, 2, func(i int, s *p2p.Switch) *p2p.Switch {
s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor)
return s
}, p2p.Connect2Switches)
addr0 := reactorPairs[0].reactor.Switch.NodeInfo().ID()
moduleName := fmt.Sprintf("blockchain-%v", addr0)
reactorPairs[0].reactor.SetLogger(logger1.With("module", moduleName[:19]))
addr1 := reactorPairs[1].reactor.Switch.NodeInfo().ID()
moduleName = fmt.Sprintf("blockchain-%v", addr1)
reactorPairs[1].reactor.SetLogger(logger1.With("module", moduleName[:19]))
defer func() {
for _, r := range reactorPairs {
_ = r.reactor.Stop()
_ = r.app.Stop()
}
}()
tests := []struct {
height int64
existent bool
}{
{maxBlockHeight + 2, false},
{10, true},
{1, true},
{100, false}}
for {
if reactorPairs[1].reactor.fsm.IsFinished() {
break
}
time.Sleep(10 * time.Millisecond)
}
assert.Equal(t, maxBlockHeight, reactorPairs[0].reactor.fsm.store.Height())
for _, tt := range tests {
block := reactorPairs[1].reactor.fsm.store.LoadBlock(tt.height)
if tt.existent {
assert.True(t, block != nil)
} else {
assert.True(t, block == nil)
}
}
}
// NOTE: This is too hard to test without
// an easy way to add test peer to switch
// or without significant refactoring of the module.
// Alternatively we could actually dial a TCP conn but
// that seems extreme.
func TestBadBlockStopsPeer(t *testing.T) {
peerTimeout = 15 * time.Second
maxRequestBatchSize = 40
config = cfg.ResetTestRoot("blockchain_reactor_test")
defer os.RemoveAll(config.RootDir)
genDoc, privVals := randGenesisDoc(1, false, 30)
maxBlockHeight := int64(148)
otherChain := newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight)
defer func() {
_ = otherChain.reactor.Stop()
_ = otherChain.app.Stop()
}()
reactorPairs := make([]BlockchainReactorPair, 4)
var logger = make([]log.Logger, 4)
for i := 0; i < 4; i++ {
logger[i] = log.TestingLogger()
height := int64(0)
if i == 0 {
height = maxBlockHeight
}
reactorPairs[i] = newBlockchainReactor(logger[i], genDoc, privVals, height)
}
switches := p2p.MakeConnectedSwitches(config.P2P, 4, func(i int, s *p2p.Switch) *p2p.Switch {
s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor)
return s
}, p2p.Connect2Switches)
for i := 0; i < 4; i++ {
addr := reactorPairs[i].reactor.Switch.NodeInfo().ID()
moduleName := fmt.Sprintf("blockchain-%v", addr)
reactorPairs[i].reactor.SetLogger(logger[i].With("module", moduleName[:19]))
}
defer func() {
for _, r := range reactorPairs {
_ = r.reactor.Stop()
_ = r.app.Stop()
}
}()
for {
if reactorPairs[3].reactor.fsm.IsFinished() {
break
}
time.Sleep(1 * time.Second)
}
//at this time, reactors[0-3] is the newest
assert.Equal(t, 3, reactorPairs[1].reactor.Switch.Peers().Size())
//mark reactorPairs[3] is an invalid peer
reactorPairs[3].reactor.fsm.store = otherChain.reactor.fsm.store
lastLogger := log.TestingLogger()
lastReactorPair := newBlockchainReactor(lastLogger, genDoc, privVals, 0)
reactorPairs = append(reactorPairs, lastReactorPair)
switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch) *p2p.Switch {
s.AddReactor("BLOCKCHAIN", reactorPairs[len(reactorPairs)-1].reactor)
return s
}, p2p.Connect2Switches)...)
addr := lastReactorPair.reactor.Switch.NodeInfo().ID()
moduleName := fmt.Sprintf("blockchain-%v", addr)
lastReactorPair.reactor.SetLogger(lastLogger.With("module", moduleName[:19]))
for i := 0; i < len(reactorPairs)-1; i++ {
p2p.Connect2Switches(switches, i, len(reactorPairs)-1)
}
for {
if lastReactorPair.reactor.fsm.IsFinished() || lastReactorPair.reactor.Switch.Peers().Size() == 0 {
break
}
time.Sleep(1 * time.Second)
}
assert.True(t, lastReactorPair.reactor.Switch.Peers().Size() < len(reactorPairs)-1)
}
//----------------------------------------------
// utility funcs
func makeTxs(height int64) (txs []types.Tx) {
for i := 0; i < 10; i++ {
txs = append(txs, types.Tx([]byte{byte(height), byte(i)}))
}
return txs
}
func makeBlock(height int64, state sm.State, lastCommit *types.Commit) *types.Block {
block, _ := state.MakeBlock(height, makeTxs(height), lastCommit, nil, state.Validators.GetProposer().Address)
return block
}
type testApp struct {
abci.BaseApplication
}
var _ abci.Application = (*testApp)(nil)
func (app *testApp) Info(req abci.RequestInfo) (resInfo abci.ResponseInfo) {
return abci.ResponseInfo{}
}
func (app *testApp) BeginBlock(req abci.RequestBeginBlock) abci.ResponseBeginBlock {
return abci.ResponseBeginBlock{}
}
func (app *testApp) EndBlock(req abci.RequestEndBlock) abci.ResponseEndBlock {
return abci.ResponseEndBlock{}
}
func (app *testApp) DeliverTx(tx []byte) abci.ResponseDeliverTx {
return abci.ResponseDeliverTx{Tags: []cmn.KVPair{}}
}
func (app *testApp) CheckTx(tx []byte) abci.ResponseCheckTx {
return abci.ResponseCheckTx{}
}
func (app *testApp) Commit() abci.ResponseCommit {
return abci.ResponseCommit{}
}
func (app *testApp) Query(reqQuery abci.RequestQuery) (resQuery abci.ResponseQuery) {
return
}

247
blockchain_new/store.go Normal file
View File

@ -0,0 +1,247 @@
package blockchain_new
import (
"fmt"
"sync"
cmn "github.com/tendermint/tendermint/libs/common"
dbm "github.com/tendermint/tendermint/libs/db"
"github.com/tendermint/tendermint/types"
)
/*
BlockStore is a simple low level store for blocks.
There are three types of information stored:
- BlockMeta: Meta information about each block
- Block part: Parts of each block, aggregated w/ PartSet
- Commit: The commit part of each block, for gossiping precommit votes
Currently the precommit signatures are duplicated in the Block parts as
well as the Commit. In the future this may change, perhaps by moving
the Commit data outside the Block. (TODO)
// NOTE: BlockStore methods will panic if they encounter errors
// deserializing loaded data, indicating probable corruption on disk.
*/
type BlockStore struct {
db dbm.DB
mtx sync.RWMutex
height int64
}
// NewBlockStore returns a new BlockStore with the given DB,
// initialized to the last height that was committed to the DB.
func NewBlockStore(db dbm.DB) *BlockStore {
bsjson := LoadBlockStoreStateJSON(db)
return &BlockStore{
height: bsjson.Height,
db: db,
}
}
// Height returns the last known contiguous block height.
func (bs *BlockStore) Height() int64 {
bs.mtx.RLock()
defer bs.mtx.RUnlock()
return bs.height
}
// LoadBlock returns the block with the given height.
// If no block is found for that height, it returns nil.
func (bs *BlockStore) LoadBlock(height int64) *types.Block {
var blockMeta = bs.LoadBlockMeta(height)
if blockMeta == nil {
return nil
}
var block = new(types.Block)
buf := []byte{}
for i := 0; i < blockMeta.BlockID.PartsHeader.Total; i++ {
part := bs.LoadBlockPart(height, i)
buf = append(buf, part.Bytes...)
}
err := cdc.UnmarshalBinaryLengthPrefixed(buf, block)
if err != nil {
// NOTE: The existence of meta should imply the existence of the
// block. So, make sure meta is only saved after blocks are saved.
panic(cmn.ErrorWrap(err, "Error reading block"))
}
return block
}
// LoadBlockPart returns the Part at the given index
// from the block at the given height.
// If no part is found for the given height and index, it returns nil.
func (bs *BlockStore) LoadBlockPart(height int64, index int) *types.Part {
var part = new(types.Part)
bz := bs.db.Get(calcBlockPartKey(height, index))
if len(bz) == 0 {
return nil
}
err := cdc.UnmarshalBinaryBare(bz, part)
if err != nil {
panic(cmn.ErrorWrap(err, "Error reading block part"))
}
return part
}
// LoadBlockMeta returns the BlockMeta for the given height.
// If no block is found for the given height, it returns nil.
func (bs *BlockStore) LoadBlockMeta(height int64) *types.BlockMeta {
var blockMeta = new(types.BlockMeta)
bz := bs.db.Get(calcBlockMetaKey(height))
if len(bz) == 0 {
return nil
}
err := cdc.UnmarshalBinaryBare(bz, blockMeta)
if err != nil {
panic(cmn.ErrorWrap(err, "Error reading block meta"))
}
return blockMeta
}
// LoadBlockCommit returns the Commit for the given height.
// This commit consists of the +2/3 and other Precommit-votes for block at `height`,
// and it comes from the block.LastCommit for `height+1`.
// If no commit is found for the given height, it returns nil.
func (bs *BlockStore) LoadBlockCommit(height int64) *types.Commit {
var commit = new(types.Commit)
bz := bs.db.Get(calcBlockCommitKey(height))
if len(bz) == 0 {
return nil
}
err := cdc.UnmarshalBinaryBare(bz, commit)
if err != nil {
panic(cmn.ErrorWrap(err, "Error reading block commit"))
}
return commit
}
// LoadSeenCommit returns the locally seen Commit for the given height.
// This is useful when we've seen a commit, but there has not yet been
// a new block at `height + 1` that includes this commit in its block.LastCommit.
func (bs *BlockStore) LoadSeenCommit(height int64) *types.Commit {
var commit = new(types.Commit)
bz := bs.db.Get(calcSeenCommitKey(height))
if len(bz) == 0 {
return nil
}
err := cdc.UnmarshalBinaryBare(bz, commit)
if err != nil {
panic(cmn.ErrorWrap(err, "Error reading block seen commit"))
}
return commit
}
// SaveBlock persists the given block, blockParts, and seenCommit to the underlying db.
// blockParts: Must be parts of the block
// seenCommit: The +2/3 precommits that were seen which committed at height.
// If all the nodes restart after committing a block,
// we need this to reload the precommits to catch-up nodes to the
// most recent height. Otherwise they'd stall at H-1.
func (bs *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) {
if block == nil {
cmn.PanicSanity("BlockStore can only save a non-nil block")
}
height := block.Height
if g, w := height, bs.Height()+1; g != w {
cmn.PanicSanity(fmt.Sprintf("BlockStore can only save contiguous blocks. Wanted %v, got %v", w, g))
}
if !blockParts.IsComplete() {
cmn.PanicSanity(fmt.Sprintf("BlockStore can only save complete block part sets"))
}
// Save block meta
blockMeta := types.NewBlockMeta(block, blockParts)
metaBytes := cdc.MustMarshalBinaryBare(blockMeta)
bs.db.Set(calcBlockMetaKey(height), metaBytes)
// Save block parts
for i := 0; i < blockParts.Total(); i++ {
part := blockParts.GetPart(i)
bs.saveBlockPart(height, i, part)
}
// Save block commit (duplicate and separate from the Block)
blockCommitBytes := cdc.MustMarshalBinaryBare(block.LastCommit)
bs.db.Set(calcBlockCommitKey(height-1), blockCommitBytes)
// Save seen commit (seen +2/3 precommits for block)
// NOTE: we can delete this at a later height
seenCommitBytes := cdc.MustMarshalBinaryBare(seenCommit)
bs.db.Set(calcSeenCommitKey(height), seenCommitBytes)
// Save new BlockStoreStateJSON descriptor
BlockStoreStateJSON{Height: height}.Save(bs.db)
// Done!
bs.mtx.Lock()
bs.height = height
bs.mtx.Unlock()
// Flush
bs.db.SetSync(nil, nil)
}
func (bs *BlockStore) saveBlockPart(height int64, index int, part *types.Part) {
if height != bs.Height()+1 {
cmn.PanicSanity(fmt.Sprintf("BlockStore can only save contiguous blocks. Wanted %v, got %v", bs.Height()+1, height))
}
partBytes := cdc.MustMarshalBinaryBare(part)
bs.db.Set(calcBlockPartKey(height, index), partBytes)
}
//-----------------------------------------------------------------------------
func calcBlockMetaKey(height int64) []byte {
return []byte(fmt.Sprintf("H:%v", height))
}
func calcBlockPartKey(height int64, partIndex int) []byte {
return []byte(fmt.Sprintf("P:%v:%v", height, partIndex))
}
func calcBlockCommitKey(height int64) []byte {
return []byte(fmt.Sprintf("C:%v", height))
}
func calcSeenCommitKey(height int64) []byte {
return []byte(fmt.Sprintf("SC:%v", height))
}
//-----------------------------------------------------------------------------
var blockStoreKey = []byte("blockStore")
type BlockStoreStateJSON struct {
Height int64 `json:"height"`
}
// Save persists the blockStore state to the database as JSON.
func (bsj BlockStoreStateJSON) Save(db dbm.DB) {
bytes, err := cdc.MarshalJSON(bsj)
if err != nil {
cmn.PanicSanity(fmt.Sprintf("Could not marshal state bytes: %v", err))
}
db.SetSync(blockStoreKey, bytes)
}
// LoadBlockStoreStateJSON returns the BlockStoreStateJSON as loaded from disk.
// If no BlockStoreStateJSON was previously persisted, it returns the zero value.
func LoadBlockStoreStateJSON(db dbm.DB) BlockStoreStateJSON {
bytes := db.Get(blockStoreKey)
if len(bytes) == 0 {
return BlockStoreStateJSON{
Height: 0,
}
}
bsj := BlockStoreStateJSON{}
err := cdc.UnmarshalJSON(bytes, &bsj)
if err != nil {
panic(fmt.Sprintf("Could not unmarshal bytes: %X", bytes))
}
return bsj
}

View File

@ -0,0 +1,421 @@
package blockchain_new
import (
"bytes"
"fmt"
"os"
"runtime/debug"
"strings"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
cfg "github.com/tendermint/tendermint/config"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/db"
dbm "github.com/tendermint/tendermint/libs/db"
"github.com/tendermint/tendermint/libs/log"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
tmtime "github.com/tendermint/tendermint/types/time"
)
// A cleanupFunc cleans up any config / test files created for a particular
// test.
type cleanupFunc func()
// make a Commit with a single vote containing just the height and a timestamp
func makeTestCommit(height int64, timestamp time.Time) *types.Commit {
commitSigs := []*types.CommitSig{{Height: height, Timestamp: timestamp}}
return types.NewCommit(types.BlockID{}, commitSigs)
}
func makeStateAndBlockStore(logger log.Logger) (sm.State, *BlockStore, cleanupFunc) {
config := cfg.ResetTestRoot("blockchain_reactor_test")
// blockDB := dbm.NewDebugDB("blockDB", dbm.NewMemDB())
// stateDB := dbm.NewDebugDB("stateDB", dbm.NewMemDB())
blockDB := dbm.NewMemDB()
stateDB := dbm.NewMemDB()
state, err := sm.LoadStateFromDBOrGenesisFile(stateDB, config.GenesisFile())
if err != nil {
panic(cmn.ErrorWrap(err, "error constructing state from genesis file"))
}
return state, NewBlockStore(blockDB), func() { os.RemoveAll(config.RootDir) }
}
func TestLoadBlockStoreStateJSON(t *testing.T) {
db := db.NewMemDB()
bsj := &BlockStoreStateJSON{Height: 1000}
bsj.Save(db)
retrBSJ := LoadBlockStoreStateJSON(db)
assert.Equal(t, *bsj, retrBSJ, "expected the retrieved DBs to match")
}
func TestNewBlockStore(t *testing.T) {
db := db.NewMemDB()
db.Set(blockStoreKey, []byte(`{"height": "10000"}`))
bs := NewBlockStore(db)
require.Equal(t, int64(10000), bs.Height(), "failed to properly parse blockstore")
panicCausers := []struct {
data []byte
wantErr string
}{
{[]byte("artful-doger"), "not unmarshal bytes"},
{[]byte(" "), "unmarshal bytes"},
}
for i, tt := range panicCausers {
// Expecting a panic here on trying to parse an invalid blockStore
_, _, panicErr := doFn(func() (interface{}, error) {
db.Set(blockStoreKey, tt.data)
_ = NewBlockStore(db)
return nil, nil
})
require.NotNil(t, panicErr, "#%d panicCauser: %q expected a panic", i, tt.data)
assert.Contains(t, fmt.Sprintf("%#v", panicErr), tt.wantErr, "#%d data: %q", i, tt.data)
}
db.Set(blockStoreKey, nil)
bs = NewBlockStore(db)
assert.Equal(t, bs.Height(), int64(0), "expecting nil bytes to be unmarshaled alright")
}
func freshBlockStore() (*BlockStore, db.DB) {
db := db.NewMemDB()
return NewBlockStore(db), db
}
var (
state sm.State
block *types.Block
partSet *types.PartSet
part1 *types.Part
part2 *types.Part
seenCommit1 *types.Commit
)
func TestMain(m *testing.M) {
var cleanup cleanupFunc
state, _, cleanup = makeStateAndBlockStore(log.NewTMLogger(new(bytes.Buffer)))
block = makeBlock(1, state, new(types.Commit))
partSet = block.MakePartSet(2)
part1 = partSet.GetPart(0)
part2 = partSet.GetPart(1)
seenCommit1 = makeTestCommit(10, tmtime.Now())
code := m.Run()
cleanup()
os.Exit(code)
}
// TODO: This test should be simplified ...
func TestBlockStoreSaveLoadBlock(t *testing.T) {
state, bs, cleanup := makeStateAndBlockStore(log.NewTMLogger(new(bytes.Buffer)))
defer cleanup()
require.Equal(t, bs.Height(), int64(0), "initially the height should be zero")
// check there are no blocks at various heights
noBlockHeights := []int64{0, -1, 100, 1000, 2}
for i, height := range noBlockHeights {
if g := bs.LoadBlock(height); g != nil {
t.Errorf("#%d: height(%d) got a block; want nil", i, height)
}
}
// save a block
block := makeBlock(bs.Height()+1, state, new(types.Commit))
validPartSet := block.MakePartSet(2)
seenCommit := makeTestCommit(10, tmtime.Now())
bs.SaveBlock(block, partSet, seenCommit)
require.Equal(t, bs.Height(), block.Header.Height, "expecting the new height to be changed")
incompletePartSet := types.NewPartSetFromHeader(types.PartSetHeader{Total: 2})
uncontiguousPartSet := types.NewPartSetFromHeader(types.PartSetHeader{Total: 0})
uncontiguousPartSet.AddPart(part2)
header1 := types.Header{
Height: 1,
NumTxs: 100,
ChainID: "block_test",
Time: tmtime.Now(),
}
header2 := header1
header2.Height = 4
// End of setup, test data
commitAtH10 := makeTestCommit(10, tmtime.Now())
tuples := []struct {
block *types.Block
parts *types.PartSet
seenCommit *types.Commit
wantErr bool
wantPanic string
corruptBlockInDB bool
corruptCommitInDB bool
corruptSeenCommitInDB bool
eraseCommitInDB bool
eraseSeenCommitInDB bool
}{
{
block: newBlock(header1, commitAtH10),
parts: validPartSet,
seenCommit: seenCommit1,
},
{
block: nil,
wantPanic: "only save a non-nil block",
},
{
block: newBlock(header2, commitAtH10),
parts: uncontiguousPartSet,
wantPanic: "only save contiguous blocks", // and incomplete and uncontiguous parts
},
{
block: newBlock(header1, commitAtH10),
parts: incompletePartSet,
wantPanic: "only save complete block", // incomplete parts
},
{
block: newBlock(header1, commitAtH10),
parts: validPartSet,
seenCommit: seenCommit1,
corruptCommitInDB: true, // Corrupt the DB's commit entry
wantPanic: "unmarshal to types.Commit failed",
},
{
block: newBlock(header1, commitAtH10),
parts: validPartSet,
seenCommit: seenCommit1,
wantPanic: "unmarshal to types.BlockMeta failed",
corruptBlockInDB: true, // Corrupt the DB's block entry
},
{
block: newBlock(header1, commitAtH10),
parts: validPartSet,
seenCommit: seenCommit1,
// Expecting no error and we want a nil back
eraseSeenCommitInDB: true,
},
{
block: newBlock(header1, commitAtH10),
parts: validPartSet,
seenCommit: seenCommit1,
corruptSeenCommitInDB: true,
wantPanic: "unmarshal to types.Commit failed",
},
{
block: newBlock(header1, commitAtH10),
parts: validPartSet,
seenCommit: seenCommit1,
// Expecting no error and we want a nil back
eraseCommitInDB: true,
},
}
type quad struct {
block *types.Block
commit *types.Commit
meta *types.BlockMeta
seenCommit *types.Commit
}
for i, tuple := range tuples {
bs, db := freshBlockStore()
// SaveBlock
res, err, panicErr := doFn(func() (interface{}, error) {
bs.SaveBlock(tuple.block, tuple.parts, tuple.seenCommit)
if tuple.block == nil {
return nil, nil
}
if tuple.corruptBlockInDB {
db.Set(calcBlockMetaKey(tuple.block.Height), []byte("block-bogus"))
}
bBlock := bs.LoadBlock(tuple.block.Height)
bBlockMeta := bs.LoadBlockMeta(tuple.block.Height)
if tuple.eraseSeenCommitInDB {
db.Delete(calcSeenCommitKey(tuple.block.Height))
}
if tuple.corruptSeenCommitInDB {
db.Set(calcSeenCommitKey(tuple.block.Height), []byte("bogus-seen-commit"))
}
bSeenCommit := bs.LoadSeenCommit(tuple.block.Height)
commitHeight := tuple.block.Height - 1
if tuple.eraseCommitInDB {
db.Delete(calcBlockCommitKey(commitHeight))
}
if tuple.corruptCommitInDB {
db.Set(calcBlockCommitKey(commitHeight), []byte("foo-bogus"))
}
bCommit := bs.LoadBlockCommit(commitHeight)
return &quad{block: bBlock, seenCommit: bSeenCommit, commit: bCommit,
meta: bBlockMeta}, nil
})
if subStr := tuple.wantPanic; subStr != "" {
if panicErr == nil {
t.Errorf("#%d: want a non-nil panic", i)
} else if got := fmt.Sprintf("%#v", panicErr); !strings.Contains(got, subStr) {
t.Errorf("#%d:\n\tgotErr: %q\nwant substring: %q", i, got, subStr)
}
continue
}
if tuple.wantErr {
if err == nil {
t.Errorf("#%d: got nil error", i)
}
continue
}
assert.Nil(t, panicErr, "#%d: unexpected panic", i)
assert.Nil(t, err, "#%d: expecting a non-nil error", i)
qua, ok := res.(*quad)
if !ok || qua == nil {
t.Errorf("#%d: got nil quad back; gotType=%T", i, res)
continue
}
if tuple.eraseSeenCommitInDB {
assert.Nil(t, qua.seenCommit,
"erased the seenCommit in the DB hence we should get back a nil seenCommit")
}
if tuple.eraseCommitInDB {
assert.Nil(t, qua.commit,
"erased the commit in the DB hence we should get back a nil commit")
}
}
}
func TestLoadBlockPart(t *testing.T) {
bs, db := freshBlockStore()
height, index := int64(10), 1
loadPart := func() (interface{}, error) {
part := bs.LoadBlockPart(height, index)
return part, nil
}
// Initially no contents.
// 1. Requesting for a non-existent block shouldn't fail
res, _, panicErr := doFn(loadPart)
require.Nil(t, panicErr, "a non-existent block part shouldn't cause a panic")
require.Nil(t, res, "a non-existent block part should return nil")
// 2. Next save a corrupted block then try to load it
db.Set(calcBlockPartKey(height, index), []byte("Tendermint"))
res, _, panicErr = doFn(loadPart)
require.NotNil(t, panicErr, "expecting a non-nil panic")
require.Contains(t, panicErr.Error(), "unmarshal to types.Part failed")
// 3. A good block serialized and saved to the DB should be retrievable
db.Set(calcBlockPartKey(height, index), cdc.MustMarshalBinaryBare(part1))
gotPart, _, panicErr := doFn(loadPart)
require.Nil(t, panicErr, "an existent and proper block should not panic")
require.Nil(t, res, "a properly saved block should return a proper block")
require.Equal(t, gotPart.(*types.Part), part1,
"expecting successful retrieval of previously saved block")
}
func TestLoadBlockMeta(t *testing.T) {
bs, db := freshBlockStore()
height := int64(10)
loadMeta := func() (interface{}, error) {
meta := bs.LoadBlockMeta(height)
return meta, nil
}
// Initially no contents.
// 1. Requesting for a non-existent blockMeta shouldn't fail
res, _, panicErr := doFn(loadMeta)
require.Nil(t, panicErr, "a non-existent blockMeta shouldn't cause a panic")
require.Nil(t, res, "a non-existent blockMeta should return nil")
// 2. Next save a corrupted blockMeta then try to load it
db.Set(calcBlockMetaKey(height), []byte("Tendermint-Meta"))
res, _, panicErr = doFn(loadMeta)
require.NotNil(t, panicErr, "expecting a non-nil panic")
require.Contains(t, panicErr.Error(), "unmarshal to types.BlockMeta")
// 3. A good blockMeta serialized and saved to the DB should be retrievable
meta := &types.BlockMeta{}
db.Set(calcBlockMetaKey(height), cdc.MustMarshalBinaryBare(meta))
gotMeta, _, panicErr := doFn(loadMeta)
require.Nil(t, panicErr, "an existent and proper block should not panic")
require.Nil(t, res, "a properly saved blockMeta should return a proper blocMeta ")
require.Equal(t, cdc.MustMarshalBinaryBare(meta), cdc.MustMarshalBinaryBare(gotMeta),
"expecting successful retrieval of previously saved blockMeta")
}
func TestBlockFetchAtHeight(t *testing.T) {
state, bs, cleanup := makeStateAndBlockStore(log.NewTMLogger(new(bytes.Buffer)))
defer cleanup()
require.Equal(t, bs.Height(), int64(0), "initially the height should be zero")
block := makeBlock(bs.Height()+1, state, new(types.Commit))
partSet := block.MakePartSet(2)
seenCommit := makeTestCommit(10, tmtime.Now())
bs.SaveBlock(block, partSet, seenCommit)
require.Equal(t, bs.Height(), block.Header.Height, "expecting the new height to be changed")
blockAtHeight := bs.LoadBlock(bs.Height())
bz1 := cdc.MustMarshalBinaryBare(block)
bz2 := cdc.MustMarshalBinaryBare(blockAtHeight)
require.Equal(t, bz1, bz2)
require.Equal(t, block.Hash(), blockAtHeight.Hash(),
"expecting a successful load of the last saved block")
blockAtHeightPlus1 := bs.LoadBlock(bs.Height() + 1)
require.Nil(t, blockAtHeightPlus1, "expecting an unsuccessful load of Height()+1")
blockAtHeightPlus2 := bs.LoadBlock(bs.Height() + 2)
require.Nil(t, blockAtHeightPlus2, "expecting an unsuccessful load of Height()+2")
}
func doFn(fn func() (interface{}, error)) (res interface{}, err error, panicErr error) {
defer func() {
if r := recover(); r != nil {
switch e := r.(type) {
case error:
panicErr = e
case string:
panicErr = fmt.Errorf("%s", e)
default:
if st, ok := r.(fmt.Stringer); ok {
panicErr = fmt.Errorf("%s", st)
} else {
panicErr = fmt.Errorf("%s", debug.Stack())
}
}
}
}()
res, err = fn()
return res, err, panicErr
}
func newBlock(hdr types.Header, lastCommit *types.Commit) *types.Block {
return &types.Block{
Header: hdr,
LastCommit: lastCommit,
}
}

13
blockchain_new/wire.go Normal file
View File

@ -0,0 +1,13 @@
package blockchain_new
import (
amino "github.com/tendermint/go-amino"
"github.com/tendermint/tendermint/types"
)
var cdc = amino.NewCodec()
func init() {
RegisterBlockchainMessages(cdc)
types.RegisterBlockAmino(cdc)
}