diff --git a/bucket.go b/bucket.go deleted file mode 100644 index 7aa8d0a..0000000 --- a/bucket.go +++ /dev/null @@ -1,68 +0,0 @@ -package dht - -import ( - "container/list" - - peer "github.com/jbenet/go-ipfs/peer" -) -// Bucket holds a list of peers. -type Bucket list.List - -func (b *Bucket) Find(id peer.ID) *list.Element { - bucket_list := (*list.List)(b) - for e := bucket_list.Front(); e != nil; e = e.Next() { - if e.Value.(*peer.Peer).ID.Equal(id) { - return e - } - } - return nil -} - -func (b *Bucket) MoveToFront(e *list.Element) { - bucket_list := (*list.List)(b) - bucket_list.MoveToFront(e) -} - -func (b *Bucket) PushFront(p *peer.Peer) { - bucket_list := (*list.List)(b) - bucket_list.PushFront(p) -} - -func (b *Bucket) PopBack() *peer.Peer { - bucket_list := (*list.List)(b) - last := bucket_list.Back() - bucket_list.Remove(last) - return last.Value.(*peer.Peer) -} - -func (b *Bucket) Len() int { - bucket_list := (*list.List)(b) - return bucket_list.Len() -} - -// Splits a buckets peers into two buckets, the methods receiver will have -// peers with CPL equal to cpl, the returned bucket will have peers with CPL -// greater than cpl (returned bucket has closer peers) -func (b *Bucket) Split(cpl int, target ID) *Bucket { - bucket_list := (*list.List)(b) - out := list.New() - e := bucket_list.Front() - for e != nil { - peer_id := convertPeerID(e.Value.(*peer.Peer).ID) - peer_cpl := prefLen(peer_id, target) - if peer_cpl > cpl { - cur := e - out.PushBack(e.Value) - e = e.Next() - bucket_list.Remove(cur) - continue - } - e = e.Next() - } - return (*Bucket)(out) -} - -func (b *Bucket) getIter() *list.Element { - bucket_list := (*list.List)(b) - return bucket_list.Front() -} diff --git a/dht.go b/dht.go index 8fbd5c0..7188fc8 100644 --- a/dht.go +++ b/dht.go @@ -10,6 +10,7 @@ import ( peer "github.com/jbenet/go-ipfs/peer" swarm "github.com/jbenet/go-ipfs/swarm" u "github.com/jbenet/go-ipfs/util" + kb "github.com/jbenet/go-ipfs/routing/kbucket" ma "github.com/jbenet/go-multiaddr" @@ -25,7 +26,7 @@ import ( type IpfsDHT struct { // Array of routing tables for differently distanced nodes // NOTE: (currently, only a single table is used) - routes []*RoutingTable + routes []*kb.RoutingTable network *swarm.Swarm @@ -84,8 +85,8 @@ func NewDHT(p *peer.Peer) (*IpfsDHT, error) { dht.listeners = make(map[uint64]*listenInfo) dht.providers = make(map[u.Key][]*providerInfo) dht.shutdown = make(chan struct{}) - dht.routes = make([]*RoutingTable, 1) - dht.routes[0] = NewRoutingTable(20, convertPeerID(p.ID)) + dht.routes = make([]*kb.RoutingTable, 1) + dht.routes[0] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID)) dht.birth = time.Now() return dht, nil } @@ -253,7 +254,7 @@ func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *PBDHTMessage) { } } else if err == ds.ErrNotFound { // Find closest peer(s) to desired key and reply with that info - closer := dht.routes[0].NearestPeer(convertKey(u.Key(pmes.GetKey()))) + closer := dht.routes[0].NearestPeer(kb.ConvertKey(u.Key(pmes.GetKey()))) resp = &DHTMessage{ Response: true, Id: *pmes.Id, @@ -290,7 +291,7 @@ func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *PBDHTMessage) { func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *PBDHTMessage) { success := true u.POut("handleFindPeer: searching for '%s'", peer.ID(pmes.GetKey()).Pretty()) - closest := dht.routes[0].NearestPeer(convertKey(u.Key(pmes.GetKey()))) + closest := dht.routes[0].NearestPeer(kb.ConvertKey(u.Key(pmes.GetKey()))) if closest == nil { u.PErr("handleFindPeer: could not find anything.") success = false @@ -432,7 +433,7 @@ func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *PBDHTMessage) { } dht.diaglock.Unlock() - seq := dht.routes[0].NearestPeers(convertPeerID(dht.self.ID), 10) + seq := dht.routes[0].NearestPeers(kb.ConvertPeerID(dht.self.ID), 10) listen_chan := dht.ListenFor(pmes.GetId(), len(seq), time.Second*30) for _, ps := range seq { diff --git a/diag.go b/diag.go index 4bc752f..50d5a3d 100644 --- a/diag.go +++ b/diag.go @@ -37,7 +37,7 @@ func (dht *IpfsDHT) getDiagInfo() *diagInfo { di.LifeSpan = time.Since(dht.birth) di.Keys = nil // Currently no way to query datastore - for _,p := range dht.routes[0].listpeers() { + for _,p := range dht.routes[0].Listpeers() { di.Connections = append(di.Connections, connDiagInfo{p.GetLatency(), p.ID}) } return di diff --git a/routing.go b/routing.go index 57d087f..8898aaf 100644 --- a/routing.go +++ b/routing.go @@ -12,6 +12,7 @@ import ( ma "github.com/jbenet/go-multiaddr" peer "github.com/jbenet/go-ipfs/peer" + kb "github.com/jbenet/go-ipfs/routing/kbucket" swarm "github.com/jbenet/go-ipfs/swarm" u "github.com/jbenet/go-ipfs/util" ) @@ -33,7 +34,7 @@ func GenerateMessageID() uint64 { // This is the top level "Store" operation of the DHT func (s *IpfsDHT) PutValue(key u.Key, value []byte) error { var p *peer.Peer - p = s.routes[0].NearestPeer(convertKey(key)) + p = s.routes[0].NearestPeer(kb.ConvertKey(key)) if p == nil { return errors.New("Table returned nil peer!") } @@ -46,7 +47,7 @@ func (s *IpfsDHT) PutValue(key u.Key, value []byte) error { // returned along with util.ErrSearchIncomplete func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) { var p *peer.Peer - p = s.routes[0].NearestPeer(convertKey(key)) + p = s.routes[0].NearestPeer(kb.ConvertKey(key)) if p == nil { return nil, errors.New("Table returned nil peer!") } @@ -90,7 +91,7 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) { // Announce that this node can provide value for given key func (s *IpfsDHT) Provide(key u.Key) error { - peers := s.routes[0].NearestPeers(convertKey(key), PoolSize) + peers := s.routes[0].NearestPeers(kb.ConvertKey(key), PoolSize) if len(peers) == 0 { //return an error } @@ -110,7 +111,7 @@ func (s *IpfsDHT) Provide(key u.Key) error { // FindProviders searches for peers who can provide the value for given key. func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer, error) { - p := s.routes[0].NearestPeer(convertKey(key)) + p := s.routes[0].NearestPeer(kb.ConvertKey(key)) pmes := DHTMessage{ Type: PBDHTMessage_GET_PROVIDERS, @@ -168,7 +169,7 @@ func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer, // FindPeer searches for a peer with given ID. func (s *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error) { - p := s.routes[0].NearestPeer(convertPeerID(id)) + p := s.routes[0].NearestPeer(kb.ConvertPeerID(id)) pmes := DHTMessage{ Type: PBDHTMessage_FIND_NODE, @@ -242,7 +243,7 @@ func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error { func (dht *IpfsDHT) GetDiagnostic(timeout time.Duration) ([]*diagInfo, error) { u.DOut("Begin Diagnostic") //Send to N closest peers - targets := dht.routes[0].NearestPeers(convertPeerID(dht.self.ID), 10) + targets := dht.routes[0].NearestPeers(kb.ConvertPeerID(dht.self.ID), 10) // TODO: Add timeout to this struct so nodes know when to return pmes := DHTMessage{ diff --git a/table.go b/table.go deleted file mode 100644 index 7a12123..0000000 --- a/table.go +++ /dev/null @@ -1,184 +0,0 @@ -package dht - -import ( - "container/list" - "sort" - "sync" - - peer "github.com/jbenet/go-ipfs/peer" - u "github.com/jbenet/go-ipfs/util" -) - -// RoutingTable defines the routing table. -type RoutingTable struct { - - // ID of the local peer - local ID - - // Blanket lock, refine later for better performance - tabLock sync.RWMutex - - // kBuckets define all the fingers to other nodes. - Buckets []*Bucket - bucketsize int -} - -func NewRoutingTable(bucketsize int, local_id ID) *RoutingTable { - rt := new(RoutingTable) - rt.Buckets = []*Bucket{new(Bucket)} - rt.bucketsize = bucketsize - rt.local = local_id - return rt -} - -// Update adds or moves the given peer to the front of its respective bucket -// If a peer gets removed from a bucket, it is returned -func (rt *RoutingTable) Update(p *peer.Peer) *peer.Peer { - rt.tabLock.Lock() - defer rt.tabLock.Unlock() - peer_id := convertPeerID(p.ID) - cpl := xor(peer_id, rt.local).commonPrefixLen() - - b_id := cpl - if b_id >= len(rt.Buckets) { - b_id = len(rt.Buckets) - 1 - } - - bucket := rt.Buckets[b_id] - e := bucket.Find(p.ID) - if e == nil { - // New peer, add to bucket - bucket.PushFront(p) - - // Are we past the max bucket size? - if bucket.Len() > rt.bucketsize { - if b_id == len(rt.Buckets) - 1 { - new_bucket := bucket.Split(b_id, rt.local) - rt.Buckets = append(rt.Buckets, new_bucket) - if new_bucket.Len() > rt.bucketsize { - // TODO: This is a very rare and annoying case - panic("Case not handled.") - } - - // If all elements were on left side of split... - if bucket.Len() > rt.bucketsize { - return bucket.PopBack() - } - } else { - // If the bucket cant split kick out least active node - return bucket.PopBack() - } - } - return nil - } else { - // If the peer is already in the table, move it to the front. - // This signifies that it it "more active" and the less active nodes - // Will as a result tend towards the back of the list - bucket.MoveToFront(e) - return nil - } -} - -// A helper struct to sort peers by their distance to the local node -type peerDistance struct { - p *peer.Peer - distance ID -} - -// peerSorterArr implements sort.Interface to sort peers by xor distance -type peerSorterArr []*peerDistance -func (p peerSorterArr) Len() int {return len(p)} -func (p peerSorterArr) Swap(a, b int) {p[a],p[b] = p[b],p[a]} -func (p peerSorterArr) Less(a, b int) bool { - return p[a].distance.Less(p[b].distance) -} -// - -func copyPeersFromList(target ID, peerArr peerSorterArr, peerList *list.List) peerSorterArr { - for e := peerList.Front(); e != nil; e = e.Next() { - p := e.Value.(*peer.Peer) - p_id := convertPeerID(p.ID) - pd := peerDistance{ - p: p, - distance: xor(target, p_id), - } - peerArr = append(peerArr, &pd) - if e == nil { - u.POut("list element was nil.") - return peerArr - } - } - return peerArr -} - -// Returns a single peer that is nearest to the given ID -func (rt *RoutingTable) NearestPeer(id ID) *peer.Peer { - peers := rt.NearestPeers(id, 1) - if len(peers) > 0 { - return peers[0] - } else { - return nil - } -} - -// Returns a list of the 'count' closest peers to the given ID -func (rt *RoutingTable) NearestPeers(id ID, count int) []*peer.Peer { - rt.tabLock.RLock() - defer rt.tabLock.RUnlock() - cpl := prefLen(id, rt.local) - - // Get bucket at cpl index or last bucket - var bucket *Bucket - if cpl >= len(rt.Buckets) { - cpl = len(rt.Buckets) - 1 - } - bucket = rt.Buckets[cpl] - - var peerArr peerSorterArr - if bucket.Len() == 0 { - // In the case of an unusual split, one bucket may be empty. - // if this happens, search both surrounding buckets for nearest peer - if cpl > 0 { - plist := (*list.List)(rt.Buckets[cpl - 1]) - peerArr = copyPeersFromList(id, peerArr, plist) - } - - if cpl < len(rt.Buckets) - 1 { - plist := (*list.List)(rt.Buckets[cpl + 1]) - peerArr = copyPeersFromList(id, peerArr, plist) - } - } else { - plist := (*list.List)(bucket) - peerArr = copyPeersFromList(id, peerArr, plist) - } - - // Sort by distance to local peer - sort.Sort(peerArr) - - var out []*peer.Peer - for i := 0; i < count && i < peerArr.Len(); i++ { - out = append(out, peerArr[i].p) - } - - return out -} - -// Returns the total number of peers in the routing table -func (rt *RoutingTable) Size() int { - var tot int - for _,buck := range rt.Buckets { - tot += buck.Len() - } - return tot -} - -// NOTE: This is potentially unsafe... use at your own risk -func (rt *RoutingTable) listpeers() []*peer.Peer { - var peers []*peer.Peer - for _,buck := range rt.Buckets { - for e := buck.getIter(); e != nil; e = e.Next() { - peers = append(peers, e.Value.(*peer.Peer)) - } - } - return peers -} diff --git a/table_test.go b/table_test.go deleted file mode 100644 index 393a1c5..0000000 --- a/table_test.go +++ /dev/null @@ -1,126 +0,0 @@ -package dht - -import ( - crand "crypto/rand" - "crypto/sha256" - "math/rand" - "container/list" - "testing" - - peer "github.com/jbenet/go-ipfs/peer" -) - -func _randPeer() *peer.Peer { - p := new(peer.Peer) - p.ID = make(peer.ID, 16) - crand.Read(p.ID) - return p -} - -func _randID() ID { - buf := make([]byte, 16) - crand.Read(buf) - - hash := sha256.Sum256(buf) - return ID(hash[:]) -} - -// Test basic features of the bucket struct -func TestBucket(t *testing.T) { - b := new(Bucket) - - peers := make([]*peer.Peer, 100) - for i := 0; i < 100; i++ { - peers[i] = _randPeer() - b.PushFront(peers[i]) - } - - local := _randPeer() - local_id := convertPeerID(local.ID) - - i := rand.Intn(len(peers)) - e := b.Find(peers[i].ID) - if e == nil { - t.Errorf("Failed to find peer: %v", peers[i]) - } - - spl := b.Split(0, convertPeerID(local.ID)) - llist := (*list.List)(b) - for e := llist.Front(); e != nil; e = e.Next() { - p := convertPeerID(e.Value.(*peer.Peer).ID) - cpl := xor(p, local_id).commonPrefixLen() - if cpl > 0 { - t.Fatalf("Split failed. found id with cpl > 0 in 0 bucket") - } - } - - rlist := (*list.List)(spl) - for e := rlist.Front(); e != nil; e = e.Next() { - p := convertPeerID(e.Value.(*peer.Peer).ID) - cpl := xor(p, local_id).commonPrefixLen() - if cpl == 0 { - t.Fatalf("Split failed. found id with cpl == 0 in non 0 bucket") - } - } -} - -// Right now, this just makes sure that it doesnt hang or crash -func TestTableUpdate(t *testing.T) { - local := _randPeer() - rt := NewRoutingTable(10, convertPeerID(local.ID)) - - peers := make([]*peer.Peer, 100) - for i := 0; i < 100; i++ { - peers[i] = _randPeer() - } - - // Testing Update - for i := 0; i < 10000; i++ { - p := rt.Update(peers[rand.Intn(len(peers))]) - if p != nil { - t.Log("evicted peer.") - } - } - - for i := 0; i < 100; i++ { - id := _randID() - ret := rt.NearestPeers(id, 5) - if len(ret) == 0 { - t.Fatal("Failed to find node near ID.") - } - } -} - -func TestTableFind(t *testing.T) { - local := _randPeer() - rt := NewRoutingTable(10, convertPeerID(local.ID)) - - peers := make([]*peer.Peer, 100) - for i := 0; i < 5; i++ { - peers[i] = _randPeer() - rt.Update(peers[i]) - } - - t.Logf("Searching for peer: '%s'", peers[2].ID.Pretty()) - found := rt.NearestPeer(convertPeerID(peers[2].ID)) - if !found.ID.Equal(peers[2].ID) { - t.Fatalf("Failed to lookup known node...") - } -} - -func TestTableFindMultiple(t *testing.T) { - local := _randPeer() - rt := NewRoutingTable(20, convertPeerID(local.ID)) - - peers := make([]*peer.Peer, 100) - for i := 0; i < 18; i++ { - peers[i] = _randPeer() - rt.Update(peers[i]) - } - - t.Logf("Searching for peer: '%s'", peers[2].ID.Pretty()) - found := rt.NearestPeers(convertPeerID(peers[2].ID), 15) - if len(found) != 15 { - t.Fatalf("Got back different number of peers than we expected.") - } -} diff --git a/util.go b/util.go deleted file mode 100644 index 5961cd2..0000000 --- a/util.go +++ /dev/null @@ -1,82 +0,0 @@ -package dht - -import ( - "bytes" - "crypto/sha256" - - peer "github.com/jbenet/go-ipfs/peer" - u "github.com/jbenet/go-ipfs/util" -) - -// ID for IpfsDHT should be a byte slice, to allow for simpler operations -// (xor). DHT ids are based on the peer.IDs. -// -// The type dht.ID signifies that its contents have been hashed from either a -// peer.ID or a util.Key. This unifies the keyspace -type ID []byte - -func (id ID) Equal(other ID) bool { - return bytes.Equal(id, other) -} - -func (id ID) Less(other interface{}) bool { - a, b := equalizeSizes(id, other.(ID)) - for i := 0; i < len(a); i++ { - if a[i] != b[i] { - return a[i] < b[i] - } - } - return len(a) < len(b) -} - -func (id ID) commonPrefixLen() int { - for i := 0; i < len(id); i++ { - for j := 0; j < 8; j++ { - if (id[i]>>uint8(7-j))&0x1 != 0 { - return i*8 + j - } - } - } - return len(id)*8 - 1 -} - -func prefLen(a, b ID) int { - return xor(a, b).commonPrefixLen() -} - -func xor(a, b ID) ID { - a, b = equalizeSizes(a, b) - - c := make(ID, len(a)) - for i := 0; i < len(a); i++ { - c[i] = a[i] ^ b[i] - } - return c -} - -func equalizeSizes(a, b ID) (ID, ID) { - la := len(a) - lb := len(b) - - if la < lb { - na := make([]byte, lb) - copy(na, a) - a = na - } else if lb < la { - nb := make([]byte, la) - copy(nb, b) - b = nb - } - - return a, b -} - -func convertPeerID(id peer.ID) ID { - hash := sha256.Sum256(id) - return hash[:] -} - -func convertKey(id u.Key) ID { - hash := sha256.Sum256([]byte(id)) - return hash[:] -}