diff --git a/DOCKER/docker.sh b/DOCKER/docker.sh index d64557a1..d2eb1471 100755 --- a/DOCKER/docker.sh +++ b/DOCKER/docker.sh @@ -8,11 +8,6 @@ docker build -t tmbase -f Dockerfile . # (config and blockchain data go in here) docker run --name tmdata --entrypoint /bin/echo tmbase Data-only container for tmnode -# Copy files into the data-only container -# You should stop the containers before running this -# cd $DATA_SRC -# tar cf - . | docker run -i --rm --volumes-from mintdata mint tar xvf - -C /data/tendermint - # Run tendermint node docker run --name tmnode --volumes-from tmdata -d -p 46656:46656 -p 46657:46657 -e TMSEEDS="goldenalchemist.chaintest.net:46657" -e TMNAME="testnode" -e TMREPO="github.com/tendermint/tendermint" -e TMHEAD="origin/develop" tmbase diff --git a/cmd/tendermint/main.go b/cmd/tendermint/main.go index 81c55f32..64ac87e5 100644 --- a/cmd/tendermint/main.go +++ b/cmd/tendermint/main.go @@ -35,6 +35,12 @@ Commands: switch args[0] { case "node": node.RunNode() + case "replay": + if len(args) > 1 && args[1] == "console" { + node.RunReplayConsole() + } else { + node.RunReplay() + } case "init": init_files() case "show_validator": diff --git a/config/tendermint/config.go b/config/tendermint/config.go index b2e220fc..bdeb6add 100644 --- a/config/tendermint/config.go +++ b/config/tendermint/config.go @@ -67,6 +67,7 @@ func GetConfig(rootDir string) cfg.Config { mapConfig.SetDefault("rpc_laddr", "0.0.0.0:46657") mapConfig.SetDefault("prof_laddr", "") mapConfig.SetDefault("revision_file", rootDir+"/revision") + mapConfig.SetDefault("cswal", rootDir+"/cswal") return mapConfig } diff --git a/config/tendermint_test/config.go b/config/tendermint_test/config.go index 82047ded..3ded9b1e 100644 --- a/config/tendermint_test/config.go +++ b/config/tendermint_test/config.go @@ -57,9 +57,8 @@ func initTMRoot(rootDir string) { if !FileExists(genesisFilePath) { MustWriteFile(genesisFilePath, []byte(defaultGenesis), 0644) } - if !FileExists(privFilePath) { - MustWriteFile(privFilePath, []byte(defaultPrivValidator), 0644) - } + // we always overwrite the priv val + MustWriteFile(privFilePath, []byte(defaultPrivValidator), 0644) } func GetConfig(rootDir string) cfg.Config { @@ -92,6 +91,7 @@ func GetConfig(rootDir string) cfg.Config { mapConfig.SetDefault("rpc_laddr", "0.0.0.0:36657") mapConfig.SetDefault("prof_laddr", "") mapConfig.SetDefault("revision_file", rootDir+"/revision") + mapConfig.SetDefault("cswal", rootDir+"/cswal") return mapConfig } diff --git a/consensus/common.go b/consensus/common.go new file mode 100644 index 00000000..ec1a0fa9 --- /dev/null +++ b/consensus/common.go @@ -0,0 +1,15 @@ +package consensus + +import ( + "github.com/tendermint/go-events" +) + +// NOTE: this is blocking +func subscribeToEvent(evsw *events.EventSwitch, receiver, eventID string, chanCap int) chan interface{} { + // listen for new round + ch := make(chan interface{}, chanCap) + evsw.AddListenerForEvent(receiver, eventID, func(data events.EventData) { + ch <- data + }) + return ch +} diff --git a/consensus/common_test.go b/consensus/common_test.go index c70db420..08912665 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -140,7 +140,7 @@ func addVoteToFromMany(to *ConsensusState, votes []*types.Vote, froms ...*valida func addVoteToFrom(to *ConsensusState, from *validatorStub, vote *types.Vote) { valIndex, _ := to.Validators.GetByAddress(from.PrivValidator.Address) - to.peerMsgQueue <- msgInfo{msg: &VoteMessage{valIndex, vote}} + to.peerMsgQueue <- msgInfo{Msg: &VoteMessage{valIndex, vote}} // added, err := to.TryAddVote(valIndex, vote, "") /* if _, ok := err.(*types.ErrVoteConflictingSignature); ok { @@ -296,16 +296,16 @@ func validatePrevoteAndPrecommit(t *testing.T, cs *ConsensusState, thisRound, lo cs.mtx.Unlock() } -func simpleConsensusState(nValidators int) (*ConsensusState, []*validatorStub) { - // Get State - state, privVals := randGenesisState(nValidators, false, 10) +func fixedConsensusState() *ConsensusState { + stateDB := dbm.NewMemDB() + state := sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file")) + privValidatorFile := config.GetString("priv_validator_file") + privValidator := types.LoadOrGenPrivValidator(privValidatorFile) + return newConsensusState(state, privValidator) - // fmt.Println(state.Validators) - - vss := make([]*validatorStub, nValidators) - - // make consensus state for lead validator +} +func newConsensusState(state *sm.State, pv *types.PrivValidator) *ConsensusState { // Get BlockStore blockDB := dbm.NewMemDB() blockStore := bc.NewBlockStore(blockDB) @@ -320,14 +320,21 @@ func simpleConsensusState(nValidators int) (*ConsensusState, []*validatorStub) { // Make ConsensusReactor cs := NewConsensusState(state, proxyAppConnCon, blockStore, mempool) - cs.SetPrivValidator(privVals[0]) + cs.SetPrivValidator(pv) evsw := events.NewEventSwitch() cs.SetEventSwitch(evsw) evsw.Start() + return cs +} - // start the transition routines - // cs.startRoutines() +func randConsensusState(nValidators int) (*ConsensusState, []*validatorStub) { + // Get State + state, privVals := randGenesisState(nValidators, false, 10) + + vss := make([]*validatorStub, nValidators) + + cs := newConsensusState(state, privVals[0]) for i := 0; i < nValidators; i++ { vss[i] = NewValidatorStub(privVals[i]) @@ -344,7 +351,7 @@ func subscribeToVoter(cs *ConsensusState, addr []byte) chan interface{} { go func() { for { v := <-voteCh0 - vote := v.(*types.EventDataVote) + vote := v.(types.EventDataVote) // we only fire for our own votes if bytes.Equal(addr, vote.Address) { voteCh <- v @@ -386,13 +393,3 @@ func startTestRound(cs *ConsensusState, height, round int) { cs.enterNewRound(height, round) cs.startRoutines(0) } - -// NOTE: this is blocking -func subscribeToEvent(evsw *events.EventSwitch, receiver, eventID string, chanCap int) chan interface{} { - // listen for new round - ch := make(chan interface{}, chanCap) - evsw.AddListenerForEvent(receiver, eventID, func(data events.EventData) { - ch <- data - }) - return ch -} diff --git a/consensus/reactor.go b/consensus/reactor.go index 493fb2e2..8f4cacee 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -240,7 +240,7 @@ func (conR *ConsensusReactor) registerEventCallbacks() { }) conR.evsw.AddListenerForEvent("conR", types.EventStringVote(), func(data events.EventData) { - edv := data.(*types.EventDataVote) + edv := data.(types.EventDataVote) conR.broadcastHasVoteMessage(edv.Vote, edv.Index) }) } diff --git a/consensus/replay.go b/consensus/replay.go new file mode 100644 index 00000000..2de357b2 --- /dev/null +++ b/consensus/replay.go @@ -0,0 +1,360 @@ +package consensus + +import ( + "bufio" + "errors" + "fmt" + "io" + "os" + "reflect" + "strconv" + "strings" + "time" + + . "github.com/tendermint/go-common" + "github.com/tendermint/go-wire" + + sm "github.com/tendermint/tendermint/state" + "github.com/tendermint/tendermint/types" +) + +// unmarshal and apply a single message to the consensus state +func (cs *ConsensusState) readReplayMessage(msgBytes []byte, newStepCh chan interface{}) error { + var err error + var msg ConsensusLogMessage + wire.ReadJSON(&msg, msgBytes, &err) + if err != nil { + fmt.Println(string(msgBytes)) + return fmt.Errorf("Error reading json data: %v", err) + } + + // for logging + switch m := msg.Msg.(type) { + case types.EventDataRoundState: + log.Notice("New Step", "height", m.Height, "round", m.Round, "step", m.Step) + // these are playback checks + ticker := time.After(time.Second * 2) + if newStepCh != nil { + select { + case mi := <-newStepCh: + m2 := mi.(types.EventDataRoundState) + if m.Height != m2.Height || m.Round != m2.Round || m.Step != m2.Step { + return fmt.Errorf("RoundState mismatch. Got %v; Expected %v", m2, m) + } + case <-ticker: + return fmt.Errorf("Failed to read off newStepCh") + } + } + case msgInfo: + peerKey := m.PeerKey + if peerKey == "" { + peerKey = "local" + } + switch msg := m.Msg.(type) { + case *ProposalMessage: + p := msg.Proposal + log.Notice("Proposal", "height", p.Height, "round", p.Round, "header", + p.BlockPartsHeader, "pol", p.POLRound, "peer", peerKey) + case *BlockPartMessage: + log.Notice("BlockPart", "height", msg.Height, "round", msg.Round, "peer", peerKey) + case *VoteMessage: + v := msg.Vote + log.Notice("Vote", "height", v.Height, "round", v.Round, "type", v.Type, + "hash", v.BlockHash, "header", v.BlockPartsHeader, "peer", peerKey) + } + // internal or from peer + if m.PeerKey == "" { + cs.internalMsgQueue <- m + } else { + cs.peerMsgQueue <- m + } + case timeoutInfo: + log.Notice("Timeout", "height", m.Height, "round", m.Round, "step", m.Step, "dur", m.Duration) + cs.tockChan <- m + default: + return fmt.Errorf("Unknown ConsensusLogMessage type: %v", reflect.TypeOf(msg.Msg)) + } + return nil +} + +// replay only those messages since the last block +func (cs *ConsensusState) catchupReplay(height int) error { + if cs.wal == nil { + log.Warn("consensus msg log is nil") + return nil + } + if !cs.wal.exists { + // new wal, nothing to catchup on + return nil + } + + // starting from end of file, + // read messages until a new height is found + nLines, err := cs.wal.SeekFromEnd(func(lineBytes []byte) bool { + var err error + var msg ConsensusLogMessage + wire.ReadJSON(&msg, lineBytes, &err) + if err != nil { + panic(Fmt("Failed to read cs_msg_log json: %v", err)) + } + m, ok := msg.Msg.(types.EventDataRoundState) + if ok && m.Step == RoundStepNewHeight.String() { + // TODO: ensure the height matches + return true + } + return false + }) + + if err != nil { + return err + } + + var beginning bool // if we had to go back to the beginning + if c, _ := cs.wal.fp.Seek(0, 1); c == 0 { + beginning = true + } + + log.Notice("Catchup by replaying consensus messages", "n", nLines) + + // now we can replay the latest nLines on consensus state + // note we can't use scan because we've already been reading from the file + reader := bufio.NewReader(cs.wal.fp) + for i := 0; i < nLines; i++ { + msgBytes, err := reader.ReadBytes('\n') + if err == io.EOF { + break + } else if err != nil { + return err + } else if len(msgBytes) == 0 { + continue + } + // the first msg is (usually) the NewHeight event, so we can ignore it + if !beginning && i == 1 { + continue + } + + // NOTE: since the priv key is set when the msgs are received + // it will attempt to eg double sign but we can just ignore it + // since the votes will be replayed and we'll get to the next step + if err := cs.readReplayMessage(msgBytes, nil); err != nil { + return err + } + } + log.Info("Done catchup replay") + return nil +} + +//-------------------------------------------------------- +// replay messages interactively or all at once + +// Interactive playback +func (cs ConsensusState) ReplayConsole(file string) error { + return cs.replay(file, true) +} + +// Full playback, with tests +func (cs ConsensusState) ReplayMessages(file string) error { + return cs.replay(file, false) +} + +// replay all msgs or start the console +func (cs *ConsensusState) replay(file string, console bool) error { + if cs.IsRunning() { + return errors.New("cs is already running, cannot replay") + } + if cs.wal != nil { + return errors.New("cs wal is open, cannot replay") + } + + cs.startForReplay() + + // ensure all new step events are regenerated as expected + newStepCh := subscribeToEvent(cs.evsw, "replay-test", types.EventStringNewRoundStep(), 1) + + // just open the file for reading, no need to use wal + fp, err := os.OpenFile(file, os.O_RDONLY, 0666) + if err != nil { + return err + } + + pb := newPlayback(file, fp, cs, cs.state.Copy()) + defer pb.fp.Close() + + var nextN int // apply N msgs in a row + for pb.scanner.Scan() { + if nextN == 0 && console { + nextN = pb.replayConsoleLoop() + } + + if err := pb.cs.readReplayMessage(pb.scanner.Bytes(), newStepCh); err != nil { + return err + } + + if nextN > 0 { + nextN -= 1 + } + pb.count += 1 + } + return nil +} + +//------------------------------------------------ +// playback manager + +type playback struct { + cs *ConsensusState + + fp *os.File + scanner *bufio.Scanner + count int // how many lines/msgs into the file are we + + // replays can be reset to beginning + fileName string // so we can close/reopen the file + genesisState *sm.State // so the replay session knows where to restart from +} + +func newPlayback(fileName string, fp *os.File, cs *ConsensusState, genState *sm.State) *playback { + return &playback{ + cs: cs, + fp: fp, + fileName: fileName, + genesisState: genState, + scanner: bufio.NewScanner(fp), + } +} + +// go back count steps by resetting the state and running (pb.count - count) steps +func (pb *playback) replayReset(count int, newStepCh chan interface{}) error { + + pb.cs.Stop() + + newCs := NewConsensusState(pb.genesisState.Copy(), pb.cs.proxyAppConn, pb.cs.blockStore, pb.cs.mempool) + newCs.SetEventSwitch(pb.cs.evsw) + + newCs.startForReplay() + + pb.fp.Close() + fp, err := os.OpenFile(pb.fileName, os.O_RDONLY, 0666) + if err != nil { + return err + } + pb.fp = fp + pb.scanner = bufio.NewScanner(fp) + count = pb.count - count + log.Notice(Fmt("Reseting from %d to %d", pb.count, count)) + pb.count = 0 + pb.cs = newCs + for i := 0; pb.scanner.Scan() && i < count; i++ { + if err := pb.cs.readReplayMessage(pb.scanner.Bytes(), newStepCh); err != nil { + return err + } + pb.count += 1 + } + return nil +} + +func (cs *ConsensusState) startForReplay() { + cs.BaseService.OnStart() + go cs.receiveRoutine(0) + // since we replay tocks we just ignore ticks + go func() { + for { + select { + case <-cs.tickChan: + case <-cs.Quit: + return + } + } + }() +} + +// console function for parsing input and running commands +func (pb *playback) replayConsoleLoop() int { + for { + fmt.Printf("> ") + bufReader := bufio.NewReader(os.Stdin) + line, more, err := bufReader.ReadLine() + if more { + Exit("input is too long") + } else if err != nil { + Exit(err.Error()) + } + + tokens := strings.Split(string(line), " ") + if len(tokens) == 0 { + continue + } + + switch tokens[0] { + case "next": + // "next" -> replay next message + // "next N" -> replay next N messages + + if len(tokens) == 1 { + return 0 + } else { + i, err := strconv.Atoi(tokens[1]) + if err != nil { + fmt.Println("next takes an integer argument") + } else { + return i + } + } + + case "back": + // "back" -> go back one message + // "back N" -> go back N messages + + // NOTE: "back" is not supported in the state machine design, + // so we restart and replay up to + + // ensure all new step events are regenerated as expected + newStepCh := subscribeToEvent(pb.cs.evsw, "replay-test", types.EventStringNewRoundStep(), 1) + if len(tokens) == 1 { + pb.replayReset(1, newStepCh) + } else { + i, err := strconv.Atoi(tokens[1]) + if err != nil { + fmt.Println("back takes an integer argument") + } else if i > pb.count { + fmt.Printf("argument to back must not be larger than the current count (%d)\n", pb.count) + } else { + pb.replayReset(i, newStepCh) + } + } + + case "rs": + // "rs" -> print entire round state + // "rs short" -> print height/round/step + // "rs " -> print another field of the round state + + rs := pb.cs.RoundState + if len(tokens) == 1 { + fmt.Println(rs) + } else { + switch tokens[1] { + case "short": + fmt.Printf("%v/%v/%v\n", rs.Height, rs.Round, rs.Step) + case "validators": + fmt.Println(rs.Validators) + case "proposal": + fmt.Println(rs.Proposal) + case "proposal_block": + fmt.Printf("%v %v\n", rs.ProposalBlockParts.StringShort(), rs.ProposalBlock.StringShort()) + case "locked_round": + fmt.Println(rs.LockedRound) + case "locked_block": + fmt.Printf("%v %v\n", rs.LockedBlockParts.StringShort(), rs.LockedBlock.StringShort()) + case "votes": + fmt.Println(rs.Votes.StringIndented(" ")) + + default: + fmt.Println("Unknown option", tokens[1]) + } + } + case "n": + fmt.Println(pb.count) + } + } + return 0 +} diff --git a/consensus/replay_test.go b/consensus/replay_test.go new file mode 100644 index 00000000..2e93450a --- /dev/null +++ b/consensus/replay_test.go @@ -0,0 +1,72 @@ +package consensus + +import ( + "io/ioutil" + "os" + "testing" + "time" + + "github.com/tendermint/tendermint/types" +) + +var testLog = `{"time":"2016-01-18T20:46:00.774Z","msg":[3,{"duration":982632969,"height":1,"round":0,"step":1}]} +{"time":"2016-01-18T20:46:00.776Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPropose"}]} +{"time":"2016-01-18T20:46:00.776Z","msg":[2,{"msg":[17,{"Proposal":{"height":1,"round":0,"block_parts_header":{"total":1,"hash":"B6227255FF20758326B0B7DFF529F20E33E58F45"},"pol_round":-1,"signature":"A1803A1364F6398C154FE45D5649A89129039F18A0FE42B211BADFDF6E81EA53F48F83D3610DDD848C3A5284D3F09BDEB26FA1D856BDF70D48C507BF2453A70E"}}],"peer_key":""}]} +{"time":"2016-01-18T20:46:00.777Z","msg":[2,{"msg":[19,{"Height":1,"Round":0,"Part":{"index":0,"bytes":"0101010F74656E6465726D696E745F746573740101142AA030B15DDFC000000000000000000000000000000114C4B01D3810579550997AC5641E759E20D99B51C10001000100","proof":{"aunts":[]}}}],"peer_key":""}]} +{"time":"2016-01-18T20:46:00.781Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPrevote"}]} +{"time":"2016-01-18T20:46:00.781Z","msg":[2,{"msg":[20,{"ValidatorIndex":0,"Vote":{"height":1,"round":0,"type":1,"block_hash":"E05D1DB8DEC7CDA507A42C8FF208EE4317C663F6","block_parts_header":{"total":1,"hash":"B6227255FF20758326B0B7DFF529F20E33E58F45"},"signature":"88F5708C802BEE54EFBF438967FBC6C6EAAFC41258A85D92B9B055481175BE9FA71007B1AAF2BFBC3BF3CC0542DB48A9812324B7BBA7307446CCDBF029077F07"}}],"peer_key":""}]} +{"time":"2016-01-18T20:46:00.786Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPrecommit"}]} +{"time":"2016-01-18T20:46:00.786Z","msg":[2,{"msg":[20,{"ValidatorIndex":0,"Vote":{"height":1,"round":0,"type":2,"block_hash":"E05D1DB8DEC7CDA507A42C8FF208EE4317C663F6","block_parts_header":{"total":1,"hash":"B6227255FF20758326B0B7DFF529F20E33E58F45"},"signature":"65B0C9D2A8C9919FC9B036F82C3F1818E706E8BC066A78D99D3316E4814AB06594841E387B323AA7773F926D253C1E4D4A0930F7A8C8AE1E838CA15C673B2B02"}}],"peer_key":""}]} +` + +func TestReplayCatchup(t *testing.T) { + // write the needed wal to file + f, err := ioutil.TempFile(os.TempDir(), "replay_test_") + if err != nil { + t.Fatal(err) + } + name := f.Name() + _, err = f.WriteString(testLog) + if err != nil { + t.Fatal(err) + } + f.Close() + + cs := fixedConsensusState() + + // we've already precommitted on the first block + // without replay catchup we would be halted here forever + cs.privValidator.LastHeight = 1 // first block + cs.privValidator.LastStep = 3 // precommit + + newBlockCh := subscribeToEvent(cs.evsw, "tester", types.EventStringNewBlock(), 0) + + // start timeout and receive routines + cs.startRoutines(0) + + // open wal and run catchup messages + openWAL(t, cs, name) + if err := cs.catchupReplay(cs.Height); err != nil { + t.Fatalf("Error on catchup replay %v", err) + } + + cs.enterNewRound(cs.Height, cs.Round) + + after := time.After(time.Second * 2) + select { + case <-newBlockCh: + case <-after: + t.Fatal("Timed out waiting for new block") + } + +} + +func openWAL(t *testing.T, cs *ConsensusState, file string) { + // open the wal + wal, err := NewWAL(file) + if err != nil { + t.Fatal(err) + } + wal.exists = true + cs.wal = wal +} diff --git a/consensus/state.go b/consensus/state.go index 84d7fe2f..2fa3cd2c 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -19,7 +19,8 @@ import ( ) var ( - timeoutPropose = 3000 * time.Millisecond // Maximum duration of RoundStepPropose + timeoutPropose0 = 3000 * time.Millisecond // Wait this long for a proposal + timeoutProposeDelta = 0500 * time.Millisecond // timeoutProposeN is timeoutPropose0 + timeoutProposeDelta*N timeoutPrevote0 = 1000 * time.Millisecond // After any +2/3 prevotes received, wait this long for stragglers. timeoutPrevoteDelta = 0500 * time.Millisecond // timeoutPrevoteN is timeoutPrevote0 + timeoutPrevoteDelta*N timeoutPrecommit0 = 1000 * time.Millisecond // After any +2/3 precommits received, wait this long for stragglers. @@ -153,20 +154,20 @@ var ( // msgs from the reactor which may update the state type msgInfo struct { - msg ConsensusMessage - peerKey string + Msg ConsensusMessage `json:"msg"` + PeerKey string `json:"peer_key"` } // internally generated messages which may update the state type timeoutInfo struct { - duration time.Duration - height int - round int - step RoundStepType + Duration time.Duration `json:"duration"` + Height int `json:"height"` + Round int `json:"round"` + Step RoundStepType `json:"step"` } func (ti *timeoutInfo) String() string { - return fmt.Sprintf("%v ; %d/%d %v", ti.duration, ti.height, ti.round, ti.step) + return fmt.Sprintf("%v ; %d/%d %v", ti.Duration, ti.Height, ti.Round, ti.Step) } // Tracks consensus state across block heights and rounds. @@ -190,6 +191,8 @@ type ConsensusState struct { evsw *events.EventSwitch + wal *WAL + nSteps int // used for testing to limit the number of transitions the state makes } @@ -248,14 +251,21 @@ func (cs *ConsensusState) SetPrivValidator(priv *types.PrivValidator) { } func (cs *ConsensusState) OnStart() error { - cs.BaseService.OnStart() + cs.QuitService.OnStart() - // first we schedule the round (no go routines) - // then we start the timeout and receive routines. - // tickChan is buffered so scheduleRound0 will finish. - // Then all further access to the RoundState is through the receiveRoutine - cs.scheduleRound0(cs.Height) + // start timeout and receive routines cs.startRoutines(0) + + // we may have lost some votes if the process crashed + // reload from consensus log to catchup + if err := cs.catchupReplay(cs.Height); err != nil { + log.Error("Error on catchup replay", "error", err.Error()) + // let's go for it anyways, maybe we're fine + } + + // schedule the first round! + cs.scheduleRound0(cs.Height) + return nil } @@ -270,6 +280,18 @@ func (cs *ConsensusState) OnStop() { cs.QuitService.OnStop() } +// Open file to log all consensus messages and timeouts for deterministic accountability +func (cs *ConsensusState) OpenWAL(file string) (err error) { + cs.mtx.Lock() + defer cs.mtx.Unlock() + wal, err := NewWAL(file) + if err != nil { + return err + } + cs.wal = wal + return nil +} + //------------------------------------------------------------ // Public interface for passing messages into the consensus state, // possibly causing a state transition @@ -372,8 +394,8 @@ func (cs *ConsensusState) reconstructLastCommit(state *sm.State) { if state.LastBlockHeight == 0 { return } - lastPrecommits := types.NewVoteSet(state.LastBlockHeight, 0, types.VoteTypePrecommit, state.LastValidators) seenValidation := cs.blockStore.LoadSeenValidation(state.LastBlockHeight) + lastPrecommits := types.NewVoteSet(state.LastBlockHeight, seenValidation.Round(), types.VoteTypePrecommit, state.LastValidators) for idx, precommit := range seenValidation.Precommits { if precommit == nil { continue @@ -455,10 +477,12 @@ func (cs *ConsensusState) updateToState(state *sm.State) { } func (cs *ConsensusState) newStep() { + rs := cs.RoundStateEvent() + cs.wal.Save(rs) cs.nSteps += 1 // newStep is called by updateToStep in NewConsensusState before the evsw is set! if cs.evsw != nil { - cs.evsw.FireEvent(types.EventStringNewRoundStep(), cs.RoundStateEvent()) + cs.evsw.FireEvent(types.EventStringNewRoundStep(), rs) } } @@ -477,13 +501,13 @@ func (cs *ConsensusState) timeoutRoutine() { log.Debug("Received tick", "old_ti", ti, "new_ti", newti) // ignore tickers for old height/round/step - if newti.height < ti.height { + if newti.Height < ti.Height { continue - } else if newti.height == ti.height { - if newti.round < ti.round { + } else if newti.Height == ti.Height { + if newti.Round < ti.Round { continue - } else if newti.round == ti.round { - if ti.step > 0 && newti.step <= ti.step { + } else if newti.Round == ti.Round { + if ti.Step > 0 && newti.Step <= ti.Step { continue } } @@ -492,16 +516,16 @@ func (cs *ConsensusState) timeoutRoutine() { ti = newti // if the newti has duration == 0, we relay to the tockChan immediately (no timeout) - if ti.duration == time.Duration(0) { + if ti.Duration == time.Duration(0) { go func(t timeoutInfo) { cs.tockChan <- t }(ti) continue } - log.Info("Scheduling timeout", "dur", ti.duration, "height", ti.height, "round", ti.round, "step", ti.step) + log.Debug("Scheduling timeout", "dur", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step) cs.timeoutTicker.Stop() - cs.timeoutTicker = time.NewTicker(ti.duration) + cs.timeoutTicker = time.NewTicker(ti.Duration) case <-cs.timeoutTicker.C: - log.Info("Timed out", "dur", ti.duration, "height", ti.height, "round", ti.round, "step", ti.step) + log.Info("Timed out", "dur", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step) cs.timeoutTicker.Stop() // go routine here gaurantees timeoutRoutine doesn't block. // Determinism comes from playback in the receiveRoutine. @@ -537,17 +561,24 @@ func (cs *ConsensusState) receiveRoutine(maxSteps int) { select { case mi = <-cs.peerMsgQueue: + cs.wal.Save(mi) // handles proposals, block parts, votes // may generate internal events (votes, complete proposals, 2/3 majorities) cs.handleMsg(mi, rs) case mi = <-cs.internalMsgQueue: + cs.wal.Save(mi) // handles proposals, block parts, votes cs.handleMsg(mi, rs) case ti := <-cs.tockChan: + cs.wal.Save(ti) // if the timeout is relevant to the rs // go to the next step cs.handleTimeout(ti, rs) case <-cs.Quit: + // close wal now that we're done writing to it + if cs.wal != nil { + cs.wal.Close() + } return } } @@ -559,7 +590,7 @@ func (cs *ConsensusState) handleMsg(mi msgInfo, rs RoundState) { defer cs.mtx.Unlock() var err error - msg, peerKey := mi.msg, mi.peerKey + msg, peerKey := mi.Msg, mi.PeerKey switch msg := msg.(type) { case *ProposalMessage: // will not cause transition. @@ -592,10 +623,10 @@ func (cs *ConsensusState) handleMsg(mi msgInfo, rs RoundState) { } func (cs *ConsensusState) handleTimeout(ti timeoutInfo, rs RoundState) { - log.Debug("Received tock", "timeout", ti.duration, "height", ti.height, "round", ti.round, "step", ti.step) + log.Debug("Received tock", "timeout", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step) // timeouts must be for current height, round, step - if ti.height != rs.Height || ti.round < rs.Round || (ti.round == rs.Round && ti.step < rs.Step) { + if ti.Height != rs.Height || ti.Round < rs.Round || (ti.Round == rs.Round && ti.Step < rs.Step) { log.Debug("Ignoring tock because we're ahead", "height", rs.Height, "round", rs.Round, "step", rs.Step) return } @@ -604,22 +635,22 @@ func (cs *ConsensusState) handleTimeout(ti timeoutInfo, rs RoundState) { cs.mtx.Lock() defer cs.mtx.Unlock() - switch ti.step { + switch ti.Step { case RoundStepNewHeight: // NewRound event fired from enterNewRound. - // Do we want a timeout event too? - cs.enterNewRound(ti.height, 0) + // XXX: should we fire timeout here? + cs.enterNewRound(ti.Height, 0) case RoundStepPropose: cs.evsw.FireEvent(types.EventStringTimeoutPropose(), cs.RoundStateEvent()) - cs.enterPrevote(ti.height, ti.round) + cs.enterPrevote(ti.Height, ti.Round) case RoundStepPrevoteWait: cs.evsw.FireEvent(types.EventStringTimeoutWait(), cs.RoundStateEvent()) - cs.enterPrecommit(ti.height, ti.round) + cs.enterPrecommit(ti.Height, ti.Round) case RoundStepPrecommitWait: cs.evsw.FireEvent(types.EventStringTimeoutWait(), cs.RoundStateEvent()) - cs.enterNewRound(ti.height, ti.round+1) + cs.enterNewRound(ti.Height, ti.Round+1) default: - panic(Fmt("Invalid timeout step: %v", ti.step)) + panic(Fmt("Invalid timeout step: %v", ti.Step)) } } @@ -676,9 +707,6 @@ func (cs *ConsensusState) enterNewRound(height int, round int) { // Enter: from NewRound(height,round). func (cs *ConsensusState) enterPropose(height int, round int) { - // cs.mtx.Lock() - // cs.mtx.Unlock() - if cs.Height != height || round < cs.Round || (cs.Round == round && RoundStepPropose <= cs.Step) { log.Debug(Fmt("enterPropose(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) return @@ -699,7 +727,7 @@ func (cs *ConsensusState) enterPropose(height int, round int) { }() // This step times out after `timeoutPropose` - cs.scheduleTimeout(timeoutPropose, height, round, RoundStepPropose) + cs.scheduleTimeout(timeoutPropose0+timeoutProposeDelta*time.Duration(round), height, round, RoundStepPropose) // Nothing more to do if we're not a validator if cs.privValidator == nil { @@ -826,8 +854,6 @@ func (cs *ConsensusState) createProposalBlock() (block *types.Block, blockParts // Prevote for LockedBlock if we're locked, or ProposalBlock if valid. // Otherwise vote nil. func (cs *ConsensusState) enterPrevote(height int, round int) { - //cs.mtx.Lock() - //defer cs.mtx.Unlock() if cs.Height != height || round < cs.Round || (cs.Round == round && RoundStepPrevote <= cs.Step) { log.Debug(Fmt("enterPrevote(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) return @@ -891,8 +917,6 @@ func (cs *ConsensusState) doPrevote(height int, round int) { // Enter: any +2/3 prevotes at next round. func (cs *ConsensusState) enterPrevoteWait(height int, round int) { - //cs.mtx.Lock() - //defer cs.mtx.Unlock() if cs.Height != height || round < cs.Round || (cs.Round == round && RoundStepPrevoteWait <= cs.Step) { log.Debug(Fmt("enterPrevoteWait(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) return @@ -919,8 +943,6 @@ func (cs *ConsensusState) enterPrevoteWait(height int, round int) { // else, unlock an existing lock and precommit nil if +2/3 of prevotes were nil, // else, precommit nil otherwise. func (cs *ConsensusState) enterPrecommit(height int, round int) { - //cs.mtx.Lock() - // defer cs.mtx.Unlock() if cs.Height != height || round < cs.Round || (cs.Round == round && RoundStepPrecommit <= cs.Step) { log.Debug(Fmt("enterPrecommit(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) return @@ -1016,8 +1038,6 @@ func (cs *ConsensusState) enterPrecommit(height int, round int) { // Enter: any +2/3 precommits for next round. func (cs *ConsensusState) enterPrecommitWait(height int, round int) { - //cs.mtx.Lock() - //defer cs.mtx.Unlock() if cs.Height != height || round < cs.Round || (cs.Round == round && RoundStepPrecommitWait <= cs.Step) { log.Debug(Fmt("enterPrecommitWait(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) return @@ -1040,8 +1060,6 @@ func (cs *ConsensusState) enterPrecommitWait(height int, round int) { // Enter: +2/3 precommits for block func (cs *ConsensusState) enterCommit(height int, commitRound int) { - //cs.mtx.Lock() - //defer cs.mtx.Unlock() if cs.Height != height || RoundStepCommit <= cs.Step { log.Debug(Fmt("enterCommit(%v/%v): Invalid args. Current step: %v/%v/%v", height, commitRound, cs.Height, cs.Round, cs.Step)) return @@ -1107,9 +1125,6 @@ func (cs *ConsensusState) tryFinalizeCommit(height int) { // Increment height and goto RoundStepNewHeight func (cs *ConsensusState) finalizeCommit(height int) { - //cs.mtx.Lock() - //defer cs.mtx.Unlock() - if cs.Height != height || cs.Step != RoundStepCommit { log.Debug(Fmt("finalizeCommit(%v): Invalid args. Current step: %v/%v/%v", height, cs.Height, cs.Round, cs.Step)) return @@ -1189,9 +1204,6 @@ func (cs *ConsensusState) finalizeCommit(height int) { //----------------------------------------------------------------------------- func (cs *ConsensusState) setProposal(proposal *types.Proposal) error { - //cs.mtx.Lock() - //defer cs.mtx.Unlock() - // Already have one if cs.Proposal != nil { return nil @@ -1226,9 +1238,6 @@ func (cs *ConsensusState) setProposal(proposal *types.Proposal) error { // NOTE: block is not necessarily valid. // This can trigger us to go into enterPrevote asynchronously (before we timeout of propose) or to attempt to commit func (cs *ConsensusState) addProposalBlockPart(height int, part *types.Part) (added bool, err error) { - //cs.mtx.Lock() - //defer cs.mtx.Unlock() - // Blocks might be reused, so round mismatch is OK if cs.Height != height { return false, nil @@ -1248,7 +1257,8 @@ func (cs *ConsensusState) addProposalBlockPart(height int, part *types.Part) (ad var n int var err error cs.ProposalBlock = wire.ReadBinary(&types.Block{}, cs.ProposalBlockParts.GetReader(), types.MaxBlockSize, &n, &err).(*types.Block) - log.Info("Received complete proposal", "height", cs.ProposalBlock.Height, "hash", cs.ProposalBlock.Hash()) + // NOTE: it's possible to receive complete proposal blocks for future rounds without having the proposal + log.Info("Received complete proposal block", "height", cs.ProposalBlock.Height, "hash", cs.ProposalBlock.Hash()) if cs.Step == RoundStepPropose && cs.isProposalComplete() { // Move onto the next step cs.enterPrevote(height, cs.Round) @@ -1305,7 +1315,7 @@ func (cs *ConsensusState) addVote(valIndex int, vote *types.Vote, peerKey string added, address, err = cs.LastCommit.AddByIndex(valIndex, vote) if added { log.Info(Fmt("Added to lastPrecommits: %v", cs.LastCommit.StringShort())) - cs.evsw.FireEvent(types.EventStringVote(), &types.EventDataVote{valIndex, address, vote}) + cs.evsw.FireEvent(types.EventStringVote(), types.EventDataVote{valIndex, address, vote}) } return @@ -1316,7 +1326,7 @@ func (cs *ConsensusState) addVote(valIndex int, vote *types.Vote, peerKey string height := cs.Height added, address, err = cs.Votes.AddByIndex(valIndex, vote, peerKey) if added { - cs.evsw.FireEvent(types.EventStringVote(), &types.EventDataVote{valIndex, address, vote}) + cs.evsw.FireEvent(types.EventStringVote(), types.EventDataVote{valIndex, address, vote}) switch vote.Type { case types.VoteTypePrevote: diff --git a/consensus/state_test.go b/consensus/state_test.go index beaafd49..1def6ae8 100644 --- a/consensus/state_test.go +++ b/consensus/state_test.go @@ -46,11 +46,12 @@ x * TestHalt1 - if we see +2/3 precommits after timing out into new round, we sh func init() { fmt.Println("") - timeoutPropose = 500 * time.Millisecond + timeoutPropose0 = 100 * time.Millisecond + timeoutProposeDelta = 1 * time.Millisecond } func TestProposerSelection0(t *testing.T) { - cs1, vss := simpleConsensusState(4) + cs1, vss := randConsensusState(4) height, round := cs1.Height, cs1.Round newRoundCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringNewRound(), 1) @@ -84,7 +85,7 @@ func TestProposerSelection0(t *testing.T) { // Now let's do it all again, but starting from round 2 instead of 0 func TestProposerSelection2(t *testing.T) { - cs1, vss := simpleConsensusState(4) // test needs more work for more than 3 validators + cs1, vss := randConsensusState(4) // test needs more work for more than 3 validators newRoundCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringNewRound(), 1) @@ -113,7 +114,7 @@ func TestProposerSelection2(t *testing.T) { // a non-validator should timeout into the prevote round func TestEnterProposeNoPrivValidator(t *testing.T) { - cs, _ := simpleConsensusState(1) + cs, _ := randConsensusState(1) cs.SetPrivValidator(nil) height, round := cs.Height, cs.Round @@ -123,7 +124,7 @@ func TestEnterProposeNoPrivValidator(t *testing.T) { startTestRound(cs, height, round) // if we're not a validator, EnterPropose should timeout - ticker := time.NewTicker(timeoutPropose * 2) + ticker := time.NewTicker(timeoutPropose0 * 2) select { case <-timeoutCh: case <-ticker.C: @@ -138,7 +139,7 @@ func TestEnterProposeNoPrivValidator(t *testing.T) { // a validator should not timeout of the prevote round (TODO: unless the block is really big!) func TestEnterProposeYesPrivValidator(t *testing.T) { - cs, _ := simpleConsensusState(1) + cs, _ := randConsensusState(1) height, round := cs.Height, cs.Round // Listen for propose timeout event @@ -164,7 +165,7 @@ func TestEnterProposeYesPrivValidator(t *testing.T) { } // if we're a validator, enterPropose should not timeout - ticker := time.NewTicker(timeoutPropose * 2) + ticker := time.NewTicker(timeoutPropose0 * 2) select { case <-timeoutCh: t.Fatal("Expected EnterPropose not to timeout") @@ -174,7 +175,7 @@ func TestEnterProposeYesPrivValidator(t *testing.T) { } func TestBadProposal(t *testing.T) { - cs1, vss := simpleConsensusState(2) + cs1, vss := randConsensusState(2) height, round := cs1.Height, cs1.Round cs2 := vss[1] @@ -230,7 +231,7 @@ func TestBadProposal(t *testing.T) { // propose, prevote, and precommit a block func TestFullRound1(t *testing.T) { - cs, vss := simpleConsensusState(1) + cs, vss := randConsensusState(1) height, round := cs.Height, cs.Round voteCh := subscribeToEvent(cs.evsw, "tester", types.EventStringVote(), 1) @@ -258,7 +259,7 @@ func TestFullRound1(t *testing.T) { // nil is proposed, so prevote and precommit nil func TestFullRoundNil(t *testing.T) { - cs, vss := simpleConsensusState(1) + cs, vss := randConsensusState(1) height, round := cs.Height, cs.Round voteCh := subscribeToEvent(cs.evsw, "tester", types.EventStringVote(), 1) @@ -276,7 +277,7 @@ func TestFullRoundNil(t *testing.T) { // run through propose, prevote, precommit commit with two validators // where the first validator has to wait for votes from the second func TestFullRound2(t *testing.T) { - cs1, vss := simpleConsensusState(2) + cs1, vss := randConsensusState(2) cs2 := vss[1] height, round := cs1.Height, cs1.Round @@ -317,7 +318,7 @@ func TestFullRound2(t *testing.T) { // two validators, 4 rounds. // two vals take turns proposing. val1 locks on first one, precommits nil on everything else func TestLockNoPOL(t *testing.T) { - cs1, vss := simpleConsensusState(2) + cs1, vss := randConsensusState(2) cs2 := vss[1] height := cs1.Height @@ -480,7 +481,7 @@ func TestLockNoPOL(t *testing.T) { // 4 vals, one precommits, other 3 polka at next round, so we unlock and precomit the polka func TestLockPOLRelock(t *testing.T) { - cs1, vss := simpleConsensusState(4) + cs1, vss := randConsensusState(4) cs2, cs3, cs4 := vss[1], vss[2], vss[3] timeoutProposeCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutPropose(), 1) @@ -588,7 +589,7 @@ func TestLockPOLRelock(t *testing.T) { // 4 vals, one precommits, other 3 polka at next round, so we unlock and precomit the polka func TestLockPOLUnlock(t *testing.T) { - cs1, vss := simpleConsensusState(4) + cs1, vss := randConsensusState(4) cs2, cs3, cs4 := vss[1], vss[2], vss[3] proposalCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringCompleteProposal(), 1) @@ -679,7 +680,7 @@ func TestLockPOLUnlock(t *testing.T) { // then a polka at round 2 that we lock on // then we see the polka from round 1 but shouldn't unlock func TestLockPOLSafety1(t *testing.T) { - cs1, vss := simpleConsensusState(4) + cs1, vss := randConsensusState(4) cs2, cs3, cs4 := vss[1], vss[2], vss[3] proposalCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringCompleteProposal(), 1) @@ -798,7 +799,7 @@ func TestLockPOLSafety1(t *testing.T) { // What we want: // dont see P0, lock on P1 at R1, dont unlock using P0 at R2 func TestLockPOLSafety2(t *testing.T) { - cs1, vss := simpleConsensusState(4) + cs1, vss := randConsensusState(4) cs2, cs3, cs4 := vss[1], vss[2], vss[3] proposalCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringCompleteProposal(), 1) @@ -888,7 +889,7 @@ func TestLockPOLSafety2(t *testing.T) { /* func TestSlashingPrevotes(t *testing.T) { - cs1, vss := simpleConsensusState(2) + cs1, vss := randConsensusState(2) cs2 := vss[1] @@ -923,7 +924,7 @@ func TestSlashingPrevotes(t *testing.T) { } func TestSlashingPrecommits(t *testing.T) { - cs1, vss := simpleConsensusState(2) + cs1, vss := randConsensusState(2) cs2 := vss[1] @@ -968,7 +969,7 @@ func TestSlashingPrecommits(t *testing.T) { // 4 vals. // we receive a final precommit after going into next round, but others might have gone to commit already! func TestHalt1(t *testing.T) { - cs1, vss := simpleConsensusState(4) + cs1, vss := randConsensusState(4) cs2, cs3, cs4 := vss[1], vss[2], vss[3] proposalCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringCompleteProposal(), 1) diff --git a/consensus/wal.go b/consensus/wal.go new file mode 100644 index 00000000..46f7a9df --- /dev/null +++ b/consensus/wal.go @@ -0,0 +1,117 @@ +package consensus + +import ( + "bufio" + "os" + "time" + + . "github.com/tendermint/go-common" + "github.com/tendermint/go-wire" + "github.com/tendermint/tendermint/types" +) + +//-------------------------------------------------------- +// types and functions for savings consensus messages + +type ConsensusLogMessage struct { + Time time.Time `json:"time"` + Msg ConsensusLogMessageInterface `json:"msg"` +} + +type ConsensusLogMessageInterface interface{} + +var _ = wire.RegisterInterface( + struct{ ConsensusLogMessageInterface }{}, + wire.ConcreteType{types.EventDataRoundState{}, 0x01}, + wire.ConcreteType{msgInfo{}, 0x02}, + wire.ConcreteType{timeoutInfo{}, 0x03}, +) + +//-------------------------------------------------------- +// Simple write-ahead logger + +// Write ahead logger writes msgs to disk before they are processed. +// Can be used for crash-recovery and deterministic replay +// TODO: currently the wal is overwritten during replay catchup +// give it a mode so it's either reading or appending - must read to end to start appending again +type WAL struct { + fp *os.File + exists bool // if the file already existed (restarted process) +} + +func NewWAL(file string) (*WAL, error) { + var walExists bool + if _, err := os.Stat(file); err == nil { + walExists = true + } + fp, err := os.OpenFile(file, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0600) + if err != nil { + return nil, err + } + return &WAL{ + fp: fp, + exists: walExists, + }, nil +} + +// called in newStep and for each pass in receiveRoutine +func (wal *WAL) Save(msg ConsensusLogMessageInterface) { + if wal != nil { + var n int + var err error + wire.WriteJSON(ConsensusLogMessage{time.Now(), msg}, wal.fp, &n, &err) + wire.WriteTo([]byte("\n"), wal.fp, &n, &err) // one message per line + if err != nil { + PanicQ(Fmt("Error writing msg to consensus wal. Error: %v \n\nMessage: %v", err, msg)) + } + } +} + +// Must not be called concurrently. +func (wal *WAL) Close() { + if wal != nil { + wal.fp.Close() + } +} + +func (wal *WAL) SeekFromEnd(found func([]byte) bool) (nLines int, err error) { + var current int64 + // start at the end + current, err = wal.fp.Seek(0, 2) + if err != nil { + return + } + + // backup until we find the the right line + // current is how far we are from the beginning + for { + current -= 1 + if current < 0 { + wal.fp.Seek(0, 0) // back to beginning + return + } + // backup one and read a new byte + if _, err = wal.fp.Seek(current, 0); err != nil { + return + } + b := make([]byte, 1) + if _, err = wal.fp.Read(b); err != nil { + return + } + if b[0] == '\n' || len(b) == 0 { + nLines += 1 + // read a full line + reader := bufio.NewReader(wal.fp) + lineBytes, _ := reader.ReadBytes('\n') + if len(lineBytes) == 0 { + continue + } + + if found(lineBytes) { + wal.fp.Seek(0, 1) // (?) + wal.fp.Seek(current, 0) + return + } + } + } +} diff --git a/consensus/wal_test.go b/consensus/wal_test.go new file mode 100644 index 00000000..8348ca24 --- /dev/null +++ b/consensus/wal_test.go @@ -0,0 +1,76 @@ +package consensus + +import ( + "io/ioutil" + "os" + "path" + "strings" + "testing" +) + +var testTxt = `{"time":"2016-01-16T04:42:00.390Z","msg":[1,{"height":28219,"round":0,"step":"RoundStepPrevote"}]} +{"time":"2016-01-16T04:42:00.390Z","msg":[2,{"msg":[20,{"ValidatorIndex":0,"Vote":{"height":28219,"round":0,"type":1,"block_hash":"67F9689F15BEC30BF311FB4C0C80C5E661AA44E0","block_parts_header":{"total":1,"hash":"DFFD4409A1E273ED61AC27CAF975F446020D5676"},"signature":"4CC6845A128E723A299B470CCBB2A158612AA51321447F6492F3DA57D135C27FCF4124B3B19446A248252BDA45B152819C76AAA5FD35E1C07091885CE6955E05"}}],"peer_key":""}]} +{"time":"2016-01-16T04:42:00.392Z","msg":[1,{"height":28219,"round":0,"step":"RoundStepPrecommit"}]} +{"time":"2016-01-16T04:42:00.392Z","msg":[2,{"msg":[20,{"ValidatorIndex":0,"Vote":{"height":28219,"round":0,"type":2,"block_hash":"67F9689F15BEC30BF311FB4C0C80C5E661AA44E0","block_parts_header":{"total":1,"hash":"DFFD4409A1E273ED61AC27CAF975F446020D5676"},"signature":"1B9924E010F47E0817695DFE462C531196E5A12632434DE12180BBA3EFDAD6B3960FDB9357AFF085EB61729A7D4A6AD8408555D7569C87D9028F280192FD4E05"}}],"peer_key":""}]} +{"time":"2016-01-16T04:42:00.393Z","msg":[1,{"height":28219,"round":0,"step":"RoundStepCommit"}]} +{"time":"2016-01-16T04:42:00.395Z","msg":[1,{"height":28220,"round":0,"step":"RoundStepNewHeight"}]}` + +func TestSeek(t *testing.T) { + f, err := ioutil.TempFile(os.TempDir(), "seek_test_") + if err != nil { + t.Fatal(err) + } + + stat, _ := f.Stat() + name := stat.Name() + + _, err = f.WriteString(testTxt) + if err != nil { + t.Fatal(err) + } + f.Close() + + wal, err := NewWAL(path.Join(os.TempDir(), name)) + if err != nil { + t.Fatal(err) + } + + keyWord := "Precommit" + n, err := wal.SeekFromEnd(func(b []byte) bool { + if strings.Contains(string(b), keyWord) { + return true + } + return false + }) + if err != nil { + t.Fatal(err) + } + + // confirm n + spl := strings.Split(testTxt, "\n") + var i int + var s string + for i, s = range spl { + if strings.Contains(s, keyWord) { + break + } + } + // n is lines from the end. + spl = spl[i:] + if n != len(spl) { + t.Fatalf("Wrong nLines. Got %d, expected %d", n, len(spl)) + } + + b, err := ioutil.ReadAll(wal.fp) + if err != nil { + t.Fatal(err) + } + // first char is a \n + spl2 := strings.Split(strings.Trim(string(b), "\n"), "\n") + for i, s := range spl { + if s != spl2[i] { + t.Fatalf("Mismatch. Got %s, expected %s", spl2[i], s) + } + } + +} diff --git a/node/node.go b/node/node.go index bda20084..3d02589c 100644 --- a/node/node.go +++ b/node/node.go @@ -84,6 +84,12 @@ func NewNode(privValidator *types.PrivValidator) *Node { consensusReactor.SetPrivValidator(privValidator) } + // deterministic accountability + err = consensusState.OpenWAL(config.GetString("cswal")) + if err != nil { + log.Error("Failed to open cswal", "error", err.Error()) + } + // Make p2p network switch sw := p2p.NewSwitch() sw.AddReactor("MEMPOOL", mempoolReactor) @@ -219,6 +225,49 @@ func makeNodeInfo(sw *p2p.Switch, privKey crypto.PrivKeyEd25519) *p2p.NodeInfo { return nodeInfo } +// Get a connection to the proxyAppConn addr. +// Check the current hash, and panic if it doesn't match. +func getProxyApp(addr string, hash []byte) (proxyAppConn proxy.AppConn) { + // use local app (for testing) + if addr == "local" { + app := example.NewCounterApplication(true) + mtx := new(sync.Mutex) + proxyAppConn = proxy.NewLocalAppConn(mtx, app) + } else { + proxyConn, err := Connect(addr) + if err != nil { + Exit(Fmt("Failed to connect to proxy for mempool: %v", err)) + } + remoteApp := proxy.NewRemoteAppConn(proxyConn, 1024) + remoteApp.Start() + + proxyAppConn = remoteApp + } + + // Check the hash + currentHash, err := proxyAppConn.GetHashSync() + if err != nil { + PanicCrisis(Fmt("Error in getting proxyAppConn hash: %v", err)) + } + if !bytes.Equal(hash, currentHash) { + PanicCrisis(Fmt("ProxyApp hash does not match. Expected %X, got %X", hash, currentHash)) + } + + return proxyAppConn +} + +// Load the most recent state from "state" db, +// or create a new one (and save) from genesis. +func getState() *sm.State { + stateDB := dbm.GetDB("state") + state := sm.LoadState(stateDB) + if state == nil { + state = sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file")) + state.Save() + } + return state +} + //------------------------------------------------------------------------------ // Users wishing to use an external signer for their validators @@ -283,45 +332,65 @@ func RunNode() { }) } -// Load the most recent state from "state" db, -// or create a new one (and save) from genesis. -func getState() *sm.State { +//------------------------------------------------------------------------------ +// replay + +// convenience for replay mode +func newConsensusState() *consensus.ConsensusState { + // Get BlockStore + blockStoreDB := dbm.GetDB("blockstore") + blockStore := bc.NewBlockStore(blockStoreDB) + + // Get State stateDB := dbm.GetDB("state") - state := sm.LoadState(stateDB) - if state == nil { - state = sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file")) - state.Save() - } - return state -} + state := sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file")) -// Get a connection to the proxyAppConn addr. -// Check the current hash, and panic if it doesn't match. -func getProxyApp(addr string, hash []byte) (proxyAppConn proxy.AppConn) { - // use local app (for testing) - if addr == "local" { - app := example.NewCounterApplication(true) - mtx := new(sync.Mutex) - proxyAppConn = proxy.NewLocalAppConn(mtx, app) - } else { - proxyConn, err := Connect(addr) - if err != nil { - Exit(Fmt("Failed to connect to proxy for mempool: %v", err)) - } - remoteApp := proxy.NewRemoteAppConn(proxyConn, 1024) - remoteApp.Start() + // Create two proxyAppConn connections, + // one for the consensus and one for the mempool. + proxyAddr := config.GetString("proxy_app") + proxyAppConnMempool := getProxyApp(proxyAddr, state.AppHash) + proxyAppConnConsensus := getProxyApp(proxyAddr, state.AppHash) - proxyAppConn = remoteApp - } + // add the chainid to the global config + config.Set("chain_id", state.ChainID) - // Check the hash - currentHash, err := proxyAppConn.GetHashSync() + // Make event switch + eventSwitch := events.NewEventSwitch() + _, err := eventSwitch.Start() if err != nil { - PanicCrisis(Fmt("Error in getting proxyAppConn hash: %v", err)) - } - if !bytes.Equal(hash, currentHash) { - PanicCrisis(Fmt("ProxyApp hash does not match. Expected %X, got %X", hash, currentHash)) + Exit(Fmt("Failed to start event switch: %v", err)) } - return proxyAppConn + mempool := mempl.NewMempool(proxyAppConnMempool) + + consensusState := consensus.NewConsensusState(state.Copy(), proxyAppConnConsensus, blockStore, mempool) + consensusState.SetEventSwitch(eventSwitch) + return consensusState +} + +func RunReplayConsole() { + walFile := config.GetString("cswal") + if walFile == "" { + Exit("cswal file name not set in tendermint config") + } + + consensusState := newConsensusState() + + if err := consensusState.ReplayConsole(walFile); err != nil { + Exit(Fmt("Error during consensus replay: %v", err)) + } +} + +func RunReplay() { + walFile := config.GetString("cswal") + if walFile == "" { + Exit("cswal file name not set in tendermint config") + } + + consensusState := newConsensusState() + + if err := consensusState.ReplayMessages(walFile); err != nil { + Exit(Fmt("Error during consensus replay: %v", err)) + } + log.Notice("Replay run successfully") } diff --git a/types/events.go b/types/events.go index a316cfc5..7bcd2ea2 100644 --- a/types/events.go +++ b/types/events.go @@ -75,6 +75,7 @@ type EventDataApp struct { Data []byte `json:"bytes"` } +// NOTE: This goes into the replay WAL type EventDataRoundState struct { Height int `json:"height"` Round int `json:"round"`