From ce7bf4f1e081a8e3795f2a523273bdea0cf7b1e0 Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Mon, 30 Sep 2019 12:14:28 +0200 Subject: [PATCH] refactor: async with multiaddr conn (#92) BREAKING CHANGE: Switch to using async/await and async iterators. The transport and connection interfaces have changed. See the README for new usage. --- .aegir.js | 16 +- .gitignore | 45 +--- .travis.yml | 2 +- README.md | 36 ++- ci/Jenkinsfile | 2 - package.json | 27 ++- src/constants.js | 8 + src/index.js | 139 ++++++++--- src/listener.js | 71 ++++-- src/socket-to-conn.js | 61 +++++ test/browser.js | 79 +++--- test/compliance.node.js | 55 ++++- test/fixtures/certificate.pem | 13 + test/fixtures/key.pem | 15 ++ test/node.js | 436 +++++++++++++--------------------- 15 files changed, 530 insertions(+), 475 deletions(-) delete mode 100644 ci/Jenkinsfile create mode 100644 src/constants.js create mode 100644 src/socket-to-conn.js create mode 100644 test/fixtures/certificate.pem create mode 100644 test/fixtures/key.pem diff --git a/.aegir.js b/.aegir.js index f43cfb2..4d2f8bd 100644 --- a/.aegir.js +++ b/.aegir.js @@ -1,21 +1,25 @@ 'use strict' const multiaddr = require('multiaddr') -const pull = require('pull-stream') - +const pipe = require('it-pipe') const WS = require('./src') +const mockUpgrader = { + upgradeInbound: maConn => maConn, + upgradeOutbound: maConn => maConn +} let listener function boot (done) { - const ws = new WS() + const ws = new WS({ upgrader: mockUpgrader }) const ma = multiaddr('/ip4/127.0.0.1/tcp/9095/ws') - listener = ws.createListener((conn) => pull(conn, conn)) - listener.listen(ma, done) + listener = ws.createListener(conn => pipe(conn, conn)) + listener.listen(ma).then(() => done()).catch(done) + listener.on('error', console.error) } function shutdown (done) { - listener.close(done) + listener.close().then(done).catch(done) } module.exports = { diff --git a/.gitignore b/.gitignore index c2b6311..9faa8e0 100644 --- a/.gitignore +++ b/.gitignore @@ -1,43 +1,4 @@ -docs -package-lock.json -yarn.lock - -# Logs -logs -*.log -npm-debug.log* - -# Runtime data -pids -*.pid -*.seed - -# Directory for instrumented libs generated by jscoverage/JSCover -lib-cov - -# Coverage directory used by tools like istanbul -coverage - -# Grunt intermediate storage (http://gruntjs.com/creating-plugins#storing-task-files) -.grunt - -# node-waf configuration -.lock-wscript - -# Compiled binary addons (http://nodejs.org/api/addons.html) -build/Release - -# Dependency directory node_modules - -# Optional npm cache directory -.npm - -# Optional REPL history -.node_repl_history - -# Vim editor swap files -*.swp - -dist - +package-lock.json +coverage +.nyc_output diff --git a/.travis.yml b/.travis.yml index dba6e9d..ff17c4c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -24,7 +24,7 @@ jobs: - stage: check script: - - npx aegir commitlint --travis + - npx aegir build --bundlesize - npx aegir dep-check -- -i wrtc -i electron-webrtc - npm run lint diff --git a/README.md b/README.md index 57f4969..b99fd8a 100644 --- a/README.md +++ b/README.md @@ -40,37 +40,33 @@ ```js const WS = require('libp2p-websockets') const multiaddr = require('multiaddr') -const pull = require('pull-stream') +const pipe = require('it-pipe') +const { collect } = require('streaming-iterables') -const mh = multiaddr('/ip4/0.0.0.0/tcp/9090/ws') +const addr = multiaddr('/ip4/0.0.0.0/tcp/9090/ws') -const ws = new WS() +const ws = new WS({ upgrader }) const listener = ws.createListener((socket) => { console.log('new connection opened') - pull( - pull.values(['hello']), + pipe( + ['hello'], socket ) }) -listener.listen(mh, () => { - console.log('listening') +await listener.listen(addr) +console.log('listening') - pull( - ws.dial(mh), - pull.collect((err, values) => { - if (!err) { - console.log(`Value: ${values.toString()}`) - } else { - console.log(`Error: ${err}`) - } +const socket = await ws.dial(addr) +const values = await pipe( + socket, + collect +) +console.log(`Value: ${values.toString()}`) - // Close connection after reading - listener.close() - }), - ) -}) +// Close connection after reading +await listener.close() ``` ## API diff --git a/ci/Jenkinsfile b/ci/Jenkinsfile deleted file mode 100644 index a7da2e5..0000000 --- a/ci/Jenkinsfile +++ /dev/null @@ -1,2 +0,0 @@ -// Warning: This file is automatically synced from https://github.com/ipfs/ci-sync so if you want to change it, please change it there and ask someone to sync all repositories. -javascript() diff --git a/package.json b/package.json index 6a32f17..d795309 100644 --- a/package.json +++ b/package.json @@ -13,8 +13,7 @@ "release": "aegir release -t node -t browser ", "release-minor": "aegir release --type minor -t node -t browser", "release-major": "aegir release --type major -t node -t browser", - "coverage": "aegir coverage", - "coverage-publish": "aegir coverage --provider coveralls" + "coverage": "nyc --reporter=lcov --reporter=text npm run test:node" }, "browser": { "src/listener": "./src/listener.browser.js" @@ -24,8 +23,7 @@ "dist" ], "pre-push": [ - "lint", - "test" + "lint" ], "repository": { "type": "git", @@ -40,21 +38,26 @@ }, "homepage": "https://github.com/libp2p/js-libp2p-websockets#readme", "dependencies": { + "abortable-iterator": "^2.1.0", "class-is": "^1.1.0", "debug": "^4.1.1", - "interface-connection": "~0.3.3", - "mafmt": "^6.0.7", + "err-code": "^2.0.0", + "it-ws": "vasco-santos/it-ws#v2.1.1-rc.0", + "libp2p-utils": "~0.1.0", + "mafmt": "^7.0.0", + "multiaddr": "^7.1.0", "multiaddr-to-uri": "^5.0.0", - "pull-ws": "hugomrdias/pull-ws#fix/bundle-size" + "p-timeout": "^3.2.0" }, "devDependencies": { - "aegir": "^20.0.0", + "abort-controller": "^3.0.0", + "aegir": "^20.3.1", "chai": "^4.2.0", "dirty-chai": "^2.0.1", - "interface-transport": "~0.3.7", - "multiaddr": "^6.0.6", - "pull-goodbye": "0.0.2", - "pull-stream": "^3.6.9" + "interface-transport": "^0.7.0", + "it-goodbye": "^2.0.1", + "it-pipe": "^1.0.1", + "streaming-iterables": "^4.1.0" }, "contributors": [ "Chris Campbell ", diff --git a/src/constants.js b/src/constants.js new file mode 100644 index 0000000..b7ab8fe --- /dev/null +++ b/src/constants.js @@ -0,0 +1,8 @@ +'use strict' + +// p2p multi-address code +exports.CODE_P2P = 421 +exports.CODE_CIRCUIT = 290 + +// Time to wait for a connection to close gracefully before destroying it manually +exports.CLOSE_TIMEOUT = 2000 diff --git a/src/index.js b/src/index.js index b3053b0..e152c14 100644 --- a/src/index.js +++ b/src/index.js @@ -1,68 +1,135 @@ 'use strict' -const connect = require('pull-ws/client') +const connect = require('it-ws/client') const mafmt = require('mafmt') const withIs = require('class-is') -const Connection = require('interface-connection').Connection - const toUri = require('multiaddr-to-uri') -const debug = require('debug') -const log = debug('libp2p:websockets:dialer') +const { AbortError } = require('abortable-iterator') + +const log = require('debug')('libp2p:websockets') +const assert = require('assert') const createListener = require('./listener') +const toConnection = require('./socket-to-conn') +const { CODE_CIRCUIT, CODE_P2P } = require('./constants') +/** + * @class WebSockets + */ class WebSockets { - dial (ma, options, callback) { - if (typeof options === 'function') { - callback = options - options = {} - } + /** + * @constructor + * @param {object} options + * @param {Upgrader} options.upgrader + */ + constructor ({ upgrader }) { + assert(upgrader, 'An upgrader must be provided. See https://github.com/libp2p/interface-transport#upgrader.') + this._upgrader = upgrader + } - callback = callback || function () { } + /** + * @async + * @param {Multiaddr} ma + * @param {object} [options] + * @param {AbortSignal} [options.signal] Used to abort dial requests + * @returns {Connection} An upgraded Connection + */ + async dial (ma, options = {}) { + log('dialing %s', ma) - const url = toUri(ma) - log('dialing %s', url) - const socket = connect(url, { - binary: true, - onConnect: (err) => { - callback(err) - } - }) - - const conn = new Connection(socket) - conn.getObservedAddrs = (cb) => cb(null, [ma]) - conn.close = (cb) => socket.close(cb) + const socket = await this._connect(ma, options) + const maConn = toConnection(socket, { remoteAddr: ma, signal: options.signal }) + log('new outbound connection %s', maConn.remoteAddr) + const conn = await this._upgrader.upgradeOutbound(maConn) + log('outbound connection %s upgraded', maConn.remoteAddr) return conn } - createListener (options, handler) { + /** + * @private + * @param {Multiaddr} ma + * @param {object} [options] + * @param {AbortSignal} [options.signal] Used to abort dial requests + * @returns {Promise} Resolves a extended duplex iterable on top of a WebSocket + */ + async _connect (ma, options = {}) { + if (options.signal && options.signal.aborted) { + throw new AbortError() + } + const cOpts = ma.toOptions() + log('dialing %s:%s', cOpts.host, cOpts.port) + + const rawSocket = connect(toUri(ma), Object.assign({ binary: true }, options)) + + if (!options.signal) { + await rawSocket.connected() + + log('connected %s', ma) + return rawSocket + } + + // Allow abort via signal during connect + let onAbort + const abort = new Promise((resolve, reject) => { + onAbort = () => { + reject(new AbortError()) + rawSocket.close() + } + + // Already aborted? + if (options.signal.aborted) return onAbort() + options.signal.addEventListener('abort', onAbort) + }) + + try { + await Promise.race([abort, rawSocket.connected()]) + } finally { + options.signal.removeEventListener('abort', onAbort) + } + + log('connected %s', ma) + return rawSocket + } + + /** + * Creates a Websockets listener. The provided `handler` function will be called + * anytime a new incoming Connection has been successfully upgraded via + * `upgrader.upgradeInbound`. + * @param {object} [options] + * @param {http.Server} [options.server] A pre-created Node.js HTTP/S server. + * @param {function (Connection)} handler + * @returns {Listener} A Websockets listener + */ + createListener (options = {}, handler) { if (typeof options === 'function') { handler = options options = {} } - return createListener(options, handler) + return createListener({ handler, upgrader: this._upgrader }, options) } + /** + * Takes a list of `Multiaddr`s and returns only valid Websockets addresses + * @param {Multiaddr[]} multiaddrs + * @returns {Multiaddr[]} Valid Websockets multiaddrs + */ filter (multiaddrs) { - if (!Array.isArray(multiaddrs)) { - multiaddrs = [multiaddrs] - } + multiaddrs = Array.isArray(multiaddrs) ? multiaddrs : [multiaddrs] return multiaddrs.filter((ma) => { - if (ma.protoNames().includes('p2p-circuit')) { + if (ma.protoCodes().includes(CODE_CIRCUIT)) { return false } - if (ma.protoNames().includes('ipfs')) { - ma = ma.decapsulate('ipfs') - } - - return mafmt.WebSockets.matches(ma) || - mafmt.WebSocketsSecure.matches(ma) + return mafmt.WebSockets.matches(ma.decapsulateCode(CODE_P2P)) || + mafmt.WebSocketsSecure.matches(ma.decapsulateCode(CODE_P2P)) }) } } -module.exports = withIs(WebSockets, { className: 'WebSockets', symbolName: '@libp2p/js-libp2p-websockets/websockets' }) +module.exports = withIs(WebSockets, { + className: 'WebSockets', + symbolName: '@libp2p/js-libp2p-websockets/websockets' +}) diff --git a/src/listener.js b/src/listener.js index 7c5538b..1104a8b 100644 --- a/src/listener.js +++ b/src/listener.js @@ -1,43 +1,58 @@ 'use strict' -const Connection = require('interface-connection').Connection -const multiaddr = require('multiaddr') +const EventEmitter = require('events') const os = require('os') +const multiaddr = require('multiaddr') +const { createServer } = require('it-ws') -function noop () {} +const log = require('debug')('libp2p:websockets:listener') -const createServer = require('pull-ws/server') || noop +const toConnection = require('./socket-to-conn') -module.exports = (options, handler) => { - const listener = createServer(options, (socket) => { - socket.getObservedAddrs = (callback) => { - // TODO research if we can reuse the address in anyway - return callback(null, []) - } +module.exports = ({ handler, upgrader }, options = {}) => { + const listener = new EventEmitter() - handler(new Connection(socket)) + const server = createServer(options, async (stream) => { + const maConn = toConnection(stream) + + log('new inbound connection %s', maConn.remoteAddr) + + const conn = await upgrader.upgradeInbound(maConn) + log('inbound connection %s upgraded', maConn.remoteAddr) + + trackConn(server, maConn) + + if (handler) handler(conn) + listener.emit('connection', conn) }) + server + .on('listening', () => listener.emit('listening')) + .on('error', err => listener.emit('error', err)) + .on('close', () => listener.emit('close')) + + // Keep track of open connections to destroy in case of timeout + server.__connections = [] + let listeningMultiaddr - listener._listen = listener.listen - listener.listen = (ma, callback) => { - callback = callback || noop - listeningMultiaddr = ma - - if (ma.protoNames().includes('ipfs')) { - ma = ma.decapsulate('ipfs') - } - - listener._listen(ma.toOptions(), callback) + listener.close = () => { + server.__connections.forEach(maConn => maConn.close()) + return server.close() } - listener.getAddrs = (callback) => { + listener.listen = (ma) => { + listeningMultiaddr = ma + + return server.listen(ma.toOptions()) + } + + listener.getAddrs = () => { const multiaddrs = [] - const address = listener.address() + const address = server.address() if (!address) { - return callback(new Error('Listener is not ready yet')) + throw new Error('Listener is not ready yet') } const ipfsId = listeningMultiaddr.getPeerId() @@ -48,7 +63,7 @@ module.exports = (options, handler) => { let m = listeningMultiaddr.decapsulate('tcp') m = m.encapsulate('/tcp/' + address.port + '/ws') if (listeningMultiaddr.getPeerId()) { - m = m.encapsulate('/ipfs/' + ipfsId) + m = m.encapsulate('/p2p/' + ipfsId) } if (m.toString().indexOf('0.0.0.0') !== -1) { @@ -65,8 +80,12 @@ module.exports = (options, handler) => { } } - callback(null, multiaddrs) + return multiaddrs } return listener } + +function trackConn (server, maConn) { + server.__connections.push(maConn) +} diff --git a/src/socket-to-conn.js b/src/socket-to-conn.js new file mode 100644 index 0000000..d819995 --- /dev/null +++ b/src/socket-to-conn.js @@ -0,0 +1,61 @@ +'use strict' + +const abortable = require('abortable-iterator') +const { CLOSE_TIMEOUT } = require('./constants') +const toMultiaddr = require('libp2p-utils/src/ip-port-to-multiaddr') + +const pTimeout = require('p-timeout') + +const debug = require('debug') +const log = debug('libp2p:websockets:socket') +log.error = debug('libp2p:websockets:socket:error') + +// Convert a stream into a MultiaddrConnection +// https://github.com/libp2p/interface-transport#multiaddrconnection +module.exports = (socket, options = {}) => { + const maConn = { + async sink (source) { + if (options.signal) { + source = abortable(source, options.signal) + } + + try { + await socket.sink(source) + } catch (err) { + if (err.type !== 'aborted') { + log.error(err) + } + } + }, + + source: options.signal ? abortable(socket.source, options.signal) : socket.source, + + conn: socket, + + localAddr: options.localAddr || (socket.localAddress && socket.localPort + ? toMultiaddr(socket.localAddress, socket.localPort) : undefined), + + // If the remote address was passed, use it - it may have the peer ID encapsulated + remoteAddr: options.remoteAddr || toMultiaddr(socket.remoteAddress, socket.remotePort), + + timeline: { open: Date.now() }, + + async close () { + const start = Date.now() + + try { + await pTimeout(socket.close(), CLOSE_TIMEOUT) + } catch (err) { + const { host, port } = maConn.remoteAddr.toOptions() + log('timeout closing socket to %s:%s after %dms, destroying it manually', + host, port, Date.now() - start) + + socket.destroy() + } finally { + maConn.timeline.close = Date.now() + } + } + } + + return maConn +} diff --git a/test/browser.js b/test/browser.js index bc4e9db..b3492a6 100644 --- a/test/browser.js +++ b/test/browser.js @@ -7,75 +7,64 @@ const expect = chai.expect chai.use(dirtyChai) const multiaddr = require('multiaddr') -const pull = require('pull-stream') -const goodbye = require('pull-goodbye') +const pipe = require('it-pipe') +const goodbye = require('it-goodbye') +const { collect, take } = require('streaming-iterables') const WS = require('../src') +const mockUpgrader = { + upgradeInbound: maConn => maConn, + upgradeOutbound: maConn => maConn +} + describe('libp2p-websockets', () => { const ma = multiaddr('/ip4/127.0.0.1/tcp/9095/ws') let ws let conn - beforeEach((done) => { - ws = new WS() - expect(ws).to.exist() - conn = ws.dial(ma, (err, res) => { - expect(err).to.not.exist() - done() - }) + beforeEach(async () => { + ws = new WS({ upgrader: mockUpgrader }) + conn = await ws.dial(ma) }) - it('echo', (done) => { - const message = 'Hello World!' + it('echo', async () => { + const message = Buffer.from('Hello World!') + const s = goodbye({ source: [message], sink: collect }) - const s = goodbye({ - source: pull.values([message]), - sink: pull.collect((err, results) => { - expect(err).to.not.exist() - expect(results).to.eql([message]) - done() - }) - }) - - pull(s, conn, s) + const results = await pipe(s, conn, s) + expect(results).to.eql([message]) }) describe('stress', () => { - it('one big write', (done) => { + it('one big write', async () => { const rawMessage = Buffer.allocUnsafe(1000000).fill('a') - const s = goodbye({ - source: pull.values([rawMessage]), - sink: pull.collect((err, results) => { - expect(err).to.not.exist() - expect(results).to.eql([rawMessage]) - done() - }) - }) - pull(s, conn, s) + const s = goodbye({ source: [rawMessage], sink: collect }) + + const results = await pipe(s, conn, s) + expect(results).to.eql([rawMessage]) }) - it('many writes', function (done) { + it('many writes', async function () { this.timeout(10000) const s = goodbye({ - source: pull( - pull.infinite(), - pull.take(1000), - pull.map((val) => Buffer.from(val.toString())) + source: pipe( + { + [Symbol.iterator] () { return this }, + next: () => ({ done: false, value: Buffer.from(Math.random().toString()) }) + }, + take(20000) ), - sink: pull.collect((err, result) => { - expect(err).to.not.exist() - expect(result).to.have.length(1000) - done() - }) + sink: collect }) - pull(s, conn, s) + const result = await pipe(s, conn, s) + expect(result).to.have.length(20000) }) }) -}) -it('.createServer throws in browser', () => { - expect(new WS().createListener).to.throw() + it('.createServer throws in browser', () => { + expect(new WS({ upgrader: mockUpgrader }).createListener).to.throw() + }) }) diff --git a/test/compliance.node.js b/test/compliance.node.js index a4977d9..591026c 100644 --- a/test/compliance.node.js +++ b/test/compliance.node.js @@ -3,22 +3,61 @@ const tests = require('interface-transport') const multiaddr = require('multiaddr') +const http = require('http') const WS = require('../src') -describe('compliance', () => { +describe('interface-transport compliance', () => { tests({ - setup (callback) { - const ws = new WS() + async setup ({ upgrader }) { // eslint-disable-line require-await + const ws = new WS({ upgrader }) const addrs = [ multiaddr('/ip4/127.0.0.1/tcp/9091/ws'), - multiaddr('/ip4/127.0.0.1/tcp/9092/wss'), + multiaddr('/ip4/127.0.0.1/tcp/9092/ws'), multiaddr('/dns4/ipfs.io/tcp/9092/ws'), multiaddr('/dns4/ipfs.io/tcp/9092/wss') ] - callback(null, ws, addrs) + + let delayMs = 0 + const delayedCreateListener = (options, handler) => { + if (typeof options === 'function') { + handler = options + options = {} + } + + options = options || {} + + // A server that will delay the upgrade event by delayMs + options.server = new Proxy(http.createServer(), { + get (server, prop) { + if (prop === 'on') { + return (event, handler) => { + server.on(event, (...args) => { + if (event !== 'upgrade' || !delayMs) { + return handler(...args) + } + setTimeout(() => handler(...args), delayMs) + }) + } + } + return server[prop] + } + }) + + return ws.createListener(options, handler) + } + + const wsProxy = new Proxy(ws, { + get: (_, prop) => prop === 'createListener' ? delayedCreateListener : ws[prop] + }) + + // Used by the dial tests to simulate a delayed connect + const connector = { + delay (ms) { delayMs = ms }, + restore () { delayMs = 0 } + } + + return { transport: wsProxy, addrs, connector } }, - teardown (callback) { - callback() - } + async teardown () {} }) }) diff --git a/test/fixtures/certificate.pem b/test/fixtures/certificate.pem new file mode 100644 index 0000000..840776c --- /dev/null +++ b/test/fixtures/certificate.pem @@ -0,0 +1,13 @@ +-----BEGIN CERTIFICATE----- +MIICATCCAWoCCQDPufXH86n2QzANBgkqhkiG9w0BAQUFADBFMQswCQYDVQQGEwJu +bzETMBEGA1UECAwKU29tZS1TdGF0ZTEhMB8GA1UECgwYSW50ZXJuZXQgV2lkZ2l0 +cyBQdHkgTHRkMB4XDTEyMDEwMTE0NDQwMFoXDTIwMDMxOTE0NDQwMFowRTELMAkG +A1UEBhMCbm8xEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoMGEludGVybmV0 +IFdpZGdpdHMgUHR5IEx0ZDCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEAtrQ7 ++r//2iV/B6F+4boH0XqFn7alcV9lpjvAmwRXNKnxAoa0f97AjYPGNLKrjpkNXXhB +JROIdbRbZnCNeC5fzX1a+JCo7KStzBXuGSZr27TtFmcV4H+9gIRIcNHtZmJLnxbJ +sIhkGR8yVYdmJZe4eT5ldk1zoB1adgPF1hZhCBMCAwEAATANBgkqhkiG9w0BAQUF +AAOBgQCeWBEHYJ4mCB5McwSSUox0T+/mJ4W48L/ZUE4LtRhHasU9hiW92xZkTa7E +QLcoJKQiWfiLX2ysAro0NX4+V8iqLziMqvswnPzz5nezaOLE/9U/QvH3l8qqNkXu +rNbsW1h/IO6FV8avWFYVFoutUwOaZ809k7iMh2F2JMgXQ5EymQ== +-----END CERTIFICATE----- \ No newline at end of file diff --git a/test/fixtures/key.pem b/test/fixtures/key.pem new file mode 100644 index 0000000..3649a93 --- /dev/null +++ b/test/fixtures/key.pem @@ -0,0 +1,15 @@ +-----BEGIN RSA PRIVATE KEY----- +MIICXAIBAAKBgQC2tDv6v//aJX8HoX7hugfReoWftqVxX2WmO8CbBFc0qfEChrR/ +3sCNg8Y0squOmQ1deEElE4h1tFtmcI14Ll/NfVr4kKjspK3MFe4ZJmvbtO0WZxXg +f72AhEhw0e1mYkufFsmwiGQZHzJVh2Yll7h5PmV2TXOgHVp2A8XWFmEIEwIDAQAB +AoGAAlVY8sHi/aE+9xT77twWX3mGHV0SzdjfDnly40fx6S1Gc7bOtVdd9DC7pk6l +3ENeJVR02IlgU8iC5lMHq4JEHPE272jtPrLlrpWLTGmHEqoVFv9AITPqUDLhB9Kk +Hjl7h8NYBKbr2JHKICr3DIPKOT+RnXVb1PD4EORbJ3ooYmkCQQDfknUnVxPgxUGs +ouABw1WJIOVgcCY/IFt4Ihf6VWTsxBgzTJKxn3HtgvE0oqTH7V480XoH0QxHhjLq +DrgobWU9AkEA0TRJ8/ouXGnFEPAXjWr9GdPQRZ1Use2MrFjneH2+Sxc0CmYtwwqL +Kr5kS6mqJrxprJeluSjBd+3/ElxURrEXjwJAUvmlN1OPEhXDmRHd92mKnlkyKEeX +OkiFCiIFKih1S5Y/sRJTQ0781nyJjtJqO7UyC3pnQu1oFEePL+UEniRztQJAMfav +AtnpYKDSM+1jcp7uu9BemYGtzKDTTAYfoiNF42EzSJiGrWJDQn4eLgPjY0T0aAf/ +yGz3Z9ErbhMm/Ysl+QJBAL4kBxRT8gM4ByJw4sdOvSeCCANFq8fhbgm8pGWlCPb5 +JGmX3/GHFM8x2tbWMGpyZP1DLtiNEFz7eCGktWK5rqE= +-----END RSA PRIVATE KEY----- \ No newline at end of file diff --git a/test/node.js b/test/node.js index f6ba702..ec80930 100644 --- a/test/node.js +++ b/test/node.js @@ -2,21 +2,30 @@ /* eslint max-nested-callbacks: ["error", 6] */ 'use strict' +const https = require('https') +const fs = require('fs') + const chai = require('chai') const dirtyChai = require('dirty-chai') const expect = chai.expect chai.use(dirtyChai) const multiaddr = require('multiaddr') -const pull = require('pull-stream') -const goodbye = require('pull-goodbye') +const goodbye = require('it-goodbye') +const { collect } = require('streaming-iterables') +const pipe = require('it-pipe') const WS = require('../src') require('./compliance.node') +const mockUpgrader = { + upgradeInbound: maConn => maConn, + upgradeOutbound: maConn => maConn +} + describe('instantiate the transport', () => { it('create', () => { - const ws = new WS() + const ws = new WS({ upgrader: mockUpgrader }) expect(ws).to.exist() }) }) @@ -27,22 +36,21 @@ describe('listen', () => { const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws') beforeEach(() => { - ws = new WS() + ws = new WS({ upgrader: mockUpgrader }) }) - it('listen, check for callback', (done) => { + it('listen, check for promise', async () => { const listener = ws.createListener((conn) => { }) - - listener.listen(ma, () => { - listener.close(done) - }) + await listener.listen(ma) + await listener.close() }) it('listen, check for listening event', (done) => { const listener = ws.createListener((conn) => { }) - listener.on('listening', () => { - listener.close(done) + listener.on('listening', async () => { + await listener.close() + done() }) listener.listen(ma) @@ -59,14 +67,12 @@ describe('listen', () => { listener.listen(ma) }) - it('listen on addr with /ipfs/QmHASH', (done) => { + it('listen on addr with /ipfs/QmHASH', async () => { const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - const listener = ws.createListener((conn) => { }) - listener.listen(ma, () => { - listener.close(done) - }) + await listener.listen(ma) + await listener.close() }) it.skip('close listener with connections, through timeout', (done) => { @@ -82,73 +88,53 @@ describe('listen', () => { // TODO 0.0.0.0 not supported yet }) - it('getAddrs', (done) => { - const listener = ws.createListener((conn) => { - }) - listener.listen(ma, () => { - listener.getAddrs((err, addrs) => { - expect(err).to.not.exist() - expect(addrs.length).to.equal(1) - expect(addrs[0]).to.deep.equal(ma) - listener.close(done) - }) - }) + it('getAddrs', async () => { + const listener = ws.createListener((conn) => { }) + await listener.listen(ma) + const addrs = await listener.getAddrs() + expect(addrs.length).to.equal(1) + expect(addrs[0]).to.deep.equal(ma) + await listener.close() }) - it('getAddrs on port 0 listen', (done) => { - const addr = multiaddr(`/ip4/127.0.0.1/tcp/0/ws`) - const listener = ws.createListener((conn) => { - }) - listener.listen(addr, () => { - listener.getAddrs((err, addrs) => { - expect(err).to.not.exist() - expect(addrs.length).to.equal(1) - expect(addrs.map((a) => a.toOptions().port)).to.not.include('0') - listener.close(done) - }) - }) + it('getAddrs on port 0 listen', async () => { + const addr = multiaddr('/ip4/127.0.0.1/tcp/0/ws') + const listener = ws.createListener((conn) => { }) + await listener.listen(addr) + const addrs = await listener.getAddrs() + expect(addrs.length).to.equal(1) + expect(addrs.map((a) => a.toOptions().port)).to.not.include('0') + await listener.close() }) - it('getAddrs from listening on 0.0.0.0', (done) => { - const addr = multiaddr(`/ip4/0.0.0.0/tcp/9003/ws`) - const listener = ws.createListener((conn) => { - }) - listener.listen(addr, () => { - listener.getAddrs((err, addrs) => { - expect(err).to.not.exist() - expect(addrs.map((a) => a.toOptions().host)).to.not.include('0.0.0.0') - listener.close(done) - }) - }) + it('getAddrs from listening on 0.0.0.0', async () => { + const addr = multiaddr('/ip4/0.0.0.0/tcp/9003/ws') + const listener = ws.createListener((conn) => { }) + await listener.listen(addr) + const addrs = await listener.getAddrs() + expect(addrs.map((a) => a.toOptions().host)).to.not.include('0.0.0.0') + await listener.close() }) - it('getAddrs from listening on 0.0.0.0 and port 0', (done) => { - const addr = multiaddr(`/ip4/0.0.0.0/tcp/0/ws`) - const listener = ws.createListener((conn) => { - }) - listener.listen(addr, () => { - listener.getAddrs((err, addrs) => { - expect(err).to.not.exist() - expect(addrs.map((a) => a.toOptions().host)).to.not.include('0.0.0.0') - expect(addrs.map((a) => a.toOptions().port)).to.not.include('0') - listener.close(done) - }) - }) + it('getAddrs from listening on 0.0.0.0 and port 0', async () => { + const addr = multiaddr('/ip4/0.0.0.0/tcp/0/ws') + const listener = ws.createListener((conn) => { }) + await listener.listen(addr) + const addrs = await listener.getAddrs() + expect(addrs.map((a) => a.toOptions().host)).to.not.include('0.0.0.0') + expect(addrs.map((a) => a.toOptions().port)).to.not.include('0') + await listener.close() }) - it('getAddrs preserves IPFS Id', (done) => { - const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - + it('getAddrs preserves p2p Id', async () => { + const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws/p2p/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') const listener = ws.createListener((conn) => { }) - listener.listen(ma, () => { - listener.getAddrs((err, addrs) => { - expect(err).to.not.exist() - expect(addrs.length).to.equal(1) - expect(addrs[0]).to.deep.equal(ma) - listener.close(done) - }) - }) + await listener.listen(ma) + const addrs = await listener.getAddrs() + expect(addrs.length).to.equal(1) + expect(addrs[0]).to.deep.equal(ma) + await listener.close() }) }) @@ -157,22 +143,21 @@ describe('listen', () => { const ma = multiaddr('/ip6/::1/tcp/9091/ws') beforeEach(() => { - ws = new WS() + ws = new WS({ upgrader: mockUpgrader }) }) - it('listen, check for callback', (done) => { + it('listen, check for promise', async () => { const listener = ws.createListener((conn) => { }) - - listener.listen(ma, () => { - listener.close(done) - }) + await listener.listen(ma) + await listener.close() }) it('listen, check for listening event', (done) => { const listener = ws.createListener((conn) => { }) - listener.on('listening', () => { - listener.close(done) + listener.on('listening', async () => { + await listener.close() + done() }) listener.listen(ma) @@ -189,14 +174,11 @@ describe('listen', () => { listener.listen(ma) }) - it('listen on addr with /ipfs/QmHASH', (done) => { + it('listen on addr with /ipfs/QmHASH', async () => { const ma = multiaddr('/ip6/::1/tcp/9091/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - const listener = ws.createListener((conn) => { }) - - listener.listen(ma, () => { - listener.close(done) - }) + await listener.listen(ma) + await listener.close() }) }) }) @@ -207,49 +189,86 @@ describe('dial', () => { let listener const ma = multiaddr('/ip4/127.0.0.1/tcp/9091/ws') - beforeEach((done) => { - ws = new WS() - listener = ws.createListener((conn) => { - pull(conn, conn) - }) - listener.listen(ma, done) + beforeEach(() => { + ws = new WS({ upgrader: mockUpgrader }) + listener = ws.createListener(conn => pipe(conn, conn)) + return listener.listen(ma) }) - afterEach((done) => { - listener.close(done) + afterEach(() => listener.close()) + + it('dial', async () => { + const conn = await ws.dial(ma) + const s = goodbye({ source: ['hey'], sink: collect }) + + const result = await pipe(s, conn, s) + + expect(result).to.be.eql([Buffer.from('hey')]) }) - it('dial', (done) => { - const conn = ws.dial(ma) + it('dial with p2p Id', async () => { + const ma = multiaddr('/ip4/127.0.0.1/tcp/9091/ws/p2p/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + const conn = await ws.dial(ma) + const s = goodbye({ source: ['hey'], sink: collect }) - const s = goodbye({ - source: pull.values(['hey']), - sink: pull.collect((err, result) => { - expect(err).to.not.exist() + const result = await pipe(s, conn, s) - expect(result).to.be.eql(['hey']) - done() - }) - }) - - pull(s, conn, s) + expect(result).to.be.eql([Buffer.from('hey')]) }) - it('dial with IPFS Id', (done) => { - const ma = multiaddr('/ip4/127.0.0.1/tcp/9091/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - const conn = ws.dial(ma) + it('should resolve port 0', async () => { + const ma = multiaddr('/ip4/127.0.0.1/tcp/0/ws') + const ws = new WS({ upgrader: mockUpgrader }) - const s = goodbye({ - source: pull.values(['hey']), - sink: pull.collect((err, result) => { - expect(err).to.not.exist() + // Create a Promise that resolves when a connection is handled + let handled + const handlerPromise = new Promise(resolve => { handled = resolve }) + const handler = conn => handled(conn) - expect(result).to.be.eql(['hey']) - done() - }) - }) + const listener = ws.createListener(handler) - pull(s, conn, s) + // Listen on the multiaddr + await listener.listen(ma) + + const localAddrs = listener.getAddrs() + expect(localAddrs.length).to.equal(1) + + // Dial to that address + await ws.dial(localAddrs[0]) + + // Wait for the incoming dial to be handled + await handlerPromise + + // close the listener + await listener.close() + }) + }) + + describe('ip4 with wss', () => { + let ws + let listener + const ma = multiaddr('/ip4/127.0.0.1/tcp/9091/wss') + + const server = https.createServer({ + cert: fs.readFileSync('./test/fixtures/certificate.pem'), + key: fs.readFileSync('./test/fixtures/key.pem') + }) + + beforeEach(() => { + ws = new WS({ upgrader: mockUpgrader }) + listener = ws.createListener({ server }, conn => pipe(conn, conn)) + return listener.listen(ma) + }) + + afterEach(() => listener.close()) + + it('dial', async () => { + const conn = await ws.dial(ma, { websocket: { rejectUnauthorized: false } }) + const s = goodbye({ source: ['hey'], sink: collect }) + + const result = await pipe(s, conn, s) + + expect(result).to.be.eql([Buffer.from('hey')]) }) }) @@ -258,49 +277,34 @@ describe('dial', () => { let listener const ma = multiaddr('/ip6/::1/tcp/9091') - beforeEach((done) => { - ws = new WS() - listener = ws.createListener((conn) => { - pull(conn, conn) - }) - listener.listen(ma, done) + beforeEach(() => { + ws = new WS({ upgrader: mockUpgrader }) + listener = ws.createListener(conn => pipe(conn, conn)) + return listener.listen(ma) }) - afterEach((done) => { - listener.close(done) + afterEach(() => listener.close()) + + it('dial', async () => { + const conn = await ws.dial(ma) + const s = goodbye({ source: ['hey'], sink: collect }) + + const result = await pipe(s, conn, s) + + expect(result).to.be.eql([Buffer.from('hey')]) }) - it('dial', (done) => { - const conn = ws.dial(ma) + it('dial with p2p Id', async () => { + const ma = multiaddr('/ip6/::1/tcp/9091/ws/p2p/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + const conn = await ws.dial(ma) const s = goodbye({ - source: pull.values(['hey']), - sink: pull.collect((err, result) => { - expect(err).to.not.exist() - - expect(result).to.be.eql(['hey']) - done() - }) + source: ['hey'], + sink: collect }) - pull(s, conn, s) - }) - - it('dial with IPFS Id', (done) => { - const ma = multiaddr('/ip6/::1/tcp/9091/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') - const conn = ws.dial(ma) - - const s = goodbye({ - source: pull.values(['hey']), - sink: pull.collect((err, result) => { - expect(err).to.not.exist() - - expect(result).to.be.eql(['hey']) - done() - }) - }) - - pull(s, conn, s) + const result = await pipe(s, conn, s) + expect(result).to.be.eql([Buffer.from('hey')]) }) }) }) @@ -309,7 +313,7 @@ describe('filter addrs', () => { let ws before(() => { - ws = new WS() + ws = new WS({ upgrader: mockUpgrader }) }) describe('filter valid addrs for this transport', function () { @@ -439,125 +443,3 @@ describe('filter addrs', () => { done() }) }) - -describe('valid Connection', () => { - const ma = multiaddr('/ip4/127.0.0.1/tcp/9092/ws') - - it('get observed addrs', (done) => { - let dialerObsAddrs - let listenerObsAddrs - - const ws = new WS() - - const listener = ws.createListener((conn) => { - expect(conn).to.exist() - - conn.getObservedAddrs((err, addrs) => { - expect(err).to.not.exist() - dialerObsAddrs = addrs - }) - - pull(conn, conn) - }) - - listener.listen(ma, () => { - const conn = ws.dial(ma) - - pull( - pull.empty(), - conn, - pull.onEnd(onEnd) - ) - - function onEnd () { - conn.getObservedAddrs((err, addrs) => { - expect(err).to.not.exist() - listenerObsAddrs = addrs - - listener.close(onClose) - - function onClose () { - expect(listenerObsAddrs[0]).to.deep.equal(ma) - expect(dialerObsAddrs.length).to.equal(0) - done() - } - }) - } - }) - }) - - it('get Peer Info', (done) => { - const ws = new WS() - - const listener = ws.createListener((conn) => { - expect(conn).to.exist() - - conn.getPeerInfo((err, peerInfo) => { - expect(err).to.exist() - }) - - pull(conn, conn) - }) - - listener.listen(ma, () => { - const conn = ws.dial(ma) - - pull( - pull.empty(), - conn, - pull.onEnd(onEnd) - ) - - function onEnd () { - conn.getPeerInfo((err, peerInfo) => { - expect(err).to.exist() - listener.close(done) - }) - } - }) - }) - - it('set Peer Info', (done) => { - const ws = new WS() - - const listener = ws.createListener((conn) => { - expect(conn).to.exist() - conn.setPeerInfo('a') - - conn.getPeerInfo((err, peerInfo) => { - expect(err).to.not.exist() - expect(peerInfo).to.equal('a') - }) - - pull(conn, conn) - }) - - listener.listen(ma, onListen) - - function onListen () { - const conn = ws.dial(ma) - conn.setPeerInfo('b') - - pull( - pull.empty(), - conn, - pull.onEnd(onEnd) - ) - - function onEnd () { - conn.getPeerInfo((err, peerInfo) => { - expect(err).to.not.exist() - expect(peerInfo).to.equal('b') - listener.close(done) - }) - } - } - }) -}) - -describe.skip('turbolence', () => { - it('dialer - emits error on the other end is terminated abruptly', (done) => { - }) - it('listener - emits error on the other end is terminated abruptly', (done) => { - }) -})