diff --git a/kpeerset/sorted_peerset.go b/kpeerset/sorted_peerset.go index f4eac9b..ef1ef9e 100644 --- a/kpeerset/sorted_peerset.go +++ b/kpeerset/sorted_peerset.go @@ -160,6 +160,64 @@ func (ps *SortedPeerset) UnqueriedFromKClosest(getValue func(id peer.ID, distanc return peers } +func (ps *SortedPeerset) GetBestUnqueried(num int, getValue func(id peer.ID, distance *big.Int) interface{}, + sortWith peerheap.Comparator) []peer.ID { + ps.lock.Lock() + defer ps.lock.Unlock() + + unqueriedPeerItems := ps.heapKClosestPeers.FilterItems(ps.isPeerItemQueried) + + // create a min-heap to sort the unqueried peer Items using the given comparator + ph := peerheap.New(false, sortWith) + for _, i := range unqueriedPeerItems { + p := i.Peer + d := i.Value.(*big.Int) + heap.Push(ph, &peerheap.Item{Peer: p, Value: getValue(p, d)}) + } + // now pop so we get them in sorted order + peers := make([]peer.ID, 0, num) + for ph.Len() != 0 && len(peers) < num { + popped := heap.Pop(ph).(*peerheap.Item) + peers = append(peers, popped.Peer) + } + + return peers +} + +func (ps *SortedPeerset) GetClosestUnqueried(num int) []peer.ID { + ps.lock.Lock() + defer ps.lock.Unlock() + + sortWith := func(i1 peerheap.Item, i2 peerheap.Item) bool { + return i1.Value.(*big.Int).Cmp(i2.Value.(*big.Int)) == -1 + } + + peerItems := ps.heapKClosestPeers.FilterItems(func(peerheap.Item) bool { return true }) + + // create a min-heap to sort the unqueried peer Items using the given comparator + ph := peerheap.New(false, sortWith) + for _, i := range peerItems { + p := i.Peer + d := i.Value.(*big.Int) + heap.Push(ph, &peerheap.Item{Peer: p, Value: d}) + } + // now pop so we get them in sorted order + peers := make([]peer.ID, 0, num) + for ph.Len() != 0 && len(peers) < num { + popped := heap.Pop(ph).(*peerheap.Item) + peers = append(peers, popped.Peer) + } + + unqueriedPeers := make([]peer.ID, 0, num) + for _, p := range peers { + if _, queried := ps.queried[p]; !queried { + unqueriedPeers = append(unqueriedPeers, p) + } + } + + return unqueriedPeers +} + // LenUnqueriedFromKClosest returns the number of unqueried peers among // the K closest peers. func (ps *SortedPeerset) LenUnqueriedFromKClosest() int { diff --git a/query.go b/query.go index 6eddf8f..39b2624 100644 --- a/query.go +++ b/query.go @@ -9,6 +9,7 @@ import ( "github.com/libp2p/go-libp2p-core/routing" "github.com/libp2p/go-libp2p-kad-dht/kpeerset/peerheap" "math/big" + "sync" "time" "github.com/libp2p/go-libp2p-kad-dht/kpeerset" @@ -91,7 +92,8 @@ func (dht *IpfsDHT) runDisjointQueries(ctx context.Context, d int, target string for i := 0; i < d; i++ { query := queries[i] go func() { - strictParallelismQuery(query) + looseBParallelismQuery(query) + //strictParallelismQuery(query) queryDone <- struct{}{} }() } @@ -208,6 +210,68 @@ func strictParallelismQuery(q *query) { } } +// strictParallelismQuery concurrently sends the query RPC to all eligible peers +// and waits for ALL the RPC's to complete before starting the next round of RPC's. +func looseBParallelismQuery(q *query) { + alphaCh := make(chan bool, q.dht.alpha) + resultCh := make(chan *queryResult, q.dht.alpha) + + pathCtx, cancelPath := context.WithCancel(q.ctx) + defer cancelPath() + + scoreCmp := func(i1 peerheap.Item, i2 peerheap.Item) bool { + return i1.Value.(*big.Int).Cmp(i2.Value.(*big.Int)) == -1 + } + + alphaMx := sync.Mutex{} + + for i := 0; i < q.dht.alpha; i++ { + go func() { + for { + if len(q.localPeers.GetClosestUnqueried(3)) == 0 { + cancelPath() + } + + select { + case top := <-alphaCh: + alphaMx.Lock() + var peers []peer.ID + if !top { + peers = q.localPeers.GetBestUnqueried(1, q.scorePeerByDistanceAndLatency, scoreCmp) + } else { + peers = q.localPeers.GetClosestUnqueried(3) + } + var qp peer.ID + if len(peers) > 0 { + qp = peers[0] + } else { + continue + } + q.localPeers.MarkQueried(qp) + alphaMx.Unlock() + resultCh <- q.queryPeer(qp) + case <-pathCtx.Done(): + return + } + } + }() + } + + foundCloserCounter := 0 + for closest := q.localPeers.GetClosestUnqueried(3); len(closest) > 0; { + select { + case alphaCh <- foundCloserCounter >= q.dht.alpha: + case res := <-resultCh: + if res.foundCloserPeer { + foundCloserCounter++ + } else { + foundCloserCounter = 0 + } + case <-pathCtx.Done(): + } + } +} + type queryResult struct { // foundCloserPeer is true if the peer we're querying returns a peer // closer than the closest we've already heard about