diff --git a/DOCKER/docker.sh b/DOCKER/docker.sh index d64557a1..d2eb1471 100755 --- a/DOCKER/docker.sh +++ b/DOCKER/docker.sh @@ -8,11 +8,6 @@ docker build -t tmbase -f Dockerfile . # (config and blockchain data go in here) docker run --name tmdata --entrypoint /bin/echo tmbase Data-only container for tmnode -# Copy files into the data-only container -# You should stop the containers before running this -# cd $DATA_SRC -# tar cf - . | docker run -i --rm --volumes-from mintdata mint tar xvf - -C /data/tendermint - # Run tendermint node docker run --name tmnode --volumes-from tmdata -d -p 46656:46656 -p 46657:46657 -e TMSEEDS="goldenalchemist.chaintest.net:46657" -e TMNAME="testnode" -e TMREPO="github.com/tendermint/tendermint" -e TMHEAD="origin/develop" tmbase diff --git a/cmd/tendermint/main.go b/cmd/tendermint/main.go index 6ccbc4d1..b9c4538b 100644 --- a/cmd/tendermint/main.go +++ b/cmd/tendermint/main.go @@ -34,6 +34,8 @@ Commands: switch args[0] { case "node": node.RunNode() + case "replay": + node.RunReplay() case "init": init_files() case "show_validator": diff --git a/config/tendermint/config.go b/config/tendermint/config.go index b2e220fc..e6c74025 100644 --- a/config/tendermint/config.go +++ b/config/tendermint/config.go @@ -67,6 +67,7 @@ func GetConfig(rootDir string) cfg.Config { mapConfig.SetDefault("rpc_laddr", "0.0.0.0:46657") mapConfig.SetDefault("prof_laddr", "") mapConfig.SetDefault("revision_file", rootDir+"/revision") + mapConfig.SetDefault("cs_msg_log", rootDir+"/cs_msg_log") return mapConfig } diff --git a/consensus/reactor.go b/consensus/reactor.go index 41ff77cc..608ebef0 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -239,8 +239,8 @@ func (conR *ConsensusReactor) registerEventCallbacks() { conR.broadcastNewRoundStep(rs) }) - conR.evsw.AddListenerForEvent("conR", types.EventStringVote(), func(data events.EventData) { - edv := data.(*types.EventDataVote) + conR.evsw.AddListenerForEvent("conR", types.EventStringVote(), func(data types.EventData) { + edv := data.(types.EventDataVote) conR.broadcastHasVoteMessage(edv.Vote, edv.Index) }) } diff --git a/consensus/state.go b/consensus/state.go index 721f7157..9d9e65d8 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -1,9 +1,11 @@ package consensus import ( + "bufio" "bytes" "errors" "fmt" + "os" "reflect" "sync" "time" @@ -153,20 +155,20 @@ var ( // msgs from the reactor which may update the state type msgInfo struct { - msg ConsensusMessage - peerKey string + Msg ConsensusMessage `json:"msg"` + PeerKey string `json:"peer_key"` } // internally generated messages which may update the state type timeoutInfo struct { - duration time.Duration - height int - round int - step RoundStepType + Duration time.Duration `json:"duration"` + Height int `json:"height"` + Round int `json:"round"` + Step RoundStepType `json:"step"` } func (ti *timeoutInfo) String() string { - return fmt.Sprintf("%v ; %d/%d %v", ti.duration, ti.height, ti.round, ti.step) + return fmt.Sprintf("%v ; %d/%d %v", ti.Duration, ti.Height, ti.Round, ti.Step) } // Tracks consensus state across block heights and rounds. @@ -190,6 +192,8 @@ type ConsensusState struct { evsw *events.EventSwitch + msgLogFP *os.File // file we write all msgs and timeouts to, in order (for deterministic replay) + nSteps int // used for testing to limit the number of transitions the state makes } @@ -268,6 +272,71 @@ func (cs *ConsensusState) startRoutines(maxSteps int) { func (cs *ConsensusState) OnStop() { cs.QuitService.OnStop() + if cs.msgLogFP != nil { + cs.msgLogFP.Close() + } +} + +func (cs *ConsensusState) OpenFileForMessageLog(file string) (err error) { + cs.mtx.Lock() + defer cs.mtx.Unlock() + cs.msgLogFP, err = os.OpenFile(file, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666) + return err +} + +// Playback with tests +func (cs ConsensusState) ReplayMessagesFromFile(file string) error { + if cs.IsRunning() { + return errors.New("cs is already running, cannot replay") + } + + cs.BaseService.OnStart() + cs.startRoutines(0) + + if cs.msgLogFP != nil { + cs.msgLogFP.Close() + cs.msgLogFP = nil + } + + // we ensure all new step events are regenerated as expected + newStepCh := cs.evsw.SubscribeToEvent("replay-test", types.EventStringNewRoundStep(), 1) + + fp, err := os.OpenFile(file, os.O_RDONLY, 0666) + if err != nil { + return err + } + defer fp.Close() + scanner := bufio.NewScanner(fp) + for scanner.Scan() { + var err error + var msg ConsensusLogMessage + wire.ReadJSON(&msg, scanner.Bytes(), &err) + if err != nil { + return fmt.Errorf("Error reading json data: %v", err) + } + log.Notice("Replaying message", "type", reflect.TypeOf(msg.Msg), "msg", msg.Msg) + switch m := msg.Msg.(type) { + case *types.EventDataRoundState: + // these are playback checks + mi := <-newStepCh + m2 := mi.(*types.EventDataRoundState) + if m.Height != m2.Height || m.Round != m2.Round || m.Step != m2.Step { + return fmt.Errorf("RoundState mismatch. Got %v; Expected %v", m2, m) + } + case msgInfo: + // internal or from peer + if m.PeerKey == "" { + cs.internalMsgQueue <- m + } else { + cs.peerMsgQueue <- m + } + case timeoutInfo: + cs.tockChan <- m + default: + return fmt.Errorf("Unknown ConsensusLogMessage type: %v", reflect.TypeOf(msg.Msg)) + } + } + return nil } //------------------------------------------------------------ @@ -455,10 +524,12 @@ func (cs *ConsensusState) updateToState(state *sm.State) { } func (cs *ConsensusState) newStep() { + rs := cs.RoundStateEvent() + cs.saveMsg(rs) cs.nSteps += 1 // newStep is called by updateToStep in NewConsensusState before the evsw is set! if cs.evsw != nil { - cs.evsw.FireEvent(types.EventStringNewRoundStep(), cs.RoundStateEvent()) + cs.evsw.FireEvent(types.EventStringNewRoundStep(), rs) } } @@ -477,13 +548,13 @@ func (cs *ConsensusState) timeoutRoutine() { log.Debug("Received tick", "old_ti", ti, "new_ti", newti) // ignore tickers for old height/round/step - if newti.height < ti.height { + if newti.Height < ti.Height { continue - } else if newti.height == ti.height { - if newti.round < ti.round { + } else if newti.Height == ti.Height { + if newti.Round < ti.Round { continue - } else if newti.round == ti.round { - if ti.step > 0 && newti.step <= ti.step { + } else if newti.Round == ti.Round { + if ti.Step > 0 && newti.Step <= ti.Step { continue } } @@ -492,16 +563,16 @@ func (cs *ConsensusState) timeoutRoutine() { ti = newti // if the newti has duration == 0, we relay to the tockChan immediately (no timeout) - if ti.duration == time.Duration(0) { + if ti.Duration == time.Duration(0) { go func(t timeoutInfo) { cs.tockChan <- t }(ti) continue } - log.Info("Scheduling timeout", "dur", ti.duration, "height", ti.height, "round", ti.round, "step", ti.step) + log.Debug("Scheduling timeout", "dur", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step) cs.timeoutTicker.Stop() - cs.timeoutTicker = time.NewTicker(ti.duration) + cs.timeoutTicker = time.NewTicker(ti.Duration) case <-cs.timeoutTicker.C: - log.Info("Timed out", "dur", ti.duration, "height", ti.height, "round", ti.round, "step", ti.step) + log.Info("Timed out", "dur", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step) cs.timeoutTicker.Stop() // go routine here gaurantees timeoutRoutine doesn't block. // Determinism comes from playback in the receiveRoutine. @@ -537,13 +608,16 @@ func (cs *ConsensusState) receiveRoutine(maxSteps int) { select { case mi = <-cs.peerMsgQueue: + cs.saveMsg(mi) // handles proposals, block parts, votes // may generate internal events (votes, complete proposals, 2/3 majorities) cs.handleMsg(mi, rs) case mi = <-cs.internalMsgQueue: + cs.saveMsg(mi) // handles proposals, block parts, votes cs.handleMsg(mi, rs) case ti := <-cs.tockChan: + cs.saveMsg(ti) // if the timeout is relevant to the rs // go to the next step cs.handleTimeout(ti, rs) @@ -559,7 +633,7 @@ func (cs *ConsensusState) handleMsg(mi msgInfo, rs RoundState) { defer cs.mtx.Unlock() var err error - msg, peerKey := mi.msg, mi.peerKey + msg, peerKey := mi.Msg, mi.PeerKey switch msg := msg.(type) { case *ProposalMessage: // will not cause transition. @@ -592,10 +666,10 @@ func (cs *ConsensusState) handleMsg(mi msgInfo, rs RoundState) { } func (cs *ConsensusState) handleTimeout(ti timeoutInfo, rs RoundState) { - log.Debug("Received tock", "timeout", ti.duration, "height", ti.height, "round", ti.round, "step", ti.step) + log.Debug("Received tock", "timeout", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step) // timeouts must be for current height, round, step - if ti.height != rs.Height || ti.round < rs.Round || (ti.round == rs.Round && ti.step < rs.Step) { + if ti.Height != rs.Height || ti.Round < rs.Round || (ti.Round == rs.Round && ti.Step < rs.Step) { log.Debug("Ignoring tock because we're ahead", "height", rs.Height, "round", rs.Round, "step", rs.Step) return } @@ -604,22 +678,22 @@ func (cs *ConsensusState) handleTimeout(ti timeoutInfo, rs RoundState) { cs.mtx.Lock() defer cs.mtx.Unlock() - switch ti.step { + switch ti.Step { case RoundStepNewHeight: // NewRound event fired from enterNewRound. - // Do we want a timeout event too? - cs.enterNewRound(ti.height, 0) + // XXX: should we fire timeout here? + cs.enterNewRound(ti.Height, 0) case RoundStepPropose: cs.evsw.FireEvent(types.EventStringTimeoutPropose(), cs.RoundStateEvent()) - cs.enterPrevote(ti.height, ti.round) + cs.enterPrevote(ti.Height, ti.Round) case RoundStepPrevoteWait: cs.evsw.FireEvent(types.EventStringTimeoutWait(), cs.RoundStateEvent()) - cs.enterPrecommit(ti.height, ti.round) + cs.enterPrecommit(ti.Height, ti.Round) case RoundStepPrecommitWait: cs.evsw.FireEvent(types.EventStringTimeoutWait(), cs.RoundStateEvent()) - cs.enterNewRound(ti.height, ti.round+1) + cs.enterNewRound(ti.Height, ti.Round+1) default: - panic(Fmt("Invalid timeout step: %v", ti.step)) + panic(Fmt("Invalid timeout step: %v", ti.Step)) } } @@ -1304,7 +1378,7 @@ func (cs *ConsensusState) addVote(valIndex int, vote *types.Vote, peerKey string added, address, err = cs.LastCommit.AddByIndex(valIndex, vote) if added { log.Info(Fmt("Added to lastPrecommits: %v", cs.LastCommit.StringShort())) - cs.evsw.FireEvent(types.EventStringVote(), &types.EventDataVote{valIndex, address, vote}) + cs.evsw.FireEvent(types.EventStringVote(), types.EventDataVote{valIndex, address, vote}) } return @@ -1315,7 +1389,7 @@ func (cs *ConsensusState) addVote(valIndex int, vote *types.Vote, peerKey string height := cs.Height added, address, err = cs.Votes.AddByIndex(valIndex, vote, peerKey) if added { - cs.evsw.FireEvent(types.EventStringVote(), &types.EventDataVote{valIndex, address, vote}) + cs.evsw.FireEvent(types.EventStringVote(), types.EventDataVote{valIndex, address, vote}) switch vote.Type { case types.VoteTypePrevote: @@ -1414,6 +1488,66 @@ func (cs *ConsensusState) signAddVote(type_ byte, hash []byte, header types.Part } } +// Save Block, save the +2/3 Commits we've seen +func (cs *ConsensusState) saveBlock(block *types.Block, blockParts *types.PartSet, commits *types.VoteSet) { + + // The proposal must be valid. + if err := cs.stageBlock(block, blockParts); err != nil { + PanicSanity(Fmt("saveBlock() an invalid block: %v", err)) + } + + // Save to blockStore. + if cs.blockStore.Height() < block.Height { + seenValidation := commits.MakeValidation() + cs.blockStore.SaveBlock(block, blockParts, seenValidation) + } + + // Commit to proxyAppCtx + err := cs.stagedState.Commit(cs.proxyAppCtx) + if err != nil { + // TODO: handle this gracefully. + PanicQ(Fmt("Commit failed for applicaiton")) + } + + // Save the state. + cs.stagedState.Save() + + // Update mempool. + cs.mempool.Update(block) + + // Fire off event + if cs.evsw != nil && cs.evc != nil { + cs.evsw.FireEvent(types.EventStringNewBlock(), types.EventDataNewBlock{block}) + go cs.evc.Flush() + } + +} + +func (cs *ConsensusState) saveMsg(msg ConsensusLogMessageInterface) { + if cs.msgLogFP != nil { + var n int + var err error + wire.WriteJSON(ConsensusLogMessage{msg}, cs.msgLogFP, &n, &err) + wire.WriteTo([]byte("\n"), cs.msgLogFP, &n, &err) // one message per line + if err != nil { + log.Error("Error writing to consensus message log file", "err", err, "msg", msg) + } + } +} + +type ConsensusLogMessage struct { + Msg ConsensusLogMessageInterface `json:"msg"` +} + +type ConsensusLogMessageInterface interface{} + +var _ = wire.RegisterInterface( + struct{ ConsensusLogMessageInterface }{}, + wire.ConcreteType{&types.EventDataRoundState{}, 0x01}, + wire.ConcreteType{msgInfo{}, 0x02}, + wire.ConcreteType{timeoutInfo{}, 0x03}, +) + //--------------------------------------------------------- func CompareHRS(h1, r1 int, s1 RoundStepType, h2, r2 int, s2 RoundStepType) int { diff --git a/node/node.go b/node/node.go index ab06d48c..a5f2fbda 100644 --- a/node/node.go +++ b/node/node.go @@ -84,6 +84,9 @@ func NewNode(privValidator *types.PrivValidator) *Node { consensusReactor.SetPrivValidator(privValidator) } + // deterministic accountability + consensusState.OpenFileForMessageLog(config.GetString("cs_msg_log")) + // Make p2p network switch sw := p2p.NewSwitch() sw.AddReactor("MEMPOOL", mempoolReactor) @@ -308,6 +311,47 @@ func RunNode() { }) } +func RunReplay() { + msgLogFile := config.GetString("cs_msg_log") + if msgLogFile == "" { + Exit("cs_msg_log file name not set in tendermint config") + } + + // Get BlockStore + blockStoreDB := dbm.GetDB("blockstore") + blockStore := bc.NewBlockStore(blockStoreDB) + + // Get State + stateDB := dbm.GetDB("state") + state := sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file")) + + // Create two proxyAppCtx connections, + // one for the consensus and one for the mempool. + proxyAddr := config.GetString("proxy_app") + proxyAppCtxMempool := getProxyApp(proxyAddr, state.LastAppHash) + proxyAppCtxConsensus := getProxyApp(proxyAddr, state.LastAppHash) + + // add the chainid to the global config + config.Set("chain_id", state.ChainID) + + // Make event switch + eventSwitch := events.NewEventSwitch() + _, err := eventSwitch.Start() + if err != nil { + Exit(Fmt("Failed to start event switch: %v", err)) + } + + mempool := mempl.NewMempool(proxyAppCtxMempool) + + consensusState := consensus.NewConsensusState(state.Copy(), proxyAppCtxConsensus, blockStore, mempool) + consensusState.SetEventSwitch(eventSwitch) + + if err := consensusState.ReplayMessagesFromFile(msgLogFile); err != nil { + Exit(Fmt("Error during consensus replay: %v", err)) + } + log.Notice("Replay run successfully") +} + // Load the most recent state from "state" db, // or create a new one (and save) from genesis. func getState() *sm.State { diff --git a/proxy/remote_app_conn.go b/proxy/remote_app_conn.go index a1dff9a8..f95f1e70 100644 --- a/proxy/remote_app_conn.go +++ b/proxy/remote_app_conn.go @@ -106,7 +106,6 @@ func (app *remoteAppConn) sendRequestsRoutine() { app.StopForError(err) return } - log.Debug("Sent request", "requestType", reflect.TypeOf(reqres.Request), "request", reqres.Request) if _, ok := reqres.Request.(tmsp.RequestFlush); ok { err = app.bufWriter.Flush() if err != nil { @@ -133,7 +132,6 @@ func (app *remoteAppConn) recvResponseRoutine() { case tmsp.ResponseException: app.StopForError(errors.New(res.Error)) default: - log.Debug("Received response", "responseType", reflect.TypeOf(res), "response", res) err := app.didRecvResponse(res) if err != nil { app.StopForError(err) diff --git a/types/events.go b/types/events.go index a316cfc5..72aabb63 100644 --- a/types/events.go +++ b/types/events.go @@ -52,7 +52,7 @@ var _ = wire.RegisterInterface( // wire.ConcreteType{EventDataFork{}, EventDataTypeFork }, wire.ConcreteType{EventDataTx{}, EventDataTypeTx}, wire.ConcreteType{EventDataApp{}, EventDataTypeApp}, - wire.ConcreteType{EventDataRoundState{}, EventDataTypeRoundState}, + wire.ConcreteType{&EventDataRoundState{}, EventDataTypeRoundState}, // a pointer because we use it internally wire.ConcreteType{EventDataVote{}, EventDataTypeVote}, )