Merge pull request #576 from tendermint/release-v0.10.2

Release v0.10.2
This commit is contained in:
Ethan Buchman 2017-07-10 16:22:09 -04:00 committed by GitHub
commit b467515719
11 changed files with 388 additions and 214 deletions

View File

@ -1,5 +1,15 @@
# Changelog # Changelog
## 0.10.2 (July 10, 2017)
FEATURES:
- Enable lower latency block commits by adding consensus reactor sleep durations and p2p flush throttle timeout to the config
IMPROVEMENTS:
- More detailed logging in the consensus reactor and state machine
- More in-code documentation for many exposed functions, especially in consensus/reactor.go and p2p/switch.go
- Improved readability for some function definitions and code blocks with long lines
## 0.10.1 (June 28, 2017) ## 0.10.1 (June 28, 2017)
FEATURES: FEATURES:

View File

@ -23,8 +23,14 @@ const (
) )
// isolate provides a clean setup and returns a copy of RootCmd you can // isolate provides a clean setup and returns a copy of RootCmd you can
// modify in the test cases // modify in the test cases.
// NOTE: it unsets all TM* env variables.
func isolate(cmds ...*cobra.Command) cli.Executable { func isolate(cmds ...*cobra.Command) cli.Executable {
os.Unsetenv("TMHOME")
os.Unsetenv("TM_HOME")
os.Unsetenv("TMROOT")
os.Unsetenv("TM_ROOT")
viper.Reset() viper.Reset()
config = cfg.DefaultConfig() config = cfg.DefaultConfig()
r := &cobra.Command{ r := &cobra.Command{

View File

@ -5,9 +5,10 @@ import (
"path/filepath" "path/filepath"
"time" "time"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types" // TODO: remove
) )
// Config defines the top level configuration for a Tendermint node
type Config struct { type Config struct {
// Top level options use an anonymous struct // Top level options use an anonymous struct
BaseConfig `mapstructure:",squash"` BaseConfig `mapstructure:",squash"`
@ -19,6 +20,7 @@ type Config struct {
Consensus *ConsensusConfig `mapstructure:"consensus"` Consensus *ConsensusConfig `mapstructure:"consensus"`
} }
// DefaultConfig returns a default configuration for a Tendermint node
func DefaultConfig() *Config { func DefaultConfig() *Config {
return &Config{ return &Config{
BaseConfig: DefaultBaseConfig(), BaseConfig: DefaultBaseConfig(),
@ -29,6 +31,7 @@ func DefaultConfig() *Config {
} }
} }
// TestConfig returns a configuration that can be used for testing
func TestConfig() *Config { func TestConfig() *Config {
return &Config{ return &Config{
BaseConfig: TestBaseConfig(), BaseConfig: TestBaseConfig(),
@ -39,7 +42,7 @@ func TestConfig() *Config {
} }
} }
// Set the RootDir for all Config structs // SetRoot sets the RootDir for all Config structs
func (cfg *Config) SetRoot(root string) *Config { func (cfg *Config) SetRoot(root string) *Config {
cfg.BaseConfig.RootDir = root cfg.BaseConfig.RootDir = root
cfg.RPC.RootDir = root cfg.RPC.RootDir = root
@ -52,7 +55,7 @@ func (cfg *Config) SetRoot(root string) *Config {
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
// BaseConfig // BaseConfig
// BaseConfig struct for a Tendermint node // BaseConfig defines the base configuration for a Tendermint node
type BaseConfig struct { type BaseConfig struct {
// The root directory for all data. // The root directory for all data.
// This should be set in viper so it can unmarshal into this struct // This should be set in viper so it can unmarshal into this struct
@ -102,6 +105,7 @@ type BaseConfig struct {
DBPath string `mapstructure:"db_dir"` DBPath string `mapstructure:"db_dir"`
} }
// DefaultBaseConfig returns a default base configuration for a Tendermint node
func DefaultBaseConfig() BaseConfig { func DefaultBaseConfig() BaseConfig {
return BaseConfig{ return BaseConfig{
Genesis: "genesis.json", Genesis: "genesis.json",
@ -119,6 +123,7 @@ func DefaultBaseConfig() BaseConfig {
} }
} }
// TestBaseConfig returns a base configuration for testing a Tendermint node
func TestBaseConfig() BaseConfig { func TestBaseConfig() BaseConfig {
conf := DefaultBaseConfig() conf := DefaultBaseConfig()
conf.ChainID = "tendermint_test" conf.ChainID = "tendermint_test"
@ -128,22 +133,27 @@ func TestBaseConfig() BaseConfig {
return conf return conf
} }
// GenesisFile returns the full path to the genesis.json file
func (b BaseConfig) GenesisFile() string { func (b BaseConfig) GenesisFile() string {
return rootify(b.Genesis, b.RootDir) return rootify(b.Genesis, b.RootDir)
} }
// PrivValidatorFile returns the full path to the priv_validator.json file
func (b BaseConfig) PrivValidatorFile() string { func (b BaseConfig) PrivValidatorFile() string {
return rootify(b.PrivValidator, b.RootDir) return rootify(b.PrivValidator, b.RootDir)
} }
// DBDir returns the full path to the database directory
func (b BaseConfig) DBDir() string { func (b BaseConfig) DBDir() string {
return rootify(b.DBPath, b.RootDir) return rootify(b.DBPath, b.RootDir)
} }
// DefaultLogLevel returns a default log level of "error"
func DefaultLogLevel() string { func DefaultLogLevel() string {
return "error" return "error"
} }
// DefaultPackageLogLevels returns a default log level setting so all packages log at "error", while the `state` package logs at "info"
func DefaultPackageLogLevels() string { func DefaultPackageLogLevels() string {
return fmt.Sprintf("state:info,*:%s", DefaultLogLevel()) return fmt.Sprintf("state:info,*:%s", DefaultLogLevel())
} }
@ -151,6 +161,7 @@ func DefaultPackageLogLevels() string {
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
// RPCConfig // RPCConfig
// RPCConfig defines the configuration options for the Tendermint RPC server
type RPCConfig struct { type RPCConfig struct {
RootDir string `mapstructure:"home"` RootDir string `mapstructure:"home"`
@ -165,6 +176,7 @@ type RPCConfig struct {
Unsafe bool `mapstructure:"unsafe"` Unsafe bool `mapstructure:"unsafe"`
} }
// DefaultRPCConfig returns a default configuration for the RPC server
func DefaultRPCConfig() *RPCConfig { func DefaultRPCConfig() *RPCConfig {
return &RPCConfig{ return &RPCConfig{
ListenAddress: "tcp://0.0.0.0:46657", ListenAddress: "tcp://0.0.0.0:46657",
@ -173,6 +185,7 @@ func DefaultRPCConfig() *RPCConfig {
} }
} }
// TestRPCConfig returns a configuration for testing the RPC server
func TestRPCConfig() *RPCConfig { func TestRPCConfig() *RPCConfig {
conf := DefaultRPCConfig() conf := DefaultRPCConfig()
conf.ListenAddress = "tcp://0.0.0.0:36657" conf.ListenAddress = "tcp://0.0.0.0:36657"
@ -184,26 +197,47 @@ func TestRPCConfig() *RPCConfig {
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
// P2PConfig // P2PConfig
// P2PConfig defines the configuration options for the Tendermint peer-to-peer networking layer
type P2PConfig struct { type P2PConfig struct {
RootDir string `mapstructure:"home"` RootDir string `mapstructure:"home"`
ListenAddress string `mapstructure:"laddr"`
Seeds string `mapstructure:"seeds"` // Address to listen for incoming connections
SkipUPNP bool `mapstructure:"skip_upnp"` ListenAddress string `mapstructure:"laddr"`
AddrBook string `mapstructure:"addr_book_file"`
AddrBookStrict bool `mapstructure:"addr_book_strict"` // Comma separated list of seed nodes to connect to
PexReactor bool `mapstructure:"pex"` Seeds string `mapstructure:"seeds"`
MaxNumPeers int `mapstructure:"max_num_peers"`
// Skip UPNP port forwarding
SkipUPNP bool `mapstructure:"skip_upnp"`
// Path to address book
AddrBook string `mapstructure:"addr_book_file"`
// Set true for strict address routability rules
AddrBookStrict bool `mapstructure:"addr_book_strict"`
// Set true to enable the peer-exchange reactor
PexReactor bool `mapstructure:"pex"`
// Maximum number of peers to connect to
MaxNumPeers int `mapstructure:"max_num_peers"`
// Time to wait before flushing messages out on the connection. In ms
FlushThrottleTimeout int `mapstructure:"flush_throttle_timeout"`
} }
// DefaultP2PConfig returns a default configuration for the peer-to-peer layer
func DefaultP2PConfig() *P2PConfig { func DefaultP2PConfig() *P2PConfig {
return &P2PConfig{ return &P2PConfig{
ListenAddress: "tcp://0.0.0.0:46656", ListenAddress: "tcp://0.0.0.0:46656",
AddrBook: "addrbook.json", AddrBook: "addrbook.json",
AddrBookStrict: true, AddrBookStrict: true,
MaxNumPeers: 50, MaxNumPeers: 50,
FlushThrottleTimeout: 100,
} }
} }
// TestP2PConfig returns a configuration for testing the peer-to-peer layer
func TestP2PConfig() *P2PConfig { func TestP2PConfig() *P2PConfig {
conf := DefaultP2PConfig() conf := DefaultP2PConfig()
conf.ListenAddress = "tcp://0.0.0.0:36656" conf.ListenAddress = "tcp://0.0.0.0:36656"
@ -211,6 +245,7 @@ func TestP2PConfig() *P2PConfig {
return conf return conf
} }
// AddrBookFile returns the full path to the address bool
func (p *P2PConfig) AddrBookFile() string { func (p *P2PConfig) AddrBookFile() string {
return rootify(p.AddrBook, p.RootDir) return rootify(p.AddrBook, p.RootDir)
} }
@ -218,6 +253,7 @@ func (p *P2PConfig) AddrBookFile() string {
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
// MempoolConfig // MempoolConfig
// MempoolConfig defines the configuration options for the Tendermint mempool
type MempoolConfig struct { type MempoolConfig struct {
RootDir string `mapstructure:"home"` RootDir string `mapstructure:"home"`
Recheck bool `mapstructure:"recheck"` Recheck bool `mapstructure:"recheck"`
@ -226,6 +262,7 @@ type MempoolConfig struct {
WalPath string `mapstructure:"wal_dir"` WalPath string `mapstructure:"wal_dir"`
} }
// DefaultMempoolConfig returns a default configuration for the Tendermint mempool
func DefaultMempoolConfig() *MempoolConfig { func DefaultMempoolConfig() *MempoolConfig {
return &MempoolConfig{ return &MempoolConfig{
Recheck: true, Recheck: true,
@ -235,6 +272,7 @@ func DefaultMempoolConfig() *MempoolConfig {
} }
} }
// WalDir returns the full path to the mempool's write-ahead log
func (m *MempoolConfig) WalDir() string { func (m *MempoolConfig) WalDir() string {
return rootify(m.WalPath, m.RootDir) return rootify(m.WalPath, m.RootDir)
} }
@ -242,8 +280,8 @@ func (m *MempoolConfig) WalDir() string {
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
// ConsensusConfig // ConsensusConfig
// ConsensusConfig holds timeouts and details about the WAL, the block structure, // ConsensusConfig defines the confuguration for the Tendermint consensus service,
// and timeouts in the consensus protocol. // including timeouts and details about the WAL and the block structure.
type ConsensusConfig struct { type ConsensusConfig struct {
RootDir string `mapstructure:"home"` RootDir string `mapstructure:"home"`
WalPath string `mapstructure:"wal_file"` WalPath string `mapstructure:"wal_file"`
@ -269,46 +307,64 @@ type ConsensusConfig struct {
// TODO: This probably shouldn't be exposed but it makes it // TODO: This probably shouldn't be exposed but it makes it
// easy to write tests for the wal/replay // easy to write tests for the wal/replay
BlockPartSize int `mapstructure:"block_part_size"` BlockPartSize int `mapstructure:"block_part_size"`
// Reactor sleep duration parameters are in ms
PeerGossipSleepDuration int `mapstructure:"peer_gossip_sleep_duration"`
PeerQueryMaj23SleepDuration int `mapstructure:"peer_query_maj23_sleep_duration"`
} }
// Wait this long for a proposal // Propose returns the amount of time to wait for a proposal
func (cfg *ConsensusConfig) Propose(round int) time.Duration { func (cfg *ConsensusConfig) Propose(round int) time.Duration {
return time.Duration(cfg.TimeoutPropose+cfg.TimeoutProposeDelta*round) * time.Millisecond return time.Duration(cfg.TimeoutPropose+cfg.TimeoutProposeDelta*round) * time.Millisecond
} }
// After receiving any +2/3 prevote, wait this long for stragglers // Prevote returns the amount of time to wait for straggler votes after receiving any +2/3 prevotes
func (cfg *ConsensusConfig) Prevote(round int) time.Duration { func (cfg *ConsensusConfig) Prevote(round int) time.Duration {
return time.Duration(cfg.TimeoutPrevote+cfg.TimeoutPrevoteDelta*round) * time.Millisecond return time.Duration(cfg.TimeoutPrevote+cfg.TimeoutPrevoteDelta*round) * time.Millisecond
} }
// After receiving any +2/3 precommits, wait this long for stragglers // Precommit returns the amount of time to wait for straggler votes after receiving any +2/3 precommits
func (cfg *ConsensusConfig) Precommit(round int) time.Duration { func (cfg *ConsensusConfig) Precommit(round int) time.Duration {
return time.Duration(cfg.TimeoutPrecommit+cfg.TimeoutPrecommitDelta*round) * time.Millisecond return time.Duration(cfg.TimeoutPrecommit+cfg.TimeoutPrecommitDelta*round) * time.Millisecond
} }
// After receiving +2/3 precommits for a single block (a commit), wait this long for stragglers in the next height's RoundStepNewHeight // Commit returns the amount of time to wait for straggler votes after receiving +2/3 precommits for a single block (ie. a commit).
func (cfg *ConsensusConfig) Commit(t time.Time) time.Time { func (cfg *ConsensusConfig) Commit(t time.Time) time.Time {
return t.Add(time.Duration(cfg.TimeoutCommit) * time.Millisecond) return t.Add(time.Duration(cfg.TimeoutCommit) * time.Millisecond)
} }
// PeerGossipSleep returns the amount of time to sleep if there is nothing to send from the ConsensusReactor
func (cfg *ConsensusConfig) PeerGossipSleep() time.Duration {
return time.Duration(cfg.PeerGossipSleepDuration) * time.Millisecond
}
// PeerQueryMaj23Sleep returns the amount of time to sleep after each VoteSetMaj23Message is sent in the ConsensusReactor
func (cfg *ConsensusConfig) PeerQueryMaj23Sleep() time.Duration {
return time.Duration(cfg.PeerQueryMaj23SleepDuration) * time.Millisecond
}
// DefaultConsensusConfig returns a default configuration for the consensus service
func DefaultConsensusConfig() *ConsensusConfig { func DefaultConsensusConfig() *ConsensusConfig {
return &ConsensusConfig{ return &ConsensusConfig{
WalPath: "data/cs.wal/wal", WalPath: "data/cs.wal/wal",
WalLight: false, WalLight: false,
TimeoutPropose: 3000, TimeoutPropose: 3000,
TimeoutProposeDelta: 500, TimeoutProposeDelta: 500,
TimeoutPrevote: 1000, TimeoutPrevote: 1000,
TimeoutPrevoteDelta: 500, TimeoutPrevoteDelta: 500,
TimeoutPrecommit: 1000, TimeoutPrecommit: 1000,
TimeoutPrecommitDelta: 500, TimeoutPrecommitDelta: 500,
TimeoutCommit: 1000, TimeoutCommit: 1000,
SkipTimeoutCommit: false, SkipTimeoutCommit: false,
MaxBlockSizeTxs: 10000, MaxBlockSizeTxs: 10000,
MaxBlockSizeBytes: 1, // TODO MaxBlockSizeBytes: 1, // TODO
BlockPartSize: types.DefaultBlockPartSize, // TODO: we shouldnt be importing types BlockPartSize: types.DefaultBlockPartSize, // TODO: we shouldnt be importing types
PeerGossipSleepDuration: 100,
PeerQueryMaj23SleepDuration: 2000,
} }
} }
// TestConsensusConfig returns a configuration for testing the consensus service
func TestConsensusConfig() *ConsensusConfig { func TestConsensusConfig() *ConsensusConfig {
config := DefaultConsensusConfig() config := DefaultConsensusConfig()
config.TimeoutPropose = 2000 config.TimeoutPropose = 2000
@ -322,6 +378,7 @@ func TestConsensusConfig() *ConsensusConfig {
return config return config
} }
// WalFile returns the full path to the write-ahead log file
func (c *ConsensusConfig) WalFile() string { func (c *ConsensusConfig) WalFile() string {
if c.walFile != "" { if c.walFile != "" {
return c.walFile return c.walFile
@ -329,6 +386,7 @@ func (c *ConsensusConfig) WalFile() string {
return rootify(c.WalPath, c.RootDir) return rootify(c.WalPath, c.RootDir)
} }
// SetWalFile sets the path to the write-ahead log file
func (c *ConsensusConfig) SetWalFile(walFile string) { func (c *ConsensusConfig) SetWalFile(walFile string) {
c.walFile = walFile c.walFile = walFile
} }

View File

@ -9,10 +9,12 @@ import (
"time" "time"
wire "github.com/tendermint/go-wire" wire "github.com/tendermint/go-wire"
cmn "github.com/tendermint/tmlibs/common"
"github.com/tendermint/tmlibs/log"
"github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/p2p"
sm "github.com/tendermint/tendermint/state" sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
. "github.com/tendermint/tmlibs/common"
) )
const ( const (
@ -21,13 +23,12 @@ const (
VoteChannel = byte(0x22) VoteChannel = byte(0x22)
VoteSetBitsChannel = byte(0x23) VoteSetBitsChannel = byte(0x23)
peerGossipSleepDuration = 100 * time.Millisecond // Time to sleep if there's nothing to send. maxConsensusMessageSize = 1048576 // 1MB; NOTE/TODO: keep in sync with types.PartSet sizes.
peerQueryMaj23SleepDuration = 2 * time.Second // Time to sleep after each VoteSetMaj23Message sent
maxConsensusMessageSize = 1048576 // 1MB; NOTE: keep in sync with types.PartSet sizes.
) )
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
// ConsensusReactor defines a reactor for the consensus service.
type ConsensusReactor struct { type ConsensusReactor struct {
p2p.BaseReactor // BaseService + p2p.Switch p2p.BaseReactor // BaseService + p2p.Switch
@ -36,6 +37,7 @@ type ConsensusReactor struct {
evsw types.EventSwitch evsw types.EventSwitch
} }
// NewConsensusReactor returns a new ConsensusReactor with the given consensusState.
func NewConsensusReactor(consensusState *ConsensusState, fastSync bool) *ConsensusReactor { func NewConsensusReactor(consensusState *ConsensusState, fastSync bool) *ConsensusReactor {
conR := &ConsensusReactor{ conR := &ConsensusReactor{
conS: consensusState, conS: consensusState,
@ -45,6 +47,7 @@ func NewConsensusReactor(consensusState *ConsensusState, fastSync bool) *Consens
return conR return conR
} }
// OnStart implements BaseService.
func (conR *ConsensusReactor) OnStart() error { func (conR *ConsensusReactor) OnStart() error {
conR.Logger.Info("ConsensusReactor ", "fastSync", conR.fastSync) conR.Logger.Info("ConsensusReactor ", "fastSync", conR.fastSync)
conR.BaseReactor.OnStart() conR.BaseReactor.OnStart()
@ -62,13 +65,14 @@ func (conR *ConsensusReactor) OnStart() error {
return nil return nil
} }
// OnStop implements BaseService
func (conR *ConsensusReactor) OnStop() { func (conR *ConsensusReactor) OnStop() {
conR.BaseReactor.OnStop() conR.BaseReactor.OnStop()
conR.conS.Stop() conR.conS.Stop()
} }
// Switch from the fast_sync to the consensus: // SwitchToConsensus switches from fast_sync mode to consensus mode.
// reset the state, turn off fast_sync, start the consensus-state-machine // It resets the state, turns off fast_sync, and starts the consensus state-machine
func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State) { func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State) {
conR.Logger.Info("SwitchToConsensus") conR.Logger.Info("SwitchToConsensus")
conR.conS.reconstructLastCommit(state) conR.conS.reconstructLastCommit(state)
@ -79,7 +83,7 @@ func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State) {
conR.conS.Start() conR.conS.Start()
} }
// Implements Reactor // GetChannels implements Reactor
func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor { func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor {
// TODO optimize // TODO optimize
return []*p2p.ChannelDescriptor{ return []*p2p.ChannelDescriptor{
@ -109,7 +113,7 @@ func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor {
} }
} }
// Implements Reactor // AddPeer implements Reactor
func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) { func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) {
if !conR.IsRunning() { if !conR.IsRunning() {
return return
@ -131,7 +135,7 @@ func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) {
} }
} }
// Implements Reactor // RemovePeer implements Reactor
func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) { func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
if !conR.IsRunning() { if !conR.IsRunning() {
return return
@ -140,7 +144,7 @@ func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
//peer.Data.Get(PeerStateKey).(*PeerState).Disconnect() //peer.Data.Get(PeerStateKey).(*PeerState).Disconnect()
} }
// Implements Reactor // Receive implements Reactor
// NOTE: We process these messages even when we're fast_syncing. // NOTE: We process these messages even when we're fast_syncing.
// Messages affect either a peer state or the consensus state. // Messages affect either a peer state or the consensus state.
// Peer state updates can happen in parallel, but processing of // Peer state updates can happen in parallel, but processing of
@ -184,7 +188,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte)
votes.SetPeerMaj23(msg.Round, msg.Type, ps.Peer.Key, msg.BlockID) votes.SetPeerMaj23(msg.Round, msg.Type, ps.Peer.Key, msg.BlockID)
// Respond with a VoteSetBitsMessage showing which votes we have. // Respond with a VoteSetBitsMessage showing which votes we have.
// (and consequently shows which we don't have) // (and consequently shows which we don't have)
var ourVotes *BitArray var ourVotes *cmn.BitArray
switch msg.Type { switch msg.Type {
case types.VoteTypePrevote: case types.VoteTypePrevote:
ourVotes = votes.Prevotes(msg.Round).BitArrayByBlockID(msg.BlockID) ourVotes = votes.Prevotes(msg.Round).BitArrayByBlockID(msg.BlockID)
@ -202,7 +206,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte)
Votes: ourVotes, Votes: ourVotes,
}}) }})
default: default:
conR.Logger.Error(Fmt("Unknown message type %v", reflect.TypeOf(msg))) conR.Logger.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
} }
case DataChannel: case DataChannel:
@ -220,7 +224,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte)
ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Index) ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Index)
conR.conS.peerMsgQueue <- msgInfo{msg, src.Key} conR.conS.peerMsgQueue <- msgInfo{msg, src.Key}
default: default:
conR.Logger.Error(Fmt("Unknown message type %v", reflect.TypeOf(msg))) conR.Logger.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
} }
case VoteChannel: case VoteChannel:
@ -242,7 +246,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte)
default: default:
// don't punish (leave room for soft upgrades) // don't punish (leave room for soft upgrades)
conR.Logger.Error(Fmt("Unknown message type %v", reflect.TypeOf(msg))) conR.Logger.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
} }
case VoteSetBitsChannel: case VoteSetBitsChannel:
@ -258,7 +262,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte)
cs.mtx.Unlock() cs.mtx.Unlock()
if height == msg.Height { if height == msg.Height {
var ourVotes *BitArray var ourVotes *cmn.BitArray
switch msg.Type { switch msg.Type {
case types.VoteTypePrevote: case types.VoteTypePrevote:
ourVotes = votes.Prevotes(msg.Round).BitArrayByBlockID(msg.BlockID) ourVotes = votes.Prevotes(msg.Round).BitArrayByBlockID(msg.BlockID)
@ -274,11 +278,11 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte)
} }
default: default:
// don't punish (leave room for soft upgrades) // don't punish (leave room for soft upgrades)
conR.Logger.Error(Fmt("Unknown message type %v", reflect.TypeOf(msg))) conR.Logger.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
} }
default: default:
conR.Logger.Error(Fmt("Unknown chId %X", chID)) conR.Logger.Error(cmn.Fmt("Unknown chId %X", chID))
} }
if err != nil { if err != nil {
@ -286,7 +290,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte)
} }
} }
// implements events.Eventable // SetEventSwitch implements events.Eventable
func (conR *ConsensusReactor) SetEventSwitch(evsw types.EventSwitch) { func (conR *ConsensusReactor) SetEventSwitch(evsw types.EventSwitch) {
conR.evsw = evsw conR.evsw = evsw
conR.conS.SetEventSwitch(evsw) conR.conS.SetEventSwitch(evsw)
@ -390,7 +394,6 @@ OUTER_LOOP:
// Send proposal Block parts? // Send proposal Block parts?
if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockPartsHeader) { if rs.ProposalBlockParts.HasHeader(prs.ProposalBlockPartsHeader) {
//logger.Info("ProposalBlockParts matched", "blockParts", prs.ProposalBlockParts)
if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockParts.Copy()).PickRandom(); ok { if index, ok := rs.ProposalBlockParts.BitArray().Sub(prs.ProposalBlockParts.Copy()).PickRandom(); ok {
part := rs.ProposalBlockParts.GetPart(index) part := rs.ProposalBlockParts.GetPart(index)
msg := &BlockPartMessage{ msg := &BlockPartMessage{
@ -398,6 +401,7 @@ OUTER_LOOP:
Round: rs.Round, // This tells peer that this part applies to us. Round: rs.Round, // This tells peer that this part applies to us.
Part: part, Part: part,
} }
logger.Debug("Sending block part", "height", prs.Height, "round", prs.Round)
if peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) { if peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) {
ps.SetHasProposalBlockPart(prs.Height, prs.Round, index) ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
} }
@ -407,49 +411,15 @@ OUTER_LOOP:
// If the peer is on a previous height, help catch up. // If the peer is on a previous height, help catch up.
if (0 < prs.Height) && (prs.Height < rs.Height) { if (0 < prs.Height) && (prs.Height < rs.Height) {
//logger.Info("Data catchup", "height", rs.Height, "peerHeight", prs.Height, "peerProposalBlockParts", prs.ProposalBlockParts) heightLogger := logger.With("height", prs.Height)
if index, ok := prs.ProposalBlockParts.Not().PickRandom(); ok { conR.gossipDataForCatchup(heightLogger, rs, prs, ps, peer)
// Ensure that the peer's PartSetHeader is correct continue OUTER_LOOP
blockMeta := conR.conS.blockStore.LoadBlockMeta(prs.Height)
if blockMeta == nil {
logger.Error("Failed to load block meta", "peer height", prs.Height, "ourHeight", rs.Height, "blockstoreHeight", conR.conS.blockStore.Height(), "pv", conR.conS.privValidator)
time.Sleep(peerGossipSleepDuration)
continue OUTER_LOOP
} else if !blockMeta.BlockID.PartsHeader.Equals(prs.ProposalBlockPartsHeader) {
logger.Info("Peer ProposalBlockPartsHeader mismatch, sleeping",
"peerHeight", prs.Height, "blockPartsHeader", blockMeta.BlockID.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader)
time.Sleep(peerGossipSleepDuration)
continue OUTER_LOOP
}
// Load the part
part := conR.conS.blockStore.LoadBlockPart(prs.Height, index)
if part == nil {
logger.Error("Could not load part", "index", index,
"peerHeight", prs.Height, "blockPartsHeader", blockMeta.BlockID.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader)
time.Sleep(peerGossipSleepDuration)
continue OUTER_LOOP
}
// Send the part
msg := &BlockPartMessage{
Height: prs.Height, // Not our height, so it doesn't matter.
Round: prs.Round, // Not our height, so it doesn't matter.
Part: part,
}
if peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) {
ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
}
continue OUTER_LOOP
} else {
//logger.Info("No parts to send in catch-up, sleeping")
time.Sleep(peerGossipSleepDuration)
continue OUTER_LOOP
}
} }
// If height and round don't match, sleep. // If height and round don't match, sleep.
if (rs.Height != prs.Height) || (rs.Round != prs.Round) { if (rs.Height != prs.Height) || (rs.Round != prs.Round) {
//logger.Info("Peer Height|Round mismatch, sleeping", "peerHeight", prs.Height, "peerRound", prs.Round, "peer", peer) //logger.Info("Peer Height|Round mismatch, sleeping", "peerHeight", prs.Height, "peerRound", prs.Round, "peer", peer)
time.Sleep(peerGossipSleepDuration) time.Sleep(conR.conS.config.PeerGossipSleep())
continue OUTER_LOOP continue OUTER_LOOP
} }
@ -463,6 +433,7 @@ OUTER_LOOP:
// Proposal: share the proposal metadata with peer. // Proposal: share the proposal metadata with peer.
{ {
msg := &ProposalMessage{Proposal: rs.Proposal} msg := &ProposalMessage{Proposal: rs.Proposal}
logger.Debug("Sending proposal", "height", prs.Height, "round", prs.Round)
if peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) { if peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) {
ps.SetHasProposal(rs.Proposal) ps.SetHasProposal(rs.Proposal)
} }
@ -477,17 +448,61 @@ OUTER_LOOP:
ProposalPOLRound: rs.Proposal.POLRound, ProposalPOLRound: rs.Proposal.POLRound,
ProposalPOL: rs.Votes.Prevotes(rs.Proposal.POLRound).BitArray(), ProposalPOL: rs.Votes.Prevotes(rs.Proposal.POLRound).BitArray(),
} }
logger.Debug("Sending POL", "height", prs.Height, "round", prs.Round)
peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) peer.Send(DataChannel, struct{ ConsensusMessage }{msg})
} }
continue OUTER_LOOP continue OUTER_LOOP
} }
// Nothing to do. Sleep. // Nothing to do. Sleep.
time.Sleep(peerGossipSleepDuration) time.Sleep(conR.conS.config.PeerGossipSleep())
continue OUTER_LOOP continue OUTER_LOOP
} }
} }
func (conR *ConsensusReactor) gossipDataForCatchup(logger log.Logger, rs *RoundState,
prs *PeerRoundState, ps *PeerState, peer *p2p.Peer) {
if index, ok := prs.ProposalBlockParts.Not().PickRandom(); ok {
// Ensure that the peer's PartSetHeader is correct
blockMeta := conR.conS.blockStore.LoadBlockMeta(prs.Height)
if blockMeta == nil {
logger.Error("Failed to load block meta",
"ourHeight", rs.Height, "blockstoreHeight", conR.conS.blockStore.Height())
time.Sleep(conR.conS.config.PeerGossipSleep())
return
} else if !blockMeta.BlockID.PartsHeader.Equals(prs.ProposalBlockPartsHeader) {
logger.Info("Peer ProposalBlockPartsHeader mismatch, sleeping",
"blockPartsHeader", blockMeta.BlockID.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader)
time.Sleep(conR.conS.config.PeerGossipSleep())
return
}
// Load the part
part := conR.conS.blockStore.LoadBlockPart(prs.Height, index)
if part == nil {
logger.Error("Could not load part", "index", index,
"blockPartsHeader", blockMeta.BlockID.PartsHeader, "peerBlockPartsHeader", prs.ProposalBlockPartsHeader)
time.Sleep(conR.conS.config.PeerGossipSleep())
return
}
// Send the part
msg := &BlockPartMessage{
Height: prs.Height, // Not our height, so it doesn't matter.
Round: prs.Round, // Not our height, so it doesn't matter.
Part: part,
}
logger.Debug("Sending block part for catchup", "round", prs.Round)
if peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) {
ps.SetHasProposalBlockPart(prs.Height, prs.Round, index)
}
return
} else {
//logger.Info("No parts to send in catch-up, sleeping")
time.Sleep(conR.conS.config.PeerGossipSleep())
return
}
}
func (conR *ConsensusReactor) gossipVotesRoutine(peer *p2p.Peer, ps *PeerState) { func (conR *ConsensusReactor) gossipVotesRoutine(peer *p2p.Peer, ps *PeerState) {
logger := conR.Logger.With("peer", peer) logger := conR.Logger.With("peer", peer)
@ -516,35 +531,9 @@ OUTER_LOOP:
// If height matches, then send LastCommit, Prevotes, Precommits. // If height matches, then send LastCommit, Prevotes, Precommits.
if rs.Height == prs.Height { if rs.Height == prs.Height {
// If there are lastCommits to send... heightLogger := logger.With("height", prs.Height)
if prs.Step == RoundStepNewHeight { if conR.gossipVotesForHeight(heightLogger, rs, prs, ps) {
if ps.PickSendVote(rs.LastCommit) { continue OUTER_LOOP
logger.Debug("Picked rs.LastCommit to send")
continue OUTER_LOOP
}
}
// If there are prevotes to send...
if prs.Step <= RoundStepPrevote && prs.Round != -1 && prs.Round <= rs.Round {
if ps.PickSendVote(rs.Votes.Prevotes(prs.Round)) {
logger.Debug("Picked rs.Prevotes(prs.Round) to send")
continue OUTER_LOOP
}
}
// If there are precommits to send...
if prs.Step <= RoundStepPrecommit && prs.Round != -1 && prs.Round <= rs.Round {
if ps.PickSendVote(rs.Votes.Precommits(prs.Round)) {
logger.Debug("Picked rs.Precommits(prs.Round) to send")
continue OUTER_LOOP
}
}
// If there are POLPrevotes to send...
if prs.ProposalPOLRound != -1 {
if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil {
if ps.PickSendVote(polPrevotes) {
logger.Debug("Picked rs.Prevotes(prs.ProposalPOLRound) to send")
continue OUTER_LOOP
}
}
} }
} }
@ -552,7 +541,7 @@ OUTER_LOOP:
// If peer is lagging by height 1, send LastCommit. // If peer is lagging by height 1, send LastCommit.
if prs.Height != 0 && rs.Height == prs.Height+1 { if prs.Height != 0 && rs.Height == prs.Height+1 {
if ps.PickSendVote(rs.LastCommit) { if ps.PickSendVote(rs.LastCommit) {
logger.Debug("Picked rs.LastCommit to send") logger.Debug("Picked rs.LastCommit to send", "height", prs.Height)
continue OUTER_LOOP continue OUTER_LOOP
} }
} }
@ -565,7 +554,7 @@ OUTER_LOOP:
commit := conR.conS.blockStore.LoadBlockCommit(prs.Height) commit := conR.conS.blockStore.LoadBlockCommit(prs.Height)
logger.Info("Loaded BlockCommit for catch-up", "height", prs.Height, "commit", commit) logger.Info("Loaded BlockCommit for catch-up", "height", prs.Height, "commit", commit)
if ps.PickSendVote(commit) { if ps.PickSendVote(commit) {
logger.Debug("Picked Catchup commit to send") logger.Debug("Picked Catchup commit to send", "height", prs.Height)
continue OUTER_LOOP continue OUTER_LOOP
} }
} }
@ -573,7 +562,7 @@ OUTER_LOOP:
if sleeping == 0 { if sleeping == 0 {
// We sent nothing. Sleep... // We sent nothing. Sleep...
sleeping = 1 sleeping = 1
logger.Debug("No votes to send, sleeping", logger.Debug("No votes to send, sleeping", "rs.Height", rs.Height, "prs.Height", prs.Height,
"localPV", rs.Votes.Prevotes(rs.Round).BitArray(), "peerPV", prs.Prevotes, "localPV", rs.Votes.Prevotes(rs.Round).BitArray(), "peerPV", prs.Prevotes,
"localPC", rs.Votes.Precommits(rs.Round).BitArray(), "peerPC", prs.Precommits) "localPC", rs.Votes.Precommits(rs.Round).BitArray(), "peerPC", prs.Precommits)
} else if sleeping == 2 { } else if sleeping == 2 {
@ -581,11 +570,47 @@ OUTER_LOOP:
sleeping = 1 sleeping = 1
} }
time.Sleep(peerGossipSleepDuration) time.Sleep(conR.conS.config.PeerGossipSleep())
continue OUTER_LOOP continue OUTER_LOOP
} }
} }
func (conR *ConsensusReactor) gossipVotesForHeight(logger log.Logger, rs *RoundState, prs *PeerRoundState, ps *PeerState) bool {
// If there are lastCommits to send...
if prs.Step == RoundStepNewHeight {
if ps.PickSendVote(rs.LastCommit) {
logger.Debug("Picked rs.LastCommit to send")
return true
}
}
// If there are prevotes to send...
if prs.Step <= RoundStepPrevote && prs.Round != -1 && prs.Round <= rs.Round {
if ps.PickSendVote(rs.Votes.Prevotes(prs.Round)) {
logger.Debug("Picked rs.Prevotes(prs.Round) to send", "round", prs.Round)
return true
}
}
// If there are precommits to send...
if prs.Step <= RoundStepPrecommit && prs.Round != -1 && prs.Round <= rs.Round {
if ps.PickSendVote(rs.Votes.Precommits(prs.Round)) {
logger.Debug("Picked rs.Precommits(prs.Round) to send", "round", prs.Round)
return true
}
}
// If there are POLPrevotes to send...
if prs.ProposalPOLRound != -1 {
if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil {
if ps.PickSendVote(polPrevotes) {
logger.Debug("Picked rs.Prevotes(prs.ProposalPOLRound) to send",
"round", prs.ProposalPOLRound)
return true
}
}
}
return false
}
// NOTE: `queryMaj23Routine` has a simple crude design since it only comes // NOTE: `queryMaj23Routine` has a simple crude design since it only comes
// into play for liveness when there's a signature DDoS attack happening. // into play for liveness when there's a signature DDoS attack happening.
func (conR *ConsensusReactor) queryMaj23Routine(peer *p2p.Peer, ps *PeerState) { func (conR *ConsensusReactor) queryMaj23Routine(peer *p2p.Peer, ps *PeerState) {
@ -611,7 +636,7 @@ OUTER_LOOP:
Type: types.VoteTypePrevote, Type: types.VoteTypePrevote,
BlockID: maj23, BlockID: maj23,
}}) }})
time.Sleep(peerQueryMaj23SleepDuration) time.Sleep(conR.conS.config.PeerQueryMaj23Sleep())
} }
} }
} }
@ -628,7 +653,7 @@ OUTER_LOOP:
Type: types.VoteTypePrecommit, Type: types.VoteTypePrecommit,
BlockID: maj23, BlockID: maj23,
}}) }})
time.Sleep(peerQueryMaj23SleepDuration) time.Sleep(conR.conS.config.PeerQueryMaj23Sleep())
} }
} }
} }
@ -645,7 +670,7 @@ OUTER_LOOP:
Type: types.VoteTypePrevote, Type: types.VoteTypePrevote,
BlockID: maj23, BlockID: maj23,
}}) }})
time.Sleep(peerQueryMaj23SleepDuration) time.Sleep(conR.conS.config.PeerQueryMaj23Sleep())
} }
} }
} }
@ -664,21 +689,25 @@ OUTER_LOOP:
Type: types.VoteTypePrecommit, Type: types.VoteTypePrecommit,
BlockID: commit.BlockID, BlockID: commit.BlockID,
}}) }})
time.Sleep(peerQueryMaj23SleepDuration) time.Sleep(conR.conS.config.PeerQueryMaj23Sleep())
} }
} }
time.Sleep(peerQueryMaj23SleepDuration) time.Sleep(conR.conS.config.PeerQueryMaj23Sleep())
continue OUTER_LOOP continue OUTER_LOOP
} }
} }
// String returns a string representation of the ConsensusReactor.
// NOTE: For now, it is just a hard-coded string to avoid accessing unprotected shared variables.
// TODO: improve!
func (conR *ConsensusReactor) String() string { func (conR *ConsensusReactor) String() string {
// better not to access shared variables // better not to access shared variables
return "ConsensusReactor" // conR.StringIndented("") return "ConsensusReactor" // conR.StringIndented("")
} }
// StringIndented returns an indented string representation of the ConsensusReactor
func (conR *ConsensusReactor) StringIndented(indent string) string { func (conR *ConsensusReactor) StringIndented(indent string) string {
s := "ConsensusReactor{\n" s := "ConsensusReactor{\n"
s += indent + " " + conR.conS.StringIndented(indent+" ") + "\n" s += indent + " " + conR.conS.StringIndented(indent+" ") + "\n"
@ -692,7 +721,8 @@ func (conR *ConsensusReactor) StringIndented(indent string) string {
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
// Read only when returned by PeerState.GetRoundState(). // PeerRoundState contains the known state of a peer.
// NOTE: Read-only when returned by PeerState.GetRoundState().
type PeerRoundState struct { type PeerRoundState struct {
Height int // Height peer is at Height int // Height peer is at
Round int // Round peer is at, -1 if unknown. Round int // Round peer is at, -1 if unknown.
@ -700,21 +730,23 @@ type PeerRoundState struct {
StartTime time.Time // Estimated start of round 0 at this height StartTime time.Time // Estimated start of round 0 at this height
Proposal bool // True if peer has proposal for this round Proposal bool // True if peer has proposal for this round
ProposalBlockPartsHeader types.PartSetHeader // ProposalBlockPartsHeader types.PartSetHeader //
ProposalBlockParts *BitArray // ProposalBlockParts *cmn.BitArray //
ProposalPOLRound int // Proposal's POL round. -1 if none. ProposalPOLRound int // Proposal's POL round. -1 if none.
ProposalPOL *BitArray // nil until ProposalPOLMessage received. ProposalPOL *cmn.BitArray // nil until ProposalPOLMessage received.
Prevotes *BitArray // All votes peer has for this round Prevotes *cmn.BitArray // All votes peer has for this round
Precommits *BitArray // All precommits peer has for this round Precommits *cmn.BitArray // All precommits peer has for this round
LastCommitRound int // Round of commit for last height. -1 if none. LastCommitRound int // Round of commit for last height. -1 if none.
LastCommit *BitArray // All commit precommits of commit for last height. LastCommit *cmn.BitArray // All commit precommits of commit for last height.
CatchupCommitRound int // Round that we have commit for. Not necessarily unique. -1 if none. CatchupCommitRound int // Round that we have commit for. Not necessarily unique. -1 if none.
CatchupCommit *BitArray // All commit precommits peer has for this height & CatchupCommitRound CatchupCommit *cmn.BitArray // All commit precommits peer has for this height & CatchupCommitRound
} }
// String returns a string representation of the PeerRoundState
func (prs PeerRoundState) String() string { func (prs PeerRoundState) String() string {
return prs.StringIndented("") return prs.StringIndented("")
} }
// StringIndented returns a string representation of the PeerRoundState
func (prs PeerRoundState) StringIndented(indent string) string { func (prs PeerRoundState) StringIndented(indent string) string {
return fmt.Sprintf(`PeerRoundState{ return fmt.Sprintf(`PeerRoundState{
%s %v/%v/%v @%v %s %v/%v/%v @%v
@ -742,6 +774,8 @@ var (
ErrPeerStateInvalidStartTime = errors.New("Error peer state invalid startTime") ErrPeerStateInvalidStartTime = errors.New("Error peer state invalid startTime")
) )
// PeerState contains the known state of a peer, including its connection
// and threadsafe access to its PeerRoundState.
type PeerState struct { type PeerState struct {
Peer *p2p.Peer Peer *p2p.Peer
@ -749,6 +783,7 @@ type PeerState struct {
PeerRoundState PeerRoundState
} }
// NewPeerState returns a new PeerState for the given Peer
func NewPeerState(peer *p2p.Peer) *PeerState { func NewPeerState(peer *p2p.Peer) *PeerState {
return &PeerState{ return &PeerState{
Peer: peer, Peer: peer,
@ -761,7 +796,7 @@ func NewPeerState(peer *p2p.Peer) *PeerState {
} }
} }
// Returns an atomic snapshot of the PeerRoundState. // GetRoundState returns an atomic snapshot of the PeerRoundState.
// There's no point in mutating it since it won't change PeerState. // There's no point in mutating it since it won't change PeerState.
func (ps *PeerState) GetRoundState() *PeerRoundState { func (ps *PeerState) GetRoundState() *PeerRoundState {
ps.mtx.Lock() ps.mtx.Lock()
@ -771,7 +806,7 @@ func (ps *PeerState) GetRoundState() *PeerRoundState {
return &prs return &prs
} }
// Returns an atomic snapshot of the PeerRoundState's height // GetHeight returns an atomic snapshot of the PeerRoundState's height
// used by the mempool to ensure peers are caught up before broadcasting new txs // used by the mempool to ensure peers are caught up before broadcasting new txs
func (ps *PeerState) GetHeight() int { func (ps *PeerState) GetHeight() int {
ps.mtx.Lock() ps.mtx.Lock()
@ -779,6 +814,7 @@ func (ps *PeerState) GetHeight() int {
return ps.PeerRoundState.Height return ps.PeerRoundState.Height
} }
// SetHasProposal sets the given proposal as known for the peer.
func (ps *PeerState) SetHasProposal(proposal *types.Proposal) { func (ps *PeerState) SetHasProposal(proposal *types.Proposal) {
ps.mtx.Lock() ps.mtx.Lock()
defer ps.mtx.Unlock() defer ps.mtx.Unlock()
@ -792,11 +828,12 @@ func (ps *PeerState) SetHasProposal(proposal *types.Proposal) {
ps.Proposal = true ps.Proposal = true
ps.ProposalBlockPartsHeader = proposal.BlockPartsHeader ps.ProposalBlockPartsHeader = proposal.BlockPartsHeader
ps.ProposalBlockParts = NewBitArray(proposal.BlockPartsHeader.Total) ps.ProposalBlockParts = cmn.NewBitArray(proposal.BlockPartsHeader.Total)
ps.ProposalPOLRound = proposal.POLRound ps.ProposalPOLRound = proposal.POLRound
ps.ProposalPOL = nil // Nil until ProposalPOLMessage received. ps.ProposalPOL = nil // Nil until ProposalPOLMessage received.
} }
// SetHasProposalBlockPart sets the given block part index as known for the peer.
func (ps *PeerState) SetHasProposalBlockPart(height int, round int, index int) { func (ps *PeerState) SetHasProposalBlockPart(height int, round int, index int) {
ps.mtx.Lock() ps.mtx.Lock()
defer ps.mtx.Unlock() defer ps.mtx.Unlock()
@ -808,9 +845,9 @@ func (ps *PeerState) SetHasProposalBlockPart(height int, round int, index int) {
ps.ProposalBlockParts.SetIndex(index, true) ps.ProposalBlockParts.SetIndex(index, true)
} }
// PickVoteToSend sends vote to peer. // PickSendVote picks a vote and sends it to the peer.
// Returns true if vote was sent. // Returns true if vote was sent.
func (ps *PeerState) PickSendVote(votes types.VoteSetReader) (ok bool) { func (ps *PeerState) PickSendVote(votes types.VoteSetReader) bool {
if vote, ok := ps.PickVoteToSend(votes); ok { if vote, ok := ps.PickVoteToSend(votes); ok {
msg := &VoteMessage{vote} msg := &VoteMessage{vote}
return ps.Peer.Send(VoteChannel, struct{ ConsensusMessage }{msg}) return ps.Peer.Send(VoteChannel, struct{ ConsensusMessage }{msg})
@ -818,7 +855,9 @@ func (ps *PeerState) PickSendVote(votes types.VoteSetReader) (ok bool) {
return false return false
} }
// votes: Must be the correct Size() for the Height(). // PickVoteToSend picks a vote to send to the peer.
// Returns true if a vote was picked.
// NOTE: `votes` must be the correct Size() for the Height().
func (ps *PeerState) PickVoteToSend(votes types.VoteSetReader) (vote *types.Vote, ok bool) { func (ps *PeerState) PickVoteToSend(votes types.VoteSetReader) (vote *types.Vote, ok bool) {
ps.mtx.Lock() ps.mtx.Lock()
defer ps.mtx.Unlock() defer ps.mtx.Unlock()
@ -846,9 +885,9 @@ func (ps *PeerState) PickVoteToSend(votes types.VoteSetReader) (vote *types.Vote
return nil, false return nil, false
} }
func (ps *PeerState) getVoteBitArray(height, round int, type_ byte) *BitArray { func (ps *PeerState) getVoteBitArray(height, round int, type_ byte) *cmn.BitArray {
if !types.IsVoteTypeValid(type_) { if !types.IsVoteTypeValid(type_) {
PanicSanity("Invalid vote type") cmn.PanicSanity("Invalid vote type")
} }
if ps.Height == height { if ps.Height == height {
@ -901,7 +940,7 @@ func (ps *PeerState) ensureCatchupCommitRound(height, round int, numValidators i
NOTE: This is wrong, 'round' could change. NOTE: This is wrong, 'round' could change.
e.g. if orig round is not the same as block LastCommit round. e.g. if orig round is not the same as block LastCommit round.
if ps.CatchupCommitRound != -1 && ps.CatchupCommitRound != round { if ps.CatchupCommitRound != -1 && ps.CatchupCommitRound != round {
PanicSanity(Fmt("Conflicting CatchupCommitRound. Height: %v, Orig: %v, New: %v", height, ps.CatchupCommitRound, round)) cmn.PanicSanity(cmn.Fmt("Conflicting CatchupCommitRound. Height: %v, Orig: %v, New: %v", height, ps.CatchupCommitRound, round))
} }
*/ */
if ps.CatchupCommitRound == round { if ps.CatchupCommitRound == round {
@ -911,10 +950,12 @@ func (ps *PeerState) ensureCatchupCommitRound(height, round int, numValidators i
if round == ps.Round { if round == ps.Round {
ps.CatchupCommit = ps.Precommits ps.CatchupCommit = ps.Precommits
} else { } else {
ps.CatchupCommit = NewBitArray(numValidators) ps.CatchupCommit = cmn.NewBitArray(numValidators)
} }
} }
// EnsureVoteVitArrays ensures the bit-arrays have been allocated for tracking
// what votes this peer has received.
// NOTE: It's important to make sure that numValidators actually matches // NOTE: It's important to make sure that numValidators actually matches
// what the node sees as the number of validators for height. // what the node sees as the number of validators for height.
func (ps *PeerState) EnsureVoteBitArrays(height int, numValidators int) { func (ps *PeerState) EnsureVoteBitArrays(height int, numValidators int) {
@ -926,24 +967,25 @@ func (ps *PeerState) EnsureVoteBitArrays(height int, numValidators int) {
func (ps *PeerState) ensureVoteBitArrays(height int, numValidators int) { func (ps *PeerState) ensureVoteBitArrays(height int, numValidators int) {
if ps.Height == height { if ps.Height == height {
if ps.Prevotes == nil { if ps.Prevotes == nil {
ps.Prevotes = NewBitArray(numValidators) ps.Prevotes = cmn.NewBitArray(numValidators)
} }
if ps.Precommits == nil { if ps.Precommits == nil {
ps.Precommits = NewBitArray(numValidators) ps.Precommits = cmn.NewBitArray(numValidators)
} }
if ps.CatchupCommit == nil { if ps.CatchupCommit == nil {
ps.CatchupCommit = NewBitArray(numValidators) ps.CatchupCommit = cmn.NewBitArray(numValidators)
} }
if ps.ProposalPOL == nil { if ps.ProposalPOL == nil {
ps.ProposalPOL = NewBitArray(numValidators) ps.ProposalPOL = cmn.NewBitArray(numValidators)
} }
} else if ps.Height == height+1 { } else if ps.Height == height+1 {
if ps.LastCommit == nil { if ps.LastCommit == nil {
ps.LastCommit = NewBitArray(numValidators) ps.LastCommit = cmn.NewBitArray(numValidators)
} }
} }
} }
// SetHasVote sets the given vote as known by the peer
func (ps *PeerState) SetHasVote(vote *types.Vote) { func (ps *PeerState) SetHasVote(vote *types.Vote) {
ps.mtx.Lock() ps.mtx.Lock()
defer ps.mtx.Unlock() defer ps.mtx.Unlock()
@ -959,6 +1001,7 @@ func (ps *PeerState) setHasVote(height int, round int, type_ byte, index int) {
ps.getVoteBitArray(height, round, type_).SetIndex(index, true) ps.getVoteBitArray(height, round, type_).SetIndex(index, true)
} }
// ApplyNewRoundStepMessage updates the peer state for the new round.
func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage) { func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage) {
ps.mtx.Lock() ps.mtx.Lock()
defer ps.mtx.Unlock() defer ps.mtx.Unlock()
@ -1012,6 +1055,7 @@ func (ps *PeerState) ApplyNewRoundStepMessage(msg *NewRoundStepMessage) {
} }
} }
// ApplyCommitStepMessage updates the peer state for the new commit.
func (ps *PeerState) ApplyCommitStepMessage(msg *CommitStepMessage) { func (ps *PeerState) ApplyCommitStepMessage(msg *CommitStepMessage) {
ps.mtx.Lock() ps.mtx.Lock()
defer ps.mtx.Unlock() defer ps.mtx.Unlock()
@ -1024,6 +1068,7 @@ func (ps *PeerState) ApplyCommitStepMessage(msg *CommitStepMessage) {
ps.ProposalBlockParts = msg.BlockParts ps.ProposalBlockParts = msg.BlockParts
} }
// ApplyProposalPOLMessage updates the peer state for the new proposal POL.
func (ps *PeerState) ApplyProposalPOLMessage(msg *ProposalPOLMessage) { func (ps *PeerState) ApplyProposalPOLMessage(msg *ProposalPOLMessage) {
ps.mtx.Lock() ps.mtx.Lock()
defer ps.mtx.Unlock() defer ps.mtx.Unlock()
@ -1040,6 +1085,7 @@ func (ps *PeerState) ApplyProposalPOLMessage(msg *ProposalPOLMessage) {
ps.ProposalPOL = msg.ProposalPOL ps.ProposalPOL = msg.ProposalPOL
} }
// ApplyHasVoteMessage updates the peer state for the new vote.
func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) { func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) {
ps.mtx.Lock() ps.mtx.Lock()
defer ps.mtx.Unlock() defer ps.mtx.Unlock()
@ -1051,12 +1097,12 @@ func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) {
ps.setHasVote(msg.Height, msg.Round, msg.Type, msg.Index) ps.setHasVote(msg.Height, msg.Round, msg.Type, msg.Index)
} }
// The peer has responded with a bitarray of votes that it has // ApplyVoteSetBitsMessage updates the peer state for the bit-array of votes
// of the corresponding BlockID. // it claims to have for the corresponding BlockID.
// ourVotes: BitArray of votes we have for msg.BlockID // `ourVotes` is a BitArray of votes we have for msg.BlockID
// NOTE: if ourVotes is nil (e.g. msg.Height < rs.Height), // NOTE: if ourVotes is nil (e.g. msg.Height < rs.Height),
// we conservatively overwrite ps's votes w/ msg.Votes. // we conservatively overwrite ps's votes w/ msg.Votes.
func (ps *PeerState) ApplyVoteSetBitsMessage(msg *VoteSetBitsMessage, ourVotes *BitArray) { func (ps *PeerState) ApplyVoteSetBitsMessage(msg *VoteSetBitsMessage, ourVotes *cmn.BitArray) {
ps.mtx.Lock() ps.mtx.Lock()
defer ps.mtx.Unlock() defer ps.mtx.Unlock()
@ -1072,10 +1118,12 @@ func (ps *PeerState) ApplyVoteSetBitsMessage(msg *VoteSetBitsMessage, ourVotes *
} }
} }
// String returns a string representation of the PeerState
func (ps *PeerState) String() string { func (ps *PeerState) String() string {
return ps.StringIndented("") return ps.StringIndented("")
} }
// StringIndented returns a string representation of the PeerState
func (ps *PeerState) StringIndented(indent string) string { func (ps *PeerState) StringIndented(indent string) string {
return fmt.Sprintf(`PeerState{ return fmt.Sprintf(`PeerState{
%s Key %v %s Key %v
@ -1101,6 +1149,7 @@ const (
msgTypeVoteSetBits = byte(0x17) msgTypeVoteSetBits = byte(0x17)
) )
// ConsensusMessage is a message that can be sent and received on the ConsensusReactor
type ConsensusMessage interface{} type ConsensusMessage interface{}
var _ = wire.RegisterInterface( var _ = wire.RegisterInterface(
@ -1116,17 +1165,20 @@ var _ = wire.RegisterInterface(
wire.ConcreteType{&VoteSetBitsMessage{}, msgTypeVoteSetBits}, wire.ConcreteType{&VoteSetBitsMessage{}, msgTypeVoteSetBits},
) )
// DecodeMessage decodes the given bytes into a ConsensusMessage.
// TODO: check for unnecessary extra bytes at the end. // TODO: check for unnecessary extra bytes at the end.
func DecodeMessage(bz []byte) (msgType byte, msg ConsensusMessage, err error) { func DecodeMessage(bz []byte) (msgType byte, msg ConsensusMessage, err error) {
msgType = bz[0] msgType = bz[0]
n := new(int) n := new(int)
r := bytes.NewReader(bz) r := bytes.NewReader(bz)
msg = wire.ReadBinary(struct{ ConsensusMessage }{}, r, maxConsensusMessageSize, n, &err).(struct{ ConsensusMessage }).ConsensusMessage msgI := wire.ReadBinary(struct{ ConsensusMessage }{}, r, maxConsensusMessageSize, n, &err)
msg = msgI.(struct{ ConsensusMessage }).ConsensusMessage
return return
} }
//------------------------------------- //-------------------------------------
// NewRoundStepMessage is sent for every step taken in the ConsensusState.
// For every height/round/step transition // For every height/round/step transition
type NewRoundStepMessage struct { type NewRoundStepMessage struct {
Height int Height int
@ -1136,6 +1188,7 @@ type NewRoundStepMessage struct {
LastCommitRound int LastCommitRound int
} }
// String returns a string representation.
func (m *NewRoundStepMessage) String() string { func (m *NewRoundStepMessage) String() string {
return fmt.Sprintf("[NewRoundStep H:%v R:%v S:%v LCR:%v]", return fmt.Sprintf("[NewRoundStep H:%v R:%v S:%v LCR:%v]",
m.Height, m.Round, m.Step, m.LastCommitRound) m.Height, m.Round, m.Step, m.LastCommitRound)
@ -1143,62 +1196,73 @@ func (m *NewRoundStepMessage) String() string {
//------------------------------------- //-------------------------------------
// CommitStepMessage is sent when a block is committed.
type CommitStepMessage struct { type CommitStepMessage struct {
Height int Height int
BlockPartsHeader types.PartSetHeader BlockPartsHeader types.PartSetHeader
BlockParts *BitArray BlockParts *cmn.BitArray
} }
// String returns a string representation.
func (m *CommitStepMessage) String() string { func (m *CommitStepMessage) String() string {
return fmt.Sprintf("[CommitStep H:%v BP:%v BA:%v]", m.Height, m.BlockPartsHeader, m.BlockParts) return fmt.Sprintf("[CommitStep H:%v BP:%v BA:%v]", m.Height, m.BlockPartsHeader, m.BlockParts)
} }
//------------------------------------- //-------------------------------------
// ProposalMessage is sent when a new block is proposed.
type ProposalMessage struct { type ProposalMessage struct {
Proposal *types.Proposal Proposal *types.Proposal
} }
// String returns a string representation.
func (m *ProposalMessage) String() string { func (m *ProposalMessage) String() string {
return fmt.Sprintf("[Proposal %v]", m.Proposal) return fmt.Sprintf("[Proposal %v]", m.Proposal)
} }
//------------------------------------- //-------------------------------------
// ProposalPOLMessage is sent when a previous proposal is re-proposed.
type ProposalPOLMessage struct { type ProposalPOLMessage struct {
Height int Height int
ProposalPOLRound int ProposalPOLRound int
ProposalPOL *BitArray ProposalPOL *cmn.BitArray
} }
// String returns a string representation.
func (m *ProposalPOLMessage) String() string { func (m *ProposalPOLMessage) String() string {
return fmt.Sprintf("[ProposalPOL H:%v POLR:%v POL:%v]", m.Height, m.ProposalPOLRound, m.ProposalPOL) return fmt.Sprintf("[ProposalPOL H:%v POLR:%v POL:%v]", m.Height, m.ProposalPOLRound, m.ProposalPOL)
} }
//------------------------------------- //-------------------------------------
// BlockPartMessage is sent when gossipping a piece of the proposed block.
type BlockPartMessage struct { type BlockPartMessage struct {
Height int Height int
Round int Round int
Part *types.Part Part *types.Part
} }
// String returns a string representation.
func (m *BlockPartMessage) String() string { func (m *BlockPartMessage) String() string {
return fmt.Sprintf("[BlockPart H:%v R:%v P:%v]", m.Height, m.Round, m.Part) return fmt.Sprintf("[BlockPart H:%v R:%v P:%v]", m.Height, m.Round, m.Part)
} }
//------------------------------------- //-------------------------------------
// VoteMessage is sent when voting for a proposal (or lack thereof).
type VoteMessage struct { type VoteMessage struct {
Vote *types.Vote Vote *types.Vote
} }
// String returns a string representation.
func (m *VoteMessage) String() string { func (m *VoteMessage) String() string {
return fmt.Sprintf("[Vote %v]", m.Vote) return fmt.Sprintf("[Vote %v]", m.Vote)
} }
//------------------------------------- //-------------------------------------
// HasVoteMessage is sent to indicate that a particular vote has been received.
type HasVoteMessage struct { type HasVoteMessage struct {
Height int Height int
Round int Round int
@ -1206,12 +1270,14 @@ type HasVoteMessage struct {
Index int Index int
} }
// String returns a string representation.
func (m *HasVoteMessage) String() string { func (m *HasVoteMessage) String() string {
return fmt.Sprintf("[HasVote VI:%v V:{%v/%02d/%v} VI:%v]", m.Index, m.Height, m.Round, m.Type, m.Index) return fmt.Sprintf("[HasVote VI:%v V:{%v/%02d/%v} VI:%v]", m.Index, m.Height, m.Round, m.Type, m.Index)
} }
//------------------------------------- //-------------------------------------
// VoteSetMaj23Message is sent to indicate that a given BlockID has seen +2/3 votes.
type VoteSetMaj23Message struct { type VoteSetMaj23Message struct {
Height int Height int
Round int Round int
@ -1219,20 +1285,23 @@ type VoteSetMaj23Message struct {
BlockID types.BlockID BlockID types.BlockID
} }
// String returns a string representation.
func (m *VoteSetMaj23Message) String() string { func (m *VoteSetMaj23Message) String() string {
return fmt.Sprintf("[VSM23 %v/%02d/%v %v]", m.Height, m.Round, m.Type, m.BlockID) return fmt.Sprintf("[VSM23 %v/%02d/%v %v]", m.Height, m.Round, m.Type, m.BlockID)
} }
//------------------------------------- //-------------------------------------
// VoteSetBitsMessage is sent to communicate the bit-array of votes seen for the BlockID.
type VoteSetBitsMessage struct { type VoteSetBitsMessage struct {
Height int Height int
Round int Round int
Type byte Type byte
BlockID types.BlockID BlockID types.BlockID
Votes *BitArray Votes *cmn.BitArray
} }
// String returns a string representation.
func (m *VoteSetBitsMessage) String() string { func (m *VoteSetBitsMessage) String() string {
return fmt.Sprintf("[VSB %v/%02d/%v %v %v]", m.Height, m.Round, m.Type, m.BlockID, m.Votes) return fmt.Sprintf("[VSB %v/%02d/%v %v %v]", m.Height, m.Round, m.Type, m.BlockID, m.Votes)
} }

View File

@ -912,16 +912,17 @@ func (cs *ConsensusState) enterPrevote(height int, round int) {
} }
func (cs *ConsensusState) defaultDoPrevote(height int, round int) { func (cs *ConsensusState) defaultDoPrevote(height int, round int) {
logger := cs.Logger.With("height", height, "round", round)
// If a block is locked, prevote that. // If a block is locked, prevote that.
if cs.LockedBlock != nil { if cs.LockedBlock != nil {
cs.Logger.Info("enterPrevote: Block was locked") logger.Info("enterPrevote: Block was locked")
cs.signAddVote(types.VoteTypePrevote, cs.LockedBlock.Hash(), cs.LockedBlockParts.Header()) cs.signAddVote(types.VoteTypePrevote, cs.LockedBlock.Hash(), cs.LockedBlockParts.Header())
return return
} }
// If ProposalBlock is nil, prevote nil. // If ProposalBlock is nil, prevote nil.
if cs.ProposalBlock == nil { if cs.ProposalBlock == nil {
cs.Logger.Info("enterPrevote: ProposalBlock is nil") logger.Info("enterPrevote: ProposalBlock is nil")
cs.signAddVote(types.VoteTypePrevote, nil, types.PartSetHeader{}) cs.signAddVote(types.VoteTypePrevote, nil, types.PartSetHeader{})
return return
} }
@ -930,7 +931,7 @@ func (cs *ConsensusState) defaultDoPrevote(height int, round int) {
err := cs.state.ValidateBlock(cs.ProposalBlock) err := cs.state.ValidateBlock(cs.ProposalBlock)
if err != nil { if err != nil {
// ProposalBlock is invalid, prevote nil. // ProposalBlock is invalid, prevote nil.
cs.Logger.Error("enterPrevote: ProposalBlock is invalid", "err", err) logger.Error("enterPrevote: ProposalBlock is invalid", "err", err)
cs.signAddVote(types.VoteTypePrevote, nil, types.PartSetHeader{}) cs.signAddVote(types.VoteTypePrevote, nil, types.PartSetHeader{})
return return
} }
@ -938,6 +939,7 @@ func (cs *ConsensusState) defaultDoPrevote(height int, round int) {
// Prevote cs.ProposalBlock // Prevote cs.ProposalBlock
// NOTE: the proposal signature is validated when it is received, // NOTE: the proposal signature is validated when it is received,
// and the proposal block parts are validated as they are received (against the merkle hash in the proposal) // and the proposal block parts are validated as they are received (against the merkle hash in the proposal)
logger.Info("enterPrevote: ProposalBlock is valid")
cs.signAddVote(types.VoteTypePrevote, cs.ProposalBlock.Hash(), cs.ProposalBlockParts.Header()) cs.signAddVote(types.VoteTypePrevote, cs.ProposalBlock.Hash(), cs.ProposalBlockParts.Header())
} }
@ -1331,19 +1333,14 @@ func (cs *ConsensusState) tryAddVote(vote *types.Vote, peerKey string) error {
if err == ErrVoteHeightMismatch { if err == ErrVoteHeightMismatch {
return err return err
} else if _, ok := err.(*types.ErrVoteConflictingVotes); ok { } else if _, ok := err.(*types.ErrVoteConflictingVotes); ok {
if peerKey == "" { if bytes.Equal(vote.ValidatorAddress, cs.privValidator.GetAddress()) {
cs.Logger.Error("Found conflicting vote from ourselves. Did you unsafe_reset a validator?", "height", vote.Height, "round", vote.Round, "type", vote.Type) cs.Logger.Error("Found conflicting vote from ourselves. Did you unsafe_reset a validator?", "height", vote.Height, "round", vote.Round, "type", vote.Type)
return err return err
} }
cs.Logger.Error("Found conflicting vote. Publish evidence (TODO)") cs.Logger.Error("Found conflicting vote. Publish evidence (TODO)", "height", vote.Height, "round", vote.Round, "type", vote.Type, "valAddr", vote.ValidatorAddress, "valIndex", vote.ValidatorIndex)
/* TODO
evidenceTx := &types.DupeoutTx{ // TODO: track evidence for inclusion in a block
Address: address,
VoteA: *errDupe.VoteA,
VoteB: *errDupe.VoteB,
}
cs.mempool.BroadcastTx(struct{???}{evidenceTx}) // shouldn't need to check returned err
*/
return err return err
} else { } else {
// Probably an invalid signature. Bad peer. // Probably an invalid signature. Bad peer.

View File

@ -21,7 +21,11 @@ const (
minWriteBufferSize = 65536 minWriteBufferSize = 65536
updateState = 2 * time.Second updateState = 2 * time.Second
pingTimeout = 40 * time.Second pingTimeout = 40 * time.Second
flushThrottle = 100 * time.Millisecond
// flushThrottle used here as a default.
// overwritten by the user config.
// TODO: remove
flushThrottle = 100 * time.Millisecond
defaultSendQueueCapacity = 1 defaultSendQueueCapacity = 1
defaultSendRate = int64(512000) // 500KB/s defaultSendRate = int64(512000) // 500KB/s
@ -89,13 +93,16 @@ type MConnection struct {
type MConnConfig struct { type MConnConfig struct {
SendRate int64 `mapstructure:"send_rate"` SendRate int64 `mapstructure:"send_rate"`
RecvRate int64 `mapstructure:"recv_rate"` RecvRate int64 `mapstructure:"recv_rate"`
flushThrottle time.Duration
} }
// DefaultMConnConfig returns the default config. // DefaultMConnConfig returns the default config.
func DefaultMConnConfig() *MConnConfig { func DefaultMConnConfig() *MConnConfig {
return &MConnConfig{ return &MConnConfig{
SendRate: defaultSendRate, SendRate: defaultSendRate,
RecvRate: defaultRecvRate, RecvRate: defaultRecvRate,
flushThrottle: flushThrottle,
} }
} }
@ -145,10 +152,11 @@ func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onRec
return mconn return mconn
} }
// OnStart implements BaseService
func (c *MConnection) OnStart() error { func (c *MConnection) OnStart() error {
c.BaseService.OnStart() c.BaseService.OnStart()
c.quit = make(chan struct{}) c.quit = make(chan struct{})
c.flushTimer = cmn.NewThrottleTimer("flush", flushThrottle) c.flushTimer = cmn.NewThrottleTimer("flush", c.config.flushThrottle)
c.pingTimer = cmn.NewRepeatTimer("ping", pingTimeout) c.pingTimer = cmn.NewRepeatTimer("ping", pingTimeout)
c.chStatsTimer = cmn.NewRepeatTimer("chStats", updateState) c.chStatsTimer = cmn.NewRepeatTimer("chStats", updateState)
go c.sendRoutine() go c.sendRoutine()
@ -156,6 +164,7 @@ func (c *MConnection) OnStart() error {
return nil return nil
} }
// OnStop implements BaseService
func (c *MConnection) OnStop() { func (c *MConnection) OnStop() {
c.BaseService.OnStop() c.BaseService.OnStop()
c.flushTimer.Stop() c.flushTimer.Stop()

View File

@ -59,11 +59,9 @@ func DefaultPeerConfig() *PeerConfig {
} }
} }
func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519) (*Peer, error) { func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor,
return newOutboundPeerWithConfig(addr, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, DefaultPeerConfig()) onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) {
}
func newOutboundPeerWithConfig(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) {
conn, err := dial(addr, config) conn, err := dial(addr, config)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "Error creating peer") return nil, errors.Wrap(err, "Error creating peer")
@ -77,15 +75,15 @@ func newOutboundPeerWithConfig(addr *NetAddress, reactorsByCh map[byte]Reactor,
return peer, nil return peer, nil
} }
func newInboundPeer(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519) (*Peer, error) { func newInboundPeer(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor,
return newInboundPeerWithConfig(conn, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, DefaultPeerConfig()) onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) {
}
func newInboundPeerWithConfig(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) {
return newPeerFromConnAndConfig(conn, false, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, config) return newPeerFromConnAndConfig(conn, false, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, config)
} }
func newPeerFromConnAndConfig(rawConn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) { func newPeerFromConnAndConfig(rawConn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor,
onPeerError func(*Peer, interface{}), ourNodePrivKey crypto.PrivKeyEd25519, config *PeerConfig) (*Peer, error) {
conn := rawConn conn := rawConn
// Fuzz connection // Fuzz connection
@ -286,7 +284,9 @@ func dial(addr *NetAddress, config *PeerConfig) (net.Conn, error) {
return conn, nil return conn, nil
} }
func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), config *MConnConfig) *MConnection { func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor,
onPeerError func(*Peer, interface{}), config *MConnConfig) *MConnection {
onReceive := func(chID byte, msgBytes []byte) { onReceive := func(chID byte, msgBytes []byte) {
reactor := reactorsByCh[chID] reactor := reactorsByCh[chID]
if reactor == nil { if reactor == nil {

View File

@ -82,7 +82,7 @@ func createOutboundPeerAndPerformHandshake(addr *NetAddress, config *PeerConfig)
} }
reactorsByCh := map[byte]Reactor{0x01: NewTestReactor(chDescs, true)} reactorsByCh := map[byte]Reactor{0x01: NewTestReactor(chDescs, true)}
pk := crypto.GenPrivKeyEd25519() pk := crypto.GenPrivKeyEd25519()
p, err := newOutboundPeerWithConfig(addr, reactorsByCh, chDescs, func(p *Peer, r interface{}) {}, pk, config) p, err := newOutboundPeer(addr, reactorsByCh, chDescs, func(p *Peer, r interface{}) {}, pk, config)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -133,7 +133,7 @@ func (p *remotePeer) accept(l net.Listener) {
if err != nil { if err != nil {
golog.Fatalf("Failed to accept conn: %+v", err) golog.Fatalf("Failed to accept conn: %+v", err)
} }
peer, err := newInboundPeerWithConfig(conn, make(map[byte]Reactor), make([]*ChannelDescriptor, 0), func(p *Peer, r interface{}) {}, p.PrivKey, p.Config) peer, err := newInboundPeer(conn, make(map[byte]Reactor), make([]*ChannelDescriptor, 0), func(p *Peer, r interface{}) {}, p.PrivKey, p.Config)
if err != nil { if err != nil {
golog.Fatalf("Failed to create a peer: %+v", err) golog.Fatalf("Failed to create a peer: %+v", err)
} }

View File

@ -90,11 +90,13 @@ func NewSwitch(config *cfg.P2PConfig) *Switch {
dialing: cmn.NewCMap(), dialing: cmn.NewCMap(),
nodeInfo: nil, nodeInfo: nil,
} }
sw.peerConfig.MConfig.flushThrottle = time.Duration(config.FlushThrottleTimeout) * time.Millisecond // TODO: collapse the peerConfig into the config ?
sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw) sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
return sw return sw
} }
// Not goroutine safe. // AddReactor adds the given reactor to the switch.
// NOTE: Not goroutine safe.
func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor { func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
// Validate the reactor. // Validate the reactor.
// No two reactors can share the same channel. // No two reactors can share the same channel.
@ -112,43 +114,51 @@ func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
return reactor return reactor
} }
// Not goroutine safe. // Reactors returns a map of reactors registered on the switch.
// NOTE: Not goroutine safe.
func (sw *Switch) Reactors() map[string]Reactor { func (sw *Switch) Reactors() map[string]Reactor {
return sw.reactors return sw.reactors
} }
// Not goroutine safe. // Reactor returns the reactor with the given name.
// NOTE: Not goroutine safe.
func (sw *Switch) Reactor(name string) Reactor { func (sw *Switch) Reactor(name string) Reactor {
return sw.reactors[name] return sw.reactors[name]
} }
// Not goroutine safe. // AddListener adds the given listener to the switch for listening to incoming peer connections.
// NOTE: Not goroutine safe.
func (sw *Switch) AddListener(l Listener) { func (sw *Switch) AddListener(l Listener) {
sw.listeners = append(sw.listeners, l) sw.listeners = append(sw.listeners, l)
} }
// Not goroutine safe. // Listeners returns the list of listeners the switch listens on.
// NOTE: Not goroutine safe.
func (sw *Switch) Listeners() []Listener { func (sw *Switch) Listeners() []Listener {
return sw.listeners return sw.listeners
} }
// Not goroutine safe. // IsListening returns true if the switch has at least one listener.
// NOTE: Not goroutine safe.
func (sw *Switch) IsListening() bool { func (sw *Switch) IsListening() bool {
return len(sw.listeners) > 0 return len(sw.listeners) > 0
} }
// Not goroutine safe. // SetNodeInfo sets the switch's NodeInfo for checking compatibility and handshaking with other nodes.
// NOTE: Not goroutine safe.
func (sw *Switch) SetNodeInfo(nodeInfo *NodeInfo) { func (sw *Switch) SetNodeInfo(nodeInfo *NodeInfo) {
sw.nodeInfo = nodeInfo sw.nodeInfo = nodeInfo
} }
// Not goroutine safe. // NodeInfo returns the switch's NodeInfo.
// NOTE: Not goroutine safe.
func (sw *Switch) NodeInfo() *NodeInfo { func (sw *Switch) NodeInfo() *NodeInfo {
return sw.nodeInfo return sw.nodeInfo
} }
// Not goroutine safe. // SetNodePrivKey sets the switche's private key for authenticated encryption.
// NOTE: Overwrites sw.nodeInfo.PubKey // NOTE: Overwrites sw.nodeInfo.PubKey.
// NOTE: Not goroutine safe.
func (sw *Switch) SetNodePrivKey(nodePrivKey crypto.PrivKeyEd25519) { func (sw *Switch) SetNodePrivKey(nodePrivKey crypto.PrivKeyEd25519) {
sw.nodePrivKey = nodePrivKey sw.nodePrivKey = nodePrivKey
if sw.nodeInfo != nil { if sw.nodeInfo != nil {
@ -156,7 +166,7 @@ func (sw *Switch) SetNodePrivKey(nodePrivKey crypto.PrivKeyEd25519) {
} }
} }
// Switch.Start() starts all the reactors, peers, and listeners. // OnStart implements BaseService. It starts all the reactors, peers, and listeners.
func (sw *Switch) OnStart() error { func (sw *Switch) OnStart() error {
sw.BaseService.OnStart() sw.BaseService.OnStart()
// Start reactors // Start reactors
@ -177,6 +187,7 @@ func (sw *Switch) OnStart() error {
return nil return nil
} }
// OnStop implements BaseService. It stops all listeners, peers, and reactors.
func (sw *Switch) OnStop() { func (sw *Switch) OnStop() {
sw.BaseService.OnStop() sw.BaseService.OnStop()
// Stop listeners // Stop listeners
@ -195,6 +206,8 @@ func (sw *Switch) OnStop() {
} }
} }
// AddPeer checks the given peer's validity, performs a handshake, and adds the peer to the switch
// and to all registered reactors.
// NOTE: This performs a blocking handshake before the peer is added. // NOTE: This performs a blocking handshake before the peer is added.
// CONTRACT: If error is returned, peer is nil, and conn is immediately closed. // CONTRACT: If error is returned, peer is nil, and conn is immediately closed.
func (sw *Switch) AddPeer(peer *Peer) error { func (sw *Switch) AddPeer(peer *Peer) error {
@ -242,6 +255,7 @@ func (sw *Switch) AddPeer(peer *Peer) error {
return nil return nil
} }
// FilterConnByAddr returns an error if connecting to the given address is forbidden.
func (sw *Switch) FilterConnByAddr(addr net.Addr) error { func (sw *Switch) FilterConnByAddr(addr net.Addr) error {
if sw.filterConnByAddr != nil { if sw.filterConnByAddr != nil {
return sw.filterConnByAddr(addr) return sw.filterConnByAddr(addr)
@ -249,6 +263,7 @@ func (sw *Switch) FilterConnByAddr(addr net.Addr) error {
return nil return nil
} }
// FilterConnByPubKey returns an error if connecting to the given public key is forbidden.
func (sw *Switch) FilterConnByPubKey(pubkey crypto.PubKeyEd25519) error { func (sw *Switch) FilterConnByPubKey(pubkey crypto.PubKeyEd25519) error {
if sw.filterConnByPubKey != nil { if sw.filterConnByPubKey != nil {
return sw.filterConnByPubKey(pubkey) return sw.filterConnByPubKey(pubkey)
@ -257,10 +272,12 @@ func (sw *Switch) FilterConnByPubKey(pubkey crypto.PubKeyEd25519) error {
} }
// SetAddrFilter sets the function for filtering connections by address.
func (sw *Switch) SetAddrFilter(f func(net.Addr) error) { func (sw *Switch) SetAddrFilter(f func(net.Addr) error) {
sw.filterConnByAddr = f sw.filterConnByAddr = f
} }
// SetPubKeyFilter sets the function for filtering connections by public key.
func (sw *Switch) SetPubKeyFilter(f func(crypto.PubKeyEd25519) error) { func (sw *Switch) SetPubKeyFilter(f func(crypto.PubKeyEd25519) error) {
sw.filterConnByPubKey = f sw.filterConnByPubKey = f
} }
@ -272,7 +289,7 @@ func (sw *Switch) startInitPeer(peer *Peer) {
} }
} }
// Dial a list of seeds asynchronously in random order // DialSeeds dials a list of seeds asynchronously in random order
func (sw *Switch) DialSeeds(addrBook *AddrBook, seeds []string) error { func (sw *Switch) DialSeeds(addrBook *AddrBook, seeds []string) error {
netAddrs, err := NewNetAddressStrings(seeds) netAddrs, err := NewNetAddressStrings(seeds)
@ -315,12 +332,14 @@ func (sw *Switch) dialSeed(addr *NetAddress) {
} }
} }
// DialPeerWithAddress dials the given peer and runs sw.AddPeer if it connects successfully.
// If `persistent == true`, the switch will always try to reconnect to this peer if the connection ever fails.
func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (*Peer, error) { func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (*Peer, error) {
sw.dialing.Set(addr.IP.String(), addr) sw.dialing.Set(addr.IP.String(), addr)
defer sw.dialing.Delete(addr.IP.String()) defer sw.dialing.Delete(addr.IP.String())
sw.Logger.Info("Dialing peer", "address", addr) sw.Logger.Info("Dialing peer", "address", addr)
peer, err := newOutboundPeerWithConfig(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.peerConfig) peer, err := newOutboundPeer(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.peerConfig)
if err != nil { if err != nil {
sw.Logger.Error("Failed to dial peer", "address", addr, "err", err) sw.Logger.Error("Failed to dial peer", "address", addr, "err", err)
return nil, err return nil, err
@ -339,6 +358,7 @@ func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (*Peer,
return peer, nil return peer, nil
} }
// IsDialing returns true if the switch is currently dialing the given address.
func (sw *Switch) IsDialing(addr *NetAddress) bool { func (sw *Switch) IsDialing(addr *NetAddress) bool {
return sw.dialing.Has(addr.IP.String()) return sw.dialing.Has(addr.IP.String())
} }
@ -347,6 +367,7 @@ func (sw *Switch) IsDialing(addr *NetAddress) bool {
// trying to send for defaultSendTimeoutSeconds. Returns a channel // trying to send for defaultSendTimeoutSeconds. Returns a channel
// which receives success values for each attempted send (false if times out) // which receives success values for each attempted send (false if times out)
// NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved. // NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved.
// TODO: Something more intelligent.
func (sw *Switch) Broadcast(chID byte, msg interface{}) chan bool { func (sw *Switch) Broadcast(chID byte, msg interface{}) chan bool {
successChan := make(chan bool, len(sw.peers.List())) successChan := make(chan bool, len(sw.peers.List()))
sw.Logger.Debug("Broadcast", "channel", chID, "msg", msg) sw.Logger.Debug("Broadcast", "channel", chID, "msg", msg)
@ -359,7 +380,7 @@ func (sw *Switch) Broadcast(chID byte, msg interface{}) chan bool {
return successChan return successChan
} }
// Returns the count of outbound/inbound and outbound-dialing peers. // NumPeers returns the count of outbound/inbound and outbound-dialing peers.
func (sw *Switch) NumPeers() (outbound, inbound, dialing int) { func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
peers := sw.peers.List() peers := sw.peers.List()
for _, peer := range peers { for _, peer := range peers {
@ -373,11 +394,13 @@ func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
return return
} }
// Peers returns the set of peers the switch is connected to.
func (sw *Switch) Peers() IPeerSet { func (sw *Switch) Peers() IPeerSet {
return sw.peers return sw.peers
} }
// Disconnect from a peer due to external error, retry if it is a persistent peer. // StopPeerForError disconnects from a peer due to external error.
// If the peer is persistent, it will attempt to reconnect.
// TODO: make record depending on reason. // TODO: make record depending on reason.
func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) { func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
addr := NewNetAddress(peer.Addr()) addr := NewNetAddress(peer.Addr())
@ -410,7 +433,7 @@ func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
} }
} }
// Disconnect from a peer gracefully. // StopPeerGracefully disconnects from a peer gracefully.
// TODO: handle graceful disconnects. // TODO: handle graceful disconnects.
func (sw *Switch) StopPeerGracefully(peer *Peer) { func (sw *Switch) StopPeerGracefully(peer *Peer) {
sw.Logger.Info("Stopping peer gracefully") sw.Logger.Info("Stopping peer gracefully")
@ -468,7 +491,7 @@ type SwitchEventDonePeer struct {
//------------------------------------------------------------------ //------------------------------------------------------------------
// Switches connected via arbitrary net.Conn; useful for testing // Switches connected via arbitrary net.Conn; useful for testing
// Returns n switches, connected according to the connect func. // MakeConnectedSwitches returns n switches, connected according to the connect func.
// If connect==Connect2Switches, the switches will be fully connected. // If connect==Connect2Switches, the switches will be fully connected.
// initSwitch defines how the ith switch should be initialized (ie. with what reactors). // initSwitch defines how the ith switch should be initialized (ie. with what reactors).
// NOTE: panics if any switch fails to start. // NOTE: panics if any switch fails to start.
@ -493,7 +516,7 @@ func MakeConnectedSwitches(cfg *cfg.P2PConfig, n int, initSwitch func(int, *Swit
var PanicOnAddPeerErr = false var PanicOnAddPeerErr = false
// Will connect switches i and j via net.Pipe() // Connect2Switches will connect switches i and j via net.Pipe()
// Blocks until a conection is established. // Blocks until a conection is established.
// NOTE: caller ensures i and j are within bounds // NOTE: caller ensures i and j are within bounds
func Connect2Switches(switches []*Switch, i, j int) { func Connect2Switches(switches []*Switch, i, j int) {
@ -519,6 +542,8 @@ func Connect2Switches(switches []*Switch, i, j int) {
<-doneCh <-doneCh
} }
// StartSwitches calls sw.Start() for each given switch.
// It returns the first encountered error.
func StartSwitches(switches []*Switch) error { func StartSwitches(switches []*Switch) error {
for _, s := range switches { for _, s := range switches {
_, err := s.Start() // start switch and reactors _, err := s.Start() // start switch and reactors
@ -547,7 +572,7 @@ func makeSwitch(cfg *cfg.P2PConfig, i int, network, version string, initSwitch f
} }
func (sw *Switch) addPeerWithConnection(conn net.Conn) error { func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
peer, err := newInboundPeer(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey) peer, err := newInboundPeer(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.peerConfig)
if err != nil { if err != nil {
conn.Close() conn.Close()
return err return err
@ -562,7 +587,7 @@ func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
} }
func (sw *Switch) addPeerWithConnectionAndConfig(conn net.Conn, config *PeerConfig) error { func (sw *Switch) addPeerWithConnectionAndConfig(conn net.Conn, config *PeerConfig) error {
peer, err := newInboundPeerWithConfig(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, config) peer, err := newInboundPeer(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, config)
if err != nil { if err != nil {
conn.Close() conn.Close()
return err return err

View File

@ -244,7 +244,7 @@ func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) {
rp.Start() rp.Start()
defer rp.Stop() defer rp.Stop()
peer, err := newOutboundPeer(rp.Addr(), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey) peer, err := newOutboundPeer(rp.Addr(), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, DefaultPeerConfig())
require.Nil(err) require.Nil(err)
err = sw.AddPeer(peer) err = sw.AddPeer(peer)
require.Nil(err) require.Nil(err)
@ -270,7 +270,7 @@ func TestSwitchReconnectsToPersistentPeer(t *testing.T) {
rp.Start() rp.Start()
defer rp.Stop() defer rp.Stop()
peer, err := newOutboundPeer(rp.Addr(), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey) peer, err := newOutboundPeer(rp.Addr(), sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, DefaultPeerConfig())
peer.makePersistent() peer.makePersistent()
require.Nil(err) require.Nil(err)
err = sw.AddPeer(peer) err = sw.AddPeer(peer)

View File

@ -2,11 +2,11 @@ package version
const Maj = "0" const Maj = "0"
const Min = "10" const Min = "10"
const Fix = "1" const Fix = "2"
var ( var (
// The full version string // The full version string
Version = "0.10.1" Version = "0.10.2"
// GitCommit is set with --ldflags "-X main.gitCommit=$(git rev-parse HEAD)" // GitCommit is set with --ldflags "-X main.gitCommit=$(git rev-parse HEAD)"
GitCommit string GitCommit string