mirror of
https://github.com/fluencelabs/tendermint
synced 2025-05-03 02:22:14 +00:00
Merge pull request #1292 from tendermint/1125-exp-backoff-for-ensure-peers
exponential backoff for addrs in the address book
This commit is contained in:
commit
f3000d0c84
@ -139,6 +139,10 @@ func (a *addrBook) Wait() {
|
|||||||
a.wg.Wait()
|
a.wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *addrBook) FilePath() string {
|
||||||
|
return a.filePath
|
||||||
|
}
|
||||||
|
|
||||||
//-------------------------------------------------------
|
//-------------------------------------------------------
|
||||||
|
|
||||||
// AddOurAddress one of our addresses.
|
// AddOurAddress one of our addresses.
|
||||||
|
@ -5,6 +5,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
@ -26,17 +27,24 @@ func createTempFileName(prefix string) string {
|
|||||||
return fname
|
return fname
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func deleteTempFile(fname string) {
|
||||||
|
err := os.Remove(fname)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestAddrBookPickAddress(t *testing.T) {
|
func TestAddrBookPickAddress(t *testing.T) {
|
||||||
assert := assert.New(t)
|
|
||||||
fname := createTempFileName("addrbook_test")
|
fname := createTempFileName("addrbook_test")
|
||||||
|
defer deleteTempFile(fname)
|
||||||
|
|
||||||
// 0 addresses
|
// 0 addresses
|
||||||
book := NewAddrBook(fname, true)
|
book := NewAddrBook(fname, true)
|
||||||
book.SetLogger(log.TestingLogger())
|
book.SetLogger(log.TestingLogger())
|
||||||
assert.Zero(book.Size())
|
assert.Zero(t, book.Size())
|
||||||
|
|
||||||
addr := book.PickAddress(50)
|
addr := book.PickAddress(50)
|
||||||
assert.Nil(addr, "expected no address")
|
assert.Nil(t, addr, "expected no address")
|
||||||
|
|
||||||
randAddrs := randNetAddressPairs(t, 1)
|
randAddrs := randNetAddressPairs(t, 1)
|
||||||
addrSrc := randAddrs[0]
|
addrSrc := randAddrs[0]
|
||||||
@ -44,26 +52,27 @@ func TestAddrBookPickAddress(t *testing.T) {
|
|||||||
|
|
||||||
// pick an address when we only have new address
|
// pick an address when we only have new address
|
||||||
addr = book.PickAddress(0)
|
addr = book.PickAddress(0)
|
||||||
assert.NotNil(addr, "expected an address")
|
assert.NotNil(t, addr, "expected an address")
|
||||||
addr = book.PickAddress(50)
|
addr = book.PickAddress(50)
|
||||||
assert.NotNil(addr, "expected an address")
|
assert.NotNil(t, addr, "expected an address")
|
||||||
addr = book.PickAddress(100)
|
addr = book.PickAddress(100)
|
||||||
assert.NotNil(addr, "expected an address")
|
assert.NotNil(t, addr, "expected an address")
|
||||||
|
|
||||||
// pick an address when we only have old address
|
// pick an address when we only have old address
|
||||||
book.MarkGood(addrSrc.addr)
|
book.MarkGood(addrSrc.addr)
|
||||||
addr = book.PickAddress(0)
|
addr = book.PickAddress(0)
|
||||||
assert.NotNil(addr, "expected an address")
|
assert.NotNil(t, addr, "expected an address")
|
||||||
addr = book.PickAddress(50)
|
addr = book.PickAddress(50)
|
||||||
assert.NotNil(addr, "expected an address")
|
assert.NotNil(t, addr, "expected an address")
|
||||||
|
|
||||||
// in this case, nNew==0 but we biased 100% to new, so we return nil
|
// in this case, nNew==0 but we biased 100% to new, so we return nil
|
||||||
addr = book.PickAddress(100)
|
addr = book.PickAddress(100)
|
||||||
assert.Nil(addr, "did not expected an address")
|
assert.Nil(t, addr, "did not expected an address")
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestAddrBookSaveLoad(t *testing.T) {
|
func TestAddrBookSaveLoad(t *testing.T) {
|
||||||
fname := createTempFileName("addrbook_test")
|
fname := createTempFileName("addrbook_test")
|
||||||
|
defer deleteTempFile(fname)
|
||||||
|
|
||||||
// 0 addresses
|
// 0 addresses
|
||||||
book := NewAddrBook(fname, true)
|
book := NewAddrBook(fname, true)
|
||||||
@ -95,6 +104,7 @@ func TestAddrBookSaveLoad(t *testing.T) {
|
|||||||
|
|
||||||
func TestAddrBookLookup(t *testing.T) {
|
func TestAddrBookLookup(t *testing.T) {
|
||||||
fname := createTempFileName("addrbook_test")
|
fname := createTempFileName("addrbook_test")
|
||||||
|
defer deleteTempFile(fname)
|
||||||
|
|
||||||
randAddrs := randNetAddressPairs(t, 100)
|
randAddrs := randNetAddressPairs(t, 100)
|
||||||
|
|
||||||
@ -115,8 +125,8 @@ func TestAddrBookLookup(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestAddrBookPromoteToOld(t *testing.T) {
|
func TestAddrBookPromoteToOld(t *testing.T) {
|
||||||
assert := assert.New(t)
|
|
||||||
fname := createTempFileName("addrbook_test")
|
fname := createTempFileName("addrbook_test")
|
||||||
|
defer deleteTempFile(fname)
|
||||||
|
|
||||||
randAddrs := randNetAddressPairs(t, 100)
|
randAddrs := randNetAddressPairs(t, 100)
|
||||||
|
|
||||||
@ -147,11 +157,12 @@ func TestAddrBookPromoteToOld(t *testing.T) {
|
|||||||
t.Errorf("selection could not be bigger than the book")
|
t.Errorf("selection could not be bigger than the book")
|
||||||
}
|
}
|
||||||
|
|
||||||
assert.Equal(book.Size(), 100, "expecting book size to be 100")
|
assert.Equal(t, book.Size(), 100, "expecting book size to be 100")
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestAddrBookHandlesDuplicates(t *testing.T) {
|
func TestAddrBookHandlesDuplicates(t *testing.T) {
|
||||||
fname := createTempFileName("addrbook_test")
|
fname := createTempFileName("addrbook_test")
|
||||||
|
defer deleteTempFile(fname)
|
||||||
|
|
||||||
book := NewAddrBook(fname, true)
|
book := NewAddrBook(fname, true)
|
||||||
book.SetLogger(log.TestingLogger())
|
book.SetLogger(log.TestingLogger())
|
||||||
@ -202,6 +213,8 @@ func randIPv4Address(t *testing.T) *p2p.NetAddress {
|
|||||||
|
|
||||||
func TestAddrBookRemoveAddress(t *testing.T) {
|
func TestAddrBookRemoveAddress(t *testing.T) {
|
||||||
fname := createTempFileName("addrbook_test")
|
fname := createTempFileName("addrbook_test")
|
||||||
|
defer deleteTempFile(fname)
|
||||||
|
|
||||||
book := NewAddrBook(fname, true)
|
book := NewAddrBook(fname, true)
|
||||||
book.SetLogger(log.TestingLogger())
|
book.SetLogger(log.TestingLogger())
|
||||||
|
|
||||||
|
@ -6,6 +6,7 @@ import (
|
|||||||
"math/rand"
|
"math/rand"
|
||||||
"reflect"
|
"reflect"
|
||||||
"sort"
|
"sort"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
@ -38,6 +39,8 @@ const (
|
|||||||
defaultSeedDisconnectWaitPeriod = 2 * time.Minute // disconnect after this
|
defaultSeedDisconnectWaitPeriod = 2 * time.Minute // disconnect after this
|
||||||
defaultCrawlPeerInterval = 2 * time.Minute // dont redial for this. TODO: back-off
|
defaultCrawlPeerInterval = 2 * time.Minute // dont redial for this. TODO: back-off
|
||||||
defaultCrawlPeersPeriod = 30 * time.Second // check some peers every this
|
defaultCrawlPeersPeriod = 30 * time.Second // check some peers every this
|
||||||
|
|
||||||
|
maxAttemptsToDial = 16 // ~ 35h in total (last attempt - 18h)
|
||||||
)
|
)
|
||||||
|
|
||||||
// PEXReactor handles PEX (peer exchange) and ensures that an
|
// PEXReactor handles PEX (peer exchange) and ensures that an
|
||||||
@ -59,6 +62,8 @@ type PEXReactor struct {
|
|||||||
// maps to prevent abuse
|
// maps to prevent abuse
|
||||||
requestsSent *cmn.CMap // ID->struct{}: unanswered send requests
|
requestsSent *cmn.CMap // ID->struct{}: unanswered send requests
|
||||||
lastReceivedRequests *cmn.CMap // ID->time.Time: last time peer requested from us
|
lastReceivedRequests *cmn.CMap // ID->time.Time: last time peer requested from us
|
||||||
|
|
||||||
|
attemptsToDial sync.Map // address (string) -> {number of attempts (int), last time dialed (time.Time)}
|
||||||
}
|
}
|
||||||
|
|
||||||
// PEXReactorConfig holds reactor specific configuration data.
|
// PEXReactorConfig holds reactor specific configuration data.
|
||||||
@ -71,6 +76,11 @@ type PEXReactorConfig struct {
|
|||||||
Seeds []string
|
Seeds []string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type _attemptsToDial struct {
|
||||||
|
number int
|
||||||
|
lastDialed time.Time
|
||||||
|
}
|
||||||
|
|
||||||
// NewPEXReactor creates new PEX reactor.
|
// NewPEXReactor creates new PEX reactor.
|
||||||
func NewPEXReactor(b AddrBook, config *PEXReactorConfig) *PEXReactor {
|
func NewPEXReactor(b AddrBook, config *PEXReactorConfig) *PEXReactor {
|
||||||
r := &PEXReactor{
|
r := &PEXReactor{
|
||||||
@ -339,19 +349,8 @@ func (r *PEXReactor) ensurePeers() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Dial picked addresses
|
// Dial picked addresses
|
||||||
for _, item := range toDial {
|
for _, addr := range toDial {
|
||||||
go func(picked *p2p.NetAddress) {
|
go r.dialPeer(addr)
|
||||||
err := r.Switch.DialPeerWithAddress(picked, false)
|
|
||||||
if err != nil {
|
|
||||||
r.Logger.Error("Dialing failed", "err", err)
|
|
||||||
// TODO: detect more "bad peer" scenarios
|
|
||||||
if _, ok := err.(p2p.ErrSwitchAuthenticationFailure); ok {
|
|
||||||
r.book.MarkBad(picked)
|
|
||||||
} else {
|
|
||||||
r.book.MarkAttempt(picked)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}(item)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// If we need more addresses, pick a random peer and ask for more.
|
// If we need more addresses, pick a random peer and ask for more.
|
||||||
@ -372,6 +371,48 @@ func (r *PEXReactor) ensurePeers() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *PEXReactor) dialPeer(addr *p2p.NetAddress) {
|
||||||
|
var attempts int
|
||||||
|
var lastDialed time.Time
|
||||||
|
if lAttempts, attempted := r.attemptsToDial.Load(addr.DialString()); attempted {
|
||||||
|
attempts = lAttempts.(_attemptsToDial).number
|
||||||
|
lastDialed = lAttempts.(_attemptsToDial).lastDialed
|
||||||
|
}
|
||||||
|
|
||||||
|
if attempts > maxAttemptsToDial {
|
||||||
|
r.Logger.Error("Reached max attempts to dial", "addr", addr, "attempts", attempts)
|
||||||
|
r.book.MarkBad(addr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// exponential backoff if it's not our first attempt to dial given address
|
||||||
|
if attempts > 0 {
|
||||||
|
jitterSeconds := time.Duration(rand.Float64() * float64(time.Second)) // 1s == (1e9 ns)
|
||||||
|
backoffDuration := jitterSeconds + ((1 << uint(attempts)) * time.Second)
|
||||||
|
sinceLastDialed := time.Since(lastDialed)
|
||||||
|
if sinceLastDialed < backoffDuration {
|
||||||
|
r.Logger.Debug("Too early to dial", "addr", addr, "backoff_duration", backoffDuration, "last_dialed", lastDialed, "time_since", sinceLastDialed)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
err := r.Switch.DialPeerWithAddress(addr, false)
|
||||||
|
if err != nil {
|
||||||
|
r.Logger.Error("Dialing failed", "addr", addr, "err", err, "attempts", attempts)
|
||||||
|
// TODO: detect more "bad peer" scenarios
|
||||||
|
if _, ok := err.(p2p.ErrSwitchAuthenticationFailure); ok {
|
||||||
|
r.book.MarkBad(addr)
|
||||||
|
} else {
|
||||||
|
r.book.MarkAttempt(addr)
|
||||||
|
}
|
||||||
|
// record attempt
|
||||||
|
r.attemptsToDial.Store(addr.DialString(), _attemptsToDial{attempts + 1, time.Now()})
|
||||||
|
} else {
|
||||||
|
// cleanup any history
|
||||||
|
r.attemptsToDial.Delete(addr.DialString())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// check seed addresses are well formed
|
// check seed addresses are well formed
|
||||||
func (r *PEXReactor) checkSeeds() error {
|
func (r *PEXReactor) checkSeeds() error {
|
||||||
lSeeds := len(r.config.Seeds)
|
lSeeds := len(r.config.Seeds)
|
||||||
@ -409,6 +450,17 @@ func (r *PEXReactor) dialSeeds() {
|
|||||||
r.Switch.Logger.Error("Couldn't connect to any seeds")
|
r.Switch.Logger.Error("Couldn't connect to any seeds")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AttemptsToDial returns the number of attempts to dial specific address. It
|
||||||
|
// returns 0 if never attempted or successfully connected.
|
||||||
|
func (r *PEXReactor) AttemptsToDial(addr *p2p.NetAddress) int {
|
||||||
|
lAttempts, attempted := r.attemptsToDial.Load(addr.DialString())
|
||||||
|
if attempted {
|
||||||
|
return lAttempts.(_attemptsToDial).number
|
||||||
|
} else {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
//----------------------------------------------------------
|
//----------------------------------------------------------
|
||||||
|
|
||||||
// Explores the network searching for more peers. (continuous)
|
// Explores the network searching for more peers. (continuous)
|
||||||
|
@ -4,6 +4,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
|
"path/filepath"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -12,12 +13,11 @@ import (
|
|||||||
|
|
||||||
crypto "github.com/tendermint/go-crypto"
|
crypto "github.com/tendermint/go-crypto"
|
||||||
wire "github.com/tendermint/go-wire"
|
wire "github.com/tendermint/go-wire"
|
||||||
cmn "github.com/tendermint/tmlibs/common"
|
|
||||||
"github.com/tendermint/tmlibs/log"
|
|
||||||
|
|
||||||
cfg "github.com/tendermint/tendermint/config"
|
cfg "github.com/tendermint/tendermint/config"
|
||||||
"github.com/tendermint/tendermint/p2p"
|
"github.com/tendermint/tendermint/p2p"
|
||||||
"github.com/tendermint/tendermint/p2p/conn"
|
"github.com/tendermint/tendermint/p2p/conn"
|
||||||
|
cmn "github.com/tendermint/tmlibs/common"
|
||||||
|
"github.com/tendermint/tmlibs/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -30,49 +30,33 @@ func init() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestPEXReactorBasic(t *testing.T) {
|
func TestPEXReactorBasic(t *testing.T) {
|
||||||
assert, require := assert.New(t), require.New(t)
|
r, book := createReactor(&PEXReactorConfig{})
|
||||||
|
defer teardownReactor(book)
|
||||||
|
|
||||||
dir, err := ioutil.TempDir("", "pex_reactor")
|
assert.NotNil(t, r)
|
||||||
require.Nil(err)
|
assert.NotEmpty(t, r.GetChannels())
|
||||||
defer os.RemoveAll(dir) // nolint: errcheck
|
|
||||||
book := NewAddrBook(dir+"addrbook.json", true)
|
|
||||||
book.SetLogger(log.TestingLogger())
|
|
||||||
|
|
||||||
r := NewPEXReactor(book, &PEXReactorConfig{})
|
|
||||||
r.SetLogger(log.TestingLogger())
|
|
||||||
|
|
||||||
assert.NotNil(r)
|
|
||||||
assert.NotEmpty(r.GetChannels())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestPEXReactorAddRemovePeer(t *testing.T) {
|
func TestPEXReactorAddRemovePeer(t *testing.T) {
|
||||||
assert, require := assert.New(t), require.New(t)
|
r, book := createReactor(&PEXReactorConfig{})
|
||||||
|
defer teardownReactor(book)
|
||||||
dir, err := ioutil.TempDir("", "pex_reactor")
|
|
||||||
require.Nil(err)
|
|
||||||
defer os.RemoveAll(dir) // nolint: errcheck
|
|
||||||
book := NewAddrBook(dir+"addrbook.json", true)
|
|
||||||
book.SetLogger(log.TestingLogger())
|
|
||||||
|
|
||||||
r := NewPEXReactor(book, &PEXReactorConfig{})
|
|
||||||
r.SetLogger(log.TestingLogger())
|
|
||||||
|
|
||||||
size := book.Size()
|
size := book.Size()
|
||||||
peer := p2p.CreateRandomPeer(false)
|
peer := p2p.CreateRandomPeer(false)
|
||||||
|
|
||||||
r.AddPeer(peer)
|
r.AddPeer(peer)
|
||||||
assert.Equal(size+1, book.Size())
|
assert.Equal(t, size+1, book.Size())
|
||||||
|
|
||||||
r.RemovePeer(peer, "peer not available")
|
r.RemovePeer(peer, "peer not available")
|
||||||
assert.Equal(size+1, book.Size())
|
assert.Equal(t, size+1, book.Size())
|
||||||
|
|
||||||
outboundPeer := p2p.CreateRandomPeer(true)
|
outboundPeer := p2p.CreateRandomPeer(true)
|
||||||
|
|
||||||
r.AddPeer(outboundPeer)
|
r.AddPeer(outboundPeer)
|
||||||
assert.Equal(size+1, book.Size(), "outbound peers should not be added to the address book")
|
assert.Equal(t, size+1, book.Size(), "outbound peers should not be added to the address book")
|
||||||
|
|
||||||
r.RemovePeer(outboundPeer, "peer not available")
|
r.RemovePeer(outboundPeer, "peer not available")
|
||||||
assert.Equal(size+1, book.Size())
|
assert.Equal(t, size+1, book.Size())
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestPEXReactorRunning(t *testing.T) {
|
func TestPEXReactorRunning(t *testing.T) {
|
||||||
@ -82,7 +66,7 @@ func TestPEXReactorRunning(t *testing.T) {
|
|||||||
dir, err := ioutil.TempDir("", "pex_reactor")
|
dir, err := ioutil.TempDir("", "pex_reactor")
|
||||||
require.Nil(t, err)
|
require.Nil(t, err)
|
||||||
defer os.RemoveAll(dir) // nolint: errcheck
|
defer os.RemoveAll(dir) // nolint: errcheck
|
||||||
book := NewAddrBook(dir+"addrbook.json", false)
|
book := NewAddrBook(filepath.Join(dir, "addrbook.json"), false)
|
||||||
book.SetLogger(log.TestingLogger())
|
book.SetLogger(log.TestingLogger())
|
||||||
|
|
||||||
// create switches
|
// create switches
|
||||||
@ -120,16 +104,8 @@ func TestPEXReactorRunning(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestPEXReactorReceive(t *testing.T) {
|
func TestPEXReactorReceive(t *testing.T) {
|
||||||
assert, require := assert.New(t), require.New(t)
|
r, book := createReactor(&PEXReactorConfig{})
|
||||||
|
defer teardownReactor(book)
|
||||||
dir, err := ioutil.TempDir("", "pex_reactor")
|
|
||||||
require.Nil(err)
|
|
||||||
defer os.RemoveAll(dir) // nolint: errcheck
|
|
||||||
book := NewAddrBook(dir+"addrbook.json", false)
|
|
||||||
book.SetLogger(log.TestingLogger())
|
|
||||||
|
|
||||||
r := NewPEXReactor(book, &PEXReactorConfig{})
|
|
||||||
r.SetLogger(log.TestingLogger())
|
|
||||||
|
|
||||||
peer := p2p.CreateRandomPeer(false)
|
peer := p2p.CreateRandomPeer(false)
|
||||||
|
|
||||||
@ -140,98 +116,77 @@ func TestPEXReactorReceive(t *testing.T) {
|
|||||||
addrs := []*p2p.NetAddress{peer.NodeInfo().NetAddress()}
|
addrs := []*p2p.NetAddress{peer.NodeInfo().NetAddress()}
|
||||||
msg := wire.BinaryBytes(struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}})
|
msg := wire.BinaryBytes(struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}})
|
||||||
r.Receive(PexChannel, peer, msg)
|
r.Receive(PexChannel, peer, msg)
|
||||||
assert.Equal(size+1, book.Size())
|
assert.Equal(t, size+1, book.Size())
|
||||||
|
|
||||||
msg = wire.BinaryBytes(struct{ PexMessage }{&pexRequestMessage{}})
|
msg = wire.BinaryBytes(struct{ PexMessage }{&pexRequestMessage{}})
|
||||||
r.Receive(PexChannel, peer, msg)
|
r.Receive(PexChannel, peer, msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestPEXReactorRequestMessageAbuse(t *testing.T) {
|
func TestPEXReactorRequestMessageAbuse(t *testing.T) {
|
||||||
assert, require := assert.New(t), require.New(t)
|
r, book := createReactor(&PEXReactorConfig{})
|
||||||
|
defer teardownReactor(book)
|
||||||
|
|
||||||
dir, err := ioutil.TempDir("", "pex_reactor")
|
sw := createSwitchAndAddReactors(r)
|
||||||
require.Nil(err)
|
|
||||||
defer os.RemoveAll(dir) // nolint: errcheck
|
|
||||||
book := NewAddrBook(dir+"addrbook.json", true)
|
|
||||||
book.SetLogger(log.TestingLogger())
|
|
||||||
|
|
||||||
r := NewPEXReactor(book, &PEXReactorConfig{})
|
|
||||||
sw := p2p.MakeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { return sw })
|
|
||||||
sw.SetLogger(log.TestingLogger())
|
|
||||||
sw.AddReactor("PEX", r)
|
|
||||||
r.SetSwitch(sw)
|
|
||||||
r.SetLogger(log.TestingLogger())
|
|
||||||
|
|
||||||
peer := newMockPeer()
|
peer := newMockPeer()
|
||||||
p2p.AddPeerToSwitch(sw, peer)
|
p2p.AddPeerToSwitch(sw, peer)
|
||||||
assert.True(sw.Peers().Has(peer.ID()))
|
assert.True(t, sw.Peers().Has(peer.ID()))
|
||||||
|
|
||||||
id := string(peer.ID())
|
id := string(peer.ID())
|
||||||
msg := wire.BinaryBytes(struct{ PexMessage }{&pexRequestMessage{}})
|
msg := wire.BinaryBytes(struct{ PexMessage }{&pexRequestMessage{}})
|
||||||
|
|
||||||
// first time creates the entry
|
// first time creates the entry
|
||||||
r.Receive(PexChannel, peer, msg)
|
r.Receive(PexChannel, peer, msg)
|
||||||
assert.True(r.lastReceivedRequests.Has(id))
|
assert.True(t, r.lastReceivedRequests.Has(id))
|
||||||
assert.True(sw.Peers().Has(peer.ID()))
|
assert.True(t, sw.Peers().Has(peer.ID()))
|
||||||
|
|
||||||
// next time sets the last time value
|
// next time sets the last time value
|
||||||
r.Receive(PexChannel, peer, msg)
|
r.Receive(PexChannel, peer, msg)
|
||||||
assert.True(r.lastReceivedRequests.Has(id))
|
assert.True(t, r.lastReceivedRequests.Has(id))
|
||||||
assert.True(sw.Peers().Has(peer.ID()))
|
assert.True(t, sw.Peers().Has(peer.ID()))
|
||||||
|
|
||||||
// third time is too many too soon - peer is removed
|
// third time is too many too soon - peer is removed
|
||||||
r.Receive(PexChannel, peer, msg)
|
r.Receive(PexChannel, peer, msg)
|
||||||
assert.False(r.lastReceivedRequests.Has(id))
|
assert.False(t, r.lastReceivedRequests.Has(id))
|
||||||
assert.False(sw.Peers().Has(peer.ID()))
|
assert.False(t, sw.Peers().Has(peer.ID()))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestPEXReactorAddrsMessageAbuse(t *testing.T) {
|
func TestPEXReactorAddrsMessageAbuse(t *testing.T) {
|
||||||
assert, require := assert.New(t), require.New(t)
|
r, book := createReactor(&PEXReactorConfig{})
|
||||||
|
defer teardownReactor(book)
|
||||||
|
|
||||||
dir, err := ioutil.TempDir("", "pex_reactor")
|
sw := createSwitchAndAddReactors(r)
|
||||||
require.Nil(err)
|
|
||||||
defer os.RemoveAll(dir) // nolint: errcheck
|
|
||||||
book := NewAddrBook(dir+"addrbook.json", true)
|
|
||||||
book.SetLogger(log.TestingLogger())
|
|
||||||
|
|
||||||
r := NewPEXReactor(book, &PEXReactorConfig{})
|
|
||||||
sw := p2p.MakeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { return sw })
|
|
||||||
sw.SetLogger(log.TestingLogger())
|
|
||||||
sw.AddReactor("PEX", r)
|
|
||||||
r.SetSwitch(sw)
|
|
||||||
r.SetLogger(log.TestingLogger())
|
|
||||||
|
|
||||||
peer := newMockPeer()
|
peer := newMockPeer()
|
||||||
p2p.AddPeerToSwitch(sw, peer)
|
p2p.AddPeerToSwitch(sw, peer)
|
||||||
assert.True(sw.Peers().Has(peer.ID()))
|
assert.True(t, sw.Peers().Has(peer.ID()))
|
||||||
|
|
||||||
id := string(peer.ID())
|
id := string(peer.ID())
|
||||||
|
|
||||||
// request addrs from the peer
|
// request addrs from the peer
|
||||||
r.RequestAddrs(peer)
|
r.RequestAddrs(peer)
|
||||||
assert.True(r.requestsSent.Has(id))
|
assert.True(t, r.requestsSent.Has(id))
|
||||||
assert.True(sw.Peers().Has(peer.ID()))
|
assert.True(t, sw.Peers().Has(peer.ID()))
|
||||||
|
|
||||||
addrs := []*p2p.NetAddress{peer.NodeInfo().NetAddress()}
|
addrs := []*p2p.NetAddress{peer.NodeInfo().NetAddress()}
|
||||||
msg := wire.BinaryBytes(struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}})
|
msg := wire.BinaryBytes(struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}})
|
||||||
|
|
||||||
// receive some addrs. should clear the request
|
// receive some addrs. should clear the request
|
||||||
r.Receive(PexChannel, peer, msg)
|
r.Receive(PexChannel, peer, msg)
|
||||||
assert.False(r.requestsSent.Has(id))
|
assert.False(t, r.requestsSent.Has(id))
|
||||||
assert.True(sw.Peers().Has(peer.ID()))
|
assert.True(t, sw.Peers().Has(peer.ID()))
|
||||||
|
|
||||||
// receiving more addrs causes a disconnect
|
// receiving more addrs causes a disconnect
|
||||||
r.Receive(PexChannel, peer, msg)
|
r.Receive(PexChannel, peer, msg)
|
||||||
assert.False(sw.Peers().Has(peer.ID()))
|
assert.False(t, sw.Peers().Has(peer.ID()))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestPEXReactorUsesSeedsIfNeeded(t *testing.T) {
|
func TestPEXReactorUsesSeedsIfNeeded(t *testing.T) {
|
||||||
|
|
||||||
dir, err := ioutil.TempDir("", "pex_reactor")
|
dir, err := ioutil.TempDir("", "pex_reactor")
|
||||||
require.Nil(t, err)
|
require.Nil(t, err)
|
||||||
defer os.RemoveAll(dir) // nolint: errcheck
|
defer os.RemoveAll(dir) // nolint: errcheck
|
||||||
|
|
||||||
book := NewAddrBook(dir+"addrbook.json", false)
|
book := NewAddrBook(filepath.Join(dir, "addrbook.json"), false)
|
||||||
book.SetLogger(log.TestingLogger())
|
book.SetLogger(log.TestingLogger())
|
||||||
|
|
||||||
// 1. create seed
|
// 1. create seed
|
||||||
@ -288,22 +243,11 @@ func TestPEXReactorUsesSeedsIfNeeded(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestPEXReactorCrawlStatus(t *testing.T) {
|
func TestPEXReactorCrawlStatus(t *testing.T) {
|
||||||
assert, require := assert.New(t), require.New(t)
|
pexR, book := createReactor(&PEXReactorConfig{SeedMode: true})
|
||||||
|
defer teardownReactor(book)
|
||||||
|
|
||||||
dir, err := ioutil.TempDir("", "pex_reactor")
|
|
||||||
require.Nil(err)
|
|
||||||
defer os.RemoveAll(dir) // nolint: errcheck
|
|
||||||
book := NewAddrBook(dir+"addrbook.json", false)
|
|
||||||
book.SetLogger(log.TestingLogger())
|
|
||||||
|
|
||||||
pexR := NewPEXReactor(book, &PEXReactorConfig{SeedMode: true})
|
|
||||||
// Seed/Crawler mode uses data from the Switch
|
// Seed/Crawler mode uses data from the Switch
|
||||||
p2p.MakeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch {
|
_ = createSwitchAndAddReactors(pexR)
|
||||||
pexR.SetLogger(log.TestingLogger())
|
|
||||||
sw.SetLogger(log.TestingLogger().With("switch", i))
|
|
||||||
sw.AddReactor("pex", pexR)
|
|
||||||
return sw
|
|
||||||
})
|
|
||||||
|
|
||||||
// Create a peer, add it to the peer set and the addrbook.
|
// Create a peer, add it to the peer set and the addrbook.
|
||||||
peer := p2p.CreateRandomPeer(false)
|
peer := p2p.CreateRandomPeer(false)
|
||||||
@ -319,11 +263,34 @@ func TestPEXReactorCrawlStatus(t *testing.T) {
|
|||||||
peerInfos := pexR.getPeersToCrawl()
|
peerInfos := pexR.getPeersToCrawl()
|
||||||
|
|
||||||
// Make sure it has the proper number of elements
|
// Make sure it has the proper number of elements
|
||||||
assert.Equal(2, len(peerInfos))
|
assert.Equal(t, 2, len(peerInfos))
|
||||||
|
|
||||||
// TODO: test
|
// TODO: test
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestPEXReactorDialPeer(t *testing.T) {
|
||||||
|
pexR, book := createReactor(&PEXReactorConfig{})
|
||||||
|
defer teardownReactor(book)
|
||||||
|
|
||||||
|
_ = createSwitchAndAddReactors(pexR)
|
||||||
|
|
||||||
|
peer := newMockPeer()
|
||||||
|
addr := peer.NodeInfo().NetAddress()
|
||||||
|
|
||||||
|
assert.Equal(t, 0, pexR.AttemptsToDial(addr))
|
||||||
|
|
||||||
|
// 1st unsuccessful attempt
|
||||||
|
pexR.dialPeer(addr)
|
||||||
|
|
||||||
|
assert.Equal(t, 1, pexR.AttemptsToDial(addr))
|
||||||
|
|
||||||
|
// 2nd unsuccessful attempt
|
||||||
|
pexR.dialPeer(addr)
|
||||||
|
|
||||||
|
// must be skipped because it is too early
|
||||||
|
assert.Equal(t, 1, pexR.AttemptsToDial(addr))
|
||||||
|
}
|
||||||
|
|
||||||
type mockPeer struct {
|
type mockPeer struct {
|
||||||
*cmn.BaseService
|
*cmn.BaseService
|
||||||
pubKey crypto.PubKey
|
pubKey crypto.PubKey
|
||||||
@ -400,3 +367,33 @@ func assertPeersWithTimeout(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func createReactor(config *PEXReactorConfig) (r *PEXReactor, book *addrBook) {
|
||||||
|
dir, err := ioutil.TempDir("", "pex_reactor")
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
book = NewAddrBook(filepath.Join(dir, "addrbook.json"), true)
|
||||||
|
book.SetLogger(log.TestingLogger())
|
||||||
|
|
||||||
|
r = NewPEXReactor(book, &PEXReactorConfig{})
|
||||||
|
r.SetLogger(log.TestingLogger())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func teardownReactor(book *addrBook) {
|
||||||
|
err := os.RemoveAll(filepath.Dir(book.FilePath()))
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func createSwitchAndAddReactors(reactors ...p2p.Reactor) *p2p.Switch {
|
||||||
|
sw := p2p.MakeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { return sw })
|
||||||
|
sw.SetLogger(log.TestingLogger())
|
||||||
|
for _, r := range reactors {
|
||||||
|
sw.AddReactor(r.String(), r)
|
||||||
|
r.SetSwitch(sw)
|
||||||
|
}
|
||||||
|
return sw
|
||||||
|
}
|
||||||
|
@ -254,10 +254,8 @@ func (c *WSClient) reconnect() error {
|
|||||||
c.mtx.Unlock()
|
c.mtx.Unlock()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// 1s == (1e9 ns) == (1 Billion ns)
|
|
||||||
billionNs := float64(time.Second.Nanoseconds())
|
|
||||||
for {
|
for {
|
||||||
jitterSeconds := time.Duration(rand.Float64() * billionNs)
|
jitterSeconds := time.Duration(rand.Float64() * float64(time.Second)) // 1s == (1e9 ns)
|
||||||
backoffDuration := jitterSeconds + ((1 << uint(attempt)) * time.Second)
|
backoffDuration := jitterSeconds + ((1 << uint(attempt)) * time.Second)
|
||||||
|
|
||||||
c.Logger.Info("reconnecting", "attempt", attempt+1, "backoff_duration", backoffDuration)
|
c.Logger.Info("reconnecting", "attempt", attempt+1, "backoff_duration", backoffDuration)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user