WIP consensus

This commit is contained in:
Jae Kwon 2018-04-05 07:05:45 -07:00
parent 5d1c758730
commit 45ec5fd170
12 changed files with 104 additions and 143 deletions

View File

@ -7,7 +7,6 @@ import (
"time" "time"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
crypto "github.com/tendermint/go-crypto"
"github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
cmn "github.com/tendermint/tmlibs/common" cmn "github.com/tendermint/tmlibs/common"
@ -48,7 +47,9 @@ func TestByzantine(t *testing.T) {
for i := 0; i < N; i++ { for i := 0; i < N; i++ {
// make first val byzantine // make first val byzantine
if i == 0 { if i == 0 {
css[i].privValidator = NewByzantinePrivValidator(css[i].privValidator) // NOTE: Now, test validators are MockPV, which by default doesn't
// do any safety checks.
css[i].privValidator.(*types.MockPV).DisableChecks()
css[i].decideProposal = func(j int) func(int64, int) { css[i].decideProposal = func(j int) func(int64, int) {
return func(height int64, round int) { return func(height int64, round int) {
byzantineDecideProposalFunc(t, height, round, css[j], switches[j]) byzantineDecideProposalFunc(t, height, round, css[j], switches[j])
@ -203,7 +204,7 @@ func byzantineDecideProposalFunc(t *testing.T, height int64, round int, cs *Cons
func sendProposalAndParts(height int64, round int, cs *ConsensusState, peer p2p.Peer, proposal *types.Proposal, blockHash []byte, parts *types.PartSet) { func sendProposalAndParts(height int64, round int, cs *ConsensusState, peer p2p.Peer, proposal *types.Proposal, blockHash []byte, parts *types.PartSet) {
// proposal // proposal
msg := &ProposalMessage{Proposal: proposal} msg := &ProposalMessage{Proposal: proposal}
peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) peer.Send(DataChannel, cdc.MustMarshalBinaryBare(msg))
// parts // parts
for i := 0; i < parts.Total(); i++ { for i := 0; i < parts.Total(); i++ {
@ -213,7 +214,7 @@ func sendProposalAndParts(height int64, round int, cs *ConsensusState, peer p2p.
Round: round, // This tells peer that this part applies to us. Round: round, // This tells peer that this part applies to us.
Part: part, Part: part,
} }
peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) peer.Send(DataChannel, cdc.MustMarshalBinaryBare(msg))
} }
// votes // votes
@ -222,8 +223,8 @@ func sendProposalAndParts(height int64, round int, cs *ConsensusState, peer p2p.
precommit, _ := cs.signVote(types.VoteTypePrecommit, blockHash, parts.Header()) precommit, _ := cs.signVote(types.VoteTypePrecommit, blockHash, parts.Header())
cs.mtx.Unlock() cs.mtx.Unlock()
peer.Send(VoteChannel, struct{ ConsensusMessage }{&VoteMessage{prevote}}) peer.Send(VoteChannel, cdc.MustMarshalBinaryBare(&VoteMessage{prevote}))
peer.Send(VoteChannel, struct{ ConsensusMessage }{&VoteMessage{precommit}}) peer.Send(VoteChannel, cdc.MustMarshalBinaryBare(&VoteMessage{precommit}))
} }
//---------------------------------------- //----------------------------------------
@ -264,47 +265,3 @@ func (br *ByzantineReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
func (br *ByzantineReactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) { func (br *ByzantineReactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {
br.reactor.Receive(chID, peer, msgBytes) br.reactor.Receive(chID, peer, msgBytes)
} }
//----------------------------------------
// byzantine privValidator
type ByzantinePrivValidator struct {
types.Signer
pv types.PrivValidator
}
// Return a priv validator that will sign anything
func NewByzantinePrivValidator(pv types.PrivValidator) *ByzantinePrivValidator {
return &ByzantinePrivValidator{
Signer: pv.(*types.PrivValidatorFS).Signer,
pv: pv,
}
}
func (privVal *ByzantinePrivValidator) GetAddress() types.Address {
return privVal.pv.GetAddress()
}
func (privVal *ByzantinePrivValidator) GetPubKey() crypto.PubKey {
return privVal.pv.GetPubKey()
}
func (privVal *ByzantinePrivValidator) SignVote(chainID string, vote *types.Vote) (err error) {
vote.Signature, err = privVal.Sign(vote.SignBytes(chainID))
return err
}
func (privVal *ByzantinePrivValidator) SignProposal(chainID string, proposal *types.Proposal) (err error) {
proposal.Signature, _ = privVal.Sign(proposal.SignBytes(chainID))
return nil
}
func (privVal *ByzantinePrivValidator) SignHeartbeat(chainID string, heartbeat *types.Heartbeat) (err error) {
heartbeat.Signature, _ = privVal.Sign(heartbeat.SignBytes(chainID))
return nil
}
func (privVal *ByzantinePrivValidator) String() string {
return cmn.Fmt("PrivValidator{%X}", privVal.GetAddress())
}

View File

@ -21,6 +21,7 @@ import (
"github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/p2p"
sm "github.com/tendermint/tendermint/state" sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
privval "github.com/tendermint/tendermint/types/priv_validator"
cmn "github.com/tendermint/tmlibs/common" cmn "github.com/tendermint/tmlibs/common"
dbm "github.com/tendermint/tmlibs/db" dbm "github.com/tendermint/tmlibs/db"
"github.com/tendermint/tmlibs/log" "github.com/tendermint/tmlibs/log"
@ -277,10 +278,10 @@ func newConsensusStateWithConfigAndBlockStore(thisConfig *cfg.Config, state sm.S
return cs return cs
} }
func loadPrivValidator(config *cfg.Config) *types.PrivValidatorFS { func loadPrivValidator(config *cfg.Config) *privval.FilePV {
privValidatorFile := config.PrivValidatorFile() privValidatorFile := config.PrivValidatorFile()
ensureDir(path.Dir(privValidatorFile), 0700) ensureDir(path.Dir(privValidatorFile), 0700)
privValidator := types.LoadOrGenPrivValidatorFS(privValidatorFile) privValidator := privval.LoadOrGenFilePV(privValidatorFile)
privValidator.Reset() privValidator.Reset()
return privValidator return privValidator
} }
@ -378,7 +379,7 @@ func randConsensusNetWithPeers(nValidators, nPeers int, testName string, tickerF
privVal = privVals[i] privVal = privVals[i]
} else { } else {
_, tempFilePath := cmn.Tempfile("priv_validator_") _, tempFilePath := cmn.Tempfile("priv_validator_")
privVal = types.GenPrivValidatorFS(tempFilePath) privVal = privval.GenFilePV(tempFilePath)
} }
app := appFunc() app := appFunc()

View File

@ -1,7 +1,6 @@
package consensus package consensus
import ( import (
"bytes"
"context" "context"
"fmt" "fmt"
"reflect" "reflect"
@ -10,7 +9,7 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
wire "github.com/tendermint/go-wire" "github.com/tendermint/go-amino"
cmn "github.com/tendermint/tmlibs/common" cmn "github.com/tendermint/tmlibs/common"
"github.com/tendermint/tmlibs/log" "github.com/tendermint/tmlibs/log"
@ -176,7 +175,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)
return return
} }
_, msg, err := DecodeMessage(msgBytes) msg, err := DecodeMessage(msgBytes)
if err != nil { if err != nil {
conR.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes) conR.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes)
conR.Switch.StopPeerForError(src, err) conR.Switch.StopPeerForError(src, err)
@ -222,13 +221,13 @@ func (conR *ConsensusReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)
conR.Logger.Error("Bad VoteSetBitsMessage field Type") conR.Logger.Error("Bad VoteSetBitsMessage field Type")
return return
} }
src.TrySend(VoteSetBitsChannel, struct{ ConsensusMessage }{&VoteSetBitsMessage{ src.TrySend(VoteSetBitsChannel, cdc.MustMarshalBinaryBare(&VoteSetBitsMessage{
Height: msg.Height, Height: msg.Height,
Round: msg.Round, Round: msg.Round,
Type: msg.Type, Type: msg.Type,
BlockID: msg.BlockID, BlockID: msg.BlockID,
Votes: ourVotes, Votes: ourVotes,
}}) }))
case *ProposalHeartbeatMessage: case *ProposalHeartbeatMessage:
hb := msg.Heartbeat hb := msg.Heartbeat
conR.Logger.Debug("Received proposal heartbeat message", conR.Logger.Debug("Received proposal heartbeat message",
@ -401,16 +400,16 @@ func (conR *ConsensusReactor) broadcastProposalHeartbeatMessage(heartbeat types.
conR.Logger.Debug("Broadcasting proposal heartbeat message", conR.Logger.Debug("Broadcasting proposal heartbeat message",
"height", hb.Height, "round", hb.Round, "sequence", hb.Sequence) "height", hb.Height, "round", hb.Round, "sequence", hb.Sequence)
msg := &ProposalHeartbeatMessage{hb} msg := &ProposalHeartbeatMessage{hb}
conR.Switch.Broadcast(StateChannel, struct{ ConsensusMessage }{msg}) conR.Switch.Broadcast(StateChannel, cdc.MustMarshalBinaryBare(msg))
} }
func (conR *ConsensusReactor) broadcastNewRoundStep(rs *cstypes.RoundState) { func (conR *ConsensusReactor) broadcastNewRoundStep(rs *cstypes.RoundState) {
nrsMsg, csMsg := makeRoundStepMessages(rs) nrsMsg, csMsg := makeRoundStepMessages(rs)
if nrsMsg != nil { if nrsMsg != nil {
conR.Switch.Broadcast(StateChannel, struct{ ConsensusMessage }{nrsMsg}) conR.Switch.Broadcast(StateChannel, cdc.MustMarshalBinaryBare(nrsMsg))
} }
if csMsg != nil { if csMsg != nil {
conR.Switch.Broadcast(StateChannel, struct{ ConsensusMessage }{csMsg}) conR.Switch.Broadcast(StateChannel, cdc.MustMarshalBinaryBare(csMsg))
} }
} }
@ -422,7 +421,7 @@ func (conR *ConsensusReactor) broadcastHasVoteMessage(vote *types.Vote) {
Type: vote.Type, Type: vote.Type,
Index: vote.ValidatorIndex, Index: vote.ValidatorIndex,
} }
conR.Switch.Broadcast(StateChannel, struct{ ConsensusMessage }{msg}) conR.Switch.Broadcast(StateChannel, cdc.MustMarshalBinaryBare(msg))
/* /*
// TODO: Make this broadcast more selective. // TODO: Make this broadcast more selective.
for _, peer := range conR.Switch.Peers().List() { for _, peer := range conR.Switch.Peers().List() {
@ -462,10 +461,10 @@ func (conR *ConsensusReactor) sendNewRoundStepMessages(peer p2p.Peer) {
rs := conR.conS.GetRoundState() rs := conR.conS.GetRoundState()
nrsMsg, csMsg := makeRoundStepMessages(rs) nrsMsg, csMsg := makeRoundStepMessages(rs)
if nrsMsg != nil { if nrsMsg != nil {
peer.Send(StateChannel, struct{ ConsensusMessage }{nrsMsg}) peer.Send(StateChannel, cdc.MustMarshalBinaryBare(nrsMsg))
} }
if csMsg != nil { if csMsg != nil {
peer.Send(StateChannel, struct{ ConsensusMessage }{csMsg}) peer.Send(StateChannel, cdc.MustMarshalBinaryBare(csMsg))
} }
} }
@ -492,7 +491,7 @@ OUTER_LOOP:
Part: part, Part: part,
} }
logger.Debug("Sending block part", "height", prs.Height, "round", prs.Round) logger.Debug("Sending block part", "height", prs.Height, "round", prs.Round)
if peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) { if peer.Send(DataChannel, cdc.MustMarshalBinaryBare(msg)) {
ps.SetHasProposalBlockPart(prs.Height, prs.Round, index) ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
} }
continue OUTER_LOOP continue OUTER_LOOP
@ -536,7 +535,7 @@ OUTER_LOOP:
{ {
msg := &ProposalMessage{Proposal: rs.Proposal} msg := &ProposalMessage{Proposal: rs.Proposal}
logger.Debug("Sending proposal", "height", prs.Height, "round", prs.Round) logger.Debug("Sending proposal", "height", prs.Height, "round", prs.Round)
if peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) { if peer.Send(DataChannel, cdc.MustMarshalBinaryBare(msg)) {
ps.SetHasProposal(rs.Proposal) ps.SetHasProposal(rs.Proposal)
} }
} }
@ -551,7 +550,7 @@ OUTER_LOOP:
ProposalPOL: rs.Votes.Prevotes(rs.Proposal.POLRound).BitArray(), ProposalPOL: rs.Votes.Prevotes(rs.Proposal.POLRound).BitArray(),
} }
logger.Debug("Sending POL", "height", prs.Height, "round", prs.Round) logger.Debug("Sending POL", "height", prs.Height, "round", prs.Round)
peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) peer.Send(DataChannel, cdc.MustMarshalBinaryBare(msg))
} }
continue OUTER_LOOP continue OUTER_LOOP
} }
@ -594,7 +593,7 @@ func (conR *ConsensusReactor) gossipDataForCatchup(logger log.Logger, rs *cstype
Part: part, Part: part,
} }
logger.Debug("Sending block part for catchup", "round", prs.Round, "index", index) logger.Debug("Sending block part for catchup", "round", prs.Round, "index", index)
if peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) { if peer.Send(DataChannel, cdc.MustMarshalBinaryBare(msg)) {
ps.SetHasProposalBlockPart(prs.Height, prs.Round, index) ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
} else { } else {
logger.Debug("Sending block part for catchup failed") logger.Debug("Sending block part for catchup failed")
@ -733,12 +732,12 @@ OUTER_LOOP:
prs := ps.GetRoundState() prs := ps.GetRoundState()
if rs.Height == prs.Height { if rs.Height == prs.Height {
if maj23, ok := rs.Votes.Prevotes(prs.Round).TwoThirdsMajority(); ok { if maj23, ok := rs.Votes.Prevotes(prs.Round).TwoThirdsMajority(); ok {
peer.TrySend(StateChannel, struct{ ConsensusMessage }{&VoteSetMaj23Message{ peer.TrySend(StateChannel, cdc.MustMarshalBinaryBare(&VoteSetMaj23Message{
Height: prs.Height, Height: prs.Height,
Round: prs.Round, Round: prs.Round,
Type: types.VoteTypePrevote, Type: types.VoteTypePrevote,
BlockID: maj23, BlockID: maj23,
}}) }))
time.Sleep(conR.conS.config.PeerQueryMaj23Sleep()) time.Sleep(conR.conS.config.PeerQueryMaj23Sleep())
} }
} }
@ -750,12 +749,12 @@ OUTER_LOOP:
prs := ps.GetRoundState() prs := ps.GetRoundState()
if rs.Height == prs.Height { if rs.Height == prs.Height {
if maj23, ok := rs.Votes.Precommits(prs.Round).TwoThirdsMajority(); ok { if maj23, ok := rs.Votes.Precommits(prs.Round).TwoThirdsMajority(); ok {
peer.TrySend(StateChannel, struct{ ConsensusMessage }{&VoteSetMaj23Message{ peer.TrySend(StateChannel, cdc.MustMarshalBinaryBare(&VoteSetMaj23Message{
Height: prs.Height, Height: prs.Height,
Round: prs.Round, Round: prs.Round,
Type: types.VoteTypePrecommit, Type: types.VoteTypePrecommit,
BlockID: maj23, BlockID: maj23,
}}) }))
time.Sleep(conR.conS.config.PeerQueryMaj23Sleep()) time.Sleep(conR.conS.config.PeerQueryMaj23Sleep())
} }
} }
@ -767,12 +766,12 @@ OUTER_LOOP:
prs := ps.GetRoundState() prs := ps.GetRoundState()
if rs.Height == prs.Height && prs.ProposalPOLRound >= 0 { if rs.Height == prs.Height && prs.ProposalPOLRound >= 0 {
if maj23, ok := rs.Votes.Prevotes(prs.ProposalPOLRound).TwoThirdsMajority(); ok { if maj23, ok := rs.Votes.Prevotes(prs.ProposalPOLRound).TwoThirdsMajority(); ok {
peer.TrySend(StateChannel, struct{ ConsensusMessage }{&VoteSetMaj23Message{ peer.TrySend(StateChannel, cdc.MustMarshalBinaryBare(&VoteSetMaj23Message{
Height: prs.Height, Height: prs.Height,
Round: prs.ProposalPOLRound, Round: prs.ProposalPOLRound,
Type: types.VoteTypePrevote, Type: types.VoteTypePrevote,
BlockID: maj23, BlockID: maj23,
}}) }))
time.Sleep(conR.conS.config.PeerQueryMaj23Sleep()) time.Sleep(conR.conS.config.PeerQueryMaj23Sleep())
} }
} }
@ -786,12 +785,12 @@ OUTER_LOOP:
prs := ps.GetRoundState() prs := ps.GetRoundState()
if prs.CatchupCommitRound != -1 && 0 < prs.Height && prs.Height <= conR.conS.blockStore.Height() { if prs.CatchupCommitRound != -1 && 0 < prs.Height && prs.Height <= conR.conS.blockStore.Height() {
commit := conR.conS.LoadCommit(prs.Height) commit := conR.conS.LoadCommit(prs.Height)
peer.TrySend(StateChannel, struct{ ConsensusMessage }{&VoteSetMaj23Message{ peer.TrySend(StateChannel, cdc.MustMarshalBinaryBare(&VoteSetMaj23Message{
Height: prs.Height, Height: prs.Height,
Round: commit.Round(), Round: commit.Round(),
Type: types.VoteTypePrecommit, Type: types.VoteTypePrecommit,
BlockID: commit.BlockID, BlockID: commit.BlockID,
}}) }))
time.Sleep(conR.conS.config.PeerQueryMaj23Sleep()) time.Sleep(conR.conS.config.PeerQueryMaj23Sleep())
} }
} }
@ -938,7 +937,7 @@ func (ps *PeerState) PickSendVote(votes types.VoteSetReader) bool {
if vote, ok := ps.PickVoteToSend(votes); ok { if vote, ok := ps.PickVoteToSend(votes); ok {
msg := &VoteMessage{vote} msg := &VoteMessage{vote}
ps.logger.Debug("Sending vote message", "ps", ps, "vote", vote) ps.logger.Debug("Sending vote message", "ps", ps, "vote", vote)
return ps.Peer.Send(VoteChannel, struct{ ConsensusMessage }{msg}) return ps.Peer.Send(VoteChannel, cdc.MustMarshalBinaryBare(msg))
} }
return false return false
} }
@ -1261,45 +1260,27 @@ func (ps *PeerState) StringIndented(indent string) string {
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
// Messages // Messages
const (
msgTypeNewRoundStep = byte(0x01)
msgTypeCommitStep = byte(0x02)
msgTypeProposal = byte(0x11)
msgTypeProposalPOL = byte(0x12)
msgTypeBlockPart = byte(0x13) // both block & POL
msgTypeVote = byte(0x14)
msgTypeHasVote = byte(0x15)
msgTypeVoteSetMaj23 = byte(0x16)
msgTypeVoteSetBits = byte(0x17)
msgTypeProposalHeartbeat = byte(0x20)
)
// ConsensusMessage is a message that can be sent and received on the ConsensusReactor // ConsensusMessage is a message that can be sent and received on the ConsensusReactor
type ConsensusMessage interface{} type ConsensusMessage interface{}
var _ = wire.RegisterInterface( func RegisterConsensusMessages(cdc *amino.Codec) {
struct{ ConsensusMessage }{}, cdc.RegisterInterface((*ConsensusMessage)(nil), nil)
wire.ConcreteType{&NewRoundStepMessage{}, msgTypeNewRoundStep}, cdc.RegisterConcrete(&NewRoundStepMessage{}, "tendermint/NewRoundStepMessage", nil)
wire.ConcreteType{&CommitStepMessage{}, msgTypeCommitStep}, cdc.RegisterConcrete(&CommitStepMessage{}, "tendermint/CommitStep", nil)
wire.ConcreteType{&ProposalMessage{}, msgTypeProposal}, cdc.RegisterConcrete(&ProposalMessage{}, "tendermint/Proposal", nil)
wire.ConcreteType{&ProposalPOLMessage{}, msgTypeProposalPOL}, cdc.RegisterConcrete(&ProposalPOLMessage{}, "tendermint/ProposalPOL", nil)
wire.ConcreteType{&BlockPartMessage{}, msgTypeBlockPart}, cdc.RegisterConcrete(&BlockPartMessage{}, "tendermint/BlockPart", nil)
wire.ConcreteType{&VoteMessage{}, msgTypeVote}, cdc.RegisterConcrete(&VoteMessage{}, "tendermint/Vote", nil)
wire.ConcreteType{&HasVoteMessage{}, msgTypeHasVote}, cdc.RegisterConcrete(&HasVoteMessage{}, "tendermint/HasVote", nil)
wire.ConcreteType{&VoteSetMaj23Message{}, msgTypeVoteSetMaj23}, cdc.RegisterConcrete(&VoteSetMaj23Message{}, "tendermint/VoteSetMaj23", nil)
wire.ConcreteType{&VoteSetBitsMessage{}, msgTypeVoteSetBits}, cdc.RegisterConcrete(&VoteSetBitsMessage{}, "tendermint/VoteSetBits", nil)
wire.ConcreteType{&ProposalHeartbeatMessage{}, msgTypeProposalHeartbeat}, cdc.RegisterConcrete(&ProposalHeartbeatMessage{}, "tendermint/ProposalHeartbeat", nil)
) }
// DecodeMessage decodes the given bytes into a ConsensusMessage. // DecodeMessage decodes the given bytes into a ConsensusMessage.
// TODO: check for unnecessary extra bytes at the end. // TODO: check for unnecessary extra bytes at the end.
func DecodeMessage(bz []byte) (msgType byte, msg ConsensusMessage, err error) { func DecodeMessage(bz []byte) (msg ConsensusMessage, err error) {
msgType = bz[0] err = cdc.UnmarshalBinaryBare(bz, &msg)
n := new(int)
r := bytes.NewReader(bz)
msgI := wire.ReadBinary(struct{ ConsensusMessage }{}, r, maxConsensusMessageSize, n, &err)
msg = msgI.(struct{ ConsensusMessage }).ConsensusMessage
return return
} }

View File

@ -17,8 +17,7 @@ import (
"github.com/tendermint/abci/example/kvstore" "github.com/tendermint/abci/example/kvstore"
abci "github.com/tendermint/abci/types" abci "github.com/tendermint/abci/types"
crypto "github.com/tendermint/go-crypto" "github.com/tendermint/go-crypto"
wire "github.com/tendermint/go-wire"
auto "github.com/tendermint/tmlibs/autofile" auto "github.com/tendermint/tmlibs/autofile"
cmn "github.com/tendermint/tmlibs/common" cmn "github.com/tendermint/tmlibs/common"
dbm "github.com/tendermint/tmlibs/db" dbm "github.com/tendermint/tmlibs/db"
@ -27,6 +26,7 @@ import (
"github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/proxy"
sm "github.com/tendermint/tendermint/state" sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
privval "github.com/tendermint/tendermint/types/priv_validator"
"github.com/tendermint/tmlibs/log" "github.com/tendermint/tmlibs/log"
) )
@ -325,7 +325,7 @@ func testHandshakeReplay(t *testing.T, nBlocks int, mode uint) {
walFile := tempWALWithData(walBody) walFile := tempWALWithData(walBody)
config.Consensus.SetWalFile(walFile) config.Consensus.SetWalFile(walFile)
privVal := types.LoadPrivValidatorFS(config.PrivValidatorFile()) privVal := privval.LoadFilePV(config.PrivValidatorFile())
wal, err := NewWAL(walFile, false) wal, err := NewWAL(walFile, false)
if err != nil { if err != nil {
@ -519,8 +519,8 @@ func makeBlockchainFromWAL(wal WAL) ([]*types.Block, []*types.Commit, error) {
case EndHeightMessage: case EndHeightMessage:
// if its not the first one, we have a full block // if its not the first one, we have a full block
if thisBlockParts != nil { if thisBlockParts != nil {
var n int var block = new(types.Block)
block := wire.ReadBinary(&types.Block{}, thisBlockParts.GetReader(), 0, &n, &err).(*types.Block) _, err = cdc.UnmarshalBinaryReader(thisBlockParts.GetReader(), block, 0)
if err != nil { if err != nil {
panic(err) panic(err)
} }
@ -552,8 +552,8 @@ func makeBlockchainFromWAL(wal WAL) ([]*types.Block, []*types.Commit, error) {
} }
} }
// grab the last block too // grab the last block too
var n int var block = new(types.Block)
block := wire.ReadBinary(&types.Block{}, thisBlockParts.GetReader(), 0, &n, &err).(*types.Block) _, err = cdc.UnmarshalBinaryReader(thisBlockParts.GetReader(), block, 0)
if err != nil { if err != nil {
panic(err) panic(err)
} }

View File

@ -4,17 +4,14 @@ import (
"bytes" "bytes"
"errors" "errors"
"fmt" "fmt"
fail "github.com/ebuchman/fail-test"
cmn "github.com/tendermint/tmlibs/common"
"github.com/tendermint/tmlibs/log"
"reflect" "reflect"
"runtime/debug" "runtime/debug"
"sync" "sync"
"time" "time"
fail "github.com/ebuchman/fail-test"
wire "github.com/tendermint/go-wire"
cmn "github.com/tendermint/tmlibs/common"
"github.com/tendermint/tmlibs/log"
cfg "github.com/tendermint/tendermint/config" cfg "github.com/tendermint/tendermint/config"
cstypes "github.com/tendermint/tendermint/consensus/types" cstypes "github.com/tendermint/tendermint/consensus/types"
"github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/p2p"
@ -1301,7 +1298,7 @@ func (cs *ConsensusState) addProposalBlockPart(height int64, part *types.Part, v
} }
if added && cs.ProposalBlockParts.IsComplete() { if added && cs.ProposalBlockParts.IsComplete() {
// Added and completed! // Added and completed!
err = cdc.UnmarshalBinaryBare(cs.ProposalBlockParts.GetReader(), &cs.ProposalBlock, cs.state.ConsensusParams.BlockSize.MaxBytes) _, err = cdc.UnmarshalBinaryReader(cs.ProposalBlockParts.GetReader(), &cs.ProposalBlock, int64(cs.state.ConsensusParams.BlockSize.MaxBytes))
if err != nil { if err != nil {
return true, err return true, err
} }

View File

@ -409,6 +409,7 @@ func TestStateLockNoPOL(t *testing.T) {
validatePrevote(t, cs1, 1, vss[0], rs.LockedBlock.Hash()) validatePrevote(t, cs1, 1, vss[0], rs.LockedBlock.Hash())
// add a conflicting prevote from the other validator // add a conflicting prevote from the other validator
fmt.Println(">>", rs.ProposalBlock)
signAddVotes(cs1, types.VoteTypePrevote, hash, rs.ProposalBlock.MakePartSet(partSize).Header(), vs2) signAddVotes(cs1, types.VoteTypePrevote, hash, rs.ProposalBlock.MakePartSet(partSize).Header(), vs2)
<-voteCh <-voteCh

View File

@ -1,7 +1,6 @@
package consensus package consensus
import ( import (
"bytes"
"encoding/binary" "encoding/binary"
"fmt" "fmt"
"hash/crc32" "hash/crc32"
@ -11,7 +10,7 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
wire "github.com/tendermint/go-wire" "github.com/tendermint/go-amino"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
auto "github.com/tendermint/tmlibs/autofile" auto "github.com/tendermint/tmlibs/autofile"
cmn "github.com/tendermint/tmlibs/common" cmn "github.com/tendermint/tmlibs/common"
@ -38,13 +37,13 @@ type EndHeightMessage struct {
type WALMessage interface{} type WALMessage interface{}
var _ = wire.RegisterInterface( func RegisterWALMessages(cdc *amino.Codec) {
struct{ WALMessage }{}, cdc.RegisterInterface((*WALMessage)(nil), nil)
wire.ConcreteType{types.EventDataRoundState{}, 0x01}, cdc.RegisterConcrete(types.EventDataRoundState{}, "tendermint/wal/EventDataRoundState", nil)
wire.ConcreteType{msgInfo{}, 0x02}, cdc.RegisterConcrete(msgInfo{}, "tendermint/wal/MsgInfo", nil)
wire.ConcreteType{timeoutInfo{}, 0x03}, cdc.RegisterConcrete(timeoutInfo{}, "tendermint/wal/TimeoutInfo", nil)
wire.ConcreteType{EndHeightMessage{}, 0x04}, cdc.RegisterConcrete(EndHeightMessage{}, "tendermint/wal/EndHeightMessagE", nil)
) }
//-------------------------------------------------------- //--------------------------------------------------------
// Simple write-ahead logger // Simple write-ahead logger
@ -114,6 +113,7 @@ func (wal *baseWAL) OnStop() {
// called in newStep and for each pass in receiveRoutine // called in newStep and for each pass in receiveRoutine
func (wal *baseWAL) Save(msg WALMessage) { func (wal *baseWAL) Save(msg WALMessage) {
fmt.Println("!!", 1)
if wal == nil { if wal == nil {
return return
} }
@ -193,7 +193,7 @@ func (wal *baseWAL) SearchForEndHeight(height int64, options *WALSearchOptions)
// A WALEncoder writes custom-encoded WAL messages to an output stream. // A WALEncoder writes custom-encoded WAL messages to an output stream.
// //
// Format: 4 bytes CRC sum + 4 bytes length + arbitrary-length value (go-wire encoded) // Format: 4 bytes CRC sum + 4 bytes length + arbitrary-length value (go-amino encoded)
type WALEncoder struct { type WALEncoder struct {
wr io.Writer wr io.Writer
} }
@ -205,7 +205,7 @@ func NewWALEncoder(wr io.Writer) *WALEncoder {
// Encode writes the custom encoding of v to the stream. // Encode writes the custom encoding of v to the stream.
func (enc *WALEncoder) Encode(v *TimedWALMessage) error { func (enc *WALEncoder) Encode(v *TimedWALMessage) error {
data := wire.BinaryBytes(v) data := cdc.MustMarshalBinaryBare(v)
crc := crc32.Checksum(data, crc32c) crc := crc32.Checksum(data, crc32c)
length := uint32(len(data)) length := uint32(len(data))
@ -298,9 +298,8 @@ func (dec *WALDecoder) Decode() (*TimedWALMessage, error) {
return nil, DataCorruptionError{fmt.Errorf("checksums do not match: (read: %v, actual: %v)", crc, actualCRC)} return nil, DataCorruptionError{fmt.Errorf("checksums do not match: (read: %v, actual: %v)", crc, actualCRC)}
} }
var nn int var res = new(TimedWALMessage) // nolint: gosimple
var res *TimedWALMessage // nolint: gosimple err = cdc.UnmarshalBinaryBare(data, res)
res = wire.ReadBinary(&TimedWALMessage{}, bytes.NewBuffer(data), int(length), &nn, &err).(*TimedWALMessage)
if err != nil { if err != nil {
return nil, DataCorruptionError{fmt.Errorf("failed to decode data: %v", err)} return nil, DataCorruptionError{fmt.Errorf("failed to decode data: %v", err)}
} }

View File

@ -17,6 +17,7 @@ import (
"github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/proxy"
sm "github.com/tendermint/tendermint/state" sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
privval "github.com/tendermint/tendermint/types/priv_validator"
auto "github.com/tendermint/tmlibs/autofile" auto "github.com/tendermint/tmlibs/autofile"
cmn "github.com/tendermint/tmlibs/common" cmn "github.com/tendermint/tmlibs/common"
"github.com/tendermint/tmlibs/db" "github.com/tendermint/tmlibs/db"
@ -40,7 +41,7 @@ func WALWithNBlocks(numBlocks int) (data []byte, err error) {
// COPY PASTE FROM node.go WITH A FEW MODIFICATIONS // COPY PASTE FROM node.go WITH A FEW MODIFICATIONS
// NOTE: we can't import node package because of circular dependency // NOTE: we can't import node package because of circular dependency
privValidatorFile := config.PrivValidatorFile() privValidatorFile := config.PrivValidatorFile()
privValidator := types.LoadOrGenPrivValidatorFS(privValidatorFile) privValidator := privval.LoadOrGenFilePV(privValidatorFile)
genDoc, err := types.GenesisDocFromFile(config.GenesisFile()) genDoc, err := types.GenesisDocFromFile(config.GenesisFile())
if err != nil { if err != nil {
return nil, errors.Wrap(err, "failed to read genesis file") return nil, errors.Wrap(err, "failed to read genesis file")

View File

@ -3,11 +3,10 @@ package consensus
import ( import (
"bytes" "bytes"
"crypto/rand" "crypto/rand"
"sync" // "sync"
"testing" "testing"
"time" "time"
wire "github.com/tendermint/go-wire"
"github.com/tendermint/tendermint/consensus/types" "github.com/tendermint/tendermint/consensus/types"
tmtypes "github.com/tendermint/tendermint/types" tmtypes "github.com/tendermint/tendermint/types"
cmn "github.com/tendermint/tmlibs/common" cmn "github.com/tendermint/tmlibs/common"
@ -68,6 +67,7 @@ func TestWALSearchForEndHeight(t *testing.T) {
assert.Equal(t, rs.Height, h+1, cmn.Fmt("wrong height")) assert.Equal(t, rs.Height, h+1, cmn.Fmt("wrong height"))
} }
/*
var initOnce sync.Once var initOnce sync.Once
func registerInterfacesOnce() { func registerInterfacesOnce() {
@ -78,6 +78,7 @@ func registerInterfacesOnce() {
) )
}) })
} }
*/
func nBytes(n int) []byte { func nBytes(n int) []byte {
buf := make([]byte, n) buf := make([]byte, n)
@ -86,7 +87,7 @@ func nBytes(n int) []byte {
} }
func benchmarkWalDecode(b *testing.B, n int) { func benchmarkWalDecode(b *testing.B, n int) {
registerInterfacesOnce() // registerInterfacesOnce()
buf := new(bytes.Buffer) buf := new(bytes.Buffer)
enc := NewWALEncoder(buf) enc := NewWALEncoder(buf)

14
consensus/wire.go Normal file
View File

@ -0,0 +1,14 @@
package consensus
import (
"github.com/tendermint/go-amino"
"github.com/tendermint/go-crypto"
)
var cdc = amino.NewCodec()
func init() {
RegisterConsensusMessages(cdc)
RegisterWALMessages(cdc)
crypto.RegisterAmino(cdc)
}

View File

@ -94,7 +94,9 @@ func (b *Block) Hash() cmn.HexBytes {
// MakePartSet returns a PartSet containing parts of a serialized block. // MakePartSet returns a PartSet containing parts of a serialized block.
// This is the form in which the block is gossipped to peers. // This is the form in which the block is gossipped to peers.
func (b *Block) MakePartSet(partSize int) *PartSet { func (b *Block) MakePartSet(partSize int) *PartSet {
bz, err := cdc.MarshalBinaryBare(b) // We prefix the byte length, so that unmarshaling
// can easily happen via a reader.
bz, err := cdc.MarshalBinary(b)
if err != nil { if err != nil {
panic(err) panic(err)
} }

View File

@ -87,3 +87,10 @@ func (pv *MockPV) SignHeartbeat(chainID string, heartbeat *Heartbeat) error {
func (pv *MockPV) String() string { func (pv *MockPV) String() string {
return fmt.Sprintf("MockPV{%v}", pv.GetAddress()) return fmt.Sprintf("MockPV{%v}", pv.GetAddress())
} }
// XXX: Implement.
func (pv *MockPV) DisableChecks() {
// Currently this does nothing,
// as MockPV has no safety checks at all.
return
}