mirror of
https://github.com/fluencelabs/tendermint
synced 2025-05-29 14:11:21 +00:00
Merge pull request #591 from tendermint/heartbeat
broadcast proposer heartbeat msg
This commit is contained in:
commit
043c6018b4
@ -293,6 +293,15 @@ func (privVal *ByzantinePrivValidator) SignProposal(chainID string, proposal *ty
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (privVal *ByzantinePrivValidator) SignHeartbeat(chainID string, heartbeat *types.Heartbeat) error {
|
||||||
|
privVal.mtx.Lock()
|
||||||
|
defer privVal.mtx.Unlock()
|
||||||
|
|
||||||
|
// Sign
|
||||||
|
heartbeat.Signature = privVal.Sign(types.SignBytes(chainID, heartbeat))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (privVal *ByzantinePrivValidator) String() string {
|
func (privVal *ByzantinePrivValidator) String() string {
|
||||||
return Fmt("PrivValidator{%X}", privVal.Address)
|
return Fmt("PrivValidator{%X}", privVal.Address)
|
||||||
}
|
}
|
||||||
|
@ -311,6 +311,16 @@ func (conR *ConsensusReactor) registerEventCallbacks() {
|
|||||||
edv := data.Unwrap().(types.EventDataVote)
|
edv := data.Unwrap().(types.EventDataVote)
|
||||||
conR.broadcastHasVoteMessage(edv.Vote)
|
conR.broadcastHasVoteMessage(edv.Vote)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
types.AddListenerForEvent(conR.evsw, "conR", types.EventStringProposalHeartbeat(), func(data types.TMEventData) {
|
||||||
|
heartbeat := data.Unwrap().(types.EventDataProposalHeartbeat)
|
||||||
|
conR.broadcastProposalHeartbeatMessage(heartbeat)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (conR *ConsensusReactor) broadcastProposalHeartbeatMessage(heartbeat types.EventDataProposalHeartbeat) {
|
||||||
|
msg := &ProposalHeartbeatMessage{heartbeat.Heartbeat}
|
||||||
|
conR.Switch.Broadcast(StateChannel, struct{ ConsensusMessage }{msg})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (conR *ConsensusReactor) broadcastNewRoundStep(rs *RoundState) {
|
func (conR *ConsensusReactor) broadcastNewRoundStep(rs *RoundState) {
|
||||||
@ -1305,3 +1315,15 @@ type VoteSetBitsMessage struct {
|
|||||||
func (m *VoteSetBitsMessage) String() string {
|
func (m *VoteSetBitsMessage) String() string {
|
||||||
return fmt.Sprintf("[VSB %v/%02d/%v %v %v]", m.Height, m.Round, m.Type, m.BlockID, m.Votes)
|
return fmt.Sprintf("[VSB %v/%02d/%v %v %v]", m.Height, m.Round, m.Type, m.BlockID, m.Votes)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//-------------------------------------
|
||||||
|
|
||||||
|
// ProposalHeartbeatMessage is sent to signal that a node is alive and waiting for transactions for a proposal.
|
||||||
|
type ProposalHeartbeatMessage struct {
|
||||||
|
Heartbeat *types.Heartbeat
|
||||||
|
}
|
||||||
|
|
||||||
|
// String returns a string representation.
|
||||||
|
func (m *ProposalHeartbeatMessage) String() string {
|
||||||
|
return fmt.Sprintf("[HEARTBEAT %v]", m.Heartbeat)
|
||||||
|
}
|
||||||
|
@ -181,6 +181,7 @@ type PrivValidator interface {
|
|||||||
GetAddress() []byte
|
GetAddress() []byte
|
||||||
SignVote(chainID string, vote *types.Vote) error
|
SignVote(chainID string, vote *types.Vote) error
|
||||||
SignProposal(chainID string, proposal *types.Proposal) error
|
SignProposal(chainID string, proposal *types.Proposal) error
|
||||||
|
SignHeartbeat(chainID string, heartbeat *types.Heartbeat) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// ConsensusState handles execution of the consensus algorithm.
|
// ConsensusState handles execution of the consensus algorithm.
|
||||||
@ -787,7 +788,7 @@ func (cs *ConsensusState) enterNewRound(height int, round int) {
|
|||||||
// we may need an empty "proof" block, and enterPropose immediately.
|
// we may need an empty "proof" block, and enterPropose immediately.
|
||||||
waitForTxs := cs.config.NoEmptyBlocks && round == 0 && !cs.needProofBlock(height)
|
waitForTxs := cs.config.NoEmptyBlocks && round == 0 && !cs.needProofBlock(height)
|
||||||
if waitForTxs {
|
if waitForTxs {
|
||||||
go cs.proposalHeartbeat()
|
go cs.proposalHeartbeat(height, round)
|
||||||
} else {
|
} else {
|
||||||
cs.enterPropose(height, round)
|
cs.enterPropose(height, round)
|
||||||
}
|
}
|
||||||
@ -807,14 +808,32 @@ func (cs *ConsensusState) needProofBlock(height int) bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cs *ConsensusState) proposalHeartbeat() {
|
func (cs *ConsensusState) proposalHeartbeat(height, round int) {
|
||||||
|
counter := 0
|
||||||
|
addr := cs.privValidator.GetAddress()
|
||||||
|
valIndex, v := cs.Validators.GetByAddress(addr)
|
||||||
|
if v == nil {
|
||||||
|
// not a validator
|
||||||
|
valIndex = -1
|
||||||
|
}
|
||||||
for {
|
for {
|
||||||
select {
|
rs := cs.GetRoundState()
|
||||||
default:
|
// if we've already moved on, no need to send more heartbeats
|
||||||
// TODO: broadcast heartbeat
|
if rs.Step > RoundStepNewRound || rs.Round > round || rs.Height > height {
|
||||||
|
return
|
||||||
time.Sleep(time.Second)
|
|
||||||
}
|
}
|
||||||
|
heartbeat := &types.Heartbeat{
|
||||||
|
Height: rs.Height,
|
||||||
|
Round: rs.Round,
|
||||||
|
Sequence: counter,
|
||||||
|
ValidatorAddress: addr,
|
||||||
|
ValidatorIndex: valIndex,
|
||||||
|
}
|
||||||
|
cs.privValidator.SignHeartbeat(cs.state.ChainID, heartbeat)
|
||||||
|
heartbeatEvent := types.EventDataProposalHeartbeat{heartbeat}
|
||||||
|
types.FireEventProposalHeartbeat(cs.evsw, heartbeatEvent)
|
||||||
|
counter += 1
|
||||||
|
time.Sleep(time.Second)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -31,6 +31,14 @@ type CanonicalJSONVote struct {
|
|||||||
Type byte `json:"type"`
|
Type byte `json:"type"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type CanonicalJSONHeartbeat struct {
|
||||||
|
Height int `json:"height"`
|
||||||
|
Round int `json:"round"`
|
||||||
|
Sequence int `json:"sequence"`
|
||||||
|
ValidatorAddress data.Bytes `json:"validator_address"`
|
||||||
|
ValidatorIndex int `json:"validator_index"`
|
||||||
|
}
|
||||||
|
|
||||||
//------------------------------------
|
//------------------------------------
|
||||||
// Messages including a "chain id" can only be applied to one chain, hence "Once"
|
// Messages including a "chain id" can only be applied to one chain, hence "Once"
|
||||||
|
|
||||||
@ -44,6 +52,11 @@ type CanonicalJSONOnceVote struct {
|
|||||||
Vote CanonicalJSONVote `json:"vote"`
|
Vote CanonicalJSONVote `json:"vote"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type CanonicalJSONOnceHeartbeat struct {
|
||||||
|
ChainID string `json:"chain_id"`
|
||||||
|
Heartbeat CanonicalJSONHeartbeat `json:"heartbeat"`
|
||||||
|
}
|
||||||
|
|
||||||
//-----------------------------------
|
//-----------------------------------
|
||||||
// Canonicalize the structs
|
// Canonicalize the structs
|
||||||
|
|
||||||
@ -79,3 +92,13 @@ func CanonicalVote(vote *Vote) CanonicalJSONVote {
|
|||||||
vote.Type,
|
vote.Type,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func CanonicalHeartbeat(heartbeat *Heartbeat) CanonicalJSONHeartbeat {
|
||||||
|
return CanonicalJSONHeartbeat{
|
||||||
|
heartbeat.Height,
|
||||||
|
heartbeat.Round,
|
||||||
|
heartbeat.Sequence,
|
||||||
|
heartbeat.ValidatorAddress,
|
||||||
|
heartbeat.ValidatorIndex,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -31,6 +31,8 @@ func EventStringRelock() string { return "Relock" }
|
|||||||
func EventStringTimeoutWait() string { return "TimeoutWait" }
|
func EventStringTimeoutWait() string { return "TimeoutWait" }
|
||||||
func EventStringVote() string { return "Vote" }
|
func EventStringVote() string { return "Vote" }
|
||||||
|
|
||||||
|
func EventStringProposalHeartbeat() string { return "ProposalHeartbeat" }
|
||||||
|
|
||||||
//----------------------------------------
|
//----------------------------------------
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -39,6 +41,8 @@ var (
|
|||||||
EventDataNameTx = "tx"
|
EventDataNameTx = "tx"
|
||||||
EventDataNameRoundState = "round_state"
|
EventDataNameRoundState = "round_state"
|
||||||
EventDataNameVote = "vote"
|
EventDataNameVote = "vote"
|
||||||
|
|
||||||
|
EventDataNameProposalHeartbeat = "proposer_heartbeat"
|
||||||
)
|
)
|
||||||
|
|
||||||
//----------------------------------------
|
//----------------------------------------
|
||||||
@ -84,6 +88,8 @@ const (
|
|||||||
|
|
||||||
EventDataTypeRoundState = byte(0x11)
|
EventDataTypeRoundState = byte(0x11)
|
||||||
EventDataTypeVote = byte(0x12)
|
EventDataTypeVote = byte(0x12)
|
||||||
|
|
||||||
|
EventDataTypeProposalHeartbeat = byte(0x20)
|
||||||
)
|
)
|
||||||
|
|
||||||
var tmEventDataMapper = data.NewMapper(TMEventData{}).
|
var tmEventDataMapper = data.NewMapper(TMEventData{}).
|
||||||
@ -91,7 +97,8 @@ var tmEventDataMapper = data.NewMapper(TMEventData{}).
|
|||||||
RegisterImplementation(EventDataNewBlockHeader{}, EventDataNameNewBlockHeader, EventDataTypeNewBlockHeader).
|
RegisterImplementation(EventDataNewBlockHeader{}, EventDataNameNewBlockHeader, EventDataTypeNewBlockHeader).
|
||||||
RegisterImplementation(EventDataTx{}, EventDataNameTx, EventDataTypeTx).
|
RegisterImplementation(EventDataTx{}, EventDataNameTx, EventDataTypeTx).
|
||||||
RegisterImplementation(EventDataRoundState{}, EventDataNameRoundState, EventDataTypeRoundState).
|
RegisterImplementation(EventDataRoundState{}, EventDataNameRoundState, EventDataTypeRoundState).
|
||||||
RegisterImplementation(EventDataVote{}, EventDataNameVote, EventDataTypeVote)
|
RegisterImplementation(EventDataVote{}, EventDataNameVote, EventDataTypeVote).
|
||||||
|
RegisterImplementation(EventDataProposalHeartbeat{}, EventDataNameProposalHeartbeat, EventDataTypeProposalHeartbeat)
|
||||||
|
|
||||||
// Most event messages are basic types (a block, a transaction)
|
// Most event messages are basic types (a block, a transaction)
|
||||||
// but some (an input to a call tx or a receive) are more exotic
|
// but some (an input to a call tx or a receive) are more exotic
|
||||||
@ -115,6 +122,10 @@ type EventDataTx struct {
|
|||||||
Error string `json:"error"` // this is redundant information for now
|
Error string `json:"error"` // this is redundant information for now
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type EventDataProposalHeartbeat struct {
|
||||||
|
Heartbeat *Heartbeat
|
||||||
|
}
|
||||||
|
|
||||||
// NOTE: This goes into the replay WAL
|
// NOTE: This goes into the replay WAL
|
||||||
type EventDataRoundState struct {
|
type EventDataRoundState struct {
|
||||||
Height int `json:"height"`
|
Height int `json:"height"`
|
||||||
@ -135,6 +146,8 @@ func (_ EventDataTx) AssertIsTMEventData() {}
|
|||||||
func (_ EventDataRoundState) AssertIsTMEventData() {}
|
func (_ EventDataRoundState) AssertIsTMEventData() {}
|
||||||
func (_ EventDataVote) AssertIsTMEventData() {}
|
func (_ EventDataVote) AssertIsTMEventData() {}
|
||||||
|
|
||||||
|
func (_ EventDataProposalHeartbeat) AssertIsTMEventData() {}
|
||||||
|
|
||||||
//----------------------------------------
|
//----------------------------------------
|
||||||
// Wrappers for type safety
|
// Wrappers for type safety
|
||||||
|
|
||||||
@ -232,3 +245,7 @@ func FireEventRelock(fireable events.Fireable, rs EventDataRoundState) {
|
|||||||
func FireEventLock(fireable events.Fireable, rs EventDataRoundState) {
|
func FireEventLock(fireable events.Fireable, rs EventDataRoundState) {
|
||||||
fireEvent(fireable, EventStringLock(), TMEventData{rs})
|
fireEvent(fireable, EventStringLock(), TMEventData{rs})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func FireEventProposalHeartbeat(fireable events.Fireable, rs EventDataProposalHeartbeat) {
|
||||||
|
fireEvent(fireable, EventStringProposalHeartbeat(), TMEventData{rs})
|
||||||
|
}
|
||||||
|
42
types/heartbeat.go
Normal file
42
types/heartbeat.go
Normal file
@ -0,0 +1,42 @@
|
|||||||
|
package types
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
|
||||||
|
"github.com/tendermint/go-crypto"
|
||||||
|
"github.com/tendermint/go-wire"
|
||||||
|
"github.com/tendermint/go-wire/data"
|
||||||
|
cmn "github.com/tendermint/tmlibs/common"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Heartbeat struct {
|
||||||
|
ValidatorAddress data.Bytes `json:"validator_address"`
|
||||||
|
ValidatorIndex int `json:"validator_index"`
|
||||||
|
Height int `json:"height"`
|
||||||
|
Round int `json:"round"`
|
||||||
|
Sequence int `json:"sequence"`
|
||||||
|
Signature crypto.Signature `json:"signature"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (heartbeat *Heartbeat) WriteSignBytes(chainID string, w io.Writer, n *int, err *error) {
|
||||||
|
wire.WriteJSON(CanonicalJSONOnceHeartbeat{
|
||||||
|
chainID,
|
||||||
|
CanonicalHeartbeat(heartbeat),
|
||||||
|
}, w, n, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (heartbeat *Heartbeat) Copy() *Heartbeat {
|
||||||
|
heartbeatCopy := *heartbeat
|
||||||
|
return &heartbeatCopy
|
||||||
|
}
|
||||||
|
|
||||||
|
func (heartbeat *Heartbeat) String() string {
|
||||||
|
if heartbeat == nil {
|
||||||
|
return "nil-heartbeat"
|
||||||
|
}
|
||||||
|
|
||||||
|
return fmt.Sprintf("Heartbeat{%v:%X %v/%02d (%v) %v}",
|
||||||
|
heartbeat.ValidatorIndex, cmn.Fingerprint(heartbeat.ValidatorAddress),
|
||||||
|
heartbeat.Height, heartbeat.Round, heartbeat.Sequence, heartbeat.Signature)
|
||||||
|
}
|
@ -252,6 +252,13 @@ func (privVal *PrivValidator) signBytesHRS(height, round int, step int8, signByt
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (privVal *PrivValidator) SignHeartbeat(chainID string, heartbeat *Heartbeat) error {
|
||||||
|
privVal.mtx.Lock()
|
||||||
|
defer privVal.mtx.Unlock()
|
||||||
|
heartbeat.Signature = privVal.Sign(SignBytes(chainID, heartbeat))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (privVal *PrivValidator) String() string {
|
func (privVal *PrivValidator) String() string {
|
||||||
return fmt.Sprintf("PrivValidator{%v LH:%v, LR:%v, LS:%v}", privVal.Address, privVal.LastHeight, privVal.LastRound, privVal.LastStep)
|
return fmt.Sprintf("PrivValidator{%v LH:%v, LR:%v, LS:%v}", privVal.Address, privVal.LastHeight, privVal.LastRound, privVal.LastStep)
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user