fixes from review

This commit is contained in:
Ethan Buchman
2016-12-22 21:51:58 -05:00
parent 2f0d31b4b6
commit c9698e4848
5 changed files with 26 additions and 21 deletions

View File

@ -30,7 +30,7 @@ Panics indicate probable corruption in the data
type BlockStore struct { type BlockStore struct {
db dbm.DB db dbm.DB
mtx sync.Mutex mtx sync.RWMutex
height int height int
} }
@ -44,8 +44,8 @@ func NewBlockStore(db dbm.DB) *BlockStore {
// Height() returns the last known contiguous block height. // Height() returns the last known contiguous block height.
func (bs *BlockStore) Height() int { func (bs *BlockStore) Height() int {
bs.mtx.Lock() bs.mtx.RLock()
defer bs.mtx.Unlock() defer bs.mtx.RUnlock()
return bs.height return bs.height
} }
@ -146,9 +146,8 @@ func (bs *BlockStore) LoadSeenCommit(height int) *types.Commit {
// most recent height. Otherwise they'd stall at H-1. // most recent height. Otherwise they'd stall at H-1.
func (bs *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) { func (bs *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) {
height := block.Height height := block.Height
bsHeight := bs.Height() if height != bs.Height()+1 {
if height != bsHeight+1 { PanicSanity(Fmt("BlockStore can only save contiguous blocks. Wanted %v, got %v", bs.Height()+1, height))
PanicSanity(Fmt("BlockStore can only save contiguous blocks. Wanted %v, got %v", bsHeight+1, height))
} }
if !blockParts.IsComplete() { if !blockParts.IsComplete() {
PanicSanity(Fmt("BlockStore can only save complete block part sets")) PanicSanity(Fmt("BlockStore can only save complete block part sets"))
@ -161,7 +160,7 @@ func (bs *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, s
// Save block parts // Save block parts
for i := 0; i < blockParts.Total(); i++ { for i := 0; i < blockParts.Total(); i++ {
bs.saveBlockPart(height, bsHeight, i, blockParts.GetPart(i)) bs.saveBlockPart(height, i, blockParts.GetPart(i))
} }
// Save block commit (duplicate and separate from the Block) // Save block commit (duplicate and separate from the Block)
@ -185,9 +184,9 @@ func (bs *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, s
bs.db.SetSync(nil, nil) bs.db.SetSync(nil, nil)
} }
func (bs *BlockStore) saveBlockPart(height, bsHeight int, index int, part *types.Part) { func (bs *BlockStore) saveBlockPart(height int, index int, part *types.Part) {
if height != bsHeight+1 { if height != bs.Height()+1 {
PanicSanity(Fmt("BlockStore can only save contiguous blocks. Wanted %v, got %v", bsHeight+1, height)) PanicSanity(Fmt("BlockStore can only save contiguous blocks. Wanted %v, got %v", bs.Height()+1, height))
} }
partBytes := wire.BinaryBytes(part) partBytes := wire.BinaryBytes(part)
bs.db.Set(calcBlockPartKey(height, index), partBytes) bs.db.Set(calcBlockPartKey(height, index), partBytes)

View File

@ -4,6 +4,9 @@ import (
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
// XXX: WARNING: these functions can halt the consensus as firing events is synchronous.
// Make sure to read off the channels, and in the case of subscribeToEventRespond, to write back on it
// NOTE: if chanCap=0, this blocks on the event being consumed // NOTE: if chanCap=0, this blocks on the event being consumed
func subscribeToEvent(evsw types.EventSwitch, receiver, eventID string, chanCap int) chan interface{} { func subscribeToEvent(evsw types.EventSwitch, receiver, eventID string, chanCap int) chan interface{} {
// listen for event // listen for event

View File

@ -393,7 +393,7 @@ func newMockTickerFunc(onlyOnce bool) func() TimeoutTicker {
} }
} }
// mock ticker only fires once // mock ticker only fires on RoundStepNewHeight
// and only once if onlyOnce=true // and only once if onlyOnce=true
type mockTicker struct { type mockTicker struct {
c chan timeoutInfo c chan timeoutInfo

View File

@ -79,8 +79,8 @@ func TestValidatorSetChanges(t *testing.T) {
}, p2p.Connect2Switches) }, p2p.Connect2Switches)
// now that everyone is connected, start the state machines // now that everyone is connected, start the state machines
// (otherwise, we could block forever in firing new block while a peer is trying to // If we started the state machines before everyone was connected,
// access state info for AddPeer) // we'd block when the cs fires NewBlockEvent and the peers are trying to start their reactors
for i := 0; i < nPeers; i++ { for i := 0; i < nPeers; i++ {
s := reactors[i].conS.GetState() s := reactors[i].conS.GetState()
reactors[i].SwitchToConsensus(s) reactors[i].SwitchToConsensus(s)

View File

@ -15,9 +15,10 @@ import (
) )
// TODO: these tests ensure we can always recover from any state of the wal, // TODO: these tests ensure we can always recover from any state of the wal,
// assuming a related state of the priv val // assuming it comes with a correct related state for the priv_validator.json.
// it would be better to verify explicitly which states we can recover from without the wal // It would be better to verify explicitly which states we can recover from without the wal
// and which ones we need the wal for // and which ones we need the wal for - then we'd also be able to only flush the
// wal writer when we need to, instead of with every message.
var data_dir = path.Join(GoPath, "src/github.com/tendermint/tendermint/consensus", "test_data") var data_dir = path.Join(GoPath, "src/github.com/tendermint/tendermint/consensus", "test_data")
@ -147,7 +148,7 @@ func setupReplayTest(thisCase *testCase, nLines int, crashAfter bool) (*Consensu
return cs, newBlockCh, lastMsg, walDir return cs, newBlockCh, lastMsg, walDir
} }
func readJSON(t *testing.T, walMsg string) TimedWALMessage { func readTimedWALMessage(t *testing.T, walMsg string) TimedWALMessage {
var err error var err error
var msg TimedWALMessage var msg TimedWALMessage
wire.ReadJSON(&msg, []byte(walMsg), &err) wire.ReadJSON(&msg, []byte(walMsg), &err)
@ -178,8 +179,9 @@ func TestReplayCrashAfterWrite(t *testing.T) {
func TestReplayCrashBeforeWritePropose(t *testing.T) { func TestReplayCrashBeforeWritePropose(t *testing.T) {
for _, thisCase := range testCases { for _, thisCase := range testCases {
lineNum := thisCase.proposeLine lineNum := thisCase.proposeLine
cs, newBlockCh, proposalMsg, walDir := setupReplayTest(thisCase, lineNum, false) // propose // setup replay test where last message is a proposal
msg := readJSON(t, proposalMsg) cs, newBlockCh, proposalMsg, walDir := setupReplayTest(thisCase, lineNum, false)
msg := readTimedWALMessage(t, proposalMsg)
proposal := msg.Msg.(msgInfo).Msg.(*ProposalMessage) proposal := msg.Msg.(msgInfo).Msg.(*ProposalMessage)
// Set LastSig // Set LastSig
toPV(cs.privValidator).LastSignBytes = types.SignBytes(cs.state.ChainID, proposal.Proposal) toPV(cs.privValidator).LastSignBytes = types.SignBytes(cs.state.ChainID, proposal.Proposal)
@ -201,9 +203,10 @@ func TestReplayCrashBeforeWritePrecommit(t *testing.T) {
} }
func testReplayCrashBeforeWriteVote(t *testing.T, thisCase *testCase, lineNum int, eventString string) { func testReplayCrashBeforeWriteVote(t *testing.T, thisCase *testCase, lineNum int, eventString string) {
cs, newBlockCh, voteMsg, walDir := setupReplayTest(thisCase, lineNum, false) // prevote // setup replay test where last message is a vote
cs, newBlockCh, voteMsg, walDir := setupReplayTest(thisCase, lineNum, false)
types.AddListenerForEvent(cs.evsw, "tester", eventString, func(data types.TMEventData) { types.AddListenerForEvent(cs.evsw, "tester", eventString, func(data types.TMEventData) {
msg := readJSON(t, voteMsg) msg := readTimedWALMessage(t, voteMsg)
vote := msg.Msg.(msgInfo).Msg.(*VoteMessage) vote := msg.Msg.(msgInfo).Msg.(*VoteMessage)
// Set LastSig // Set LastSig
toPV(cs.privValidator).LastSignBytes = types.SignBytes(cs.state.ChainID, vote.Vote) toPV(cs.privValidator).LastSignBytes = types.SignBytes(cs.state.ChainID, vote.Vote)