mirror of
https://github.com/fluencelabs/go-libp2p-kad-dht
synced 2025-04-25 06:42:13 +00:00
refactor(dht/pb) move proto to pb package
This commit is contained in:
parent
2aeefaa102
commit
1024504a6f
11
Makefile
11
Makefile
@ -1,11 +0,0 @@
|
|||||||
|
|
||||||
PB = $(wildcard *.proto)
|
|
||||||
GO = $(PB:.proto=.pb.go)
|
|
||||||
|
|
||||||
all: $(GO)
|
|
||||||
|
|
||||||
%.pb.go: %.proto
|
|
||||||
protoc --gogo_out=. --proto_path=../../../../:/usr/local/opt/protobuf/include:. $<
|
|
||||||
|
|
||||||
clean:
|
|
||||||
rm *.pb.go
|
|
41
dht.go
41
dht.go
@ -11,6 +11,7 @@ import (
|
|||||||
inet "github.com/jbenet/go-ipfs/net"
|
inet "github.com/jbenet/go-ipfs/net"
|
||||||
msg "github.com/jbenet/go-ipfs/net/message"
|
msg "github.com/jbenet/go-ipfs/net/message"
|
||||||
peer "github.com/jbenet/go-ipfs/peer"
|
peer "github.com/jbenet/go-ipfs/peer"
|
||||||
|
pb "github.com/jbenet/go-ipfs/routing/dht/pb"
|
||||||
kb "github.com/jbenet/go-ipfs/routing/kbucket"
|
kb "github.com/jbenet/go-ipfs/routing/kbucket"
|
||||||
u "github.com/jbenet/go-ipfs/util"
|
u "github.com/jbenet/go-ipfs/util"
|
||||||
|
|
||||||
@ -128,7 +129,7 @@ func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) msg.N
|
|||||||
}
|
}
|
||||||
|
|
||||||
// deserialize msg
|
// deserialize msg
|
||||||
pmes := new(Message)
|
pmes := new(pb.Message)
|
||||||
err := proto.Unmarshal(mData, pmes)
|
err := proto.Unmarshal(mData, pmes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Error unmarshaling data")
|
log.Error("Error unmarshaling data")
|
||||||
@ -140,7 +141,7 @@ func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) msg.N
|
|||||||
|
|
||||||
// Print out diagnostic
|
// Print out diagnostic
|
||||||
log.Debugf("%s got message type: '%s' from %s",
|
log.Debugf("%s got message type: '%s' from %s",
|
||||||
dht.self, Message_MessageType_name[int32(pmes.GetType())], mPeer)
|
dht.self, pb.Message_MessageType_name[int32(pmes.GetType())], mPeer)
|
||||||
|
|
||||||
// get handler for this msg type.
|
// get handler for this msg type.
|
||||||
handler := dht.handlerForMsgType(pmes.GetType())
|
handler := dht.handlerForMsgType(pmes.GetType())
|
||||||
@ -174,7 +175,7 @@ func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) msg.N
|
|||||||
|
|
||||||
// sendRequest sends out a request using dht.sender, but also makes sure to
|
// sendRequest sends out a request using dht.sender, but also makes sure to
|
||||||
// measure the RTT for latency measurements.
|
// measure the RTT for latency measurements.
|
||||||
func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.Peer, pmes *Message) (*Message, error) {
|
func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
|
||||||
|
|
||||||
mes, err := msg.FromObject(p, pmes)
|
mes, err := msg.FromObject(p, pmes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -185,7 +186,7 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.Peer, pmes *Message)
|
|||||||
|
|
||||||
// Print out diagnostic
|
// Print out diagnostic
|
||||||
log.Debugf("Sent message type: '%s' to %s",
|
log.Debugf("Sent message type: '%s' to %s",
|
||||||
Message_MessageType_name[int32(pmes.GetType())], p)
|
pb.Message_MessageType_name[int32(pmes.GetType())], p)
|
||||||
|
|
||||||
rmes, err := dht.sender.SendRequest(ctx, mes)
|
rmes, err := dht.sender.SendRequest(ctx, mes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -198,7 +199,7 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.Peer, pmes *Message)
|
|||||||
rtt := time.Since(start)
|
rtt := time.Since(start)
|
||||||
rmes.Peer().SetLatency(rtt)
|
rmes.Peer().SetLatency(rtt)
|
||||||
|
|
||||||
rpmes := new(Message)
|
rpmes := new(pb.Message)
|
||||||
if err := proto.Unmarshal(rmes.Data(), rpmes); err != nil {
|
if err := proto.Unmarshal(rmes.Data(), rpmes); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -210,7 +211,7 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.Peer, pmes *Message)
|
|||||||
func (dht *IpfsDHT) putValueToNetwork(ctx context.Context, p peer.Peer,
|
func (dht *IpfsDHT) putValueToNetwork(ctx context.Context, p peer.Peer,
|
||||||
key string, value []byte) error {
|
key string, value []byte) error {
|
||||||
|
|
||||||
pmes := newMessage(Message_PUT_VALUE, string(key), 0)
|
pmes := pb.NewMessage(pb.Message_PUT_VALUE, string(key), 0)
|
||||||
pmes.Value = value
|
pmes.Value = value
|
||||||
rpmes, err := dht.sendRequest(ctx, p, pmes)
|
rpmes, err := dht.sendRequest(ctx, p, pmes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -225,10 +226,10 @@ func (dht *IpfsDHT) putValueToNetwork(ctx context.Context, p peer.Peer,
|
|||||||
|
|
||||||
func (dht *IpfsDHT) putProvider(ctx context.Context, p peer.Peer, key string) error {
|
func (dht *IpfsDHT) putProvider(ctx context.Context, p peer.Peer, key string) error {
|
||||||
|
|
||||||
pmes := newMessage(Message_ADD_PROVIDER, string(key), 0)
|
pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, string(key), 0)
|
||||||
|
|
||||||
// add self as the provider
|
// add self as the provider
|
||||||
pmes.ProviderPeers = peersToPBPeers([]peer.Peer{dht.self})
|
pmes.ProviderPeers = pb.PeersToPBPeers([]peer.Peer{dht.self})
|
||||||
|
|
||||||
rpmes, err := dht.sendRequest(ctx, p, pmes)
|
rpmes, err := dht.sendRequest(ctx, p, pmes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -290,9 +291,9 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.Peer,
|
|||||||
|
|
||||||
// getValueSingle simply performs the get value RPC with the given parameters
|
// getValueSingle simply performs the get value RPC with the given parameters
|
||||||
func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.Peer,
|
func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.Peer,
|
||||||
key u.Key, level int) (*Message, error) {
|
key u.Key, level int) (*pb.Message, error) {
|
||||||
|
|
||||||
pmes := newMessage(Message_GET_VALUE, string(key), level)
|
pmes := pb.NewMessage(pb.Message_GET_VALUE, string(key), level)
|
||||||
return dht.sendRequest(ctx, p, pmes)
|
return dht.sendRequest(ctx, p, pmes)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -301,7 +302,7 @@ func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.Peer,
|
|||||||
// one to get the value from? Or just connect to one at a time until we get a
|
// one to get the value from? Or just connect to one at a time until we get a
|
||||||
// successful connection and request the value from it?
|
// successful connection and request the value from it?
|
||||||
func (dht *IpfsDHT) getFromPeerList(ctx context.Context, key u.Key,
|
func (dht *IpfsDHT) getFromPeerList(ctx context.Context, key u.Key,
|
||||||
peerlist []*Message_Peer, level int) ([]byte, error) {
|
peerlist []*pb.Message_Peer, level int) ([]byte, error) {
|
||||||
|
|
||||||
for _, pinfo := range peerlist {
|
for _, pinfo := range peerlist {
|
||||||
p, err := dht.ensureConnectedToPeer(pinfo)
|
p, err := dht.ensureConnectedToPeer(pinfo)
|
||||||
@ -379,17 +380,17 @@ func (dht *IpfsDHT) FindLocal(id peer.ID) (peer.Peer, *kb.RoutingTable) {
|
|||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.Peer, id peer.ID, level int) (*Message, error) {
|
func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.Peer, id peer.ID, level int) (*pb.Message, error) {
|
||||||
pmes := newMessage(Message_FIND_NODE, string(id), level)
|
pmes := pb.NewMessage(pb.Message_FIND_NODE, string(id), level)
|
||||||
return dht.sendRequest(ctx, p, pmes)
|
return dht.sendRequest(ctx, p, pmes)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.Peer, key u.Key, level int) (*Message, error) {
|
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.Peer, key u.Key, level int) (*pb.Message, error) {
|
||||||
pmes := newMessage(Message_GET_PROVIDERS, string(key), level)
|
pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, string(key), level)
|
||||||
return dht.sendRequest(ctx, p, pmes)
|
return dht.sendRequest(ctx, p, pmes)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dht *IpfsDHT) addProviders(key u.Key, peers []*Message_Peer) []peer.Peer {
|
func (dht *IpfsDHT) addProviders(key u.Key, peers []*pb.Message_Peer) []peer.Peer {
|
||||||
var provArr []peer.Peer
|
var provArr []peer.Peer
|
||||||
for _, prov := range peers {
|
for _, prov := range peers {
|
||||||
p, err := dht.peerFromInfo(prov)
|
p, err := dht.peerFromInfo(prov)
|
||||||
@ -413,7 +414,7 @@ func (dht *IpfsDHT) addProviders(key u.Key, peers []*Message_Peer) []peer.Peer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// nearestPeersToQuery returns the routing tables closest peers.
|
// nearestPeersToQuery returns the routing tables closest peers.
|
||||||
func (dht *IpfsDHT) nearestPeersToQuery(pmes *Message, count int) []peer.Peer {
|
func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.Peer {
|
||||||
level := pmes.GetClusterLevel()
|
level := pmes.GetClusterLevel()
|
||||||
cluster := dht.routingTables[level]
|
cluster := dht.routingTables[level]
|
||||||
|
|
||||||
@ -423,7 +424,7 @@ func (dht *IpfsDHT) nearestPeersToQuery(pmes *Message, count int) []peer.Peer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// betterPeerToQuery returns nearestPeersToQuery, but iff closer than self.
|
// betterPeerToQuery returns nearestPeersToQuery, but iff closer than self.
|
||||||
func (dht *IpfsDHT) betterPeersToQuery(pmes *Message, count int) []peer.Peer {
|
func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, count int) []peer.Peer {
|
||||||
closer := dht.nearestPeersToQuery(pmes, count)
|
closer := dht.nearestPeersToQuery(pmes, count)
|
||||||
|
|
||||||
// no node? nil
|
// no node? nil
|
||||||
@ -462,7 +463,7 @@ func (dht *IpfsDHT) getPeer(id peer.ID) (peer.Peer, error) {
|
|||||||
return p, nil
|
return p, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dht *IpfsDHT) peerFromInfo(pbp *Message_Peer) (peer.Peer, error) {
|
func (dht *IpfsDHT) peerFromInfo(pbp *pb.Message_Peer) (peer.Peer, error) {
|
||||||
|
|
||||||
id := peer.ID(pbp.GetId())
|
id := peer.ID(pbp.GetId())
|
||||||
|
|
||||||
@ -485,7 +486,7 @@ func (dht *IpfsDHT) peerFromInfo(pbp *Message_Peer) (peer.Peer, error) {
|
|||||||
return p, nil
|
return p, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dht *IpfsDHT) ensureConnectedToPeer(pbp *Message_Peer) (peer.Peer, error) {
|
func (dht *IpfsDHT) ensureConnectedToPeer(pbp *pb.Message_Peer) (peer.Peer, error) {
|
||||||
p, err := dht.peerFromInfo(pbp)
|
p, err := dht.peerFromInfo(pbp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
27
ext_test.go
27
ext_test.go
@ -12,6 +12,7 @@ import (
|
|||||||
msg "github.com/jbenet/go-ipfs/net/message"
|
msg "github.com/jbenet/go-ipfs/net/message"
|
||||||
mux "github.com/jbenet/go-ipfs/net/mux"
|
mux "github.com/jbenet/go-ipfs/net/mux"
|
||||||
peer "github.com/jbenet/go-ipfs/peer"
|
peer "github.com/jbenet/go-ipfs/peer"
|
||||||
|
pb "github.com/jbenet/go-ipfs/routing/dht/pb"
|
||||||
u "github.com/jbenet/go-ipfs/util"
|
u "github.com/jbenet/go-ipfs/util"
|
||||||
|
|
||||||
"time"
|
"time"
|
||||||
@ -127,13 +128,13 @@ func TestGetFailures(t *testing.T) {
|
|||||||
// u.POut("NotFound Test\n")
|
// u.POut("NotFound Test\n")
|
||||||
// Reply with failures to every message
|
// Reply with failures to every message
|
||||||
fs.AddHandler(func(mes msg.NetMessage) msg.NetMessage {
|
fs.AddHandler(func(mes msg.NetMessage) msg.NetMessage {
|
||||||
pmes := new(Message)
|
pmes := new(pb.Message)
|
||||||
err := proto.Unmarshal(mes.Data(), pmes)
|
err := proto.Unmarshal(mes.Data(), pmes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
resp := &Message{
|
resp := &pb.Message{
|
||||||
Type: pmes.Type,
|
Type: pmes.Type,
|
||||||
}
|
}
|
||||||
m, err := msg.FromObject(mes.Peer(), resp)
|
m, err := msg.FromObject(mes.Peer(), resp)
|
||||||
@ -153,9 +154,9 @@ func TestGetFailures(t *testing.T) {
|
|||||||
|
|
||||||
fs.handlers = nil
|
fs.handlers = nil
|
||||||
// Now we test this DHT's handleGetValue failure
|
// Now we test this DHT's handleGetValue failure
|
||||||
typ := Message_GET_VALUE
|
typ := pb.Message_GET_VALUE
|
||||||
str := "hello"
|
str := "hello"
|
||||||
req := Message{
|
req := pb.Message{
|
||||||
Type: &typ,
|
Type: &typ,
|
||||||
Key: &str,
|
Key: &str,
|
||||||
Value: []byte{0},
|
Value: []byte{0},
|
||||||
@ -169,7 +170,7 @@ func TestGetFailures(t *testing.T) {
|
|||||||
|
|
||||||
mes = d.HandleMessage(ctx, mes)
|
mes = d.HandleMessage(ctx, mes)
|
||||||
|
|
||||||
pmes := new(Message)
|
pmes := new(pb.Message)
|
||||||
err = proto.Unmarshal(mes.Data(), pmes)
|
err = proto.Unmarshal(mes.Data(), pmes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@ -215,21 +216,21 @@ func TestNotFound(t *testing.T) {
|
|||||||
|
|
||||||
// Reply with random peers to every message
|
// Reply with random peers to every message
|
||||||
fs.AddHandler(func(mes msg.NetMessage) msg.NetMessage {
|
fs.AddHandler(func(mes msg.NetMessage) msg.NetMessage {
|
||||||
pmes := new(Message)
|
pmes := new(pb.Message)
|
||||||
err := proto.Unmarshal(mes.Data(), pmes)
|
err := proto.Unmarshal(mes.Data(), pmes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
switch pmes.GetType() {
|
switch pmes.GetType() {
|
||||||
case Message_GET_VALUE:
|
case pb.Message_GET_VALUE:
|
||||||
resp := &Message{Type: pmes.Type}
|
resp := &pb.Message{Type: pmes.Type}
|
||||||
|
|
||||||
peers := []peer.Peer{}
|
peers := []peer.Peer{}
|
||||||
for i := 0; i < 7; i++ {
|
for i := 0; i < 7; i++ {
|
||||||
peers = append(peers, _randPeer())
|
peers = append(peers, _randPeer())
|
||||||
}
|
}
|
||||||
resp.CloserPeers = peersToPBPeers(peers)
|
resp.CloserPeers = pb.PeersToPBPeers(peers)
|
||||||
mes, err := msg.FromObject(mes.Peer(), resp)
|
mes, err := msg.FromObject(mes.Peer(), resp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
@ -282,17 +283,17 @@ func TestLessThanKResponses(t *testing.T) {
|
|||||||
|
|
||||||
// Reply with random peers to every message
|
// Reply with random peers to every message
|
||||||
fs.AddHandler(func(mes msg.NetMessage) msg.NetMessage {
|
fs.AddHandler(func(mes msg.NetMessage) msg.NetMessage {
|
||||||
pmes := new(Message)
|
pmes := new(pb.Message)
|
||||||
err := proto.Unmarshal(mes.Data(), pmes)
|
err := proto.Unmarshal(mes.Data(), pmes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
switch pmes.GetType() {
|
switch pmes.GetType() {
|
||||||
case Message_GET_VALUE:
|
case pb.Message_GET_VALUE:
|
||||||
resp := &Message{
|
resp := &pb.Message{
|
||||||
Type: pmes.Type,
|
Type: pmes.Type,
|
||||||
CloserPeers: peersToPBPeers([]peer.Peer{other}),
|
CloserPeers: pb.PeersToPBPeers([]peer.Peer{other}),
|
||||||
}
|
}
|
||||||
|
|
||||||
mes, err := msg.FromObject(mes.Peer(), resp)
|
mes, err := msg.FromObject(mes.Peer(), resp)
|
||||||
|
45
handlers.go
45
handlers.go
@ -6,6 +6,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
peer "github.com/jbenet/go-ipfs/peer"
|
peer "github.com/jbenet/go-ipfs/peer"
|
||||||
|
pb "github.com/jbenet/go-ipfs/routing/dht/pb"
|
||||||
u "github.com/jbenet/go-ipfs/util"
|
u "github.com/jbenet/go-ipfs/util"
|
||||||
|
|
||||||
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
|
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
|
||||||
@ -14,32 +15,32 @@ import (
|
|||||||
var CloserPeerCount = 4
|
var CloserPeerCount = 4
|
||||||
|
|
||||||
// dhthandler specifies the signature of functions that handle DHT messages.
|
// dhthandler specifies the signature of functions that handle DHT messages.
|
||||||
type dhtHandler func(peer.Peer, *Message) (*Message, error)
|
type dhtHandler func(peer.Peer, *pb.Message) (*pb.Message, error)
|
||||||
|
|
||||||
func (dht *IpfsDHT) handlerForMsgType(t Message_MessageType) dhtHandler {
|
func (dht *IpfsDHT) handlerForMsgType(t pb.Message_MessageType) dhtHandler {
|
||||||
switch t {
|
switch t {
|
||||||
case Message_GET_VALUE:
|
case pb.Message_GET_VALUE:
|
||||||
return dht.handleGetValue
|
return dht.handleGetValue
|
||||||
case Message_PUT_VALUE:
|
case pb.Message_PUT_VALUE:
|
||||||
return dht.handlePutValue
|
return dht.handlePutValue
|
||||||
case Message_FIND_NODE:
|
case pb.Message_FIND_NODE:
|
||||||
return dht.handleFindPeer
|
return dht.handleFindPeer
|
||||||
case Message_ADD_PROVIDER:
|
case pb.Message_ADD_PROVIDER:
|
||||||
return dht.handleAddProvider
|
return dht.handleAddProvider
|
||||||
case Message_GET_PROVIDERS:
|
case pb.Message_GET_PROVIDERS:
|
||||||
return dht.handleGetProviders
|
return dht.handleGetProviders
|
||||||
case Message_PING:
|
case pb.Message_PING:
|
||||||
return dht.handlePing
|
return dht.handlePing
|
||||||
default:
|
default:
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dht *IpfsDHT) handleGetValue(p peer.Peer, pmes *Message) (*Message, error) {
|
func (dht *IpfsDHT) handleGetValue(p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
|
||||||
log.Debugf("%s handleGetValue for key: %s\n", dht.self, pmes.GetKey())
|
log.Debugf("%s handleGetValue for key: %s\n", dht.self, pmes.GetKey())
|
||||||
|
|
||||||
// setup response
|
// setup response
|
||||||
resp := newMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
|
resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
|
||||||
|
|
||||||
// first, is the key even a key?
|
// first, is the key even a key?
|
||||||
key := pmes.GetKey()
|
key := pmes.GetKey()
|
||||||
@ -77,7 +78,7 @@ func (dht *IpfsDHT) handleGetValue(p peer.Peer, pmes *Message) (*Message, error)
|
|||||||
provs := dht.providers.GetProviders(u.Key(pmes.GetKey()))
|
provs := dht.providers.GetProviders(u.Key(pmes.GetKey()))
|
||||||
if len(provs) > 0 {
|
if len(provs) > 0 {
|
||||||
log.Debugf("handleGetValue returning %d provider[s]", len(provs))
|
log.Debugf("handleGetValue returning %d provider[s]", len(provs))
|
||||||
resp.ProviderPeers = peersToPBPeers(provs)
|
resp.ProviderPeers = pb.PeersToPBPeers(provs)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Find closest peer on given cluster to desired key and reply with that info
|
// Find closest peer on given cluster to desired key and reply with that info
|
||||||
@ -89,14 +90,14 @@ func (dht *IpfsDHT) handleGetValue(p peer.Peer, pmes *Message) (*Message, error)
|
|||||||
log.Critical("no addresses on peer being sent!")
|
log.Critical("no addresses on peer being sent!")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
resp.CloserPeers = peersToPBPeers(closer)
|
resp.CloserPeers = pb.PeersToPBPeers(closer)
|
||||||
}
|
}
|
||||||
|
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Store a value in this peer local storage
|
// Store a value in this peer local storage
|
||||||
func (dht *IpfsDHT) handlePutValue(p peer.Peer, pmes *Message) (*Message, error) {
|
func (dht *IpfsDHT) handlePutValue(p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
|
||||||
dht.dslock.Lock()
|
dht.dslock.Lock()
|
||||||
defer dht.dslock.Unlock()
|
defer dht.dslock.Unlock()
|
||||||
dskey := u.Key(pmes.GetKey()).DsKey()
|
dskey := u.Key(pmes.GetKey()).DsKey()
|
||||||
@ -105,13 +106,13 @@ func (dht *IpfsDHT) handlePutValue(p peer.Peer, pmes *Message) (*Message, error)
|
|||||||
return pmes, err
|
return pmes, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dht *IpfsDHT) handlePing(p peer.Peer, pmes *Message) (*Message, error) {
|
func (dht *IpfsDHT) handlePing(p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
|
||||||
log.Debugf("%s Responding to ping from %s!\n", dht.self, p)
|
log.Debugf("%s Responding to ping from %s!\n", dht.self, p)
|
||||||
return pmes, nil
|
return pmes, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dht *IpfsDHT) handleFindPeer(p peer.Peer, pmes *Message) (*Message, error) {
|
func (dht *IpfsDHT) handleFindPeer(p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
|
||||||
resp := newMessage(pmes.GetType(), "", pmes.GetClusterLevel())
|
resp := pb.NewMessage(pmes.GetType(), "", pmes.GetClusterLevel())
|
||||||
var closest []peer.Peer
|
var closest []peer.Peer
|
||||||
|
|
||||||
// if looking for self... special case where we send it on CloserPeers.
|
// if looking for self... special case where we send it on CloserPeers.
|
||||||
@ -136,12 +137,12 @@ func (dht *IpfsDHT) handleFindPeer(p peer.Peer, pmes *Message) (*Message, error)
|
|||||||
for _, p := range withAddresses {
|
for _, p := range withAddresses {
|
||||||
log.Debugf("handleFindPeer: sending back '%s'", p)
|
log.Debugf("handleFindPeer: sending back '%s'", p)
|
||||||
}
|
}
|
||||||
resp.CloserPeers = peersToPBPeers(withAddresses)
|
resp.CloserPeers = pb.PeersToPBPeers(withAddresses)
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dht *IpfsDHT) handleGetProviders(p peer.Peer, pmes *Message) (*Message, error) {
|
func (dht *IpfsDHT) handleGetProviders(p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
|
||||||
resp := newMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
|
resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
|
||||||
|
|
||||||
// check if we have this value, to add ourselves as provider.
|
// check if we have this value, to add ourselves as provider.
|
||||||
log.Debugf("handling GetProviders: '%s'", pmes.GetKey())
|
log.Debugf("handling GetProviders: '%s'", pmes.GetKey())
|
||||||
@ -160,13 +161,13 @@ func (dht *IpfsDHT) handleGetProviders(p peer.Peer, pmes *Message) (*Message, er
|
|||||||
|
|
||||||
// if we've got providers, send thos those.
|
// if we've got providers, send thos those.
|
||||||
if providers != nil && len(providers) > 0 {
|
if providers != nil && len(providers) > 0 {
|
||||||
resp.ProviderPeers = peersToPBPeers(providers)
|
resp.ProviderPeers = pb.PeersToPBPeers(providers)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Also send closer peers.
|
// Also send closer peers.
|
||||||
closer := dht.betterPeersToQuery(pmes, CloserPeerCount)
|
closer := dht.betterPeersToQuery(pmes, CloserPeerCount)
|
||||||
if closer != nil {
|
if closer != nil {
|
||||||
resp.CloserPeers = peersToPBPeers(closer)
|
resp.CloserPeers = pb.PeersToPBPeers(closer)
|
||||||
}
|
}
|
||||||
|
|
||||||
return resp, nil
|
return resp, nil
|
||||||
@ -177,7 +178,7 @@ type providerInfo struct {
|
|||||||
Value peer.Peer
|
Value peer.Peer
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dht *IpfsDHT) handleAddProvider(p peer.Peer, pmes *Message) (*Message, error) {
|
func (dht *IpfsDHT) handleAddProvider(p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
|
||||||
key := u.Key(pmes.GetKey())
|
key := u.Key(pmes.GetKey())
|
||||||
|
|
||||||
log.Debugf("%s adding %s as a provider for '%s'\n", dht.self, p, peer.ID(key))
|
log.Debugf("%s adding %s as a provider for '%s'\n", dht.self, p, peer.ID(key))
|
||||||
|
11
pb/Makefile
Normal file
11
pb/Makefile
Normal file
@ -0,0 +1,11 @@
|
|||||||
|
PB = $(wildcard *.proto)
|
||||||
|
GO = $(PB:.proto=.pb.go)
|
||||||
|
|
||||||
|
all: $(GO)
|
||||||
|
|
||||||
|
%.pb.go: %.proto
|
||||||
|
protoc --gogo_out=. --proto_path=../../../../../../:/usr/local/opt/protobuf/include:. $<
|
||||||
|
|
||||||
|
clean:
|
||||||
|
rm -f *.pb.go
|
||||||
|
rm -f *.go
|
@ -1,19 +1,19 @@
|
|||||||
// Code generated by protoc-gen-go.
|
// Code generated by protoc-gen-gogo.
|
||||||
// source: messages.proto
|
// source: dht.proto
|
||||||
// DO NOT EDIT!
|
// DO NOT EDIT!
|
||||||
|
|
||||||
/*
|
/*
|
||||||
Package dht is a generated protocol buffer package.
|
Package dht_pb is a generated protocol buffer package.
|
||||||
|
|
||||||
It is generated from these files:
|
It is generated from these files:
|
||||||
messages.proto
|
dht.proto
|
||||||
|
|
||||||
It has these top-level messages:
|
It has these top-level messages:
|
||||||
Message
|
Message
|
||||||
*/
|
*/
|
||||||
package dht
|
package dht_pb
|
||||||
|
|
||||||
import proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
|
import proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/proto"
|
||||||
import math "math"
|
import math "math"
|
||||||
|
|
||||||
// Reference imports to suppress errors if they are not otherwise used.
|
// Reference imports to suppress errors if they are not otherwise used.
|
||||||
@ -67,7 +67,7 @@ func (x *Message_MessageType) UnmarshalJSON(data []byte) error {
|
|||||||
|
|
||||||
type Message struct {
|
type Message struct {
|
||||||
// defines what type of message it is.
|
// defines what type of message it is.
|
||||||
Type *Message_MessageType `protobuf:"varint,1,opt,name=type,enum=dht.Message_MessageType" json:"type,omitempty"`
|
Type *Message_MessageType `protobuf:"varint,1,opt,name=type,enum=dht.pb.Message_MessageType" json:"type,omitempty"`
|
||||||
// defines what coral cluster level this query/response belongs to.
|
// defines what coral cluster level this query/response belongs to.
|
||||||
ClusterLevelRaw *int32 `protobuf:"varint,10,opt,name=clusterLevelRaw" json:"clusterLevelRaw,omitempty"`
|
ClusterLevelRaw *int32 `protobuf:"varint,10,opt,name=clusterLevelRaw" json:"clusterLevelRaw,omitempty"`
|
||||||
// Used to specify the key associated with this message.
|
// Used to specify the key associated with this message.
|
||||||
@ -156,5 +156,5 @@ func (m *Message_Peer) GetAddr() string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
proto.RegisterEnum("dht.Message_MessageType", Message_MessageType_name, Message_MessageType_value)
|
proto.RegisterEnum("dht.pb.Message_MessageType", Message_MessageType_name, Message_MessageType_value)
|
||||||
}
|
}
|
@ -1,4 +1,4 @@
|
|||||||
package dht;
|
package dht.pb;
|
||||||
|
|
||||||
//run `protoc --go_out=. *.proto` to generate
|
//run `protoc --go_out=. *.proto` to generate
|
||||||
|
|
@ -1,4 +1,4 @@
|
|||||||
package dht
|
package dht_pb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
@ -8,7 +8,7 @@ import (
|
|||||||
peer "github.com/jbenet/go-ipfs/peer"
|
peer "github.com/jbenet/go-ipfs/peer"
|
||||||
)
|
)
|
||||||
|
|
||||||
func newMessage(typ Message_MessageType, key string, level int) *Message {
|
func NewMessage(typ Message_MessageType, key string, level int) *Message {
|
||||||
m := &Message{
|
m := &Message{
|
||||||
Type: &typ,
|
Type: &typ,
|
||||||
Key: &key,
|
Key: &key,
|
||||||
@ -31,7 +31,7 @@ func peerToPBPeer(p peer.Peer) *Message_Peer {
|
|||||||
return pbp
|
return pbp
|
||||||
}
|
}
|
||||||
|
|
||||||
func peersToPBPeers(peers []peer.Peer) []*Message_Peer {
|
func PeersToPBPeers(peers []peer.Peer) []*Message_Peer {
|
||||||
pbpeers := make([]*Message_Peer, len(peers))
|
pbpeers := make([]*Message_Peer, len(peers))
|
||||||
for i, p := range peers {
|
for i, p := range peers {
|
||||||
pbpeers[i] = peerToPBPeer(p)
|
pbpeers[i] = peerToPBPeer(p)
|
||||||
@ -53,8 +53,7 @@ func (m *Message_Peer) Address() (ma.Multiaddr, error) {
|
|||||||
func (m *Message) GetClusterLevel() int {
|
func (m *Message) GetClusterLevel() int {
|
||||||
level := m.GetClusterLevelRaw() - 1
|
level := m.GetClusterLevelRaw() - 1
|
||||||
if level < 0 {
|
if level < 0 {
|
||||||
log.Debug("GetClusterLevel: no routing level specified, assuming 0")
|
return 0
|
||||||
level = 0
|
|
||||||
}
|
}
|
||||||
return int(level)
|
return int(level)
|
||||||
}
|
}
|
@ -6,6 +6,7 @@ import (
|
|||||||
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||||
|
|
||||||
peer "github.com/jbenet/go-ipfs/peer"
|
peer "github.com/jbenet/go-ipfs/peer"
|
||||||
|
pb "github.com/jbenet/go-ipfs/routing/dht/pb"
|
||||||
kb "github.com/jbenet/go-ipfs/routing/kbucket"
|
kb "github.com/jbenet/go-ipfs/routing/kbucket"
|
||||||
u "github.com/jbenet/go-ipfs/util"
|
u "github.com/jbenet/go-ipfs/util"
|
||||||
)
|
)
|
||||||
@ -152,10 +153,10 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int
|
|||||||
return peerOut
|
return peerOut
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dht *IpfsDHT) addPeerListAsync(k u.Key, peers []*Message_Peer, ps *peerSet, count int, out chan peer.Peer) {
|
func (dht *IpfsDHT) addPeerListAsync(k u.Key, peers []*pb.Message_Peer, ps *peerSet, count int, out chan peer.Peer) {
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
for _, pbp := range peers {
|
for _, pbp := range peers {
|
||||||
go func(mp *Message_Peer) {
|
go func(mp *pb.Message_Peer) {
|
||||||
defer func() { done <- struct{}{} }()
|
defer func() { done <- struct{}{} }()
|
||||||
// construct new peer
|
// construct new peer
|
||||||
p, err := dht.ensureConnectedToPeer(mp)
|
p, err := dht.ensureConnectedToPeer(mp)
|
||||||
@ -258,7 +259,7 @@ func (dht *IpfsDHT) Ping(ctx context.Context, p peer.Peer) error {
|
|||||||
// Thoughts: maybe this should accept an ID and do a peer lookup?
|
// Thoughts: maybe this should accept an ID and do a peer lookup?
|
||||||
log.Infof("ping %s start", p)
|
log.Infof("ping %s start", p)
|
||||||
|
|
||||||
pmes := newMessage(Message_PING, "", 0)
|
pmes := pb.NewMessage(pb.Message_PING, "", 0)
|
||||||
_, err := dht.sendRequest(ctx, p, pmes)
|
_, err := dht.sendRequest(ctx, p, pmes)
|
||||||
log.Infof("ping %s end (err = %s)", p, err)
|
log.Infof("ping %s end (err = %s)", p, err)
|
||||||
return err
|
return err
|
||||||
|
Loading…
x
Reference in New Issue
Block a user