package dht

import (
	"context"
	"errors"

	"github.com/libp2p/go-libp2p-core/network"
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/routing"

	"github.com/libp2p/go-libp2p-kad-dht/kpeerset"
	kb "github.com/libp2p/go-libp2p-kbucket"

	pstore "github.com/libp2p/go-libp2p-core/peerstore"
)

// ErrNoPeersQueried is returned when we failed to connect to any peers.
var ErrNoPeersQueried = errors.New("failed to query any peers")

// qfn is the per-peer query function: it asks a single peer and returns the
// closer peers that peer reported, or an error.
type qfn func(context.Context, peer.ID) ([]*peer.AddrInfo, error)

// sfn is the stop condition: it reports whether the lookup should halt given
// the current state of the local peer set.
type sfn func(*kpeerset.SortedPeerset) bool

// qu tracks the state of a single (disjoint) lookup path.
type qu struct {
	ctx    context.Context
	cancel context.CancelFunc

	dht *IpfsDHT

	localPeers           *kpeerset.SortedPeerset
	globallyQueriedPeers *peer.Set
	queryFn              qfn
	stopFn               sfn
}
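
// Illustration only (not part of this package's API): a lookup such as
// GetClosestPeers might wire a query function and a stop condition into
// runDisjointQueries roughly as below. The helpers findNodeRPC and
// haveEnoughPeers are hypothetical names, d is the number of disjoint paths,
// and target is the lookup key in string form.
//
//	queries, err := dht.runDisjointQueries(ctx, d, target,
//		func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
//			// ask p for the peers it believes are closest to target
//			return findNodeRPC(ctx, p, target)
//		},
//		func(peers *kpeerset.SortedPeerset) bool {
//			// stop the lookup once the peer set meets our termination condition
//			return haveEnoughPeers(peers)
//		},
//	)
//	if err != nil {
//		return nil, err
//	}
//	// each entry in queries holds the peers that path converged on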

// runDisjointQueries spawns d disjoint lookup paths toward target. Each path
// gets its own SortedPeerset seeded with a share of the routing table's
// nearest peers, while a shared peer.Set ensures no peer is queried by more
// than one path.
func (dht *IpfsDHT) runDisjointQueries(ctx context.Context, d int, target string, queryFn qfn, stopFn sfn) ([]*qu, error) {
	queryCtx, cancelQuery := context.WithCancel(ctx)

	numQueriesComplete := 0
	queryDone := make(chan struct{}, d)

	seedPeers := dht.routingTable.NearestPeers(kb.ConvertKey(target), dht.bucketSize)
	if len(seedPeers) == 0 {
		routing.PublishQueryEvent(ctx, &routing.QueryEvent{
			Type:  routing.QueryError,
			Extra: kb.ErrLookupFailure.Error(),
		})
		return nil, kb.ErrLookupFailure
	}

	dht.rng.Shuffle(len(seedPeers), func(i, j int) {
		seedPeers[i], seedPeers[j] = seedPeers[j], seedPeers[i]
	})

	queries := make([]*qu, d)

	peersQueried := peer.NewSet()
	for i := 0; i < d; i++ {
		query := &qu{
			ctx:                  queryCtx,
			cancel:               cancelQuery,
			dht:                  dht,
			localPeers:           kpeerset.NewSortedPeerset(dht.bucketSize, target, dht.sortPeers),
			globallyQueriedPeers: peersQueried,
			queryFn:              queryFn,
			stopFn:               stopFn,
		}

		queries[i] = query
	}

	// Distribute the shuffled seed peers round-robin across the d paths.
	for i := 0; i < len(seedPeers); i++ {
		queries[i%d].localPeers.Add(seedPeers[i])
	}

	for i := 0; i < d; i++ {
		query := queries[i]
		go func() {
			strictParallelismQuery(query)
			queryDone <- struct{}{}
		}()
	}

	// Wait until all d paths finish or the caller's context is done.
loop:
	for {
		select {
		case <-queryDone:
			numQueriesComplete++
			if numQueriesComplete == d {
				break loop
			}
		case <-ctx.Done():
			break loop
		}
	}

	return queries, nil
}

// sortPeers orders candidate peers by the latency metric tracked in the
// peerstore.
func (dht *IpfsDHT) sortPeers(peers []kpeerset.IPeerMetric) kpeerset.SortablePeers {
	return kpeerset.PeersSortedByLatency(peers, dht.host.Network(), dht.peerstore)
}

// strictParallelismQuery is the lookup-path strategy used by
// runDisjointQueries; the block comment below sketches the algorithm.
func strictParallelismQuery(q *qu) {
	/*
		start with the K closest peers (some already queried, some not)
		take the best alpha (sorted by some metric)
		query those alpha
		once they complete:
			if the alpha requests did not add any new peers to the top K, repeat with the unqueried top K
			else repeat
	*/

	foundCloser := false
	for {
		peersToQuery := q.localPeers.KUnqueried()

		if len(peersToQuery) == 0 {
			return
		}

		// TODO: Is it finding a closer peer if it's closer than one we know about or one we have queried?
		numQuery := q.dht.alpha
		if foundCloser {
			numQuery = len(peersToQuery)
		} else if pqLen := len(peersToQuery); pqLen < numQuery {
			numQuery = pqLen
		}
		foundCloser = false

		queryResCh := make(chan *queryResult, numQuery)
		resultsReceived := 0

		for _, p := range peersToQuery[:numQuery] {
			go func(p peer.ID) {
				queryResCh <- q.queryPeer(q.ctx, p)
			}(p)
		}

	loop:
		for {
			select {
			case res := <-queryResCh:
				foundCloser = foundCloser || res.foundCloserPeer
				resultsReceived++
				if resultsReceived == numQuery {
					break loop
				}
			case <-q.ctx.Done():
				return
			}
		}
	}
}
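
// Purely illustrative trace of strictParallelismQuery (hypothetical numbers,
// assuming alpha = 3): with 20 unqueried peers and foundCloser == false, a
// round queries 3 of them; if a round adds a closer peer, the next round
// widens to every unqueried peer in the top K; a round that adds nothing
// closer drops back to querying at most alpha peers.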

// simpleQuery is an alternative lookup-path strategy: when a query to a peer
// fails, another unqueried candidate is substituted in its place.
func simpleQuery(q *qu) {
	/*
		start with the K closest peers (some already queried, some not)
		take the best alpha (sorted by some metric)
		query those alpha
			- if a query fails then take the next one
		once they complete:
			if the alpha requests did not add any new peers to the top K, repeat with the unqueried top K
			else repeat
	*/

	var lastPeers []peer.ID
	for {
		peersToQuery := q.localPeers.KUnqueried()

		if len(peersToQuery) == 0 {
			return
		}

		numQuery := q.dht.alpha
		if lastPeers != nil && peerSlicesEqual(lastPeers, peersToQuery) {
			numQuery = len(peersToQuery)
		} else if pqLen := len(peersToQuery); pqLen < numQuery {
			numQuery = pqLen
		}

		peersToQueryCh := make(chan peer.ID, numQuery)
		for _, p := range peersToQuery[:numQuery] {
			peersToQueryCh <- p
		}
		queryResCh := make(chan *queryResult, numQuery)
		queriesSucceeded, queriesSent := 0, numQuery

	dialPeers:
		for {
			select {
			case p := <-peersToQueryCh:
				go func() {
					queryResCh <- q.queryPeer(q.ctx, p)
				}()
			case res := <-queryResCh:
				if res.success {
					queriesSucceeded++
					if queriesSucceeded == numQuery {
						break dialPeers
					}
				} else {
					queriesSent++
					if queriesSent >= len(peersToQuery) {
						break dialPeers
					}
					peersToQueryCh <- peersToQuery[queriesSent]
				}
			case <-q.ctx.Done():
				return
			}
		}
	}
}
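
// Illustrative round for simpleQuery (hypothetical numbers, alpha = 3, five
// unqueried candidates): three queries go out; when one fails, a not-yet-sent
// candidate is pushed onto peersToQueryCh in its place, and the round ends
// once three queries have succeeded or every candidate has been sent.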

// boundedDialQuery is an experimental lookup-path strategy; the block comment
// below sketches the dial/query cancellation policy it is intended to enforce.
func boundedDialQuery(q *qu) {
	/*
		start with the K closest peers (some already queried, some not)
		take the best alpha (sorted by some metric)
		query those alpha
		-- if a queried peer falls out of the top K we've heard of + the top alpha we've received responses from,
		   plus other signals (how far through the timeout we are, their reputation, etc.), then either:
		   1) cancel the dial, 2) cancel the query but not the dial, or 3) continue with the query
	*/

	var lastPeers []peer.ID
	for {
		peersToQuery := q.localPeers.KUnqueried()

		if len(peersToQuery) == 0 {
			return
		}

		numQuery := q.dht.alpha
		if lastPeers != nil && peerSlicesEqual(lastPeers, peersToQuery) {
			numQuery = len(peersToQuery)
		}

		peersToQueryCh := make(chan peer.ID, numQuery)
		for _, p := range peersToQuery[:numQuery] {
			peersToQueryCh <- p
		}
		queryResCh := make(chan *queryResult, numQuery)
		queriesSucceeded, queriesSent := 0, 0

		for {
			select {
			case p := <-peersToQueryCh:
				go func() {
					queryResCh <- q.queryPeer(q.ctx, p)
				}()
			case res := <-queryResCh:
				if res.success {
					queriesSucceeded++
				} else {
					queriesSent++
					if queriesSent >= len(peersToQuery) {
						return
					}
					peersToQueryCh <- peersToQuery[queriesSent]
				}
			case <-q.ctx.Done():
				return
			}
		}
	}
}

// queryResult reports the outcome of querying a single peer.
type queryResult struct {
	// success is true if we managed to dial and query the peer.
	success bool
	// foundCloserPeer is true if the queried peer reported at least one peer
	// that localPeers.Add considered closer.
	foundCloserPeer bool
}

// queryPeer dials p, runs the query function against it, and folds any peers
// it returns into this path's local peer set.
func (q *qu) queryPeer(ctx context.Context, p peer.ID) *queryResult {
	dialCtx, queryCtx := ctx, ctx

	// If we cannot dial the peer, drop it from the candidate set.
	if err := q.dht.dialPeer(dialCtx, p); err != nil {
		q.localPeers.Remove(p)
		return &queryResult{}
	}
	// Skip peers that another disjoint path has already queried.
	if !q.globallyQueriedPeers.TryAdd(p) {
		q.localPeers.Remove(p)
		return &queryResult{}
	}

	newPeers, err := q.queryFn(queryCtx, p)
	if err != nil {
		q.localPeers.Remove(p)
		return &queryResult{}
	}

	q.localPeers.MarkQueried(p)

	if len(newPeers) == 0 {
		logger.Debugf("QUERY worker for: %v - not found, and no closer peers.", p)
	}

	for _, next := range newPeers {
		if next.ID == q.dht.self { // don't add self.
			logger.Debugf("PEERS CLOSER -- worker for: %v found self", p)
			continue
		}

		// add their addresses to the dialer's peerstore
		q.dht.peerstore.AddAddrs(next.ID, next.Addrs, pstore.TempAddrTTL)
	}

	foundCloserPeer := false
	for _, np := range newPeers {
		closer := q.localPeers.Add(np.ID)
		foundCloserPeer = foundCloserPeer || closer
	}

	// If the stop condition is satisfied, cancel the shared query context,
	// ending the lookup.
	if q.stopFn(q.localPeers) {
		q.cancel()
	}
	return &queryResult{
		success:         true,
		foundCloserPeer: foundCloserPeer,
	}
}

// dialPeer ensures we have a connection to p, publishing query events for the
// dial attempt and its outcome.
func (dht *IpfsDHT) dialPeer(ctx context.Context, p peer.ID) error {
	// short-circuit if we're already connected.
	if dht.host.Network().Connectedness(p) == network.Connected {
		return nil
	}

	logger.Debug("not connected. dialing.")
	routing.PublishQueryEvent(ctx, &routing.QueryEvent{
		Type: routing.DialingPeer,
		ID:   p,
	})

	pi := peer.AddrInfo{ID: p}
	if err := dht.host.Connect(ctx, pi); err != nil {
		logger.Debugf("error connecting: %s", err)
		routing.PublishQueryEvent(ctx, &routing.QueryEvent{
			Type:  routing.QueryError,
			Extra: err.Error(),
			ID:    p,
		})

		return err
	}
	logger.Debugf("connected. dial success.")
	return nil
}

// peerSlicesEqual tells whether a and b contain the same elements in the same
// order. A nil argument is equivalent to an empty slice.
func peerSlicesEqual(a, b []peer.ID) bool {
	if len(a) != len(b) {
		return false
	}
	for i, v := range a {
		if v != b[i] {
			return false
		}
	}
	return true
}