diff --git a/src/pubsub.js b/src/pubsub.js index 9bff4a1c..4547de9b 100644 --- a/src/pubsub.js +++ b/src/pubsub.js @@ -33,7 +33,7 @@ module.exports = (node) => { subscribe(callback) }, - unsubscribe: (topic, handler) => { + unsubscribe: (topic, handler, callback) => { if (!node.isStarted() && !floodSub.started) { throw new Error(NOT_STARTED_YET) } @@ -43,6 +43,10 @@ module.exports = (node) => { if (floodSub.listenerCount(topic) === 0) { floodSub.unsubscribe(topic) } + + if (typeof callback === 'function') { + setImmediate(() => callback()) + } }, publish: (topic, data, callback) => { diff --git a/test/pubsub.node.js b/test/pubsub.node.js index 0aaa75c9..5cc24db8 100644 --- a/test/pubsub.node.js +++ b/test/pubsub.node.js @@ -5,9 +5,10 @@ const chai = require('chai') chai.use(require('dirty-chai')) +chai.use(require('chai-checkmark')) const expect = chai.expect const parallel = require('async/parallel') -const waterfall = require('async/waterfall') +const series = require('async/series') const _times = require('lodash.times') const createNode = require('./utils/create-node') @@ -52,26 +53,39 @@ function stopTwo (nodes, callback) { // TODO: consider if all or some of those should come here describe('.pubsub', () => { describe('.pubsub on (default)', (done) => { - it('start two nodes and send one message', (done) => { - waterfall([ - (cb) => startTwo(cb), - (nodes, cb) => { - const data = Buffer.from('test') - nodes[0].pubsub.subscribe('pubsub', - (msg) => { - expect(msg.data).to.eql(data) - cb(null, nodes) - }, - (err) => { - expect(err).to.not.exist() - setTimeout(() => nodes[1].pubsub.publish('pubsub', data, (err) => { - expect(err).to.not.exist() - }), 500) - } - ) - }, - (nodes, cb) => stopTwo(nodes, cb) - ], done) + it('start two nodes and send one message, then unsubscribe', (done) => { + // Check the final series error, and the publish handler + expect(2).checks(done) + + let nodes + const data = Buffer.from('test') + const handler = (msg) => { + // verify the data is correct and mark the expect + expect(msg.data).to.eql(data).mark() + } + + series([ + // Start the nodes + (cb) => startTwo((err, _nodes) => { + nodes = _nodes + cb(err) + }), + // subscribe on the first + (cb) => nodes[0].pubsub.subscribe('pubsub', handler, cb), + // Wait a moment before publishing + (cb) => setTimeout(cb, 500), + // publish on the second + (cb) => nodes[1].pubsub.publish('pubsub', data, cb), + // Wait a moment before unsubscribing + (cb) => setTimeout(cb, 500), + // unsubscribe on the first + (cb) => nodes[0].pubsub.unsubscribe('pubsub', handler, cb), + // Stop both nodes + (cb) => stopTwo(nodes, cb) + ], (err) => { + // Verify there was no error, and mark the expect + expect(err).to.not.exist().mark() + }) }) })