mirror of
https://github.com/fluencelabs/tendermint
synced 2025-04-25 14:52:17 +00:00
parent
a655500047
commit
86ddf17db0
@ -46,9 +46,9 @@ func TestByzantine(t *testing.T) {
|
|||||||
eventChans := make([]chan interface{}, N)
|
eventChans := make([]chan interface{}, N)
|
||||||
reactors := make([]p2p.Reactor, N)
|
reactors := make([]p2p.Reactor, N)
|
||||||
for i := 0; i < N; i++ {
|
for i := 0; i < N; i++ {
|
||||||
|
// make first val byzantine
|
||||||
if i == 0 {
|
if i == 0 {
|
||||||
css[i].privValidator = NewByzantinePrivValidator(css[i].privValidator)
|
css[i].privValidator = NewByzantinePrivValidator(css[i].privValidator)
|
||||||
// make byzantine
|
|
||||||
css[i].decideProposal = func(j int) func(int64, int) {
|
css[i].decideProposal = func(j int) func(int64, int) {
|
||||||
return func(height int64, round int) {
|
return func(height int64, round int) {
|
||||||
byzantineDecideProposalFunc(t, height, round, css[j], switches[j])
|
byzantineDecideProposalFunc(t, height, round, css[j], switches[j])
|
||||||
@ -74,9 +74,11 @@ func TestByzantine(t *testing.T) {
|
|||||||
var conRI p2p.Reactor // nolint: gotype, gosimple
|
var conRI p2p.Reactor // nolint: gotype, gosimple
|
||||||
conRI = conR
|
conRI = conR
|
||||||
|
|
||||||
|
// make first val byzantine
|
||||||
if i == 0 {
|
if i == 0 {
|
||||||
conRI = NewByzantineReactor(conR)
|
conRI = NewByzantineReactor(conR)
|
||||||
}
|
}
|
||||||
|
|
||||||
reactors[i] = conRI
|
reactors[i] = conRI
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -115,19 +117,19 @@ func TestByzantine(t *testing.T) {
|
|||||||
// and the other block to peers[1] and peers[2].
|
// and the other block to peers[1] and peers[2].
|
||||||
// note peers and switches order don't match.
|
// note peers and switches order don't match.
|
||||||
peers := switches[0].Peers().List()
|
peers := switches[0].Peers().List()
|
||||||
|
|
||||||
|
// partition A
|
||||||
ind0 := getSwitchIndex(switches, peers[0])
|
ind0 := getSwitchIndex(switches, peers[0])
|
||||||
|
|
||||||
|
// partition B
|
||||||
ind1 := getSwitchIndex(switches, peers[1])
|
ind1 := getSwitchIndex(switches, peers[1])
|
||||||
ind2 := getSwitchIndex(switches, peers[2])
|
ind2 := getSwitchIndex(switches, peers[2])
|
||||||
|
|
||||||
// connect the 2 peers in the larger partition
|
|
||||||
p2p.Connect2Switches(switches, ind1, ind2)
|
p2p.Connect2Switches(switches, ind1, ind2)
|
||||||
|
|
||||||
// wait for someone in the big partition to make a block
|
// wait for someone in the big partition (B) to make a block
|
||||||
<-eventChans[ind2]
|
<-eventChans[ind2]
|
||||||
|
|
||||||
t.Log("A block has been committed. Healing partition")
|
t.Log("A block has been committed. Healing partition")
|
||||||
|
|
||||||
// connect the partitions
|
|
||||||
p2p.Connect2Switches(switches, ind0, ind1)
|
p2p.Connect2Switches(switches, ind0, ind1)
|
||||||
p2p.Connect2Switches(switches, ind0, ind2)
|
p2p.Connect2Switches(switches, ind0, ind2)
|
||||||
|
|
||||||
|
@ -64,11 +64,6 @@ func (conR *ConsensusReactor) OnStart() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = conR.startPeerErrorsRoutine()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if !conR.FastSync() {
|
if !conR.FastSync() {
|
||||||
err := conR.conS.Start()
|
err := conR.conS.Start()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -345,39 +340,6 @@ func (conR *ConsensusReactor) FastSync() bool {
|
|||||||
|
|
||||||
//--------------------------------------
|
//--------------------------------------
|
||||||
|
|
||||||
// startPeerErrorsRoutine spawns a new gororoutine listening for errors from
|
|
||||||
// consensus/state or other consensus modules.
|
|
||||||
func (conR *ConsensusReactor) startPeerErrorsRoutine() error {
|
|
||||||
const subscriber = "consensus-reactor"
|
|
||||||
ctx := context.Background()
|
|
||||||
|
|
||||||
errorsCh := make(chan interface{})
|
|
||||||
err := conR.eventBus.Subscribe(ctx, subscriber, types.QueryForEvent(peerErrorEvent), errorsCh)
|
|
||||||
if err != nil {
|
|
||||||
return errors.Wrapf(err, "failed to subscribe %s to %s", subscriber, peerErrorEvent)
|
|
||||||
}
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case data, ok := <-errorsCh:
|
|
||||||
if ok {
|
|
||||||
pErr := data.(types.TMEventData).Unwrap().(peerError)
|
|
||||||
peer := conR.Switch.Peers().Get(pErr.peerID)
|
|
||||||
if peer != nil {
|
|
||||||
conR.Switch.StopPeerForError(peer, pErr.err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
case <-conR.Quit():
|
|
||||||
conR.eventBus.UnsubscribeAll(ctx, subscriber)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// startBroadcastRoutine subscribes for new round steps, votes and proposal
|
// startBroadcastRoutine subscribes for new round steps, votes and proposal
|
||||||
// heartbeats using the event bus and starts a go routine to broadcasts events
|
// heartbeats using the event bus and starts a go routine to broadcasts events
|
||||||
// to peers upon receiving them.
|
// to peers upon receiving them.
|
||||||
|
@ -63,19 +63,6 @@ func (ti *timeoutInfo) String() string {
|
|||||||
return fmt.Sprintf("%v ; %d/%d %v", ti.Duration, ti.Height, ti.Round, ti.Step)
|
return fmt.Sprintf("%v ; %d/%d %v", ti.Duration, ti.Height, ti.Round, ti.Step)
|
||||||
}
|
}
|
||||||
|
|
||||||
type peerError struct {
|
|
||||||
err error
|
|
||||||
peerID p2p.ID
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e peerError) Error() string {
|
|
||||||
return fmt.Sprintf("error with peer %v: %s", e.peerID, e.err.Error())
|
|
||||||
}
|
|
||||||
|
|
||||||
const (
|
|
||||||
peerErrorEvent = "cs.PeerError"
|
|
||||||
)
|
|
||||||
|
|
||||||
// ConsensusState handles execution of the consensus algorithm.
|
// ConsensusState handles execution of the consensus algorithm.
|
||||||
// It processes votes and proposals, and upon reaching agreement,
|
// It processes votes and proposals, and upon reaching agreement,
|
||||||
// commits blocks to the chain and executes them against the application.
|
// commits blocks to the chain and executes them against the application.
|
||||||
@ -596,8 +583,10 @@ func (cs *ConsensusState) handleMsg(mi msgInfo) {
|
|||||||
err := cs.tryAddVote(msg.Vote, peerID)
|
err := cs.tryAddVote(msg.Vote, peerID)
|
||||||
if err == ErrAddingVote {
|
if err == ErrAddingVote {
|
||||||
// TODO: punish peer
|
// TODO: punish peer
|
||||||
// breaks byzantine_test
|
// We probably don't want to stop the peer here. The vote does not
|
||||||
// cs.eventBus.Publish(peerErrorEvent, types.TMEventData{peerError{err, peerID}})
|
// necessarily comes from a malicious peer but can be just broadcasted by
|
||||||
|
// a typical peer.
|
||||||
|
// https://github.com/tendermint/tendermint/issues/1281
|
||||||
}
|
}
|
||||||
|
|
||||||
// NOTE: the vote is broadcast to peers by the reactor listening
|
// NOTE: the vote is broadcast to peers by the reactor listening
|
||||||
|
Loading…
x
Reference in New Issue
Block a user