consensus: msg saving and replay

Ethan Buchman 2015-12-22 15:23:22 -05:00
parent 892174997b
commit 6aaa5fb0bf
8 changed files with 213 additions and 39 deletions

View File

@ -8,11 +8,6 @@ docker build -t tmbase -f Dockerfile .
# (config and blockchain data go in here)
docker run --name tmdata --entrypoint /bin/echo tmbase Data-only container for tmnode
# Copy files into the data-only container
# You should stop the containers before running this
# cd $DATA_SRC
# tar cf - . | docker run -i --rm --volumes-from mintdata mint tar xvf - -C /data/tendermint
# Run tendermint node
docker run --name tmnode --volumes-from tmdata -d -p 46656:46656 -p 46657:46657 -e TMSEEDS="goldenalchemist.chaintest.net:46657" -e TMNAME="testnode" -e TMREPO="github.com/tendermint/tendermint" -e TMHEAD="origin/develop" tmbase

View File

@ -34,6 +34,8 @@ Commands:
switch args[0] {
case "node":
node.RunNode()
case "replay":
node.RunReplay()
case "init":
init_files()
case "show_validator":

View File

@ -67,6 +67,7 @@ func GetConfig(rootDir string) cfg.Config {
mapConfig.SetDefault("rpc_laddr", "0.0.0.0:46657")
mapConfig.SetDefault("prof_laddr", "")
mapConfig.SetDefault("revision_file", rootDir+"/revision")
mapConfig.SetDefault("cs_msg_log", rootDir+"/cs_msg_log")
return mapConfig
}
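
The default above only names a file under the node's root directory; nothing in this file opens it. A minimal sketch of how the setting is consumed, assuming a node wiring where consensusState already exists and this config package is imported as tmcfg (both are assumptions, not shown in this hunk); NewNode later in this commit does the same thing without checking the error:

config := tmcfg.GetConfig(rootDir)           // defaults cs_msg_log to rootDir + "/cs_msg_log"
msgLogPath := config.GetString("cs_msg_log") // can be overridden in the user's config file
if err := consensusState.OpenFileForMessageLog(msgLogPath); err != nil {
	log.Error("Failed to open consensus message log", "path", msgLogPath, "err", err)
}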

View File

@ -239,8 +239,8 @@ func (conR *ConsensusReactor) registerEventCallbacks() {
conR.broadcastNewRoundStep(rs)
})
conR.evsw.AddListenerForEvent("conR", types.EventStringVote(), func(data events.EventData) {
edv := data.(*types.EventDataVote)
conR.evsw.AddListenerForEvent("conR", types.EventStringVote(), func(data types.EventData) {
edv := data.(types.EventDataVote)
conR.broadcastHasVoteMessage(edv.Vote, edv.Index)
})
}

View File

@ -1,9 +1,11 @@
package consensus
import (
"bufio"
"bytes"
"errors"
"fmt"
"os"
"reflect"
"sync"
"time"
@ -153,20 +155,20 @@ var (
// msgs from the reactor which may update the state
type msgInfo struct {
msg ConsensusMessage
peerKey string
Msg ConsensusMessage `json:"msg"`
PeerKey string `json:"peer_key"`
}
// internally generated messages which may update the state
type timeoutInfo struct {
duration time.Duration
height int
round int
step RoundStepType
Duration time.Duration `json:"duration"`
Height int `json:"height"`
Round int `json:"round"`
Step RoundStepType `json:"step"`
}
func (ti *timeoutInfo) String() string {
return fmt.Sprintf("%v ; %d/%d %v", ti.duration, ti.height, ti.round, ti.step)
return fmt.Sprintf("%v ; %d/%d %v", ti.Duration, ti.Height, ti.Round, ti.Step)
}
// Tracks consensus state across block heights and rounds.
@ -190,6 +192,8 @@ type ConsensusState struct {
evsw *events.EventSwitch
msgLogFP *os.File // file we write all msgs and timeouts to, in order (for deterministic replay)
nSteps int // used for testing to limit the number of transitions the state makes
}
@ -268,6 +272,71 @@ func (cs *ConsensusState) startRoutines(maxSteps int) {
func (cs *ConsensusState) OnStop() {
cs.QuitService.OnStop()
if cs.msgLogFP != nil {
cs.msgLogFP.Close()
}
}
func (cs *ConsensusState) OpenFileForMessageLog(file string) (err error) {
cs.mtx.Lock()
defer cs.mtx.Unlock()
cs.msgLogFP, err = os.OpenFile(file, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666)
return err
}
// Playback with tests
func (cs ConsensusState) ReplayMessagesFromFile(file string) error {
if cs.IsRunning() {
return errors.New("cs is already running, cannot replay")
}
cs.BaseService.OnStart()
cs.startRoutines(0)
if cs.msgLogFP != nil {
cs.msgLogFP.Close()
cs.msgLogFP = nil
}
// we ensure all new step events are regenerated as expected
newStepCh := cs.evsw.SubscribeToEvent("replay-test", types.EventStringNewRoundStep(), 1)
fp, err := os.OpenFile(file, os.O_RDONLY, 0666)
if err != nil {
return err
}
defer fp.Close()
scanner := bufio.NewScanner(fp)
for scanner.Scan() {
var err error
var msg ConsensusLogMessage
wire.ReadJSON(&msg, scanner.Bytes(), &err)
if err != nil {
return fmt.Errorf("Error reading json data: %v", err)
}
log.Notice("Replaying message", "type", reflect.TypeOf(msg.Msg), "msg", msg.Msg)
switch m := msg.Msg.(type) {
case *types.EventDataRoundState:
// these are playback checks
mi := <-newStepCh
m2 := mi.(*types.EventDataRoundState)
if m.Height != m2.Height || m.Round != m2.Round || m.Step != m2.Step {
return fmt.Errorf("RoundState mismatch. Got %v; Expected %v", m2, m)
}
case msgInfo:
// internal or from peer
if m.PeerKey == "" {
cs.internalMsgQueue <- m
} else {
cs.peerMsgQueue <- m
}
case timeoutInfo:
cs.tockChan <- m
default:
return fmt.Errorf("Unknown ConsensusLogMessage type: %v", reflect.TypeOf(msg.Msg))
}
}
return nil
}
//------------------------------------------------------------
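
ReplayMessagesFromFile above is the read side of the feature; saveMsg, further down, is the write side. A sketch of the intended record-then-replay cycle, under stated assumptions: state, proxyAppCtx, blockStore, mempool and evsw are built the same way NewNode and RunReplay build them, Exit and Fmt come from the dot-imported common package, and everything is still at the point where logging began when the replay runs.

// Record: every peer msg, internal msg, timeout and new-step event becomes
// one JSON line in logFile.
cs1 := consensus.NewConsensusState(state.Copy(), proxyAppCtx, blockStore, mempool)
cs1.SetEventSwitch(evsw)
cs1.OpenFileForMessageLog(logFile) // error ignored for brevity
cs1.Start()
// ... let it run for a while, then shut it down ...
cs1.Stop()

// Replay: a fresh ConsensusState re-derives every step from the log, and each
// recorded EventDataRoundState line is checked against the step it regenerates.
cs2 := consensus.NewConsensusState(state.Copy(), proxyAppCtx, blockStore, mempool)
cs2.SetEventSwitch(evsw)
if err := cs2.ReplayMessagesFromFile(logFile); err != nil {
	Exit(Fmt("Error during consensus replay: %v", err))
}
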
@ -455,10 +524,12 @@ func (cs *ConsensusState) updateToState(state *sm.State) {
}
func (cs *ConsensusState) newStep() {
rs := cs.RoundStateEvent()
cs.saveMsg(rs)
cs.nSteps += 1
// newStep is called by updateToState in NewConsensusState before the evsw is set!
if cs.evsw != nil {
cs.evsw.FireEvent(types.EventStringNewRoundStep(), cs.RoundStateEvent())
cs.evsw.FireEvent(types.EventStringNewRoundStep(), rs)
}
}
@ -477,13 +548,13 @@ func (cs *ConsensusState) timeoutRoutine() {
log.Debug("Received tick", "old_ti", ti, "new_ti", newti)
// ignore tickers for old height/round/step
if newti.height < ti.height {
if newti.Height < ti.Height {
continue
} else if newti.height == ti.height {
if newti.round < ti.round {
} else if newti.Height == ti.Height {
if newti.Round < ti.Round {
continue
} else if newti.round == ti.round {
if ti.step > 0 && newti.step <= ti.step {
} else if newti.Round == ti.Round {
if ti.Step > 0 && newti.Step <= ti.Step {
continue
}
}
@ -492,16 +563,16 @@ func (cs *ConsensusState) timeoutRoutine() {
ti = newti
// if the newti has duration == 0, we relay to the tockChan immediately (no timeout)
if ti.duration == time.Duration(0) {
if ti.Duration == time.Duration(0) {
go func(t timeoutInfo) { cs.tockChan <- t }(ti)
continue
}
log.Info("Scheduling timeout", "dur", ti.duration, "height", ti.height, "round", ti.round, "step", ti.step)
log.Debug("Scheduling timeout", "dur", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step)
cs.timeoutTicker.Stop()
cs.timeoutTicker = time.NewTicker(ti.duration)
cs.timeoutTicker = time.NewTicker(ti.Duration)
case <-cs.timeoutTicker.C:
log.Info("Timed out", "dur", ti.duration, "height", ti.height, "round", ti.round, "step", ti.step)
log.Info("Timed out", "dur", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step)
cs.timeoutTicker.Stop()
// goroutine here guarantees timeoutRoutine doesn't block.
// Determinism comes from playback in the receiveRoutine.
@ -537,13 +608,16 @@ func (cs *ConsensusState) receiveRoutine(maxSteps int) {
select {
case mi = <-cs.peerMsgQueue:
cs.saveMsg(mi)
// handles proposals, block parts, votes
// may generate internal events (votes, complete proposals, 2/3 majorities)
cs.handleMsg(mi, rs)
case mi = <-cs.internalMsgQueue:
cs.saveMsg(mi)
// handles proposals, block parts, votes
cs.handleMsg(mi, rs)
case ti := <-cs.tockChan:
cs.saveMsg(ti)
// if the timeout is relevant to the rs
// go to the next step
cs.handleTimeout(ti, rs)
@ -559,7 +633,7 @@ func (cs *ConsensusState) handleMsg(mi msgInfo, rs RoundState) {
defer cs.mtx.Unlock()
var err error
msg, peerKey := mi.msg, mi.peerKey
msg, peerKey := mi.Msg, mi.PeerKey
switch msg := msg.(type) {
case *ProposalMessage:
// will not cause transition.
@ -592,10 +666,10 @@ func (cs *ConsensusState) handleMsg(mi msgInfo, rs RoundState) {
}
func (cs *ConsensusState) handleTimeout(ti timeoutInfo, rs RoundState) {
log.Debug("Received tock", "timeout", ti.duration, "height", ti.height, "round", ti.round, "step", ti.step)
log.Debug("Received tock", "timeout", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step)
// timeouts must be for current height, round, step
if ti.height != rs.Height || ti.round < rs.Round || (ti.round == rs.Round && ti.step < rs.Step) {
if ti.Height != rs.Height || ti.Round < rs.Round || (ti.Round == rs.Round && ti.Step < rs.Step) {
log.Debug("Ignoring tock because we're ahead", "height", rs.Height, "round", rs.Round, "step", rs.Step)
return
}
@ -604,22 +678,22 @@ func (cs *ConsensusState) handleTimeout(ti timeoutInfo, rs RoundState) {
cs.mtx.Lock()
defer cs.mtx.Unlock()
switch ti.step {
switch ti.Step {
case RoundStepNewHeight:
// NewRound event fired from enterNewRound.
// Do we want a timeout event too?
cs.enterNewRound(ti.height, 0)
// XXX: should we fire timeout here?
cs.enterNewRound(ti.Height, 0)
case RoundStepPropose:
cs.evsw.FireEvent(types.EventStringTimeoutPropose(), cs.RoundStateEvent())
cs.enterPrevote(ti.height, ti.round)
cs.enterPrevote(ti.Height, ti.Round)
case RoundStepPrevoteWait:
cs.evsw.FireEvent(types.EventStringTimeoutWait(), cs.RoundStateEvent())
cs.enterPrecommit(ti.height, ti.round)
cs.enterPrecommit(ti.Height, ti.Round)
case RoundStepPrecommitWait:
cs.evsw.FireEvent(types.EventStringTimeoutWait(), cs.RoundStateEvent())
cs.enterNewRound(ti.height, ti.round+1)
cs.enterNewRound(ti.Height, ti.Round+1)
default:
panic(Fmt("Invalid timeout step: %v", ti.step))
panic(Fmt("Invalid timeout step: %v", ti.Step))
}
}
@ -1304,7 +1378,7 @@ func (cs *ConsensusState) addVote(valIndex int, vote *types.Vote, peerKey string
added, address, err = cs.LastCommit.AddByIndex(valIndex, vote)
if added {
log.Info(Fmt("Added to lastPrecommits: %v", cs.LastCommit.StringShort()))
cs.evsw.FireEvent(types.EventStringVote(), &types.EventDataVote{valIndex, address, vote})
cs.evsw.FireEvent(types.EventStringVote(), types.EventDataVote{valIndex, address, vote})
}
return
@ -1315,7 +1389,7 @@ func (cs *ConsensusState) addVote(valIndex int, vote *types.Vote, peerKey string
height := cs.Height
added, address, err = cs.Votes.AddByIndex(valIndex, vote, peerKey)
if added {
cs.evsw.FireEvent(types.EventStringVote(), &types.EventDataVote{valIndex, address, vote})
cs.evsw.FireEvent(types.EventStringVote(), types.EventDataVote{valIndex, address, vote})
switch vote.Type {
case types.VoteTypePrevote:
@ -1414,6 +1488,66 @@ func (cs *ConsensusState) signAddVote(type_ byte, hash []byte, header types.Part
}
}
// Save Block, save the +2/3 Commits we've seen
func (cs *ConsensusState) saveBlock(block *types.Block, blockParts *types.PartSet, commits *types.VoteSet) {
// The proposal must be valid.
if err := cs.stageBlock(block, blockParts); err != nil {
PanicSanity(Fmt("saveBlock() called with an invalid block: %v", err))
}
// Save to blockStore.
if cs.blockStore.Height() < block.Height {
seenValidation := commits.MakeValidation()
cs.blockStore.SaveBlock(block, blockParts, seenValidation)
}
// Commit to proxyAppCtx
err := cs.stagedState.Commit(cs.proxyAppCtx)
if err != nil {
// TODO: handle this gracefully.
PanicQ(Fmt("Commit failed for application"))
}
// Save the state.
cs.stagedState.Save()
// Update mempool.
cs.mempool.Update(block)
// Fire off event
if cs.evsw != nil && cs.evc != nil {
cs.evsw.FireEvent(types.EventStringNewBlock(), types.EventDataNewBlock{block})
go cs.evc.Flush()
}
}
func (cs *ConsensusState) saveMsg(msg ConsensusLogMessageInterface) {
if cs.msgLogFP != nil {
var n int
var err error
wire.WriteJSON(ConsensusLogMessage{msg}, cs.msgLogFP, &n, &err)
wire.WriteTo([]byte("\n"), cs.msgLogFP, &n, &err) // one message per line
if err != nil {
log.Error("Error writing to consensus message log file", "err", err, "msg", msg)
}
}
}
type ConsensusLogMessage struct {
Msg ConsensusLogMessageInterface `json:"msg"`
}
type ConsensusLogMessageInterface interface{}
var _ = wire.RegisterInterface(
struct{ ConsensusLogMessageInterface }{},
wire.ConcreteType{&types.EventDataRoundState{}, 0x01},
wire.ConcreteType{msgInfo{}, 0x02},
wire.ConcreteType{timeoutInfo{}, 0x03},
)
//---------------------------------------------------------
func CompareHRS(h1, r1 int, s1 RoundStepType, h2, r2 int, s2 RoundStepType) int {
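
For reference, a sketch of how a single log line round-trips through the envelope registered above. It is assumed to live in package consensus, where wire, log, timeoutInfo, ConsensusLogMessage and RoundStepPropose are all in scope (bufio, bytes and time are already imported by this commit); it uses only the wire calls this commit itself uses and makes no claim about go-wire's exact on-disk JSON shape.

func roundTripLogLine() {
	var n int
	var err error
	buf := new(bytes.Buffer)

	// Write path, as in saveMsg: wrap the message in ConsensusLogMessage so the
	// registered concrete type survives encoding; one JSON message per line.
	ti := timeoutInfo{Duration: 100 * time.Millisecond, Height: 1, Round: 0, Step: RoundStepPropose}
	wire.WriteJSON(ConsensusLogMessage{ti}, buf, &n, &err)
	wire.WriteTo([]byte("\n"), buf, &n, &err)

	// Read path, as in ReplayMessagesFromFile: scan one line, decode, then
	// dispatch on the concrete type.
	scanner := bufio.NewScanner(buf)
	for scanner.Scan() {
		var msg ConsensusLogMessage
		wire.ReadJSON(&msg, scanner.Bytes(), &err)
		if err != nil {
			log.Error("Log line round-trip failed", "err", err)
			return
		}
		if ti2, ok := msg.Msg.(timeoutInfo); ok {
			log.Notice("Recovered timeout", "height", ti2.Height, "round", ti2.Round, "step", ti2.Step)
		}
	}
}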

View File

@ -84,6 +84,9 @@ func NewNode(privValidator *types.PrivValidator) *Node {
consensusReactor.SetPrivValidator(privValidator)
}
// deterministic accountability
consensusState.OpenFileForMessageLog(config.GetString("cs_msg_log"))
// Make p2p network switch
sw := p2p.NewSwitch()
sw.AddReactor("MEMPOOL", mempoolReactor)
@ -308,6 +311,47 @@ func RunNode() {
})
}
func RunReplay() {
msgLogFile := config.GetString("cs_msg_log")
if msgLogFile == "" {
Exit("cs_msg_log file name not set in tendermint config")
}
// Get BlockStore
blockStoreDB := dbm.GetDB("blockstore")
blockStore := bc.NewBlockStore(blockStoreDB)
// Get State
stateDB := dbm.GetDB("state")
state := sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file"))
// Create two proxyAppCtx connections,
// one for the consensus and one for the mempool.
proxyAddr := config.GetString("proxy_app")
proxyAppCtxMempool := getProxyApp(proxyAddr, state.LastAppHash)
proxyAppCtxConsensus := getProxyApp(proxyAddr, state.LastAppHash)
// add the chainid to the global config
config.Set("chain_id", state.ChainID)
// Make event switch
eventSwitch := events.NewEventSwitch()
_, err := eventSwitch.Start()
if err != nil {
Exit(Fmt("Failed to start event switch: %v", err))
}
mempool := mempl.NewMempool(proxyAppCtxMempool)
consensusState := consensus.NewConsensusState(state.Copy(), proxyAppCtxConsensus, blockStore, mempool)
consensusState.SetEventSwitch(eventSwitch)
if err := consensusState.ReplayMessagesFromFile(msgLogFile); err != nil {
Exit(Fmt("Error during consensus replay: %v", err))
}
log.Notice("Replay completed successfully")
}
// Load the most recent state from "state" db,
// or create a new one (and save) from genesis.
func getState() *sm.State {

View File

@ -106,7 +106,6 @@ func (app *remoteAppConn) sendRequestsRoutine() {
app.StopForError(err)
return
}
log.Debug("Sent request", "requestType", reflect.TypeOf(reqres.Request), "request", reqres.Request)
if _, ok := reqres.Request.(tmsp.RequestFlush); ok {
err = app.bufWriter.Flush()
if err != nil {
@ -133,7 +132,6 @@ func (app *remoteAppConn) recvResponseRoutine() {
case tmsp.ResponseException:
app.StopForError(errors.New(res.Error))
default:
log.Debug("Received response", "responseType", reflect.TypeOf(res), "response", res)
err := app.didRecvResponse(res)
if err != nil {
app.StopForError(err)

View File

@ -52,7 +52,7 @@ var _ = wire.RegisterInterface(
// wire.ConcreteType{EventDataFork{}, EventDataTypeFork },
wire.ConcreteType{EventDataTx{}, EventDataTypeTx},
wire.ConcreteType{EventDataApp{}, EventDataTypeApp},
wire.ConcreteType{EventDataRoundState{}, EventDataTypeRoundState},
wire.ConcreteType{&EventDataRoundState{}, EventDataTypeRoundState}, // a pointer because we use it internally
wire.ConcreteType{EventDataVote{}, EventDataTypeVote},
)