Merge pull request #1233 from tendermint/feature/xla-dial-seed-without-timeout

p2p: if we have no peers we should dial seeds right away
This commit is contained in:
Ethan Buchman 2018-03-02 09:07:01 -05:00 committed by GitHub
commit 8bceb5ce36
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 124 additions and 66 deletions

View File

@ -260,9 +260,17 @@ func (r *PEXReactor) SetEnsurePeersPeriod(d time.Duration) {
// Ensures that sufficient peers are connected. (continuous) // Ensures that sufficient peers are connected. (continuous)
func (r *PEXReactor) ensurePeersRoutine() { func (r *PEXReactor) ensurePeersRoutine() {
// Randomize when routine starts var (
ensurePeersPeriodMs := r.ensurePeersPeriod.Nanoseconds() / 1e6 seed = rand.New(rand.NewSource(time.Now().UnixNano()))
time.Sleep(time.Duration(rand.Int63n(ensurePeersPeriodMs)) * time.Millisecond) jitter = seed.Int63n(r.ensurePeersPeriod.Nanoseconds())
)
// Randomize first round of communication to avoid thundering herd.
// If no potential peers are present directly start connecting so we guarantee
// swift setup with the help of configured seeds.
if r.hasPotentialPeers() {
time.Sleep(time.Duration(jitter))
}
// fire once immediately. // fire once immediately.
// ensures we dial the seeds right away if the book is empty // ensures we dial the seeds right away if the book is empty
@ -287,9 +295,18 @@ func (r *PEXReactor) ensurePeersRoutine() {
// the node operator. It should not be used to compute what addresses are // the node operator. It should not be used to compute what addresses are
// already connected or not. // already connected or not.
func (r *PEXReactor) ensurePeers() { func (r *PEXReactor) ensurePeers() {
numOutPeers, numInPeers, numDialing := r.Switch.NumPeers() var (
numToDial := defaultMinNumOutboundPeers - (numOutPeers + numDialing) out, in, dial = r.Switch.NumPeers()
r.Logger.Info("Ensure peers", "numOutPeers", numOutPeers, "numDialing", numDialing, "numToDial", numToDial) numToDial = defaultMinNumOutboundPeers - (out + dial)
)
r.Logger.Info(
"Ensure peers",
"numOutPeers", out,
"numInPeers", in,
"numDialing", dial,
"numToDial", numToDial,
)
if numToDial <= 0 { if numToDial <= 0 {
return return
} }
@ -297,11 +314,12 @@ func (r *PEXReactor) ensurePeers() {
// bias to prefer more vetted peers when we have fewer connections. // bias to prefer more vetted peers when we have fewer connections.
// not perfect, but somewhate ensures that we prioritize connecting to more-vetted // not perfect, but somewhate ensures that we prioritize connecting to more-vetted
// NOTE: range here is [10, 90]. Too high ? // NOTE: range here is [10, 90]. Too high ?
newBias := cmn.MinInt(numOutPeers, 8)*10 + 10 newBias := cmn.MinInt(out, 8)*10 + 10
toDial := make(map[p2p.ID]*p2p.NetAddress) toDial := make(map[p2p.ID]*p2p.NetAddress)
// Try maxAttempts times to pick numToDial addresses to dial // Try maxAttempts times to pick numToDial addresses to dial
maxAttempts := numToDial * 3 maxAttempts := numToDial * 3
for i := 0; i < maxAttempts && len(toDial) < numToDial; i++ { for i := 0; i < maxAttempts && len(toDial) < numToDial; i++ {
try := r.book.PickAddress(newBias) try := r.book.PickAddress(newBias)
if try == nil { if try == nil {
@ -348,7 +366,7 @@ func (r *PEXReactor) ensurePeers() {
} }
// If we are not connected to nor dialing anybody, fallback to dialing a seed. // If we are not connected to nor dialing anybody, fallback to dialing a seed.
if numOutPeers+numInPeers+numDialing+len(toDial) == 0 { if out+in+dial+len(toDial) == 0 {
r.Logger.Info("No addresses to dial nor connected peers. Falling back to seeds") r.Logger.Info("No addresses to dial nor connected peers. Falling back to seeds")
r.dialSeeds() r.dialSeeds()
} }
@ -414,6 +432,14 @@ func (r *PEXReactor) crawlPeersRoutine() {
} }
} }
// hasPotentialPeers reports whether the reactor currently has somewhere to
// connect to. It returns true only when both conditions hold:
//   - the Switch reports at least one peer (outbound, inbound or dialing), and
//   - the AddrBook lists at least one known address.
//
// The AddrBook is only consulted when the Switch reports a non-zero peer count.
func (r *PEXReactor) hasPotentialPeers() bool {
	outbound, inbound, dialing := r.Switch.NumPeers()
	if outbound+inbound+dialing <= 0 {
		return false
	}

	return len(r.book.ListOfKnownAddresses()) > 0
}
// crawlPeerInfo handles temporary data needed for the // crawlPeerInfo handles temporary data needed for the
// network crawling performed during seed/crawler mode. // network crawling performed during seed/crawler mode.
type crawlPeerInfo struct { type crawlPeerInfo struct {

View File

@ -119,39 +119,6 @@ func TestPEXReactorRunning(t *testing.T) {
} }
} }
// assertPeersWithTimeout polls switches every checkPeriod until each of them
// reports at least nPeers connected peers, counting outbound plus inbound (the
// third value returned by NumPeers is ignored). If that has not happened once
// timeout has elapsed, the test is marked as failed with a per-switch summary
// of the peer counts and the function returns.
func assertPeersWithTimeout(t *testing.T, switches []*p2p.Switch, checkPeriod, timeout time.Duration, nPeers int) {
	// Stop the ticker on every return path so it does not outlive the helper.
	ticker := time.NewTicker(checkPeriod)
	defer ticker.Stop()

	// One timer bounds the whole wait. This avoids allocating a new time.After
	// timer per loop iteration and keeps the deadline independent of how many
	// ticks were actually received.
	deadline := time.NewTimer(timeout)
	defer deadline.Stop()

	for {
		select {
		case <-ticker.C:
			// check peers are connected
			allGood := true
			for _, s := range switches {
				outbound, inbound, _ := s.NumPeers()
				if outbound+inbound < nPeers {
					allGood = false
				}
			}
			if allGood {
				return
			}
		case <-deadline.C:
			// Timed out: report what every switch currently sees.
			numPeersStr := ""
			for i, s := range switches {
				outbound, inbound, _ := s.NumPeers()
				numPeersStr += fmt.Sprintf("%d => {outbound: %d, inbound: %d}, ", i, outbound, inbound)
			}
			t.Errorf("expected all switches to be connected to at least one peer (switches: %s)", numPeersStr)
			return
		}
	}
}
func TestPEXReactorReceive(t *testing.T) { func TestPEXReactorReceive(t *testing.T) {
assert, require := assert.New(t), require.New(t) assert, require := assert.New(t), require.New(t)
@ -259,6 +226,7 @@ func TestPEXReactorAddrsMessageAbuse(t *testing.T) {
} }
func TestPEXReactorUsesSeedsIfNeeded(t *testing.T) { func TestPEXReactorUsesSeedsIfNeeded(t *testing.T) {
dir, err := ioutil.TempDir("", "pex_reactor") dir, err := ioutil.TempDir("", "pex_reactor")
require.Nil(t, err) require.Nil(t, err)
defer os.RemoveAll(dir) // nolint: errcheck defer os.RemoveAll(dir) // nolint: errcheck
@ -267,36 +235,56 @@ func TestPEXReactorUsesSeedsIfNeeded(t *testing.T) {
book.SetLogger(log.TestingLogger()) book.SetLogger(log.TestingLogger())
// 1. create seed // 1. create seed
seed := p2p.MakeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { seed := p2p.MakeSwitch(
sw.SetLogger(log.TestingLogger()) config,
0,
"127.0.0.1",
"123.123.123",
func(i int, sw *p2p.Switch) *p2p.Switch {
sw.SetLogger(log.TestingLogger())
r := NewPEXReactor(book, &PEXReactorConfig{}) r := NewPEXReactor(book, &PEXReactorConfig{})
r.SetLogger(log.TestingLogger()) r.SetLogger(log.TestingLogger())
r.SetEnsurePeersPeriod(250 * time.Millisecond) sw.AddReactor("pex", r)
sw.AddReactor("pex", r) return sw
return sw },
}) )
seed.AddListener(p2p.NewDefaultListener("tcp", seed.NodeInfo().ListenAddr, true, log.TestingLogger())) seed.AddListener(
err = seed.Start() p2p.NewDefaultListener(
require.Nil(t, err) "tcp",
seed.NodeInfo().ListenAddr,
true,
log.TestingLogger(),
),
)
require.Nil(t, seed.Start())
defer seed.Stop() defer seed.Stop()
// 2. create usual peer // 2. create usual peer with only seed configured.
sw := p2p.MakeSwitch(config, 1, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { peer := p2p.MakeSwitch(
sw.SetLogger(log.TestingLogger()) config,
1,
"127.0.0.1",
"123.123.123",
func(i int, sw *p2p.Switch) *p2p.Switch {
sw.SetLogger(log.TestingLogger())
r := NewPEXReactor(book, &PEXReactorConfig{Seeds: []string{seed.NodeInfo().NetAddress().String()}}) r := NewPEXReactor(
r.SetLogger(log.TestingLogger()) book,
r.SetEnsurePeersPeriod(250 * time.Millisecond) &PEXReactorConfig{
sw.AddReactor("pex", r) Seeds: []string{seed.NodeInfo().NetAddress().String()},
return sw },
}) )
err = sw.Start() r.SetLogger(log.TestingLogger())
require.Nil(t, err) sw.AddReactor("pex", r)
defer sw.Stop() return sw
},
)
require.Nil(t, peer.Start())
defer peer.Stop()
// 3. check that peer at least connects to seed // 3. check that the peer connects to seed immediately
assertPeersWithTimeout(t, []*p2p.Switch{sw}, 10*time.Millisecond, 10*time.Second, 1) assertPeersWithTimeout(t, []*p2p.Switch{peer}, 10*time.Millisecond, 1*time.Second, 1)
} }
func TestPEXReactorCrawlStatus(t *testing.T) { func TestPEXReactorCrawlStatus(t *testing.T) {
@ -368,3 +356,47 @@ func (mp mockPeer) Send(byte, interface{}) bool { return false }
func (mp mockPeer) TrySend(byte, interface{}) bool { return false } func (mp mockPeer) TrySend(byte, interface{}) bool { return false }
func (mp mockPeer) Set(string, interface{}) {} func (mp mockPeer) Set(string, interface{}) {}
func (mp mockPeer) Get(string) interface{} { return nil } func (mp mockPeer) Get(string) interface{} { return nil }
// assertPeersWithTimeout polls switches every checkPeriod until each of them
// reports at least nPeers connected peers, counting outbound plus inbound (the
// third value returned by NumPeers is ignored). If that has not happened once
// timeout has elapsed, the test is marked as failed with a per-switch summary
// of the peer counts and the function returns.
func assertPeersWithTimeout(
	t *testing.T,
	switches []*p2p.Switch,
	checkPeriod, timeout time.Duration,
	nPeers int,
) {
	// Stop the ticker on every return path so it does not outlive the helper.
	ticker := time.NewTicker(checkPeriod)
	defer ticker.Stop()

	// One timer bounds the whole wait. This avoids allocating a new time.After
	// timer per loop iteration and keeps the deadline independent of how many
	// ticks were actually received.
	deadline := time.NewTimer(timeout)
	defer deadline.Stop()

	for {
		select {
		case <-ticker.C:
			// check peers are connected
			allGood := true
			for _, s := range switches {
				outbound, inbound, _ := s.NumPeers()
				if outbound+inbound < nPeers {
					allGood = false
				}
			}
			if allGood {
				return
			}
		case <-deadline.C:
			// Timed out: report what every switch currently sees.
			numPeersStr := ""
			for i, s := range switches {
				outbound, inbound, _ := s.NumPeers()
				numPeersStr += fmt.Sprintf("%d => {outbound: %d, inbound: %d}, ", i, outbound, inbound)
			}
			t.Errorf(
				"expected all switches to be connected to at least one peer (switches: %s)",
				numPeersStr,
			)
			return
		}
	}
}