binary format for WAL

This commit is contained in:
Anton Kaliaev
2017-10-09 23:10:58 +04:00
parent 31030c6514
commit 3115c23762
16 changed files with 451 additions and 143 deletions

View File

@ -8,7 +8,6 @@ import (
"io/ioutil"
"os"
"path"
"strings"
"testing"
"time"
@ -58,14 +57,14 @@ var baseStepChanges = []int{3, 6, 8}
// test recovery from each line in each testCase
var testCases = []*testCase{
newTestCase("empty_block", baseStepChanges), // empty block (has 1 block part)
newTestCase("small_block1", baseStepChanges), // small block with txs in 1 block part
newTestCase("small_block2", []int{3, 7, 9}), // small block with txs across 6 smaller block parts
newTestCase("empty_block", baseStepChanges), // empty block (has 1 block part)
newTestCase("small_block1", baseStepChanges), // small block with txs in 1 block part
newTestCase("small_block2", []int{3, 12, 14}), // small block with txs across 6 smaller block parts
}
type testCase struct {
name string
log string //full cs wal
log []byte //full cs wal
stepMap map[int]int8 // map lines of log to privval step
proposeLine int
@ -100,29 +99,27 @@ func newMapFromChanges(changes []int) map[int]int8 {
return m
}
func readWAL(p string) string {
func readWAL(p string) []byte {
b, err := ioutil.ReadFile(p)
if err != nil {
panic(err)
}
return string(b)
return b
}
func writeWAL(walMsgs string) string {
tempDir := os.TempDir()
walDir := path.Join(tempDir, "/wal"+cmn.RandStr(12))
walFile := path.Join(walDir, "wal")
// Create WAL directory
err := cmn.EnsureDir(walDir, 0700)
func writeWAL(walMsgs []byte) string {
walFile, err := ioutil.TempFile("", "wal")
if err != nil {
panic(err)
panic(fmt.Errorf("failed to create temp WAL file: %v", err))
}
// Write the needed WAL to file
err = cmn.WriteFile(walFile, []byte(walMsgs), 0600)
_, err = walFile.Write(walMsgs)
if err != nil {
panic(err)
panic(fmt.Errorf("failed to write to temp WAL file: %v", err))
}
return walFile
if err := walFile.Close(); err != nil {
panic(fmt.Errorf("failed to close temp WAL file: %v", err))
}
return walFile.Name()
}
func waitForBlock(newBlockCh chan interface{}, thisCase *testCase, i int) {
@ -167,7 +164,7 @@ func toPV(pv types.PrivValidator) *types.PrivValidatorFS {
return pv.(*types.PrivValidatorFS)
}
func setupReplayTest(t *testing.T, thisCase *testCase, nLines int, crashAfter bool) (*ConsensusState, chan interface{}, string, string) {
func setupReplayTest(t *testing.T, thisCase *testCase, nLines int, crashAfter bool) (*ConsensusState, chan interface{}, []byte, string) {
t.Log("-------------------------------------")
t.Logf("Starting replay test %v (of %d lines of WAL). Crash after = %v", thisCase.name, nLines, crashAfter)
@ -176,11 +173,13 @@ func setupReplayTest(t *testing.T, thisCase *testCase, nLines int, crashAfter bo
lineStep -= 1
}
split := strings.Split(thisCase.log, "\n")
split := bytes.Split(thisCase.log, walSeparator)
lastMsg := split[nLines]
// we write those lines up to (not including) one with the signature
walFile := writeWAL(strings.Join(split[:nLines], "\n") + "\n")
bytes := bytes.Join(split[:nLines], walSeparator)
bytes = append(bytes, walSeparator...)
walFile := writeWAL(bytes)
cs := fixedConsensusStateDummy()
@ -195,14 +194,18 @@ func setupReplayTest(t *testing.T, thisCase *testCase, nLines int, crashAfter bo
return cs, newBlockCh, lastMsg, walFile
}
func readTimedWALMessage(t *testing.T, walMsg string) TimedWALMessage {
var err error
var msg TimedWALMessage
wire.ReadJSON(&msg, []byte(walMsg), &err)
func readTimedWALMessage(t *testing.T, rawMsg []byte) TimedWALMessage {
b := bytes.NewBuffer(rawMsg)
_, err := b.Write(walSeparator)
if err != nil {
t.Fatal(err)
}
dec := NewWALDecoder(b)
msg, err := dec.Decode()
if err != nil {
t.Fatalf("Error reading json data: %v", err)
}
return msg
return *msg
}
//-----------------------------------------------
@ -211,10 +214,14 @@ func readTimedWALMessage(t *testing.T, walMsg string) TimedWALMessage {
func TestWALCrashAfterWrite(t *testing.T) {
for _, thisCase := range testCases {
split := strings.Split(thisCase.log, "\n")
for i := 0; i < len(split)-1; i++ {
cs, newBlockCh, _, walFile := setupReplayTest(t, thisCase, i+1, true)
runReplayTest(t, cs, walFile, newBlockCh, thisCase, i+1)
splitSize := bytes.Count(thisCase.log, walSeparator)
for i := 0; i < splitSize-1; i++ {
t.Run(fmt.Sprintf("%s:%d", thisCase.name, i), func(t *testing.T) {
cs, newBlockCh, _, walFile := setupReplayTest(t, thisCase, i+1, true)
runReplayTest(t, cs, walFile, newBlockCh, thisCase, i+1)
// cleanup
os.Remove(walFile)
})
}
}
}
@ -226,14 +233,18 @@ func TestWALCrashAfterWrite(t *testing.T) {
func TestWALCrashBeforeWritePropose(t *testing.T) {
for _, thisCase := range testCases {
lineNum := thisCase.proposeLine
// setup replay test where last message is a proposal
cs, newBlockCh, proposalMsg, walFile := setupReplayTest(t, thisCase, lineNum, false)
msg := readTimedWALMessage(t, proposalMsg)
proposal := msg.Msg.(msgInfo).Msg.(*ProposalMessage)
// Set LastSig
toPV(cs.privValidator).LastSignBytes = types.SignBytes(cs.state.ChainID, proposal.Proposal)
toPV(cs.privValidator).LastSignature = proposal.Proposal.Signature
runReplayTest(t, cs, walFile, newBlockCh, thisCase, lineNum)
t.Run(fmt.Sprintf("%s:%d", thisCase.name, lineNum), func(t *testing.T) {
// setup replay test where last message is a proposal
cs, newBlockCh, proposalMsg, walFile := setupReplayTest(t, thisCase, lineNum, false)
msg := readTimedWALMessage(t, proposalMsg)
proposal := msg.Msg.(msgInfo).Msg.(*ProposalMessage)
// Set LastSig
toPV(cs.privValidator).LastSignBytes = types.SignBytes(cs.state.ChainID, proposal.Proposal)
toPV(cs.privValidator).LastSignature = proposal.Proposal.Signature
runReplayTest(t, cs, walFile, newBlockCh, thisCase, lineNum)
// cleanup
os.Remove(walFile)
})
}
}
@ -315,7 +326,7 @@ func testHandshakeReplay(t *testing.T, nBlocks int, mode uint) {
if err != nil {
t.Fatal(err)
}
walFile := writeWAL(string(walBody))
walFile := writeWAL(walBody)
config.Consensus.SetWalFile(walFile)
privVal := types.LoadPrivValidatorFS(config.PrivValidatorFile())
@ -465,7 +476,7 @@ func buildTMStateFromChain(config *cfg.Config, state *sm.State, chain []*types.B
func makeBlockchainFromWAL(wal *WAL) ([]*types.Block, []*types.Commit, error) {
// Search for height marker
gr, found, err := wal.group.Search("#ENDHEIGHT: ", makeHeightSearchFunc(0))
gr, found, err := wal.SearchForEndHeight(0)
if err != nil {
return nil, nil, err
}
@ -479,20 +490,18 @@ func makeBlockchainFromWAL(wal *WAL) ([]*types.Block, []*types.Commit, error) {
var blockParts *types.PartSet
var blocks []*types.Block
var commits []*types.Commit
for {
line, err := gr.ReadLine()
if err != nil {
if err == io.EOF {
break
} else {
return nil, nil, err
}
}
piece, err := readPieceFromWAL([]byte(line))
dec := NewWALDecoder(gr)
for {
msg, err := dec.Decode()
if err == io.EOF {
break
}
if err != nil {
return nil, nil, err
}
piece := readPieceFromWAL(msg)
if piece == nil {
continue
}
@ -528,17 +537,10 @@ func makeBlockchainFromWAL(wal *WAL) ([]*types.Block, []*types.Commit, error) {
return blocks, commits, nil
}
func readPieceFromWAL(msgBytes []byte) (interface{}, error) {
// Skip over empty and meta lines
if len(msgBytes) == 0 || msgBytes[0] == '#' {
return nil, nil
}
var err error
var msg TimedWALMessage
wire.ReadJSON(&msg, msgBytes, &err)
if err != nil {
fmt.Println("MsgBytes:", msgBytes, string(msgBytes))
return nil, fmt.Errorf("Error reading json data: %v", err)
func readPieceFromWAL(msg *TimedWALMessage) interface{} {
// Skip meta lines
if _, ok := msg.Msg.(EndHeightMessage); ok {
return nil
}
// for logging
@ -546,14 +548,15 @@ func readPieceFromWAL(msgBytes []byte) (interface{}, error) {
case msgInfo:
switch msg := m.Msg.(type) {
case *ProposalMessage:
return &msg.Proposal.BlockPartsHeader, nil
return &msg.Proposal.BlockPartsHeader
case *BlockPartMessage:
return msg.Part, nil
return msg.Part
case *VoteMessage:
return msg.Vote, nil
return msg.Vote
}
}
return nil, nil
return nil
}
// fresh state and mock store