go-libp2p-kad-dht/ext_test.go
Steven Allen ef319676a9 chore: rename bootstrap to refresh
As pointed out by raul, bootstrapping and refreshing are not the same thing.
Bootstrapping is the initial setup (i.e., connect to some initial nodes to get
started). Refreshing is the process of refreshing the routing table.
2019-11-05 23:33:53 +00:00

360 lines
8.3 KiB
Go

package dht
import (
"context"
"math/rand"
"testing"
"time"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/routing"
opts "github.com/libp2p/go-libp2p-kad-dht/opts"
ggio "github.com/gogo/protobuf/io"
u "github.com/ipfs/go-ipfs-util"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
record "github.com/libp2p/go-libp2p-record"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
)
func TestGetFailures(t *testing.T) {
if testing.Short() {
t.SkipNow()
}
ctx := context.Background()
mn, err := mocknet.FullMeshConnected(ctx, 2)
if err != nil {
t.Fatal(err)
}
hosts := mn.Hosts()
os := []opts.Option{opts.DisableAutoRefresh()}
d, err := New(ctx, hosts[0], os...)
if err != nil {
t.Fatal(err)
}
d.Update(ctx, hosts[1].ID())
// Reply with failures to every message
hosts[1].SetStreamHandler(d.protocols[0], func(s network.Stream) {
time.Sleep(400 * time.Millisecond)
s.Close()
})
// This one should time out
ctx1, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
if _, err := d.GetValue(ctx1, "test"); err != nil {
if merr, ok := err.(u.MultiErr); ok && len(merr) > 0 {
err = merr[0]
}
if err != context.DeadlineExceeded {
t.Fatal("Got different error than we expected", err)
}
} else {
t.Fatal("Did not get expected error!")
}
t.Log("Timeout test passed.")
// Reply with failures to every message
hosts[1].SetStreamHandler(d.protocols[0], func(s network.Stream) {
defer s.Close()
pbr := ggio.NewDelimitedReader(s, network.MessageSizeMax)
pbw := ggio.NewDelimitedWriter(s)
pmes := new(pb.Message)
if err := pbr.ReadMsg(pmes); err != nil {
panic(err)
}
resp := &pb.Message{
Type: pmes.Type,
}
if err := pbw.WriteMsg(resp); err != nil {
panic(err)
}
})
// This one should fail with NotFound.
// long context timeout to ensure we dont end too early.
// the dht should be exhausting its query and returning not found.
// (was 3 seconds before which should be _plenty_ of time, but maybe
// travis machines really have a hard time...)
ctx2, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
_, err = d.GetValue(ctx2, "test")
if err != nil {
if merr, ok := err.(u.MultiErr); ok && len(merr) > 0 {
err = merr[0]
}
if err != routing.ErrNotFound {
t.Fatalf("Expected ErrNotFound, got: %s", err)
}
} else {
t.Fatal("expected error, got none.")
}
t.Log("ErrNotFound check passed!")
// Now we test this DHT's handleGetValue failure
{
typ := pb.Message_GET_VALUE
str := "hello"
rec := record.MakePutRecord(str, []byte("blah"))
req := pb.Message{
Type: typ,
Key: []byte(str),
Record: rec,
}
s, err := hosts[1].NewStream(context.Background(), hosts[0].ID(), d.protocols[0])
if err != nil {
t.Fatal(err)
}
defer s.Close()
pbr := ggio.NewDelimitedReader(s, network.MessageSizeMax)
pbw := ggio.NewDelimitedWriter(s)
if err := pbw.WriteMsg(&req); err != nil {
t.Fatal(err)
}
pmes := new(pb.Message)
if err := pbr.ReadMsg(pmes); err != nil {
t.Fatal(err)
}
if pmes.GetRecord() != nil {
t.Fatal("shouldnt have value")
}
if pmes.GetProviderPeers() != nil {
t.Fatal("shouldnt have provider peers")
}
}
}
func TestNotFound(t *testing.T) {
// t.Skip("skipping test to debug another")
if testing.Short() {
t.SkipNow()
}
ctx := context.Background()
mn, err := mocknet.FullMeshConnected(ctx, 16)
if err != nil {
t.Fatal(err)
}
hosts := mn.Hosts()
os := []opts.Option{opts.DisableAutoRefresh()}
d, err := New(ctx, hosts[0], os...)
if err != nil {
t.Fatal(err)
}
for _, p := range hosts {
d.Update(ctx, p.ID())
}
// Reply with random peers to every message
for _, host := range hosts {
host := host // shadow loop var
host.SetStreamHandler(d.protocols[0], func(s network.Stream) {
defer s.Close()
pbr := ggio.NewDelimitedReader(s, network.MessageSizeMax)
pbw := ggio.NewDelimitedWriter(s)
pmes := new(pb.Message)
if err := pbr.ReadMsg(pmes); err != nil {
panic(err)
}
switch pmes.GetType() {
case pb.Message_GET_VALUE:
resp := &pb.Message{Type: pmes.Type}
ps := []peer.AddrInfo{}
for i := 0; i < 7; i++ {
p := hosts[rand.Intn(len(hosts))].ID()
pi := host.Peerstore().PeerInfo(p)
ps = append(ps, pi)
}
resp.CloserPeers = pb.PeerInfosToPBPeers(d.host.Network(), ps)
if err := pbw.WriteMsg(resp); err != nil {
panic(err)
}
default:
panic("Shouldnt recieve this.")
}
})
}
// long timeout to ensure timing is not at play.
ctx, cancel := context.WithTimeout(ctx, time.Second*20)
defer cancel()
v, err := d.GetValue(ctx, "hello")
logger.Debugf("get value got %v", v)
if err != nil {
if merr, ok := err.(u.MultiErr); ok && len(merr) > 0 {
err = merr[0]
}
switch err {
case routing.ErrNotFound:
//Success!
return
case u.ErrTimeout:
t.Fatal("Should not have gotten timeout!")
default:
t.Fatalf("Got unexpected error: %s", err)
}
}
t.Fatal("Expected to recieve an error.")
}
// If less than K nodes are in the entire network, it should fail when we make
// a GET rpc and nobody has the value
func TestLessThanKResponses(t *testing.T) {
// t.Skip("skipping test to debug another")
// t.Skip("skipping test because it makes a lot of output")
ctx := context.Background()
mn, err := mocknet.FullMeshConnected(ctx, 6)
if err != nil {
t.Fatal(err)
}
hosts := mn.Hosts()
os := []opts.Option{opts.DisableAutoRefresh()}
d, err := New(ctx, hosts[0], os...)
if err != nil {
t.Fatal(err)
}
for i := 1; i < 5; i++ {
d.Update(ctx, hosts[i].ID())
}
// Reply with random peers to every message
for _, host := range hosts {
host := host // shadow loop var
host.SetStreamHandler(d.protocols[0], func(s network.Stream) {
defer s.Close()
pbr := ggio.NewDelimitedReader(s, network.MessageSizeMax)
pbw := ggio.NewDelimitedWriter(s)
pmes := new(pb.Message)
if err := pbr.ReadMsg(pmes); err != nil {
panic(err)
}
switch pmes.GetType() {
case pb.Message_GET_VALUE:
pi := host.Peerstore().PeerInfo(hosts[1].ID())
resp := &pb.Message{
Type: pmes.Type,
CloserPeers: pb.PeerInfosToPBPeers(d.host.Network(), []peer.AddrInfo{pi}),
}
if err := pbw.WriteMsg(resp); err != nil {
panic(err)
}
default:
panic("Shouldnt recieve this.")
}
})
}
ctx, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()
if _, err := d.GetValue(ctx, "hello"); err != nil {
switch err {
case routing.ErrNotFound:
//Success!
return
case u.ErrTimeout:
t.Fatal("Should not have gotten timeout!")
default:
t.Fatalf("Got unexpected error: %s", err)
}
}
t.Fatal("Expected to recieve an error.")
}
// Test multiple queries against a node that closes its stream after every query.
func TestMultipleQueries(t *testing.T) {
if testing.Short() {
t.SkipNow()
}
ctx := context.Background()
mn, err := mocknet.FullMeshConnected(ctx, 2)
if err != nil {
t.Fatal(err)
}
hosts := mn.Hosts()
os := []opts.Option{opts.DisableAutoRefresh()}
d, err := New(ctx, hosts[0], os...)
if err != nil {
t.Fatal(err)
}
d.Update(ctx, hosts[1].ID())
// It would be nice to be able to just get a value and succeed but then
// we'd need to deal with selectors and validators...
hosts[1].SetStreamHandler(d.protocols[0], func(s network.Stream) {
defer s.Close()
pbr := ggio.NewDelimitedReader(s, network.MessageSizeMax)
pbw := ggio.NewDelimitedWriter(s)
pmes := new(pb.Message)
if err := pbr.ReadMsg(pmes); err != nil {
panic(err)
}
switch pmes.GetType() {
case pb.Message_GET_VALUE:
pi := hosts[1].Peerstore().PeerInfo(hosts[0].ID())
resp := &pb.Message{
Type: pmes.Type,
CloserPeers: pb.PeerInfosToPBPeers(d.host.Network(), []peer.AddrInfo{pi}),
}
if err := pbw.WriteMsg(resp); err != nil {
panic(err)
}
default:
panic("Shouldnt recieve this.")
}
})
// long timeout to ensure timing is not at play.
ctx, cancel := context.WithTimeout(ctx, time.Second*20)
defer cancel()
for i := 0; i < 10; i++ {
if _, err := d.GetValue(ctx, "hello"); err != nil {
switch err {
case routing.ErrNotFound:
//Success!
continue
case u.ErrTimeout:
t.Fatal("Should not have gotten timeout!")
default:
t.Fatalf("Got unexpected error: %s", err)
}
}
t.Fatal("Expected to recieve an error.")
}
}