diff --git a/Message.go b/Message.go index 5267242..ae78d1f 100644 --- a/Message.go +++ b/Message.go @@ -17,20 +17,21 @@ func newMessage(typ Message_MessageType, key string, level int) *Message { return m } -func peerToPBPeer(p *peer.Peer) *Message_Peer { +func peerToPBPeer(p peer.Peer) *Message_Peer { pbp := new(Message_Peer) - if len(p.Addresses) == 0 || p.Addresses[0] == nil { + addrs := p.Addresses() + if len(addrs) == 0 || addrs[0] == nil { pbp.Addr = proto.String("") } else { - addr := p.Addresses[0].String() + addr := addrs[0].String() pbp.Addr = &addr } - pid := string(p.ID) + pid := string(p.ID()) pbp.Id = &pid return pbp } -func peersToPBPeers(peers []*peer.Peer) []*Message_Peer { +func peersToPBPeers(peers []peer.Peer) []*Message_Peer { pbpeers := make([]*Message_Peer, len(peers)) for i, p := range peers { pbpeers[i] = peerToPBPeer(p) diff --git a/dht.go b/dht.go index 683b578..3617b71 100644 --- a/dht.go +++ b/dht.go @@ -39,7 +39,7 @@ type IpfsDHT struct { sender inet.Sender // Local peer (yourself) - self *peer.Peer + self peer.Peer // Other peers peerstore peer.Peerstore @@ -60,7 +60,7 @@ type IpfsDHT struct { } // NewDHT creates a new DHT object with the given peer as the 'local' host -func NewDHT(ctx context.Context, p *peer.Peer, ps peer.Peerstore, net inet.Network, sender inet.Sender, dstore ds.Datastore) *IpfsDHT { +func NewDHT(ctx context.Context, p peer.Peer, ps peer.Peerstore, net inet.Network, sender inet.Sender, dstore ds.Datastore) *IpfsDHT { dht := new(IpfsDHT) dht.network = net dht.sender = sender @@ -69,12 +69,12 @@ func NewDHT(ctx context.Context, p *peer.Peer, ps peer.Peerstore, net inet.Netwo dht.peerstore = ps dht.ctx = ctx - dht.providers = NewProviderManager(p.ID) + dht.providers = NewProviderManager(p.ID()) dht.routingTables = make([]*kb.RoutingTable, 3) - dht.routingTables[0] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Millisecond*1000) - dht.routingTables[1] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Millisecond*1000) - dht.routingTables[2] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Hour) + dht.routingTables[0] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID()), time.Millisecond*1000) + dht.routingTables[1] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID()), time.Millisecond*1000) + dht.routingTables[2] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID()), time.Hour) dht.birth = time.Now() if doPinging { @@ -84,7 +84,7 @@ func NewDHT(ctx context.Context, p *peer.Peer, ps peer.Peerstore, net inet.Netwo } // Connect to a new peer at the given address, ping and add to the routing table -func (dht *IpfsDHT) Connect(ctx context.Context, npeer *peer.Peer) (*peer.Peer, error) { +func (dht *IpfsDHT) Connect(ctx context.Context, npeer peer.Peer) (peer.Peer, error) { log.Debug("Connect to new peer: %s", npeer) // TODO(jbenet,whyrusleeping) @@ -175,7 +175,7 @@ func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) msg.N // sendRequest sends out a request using dht.sender, but also makes sure to // measure the RTT for latency measurements. -func (dht *IpfsDHT) sendRequest(ctx context.Context, p *peer.Peer, pmes *Message) (*Message, error) { +func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.Peer, pmes *Message) (*Message, error) { mes, err := msg.FromObject(p, pmes) if err != nil { @@ -208,7 +208,7 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p *peer.Peer, pmes *Message } // putValueToNetwork stores the given key/value pair at the peer 'p' -func (dht *IpfsDHT) putValueToNetwork(ctx context.Context, p *peer.Peer, +func (dht *IpfsDHT) putValueToNetwork(ctx context.Context, p peer.Peer, key string, value []byte) error { pmes := newMessage(Message_PUT_VALUE, string(key), 0) @@ -224,12 +224,12 @@ func (dht *IpfsDHT) putValueToNetwork(ctx context.Context, p *peer.Peer, return nil } -func (dht *IpfsDHT) putProvider(ctx context.Context, p *peer.Peer, key string) error { +func (dht *IpfsDHT) putProvider(ctx context.Context, p peer.Peer, key string) error { pmes := newMessage(Message_ADD_PROVIDER, string(key), 0) // add self as the provider - pmes.ProviderPeers = peersToPBPeers([]*peer.Peer{dht.self}) + pmes.ProviderPeers = peersToPBPeers([]peer.Peer{dht.self}) rpmes, err := dht.sendRequest(ctx, p, pmes) if err != nil { @@ -244,8 +244,8 @@ func (dht *IpfsDHT) putProvider(ctx context.Context, p *peer.Peer, key string) e return nil } -func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p *peer.Peer, - key u.Key, level int) ([]byte, []*peer.Peer, error) { +func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.Peer, + key u.Key, level int) ([]byte, []peer.Peer, error) { pmes, err := dht.getValueSingle(ctx, p, key, level) if err != nil { @@ -270,7 +270,7 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p *peer.Peer, } // Perhaps we were given closer peers - var peers []*peer.Peer + var peers []peer.Peer for _, pb := range pmes.GetCloserPeers() { pr, err := dht.addPeer(pb) if err != nil { @@ -289,8 +289,8 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p *peer.Peer, return nil, nil, u.ErrNotFound } -func (dht *IpfsDHT) addPeer(pb *Message_Peer) (*peer.Peer, error) { - if peer.ID(pb.GetId()).Equal(dht.self.ID) { +func (dht *IpfsDHT) addPeer(pb *Message_Peer) (peer.Peer, error) { + if peer.ID(pb.GetId()).Equal(dht.self.ID()) { return nil, errors.New("cannot add self as peer") } @@ -310,7 +310,7 @@ func (dht *IpfsDHT) addPeer(pb *Message_Peer) (*peer.Peer, error) { } // getValueSingle simply performs the get value RPC with the given parameters -func (dht *IpfsDHT) getValueSingle(ctx context.Context, p *peer.Peer, +func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.Peer, key u.Key, level int) (*Message, error) { pmes := newMessage(Message_GET_VALUE, string(key), level) @@ -369,7 +369,7 @@ func (dht *IpfsDHT) putLocal(key u.Key, value []byte) error { // Update signals to all routingTables to Update their last-seen status // on the given peer. -func (dht *IpfsDHT) Update(p *peer.Peer) { +func (dht *IpfsDHT) Update(p peer.Peer) { log.Debug("updating peer: %s latency = %f\n", p, p.GetLatency().Seconds()) removedCount := 0 for _, route := range dht.routingTables { @@ -390,7 +390,7 @@ func (dht *IpfsDHT) Update(p *peer.Peer) { } // FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in. -func (dht *IpfsDHT) FindLocal(id peer.ID) (*peer.Peer, *kb.RoutingTable) { +func (dht *IpfsDHT) FindLocal(id peer.ID) (peer.Peer, *kb.RoutingTable) { for _, table := range dht.routingTables { p := table.Find(id) if p != nil { @@ -400,7 +400,7 @@ func (dht *IpfsDHT) FindLocal(id peer.ID) (*peer.Peer, *kb.RoutingTable) { return nil, nil } -func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p *peer.Peer, id peer.ID, level int) (*Message, error) { +func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.Peer, id peer.ID, level int) (*Message, error) { pmes := newMessage(Message_FIND_NODE, string(id), level) return dht.sendRequest(ctx, p, pmes) } @@ -411,14 +411,14 @@ func (dht *IpfsDHT) printTables() { } } -func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p *peer.Peer, key u.Key, level int) (*Message, error) { +func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.Peer, key u.Key, level int) (*Message, error) { pmes := newMessage(Message_GET_PROVIDERS, string(key), level) return dht.sendRequest(ctx, p, pmes) } // TODO: Could be done async -func (dht *IpfsDHT) addProviders(key u.Key, peers []*Message_Peer) []*peer.Peer { - var provArr []*peer.Peer +func (dht *IpfsDHT) addProviders(key u.Key, peers []*Message_Peer) []peer.Peer { + var provArr []peer.Peer for _, prov := range peers { p, err := dht.peerFromInfo(prov) if err != nil { @@ -429,7 +429,7 @@ func (dht *IpfsDHT) addProviders(key u.Key, peers []*Message_Peer) []*peer.Peer log.Debug("%s adding provider: %s for %s", dht.self, p, key) // Dont add outselves to the list - if p.ID.Equal(dht.self.ID) { + if p.ID().Equal(dht.self.ID()) { continue } @@ -441,7 +441,7 @@ func (dht *IpfsDHT) addProviders(key u.Key, peers []*Message_Peer) []*peer.Peer } // nearestPeersToQuery returns the routing tables closest peers. -func (dht *IpfsDHT) nearestPeersToQuery(pmes *Message, count int) []*peer.Peer { +func (dht *IpfsDHT) nearestPeersToQuery(pmes *Message, count int) []peer.Peer { level := pmes.GetClusterLevel() cluster := dht.routingTables[level] @@ -451,7 +451,7 @@ func (dht *IpfsDHT) nearestPeersToQuery(pmes *Message, count int) []*peer.Peer { } // betterPeerToQuery returns nearestPeersToQuery, but iff closer than self. -func (dht *IpfsDHT) betterPeersToQuery(pmes *Message, count int) []*peer.Peer { +func (dht *IpfsDHT) betterPeersToQuery(pmes *Message, count int) []peer.Peer { closer := dht.nearestPeersToQuery(pmes, count) // no node? nil @@ -461,17 +461,17 @@ func (dht *IpfsDHT) betterPeersToQuery(pmes *Message, count int) []*peer.Peer { // == to self? thats bad for _, p := range closer { - if p.ID.Equal(dht.self.ID) { + if p.ID().Equal(dht.self.ID()) { log.Error("Attempted to return self! this shouldnt happen...") return nil } } - var filtered []*peer.Peer + var filtered []peer.Peer for _, p := range closer { // must all be closer than self key := u.Key(pmes.GetKey()) - if !kb.Closer(dht.self.ID, p.ID, key) { + if !kb.Closer(dht.self.ID(), p.ID(), key) { filtered = append(filtered, p) } } @@ -480,7 +480,7 @@ func (dht *IpfsDHT) betterPeersToQuery(pmes *Message, count int) []*peer.Peer { return filtered } -func (dht *IpfsDHT) getPeer(id peer.ID) (*peer.Peer, error) { +func (dht *IpfsDHT) getPeer(id peer.ID) (peer.Peer, error) { p, err := dht.peerstore.Get(id) if err != nil { err = fmt.Errorf("Failed to get peer from peerstore: %s", err) @@ -490,12 +490,12 @@ func (dht *IpfsDHT) getPeer(id peer.ID) (*peer.Peer, error) { return p, nil } -func (dht *IpfsDHT) peerFromInfo(pbp *Message_Peer) (*peer.Peer, error) { +func (dht *IpfsDHT) peerFromInfo(pbp *Message_Peer) (peer.Peer, error) { id := peer.ID(pbp.GetId()) // continue if it's ourselves - if id.Equal(dht.self.ID) { + if id.Equal(dht.self.ID()) { return nil, errors.New("found self") } @@ -512,7 +512,7 @@ func (dht *IpfsDHT) peerFromInfo(pbp *Message_Peer) (*peer.Peer, error) { return p, nil } -func (dht *IpfsDHT) ensureConnectedToPeer(pbp *Message_Peer) (*peer.Peer, error) { +func (dht *IpfsDHT) ensureConnectedToPeer(pbp *Message_Peer) (peer.Peer, error) { p, err := dht.peerFromInfo(pbp) if err != nil { return nil, err diff --git a/dht_test.go b/dht_test.go index 98b196d..23d9fcf 100644 --- a/dht_test.go +++ b/dht_test.go @@ -20,7 +20,7 @@ import ( "time" ) -func setupDHT(ctx context.Context, t *testing.T, p *peer.Peer) *IpfsDHT { +func setupDHT(ctx context.Context, t *testing.T, p peer.Peer) *IpfsDHT { peerstore := peer.NewPeerstore() dhts := netservice.NewService(nil) // nil handler for now, need to patch it @@ -40,7 +40,7 @@ func setupDHT(ctx context.Context, t *testing.T, p *peer.Peer) *IpfsDHT { return d } -func setupDHTS(ctx context.Context, n int, t *testing.T) ([]ma.Multiaddr, []*peer.Peer, []*IpfsDHT) { +func setupDHTS(ctx context.Context, n int, t *testing.T) ([]ma.Multiaddr, []peer.Peer, []*IpfsDHT) { var addrs []ma.Multiaddr for i := 0; i < n; i++ { a, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 5000+i)) @@ -50,7 +50,7 @@ func setupDHTS(ctx context.Context, n int, t *testing.T) ([]ma.Multiaddr, []*pee addrs = append(addrs, a) } - var peers []*peer.Peer + var peers []peer.Peer for i := 0; i < n; i++ { p := makePeer(addrs[i]) peers = append(peers, p) @@ -64,21 +64,16 @@ func setupDHTS(ctx context.Context, n int, t *testing.T) ([]ma.Multiaddr, []*pee return addrs, peers, dhts } -func makePeer(addr ma.Multiaddr) *peer.Peer { - p := new(peer.Peer) - p.AddAddress(addr) +func makePeer(addr ma.Multiaddr) peer.Peer { sk, pk, err := ci.GenerateKeyPair(ci.RSA, 512) if err != nil { panic(err) } - p.PrivKey = sk - p.PubKey = pk - id, err := peer.IDFromPubKey(pk) + p, err := peer.WithKeyPair(sk, pk) if err != nil { panic(err) } - - p.ID = id + p.AddAddress(addr) return p } @@ -289,7 +284,7 @@ func TestProvidesAsync(t *testing.T) { provs := dhts[0].FindProvidersAsync(ctxT, u.Key("hello"), 5) select { case p := <-provs: - if !p.ID.Equal(dhts[3].self.ID) { + if !p.ID().Equal(dhts[3].self.ID()) { t.Fatalf("got a provider, but not the right one. %s", p) } case <-ctxT.Done(): @@ -379,7 +374,7 @@ func TestFindPeer(t *testing.T) { } ctxT, _ := context.WithTimeout(ctx, time.Second) - p, err := dhts[0].FindPeer(ctxT, peers[2].ID) + p, err := dhts[0].FindPeer(ctxT, peers[2].ID()) if err != nil { t.Fatal(err) } @@ -388,7 +383,7 @@ func TestFindPeer(t *testing.T) { t.Fatal("Failed to find peer.") } - if !p.ID.Equal(peers[2].ID) { + if !p.ID().Equal(peers[2].ID()) { t.Fatal("Didnt find expected peer.") } } diff --git a/diag.go b/diag.go index 8fd581a..e91ba9b 100644 --- a/diag.go +++ b/diag.go @@ -32,12 +32,13 @@ func (di *diagInfo) Marshal() []byte { func (dht *IpfsDHT) getDiagInfo() *diagInfo { di := new(diagInfo) di.CodeVersion = "github.com/jbenet/go-ipfs" - di.ID = dht.self.ID + di.ID = dht.self.ID() di.LifeSpan = time.Since(dht.birth) di.Keys = nil // Currently no way to query datastore for _, p := range dht.routingTables[0].ListPeers() { - di.Connections = append(di.Connections, connDiagInfo{p.GetLatency(), p.ID}) + d := connDiagInfo{p.GetLatency(), p.ID()} + di.Connections = append(di.Connections, d) } return di } diff --git a/ext_test.go b/ext_test.go index b5bf487..c4ba094 100644 --- a/ext_test.go +++ b/ext_test.go @@ -9,7 +9,6 @@ import ( "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto" ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/datastore.go" - ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" msg "github.com/jbenet/go-ipfs/net/message" mux "github.com/jbenet/go-ipfs/net/mux" peer "github.com/jbenet/go-ipfs/peer" @@ -66,17 +65,17 @@ type fauxNet struct { } // DialPeer attempts to establish a connection to a given peer -func (f *fauxNet) DialPeer(*peer.Peer) error { +func (f *fauxNet) DialPeer(peer.Peer) error { return nil } // ClosePeer connection to peer -func (f *fauxNet) ClosePeer(*peer.Peer) error { +func (f *fauxNet) ClosePeer(peer.Peer) error { return nil } // IsConnected returns whether a connection to given peer exists. -func (f *fauxNet) IsConnected(*peer.Peer) (bool, error) { +func (f *fauxNet) IsConnected(peer.Peer) (bool, error) { return true, nil } @@ -88,7 +87,7 @@ func (f *fauxNet) SendMessage(msg.NetMessage) error { return nil } -func (f *fauxNet) GetPeerList() []*peer.Peer { +func (f *fauxNet) GetPeerList() []peer.Peer { return nil } @@ -107,11 +106,10 @@ func TestGetFailures(t *testing.T) { fs := &fauxSender{} peerstore := peer.NewPeerstore() - local := new(peer.Peer) - local.ID = peer.ID("test_peer") + local := peer.WithIDString("test_peer") d := NewDHT(ctx, local, peerstore, fn, fs, ds.NewMapDatastore()) - other := &peer.Peer{ID: peer.ID("other_peer")} + other := peer.WithIDString("other_peer") d.Update(other) // This one should time out @@ -189,11 +187,10 @@ func TestGetFailures(t *testing.T) { } // TODO: Maybe put these in some sort of "ipfs_testutil" package -func _randPeer() *peer.Peer { - p := new(peer.Peer) - p.ID = make(peer.ID, 16) - p.Addresses = []ma.Multiaddr{nil} - crand.Read(p.ID) +func _randPeer() peer.Peer { + id := make(peer.ID, 16) + crand.Read(id) + p := peer.WithID(id) return p } @@ -204,13 +201,13 @@ func TestNotFound(t *testing.T) { fn := &fauxNet{} fs := &fauxSender{} - local := new(peer.Peer) - local.ID = peer.ID("test_peer") + local := peer.WithIDString("test_peer") peerstore := peer.NewPeerstore() + peerstore.Put(local) d := NewDHT(ctx, local, peerstore, fn, fs, ds.NewMapDatastore()) - var ps []*peer.Peer + var ps []peer.Peer for i := 0; i < 5; i++ { ps = append(ps, _randPeer()) d.Update(ps[i]) @@ -228,7 +225,7 @@ func TestNotFound(t *testing.T) { case Message_GET_VALUE: resp := &Message{Type: pmes.Type} - peers := []*peer.Peer{} + peers := []peer.Peer{} for i := 0; i < 7; i++ { peers = append(peers, _randPeer()) } @@ -270,13 +267,13 @@ func TestLessThanKResponses(t *testing.T) { u.Debug = false fn := &fauxNet{} fs := &fauxSender{} + local := peer.WithIDString("test_peer") peerstore := peer.NewPeerstore() - local := new(peer.Peer) - local.ID = peer.ID("test_peer") + peerstore.Put(local) d := NewDHT(ctx, local, peerstore, fn, fs, ds.NewMapDatastore()) - var ps []*peer.Peer + var ps []peer.Peer for i := 0; i < 5; i++ { ps = append(ps, _randPeer()) d.Update(ps[i]) @@ -295,7 +292,7 @@ func TestLessThanKResponses(t *testing.T) { case Message_GET_VALUE: resp := &Message{ Type: pmes.Type, - CloserPeers: peersToPBPeers([]*peer.Peer{other}), + CloserPeers: peersToPBPeers([]peer.Peer{other}), } mes, err := msg.FromObject(mes.Peer(), resp) diff --git a/handlers.go b/handlers.go index 1046516..44802ba 100644 --- a/handlers.go +++ b/handlers.go @@ -14,7 +14,7 @@ import ( var CloserPeerCount = 4 // dhthandler specifies the signature of functions that handle DHT messages. -type dhtHandler func(*peer.Peer, *Message) (*Message, error) +type dhtHandler func(peer.Peer, *Message) (*Message, error) func (dht *IpfsDHT) handlerForMsgType(t Message_MessageType) dhtHandler { switch t { @@ -35,7 +35,7 @@ func (dht *IpfsDHT) handlerForMsgType(t Message_MessageType) dhtHandler { } } -func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *Message) (*Message, error) { +func (dht *IpfsDHT) handleGetValue(p peer.Peer, pmes *Message) (*Message, error) { log.Debug("%s handleGetValue for key: %s\n", dht.self, pmes.GetKey()) // setup response @@ -93,7 +93,7 @@ func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *Message) (*Message, error } // Store a value in this peer local storage -func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *Message) (*Message, error) { +func (dht *IpfsDHT) handlePutValue(p peer.Peer, pmes *Message) (*Message, error) { dht.dslock.Lock() defer dht.dslock.Unlock() dskey := u.Key(pmes.GetKey()).DsKey() @@ -102,18 +102,18 @@ func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *Message) (*Message, error return pmes, err } -func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *Message) (*Message, error) { +func (dht *IpfsDHT) handlePing(p peer.Peer, pmes *Message) (*Message, error) { log.Debug("%s Responding to ping from %s!\n", dht.self, p) return pmes, nil } -func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *Message) (*Message, error) { +func (dht *IpfsDHT) handleFindPeer(p peer.Peer, pmes *Message) (*Message, error) { resp := newMessage(pmes.GetType(), "", pmes.GetClusterLevel()) - var closest []*peer.Peer + var closest []peer.Peer // if looking for self... special case where we send it on CloserPeers. - if peer.ID(pmes.GetKey()).Equal(dht.self.ID) { - closest = []*peer.Peer{dht.self} + if peer.ID(pmes.GetKey()).Equal(dht.self.ID()) { + closest = []peer.Peer{dht.self} } else { closest = dht.betterPeersToQuery(pmes, CloserPeerCount) } @@ -123,9 +123,9 @@ func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *Message) (*Message, error return resp, nil } - var withAddresses []*peer.Peer + var withAddresses []peer.Peer for _, p := range closest { - if len(p.Addresses) > 0 { + if len(p.Addresses()) > 0 { withAddresses = append(withAddresses, p) } } @@ -137,7 +137,7 @@ func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *Message) (*Message, error return resp, nil } -func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *Message) (*Message, error) { +func (dht *IpfsDHT) handleGetProviders(p peer.Peer, pmes *Message) (*Message, error) { resp := newMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel()) // check if we have this value, to add ourselves as provider. @@ -171,10 +171,10 @@ func (dht *IpfsDHT) handleGetProviders(p *peer.Peer, pmes *Message) (*Message, e type providerInfo struct { Creation time.Time - Value *peer.Peer + Value peer.Peer } -func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *Message) (*Message, error) { +func (dht *IpfsDHT) handleAddProvider(p peer.Peer, pmes *Message) (*Message, error) { key := u.Key(pmes.GetKey()) log.Debug("%s adding %s as a provider for '%s'\n", dht.self, p, peer.ID(key)) @@ -182,7 +182,7 @@ func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *Message) (*Message, er // add provider should use the address given in the message for _, pb := range pmes.GetProviderPeers() { pid := peer.ID(pb.GetId()) - if pid.Equal(p.ID) { + if pid.Equal(p.ID()) { addr, err := pb.Address() if err != nil { diff --git a/providers.go b/providers.go index c62755c..204fdf7 100644 --- a/providers.go +++ b/providers.go @@ -20,12 +20,12 @@ type ProviderManager struct { type addProv struct { k u.Key - val *peer.Peer + val peer.Peer } type getProv struct { k u.Key - resp chan []*peer.Peer + resp chan []peer.Peer } func NewProviderManager(local peer.ID) *ProviderManager { @@ -45,7 +45,7 @@ func (pm *ProviderManager) run() { for { select { case np := <-pm.newprovs: - if np.val.ID.Equal(pm.lpeer) { + if np.val.ID().Equal(pm.lpeer) { pm.local[np.k] = struct{}{} } pi := new(providerInfo) @@ -54,7 +54,7 @@ func (pm *ProviderManager) run() { arr := pm.providers[np.k] pm.providers[np.k] = append(arr, pi) case gp := <-pm.getprovs: - var parr []*peer.Peer + var parr []peer.Peer provs := pm.providers[gp.k] for _, p := range provs { parr = append(parr, p.Value) @@ -82,17 +82,17 @@ func (pm *ProviderManager) run() { } } -func (pm *ProviderManager) AddProvider(k u.Key, val *peer.Peer) { +func (pm *ProviderManager) AddProvider(k u.Key, val peer.Peer) { pm.newprovs <- &addProv{ k: k, val: val, } } -func (pm *ProviderManager) GetProviders(k u.Key) []*peer.Peer { +func (pm *ProviderManager) GetProviders(k u.Key) []peer.Peer { gp := new(getProv) gp.k = k - gp.resp = make(chan []*peer.Peer) + gp.resp = make(chan []peer.Peer) pm.getprovs <- gp return <-gp.resp } diff --git a/providers_test.go b/providers_test.go index 0cdfa4f..b37327d 100644 --- a/providers_test.go +++ b/providers_test.go @@ -11,7 +11,7 @@ func TestProviderManager(t *testing.T) { mid := peer.ID("testing") p := NewProviderManager(mid) a := u.Key("test") - p.AddProvider(a, &peer.Peer{}) + p.AddProvider(a, peer.WithIDString("testingprovider")) resp := p.GetProviders(a) if len(resp) != 1 { t.Fatal("Could not retrieve provider.") diff --git a/query.go b/query.go index 0a9ca0b..ef3670c 100644 --- a/query.go +++ b/query.go @@ -26,10 +26,10 @@ type dhtQuery struct { } type dhtQueryResult struct { - value []byte // GetValue - peer *peer.Peer // FindPeer - providerPeers []*peer.Peer // GetProviders - closerPeers []*peer.Peer // * + value []byte // GetValue + peer peer.Peer // FindPeer + providerPeers []peer.Peer // GetProviders + closerPeers []peer.Peer // * success bool } @@ -47,10 +47,10 @@ func newQuery(k u.Key, f queryFunc) *dhtQuery { // - the value // - a list of peers potentially better able to serve the query // - an error -type queryFunc func(context.Context, *peer.Peer) (*dhtQueryResult, error) +type queryFunc func(context.Context, peer.Peer) (*dhtQueryResult, error) // Run runs the query at hand. pass in a list of peers to use first. -func (q *dhtQuery) Run(ctx context.Context, peers []*peer.Peer) (*dhtQueryResult, error) { +func (q *dhtQuery) Run(ctx context.Context, peers []peer.Peer) (*dhtQueryResult, error) { runner := newQueryRunner(ctx, q) return runner.Run(peers) } @@ -100,7 +100,7 @@ func newQueryRunner(ctx context.Context, q *dhtQuery) *dhtQueryRunner { } } -func (r *dhtQueryRunner) Run(peers []*peer.Peer) (*dhtQueryResult, error) { +func (r *dhtQueryRunner) Run(peers []peer.Peer) (*dhtQueryResult, error) { log.Debug("Run query with %d peers.", len(peers)) if len(peers) == 0 { log.Warning("Running query with no peers!") @@ -148,7 +148,7 @@ func (r *dhtQueryRunner) Run(peers []*peer.Peer) (*dhtQueryResult, error) { return nil, err } -func (r *dhtQueryRunner) addPeerToQuery(next *peer.Peer, benchmark *peer.Peer) { +func (r *dhtQueryRunner) addPeerToQuery(next peer.Peer, benchmark peer.Peer) { if next == nil { // wtf why are peers nil?!? log.Error("Query getting nil peers!!!\n") @@ -156,7 +156,7 @@ func (r *dhtQueryRunner) addPeerToQuery(next *peer.Peer, benchmark *peer.Peer) { } // if new peer further away than whom we got it from, bother (loops) - if benchmark != nil && kb.Closer(benchmark.ID, next.ID, r.query.key) { + if benchmark != nil && kb.Closer(benchmark.ID(), next.ID(), r.query.key) { return } @@ -200,7 +200,7 @@ func (r *dhtQueryRunner) spawnWorkers() { } } -func (r *dhtQueryRunner) queryPeer(p *peer.Peer) { +func (r *dhtQueryRunner) queryPeer(p peer.Peer) { log.Debug("spawned worker for: %v\n", p) // make sure we rate limit concurrency. diff --git a/routing.go b/routing.go index 55ef265..7f004dc 100644 --- a/routing.go +++ b/routing.go @@ -23,13 +23,13 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key u.Key, value []byte) error return err } - var peers []*peer.Peer + var peers []peer.Peer for _, route := range dht.routingTables { npeers := route.NearestPeers(kb.ConvertKey(key), KValue) peers = append(peers, npeers...) } - query := newQuery(key, func(ctx context.Context, p *peer.Peer) (*dhtQueryResult, error) { + query := newQuery(key, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) { log.Debug("%s PutValue qry part %v", dht.self, p) err := dht.putValueToNetwork(ctx, p, string(key), value) if err != nil { @@ -65,7 +65,7 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) { } // setup the Query - query := newQuery(key, func(ctx context.Context, p *peer.Peer) (*dhtQueryResult, error) { + query := newQuery(key, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) { val, peers, err := dht.getValueOrPeers(ctx, p, key, routeLevel) if err != nil { @@ -117,8 +117,8 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key u.Key) error { return nil } -func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int) <-chan *peer.Peer { - peerOut := make(chan *peer.Peer, count) +func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int) <-chan peer.Peer { + peerOut := make(chan peer.Peer, count) go func() { ps := newPeerSet() provs := dht.providers.GetProviders(key) @@ -136,7 +136,7 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(key), AlphaValue) for _, pp := range peers { wg.Add(1) - go func(p *peer.Peer) { + go func(p peer.Peer) { defer wg.Done() pmes, err := dht.findProvidersSingle(ctx, p, key, 0) if err != nil { @@ -153,7 +153,7 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int } //TODO: this function could also be done asynchronously -func (dht *IpfsDHT) addPeerListAsync(k u.Key, peers []*Message_Peer, ps *peerSet, count int, out chan *peer.Peer) { +func (dht *IpfsDHT) addPeerListAsync(k u.Key, peers []*Message_Peer, ps *peerSet, count int, out chan peer.Peer) { for _, pbp := range peers { // construct new peer @@ -173,7 +173,7 @@ func (dht *IpfsDHT) addPeerListAsync(k u.Key, peers []*Message_Peer, ps *peerSet // Find specific Peer // FindPeer searches for a peer with given ID. -func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (*peer.Peer, error) { +func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.Peer, error) { // Check if were already connected to them p, _ := dht.FindLocal(id) @@ -186,7 +186,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (*peer.Peer, error if p == nil { return nil, nil } - if p.ID.Equal(id) { + if p.ID().Equal(id) { return p, nil } @@ -205,7 +205,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (*peer.Peer, error continue } - if nxtPeer.ID.Equal(id) { + if nxtPeer.ID().Equal(id) { return nxtPeer, nil } @@ -214,7 +214,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (*peer.Peer, error return nil, u.ErrNotFound } -func (dht *IpfsDHT) findPeerMultiple(ctx context.Context, id peer.ID) (*peer.Peer, error) { +func (dht *IpfsDHT) findPeerMultiple(ctx context.Context, id peer.ID) (peer.Peer, error) { // Check if were already connected to them p, _ := dht.FindLocal(id) @@ -230,7 +230,7 @@ func (dht *IpfsDHT) findPeerMultiple(ctx context.Context, id peer.ID) (*peer.Pee } // setup query function - query := newQuery(u.Key(id), func(ctx context.Context, p *peer.Peer) (*dhtQueryResult, error) { + query := newQuery(u.Key(id), func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) { pmes, err := dht.findPeerSingle(ctx, p, id, routeLevel) if err != nil { log.Error("%s getPeer error: %v", dht.self, err) @@ -242,7 +242,7 @@ func (dht *IpfsDHT) findPeerMultiple(ctx context.Context, id peer.ID) (*peer.Pee routeLevel++ } - nxtprs := make([]*peer.Peer, len(plist)) + nxtprs := make([]peer.Peer, len(plist)) for i, fp := range plist { nxtp, err := dht.peerFromInfo(fp) if err != nil { @@ -250,7 +250,7 @@ func (dht *IpfsDHT) findPeerMultiple(ctx context.Context, id peer.ID) (*peer.Pee continue } - if nxtp.ID.Equal(id) { + if nxtp.ID().Equal(id) { return &dhtQueryResult{peer: nxtp, success: true}, nil } @@ -272,7 +272,7 @@ func (dht *IpfsDHT) findPeerMultiple(ctx context.Context, id peer.ID) (*peer.Pee } // Ping a peer, log the time it took -func (dht *IpfsDHT) Ping(ctx context.Context, p *peer.Peer) error { +func (dht *IpfsDHT) Ping(ctx context.Context, p peer.Peer) error { // Thoughts: maybe this should accept an ID and do a peer lookup? log.Info("ping %s start", p) diff --git a/util.go b/util.go index d12f7f9..3cc8126 100644 --- a/util.go +++ b/util.go @@ -52,15 +52,15 @@ func newPeerSet() *peerSet { return ps } -func (ps *peerSet) Add(p *peer.Peer) { +func (ps *peerSet) Add(p peer.Peer) { ps.lk.Lock() - ps.ps[string(p.ID)] = true + ps.ps[string(p.ID())] = true ps.lk.Unlock() } -func (ps *peerSet) Contains(p *peer.Peer) bool { +func (ps *peerSet) Contains(p peer.Peer) bool { ps.lk.RLock() - _, ok := ps.ps[string(p.ID)] + _, ok := ps.ps[string(p.ID())] ps.lk.RUnlock() return ok } @@ -71,12 +71,12 @@ func (ps *peerSet) Size() int { return len(ps.ps) } -func (ps *peerSet) AddIfSmallerThan(p *peer.Peer, maxsize int) bool { +func (ps *peerSet) AddIfSmallerThan(p peer.Peer, maxsize int) bool { var success bool ps.lk.Lock() - if _, ok := ps.ps[string(p.ID)]; !ok && len(ps.ps) < maxsize { + if _, ok := ps.ps[string(p.ID())]; !ok && len(ps.ps) < maxsize { success = true - ps.ps[string(p.ID)] = true + ps.ps[string(p.ID())] = true } ps.lk.Unlock() return success