mirror of
https://github.com/fluencelabs/tendermint
synced 2025-04-24 14:22:16 +00:00
2871 remove proposalHeartbeat infrastructure (#2874)
* 2871 remove proposalHeartbeat infrastructure * 2871 add preliminary changelog entry
This commit is contained in:
parent
416d143bf7
commit
e291fbbebe
@ -26,5 +26,6 @@ program](https://hackerone.com/tendermint).
|
||||
### FEATURES:
|
||||
|
||||
### IMPROVEMENTS:
|
||||
- [consensus] [\#2871](https://github.com/tendermint/tendermint/issues/2871) Remove *ProposalHeartbeat* infrastructure as it serves no real purpose
|
||||
|
||||
### BUG FIXES:
|
||||
|
@ -425,20 +425,6 @@ func ensureNewRound(roundCh <-chan interface{}, height int64, round int) {
|
||||
}
|
||||
}
|
||||
|
||||
func ensureProposalHeartbeat(heartbeatCh <-chan interface{}) {
|
||||
select {
|
||||
case <-time.After(ensureTimeout):
|
||||
panic("Timeout expired while waiting for ProposalHeartbeat event")
|
||||
case ev := <-heartbeatCh:
|
||||
heartbeat, ok := ev.(types.EventDataProposalHeartbeat)
|
||||
if !ok {
|
||||
panic(fmt.Sprintf("expected a *types.EventDataProposalHeartbeat, "+
|
||||
"got %v. wrong subscription channel?",
|
||||
reflect.TypeOf(heartbeat)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func ensureNewTimeout(timeoutCh <-chan interface{}, height int64, round int, timeout int64) {
|
||||
timeoutDuration := time.Duration(timeout*3) * time.Nanosecond
|
||||
ensureNewEvent(timeoutCh, height, round, timeoutDuration,
|
||||
|
@ -8,7 +8,7 @@ import (
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
amino "github.com/tendermint/go-amino"
|
||||
"github.com/tendermint/go-amino"
|
||||
cstypes "github.com/tendermint/tendermint/consensus/types"
|
||||
cmn "github.com/tendermint/tendermint/libs/common"
|
||||
tmevents "github.com/tendermint/tendermint/libs/events"
|
||||
@ -264,11 +264,6 @@ func (conR *ConsensusReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)
|
||||
BlockID: msg.BlockID,
|
||||
Votes: ourVotes,
|
||||
}))
|
||||
case *ProposalHeartbeatMessage:
|
||||
hb := msg.Heartbeat
|
||||
conR.Logger.Debug("Received proposal heartbeat message",
|
||||
"height", hb.Height, "round", hb.Round, "sequence", hb.Sequence,
|
||||
"valIdx", hb.ValidatorIndex, "valAddr", hb.ValidatorAddress)
|
||||
default:
|
||||
conR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg)))
|
||||
}
|
||||
@ -369,8 +364,8 @@ func (conR *ConsensusReactor) FastSync() bool {
|
||||
|
||||
//--------------------------------------
|
||||
|
||||
// subscribeToBroadcastEvents subscribes for new round steps, votes and
|
||||
// proposal heartbeats using internal pubsub defined on state to broadcast
|
||||
// subscribeToBroadcastEvents subscribes for new round steps and votes
|
||||
// using internal pubsub defined on state to broadcast
|
||||
// them to peers upon receiving.
|
||||
func (conR *ConsensusReactor) subscribeToBroadcastEvents() {
|
||||
const subscriber = "consensus-reactor"
|
||||
@ -389,10 +384,6 @@ func (conR *ConsensusReactor) subscribeToBroadcastEvents() {
|
||||
conR.broadcastHasVoteMessage(data.(*types.Vote))
|
||||
})
|
||||
|
||||
conR.conS.evsw.AddListenerForEvent(subscriber, types.EventProposalHeartbeat,
|
||||
func(data tmevents.EventData) {
|
||||
conR.broadcastProposalHeartbeatMessage(data.(*types.Heartbeat))
|
||||
})
|
||||
}
|
||||
|
||||
func (conR *ConsensusReactor) unsubscribeFromBroadcastEvents() {
|
||||
@ -400,13 +391,6 @@ func (conR *ConsensusReactor) unsubscribeFromBroadcastEvents() {
|
||||
conR.conS.evsw.RemoveListener(subscriber)
|
||||
}
|
||||
|
||||
func (conR *ConsensusReactor) broadcastProposalHeartbeatMessage(hb *types.Heartbeat) {
|
||||
conR.Logger.Debug("Broadcasting proposal heartbeat message",
|
||||
"height", hb.Height, "round", hb.Round, "sequence", hb.Sequence, "address", hb.ValidatorAddress)
|
||||
msg := &ProposalHeartbeatMessage{hb}
|
||||
conR.Switch.Broadcast(StateChannel, cdc.MustMarshalBinaryBare(msg))
|
||||
}
|
||||
|
||||
func (conR *ConsensusReactor) broadcastNewRoundStepMessage(rs *cstypes.RoundState) {
|
||||
nrsMsg := makeRoundStepMessage(rs)
|
||||
conR.Switch.Broadcast(StateChannel, cdc.MustMarshalBinaryBare(nrsMsg))
|
||||
@ -1387,7 +1371,6 @@ func RegisterConsensusMessages(cdc *amino.Codec) {
|
||||
cdc.RegisterConcrete(&HasVoteMessage{}, "tendermint/HasVote", nil)
|
||||
cdc.RegisterConcrete(&VoteSetMaj23Message{}, "tendermint/VoteSetMaj23", nil)
|
||||
cdc.RegisterConcrete(&VoteSetBitsMessage{}, "tendermint/VoteSetBits", nil)
|
||||
cdc.RegisterConcrete(&ProposalHeartbeatMessage{}, "tendermint/ProposalHeartbeat", nil)
|
||||
}
|
||||
|
||||
func decodeMsg(bz []byte) (msg ConsensusMessage, err error) {
|
||||
@ -1664,18 +1647,3 @@ func (m *VoteSetBitsMessage) String() string {
|
||||
}
|
||||
|
||||
//-------------------------------------
|
||||
|
||||
// ProposalHeartbeatMessage is sent to signal that a node is alive and waiting for transactions for a proposal.
|
||||
type ProposalHeartbeatMessage struct {
|
||||
Heartbeat *types.Heartbeat
|
||||
}
|
||||
|
||||
// ValidateBasic performs basic validation.
|
||||
func (m *ProposalHeartbeatMessage) ValidateBasic() error {
|
||||
return m.Heartbeat.ValidateBasic()
|
||||
}
|
||||
|
||||
// String returns a string representation.
|
||||
func (m *ProposalHeartbeatMessage) String() string {
|
||||
return fmt.Sprintf("[HEARTBEAT %v]", m.Heartbeat)
|
||||
}
|
||||
|
@ -213,8 +213,8 @@ func (m *mockEvidencePool) Update(block *types.Block, state sm.State) {
|
||||
|
||||
//------------------------------------
|
||||
|
||||
// Ensure a testnet sends proposal heartbeats and makes blocks when there are txs
|
||||
func TestReactorProposalHeartbeats(t *testing.T) {
|
||||
// Ensure a testnet makes blocks when there are txs
|
||||
func TestReactorCreatesBlockWhenEmptyBlocksFalse(t *testing.T) {
|
||||
N := 4
|
||||
css := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newCounter,
|
||||
func(c *cfg.Config) {
|
||||
@ -222,17 +222,6 @@ func TestReactorProposalHeartbeats(t *testing.T) {
|
||||
})
|
||||
reactors, eventChans, eventBuses := startConsensusNet(t, css, N)
|
||||
defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses)
|
||||
heartbeatChans := make([]chan interface{}, N)
|
||||
var err error
|
||||
for i := 0; i < N; i++ {
|
||||
heartbeatChans[i] = make(chan interface{}, 1)
|
||||
err = eventBuses[i].Subscribe(context.Background(), testSubscriber, types.EventQueryProposalHeartbeat, heartbeatChans[i])
|
||||
require.NoError(t, err)
|
||||
}
|
||||
// wait till everyone sends a proposal heartbeat
|
||||
timeoutWaitGroup(t, N, func(j int) {
|
||||
<-heartbeatChans[j]
|
||||
}, css)
|
||||
|
||||
// send a tx
|
||||
if err := css[3].mempool.CheckTx([]byte{1, 2, 3}, nil); err != nil {
|
||||
|
@ -22,13 +22,6 @@ import (
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
// Config
|
||||
|
||||
const (
|
||||
proposalHeartbeatIntervalSeconds = 2
|
||||
)
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
// Errors
|
||||
|
||||
@ -118,7 +111,7 @@ type ConsensusState struct {
|
||||
done chan struct{}
|
||||
|
||||
// synchronous pubsub between consensus state and reactor.
|
||||
// state only emits EventNewRoundStep, EventVote and EventProposalHeartbeat
|
||||
// state only emits EventNewRoundStep and EventVote
|
||||
evsw tmevents.EventSwitch
|
||||
|
||||
// for reporting metrics
|
||||
@ -785,7 +778,6 @@ func (cs *ConsensusState) enterNewRound(height int64, round int) {
|
||||
cs.scheduleTimeout(cs.config.CreateEmptyBlocksInterval, height, round,
|
||||
cstypes.RoundStepNewRound)
|
||||
}
|
||||
go cs.proposalHeartbeat(height, round)
|
||||
} else {
|
||||
cs.enterPropose(height, round)
|
||||
}
|
||||
@ -802,38 +794,6 @@ func (cs *ConsensusState) needProofBlock(height int64) bool {
|
||||
return !bytes.Equal(cs.state.AppHash, lastBlockMeta.Header.AppHash)
|
||||
}
|
||||
|
||||
func (cs *ConsensusState) proposalHeartbeat(height int64, round int) {
|
||||
logger := cs.Logger.With("height", height, "round", round)
|
||||
addr := cs.privValidator.GetAddress()
|
||||
|
||||
if !cs.Validators.HasAddress(addr) {
|
||||
logger.Debug("Not sending proposalHearbeat. This node is not a validator", "addr", addr, "vals", cs.Validators)
|
||||
return
|
||||
}
|
||||
counter := 0
|
||||
valIndex, _ := cs.Validators.GetByAddress(addr)
|
||||
chainID := cs.state.ChainID
|
||||
for {
|
||||
rs := cs.GetRoundState()
|
||||
// if we've already moved on, no need to send more heartbeats
|
||||
if rs.Step > cstypes.RoundStepNewRound || rs.Round > round || rs.Height > height {
|
||||
return
|
||||
}
|
||||
heartbeat := &types.Heartbeat{
|
||||
Height: rs.Height,
|
||||
Round: rs.Round,
|
||||
Sequence: counter,
|
||||
ValidatorAddress: addr,
|
||||
ValidatorIndex: valIndex,
|
||||
}
|
||||
cs.privValidator.SignHeartbeat(chainID, heartbeat)
|
||||
cs.eventBus.PublishEventProposalHeartbeat(types.EventDataProposalHeartbeat{heartbeat})
|
||||
cs.evsw.FireEvent(types.EventProposalHeartbeat, heartbeat)
|
||||
counter++
|
||||
time.Sleep(proposalHeartbeatIntervalSeconds * time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
// Enter (CreateEmptyBlocks): from enterNewRound(height,round)
|
||||
// Enter (CreateEmptyBlocks, CreateEmptyBlocksInterval > 0 ): after enterNewRound(height,round), after timeout of CreateEmptyBlocksInterval
|
||||
// Enter (!CreateEmptyBlocks) : after enterNewRound(height,round), once txs are in the mempool
|
||||
|
@ -11,8 +11,6 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
cstypes "github.com/tendermint/tendermint/consensus/types"
|
||||
tmevents "github.com/tendermint/tendermint/libs/events"
|
||||
|
||||
cmn "github.com/tendermint/tendermint/libs/common"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
|
||||
@ -1029,33 +1027,6 @@ func TestSetValidBlockOnDelayedPrevote(t *testing.T) {
|
||||
assert.True(t, rs.ValidRound == round)
|
||||
}
|
||||
|
||||
// regression for #2518
|
||||
func TestNoHearbeatWhenNotValidator(t *testing.T) {
|
||||
cs, _ := randConsensusState(4)
|
||||
cs.Validators = types.NewValidatorSet(nil) // make sure we are not in the validator set
|
||||
|
||||
cs.evsw.AddListenerForEvent("testing", types.EventProposalHeartbeat,
|
||||
func(data tmevents.EventData) {
|
||||
t.Errorf("Should not have broadcasted heartbeat")
|
||||
})
|
||||
go cs.proposalHeartbeat(10, 1)
|
||||
|
||||
cs.Stop()
|
||||
|
||||
// if a faulty implementation sends an event, we should wait here a little bit to make sure we don't miss it by prematurely leaving the test method
|
||||
time.Sleep((proposalHeartbeatIntervalSeconds + 1) * time.Second)
|
||||
}
|
||||
|
||||
// regression for #2518
|
||||
func TestHearbeatWhenWeAreValidator(t *testing.T) {
|
||||
cs, _ := randConsensusState(4)
|
||||
heartbeatCh := subscribe(cs.eventBus, types.EventQueryProposalHeartbeat)
|
||||
|
||||
go cs.proposalHeartbeat(10, 1)
|
||||
ensureProposalHeartbeat(heartbeatCh)
|
||||
|
||||
}
|
||||
|
||||
// What we want:
|
||||
// P0 miss to lock B as Proposal Block is missing, but set valid block to B after
|
||||
// receiving delayed Block Proposal.
|
||||
|
@ -338,12 +338,11 @@ BlockID has seen +2/3 votes. This routine is based on the local RoundState (`rs`
|
||||
|
||||
## Broadcast routine
|
||||
|
||||
The Broadcast routine subscribes to an internal event bus to receive new round steps, votes messages and proposal
|
||||
heartbeat messages, and broadcasts messages to peers upon receiving those events.
|
||||
The Broadcast routine subscribes to an internal event bus to receive new round steps and votes messages, and broadcasts messages to peers upon receiving those
|
||||
events.
|
||||
It broadcasts `NewRoundStepMessage` or `CommitStepMessage` upon new round state event. Note that
|
||||
broadcasting these messages does not depend on the PeerRoundState; it is sent on the StateChannel.
|
||||
Upon receiving VoteMessage it broadcasts `HasVoteMessage` message to its peers on the StateChannel.
|
||||
`ProposalHeartbeatMessage` is sent the same way on the StateChannel.
|
||||
|
||||
## Channels
|
||||
|
||||
|
@ -89,33 +89,6 @@ type BlockPartMessage struct {
|
||||
}
|
||||
```
|
||||
|
||||
## ProposalHeartbeatMessage
|
||||
|
||||
ProposalHeartbeatMessage is sent to signal that a node is alive and waiting for transactions
|
||||
to be able to create a next block proposal.
|
||||
|
||||
```go
|
||||
type ProposalHeartbeatMessage struct {
|
||||
Heartbeat Heartbeat
|
||||
}
|
||||
```
|
||||
|
||||
### Heartbeat
|
||||
|
||||
Heartbeat contains validator information (address and index),
|
||||
height, round and sequence number. It is signed by the private key of the validator.
|
||||
|
||||
```go
|
||||
type Heartbeat struct {
|
||||
ValidatorAddress []byte
|
||||
ValidatorIndex int
|
||||
Height int64
|
||||
Round int
|
||||
Sequence int
|
||||
Signature Signature
|
||||
}
|
||||
```
|
||||
|
||||
## NewRoundStepMessage
|
||||
|
||||
NewRoundStepMessage is sent for every step transition during the core consensus algorithm execution.
|
||||
|
@ -290,19 +290,6 @@ func (pv *FilePV) saveSigned(height int64, round int, step int8,
|
||||
pv.save()
|
||||
}
|
||||
|
||||
// SignHeartbeat signs a canonical representation of the heartbeat, along with the chainID.
|
||||
// Implements PrivValidator.
|
||||
func (pv *FilePV) SignHeartbeat(chainID string, heartbeat *types.Heartbeat) error {
|
||||
pv.mtx.Lock()
|
||||
defer pv.mtx.Unlock()
|
||||
sig, err := pv.PrivKey.Sign(heartbeat.SignBytes(chainID))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
heartbeat.Signature = sig
|
||||
return nil
|
||||
}
|
||||
|
||||
// String returns a string representation of the FilePV.
|
||||
func (pv *FilePV) String() string {
|
||||
return fmt.Sprintf("PrivValidator{%v LH:%v, LR:%v, LS:%v}", pv.GetAddress(), pv.LastHeight, pv.LastRound, pv.LastStep)
|
||||
|
@ -125,35 +125,6 @@ func (sc *RemoteSignerClient) SignProposal(
|
||||
return nil
|
||||
}
|
||||
|
||||
// SignHeartbeat implements PrivValidator.
|
||||
func (sc *RemoteSignerClient) SignHeartbeat(
|
||||
chainID string,
|
||||
heartbeat *types.Heartbeat,
|
||||
) error {
|
||||
sc.lock.Lock()
|
||||
defer sc.lock.Unlock()
|
||||
|
||||
err := writeMsg(sc.conn, &SignHeartbeatRequest{Heartbeat: heartbeat})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
res, err := readMsg(sc.conn)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
resp, ok := res.(*SignedHeartbeatResponse)
|
||||
if !ok {
|
||||
return ErrUnexpectedResponse
|
||||
}
|
||||
if resp.Error != nil {
|
||||
return resp.Error
|
||||
}
|
||||
*heartbeat = *resp.Heartbeat
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Ping is used to check connection health.
|
||||
func (sc *RemoteSignerClient) Ping() error {
|
||||
sc.lock.Lock()
|
||||
@ -186,8 +157,6 @@ func RegisterRemoteSignerMsg(cdc *amino.Codec) {
|
||||
cdc.RegisterConcrete(&SignedVoteResponse{}, "tendermint/remotesigner/SignedVoteResponse", nil)
|
||||
cdc.RegisterConcrete(&SignProposalRequest{}, "tendermint/remotesigner/SignProposalRequest", nil)
|
||||
cdc.RegisterConcrete(&SignedProposalResponse{}, "tendermint/remotesigner/SignedProposalResponse", nil)
|
||||
cdc.RegisterConcrete(&SignHeartbeatRequest{}, "tendermint/remotesigner/SignHeartbeatRequest", nil)
|
||||
cdc.RegisterConcrete(&SignedHeartbeatResponse{}, "tendermint/remotesigner/SignedHeartbeatResponse", nil)
|
||||
cdc.RegisterConcrete(&PingRequest{}, "tendermint/remotesigner/PingRequest", nil)
|
||||
cdc.RegisterConcrete(&PingResponse{}, "tendermint/remotesigner/PingResponse", nil)
|
||||
}
|
||||
@ -218,16 +187,6 @@ type SignedProposalResponse struct {
|
||||
Error *RemoteSignerError
|
||||
}
|
||||
|
||||
// SignHeartbeatRequest is a PrivValidatorSocket message containing a Heartbeat.
|
||||
type SignHeartbeatRequest struct {
|
||||
Heartbeat *types.Heartbeat
|
||||
}
|
||||
|
||||
type SignedHeartbeatResponse struct {
|
||||
Heartbeat *types.Heartbeat
|
||||
Error *RemoteSignerError
|
||||
}
|
||||
|
||||
// PingRequest is a PrivValidatorSocket message to keep the connection alive.
|
||||
type PingRequest struct {
|
||||
}
|
||||
@ -286,13 +245,6 @@ func handleRequest(req RemoteSignerMsg, chainID string, privVal types.PrivValida
|
||||
} else {
|
||||
res = &SignedProposalResponse{r.Proposal, nil}
|
||||
}
|
||||
case *SignHeartbeatRequest:
|
||||
err = privVal.SignHeartbeat(chainID, r.Heartbeat)
|
||||
if err != nil {
|
||||
res = &SignedHeartbeatResponse{nil, &RemoteSignerError{0, err.Error()}}
|
||||
} else {
|
||||
res = &SignedHeartbeatResponse{r.Heartbeat, nil}
|
||||
}
|
||||
case *PingRequest:
|
||||
res = &PingResponse{}
|
||||
default:
|
||||
|
@ -137,22 +137,6 @@ func TestSocketPVVoteKeepalive(t *testing.T) {
|
||||
assert.Equal(t, want.Signature, have.Signature)
|
||||
}
|
||||
|
||||
func TestSocketPVHeartbeat(t *testing.T) {
|
||||
var (
|
||||
chainID = cmn.RandStr(12)
|
||||
sc, rs = testSetupSocketPair(t, chainID, types.NewMockPV())
|
||||
|
||||
want = &types.Heartbeat{}
|
||||
have = &types.Heartbeat{}
|
||||
)
|
||||
defer sc.Stop()
|
||||
defer rs.Stop()
|
||||
|
||||
require.NoError(t, rs.privVal.SignHeartbeat(chainID, want))
|
||||
require.NoError(t, sc.SignHeartbeat(chainID, have))
|
||||
assert.Equal(t, want.Signature, have.Signature)
|
||||
}
|
||||
|
||||
func TestSocketPVDeadline(t *testing.T) {
|
||||
var (
|
||||
addr = testFreeAddr(t)
|
||||
@ -301,32 +285,6 @@ func TestRemoteSignProposalErrors(t *testing.T) {
|
||||
require.Error(t, err)
|
||||
}
|
||||
|
||||
func TestRemoteSignHeartbeatErrors(t *testing.T) {
|
||||
var (
|
||||
chainID = cmn.RandStr(12)
|
||||
sc, rs = testSetupSocketPair(t, chainID, types.NewErroringMockPV())
|
||||
hb = &types.Heartbeat{}
|
||||
)
|
||||
defer sc.Stop()
|
||||
defer rs.Stop()
|
||||
|
||||
err := writeMsg(sc.conn, &SignHeartbeatRequest{Heartbeat: hb})
|
||||
require.NoError(t, err)
|
||||
|
||||
res, err := readMsg(sc.conn)
|
||||
require.NoError(t, err)
|
||||
|
||||
resp := *res.(*SignedHeartbeatResponse)
|
||||
require.NotNil(t, resp.Error)
|
||||
require.Equal(t, resp.Error.Description, types.ErroringMockPVErr.Error())
|
||||
|
||||
err = rs.privVal.SignHeartbeat(chainID, hb)
|
||||
require.Error(t, err)
|
||||
|
||||
err = sc.SignHeartbeat(chainID, hb)
|
||||
require.Error(t, err)
|
||||
}
|
||||
|
||||
func TestErrUnexpectedResponse(t *testing.T) {
|
||||
var (
|
||||
addr = testFreeAddr(t)
|
||||
@ -362,22 +320,12 @@ func TestErrUnexpectedResponse(t *testing.T) {
|
||||
require.NotNil(t, rsConn)
|
||||
<-readyc
|
||||
|
||||
// Heartbeat:
|
||||
go func(errc chan error) {
|
||||
errc <- sc.SignHeartbeat(chainID, &types.Heartbeat{})
|
||||
}(errc)
|
||||
// read request and write wrong response:
|
||||
go testReadWriteResponse(t, &SignedVoteResponse{}, rsConn)
|
||||
err = <-errc
|
||||
require.Error(t, err)
|
||||
require.Equal(t, err, ErrUnexpectedResponse)
|
||||
|
||||
// Proposal:
|
||||
go func(errc chan error) {
|
||||
errc <- sc.SignProposal(chainID, &types.Proposal{})
|
||||
}(errc)
|
||||
// read request and write wrong response:
|
||||
go testReadWriteResponse(t, &SignedHeartbeatResponse{}, rsConn)
|
||||
go testReadWriteResponse(t, &SignedVoteResponse{}, rsConn)
|
||||
err = <-errc
|
||||
require.Error(t, err)
|
||||
require.Equal(t, err, ErrUnexpectedResponse)
|
||||
@ -387,7 +335,7 @@ func TestErrUnexpectedResponse(t *testing.T) {
|
||||
errc <- sc.SignVote(chainID, &types.Vote{})
|
||||
}(errc)
|
||||
// read request and write wrong response:
|
||||
go testReadWriteResponse(t, &SignedHeartbeatResponse{}, rsConn)
|
||||
go testReadWriteResponse(t, &SignedProposalResponse{}, rsConn)
|
||||
err = <-errc
|
||||
require.Error(t, err)
|
||||
require.Equal(t, err, ErrUnexpectedResponse)
|
||||
|
@ -41,16 +41,6 @@ type CanonicalVote struct {
|
||||
ChainID string
|
||||
}
|
||||
|
||||
type CanonicalHeartbeat struct {
|
||||
Type byte
|
||||
Height int64 `binary:"fixed64"`
|
||||
Round int `binary:"fixed64"`
|
||||
Sequence int `binary:"fixed64"`
|
||||
ValidatorAddress Address
|
||||
ValidatorIndex int
|
||||
ChainID string
|
||||
}
|
||||
|
||||
//-----------------------------------
|
||||
// Canonicalize the structs
|
||||
|
||||
@ -91,18 +81,6 @@ func CanonicalizeVote(chainID string, vote *Vote) CanonicalVote {
|
||||
}
|
||||
}
|
||||
|
||||
func CanonicalizeHeartbeat(chainID string, heartbeat *Heartbeat) CanonicalHeartbeat {
|
||||
return CanonicalHeartbeat{
|
||||
Type: byte(HeartbeatType),
|
||||
Height: heartbeat.Height,
|
||||
Round: heartbeat.Round,
|
||||
Sequence: heartbeat.Sequence,
|
||||
ValidatorAddress: heartbeat.ValidatorAddress,
|
||||
ValidatorIndex: heartbeat.ValidatorIndex,
|
||||
ChainID: chainID,
|
||||
}
|
||||
}
|
||||
|
||||
// CanonicalTime can be used to stringify time in a canonical way.
|
||||
func CanonicalTime(t time.Time) string {
|
||||
// Note that sending time over amino resets it to
|
||||
|
@ -146,10 +146,6 @@ func (b *EventBus) PublishEventTx(data EventDataTx) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *EventBus) PublishEventProposalHeartbeat(data EventDataProposalHeartbeat) error {
|
||||
return b.Publish(EventProposalHeartbeat, data)
|
||||
}
|
||||
|
||||
func (b *EventBus) PublishEventNewRoundStep(data EventDataRoundState) error {
|
||||
return b.Publish(EventNewRoundStep, data)
|
||||
}
|
||||
|
@ -152,7 +152,7 @@ func TestEventBusPublish(t *testing.T) {
|
||||
err = eventBus.Subscribe(context.Background(), "test", tmquery.Empty{}, eventsCh)
|
||||
require.NoError(t, err)
|
||||
|
||||
const numEventsExpected = 15
|
||||
const numEventsExpected = 14
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
numEvents := 0
|
||||
@ -172,8 +172,6 @@ func TestEventBusPublish(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
err = eventBus.PublishEventVote(EventDataVote{})
|
||||
require.NoError(t, err)
|
||||
err = eventBus.PublishEventProposalHeartbeat(EventDataProposalHeartbeat{})
|
||||
require.NoError(t, err)
|
||||
err = eventBus.PublishEventNewRoundStep(EventDataRoundState{})
|
||||
require.NoError(t, err)
|
||||
err = eventBus.PublishEventTimeoutPropose(EventDataRoundState{})
|
||||
|
@ -18,7 +18,6 @@ const (
|
||||
EventNewRound = "NewRound"
|
||||
EventNewRoundStep = "NewRoundStep"
|
||||
EventPolka = "Polka"
|
||||
EventProposalHeartbeat = "ProposalHeartbeat"
|
||||
EventRelock = "Relock"
|
||||
EventTimeoutPropose = "TimeoutPropose"
|
||||
EventTimeoutWait = "TimeoutWait"
|
||||
@ -47,7 +46,6 @@ func RegisterEventDatas(cdc *amino.Codec) {
|
||||
cdc.RegisterConcrete(EventDataNewRound{}, "tendermint/event/NewRound", nil)
|
||||
cdc.RegisterConcrete(EventDataCompleteProposal{}, "tendermint/event/CompleteProposal", nil)
|
||||
cdc.RegisterConcrete(EventDataVote{}, "tendermint/event/Vote", nil)
|
||||
cdc.RegisterConcrete(EventDataProposalHeartbeat{}, "tendermint/event/ProposalHeartbeat", nil)
|
||||
cdc.RegisterConcrete(EventDataValidatorSetUpdates{}, "tendermint/event/ValidatorSetUpdates", nil)
|
||||
cdc.RegisterConcrete(EventDataString(""), "tendermint/event/ProposalString", nil)
|
||||
}
|
||||
@ -75,10 +73,6 @@ type EventDataTx struct {
|
||||
TxResult
|
||||
}
|
||||
|
||||
type EventDataProposalHeartbeat struct {
|
||||
Heartbeat *Heartbeat
|
||||
}
|
||||
|
||||
// NOTE: This goes into the replay WAL
|
||||
type EventDataRoundState struct {
|
||||
Height int64 `json:"height"`
|
||||
@ -143,7 +137,6 @@ var (
|
||||
EventQueryNewRound = QueryForEvent(EventNewRound)
|
||||
EventQueryNewRoundStep = QueryForEvent(EventNewRoundStep)
|
||||
EventQueryPolka = QueryForEvent(EventPolka)
|
||||
EventQueryProposalHeartbeat = QueryForEvent(EventProposalHeartbeat)
|
||||
EventQueryRelock = QueryForEvent(EventRelock)
|
||||
EventQueryTimeoutPropose = QueryForEvent(EventTimeoutPropose)
|
||||
EventQueryTimeoutWait = QueryForEvent(EventTimeoutWait)
|
||||
|
@ -1,83 +0,0 @@
|
||||
package types
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/tendermint/tendermint/crypto"
|
||||
cmn "github.com/tendermint/tendermint/libs/common"
|
||||
)
|
||||
|
||||
// Heartbeat is a simple vote-like structure so validators can
|
||||
// alert others that they are alive and waiting for transactions.
|
||||
// Note: We aren't adding ",omitempty" to Heartbeat's
|
||||
// json field tags because we always want the JSON
|
||||
// representation to be in its canonical form.
|
||||
type Heartbeat struct {
|
||||
ValidatorAddress Address `json:"validator_address"`
|
||||
ValidatorIndex int `json:"validator_index"`
|
||||
Height int64 `json:"height"`
|
||||
Round int `json:"round"`
|
||||
Sequence int `json:"sequence"`
|
||||
Signature []byte `json:"signature"`
|
||||
}
|
||||
|
||||
// SignBytes returns the Heartbeat bytes for signing.
|
||||
// It panics if the Heartbeat is nil.
|
||||
func (heartbeat *Heartbeat) SignBytes(chainID string) []byte {
|
||||
bz, err := cdc.MarshalBinaryLengthPrefixed(CanonicalizeHeartbeat(chainID, heartbeat))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return bz
|
||||
}
|
||||
|
||||
// Copy makes a copy of the Heartbeat.
|
||||
func (heartbeat *Heartbeat) Copy() *Heartbeat {
|
||||
if heartbeat == nil {
|
||||
return nil
|
||||
}
|
||||
heartbeatCopy := *heartbeat
|
||||
return &heartbeatCopy
|
||||
}
|
||||
|
||||
// String returns a string representation of the Heartbeat.
|
||||
func (heartbeat *Heartbeat) String() string {
|
||||
if heartbeat == nil {
|
||||
return "nil-heartbeat"
|
||||
}
|
||||
|
||||
return fmt.Sprintf("Heartbeat{%v:%X %v/%02d (%v) %v}",
|
||||
heartbeat.ValidatorIndex, cmn.Fingerprint(heartbeat.ValidatorAddress),
|
||||
heartbeat.Height, heartbeat.Round, heartbeat.Sequence,
|
||||
fmt.Sprintf("/%X.../", cmn.Fingerprint(heartbeat.Signature[:])))
|
||||
}
|
||||
|
||||
// ValidateBasic performs basic validation.
|
||||
func (heartbeat *Heartbeat) ValidateBasic() error {
|
||||
if len(heartbeat.ValidatorAddress) != crypto.AddressSize {
|
||||
return fmt.Errorf("Expected ValidatorAddress size to be %d bytes, got %d bytes",
|
||||
crypto.AddressSize,
|
||||
len(heartbeat.ValidatorAddress),
|
||||
)
|
||||
}
|
||||
if heartbeat.ValidatorIndex < 0 {
|
||||
return errors.New("Negative ValidatorIndex")
|
||||
}
|
||||
if heartbeat.Height < 0 {
|
||||
return errors.New("Negative Height")
|
||||
}
|
||||
if heartbeat.Round < 0 {
|
||||
return errors.New("Negative Round")
|
||||
}
|
||||
if heartbeat.Sequence < 0 {
|
||||
return errors.New("Negative Sequence")
|
||||
}
|
||||
if len(heartbeat.Signature) == 0 {
|
||||
return errors.New("Signature is missing")
|
||||
}
|
||||
if len(heartbeat.Signature) > MaxSignatureSize {
|
||||
return fmt.Errorf("Signature is too big (max: %d)", MaxSignatureSize)
|
||||
}
|
||||
return nil
|
||||
}
|
@ -1,104 +0,0 @@
|
||||
package types
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/tendermint/tendermint/crypto/ed25519"
|
||||
"github.com/tendermint/tendermint/crypto/secp256k1"
|
||||
)
|
||||
|
||||
func TestHeartbeatCopy(t *testing.T) {
|
||||
hb := &Heartbeat{ValidatorIndex: 1, Height: 10, Round: 1}
|
||||
hbCopy := hb.Copy()
|
||||
require.Equal(t, hbCopy, hb, "heartbeat copy should be the same")
|
||||
hbCopy.Round = hb.Round + 10
|
||||
require.NotEqual(t, hbCopy, hb, "heartbeat copy mutation should not change original")
|
||||
|
||||
var nilHb *Heartbeat
|
||||
nilHbCopy := nilHb.Copy()
|
||||
require.Nil(t, nilHbCopy, "copy of nil should also return nil")
|
||||
}
|
||||
|
||||
func TestHeartbeatString(t *testing.T) {
|
||||
var nilHb *Heartbeat
|
||||
require.Contains(t, nilHb.String(), "nil", "expecting a string and no panic")
|
||||
|
||||
hb := &Heartbeat{ValidatorIndex: 1, Height: 11, Round: 2}
|
||||
require.Equal(t, "Heartbeat{1:000000000000 11/02 (0) /000000000000.../}", hb.String())
|
||||
|
||||
var key ed25519.PrivKeyEd25519
|
||||
sig, err := key.Sign([]byte("Tendermint"))
|
||||
require.NoError(t, err)
|
||||
hb.Signature = sig
|
||||
require.Equal(t, "Heartbeat{1:000000000000 11/02 (0) /FF41E371B9BF.../}", hb.String())
|
||||
}
|
||||
|
||||
func TestHeartbeatWriteSignBytes(t *testing.T) {
|
||||
chainID := "test_chain_id"
|
||||
|
||||
{
|
||||
testHeartbeat := &Heartbeat{ValidatorIndex: 1, Height: 10, Round: 1}
|
||||
signBytes := testHeartbeat.SignBytes(chainID)
|
||||
expected, err := cdc.MarshalBinaryLengthPrefixed(CanonicalizeHeartbeat(chainID, testHeartbeat))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, expected, signBytes, "Got unexpected sign bytes for Heartbeat")
|
||||
}
|
||||
|
||||
{
|
||||
testHeartbeat := &Heartbeat{}
|
||||
signBytes := testHeartbeat.SignBytes(chainID)
|
||||
expected, err := cdc.MarshalBinaryLengthPrefixed(CanonicalizeHeartbeat(chainID, testHeartbeat))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, expected, signBytes, "Got unexpected sign bytes for Heartbeat")
|
||||
}
|
||||
|
||||
require.Panics(t, func() {
|
||||
var nilHb *Heartbeat
|
||||
signBytes := nilHb.SignBytes(chainID)
|
||||
require.Equal(t, string(signBytes), "null")
|
||||
})
|
||||
}
|
||||
|
||||
func TestHeartbeatValidateBasic(t *testing.T) {
|
||||
testCases := []struct {
|
||||
testName string
|
||||
malleateHeartBeat func(*Heartbeat)
|
||||
expectErr bool
|
||||
}{
|
||||
{"Good HeartBeat", func(hb *Heartbeat) {}, false},
|
||||
{"Invalid address size", func(hb *Heartbeat) {
|
||||
hb.ValidatorAddress = nil
|
||||
}, true},
|
||||
{"Negative validator index", func(hb *Heartbeat) {
|
||||
hb.ValidatorIndex = -1
|
||||
}, true},
|
||||
{"Negative height", func(hb *Heartbeat) {
|
||||
hb.Height = -1
|
||||
}, true},
|
||||
{"Negative round", func(hb *Heartbeat) {
|
||||
hb.Round = -1
|
||||
}, true},
|
||||
{"Negative sequence", func(hb *Heartbeat) {
|
||||
hb.Sequence = -1
|
||||
}, true},
|
||||
{"Missing signature", func(hb *Heartbeat) {
|
||||
hb.Signature = nil
|
||||
}, true},
|
||||
{"Signature too big", func(hb *Heartbeat) {
|
||||
hb.Signature = make([]byte, MaxSignatureSize+1)
|
||||
}, true},
|
||||
}
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.testName, func(t *testing.T) {
|
||||
hb := &Heartbeat{
|
||||
ValidatorAddress: secp256k1.GenPrivKey().PubKey().Address(),
|
||||
Signature: make([]byte, 4),
|
||||
ValidatorIndex: 1, Height: 10, Round: 1}
|
||||
|
||||
tc.malleateHeartBeat(hb)
|
||||
assert.Equal(t, tc.expectErr, hb.ValidateBasic() != nil, "Validate Basic had an unexpected result")
|
||||
})
|
||||
}
|
||||
}
|
@ -10,14 +10,13 @@ import (
|
||||
)
|
||||
|
||||
// PrivValidator defines the functionality of a local Tendermint validator
|
||||
// that signs votes, proposals, and heartbeats, and never double signs.
|
||||
// that signs votes and proposals, and never double signs.
|
||||
type PrivValidator interface {
|
||||
GetAddress() Address // redundant since .PubKey().Address()
|
||||
GetPubKey() crypto.PubKey
|
||||
|
||||
SignVote(chainID string, vote *Vote) error
|
||||
SignProposal(chainID string, proposal *Proposal) error
|
||||
SignHeartbeat(chainID string, heartbeat *Heartbeat) error
|
||||
}
|
||||
|
||||
//----------------------------------------
|
||||
@ -84,16 +83,6 @@ func (pv *MockPV) SignProposal(chainID string, proposal *Proposal) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// signHeartbeat signs the heartbeat without any checking.
|
||||
func (pv *MockPV) SignHeartbeat(chainID string, heartbeat *Heartbeat) error {
|
||||
sig, err := pv.privKey.Sign(heartbeat.SignBytes(chainID))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
heartbeat.Signature = sig
|
||||
return nil
|
||||
}
|
||||
|
||||
// String returns a string representation of the MockPV.
|
||||
func (pv *MockPV) String() string {
|
||||
return fmt.Sprintf("MockPV{%v}", pv.GetAddress())
|
||||
@ -121,11 +110,6 @@ func (pv *erroringMockPV) SignProposal(chainID string, proposal *Proposal) error
|
||||
return ErroringMockPVErr
|
||||
}
|
||||
|
||||
// signHeartbeat signs the heartbeat without any checking.
|
||||
func (pv *erroringMockPV) SignHeartbeat(chainID string, heartbeat *Heartbeat) error {
|
||||
return ErroringMockPVErr
|
||||
}
|
||||
|
||||
// NewErroringMockPV returns a MockPV that fails on each signing request. Again, for testing only.
|
||||
func NewErroringMockPV() *erroringMockPV {
|
||||
return &erroringMockPV{&MockPV{ed25519.GenPrivKey()}}
|
||||
|
@ -6,8 +6,8 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
// MaxSignatureSize is a maximum allowed signature size for the Heartbeat,
|
||||
// Proposal and Vote.
|
||||
// MaxSignatureSize is a maximum allowed signature size for the Proposal
|
||||
// and Vote.
|
||||
// XXX: secp256k1 does not have Size nor MaxSize defined.
|
||||
MaxSignatureSize = cmn.MaxInt(ed25519.SignatureSize, 64)
|
||||
)
|
||||
|
@ -11,8 +11,6 @@ const (
|
||||
// Proposals
|
||||
ProposalType SignedMsgType = 0x20
|
||||
|
||||
// Heartbeat
|
||||
HeartbeatType SignedMsgType = 0x30
|
||||
)
|
||||
|
||||
// IsVoteTypeValid returns true if t is a valid vote type.
|
||||
|
Loading…
x
Reference in New Issue
Block a user