// Package dht: routing.go implements the IPFS Routing interface for IpfsDHT.
package dht
import (
	"bytes"
	"encoding/json"
	"errors"
	"time"

	proto "code.google.com/p/goprotobuf/proto"

	peer "github.com/jbenet/go-ipfs/peer"
	kb "github.com/jbenet/go-ipfs/routing/kbucket"
	swarm "github.com/jbenet/go-ipfs/swarm"
	u "github.com/jbenet/go-ipfs/util"

	ma "github.com/jbenet/go-multiaddr"
)
// This file implements the Routing interface for the IpfsDHT struct.
// Basic Put/Get
// PutValue adds value corresponding to given Key.
// This is the top level "Store" operation of the DHT
func (dht *IpfsDHT) PutValue(key u.Key, value []byte) error {
2014-08-09 22:28:46 -07:00
complete := make(chan struct{})
2014-08-16 08:20:29 -07:00
count := 0
2014-08-16 23:03:36 -07:00
for _, route := range dht.routingTables {
2014-08-16 08:20:29 -07:00
peers := route.NearestPeers(kb.ConvertKey(key), KValue)
for _, p := range peers {
if p == nil {
2014-08-16 23:03:36 -07:00
dht.network.Error(kb.ErrLookupFailure)
2014-08-16 08:20:29 -07:00
continue
}
count++
go func(sp *peer.Peer) {
2014-08-16 23:03:36 -07:00
err := dht.putValueToNetwork(sp, string(key), value)
2014-08-16 08:20:29 -07:00
if err != nil {
2014-08-16 23:03:36 -07:00
dht.network.Error(err)
2014-08-16 08:20:29 -07:00
}
2014-08-09 22:28:46 -07:00
complete <- struct{}{}
2014-08-16 08:20:29 -07:00
}(p)
2014-08-09 22:28:46 -07:00
}
}
2014-08-16 08:20:29 -07:00
for i := 0; i < count; i++ {
2014-08-09 22:28:46 -07:00
<-complete
}
return nil
2014-07-23 04:48:30 -07:00
}
// GetValue searches for the value corresponding to given Key.
// If the search does not succeed, a multiaddr string of a closer peer is
// returned along with util.ErrSearchIncomplete
func (dht *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
	// RPC logging bookkeeping; EndLog/Print run regardless of outcome.
	ll := startNewRPC("GET")
	defer func() {
		ll.EndLog()
		ll.Print()
	}()

	// If we have it local, dont bother doing an RPC!
	// NOTE: this might not be what we want to do...
	val, err := dht.getLocal(key)
	if err == nil {
		ll.Success = true
		u.DOut("Found local, returning.\n")
		return val, nil
	}

	// Seed the lookup from the first routing table's nearest peers.
	routeLevel := 0
	closest := dht.routingTables[routeLevel].NearestPeers(kb.ConvertKey(key), PoolSize)
	if closest == nil || len(closest) == 0 {
		return nil, kb.ErrLookupFailure
	}

	// valChan:   first worker to find the value wins.
	// npeerChan: newly discovered candidate peers (fed back by workers).
	// procPeer:  peers handed to the worker pool by the dispatcher.
	// errChan:   terminal lookup failure (KValue peers tried, or pool idle).
	valChan := make(chan []byte)
	npeerChan := make(chan *peer.Peer, 30)
	procPeer := make(chan *peer.Peer, 30)
	errChan := make(chan error)
	after := time.After(timeout)
	pset := newPeerSet()

	for _, p := range closest {
		pset.Add(p)
		npeerChan <- p
	}

	// c tracks in-flight work; count tracks total peers dispatched.
	c := counter{}
	count := 0
	// Dispatcher: moves candidates to the worker channel; gives up after
	// KValue peers, or when no candidates remain and no work is in flight.
	go func() {
		defer close(procPeer)
		for {
			select {
			case p, ok := <-npeerChan:
				if !ok {
					return
				}
				count++
				if count >= KValue {
					errChan <- u.ErrNotFound
					return
				}
				c.Increment()
				procPeer <- p
			default:
				// No candidate ready: if nothing is in flight either,
				// the lookup has stalled — report not found (non-blocking,
				// the caller may already have returned).
				if c.Size() <= 0 {
					select {
					case errChan <- u.ErrNotFound:
					default:
					}
					return
				}
			}
		}
	}()

	// Worker: query each assigned peer for the value or closer peers.
	process := func() {
		// NOTE(review): this defer decrements once on worker exit in
		// addition to the per-peer Decrement at the bottom of the loop —
		// looks like an extra decrement per worker; confirm against the
		// counter's intended semantics.
		defer c.Decrement()
		for p := range procPeer {
			if p == nil {
				return
			}
			val, peers, err := dht.getValueOrPeers(p, key, timeout/4, routeLevel)
			if err != nil {
				u.DErr("%v\n", err.Error())
				continue
			}
			if val != nil {
				// Non-blocking send: only the first value is consumed.
				select {
				case valChan <- val:
				default:
					u.DOut("Wasnt the first to return the value!")
				}
				return
			}

			// Feed newly learned peers back to the dispatcher.
			for _, np := range peers {
				// TODO: filter out peers that arent closer
				if !pset.Contains(np) && pset.Size() < KValue {
					pset.Add(np) //This is racey... make a single function to do operation
					npeerChan <- np
				}
			}
			c.Decrement()
		}
	}

	// Run AlphaValue workers concurrently.
	for i := 0; i < AlphaValue; i++ {
		go process()
	}

	// First of: value found, terminal failure, or overall timeout.
	select {
	case val := <-valChan:
		return val, nil
	case err := <-errChan:
		return nil, err
	case <-after:
		return nil, u.ErrTimeout
	}
}
// Value provider layer of indirection.
// This is what DSHTs (Coral and MainlineDHT) do to store large values in a DHT.
2014-08-16 23:03:36 -07:00
// Provide makes this node announce that it can provide a value for the given key
func (dht *IpfsDHT) Provide(key u.Key) error {
dht.providers.AddProvider(key, dht.self)
2014-08-16 23:03:36 -07:00
peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(key), PoolSize)
if len(peers) == 0 {
return kb.ErrLookupFailure
}
2014-08-16 23:03:36 -07:00
pmes := Message{
Type: PBDHTMessage_ADD_PROVIDER,
Key: string(key),
}
pbmes := pmes.ToProtobuf()
for _, p := range peers {
mes := swarm.NewMessage(p, pbmes)
dht.netChan.Outgoing <- mes
}
return nil
2014-07-23 04:48:30 -07:00
}
// FindProvidersAsync returns a channel on which up to count providers for
// key are delivered as they are found. Locally known providers are sent
// first; then up to AlphaValue nearest peers are queried concurrently.
// The channel is buffered to count and is not closed by this function.
func (dht *IpfsDHT) FindProvidersAsync(key u.Key, count int, timeout time.Duration) chan *peer.Peer {
	peerOut := make(chan *peer.Peer, count)
	go func() {
		ps := newPeerSet()
		// Drain providers we already know about locally.
		provs := dht.providers.GetProviders(key)
		for _, p := range provs {
			count--
			// NOTE: assuming that this list of peers is unique
			ps.Add(p)
			peerOut <- p
			if count <= 0 {
				return
			}
		}

		peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(key), AlphaValue)
		for _, pp := range peers {
			// BUG FIX: pass the loop variable as an argument. The goroutines
			// previously closed over the shared `pp`, so they could all end
			// up querying the same (last) peer instead of each nearest peer.
			go func(pp *peer.Peer) {
				pmes, err := dht.findProvidersSingle(pp, key, 0, timeout)
				if err != nil {
					u.PErr("%v\n", err)
					return
				}
				dht.addPeerListAsync(key, pmes.GetPeers(), ps, count, peerOut)
			}(pp)
		}
	}()
	return peerOut
}
//TODO: this function could also be done asynchronously
func (dht *IpfsDHT) addPeerListAsync(k u.Key, peers []*PBDHTMessage_PBPeer, ps *peerSet, count int, out chan *peer.Peer) {
for _, pbp := range peers {
2014-08-26 14:24:51 -07:00
if peer.ID(pbp.GetId()).Equal(dht.self.ID) {
continue
}
maddr, err := ma.NewMultiaddr(pbp.GetAddr())
if err != nil {
u.PErr("%v\n", err)
continue
}
p, err := dht.network.GetConnection(peer.ID(pbp.GetId()), maddr)
if err != nil {
u.PErr("%v\n", err)
continue
}
dht.providers.AddProvider(k, p)
if ps.AddIfSmallerThan(p, count) {
out <- p
} else if ps.Size() >= count {
return
}
}
}
2014-07-23 04:48:30 -07:00
// FindProviders searches for peers who can provide the value for given key.
2014-08-16 23:03:36 -07:00
func (dht *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer, error) {
ll := startNewRPC("FindProviders")
2014-08-14 08:32:17 -07:00
defer func() {
ll.EndLog()
ll.Print()
}()
u.DOut("Find providers for: '%s'\n", key)
2014-08-16 23:03:36 -07:00
p := dht.routingTables[0].NearestPeer(kb.ConvertKey(key))
if p == nil {
return nil, kb.ErrLookupFailure
}
2014-08-16 23:03:36 -07:00
for level := 0; level < len(dht.routingTables); {
pmes, err := dht.findProvidersSingle(p, key, level, timeout)
if err != nil {
return nil, err
}
2014-08-14 08:32:17 -07:00
if pmes.GetSuccess() {
2014-08-26 14:24:51 -07:00
u.DOut("Got providers back from findProviders call!\n")
2014-08-16 23:03:36 -07:00
provs := dht.addPeerList(key, pmes.GetPeers())
2014-08-14 08:32:17 -07:00
ll.Success = true
return provs, nil
2014-08-16 23:03:36 -07:00
}
2014-08-26 14:24:51 -07:00
u.DOut("Didnt get providers, just closer peers.\n")
2014-08-16 23:03:36 -07:00
closer := pmes.GetPeers()
if len(closer) == 0 {
level++
continue
}
if peer.ID(closer[0].GetId()).Equal(dht.self.ID) {
u.DOut("Got myself back as a closer peer.")
return nil, u.ErrNotFound
}
maddr, err := ma.NewMultiaddr(closer[0].GetAddr())
if err != nil {
// ??? Move up route level???
panic("not yet implemented")
}
np, err := dht.network.GetConnection(peer.ID(closer[0].GetId()), maddr)
if err != nil {
u.PErr("[%s] Failed to connect to: %s\n", dht.self.ID.Pretty(), closer[0].GetAddr())
2014-08-16 23:03:36 -07:00
level++
continue
2014-08-14 08:32:17 -07:00
}
2014-08-16 23:03:36 -07:00
p = np
}
2014-08-14 08:32:17 -07:00
return nil, u.ErrNotFound
2014-07-23 04:48:30 -07:00
}
// Find specific Peer
// FindPeer searches for a peer with given ID.
2014-08-16 23:03:36 -07:00
func (dht *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error) {
// Check if were already connected to them
2014-08-16 23:03:36 -07:00
p, _ := dht.Find(id)
if p != nil {
return p, nil
}
2014-08-16 23:03:36 -07:00
routeLevel := 0
p = dht.routingTables[routeLevel].NearestPeer(kb.ConvertPeerID(id))
if p == nil {
return nil, kb.ErrLookupFailure
}
if p.ID.Equal(id) {
return p, nil
}
2014-08-16 23:03:36 -07:00
for routeLevel < len(dht.routingTables) {
pmes, err := dht.findPeerSingle(p, id, timeout, routeLevel)
plist := pmes.GetPeers()
if plist == nil || len(plist) == 0 {
2014-08-16 23:03:36 -07:00
routeLevel++
continue
}
found := plist[0]
addr, err := ma.NewMultiaddr(found.GetAddr())
2014-08-05 20:31:48 -07:00
if err != nil {
return nil, err
2014-08-05 20:31:48 -07:00
}
2014-08-16 23:03:36 -07:00
nxtPeer, err := dht.network.GetConnection(peer.ID(found.GetId()), addr)
2014-08-06 10:02:53 -07:00
if err != nil {
return nil, err
2014-08-06 10:02:53 -07:00
}
if pmes.GetSuccess() {
if !id.Equal(nxtPeer.ID) {
return nil, errors.New("got back invalid peer from 'successful' response")
}
return nxtPeer, nil
2014-08-06 10:02:53 -07:00
}
2014-08-16 23:03:36 -07:00
p = nxtPeer
}
return nil, u.ErrNotFound
2014-07-23 04:48:30 -07:00
}
// findPeerMultiple queries AlphaValue nearest peers concurrently for id and
// returns the first match, or ErrTimeout if none answers within timeout.
func (dht *IpfsDHT) findPeerMultiple(id peer.ID, timeout time.Duration) (*peer.Peer, error) {
	// Check if were already connected to them
	p, _ := dht.Find(id)
	if p != nil {
		return p, nil
	}

	routeLevel := 0
	peers := dht.routingTables[routeLevel].NearestPeers(kb.ConvertPeerID(id), AlphaValue)
	if len(peers) == 0 {
		return nil, kb.ErrLookupFailure
	}

	found := make(chan *peer.Peer)
	after := time.After(timeout)
	for _, p := range peers {
		// routeLevel is passed by value: the previous code incremented the
		// shared variable from inside the goroutines, which was a data race
		// and had no effect on queries that were already dispatched.
		go func(p *peer.Peer, level int) {
			pmes, err := dht.findPeerSingle(p, id, timeout, level)
			if err != nil {
				u.DErr("getPeer error: %v\n", err)
				return
			}
			for _, fp := range pmes.GetPeers() {
				nxtp, err := dht.peerFromInfo(fp)
				if err != nil {
					u.DErr("findPeer error: %v\n", err)
					continue
				}
				// BUG FIX: compare against the peer we are looking for.
				// The original compared against dht.self.ID, so a network
				// lookup could never report the target as found.
				if nxtp.ID.Equal(id) {
					found <- nxtp
					return
				}
			}
		}(p, routeLevel)
	}

	select {
	case p := <-found:
		return p, nil
	case <-after:
		return nil, u.ErrTimeout
	}
}
// Ping a peer, log the time it took
func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error {
// Thoughts: maybe this should accept an ID and do a peer lookup?
2014-08-26 14:24:51 -07:00
u.DOut("Enter Ping.\n")
pmes := Message{ID: swarm.GenerateMessageID(), Type: PBDHTMessage_PING}
mes := swarm.NewMessage(p, pmes.ToProtobuf())
before := time.Now()
2014-08-16 23:03:36 -07:00
responseChan := dht.listener.Listen(pmes.ID, 1, time.Minute)
dht.netChan.Outgoing <- mes
tout := time.After(timeout)
select {
2014-08-16 23:03:36 -07:00
case <-responseChan:
roundtrip := time.Since(before)
p.SetLatency(roundtrip)
u.DOut("Ping took %s.\n", roundtrip.String())
return nil
case <-tout:
// Timed out, think about removing peer from network
u.DOut("[%s] Ping peer [%s] timed out.", dht.self.ID.Pretty(), p.ID.Pretty())
2014-08-16 23:03:36 -07:00
dht.listener.Unlisten(pmes.ID)
return u.ErrTimeout
}
}
2014-08-16 23:03:36 -07:00
func (dht *IpfsDHT) getDiagnostic(timeout time.Duration) ([]*diagInfo, error) {
u.DOut("Begin Diagnostic")
//Send to N closest peers
2014-08-16 23:03:36 -07:00
targets := dht.routingTables[0].NearestPeers(kb.ConvertPeerID(dht.self.ID), 10)
// TODO: Add timeout to this struct so nodes know when to return
2014-08-16 23:03:36 -07:00
pmes := Message{
Type: PBDHTMessage_DIAGNOSTIC,
ID: swarm.GenerateMessageID(),
}
2014-08-16 23:03:36 -07:00
listenChan := dht.listener.Listen(pmes.ID, len(targets), time.Minute*2)
pbmes := pmes.ToProtobuf()
for _, p := range targets {
mes := swarm.NewMessage(p, pbmes)
dht.netChan.Outgoing <- mes
}
var out []*diagInfo
after := time.After(timeout)
for count := len(targets); count > 0; {
select {
case <-after:
u.DOut("Diagnostic request timed out.")
return out, u.ErrTimeout
2014-08-09 22:28:46 -07:00
case resp := <-listenChan:
2014-08-16 23:03:36 -07:00
pmesOut := new(PBDHTMessage)
err := proto.Unmarshal(resp.Data, pmesOut)
if err != nil {
// NOTE: here and elsewhere, need to audit error handling,
// some errors should be continued on from
return out, err
}
2014-08-16 23:03:36 -07:00
dec := json.NewDecoder(bytes.NewBuffer(pmesOut.GetValue()))
for {
di := new(diagInfo)
err := dec.Decode(di)
if err != nil {
break
}
out = append(out, di)
}
}
}
return nil, nil
}