add put and get dht commands to cli

This commit is contained in:
Jeromy 2015-02-21 16:20:28 -08:00
parent 0394b95f47
commit d88233353f
3 changed files with 18 additions and 2 deletions

2
dht.go
View File

@ -322,7 +322,7 @@ func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, p peer.ID, count int) [
// == to self? thats bad // == to self? thats bad
for _, p := range closer { for _, p := range closer {
if p == dht.self { if p == dht.self {
log.Debug("Attempted to return self! this shouldnt happen...") log.Error("Attempted to return self! this shouldnt happen...")
return nil return nil
} }
} }

View File

@ -83,7 +83,7 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Mess
// Find closest peer on given cluster to desired key and reply with that info // Find closest peer on given cluster to desired key and reply with that info
closer := dht.betterPeersToQuery(pmes, p, CloserPeerCount) closer := dht.betterPeersToQuery(pmes, p, CloserPeerCount)
if closer != nil { if len(closer) > 0 {
closerinfos := peer.PeerInfos(dht.peerstore, closer) closerinfos := peer.PeerInfos(dht.peerstore, closer)
for _, pi := range closerinfos { for _, pi := range closerinfos {
log.Debugf("handleGetValue returning closer peer: '%s'", pi.ID) log.Debugf("handleGetValue returning closer peer: '%s'", pi.ID)

View File

@ -59,6 +59,11 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key u.Key, value []byte) error
wg.Add(1) wg.Add(1)
go func(p peer.ID) { go func(p peer.ID) {
defer wg.Done() defer wg.Done()
notif.PublishQueryEvent(ctx, &notif.QueryEvent{
Type: notif.Value,
ID: p,
})
err := dht.putValueToPeer(ctx, p, key, rec) err := dht.putValueToPeer(ctx, p, key, rec)
if err != nil { if err != nil {
log.Debugf("failed putting value to peer: %s", err) log.Debugf("failed putting value to peer: %s", err)
@ -92,6 +97,11 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) {
// setup the Query // setup the Query
query := dht.newQuery(key, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) { query := dht.newQuery(key, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
notif.PublishQueryEvent(ctx, &notif.QueryEvent{
Type: notif.SendingQuery,
ID: p,
})
val, peers, err := dht.getValueOrPeers(ctx, p, key) val, peers, err := dht.getValueOrPeers(ctx, p, key)
if err != nil { if err != nil {
return nil, err return nil, err
@ -102,6 +112,12 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) {
res.success = true res.success = true
} }
notif.PublishQueryEvent(ctx, &notif.QueryEvent{
Type: notif.PeerResponse,
ID: p,
Responses: pointerizePeerInfos(peers),
})
return res, nil return res, nil
}) })