Mirror of https://github.com/fluencelabs/go-libp2p-kad-dht (synced 2025-04-24 22:32:13 +00:00)
fixed small bug introduced during race condition frustration
commit ee923993e3 (parent b17baaab00)
dht.go (100 changed lines)
@@ -3,6 +3,7 @@ package dht
 import (
     "sync"
     "time"
     "bytes"
+    "encoding/json"

     peer "github.com/jbenet/go-ipfs/peer"
@@ -22,7 +23,7 @@ import (
 // IpfsDHT is an implementation of Kademlia with Coral and S/Kademlia modifications.
 // It is used to implement the base IpfsRouting module.
 type IpfsDHT struct {
-    routes *RoutingTable
+    routes []*RoutingTable

     network *swarm.Swarm

@@ -38,7 +39,7 @@ type IpfsDHT struct {
     providerLock sync.RWMutex

     // map of channels waiting for reply messages
-    listeners map[uint64]chan *swarm.Message
+    listeners map[uint64]*listenInfo
     listenLock sync.RWMutex

     // Signal to shutdown dht
@@ -46,6 +47,14 @@ type IpfsDHT struct {

     // When this peer started up
     birth time.Time
+
+    //lock to make diagnostics work better
+    diaglock sync.Mutex
 }
+
+type listenInfo struct {
+    resp chan *swarm.Message
+    count int
+}

 // Create a new DHT object with the given peer as the 'local' host
@@ -63,10 +72,11 @@ func NewDHT(p *peer.Peer) (*IpfsDHT, error) {
     dht.network = network
     dht.datastore = ds.NewMapDatastore()
     dht.self = p
-    dht.listeners = make(map[uint64]chan *swarm.Message)
+    dht.listeners = make(map[uint64]*listenInfo)
     dht.providers = make(map[u.Key][]*providerInfo)
     dht.shutdown = make(chan struct{})
-    dht.routes = NewRoutingTable(20, convertPeerID(p.ID))
+    dht.routes = make([]*RoutingTable, 1)
+    dht.routes[0] = NewRoutingTable(20, convertPeerID(p.ID))
     dht.birth = time.Now()
     return dht, nil
 }
@@ -106,7 +116,7 @@ func (dht *IpfsDHT) Connect(addr *ma.Multiaddr) (*peer.Peer, error) {

     dht.network.StartConn(conn)

-    removed := dht.routes.Update(peer)
+    removed := dht.routes[0].Update(peer)
     if removed != nil {
         panic("need to remove this peer.")
     }
@@ -142,7 +152,7 @@ func (dht *IpfsDHT) handleMessages() {
             }

             // Update peers latest visit in routing table
-            removed := dht.routes.Update(mes.Peer)
+            removed := dht.routes[0].Update(mes.Peer)
             if removed != nil {
                 panic("Need to handle removed peer.")
             }
@@ -150,10 +160,15 @@ func (dht *IpfsDHT) handleMessages() {
             // Note: not sure if this is the correct place for this
             if pmes.GetResponse() {
                 dht.listenLock.RLock()
-                ch, ok := dht.listeners[pmes.GetId()]
+                list, ok := dht.listeners[pmes.GetId()]
+                if list.count > 1 {
+                    list.count--
+                } else if list.count == 1 {
+                    delete(dht.listeners, pmes.GetId())
+                }
                 dht.listenLock.RUnlock()
                 if ok {
-                    ch <- mes
+                    list.resp <- mes
                 } else {
                     // this is expected behaviour during a timeout
                     u.DOut("Received response with nobody listening...")
@@ -181,7 +196,7 @@ func (dht *IpfsDHT) handleMessages() {
             case DHTMessage_PING:
                 dht.handlePing(mes.Peer, pmes)
             case DHTMessage_DIAGNOSTIC:
-                // TODO: network diagnostic messages
+                dht.handleDiagnostic(mes.Peer, pmes)
             }

         case err := <-dht.network.Chan.Errors:
@@ -220,7 +235,7 @@ func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *DHTMessage) {
         }
     } else if err == ds.ErrNotFound {
         // Find closest peer(s) to desired key and reply with that info
-        closer := dht.routes.NearestPeer(convertKey(u.Key(pmes.GetKey())))
+        closer := dht.routes[0].NearestPeer(convertKey(u.Key(pmes.GetKey())))
         resp = &pDHTMessage{
             Response: true,
             Id: *pmes.Id,
@@ -256,7 +271,7 @@ func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *DHTMessage) {

 func (dht *IpfsDHT) handleFindPeer(p *peer.Peer, pmes *DHTMessage) {
     u.POut("handleFindPeer: searching for '%s'", peer.ID(pmes.GetKey()).Pretty())
-    closest := dht.routes.NearestPeer(convertKey(u.Key(pmes.GetKey())))
+    closest := dht.routes[0].NearestPeer(convertKey(u.Key(pmes.GetKey())))
     if closest == nil {
         panic("could not find anything.")
     }
@@ -336,10 +351,10 @@ func (dht *IpfsDHT) handleAddProvider(p *peer.Peer, pmes *DHTMessage) {

 // Register a handler for a specific message ID, used for getting replies
 // to certain messages (i.e. response to a GET_VALUE message)
-func (dht *IpfsDHT) ListenFor(mesid uint64) <-chan *swarm.Message {
+func (dht *IpfsDHT) ListenFor(mesid uint64, count int) <-chan *swarm.Message {
     lchan := make(chan *swarm.Message)
     dht.listenLock.Lock()
-    dht.listeners[mesid] = lchan
+    dht.listeners[mesid] = &listenInfo{lchan, count}
     dht.listenLock.Unlock()
     return lchan
 }
@@ -347,12 +362,19 @@ func (dht *IpfsDHT) ListenFor(mesid uint64) <-chan *swarm.Message {
 // Unregister the given message id from the listener map
 func (dht *IpfsDHT) Unlisten(mesid uint64) {
     dht.listenLock.Lock()
-    ch, ok := dht.listeners[mesid]
+    list, ok := dht.listeners[mesid]
     if ok {
         delete(dht.listeners, mesid)
     }
     dht.listenLock.Unlock()
-    close(ch)
+    close(list.resp)
 }
+
+func (dht *IpfsDHT) IsListening(mesid uint64) bool {
+    dht.listenLock.RLock()
+    _,ok := dht.listeners[mesid]
+    dht.listenLock.RUnlock()
+    return ok
+}

 // Stop all communications from this peer and shut down
@@ -368,3 +390,51 @@ func (dht *IpfsDHT) addProviderEntry(key u.Key, p *peer.Peer) {
     dht.providers[key] = append(provs, &providerInfo{time.Now(), p})
     dht.providerLock.Unlock()
 }
+
+func (dht *IpfsDHT) handleDiagnostic(p *peer.Peer, pmes *DHTMessage) {
+    dht.diaglock.Lock()
+    if dht.IsListening(pmes.GetId()) {
+        //TODO: ehhh..........
+        dht.diaglock.Unlock()
+        return
+    }
+    dht.diaglock.Unlock()
+
+    seq := dht.routes[0].NearestPeers(convertPeerID(dht.self.ID), 10)
+    listen_chan := dht.ListenFor(pmes.GetId(), len(seq))
+
+    for _,ps := range seq {
+        mes := swarm.NewMessage(ps, pmes)
+        dht.network.Chan.Outgoing <-mes
+    }
+
+
+
+    buf := new(bytes.Buffer)
+    // NOTE: this shouldnt be a hardcoded value
+    after := time.After(time.Second * 20)
+    count := len(seq)
+    for count > 0 {
+        select {
+        case <-after:
+            //Timeout, return what we have
+            goto out
+        case req_resp := <-listen_chan:
+            buf.Write(req_resp.Data)
+            count--
+        }
+    }
+
+out:
+    di := dht.getDiagInfo()
+    buf.Write(di.Marshal())
+    resp := pDHTMessage{
+        Type: DHTMessage_DIAGNOSTIC,
+        Id: pmes.GetId(),
+        Value: buf.Bytes(),
+        Response: true,
+    }
+
+    mes := swarm.NewMessage(p, resp.ToProtobuf())
+    dht.network.Chan.Outgoing <-mes
+}
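The listener change above is the heart of the dht.go diff: a bare reply channel per message ID becomes a listenInfo record that also tracks how many responses are still expected, so one request fanned out to several peers can share a single registration. Below is a minimal standalone sketch of that reference-counted listener pattern, not the repository's actual code: the message type, the buffered channel, and the single mutex are simplifications (the real code keeps an unbuffered channel and decrements the count under an RLock).

package main

import (
    "fmt"
    "sync"
)

// message stands in for the real *swarm.Message type.
type message struct{ data string }

// listenInfo mirrors the struct added in dht.go: a reply channel plus the
// number of responses still expected for this message ID.
type listenInfo struct {
    resp  chan *message
    count int
}

type listenerSet struct {
    mu        sync.Mutex
    listeners map[uint64]*listenInfo
}

// listenFor registers a channel that will receive up to count responses.
func (ls *listenerSet) listenFor(id uint64, count int) <-chan *message {
    ch := make(chan *message, count)
    ls.mu.Lock()
    ls.listeners[id] = &listenInfo{resp: ch, count: count}
    ls.mu.Unlock()
    return ch
}

// deliver hands a response to the waiting listener and drops the
// registration once the expected number of responses has arrived.
func (ls *listenerSet) deliver(id uint64, m *message) bool {
    ls.mu.Lock()
    info, ok := ls.listeners[id]
    if ok {
        info.count--
        if info.count == 0 {
            delete(ls.listeners, id)
        }
    }
    ls.mu.Unlock()
    if !ok {
        // Expected during a timeout, as the diff's own comment notes.
        return false
    }
    info.resp <- m
    return true
}

func main() {
    ls := &listenerSet{listeners: make(map[uint64]*listenInfo)}
    ch := ls.listenFor(42, 2) // expect two replies for message ID 42

    ls.deliver(42, &message{data: "reply one"})
    ls.deliver(42, &message{data: "reply two"})

    fmt.Println((<-ch).data)
    fmt.Println((<-ch).data)
}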
routing.go (71 changed lines)
@@ -3,6 +3,7 @@ package dht
 import (
     "math/rand"
     "time"
     "bytes"
+    "encoding/json"

     proto "code.google.com/p/goprotobuf/proto"
@@ -30,7 +31,7 @@ func GenerateMessageID() uint64 {
 // PutValue adds value corresponding to given Key.
 func (s *IpfsDHT) PutValue(key u.Key, value []byte) error {
     var p *peer.Peer
-    p = s.routes.NearestPeer(convertKey(key))
+    p = s.routes[0].NearestPeer(convertKey(key))
     if p == nil {
         panic("Table returned nil peer!")
     }
@@ -52,7 +53,7 @@ func (s *IpfsDHT) PutValue(key u.Key, value []byte) error {
 // returned along with util.ErrSearchIncomplete
 func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
     var p *peer.Peer
-    p = s.routes.NearestPeer(convertKey(key))
+    p = s.routes[0].NearestPeer(convertKey(key))
     if p == nil {
         panic("Table returned nil peer!")
     }
@@ -62,7 +63,7 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {
         Key: string(key),
         Id: GenerateMessageID(),
     }
-    response_chan := s.ListenFor(pmes.Id)
+    response_chan := s.ListenFor(pmes.Id, 1)

     mes := swarm.NewMessage(p, pmes.ToProtobuf())
     s.network.Chan.Outgoing <- mes
@@ -92,7 +93,7 @@ func (s *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) {

 // Announce that this node can provide value for given key
 func (s *IpfsDHT) Provide(key u.Key) error {
-    peers := s.routes.NearestPeers(convertKey(key), PoolSize)
+    peers := s.routes[0].NearestPeers(convertKey(key), PoolSize)
     if len(peers) == 0 {
         //return an error
     }
@@ -112,7 +113,7 @@ func (s *IpfsDHT) Provide(key u.Key) error {

 // FindProviders searches for peers who can provide the value for given key.
 func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer, error) {
-    p := s.routes.NearestPeer(convertKey(key))
+    p := s.routes[0].NearestPeer(convertKey(key))

     pmes := pDHTMessage{
         Type: DHTMessage_GET_PROVIDERS,
@@ -122,7 +123,7 @@ func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer,

     mes := swarm.NewMessage(p, pmes.ToProtobuf())

-    listen_chan := s.ListenFor(pmes.Id)
+    listen_chan := s.ListenFor(pmes.Id, 1)
     u.DOut("Find providers for: '%s'", key)
     s.network.Chan.Outgoing <-mes
     after := time.After(timeout)
@@ -163,7 +164,6 @@ func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer,
             }

             return prov_arr, nil
-
         }
     }

@@ -171,7 +171,7 @@ func (s *IpfsDHT) FindProviders(key u.Key, timeout time.Duration) ([]*peer.Peer,

 // FindPeer searches for a peer with given ID.
 func (s *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error) {
-    p := s.routes.NearestPeer(convertPeerID(id))
+    p := s.routes[0].NearestPeer(convertPeerID(id))

     pmes := pDHTMessage{
         Type: DHTMessage_FIND_NODE,
@@ -181,7 +181,7 @@ func (s *IpfsDHT) FindPeer(id peer.ID, timeout time.Duration) (*peer.Peer, error

     mes := swarm.NewMessage(p, pmes.ToProtobuf())

-    listen_chan := s.ListenFor(pmes.Id)
+    listen_chan := s.ListenFor(pmes.Id, 1)
     s.network.Chan.Outgoing <-mes
     after := time.After(timeout)
     select {
@@ -224,7 +224,7 @@ func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error {
     mes := swarm.NewMessage(p, pmes.ToProtobuf())

     before := time.Now()
-    response_chan := dht.ListenFor(pmes.Id)
+    response_chan := dht.ListenFor(pmes.Id, 1)
     dht.network.Chan.Outgoing <- mes

     tout := time.After(timeout)
@@ -241,3 +241,54 @@ func (dht *IpfsDHT) Ping(p *peer.Peer, timeout time.Duration) error {
         return u.ErrTimeout
     }
 }
+
+func (dht *IpfsDHT) GetDiagnostic(timeout time.Duration) ([]*diagInfo, error) {
+    u.DOut("Begin Diagnostic")
+    //Send to N closest peers
+    targets := dht.routes[0].NearestPeers(convertPeerID(dht.self.ID), 10)
+
+    // TODO: Add timeout to this struct so nodes know when to return
+    pmes := pDHTMessage{
+        Type: DHTMessage_DIAGNOSTIC,
+        Id: GenerateMessageID(),
+    }
+
+    listen_chan := dht.ListenFor(pmes.Id, len(targets))
+
+    pbmes := pmes.ToProtobuf()
+    for _,p := range targets {
+        mes := swarm.NewMessage(p, pbmes)
+        dht.network.Chan.Outgoing <-mes
+    }
+
+    var out []*diagInfo
+    after := time.After(timeout)
+    for count := len(targets); count > 0; {
+        select {
+        case <-after:
+            u.DOut("Diagnostic request timed out.")
+            return out, u.ErrTimeout
+        case resp := <-listen_chan:
+            pmes_out := new(DHTMessage)
+            err := proto.Unmarshal(resp.Data, pmes_out)
+            if err != nil {
+                // NOTE: here and elsewhere, need to audit error handling,
+                // some errors should be continued on from
+                return out, err
+            }
+
+            dec := json.NewDecoder(bytes.NewBuffer(pmes_out.GetValue()))
+            for {
+                di := new(diagInfo)
+                err := dec.Decode(di)
+                if err != nil {
+                    break
+                }
+
+                out = append(out, di)
+            }
+        }
+    }
+
+    return nil,nil
+}
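GetDiagnostic above registers one listener for len(targets) replies, sends the same protobuf message to every target, and then collects replies in a for/select loop raced against a time.After timer. Below is a self-contained sketch of that fan-out and collect-with-timeout shape; the reply type, the channel, and the goroutines standing in for remote peers are illustrative only, not the repo's swarm types.

package main

import (
    "fmt"
    "time"
)

type reply struct{ from string }

// collect waits for up to expected replies on ch, giving up when the timeout
// fires, and returns whatever arrived in time. It mirrors the for/select loop
// in GetDiagnostic (and in handleDiagnostic in dht.go).
func collect(ch <-chan reply, expected int, timeout time.Duration) []reply {
    var out []reply
    after := time.After(timeout)
    for count := expected; count > 0; {
        select {
        case <-after:
            fmt.Println("diagnostic request timed out")
            return out
        case r := <-ch:
            out = append(out, r)
            count--
        }
    }
    return out
}

func main() {
    targets := []string{"peerA", "peerB", "peerC"}
    ch := make(chan reply, len(targets))

    // Each goroutine stands in for a remote peer answering after a delay.
    for _, t := range targets {
        go func(name string) {
            time.Sleep(10 * time.Millisecond)
            ch <- reply{from: name}
        }(t)
    }

    for _, r := range collect(ch, len(targets), time.Second) {
        fmt.Println("got reply from", r.from)
    }
}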
table.go (9 changed lines)
@@ -95,11 +95,7 @@ func (p peerSorterArr) Less(a, b int) bool {
 //

 func copyPeersFromList(target ID, peerArr peerSorterArr, peerList *list.List) peerSorterArr {
-    if peerList == nil {
-        return peerSorterArr{}
-    }
-    e := peerList.Front()
-    for ; e != nil; {
+    for e := peerList.Front(); e != nil; e = e.Next() {
         p := e.Value.(*peer.Peer)
         p_id := convertPeerID(p.ID)
         pd := peerDistance{
@@ -107,11 +103,10 @@ func copyPeersFromList(target ID, peerArr peerSorterArr, peerList *list.List) pe
             distance: xor(target, p_id),
         }
         peerArr = append(peerArr, &pd)
-        if e != nil {
+        if e == nil {
             u.POut("list element was nil.")
             return peerArr
         }
-        e = e.Next()
     }
     return peerArr
 }
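The table.go hunks appear to be the "small bug" named in the commit message: the old copyPeersFromList checked if e != nil right after appending, and since e is never nil inside that loop it always logged "list element was nil." and returned after the very first element; the rewrite switches to the standard container/list traversal and inverts the check. A tiny sketch of that idiomatic traversal, using ints instead of *peer.Peer values:

package main

import (
    "container/list"
    "fmt"
)

// copyInts walks a container/list the way the fixed copyPeersFromList does:
// Front(), test against nil, advance with Next() in the post statement.
func copyInts(l *list.List) []int {
    var out []int
    for e := l.Front(); e != nil; e = e.Next() {
        out = append(out, e.Value.(int))
    }
    return out
}

func main() {
    l := list.New()
    for i := 1; i <= 4; i++ {
        l.PushBack(i)
    }
    fmt.Println(copyInts(l)) // [1 2 3 4], not just the first element
}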