tendermint/p2p/pex/pex_reactor_test.go

448 lines
11 KiB
Go
Raw Normal View History

2018-01-20 21:12:04 -05:00
package pex
2017-01-12 00:17:15 +04:00
import (
"fmt"
2017-01-16 20:31:50 +04:00
"io/ioutil"
"os"
2018-03-09 16:02:24 +04:00
"path/filepath"
2017-01-12 00:17:15 +04:00
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
2018-01-20 21:12:04 -05:00
crypto "github.com/tendermint/go-crypto"
2018-01-20 21:12:04 -05:00
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/p2p"
2018-01-21 00:33:53 -05:00
"github.com/tendermint/tendermint/p2p/conn"
2018-03-09 16:23:52 +04:00
cmn "github.com/tendermint/tmlibs/common"
"github.com/tendermint/tmlibs/log"
2018-01-20 21:12:04 -05:00
)
var (
config *cfg.P2PConfig
2017-01-12 00:17:15 +04:00
)
2018-01-20 21:12:04 -05:00
func init() {
config = cfg.DefaultP2PConfig()
config.PexReactor = true
}
2017-01-12 00:17:15 +04:00
func TestPEXReactorBasic(t *testing.T) {
2018-03-09 16:02:24 +04:00
r, book := createReactor(&PEXReactorConfig{})
defer teardownReactor(book)
2017-04-20 13:04:40 +04:00
2018-03-09 16:02:24 +04:00
assert.NotNil(t, r)
assert.NotEmpty(t, r.GetChannels())
2017-01-12 00:17:15 +04:00
}
func TestPEXReactorAddRemovePeer(t *testing.T) {
2018-03-09 16:02:24 +04:00
r, book := createReactor(&PEXReactorConfig{})
defer teardownReactor(book)
2017-01-12 00:17:15 +04:00
size := book.Size()
2018-01-20 21:12:04 -05:00
peer := p2p.CreateRandomPeer(false)
2017-01-12 00:17:15 +04:00
r.AddPeer(peer)
2018-03-09 16:02:24 +04:00
assert.Equal(t, size+1, book.Size())
2017-01-12 00:17:15 +04:00
r.RemovePeer(peer, "peer not available")
2018-03-09 16:02:24 +04:00
assert.Equal(t, size+1, book.Size())
2017-01-12 00:17:15 +04:00
2018-01-20 21:12:04 -05:00
outboundPeer := p2p.CreateRandomPeer(true)
2017-01-12 00:17:15 +04:00
r.AddPeer(outboundPeer)
2018-03-09 16:02:24 +04:00
assert.Equal(t, size+1, book.Size(), "outbound peers should not be added to the address book")
2017-01-12 00:17:15 +04:00
r.RemovePeer(outboundPeer, "peer not available")
2018-03-09 16:02:24 +04:00
assert.Equal(t, size+1, book.Size())
2017-01-12 00:17:15 +04:00
}
func TestPEXReactorRunning(t *testing.T) {
N := 3
2018-01-20 21:12:04 -05:00
switches := make([]*p2p.Switch, N)
2017-01-12 00:17:15 +04:00
// directory to store address books
2017-01-16 20:31:50 +04:00
dir, err := ioutil.TempDir("", "pex_reactor")
require.Nil(t, err)
2017-10-03 18:49:20 -04:00
defer os.RemoveAll(dir) // nolint: errcheck
books := make([]*addrBook, N)
logger := log.TestingLogger()
2017-01-12 00:17:15 +04:00
// create switches
for i := 0; i < N; i++ {
2018-01-20 21:12:04 -05:00
switches[i] = p2p.MakeSwitch(config, i, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch {
books[i] = NewAddrBook(filepath.Join(dir, fmt.Sprintf("addrbook%d.json", i)), false)
books[i].SetLogger(logger.With("pex", i))
sw.SetAddrBook(books[i])
sw.SetLogger(logger.With("pex", i))
r := NewPEXReactor(books[i], &PEXReactorConfig{})
r.SetLogger(logger.With("pex", i))
2017-01-12 00:17:15 +04:00
r.SetEnsurePeersPeriod(250 * time.Millisecond)
sw.AddReactor("pex", r)
2017-01-12 00:17:15 +04:00
return sw
})
}
addOtherNodeAddrToAddrBook := func(switchIndex, otherSwitchIndex int) {
addr := switches[otherSwitchIndex].NodeInfo().NetAddress()
books[switchIndex].AddAddress(addr, addr)
2017-01-12 00:17:15 +04:00
}
addOtherNodeAddrToAddrBook(0, 1)
addOtherNodeAddrToAddrBook(1, 0)
addOtherNodeAddrToAddrBook(2, 1)
for i, sw := range switches {
sw.AddListener(p2p.NewDefaultListener("tcp", sw.NodeInfo().ListenAddr, true, logger.With("pex", i)))
err := sw.Start() // start switch and reactors
require.Nil(t, err)
2017-01-12 00:17:15 +04:00
}
assertPeersWithTimeout(t, switches, 10*time.Millisecond, 10*time.Second, N-1)
2017-01-12 00:17:15 +04:00
// stop them
for _, s := range switches {
s.Stop()
}
}
2017-01-12 15:16:51 +04:00
func TestPEXReactorReceive(t *testing.T) {
2018-03-09 16:02:24 +04:00
r, book := createReactor(&PEXReactorConfig{})
defer teardownReactor(book)
2017-01-12 15:16:51 +04:00
2018-01-20 21:12:04 -05:00
peer := p2p.CreateRandomPeer(false)
2017-01-12 15:16:51 +04:00
2018-01-14 03:22:01 -05:00
// we have to send a request to receive responses
r.RequestAddrs(peer)
2018-01-14 03:22:01 -05:00
2017-01-12 15:16:51 +04:00
size := book.Size()
2018-01-21 00:33:53 -05:00
addrs := []*p2p.NetAddress{peer.NodeInfo().NetAddress()}
2018-03-26 06:40:02 +02:00
msg := cdc.MustMarshalBinary(&pexAddrsMessage{Addrs: addrs})
2017-01-12 15:16:51 +04:00
r.Receive(PexChannel, peer, msg)
2018-03-09 16:02:24 +04:00
assert.Equal(t, size+1, book.Size())
2017-01-12 15:16:51 +04:00
2018-03-26 06:40:02 +02:00
msg = cdc.MustMarshalBinary(&pexRequestMessage{})
r.Receive(PexChannel, peer, msg) // should not panic.
2017-01-12 15:16:51 +04:00
}
2018-01-14 03:22:01 -05:00
func TestPEXReactorRequestMessageAbuse(t *testing.T) {
2018-03-09 16:02:24 +04:00
r, book := createReactor(&PEXReactorConfig{})
defer teardownReactor(book)
2017-04-20 13:04:40 +04:00
2018-03-09 16:02:24 +04:00
sw := createSwitchAndAddReactors(r)
sw.SetAddrBook(book)
2017-01-12 17:56:40 +04:00
2018-01-14 03:22:01 -05:00
peer := newMockPeer()
2018-01-20 21:12:04 -05:00
p2p.AddPeerToSwitch(sw, peer)
2018-03-09 16:02:24 +04:00
assert.True(t, sw.Peers().Has(peer.ID()))
2017-01-12 17:56:40 +04:00
2018-01-14 03:22:01 -05:00
id := string(peer.ID())
2018-03-26 06:40:02 +02:00
msg := cdc.MustMarshalBinary(&pexRequestMessage{})
2017-01-12 17:56:40 +04:00
2018-01-14 03:22:01 -05:00
// first time creates the entry
r.Receive(PexChannel, peer, msg)
2018-03-09 16:02:24 +04:00
assert.True(t, r.lastReceivedRequests.Has(id))
assert.True(t, sw.Peers().Has(peer.ID()))
2018-01-14 03:22:01 -05:00
// next time sets the last time value
r.Receive(PexChannel, peer, msg)
2018-03-09 16:02:24 +04:00
assert.True(t, r.lastReceivedRequests.Has(id))
assert.True(t, sw.Peers().Has(peer.ID()))
2018-01-14 03:22:01 -05:00
// third time is too many too soon - peer is removed
r.Receive(PexChannel, peer, msg)
2018-03-09 16:02:24 +04:00
assert.False(t, r.lastReceivedRequests.Has(id))
assert.False(t, sw.Peers().Has(peer.ID()))
2018-01-14 03:22:01 -05:00
}
func TestPEXReactorAddrsMessageAbuse(t *testing.T) {
2018-03-09 16:02:24 +04:00
r, book := createReactor(&PEXReactorConfig{})
defer teardownReactor(book)
2018-01-14 03:22:01 -05:00
2018-03-09 16:02:24 +04:00
sw := createSwitchAndAddReactors(r)
sw.SetAddrBook(book)
2018-01-14 03:22:01 -05:00
peer := newMockPeer()
2018-01-20 21:12:04 -05:00
p2p.AddPeerToSwitch(sw, peer)
2018-03-09 16:02:24 +04:00
assert.True(t, sw.Peers().Has(peer.ID()))
2018-01-14 03:22:01 -05:00
id := string(peer.ID())
// request addrs from the peer
r.RequestAddrs(peer)
2018-03-09 16:02:24 +04:00
assert.True(t, r.requestsSent.Has(id))
assert.True(t, sw.Peers().Has(peer.ID()))
2018-01-14 03:22:01 -05:00
2018-01-21 00:33:53 -05:00
addrs := []*p2p.NetAddress{peer.NodeInfo().NetAddress()}
2018-03-26 06:40:02 +02:00
msg := cdc.MustMarshalBinary(&pexAddrsMessage{Addrs: addrs})
2018-01-14 03:22:01 -05:00
// receive some addrs. should clear the request
r.Receive(PexChannel, peer, msg)
2018-03-09 16:02:24 +04:00
assert.False(t, r.requestsSent.Has(id))
assert.True(t, sw.Peers().Has(peer.ID()))
2018-01-14 03:22:01 -05:00
// receiving more addrs causes a disconnect
r.Receive(PexChannel, peer, msg)
2018-03-09 16:02:24 +04:00
assert.False(t, sw.Peers().Has(peer.ID()))
2017-01-12 17:56:40 +04:00
}
func TestPEXReactorUsesSeedsIfNeeded(t *testing.T) {
// directory to store address books
dir, err := ioutil.TempDir("", "pex_reactor")
require.Nil(t, err)
defer os.RemoveAll(dir) // nolint: errcheck
// 1. create seed
seed := p2p.MakeSwitch(
config,
0,
"127.0.0.1",
"123.123.123",
func(i int, sw *p2p.Switch) *p2p.Switch {
book := NewAddrBook(filepath.Join(dir, "addrbook0.json"), false)
book.SetLogger(log.TestingLogger())
sw.SetAddrBook(book)
sw.SetLogger(log.TestingLogger())
r := NewPEXReactor(book, &PEXReactorConfig{})
r.SetLogger(log.TestingLogger())
sw.AddReactor("pex", r)
return sw
},
)
seed.AddListener(
p2p.NewDefaultListener(
"tcp",
seed.NodeInfo().ListenAddr,
true,
log.TestingLogger(),
),
)
require.Nil(t, seed.Start())
defer seed.Stop()
// 2. create usual peer with only seed configured.
peer := p2p.MakeSwitch(
config,
1,
"127.0.0.1",
"123.123.123",
func(i int, sw *p2p.Switch) *p2p.Switch {
book := NewAddrBook(filepath.Join(dir, "addrbook1.json"), false)
book.SetLogger(log.TestingLogger())
sw.SetAddrBook(book)
sw.SetLogger(log.TestingLogger())
r := NewPEXReactor(
book,
&PEXReactorConfig{
Seeds: []string{seed.NodeInfo().NetAddress().String()},
},
)
r.SetLogger(log.TestingLogger())
sw.AddReactor("pex", r)
return sw
},
)
require.Nil(t, peer.Start())
defer peer.Stop()
// 3. check that the peer connects to seed immediately
2018-04-05 05:43:23 -07:00
assertPeersWithTimeout(t, []*p2p.Switch{peer}, 10*time.Millisecond, 3*time.Second, 1)
}
2018-01-09 20:12:41 -05:00
func TestPEXReactorCrawlStatus(t *testing.T) {
2018-03-09 16:02:24 +04:00
pexR, book := createReactor(&PEXReactorConfig{SeedMode: true})
defer teardownReactor(book)
2018-01-09 20:12:41 -05:00
// Seed/Crawler mode uses data from the Switch
sw := createSwitchAndAddReactors(pexR)
sw.SetAddrBook(book)
2018-01-09 20:12:41 -05:00
// Create a peer, add it to the peer set and the addrbook.
2018-01-20 21:12:04 -05:00
peer := p2p.CreateRandomPeer(false)
p2p.AddPeerToSwitch(pexR.Switch, peer)
addr1 := peer.NodeInfo().NetAddress()
pexR.book.AddAddress(addr1, addr1)
// Add a non-connected address to the book.
2018-01-20 21:12:04 -05:00
_, addr2 := p2p.CreateRoutableAddr()
pexR.book.AddAddress(addr2, addr1)
2018-01-09 20:12:41 -05:00
// Get some peerInfos to crawl
peerInfos := pexR.getPeersToCrawl()
2018-01-09 20:12:41 -05:00
// Make sure it has the proper number of elements
2018-03-09 16:02:24 +04:00
assert.Equal(t, 2, len(peerInfos))
2018-01-09 20:12:41 -05:00
// TODO: test
2018-01-09 20:12:41 -05:00
}
2018-03-07 12:25:31 +04:00
func TestPEXReactorDoesNotAddPrivatePeersToAddrBook(t *testing.T) {
2018-03-13 10:30:24 +04:00
peer := p2p.CreateRandomPeer(false)
2018-03-07 12:25:31 +04:00
pexR, book := createReactor(&PEXReactorConfig{PrivatePeerIDs: []string{string(peer.NodeInfo().ID())}})
defer teardownReactor(book)
// we have to send a request to receive responses
2018-03-13 10:30:24 +04:00
pexR.RequestAddrs(peer)
2018-03-07 12:25:31 +04:00
size := book.Size()
addrs := []*p2p.NetAddress{peer.NodeInfo().NetAddress()}
msg := wire.BinaryBytes(struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}})
2018-03-13 10:30:24 +04:00
pexR.Receive(PexChannel, peer, msg)
2018-03-07 12:25:31 +04:00
assert.Equal(t, size, book.Size())
2018-03-13 10:30:24 +04:00
pexR.AddPeer(peer)
2018-03-07 12:25:31 +04:00
assert.Equal(t, size, book.Size())
}
2018-03-09 16:23:52 +04:00
func TestPEXReactorDialPeer(t *testing.T) {
pexR, book := createReactor(&PEXReactorConfig{})
defer teardownReactor(book)
sw := createSwitchAndAddReactors(pexR)
sw.SetAddrBook(book)
2018-03-09 16:23:52 +04:00
peer := newMockPeer()
addr := peer.NodeInfo().NetAddress()
assert.Equal(t, 0, pexR.AttemptsToDial(addr))
// 1st unsuccessful attempt
pexR.dialPeer(addr)
2018-03-11 14:00:49 +04:00
assert.Equal(t, 1, pexR.AttemptsToDial(addr))
2018-03-09 16:23:52 +04:00
// 2nd unsuccessful attempt
pexR.dialPeer(addr)
2018-03-11 14:00:49 +04:00
// must be skipped because it is too early
assert.Equal(t, 1, pexR.AttemptsToDial(addr))
if !testing.Short() {
time.Sleep(3 * time.Second)
// 3rd attempt
pexR.dialPeer(addr)
assert.Equal(t, 2, pexR.AttemptsToDial(addr))
}
2018-03-09 16:23:52 +04:00
}
2018-01-14 03:22:01 -05:00
type mockPeer struct {
*cmn.BaseService
pubKey crypto.PubKey
2018-01-21 00:33:53 -05:00
addr *p2p.NetAddress
2018-01-14 03:22:01 -05:00
outbound, persistent bool
}
func newMockPeer() mockPeer {
2018-01-20 21:12:04 -05:00
_, netAddr := p2p.CreateRoutableAddr()
2018-01-14 03:22:01 -05:00
mp := mockPeer{
addr: netAddr,
2018-03-26 06:40:02 +02:00
pubKey: crypto.GenPrivKeyEd25519().PubKey(),
2018-01-14 03:22:01 -05:00
}
mp.BaseService = cmn.NewBaseService(nil, "MockPeer", mp)
mp.Start()
return mp
}
2018-01-21 00:33:53 -05:00
func (mp mockPeer) ID() p2p.ID { return p2p.PubKeyToID(mp.pubKey) }
2018-01-14 03:22:01 -05:00
func (mp mockPeer) IsOutbound() bool { return mp.outbound }
func (mp mockPeer) IsPersistent() bool { return mp.persistent }
2018-01-21 00:33:53 -05:00
func (mp mockPeer) NodeInfo() p2p.NodeInfo {
return p2p.NodeInfo{
2018-01-14 03:22:01 -05:00
PubKey: mp.pubKey,
ListenAddr: mp.addr.DialString(),
}
}
2018-03-26 06:40:02 +02:00
func (mp mockPeer) Status() conn.ConnectionStatus { return conn.ConnectionStatus{} }
func (mp mockPeer) Send(byte, []byte) bool { return false }
func (mp mockPeer) TrySend(byte, []byte) bool { return false }
func (mp mockPeer) Set(string, interface{}) {}
func (mp mockPeer) Get(string) interface{} { return nil }
func assertPeersWithTimeout(
t *testing.T,
switches []*p2p.Switch,
checkPeriod, timeout time.Duration,
nPeers int,
) {
var (
ticker = time.NewTicker(checkPeriod)
remaining = timeout
)
for {
select {
case <-ticker.C:
// check peers are connected
allGood := true
for _, s := range switches {
outbound, inbound, _ := s.NumPeers()
if outbound+inbound < nPeers {
allGood = false
}
}
remaining -= checkPeriod
if remaining < 0 {
remaining = 0
}
if allGood {
return
}
case <-time.After(remaining):
numPeersStr := ""
for i, s := range switches {
outbound, inbound, _ := s.NumPeers()
numPeersStr += fmt.Sprintf("%d => {outbound: %d, inbound: %d}, ", i, outbound, inbound)
}
t.Errorf(
"expected all switches to be connected to at least one peer (switches: %s)",
numPeersStr,
)
return
}
}
}
2018-03-09 16:02:24 +04:00
func createReactor(config *PEXReactorConfig) (r *PEXReactor, book *addrBook) {
// directory to store address book
2018-03-09 16:02:24 +04:00
dir, err := ioutil.TempDir("", "pex_reactor")
if err != nil {
panic(err)
}
book = NewAddrBook(filepath.Join(dir, "addrbook.json"), true)
book.SetLogger(log.TestingLogger())
2018-03-13 10:30:24 +04:00
r = NewPEXReactor(book, config)
2018-03-09 16:02:24 +04:00
r.SetLogger(log.TestingLogger())
return
}
func teardownReactor(book *addrBook) {
err := os.RemoveAll(filepath.Dir(book.FilePath()))
if err != nil {
panic(err)
}
}
func createSwitchAndAddReactors(reactors ...p2p.Reactor) *p2p.Switch {
sw := p2p.MakeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { return sw })
sw.SetLogger(log.TestingLogger())
for _, r := range reactors {
sw.AddReactor(r.String(), r)
r.SetSwitch(sw)
}
return sw
}