Compare commits

..

1 Commits

Author SHA1 Message Date
1e9d909078 chore: complete pubsub tests 2021-01-20 14:08:36 +01:00
45 changed files with 525 additions and 397 deletions

View File

@ -27,7 +27,7 @@ jobs:
strategy:
matrix:
os: [windows-latest, ubuntu-latest, macos-latest]
node: [14, 15]
node: [12, 14]
fail-fast: true
steps:
- uses: actions/checkout@v2

View File

@ -1,20 +1,3 @@
# [0.9.0](https://github.com/libp2p/js-interfaces/compare/v0.8.4...v0.9.0) (2021-04-07)
## [0.8.4](https://github.com/libp2p/js-interfaces/compare/v0.8.3...v0.8.4) (2021-03-22)
### Bug Fixes
* specify connection direction ([#86](https://github.com/libp2p/js-interfaces/issues/86)) ([3b960d5](https://github.com/libp2p/js-interfaces/commit/3b960d516f70f7e198574a736cb09000ddd7a94c))
## [0.8.3](https://github.com/libp2p/js-interfaces/compare/v0.8.2...v0.8.3) (2021-01-26)
## [0.8.2](https://github.com/libp2p/js-interfaces/compare/v0.8.1...v0.8.2) (2021-01-20)

View File

@ -1,6 +1,6 @@
{
"name": "libp2p-interfaces",
"version": "0.9.0",
"version": "0.8.2",
"description": "Interfaces for JS Libp2p",
"leadMaintainer": "Jacob Heun <jacobheun@gmail.com>",
"main": "src/index.js",
@ -22,9 +22,9 @@
"extends": "ipfs"
},
"scripts": {
"prepare": "aegir build --no-bundle",
"lint": "aegir lint",
"build": "aegir build",
"prepare": "aegir build --no-bundle",
"test": "aegir test",
"test:node": "aegir test --target node",
"test:browser": "aegir test --target browser",
@ -47,43 +47,39 @@
},
"homepage": "https://github.com/libp2p/js-interfaces#readme",
"dependencies": {
"@types/bl": "4.1.0",
"@types/bl": "^2.1.0",
"abort-controller": "^3.0.0",
"abortable-iterator": "^3.0.0",
"chai": "^4.3.4",
"chai": "^4.2.0",
"chai-checkmark": "^1.0.1",
"debug": "^4.3.1",
"delay": "^5.0.0",
"debug": "^4.1.1",
"delay": "^4.3.0",
"detect-node": "^2.0.4",
"dirty-chai": "^2.0.1",
"err-code": "^3.0.1",
"it-goodbye": "^2.0.2",
"err-code": "^2.0.0",
"it-goodbye": "^2.0.1",
"it-length-prefixed": "^3.1.0",
"it-pair": "^1.0.0",
"it-pipe": "^1.1.0",
"it-pushable": "^1.4.2",
"libp2p-crypto": "^0.19.0",
"it-pushable": "^1.4.0",
"libp2p-crypto": "^0.18.0",
"libp2p-tcp": "^0.15.0",
"multiaddr": "^8.1.2",
"multibase": "^4.0.2",
"multihashes": "^4.0.2",
"multiaddr": "^8.0.0",
"multibase": "^3.0.0",
"multihashes": "^3.0.1",
"p-defer": "^3.0.0",
"p-limit": "^3.1.0",
"p-wait-for": "^3.2.0",
"peer-id": "^0.14.2",
"p-limit": "^2.3.0",
"p-wait-for": "^3.1.0",
"peer-id": "^0.14.0",
"protons": "^2.0.0",
"sinon": "^10.0.0",
"streaming-iterables": "^5.0.4",
"uint8arrays": "^2.1.3"
"sinon": "^9.0.2",
"streaming-iterables": "^5.0.2",
"uint8arrays": "^1.1.0"
},
"devDependencies": {
"@types/debug": "^4.1.5",
"aegir": "^32.1.0",
"cids": "^1.1.6",
"events": "^3.3.0",
"it-handshake": "^1.0.2",
"rimraf": "^3.0.2",
"util": "^0.12.3"
"aegir": "^29.2.0",
"it-handshake": "^1.0.1",
"rimraf": "^3.0.2"
},
"contributors": [
"Alan Shaw <alan.shaw@protocol.ai>",

View File

@ -19,7 +19,7 @@ const connectionSymbol = Symbol.for('@libp2p/interface-connection/connection')
* @property {number} [close]
*
* @typedef {Object} ConectionStat
* @property {'inbound' | 'outbound'} direction - connection establishment direction
* @property {string} direction - connection establishment direction ("inbound" or "outbound").
* @property {Timeline} timeline - connection relevant events timestamp.
* @property {string} [multiplexer] - connection multiplexing identifier.
* @property {string} [encryption] - connection encryption method identifier.
@ -230,15 +230,6 @@ class Connection {
module.exports = Connection
/**
* @param {multiaddr|undefined} localAddr
* @param {PeerId} localPeer
* @param {PeerId} remotePeer
* @param {(protocols: string | string[]) => Promise<{ stream: import("../stream-muxer/types").MuxedStream; protocol: string; }>} newStream
* @param {() => Promise<void>} close
* @param {() => import("../stream-muxer/types").MuxedStream[]} getStreams
* @param {{ direction: any; timeline: any; multiplexer?: string | undefined; encryption?: string | undefined; }} stat
*/
function validateArgs (localAddr, localPeer, remotePeer, newStream, close, getStreams, stat) {
if (localAddr && !multiaddr.isMultiaddr(localAddr)) {
throw errCode(new Error('localAddr must be an instance of multiaddr'), 'ERR_INVALID_PARAMETERS')

View File

@ -1,4 +1,3 @@
// @ts-nocheck interface tests
/* eslint-env mocha */
'use strict'

View File

@ -1,4 +1,3 @@
// @ts-nocheck interface tests
/* eslint-env mocha */
'use strict'

View File

@ -1,11 +0,0 @@
export = ContentRouting;
import PeerId from 'peer-id'
import Multiaddr from 'multiaddr'
import CID from 'cids'
declare class ContentRouting {
constructor (options: Object);
provide (cid: CID): Promise<void>;
findProviders (cid: CID, options: Object): AsyncIterable<{ id: PeerId, multiaddrs: Multiaddr[] }>;
}

View File

View File

@ -1,4 +1,3 @@
// @ts-nocheck interface tests
/* eslint-env mocha */
'use strict'

View File

@ -1,4 +1,3 @@
// @ts-nocheck interface tests
/* eslint-env mocha */
'use strict'

View File

@ -1,10 +0,0 @@
export = PeerDiscovery;
import events from 'events';
declare class PeerDiscovery extends events.EventEmitter {
constructor (options: Object);
start (): Promise<void>;
stop (): Promise<void>;
tag: string;
}

View File

@ -1,10 +0,0 @@
export = PeerRouting;
import PeerId from 'peer-id'
import Multiaddr from 'multiaddr'
declare class PeerRouting {
constructor (options?: Object);
findPeer (peerId: PeerId, options?: Object): Promise<{ id: PeerId, multiaddrs: Multiaddr[] }>;
getClosestPeers(key: Uint8Array, options?: Object): AsyncIterable<{ id: PeerId, multiaddrs: Multiaddr[] }>;
}

View File

@ -10,7 +10,9 @@ const { pipe } = require('it-pipe')
const MulticodecTopology = require('../topology/multicodec-topology')
const { codes } = require('./errors')
/**
* @type {typeof import('./message')}
*/
const message = require('./message')
const PeerStreams = require('./peer-streams')
const { SignaturePolicy } = require('./signature-policy')
@ -27,9 +29,9 @@ const {
* @typedef {import('bl')} BufferList
* @typedef {import('../stream-muxer/types').MuxedStream} MuxedStream
* @typedef {import('../connection/connection')} Connection
* @typedef {import('./message/types').RPC} RPC
* @typedef {import('./message/types').SubOpts} RPCSubOpts
* @typedef {import('./message/types').Message} RPCMessage
* @typedef {import('./message').RPC} RPC
* @typedef {import('./message').SubOpts} RPCSubOpts
* @typedef {import('./message').Message} RPCMessage
* @typedef {import('./signature-policy').SignaturePolicyType} SignaturePolicyType
*/
@ -42,16 +44,6 @@ const {
* @property {Uint8Array} data
* @property {Uint8Array} [signature]
* @property {Uint8Array} [key]
*
* @typedef {Object} PubsubProperties
* @property {string} debugName - log namespace
* @property {Array<string>|string} multicodecs - protocol identificers to connect
* @property {Libp2p} libp2p
*
* @typedef {Object} PubsubOptions
* @property {SignaturePolicyType} [globalSignaturePolicy = SignaturePolicy.StrictSign] - defines how signatures should be handled
* @property {boolean} [canRelayMessage = false] - if can relay messages not subscribed
* @property {boolean} [emitSelf = false] - if publish should emit to self, if subscribed
*/
/**
@ -60,7 +52,13 @@ const {
*/
class PubsubBaseProtocol extends EventEmitter {
/**
* @param {PubsubProperties & PubsubOptions} props
* @param {Object} props
* @param {string} props.debugName - log namespace
* @param {Array<string>|string} props.multicodecs - protocol identificers to connect
* @param {Libp2p} props.libp2p
* @param {SignaturePolicyType} [props.globalSignaturePolicy = SignaturePolicy.StrictSign] - defines how signatures should be handled
* @param {boolean} [props.canRelayMessage = false] - if can relay messages not subscribed
* @param {boolean} [props.emitSelf = false] - if publish should emit to self, if subscribed
* @abstract
*/
constructor ({
@ -85,9 +83,8 @@ class PubsubBaseProtocol extends EventEmitter {
super()
this.log = Object.assign(debug(debugName), {
err: debug(`${debugName}:error`)
})
this.log = debug(debugName)
this.log.err = debug(`${debugName}:error`)
/**
* @type {Array<string>}
@ -125,7 +122,7 @@ class PubsubBaseProtocol extends EventEmitter {
// validate signature policy
if (!SignaturePolicy[globalSignaturePolicy]) {
throw errcode(new Error('Invalid global signature policy'), codes.ERR_INVALID_SIGNATURE_POLICY)
throw errcode(new Error('Invalid global signature policy'), codes.ERR_INVALID_SIGUATURE_POLICY)
}
/**
@ -382,9 +379,7 @@ class PubsubBaseProtocol extends EventEmitter {
if (subs.length) {
// update peer subscriptions
subs.forEach((/** @type {RPCSubOpts} */ subOpt) => {
this._processRpcSubOpt(idB58Str, subOpt)
})
subs.forEach((subOpt) => this._processRpcSubOpt(idB58Str, subOpt))
this.emit('pubsub:subscription-change', peerStreams.id, subs)
}
@ -394,9 +389,8 @@ class PubsubBaseProtocol extends EventEmitter {
}
if (msgs.length) {
// @ts-ignore RPC message is modified
msgs.forEach((message) => {
if (!(this.canRelayMessage || message.topicIDs.some((/** @type {string} */ topic) => this.subscriptions.has(topic)))) {
msgs.forEach(message => {
if (!(this.canRelayMessage || message.topicIDs.some((topic) => this.subscriptions.has(topic)))) {
this.log('received message we didn\'t subscribe to. Dropping.')
return
}
@ -595,7 +589,7 @@ class PubsubBaseProtocol extends EventEmitter {
for (const topic of message.topicIDs) {
const validatorFn = this.topicValidators.get(topic)
if (!validatorFn) {
continue // eslint-disable-line
continue
}
await validatorFn(topic, message)
}

View File

@ -1,16 +1,17 @@
'use strict'
// @ts-ignore protons not typed
const protons = require('protons')
const rpcProto = protons(require('./rpc.proto.js'))
const RPC = rpcProto.RPC
const topicDescriptorProto = protons(require('./topic-descriptor.proto.js'))
module.exports = {
rpc: rpcProto,
td: topicDescriptorProto,
RPC,
Message: RPC.Message,
SubOpts: RPC.SubOpts
}
/**
* @module pubsub/message/index
*/
exports = module.exports
exports.rpc = rpcProto
exports.td = topicDescriptorProto
exports.RPC = RPC
exports.Message = RPC.Message
exports.SubOpts = RPC.SubOpts

View File

@ -1,5 +0,0 @@
import { RPC, Message, SubOpts } from './types'
export type RPC = RPC
export type Message = Message
export type SubOpts = SubOpts

View File

@ -1,20 +1,21 @@
'use strict'
const debug = require('debug')
const log = Object.assign(debug('libp2p-pubsub:peer-streams'), {
error: debug('libp2p-pubsub:peer-streams:err')
})
/** @typedef {import('../types').EventEmitterFactory} Events */
/** @type Events */
const EventEmitter = require('events')
const lp = require('it-length-prefixed')
/** @type {typeof import('it-pushable').default} */
// @ts-ignore
const pushable = require('it-pushable')
const { pipe } = require('it-pipe')
const { source: abortable } = require('abortable-iterator')
const AbortController = require('abort-controller').default
const debug = require('debug')
const log = debug('libp2p-pubsub:peer-streams')
log.error = debug('libp2p-pubsub:peer-streams:error')
/**
* @typedef {import('../stream-muxer/types').MuxedStream} MuxedStream
@ -169,7 +170,7 @@ class PeerStreams extends EventEmitter {
this.outboundStream,
lp.encode(),
this._rawOutboundStream
).catch(/** @param {Error} err */ err => {
).catch(err => {
log.error(err)
})

View File

@ -1,9 +1,7 @@
// @ts-nocheck interface tests
/* eslint-env mocha */
'use strict'
const chai = require('chai')
const { expect } = chai
const { expect } = require('aegir/utils/chai')
const sinon = require('sinon')
const pDefer = require('p-defer')

View File

@ -0,0 +1,305 @@
/* eslint-env mocha */
'use strict'
const { expect } = require('aegir/utils/chai')
const sinon = require('sinon')
const pDefer = require('p-defer')
const pWaitFor = require('p-wait-for')
const uint8ArrayToString = require('uint8arrays/to-string')
const { expectSet } = require('./utils')
module.exports = (common) => {
describe('pubsub connection handlers', () => {
let psA, psB
describe('nodes send state on connection', () => {
// Create pubsub nodes and connect them
before(async () => {
[psA, psB] = await common.setup(2)
expect(psA.peers.size).to.be.eql(0)
expect(psB.peers.size).to.be.eql(0)
// Start pubsub
psA.start()
psB.start()
})
// Make subscriptions prior to nodes connected
before(() => {
psA.subscribe('Za')
psB.subscribe('Zb')
expect(psA.peers.size).to.equal(0)
expectSet(psA.subscriptions, ['Za'])
expect(psB.peers.size).to.equal(0)
expectSet(psB.subscriptions, ['Zb'])
})
after(async () => {
sinon.restore()
await common.teardown()
})
it('existing subscriptions are sent upon peer connection', async function () {
this.timeout(10e3)
await Promise.all([
psA._libp2p.dial(psB.peerId),
new Promise((resolve) => psA.once('pubsub:subscription-change', resolve)),
new Promise((resolve) => psB.once('pubsub:subscription-change', resolve))
])
expect(psA.peers.size).to.equal(1)
expect(psB.peers.size).to.equal(1)
expectSet(psA.subscriptions, ['Za'])
expectSet(psB.topics.get('Za'), [psA.peerId.toB58String()])
expectSet(psB.subscriptions, ['Zb'])
expectSet(psA.topics.get('Zb'), [psB.peerId.toB58String()])
})
})
describe('pubsub started before connect', () => {
// Create pubsub nodes and start them
beforeEach(async () => {
[psA, psB] = await common.setup(2)
psA.start()
psB.start()
})
afterEach(async () => {
sinon.restore()
await common.teardown()
})
it('should get notified of connected peers on dial', async () => {
const connection = await psA._libp2p.dial(psB.peerId)
expect(connection).to.exist()
return Promise.all([
pWaitFor(() => psA.peers.size === 1),
pWaitFor(() => psB.peers.size === 1)
])
})
it('should receive pubsub messages', async () => {
const defer = pDefer()
const topic = 'test-topic'
const data = 'hey!'
await psA._libp2p.dial(psB.peerId)
let subscribedTopics = psA.getTopics()
expect(subscribedTopics).to.not.include(topic)
psA.on(topic, (msg) => {
expect(uint8ArrayToString(msg.data)).to.equal(data)
defer.resolve()
})
psA.subscribe(topic)
subscribedTopics = psA.getTopics()
expect(subscribedTopics).to.include(topic)
// wait for psB to know about psA subscription
await pWaitFor(() => {
const subscribedPeers = psB.getSubscribers(topic)
return subscribedPeers.includes(psA.peerId.toB58String())
})
psB.publish(topic, data)
await defer.promise
})
})
describe('pubsub started after connect', () => {
// Create pubsub nodes
beforeEach(async () => {
[psA, psB] = await common.setup(2)
})
afterEach(async () => {
sinon.restore()
psA && psA.stop()
psB && psB.stop()
await common.teardown()
})
it('should get notified of connected peers after starting', async () => {
const connection = await psA._libp2p.dial(psB.peerId)
expect(connection).to.exist()
expect(psA.peers.size).to.be.eql(0)
expect(psB.peers.size).to.be.eql(0)
psA.start()
psB.start()
return Promise.all([
pWaitFor(() => psA.peers.size === 1),
pWaitFor(() => psB.peers.size === 1)
])
})
it('should receive pubsub messages', async () => {
const defer = pDefer()
const topic = 'test-topic'
const data = 'hey!'
await psA._libp2p.dial(psB.peerId)
psA.start()
psB.start()
await Promise.all([
pWaitFor(() => psA.peers.size === 1),
pWaitFor(() => psB.peers.size === 1)
])
let subscribedTopics = psA.getTopics()
expect(subscribedTopics).to.not.include(topic)
psA.on(topic, (msg) => {
expect(uint8ArrayToString(msg.data)).to.equal(data)
defer.resolve()
})
psA.subscribe(topic)
subscribedTopics = psA.getTopics()
expect(subscribedTopics).to.include(topic)
// wait for psB to know about psA subscription
await pWaitFor(() => {
const subscribedPeers = psB.getSubscribers(topic)
return subscribedPeers.includes(psA.peerId.toB58String())
})
psB.publish(topic, data)
await defer.promise
})
})
describe('pubsub with intermittent connections', () => {
// Create pubsub nodes and start them
beforeEach(async () => {
[psA, psB] = await common.setup(2)
psA.start()
psB.start()
})
afterEach(async () => {
sinon.restore()
psA && psA.stop()
psB && psB.stop()
await common.teardown()
})
it('should receive pubsub messages after a node restart', async function () {
this.timeout(10e3)
const topic = 'test-topic'
const data = 'hey!'
const psAid = psA.peerId.toB58String()
let counter = 0
const defer1 = pDefer()
const defer2 = pDefer()
await psA._libp2p.dial(psB.peerId)
let subscribedTopics = psA.getTopics()
expect(subscribedTopics).to.not.include(topic)
psA.on(topic, (msg) => {
expect(uint8ArrayToString(msg.data)).to.equal(data)
counter++
counter === 1 ? defer1.resolve() : defer2.resolve()
})
psA.subscribe(topic)
subscribedTopics = psA.getTopics()
expect(subscribedTopics).to.include(topic)
// wait for psB to know about psA subscription
await pWaitFor(() => {
const subscribedPeers = psB.getSubscribers(topic)
return subscribedPeers.includes(psAid)
})
psB.publish(topic, data)
await defer1.promise
psB.stop()
await psB._libp2p.stop()
await pWaitFor(() => !psA._libp2p.connectionManager.get(psB.peerId) && !psB._libp2p.connectionManager.get(psA.peerId))
await psB._libp2p.start()
psB.start()
psA._libp2p.peerStore.addressBook.set(psB.peerId, psB._libp2p.multiaddrs)
await psA._libp2p.dial(psB.peerId)
// wait for remoteLibp2p to know about libp2p subscription
await pWaitFor(() => {
const subscribedPeers = psB.getSubscribers(topic)
return subscribedPeers.includes(psAid)
})
psB.publish(topic, data)
await defer2.promise
})
it('should handle quick reconnects with a delayed disconnect', async () => {
// Subscribe on both
const handlerSpy = sinon.spy()
const topic = 'reconnect-channel'
psA.on(topic, handlerSpy)
psB.on(topic, handlerSpy)
await Promise.all([
psA.subscribe(topic),
psB.subscribe(topic)
])
// Create two connections to the remote peer
const originalConnection = await psA._libp2p.dialer.connectToPeer(psB.peerId)
// second connection
await psA._libp2p.dialer.connectToPeer(psB.peerId)
expect(psA._libp2p.connections.get(psB.peerId.toB58String())).to.have.length(2)
// Wait for subscriptions to occur
await pWaitFor(() => {
return psA.getSubscribers(topic).includes(psB.peerId.toB58String()) &&
psB.getSubscribers(topic).includes(psA.peerId.toB58String())
})
// Verify messages go both ways
psA.publish(topic, 'message1')
psB.publish(topic, 'message2')
await pWaitFor(() => handlerSpy.callCount >= 2)
expect(handlerSpy.args.map(([message]) => uint8ArrayToString(message.data))).to.include.members(['message1', 'message2'])
// Disconnect the first connection (this acts as a delayed reconnect)
const psAConnUpdateSpy = sinon.spy(psA._libp2p.connectionManager.connections, 'set')
await originalConnection.close()
await pWaitFor(() => psAConnUpdateSpy.callCount === 1)
// Verify messages go both ways after the disconnect
handlerSpy.resetHistory()
psA.publish(topic, 'message3')
psB.publish(topic, 'message4')
await pWaitFor(() => handlerSpy.callCount >= 2)
expect(handlerSpy.args.map(([message]) => uint8ArrayToString(message.data))).to.include.members(['message3', 'message4'])
})
})
})
}

View File

@ -1,9 +1,7 @@
// @ts-nocheck interface tests
/* eslint-env mocha */
'use strict'
const chai = require('chai')
const { expect } = chai
const { expect } = require('aegir/utils/chai')
const sinon = require('sinon')
const uint8ArrayFromString = require('uint8arrays/from-string')

View File

@ -1,10 +1,10 @@
// @ts-nocheck interface tests
/* eslint-env mocha */
'use strict'
const apiTest = require('./api')
const emitSelfTest = require('./emit-self')
const messagesTest = require('./messages')
const connectionHandlersTest = require('./connection-handlers')
const twoNodesTest = require('./two-nodes')
const multipleNodesTest = require('./multiple-nodes')
@ -13,6 +13,7 @@ module.exports = (common) => {
apiTest(common)
emitSelfTest(common)
messagesTest(common)
connectionHandlersTest(common)
twoNodesTest(common)
multipleNodesTest(common)
})

View File

@ -1,9 +1,7 @@
// @ts-nocheck interface tests
/* eslint-env mocha */
'use strict'
const chai = require('chai')
const { expect } = chai
const { expect } = require('aegir/utils/chai')
const sinon = require('sinon')
const PeerId = require('peer-id')

View File

@ -1,10 +1,8 @@
// @ts-nocheck interface tests
/* eslint-env mocha */
/* eslint max-nested-callbacks: ["error", 6] */
'use strict'
const chai = require('chai')
const { expect } = chai
const { expect } = require('aegir/utils/chai')
const sinon = require('sinon')
const delay = require('delay')

View File

@ -1,10 +1,8 @@
// @ts-nocheck interface tests
/* eslint-env mocha */
/* eslint max-nested-callbacks: ["error", 6] */
'use strict'
const chai = require('chai')
const { expect } = chai
const { expect } = require('aegir/utils/chai')
const sinon = require('sinon')
const pDefer = require('p-defer')
@ -25,205 +23,148 @@ function shouldNotHappen (_) {
module.exports = (common) => {
describe('pubsub with two nodes', () => {
describe('fresh nodes', () => {
let psA, psB
let psA, psB
// Create pubsub nodes and connect them
before(async () => {
[psA, psB] = await common.setup(2)
// Create pubsub nodes and connect them
before(async () => {
[psA, psB] = await common.setup(2)
expect(psA.peers.size).to.be.eql(0)
expect(psB.peers.size).to.be.eql(0)
expect(psA.peers.size).to.be.eql(0)
expect(psB.peers.size).to.be.eql(0)
// Start pubsub and connect nodes
psA.start()
psB.start()
// Start pubsub and connect nodes
psA.start()
psB.start()
await psA._libp2p.dial(psB.peerId)
await psA._libp2p.dial(psB.peerId)
// Wait for peers to be ready in pubsub
await pWaitFor(() => psA.peers.size === 1 && psB.peers.size === 1)
})
after(async () => {
sinon.restore()
psA && psA.stop()
psB && psB.stop()
await common.teardown()
})
it('Subscribe to a topic in nodeA', () => {
const defer = pDefer()
psB.once('pubsub:subscription-change', (changedPeerId, changedSubs) => {
expectSet(psA.subscriptions, [topic])
expect(psB.peers.size).to.equal(1)
expectSet(psB.topics.get(topic), [psA.peerId.toB58String()])
expect(changedPeerId.toB58String()).to.equal(first(psB.peers).id.toB58String())
expect(changedSubs).to.be.eql([{ topicID: topic, subscribe: true }])
defer.resolve()
})
psA.subscribe(topic)
return defer.promise
})
it('Publish to a topic in nodeA', () => {
const defer = pDefer()
psA.once(topic, (msg) => {
expect(uint8ArrayToString(msg.data)).to.equal('hey')
psB.removeListener(topic, shouldNotHappen)
defer.resolve()
})
psB.once(topic, shouldNotHappen)
psA.publish(topic, uint8ArrayFromString('hey'))
return defer.promise
})
it('Publish to a topic in nodeB', () => {
const defer = pDefer()
psA.once(topic, (msg) => {
psA.once(topic, shouldNotHappen)
expect(uint8ArrayToString(msg.data)).to.equal('banana')
setTimeout(() => {
psA.removeListener(topic, shouldNotHappen)
psB.removeListener(topic, shouldNotHappen)
defer.resolve()
}, 100)
})
psB.once(topic, shouldNotHappen)
psB.publish(topic, uint8ArrayFromString('banana'))
return defer.promise
})
it('Publish 10 msg to a topic in nodeB', () => {
const defer = pDefer()
let counter = 0
psB.once(topic, shouldNotHappen)
psA.on(topic, receivedMsg)
function receivedMsg (msg) {
expect(uint8ArrayToString(msg.data)).to.equal('banana')
expect(msg.from).to.be.eql(psB.peerId.toB58String())
expect(msg.seqno).to.be.a('Uint8Array')
expect(msg.topicIDs).to.be.eql([topic])
if (++counter === 10) {
psA.removeListener(topic, receivedMsg)
psB.removeListener(topic, shouldNotHappen)
defer.resolve()
}
}
Array.from({ length: 10 }, (_, i) => psB.publish(topic, uint8ArrayFromString('banana')))
return defer.promise
})
it('Unsubscribe from topic in nodeA', () => {
const defer = pDefer()
psA.unsubscribe(topic)
expect(psA.subscriptions.size).to.equal(0)
psB.once('pubsub:subscription-change', (changedPeerId, changedSubs) => {
expect(psB.peers.size).to.equal(1)
expectSet(psB.topics.get(topic), [])
expect(changedPeerId.toB58String()).to.equal(first(psB.peers).id.toB58String())
expect(changedSubs).to.be.eql([{ topicID: topic, subscribe: false }])
defer.resolve()
})
return defer.promise
})
it('Publish to a topic:Z in nodeA nodeB', () => {
const defer = pDefer()
psA.once('Z', shouldNotHappen)
psB.once('Z', shouldNotHappen)
setTimeout(() => {
psA.removeListener('Z', shouldNotHappen)
psB.removeListener('Z', shouldNotHappen)
defer.resolve()
}, 100)
psB.publish('Z', uint8ArrayFromString('banana'))
psA.publish('Z', uint8ArrayFromString('banana'))
return defer.promise
})
// Wait for peers to be ready in pubsub
await pWaitFor(() => psA.peers.size === 1 && psB.peers.size === 1)
})
describe('nodes send state on connection', () => {
let psA, psB
after(async () => {
sinon.restore()
// Create pubsub nodes and connect them
before(async () => {
[psA, psB] = await common.setup(2)
psA && psA.stop()
psB && psB.stop()
expect(psA.peers.size).to.be.eql(0)
expect(psB.peers.size).to.be.eql(0)
await common.teardown()
})
// Start pubsub and connect nodes
psA.start()
psB.start()
})
it('Subscribe to a topic in nodeA', () => {
const defer = pDefer()
// Make subscriptions prior to nodes connected
before(() => {
psA.subscribe('Za')
psB.subscribe('Zb')
expect(psA.peers.size).to.equal(0)
expectSet(psA.subscriptions, ['Za'])
expect(psB.peers.size).to.equal(0)
expectSet(psB.subscriptions, ['Zb'])
})
after(async () => {
sinon.restore()
psA && psA.stop()
psB && psB.stop()
await common.teardown()
})
it('existing subscriptions are sent upon peer connection', async function () {
this.timeout(10e3)
await Promise.all([
psA._libp2p.dial(psB.peerId),
new Promise((resolve) => psA.once('pubsub:subscription-change', resolve)),
new Promise((resolve) => psB.once('pubsub:subscription-change', resolve))
])
expect(psA.peers.size).to.equal(1)
psB.once('pubsub:subscription-change', (changedPeerId, changedSubs) => {
expectSet(psA.subscriptions, [topic])
expect(psB.peers.size).to.equal(1)
expectSet(psA.subscriptions, ['Za'])
expectSet(psB.topics.get('Za'), [psA.peerId.toB58String()])
expectSet(psB.subscriptions, ['Zb'])
expectSet(psA.topics.get('Zb'), [psB.peerId.toB58String()])
expectSet(psB.topics.get(topic), [psA.peerId.toB58String()])
expect(changedPeerId.toB58String()).to.equal(first(psB.peers).id.toB58String())
expect(changedSubs).to.be.eql([{ topicID: topic, subscribe: true }])
defer.resolve()
})
psA.subscribe(topic)
return defer.promise
})
it('Publish to a topic in nodeA', () => {
const defer = pDefer()
psA.once(topic, (msg) => {
expect(uint8ArrayToString(msg.data)).to.equal('hey')
psB.removeListener(topic, shouldNotHappen)
defer.resolve()
})
psB.once(topic, shouldNotHappen)
psA.publish(topic, uint8ArrayFromString('hey'))
return defer.promise
})
it('Publish to a topic in nodeB', () => {
const defer = pDefer()
psA.once(topic, (msg) => {
psA.once(topic, shouldNotHappen)
expect(uint8ArrayToString(msg.data)).to.equal('banana')
setTimeout(() => {
psA.removeListener(topic, shouldNotHappen)
psB.removeListener(topic, shouldNotHappen)
defer.resolve()
}, 100)
})
psB.once(topic, shouldNotHappen)
psB.publish(topic, uint8ArrayFromString('banana'))
return defer.promise
})
it('Publish 10 msg to a topic in nodeB', () => {
const defer = pDefer()
let counter = 0
psB.once(topic, shouldNotHappen)
psA.on(topic, receivedMsg)
function receivedMsg (msg) {
expect(uint8ArrayToString(msg.data)).to.equal('banana')
expect(msg.from).to.be.eql(psB.peerId.toB58String())
expect(msg.seqno).to.be.a('Uint8Array')
expect(msg.topicIDs).to.be.eql([topic])
if (++counter === 10) {
psA.removeListener(topic, receivedMsg)
psB.removeListener(topic, shouldNotHappen)
defer.resolve()
}
}
Array.from({ length: 10 }, (_, i) => psB.publish(topic, uint8ArrayFromString('banana')))
return defer.promise
})
it('Unsubscribe from topic in nodeA', () => {
const defer = pDefer()
psA.unsubscribe(topic)
expect(psA.subscriptions.size).to.equal(0)
psB.once('pubsub:subscription-change', (changedPeerId, changedSubs) => {
expect(psB.peers.size).to.equal(1)
expectSet(psB.topics.get(topic), [])
expect(changedPeerId.toB58String()).to.equal(first(psB.peers).id.toB58String())
expect(changedSubs).to.be.eql([{ topicID: topic, subscribe: false }])
defer.resolve()
})
return defer.promise
})
it('Publish to a topic:Z in nodeA nodeB', () => {
const defer = pDefer()
psA.once('Z', shouldNotHappen)
psB.once('Z', shouldNotHappen)
setTimeout(() => {
psA.removeListener('Z', shouldNotHappen)
psB.removeListener('Z', shouldNotHappen)
defer.resolve()
}, 100)
psB.publish('Z', uint8ArrayFromString('banana'))
psA.publish('Z', uint8ArrayFromString('banana'))
return defer.promise
})
})
}

View File

@ -1,7 +1,6 @@
// @ts-nocheck interface tests
'use strict'
const { expect } = require('chai')
const { expect } = require('aegir/utils/chai')
exports.first = (map) => map.values().next().value

View File

@ -1,11 +1,11 @@
'use strict'
// @ts-ignore libp2p crypto has no types
const randomBytes = require('libp2p-crypto/src/random-bytes')
const uint8ArrayToString = require('uint8arrays/to-string')
const uint8ArrayFromString = require('uint8arrays/from-string')
const PeerId = require('peer-id')
const multihash = require('multihashes')
exports = module.exports
/**
* Generatea random sequence number.
@ -13,7 +13,7 @@ const multihash = require('multihashes')
* @returns {Uint8Array}
* @private
*/
const randomSeqno = () => {
exports.randomSeqno = () => {
return randomBytes(8)
}
@ -25,7 +25,7 @@ const randomSeqno = () => {
* @returns {Uint8Array}
* @private
*/
const msgId = (from, seqno) => {
exports.msgId = (from, seqno) => {
const fromBytes = PeerId.createFromB58String(from).id
const msgId = new Uint8Array(fromBytes.length + seqno.length)
msgId.set(fromBytes, 0)
@ -40,28 +40,22 @@ const msgId = (from, seqno) => {
* @returns {Uint8Array}
* @private
*/
const noSignMsgId = (data) => multihash.encode(data, 'sha2-256')
exports.noSignMsgId = (data) => multihash.encode(data, 'sha2-256')
/**
* Check if any member of the first set is also a member
* of the second set.
*
* @param {Set<number>|Array<number>} a
* @param {Set<number>|Array<number>} b
* @param {Set|Array} a
* @param {Set|Array} b
* @returns {boolean}
* @private
*/
const anyMatch = (a, b) => {
exports.anyMatch = (a, b) => {
let bHas
if (Array.isArray(b)) {
/**
* @param {number} val
*/
bHas = (val) => b.indexOf(val) > -1
} else {
/**
* @param {number} val
*/
bHas = (val) => b.has(val)
}
@ -82,7 +76,7 @@ const anyMatch = (a, b) => {
* @returns {T[]}
* @private
*/
const ensureArray = (maybeArray) => {
exports.ensureArray = (maybeArray) => {
if (!Array.isArray(maybeArray)) {
return [maybeArray]
}
@ -98,7 +92,7 @@ const ensureArray = (maybeArray) => {
* @param {string} [peerId]
* @returns {T & {from?: string, peerId?: string }}
*/
const normalizeInRpcMessage = (message, peerId) => {
exports.normalizeInRpcMessage = (message, peerId) => {
const m = Object.assign({}, message)
if (message.from instanceof Uint8Array) {
m.from = uint8ArrayToString(message.from, 'base58btc')
@ -115,23 +109,13 @@ const normalizeInRpcMessage = (message, peerId) => {
* @param {T} message
* @returns {T & {from?: Uint8Array, data?: Uint8Array}}
*/
const normalizeOutRpcMessage = (message) => {
exports.normalizeOutRpcMessage = (message) => {
const m = Object.assign({}, message)
if (typeof message.from === 'string') {
if (typeof message.from === 'string' || message.from instanceof String) {
m.from = uint8ArrayFromString(message.from, 'base58btc')
}
if (typeof message.data === 'string') {
if (typeof message.data === 'string' || message.data instanceof String) {
m.data = uint8ArrayFromString(message.data)
}
return m
}
module.exports = {
randomSeqno,
msgId,
noSignMsgId,
anyMatch,
ensureArray,
normalizeInRpcMessage,
normalizeOutRpcMessage
}

View File

@ -1,4 +1,3 @@
// @ts-nocheck interface tests
/* eslint-env mocha */
'use strict'

View File

@ -1,4 +1,3 @@
// @ts-nocheck interface tests
/* eslint-env mocha */
'use strict'

View File

@ -1,4 +1,3 @@
// @ts-nocheck interface tests
/* eslint-env mocha */
/* eslint max-nested-callbacks: ["error", 8] */
'use strict'

View File

@ -1,4 +1,3 @@
// @ts-nocheck interface tests
/* eslint-env mocha */
'use strict'

View File

@ -1,4 +1,3 @@
// @ts-nocheck interface tests
/* eslint-env mocha */
'use strict'

View File

@ -1,11 +1,10 @@
// @ts-nocheck interface tests
'use strict'
const { expect } = require('chai')
const pair = require('it-pair/duplex')
const { pipe } = require('it-pipe')
const pLimit = require('p-limit')
const pLimit = require('p-limit').default
const { collect, tap, consume } = require('streaming-iterables')
module.exports = async (Muxer, nStreams, nMsg, limit) => {

View File

@ -1,4 +1,3 @@
// @ts-nocheck interface tests
/* eslint-env mocha */
'use strict'

View File

@ -43,9 +43,9 @@ export interface MuxedStream extends AsyncIterable<Uint8Array | BufferList> {
abort: () => void;
reset: () => void;
sink: Sink;
source: AsyncIterable<Uint8Array | BufferList>;
source: () => AsyncIterable<Uint8Array | BufferList>;
timeline: MuxedTimeline;
id: string;
}
export type Sink = (source: Uint8Array) => Promise<void>;
export type Sink = (source: Uint8Array) => Promise<Uint8Array>;

View File

@ -62,9 +62,6 @@ class Topology {
return Boolean(other && other[topologySymbol])
}
/**
* @param {any} registrar
*/
set registrar (registrar) { // eslint-disable-line
this._registrar = registrar
}

View File

@ -56,9 +56,6 @@ class MulticodecTopology extends Topology {
return Boolean(other && other[multicodecTopologySymbol])
}
/**
* @param {any} registrar
*/
set registrar (registrar) { // eslint-disable-line
this._registrar = registrar
this._registrar.peerStore.on('change:protocols', this._onProtocolChange)

View File

@ -1,4 +1,3 @@
// @ts-nocheck interface tests
/* eslint-env mocha */
'use strict'

View File

@ -1,4 +1,3 @@
// @ts-nocheck interface tests
/* eslint-env mocha */
'use strict'

View File

@ -1,4 +1,3 @@
// @ts-nocheck interface tests
/* eslint-env mocha */
'use strict'

View File

@ -1,4 +1,3 @@
// @ts-nocheck interface tests
/* eslint-env mocha */
'use strict'

View File

@ -1,4 +1,3 @@
// @ts-nocheck interface tests
/* eslint-env mocha */
'use strict'

View File

@ -1,4 +1,3 @@
// @ts-nocheck interface tests
/* eslint max-nested-callbacks: ["error", 8] */
/* eslint-env mocha */
'use strict'

View File

@ -1,4 +1,3 @@
import BufferList from 'bl'
import events from 'events'
import Multiaddr from 'multiaddr'
import Connection from '../connection/connection'
@ -63,7 +62,7 @@ export type MultiaddrConnectionTimeline = {
export type MultiaddrConnection = {
sink: Sink;
source: AsyncIterable<Uint8Array | BufferList>;
source: () => AsyncIterable<Uint8Array>;
close: (err?: Error) => Promise<void>;
conn: unknown;
remoteAddr: Multiaddr;

View File

@ -3,13 +3,13 @@ export interface EventEmitterFactory {
}
export interface EventEmitter {
addListener(event: string | symbol, listener: (...args: any[]) => void): EventEmitter;
on(event: string | symbol, listener: (...args: any[]) => void): EventEmitter;
once(event: string | symbol, listener: (...args: any[]) => void): EventEmitter;
removeListener(event: string | symbol, listener: (...args: any[]) => void): EventEmitter;
off(event: string | symbol, listener: (...args: any[]) => void): EventEmitter;
removeAllListeners(event?: string | symbol): EventEmitter;
setMaxListeners(n: number): EventEmitter;
addListener(event: string | symbol, listener: (...args: any[]) => void);
on(event: string | symbol, listener: (...args: any[]) => void);
once(event: string | symbol, listener: (...args: any[]) => void);
removeListener(event: string | symbol, listener: (...args: any[]) => void);
off(event: string | symbol, listener: (...args: any[]) => void);
removeAllListeners(event?: string | symbol);
setMaxListeners(n: number);
getMaxListeners(): number;
listeners(event: string | symbol): Function[]; // eslint-disable-line @typescript-eslint/ban-types
rawListeners(event: string | symbol): Function[]; // eslint-disable-line @typescript-eslint/ban-types