From 850da13622258c86de1e17e81be2c3d1c84e1a8d Mon Sep 17 00:00:00 2001 From: Ethan Frey Date: Tue, 27 Jun 2017 13:31:32 +0200 Subject: [PATCH 01/13] Do some cleanup in the test, so it doesn't fail in my shell --- cmd/tendermint/commands/root_test.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/cmd/tendermint/commands/root_test.go b/cmd/tendermint/commands/root_test.go index f776a024..01d69bb6 100644 --- a/cmd/tendermint/commands/root_test.go +++ b/cmd/tendermint/commands/root_test.go @@ -25,6 +25,11 @@ const ( // isolate provides a clean setup and returns a copy of RootCmd you can // modify in the test cases func isolate(cmds ...*cobra.Command) cli.Executable { + os.Unsetenv("TMHOME") + os.Unsetenv("TM_HOME") + os.Unsetenv("TMROOT") + os.Unsetenv("TM_ROOT") + viper.Reset() config = cfg.DefaultConfig() r := &cobra.Command{ From ca8c34f966a5cf0f501c1ae098bdf046010a4ee3 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Fri, 7 Jul 2017 12:39:40 -0400 Subject: [PATCH 02/13] add consensus reactor sleep durations to the config --- CHANGELOG.md | 5 +++ config/config.go | 84 ++++++++++++++++++++++++++++++++------------ consensus/reactor.go | 28 +++++++-------- 3 files changed, 80 insertions(+), 37 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 85072172..1dbf7a70 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ # Changelog +## 0.10.2 (TBD) + +FEATURES: +- Add consensus reactor sleep durations to the config + ## 0.10.1 (June 28, 2017) FEATURES: diff --git a/config/config.go b/config/config.go index 42ca6099..56708442 100644 --- a/config/config.go +++ b/config/config.go @@ -5,9 +5,10 @@ import ( "path/filepath" "time" - "github.com/tendermint/tendermint/types" + "github.com/tendermint/tendermint/types" // TODO: remove ) +// Config defines the top level configuration for a Tendermint node type Config struct { // Top level options use an anonymous struct BaseConfig `mapstructure:",squash"` @@ -19,6 +20,7 @@ type Config struct { Consensus *ConsensusConfig `mapstructure:"consensus"` } +// DefaultConfig returns a default configuration for a Tendermint node func DefaultConfig() *Config { return &Config{ BaseConfig: DefaultBaseConfig(), @@ -29,6 +31,7 @@ func DefaultConfig() *Config { } } +// TestConfig returns a configuration that can be used for testing func TestConfig() *Config { return &Config{ BaseConfig: TestBaseConfig(), @@ -39,7 +42,7 @@ func TestConfig() *Config { } } -// Set the RootDir for all Config structs +// SetRoot sets the RootDir for all Config structs func (cfg *Config) SetRoot(root string) *Config { cfg.BaseConfig.RootDir = root cfg.RPC.RootDir = root @@ -52,7 +55,7 @@ func (cfg *Config) SetRoot(root string) *Config { //----------------------------------------------------------------------------- // BaseConfig -// BaseConfig struct for a Tendermint node +// BaseConfig defines the base configuration for a Tendermint node type BaseConfig struct { // The root directory for all data. // This should be set in viper so it can unmarshal into this struct @@ -102,6 +105,7 @@ type BaseConfig struct { DBPath string `mapstructure:"db_dir"` } +// DefaultBaseConfig returns a default base configuration for a Tendermint node func DefaultBaseConfig() BaseConfig { return BaseConfig{ Genesis: "genesis.json", @@ -119,6 +123,7 @@ func DefaultBaseConfig() BaseConfig { } } +// TestBaseConfig returns a base configuration for testing a Tendermint node func TestBaseConfig() BaseConfig { conf := DefaultBaseConfig() conf.ChainID = "tendermint_test" @@ -128,22 +133,27 @@ func TestBaseConfig() BaseConfig { return conf } +// GenesisFile returns the full path to the genesis.json file func (b BaseConfig) GenesisFile() string { return rootify(b.Genesis, b.RootDir) } +// PrivValidatorFile returns the full path to the priv_validator.json file func (b BaseConfig) PrivValidatorFile() string { return rootify(b.PrivValidator, b.RootDir) } +// DBDir returns the full path to the database directory func (b BaseConfig) DBDir() string { return rootify(b.DBPath, b.RootDir) } +// DefaultLogLevel returns a default log level of "error" func DefaultLogLevel() string { return "error" } +// DefaultPackageLogLevels returns a default log level setting so all packages log at "error", while the `state` package logs at "info" func DefaultPackageLogLevels() string { return fmt.Sprintf("state:info,*:%s", DefaultLogLevel()) } @@ -151,6 +161,7 @@ func DefaultPackageLogLevels() string { //----------------------------------------------------------------------------- // RPCConfig +// RPCConfig defines the configuration options for the Tendermint RPC server type RPCConfig struct { RootDir string `mapstructure:"home"` @@ -165,6 +176,7 @@ type RPCConfig struct { Unsafe bool `mapstructure:"unsafe"` } +// DefaultRPCConfig returns a default configuration for the RPC server func DefaultRPCConfig() *RPCConfig { return &RPCConfig{ ListenAddress: "tcp://0.0.0.0:46657", @@ -173,6 +185,7 @@ func DefaultRPCConfig() *RPCConfig { } } +// TestRPCConfig returns a configuration for testing the RPC server func TestRPCConfig() *RPCConfig { conf := DefaultRPCConfig() conf.ListenAddress = "tcp://0.0.0.0:36657" @@ -184,6 +197,7 @@ func TestRPCConfig() *RPCConfig { //----------------------------------------------------------------------------- // P2PConfig +// P2PConfig defines the configuration options for the Tendermint peer-to-peer networking layer type P2PConfig struct { RootDir string `mapstructure:"home"` ListenAddress string `mapstructure:"laddr"` @@ -195,6 +209,7 @@ type P2PConfig struct { MaxNumPeers int `mapstructure:"max_num_peers"` } +// DefaultP2PConfig returns a default configuration for the peer-to-peer layer func DefaultP2PConfig() *P2PConfig { return &P2PConfig{ ListenAddress: "tcp://0.0.0.0:46656", @@ -204,6 +219,7 @@ func DefaultP2PConfig() *P2PConfig { } } +// TestP2PConfig returns a configuration for testing the peer-to-peer layer func TestP2PConfig() *P2PConfig { conf := DefaultP2PConfig() conf.ListenAddress = "tcp://0.0.0.0:36656" @@ -211,6 +227,7 @@ func TestP2PConfig() *P2PConfig { return conf } +// AddrBookFile returns the full path to the address bool func (p *P2PConfig) AddrBookFile() string { return rootify(p.AddrBook, p.RootDir) } @@ -218,6 +235,7 @@ func (p *P2PConfig) AddrBookFile() string { //----------------------------------------------------------------------------- // MempoolConfig +// MempoolConfig defines the configuration options for the Tendermint mempool type MempoolConfig struct { RootDir string `mapstructure:"home"` Recheck bool `mapstructure:"recheck"` @@ -226,6 +244,7 @@ type MempoolConfig struct { WalPath string `mapstructure:"wal_dir"` } +// DefaultMempoolConfig returns a default configuration for the Tendermint mempool func DefaultMempoolConfig() *MempoolConfig { return &MempoolConfig{ Recheck: true, @@ -235,6 +254,7 @@ func DefaultMempoolConfig() *MempoolConfig { } } +// WalDir returns the full path to the mempool's write-ahead log func (m *MempoolConfig) WalDir() string { return rootify(m.WalPath, m.RootDir) } @@ -242,8 +262,8 @@ func (m *MempoolConfig) WalDir() string { //----------------------------------------------------------------------------- // ConsensusConfig -// ConsensusConfig holds timeouts and details about the WAL, the block structure, -// and timeouts in the consensus protocol. +// ConsensusConfig defines the confuguration for the Tendermint consensus service, +// including timeouts and details about the WAL and the block structure. type ConsensusConfig struct { RootDir string `mapstructure:"home"` WalPath string `mapstructure:"wal_file"` @@ -269,46 +289,64 @@ type ConsensusConfig struct { // TODO: This probably shouldn't be exposed but it makes it // easy to write tests for the wal/replay BlockPartSize int `mapstructure:"block_part_size"` + + // Reactor sleep duration parameters are in ms + PeerGossipSleepDuration int `mapstructure:"peer_gossip_sleep_duration"` + PeerQueryMaj23SleepDuration int `mapstructure:"peer_query_maj23_sleep_duration"` } -// Wait this long for a proposal +// Propose returns the amount of time to wait for a proposal func (cfg *ConsensusConfig) Propose(round int) time.Duration { return time.Duration(cfg.TimeoutPropose+cfg.TimeoutProposeDelta*round) * time.Millisecond } -// After receiving any +2/3 prevote, wait this long for stragglers +// Prevote returns the amount of time to wait for straggler votes after receiving any +2/3 prevotes func (cfg *ConsensusConfig) Prevote(round int) time.Duration { return time.Duration(cfg.TimeoutPrevote+cfg.TimeoutPrevoteDelta*round) * time.Millisecond } -// After receiving any +2/3 precommits, wait this long for stragglers +// Precommit returns the amount of time to wait for straggler votes after receiving any +2/3 precommits func (cfg *ConsensusConfig) Precommit(round int) time.Duration { return time.Duration(cfg.TimeoutPrecommit+cfg.TimeoutPrecommitDelta*round) * time.Millisecond } -// After receiving +2/3 precommits for a single block (a commit), wait this long for stragglers in the next height's RoundStepNewHeight +// Commit returns the amount of time to wait for straggler votes after receiving +2/3 precommits for a single block (ie. a commit). func (cfg *ConsensusConfig) Commit(t time.Time) time.Time { return t.Add(time.Duration(cfg.TimeoutCommit) * time.Millisecond) } +// PeerGossipSleep returns the amount of time to sleep if there is nothing to send from the ConsensusReactor +func (cfg *ConsensusConfig) PeerGossipSleep() time.Duration { + return time.Duration(cfg.PeerGossipSleepDuration) * time.Millisecond +} + +// PeerQueryMaj23Sleep returns the amount of time to sleep after each VoteSetMaj23Message is sent in the ConsensusReactor +func (cfg *ConsensusConfig) PeerQueryMaj23Sleep() time.Duration { + return time.Duration(cfg.PeerQueryMaj23SleepDuration) * time.Millisecond +} + +// DefaultConsensusConfig returns a default configuration for the consensus service func DefaultConsensusConfig() *ConsensusConfig { return &ConsensusConfig{ - WalPath: "data/cs.wal/wal", - WalLight: false, - TimeoutPropose: 3000, - TimeoutProposeDelta: 500, - TimeoutPrevote: 1000, - TimeoutPrevoteDelta: 500, - TimeoutPrecommit: 1000, - TimeoutPrecommitDelta: 500, - TimeoutCommit: 1000, - SkipTimeoutCommit: false, - MaxBlockSizeTxs: 10000, - MaxBlockSizeBytes: 1, // TODO - BlockPartSize: types.DefaultBlockPartSize, // TODO: we shouldnt be importing types + WalPath: "data/cs.wal/wal", + WalLight: false, + TimeoutPropose: 3000, + TimeoutProposeDelta: 500, + TimeoutPrevote: 1000, + TimeoutPrevoteDelta: 500, + TimeoutPrecommit: 1000, + TimeoutPrecommitDelta: 500, + TimeoutCommit: 1000, + SkipTimeoutCommit: false, + MaxBlockSizeTxs: 10000, + MaxBlockSizeBytes: 1, // TODO + BlockPartSize: types.DefaultBlockPartSize, // TODO: we shouldnt be importing types + PeerGossipSleepDuration: 100, + PeerQueryMaj23SleepDuration: 2000, } } +// TestConsensusConfig returns a configuration for testing the consensus service func TestConsensusConfig() *ConsensusConfig { config := DefaultConsensusConfig() config.TimeoutPropose = 2000 @@ -322,6 +360,7 @@ func TestConsensusConfig() *ConsensusConfig { return config } +// WalFile returns the full path to the write-ahead log file func (c *ConsensusConfig) WalFile() string { if c.walFile != "" { return c.walFile @@ -329,6 +368,7 @@ func (c *ConsensusConfig) WalFile() string { return rootify(c.WalPath, c.RootDir) } +// SetWalFile sets the path to the write-ahead log file func (c *ConsensusConfig) SetWalFile(walFile string) { c.walFile = walFile } diff --git a/consensus/reactor.go b/consensus/reactor.go index 50207ed5..902ed76f 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -21,9 +21,7 @@ const ( VoteChannel = byte(0x22) VoteSetBitsChannel = byte(0x23) - peerGossipSleepDuration = 100 * time.Millisecond // Time to sleep if there's nothing to send. - peerQueryMaj23SleepDuration = 2 * time.Second // Time to sleep after each VoteSetMaj23Message sent - maxConsensusMessageSize = 1048576 // 1MB; NOTE: keep in sync with types.PartSet sizes. + maxConsensusMessageSize = 1048576 // 1MB; NOTE/TODO: keep in sync with types.PartSet sizes. ) //----------------------------------------------------------------------------- @@ -413,12 +411,12 @@ OUTER_LOOP: blockMeta := conR.conS.blockStore.LoadBlockMeta(prs.Height) if blockMeta == nil { logger.Error("Failed to load block meta", "peer height", prs.Height, "ourHeight", rs.Height, "blockstoreHeight", conR.conS.blockStore.Height(), "pv", conR.conS.privValidator) - time.Sleep(peerGossipSleepDuration) + time.Sleep(conR.conS.config.PeerGossipSleep()) continue OUTER_LOOP } else if !blockMeta.BlockID.PartsHeader.Equals(prs.ProposalBlockPartsHeader) { logger.Info("Peer ProposalBlockPartsHeader mismatch, sleeping", "peerHeight", prs.Height, "blockPartsHeader", blockMeta.BlockID.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader) - time.Sleep(peerGossipSleepDuration) + time.Sleep(conR.conS.config.PeerGossipSleep()) continue OUTER_LOOP } // Load the part @@ -426,7 +424,7 @@ OUTER_LOOP: if part == nil { logger.Error("Could not load part", "index", index, "peerHeight", prs.Height, "blockPartsHeader", blockMeta.BlockID.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader) - time.Sleep(peerGossipSleepDuration) + time.Sleep(conR.conS.config.PeerGossipSleep()) continue OUTER_LOOP } // Send the part @@ -441,7 +439,7 @@ OUTER_LOOP: continue OUTER_LOOP } else { //logger.Info("No parts to send in catch-up, sleeping") - time.Sleep(peerGossipSleepDuration) + time.Sleep(conR.conS.config.PeerGossipSleep()) continue OUTER_LOOP } } @@ -449,7 +447,7 @@ OUTER_LOOP: // If height and round don't match, sleep. if (rs.Height != prs.Height) || (rs.Round != prs.Round) { //logger.Info("Peer Height|Round mismatch, sleeping", "peerHeight", prs.Height, "peerRound", prs.Round, "peer", peer) - time.Sleep(peerGossipSleepDuration) + time.Sleep(conR.conS.config.PeerGossipSleep()) continue OUTER_LOOP } @@ -483,7 +481,7 @@ OUTER_LOOP: } // Nothing to do. Sleep. - time.Sleep(peerGossipSleepDuration) + time.Sleep(conR.conS.config.PeerGossipSleep()) continue OUTER_LOOP } } @@ -581,7 +579,7 @@ OUTER_LOOP: sleeping = 1 } - time.Sleep(peerGossipSleepDuration) + time.Sleep(conR.conS.config.PeerGossipSleep()) continue OUTER_LOOP } } @@ -611,7 +609,7 @@ OUTER_LOOP: Type: types.VoteTypePrevote, BlockID: maj23, }}) - time.Sleep(peerQueryMaj23SleepDuration) + time.Sleep(conR.conS.config.PeerQueryMaj23Sleep()) } } } @@ -628,7 +626,7 @@ OUTER_LOOP: Type: types.VoteTypePrecommit, BlockID: maj23, }}) - time.Sleep(peerQueryMaj23SleepDuration) + time.Sleep(conR.conS.config.PeerQueryMaj23Sleep()) } } } @@ -645,7 +643,7 @@ OUTER_LOOP: Type: types.VoteTypePrevote, BlockID: maj23, }}) - time.Sleep(peerQueryMaj23SleepDuration) + time.Sleep(conR.conS.config.PeerQueryMaj23Sleep()) } } } @@ -664,11 +662,11 @@ OUTER_LOOP: Type: types.VoteTypePrecommit, BlockID: commit.BlockID, }}) - time.Sleep(peerQueryMaj23SleepDuration) + time.Sleep(conR.conS.config.PeerQueryMaj23Sleep()) } } - time.Sleep(peerQueryMaj23SleepDuration) + time.Sleep(conR.conS.config.PeerQueryMaj23Sleep()) continue OUTER_LOOP } From 3c10f7a122cf5856c0cf230ad18779c504121d5d Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Fri, 7 Jul 2017 13:08:52 -0400 Subject: [PATCH 03/13] add p2p flush throttle to config --- CHANGELOG.md | 1 + config/config.go | 42 ++++++++++++++++++++++++++++++------------ p2p/connection.go | 9 ++++++--- p2p/peer.go | 8 ++++---- p2p/switch.go | 3 ++- 5 files changed, 43 insertions(+), 20 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1dbf7a70..6d9ee4b4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ FEATURES: - Add consensus reactor sleep durations to the config +- Add p2p flush throttle timeout to the config ## 0.10.1 (June 28, 2017) diff --git a/config/config.go b/config/config.go index 56708442..e552b33b 100644 --- a/config/config.go +++ b/config/config.go @@ -199,23 +199,41 @@ func TestRPCConfig() *RPCConfig { // P2PConfig defines the configuration options for the Tendermint peer-to-peer networking layer type P2PConfig struct { - RootDir string `mapstructure:"home"` - ListenAddress string `mapstructure:"laddr"` - Seeds string `mapstructure:"seeds"` - SkipUPNP bool `mapstructure:"skip_upnp"` - AddrBook string `mapstructure:"addr_book_file"` - AddrBookStrict bool `mapstructure:"addr_book_strict"` - PexReactor bool `mapstructure:"pex"` - MaxNumPeers int `mapstructure:"max_num_peers"` + RootDir string `mapstructure:"home"` + + // Address to listen for incoming connections + ListenAddress string `mapstructure:"laddr"` + + // Comma separated list of seed nodes to connect to + Seeds string `mapstructure:"seeds"` + + // Skip UPNP port forwarding + SkipUPNP bool `mapstructure:"skip_upnp"` + + // Path to address book + AddrBook string `mapstructure:"addr_book_file"` + + // Set true for strict address routability rules + AddrBookStrict bool `mapstructure:"addr_book_strict"` + + // Set true to enable the peer-exchange reactor + PexReactor bool `mapstructure:"pex"` + + // Maximum number of peers to connect to + MaxNumPeers int `mapstructure:"max_num_peers"` + + // Time to wait before flushing messages out on the connection. In ms + FlushThrottleTimeout int `mapstructure:"flush_throttle_timeout"` } // DefaultP2PConfig returns a default configuration for the peer-to-peer layer func DefaultP2PConfig() *P2PConfig { return &P2PConfig{ - ListenAddress: "tcp://0.0.0.0:46656", - AddrBook: "addrbook.json", - AddrBookStrict: true, - MaxNumPeers: 50, + ListenAddress: "tcp://0.0.0.0:46656", + AddrBook: "addrbook.json", + AddrBookStrict: true, + MaxNumPeers: 50, + FlushThrottleTimeout: 100, } } diff --git a/p2p/connection.go b/p2p/connection.go index 67c9d98f..ce165a8d 100644 --- a/p2p/connection.go +++ b/p2p/connection.go @@ -89,13 +89,16 @@ type MConnection struct { type MConnConfig struct { SendRate int64 `mapstructure:"send_rate"` RecvRate int64 `mapstructure:"recv_rate"` + + flushThrottle time.Duration } // DefaultMConnConfig returns the default config. func DefaultMConnConfig() *MConnConfig { return &MConnConfig{ - SendRate: defaultSendRate, - RecvRate: defaultRecvRate, + SendRate: defaultSendRate, + RecvRate: defaultRecvRate, + flushThrottle: flushThrottle, } } @@ -148,7 +151,7 @@ func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onRec func (c *MConnection) OnStart() error { c.BaseService.OnStart() c.quit = make(chan struct{}) - c.flushTimer = cmn.NewThrottleTimer("flush", flushThrottle) + c.flushTimer = cmn.NewThrottleTimer("flush", c.config.flushThrottle) c.pingTimer = cmn.NewRepeatTimer("ping", pingTimeout) c.chStatsTimer = cmn.NewRepeatTimer("chStats", updateState) go c.sendRoutine() diff --git a/p2p/peer.go b/p2p/peer.go index 2602206c..58678b3d 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -59,9 +59,9 @@ func DefaultPeerConfig() *PeerConfig { } } -func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519) (*Peer, error) { +/*func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519) (*Peer, error) { return newOutboundPeerWithConfig(addr, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, DefaultPeerConfig()) -} +}*/ func newOutboundPeerWithConfig(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) { conn, err := dial(addr, config) @@ -77,9 +77,9 @@ func newOutboundPeerWithConfig(addr *NetAddress, reactorsByCh map[byte]Reactor, return peer, nil } -func newInboundPeer(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519) (*Peer, error) { +/*func newInboundPeer(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519) (*Peer, error) { return newInboundPeerWithConfig(conn, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, DefaultPeerConfig()) -} +}*/ func newInboundPeerWithConfig(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) { return newPeerFromConnAndConfig(conn, false, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, config) diff --git a/p2p/switch.go b/p2p/switch.go index 2d8d3435..547db4bd 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -90,6 +90,7 @@ func NewSwitch(config *cfg.P2PConfig) *Switch { dialing: cmn.NewCMap(), nodeInfo: nil, } + sw.peerConfig.MConfig.flushThrottle = time.Duration(config.FlushThrottleTimeout) * time.Millisecond // TODO: collapse the peerConfig into the config ? sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw) return sw } @@ -547,7 +548,7 @@ func makeSwitch(cfg *cfg.P2PConfig, i int, network, version string, initSwitch f } func (sw *Switch) addPeerWithConnection(conn net.Conn) error { - peer, err := newInboundPeer(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey) + peer, err := newInboundPeerWithConfig(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.peerConfig) if err != nil { conn.Close() return err From e36e507463afc56b79ec180f5035cf06e6034f7b Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Fri, 7 Jul 2017 13:30:30 -0400 Subject: [PATCH 04/13] changelog --- CHANGELOG.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6d9ee4b4..5eb3c2f4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,8 +3,7 @@ ## 0.10.2 (TBD) FEATURES: -- Add consensus reactor sleep durations to the config -- Add p2p flush throttle timeout to the config +- Enable lower latency block commits by adding consensus reactor sleep durations and p2p flush throttle timeout to the config ## 0.10.1 (June 28, 2017) From e6cecb9595f8a883d405a94ddac39f435a4c0294 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Fri, 7 Jul 2017 13:33:15 -0400 Subject: [PATCH 05/13] p2p: fix test --- p2p/peer.go | 12 ++---------- p2p/peer_test.go | 4 ++-- p2p/switch.go | 6 +++--- p2p/switch_test.go | 4 ++-- 4 files changed, 9 insertions(+), 17 deletions(-) diff --git a/p2p/peer.go b/p2p/peer.go index 58678b3d..db91c84f 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -59,11 +59,7 @@ func DefaultPeerConfig() *PeerConfig { } } -/*func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519) (*Peer, error) { - return newOutboundPeerWithConfig(addr, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, DefaultPeerConfig()) -}*/ - -func newOutboundPeerWithConfig(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) { +func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) { conn, err := dial(addr, config) if err != nil { return nil, errors.Wrap(err, "Error creating peer") @@ -77,11 +73,7 @@ func newOutboundPeerWithConfig(addr *NetAddress, reactorsByCh map[byte]Reactor, return peer, nil } -/*func newInboundPeer(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519) (*Peer, error) { - return newInboundPeerWithConfig(conn, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, DefaultPeerConfig()) -}*/ - -func newInboundPeerWithConfig(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) { +func newInboundPeer(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) { return newPeerFromConnAndConfig(conn, false, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, config) } diff --git a/p2p/peer_test.go b/p2p/peer_test.go index 0ac77634..347de784 100644 --- a/p2p/peer_test.go +++ b/p2p/peer_test.go @@ -82,7 +82,7 @@ func createOutboundPeerAndPerformHandshake(addr *NetAddress, config *PeerConfig) } reactorsByCh := map[byte]Reactor{0x01: NewTestReactor(chDescs, true)} pk := crypto.GenPrivKeyEd25519() - p, err := newOutboundPeerWithConfig(addr, reactorsByCh, chDescs, func(p *Peer, r interface{}) {}, pk, config) + p, err := newOutboundPeer(addr, reactorsByCh, chDescs, func(p *Peer, r interface{}) {}, pk, config) if err != nil { return nil, err } @@ -133,7 +133,7 @@ func (p *remotePeer) accept(l net.Listener) { if err != nil { golog.Fatalf("Failed to accept conn: %+v", err) } - peer, err := newInboundPeerWithConfig(conn, make(map[byte]Reactor), make([]*ChannelDescriptor, 0), func(p *Peer, r interface{}) {}, p.PrivKey, p.Config) + peer, err := newInboundPeer(conn, make(map[byte]Reactor), make([]*ChannelDescriptor, 0), func(p *Peer, r interface{}) {}, p.PrivKey, p.Config) if err != nil { golog.Fatalf("Failed to create a peer: %+v", err) } diff --git a/p2p/switch.go b/p2p/switch.go index 547db4bd..d41b7d4c 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -321,7 +321,7 @@ func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (*Peer, defer sw.dialing.Delete(addr.IP.String()) sw.Logger.Info("Dialing peer", "address", addr) - peer, err := newOutboundPeerWithConfig(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.peerConfig) + peer, err := newOutboundPeer(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.peerConfig) if err != nil { sw.Logger.Error("Failed to dial peer", "address", addr, "err", err) return nil, err @@ -548,7 +548,7 @@ func makeSwitch(cfg *cfg.P2PConfig, i int, network, version string, initSwitch f } func (sw *Switch) addPeerWithConnection(conn net.Conn) error { - peer, err := newInboundPeerWithConfig(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.peerConfig) + peer, err := newInboundPeer(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.peerConfig) if err != nil { conn.Close() return err @@ -563,7 +563,7 @@ func (sw *Switch) addPeerWithConnection(conn net.Conn) error { } func (sw *Switch) addPeerWithConnectionAndConfig(conn net.Conn, config *PeerConfig) error { - peer, err := newInboundPeerWithConfig(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, config) + peer, err := newInboundPeer(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, config) if err != nil { conn.Close() return err diff --git a/p2p/switch_test.go b/p2p/switch_test.go index eed7d1fa..c686f9e4 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -244,7 +244,7 @@ func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) { rp.Start() defer rp.Stop() - peer, err := newOutboundPeer(rp.Addr(), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey) + peer, err := newOutboundPeer(rp.Addr(), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, DefaultPeerConfig()) require.Nil(err) err = sw.AddPeer(peer) require.Nil(err) @@ -270,7 +270,7 @@ func TestSwitchReconnectsToPersistentPeer(t *testing.T) { rp.Start() defer rp.Stop() - peer, err := newOutboundPeer(rp.Addr(), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey) + peer, err := newOutboundPeer(rp.Addr(), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, DefaultPeerConfig()) peer.makePersistent() require.Nil(err) err = sw.AddPeer(peer) From 5888ddaab108c2f2b4ed362cdaada440381e5540 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Fri, 7 Jul 2017 13:41:50 -0400 Subject: [PATCH 06/13] consensus: improve logging for conflicting votes --- consensus/state.go | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/consensus/state.go b/consensus/state.go index cc9cd51e..fe98e854 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -1331,19 +1331,14 @@ func (cs *ConsensusState) tryAddVote(vote *types.Vote, peerKey string) error { if err == ErrVoteHeightMismatch { return err } else if _, ok := err.(*types.ErrVoteConflictingVotes); ok { - if peerKey == "" { + if bytes.Equal(vote.ValidatorAddress, cs.privValidator.GetAddress()) { cs.Logger.Error("Found conflicting vote from ourselves. Did you unsafe_reset a validator?", "height", vote.Height, "round", vote.Round, "type", vote.Type) return err } - cs.Logger.Error("Found conflicting vote. Publish evidence (TODO)") - /* TODO - evidenceTx := &types.DupeoutTx{ - Address: address, - VoteA: *errDupe.VoteA, - VoteB: *errDupe.VoteB, - } - cs.mempool.BroadcastTx(struct{???}{evidenceTx}) // shouldn't need to check returned err - */ + cs.Logger.Error("Found conflicting vote. Publish evidence (TODO)", "height", vote.Height, "round", vote.Round, "type", vote.Type, "valAddr", vote.ValidatorAddress, "valIndex", vote.ValidatorIndex) + + // TODO: track evidence for inclusion in a block + return err } else { // Probably an invalid signature. Bad peer. From 612726d9f66d8b90d0a0738ad37e11b5dd1b382b Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Fri, 7 Jul 2017 16:58:16 -0400 Subject: [PATCH 07/13] consensus: better logging --- consensus/reactor.go | 20 +++++++++++--------- consensus/state.go | 7 ++++--- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/consensus/reactor.go b/consensus/reactor.go index 902ed76f..f8a6dfae 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -388,7 +388,6 @@ OUTER_LOOP: // Send proposal Block parts? if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockPartsHeader) { - //logger.Info("ProposalBlockParts matched", "blockParts", prs.ProposalBlockParts) if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockParts.Copy()).PickRandom(); ok { part := rs.ProposalBlockParts.GetPart(index) msg := &BlockPartMessage{ @@ -396,6 +395,7 @@ OUTER_LOOP: Round: rs.Round, // This tells peer that this part applies to us. Part: part, } + logger.Debug("Sending block part", "height", prs.Height, "round", prs.Round) if peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) { ps.SetHasProposalBlockPart(prs.Height, prs.Round, index) } @@ -405,7 +405,6 @@ OUTER_LOOP: // If the peer is on a previous height, help catch up. if (0 < prs.Height) && (prs.Height < rs.Height) { - //logger.Info("Data catchup", "height", rs.Height, "peerHeight", prs.Height, "peerProposalBlockParts", prs.ProposalBlockParts) if index, ok := prs.ProposalBlockParts.Not().PickRandom(); ok { // Ensure that the peer's PartSetHeader is correct blockMeta := conR.conS.blockStore.LoadBlockMeta(prs.Height) @@ -433,6 +432,7 @@ OUTER_LOOP: Round: prs.Round, // Not our height, so it doesn't matter. Part: part, } + logger.Debug("Sending block part for catchup", "height", prs.Height, "round", prs.Round) if peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) { ps.SetHasProposalBlockPart(prs.Height, prs.Round, index) } @@ -461,6 +461,7 @@ OUTER_LOOP: // Proposal: share the proposal metadata with peer. { msg := &ProposalMessage{Proposal: rs.Proposal} + logger.Debug("Sending proposal", "height", prs.Height, "round", prs.Round) if peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) { ps.SetHasProposal(rs.Proposal) } @@ -475,6 +476,7 @@ OUTER_LOOP: ProposalPOLRound: rs.Proposal.POLRound, ProposalPOL: rs.Votes.Prevotes(rs.Proposal.POLRound).BitArray(), } + logger.Debug("Sending POL", "height", prs.Height, "round", prs.Round) peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) } continue OUTER_LOOP @@ -517,21 +519,21 @@ OUTER_LOOP: // If there are lastCommits to send... if prs.Step == RoundStepNewHeight { if ps.PickSendVote(rs.LastCommit) { - logger.Debug("Picked rs.LastCommit to send") + logger.Debug("Picked rs.LastCommit to send", "height", prs.Height) continue OUTER_LOOP } } // If there are prevotes to send... if prs.Step <= RoundStepPrevote && prs.Round != -1 && prs.Round <= rs.Round { if ps.PickSendVote(rs.Votes.Prevotes(prs.Round)) { - logger.Debug("Picked rs.Prevotes(prs.Round) to send") + logger.Debug("Picked rs.Prevotes(prs.Round) to send", "height", prs.Height, "round", prs.Round) continue OUTER_LOOP } } // If there are precommits to send... if prs.Step <= RoundStepPrecommit && prs.Round != -1 && prs.Round <= rs.Round { if ps.PickSendVote(rs.Votes.Precommits(prs.Round)) { - logger.Debug("Picked rs.Precommits(prs.Round) to send") + logger.Debug("Picked rs.Precommits(prs.Round) to send", "height", prs.Height, "round", prs.Round) continue OUTER_LOOP } } @@ -539,7 +541,7 @@ OUTER_LOOP: if prs.ProposalPOLRound != -1 { if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil { if ps.PickSendVote(polPrevotes) { - logger.Debug("Picked rs.Prevotes(prs.ProposalPOLRound) to send") + logger.Debug("Picked rs.Prevotes(prs.ProposalPOLRound) to send", "height", prs.Height, "round", prs.ProposalPOLRound) continue OUTER_LOOP } } @@ -550,7 +552,7 @@ OUTER_LOOP: // If peer is lagging by height 1, send LastCommit. if prs.Height != 0 && rs.Height == prs.Height+1 { if ps.PickSendVote(rs.LastCommit) { - logger.Debug("Picked rs.LastCommit to send") + logger.Debug("Picked rs.LastCommit to send", "height", prs.Height) continue OUTER_LOOP } } @@ -563,7 +565,7 @@ OUTER_LOOP: commit := conR.conS.blockStore.LoadBlockCommit(prs.Height) logger.Info("Loaded BlockCommit for catch-up", "height", prs.Height, "commit", commit) if ps.PickSendVote(commit) { - logger.Debug("Picked Catchup commit to send") + logger.Debug("Picked Catchup commit to send", "height", prs.Height) continue OUTER_LOOP } } @@ -571,7 +573,7 @@ OUTER_LOOP: if sleeping == 0 { // We sent nothing. Sleep... sleeping = 1 - logger.Debug("No votes to send, sleeping", + logger.Debug("No votes to send, sleeping", "rs.Height", rs.Height, "prs.Height", prs.Height, "localPV", rs.Votes.Prevotes(rs.Round).BitArray(), "peerPV", prs.Prevotes, "localPC", rs.Votes.Precommits(rs.Round).BitArray(), "peerPC", prs.Precommits) } else if sleeping == 2 { diff --git a/consensus/state.go b/consensus/state.go index fe98e854..75da2a08 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -912,16 +912,17 @@ func (cs *ConsensusState) enterPrevote(height int, round int) { } func (cs *ConsensusState) defaultDoPrevote(height int, round int) { + logger := cs.Logger.With("height", height, "round", round) // If a block is locked, prevote that. if cs.LockedBlock != nil { - cs.Logger.Info("enterPrevote: Block was locked") + logger.Info("enterPrevote: Block was locked") cs.signAddVote(types.VoteTypePrevote, cs.LockedBlock.Hash(), cs.LockedBlockParts.Header()) return } // If ProposalBlock is nil, prevote nil. if cs.ProposalBlock == nil { - cs.Logger.Info("enterPrevote: ProposalBlock is nil") + logger.Info("enterPrevote: ProposalBlock is nil") cs.signAddVote(types.VoteTypePrevote, nil, types.PartSetHeader{}) return } @@ -930,7 +931,7 @@ func (cs *ConsensusState) defaultDoPrevote(height int, round int) { err := cs.state.ValidateBlock(cs.ProposalBlock) if err != nil { // ProposalBlock is invalid, prevote nil. - cs.Logger.Error("enterPrevote: ProposalBlock is invalid", "err", err) + logger.Error("enterPrevote: ProposalBlock is invalid", "err", err) cs.signAddVote(types.VoteTypePrevote, nil, types.PartSetHeader{}) return } From eed3959749e3ba020d65f6e450557d57ffbe2c24 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Sun, 9 Jul 2017 13:17:01 -0400 Subject: [PATCH 08/13] version bump --- CHANGELOG.md | 3 +++ version/version.go | 4 ++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5eb3c2f4..83286d8e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,9 @@ FEATURES: - Enable lower latency block commits by adding consensus reactor sleep durations and p2p flush throttle timeout to the config +IMPROVEMENTS: +- Better consensus logging + ## 0.10.1 (June 28, 2017) FEATURES: diff --git a/version/version.go b/version/version.go index e77f0e40..23ea294c 100644 --- a/version/version.go +++ b/version/version.go @@ -2,11 +2,11 @@ package version const Maj = "0" const Min = "10" -const Fix = "1" +const Fix = "2" var ( // The full version string - Version = "0.10.1" + Version = "0.10.2" // GitCommit is set with --ldflags "-X main.gitCommit=$(git rev-parse HEAD)" GitCommit string From b07d01f102e1cc725510913013a258e8e2c22f37 Mon Sep 17 00:00:00 2001 From: Adrian Brink Date: Sun, 9 Jul 2017 20:35:48 +0200 Subject: [PATCH 09/13] Add more comments on public functions and extra logging during 'enterPrevote' Signed-off-by: Adrian Brink --- consensus/reactor.go | 9 +++++---- consensus/state.go | 1 + p2p/connection.go | 2 ++ p2p/switch.go | 30 +++++++++++++++--------------- 4 files changed, 23 insertions(+), 19 deletions(-) diff --git a/consensus/reactor.go b/consensus/reactor.go index f8a6dfae..ee463b95 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -60,6 +60,7 @@ func (conR *ConsensusReactor) OnStart() error { return nil } +// OnStop implements BaseService func (conR *ConsensusReactor) OnStop() { conR.BaseReactor.OnStop() conR.conS.Stop() @@ -77,7 +78,7 @@ func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State) { conR.conS.Start() } -// Implements Reactor +// GetChannels implements Reactor func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor { // TODO optimize return []*p2p.ChannelDescriptor{ @@ -107,7 +108,7 @@ func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor { } } -// Implements Reactor +// ConsensusReactor implements Reactor func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) { if !conR.IsRunning() { return @@ -129,7 +130,7 @@ func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) { } } -// Implements Reactor +// RemovePeer implements Reactor func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) { if !conR.IsRunning() { return @@ -138,7 +139,7 @@ func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) { //peer.Data.Get(PeerStateKey).(*PeerState).Disconnect() } -// Implements Reactor +// Receive implements Reactor // NOTE: We process these messages even when we're fast_syncing. // Messages affect either a peer state or the consensus state. // Peer state updates can happen in parallel, but processing of diff --git a/consensus/state.go b/consensus/state.go index 75da2a08..6abcd7a2 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -939,6 +939,7 @@ func (cs *ConsensusState) defaultDoPrevote(height int, round int) { // Prevote cs.ProposalBlock // NOTE: the proposal signature is validated when it is received, // and the proposal block parts are validated as they are received (against the merkle hash in the proposal) + logger.Info("enterPrevote: ProposalBlock is valid and voted on") cs.signAddVote(types.VoteTypePrevote, cs.ProposalBlock.Hash(), cs.ProposalBlockParts.Header()) } diff --git a/p2p/connection.go b/p2p/connection.go index ce165a8d..f86a211d 100644 --- a/p2p/connection.go +++ b/p2p/connection.go @@ -148,6 +148,7 @@ func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onRec return mconn } +// OnStart implements BaseService func (c *MConnection) OnStart() error { c.BaseService.OnStart() c.quit = make(chan struct{}) @@ -159,6 +160,7 @@ func (c *MConnection) OnStart() error { return nil } +// OnStop implements BaseService func (c *MConnection) OnStop() { c.BaseService.OnStop() c.flushTimer.Stop() diff --git a/p2p/switch.go b/p2p/switch.go index d41b7d4c..c23f97df 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -95,7 +95,7 @@ func NewSwitch(config *cfg.P2PConfig) *Switch { return sw } -// Not goroutine safe. +// AddReactor is not goroutine safe. func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor { // Validate the reactor. // No two reactors can share the same channel. @@ -113,42 +113,42 @@ func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor { return reactor } -// Not goroutine safe. +// Reactors is not goroutine safe. func (sw *Switch) Reactors() map[string]Reactor { return sw.reactors } -// Not goroutine safe. +// Reactor is not goroutine safe. func (sw *Switch) Reactor(name string) Reactor { return sw.reactors[name] } -// Not goroutine safe. +// AddListener is not goroutine safe. func (sw *Switch) AddListener(l Listener) { sw.listeners = append(sw.listeners, l) } -// Not goroutine safe. +// Listeners is not goroutine safe. func (sw *Switch) Listeners() []Listener { return sw.listeners } -// Not goroutine safe. +// IsListening is not goroutine safe. func (sw *Switch) IsListening() bool { return len(sw.listeners) > 0 } -// Not goroutine safe. +// SetNodeInfo is not goroutine safe. func (sw *Switch) SetNodeInfo(nodeInfo *NodeInfo) { sw.nodeInfo = nodeInfo } -// Not goroutine safe. +// NodeInfo is not goroutine safe. func (sw *Switch) NodeInfo() *NodeInfo { return sw.nodeInfo } -// Not goroutine safe. +// SetNodePrivKey is not goroutine safe. // NOTE: Overwrites sw.nodeInfo.PubKey func (sw *Switch) SetNodePrivKey(nodePrivKey crypto.PrivKeyEd25519) { sw.nodePrivKey = nodePrivKey @@ -273,7 +273,7 @@ func (sw *Switch) startInitPeer(peer *Peer) { } } -// Dial a list of seeds asynchronously in random order +// DialSeeds dials a list of seeds asynchronously in random order func (sw *Switch) DialSeeds(addrBook *AddrBook, seeds []string) error { netAddrs, err := NewNetAddressStrings(seeds) @@ -360,7 +360,7 @@ func (sw *Switch) Broadcast(chID byte, msg interface{}) chan bool { return successChan } -// Returns the count of outbound/inbound and outbound-dialing peers. +// NumPeers returns the count of outbound/inbound and outbound-dialing peers. func (sw *Switch) NumPeers() (outbound, inbound, dialing int) { peers := sw.peers.List() for _, peer := range peers { @@ -378,7 +378,7 @@ func (sw *Switch) Peers() IPeerSet { return sw.peers } -// Disconnect from a peer due to external error, retry if it is a persistent peer. +// StopPeerForError disconnect from a peer due to external error, retry if it is a persistent peer. // TODO: make record depending on reason. func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) { addr := NewNetAddress(peer.Addr()) @@ -411,7 +411,7 @@ func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) { } } -// Disconnect from a peer gracefully. +// StopPeerGracefully disconnect from a peer gracefully. // TODO: handle graceful disconnects. func (sw *Switch) StopPeerGracefully(peer *Peer) { sw.Logger.Info("Stopping peer gracefully") @@ -469,7 +469,7 @@ type SwitchEventDonePeer struct { //------------------------------------------------------------------ // Switches connected via arbitrary net.Conn; useful for testing -// Returns n switches, connected according to the connect func. +// MakeConnectedSwitches returns n switches, connected according to the connect func. // If connect==Connect2Switches, the switches will be fully connected. // initSwitch defines how the ith switch should be initialized (ie. with what reactors). // NOTE: panics if any switch fails to start. @@ -494,7 +494,7 @@ func MakeConnectedSwitches(cfg *cfg.P2PConfig, n int, initSwitch func(int, *Swit var PanicOnAddPeerErr = false -// Will connect switches i and j via net.Pipe() +// Connect2Switches will connect switches i and j via net.Pipe() // Blocks until a conection is established. // NOTE: caller ensures i and j are within bounds func Connect2Switches(switches []*Switch, i, j int) { From 74a3a2b56af79c7f526a77b91d918861b1cc2753 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Sun, 9 Jul 2017 18:01:25 -0400 Subject: [PATCH 10/13] fix comments --- consensus/reactor.go | 2 +- consensus/state.go | 2 +- p2p/switch.go | 50 ++++++++++++++++++++++++++++++++------------ 3 files changed, 39 insertions(+), 15 deletions(-) diff --git a/consensus/reactor.go b/consensus/reactor.go index ee463b95..3825eca1 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -108,7 +108,7 @@ func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor { } } -// ConsensusReactor implements Reactor +// AddPeer implements Reactor func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) { if !conR.IsRunning() { return diff --git a/consensus/state.go b/consensus/state.go index 6abcd7a2..98255186 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -939,7 +939,7 @@ func (cs *ConsensusState) defaultDoPrevote(height int, round int) { // Prevote cs.ProposalBlock // NOTE: the proposal signature is validated when it is received, // and the proposal block parts are validated as they are received (against the merkle hash in the proposal) - logger.Info("enterPrevote: ProposalBlock is valid and voted on") + logger.Info("enterPrevote: ProposalBlock is valid") cs.signAddVote(types.VoteTypePrevote, cs.ProposalBlock.Hash(), cs.ProposalBlockParts.Header()) } diff --git a/p2p/switch.go b/p2p/switch.go index c23f97df..49687511 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -95,7 +95,8 @@ func NewSwitch(config *cfg.P2PConfig) *Switch { return sw } -// AddReactor is not goroutine safe. +// AddReactor adds the given reactor to the switch. +// NOTE: Not goroutine safe. func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor { // Validate the reactor. // No two reactors can share the same channel. @@ -113,43 +114,51 @@ func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor { return reactor } -// Reactors is not goroutine safe. +// Reactors returns a map of reactors registered on the switch. +// NOTE: Not goroutine safe. func (sw *Switch) Reactors() map[string]Reactor { return sw.reactors } -// Reactor is not goroutine safe. +// Reactor returns the reactor with the given name. +// NOTE: Not goroutine safe. func (sw *Switch) Reactor(name string) Reactor { return sw.reactors[name] } -// AddListener is not goroutine safe. +// AddListener adds the given listener to the switch for listening to incoming peer connections. +// NOTE: Not goroutine safe. func (sw *Switch) AddListener(l Listener) { sw.listeners = append(sw.listeners, l) } -// Listeners is not goroutine safe. +// Listeners returns the list of listeners the switch listens on. +// NOTE: Not goroutine safe. func (sw *Switch) Listeners() []Listener { return sw.listeners } -// IsListening is not goroutine safe. +// IsListening returns true if the switch has at least one listener. +// NOTE: Not goroutine safe. func (sw *Switch) IsListening() bool { return len(sw.listeners) > 0 } -// SetNodeInfo is not goroutine safe. +// SetNodeInfo sets the switch's NodeInfo for checking compatibility and handshaking with other nodes. +// NOTE: Not goroutine safe. func (sw *Switch) SetNodeInfo(nodeInfo *NodeInfo) { sw.nodeInfo = nodeInfo } -// NodeInfo is not goroutine safe. +// NodeInfo returns the switch's NodeInfo. +// NOTE: Not goroutine safe. func (sw *Switch) NodeInfo() *NodeInfo { return sw.nodeInfo } -// SetNodePrivKey is not goroutine safe. -// NOTE: Overwrites sw.nodeInfo.PubKey +// SetNodePrivKey sets the switche's private key for authenticated encryption. +// NOTE: Overwrites sw.nodeInfo.PubKey. +// NOTE: Not goroutine safe. func (sw *Switch) SetNodePrivKey(nodePrivKey crypto.PrivKeyEd25519) { sw.nodePrivKey = nodePrivKey if sw.nodeInfo != nil { @@ -157,7 +166,7 @@ func (sw *Switch) SetNodePrivKey(nodePrivKey crypto.PrivKeyEd25519) { } } -// Switch.Start() starts all the reactors, peers, and listeners. +// OnStart implements BaseService. It starts all the reactors, peers, and listeners. func (sw *Switch) OnStart() error { sw.BaseService.OnStart() // Start reactors @@ -178,6 +187,7 @@ func (sw *Switch) OnStart() error { return nil } +// OnStop implements BaseService. It stops all listeners, peers, and reactors. func (sw *Switch) OnStop() { sw.BaseService.OnStop() // Stop listeners @@ -196,6 +206,8 @@ func (sw *Switch) OnStop() { } } +// AddPeer checks the given peer's validity, performs a handshake, and adds the peer to the switch +// and to all registered reactors. // NOTE: This performs a blocking handshake before the peer is added. // CONTRACT: If error is returned, peer is nil, and conn is immediately closed. func (sw *Switch) AddPeer(peer *Peer) error { @@ -243,6 +255,7 @@ func (sw *Switch) AddPeer(peer *Peer) error { return nil } +// FilterConnByAddr returns an error if connecting to the given address is forbidden. func (sw *Switch) FilterConnByAddr(addr net.Addr) error { if sw.filterConnByAddr != nil { return sw.filterConnByAddr(addr) @@ -250,6 +263,7 @@ func (sw *Switch) FilterConnByAddr(addr net.Addr) error { return nil } +// FilterConnByPubKey returns an error if connecting to the given public key is forbidden. func (sw *Switch) FilterConnByPubKey(pubkey crypto.PubKeyEd25519) error { if sw.filterConnByPubKey != nil { return sw.filterConnByPubKey(pubkey) @@ -258,10 +272,12 @@ func (sw *Switch) FilterConnByPubKey(pubkey crypto.PubKeyEd25519) error { } +// SetAddrFilter sets the function for filtering connections by address. func (sw *Switch) SetAddrFilter(f func(net.Addr) error) { sw.filterConnByAddr = f } +// SetPubKeyFilter sets the function for filtering connections by public key. func (sw *Switch) SetPubKeyFilter(f func(crypto.PubKeyEd25519) error) { sw.filterConnByPubKey = f } @@ -316,6 +332,8 @@ func (sw *Switch) dialSeed(addr *NetAddress) { } } +// DialPeerWithAddress dials the given peer and runs sw.AddPeer if it connects successfully. +// If `persistent == true`, the switch will always try to reconnect to this peer if the connection ever fails. func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (*Peer, error) { sw.dialing.Set(addr.IP.String(), addr) defer sw.dialing.Delete(addr.IP.String()) @@ -340,6 +358,7 @@ func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (*Peer, return peer, nil } +// IsDialing returns true if the switch is currently dialing the given address. func (sw *Switch) IsDialing(addr *NetAddress) bool { return sw.dialing.Has(addr.IP.String()) } @@ -348,6 +367,7 @@ func (sw *Switch) IsDialing(addr *NetAddress) bool { // trying to send for defaultSendTimeoutSeconds. Returns a channel // which receives success values for each attempted send (false if times out) // NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved. +// TODO: Something more intelligent. func (sw *Switch) Broadcast(chID byte, msg interface{}) chan bool { successChan := make(chan bool, len(sw.peers.List())) sw.Logger.Debug("Broadcast", "channel", chID, "msg", msg) @@ -374,11 +394,13 @@ func (sw *Switch) NumPeers() (outbound, inbound, dialing int) { return } +// Peers returns the set of peers the switch is connected to. func (sw *Switch) Peers() IPeerSet { return sw.peers } -// StopPeerForError disconnect from a peer due to external error, retry if it is a persistent peer. +// StopPeerForError disconnects from a peer due to external error. +// If the peer is persistent, it will attempt to reconnect. // TODO: make record depending on reason. func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) { addr := NewNetAddress(peer.Addr()) @@ -411,7 +433,7 @@ func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) { } } -// StopPeerGracefully disconnect from a peer gracefully. +// StopPeerGracefully disconnects from a peer gracefully. // TODO: handle graceful disconnects. func (sw *Switch) StopPeerGracefully(peer *Peer) { sw.Logger.Info("Stopping peer gracefully") @@ -520,6 +542,8 @@ func Connect2Switches(switches []*Switch, i, j int) { <-doneCh } +// StartSwitches calls sw.Start() for each given switch. +// It returns the first encountered error. func StartSwitches(switches []*Switch) error { for _, s := range switches { _, err := s.Start() // start switch and reactors From 5f6b996d229284eb128c2dc713d96a51f0855f0d Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Sun, 9 Jul 2017 18:38:59 -0400 Subject: [PATCH 11/13] breakup some long lines; add more comments to consensus reactor --- consensus/reactor.go | 282 +++++++++++++++++++++++++++---------------- p2p/peer.go | 16 ++- 2 files changed, 187 insertions(+), 111 deletions(-) diff --git a/consensus/reactor.go b/consensus/reactor.go index 3825eca1..1d8155c1 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -9,10 +9,12 @@ import ( "time" wire "github.com/tendermint/go-wire" + cmn "github.com/tendermint/tmlibs/common" + "github.com/tendermint/tmlibs/log" + "github.com/tendermint/tendermint/p2p" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" - . "github.com/tendermint/tmlibs/common" ) const ( @@ -26,6 +28,7 @@ const ( //----------------------------------------------------------------------------- +// ConsensusReactor defines a reactor for the consensus service. type ConsensusReactor struct { p2p.BaseReactor // BaseService + p2p.Switch @@ -34,6 +37,7 @@ type ConsensusReactor struct { evsw types.EventSwitch } +// NewConsensusReactor returns a new ConsensusReactor with the given consensusState. func NewConsensusReactor(consensusState *ConsensusState, fastSync bool) *ConsensusReactor { conR := &ConsensusReactor{ conS: consensusState, @@ -43,6 +47,7 @@ func NewConsensusReactor(consensusState *ConsensusState, fastSync bool) *Consens return conR } +// OnStart implements BaseService. func (conR *ConsensusReactor) OnStart() error { conR.Logger.Info("ConsensusReactor ", "fastSync", conR.fastSync) conR.BaseReactor.OnStart() @@ -66,8 +71,8 @@ func (conR *ConsensusReactor) OnStop() { conR.conS.Stop() } -// Switch from the fast_sync to the consensus: -// reset the state, turn off fast_sync, start the consensus-state-machine +// SwitchToConsensus switches from fast_sync mode to consensus mode. +// It resets the state, turns off fast_sync, and starts the consensus state-machine func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State) { conR.Logger.Info("SwitchToConsensus") conR.conS.reconstructLastCommit(state) @@ -183,7 +188,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) votes.SetPeerMaj23(msg.Round, msg.Type, ps.Peer.Key, msg.BlockID) // Respond with a VoteSetBitsMessage showing which votes we have. // (and consequently shows which we don't have) - var ourVotes *BitArray + var ourVotes *cmn.BitArray switch msg.Type { case types.VoteTypePrevote: ourVotes = votes.Prevotes(msg.Round).BitArrayByBlockID(msg.BlockID) @@ -201,7 +206,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) Votes: ourVotes, }}) default: - conR.Logger.Error(Fmt("Unknown message type %v", reflect.TypeOf(msg))) + conR.Logger.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg))) } case DataChannel: @@ -219,7 +224,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Index) conR.conS.peerMsgQueue <- msgInfo{msg, src.Key} default: - conR.Logger.Error(Fmt("Unknown message type %v", reflect.TypeOf(msg))) + conR.Logger.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg))) } case VoteChannel: @@ -241,7 +246,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) default: // don't punish (leave room for soft upgrades) - conR.Logger.Error(Fmt("Unknown message type %v", reflect.TypeOf(msg))) + conR.Logger.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg))) } case VoteSetBitsChannel: @@ -257,7 +262,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) cs.mtx.Unlock() if height == msg.Height { - var ourVotes *BitArray + var ourVotes *cmn.BitArray switch msg.Type { case types.VoteTypePrevote: ourVotes = votes.Prevotes(msg.Round).BitArrayByBlockID(msg.BlockID) @@ -273,11 +278,11 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) } default: // don't punish (leave room for soft upgrades) - conR.Logger.Error(Fmt("Unknown message type %v", reflect.TypeOf(msg))) + conR.Logger.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg))) } default: - conR.Logger.Error(Fmt("Unknown chId %X", chID)) + conR.Logger.Error(cmn.Fmt("Unknown chId %X", chID)) } if err != nil { @@ -285,7 +290,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) } } -// implements events.Eventable +// SetEventSwitch implements events.Eventable func (conR *ConsensusReactor) SetEventSwitch(evsw types.EventSwitch) { conR.evsw = evsw conR.conS.SetEventSwitch(evsw) @@ -406,43 +411,9 @@ OUTER_LOOP: // If the peer is on a previous height, help catch up. if (0 < prs.Height) && (prs.Height < rs.Height) { - if index, ok := prs.ProposalBlockParts.Not().PickRandom(); ok { - // Ensure that the peer's PartSetHeader is correct - blockMeta := conR.conS.blockStore.LoadBlockMeta(prs.Height) - if blockMeta == nil { - logger.Error("Failed to load block meta", "peer height", prs.Height, "ourHeight", rs.Height, "blockstoreHeight", conR.conS.blockStore.Height(), "pv", conR.conS.privValidator) - time.Sleep(conR.conS.config.PeerGossipSleep()) - continue OUTER_LOOP - } else if !blockMeta.BlockID.PartsHeader.Equals(prs.ProposalBlockPartsHeader) { - logger.Info("Peer ProposalBlockPartsHeader mismatch, sleeping", - "peerHeight", prs.Height, "blockPartsHeader", blockMeta.BlockID.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader) - time.Sleep(conR.conS.config.PeerGossipSleep()) - continue OUTER_LOOP - } - // Load the part - part := conR.conS.blockStore.LoadBlockPart(prs.Height, index) - if part == nil { - logger.Error("Could not load part", "index", index, - "peerHeight", prs.Height, "blockPartsHeader", blockMeta.BlockID.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader) - time.Sleep(conR.conS.config.PeerGossipSleep()) - continue OUTER_LOOP - } - // Send the part - msg := &BlockPartMessage{ - Height: prs.Height, // Not our height, so it doesn't matter. - Round: prs.Round, // Not our height, so it doesn't matter. - Part: part, - } - logger.Debug("Sending block part for catchup", "height", prs.Height, "round", prs.Round) - if peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) { - ps.SetHasProposalBlockPart(prs.Height, prs.Round, index) - } - continue OUTER_LOOP - } else { - //logger.Info("No parts to send in catch-up, sleeping") - time.Sleep(conR.conS.config.PeerGossipSleep()) - continue OUTER_LOOP - } + heightLogger := logger.With("height", prs.Height) + conR.gossipDataForCatchup(heightLogger, rs, prs, ps, peer) + continue OUTER_LOOP } // If height and round don't match, sleep. @@ -489,6 +460,49 @@ OUTER_LOOP: } } +func (conR *ConsensusReactor) gossipDataForCatchup(logger log.Logger, rs *RoundState, + prs *PeerRoundState, ps *PeerState, peer *p2p.Peer) { + + if index, ok := prs.ProposalBlockParts.Not().PickRandom(); ok { + // Ensure that the peer's PartSetHeader is correct + blockMeta := conR.conS.blockStore.LoadBlockMeta(prs.Height) + if blockMeta == nil { + logger.Error("Failed to load block meta", + "ourHeight", rs.Height, "blockstoreHeight", conR.conS.blockStore.Height()) + time.Sleep(conR.conS.config.PeerGossipSleep()) + return + } else if !blockMeta.BlockID.PartsHeader.Equals(prs.ProposalBlockPartsHeader) { + logger.Info("Peer ProposalBlockPartsHeader mismatch, sleeping", + "blockPartsHeader", blockMeta.BlockID.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader) + time.Sleep(conR.conS.config.PeerGossipSleep()) + return + } + // Load the part + part := conR.conS.blockStore.LoadBlockPart(prs.Height, index) + if part == nil { + logger.Error("Could not load part", "index", index, + "blockPartsHeader", blockMeta.BlockID.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader) + time.Sleep(conR.conS.config.PeerGossipSleep()) + return + } + // Send the part + msg := &BlockPartMessage{ + Height: prs.Height, // Not our height, so it doesn't matter. + Round: prs.Round, // Not our height, so it doesn't matter. + Part: part, + } + logger.Debug("Sending block part for catchup", "round", prs.Round) + if peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) { + ps.SetHasProposalBlockPart(prs.Height, prs.Round, index) + } + return + } else { + //logger.Info("No parts to send in catch-up, sleeping") + time.Sleep(conR.conS.config.PeerGossipSleep()) + return + } +} + func (conR *ConsensusReactor) gossipVotesRoutine(peer *p2p.Peer, ps *PeerState) { logger := conR.Logger.With("peer", peer) @@ -517,35 +531,9 @@ OUTER_LOOP: // If height matches, then send LastCommit, Prevotes, Precommits. if rs.Height == prs.Height { - // If there are lastCommits to send... - if prs.Step == RoundStepNewHeight { - if ps.PickSendVote(rs.LastCommit) { - logger.Debug("Picked rs.LastCommit to send", "height", prs.Height) - continue OUTER_LOOP - } - } - // If there are prevotes to send... - if prs.Step <= RoundStepPrevote && prs.Round != -1 && prs.Round <= rs.Round { - if ps.PickSendVote(rs.Votes.Prevotes(prs.Round)) { - logger.Debug("Picked rs.Prevotes(prs.Round) to send", "height", prs.Height, "round", prs.Round) - continue OUTER_LOOP - } - } - // If there are precommits to send... - if prs.Step <= RoundStepPrecommit && prs.Round != -1 && prs.Round <= rs.Round { - if ps.PickSendVote(rs.Votes.Precommits(prs.Round)) { - logger.Debug("Picked rs.Precommits(prs.Round) to send", "height", prs.Height, "round", prs.Round) - continue OUTER_LOOP - } - } - // If there are POLPrevotes to send... - if prs.ProposalPOLRound != -1 { - if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil { - if ps.PickSendVote(polPrevotes) { - logger.Debug("Picked rs.Prevotes(prs.ProposalPOLRound) to send", "height", prs.Height, "round", prs.ProposalPOLRound) - continue OUTER_LOOP - } - } + heightLogger := logger.With("height", prs.Height) + if conR.gossipVotesForHeight(heightLogger, rs, prs, ps) { + continue OUTER_LOOP } } @@ -587,6 +575,42 @@ OUTER_LOOP: } } +func (conR *ConsensusReactor) gossipVotesForHeight(logger log.Logger, rs *RoundState, prs *PeerRoundState, ps *PeerState) bool { + + // If there are lastCommits to send... + if prs.Step == RoundStepNewHeight { + if ps.PickSendVote(rs.LastCommit) { + logger.Debug("Picked rs.LastCommit to send") + return true + } + } + // If there are prevotes to send... + if prs.Step <= RoundStepPrevote && prs.Round != -1 && prs.Round <= rs.Round { + if ps.PickSendVote(rs.Votes.Prevotes(prs.Round)) { + logger.Debug("Picked rs.Prevotes(prs.Round) to send", "round", prs.Round) + return true + } + } + // If there are precommits to send... + if prs.Step <= RoundStepPrecommit && prs.Round != -1 && prs.Round <= rs.Round { + if ps.PickSendVote(rs.Votes.Precommits(prs.Round)) { + logger.Debug("Picked rs.Precommits(prs.Round) to send", "round", prs.Round) + return true + } + } + // If there are POLPrevotes to send... + if prs.ProposalPOLRound != -1 { + if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil { + if ps.PickSendVote(polPrevotes) { + logger.Debug("Picked rs.Prevotes(prs.ProposalPOLRound) to send", + "round", prs.ProposalPOLRound) + return true + } + } + } + return false +} + // NOTE: `queryMaj23Routine` has a simple crude design since it only comes // into play for liveness when there's a signature DDoS attack happening. func (conR *ConsensusReactor) queryMaj23Routine(peer *p2p.Peer, ps *PeerState) { @@ -675,11 +699,15 @@ OUTER_LOOP: } } +// String returns a string representation of the ConsensusReactor. +// NOTE: For now, it is just a hard-coded string to avoid accessing unprotected shared variables. +// TODO: improve! func (conR *ConsensusReactor) String() string { // better not to access shared variables return "ConsensusReactor" // conR.StringIndented("") } +// StringIndented returns an indented string representation of the ConsensusReactor func (conR *ConsensusReactor) StringIndented(indent string) string { s := "ConsensusReactor{\n" s += indent + " " + conR.conS.StringIndented(indent+" ") + "\n" @@ -693,7 +721,8 @@ func (conR *ConsensusReactor) StringIndented(indent string) string { //----------------------------------------------------------------------------- -// Read only when returned by PeerState.GetRoundState(). +// PeerRoundState contains the known state of a peer. +// NOTE: Read-only when returned by PeerState.GetRoundState(). type PeerRoundState struct { Height int // Height peer is at Round int // Round peer is at, -1 if unknown. @@ -701,21 +730,23 @@ type PeerRoundState struct { StartTime time.Time // Estimated start of round 0 at this height Proposal bool // True if peer has proposal for this round ProposalBlockPartsHeader types.PartSetHeader // - ProposalBlockParts *BitArray // + ProposalBlockParts *cmn.BitArray // ProposalPOLRound int // Proposal's POL round. -1 if none. - ProposalPOL *BitArray // nil until ProposalPOLMessage received. - Prevotes *BitArray // All votes peer has for this round - Precommits *BitArray // All precommits peer has for this round + ProposalPOL *cmn.BitArray // nil until ProposalPOLMessage received. + Prevotes *cmn.BitArray // All votes peer has for this round + Precommits *cmn.BitArray // All precommits peer has for this round LastCommitRound int // Round of commit for last height. -1 if none. - LastCommit *BitArray // All commit precommits of commit for last height. + LastCommit *cmn.BitArray // All commit precommits of commit for last height. CatchupCommitRound int // Round that we have commit for. Not necessarily unique. -1 if none. - CatchupCommit *BitArray // All commit precommits peer has for this height & CatchupCommitRound + CatchupCommit *cmn.BitArray // All commit precommits peer has for this height & CatchupCommitRound } +// String returns a string representation of the PeerRoundState func (prs PeerRoundState) String() string { return prs.StringIndented("") } +// StringIndented returns a string representation of the PeerRoundState func (prs PeerRoundState) StringIndented(indent string) string { return fmt.Sprintf(`PeerRoundState{ %s %v/%v/%v @%v @@ -743,6 +774,8 @@ var ( ErrPeerStateInvalidStartTime = errors.New("Error peer state invalid startTime") ) +// PeerState contains the known state of a peer, including its connection +// and threadsafe access to its PeerRoundState. type PeerState struct { Peer *p2p.Peer @@ -750,6 +783,7 @@ type PeerState struct { PeerRoundState } +// NewPeerState returns a new PeerState for the given Peer func NewPeerState(peer *p2p.Peer) *PeerState { return &PeerState{ Peer: peer, @@ -762,7 +796,7 @@ func NewPeerState(peer *p2p.Peer) *PeerState { } } -// Returns an atomic snapshot of the PeerRoundState. +// GetRoundState returns an atomic snapshot of the PeerRoundState. // There's no point in mutating it since it won't change PeerState. func (ps *PeerState) GetRoundState() *PeerRoundState { ps.mtx.Lock() @@ -772,7 +806,7 @@ func (ps *PeerState) GetRoundState() *PeerRoundState { return &prs } -// Returns an atomic snapshot of the PeerRoundState's height +// GetHeight returns an atomic snapshot of the PeerRoundState's height // used by the mempool to ensure peers are caught up before broadcasting new txs func (ps *PeerState) GetHeight() int { ps.mtx.Lock() @@ -780,6 +814,7 @@ func (ps *PeerState) GetHeight() int { return ps.PeerRoundState.Height } +// SetHasProposal sets the given proposal as known for the peer. func (ps *PeerState) SetHasProposal(proposal *types.Proposal) { ps.mtx.Lock() defer ps.mtx.Unlock() @@ -793,11 +828,12 @@ func (ps *PeerState) SetHasProposal(proposal *types.Proposal) { ps.Proposal = true ps.ProposalBlockPartsHeader = proposal.BlockPartsHeader - ps.ProposalBlockParts = NewBitArray(proposal.BlockPartsHeader.Total) + ps.ProposalBlockParts = cmn.NewBitArray(proposal.BlockPartsHeader.Total) ps.ProposalPOLRound = proposal.POLRound ps.ProposalPOL = nil // Nil until ProposalPOLMessage received. } +// SetHasProposalBlockPart sets the given block part index as known for the peer. func (ps *PeerState) SetHasProposalBlockPart(height int, round int, index int) { ps.mtx.Lock() defer ps.mtx.Unlock() @@ -809,9 +845,9 @@ func (ps *PeerState) SetHasProposalBlockPart(height int, round int, index int) { ps.ProposalBlockParts.SetIndex(index, true) } -// PickVoteToSend sends vote to peer. +// PickSendVote picks a vote and sends it to the peer. // Returns true if vote was sent. -func (ps *PeerState) PickSendVote(votes types.VoteSetReader) (ok bool) { +func (ps *PeerState) PickSendVote(votes types.VoteSetReader) bool { if vote, ok := ps.PickVoteToSend(votes); ok { msg := &VoteMessage{vote} return ps.Peer.Send(VoteChannel, struct{ ConsensusMessage }{msg}) @@ -819,7 +855,9 @@ func (ps *PeerState) PickSendVote(votes types.VoteSetReader) (ok bool) { return false } -// votes: Must be the correct Size() for the Height(). +// PickVoteToSend picks a vote to send to the peer. +// Returns true if a vote was picked. +// NOTE: `votes` must be the correct Size() for the Height(). func (ps *PeerState) PickVoteToSend(votes types.VoteSetReader) (vote *types.Vote, ok bool) { ps.mtx.Lock() defer ps.mtx.Unlock() @@ -847,9 +885,9 @@ func (ps *PeerState) PickVoteToSend(votes types.VoteSetReader) (vote *types.Vote return nil, false } -func (ps *PeerState) getVoteBitArray(height, round int, type_ byte) *BitArray { +func (ps *PeerState) getVoteBitArray(height, round int, type_ byte) *cmn.BitArray { if !types.IsVoteTypeValid(type_) { - PanicSanity("Invalid vote type") + cmn.PanicSanity("Invalid vote type") } if ps.Height == height { @@ -902,7 +940,7 @@ func (ps *PeerState) ensureCatchupCommitRound(height, round int, numValidators i NOTE: This is wrong, 'round' could change. e.g. if orig round is not the same as block LastCommit round. if ps.CatchupCommitRound != -1 && ps.CatchupCommitRound != round { - PanicSanity(Fmt("Conflicting CatchupCommitRound. Height: %v, Orig: %v, New: %v", height, ps.CatchupCommitRound, round)) + cmn.PanicSanity(cmn.Fmt("Conflicting CatchupCommitRound. Height: %v, Orig: %v, New: %v", height, ps.CatchupCommitRound, round)) } */ if ps.CatchupCommitRound == round { @@ -912,10 +950,12 @@ func (ps *PeerState) ensureCatchupCommitRound(height, round int, numValidators i if round == ps.Round { ps.CatchupCommit = ps.Precommits } else { - ps.CatchupCommit = NewBitArray(numValidators) + ps.CatchupCommit = cmn.NewBitArray(numValidators) } } +// EnsureVoteVitArrays ensures the bit-arrays have been allocated for tracking +// what votes this peer has received. // NOTE: It's important to make sure that numValidators actually matches // what the node sees as the number of validators for height. func (ps *PeerState) EnsureVoteBitArrays(height int, numValidators int) { @@ -927,24 +967,25 @@ func (ps *PeerState) EnsureVoteBitArrays(height int, numValidators int) { func (ps *PeerState) ensureVoteBitArrays(height int, numValidators int) { if ps.Height == height { if ps.Prevotes == nil { - ps.Prevotes = NewBitArray(numValidators) + ps.Prevotes = cmn.NewBitArray(numValidators) } if ps.Precommits == nil { - ps.Precommits = NewBitArray(numValidators) + ps.Precommits = cmn.NewBitArray(numValidators) } if ps.CatchupCommit == nil { - ps.CatchupCommit = NewBitArray(numValidators) + ps.CatchupCommit = cmn.NewBitArray(numValidators) } if ps.ProposalPOL == nil { - ps.ProposalPOL = NewBitArray(numValidators) + ps.ProposalPOL = cmn.NewBitArray(numValidators) } } else if ps.Height == height+1 { if ps.LastCommit == nil { - ps.LastCommit = NewBitArray(numValidators) + ps.LastCommit = cmn.NewBitArray(numValidators) } } } +// SetHasVote sets the given vote as known by the peer func (ps *PeerState) SetHasVote(vote *types.Vote) { ps.mtx.Lock() defer ps.mtx.Unlock() @@ -960,6 +1001,7 @@ func (ps *PeerState) setHasVote(height int, round int, type_ byte, index int) { ps.getVoteBitArray(height, round, type_).SetIndex(index, true) } +// ApplyNewRoundStepMessage updates the peer state for the new round. func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage) { ps.mtx.Lock() defer ps.mtx.Unlock() @@ -1013,6 +1055,7 @@ func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage) { } } +// ApplyCommitStepMessage updates the peer state for the new commit. func (ps *PeerState) ApplyCommitStepMessage(msg *CommitStepMessage) { ps.mtx.Lock() defer ps.mtx.Unlock() @@ -1025,6 +1068,7 @@ func (ps *PeerState) ApplyCommitStepMessage(msg *CommitStepMessage) { ps.ProposalBlockParts = msg.BlockParts } +// ApplyProposalPOLMessage updates the peer state for the new proposal POL. func (ps *PeerState) ApplyProposalPOLMessage(msg *ProposalPOLMessage) { ps.mtx.Lock() defer ps.mtx.Unlock() @@ -1041,6 +1085,7 @@ func (ps *PeerState) ApplyProposalPOLMessage(msg *ProposalPOLMessage) { ps.ProposalPOL = msg.ProposalPOL } +// ApplyHasVoteMessage updates the peer state for the new vote. func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) { ps.mtx.Lock() defer ps.mtx.Unlock() @@ -1052,12 +1097,12 @@ func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) { ps.setHasVote(msg.Height, msg.Round, msg.Type, msg.Index) } -// The peer has responded with a bitarray of votes that it has -// of the corresponding BlockID. -// ourVotes: BitArray of votes we have for msg.BlockID +// ApplyVoteSetBitsMessage updates the peer state for the bit-array of votes +// it claims to have for the corresponding BlockID. +// `ourVotes` is a BitArray of votes we have for msg.BlockID // NOTE: if ourVotes is nil (e.g. msg.Height < rs.Height), // we conservatively overwrite ps's votes w/ msg.Votes. -func (ps *PeerState) ApplyVoteSetBitsMessage(msg *VoteSetBitsMessage, ourVotes *BitArray) { +func (ps *PeerState) ApplyVoteSetBitsMessage(msg *VoteSetBitsMessage, ourVotes *cmn.BitArray) { ps.mtx.Lock() defer ps.mtx.Unlock() @@ -1073,10 +1118,12 @@ func (ps *PeerState) ApplyVoteSetBitsMessage(msg *VoteSetBitsMessage, ourVotes * } } +// String returns a string representation of the PeerState func (ps *PeerState) String() string { return ps.StringIndented("") } +// StringIndented returns a string representation of the PeerState func (ps *PeerState) StringIndented(indent string) string { return fmt.Sprintf(`PeerState{ %s Key %v @@ -1102,6 +1149,7 @@ const ( msgTypeVoteSetBits = byte(0x17) ) +// ConsensusMessage is a message that can be sent and received on the ConsensusReactor type ConsensusMessage interface{} var _ = wire.RegisterInterface( @@ -1117,17 +1165,20 @@ var _ = wire.RegisterInterface( wire.ConcreteType{&VoteSetBitsMessage{}, msgTypeVoteSetBits}, ) +// DecodeMessage decodes the given bytes into a ConsensusMessage. // TODO: check for unnecessary extra bytes at the end. func DecodeMessage(bz []byte) (msgType byte, msg ConsensusMessage, err error) { msgType = bz[0] n := new(int) r := bytes.NewReader(bz) - msg = wire.ReadBinary(struct{ ConsensusMessage }{}, r, maxConsensusMessageSize, n, &err).(struct{ ConsensusMessage }).ConsensusMessage + msgI := wire.ReadBinary(struct{ ConsensusMessage }{}, r, maxConsensusMessageSize, n, &err) + msg = msgI.(struct{ ConsensusMessage }).ConsensusMessage return } //------------------------------------- +// NewRoundStepMessage is sent for every step taken in the ConsensusState. // For every height/round/step transition type NewRoundStepMessage struct { Height int @@ -1137,6 +1188,7 @@ type NewRoundStepMessage struct { LastCommitRound int } +// String returns a string representation. func (m *NewRoundStepMessage) String() string { return fmt.Sprintf("[NewRoundStep H:%v R:%v S:%v LCR:%v]", m.Height, m.Round, m.Step, m.LastCommitRound) @@ -1144,62 +1196,73 @@ func (m *NewRoundStepMessage) String() string { //------------------------------------- +// CommitStepMessage is sent when a block is committed. type CommitStepMessage struct { Height int BlockPartsHeader types.PartSetHeader - BlockParts *BitArray + BlockParts *cmn.BitArray } +// String returns a string representation. func (m *CommitStepMessage) String() string { return fmt.Sprintf("[CommitStep H:%v BP:%v BA:%v]", m.Height, m.BlockPartsHeader, m.BlockParts) } //------------------------------------- +// ProposalMessage is sent when a new block is proposed. type ProposalMessage struct { Proposal *types.Proposal } +// String returns a string representation. func (m *ProposalMessage) String() string { return fmt.Sprintf("[Proposal %v]", m.Proposal) } //------------------------------------- +// ProposalPOLMessage is sent when a previous proposal is re-proposed. type ProposalPOLMessage struct { Height int ProposalPOLRound int - ProposalPOL *BitArray + ProposalPOL *cmn.BitArray } +// String returns a string representation. func (m *ProposalPOLMessage) String() string { return fmt.Sprintf("[ProposalPOL H:%v POLR:%v POL:%v]", m.Height, m.ProposalPOLRound, m.ProposalPOL) } //------------------------------------- +// BlockPartMessage is sent when gossipping a piece of the proposed block. type BlockPartMessage struct { Height int Round int Part *types.Part } +// String returns a string representation. func (m *BlockPartMessage) String() string { return fmt.Sprintf("[BlockPart H:%v R:%v P:%v]", m.Height, m.Round, m.Part) } //------------------------------------- +// VoteMessage is sent when voting for a proposal (or lack thereof). type VoteMessage struct { Vote *types.Vote } +// String returns a string representation. func (m *VoteMessage) String() string { return fmt.Sprintf("[Vote %v]", m.Vote) } //------------------------------------- +// HasVoteMessage is sent to indicate that a particular vote has been received. type HasVoteMessage struct { Height int Round int @@ -1207,12 +1270,14 @@ type HasVoteMessage struct { Index int } +// String returns a string representation. func (m *HasVoteMessage) String() string { return fmt.Sprintf("[HasVote VI:%v V:{%v/%02d/%v} VI:%v]", m.Index, m.Height, m.Round, m.Type, m.Index) } //------------------------------------- +// VoteSetMaj23Message is sent to indicate that a given BlockID has seen +2/3 votes. type VoteSetMaj23Message struct { Height int Round int @@ -1220,20 +1285,23 @@ type VoteSetMaj23Message struct { BlockID types.BlockID } +// String returns a string representation. func (m *VoteSetMaj23Message) String() string { return fmt.Sprintf("[VSM23 %v/%02d/%v %v]", m.Height, m.Round, m.Type, m.BlockID) } //------------------------------------- +// VoteSetBitsMessage is sent to communicate the bit-array of votes seen for the BlockID. type VoteSetBitsMessage struct { Height int Round int Type byte BlockID types.BlockID - Votes *BitArray + Votes *cmn.BitArray } +// String returns a string representation. func (m *VoteSetBitsMessage) String() string { return fmt.Sprintf("[VSB %v/%02d/%v %v %v]", m.Height, m.Round, m.Type, m.BlockID, m.Votes) } diff --git a/p2p/peer.go b/p2p/peer.go index db91c84f..c15f61d3 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -59,7 +59,9 @@ func DefaultPeerConfig() *PeerConfig { } } -func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) { +func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, + onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) { + conn, err := dial(addr, config) if err != nil { return nil, errors.Wrap(err, "Error creating peer") @@ -73,11 +75,15 @@ func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs [] return peer, nil } -func newInboundPeer(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) { +func newInboundPeer(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, + onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) { + return newPeerFromConnAndConfig(conn, false, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, config) } -func newPeerFromConnAndConfig(rawConn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) { +func newPeerFromConnAndConfig(rawConn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, + onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) { + conn := rawConn // Fuzz connection @@ -278,7 +284,9 @@ func dial(addr *NetAddress, config *PeerConfig) (net.Conn, error) { return conn, nil } -func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), config *MConnConfig) *MConnection { +func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, + onPeerError func(*Peer, interface{}), config *MConnConfig) *MConnection { + onReceive := func(chID byte, msgBytes []byte) { reactor := reactorsByCh[chID] if reactor == nil { From bcde80fc4f13378da0a8d4c7a92c7cdc03a7897a Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Sun, 9 Jul 2017 18:49:22 -0400 Subject: [PATCH 12/13] changelog --- CHANGELOG.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 83286d8e..2faec6fd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,7 +6,9 @@ FEATURES: - Enable lower latency block commits by adding consensus reactor sleep durations and p2p flush throttle timeout to the config IMPROVEMENTS: -- Better consensus logging +- More detailed logging in the consensus reactor and state machine +- More in-code documentation for many exposed functions, especially in consensus/reactor.go and p2p/switch.go +- Improved readability for some function definitions and code blocks with long lines ## 0.10.1 (June 28, 2017) From 75df0d91babb61e8480bb23838bb65fc81441fee Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Mon, 10 Jul 2017 13:02:46 -0400 Subject: [PATCH 13/13] comments from review --- CHANGELOG.md | 2 +- cmd/tendermint/commands/root_test.go | 3 ++- p2p/connection.go | 6 +++++- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2faec6fd..438a2a42 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,6 @@ # Changelog -## 0.10.2 (TBD) +## 0.10.2 (July 10, 2017) FEATURES: - Enable lower latency block commits by adding consensus reactor sleep durations and p2p flush throttle timeout to the config diff --git a/cmd/tendermint/commands/root_test.go b/cmd/tendermint/commands/root_test.go index 01d69bb6..ae0f3827 100644 --- a/cmd/tendermint/commands/root_test.go +++ b/cmd/tendermint/commands/root_test.go @@ -23,7 +23,8 @@ const ( ) // isolate provides a clean setup and returns a copy of RootCmd you can -// modify in the test cases +// modify in the test cases. +// NOTE: it unsets all TM* env variables. func isolate(cmds ...*cobra.Command) cli.Executable { os.Unsetenv("TMHOME") os.Unsetenv("TM_HOME") diff --git a/p2p/connection.go b/p2p/connection.go index f86a211d..1d97d455 100644 --- a/p2p/connection.go +++ b/p2p/connection.go @@ -21,7 +21,11 @@ const ( minWriteBufferSize = 65536 updateState = 2 * time.Second pingTimeout = 40 * time.Second - flushThrottle = 100 * time.Millisecond + + // flushThrottle used here as a default. + // overwritten by the user config. + // TODO: remove + flushThrottle = 100 * time.Millisecond defaultSendQueueCapacity = 1 defaultSendRate = int64(512000) // 500KB/s