From 6aaa5fb0bffd08dad0b7f5d98b90bbdb0d3f309a Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Tue, 22 Dec 2015 15:23:22 -0500 Subject: [PATCH 01/11] consensus: msg saving and replay --- DOCKER/docker.sh | 5 - cmd/tendermint/main.go | 2 + config/tendermint/config.go | 1 + consensus/reactor.go | 4 +- consensus/state.go | 192 ++++++++++++++++++++++++++++++------ node/node.go | 44 +++++++++ proxy/remote_app_conn.go | 2 - types/events.go | 2 +- 8 files changed, 213 insertions(+), 39 deletions(-) diff --git a/DOCKER/docker.sh b/DOCKER/docker.sh index d64557a1..d2eb1471 100755 --- a/DOCKER/docker.sh +++ b/DOCKER/docker.sh @@ -8,11 +8,6 @@ docker build -t tmbase -f Dockerfile . # (config and blockchain data go in here) docker run --name tmdata --entrypoint /bin/echo tmbase Data-only container for tmnode -# Copy files into the data-only container -# You should stop the containers before running this -# cd $DATA_SRC -# tar cf - . | docker run -i --rm --volumes-from mintdata mint tar xvf - -C /data/tendermint - # Run tendermint node docker run --name tmnode --volumes-from tmdata -d -p 46656:46656 -p 46657:46657 -e TMSEEDS="goldenalchemist.chaintest.net:46657" -e TMNAME="testnode" -e TMREPO="github.com/tendermint/tendermint" -e TMHEAD="origin/develop" tmbase diff --git a/cmd/tendermint/main.go b/cmd/tendermint/main.go index 6ccbc4d1..b9c4538b 100644 --- a/cmd/tendermint/main.go +++ b/cmd/tendermint/main.go @@ -34,6 +34,8 @@ Commands: switch args[0] { case "node": node.RunNode() + case "replay": + node.RunReplay() case "init": init_files() case "show_validator": diff --git a/config/tendermint/config.go b/config/tendermint/config.go index b2e220fc..e6c74025 100644 --- a/config/tendermint/config.go +++ b/config/tendermint/config.go @@ -67,6 +67,7 @@ func GetConfig(rootDir string) cfg.Config { mapConfig.SetDefault("rpc_laddr", "0.0.0.0:46657") mapConfig.SetDefault("prof_laddr", "") mapConfig.SetDefault("revision_file", rootDir+"/revision") + mapConfig.SetDefault("cs_msg_log", rootDir+"/cs_msg_log") return mapConfig } diff --git a/consensus/reactor.go b/consensus/reactor.go index 41ff77cc..608ebef0 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -239,8 +239,8 @@ func (conR *ConsensusReactor) registerEventCallbacks() { conR.broadcastNewRoundStep(rs) }) - conR.evsw.AddListenerForEvent("conR", types.EventStringVote(), func(data events.EventData) { - edv := data.(*types.EventDataVote) + conR.evsw.AddListenerForEvent("conR", types.EventStringVote(), func(data types.EventData) { + edv := data.(types.EventDataVote) conR.broadcastHasVoteMessage(edv.Vote, edv.Index) }) } diff --git a/consensus/state.go b/consensus/state.go index 721f7157..9d9e65d8 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -1,9 +1,11 @@ package consensus import ( + "bufio" "bytes" "errors" "fmt" + "os" "reflect" "sync" "time" @@ -153,20 +155,20 @@ var ( // msgs from the reactor which may update the state type msgInfo struct { - msg ConsensusMessage - peerKey string + Msg ConsensusMessage `json:"msg"` + PeerKey string `json:"peer_key"` } // internally generated messages which may update the state type timeoutInfo struct { - duration time.Duration - height int - round int - step RoundStepType + Duration time.Duration `json:"duration"` + Height int `json:"height"` + Round int `json:"round"` + Step RoundStepType `json:"step"` } func (ti *timeoutInfo) String() string { - return fmt.Sprintf("%v ; %d/%d %v", ti.duration, ti.height, ti.round, ti.step) + return fmt.Sprintf("%v ; %d/%d %v", ti.Duration, ti.Height, ti.Round, ti.Step) } // Tracks consensus state across block heights and rounds. @@ -190,6 +192,8 @@ type ConsensusState struct { evsw *events.EventSwitch + msgLogFP *os.File // file we write all msgs and timeouts to, in order (for deterministic replay) + nSteps int // used for testing to limit the number of transitions the state makes } @@ -268,6 +272,71 @@ func (cs *ConsensusState) startRoutines(maxSteps int) { func (cs *ConsensusState) OnStop() { cs.QuitService.OnStop() + if cs.msgLogFP != nil { + cs.msgLogFP.Close() + } +} + +func (cs *ConsensusState) OpenFileForMessageLog(file string) (err error) { + cs.mtx.Lock() + defer cs.mtx.Unlock() + cs.msgLogFP, err = os.OpenFile(file, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666) + return err +} + +// Playback with tests +func (cs ConsensusState) ReplayMessagesFromFile(file string) error { + if cs.IsRunning() { + return errors.New("cs is already running, cannot replay") + } + + cs.BaseService.OnStart() + cs.startRoutines(0) + + if cs.msgLogFP != nil { + cs.msgLogFP.Close() + cs.msgLogFP = nil + } + + // we ensure all new step events are regenerated as expected + newStepCh := cs.evsw.SubscribeToEvent("replay-test", types.EventStringNewRoundStep(), 1) + + fp, err := os.OpenFile(file, os.O_RDONLY, 0666) + if err != nil { + return err + } + defer fp.Close() + scanner := bufio.NewScanner(fp) + for scanner.Scan() { + var err error + var msg ConsensusLogMessage + wire.ReadJSON(&msg, scanner.Bytes(), &err) + if err != nil { + return fmt.Errorf("Error reading json data: %v", err) + } + log.Notice("Replaying message", "type", reflect.TypeOf(msg.Msg), "msg", msg.Msg) + switch m := msg.Msg.(type) { + case *types.EventDataRoundState: + // these are playback checks + mi := <-newStepCh + m2 := mi.(*types.EventDataRoundState) + if m.Height != m2.Height || m.Round != m2.Round || m.Step != m2.Step { + return fmt.Errorf("RoundState mismatch. Got %v; Expected %v", m2, m) + } + case msgInfo: + // internal or from peer + if m.PeerKey == "" { + cs.internalMsgQueue <- m + } else { + cs.peerMsgQueue <- m + } + case timeoutInfo: + cs.tockChan <- m + default: + return fmt.Errorf("Unknown ConsensusLogMessage type: %v", reflect.TypeOf(msg.Msg)) + } + } + return nil } //------------------------------------------------------------ @@ -455,10 +524,12 @@ func (cs *ConsensusState) updateToState(state *sm.State) { } func (cs *ConsensusState) newStep() { + rs := cs.RoundStateEvent() + cs.saveMsg(rs) cs.nSteps += 1 // newStep is called by updateToStep in NewConsensusState before the evsw is set! if cs.evsw != nil { - cs.evsw.FireEvent(types.EventStringNewRoundStep(), cs.RoundStateEvent()) + cs.evsw.FireEvent(types.EventStringNewRoundStep(), rs) } } @@ -477,13 +548,13 @@ func (cs *ConsensusState) timeoutRoutine() { log.Debug("Received tick", "old_ti", ti, "new_ti", newti) // ignore tickers for old height/round/step - if newti.height < ti.height { + if newti.Height < ti.Height { continue - } else if newti.height == ti.height { - if newti.round < ti.round { + } else if newti.Height == ti.Height { + if newti.Round < ti.Round { continue - } else if newti.round == ti.round { - if ti.step > 0 && newti.step <= ti.step { + } else if newti.Round == ti.Round { + if ti.Step > 0 && newti.Step <= ti.Step { continue } } @@ -492,16 +563,16 @@ func (cs *ConsensusState) timeoutRoutine() { ti = newti // if the newti has duration == 0, we relay to the tockChan immediately (no timeout) - if ti.duration == time.Duration(0) { + if ti.Duration == time.Duration(0) { go func(t timeoutInfo) { cs.tockChan <- t }(ti) continue } - log.Info("Scheduling timeout", "dur", ti.duration, "height", ti.height, "round", ti.round, "step", ti.step) + log.Debug("Scheduling timeout", "dur", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step) cs.timeoutTicker.Stop() - cs.timeoutTicker = time.NewTicker(ti.duration) + cs.timeoutTicker = time.NewTicker(ti.Duration) case <-cs.timeoutTicker.C: - log.Info("Timed out", "dur", ti.duration, "height", ti.height, "round", ti.round, "step", ti.step) + log.Info("Timed out", "dur", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step) cs.timeoutTicker.Stop() // go routine here gaurantees timeoutRoutine doesn't block. // Determinism comes from playback in the receiveRoutine. @@ -537,13 +608,16 @@ func (cs *ConsensusState) receiveRoutine(maxSteps int) { select { case mi = <-cs.peerMsgQueue: + cs.saveMsg(mi) // handles proposals, block parts, votes // may generate internal events (votes, complete proposals, 2/3 majorities) cs.handleMsg(mi, rs) case mi = <-cs.internalMsgQueue: + cs.saveMsg(mi) // handles proposals, block parts, votes cs.handleMsg(mi, rs) case ti := <-cs.tockChan: + cs.saveMsg(ti) // if the timeout is relevant to the rs // go to the next step cs.handleTimeout(ti, rs) @@ -559,7 +633,7 @@ func (cs *ConsensusState) handleMsg(mi msgInfo, rs RoundState) { defer cs.mtx.Unlock() var err error - msg, peerKey := mi.msg, mi.peerKey + msg, peerKey := mi.Msg, mi.PeerKey switch msg := msg.(type) { case *ProposalMessage: // will not cause transition. @@ -592,10 +666,10 @@ func (cs *ConsensusState) handleMsg(mi msgInfo, rs RoundState) { } func (cs *ConsensusState) handleTimeout(ti timeoutInfo, rs RoundState) { - log.Debug("Received tock", "timeout", ti.duration, "height", ti.height, "round", ti.round, "step", ti.step) + log.Debug("Received tock", "timeout", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step) // timeouts must be for current height, round, step - if ti.height != rs.Height || ti.round < rs.Round || (ti.round == rs.Round && ti.step < rs.Step) { + if ti.Height != rs.Height || ti.Round < rs.Round || (ti.Round == rs.Round && ti.Step < rs.Step) { log.Debug("Ignoring tock because we're ahead", "height", rs.Height, "round", rs.Round, "step", rs.Step) return } @@ -604,22 +678,22 @@ func (cs *ConsensusState) handleTimeout(ti timeoutInfo, rs RoundState) { cs.mtx.Lock() defer cs.mtx.Unlock() - switch ti.step { + switch ti.Step { case RoundStepNewHeight: // NewRound event fired from enterNewRound. - // Do we want a timeout event too? - cs.enterNewRound(ti.height, 0) + // XXX: should we fire timeout here? + cs.enterNewRound(ti.Height, 0) case RoundStepPropose: cs.evsw.FireEvent(types.EventStringTimeoutPropose(), cs.RoundStateEvent()) - cs.enterPrevote(ti.height, ti.round) + cs.enterPrevote(ti.Height, ti.Round) case RoundStepPrevoteWait: cs.evsw.FireEvent(types.EventStringTimeoutWait(), cs.RoundStateEvent()) - cs.enterPrecommit(ti.height, ti.round) + cs.enterPrecommit(ti.Height, ti.Round) case RoundStepPrecommitWait: cs.evsw.FireEvent(types.EventStringTimeoutWait(), cs.RoundStateEvent()) - cs.enterNewRound(ti.height, ti.round+1) + cs.enterNewRound(ti.Height, ti.Round+1) default: - panic(Fmt("Invalid timeout step: %v", ti.step)) + panic(Fmt("Invalid timeout step: %v", ti.Step)) } } @@ -1304,7 +1378,7 @@ func (cs *ConsensusState) addVote(valIndex int, vote *types.Vote, peerKey string added, address, err = cs.LastCommit.AddByIndex(valIndex, vote) if added { log.Info(Fmt("Added to lastPrecommits: %v", cs.LastCommit.StringShort())) - cs.evsw.FireEvent(types.EventStringVote(), &types.EventDataVote{valIndex, address, vote}) + cs.evsw.FireEvent(types.EventStringVote(), types.EventDataVote{valIndex, address, vote}) } return @@ -1315,7 +1389,7 @@ func (cs *ConsensusState) addVote(valIndex int, vote *types.Vote, peerKey string height := cs.Height added, address, err = cs.Votes.AddByIndex(valIndex, vote, peerKey) if added { - cs.evsw.FireEvent(types.EventStringVote(), &types.EventDataVote{valIndex, address, vote}) + cs.evsw.FireEvent(types.EventStringVote(), types.EventDataVote{valIndex, address, vote}) switch vote.Type { case types.VoteTypePrevote: @@ -1414,6 +1488,66 @@ func (cs *ConsensusState) signAddVote(type_ byte, hash []byte, header types.Part } } +// Save Block, save the +2/3 Commits we've seen +func (cs *ConsensusState) saveBlock(block *types.Block, blockParts *types.PartSet, commits *types.VoteSet) { + + // The proposal must be valid. + if err := cs.stageBlock(block, blockParts); err != nil { + PanicSanity(Fmt("saveBlock() an invalid block: %v", err)) + } + + // Save to blockStore. + if cs.blockStore.Height() < block.Height { + seenValidation := commits.MakeValidation() + cs.blockStore.SaveBlock(block, blockParts, seenValidation) + } + + // Commit to proxyAppCtx + err := cs.stagedState.Commit(cs.proxyAppCtx) + if err != nil { + // TODO: handle this gracefully. + PanicQ(Fmt("Commit failed for applicaiton")) + } + + // Save the state. + cs.stagedState.Save() + + // Update mempool. + cs.mempool.Update(block) + + // Fire off event + if cs.evsw != nil && cs.evc != nil { + cs.evsw.FireEvent(types.EventStringNewBlock(), types.EventDataNewBlock{block}) + go cs.evc.Flush() + } + +} + +func (cs *ConsensusState) saveMsg(msg ConsensusLogMessageInterface) { + if cs.msgLogFP != nil { + var n int + var err error + wire.WriteJSON(ConsensusLogMessage{msg}, cs.msgLogFP, &n, &err) + wire.WriteTo([]byte("\n"), cs.msgLogFP, &n, &err) // one message per line + if err != nil { + log.Error("Error writing to consensus message log file", "err", err, "msg", msg) + } + } +} + +type ConsensusLogMessage struct { + Msg ConsensusLogMessageInterface `json:"msg"` +} + +type ConsensusLogMessageInterface interface{} + +var _ = wire.RegisterInterface( + struct{ ConsensusLogMessageInterface }{}, + wire.ConcreteType{&types.EventDataRoundState{}, 0x01}, + wire.ConcreteType{msgInfo{}, 0x02}, + wire.ConcreteType{timeoutInfo{}, 0x03}, +) + //--------------------------------------------------------- func CompareHRS(h1, r1 int, s1 RoundStepType, h2, r2 int, s2 RoundStepType) int { diff --git a/node/node.go b/node/node.go index ab06d48c..a5f2fbda 100644 --- a/node/node.go +++ b/node/node.go @@ -84,6 +84,9 @@ func NewNode(privValidator *types.PrivValidator) *Node { consensusReactor.SetPrivValidator(privValidator) } + // deterministic accountability + consensusState.OpenFileForMessageLog(config.GetString("cs_msg_log")) + // Make p2p network switch sw := p2p.NewSwitch() sw.AddReactor("MEMPOOL", mempoolReactor) @@ -308,6 +311,47 @@ func RunNode() { }) } +func RunReplay() { + msgLogFile := config.GetString("cs_msg_log") + if msgLogFile == "" { + Exit("cs_msg_log file name not set in tendermint config") + } + + // Get BlockStore + blockStoreDB := dbm.GetDB("blockstore") + blockStore := bc.NewBlockStore(blockStoreDB) + + // Get State + stateDB := dbm.GetDB("state") + state := sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file")) + + // Create two proxyAppCtx connections, + // one for the consensus and one for the mempool. + proxyAddr := config.GetString("proxy_app") + proxyAppCtxMempool := getProxyApp(proxyAddr, state.LastAppHash) + proxyAppCtxConsensus := getProxyApp(proxyAddr, state.LastAppHash) + + // add the chainid to the global config + config.Set("chain_id", state.ChainID) + + // Make event switch + eventSwitch := events.NewEventSwitch() + _, err := eventSwitch.Start() + if err != nil { + Exit(Fmt("Failed to start event switch: %v", err)) + } + + mempool := mempl.NewMempool(proxyAppCtxMempool) + + consensusState := consensus.NewConsensusState(state.Copy(), proxyAppCtxConsensus, blockStore, mempool) + consensusState.SetEventSwitch(eventSwitch) + + if err := consensusState.ReplayMessagesFromFile(msgLogFile); err != nil { + Exit(Fmt("Error during consensus replay: %v", err)) + } + log.Notice("Replay run successfully") +} + // Load the most recent state from "state" db, // or create a new one (and save) from genesis. func getState() *sm.State { diff --git a/proxy/remote_app_conn.go b/proxy/remote_app_conn.go index a1dff9a8..f95f1e70 100644 --- a/proxy/remote_app_conn.go +++ b/proxy/remote_app_conn.go @@ -106,7 +106,6 @@ func (app *remoteAppConn) sendRequestsRoutine() { app.StopForError(err) return } - log.Debug("Sent request", "requestType", reflect.TypeOf(reqres.Request), "request", reqres.Request) if _, ok := reqres.Request.(tmsp.RequestFlush); ok { err = app.bufWriter.Flush() if err != nil { @@ -133,7 +132,6 @@ func (app *remoteAppConn) recvResponseRoutine() { case tmsp.ResponseException: app.StopForError(errors.New(res.Error)) default: - log.Debug("Received response", "responseType", reflect.TypeOf(res), "response", res) err := app.didRecvResponse(res) if err != nil { app.StopForError(err) diff --git a/types/events.go b/types/events.go index a316cfc5..72aabb63 100644 --- a/types/events.go +++ b/types/events.go @@ -52,7 +52,7 @@ var _ = wire.RegisterInterface( // wire.ConcreteType{EventDataFork{}, EventDataTypeFork }, wire.ConcreteType{EventDataTx{}, EventDataTypeTx}, wire.ConcreteType{EventDataApp{}, EventDataTypeApp}, - wire.ConcreteType{EventDataRoundState{}, EventDataTypeRoundState}, + wire.ConcreteType{&EventDataRoundState{}, EventDataTypeRoundState}, // a pointer because we use it internally wire.ConcreteType{EventDataVote{}, EventDataTypeVote}, ) From 2b13386d7b9fa8d19eaeaaafabad673ded3f6335 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Wed, 23 Dec 2015 01:27:40 -0500 Subject: [PATCH 02/11] consensus: replay console --- cmd/tendermint/main.go | 6 +- consensus/replay.go | 287 +++++++++++++++++++++++++++++++++++++++++ consensus/state.go | 88 ------------- node/node.go | 32 ++++- 4 files changed, 317 insertions(+), 96 deletions(-) create mode 100644 consensus/replay.go diff --git a/cmd/tendermint/main.go b/cmd/tendermint/main.go index b9c4538b..e3847e0f 100644 --- a/cmd/tendermint/main.go +++ b/cmd/tendermint/main.go @@ -35,7 +35,11 @@ Commands: case "node": node.RunNode() case "replay": - node.RunReplay() + if len(args) > 1 && args[1] == "console" { + node.RunReplayConsole() + } else { + node.RunReplay() + } case "init": init_files() case "show_validator": diff --git a/consensus/replay.go b/consensus/replay.go new file mode 100644 index 00000000..3abe0bf4 --- /dev/null +++ b/consensus/replay.go @@ -0,0 +1,287 @@ +package consensus + +import ( + "bufio" + "errors" + "fmt" + "os" + "reflect" + "strconv" + "strings" + "time" + + . "github.com/tendermint/tendermint/Godeps/_workspace/src/github.com/tendermint/go-common" + "github.com/tendermint/tendermint/Godeps/_workspace/src/github.com/tendermint/go-wire" + + sm "github.com/tendermint/tendermint/state" + "github.com/tendermint/tendermint/types" +) + +//-------------------------------------------------------- +// types and functions for savings consensus messages + +type ConsensusLogMessage struct { + Msg ConsensusLogMessageInterface `json:"msg"` +} + +type ConsensusLogMessageInterface interface{} + +var _ = wire.RegisterInterface( + struct{ ConsensusLogMessageInterface }{}, + wire.ConcreteType{&types.EventDataRoundState{}, 0x01}, + wire.ConcreteType{msgInfo{}, 0x02}, + wire.ConcreteType{timeoutInfo{}, 0x03}, +) + +// called in newStep and for each pass in receiveRoutine +func (cs *ConsensusState) saveMsg(msg ConsensusLogMessageInterface) { + if cs.msgLogFP != nil { + var n int + var err error + wire.WriteJSON(ConsensusLogMessage{msg}, cs.msgLogFP, &n, &err) + wire.WriteTo([]byte("\n"), cs.msgLogFP, &n, &err) // one message per line + if err != nil { + log.Error("Error writing to consensus message log file", "err", err, "msg", msg) + } + } +} + +// Open file to log all consensus messages and timeouts for deterministic accountability +func (cs *ConsensusState) OpenFileForMessageLog(file string) (err error) { + cs.mtx.Lock() + defer cs.mtx.Unlock() + cs.msgLogFP, err = os.OpenFile(file, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666) + return err +} + +//-------------------------------------------------------- +// replay messages + +// Interactive playback +func (cs ConsensusState) ReplayConsole(file string) error { + return cs.replay(file, true) +} + +// Full playback, with tests +func (cs ConsensusState) ReplayMessages(file string) error { + return cs.replay(file, false) +} + +type playback struct { + cs *ConsensusState + file string + fp *os.File + scanner *bufio.Scanner + newStepCh chan interface{} + genesisState *sm.State + count int +} + +func newPlayback(file string, fp *os.File, cs *ConsensusState, ch chan interface{}, genState *sm.State) *playback { + return &playback{ + cs: cs, + file: file, + newStepCh: ch, + genesisState: genState, + fp: fp, + scanner: bufio.NewScanner(fp), + } +} + +func (pb *playback) replayReset(count int) error { + + pb.cs.Stop() + + newCs := NewConsensusState(pb.genesisState.Copy(), pb.cs.proxyAppCtx, pb.cs.blockStore, pb.cs.mempool) + newCs.SetEventSwitch(pb.cs.evsw) + + // we ensure all new step events are regenerated as expected + pb.newStepCh = newCs.evsw.SubscribeToEvent("replay-test", types.EventStringNewRoundStep(), 1) + + newCs.BaseService.OnStart() + newCs.startRoutines(0) + + pb.fp.Close() + fp, err := os.OpenFile(pb.file, os.O_RDONLY, 0666) + if err != nil { + return err + } + pb.fp = fp + pb.scanner = bufio.NewScanner(fp) + count = pb.count - count + log.Notice(Fmt("Reseting from %d to %d", pb.count, count)) + pb.count = 0 + pb.cs = newCs + for i := 0; pb.scanner.Scan() && i < count; i++ { + if err := pb.readReplayMessage(); err != nil { + return err + } + pb.count += 1 + } + return nil +} + +func (cs *ConsensusState) replay(file string, console bool) error { + if cs.IsRunning() { + return errors.New("cs is already running, cannot replay") + } + + cs.BaseService.OnStart() + cs.startRoutines(0) + + if cs.msgLogFP != nil { + cs.msgLogFP.Close() + cs.msgLogFP = nil + } + + // we ensure all new step events are regenerated as expected + newStepCh := cs.evsw.SubscribeToEvent("replay-test", types.EventStringNewRoundStep(), 1) + + fp, err := os.OpenFile(file, os.O_RDONLY, 0666) + if err != nil { + return err + } + + pb := newPlayback(file, fp, cs, newStepCh, cs.state.Copy()) + + defer pb.fp.Close() + + var nextN int // apply N msgs in a row + for pb.scanner.Scan() { + if nextN == 0 && console { + nextN = pb.replayConsoleLoop() + } + + if err := pb.readReplayMessage(); err != nil { + return err + } + + if nextN > 0 { + nextN -= 1 + } + pb.count += 1 + } + return nil +} + +func (pb *playback) replayConsoleLoop() int { + for { + fmt.Printf("> ") + bufReader := bufio.NewReader(os.Stdin) + line, more, err := bufReader.ReadLine() + if more { + Exit("input is too long") + } else if err != nil { + Exit(err.Error()) + } + + tokens := strings.Split(string(line), " ") + if len(tokens) == 0 { + continue + } + + switch tokens[0] { + case "next": + // "next" -> replay next message + // "next N" -> replay next N messages + + if len(tokens) == 1 { + return 0 + } else { + i, err := strconv.Atoi(tokens[1]) + if err != nil { + fmt.Println("next takes an integer argument") + } else { + return i + } + } + + case "back": + // "back" -> go back one message + // "back N" -> go back N messages + + // NOTE: "back" is not supported in the state machine design, + // so we restart to do this (expensive ...) + + if len(tokens) == 1 { + pb.replayReset(1) + } else { + i, err := strconv.Atoi(tokens[1]) + if err != nil { + fmt.Println("back takes an integer argument") + } else if i > pb.count { + fmt.Printf("argument to back must not be larger than the current count (%d)\n", pb.count) + } else { + pb.replayReset(i) + } + } + + case "rs": + // "rs" -> print entire round state + // "rs short" -> print height/round/step + // "rs " -> print another field of the round state + + rs := pb.cs.RoundState + if len(tokens) == 1 { + fmt.Println(rs) + } else { + switch tokens[1] { + case "short": + fmt.Printf("%v/%v/%v\n", rs.Height, rs.Round, rs.Step) + case "validators": + fmt.Println(rs.Validators) + case "proposal": + fmt.Println(rs.Proposal) + case "proposal_block": + fmt.Printf("%v %v\n", rs.ProposalBlockParts.StringShort(), rs.ProposalBlock.StringShort()) + case "locked_round": + fmt.Println(rs.LockedRound) + case "locked_block": + fmt.Printf("%v %v\n", rs.LockedBlockParts.StringShort(), rs.LockedBlock.StringShort()) + case "votes": + fmt.Println(rs.Votes.StringIndented(" ")) + + default: + fmt.Println("Unknown option", tokens[1]) + } + } + } + } + return 0 +} + +func (pb *playback) readReplayMessage() error { + var err error + var msg ConsensusLogMessage + wire.ReadJSON(&msg, pb.scanner.Bytes(), &err) + if err != nil { + return fmt.Errorf("Error reading json data: %v", err) + } + log.Notice("Replaying message", "type", reflect.TypeOf(msg.Msg), "msg", msg.Msg) + switch m := msg.Msg.(type) { + case *types.EventDataRoundState: + // these are playback checks + ticker := time.After(time.Second * 2) + select { + case mi := <-pb.newStepCh: + m2 := mi.(*types.EventDataRoundState) + if m.Height != m2.Height || m.Round != m2.Round || m.Step != m2.Step { + return fmt.Errorf("RoundState mismatch. Got %v; Expected %v", m2, m) + } + case <-ticker: + return fmt.Errorf("Failed to read off newStepCh") + } + case msgInfo: + // internal or from peer + if m.PeerKey == "" { + pb.cs.internalMsgQueue <- m + } else { + pb.cs.peerMsgQueue <- m + } + case timeoutInfo: + pb.cs.tockChan <- m + default: + return fmt.Errorf("Unknown ConsensusLogMessage type: %v", reflect.TypeOf(msg.Msg)) + } + return nil +} diff --git a/consensus/state.go b/consensus/state.go index 9d9e65d8..5217a231 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -1,7 +1,6 @@ package consensus import ( - "bufio" "bytes" "errors" "fmt" @@ -277,68 +276,6 @@ func (cs *ConsensusState) OnStop() { } } -func (cs *ConsensusState) OpenFileForMessageLog(file string) (err error) { - cs.mtx.Lock() - defer cs.mtx.Unlock() - cs.msgLogFP, err = os.OpenFile(file, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666) - return err -} - -// Playback with tests -func (cs ConsensusState) ReplayMessagesFromFile(file string) error { - if cs.IsRunning() { - return errors.New("cs is already running, cannot replay") - } - - cs.BaseService.OnStart() - cs.startRoutines(0) - - if cs.msgLogFP != nil { - cs.msgLogFP.Close() - cs.msgLogFP = nil - } - - // we ensure all new step events are regenerated as expected - newStepCh := cs.evsw.SubscribeToEvent("replay-test", types.EventStringNewRoundStep(), 1) - - fp, err := os.OpenFile(file, os.O_RDONLY, 0666) - if err != nil { - return err - } - defer fp.Close() - scanner := bufio.NewScanner(fp) - for scanner.Scan() { - var err error - var msg ConsensusLogMessage - wire.ReadJSON(&msg, scanner.Bytes(), &err) - if err != nil { - return fmt.Errorf("Error reading json data: %v", err) - } - log.Notice("Replaying message", "type", reflect.TypeOf(msg.Msg), "msg", msg.Msg) - switch m := msg.Msg.(type) { - case *types.EventDataRoundState: - // these are playback checks - mi := <-newStepCh - m2 := mi.(*types.EventDataRoundState) - if m.Height != m2.Height || m.Round != m2.Round || m.Step != m2.Step { - return fmt.Errorf("RoundState mismatch. Got %v; Expected %v", m2, m) - } - case msgInfo: - // internal or from peer - if m.PeerKey == "" { - cs.internalMsgQueue <- m - } else { - cs.peerMsgQueue <- m - } - case timeoutInfo: - cs.tockChan <- m - default: - return fmt.Errorf("Unknown ConsensusLogMessage type: %v", reflect.TypeOf(msg.Msg)) - } - } - return nil -} - //------------------------------------------------------------ // Public interface for passing messages into the consensus state, // possibly causing a state transition @@ -1523,31 +1460,6 @@ func (cs *ConsensusState) saveBlock(block *types.Block, blockParts *types.PartSe } -func (cs *ConsensusState) saveMsg(msg ConsensusLogMessageInterface) { - if cs.msgLogFP != nil { - var n int - var err error - wire.WriteJSON(ConsensusLogMessage{msg}, cs.msgLogFP, &n, &err) - wire.WriteTo([]byte("\n"), cs.msgLogFP, &n, &err) // one message per line - if err != nil { - log.Error("Error writing to consensus message log file", "err", err, "msg", msg) - } - } -} - -type ConsensusLogMessage struct { - Msg ConsensusLogMessageInterface `json:"msg"` -} - -type ConsensusLogMessageInterface interface{} - -var _ = wire.RegisterInterface( - struct{ ConsensusLogMessageInterface }{}, - wire.ConcreteType{&types.EventDataRoundState{}, 0x01}, - wire.ConcreteType{msgInfo{}, 0x02}, - wire.ConcreteType{timeoutInfo{}, 0x03}, -) - //--------------------------------------------------------- func CompareHRS(h1, r1 int, s1 RoundStepType, h2, r2 int, s2 RoundStepType) int { diff --git a/node/node.go b/node/node.go index a5f2fbda..1208a4b5 100644 --- a/node/node.go +++ b/node/node.go @@ -311,12 +311,7 @@ func RunNode() { }) } -func RunReplay() { - msgLogFile := config.GetString("cs_msg_log") - if msgLogFile == "" { - Exit("cs_msg_log file name not set in tendermint config") - } - +func newConsensusState() *consensus.ConsensusState { // Get BlockStore blockStoreDB := dbm.GetDB("blockstore") blockStore := bc.NewBlockStore(blockStoreDB) @@ -345,8 +340,31 @@ func RunReplay() { consensusState := consensus.NewConsensusState(state.Copy(), proxyAppCtxConsensus, blockStore, mempool) consensusState.SetEventSwitch(eventSwitch) + return consensusState +} - if err := consensusState.ReplayMessagesFromFile(msgLogFile); err != nil { +func RunReplayConsole() { + msgLogFile := config.GetString("cs_msg_log") + if msgLogFile == "" { + Exit("cs_msg_log file name not set in tendermint config") + } + + consensusState := newConsensusState() + + if err := consensusState.ReplayConsole(msgLogFile); err != nil { + Exit(Fmt("Error during consensus replay: %v", err)) + } +} + +func RunReplay() { + msgLogFile := config.GetString("cs_msg_log") + if msgLogFile == "" { + Exit("cs_msg_log file name not set in tendermint config") + } + + consensusState := newConsensusState() + + if err := consensusState.ReplayMessages(msgLogFile); err != nil { Exit(Fmt("Error during consensus replay: %v", err)) } log.Notice("Replay run successfully") From c0c2a4b968e6873f08898cf07a0a2efec797506c Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Tue, 22 Dec 2015 23:24:15 -0500 Subject: [PATCH 03/11] use validation round in reconstructLastCommit --- consensus/replay.go | 4 ++-- consensus/state.go | 5 +++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/consensus/replay.go b/consensus/replay.go index 3abe0bf4..0e11afb8 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -10,8 +10,8 @@ import ( "strings" "time" - . "github.com/tendermint/tendermint/Godeps/_workspace/src/github.com/tendermint/go-common" - "github.com/tendermint/tendermint/Godeps/_workspace/src/github.com/tendermint/go-wire" + . "github.com/tendermint/go-common" + "github.com/tendermint/go-wire" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" diff --git a/consensus/state.go b/consensus/state.go index 5217a231..50b048ff 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -378,8 +378,8 @@ func (cs *ConsensusState) reconstructLastCommit(state *sm.State) { if state.LastBlockHeight == 0 { return } - lastPrecommits := types.NewVoteSet(state.LastBlockHeight, 0, types.VoteTypePrecommit, state.LastValidators) seenValidation := cs.blockStore.LoadSeenValidation(state.LastBlockHeight) + lastPrecommits := types.NewVoteSet(state.LastBlockHeight, seenValidation.Round(), types.VoteTypePrecommit, state.LastValidators) for idx, precommit := range seenValidation.Precommits { if precommit == nil { continue @@ -1258,7 +1258,8 @@ func (cs *ConsensusState) addProposalBlockPart(height int, part *types.Part) (ad var n int var err error cs.ProposalBlock = wire.ReadBinary(&types.Block{}, cs.ProposalBlockParts.GetReader(), types.MaxBlockSize, &n, &err).(*types.Block) - log.Info("Received complete proposal", "height", cs.ProposalBlock.Height, "hash", cs.ProposalBlock.Hash()) + // NOTE: it's possible to receive complete proposal blocks for future rounds without having the proposal + log.Info("Received complete proposal block", "height", cs.ProposalBlock.Height, "hash", cs.ProposalBlock.Hash()) if cs.Step == RoundStepPropose && cs.isProposalComplete() { // Move onto the next step cs.enterPrevote(height, cs.Round) From 298928dc443cab2c62381806d0b27892fdc0e5a1 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Wed, 23 Dec 2015 21:43:28 -0500 Subject: [PATCH 04/11] fix replay bug from timeoutRoutine --- consensus/replay.go | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/consensus/replay.go b/consensus/replay.go index 0e11afb8..5bb7d27b 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -88,6 +88,7 @@ func newPlayback(file string, fp *os.File, cs *ConsensusState, ch chan interface } } +// reset the state and run (pb.count - count) steps func (pb *playback) replayReset(count int) error { pb.cs.Stop() @@ -98,8 +99,7 @@ func (pb *playback) replayReset(count int) error { // we ensure all new step events are regenerated as expected pb.newStepCh = newCs.evsw.SubscribeToEvent("replay-test", types.EventStringNewRoundStep(), 1) - newCs.BaseService.OnStart() - newCs.startRoutines(0) + newCs.startForReplay() pb.fp.Close() fp, err := os.OpenFile(pb.file, os.O_RDONLY, 0666) @@ -121,13 +121,28 @@ func (pb *playback) replayReset(count int) error { return nil } +func (cs *ConsensusState) startForReplay() { + cs.BaseService.OnStart() + go cs.receiveRoutine(0) + // since we replay tocks we just ignore ticks + go func() { + for { + select { + case <-cs.tickChan: + case <-cs.Quit: + return + } + } + }() +} + +// replay all msgs or start the console func (cs *ConsensusState) replay(file string, console bool) error { if cs.IsRunning() { return errors.New("cs is already running, cannot replay") } - cs.BaseService.OnStart() - cs.startRoutines(0) + cs.startForReplay() if cs.msgLogFP != nil { cs.msgLogFP.Close() @@ -245,6 +260,8 @@ func (pb *playback) replayConsoleLoop() int { fmt.Println("Unknown option", tokens[1]) } } + case "n": + fmt.Println(pb.count) } } return 0 From 9fc072650467cd8719493e2cc5731b8bbce4a956 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Wed, 23 Dec 2015 21:43:48 -0500 Subject: [PATCH 05/11] timeoutProposeDelta --- config/tendermint_test/config.go | 1 + consensus/common_test.go | 4 ++-- consensus/state.go | 27 +++------------------------ consensus/state_test.go | 7 ++++--- 4 files changed, 10 insertions(+), 29 deletions(-) diff --git a/config/tendermint_test/config.go b/config/tendermint_test/config.go index 9820ba24..702fb689 100644 --- a/config/tendermint_test/config.go +++ b/config/tendermint_test/config.go @@ -76,6 +76,7 @@ func GetConfig(rootDir string) cfg.Config { mapConfig.SetDefault("rpc_laddr", "0.0.0.0:36657") mapConfig.SetDefault("prof_laddr", "") mapConfig.SetDefault("revision_file", rootDir+"/revision") + mapConfig.SetDefault("cs_msg_log", rootDir+"/cs_msg_log") return mapConfig } diff --git a/consensus/common_test.go b/consensus/common_test.go index 7effed68..dae0882b 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -140,7 +140,7 @@ func addVoteToFromMany(to *ConsensusState, votes []*types.Vote, froms ...*valida func addVoteToFrom(to *ConsensusState, from *validatorStub, vote *types.Vote) { valIndex, _ := to.Validators.GetByAddress(from.PrivValidator.Address) - to.peerMsgQueue <- msgInfo{msg: &VoteMessage{valIndex, vote}} + to.peerMsgQueue <- msgInfo{Msg: &VoteMessage{valIndex, vote}} // added, err := to.TryAddVote(valIndex, vote, "") /* if _, ok := err.(*types.ErrVoteConflictingSignature); ok { @@ -344,7 +344,7 @@ func subscribeToVoter(cs *ConsensusState, addr []byte) chan interface{} { go func() { for { v := <-voteCh0 - vote := v.(*types.EventDataVote) + vote := v.(types.EventDataVote) // we only fire for our own votes if bytes.Equal(addr, vote.Address) { voteCh <- v diff --git a/consensus/state.go b/consensus/state.go index 50b048ff..232f9e95 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -20,7 +20,8 @@ import ( ) var ( - timeoutPropose = 3000 * time.Millisecond // Maximum duration of RoundStepPropose + timeoutPropose0 = 3000 * time.Millisecond // Wait this long for a proposal + timeoutProposeDelta = 0500 * time.Millisecond // timeoutProposeN is timeoutPropose0 + timeoutProposeDelta*N timeoutPrevote0 = 1000 * time.Millisecond // After any +2/3 prevotes received, wait this long for stragglers. timeoutPrevoteDelta = 0500 * time.Millisecond // timeoutPrevoteN is timeoutPrevote0 + timeoutPrevoteDelta*N timeoutPrecommit0 = 1000 * time.Millisecond // After any +2/3 precommits received, wait this long for stragglers. @@ -687,9 +688,6 @@ func (cs *ConsensusState) enterNewRound(height int, round int) { // Enter: from NewRound(height,round). func (cs *ConsensusState) enterPropose(height int, round int) { - // cs.mtx.Lock() - // cs.mtx.Unlock() - if cs.Height != height || round < cs.Round || (cs.Round == round && RoundStepPropose <= cs.Step) { log.Debug(Fmt("enterPropose(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) return @@ -710,7 +708,7 @@ func (cs *ConsensusState) enterPropose(height int, round int) { }() // This step times out after `timeoutPropose` - cs.scheduleTimeout(timeoutPropose, height, round, RoundStepPropose) + cs.scheduleTimeout(timeoutPropose0+timeoutProposeDelta*time.Duration(round), height, round, RoundStepPropose) // Nothing more to do if we're not a validator if cs.privValidator == nil { @@ -837,8 +835,6 @@ func (cs *ConsensusState) createProposalBlock() (block *types.Block, blockParts // Prevote for LockedBlock if we're locked, or ProposalBlock if valid. // Otherwise vote nil. func (cs *ConsensusState) enterPrevote(height int, round int) { - //cs.mtx.Lock() - //defer cs.mtx.Unlock() if cs.Height != height || round < cs.Round || (cs.Round == round && RoundStepPrevote <= cs.Step) { log.Debug(Fmt("enterPrevote(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) return @@ -902,8 +898,6 @@ func (cs *ConsensusState) doPrevote(height int, round int) { // Enter: any +2/3 prevotes at next round. func (cs *ConsensusState) enterPrevoteWait(height int, round int) { - //cs.mtx.Lock() - //defer cs.mtx.Unlock() if cs.Height != height || round < cs.Round || (cs.Round == round && RoundStepPrevoteWait <= cs.Step) { log.Debug(Fmt("enterPrevoteWait(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) return @@ -930,8 +924,6 @@ func (cs *ConsensusState) enterPrevoteWait(height int, round int) { // else, unlock an existing lock and precommit nil if +2/3 of prevotes were nil, // else, precommit nil otherwise. func (cs *ConsensusState) enterPrecommit(height int, round int) { - //cs.mtx.Lock() - // defer cs.mtx.Unlock() if cs.Height != height || round < cs.Round || (cs.Round == round && RoundStepPrecommit <= cs.Step) { log.Debug(Fmt("enterPrecommit(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) return @@ -1027,8 +1019,6 @@ func (cs *ConsensusState) enterPrecommit(height int, round int) { // Enter: any +2/3 precommits for next round. func (cs *ConsensusState) enterPrecommitWait(height int, round int) { - //cs.mtx.Lock() - //defer cs.mtx.Unlock() if cs.Height != height || round < cs.Round || (cs.Round == round && RoundStepPrecommitWait <= cs.Step) { log.Debug(Fmt("enterPrecommitWait(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) return @@ -1051,8 +1041,6 @@ func (cs *ConsensusState) enterPrecommitWait(height int, round int) { // Enter: +2/3 precommits for block func (cs *ConsensusState) enterCommit(height int, commitRound int) { - //cs.mtx.Lock() - //defer cs.mtx.Unlock() if cs.Height != height || RoundStepCommit <= cs.Step { log.Debug(Fmt("enterCommit(%v/%v): Invalid args. Current step: %v/%v/%v", height, commitRound, cs.Height, cs.Round, cs.Step)) return @@ -1117,9 +1105,6 @@ func (cs *ConsensusState) tryFinalizeCommit(height int) { // Increment height and goto RoundStepNewHeight func (cs *ConsensusState) finalizeCommit(height int) { - //cs.mtx.Lock() - //defer cs.mtx.Unlock() - if cs.Height != height || cs.Step != RoundStepCommit { log.Debug(Fmt("finalizeCommit(%v): Invalid args. Current step: %v/%v/%v", height, cs.Height, cs.Round, cs.Step)) return @@ -1199,9 +1184,6 @@ func (cs *ConsensusState) finalizeCommit(height int) { //----------------------------------------------------------------------------- func (cs *ConsensusState) setProposal(proposal *types.Proposal) error { - //cs.mtx.Lock() - //defer cs.mtx.Unlock() - // Already have one if cs.Proposal != nil { return nil @@ -1236,9 +1218,6 @@ func (cs *ConsensusState) setProposal(proposal *types.Proposal) error { // NOTE: block is not necessarily valid. // This can trigger us to go into enterPrevote asynchronously (before we timeout of propose) or to attempt to commit func (cs *ConsensusState) addProposalBlockPart(height int, part *types.Part) (added bool, err error) { - //cs.mtx.Lock() - //defer cs.mtx.Unlock() - // Blocks might be reused, so round mismatch is OK if cs.Height != height { return false, nil diff --git a/consensus/state_test.go b/consensus/state_test.go index f4d8f548..ab6795a6 100644 --- a/consensus/state_test.go +++ b/consensus/state_test.go @@ -46,7 +46,8 @@ x * TestHalt1 - if we see +2/3 precommits after timing out into new round, we sh func init() { fmt.Println("") - timeoutPropose = 500 * time.Millisecond + timeoutPropose0 = 100 * time.Millisecond + timeoutProposeDelta = 1 * time.Millisecond } func TestProposerSelection0(t *testing.T) { @@ -123,7 +124,7 @@ func TestEnterProposeNoPrivValidator(t *testing.T) { startTestRound(cs, height, round) // if we're not a validator, EnterPropose should timeout - ticker := time.NewTicker(timeoutPropose * 2) + ticker := time.NewTicker(timeoutPropose0 * 2) select { case <-timeoutCh: case <-ticker.C: @@ -164,7 +165,7 @@ func TestEnterProposeYesPrivValidator(t *testing.T) { } // if we're a validator, enterPropose should not timeout - ticker := time.NewTicker(timeoutPropose * 2) + ticker := time.NewTicker(timeoutPropose0 * 2) select { case <-timeoutCh: t.Fatal("Expected EnterPropose not to timeout") From 56076d4d0e72112a72d6f0a6531db2b39abf3dc3 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Wed, 6 Jan 2016 18:42:12 -0500 Subject: [PATCH 06/11] some cleanup; log time with consensus msgs --- consensus/replay.go | 111 +++++++++++++++++++++----------------------- consensus/state.go | 8 ++++ 2 files changed, 62 insertions(+), 57 deletions(-) diff --git a/consensus/replay.go b/consensus/replay.go index 5bb7d27b..5e7e4d08 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -21,7 +21,8 @@ import ( // types and functions for savings consensus messages type ConsensusLogMessage struct { - Msg ConsensusLogMessageInterface `json:"msg"` + Time time.Time `json:"time"` + Msg ConsensusLogMessageInterface `json:"msg"` } type ConsensusLogMessageInterface interface{} @@ -38,7 +39,7 @@ func (cs *ConsensusState) saveMsg(msg ConsensusLogMessageInterface) { if cs.msgLogFP != nil { var n int var err error - wire.WriteJSON(ConsensusLogMessage{msg}, cs.msgLogFP, &n, &err) + wire.WriteJSON(ConsensusLogMessage{time.Now(), msg}, cs.msgLogFP, &n, &err) wire.WriteTo([]byte("\n"), cs.msgLogFP, &n, &err) // one message per line if err != nil { log.Error("Error writing to consensus message log file", "err", err, "msg", msg) @@ -46,16 +47,8 @@ func (cs *ConsensusState) saveMsg(msg ConsensusLogMessageInterface) { } } -// Open file to log all consensus messages and timeouts for deterministic accountability -func (cs *ConsensusState) OpenFileForMessageLog(file string) (err error) { - cs.mtx.Lock() - defer cs.mtx.Unlock() - cs.msgLogFP, err = os.OpenFile(file, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666) - return err -} - //-------------------------------------------------------- -// replay messages +// replay the messages // Interactive playback func (cs ConsensusState) ReplayConsole(file string) error { @@ -67,6 +60,52 @@ func (cs ConsensusState) ReplayMessages(file string) error { return cs.replay(file, false) } +// replay all msgs or start the console +func (cs *ConsensusState) replay(file string, console bool) error { + if cs.IsRunning() { + return errors.New("cs is already running, cannot replay") + } + + cs.startForReplay() + + // set the FP to nil so we don't overwrite + if cs.msgLogFP != nil { + cs.msgLogFP.Close() + cs.msgLogFP = nil + } + + // ensure all new step events are regenerated as expected + newStepCh := cs.evsw.SubscribeToEvent("replay-test", types.EventStringNewRoundStep(), 1) + + fp, err := os.OpenFile(file, os.O_RDONLY, 0666) + if err != nil { + return err + } + + pb := newPlayback(file, fp, cs, newStepCh, cs.state.Copy()) + defer pb.fp.Close() + + var nextN int // apply N msgs in a row + for pb.scanner.Scan() { + if nextN == 0 && console { + nextN = pb.replayConsoleLoop() + } + + if err := pb.readReplayMessage(); err != nil { + return err + } + + if nextN > 0 { + nextN -= 1 + } + pb.count += 1 + } + return nil +} + +//------------------------------------------------ +// playback manager + type playback struct { cs *ConsensusState file string @@ -88,7 +127,7 @@ func newPlayback(file string, fp *os.File, cs *ConsensusState, ch chan interface } } -// reset the state and run (pb.count - count) steps +// go back count steps by resetting the state and running (pb.count - count) steps func (pb *playback) replayReset(count int) error { pb.cs.Stop() @@ -96,7 +135,7 @@ func (pb *playback) replayReset(count int) error { newCs := NewConsensusState(pb.genesisState.Copy(), pb.cs.proxyAppCtx, pb.cs.blockStore, pb.cs.mempool) newCs.SetEventSwitch(pb.cs.evsw) - // we ensure all new step events are regenerated as expected + // ensure all new step events are regenerated as expected pb.newStepCh = newCs.evsw.SubscribeToEvent("replay-test", types.EventStringNewRoundStep(), 1) newCs.startForReplay() @@ -136,49 +175,7 @@ func (cs *ConsensusState) startForReplay() { }() } -// replay all msgs or start the console -func (cs *ConsensusState) replay(file string, console bool) error { - if cs.IsRunning() { - return errors.New("cs is already running, cannot replay") - } - - cs.startForReplay() - - if cs.msgLogFP != nil { - cs.msgLogFP.Close() - cs.msgLogFP = nil - } - - // we ensure all new step events are regenerated as expected - newStepCh := cs.evsw.SubscribeToEvent("replay-test", types.EventStringNewRoundStep(), 1) - - fp, err := os.OpenFile(file, os.O_RDONLY, 0666) - if err != nil { - return err - } - - pb := newPlayback(file, fp, cs, newStepCh, cs.state.Copy()) - - defer pb.fp.Close() - - var nextN int // apply N msgs in a row - for pb.scanner.Scan() { - if nextN == 0 && console { - nextN = pb.replayConsoleLoop() - } - - if err := pb.readReplayMessage(); err != nil { - return err - } - - if nextN > 0 { - nextN -= 1 - } - pb.count += 1 - } - return nil -} - +// console function for parsing input and running commands func (pb *playback) replayConsoleLoop() int { for { fmt.Printf("> ") @@ -216,7 +213,7 @@ func (pb *playback) replayConsoleLoop() int { // "back N" -> go back N messages // NOTE: "back" is not supported in the state machine design, - // so we restart to do this (expensive ...) + // so we restart and replay up to if len(tokens) == 1 { pb.replayReset(1) diff --git a/consensus/state.go b/consensus/state.go index 232f9e95..d0d47a82 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -277,6 +277,14 @@ func (cs *ConsensusState) OnStop() { } } +// Open file to log all consensus messages and timeouts for deterministic accountability +func (cs *ConsensusState) OpenFileForMessageLog(file string) (err error) { + cs.mtx.Lock() + defer cs.mtx.Unlock() + cs.msgLogFP, err = os.OpenFile(file, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666) + return err +} + //------------------------------------------------------------ // Public interface for passing messages into the consensus state, // possibly causing a state transition From bc2a721a0a23d8085d1ee8f63fd59b56fd335175 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Sun, 10 Jan 2016 16:56:52 -0500 Subject: [PATCH 07/11] clean up replay output --- consensus/replay.go | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/consensus/replay.go b/consensus/replay.go index 5e7e4d08..ce34dd33 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -271,9 +271,11 @@ func (pb *playback) readReplayMessage() error { if err != nil { return fmt.Errorf("Error reading json data: %v", err) } - log.Notice("Replaying message", "type", reflect.TypeOf(msg.Msg), "msg", msg.Msg) + + // for logging switch m := msg.Msg.(type) { case *types.EventDataRoundState: + log.Notice("New Step", "height", m.Height, "round", m.Round, "step", m.Step) // these are playback checks ticker := time.After(time.Second * 2) select { @@ -286,6 +288,22 @@ func (pb *playback) readReplayMessage() error { return fmt.Errorf("Failed to read off newStepCh") } case msgInfo: + peerKey := m.PeerKey + if peerKey == "" { + peerKey = "local" + } + switch msg := m.Msg.(type) { + case *ProposalMessage: + p := msg.Proposal + log.Notice("Proposal", "height", p.Height, "round", p.Round, "header", + p.BlockPartsHeader, "pol", p.POLRound, "peer", peerKey) + case *BlockPartMessage: + log.Notice("BlockPart", "height", msg.Height, "round", msg.Round, "peer", peerKey) + case *VoteMessage: + v := msg.Vote + log.Notice("Vote", "height", v.Height, "round", v.Round, "type", v.Type, + "hash", v.BlockHash, "header", v.BlockPartsHeader, "peer", peerKey) + } // internal or from peer if m.PeerKey == "" { pb.cs.internalMsgQueue <- m @@ -293,6 +311,7 @@ func (pb *playback) readReplayMessage() error { pb.cs.peerMsgQueue <- m } case timeoutInfo: + log.Notice("Timeout", "height", m.Height, "round", m.Round, "step", m.Step, "dur", m.Duration) pb.cs.tockChan <- m default: return fmt.Errorf("Unknown ConsensusLogMessage type: %v", reflect.TypeOf(msg.Msg)) From 6bcf53195fe50b9044caf3e1658faee956e43c4c Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Sun, 10 Jan 2016 23:31:05 -0500 Subject: [PATCH 08/11] consensus: use replay log to avoid sign regression --- consensus/replay.go | 138 ++++++++++++++++++++++++++++++++++++++++---- consensus/state.go | 33 ++++++++--- node/node.go | 5 +- 3 files changed, 156 insertions(+), 20 deletions(-) diff --git a/consensus/replay.go b/consensus/replay.go index ce34dd33..4590acaa 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -2,8 +2,10 @@ package consensus import ( "bufio" + "bytes" "errors" "fmt" + "io" "os" "reflect" "strconv" @@ -14,6 +16,7 @@ import ( "github.com/tendermint/go-wire" sm "github.com/tendermint/tendermint/state" + "github.com/tendermint/tendermint/tailseek" "github.com/tendermint/tendermint/types" ) @@ -60,6 +63,74 @@ func (cs ConsensusState) ReplayMessages(file string) error { return cs.replay(file, false) } +func (cs *ConsensusState) catchupReplay(height int) error { + if !cs.msgLogExists { + return nil + } + + if cs.msgLogFP == nil { + log.Warn("consensus msg log is nil") + return nil + } + + log.Notice("Catchup by replaying consensus messages") + f := cs.msgLogFP + + n, err := seek.SeekFromEndOfFile(f, func(lineBytes []byte) bool { + var err error + var msg ConsensusLogMessage + wire.ReadJSON(&msg, lineBytes, &err) + if err != nil { + panic(Fmt("Failed to read cs_msg_log json: %v", err)) + } + m, ok := msg.Msg.(*types.EventDataRoundState) + if ok && m.Step == RoundStepNewHeight.String() { + r, err := f.Seek(0, 1) + // TODO: ensure the height matches + return true + } + return false + }) + + if err != nil { + return err + } + + // we found it, now we can replay everything + pb := newPlayback("", cs.msgLogFP, cs, nil, cs.state.Copy()) + + reader := bufio.NewReader(cs.msgLogFP) + i := 0 + for { + i += 1 + msgBytes, err := reader.ReadBytes('\n') + if err == io.EOF { + break + } else if err != nil { + return err + } else if len(msgBytes) == 0 { + continue + } + // the first msg is the NewHeight event, so we can ignore it + if i == 1 { + continue + } + + // NOTE: since the priv key is set when the msgs are received + // it will attempt to eg double sign but we can just ignore it + // since the votes will be replayed and we'll get to the next step + if err := pb.readReplayMessage(msgBytes); err != nil { + return err + + } + if i >= n { + break + } + } + return nil + +} + // replay all msgs or start the console func (cs *ConsensusState) replay(file string, console bool) error { if cs.IsRunning() { @@ -91,7 +162,7 @@ func (cs *ConsensusState) replay(file string, console bool) error { nextN = pb.replayConsoleLoop() } - if err := pb.readReplayMessage(); err != nil { + if err := pb.readReplayMessage(pb.scanner.Bytes()); err != nil { return err } @@ -152,7 +223,7 @@ func (pb *playback) replayReset(count int) error { pb.count = 0 pb.cs = newCs for i := 0; pb.scanner.Scan() && i < count; i++ { - if err := pb.readReplayMessage(); err != nil { + if err := pb.readReplayMessage(pb.scanner.Bytes()); err != nil { return err } pb.count += 1 @@ -264,10 +335,10 @@ func (pb *playback) replayConsoleLoop() int { return 0 } -func (pb *playback) readReplayMessage() error { +func (pb *playback) readReplayMessage(msgBytes []byte) error { var err error var msg ConsensusLogMessage - wire.ReadJSON(&msg, pb.scanner.Bytes(), &err) + wire.ReadJSON(&msg, msgBytes, &err) if err != nil { return fmt.Errorf("Error reading json data: %v", err) } @@ -278,14 +349,16 @@ func (pb *playback) readReplayMessage() error { log.Notice("New Step", "height", m.Height, "round", m.Round, "step", m.Step) // these are playback checks ticker := time.After(time.Second * 2) - select { - case mi := <-pb.newStepCh: - m2 := mi.(*types.EventDataRoundState) - if m.Height != m2.Height || m.Round != m2.Round || m.Step != m2.Step { - return fmt.Errorf("RoundState mismatch. Got %v; Expected %v", m2, m) + if pb.newStepCh != nil { + select { + case mi := <-pb.newStepCh: + m2 := mi.(*types.EventDataRoundState) + if m.Height != m2.Height || m.Round != m2.Round || m.Step != m2.Step { + return fmt.Errorf("RoundState mismatch. Got %v; Expected %v", m2, m) + } + case <-ticker: + return fmt.Errorf("Failed to read off newStepCh") } - case <-ticker: - return fmt.Errorf("Failed to read off newStepCh") } case msgInfo: peerKey := m.PeerKey @@ -318,3 +391,46 @@ func (pb *playback) readReplayMessage() error { } return nil } + +// Read lines starting from the end of the file until we read a line that causes found to return true +func SeekFromEndOfFile(f *os.File, found func([]byte) bool) (nLines int, err error) { + var current int64 + // start at the end + current, err = f.Seek(0, 2) + if err != nil { + fmt.Println("1") + return + } + + // backup until we find the the right line + for { + current -= 1 + if current < 0 { + return + } + // backup one and read a new byte + if _, err = f.Seek(current, 0); err != nil { + fmt.Println("2", current) + return + } + b := make([]byte, 1) + if _, err = f.Read(b); err != nil { + return + } + if b[0] == '\n' || len(b) == 0 { + nLines += 1 + + // read a full line + reader := bufio.NewReader(f) + lineBytes, _ := reader.ReadBytes('\n') + if len(lineBytes) == 0 { + continue + } + + if found(lineBytes) { + f.Seek(current, 0) + return + } + } + } +} diff --git a/consensus/state.go b/consensus/state.go index d0d47a82..354038e7 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -192,7 +192,9 @@ type ConsensusState struct { evsw *events.EventSwitch - msgLogFP *os.File // file we write all msgs and timeouts to, in order (for deterministic replay) + msgLogFP *os.File // file we write all msgs and timeouts to, in order (for deterministic replay) + msgLogExists bool // if the file already existed (restarted process) + done chan struct{} // used to wait until the receiveRoutine quits so we can close the file nSteps int // used for testing to limit the number of transitions the state makes } @@ -207,6 +209,7 @@ func NewConsensusState(state *sm.State, proxyAppConn proxy.AppConn, blockStore * timeoutTicker: new(time.Ticker), tickChan: make(chan timeoutInfo, tickTockBufferSize), tockChan: make(chan timeoutInfo, tickTockBufferSize), + done: make(chan struct{}), } cs.updateToState(state) // Don't call scheduleRound0 yet. @@ -252,14 +255,21 @@ func (cs *ConsensusState) SetPrivValidator(priv *types.PrivValidator) { } func (cs *ConsensusState) OnStart() error { - cs.BaseService.OnStart() + cs.QuitService.OnStart() - // first we schedule the round (no go routines) - // then we start the timeout and receive routines. - // tickChan is buffered so scheduleRound0 will finish. - // Then all further access to the RoundState is through the receiveRoutine - cs.scheduleRound0(cs.Height) + // start timeout and receive routines cs.startRoutines(0) + + // we may have lost some votes if the process crashed + // reload from consensus log to catchup + if err := cs.catchupReplay(cs.Height); err != nil { + log.Error("Error on catchup replay", "error", err.Error()) + // let's go for it anyways, maybe we're fine + } + + // schedule the first round! + cs.scheduleRound0(cs.Height) + return nil } @@ -272,6 +282,9 @@ func (cs *ConsensusState) startRoutines(maxSteps int) { func (cs *ConsensusState) OnStop() { cs.QuitService.OnStop() + + // wait to quit the receiveRoutine + <-cs.done if cs.msgLogFP != nil { cs.msgLogFP.Close() } @@ -281,7 +294,10 @@ func (cs *ConsensusState) OnStop() { func (cs *ConsensusState) OpenFileForMessageLog(file string) (err error) { cs.mtx.Lock() defer cs.mtx.Unlock() - cs.msgLogFP, err = os.OpenFile(file, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666) + if _, err := os.Stat(file); err == nil { + cs.msgLogExists = true + } + cs.msgLogFP, err = os.OpenFile(file, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0666) return err } @@ -568,6 +584,7 @@ func (cs *ConsensusState) receiveRoutine(maxSteps int) { // go to the next step cs.handleTimeout(ti, rs) case <-cs.Quit: + close(cs.done) return } } diff --git a/node/node.go b/node/node.go index 1208a4b5..c090e871 100644 --- a/node/node.go +++ b/node/node.go @@ -85,7 +85,10 @@ func NewNode(privValidator *types.PrivValidator) *Node { } // deterministic accountability - consensusState.OpenFileForMessageLog(config.GetString("cs_msg_log")) + err = consensusState.OpenFileForMessageLog(config.GetString("cs_msg_log")) + if err != nil { + log.Error("Failed to open cs_msg_log", "error", err.Error()) + } // Make p2p network switch sw := p2p.NewSwitch() From 3b063685691fe86a8f0183b4338918e69b81119b Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Thu, 14 Jan 2016 19:04:01 -0500 Subject: [PATCH 09/11] rebase fixes --- config/tendermint_test/config.go | 5 ++--- consensus/reactor.go | 2 +- consensus/replay.go | 5 ++--- consensus/state.go | 35 -------------------------------- node/node.go | 10 ++++----- 5 files changed, 10 insertions(+), 47 deletions(-) diff --git a/config/tendermint_test/config.go b/config/tendermint_test/config.go index 702fb689..fa1fe3de 100644 --- a/config/tendermint_test/config.go +++ b/config/tendermint_test/config.go @@ -41,9 +41,8 @@ func initTMRoot(rootDir string) { if !FileExists(genesisFilePath) { MustWriteFile(genesisFilePath, []byte(defaultGenesis), 0644) } - if !FileExists(privFilePath) { - MustWriteFile(privFilePath, []byte(defaultPrivValidator), 0644) - } + // we always overwrite the priv val + MustWriteFile(privFilePath, []byte(defaultPrivValidator), 0644) } func GetConfig(rootDir string) cfg.Config { diff --git a/consensus/reactor.go b/consensus/reactor.go index 608ebef0..81991972 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -239,7 +239,7 @@ func (conR *ConsensusReactor) registerEventCallbacks() { conR.broadcastNewRoundStep(rs) }) - conR.evsw.AddListenerForEvent("conR", types.EventStringVote(), func(data types.EventData) { + conR.evsw.AddListenerForEvent("conR", types.EventStringVote(), func(data events.EventData) { edv := data.(types.EventDataVote) conR.broadcastHasVoteMessage(edv.Vote, edv.Index) }) diff --git a/consensus/replay.go b/consensus/replay.go index 4590acaa..611f5e7a 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -2,7 +2,6 @@ package consensus import ( "bufio" - "bytes" "errors" "fmt" "io" @@ -85,7 +84,7 @@ func (cs *ConsensusState) catchupReplay(height int) error { } m, ok := msg.Msg.(*types.EventDataRoundState) if ok && m.Step == RoundStepNewHeight.String() { - r, err := f.Seek(0, 1) + f.Seek(0, 1) // TODO: ensure the height matches return true } @@ -203,7 +202,7 @@ func (pb *playback) replayReset(count int) error { pb.cs.Stop() - newCs := NewConsensusState(pb.genesisState.Copy(), pb.cs.proxyAppCtx, pb.cs.blockStore, pb.cs.mempool) + newCs := NewConsensusState(pb.genesisState.Copy(), pb.cs.proxyAppConn, pb.cs.blockStore, pb.cs.mempool) newCs.SetEventSwitch(pb.cs.evsw) // ensure all new step events are regenerated as expected diff --git a/consensus/state.go b/consensus/state.go index 354038e7..8b61aa88 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -1430,41 +1430,6 @@ func (cs *ConsensusState) signAddVote(type_ byte, hash []byte, header types.Part } } -// Save Block, save the +2/3 Commits we've seen -func (cs *ConsensusState) saveBlock(block *types.Block, blockParts *types.PartSet, commits *types.VoteSet) { - - // The proposal must be valid. - if err := cs.stageBlock(block, blockParts); err != nil { - PanicSanity(Fmt("saveBlock() an invalid block: %v", err)) - } - - // Save to blockStore. - if cs.blockStore.Height() < block.Height { - seenValidation := commits.MakeValidation() - cs.blockStore.SaveBlock(block, blockParts, seenValidation) - } - - // Commit to proxyAppCtx - err := cs.stagedState.Commit(cs.proxyAppCtx) - if err != nil { - // TODO: handle this gracefully. - PanicQ(Fmt("Commit failed for applicaiton")) - } - - // Save the state. - cs.stagedState.Save() - - // Update mempool. - cs.mempool.Update(block) - - // Fire off event - if cs.evsw != nil && cs.evc != nil { - cs.evsw.FireEvent(types.EventStringNewBlock(), types.EventDataNewBlock{block}) - go cs.evc.Flush() - } - -} - //--------------------------------------------------------- func CompareHRS(h1, r1 int, s1 RoundStepType, h2, r2 int, s2 RoundStepType) int { diff --git a/node/node.go b/node/node.go index c090e871..b9250bdd 100644 --- a/node/node.go +++ b/node/node.go @@ -323,11 +323,11 @@ func newConsensusState() *consensus.ConsensusState { stateDB := dbm.GetDB("state") state := sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file")) - // Create two proxyAppCtx connections, + // Create two proxyAppConn connections, // one for the consensus and one for the mempool. proxyAddr := config.GetString("proxy_app") - proxyAppCtxMempool := getProxyApp(proxyAddr, state.LastAppHash) - proxyAppCtxConsensus := getProxyApp(proxyAddr, state.LastAppHash) + proxyAppConnMempool := getProxyApp(proxyAddr, state.AppHash) + proxyAppConnConsensus := getProxyApp(proxyAddr, state.AppHash) // add the chainid to the global config config.Set("chain_id", state.ChainID) @@ -339,9 +339,9 @@ func newConsensusState() *consensus.ConsensusState { Exit(Fmt("Failed to start event switch: %v", err)) } - mempool := mempl.NewMempool(proxyAppCtxMempool) + mempool := mempl.NewMempool(proxyAppConnMempool) - consensusState := consensus.NewConsensusState(state.Copy(), proxyAppCtxConsensus, blockStore, mempool) + consensusState := consensus.NewConsensusState(state.Copy(), proxyAppConnConsensus, blockStore, mempool) consensusState.SetEventSwitch(eventSwitch) return consensusState } From 26f0e2bc2d76d331f5bb534d691c149f30035947 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Mon, 18 Jan 2016 14:10:05 -0500 Subject: [PATCH 10/11] msgLogFP -> write ahead log --- config/tendermint/config.go | 2 +- config/tendermint_test/config.go | 2 +- consensus/replay.go | 292 +++++++++++-------------------- consensus/state.go | 36 ++-- consensus/wal.go | 113 ++++++++++++ consensus/wal_test.go | 76 ++++++++ node/node.go | 110 ++++++------ 7 files changed, 368 insertions(+), 263 deletions(-) create mode 100644 consensus/wal.go create mode 100644 consensus/wal_test.go diff --git a/config/tendermint/config.go b/config/tendermint/config.go index e6c74025..bdeb6add 100644 --- a/config/tendermint/config.go +++ b/config/tendermint/config.go @@ -67,7 +67,7 @@ func GetConfig(rootDir string) cfg.Config { mapConfig.SetDefault("rpc_laddr", "0.0.0.0:46657") mapConfig.SetDefault("prof_laddr", "") mapConfig.SetDefault("revision_file", rootDir+"/revision") - mapConfig.SetDefault("cs_msg_log", rootDir+"/cs_msg_log") + mapConfig.SetDefault("cswal", rootDir+"/cswal") return mapConfig } diff --git a/config/tendermint_test/config.go b/config/tendermint_test/config.go index fa1fe3de..04d300d5 100644 --- a/config/tendermint_test/config.go +++ b/config/tendermint_test/config.go @@ -75,7 +75,7 @@ func GetConfig(rootDir string) cfg.Config { mapConfig.SetDefault("rpc_laddr", "0.0.0.0:36657") mapConfig.SetDefault("prof_laddr", "") mapConfig.SetDefault("revision_file", rootDir+"/revision") - mapConfig.SetDefault("cs_msg_log", rootDir+"/cs_msg_log") + mapConfig.SetDefault("cswal", rootDir+"/cswal") return mapConfig } diff --git a/consensus/replay.go b/consensus/replay.go index 611f5e7a..5d193efe 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -15,67 +15,83 @@ import ( "github.com/tendermint/go-wire" sm "github.com/tendermint/tendermint/state" - "github.com/tendermint/tendermint/tailseek" "github.com/tendermint/tendermint/types" ) -//-------------------------------------------------------- -// types and functions for savings consensus messages - -type ConsensusLogMessage struct { - Time time.Time `json:"time"` - Msg ConsensusLogMessageInterface `json:"msg"` -} - -type ConsensusLogMessageInterface interface{} - -var _ = wire.RegisterInterface( - struct{ ConsensusLogMessageInterface }{}, - wire.ConcreteType{&types.EventDataRoundState{}, 0x01}, - wire.ConcreteType{msgInfo{}, 0x02}, - wire.ConcreteType{timeoutInfo{}, 0x03}, -) - -// called in newStep and for each pass in receiveRoutine -func (cs *ConsensusState) saveMsg(msg ConsensusLogMessageInterface) { - if cs.msgLogFP != nil { - var n int - var err error - wire.WriteJSON(ConsensusLogMessage{time.Now(), msg}, cs.msgLogFP, &n, &err) - wire.WriteTo([]byte("\n"), cs.msgLogFP, &n, &err) // one message per line - if err != nil { - log.Error("Error writing to consensus message log file", "err", err, "msg", msg) - } +// unmarshal and apply a single message to the consensus state +func (cs *ConsensusState) readReplayMessage(msgBytes []byte, newStepCh chan interface{}) error { + var err error + var msg ConsensusLogMessage + wire.ReadJSON(&msg, msgBytes, &err) + if err != nil { + return fmt.Errorf("Error reading json data: %v", err) } + + // for logging + switch m := msg.Msg.(type) { + case *types.EventDataRoundState: + log.Notice("New Step", "height", m.Height, "round", m.Round, "step", m.Step) + // these are playback checks + ticker := time.After(time.Second * 2) + if newStepCh != nil { + select { + case mi := <-newStepCh: + m2 := mi.(*types.EventDataRoundState) + if m.Height != m2.Height || m.Round != m2.Round || m.Step != m2.Step { + return fmt.Errorf("RoundState mismatch. Got %v; Expected %v", m2, m) + } + case <-ticker: + return fmt.Errorf("Failed to read off newStepCh") + } + } + case msgInfo: + peerKey := m.PeerKey + if peerKey == "" { + peerKey = "local" + } + switch msg := m.Msg.(type) { + case *ProposalMessage: + p := msg.Proposal + log.Notice("Proposal", "height", p.Height, "round", p.Round, "header", + p.BlockPartsHeader, "pol", p.POLRound, "peer", peerKey) + case *BlockPartMessage: + log.Notice("BlockPart", "height", msg.Height, "round", msg.Round, "peer", peerKey) + case *VoteMessage: + v := msg.Vote + log.Notice("Vote", "height", v.Height, "round", v.Round, "type", v.Type, + "hash", v.BlockHash, "header", v.BlockPartsHeader, "peer", peerKey) + } + // internal or from peer + if m.PeerKey == "" { + cs.internalMsgQueue <- m + } else { + cs.peerMsgQueue <- m + } + case timeoutInfo: + log.Notice("Timeout", "height", m.Height, "round", m.Round, "step", m.Step, "dur", m.Duration) + cs.tockChan <- m + default: + return fmt.Errorf("Unknown ConsensusLogMessage type: %v", reflect.TypeOf(msg.Msg)) + } + return nil } -//-------------------------------------------------------- -// replay the messages - -// Interactive playback -func (cs ConsensusState) ReplayConsole(file string) error { - return cs.replay(file, true) -} - -// Full playback, with tests -func (cs ConsensusState) ReplayMessages(file string) error { - return cs.replay(file, false) -} - +// replay only those messages since the last block func (cs *ConsensusState) catchupReplay(height int) error { - if !cs.msgLogExists { + if cs.wal == nil { + log.Warn("consensus msg log is nil") return nil } - - if cs.msgLogFP == nil { - log.Warn("consensus msg log is nil") + if !cs.wal.exists { + // new wal, nothing to catchup on return nil } log.Notice("Catchup by replaying consensus messages") - f := cs.msgLogFP - n, err := seek.SeekFromEndOfFile(f, func(lineBytes []byte) bool { + // starting from end of file, + // read messages until a new height is found + nLines, err := cs.wal.SeekFromEnd(func(lineBytes []byte) bool { var err error var msg ConsensusLogMessage wire.ReadJSON(&msg, lineBytes, &err) @@ -84,7 +100,6 @@ func (cs *ConsensusState) catchupReplay(height int) error { } m, ok := msg.Msg.(*types.EventDataRoundState) if ok && m.Step == RoundStepNewHeight.String() { - f.Seek(0, 1) // TODO: ensure the height matches return true } @@ -95,13 +110,10 @@ func (cs *ConsensusState) catchupReplay(height int) error { return err } - // we found it, now we can replay everything - pb := newPlayback("", cs.msgLogFP, cs, nil, cs.state.Copy()) - - reader := bufio.NewReader(cs.msgLogFP) - i := 0 - for { - i += 1 + // now we can replay the latest nLines on consensus state + // note we can't use scan because we've already been reading from the file + reader := bufio.NewReader(cs.wal.fp) + for i := 0; i < nLines; i++ { msgBytes, err := reader.ReadBytes('\n') if err == io.EOF { break @@ -118,16 +130,24 @@ func (cs *ConsensusState) catchupReplay(height int) error { // NOTE: since the priv key is set when the msgs are received // it will attempt to eg double sign but we can just ignore it // since the votes will be replayed and we'll get to the next step - if err := pb.readReplayMessage(msgBytes); err != nil { + if err := cs.readReplayMessage(msgBytes, nil); err != nil { return err - - } - if i >= n { - break } } return nil +} +//-------------------------------------------------------- +// replay messages interactively or all at once + +// Interactive playback +func (cs ConsensusState) ReplayConsole(file string) error { + return cs.replay(file, true) +} + +// Full playback, with tests +func (cs ConsensusState) ReplayMessages(file string) error { + return cs.replay(file, false) } // replay all msgs or start the console @@ -135,24 +155,22 @@ func (cs *ConsensusState) replay(file string, console bool) error { if cs.IsRunning() { return errors.New("cs is already running, cannot replay") } + if cs.wal != nil { + return errors.New("cs wal is open, cannot replay") + } cs.startForReplay() - // set the FP to nil so we don't overwrite - if cs.msgLogFP != nil { - cs.msgLogFP.Close() - cs.msgLogFP = nil - } - // ensure all new step events are regenerated as expected newStepCh := cs.evsw.SubscribeToEvent("replay-test", types.EventStringNewRoundStep(), 1) + // just open the file for reading, no need to use wal fp, err := os.OpenFile(file, os.O_RDONLY, 0666) if err != nil { return err } - pb := newPlayback(file, fp, cs, newStepCh, cs.state.Copy()) + pb := newPlayback(file, fp, cs, cs.state.Copy()) defer pb.fp.Close() var nextN int // apply N msgs in a row @@ -161,7 +179,7 @@ func (cs *ConsensusState) replay(file string, console bool) error { nextN = pb.replayConsoleLoop() } - if err := pb.readReplayMessage(pb.scanner.Bytes()); err != nil { + if err := pb.cs.readReplayMessage(pb.scanner.Bytes(), newStepCh); err != nil { return err } @@ -177,41 +195,39 @@ func (cs *ConsensusState) replay(file string, console bool) error { // playback manager type playback struct { - cs *ConsensusState - file string - fp *os.File - scanner *bufio.Scanner - newStepCh chan interface{} - genesisState *sm.State - count int + cs *ConsensusState + + fp *os.File + scanner *bufio.Scanner + count int // how many lines/msgs into the file are we + + // replays can be reset to beginning + fileName string // so we can close/reopen the file + genesisState *sm.State // so the replay session knows where to restart from } -func newPlayback(file string, fp *os.File, cs *ConsensusState, ch chan interface{}, genState *sm.State) *playback { +func newPlayback(fileName string, fp *os.File, cs *ConsensusState, genState *sm.State) *playback { return &playback{ cs: cs, - file: file, - newStepCh: ch, - genesisState: genState, fp: fp, + fileName: fileName, + genesisState: genState, scanner: bufio.NewScanner(fp), } } // go back count steps by resetting the state and running (pb.count - count) steps -func (pb *playback) replayReset(count int) error { +func (pb *playback) replayReset(count int, newStepCh chan interface{}) error { pb.cs.Stop() newCs := NewConsensusState(pb.genesisState.Copy(), pb.cs.proxyAppConn, pb.cs.blockStore, pb.cs.mempool) newCs.SetEventSwitch(pb.cs.evsw) - // ensure all new step events are regenerated as expected - pb.newStepCh = newCs.evsw.SubscribeToEvent("replay-test", types.EventStringNewRoundStep(), 1) - newCs.startForReplay() pb.fp.Close() - fp, err := os.OpenFile(pb.file, os.O_RDONLY, 0666) + fp, err := os.OpenFile(pb.fileName, os.O_RDONLY, 0666) if err != nil { return err } @@ -222,7 +238,7 @@ func (pb *playback) replayReset(count int) error { pb.count = 0 pb.cs = newCs for i := 0; pb.scanner.Scan() && i < count; i++ { - if err := pb.readReplayMessage(pb.scanner.Bytes()); err != nil { + if err := pb.cs.readReplayMessage(pb.scanner.Bytes(), newStepCh); err != nil { return err } pb.count += 1 @@ -285,8 +301,10 @@ func (pb *playback) replayConsoleLoop() int { // NOTE: "back" is not supported in the state machine design, // so we restart and replay up to + // ensure all new step events are regenerated as expected + newStepCh := pb.cs.evsw.SubscribeToEvent("replay-test", types.EventStringNewRoundStep(), 1) if len(tokens) == 1 { - pb.replayReset(1) + pb.replayReset(1, newStepCh) } else { i, err := strconv.Atoi(tokens[1]) if err != nil { @@ -294,7 +312,7 @@ func (pb *playback) replayConsoleLoop() int { } else if i > pb.count { fmt.Printf("argument to back must not be larger than the current count (%d)\n", pb.count) } else { - pb.replayReset(i) + pb.replayReset(i, newStepCh) } } @@ -333,103 +351,3 @@ func (pb *playback) replayConsoleLoop() int { } return 0 } - -func (pb *playback) readReplayMessage(msgBytes []byte) error { - var err error - var msg ConsensusLogMessage - wire.ReadJSON(&msg, msgBytes, &err) - if err != nil { - return fmt.Errorf("Error reading json data: %v", err) - } - - // for logging - switch m := msg.Msg.(type) { - case *types.EventDataRoundState: - log.Notice("New Step", "height", m.Height, "round", m.Round, "step", m.Step) - // these are playback checks - ticker := time.After(time.Second * 2) - if pb.newStepCh != nil { - select { - case mi := <-pb.newStepCh: - m2 := mi.(*types.EventDataRoundState) - if m.Height != m2.Height || m.Round != m2.Round || m.Step != m2.Step { - return fmt.Errorf("RoundState mismatch. Got %v; Expected %v", m2, m) - } - case <-ticker: - return fmt.Errorf("Failed to read off newStepCh") - } - } - case msgInfo: - peerKey := m.PeerKey - if peerKey == "" { - peerKey = "local" - } - switch msg := m.Msg.(type) { - case *ProposalMessage: - p := msg.Proposal - log.Notice("Proposal", "height", p.Height, "round", p.Round, "header", - p.BlockPartsHeader, "pol", p.POLRound, "peer", peerKey) - case *BlockPartMessage: - log.Notice("BlockPart", "height", msg.Height, "round", msg.Round, "peer", peerKey) - case *VoteMessage: - v := msg.Vote - log.Notice("Vote", "height", v.Height, "round", v.Round, "type", v.Type, - "hash", v.BlockHash, "header", v.BlockPartsHeader, "peer", peerKey) - } - // internal or from peer - if m.PeerKey == "" { - pb.cs.internalMsgQueue <- m - } else { - pb.cs.peerMsgQueue <- m - } - case timeoutInfo: - log.Notice("Timeout", "height", m.Height, "round", m.Round, "step", m.Step, "dur", m.Duration) - pb.cs.tockChan <- m - default: - return fmt.Errorf("Unknown ConsensusLogMessage type: %v", reflect.TypeOf(msg.Msg)) - } - return nil -} - -// Read lines starting from the end of the file until we read a line that causes found to return true -func SeekFromEndOfFile(f *os.File, found func([]byte) bool) (nLines int, err error) { - var current int64 - // start at the end - current, err = f.Seek(0, 2) - if err != nil { - fmt.Println("1") - return - } - - // backup until we find the the right line - for { - current -= 1 - if current < 0 { - return - } - // backup one and read a new byte - if _, err = f.Seek(current, 0); err != nil { - fmt.Println("2", current) - return - } - b := make([]byte, 1) - if _, err = f.Read(b); err != nil { - return - } - if b[0] == '\n' || len(b) == 0 { - nLines += 1 - - // read a full line - reader := bufio.NewReader(f) - lineBytes, _ := reader.ReadBytes('\n') - if len(lineBytes) == 0 { - continue - } - - if found(lineBytes) { - f.Seek(current, 0) - return - } - } - } -} diff --git a/consensus/state.go b/consensus/state.go index 8b61aa88..c91ffd63 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -4,7 +4,6 @@ import ( "bytes" "errors" "fmt" - "os" "reflect" "sync" "time" @@ -192,9 +191,7 @@ type ConsensusState struct { evsw *events.EventSwitch - msgLogFP *os.File // file we write all msgs and timeouts to, in order (for deterministic replay) - msgLogExists bool // if the file already existed (restarted process) - done chan struct{} // used to wait until the receiveRoutine quits so we can close the file + wal *WAL nSteps int // used for testing to limit the number of transitions the state makes } @@ -209,7 +206,6 @@ func NewConsensusState(state *sm.State, proxyAppConn proxy.AppConn, blockStore * timeoutTicker: new(time.Ticker), tickChan: make(chan timeoutInfo, tickTockBufferSize), tockChan: make(chan timeoutInfo, tickTockBufferSize), - done: make(chan struct{}), } cs.updateToState(state) // Don't call scheduleRound0 yet. @@ -282,23 +278,18 @@ func (cs *ConsensusState) startRoutines(maxSteps int) { func (cs *ConsensusState) OnStop() { cs.QuitService.OnStop() - - // wait to quit the receiveRoutine - <-cs.done - if cs.msgLogFP != nil { - cs.msgLogFP.Close() - } } // Open file to log all consensus messages and timeouts for deterministic accountability -func (cs *ConsensusState) OpenFileForMessageLog(file string) (err error) { +func (cs *ConsensusState) OpenWAL(file string) (err error) { cs.mtx.Lock() defer cs.mtx.Unlock() - if _, err := os.Stat(file); err == nil { - cs.msgLogExists = true + wal, err := NewWAL(file) + if err != nil { + return err } - cs.msgLogFP, err = os.OpenFile(file, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0666) - return err + cs.wal = wal + return nil } //------------------------------------------------------------ @@ -487,7 +478,7 @@ func (cs *ConsensusState) updateToState(state *sm.State) { func (cs *ConsensusState) newStep() { rs := cs.RoundStateEvent() - cs.saveMsg(rs) + cs.wal.Save(rs) cs.nSteps += 1 // newStep is called by updateToStep in NewConsensusState before the evsw is set! if cs.evsw != nil { @@ -570,21 +561,24 @@ func (cs *ConsensusState) receiveRoutine(maxSteps int) { select { case mi = <-cs.peerMsgQueue: - cs.saveMsg(mi) + cs.wal.Save(mi) // handles proposals, block parts, votes // may generate internal events (votes, complete proposals, 2/3 majorities) cs.handleMsg(mi, rs) case mi = <-cs.internalMsgQueue: - cs.saveMsg(mi) + cs.wal.Save(mi) // handles proposals, block parts, votes cs.handleMsg(mi, rs) case ti := <-cs.tockChan: - cs.saveMsg(ti) + cs.wal.Save(ti) // if the timeout is relevant to the rs // go to the next step cs.handleTimeout(ti, rs) case <-cs.Quit: - close(cs.done) + // close wal now that we're done writing to it + if cs.wal != nil { + cs.wal.Close() + } return } } diff --git a/consensus/wal.go b/consensus/wal.go new file mode 100644 index 00000000..5b6e3989 --- /dev/null +++ b/consensus/wal.go @@ -0,0 +1,113 @@ +package consensus + +import ( + "bufio" + "os" + "time" + + . "github.com/tendermint/go-common" + "github.com/tendermint/go-wire" + "github.com/tendermint/tendermint/types" +) + +//-------------------------------------------------------- +// types and functions for savings consensus messages + +type ConsensusLogMessage struct { + Time time.Time `json:"time"` + Msg ConsensusLogMessageInterface `json:"msg"` +} + +type ConsensusLogMessageInterface interface{} + +var _ = wire.RegisterInterface( + struct{ ConsensusLogMessageInterface }{}, + wire.ConcreteType{&types.EventDataRoundState{}, 0x01}, + wire.ConcreteType{msgInfo{}, 0x02}, + wire.ConcreteType{timeoutInfo{}, 0x03}, +) + +//-------------------------------------------------------- +// Simple write-ahead logger + +// Write ahead logger writes msgs to disk before they are processed. +// Can be used for crash-recovery and deterministic replay +type WAL struct { + fp *os.File + exists bool // if the file already existed (restarted process) +} + +func NewWAL(file string) (*WAL, error) { + var walExists bool + if _, err := os.Stat(file); err == nil { + walExists = true + } + fp, err := os.OpenFile(file, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0600) + if err != nil { + return nil, err + } + return &WAL{ + fp: fp, + exists: walExists, + }, nil +} + +// called in newStep and for each pass in receiveRoutine +func (wal *WAL) Save(msg ConsensusLogMessageInterface) { + if wal != nil { + var n int + var err error + wire.WriteJSON(ConsensusLogMessage{time.Now(), msg}, wal.fp, &n, &err) + wire.WriteTo([]byte("\n"), wal.fp, &n, &err) // one message per line + if err != nil { + PanicQ(Fmt("Error writing msg to consensus wal. Error: %v \n\nMessage: %v", err, msg)) + } + } +} + +// Must not be called concurrently. +func (wal *WAL) Close() { + if wal != nil { + wal.fp.Close() + } +} + +func (wal *WAL) SeekFromEnd(found func([]byte) bool) (nLines int, err error) { + var current int64 + // start at the end + current, err = wal.fp.Seek(0, 2) + if err != nil { + return + } + + // backup until we find the the right line + for { + current -= 1 + if current < 0 { + return + } + // backup one and read a new byte + if _, err = wal.fp.Seek(current, 0); err != nil { + return + } + b := make([]byte, 1) + if _, err = wal.fp.Read(b); err != nil { + return + } + if b[0] == '\n' || len(b) == 0 { + // read a full line + reader := bufio.NewReader(wal.fp) + lineBytes, _ := reader.ReadBytes('\n') + if len(lineBytes) == 0 { + continue + } + + nLines += 1 + if found(lineBytes) { + wal.fp.Seek(0, 1) // (?) + wal.fp.Seek(current, 0) + return + } + } + } +} diff --git a/consensus/wal_test.go b/consensus/wal_test.go new file mode 100644 index 00000000..8348ca24 --- /dev/null +++ b/consensus/wal_test.go @@ -0,0 +1,76 @@ +package consensus + +import ( + "io/ioutil" + "os" + "path" + "strings" + "testing" +) + +var testTxt = `{"time":"2016-01-16T04:42:00.390Z","msg":[1,{"height":28219,"round":0,"step":"RoundStepPrevote"}]} +{"time":"2016-01-16T04:42:00.390Z","msg":[2,{"msg":[20,{"ValidatorIndex":0,"Vote":{"height":28219,"round":0,"type":1,"block_hash":"67F9689F15BEC30BF311FB4C0C80C5E661AA44E0","block_parts_header":{"total":1,"hash":"DFFD4409A1E273ED61AC27CAF975F446020D5676"},"signature":"4CC6845A128E723A299B470CCBB2A158612AA51321447F6492F3DA57D135C27FCF4124B3B19446A248252BDA45B152819C76AAA5FD35E1C07091885CE6955E05"}}],"peer_key":""}]} +{"time":"2016-01-16T04:42:00.392Z","msg":[1,{"height":28219,"round":0,"step":"RoundStepPrecommit"}]} +{"time":"2016-01-16T04:42:00.392Z","msg":[2,{"msg":[20,{"ValidatorIndex":0,"Vote":{"height":28219,"round":0,"type":2,"block_hash":"67F9689F15BEC30BF311FB4C0C80C5E661AA44E0","block_parts_header":{"total":1,"hash":"DFFD4409A1E273ED61AC27CAF975F446020D5676"},"signature":"1B9924E010F47E0817695DFE462C531196E5A12632434DE12180BBA3EFDAD6B3960FDB9357AFF085EB61729A7D4A6AD8408555D7569C87D9028F280192FD4E05"}}],"peer_key":""}]} +{"time":"2016-01-16T04:42:00.393Z","msg":[1,{"height":28219,"round":0,"step":"RoundStepCommit"}]} +{"time":"2016-01-16T04:42:00.395Z","msg":[1,{"height":28220,"round":0,"step":"RoundStepNewHeight"}]}` + +func TestSeek(t *testing.T) { + f, err := ioutil.TempFile(os.TempDir(), "seek_test_") + if err != nil { + t.Fatal(err) + } + + stat, _ := f.Stat() + name := stat.Name() + + _, err = f.WriteString(testTxt) + if err != nil { + t.Fatal(err) + } + f.Close() + + wal, err := NewWAL(path.Join(os.TempDir(), name)) + if err != nil { + t.Fatal(err) + } + + keyWord := "Precommit" + n, err := wal.SeekFromEnd(func(b []byte) bool { + if strings.Contains(string(b), keyWord) { + return true + } + return false + }) + if err != nil { + t.Fatal(err) + } + + // confirm n + spl := strings.Split(testTxt, "\n") + var i int + var s string + for i, s = range spl { + if strings.Contains(s, keyWord) { + break + } + } + // n is lines from the end. + spl = spl[i:] + if n != len(spl) { + t.Fatalf("Wrong nLines. Got %d, expected %d", n, len(spl)) + } + + b, err := ioutil.ReadAll(wal.fp) + if err != nil { + t.Fatal(err) + } + // first char is a \n + spl2 := strings.Split(strings.Trim(string(b), "\n"), "\n") + for i, s := range spl { + if s != spl2[i] { + t.Fatalf("Mismatch. Got %s, expected %s", spl2[i], s) + } + } + +} diff --git a/node/node.go b/node/node.go index b9250bdd..34d0c469 100644 --- a/node/node.go +++ b/node/node.go @@ -85,9 +85,9 @@ func NewNode(privValidator *types.PrivValidator) *Node { } // deterministic accountability - err = consensusState.OpenFileForMessageLog(config.GetString("cs_msg_log")) + err = consensusState.OpenWAL(config.GetString("cswal")) if err != nil { - log.Error("Failed to open cs_msg_log", "error", err.Error()) + log.Error("Failed to open cswal", "error", err.Error()) } // Make p2p network switch @@ -251,6 +251,49 @@ func makeNodeInfo(sw *p2p.Switch, privKey crypto.PrivKeyEd25519) *p2p.NodeInfo { return nodeInfo } +// Get a connection to the proxyAppConn addr. +// Check the current hash, and panic if it doesn't match. +func getProxyApp(addr string, hash []byte) (proxyAppConn proxy.AppConn) { + // use local app (for testing) + if addr == "local" { + app := example.NewCounterApplication(true) + mtx := new(sync.Mutex) + proxyAppConn = proxy.NewLocalAppConn(mtx, app) + } else { + proxyConn, err := Connect(addr) + if err != nil { + Exit(Fmt("Failed to connect to proxy for mempool: %v", err)) + } + remoteApp := proxy.NewRemoteAppConn(proxyConn, 1024) + remoteApp.Start() + + proxyAppConn = remoteApp + } + + // Check the hash + currentHash, err := proxyAppConn.GetHashSync() + if err != nil { + PanicCrisis(Fmt("Error in getting proxyAppConn hash: %v", err)) + } + if !bytes.Equal(hash, currentHash) { + PanicCrisis(Fmt("ProxyApp hash does not match. Expected %X, got %X", hash, currentHash)) + } + + return proxyAppConn +} + +// Load the most recent state from "state" db, +// or create a new one (and save) from genesis. +func getState() *sm.State { + stateDB := dbm.GetDB("state") + state := sm.LoadState(stateDB) + if state == nil { + state = sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file")) + state.Save() + } + return state +} + //------------------------------------------------------------------------------ // Users wishing to use an external signer for their validators @@ -314,6 +357,10 @@ func RunNode() { }) } +//------------------------------------------------------------------------------ +// replay + +// convenience for replay mode func newConsensusState() *consensus.ConsensusState { // Get BlockStore blockStoreDB := dbm.GetDB("blockstore") @@ -347,71 +394,28 @@ func newConsensusState() *consensus.ConsensusState { } func RunReplayConsole() { - msgLogFile := config.GetString("cs_msg_log") - if msgLogFile == "" { - Exit("cs_msg_log file name not set in tendermint config") + walFile := config.GetString("cswal") + if walFile == "" { + Exit("cswal file name not set in tendermint config") } consensusState := newConsensusState() - if err := consensusState.ReplayConsole(msgLogFile); err != nil { + if err := consensusState.ReplayConsole(walFile); err != nil { Exit(Fmt("Error during consensus replay: %v", err)) } } func RunReplay() { - msgLogFile := config.GetString("cs_msg_log") - if msgLogFile == "" { - Exit("cs_msg_log file name not set in tendermint config") + walFile := config.GetString("cswal") + if walFile == "" { + Exit("cswal file name not set in tendermint config") } consensusState := newConsensusState() - if err := consensusState.ReplayMessages(msgLogFile); err != nil { + if err := consensusState.ReplayMessages(walFile); err != nil { Exit(Fmt("Error during consensus replay: %v", err)) } log.Notice("Replay run successfully") } - -// Load the most recent state from "state" db, -// or create a new one (and save) from genesis. -func getState() *sm.State { - stateDB := dbm.GetDB("state") - state := sm.LoadState(stateDB) - if state == nil { - state = sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file")) - state.Save() - } - return state -} - -// Get a connection to the proxyAppConn addr. -// Check the current hash, and panic if it doesn't match. -func getProxyApp(addr string, hash []byte) (proxyAppConn proxy.AppConn) { - // use local app (for testing) - if addr == "local" { - app := example.NewCounterApplication(true) - mtx := new(sync.Mutex) - proxyAppConn = proxy.NewLocalAppConn(mtx, app) - } else { - proxyConn, err := Connect(addr) - if err != nil { - Exit(Fmt("Failed to connect to proxy for mempool: %v", err)) - } - remoteApp := proxy.NewRemoteAppConn(proxyConn, 1024) - remoteApp.Start() - - proxyAppConn = remoteApp - } - - // Check the hash - currentHash, err := proxyAppConn.GetHashSync() - if err != nil { - PanicCrisis(Fmt("Error in getting proxyAppConn hash: %v", err)) - } - if !bytes.Equal(hash, currentHash) { - PanicCrisis(Fmt("ProxyApp hash does not match. Expected %X, got %X", hash, currentHash)) - } - - return proxyAppConn -} From 273a65724d6958d34dd098f2d68a516f29716976 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Mon, 18 Jan 2016 15:57:57 -0500 Subject: [PATCH 11/11] replayCatchup test --- consensus/common_test.go | 31 ++++++++++------- consensus/replay.go | 15 ++++++--- consensus/replay_test.go | 72 ++++++++++++++++++++++++++++++++++++++++ consensus/state_test.go | 32 +++++++++--------- consensus/wal.go | 6 +++- 5 files changed, 123 insertions(+), 33 deletions(-) create mode 100644 consensus/replay_test.go diff --git a/consensus/common_test.go b/consensus/common_test.go index dae0882b..c222db67 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -9,9 +9,9 @@ import ( "time" dbm "github.com/tendermint/go-db" + "github.com/tendermint/go-events" bc "github.com/tendermint/tendermint/blockchain" _ "github.com/tendermint/tendermint/config/tendermint_test" - "github.com/tendermint/go-events" mempl "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/proxy" sm "github.com/tendermint/tendermint/state" @@ -296,16 +296,16 @@ func validatePrevoteAndPrecommit(t *testing.T, cs *ConsensusState, thisRound, lo cs.mtx.Unlock() } -func simpleConsensusState(nValidators int) (*ConsensusState, []*validatorStub) { - // Get State - state, privVals := randGenesisState(nValidators, false, 10) +func fixedConsensusState() *ConsensusState { + stateDB := dbm.NewMemDB() + state := sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file")) + privValidatorFile := config.GetString("priv_validator_file") + privValidator := types.LoadOrGenPrivValidator(privValidatorFile) + return newConsensusState(state, privValidator) - // fmt.Println(state.Validators) - - vss := make([]*validatorStub, nValidators) - - // make consensus state for lead validator +} +func newConsensusState(state *sm.State, pv *types.PrivValidator) *ConsensusState { // Get BlockStore blockDB := dbm.NewMemDB() blockStore := bc.NewBlockStore(blockDB) @@ -320,14 +320,21 @@ func simpleConsensusState(nValidators int) (*ConsensusState, []*validatorStub) { // Make ConsensusReactor cs := NewConsensusState(state, proxyAppConnCon, blockStore, mempool) - cs.SetPrivValidator(privVals[0]) + cs.SetPrivValidator(pv) evsw := events.NewEventSwitch() cs.SetEventSwitch(evsw) evsw.Start() + return cs +} - // start the transition routines - // cs.startRoutines() +func randConsensusState(nValidators int) (*ConsensusState, []*validatorStub) { + // Get State + state, privVals := randGenesisState(nValidators, false, 10) + + vss := make([]*validatorStub, nValidators) + + cs := newConsensusState(state, privVals[0]) for i := 0; i < nValidators; i++ { vss[i] = NewValidatorStub(privVals[i]) diff --git a/consensus/replay.go b/consensus/replay.go index 5d193efe..4f6cb1c5 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -24,6 +24,7 @@ func (cs *ConsensusState) readReplayMessage(msgBytes []byte, newStepCh chan inte var msg ConsensusLogMessage wire.ReadJSON(&msg, msgBytes, &err) if err != nil { + fmt.Println(string(msgBytes)) return fmt.Errorf("Error reading json data: %v", err) } @@ -87,8 +88,6 @@ func (cs *ConsensusState) catchupReplay(height int) error { return nil } - log.Notice("Catchup by replaying consensus messages") - // starting from end of file, // read messages until a new height is found nLines, err := cs.wal.SeekFromEnd(func(lineBytes []byte) bool { @@ -110,6 +109,13 @@ func (cs *ConsensusState) catchupReplay(height int) error { return err } + var beginning bool // if we had to go back to the beginning + if c, _ := cs.wal.fp.Seek(0, 1); c == 0 { + beginning = true + } + + log.Notice("Catchup by replaying consensus messages", "n", nLines) + // now we can replay the latest nLines on consensus state // note we can't use scan because we've already been reading from the file reader := bufio.NewReader(cs.wal.fp) @@ -122,8 +128,8 @@ func (cs *ConsensusState) catchupReplay(height int) error { } else if len(msgBytes) == 0 { continue } - // the first msg is the NewHeight event, so we can ignore it - if i == 1 { + // the first msg is (usually) the NewHeight event, so we can ignore it + if !beginning && i == 1 { continue } @@ -134,6 +140,7 @@ func (cs *ConsensusState) catchupReplay(height int) error { return err } } + log.Info("Done catchup replay") return nil } diff --git a/consensus/replay_test.go b/consensus/replay_test.go new file mode 100644 index 00000000..f94c73fc --- /dev/null +++ b/consensus/replay_test.go @@ -0,0 +1,72 @@ +package consensus + +import ( + "io/ioutil" + "os" + "testing" + "time" + + "github.com/tendermint/tendermint/types" +) + +var testLog = `{"time":"2016-01-18T20:46:00.774Z","msg":[3,{"duration":982632969,"height":1,"round":0,"step":1}]} +{"time":"2016-01-18T20:46:00.776Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPropose"}]} +{"time":"2016-01-18T20:46:00.776Z","msg":[2,{"msg":[17,{"Proposal":{"height":1,"round":0,"block_parts_header":{"total":1,"hash":"B6227255FF20758326B0B7DFF529F20E33E58F45"},"pol_round":-1,"signature":"A1803A1364F6398C154FE45D5649A89129039F18A0FE42B211BADFDF6E81EA53F48F83D3610DDD848C3A5284D3F09BDEB26FA1D856BDF70D48C507BF2453A70E"}}],"peer_key":""}]} +{"time":"2016-01-18T20:46:00.777Z","msg":[2,{"msg":[19,{"Height":1,"Round":0,"Part":{"index":0,"bytes":"0101010F74656E6465726D696E745F746573740101142AA030B15DDFC000000000000000000000000000000114C4B01D3810579550997AC5641E759E20D99B51C10001000100","proof":{"aunts":[]}}}],"peer_key":""}]} +{"time":"2016-01-18T20:46:00.781Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPrevote"}]} +{"time":"2016-01-18T20:46:00.781Z","msg":[2,{"msg":[20,{"ValidatorIndex":0,"Vote":{"height":1,"round":0,"type":1,"block_hash":"E05D1DB8DEC7CDA507A42C8FF208EE4317C663F6","block_parts_header":{"total":1,"hash":"B6227255FF20758326B0B7DFF529F20E33E58F45"},"signature":"88F5708C802BEE54EFBF438967FBC6C6EAAFC41258A85D92B9B055481175BE9FA71007B1AAF2BFBC3BF3CC0542DB48A9812324B7BBA7307446CCDBF029077F07"}}],"peer_key":""}]} +{"time":"2016-01-18T20:46:00.786Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPrecommit"}]} +{"time":"2016-01-18T20:46:00.786Z","msg":[2,{"msg":[20,{"ValidatorIndex":0,"Vote":{"height":1,"round":0,"type":2,"block_hash":"E05D1DB8DEC7CDA507A42C8FF208EE4317C663F6","block_parts_header":{"total":1,"hash":"B6227255FF20758326B0B7DFF529F20E33E58F45"},"signature":"65B0C9D2A8C9919FC9B036F82C3F1818E706E8BC066A78D99D3316E4814AB06594841E387B323AA7773F926D253C1E4D4A0930F7A8C8AE1E838CA15C673B2B02"}}],"peer_key":""}]} +` + +func TestReplayCatchup(t *testing.T) { + // write the needed wal to file + f, err := ioutil.TempFile(os.TempDir(), "replay_test_") + if err != nil { + t.Fatal(err) + } + name := f.Name() + _, err = f.WriteString(testLog) + if err != nil { + t.Fatal(err) + } + f.Close() + + cs := fixedConsensusState() + + // we've already precommitted on the first block + // without replay catchup we would be halted here forever + cs.privValidator.LastHeight = 1 // first block + cs.privValidator.LastStep = 3 // precommit + + newBlockCh := cs.evsw.SubscribeToEvent("tester", types.EventStringNewBlock(), 0) + + // start timeout and receive routines + cs.startRoutines(0) + + // open wal and run catchup messages + openWAL(t, cs, name) + if err := cs.catchupReplay(cs.Height); err != nil { + t.Fatalf("Error on catchup replay %v", err) + } + + cs.enterNewRound(cs.Height, cs.Round) + + after := time.After(time.Second * 2) + select { + case <-newBlockCh: + case <-after: + t.Fatal("Timed out waiting for new block") + } + +} + +func openWAL(t *testing.T, cs *ConsensusState, file string) { + // open the wal + wal, err := NewWAL(file) + if err != nil { + t.Fatal(err) + } + wal.exists = true + cs.wal = wal +} diff --git a/consensus/state_test.go b/consensus/state_test.go index ab6795a6..ba24a5ab 100644 --- a/consensus/state_test.go +++ b/consensus/state_test.go @@ -51,7 +51,7 @@ func init() { } func TestProposerSelection0(t *testing.T) { - cs1, vss := simpleConsensusState(4) + cs1, vss := randConsensusState(4) height, round := cs1.Height, cs1.Round newRoundCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1) @@ -85,7 +85,7 @@ func TestProposerSelection0(t *testing.T) { // Now let's do it all again, but starting from round 2 instead of 0 func TestProposerSelection2(t *testing.T) { - cs1, vss := simpleConsensusState(4) // test needs more work for more than 3 validators + cs1, vss := randConsensusState(4) // test needs more work for more than 3 validators newRoundCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1) @@ -114,7 +114,7 @@ func TestProposerSelection2(t *testing.T) { // a non-validator should timeout into the prevote round func TestEnterProposeNoPrivValidator(t *testing.T) { - cs, _ := simpleConsensusState(1) + cs, _ := randConsensusState(1) cs.SetPrivValidator(nil) height, round := cs.Height, cs.Round @@ -139,7 +139,7 @@ func TestEnterProposeNoPrivValidator(t *testing.T) { // a validator should not timeout of the prevote round (TODO: unless the block is really big!) func TestEnterProposeYesPrivValidator(t *testing.T) { - cs, _ := simpleConsensusState(1) + cs, _ := randConsensusState(1) height, round := cs.Height, cs.Round // Listen for propose timeout event @@ -175,7 +175,7 @@ func TestEnterProposeYesPrivValidator(t *testing.T) { } func TestBadProposal(t *testing.T) { - cs1, vss := simpleConsensusState(2) + cs1, vss := randConsensusState(2) height, round := cs1.Height, cs1.Round cs2 := vss[1] @@ -231,7 +231,7 @@ func TestBadProposal(t *testing.T) { // propose, prevote, and precommit a block func TestFullRound1(t *testing.T) { - cs, vss := simpleConsensusState(1) + cs, vss := randConsensusState(1) height, round := cs.Height, cs.Round voteCh := cs.evsw.SubscribeToEvent("tester", types.EventStringVote(), 0) @@ -259,7 +259,7 @@ func TestFullRound1(t *testing.T) { // nil is proposed, so prevote and precommit nil func TestFullRoundNil(t *testing.T) { - cs, vss := simpleConsensusState(1) + cs, vss := randConsensusState(1) height, round := cs.Height, cs.Round voteCh := cs.evsw.SubscribeToEvent("tester", types.EventStringVote(), 0) @@ -277,7 +277,7 @@ func TestFullRoundNil(t *testing.T) { // run through propose, prevote, precommit commit with two validators // where the first validator has to wait for votes from the second func TestFullRound2(t *testing.T) { - cs1, vss := simpleConsensusState(2) + cs1, vss := randConsensusState(2) cs2 := vss[1] height, round := cs1.Height, cs1.Round @@ -318,7 +318,7 @@ func TestFullRound2(t *testing.T) { // two validators, 4 rounds. // two vals take turns proposing. val1 locks on first one, precommits nil on everything else func TestLockNoPOL(t *testing.T) { - cs1, vss := simpleConsensusState(2) + cs1, vss := randConsensusState(2) cs2 := vss[1] height := cs1.Height @@ -481,7 +481,7 @@ func TestLockNoPOL(t *testing.T) { // 4 vals, one precommits, other 3 polka at next round, so we unlock and precomit the polka func TestLockPOLRelock(t *testing.T) { - cs1, vss := simpleConsensusState(4) + cs1, vss := randConsensusState(4) cs2, cs3, cs4 := vss[1], vss[2], vss[3] timeoutProposeCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutPropose(), 0) @@ -589,7 +589,7 @@ func TestLockPOLRelock(t *testing.T) { // 4 vals, one precommits, other 3 polka at next round, so we unlock and precomit the polka func TestLockPOLUnlock(t *testing.T) { - cs1, vss := simpleConsensusState(4) + cs1, vss := randConsensusState(4) cs2, cs3, cs4 := vss[1], vss[2], vss[3] proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0) @@ -680,7 +680,7 @@ func TestLockPOLUnlock(t *testing.T) { // then a polka at round 2 that we lock on // then we see the polka from round 1 but shouldn't unlock func TestLockPOLSafety1(t *testing.T) { - cs1, vss := simpleConsensusState(4) + cs1, vss := randConsensusState(4) cs2, cs3, cs4 := vss[1], vss[2], vss[3] proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0) @@ -799,7 +799,7 @@ func TestLockPOLSafety1(t *testing.T) { // What we want: // dont see P0, lock on P1 at R1, dont unlock using P0 at R2 func TestLockPOLSafety2(t *testing.T) { - cs1, vss := simpleConsensusState(4) + cs1, vss := randConsensusState(4) cs2, cs3, cs4 := vss[1], vss[2], vss[3] proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0) @@ -889,7 +889,7 @@ func TestLockPOLSafety2(t *testing.T) { /* func TestSlashingPrevotes(t *testing.T) { - cs1, vss := simpleConsensusState(2) + cs1, vss := randConsensusState(2) cs2 := vss[1] @@ -924,7 +924,7 @@ func TestSlashingPrevotes(t *testing.T) { } func TestSlashingPrecommits(t *testing.T) { - cs1, vss := simpleConsensusState(2) + cs1, vss := randConsensusState(2) cs2 := vss[1] @@ -969,7 +969,7 @@ func TestSlashingPrecommits(t *testing.T) { // 4 vals. // we receive a final precommit after going into next round, but others might have gone to commit already! func TestHalt1(t *testing.T) { - cs1, vss := simpleConsensusState(4) + cs1, vss := randConsensusState(4) cs2, cs3, cs4 := vss[1], vss[2], vss[3] proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0) diff --git a/consensus/wal.go b/consensus/wal.go index 5b6e3989..e2599de2 100644 --- a/consensus/wal.go +++ b/consensus/wal.go @@ -32,6 +32,8 @@ var _ = wire.RegisterInterface( // Write ahead logger writes msgs to disk before they are processed. // Can be used for crash-recovery and deterministic replay +// TODO: currently the wal is overwritten during replay catchup +// give it a mode so it's either reading or appending - must read to end to start appending again type WAL struct { fp *os.File exists bool // if the file already existed (restarted process) @@ -81,9 +83,11 @@ func (wal *WAL) SeekFromEnd(found func([]byte) bool) (nLines int, err error) { } // backup until we find the the right line + // current is how far we are from the beginning for { current -= 1 if current < 0 { + wal.fp.Seek(0, 0) // back to beginning return } // backup one and read a new byte @@ -95,6 +99,7 @@ func (wal *WAL) SeekFromEnd(found func([]byte) bool) (nLines int, err error) { return } if b[0] == '\n' || len(b) == 0 { + nLines += 1 // read a full line reader := bufio.NewReader(wal.fp) lineBytes, _ := reader.ReadBytes('\n') @@ -102,7 +107,6 @@ func (wal *WAL) SeekFromEnd(found func([]byte) bool) (nLines int, err error) { continue } - nLines += 1 if found(lineBytes) { wal.fp.Seek(0, 1) // (?) wal.fp.Seek(current, 0)