mirror of
https://github.com/fluencelabs/tendermint
synced 2025-04-25 14:52:17 +00:00
Consensus WAL uses AutoFile/Group
This commit is contained in:
parent
480f44f16c
commit
1788a68b1c
@ -11,6 +11,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
auto "github.com/tendermint/go-autofile"
|
||||||
. "github.com/tendermint/go-common"
|
. "github.com/tendermint/go-common"
|
||||||
"github.com/tendermint/go-wire"
|
"github.com/tendermint/go-wire"
|
||||||
|
|
||||||
@ -18,12 +19,17 @@ import (
|
|||||||
"github.com/tendermint/tendermint/types"
|
"github.com/tendermint/tendermint/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
// unmarshal and apply a single message to the consensus state
|
// Unmarshal and apply a single message to the consensus state
|
||||||
// as if it were received in receiveRoutine
|
// as if it were received in receiveRoutine
|
||||||
|
// Lines that start with "#" are ignored.
|
||||||
// NOTE: receiveRoutine should not be running
|
// NOTE: receiveRoutine should not be running
|
||||||
func (cs *ConsensusState) readReplayMessage(msgBytes []byte, newStepCh chan interface{}) error {
|
func (cs *ConsensusState) readReplayMessage(msgBytes []byte, newStepCh chan interface{}) error {
|
||||||
|
// Skip over empty and meta lines
|
||||||
|
if len(msgBytes) == 0 || msgBytes[0] == '#' {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
var err error
|
var err error
|
||||||
var msg ConsensusLogMessage
|
var msg TimedWALMessage
|
||||||
wire.ReadJSON(&msg, msgBytes, &err)
|
wire.ReadJSON(&msg, msgBytes, &err)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println("MsgBytes:", msgBytes, string(msgBytes))
|
fmt.Println("MsgBytes:", msgBytes, string(msgBytes))
|
||||||
@ -70,7 +76,7 @@ func (cs *ConsensusState) readReplayMessage(msgBytes []byte, newStepCh chan inte
|
|||||||
log.Notice("Replay: Timeout", "height", m.Height, "round", m.Round, "step", m.Step, "dur", m.Duration)
|
log.Notice("Replay: Timeout", "height", m.Height, "round", m.Round, "step", m.Step, "dur", m.Duration)
|
||||||
cs.handleTimeout(m, cs.RoundState)
|
cs.handleTimeout(m, cs.RoundState)
|
||||||
default:
|
default:
|
||||||
return fmt.Errorf("Replay: Unknown ConsensusLogMessage type: %v", reflect.TypeOf(msg.Msg))
|
return fmt.Errorf("Replay: Unknown TimedWALMessage type: %v", reflect.TypeOf(msg.Msg))
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -78,83 +84,45 @@ func (cs *ConsensusState) readReplayMessage(msgBytes []byte, newStepCh chan inte
|
|||||||
// replay only those messages since the last block.
|
// replay only those messages since the last block.
|
||||||
// timeoutRoutine should run concurrently to read off tickChan
|
// timeoutRoutine should run concurrently to read off tickChan
|
||||||
func (cs *ConsensusState) catchupReplay(csHeight int) error {
|
func (cs *ConsensusState) catchupReplay(csHeight int) error {
|
||||||
if !cs.wal.Exists() {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// set replayMode
|
// set replayMode
|
||||||
cs.replayMode = true
|
cs.replayMode = true
|
||||||
defer func() { cs.replayMode = false }()
|
defer func() { cs.replayMode = false }()
|
||||||
|
|
||||||
// starting from end of file,
|
// Ensure that height+1 doesn't exist
|
||||||
// read messages until a new height is found
|
gr, found, err := cs.wal.group.Search("#HEIGHT: ", makeHeightSearchFunc(csHeight+1))
|
||||||
var walHeight int
|
if found {
|
||||||
nLines, err := cs.wal.SeekFromEnd(func(lineBytes []byte) bool {
|
return errors.New(Fmt("WAL should not contain height %d.", csHeight+1))
|
||||||
var err error
|
|
||||||
var msg ConsensusLogMessage
|
|
||||||
wire.ReadJSON(&msg, lineBytes, &err)
|
|
||||||
if err != nil {
|
|
||||||
panic(Fmt("Failed to read cs_msg_log json: %v", err))
|
|
||||||
}
|
}
|
||||||
m, ok := msg.Msg.(types.EventDataRoundState)
|
if gr != nil {
|
||||||
walHeight = m.Height
|
gr.Close()
|
||||||
if ok && m.Step == RoundStepNewHeight.String() {
|
|
||||||
return true
|
|
||||||
}
|
}
|
||||||
return false
|
|
||||||
})
|
|
||||||
|
|
||||||
|
// Search for height marker
|
||||||
|
gr, found, err = cs.wal.group.Search("#HEIGHT: ", makeHeightSearchFunc(csHeight))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
if !found {
|
||||||
// ensure the height matches
|
return errors.New(Fmt("WAL does not contain height %d.", csHeight))
|
||||||
if walHeight != csHeight {
|
|
||||||
var err error
|
|
||||||
if walHeight > csHeight {
|
|
||||||
err = errors.New(Fmt("WAL height (%d) exceeds cs height (%d). Is your cs.state corrupted?", walHeight, csHeight))
|
|
||||||
} else {
|
|
||||||
log.Notice("Replay: nothing to do", "cs.height", csHeight, "wal.height", walHeight)
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
defer gr.Close()
|
||||||
|
|
||||||
var beginning bool // if we had to go back to the beginning
|
log.Notice("Catchup by replaying consensus messages", "height", csHeight)
|
||||||
if c, _ := cs.wal.fp.Seek(0, 1); c == 0 {
|
|
||||||
beginning = true
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Notice("Catchup by replaying consensus messages", "n", nLines, "height", walHeight)
|
for {
|
||||||
|
line, err := gr.ReadLine()
|
||||||
// now we can replay the latest nLines on consensus state
|
if err != nil {
|
||||||
// note we can't use scan because we've already been reading from the file
|
|
||||||
// XXX: if a msg is too big we need to find out why or increase this for that case ...
|
|
||||||
maxMsgSize := 1000000
|
|
||||||
reader := bufio.NewReaderSize(cs.wal.fp, maxMsgSize)
|
|
||||||
for i := 0; i < nLines; i++ {
|
|
||||||
msgBytes, err := reader.ReadBytes('\n')
|
|
||||||
if err == io.EOF {
|
if err == io.EOF {
|
||||||
log.Warn("Replay: EOF", "bytes", string(msgBytes))
|
|
||||||
break
|
break
|
||||||
} else if err != nil {
|
} else {
|
||||||
return err
|
return err
|
||||||
} else if len(msgBytes) == 0 {
|
|
||||||
log.Warn("Replay: msg bytes is 0")
|
|
||||||
continue
|
|
||||||
} else if len(msgBytes) == 1 && msgBytes[0] == '\n' {
|
|
||||||
log.Warn("Replay: new line")
|
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
// the first msg is the NewHeight event (if we're not at the beginning), so we can ignore it
|
|
||||||
if !beginning && i == 1 {
|
|
||||||
log.Warn("Replay: not beginning and 1")
|
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NOTE: since the priv key is set when the msgs are received
|
// NOTE: since the priv key is set when the msgs are received
|
||||||
// it will attempt to eg double sign but we can just ignore it
|
// it will attempt to eg double sign but we can just ignore it
|
||||||
// since the votes will be replayed and we'll get to the next step
|
// since the votes will be replayed and we'll get to the next step
|
||||||
if err := cs.readReplayMessage(msgBytes, nil); err != nil {
|
if err := cs.readReplayMessage([]byte(line), nil); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -245,6 +213,7 @@ func newPlayback(fileName string, fp *os.File, cs *ConsensusState, genState *sm.
|
|||||||
func (pb *playback) replayReset(count int, newStepCh chan interface{}) error {
|
func (pb *playback) replayReset(count int, newStepCh chan interface{}) error {
|
||||||
|
|
||||||
pb.cs.Stop()
|
pb.cs.Stop()
|
||||||
|
pb.cs.Wait()
|
||||||
|
|
||||||
newCS := NewConsensusState(pb.cs.config, pb.genesisState.Copy(), pb.cs.proxyAppConn, pb.cs.blockStore, pb.cs.mempool)
|
newCS := NewConsensusState(pb.cs.config, pb.genesisState.Copy(), pb.cs.proxyAppConn, pb.cs.blockStore, pb.cs.mempool)
|
||||||
newCS.SetEventSwitch(pb.cs.evsw)
|
newCS.SetEventSwitch(pb.cs.evsw)
|
||||||
@ -376,3 +345,28 @@ func (pb *playback) replayConsoleLoop() int {
|
|||||||
}
|
}
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//--------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
// Parses marker lines of the form:
|
||||||
|
// #HEIGHT: 12345
|
||||||
|
func makeHeightSearchFunc(height int) auto.SearchFunc {
|
||||||
|
return func(line string) (int, error) {
|
||||||
|
line = strings.TrimRight(line, "\n")
|
||||||
|
parts := strings.Split(line, " ")
|
||||||
|
if len(parts) != 2 {
|
||||||
|
return -1, errors.New("Line did not have 2 parts")
|
||||||
|
}
|
||||||
|
i, err := strconv.Atoi(parts[1])
|
||||||
|
if err != nil {
|
||||||
|
return -1, errors.New("Failed to parse INFO: " + err.Error())
|
||||||
|
}
|
||||||
|
if height < i {
|
||||||
|
return 1, nil
|
||||||
|
} else if height == i {
|
||||||
|
return 0, nil
|
||||||
|
} else {
|
||||||
|
return -1, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -17,13 +17,13 @@ import (
|
|||||||
var data_dir = path.Join(GoPath, "src/github.com/tendermint/tendermint/consensus", "test_data")
|
var data_dir = path.Join(GoPath, "src/github.com/tendermint/tendermint/consensus", "test_data")
|
||||||
|
|
||||||
// the priv validator changes step at these lines for a block with 1 val and 1 part
|
// the priv validator changes step at these lines for a block with 1 val and 1 part
|
||||||
var baseStepChanges = []int{2, 5, 7}
|
var baseStepChanges = []int{3, 6, 8}
|
||||||
|
|
||||||
// test recovery from each line in each testCase
|
// test recovery from each line in each testCase
|
||||||
var testCases = []*testCase{
|
var testCases = []*testCase{
|
||||||
newTestCase("empty_block", baseStepChanges), // empty block (has 1 block part)
|
newTestCase("empty_block", baseStepChanges), // empty block (has 1 block part)
|
||||||
newTestCase("small_block1", baseStepChanges), // small block with txs in 1 block part
|
newTestCase("small_block1", baseStepChanges), // small block with txs in 1 block part
|
||||||
newTestCase("small_block2", []int{2, 7, 9}), // small block with txs across 3 smaller block parts
|
newTestCase("small_block2", []int{3, 8, 10}), // small block with txs across 3 smaller block parts
|
||||||
}
|
}
|
||||||
|
|
||||||
type testCase struct {
|
type testCase struct {
|
||||||
@ -108,6 +108,7 @@ func runReplayTest(t *testing.T, cs *ConsensusState, fileName string, newBlockCh
|
|||||||
// should eventually be followed by a new block, or else something is wrong
|
// should eventually be followed by a new block, or else something is wrong
|
||||||
waitForBlock(newBlockCh, thisCase, i)
|
waitForBlock(newBlockCh, thisCase, i)
|
||||||
cs.Stop()
|
cs.Stop()
|
||||||
|
cs.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
func setupReplayTest(thisCase *testCase, nLines int, crashAfter bool) (*ConsensusState, chan interface{}, string, string) {
|
func setupReplayTest(thisCase *testCase, nLines int, crashAfter bool) (*ConsensusState, chan interface{}, string, string) {
|
||||||
@ -162,7 +163,7 @@ func TestReplayCrashBeforeWritePropose(t *testing.T) {
|
|||||||
cs, newBlockCh, proposalMsg, f := setupReplayTest(thisCase, lineNum, false) // propose
|
cs, newBlockCh, proposalMsg, f := setupReplayTest(thisCase, lineNum, false) // propose
|
||||||
// Set LastSig
|
// Set LastSig
|
||||||
var err error
|
var err error
|
||||||
var msg ConsensusLogMessage
|
var msg TimedWALMessage
|
||||||
wire.ReadJSON(&msg, []byte(proposalMsg), &err)
|
wire.ReadJSON(&msg, []byte(proposalMsg), &err)
|
||||||
proposal := msg.Msg.(msgInfo).Msg.(*ProposalMessage)
|
proposal := msg.Msg.(msgInfo).Msg.(*ProposalMessage)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -181,7 +182,7 @@ func TestReplayCrashBeforeWritePrevote(t *testing.T) {
|
|||||||
types.AddListenerForEvent(cs.evsw, "tester", types.EventStringCompleteProposal(), func(data types.TMEventData) {
|
types.AddListenerForEvent(cs.evsw, "tester", types.EventStringCompleteProposal(), func(data types.TMEventData) {
|
||||||
// Set LastSig
|
// Set LastSig
|
||||||
var err error
|
var err error
|
||||||
var msg ConsensusLogMessage
|
var msg TimedWALMessage
|
||||||
wire.ReadJSON(&msg, []byte(voteMsg), &err)
|
wire.ReadJSON(&msg, []byte(voteMsg), &err)
|
||||||
vote := msg.Msg.(msgInfo).Msg.(*VoteMessage)
|
vote := msg.Msg.(msgInfo).Msg.(*VoteMessage)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -201,7 +202,7 @@ func TestReplayCrashBeforeWritePrecommit(t *testing.T) {
|
|||||||
types.AddListenerForEvent(cs.evsw, "tester", types.EventStringPolka(), func(data types.TMEventData) {
|
types.AddListenerForEvent(cs.evsw, "tester", types.EventStringPolka(), func(data types.TMEventData) {
|
||||||
// Set LastSig
|
// Set LastSig
|
||||||
var err error
|
var err error
|
||||||
var msg ConsensusLogMessage
|
var msg TimedWALMessage
|
||||||
wire.ReadJSON(&msg, []byte(voteMsg), &err)
|
wire.ReadJSON(&msg, []byte(voteMsg), &err)
|
||||||
vote := msg.Msg.(msgInfo).Msg.(*VoteMessage)
|
vote := msg.Msg.(msgInfo).Msg.(*VoteMessage)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -343,6 +343,7 @@ func (cs *ConsensusState) startRoutines(maxSteps int) {
|
|||||||
func (cs *ConsensusState) OnStop() {
|
func (cs *ConsensusState) OnStop() {
|
||||||
cs.BaseService.OnStop()
|
cs.BaseService.OnStop()
|
||||||
|
|
||||||
|
// Make BaseService.Wait() wait until cs.wal.Wait()
|
||||||
if cs.wal != nil && cs.IsRunning() {
|
if cs.wal != nil && cs.IsRunning() {
|
||||||
cs.wal.Wait()
|
cs.wal.Wait()
|
||||||
}
|
}
|
||||||
@ -658,7 +659,7 @@ func (cs *ConsensusState) receiveRoutine(maxSteps int) {
|
|||||||
|
|
||||||
// close wal now that we're done writing to it
|
// close wal now that we're done writing to it
|
||||||
if cs.wal != nil {
|
if cs.wal != nil {
|
||||||
cs.wal.Close()
|
cs.wal.Stop()
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -1,3 +1,4 @@
|
|||||||
|
#HEIGHT: 1
|
||||||
{"time":"2016-04-03T11:23:54.387Z","msg":[3,{"duration":972835254,"height":1,"round":0,"step":1}]}
|
{"time":"2016-04-03T11:23:54.387Z","msg":[3,{"duration":972835254,"height":1,"round":0,"step":1}]}
|
||||||
{"time":"2016-04-03T11:23:54.388Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPropose"}]}
|
{"time":"2016-04-03T11:23:54.388Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPropose"}]}
|
||||||
{"time":"2016-04-03T11:23:54.388Z","msg":[2,{"msg":[17,{"Proposal":{"height":1,"round":0,"block_parts_header":{"total":1,"hash":"3BA1E90CB868DA6B4FD7F3589826EC461E9EB4EF"},"pol_round":-1,"signature":"3A2ECD5023B21EC144EC16CFF1B992A4321317B83EEDD8969FDFEA6EB7BF4389F38DDA3E7BB109D63A07491C16277A197B241CF1F05F5E485C59882ECACD9E07"}}],"peer_key":""}]}
|
{"time":"2016-04-03T11:23:54.388Z","msg":[2,{"msg":[17,{"Proposal":{"height":1,"round":0,"block_parts_header":{"total":1,"hash":"3BA1E90CB868DA6B4FD7F3589826EC461E9EB4EF"},"pol_round":-1,"signature":"3A2ECD5023B21EC144EC16CFF1B992A4321317B83EEDD8969FDFEA6EB7BF4389F38DDA3E7BB109D63A07491C16277A197B241CF1F05F5E485C59882ECACD9E07"}}],"peer_key":""}]}
|
||||||
|
@ -1,3 +1,4 @@
|
|||||||
|
#HEIGHT: 1
|
||||||
{"time":"2016-10-11T15:29:08.113Z","msg":[3,{"duration":0,"height":1,"round":0,"step":1}]}
|
{"time":"2016-10-11T15:29:08.113Z","msg":[3,{"duration":0,"height":1,"round":0,"step":1}]}
|
||||||
{"time":"2016-10-11T15:29:08.115Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPropose"}]}
|
{"time":"2016-10-11T15:29:08.115Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPropose"}]}
|
||||||
{"time":"2016-10-11T15:29:08.115Z","msg":[2,{"msg":[17,{"Proposal":{"height":1,"round":0,"block_parts_header":{"total":1,"hash":"A2C0B5D384DFF2692FF679D00CEAE93A55DCDD1A"},"pol_round":-1,"signature":"116961B715FB54C09983209F7F3BFD95C7DA2E0D7AB9CFE9F0750F2402E2AEB715CFD55FB2C5DB88F485391D426A48705E0474AB94328463290D086D88BAD704"}}],"peer_key":""}]}
|
{"time":"2016-10-11T15:29:08.115Z","msg":[2,{"msg":[17,{"Proposal":{"height":1,"round":0,"block_parts_header":{"total":1,"hash":"A2C0B5D384DFF2692FF679D00CEAE93A55DCDD1A"},"pol_round":-1,"signature":"116961B715FB54C09983209F7F3BFD95C7DA2E0D7AB9CFE9F0750F2402E2AEB715CFD55FB2C5DB88F485391D426A48705E0474AB94328463290D086D88BAD704"}}],"peer_key":""}]}
|
||||||
|
@ -1,3 +1,4 @@
|
|||||||
|
#HEIGHT: 1
|
||||||
{"time":"2016-10-11T16:21:23.438Z","msg":[3,{"duration":0,"height":1,"round":0,"step":1}]}
|
{"time":"2016-10-11T16:21:23.438Z","msg":[3,{"duration":0,"height":1,"round":0,"step":1}]}
|
||||||
{"time":"2016-10-11T16:21:23.440Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPropose"}]}
|
{"time":"2016-10-11T16:21:23.440Z","msg":[1,{"height":1,"round":0,"step":"RoundStepPropose"}]}
|
||||||
{"time":"2016-10-11T16:21:23.440Z","msg":[2,{"msg":[17,{"Proposal":{"height":1,"round":0,"block_parts_header":{"total":3,"hash":"88BC082C86DED0A5E2BBC3677B610D155FEDBCEA"},"pol_round":-1,"signature":"8F74F7032E50DFBC17E8B42DD15FD54858B45EEB1B8DAF6432AFBBB1333AC1E850290DE82DF613A10430EB723023527498D45C106FD2946FEF03A9C8B301020B"}}],"peer_key":""}]}
|
{"time":"2016-10-11T16:21:23.440Z","msg":[2,{"msg":[17,{"Proposal":{"height":1,"round":0,"block_parts_header":{"total":3,"hash":"88BC082C86DED0A5E2BBC3677B610D155FEDBCEA"},"pol_round":-1,"signature":"8F74F7032E50DFBC17E8B42DD15FD54858B45EEB1B8DAF6432AFBBB1333AC1E850290DE82DF613A10430EB723023527498D45C106FD2946FEF03A9C8B301020B"}}],"peer_key":""}]}
|
||||||
|
129
consensus/wal.go
129
consensus/wal.go
@ -1,10 +1,9 @@
|
|||||||
package consensus
|
package consensus
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
|
||||||
"os"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
auto "github.com/tendermint/go-autofile"
|
||||||
. "github.com/tendermint/go-common"
|
. "github.com/tendermint/go-common"
|
||||||
"github.com/tendermint/go-wire"
|
"github.com/tendermint/go-wire"
|
||||||
"github.com/tendermint/tendermint/types"
|
"github.com/tendermint/tendermint/types"
|
||||||
@ -13,15 +12,15 @@ import (
|
|||||||
//--------------------------------------------------------
|
//--------------------------------------------------------
|
||||||
// types and functions for savings consensus messages
|
// types and functions for savings consensus messages
|
||||||
|
|
||||||
type ConsensusLogMessage struct {
|
type TimedWALMessage struct {
|
||||||
Time time.Time `json:"time"`
|
Time time.Time `json:"time"`
|
||||||
Msg ConsensusLogMessageInterface `json:"msg"`
|
Msg WALMessage `json:"msg"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type ConsensusLogMessageInterface interface{}
|
type WALMessage interface{}
|
||||||
|
|
||||||
var _ = wire.RegisterInterface(
|
var _ = wire.RegisterInterface(
|
||||||
struct{ ConsensusLogMessageInterface }{},
|
struct{ WALMessage }{},
|
||||||
wire.ConcreteType{types.EventDataRoundState{}, 0x01},
|
wire.ConcreteType{types.EventDataRoundState{}, 0x01},
|
||||||
wire.ConcreteType{msgInfo{}, 0x02},
|
wire.ConcreteType{msgInfo{}, 0x02},
|
||||||
wire.ConcreteType{timeoutInfo{}, 0x03},
|
wire.ConcreteType{timeoutInfo{}, 0x03},
|
||||||
@ -35,119 +34,59 @@ var _ = wire.RegisterInterface(
|
|||||||
// TODO: currently the wal is overwritten during replay catchup
|
// TODO: currently the wal is overwritten during replay catchup
|
||||||
// give it a mode so it's either reading or appending - must read to end to start appending again
|
// give it a mode so it's either reading or appending - must read to end to start appending again
|
||||||
type WAL struct {
|
type WAL struct {
|
||||||
fp *os.File
|
BaseService
|
||||||
exists bool // if the file already existed (restarted process)
|
|
||||||
|
|
||||||
done chan struct{}
|
|
||||||
|
|
||||||
|
group *auto.Group
|
||||||
light bool // ignore block parts
|
light bool // ignore block parts
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewWAL(file string, light bool) (*WAL, error) {
|
func NewWAL(path string, light bool) (*WAL, error) {
|
||||||
var walExists bool
|
head, err := auto.OpenAutoFile(path)
|
||||||
if _, err := os.Stat(file); err == nil {
|
|
||||||
walExists = true
|
|
||||||
}
|
|
||||||
fp, err := os.OpenFile(file, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0600)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return &WAL{
|
group, err := auto.OpenGroup(head)
|
||||||
fp: fp,
|
if err != nil {
|
||||||
exists: walExists,
|
return nil, err
|
||||||
done: make(chan struct{}),
|
}
|
||||||
|
wal := &WAL{
|
||||||
|
group: group,
|
||||||
light: light,
|
light: light,
|
||||||
}, nil
|
}
|
||||||
|
wal.BaseService = *NewBaseService(log, "WAL", wal)
|
||||||
|
return wal, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wal *WAL) Exists() bool {
|
func (wal *WAL) OnStop() {
|
||||||
if wal == nil {
|
wal.BaseService.OnStop()
|
||||||
log.Warn("consensus msg log is nil")
|
wal.group.Head.Close()
|
||||||
return false
|
wal.group.Close()
|
||||||
}
|
|
||||||
return wal.exists
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// called in newStep and for each pass in receiveRoutine
|
// called in newStep and for each pass in receiveRoutine
|
||||||
func (wal *WAL) Save(clm ConsensusLogMessageInterface) {
|
func (wal *WAL) Save(wmsg WALMessage) {
|
||||||
if wal == nil {
|
if wal == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if wal.light {
|
if wal.light {
|
||||||
// in light mode we only write new steps, timeouts, and our own votes (no proposals, block parts)
|
// in light mode we only write new steps, timeouts, and our own votes (no proposals, block parts)
|
||||||
if mi, ok := clm.(msgInfo); ok {
|
if mi, ok := wmsg.(msgInfo); ok {
|
||||||
_ = mi
|
_ = mi
|
||||||
if mi.PeerKey != "" {
|
if mi.PeerKey != "" {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
var clmBytes = wire.JSONBytes(ConsensusLogMessage{time.Now(), clm})
|
// Write #HEIGHT: XYZ if new height
|
||||||
var n int
|
if edrs, ok := wmsg.(types.EventDataRoundState); ok {
|
||||||
var err error
|
if edrs.Step == RoundStepNewHeight.String() {
|
||||||
wire.WriteTo(append(clmBytes, byte('\n')), wal.fp, &n, &err) // one message per line
|
wal.group.WriteLine(Fmt("#HEIGHT: %v", edrs.Height))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Write the wal message
|
||||||
|
var wmsgBytes = wire.JSONBytes(TimedWALMessage{time.Now(), wmsg})
|
||||||
|
err := wal.group.WriteLine(string(wmsgBytes))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
PanicQ(Fmt("Error writing msg to consensus wal. Error: %v \n\nMessage: %v", err, clm))
|
PanicQ(Fmt("Error writing msg to consensus wal. Error: %v \n\nMessage: %v", err, wmsg))
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Must not be called concurrently with a write.
|
|
||||||
func (wal *WAL) Close() {
|
|
||||||
if wal != nil {
|
|
||||||
wal.fp.Close()
|
|
||||||
}
|
|
||||||
wal.done <- struct{}{}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (wal *WAL) Wait() {
|
|
||||||
<-wal.done
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: remove once we stop supporting older golang version
|
|
||||||
const (
|
|
||||||
ioSeekStart = 0
|
|
||||||
ioSeekCurrent = 1
|
|
||||||
ioSeekEnd = 2
|
|
||||||
)
|
|
||||||
|
|
||||||
func (wal *WAL) SeekFromEnd(found func([]byte) bool) (nLines int, err error) {
|
|
||||||
var current int64
|
|
||||||
// start at the end
|
|
||||||
current, err = wal.fp.Seek(0, ioSeekEnd)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// backup until we find the the right line
|
|
||||||
// current is how far we are from the beginning
|
|
||||||
for {
|
|
||||||
current -= 1
|
|
||||||
if current < 0 {
|
|
||||||
wal.fp.Seek(0, ioSeekStart) // back to beginning
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// backup one and read a new byte
|
|
||||||
if _, err = wal.fp.Seek(current, ioSeekStart); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
b := make([]byte, 1)
|
|
||||||
if _, err = wal.fp.Read(b); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if b[0] == '\n' || len(b) == 0 {
|
|
||||||
nLines += 1
|
|
||||||
// read a full line
|
|
||||||
reader := bufio.NewReader(wal.fp)
|
|
||||||
lineBytes, _ := reader.ReadBytes('\n')
|
|
||||||
if len(lineBytes) == 0 {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if found(lineBytes) {
|
|
||||||
wal.fp.Seek(0, ioSeekCurrent) // (?)
|
|
||||||
wal.fp.Seek(current, ioSeekStart)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,78 +0,0 @@
|
|||||||
package consensus
|
|
||||||
|
|
||||||
import (
|
|
||||||
"io/ioutil"
|
|
||||||
"os"
|
|
||||||
"path"
|
|
||||||
"strings"
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
. "github.com/tendermint/go-common"
|
|
||||||
)
|
|
||||||
|
|
||||||
var testTxt = `{"time":"2016-01-16T04:42:00.390Z","msg":[1,{"height":28219,"round":0,"step":"RoundStepPrevote"}]}
|
|
||||||
{"time":"2016-01-16T04:42:00.390Z","msg":[2,{"msg":[20,{"ValidatorIndex":0,"Vote":{"height":28219,"round":0,"type":1,"block_hash":"67F9689F15BEC30BF311FB4C0C80C5E661AA44E0","block_parts_header":{"total":1,"hash":"DFFD4409A1E273ED61AC27CAF975F446020D5676"},"signature":"4CC6845A128E723A299B470CCBB2A158612AA51321447F6492F3DA57D135C27FCF4124B3B19446A248252BDA45B152819C76AAA5FD35E1C07091885CE6955E05"}}],"peer_key":""}]}
|
|
||||||
{"time":"2016-01-16T04:42:00.392Z","msg":[1,{"height":28219,"round":0,"step":"RoundStepPrecommit"}]}
|
|
||||||
{"time":"2016-01-16T04:42:00.392Z","msg":[2,{"msg":[20,{"ValidatorIndex":0,"Vote":{"height":28219,"round":0,"type":2,"block_hash":"67F9689F15BEC30BF311FB4C0C80C5E661AA44E0","block_parts_header":{"total":1,"hash":"DFFD4409A1E273ED61AC27CAF975F446020D5676"},"signature":"1B9924E010F47E0817695DFE462C531196E5A12632434DE12180BBA3EFDAD6B3960FDB9357AFF085EB61729A7D4A6AD8408555D7569C87D9028F280192FD4E05"}}],"peer_key":""}]}
|
|
||||||
{"time":"2016-01-16T04:42:00.393Z","msg":[1,{"height":28219,"round":0,"step":"RoundStepCommit"}]}
|
|
||||||
{"time":"2016-01-16T04:42:00.395Z","msg":[1,{"height":28220,"round":0,"step":"RoundStepNewHeight"}]}`
|
|
||||||
|
|
||||||
func TestSeek(t *testing.T) {
|
|
||||||
f, err := ioutil.TempFile(os.TempDir(), "seek_test_")
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
stat, _ := f.Stat()
|
|
||||||
name := stat.Name()
|
|
||||||
|
|
||||||
_, err = f.WriteString(testTxt)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
f.Close()
|
|
||||||
|
|
||||||
wal, err := NewWAL(path.Join(os.TempDir(), name), config.GetBool("cswal_light"))
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
keyWord := "Precommit"
|
|
||||||
n, err := wal.SeekFromEnd(func(b []byte) bool {
|
|
||||||
if strings.Contains(string(b), keyWord) {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// confirm n
|
|
||||||
spl := strings.Split(testTxt, "\n")
|
|
||||||
var i int
|
|
||||||
var s string
|
|
||||||
for i, s = range spl {
|
|
||||||
if strings.Contains(s, keyWord) {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// n is lines from the end.
|
|
||||||
spl = spl[i:]
|
|
||||||
if n != len(spl) {
|
|
||||||
panic(Fmt("Wrong nLines. Got %d, expected %d", n, len(spl)))
|
|
||||||
}
|
|
||||||
|
|
||||||
b, err := ioutil.ReadAll(wal.fp)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
// first char is a \n
|
|
||||||
spl2 := strings.Split(strings.Trim(string(b), "\n"), "\n")
|
|
||||||
for i, s := range spl {
|
|
||||||
if s != spl2[i] {
|
|
||||||
panic(Fmt("Mismatch. Got %s, expected %s", spl2[i], s))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
2
glide.lock
generated
2
glide.lock
generated
@ -52,7 +52,7 @@ imports:
|
|||||||
- name: github.com/tendermint/flowcontrol
|
- name: github.com/tendermint/flowcontrol
|
||||||
version: 84d9671090430e8ec80e35b339907e0579b999eb
|
version: 84d9671090430e8ec80e35b339907e0579b999eb
|
||||||
- name: github.com/tendermint/go-autofile
|
- name: github.com/tendermint/go-autofile
|
||||||
version: c26b857900009ac81c78c1bc03f85e0c8e47818a
|
version: 916f3d789b6afaf7bfe161aeec391c8a35e354a8
|
||||||
- name: github.com/tendermint/go-clist
|
- name: github.com/tendermint/go-clist
|
||||||
version: 3baa390bbaf7634251c42ad69a8682e7e3990552
|
version: 3baa390bbaf7634251c42ad69a8682e7e3990552
|
||||||
- name: github.com/tendermint/go-common
|
- name: github.com/tendermint/go-common
|
||||||
|
Loading…
x
Reference in New Issue
Block a user