diff --git a/blockchain/reactor.go b/blockchain/reactor.go index e41300d5..f985e284 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -272,7 +272,9 @@ FOR_LOOP: // NOTE: we could improve performance if we // didn't make the app commit to disk every block // ... but we would need a way to get the hash without it persisting - err := bcR.state.ApplyBlock(bcR.eventBus, bcR.proxyAppConn, first, firstPartsHeader, types.MockMempool{}) + err := bcR.state.ApplyBlock(bcR.eventBus, bcR.proxyAppConn, + first, firstPartsHeader, + types.MockMempool{}, types.MockEvidencePool{}) // TODO unmock! if err != nil { // TODO This is bad, are we zombie? cmn.PanicQ(cmn.Fmt("Failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err)) diff --git a/consensus/common_test.go b/consensus/common_test.go index dd6fce66..6598c15e 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -260,8 +260,11 @@ func newConsensusStateWithConfigAndBlockStore(thisConfig *cfg.Config, state *sm. mempool.EnableTxsAvailable() } + // mock the evidence pool + evpool := types.MockEvidencePool{} + // Make ConsensusReactor - cs := NewConsensusState(thisConfig.Consensus, state, proxyAppConnCon, blockStore, mempool) + cs := NewConsensusState(thisConfig.Consensus, state, proxyAppConnCon, blockStore, mempool, evpool) cs.SetLogger(log.TestingLogger()) cs.SetPrivValidator(pv) diff --git a/consensus/replay.go b/consensus/replay.go index 57a9dd4c..209ea597 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -323,6 +323,7 @@ func (h *Handshaker) replayBlocks(proxyApp proxy.AppConns, appBlockHeight, store // Note that we don't have an old version of the state, // so we by-pass state validation/mutation using sm.ExecCommitBlock. // This also means we won't be saving validator sets if they change during this period. + // TODO: Load the historical information to fix this and just use state.ApplyBlock // // If mutateState == true, the final block is replayed with h.replayBlock() @@ -354,11 +355,13 @@ func (h *Handshaker) replayBlocks(proxyApp proxy.AppConns, appBlockHeight, store // ApplyBlock on the proxyApp with the last block. func (h *Handshaker) replayBlock(height int64, proxyApp proxy.AppConnConsensus) ([]byte, error) { mempool := types.MockMempool{} + evpool := types.MockEvidencePool{} block := h.store.LoadBlock(height) meta := h.store.LoadBlockMeta(height) - if err := h.state.ApplyBlock(types.NopEventBus{}, proxyApp, block, meta.BlockID.PartsHeader, mempool); err != nil { + if err := h.state.ApplyBlock(types.NopEventBus{}, proxyApp, + block, meta.BlockID.PartsHeader, mempool, evpool); err != nil { return nil, err } diff --git a/consensus/replay_file.go b/consensus/replay_file.go index d291e87c..4db58ada 100644 --- a/consensus/replay_file.go +++ b/consensus/replay_file.go @@ -123,7 +123,8 @@ func (pb *playback) replayReset(count int, newStepCh chan interface{}) error { pb.cs.Stop() pb.cs.Wait() - newCS := NewConsensusState(pb.cs.config, pb.genesisState.Copy(), pb.cs.proxyAppConn, pb.cs.blockStore, pb.cs.mempool) + newCS := NewConsensusState(pb.cs.config, pb.genesisState.Copy(), pb.cs.proxyAppConn, + pb.cs.blockStore, pb.cs.mempool, pb.cs.evpool) newCS.SetEventBus(pb.cs.eventBus) newCS.startForReplay() @@ -302,7 +303,8 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusCo cmn.Exit(cmn.Fmt("Failed to start event bus: %v", err)) } - consensusState := NewConsensusState(csConfig, state.Copy(), proxyApp.Consensus(), blockStore, types.MockMempool{}) + consensusState := NewConsensusState(csConfig, state.Copy(), proxyApp.Consensus(), + blockStore, types.MockMempool{}, types.MockEvidencePool{}) consensusState.SetEventBus(eventBus) return consensusState diff --git a/consensus/replay_test.go b/consensus/replay_test.go index 43c2a469..f1a060ec 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -262,6 +262,7 @@ const ( var ( mempool = types.MockMempool{} + evpool = types.MockEvidencePool{} ) //--------------------------------------- @@ -394,7 +395,7 @@ func testHandshakeReplay(t *testing.T, nBlocks int, mode uint) { func applyBlock(st *sm.State, blk *types.Block, proxyApp proxy.AppConns) { testPartSize := st.ConsensusParams.BlockPartSizeBytes - err := st.ApplyBlock(types.NopEventBus{}, proxyApp.Consensus(), blk, blk.MakePartSet(testPartSize).Header(), mempool) + err := st.ApplyBlock(types.NopEventBus{}, proxyApp.Consensus(), blk, blk.MakePartSet(testPartSize).Header(), mempool, evpool) if err != nil { panic(err) } diff --git a/consensus/state.go b/consensus/state.go index 67a1d821..5e83e6a5 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -78,6 +78,7 @@ type ConsensusState struct { proxyAppConn proxy.AppConnConsensus blockStore types.BlockStore mempool types.Mempool + evpool types.EvidencePool // internal state mtx sync.Mutex @@ -113,7 +114,7 @@ type ConsensusState struct { } // NewConsensusState returns a new ConsensusState. -func NewConsensusState(config *cfg.ConsensusConfig, state *sm.State, proxyAppConn proxy.AppConnConsensus, blockStore types.BlockStore, mempool types.Mempool) *ConsensusState { +func NewConsensusState(config *cfg.ConsensusConfig, state *sm.State, proxyAppConn proxy.AppConnConsensus, blockStore types.BlockStore, mempool types.Mempool, evpool types.EvidencePool) *ConsensusState { cs := &ConsensusState{ config: config, proxyAppConn: proxyAppConn, @@ -125,6 +126,7 @@ func NewConsensusState(config *cfg.ConsensusConfig, state *sm.State, proxyAppCon done: make(chan struct{}), doWALCatchup: true, wal: nilWAL{}, + evpool: evpool, } // set function defaults (may be overwritten before calling Start) cs.decideProposal = cs.defaultDecideProposal @@ -863,7 +865,10 @@ func (cs *ConsensusState) createProposalBlock() (block *types.Block, blockParts // Mempool validated transactions txs := cs.mempool.Reap(cs.config.MaxBlockSizeTxs) - return cs.state.MakeBlock(cs.Height, txs, commit) + block, parts := cs.state.MakeBlock(cs.Height, txs, commit) + evidence := cs.evpool.PendingEvidence() + block.AddEvidence(evidence) + return block, parts } // Enter: `timeoutPropose` after entering Propose. @@ -1203,7 +1208,9 @@ func (cs *ConsensusState) finalizeCommit(height int64) { // Execute and commit the block, update and save the state, and update the mempool. // All calls to the proxyAppConn come here. // NOTE: the block.AppHash wont reflect these txs until the next block - err := stateCopy.ApplyBlock(txEventBuffer, cs.proxyAppConn, block, blockParts.Header(), cs.mempool) + err := stateCopy.ApplyBlock(txEventBuffer, cs.proxyAppConn, + block, blockParts.Header(), + cs.mempool, cs.evpool) if err != nil { cs.Logger.Error("Error on ApplyBlock. Did the application crash? Please restart tendermint", "err", err) err := cmn.Kill() @@ -1323,19 +1330,16 @@ func (cs *ConsensusState) tryAddVote(vote *types.Vote, peerKey string) error { _, err := cs.addVote(vote, peerKey) if err != nil { // If the vote height is off, we'll just ignore it, - // But if it's a conflicting sig, broadcast evidence tx for slashing. + // But if it's a conflicting sig, add it to the cs.evpool. // If it's otherwise invalid, punish peer. if err == ErrVoteHeightMismatch { return err - } else if _, ok := err.(*types.ErrVoteConflictingVotes); ok { + } else if voteErr, ok := err.(*types.ErrVoteConflictingVotes); ok { if bytes.Equal(vote.ValidatorAddress, cs.privValidator.GetAddress()) { cs.Logger.Error("Found conflicting vote from ourselves. Did you unsafe_reset a validator?", "height", vote.Height, "round", vote.Round, "type", vote.Type) return err } - cs.Logger.Error("Found conflicting vote. Publish evidence (TODO)", "height", vote.Height, "round", vote.Round, "type", vote.Type, "valAddr", vote.ValidatorAddress, "valIndex", vote.ValidatorIndex) - - // TODO: track evidence for inclusion in a block - + cs.evpool.AddEvidence(voteErr.DuplicateVoteEvidence) return err } else { // Probably an invalid signature / Bad peer. diff --git a/consensus/test_data/many_blocks.cswal b/consensus/test_data/many_blocks.cswal new file mode 100644 index 00000000..d443fff7 Binary files /dev/null and b/consensus/test_data/many_blocks.cswal differ diff --git a/consensus/types/state.go b/consensus/types/state.go index da4df6a4..b95131f4 100644 --- a/consensus/types/state.go +++ b/consensus/types/state.go @@ -107,8 +107,8 @@ func (rs *RoundState) StringIndented(indent string) string { %s LockedRound: %v %s LockedBlock: %v %v %s Votes: %v -%s LastCommit: %v -%s LastValidators: %v +%s LastCommit: %v +%s LastValidators:%v %s}`, indent, rs.Height, rs.Round, rs.Step, indent, rs.StartTime, diff --git a/consensus/wal_generator.go b/consensus/wal_generator.go index b46ef93d..b4efb5a9 100644 --- a/consensus/wal_generator.go +++ b/consensus/wal_generator.go @@ -66,7 +66,8 @@ func WALWithNBlocks(numBlocks int) (data []byte, err error) { } defer eventBus.Stop() mempool := types.MockMempool{} - consensusState := NewConsensusState(config.Consensus, state.Copy(), proxyApp.Consensus(), blockStore, mempool) + evpool := types.MockEvidencePool{} + consensusState := NewConsensusState(config.Consensus, state.Copy(), proxyApp.Consensus(), blockStore, mempool, evpool) consensusState.SetLogger(logger) consensusState.SetEventBus(eventBus) if privValidator != nil { diff --git a/evidence/pool.go b/evidence/pool.go new file mode 100644 index 00000000..2296ac02 --- /dev/null +++ b/evidence/pool.go @@ -0,0 +1,84 @@ +package evidence + +import ( + "github.com/tendermint/tmlibs/log" + + "github.com/tendermint/tendermint/types" +) + +// EvidencePool maintains a pool of valid evidence +// in an EvidenceStore. +type EvidencePool struct { + params types.EvidenceParams + logger log.Logger + + state types.State // TODO: update this on commit! + evidenceStore *EvidenceStore + + // never close + evidenceChan chan types.Evidence +} + +func NewEvidencePool(params types.EvidenceParams, evidenceStore *EvidenceStore, state types.State) *EvidencePool { + evpool := &EvidencePool{ + params: params, + logger: log.NewNopLogger(), + evidenceStore: evidenceStore, + state: state, + evidenceChan: make(chan types.Evidence), + } + return evpool +} + +// SetLogger sets the Logger. +func (evpool *EvidencePool) SetLogger(l log.Logger) { + evpool.logger = l +} + +// EvidenceChan returns an unbuffered channel on which new evidence can be received. +func (evpool *EvidencePool) EvidenceChan() <-chan types.Evidence { + return evpool.evidenceChan +} + +// PriorityEvidence returns the priority evidence. +func (evpool *EvidencePool) PriorityEvidence() []types.Evidence { + return evpool.evidenceStore.PriorityEvidence() +} + +// PendingEvidence returns all uncommitted evidence. +func (evpool *EvidencePool) PendingEvidence() []types.Evidence { + return evpool.evidenceStore.PendingEvidence() +} + +// AddEvidence checks the evidence is valid and adds it to the pool. +// Blocks on the EvidenceChan. +func (evpool *EvidencePool) AddEvidence(evidence types.Evidence) (err error) { + // TODO: check if we already have evidence for this + // validator at this height so we dont get spammed + + priority, err := evpool.state.VerifyEvidence(evidence) + if err != nil { + // TODO: if err is just that we cant find it cuz we pruned, ignore. + // TODO: if its actually bad evidence, punish peer + return err + } + + added := evpool.evidenceStore.AddNewEvidence(evidence, priority) + if !added { + // evidence already known, just ignore + return + } + + evpool.logger.Info("Verified new evidence of byzantine behaviour", "evidence", evidence) + + // never closes. always safe to send on + evpool.evidenceChan <- evidence + return nil +} + +// MarkEvidenceAsCommitted marks all the evidence as committed. +func (evpool *EvidencePool) MarkEvidenceAsCommitted(evidence []types.Evidence) { + for _, ev := range evidence { + evpool.evidenceStore.MarkEvidenceAsCommitted(ev) + } +} diff --git a/evidence/pool_test.go b/evidence/pool_test.go new file mode 100644 index 00000000..0997505c --- /dev/null +++ b/evidence/pool_test.go @@ -0,0 +1,53 @@ +package evidence + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/tendermint/tendermint/types" + dbm "github.com/tendermint/tmlibs/db" +) + +type mockState struct{} + +func (m mockState) VerifyEvidence(ev types.Evidence) (int64, error) { + err := ev.Verify("") + return 10, err +} + +func TestEvidencePool(t *testing.T) { + assert := assert.New(t) + + params := types.EvidenceParams{} + store := NewEvidenceStore(dbm.NewMemDB()) + state := mockState{} + pool := NewEvidencePool(params, store, state) + + goodEvidence := newMockGoodEvidence(5, 1, []byte("val1")) + badEvidence := MockBadEvidence{goodEvidence} + + err := pool.AddEvidence(badEvidence) + assert.NotNil(err) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + <-pool.EvidenceChan() + wg.Done() + }() + + err = pool.AddEvidence(goodEvidence) + assert.Nil(err) + wg.Wait() + + // if we send it again it wont fire on the chan + err = pool.AddEvidence(goodEvidence) + assert.Nil(err) + select { + case <-pool.EvidenceChan(): + t.Fatal("unexpected read on EvidenceChan") + default: + } +} diff --git a/evidence/reactor.go b/evidence/reactor.go new file mode 100644 index 00000000..cb9706a3 --- /dev/null +++ b/evidence/reactor.go @@ -0,0 +1,169 @@ +package evidence + +import ( + "bytes" + "fmt" + "reflect" + "time" + + wire "github.com/tendermint/go-wire" + "github.com/tendermint/tmlibs/log" + + "github.com/tendermint/tendermint/p2p" + "github.com/tendermint/tendermint/types" +) + +const ( + EvidenceChannel = byte(0x38) + + maxEvidenceMessageSize = 1048576 // 1MB TODO make it configurable + broadcastEvidenceIntervalS = 60 // broadcast uncommitted evidence this often +) + +// EvidenceReactor handles evpool evidence broadcasting amongst peers. +type EvidenceReactor struct { + p2p.BaseReactor + evpool *EvidencePool + eventBus *types.EventBus +} + +// NewEvidenceReactor returns a new EvidenceReactor with the given config and evpool. +func NewEvidenceReactor(evpool *EvidencePool) *EvidenceReactor { + evR := &EvidenceReactor{ + evpool: evpool, + } + evR.BaseReactor = *p2p.NewBaseReactor("EvidenceReactor", evR) + return evR +} + +// SetLogger sets the Logger on the reactor and the underlying Evidence. +func (evR *EvidenceReactor) SetLogger(l log.Logger) { + evR.Logger = l + evR.evpool.SetLogger(l) +} + +// OnStart implements cmn.Service +func (evR *EvidenceReactor) OnStart() error { + if err := evR.BaseReactor.OnStart(); err != nil { + return err + } + go evR.broadcastRoutine() + return nil +} + +// GetChannels implements Reactor. +// It returns the list of channels for this reactor. +func (evR *EvidenceReactor) GetChannels() []*p2p.ChannelDescriptor { + return []*p2p.ChannelDescriptor{ + &p2p.ChannelDescriptor{ + ID: EvidenceChannel, + Priority: 5, + }, + } +} + +// AddPeer implements Reactor. +func (evR *EvidenceReactor) AddPeer(peer p2p.Peer) { + // send the peer our high-priority evidence. + // the rest will be sent by the broadcastRoutine + evidences := evR.evpool.PriorityEvidence() + msg := &EvidenceListMessage{evidences} + success := peer.Send(EvidenceChannel, struct{ EvidenceMessage }{msg}) + if !success { + // TODO: remove peer ? + } +} + +// RemovePeer implements Reactor. +func (evR *EvidenceReactor) RemovePeer(peer p2p.Peer, reason interface{}) { + // nothing to do +} + +// Receive implements Reactor. +// It adds any received evidence to the evpool. +func (evR *EvidenceReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { + _, msg, err := DecodeMessage(msgBytes) + if err != nil { + evR.Logger.Error("Error decoding message", "err", err) + return + } + evR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg) + + switch msg := msg.(type) { + case *EvidenceListMessage: + for _, ev := range msg.Evidence { + err := evR.evpool.AddEvidence(ev) + if err != nil { + evR.Logger.Info("Evidence is not valid", "evidence", msg.Evidence, "err", err) + // TODO: punish peer + } + } + default: + evR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg))) + } +} + +// SetEventSwitch implements events.Eventable. +func (evR *EvidenceReactor) SetEventBus(b *types.EventBus) { + evR.eventBus = b +} + +// Broadcast new evidence to all peers. +// Broadcasts must be non-blocking so routine is always available to read off EvidenceChan. +func (evR *EvidenceReactor) broadcastRoutine() { + ticker := time.NewTicker(time.Second * broadcastEvidenceIntervalS) + for { + select { + case evidence := <-evR.evpool.EvidenceChan(): + // broadcast some new evidence + msg := &EvidenceListMessage{[]types.Evidence{evidence}} + evR.Switch.Broadcast(EvidenceChannel, struct{ EvidenceMessage }{msg}) + + // TODO: Broadcast runs asynchronously, so this should wait on the successChan + // in another routine before marking to be proper. + evR.evpool.evidenceStore.MarkEvidenceAsBroadcasted(evidence) + case <-ticker.C: + // broadcast all pending evidence + msg := &EvidenceListMessage{evR.evpool.PendingEvidence()} + evR.Switch.Broadcast(EvidenceChannel, struct{ EvidenceMessage }{msg}) + case <-evR.Quit: + return + } + } +} + +//----------------------------------------------------------------------------- +// Messages + +const ( + msgTypeEvidence = byte(0x01) +) + +// EvidenceMessage is a message sent or received by the EvidenceReactor. +type EvidenceMessage interface{} + +var _ = wire.RegisterInterface( + struct{ EvidenceMessage }{}, + wire.ConcreteType{&EvidenceListMessage{}, msgTypeEvidence}, +) + +// DecodeMessage decodes a byte-array into a EvidenceMessage. +func DecodeMessage(bz []byte) (msgType byte, msg EvidenceMessage, err error) { + msgType = bz[0] + n := new(int) + r := bytes.NewReader(bz) + msg = wire.ReadBinary(struct{ EvidenceMessage }{}, r, maxEvidenceMessageSize, n, &err).(struct{ EvidenceMessage }).EvidenceMessage + return +} + +//------------------------------------- + +// EvidenceMessage contains a list of evidence. +type EvidenceListMessage struct { + Evidence []types.Evidence +} + +// String returns a string representation of the EvidenceListMessage. +func (m *EvidenceListMessage) String() string { + return fmt.Sprintf("[EvidenceListMessage %v]", m.Evidence) +} diff --git a/evidence/reactor_test.go b/evidence/reactor_test.go new file mode 100644 index 00000000..fb83667c --- /dev/null +++ b/evidence/reactor_test.go @@ -0,0 +1,127 @@ +package evidence + +import ( + "fmt" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/go-kit/kit/log/term" + + dbm "github.com/tendermint/tmlibs/db" + "github.com/tendermint/tmlibs/log" + + cfg "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/p2p" + "github.com/tendermint/tendermint/types" +) + +// evidenceLogger is a TestingLogger which uses a different +// color for each validator ("validator" key must exist). +func evidenceLogger() log.Logger { + return log.TestingLoggerWithColorFn(func(keyvals ...interface{}) term.FgBgColor { + for i := 0; i < len(keyvals)-1; i += 2 { + if keyvals[i] == "validator" { + return term.FgBgColor{Fg: term.Color(uint8(keyvals[i+1].(int) + 1))} + } + } + return term.FgBgColor{} + }) +} + +// connect N evidence reactors through N switches +func makeAndConnectEvidenceReactors(config *cfg.Config, N int) []*EvidenceReactor { + reactors := make([]*EvidenceReactor, N) + logger := evidenceLogger() + for i := 0; i < N; i++ { + + params := types.EvidenceParams{} + store := NewEvidenceStore(dbm.NewMemDB()) + state := mockState{} + pool := NewEvidencePool(params, store, state) + reactors[i] = NewEvidenceReactor(pool) + reactors[i].SetLogger(logger.With("validator", i)) + } + + p2p.MakeConnectedSwitches(config.P2P, N, func(i int, s *p2p.Switch) *p2p.Switch { + s.AddReactor("EVIDENCE", reactors[i]) + return s + + }, p2p.Connect2Switches) + return reactors +} + +// wait for all evidence on all reactors +func waitForEvidence(t *testing.T, evs types.EvidenceList, reactors []*EvidenceReactor) { + // wait for the evidence in all evpools + wg := new(sync.WaitGroup) + for i := 0; i < len(reactors); i++ { + wg.Add(1) + go _waitForEvidence(t, wg, evs, i, reactors) + } + + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + timer := time.After(TIMEOUT) + select { + case <-timer: + t.Fatal("Timed out waiting for evidence") + case <-done: + } +} + +// wait for all evidence on a single evpool +func _waitForEvidence(t *testing.T, wg *sync.WaitGroup, evs types.EvidenceList, reactorIdx int, reactors []*EvidenceReactor) { + + evpool := reactors[reactorIdx].evpool + for len(evpool.PendingEvidence()) != len(evs) { + time.Sleep(time.Millisecond * 100) + } + + reapedEv := evpool.PendingEvidence() + // put the reaped evidence is a map so we can quickly check we got everything + evMap := make(map[string]types.Evidence) + for _, e := range reapedEv { + evMap[string(e.Hash())] = e + } + for i, expectedEv := range evs { + gotEv := evMap[string(expectedEv.Hash())] + assert.Equal(t, expectedEv, gotEv, + fmt.Sprintf("evidence at index %d on reactor %d don't match: %v vs %v", + i, reactorIdx, expectedEv, gotEv)) + } + wg.Done() +} + +func sendEvidence(t *testing.T, evpool *EvidencePool, n int) types.EvidenceList { + evList := make([]types.Evidence, n) + for i := 0; i < n; i++ { + ev := newMockGoodEvidence(int64(i), 2, []byte("val")) + err := evpool.AddEvidence(ev) + assert.Nil(t, err) + evList[i] = ev + } + return evList +} + +var ( + NUM_EVIDENCE = 1000 + TIMEOUT = 120 * time.Second // ridiculously high because CircleCI is slow +) + +func TestReactorBroadcastEvidence(t *testing.T) { + config := cfg.TestConfig() + N := 7 + reactors := makeAndConnectEvidenceReactors(config, N) + + // send a bunch of evidence to the first reactor's evpool + // and wait for them all to be received in the others + evList := sendEvidence(t, reactors[0].evpool, NUM_EVIDENCE) + waitForEvidence(t, evList, reactors) +} diff --git a/evidence/store.go b/evidence/store.go new file mode 100644 index 00000000..fd40b533 --- /dev/null +++ b/evidence/store.go @@ -0,0 +1,186 @@ +package evidence + +import ( + "fmt" + + wire "github.com/tendermint/go-wire" + "github.com/tendermint/tendermint/types" + dbm "github.com/tendermint/tmlibs/db" +) + +/* +Requirements: + - Valid new evidence must be persisted immediately and never forgotten + - Uncommitted evidence must be continuously broadcast + - Uncommitted evidence has a partial order, the evidence's priority + +Impl: + - First commit atomically in outqueue, pending, lookup. + - Once broadcast, remove from outqueue. No need to sync + - Once committed, atomically remove from pending and update lookup. + - TODO: If we crash after committed but before removing/updating, + we'll be stuck broadcasting evidence we never know we committed. + so either share the state db and atomically MarkCommitted + with ApplyBlock, or check all outqueue/pending on Start to see if its committed + +Schema for indexing evidence (note you need both height and hash to find a piece of evidence): + +"evidence-lookup"// -> EvidenceInfo +"evidence-outqueue"/// -> EvidenceInfo +"evidence-pending"// -> EvidenceInfo +*/ + +type EvidenceInfo struct { + Committed bool + Priority int64 + Evidence types.Evidence +} + +const ( + baseKeyLookup = "evidence-lookup" // all evidence + baseKeyOutqueue = "evidence-outqueue" // not-yet broadcast + baseKeyPending = "evidence-pending" // broadcast but not committed +) + +func keyLookup(evidence types.Evidence) []byte { + return keyLookupFromHeightAndHash(evidence.Height(), evidence.Hash()) +} + +// big endian padded hex +func bE(h int64) string { + return fmt.Sprintf("%0.16X", h) +} + +func keyLookupFromHeightAndHash(height int64, hash []byte) []byte { + return _key("%s/%s/%X", baseKeyLookup, bE(height), hash) +} + +func keyOutqueue(evidence types.Evidence, priority int64) []byte { + return _key("%s/%s/%s/%X", baseKeyOutqueue, bE(priority), bE(evidence.Height()), evidence.Hash()) +} + +func keyPending(evidence types.Evidence) []byte { + return _key("%s/%s/%X", baseKeyPending, bE(evidence.Height()), evidence.Hash()) +} + +func _key(fmt_ string, o ...interface{}) []byte { + return []byte(fmt.Sprintf(fmt_, o...)) +} + +// EvidenceStore is a store of all the evidence we've seen, including +// evidence that has been committed, evidence that has been verified but not broadcast, +// and evidence that has been broadcast but not yet committed. +type EvidenceStore struct { + db dbm.DB +} + +func NewEvidenceStore(db dbm.DB) *EvidenceStore { + return &EvidenceStore{ + db: db, + } +} + +// PriorityEvidence returns the evidence from the outqueue, sorted by highest priority. +func (store *EvidenceStore) PriorityEvidence() (evidence []types.Evidence) { + // reverse the order so highest priority is first + l := store.ListEvidence(baseKeyOutqueue) + l2 := make([]types.Evidence, len(l)) + for i := range l { + l2[i] = l[len(l)-1-i] + } + return l2 +} + +// PendingEvidence returns all known uncommitted evidence. +func (store *EvidenceStore) PendingEvidence() (evidence []types.Evidence) { + return store.ListEvidence(baseKeyPending) +} + +// ListEvidence lists the evidence for the given prefix key. +// It is wrapped by PriorityEvidence and PendingEvidence for convenience. +func (store *EvidenceStore) ListEvidence(prefixKey string) (evidence []types.Evidence) { + iter := store.db.IteratorPrefix([]byte(prefixKey)) + for iter.Next() { + val := iter.Value() + + var ei EvidenceInfo + wire.ReadBinaryBytes(val, &ei) + evidence = append(evidence, ei.Evidence) + } + return evidence +} + +// GetEvidence fetches the evidence with the given height and hash. +func (store *EvidenceStore) GetEvidence(height int64, hash []byte) *EvidenceInfo { + key := keyLookupFromHeightAndHash(height, hash) + val := store.db.Get(key) + + if len(val) == 0 { + return nil + } + var ei EvidenceInfo + wire.ReadBinaryBytes(val, &ei) + return &ei +} + +// AddNewEvidence adds the given evidence to the database. +// It returns false if the evidence is already stored. +func (store *EvidenceStore) AddNewEvidence(evidence types.Evidence, priority int64) bool { + // check if we already have seen it + ei_ := store.GetEvidence(evidence.Height(), evidence.Hash()) + if ei_ != nil && ei_.Evidence != nil { + return false + } + + ei := EvidenceInfo{ + Committed: false, + Priority: priority, + Evidence: evidence, + } + eiBytes := wire.BinaryBytes(ei) + + // add it to the store + key := keyOutqueue(evidence, priority) + store.db.Set(key, eiBytes) + + key = keyPending(evidence) + store.db.Set(key, eiBytes) + + key = keyLookup(evidence) + store.db.SetSync(key, eiBytes) + + return true +} + +// MarkEvidenceAsBroadcasted removes evidence from Outqueue. +func (store *EvidenceStore) MarkEvidenceAsBroadcasted(evidence types.Evidence) { + ei := store.getEvidenceInfo(evidence) + key := keyOutqueue(evidence, ei.Priority) + store.db.Delete(key) +} + +// MarkEvidenceAsPending removes evidence from pending and outqueue and sets the state to committed. +func (store *EvidenceStore) MarkEvidenceAsCommitted(evidence types.Evidence) { + // if its committed, its been broadcast + store.MarkEvidenceAsBroadcasted(evidence) + + pendingKey := keyPending(evidence) + store.db.Delete(pendingKey) + + ei := store.getEvidenceInfo(evidence) + ei.Committed = true + + lookupKey := keyLookup(evidence) + store.db.SetSync(lookupKey, wire.BinaryBytes(ei)) +} + +//--------------------------------------------------- +// utils + +func (store *EvidenceStore) getEvidenceInfo(evidence types.Evidence) EvidenceInfo { + key := keyLookup(evidence) + var ei EvidenceInfo + b := store.db.Get(key) + wire.ReadBinaryBytes(b, &ei) + return ei +} diff --git a/evidence/store_test.go b/evidence/store_test.go new file mode 100644 index 00000000..7828d37b --- /dev/null +++ b/evidence/store_test.go @@ -0,0 +1,164 @@ +package evidence + +import ( + "bytes" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + wire "github.com/tendermint/go-wire" + "github.com/tendermint/tendermint/types" + dbm "github.com/tendermint/tmlibs/db" +) + +//------------------------------------------- + +func TestStoreAddDuplicate(t *testing.T) { + assert := assert.New(t) + + db := dbm.NewMemDB() + store := NewEvidenceStore(db) + + priority := int64(10) + ev := newMockGoodEvidence(2, 1, []byte("val1")) + + added := store.AddNewEvidence(ev, priority) + assert.True(added) + + // cant add twice + added = store.AddNewEvidence(ev, priority) + assert.False(added) +} + +func TestStoreMark(t *testing.T) { + assert := assert.New(t) + + db := dbm.NewMemDB() + store := NewEvidenceStore(db) + + // before we do anything, priority/pending are empty + priorityEv := store.PriorityEvidence() + pendingEv := store.PendingEvidence() + assert.Equal(0, len(priorityEv)) + assert.Equal(0, len(pendingEv)) + + priority := int64(10) + ev := newMockGoodEvidence(2, 1, []byte("val1")) + + added := store.AddNewEvidence(ev, priority) + assert.True(added) + + // get the evidence. verify. should be uncommitted + ei := store.GetEvidence(ev.Height(), ev.Hash()) + assert.Equal(ev, ei.Evidence) + assert.Equal(priority, ei.Priority) + assert.False(ei.Committed) + + // new evidence should be returns in priority/pending + priorityEv = store.PriorityEvidence() + pendingEv = store.PendingEvidence() + assert.Equal(1, len(priorityEv)) + assert.Equal(1, len(pendingEv)) + + // priority is now empty + store.MarkEvidenceAsBroadcasted(ev) + priorityEv = store.PriorityEvidence() + pendingEv = store.PendingEvidence() + assert.Equal(0, len(priorityEv)) + assert.Equal(1, len(pendingEv)) + + // priority and pending are now empty + store.MarkEvidenceAsCommitted(ev) + priorityEv = store.PriorityEvidence() + pendingEv = store.PendingEvidence() + assert.Equal(0, len(priorityEv)) + assert.Equal(0, len(pendingEv)) + + // evidence should show committed + ei = store.GetEvidence(ev.Height(), ev.Hash()) + assert.Equal(ev, ei.Evidence) + assert.Equal(priority, ei.Priority) + assert.True(ei.Committed) +} + +func TestStorePriority(t *testing.T) { + assert := assert.New(t) + + db := dbm.NewMemDB() + store := NewEvidenceStore(db) + + // sorted by priority and then height + cases := []struct { + ev MockGoodEvidence + priority int64 + }{ + {newMockGoodEvidence(2, 1, []byte("val1")), 17}, + {newMockGoodEvidence(5, 2, []byte("val2")), 15}, + {newMockGoodEvidence(10, 2, []byte("val2")), 13}, + {newMockGoodEvidence(100, 2, []byte("val2")), 11}, + {newMockGoodEvidence(90, 2, []byte("val2")), 11}, + {newMockGoodEvidence(80, 2, []byte("val2")), 11}, + } + + for _, c := range cases { + added := store.AddNewEvidence(c.ev, c.priority) + assert.True(added) + } + + evList := store.PriorityEvidence() + for i, ev := range evList { + assert.Equal(ev, cases[i].ev) + } +} + +//------------------------------------------- +const ( + evidenceTypeMock = byte(0x01) +) + +var _ = wire.RegisterInterface( + struct{ types.Evidence }{}, + wire.ConcreteType{MockGoodEvidence{}, evidenceTypeMock}, +) + +type MockGoodEvidence struct { + Height_ int64 + Address_ []byte + Index_ int +} + +func newMockGoodEvidence(height int64, index int, address []byte) MockGoodEvidence { + return MockGoodEvidence{height, address, index} +} + +func (e MockGoodEvidence) Height() int64 { return e.Height_ } +func (e MockGoodEvidence) Address() []byte { return e.Address_ } +func (e MockGoodEvidence) Index() int { return e.Index_ } +func (e MockGoodEvidence) Hash() []byte { + return []byte(fmt.Sprintf("%d-%d", e.Height_, e.Index_)) +} +func (e MockGoodEvidence) Verify(chainID string) error { return nil } +func (e MockGoodEvidence) Equal(ev types.Evidence) bool { + e2 := ev.(MockGoodEvidence) + return e.Height_ == e2.Height_ && + bytes.Equal(e.Address_, e2.Address_) && + e.Index_ == e2.Index_ +} +func (e MockGoodEvidence) String() string { + return fmt.Sprintf("GoodEvidence: %d/%s/%d", e.Height_, e.Address_, e.Index_) +} + +type MockBadEvidence struct { + MockGoodEvidence +} + +func (e MockBadEvidence) Verify(chainID string) error { return fmt.Errorf("MockBadEvidence") } +func (e MockBadEvidence) Equal(ev types.Evidence) bool { + e2 := ev.(MockBadEvidence) + return e.Height_ == e2.Height_ && + bytes.Equal(e.Address_, e2.Address_) && + e.Index_ == e2.Index_ +} +func (e MockBadEvidence) String() string { + return fmt.Sprintf("BadEvidence: %d/%s/%d", e.Height_, e.Address_, e.Index_) +} diff --git a/mempool/reactor.go b/mempool/reactor.go index 9aed416f..4523f824 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -100,17 +100,10 @@ type PeerState interface { GetHeight() int64 } -// Peer describes a peer. -type Peer interface { - IsRunning() bool - Send(byte, interface{}) bool - Get(string) interface{} -} - // Send new mempool txs to peer. // TODO: Handle mempool or reactor shutdown? // As is this routine may block forever if no new txs come in. -func (memR *MempoolReactor) broadcastTxRoutine(peer Peer) { +func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) { if !memR.config.Broadcast { return } diff --git a/node/node.go b/node/node.go index 352c13cc..53eab6e0 100644 --- a/node/node.go +++ b/node/node.go @@ -19,6 +19,7 @@ import ( bc "github.com/tendermint/tendermint/blockchain" cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/consensus" + "github.com/tendermint/tendermint/evidence" mempl "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/p2p/trust" @@ -107,6 +108,7 @@ type Node struct { mempoolReactor *mempl.MempoolReactor // for gossipping transactions consensusState *consensus.ConsensusState // latest consensus state consensusReactor *consensus.ConsensusReactor // for participating in the consensus + evidencePool *evidence.EvidencePool // tracking evidence proxyApp proxy.AppConns // connection to the application rpcListeners []net.Listener // rpc servers txIndexer txindex.TxIndexer @@ -128,9 +130,6 @@ func NewNode(config *cfg.Config, } blockStore := bc.NewBlockStore(blockStoreDB) - consensusLogger := logger.With("module", "consensus") - stateLogger := logger.With("module", "state") - // Get State stateDB, err := dbProvider(&DBContext{"state", config}) if err != nil { @@ -149,6 +148,7 @@ func NewNode(config *cfg.Config, saveGenesisDoc(stateDB, genDoc) } + stateLogger := logger.With("module", "state") state := sm.LoadState(stateDB) if state == nil { state, err = sm.MakeGenesisState(stateDB, genDoc) @@ -161,6 +161,7 @@ func NewNode(config *cfg.Config, // Create the proxyApp, which manages connections (consensus, mempool, query) // and sync tendermint and the app by replaying any necessary blocks + consensusLogger := logger.With("module", "consensus") handshaker := consensus.NewHandshaker(state, blockStore) handshaker.SetLogger(consensusLogger) proxyApp := proxy.NewAppConns(clientCreator, handshaker) @@ -208,8 +209,21 @@ func NewNode(config *cfg.Config, mempool.EnableTxsAvailable() } + // Make Evidence Reactor + evidenceDB, err := dbProvider(&DBContext{"evidence", config}) + if err != nil { + return nil, err + } + evidenceLogger := logger.With("module", "evidence") + evidenceStore := evidence.NewEvidenceStore(evidenceDB) + evidencePool := evidence.NewEvidencePool(state.ConsensusParams.EvidenceParams, evidenceStore, state.Copy()) + evidencePool.SetLogger(evidenceLogger) + evidenceReactor := evidence.NewEvidenceReactor(evidencePool) + evidenceReactor.SetLogger(evidenceLogger) + // Make ConsensusReactor - consensusState := consensus.NewConsensusState(config.Consensus, state.Copy(), proxyApp.Consensus(), blockStore, mempool) + consensusState := consensus.NewConsensusState(config.Consensus, state.Copy(), + proxyApp.Consensus(), blockStore, mempool, evidencePool) consensusState.SetLogger(consensusLogger) if privValidator != nil { consensusState.SetPrivValidator(privValidator) @@ -224,6 +238,7 @@ func NewNode(config *cfg.Config, sw.AddReactor("MEMPOOL", mempoolReactor) sw.AddReactor("BLOCKCHAIN", bcReactor) sw.AddReactor("CONSENSUS", consensusReactor) + sw.AddReactor("EVIDENCE", evidenceReactor) // Optionally, start the pex reactor var addrBook *p2p.AddrBook @@ -323,6 +338,7 @@ func NewNode(config *cfg.Config, mempoolReactor: mempoolReactor, consensusState: consensusState, consensusReactor: consensusReactor, + evidencePool: evidencePool, proxyApp: proxyApp, txIndexer: txIndexer, indexerService: indexerService, @@ -416,6 +432,7 @@ func (n *Node) ConfigureRPC() { rpccore.SetBlockStore(n.blockStore) rpccore.SetConsensusState(n.consensusState) rpccore.SetMempool(n.mempoolReactor.Mempool) + rpccore.SetEvidencePool(n.evidencePool) rpccore.SetSwitch(n.sw) rpccore.SetPubKey(n.privValidator.GetPubKey()) rpccore.SetGenesisDoc(n.genesisDoc) @@ -489,6 +506,11 @@ func (n *Node) MempoolReactor() *mempl.MempoolReactor { return n.mempoolReactor } +// EvidencePool returns the Node's EvidencePool. +func (n *Node) EvidencePool() *evidence.EvidencePool { + return n.evidencePool +} + // EventBus returns the Node's EventBus. func (n *Node) EventBus() *types.EventBus { return n.eventBus diff --git a/rpc/core/pipe.go b/rpc/core/pipe.go index d0b0f87d..325625c7 100644 --- a/rpc/core/pipe.go +++ b/rpc/core/pipe.go @@ -45,6 +45,7 @@ var ( // interfaces defined in types and above blockStore types.BlockStore mempool types.Mempool + evidencePool types.EvidencePool consensusState Consensus p2pSwitch P2P @@ -67,6 +68,10 @@ func SetMempool(mem types.Mempool) { mempool = mem } +func SetEvidencePool(evpool types.EvidencePool) { + evidencePool = evpool +} + func SetConsensusState(cs Consensus) { consensusState = cs } diff --git a/state/execution.go b/state/execution.go index dc38ac98..c9686152 100644 --- a/state/execution.go +++ b/state/execution.go @@ -309,6 +309,12 @@ func (s *State) validateBlock(b *types.Block) error { } } + for _, ev := range b.Evidence.Evidence { + if _, err := s.VerifyEvidence(ev); err != nil { + return types.NewEvidenceInvalidErr(ev, err) + } + } + return nil } @@ -320,7 +326,8 @@ func (s *State) validateBlock(b *types.Block) error { // commits it, and saves the block and state. It's the only function that needs to be called // from outside this package to process and commit an entire block. func (s *State) ApplyBlock(txEventPublisher types.TxEventPublisher, proxyAppConn proxy.AppConnConsensus, - block *types.Block, partsHeader types.PartSetHeader, mempool types.Mempool) error { + block *types.Block, partsHeader types.PartSetHeader, + mempool types.Mempool, evpool types.EvidencePool) error { abciResponses, err := s.ValExecBlock(txEventPublisher, proxyAppConn, block) if err != nil { @@ -348,6 +355,8 @@ func (s *State) ApplyBlock(txEventPublisher types.TxEventPublisher, proxyAppConn fail.Fail() // XXX + evpool.MarkEvidenceAsCommitted(block.Evidence.Evidence) + // save the state and the validators s.Save() diff --git a/state/execution_test.go b/state/execution_test.go index 25d32cbb..7cda5c1d 100644 --- a/state/execution_test.go +++ b/state/execution_test.go @@ -93,7 +93,10 @@ func TestApplyBlock(t *testing.T) { block := makeBlock(state, 1) - err = state.ApplyBlock(types.NopEventBus{}, proxyApp.Consensus(), block, block.MakePartSet(testPartSize).Header(), types.MockMempool{}) + err = state.ApplyBlock(types.NopEventBus{}, proxyApp.Consensus(), + block, block.MakePartSet(testPartSize).Header(), + types.MockMempool{}, types.MockEvidencePool{}) + require.Nil(t, err) // TODO check state and mempool diff --git a/state/state.go b/state/state.go index 0dd105cb..773b46fc 100644 --- a/state/state.go +++ b/state/state.go @@ -195,6 +195,7 @@ func (s *State) LoadABCIResponses(height int64) (*ABCIResponses, error) { } // LoadValidators loads the ValidatorSet for a given height. +// Returns ErrNoValSetForHeight if the validator set can't be found for this height. func (s *State) LoadValidators(height int64) (*types.ValidatorSet, error) { valInfo := s.loadValidatorsInfo(height) if valInfo == nil { @@ -382,6 +383,43 @@ func (s *State) GetValidators() (last *types.ValidatorSet, current *types.Valida return s.LastValidators, s.Validators } +// VerifyEvidence verifies the evidence fully by checking it is internally +// consistent and corresponds to an existing or previous validator. +// It returns the priority of this evidence, or an error. +// NOTE: return error may be ErrNoValSetForHeight, in which case the validator set +// for the evidence height could not be loaded. +func (s *State) VerifyEvidence(evidence types.Evidence) (priority int64, err error) { + evidenceAge := s.LastBlockHeight - evidence.Height() + maxAge := s.ConsensusParams.EvidenceParams.MaxAge + if evidenceAge > maxAge { + return priority, fmt.Errorf("Evidence from height %d is too old. Min height is %d", + evidence.Height(), s.LastBlockHeight-maxAge) + } + + if err := evidence.Verify(s.ChainID); err != nil { + return priority, err + } + + // The address must have been an active validator at the height + ev := evidence + height, addr, idx := ev.Height(), ev.Address(), ev.Index() + valset, err := s.LoadValidators(height) + if err != nil { + // XXX/TODO: what do we do if we can't load the valset? + // eg. if we have pruned the state or height is too high? + return priority, err + } + valIdx, val := valset.GetByAddress(addr) + if val == nil { + return priority, fmt.Errorf("Address %X was not a validator at height %d", addr, height) + } else if idx != valIdx { + return priority, fmt.Errorf("Address %X was validator %d at height %d, not %d", addr, valIdx, height, idx) + } + + priority = val.VotingPower + return priority, nil +} + //------------------------------------------------------------------------ // ABCIResponses retains the responses diff --git a/types/block.go b/types/block.go index 29775466..6aa97c2d 100644 --- a/types/block.go +++ b/types/block.go @@ -19,7 +19,8 @@ import ( type Block struct { *Header `json:"header"` *Data `json:"data"` - LastCommit *Commit `json:"last_commit"` + Evidence EvidenceData `json:"evidence"` + LastCommit *Commit `json:"last_commit"` } // MakeBlock returns a new block with an empty header, except what can be computed from itself. @@ -40,6 +41,11 @@ func MakeBlock(height int64, txs []Tx, commit *Commit) *Block { return block } +// AddEvidence appends the given evidence to the block +func (b *Block) AddEvidence(evidence []Evidence) { + b.Evidence.Evidence = append(b.Evidence.Evidence, evidence...) +} + // ValidateBasic performs basic validation that doesn't involve state data. // It checks the internal consistency of the block. func (b *Block) ValidateBasic() error { @@ -58,6 +64,9 @@ func (b *Block) ValidateBasic() error { if !bytes.Equal(b.DataHash, b.Data.Hash()) { return fmt.Errorf("Wrong Block.Header.DataHash. Expected %v, got %v", b.DataHash, b.Data.Hash()) } + if !bytes.Equal(b.EvidenceHash, b.Evidence.Hash()) { + return errors.New(cmn.Fmt("Wrong Block.Header.EvidenceHash. Expected %v, got %v", b.EvidenceHash, b.Evidence.Hash())) + } return nil } @@ -69,12 +78,14 @@ func (b *Block) FillHeader() { if b.DataHash == nil { b.DataHash = b.Data.Hash() } + if b.EvidenceHash == nil { + b.EvidenceHash = b.Evidence.Hash() + } } // Hash computes and returns the block hash. // If the block is incomplete, block hash is nil for safety. func (b *Block) Hash() data.Bytes { - // fmt.Println(">>", b.Data) if b == nil || b.Header == nil || b.Data == nil || b.LastCommit == nil { return nil } @@ -114,9 +125,11 @@ func (b *Block) StringIndented(indent string) string { %s %v %s %v %s %v +%s %v %s}#%v`, indent, b.Header.StringIndented(indent+" "), indent, b.Data.StringIndented(indent+" "), + indent, b.Evidence.StringIndented(indent+" "), indent, b.LastCommit.StringIndented(indent+" "), indent, b.Hash()) } @@ -134,6 +147,7 @@ func (b *Block) StringShort() string { // Header defines the structure of a Tendermint block header // TODO: limit header size +// NOTE: changes to the Header should be duplicated in the abci Header type Header struct { // basic block info ChainID string `json:"chain_id"` @@ -154,6 +168,9 @@ type Header struct { ConsensusHash data.Bytes `json:"consensus_hash"` // consensus params for current block AppHash data.Bytes `json:"app_hash"` // state after txs from the previous block LastResultsHash data.Bytes `json:"last_results_hash"` // root hash of all results from the txs from the previous block + + // consensus info + EvidenceHash data.Bytes `json:"evidence_hash"` // evidence included in the block } // Hash returns the hash of the header. @@ -175,6 +192,7 @@ func (h *Header) Hash() data.Bytes { "App": h.AppHash, "Consensus": h.ConsensusHash, "Results": h.LastResultsHash, + "Evidence": h.EvidenceHash, }) } @@ -196,6 +214,7 @@ func (h *Header) StringIndented(indent string) string { %s App: %v %s Conensus: %v %s Results: %v +%s Evidence: %v %s}#%v`, indent, h.ChainID, indent, h.Height, @@ -209,6 +228,7 @@ func (h *Header) StringIndented(indent string) string { indent, h.AppHash, indent, h.ConsensusHash, indent, h.LastResultsHash, + indent, h.EvidenceHash, indent, h.Hash()) } @@ -413,6 +433,45 @@ func (data *Data) StringIndented(indent string) string { indent, data.hash) } +//----------------------------------------------------------------------------- + +// EvidenceData contains any evidence of malicious wrong-doing by validators +type EvidenceData struct { + Evidence EvidenceList `json:"evidence"` + + // Volatile + hash data.Bytes +} + +// Hash returns the hash of the data. +func (data *EvidenceData) Hash() data.Bytes { + if data.hash == nil { + data.hash = data.Evidence.Hash() + } + return data.hash +} + +// StringIndented returns a string representation of the evidence. +func (data *EvidenceData) StringIndented(indent string) string { + if data == nil { + return "nil-Evidence" + } + evStrings := make([]string, cmn.MinInt(len(data.Evidence), 21)) + for i, ev := range data.Evidence { + if i == 20 { + evStrings[i] = fmt.Sprintf("... (%v total)", len(data.Evidence)) + break + } + evStrings[i] = fmt.Sprintf("Evidence:%v", ev) + } + return fmt.Sprintf(`Data{ +%s %v +%s}#%v`, + indent, strings.Join(evStrings, "\n"+indent+" "), + indent, data.hash) + return "" +} + //-------------------------------------------------------------------------------- // BlockID defines the unique ID of a block as its Hash and its PartSetHeader diff --git a/types/block_test.go b/types/block_test.go index bde47440..1fcfa469 100644 --- a/types/block_test.go +++ b/types/block_test.go @@ -10,7 +10,7 @@ import ( func TestValidateBlock(t *testing.T) { txs := []Tx{Tx("foo"), Tx("bar")} - lastID := makeBlockID() + lastID := makeBlockIDRandom() h := int64(3) voteSet, _, vals := randVoteSet(h-1, 1, VoteTypePrecommit, @@ -58,7 +58,18 @@ func TestValidateBlock(t *testing.T) { require.Error(t, err) } -func makeBlockID() BlockID { +func makeBlockIDRandom() BlockID { blockHash, blockPartsHeader := crypto.CRandBytes(32), PartSetHeader{123, crypto.CRandBytes(32)} return BlockID{blockHash, blockPartsHeader} } + +func makeBlockID(hash string, partSetSize int, partSetHash string) BlockID { + return BlockID{ + Hash: []byte(hash), + PartsHeader: PartSetHeader{ + Total: partSetSize, + Hash: []byte(partSetHash), + }, + } + +} diff --git a/types/evidence.go b/types/evidence.go new file mode 100644 index 00000000..94bab1a3 --- /dev/null +++ b/types/evidence.go @@ -0,0 +1,169 @@ +package types + +import ( + "bytes" + "fmt" + + "github.com/tendermint/go-crypto" + wire "github.com/tendermint/go-wire" + "github.com/tendermint/tmlibs/merkle" +) + +// ErrEvidenceInvalid wraps a piece of evidence and the error denoting how or why it is invalid. +type ErrEvidenceInvalid struct { + Evidence Evidence + ErrorValue error +} + +func NewEvidenceInvalidErr(ev Evidence, err error) *ErrEvidenceInvalid { + return &ErrEvidenceInvalid{ev, err} +} + +// Error returns a string representation of the error. +func (err *ErrEvidenceInvalid) Error() string { + return fmt.Sprintf("Invalid evidence: %v. Evidence: %v", err.ErrorValue, err.Evidence) +} + +//------------------------------------------- + +// Evidence represents any provable malicious activity by a validator +type Evidence interface { + Height() int64 // height of the equivocation + Address() []byte // address of the equivocating validator + Index() int // index of the validator in the validator set + Hash() []byte // hash of the evidence + Verify(chainID string) error // verify the evidence + Equal(Evidence) bool // check equality of evidence + + String() string +} + +//------------------------------------------- + +// EvidenceList is a list of Evidence. Evidences is not a word. +type EvidenceList []Evidence + +// Hash returns the simple merkle root hash of the EvidenceList. +func (evl EvidenceList) Hash() []byte { + // Recursive impl. + // Copied from tmlibs/merkle to avoid allocations + switch len(evl) { + case 0: + return nil + case 1: + return evl[0].Hash() + default: + left := EvidenceList(evl[:(len(evl)+1)/2]).Hash() + right := EvidenceList(evl[(len(evl)+1)/2:]).Hash() + return merkle.SimpleHashFromTwoHashes(left, right) + } +} + +func (evl EvidenceList) String() string { + s := "" + for _, e := range evl { + s += fmt.Sprintf("%s\t\t", e) + } + return s +} + +// Has returns true if the evidence is in the EvidenceList. +func (evl EvidenceList) Has(evidence Evidence) bool { + for _, ev := range evl { + if ev.Equal(evidence) { + return true + } + } + return false +} + +//------------------------------------------- + +const ( + evidenceTypeDuplicateVote = byte(0x01) +) + +var _ = wire.RegisterInterface( + struct{ Evidence }{}, + wire.ConcreteType{&DuplicateVoteEvidence{}, evidenceTypeDuplicateVote}, +) + +//------------------------------------------- + +// DuplicateVoteEvidence contains evidence a validator signed two conflicting votes. +type DuplicateVoteEvidence struct { + PubKey crypto.PubKey + VoteA *Vote + VoteB *Vote +} + +// String returns a string representation of the evidence. +func (dve *DuplicateVoteEvidence) String() string { + return fmt.Sprintf("VoteA: %v; VoteB: %v", dve.VoteA, dve.VoteB) + +} + +// Height returns the height this evidence refers to. +func (dve *DuplicateVoteEvidence) Height() int64 { + return dve.VoteA.Height +} + +// Address returns the address of the validator. +func (dve *DuplicateVoteEvidence) Address() []byte { + return dve.PubKey.Address() +} + +// Index returns the index of the validator. +func (dve *DuplicateVoteEvidence) Index() int { + return dve.VoteA.ValidatorIndex +} + +// Hash returns the hash of the evidence. +func (dve *DuplicateVoteEvidence) Hash() []byte { + return merkle.SimpleHashFromBinary(dve) +} + +// Verify returns an error if the two votes aren't conflicting. +// To be conflicting, they must be from the same validator, for the same H/R/S, but for different blocks. +func (dve *DuplicateVoteEvidence) Verify(chainID string) error { + // H/R/S must be the same + if dve.VoteA.Height != dve.VoteB.Height || + dve.VoteA.Round != dve.VoteB.Round || + dve.VoteA.Type != dve.VoteB.Type { + return fmt.Errorf("DuplicateVoteEvidence Error: H/R/S does not match. Got %v and %v", dve.VoteA, dve.VoteB) + } + + // Address must be the same + if !bytes.Equal(dve.VoteA.ValidatorAddress, dve.VoteB.ValidatorAddress) { + return fmt.Errorf("DuplicateVoteEvidence Error: Validator addresses do not match. Got %X and %X", dve.VoteA.ValidatorAddress, dve.VoteB.ValidatorAddress) + } + // XXX: Should we enforce index is the same ? + if dve.VoteA.ValidatorIndex != dve.VoteB.ValidatorIndex { + return fmt.Errorf("DuplicateVoteEvidence Error: Validator indices do not match. Got %d and %d", dve.VoteA.ValidatorIndex, dve.VoteB.ValidatorIndex) + } + + // BlockIDs must be different + if dve.VoteA.BlockID.Equals(dve.VoteB.BlockID) { + return fmt.Errorf("DuplicateVoteEvidence Error: BlockIDs are the same (%v) - not a real duplicate vote!", dve.VoteA.BlockID) + } + + // Signatures must be valid + if !dve.PubKey.VerifyBytes(SignBytes(chainID, dve.VoteA), dve.VoteA.Signature) { + return fmt.Errorf("DuplicateVoteEvidence Error verifying VoteA: %v", ErrVoteInvalidSignature) + } + if !dve.PubKey.VerifyBytes(SignBytes(chainID, dve.VoteB), dve.VoteB.Signature) { + return fmt.Errorf("DuplicateVoteEvidence Error verifying VoteB: %v", ErrVoteInvalidSignature) + } + + return nil +} + +// Equal checks if two pieces of evidence are equal. +func (dve *DuplicateVoteEvidence) Equal(ev Evidence) bool { + if _, ok := ev.(*DuplicateVoteEvidence); !ok { + return false + } + + // just check their hashes + return bytes.Equal(merkle.SimpleHashFromBinary(dve), merkle.SimpleHashFromBinary(ev)) +} diff --git a/types/evidence_test.go b/types/evidence_test.go new file mode 100644 index 00000000..876b68ad --- /dev/null +++ b/types/evidence_test.go @@ -0,0 +1,73 @@ +package types + +import ( + "testing" + + "github.com/stretchr/testify/assert" + cmn "github.com/tendermint/tmlibs/common" +) + +type voteData struct { + vote1 *Vote + vote2 *Vote + valid bool +} + +func makeVote(val *PrivValidatorFS, chainID string, valIndex int, height int64, round, step int, blockID BlockID) *Vote { + v := &Vote{ + ValidatorAddress: val.PubKey.Address(), + ValidatorIndex: valIndex, + Height: height, + Round: round, + Type: byte(step), + BlockID: blockID, + } + sig := val.PrivKey.Sign(SignBytes(chainID, v)) + v.Signature = sig + return v + +} + +func TestEvidence(t *testing.T) { + _, tmpFilePath := cmn.Tempfile("priv_validator_") + val := GenPrivValidatorFS(tmpFilePath) + val2 := GenPrivValidatorFS(tmpFilePath) + blockID := makeBlockID("blockhash", 1000, "partshash") + blockID2 := makeBlockID("blockhash2", 1000, "partshash") + blockID3 := makeBlockID("blockhash", 10000, "partshash") + blockID4 := makeBlockID("blockhash", 10000, "partshash2") + + chainID := "mychain" + + vote1 := makeVote(val, chainID, 0, 10, 2, 1, blockID) + badVote := makeVote(val, chainID, 0, 10, 2, 1, blockID) + badVote.Signature = val2.PrivKey.Sign(SignBytes(chainID, badVote)) + + cases := []voteData{ + {vote1, makeVote(val, chainID, 0, 10, 2, 1, blockID2), true}, // different block ids + {vote1, makeVote(val, chainID, 0, 10, 2, 1, blockID3), true}, + {vote1, makeVote(val, chainID, 0, 10, 2, 1, blockID4), true}, + {vote1, makeVote(val, chainID, 0, 10, 2, 1, blockID), false}, // wrong block id + {vote1, makeVote(val, "mychain2", 0, 10, 2, 1, blockID2), false}, // wrong chain id + {vote1, makeVote(val, chainID, 1, 10, 2, 1, blockID2), false}, // wrong val index + {vote1, makeVote(val, chainID, 0, 11, 2, 1, blockID2), false}, // wrong height + {vote1, makeVote(val, chainID, 0, 10, 3, 1, blockID2), false}, // wrong round + {vote1, makeVote(val, chainID, 0, 10, 2, 2, blockID2), false}, // wrong step + {vote1, makeVote(val2, chainID, 0, 10, 2, 1, blockID), false}, // wrong validator + {vote1, badVote, false}, // signed by wrong key + } + + for _, c := range cases { + ev := &DuplicateVoteEvidence{ + PubKey: val.PubKey, + VoteA: c.vote1, + VoteB: c.vote2, + } + if c.valid { + assert.Nil(t, ev.Verify(chainID), "evidence should be valid") + } else { + assert.NotNil(t, ev.Verify(chainID), "evidence should be invalid") + } + } + +} diff --git a/types/params.go b/types/params.go index 19e86d44..90ab5f65 100644 --- a/types/params.go +++ b/types/params.go @@ -14,9 +14,10 @@ const ( // ConsensusParams contains consensus critical parameters // that determine the validity of blocks. type ConsensusParams struct { - BlockSize `json:"block_size_params"` - TxSize `json:"tx_size_params"` - BlockGossip `json:"block_gossip_params"` + BlockSize `json:"block_size_params"` + TxSize `json:"tx_size_params"` + BlockGossip `json:"block_gossip_params"` + EvidenceParams `json:"evidence_params"` } // BlockSize contain limits on the block size. @@ -37,12 +38,18 @@ type BlockGossip struct { BlockPartSizeBytes int `json:"block_part_size_bytes"` // NOTE: must not be 0 } +// EvidenceParams determine how we handle evidence of malfeasance +type EvidenceParams struct { + MaxAge int64 `json:"max_age"` // only accept new evidence more recent than this +} + // DefaultConsensusParams returns a default ConsensusParams. func DefaultConsensusParams() *ConsensusParams { return &ConsensusParams{ DefaultBlockSize(), DefaultTxSize(), DefaultBlockGossip(), + DefaultEvidenceParams(), } } @@ -70,6 +77,13 @@ func DefaultBlockGossip() BlockGossip { } } +// DefaultEvidence Params returns a default EvidenceParams. +func DefaultEvidenceParams() EvidenceParams { + return EvidenceParams{ + MaxAge: 100000, // 27.8 hrs at 1block/s + } +} + // Validate validates the ConsensusParams to ensure all values // are within their allowed limits, and returns an error if they are not. func (params *ConsensusParams) Validate() error { diff --git a/types/services.go b/types/services.go index 0e007554..787b1b99 100644 --- a/types/services.go +++ b/types/services.go @@ -4,7 +4,7 @@ import ( abci "github.com/tendermint/abci/types" ) -// NOTE: all types in this file are considered UNSTABLE +// NOTE/XXX: all type definitions in this file are considered UNSTABLE //------------------------------------------------------ // blockchain services types @@ -14,7 +14,7 @@ import ( //------------------------------------------------------ // mempool -// Mempool defines the mempool interface. +// Mempool defines the mempool interface as used by the ConsensusState. // Updates to the mempool need to be synchronized with committing a block // so apps can reset their transient state on Commit // UNSTABLE @@ -63,9 +63,38 @@ type BlockStoreRPC interface { LoadSeenCommit(height int64) *Commit } -// BlockStore defines the BlockStore interface. +// BlockStore defines the BlockStore interface used by the ConsensusState. // UNSTABLE type BlockStore interface { BlockStoreRPC SaveBlock(block *Block, blockParts *PartSet, seenCommit *Commit) } + +//------------------------------------------------------ +// state + +// State defines the stateful interface used to verify evidence. +// UNSTABLE +type State interface { + VerifyEvidence(Evidence) (priority int64, err error) +} + +//------------------------------------------------------ +// evidence pool + +// EvidencePool defines the EvidencePool interface used by the ConsensusState. +// UNSTABLE +type EvidencePool interface { + PendingEvidence() []Evidence + AddEvidence(Evidence) error + MarkEvidenceAsCommitted([]Evidence) +} + +// MockMempool is an empty implementation of a Mempool, useful for testing. +// UNSTABLE +type MockEvidencePool struct { +} + +func (m MockEvidencePool) PendingEvidence() []Evidence { return nil } +func (m MockEvidencePool) AddEvidence(Evidence) error { return nil } +func (m MockEvidencePool) MarkEvidenceAsCommitted([]Evidence) {} diff --git a/types/tx.go b/types/tx.go index 5761b83e..4cf5843a 100644 --- a/types/tx.go +++ b/types/tx.go @@ -10,18 +10,18 @@ import ( "github.com/tendermint/tmlibs/merkle" ) -// Tx represents a transaction, which may contain arbitrary bytes. +// Tx is an arbitrary byte array. +// NOTE: Tx has no types at this level, so when go-wire encoded it's just length-prefixed. +// Alternatively, it may make sense to add types here and let +// []byte be type 0x1 so we can have versioned txs if need be in the future. type Tx []byte -// Hash returns the hash of the go-wire encoded Tx. -// Tx has no types at this level, so go-wire encoding only adds length-prefix. -// NOTE: It may make sense to add types here one day and let []byte be type 0x1 -// so we can have versioned txs if need be in the future. +// Hash computes the RIPEMD160 hash of the go-wire encoded transaction. func (tx Tx) Hash() []byte { return merkle.SimpleHashFromBinary(tx) } -// String returns a string representation of the Tx. +// String returns the hex-encoded transaction as a string. func (tx Tx) String() string { return fmt.Sprintf("Tx{%X}", []byte(tx)) } @@ -29,7 +29,7 @@ func (tx Tx) String() string { // Txs is a slice of Tx. type Txs []Tx -// Hash returns the simple Merkle root hash of the Txs. +// Hash returns the simple Merkle root hash of the transactions. func (txs Txs) Hash() []byte { // Recursive impl. // Copied from tmlibs/merkle to avoid allocations @@ -87,6 +87,7 @@ func (txs Txs) Proof(i int) TxProof { } } +// TxProof represents a Merkle proof of the presence of a transaction in the Merkle tree. type TxProof struct { Index, Total int RootHash data.Bytes @@ -94,12 +95,13 @@ type TxProof struct { Proof merkle.SimpleProof } +// LeadHash returns the hash of the transaction this proof refers to. func (tp TxProof) LeafHash() []byte { return tp.Data.Hash() } -// Validate returns nil if it matches the dataHash, and is internally consistent -// otherwise, returns a sensible error +// Validate verifies the proof. It returns nil if the RootHash matches the dataHash argument, +// and if the proof is internally consistent. Otherwise, it returns a sensible error. func (tp TxProof) Validate(dataHash []byte) error { if !bytes.Equal(dataHash, tp.RootHash) { return errors.New("Proof matches different data hash") diff --git a/types/vote.go b/types/vote.go index aa993e33..4397152c 100644 --- a/types/vote.go +++ b/types/vote.go @@ -1,6 +1,7 @@ package types import ( + "bytes" "errors" "fmt" "io" @@ -13,21 +14,31 @@ import ( ) var ( - ErrVoteUnexpectedStep = errors.New("Unexpected step") - ErrVoteInvalidValidatorIndex = errors.New("Invalid validator index") - ErrVoteInvalidValidatorAddress = errors.New("Invalid validator address") - ErrVoteInvalidSignature = errors.New("Invalid signature") - ErrVoteInvalidBlockHash = errors.New("Invalid block hash") - ErrVoteNil = errors.New("Nil vote") + ErrVoteUnexpectedStep = errors.New("Unexpected step") + ErrVoteInvalidValidatorIndex = errors.New("Invalid validator index") + ErrVoteInvalidValidatorAddress = errors.New("Invalid validator address") + ErrVoteInvalidSignature = errors.New("Invalid signature") + ErrVoteInvalidBlockHash = errors.New("Invalid block hash") + ErrVoteNonDeterministicSignature = errors.New("Non-deterministic signature") + ErrVoteNil = errors.New("Nil vote") ) type ErrVoteConflictingVotes struct { - VoteA *Vote - VoteB *Vote + *DuplicateVoteEvidence } func (err *ErrVoteConflictingVotes) Error() string { - return "Conflicting votes" + return fmt.Sprintf("Conflicting votes from validator %v", err.PubKey.Address()) +} + +func NewConflictingVoteError(val *Validator, voteA, voteB *Vote) *ErrVoteConflictingVotes { + return &ErrVoteConflictingVotes{ + &DuplicateVoteEvidence{ + PubKey: val.PubKey, + VoteA: voteA, + VoteB: voteB, + }, + } } // Types of votes @@ -92,3 +103,14 @@ func (vote *Vote) String() string { cmn.Fingerprint(vote.BlockID.Hash), vote.Signature, CanonicalTime(vote.Timestamp)) } + +func (vote *Vote) Verify(chainID string, pubKey crypto.PubKey) error { + if !bytes.Equal(pubKey.Address(), vote.ValidatorAddress) { + return ErrVoteInvalidValidatorAddress + } + + if !pubKey.VerifyBytes(SignBytes(chainID, vote), vote.Signature) { + return ErrVoteInvalidSignature + } + return nil +} diff --git a/types/vote_set.go b/types/vote_set.go index 941852a8..34f98956 100644 --- a/types/vote_set.go +++ b/types/vote_set.go @@ -6,6 +6,8 @@ import ( "strings" "sync" + "github.com/pkg/errors" + cmn "github.com/tendermint/tmlibs/common" ) @@ -145,27 +147,32 @@ func (voteSet *VoteSet) addVote(vote *Vote) (added bool, err error) { // Ensure that validator index was set if valIndex < 0 { - return false, ErrVoteInvalidValidatorIndex + return false, errors.Wrap(ErrVoteInvalidValidatorIndex, "Index < 0") } else if len(valAddr) == 0 { - return false, ErrVoteInvalidValidatorAddress + return false, errors.Wrap(ErrVoteInvalidValidatorAddress, "Empty address") } // Make sure the step matches. if (vote.Height != voteSet.height) || (vote.Round != voteSet.round) || (vote.Type != voteSet.type_) { - return false, ErrVoteUnexpectedStep + return false, errors.Wrapf(ErrVoteUnexpectedStep, "Got %d/%d/%d, expected %d/%d/%d", + voteSet.height, voteSet.round, voteSet.type_, + vote.Height, vote.Round, vote.Type) } // Ensure that signer is a validator. lookupAddr, val := voteSet.valSet.GetByIndex(valIndex) if val == nil { - return false, ErrVoteInvalidValidatorIndex + return false, errors.Wrapf(ErrVoteInvalidValidatorIndex, + "Cannot find validator %d in valSet of size %d", valIndex, voteSet.valSet.Size()) } // Ensure that the signer has the right address if !bytes.Equal(valAddr, lookupAddr) { - return false, ErrVoteInvalidValidatorAddress + return false, errors.Wrapf(ErrVoteInvalidValidatorAddress, + "vote.ValidatorAddress (%X) does not match address (%X) for vote.ValidatorIndex (%d)", + valAddr, lookupAddr, valIndex) } // If we already know of this vote, return false. @@ -173,23 +180,19 @@ func (voteSet *VoteSet) addVote(vote *Vote) (added bool, err error) { if existing.Signature.Equals(vote.Signature) { return false, nil // duplicate } else { - return false, ErrVoteInvalidSignature // NOTE: assumes deterministic signatures + return false, errors.Wrapf(ErrVoteNonDeterministicSignature, "Existing vote: %v; New vote: %v", existing, vote) } } // Check signature. - if !val.PubKey.VerifyBytes(SignBytes(voteSet.chainID, vote), vote.Signature) { - // Bad signature. - return false, ErrVoteInvalidSignature + if err := vote.Verify(voteSet.chainID, val.PubKey); err != nil { + return false, errors.Wrapf(err, "Failed to verify vote with ChainID %s and PubKey %s", voteSet.chainID, val.PubKey) } // Add vote and get conflicting vote if any added, conflicting := voteSet.addVerifiedVote(vote, blockKey, val.VotingPower) if conflicting != nil { - return added, &ErrVoteConflictingVotes{ - VoteA: conflicting, - VoteB: vote, - } + return added, NewConflictingVoteError(val, conflicting, vote) } else { if !added { cmn.PanicSanity("Expected to add non-conflicting vote")