Merge pull request #1129 from tendermint/addrbook

p2p: bust up into sub dirs
This commit is contained in:
Ethan Buchman 2018-01-23 23:10:50 -05:00 committed by GitHub
commit d7b1b8d3d5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 761 additions and 639 deletions

View File

@ -22,11 +22,13 @@ var (
defaultGenesisJSONName = "genesis.json"
defaultPrivValName = "priv_validator.json"
defaultNodeKeyName = "node_key.json"
defaultAddrBookName = "addrbook.json"
defaultConfigFilePath = filepath.Join(defaultConfigDir, defaultConfigFileName)
defaultGenesisJSONPath = filepath.Join(defaultConfigDir, defaultGenesisJSONName)
defaultPrivValPath = filepath.Join(defaultConfigDir, defaultPrivValName)
defaultNodeKeyPath = filepath.Join(defaultConfigDir, defaultNodeKeyName)
defaultAddrBookPath = filepath.Join(defaultConfigDir, defaultAddrBookName)
)
// Config defines the top level configuration for a Tendermint node
@ -278,7 +280,7 @@ type P2PConfig struct {
func DefaultP2PConfig() *P2PConfig {
return &P2PConfig{
ListenAddress: "tcp://0.0.0.0:46656",
AddrBook: "addrbook.json",
AddrBook: defaultAddrBookPath,
AddrBookStrict: true,
MaxNumPeers: 50,
FlushThrottleTimeout: 100,

View File

@ -22,6 +22,7 @@ import (
"github.com/tendermint/tendermint/evidence"
mempl "github.com/tendermint/tendermint/mempool"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/p2p/pex"
"github.com/tendermint/tendermint/p2p/trust"
"github.com/tendermint/tendermint/proxy"
rpccore "github.com/tendermint/tendermint/rpc/core"
@ -97,7 +98,7 @@ type Node struct {
// network
sw *p2p.Switch // p2p connections
addrBook *p2p.AddrBook // known peers
addrBook pex.AddrBook // known peers
trustMetricStore *trust.TrustMetricStore // trust metrics for all peers
// services
@ -238,10 +239,10 @@ func NewNode(config *cfg.Config,
sw.AddReactor("EVIDENCE", evidenceReactor)
// Optionally, start the pex reactor
var addrBook *p2p.AddrBook
var addrBook pex.AddrBook
var trustMetricStore *trust.TrustMetricStore
if config.P2P.PexReactor {
addrBook = p2p.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
addrBook = pex.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
addrBook.SetLogger(p2pLogger.With("book", config.P2P.AddrBookFile()))
// Get the trust metric history data
@ -256,8 +257,8 @@ func NewNode(config *cfg.Config,
if config.P2P.Seeds != "" {
seeds = strings.Split(config.P2P.Seeds, ",")
}
pexReactor := p2p.NewPEXReactor(addrBook,
&p2p.PEXReactorConfig{Seeds: seeds})
pexReactor := pex.NewPEXReactor(addrBook,
&pex.PEXReactorConfig{Seeds: seeds})
pexReactor.SetLogger(p2pLogger)
sw.AddReactor("PEX", pexReactor)
}

View File

@ -1,12 +1,15 @@
package p2p
import cmn "github.com/tendermint/tmlibs/common"
import (
"github.com/tendermint/tendermint/p2p/conn"
cmn "github.com/tendermint/tmlibs/common"
)
type Reactor interface {
cmn.Service // Start, Stop
SetSwitch(*Switch)
GetChannels() []*ChannelDescriptor
GetChannels() []*conn.ChannelDescriptor
AddPeer(peer Peer)
RemovePeer(peer Peer, reason interface{})
Receive(chID byte, peer Peer, msgBytes []byte) // CONTRACT: msgBytes are not nil
@ -29,7 +32,7 @@ func NewBaseReactor(name string, impl Reactor) *BaseReactor {
func (br *BaseReactor) SetSwitch(sw *Switch) {
br.Switch = sw
}
func (_ *BaseReactor) GetChannels() []*ChannelDescriptor { return nil }
func (_ *BaseReactor) GetChannels() []*conn.ChannelDescriptor { return nil }
func (_ *BaseReactor) AddPeer(peer Peer) {}
func (_ *BaseReactor) RemovePeer(peer Peer, reason interface{}) {}
func (_ *BaseReactor) Receive(chID byte, peer Peer, msgBytes []byte) {}

View File

@ -1,6 +1,6 @@
// +build go1.10
package p2p
package conn
// Go1.10 has a proper net.Conn implementation that
// has the SetDeadline method implemented as per
@ -10,6 +10,6 @@ package p2p
import "net"
func netPipe() (net.Conn, net.Conn) {
func NetPipe() (net.Conn, net.Conn) {
return net.Pipe()
}

View File

@ -1,6 +1,6 @@
// +build !go1.10
package p2p
package conn
import (
"net"
@ -24,7 +24,7 @@ func (p *pipe) SetDeadline(t time.Time) error {
return nil
}
func netPipe() (net.Conn, net.Conn) {
func NetPipe() (net.Conn, net.Conn) {
p1, p2 := net.Pipe()
return &pipe{p1}, &pipe{p2}
}

View File

@ -1,4 +1,4 @@
package p2p
package conn
import (
"bufio"
@ -97,13 +97,13 @@ type MConnConfig struct {
SendRate int64 `mapstructure:"send_rate"`
RecvRate int64 `mapstructure:"recv_rate"`
maxMsgPacketPayloadSize int
MaxMsgPacketPayloadSize int
flushThrottle time.Duration
FlushThrottle time.Duration
}
func (cfg *MConnConfig) maxMsgPacketTotalSize() int {
return cfg.maxMsgPacketPayloadSize + maxMsgPacketOverheadSize
return cfg.MaxMsgPacketPayloadSize + maxMsgPacketOverheadSize
}
// DefaultMConnConfig returns the default config.
@ -111,8 +111,8 @@ func DefaultMConnConfig() *MConnConfig {
return &MConnConfig{
SendRate: defaultSendRate,
RecvRate: defaultRecvRate,
maxMsgPacketPayloadSize: defaultMaxMsgPacketPayloadSize,
flushThrottle: defaultFlushThrottle,
MaxMsgPacketPayloadSize: defaultMaxMsgPacketPayloadSize,
FlushThrottle: defaultFlushThrottle,
}
}
@ -171,7 +171,7 @@ func (c *MConnection) OnStart() error {
return err
}
c.quit = make(chan struct{})
c.flushTimer = cmn.NewThrottleTimer("flush", c.config.flushThrottle)
c.flushTimer = cmn.NewThrottleTimer("flush", c.config.FlushThrottle)
c.pingTimer = cmn.NewRepeatTimer("ping", pingTimeout)
c.chStatsTimer = cmn.NewRepeatTimer("chStats", updateStats)
go c.sendRoutine()
@ -586,7 +586,7 @@ func newChannel(conn *MConnection, desc ChannelDescriptor) *Channel {
desc: desc,
sendQueue: make(chan []byte, desc.SendQueueCapacity),
recving: make([]byte, 0, desc.RecvBufferCapacity),
maxMsgPacketPayloadSize: conn.config.maxMsgPacketPayloadSize,
maxMsgPacketPayloadSize: conn.config.MaxMsgPacketPayloadSize,
}
}

View File

@ -1,4 +1,4 @@
package p2p
package conn
import (
"net"
@ -31,7 +31,7 @@ func createMConnectionWithCallbacks(conn net.Conn, onReceive func(chID byte, msg
func TestMConnectionSend(t *testing.T) {
assert, require := assert.New(t), require.New(t)
server, client := netPipe()
server, client := NetPipe()
defer server.Close() // nolint: errcheck
defer client.Close() // nolint: errcheck
@ -64,7 +64,7 @@ func TestMConnectionSend(t *testing.T) {
func TestMConnectionReceive(t *testing.T) {
assert, require := assert.New(t), require.New(t)
server, client := netPipe()
server, client := NetPipe()
defer server.Close() // nolint: errcheck
defer client.Close() // nolint: errcheck
@ -102,7 +102,7 @@ func TestMConnectionReceive(t *testing.T) {
func TestMConnectionStatus(t *testing.T) {
assert, require := assert.New(t), require.New(t)
server, client := netPipe()
server, client := NetPipe()
defer server.Close() // nolint: errcheck
defer client.Close() // nolint: errcheck
@ -119,7 +119,7 @@ func TestMConnectionStatus(t *testing.T) {
func TestMConnectionStopsAndReturnsError(t *testing.T) {
assert, require := assert.New(t), require.New(t)
server, client := netPipe()
server, client := NetPipe()
defer server.Close() // nolint: errcheck
defer client.Close() // nolint: errcheck
@ -152,7 +152,7 @@ func TestMConnectionStopsAndReturnsError(t *testing.T) {
}
func newClientAndServerConnsForReadErrors(require *require.Assertions, chOnErr chan struct{}) (*MConnection, *MConnection) {
server, client := netPipe()
server, client := NetPipe()
onReceive := func(chID byte, msgBytes []byte) {}
onError := func(r interface{}) {}
@ -283,7 +283,7 @@ func TestMConnectionReadErrorUnknownMsgType(t *testing.T) {
func TestMConnectionTrySend(t *testing.T) {
assert, require := assert.New(t), require.New(t)
server, client := netPipe()
server, client := NetPipe()
defer server.Close()
defer client.Close()

View File

@ -4,7 +4,7 @@
// is known ahead of time, and thus we are technically
// still vulnerable to MITM. (TODO!)
// See docs/sts-final.pdf for more info
package p2p
package conn
import (
"bytes"

View File

@ -1,4 +1,4 @@
package p2p
package conn
import (
"io"

20
p2p/errors.go Normal file
View File

@ -0,0 +1,20 @@
package p2p
import (
"errors"
"fmt"
)
var (
ErrSwitchDuplicatePeer = errors.New("Duplicate peer")
ErrSwitchConnectToSelf = errors.New("Connect to self")
)
type ErrSwitchAuthenticationFailure struct {
Dialed *NetAddress
Got ID
}
func (e ErrSwitchAuthenticationFailure) Error() string {
return fmt.Sprintf("Failed to authenticate peer. Dialed %v, but got peer with ID %s", e.Dialed, e.Got)
}

105
p2p/node_info.go Normal file
View File

@ -0,0 +1,105 @@
package p2p
import (
"fmt"
"strings"
crypto "github.com/tendermint/go-crypto"
)
const maxNodeInfoSize = 10240 // 10Kb
func MaxNodeInfoSize() int {
return maxNodeInfoSize
}
// NodeInfo is the basic node information exchanged
// between two peers during the Tendermint P2P handshake.
type NodeInfo struct {
// Authenticate
PubKey crypto.PubKey `json:"pub_key"` // authenticated pubkey
ListenAddr string `json:"listen_addr"` // accepting incoming
// Check compatibility
Network string `json:"network"` // network/chain ID
Version string `json:"version"` // major.minor.revision
// Sanitize
Moniker string `json:"moniker"` // arbitrary moniker
Other []string `json:"other"` // other application specific data
}
// Validate checks the self-reported NodeInfo is safe.
// It returns an error if the info.PubKey doesn't match the given pubKey.
// TODO: constraints for Moniker/Other? Or is that for the UI ?
func (info NodeInfo) Validate(pubKey crypto.PubKey) error {
if !info.PubKey.Equals(pubKey) {
return fmt.Errorf("info.PubKey (%v) doesn't match peer.PubKey (%v)",
info.PubKey, pubKey)
}
return nil
}
// CompatibleWith checks if two NodeInfo are compatible with eachother.
// CONTRACT: two nodes are compatible if the major/minor versions match and network match.
func (info NodeInfo) CompatibleWith(other NodeInfo) error {
iMajor, iMinor, _, iErr := splitVersion(info.Version)
oMajor, oMinor, _, oErr := splitVersion(other.Version)
// if our own version number is not formatted right, we messed up
if iErr != nil {
return iErr
}
// version number must be formatted correctly ("x.x.x")
if oErr != nil {
return oErr
}
// major version must match
if iMajor != oMajor {
return fmt.Errorf("Peer is on a different major version. Got %v, expected %v", oMajor, iMajor)
}
// minor version must match
if iMinor != oMinor {
return fmt.Errorf("Peer is on a different minor version. Got %v, expected %v", oMinor, iMinor)
}
// nodes must be on the same network
if info.Network != other.Network {
return fmt.Errorf("Peer is on a different network. Got %v, expected %v", other.Network, info.Network)
}
return nil
}
func (info NodeInfo) ID() ID {
return PubKeyToID(info.PubKey)
}
// NetAddress returns a NetAddress derived from the NodeInfo -
// it includes the authenticated peer ID and the self-reported
// ListenAddr. Note that the ListenAddr is not authenticated and
// may not match that address actually dialed if its an outbound peer.
func (info NodeInfo) NetAddress() *NetAddress {
id := PubKeyToID(info.PubKey)
addr := info.ListenAddr
netAddr, err := NewNetAddressString(IDAddressString(id, addr))
if err != nil {
panic(err) // everything should be well formed by now
}
return netAddr
}
func (info NodeInfo) String() string {
return fmt.Sprintf("NodeInfo{pk: %v, moniker: %v, network: %v [listen %v], version: %v (%v)}", info.PubKey, info.Moniker, info.Network, info.ListenAddr, info.Version, info.Other)
}
func splitVersion(version string) (string, string, string, error) {
spl := strings.Split(version, ".")
if len(spl) != 3 {
return "", "", "", fmt.Errorf("Invalid version format %v", version)
}
return spl[0], spl[1], spl[2], nil
}

View File

@ -11,6 +11,8 @@ import (
wire "github.com/tendermint/go-wire"
cmn "github.com/tendermint/tmlibs/common"
"github.com/tendermint/tmlibs/log"
tmconn "github.com/tendermint/tendermint/p2p/conn"
)
// Peer is an interface representing a peer connected on a reactor.
@ -21,7 +23,7 @@ type Peer interface {
IsOutbound() bool // did we dial the peer
IsPersistent() bool // do we redial this peer when we disconnect
NodeInfo() NodeInfo // peer's info
Status() ConnectionStatus
Status() tmconn.ConnectionStatus
Send(byte, interface{}) bool
TrySend(byte, interface{}) bool
@ -41,7 +43,7 @@ type peer struct {
outbound bool
conn net.Conn // source connection
mconn *MConnection // multiplex connection
mconn *tmconn.MConnection // multiplex connection
persistent bool
config *PeerConfig
@ -58,7 +60,7 @@ type PeerConfig struct {
HandshakeTimeout time.Duration `mapstructure:"handshake_timeout"`
DialTimeout time.Duration `mapstructure:"dial_timeout"`
MConfig *MConnConfig `mapstructure:"connection"`
MConfig *tmconn.MConnConfig `mapstructure:"connection"`
Fuzz bool `mapstructure:"fuzz"` // fuzz connection (for testing)
FuzzConfig *FuzzConnConfig `mapstructure:"fuzz_config"`
@ -70,13 +72,13 @@ func DefaultPeerConfig() *PeerConfig {
AuthEnc: true,
HandshakeTimeout: 20, // * time.Second,
DialTimeout: 3, // * time.Second,
MConfig: DefaultMConnConfig(),
MConfig: tmconn.DefaultMConnConfig(),
Fuzz: false,
FuzzConfig: DefaultFuzzConnConfig(),
}
}
func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor,
func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*tmconn.ChannelDescriptor,
onPeerError func(Peer, interface{}), ourNodePrivKey crypto.PrivKey, config *PeerConfig, persistent bool) (*peer, error) {
conn, err := dial(addr, config)
@ -96,7 +98,7 @@ func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []
return peer, nil
}
func newInboundPeer(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor,
func newInboundPeer(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*tmconn.ChannelDescriptor,
onPeerError func(Peer, interface{}), ourNodePrivKey crypto.PrivKey, config *PeerConfig) (*peer, error) {
// TODO: issue PoW challenge
@ -104,7 +106,7 @@ func newInboundPeer(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*Cha
return newPeerFromConnAndConfig(conn, false, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, config)
}
func newPeerFromConnAndConfig(rawConn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor,
func newPeerFromConnAndConfig(rawConn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*tmconn.ChannelDescriptor,
onPeerError func(Peer, interface{}), ourNodePrivKey crypto.PrivKey, config *PeerConfig) (*peer, error) {
conn := rawConn
@ -122,7 +124,7 @@ func newPeerFromConnAndConfig(rawConn net.Conn, outbound bool, reactorsByCh map[
}
var err error
conn, err = MakeSecretConnection(conn, ourNodePrivKey)
conn, err = tmconn.MakeSecretConnection(conn, ourNodePrivKey)
if err != nil {
return nil, errors.Wrap(err, "Error creating peer")
}
@ -191,7 +193,7 @@ func (p *peer) NodeInfo() NodeInfo {
}
// Status returns the peer's ConnectionStatus.
func (p *peer) Status() ConnectionStatus {
func (p *peer) Status() tmconn.ConnectionStatus {
return p.mconn.Status()
}
@ -252,7 +254,7 @@ func (p *peer) HandshakeTimeout(ourNodeInfo NodeInfo, timeout time.Duration) err
},
func() {
var n int
wire.ReadBinary(&peerNodeInfo, p.conn, maxNodeInfoSize, &n, &err2)
wire.ReadBinary(&peerNodeInfo, p.conn, MaxNodeInfoSize(), &n, &err2)
p.Logger.Info("Peer handshake", "peerNodeInfo", peerNodeInfo)
})
if err1 != nil {
@ -267,8 +269,6 @@ func (p *peer) HandshakeTimeout(ourNodeInfo NodeInfo, timeout time.Duration) err
return errors.Wrap(err, "Error removing deadline")
}
// TODO: fix the peerNodeInfo.ListenAddr
p.nodeInfo = peerNodeInfo
return nil
}
@ -283,7 +283,7 @@ func (p *peer) PubKey() crypto.PubKey {
if !p.nodeInfo.PubKey.Empty() {
return p.nodeInfo.PubKey
} else if p.config.AuthEnc {
return p.conn.(*SecretConnection).RemotePubKey()
return p.conn.(*tmconn.SecretConnection).RemotePubKey()
}
panic("Attempt to get peer's PubKey before calling Handshake")
}
@ -316,8 +316,8 @@ func dial(addr *NetAddress, config *PeerConfig) (net.Conn, error) {
return conn, nil
}
func createMConnection(conn net.Conn, p *peer, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor,
onPeerError func(Peer, interface{}), config *MConnConfig) *MConnection {
func createMConnection(conn net.Conn, p *peer, reactorsByCh map[byte]Reactor, chDescs []*tmconn.ChannelDescriptor,
onPeerError func(Peer, interface{}), config *tmconn.MConnConfig) *tmconn.MConnection {
onReceive := func(chID byte, msgBytes []byte) {
reactor := reactorsByCh[chID]
@ -331,5 +331,5 @@ func createMConnection(conn net.Conn, p *peer, reactorsByCh map[byte]Reactor, ch
onPeerError(p, r)
}
return NewMConnectionWithConfig(conn, chDescs, onReceive, onError, config)
return tmconn.NewMConnectionWithConfig(conn, chDescs, onReceive, onError, config)
}

View File

@ -10,6 +10,7 @@ import (
"github.com/stretchr/testify/require"
crypto "github.com/tendermint/go-crypto"
tmconn "github.com/tendermint/tendermint/p2p/conn"
)
func TestPeerBasic(t *testing.T) {
@ -81,7 +82,7 @@ func TestPeerSend(t *testing.T) {
}
func createOutboundPeerAndPerformHandshake(addr *NetAddress, config *PeerConfig) (*peer, error) {
chDescs := []*ChannelDescriptor{
chDescs := []*tmconn.ChannelDescriptor{
{ID: 0x01, Priority: 1},
}
reactorsByCh := map[byte]Reactor{0x01: NewTestReactor(chDescs, true)}
@ -137,7 +138,7 @@ func (p *remotePeer) accept(l net.Listener) {
if err != nil {
golog.Fatalf("Failed to accept conn: %+v", err)
}
peer, err := newInboundPeer(conn, make(map[byte]Reactor), make([]*ChannelDescriptor, 0), func(p Peer, r interface{}) {}, p.PrivKey, p.Config)
peer, err := newInboundPeer(conn, make(map[byte]Reactor), make([]*tmconn.ChannelDescriptor, 0), func(p Peer, r interface{}) {}, p.PrivKey, p.Config)
if err != nil {
golog.Fatalf("Failed to create a peer: %+v", err)
}

View File

@ -2,95 +2,80 @@
// Originally Copyright (c) 2013-2014 Conformal Systems LLC.
// https://github.com/conformal/btcd/blob/master/LICENSE
package p2p
package pex
import (
"crypto/sha256"
"encoding/binary"
"encoding/json"
"fmt"
"math"
"math/rand"
"net"
"os"
"sync"
"time"
crypto "github.com/tendermint/go-crypto"
"github.com/tendermint/tendermint/p2p"
cmn "github.com/tendermint/tmlibs/common"
)
const (
// addresses under which the address manager will claim to need more addresses.
needAddressThreshold = 1000
// interval used to dump the address cache to disk for future use.
dumpAddressInterval = time.Minute * 2
// max addresses in each old address bucket.
oldBucketSize = 64
// buckets we split old addresses over.
oldBucketCount = 64
// max addresses in each new address bucket.
newBucketSize = 64
// buckets that we spread new addresses over.
newBucketCount = 256
// old buckets over which an address group will be spread.
oldBucketsPerGroup = 4
// new buckets over which a source address group will be spread.
newBucketsPerGroup = 32
// buckets a frequently seen new address may end up in.
maxNewBucketsPerAddress = 4
// days before which we assume an address has vanished
// if we have not seen it announced in that long.
numMissingDays = 30
// tries without a single success before we assume an address is bad.
numRetries = 3
// max failures we will accept without a success before considering an address bad.
maxFailures = 10
// days since the last success before we will consider evicting an address.
minBadDays = 7
// % of total addresses known returned by GetSelection.
getSelectionPercent = 23
// min addresses that must be returned by GetSelection. Useful for bootstrapping.
minGetSelection = 32
// max addresses returned by GetSelection
// NOTE: this must match "maxPexMessageSize"
maxGetSelection = 250
)
const (
bucketTypeNew = 0x01
bucketTypeOld = 0x02
)
// AddrBook - concurrency safe peer address manager.
type AddrBook struct {
// AddrBook is an address book used for tracking peers
// so we can gossip about them to others and select
// peers to dial.
// TODO: break this up?
type AddrBook interface {
cmn.Service
// Add our own addresses so we don't later add ourselves
AddOurAddress(*p2p.NetAddress)
// Add and remove an address
AddAddress(addr *p2p.NetAddress, src *p2p.NetAddress) error
RemoveAddress(addr *p2p.NetAddress)
// Do we need more peers?
NeedMoreAddrs() bool
// Pick an address to dial
PickAddress(newBias int) *p2p.NetAddress
// Mark address
MarkGood(*p2p.NetAddress)
MarkAttempt(*p2p.NetAddress)
MarkBad(*p2p.NetAddress)
// Send a selection of addresses to peers
GetSelection() []*p2p.NetAddress
// TODO: remove
ListOfKnownAddresses() []*knownAddress
// Persist to disk
Save()
}
var _ AddrBook = (*addrBook)(nil)
// addrBook - concurrency safe peer address manager.
// Implements AddrBook.
type addrBook struct {
cmn.BaseService
// immutable after creation
filePath string
routabilityStrict bool
key string
key string // random prefix for bucket placement
// accessed concurrently
mtx sync.Mutex
rand *rand.Rand
ourAddrs map[string]*NetAddress
addrLookup map[ID]*knownAddress // new & old
ourAddrs map[string]*p2p.NetAddress
addrLookup map[p2p.ID]*knownAddress // new & old
bucketsOld []map[string]*knownAddress
bucketsNew []map[string]*knownAddress
nOld int
@ -101,11 +86,11 @@ type AddrBook struct {
// NewAddrBook creates a new address book.
// Use Start to begin processing asynchronous address updates.
func NewAddrBook(filePath string, routabilityStrict bool) *AddrBook {
am := &AddrBook{
rand: rand.New(rand.NewSource(time.Now().UnixNano())),
ourAddrs: make(map[string]*NetAddress),
addrLookup: make(map[ID]*knownAddress),
func NewAddrBook(filePath string, routabilityStrict bool) *addrBook {
am := &addrBook{
rand: rand.New(rand.NewSource(time.Now().UnixNano())), // TODO: seed from outside
ourAddrs: make(map[string]*p2p.NetAddress),
addrLookup: make(map[p2p.ID]*knownAddress),
filePath: filePath,
routabilityStrict: routabilityStrict,
}
@ -114,8 +99,9 @@ func NewAddrBook(filePath string, routabilityStrict bool) *AddrBook {
return am
}
// Initialize the buckets.
// When modifying this, don't forget to update loadFromFile()
func (a *AddrBook) init() {
func (a *addrBook) init() {
a.key = crypto.CRandHex(24) // 24/2 * 8 = 96 bits
// New addr buckets
a.bucketsNew = make([]map[string]*knownAddress, newBucketCount)
@ -130,7 +116,7 @@ func (a *AddrBook) init() {
}
// OnStart implements Service.
func (a *AddrBook) OnStart() error {
func (a *addrBook) OnStart() error {
if err := a.BaseService.OnStart(); err != nil {
return err
}
@ -145,62 +131,56 @@ func (a *AddrBook) OnStart() error {
}
// OnStop implements Service.
func (a *AddrBook) OnStop() {
func (a *addrBook) OnStop() {
a.BaseService.OnStop()
}
func (a *AddrBook) Wait() {
func (a *addrBook) Wait() {
a.wg.Wait()
}
// AddOurAddress adds another one of our addresses.
func (a *AddrBook) AddOurAddress(addr *NetAddress) {
//-------------------------------------------------------
// AddOurAddress one of our addresses.
func (a *addrBook) AddOurAddress(addr *p2p.NetAddress) {
a.mtx.Lock()
defer a.mtx.Unlock()
a.Logger.Info("Add our address to book", "addr", addr)
a.ourAddrs[addr.String()] = addr
}
// OurAddresses returns a list of our addresses.
func (a *AddrBook) OurAddresses() []*NetAddress {
addrs := []*NetAddress{}
for _, addr := range a.ourAddrs {
addrs = append(addrs, addr)
}
return addrs
}
// AddAddress adds the given address as received from the given source.
// AddAddress implements AddrBook - adds the given address as received from the given source.
// NOTE: addr must not be nil
func (a *AddrBook) AddAddress(addr *NetAddress, src *NetAddress) error {
func (a *addrBook) AddAddress(addr *p2p.NetAddress, src *p2p.NetAddress) error {
a.mtx.Lock()
defer a.mtx.Unlock()
return a.addAddress(addr, src)
}
// NeedMoreAddrs returns true if there are not have enough addresses in the book.
func (a *AddrBook) NeedMoreAddrs() bool {
// RemoveAddress implements AddrBook - removes the address from the book.
func (a *addrBook) RemoveAddress(addr *p2p.NetAddress) {
a.mtx.Lock()
defer a.mtx.Unlock()
ka := a.addrLookup[addr.ID]
if ka == nil {
return
}
a.Logger.Info("Remove address from book", "addr", ka.Addr, "ID", ka.ID)
a.removeFromAllBuckets(ka)
}
// NeedMoreAddrs implements AddrBook - returns true if there are not have enough addresses in the book.
func (a *addrBook) NeedMoreAddrs() bool {
return a.Size() < needAddressThreshold
}
// Size returns the number of addresses in the book.
func (a *AddrBook) Size() int {
a.mtx.Lock()
defer a.mtx.Unlock()
return a.size()
}
func (a *AddrBook) size() int {
return a.nNew + a.nOld
}
// PickAddress picks an address to connect to.
// PickAddress implements AddrBook. It picks an address to connect to.
// The address is picked randomly from an old or new bucket according
// to the newBias argument, which must be between [0, 100] (or else is truncated to that range)
// and determines how biased we are to pick an address from a new bucket.
// PickAddress returns nil if the AddrBook is empty or if we try to pick
// from an empty bucket.
func (a *AddrBook) PickAddress(newBias int) *NetAddress {
func (a *addrBook) PickAddress(newBias int) *p2p.NetAddress {
a.mtx.Lock()
defer a.mtx.Unlock()
@ -244,9 +224,9 @@ func (a *AddrBook) PickAddress(newBias int) *NetAddress {
return nil
}
// MarkGood marks the peer as good and moves it into an "old" bucket.
// TODO: call this from somewhere
func (a *AddrBook) MarkGood(addr *NetAddress) {
// MarkGood implements AddrBook - it marks the peer as good and
// moves it into an "old" bucket.
func (a *addrBook) MarkGood(addr *p2p.NetAddress) {
a.mtx.Lock()
defer a.mtx.Unlock()
ka := a.addrLookup[addr.ID]
@ -259,8 +239,8 @@ func (a *AddrBook) MarkGood(addr *NetAddress) {
}
}
// MarkAttempt marks that an attempt was made to connect to the address.
func (a *AddrBook) MarkAttempt(addr *NetAddress) {
// MarkAttempt implements AddrBook - it marks that an attempt was made to connect to the address.
func (a *addrBook) MarkAttempt(addr *p2p.NetAddress) {
a.mtx.Lock()
defer a.mtx.Unlock()
ka := a.addrLookup[addr.ID]
@ -270,28 +250,15 @@ func (a *AddrBook) MarkAttempt(addr *NetAddress) {
ka.markAttempt()
}
// MarkBad currently just ejects the address. In the future, consider
// blacklisting.
func (a *AddrBook) MarkBad(addr *NetAddress) {
// MarkBad implements AddrBook. Currently it just ejects the address.
// TODO: black list for some amount of time
func (a *addrBook) MarkBad(addr *p2p.NetAddress) {
a.RemoveAddress(addr)
}
// RemoveAddress removes the address from the book.
func (a *AddrBook) RemoveAddress(addr *NetAddress) {
a.mtx.Lock()
defer a.mtx.Unlock()
ka := a.addrLookup[addr.ID]
if ka == nil {
return
}
a.Logger.Info("Remove address from book", "addr", ka.Addr, "ID", ka.ID)
a.removeFromAllBuckets(ka)
}
/* Peer exchange */
// GetSelection randomly selects some addresses (old & new). Suitable for peer-exchange protocols.
func (a *AddrBook) GetSelection() []*NetAddress {
// GetSelection implements AddrBook.
// It randomly selects some addresses (old & new). Suitable for peer-exchange protocols.
func (a *addrBook) GetSelection() []*p2p.NetAddress {
a.mtx.Lock()
defer a.mtx.Unlock()
@ -299,7 +266,7 @@ func (a *AddrBook) GetSelection() []*NetAddress {
return nil
}
allAddr := make([]*NetAddress, a.size())
allAddr := make([]*p2p.NetAddress, a.size())
i := 0
for _, ka := range a.addrLookup {
allAddr[i] = ka.Addr
@ -325,7 +292,7 @@ func (a *AddrBook) GetSelection() []*NetAddress {
}
// ListOfKnownAddresses returns the new and old addresses.
func (a *AddrBook) ListOfKnownAddresses() []*knownAddress {
func (a *addrBook) ListOfKnownAddresses() []*knownAddress {
a.mtx.Lock()
defer a.mtx.Unlock()
@ -336,102 +303,27 @@ func (a *AddrBook) ListOfKnownAddresses() []*knownAddress {
return addrs
}
func (ka *knownAddress) copy() *knownAddress {
return &knownAddress{
Addr: ka.Addr,
Src: ka.Src,
Attempts: ka.Attempts,
LastAttempt: ka.LastAttempt,
LastSuccess: ka.LastSuccess,
BucketType: ka.BucketType,
Buckets: ka.Buckets,
}
}
/* Loading & Saving */
type addrBookJSON struct {
Key string
Addrs []*knownAddress
}
func (a *AddrBook) saveToFile(filePath string) {
a.Logger.Info("Saving AddrBook to file", "size", a.Size())
//------------------------------------------------
// Size returns the number of addresses in the book.
func (a *addrBook) Size() int {
a.mtx.Lock()
defer a.mtx.Unlock()
// Compile Addrs
addrs := []*knownAddress{}
for _, ka := range a.addrLookup {
addrs = append(addrs, ka)
return a.size()
}
aJSON := &addrBookJSON{
Key: a.key,
Addrs: addrs,
func (a *addrBook) size() int {
return a.nNew + a.nOld
}
jsonBytes, err := json.MarshalIndent(aJSON, "", "\t")
if err != nil {
a.Logger.Error("Failed to save AddrBook to file", "err", err)
return
}
err = cmn.WriteFileAtomic(filePath, jsonBytes, 0644)
if err != nil {
a.Logger.Error("Failed to save AddrBook to file", "file", filePath, "err", err)
}
//----------------------------------------------------------
// Save persists the address book to disk.
func (a *addrBook) Save() {
a.saveToFile(a.filePath) // thread safe
}
// Returns false if file does not exist.
// cmn.Panics if file is corrupt.
func (a *AddrBook) loadFromFile(filePath string) bool {
// If doesn't exist, do nothing.
_, err := os.Stat(filePath)
if os.IsNotExist(err) {
return false
}
// Load addrBookJSON{}
r, err := os.Open(filePath)
if err != nil {
cmn.PanicCrisis(cmn.Fmt("Error opening file %s: %v", filePath, err))
}
defer r.Close() // nolint: errcheck
aJSON := &addrBookJSON{}
dec := json.NewDecoder(r)
err = dec.Decode(aJSON)
if err != nil {
cmn.PanicCrisis(cmn.Fmt("Error reading file %s: %v", filePath, err))
}
// Restore all the fields...
// Restore the key
a.key = aJSON.Key
// Restore .bucketsNew & .bucketsOld
for _, ka := range aJSON.Addrs {
for _, bucketIndex := range ka.Buckets {
bucket := a.getBucket(ka.BucketType, bucketIndex)
bucket[ka.Addr.String()] = ka
}
a.addrLookup[ka.ID()] = ka
if ka.BucketType == bucketTypeNew {
a.nNew++
} else {
a.nOld++
}
}
return true
}
// Save saves the book.
func (a *AddrBook) Save() {
a.Logger.Info("Saving AddrBook to file", "size", a.Size())
a.saveToFile(a.filePath)
}
/* Private methods */
func (a *AddrBook) saveRoutine() {
func (a *addrBook) saveRoutine() {
defer a.wg.Done()
saveFileTicker := time.NewTicker(dumpAddressInterval)
@ -449,7 +341,9 @@ out:
a.Logger.Info("Address handler done")
}
func (a *AddrBook) getBucket(bucketType byte, bucketIdx int) map[string]*knownAddress {
//----------------------------------------------------------
func (a *addrBook) getBucket(bucketType byte, bucketIdx int) map[string]*knownAddress {
switch bucketType {
case bucketTypeNew:
return a.bucketsNew[bucketIdx]
@ -463,7 +357,7 @@ func (a *AddrBook) getBucket(bucketType byte, bucketIdx int) map[string]*knownAd
// Adds ka to new bucket. Returns false if it couldn't do it cuz buckets full.
// NOTE: currently it always returns true.
func (a *AddrBook) addToNewBucket(ka *knownAddress, bucketIdx int) bool {
func (a *addrBook) addToNewBucket(ka *knownAddress, bucketIdx int) bool {
// Sanity check
if ka.isOld() {
a.Logger.Error(cmn.Fmt("Cannot add address already in old bucket to a new bucket: %v", ka))
@ -480,24 +374,25 @@ func (a *AddrBook) addToNewBucket(ka *knownAddress, bucketIdx int) bool {
// Enforce max addresses.
if len(bucket) > newBucketSize {
a.Logger.Info("new bucket is full, expiring old ")
a.Logger.Info("new bucket is full, expiring new")
a.expireNew(bucketIdx)
}
// Add to bucket.
bucket[addrStr] = ka
// increment nNew if the peer doesnt already exist in a bucket
if ka.addBucketRef(bucketIdx) == 1 {
a.nNew++
}
// Ensure in addrLookup
// Add it to addrLookup
a.addrLookup[ka.ID()] = ka
return true
}
// Adds ka to old bucket. Returns false if it couldn't do it cuz buckets full.
func (a *AddrBook) addToOldBucket(ka *knownAddress, bucketIdx int) bool {
func (a *addrBook) addToOldBucket(ka *knownAddress, bucketIdx int) bool {
// Sanity check
if ka.isNew() {
a.Logger.Error(cmn.Fmt("Cannot add new address to old bucket: %v", ka))
@ -533,7 +428,7 @@ func (a *AddrBook) addToOldBucket(ka *knownAddress, bucketIdx int) bool {
return true
}
func (a *AddrBook) removeFromBucket(ka *knownAddress, bucketType byte, bucketIdx int) {
func (a *addrBook) removeFromBucket(ka *knownAddress, bucketType byte, bucketIdx int) {
if ka.BucketType != bucketType {
a.Logger.Error(cmn.Fmt("Bucket type mismatch: %v", ka))
return
@ -550,7 +445,7 @@ func (a *AddrBook) removeFromBucket(ka *knownAddress, bucketType byte, bucketIdx
}
}
func (a *AddrBook) removeFromAllBuckets(ka *knownAddress) {
func (a *addrBook) removeFromAllBuckets(ka *knownAddress) {
for _, bucketIdx := range ka.Buckets {
bucket := a.getBucket(ka.BucketType, bucketIdx)
delete(bucket, ka.Addr.String())
@ -564,7 +459,9 @@ func (a *AddrBook) removeFromAllBuckets(ka *knownAddress) {
delete(a.addrLookup, ka.ID())
}
func (a *AddrBook) pickOldest(bucketType byte, bucketIdx int) *knownAddress {
//----------------------------------------------------------
func (a *addrBook) pickOldest(bucketType byte, bucketIdx int) *knownAddress {
bucket := a.getBucket(bucketType, bucketIdx)
var oldest *knownAddress
for _, ka := range bucket {
@ -575,7 +472,9 @@ func (a *AddrBook) pickOldest(bucketType byte, bucketIdx int) *knownAddress {
return oldest
}
func (a *AddrBook) addAddress(addr, src *NetAddress) error {
// adds the address to a "new" bucket. if its already in one,
// it only adds it probabilistically
func (a *addrBook) addAddress(addr, src *p2p.NetAddress) error {
if a.routabilityStrict && !addr.Routable() {
return fmt.Errorf("Cannot add non-routable address %v", addr)
}
@ -605,7 +504,10 @@ func (a *AddrBook) addAddress(addr, src *NetAddress) error {
}
bucket := a.calcNewBucket(addr, src)
a.addToNewBucket(ka, bucket)
added := a.addToNewBucket(ka, bucket)
if !added {
a.Logger.Info("Can't add new address, addr book is full", "address", addr, "total", a.size())
}
a.Logger.Info("Added new address", "address", addr, "total", a.size())
return nil
@ -613,7 +515,7 @@ func (a *AddrBook) addAddress(addr, src *NetAddress) error {
// Make space in the new buckets by expiring the really bad entries.
// If no bad entries are available we remove the oldest.
func (a *AddrBook) expireNew(bucketIdx int) {
func (a *addrBook) expireNew(bucketIdx int) {
for addrStr, ka := range a.bucketsNew[bucketIdx] {
// If an entry is bad, throw it away
if ka.isBad() {
@ -628,10 +530,10 @@ func (a *AddrBook) expireNew(bucketIdx int) {
a.removeFromBucket(oldest, bucketTypeNew, bucketIdx)
}
// Promotes an address from new to old.
// TODO: Move to old probabilistically.
// The better a node is, the less likely it should be evicted from an old bucket.
func (a *AddrBook) moveToOld(ka *knownAddress) {
// Promotes an address from new to old. If the destination bucket is full,
// demote the oldest one to a "new" bucket.
// TODO: Demote more probabilistically?
func (a *addrBook) moveToOld(ka *knownAddress) {
// Sanity check
if ka.isOld() {
a.Logger.Error(cmn.Fmt("Cannot promote address that is already old %v", ka))
@ -674,9 +576,12 @@ func (a *AddrBook) moveToOld(ka *knownAddress) {
}
}
//---------------------------------------------------------------------
// calculate bucket placements
// doublesha256( key + sourcegroup +
// int64(doublesha256(key + group + sourcegroup))%bucket_per_group ) % num_new_buckets
func (a *AddrBook) calcNewBucket(addr, src *NetAddress) int {
func (a *addrBook) calcNewBucket(addr, src *p2p.NetAddress) int {
data1 := []byte{}
data1 = append(data1, []byte(a.key)...)
data1 = append(data1, []byte(a.groupKey(addr))...)
@ -697,7 +602,7 @@ func (a *AddrBook) calcNewBucket(addr, src *NetAddress) int {
// doublesha256( key + group +
// int64(doublesha256(key + addr))%buckets_per_group ) % num_old_buckets
func (a *AddrBook) calcOldBucket(addr *NetAddress) int {
func (a *addrBook) calcOldBucket(addr *p2p.NetAddress) int {
data1 := []byte{}
data1 = append(data1, []byte(a.key)...)
data1 = append(data1, []byte(addr.String())...)
@ -719,7 +624,7 @@ func (a *AddrBook) calcOldBucket(addr *NetAddress) int {
// This is the /16 for IPv4, the /32 (/36 for he.net) for IPv6, the string
// "local" for a local address and the string "unroutable" for an unroutable
// address.
func (a *AddrBook) groupKey(na *NetAddress) string {
func (a *addrBook) groupKey(na *p2p.NetAddress) string {
if a.routabilityStrict && na.Local() {
return "local"
}
@ -764,137 +669,6 @@ func (a *AddrBook) groupKey(na *NetAddress) string {
return (&net.IPNet{IP: na.IP, Mask: net.CIDRMask(bits, 128)}).String()
}
//-----------------------------------------------------------------------------
/*
knownAddress
tracks information about a known network address that is used
to determine how viable an address is.
*/
type knownAddress struct {
Addr *NetAddress
Src *NetAddress
Attempts int32
LastAttempt time.Time
LastSuccess time.Time
BucketType byte
Buckets []int
}
func newKnownAddress(addr *NetAddress, src *NetAddress) *knownAddress {
return &knownAddress{
Addr: addr,
Src: src,
Attempts: 0,
LastAttempt: time.Now(),
BucketType: bucketTypeNew,
Buckets: nil,
}
}
func (ka *knownAddress) ID() ID {
return ka.Addr.ID
}
func (ka *knownAddress) isOld() bool {
return ka.BucketType == bucketTypeOld
}
func (ka *knownAddress) isNew() bool {
return ka.BucketType == bucketTypeNew
}
func (ka *knownAddress) markAttempt() {
now := time.Now()
ka.LastAttempt = now
ka.Attempts += 1
}
func (ka *knownAddress) markGood() {
now := time.Now()
ka.LastAttempt = now
ka.Attempts = 0
ka.LastSuccess = now
}
func (ka *knownAddress) addBucketRef(bucketIdx int) int {
for _, bucket := range ka.Buckets {
if bucket == bucketIdx {
// TODO refactor to return error?
// log.Warn(Fmt("Bucket already exists in ka.Buckets: %v", ka))
return -1
}
}
ka.Buckets = append(ka.Buckets, bucketIdx)
return len(ka.Buckets)
}
func (ka *knownAddress) removeBucketRef(bucketIdx int) int {
buckets := []int{}
for _, bucket := range ka.Buckets {
if bucket != bucketIdx {
buckets = append(buckets, bucket)
}
}
if len(buckets) != len(ka.Buckets)-1 {
// TODO refactor to return error?
// log.Warn(Fmt("bucketIdx not found in ka.Buckets: %v", ka))
return -1
}
ka.Buckets = buckets
return len(ka.Buckets)
}
/*
An address is bad if the address in question is a New address, has not been tried in the last
minute, and meets one of the following criteria:
1) It claims to be from the future
2) It hasn't been seen in over a month
3) It has failed at least three times and never succeeded
4) It has failed ten times in the last week
All addresses that meet these criteria are assumed to be worthless and not
worth keeping hold of.
XXX: so a good peer needs us to call MarkGood before the conditions above are reached!
*/
func (ka *knownAddress) isBad() bool {
// Is Old --> good
if ka.BucketType == bucketTypeOld {
return false
}
// Has been attempted in the last minute --> good
if ka.LastAttempt.Before(time.Now().Add(-1 * time.Minute)) {
return false
}
// Too old?
// XXX: does this mean if we've kept a connection up for this long we'll disconnect?!
// and shouldn't it be .Before ?
if ka.LastAttempt.After(time.Now().Add(-1 * numMissingDays * time.Hour * 24)) {
return true
}
// Never succeeded?
if ka.LastSuccess.IsZero() && ka.Attempts >= numRetries {
return true
}
// Hasn't succeeded in too long?
// XXX: does this mean if we've kept a connection up for this long we'll disconnect?!
if ka.LastSuccess.Before(time.Now().Add(-1*minBadDays*time.Hour*24)) &&
ka.Attempts >= maxFailures {
return true
}
return false
}
//-----------------------------------------------------------------------------
// doubleSha256 calculates sha256(sha256(b)) and returns the resulting bytes.
func doubleSha256(b []byte) []byte {
hasher := sha256.New()

View File

@ -1,4 +1,4 @@
package p2p
package pex
import (
"encoding/hex"
@ -8,6 +8,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/tendermint/tendermint/p2p"
cmn "github.com/tendermint/tmlibs/common"
"github.com/tendermint/tmlibs/log"
)
@ -168,8 +169,8 @@ func TestAddrBookHandlesDuplicates(t *testing.T) {
}
type netAddressPair struct {
addr *NetAddress
src *NetAddress
addr *p2p.NetAddress
src *p2p.NetAddress
}
func randNetAddressPairs(t *testing.T, n int) []netAddressPair {
@ -180,7 +181,7 @@ func randNetAddressPairs(t *testing.T, n int) []netAddressPair {
return randAddrs
}
func randIPv4Address(t *testing.T) *NetAddress {
func randIPv4Address(t *testing.T) *p2p.NetAddress {
for {
ip := fmt.Sprintf("%v.%v.%v.%v",
rand.Intn(254)+1,
@ -189,9 +190,9 @@ func randIPv4Address(t *testing.T) *NetAddress {
rand.Intn(255),
)
port := rand.Intn(65535-1) + 1
id := ID(hex.EncodeToString(cmn.RandBytes(IDByteLength)))
idAddr := IDAddressString(id, fmt.Sprintf("%v:%v", ip, port))
addr, err := NewNetAddressString(idAddr)
id := p2p.ID(hex.EncodeToString(cmn.RandBytes(p2p.IDByteLength)))
idAddr := p2p.IDAddressString(id, fmt.Sprintf("%v:%v", ip, port))
addr, err := p2p.NewNetAddressString(idAddr)
assert.Nil(t, err, "error generating rand network address")
if addr.Routable() {
return addr

83
p2p/pex/file.go Normal file
View File

@ -0,0 +1,83 @@
package pex
import (
"encoding/json"
"os"
cmn "github.com/tendermint/tmlibs/common"
)
/* Loading & Saving */
type addrBookJSON struct {
Key string `json:"key"`
Addrs []*knownAddress `json:"addrs"`
}
func (a *addrBook) saveToFile(filePath string) {
a.Logger.Info("Saving AddrBook to file", "size", a.Size())
a.mtx.Lock()
defer a.mtx.Unlock()
// Compile Addrs
addrs := []*knownAddress{}
for _, ka := range a.addrLookup {
addrs = append(addrs, ka)
}
aJSON := &addrBookJSON{
Key: a.key,
Addrs: addrs,
}
jsonBytes, err := json.MarshalIndent(aJSON, "", "\t")
if err != nil {
a.Logger.Error("Failed to save AddrBook to file", "err", err)
return
}
err = cmn.WriteFileAtomic(filePath, jsonBytes, 0644)
if err != nil {
a.Logger.Error("Failed to save AddrBook to file", "file", filePath, "err", err)
}
}
// Returns false if file does not exist.
// cmn.Panics if file is corrupt.
func (a *addrBook) loadFromFile(filePath string) bool {
// If doesn't exist, do nothing.
_, err := os.Stat(filePath)
if os.IsNotExist(err) {
return false
}
// Load addrBookJSON{}
r, err := os.Open(filePath)
if err != nil {
cmn.PanicCrisis(cmn.Fmt("Error opening file %s: %v", filePath, err))
}
defer r.Close() // nolint: errcheck
aJSON := &addrBookJSON{}
dec := json.NewDecoder(r)
err = dec.Decode(aJSON)
if err != nil {
cmn.PanicCrisis(cmn.Fmt("Error reading file %s: %v", filePath, err))
}
// Restore all the fields...
// Restore the key
a.key = aJSON.Key
// Restore .bucketsNew & .bucketsOld
for _, ka := range aJSON.Addrs {
for _, bucketIndex := range ka.Buckets {
bucket := a.getBucket(ka.BucketType, bucketIndex)
bucket[ka.Addr.String()] = ka
}
a.addrLookup[ka.ID()] = ka
if ka.BucketType == bucketTypeNew {
a.nNew++
} else {
a.nOld++
}
}
return true
}

142
p2p/pex/known_address.go Normal file
View File

@ -0,0 +1,142 @@
package pex
import (
"time"
"github.com/tendermint/tendermint/p2p"
)
// knownAddress tracks information about a known network address
// that is used to determine how viable an address is.
type knownAddress struct {
Addr *p2p.NetAddress `json:"addr"`
Src *p2p.NetAddress `json:"src"`
Attempts int32 `json:"attempts"`
LastAttempt time.Time `json:"last_attempt"`
LastSuccess time.Time `json:"last_success"`
BucketType byte `json:"bucket_type"`
Buckets []int `json:"buckets"`
}
func newKnownAddress(addr *p2p.NetAddress, src *p2p.NetAddress) *knownAddress {
return &knownAddress{
Addr: addr,
Src: src,
Attempts: 0,
LastAttempt: time.Now(),
BucketType: bucketTypeNew,
Buckets: nil,
}
}
func (ka *knownAddress) ID() p2p.ID {
return ka.Addr.ID
}
func (ka *knownAddress) copy() *knownAddress {
return &knownAddress{
Addr: ka.Addr,
Src: ka.Src,
Attempts: ka.Attempts,
LastAttempt: ka.LastAttempt,
LastSuccess: ka.LastSuccess,
BucketType: ka.BucketType,
Buckets: ka.Buckets,
}
}
func (ka *knownAddress) isOld() bool {
return ka.BucketType == bucketTypeOld
}
func (ka *knownAddress) isNew() bool {
return ka.BucketType == bucketTypeNew
}
func (ka *knownAddress) markAttempt() {
now := time.Now()
ka.LastAttempt = now
ka.Attempts += 1
}
func (ka *knownAddress) markGood() {
now := time.Now()
ka.LastAttempt = now
ka.Attempts = 0
ka.LastSuccess = now
}
func (ka *knownAddress) addBucketRef(bucketIdx int) int {
for _, bucket := range ka.Buckets {
if bucket == bucketIdx {
// TODO refactor to return error?
// log.Warn(Fmt("Bucket already exists in ka.Buckets: %v", ka))
return -1
}
}
ka.Buckets = append(ka.Buckets, bucketIdx)
return len(ka.Buckets)
}
func (ka *knownAddress) removeBucketRef(bucketIdx int) int {
buckets := []int{}
for _, bucket := range ka.Buckets {
if bucket != bucketIdx {
buckets = append(buckets, bucket)
}
}
if len(buckets) != len(ka.Buckets)-1 {
// TODO refactor to return error?
// log.Warn(Fmt("bucketIdx not found in ka.Buckets: %v", ka))
return -1
}
ka.Buckets = buckets
return len(ka.Buckets)
}
/*
An address is bad if the address in question is a New address, has not been tried in the last
minute, and meets one of the following criteria:
1) It claims to be from the future
2) It hasn't been seen in over a week
3) It has failed at least three times and never succeeded
4) It has failed ten times in the last week
All addresses that meet these criteria are assumed to be worthless and not
worth keeping hold of.
XXX: so a good peer needs us to call MarkGood before the conditions above are reached!
*/
func (ka *knownAddress) isBad() bool {
// Is Old --> good
if ka.BucketType == bucketTypeOld {
return false
}
// Has been attempted in the last minute --> good
if ka.LastAttempt.Before(time.Now().Add(-1 * time.Minute)) {
return false
}
// Too old?
// XXX: does this mean if we've kept a connection up for this long we'll disconnect?!
// and shouldn't it be .Before ?
if ka.LastAttempt.After(time.Now().Add(-1 * numMissingDays * time.Hour * 24)) {
return true
}
// Never succeeded?
if ka.LastSuccess.IsZero() && ka.Attempts >= numRetries {
return true
}
// Hasn't succeeded in too long?
// XXX: does this mean if we've kept a connection up for this long we'll disconnect?!
if ka.LastSuccess.Before(time.Now().Add(-1*minBadDays*time.Hour*24)) &&
ka.Attempts >= maxFailures {
return true
}
return false
}

55
p2p/pex/params.go Normal file
View File

@ -0,0 +1,55 @@
package pex
import "time"
const (
// addresses under which the address manager will claim to need more addresses.
needAddressThreshold = 1000
// interval used to dump the address cache to disk for future use.
dumpAddressInterval = time.Minute * 2
// max addresses in each old address bucket.
oldBucketSize = 64
// buckets we split old addresses over.
oldBucketCount = 64
// max addresses in each new address bucket.
newBucketSize = 64
// buckets that we spread new addresses over.
newBucketCount = 256
// old buckets over which an address group will be spread.
oldBucketsPerGroup = 4
// new buckets over which a source address group will be spread.
newBucketsPerGroup = 32
// buckets a frequently seen new address may end up in.
maxNewBucketsPerAddress = 4
// days before which we assume an address has vanished
// if we have not seen it announced in that long.
numMissingDays = 7
// tries without a single success before we assume an address is bad.
numRetries = 3
// max failures we will accept without a success before considering an address bad.
maxFailures = 10 // ?
// days since the last success before we will consider evicting an address.
minBadDays = 7
// % of total addresses known returned by GetSelection.
getSelectionPercent = 23
// min addresses that must be returned by GetSelection. Useful for bootstrapping.
minGetSelection = 32
// max addresses returned by GetSelection
// NOTE: this must match "maxPexMessageSize"
maxGetSelection = 250
)

View File

@ -1,4 +1,4 @@
package p2p
package pex
import (
"bytes"
@ -11,8 +11,13 @@ import (
"github.com/pkg/errors"
wire "github.com/tendermint/go-wire"
cmn "github.com/tendermint/tmlibs/common"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/p2p/conn"
)
type Peer = p2p.Peer
const (
// PexChannel is a channel for PEX messages
PexChannel = byte(0x00)
@ -45,9 +50,9 @@ const (
// Only accept pexAddrsMsg from peers we sent a corresponding pexRequestMsg too.
// Only accept one pexRequestMsg every ~defaultEnsurePeersPeriod.
type PEXReactor struct {
BaseReactor
p2p.BaseReactor
book *AddrBook
book AddrBook
config *PEXReactorConfig
ensurePeersPeriod time.Duration
@ -67,7 +72,7 @@ type PEXReactorConfig struct {
}
// NewPEXReactor creates new PEX reactor.
func NewPEXReactor(b *AddrBook, config *PEXReactorConfig) *PEXReactor {
func NewPEXReactor(b AddrBook, config *PEXReactorConfig) *PEXReactor {
r := &PEXReactor{
book: b,
config: config,
@ -75,7 +80,7 @@ func NewPEXReactor(b *AddrBook, config *PEXReactorConfig) *PEXReactor {
requestsSent: cmn.NewCMap(),
lastReceivedRequests: cmn.NewCMap(),
}
r.BaseReactor = *NewBaseReactor("PEXReactor", r)
r.BaseReactor = *p2p.NewBaseReactor("PEXReactor", r)
return r
}
@ -111,8 +116,8 @@ func (r *PEXReactor) OnStop() {
}
// GetChannels implements Reactor
func (r *PEXReactor) GetChannels() []*ChannelDescriptor {
return []*ChannelDescriptor{
func (r *PEXReactor) GetChannels() []*conn.ChannelDescriptor {
return []*conn.ChannelDescriptor{
{
ID: PexChannel,
Priority: 1,
@ -225,7 +230,7 @@ func (r *PEXReactor) RequestAddrs(p Peer) {
// ReceiveAddrs adds the given addrs to the addrbook if theres an open
// request for this peer and deletes the open request.
// If there's no open request for the src peer, it returns an error.
func (r *PEXReactor) ReceiveAddrs(addrs []*NetAddress, src Peer) error {
func (r *PEXReactor) ReceiveAddrs(addrs []*p2p.NetAddress, src Peer) error {
id := string(src.ID())
if !r.requestsSent.Has(id) {
@ -244,7 +249,7 @@ func (r *PEXReactor) ReceiveAddrs(addrs []*NetAddress, src Peer) error {
}
// SendAddrs sends addrs to the peer.
func (r *PEXReactor) SendAddrs(p Peer, netAddrs []*NetAddress) {
func (r *PEXReactor) SendAddrs(p Peer, netAddrs []*p2p.NetAddress) {
p.Send(PexChannel, struct{ PexMessage }{&pexAddrsMessage{Addrs: netAddrs}})
}
@ -294,7 +299,7 @@ func (r *PEXReactor) ensurePeers() {
// NOTE: range here is [10, 90]. Too high ?
newBias := cmn.MinInt(numOutPeers, 8)*10 + 10
toDial := make(map[ID]*NetAddress)
toDial := make(map[p2p.ID]*p2p.NetAddress)
// Try maxAttempts times to pick numToDial addresses to dial
maxAttempts := numToDial * 3
for i := 0; i < maxAttempts && len(toDial) < numToDial; i++ {
@ -317,11 +322,16 @@ func (r *PEXReactor) ensurePeers() {
// Dial picked addresses
for _, item := range toDial {
go func(picked *NetAddress) {
go func(picked *p2p.NetAddress) {
_, err := r.Switch.DialPeerWithAddress(picked, false)
if err != nil {
// TODO: detect more "bad peer" scenarios
if _, ok := err.(p2p.ErrSwitchAuthenticationFailure); ok {
r.book.MarkBad(picked)
} else {
r.book.MarkAttempt(picked)
}
}
}(item)
}
@ -349,7 +359,7 @@ func (r *PEXReactor) checkSeeds() error {
if lSeeds == 0 {
return nil
}
_, errs := NewNetAddressStrings(r.config.Seeds)
_, errs := p2p.NewNetAddressStrings(r.config.Seeds)
for _, err := range errs {
if err != nil {
return err
@ -364,9 +374,10 @@ func (r *PEXReactor) dialSeeds() {
if lSeeds == 0 {
return
}
seedAddrs, _ := NewNetAddressStrings(r.config.Seeds)
seedAddrs, _ := p2p.NewNetAddressStrings(r.config.Seeds)
perm := r.Switch.rng.Perm(lSeeds)
perm := rand.Perm(lSeeds)
// perm := r.Switch.rng.Perm(lSeeds)
for _, i := range perm {
// dial a random seed
seedAddr := seedAddrs[i]
@ -408,7 +419,7 @@ func (r *PEXReactor) crawlPeersRoutine() {
// network crawling performed during seed/crawler mode.
type crawlPeerInfo struct {
// The listening address of a potential peer we learned about
Addr *NetAddress
Addr *p2p.NetAddress
// The last time we attempt to reach this address
LastAttempt time.Time
@ -532,7 +543,7 @@ func (m *pexRequestMessage) String() string {
A message with announced peer addresses.
*/
type pexAddrsMessage struct {
Addrs []*NetAddress
Addrs []*p2p.NetAddress
}
func (m *pexAddrsMessage) String() string {

View File

@ -1,21 +1,34 @@
package p2p
package pex
import (
"fmt"
"io/ioutil"
"math/rand"
"os"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
crypto "github.com/tendermint/go-crypto"
wire "github.com/tendermint/go-wire"
cmn "github.com/tendermint/tmlibs/common"
"github.com/tendermint/tmlibs/log"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/p2p/conn"
)
var (
config *cfg.P2PConfig
)
func init() {
config = cfg.DefaultP2PConfig()
config.PexReactor = true
}
func TestPEXReactorBasic(t *testing.T) {
assert, require := assert.New(t), require.New(t)
@ -45,7 +58,7 @@ func TestPEXReactorAddRemovePeer(t *testing.T) {
r.SetLogger(log.TestingLogger())
size := book.Size()
peer := createRandomPeer(false)
peer := p2p.CreateRandomPeer(false)
r.AddPeer(peer)
assert.Equal(size+1, book.Size())
@ -53,7 +66,7 @@ func TestPEXReactorAddRemovePeer(t *testing.T) {
r.RemovePeer(peer, "peer not available")
assert.Equal(size+1, book.Size())
outboundPeer := createRandomPeer(true)
outboundPeer := p2p.CreateRandomPeer(true)
r.AddPeer(outboundPeer)
assert.Equal(size+1, book.Size(), "outbound peers should not be added to the address book")
@ -64,7 +77,7 @@ func TestPEXReactorAddRemovePeer(t *testing.T) {
func TestPEXReactorRunning(t *testing.T) {
N := 3
switches := make([]*Switch, N)
switches := make([]*p2p.Switch, N)
dir, err := ioutil.TempDir("", "pex_reactor")
require.Nil(t, err)
@ -74,7 +87,7 @@ func TestPEXReactorRunning(t *testing.T) {
// create switches
for i := 0; i < N; i++ {
switches[i] = makeSwitch(config, i, "127.0.0.1", "123.123.123", func(i int, sw *Switch) *Switch {
switches[i] = p2p.MakeSwitch(config, i, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch {
sw.SetLogger(log.TestingLogger().With("switch", i))
r := NewPEXReactor(book, &PEXReactorConfig{})
@ -87,9 +100,9 @@ func TestPEXReactorRunning(t *testing.T) {
// fill the address book and add listeners
for _, s := range switches {
addr, _ := NewNetAddressString(s.NodeInfo().ListenAddr)
addr, _ := p2p.NewNetAddressString(s.NodeInfo().ListenAddr)
book.AddAddress(addr, addr)
s.AddListener(NewDefaultListener("tcp", s.NodeInfo().ListenAddr, true, log.TestingLogger()))
s.AddListener(p2p.NewDefaultListener("tcp", s.NodeInfo().ListenAddr, true, log.TestingLogger()))
}
// start switches
@ -98,7 +111,7 @@ func TestPEXReactorRunning(t *testing.T) {
require.Nil(t, err)
}
assertSomePeersWithTimeout(t, switches, 10*time.Millisecond, 10*time.Second)
assertPeersWithTimeout(t, switches, 10*time.Millisecond, 10*time.Second, N-1)
// stop them
for _, s := range switches {
@ -106,7 +119,7 @@ func TestPEXReactorRunning(t *testing.T) {
}
}
func assertSomePeersWithTimeout(t *testing.T, switches []*Switch, checkPeriod, timeout time.Duration) {
func assertPeersWithTimeout(t *testing.T, switches []*p2p.Switch, checkPeriod, timeout time.Duration, nPeers int) {
ticker := time.NewTicker(checkPeriod)
remaining := timeout
for {
@ -116,7 +129,7 @@ func assertSomePeersWithTimeout(t *testing.T, switches []*Switch, checkPeriod, t
allGood := true
for _, s := range switches {
outbound, inbound, _ := s.NumPeers()
if outbound+inbound == 0 {
if outbound+inbound < nPeers {
allGood = false
}
}
@ -151,13 +164,13 @@ func TestPEXReactorReceive(t *testing.T) {
r := NewPEXReactor(book, &PEXReactorConfig{})
r.SetLogger(log.TestingLogger())
peer := createRandomPeer(false)
peer := p2p.CreateRandomPeer(false)
// we have to send a request to receive responses
r.RequestAddrs(peer)
size := book.Size()
addrs := []*NetAddress{peer.NodeInfo().NetAddress()}
addrs := []*p2p.NetAddress{peer.NodeInfo().NetAddress()}
msg := wire.BinaryBytes(struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}})
r.Receive(PexChannel, peer, msg)
assert.Equal(size+1, book.Size())
@ -176,14 +189,14 @@ func TestPEXReactorRequestMessageAbuse(t *testing.T) {
book.SetLogger(log.TestingLogger())
r := NewPEXReactor(book, &PEXReactorConfig{})
sw := makeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *Switch) *Switch { return sw })
sw := p2p.MakeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { return sw })
sw.SetLogger(log.TestingLogger())
sw.AddReactor("PEX", r)
r.SetSwitch(sw)
r.SetLogger(log.TestingLogger())
peer := newMockPeer()
sw.peers.Add(peer)
p2p.AddPeerToSwitch(sw, peer)
assert.True(sw.Peers().Has(peer.ID()))
id := string(peer.ID())
@ -215,14 +228,14 @@ func TestPEXReactorAddrsMessageAbuse(t *testing.T) {
book.SetLogger(log.TestingLogger())
r := NewPEXReactor(book, &PEXReactorConfig{})
sw := makeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *Switch) *Switch { return sw })
sw := p2p.MakeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { return sw })
sw.SetLogger(log.TestingLogger())
sw.AddReactor("PEX", r)
r.SetSwitch(sw)
r.SetLogger(log.TestingLogger())
peer := newMockPeer()
sw.peers.Add(peer)
p2p.AddPeerToSwitch(sw, peer)
assert.True(sw.Peers().Has(peer.ID()))
id := string(peer.ID())
@ -232,7 +245,7 @@ func TestPEXReactorAddrsMessageAbuse(t *testing.T) {
assert.True(r.requestsSent.Has(id))
assert.True(sw.Peers().Has(peer.ID()))
addrs := []*NetAddress{peer.NodeInfo().NetAddress()}
addrs := []*p2p.NetAddress{peer.NodeInfo().NetAddress()}
msg := wire.BinaryBytes(struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}})
// receive some addrs. should clear the request
@ -254,7 +267,7 @@ func TestPEXReactorUsesSeedsIfNeeded(t *testing.T) {
book.SetLogger(log.TestingLogger())
// 1. create seed
seed := makeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *Switch) *Switch {
seed := p2p.MakeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch {
sw.SetLogger(log.TestingLogger())
r := NewPEXReactor(book, &PEXReactorConfig{})
@ -263,13 +276,13 @@ func TestPEXReactorUsesSeedsIfNeeded(t *testing.T) {
sw.AddReactor("pex", r)
return sw
})
seed.AddListener(NewDefaultListener("tcp", seed.NodeInfo().ListenAddr, true, log.TestingLogger()))
seed.AddListener(p2p.NewDefaultListener("tcp", seed.NodeInfo().ListenAddr, true, log.TestingLogger()))
err = seed.Start()
require.Nil(t, err)
defer seed.Stop()
// 2. create usual peer
sw := makeSwitch(config, 1, "127.0.0.1", "123.123.123", func(i int, sw *Switch) *Switch {
sw := p2p.MakeSwitch(config, 1, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch {
sw.SetLogger(log.TestingLogger())
r := NewPEXReactor(book, &PEXReactorConfig{Seeds: []string{seed.NodeInfo().ListenAddr}})
@ -283,7 +296,7 @@ func TestPEXReactorUsesSeedsIfNeeded(t *testing.T) {
defer sw.Stop()
// 3. check that peer at least connects to seed
assertSomePeersWithTimeout(t, []*Switch{sw}, 10*time.Millisecond, 10*time.Second)
assertPeersWithTimeout(t, []*p2p.Switch{sw}, 10*time.Millisecond, 10*time.Second, 1)
}
func TestPEXReactorCrawlStatus(t *testing.T) {
@ -297,7 +310,7 @@ func TestPEXReactorCrawlStatus(t *testing.T) {
pexR := NewPEXReactor(book, &PEXReactorConfig{SeedMode: true})
// Seed/Crawler mode uses data from the Switch
makeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *Switch) *Switch {
p2p.MakeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch {
pexR.SetLogger(log.TestingLogger())
sw.SetLogger(log.TestingLogger().With("switch", i))
sw.AddReactor("pex", pexR)
@ -305,13 +318,13 @@ func TestPEXReactorCrawlStatus(t *testing.T) {
})
// Create a peer, add it to the peer set and the addrbook.
peer := createRandomPeer(false)
pexR.Switch.peers.Add(peer)
peer := p2p.CreateRandomPeer(false)
p2p.AddPeerToSwitch(pexR.Switch, peer)
addr1 := peer.NodeInfo().NetAddress()
pexR.book.AddAddress(addr1, addr1)
// Add a non-connected address to the book.
_, addr2 := createRoutableAddr()
_, addr2 := p2p.CreateRoutableAddr()
pexR.book.AddAddress(addr2, addr1)
// Get some peerInfos to crawl
@ -323,44 +336,15 @@ func TestPEXReactorCrawlStatus(t *testing.T) {
// TODO: test
}
func createRoutableAddr() (addr string, netAddr *NetAddress) {
for {
var err error
addr = cmn.Fmt("%X@%v.%v.%v.%v:46656", cmn.RandBytes(20), rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%256)
netAddr, err = NewNetAddressString(addr)
if err != nil {
panic(err)
}
if netAddr.Routable() {
break
}
}
return
}
func createRandomPeer(outbound bool) *peer {
addr, netAddr := createRoutableAddr()
p := &peer{
nodeInfo: NodeInfo{
ListenAddr: netAddr.DialString(),
PubKey: crypto.GenPrivKeyEd25519().Wrap().PubKey(),
},
outbound: outbound,
mconn: &MConnection{},
}
p.SetLogger(log.TestingLogger().With("peer", addr))
return p
}
type mockPeer struct {
*cmn.BaseService
pubKey crypto.PubKey
addr *NetAddress
addr *p2p.NetAddress
outbound, persistent bool
}
func newMockPeer() mockPeer {
_, netAddr := createRoutableAddr()
_, netAddr := p2p.CreateRoutableAddr()
mp := mockPeer{
addr: netAddr,
pubKey: crypto.GenPrivKeyEd25519().Wrap().PubKey(),
@ -370,16 +354,16 @@ func newMockPeer() mockPeer {
return mp
}
func (mp mockPeer) ID() ID { return PubKeyToID(mp.pubKey) }
func (mp mockPeer) ID() p2p.ID { return p2p.PubKeyToID(mp.pubKey) }
func (mp mockPeer) IsOutbound() bool { return mp.outbound }
func (mp mockPeer) IsPersistent() bool { return mp.persistent }
func (mp mockPeer) NodeInfo() NodeInfo {
return NodeInfo{
func (mp mockPeer) NodeInfo() p2p.NodeInfo {
return p2p.NodeInfo{
PubKey: mp.pubKey,
ListenAddr: mp.addr.DialString(),
}
}
func (mp mockPeer) Status() ConnectionStatus { return ConnectionStatus{} }
func (mp mockPeer) Status() conn.ConnectionStatus { return conn.ConnectionStatus{} }
func (mp mockPeer) Send(byte, interface{}) bool { return false }
func (mp mockPeer) TrySend(byte, interface{}) bool { return false }
func (mp mockPeer) Set(string, interface{}) {}

View File

@ -11,6 +11,7 @@ import (
crypto "github.com/tendermint/go-crypto"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/p2p/conn"
cmn "github.com/tendermint/tmlibs/common"
)
@ -30,10 +31,12 @@ const (
reconnectBackOffBaseSeconds = 3
)
var (
ErrSwitchDuplicatePeer = errors.New("Duplicate peer")
ErrSwitchConnectToSelf = errors.New("Connect to self")
)
//-----------------------------------------------------------------------------
type AddrBook interface {
AddAddress(addr *NetAddress, src *NetAddress) error
Save()
}
//-----------------------------------------------------------------------------
@ -48,7 +51,7 @@ type Switch struct {
peerConfig *PeerConfig
listeners []Listener
reactors map[string]Reactor
chDescs []*ChannelDescriptor
chDescs []*conn.ChannelDescriptor
reactorsByCh map[byte]Reactor
peers *PeerSet
dialing *cmn.CMap
@ -66,7 +69,7 @@ func NewSwitch(config *cfg.P2PConfig) *Switch {
config: config,
peerConfig: DefaultPeerConfig(),
reactors: make(map[string]Reactor),
chDescs: make([]*ChannelDescriptor, 0),
chDescs: make([]*conn.ChannelDescriptor, 0),
reactorsByCh: make(map[byte]Reactor),
peers: NewPeerSet(),
dialing: cmn.NewCMap(),
@ -77,10 +80,10 @@ func NewSwitch(config *cfg.P2PConfig) *Switch {
sw.rng = rand.New(rand.NewSource(cmn.RandInt64()))
// TODO: collapse the peerConfig into the config ?
sw.peerConfig.MConfig.flushThrottle = time.Duration(config.FlushThrottleTimeout) * time.Millisecond
sw.peerConfig.MConfig.FlushThrottle = time.Duration(config.FlushThrottleTimeout) * time.Millisecond
sw.peerConfig.MConfig.SendRate = config.SendRate
sw.peerConfig.MConfig.RecvRate = config.RecvRate
sw.peerConfig.MConfig.maxMsgPacketPayloadSize = config.MaxMsgPacketPayloadSize
sw.peerConfig.MConfig.MaxMsgPacketPayloadSize = config.MaxMsgPacketPayloadSize
sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
return sw
@ -265,6 +268,8 @@ func (sw *Switch) stopAndRemovePeer(peer Peer, reason interface{}) {
// If no success after all that, it stops trying, and leaves it
// to the PEX/Addrbook to find the peer again
func (sw *Switch) reconnectToPeer(peer Peer) {
// NOTE this will connect to the self reported address,
// not necessarily the original we dialed
netAddr := peer.NodeInfo().NetAddress()
start := time.Now()
sw.Logger.Info("Reconnecting to peer", "peer", peer)
@ -316,7 +321,7 @@ func (sw *Switch) IsDialing(id ID) bool {
}
// DialPeersAsync dials a list of peers asynchronously in random order (optionally, making them persistent).
func (sw *Switch) DialPeersAsync(addrBook *AddrBook, peers []string, persistent bool) error {
func (sw *Switch) DialPeersAsync(addrBook AddrBook, peers []string, persistent bool) error {
netAddrs, errs := NewNetAddressStrings(peers)
for _, err := range errs {
sw.Logger.Error("Error in peer's address", "err", err)
@ -330,8 +335,11 @@ func (sw *Switch) DialPeersAsync(addrBook *AddrBook, peers []string, persistent
if netAddr.Same(ourAddr) {
continue
}
// TODO: move this out of here ?
addrBook.AddAddress(netAddr, ourAddr)
}
// Persist some peers to disk right away.
// NOTE: integration tests depend on this
addrBook.Save()
}
@ -454,7 +462,7 @@ func (sw *Switch) addOutboundPeerWithConfig(addr *NetAddress, config *PeerConfig
peer.Logger.Info("Dialed peer with unknown ID - unable to authenticate", "addr", addr)
} else if addr.ID != peer.ID() {
peer.CloseConn()
return nil, fmt.Errorf("Failed to authenticate peer %v. Connected to peer with ID %s", addr, peer.ID())
return nil, ErrSwitchAuthenticationFailure{addr, peer.ID()}
}
err = sw.addPeer(peer)

View File

@ -16,6 +16,7 @@ import (
"github.com/tendermint/tmlibs/log"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/p2p/conn"
)
var (
@ -37,7 +38,7 @@ type TestReactor struct {
BaseReactor
mtx sync.Mutex
channels []*ChannelDescriptor
channels []*conn.ChannelDescriptor
peersAdded []Peer
peersRemoved []Peer
logMessages bool
@ -45,7 +46,7 @@ type TestReactor struct {
msgsReceived map[byte][]PeerMessage
}
func NewTestReactor(channels []*ChannelDescriptor, logMessages bool) *TestReactor {
func NewTestReactor(channels []*conn.ChannelDescriptor, logMessages bool) *TestReactor {
tr := &TestReactor{
channels: channels,
logMessages: logMessages,
@ -56,7 +57,7 @@ func NewTestReactor(channels []*ChannelDescriptor, logMessages bool) *TestReacto
return tr
}
func (tr *TestReactor) GetChannels() []*ChannelDescriptor {
func (tr *TestReactor) GetChannels() []*conn.ChannelDescriptor {
return tr.channels
}
@ -92,7 +93,7 @@ func (tr *TestReactor) getMsgs(chID byte) []PeerMessage {
// convenience method for creating two switches connected to each other.
// XXX: note this uses net.Pipe and not a proper TCP conn
func makeSwitchPair(t testing.TB, initSwitch func(int, *Switch) *Switch) (*Switch, *Switch) {
func MakeSwitchPair(t testing.TB, initSwitch func(int, *Switch) *Switch) (*Switch, *Switch) {
// Create two switches that will be interconnected.
switches := MakeConnectedSwitches(config, 2, initSwitch, Connect2Switches)
return switches[0], switches[1]
@ -100,11 +101,11 @@ func makeSwitchPair(t testing.TB, initSwitch func(int, *Switch) *Switch) (*Switc
func initSwitchFunc(i int, sw *Switch) *Switch {
// Make two reactors of two channels each
sw.AddReactor("foo", NewTestReactor([]*ChannelDescriptor{
sw.AddReactor("foo", NewTestReactor([]*conn.ChannelDescriptor{
{ID: byte(0x00), Priority: 10},
{ID: byte(0x01), Priority: 10},
}, true))
sw.AddReactor("bar", NewTestReactor([]*ChannelDescriptor{
sw.AddReactor("bar", NewTestReactor([]*conn.ChannelDescriptor{
{ID: byte(0x02), Priority: 10},
{ID: byte(0x03), Priority: 10},
}, true))
@ -112,7 +113,7 @@ func initSwitchFunc(i int, sw *Switch) *Switch {
}
func TestSwitches(t *testing.T) {
s1, s2 := makeSwitchPair(t, initSwitchFunc)
s1, s2 := MakeSwitchPair(t, initSwitchFunc)
defer s1.Stop()
defer s2.Stop()
@ -156,12 +157,12 @@ func assertMsgReceivedWithTimeout(t *testing.T, msg string, channel byte, reacto
}
func TestConnAddrFilter(t *testing.T) {
s1 := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
s2 := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
s1 := MakeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
s2 := MakeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
defer s1.Stop()
defer s2.Stop()
c1, c2 := netPipe()
c1, c2 := conn.NetPipe()
s1.SetAddrFilter(func(addr net.Addr) error {
if addr.String() == c1.RemoteAddr().String() {
@ -192,12 +193,12 @@ func assertNoPeersAfterTimeout(t *testing.T, sw *Switch, timeout time.Duration)
}
func TestConnPubKeyFilter(t *testing.T) {
s1 := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
s2 := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
s1 := MakeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
s2 := MakeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
defer s1.Stop()
defer s2.Stop()
c1, c2 := netPipe()
c1, c2 := conn.NetPipe()
// set pubkey filter
s1.SetPubKeyFilter(func(pubkey crypto.PubKey) error {
@ -224,7 +225,7 @@ func TestConnPubKeyFilter(t *testing.T) {
func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) {
assert, require := assert.New(t), require.New(t)
sw := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
sw := MakeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
err := sw.Start()
if err != nil {
t.Error(err)
@ -251,7 +252,7 @@ func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) {
func TestSwitchReconnectsToPersistentPeer(t *testing.T) {
assert, require := assert.New(t), require.New(t)
sw := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
sw := MakeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc)
err := sw.Start()
if err != nil {
t.Error(err)
@ -302,13 +303,13 @@ func TestSwitchFullConnectivity(t *testing.T) {
func BenchmarkSwitches(b *testing.B) {
b.StopTimer()
s1, s2 := makeSwitchPair(b, func(i int, sw *Switch) *Switch {
s1, s2 := MakeSwitchPair(b, func(i int, sw *Switch) *Switch {
// Make bar reactors of bar channels each
sw.AddReactor("foo", NewTestReactor([]*ChannelDescriptor{
sw.AddReactor("foo", NewTestReactor([]*conn.ChannelDescriptor{
{ID: byte(0x00), Priority: 10},
{ID: byte(0x01), Priority: 10},
}, false))
sw.AddReactor("bar", NewTestReactor([]*ChannelDescriptor{
sw.AddReactor("bar", NewTestReactor([]*conn.ChannelDescriptor{
{ID: byte(0x02), Priority: 10},
{ID: byte(0x03), Priority: 10},
}, false))

View File

@ -5,11 +5,46 @@ import (
"net"
crypto "github.com/tendermint/go-crypto"
cfg "github.com/tendermint/tendermint/config"
cmn "github.com/tendermint/tmlibs/common"
"github.com/tendermint/tmlibs/log"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/p2p/conn"
)
func AddPeerToSwitch(sw *Switch, peer Peer) {
sw.peers.Add(peer)
}
func CreateRandomPeer(outbound bool) *peer {
addr, netAddr := CreateRoutableAddr()
p := &peer{
nodeInfo: NodeInfo{
ListenAddr: netAddr.DialString(),
PubKey: crypto.GenPrivKeyEd25519().Wrap().PubKey(),
},
outbound: outbound,
mconn: &conn.MConnection{},
}
p.SetLogger(log.TestingLogger().With("peer", addr))
return p
}
func CreateRoutableAddr() (addr string, netAddr *NetAddress) {
for {
var err error
addr = cmn.Fmt("%X@%v.%v.%v.%v:46656", cmn.RandBytes(20), rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%256)
netAddr, err = NewNetAddressString(addr)
if err != nil {
panic(err)
}
if netAddr.Routable() {
break
}
}
return
}
//------------------------------------------------------------------
// Connects switches via arbitrary net.Conn. Used for testing.
@ -20,7 +55,7 @@ import (
func MakeConnectedSwitches(cfg *cfg.P2PConfig, n int, initSwitch func(int, *Switch) *Switch, connect func([]*Switch, int, int)) []*Switch {
switches := make([]*Switch, n)
for i := 0; i < n; i++ {
switches[i] = makeSwitch(cfg, i, "testing", "123.123.123", initSwitch)
switches[i] = MakeSwitch(cfg, i, "testing", "123.123.123", initSwitch)
}
if err := StartSwitches(switches); err != nil {
@ -42,7 +77,7 @@ func MakeConnectedSwitches(cfg *cfg.P2PConfig, n int, initSwitch func(int, *Swit
func Connect2Switches(switches []*Switch, i, j int) {
switchI := switches[i]
switchJ := switches[j]
c1, c2 := netPipe()
c1, c2 := conn.NetPipe()
doneCh := make(chan struct{})
go func() {
err := switchI.addPeerWithConnection(c1)
@ -91,7 +126,7 @@ func StartSwitches(switches []*Switch) error {
return nil
}
func makeSwitch(cfg *cfg.P2PConfig, i int, network, version string, initSwitch func(int, *Switch) *Switch) *Switch {
func MakeSwitch(cfg *cfg.P2PConfig, i int, network, version string, initSwitch func(int, *Switch) *Switch) *Switch {
// new switch, add reactors
// TODO: let the config be passed in?
nodeKey := &NodeKey{

View File

@ -1,112 +1,8 @@
package p2p
import (
"fmt"
"net"
"strconv"
"strings"
crypto "github.com/tendermint/go-crypto"
"github.com/tendermint/tendermint/p2p/conn"
)
const maxNodeInfoSize = 10240 // 10Kb
// NodeInfo is the basic node information exchanged
// between two peers during the Tendermint P2P handshake.
type NodeInfo struct {
// Authenticate
PubKey crypto.PubKey `json:"pub_key"` // authenticated pubkey
ListenAddr string `json:"listen_addr"` // accepting incoming
// Check compatibility
Network string `json:"network"` // network/chain ID
Version string `json:"version"` // major.minor.revision
// Sanitize
Moniker string `json:"moniker"` // arbitrary moniker
Other []string `json:"other"` // other application specific data
}
// Validate checks the self-reported NodeInfo is safe.
// It returns an error if the info.PubKey doesn't match the given pubKey.
// TODO: constraints for Moniker/Other? Or is that for the UI ?
func (info NodeInfo) Validate(pubKey crypto.PubKey) error {
if !info.PubKey.Equals(pubKey) {
return fmt.Errorf("info.PubKey (%v) doesn't match peer.PubKey (%v)",
info.PubKey, pubKey)
}
return nil
}
// CONTRACT: two nodes are compatible if the major/minor versions match and network match
func (info NodeInfo) CompatibleWith(other NodeInfo) error {
iMajor, iMinor, _, iErr := splitVersion(info.Version)
oMajor, oMinor, _, oErr := splitVersion(other.Version)
// if our own version number is not formatted right, we messed up
if iErr != nil {
return iErr
}
// version number must be formatted correctly ("x.x.x")
if oErr != nil {
return oErr
}
// major version must match
if iMajor != oMajor {
return fmt.Errorf("Peer is on a different major version. Got %v, expected %v", oMajor, iMajor)
}
// minor version must match
if iMinor != oMinor {
return fmt.Errorf("Peer is on a different minor version. Got %v, expected %v", oMinor, iMinor)
}
// nodes must be on the same network
if info.Network != other.Network {
return fmt.Errorf("Peer is on a different network. Got %v, expected %v", other.Network, info.Network)
}
return nil
}
func (info NodeInfo) ID() ID {
return PubKeyToID(info.PubKey)
}
func (info NodeInfo) NetAddress() *NetAddress {
id := PubKeyToID(info.PubKey)
addr := info.ListenAddr
netAddr, err := NewNetAddressString(IDAddressString(id, addr))
if err != nil {
panic(err) // everything should be well formed by now
}
return netAddr
}
func (info NodeInfo) ListenHost() string {
host, _, _ := net.SplitHostPort(info.ListenAddr) // nolint: errcheck, gas
return host
}
func (info NodeInfo) ListenPort() int {
_, port, _ := net.SplitHostPort(info.ListenAddr) // nolint: errcheck, gas
port_i, err := strconv.Atoi(port)
if err != nil {
return -1
}
return port_i
}
func (info NodeInfo) String() string {
return fmt.Sprintf("NodeInfo{pk: %v, moniker: %v, network: %v [listen %v], version: %v (%v)}", info.PubKey, info.Moniker, info.Network, info.ListenAddr, info.Version, info.Other)
}
func splitVersion(version string) (string, string, string, error) {
spl := strings.Split(version, ".")
if len(spl) != 3 {
return "", "", "", fmt.Errorf("Invalid version format %v", version)
}
return spl[0], spl[1], spl[2], nil
}
type ChannelDescriptor = conn.ChannelDescriptor
type ConnectionStatus = conn.ConnectionStatus

View File

@ -32,7 +32,7 @@ type P2P interface {
NumPeers() (outbound, inbound, dialig int)
NodeInfo() p2p.NodeInfo
IsListening() bool
DialPeersAsync(*p2p.AddrBook, []string, bool) error
DialPeersAsync(p2p.AddrBook, []string, bool) error
}
//----------------------------------------------
@ -54,7 +54,7 @@ var (
// objects
pubKey crypto.PubKey
genDoc *types.GenesisDoc // cache the genesis structure
addrBook *p2p.AddrBook
addrBook p2p.AddrBook
txIndexer txindex.TxIndexer
consensusReactor *consensus.ConsensusReactor
eventBus *types.EventBus // thread safe
@ -94,7 +94,7 @@ func SetGenesisDoc(doc *types.GenesisDoc) {
genDoc = doc
}
func SetAddrBook(book *p2p.AddrBook) {
func SetAddrBook(book p2p.AddrBook) {
addrBook = book
}

View File

@ -17,18 +17,18 @@ CLIENT_NAME="pex_addrbook_$ID"
echo "1. restart peer $ID"
docker stop "local_testnet_$ID"
# preserve addrbook.json
docker cp "local_testnet_$ID:/go/src/github.com/tendermint/tendermint/test/p2p/data/mach1/core/addrbook.json" "/tmp/addrbook.json"
docker cp "local_testnet_$ID:/go/src/github.com/tendermint/tendermint/test/p2p/data/mach1/core/config/addrbook.json" "/tmp/addrbook.json"
set +e #CIRCLE
docker rm -vf "local_testnet_$ID"
set -e
# NOTE that we do not provide persistent_peers
bash test/p2p/peer.sh "$DOCKER_IMAGE" "$NETWORK_NAME" "$ID" "$PROXY_APP" "--p2p.pex --rpc.unsafe"
docker cp "/tmp/addrbook.json" "local_testnet_$ID:/go/src/github.com/tendermint/tendermint/test/p2p/data/mach1/core/addrbook.json"
docker cp "/tmp/addrbook.json" "local_testnet_$ID:/go/src/github.com/tendermint/tendermint/test/p2p/data/mach1/core/config/addrbook.json"
echo "with the following addrbook:"
cat /tmp/addrbook.json
# exec doesn't work on circle
# docker exec "local_testnet_$ID" cat "/go/src/github.com/tendermint/tendermint/test/p2p/data/mach1/core/addrbook.json"
# docker exec "local_testnet_$ID" cat "/go/src/github.com/tendermint/tendermint/test/p2p/data/mach1/core/config/addrbook.json"
echo ""
# if the client runs forever, it means addrbook wasn't saved or was empty