2015-12-23 01:27:40 -05:00
|
|
|
package consensus
|
|
|
|
|
|
|
|
import (
|
2017-02-20 19:52:36 -05:00
|
|
|
"bytes"
|
2015-12-23 01:27:40 -05:00
|
|
|
"errors"
|
|
|
|
"fmt"
|
2017-09-20 15:33:05 -07:00
|
|
|
"hash/crc32"
|
2016-01-10 23:31:05 -05:00
|
|
|
"io"
|
2015-12-23 01:27:40 -05:00
|
|
|
"reflect"
|
2017-10-28 11:07:59 -04:00
|
|
|
//"strconv"
|
|
|
|
//"strings"
|
2015-12-23 01:27:40 -05:00
|
|
|
"time"
|
|
|
|
|
2017-02-20 19:52:36 -05:00
|
|
|
abci "github.com/tendermint/abci/types"
|
2017-10-28 11:07:59 -04:00
|
|
|
//auto "github.com/tendermint/tmlibs/autofile"
|
2017-04-25 14:50:20 -04:00
|
|
|
cmn "github.com/tendermint/tmlibs/common"
|
2017-05-02 11:53:32 +04:00
|
|
|
"github.com/tendermint/tmlibs/log"
|
2015-12-23 01:27:40 -05:00
|
|
|
|
2017-02-20 19:52:36 -05:00
|
|
|
"github.com/tendermint/tendermint/proxy"
|
|
|
|
sm "github.com/tendermint/tendermint/state"
|
2015-12-23 01:27:40 -05:00
|
|
|
"github.com/tendermint/tendermint/types"
|
2017-09-22 11:42:40 -04:00
|
|
|
"github.com/tendermint/tendermint/version"
|
2015-12-23 01:27:40 -05:00
|
|
|
)
|
|
|
|
|
2017-09-26 09:51:08 +03:00
|
|
|
// crc32c is the CRC-32 lookup table built from the Castagnoli polynomial.
var crc32c = crc32.MakeTable(crc32.Castagnoli)
|
|
|
|
|
2017-02-20 19:52:36 -05:00
|
|
|
// Functionality to replay blocks and messages on recovery from a crash.
|
|
|
|
// There are two general failure scenarios: failure during consensus, and failure while applying the block.
|
|
|
|
// The former is handled by the WAL, the latter by the proxyApp Handshake on restart,
|
|
|
|
// which ultimately hands off the work to the WAL.
|
|
|
|
|
|
|
|
//-----------------------------------------
|
|
|
|
// recover from failure during consensus
|
|
|
|
// by replaying messages from the WAL
|
|
|
|
|
2016-10-28 15:01:14 -07:00
|
|
|
// Unmarshal and apply a single message to the consensus state
|
2016-08-14 12:31:24 -04:00
|
|
|
// as if it were received in receiveRoutine
|
2016-10-28 15:01:14 -07:00
|
|
|
// Lines that start with "#" are ignored.
|
2016-08-14 12:31:24 -04:00
|
|
|
// NOTE: receiveRoutine should not be running
|
2017-10-09 23:10:58 +04:00
|
|
|
func (cs *ConsensusState) readReplayMessage(msg *TimedWALMessage, newStepCh chan interface{}) error {
|
2017-10-23 23:33:17 +04:00
|
|
|
// skip meta messages
|
2017-10-09 23:10:58 +04:00
|
|
|
if _, ok := msg.Msg.(EndHeightMessage); ok {
|
2016-10-28 15:01:14 -07:00
|
|
|
return nil
|
|
|
|
}
|
2015-12-23 01:27:40 -05:00
|
|
|
|
2016-01-18 14:10:05 -05:00
|
|
|
// for logging
|
|
|
|
switch m := msg.Msg.(type) {
|
2016-01-28 19:53:22 -08:00
|
|
|
case types.EventDataRoundState:
|
2017-05-02 11:53:32 +04:00
|
|
|
cs.Logger.Info("Replay: New Step", "height", m.Height, "round", m.Round, "step", m.Step)
|
2016-01-18 14:10:05 -05:00
|
|
|
// these are playback checks
|
|
|
|
ticker := time.After(time.Second * 2)
|
|
|
|
if newStepCh != nil {
|
|
|
|
select {
|
|
|
|
case mi := <-newStepCh:
|
2016-01-28 19:53:22 -08:00
|
|
|
m2 := mi.(types.EventDataRoundState)
|
2016-01-18 14:10:05 -05:00
|
|
|
if m.Height != m2.Height || m.Round != m2.Round || m.Step != m2.Step {
|
|
|
|
return fmt.Errorf("RoundState mismatch. Got %v; Expected %v", m2, m)
|
|
|
|
}
|
|
|
|
case <-ticker:
|
|
|
|
return fmt.Errorf("Failed to read off newStepCh")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
case msgInfo:
|
|
|
|
peerKey := m.PeerKey
|
|
|
|
if peerKey == "" {
|
|
|
|
peerKey = "local"
|
|
|
|
}
|
|
|
|
switch msg := m.Msg.(type) {
|
|
|
|
case *ProposalMessage:
|
|
|
|
p := msg.Proposal
|
2017-05-02 11:53:32 +04:00
|
|
|
cs.Logger.Info("Replay: Proposal", "height", p.Height, "round", p.Round, "header",
|
2016-01-18 14:10:05 -05:00
|
|
|
p.BlockPartsHeader, "pol", p.POLRound, "peer", peerKey)
|
|
|
|
case *BlockPartMessage:
|
2017-05-02 11:53:32 +04:00
|
|
|
cs.Logger.Info("Replay: BlockPart", "height", msg.Height, "round", msg.Round, "peer", peerKey)
|
2016-01-18 14:10:05 -05:00
|
|
|
case *VoteMessage:
|
|
|
|
v := msg.Vote
|
2017-05-02 11:53:32 +04:00
|
|
|
cs.Logger.Info("Replay: Vote", "height", v.Height, "round", v.Round, "type", v.Type,
|
2016-08-16 14:59:19 -07:00
|
|
|
"blockID", v.BlockID, "peer", peerKey)
|
2015-12-23 01:27:40 -05:00
|
|
|
}
|
2016-08-14 12:31:24 -04:00
|
|
|
|
2017-07-25 10:52:14 -04:00
|
|
|
cs.handleMsg(m)
|
2016-01-18 14:10:05 -05:00
|
|
|
case timeoutInfo:
|
2017-05-02 11:53:32 +04:00
|
|
|
cs.Logger.Info("Replay: Timeout", "height", m.Height, "round", m.Round, "step", m.Step, "dur", m.Duration)
|
2016-08-14 12:31:24 -04:00
|
|
|
cs.handleTimeout(m, cs.RoundState)
|
2016-01-18 14:10:05 -05:00
|
|
|
default:
|
2016-10-28 15:01:14 -07:00
|
|
|
return fmt.Errorf("Replay: Unknown TimedWALMessage type: %v", reflect.TypeOf(msg.Msg))
|
2015-12-23 01:27:40 -05:00
|
|
|
}
|
2016-01-18 14:10:05 -05:00
|
|
|
return nil
|
2015-12-23 01:27:40 -05:00
|
|
|
}
|
|
|
|
|
2016-08-14 12:31:24 -04:00
|
|
|
// replay only those messages since the last block.
|
|
|
|
// timeoutRoutine should run concurrently to read off tickChan
|
2016-10-11 16:06:46 -04:00
|
|
|
func (cs *ConsensusState) catchupReplay(csHeight int) error {
|
2016-09-08 18:06:25 -04:00
|
|
|
// set replayMode
|
|
|
|
cs.replayMode = true
|
|
|
|
defer func() { cs.replayMode = false }()
|
|
|
|
|
2017-04-14 16:27:22 -04:00
|
|
|
// Ensure that ENDHEIGHT for this height doesn't exist
|
2017-04-14 20:30:15 -04:00
|
|
|
// NOTE: This is just a sanity check. As far as we know things work fine without it,
|
|
|
|
// and Handshake could reuse ConsensusState if it weren't for this check (since we can crash after writing ENDHEIGHT).
|
2017-10-09 23:10:58 +04:00
|
|
|
gr, found, err := cs.wal.SearchForEndHeight(uint64(csHeight))
|
2017-10-03 19:11:55 -04:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2016-10-28 15:01:14 -07:00
|
|
|
if gr != nil {
|
2017-09-06 13:11:47 -04:00
|
|
|
if err := gr.Close(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2016-10-28 15:01:14 -07:00
|
|
|
}
|
2017-04-17 21:10:38 -04:00
|
|
|
if found {
|
new pubsub package
comment out failing consensus tests for now
rewrite rpc httpclient to use new pubsub package
import pubsub as tmpubsub, query as tmquery
make event IDs constants
EventKey -> EventTypeKey
rename EventsPubsub to PubSub
mempool does not use pubsub
rename eventsSub to pubsub
new subscribe API
fix channel size issues and consensus tests bugs
refactor rpc client
add missing discardFromChan method
add mutex
rename pubsub to eventBus
remove IsRunning from WSRPCConnection interface (not needed)
add a comment in broadcastNewRoundStepsAndVotes
rename registerEventCallbacks to broadcastNewRoundStepsAndVotes
See https://dave.cheney.net/2014/03/19/channel-axioms
stop eventBuses after reactor tests
remove unnecessary Unsubscribe
return subscribe helper function
move discardFromChan to where it is used
subscribe now returns an err
this gives us ability to refuse to subscribe if pubsub is at its max
capacity.
use context for control overflow
cache queries
handle err when subscribing in replay_test
rename testClientID to testSubscriber
extract var
set channel buffer capacity to 1 in replay_file
fix byzantine_test
unsubscribe from single event, not all events
refactor httpclient to return events to appropriate channels
return failing testReplayCrashBeforeWriteVote test
fix TestValidatorSetChanges
refactor code a bit
fix testReplayCrashBeforeWriteVote
add comment
fix TestValidatorSetChanges
fixes from Bucky's review
update comment [ci skip]
test TxEventBuffer
update changelog
fix TestValidatorSetChanges (2nd attempt)
only do wg.Done when no errors
benchmark event bus
create pubsub server inside NewEventBus
only expose config params (later if needed)
set buffer capacity to 0 so we are not testing cache
new tx event format: key = "Tx" plus a tag {"tx.hash": XYZ}
This should allow to subscribe to all transactions! or a specific one
using a query: "tm.events.type = Tx and tx.hash = '013ABF99434...'"
use TimeoutCommit instead of afterPublishEventNewBlockTimeout
TimeoutCommit is the time a node waits after committing a block, before
it goes into the next height. So it will finish everything from the last
block, but then wait a bit. The idea is this gives it time to hear more
votes from other validators, to strengthen the commit it includes in the
next block. But it also gives it time to hear about new transactions.
waitForBlockWithUpdatedVals
rewrite WAL crash tests
Task:
test that we can recover from any WAL crash.
Solution:
the old tests were relying on event hub being run in the same thread (we
were injecting the private validator's last signature).
when considering a rewrite, we considered two possible solutions: write
a "fuzzy" testing system where WAL is crashing upon receiving a new
message, or inject failures and trigger them in tests using something
like https://github.com/coreos/gofail.
remove sleep
no cs.Lock around wal.Save
test different cases (empty block, non-empty block, ...)
comments
add comments
test 4 cases: empty block, non-empty block, non-empty block with smaller part size, many blocks
fixes as per Bucky's last review
reset subscriptions on UnsubscribeAll
use a simple counter to track message for which we panicked
also, set a smaller part size for all test cases
2017-06-26 19:00:30 +04:00
|
|
|
return fmt.Errorf("WAL should not contain #ENDHEIGHT %d.", csHeight)
|
2017-04-17 21:10:38 -04:00
|
|
|
}
|
2016-01-10 23:31:05 -05:00
|
|
|
|
2017-04-14 16:27:22 -04:00
|
|
|
// Search for last height marker
|
2017-10-09 23:10:58 +04:00
|
|
|
gr, found, err = cs.wal.SearchForEndHeight(uint64(csHeight - 1))
|
2016-12-06 01:16:13 -08:00
|
|
|
if err == io.EOF {
|
2017-05-02 11:53:32 +04:00
|
|
|
cs.Logger.Error("Replay: wal.group.Search returned EOF", "#ENDHEIGHT", csHeight-1)
|
2016-12-06 01:16:13 -08:00
|
|
|
} else if err != nil {
|
2016-01-10 23:31:05 -05:00
|
|
|
return err
|
2017-09-06 13:11:47 -04:00
|
|
|
} else {
|
2017-10-03 18:49:20 -04:00
|
|
|
defer gr.Close() // nolint: errcheck
|
2016-01-10 23:31:05 -05:00
|
|
|
}
|
2016-10-28 15:01:14 -07:00
|
|
|
if !found {
|
2017-09-06 01:53:41 -04:00
|
|
|
return errors.New(cmn.Fmt("Cannot replay height %d. WAL does not contain #ENDHEIGHT for %d.", csHeight, csHeight-1))
|
2016-10-11 16:06:46 -04:00
|
|
|
}
|
2017-10-25 21:54:56 -04:00
|
|
|
defer gr.Close()
|
2016-10-11 16:06:46 -04:00
|
|
|
|
2017-05-02 11:53:32 +04:00
|
|
|
cs.Logger.Info("Catchup by replaying consensus messages", "height", csHeight)
|
2016-01-18 15:57:57 -05:00
|
|
|
|
2017-10-09 23:10:58 +04:00
|
|
|
var msg *TimedWALMessage
|
|
|
|
dec := WALDecoder{gr}
|
|
|
|
|
2016-10-28 15:01:14 -07:00
|
|
|
for {
|
2017-10-09 23:10:58 +04:00
|
|
|
msg, err = dec.Decode()
|
|
|
|
if err == io.EOF {
|
|
|
|
break
|
2017-10-23 23:33:17 +04:00
|
|
|
} else if err != nil {
|
2017-10-09 23:10:58 +04:00
|
|
|
return err
|
2016-01-10 23:31:05 -05:00
|
|
|
}
|
|
|
|
// NOTE: since the priv key is set when the msgs are received
|
|
|
|
// it will attempt to eg double sign but we can just ignore it
|
|
|
|
// since the votes will be replayed and we'll get to the next step
|
2017-10-09 23:10:58 +04:00
|
|
|
if err := cs.readReplayMessage(msg, nil); err != nil {
|
2016-01-10 23:31:05 -05:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
2017-05-02 11:53:32 +04:00
|
|
|
cs.Logger.Info("Replay: Done")
|
2016-01-10 23:31:05 -05:00
|
|
|
return nil
|
2016-01-18 14:10:05 -05:00
|
|
|
}
|
|
|
|
|
2016-10-28 15:01:14 -07:00
|
|
|
//--------------------------------------------------------------------------------
|
|
|
|
|
|
|
|
// Parses marker lines of the form:
|
2017-04-14 16:27:22 -04:00
|
|
|
// #ENDHEIGHT: 12345
|
2017-10-28 11:07:59 -04:00
|
|
|
/*
|
2016-10-28 15:01:14 -07:00
|
|
|
func makeHeightSearchFunc(height int) auto.SearchFunc {
|
|
|
|
return func(line string) (int, error) {
|
|
|
|
line = strings.TrimRight(line, "\n")
|
|
|
|
parts := strings.Split(line, " ")
|
|
|
|
if len(parts) != 2 {
|
|
|
|
return -1, errors.New("Line did not have 2 parts")
|
|
|
|
}
|
|
|
|
i, err := strconv.Atoi(parts[1])
|
|
|
|
if err != nil {
|
|
|
|
return -1, errors.New("Failed to parse INFO: " + err.Error())
|
|
|
|
}
|
|
|
|
if height < i {
|
|
|
|
return 1, nil
|
|
|
|
} else if height == i {
|
|
|
|
return 0, nil
|
|
|
|
} else {
|
|
|
|
return -1, nil
|
|
|
|
}
|
|
|
|
}
|
2017-10-28 11:07:59 -04:00
|
|
|
}*/
|
2017-02-20 19:52:36 -05:00
|
|
|
|
|
|
|
//----------------------------------------------
|
|
|
|
// Recover from failure during block processing
|
|
|
|
// by handshaking with the app to figure out where
|
|
|
|
// we were last and using the WAL to recover there
|
|
|
|
|
|
|
|
type Handshaker struct {
|
2017-05-02 11:53:32 +04:00
|
|
|
state *sm.State
|
|
|
|
store types.BlockStore
|
|
|
|
logger log.Logger
|
2017-02-20 19:52:36 -05:00
|
|
|
|
|
|
|
nBlocks int // number of blocks applied to the state
|
|
|
|
}
|
|
|
|
|
2017-04-29 00:18:30 -04:00
|
|
|
func NewHandshaker(state *sm.State, store types.BlockStore) *Handshaker {
|
2017-05-02 11:53:32 +04:00
|
|
|
return &Handshaker{state, store, log.NewNopLogger(), 0}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (h *Handshaker) SetLogger(l log.Logger) {
|
|
|
|
h.logger = l
|
2017-02-20 19:52:36 -05:00
|
|
|
}
|
|
|
|
|
|
|
|
func (h *Handshaker) NBlocks() int {
|
|
|
|
return h.nBlocks
|
|
|
|
}
|
|
|
|
|
|
|
|
// TODO: retry the handshake/replay if it fails ?
|
|
|
|
func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error {
|
|
|
|
// handshake is done via info request on the query conn
|
2017-09-22 11:42:40 -04:00
|
|
|
res, err := proxyApp.Query().InfoSync(abci.RequestInfo{version.Version})
|
2017-02-20 19:52:36 -05:00
|
|
|
if err != nil {
|
2017-04-25 14:50:20 -04:00
|
|
|
return errors.New(cmn.Fmt("Error calling Info: %v", err))
|
2017-02-20 19:52:36 -05:00
|
|
|
}
|
|
|
|
|
|
|
|
blockHeight := int(res.LastBlockHeight) // XXX: beware overflow
|
|
|
|
appHash := res.LastBlockAppHash
|
|
|
|
|
2017-05-12 23:07:53 +02:00
|
|
|
h.logger.Info("ABCI Handshake", "appHeight", blockHeight, "appHash", fmt.Sprintf("%X", appHash))
|
2017-02-20 19:52:36 -05:00
|
|
|
|
|
|
|
// TODO: check version
|
|
|
|
|
|
|
|
// replay blocks up to the latest in the blockstore
|
|
|
|
_, err = h.ReplayBlocks(appHash, blockHeight, proxyApp)
|
2017-04-27 18:29:38 -04:00
|
|
|
if err != nil {
|
2017-04-25 14:50:20 -04:00
|
|
|
return errors.New(cmn.Fmt("Error on replay: %v", err))
|
2017-02-20 19:52:36 -05:00
|
|
|
}
|
|
|
|
|
2017-05-12 23:07:53 +02:00
|
|
|
h.logger.Info("Completed ABCI Handshake - Tendermint and App are synced", "appHeight", blockHeight, "appHash", fmt.Sprintf("%X", appHash))
|
2017-03-05 02:04:09 -05:00
|
|
|
|
2017-02-20 19:52:36 -05:00
|
|
|
// TODO: (on restart) replay mempool
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Replay all blocks since appBlockHeight and ensure the result matches the current state.
|
|
|
|
// Returns the final AppHash or an error
|
|
|
|
func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int, proxyApp proxy.AppConns) ([]byte, error) {
|
|
|
|
|
|
|
|
storeBlockHeight := h.store.Height()
|
|
|
|
stateBlockHeight := h.state.LastBlockHeight
|
2017-05-02 11:53:32 +04:00
|
|
|
h.logger.Info("ABCI Replay Blocks", "appHeight", appBlockHeight, "storeHeight", storeBlockHeight, "stateHeight", stateBlockHeight)
|
2017-02-20 19:52:36 -05:00
|
|
|
|
2017-04-27 19:39:06 +02:00
|
|
|
// If appBlockHeight == 0 it means that we are at genesis and hence should send InitChain
|
|
|
|
if appBlockHeight == 0 {
|
|
|
|
validators := types.TM2PB.Validators(h.state.Validators)
|
2017-09-06 13:11:47 -04:00
|
|
|
if err := proxyApp.Consensus().InitChainSync(abci.RequestInitChain{validators}); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2017-04-27 19:39:06 +02:00
|
|
|
}
|
|
|
|
|
2017-02-20 19:52:36 -05:00
|
|
|
// First handle edge cases and constraints on the storeBlockHeight
|
|
|
|
if storeBlockHeight == 0 {
|
|
|
|
return appHash, h.checkAppHash(appHash)
|
|
|
|
|
|
|
|
} else if storeBlockHeight < appBlockHeight {
|
|
|
|
// the app should never be ahead of the store (but this is under app's control)
|
|
|
|
return appHash, sm.ErrAppBlockHeightTooHigh{storeBlockHeight, appBlockHeight}
|
|
|
|
|
|
|
|
} else if storeBlockHeight < stateBlockHeight {
|
|
|
|
// the state should never be ahead of the store (this is under tendermint's control)
|
2017-04-25 14:50:20 -04:00
|
|
|
cmn.PanicSanity(cmn.Fmt("StateBlockHeight (%d) > StoreBlockHeight (%d)", stateBlockHeight, storeBlockHeight))
|
2017-02-20 19:52:36 -05:00
|
|
|
|
|
|
|
} else if storeBlockHeight > stateBlockHeight+1 {
|
|
|
|
// store should be at most one ahead of the state (this is under tendermint's control)
|
2017-04-25 14:50:20 -04:00
|
|
|
cmn.PanicSanity(cmn.Fmt("StoreBlockHeight (%d) > StateBlockHeight + 1 (%d)", storeBlockHeight, stateBlockHeight+1))
|
2017-02-20 19:52:36 -05:00
|
|
|
}
|
|
|
|
|
|
|
|
// Now either store is equal to state, or one ahead.
|
|
|
|
// For each, consider all cases of where the app could be, given app <= store
|
|
|
|
if storeBlockHeight == stateBlockHeight {
|
|
|
|
// Tendermint ran Commit and saved the state.
|
|
|
|
// Either the app is asking for replay, or we're all synced up.
|
|
|
|
if appBlockHeight < storeBlockHeight {
|
|
|
|
// the app is behind, so replay blocks, but no need to go through WAL (state is already synced to store)
|
|
|
|
return h.replayBlocks(proxyApp, appBlockHeight, storeBlockHeight, false)
|
|
|
|
|
|
|
|
} else if appBlockHeight == storeBlockHeight {
|
2017-03-05 14:59:02 -05:00
|
|
|
// We're good!
|
2017-02-20 19:52:36 -05:00
|
|
|
return appHash, h.checkAppHash(appHash)
|
|
|
|
}
|
|
|
|
|
|
|
|
} else if storeBlockHeight == stateBlockHeight+1 {
|
|
|
|
// We saved the block in the store but haven't updated the state,
|
|
|
|
// so we'll need to replay a block using the WAL.
|
|
|
|
if appBlockHeight < stateBlockHeight {
|
|
|
|
// the app is further behind than it should be, so replay blocks
|
|
|
|
// but leave the last block to go through the WAL
|
|
|
|
return h.replayBlocks(proxyApp, appBlockHeight, storeBlockHeight, true)
|
|
|
|
|
|
|
|
} else if appBlockHeight == stateBlockHeight {
|
|
|
|
// We haven't run Commit (both the state and app are one block behind),
|
2017-04-15 01:33:30 -04:00
|
|
|
// so replayBlock with the real app.
|
2017-04-14 20:30:15 -04:00
|
|
|
// NOTE: We could instead use the cs.WAL on cs.Start,
|
|
|
|
// but we'd have to allow the WAL to replay a block that wrote it's ENDHEIGHT
|
2017-05-02 11:53:32 +04:00
|
|
|
h.logger.Info("Replay last block using real app")
|
2017-04-15 01:33:30 -04:00
|
|
|
return h.replayBlock(storeBlockHeight, proxyApp.Consensus())
|
2017-02-20 19:52:36 -05:00
|
|
|
|
|
|
|
} else if appBlockHeight == storeBlockHeight {
|
2017-04-15 01:33:30 -04:00
|
|
|
// We ran Commit, but didn't save the state, so replayBlock with mock app
|
2017-04-14 20:30:15 -04:00
|
|
|
abciResponses := h.state.LoadABCIResponses()
|
|
|
|
mockApp := newMockProxyApp(appHash, abciResponses)
|
2017-05-02 11:53:32 +04:00
|
|
|
h.logger.Info("Replay last block using mock app")
|
2017-04-15 01:33:30 -04:00
|
|
|
return h.replayBlock(storeBlockHeight, mockApp)
|
2017-02-20 19:52:36 -05:00
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
2017-04-25 14:50:20 -04:00
|
|
|
cmn.PanicSanity("Should never happen")
|
2017-02-20 19:52:36 -05:00
|
|
|
return nil, nil
|
|
|
|
}
|
|
|
|
|
2017-04-15 01:33:30 -04:00
|
|
|
func (h *Handshaker) replayBlocks(proxyApp proxy.AppConns, appBlockHeight, storeBlockHeight int, mutateState bool) ([]byte, error) {
|
2017-02-20 19:52:36 -05:00
|
|
|
// App is further behind than it should be, so we need to replay blocks.
|
2017-03-28 14:06:03 -04:00
|
|
|
// We replay all blocks from appBlockHeight+1.
|
2017-08-21 16:31:54 -04:00
|
|
|
//
|
2017-02-20 19:52:36 -05:00
|
|
|
// Note that we don't have an old version of the state,
|
2017-04-17 15:24:44 -07:00
|
|
|
// so we by-pass state validation/mutation using sm.ExecCommitBlock.
|
2017-08-21 16:31:54 -04:00
|
|
|
// This also means we won't be saving validator sets if they change during this period.
|
|
|
|
//
|
2017-04-17 21:14:35 -04:00
|
|
|
// If mutateState == true, the final block is replayed with h.replayBlock()
|
2017-02-20 19:52:36 -05:00
|
|
|
|
|
|
|
var appHash []byte
|
|
|
|
var err error
|
|
|
|
finalBlock := storeBlockHeight
|
2017-04-15 01:33:30 -04:00
|
|
|
if mutateState {
|
2017-02-20 19:52:36 -05:00
|
|
|
finalBlock -= 1
|
|
|
|
}
|
|
|
|
for i := appBlockHeight + 1; i <= finalBlock; i++ {
|
2017-05-02 11:53:32 +04:00
|
|
|
h.logger.Info("Applying block", "height", i)
|
2017-02-20 19:52:36 -05:00
|
|
|
block := h.store.LoadBlock(i)
|
2017-05-02 11:53:32 +04:00
|
|
|
appHash, err = sm.ExecCommitBlock(proxyApp.Consensus(), block, h.logger)
|
2017-02-20 19:52:36 -05:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2017-02-20 21:45:53 -05:00
|
|
|
|
|
|
|
h.nBlocks += 1
|
2017-02-20 19:52:36 -05:00
|
|
|
}
|
|
|
|
|
2017-04-15 01:33:30 -04:00
|
|
|
if mutateState {
|
2017-02-20 19:52:36 -05:00
|
|
|
// sync the final block
|
2017-04-17 21:14:35 -04:00
|
|
|
return h.replayBlock(storeBlockHeight, proxyApp.Consensus())
|
2017-02-20 19:52:36 -05:00
|
|
|
}
|
|
|
|
|
|
|
|
return appHash, h.checkAppHash(appHash)
|
|
|
|
}
|
|
|
|
|
2017-04-14 20:30:15 -04:00
|
|
|
// ApplyBlock on the proxyApp with the last block.
|
2017-04-15 01:33:30 -04:00
|
|
|
func (h *Handshaker) replayBlock(height int, proxyApp proxy.AppConnConsensus) ([]byte, error) {
|
2017-02-20 21:45:53 -05:00
|
|
|
mempool := types.MockMempool{}
|
|
|
|
|
2017-04-14 20:30:15 -04:00
|
|
|
block := h.store.LoadBlock(height)
|
|
|
|
meta := h.store.LoadBlockMeta(height)
|
2017-02-20 21:45:53 -05:00
|
|
|
|
new pubsub package
comment out failing consensus tests for now
rewrite rpc httpclient to use new pubsub package
import pubsub as tmpubsub, query as tmquery
make event IDs constants
EventKey -> EventTypeKey
rename EventsPubsub to PubSub
mempool does not use pubsub
rename eventsSub to pubsub
new subscribe API
fix channel size issues and consensus tests bugs
refactor rpc client
add missing discardFromChan method
add mutex
rename pubsub to eventBus
remove IsRunning from WSRPCConnection interface (not needed)
add a comment in broadcastNewRoundStepsAndVotes
rename registerEventCallbacks to broadcastNewRoundStepsAndVotes
See https://dave.cheney.net/2014/03/19/channel-axioms
stop eventBuses after reactor tests
remove unnecessary Unsubscribe
return subscribe helper function
move discardFromChan to where it is used
subscribe now returns an err
this gives us ability to refuse to subscribe if pubsub is at its max
capacity.
use context for control overflow
cache queries
handle err when subscribing in replay_test
rename testClientID to testSubscriber
extract var
set channel buffer capacity to 1 in replay_file
fix byzantine_test
unsubscribe from single event, not all events
refactor httpclient to return events to appropriate channels
return failing testReplayCrashBeforeWriteVote test
fix TestValidatorSetChanges
refactor code a bit
fix testReplayCrashBeforeWriteVote
add comment
fix TestValidatorSetChanges
fixes from Bucky's review
update comment [ci skip]
test TxEventBuffer
update changelog
fix TestValidatorSetChanges (2nd attempt)
only do wg.Done when no errors
benchmark event bus
create pubsub server inside NewEventBus
only expose config params (later if needed)
set buffer capacity to 0 so we are not testing cache
new tx event format: key = "Tx" plus a tag {"tx.hash": XYZ}
This should allow to subscribe to all transactions! or a specific one
using a query: "tm.events.type = Tx and tx.hash = '013ABF99434...'"
use TimeoutCommit instead of afterPublishEventNewBlockTimeout
TimeoutCommit is the time a node waits after committing a block, before
it goes into the next height. So it will finish everything from the last
block, but then wait a bit. The idea is this gives it time to hear more
votes from other validators, to strengthen the commit it includes in the
next block. But it also gives it time to hear about new transactions.
waitForBlockWithUpdatedVals
rewrite WAL crash tests
Task:
test that we can recover from any WAL crash.
Solution:
the old tests were relying on event hub being run in the same thread (we
were injecting the private validator's last signature).
when considering a rewrite, we considered two possible solutions: write
a "fuzzy" testing system where WAL is crashing upon receiving a new
message, or inject failures and trigger them in tests using something
like https://github.com/coreos/gofail.
remove sleep
no cs.Lock around wal.Save
test different cases (empty block, non-empty block, ...)
comments
add comments
test 4 cases: empty block, non-empty block, non-empty block with smaller part size, many blocks
fixes as per Bucky's last review
reset subscriptions on UnsubscribeAll
use a simple counter to track message for which we panicked
also, set a smaller part size for all test cases
2017-06-26 19:00:30 +04:00
|
|
|
if err := h.state.ApplyBlock(types.NopEventBus{}, proxyApp, block, meta.BlockID.PartsHeader, mempool); err != nil {
|
2017-03-27 15:41:45 -04:00
|
|
|
return nil, err
|
|
|
|
}
|
2017-02-20 21:45:53 -05:00
|
|
|
|
|
|
|
h.nBlocks += 1
|
|
|
|
|
2017-04-14 20:30:15 -04:00
|
|
|
return h.state.AppHash, nil
|
2017-02-20 21:45:53 -05:00
|
|
|
}
|
|
|
|
|
2017-02-20 19:52:36 -05:00
|
|
|
func (h *Handshaker) checkAppHash(appHash []byte) error {
|
|
|
|
if !bytes.Equal(h.state.AppHash, appHash) {
|
2017-04-25 14:50:20 -04:00
|
|
|
panic(errors.New(cmn.Fmt("Tendermint state.AppHash does not match AppHash after replay. Got %X, expected %X", appHash, h.state.AppHash)).Error())
|
2017-02-20 21:45:53 -05:00
|
|
|
return nil
|
2017-02-20 19:52:36 -05:00
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
//--------------------------------------------------------------------------------
|
2017-04-14 20:30:15 -04:00
|
|
|
// mockProxyApp uses ABCIResponses to give the right results
|
|
|
|
// Useful because we don't want to call Commit() twice for the same block on the real app.
|
|
|
|
|
|
|
|
func newMockProxyApp(appHash []byte, abciResponses *sm.ABCIResponses) proxy.AppConnConsensus {
|
|
|
|
clientCreator := proxy.NewLocalClientCreator(&mockProxyApp{
|
|
|
|
appHash: appHash,
|
|
|
|
abciResponses: abciResponses,
|
|
|
|
})
|
2017-02-20 19:52:36 -05:00
|
|
|
cli, _ := clientCreator.NewABCIClient()
|
2017-09-06 13:11:47 -04:00
|
|
|
_, err := cli.Start()
|
|
|
|
if err != nil {
|
|
|
|
panic(err)
|
|
|
|
}
|
2017-02-20 19:52:36 -05:00
|
|
|
return proxy.NewAppConnConsensus(cli)
|
|
|
|
}
|
|
|
|
|
|
|
|
type mockProxyApp struct {
|
|
|
|
abci.BaseApplication
|
|
|
|
|
2017-04-14 20:30:15 -04:00
|
|
|
appHash []byte
|
|
|
|
txCount int
|
|
|
|
abciResponses *sm.ABCIResponses
|
|
|
|
}
|
|
|
|
|
|
|
|
func (mock *mockProxyApp) DeliverTx(tx []byte) abci.Result {
|
|
|
|
r := mock.abciResponses.DeliverTx[mock.txCount]
|
|
|
|
mock.txCount += 1
|
|
|
|
return abci.Result{
|
|
|
|
r.Code,
|
|
|
|
r.Data,
|
|
|
|
r.Log,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (mock *mockProxyApp) EndBlock(height uint64) abci.ResponseEndBlock {
|
|
|
|
mock.txCount = 0
|
|
|
|
return mock.abciResponses.EndBlock
|
2017-02-20 19:52:36 -05:00
|
|
|
}
|
|
|
|
|
|
|
|
func (mock *mockProxyApp) Commit() abci.Result {
|
|
|
|
return abci.NewResultOK(mock.appHash, "")
|
|
|
|
}
|