2014-07-23 04:48:30 -07:00
|
|
|
package dht
|
|
|
|
|
|
|
|
import (
|
2014-07-30 17:46:56 -07:00
|
|
|
"math/rand"
|
|
|
|
"time"
|
2014-08-05 09:38:26 -07:00
|
|
|
"encoding/json"
|
2014-07-30 17:46:56 -07:00
|
|
|
|
2014-08-03 17:35:12 -07:00
|
|
|
proto "code.google.com/p/goprotobuf/proto"
|
|
|
|
|
2014-08-05 09:38:26 -07:00
|
|
|
ma "github.com/jbenet/go-multiaddr"
|
|
|
|
|
2014-07-29 17:55:19 -07:00
|
|
|
peer "github.com/jbenet/go-ipfs/peer"
|
|
|
|
swarm "github.com/jbenet/go-ipfs/swarm"
|
|
|
|
u "github.com/jbenet/go-ipfs/util"
|
2014-07-23 04:48:30 -07:00
|
|
|
)
|
|
|
|
|
2014-08-03 21:46:01 -07:00
|
|
|
// Pool size is the number of nodes used for group find/set RPC calls
|
|
|
|
var PoolSize = 6
|
|
|
|
|
2014-07-29 19:33:51 -07:00
|
|
|
// TODO: determine a way of creating and managing message IDs
|
|
|
|
func GenerateMessageID() uint64 {
|
2014-08-05 18:32:22 -07:00
|
|
|
//return (uint64(rand.Uint32()) << 32) & uint64(rand.Uint32())
|
|
|
|
return uint64(rand.Uint32())
|
2014-07-29 19:33:51 -07:00
|
|
|
}
|
|
|
|
|
2014-07-23 04:48:30 -07:00
|
|
|
// This file implements the Routing interface for the IpfsDHT struct.
|
|
|
|
|
|
|
|
// Basic Put/Get
|
|
|
|
|
|
|
|
// PutValue adds value corresponding to given Key.
|
2014-07-29 17:55:19 -07:00
|
|
|
func (s *IpfsDHT) PutValue(key u.Key, value []byte) error {
|
2014-07-28 22:14:27 -07:00
|
|
|
var p *peer.Peer
|
2014-08-03 13:37:09 -07:00
|
|
|
p = s.routes.NearestPeer(convertKey(key))
|
2014-08-03 17:35:12 -07:00
|
|
|
if p == nil {
|
|
|
|
panic("Table returned nil peer!")
|
|
|
|
}
|
2014-07-28 22:14:27 -07:00
|
|
|
|
2014-08-03 17:35:12 -07:00
|
|
|
pmes := pDHTMessage{
|
|
|
|
Type: DHTMessage_PUT_VALUE,
|
|
|
|
Key: string(key),
|
|
|
|
Value: value,
|
|
|
|
Id: GenerateMessageID(),
|
|
|
|
}
|
2014-07-28 22:14:27 -07:00
|
|
|
|
2014-08-03 17:35:12 -07:00
|
|
|
mes := swarm.NewMessage(p, pmes.ToProtobuf())
|
2014-07-28 22:14:27 -07:00
|
|
|
s.network.Chan.Outgoing <- mes
|
|
|
|
return nil
|
2014-07-23 04:48:30 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
// GetValue searches for the value corresponding to given Key.
|
|
|
|
func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
|
2014-07-28 22:14:27 -07:00
|
|
|
var p *peer.Peer
|
2014-08-03 13:37:09 -07:00
|
|
|
p = s.routes.NearestPeer(convertKey(key))
|
2014-08-03 17:35:12 -07:00
|
|
|
if p == nil {
|
|
|
|
panic("Table returned nil peer!")
|
|
|
|
}
|
2014-07-28 22:14:27 -07:00
|
|
|
|
2014-08-03 17:35:12 -07:00
|
|
|
pmes := pDHTMessage{
|
|
|
|
Type: DHTMessage_GET_VALUE,
|
|
|
|
Key: string(key),
|
|
|
|
Id: GenerateMessageID(),
|
|
|
|
}
|
|
|
|
response_chan := s.ListenFor(pmes.Id)
|
2014-07-28 22:14:27 -07:00
|
|
|
|
2014-08-03 17:35:12 -07:00
|
|
|
mes := swarm.NewMessage(p, pmes.ToProtobuf())
|
|
|
|
s.network.Chan.Outgoing <- mes
|
2014-07-28 22:14:27 -07:00
|
|
|
|
2014-07-29 14:50:33 -07:00
|
|
|
// Wait for either the response or a timeout
|
2014-07-28 22:14:27 -07:00
|
|
|
timeup := time.After(timeout)
|
|
|
|
select {
|
2014-07-29 17:55:19 -07:00
|
|
|
case <-timeup:
|
2014-08-03 21:46:01 -07:00
|
|
|
s.Unlisten(pmes.Id)
|
2014-07-29 19:33:51 -07:00
|
|
|
return nil, u.ErrTimeout
|
2014-07-29 17:55:19 -07:00
|
|
|
case resp := <-response_chan:
|
2014-08-03 17:35:12 -07:00
|
|
|
pmes_out := new(DHTMessage)
|
|
|
|
err := proto.Unmarshal(resp.Data, pmes_out)
|
|
|
|
if err != nil {
|
|
|
|
return nil,err
|
|
|
|
}
|
|
|
|
return pmes_out.GetValue(), nil
|
2014-07-28 22:14:27 -07:00
|
|
|
}
|
2014-07-23 04:48:30 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
// Value provider layer of indirection.
|
|
|
|
// This is what DSHTs (Coral and MainlineDHT) do to store large values in a DHT.
|
|
|
|
|
|
|
|
// Announce that this node can provide value for given key
|
2014-07-29 17:55:19 -07:00
|
|
|
func (s *IpfsDHT) Provide(key u.Key) error {
|
2014-08-03 21:46:01 -07:00
|
|
|
peers := s.routes.NearestPeers(convertKey(key), PoolSize)
|
|
|
|
if len(peers) == 0 {
|
|
|
|
//return an error
|
|
|
|
}
|
|
|
|
|
|
|
|
pmes := pDHTMessage{
|
|
|
|
Type: DHTMessage_ADD_PROVIDER,
|
|
|
|
Key: string(key),
|
|
|
|
}
|
|
|
|
pbmes := pmes.ToProtobuf()
|
|
|
|
|
|
|
|
for _,p := range peers {
|
|
|
|
mes := swarm.NewMessage(p, pbmes)
|
|
|
|
s.network.Chan.Outgoing <-mes
|
|
|
|
}
|
|
|
|
return nil
|
2014-07-23 04:48:30 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
// FindProviders searches for peers who can provide the value for given key.
|
2014-08-03 21:46:01 -07:00
|
|
|
func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer, error) {
|
|
|
|
p := s.routes.NearestPeer(convertKey(key))
|
|
|
|
|
|
|
|
pmes := pDHTMessage{
|
|
|
|
Type: DHTMessage_GET_PROVIDERS,
|
|
|
|
Key: string(key),
|
|
|
|
Id: GenerateMessageID(),
|
|
|
|
}
|
|
|
|
|
|
|
|
mes := swarm.NewMessage(p, pmes.ToProtobuf())
|
|
|
|
|
|
|
|
listen_chan := s.ListenFor(pmes.Id)
|
2014-08-05 18:32:22 -07:00
|
|
|
u.DOut("Find providers for: '%s'", key)
|
2014-08-03 21:46:01 -07:00
|
|
|
s.network.Chan.Outgoing <-mes
|
|
|
|
after := time.After(timeout)
|
|
|
|
select {
|
|
|
|
case <-after:
|
|
|
|
s.Unlisten(pmes.Id)
|
|
|
|
return nil, u.ErrTimeout
|
|
|
|
case resp := <-listen_chan:
|
2014-08-05 18:32:22 -07:00
|
|
|
u.DOut("FindProviders: got response.")
|
2014-08-03 21:46:01 -07:00
|
|
|
pmes_out := new(DHTMessage)
|
|
|
|
err := proto.Unmarshal(resp.Data, pmes_out)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2014-08-05 18:32:22 -07:00
|
|
|
var addrs map[u.Key]string
|
|
|
|
err = json.Unmarshal(pmes_out.GetValue(), &addrs)
|
2014-08-05 09:38:26 -07:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2014-08-05 18:32:22 -07:00
|
|
|
var prov_arr []*peer.Peer
|
|
|
|
for pid,addr := range addrs {
|
|
|
|
p := s.network.Find(pid)
|
2014-08-05 09:38:26 -07:00
|
|
|
if p == nil {
|
|
|
|
maddr,err := ma.NewMultiaddr(addr)
|
|
|
|
if err != nil {
|
|
|
|
u.PErr("error connecting to new peer: %s", err)
|
|
|
|
continue
|
|
|
|
}
|
2014-08-05 18:32:22 -07:00
|
|
|
p, err = s.Connect(maddr)
|
2014-08-05 09:38:26 -07:00
|
|
|
if err != nil {
|
|
|
|
u.PErr("error connecting to new peer: %s", err)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
}
|
2014-08-05 18:32:22 -07:00
|
|
|
s.addProviderEntry(key, p)
|
|
|
|
prov_arr = append(prov_arr, p)
|
2014-08-05 09:38:26 -07:00
|
|
|
}
|
|
|
|
|
2014-08-05 18:32:22 -07:00
|
|
|
return prov_arr, nil
|
|
|
|
|
2014-08-03 21:46:01 -07:00
|
|
|
}
|
2014-07-23 04:48:30 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
// Find specific Peer
|
|
|
|
|
|
|
|
// FindPeer searches for a peer with given ID.
|
|
|
|
func (s *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error) {
|
2014-08-03 21:46:01 -07:00
|
|
|
p := s.routes.NearestPeer(convertPeerID(id))
|
|
|
|
|
|
|
|
pmes := pDHTMessage{
|
|
|
|
Type: DHTMessage_FIND_NODE,
|
|
|
|
Key: string(id),
|
|
|
|
Id: GenerateMessageID(),
|
|
|
|
}
|
|
|
|
|
|
|
|
mes := swarm.NewMessage(p, pmes.ToProtobuf())
|
|
|
|
|
|
|
|
listen_chan := s.ListenFor(pmes.Id)
|
|
|
|
s.network.Chan.Outgoing <-mes
|
|
|
|
after := time.After(timeout)
|
|
|
|
select {
|
|
|
|
case <-after:
|
|
|
|
s.Unlisten(pmes.Id)
|
|
|
|
return nil, u.ErrTimeout
|
|
|
|
case resp := <-listen_chan:
|
|
|
|
pmes_out := new(DHTMessage)
|
|
|
|
err := proto.Unmarshal(resp.Data, pmes_out)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2014-08-05 20:31:48 -07:00
|
|
|
addr := string(pmes_out.GetValue())
|
|
|
|
maddr, err := ma.NewMultiaddr(addr)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
return s.Connect(maddr)
|
2014-08-03 21:46:01 -07:00
|
|
|
}
|
2014-07-23 04:48:30 -07:00
|
|
|
}
|