package dht
import (
	"context"
	"errors"
	"math/big"
	"time"

	"github.com/libp2p/go-libp2p-core/network"
	"github.com/libp2p/go-libp2p-core/peer"
	pstore "github.com/libp2p/go-libp2p-core/peerstore"
	"github.com/libp2p/go-libp2p-core/routing"

	"github.com/libp2p/go-libp2p-kad-dht/kpeerset"
	"github.com/libp2p/go-libp2p-kad-dht/kpeerset/peerheap"
	kb "github.com/libp2p/go-libp2p-kbucket"
)
// ErrNoPeersQueried is returned when we failed to connect to any peers.
var ErrNoPeersQueried = errors.New("failed to query any peers")
// queryFn queries a single peer, returning any closer peers it reports.
type queryFn func(context.Context, peer.ID) ([]*peer.AddrInfo, error)

// stopFn determines whether the whole disjoint query should stop.
type stopFn func(*kpeerset.SortedPeerset) bool
// query represents a single disjoint query.
type query struct {
// the query context.
ctx context.Context
// the cancellation function for the query context.
cancel context.CancelFunc
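	// dht is the DHT instance on whose behalf this query is running.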
dht *IpfsDHT
// localPeers is the set of peers that need to be queried or have already been queried for this query.
localPeers *kpeerset.SortedPeerset
// globallyQueriedPeers is the combined set of peers queried across ALL the disjoint queries.
globallyQueriedPeers *peer.Set
// the function that will be used to query a single peer.
queryFn queryFn
// stopFn is used to determine if we should stop the WHOLE disjoint query.
stopFn stopFn
}
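
// runDisjointQueries runs d disjoint queries for the target key in parallel,
// seeding each with a share of the K closest peers from our routing table, and
// returns all d queries once every one of them has completed or the context fires.
//
// A minimal usage sketch, where key is the target key string and the callbacks
// are illustrative stand-ins rather than this package's real lookup logic:
//
//	const d = 3 // number of disjoint paths (value illustrative)
//	queries, err := dht.runDisjointQueries(ctx, d, key,
//		func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
//			// send the lookup RPC (e.g. FIND_NODE) to p and return
//			// the closer peers it tells us about.
//			return nil, nil
//		},
//		func(ps *kpeerset.SortedPeerset) bool {
//			return false // never stop early; run until the K closest are queried
//		})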
func (dht *IpfsDHT) runDisjointQueries(ctx context.Context, d int, target string, queryFn queryFn, stopFn stopFn) ([]*query, error) {
queryCtx, cancelQuery := context.WithCancel(ctx)
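	// queryDone is buffered so each disjoint query can signal completion
	// without blocking, even if we return early on context cancellation.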
numQueriesComplete := 0
queryDone := make(chan struct{}, d)
	// pick the K closest peers to the key from our routing table and shuffle them.
seedPeers := dht.routingTable.NearestPeers(kb.ConvertKey(target), dht.bucketSize)
if len(seedPeers) == 0 {
routing.PublishQueryEvent(ctx, &routing.QueryEvent{
Type: routing.QueryError,
Extra: kb.ErrLookupFailure.Error(),
})
return nil, kb.ErrLookupFailure
}
dht.rng.Shuffle(len(seedPeers), func(i, j int) {
seedPeers[i], seedPeers[j] = seedPeers[j], seedPeers[i]
})
// create "d" disjoint queries
	queries := make([]*query, d)
peersQueried := peer.NewSet()
for i := 0; i < d; i++ {
		query := &query{
ctx: queryCtx,
cancel: cancelQuery,
dht: dht,
localPeers: kpeerset.NewSortedPeerset(dht.bucketSize, target),
globallyQueriedPeers: peersQueried,
queryFn: queryFn,
stopFn: stopFn,
}
queries[i] = query
	}
// distribute the shuffled K closest peers as seeds among the "d" disjoint queries
for i := 0; i < len(seedPeers); i++ {
queries[i%d].localPeers.Add(seedPeers[i])
}
// start the "d" disjoint queries
for i := 0; i < d; i++ {
query := queries[i]
go func() {
strictParallelismQuery(query)
queryDone <- struct{}{}
}()
	}
loop:
// wait for all the "d" disjoint queries to complete before we return
for {
select {
case <-queryDone:
numQueriesComplete++
if numQueriesComplete == d {
break loop
}
case <-ctx.Done():
break loop
}
}
return queries, nil
}
// TODO: This function should be owned by the DHT as it doesn't really belong to "a query".
// scorePeerByDistanceAndLatency scores a peer using metrics such as the peer's
// connectedness, its distance from the key and its currently known latency.
func (q query) scorePeerByDistanceAndLatency(p peer.ID, distanceFromKey *big.Int) interface{} {
connectedness := q.dht.host.Network().Connectedness(p)
latency := q.dht.host.Peerstore().LatencyEWMA(p)
var c int64
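	// lower scores are better: connected peers score best, peers we know we
	// cannot connect to are penalized heavily, and unknown peers sit in between.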
switch connectedness {
case network.Connected:
c = 1
case network.CanConnect:
c = 5
case network.CannotConnect:
c = 10000
default:
c = 20
}
l := int64(latency)
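	// with no latency measurement yet, assume a pessimistic 10s so that
	// unmeasured peers aren't unfairly favored over measured ones.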
if l <= 0 {
l = int64(time.Second) * 10
}
res := big.NewInt(c)
res.Mul(res, big.NewInt(l))
res.Mul(res, distanceFromKey)
return res
}
// strictParallelismQuery concurrently sends the query RPC to all eligible peers
// and waits for ALL the RPCs to complete before starting the next round of RPCs.
func strictParallelismQuery(q *query) {
foundCloser := false
for {
		// get the unqueried peers from among the K closest peers to the key, sorted in
		// ascending order of their "distance-latency" score.
		// We sort peers like this so that "better" peers are chosen to be in the α peers
		// which get queried from among the unqueried K closest.
peersToQuery := q.localPeers.UnqueriedFromKClosest(q.scorePeerByDistanceAndLatency,
func(i1 peerheap.Item, i2 peerheap.Item) bool {
return i1.Value.(*big.Int).Cmp(i2.Value.(*big.Int)) == -1
})
// The lookup terminates when the initiator has queried and gotten responses from the k
// closest nodes it has heard about.
if len(peersToQuery) == 0 {
return
}
// Of the k nodes the initiator has heard of closest to the target,
// it picks α that it has not yet queried and resends the FIND NODE RPC to them.
numQuery := q.dht.alpha
		// However, if a round of RPCs fails to return a node any closer than the closest
		// already heard about, the initiator resends the RPCs to all of the k closest
		// nodes it has not already queried.
if !foundCloser {
numQuery = len(peersToQuery)
} else if pqLen := len(peersToQuery); pqLen < numQuery {
// if we don't have α peers, pick whatever number we have.
numQuery = pqLen
}
		// reset foundCloser to false for the next round of RPCs
foundCloser = false
queryResCh := make(chan *queryResult, numQuery)
		resultsReceived := 0
		// send RPCs to all the chosen peers concurrently
for _, p := range peersToQuery[:numQuery] {
go func(p peer.ID) {
queryResCh <- q.queryPeer(p)
}(p)
}
loop:
		// wait for all outstanding RPCs to complete before we start the next round.
for {
select {
case res := <-queryResCh:
foundCloser = foundCloser || res.foundCloserPeer
resultsReceived++
if resultsReceived == numQuery {
break loop
}
case <-q.ctx.Done():
return
}
}
}
}
type queryResult struct {
// foundCloserPeer is true if the peer we're querying returns a peer
// closer than the closest we've already heard about
foundCloserPeer bool
}
// queryPeer queries a single peer.
func (q *query) queryPeer(p peer.ID) *queryResult {
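	// the dial and the query RPC both run under the query's own context, so
	// cancelling the query aborts any in-flight dials and RPCs.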
dialCtx, queryCtx := q.ctx, q.ctx
	// dial the peer
if err := q.dht.dialPeer(dialCtx, p); err != nil {
q.localPeers.Remove(p)
return &queryResult{}
}
// add the peer to the global set of queried peers since the dial was successful
// so that no other disjoint query tries sending an RPC to the same peer
if !q.globallyQueriedPeers.TryAdd(p) {
q.localPeers.Remove(p)
return &queryResult{}
	}
	// did the dial fulfill the stop condition?
if q.stopFn(q.localPeers) {
q.cancel()
return &queryResult{}
}
// send query RPC to the remote peer
newPeers, err := q.queryFn(queryCtx, p)
if err != nil {
q.localPeers.Remove(p)
return &queryResult{}
}
// mark the peer as queried.
	q.localPeers.MarkQueried(p)
if len(newPeers) == 0 {
logger.Debugf("QUERY worker for: %v - not found, and no closer peers.", p)
2014-09-17 07:19:40 -07:00
}
2014-09-18 19:30:04 -07:00
foundCloserPeer := false
2020-01-17 07:00:42 -08:00
for _, next := range newPeers {
if next.ID == q.dht.self { // don't add self.
logger.Debugf("PEERS CLOSER -- worker for: %v found self", p)
continue
}
// add their addresses to the dialer's peerstore
q.dht.peerstore.AddAddrs(next.ID, next.Addrs, pstore.TempAddrTTL)
closer := q.localPeers.Add(next.ID)
foundCloserPeer = foundCloserPeer || closer
	}
	// did the successful query RPC fulfill the query stop condition?
if q.stopFn(q.localPeers) {
q.cancel()
}
return &queryResult{
foundCloserPeer: foundCloserPeer,
}
2014-09-18 19:30:04 -07:00
}
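
// dialPeer ensures we have an open connection to the given peer, publishing
// routing query events for the dial attempt and for any dial failure.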
func (dht *IpfsDHT) dialPeer(ctx context.Context, p peer.ID) error {
// short-circuit if we're already connected.
if dht.host.Network().Connectedness(p) == network.Connected {
return nil
}
	logger.Debug("not connected. dialing.")
routing.PublishQueryEvent(ctx, &routing.QueryEvent{
Type: routing.DialingPeer,
ID: p,
})
	pi := peer.AddrInfo{ID: p}
if err := dht.host.Connect(ctx, pi); err != nil {
		logger.Debugf("error connecting: %s", err)
routing.PublishQueryEvent(ctx, &routing.QueryEvent{
Type: routing.QueryError,
Extra: err.Error(),
ID: p,
})
return err
}
logger.Debugf("connected. dial success.")
return nil
}