mirror of
https://github.com/fluencelabs/go-libp2p-kad-dht
synced 2025-04-25 06:42:13 +00:00
getValueSingle using SendRequest
This commit is contained in:
parent
d13b445da9
commit
6c1cdb39b5
71
dht.go
71
dht.go
@ -159,8 +159,37 @@ func (dht *IpfsDHT) HandleMessage(ctx context.Context, mes msg.NetMessage) (msg.
|
|||||||
return rmes, nil
|
return rmes, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dht *IpfsDHT) getValueOrPeers(p *peer.Peer, key u.Key, timeout time.Duration, level int) ([]byte, []*peer.Peer, error) {
|
// sendRequest sends out a request using dht.sender, but also makes sure to
|
||||||
pmes, err := dht.getValueSingle(p, key, timeout, level)
|
// measure the RTT for latency measurements.
|
||||||
|
func (dht *IpfsDHT) sendRequest(ctx context.Context, p *peer.Peer, pmes *Message) (*Message, error) {
|
||||||
|
|
||||||
|
mes, err := msg.FromObject(p, pmes)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
|
|
||||||
|
rmes, err := dht.sender.SendRequest(ctx, mes)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
rtt := time.Since(start)
|
||||||
|
rmes.Peer().SetLatency(rtt)
|
||||||
|
|
||||||
|
rpmes := new(Message)
|
||||||
|
if err := proto.Unmarshal(rmes.Data(), rpmes); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return rpmes, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p *peer.Peer,
|
||||||
|
key u.Key, level int) ([]byte, []*peer.Peer, error) {
|
||||||
|
|
||||||
|
pmes, err := dht.getValueSingle(ctx, p, key, level)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
@ -202,39 +231,15 @@ func (dht *IpfsDHT) getValueOrPeers(p *peer.Peer, key u.Key, timeout time.Durati
|
|||||||
}
|
}
|
||||||
|
|
||||||
// getValueSingle simply performs the get value RPC with the given parameters
|
// getValueSingle simply performs the get value RPC with the given parameters
|
||||||
func (dht *IpfsDHT) getValueSingle(p *peer.Peer, key u.Key, timeout time.Duration, level int) (*Message, error) {
|
func (dht *IpfsDHT) getValueSingle(ctx context.Context, p *peer.Peer,
|
||||||
pmes := Message{
|
key u.Key, level int) (*Message, error) {
|
||||||
Type: Message_GET_VALUE,
|
|
||||||
Key: string(key),
|
|
||||||
Value: []byte{byte(level)},
|
|
||||||
ID: swarm.GenerateMessageID(),
|
|
||||||
}
|
|
||||||
responseChan := dht.listener.Listen(pmes.ID, 1, time.Minute)
|
|
||||||
|
|
||||||
mes := swarm.NewMessage(p, pmes.ToProtobuf())
|
typ := Message_GET_VALUE
|
||||||
t := time.Now()
|
skey := string(key)
|
||||||
dht.netChan.Outgoing <- mes
|
pmes := &Message{Type: &typ, Key: &skey}
|
||||||
|
pmes.SetClusterLevel(int32(level))
|
||||||
|
|
||||||
// Wait for either the response or a timeout
|
return dht.sendRequest(ctx, p, pmes)
|
||||||
timeup := time.After(timeout)
|
|
||||||
select {
|
|
||||||
case <-timeup:
|
|
||||||
dht.listener.Unlisten(pmes.ID)
|
|
||||||
return nil, u.ErrTimeout
|
|
||||||
case resp, ok := <-responseChan:
|
|
||||||
if !ok {
|
|
||||||
u.PErr("response channel closed before timeout, please investigate.\n")
|
|
||||||
return nil, u.ErrTimeout
|
|
||||||
}
|
|
||||||
roundtrip := time.Since(t)
|
|
||||||
resp.Peer.SetLatency(roundtrip)
|
|
||||||
pmesOut := new(Message)
|
|
||||||
err := proto.Unmarshal(resp.Data, pmesOut)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return pmesOut, nil
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Im not certain on this implementation, we get a list of peers/providers
|
// TODO: Im not certain on this implementation, we get a list of peers/providers
|
||||||
|
Loading…
x
Reference in New Issue
Block a user