go-libp2p-kad-dht/ext_test.go

403 lines
9.3 KiB
Go
Raw Permalink Normal View History

2014-08-10 21:40:17 -07:00
package dht
import (
2016-09-30 10:24:03 -07:00
"context"
2014-12-16 14:35:52 -08:00
"math/rand"
2014-08-10 21:40:17 -07:00
"testing"
"time"
2014-08-10 21:40:17 -07:00
2019-05-26 23:33:15 +01:00
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/routing"
opts "github.com/libp2p/go-libp2p-kad-dht/opts"
2019-05-26 23:33:15 +01:00
2016-09-02 20:21:23 +01:00
ggio "github.com/gogo/protobuf/io"
u "github.com/ipfs/go-ipfs-util"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
2018-08-07 12:43:52 +02:00
record "github.com/libp2p/go-libp2p-record"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
2014-08-10 21:40:17 -07:00
)
// TestHang exercises the DHT against a remote peer whose stream handler never
// replies. It starts a GetClosestPeers query with a one second deadline, then
// runs Provide with a 100ms deadline and checks that the Provide context
// expired with context.DeadlineExceeded while the GetClosestPeers channel has
// not yet produced anything.
func TestHang(t *testing.T) {
	// The stream handler below blocks until ctx is done. Cancel ctx when the
	// test returns so those handler goroutines (and their streams) are released
	// instead of being left blocked forever on a background context.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	mn, err := mocknet.FullMeshConnected(ctx, 2)
	if err != nil {
		t.Fatal(err)
	}
	hosts := mn.Hosts()

	os := []opts.Option{opts.DisableAutoRefresh()}
	d, err := New(ctx, hosts[0], os...)
	if err != nil {
		t.Fatal(err)
	}

	// Hang on every request.
	hosts[1].SetStreamHandler(d.protocols[0], func(s network.Stream) {
		defer s.Reset()
		<-ctx.Done()
	})
	d.Update(ctx, hosts[1].ID())

	ctx1, cancel1 := context.WithTimeout(ctx, 1*time.Second)
	defer cancel1()

	peers, err := d.GetClosestPeers(ctx1, testCaseCids[0].KeyString())
	if err != nil {
		t.Fatal(err)
	}

	time.Sleep(100 * time.Millisecond)

	ctx2, cancel2 := context.WithTimeout(ctx, 100*time.Millisecond)
	defer cancel2()

	// The returned error is deliberately ignored: the assertion is on the
	// state of ctx2 once Provide has returned.
	_ = d.Provide(ctx2, testCaseCids[0], true)
	if ctx2.Err() != context.DeadlineExceeded {
		// %v rather than %s: ctx2.Err() may be nil on this path.
		t.Errorf("expected to fail with deadline exceeded, got: %v", ctx2.Err())
	}

	// Non-blocking receive: the earlier query must still be pending.
	select {
	case <-peers:
		t.Error("GetClosestPeers should not have returned yet")
	default:
	}
}
func TestGetFailures(t *testing.T) {
if testing.Short() {
t.SkipNow()
}
2014-09-19 18:11:05 -07:00
2014-12-16 14:35:52 -08:00
ctx := context.Background()
2014-12-17 08:02:59 -08:00
mn, err := mocknet.FullMeshConnected(ctx, 2)
2014-12-16 14:35:52 -08:00
if err != nil {
t.Fatal(err)
}
2015-01-01 12:45:39 -08:00
hosts := mn.Hosts()
2014-11-20 18:45:05 -08:00
os := []opts.Option{opts.DisableAutoRefresh()}
d, err := New(ctx, hosts[0], os...)
if err != nil {
t.Fatal(err)
}
d.Update(ctx, hosts[1].ID())
2015-01-01 12:45:39 -08:00
// Reply with failures to every message
2019-05-26 23:33:15 +01:00
hosts[1].SetStreamHandler(d.protocols[0], func(s network.Stream) {
2018-08-03 22:53:50 +02:00
time.Sleep(400 * time.Millisecond)
s.Close()
2015-01-01 12:45:39 -08:00
})
// This one should time out
2016-09-30 11:08:16 -07:00
ctx1, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
if _, err := d.GetValue(ctx1, "test"); err != nil {
if merr, ok := err.(u.MultiErr); ok && len(merr) > 0 {
err = merr[0]
}
2018-08-03 22:53:50 +02:00
if err != context.DeadlineExceeded {
2014-09-18 19:30:04 -07:00
t.Fatal("Got different error than we expected", err)
}
} else {
t.Fatal("Did not get expected error!")
2014-08-10 21:40:17 -07:00
}
t.Log("Timeout test passed.")
2014-12-16 14:35:52 -08:00
// Reply with failures to every message
2019-05-26 23:33:15 +01:00
hosts[1].SetStreamHandler(d.protocols[0], func(s network.Stream) {
2014-12-16 14:35:52 -08:00
defer s.Close()
2019-05-26 23:33:15 +01:00
pbr := ggio.NewDelimitedReader(s, network.MessageSizeMax)
2014-12-16 14:35:52 -08:00
pbw := ggio.NewDelimitedWriter(s)
pmes := new(pb.Message)
2014-12-16 14:35:52 -08:00
if err := pbr.ReadMsg(pmes); err != nil {
panic(err)
}
resp := &pb.Message{
2014-09-17 10:30:38 -07:00
Type: pmes.Type,
}
2014-12-16 14:35:52 -08:00
if err := pbw.WriteMsg(resp); err != nil {
panic(err)
}
})
// This one should fail with NotFound.
// long context timeout to ensure we dont end too early.
// the dht should be exhausting its query and returning not found.
// (was 3 seconds before which should be _plenty_ of time, but maybe
// travis machines really have a hard time...)
2016-09-30 11:08:16 -07:00
ctx2, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
_, err = d.GetValue(ctx2, "test")
if err != nil {
if merr, ok := err.(u.MultiErr); ok && len(merr) > 0 {
err = merr[0]
}
if err != routing.ErrNotFound {
t.Fatalf("Expected ErrNotFound, got: %s", err)
}
} else {
t.Fatal("expected error, got none.")
}
t.Log("ErrNotFound check passed!")
// Now we test this DHT's handleGetValue failure
2014-12-16 14:35:52 -08:00
{
typ := pb.Message_GET_VALUE
str := "hello"
2018-02-01 15:09:57 -05:00
rec := record.MakePutRecord(str, []byte("blah"))
2014-12-16 14:35:52 -08:00
req := pb.Message{
Type: typ,
Key: []byte(str),
2014-12-16 14:35:52 -08:00
Record: rec,
}
2014-09-17 10:30:38 -07:00
2018-06-01 13:45:34 -07:00
s, err := hosts[1].NewStream(context.Background(), hosts[0].ID(), d.protocols[0])
2014-12-16 14:35:52 -08:00
if err != nil {
t.Fatal(err)
}
defer s.Close()
2014-09-17 10:30:38 -07:00
2019-05-26 23:33:15 +01:00
pbr := ggio.NewDelimitedReader(s, network.MessageSizeMax)
2014-12-16 14:35:52 -08:00
pbw := ggio.NewDelimitedWriter(s)
2014-12-16 14:35:52 -08:00
if err := pbw.WriteMsg(&req); err != nil {
t.Fatal(err)
}
2014-09-18 19:30:04 -07:00
2014-12-16 14:35:52 -08:00
pmes := new(pb.Message)
if err := pbr.ReadMsg(pmes); err != nil {
t.Fatal(err)
}
if pmes.GetRecord() != nil {
t.Fatal("shouldnt have value")
}
if pmes.GetProviderPeers() != nil {
t.Fatal("shouldnt have provider peers")
}
}
2014-08-10 21:40:17 -07:00
}
2014-08-19 19:14:52 -07:00
func TestNotFound(t *testing.T) {
2015-01-05 04:48:37 -08:00
// t.Skip("skipping test to debug another")
if testing.Short() {
t.SkipNow()
}
2014-09-19 18:11:05 -07:00
2014-12-16 14:35:52 -08:00
ctx := context.Background()
2014-12-17 08:02:59 -08:00
mn, err := mocknet.FullMeshConnected(ctx, 16)
2014-12-16 14:35:52 -08:00
if err != nil {
t.Fatal(err)
}
2015-01-01 12:45:39 -08:00
hosts := mn.Hosts()
os := []opts.Option{opts.DisableAutoRefresh()}
d, err := New(ctx, hosts[0], os...)
if err != nil {
t.Fatal(err)
}
2014-08-19 19:14:52 -07:00
for _, p := range hosts {
d.Update(ctx, p.ID())
2014-08-19 19:14:52 -07:00
}
// Reply with random peers to every message
2015-01-01 12:45:39 -08:00
for _, host := range hosts {
host := host // shadow loop var
2019-05-26 23:33:15 +01:00
host.SetStreamHandler(d.protocols[0], func(s network.Stream) {
2014-12-16 14:35:52 -08:00
defer s.Close()
2014-08-19 19:14:52 -07:00
2019-05-26 23:33:15 +01:00
pbr := ggio.NewDelimitedReader(s, network.MessageSizeMax)
2014-12-16 14:35:52 -08:00
pbw := ggio.NewDelimitedWriter(s)
2014-08-19 19:14:52 -07:00
2014-12-16 14:35:52 -08:00
pmes := new(pb.Message)
if err := pbr.ReadMsg(pmes); err != nil {
panic(err)
2014-08-19 19:14:52 -07:00
}
2014-12-16 14:35:52 -08:00
switch pmes.GetType() {
case pb.Message_GET_VALUE:
resp := &pb.Message{Type: pmes.Type}
2019-05-26 23:33:15 +01:00
ps := []peer.AddrInfo{}
2014-12-16 14:35:52 -08:00
for i := 0; i < 7; i++ {
p := hosts[rand.Intn(len(hosts))].ID()
2015-01-01 12:45:39 -08:00
pi := host.Peerstore().PeerInfo(p)
ps = append(ps, pi)
2014-12-16 14:35:52 -08:00
}
2015-01-01 12:45:39 -08:00
resp.CloserPeers = pb.PeerInfosToPBPeers(d.host.Network(), ps)
2014-12-16 14:35:52 -08:00
if err := pbw.WriteMsg(resp); err != nil {
panic(err)
}
default:
panic("Shouldnt recieve this.")
}
})
}
2014-08-19 19:14:52 -07:00
// long timeout to ensure timing is not at play.
ctx, cancel := context.WithTimeout(ctx, time.Second*20)
defer cancel()
v, err := d.GetValue(ctx, "hello")
2019-02-01 17:46:46 +11:00
logger.Debugf("get value got %v", v)
2014-08-19 19:14:52 -07:00
if err != nil {
if merr, ok := err.(u.MultiErr); ok && len(merr) > 0 {
err = merr[0]
}
2014-08-19 19:14:52 -07:00
switch err {
case routing.ErrNotFound:
2014-08-19 19:14:52 -07:00
//Success!
return
case u.ErrTimeout:
t.Fatal("Should not have gotten timeout!")
default:
t.Fatalf("Got unexpected error: %s", err)
}
}
t.Fatal("Expected to recieve an error.")
}
// If less than K nodes are in the entire network, it should fail when we make
// a GET rpc and nobody has the value
func TestLessThanKResponses(t *testing.T) {
2015-01-05 04:48:37 -08:00
// t.Skip("skipping test to debug another")
2014-09-19 18:11:05 -07:00
// t.Skip("skipping test because it makes a lot of output")
2014-12-16 14:35:52 -08:00
ctx := context.Background()
2014-12-17 08:02:59 -08:00
mn, err := mocknet.FullMeshConnected(ctx, 6)
2014-12-16 14:35:52 -08:00
if err != nil {
t.Fatal(err)
}
2015-01-01 12:45:39 -08:00
hosts := mn.Hosts()
2014-11-20 18:45:05 -08:00
os := []opts.Option{opts.DisableAutoRefresh()}
d, err := New(ctx, hosts[0], os...)
if err != nil {
t.Fatal(err)
}
2014-12-16 14:35:52 -08:00
for i := 1; i < 5; i++ {
d.Update(ctx, hosts[i].ID())
}
// Reply with random peers to every message
2015-01-01 12:45:39 -08:00
for _, host := range hosts {
host := host // shadow loop var
2019-05-26 23:33:15 +01:00
host.SetStreamHandler(d.protocols[0], func(s network.Stream) {
2014-12-16 14:35:52 -08:00
defer s.Close()
2019-05-26 23:33:15 +01:00
pbr := ggio.NewDelimitedReader(s, network.MessageSizeMax)
2014-12-16 14:35:52 -08:00
pbw := ggio.NewDelimitedWriter(s)
2014-12-16 14:35:52 -08:00
pmes := new(pb.Message)
if err := pbr.ReadMsg(pmes); err != nil {
panic(err)
}
2014-12-16 14:35:52 -08:00
switch pmes.GetType() {
case pb.Message_GET_VALUE:
pi := host.Peerstore().PeerInfo(hosts[1].ID())
2014-12-16 14:35:52 -08:00
resp := &pb.Message{
Type: pmes.Type,
2019-05-26 23:33:15 +01:00
CloserPeers: pb.PeerInfosToPBPeers(d.host.Network(), []peer.AddrInfo{pi}),
2014-12-16 14:35:52 -08:00
}
if err := pbw.WriteMsg(resp); err != nil {
panic(err)
}
default:
panic("Shouldnt recieve this.")
2014-09-17 10:30:38 -07:00
}
2014-12-16 14:35:52 -08:00
})
}
ctx, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()
if _, err := d.GetValue(ctx, "hello"); err != nil {
switch err {
case routing.ErrNotFound:
//Success!
return
case u.ErrTimeout:
t.Fatal("Should not have gotten timeout!")
default:
t.Fatalf("Got unexpected error: %s", err)
}
}
t.Fatal("Expected to recieve an error.")
}
// Test multiple queries against a node that closes its stream after every query.
func TestMultipleQueries(t *testing.T) {
if testing.Short() {
t.SkipNow()
}
ctx := context.Background()
mn, err := mocknet.FullMeshConnected(ctx, 2)
if err != nil {
t.Fatal(err)
}
hosts := mn.Hosts()
os := []opts.Option{opts.DisableAutoRefresh()}
d, err := New(ctx, hosts[0], os...)
if err != nil {
t.Fatal(err)
}
d.Update(ctx, hosts[1].ID())
// It would be nice to be able to just get a value and succeed but then
// we'd need to deal with selectors and validators...
2019-05-26 23:33:15 +01:00
hosts[1].SetStreamHandler(d.protocols[0], func(s network.Stream) {
defer s.Close()
2019-05-26 23:33:15 +01:00
pbr := ggio.NewDelimitedReader(s, network.MessageSizeMax)
pbw := ggio.NewDelimitedWriter(s)
pmes := new(pb.Message)
if err := pbr.ReadMsg(pmes); err != nil {
panic(err)
}
switch pmes.GetType() {
case pb.Message_GET_VALUE:
pi := hosts[1].Peerstore().PeerInfo(hosts[0].ID())
resp := &pb.Message{
Type: pmes.Type,
2019-05-26 23:33:15 +01:00
CloserPeers: pb.PeerInfosToPBPeers(d.host.Network(), []peer.AddrInfo{pi}),
}
if err := pbw.WriteMsg(resp); err != nil {
panic(err)
}
default:
panic("Shouldnt recieve this.")
}
})
// long timeout to ensure timing is not at play.
ctx, cancel := context.WithTimeout(ctx, time.Second*20)
defer cancel()
for i := 0; i < 10; i++ {
if _, err := d.GetValue(ctx, "hello"); err != nil {
switch err {
case routing.ErrNotFound:
//Success!
continue
case u.ErrTimeout:
t.Fatal("Should not have gotten timeout!")
default:
t.Fatalf("Got unexpected error: %s", err)
}
}
t.Fatal("Expected to recieve an error.")
}
}