Implement 'client only' dht logic to not have to serve dht queries

This commit is contained in:
Jeromy 2016-09-23 23:45:35 -07:00
parent e015b2a2f6
commit 5235d5f0d3
3 changed files with 86 additions and 11 deletions

23
dht.go
View File

@ -61,6 +61,8 @@ type IpfsDHT struct {
ctx context.Context
proc goprocess.Process
clientOnly bool
strmap map[peer.ID]*messageSender
smlk sync.Mutex
}
@ -89,6 +91,27 @@ func NewDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
return dht
}
// NewDHTClient creates a new DHT object with the given peer as the 'local' host
func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
dht := makeDHT(ctx, h, dstore)
// register for network notifs.
dht.host.Network().Notify((*netNotifiee)(dht))
dht.proc = goprocessctx.WithContextAndTeardown(ctx, func() error {
// remove ourselves from network notifs.
dht.host.Network().StopNotify((*netNotifiee)(dht))
return nil
})
dht.proc.AddChild(dht.providers.Process())
dht.Validator["pk"] = record.PublicKeyValidator
dht.Selector["pk"] = record.PublicKeySelector
return dht
}
func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
return &IpfsDHT{
datastore: dstore,

View File

@ -37,11 +37,16 @@ func init() {
}
}
func setupDHT(ctx context.Context, t *testing.T) *IpfsDHT {
func setupDHT(ctx context.Context, t *testing.T, client bool) *IpfsDHT {
h := netutil.GenHostSwarm(t, ctx)
dss := dssync.MutexWrap(ds.NewMapDatastore())
d := NewDHT(ctx, h, dss)
var d *IpfsDHT
if client {
d = NewDHTClient(ctx, h, dss)
} else {
d = NewDHT(ctx, h, dss)
}
d.Validator["v"] = &record.ValidChecker{
Func: func(key.Key, []byte) error {
@ -61,7 +66,7 @@ func setupDHTS(ctx context.Context, n int, t *testing.T) ([]ma.Multiaddr, []peer
sanityPeersMap := make(map[string]struct{})
for i := 0; i < n; i++ {
dhts[i] = setupDHT(ctx, t)
dhts[i] = setupDHT(ctx, t, false)
peers[i] = dhts[i].self
addrs[i] = dhts[i].peerstore.Addrs(dhts[i].self)[0]
@ -80,8 +85,7 @@ func setupDHTS(ctx context.Context, n int, t *testing.T) ([]ma.Multiaddr, []peer
return addrs, peers, dhts
}
func connect(t *testing.T, ctx context.Context, a, b *IpfsDHT) {
func connectNoSync(t *testing.T, ctx context.Context, a, b *IpfsDHT) {
idB := b.self
addrB := b.peerstore.Addrs(idB)
if len(addrB) == 0 {
@ -93,6 +97,10 @@ func connect(t *testing.T, ctx context.Context, a, b *IpfsDHT) {
if err := a.host.Connect(ctx, pi); err != nil {
t.Fatal(err)
}
}
func connect(t *testing.T, ctx context.Context, a, b *IpfsDHT) {
connectNoSync(t, ctx, a, b)
// loop until connection notification has been received.
// under high load, this may not happen as immediately as we would like.
@ -132,8 +140,8 @@ func TestValueGetSet(t *testing.T) {
ctx := context.Background()
dhtA := setupDHT(ctx, t)
dhtB := setupDHT(ctx, t)
dhtA := setupDHT(ctx, t, false)
dhtB := setupDHT(ctx, t, false)
defer dhtA.Close()
defer dhtB.Close()
@ -780,8 +788,8 @@ func TestConnectCollision(t *testing.T) {
ctx := context.Background()
dhtA := setupDHT(ctx, t)
dhtB := setupDHT(ctx, t)
dhtA := setupDHT(ctx, t, false)
dhtB := setupDHT(ctx, t, false)
addrA := dhtA.peerstore.Addrs(dhtA.self)[0]
addrB := dhtB.peerstore.Addrs(dhtB.self)[0]
@ -832,10 +840,37 @@ func TestBadProtoMessages(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
d := setupDHT(ctx, t)
d := setupDHT(ctx, t, false)
nilrec := new(pb.Message)
if _, err := d.handlePutValue(ctx, "testpeer", nilrec); err == nil {
t.Fatal("should have errored on nil record")
}
}
func TestClientModeConnect(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
a := setupDHT(ctx, t, false)
b := setupDHT(ctx, t, true)
connectNoSync(t, ctx, a, b)
k := key.Key("TestHash")
p := peer.ID("TestPeer")
a.providers.AddProvider(ctx, k, p)
provs, err := b.FindProviders(ctx, k)
if err != nil {
t.Fatal(err)
}
if len(provs) == 0 {
t.Fatal("Expected to get a provider back")
}
if provs[0].ID != p {
t.Fatal("expected it to be our test peer")
}
}

View File

@ -3,6 +3,7 @@ package dht
import (
ma "github.com/jbenet/go-multiaddr"
inet "github.com/libp2p/go-libp2p/p2p/net"
mstream "github.com/whyrusleeping/go-multistream"
)
// netNotifiee defines methods to be used with the IpfsDHT
@ -19,7 +20,23 @@ func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) {
return
default:
}
// Note: We *could* just check the peerstore to see if the remote side supports the dht
// protocol, but its not clear that that information will make it into the peerstore
// by the time this notification is sent. So just to be very careful, we brute force this
// and open a new stream
s, err := dht.host.NewStream(dht.Context(), v.RemotePeer(), ProtocolDHT, ProtocolDHTOld)
switch err {
case nil:
s.Close()
// connected fine? full dht node
dht.Update(dht.Context(), v.RemotePeer())
case mstream.ErrNotSupported:
// Client mode only, don't bother adding them to our routing table
default:
// real error? thats odd
log.Errorf("checking dht client type: %#v", err)
}
}
func (nn *netNotifiee) Disconnected(n inet.Network, v inet.Conn) {