peer.Peer is now an interface

![](http://m.memegen.com/77n7dk.jpg)
This commit is contained in:
Juan Batiz-Benet
2014-10-20 03:26:44 -07:00
parent e98bb76b0f
commit 28e083b902
11 changed files with 123 additions and 129 deletions

View File

@ -17,20 +17,21 @@ func newMessage(typ Message_MessageType, key string, level int) *Message {
return m
}
func peerToPBPeer(p *peer.Peer) *Message_Peer {
func peerToPBPeer(p peer.Peer) *Message_Peer {
pbp := new(Message_Peer)
if len(p.Addresses) == 0 || p.Addresses[0] == nil {
addrs := p.Addresses()
if len(addrs) == 0 || addrs[0] == nil {
pbp.Addr = proto.String("")
} else {
addr := p.Addresses[0].String()
addr := addrs[0].String()
pbp.Addr = &addr
}
pid := string(p.ID)
pid := string(p.ID())
pbp.Id = &pid
return pbp
}
func peersToPBPeers(peers []*peer.Peer) []*Message_Peer {
func peersToPBPeers(peers []peer.Peer) []*Message_Peer {
pbpeers := make([]*Message_Peer, len(peers))
for i, p := range peers {
pbpeers[i] = peerToPBPeer(p)

66
dht.go
View File

@ -39,7 +39,7 @@ type IpfsDHT struct {
sender inet.Sender
// Local peer (yourself)
self *peer.Peer
self peer.Peer
// Other peers
peerstore peer.Peerstore
@ -60,7 +60,7 @@ type IpfsDHT struct {
}
// NewDHT creates a new DHT object with the given peer as the 'local' host
func NewDHT(ctx context.Context, p *peer.Peer, ps peer.Peerstore, net inet.Network, sender inet.Sender, dstore ds.Datastore) *IpfsDHT {
func NewDHT(ctx context.Context, p peer.Peer, ps peer.Peerstore, net inet.Network, sender inet.Sender, dstore ds.Datastore) *IpfsDHT {
dht := new(IpfsDHT)
dht.network = net
dht.sender = sender
@ -69,12 +69,12 @@ func NewDHT(ctx context.Context, p *peer.Peer, ps peer.Peerstore, net inet.Netwo
dht.peerstore = ps
dht.ctx = ctx
dht.providers = NewProviderManager(p.ID)
dht.providers = NewProviderManager(p.ID())
dht.routingTables = make([]*kb.RoutingTable, 3)
dht.routingTables[0] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Millisecond*1000)
dht.routingTables[1] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Millisecond*1000)
dht.routingTables[2] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Hour)
dht.routingTables[0] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID()), time.Millisecond*1000)
dht.routingTables[1] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID()), time.Millisecond*1000)
dht.routingTables[2] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID()), time.Hour)
dht.birth = time.Now()
if doPinging {
@ -84,7 +84,7 @@ func NewDHT(ctx context.Context, p *peer.Peer, ps peer.Peerstore, net inet.Netwo
}
// Connect to a new peer at the given address, ping and add to the routing table
func (dht *IpfsDHT) Connect(ctx context.Context, npeer *peer.Peer) (*peer.Peer, error) {
func (dht *IpfsDHT) Connect(ctx context.Context, npeer peer.Peer) (peer.Peer, error) {
log.Debug("Connect to new peer: %s", npeer)
// TODO(jbenet,whyrusleeping)
@ -175,7 +175,7 @@ func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) msg.N
// sendRequest sends out a request using dht.sender, but also makes sure to
// measure the RTT for latency measurements.
func (dht *IpfsDHT) sendRequest(ctx context.Context, p *peer.Peer, pmes *Message) (*Message, error) {
func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.Peer, pmes *Message) (*Message, error) {
mes, err := msg.FromObject(p, pmes)
if err != nil {
@ -208,7 +208,7 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p *peer.Peer, pmes *Message
}
// putValueToNetwork stores the given key/value pair at the peer 'p'
func (dht *IpfsDHT) putValueToNetwork(ctx context.Context, p *peer.Peer,
func (dht *IpfsDHT) putValueToNetwork(ctx context.Context, p peer.Peer,
key string, value []byte) error {
pmes := newMessage(Message_PUT_VALUE, string(key), 0)
@ -224,12 +224,12 @@ func (dht *IpfsDHT) putValueToNetwork(ctx context.Context, p *peer.Peer,
return nil
}
func (dht *IpfsDHT) putProvider(ctx context.Context, p *peer.Peer, key string) error {
func (dht *IpfsDHT) putProvider(ctx context.Context, p peer.Peer, key string) error {
pmes := newMessage(Message_ADD_PROVIDER, string(key), 0)
// add self as the provider
pmes.ProviderPeers = peersToPBPeers([]*peer.Peer{dht.self})
pmes.ProviderPeers = peersToPBPeers([]peer.Peer{dht.self})
rpmes, err := dht.sendRequest(ctx, p, pmes)
if err != nil {
@ -244,8 +244,8 @@ func (dht *IpfsDHT) putProvider(ctx context.Context, p *peer.Peer, key string) e
return nil
}
func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p *peer.Peer,
key u.Key, level int) ([]byte, []*peer.Peer, error) {
func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.Peer,
key u.Key, level int) ([]byte, []peer.Peer, error) {
pmes, err := dht.getValueSingle(ctx, p, key, level)
if err != nil {
@ -270,7 +270,7 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p *peer.Peer,
}
// Perhaps we were given closer peers
var peers []*peer.Peer
var peers []peer.Peer
for _, pb := range pmes.GetCloserPeers() {
pr, err := dht.addPeer(pb)
if err != nil {
@ -289,8 +289,8 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p *peer.Peer,
return nil, nil, u.ErrNotFound
}
func (dht *IpfsDHT) addPeer(pb *Message_Peer) (*peer.Peer, error) {
if peer.ID(pb.GetId()).Equal(dht.self.ID) {
func (dht *IpfsDHT) addPeer(pb *Message_Peer) (peer.Peer, error) {
if peer.ID(pb.GetId()).Equal(dht.self.ID()) {
return nil, errors.New("cannot add self as peer")
}
@ -310,7 +310,7 @@ func (dht *IpfsDHT) addPeer(pb *Message_Peer) (*peer.Peer, error) {
}
// getValueSingle simply performs the get value RPC with the given parameters
func (dht *IpfsDHT) getValueSingle(ctx context.Context, p *peer.Peer,
func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.Peer,
key u.Key, level int) (*Message, error) {
pmes := newMessage(Message_GET_VALUE, string(key), level)
@ -369,7 +369,7 @@ func (dht *IpfsDHT) putLocal(key u.Key, value []byte) error {
// Update signals to all routingTables to Update their last-seen status
// on the given peer.
func (dht *IpfsDHT) Update(p *peer.Peer) {
func (dht *IpfsDHT) Update(p peer.Peer) {
log.Debug("updating peer: %s latency = %f\n", p, p.GetLatency().Seconds())
removedCount := 0
for _, route := range dht.routingTables {
@ -390,7 +390,7 @@ func (dht *IpfsDHT) Update(p *peer.Peer) {
}
// FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
func (dht *IpfsDHT) FindLocal(id peer.ID) (*peer.Peer, *kb.RoutingTable) {
func (dht *IpfsDHT) FindLocal(id peer.ID) (peer.Peer, *kb.RoutingTable) {
for _, table := range dht.routingTables {
p := table.Find(id)
if p != nil {
@ -400,7 +400,7 @@ func (dht *IpfsDHT) FindLocal(id peer.ID) (*peer.Peer, *kb.RoutingTable) {
return nil, nil
}
func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p *peer.Peer, id peer.ID, level int) (*Message, error) {
func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.Peer, id peer.ID, level int) (*Message, error) {
pmes := newMessage(Message_FIND_NODE, string(id), level)
return dht.sendRequest(ctx, p, pmes)
}
@ -411,14 +411,14 @@ func (dht *IpfsDHT) printTables() {
}
}
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p *peer.Peer, key u.Key, level int) (*Message, error) {
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.Peer, key u.Key, level int) (*Message, error) {
pmes := newMessage(Message_GET_PROVIDERS, string(key), level)
return dht.sendRequest(ctx, p, pmes)
}
// TODO: Could be done async
func (dht *IpfsDHT) addProviders(key u.Key, peers []*Message_Peer) []*peer.Peer {
var provArr []*peer.Peer
func (dht *IpfsDHT) addProviders(key u.Key, peers []*Message_Peer) []peer.Peer {
var provArr []peer.Peer
for _, prov := range peers {
p, err := dht.peerFromInfo(prov)
if err != nil {
@ -429,7 +429,7 @@ func (dht *IpfsDHT) addProviders(key u.Key, peers []*Message_Peer) []*peer.Peer
log.Debug("%s adding provider: %s for %s", dht.self, p, key)
// Dont add outselves to the list
if p.ID.Equal(dht.self.ID) {
if p.ID().Equal(dht.self.ID()) {
continue
}
@ -441,7 +441,7 @@ func (dht *IpfsDHT) addProviders(key u.Key, peers []*Message_Peer) []*peer.Peer
}
// nearestPeersToQuery returns the routing tables closest peers.
func (dht *IpfsDHT) nearestPeersToQuery(pmes *Message, count int) []*peer.Peer {
func (dht *IpfsDHT) nearestPeersToQuery(pmes *Message, count int) []peer.Peer {
level := pmes.GetClusterLevel()
cluster := dht.routingTables[level]
@ -451,7 +451,7 @@ func (dht *IpfsDHT) nearestPeersToQuery(pmes *Message, count int) []*peer.Peer {
}
// betterPeerToQuery returns nearestPeersToQuery, but iff closer than self.
func (dht *IpfsDHT) betterPeersToQuery(pmes *Message, count int) []*peer.Peer {
func (dht *IpfsDHT) betterPeersToQuery(pmes *Message, count int) []peer.Peer {
closer := dht.nearestPeersToQuery(pmes, count)
// no node? nil
@ -461,17 +461,17 @@ func (dht *IpfsDHT) betterPeersToQuery(pmes *Message, count int) []*peer.Peer {
// == to self? thats bad
for _, p := range closer {
if p.ID.Equal(dht.self.ID) {
if p.ID().Equal(dht.self.ID()) {
log.Error("Attempted to return self! this shouldnt happen...")
return nil
}
}
var filtered []*peer.Peer
var filtered []peer.Peer
for _, p := range closer {
// must all be closer than self
key := u.Key(pmes.GetKey())
if !kb.Closer(dht.self.ID, p.ID, key) {
if !kb.Closer(dht.self.ID(), p.ID(), key) {
filtered = append(filtered, p)
}
}
@ -480,7 +480,7 @@ func (dht *IpfsDHT) betterPeersToQuery(pmes *Message, count int) []*peer.Peer {
return filtered
}
func (dht *IpfsDHT) getPeer(id peer.ID) (*peer.Peer, error) {
func (dht *IpfsDHT) getPeer(id peer.ID) (peer.Peer, error) {
p, err := dht.peerstore.Get(id)
if err != nil {
err = fmt.Errorf("Failed to get peer from peerstore: %s", err)
@ -490,12 +490,12 @@ func (dht *IpfsDHT) getPeer(id peer.ID) (*peer.Peer, error) {
return p, nil
}
func (dht *IpfsDHT) peerFromInfo(pbp *Message_Peer) (*peer.Peer, error) {
func (dht *IpfsDHT) peerFromInfo(pbp *Message_Peer) (peer.Peer, error) {
id := peer.ID(pbp.GetId())
// continue if it's ourselves
if id.Equal(dht.self.ID) {
if id.Equal(dht.self.ID()) {
return nil, errors.New("found self")
}
@ -512,7 +512,7 @@ func (dht *IpfsDHT) peerFromInfo(pbp *Message_Peer) (*peer.Peer, error) {
return p, nil
}
func (dht *IpfsDHT) ensureConnectedToPeer(pbp *Message_Peer) (*peer.Peer, error) {
func (dht *IpfsDHT) ensureConnectedToPeer(pbp *Message_Peer) (peer.Peer, error) {
p, err := dht.peerFromInfo(pbp)
if err != nil {
return nil, err

View File

@ -20,7 +20,7 @@ import (
"time"
)
func setupDHT(ctx context.Context, t *testing.T, p *peer.Peer) *IpfsDHT {
func setupDHT(ctx context.Context, t *testing.T, p peer.Peer) *IpfsDHT {
peerstore := peer.NewPeerstore()
dhts := netservice.NewService(nil) // nil handler for now, need to patch it
@ -40,7 +40,7 @@ func setupDHT(ctx context.Context, t *testing.T, p *peer.Peer) *IpfsDHT {
return d
}
func setupDHTS(ctx context.Context, n int, t *testing.T) ([]ma.Multiaddr, []*peer.Peer, []*IpfsDHT) {
func setupDHTS(ctx context.Context, n int, t *testing.T) ([]ma.Multiaddr, []peer.Peer, []*IpfsDHT) {
var addrs []ma.Multiaddr
for i := 0; i < n; i++ {
a, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 5000+i))
@ -50,7 +50,7 @@ func setupDHTS(ctx context.Context, n int, t *testing.T) ([]ma.Multiaddr, []*pee
addrs = append(addrs, a)
}
var peers []*peer.Peer
var peers []peer.Peer
for i := 0; i < n; i++ {
p := makePeer(addrs[i])
peers = append(peers, p)
@ -64,21 +64,16 @@ func setupDHTS(ctx context.Context, n int, t *testing.T) ([]ma.Multiaddr, []*pee
return addrs, peers, dhts
}
func makePeer(addr ma.Multiaddr) *peer.Peer {
p := new(peer.Peer)
p.AddAddress(addr)
func makePeer(addr ma.Multiaddr) peer.Peer {
sk, pk, err := ci.GenerateKeyPair(ci.RSA, 512)
if err != nil {
panic(err)
}
p.PrivKey = sk
p.PubKey = pk
id, err := peer.IDFromPubKey(pk)
p, err := peer.WithKeyPair(sk, pk)
if err != nil {
panic(err)
}
p.ID = id
p.AddAddress(addr)
return p
}
@ -289,7 +284,7 @@ func TestProvidesAsync(t *testing.T) {
provs := dhts[0].FindProvidersAsync(ctxT, u.Key("hello"), 5)
select {
case p := <-provs:
if !p.ID.Equal(dhts[3].self.ID) {
if !p.ID().Equal(dhts[3].self.ID()) {
t.Fatalf("got a provider, but not the right one. %s", p)
}
case <-ctxT.Done():
@ -379,7 +374,7 @@ func TestFindPeer(t *testing.T) {
}
ctxT, _ := context.WithTimeout(ctx, time.Second)
p, err := dhts[0].FindPeer(ctxT, peers[2].ID)
p, err := dhts[0].FindPeer(ctxT, peers[2].ID())
if err != nil {
t.Fatal(err)
}
@ -388,7 +383,7 @@ func TestFindPeer(t *testing.T) {
t.Fatal("Failed to find peer.")
}
if !p.ID.Equal(peers[2].ID) {
if !p.ID().Equal(peers[2].ID()) {
t.Fatal("Didnt find expected peer.")
}
}

View File

@ -32,12 +32,13 @@ func (di *diagInfo) Marshal() []byte {
func (dht *IpfsDHT) getDiagInfo() *diagInfo {
di := new(diagInfo)
di.CodeVersion = "github.com/jbenet/go-ipfs"
di.ID = dht.self.ID
di.ID = dht.self.ID()
di.LifeSpan = time.Since(dht.birth)
di.Keys = nil // Currently no way to query datastore
for _, p := range dht.routingTables[0].ListPeers() {
di.Connections = append(di.Connections, connDiagInfo{p.GetLatency(), p.ID})
d := connDiagInfo{p.GetLatency(), p.ID()}
di.Connections = append(di.Connections, d)
}
return di
}

View File

@ -9,7 +9,6 @@ import (
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
msg "github.com/jbenet/go-ipfs/net/message"
mux "github.com/jbenet/go-ipfs/net/mux"
peer "github.com/jbenet/go-ipfs/peer"
@ -66,17 +65,17 @@ type fauxNet struct {
}
// DialPeer attempts to establish a connection to a given peer
func (f *fauxNet) DialPeer(*peer.Peer) error {
func (f *fauxNet) DialPeer(peer.Peer) error {
return nil
}
// ClosePeer connection to peer
func (f *fauxNet) ClosePeer(*peer.Peer) error {
func (f *fauxNet) ClosePeer(peer.Peer) error {
return nil
}
// IsConnected returns whether a connection to given peer exists.
func (f *fauxNet) IsConnected(*peer.Peer) (bool, error) {
func (f *fauxNet) IsConnected(peer.Peer) (bool, error) {
return true, nil
}
@ -88,7 +87,7 @@ func (f *fauxNet) SendMessage(msg.NetMessage) error {
return nil
}
func (f *fauxNet) GetPeerList() []*peer.Peer {
func (f *fauxNet) GetPeerList() []peer.Peer {
return nil
}
@ -107,11 +106,10 @@ func TestGetFailures(t *testing.T) {
fs := &fauxSender{}
peerstore := peer.NewPeerstore()
local := new(peer.Peer)
local.ID = peer.ID("test_peer")
local := peer.WithIDString("test_peer")
d := NewDHT(ctx, local, peerstore, fn, fs, ds.NewMapDatastore())
other := &peer.Peer{ID: peer.ID("other_peer")}
other := peer.WithIDString("other_peer")
d.Update(other)
// This one should time out
@ -189,11 +187,10 @@ func TestGetFailures(t *testing.T) {
}
// TODO: Maybe put these in some sort of "ipfs_testutil" package
func _randPeer() *peer.Peer {
p := new(peer.Peer)
p.ID = make(peer.ID, 16)
p.Addresses = []ma.Multiaddr{nil}
crand.Read(p.ID)
func _randPeer() peer.Peer {
id := make(peer.ID, 16)
crand.Read(id)
p := peer.WithID(id)
return p
}
@ -204,13 +201,13 @@ func TestNotFound(t *testing.T) {
fn := &fauxNet{}
fs := &fauxSender{}
local := new(peer.Peer)
local.ID = peer.ID("test_peer")
local := peer.WithIDString("test_peer")
peerstore := peer.NewPeerstore()
peerstore.Put(local)
d := NewDHT(ctx, local, peerstore, fn, fs, ds.NewMapDatastore())
var ps []*peer.Peer
var ps []peer.Peer
for i := 0; i < 5; i++ {
ps = append(ps, _randPeer())
d.Update(ps[i])
@ -228,7 +225,7 @@ func TestNotFound(t *testing.T) {
case Message_GET_VALUE:
resp := &Message{Type: pmes.Type}
peers := []*peer.Peer{}
peers := []peer.Peer{}
for i := 0; i < 7; i++ {
peers = append(peers, _randPeer())
}
@ -270,13 +267,13 @@ func TestLessThanKResponses(t *testing.T) {
u.Debug = false
fn := &fauxNet{}
fs := &fauxSender{}
local := peer.WithIDString("test_peer")
peerstore := peer.NewPeerstore()
local := new(peer.Peer)
local.ID = peer.ID("test_peer")
peerstore.Put(local)
d := NewDHT(ctx, local, peerstore, fn, fs, ds.NewMapDatastore())
var ps []*peer.Peer
var ps []peer.Peer
for i := 0; i < 5; i++ {
ps = append(ps, _randPeer())
d.Update(ps[i])
@ -295,7 +292,7 @@ func TestLessThanKResponses(t *testing.T) {
case Message_GET_VALUE:
resp := &Message{
Type: pmes.Type,
CloserPeers: peersToPBPeers([]*peer.Peer{other}),
CloserPeers: peersToPBPeers([]peer.Peer{other}),
}
mes, err := msg.FromObject(mes.Peer(), resp)

View File

@ -14,7 +14,7 @@ import (
var CloserPeerCount = 4
// dhthandler specifies the signature of functions that handle DHT messages.
type dhtHandler func(*peer.Peer, *Message) (*Message, error)
type dhtHandler func(peer.Peer, *Message) (*Message, error)
func (dht *IpfsDHT) handlerForMsgType(t Message_MessageType) dhtHandler {
switch t {
@ -35,7 +35,7 @@ func (dht *IpfsDHT) handlerForMsgType(t Message_MessageType) dhtHandler {
}
}
func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *Message) (*Message, error) {
func (dht *IpfsDHT) handleGetValue(p peer.Peer, pmes *Message) (*Message, error) {
log.Debug("%s handleGetValue for key: %s\n", dht.self, pmes.GetKey())
// setup response
@ -93,7 +93,7 @@ func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *Message) (*Message, error
}
// Store a value in this peer local storage
func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *Message) (*Message, error) {
func (dht *IpfsDHT) handlePutValue(p peer.Peer, pmes *Message) (*Message, error) {
dht.dslock.Lock()
defer dht.dslock.Unlock()
dskey := u.Key(pmes.GetKey()).DsKey()
@ -102,18 +102,18 @@ func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *Message) (*Message, error
return pmes, err
}
func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *Message) (*Message, error) {
func (dht *IpfsDHT) handlePing(p peer.Peer, pmes *Message) (*Message, error) {
log.Debug("%s Responding to ping from %s!\n", dht.self, p)
return pmes, nil
}
func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *Message) (*Message, error) {
func (dht *IpfsDHT) handleFindPeer(p peer.Peer, pmes *Message) (*Message, error) {
resp := newMessage(pmes.GetType(), "", pmes.GetClusterLevel())
var closest []*peer.Peer
var closest []peer.Peer
// if looking for self... special case where we send it on CloserPeers.
if peer.ID(pmes.GetKey()).Equal(dht.self.ID) {
closest = []*peer.Peer{dht.self}
if peer.ID(pmes.GetKey()).Equal(dht.self.ID()) {
closest = []peer.Peer{dht.self}
} else {
closest = dht.betterPeersToQuery(pmes, CloserPeerCount)
}
@ -123,9 +123,9 @@ func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *Message) (*Message, error
return resp, nil
}
var withAddresses []*peer.Peer
var withAddresses []peer.Peer
for _, p := range closest {
if len(p.Addresses) > 0 {
if len(p.Addresses()) > 0 {
withAddresses = append(withAddresses, p)
}
}
@ -137,7 +137,7 @@ func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *Message) (*Message, error
return resp, nil
}
func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *Message) (*Message, error) {
func (dht *IpfsDHT) handleGetProviders(p peer.Peer, pmes *Message) (*Message, error) {
resp := newMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
// check if we have this value, to add ourselves as provider.
@ -171,10 +171,10 @@ func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *Message) (*Message, e
type providerInfo struct {
Creation time.Time
Value *peer.Peer
Value peer.Peer
}
func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *Message) (*Message, error) {
func (dht *IpfsDHT) handleAddProvider(p peer.Peer, pmes *Message) (*Message, error) {
key := u.Key(pmes.GetKey())
log.Debug("%s adding %s as a provider for '%s'\n", dht.self, p, peer.ID(key))
@ -182,7 +182,7 @@ func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *Message) (*Message, er
// add provider should use the address given in the message
for _, pb := range pmes.GetProviderPeers() {
pid := peer.ID(pb.GetId())
if pid.Equal(p.ID) {
if pid.Equal(p.ID()) {
addr, err := pb.Address()
if err != nil {

View File

@ -20,12 +20,12 @@ type ProviderManager struct {
type addProv struct {
k u.Key
val *peer.Peer
val peer.Peer
}
type getProv struct {
k u.Key
resp chan []*peer.Peer
resp chan []peer.Peer
}
func NewProviderManager(local peer.ID) *ProviderManager {
@ -45,7 +45,7 @@ func (pm *ProviderManager) run() {
for {
select {
case np := <-pm.newprovs:
if np.val.ID.Equal(pm.lpeer) {
if np.val.ID().Equal(pm.lpeer) {
pm.local[np.k] = struct{}{}
}
pi := new(providerInfo)
@ -54,7 +54,7 @@ func (pm *ProviderManager) run() {
arr := pm.providers[np.k]
pm.providers[np.k] = append(arr, pi)
case gp := <-pm.getprovs:
var parr []*peer.Peer
var parr []peer.Peer
provs := pm.providers[gp.k]
for _, p := range provs {
parr = append(parr, p.Value)
@ -82,17 +82,17 @@ func (pm *ProviderManager) run() {
}
}
func (pm *ProviderManager) AddProvider(k u.Key, val *peer.Peer) {
func (pm *ProviderManager) AddProvider(k u.Key, val peer.Peer) {
pm.newprovs <- &addProv{
k: k,
val: val,
}
}
func (pm *ProviderManager) GetProviders(k u.Key) []*peer.Peer {
func (pm *ProviderManager) GetProviders(k u.Key) []peer.Peer {
gp := new(getProv)
gp.k = k
gp.resp = make(chan []*peer.Peer)
gp.resp = make(chan []peer.Peer)
pm.getprovs <- gp
return <-gp.resp
}

View File

@ -11,7 +11,7 @@ func TestProviderManager(t *testing.T) {
mid := peer.ID("testing")
p := NewProviderManager(mid)
a := u.Key("test")
p.AddProvider(a, &peer.Peer{})
p.AddProvider(a, peer.WithIDString("testingprovider"))
resp := p.GetProviders(a)
if len(resp) != 1 {
t.Fatal("Could not retrieve provider.")

View File

@ -27,9 +27,9 @@ type dhtQuery struct {
type dhtQueryResult struct {
value []byte // GetValue
peer *peer.Peer // FindPeer
providerPeers []*peer.Peer // GetProviders
closerPeers []*peer.Peer // *
peer peer.Peer // FindPeer
providerPeers []peer.Peer // GetProviders
closerPeers []peer.Peer // *
success bool
}
@ -47,10 +47,10 @@ func newQuery(k u.Key, f queryFunc) *dhtQuery {
// - the value
// - a list of peers potentially better able to serve the query
// - an error
type queryFunc func(context.Context, *peer.Peer) (*dhtQueryResult, error)
type queryFunc func(context.Context, peer.Peer) (*dhtQueryResult, error)
// Run runs the query at hand. pass in a list of peers to use first.
func (q *dhtQuery) Run(ctx context.Context, peers []*peer.Peer) (*dhtQueryResult, error) {
func (q *dhtQuery) Run(ctx context.Context, peers []peer.Peer) (*dhtQueryResult, error) {
runner := newQueryRunner(ctx, q)
return runner.Run(peers)
}
@ -100,7 +100,7 @@ func newQueryRunner(ctx context.Context, q *dhtQuery) *dhtQueryRunner {
}
}
func (r *dhtQueryRunner) Run(peers []*peer.Peer) (*dhtQueryResult, error) {
func (r *dhtQueryRunner) Run(peers []peer.Peer) (*dhtQueryResult, error) {
log.Debug("Run query with %d peers.", len(peers))
if len(peers) == 0 {
log.Warning("Running query with no peers!")
@ -148,7 +148,7 @@ func (r *dhtQueryRunner) Run(peers []*peer.Peer) (*dhtQueryResult, error) {
return nil, err
}
func (r *dhtQueryRunner) addPeerToQuery(next *peer.Peer, benchmark *peer.Peer) {
func (r *dhtQueryRunner) addPeerToQuery(next peer.Peer, benchmark peer.Peer) {
if next == nil {
// wtf why are peers nil?!?
log.Error("Query getting nil peers!!!\n")
@ -156,7 +156,7 @@ func (r *dhtQueryRunner) addPeerToQuery(next *peer.Peer, benchmark *peer.Peer) {
}
// if new peer further away than whom we got it from, bother (loops)
if benchmark != nil && kb.Closer(benchmark.ID, next.ID, r.query.key) {
if benchmark != nil && kb.Closer(benchmark.ID(), next.ID(), r.query.key) {
return
}
@ -200,7 +200,7 @@ func (r *dhtQueryRunner) spawnWorkers() {
}
}
func (r *dhtQueryRunner) queryPeer(p *peer.Peer) {
func (r *dhtQueryRunner) queryPeer(p peer.Peer) {
log.Debug("spawned worker for: %v\n", p)
// make sure we rate limit concurrency.

View File

@ -23,13 +23,13 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key u.Key, value []byte) error
return err
}
var peers []*peer.Peer
var peers []peer.Peer
for _, route := range dht.routingTables {
npeers := route.NearestPeers(kb.ConvertKey(key), KValue)
peers = append(peers, npeers...)
}
query := newQuery(key, func(ctx context.Context, p *peer.Peer) (*dhtQueryResult, error) {
query := newQuery(key, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {
log.Debug("%s PutValue qry part %v", dht.self, p)
err := dht.putValueToNetwork(ctx, p, string(key), value)
if err != nil {
@ -65,7 +65,7 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) {
}
// setup the Query
query := newQuery(key, func(ctx context.Context, p *peer.Peer) (*dhtQueryResult, error) {
query := newQuery(key, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {
val, peers, err := dht.getValueOrPeers(ctx, p, key, routeLevel)
if err != nil {
@ -117,8 +117,8 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key u.Key) error {
return nil
}
func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int) <-chan *peer.Peer {
peerOut := make(chan *peer.Peer, count)
func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int) <-chan peer.Peer {
peerOut := make(chan peer.Peer, count)
go func() {
ps := newPeerSet()
provs := dht.providers.GetProviders(key)
@ -136,7 +136,7 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int
peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(key), AlphaValue)
for _, pp := range peers {
wg.Add(1)
go func(p *peer.Peer) {
go func(p peer.Peer) {
defer wg.Done()
pmes, err := dht.findProvidersSingle(ctx, p, key, 0)
if err != nil {
@ -153,7 +153,7 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int
}
//TODO: this function could also be done asynchronously
func (dht *IpfsDHT) addPeerListAsync(k u.Key, peers []*Message_Peer, ps *peerSet, count int, out chan *peer.Peer) {
func (dht *IpfsDHT) addPeerListAsync(k u.Key, peers []*Message_Peer, ps *peerSet, count int, out chan peer.Peer) {
for _, pbp := range peers {
// construct new peer
@ -173,7 +173,7 @@ func (dht *IpfsDHT) addPeerListAsync(k u.Key, peers []*Message_Peer, ps *peerSet
// Find specific Peer
// FindPeer searches for a peer with given ID.
func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (*peer.Peer, error) {
func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.Peer, error) {
// Check if were already connected to them
p, _ := dht.FindLocal(id)
@ -186,7 +186,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (*peer.Peer, error
if p == nil {
return nil, nil
}
if p.ID.Equal(id) {
if p.ID().Equal(id) {
return p, nil
}
@ -205,7 +205,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (*peer.Peer, error
continue
}
if nxtPeer.ID.Equal(id) {
if nxtPeer.ID().Equal(id) {
return nxtPeer, nil
}
@ -214,7 +214,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (*peer.Peer, error
return nil, u.ErrNotFound
}
func (dht *IpfsDHT) findPeerMultiple(ctx context.Context, id peer.ID) (*peer.Peer, error) {
func (dht *IpfsDHT) findPeerMultiple(ctx context.Context, id peer.ID) (peer.Peer, error) {
// Check if were already connected to them
p, _ := dht.FindLocal(id)
@ -230,7 +230,7 @@ func (dht *IpfsDHT) findPeerMultiple(ctx context.Context, id peer.ID) (*peer.Pee
}
// setup query function
query := newQuery(u.Key(id), func(ctx context.Context, p *peer.Peer) (*dhtQueryResult, error) {
query := newQuery(u.Key(id), func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {
pmes, err := dht.findPeerSingle(ctx, p, id, routeLevel)
if err != nil {
log.Error("%s getPeer error: %v", dht.self, err)
@ -242,7 +242,7 @@ func (dht *IpfsDHT) findPeerMultiple(ctx context.Context, id peer.ID) (*peer.Pee
routeLevel++
}
nxtprs := make([]*peer.Peer, len(plist))
nxtprs := make([]peer.Peer, len(plist))
for i, fp := range plist {
nxtp, err := dht.peerFromInfo(fp)
if err != nil {
@ -250,7 +250,7 @@ func (dht *IpfsDHT) findPeerMultiple(ctx context.Context, id peer.ID) (*peer.Pee
continue
}
if nxtp.ID.Equal(id) {
if nxtp.ID().Equal(id) {
return &dhtQueryResult{peer: nxtp, success: true}, nil
}
@ -272,7 +272,7 @@ func (dht *IpfsDHT) findPeerMultiple(ctx context.Context, id peer.ID) (*peer.Pee
}
// Ping a peer, log the time it took
func (dht *IpfsDHT) Ping(ctx context.Context, p *peer.Peer) error {
func (dht *IpfsDHT) Ping(ctx context.Context, p peer.Peer) error {
// Thoughts: maybe this should accept an ID and do a peer lookup?
log.Info("ping %s start", p)

14
util.go
View File

@ -52,15 +52,15 @@ func newPeerSet() *peerSet {
return ps
}
func (ps *peerSet) Add(p *peer.Peer) {
func (ps *peerSet) Add(p peer.Peer) {
ps.lk.Lock()
ps.ps[string(p.ID)] = true
ps.ps[string(p.ID())] = true
ps.lk.Unlock()
}
func (ps *peerSet) Contains(p *peer.Peer) bool {
func (ps *peerSet) Contains(p peer.Peer) bool {
ps.lk.RLock()
_, ok := ps.ps[string(p.ID)]
_, ok := ps.ps[string(p.ID())]
ps.lk.RUnlock()
return ok
}
@ -71,12 +71,12 @@ func (ps *peerSet) Size() int {
return len(ps.ps)
}
func (ps *peerSet) AddIfSmallerThan(p *peer.Peer, maxsize int) bool {
func (ps *peerSet) AddIfSmallerThan(p peer.Peer, maxsize int) bool {
var success bool
ps.lk.Lock()
if _, ok := ps.ps[string(p.ID)]; !ok && len(ps.ps) < maxsize {
if _, ok := ps.ps[string(p.ID())]; !ok && len(ps.ps) < maxsize {
success = true
ps.ps[string(p.ID)] = true
ps.ps[string(p.ID())] = true
}
ps.lk.Unlock()
return success