p2p: seed mode refactoring (#3011)

ListOfKnownAddresses is removed
panic if addrbook size is less than zero
CrawlPeers does not attempt to connect to existing or peers we're currently dialing
various perf. fixes
improved tests (though not complete)
move IsDialingOrExistingAddress check into DialPeerWithAddress (Fixes #2716)


* addrbook: preallocate memory when saving addrbook to file

* addrbook: remove oldestFirst struct and check for ID

* oldestFirst replaced with sort.Slice
* ID is now mandatory, so no need to check

* addrbook: remove ListOfKnownAddresses

GetSelection is used instead in seed mode.

* addrbook: panic if size is less than 0

* rewrite addrbook#saveToFile to not use a counter

* test AttemptDisconnects func

* move IsDialingOrExistingAddress check into DialPeerWithAddress

* save and cleanup crawl peer data

* get rid of DefaultSeedDisconnectWaitPeriod

* make linter happy

* fix TestPEXReactorSeedMode

* fix comment

* add a changelog entry

* Apply suggestions from code review

Co-Authored-By: melekes <anton.kalyaev@gmail.com>

* rename ErrDialingOrExistingAddress to ErrCurrentlyDialingOrExistingAddress

* lowercase errors

* do not persist seed data

pros:
- no extra files
- less IO

cons:
- if the node crashes, seed might crawl a peer too soon

* fixes after Ethan's review

* add a changelog entry

* we should only consult Switch about peers

checking addrbook size does not make sense since only PEX reactor uses
it for dialing peers!

https://github.com/tendermint/tendermint/pull/3011#discussion_r270948875
This commit is contained in:
Anton Kaliaev
2019-04-03 11:22:52 +02:00
committed by GitHub
parent 086d6cbe8c
commit f965a4db15
9 changed files with 175 additions and 165 deletions

View File

@ -19,4 +19,4 @@
### IMPROVEMENTS: ### IMPROVEMENTS:
### BUG FIXES: ### BUG FIXES:
- [p2p] \#2716 Check if we're already connected to peer right before dialing it (@melekes)

View File

@ -498,6 +498,12 @@ func NewNode(config *cfg.Config,
&pex.PEXReactorConfig{ &pex.PEXReactorConfig{
Seeds: splitAndTrimEmpty(config.P2P.Seeds, ",", " "), Seeds: splitAndTrimEmpty(config.P2P.Seeds, ",", " "),
SeedMode: config.P2P.SeedMode, SeedMode: config.P2P.SeedMode,
// See consensus/reactor.go: blocksToContributeToBecomeGoodPeer 10000
// blocks assuming 10s blocks ~ 28 hours.
// TODO (melekes): make it dynamic based on the actual block latencies
// from the live network.
// https://github.com/tendermint/tendermint/issues/3523
SeedDisconnectWaitPeriod: 28 * time.Hour,
}) })
pexReactor.SetLogger(logger.With("module", "pex")) pexReactor.SetLogger(logger.With("module", "pex"))
sw.AddReactor("PEX", pexReactor) sw.AddReactor("PEX", pexReactor)

View File

@ -103,7 +103,7 @@ type ErrSwitchDuplicatePeerID struct {
} }
func (e ErrSwitchDuplicatePeerID) Error() string { func (e ErrSwitchDuplicatePeerID) Error() string {
return fmt.Sprintf("Duplicate peer ID %v", e.ID) return fmt.Sprintf("duplicate peer ID %v", e.ID)
} }
// ErrSwitchDuplicatePeerIP to be raised whena a peer is connecting with a known // ErrSwitchDuplicatePeerIP to be raised whena a peer is connecting with a known
@ -113,7 +113,7 @@ type ErrSwitchDuplicatePeerIP struct {
} }
func (e ErrSwitchDuplicatePeerIP) Error() string { func (e ErrSwitchDuplicatePeerIP) Error() string {
return fmt.Sprintf("Duplicate peer IP %v", e.IP.String()) return fmt.Sprintf("duplicate peer IP %v", e.IP.String())
} }
// ErrSwitchConnectToSelf to be raised when trying to connect to itself. // ErrSwitchConnectToSelf to be raised when trying to connect to itself.
@ -122,7 +122,7 @@ type ErrSwitchConnectToSelf struct {
} }
func (e ErrSwitchConnectToSelf) Error() string { func (e ErrSwitchConnectToSelf) Error() string {
return fmt.Sprintf("Connect to self: %v", e.Addr) return fmt.Sprintf("connect to self: %v", e.Addr)
} }
type ErrSwitchAuthenticationFailure struct { type ErrSwitchAuthenticationFailure struct {
@ -132,7 +132,7 @@ type ErrSwitchAuthenticationFailure struct {
func (e ErrSwitchAuthenticationFailure) Error() string { func (e ErrSwitchAuthenticationFailure) Error() string {
return fmt.Sprintf( return fmt.Sprintf(
"Failed to authenticate peer. Dialed %v, but got peer with ID %s", "failed to authenticate peer. Dialed %v, but got peer with ID %s",
e.Dialed, e.Dialed,
e.Got, e.Got,
) )
@ -152,7 +152,7 @@ type ErrNetAddressNoID struct {
} }
func (e ErrNetAddressNoID) Error() string { func (e ErrNetAddressNoID) Error() string {
return fmt.Sprintf("Address (%s) does not contain ID", e.Addr) return fmt.Sprintf("address (%s) does not contain ID", e.Addr)
} }
type ErrNetAddressInvalid struct { type ErrNetAddressInvalid struct {
@ -161,7 +161,7 @@ type ErrNetAddressInvalid struct {
} }
func (e ErrNetAddressInvalid) Error() string { func (e ErrNetAddressInvalid) Error() string {
return fmt.Sprintf("Invalid address (%s): %v", e.Addr, e.Err) return fmt.Sprintf("invalid address (%s): %v", e.Addr, e.Err)
} }
type ErrNetAddressLookup struct { type ErrNetAddressLookup struct {
@ -170,5 +170,15 @@ type ErrNetAddressLookup struct {
} }
func (e ErrNetAddressLookup) Error() string { func (e ErrNetAddressLookup) Error() string {
return fmt.Sprintf("Error looking up host (%s): %v", e.Addr, e.Err) return fmt.Sprintf("error looking up host (%s): %v", e.Addr, e.Err)
}
// ErrCurrentlyDialingOrExistingAddress indicates that we're currently
// dialing this address or it belongs to an existing peer.
type ErrCurrentlyDialingOrExistingAddress struct {
Addr string
}
func (e ErrCurrentlyDialingOrExistingAddress) Error() string {
return fmt.Sprintf("connection with %s has been established or dialed", e.Addr)
} }

View File

@ -66,8 +66,7 @@ type AddrBook interface {
// Send a selection of addresses with bias // Send a selection of addresses with bias
GetSelectionWithBias(biasTowardsNewAddrs int) []*p2p.NetAddress GetSelectionWithBias(biasTowardsNewAddrs int) []*p2p.NetAddress
// TODO: remove Size() int
ListOfKnownAddresses() []*knownAddress
// Persist to disk // Persist to disk
Save() Save()
@ -254,7 +253,7 @@ func (a *addrBook) PickAddress(biasTowardsNewAddrs int) *p2p.NetAddress {
bookSize := a.size() bookSize := a.size()
if bookSize <= 0 { if bookSize <= 0 {
if bookSize < 0 { if bookSize < 0 {
a.Logger.Error("Addrbook size less than 0", "nNew", a.nNew, "nOld", a.nOld) panic(fmt.Sprintf("Addrbook size %d (new: %d + old: %d) is less than 0", a.nNew+a.nOld, a.nNew, a.nOld))
} }
return nil return nil
} }
@ -339,7 +338,7 @@ func (a *addrBook) GetSelection() []*p2p.NetAddress {
bookSize := a.size() bookSize := a.size()
if bookSize <= 0 { if bookSize <= 0 {
if bookSize < 0 { if bookSize < 0 {
a.Logger.Error("Addrbook size less than 0", "nNew", a.nNew, "nOld", a.nOld) panic(fmt.Sprintf("Addrbook size %d (new: %d + old: %d) is less than 0", a.nNew+a.nOld, a.nNew, a.nOld))
} }
return nil return nil
} }
@ -389,7 +388,7 @@ func (a *addrBook) GetSelectionWithBias(biasTowardsNewAddrs int) []*p2p.NetAddre
bookSize := a.size() bookSize := a.size()
if bookSize <= 0 { if bookSize <= 0 {
if bookSize < 0 { if bookSize < 0 {
a.Logger.Error("Addrbook size less than 0", "nNew", a.nNew, "nOld", a.nOld) panic(fmt.Sprintf("Addrbook size %d (new: %d + old: %d) is less than 0", a.nNew+a.nOld, a.nNew, a.nOld))
} }
return nil return nil
} }
@ -414,18 +413,6 @@ func (a *addrBook) GetSelectionWithBias(biasTowardsNewAddrs int) []*p2p.NetAddre
return selection return selection
} }
// ListOfKnownAddresses returns the new and old addresses.
func (a *addrBook) ListOfKnownAddresses() []*knownAddress {
a.mtx.Lock()
defer a.mtx.Unlock()
addrs := []*knownAddress{}
for _, addr := range a.addrLookup {
addrs = append(addrs, addr.copy())
}
return addrs
}
//------------------------------------------------ //------------------------------------------------
// Size returns the number of addresses in the book. // Size returns the number of addresses in the book.
@ -473,8 +460,7 @@ func (a *addrBook) getBucket(bucketType byte, bucketIdx int) map[string]*knownAd
case bucketTypeOld: case bucketTypeOld:
return a.bucketsOld[bucketIdx] return a.bucketsOld[bucketIdx]
default: default:
cmn.PanicSanity("Should not happen") panic("Invalid bucket type")
return nil
} }
} }

View File

@ -16,16 +16,15 @@ type addrBookJSON struct {
} }
func (a *addrBook) saveToFile(filePath string) { func (a *addrBook) saveToFile(filePath string) {
a.Logger.Info("Saving AddrBook to file", "size", a.Size())
a.mtx.Lock() a.mtx.Lock()
defer a.mtx.Unlock() defer a.mtx.Unlock()
// Compile Addrs
addrs := []*knownAddress{} a.Logger.Info("Saving AddrBook to file", "size", a.size())
addrs := make([]*knownAddress, 0, len(a.addrLookup))
for _, ka := range a.addrLookup { for _, ka := range a.addrLookup {
addrs = append(addrs, ka) addrs = append(addrs, ka)
} }
aJSON := &addrBookJSON{ aJSON := &addrBookJSON{
Key: a.key, Key: a.key,
Addrs: addrs, Addrs: addrs,

View File

@ -33,18 +33,6 @@ func (ka *knownAddress) ID() p2p.ID {
return ka.Addr.ID return ka.Addr.ID
} }
func (ka *knownAddress) copy() *knownAddress {
return &knownAddress{
Addr: ka.Addr,
Src: ka.Src,
Attempts: ka.Attempts,
LastAttempt: ka.LastAttempt,
LastSuccess: ka.LastSuccess,
BucketType: ka.BucketType,
Buckets: ka.Buckets,
}
}
func (ka *knownAddress) isOld() bool { func (ka *knownAddress) isOld() bool {
return ka.BucketType == bucketTypeOld return ka.BucketType == bucketTypeOld
} }

View File

@ -3,7 +3,6 @@ package pex
import ( import (
"fmt" "fmt"
"reflect" "reflect"
"sort"
"sync" "sync"
"time" "time"
@ -35,16 +34,11 @@ const (
// Seed/Crawler constants // Seed/Crawler constants
// We want seeds to only advertise good peers. Therefore they should wait at // minTimeBetweenCrawls is a minimum time between attempts to crawl a peer.
// least as long as we expect it to take for a peer to become good before minTimeBetweenCrawls = 2 * time.Minute
// disconnecting.
// see consensus/reactor.go: blocksToContributeToBecomeGoodPeer
// 10000 blocks assuming 1s blocks ~ 2.7 hours.
defaultSeedDisconnectWaitPeriod = 3 * time.Hour
defaultCrawlPeerInterval = 2 * time.Minute // don't redial for this. TODO: back-off. what for? // check some peers every this
crawlPeerPeriod = 30 * time.Second
defaultCrawlPeersPeriod = 30 * time.Second // check some peers every this
maxAttemptsToDial = 16 // ~ 35h in total (last attempt - 18h) maxAttemptsToDial = 16 // ~ 35h in total (last attempt - 18h)
@ -77,6 +71,9 @@ type PEXReactor struct {
seedAddrs []*p2p.NetAddress seedAddrs []*p2p.NetAddress
attemptsToDial sync.Map // address (string) -> {number of attempts (int), last time dialed (time.Time)} attemptsToDial sync.Map // address (string) -> {number of attempts (int), last time dialed (time.Time)}
// seed/crawled mode fields
crawlPeerInfos map[p2p.ID]crawlPeerInfo
} }
func (r *PEXReactor) minReceiveRequestInterval() time.Duration { func (r *PEXReactor) minReceiveRequestInterval() time.Duration {
@ -90,6 +87,11 @@ type PEXReactorConfig struct {
// Seed/Crawler mode // Seed/Crawler mode
SeedMode bool SeedMode bool
// We want seeds to only advertise good peers. Therefore they should wait at
// least as long as we expect it to take for a peer to become good before
// disconnecting.
SeedDisconnectWaitPeriod time.Duration
// Seeds is a list of addresses reactor may use // Seeds is a list of addresses reactor may use
// if it can't connect to peers in the addrbook. // if it can't connect to peers in the addrbook.
Seeds []string Seeds []string
@ -108,6 +110,7 @@ func NewPEXReactor(b AddrBook, config *PEXReactorConfig) *PEXReactor {
ensurePeersPeriod: defaultEnsurePeersPeriod, ensurePeersPeriod: defaultEnsurePeersPeriod,
requestsSent: cmn.NewCMap(), requestsSent: cmn.NewCMap(),
lastReceivedRequests: cmn.NewCMap(), lastReceivedRequests: cmn.NewCMap(),
crawlPeerInfos: make(map[p2p.ID]crawlPeerInfo),
} }
r.BaseReactor = *p2p.NewBaseReactor("PEXReactor", r) r.BaseReactor = *p2p.NewBaseReactor("PEXReactor", r)
return r return r
@ -363,9 +366,9 @@ func (r *PEXReactor) ensurePeersRoutine() {
) )
// Randomize first round of communication to avoid thundering herd. // Randomize first round of communication to avoid thundering herd.
// If no potential peers are present directly start connecting so we guarantee // If no peers are present directly start connecting so we guarantee swift
// swift setup with the help of configured seeds. // setup with the help of configured seeds.
if r.hasPotentialPeers() { if r.nodeHasSomePeersOrDialingAny() {
time.Sleep(time.Duration(jitter)) time.Sleep(time.Duration(jitter))
} }
@ -493,23 +496,26 @@ func (r *PEXReactor) dialPeer(addr *p2p.NetAddress) {
err := r.Switch.DialPeerWithAddress(addr, false) err := r.Switch.DialPeerWithAddress(addr, false)
if err != nil { if err != nil {
if _, ok := err.(p2p.ErrCurrentlyDialingOrExistingAddress); ok {
return
}
r.Logger.Error("Dialing failed", "addr", addr, "err", err, "attempts", attempts) r.Logger.Error("Dialing failed", "addr", addr, "err", err, "attempts", attempts)
// TODO: detect more "bad peer" scenarios markAddrInBookBasedOnErr(addr, r.book, err)
if _, ok := err.(p2p.ErrSwitchAuthenticationFailure); ok { if _, ok := err.(p2p.ErrSwitchAuthenticationFailure); ok {
r.book.MarkBad(addr)
r.attemptsToDial.Delete(addr.DialString()) r.attemptsToDial.Delete(addr.DialString())
} else { } else {
r.book.MarkAttempt(addr)
// FIXME: if the addr is going to be removed from the addrbook (hard to // FIXME: if the addr is going to be removed from the addrbook (hard to
// tell at this point), we need to Delete it from attemptsToDial, not // tell at this point), we need to Delete it from attemptsToDial, not
// record another attempt. // record another attempt.
// record attempt // record attempt
r.attemptsToDial.Store(addr.DialString(), _attemptsToDial{attempts + 1, time.Now()}) r.attemptsToDial.Store(addr.DialString(), _attemptsToDial{attempts + 1, time.Now()})
} }
} else { return
// cleanup any history
r.attemptsToDial.Delete(addr.DialString())
} }
// cleanup any history
r.attemptsToDial.Delete(addr.DialString())
} }
// checkSeeds checks that addresses are well formed. // checkSeeds checks that addresses are well formed.
@ -568,101 +574,92 @@ func (r *PEXReactor) AttemptsToDial(addr *p2p.NetAddress) int {
// from peers, except other seed nodes. // from peers, except other seed nodes.
func (r *PEXReactor) crawlPeersRoutine() { func (r *PEXReactor) crawlPeersRoutine() {
// Do an initial crawl // Do an initial crawl
r.crawlPeers() r.crawlPeers(r.book.GetSelection())
// Fire periodically // Fire periodically
ticker := time.NewTicker(defaultCrawlPeersPeriod) ticker := time.NewTicker(crawlPeerPeriod)
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
r.attemptDisconnects() r.attemptDisconnects()
r.crawlPeers() r.crawlPeers(r.book.GetSelection())
r.cleanupCrawlPeerInfos()
case <-r.Quit(): case <-r.Quit():
return return
} }
} }
} }
// hasPotentialPeers indicates if there is a potential peer to connect to, by // nodeHasSomePeersOrDialingAny returns true if the node is connected to some
// consulting the Switch as well as the AddrBook. // peers or dialing them currently.
func (r *PEXReactor) hasPotentialPeers() bool { func (r *PEXReactor) nodeHasSomePeersOrDialingAny() bool {
out, in, dial := r.Switch.NumPeers() out, in, dial := r.Switch.NumPeers()
return out+in+dial > 0
return out+in+dial > 0 && len(r.book.ListOfKnownAddresses()) > 0
} }
// crawlPeerInfo handles temporary data needed for the // crawlPeerInfo handles temporary data needed for the network crawling
// network crawling performed during seed/crawler mode. // performed during seed/crawler mode.
type crawlPeerInfo struct { type crawlPeerInfo struct {
// The listening address of a potential peer we learned about Addr *p2p.NetAddress `json:"addr"`
Addr *p2p.NetAddress // The last time we crawled the peer or attempted to do so.
LastCrawled time.Time `json:"last_crawled"`
// The last time we attempt to reach this address
LastAttempt time.Time
// The last time we successfully reached this address
LastSuccess time.Time
} }
// oldestFirst implements sort.Interface for []crawlPeerInfo // crawlPeers will crawl the network looking for new peer addresses.
// based on the LastAttempt field. func (r *PEXReactor) crawlPeers(addrs []*p2p.NetAddress) {
type oldestFirst []crawlPeerInfo
func (of oldestFirst) Len() int { return len(of) }
func (of oldestFirst) Swap(i, j int) { of[i], of[j] = of[j], of[i] }
func (of oldestFirst) Less(i, j int) bool { return of[i].LastAttempt.Before(of[j].LastAttempt) }
// getPeersToCrawl returns addresses of potential peers that we wish to validate.
// NOTE: The status information is ordered as described above.
func (r *PEXReactor) getPeersToCrawl() []crawlPeerInfo {
// TODO: be more selective
addrs := r.book.ListOfKnownAddresses()
of := make(oldestFirst, 0, len(addrs))
for _, addr := range addrs {
if len(addr.ID()) == 0 {
continue // dont use peers without id
}
of = append(of, crawlPeerInfo{
Addr: addr.Addr,
LastAttempt: addr.LastAttempt,
LastSuccess: addr.LastSuccess,
})
}
sort.Sort(of)
return of
}
// crawlPeers will crawl the network looking for new peer addresses. (once)
func (r *PEXReactor) crawlPeers() {
peerInfos := r.getPeersToCrawl()
now := time.Now() now := time.Now()
// Use addresses we know of to reach additional peers
for _, pi := range peerInfos { for _, addr := range addrs {
// Do not attempt to connect with peers we recently dialed peerInfo, ok := r.crawlPeerInfos[addr.ID]
if now.Sub(pi.LastAttempt) < defaultCrawlPeerInterval {
// Do not attempt to connect with peers we recently crawled.
if ok && now.Sub(peerInfo.LastCrawled) < minTimeBetweenCrawls {
continue continue
} }
// Otherwise, attempt to connect with the known address
err := r.Switch.DialPeerWithAddress(pi.Addr, false) // Record crawling attempt.
r.crawlPeerInfos[addr.ID] = crawlPeerInfo{
Addr: addr,
LastCrawled: now,
}
err := r.Switch.DialPeerWithAddress(addr, false)
if err != nil { if err != nil {
r.book.MarkAttempt(pi.Addr) if _, ok := err.(p2p.ErrCurrentlyDialingOrExistingAddress); ok {
continue
}
r.Logger.Error("Dialing failed", "addr", addr, "err", err)
markAddrInBookBasedOnErr(addr, r.book, err)
continue continue
} }
// Ask for more addresses
peer := r.Switch.Peers().Get(pi.Addr.ID) peer := r.Switch.Peers().Get(addr.ID)
if peer != nil { if peer != nil {
r.RequestAddrs(peer) r.RequestAddrs(peer)
} }
} }
} }
func (r *PEXReactor) cleanupCrawlPeerInfos() {
for id, info := range r.crawlPeerInfos {
// If we did not crawl a peer for 24 hours, it means the peer was removed
// from the addrbook => remove
//
// 10000 addresses / maxGetSelection = 40 cycles to get all addresses in
// the ideal case,
// 40 * crawlPeerPeriod ~ 20 minutes
if time.Since(info.LastCrawled) > 24*time.Hour {
delete(r.crawlPeerInfos, id)
}
}
}
// attemptDisconnects checks if we've been with each peer long enough to disconnect // attemptDisconnects checks if we've been with each peer long enough to disconnect
func (r *PEXReactor) attemptDisconnects() { func (r *PEXReactor) attemptDisconnects() {
for _, peer := range r.Switch.Peers().List() { for _, peer := range r.Switch.Peers().List() {
if peer.Status().Duration < defaultSeedDisconnectWaitPeriod { if peer.Status().Duration < r.config.SeedDisconnectWaitPeriod {
continue continue
} }
if peer.IsPersistent() { if peer.IsPersistent() {
@ -672,6 +669,16 @@ func (r *PEXReactor) attemptDisconnects() {
} }
} }
func markAddrInBookBasedOnErr(addr *p2p.NetAddress, book AddrBook, err error) {
// TODO: detect more "bad peer" scenarios
switch err.(type) {
case p2p.ErrSwitchAuthenticationFailure:
book.MarkBad(addr)
default:
book.MarkAttempt(addr)
}
}
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
// Messages // Messages

View File

@ -204,26 +204,26 @@ func TestCheckSeeds(t *testing.T) {
defer os.RemoveAll(dir) // nolint: errcheck defer os.RemoveAll(dir) // nolint: errcheck
// 1. test creating peer with no seeds works // 1. test creating peer with no seeds works
peer := testCreateDefaultPeer(dir, 0) peerSwitch := testCreateDefaultPeer(dir, 0)
require.Nil(t, peer.Start()) require.Nil(t, peerSwitch.Start())
peer.Stop() peerSwitch.Stop()
// 2. create seed // 2. create seed
seed := testCreateSeed(dir, 1, []*p2p.NetAddress{}, []*p2p.NetAddress{}) seed := testCreateSeed(dir, 1, []*p2p.NetAddress{}, []*p2p.NetAddress{})
// 3. test create peer with online seed works // 3. test create peer with online seed works
peer = testCreatePeerWithSeed(dir, 2, seed) peerSwitch = testCreatePeerWithSeed(dir, 2, seed)
require.Nil(t, peer.Start()) require.Nil(t, peerSwitch.Start())
peer.Stop() peerSwitch.Stop()
// 4. test create peer with all seeds having unresolvable DNS fails // 4. test create peer with all seeds having unresolvable DNS fails
badPeerConfig := &PEXReactorConfig{ badPeerConfig := &PEXReactorConfig{
Seeds: []string{"ed3dfd27bfc4af18f67a49862f04cc100696e84d@bad.network.addr:26657", Seeds: []string{"ed3dfd27bfc4af18f67a49862f04cc100696e84d@bad.network.addr:26657",
"d824b13cb5d40fa1d8a614e089357c7eff31b670@anotherbad.network.addr:26657"}, "d824b13cb5d40fa1d8a614e089357c7eff31b670@anotherbad.network.addr:26657"},
} }
peer = testCreatePeerWithConfig(dir, 2, badPeerConfig) peerSwitch = testCreatePeerWithConfig(dir, 2, badPeerConfig)
require.Error(t, peer.Start()) require.Error(t, peerSwitch.Start())
peer.Stop() peerSwitch.Stop()
// 5. test create peer with one good seed address succeeds // 5. test create peer with one good seed address succeeds
badPeerConfig = &PEXReactorConfig{ badPeerConfig = &PEXReactorConfig{
@ -231,9 +231,9 @@ func TestCheckSeeds(t *testing.T) {
"d824b13cb5d40fa1d8a614e089357c7eff31b670@anotherbad.network.addr:26657", "d824b13cb5d40fa1d8a614e089357c7eff31b670@anotherbad.network.addr:26657",
seed.NetAddress().String()}, seed.NetAddress().String()},
} }
peer = testCreatePeerWithConfig(dir, 2, badPeerConfig) peerSwitch = testCreatePeerWithConfig(dir, 2, badPeerConfig)
require.Nil(t, peer.Start()) require.Nil(t, peerSwitch.Start())
peer.Stop() peerSwitch.Stop()
} }
func TestPEXReactorUsesSeedsIfNeeded(t *testing.T) { func TestPEXReactorUsesSeedsIfNeeded(t *testing.T) {
@ -285,31 +285,41 @@ func TestConnectionSpeedForPeerReceivedFromSeed(t *testing.T) {
assertPeersWithTimeout(t, []*p2p.Switch{secondPeer}, 10*time.Millisecond, 1*time.Second, 2) assertPeersWithTimeout(t, []*p2p.Switch{secondPeer}, 10*time.Millisecond, 1*time.Second, 2)
} }
func TestPEXReactorCrawlStatus(t *testing.T) { func TestPEXReactorSeedMode(t *testing.T) {
pexR, book := createReactor(&PEXReactorConfig{SeedMode: true}) // directory to store address books
dir, err := ioutil.TempDir("", "pex_reactor")
require.Nil(t, err)
defer os.RemoveAll(dir) // nolint: errcheck
pexR, book := createReactor(&PEXReactorConfig{SeedMode: true, SeedDisconnectWaitPeriod: 10 * time.Millisecond})
defer teardownReactor(book) defer teardownReactor(book)
// Seed/Crawler mode uses data from the Switch
sw := createSwitchAndAddReactors(pexR) sw := createSwitchAndAddReactors(pexR)
sw.SetAddrBook(book) sw.SetAddrBook(book)
err = sw.Start()
require.NoError(t, err)
defer sw.Stop()
// Create a peer, add it to the peer set and the addrbook. assert.Zero(t, sw.Peers().Size())
peer := p2p.CreateRandomPeer(false)
p2p.AddPeerToSwitch(pexR.Switch, peer)
addr1 := peer.SocketAddr()
pexR.book.AddAddress(addr1, addr1)
// Add a non-connected address to the book. peerSwitch := testCreateDefaultPeer(dir, 1)
_, addr2 := p2p.CreateRoutableAddr() require.NoError(t, peerSwitch.Start())
pexR.book.AddAddress(addr2, addr1) defer peerSwitch.Stop()
// Get some peerInfos to crawl // 1. Test crawlPeers dials the peer
peerInfos := pexR.getPeersToCrawl() pexR.crawlPeers([]*p2p.NetAddress{peerSwitch.NetAddress()})
assert.Equal(t, 1, sw.Peers().Size())
assert.True(t, sw.Peers().Has(peerSwitch.NodeInfo().ID()))
// Make sure it has the proper number of elements // 2. attemptDisconnects should not disconnect because of wait period
assert.Equal(t, 2, len(peerInfos)) pexR.attemptDisconnects()
assert.Equal(t, 1, sw.Peers().Size())
// TODO: test time.Sleep(100 * time.Millisecond)
// 3. attemptDisconnects should disconnect after wait period
pexR.attemptDisconnects()
assert.Equal(t, 0, sw.Peers().Size())
} }
// connect a peer to a seed, wait a bit, then stop it. // connect a peer to a seed, wait a bit, then stop it.

View File

@ -339,14 +339,11 @@ func (sw *Switch) reconnectToPeer(addr *NetAddress) {
return return
} }
if sw.IsDialingOrExistingAddress(addr) {
sw.Logger.Debug("Peer connection has been established or dialed while we waiting next try", "addr", addr)
return
}
err := sw.DialPeerWithAddress(addr, true) err := sw.DialPeerWithAddress(addr, true)
if err == nil { if err == nil {
return // success return // success
} else if _, ok := err.(ErrCurrentlyDialingOrExistingAddress); ok {
return
} }
sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err, "addr", addr) sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err, "addr", addr)
@ -365,9 +362,12 @@ func (sw *Switch) reconnectToPeer(addr *NetAddress) {
// sleep an exponentially increasing amount // sleep an exponentially increasing amount
sleepIntervalSeconds := math.Pow(reconnectBackOffBaseSeconds, float64(i)) sleepIntervalSeconds := math.Pow(reconnectBackOffBaseSeconds, float64(i))
sw.randomSleep(time.Duration(sleepIntervalSeconds) * time.Second) sw.randomSleep(time.Duration(sleepIntervalSeconds) * time.Second)
err := sw.DialPeerWithAddress(addr, true) err := sw.DialPeerWithAddress(addr, true)
if err == nil { if err == nil {
return // success return // success
} else if _, ok := err.(ErrCurrentlyDialingOrExistingAddress); ok {
return
} }
sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err, "addr", addr) sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err, "addr", addr)
} }
@ -435,15 +435,10 @@ func (sw *Switch) DialPeersAsync(addrBook AddrBook, peers []string, persistent b
sw.randomSleep(0) sw.randomSleep(0)
if sw.IsDialingOrExistingAddress(addr) {
sw.Logger.Debug("Ignore attempt to connect to an existing peer", "addr", addr)
return
}
err := sw.DialPeerWithAddress(addr, persistent) err := sw.DialPeerWithAddress(addr, persistent)
if err != nil { if err != nil {
switch err.(type) { switch err.(type) {
case ErrSwitchConnectToSelf, ErrSwitchDuplicatePeerID: case ErrSwitchConnectToSelf, ErrSwitchDuplicatePeerID, ErrCurrentlyDialingOrExistingAddress:
sw.Logger.Debug("Error dialing peer", "err", err) sw.Logger.Debug("Error dialing peer", "err", err)
default: default:
sw.Logger.Error("Error dialing peer", "err", err) sw.Logger.Error("Error dialing peer", "err", err)
@ -454,11 +449,20 @@ func (sw *Switch) DialPeersAsync(addrBook AddrBook, peers []string, persistent b
return nil return nil
} }
// DialPeerWithAddress dials the given peer and runs sw.addPeer if it connects and authenticates successfully. // DialPeerWithAddress dials the given peer and runs sw.addPeer if it connects
// If `persistent == true`, the switch will always try to reconnect to this peer if the connection ever fails. // and authenticates successfully.
// If `persistent == true`, the switch will always try to reconnect to this
// peer if the connection ever fails.
// If we're currently dialing this address or it belongs to an existing peer,
// ErrCurrentlyDialingOrExistingAddress is returned.
func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) error { func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) error {
if sw.IsDialingOrExistingAddress(addr) {
return ErrCurrentlyDialingOrExistingAddress{addr.String()}
}
sw.dialing.Set(string(addr.ID), addr) sw.dialing.Set(string(addr.ID), addr)
defer sw.dialing.Delete(string(addr.ID)) defer sw.dialing.Delete(string(addr.ID))
return sw.addOutboundPeerWithConfig(addr, sw.config, persistent) return sw.addOutboundPeerWithConfig(addr, sw.config, persistent)
} }