352 lines
7.8 KiB
Go
Raw Normal View History

2014-09-17 07:19:40 -07:00
package dht
import (
2016-09-30 10:24:03 -07:00
"context"
"errors"
2019-05-26 23:33:15 +01:00
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
2020-01-17 07:00:42 -08:00
"github.com/libp2p/go-libp2p-core/routing"
"github.com/libp2p/go-libp2p-kad-dht/kpeerset"
2019-05-26 23:33:15 +01:00
kb "github.com/libp2p/go-libp2p-kbucket"
2019-05-26 23:33:15 +01:00
pstore "github.com/libp2p/go-libp2p-core/peerstore"
2014-09-17 07:19:40 -07:00
)
// ErrNoPeersQueried is returned when we failed to connect to any peers.
// NOTE(review): none of the query code visible alongside this declaration
// returns it; presumably a caller elsewhere in the package does — confirm.
var ErrNoPeersQueried = errors.New("failed to query any peers")
2020-01-17 07:00:42 -08:00
type (
	// qfn is the per-peer query callback: it is invoked with a context and the
	// peer to ask, and yields the peers that were returned or an error.
	qfn func(context.Context, peer.ID) ([]*peer.AddrInfo, error)

	// sfn is the stop predicate: it is handed a query's peer set after a
	// successful peer query, and a true result cancels the query.
	sfn func(*kpeerset.SortedPeerset) bool
)
2014-09-18 19:30:04 -07:00
2020-01-17 07:00:42 -08:00
type qu struct {
ctx context.Context
cancel context.CancelFunc
2014-09-18 19:30:04 -07:00
2020-01-17 07:00:42 -08:00
dht *IpfsDHT
2020-01-17 07:00:42 -08:00
localPeers *kpeerset.SortedPeerset
globallyQueriedPeers *peer.Set
queryFn qfn
stopFn sfn
2014-09-18 19:30:04 -07:00
}
2020-01-17 07:00:42 -08:00
func (dht *IpfsDHT) runDisjointQueries(ctx context.Context, d int, target string, queryFn qfn, stopFn sfn) []*qu {
queryCtx, cancelQuery := context.WithCancel(ctx)
numQueriesComplete := 0
queryDone := make(chan struct{}, d)
seedPeers := dht.routingTable.NearestPeers(kb.ConvertKey(target), KValue)
// TODO does this need to be here?
//if len(seedPeers) == 0 {
// routing.PublishQueryEvent(ctx, &routing.QueryEvent{
// Type: routing.QueryError,
// Extra: kb.ErrLookupFailure.Error(),
// })
// return nil, kb.ErrLookupFailure
//}
dht.rng.Shuffle(len(seedPeers), func(i, j int) {
seedPeers[i], seedPeers[j] = seedPeers[j], seedPeers[i]
})
queries := make([]*qu, d)
peersQueried := peer.NewSet()
for i := 0; i < d; i++ {
query := &qu{
ctx: queryCtx,
cancel: cancelQuery,
dht: dht,
localPeers: kpeerset.NewSortedPeerset(KValue, target, dht.sortPeers),
globallyQueriedPeers: peersQueried,
queryFn: queryFn,
stopFn: stopFn,
}
queries[i] = query
2014-09-18 19:30:04 -07:00
}
2014-09-17 07:19:40 -07:00
2020-01-17 07:00:42 -08:00
for i := 0; i < len(seedPeers); i++ {
queries[i%d].localPeers.Add(seedPeers[i])
}
2020-01-17 07:00:42 -08:00
for i := 0; i < d; i++ {
query := queries[i]
go func() {
strictParallelismQuery(query)
queryDone <- struct{}{}
}()
2015-03-07 02:44:20 -08:00
}
2020-01-17 07:00:42 -08:00
loop:
for {
select {
case <-queryDone:
numQueriesComplete++
if numQueriesComplete == d {
break loop
}
case <-ctx.Done():
break loop
}
}
2020-01-17 07:00:42 -08:00
return queries
2014-09-18 19:30:04 -07:00
}
2020-01-17 07:00:42 -08:00
// sortPeers is the ordering function given to each path's SortedPeerset: it
// delegates to kpeerset.PeersSortedByLatency, supplying this node's network
// and peerstore.
func (dht *IpfsDHT) sortPeers(peers []kpeerset.IPeerMetric) kpeerset.SortablePeers {
	nw := dht.host.Network()
	sorted := kpeerset.PeersSortedByLatency(peers, nw, dht.peerstore)
	return sorted
}
2014-09-18 19:30:04 -07:00
2020-01-17 07:00:42 -08:00
func strictParallelismQuery(q *qu) {
/*
start with K closest peers (some queried already some not)
take best alpha (sorted by some metric)
query those alpha
once they complete:
if the alpha requests did not add any new peers to top K, repeat with unqueried top K
else repeat
*/
var lastPeers []peer.ID
for {
topPeers := q.localPeers.TopK()
2020-01-17 07:00:42 -08:00
peersToQuery := q.localPeers.KUnqueried()
2014-09-18 19:30:04 -07:00
2020-01-17 07:00:42 -08:00
if len(peersToQuery) == 0 {
return
}
2014-09-18 19:30:04 -07:00
2020-01-17 07:00:42 -08:00
numQuery := AlphaValue
// TODO: alternative: Check if we did not get any peers closer than alpha closest try k
if lastPeers != nil && peerSlicesEqual(lastPeers, topPeers) {
2020-01-17 07:00:42 -08:00
numQuery = len(peersToQuery)
} else if pqLen := len(peersToQuery); pqLen < numQuery {
numQuery = pqLen
}
lastPeers = topPeers
2020-01-17 07:00:42 -08:00
queryResCh := make(chan bool, numQuery)
resultsReceived := 0
2014-09-18 19:30:04 -07:00
2020-01-17 07:00:42 -08:00
for _, p := range peersToQuery[:numQuery] {
go func(p peer.ID) {
queryResCh <- q.queryPeer(q.ctx, p)
}(p)
}
loop:
for {
select {
case <-queryResCh:
resultsReceived++
if resultsReceived == numQuery {
break loop
}
case <-q.ctx.Done():
return
}
}
}
2014-09-18 19:30:04 -07:00
}
2020-01-17 07:00:42 -08:00
func simpleQuery(q *qu) {
/*
start with K closest peers (some queried already some not)
take best alpha (sorted by some metric)
query those alpha
- if a query fails then take the next one
once they complete:
if the alpha requests did not add any new peers to top K, repeat with unqueried top K
else repeat
*/
var lastPeers []peer.ID
for {
peersToQuery := q.localPeers.KUnqueried()
if len(peersToQuery) == 0 {
return
}
2020-01-17 07:00:42 -08:00
numQuery := AlphaValue
if lastPeers != nil && peerSlicesEqual(lastPeers, peersToQuery) {
numQuery = len(peersToQuery)
} else if pqLen := len(peersToQuery); pqLen < numQuery {
numQuery = pqLen
}
2015-01-05 04:35:54 -08:00
2020-01-17 07:00:42 -08:00
peersToQueryCh := make(chan peer.ID, numQuery)
for _, p := range peersToQuery[:numQuery] {
peersToQueryCh <- p
}
queryResCh := make(chan bool, numQuery)
queriesSucceeded, queriesSent := 0, numQuery
2014-09-17 07:19:40 -07:00
2020-01-17 07:00:42 -08:00
dialPeers:
for {
select {
case p := <-peersToQueryCh:
go func() {
queryResCh <- q.queryPeer(q.ctx, p)
}()
case success := <-queryResCh:
if success {
queriesSucceeded++
if queriesSucceeded == numQuery {
break dialPeers
}
} else {
queriesSent++
if queriesSent >= len(peersToQuery) {
break dialPeers
}
peersToQueryCh <- peersToQuery[queriesSent]
}
case <-q.ctx.Done():
return
}
}
2015-03-07 02:44:20 -08:00
}
2020-01-17 07:00:42 -08:00
}
func boundedDialQuery(q *qu) {
/*
start with K closest peers (some queried already some not)
take best alpha (sorted by some metric)
query those alpha
-- if queried peer falls out of top K we've heard of + top alpha we've received responses from
+ others like percentage of way through the timeout, their reputation, etc.
1) Cancel dial 2) Cancel query but not dial 3) Continue with query
*/
var lastPeers []peer.ID
for {
peersToQuery := q.localPeers.KUnqueried()
2015-03-07 02:44:20 -08:00
2020-01-17 07:00:42 -08:00
if len(peersToQuery) == 0 {
return
2014-09-18 19:30:04 -07:00
}
2020-01-17 07:00:42 -08:00
numQuery := AlphaValue
if lastPeers != nil && peerSlicesEqual(lastPeers, peersToQuery) {
numQuery = len(peersToQuery)
}
2014-09-18 19:30:04 -07:00
2020-01-17 07:00:42 -08:00
peersToQueryCh := make(chan peer.ID, numQuery)
for _, p := range peersToQuery[:numQuery] {
peersToQueryCh <- p
}
queryResCh := make(chan bool, numQuery)
queriesSucceeded, queriesSent := 0, 0
2020-01-17 07:00:42 -08:00
for {
select {
case p := <-peersToQueryCh:
go func() {
queryResCh <- q.queryPeer(q.ctx, p)
}()
case success := <-queryResCh:
if success {
queriesSucceeded++
} else {
queriesSent++
if queriesSent >= len(peersToQuery) {
return
}
peersToQueryCh <- peersToQuery[queriesSent]
}
case <-q.ctx.Done():
return
}
}
2014-09-18 19:30:04 -07:00
}
}
2020-01-17 07:00:42 -08:00
func (q *qu) queryPeer(ctx context.Context, p peer.ID) bool {
dialCtx, queryCtx := ctx, ctx
if err := q.dht.dialPeer(dialCtx, p); err != nil {
q.localPeers.Remove(p)
return false
}
if !q.globallyQueriedPeers.TryAdd(p) {
q.localPeers.Remove(p)
return false
2014-11-21 08:03:11 -08:00
}
2020-01-17 07:00:42 -08:00
newPeers, err := q.queryFn(queryCtx, p)
if err != nil {
q.localPeers.Remove(p)
return false
2014-09-18 19:30:04 -07:00
}
2020-01-17 07:00:42 -08:00
q.localPeers.MarkQueried(p)
2020-01-17 07:00:42 -08:00
if len(newPeers) == 0 {
logger.Debugf("QUERY worker for: %v - not found, and no closer peers.", p)
2014-09-17 07:19:40 -07:00
}
2014-09-18 19:30:04 -07:00
2020-01-17 07:00:42 -08:00
for _, next := range newPeers {
if next.ID == q.dht.self { // don't add self.
logger.Debugf("PEERS CLOSER -- worker for: %v found self", p)
continue
}
2014-09-18 19:30:04 -07:00
2020-01-17 07:00:42 -08:00
// add their addresses to the dialer's peerstore
q.dht.peerstore.AddAddrs(next.ID, next.Addrs, pstore.TempAddrTTL)
}
2014-09-18 19:30:04 -07:00
2020-01-17 07:00:42 -08:00
for _, np := range newPeers {
q.localPeers.Add(np.ID)
2014-09-18 19:30:04 -07:00
}
2020-01-17 07:00:42 -08:00
if q.stopFn(q.localPeers) {
q.cancel()
}
return true
2014-09-18 19:30:04 -07:00
}
2014-09-17 07:19:40 -07:00
2020-01-17 07:00:42 -08:00
func (dht *IpfsDHT) dialPeer(ctx context.Context, p peer.ID) error {
// short-circuit if we're already connected.
2020-01-17 07:00:42 -08:00
if dht.host.Network().Connectedness(p) == network.Connected {
return nil
}
2019-02-01 17:46:46 +11:00
logger.Debug("not connected. dialing.")
2020-01-17 07:00:42 -08:00
routing.PublishQueryEvent(ctx, &routing.QueryEvent{
2019-12-26 19:53:52 -05:00
Type: routing.DialingPeer,
ID: p,
})
2019-05-26 23:33:15 +01:00
pi := peer.AddrInfo{ID: p}
2020-01-17 07:00:42 -08:00
if err := dht.host.Connect(ctx, pi); err != nil {
2019-02-01 17:46:46 +11:00
logger.Debugf("error connecting: %s", err)
2020-01-17 07:00:42 -08:00
routing.PublishQueryEvent(ctx, &routing.QueryEvent{
2019-12-26 19:53:52 -05:00
Type: routing.QueryError,
Extra: err.Error(),
ID: p,
})
return err
}
2019-02-01 17:46:46 +11:00
logger.Debugf("connected. dial success.")
return nil
}
2020-01-17 07:00:42 -08:00
// Equal tells whether a and b contain the same elements.
// A nil argument is equivalent to an empty slice.
func peerSlicesEqual(a, b []peer.ID) bool {
if len(a) != len(b) {
return false
}
for i, v := range a {
if v != b[i] {
return false
2014-09-18 19:30:04 -07:00
}
2014-09-17 07:19:40 -07:00
}
2020-01-17 07:00:42 -08:00
return true
2014-09-17 07:19:40 -07:00
}