cleanup evidence pkg. state.VerifyEvidence

This commit is contained in:
Ethan Buchman
2017-11-19 01:34:11 +00:00
parent f7731d38f6
commit 6c4a0f9363
6 changed files with 115 additions and 75 deletions

View File

@ -9,44 +9,45 @@ import (
wire "github.com/tendermint/go-wire" wire "github.com/tendermint/go-wire"
"github.com/tendermint/tmlibs/log" "github.com/tendermint/tmlibs/log"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
const ( const (
EvidencePoolChannel = byte(0x38) EvidenceChannel = byte(0x38)
maxEvidencePoolMessageSize = 1048576 // 1MB TODO make it configurable maxEvidenceMessageSize = 1048576 // 1MB TODO make it configurable
peerCatchupSleepIntervalMS = 100 // If peer is behind, sleep this amount peerCatchupSleepIntervalMS = 100 // If peer is behind, sleep this amount
broadcastEvidenceIntervalS = 60 // broadcast uncommitted evidence this often broadcastEvidenceIntervalS = 60 // broadcast uncommitted evidence this often
) )
// EvidencePoolReactor handles evpool evidence broadcasting amongst peers. // EvidenceReactor handles evpool evidence broadcasting amongst peers.
type EvidencePoolReactor struct { type EvidenceReactor struct {
p2p.BaseReactor p2p.BaseReactor
config *EvidencePoolConfig config *cfg.EvidenceConfig
evpool *EvidencePool evpool *EvidencePool
evsw types.EventSwitch eventBus *types.EventBus
} }
// NewEvidencePoolReactor returns a new EvidencePoolReactor with the given config and evpool. // NewEvidenceReactor returns a new EvidenceReactor with the given config and evpool.
func NewEvidencePoolReactor(config *EvidencePoolConfig, evpool *EvidencePool) *EvidencePoolReactor { func NewEvidenceReactor(config *cfg.EvidenceConfig, evpool *EvidencePool) *EvidenceReactor {
evR := &EvidencePoolReactor{ evR := &EvidenceReactor{
config: config, config: config,
evpool: evpool, evpool: evpool,
} }
evR.BaseReactor = *p2p.NewBaseReactor("EvidencePoolReactor", evR) evR.BaseReactor = *p2p.NewBaseReactor("EvidenceReactor", evR)
return evR return evR
} }
// SetLogger sets the Logger on the reactor and the underlying EvidencePool. // SetLogger sets the Logger on the reactor and the underlying Evidence.
func (evR *EvidencePoolReactor) SetLogger(l log.Logger) { func (evR *EvidenceReactor) SetLogger(l log.Logger) {
evR.Logger = l evR.Logger = l
evR.evpool.SetLogger(l) evR.evpool.SetLogger(l)
} }
// OnStart implements cmn.Service // OnStart implements cmn.Service
func (evR *EvidencePoolReactor) OnStart() error { func (evR *EvidenceReactor) OnStart() error {
if err := evR.BaseReactor.OnStart(); err != nil { if err := evR.BaseReactor.OnStart(); err != nil {
return err return err
} }
@ -56,35 +57,35 @@ func (evR *EvidencePoolReactor) OnStart() error {
// GetChannels implements Reactor. // GetChannels implements Reactor.
// It returns the list of channels for this reactor. // It returns the list of channels for this reactor.
func (evR *EvidencePoolReactor) GetChannels() []*p2p.ChannelDescriptor { func (evR *EvidenceReactor) GetChannels() []*p2p.ChannelDescriptor {
return []*p2p.ChannelDescriptor{ return []*p2p.ChannelDescriptor{
&p2p.ChannelDescriptor{ &p2p.ChannelDescriptor{
ID: EvidencePoolChannel, ID: EvidenceChannel,
Priority: 5, Priority: 5,
}, },
} }
} }
// AddPeer implements Reactor. // AddPeer implements Reactor.
func (evR *EvidencePoolReactor) AddPeer(peer p2p.Peer) { func (evR *EvidenceReactor) AddPeer(peer p2p.Peer) {
// send the peer our high-priority evidence. // send the peer our high-priority evidence.
// the rest will be sent by the broadcastRoutine // the rest will be sent by the broadcastRoutine
evidence := evR.evpool.PriorityEvidence() evidence := evR.evpool.PriorityEvidence()
msg := EvidenceMessage{evidence} msg := EvidenceListMessage{evidence}
success := peer.Send(EvidencePoolChannel, struct{ EvidencePoolMessage }{msg}) success := peer.Send(EvidenceChannel, struct{ EvidenceMessage }{msg})
if !success { if !success {
// TODO: remove peer ? // TODO: remove peer ?
} }
} }
// RemovePeer implements Reactor. // RemovePeer implements Reactor.
func (evR *EvidencePoolReactor) RemovePeer(peer p2p.Peer, reason interface{}) { func (evR *EvidenceReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
// nothing to do // nothing to do
} }
// Receive implements Reactor. // Receive implements Reactor.
// It adds any received evidence to the evpool. // It adds any received evidence to the evpool.
func (evR *EvidencePoolReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { func (evR *EvidenceReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
_, msg, err := DecodeMessage(msgBytes) _, msg, err := DecodeMessage(msgBytes)
if err != nil { if err != nil {
evR.Logger.Error("Error decoding message", "err", err) evR.Logger.Error("Error decoding message", "err", err)
@ -93,7 +94,7 @@ func (evR *EvidencePoolReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte
evR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg) evR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg)
switch msg := msg.(type) { switch msg := msg.(type) {
case *EvidenceMessage: case *EvidenceListMessage:
for _, ev := range msg.Evidence { for _, ev := range msg.Evidence {
err := evR.evpool.AddEvidence(ev) err := evR.evpool.AddEvidence(ev)
if err != nil { if err != nil {
@ -107,28 +108,28 @@ func (evR *EvidencePoolReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte
} }
// SetEventSwitch implements events.Eventable. // SetEventSwitch implements events.Eventable.
func (evR *EvidencePoolReactor) SetEventSwitch(evsw types.EventSwitch) { func (evR *EvidenceReactor) SetEventBus(b *types.EventBus) {
evR.evsw = evsw evR.eventBus = b
} }
// broadcast new evidence to all peers. // broadcast new evidence to all peers.
// broadcasts must be non-blocking so routine is always available to read off EvidenceChan. // broadcasts must be non-blocking so routine is always available to read off EvidenceChan.
func (evR *EvidencePoolReactor) broadcastRoutine() { func (evR *EvidenceReactor) broadcastRoutine() {
ticker := time.NewTicker(time.Second * broadcastEvidenceIntervalS) ticker := time.NewTicker(time.Second * broadcastEvidenceIntervalS)
for { for {
select { select {
case evidence := <-evR.evpool.EvidenceChan(): case evidence := <-evR.evpool.EvidenceChan():
// broadcast some new evidence // broadcast some new evidence
msg := EvidenceMessage{[]types.Evidence{evidence}} msg := EvidenceListMessage{[]types.Evidence{evidence}}
evR.Switch.Broadcast(EvidencePoolChannel, struct{ EvidencePoolMessage }{msg}) evR.Switch.Broadcast(EvidenceChannel, struct{ EvidenceMessage }{msg})
// TODO: Broadcast runs asynchronously, so this should wait on the successChan // TODO: Broadcast runs asynchronously, so this should wait on the successChan
// in another routine before marking to be proper. // in another routine before marking to be proper.
evR.evpool.evidenceStore.MarkEvidenceAsBroadcasted(evidence) evR.evpool.evidenceStore.MarkEvidenceAsBroadcasted(evidence)
case <-ticker.C: case <-ticker.C:
// broadcast all pending evidence // broadcast all pending evidence
msg := EvidenceMessage{evR.evpool.PendingEvidence()} msg := EvidenceListMessage{evR.evpool.PendingEvidence()}
evR.Switch.Broadcast(EvidencePoolChannel, struct{ EvidencePoolMessage }{msg}) evR.Switch.Broadcast(EvidenceChannel, struct{ EvidenceMessage }{msg})
case <-evR.Quit: case <-evR.Quit:
return return
} }
@ -142,31 +143,31 @@ const (
msgTypeEvidence = byte(0x01) msgTypeEvidence = byte(0x01)
) )
// EvidencePoolMessage is a message sent or received by the EvidencePoolReactor. // EvidenceMessage is a message sent or received by the EvidenceReactor.
type EvidencePoolMessage interface{} type EvidenceMessage interface{}
var _ = wire.RegisterInterface( var _ = wire.RegisterInterface(
struct{ EvidencePoolMessage }{}, struct{ EvidenceMessage }{},
wire.ConcreteType{&EvidenceMessage{}, msgTypeEvidence}, wire.ConcreteType{&EvidenceListMessage{}, msgTypeEvidence},
) )
// DecodeMessage decodes a byte-array into a EvidencePoolMessage. // DecodeMessage decodes a byte-array into a EvidenceMessage.
func DecodeMessage(bz []byte) (msgType byte, msg EvidencePoolMessage, err error) { func DecodeMessage(bz []byte) (msgType byte, msg EvidenceMessage, err error) {
msgType = bz[0] msgType = bz[0]
n := new(int) n := new(int)
r := bytes.NewReader(bz) r := bytes.NewReader(bz)
msg = wire.ReadBinary(struct{ EvidencePoolMessage }{}, r, maxEvidencePoolMessageSize, n, &err).(struct{ EvidencePoolMessage }).EvidencePoolMessage msg = wire.ReadBinary(struct{ EvidenceMessage }{}, r, maxEvidenceMessageSize, n, &err).(struct{ EvidenceMessage }).EvidenceMessage
return return
} }
//------------------------------------- //-------------------------------------
// EvidenceMessage contains a list of evidence. // EvidenceMessage contains a list of evidence.
type EvidenceMessage struct { type EvidenceListMessage struct {
Evidence []types.Evidence Evidence []types.Evidence
} }
// String returns a string representation of the EvidenceMessage. // String returns a string representation of the EvidenceListMessage.
func (m *EvidenceMessage) String() string { func (m *EvidenceListMessage) String() string {
return fmt.Sprintf("[EvidenceMessage %v]", m.Evidence) return fmt.Sprintf("[EvidenceListMessage %v]", m.Evidence)
} }

View File

@ -55,14 +55,14 @@ type EvidenceStore struct {
db dbm.DB db dbm.DB
// so we can verify evidence was from a real validator // so we can verify evidence was from a real validator
historicalValidators types.HistoricalValidators state types.State
} }
func NewEvidenceStore(chainID string, db dbm.DB, vals types.HistoricalValidators) *EvidenceStore { func NewEvidenceStore(chainID string, db dbm.DB, state types.State) *EvidenceStore {
return &EvidenceStore{ return &EvidenceStore{
chainID: chainID, chainID: chainID,
db: db, db: db,
historicalValidators: vals, state: state,
} }
} }
@ -101,16 +101,11 @@ func (store *EvidenceStore) AddNewEvidence(evidence types.Evidence) (bool, error
return false, nil return false, nil
} }
// verify evidence consistency priority, err := store.state.VerifyEvidence(evidence)
if err := evidence.Verify(store.chainID, store.historicalValidators); err != nil { if err != nil {
return false, err return false, err
} }
// TODO: or we let Verify return the val to avoid running this again?
valSet := store.historicalValidators.LoadValidators(evidence.Height())
_, val := valSet.GetByAddress(evidence.Address())
priority := int(val.VotingPower)
ei := evidenceInfo{ ei := evidenceInfo{
Committed: false, Committed: false,
Priority: priority, Priority: priority,

View File

@ -310,7 +310,7 @@ func (s *State) validateBlock(b *types.Block) error {
} }
for _, ev := range block.Evidence.Evidences { for _, ev := range block.Evidence.Evidences {
if err := ev.Verify(s.ChainID, s); err != nil { if _, err := s.VerifyEvidence(ev); err != nil {
return types.NewEvidenceInvalidErr(ev, err) return types.NewEvidenceInvalidErr(ev, err)
} }
// TODO: mark evidence as committed // TODO: mark evidence as committed

View File

@ -195,6 +195,7 @@ func (s *State) LoadABCIResponses(height int64) (*ABCIResponses, error) {
} }
// LoadValidators loads the ValidatorSet for a given height. // LoadValidators loads the ValidatorSet for a given height.
// Returns ErrNoValSetForHeight if the validator set can't be found for this height.
func (s *State) LoadValidators(height int64) (*types.ValidatorSet, error) { func (s *State) LoadValidators(height int64) (*types.ValidatorSet, error) {
valInfo := s.loadValidatorsInfo(height) valInfo := s.loadValidatorsInfo(height)
if valInfo == nil { if valInfo == nil {
@ -382,6 +383,36 @@ func (s *State) GetValidators() (last *types.ValidatorSet, current *types.Valida
return s.LastValidators, s.Validators return s.LastValidators, s.Validators
} }
// VerifyEvidence verifies the evidence fully by checking it is internally
// consistent and corresponds to an existing or previous validator.
// It returns the priority of this evidence, or an error.
// NOTE: return error may be ErrLoadValidators, in which case the validator set
// for the evidence height could not be loaded.
func (s *State) VerifyEvidence(evidence types.Evidence) (priority int, err error) {
if err := evidence.Verify(s.ChainID); err != nil {
return priority, err
}
// The address must have been an active validator at the height
ev := evidence
height, addr, idx := ev.Height(), ev.Address(), ev.Index()
valset, err := s.LoadValidators(height)
if err != nil {
// XXX/TODO: what do we do if we can't load the valset?
// eg. if we have pruned the state or height is too high?
return priority, err
}
valIdx, val := valset.GetByAddress(addr)
if val == nil {
return priority, fmt.Errorf("Address %X was not a validator at height %d", addr, height)
} else if idx != valIdx {
return priority, fmt.Errorf("Address %X was validator %d at height %d, not %d", addr, valIdx, height, idx)
}
priority = int(val.VotingPower)
return priority, nil
}
//------------------------------------------------------------------------ //------------------------------------------------------------------------
// ABCIResponses retains the responses // ABCIResponses retains the responses

View File

@ -27,17 +27,17 @@ func (err *ErrEvidenceInvalid) Error() string {
//------------------------------------------- //-------------------------------------------
type HistoricalValidators interface { type HistoricalValidators interface {
LoadValidators(height int) *ValidatorSet LoadValidators(height int) (*ValidatorSet, error)
} }
// Evidence represents any provable malicious activity by a validator // Evidence represents any provable malicious activity by a validator
type Evidence interface { type Evidence interface {
Height() int // height of the equivocation Height() int // height of the equivocation
Address() []byte // address of the equivocating validator Address() []byte // address of the equivocating validator
Index() int // index of the validator in the validator set Index() int // index of the validator in the validator set
Hash() []byte // hash of the evidence Hash() []byte // hash of the evidence
Verify(chainID string, vals HistoricalValidators) error // verify the evidence Verify(chainID string) error // verify the evidence
Equal(Evidence) bool // check equality of evidence Equal(Evidence) bool // check equality of evidence
String() string String() string
} }
@ -178,7 +178,7 @@ func (dve *DuplicateVoteEvidence) Hash() []byte {
// Verify returns an error if the two votes aren't conflicting. // Verify returns an error if the two votes aren't conflicting.
// To be conflicting, they must be from the same validator, for the same H/R/S, but for different blocks. // To be conflicting, they must be from the same validator, for the same H/R/S, but for different blocks.
func (dve *DuplicateVoteEvidence) Verify(chainID string, vals HistoricalValidators) error { func (dve *DuplicateVoteEvidence) Verify(chainID string) error {
// TODO: verify (cs.Height - dve.Height) < MaxHeightDiff // TODO: verify (cs.Height - dve.Height) < MaxHeightDiff
@ -211,18 +211,6 @@ func (dve *DuplicateVoteEvidence) Verify(chainID string, vals HistoricalValidato
return ErrVoteInvalidSignature return ErrVoteInvalidSignature
} }
// The address must have been an active validator at the height
height := dve.Height()
addr := dve.Address()
idx := dve.Index()
valset := vals.LoadValidators(height)
valIdx, val := valset.GetByAddress(addr)
if val == nil {
return fmt.Errorf("Address %X was not a validator at height %d", addr, height)
} else if idx != valIdx {
return fmt.Errorf("Address %X was validator %d at height %d, not %d", addr, valIdx, height, idx)
}
return nil return nil
} }

View File

@ -14,7 +14,7 @@ import (
//------------------------------------------------------ //------------------------------------------------------
// mempool // mempool
// Mempool defines the mempool interface. // Mempool defines the mempool interface as used by the ConsensusState.
// Updates to the mempool need to be synchronized with committing a block // Updates to the mempool need to be synchronized with committing a block
// so apps can reset their transient state on Commit // so apps can reset their transient state on Commit
// UNSTABLE // UNSTABLE
@ -63,9 +63,34 @@ type BlockStoreRPC interface {
LoadSeenCommit(height int64) *Commit LoadSeenCommit(height int64) *Commit
} }
// BlockStore defines the BlockStore interface. // BlockStore defines the BlockStore interface used by the ConsensusState.
// UNSTABLE // UNSTABLE
type BlockStore interface { type BlockStore interface {
BlockStoreRPC BlockStoreRPC
SaveBlock(block *Block, blockParts *PartSet, seenCommit *Commit) SaveBlock(block *Block, blockParts *PartSet, seenCommit *Commit)
} }
//------------------------------------------------------
// state
type State interface {
VerifyEvidence(Evidence) (priority int, err error)
}
//------------------------------------------------------
// evidence pool
// EvidencePool defines the EvidencePool interface used by the ConsensusState.
// UNSTABLE
type EvidencePool interface {
PendingEvidence() []Evidence
AddEvidence(Evidence)
}
// MockMempool is an empty implementation of a Mempool, useful for testing.
// UNSTABLE
type MockEvidencePool struct {
}
func (m MockEvidencePool) PendingEvidence() []Evidence { return nil }
func (m MockEvidencePool) AddEvidence(Evidence) {}