Compare commits

..

5 Commits

14 changed files with 479 additions and 200 deletions

View File

@ -1,3 +1,12 @@
## [0.8.2](https://github.com/libp2p/js-interfaces/compare/v0.8.1...v0.8.2) (2021-01-20)
### Bug Fixes
* event emitter types with local types ([#80](https://github.com/libp2p/js-interfaces/issues/80)) ([ca52077](https://github.com/libp2p/js-interfaces/commit/ca520775eb26f5ed501375fdb24ba698c9a8c8c8))
## [0.8.1](https://github.com/libp2p/js-interfaces/compare/v0.8.0...v0.8.1) (2020-12-11)

View File

@ -4,6 +4,8 @@
[![](https://img.shields.io/badge/project-libp2p-yellow.svg?style=flat-square)](http://libp2p.io/)
[![](https://img.shields.io/badge/freenode-%23libp2p-yellow.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23libp2p)
[![](https://img.shields.io/discourse/https/discuss.libp2p.io/posts.svg)](https://discuss.libp2p.io)
[![codecov](https://img.shields.io/codecov/c/github/libp2p/js-libp2p-interfaces.svg?style=flat-square)](https://codecov.io/gh/libp2p/js-libp2p-interfaces)
[![GitHub Workflow Status](https://img.shields.io/github/workflow/status/libp2p/js-libp2p-interfaces/ci?label=ci&style=flat-square)](https://github.com/libp2p/js-libp2p-interfaces/actions?query=branch%3Amaster+workflow%3Aci+)
> Contains test suites and interfaces you can use to implement the various components of libp2p.

View File

@ -1,6 +1,6 @@
{
"name": "libp2p-interfaces",
"version": "0.8.1",
"version": "0.8.2",
"description": "Interfaces for JS Libp2p",
"leadMaintainer": "Jacob Heun <jacobheun@gmail.com>",
"main": "src/index.js",
@ -24,6 +24,7 @@
"scripts": {
"lint": "aegir lint",
"build": "aegir build",
"prepare": "aegir build --no-bundle",
"test": "aegir test",
"test:node": "aegir test --target node",
"test:browser": "aegir test --target browser",

View File

@ -1,7 +1,9 @@
'use strict'
const debug = require('debug')
const { EventEmitter } = require('events')
/** @typedef {import('../types').EventEmitterFactory} Events */
/** @type Events */
const EventEmitter = require('events')
const errcode = require('err-code')
const { pipe } = require('it-pipe')

View File

@ -1,6 +1,8 @@
'use strict'
const { EventEmitter } = require('events')
/** @typedef {import('../types').EventEmitterFactory} Events */
/** @type Events */
const EventEmitter = require('events')
const lp = require('it-length-prefixed')

View File

@ -1,8 +1,7 @@
/* eslint-env mocha */
'use strict'
const chai = require('chai')
const { expect } = chai
const { expect } = require('aegir/utils/chai')
const sinon = require('sinon')
const pDefer = require('p-defer')

View File

@ -0,0 +1,305 @@
/* eslint-env mocha */
'use strict'
const { expect } = require('aegir/utils/chai')
const sinon = require('sinon')
const pDefer = require('p-defer')
const pWaitFor = require('p-wait-for')
const uint8ArrayToString = require('uint8arrays/to-string')
const { expectSet } = require('./utils')
module.exports = (common) => {
describe('pubsub connection handlers', () => {
let psA, psB
describe('nodes send state on connection', () => {
// Create pubsub nodes and connect them
before(async () => {
[psA, psB] = await common.setup(2)
expect(psA.peers.size).to.be.eql(0)
expect(psB.peers.size).to.be.eql(0)
// Start pubsub
psA.start()
psB.start()
})
// Make subscriptions prior to nodes connected
before(() => {
psA.subscribe('Za')
psB.subscribe('Zb')
expect(psA.peers.size).to.equal(0)
expectSet(psA.subscriptions, ['Za'])
expect(psB.peers.size).to.equal(0)
expectSet(psB.subscriptions, ['Zb'])
})
after(async () => {
sinon.restore()
await common.teardown()
})
it('existing subscriptions are sent upon peer connection', async function () {
this.timeout(10e3)
await Promise.all([
psA._libp2p.dial(psB.peerId),
new Promise((resolve) => psA.once('pubsub:subscription-change', resolve)),
new Promise((resolve) => psB.once('pubsub:subscription-change', resolve))
])
expect(psA.peers.size).to.equal(1)
expect(psB.peers.size).to.equal(1)
expectSet(psA.subscriptions, ['Za'])
expectSet(psB.topics.get('Za'), [psA.peerId.toB58String()])
expectSet(psB.subscriptions, ['Zb'])
expectSet(psA.topics.get('Zb'), [psB.peerId.toB58String()])
})
})
describe('pubsub started before connect', () => {
// Create pubsub nodes and start them
beforeEach(async () => {
[psA, psB] = await common.setup(2)
psA.start()
psB.start()
})
afterEach(async () => {
sinon.restore()
await common.teardown()
})
it('should get notified of connected peers on dial', async () => {
const connection = await psA._libp2p.dial(psB.peerId)
expect(connection).to.exist()
return Promise.all([
pWaitFor(() => psA.peers.size === 1),
pWaitFor(() => psB.peers.size === 1)
])
})
it('should receive pubsub messages', async () => {
const defer = pDefer()
const topic = 'test-topic'
const data = 'hey!'
await psA._libp2p.dial(psB.peerId)
let subscribedTopics = psA.getTopics()
expect(subscribedTopics).to.not.include(topic)
psA.on(topic, (msg) => {
expect(uint8ArrayToString(msg.data)).to.equal(data)
defer.resolve()
})
psA.subscribe(topic)
subscribedTopics = psA.getTopics()
expect(subscribedTopics).to.include(topic)
// wait for psB to know about psA subscription
await pWaitFor(() => {
const subscribedPeers = psB.getSubscribers(topic)
return subscribedPeers.includes(psA.peerId.toB58String())
})
psB.publish(topic, data)
await defer.promise
})
})
describe('pubsub started after connect', () => {
// Create pubsub nodes
beforeEach(async () => {
[psA, psB] = await common.setup(2)
})
afterEach(async () => {
sinon.restore()
psA && psA.stop()
psB && psB.stop()
await common.teardown()
})
it('should get notified of connected peers after starting', async () => {
const connection = await psA._libp2p.dial(psB.peerId)
expect(connection).to.exist()
expect(psA.peers.size).to.be.eql(0)
expect(psB.peers.size).to.be.eql(0)
psA.start()
psB.start()
return Promise.all([
pWaitFor(() => psA.peers.size === 1),
pWaitFor(() => psB.peers.size === 1)
])
})
it('should receive pubsub messages', async () => {
const defer = pDefer()
const topic = 'test-topic'
const data = 'hey!'
await psA._libp2p.dial(psB.peerId)
psA.start()
psB.start()
await Promise.all([
pWaitFor(() => psA.peers.size === 1),
pWaitFor(() => psB.peers.size === 1)
])
let subscribedTopics = psA.getTopics()
expect(subscribedTopics).to.not.include(topic)
psA.on(topic, (msg) => {
expect(uint8ArrayToString(msg.data)).to.equal(data)
defer.resolve()
})
psA.subscribe(topic)
subscribedTopics = psA.getTopics()
expect(subscribedTopics).to.include(topic)
// wait for psB to know about psA subscription
await pWaitFor(() => {
const subscribedPeers = psB.getSubscribers(topic)
return subscribedPeers.includes(psA.peerId.toB58String())
})
psB.publish(topic, data)
await defer.promise
})
})
describe('pubsub with intermittent connections', () => {
// Create pubsub nodes and start them
beforeEach(async () => {
[psA, psB] = await common.setup(2)
psA.start()
psB.start()
})
afterEach(async () => {
sinon.restore()
psA && psA.stop()
psB && psB.stop()
await common.teardown()
})
it('should receive pubsub messages after a node restart', async function () {
this.timeout(10e3)
const topic = 'test-topic'
const data = 'hey!'
const psAid = psA.peerId.toB58String()
let counter = 0
const defer1 = pDefer()
const defer2 = pDefer()
await psA._libp2p.dial(psB.peerId)
let subscribedTopics = psA.getTopics()
expect(subscribedTopics).to.not.include(topic)
psA.on(topic, (msg) => {
expect(uint8ArrayToString(msg.data)).to.equal(data)
counter++
counter === 1 ? defer1.resolve() : defer2.resolve()
})
psA.subscribe(topic)
subscribedTopics = psA.getTopics()
expect(subscribedTopics).to.include(topic)
// wait for psB to know about psA subscription
await pWaitFor(() => {
const subscribedPeers = psB.getSubscribers(topic)
return subscribedPeers.includes(psAid)
})
psB.publish(topic, data)
await defer1.promise
psB.stop()
await psB._libp2p.stop()
await pWaitFor(() => !psA._libp2p.connectionManager.get(psB.peerId) && !psB._libp2p.connectionManager.get(psA.peerId))
await psB._libp2p.start()
psB.start()
psA._libp2p.peerStore.addressBook.set(psB.peerId, psB._libp2p.multiaddrs)
await psA._libp2p.dial(psB.peerId)
// wait for remoteLibp2p to know about libp2p subscription
await pWaitFor(() => {
const subscribedPeers = psB.getSubscribers(topic)
return subscribedPeers.includes(psAid)
})
psB.publish(topic, data)
await defer2.promise
})
it('should handle quick reconnects with a delayed disconnect', async () => {
// Subscribe on both
const handlerSpy = sinon.spy()
const topic = 'reconnect-channel'
psA.on(topic, handlerSpy)
psB.on(topic, handlerSpy)
await Promise.all([
psA.subscribe(topic),
psB.subscribe(topic)
])
// Create two connections to the remote peer
const originalConnection = await psA._libp2p.dialer.connectToPeer(psB.peerId)
// second connection
await psA._libp2p.dialer.connectToPeer(psB.peerId)
expect(psA._libp2p.connections.get(psB.peerId.toB58String())).to.have.length(2)
// Wait for subscriptions to occur
await pWaitFor(() => {
return psA.getSubscribers(topic).includes(psB.peerId.toB58String()) &&
psB.getSubscribers(topic).includes(psA.peerId.toB58String())
})
// Verify messages go both ways
psA.publish(topic, 'message1')
psB.publish(topic, 'message2')
await pWaitFor(() => handlerSpy.callCount >= 2)
expect(handlerSpy.args.map(([message]) => uint8ArrayToString(message.data))).to.include.members(['message1', 'message2'])
// Disconnect the first connection (this acts as a delayed reconnect)
const psAConnUpdateSpy = sinon.spy(psA._libp2p.connectionManager.connections, 'set')
await originalConnection.close()
await pWaitFor(() => psAConnUpdateSpy.callCount === 1)
// Verify messages go both ways after the disconnect
handlerSpy.resetHistory()
psA.publish(topic, 'message3')
psB.publish(topic, 'message4')
await pWaitFor(() => handlerSpy.callCount >= 2)
expect(handlerSpy.args.map(([message]) => uint8ArrayToString(message.data))).to.include.members(['message3', 'message4'])
})
})
})
}

View File

@ -1,8 +1,7 @@
/* eslint-env mocha */
'use strict'
const chai = require('chai')
const { expect } = chai
const { expect } = require('aegir/utils/chai')
const sinon = require('sinon')
const uint8ArrayFromString = require('uint8arrays/from-string')

View File

@ -4,6 +4,7 @@
const apiTest = require('./api')
const emitSelfTest = require('./emit-self')
const messagesTest = require('./messages')
const connectionHandlersTest = require('./connection-handlers')
const twoNodesTest = require('./two-nodes')
const multipleNodesTest = require('./multiple-nodes')
@ -12,6 +13,7 @@ module.exports = (common) => {
apiTest(common)
emitSelfTest(common)
messagesTest(common)
connectionHandlersTest(common)
twoNodesTest(common)
multipleNodesTest(common)
})

View File

@ -1,8 +1,7 @@
/* eslint-env mocha */
'use strict'
const chai = require('chai')
const { expect } = chai
const { expect } = require('aegir/utils/chai')
const sinon = require('sinon')
const PeerId = require('peer-id')

View File

@ -2,8 +2,7 @@
/* eslint max-nested-callbacks: ["error", 6] */
'use strict'
const chai = require('chai')
const { expect } = chai
const { expect } = require('aegir/utils/chai')
const sinon = require('sinon')
const delay = require('delay')

View File

@ -2,8 +2,7 @@
/* eslint max-nested-callbacks: ["error", 6] */
'use strict'
const chai = require('chai')
const { expect } = chai
const { expect } = require('aegir/utils/chai')
const sinon = require('sinon')
const pDefer = require('p-defer')
@ -24,205 +23,148 @@ function shouldNotHappen (_) {
module.exports = (common) => {
describe('pubsub with two nodes', () => {
describe('fresh nodes', () => {
let psA, psB
let psA, psB
// Create pubsub nodes and connect them
before(async () => {
[psA, psB] = await common.setup(2)
// Create pubsub nodes and connect them
before(async () => {
[psA, psB] = await common.setup(2)
expect(psA.peers.size).to.be.eql(0)
expect(psB.peers.size).to.be.eql(0)
expect(psA.peers.size).to.be.eql(0)
expect(psB.peers.size).to.be.eql(0)
// Start pubsub and connect nodes
psA.start()
psB.start()
// Start pubsub and connect nodes
psA.start()
psB.start()
await psA._libp2p.dial(psB.peerId)
await psA._libp2p.dial(psB.peerId)
// Wait for peers to be ready in pubsub
await pWaitFor(() => psA.peers.size === 1 && psB.peers.size === 1)
})
after(async () => {
sinon.restore()
psA && psA.stop()
psB && psB.stop()
await common.teardown()
})
it('Subscribe to a topic in nodeA', () => {
const defer = pDefer()
psB.once('pubsub:subscription-change', (changedPeerId, changedSubs) => {
expectSet(psA.subscriptions, [topic])
expect(psB.peers.size).to.equal(1)
expectSet(psB.topics.get(topic), [psA.peerId.toB58String()])
expect(changedPeerId.toB58String()).to.equal(first(psB.peers).id.toB58String())
expect(changedSubs).to.be.eql([{ topicID: topic, subscribe: true }])
defer.resolve()
})
psA.subscribe(topic)
return defer.promise
})
it('Publish to a topic in nodeA', () => {
const defer = pDefer()
psA.once(topic, (msg) => {
expect(uint8ArrayToString(msg.data)).to.equal('hey')
psB.removeListener(topic, shouldNotHappen)
defer.resolve()
})
psB.once(topic, shouldNotHappen)
psA.publish(topic, uint8ArrayFromString('hey'))
return defer.promise
})
it('Publish to a topic in nodeB', () => {
const defer = pDefer()
psA.once(topic, (msg) => {
psA.once(topic, shouldNotHappen)
expect(uint8ArrayToString(msg.data)).to.equal('banana')
setTimeout(() => {
psA.removeListener(topic, shouldNotHappen)
psB.removeListener(topic, shouldNotHappen)
defer.resolve()
}, 100)
})
psB.once(topic, shouldNotHappen)
psB.publish(topic, uint8ArrayFromString('banana'))
return defer.promise
})
it('Publish 10 msg to a topic in nodeB', () => {
const defer = pDefer()
let counter = 0
psB.once(topic, shouldNotHappen)
psA.on(topic, receivedMsg)
function receivedMsg (msg) {
expect(uint8ArrayToString(msg.data)).to.equal('banana')
expect(msg.from).to.be.eql(psB.peerId.toB58String())
expect(msg.seqno).to.be.a('Uint8Array')
expect(msg.topicIDs).to.be.eql([topic])
if (++counter === 10) {
psA.removeListener(topic, receivedMsg)
psB.removeListener(topic, shouldNotHappen)
defer.resolve()
}
}
Array.from({ length: 10 }, (_, i) => psB.publish(topic, uint8ArrayFromString('banana')))
return defer.promise
})
it('Unsubscribe from topic in nodeA', () => {
const defer = pDefer()
psA.unsubscribe(topic)
expect(psA.subscriptions.size).to.equal(0)
psB.once('pubsub:subscription-change', (changedPeerId, changedSubs) => {
expect(psB.peers.size).to.equal(1)
expectSet(psB.topics.get(topic), [])
expect(changedPeerId.toB58String()).to.equal(first(psB.peers).id.toB58String())
expect(changedSubs).to.be.eql([{ topicID: topic, subscribe: false }])
defer.resolve()
})
return defer.promise
})
it('Publish to a topic:Z in nodeA nodeB', () => {
const defer = pDefer()
psA.once('Z', shouldNotHappen)
psB.once('Z', shouldNotHappen)
setTimeout(() => {
psA.removeListener('Z', shouldNotHappen)
psB.removeListener('Z', shouldNotHappen)
defer.resolve()
}, 100)
psB.publish('Z', uint8ArrayFromString('banana'))
psA.publish('Z', uint8ArrayFromString('banana'))
return defer.promise
})
// Wait for peers to be ready in pubsub
await pWaitFor(() => psA.peers.size === 1 && psB.peers.size === 1)
})
describe('nodes send state on connection', () => {
let psA, psB
after(async () => {
sinon.restore()
// Create pubsub nodes and connect them
before(async () => {
[psA, psB] = await common.setup(2)
psA && psA.stop()
psB && psB.stop()
expect(psA.peers.size).to.be.eql(0)
expect(psB.peers.size).to.be.eql(0)
await common.teardown()
})
// Start pubsub and connect nodes
psA.start()
psB.start()
})
it('Subscribe to a topic in nodeA', () => {
const defer = pDefer()
// Make subscriptions prior to nodes connected
before(() => {
psA.subscribe('Za')
psB.subscribe('Zb')
expect(psA.peers.size).to.equal(0)
expectSet(psA.subscriptions, ['Za'])
expect(psB.peers.size).to.equal(0)
expectSet(psB.subscriptions, ['Zb'])
})
after(async () => {
sinon.restore()
psA && psA.stop()
psB && psB.stop()
await common.teardown()
})
it('existing subscriptions are sent upon peer connection', async function () {
this.timeout(10e3)
await Promise.all([
psA._libp2p.dial(psB.peerId),
new Promise((resolve) => psA.once('pubsub:subscription-change', resolve)),
new Promise((resolve) => psB.once('pubsub:subscription-change', resolve))
])
expect(psA.peers.size).to.equal(1)
psB.once('pubsub:subscription-change', (changedPeerId, changedSubs) => {
expectSet(psA.subscriptions, [topic])
expect(psB.peers.size).to.equal(1)
expectSet(psA.subscriptions, ['Za'])
expectSet(psB.topics.get('Za'), [psA.peerId.toB58String()])
expectSet(psB.subscriptions, ['Zb'])
expectSet(psA.topics.get('Zb'), [psB.peerId.toB58String()])
expectSet(psB.topics.get(topic), [psA.peerId.toB58String()])
expect(changedPeerId.toB58String()).to.equal(first(psB.peers).id.toB58String())
expect(changedSubs).to.be.eql([{ topicID: topic, subscribe: true }])
defer.resolve()
})
psA.subscribe(topic)
return defer.promise
})
it('Publish to a topic in nodeA', () => {
const defer = pDefer()
psA.once(topic, (msg) => {
expect(uint8ArrayToString(msg.data)).to.equal('hey')
psB.removeListener(topic, shouldNotHappen)
defer.resolve()
})
psB.once(topic, shouldNotHappen)
psA.publish(topic, uint8ArrayFromString('hey'))
return defer.promise
})
it('Publish to a topic in nodeB', () => {
const defer = pDefer()
psA.once(topic, (msg) => {
psA.once(topic, shouldNotHappen)
expect(uint8ArrayToString(msg.data)).to.equal('banana')
setTimeout(() => {
psA.removeListener(topic, shouldNotHappen)
psB.removeListener(topic, shouldNotHappen)
defer.resolve()
}, 100)
})
psB.once(topic, shouldNotHappen)
psB.publish(topic, uint8ArrayFromString('banana'))
return defer.promise
})
it('Publish 10 msg to a topic in nodeB', () => {
const defer = pDefer()
let counter = 0
psB.once(topic, shouldNotHappen)
psA.on(topic, receivedMsg)
function receivedMsg (msg) {
expect(uint8ArrayToString(msg.data)).to.equal('banana')
expect(msg.from).to.be.eql(psB.peerId.toB58String())
expect(msg.seqno).to.be.a('Uint8Array')
expect(msg.topicIDs).to.be.eql([topic])
if (++counter === 10) {
psA.removeListener(topic, receivedMsg)
psB.removeListener(topic, shouldNotHappen)
defer.resolve()
}
}
Array.from({ length: 10 }, (_, i) => psB.publish(topic, uint8ArrayFromString('banana')))
return defer.promise
})
it('Unsubscribe from topic in nodeA', () => {
const defer = pDefer()
psA.unsubscribe(topic)
expect(psA.subscriptions.size).to.equal(0)
psB.once('pubsub:subscription-change', (changedPeerId, changedSubs) => {
expect(psB.peers.size).to.equal(1)
expectSet(psB.topics.get(topic), [])
expect(changedPeerId.toB58String()).to.equal(first(psB.peers).id.toB58String())
expect(changedSubs).to.be.eql([{ topicID: topic, subscribe: false }])
defer.resolve()
})
return defer.promise
})
it('Publish to a topic:Z in nodeA nodeB', () => {
const defer = pDefer()
psA.once('Z', shouldNotHappen)
psB.once('Z', shouldNotHappen)
setTimeout(() => {
psA.removeListener('Z', shouldNotHappen)
psB.removeListener('Z', shouldNotHappen)
defer.resolve()
}, 100)
psB.publish('Z', uint8ArrayFromString('banana'))
psA.publish('Z', uint8ArrayFromString('banana'))
return defer.promise
})
})
}

View File

@ -1,6 +1,6 @@
'use strict'
const { expect } = require('chai')
const { expect } = require('aegir/utils/chai')
exports.first = (map) => map.values().next().value

18
src/types.ts Normal file
View File

@ -0,0 +1,18 @@
export interface EventEmitterFactory {
new(): EventEmitter;
}
export interface EventEmitter {
addListener(event: string | symbol, listener: (...args: any[]) => void);
on(event: string | symbol, listener: (...args: any[]) => void);
once(event: string | symbol, listener: (...args: any[]) => void);
removeListener(event: string | symbol, listener: (...args: any[]) => void);
off(event: string | symbol, listener: (...args: any[]) => void);
removeAllListeners(event?: string | symbol);
setMaxListeners(n: number);
getMaxListeners(): number;
listeners(event: string | symbol): Function[]; // eslint-disable-line @typescript-eslint/ban-types
rawListeners(event: string | symbol): Function[]; // eslint-disable-line @typescript-eslint/ban-types
emit(event: string | symbol, ...args: any[]): boolean;
listenerCount(event: string | symbol): number;
}