136 lines
3.8 KiB
JavaScript
Raw Normal View History

2016-03-23 16:23:10 +01:00
'use strict'
const connect = require('it-ws/client')
2016-03-14 21:19:42 +00:00
const mafmt = require('mafmt')
2018-04-05 17:21:26 +01:00
const withIs = require('class-is')
const toUri = require('multiaddr-to-uri')
const { AbortError } = require('abortable-iterator')
const log = require('debug')('libp2p:websockets')
const assert = require('assert')
2016-08-11 14:50:44 +02:00
const createListener = require('./listener')
const toConnection = require('./socket-to-conn')
const { CODE_CIRCUIT, CODE_P2P } = require('./constants')
2016-03-14 20:25:00 +00:00
/**
* @class WebSockets
*/
2017-03-23 15:09:06 +00:00
class WebSockets {
/**
* @constructor
* @param {object} options
* @param {Upgrader} options.upgrader
*/
constructor ({ upgrader }) {
assert(upgrader, 'An upgrader must be provided. See https://github.com/libp2p/interface-transport#upgrader.')
this._upgrader = upgrader
}
/**
* @async
* @param {Multiaddr} ma
* @param {object} [options]
* @param {AbortSignal} [options.signal] Used to abort dial requests
* @returns {Connection} An upgraded Connection
*/
async dial (ma, options = {}) {
log('dialing %s', ma)
const socket = await this._connect(ma, options)
const maConn = toConnection(socket, { remoteAddr: ma, signal: options.signal })
log('new outbound connection %s', maConn.remoteAddr)
const conn = await this._upgrader.upgradeOutbound(maConn)
log('outbound connection %s upgraded', maConn.remoteAddr)
return conn
}
/**
* @private
* @param {Multiaddr} ma
* @param {object} [options]
* @param {AbortSignal} [options.signal] Used to abort dial requests
* @returns {Promise<WebSocket>} Resolves a extended duplex iterable on top of a WebSocket
*/
async _connect (ma, options = {}) {
if (options.signal && options.signal.aborted) {
throw new AbortError()
2016-03-14 20:25:00 +00:00
}
const cOpts = ma.toOptions()
log('dialing %s:%s', cOpts.host, cOpts.port)
const rawSocket = connect(toUri(ma), Object.assign({ binary: true }, options))
2016-03-14 20:25:00 +00:00
if (!options.signal) {
await rawSocket.connected()
log('connected %s', ma)
return rawSocket
}
// Allow abort via signal during connect
let onAbort
const abort = new Promise((resolve, reject) => {
onAbort = () => {
reject(new AbortError())
rawSocket.close()
2017-10-20 11:25:14 +01:00
}
// Already aborted?
if (options.signal.aborted) return onAbort()
options.signal.addEventListener('abort', onAbort)
})
try {
await Promise.race([abort, rawSocket.connected()])
} finally {
options.signal.removeEventListener('abort', onAbort)
}
log('connected %s', ma)
return rawSocket
2016-03-14 20:25:00 +00:00
}
/**
* Creates a Websockets listener. The provided `handler` function will be called
* anytime a new incoming Connection has been successfully upgraded via
* `upgrader.upgradeInbound`.
* @param {object} [options]
* @param {http.Server} [options.server] A pre-created Node.js HTTP/S server.
* @param {function (Connection)} handler
* @returns {Listener} A Websockets listener
*/
createListener (options = {}, handler) {
2016-03-14 20:25:00 +00:00
if (typeof options === 'function') {
handler = options
options = {}
}
return createListener({ handler, upgrader: this._upgrader }, options)
2016-03-14 20:25:00 +00:00
}
/**
* Takes a list of `Multiaddr`s and returns only valid Websockets addresses
* @param {Multiaddr[]} multiaddrs
* @returns {Multiaddr[]} Valid Websockets multiaddrs
*/
2016-08-11 14:50:44 +02:00
filter (multiaddrs) {
multiaddrs = Array.isArray(multiaddrs) ? multiaddrs : [multiaddrs]
2016-08-11 14:50:44 +02:00
2016-03-14 20:25:00 +00:00
return multiaddrs.filter((ma) => {
if (ma.protoCodes().includes(CODE_CIRCUIT)) {
return false
}
return mafmt.WebSockets.matches(ma.decapsulateCode(CODE_P2P)) ||
mafmt.WebSocketsSecure.matches(ma.decapsulateCode(CODE_P2P))
2016-03-14 20:25:00 +00:00
})
}
}
2017-03-23 15:09:06 +00:00
module.exports = withIs(WebSockets, {
className: 'WebSockets',
symbolName: '@libp2p/js-libp2p-websockets/websockets'
})