mirror of
https://github.com/fluencelabs/tendermint
synced 2025-07-31 20:21:56 +00:00
remove viper from p2p
This commit is contained in:
@@ -1,62 +0,0 @@
|
||||
package p2p
|
||||
|
||||
import (
|
||||
"github.com/spf13/viper"
|
||||
)
|
||||
|
||||
// for node.Config
|
||||
type NetworkConfig struct {
|
||||
ListenAddress string `mapstructure:"laddr"`
|
||||
Seeds string `mapstructure:"seeds"`
|
||||
SkipUPNP bool `mapstructure:"skip_upnp"`
|
||||
AddrBookFile string `mapstructure:"addr_book_file"`
|
||||
AddrBookStrict bool `mapstructure:"addr_book_strict"`
|
||||
PexReactor bool `mapstructure:"pex_reactor"`
|
||||
}
|
||||
|
||||
func NewDefaultConfig(rootDir string) *NetworkConfig {
|
||||
return &NetworkConfig{
|
||||
AddrBookFile: rootDir + "/addrbook.json",
|
||||
AddrBookStrict: true,
|
||||
}
|
||||
}
|
||||
|
||||
const (
|
||||
// Switch config keys
|
||||
configKeyDialTimeoutSeconds = "dial_timeout_seconds"
|
||||
configKeyHandshakeTimeoutSeconds = "handshake_timeout_seconds"
|
||||
configKeyMaxNumPeers = "max_num_peers"
|
||||
configKeyAuthEnc = "authenticated_encryption"
|
||||
|
||||
// MConnection config keys
|
||||
configKeySendRate = "send_rate"
|
||||
configKeyRecvRate = "recv_rate"
|
||||
|
||||
// Fuzz params
|
||||
configFuzzEnable = "fuzz_enable" // use the fuzz wrapped conn
|
||||
configFuzzMode = "fuzz_mode" // eg. drop, delay
|
||||
configFuzzMaxDelayMilliseconds = "fuzz_max_delay_milliseconds"
|
||||
configFuzzProbDropRW = "fuzz_prob_drop_rw"
|
||||
configFuzzProbDropConn = "fuzz_prob_drop_conn"
|
||||
configFuzzProbSleep = "fuzz_prob_sleep"
|
||||
)
|
||||
|
||||
func setConfigDefaults(config *viper.Viper) {
|
||||
// Switch default config
|
||||
config.SetDefault(configKeyDialTimeoutSeconds, 3)
|
||||
config.SetDefault(configKeyHandshakeTimeoutSeconds, 20)
|
||||
config.SetDefault(configKeyMaxNumPeers, 50)
|
||||
config.SetDefault(configKeyAuthEnc, true)
|
||||
|
||||
// MConnection default config
|
||||
config.SetDefault(configKeySendRate, 512000) // 500KB/s
|
||||
config.SetDefault(configKeyRecvRate, 512000) // 500KB/s
|
||||
|
||||
// Fuzz defaults
|
||||
config.SetDefault(configFuzzEnable, false)
|
||||
config.SetDefault(configFuzzMode, FuzzModeDrop)
|
||||
config.SetDefault(configFuzzMaxDelayMilliseconds, 3000)
|
||||
config.SetDefault(configFuzzProbDropRW, 0.2)
|
||||
config.SetDefault(configFuzzProbDropConn, 0.00)
|
||||
config.SetDefault(configFuzzProbSleep, 0.00)
|
||||
}
|
@@ -87,8 +87,8 @@ type MConnection struct {
|
||||
|
||||
// MConnConfig is a MConnection configuration.
|
||||
type MConnConfig struct {
|
||||
SendRate int64
|
||||
RecvRate int64
|
||||
SendRate int64 `mapstructure:"send_rate"`
|
||||
RecvRate int64 `mapstructure:"recv_rate"`
|
||||
}
|
||||
|
||||
// DefaultMConnConfig returns the default config.
|
||||
|
21
p2p/peer.go
21
p2p/peer.go
@@ -35,23 +35,24 @@ type Peer struct {
|
||||
|
||||
// PeerConfig is a Peer configuration.
|
||||
type PeerConfig struct {
|
||||
AuthEnc bool // authenticated encryption
|
||||
AuthEnc bool `mapstructure:"auth_enc"` // authenticated encryption
|
||||
|
||||
HandshakeTimeout time.Duration
|
||||
DialTimeout time.Duration
|
||||
// times are in seconds
|
||||
HandshakeTimeout time.Duration `mapstructure:"handshake_timeout"`
|
||||
DialTimeout time.Duration `mapstructure:"dial_timeout"`
|
||||
|
||||
MConfig *MConnConfig
|
||||
MConfig *MConnConfig `mapstructure:"connection"`
|
||||
|
||||
Fuzz bool // fuzz connection (for testing)
|
||||
FuzzConfig *FuzzConnConfig
|
||||
Fuzz bool `mapstructure:"fuzz"` // fuzz connection (for testing)
|
||||
FuzzConfig *FuzzConnConfig `mapstructure:"fuzz_config"`
|
||||
}
|
||||
|
||||
// DefaultPeerConfig returns the default config.
|
||||
func DefaultPeerConfig() *PeerConfig {
|
||||
return &PeerConfig{
|
||||
AuthEnc: true,
|
||||
HandshakeTimeout: 2 * time.Second,
|
||||
DialTimeout: 3 * time.Second,
|
||||
HandshakeTimeout: 20, // * time.Second,
|
||||
DialTimeout: 3, // * time.Second,
|
||||
MConfig: DefaultMConnConfig(),
|
||||
Fuzz: false,
|
||||
FuzzConfig: DefaultFuzzConnConfig(),
|
||||
@@ -95,7 +96,7 @@ func newPeerFromConnAndConfig(rawConn net.Conn, outbound bool, reactorsByCh map[
|
||||
|
||||
// Encrypt connection
|
||||
if config.AuthEnc {
|
||||
conn.SetDeadline(time.Now().Add(config.HandshakeTimeout))
|
||||
conn.SetDeadline(time.Now().Add(config.HandshakeTimeout * time.Second))
|
||||
|
||||
var err error
|
||||
conn, err = MakeSecretConnection(conn, ourNodePrivKey)
|
||||
@@ -279,7 +280,7 @@ func (p *Peer) Get(key string) interface{} {
|
||||
|
||||
func dial(addr *NetAddress, config *PeerConfig) (net.Conn, error) {
|
||||
log.Info("Dialing address", "address", addr)
|
||||
conn, err := addr.DialTimeout(config.DialTimeout)
|
||||
conn, err := addr.DialTimeout(config.DialTimeout * time.Second)
|
||||
if err != nil {
|
||||
log.Info("Failed dialing address", "address", addr, "error", err)
|
||||
return nil, err
|
||||
|
@@ -9,8 +9,8 @@ import (
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
cmn "github.com/tendermint/tmlibs/common"
|
||||
wire "github.com/tendermint/go-wire"
|
||||
cmn "github.com/tendermint/tmlibs/common"
|
||||
)
|
||||
|
||||
func TestPEXReactorBasic(t *testing.T) {
|
||||
@@ -68,7 +68,7 @@ func TestPEXReactorRunning(t *testing.T) {
|
||||
|
||||
// create switches
|
||||
for i := 0; i < N; i++ {
|
||||
switches[i] = makeSwitch(i, "127.0.0.1", "123.123.123", func(i int, sw *Switch) *Switch {
|
||||
switches[i] = makeSwitch(config, i, "127.0.0.1", "123.123.123", func(i int, sw *Switch) *Switch {
|
||||
r := NewPEXReactor(book)
|
||||
r.SetEnsurePeersPeriod(250 * time.Millisecond)
|
||||
sw.AddReactor("pex", r)
|
||||
|
@@ -7,8 +7,6 @@ import (
|
||||
"net"
|
||||
"time"
|
||||
|
||||
"github.com/spf13/viper"
|
||||
|
||||
crypto "github.com/tendermint/go-crypto"
|
||||
"github.com/tendermint/log15"
|
||||
. "github.com/tendermint/tmlibs/common"
|
||||
@@ -19,6 +17,29 @@ const (
|
||||
reconnectInterval = 3 * time.Second
|
||||
)
|
||||
|
||||
// for node.Config
|
||||
type Config struct {
|
||||
ListenAddress string `mapstructure:"laddr"`
|
||||
Seeds string `mapstructure:"seeds"`
|
||||
SkipUPNP bool `mapstructure:"skip_upnp"`
|
||||
AddrBookFile string `mapstructure:"addr_book_file"`
|
||||
AddrBookStrict bool `mapstructure:"addr_book_strict"`
|
||||
PexReactor bool `mapstructure:"pex_reactor"`
|
||||
MaxNumPeers int `mapstructure:"max_num_peers"`
|
||||
|
||||
Peer *PeerConfig `mapstructure:"peer"`
|
||||
}
|
||||
|
||||
func NewDefaultConfig(rootDir string) *Config {
|
||||
return &Config{
|
||||
ListenAddress: "tcp://0.0.0.0:46656",
|
||||
AddrBookFile: rootDir + "/addrbook.json",
|
||||
AddrBookStrict: true,
|
||||
MaxNumPeers: 50,
|
||||
Peer: DefaultPeerConfig(),
|
||||
}
|
||||
}
|
||||
|
||||
type Reactor interface {
|
||||
Service // Start, Stop
|
||||
|
||||
@@ -62,7 +83,7 @@ incoming messages are received on the reactor.
|
||||
type Switch struct {
|
||||
BaseService
|
||||
|
||||
config *viper.Viper
|
||||
config *Config
|
||||
listeners []Listener
|
||||
reactors map[string]Reactor
|
||||
chDescs []*ChannelDescriptor
|
||||
@@ -81,9 +102,7 @@ var (
|
||||
ErrSwitchMaxPeersPerIPRange = errors.New("IP range has too many peers")
|
||||
)
|
||||
|
||||
func NewSwitch(config *viper.Viper) *Switch {
|
||||
setConfigDefaults(config)
|
||||
|
||||
func NewSwitch(config *Config) *Switch {
|
||||
sw := &Switch{
|
||||
config: config,
|
||||
reactors: make(map[string]Reactor),
|
||||
@@ -209,7 +228,7 @@ func (sw *Switch) AddPeer(peer *Peer) error {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := peer.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.config.GetInt(configKeyHandshakeTimeoutSeconds))*time.Second); err != nil {
|
||||
if err := peer.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.config.Peer.HandshakeTimeout*time.Second)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -318,7 +337,7 @@ func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (*Peer,
|
||||
sw.dialing.Set(addr.IP.String(), addr)
|
||||
defer sw.dialing.Delete(addr.IP.String())
|
||||
|
||||
peer, err := newOutboundPeerWithConfig(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, peerConfigFromGoConfig(sw.config))
|
||||
peer, err := newOutboundPeerWithConfig(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.config.Peer)
|
||||
if err != nil {
|
||||
log.Info("Failed dialing peer", "address", addr, "error", err)
|
||||
return nil, err
|
||||
@@ -430,14 +449,14 @@ func (sw *Switch) listenerRoutine(l Listener) {
|
||||
}
|
||||
|
||||
// ignore connection if we already have enough
|
||||
maxPeers := sw.config.GetInt(configKeyMaxNumPeers)
|
||||
maxPeers := sw.config.MaxNumPeers
|
||||
if maxPeers <= sw.peers.Size() {
|
||||
log.Info("Ignoring inbound connection: already have enough peers", "address", inConn.RemoteAddr().String(), "numPeers", sw.peers.Size(), "max", maxPeers)
|
||||
continue
|
||||
}
|
||||
|
||||
// New inbound connection!
|
||||
err := sw.addPeerWithConnectionAndConfig(inConn, peerConfigFromGoConfig(sw.config))
|
||||
err := sw.addPeerWithConnectionAndConfig(inConn, sw.config.Peer)
|
||||
if err != nil {
|
||||
log.Notice("Ignoring inbound connection: error while adding peer", "address", inConn.RemoteAddr().String(), "error", err)
|
||||
continue
|
||||
@@ -469,10 +488,10 @@ type SwitchEventDonePeer struct {
|
||||
// If connect==Connect2Switches, the switches will be fully connected.
|
||||
// initSwitch defines how the ith switch should be initialized (ie. with what reactors).
|
||||
// NOTE: panics if any switch fails to start.
|
||||
func MakeConnectedSwitches(n int, initSwitch func(int, *Switch) *Switch, connect func([]*Switch, int, int)) []*Switch {
|
||||
func MakeConnectedSwitches(cfg *Config, n int, initSwitch func(int, *Switch) *Switch, connect func([]*Switch, int, int)) []*Switch {
|
||||
switches := make([]*Switch, n)
|
||||
for i := 0; i < n; i++ {
|
||||
switches[i] = makeSwitch(i, "testing", "123.123.123", initSwitch)
|
||||
switches[i] = makeSwitch(cfg, i, "testing", "123.123.123", initSwitch)
|
||||
}
|
||||
|
||||
if err := StartSwitches(switches); err != nil {
|
||||
@@ -526,11 +545,11 @@ func StartSwitches(switches []*Switch) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func makeSwitch(i int, network, version string, initSwitch func(int, *Switch) *Switch) *Switch {
|
||||
func makeSwitch(cfg *Config, i int, network, version string, initSwitch func(int, *Switch) *Switch) *Switch {
|
||||
privKey := crypto.GenPrivKeyEd25519()
|
||||
// new switch, add reactors
|
||||
// TODO: let the config be passed in?
|
||||
s := initSwitch(i, NewSwitch(viper.New()))
|
||||
s := initSwitch(i, NewSwitch(cfg))
|
||||
s.SetNodeInfo(&NodeInfo{
|
||||
PubKey: privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
|
||||
Moniker: Fmt("switch%d", i),
|
||||
@@ -572,23 +591,3 @@ func (sw *Switch) addPeerWithConnectionAndConfig(conn net.Conn, config *PeerConf
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func peerConfigFromGoConfig(config *viper.Viper) *PeerConfig {
|
||||
return &PeerConfig{
|
||||
AuthEnc: config.GetBool(configKeyAuthEnc),
|
||||
Fuzz: config.GetBool(configFuzzEnable),
|
||||
HandshakeTimeout: time.Duration(config.GetInt(configKeyHandshakeTimeoutSeconds)) * time.Second,
|
||||
DialTimeout: time.Duration(config.GetInt(configKeyDialTimeoutSeconds)) * time.Second,
|
||||
MConfig: &MConnConfig{
|
||||
SendRate: int64(config.GetInt(configKeySendRate)),
|
||||
RecvRate: int64(config.GetInt(configKeyRecvRate)),
|
||||
},
|
||||
FuzzConfig: &FuzzConnConfig{
|
||||
Mode: config.GetInt(configFuzzMode),
|
||||
MaxDelay: time.Duration(config.GetInt(configFuzzMaxDelayMilliseconds)) * time.Millisecond,
|
||||
ProbDropRW: config.GetFloat64(configFuzzProbDropRW),
|
||||
ProbDropConn: config.GetFloat64(configFuzzProbDropConn),
|
||||
ProbSleep: config.GetFloat64(configFuzzProbSleep),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
@@ -8,8 +8,6 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/spf13/viper"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
crypto "github.com/tendermint/go-crypto"
|
||||
@@ -18,12 +16,12 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
config *viper.Viper
|
||||
config *Config
|
||||
)
|
||||
|
||||
func init() {
|
||||
config = viper.New()
|
||||
setConfigDefaults(config)
|
||||
config = NewDefaultConfig("")
|
||||
config.PexReactor = true
|
||||
}
|
||||
|
||||
type PeerMessage struct {
|
||||
@@ -92,7 +90,7 @@ func (tr *TestReactor) getMsgs(chID byte) []PeerMessage {
|
||||
// XXX: note this uses net.Pipe and not a proper TCP conn
|
||||
func makeSwitchPair(t testing.TB, initSwitch func(int, *Switch) *Switch) (*Switch, *Switch) {
|
||||
// Create two switches that will be interconnected.
|
||||
switches := MakeConnectedSwitches(2, initSwitch, Connect2Switches)
|
||||
switches := MakeConnectedSwitches(config, 2, initSwitch, Connect2Switches)
|
||||
return switches[0], switches[1]
|
||||
}
|
||||
|
||||
@@ -163,8 +161,8 @@ func TestSwitches(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestConnAddrFilter(t *testing.T) {
|
||||
s1 := makeSwitch(1, "testing", "123.123.123", initSwitchFunc)
|
||||
s2 := makeSwitch(1, "testing", "123.123.123", initSwitchFunc)
|
||||
s1 := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
|
||||
s2 := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
|
||||
|
||||
c1, c2 := net.Pipe()
|
||||
|
||||
@@ -197,8 +195,8 @@ func TestConnAddrFilter(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestConnPubKeyFilter(t *testing.T) {
|
||||
s1 := makeSwitch(1, "testing", "123.123.123", initSwitchFunc)
|
||||
s2 := makeSwitch(1, "testing", "123.123.123", initSwitchFunc)
|
||||
s1 := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
|
||||
s2 := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
|
||||
|
||||
c1, c2 := net.Pipe()
|
||||
|
||||
@@ -234,7 +232,7 @@ func TestConnPubKeyFilter(t *testing.T) {
|
||||
func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) {
|
||||
assert, require := assert.New(t), require.New(t)
|
||||
|
||||
sw := makeSwitch(1, "testing", "123.123.123", initSwitchFunc)
|
||||
sw := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
|
||||
sw.Start()
|
||||
defer sw.Stop()
|
||||
|
||||
@@ -260,7 +258,7 @@ func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) {
|
||||
func TestSwitchReconnectsToPersistentPeer(t *testing.T) {
|
||||
assert, require := assert.New(t), require.New(t)
|
||||
|
||||
sw := makeSwitch(1, "testing", "123.123.123", initSwitchFunc)
|
||||
sw := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
|
||||
sw.Start()
|
||||
defer sw.Stop()
|
||||
|
||||
|
Reference in New Issue
Block a user