diff --git a/src/dht.js b/src/dht.js index 361d84f8..f727c271 100644 --- a/src/dht.js +++ b/src/dht.js @@ -1,10 +1,15 @@ 'use strict' +const nextTick = require('async/nextTick') +const errCode = require('err-code') + +const { messages, codes } = require('./errors') + module.exports = (node) => { return { put: (key, value, callback) => { if (!node._dht) { - return callback(new Error('DHT is not available')) + return nextTick(callback, errCode(new Error(messages.DHT_DISABLED), codes.DHT_DISABLED)) } node._dht.put(key, value, callback) @@ -16,7 +21,7 @@ module.exports = (node) => { } if (!node._dht) { - return callback(new Error('DHT is not available')) + return nextTick(callback, errCode(new Error(messages.DHT_DISABLED), codes.DHT_DISABLED)) } node._dht.get(key, options, callback) @@ -28,7 +33,7 @@ module.exports = (node) => { } if (!node._dht) { - return callback(new Error('DHT is not available')) + return nextTick(callback, errCode(new Error(messages.DHT_DISABLED), codes.DHT_DISABLED)) } node._dht.getMany(key, nVals, options, callback) diff --git a/src/error-messages.js b/src/error-messages.js deleted file mode 100644 index e3183330..00000000 --- a/src/error-messages.js +++ /dev/null @@ -1,3 +0,0 @@ -'use strict' - -exports.NOT_STARTED_YET = 'The libp2p node is not started yet' diff --git a/src/errors.js b/src/errors.js new file mode 100644 index 00000000..64054bbd --- /dev/null +++ b/src/errors.js @@ -0,0 +1,11 @@ +'use strict' + +exports.messages = { + NOT_STARTED_YET: 'The libp2p node is not started yet', + DHT_DISABLED: 'DHT is not available' +} + +exports.codes = { + DHT_DISABLED: 'ERR_DHT_DISABLED', + PUBSUB_NOT_STARTED: 'ERR_PUBSUB_NOT_STARTED' +} diff --git a/src/pubsub.js b/src/pubsub.js index 8e33761a..f8dc7f9d 100644 --- a/src/pubsub.js +++ b/src/pubsub.js @@ -1,9 +1,11 @@ 'use strict' -const setImmediate = require('async/setImmediate') -const NOT_STARTED_YET = require('./error-messages').NOT_STARTED_YET +const nextTick = require('async/nextTick') +const { messages, codes } = require('./errors') const FloodSub = require('libp2p-floodsub') +const errCode = require('err-code') + module.exports = (node) => { const floodSub = new FloodSub(node) @@ -18,7 +20,7 @@ module.exports = (node) => { } if (!node.isStarted() && !floodSub.started) { - return setImmediate(() => callback(new Error(NOT_STARTED_YET))) + return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)) } function subscribe (cb) { @@ -27,7 +29,7 @@ module.exports = (node) => { } floodSub.on(topic, handler) - setImmediate(cb) + nextTick(cb) } subscribe(callback) @@ -35,7 +37,7 @@ module.exports = (node) => { unsubscribe: (topic, handler, callback) => { if (!node.isStarted() && !floodSub.started) { - throw new Error(NOT_STARTED_YET) + return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)) } if (!handler && !callback) { floodSub.removeAllListeners(topic) @@ -48,37 +50,37 @@ module.exports = (node) => { } if (typeof callback === 'function') { - setImmediate(() => callback()) + nextTick(() => callback()) } }, publish: (topic, data, callback) => { if (!node.isStarted() && !floodSub.started) { - return setImmediate(() => callback(new Error(NOT_STARTED_YET))) + return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)) } if (!Buffer.isBuffer(data)) { - return setImmediate(() => callback(new Error('data must be a Buffer'))) + return nextTick(callback, errCode(new Error('data must be a Buffer'), 'ERR_DATA_IS_NOT_A_BUFFER')) } floodSub.publish(topic, data) - setImmediate(() => callback()) + nextTick(() => callback()) }, ls: (callback) => { if (!node.isStarted() && !floodSub.started) { - return setImmediate(() => callback(new Error(NOT_STARTED_YET))) + return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)) } const subscriptions = Array.from(floodSub.subscriptions) - setImmediate(() => callback(null, subscriptions)) + nextTick(() => callback(null, subscriptions)) }, peers: (topic, callback) => { if (!node.isStarted() && !floodSub.started) { - return setImmediate(() => callback(new Error(NOT_STARTED_YET))) + return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)) } if (typeof topic === 'function') { @@ -90,7 +92,7 @@ module.exports = (node) => { .filter((peer) => topic ? peer.topics.has(topic) : true) .map((peer) => peer.info.id.toB58String()) - setImmediate(() => callback(null, peers)) + nextTick(() => callback(null, peers)) }, setMaxListeners (n) { diff --git a/test/dht.node.js b/test/dht.node.js index 41e370a9..cecd7b72 100644 --- a/test/dht.node.js +++ b/test/dht.node.js @@ -140,6 +140,7 @@ describe('.dht', () => { nodeA.dht.put(key, value, (err) => { expect(err).to.exist() + expect(err.code).to.equal('ERR_DHT_DISABLED') done() }) }) @@ -149,6 +150,7 @@ describe('.dht', () => { nodeA.dht.get(key, (err) => { expect(err).to.exist() + expect(err.code).to.equal('ERR_DHT_DISABLED') done() }) }) @@ -158,6 +160,7 @@ describe('.dht', () => { nodeA.dht.getMany(key, 10, (err) => { expect(err).to.exist() + expect(err.code).to.equal('ERR_DHT_DISABLED') done() }) }) diff --git a/test/pubsub.node.js b/test/pubsub.node.js index aaa2ba92..f3795c35 100644 --- a/test/pubsub.node.js +++ b/test/pubsub.node.js @@ -11,6 +11,7 @@ const parallel = require('async/parallel') const series = require('async/series') const _times = require('lodash.times') +const { codes } = require('../src/errors') const createNode = require('./utils/create-node') function startTwo (callback) { @@ -76,6 +77,10 @@ describe('.pubsub', () => { (cb) => setTimeout(cb, 500), // publish on the second (cb) => nodes[1].pubsub.publish('pubsub', data, cb), + // ls subscripts + (cb) => nodes[1].pubsub.ls(cb), + // get subscribed peers + (cb) => nodes[1].pubsub.peers('pubsub', cb), // Wait a moment before unsubscribing (cb) => setTimeout(cb, 500), // unsubscribe on the first @@ -132,6 +137,33 @@ describe('.pubsub', () => { expect(err).to.not.exist().mark() }) }) + it('publish should fail if data is not a buffer', (done) => { + createNode('/ip4/0.0.0.0/tcp/0', { + config: { + peerDiscovery: { + mdns: { + enabled: false + } + }, + EXPERIMENTAL: { + pubsub: true + } + } + }, (err, node) => { + expect(err).to.not.exist() + + node.start((err) => { + expect(err).to.not.exist() + + node.pubsub.publish('pubsub', 'datastr', (err) => { + expect(err).to.exist() + expect(err.code).to.equal('ERR_DATA_IS_NOT_A_BUFFER') + + done() + }) + }) + }) + }) }) describe('.pubsub off', () => { @@ -154,4 +186,73 @@ describe('.pubsub', () => { }) }) }) + + describe('.pubsub on and node not started', () => { + let libp2pNode + + before(function (done) { + createNode('/ip4/0.0.0.0/tcp/0', { + config: { + peerDiscovery: { + mdns: { + enabled: false + } + }, + EXPERIMENTAL: { + pubsub: true + } + } + }, (err, node) => { + expect(err).to.not.exist() + + libp2pNode = node + done() + }) + }) + + it('fail to subscribe if node not started yet', (done) => { + libp2pNode.pubsub.subscribe('pubsub', () => { }, (err) => { + expect(err).to.exist() + expect(err.code).to.equal(codes.PUBSUB_NOT_STARTED) + + done() + }) + }) + + it('fail to unsubscribe if node not started yet', (done) => { + libp2pNode.pubsub.unsubscribe('pubsub', () => { }, (err) => { + expect(err).to.exist() + expect(err.code).to.equal(codes.PUBSUB_NOT_STARTED) + + done() + }) + }) + + it('fail to publish if node not started yet', (done) => { + libp2pNode.pubsub.publish('pubsub', Buffer.from('data'), (err) => { + expect(err).to.exist() + expect(err.code).to.equal(codes.PUBSUB_NOT_STARTED) + + done() + }) + }) + + it('fail to ls if node not started yet', (done) => { + libp2pNode.pubsub.ls((err) => { + expect(err).to.exist() + expect(err.code).to.equal(codes.PUBSUB_NOT_STARTED) + + done() + }) + }) + + it('fail to get subscribed peers to a topic if node not started yet', (done) => { + libp2pNode.pubsub.peers('pubsub', (err) => { + expect(err).to.exist() + expect(err.code).to.equal(codes.PUBSUB_NOT_STARTED) + + done() + }) + }) + }) })