diff --git a/blockchain_new/peer.go b/blockchain_new/peer.go
new file mode 100644
index 00000000..1c6e97d0
--- /dev/null
+++ b/blockchain_new/peer.go
@@ -0,0 +1,129 @@
+package blockchain_new
+
+import (
+	"fmt"
+	"math"
+	"time"
+
+	flow "github.com/tendermint/tendermint/libs/flowrate"
+	"github.com/tendermint/tendermint/libs/log"
+	"github.com/tendermint/tendermint/p2p"
+)
+
+//--------
+// Peer
+var (
+	peerTimeout = 15 * time.Second // not a const so it can be overridden in tests
+
+	// Minimum recv rate to ensure we're receiving blocks from a peer fast
+	// enough. If a peer is not sending us data at least at that rate, we
+	// consider it to have timed out and we disconnect.
+	//
+	// Assuming a DSL connection (not a good choice) 128 Kbps (upload) ~ 15 KB/s,
+	// sending data across the Atlantic ~ 7.5 KB/s.
+	minRecvRate = int64(7680)
+
+	// Monitor parameters
+	peerSampleRate = time.Second
+	peerWindowSize = 40 * peerSampleRate
+)
+
+type bpPeer struct {
+	id          p2p.ID
+	recvMonitor *flow.Monitor
+
+	height     int64
+	numPending int32
+	timeout    *time.Timer
+	didTimeout bool
+
+	logger  log.Logger
+	errFunc func(err error, peerID p2p.ID) // function to call on error
+}
+
+func newBPPeer(
+	peerID p2p.ID, height int64, errFunc func(err error, peerID p2p.ID)) *bpPeer {
+	peer := &bpPeer{
+		id:         peerID,
+		height:     height,
+		numPending: 0,
+		logger:     log.NewNopLogger(),
+		errFunc:    errFunc,
+	}
+	return peer
+}
+
+func (peer *bpPeer) setLogger(l log.Logger) {
+	peer.logger = l
+}
+
+func (peer *bpPeer) resetMonitor() {
+	peer.recvMonitor = flow.New(peerSampleRate, peerWindowSize)
+	initialValue := float64(minRecvRate) * math.E
+	peer.recvMonitor.SetREMA(initialValue)
+}
+
+func (peer *bpPeer) resetTimeout() {
+	if peer.timeout == nil {
+		peer.timeout = time.AfterFunc(peerTimeout, peer.onTimeout)
+	} else {
+		peer.timeout.Reset(peerTimeout)
+	}
+}
+
+func (peer *bpPeer) incrPending() {
+	if peer.numPending == 0 {
+		peer.resetMonitor()
+		peer.resetTimeout()
+	}
+	peer.numPending++
+}
+
+func (peer *bpPeer) decrPending(recvSize int) {
+	if peer.numPending == 0 {
+		panic("cannot decrement, peer does not have pending requests")
+	}
+
+	peer.numPending--
+	if peer.numPending == 0 {
+		peer.timeout.Stop()
+	} else {
+		peer.recvMonitor.Update(recvSize)
+		peer.resetTimeout()
+	}
+}
+
+func (peer *bpPeer) onTimeout() {
+	peer.errFunc(errNoPeerResponse, peer.id)
+	peer.logger.Error("SendTimeout", "reason", errNoPeerResponse, "timeout", peerTimeout)
+	peer.didTimeout = true
+}
+
+func (peer *bpPeer) isPeerGood() error {
+	if peer.didTimeout {
+		return errNoPeerResponse
+	}
+
+	if peer.numPending > 0 {
+		curRate := peer.recvMonitor.Status().CurRate
+		// curRate can be 0 on start
+		if curRate != 0 && curRate < minRecvRate {
+			peer.logger.Error("SendTimeout", "peer", peer.id,
+				"reason", errSlowPeer,
+				"curRate", fmt.Sprintf("%d KB/s", curRate/1024),
+				"minRate", fmt.Sprintf("%d KB/s", minRecvRate/1024))
+			// consider the peer timed out
+			peer.didTimeout = true
+			return errSlowPeer
+		}
+	}
+
+	return nil
+}
+
+func (peer *bpPeer) cleanup() {
+	if peer.timeout != nil {
+		peer.timeout.Stop()
+	}
+}
diff --git a/blockchain_new/peer_test.go b/blockchain_new/peer_test.go
new file mode 100644
index 00000000..68f247ff
--- /dev/null
+++ b/blockchain_new/peer_test.go
@@ -0,0 +1,224 @@
+package blockchain_new
+
+import (
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	cmn "github.com/tendermint/tendermint/libs/common"
+	
"github.com/tendermint/tendermint/libs/log" + "github.com/tendermint/tendermint/p2p" +) + +var ( + numErrFuncCalls int + lastErr error + + testLog = log.TestingLogger() + mtx sync.Mutex +) + +func resetErrors() { + numErrFuncCalls = 0 + lastErr = nil +} + +func errFunc(err error, peerID p2p.ID) { + _ = peerID + lastErr = err + numErrFuncCalls++ +} + +// check if peer timer is running or not (a running timer can be successfully stopped) +// Note: it does stop the timer! +func checkByStoppingPeerTimer(t *testing.T, peer *bpPeer, running bool) { + assert.NotPanics(t, func() { + stopped := peer.timeout.Stop() + if running { + assert.True(t, stopped) + } else { + assert.False(t, stopped) + } + }) +} + +func TestPeerResetMonitor(t *testing.T) { + + peer := &bpPeer{ + id: p2p.ID(cmn.RandStr(12)), + height: 10, + logger: testLog, + errFunc: errFunc, + } + peer.resetMonitor() + assert.NotNil(t, peer.recvMonitor) +} + +func TestPeerTimer(t *testing.T) { + peerTimeout = 2 * time.Millisecond + + peer := &bpPeer{ + id: p2p.ID(cmn.RandStr(12)), + height: 10, + logger: testLog, + errFunc: errFunc, + } + assert.Nil(t, peer.timeout) + + // initial reset call with peer having a nil timer + peer.resetTimeout() + assert.NotNil(t, peer.timeout) + // make sure timer is running and stop it + checkByStoppingPeerTimer(t, peer, true) + + // reset with non nil expired timer + peer.resetTimeout() + assert.NotNil(t, peer.timeout) + // make sure timer is running and stop it + checkByStoppingPeerTimer(t, peer, true) + resetErrors() + + // reset with running timer (started above) + time.Sleep(time.Millisecond) + peer.resetTimeout() + assert.NotNil(t, peer.timeout) + + // let the timer expire and ... + time.Sleep(3 * time.Millisecond) + checkByStoppingPeerTimer(t, peer, false) + + // ... 
check that an error has been sent; the error is errNoPeerResponse
+	assert.Equal(t, 1, numErrFuncCalls)
+	assert.Equal(t, errNoPeerResponse, lastErr)
+	assert.True(t, peer.didTimeout)
+}
+
+func TestIncrPending(t *testing.T) {
+	peerTimeout = 2 * time.Millisecond
+
+	peer := &bpPeer{
+		id:      p2p.ID(cmn.RandStr(12)),
+		height:  10,
+		logger:  testLog,
+		errFunc: errFunc,
+	}
+
+	peer.incrPending()
+	assert.NotNil(t, peer.recvMonitor)
+	assert.NotNil(t, peer.timeout)
+	assert.Equal(t, int32(1), peer.numPending)
+
+	peer.incrPending()
+	assert.NotNil(t, peer.recvMonitor)
+	assert.NotNil(t, peer.timeout)
+	assert.Equal(t, int32(2), peer.numPending)
+}
+
+func TestDecrPending(t *testing.T) {
+	peerTimeout = 2 * time.Millisecond
+
+	peer := &bpPeer{
+		id:      p2p.ID(cmn.RandStr(12)),
+		height:  10,
+		logger:  testLog,
+		errFunc: errFunc,
+	}
+
+	// panics if we try to decrement while numPending is 0
+	assert.Panics(t, func() { peer.decrPending(10) })
+
+	// decrement to zero
+	peer.incrPending()
+	peer.decrPending(10)
+	assert.Equal(t, int32(0), peer.numPending)
+	// make sure timer is not running
+	checkByStoppingPeerTimer(t, peer, false)
+
+	// decrement to non-zero
+	peer.incrPending()
+	peer.incrPending()
+	peer.decrPending(10)
+	assert.Equal(t, int32(1), peer.numPending)
+	// make sure timer is running and stop it
+	checkByStoppingPeerTimer(t, peer, true)
+}
+
+func TestCanBeRemovedDueToExpiration(t *testing.T) {
+	minRecvRate = int64(100) // 100 bytes/sec exponential moving average
+
+	peer := &bpPeer{
+		id:      p2p.ID(cmn.RandStr(12)),
+		height:  10,
+		errFunc: errFunc,
+		logger:  testLog,
+	}
+
+	peerTimeout = time.Millisecond
+	peer.incrPending()
+	time.Sleep(2 * time.Millisecond)
+	// timer expired, should be able to remove peer
+	assert.Equal(t, errNoPeerResponse, peer.isPeerGood())
+}
+
+func TestCanBeRemovedDueToLowSpeed(t *testing.T) {
+	minRecvRate = int64(100) // 100 bytes/sec exponential moving average
+
+	peer := &bpPeer{
+		id:      p2p.ID(cmn.RandStr(12)),
+		height:  10,
+		errFunc: errFunc,
+		logger:  testLog,
+	}
+
+	peerTimeout = time.Second
+	peerSampleRate = 0
+	peerWindowSize = 0
+
+	peer.incrPending()
+	peer.numPending = 100
+
+	// monitor starts with a higher rEMA (~ e*minRecvRate), wait for it to go down
+	time.Sleep(900 * time.Millisecond)
+
+	// normal peer - send a bit more than 100 bytes/sec (> 10 bytes/100msec), check peer is not considered slow
+	for i := 0; i < 10; i++ {
+		peer.decrPending(11)
+		time.Sleep(100 * time.Millisecond)
+		require.Nil(t, peer.isPeerGood())
+	}
+
+	// slow peer - send a bit less than 10 bytes/100msec
+	for i := 0; i < 10; i++ {
+		peer.decrPending(9)
+		time.Sleep(100 * time.Millisecond)
+	}
+	// check peer is considered slow
+	assert.Equal(t, errSlowPeer, peer.isPeerGood())
+}
+
+func TestCleanupPeer(t *testing.T) {
+	peer := &bpPeer{
+		id:      p2p.ID(cmn.RandStr(12)),
+		height:  10,
+		logger:  testLog,
+		errFunc: errFunc,
+	}
+	peerTimeout = 2 * time.Millisecond
+	assert.Nil(t, peer.timeout)
+
+	// initial reset call with peer having a nil timer
+	peer.resetTimeout()
+	assert.NotNil(t, peer.timeout)
+
+	mtx.Lock()
+	peer.cleanup()
+	mtx.Unlock()
+
+	checkByStoppingPeerTimer(t, peer, false)
+}
diff --git a/blockchain_new/reactor.go b/blockchain_new/reactor.go
new file mode 100644
index 00000000..758045de
--- /dev/null
+++ b/blockchain_new/reactor.go
@@ -0,0 +1,445 @@
+package blockchain_new
+
+import (
+	"errors"
+	"fmt"
+	"reflect"
+	"time"
+
+	"github.com/tendermint/go-amino"
+	"github.com/tendermint/tendermint/libs/log"
+	"github.com/tendermint/tendermint/p2p"
+	sm "github.com/tendermint/tendermint/state"
+	"github.com/tendermint/tendermint/types"
+)
+
+const (
+	// BlockchainChannel is a channel for blocks and status updates (`BlockStore` height)
+	BlockchainChannel = byte(0x40)
+
+	maxTotalMessages = 1000
+
+	// NOTE: keep up to date with bcBlockResponseMessage
+	bcBlockResponseMessagePrefixSize   = 4
+	bcBlockResponseMessageFieldKeySize = 1
+	maxMsgSize                         = types.MaxBlockSizeBytes +
+		bcBlockResponseMessagePrefixSize +
+		bcBlockResponseMessageFieldKeySize
+)
+
+type consensusReactor interface {
+	// for when we switch from blockchain reactor and fast sync to
+	// the consensus machine
+	SwitchToConsensus(sm.State, int)
+}
+
+type peerError struct {
+	err    error
+	peerID p2p.ID
+}
+
+func (e peerError) Error() string {
+	return fmt.Sprintf("error with peer %v: %s", e.peerID, e.err.Error())
+}
+
+type bReactorMsgFromFSM uint
+
+// message types
+const (
+	// message type events
+	errorMsg = iota + 1
+	//blockRequestMsg
+	//statusRequestMsg
+)
+
+type msgFromFSM struct {
+	msgType bReactorMsgFromFSM
+	error   peerError
+}
+
+// BlockchainReactor handles long-term catchup syncing.
+type BlockchainReactor struct {
+	p2p.BaseReactor
+
+	// immutable
+	initialState sm.State
+	state        sm.State
+
+	blockExec *sm.BlockExecutor
+	fastSync  bool
+
+	fsm          *bReactorFSM
+	blocksSynced int
+	lastHundred  time.Time
+	lastRate     float64
+
+	msgFromFSMCh chan msgFromFSM
+}
+
+// NewBlockchainReactor returns new reactor instance.
+func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *BlockStore,
+	fastSync bool) *BlockchainReactor {
+
+	if state.LastBlockHeight != store.Height() {
+		panic(fmt.Sprintf("state (%v) and store (%v) height mismatch", state.LastBlockHeight,
+			store.Height()))
+	}
+
+	bcR := &BlockchainReactor{
+		initialState: state,
+		state:        state,
+		blockExec:    blockExec,
+		fastSync:     fastSync,
+		// must be initialized, otherwise poolRoutine blocks forever on a nil channel
+		msgFromFSMCh: make(chan msgFromFSM, maxTotalMessages),
+	}
+	fsm := NewFSM(store, bcR)
+	bcR.fsm = fsm
+	bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR)
+	return bcR
+}
+
+// SetLogger implements cmn.Service by setting the logger on reactor and pool.
+func (bcR *BlockchainReactor) SetLogger(l log.Logger) {
+	bcR.BaseService.Logger = l
+	bcR.fsm.setLogger(l)
+}
+
+// OnStart implements cmn.Service.
+func (bcR *BlockchainReactor) OnStart() error {
+	if bcR.fastSync {
+		bcR.fsm.start()
+		go bcR.poolRoutine()
+	}
+	return nil
+}
+
+// OnStop implements cmn.Service.
+func (bcR *BlockchainReactor) OnStop() {
+	bcR.fsm.stop()
+}
+
+// GetChannels implements Reactor
+func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
+	return []*p2p.ChannelDescriptor{
+		{
+			ID:                  BlockchainChannel,
+			Priority:            10,
+			SendQueueCapacity:   1000,
+			RecvBufferCapacity:  50 * 4096,
+			RecvMessageCapacity: maxMsgSize,
+		},
+	}
+}
+
+// AddPeer implements Reactor by sending our state to peer.
+func (bcR *BlockchainReactor) AddPeer(peer p2p.Peer) {
+	msgBytes := cdc.MustMarshalBinaryBare(&bcStatusResponseMessage{bcR.fsm.store.Height()})
+	if !peer.Send(BlockchainChannel, msgBytes) {
+		// doing nothing, will try later in `poolRoutine`
+	}
+	// peer is added to the pool once we receive the first
+	// bcStatusResponseMessage from the peer and call pool.SetPeerHeight
+}
+
+// RemovePeer implements Reactor by removing peer from the pool.
+func (bcR *BlockchainReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
+	bcR.fsm.RemovePeer(peer.ID())
+}
+
+// respondToPeer loads a block and sends it to the requesting peer,
+// if we have it. Otherwise, we'll respond saying we don't have it.
+// According to the Tendermint spec, if all nodes are honest, +// no node should be requesting for a block that's non-existent. +func (bcR *BlockchainReactor) respondToPeer(msg *bcBlockRequestMessage, + src p2p.Peer) (queued bool) { + + block := bcR.fsm.store.LoadBlock(msg.Height) + if block != nil { + msgBytes := cdc.MustMarshalBinaryBare(&bcBlockResponseMessage{Block: block}) + return src.TrySend(BlockchainChannel, msgBytes) + } + + bcR.Logger.Info("Peer asking for a block we don't have", "src", src, "height", msg.Height) + + msgBytes := cdc.MustMarshalBinaryBare(&bcNoBlockResponseMessage{Height: msg.Height}) + return src.TrySend(BlockchainChannel, msgBytes) +} + +// Receive implements Reactor by handling 4 types of messages (look below). +func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { + msg, err := decodeMsg(msgBytes) + if err != nil { + bcR.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes) + bcR.Switch.StopPeerForError(src, err) + return + } + + if err = msg.ValidateBasic(); err != nil { + bcR.Logger.Error("Peer sent us invalid msg", "peer", src, "msg", msg, "err", err) + bcR.Switch.StopPeerForError(src, err) + return + } + + bcR.Logger.Debug("Receive", "src", src, "chID", chID, "msg", msg) + + switch msg := msg.(type) { + case *bcBlockRequestMessage: + if queued := bcR.respondToPeer(msg, src); !queued { + // Unfortunately not queued since the queue is full. + } + case *bcBlockResponseMessage: + msgData := bReactorMessageData{ + event: blockResponseEv, + data: bReactorEventData{ + peerId: src.ID(), + block: msg.Block, + length: len(msgBytes), + }, + } + sendMessageToFSM(bcR.fsm, msgData) + + case *bcStatusRequestMessage: + // Send peer our state. + msgBytes := cdc.MustMarshalBinaryBare(&bcStatusResponseMessage{bcR.fsm.store.Height()}) + queued := src.TrySend(BlockchainChannel, msgBytes) + if !queued { + // sorry + } + case *bcStatusResponseMessage: + // Got a peer status. Unverified. + msgData := bReactorMessageData{ + event: statusResponseEv, + data: bReactorEventData{ + peerId: src.ID(), + height: msg.Height, + length: len(msgBytes), + }, + } + sendMessageToFSM(bcR.fsm, msgData) + + default: + bcR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg))) + } +} + +func (bcR *BlockchainReactor) processBlocks(first *types.Block, second *types.Block) error { + + chainID := bcR.initialState.ChainID + + firstParts := first.MakePartSet(types.BlockPartSizeBytes) + firstPartsHeader := firstParts.Header() + firstID := types.BlockID{Hash: first.Hash(), PartsHeader: firstPartsHeader} + // Finally, verify the first block using the second's commit + // NOTE: we can probably make this more efficient, but note that calling + // first.Hash() doesn't verify the tx contents, so MakePartSet() is + // currently necessary. 
+ err := bcR.state.Validators.VerifyCommit( + chainID, firstID, first.Height, second.LastCommit) + + if err != nil { + bcR.Logger.Error("Error in validation", "err", err, first.Height, second.Height) + peerID := bcR.fsm.blocks[first.Height].peerId + peer := bcR.Switch.Peers().Get(peerID) + if peer != nil { + bcR.Switch.StopPeerForError(peer, fmt.Errorf("BlockchainReactor validation error: %v", err)) + } + peerID = bcR.fsm.blocks[second.Height].peerId + peer = bcR.Switch.Peers().Get(peerID) + if peer != nil { + bcR.Switch.StopPeerForError(peer, fmt.Errorf("BlockchainReactor validation error: %v", err)) + } + return errBlockVerificationFailure + } + + bcR.fsm.store.SaveBlock(first, firstParts, second.LastCommit) + + // get the hash without persisting the state + bcR.state, err = bcR.blockExec.ApplyBlock(bcR.state, firstID, first) + if err != nil { + // TODO This is bad, are we zombie? + panic(fmt.Sprintf("Failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err)) + } + bcR.blocksSynced++ + + if bcR.blocksSynced%100 == 0 { + bcR.lastRate = 0.9*bcR.lastRate + 0.1*(100/time.Since(bcR.lastHundred).Seconds()) + bcR.Logger.Info("Fast Sync Rate", "height", bcR.fsm.height, + "max_peer_height", bcR.fsm.getMaxPeerHeight(), "blocks/s", bcR.lastRate) + bcR.lastHundred = time.Now() + } + return nil +} + +// Handle messages from the poolReactor telling the reactor what to do. +// NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down! +func (bcR *BlockchainReactor) poolRoutine() { + + for { + select { + case fromFSM := <-bcR.msgFromFSMCh: + switch fromFSM.msgType { + case errorMsg: + peer := bcR.Switch.Peers().Get(fromFSM.error.peerID) + if peer != nil { + bcR.Switch.StopPeerForError(peer, fromFSM.error.err) + } + } + } + } +} + +func (bcR *BlockchainReactor) sendPeerError(err error, peerID p2p.ID) { + peer := bcR.Switch.Peers().Get(peerID) + if peer != nil { + bcR.Switch.StopPeerForError(peer, err) + } +} + +func (bcR *BlockchainReactor) resetStateTimer(name string, timer *time.Timer, timeout time.Duration, f func()) { + if timer == nil { + timer = time.AfterFunc(timeout, f) + } else { + timer.Reset(timeout) + } +} + +// BroadcastStatusRequest broadcasts `BlockStore` height. +func (bcR *BlockchainReactor) sendStatusRequest() error { + msgBytes := cdc.MustMarshalBinaryBare(&bcStatusRequestMessage{bcR.fsm.store.Height()}) + bcR.Switch.Broadcast(BlockchainChannel, msgBytes) + return nil +} + +// BlockRequest sends `BlockRequest` height. +func (bcR *BlockchainReactor) sendBlockRequest(peerID p2p.ID, height int64) error { + peer := bcR.Switch.Peers().Get(peerID) + + if peer == nil { + return errNilPeerForBlockRequest + } + + msgBytes := cdc.MustMarshalBinaryBare(&bcBlockRequestMessage{height}) + queued := peer.TrySend(BlockchainChannel, msgBytes) + if !queued { + return errSendQueueFull + } + return nil +} + +func (bcR *BlockchainReactor) switchToConsensus() { + + conR, ok := bcR.Switch.Reactor("CONSENSUS").(consensusReactor) + if ok { + conR.SwitchToConsensus(bcR.state, bcR.blocksSynced) + } else { + // should only happen during testing + } +} + +//----------------------------------------------------------------------------- +// Messages + +// BlockchainMessage is a generic message for this reactor. 
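One thing to flag in `resetStateTimer` above: the timer created by `time.AfterFunc` is assigned to the function's local copy of the `timer` pointer, so the caller's `state.timer` stays nil and can never be stopped or reset. A pointer-to-pointer variant (a hypothetical fix, not what this diff does) makes the assignment stick:

```go
// Hypothetical fix: take **time.Timer so the timer created here is visible
// to the caller; with *time.Timer the assignment below mutates only a copy.
func (bcR *BlockchainReactor) resetStateTimer(name string, timer **time.Timer,
	timeout time.Duration, f func()) {
	if *timer == nil {
		*timer = time.AfterFunc(timeout, f)
	} else {
		(*timer).Reset(timeout)
	}
}
```

The `sendMessage` interface and the test double in reactor_fsm_test.go would need the same signature change.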
+type BlockchainMessage interface { + ValidateBasic() error +} + +func RegisterBlockchainMessages(cdc *amino.Codec) { + cdc.RegisterInterface((*BlockchainMessage)(nil), nil) + cdc.RegisterConcrete(&bcBlockRequestMessage{}, "tendermint/blockchain/BlockRequest", nil) + cdc.RegisterConcrete(&bcBlockResponseMessage{}, "tendermint/blockchain/BlockResponse", nil) + cdc.RegisterConcrete(&bcNoBlockResponseMessage{}, "tendermint/blockchain/NoBlockResponse", nil) + cdc.RegisterConcrete(&bcStatusResponseMessage{}, "tendermint/blockchain/StatusResponse", nil) + cdc.RegisterConcrete(&bcStatusRequestMessage{}, "tendermint/blockchain/StatusRequest", nil) +} + +func decodeMsg(bz []byte) (msg BlockchainMessage, err error) { + if len(bz) > maxMsgSize { + return msg, fmt.Errorf("msg exceeds max size (%d > %d)", len(bz), maxMsgSize) + } + err = cdc.UnmarshalBinaryBare(bz, &msg) + return +} + +//------------------------------------- + +type bcBlockRequestMessage struct { + Height int64 +} + +// ValidateBasic performs basic validation. +func (m *bcBlockRequestMessage) ValidateBasic() error { + if m.Height < 0 { + return errors.New("negative Height") + } + return nil +} + +func (m *bcBlockRequestMessage) String() string { + return fmt.Sprintf("[bcBlockRequestMessage %v]", m.Height) +} + +type bcNoBlockResponseMessage struct { + Height int64 +} + +// ValidateBasic performs basic validation. +func (m *bcNoBlockResponseMessage) ValidateBasic() error { + if m.Height < 0 { + return errors.New("negative Height") + } + return nil +} + +func (m *bcNoBlockResponseMessage) String() string { + return fmt.Sprintf("[bcNoBlockResponseMessage %d]", m.Height) +} + +//------------------------------------- + +type bcBlockResponseMessage struct { + Block *types.Block +} + +// ValidateBasic performs basic validation. +func (m *bcBlockResponseMessage) ValidateBasic() error { + return m.Block.ValidateBasic() +} + +func (m *bcBlockResponseMessage) String() string { + return fmt.Sprintf("[bcBlockResponseMessage %v]", m.Block.Height) +} + +//------------------------------------- + +type bcStatusRequestMessage struct { + Height int64 +} + +// ValidateBasic performs basic validation. +func (m *bcStatusRequestMessage) ValidateBasic() error { + if m.Height < 0 { + return errors.New("negative Height") + } + return nil +} + +func (m *bcStatusRequestMessage) String() string { + return fmt.Sprintf("[bcStatusRequestMessage %v]", m.Height) +} + +//------------------------------------- + +type bcStatusResponseMessage struct { + Height int64 +} + +// ValidateBasic performs basic validation. 
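A quick encode/decode round trip over this wire format, as a sketch; `cdc` is the package-level amino codec (defined outside this diff) on which `RegisterBlockchainMessages` is assumed to have been called:

```go
// Round-trip sketch for the amino wire format used on BlockchainChannel.
func exampleRoundTrip() error {
	bz := cdc.MustMarshalBinaryBare(&bcStatusRequestMessage{Height: 5})

	msg, err := decodeMsg(bz) // also enforces the maxMsgSize bound
	if err != nil {
		return err
	}
	if err := msg.ValidateBasic(); err != nil {
		return err
	}
	req, ok := msg.(*bcStatusRequestMessage)
	if !ok {
		return fmt.Errorf("unexpected message type %T", msg)
	}
	_ = req.Height // == 5
	return nil
}
```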
+func (m *bcStatusResponseMessage) ValidateBasic() error {
+	if m.Height < 0 {
+		return errors.New("negative Height")
+	}
+	return nil
+}
+
+func (m *bcStatusResponseMessage) String() string {
+	return fmt.Sprintf("[bcStatusResponseMessage %v]", m.Height)
+}
diff --git a/blockchain_new/reactor_fsm.go b/blockchain_new/reactor_fsm.go
new file mode 100644
index 00000000..434f6e0c
--- /dev/null
+++ b/blockchain_new/reactor_fsm.go
@@ -0,0 +1,721 @@
+package blockchain_new
+
+import (
+	"errors"
+	"fmt"
+	"time"
+
+	"github.com/tendermint/tendermint/libs/log"
+	"github.com/tendermint/tendermint/p2p"
+	"github.com/tendermint/tendermint/types"
+)
+
+var (
+	// should be >= 2
+	maxRequestBatchSize = 40
+)
+
+type blockData struct {
+	block  *types.Block
+	peerId p2p.ID
+}
+
+// Blockchain Reactor State
+type bReactorFSMState struct {
+	name string
+	// called when transitioning out of current state
+	handle func(*bReactorFSM, bReactorEvent, bReactorEventData) (next *bReactorFSMState, err error)
+	// called when entering the state
+	enter func(fsm *bReactorFSM)
+
+	// timer to ensure FSM is not stuck in a state forever
+	timer   *time.Timer
+	timeout time.Duration
+}
+
+func (s *bReactorFSMState) String() string {
+	return s.name
+}
+
+// Blockchain Reactor State Machine
+type bReactorFSM struct {
+	logger    log.Logger
+	startTime time.Time
+
+	state *bReactorFSMState
+
+	blocks            map[int64]*blockData
+	height            int64 // processing height
+	lastRequestHeight int64
+
+	peers         map[p2p.ID]*bpPeer
+	maxPeerHeight int64
+
+	store *BlockStore
+
+	// channel to receive messages
+	messageCh chan bReactorMessageData
+
+	// interface used to send StatusRequest, BlockRequest, errors
+	bcr sendMessage
+}
+
+// bReactorEventData is part of the message sent by the reactor to the FSM and used by the state handlers
+type bReactorEventData struct {
+	peerId    p2p.ID
+	err       error        // for peer errors: timeout, slow peer
+	height    int64        // for status response
+	block     *types.Block // for block response
+	stateName string       // for state timeout events
+	length    int          // for block response, to detect slow peers
+}
+
+// bReactorMessageData structure is used by the reactor when sending messages to the FSM.
+type bReactorMessageData struct {
+	event bReactorEvent
+	data  bReactorEventData
+}
+
+func (msg *bReactorMessageData) String() string {
+	var dataStr string
+
+	switch msg.event {
+	case startFSMEv:
+		dataStr = ""
+	case statusResponseEv:
+		dataStr = fmt.Sprintf("peer: %v height: %v", msg.data.peerId, msg.data.height)
+	case blockResponseEv:
+		dataStr = fmt.Sprintf("peer: %v block.height: %v length: %v", msg.data.peerId, msg.data.block.Height, msg.data.length)
+	case tryProcessBlockEv:
+		dataStr = ""
+	case stopFSMEv:
+		dataStr = ""
+	case peerErrEv:
+		dataStr = fmt.Sprintf("peer: %v err: %v", msg.data.peerId, msg.data.err)
+	case stateTimeoutEv:
+		dataStr = fmt.Sprintf("state: %v", msg.data.stateName)
+	default:
+		return "unknown event"
+	}
+
+	return fmt.Sprintf("event: %v %v", msg.event, dataStr)
+}
+
+// Blockchain Reactor Events (the input to the state machine).
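For orientation, this is how the reactor side packages a network event for the FSM goroutine (a sketch mirroring the `statusResponseEv` case in `Receive`; the peer ID and numbers are made up, and an `fsm` value is assumed in scope):

```go
// Sketch: handing a status response to the FSM goroutine.
// sendMessageToFSM enqueues on fsm.messageCh; startRoutine drains it.
msg := bReactorMessageData{
	event: statusResponseEv,
	data: bReactorEventData{
		peerId: p2p.ID("peer1"),
		height: 100,
		length: 12, // wire size; used to feed the peer's rate monitor
	},
}
sendMessageToFSM(fsm, msg)
```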
+type bReactorEvent uint + +const ( + // message type events + startFSMEv = iota + 1 + statusResponseEv + blockResponseEv + tryProcessBlockEv + stopFSMEv + + // other events + peerErrEv = iota + 256 + stateTimeoutEv +) + +func (ev bReactorEvent) String() string { + switch ev { + case startFSMEv: + return "startFSMEv" + case statusResponseEv: + return "statusResponseEv" + case blockResponseEv: + return "blockResponseEv" + case tryProcessBlockEv: + return "tryProcessBlockEv" + case stopFSMEv: + return "stopFSMEv" + case peerErrEv: + return "peerErrEv" + case stateTimeoutEv: + return "stateTimeoutEv" + default: + return "event unknown" + } + +} + +// states +var ( + unknown *bReactorFSMState + waitForPeer *bReactorFSMState + waitForBlock *bReactorFSMState + finished *bReactorFSMState +) + +// state timers +var ( + waitForPeerTimeout = 20 * time.Second + waitForBlockTimeout = 30 * time.Second // > peerTimeout which is 15 sec +) + +// errors +var ( + // errors + errInvalidEvent = errors.New("invalid event in current state") + errNoErrorFinished = errors.New("FSM is finished") + errNoPeerResponse = errors.New("FSM timed out on peer response") + errNoPeerFoundForRequest = errors.New("cannot use peer") + errBadDataFromPeer = errors.New("received from wrong peer or bad block") + errMissingBlocks = errors.New("missing blocks") + errBlockVerificationFailure = errors.New("block verification failure, redo") + errNilPeerForBlockRequest = errors.New("nil peer for block request") + errSendQueueFull = errors.New("block request not made, send-queue is full") + errPeerTooShort = errors.New("peer height too low, peer was either not added " + + "or removed after status update") + errSwitchPeerErr = errors.New("switch detected peer error") + errSlowPeer = errors.New("peer is not sending us data fast enough") +) + +func init() { + unknown = &bReactorFSMState{ + name: "unknown", + handle: func(fsm *bReactorFSM, ev bReactorEvent, data bReactorEventData) (*bReactorFSMState, error) { + switch ev { + case startFSMEv: + // Broadcast Status message. Currently doesn't return non-nil error. + _ = fsm.bcr.sendStatusRequest() + if fsm.state.timer != nil { + fsm.state.timer.Stop() + } + return waitForPeer, nil + + case stopFSMEv: + // cleanup + return finished, errNoErrorFinished + + default: + return unknown, errInvalidEvent + } + }, + } + + waitForPeer = &bReactorFSMState{ + name: "waitForPeer", + timeout: waitForPeerTimeout, + enter: func(fsm *bReactorFSM) { + // stop when leaving the state + fsm.resetStateTimer(waitForPeer) + }, + handle: func(fsm *bReactorFSM, ev bReactorEvent, data bReactorEventData) (*bReactorFSMState, error) { + switch ev { + case stateTimeoutEv: + // no statusResponse received from any peer + // Should we send status request again? 
+ if fsm.state.timer != nil { + fsm.state.timer.Stop() + } + return finished, errNoPeerResponse + + case statusResponseEv: + // update peer + if err := fsm.setPeerHeight(data.peerId, data.height); err != nil { + if len(fsm.peers) == 0 { + return waitForPeer, err + } + } + + // send first block requests + err := fsm.sendRequestBatch() + if err != nil { + // wait for more peers or state timeout + return waitForPeer, err + } + if fsm.state.timer != nil { + fsm.state.timer.Stop() + } + return waitForBlock, nil + + case stopFSMEv: + // cleanup + return finished, errNoErrorFinished + + default: + return waitForPeer, errInvalidEvent + } + }, + } + + waitForBlock = &bReactorFSMState{ + name: "waitForBlock", + timeout: waitForBlockTimeout, + enter: func(fsm *bReactorFSM) { + // stop when leaving the state or receiving a block + fsm.resetStateTimer(waitForBlock) + }, + handle: func(fsm *bReactorFSM, ev bReactorEvent, data bReactorEventData) (*bReactorFSMState, error) { + switch ev { + case stateTimeoutEv: + // no blockResponse + // Should we send status request again? Switch to consensus? + // Note that any unresponsive peers have been already removed by their timer expiry handler. + if fsm.state.timer != nil { + fsm.state.timer.Stop() + } + return finished, errNoPeerResponse + + case statusResponseEv: + err := fsm.setPeerHeight(data.peerId, data.height) + return waitForBlock, err + + case blockResponseEv: + // add block to fsm.blocks + fsm.logger.Info("blockResponseEv", "H", data.block.Height) + err := fsm.addBlock(data.peerId, data.block, data.length) + if err != nil { + // unsolicited, from different peer, already have it.. + fsm.removePeer(data.peerId, err) + // ignore block + return waitForBlock, err + } + + if fsm.shouldTryProcessBlock() { + fsm.logger.Info("shouldTryProcessBlock", "first", fsm.height, "second", fsm.height+1) + // try to process block at fsm.height with the help of block at fsm.height+1 + fsm.sendSignalToProcessBlock() + } + if fsm.state.timer != nil { + fsm.state.timer.Stop() + } + return waitForBlock, nil + + case tryProcessBlockEv: + if err := fsm.processBlock(); err != nil { + if err == errMissingBlocks { + // continue so we ask for more blocks + } + if err == errBlockVerificationFailure { + // remove peers that sent us those blocks, blocks will also be removed + first := fsm.blocks[fsm.height].peerId + fsm.removePeer(first, err) + second := fsm.blocks[fsm.height+1].peerId + fsm.removePeer(second, err) + } + } else { + delete(fsm.blocks, fsm.height) + fsm.height++ + fsm.removeShortPeers() + + // processed block, check if we are done + if fsm.height >= fsm.maxPeerHeight { + // TODO should we wait for more status responses in case a high peer is slow? + fsm.bcr.switchToConsensus() + return finished, nil + } + } + // get other block(s) + err := fsm.sendRequestBatch() + if err != nil { + // TBD on what to do here... + // wait for more peers or state timeout + } + + if fsm.shouldTryProcessBlock() { + fsm.logger.Info("shouldTryProcessBlock", "first", fsm.height, "second", fsm.height+1) + // try to process block at fsm.height with the help of block at fsm.height+1 + fsm.sendSignalToProcessBlock() + } + + if fsm.state.timer != nil { + fsm.state.timer.Stop() + } + return waitForBlock, err + + case peerErrEv: + fsm.removePeer(data.peerId, data.err) + err := fsm.sendRequestBatch() + if err != nil { + // TBD on what to do here... 
+ // wait for more peers or state timeout + } + if fsm.state.timer != nil { + fsm.state.timer.Stop() + } + return waitForBlock, err + + case stopFSMEv: + // cleanup + return finished, errNoErrorFinished + + default: + return waitForBlock, errInvalidEvent + } + }, + } + + finished = &bReactorFSMState{ + name: "finished", + enter: func(fsm *bReactorFSM) { + // cleanup + }, + handle: func(fsm *bReactorFSM, ev bReactorEvent, data bReactorEventData) (*bReactorFSMState, error) { + return nil, nil + }, + } + +} + +func NewFSM(store *BlockStore, bcr sendMessage) *bReactorFSM { + messageCh := make(chan bReactorMessageData, maxTotalMessages) + + return &bReactorFSM{ + state: unknown, + store: store, + + blocks: make(map[int64]*blockData), + + peers: make(map[p2p.ID]*bpPeer), + height: store.Height() + 1, + bcr: bcr, + messageCh: messageCh, + } +} + +func sendMessageToFSM(fsm *bReactorFSM, msg bReactorMessageData) { + fsm.logger.Info("send message to FSM", "msg", msg.String()) + fsm.messageCh <- msg +} + +func (fsm *bReactorFSM) setLogger(l log.Logger) { + fsm.logger = l +} + +// starts the FSM go routine +func (fsm *bReactorFSM) start() { + go fsm.startRoutine() + fsm.startTime = time.Now() +} + +// stops the FSM go routine +func (fsm *bReactorFSM) stop() { + msg := bReactorMessageData{ + event: stopFSMEv, + } + sendMessageToFSM(fsm, msg) +} + +// start the FSM +func (fsm *bReactorFSM) startRoutine() { + + _ = fsm.handle(&bReactorMessageData{event: startFSMEv}) + +forLoop: + for { + select { + case msg := <-fsm.messageCh: + fsm.logger.Info("FSM Received message", "msg", msg.String()) + _ = fsm.handle(&msg) + if msg.event == stopFSMEv { + break forLoop + } + // TODO - stop also on some errors returned by handle + + default: + } + } +} + +// handle processes messages and events sent to the FSM. +func (fsm *bReactorFSM) handle(msg *bReactorMessageData) error { + fsm.logger.Info("Blockchain reactor FSM received event", "event", msg.event, "state", fsm.state.name) + + if fsm.state == nil { + fsm.state = unknown + } + next, err := fsm.state.handle(fsm, msg.event, msg.data) + if err != nil { + fsm.logger.Error("Blockchain reactor event handler returned", "err", err) + } + + fsm.transition(next) + fsm.logger.Info("FSM new state", "state", fsm.state.name) + return err +} + +func (fsm *bReactorFSM) transition(next *bReactorFSMState) { + if next == nil { + return + } + fsm.logger.Info("Blockchain reactor FSM changes state: ", "old", fsm.state.name, "new", next.name) + + if fsm.state != next { + fsm.state = next + if next.enter != nil { + next.enter(fsm) + } + } +} + +// Interface for sending Block and Status requests +// Implemented by BlockchainReactor and tests +type sendMessage interface { + sendStatusRequest() error + sendBlockRequest(peerID p2p.ID, height int64) error + sendPeerError(err error, peerID p2p.ID) + processBlocks(first *types.Block, second *types.Block) error + resetStateTimer(name string, timer *time.Timer, timeout time.Duration, f func()) + switchToConsensus() +} + +// FSM state timeout handler +func (fsm *bReactorFSM) sendStateTimeoutEvent(stateName string) { + // Check that the timeout is for the state we are currently in to prevent wrong transitions. + if stateName == fsm.state.name { + msg := bReactorMessageData{ + event: stateTimeoutEv, + data: bReactorEventData{ + stateName: stateName, + }, + } + sendMessageToFSM(fsm, msg) + } +} + +// This is called when entering an FSM state in order to detect lack of progress in the state machine. 
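Note that the empty `default:` case in `startRoutine` above turns the receive loop into a busy spin that burns a CPU core whenever the channel is empty. Simply blocking on the channel (a hypothetical variant, behavior otherwise unchanged) avoids that:

```go
// Hypothetical receive loop without the empty default branch.
for {
	msg := <-fsm.messageCh // block until the reactor sends an event
	fsm.logger.Info("FSM Received message", "msg", msg.String())
	_ = fsm.handle(&msg)
	if msg.event == stopFSMEv {
		break
	}
}
```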
+// Note the use of the 'bcr' interface to facilitate testing without a running timer.
+func (fsm *bReactorFSM) resetStateTimer(state *bReactorFSMState) {
+	fsm.bcr.resetStateTimer(state.name, state.timer, state.timeout, func() {
+		fsm.sendStateTimeoutEvent(state.name)
+	})
+}
+
+// WIP
+// TODO - pace the requests to peers
+func (fsm *bReactorFSM) sendRequestBatch() error {
+	// remove slow and timed out peers
+	for _, peer := range fsm.peers {
+		if err := peer.isPeerGood(); err != nil {
+			fsm.logger.Info("Removing bad peer", "peer", peer.id, "err", err)
+			fsm.removePeer(peer.id, err)
+		}
+	}
+
+	var err error
+	// make requests
+	for i := 0; i < maxRequestBatchSize; i++ {
+		// request height
+		height := fsm.height + int64(i)
+		if height > fsm.maxPeerHeight {
+			return err
+		}
+		req := fsm.blocks[height]
+		if req == nil {
+			// make a new request; this may fail if no good peer is found
+			// or we cannot communicate with it
+			err = fsm.sendRequest(height)
+		}
+	}
+
+	return nil
+}
+
+func (fsm *bReactorFSM) sendRequest(height int64) error {
+	// make requests
+	// TODO - sort peers in order of goodness
+	for _, peer := range fsm.peers {
+		// Send Block Request message to peer
+		fsm.logger.Info("Try to send request to peer", "peer", peer.id, "height", height)
+		err := fsm.bcr.sendBlockRequest(peer.id, height)
+		if err == errSendQueueFull {
+			fsm.logger.Info("cannot send request, queue full", "peer", peer.id, "height", height)
+			continue
+		}
+		if err == errNilPeerForBlockRequest {
+			// this peer does not exist in the switch, delete locally
+			fsm.logger.Info("Peer doesn't exist in the switch", "peer", peer.id)
+			fsm.deletePeer(peer.id)
+			continue
+		}
+		// reserve space for block
+		fsm.blocks[height] = &blockData{peerId: peer.id, block: nil}
+		fsm.peers[peer.id].incrPending()
+		return nil
+	}
+
+	return errNoPeerFoundForRequest
+}
+
+// Sets the peer's blockchain height.
+func (fsm *bReactorFSM) setPeerHeight(peerID p2p.ID, height int64) error {
+
+	peer := fsm.peers[peerID]
+
+	if height < fsm.height {
+		fsm.logger.Info("Peer height too small", "peer", peerID, "height", height, "fsm_height", fsm.height)
+
+		// Don't add or update a peer that is not useful.
+		if peer != nil {
+			fsm.logger.Info("remove short peer", "peer", peerID, "height", height, "fsm_height", fsm.height)
+			fsm.removePeer(peerID, errPeerTooShort)
+		}
+		return errPeerTooShort
+	}
+
+	if peer == nil {
+		peer = newBPPeer(peerID, height, fsm.processPeerError)
+		peer.setLogger(fsm.logger.With("peer", peerID))
+		fsm.peers[peerID] = peer
+	} else {
+		// remove any requests made for heights in (height, peer.height]
+		for blockHeight, bData := range fsm.blocks {
+			if bData.peerId == peerID && blockHeight > height {
+				delete(fsm.blocks, blockHeight)
+			}
+		}
+	}
+
+	peer.height = height
+	if height > fsm.maxPeerHeight {
+		fsm.maxPeerHeight = height
+	}
+	return nil
+}
+
+func (fsm *bReactorFSM) getMaxPeerHeight() int64 {
+	return fsm.maxPeerHeight
+}
+
+// processPeerError is called from:
+// - the switch, from its goroutine
+// - the peer timeout handler, from the timer goroutine.
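To make the batching in `sendRequestBatch` concrete, a worked trace with assumed numbers (fsm.height = 11, maxRequestBatchSize = 40, maxPeerHeight = 30): heights 11 through 30 get requests and 31 and above are never asked for; on a later call, heights that already have an entry in `fsm.blocks` are skipped, so only holes are refilled:

```go
// Trace of the request loop under the assumed numbers above.
for i := 0; i < 40; i++ {
	height := int64(11 + i)
	if height > 30 {
		break // nothing above maxPeerHeight is requested
	}
	if fsm.blocks[height] == nil {
		// sendRequest picks the first peer that accepts the request and
		// reserves fsm.blocks[height] for it.
		_ = fsm.sendRequest(height)
	}
}
```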
+// Send message to FSM +func (fsm *bReactorFSM) processPeerError(err error, peerID p2p.ID) { + msgData := bReactorMessageData{ + event: peerErrEv, + data: bReactorEventData{ + err: err, + peerId: peerID, + }, + } + sendMessageToFSM(fsm, msgData) +} + +// called by the switch on peer error +func (fsm *bReactorFSM) RemovePeer(peerID p2p.ID) { + fsm.logger.Info("Switch removes peer", "peer", peerID, "fsm_height", fsm.height) + fsm.processPeerError(errSwitchPeerErr, peerID) +} + +// called every time FSM advances its height +func (fsm *bReactorFSM) removeShortPeers() { + for _, peer := range fsm.peers { + if peer.height < fsm.height { + fsm.logger.Info("removeShortPeers", "peer", peer.id) + fsm.removePeer(peer.id, nil) + } + } +} + +// removes any blocks and requests associated with the peer, deletes the peer and informs the switch if needed. +func (fsm *bReactorFSM) removePeer(peerID p2p.ID, err error) { + fsm.logger.Info("removePeer", "peer", peerID, "err", err) + // remove all data for blocks waiting for the peer or not processed yet + for h, bData := range fsm.blocks { + if bData.peerId == peerID { + delete(fsm.blocks, h) + } + } + + // delete peer + fsm.deletePeer(peerID) + + // recompute maxPeerHeight + fsm.maxPeerHeight = 0 + for _, peer := range fsm.peers { + if peer.height > fsm.maxPeerHeight { + fsm.maxPeerHeight = peer.height + } + } + + // send error to switch if not coming from it + if err != nil && err != errSwitchPeerErr { + fsm.bcr.sendPeerError(err, peerID) + } +} + +// stops the peer timer and deletes the peer +func (fsm *bReactorFSM) deletePeer(peerID p2p.ID) { + if p, exist := fsm.peers[peerID]; exist && p.timeout != nil { + p.timeout.Stop() + } + delete(fsm.peers, peerID) +} + +// Validates that the block comes from the peer it was expected from and stores it in the 'blocks' map. +func (fsm *bReactorFSM) addBlock(peerID p2p.ID, block *types.Block, blockSize int) error { + + blockData := fsm.blocks[block.Height] + + if blockData == nil { + fsm.logger.Error("peer sent us a block we didn't expect", "peer", peerID, "curHeight", fsm.height, "blockHeight", block.Height) + return errBadDataFromPeer + } + + if blockData.peerId != peerID { + fsm.logger.Error("invalid peer", "peer", peerID, "blockHeight", block.Height) + return errBadDataFromPeer + } + if blockData.block != nil { + fsm.logger.Error("already have a block for height") + return errBadDataFromPeer + } + + fsm.blocks[block.Height].block = block + peer := fsm.peers[peerID] + if peer != nil { + peer.decrPending(blockSize) + } + return nil +} + +func (fsm *bReactorFSM) shouldTryProcessBlock() bool { + first := fsm.blocks[fsm.height] + second := fsm.blocks[fsm.height+1] + if first == nil || first.block == nil || second == nil || second.block == nil { + // We need both to sync the first block. + return false + } + return true +} + +func (fsm *bReactorFSM) sendSignalToProcessBlock() { + // TODO - add check that this was sent before, currently there are extraneous tryProcessBlockEv + fsm.logger.Info("Send signal to process", "first", fsm.height, "second", fsm.height+1) + + msgData := bReactorMessageData{ + event: tryProcessBlockEv, + data: bReactorEventData{}, + } + sendMessageToFSM(fsm, msgData) +} + +// Processes block at height H = fsm.height. Expects both H and H+1 to be available +func (fsm *bReactorFSM) processBlock() error { + first := fsm.blocks[fsm.height] + second := fsm.blocks[fsm.height+1] + if first == nil || first.block == nil || second == nil || second.block == nil { + // We need both to sync the first block. 
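+	// NOTE: block H can only be processed once block H+1 has arrived, since
+	// H is verified against the commit carried inside H+1 (second.LastCommit);
+	// the actual check happens in BlockchainReactor.processBlocks.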
+ return errMissingBlocks + } + fsm.logger.Info("process blocks", "first", first.block.Height, "second", second.block.Height) + fsm.logger.Info("FSM blocks", "blocks", fsm.blocks) + + if err := fsm.bcr.processBlocks(first.block, second.block); err != nil { + fsm.logger.Error("Process blocks returned error", "err", err, "first", first.block.Height, "second", second.block.Height) + fsm.logger.Error("FSM blocks", "blocks", fsm.blocks) + return err + } + return nil +} + +func (fsm *bReactorFSM) IsFinished() bool { + return fsm.state == finished +} diff --git a/blockchain_new/reactor_fsm_test.go b/blockchain_new/reactor_fsm_test.go new file mode 100644 index 00000000..920e195a --- /dev/null +++ b/blockchain_new/reactor_fsm_test.go @@ -0,0 +1,287 @@ +package blockchain_new + +import ( + "github.com/stretchr/testify/assert" + cmn "github.com/tendermint/tendermint/libs/common" + dbm "github.com/tendermint/tendermint/libs/db" + "github.com/tendermint/tendermint/libs/log" + "github.com/tendermint/tendermint/p2p" + "github.com/tendermint/tendermint/types" + + "testing" + "time" +) + +var ( + failSendStatusRequest bool + failSendBlockRequest bool + numStatusRequests int + numBlockRequests int +) + +type lastBlockRequestT struct { + peerID p2p.ID + height int64 +} + +var lastBlockRequest lastBlockRequestT + +type lastPeerErrorT struct { + peerID p2p.ID + err error +} + +var lastPeerError lastPeerErrorT + +var stateTimerStarts map[string]int + +func resetTestValues() { + stateTimerStarts = make(map[string]int) + failSendBlockRequest = false + failSendStatusRequest = false + numStatusRequests = 0 + numBlockRequests = 0 + lastBlockRequest.peerID = "" + lastBlockRequest.height = 0 + lastPeerError.peerID = "" + lastPeerError.err = nil +} + +type fsmStepTestValues struct { + currentState string + event bReactorEvent + expectedState string + + // input + failStatusReq bool + shouldSendStatusReq bool + + failBlockReq bool + blockReqIncreased bool + + data bReactorEventData + + expectedLastBlockReq *lastBlockRequestT +} + +// WIP +func TestFSMTransitionSequences(t *testing.T) { + maxRequestBatchSize = 2 + fsmTransitionSequenceTests := [][]fsmStepTestValues{ + { + {currentState: "unknown", event: startFSMEv, shouldSendStatusReq: true, + expectedState: "waitForPeer"}, + {currentState: "waitForPeer", event: statusResponseEv, + data: bReactorEventData{peerId: "P1", height: 10}, + blockReqIncreased: true, + expectedState: "waitForBlock"}, + }, + } + + for _, tt := range fsmTransitionSequenceTests { + // Create and start the FSM + testBcR := &testReactor{logger: log.TestingLogger()} + blockDB := dbm.NewMemDB() + store := NewBlockStore(blockDB) + fsm := NewFSM(store, testBcR) + fsm.setLogger(log.TestingLogger()) + resetTestValues() + + // always start from unknown + fsm.resetStateTimer(unknown) + assert.Equal(t, 1, stateTimerStarts[unknown.name]) + + for _, step := range tt { + assert.Equal(t, step.currentState, fsm.state.name) + failSendStatusRequest = step.failStatusReq + failSendBlockRequest = step.failBlockReq + + oldNumStatusRequests := numStatusRequests + oldNumBlockRequests := numBlockRequests + + _ = sendEventToFSM(fsm, step.event, step.data) + if step.shouldSendStatusReq { + assert.Equal(t, oldNumStatusRequests+1, numStatusRequests) + } else { + assert.Equal(t, oldNumStatusRequests, numStatusRequests) + } + + if step.blockReqIncreased { + assert.Equal(t, oldNumBlockRequests+maxRequestBatchSize, numBlockRequests) + } else { + assert.Equal(t, oldNumBlockRequests, numBlockRequests) + } + + assert.Equal(t, 
step.expectedState, fsm.state.name) + } + } +} + +func TestReactorFSMBasic(t *testing.T) { + maxRequestBatchSize = 2 + + resetTestValues() + // Create and start the FSM + testBcR := &testReactor{logger: log.TestingLogger()} + blockDB := dbm.NewMemDB() + store := NewBlockStore(blockDB) + fsm := NewFSM(store, testBcR) + fsm.setLogger(log.TestingLogger()) + + if err := fsm.handle(&bReactorMessageData{event: startFSMEv}); err != nil { + } + + // Check that FSM sends a status request message + assert.Equal(t, 1, numStatusRequests) + assert.Equal(t, waitForPeer.name, fsm.state.name) + + // Send a status response message to FSM + peerID := p2p.ID(cmn.RandStr(12)) + sendStatusResponse2(fsm, peerID, 10) + + // Check that FSM sends a block request message and... + assert.Equal(t, maxRequestBatchSize, numBlockRequests) + // ... the block request has the expected height + assert.Equal(t, int64(maxRequestBatchSize), lastBlockRequest.height) + assert.Equal(t, waitForBlock.name, fsm.state.name) +} + +func TestReactorFSMPeerTimeout(t *testing.T) { + maxRequestBatchSize = 2 + resetTestValues() + peerTimeout = 20 * time.Millisecond + // Create and start the FSM + testBcR := &testReactor{logger: log.TestingLogger()} + blockDB := dbm.NewMemDB() + store := NewBlockStore(blockDB) + fsm := NewFSM(store, testBcR) + fsm.setLogger(log.TestingLogger()) + fsm.start() + + // Check that FSM sends a status request message + time.Sleep(time.Millisecond) + assert.Equal(t, 1, numStatusRequests) + + // Send a status response message to FSM + peerID := p2p.ID(cmn.RandStr(12)) + sendStatusResponse(fsm, peerID, 10) + time.Sleep(5 * time.Millisecond) + + // Check that FSM sends a block request message and... + assert.Equal(t, maxRequestBatchSize, numBlockRequests) + // ... the block request has the expected height and peer + assert.Equal(t, int64(maxRequestBatchSize), lastBlockRequest.height) + assert.Equal(t, peerID, lastBlockRequest.peerID) + + // let FSM timeout on the block response message + time.Sleep(100 * time.Millisecond) + +} + +// reactor for FSM testing +type testReactor struct { + logger log.Logger + fsm *bReactorFSM + testCh chan bReactorMessageData +} + +func sendEventToFSM(fsm *bReactorFSM, ev bReactorEvent, data bReactorEventData) error { + return fsm.handle(&bReactorMessageData{event: ev, data: data}) +} + +// ---------------------------------------- +// implementation for the test reactor APIs + +func (testR *testReactor) sendPeerError(err error, peerID p2p.ID) { + testR.logger.Info("Reactor received sendPeerError call from FSM", "peer", peerID, "err", err) + lastPeerError.peerID = peerID + lastPeerError.err = err +} + +func (testR *testReactor) sendStatusRequest() error { + testR.logger.Info("Reactor received sendStatusRequest call from FSM") + numStatusRequests++ + if failSendStatusRequest { + return errSendQueueFull + } + return nil +} + +func (testR *testReactor) sendBlockRequest(peerID p2p.ID, height int64) error { + testR.logger.Info("Reactor received sendBlockRequest call from FSM", "peer", peerID, "height", height) + numBlockRequests++ + lastBlockRequest.peerID = peerID + lastBlockRequest.height = height + return nil +} + +func (testR *testReactor) resetStateTimer(name string, timer *time.Timer, timeout time.Duration, f func()) { + testR.logger.Info("Reactor received resetStateTimer call from FSM", "state", name, "timeout", timeout) + if _, ok := stateTimerStarts[name]; !ok { + stateTimerStarts[name] = 1 + } else { + stateTimerStarts[name]++ + } +} + +func (testR *testReactor) processBlocks(first 
*types.Block, second *types.Block) error { + testR.logger.Info("Reactor received processBlocks call from FSM", "first", first.Height, "second", second.Height) + return nil +} + +func (testR *testReactor) switchToConsensus() { + testR.logger.Info("Reactor received switchToConsensus call from FSM") + +} + +// ---------------------------------------- + +// ------------------------------------------------------- +// helper functions for tests to simulate different events +func sendStatusResponse(fsm *bReactorFSM, peerID p2p.ID, height int64) { + msgBytes := makeStatusResponseMessage(height) + msgData := bReactorMessageData{ + event: statusResponseEv, + data: bReactorEventData{ + peerId: peerID, + height: height, + length: len(msgBytes), + }, + } + + sendMessageToFSM(fsm, msgData) +} + +func sendStatusResponse2(fsm *bReactorFSM, peerID p2p.ID, height int64) { + msgBytes := makeStatusResponseMessage(height) + msgData := &bReactorMessageData{ + event: statusResponseEv, + data: bReactorEventData{ + peerId: peerID, + height: height, + length: len(msgBytes), + }, + } + _ = fsm.handle(msgData) +} + +func sendStateTimeout(fsm *bReactorFSM, name string) { + msgData := &bReactorMessageData{ + event: stateTimeoutEv, + data: bReactorEventData{ + stateName: name, + }, + } + _ = fsm.handle(msgData) +} + +// ------------------------------------------------------- + +// ---------------------------------------------------- +// helper functions to make blockchain reactor messages +func makeStatusResponseMessage(height int64) []byte { + msgBytes := cdc.MustMarshalBinaryBare(&bcStatusResponseMessage{height}) + return msgBytes +} + +// ---------------------------------------------------- diff --git a/blockchain_new/reactor_test.go b/blockchain_new/reactor_test.go new file mode 100644 index 00000000..a9e9706c --- /dev/null +++ b/blockchain_new/reactor_test.go @@ -0,0 +1,340 @@ +package blockchain_new + +import ( + "fmt" + "os" + "sort" + "testing" + "time" + + "github.com/stretchr/testify/assert" + abci "github.com/tendermint/tendermint/abci/types" + cfg "github.com/tendermint/tendermint/config" + cmn "github.com/tendermint/tendermint/libs/common" + dbm "github.com/tendermint/tendermint/libs/db" + "github.com/tendermint/tendermint/libs/log" + "github.com/tendermint/tendermint/p2p" + "github.com/tendermint/tendermint/proxy" + sm "github.com/tendermint/tendermint/state" + "github.com/tendermint/tendermint/types" + tmtime "github.com/tendermint/tendermint/types/time" +) + +var config *cfg.Config + +func randGenesisDoc(numValidators int, randPower bool, minPower int64) (*types.GenesisDoc, []types.PrivValidator) { + validators := make([]types.GenesisValidator, numValidators) + privValidators := make([]types.PrivValidator, numValidators) + for i := 0; i < numValidators; i++ { + val, privVal := types.RandValidator(randPower, minPower) + validators[i] = types.GenesisValidator{ + PubKey: val.PubKey, + Power: val.VotingPower, + } + privValidators[i] = privVal + } + sort.Sort(types.PrivValidatorsByAddress(privValidators)) + + return &types.GenesisDoc{ + GenesisTime: tmtime.Now(), + ChainID: config.ChainID(), + Validators: validators, + }, privValidators +} + +func makeVote(header *types.Header, blockID types.BlockID, valset *types.ValidatorSet, privVal types.PrivValidator) *types.Vote { + addr := privVal.GetPubKey().Address() + idx, _ := valset.GetByAddress(addr) + vote := &types.Vote{ + ValidatorAddress: addr, + ValidatorIndex: idx, + Height: header.Height, + Round: 1, + Timestamp: tmtime.Now(), + Type: types.PrecommitType, 
+ BlockID: blockID, + } + + _ = privVal.SignVote(header.ChainID, vote) + + return vote +} + +type BlockchainReactorPair struct { + reactor *BlockchainReactor + app proxy.AppConns +} + +func newBlockchainReactor(logger log.Logger, genDoc *types.GenesisDoc, privVals []types.PrivValidator, maxBlockHeight int64) BlockchainReactorPair { + if len(privVals) != 1 { + panic("only support one validator") + } + + app := &testApp{} + cc := proxy.NewLocalClientCreator(app) + proxyApp := proxy.NewAppConns(cc) + err := proxyApp.Start() + if err != nil { + panic(cmn.ErrorWrap(err, "error start app")) + } + + blockDB := dbm.NewMemDB() + stateDB := dbm.NewMemDB() + blockStore := NewBlockStore(blockDB) + + state, err := sm.LoadStateFromDBOrGenesisDoc(stateDB, genDoc) + if err != nil { + panic(cmn.ErrorWrap(err, "error constructing state from genesis file")) + } + + // Make the BlockchainReactor itself. + // NOTE we have to create and commit the blocks first because + // pool.height is determined from the store. + fastSync := true + blockExec := sm.NewBlockExecutor(dbm.NewMemDB(), log.TestingLogger(), proxyApp.Consensus(), + sm.MockMempool{}, sm.MockEvidencePool{}) + + // let's add some blocks in + for blockHeight := int64(1); blockHeight <= maxBlockHeight; blockHeight++ { + lastCommit := types.NewCommit(types.BlockID{}, nil) + if blockHeight > 1 { + lastBlockMeta := blockStore.LoadBlockMeta(blockHeight - 1) + lastBlock := blockStore.LoadBlock(blockHeight - 1) + + vote := makeVote(&lastBlock.Header, lastBlockMeta.BlockID, state.Validators, privVals[0]).CommitSig() + lastCommit = types.NewCommit(lastBlockMeta.BlockID, []*types.CommitSig{vote}) + } + + thisBlock := makeBlock(blockHeight, state, lastCommit) + + thisParts := thisBlock.MakePartSet(types.BlockPartSizeBytes) + blockID := types.BlockID{Hash: thisBlock.Hash(), PartsHeader: thisParts.Header()} + + state, err = blockExec.ApplyBlock(state, blockID, thisBlock) + if err != nil { + panic(cmn.ErrorWrap(err, "error apply block")) + } + + blockStore.SaveBlock(thisBlock, thisParts, lastCommit) + } + + bcReactor := NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync) + /* + addr := bcReactor.Switch.NodeInfo().ID() + moduleName := fmt.Sprintf("blockchain-%v", addr) + bcReactor.SetLogger(logger.With("module", moduleName)) + */ + bcReactor.SetLogger(logger.With("module", "blockchain")) + + return BlockchainReactorPair{bcReactor, proxyApp} +} + +func TestNoBlockResponse(t *testing.T) { + peerTimeout = 15 * time.Second + maxRequestBatchSize = 40 + + config = cfg.ResetTestRoot("blockchain_new_reactor_test") + defer os.RemoveAll(config.RootDir) + genDoc, privVals := randGenesisDoc(1, false, 30) + + maxBlockHeight := int64(65) + + reactorPairs := make([]BlockchainReactorPair, 2) + + logger1 := log.TestingLogger() + reactorPairs[0] = newBlockchainReactor(logger1, genDoc, privVals, maxBlockHeight) + logger2 := log.TestingLogger() + reactorPairs[1] = newBlockchainReactor(logger2, genDoc, privVals, 0) + + p2p.MakeConnectedSwitches(config.P2P, 2, func(i int, s *p2p.Switch) *p2p.Switch { + s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor) + return s + + }, p2p.Connect2Switches) + + addr0 := reactorPairs[0].reactor.Switch.NodeInfo().ID() + moduleName := fmt.Sprintf("blockchain-%v", addr0) + reactorPairs[0].reactor.SetLogger(logger1.With("module", moduleName[:19])) + addr1 := reactorPairs[1].reactor.Switch.NodeInfo().ID() + moduleName = fmt.Sprintf("blockchain-%v", addr1) + reactorPairs[1].reactor.SetLogger(logger1.With("module", moduleName[:19])) + + defer 
func() { + for _, r := range reactorPairs { + _ = r.reactor.Stop() + _ = r.app.Stop() + } + }() + + tests := []struct { + height int64 + existent bool + }{ + {maxBlockHeight + 2, false}, + {10, true}, + {1, true}, + {100, false}} + + for { + if reactorPairs[1].reactor.fsm.IsFinished() { + break + } + + time.Sleep(10 * time.Millisecond) + } + + assert.Equal(t, maxBlockHeight, reactorPairs[0].reactor.fsm.store.Height()) + + for _, tt := range tests { + block := reactorPairs[1].reactor.fsm.store.LoadBlock(tt.height) + if tt.existent { + assert.True(t, block != nil) + } else { + assert.True(t, block == nil) + } + } +} + +// NOTE: This is too hard to test without +// an easy way to add test peer to switch +// or without significant refactoring of the module. +// Alternatively we could actually dial a TCP conn but +// that seems extreme. +func TestBadBlockStopsPeer(t *testing.T) { + peerTimeout = 15 * time.Second + maxRequestBatchSize = 40 + config = cfg.ResetTestRoot("blockchain_reactor_test") + defer os.RemoveAll(config.RootDir) + genDoc, privVals := randGenesisDoc(1, false, 30) + + maxBlockHeight := int64(148) + + otherChain := newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight) + defer func() { + _ = otherChain.reactor.Stop() + _ = otherChain.app.Stop() + }() + + reactorPairs := make([]BlockchainReactorPair, 4) + + var logger = make([]log.Logger, 4) + + for i := 0; i < 4; i++ { + logger[i] = log.TestingLogger() + height := int64(0) + if i == 0 { + height = maxBlockHeight + } + reactorPairs[i] = newBlockchainReactor(logger[i], genDoc, privVals, height) + } + + switches := p2p.MakeConnectedSwitches(config.P2P, 4, func(i int, s *p2p.Switch) *p2p.Switch { + s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor) + return s + + }, p2p.Connect2Switches) + + for i := 0; i < 4; i++ { + addr := reactorPairs[i].reactor.Switch.NodeInfo().ID() + moduleName := fmt.Sprintf("blockchain-%v", addr) + reactorPairs[i].reactor.SetLogger(logger[i].With("module", moduleName[:19])) + } + + defer func() { + for _, r := range reactorPairs { + _ = r.reactor.Stop() + _ = r.app.Stop() + } + }() + + for { + if reactorPairs[3].reactor.fsm.IsFinished() { + break + } + + time.Sleep(1 * time.Second) + } + + //at this time, reactors[0-3] is the newest + assert.Equal(t, 3, reactorPairs[1].reactor.Switch.Peers().Size()) + + //mark reactorPairs[3] is an invalid peer + reactorPairs[3].reactor.fsm.store = otherChain.reactor.fsm.store + + lastLogger := log.TestingLogger() + lastReactorPair := newBlockchainReactor(lastLogger, genDoc, privVals, 0) + reactorPairs = append(reactorPairs, lastReactorPair) + + switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch) *p2p.Switch { + s.AddReactor("BLOCKCHAIN", reactorPairs[len(reactorPairs)-1].reactor) + return s + + }, p2p.Connect2Switches)...) 
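These open-ended polling loops hang the test forever if sync stalls; a small bounded-wait helper (hypothetical, not part of this diff) would give the same checks a deadline:

```go
// Hypothetical test helper: poll cond until it holds or the deadline hits,
// e.g. waitFor(t, time.Minute, lastReactorPair.reactor.fsm.IsFinished).
func waitFor(t *testing.T, timeout time.Duration, cond func() bool) {
	t.Helper()
	deadline := time.Now().Add(timeout)
	for !cond() {
		if time.Now().After(deadline) {
			t.Fatal("condition not met before timeout")
		}
		time.Sleep(10 * time.Millisecond)
	}
}
```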
+	addr := lastReactorPair.reactor.Switch.NodeInfo().ID()
+	moduleName := fmt.Sprintf("blockchain-%v", addr)
+	lastReactorPair.reactor.SetLogger(lastLogger.With("module", moduleName[:19]))
+
+	for i := 0; i < len(reactorPairs)-1; i++ {
+		p2p.Connect2Switches(switches, i, len(reactorPairs)-1)
+	}
+
+	for {
+		if lastReactorPair.reactor.fsm.IsFinished() || lastReactorPair.reactor.Switch.Peers().Size() == 0 {
+			break
+		}
+
+		time.Sleep(1 * time.Second)
+	}
+
+	assert.True(t, lastReactorPair.reactor.Switch.Peers().Size() < len(reactorPairs)-1)
+}
+
+//----------------------------------------------
+// utility funcs
+
+func makeTxs(height int64) (txs []types.Tx) {
+	for i := 0; i < 10; i++ {
+		txs = append(txs, types.Tx([]byte{byte(height), byte(i)}))
+	}
+	return txs
+}
+
+func makeBlock(height int64, state sm.State, lastCommit *types.Commit) *types.Block {
+	block, _ := state.MakeBlock(height, makeTxs(height), lastCommit, nil, state.Validators.GetProposer().Address)
+	return block
+}
+
+type testApp struct {
+	abci.BaseApplication
+}
+
+var _ abci.Application = (*testApp)(nil)
+
+func (app *testApp) Info(req abci.RequestInfo) (resInfo abci.ResponseInfo) {
+	return abci.ResponseInfo{}
+}
+
+func (app *testApp) BeginBlock(req abci.RequestBeginBlock) abci.ResponseBeginBlock {
+	return abci.ResponseBeginBlock{}
+}
+
+func (app *testApp) EndBlock(req abci.RequestEndBlock) abci.ResponseEndBlock {
+	return abci.ResponseEndBlock{}
+}
+
+func (app *testApp) DeliverTx(tx []byte) abci.ResponseDeliverTx {
+	return abci.ResponseDeliverTx{Tags: []cmn.KVPair{}}
+}
+
+func (app *testApp) CheckTx(tx []byte) abci.ResponseCheckTx {
+	return abci.ResponseCheckTx{}
+}
+
+func (app *testApp) Commit() abci.ResponseCommit {
+	return abci.ResponseCommit{}
+}
+
+func (app *testApp) Query(reqQuery abci.RequestQuery) (resQuery abci.ResponseQuery) {
+	return
+}
diff --git a/blockchain_new/store.go b/blockchain_new/store.go
new file mode 100644
index 00000000..ff4aa6c8
--- /dev/null
+++ b/blockchain_new/store.go
@@ -0,0 +1,247 @@
+package blockchain_new
+
+import (
+	"fmt"
+	"sync"
+
+	cmn "github.com/tendermint/tendermint/libs/common"
+	dbm "github.com/tendermint/tendermint/libs/db"
+
+	"github.com/tendermint/tendermint/types"
+)
+
+/*
+BlockStore is a simple low level store for blocks.
+
+There are three types of information stored:
+  - BlockMeta:  Meta information about each block
+  - Block part: Parts of each block, aggregated w/ PartSet
+  - Commit:     The commit part of each block, for gossiping precommit votes
+
+Currently the precommit signatures are duplicated in the Block parts as
+well as the Commit. In the future this may change, perhaps by moving
+the Commit data outside the Block. (TODO)
+
+NOTE: BlockStore methods will panic if they encounter errors
+deserializing loaded data, indicating probable corruption on disk.
+*/
+type BlockStore struct {
+	db dbm.DB
+
+	mtx    sync.RWMutex
+	height int64
+}
+
+// NewBlockStore returns a new BlockStore with the given DB,
+// initialized to the last height that was committed to the DB.
+func NewBlockStore(db dbm.DB) *BlockStore {
+	bsjson := LoadBlockStoreStateJSON(db)
+	return &BlockStore{
+		height: bsjson.Height,
+		db:     db,
+	}
+}
+
+// Height returns the last known contiguous block height.
+func (bs *BlockStore) Height() int64 {
+	bs.mtx.RLock()
+	defer bs.mtx.RUnlock()
+	return bs.height
+}
+
+// LoadBlock returns the block with the given height.
+// If no block is found for that height, it returns nil.
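+// The block is reassembled by loading each of its parts from the DB and
+// amino-decoding their concatenated bytes.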
+func (bs *BlockStore) LoadBlock(height int64) *types.Block {
+	var blockMeta = bs.LoadBlockMeta(height)
+	if blockMeta == nil {
+		return nil
+	}
+
+	var block = new(types.Block)
+	buf := []byte{}
+	for i := 0; i < blockMeta.BlockID.PartsHeader.Total; i++ {
+		part := bs.LoadBlockPart(height, i)
+		buf = append(buf, part.Bytes...)
+	}
+	err := cdc.UnmarshalBinaryLengthPrefixed(buf, block)
+	if err != nil {
+		// NOTE: The existence of meta should imply the existence of the
+		// block. So, make sure meta is only saved after blocks are saved.
+		panic(cmn.ErrorWrap(err, "Error reading block"))
+	}
+	return block
+}
+
+// LoadBlockPart returns the Part at the given index
+// from the block at the given height.
+// If no part is found for the given height and index, it returns nil.
+func (bs *BlockStore) LoadBlockPart(height int64, index int) *types.Part {
+	var part = new(types.Part)
+	bz := bs.db.Get(calcBlockPartKey(height, index))
+	if len(bz) == 0 {
+		return nil
+	}
+	err := cdc.UnmarshalBinaryBare(bz, part)
+	if err != nil {
+		panic(cmn.ErrorWrap(err, "Error reading block part"))
+	}
+	return part
+}
+
+// LoadBlockMeta returns the BlockMeta for the given height.
+// If no block is found for the given height, it returns nil.
+func (bs *BlockStore) LoadBlockMeta(height int64) *types.BlockMeta {
+	var blockMeta = new(types.BlockMeta)
+	bz := bs.db.Get(calcBlockMetaKey(height))
+	if len(bz) == 0 {
+		return nil
+	}
+	err := cdc.UnmarshalBinaryBare(bz, blockMeta)
+	if err != nil {
+		panic(cmn.ErrorWrap(err, "Error reading block meta"))
+	}
+	return blockMeta
+}
+
+// LoadBlockCommit returns the Commit for the given height.
+// This commit consists of the +2/3 and other Precommit-votes for block at `height`,
+// and it comes from the block.LastCommit for `height+1`.
+// If no commit is found for the given height, it returns nil.
+func (bs *BlockStore) LoadBlockCommit(height int64) *types.Commit {
+	var commit = new(types.Commit)
+	bz := bs.db.Get(calcBlockCommitKey(height))
+	if len(bz) == 0 {
+		return nil
+	}
+	err := cdc.UnmarshalBinaryBare(bz, commit)
+	if err != nil {
+		panic(cmn.ErrorWrap(err, "Error reading block commit"))
+	}
+	return commit
+}
+
+// LoadSeenCommit returns the locally seen Commit for the given height.
+// This is useful when we've seen a commit, but there has not yet been
+// a new block at `height + 1` that includes this commit in its block.LastCommit.
+func (bs *BlockStore) LoadSeenCommit(height int64) *types.Commit {
+	var commit = new(types.Commit)
+	bz := bs.db.Get(calcSeenCommitKey(height))
+	if len(bz) == 0 {
+		return nil
+	}
+	err := cdc.UnmarshalBinaryBare(bz, commit)
+	if err != nil {
+		panic(cmn.ErrorWrap(err, "Error reading block seen commit"))
+	}
+	return commit
+}
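+
+// A typical write/read cycle, as exercised by the tests (sketch only):
+//
+//	bs := NewBlockStore(dbm.NewMemDB())
+//	bs.SaveBlock(block, blockParts, seenCommit) // see SaveBlock below
+//	b := bs.LoadBlock(block.Height)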
+
+// SaveBlock persists the given block, blockParts, and seenCommit to the underlying db.
+// blockParts: Must be parts of the block
+// seenCommit: The +2/3 precommits that were seen committing the block at height.
+//             If all the nodes restart after committing a block,
+//             we need this to reload the precommits to catch-up nodes to the
+//             most recent height. Otherwise they'd stall at H-1.
+func (bs *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) {
+	if block == nil {
+		cmn.PanicSanity("BlockStore can only save a non-nil block")
+	}
+	height := block.Height
+	if g, w := height, bs.Height()+1; g != w {
+		cmn.PanicSanity(fmt.Sprintf("BlockStore can only save contiguous blocks. Wanted %v, got %v", w, g))
+	}
+	if !blockParts.IsComplete() {
+		cmn.PanicSanity("BlockStore can only save complete block part sets")
+	}
+
+	// Save block meta
+	blockMeta := types.NewBlockMeta(block, blockParts)
+	metaBytes := cdc.MustMarshalBinaryBare(blockMeta)
+	bs.db.Set(calcBlockMetaKey(height), metaBytes)
+
+	// Save block parts
+	for i := 0; i < blockParts.Total(); i++ {
+		part := blockParts.GetPart(i)
+		bs.saveBlockPart(height, i, part)
+	}
+
+	// Save block commit (duplicate and separate from the Block)
+	blockCommitBytes := cdc.MustMarshalBinaryBare(block.LastCommit)
+	bs.db.Set(calcBlockCommitKey(height-1), blockCommitBytes)
+
+	// Save seen commit (seen +2/3 precommits for block)
+	// NOTE: we can delete this at a later height
+	seenCommitBytes := cdc.MustMarshalBinaryBare(seenCommit)
+	bs.db.Set(calcSeenCommitKey(height), seenCommitBytes)
+
+	// Save new BlockStoreStateJSON descriptor
+	BlockStoreStateJSON{Height: height}.Save(bs.db)
+
+	// Done!
+	bs.mtx.Lock()
+	bs.height = height
+	bs.mtx.Unlock()
+
+	// Flush
+	bs.db.SetSync(nil, nil)
+}
+
+func (bs *BlockStore) saveBlockPart(height int64, index int, part *types.Part) {
+	if height != bs.Height()+1 {
+		cmn.PanicSanity(fmt.Sprintf("BlockStore can only save contiguous blocks. Wanted %v, got %v", bs.Height()+1, height))
+	}
+	partBytes := cdc.MustMarshalBinaryBare(part)
+	bs.db.Set(calcBlockPartKey(height, index), partBytes)
+}
+
+//-----------------------------------------------------------------------------
+
+func calcBlockMetaKey(height int64) []byte {
+	return []byte(fmt.Sprintf("H:%v", height))
+}
+
+func calcBlockPartKey(height int64, partIndex int) []byte {
+	return []byte(fmt.Sprintf("P:%v:%v", height, partIndex))
+}
+
+func calcBlockCommitKey(height int64) []byte {
+	return []byte(fmt.Sprintf("C:%v", height))
+}
+
+func calcSeenCommitKey(height int64) []byte {
+	return []byte(fmt.Sprintf("SC:%v", height))
+}
+
+//-----------------------------------------------------------------------------
+
+var blockStoreKey = []byte("blockStore")
+
+type BlockStoreStateJSON struct {
+	Height int64 `json:"height"`
+}
+
+// Save persists the blockStore state to the database as JSON.
+func (bsj BlockStoreStateJSON) Save(db dbm.DB) {
+	bytes, err := cdc.MarshalJSON(bsj)
+	if err != nil {
+		cmn.PanicSanity(fmt.Sprintf("Could not marshal state bytes: %v", err))
+	}
+	db.SetSync(blockStoreKey, bytes)
+}
+
+// LoadBlockStoreStateJSON returns the BlockStoreStateJSON as loaded from disk.
+// If no BlockStoreStateJSON was previously persisted, it returns the zero value.
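+// Amino encodes int64 as a JSON string, so the value stored under the
+// "blockStore" key looks like {"height": "10000"} (see the tests below).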
+func LoadBlockStoreStateJSON(db dbm.DB) BlockStoreStateJSON {
+	bytes := db.Get(blockStoreKey)
+	if len(bytes) == 0 {
+		return BlockStoreStateJSON{
+			Height: 0,
+		}
+	}
+	bsj := BlockStoreStateJSON{}
+	err := cdc.UnmarshalJSON(bytes, &bsj)
+	if err != nil {
+		panic(fmt.Sprintf("Could not unmarshal bytes: %X", bytes))
+	}
+	return bsj
+}
diff --git a/blockchain_new/store_test.go b/blockchain_new/store_test.go
new file mode 100644
index 00000000..dcbc0a13
--- /dev/null
+++ b/blockchain_new/store_test.go
@@ -0,0 +1,421 @@
+package blockchain_new
+
+import (
+	"bytes"
+	"fmt"
+	"os"
+	"runtime/debug"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	cfg "github.com/tendermint/tendermint/config"
+	cmn "github.com/tendermint/tendermint/libs/common"
+	"github.com/tendermint/tendermint/libs/db"
+	dbm "github.com/tendermint/tendermint/libs/db"
+	"github.com/tendermint/tendermint/libs/log"
+	sm "github.com/tendermint/tendermint/state"
+
+	"github.com/tendermint/tendermint/types"
+	tmtime "github.com/tendermint/tendermint/types/time"
+)
+
+// A cleanupFunc cleans up any config / test files created for a particular
+// test.
+type cleanupFunc func()
+
+// make a Commit with a single vote containing just the height and a timestamp
+func makeTestCommit(height int64, timestamp time.Time) *types.Commit {
+	commitSigs := []*types.CommitSig{{Height: height, Timestamp: timestamp}}
+	return types.NewCommit(types.BlockID{}, commitSigs)
+}
+
+func makeStateAndBlockStore(logger log.Logger) (sm.State, *BlockStore, cleanupFunc) {
+	config := cfg.ResetTestRoot("blockchain_reactor_test")
+	// blockDB := dbm.NewDebugDB("blockDB", dbm.NewMemDB())
+	// stateDB := dbm.NewDebugDB("stateDB", dbm.NewMemDB())
+	blockDB := dbm.NewMemDB()
+	stateDB := dbm.NewMemDB()
+	state, err := sm.LoadStateFromDBOrGenesisFile(stateDB, config.GenesisFile())
+	if err != nil {
+		panic(cmn.ErrorWrap(err, "error constructing state from genesis file"))
+	}
+	return state, NewBlockStore(blockDB), func() { os.RemoveAll(config.RootDir) }
+}
+
+func TestLoadBlockStoreStateJSON(t *testing.T) {
+	db := db.NewMemDB()
+
+	bsj := &BlockStoreStateJSON{Height: 1000}
+	bsj.Save(db)
+
+	retrBSJ := LoadBlockStoreStateJSON(db)
+
+	assert.Equal(t, *bsj, retrBSJ, "expected the retrieved DBs to match")
+}
+
+func TestNewBlockStore(t *testing.T) {
+	db := db.NewMemDB()
+	db.Set(blockStoreKey, []byte(`{"height": "10000"}`))
+	bs := NewBlockStore(db)
+	require.Equal(t, int64(10000), bs.Height(), "failed to properly parse blockstore")
+
+	panicCausers := []struct {
+		data    []byte
+		wantErr string
+	}{
+		{[]byte("artful-dodger"), "not unmarshal bytes"},
+		{[]byte(" "), "unmarshal bytes"},
+	}
+
+	for i, tt := range panicCausers {
+		// Expecting a panic here on trying to parse an invalid blockStore
+		_, _, panicErr := doFn(func() (interface{}, error) {
+			db.Set(blockStoreKey, tt.data)
+			_ = NewBlockStore(db)
+			return nil, nil
+		})
+		require.NotNil(t, panicErr, "#%d panicCauser: %q expected a panic", i, tt.data)
+		assert.Contains(t, fmt.Sprintf("%#v", panicErr), tt.wantErr, "#%d data: %q", i, tt.data)
+	}
+
+	db.Set(blockStoreKey, nil)
+	bs = NewBlockStore(db)
+	assert.Equal(t, bs.Height(), int64(0), "expecting nil bytes to be unmarshaled correctly")
+}
+
+func freshBlockStore() (*BlockStore, db.DB) {
+	db := db.NewMemDB()
+	return NewBlockStore(db), db
+}
+
+var (
+	state       sm.State
+	block       *types.Block
+	partSet     *types.PartSet
+	part1       *types.Part
+	part2       *types.Part
+	seenCommit1 *types.Commit
+)
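+
+// TestMain builds the shared state, block, part set and seen commit that the
+// tests below reuse, then runs the suite and cleans up.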
+func TestMain(m *testing.M) {
+	var cleanup cleanupFunc
+	state, _, cleanup = makeStateAndBlockStore(log.NewTMLogger(new(bytes.Buffer)))
+	block = makeBlock(1, state, new(types.Commit))
+	partSet = block.MakePartSet(2)
+	part1 = partSet.GetPart(0)
+	part2 = partSet.GetPart(1)
+	seenCommit1 = makeTestCommit(10, tmtime.Now())
+	code := m.Run()
+	cleanup()
+	os.Exit(code)
+}
+
+// TODO: This test should be simplified ...
+
+func TestBlockStoreSaveLoadBlock(t *testing.T) {
+	state, bs, cleanup := makeStateAndBlockStore(log.NewTMLogger(new(bytes.Buffer)))
+	defer cleanup()
+	require.Equal(t, bs.Height(), int64(0), "initially the height should be zero")
+
+	// check there are no blocks at various heights
+	noBlockHeights := []int64{0, -1, 100, 1000, 2}
+	for i, height := range noBlockHeights {
+		if g := bs.LoadBlock(height); g != nil {
+			t.Errorf("#%d: height(%d) got a block; want nil", i, height)
+		}
+	}
+
+	// save a block
+	block := makeBlock(bs.Height()+1, state, new(types.Commit))
+	validPartSet := block.MakePartSet(2)
+	seenCommit := makeTestCommit(10, tmtime.Now())
+	bs.SaveBlock(block, validPartSet, seenCommit)
+	require.Equal(t, bs.Height(), block.Header.Height, "expecting the new height to be changed")
+
+	incompletePartSet := types.NewPartSetFromHeader(types.PartSetHeader{Total: 2})
+	uncontiguousPartSet := types.NewPartSetFromHeader(types.PartSetHeader{Total: 0})
+	uncontiguousPartSet.AddPart(part2)
+
+	header1 := types.Header{
+		Height:  1,
+		NumTxs:  100,
+		ChainID: "block_test",
+		Time:    tmtime.Now(),
+	}
+	header2 := header1
+	header2.Height = 4
+
+	// End of setup, test data
+
+	commitAtH10 := makeTestCommit(10, tmtime.Now())
+	tuples := []struct {
+		block      *types.Block
+		parts      *types.PartSet
+		seenCommit *types.Commit
+		wantErr    bool
+		wantPanic  string
+
+		corruptBlockInDB      bool
+		corruptCommitInDB     bool
+		corruptSeenCommitInDB bool
+		eraseCommitInDB       bool
+		eraseSeenCommitInDB   bool
+	}{
+		{
+			block:      newBlock(header1, commitAtH10),
+			parts:      validPartSet,
+			seenCommit: seenCommit1,
+		},
+
+		{
+			block:     nil,
+			wantPanic: "only save a non-nil block",
+		},
+
+		{
+			block:     newBlock(header2, commitAtH10),
+			parts:     uncontiguousPartSet,
+			wantPanic: "only save contiguous blocks", // and incomplete and uncontiguous parts
+		},
+
+		{
+			block:     newBlock(header1, commitAtH10),
+			parts:     incompletePartSet,
+			wantPanic: "only save complete block", // incomplete parts
+		},
+
+		{
+			block:             newBlock(header1, commitAtH10),
+			parts:             validPartSet,
+			seenCommit:        seenCommit1,
+			corruptCommitInDB: true, // Corrupt the DB's commit entry
+			wantPanic:         "unmarshal to types.Commit failed",
+		},
+
+		{
+			block:            newBlock(header1, commitAtH10),
+			parts:            validPartSet,
+			seenCommit:       seenCommit1,
+			wantPanic:        "unmarshal to types.BlockMeta failed",
+			corruptBlockInDB: true, // Corrupt the DB's block entry
+		},
+
+		{
+			block:      newBlock(header1, commitAtH10),
+			parts:      validPartSet,
+			seenCommit: seenCommit1,
+
+			// Expecting no error and we want a nil back
+			eraseSeenCommitInDB: true,
+		},
+
+		{
+			block:      newBlock(header1, commitAtH10),
+			parts:      validPartSet,
+			seenCommit: seenCommit1,
+
+			corruptSeenCommitInDB: true,
+			wantPanic:             "unmarshal to types.Commit failed",
+		},
+
+		{
+			block:      newBlock(header1, commitAtH10),
+			parts:      validPartSet,
+			seenCommit: seenCommit1,
+
+			// Expecting no error and we want a nil back
+			eraseCommitInDB: true,
+		},
+	}
+
+	type quad struct {
+		block  *types.Block
+		commit *types.Commit
+		meta   *types.BlockMeta
+
+		seenCommit *types.Commit
+	}
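+
+	// Each tuple below runs against a fresh store; doFn (defined at the end of
+	// this file) captures panics so the wantPanic cases can be asserted on.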
+	for i, tuple := range tuples {
+		bs, db := freshBlockStore()
+		// SaveBlock
+		res, err, panicErr := doFn(func() (interface{}, error) {
+			bs.SaveBlock(tuple.block, tuple.parts, tuple.seenCommit)
+			if tuple.block == nil {
+				return nil, nil
+			}
+
+			if tuple.corruptBlockInDB {
+				db.Set(calcBlockMetaKey(tuple.block.Height), []byte("block-bogus"))
+			}
+			bBlock := bs.LoadBlock(tuple.block.Height)
+			bBlockMeta := bs.LoadBlockMeta(tuple.block.Height)
+
+			if tuple.eraseSeenCommitInDB {
+				db.Delete(calcSeenCommitKey(tuple.block.Height))
+			}
+			if tuple.corruptSeenCommitInDB {
+				db.Set(calcSeenCommitKey(tuple.block.Height), []byte("bogus-seen-commit"))
+			}
+			bSeenCommit := bs.LoadSeenCommit(tuple.block.Height)
+
+			commitHeight := tuple.block.Height - 1
+			if tuple.eraseCommitInDB {
+				db.Delete(calcBlockCommitKey(commitHeight))
+			}
+			if tuple.corruptCommitInDB {
+				db.Set(calcBlockCommitKey(commitHeight), []byte("foo-bogus"))
+			}
+			bCommit := bs.LoadBlockCommit(commitHeight)
+			return &quad{block: bBlock, seenCommit: bSeenCommit, commit: bCommit,
+				meta: bBlockMeta}, nil
+		})
+
+		if subStr := tuple.wantPanic; subStr != "" {
+			if panicErr == nil {
+				t.Errorf("#%d: want a non-nil panic", i)
+			} else if got := fmt.Sprintf("%#v", panicErr); !strings.Contains(got, subStr) {
+				t.Errorf("#%d:\n\tgotErr: %q\nwant substring: %q", i, got, subStr)
+			}
+			continue
+		}
+
+		if tuple.wantErr {
+			if err == nil {
+				t.Errorf("#%d: got nil error", i)
+			}
+			continue
+		}
+
+		assert.Nil(t, panicErr, "#%d: unexpected panic", i)
+		assert.Nil(t, err, "#%d: unexpected error", i)
+		qua, ok := res.(*quad)
+		if !ok || qua == nil {
+			t.Errorf("#%d: got nil quad back; gotType=%T", i, res)
+			continue
+		}
+		if tuple.eraseSeenCommitInDB {
+			assert.Nil(t, qua.seenCommit,
+				"erased the seenCommit in the DB hence we should get back a nil seenCommit")
+		}
+		if tuple.eraseCommitInDB {
+			assert.Nil(t, qua.commit,
+				"erased the commit in the DB hence we should get back a nil commit")
+		}
+	}
+}
+
+func TestLoadBlockPart(t *testing.T) {
+	bs, db := freshBlockStore()
+	height, index := int64(10), 1
+	loadPart := func() (interface{}, error) {
+		part := bs.LoadBlockPart(height, index)
+		return part, nil
+	}
+
+	// Initially no contents.
+	// 1. Requesting a non-existent block part shouldn't fail
+	res, _, panicErr := doFn(loadPart)
+	require.Nil(t, panicErr, "a non-existent block part shouldn't cause a panic")
+	require.Nil(t, res, "a non-existent block part should return nil")
+
+	// 2. Next save a corrupted block part then try to load it
+	db.Set(calcBlockPartKey(height, index), []byte("Tendermint"))
+	res, _, panicErr = doFn(loadPart)
+	require.NotNil(t, panicErr, "expecting a non-nil panic")
+	require.Contains(t, panicErr.Error(), "unmarshal to types.Part failed")
+
+	// 3. A good block part serialized and saved to the DB should be retrievable
+	db.Set(calcBlockPartKey(height, index), cdc.MustMarshalBinaryBare(part1))
+	gotPart, _, panicErr := doFn(loadPart)
+	require.Nil(t, panicErr, "an existent and proper block part should not panic")
+	require.NotNil(t, gotPart, "a properly saved block part should be retrievable")
+	require.Equal(t, gotPart.(*types.Part), part1,
+		"expecting successful retrieval of previously saved block part")
+}
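+
+// TestLoadBlockMeta mirrors TestLoadBlockPart for the block meta entry.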
+func TestLoadBlockMeta(t *testing.T) {
+	bs, db := freshBlockStore()
+	height := int64(10)
+	loadMeta := func() (interface{}, error) {
+		meta := bs.LoadBlockMeta(height)
+		return meta, nil
+	}
+
+	// Initially no contents.
+	// 1. Requesting a non-existent blockMeta shouldn't fail
+	res, _, panicErr := doFn(loadMeta)
+	require.Nil(t, panicErr, "a non-existent blockMeta shouldn't cause a panic")
+	require.Nil(t, res, "a non-existent blockMeta should return nil")
+
+	// 2. Next save a corrupted blockMeta then try to load it
+	db.Set(calcBlockMetaKey(height), []byte("Tendermint-Meta"))
+	res, _, panicErr = doFn(loadMeta)
+	require.NotNil(t, panicErr, "expecting a non-nil panic")
+	require.Contains(t, panicErr.Error(), "unmarshal to types.BlockMeta")
+
+	// 3. A good blockMeta serialized and saved to the DB should be retrievable
+	meta := &types.BlockMeta{}
+	db.Set(calcBlockMetaKey(height), cdc.MustMarshalBinaryBare(meta))
+	gotMeta, _, panicErr := doFn(loadMeta)
+	require.Nil(t, panicErr, "an existent and proper blockMeta should not panic")
+	require.NotNil(t, gotMeta, "a properly saved blockMeta should be retrievable")
+	require.Equal(t, cdc.MustMarshalBinaryBare(meta), cdc.MustMarshalBinaryBare(gotMeta),
+		"expecting successful retrieval of previously saved blockMeta")
+}
+
+func TestBlockFetchAtHeight(t *testing.T) {
+	state, bs, cleanup := makeStateAndBlockStore(log.NewTMLogger(new(bytes.Buffer)))
+	defer cleanup()
+	require.Equal(t, bs.Height(), int64(0), "initially the height should be zero")
+	block := makeBlock(bs.Height()+1, state, new(types.Commit))
+
+	partSet := block.MakePartSet(2)
+	seenCommit := makeTestCommit(10, tmtime.Now())
+	bs.SaveBlock(block, partSet, seenCommit)
+	require.Equal(t, bs.Height(), block.Header.Height, "expecting the new height to be changed")
+
+	blockAtHeight := bs.LoadBlock(bs.Height())
+	bz1 := cdc.MustMarshalBinaryBare(block)
+	bz2 := cdc.MustMarshalBinaryBare(blockAtHeight)
+	require.Equal(t, bz1, bz2)
+	require.Equal(t, block.Hash(), blockAtHeight.Hash(),
+		"expecting a successful load of the last saved block")
+
+	blockAtHeightPlus1 := bs.LoadBlock(bs.Height() + 1)
+	require.Nil(t, blockAtHeightPlus1, "expecting an unsuccessful load of Height()+1")
+	blockAtHeightPlus2 := bs.LoadBlock(bs.Height() + 2)
+	require.Nil(t, blockAtHeightPlus2, "expecting an unsuccessful load of Height()+2")
+}
+
+func doFn(fn func() (interface{}, error)) (res interface{}, err error, panicErr error) {
+	defer func() {
+		if r := recover(); r != nil {
+			switch e := r.(type) {
+			case error:
+				panicErr = e
+			case string:
+				panicErr = fmt.Errorf("%s", e)
+			default:
+				if st, ok := r.(fmt.Stringer); ok {
+					panicErr = fmt.Errorf("%s", st)
+				} else {
+					panicErr = fmt.Errorf("%s", debug.Stack())
+				}
+			}
+		}
+	}()
+
+	res, err = fn()
+	return res, err, panicErr
+}
+
+func newBlock(hdr types.Header, lastCommit *types.Commit) *types.Block {
+	return &types.Block{
+		Header:     hdr,
+		LastCommit: lastCommit,
+	}
+}
diff --git a/blockchain_new/wire.go b/blockchain_new/wire.go
new file mode 100644
index 00000000..445c3e88
--- /dev/null
+++ b/blockchain_new/wire.go
@@ -0,0 +1,13 @@
+package blockchain_new
+
+import (
+	amino "github.com/tendermint/go-amino"
+	"github.com/tendermint/tendermint/types"
+)
+
+var cdc = amino.NewCodec()
+
+func init() {
+	RegisterBlockchainMessages(cdc)
+	types.RegisterBlockAmino(cdc)
+}
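+
+// NOTE: the load/save helpers in store.go serialize through this codec,
+// e.g. cdc.MustMarshalBinaryBare(part) and cdc.UnmarshalBinaryBare(bz, part).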