// go-libp2p-kad-dht/dht_test.go
package dht
import (
"bytes"
"context"
"errors"
"fmt"
"math/rand"
"sort"
"strings"
"sync"
"testing"
"time"

cid "github.com/ipfs/go-cid"
u "github.com/ipfs/go-ipfs-util"
opts "github.com/libp2p/go-libp2p-kad-dht/opts"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
kb "github.com/libp2p/go-libp2p-kbucket"
peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
record "github.com/libp2p/go-libp2p-record"
routing "github.com/libp2p/go-libp2p-routing"
swarmt "github.com/libp2p/go-libp2p-swarm/testing"
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
ci "github.com/libp2p/go-testutil/ci"
travisci "github.com/libp2p/go-testutil/ci/travis"
ma "github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
var testCaseValues = map[string][]byte{}
var testCaseCids []cid.Cid
func init() {
for i := 0; i < 100; i++ {
v := fmt.Sprintf("%d -- value", i)
mhv := u.Hash([]byte(v))
testCaseCids = append(testCaseCids, cid.NewCidV0(mhv))
}
}
type blankValidator struct{}
func (blankValidator) Validate(_ string, _ []byte) error { return nil }
func (blankValidator) Select(_ string, _ [][]byte) (int, error) { return 0, nil }
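// testValidator recognizes the sentinel values used throughout these tests:
// Select prefers "newer" over "valid", and Validate rejects records whose
// value is "expired".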
type testValidator struct{}
func (testValidator) Select(_ string, bs [][]byte) (int, error) {
index := -1
for i, b := range bs {
if bytes.Equal(b, []byte("newer")) {
index = i
} else if bytes.Equal(b, []byte("valid")) {
if index == -1 {
index = i
}
}
}
if index == -1 {
return -1, errors.New("no rec found")
}
return index, nil
}
func (testValidator) Validate(_ string, b []byte) error {
if bytes.Equal(b, []byte("expired")) {
return errors.New("expired")
}
return nil
}
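// setupDHT constructs a single DHT node on top of a fresh in-process swarm
// host, with a blank validator registered under the "v" namespace. The client
// flag is passed through to opts.Client.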
func setupDHT(ctx context.Context, t *testing.T, client bool) *IpfsDHT {
d, err := New(
ctx,
bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)),
opts.Client(client),
opts.NamespacedValidator("v", blankValidator{}),
)
if err != nil {
t.Fatal(err)
}
return d
}
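// setupDHTS creates n DHT nodes and returns their listen addresses, peer IDs,
// and DHT instances, failing the test if any address or peer ID is duplicated.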
func setupDHTS(ctx context.Context, n int, t *testing.T) ([]ma.Multiaddr, []peer.ID, []*IpfsDHT) {
addrs := make([]ma.Multiaddr, n)
dhts := make([]*IpfsDHT, n)
peers := make([]peer.ID, n)
sanityAddrsMap := make(map[string]struct{})
sanityPeersMap := make(map[string]struct{})
for i := 0; i < n; i++ {
dhts[i] = setupDHT(ctx, t, false)
peers[i] = dhts[i].self
addrs[i] = dhts[i].peerstore.Addrs(dhts[i].self)[0]
if _, found := sanityAddrsMap[addrs[i].String()]; found {
t.Fatal("duplicate address while setting up DHTs")
} else {
sanityAddrsMap[addrs[i].String()] = struct{}{}
}
if _, found := sanityPeersMap[peers[i].String()]; found {
t.Fatal("duplicate peer ID while setting up DHTs")
} else {
sanityPeersMap[peers[i].String()] = struct{}{}
}
}
return addrs, peers, dhts
}
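// connectNoSync adds b's addresses to a's peerstore and dials b from a,
// without waiting for routing tables to be updated.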
func connectNoSync(t *testing.T, ctx context.Context, a, b *IpfsDHT) {
t.Helper()
idB := b.self
addrB := b.peerstore.Addrs(idB)
if len(addrB) == 0 {
t.Fatal("peers setup incorrectly: no local address")
}
a.peerstore.AddAddrs(idB, addrB, pstore.TempAddrTTL)
pi := pstore.PeerInfo{ID: idB}
if err := a.host.Connect(ctx, pi); err != nil {
t.Fatal(err)
}
}
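// wait polls until b appears in a's routing table, failing the test if the
// context expires first.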
func wait(t *testing.T, ctx context.Context, a, b *IpfsDHT) {
t.Helper()
// loop until connection notification has been received.
// under high load, this may not happen as immediately as we would like.
for a.routingTable.Find(b.self) == "" {
select {
case <-ctx.Done():
t.Fatal(ctx.Err())
case <-time.After(time.Millisecond * 5):
}
}
}
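// connect dials b from a and waits until each peer appears in the other's
// routing table.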
func connect(t *testing.T, ctx context.Context, a, b *IpfsDHT) {
t.Helper()
connectNoSync(t, ctx, a, b)
wait(t, ctx, a, b)
wait(t, ctx, b, a)
}
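// bootstrap runs a single bootstrap round on every DHT sequentially, starting
// from a random offset to reduce bias.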
func bootstrap(t *testing.T, ctx context.Context, dhts []*IpfsDHT) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
logger.Debugf("Bootstrapping DHTs...")
// tried async. sequential fares much better. compare:
// 100 async https://gist.github.com/jbenet/56d12f0578d5f34810b2
// 100 sync https://gist.github.com/jbenet/6c59e7c15426e48aaedd
// probably because results compound
cfg := DefaultBootstrapConfig
cfg.Queries = 3
start := rand.Intn(len(dhts)) // randomize to decrease bias.
for i := range dhts {
dht := dhts[(start+i)%len(dhts)]
dht.runBootstrap(ctx, cfg)
}
}
func TestValueGetSet(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var dhts [5]*IpfsDHT
for i := range dhts {
dhts[i] = setupDHT(ctx, t, false)
defer dhts[i].Close()
defer dhts[i].host.Close()
}
connect(t, ctx, dhts[0], dhts[1])
t.Log("adding value on: ", dhts[0].self)
ctxT, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
err := dhts[0].PutValue(ctxT, "/v/hello", []byte("world"))
if err != nil {
t.Fatal(err)
}
t.Log("requesting value on dhts: ", dhts[1].self)
ctxT, cancel = context.WithTimeout(ctx, time.Second*2)
defer cancel()
val, err := dhts[1].GetValue(ctxT, "/v/hello")
if err != nil {
t.Fatal(err)
}
if string(val) != "world" {
t.Fatalf("Expected 'world' got '%s'", string(val))
}
// late connect
connect(t, ctx, dhts[2], dhts[0])
connect(t, ctx, dhts[2], dhts[1])
t.Log("requesting value (offline) on dhts: ", dhts[2].self)
vala, err := dhts[2].GetValue(ctxT, "/v/hello", Quorum(0))
if vala != nil {
t.Fatalf("offline get should have failed, got %s", string(vala))
}
if err != routing.ErrNotFound {
t.Fatalf("offline get should have failed with ErrNotFound, got: %s", err)
}
t.Log("requesting value (online) on dhts: ", dhts[2].self)
val, err = dhts[2].GetValue(ctxT, "/v/hello")
if err != nil {
t.Fatal(err)
}
if string(val) != "world" {
t.Fatalf("Expected 'world' got '%s'", string(val))
}
for _, d := range dhts[:3] {
connect(t, ctx, dhts[3], d)
}
connect(t, ctx, dhts[4], dhts[3])
t.Log("requesting value (requires peer routing) on dhts: ", dhts[4].self)
val, err = dhts[4].GetValue(ctxT, "/v/hello")
if err != nil {
t.Fatal(err)
}
if string(val) != "world" {
t.Fatalf("Expected 'world' got '%s'", string(val))
}
}
func TestValueSetInvalid(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dhtA := setupDHT(ctx, t, false)
dhtB := setupDHT(ctx, t, false)
defer dhtA.Close()
defer dhtB.Close()
defer dhtA.host.Close()
defer dhtB.host.Close()
dhtA.Validator.(record.NamespacedValidator)["v"] = testValidator{}
dhtB.Validator.(record.NamespacedValidator)["v"] = blankValidator{}
connect(t, ctx, dhtA, dhtB)
testSetGet := func(val string, failset bool, exp string, experr error) {
t.Helper()
ctxT, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
err := dhtA.PutValue(ctxT, "/v/hello", []byte(val))
if failset {
if err == nil {
t.Error("expected set to fail")
}
} else {
if err != nil {
t.Error(err)
}
}
ctxT, cancel = context.WithTimeout(ctx, time.Second*2)
defer cancel()
valb, err := dhtB.GetValue(ctxT, "/v/hello")
if err != experr {
t.Errorf("Set/Get %v: Expected %v error but got %v", val, experr, err)
} else if err == nil && string(valb) != exp {
t.Errorf("Expected '%v' got '%s'", exp, string(valb))
}
}
// Expired records should not be set
testSetGet("expired", true, "", routing.ErrNotFound)
// Valid record should be returned
testSetGet("valid", false, "valid", nil)
// Newer record should supersede previous record
testSetGet("newer", false, "newer", nil)
// Attempt to set older record again should be ignored
testSetGet("valid", true, "newer", nil)
}
func TestSearchValue(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dhtA := setupDHT(ctx, t, false)
dhtB := setupDHT(ctx, t, false)
defer dhtA.Close()
defer dhtB.Close()
defer dhtA.host.Close()
defer dhtB.host.Close()
connect(t, ctx, dhtA, dhtB)
dhtA.Validator.(record.NamespacedValidator)["v"] = testValidator{}
dhtB.Validator.(record.NamespacedValidator)["v"] = testValidator{}
ctxT, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
err := dhtA.PutValue(ctxT, "/v/hello", []byte("valid"))
if err != nil {
t.Error(err)
}
ctxT, cancel = context.WithTimeout(ctx, time.Second*2)
defer cancel()
valCh, err := dhtA.SearchValue(ctxT, "/v/hello", Quorum(-1))
if err != nil {
t.Fatal(err)
}
select {
case v := <-valCh:
if string(v) != "valid" {
t.Errorf("expected 'valid', got '%s'", string(v))
}
case <-ctxT.Done():
t.Fatal(ctxT.Err())
}
err = dhtB.PutValue(ctxT, "/v/hello", []byte("newer"))
if err != nil {
t.Error(err)
}
select {
case v := <-valCh:
if string(v) != "newer" {
t.Errorf("expected 'newer', got '%s'", string(v))
}
case <-ctxT.Done():
t.Fatal(ctxT.Err())
}
}
func TestGetValues(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dhtA := setupDHT(ctx, t, false)
dhtB := setupDHT(ctx, t, false)
defer dhtA.Close()
defer dhtB.Close()
defer dhtA.host.Close()
defer dhtB.host.Close()
connect(t, ctx, dhtA, dhtB)
ctxT, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
err := dhtB.PutValue(ctxT, "/v/hello", []byte("newer"))
if err != nil {
t.Error(err)
}
err = dhtA.PutValue(ctxT, "/v/hello", []byte("valid"))
if err != nil {
t.Error(err)
}
ctxT, cancel = context.WithTimeout(ctx, time.Second*2)
defer cancel()
vals, err := dhtA.GetValues(ctxT, "/v/hello", 16)
if err != nil {
t.Fatal(err)
}
if len(vals) != 2 {
t.Fatalf("expected to get 2 values, got %d", len(vals))
}
sort.Slice(vals, func(i, j int) bool { return string(vals[i].Val) < string(vals[j].Val) })
if string(vals[0].Val) != "valid" {
t.Errorf("unexpected vals[0]: %s", string(vals[0].Val))
}
if string(vals[1].Val) != "valid" {
t.Errorf("unexpected vals[1]: %s", string(vals[1].Val))
}
}
func TestValueGetInvalid(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dhtA := setupDHT(ctx, t, false)
dhtB := setupDHT(ctx, t, false)
defer dhtA.Close()
defer dhtB.Close()
defer dhtA.host.Close()
defer dhtB.host.Close()
dhtA.Validator.(record.NamespacedValidator)["v"] = blankValidator{}
dhtB.Validator.(record.NamespacedValidator)["v"] = testValidator{}
connect(t, ctx, dhtA, dhtB)
testSetGet := func(val string, exp string, experr error) {
t.Helper()
ctxT, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
err := dhtA.PutValue(ctxT, "/v/hello", []byte(val))
if err != nil {
t.Error(err)
}
ctxT, cancel = context.WithTimeout(ctx, time.Second*2)
defer cancel()
valb, err := dhtB.GetValue(ctxT, "/v/hello")
if err != experr {
t.Errorf("Set/Get %v: Expected '%v' error but got '%v'", val, experr, err)
} else if err == nil && string(valb) != exp {
t.Errorf("Expected '%v' got '%s'", exp, string(valb))
}
}
// Expired records should not be returned
testSetGet("expired", "", routing.ErrNotFound)
// Valid record should be returned
testSetGet("valid", "valid", nil)
// Newer record should supersede previous record
testSetGet("newer", "newer", nil)
// Attempt to set older record again should be ignored
testSetGet("valid", "newer", nil)
}
func TestInvalidMessageSenderTracking(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dht := setupDHT(ctx, t, false)
defer dht.Close()
foo := peer.ID("asdasd")
_, err := dht.messageSenderForPeer(foo)
if err == nil {
t.Fatal("that shouldn't have succeeded")
}
dht.smlk.Lock()
mscnt := len(dht.strmap)
dht.smlk.Unlock()
if mscnt > 0 {
t.Fatal("should have no message senders in map")
}
}
func TestProvides(t *testing.T) {
// t.Skip("skipping test to debug another")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
_, _, dhts := setupDHTS(ctx, 4, t)
defer func() {
for i := 0; i < 4; i++ {
dhts[i].Close()
defer dhts[i].host.Close()
}
}()
connect(t, ctx, dhts[0], dhts[1])
connect(t, ctx, dhts[1], dhts[2])
connect(t, ctx, dhts[1], dhts[3])
for _, k := range testCaseCids {
logger.Debugf("announcing provider for %s", k)
if err := dhts[3].Provide(ctx, k, true); err != nil {
t.Fatal(err)
}
}
// what is this timeout for? was 60ms before.
time.Sleep(time.Millisecond * 6)
n := 0
for _, c := range testCaseCids {
n = (n + 1) % 3
logger.Debugf("getting providers for %s from %d", c, n)
ctxT, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
provchan := dhts[n].FindProvidersAsync(ctxT, c, 1)
select {
case prov := <-provchan:
if prov.ID == "" {
t.Fatal("Got back nil provider")
}
if prov.ID != dhts[3].self {
t.Fatal("Got back wrong provider")
}
case <-ctxT.Done():
t.Fatal("Did not get a provider back.")
}
}
}
func TestLocalProvides(t *testing.T) {
// t.Skip("skipping test to debug another")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
_, _, dhts := setupDHTS(ctx, 4, t)
defer func() {
for i := 0; i < 4; i++ {
dhts[i].Close()
defer dhts[i].host.Close()
}
}()
connect(t, ctx, dhts[0], dhts[1])
connect(t, ctx, dhts[1], dhts[2])
connect(t, ctx, dhts[1], dhts[3])
for _, k := range testCaseCids {
logger.Debugf("announcing provider for %s", k)
if err := dhts[3].Provide(ctx, k, false); err != nil {
t.Fatal(err)
}
}
time.Sleep(time.Millisecond * 10)
for _, c := range testCaseCids {
for i := 0; i < 3; i++ {
provs := dhts[i].providers.GetProviders(ctx, c)
if len(provs) > 0 {
t.Fatal("shouldn't know this")
}
}
}
}
// if minPeers or avgPeers is 0, don't test for it.
func waitForWellFormedTables(t *testing.T, dhts []*IpfsDHT, minPeers, avgPeers int, timeout time.Duration) bool {
// test "well-formed-ness" (>= minPeers peers in every routing table)
checkTables := func() bool {
totalPeers := 0
for _, dht := range dhts {
rtlen := dht.routingTable.Size()
totalPeers += rtlen
if minPeers > 0 && rtlen < minPeers {
//t.Logf("routing table for %s only has %d peers (should have >%d)", dht.self, rtlen, minPeers)
return false
}
}
actualAvgPeers := totalPeers / len(dhts)
t.Logf("avg rt size: %d", actualAvgPeers)
if avgPeers > 0 && actualAvgPeers < avgPeers {
t.Logf("avg rt size: %d < %d", actualAvgPeers, avgPeers)
return false
}
return true
}
timeoutA := time.After(timeout)
for {
select {
case <-timeoutA:
logger.Debugf("did not reach well-formed routing tables by %s", timeout)
return false // failed
case <-time.After(5 * time.Millisecond):
if checkTables() {
return true // succeeded
}
}
}
}
func printRoutingTables(dhts []*IpfsDHT) {
// the routing tables should be full now. let's inspect them.
fmt.Printf("checking routing tables of %d dhts\n", len(dhts))
for _, dht := range dhts {
fmt.Printf("checking routing table of %s\n", dht.self)
dht.routingTable.Print()
fmt.Println("")
}
}
func TestBootstrap(t *testing.T) {
if testing.Short() {
t.SkipNow()
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
nDHTs := 30
_, _, dhts := setupDHTS(ctx, nDHTs, t)
defer func() {
for i := 0; i < nDHTs; i++ {
dhts[i].Close()
defer dhts[i].host.Close()
}
}()
t.Logf("connecting %d dhts in a ring", nDHTs)
for i := 0; i < nDHTs; i++ {
connect(t, ctx, dhts[i], dhts[(i+1)%len(dhts)])
}
<-time.After(100 * time.Millisecond)
// bootstrap a few times until we get good tables.
stop := make(chan struct{})
go func() {
for {
t.Logf("bootstrapping them so they find each other %d", nDHTs)
ctxT, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
bootstrap(t, ctxT, dhts)
select {
case <-time.After(50 * time.Millisecond):
continue // being explicit
case <-stop:
return
}
}
}()
waitForWellFormedTables(t, dhts, 7, 10, 20*time.Second)
close(stop)
if u.Debug {
// the routing tables should be full now. let's inspect them.
printRoutingTables(dhts)
}
}
func TestPeriodicBootstrap(t *testing.T) {
if ci.IsRunning() {
t.Skip("skipping on CI. highly timing dependent")
}
if testing.Short() {
t.SkipNow()
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
nDHTs := 30
_, _, dhts := setupDHTS(ctx, nDHTs, t)
defer func() {
for i := 0; i < nDHTs; i++ {
dhts[i].Close()
defer dhts[i].host.Close()
}
}()
cfg := DefaultBootstrapConfig
cfg.Queries = 5
t.Logf("dhts are not connected. %d", nDHTs)
for _, dht := range dhts {
rtlen := dht.routingTable.Size()
if rtlen > 0 {
t.Errorf("routing table for %s should have 0 peers. has %d", dht.self, rtlen)
}
}
for i := 0; i < nDHTs; i++ {
connect(t, ctx, dhts[i], dhts[(i+1)%len(dhts)])
}
t.Logf("DHTs are now connected to 1-2 others. %d", nDHTs)
for _, dht := range dhts {
rtlen := dht.routingTable.Size()
if rtlen > 2 {
t.Errorf("routing table for %s should have at most 2 peers. has %d", dht.self, rtlen)
}
}
if u.Debug {
printRoutingTables(dhts)
}
t.Logf("bootstrapping them so they find each other. %d", nDHTs)
for _, dht := range dhts {
go dht.BootstrapOnce(ctx, cfg)
}
// this is async, and we don't know when it's finished with one cycle, so keep checking
// until the routing tables look better, or some long timeout for the failure case.
waitForWellFormedTables(t, dhts, 7, 10, 20*time.Second)
if u.Debug {
printRoutingTables(dhts)
}
}
func TestProvidesMany(t *testing.T) {
t.Skip("this test doesn't work")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
nDHTs := 40
_, _, dhts := setupDHTS(ctx, nDHTs, t)
defer func() {
for i := 0; i < nDHTs; i++ {
dhts[i].Close()
defer dhts[i].host.Close()
}
}()
t.Logf("connecting %d dhts in a ring", nDHTs)
for i := 0; i < nDHTs; i++ {
connect(t, ctx, dhts[i], dhts[(i+1)%len(dhts)])
}
<-time.After(100 * time.Millisecond)
t.Logf("bootstrapping them so they find each other. %d", nDHTs)
ctxT, cancel := context.WithTimeout(ctx, 20*time.Second)
defer cancel()
bootstrap(t, ctxT, dhts)
if u.Debug {
// the routing tables should be full now. let's inspect them.
t.Logf("checking routing tables of %d dhts", nDHTs)
for _, dht := range dhts {
fmt.Printf("checking routing table of %s\n", dht.self)
dht.routingTable.Print()
fmt.Println("")
}
}
providers := make(map[string]peer.ID)
d := 0
for _, c := range testCaseCids {
d = (d + 1) % len(dhts)
dht := dhts[d]
providers[c.KeyString()] = dht.self
t.Logf("announcing provider for %s", c)
if err := dht.Provide(ctx, c, true); err != nil {
t.Fatal(err)
}
}
// what is this timeout for? was 60ms before.
time.Sleep(time.Millisecond * 6)
errchan := make(chan error)
ctxT, cancel = context.WithTimeout(ctx, 5*time.Second)
defer cancel()
var wg sync.WaitGroup
getProvider := func(dht *IpfsDHT, k cid.Cid) {
defer wg.Done()
expected := providers[k.KeyString()]
provchan := dht.FindProvidersAsync(ctxT, k, 1)
select {
case prov := <-provchan:
actual := prov.ID
if actual == "" {
errchan <- fmt.Errorf("Got back nil provider (%s at %s)", k, dht.self)
} else if actual != expected {
errchan <- fmt.Errorf("Got back wrong provider (%s != %s) (%s at %s)",
expected, actual, k, dht.self)
}
case <-ctxT.Done():
errchan <- fmt.Errorf("Did not get a provider back (%s at %s)", k, dht.self)
}
}
for _, c := range testCaseCids {
// everyone should be able to find it...
for _, dht := range dhts {
logger.Debugf("getting providers for %s at %s", c, dht.self)
wg.Add(1)
go getProvider(dht, c)
}
}
// we need this because of printing errors
go func() {
wg.Wait()
close(errchan)
}()
for err := range errchan {
t.Error(err)
}
}
func TestProvidesAsync(t *testing.T) {
// t.Skip("skipping test to debug another")
if testing.Short() {
t.SkipNow()
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
_, _, dhts := setupDHTS(ctx, 4, t)
defer func() {
for i := 0; i < 4; i++ {
dhts[i].Close()
defer dhts[i].host.Close()
}
}()
connect(t, ctx, dhts[0], dhts[1])
connect(t, ctx, dhts[1], dhts[2])
connect(t, ctx, dhts[1], dhts[3])
err := dhts[3].Provide(ctx, testCaseCids[0], true)
if err != nil {
t.Fatal(err)
}
time.Sleep(time.Millisecond * 60)
ctxT, cancel := context.WithTimeout(ctx, time.Millisecond*300)
defer cancel()
provs := dhts[0].FindProvidersAsync(ctxT, testCaseCids[0], 5)
select {
case p, ok := <-provs:
if !ok {
t.Fatal("Provider channel was closed...")
}
if p.ID == "" {
t.Fatal("Got back nil provider!")
}
if p.ID != dhts[3].self {
t.Fatalf("got a provider, but not the right one. %s", p)
}
case <-ctxT.Done():
t.Fatal("Didn't get back providers")
}
}
func TestLayeredGet(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
_, _, dhts := setupDHTS(ctx, 4, t)
defer func() {
for i := 0; i < 4; i++ {
dhts[i].Close()
defer dhts[i].host.Close()
}
}()
connect(t, ctx, dhts[0], dhts[1])
connect(t, ctx, dhts[1], dhts[2])
connect(t, ctx, dhts[2], dhts[3])
err := dhts[3].PutValue(ctx, "/v/hello", []byte("world"))
if err != nil {
t.Fatal(err)
}
time.Sleep(time.Millisecond * 6)
ctxT, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
val, err := dhts[0].GetValue(ctxT, "/v/hello")
if err != nil {
t.Fatal(err)
}
if string(val) != "world" {
t.Error("got wrong value")
}
}
func TestUnfindablePeer(t *testing.T) {
if testing.Short() {
t.SkipNow()
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
maddrs, peers, dhts := setupDHTS(ctx, 4, t)
defer func() {
for i := 0; i < 4; i++ {
dhts[i].Close()
dhts[i].host.Close()
}
}()
connect(t, ctx, dhts[0], dhts[1])
connect(t, ctx, dhts[1], dhts[2])
connect(t, ctx, dhts[2], dhts[3])
// Give DHT 1 a bad addr for DHT 2.
dhts[1].host.Peerstore().ClearAddrs(peers[2])
dhts[1].host.Peerstore().AddAddr(peers[2], maddrs[0], time.Minute)
ctxT, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
_, err := dhts[0].FindPeer(ctxT, peers[3])
if err == nil {
t.Error("should have failed to find peer")
}
if ctxT.Err() != nil {
t.Error("FindPeer should have failed before context expired")
}
}
func TestFindPeer(t *testing.T) {
// t.Skip("skipping test to debug another")
if testing.Short() {
t.SkipNow()
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
_, peers, dhts := setupDHTS(ctx, 4, t)
defer func() {
for i := 0; i < 4; i++ {
dhts[i].Close()
dhts[i].host.Close()
}
}()
connect(t, ctx, dhts[0], dhts[1])
connect(t, ctx, dhts[1], dhts[2])
connect(t, ctx, dhts[1], dhts[3])
ctxT, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
p, err := dhts[0].FindPeer(ctxT, peers[2])
if err != nil {
t.Fatal(err)
}
if p.ID == "" {
t.Fatal("Failed to find peer.")
}
if p.ID != peers[2] {
t.Fatal("Didn't find expected peer.")
}
}
func TestFindPeersConnectedToPeer(t *testing.T) {
t.Skip("not quite correct (see note)")
if testing.Short() {
t.SkipNow()
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
_, peers, dhts := setupDHTS(ctx, 4, t)
defer func() {
for i := 0; i < 4; i++ {
dhts[i].Close()
dhts[i].host.Close()
}
}()
// topology:
// 0-1, 1-2, 1-3, 2-3
connect(t, ctx, dhts[0], dhts[1])
connect(t, ctx, dhts[1], dhts[2])
connect(t, ctx, dhts[1], dhts[3])
connect(t, ctx, dhts[2], dhts[3])
// fmt.Println("0 is", peers[0])
// fmt.Println("1 is", peers[1])
// fmt.Println("2 is", peers[2])
// fmt.Println("3 is", peers[3])
ctxT, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
pchan, err := dhts[0].FindPeersConnectedToPeer(ctxT, peers[2])
if err != nil {
t.Fatal(err)
}
// shouldFind := []peer.ID{peers[1], peers[3]}
var found []*pstore.PeerInfo
for nextp := range pchan {
found = append(found, nextp)
}
// fmt.Printf("querying 0 (%s) FindPeersConnectedToPeer 2 (%s)\n", peers[0], peers[2])
// fmt.Println("should find 1, 3", shouldFind)
// fmt.Println("found", found)
// testPeerListsMatch(t, shouldFind, found)
logger.Warning("TestFindPeersConnectedToPeer is not quite correct")
if len(found) == 0 {
t.Fatal("didn't find any peers.")
}
}
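// testPeerListsMatch fails the test unless p1 and p2 contain exactly the same
// peer IDs, ignoring order.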
func testPeerListsMatch(t *testing.T, p1, p2 []peer.ID) {
if len(p1) != len(p2) {
t.Fatal("did not find as many peers as should have", p1, p2)
}
ids1 := make([]string, len(p1))
ids2 := make([]string, len(p2))
for i, p := range p1 {
ids1[i] = string(p)
}
for i, p := range p2 {
ids2[i] = string(p)
}
sort.Strings(ids1)
sort.Strings(ids2)
for i := range ids1 {
if ids1[i] != ids2[i] {
t.Fatal("Didn't find expected peer", ids1[i], ids2)
}
}
}
func TestConnectCollision(t *testing.T) {
// t.Skip("skipping test to debug another")
if testing.Short() {
t.SkipNow()
}
if travisci.IsRunning() {
t.Skip("Skipping on Travis-CI.")
}
runTimes := 10
for rtime := 0; rtime < runTimes; rtime++ {
logger.Info("Running Time: ", rtime)
ctx, cancel := context.WithCancel(context.Background())
dhtA := setupDHT(ctx, t, false)
dhtB := setupDHT(ctx, t, false)
addrA := dhtA.peerstore.Addrs(dhtA.self)[0]
addrB := dhtB.peerstore.Addrs(dhtB.self)[0]
peerA := dhtA.self
peerB := dhtB.self
errs := make(chan error)
go func() {
dhtA.peerstore.AddAddr(peerB, addrB, pstore.TempAddrTTL)
pi := pstore.PeerInfo{ID: peerB}
err := dhtA.host.Connect(ctx, pi)
errs <- err
}()
go func() {
dhtB.peerstore.AddAddr(peerA, addrA, pstore.TempAddrTTL)
pi := pstore.PeerInfo{ID: peerA}
err := dhtB.host.Connect(ctx, pi)
errs <- err
}()
timeout := time.After(5 * time.Second)
select {
case e := <-errs:
if e != nil {
t.Fatal(e)
}
case <-timeout:
t.Fatal("Timeout received!")
}
select {
case e := <-errs:
if e != nil {
t.Fatal(e)
}
case <-timeout:
t.Fatal("Timeout received!")
}
dhtA.Close()
dhtB.Close()
dhtA.host.Close()
dhtB.host.Close()
cancel()
}
}
func TestBadProtoMessages(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
d := setupDHT(ctx, t, false)
nilrec := new(pb.Message)
if _, err := d.handlePutValue(ctx, "testpeer", nilrec); err == nil {
t.Fatal("should have errored on nil record")
}
}
func TestClientModeConnect(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
a := setupDHT(ctx, t, false)
b := setupDHT(ctx, t, true)
connectNoSync(t, ctx, a, b)
c := testCaseCids[0]
p := peer.ID("TestPeer")
a.providers.AddProvider(ctx, c, p)
time.Sleep(time.Millisecond * 5) // just in case...
provs, err := b.FindProviders(ctx, c)
if err != nil {
t.Fatal(err)
}
if len(provs) == 0 {
t.Fatal("Expected to get a provider back")
}
if provs[0].ID != p {
t.Fatal("expected it to be our test peer")
}
if a.routingTable.Find(b.self) != "" {
t.Fatal("DHT clients should not be added to routing tables")
}
if b.routingTable.Find(a.self) == "" {
t.Fatal("DHT server should have been added to the dht client's routing table")
}
}
func TestClientModeFindPeer(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
a := setupDHT(ctx, t, false)
b := setupDHT(ctx, t, true)
c := setupDHT(ctx, t, true)
connectNoSync(t, ctx, b, a)
connectNoSync(t, ctx, c, a)
// Can't use `connect` because b and c are only clients.
wait(t, ctx, b, a)
wait(t, ctx, c, a)
pi, err := c.FindPeer(ctx, b.self)
if err != nil {
t.Fatal(err)
}
if len(pi.Addrs) == 0 {
t.Fatal("should have found addresses for node b")
}
err = c.host.Connect(ctx, pi)
if err != nil {
t.Fatal(err)
}
}
func minInt(a, b int) int {
if a < b {
return a
}
return b
}
func TestFindPeerQueryMinimal(t *testing.T) {
testFindPeerQuery(t, 2, 22, 11)
}
func TestFindPeerQuery(t *testing.T) {
if testing.Short() {
t.Skip("skipping test in short mode")
}
if curFileLimit() < 1024 {
t.Skip("insufficient file descriptors available")
}
testFindPeerQuery(t, 20, 80, 16)
}
func testFindPeerQuery(t *testing.T,
bootstrappers, // Number of nodes connected to the querying node
leafs, // Number of nodes that might be connected to from the bootstrappers
bootstrapperLeafConns int, // Number of connections each bootstrapper has to the leaf nodes
) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
_, allpeers, dhts := setupDHTS(ctx, 1+bootstrappers+leafs, t)
defer func() {
for _, d := range dhts {
d.Close()
d.host.Close()
}
}()
mrand := rand.New(rand.NewSource(42))
guy := dhts[0]
others := dhts[1:]
for i := 0; i < bootstrappers; i++ {
for j := 0; j < bootstrapperLeafConns; j++ {
v := mrand.Intn(leafs)
connect(t, ctx, others[i], others[bootstrappers+v])
}
}
for i := 0; i < bootstrappers; i++ {
connect(t, ctx, guy, others[i])
}
var reachableIds []peer.ID
for i, d := range dhts {
lp := len(d.host.Network().Peers())
//t.Log(i, lp)
if i != 0 && lp > 0 {
reachableIds = append(reachableIds, allpeers[i])
}
}
t.Logf("%d reachable ids", len(reachableIds))
val := "foobar"
rtval := kb.ConvertKey(val)
rtablePeers := guy.routingTable.NearestPeers(rtval, AlphaValue)
assert.Len(t, rtablePeers, minInt(bootstrappers, AlphaValue))
assert.Len(t, guy.host.Network().Peers(), bootstrappers)
out, err := guy.GetClosestPeers(ctx, val)
require.NoError(t, err)
var outpeers []peer.ID
for p := range out {
outpeers = append(outpeers, p)
}
sort.Sort(peer.IDSlice(outpeers))
exp := kb.SortClosestPeers(reachableIds, rtval)[:minInt(KValue, len(reachableIds))]
t.Logf("got %d peers", len(outpeers))
got := kb.SortClosestPeers(outpeers, rtval)
assert.EqualValues(t, exp, got)
}
func TestFindClosestPeers(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
nDHTs := 30
_, _, dhts := setupDHTS(ctx, nDHTs, t)
defer func() {
for i := 0; i < nDHTs; i++ {
dhts[i].Close()
defer dhts[i].host.Close()
}
}()
t.Logf("connecting %d dhts in a ring", nDHTs)
for i := 0; i < nDHTs; i++ {
connect(t, ctx, dhts[i], dhts[(i+1)%len(dhts)])
}
peers, err := dhts[1].GetClosestPeers(ctx, "foo")
if err != nil {
t.Fatal(err)
}
var out []peer.ID
for p := range peers {
out = append(out, p)
}
if len(out) != KValue {
t.Fatalf("got wrong number of peers (got %d, expected %d)", len(out), KValue)
}
}
func TestGetSetPluggedProtocol(t *testing.T) {
t.Run("PutValue/GetValue - same protocol", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
os := []opts.Option{
opts.Protocols("/esh/dht"),
opts.Client(false),
opts.NamespacedValidator("v", blankValidator{}),
}
dhtA, err := New(ctx, bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)), os...)
if err != nil {
t.Fatal(err)
}
dhtB, err := New(ctx, bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)), os...)
if err != nil {
t.Fatal(err)
}
connect(t, ctx, dhtA, dhtB)
ctxT, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
if err := dhtA.PutValue(ctxT, "/v/cat", []byte("meow")); err != nil {
t.Fatal(err)
}
value, err := dhtB.GetValue(ctxT, "/v/cat")
if err != nil {
t.Fatal(err)
}
if string(value) != "meow" {
t.Fatalf("Expected 'meow' got '%s'", string(value))
}
})
t.Run("DHT routing table for peer A won't contain B if A and B don't use same protocol", func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
dhtA, err := New(ctx, bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)), []opts.Option{
opts.Protocols("/esh/dht"),
opts.Client(false),
opts.NamespacedValidator("v", blankValidator{}),
}...)
if err != nil {
t.Fatal(err)
}
dhtB, err := New(ctx, bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)), []opts.Option{
opts.Protocols("/lsr/dht"),
opts.Client(false),
opts.NamespacedValidator("v", blankValidator{}),
}...)
if err != nil {
t.Fatal(err)
}
connectNoSync(t, ctx, dhtA, dhtB)
// We don't expect connection notifications for A to reach B (or vice-versa), given
// that they've been configured with different protocols - but we'll give them a
// chance, anyhow.
time.Sleep(time.Second * 2)
err = dhtA.PutValue(ctx, "/v/cat", []byte("meow"))
if err == nil || !strings.Contains(err.Error(), "failed to find any peer in table") {
t.Fatalf("put should not have been able to find any peers in routing table, err:'%v'", err)
}
_, err = dhtB.GetValue(ctx, "/v/cat")
if err == nil || !strings.Contains(err.Error(), "failed to find any peer in table") {
t.Fatalf("get should not have been able to find any peers in routing table, err:'%v'", err)
}
})
}