diff --git a/CHANGELOG.md b/CHANGELOG.md index 85072172..438a2a42 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,15 @@ # Changelog +## 0.10.2 (July 10, 2017) + +FEATURES: +- Enable lower latency block commits by adding consensus reactor sleep durations and p2p flush throttle timeout to the config + +IMPROVEMENTS: +- More detailed logging in the consensus reactor and state machine +- More in-code documentation for many exposed functions, especially in consensus/reactor.go and p2p/switch.go +- Improved readability for some function definitions and code blocks with long lines + ## 0.10.1 (June 28, 2017) FEATURES: diff --git a/cmd/tendermint/commands/root_test.go b/cmd/tendermint/commands/root_test.go index f776a024..ae0f3827 100644 --- a/cmd/tendermint/commands/root_test.go +++ b/cmd/tendermint/commands/root_test.go @@ -23,8 +23,14 @@ const ( ) // isolate provides a clean setup and returns a copy of RootCmd you can -// modify in the test cases +// modify in the test cases. +// NOTE: it unsets all TM* env variables. func isolate(cmds ...*cobra.Command) cli.Executable { + os.Unsetenv("TMHOME") + os.Unsetenv("TM_HOME") + os.Unsetenv("TMROOT") + os.Unsetenv("TM_ROOT") + viper.Reset() config = cfg.DefaultConfig() r := &cobra.Command{ diff --git a/config/config.go b/config/config.go index 42ca6099..e552b33b 100644 --- a/config/config.go +++ b/config/config.go @@ -5,9 +5,10 @@ import ( "path/filepath" "time" - "github.com/tendermint/tendermint/types" + "github.com/tendermint/tendermint/types" // TODO: remove ) +// Config defines the top level configuration for a Tendermint node type Config struct { // Top level options use an anonymous struct BaseConfig `mapstructure:",squash"` @@ -19,6 +20,7 @@ type Config struct { Consensus *ConsensusConfig `mapstructure:"consensus"` } +// DefaultConfig returns a default configuration for a Tendermint node func DefaultConfig() *Config { return &Config{ BaseConfig: DefaultBaseConfig(), @@ -29,6 +31,7 @@ func DefaultConfig() *Config { } } +// TestConfig returns a configuration that can be used for testing func TestConfig() *Config { return &Config{ BaseConfig: TestBaseConfig(), @@ -39,7 +42,7 @@ func TestConfig() *Config { } } -// Set the RootDir for all Config structs +// SetRoot sets the RootDir for all Config structs func (cfg *Config) SetRoot(root string) *Config { cfg.BaseConfig.RootDir = root cfg.RPC.RootDir = root @@ -52,7 +55,7 @@ func (cfg *Config) SetRoot(root string) *Config { //----------------------------------------------------------------------------- // BaseConfig -// BaseConfig struct for a Tendermint node +// BaseConfig defines the base configuration for a Tendermint node type BaseConfig struct { // The root directory for all data. // This should be set in viper so it can unmarshal into this struct @@ -102,6 +105,7 @@ type BaseConfig struct { DBPath string `mapstructure:"db_dir"` } +// DefaultBaseConfig returns a default base configuration for a Tendermint node func DefaultBaseConfig() BaseConfig { return BaseConfig{ Genesis: "genesis.json", @@ -119,6 +123,7 @@ func DefaultBaseConfig() BaseConfig { } } +// TestBaseConfig returns a base configuration for testing a Tendermint node func TestBaseConfig() BaseConfig { conf := DefaultBaseConfig() conf.ChainID = "tendermint_test" @@ -128,22 +133,27 @@ func TestBaseConfig() BaseConfig { return conf } +// GenesisFile returns the full path to the genesis.json file func (b BaseConfig) GenesisFile() string { return rootify(b.Genesis, b.RootDir) } +// PrivValidatorFile returns the full path to the priv_validator.json file func (b BaseConfig) PrivValidatorFile() string { return rootify(b.PrivValidator, b.RootDir) } +// DBDir returns the full path to the database directory func (b BaseConfig) DBDir() string { return rootify(b.DBPath, b.RootDir) } +// DefaultLogLevel returns a default log level of "error" func DefaultLogLevel() string { return "error" } +// DefaultPackageLogLevels returns a default log level setting so all packages log at "error", while the `state` package logs at "info" func DefaultPackageLogLevels() string { return fmt.Sprintf("state:info,*:%s", DefaultLogLevel()) } @@ -151,6 +161,7 @@ func DefaultPackageLogLevels() string { //----------------------------------------------------------------------------- // RPCConfig +// RPCConfig defines the configuration options for the Tendermint RPC server type RPCConfig struct { RootDir string `mapstructure:"home"` @@ -165,6 +176,7 @@ type RPCConfig struct { Unsafe bool `mapstructure:"unsafe"` } +// DefaultRPCConfig returns a default configuration for the RPC server func DefaultRPCConfig() *RPCConfig { return &RPCConfig{ ListenAddress: "tcp://0.0.0.0:46657", @@ -173,6 +185,7 @@ func DefaultRPCConfig() *RPCConfig { } } +// TestRPCConfig returns a configuration for testing the RPC server func TestRPCConfig() *RPCConfig { conf := DefaultRPCConfig() conf.ListenAddress = "tcp://0.0.0.0:36657" @@ -184,26 +197,47 @@ func TestRPCConfig() *RPCConfig { //----------------------------------------------------------------------------- // P2PConfig +// P2PConfig defines the configuration options for the Tendermint peer-to-peer networking layer type P2PConfig struct { - RootDir string `mapstructure:"home"` - ListenAddress string `mapstructure:"laddr"` - Seeds string `mapstructure:"seeds"` - SkipUPNP bool `mapstructure:"skip_upnp"` - AddrBook string `mapstructure:"addr_book_file"` - AddrBookStrict bool `mapstructure:"addr_book_strict"` - PexReactor bool `mapstructure:"pex"` - MaxNumPeers int `mapstructure:"max_num_peers"` + RootDir string `mapstructure:"home"` + + // Address to listen for incoming connections + ListenAddress string `mapstructure:"laddr"` + + // Comma separated list of seed nodes to connect to + Seeds string `mapstructure:"seeds"` + + // Skip UPNP port forwarding + SkipUPNP bool `mapstructure:"skip_upnp"` + + // Path to address book + AddrBook string `mapstructure:"addr_book_file"` + + // Set true for strict address routability rules + AddrBookStrict bool `mapstructure:"addr_book_strict"` + + // Set true to enable the peer-exchange reactor + PexReactor bool `mapstructure:"pex"` + + // Maximum number of peers to connect to + MaxNumPeers int `mapstructure:"max_num_peers"` + + // Time to wait before flushing messages out on the connection. In ms + FlushThrottleTimeout int `mapstructure:"flush_throttle_timeout"` } +// DefaultP2PConfig returns a default configuration for the peer-to-peer layer func DefaultP2PConfig() *P2PConfig { return &P2PConfig{ - ListenAddress: "tcp://0.0.0.0:46656", - AddrBook: "addrbook.json", - AddrBookStrict: true, - MaxNumPeers: 50, + ListenAddress: "tcp://0.0.0.0:46656", + AddrBook: "addrbook.json", + AddrBookStrict: true, + MaxNumPeers: 50, + FlushThrottleTimeout: 100, } } +// TestP2PConfig returns a configuration for testing the peer-to-peer layer func TestP2PConfig() *P2PConfig { conf := DefaultP2PConfig() conf.ListenAddress = "tcp://0.0.0.0:36656" @@ -211,6 +245,7 @@ func TestP2PConfig() *P2PConfig { return conf } +// AddrBookFile returns the full path to the address bool func (p *P2PConfig) AddrBookFile() string { return rootify(p.AddrBook, p.RootDir) } @@ -218,6 +253,7 @@ func (p *P2PConfig) AddrBookFile() string { //----------------------------------------------------------------------------- // MempoolConfig +// MempoolConfig defines the configuration options for the Tendermint mempool type MempoolConfig struct { RootDir string `mapstructure:"home"` Recheck bool `mapstructure:"recheck"` @@ -226,6 +262,7 @@ type MempoolConfig struct { WalPath string `mapstructure:"wal_dir"` } +// DefaultMempoolConfig returns a default configuration for the Tendermint mempool func DefaultMempoolConfig() *MempoolConfig { return &MempoolConfig{ Recheck: true, @@ -235,6 +272,7 @@ func DefaultMempoolConfig() *MempoolConfig { } } +// WalDir returns the full path to the mempool's write-ahead log func (m *MempoolConfig) WalDir() string { return rootify(m.WalPath, m.RootDir) } @@ -242,8 +280,8 @@ func (m *MempoolConfig) WalDir() string { //----------------------------------------------------------------------------- // ConsensusConfig -// ConsensusConfig holds timeouts and details about the WAL, the block structure, -// and timeouts in the consensus protocol. +// ConsensusConfig defines the confuguration for the Tendermint consensus service, +// including timeouts and details about the WAL and the block structure. type ConsensusConfig struct { RootDir string `mapstructure:"home"` WalPath string `mapstructure:"wal_file"` @@ -269,46 +307,64 @@ type ConsensusConfig struct { // TODO: This probably shouldn't be exposed but it makes it // easy to write tests for the wal/replay BlockPartSize int `mapstructure:"block_part_size"` + + // Reactor sleep duration parameters are in ms + PeerGossipSleepDuration int `mapstructure:"peer_gossip_sleep_duration"` + PeerQueryMaj23SleepDuration int `mapstructure:"peer_query_maj23_sleep_duration"` } -// Wait this long for a proposal +// Propose returns the amount of time to wait for a proposal func (cfg *ConsensusConfig) Propose(round int) time.Duration { return time.Duration(cfg.TimeoutPropose+cfg.TimeoutProposeDelta*round) * time.Millisecond } -// After receiving any +2/3 prevote, wait this long for stragglers +// Prevote returns the amount of time to wait for straggler votes after receiving any +2/3 prevotes func (cfg *ConsensusConfig) Prevote(round int) time.Duration { return time.Duration(cfg.TimeoutPrevote+cfg.TimeoutPrevoteDelta*round) * time.Millisecond } -// After receiving any +2/3 precommits, wait this long for stragglers +// Precommit returns the amount of time to wait for straggler votes after receiving any +2/3 precommits func (cfg *ConsensusConfig) Precommit(round int) time.Duration { return time.Duration(cfg.TimeoutPrecommit+cfg.TimeoutPrecommitDelta*round) * time.Millisecond } -// After receiving +2/3 precommits for a single block (a commit), wait this long for stragglers in the next height's RoundStepNewHeight +// Commit returns the amount of time to wait for straggler votes after receiving +2/3 precommits for a single block (ie. a commit). func (cfg *ConsensusConfig) Commit(t time.Time) time.Time { return t.Add(time.Duration(cfg.TimeoutCommit) * time.Millisecond) } +// PeerGossipSleep returns the amount of time to sleep if there is nothing to send from the ConsensusReactor +func (cfg *ConsensusConfig) PeerGossipSleep() time.Duration { + return time.Duration(cfg.PeerGossipSleepDuration) * time.Millisecond +} + +// PeerQueryMaj23Sleep returns the amount of time to sleep after each VoteSetMaj23Message is sent in the ConsensusReactor +func (cfg *ConsensusConfig) PeerQueryMaj23Sleep() time.Duration { + return time.Duration(cfg.PeerQueryMaj23SleepDuration) * time.Millisecond +} + +// DefaultConsensusConfig returns a default configuration for the consensus service func DefaultConsensusConfig() *ConsensusConfig { return &ConsensusConfig{ - WalPath: "data/cs.wal/wal", - WalLight: false, - TimeoutPropose: 3000, - TimeoutProposeDelta: 500, - TimeoutPrevote: 1000, - TimeoutPrevoteDelta: 500, - TimeoutPrecommit: 1000, - TimeoutPrecommitDelta: 500, - TimeoutCommit: 1000, - SkipTimeoutCommit: false, - MaxBlockSizeTxs: 10000, - MaxBlockSizeBytes: 1, // TODO - BlockPartSize: types.DefaultBlockPartSize, // TODO: we shouldnt be importing types + WalPath: "data/cs.wal/wal", + WalLight: false, + TimeoutPropose: 3000, + TimeoutProposeDelta: 500, + TimeoutPrevote: 1000, + TimeoutPrevoteDelta: 500, + TimeoutPrecommit: 1000, + TimeoutPrecommitDelta: 500, + TimeoutCommit: 1000, + SkipTimeoutCommit: false, + MaxBlockSizeTxs: 10000, + MaxBlockSizeBytes: 1, // TODO + BlockPartSize: types.DefaultBlockPartSize, // TODO: we shouldnt be importing types + PeerGossipSleepDuration: 100, + PeerQueryMaj23SleepDuration: 2000, } } +// TestConsensusConfig returns a configuration for testing the consensus service func TestConsensusConfig() *ConsensusConfig { config := DefaultConsensusConfig() config.TimeoutPropose = 2000 @@ -322,6 +378,7 @@ func TestConsensusConfig() *ConsensusConfig { return config } +// WalFile returns the full path to the write-ahead log file func (c *ConsensusConfig) WalFile() string { if c.walFile != "" { return c.walFile @@ -329,6 +386,7 @@ func (c *ConsensusConfig) WalFile() string { return rootify(c.WalPath, c.RootDir) } +// SetWalFile sets the path to the write-ahead log file func (c *ConsensusConfig) SetWalFile(walFile string) { c.walFile = walFile } diff --git a/consensus/reactor.go b/consensus/reactor.go index 50207ed5..1d8155c1 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -9,10 +9,12 @@ import ( "time" wire "github.com/tendermint/go-wire" + cmn "github.com/tendermint/tmlibs/common" + "github.com/tendermint/tmlibs/log" + "github.com/tendermint/tendermint/p2p" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" - . "github.com/tendermint/tmlibs/common" ) const ( @@ -21,13 +23,12 @@ const ( VoteChannel = byte(0x22) VoteSetBitsChannel = byte(0x23) - peerGossipSleepDuration = 100 * time.Millisecond // Time to sleep if there's nothing to send. - peerQueryMaj23SleepDuration = 2 * time.Second // Time to sleep after each VoteSetMaj23Message sent - maxConsensusMessageSize = 1048576 // 1MB; NOTE: keep in sync with types.PartSet sizes. + maxConsensusMessageSize = 1048576 // 1MB; NOTE/TODO: keep in sync with types.PartSet sizes. ) //----------------------------------------------------------------------------- +// ConsensusReactor defines a reactor for the consensus service. type ConsensusReactor struct { p2p.BaseReactor // BaseService + p2p.Switch @@ -36,6 +37,7 @@ type ConsensusReactor struct { evsw types.EventSwitch } +// NewConsensusReactor returns a new ConsensusReactor with the given consensusState. func NewConsensusReactor(consensusState *ConsensusState, fastSync bool) *ConsensusReactor { conR := &ConsensusReactor{ conS: consensusState, @@ -45,6 +47,7 @@ func NewConsensusReactor(consensusState *ConsensusState, fastSync bool) *Consens return conR } +// OnStart implements BaseService. func (conR *ConsensusReactor) OnStart() error { conR.Logger.Info("ConsensusReactor ", "fastSync", conR.fastSync) conR.BaseReactor.OnStart() @@ -62,13 +65,14 @@ func (conR *ConsensusReactor) OnStart() error { return nil } +// OnStop implements BaseService func (conR *ConsensusReactor) OnStop() { conR.BaseReactor.OnStop() conR.conS.Stop() } -// Switch from the fast_sync to the consensus: -// reset the state, turn off fast_sync, start the consensus-state-machine +// SwitchToConsensus switches from fast_sync mode to consensus mode. +// It resets the state, turns off fast_sync, and starts the consensus state-machine func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State) { conR.Logger.Info("SwitchToConsensus") conR.conS.reconstructLastCommit(state) @@ -79,7 +83,7 @@ func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State) { conR.conS.Start() } -// Implements Reactor +// GetChannels implements Reactor func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor { // TODO optimize return []*p2p.ChannelDescriptor{ @@ -109,7 +113,7 @@ func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor { } } -// Implements Reactor +// AddPeer implements Reactor func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) { if !conR.IsRunning() { return @@ -131,7 +135,7 @@ func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) { } } -// Implements Reactor +// RemovePeer implements Reactor func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) { if !conR.IsRunning() { return @@ -140,7 +144,7 @@ func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) { //peer.Data.Get(PeerStateKey).(*PeerState).Disconnect() } -// Implements Reactor +// Receive implements Reactor // NOTE: We process these messages even when we're fast_syncing. // Messages affect either a peer state or the consensus state. // Peer state updates can happen in parallel, but processing of @@ -184,7 +188,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) votes.SetPeerMaj23(msg.Round, msg.Type, ps.Peer.Key, msg.BlockID) // Respond with a VoteSetBitsMessage showing which votes we have. // (and consequently shows which we don't have) - var ourVotes *BitArray + var ourVotes *cmn.BitArray switch msg.Type { case types.VoteTypePrevote: ourVotes = votes.Prevotes(msg.Round).BitArrayByBlockID(msg.BlockID) @@ -202,7 +206,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) Votes: ourVotes, }}) default: - conR.Logger.Error(Fmt("Unknown message type %v", reflect.TypeOf(msg))) + conR.Logger.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg))) } case DataChannel: @@ -220,7 +224,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Index) conR.conS.peerMsgQueue <- msgInfo{msg, src.Key} default: - conR.Logger.Error(Fmt("Unknown message type %v", reflect.TypeOf(msg))) + conR.Logger.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg))) } case VoteChannel: @@ -242,7 +246,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) default: // don't punish (leave room for soft upgrades) - conR.Logger.Error(Fmt("Unknown message type %v", reflect.TypeOf(msg))) + conR.Logger.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg))) } case VoteSetBitsChannel: @@ -258,7 +262,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) cs.mtx.Unlock() if height == msg.Height { - var ourVotes *BitArray + var ourVotes *cmn.BitArray switch msg.Type { case types.VoteTypePrevote: ourVotes = votes.Prevotes(msg.Round).BitArrayByBlockID(msg.BlockID) @@ -274,11 +278,11 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) } default: // don't punish (leave room for soft upgrades) - conR.Logger.Error(Fmt("Unknown message type %v", reflect.TypeOf(msg))) + conR.Logger.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg))) } default: - conR.Logger.Error(Fmt("Unknown chId %X", chID)) + conR.Logger.Error(cmn.Fmt("Unknown chId %X", chID)) } if err != nil { @@ -286,7 +290,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) } } -// implements events.Eventable +// SetEventSwitch implements events.Eventable func (conR *ConsensusReactor) SetEventSwitch(evsw types.EventSwitch) { conR.evsw = evsw conR.conS.SetEventSwitch(evsw) @@ -390,7 +394,6 @@ OUTER_LOOP: // Send proposal Block parts? if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockPartsHeader) { - //logger.Info("ProposalBlockParts matched", "blockParts", prs.ProposalBlockParts) if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockParts.Copy()).PickRandom(); ok { part := rs.ProposalBlockParts.GetPart(index) msg := &BlockPartMessage{ @@ -398,6 +401,7 @@ OUTER_LOOP: Round: rs.Round, // This tells peer that this part applies to us. Part: part, } + logger.Debug("Sending block part", "height", prs.Height, "round", prs.Round) if peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) { ps.SetHasProposalBlockPart(prs.Height, prs.Round, index) } @@ -407,49 +411,15 @@ OUTER_LOOP: // If the peer is on a previous height, help catch up. if (0 < prs.Height) && (prs.Height < rs.Height) { - //logger.Info("Data catchup", "height", rs.Height, "peerHeight", prs.Height, "peerProposalBlockParts", prs.ProposalBlockParts) - if index, ok := prs.ProposalBlockParts.Not().PickRandom(); ok { - // Ensure that the peer's PartSetHeader is correct - blockMeta := conR.conS.blockStore.LoadBlockMeta(prs.Height) - if blockMeta == nil { - logger.Error("Failed to load block meta", "peer height", prs.Height, "ourHeight", rs.Height, "blockstoreHeight", conR.conS.blockStore.Height(), "pv", conR.conS.privValidator) - time.Sleep(peerGossipSleepDuration) - continue OUTER_LOOP - } else if !blockMeta.BlockID.PartsHeader.Equals(prs.ProposalBlockPartsHeader) { - logger.Info("Peer ProposalBlockPartsHeader mismatch, sleeping", - "peerHeight", prs.Height, "blockPartsHeader", blockMeta.BlockID.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader) - time.Sleep(peerGossipSleepDuration) - continue OUTER_LOOP - } - // Load the part - part := conR.conS.blockStore.LoadBlockPart(prs.Height, index) - if part == nil { - logger.Error("Could not load part", "index", index, - "peerHeight", prs.Height, "blockPartsHeader", blockMeta.BlockID.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader) - time.Sleep(peerGossipSleepDuration) - continue OUTER_LOOP - } - // Send the part - msg := &BlockPartMessage{ - Height: prs.Height, // Not our height, so it doesn't matter. - Round: prs.Round, // Not our height, so it doesn't matter. - Part: part, - } - if peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) { - ps.SetHasProposalBlockPart(prs.Height, prs.Round, index) - } - continue OUTER_LOOP - } else { - //logger.Info("No parts to send in catch-up, sleeping") - time.Sleep(peerGossipSleepDuration) - continue OUTER_LOOP - } + heightLogger := logger.With("height", prs.Height) + conR.gossipDataForCatchup(heightLogger, rs, prs, ps, peer) + continue OUTER_LOOP } // If height and round don't match, sleep. if (rs.Height != prs.Height) || (rs.Round != prs.Round) { //logger.Info("Peer Height|Round mismatch, sleeping", "peerHeight", prs.Height, "peerRound", prs.Round, "peer", peer) - time.Sleep(peerGossipSleepDuration) + time.Sleep(conR.conS.config.PeerGossipSleep()) continue OUTER_LOOP } @@ -463,6 +433,7 @@ OUTER_LOOP: // Proposal: share the proposal metadata with peer. { msg := &ProposalMessage{Proposal: rs.Proposal} + logger.Debug("Sending proposal", "height", prs.Height, "round", prs.Round) if peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) { ps.SetHasProposal(rs.Proposal) } @@ -477,17 +448,61 @@ OUTER_LOOP: ProposalPOLRound: rs.Proposal.POLRound, ProposalPOL: rs.Votes.Prevotes(rs.Proposal.POLRound).BitArray(), } + logger.Debug("Sending POL", "height", prs.Height, "round", prs.Round) peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) } continue OUTER_LOOP } // Nothing to do. Sleep. - time.Sleep(peerGossipSleepDuration) + time.Sleep(conR.conS.config.PeerGossipSleep()) continue OUTER_LOOP } } +func (conR *ConsensusReactor) gossipDataForCatchup(logger log.Logger, rs *RoundState, + prs *PeerRoundState, ps *PeerState, peer *p2p.Peer) { + + if index, ok := prs.ProposalBlockParts.Not().PickRandom(); ok { + // Ensure that the peer's PartSetHeader is correct + blockMeta := conR.conS.blockStore.LoadBlockMeta(prs.Height) + if blockMeta == nil { + logger.Error("Failed to load block meta", + "ourHeight", rs.Height, "blockstoreHeight", conR.conS.blockStore.Height()) + time.Sleep(conR.conS.config.PeerGossipSleep()) + return + } else if !blockMeta.BlockID.PartsHeader.Equals(prs.ProposalBlockPartsHeader) { + logger.Info("Peer ProposalBlockPartsHeader mismatch, sleeping", + "blockPartsHeader", blockMeta.BlockID.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader) + time.Sleep(conR.conS.config.PeerGossipSleep()) + return + } + // Load the part + part := conR.conS.blockStore.LoadBlockPart(prs.Height, index) + if part == nil { + logger.Error("Could not load part", "index", index, + "blockPartsHeader", blockMeta.BlockID.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader) + time.Sleep(conR.conS.config.PeerGossipSleep()) + return + } + // Send the part + msg := &BlockPartMessage{ + Height: prs.Height, // Not our height, so it doesn't matter. + Round: prs.Round, // Not our height, so it doesn't matter. + Part: part, + } + logger.Debug("Sending block part for catchup", "round", prs.Round) + if peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) { + ps.SetHasProposalBlockPart(prs.Height, prs.Round, index) + } + return + } else { + //logger.Info("No parts to send in catch-up, sleeping") + time.Sleep(conR.conS.config.PeerGossipSleep()) + return + } +} + func (conR *ConsensusReactor) gossipVotesRoutine(peer *p2p.Peer, ps *PeerState) { logger := conR.Logger.With("peer", peer) @@ -516,35 +531,9 @@ OUTER_LOOP: // If height matches, then send LastCommit, Prevotes, Precommits. if rs.Height == prs.Height { - // If there are lastCommits to send... - if prs.Step == RoundStepNewHeight { - if ps.PickSendVote(rs.LastCommit) { - logger.Debug("Picked rs.LastCommit to send") - continue OUTER_LOOP - } - } - // If there are prevotes to send... - if prs.Step <= RoundStepPrevote && prs.Round != -1 && prs.Round <= rs.Round { - if ps.PickSendVote(rs.Votes.Prevotes(prs.Round)) { - logger.Debug("Picked rs.Prevotes(prs.Round) to send") - continue OUTER_LOOP - } - } - // If there are precommits to send... - if prs.Step <= RoundStepPrecommit && prs.Round != -1 && prs.Round <= rs.Round { - if ps.PickSendVote(rs.Votes.Precommits(prs.Round)) { - logger.Debug("Picked rs.Precommits(prs.Round) to send") - continue OUTER_LOOP - } - } - // If there are POLPrevotes to send... - if prs.ProposalPOLRound != -1 { - if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil { - if ps.PickSendVote(polPrevotes) { - logger.Debug("Picked rs.Prevotes(prs.ProposalPOLRound) to send") - continue OUTER_LOOP - } - } + heightLogger := logger.With("height", prs.Height) + if conR.gossipVotesForHeight(heightLogger, rs, prs, ps) { + continue OUTER_LOOP } } @@ -552,7 +541,7 @@ OUTER_LOOP: // If peer is lagging by height 1, send LastCommit. if prs.Height != 0 && rs.Height == prs.Height+1 { if ps.PickSendVote(rs.LastCommit) { - logger.Debug("Picked rs.LastCommit to send") + logger.Debug("Picked rs.LastCommit to send", "height", prs.Height) continue OUTER_LOOP } } @@ -565,7 +554,7 @@ OUTER_LOOP: commit := conR.conS.blockStore.LoadBlockCommit(prs.Height) logger.Info("Loaded BlockCommit for catch-up", "height", prs.Height, "commit", commit) if ps.PickSendVote(commit) { - logger.Debug("Picked Catchup commit to send") + logger.Debug("Picked Catchup commit to send", "height", prs.Height) continue OUTER_LOOP } } @@ -573,7 +562,7 @@ OUTER_LOOP: if sleeping == 0 { // We sent nothing. Sleep... sleeping = 1 - logger.Debug("No votes to send, sleeping", + logger.Debug("No votes to send, sleeping", "rs.Height", rs.Height, "prs.Height", prs.Height, "localPV", rs.Votes.Prevotes(rs.Round).BitArray(), "peerPV", prs.Prevotes, "localPC", rs.Votes.Precommits(rs.Round).BitArray(), "peerPC", prs.Precommits) } else if sleeping == 2 { @@ -581,11 +570,47 @@ OUTER_LOOP: sleeping = 1 } - time.Sleep(peerGossipSleepDuration) + time.Sleep(conR.conS.config.PeerGossipSleep()) continue OUTER_LOOP } } +func (conR *ConsensusReactor) gossipVotesForHeight(logger log.Logger, rs *RoundState, prs *PeerRoundState, ps *PeerState) bool { + + // If there are lastCommits to send... + if prs.Step == RoundStepNewHeight { + if ps.PickSendVote(rs.LastCommit) { + logger.Debug("Picked rs.LastCommit to send") + return true + } + } + // If there are prevotes to send... + if prs.Step <= RoundStepPrevote && prs.Round != -1 && prs.Round <= rs.Round { + if ps.PickSendVote(rs.Votes.Prevotes(prs.Round)) { + logger.Debug("Picked rs.Prevotes(prs.Round) to send", "round", prs.Round) + return true + } + } + // If there are precommits to send... + if prs.Step <= RoundStepPrecommit && prs.Round != -1 && prs.Round <= rs.Round { + if ps.PickSendVote(rs.Votes.Precommits(prs.Round)) { + logger.Debug("Picked rs.Precommits(prs.Round) to send", "round", prs.Round) + return true + } + } + // If there are POLPrevotes to send... + if prs.ProposalPOLRound != -1 { + if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil { + if ps.PickSendVote(polPrevotes) { + logger.Debug("Picked rs.Prevotes(prs.ProposalPOLRound) to send", + "round", prs.ProposalPOLRound) + return true + } + } + } + return false +} + // NOTE: `queryMaj23Routine` has a simple crude design since it only comes // into play for liveness when there's a signature DDoS attack happening. func (conR *ConsensusReactor) queryMaj23Routine(peer *p2p.Peer, ps *PeerState) { @@ -611,7 +636,7 @@ OUTER_LOOP: Type: types.VoteTypePrevote, BlockID: maj23, }}) - time.Sleep(peerQueryMaj23SleepDuration) + time.Sleep(conR.conS.config.PeerQueryMaj23Sleep()) } } } @@ -628,7 +653,7 @@ OUTER_LOOP: Type: types.VoteTypePrecommit, BlockID: maj23, }}) - time.Sleep(peerQueryMaj23SleepDuration) + time.Sleep(conR.conS.config.PeerQueryMaj23Sleep()) } } } @@ -645,7 +670,7 @@ OUTER_LOOP: Type: types.VoteTypePrevote, BlockID: maj23, }}) - time.Sleep(peerQueryMaj23SleepDuration) + time.Sleep(conR.conS.config.PeerQueryMaj23Sleep()) } } } @@ -664,21 +689,25 @@ OUTER_LOOP: Type: types.VoteTypePrecommit, BlockID: commit.BlockID, }}) - time.Sleep(peerQueryMaj23SleepDuration) + time.Sleep(conR.conS.config.PeerQueryMaj23Sleep()) } } - time.Sleep(peerQueryMaj23SleepDuration) + time.Sleep(conR.conS.config.PeerQueryMaj23Sleep()) continue OUTER_LOOP } } +// String returns a string representation of the ConsensusReactor. +// NOTE: For now, it is just a hard-coded string to avoid accessing unprotected shared variables. +// TODO: improve! func (conR *ConsensusReactor) String() string { // better not to access shared variables return "ConsensusReactor" // conR.StringIndented("") } +// StringIndented returns an indented string representation of the ConsensusReactor func (conR *ConsensusReactor) StringIndented(indent string) string { s := "ConsensusReactor{\n" s += indent + " " + conR.conS.StringIndented(indent+" ") + "\n" @@ -692,7 +721,8 @@ func (conR *ConsensusReactor) StringIndented(indent string) string { //----------------------------------------------------------------------------- -// Read only when returned by PeerState.GetRoundState(). +// PeerRoundState contains the known state of a peer. +// NOTE: Read-only when returned by PeerState.GetRoundState(). type PeerRoundState struct { Height int // Height peer is at Round int // Round peer is at, -1 if unknown. @@ -700,21 +730,23 @@ type PeerRoundState struct { StartTime time.Time // Estimated start of round 0 at this height Proposal bool // True if peer has proposal for this round ProposalBlockPartsHeader types.PartSetHeader // - ProposalBlockParts *BitArray // + ProposalBlockParts *cmn.BitArray // ProposalPOLRound int // Proposal's POL round. -1 if none. - ProposalPOL *BitArray // nil until ProposalPOLMessage received. - Prevotes *BitArray // All votes peer has for this round - Precommits *BitArray // All precommits peer has for this round + ProposalPOL *cmn.BitArray // nil until ProposalPOLMessage received. + Prevotes *cmn.BitArray // All votes peer has for this round + Precommits *cmn.BitArray // All precommits peer has for this round LastCommitRound int // Round of commit for last height. -1 if none. - LastCommit *BitArray // All commit precommits of commit for last height. + LastCommit *cmn.BitArray // All commit precommits of commit for last height. CatchupCommitRound int // Round that we have commit for. Not necessarily unique. -1 if none. - CatchupCommit *BitArray // All commit precommits peer has for this height & CatchupCommitRound + CatchupCommit *cmn.BitArray // All commit precommits peer has for this height & CatchupCommitRound } +// String returns a string representation of the PeerRoundState func (prs PeerRoundState) String() string { return prs.StringIndented("") } +// StringIndented returns a string representation of the PeerRoundState func (prs PeerRoundState) StringIndented(indent string) string { return fmt.Sprintf(`PeerRoundState{ %s %v/%v/%v @%v @@ -742,6 +774,8 @@ var ( ErrPeerStateInvalidStartTime = errors.New("Error peer state invalid startTime") ) +// PeerState contains the known state of a peer, including its connection +// and threadsafe access to its PeerRoundState. type PeerState struct { Peer *p2p.Peer @@ -749,6 +783,7 @@ type PeerState struct { PeerRoundState } +// NewPeerState returns a new PeerState for the given Peer func NewPeerState(peer *p2p.Peer) *PeerState { return &PeerState{ Peer: peer, @@ -761,7 +796,7 @@ func NewPeerState(peer *p2p.Peer) *PeerState { } } -// Returns an atomic snapshot of the PeerRoundState. +// GetRoundState returns an atomic snapshot of the PeerRoundState. // There's no point in mutating it since it won't change PeerState. func (ps *PeerState) GetRoundState() *PeerRoundState { ps.mtx.Lock() @@ -771,7 +806,7 @@ func (ps *PeerState) GetRoundState() *PeerRoundState { return &prs } -// Returns an atomic snapshot of the PeerRoundState's height +// GetHeight returns an atomic snapshot of the PeerRoundState's height // used by the mempool to ensure peers are caught up before broadcasting new txs func (ps *PeerState) GetHeight() int { ps.mtx.Lock() @@ -779,6 +814,7 @@ func (ps *PeerState) GetHeight() int { return ps.PeerRoundState.Height } +// SetHasProposal sets the given proposal as known for the peer. func (ps *PeerState) SetHasProposal(proposal *types.Proposal) { ps.mtx.Lock() defer ps.mtx.Unlock() @@ -792,11 +828,12 @@ func (ps *PeerState) SetHasProposal(proposal *types.Proposal) { ps.Proposal = true ps.ProposalBlockPartsHeader = proposal.BlockPartsHeader - ps.ProposalBlockParts = NewBitArray(proposal.BlockPartsHeader.Total) + ps.ProposalBlockParts = cmn.NewBitArray(proposal.BlockPartsHeader.Total) ps.ProposalPOLRound = proposal.POLRound ps.ProposalPOL = nil // Nil until ProposalPOLMessage received. } +// SetHasProposalBlockPart sets the given block part index as known for the peer. func (ps *PeerState) SetHasProposalBlockPart(height int, round int, index int) { ps.mtx.Lock() defer ps.mtx.Unlock() @@ -808,9 +845,9 @@ func (ps *PeerState) SetHasProposalBlockPart(height int, round int, index int) { ps.ProposalBlockParts.SetIndex(index, true) } -// PickVoteToSend sends vote to peer. +// PickSendVote picks a vote and sends it to the peer. // Returns true if vote was sent. -func (ps *PeerState) PickSendVote(votes types.VoteSetReader) (ok bool) { +func (ps *PeerState) PickSendVote(votes types.VoteSetReader) bool { if vote, ok := ps.PickVoteToSend(votes); ok { msg := &VoteMessage{vote} return ps.Peer.Send(VoteChannel, struct{ ConsensusMessage }{msg}) @@ -818,7 +855,9 @@ func (ps *PeerState) PickSendVote(votes types.VoteSetReader) (ok bool) { return false } -// votes: Must be the correct Size() for the Height(). +// PickVoteToSend picks a vote to send to the peer. +// Returns true if a vote was picked. +// NOTE: `votes` must be the correct Size() for the Height(). func (ps *PeerState) PickVoteToSend(votes types.VoteSetReader) (vote *types.Vote, ok bool) { ps.mtx.Lock() defer ps.mtx.Unlock() @@ -846,9 +885,9 @@ func (ps *PeerState) PickVoteToSend(votes types.VoteSetReader) (vote *types.Vote return nil, false } -func (ps *PeerState) getVoteBitArray(height, round int, type_ byte) *BitArray { +func (ps *PeerState) getVoteBitArray(height, round int, type_ byte) *cmn.BitArray { if !types.IsVoteTypeValid(type_) { - PanicSanity("Invalid vote type") + cmn.PanicSanity("Invalid vote type") } if ps.Height == height { @@ -901,7 +940,7 @@ func (ps *PeerState) ensureCatchupCommitRound(height, round int, numValidators i NOTE: This is wrong, 'round' could change. e.g. if orig round is not the same as block LastCommit round. if ps.CatchupCommitRound != -1 && ps.CatchupCommitRound != round { - PanicSanity(Fmt("Conflicting CatchupCommitRound. Height: %v, Orig: %v, New: %v", height, ps.CatchupCommitRound, round)) + cmn.PanicSanity(cmn.Fmt("Conflicting CatchupCommitRound. Height: %v, Orig: %v, New: %v", height, ps.CatchupCommitRound, round)) } */ if ps.CatchupCommitRound == round { @@ -911,10 +950,12 @@ func (ps *PeerState) ensureCatchupCommitRound(height, round int, numValidators i if round == ps.Round { ps.CatchupCommit = ps.Precommits } else { - ps.CatchupCommit = NewBitArray(numValidators) + ps.CatchupCommit = cmn.NewBitArray(numValidators) } } +// EnsureVoteVitArrays ensures the bit-arrays have been allocated for tracking +// what votes this peer has received. // NOTE: It's important to make sure that numValidators actually matches // what the node sees as the number of validators for height. func (ps *PeerState) EnsureVoteBitArrays(height int, numValidators int) { @@ -926,24 +967,25 @@ func (ps *PeerState) EnsureVoteBitArrays(height int, numValidators int) { func (ps *PeerState) ensureVoteBitArrays(height int, numValidators int) { if ps.Height == height { if ps.Prevotes == nil { - ps.Prevotes = NewBitArray(numValidators) + ps.Prevotes = cmn.NewBitArray(numValidators) } if ps.Precommits == nil { - ps.Precommits = NewBitArray(numValidators) + ps.Precommits = cmn.NewBitArray(numValidators) } if ps.CatchupCommit == nil { - ps.CatchupCommit = NewBitArray(numValidators) + ps.CatchupCommit = cmn.NewBitArray(numValidators) } if ps.ProposalPOL == nil { - ps.ProposalPOL = NewBitArray(numValidators) + ps.ProposalPOL = cmn.NewBitArray(numValidators) } } else if ps.Height == height+1 { if ps.LastCommit == nil { - ps.LastCommit = NewBitArray(numValidators) + ps.LastCommit = cmn.NewBitArray(numValidators) } } } +// SetHasVote sets the given vote as known by the peer func (ps *PeerState) SetHasVote(vote *types.Vote) { ps.mtx.Lock() defer ps.mtx.Unlock() @@ -959,6 +1001,7 @@ func (ps *PeerState) setHasVote(height int, round int, type_ byte, index int) { ps.getVoteBitArray(height, round, type_).SetIndex(index, true) } +// ApplyNewRoundStepMessage updates the peer state for the new round. func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage) { ps.mtx.Lock() defer ps.mtx.Unlock() @@ -1012,6 +1055,7 @@ func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage) { } } +// ApplyCommitStepMessage updates the peer state for the new commit. func (ps *PeerState) ApplyCommitStepMessage(msg *CommitStepMessage) { ps.mtx.Lock() defer ps.mtx.Unlock() @@ -1024,6 +1068,7 @@ func (ps *PeerState) ApplyCommitStepMessage(msg *CommitStepMessage) { ps.ProposalBlockParts = msg.BlockParts } +// ApplyProposalPOLMessage updates the peer state for the new proposal POL. func (ps *PeerState) ApplyProposalPOLMessage(msg *ProposalPOLMessage) { ps.mtx.Lock() defer ps.mtx.Unlock() @@ -1040,6 +1085,7 @@ func (ps *PeerState) ApplyProposalPOLMessage(msg *ProposalPOLMessage) { ps.ProposalPOL = msg.ProposalPOL } +// ApplyHasVoteMessage updates the peer state for the new vote. func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) { ps.mtx.Lock() defer ps.mtx.Unlock() @@ -1051,12 +1097,12 @@ func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) { ps.setHasVote(msg.Height, msg.Round, msg.Type, msg.Index) } -// The peer has responded with a bitarray of votes that it has -// of the corresponding BlockID. -// ourVotes: BitArray of votes we have for msg.BlockID +// ApplyVoteSetBitsMessage updates the peer state for the bit-array of votes +// it claims to have for the corresponding BlockID. +// `ourVotes` is a BitArray of votes we have for msg.BlockID // NOTE: if ourVotes is nil (e.g. msg.Height < rs.Height), // we conservatively overwrite ps's votes w/ msg.Votes. -func (ps *PeerState) ApplyVoteSetBitsMessage(msg *VoteSetBitsMessage, ourVotes *BitArray) { +func (ps *PeerState) ApplyVoteSetBitsMessage(msg *VoteSetBitsMessage, ourVotes *cmn.BitArray) { ps.mtx.Lock() defer ps.mtx.Unlock() @@ -1072,10 +1118,12 @@ func (ps *PeerState) ApplyVoteSetBitsMessage(msg *VoteSetBitsMessage, ourVotes * } } +// String returns a string representation of the PeerState func (ps *PeerState) String() string { return ps.StringIndented("") } +// StringIndented returns a string representation of the PeerState func (ps *PeerState) StringIndented(indent string) string { return fmt.Sprintf(`PeerState{ %s Key %v @@ -1101,6 +1149,7 @@ const ( msgTypeVoteSetBits = byte(0x17) ) +// ConsensusMessage is a message that can be sent and received on the ConsensusReactor type ConsensusMessage interface{} var _ = wire.RegisterInterface( @@ -1116,17 +1165,20 @@ var _ = wire.RegisterInterface( wire.ConcreteType{&VoteSetBitsMessage{}, msgTypeVoteSetBits}, ) +// DecodeMessage decodes the given bytes into a ConsensusMessage. // TODO: check for unnecessary extra bytes at the end. func DecodeMessage(bz []byte) (msgType byte, msg ConsensusMessage, err error) { msgType = bz[0] n := new(int) r := bytes.NewReader(bz) - msg = wire.ReadBinary(struct{ ConsensusMessage }{}, r, maxConsensusMessageSize, n, &err).(struct{ ConsensusMessage }).ConsensusMessage + msgI := wire.ReadBinary(struct{ ConsensusMessage }{}, r, maxConsensusMessageSize, n, &err) + msg = msgI.(struct{ ConsensusMessage }).ConsensusMessage return } //------------------------------------- +// NewRoundStepMessage is sent for every step taken in the ConsensusState. // For every height/round/step transition type NewRoundStepMessage struct { Height int @@ -1136,6 +1188,7 @@ type NewRoundStepMessage struct { LastCommitRound int } +// String returns a string representation. func (m *NewRoundStepMessage) String() string { return fmt.Sprintf("[NewRoundStep H:%v R:%v S:%v LCR:%v]", m.Height, m.Round, m.Step, m.LastCommitRound) @@ -1143,62 +1196,73 @@ func (m *NewRoundStepMessage) String() string { //------------------------------------- +// CommitStepMessage is sent when a block is committed. type CommitStepMessage struct { Height int BlockPartsHeader types.PartSetHeader - BlockParts *BitArray + BlockParts *cmn.BitArray } +// String returns a string representation. func (m *CommitStepMessage) String() string { return fmt.Sprintf("[CommitStep H:%v BP:%v BA:%v]", m.Height, m.BlockPartsHeader, m.BlockParts) } //------------------------------------- +// ProposalMessage is sent when a new block is proposed. type ProposalMessage struct { Proposal *types.Proposal } +// String returns a string representation. func (m *ProposalMessage) String() string { return fmt.Sprintf("[Proposal %v]", m.Proposal) } //------------------------------------- +// ProposalPOLMessage is sent when a previous proposal is re-proposed. type ProposalPOLMessage struct { Height int ProposalPOLRound int - ProposalPOL *BitArray + ProposalPOL *cmn.BitArray } +// String returns a string representation. func (m *ProposalPOLMessage) String() string { return fmt.Sprintf("[ProposalPOL H:%v POLR:%v POL:%v]", m.Height, m.ProposalPOLRound, m.ProposalPOL) } //------------------------------------- +// BlockPartMessage is sent when gossipping a piece of the proposed block. type BlockPartMessage struct { Height int Round int Part *types.Part } +// String returns a string representation. func (m *BlockPartMessage) String() string { return fmt.Sprintf("[BlockPart H:%v R:%v P:%v]", m.Height, m.Round, m.Part) } //------------------------------------- +// VoteMessage is sent when voting for a proposal (or lack thereof). type VoteMessage struct { Vote *types.Vote } +// String returns a string representation. func (m *VoteMessage) String() string { return fmt.Sprintf("[Vote %v]", m.Vote) } //------------------------------------- +// HasVoteMessage is sent to indicate that a particular vote has been received. type HasVoteMessage struct { Height int Round int @@ -1206,12 +1270,14 @@ type HasVoteMessage struct { Index int } +// String returns a string representation. func (m *HasVoteMessage) String() string { return fmt.Sprintf("[HasVote VI:%v V:{%v/%02d/%v} VI:%v]", m.Index, m.Height, m.Round, m.Type, m.Index) } //------------------------------------- +// VoteSetMaj23Message is sent to indicate that a given BlockID has seen +2/3 votes. type VoteSetMaj23Message struct { Height int Round int @@ -1219,20 +1285,23 @@ type VoteSetMaj23Message struct { BlockID types.BlockID } +// String returns a string representation. func (m *VoteSetMaj23Message) String() string { return fmt.Sprintf("[VSM23 %v/%02d/%v %v]", m.Height, m.Round, m.Type, m.BlockID) } //------------------------------------- +// VoteSetBitsMessage is sent to communicate the bit-array of votes seen for the BlockID. type VoteSetBitsMessage struct { Height int Round int Type byte BlockID types.BlockID - Votes *BitArray + Votes *cmn.BitArray } +// String returns a string representation. func (m *VoteSetBitsMessage) String() string { return fmt.Sprintf("[VSB %v/%02d/%v %v %v]", m.Height, m.Round, m.Type, m.BlockID, m.Votes) } diff --git a/consensus/state.go b/consensus/state.go index cc9cd51e..98255186 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -912,16 +912,17 @@ func (cs *ConsensusState) enterPrevote(height int, round int) { } func (cs *ConsensusState) defaultDoPrevote(height int, round int) { + logger := cs.Logger.With("height", height, "round", round) // If a block is locked, prevote that. if cs.LockedBlock != nil { - cs.Logger.Info("enterPrevote: Block was locked") + logger.Info("enterPrevote: Block was locked") cs.signAddVote(types.VoteTypePrevote, cs.LockedBlock.Hash(), cs.LockedBlockParts.Header()) return } // If ProposalBlock is nil, prevote nil. if cs.ProposalBlock == nil { - cs.Logger.Info("enterPrevote: ProposalBlock is nil") + logger.Info("enterPrevote: ProposalBlock is nil") cs.signAddVote(types.VoteTypePrevote, nil, types.PartSetHeader{}) return } @@ -930,7 +931,7 @@ func (cs *ConsensusState) defaultDoPrevote(height int, round int) { err := cs.state.ValidateBlock(cs.ProposalBlock) if err != nil { // ProposalBlock is invalid, prevote nil. - cs.Logger.Error("enterPrevote: ProposalBlock is invalid", "err", err) + logger.Error("enterPrevote: ProposalBlock is invalid", "err", err) cs.signAddVote(types.VoteTypePrevote, nil, types.PartSetHeader{}) return } @@ -938,6 +939,7 @@ func (cs *ConsensusState) defaultDoPrevote(height int, round int) { // Prevote cs.ProposalBlock // NOTE: the proposal signature is validated when it is received, // and the proposal block parts are validated as they are received (against the merkle hash in the proposal) + logger.Info("enterPrevote: ProposalBlock is valid") cs.signAddVote(types.VoteTypePrevote, cs.ProposalBlock.Hash(), cs.ProposalBlockParts.Header()) } @@ -1331,19 +1333,14 @@ func (cs *ConsensusState) tryAddVote(vote *types.Vote, peerKey string) error { if err == ErrVoteHeightMismatch { return err } else if _, ok := err.(*types.ErrVoteConflictingVotes); ok { - if peerKey == "" { + if bytes.Equal(vote.ValidatorAddress, cs.privValidator.GetAddress()) { cs.Logger.Error("Found conflicting vote from ourselves. Did you unsafe_reset a validator?", "height", vote.Height, "round", vote.Round, "type", vote.Type) return err } - cs.Logger.Error("Found conflicting vote. Publish evidence (TODO)") - /* TODO - evidenceTx := &types.DupeoutTx{ - Address: address, - VoteA: *errDupe.VoteA, - VoteB: *errDupe.VoteB, - } - cs.mempool.BroadcastTx(struct{???}{evidenceTx}) // shouldn't need to check returned err - */ + cs.Logger.Error("Found conflicting vote. Publish evidence (TODO)", "height", vote.Height, "round", vote.Round, "type", vote.Type, "valAddr", vote.ValidatorAddress, "valIndex", vote.ValidatorIndex) + + // TODO: track evidence for inclusion in a block + return err } else { // Probably an invalid signature. Bad peer. diff --git a/p2p/connection.go b/p2p/connection.go index 67c9d98f..1d97d455 100644 --- a/p2p/connection.go +++ b/p2p/connection.go @@ -21,7 +21,11 @@ const ( minWriteBufferSize = 65536 updateState = 2 * time.Second pingTimeout = 40 * time.Second - flushThrottle = 100 * time.Millisecond + + // flushThrottle used here as a default. + // overwritten by the user config. + // TODO: remove + flushThrottle = 100 * time.Millisecond defaultSendQueueCapacity = 1 defaultSendRate = int64(512000) // 500KB/s @@ -89,13 +93,16 @@ type MConnection struct { type MConnConfig struct { SendRate int64 `mapstructure:"send_rate"` RecvRate int64 `mapstructure:"recv_rate"` + + flushThrottle time.Duration } // DefaultMConnConfig returns the default config. func DefaultMConnConfig() *MConnConfig { return &MConnConfig{ - SendRate: defaultSendRate, - RecvRate: defaultRecvRate, + SendRate: defaultSendRate, + RecvRate: defaultRecvRate, + flushThrottle: flushThrottle, } } @@ -145,10 +152,11 @@ func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onRec return mconn } +// OnStart implements BaseService func (c *MConnection) OnStart() error { c.BaseService.OnStart() c.quit = make(chan struct{}) - c.flushTimer = cmn.NewThrottleTimer("flush", flushThrottle) + c.flushTimer = cmn.NewThrottleTimer("flush", c.config.flushThrottle) c.pingTimer = cmn.NewRepeatTimer("ping", pingTimeout) c.chStatsTimer = cmn.NewRepeatTimer("chStats", updateState) go c.sendRoutine() @@ -156,6 +164,7 @@ func (c *MConnection) OnStart() error { return nil } +// OnStop implements BaseService func (c *MConnection) OnStop() { c.BaseService.OnStop() c.flushTimer.Stop() diff --git a/p2p/peer.go b/p2p/peer.go index 2602206c..c15f61d3 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -59,11 +59,9 @@ func DefaultPeerConfig() *PeerConfig { } } -func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519) (*Peer, error) { - return newOutboundPeerWithConfig(addr, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, DefaultPeerConfig()) -} +func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, + onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) { -func newOutboundPeerWithConfig(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) { conn, err := dial(addr, config) if err != nil { return nil, errors.Wrap(err, "Error creating peer") @@ -77,15 +75,15 @@ func newOutboundPeerWithConfig(addr *NetAddress, reactorsByCh map[byte]Reactor, return peer, nil } -func newInboundPeer(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519) (*Peer, error) { - return newInboundPeerWithConfig(conn, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, DefaultPeerConfig()) -} +func newInboundPeer(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, + onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) { -func newInboundPeerWithConfig(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) { return newPeerFromConnAndConfig(conn, false, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, config) } -func newPeerFromConnAndConfig(rawConn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) { +func newPeerFromConnAndConfig(rawConn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, + onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) { + conn := rawConn // Fuzz connection @@ -286,7 +284,9 @@ func dial(addr *NetAddress, config *PeerConfig) (net.Conn, error) { return conn, nil } -func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), config *MConnConfig) *MConnection { +func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, + onPeerError func(*Peer, interface{}), config *MConnConfig) *MConnection { + onReceive := func(chID byte, msgBytes []byte) { reactor := reactorsByCh[chID] if reactor == nil { diff --git a/p2p/peer_test.go b/p2p/peer_test.go index 0ac77634..347de784 100644 --- a/p2p/peer_test.go +++ b/p2p/peer_test.go @@ -82,7 +82,7 @@ func createOutboundPeerAndPerformHandshake(addr *NetAddress, config *PeerConfig) } reactorsByCh := map[byte]Reactor{0x01: NewTestReactor(chDescs, true)} pk := crypto.GenPrivKeyEd25519() - p, err := newOutboundPeerWithConfig(addr, reactorsByCh, chDescs, func(p *Peer, r interface{}) {}, pk, config) + p, err := newOutboundPeer(addr, reactorsByCh, chDescs, func(p *Peer, r interface{}) {}, pk, config) if err != nil { return nil, err } @@ -133,7 +133,7 @@ func (p *remotePeer) accept(l net.Listener) { if err != nil { golog.Fatalf("Failed to accept conn: %+v", err) } - peer, err := newInboundPeerWithConfig(conn, make(map[byte]Reactor), make([]*ChannelDescriptor, 0), func(p *Peer, r interface{}) {}, p.PrivKey, p.Config) + peer, err := newInboundPeer(conn, make(map[byte]Reactor), make([]*ChannelDescriptor, 0), func(p *Peer, r interface{}) {}, p.PrivKey, p.Config) if err != nil { golog.Fatalf("Failed to create a peer: %+v", err) } diff --git a/p2p/switch.go b/p2p/switch.go index 2d8d3435..49687511 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -90,11 +90,13 @@ func NewSwitch(config *cfg.P2PConfig) *Switch { dialing: cmn.NewCMap(), nodeInfo: nil, } + sw.peerConfig.MConfig.flushThrottle = time.Duration(config.FlushThrottleTimeout) * time.Millisecond // TODO: collapse the peerConfig into the config ? sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw) return sw } -// Not goroutine safe. +// AddReactor adds the given reactor to the switch. +// NOTE: Not goroutine safe. func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor { // Validate the reactor. // No two reactors can share the same channel. @@ -112,43 +114,51 @@ func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor { return reactor } -// Not goroutine safe. +// Reactors returns a map of reactors registered on the switch. +// NOTE: Not goroutine safe. func (sw *Switch) Reactors() map[string]Reactor { return sw.reactors } -// Not goroutine safe. +// Reactor returns the reactor with the given name. +// NOTE: Not goroutine safe. func (sw *Switch) Reactor(name string) Reactor { return sw.reactors[name] } -// Not goroutine safe. +// AddListener adds the given listener to the switch for listening to incoming peer connections. +// NOTE: Not goroutine safe. func (sw *Switch) AddListener(l Listener) { sw.listeners = append(sw.listeners, l) } -// Not goroutine safe. +// Listeners returns the list of listeners the switch listens on. +// NOTE: Not goroutine safe. func (sw *Switch) Listeners() []Listener { return sw.listeners } -// Not goroutine safe. +// IsListening returns true if the switch has at least one listener. +// NOTE: Not goroutine safe. func (sw *Switch) IsListening() bool { return len(sw.listeners) > 0 } -// Not goroutine safe. +// SetNodeInfo sets the switch's NodeInfo for checking compatibility and handshaking with other nodes. +// NOTE: Not goroutine safe. func (sw *Switch) SetNodeInfo(nodeInfo *NodeInfo) { sw.nodeInfo = nodeInfo } -// Not goroutine safe. +// NodeInfo returns the switch's NodeInfo. +// NOTE: Not goroutine safe. func (sw *Switch) NodeInfo() *NodeInfo { return sw.nodeInfo } -// Not goroutine safe. -// NOTE: Overwrites sw.nodeInfo.PubKey +// SetNodePrivKey sets the switche's private key for authenticated encryption. +// NOTE: Overwrites sw.nodeInfo.PubKey. +// NOTE: Not goroutine safe. func (sw *Switch) SetNodePrivKey(nodePrivKey crypto.PrivKeyEd25519) { sw.nodePrivKey = nodePrivKey if sw.nodeInfo != nil { @@ -156,7 +166,7 @@ func (sw *Switch) SetNodePrivKey(nodePrivKey crypto.PrivKeyEd25519) { } } -// Switch.Start() starts all the reactors, peers, and listeners. +// OnStart implements BaseService. It starts all the reactors, peers, and listeners. func (sw *Switch) OnStart() error { sw.BaseService.OnStart() // Start reactors @@ -177,6 +187,7 @@ func (sw *Switch) OnStart() error { return nil } +// OnStop implements BaseService. It stops all listeners, peers, and reactors. func (sw *Switch) OnStop() { sw.BaseService.OnStop() // Stop listeners @@ -195,6 +206,8 @@ func (sw *Switch) OnStop() { } } +// AddPeer checks the given peer's validity, performs a handshake, and adds the peer to the switch +// and to all registered reactors. // NOTE: This performs a blocking handshake before the peer is added. // CONTRACT: If error is returned, peer is nil, and conn is immediately closed. func (sw *Switch) AddPeer(peer *Peer) error { @@ -242,6 +255,7 @@ func (sw *Switch) AddPeer(peer *Peer) error { return nil } +// FilterConnByAddr returns an error if connecting to the given address is forbidden. func (sw *Switch) FilterConnByAddr(addr net.Addr) error { if sw.filterConnByAddr != nil { return sw.filterConnByAddr(addr) @@ -249,6 +263,7 @@ func (sw *Switch) FilterConnByAddr(addr net.Addr) error { return nil } +// FilterConnByPubKey returns an error if connecting to the given public key is forbidden. func (sw *Switch) FilterConnByPubKey(pubkey crypto.PubKeyEd25519) error { if sw.filterConnByPubKey != nil { return sw.filterConnByPubKey(pubkey) @@ -257,10 +272,12 @@ func (sw *Switch) FilterConnByPubKey(pubkey crypto.PubKeyEd25519) error { } +// SetAddrFilter sets the function for filtering connections by address. func (sw *Switch) SetAddrFilter(f func(net.Addr) error) { sw.filterConnByAddr = f } +// SetPubKeyFilter sets the function for filtering connections by public key. func (sw *Switch) SetPubKeyFilter(f func(crypto.PubKeyEd25519) error) { sw.filterConnByPubKey = f } @@ -272,7 +289,7 @@ func (sw *Switch) startInitPeer(peer *Peer) { } } -// Dial a list of seeds asynchronously in random order +// DialSeeds dials a list of seeds asynchronously in random order func (sw *Switch) DialSeeds(addrBook *AddrBook, seeds []string) error { netAddrs, err := NewNetAddressStrings(seeds) @@ -315,12 +332,14 @@ func (sw *Switch) dialSeed(addr *NetAddress) { } } +// DialPeerWithAddress dials the given peer and runs sw.AddPeer if it connects successfully. +// If `persistent == true`, the switch will always try to reconnect to this peer if the connection ever fails. func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (*Peer, error) { sw.dialing.Set(addr.IP.String(), addr) defer sw.dialing.Delete(addr.IP.String()) sw.Logger.Info("Dialing peer", "address", addr) - peer, err := newOutboundPeerWithConfig(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.peerConfig) + peer, err := newOutboundPeer(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.peerConfig) if err != nil { sw.Logger.Error("Failed to dial peer", "address", addr, "err", err) return nil, err @@ -339,6 +358,7 @@ func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (*Peer, return peer, nil } +// IsDialing returns true if the switch is currently dialing the given address. func (sw *Switch) IsDialing(addr *NetAddress) bool { return sw.dialing.Has(addr.IP.String()) } @@ -347,6 +367,7 @@ func (sw *Switch) IsDialing(addr *NetAddress) bool { // trying to send for defaultSendTimeoutSeconds. Returns a channel // which receives success values for each attempted send (false if times out) // NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved. +// TODO: Something more intelligent. func (sw *Switch) Broadcast(chID byte, msg interface{}) chan bool { successChan := make(chan bool, len(sw.peers.List())) sw.Logger.Debug("Broadcast", "channel", chID, "msg", msg) @@ -359,7 +380,7 @@ func (sw *Switch) Broadcast(chID byte, msg interface{}) chan bool { return successChan } -// Returns the count of outbound/inbound and outbound-dialing peers. +// NumPeers returns the count of outbound/inbound and outbound-dialing peers. func (sw *Switch) NumPeers() (outbound, inbound, dialing int) { peers := sw.peers.List() for _, peer := range peers { @@ -373,11 +394,13 @@ func (sw *Switch) NumPeers() (outbound, inbound, dialing int) { return } +// Peers returns the set of peers the switch is connected to. func (sw *Switch) Peers() IPeerSet { return sw.peers } -// Disconnect from a peer due to external error, retry if it is a persistent peer. +// StopPeerForError disconnects from a peer due to external error. +// If the peer is persistent, it will attempt to reconnect. // TODO: make record depending on reason. func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) { addr := NewNetAddress(peer.Addr()) @@ -410,7 +433,7 @@ func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) { } } -// Disconnect from a peer gracefully. +// StopPeerGracefully disconnects from a peer gracefully. // TODO: handle graceful disconnects. func (sw *Switch) StopPeerGracefully(peer *Peer) { sw.Logger.Info("Stopping peer gracefully") @@ -468,7 +491,7 @@ type SwitchEventDonePeer struct { //------------------------------------------------------------------ // Switches connected via arbitrary net.Conn; useful for testing -// Returns n switches, connected according to the connect func. +// MakeConnectedSwitches returns n switches, connected according to the connect func. // If connect==Connect2Switches, the switches will be fully connected. // initSwitch defines how the ith switch should be initialized (ie. with what reactors). // NOTE: panics if any switch fails to start. @@ -493,7 +516,7 @@ func MakeConnectedSwitches(cfg *cfg.P2PConfig, n int, initSwitch func(int, *Swit var PanicOnAddPeerErr = false -// Will connect switches i and j via net.Pipe() +// Connect2Switches will connect switches i and j via net.Pipe() // Blocks until a conection is established. // NOTE: caller ensures i and j are within bounds func Connect2Switches(switches []*Switch, i, j int) { @@ -519,6 +542,8 @@ func Connect2Switches(switches []*Switch, i, j int) { <-doneCh } +// StartSwitches calls sw.Start() for each given switch. +// It returns the first encountered error. func StartSwitches(switches []*Switch) error { for _, s := range switches { _, err := s.Start() // start switch and reactors @@ -547,7 +572,7 @@ func makeSwitch(cfg *cfg.P2PConfig, i int, network, version string, initSwitch f } func (sw *Switch) addPeerWithConnection(conn net.Conn) error { - peer, err := newInboundPeer(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey) + peer, err := newInboundPeer(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.peerConfig) if err != nil { conn.Close() return err @@ -562,7 +587,7 @@ func (sw *Switch) addPeerWithConnection(conn net.Conn) error { } func (sw *Switch) addPeerWithConnectionAndConfig(conn net.Conn, config *PeerConfig) error { - peer, err := newInboundPeerWithConfig(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, config) + peer, err := newInboundPeer(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, config) if err != nil { conn.Close() return err diff --git a/p2p/switch_test.go b/p2p/switch_test.go index eed7d1fa..c686f9e4 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -244,7 +244,7 @@ func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) { rp.Start() defer rp.Stop() - peer, err := newOutboundPeer(rp.Addr(), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey) + peer, err := newOutboundPeer(rp.Addr(), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, DefaultPeerConfig()) require.Nil(err) err = sw.AddPeer(peer) require.Nil(err) @@ -270,7 +270,7 @@ func TestSwitchReconnectsToPersistentPeer(t *testing.T) { rp.Start() defer rp.Stop() - peer, err := newOutboundPeer(rp.Addr(), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey) + peer, err := newOutboundPeer(rp.Addr(), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, DefaultPeerConfig()) peer.makePersistent() require.Nil(err) err = sw.AddPeer(peer) diff --git a/version/version.go b/version/version.go index e77f0e40..23ea294c 100644 --- a/version/version.go +++ b/version/version.go @@ -2,11 +2,11 @@ package version const Maj = "0" const Min = "10" -const Fix = "1" +const Fix = "2" var ( // The full version string - Version = "0.10.1" + Version = "0.10.2" // GitCommit is set with --ldflags "-X main.gitCommit=$(git rev-parse HEAD)" GitCommit string