refactor: async with multiaddr conn (#92)

BREAKING CHANGE: Switch to using async/await and async iterators. The transport and connection interfaces have changed. See the README for new usage.
This commit is contained in:
Vasco Santos
2019-09-30 12:14:28 +02:00
committed by Jacob Heun
parent 5a9434bc58
commit ce7bf4f1e0
15 changed files with 530 additions and 475 deletions

8
src/constants.js Normal file
View File

@ -0,0 +1,8 @@
'use strict'
// p2p multi-address code
exports.CODE_P2P = 421
exports.CODE_CIRCUIT = 290
// Time to wait for a connection to close gracefully before destroying it manually
exports.CLOSE_TIMEOUT = 2000

View File

@ -1,68 +1,135 @@
'use strict'
const connect = require('pull-ws/client')
const connect = require('it-ws/client')
const mafmt = require('mafmt')
const withIs = require('class-is')
const Connection = require('interface-connection').Connection
const toUri = require('multiaddr-to-uri')
const debug = require('debug')
const log = debug('libp2p:websockets:dialer')
const { AbortError } = require('abortable-iterator')
const log = require('debug')('libp2p:websockets')
const assert = require('assert')
const createListener = require('./listener')
const toConnection = require('./socket-to-conn')
const { CODE_CIRCUIT, CODE_P2P } = require('./constants')
/**
* @class WebSockets
*/
class WebSockets {
dial (ma, options, callback) {
if (typeof options === 'function') {
callback = options
options = {}
}
/**
* @constructor
* @param {object} options
* @param {Upgrader} options.upgrader
*/
constructor ({ upgrader }) {
assert(upgrader, 'An upgrader must be provided. See https://github.com/libp2p/interface-transport#upgrader.')
this._upgrader = upgrader
}
callback = callback || function () { }
/**
* @async
* @param {Multiaddr} ma
* @param {object} [options]
* @param {AbortSignal} [options.signal] Used to abort dial requests
* @returns {Connection} An upgraded Connection
*/
async dial (ma, options = {}) {
log('dialing %s', ma)
const url = toUri(ma)
log('dialing %s', url)
const socket = connect(url, {
binary: true,
onConnect: (err) => {
callback(err)
}
})
const conn = new Connection(socket)
conn.getObservedAddrs = (cb) => cb(null, [ma])
conn.close = (cb) => socket.close(cb)
const socket = await this._connect(ma, options)
const maConn = toConnection(socket, { remoteAddr: ma, signal: options.signal })
log('new outbound connection %s', maConn.remoteAddr)
const conn = await this._upgrader.upgradeOutbound(maConn)
log('outbound connection %s upgraded', maConn.remoteAddr)
return conn
}
createListener (options, handler) {
/**
* @private
* @param {Multiaddr} ma
* @param {object} [options]
* @param {AbortSignal} [options.signal] Used to abort dial requests
* @returns {Promise<WebSocket>} Resolves a extended duplex iterable on top of a WebSocket
*/
async _connect (ma, options = {}) {
if (options.signal && options.signal.aborted) {
throw new AbortError()
}
const cOpts = ma.toOptions()
log('dialing %s:%s', cOpts.host, cOpts.port)
const rawSocket = connect(toUri(ma), Object.assign({ binary: true }, options))
if (!options.signal) {
await rawSocket.connected()
log('connected %s', ma)
return rawSocket
}
// Allow abort via signal during connect
let onAbort
const abort = new Promise((resolve, reject) => {
onAbort = () => {
reject(new AbortError())
rawSocket.close()
}
// Already aborted?
if (options.signal.aborted) return onAbort()
options.signal.addEventListener('abort', onAbort)
})
try {
await Promise.race([abort, rawSocket.connected()])
} finally {
options.signal.removeEventListener('abort', onAbort)
}
log('connected %s', ma)
return rawSocket
}
/**
* Creates a Websockets listener. The provided `handler` function will be called
* anytime a new incoming Connection has been successfully upgraded via
* `upgrader.upgradeInbound`.
* @param {object} [options]
* @param {http.Server} [options.server] A pre-created Node.js HTTP/S server.
* @param {function (Connection)} handler
* @returns {Listener} A Websockets listener
*/
createListener (options = {}, handler) {
if (typeof options === 'function') {
handler = options
options = {}
}
return createListener(options, handler)
return createListener({ handler, upgrader: this._upgrader }, options)
}
/**
* Takes a list of `Multiaddr`s and returns only valid Websockets addresses
* @param {Multiaddr[]} multiaddrs
* @returns {Multiaddr[]} Valid Websockets multiaddrs
*/
filter (multiaddrs) {
if (!Array.isArray(multiaddrs)) {
multiaddrs = [multiaddrs]
}
multiaddrs = Array.isArray(multiaddrs) ? multiaddrs : [multiaddrs]
return multiaddrs.filter((ma) => {
if (ma.protoNames().includes('p2p-circuit')) {
if (ma.protoCodes().includes(CODE_CIRCUIT)) {
return false
}
if (ma.protoNames().includes('ipfs')) {
ma = ma.decapsulate('ipfs')
}
return mafmt.WebSockets.matches(ma) ||
mafmt.WebSocketsSecure.matches(ma)
return mafmt.WebSockets.matches(ma.decapsulateCode(CODE_P2P)) ||
mafmt.WebSocketsSecure.matches(ma.decapsulateCode(CODE_P2P))
})
}
}
module.exports = withIs(WebSockets, { className: 'WebSockets', symbolName: '@libp2p/js-libp2p-websockets/websockets' })
module.exports = withIs(WebSockets, {
className: 'WebSockets',
symbolName: '@libp2p/js-libp2p-websockets/websockets'
})

View File

@ -1,43 +1,58 @@
'use strict'
const Connection = require('interface-connection').Connection
const multiaddr = require('multiaddr')
const EventEmitter = require('events')
const os = require('os')
const multiaddr = require('multiaddr')
const { createServer } = require('it-ws')
function noop () {}
const log = require('debug')('libp2p:websockets:listener')
const createServer = require('pull-ws/server') || noop
const toConnection = require('./socket-to-conn')
module.exports = (options, handler) => {
const listener = createServer(options, (socket) => {
socket.getObservedAddrs = (callback) => {
// TODO research if we can reuse the address in anyway
return callback(null, [])
}
module.exports = ({ handler, upgrader }, options = {}) => {
const listener = new EventEmitter()
handler(new Connection(socket))
const server = createServer(options, async (stream) => {
const maConn = toConnection(stream)
log('new inbound connection %s', maConn.remoteAddr)
const conn = await upgrader.upgradeInbound(maConn)
log('inbound connection %s upgraded', maConn.remoteAddr)
trackConn(server, maConn)
if (handler) handler(conn)
listener.emit('connection', conn)
})
server
.on('listening', () => listener.emit('listening'))
.on('error', err => listener.emit('error', err))
.on('close', () => listener.emit('close'))
// Keep track of open connections to destroy in case of timeout
server.__connections = []
let listeningMultiaddr
listener._listen = listener.listen
listener.listen = (ma, callback) => {
callback = callback || noop
listeningMultiaddr = ma
if (ma.protoNames().includes('ipfs')) {
ma = ma.decapsulate('ipfs')
}
listener._listen(ma.toOptions(), callback)
listener.close = () => {
server.__connections.forEach(maConn => maConn.close())
return server.close()
}
listener.getAddrs = (callback) => {
listener.listen = (ma) => {
listeningMultiaddr = ma
return server.listen(ma.toOptions())
}
listener.getAddrs = () => {
const multiaddrs = []
const address = listener.address()
const address = server.address()
if (!address) {
return callback(new Error('Listener is not ready yet'))
throw new Error('Listener is not ready yet')
}
const ipfsId = listeningMultiaddr.getPeerId()
@ -48,7 +63,7 @@ module.exports = (options, handler) => {
let m = listeningMultiaddr.decapsulate('tcp')
m = m.encapsulate('/tcp/' + address.port + '/ws')
if (listeningMultiaddr.getPeerId()) {
m = m.encapsulate('/ipfs/' + ipfsId)
m = m.encapsulate('/p2p/' + ipfsId)
}
if (m.toString().indexOf('0.0.0.0') !== -1) {
@ -65,8 +80,12 @@ module.exports = (options, handler) => {
}
}
callback(null, multiaddrs)
return multiaddrs
}
return listener
}
function trackConn (server, maConn) {
server.__connections.push(maConn)
}

61
src/socket-to-conn.js Normal file
View File

@ -0,0 +1,61 @@
'use strict'
const abortable = require('abortable-iterator')
const { CLOSE_TIMEOUT } = require('./constants')
const toMultiaddr = require('libp2p-utils/src/ip-port-to-multiaddr')
const pTimeout = require('p-timeout')
const debug = require('debug')
const log = debug('libp2p:websockets:socket')
log.error = debug('libp2p:websockets:socket:error')
// Convert a stream into a MultiaddrConnection
// https://github.com/libp2p/interface-transport#multiaddrconnection
module.exports = (socket, options = {}) => {
const maConn = {
async sink (source) {
if (options.signal) {
source = abortable(source, options.signal)
}
try {
await socket.sink(source)
} catch (err) {
if (err.type !== 'aborted') {
log.error(err)
}
}
},
source: options.signal ? abortable(socket.source, options.signal) : socket.source,
conn: socket,
localAddr: options.localAddr || (socket.localAddress && socket.localPort
? toMultiaddr(socket.localAddress, socket.localPort) : undefined),
// If the remote address was passed, use it - it may have the peer ID encapsulated
remoteAddr: options.remoteAddr || toMultiaddr(socket.remoteAddress, socket.remotePort),
timeline: { open: Date.now() },
async close () {
const start = Date.now()
try {
await pTimeout(socket.close(), CLOSE_TIMEOUT)
} catch (err) {
const { host, port } = maConn.remoteAddr.toOptions()
log('timeout closing socket to %s:%s after %dms, destroying it manually',
host, port, Date.now() - start)
socket.destroy()
} finally {
maConn.timeline.close = Date.now()
}
}
}
return maConn
}