mirror of https://github.com/fluencelabs/go-libp2p-kad-dht
commit 2026fcb558
parent bae77cc534

    document and clean small things
@@ -355,8 +355,8 @@ func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.M
 	pinfos := pb.PBPeersToPeerInfos(pmes.GetProviderPeers())
 	for _, pi := range pinfos {
 		if pi.ID != p {
-			// we should ignore this provider reccord! not from originator.
-			// (we chould sign them and check signature later...)
+			// we should ignore this provider record! not from originator.
+			// (we should sign them and check signature later...)
 			log.Debugf("handleAddProvider received provider %s from %s. Ignore.", pi.ID, p)
 			continue
 		}
@@ -367,7 +367,7 @@ func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.M
 		}
 
 		log.Infof("received provider %s for %s (addrs: %s)", p, c, pi.Addrs)
-		if pi.ID != dht.self { // dont add own addrs.
+		if pi.ID != dht.self { // don't add own addrs.
 			// add the received addresses to our peerstore.
 			dht.peerstore.AddAddrs(pi.ID, pi.Addrs, pstore.ProviderAddrTTL)
 		}
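These first two hunks only touch comments, but the logic they annotate is the interesting part: a provider record is accepted only when the advertised peer ID matches the sender of the message, and the sender's addresses are then cached with a provider-specific TTL (unless the provider is the local node itself). A minimal, self-contained sketch of that pattern, using simplified stand-in types rather than the real libp2p peer info and peerstore APIs:

package main

import (
	"fmt"
	"time"
)

// peerInfo is a simplified stand-in for a libp2p peer info record.
type peerInfo struct {
	ID    string
	Addrs []string
}

// addrBook is a simplified stand-in for the libp2p peerstore.
type addrBook map[string][]string

// AddAddrs records addresses for a peer; a real peerstore would also expire
// them after ttl.
func (b addrBook) AddAddrs(p string, addrs []string, ttl time.Duration) {
	b[p] = append(b[p], addrs...)
}

// providerAddrTTL plays the role of pstore.ProviderAddrTTL: provider
// addresses are only worth caching for a bounded time (value here is arbitrary).
const providerAddrTTL = 30 * time.Minute

// handleAddProvider accepts provider records only from their originator and
// never caches the local node's own addresses, mirroring the checks above.
func handleAddProvider(book addrBook, sender, self string, pinfos []peerInfo) {
	for _, pi := range pinfos {
		if pi.ID != sender {
			// Ignore this record: it does not come from the peer it claims to describe.
			fmt.Printf("ignoring provider %s announced by %s\n", pi.ID, sender)
			continue
		}
		if pi.ID == self {
			continue // don't add our own addresses
		}
		book.AddAddrs(pi.ID, pi.Addrs, providerAddrTTL)
	}
}

func main() {
	book := addrBook{}
	handleAddProvider(book, "QmSender", "QmSelf", []peerInfo{
		{ID: "QmSender", Addrs: []string{"/ip4/1.2.3.4/tcp/4001"}},
		{ID: "QmSomeoneElse", Addrs: []string{"/ip4/5.6.7.8/tcp/4001"}},
	})
	fmt.Println(book)
}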
notif.go (2 changed lines)
@@ -37,7 +37,7 @@ func (nn *netNotifiee) Connected(n inet.Network, v inet.Conn) {
 		return
 	}
 
-	// Note: Unfortunately, the peerstore may not yet now that this peer is
+	// Note: Unfortunately, the peerstore may not yet know that this peer is
 	// a DHT server. So, if it didn't return a positive response above, test
 	// manually.
 	go nn.testConnection(v)
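The comment being fixed here describes a real pattern: when a connection comes up, the peerstore may not yet record whether the remote speaks the DHT protocol, so the notifiee probes it in a background goroutine instead of blocking the notification. A rough sketch of that shape, with simplified stand-ins rather than the inet and peerstore types used above:

package main

import (
	"fmt"
	"time"
)

// protoBook is a simplified stand-in for the peerstore's protocol records.
type protoBook map[string][]string

func (pb protoBook) supportsDHT(p string) bool {
	for _, proto := range pb[p] {
		if proto == "/ipfs/kad/1.0.0" {
			return true
		}
	}
	return false
}

// connected mirrors the notifiee flow: trust the peerstore if it already
// knows the peer is a DHT server; otherwise test the connection manually
// in the background so the notification handler is not blocked.
func connected(pb protoBook, peer string, addToTable func(string), testConnection func(string)) {
	if pb.supportsDHT(peer) {
		addToTable(peer)
		return
	}
	go testConnection(peer)
}

func main() {
	pb := protoBook{"QmKnown": {"/ipfs/kad/1.0.0"}}
	add := func(p string) { fmt.Println("added to routing table:", p) }
	probe := func(p string) { fmt.Println("probing", p, "for DHT support") }

	connected(pb, "QmKnown", add, probe)
	connected(pb, "QmUnknown", add, probe)

	time.Sleep(50 * time.Millisecond) // crude wait for the async probe in this toy example
}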
query.go (9 changed lines)
@@ -1,3 +1,8 @@
+// package query implement a query manager to drive concurrent workers
+// to query the DHT. A query is setup with a target key, a queryFunc tasked
+// to communicate with a peer, and a set of initial peers. As the query
+// progress, queryFunc can return closer peers that will be used to navigate
+// closer to the target key in the DHT until an answer is reached.
 package dht
 
 import (
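The package comment added here summarizes the query machinery: a query is set up with a target key, a per-peer query function, and a set of seed peers, and each response can return closer peers that steer the traversal toward the target. A compact, self-contained sketch of that loop (types and names are illustrative, not the dhtQueryRunner API; the real runner also runs workers concurrently with rate limiting, while this sketch walks the frontier sequentially to keep the shape visible):

package main

import "fmt"

// queryResult is what a single peer query returns: either an answer or
// peers believed to be closer to the target key.
type queryResult struct {
	found       bool
	value       string
	closerPeers []string
}

// queryFunc communicates with one peer about one target key.
type queryFunc func(peer, key string) queryResult

// runQuery drives the traversal: query known peers, follow closer peers,
// stop when a peer answers or the frontier is exhausted.
func runQuery(key string, seeds []string, qfn queryFunc) (string, bool) {
	seen := map[string]bool{}
	frontier := append([]string(nil), seeds...)
	for len(frontier) > 0 {
		p := frontier[0]
		frontier = frontier[1:]
		if seen[p] {
			continue
		}
		seen[p] = true
		res := qfn(p, key)
		if res.found {
			return res.value, true
		}
		frontier = append(frontier, res.closerPeers...)
	}
	return "", false
}

func main() {
	// Toy network: peer "A" points at "B", and "B" holds the value.
	qfn := func(peer, key string) queryResult {
		switch peer {
		case "A":
			return queryResult{closerPeers: []string{"B"}}
		case "B":
			return queryResult{found: true, value: "hello"}
		}
		return queryResult{}
	}
	v, ok := runQuery("some-key", []string{"A"}, qfn)
	fmt.Println(v, ok)
}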
@@ -224,7 +229,7 @@ func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID) {
 
 	// make sure we do this when we exit
 	defer func() {
-		// signal we're done proccessing peer p
+		// signal we're done processing peer p
 		r.peersRemaining.Decrement(1)
 		r.rateLimit <- struct{}{}
 	}()
@@ -283,7 +288,7 @@ func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID) {
 	} else if len(res.closerPeers) > 0 {
 		log.Debugf("PEERS CLOSER -- worker for: %v (%d closer peers)", p, len(res.closerPeers))
 		for _, next := range res.closerPeers {
-			if next.ID == r.query.dht.self { // dont add self.
+			if next.ID == r.query.dht.self { // don't add self.
 				log.Debugf("PEERS CLOSER -- worker for: %v found self", p)
 				continue
 			}
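Both queryPeer hunks are comment fixes, but they sit in the concurrency-sensitive part of the runner: each worker takes a slot from a rate-limit channel, and a deferred function returns the slot and signals that one more peer has been processed. A reduced, self-contained sketch of that pattern (a WaitGroup stands in for the runner's peersRemaining counter; names are illustrative, not the dhtQueryRunner fields):

package main

import (
	"fmt"
	"sync"
)

// worker processes one peer while holding a rate-limit slot.
func worker(p string, rateLimit chan struct{}, remaining *sync.WaitGroup) {
	// make sure we release resources when we exit, mirroring the defer above
	defer func() {
		remaining.Done()        // signal we're done processing peer p
		rateLimit <- struct{}{} // give the slot back
	}()
	fmt.Println("querying", p)
}

func main() {
	const concurrency = 3
	rateLimit := make(chan struct{}, concurrency)
	for i := 0; i < concurrency; i++ {
		rateLimit <- struct{}{} // fill the pool of slots
	}

	peers := []string{"A", "B", "C", "D", "E"}
	var remaining sync.WaitGroup
	for _, p := range peers {
		remaining.Add(1)
		<-rateLimit // take a slot; blocks once `concurrency` peers are in flight
		go worker(p, rateLimit, &remaining)
	}
	remaining.Wait()
}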
@@ -157,10 +157,10 @@ func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []r
 	vals := make([]routing.RecvdVal, 0, nvals)
 	var valslock sync.Mutex
 
-	// If we have it local, dont bother doing an RPC!
+	// If we have it local, don't bother doing an RPC!
 	lrec, err := dht.getLocal(key)
 	if err == nil {
-		// TODO: this is tricky, we dont always want to trust our own value
+		// TODO: this is tricky, we don't always want to trust our own value
 		// what if the authoritative source updated it?
 		log.Debug("have it locally")
 		vals = append(vals, routing.RecvdVal{
@@ -219,7 +219,7 @@ func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []r
 			valslock.Lock()
 			vals = append(vals, rv)
 
-			// If weve collected enough records, we're done
+			// If we have collected enough records, we're done
 			if len(vals) >= nvals {
 				res.success = true
 			}
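The GetValues hunks fix comments around two behaviors worth noting: the local record is consulted before any RPC is issued, and remote results are appended under a mutex only until the requested number of values has been collected. A minimal sketch of that collect-until-enough pattern (the local store and fetcher functions are stand-ins, not the dht's getLocal or query plumbing):

package main

import (
	"fmt"
	"sync"
)

// getValues collects up to nvals values for key: local store first (no RPC
// needed), then concurrent remote fetches appended under a lock.
func getValues(key string, nvals int, local map[string]string, remote []func(string) (string, bool)) []string {
	vals := make([]string, 0, nvals)
	var valslock sync.Mutex

	// If we have it locally, don't bother doing an RPC.
	if v, ok := local[key]; ok {
		vals = append(vals, v)
	}

	var wg sync.WaitGroup
	for _, fetch := range remote {
		wg.Add(1)
		go func(fetch func(string) (string, bool)) {
			defer wg.Done()
			v, ok := fetch(key)
			if !ok {
				return
			}
			valslock.Lock()
			defer valslock.Unlock()
			if len(vals) < nvals { // stop appending once we have collected enough records
				vals = append(vals, v)
			}
		}(fetch)
	}
	wg.Wait()
	return vals
}

func main() {
	local := map[string]string{"k": "local-value"}
	remote := []func(string) (string, bool){
		func(string) (string, bool) { return "remote-1", true },
		func(string) (string, bool) { return "remote-2", true },
	}
	fmt.Println(getValues("k", 2, local, remote))
}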
@@ -343,7 +343,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key *cid.Cid,
 		}
 	}
 
-	// If we have enough peers locally, dont bother with remote RPC
+	// If we have enough peers locally, don't bother with remote RPC
 	// TODO: is this a DOS vector?
 	if ps.Size() >= count {
 		return
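The final hunk fixes the same contraction in a comment guarding a local-first short-circuit: when enough provider peers are already known locally, the remote lookup is skipped entirely (the TODO about a possible DoS vector is left open by the commit). A tiny sketch of that early return, with a hypothetical provider-set type standing in for the dht's internal one:

package main

import "fmt"

// providerSet is a hypothetical stand-in for the dht's internal set of
// provider peers for a given key.
type providerSet struct {
	peers map[string]struct{}
}

func (ps *providerSet) Add(p string) { ps.peers[p] = struct{}{} }
func (ps *providerSet) Size() int    { return len(ps.peers) }

// findProviders returns early when enough providers are known locally,
// so no remote RPC is issued at all.
func findProviders(local []string, count int, remoteLookup func() []string) []string {
	ps := &providerSet{peers: map[string]struct{}{}}
	for _, p := range local {
		ps.Add(p)
	}
	// If we have enough peers locally, don't bother with remote RPC.
	if ps.Size() >= count {
		return local
	}
	return append(local, remoteLookup()...)
}

func main() {
	remote := func() []string { fmt.Println("remote lookup issued"); return []string{"QmRemote"} }
	fmt.Println(findProviders([]string{"QmA", "QmB"}, 2, remote)) // enough locally: no RPC
	fmt.Println(findProviders([]string{"QmA"}, 2, remote))        // needs the remote lookup
}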