go-libp2p-kad-dht/handlers.go

package dht

import (
	"errors"
	"fmt"
	"time"

	msg "github.com/jbenet/go-ipfs/net/message"
	peer "github.com/jbenet/go-ipfs/peer"
	kb "github.com/jbenet/go-ipfs/routing/kbucket"
	u "github.com/jbenet/go-ipfs/util"

	ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
)

// dhtHandler specifies the signature of functions that handle DHT messages.
type dhtHandler func(*peer.Peer, *Message) (*Message, error)
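
// handlerForMsgType returns the handler registered for the given message
// type, or nil if the type is unknown. The caller is expected to check for a
// nil handler before invoking it, e.g. (sketch, not from this file):
//
//	handler := dht.handlerForMsgType(pmes.GetType())
//	if handler == nil {
//		// unknown message type: ignore the message
//	}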
func (dht *IpfsDHT) handlerForMsgType(t Message_MessageType) dhtHandler {
	switch t {
	case Message_GET_VALUE:
		return dht.handleGetValue
	case Message_PUT_VALUE:
		return dht.handlePutValue
	case Message_FIND_NODE:
		return dht.handleFindPeer
	case Message_ADD_PROVIDER:
		return dht.handleAddProvider
	case Message_GET_PROVIDERS:
		return dht.handleGetProviders
	case Message_PING:
		return dht.handlePing
	case Message_DIAGNOSTIC:
		return dht.handleDiagnostic
	default:
		return nil
	}
}
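
// handleGetValue looks up the requested key locally and replies with as much
// information as it can: the stored value (if any), any known providers, and
// a closer peer to continue the query with.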
func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *Message) (*Message, error) {
	u.DOut("[%s] handleGetValue for key: %s\n", dht.self.ID.Pretty(), pmes.GetKey())

	// setup response
	resp := newMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())

	// first, is the key even a key?
	key := pmes.GetKey()
	if key == "" {
		return nil, errors.New("handleGetValue but no key was provided")
	}

	// let's first check if we have the value locally.
	u.DOut("[%s] handleGetValue looking into ds\n", dht.self.ID.Pretty())
	dskey := ds.NewKey(pmes.GetKey())
	iVal, err := dht.datastore.Get(dskey)
	u.DOut("[%s] handleGetValue looking into ds GOT %v\n", dht.self.ID.Pretty(), iVal)

	// if we got an unexpected error, bail.
	if err != nil && err != ds.ErrNotFound {
		return nil, err
	}

	// Note: changed the behavior here to return _as much_ info as possible
	// (potentially all of {value, closer peers, provider})

	// if we have the value, send it back
	if err == nil {
		u.DOut("[%s] handleGetValue success!\n", dht.self.ID.Pretty())

		byts, ok := iVal.([]byte)
		if !ok {
			return nil, fmt.Errorf("datastore had non byte-slice value for %v", dskey)
		}
		resp.Value = byts
	}

	// if we know any providers for the requested value, return those.
	provs := dht.providers.GetProviders(u.Key(pmes.GetKey()))
	if len(provs) > 0 {
		u.DOut("handleGetValue returning %d provider[s]\n", len(provs))
		resp.ProviderPeers = peersToPBPeers(provs)
	}

	// Find closest peer on given cluster to desired key and reply with that info
	closer := dht.betterPeerToQuery(pmes)
	if closer != nil {
		u.DOut("handleGetValue returning a closer peer: '%s'\n", closer.ID.Pretty())
		resp.CloserPeers = peersToPBPeers([]*peer.Peer{closer})
	}

	return resp, nil
}

// handlePutValue stores the given value under the given key in this peer's
// local datastore.
func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *Message) (*Message, error) {
	dht.dslock.Lock()
	defer dht.dslock.Unlock()

	dskey := ds.NewKey(pmes.GetKey())
	err := dht.datastore.Put(dskey, pmes.GetValue())
	u.DOut("[%s] handlePutValue %v %v\n", dht.self.ID.Pretty(), dskey, pmes.GetValue())
	return pmes, err
}
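
// handlePing replies to a ping request, echoing the message type and cluster
// level back to the sender.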
func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *Message) (*Message, error) {
	u.DOut("[%s] Responding to ping from [%s]!\n", dht.self.ID.Pretty(), p.ID.Pretty())
	return newMessage(pmes.GetType(), "", int(pmes.GetClusterLevel())), nil
}
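
// handleFindPeer replies with the single closest known peer to the requested
// ID, or with the local peer itself when it is the one being looked for.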
func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *Message) (*Message, error) {
	resp := newMessage(pmes.GetType(), "", pmes.GetClusterLevel())
	var closest *peer.Peer

	// if looking for self... special case where we send it on CloserPeers.
	if peer.ID(pmes.GetKey()).Equal(dht.self.ID) {
		closest = dht.self
	} else {
		closest = dht.betterPeerToQuery(pmes)
	}

	if closest == nil {
		u.PErr("handleFindPeer: could not find anything.\n")
		return resp, nil
	}

	if len(closest.Addresses) == 0 {
		u.PErr("handleFindPeer: no addresses for connected peer...\n")
		return resp, nil
	}

	u.DOut("handleFindPeer: sending back '%s'\n", closest.ID.Pretty())
	resp.CloserPeers = peersToPBPeers([]*peer.Peer{closest})
	return resp, nil
}
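
// handleGetProviders replies with any known providers for the requested key
// (including the local peer, if it holds the value) plus a closer peer.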
func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *Message) (*Message, error) {
	resp := newMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())

	// check if we have this value, to add ourselves as provider.
	has, err := dht.datastore.Has(ds.NewKey(pmes.GetKey()))
	if err != nil && err != ds.ErrNotFound {
		u.PErr("unexpected datastore error: %v\n", err)
		has = false
	}

	// setup providers
	providers := dht.providers.GetProviders(u.Key(pmes.GetKey()))
	if has {
		providers = append(providers, dht.self)
	}

	// if we've got providers, send those.
	if len(providers) > 0 {
		resp.ProviderPeers = peersToPBPeers(providers)
	}

	// Also send closer peers.
	closer := dht.betterPeerToQuery(pmes)
	if closer != nil {
		resp.CloserPeers = peersToPBPeers([]*peer.Peer{closer})
	}

	return resp, nil
}
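
// providerInfo pairs a providing peer with the time the record was created.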
type providerInfo struct {
	Creation time.Time
	Value    *peer.Peer
}
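
// handleAddProvider records the sender as a provider for the given key and
// echoes the message back as confirmation.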
func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *Message) (*Message, error) {
	key := u.Key(pmes.GetKey())

	u.DOut("[%s] Adding [%s] as a provider for '%s'\n",
		dht.self.ID.Pretty(), p.ID.Pretty(), peer.ID(key).Pretty())

	dht.providers.AddProvider(key, p)
	return pmes, nil // send back same msg as confirmation.
}

// Halt stops all communications from this peer and shuts it down.
// TODO -- remove this in favor of context
func (dht *IpfsDHT) Halt() {
	dht.providers.Halt()
}
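
// handleDiagnostic is meant to gather diagnostic information from the peers
// nearest this node; the old implementation is preserved below as
// commented-out code until it is ported back.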
// NOTE: not yet finished, low priority
func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *Message) (*Message, error) {
	seq := dht.routingTables[0].NearestPeers(kb.ConvertPeerID(dht.self.ID), 10)
	for _, ps := range seq {
		_, err := msg.FromObject(ps, pmes)
		if err != nil {
			u.PErr("handleDiagnostics error creating message: %v\n", err)
			continue
		}
		// dht.sender.SendRequest(context.TODO(), mes)
	}
	return nil, errors.New("not yet ported back")

	// buf := new(bytes.Buffer)
	// di := dht.getDiagInfo()
	// buf.Write(di.Marshal())
	//
	// // NOTE: this shouldnt be a hardcoded value
	// after := time.After(time.Second * 20)
	// count := len(seq)
	// for count > 0 {
	// 	select {
	// 	case <-after:
	// 		// Timeout, return what we have
	// 		goto out
	// 	case reqResp := <-listenChan:
	// 		pmesOut := new(Message)
	// 		err := proto.Unmarshal(reqResp.Data, pmesOut)
	// 		if err != nil {
	// 			// It broke? eh, whatever, keep going
	// 			continue
	// 		}
	// 		buf.Write(reqResp.Data)
	// 		count--
	// 	}
	// }
	//
	// out:
	// resp := Message{
	// 	Type:     Message_DIAGNOSTIC,
	// 	ID:       pmes.GetId(),
	// 	Value:    buf.Bytes(),
	// 	Response: true,
	// }
	//
	// mes := swarm.NewMessage(p, resp.ToProtobuf())
	// dht.netChan.Outgoing <- mes
}