fix: pubsub promisify (#456)

* fix: allow pubsub sub/unsub via promises

* chore: fix linting errors
This commit is contained in:
Alex Potsides
2019-09-24 13:02:07 +01:00
committed by Jacob Heun
parent 2a80618740
commit ae6af20e8e
17 changed files with 196 additions and 149 deletions

View File

@ -153,7 +153,7 @@ class Dialer {
const relays = Array.from(this.relayPeers.values())
const next = (nextRelay) => {
if (!nextRelay) {
const err = `no relay peers were found or all relays failed to dial`
const err = 'no relay peers were found or all relays failed to dial'
log.err(err)
return cb(err)
}
@ -235,7 +235,7 @@ class Dialer {
}
const message = proto.CircuitRelay.decode(msg)
if (message.type !== proto.CircuitRelay.Type.STATUS) {
return callback(new Error(`Got invalid message type - ` +
return callback(new Error('Got invalid message type - ' +
`expected ${proto.CircuitRelay.Type.STATUS} got ${message.type}`))
}

View File

@ -203,7 +203,7 @@ class Hop extends EE {
const message = proto.decode(msg)
if (message.code !== proto.Status.SUCCESS) {
return callback(new Error(`Unable to create circuit!`))
return callback(new Error('Unable to create circuit!'))
}
return callback(null, msg)

View File

@ -49,7 +49,7 @@ class StreamHandler {
*/
read (cb) {
if (!this.isValid()) {
return cb(new Error(`handler is not in a valid state`))
return cb(new Error('handler is not in a valid state'))
}
lp.decodeFromReader(
@ -77,7 +77,7 @@ class StreamHandler {
cb = cb || (() => {})
if (!this.isValid()) {
return cb(new Error(`handler is not in a valid state`))
return cb(new Error('handler is not in a valid state'))
}
pull(

View File

@ -132,10 +132,10 @@ module.exports = (swarm, options, connHandler) => {
if (!mafmt.Circuit.matches(addr)) {
if (addr.getPeerId()) {
// by default we're reachable over any relay
listenAddrs.push(multiaddr(`/p2p-circuit`).encapsulate(addr))
listenAddrs.push(multiaddr('/p2p-circuit').encapsulate(addr))
} else {
const ma = `${addr}/ipfs/${swarm._peerInfo.id.toB58String()}`
listenAddrs.push(multiaddr(`/p2p-circuit`).encapsulate(ma))
listenAddrs.push(multiaddr('/p2p-circuit').encapsulate(ma))
}
} else {
listenAddrs.push(addr.encapsulate(`/ipfs/${swarm._peerInfo.id.toB58String()}`))

View File

@ -32,27 +32,35 @@ module.exports = (node, Pubsub, config) => {
* const handler = (message) => { }
* libp2p.subscribe(topic, handler, callback)
*/
subscribe: promisify((topic, handler, options, callback) => {
subscribe: (topic, handler, options, callback) => {
// can't use promisify because it thinks the handler is a callback
if (typeof options === 'function') {
callback = options
options = {}
}
if (!node.isStarted() && !pubsub.started) {
return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED))
}
const err = errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)
function subscribe (cb) {
if (pubsub.listenerCount(topic) === 0) {
pubsub.subscribe(topic)
if (callback) {
return nextTick(() => callback(err))
}
pubsub.on(topic, handler)
nextTick(cb)
return Promise.reject(err)
}
subscribe(callback)
}),
if (pubsub.listenerCount(topic) === 0) {
pubsub.subscribe(topic)
}
pubsub.on(topic, handler)
if (callback) {
return nextTick(() => callback())
}
return Promise.resolve()
},
/**
* Unsubscribes from a pubsub topic
@ -76,9 +84,16 @@ module.exports = (node, Pubsub, config) => {
*
* libp2p.unsubscribe(topic, handler, callback)
*/
unsubscribe: promisify((topic, handler, callback) => {
unsubscribe: (topic, handler, callback) => {
// can't use promisify because it thinks the handler is a callback
if (!node.isStarted() && !pubsub.started) {
return nextTick(callback, errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED))
const err = errCode(new Error(messages.NOT_STARTED_YET), codes.PUBSUB_NOT_STARTED)
if (callback) {
return nextTick(() => callback(err))
}
return Promise.reject(err)
}
if (!handler) {
@ -91,12 +106,12 @@ module.exports = (node, Pubsub, config) => {
pubsub.unsubscribe(topic)
}
if (typeof callback === 'function') {
if (callback) {
return nextTick(() => callback())
}
return Promise.resolve()
}),
},
publish: promisify((topic, data, callback) => {
if (!node.isStarted() && !pubsub.started) {

View File

@ -5,7 +5,7 @@ const IncomingConnection = require('./incoming')
const observeConn = require('../observe-connection')
function listener (_switch) {
const log = debug(`libp2p:switch:listener`)
const log = debug('libp2p:switch:listener')
/**
* Takes a transport key and returns a connection handler function

View File

@ -41,7 +41,7 @@ module.exports = function protocolMuxer (protocols, observer) {
ms.handle(parentConn, (err) => {
if (err) {
log.error(`multistream handshake failed`, err)
log.error('multistream handshake failed', err)
}
})
}