mirror of
https://github.com/fluencelabs/tendermint
synced 2025-04-25 14:52:17 +00:00
evidence: dont send evidence to unsynced peers
* only send evidence to peers that are synced enough to validate it all * closes #1624
This commit is contained in:
parent
3445f1206e
commit
f1c53c7358
@ -8,6 +8,7 @@ import (
|
|||||||
"github.com/tendermint/go-amino"
|
"github.com/tendermint/go-amino"
|
||||||
"github.com/tendermint/tmlibs/log"
|
"github.com/tendermint/tmlibs/log"
|
||||||
|
|
||||||
|
cstypes "github.com/tendermint/tendermint/consensus/types"
|
||||||
"github.com/tendermint/tendermint/p2p"
|
"github.com/tendermint/tendermint/p2p"
|
||||||
"github.com/tendermint/tendermint/types"
|
"github.com/tendermint/tendermint/types"
|
||||||
)
|
)
|
||||||
@ -118,21 +119,48 @@ func (evR *EvidenceReactor) broadcastRoutine() {
|
|||||||
case evidence := <-evR.evpool.EvidenceChan():
|
case evidence := <-evR.evpool.EvidenceChan():
|
||||||
// broadcast some new evidence
|
// broadcast some new evidence
|
||||||
msg := &EvidenceListMessage{[]types.Evidence{evidence}}
|
msg := &EvidenceListMessage{[]types.Evidence{evidence}}
|
||||||
evR.Switch.Broadcast(EvidenceChannel, cdc.MustMarshalBinaryBare(msg))
|
evR.broadcastEvidenceListMsg(msg)
|
||||||
|
|
||||||
// TODO: Broadcast runs asynchronously, so this should wait on the successChan
|
// TODO: the broadcast here is just doing TrySend.
|
||||||
// in another routine before marking to be proper.
|
// We should make sure the send succeeds before marking broadcasted.
|
||||||
evR.evpool.evidenceStore.MarkEvidenceAsBroadcasted(evidence)
|
evR.evpool.evidenceStore.MarkEvidenceAsBroadcasted(evidence)
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
// broadcast all pending evidence
|
// broadcast all pending evidence
|
||||||
msg := &EvidenceListMessage{evR.evpool.PendingEvidence()}
|
msg := &EvidenceListMessage{evR.evpool.PendingEvidence()}
|
||||||
evR.Switch.Broadcast(EvidenceChannel, cdc.MustMarshalBinaryBare(msg))
|
evR.broadcastEvidenceListMsg(msg)
|
||||||
case <-evR.Quit():
|
case <-evR.Quit():
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (evR *EvidenceReactor) broadcastEvidenceListMsg(msg *EvidenceListMessage) {
|
||||||
|
// NOTE: we dont send evidence to peers higher than their height,
|
||||||
|
// because they can't validate it (don't have validators from the height).
|
||||||
|
// So, for now, only send the `msg` to peers synced to the highest height in the list.
|
||||||
|
// TODO: send each peer all the evidence below its current height -
|
||||||
|
// might require a routine per peer, like the mempool.
|
||||||
|
|
||||||
|
var maxHeight int64
|
||||||
|
for _, ev := range msg.Evidence {
|
||||||
|
if ev.Height() > maxHeight {
|
||||||
|
maxHeight = ev.Height()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, peer := range evR.Switch.Peers().List() {
|
||||||
|
ps := peer.Get(types.PeerStateKey).(PeerState)
|
||||||
|
rs := ps.GetRoundState()
|
||||||
|
if rs.Height >= maxHeight {
|
||||||
|
peer.TrySend(EvidenceChannel, cdc.MustMarshalBinaryBare(msg))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type PeerState interface {
|
||||||
|
GetRoundState() *cstypes.PeerRoundState
|
||||||
|
}
|
||||||
|
|
||||||
//-----------------------------------------------------------------------------
|
//-----------------------------------------------------------------------------
|
||||||
// Messages
|
// Messages
|
||||||
|
|
||||||
|
@ -14,6 +14,7 @@ import (
|
|||||||
"github.com/tendermint/tmlibs/log"
|
"github.com/tendermint/tmlibs/log"
|
||||||
|
|
||||||
cfg "github.com/tendermint/tendermint/config"
|
cfg "github.com/tendermint/tendermint/config"
|
||||||
|
cstypes "github.com/tendermint/tendermint/consensus/types"
|
||||||
"github.com/tendermint/tendermint/p2p"
|
"github.com/tendermint/tendermint/p2p"
|
||||||
"github.com/tendermint/tendermint/types"
|
"github.com/tendermint/tendermint/types"
|
||||||
)
|
)
|
||||||
@ -130,8 +131,50 @@ func TestReactorBroadcastEvidence(t *testing.T) {
|
|||||||
// make reactors from statedb
|
// make reactors from statedb
|
||||||
reactors := makeAndConnectEvidenceReactors(config, stateDBs)
|
reactors := makeAndConnectEvidenceReactors(config, stateDBs)
|
||||||
|
|
||||||
|
// set the peer height on each reactor
|
||||||
|
for _, r := range reactors {
|
||||||
|
for _, peer := range r.Switch.Peers().List() {
|
||||||
|
ps := peerState{height}
|
||||||
|
peer.Set(types.PeerStateKey, ps)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// send a bunch of valid evidence to the first reactor's evpool
|
// send a bunch of valid evidence to the first reactor's evpool
|
||||||
// and wait for them all to be received in the others
|
// and wait for them all to be received in the others
|
||||||
evList := sendEvidence(t, reactors[0].evpool, valAddr, NUM_EVIDENCE)
|
evList := sendEvidence(t, reactors[0].evpool, valAddr, NUM_EVIDENCE)
|
||||||
waitForEvidence(t, evList, reactors)
|
waitForEvidence(t, evList, reactors)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type peerState struct {
|
||||||
|
height int64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ps peerState) GetRoundState() *cstypes.PeerRoundState {
|
||||||
|
return &cstypes.PeerRoundState{
|
||||||
|
Height: ps.height,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestReactorSelectiveBroadcast(t *testing.T) {
|
||||||
|
config := cfg.TestConfig()
|
||||||
|
|
||||||
|
valAddr := []byte("myval")
|
||||||
|
height1 := int64(NUM_EVIDENCE) + 10
|
||||||
|
height2 := int64(NUM_EVIDENCE) / 2
|
||||||
|
|
||||||
|
// DB1 is ahead of DB2
|
||||||
|
stateDB1 := initializeValidatorState(valAddr, height1)
|
||||||
|
stateDB2 := initializeValidatorState(valAddr, height2)
|
||||||
|
|
||||||
|
// make reactors from statedb
|
||||||
|
reactors := makeAndConnectEvidenceReactors(config, []dbm.DB{stateDB1, stateDB2})
|
||||||
|
peer := reactors[0].Switch.Peers().List()[0]
|
||||||
|
ps := peerState{height2}
|
||||||
|
peer.Set(types.PeerStateKey, ps)
|
||||||
|
|
||||||
|
// send a bunch of valid evidence to the first reactor's evpool
|
||||||
|
evList := sendEvidence(t, reactors[0].evpool, valAddr, NUM_EVIDENCE)
|
||||||
|
|
||||||
|
// only ones less than the peers height should make it through
|
||||||
|
waitForEvidence(t, evList[:NUM_EVIDENCE/2], reactors[1:2])
|
||||||
|
}
|
||||||
|
@ -164,7 +164,7 @@ func (store *EvidenceStore) MarkEvidenceAsBroadcasted(evidence types.Evidence) {
|
|||||||
store.db.Delete(key)
|
store.db.Delete(key)
|
||||||
}
|
}
|
||||||
|
|
||||||
// MarkEvidenceAsPending removes evidence from pending and outqueue and sets the state to committed.
|
// MarkEvidenceAsCommitted removes evidence from pending and outqueue and sets the state to committed.
|
||||||
func (store *EvidenceStore) MarkEvidenceAsCommitted(evidence types.Evidence) {
|
func (store *EvidenceStore) MarkEvidenceAsCommitted(evidence types.Evidence) {
|
||||||
// if its committed, its been broadcast
|
// if its committed, its been broadcast
|
||||||
store.MarkEvidenceAsBroadcasted(evidence)
|
store.MarkEvidenceAsBroadcasted(evidence)
|
||||||
|
@ -106,7 +106,7 @@ func (blockExec *BlockExecutor) ApplyBlock(s State, blockID types.BlockID, block
|
|||||||
|
|
||||||
fail.Fail() // XXX
|
fail.Fail() // XXX
|
||||||
|
|
||||||
// Update evpool now that state is saved
|
// Update evpool now that state is saved.
|
||||||
// TODO: handle the crash/recover scenario
|
// TODO: handle the crash/recover scenario
|
||||||
// ie. (may need to call Update for last block)
|
// ie. (may need to call Update for last block)
|
||||||
blockExec.evpool.Update(block)
|
blockExec.evpool.Update(block)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user