feat: add fetch protocol (#1036)

Adds three methods to implement the `/libp2p/fetch/0.0.1` protocol:

* `libp2p.fetch(peerId, key) => Promise<Uint8Array>`
* `libp2p.fetchService.registerLookupFunction(prefix, lookupFunction)`
* `libp2p.fetchService.unRegisterLookupFunction(prefix, [lookupFunction])`

Co-authored-by: achingbrain <alex@achingbrain.net>
This commit is contained in:
Spencer T Brody
2022-01-24 12:07:11 -05:00
committed by GitHub
parent 00e49592a3
commit d8ceb0bc66
11 changed files with 932 additions and 2 deletions

View File

@ -12,6 +12,9 @@
* [`handle`](#handle) * [`handle`](#handle)
* [`unhandle`](#unhandle) * [`unhandle`](#unhandle)
* [`ping`](#ping) * [`ping`](#ping)
* [`fetch`](#fetch)
* [`fetchService.registerLookupFunction`](#fetchserviceregisterlookupfunction)
* [`fetchService.unRegisterLookupFunction`](#fetchserviceunregisterlookupfunction)
* [`multiaddrs`](#multiaddrs) * [`multiaddrs`](#multiaddrs)
* [`addressManager.getListenAddrs`](#addressmanagergetlistenaddrs) * [`addressManager.getListenAddrs`](#addressmanagergetlistenaddrs)
* [`addressManager.getAnnounceAddrs`](#addressmanagergetannounceaddrs) * [`addressManager.getAnnounceAddrs`](#addressmanagergetannounceaddrs)
@ -455,6 +458,72 @@ Pings a given peer and get the operation's latency.
const latency = await libp2p.ping(otherPeerId) const latency = await libp2p.ping(otherPeerId)
``` ```
## fetch
Fetch a value from a remote node
`libp2p.fetch(peer, key)`
#### Parameters
| Name | Type | Description |
|------|------|-------------|
| peer | [`PeerId`][peer-id]\|[`Multiaddr`][multiaddr]\|`string` | peer to ping |
| key | `string` | A key that corresponds to a value on the remote node |
#### Returns
| Type | Description |
|------|-------------|
| `Promise<Uint8Array | null>` | The value for the key or null if it cannot be found |
#### Example
```js
// ...
const value = await libp2p.fetch(otherPeerId, '/some/key')
```
## fetchService.registerLookupFunction
Register a function to look up values requested by remote nodes
`libp2p.fetchService.registerLookupFunction(prefix, lookup)`
#### Parameters
| Name | Type | Description |
|------|------|-------------|
| prefix | `string` | All queries below this prefix will be passed to the lookup function |
| lookup | `(key: string) => Promise<Uint8Array | null>` | A function that takes a key and returns a Uint8Array or null |
#### Example
```js
// ...
const value = await libp2p.fetchService.registerLookupFunction('/prefix', (key) => { ... })
```
## fetchService.unregisterLookupFunction
Removes the passed lookup function or any function registered for the passed prefix
`libp2p.fetchService.unregisterLookupFunction(prefix, lookup)`
#### Parameters
| Name | Type | Description |
|------|------|-------------|
| prefix | `string` | All queries below this prefix will be passed to the lookup function |
| lookup | `(key: string) => Promise<Uint8Array | null>` | Optional: A function that takes a key and returns a Uint8Array or null |
#### Example
```js
// ...
libp2p.fetchService.unregisterLookupFunction('/prefix')
```
## multiaddrs ## multiaddrs
Gets the multiaddrs the libp2p node announces to the network. This computes the advertising multiaddrs Gets the multiaddrs the libp2p node announces to the network. This computes the advertising multiaddrs

View File

@ -20,15 +20,17 @@
"scripts": { "scripts": {
"lint": "aegir lint", "lint": "aegir lint",
"build": "aegir build", "build": "aegir build",
"build:proto": "npm run build:proto:circuit && npm run build:proto:identify && npm run build:proto:plaintext && npm run build:proto:address-book && npm run build:proto:proto-book && npm run build:proto:peer && npm run build:proto:peer-record && npm run build:proto:envelope", "build:proto": "npm run build:proto:circuit && npm run build:proto:fetch && npm run build:proto:identify && npm run build:proto:plaintext && npm run build:proto:address-book && npm run build:proto:proto-book && npm run build:proto:peer && npm run build:proto:peer-record && npm run build:proto:envelope",
"build:proto:circuit": "pbjs -t static-module -w commonjs -r libp2p-circuit --force-number --no-verify --no-delimited --no-create --no-beautify --no-defaults --lint eslint-disable -o src/circuit/protocol/index.js ./src/circuit/protocol/index.proto", "build:proto:circuit": "pbjs -t static-module -w commonjs -r libp2p-circuit --force-number --no-verify --no-delimited --no-create --no-beautify --no-defaults --lint eslint-disable -o src/circuit/protocol/index.js ./src/circuit/protocol/index.proto",
"build:proto:fetch": "pbjs -t static-module -w commonjs -r libp2p-fetch --force-number --no-verify --no-delimited --no-create --no-beautify --no-defaults --lint eslint-disable -o src/fetch/proto.js ./src/fetch/proto.proto",
"build:proto:identify": "pbjs -t static-module -w commonjs -r libp2p-identify --force-number --no-verify --no-delimited --no-create --no-beautify --no-defaults --lint eslint-disable -o src/identify/message.js ./src/identify/message.proto", "build:proto:identify": "pbjs -t static-module -w commonjs -r libp2p-identify --force-number --no-verify --no-delimited --no-create --no-beautify --no-defaults --lint eslint-disable -o src/identify/message.js ./src/identify/message.proto",
"build:proto:plaintext": "pbjs -t static-module -w commonjs -r libp2p-plaintext --force-number --no-verify --no-delimited --no-create --no-beautify --no-defaults --lint eslint-disable -o src/insecure/proto.js ./src/insecure/proto.proto", "build:proto:plaintext": "pbjs -t static-module -w commonjs -r libp2p-plaintext --force-number --no-verify --no-delimited --no-create --no-beautify --no-defaults --lint eslint-disable -o src/insecure/proto.js ./src/insecure/proto.proto",
"build:proto:peer": "pbjs -t static-module -w commonjs -r libp2p-peer --force-number --no-verify --no-delimited --no-create --no-beautify --no-defaults --lint eslint-disable -o src/peer-store/pb/peer.js ./src/peer-store/pb/peer.proto", "build:proto:peer": "pbjs -t static-module -w commonjs -r libp2p-peer --force-number --no-verify --no-delimited --no-create --no-beautify --no-defaults --lint eslint-disable -o src/peer-store/pb/peer.js ./src/peer-store/pb/peer.proto",
"build:proto:peer-record": "pbjs -t static-module -w commonjs -r libp2p-peer-record --force-number --no-verify --no-delimited --no-create --no-beautify --no-defaults --lint eslint-disable -o src/record/peer-record/peer-record.js ./src/record/peer-record/peer-record.proto", "build:proto:peer-record": "pbjs -t static-module -w commonjs -r libp2p-peer-record --force-number --no-verify --no-delimited --no-create --no-beautify --no-defaults --lint eslint-disable -o src/record/peer-record/peer-record.js ./src/record/peer-record/peer-record.proto",
"build:proto:envelope": "pbjs -t static-module -w commonjs -r libp2p-envelope --force-number --no-verify --no-delimited --no-create --no-beautify --no-defaults --lint eslint-disable -o src/record/envelope/envelope.js ./src/record/envelope/envelope.proto", "build:proto:envelope": "pbjs -t static-module -w commonjs -r libp2p-envelope --force-number --no-verify --no-delimited --no-create --no-beautify --no-defaults --lint eslint-disable -o src/record/envelope/envelope.js ./src/record/envelope/envelope.proto",
"build:proto-types": "npm run build:proto-types:circuit && npm run build:proto-types:identify && npm run build:proto-types:plaintext && npm run build:proto-types:address-book && npm run build:proto-types:proto-book && npm run build:proto-types:peer && npm run build:proto-types:peer-record && npm run build:proto-types:envelope", "build:proto-types": "npm run build:proto-types:circuit && npm run build:proto-types:fetch && npm run build:proto-types:identify && npm run build:proto-types:plaintext && npm run build:proto-types:address-book && npm run build:proto-types:proto-book && npm run build:proto-types:peer && npm run build:proto-types:peer-record && npm run build:proto-types:envelope",
"build:proto-types:circuit": "pbts -o src/circuit/protocol/index.d.ts src/circuit/protocol/index.js", "build:proto-types:circuit": "pbts -o src/circuit/protocol/index.d.ts src/circuit/protocol/index.js",
"build:proto-types:fetch": "pbts -o src/fetch/proto.d.ts src/fetch/proto.js",
"build:proto-types:identify": "pbts -o src/identify/message.d.ts src/identify/message.js", "build:proto-types:identify": "pbts -o src/identify/message.d.ts src/identify/message.js",
"build:proto-types:plaintext": "pbts -o src/insecure/proto.d.ts src/insecure/proto.js", "build:proto-types:plaintext": "pbts -o src/insecure/proto.d.ts src/insecure/proto.js",
"build:proto-types:peer": "pbts -o src/peer-store/pb/peer.d.ts src/peer-store/pb/peer.js", "build:proto-types:peer": "pbts -o src/peer-store/pb/peer.d.ts src/peer-store/pb/peer.js",

36
src/fetch/README.md Normal file
View File

@ -0,0 +1,36 @@
libp2p-fetch JavaScript Implementation
=====================================
> Libp2p fetch protocol JavaScript implementation
## Overview
An implementation of the Fetch protocol as described here: https://github.com/libp2p/specs/tree/master/fetch
The fetch protocol is a simple protocol for requesting a value corresponding to a key from a peer.
## Usage
```javascript
const Libp2p = require('libp2p')
/**
* Given a key (as a string) returns a value (as a Uint8Array), or null if the key isn't found.
* All keys must be prefixed my the same prefix, which will be used to find the appropriate key
* lookup function.
* @param key - a string
* @returns value - a Uint8Array value that corresponds to the given key, or null if the key doesn't
* have a corresponding value.
*/
async function my_subsystem_key_lookup(key) {
// app specific callback to lookup key-value pairs.
}
// Enable this peer to respond to fetch requests for keys that begin with '/my_subsystem_key_prefix/'
const libp2p = Libp2p.create(...)
libp2p.fetchService.registerLookupFunction('/my_subsystem_key_prefix/', my_subsystem_key_lookup)
const key = '/my_subsystem_key_prefix/{...}'
const peerDst = PeerId.parse('Qmfoo...') // or Multiaddr instance
const value = await libp2p.fetch(peerDst, key)
```

6
src/fetch/constants.js Normal file
View File

@ -0,0 +1,6 @@
'use strict'
module.exports = {
// https://github.com/libp2p/specs/tree/master/fetch#wire-protocol
PROTOCOL: '/libp2p/fetch/0.0.1'
}

159
src/fetch/index.js Normal file
View File

@ -0,0 +1,159 @@
'use strict'
const debug = require('debug')
const log = Object.assign(debug('libp2p:fetch'), {
error: debug('libp2p:fetch:err')
})
const errCode = require('err-code')
const { codes } = require('../errors')
const lp = require('it-length-prefixed')
const { FetchRequest, FetchResponse } = require('./proto')
// @ts-ignore it-handshake does not export types
const handshake = require('it-handshake')
const { PROTOCOL } = require('./constants')
/**
* @typedef {import('../')} Libp2p
* @typedef {import('multiaddr').Multiaddr} Multiaddr
* @typedef {import('peer-id')} PeerId
* @typedef {import('libp2p-interfaces/src/stream-muxer/types').MuxedStream} MuxedStream
* @typedef {(key: string) => Promise<Uint8Array | null>} LookupFunction
*/
/**
* A simple libp2p protocol for requesting a value corresponding to a key from a peer.
* Developers can register one or more lookup function for retrieving the value corresponding to
* a given key. Each lookup function must act on a distinct part of the overall key space, defined
* by a fixed prefix that all keys that should be routed to that lookup function will start with.
*/
class FetchProtocol {
/**
* @param {Libp2p} libp2p
*/
constructor (libp2p) {
this._lookupFunctions = new Map() // Maps key prefix to value lookup function
this._libp2p = libp2p
this.handleMessage = this.handleMessage.bind(this)
}
/**
* Sends a request to fetch the value associated with the given key from the given peer.
*
* @param {PeerId|Multiaddr} peer
* @param {string} key
* @returns {Promise<Uint8Array | null>}
*/
async fetch (peer, key) {
// @ts-ignore multiaddr might not have toB58String
log('dialing %s to %s', this._protocol, peer.toB58String ? peer.toB58String() : peer)
const connection = await this._libp2p.dial(peer)
const { stream } = await connection.newStream(FetchProtocol.PROTOCOL)
const shake = handshake(stream)
// send message
const request = new FetchRequest({ identifier: key })
shake.write(lp.encode.single(FetchRequest.encode(request).finish()))
// read response
const response = FetchResponse.decode((await lp.decode.fromReader(shake.reader).next()).value.slice())
switch (response.status) {
case (FetchResponse.StatusCode.OK): {
return response.data
}
case (FetchResponse.StatusCode.NOT_FOUND): {
return null
}
case (FetchResponse.StatusCode.ERROR): {
const errmsg = (new TextDecoder()).decode(response.data)
throw errCode(new Error('Error in fetch protocol response: ' + errmsg), codes.ERR_INVALID_PARAMETERS)
}
default: {
throw errCode(new Error('Unknown response status'), codes.ERR_INVALID_MESSAGE)
}
}
}
/**
* Invoked when a fetch request is received. Reads the request message off the given stream and
* responds based on looking up the key in the request via the lookup callback that corresponds
* to the key's prefix.
*
* @param {object} options
* @param {MuxedStream} options.stream
* @param {string} options.protocol
*/
async handleMessage (options) {
const { stream } = options
const shake = handshake(stream)
const request = FetchRequest.decode((await lp.decode.fromReader(shake.reader).next()).value.slice())
let response
const lookup = this._getLookupFunction(request.identifier)
if (lookup) {
const data = await lookup(request.identifier)
if (data) {
response = new FetchResponse({ status: FetchResponse.StatusCode.OK, data })
} else {
response = new FetchResponse({ status: FetchResponse.StatusCode.NOT_FOUND })
}
} else {
const errmsg = (new TextEncoder()).encode('No lookup function registered for key: ' + request.identifier)
response = new FetchResponse({ status: FetchResponse.StatusCode.ERROR, data: errmsg })
}
shake.write(lp.encode.single(FetchResponse.encode(response).finish()))
}
/**
* Given a key, finds the appropriate function for looking up its corresponding value, based on
* the key's prefix.
*
* @param {string} key
*/
_getLookupFunction (key) {
for (const prefix of this._lookupFunctions.keys()) {
if (key.startsWith(prefix)) {
return this._lookupFunctions.get(prefix)
}
}
return null
}
/**
* Registers a new lookup callback that can map keys to values, for a given set of keys that
* share the same prefix.
*
* @param {string} prefix
* @param {LookupFunction} lookup
*/
registerLookupFunction (prefix, lookup) {
if (this._lookupFunctions.has(prefix)) {
throw errCode(new Error("Fetch protocol handler for key prefix '" + prefix + "' already registered"), codes.ERR_KEY_ALREADY_EXISTS)
}
this._lookupFunctions.set(prefix, lookup)
}
/**
* Registers a new lookup callback that can map keys to values, for a given set of keys that
* share the same prefix.
*
* @param {string} prefix
* @param {LookupFunction} [lookup]
*/
unregisterLookupFunction (prefix, lookup) {
if (lookup != null) {
const existingLookup = this._lookupFunctions.get(prefix)
if (existingLookup !== lookup) {
return
}
}
this._lookupFunctions.delete(prefix)
}
}
FetchProtocol.PROTOCOL = PROTOCOL
exports = module.exports = FetchProtocol

134
src/fetch/proto.d.ts vendored Normal file
View File

@ -0,0 +1,134 @@
import * as $protobuf from "protobufjs";
/** Properties of a FetchRequest. */
export interface IFetchRequest {
/** FetchRequest identifier */
identifier?: (string|null);
}
/** Represents a FetchRequest. */
export class FetchRequest implements IFetchRequest {
/**
* Constructs a new FetchRequest.
* @param [p] Properties to set
*/
constructor(p?: IFetchRequest);
/** FetchRequest identifier. */
public identifier: string;
/**
* Encodes the specified FetchRequest message. Does not implicitly {@link FetchRequest.verify|verify} messages.
* @param m FetchRequest message or plain object to encode
* @param [w] Writer to encode to
* @returns Writer
*/
public static encode(m: IFetchRequest, w?: $protobuf.Writer): $protobuf.Writer;
/**
* Decodes a FetchRequest message from the specified reader or buffer.
* @param r Reader or buffer to decode from
* @param [l] Message length if known beforehand
* @returns FetchRequest
* @throws {Error} If the payload is not a reader or valid buffer
* @throws {$protobuf.util.ProtocolError} If required fields are missing
*/
public static decode(r: ($protobuf.Reader|Uint8Array), l?: number): FetchRequest;
/**
* Creates a FetchRequest message from a plain object. Also converts values to their respective internal types.
* @param d Plain object
* @returns FetchRequest
*/
public static fromObject(d: { [k: string]: any }): FetchRequest;
/**
* Creates a plain object from a FetchRequest message. Also converts values to other types if specified.
* @param m FetchRequest
* @param [o] Conversion options
* @returns Plain object
*/
public static toObject(m: FetchRequest, o?: $protobuf.IConversionOptions): { [k: string]: any };
/**
* Converts this FetchRequest to JSON.
* @returns JSON object
*/
public toJSON(): { [k: string]: any };
}
/** Properties of a FetchResponse. */
export interface IFetchResponse {
/** FetchResponse status */
status?: (FetchResponse.StatusCode|null);
/** FetchResponse data */
data?: (Uint8Array|null);
}
/** Represents a FetchResponse. */
export class FetchResponse implements IFetchResponse {
/**
* Constructs a new FetchResponse.
* @param [p] Properties to set
*/
constructor(p?: IFetchResponse);
/** FetchResponse status. */
public status: FetchResponse.StatusCode;
/** FetchResponse data. */
public data: Uint8Array;
/**
* Encodes the specified FetchResponse message. Does not implicitly {@link FetchResponse.verify|verify} messages.
* @param m FetchResponse message or plain object to encode
* @param [w] Writer to encode to
* @returns Writer
*/
public static encode(m: IFetchResponse, w?: $protobuf.Writer): $protobuf.Writer;
/**
* Decodes a FetchResponse message from the specified reader or buffer.
* @param r Reader or buffer to decode from
* @param [l] Message length if known beforehand
* @returns FetchResponse
* @throws {Error} If the payload is not a reader or valid buffer
* @throws {$protobuf.util.ProtocolError} If required fields are missing
*/
public static decode(r: ($protobuf.Reader|Uint8Array), l?: number): FetchResponse;
/**
* Creates a FetchResponse message from a plain object. Also converts values to their respective internal types.
* @param d Plain object
* @returns FetchResponse
*/
public static fromObject(d: { [k: string]: any }): FetchResponse;
/**
* Creates a plain object from a FetchResponse message. Also converts values to other types if specified.
* @param m FetchResponse
* @param [o] Conversion options
* @returns Plain object
*/
public static toObject(m: FetchResponse, o?: $protobuf.IConversionOptions): { [k: string]: any };
/**
* Converts this FetchResponse to JSON.
* @returns JSON object
*/
public toJSON(): { [k: string]: any };
}
export namespace FetchResponse {
/** StatusCode enum. */
enum StatusCode {
OK = 0,
NOT_FOUND = 1,
ERROR = 2
}
}

333
src/fetch/proto.js Normal file
View File

@ -0,0 +1,333 @@
/*eslint-disable*/
"use strict";
var $protobuf = require("protobufjs/minimal");
// Common aliases
var $Reader = $protobuf.Reader, $Writer = $protobuf.Writer, $util = $protobuf.util;
// Exported root namespace
var $root = $protobuf.roots["libp2p-fetch"] || ($protobuf.roots["libp2p-fetch"] = {});
$root.FetchRequest = (function() {
/**
* Properties of a FetchRequest.
* @exports IFetchRequest
* @interface IFetchRequest
* @property {string|null} [identifier] FetchRequest identifier
*/
/**
* Constructs a new FetchRequest.
* @exports FetchRequest
* @classdesc Represents a FetchRequest.
* @implements IFetchRequest
* @constructor
* @param {IFetchRequest=} [p] Properties to set
*/
function FetchRequest(p) {
if (p)
for (var ks = Object.keys(p), i = 0; i < ks.length; ++i)
if (p[ks[i]] != null)
this[ks[i]] = p[ks[i]];
}
/**
* FetchRequest identifier.
* @member {string} identifier
* @memberof FetchRequest
* @instance
*/
FetchRequest.prototype.identifier = "";
/**
* Encodes the specified FetchRequest message. Does not implicitly {@link FetchRequest.verify|verify} messages.
* @function encode
* @memberof FetchRequest
* @static
* @param {IFetchRequest} m FetchRequest message or plain object to encode
* @param {$protobuf.Writer} [w] Writer to encode to
* @returns {$protobuf.Writer} Writer
*/
FetchRequest.encode = function encode(m, w) {
if (!w)
w = $Writer.create();
if (m.identifier != null && Object.hasOwnProperty.call(m, "identifier"))
w.uint32(10).string(m.identifier);
return w;
};
/**
* Decodes a FetchRequest message from the specified reader or buffer.
* @function decode
* @memberof FetchRequest
* @static
* @param {$protobuf.Reader|Uint8Array} r Reader or buffer to decode from
* @param {number} [l] Message length if known beforehand
* @returns {FetchRequest} FetchRequest
* @throws {Error} If the payload is not a reader or valid buffer
* @throws {$protobuf.util.ProtocolError} If required fields are missing
*/
FetchRequest.decode = function decode(r, l) {
if (!(r instanceof $Reader))
r = $Reader.create(r);
var c = l === undefined ? r.len : r.pos + l, m = new $root.FetchRequest();
while (r.pos < c) {
var t = r.uint32();
switch (t >>> 3) {
case 1:
m.identifier = r.string();
break;
default:
r.skipType(t & 7);
break;
}
}
return m;
};
/**
* Creates a FetchRequest message from a plain object. Also converts values to their respective internal types.
* @function fromObject
* @memberof FetchRequest
* @static
* @param {Object.<string,*>} d Plain object
* @returns {FetchRequest} FetchRequest
*/
FetchRequest.fromObject = function fromObject(d) {
if (d instanceof $root.FetchRequest)
return d;
var m = new $root.FetchRequest();
if (d.identifier != null) {
m.identifier = String(d.identifier);
}
return m;
};
/**
* Creates a plain object from a FetchRequest message. Also converts values to other types if specified.
* @function toObject
* @memberof FetchRequest
* @static
* @param {FetchRequest} m FetchRequest
* @param {$protobuf.IConversionOptions} [o] Conversion options
* @returns {Object.<string,*>} Plain object
*/
FetchRequest.toObject = function toObject(m, o) {
if (!o)
o = {};
var d = {};
if (o.defaults) {
d.identifier = "";
}
if (m.identifier != null && m.hasOwnProperty("identifier")) {
d.identifier = m.identifier;
}
return d;
};
/**
* Converts this FetchRequest to JSON.
* @function toJSON
* @memberof FetchRequest
* @instance
* @returns {Object.<string,*>} JSON object
*/
FetchRequest.prototype.toJSON = function toJSON() {
return this.constructor.toObject(this, $protobuf.util.toJSONOptions);
};
return FetchRequest;
})();
$root.FetchResponse = (function() {
/**
* Properties of a FetchResponse.
* @exports IFetchResponse
* @interface IFetchResponse
* @property {FetchResponse.StatusCode|null} [status] FetchResponse status
* @property {Uint8Array|null} [data] FetchResponse data
*/
/**
* Constructs a new FetchResponse.
* @exports FetchResponse
* @classdesc Represents a FetchResponse.
* @implements IFetchResponse
* @constructor
* @param {IFetchResponse=} [p] Properties to set
*/
function FetchResponse(p) {
if (p)
for (var ks = Object.keys(p), i = 0; i < ks.length; ++i)
if (p[ks[i]] != null)
this[ks[i]] = p[ks[i]];
}
/**
* FetchResponse status.
* @member {FetchResponse.StatusCode} status
* @memberof FetchResponse
* @instance
*/
FetchResponse.prototype.status = 0;
/**
* FetchResponse data.
* @member {Uint8Array} data
* @memberof FetchResponse
* @instance
*/
FetchResponse.prototype.data = $util.newBuffer([]);
/**
* Encodes the specified FetchResponse message. Does not implicitly {@link FetchResponse.verify|verify} messages.
* @function encode
* @memberof FetchResponse
* @static
* @param {IFetchResponse} m FetchResponse message or plain object to encode
* @param {$protobuf.Writer} [w] Writer to encode to
* @returns {$protobuf.Writer} Writer
*/
FetchResponse.encode = function encode(m, w) {
if (!w)
w = $Writer.create();
if (m.status != null && Object.hasOwnProperty.call(m, "status"))
w.uint32(8).int32(m.status);
if (m.data != null && Object.hasOwnProperty.call(m, "data"))
w.uint32(18).bytes(m.data);
return w;
};
/**
* Decodes a FetchResponse message from the specified reader or buffer.
* @function decode
* @memberof FetchResponse
* @static
* @param {$protobuf.Reader|Uint8Array} r Reader or buffer to decode from
* @param {number} [l] Message length if known beforehand
* @returns {FetchResponse} FetchResponse
* @throws {Error} If the payload is not a reader or valid buffer
* @throws {$protobuf.util.ProtocolError} If required fields are missing
*/
FetchResponse.decode = function decode(r, l) {
if (!(r instanceof $Reader))
r = $Reader.create(r);
var c = l === undefined ? r.len : r.pos + l, m = new $root.FetchResponse();
while (r.pos < c) {
var t = r.uint32();
switch (t >>> 3) {
case 1:
m.status = r.int32();
break;
case 2:
m.data = r.bytes();
break;
default:
r.skipType(t & 7);
break;
}
}
return m;
};
/**
* Creates a FetchResponse message from a plain object. Also converts values to their respective internal types.
* @function fromObject
* @memberof FetchResponse
* @static
* @param {Object.<string,*>} d Plain object
* @returns {FetchResponse} FetchResponse
*/
FetchResponse.fromObject = function fromObject(d) {
if (d instanceof $root.FetchResponse)
return d;
var m = new $root.FetchResponse();
switch (d.status) {
case "OK":
case 0:
m.status = 0;
break;
case "NOT_FOUND":
case 1:
m.status = 1;
break;
case "ERROR":
case 2:
m.status = 2;
break;
}
if (d.data != null) {
if (typeof d.data === "string")
$util.base64.decode(d.data, m.data = $util.newBuffer($util.base64.length(d.data)), 0);
else if (d.data.length)
m.data = d.data;
}
return m;
};
/**
* Creates a plain object from a FetchResponse message. Also converts values to other types if specified.
* @function toObject
* @memberof FetchResponse
* @static
* @param {FetchResponse} m FetchResponse
* @param {$protobuf.IConversionOptions} [o] Conversion options
* @returns {Object.<string,*>} Plain object
*/
FetchResponse.toObject = function toObject(m, o) {
if (!o)
o = {};
var d = {};
if (o.defaults) {
d.status = o.enums === String ? "OK" : 0;
if (o.bytes === String)
d.data = "";
else {
d.data = [];
if (o.bytes !== Array)
d.data = $util.newBuffer(d.data);
}
}
if (m.status != null && m.hasOwnProperty("status")) {
d.status = o.enums === String ? $root.FetchResponse.StatusCode[m.status] : m.status;
}
if (m.data != null && m.hasOwnProperty("data")) {
d.data = o.bytes === String ? $util.base64.encode(m.data, 0, m.data.length) : o.bytes === Array ? Array.prototype.slice.call(m.data) : m.data;
}
return d;
};
/**
* Converts this FetchResponse to JSON.
* @function toJSON
* @memberof FetchResponse
* @instance
* @returns {Object.<string,*>} JSON object
*/
FetchResponse.prototype.toJSON = function toJSON() {
return this.constructor.toObject(this, $protobuf.util.toJSONOptions);
};
/**
* StatusCode enum.
* @name FetchResponse.StatusCode
* @enum {number}
* @property {number} OK=0 OK value
* @property {number} NOT_FOUND=1 NOT_FOUND value
* @property {number} ERROR=2 ERROR value
*/
FetchResponse.StatusCode = (function() {
var valuesById = {}, values = Object.create(valuesById);
values[valuesById[0] = "OK"] = 0;
values[valuesById[1] = "NOT_FOUND"] = 1;
values[valuesById[2] = "ERROR"] = 2;
return values;
})();
return FetchResponse;
})();
module.exports = $root;

15
src/fetch/proto.proto Normal file
View File

@ -0,0 +1,15 @@
syntax = "proto3";
message FetchRequest {
string identifier = 1;
}
message FetchResponse {
StatusCode status = 1;
enum StatusCode {
OK = 0;
NOT_FOUND = 1;
ERROR = 2;
}
bytes data = 2;
}

View File

@ -31,6 +31,7 @@ const PubsubAdapter = require('./pubsub-adapter')
const Registrar = require('./registrar') const Registrar = require('./registrar')
const ping = require('./ping') const ping = require('./ping')
const IdentifyService = require('./identify') const IdentifyService = require('./identify')
const FetchService = require('./fetch')
const NatManager = require('./nat-manager') const NatManager = require('./nat-manager')
const { updateSelfPeerRecord } = require('./record/utils') const { updateSelfPeerRecord } = require('./record/utils')
@ -323,6 +324,8 @@ class Libp2p extends EventEmitter {
ping.mount(this) ping.mount(this)
this._onDiscoveryPeer = this._onDiscoveryPeer.bind(this) this._onDiscoveryPeer = this._onDiscoveryPeer.bind(this)
this.fetchService = new FetchService(this)
} }
/** /**
@ -356,6 +359,10 @@ class Libp2p extends EventEmitter {
await this.handle(Object.values(IdentifyService.getProtocolStr(this)), this.identifyService.handleMessage) await this.handle(Object.values(IdentifyService.getProtocolStr(this)), this.identifyService.handleMessage)
} }
if (this.fetchService) {
await this.handle(FetchService.PROTOCOL, this.fetchService.handleMessage)
}
try { try {
await this._onStarting() await this._onStarting()
await this._onDidStart() await this._onDidStart()
@ -407,6 +414,8 @@ class Libp2p extends EventEmitter {
await this.natManager.stop() await this.natManager.stop()
await this.transportManager.close() await this.transportManager.close()
this.unhandle(FetchService.PROTOCOL)
ping.unmount(this) ping.unmount(this)
this.dialer.destroy() this.dialer.destroy()
} catch (/** @type {any} */ err) { } catch (/** @type {any} */ err) {
@ -559,6 +568,17 @@ class Libp2p extends EventEmitter {
) )
} }
/**
* Sends a request to fetch the value associated with the given key from the given peer.
*
* @param {PeerId|Multiaddr} peer
* @param {string} key
* @returns {Promise<Uint8Array | null>}
*/
fetch (peer, key) {
return this.fetchService.fetch(peer, key)
}
/** /**
* Pings the given peer in order to obtain the operation latency. * Pings the given peer in order to obtain the operation latency.
* *

155
test/fetch/fetch.node.js Normal file
View File

@ -0,0 +1,155 @@
'use strict'
/* eslint-env mocha */
const { expect } = require('aegir/utils/chai')
const Libp2p = require('../../src')
const TCP = require('libp2p-tcp')
const Mplex = require('libp2p-mplex')
const { NOISE } = require('@chainsafe/libp2p-noise')
const MDNS = require('libp2p-mdns')
const { createPeerId } = require('../utils/creators/peer')
const { codes } = require('../../src/errors')
const { Multiaddr } = require('multiaddr')
async function createLibp2pNode (peerId) {
return await Libp2p.create({
peerId,
addresses: {
listen: ['/ip4/0.0.0.0/tcp/0']
},
modules: {
transport: [TCP],
streamMuxer: [Mplex],
connEncryption: [NOISE],
peerDiscovery: [MDNS]
}
})
}
describe('Fetch', () => {
/** @type {Libp2p} */
let sender
/** @type {Libp2p} */
let receiver
const PREFIX_A = '/moduleA/'
const PREFIX_B = '/moduleB/'
const DATA_A = { foobar: 'hello world' }
const DATA_B = { foobar: 'goodnight moon' }
const generateLookupFunction = function (prefix, data) {
return async function (key) {
key = key.slice(prefix.length) // strip prefix from key
const val = data[key]
if (val) {
return (new TextEncoder()).encode(val)
}
return null
}
}
beforeEach(async () => {
const [peerIdA, peerIdB] = await createPeerId({ number: 2 })
sender = await createLibp2pNode(peerIdA)
receiver = await createLibp2pNode(peerIdB)
await sender.start()
await receiver.start()
await Promise.all([
...sender.multiaddrs.map(addr => receiver.dial(addr.encapsulate(new Multiaddr(`/p2p/${sender.peerId}`)))),
...receiver.multiaddrs.map(addr => sender.dial(addr.encapsulate(new Multiaddr(`/p2p/${receiver.peerId}`))))
])
})
afterEach(async () => {
receiver.fetchService.unregisterLookupFunction(PREFIX_A)
receiver.fetchService.unregisterLookupFunction(PREFIX_B)
await sender.stop()
await receiver.stop()
})
it('fetch key that exists in receivers datastore', async () => {
receiver.fetchService.registerLookupFunction(PREFIX_A, generateLookupFunction(PREFIX_A, DATA_A))
const rawData = await sender.fetch(receiver.peerId, '/moduleA/foobar')
const value = (new TextDecoder()).decode(rawData)
expect(value).to.equal('hello world')
})
it('Different lookups for different prefixes', async () => {
receiver.fetchService.registerLookupFunction(PREFIX_A, generateLookupFunction(PREFIX_A, DATA_A))
receiver.fetchService.registerLookupFunction(PREFIX_B, generateLookupFunction(PREFIX_B, DATA_B))
const rawDataA = await sender.fetch(receiver.peerId, '/moduleA/foobar')
const valueA = (new TextDecoder()).decode(rawDataA)
expect(valueA).to.equal('hello world')
// Different lookup functions can be registered on different prefixes, and have different
// values for the same key underneath the different prefix.
const rawDataB = await sender.fetch(receiver.peerId, '/moduleB/foobar')
const valueB = (new TextDecoder()).decode(rawDataB)
expect(valueB).to.equal('goodnight moon')
})
it('fetch key that does not exist in receivers datastore', async () => {
receiver.fetchService.registerLookupFunction(PREFIX_A, generateLookupFunction(PREFIX_A, DATA_A))
const result = await sender.fetch(receiver.peerId, '/moduleA/garbage')
expect(result).to.equal(null)
})
it('fetch key with unknown prefix throws error', async () => {
receiver.fetchService.registerLookupFunction(PREFIX_A, generateLookupFunction(PREFIX_A, DATA_A))
await expect(sender.fetch(receiver.peerId, '/moduleUNKNOWN/foobar'))
.to.eventually.be.rejected.with.property('code', codes.ERR_INVALID_PARAMETERS)
})
it('registering multiple handlers for same prefix errors', async () => {
receiver.fetchService.registerLookupFunction(PREFIX_A, generateLookupFunction(PREFIX_A, DATA_A))
expect(() => receiver.fetchService.registerLookupFunction(PREFIX_A, generateLookupFunction(PREFIX_A, DATA_B)))
.to.throw().with.property('code', codes.ERR_KEY_ALREADY_EXISTS)
})
it('can unregister handler', async () => {
const lookupFunction = generateLookupFunction(PREFIX_A, DATA_A)
receiver.fetchService.registerLookupFunction(PREFIX_A, lookupFunction)
const rawDataA = await sender.fetch(receiver.peerId, '/moduleA/foobar')
const valueA = (new TextDecoder()).decode(rawDataA)
expect(valueA).to.equal('hello world')
receiver.fetchService.unregisterLookupFunction(PREFIX_A, lookupFunction)
await expect(sender.fetch(receiver.peerId, '/moduleA/foobar'))
.to.eventually.be.rejectedWith(/No lookup function registered for key/)
})
it('can unregister all handlers', async () => {
const lookupFunction = generateLookupFunction(PREFIX_A, DATA_A)
receiver.fetchService.registerLookupFunction(PREFIX_A, lookupFunction)
const rawDataA = await sender.fetch(receiver.peerId, '/moduleA/foobar')
const valueA = (new TextDecoder()).decode(rawDataA)
expect(valueA).to.equal('hello world')
receiver.fetchService.unregisterLookupFunction(PREFIX_A)
await expect(sender.fetch(receiver.peerId, '/moduleA/foobar'))
.to.eventually.be.rejectedWith(/No lookup function registered for key/)
})
it('does not unregister wrong handlers', async () => {
const lookupFunction = generateLookupFunction(PREFIX_A, DATA_A)
receiver.fetchService.registerLookupFunction(PREFIX_A, lookupFunction)
const rawDataA = await sender.fetch(receiver.peerId, '/moduleA/foobar')
const valueA = (new TextDecoder()).decode(rawDataA)
expect(valueA).to.equal('hello world')
receiver.fetchService.unregisterLookupFunction(PREFIX_A, () => {})
const rawDataB = await sender.fetch(receiver.peerId, '/moduleA/foobar')
const valueB = (new TextDecoder()).decode(rawDataB)
expect(valueB).to.equal('hello world')
})
})

View File

@ -8,6 +8,7 @@
], ],
"exclude": [ "exclude": [
"src/circuit/protocol/index.js", // exclude generated file "src/circuit/protocol/index.js", // exclude generated file
"src/fetch/proto.js", // exclude generated file
"src/identify/message.js", // exclude generated file "src/identify/message.js", // exclude generated file
"src/insecure/proto.js", // exclude generated file "src/insecure/proto.js", // exclude generated file
"src/peer-store/pb/peer.js", // exclude generated file "src/peer-store/pb/peer.js", // exclude generated file