diff --git a/consensus/replay.go b/consensus/replay.go index ce34dd33..4590acaa 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -2,8 +2,10 @@ package consensus import ( "bufio" + "bytes" "errors" "fmt" + "io" "os" "reflect" "strconv" @@ -14,6 +16,7 @@ import ( "github.com/tendermint/go-wire" sm "github.com/tendermint/tendermint/state" + "github.com/tendermint/tendermint/tailseek" "github.com/tendermint/tendermint/types" ) @@ -60,6 +63,74 @@ func (cs ConsensusState) ReplayMessages(file string) error { return cs.replay(file, false) } +func (cs *ConsensusState) catchupReplay(height int) error { + if !cs.msgLogExists { + return nil + } + + if cs.msgLogFP == nil { + log.Warn("consensus msg log is nil") + return nil + } + + log.Notice("Catchup by replaying consensus messages") + f := cs.msgLogFP + + n, err := seek.SeekFromEndOfFile(f, func(lineBytes []byte) bool { + var err error + var msg ConsensusLogMessage + wire.ReadJSON(&msg, lineBytes, &err) + if err != nil { + panic(Fmt("Failed to read cs_msg_log json: %v", err)) + } + m, ok := msg.Msg.(*types.EventDataRoundState) + if ok && m.Step == RoundStepNewHeight.String() { + r, err := f.Seek(0, 1) + // TODO: ensure the height matches + return true + } + return false + }) + + if err != nil { + return err + } + + // we found it, now we can replay everything + pb := newPlayback("", cs.msgLogFP, cs, nil, cs.state.Copy()) + + reader := bufio.NewReader(cs.msgLogFP) + i := 0 + for { + i += 1 + msgBytes, err := reader.ReadBytes('\n') + if err == io.EOF { + break + } else if err != nil { + return err + } else if len(msgBytes) == 0 { + continue + } + // the first msg is the NewHeight event, so we can ignore it + if i == 1 { + continue + } + + // NOTE: since the priv key is set when the msgs are received + // it will attempt to eg double sign but we can just ignore it + // since the votes will be replayed and we'll get to the next step + if err := pb.readReplayMessage(msgBytes); err != nil { + return err + + } + if i >= 
n { + break + } + } + return nil + +} + // replay all msgs or start the console func (cs *ConsensusState) replay(file string, console bool) error { if cs.IsRunning() { @@ -91,7 +162,7 @@ func (cs *ConsensusState) replay(file string, console bool) error { nextN = pb.replayConsoleLoop() } - if err := pb.readReplayMessage(); err != nil { + if err := pb.readReplayMessage(pb.scanner.Bytes()); err != nil { return err } @@ -152,7 +223,7 @@ func (pb *playback) replayReset(count int) error { pb.count = 0 pb.cs = newCs for i := 0; pb.scanner.Scan() && i < count; i++ { - if err := pb.readReplayMessage(); err != nil { + if err := pb.readReplayMessage(pb.scanner.Bytes()); err != nil { return err } pb.count += 1 @@ -264,10 +335,10 @@ func (pb *playback) replayConsoleLoop() int { return 0 } -func (pb *playback) readReplayMessage() error { +func (pb *playback) readReplayMessage(msgBytes []byte) error { var err error var msg ConsensusLogMessage - wire.ReadJSON(&msg, pb.scanner.Bytes(), &err) + wire.ReadJSON(&msg, msgBytes, &err) if err != nil { return fmt.Errorf("Error reading json data: %v", err) } @@ -278,14 +349,16 @@ func (pb *playback) readReplayMessage() error { log.Notice("New Step", "height", m.Height, "round", m.Round, "step", m.Step) // these are playback checks ticker := time.After(time.Second * 2) - select { - case mi := <-pb.newStepCh: - m2 := mi.(*types.EventDataRoundState) - if m.Height != m2.Height || m.Round != m2.Round || m.Step != m2.Step { - return fmt.Errorf("RoundState mismatch. Got %v; Expected %v", m2, m) + if pb.newStepCh != nil { + select { + case mi := <-pb.newStepCh: + m2 := mi.(*types.EventDataRoundState) + if m.Height != m2.Height || m.Round != m2.Round || m.Step != m2.Step { + return fmt.Errorf("RoundState mismatch. 
// SeekFromEndOfFile walks the lines of f from the last one towards the first,
// calling found on each, until found returns true.
//
// Each line is passed to found including its trailing '\n'; a final line that
// is not newline-terminated is passed as-is. The empty "line" after a trailing
// newline is counted in nLines but not passed to found.
//
// On a match, the file offset is moved to the '\n' that precedes the matching
// line (or to offset 0 when the matching line is the first line of the file)
// and nLines is the number of line starts visited, the matching line included.
//
// If no line matches, err is nil, nLines is the number of line starts visited
// and the file offset is left at the end of the file.
//
// A non-nil err is returned if seeking or reading f fails.
func SeekFromEndOfFile(f *os.File, found func([]byte) bool) (nLines int, err error) {
	const (
		seekStart = 0    // whence: offset is relative to the start of the file
		seekEnd   = 2    // whence: offset is relative to the end of the file
		chunkSize = 4096 // bytes scanned per read while searching for newlines
	)

	// start at the end
	size, err := f.Seek(0, seekEnd)
	if err != nil {
		return 0, err
	}

	// examine reads the line occupying [start, end) and reports whether found
	// accepted it. An empty range is not a line and never matches.
	examine := func(start, end int64) (bool, error) {
		if start >= end {
			return false, nil
		}
		line := make([]byte, end-start)
		// ReadAt returns a non-nil error whenever it reads fewer bytes than
		// requested, so a short read is the only failure to check for.
		if n, rerr := f.ReadAt(line, start); n < len(line) {
			return false, rerr
		}
		return found(line), nil
	}

	// lineEnd is the offset just past the line currently being delimited:
	// initially the end of the file, afterwards just past the newline that
	// terminates the line.
	lineEnd := size
	chunk := make([]byte, chunkSize)

	// Scan backwards one chunk at a time instead of one byte at a time.
	for chunkEnd := size; chunkEnd > 0; {
		chunkStart := chunkEnd - chunkSize
		if chunkStart < 0 {
			chunkStart = 0
		}
		window := chunk[:chunkEnd-chunkStart]
		if n, rerr := f.ReadAt(window, chunkStart); n < len(window) {
			return nLines, rerr
		}

		for i := len(window) - 1; i >= 0; i-- {
			if window[i] != '\n' {
				continue
			}
			newlineAt := chunkStart + int64(i)
			nLines++

			matched, rerr := examine(newlineAt+1, lineEnd)
			if rerr != nil {
				return nLines, rerr
			}
			if matched {
				_, err = f.Seek(newlineAt, seekStart)
				return nLines, err
			}
			lineEnd = newlineAt + 1
		}
		chunkEnd = chunkStart
	}

	// The first line of the file has no newline in front of it, so the scan
	// above never reaches it; check it explicitly.
	if lineEnd > 0 {
		nLines++
		matched, rerr := examine(0, lineEnd)
		if rerr != nil {
			return nLines, rerr
		}
		if matched {
			_, err = f.Seek(0, seekStart)
			return nLines, err
		}
	}
	return nLines, nil
}
tickTockBufferSize), tockChan: make(chan timeoutInfo, tickTockBufferSize), + done: make(chan struct{}), } cs.updateToState(state) // Don't call scheduleRound0 yet. @@ -252,14 +255,21 @@ func (cs *ConsensusState) SetPrivValidator(priv *types.PrivValidator) { } func (cs *ConsensusState) OnStart() error { - cs.BaseService.OnStart() + cs.QuitService.OnStart() - // first we schedule the round (no go routines) - // then we start the timeout and receive routines. - // tickChan is buffered so scheduleRound0 will finish. - // Then all further access to the RoundState is through the receiveRoutine - cs.scheduleRound0(cs.Height) + // start timeout and receive routines cs.startRoutines(0) + + // we may have lost some votes if the process crashed + // reload from consensus log to catchup + if err := cs.catchupReplay(cs.Height); err != nil { + log.Error("Error on catchup replay", "error", err.Error()) + // let's go for it anyways, maybe we're fine + } + + // schedule the first round! + cs.scheduleRound0(cs.Height) + return nil } @@ -272,6 +282,9 @@ func (cs *ConsensusState) startRoutines(maxSteps int) { func (cs *ConsensusState) OnStop() { cs.QuitService.OnStop() + + // wait to quit the receiveRoutine + <-cs.done if cs.msgLogFP != nil { cs.msgLogFP.Close() } @@ -281,7 +294,10 @@ func (cs *ConsensusState) OnStop() { func (cs *ConsensusState) OpenFileForMessageLog(file string) (err error) { cs.mtx.Lock() defer cs.mtx.Unlock() - cs.msgLogFP, err = os.OpenFile(file, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666) + if _, err := os.Stat(file); err == nil { + cs.msgLogExists = true + } + cs.msgLogFP, err = os.OpenFile(file, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0666) return err } @@ -568,6 +584,7 @@ func (cs *ConsensusState) receiveRoutine(maxSteps int) { // go to the next step cs.handleTimeout(ti, rs) case <-cs.Quit: + close(cs.done) return } } diff --git a/node/node.go b/node/node.go index 1208a4b5..c090e871 100644 --- a/node/node.go +++ b/node/node.go @@ -85,7 +85,10 @@ func 
NewNode(privValidator *types.PrivValidator) *Node { } // deterministic accountability - consensusState.OpenFileForMessageLog(config.GetString("cs_msg_log")) + err = consensusState.OpenFileForMessageLog(config.GetString("cs_msg_log")) + if err != nil { + log.Error("Failed to open cs_msg_log", "error", err.Error()) + } // Make p2p network switch sw := p2p.NewSwitch()