From 26f0e2bc2d76d331f5bb534d691c149f30035947 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Mon, 18 Jan 2016 14:10:05 -0500 Subject: [PATCH] msgLogFP -> write ahead log --- config/tendermint/config.go | 2 +- config/tendermint_test/config.go | 2 +- consensus/replay.go | 292 +++++++++++-------------------- consensus/state.go | 36 ++-- consensus/wal.go | 113 ++++++++++++ consensus/wal_test.go | 76 ++++++++ node/node.go | 110 ++++++------ 7 files changed, 368 insertions(+), 263 deletions(-) create mode 100644 consensus/wal.go create mode 100644 consensus/wal_test.go diff --git a/config/tendermint/config.go b/config/tendermint/config.go index e6c74025..bdeb6add 100644 --- a/config/tendermint/config.go +++ b/config/tendermint/config.go @@ -67,7 +67,7 @@ func GetConfig(rootDir string) cfg.Config { mapConfig.SetDefault("rpc_laddr", "0.0.0.0:46657") mapConfig.SetDefault("prof_laddr", "") mapConfig.SetDefault("revision_file", rootDir+"/revision") - mapConfig.SetDefault("cs_msg_log", rootDir+"/cs_msg_log") + mapConfig.SetDefault("cswal", rootDir+"/cswal") return mapConfig } diff --git a/config/tendermint_test/config.go b/config/tendermint_test/config.go index fa1fe3de..04d300d5 100644 --- a/config/tendermint_test/config.go +++ b/config/tendermint_test/config.go @@ -75,7 +75,7 @@ func GetConfig(rootDir string) cfg.Config { mapConfig.SetDefault("rpc_laddr", "0.0.0.0:36657") mapConfig.SetDefault("prof_laddr", "") mapConfig.SetDefault("revision_file", rootDir+"/revision") - mapConfig.SetDefault("cs_msg_log", rootDir+"/cs_msg_log") + mapConfig.SetDefault("cswal", rootDir+"/cswal") return mapConfig } diff --git a/consensus/replay.go b/consensus/replay.go index 611f5e7a..5d193efe 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -15,67 +15,83 @@ import ( "github.com/tendermint/go-wire" sm "github.com/tendermint/tendermint/state" - "github.com/tendermint/tendermint/tailseek" "github.com/tendermint/tendermint/types" ) -//-------------------------------------------------------- -// types and functions for savings consensus messages - -type ConsensusLogMessage struct { - Time time.Time `json:"time"` - Msg ConsensusLogMessageInterface `json:"msg"` -} - -type ConsensusLogMessageInterface interface{} - -var _ = wire.RegisterInterface( - struct{ ConsensusLogMessageInterface }{}, - wire.ConcreteType{&types.EventDataRoundState{}, 0x01}, - wire.ConcreteType{msgInfo{}, 0x02}, - wire.ConcreteType{timeoutInfo{}, 0x03}, -) - -// called in newStep and for each pass in receiveRoutine -func (cs *ConsensusState) saveMsg(msg ConsensusLogMessageInterface) { - if cs.msgLogFP != nil { - var n int - var err error - wire.WriteJSON(ConsensusLogMessage{time.Now(), msg}, cs.msgLogFP, &n, &err) - wire.WriteTo([]byte("\n"), cs.msgLogFP, &n, &err) // one message per line - if err != nil { - log.Error("Error writing to consensus message log file", "err", err, "msg", msg) - } +// unmarshal and apply a single message to the consensus state +func (cs *ConsensusState) readReplayMessage(msgBytes []byte, newStepCh chan interface{}) error { + var err error + var msg ConsensusLogMessage + wire.ReadJSON(&msg, msgBytes, &err) + if err != nil { + return fmt.Errorf("Error reading json data: %v", err) } + + // for logging + switch m := msg.Msg.(type) { + case *types.EventDataRoundState: + log.Notice("New Step", "height", m.Height, "round", m.Round, "step", m.Step) + // these are playback checks + ticker := time.After(time.Second * 2) + if newStepCh != nil { + select { + case mi := <-newStepCh: + m2 := mi.(*types.EventDataRoundState) + if m.Height != m2.Height || m.Round != m2.Round || m.Step != m2.Step { + return fmt.Errorf("RoundState mismatch. Got %v; Expected %v", m2, m) + } + case <-ticker: + return fmt.Errorf("Failed to read off newStepCh") + } + } + case msgInfo: + peerKey := m.PeerKey + if peerKey == "" { + peerKey = "local" + } + switch msg := m.Msg.(type) { + case *ProposalMessage: + p := msg.Proposal + log.Notice("Proposal", "height", p.Height, "round", p.Round, "header", + p.BlockPartsHeader, "pol", p.POLRound, "peer", peerKey) + case *BlockPartMessage: + log.Notice("BlockPart", "height", msg.Height, "round", msg.Round, "peer", peerKey) + case *VoteMessage: + v := msg.Vote + log.Notice("Vote", "height", v.Height, "round", v.Round, "type", v.Type, + "hash", v.BlockHash, "header", v.BlockPartsHeader, "peer", peerKey) + } + // internal or from peer + if m.PeerKey == "" { + cs.internalMsgQueue <- m + } else { + cs.peerMsgQueue <- m + } + case timeoutInfo: + log.Notice("Timeout", "height", m.Height, "round", m.Round, "step", m.Step, "dur", m.Duration) + cs.tockChan <- m + default: + return fmt.Errorf("Unknown ConsensusLogMessage type: %v", reflect.TypeOf(msg.Msg)) + } + return nil } -//-------------------------------------------------------- -// replay the messages - -// Interactive playback -func (cs ConsensusState) ReplayConsole(file string) error { - return cs.replay(file, true) -} - -// Full playback, with tests -func (cs ConsensusState) ReplayMessages(file string) error { - return cs.replay(file, false) -} - +// replay only those messages since the last block func (cs *ConsensusState) catchupReplay(height int) error { - if !cs.msgLogExists { + if cs.wal == nil { + log.Warn("consensus msg log is nil") return nil } - - if cs.msgLogFP == nil { - log.Warn("consensus msg log is nil") + if !cs.wal.exists { + // new wal, nothing to catchup on return nil } log.Notice("Catchup by replaying consensus messages") - f := cs.msgLogFP - n, err := seek.SeekFromEndOfFile(f, func(lineBytes []byte) bool { + // starting from end of file, + // read messages until a new height is found + nLines, err := cs.wal.SeekFromEnd(func(lineBytes []byte) bool { var err error var msg ConsensusLogMessage wire.ReadJSON(&msg, lineBytes, &err) @@ -84,7 +100,6 @@ func (cs *ConsensusState) catchupReplay(height int) error { } m, ok := msg.Msg.(*types.EventDataRoundState) if ok && m.Step == RoundStepNewHeight.String() { - f.Seek(0, 1) // TODO: ensure the height matches return true } @@ -95,13 +110,10 @@ func (cs *ConsensusState) catchupReplay(height int) error { return err } - // we found it, now we can replay everything - pb := newPlayback("", cs.msgLogFP, cs, nil, cs.state.Copy()) - - reader := bufio.NewReader(cs.msgLogFP) - i := 0 - for { - i += 1 + // now we can replay the latest nLines on consensus state + // note we can't use scan because we've already been reading from the file + reader := bufio.NewReader(cs.wal.fp) + for i := 0; i < nLines; i++ { msgBytes, err := reader.ReadBytes('\n') if err == io.EOF { break @@ -118,16 +130,24 @@ func (cs *ConsensusState) catchupReplay(height int) error { // NOTE: since the priv key is set when the msgs are received // it will attempt to eg double sign but we can just ignore it // since the votes will be replayed and we'll get to the next step - if err := pb.readReplayMessage(msgBytes); err != nil { + if err := cs.readReplayMessage(msgBytes, nil); err != nil { return err - - } - if i >= n { - break } } return nil +} +//-------------------------------------------------------- +// replay messages interactively or all at once + +// Interactive playback +func (cs ConsensusState) ReplayConsole(file string) error { + return cs.replay(file, true) +} + +// Full playback, with tests +func (cs ConsensusState) ReplayMessages(file string) error { + return cs.replay(file, false) } // replay all msgs or start the console @@ -135,24 +155,22 @@ func (cs *ConsensusState) replay(file string, console bool) error { if cs.IsRunning() { return errors.New("cs is already running, cannot replay") } + if cs.wal != nil { + return errors.New("cs wal is open, cannot replay") + } cs.startForReplay() - // set the FP to nil so we don't overwrite - if cs.msgLogFP != nil { - cs.msgLogFP.Close() - cs.msgLogFP = nil - } - // ensure all new step events are regenerated as expected newStepCh := cs.evsw.SubscribeToEvent("replay-test", types.EventStringNewRoundStep(), 1) + // just open the file for reading, no need to use wal fp, err := os.OpenFile(file, os.O_RDONLY, 0666) if err != nil { return err } - pb := newPlayback(file, fp, cs, newStepCh, cs.state.Copy()) + pb := newPlayback(file, fp, cs, cs.state.Copy()) defer pb.fp.Close() var nextN int // apply N msgs in a row @@ -161,7 +179,7 @@ func (cs *ConsensusState) replay(file string, console bool) error { nextN = pb.replayConsoleLoop() } - if err := pb.readReplayMessage(pb.scanner.Bytes()); err != nil { + if err := pb.cs.readReplayMessage(pb.scanner.Bytes(), newStepCh); err != nil { return err } @@ -177,41 +195,39 @@ func (cs *ConsensusState) replay(file string, console bool) error { // playback manager type playback struct { - cs *ConsensusState - file string - fp *os.File - scanner *bufio.Scanner - newStepCh chan interface{} - genesisState *sm.State - count int + cs *ConsensusState + + fp *os.File + scanner *bufio.Scanner + count int // how many lines/msgs into the file are we + + // replays can be reset to beginning + fileName string // so we can close/reopen the file + genesisState *sm.State // so the replay session knows where to restart from } -func newPlayback(file string, fp *os.File, cs *ConsensusState, ch chan interface{}, genState *sm.State) *playback { +func newPlayback(fileName string, fp *os.File, cs *ConsensusState, genState *sm.State) *playback { return &playback{ cs: cs, - file: file, - newStepCh: ch, - genesisState: genState, fp: fp, + fileName: fileName, + genesisState: genState, scanner: bufio.NewScanner(fp), } } // go back count steps by resetting the state and running (pb.count - count) steps -func (pb *playback) replayReset(count int) error { +func (pb *playback) replayReset(count int, newStepCh chan interface{}) error { pb.cs.Stop() newCs := NewConsensusState(pb.genesisState.Copy(), pb.cs.proxyAppConn, pb.cs.blockStore, pb.cs.mempool) newCs.SetEventSwitch(pb.cs.evsw) - // ensure all new step events are regenerated as expected - pb.newStepCh = newCs.evsw.SubscribeToEvent("replay-test", types.EventStringNewRoundStep(), 1) - newCs.startForReplay() pb.fp.Close() - fp, err := os.OpenFile(pb.file, os.O_RDONLY, 0666) + fp, err := os.OpenFile(pb.fileName, os.O_RDONLY, 0666) if err != nil { return err } @@ -222,7 +238,7 @@ func (pb *playback) replayReset(count int) error { pb.count = 0 pb.cs = newCs for i := 0; pb.scanner.Scan() && i < count; i++ { - if err := pb.readReplayMessage(pb.scanner.Bytes()); err != nil { + if err := pb.cs.readReplayMessage(pb.scanner.Bytes(), newStepCh); err != nil { return err } pb.count += 1 @@ -285,8 +301,10 @@ func (pb *playback) replayConsoleLoop() int { // NOTE: "back" is not supported in the state machine design, // so we restart and replay up to + // ensure all new step events are regenerated as expected + newStepCh := pb.cs.evsw.SubscribeToEvent("replay-test", types.EventStringNewRoundStep(), 1) if len(tokens) == 1 { - pb.replayReset(1) + pb.replayReset(1, newStepCh) } else { i, err := strconv.Atoi(tokens[1]) if err != nil { @@ -294,7 +312,7 @@ func (pb *playback) replayConsoleLoop() int { } else if i > pb.count { fmt.Printf("argument to back must not be larger than the current count (%d)\n", pb.count) } else { - pb.replayReset(i) + pb.replayReset(i, newStepCh) } } @@ -333,103 +351,3 @@ func (pb *playback) replayConsoleLoop() int { } return 0 } - -func (pb *playback) readReplayMessage(msgBytes []byte) error { - var err error - var msg ConsensusLogMessage - wire.ReadJSON(&msg, msgBytes, &err) - if err != nil { - return fmt.Errorf("Error reading json data: %v", err) - } - - // for logging - switch m := msg.Msg.(type) { - case *types.EventDataRoundState: - log.Notice("New Step", "height", m.Height, "round", m.Round, "step", m.Step) - // these are playback checks - ticker := time.After(time.Second * 2) - if pb.newStepCh != nil { - select { - case mi := <-pb.newStepCh: - m2 := mi.(*types.EventDataRoundState) - if m.Height != m2.Height || m.Round != m2.Round || m.Step != m2.Step { - return fmt.Errorf("RoundState mismatch. Got %v; Expected %v", m2, m) - } - case <-ticker: - return fmt.Errorf("Failed to read off newStepCh") - } - } - case msgInfo: - peerKey := m.PeerKey - if peerKey == "" { - peerKey = "local" - } - switch msg := m.Msg.(type) { - case *ProposalMessage: - p := msg.Proposal - log.Notice("Proposal", "height", p.Height, "round", p.Round, "header", - p.BlockPartsHeader, "pol", p.POLRound, "peer", peerKey) - case *BlockPartMessage: - log.Notice("BlockPart", "height", msg.Height, "round", msg.Round, "peer", peerKey) - case *VoteMessage: - v := msg.Vote - log.Notice("Vote", "height", v.Height, "round", v.Round, "type", v.Type, - "hash", v.BlockHash, "header", v.BlockPartsHeader, "peer", peerKey) - } - // internal or from peer - if m.PeerKey == "" { - pb.cs.internalMsgQueue <- m - } else { - pb.cs.peerMsgQueue <- m - } - case timeoutInfo: - log.Notice("Timeout", "height", m.Height, "round", m.Round, "step", m.Step, "dur", m.Duration) - pb.cs.tockChan <- m - default: - return fmt.Errorf("Unknown ConsensusLogMessage type: %v", reflect.TypeOf(msg.Msg)) - } - return nil -} - -// Read lines starting from the end of the file until we read a line that causes found to return true -func SeekFromEndOfFile(f *os.File, found func([]byte) bool) (nLines int, err error) { - var current int64 - // start at the end - current, err = f.Seek(0, 2) - if err != nil { - fmt.Println("1") - return - } - - // backup until we find the the right line - for { - current -= 1 - if current < 0 { - return - } - // backup one and read a new byte - if _, err = f.Seek(current, 0); err != nil { - fmt.Println("2", current) - return - } - b := make([]byte, 1) - if _, err = f.Read(b); err != nil { - return - } - if b[0] == '\n' || len(b) == 0 { - nLines += 1 - - // read a full line - reader := bufio.NewReader(f) - lineBytes, _ := reader.ReadBytes('\n') - if len(lineBytes) == 0 { - continue - } - - if found(lineBytes) { - f.Seek(current, 0) - return - } - } - } -} diff --git a/consensus/state.go b/consensus/state.go index 8b61aa88..c91ffd63 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -4,7 +4,6 @@ import ( "bytes" "errors" "fmt" - "os" "reflect" "sync" "time" @@ -192,9 +191,7 @@ type ConsensusState struct { evsw *events.EventSwitch - msgLogFP *os.File // file we write all msgs and timeouts to, in order (for deterministic replay) - msgLogExists bool // if the file already existed (restarted process) - done chan struct{} // used to wait until the receiveRoutine quits so we can close the file + wal *WAL nSteps int // used for testing to limit the number of transitions the state makes } @@ -209,7 +206,6 @@ func NewConsensusState(state *sm.State, proxyAppConn proxy.AppConn, blockStore * timeoutTicker: new(time.Ticker), tickChan: make(chan timeoutInfo, tickTockBufferSize), tockChan: make(chan timeoutInfo, tickTockBufferSize), - done: make(chan struct{}), } cs.updateToState(state) // Don't call scheduleRound0 yet. @@ -282,23 +278,18 @@ func (cs *ConsensusState) startRoutines(maxSteps int) { func (cs *ConsensusState) OnStop() { cs.QuitService.OnStop() - - // wait to quit the receiveRoutine - <-cs.done - if cs.msgLogFP != nil { - cs.msgLogFP.Close() - } } // Open file to log all consensus messages and timeouts for deterministic accountability -func (cs *ConsensusState) OpenFileForMessageLog(file string) (err error) { +func (cs *ConsensusState) OpenWAL(file string) (err error) { cs.mtx.Lock() defer cs.mtx.Unlock() - if _, err := os.Stat(file); err == nil { - cs.msgLogExists = true + wal, err := NewWAL(file) + if err != nil { + return err } - cs.msgLogFP, err = os.OpenFile(file, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0666) - return err + cs.wal = wal + return nil } //------------------------------------------------------------ @@ -487,7 +478,7 @@ func (cs *ConsensusState) updateToState(state *sm.State) { func (cs *ConsensusState) newStep() { rs := cs.RoundStateEvent() - cs.saveMsg(rs) + cs.wal.Save(rs) cs.nSteps += 1 // newStep is called by updateToStep in NewConsensusState before the evsw is set! if cs.evsw != nil { @@ -570,21 +561,24 @@ func (cs *ConsensusState) receiveRoutine(maxSteps int) { select { case mi = <-cs.peerMsgQueue: - cs.saveMsg(mi) + cs.wal.Save(mi) // handles proposals, block parts, votes // may generate internal events (votes, complete proposals, 2/3 majorities) cs.handleMsg(mi, rs) case mi = <-cs.internalMsgQueue: - cs.saveMsg(mi) + cs.wal.Save(mi) // handles proposals, block parts, votes cs.handleMsg(mi, rs) case ti := <-cs.tockChan: - cs.saveMsg(ti) + cs.wal.Save(ti) // if the timeout is relevant to the rs // go to the next step cs.handleTimeout(ti, rs) case <-cs.Quit: - close(cs.done) + // close wal now that we're done writing to it + if cs.wal != nil { + cs.wal.Close() + } return } } diff --git a/consensus/wal.go b/consensus/wal.go new file mode 100644 index 00000000..5b6e3989 --- /dev/null +++ b/consensus/wal.go @@ -0,0 +1,113 @@ +package consensus + +import ( + "bufio" + "os" + "time" + + . "github.com/tendermint/go-common" + "github.com/tendermint/go-wire" + "github.com/tendermint/tendermint/types" +) + +//-------------------------------------------------------- +// types and functions for savings consensus messages + +type ConsensusLogMessage struct { + Time time.Time `json:"time"` + Msg ConsensusLogMessageInterface `json:"msg"` +} + +type ConsensusLogMessageInterface interface{} + +var _ = wire.RegisterInterface( + struct{ ConsensusLogMessageInterface }{}, + wire.ConcreteType{&types.EventDataRoundState{}, 0x01}, + wire.ConcreteType{msgInfo{}, 0x02}, + wire.ConcreteType{timeoutInfo{}, 0x03}, +) + +//-------------------------------------------------------- +// Simple write-ahead logger + +// Write ahead logger writes msgs to disk before they are processed. +// Can be used for crash-recovery and deterministic replay +type WAL struct { + fp *os.File + exists bool // if the file already existed (restarted process) +} + +func NewWAL(file string) (*WAL, error) { + var walExists bool + if _, err := os.Stat(file); err == nil { + walExists = true + } + fp, err := os.OpenFile(file, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0600) + if err != nil { + return nil, err + } + return &WAL{ + fp: fp, + exists: walExists, + }, nil +} + +// called in newStep and for each pass in receiveRoutine +func (wal *WAL) Save(msg ConsensusLogMessageInterface) { + if wal != nil { + var n int + var err error + wire.WriteJSON(ConsensusLogMessage{time.Now(), msg}, wal.fp, &n, &err) + wire.WriteTo([]byte("\n"), wal.fp, &n, &err) // one message per line + if err != nil { + PanicQ(Fmt("Error writing msg to consensus wal. Error: %v \n\nMessage: %v", err, msg)) + } + } +} + +// Must not be called concurrently. +func (wal *WAL) Close() { + if wal != nil { + wal.fp.Close() + } +} + +func (wal *WAL) SeekFromEnd(found func([]byte) bool) (nLines int, err error) { + var current int64 + // start at the end + current, err = wal.fp.Seek(0, 2) + if err != nil { + return + } + + // backup until we find the the right line + for { + current -= 1 + if current < 0 { + return + } + // backup one and read a new byte + if _, err = wal.fp.Seek(current, 0); err != nil { + return + } + b := make([]byte, 1) + if _, err = wal.fp.Read(b); err != nil { + return + } + if b[0] == '\n' || len(b) == 0 { + // read a full line + reader := bufio.NewReader(wal.fp) + lineBytes, _ := reader.ReadBytes('\n') + if len(lineBytes) == 0 { + continue + } + + nLines += 1 + if found(lineBytes) { + wal.fp.Seek(0, 1) // (?) + wal.fp.Seek(current, 0) + return + } + } + } +} diff --git a/consensus/wal_test.go b/consensus/wal_test.go new file mode 100644 index 00000000..8348ca24 --- /dev/null +++ b/consensus/wal_test.go @@ -0,0 +1,76 @@ +package consensus + +import ( + "io/ioutil" + "os" + "path" + "strings" + "testing" +) + +var testTxt = `{"time":"2016-01-16T04:42:00.390Z","msg":[1,{"height":28219,"round":0,"step":"RoundStepPrevote"}]} +{"time":"2016-01-16T04:42:00.390Z","msg":[2,{"msg":[20,{"ValidatorIndex":0,"Vote":{"height":28219,"round":0,"type":1,"block_hash":"67F9689F15BEC30BF311FB4C0C80C5E661AA44E0","block_parts_header":{"total":1,"hash":"DFFD4409A1E273ED61AC27CAF975F446020D5676"},"signature":"4CC6845A128E723A299B470CCBB2A158612AA51321447F6492F3DA57D135C27FCF4124B3B19446A248252BDA45B152819C76AAA5FD35E1C07091885CE6955E05"}}],"peer_key":""}]} +{"time":"2016-01-16T04:42:00.392Z","msg":[1,{"height":28219,"round":0,"step":"RoundStepPrecommit"}]} +{"time":"2016-01-16T04:42:00.392Z","msg":[2,{"msg":[20,{"ValidatorIndex":0,"Vote":{"height":28219,"round":0,"type":2,"block_hash":"67F9689F15BEC30BF311FB4C0C80C5E661AA44E0","block_parts_header":{"total":1,"hash":"DFFD4409A1E273ED61AC27CAF975F446020D5676"},"signature":"1B9924E010F47E0817695DFE462C531196E5A12632434DE12180BBA3EFDAD6B3960FDB9357AFF085EB61729A7D4A6AD8408555D7569C87D9028F280192FD4E05"}}],"peer_key":""}]} +{"time":"2016-01-16T04:42:00.393Z","msg":[1,{"height":28219,"round":0,"step":"RoundStepCommit"}]} +{"time":"2016-01-16T04:42:00.395Z","msg":[1,{"height":28220,"round":0,"step":"RoundStepNewHeight"}]}` + +func TestSeek(t *testing.T) { + f, err := ioutil.TempFile(os.TempDir(), "seek_test_") + if err != nil { + t.Fatal(err) + } + + stat, _ := f.Stat() + name := stat.Name() + + _, err = f.WriteString(testTxt) + if err != nil { + t.Fatal(err) + } + f.Close() + + wal, err := NewWAL(path.Join(os.TempDir(), name)) + if err != nil { + t.Fatal(err) + } + + keyWord := "Precommit" + n, err := wal.SeekFromEnd(func(b []byte) bool { + if strings.Contains(string(b), keyWord) { + return true + } + return false + }) + if err != nil { + t.Fatal(err) + } + + // confirm n + spl := strings.Split(testTxt, "\n") + var i int + var s string + for i, s = range spl { + if strings.Contains(s, keyWord) { + break + } + } + // n is lines from the end. + spl = spl[i:] + if n != len(spl) { + t.Fatalf("Wrong nLines. Got %d, expected %d", n, len(spl)) + } + + b, err := ioutil.ReadAll(wal.fp) + if err != nil { + t.Fatal(err) + } + // first char is a \n + spl2 := strings.Split(strings.Trim(string(b), "\n"), "\n") + for i, s := range spl { + if s != spl2[i] { + t.Fatalf("Mismatch. Got %s, expected %s", spl2[i], s) + } + } + +} diff --git a/node/node.go b/node/node.go index b9250bdd..34d0c469 100644 --- a/node/node.go +++ b/node/node.go @@ -85,9 +85,9 @@ func NewNode(privValidator *types.PrivValidator) *Node { } // deterministic accountability - err = consensusState.OpenFileForMessageLog(config.GetString("cs_msg_log")) + err = consensusState.OpenWAL(config.GetString("cswal")) if err != nil { - log.Error("Failed to open cs_msg_log", "error", err.Error()) + log.Error("Failed to open cswal", "error", err.Error()) } // Make p2p network switch @@ -251,6 +251,49 @@ func makeNodeInfo(sw *p2p.Switch, privKey crypto.PrivKeyEd25519) *p2p.NodeInfo { return nodeInfo } +// Get a connection to the proxyAppConn addr. +// Check the current hash, and panic if it doesn't match. +func getProxyApp(addr string, hash []byte) (proxyAppConn proxy.AppConn) { + // use local app (for testing) + if addr == "local" { + app := example.NewCounterApplication(true) + mtx := new(sync.Mutex) + proxyAppConn = proxy.NewLocalAppConn(mtx, app) + } else { + proxyConn, err := Connect(addr) + if err != nil { + Exit(Fmt("Failed to connect to proxy for mempool: %v", err)) + } + remoteApp := proxy.NewRemoteAppConn(proxyConn, 1024) + remoteApp.Start() + + proxyAppConn = remoteApp + } + + // Check the hash + currentHash, err := proxyAppConn.GetHashSync() + if err != nil { + PanicCrisis(Fmt("Error in getting proxyAppConn hash: %v", err)) + } + if !bytes.Equal(hash, currentHash) { + PanicCrisis(Fmt("ProxyApp hash does not match. Expected %X, got %X", hash, currentHash)) + } + + return proxyAppConn +} + +// Load the most recent state from "state" db, +// or create a new one (and save) from genesis. +func getState() *sm.State { + stateDB := dbm.GetDB("state") + state := sm.LoadState(stateDB) + if state == nil { + state = sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file")) + state.Save() + } + return state +} + //------------------------------------------------------------------------------ // Users wishing to use an external signer for their validators @@ -314,6 +357,10 @@ func RunNode() { }) } +//------------------------------------------------------------------------------ +// replay + +// convenience for replay mode func newConsensusState() *consensus.ConsensusState { // Get BlockStore blockStoreDB := dbm.GetDB("blockstore") @@ -347,71 +394,28 @@ func newConsensusState() *consensus.ConsensusState { } func RunReplayConsole() { - msgLogFile := config.GetString("cs_msg_log") - if msgLogFile == "" { - Exit("cs_msg_log file name not set in tendermint config") + walFile := config.GetString("cswal") + if walFile == "" { + Exit("cswal file name not set in tendermint config") } consensusState := newConsensusState() - if err := consensusState.ReplayConsole(msgLogFile); err != nil { + if err := consensusState.ReplayConsole(walFile); err != nil { Exit(Fmt("Error during consensus replay: %v", err)) } } func RunReplay() { - msgLogFile := config.GetString("cs_msg_log") - if msgLogFile == "" { - Exit("cs_msg_log file name not set in tendermint config") + walFile := config.GetString("cswal") + if walFile == "" { + Exit("cswal file name not set in tendermint config") } consensusState := newConsensusState() - if err := consensusState.ReplayMessages(msgLogFile); err != nil { + if err := consensusState.ReplayMessages(walFile); err != nil { Exit(Fmt("Error during consensus replay: %v", err)) } log.Notice("Replay run successfully") } - -// Load the most recent state from "state" db, -// or create a new one (and save) from genesis. -func getState() *sm.State { - stateDB := dbm.GetDB("state") - state := sm.LoadState(stateDB) - if state == nil { - state = sm.MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file")) - state.Save() - } - return state -} - -// Get a connection to the proxyAppConn addr. -// Check the current hash, and panic if it doesn't match. -func getProxyApp(addr string, hash []byte) (proxyAppConn proxy.AppConn) { - // use local app (for testing) - if addr == "local" { - app := example.NewCounterApplication(true) - mtx := new(sync.Mutex) - proxyAppConn = proxy.NewLocalAppConn(mtx, app) - } else { - proxyConn, err := Connect(addr) - if err != nil { - Exit(Fmt("Failed to connect to proxy for mempool: %v", err)) - } - remoteApp := proxy.NewRemoteAppConn(proxyConn, 1024) - remoteApp.Start() - - proxyAppConn = remoteApp - } - - // Check the hash - currentHash, err := proxyAppConn.GetHashSync() - if err != nil { - PanicCrisis(Fmt("Error in getting proxyAppConn hash: %v", err)) - } - if !bytes.Equal(hash, currentHash) { - PanicCrisis(Fmt("ProxyApp hash does not match. Expected %X, got %X", hash, currentHash)) - } - - return proxyAppConn -}