From 4321b7e89a2c7da55af1a1ec3bac46643a68613d Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Mon, 1 Apr 2019 14:27:25 +0100 Subject: [PATCH] feat: adaper with passing tests --- package.json | 3 +- src/adapter.js | 73 +++++ src/index.js | 17 +- test/adapter/compliance.node.js | 24 ++ test/adapter/node.js | 563 ++++++++++++++++++++++++++++++++ 5 files changed, 670 insertions(+), 10 deletions(-) create mode 100644 src/adapter.js create mode 100644 test/adapter/compliance.node.js create mode 100644 test/adapter/node.js diff --git a/package.json b/package.json index 66f043a..8badf4b 100644 --- a/package.json +++ b/package.json @@ -40,10 +40,11 @@ }, "homepage": "https://github.com/libp2p/js-libp2p-websockets#readme", "dependencies": { + "async-iterator-to-pull-stream": "^1.3.0", "class-is": "^1.1.0", "debug": "^4.1.1", "interface-connection": "~0.3.2", - "it-ws": "^1.0.0", + "it-ws": "^2.1.0", "mafmt": "^6.0.4", "multiaddr-to-uri": "^4.0.1" }, diff --git a/src/adapter.js b/src/adapter.js new file mode 100644 index 0000000..15015b9 --- /dev/null +++ b/src/adapter.js @@ -0,0 +1,73 @@ +'use strict' + +const { Connection } = require('interface-connection') +const withIs = require('class-is') +const toPull = require('async-iterator-to-pull-stream') +const WebSockets = require('./') +const noop = () => {} + +function callbackify (fn) { + return async function (...args) { + let cb = args.pop() + if (typeof cb !== 'function') { + args.push(cb) + cb = noop + } + let res + try { + res = await fn(...args) + } catch (err) { + return cb(err) + } + cb(null, res) + } +} + +// Legacy adapter to old transport & connection interface +class WebSocketsAdapter extends WebSockets { + dial (ma, options, callback) { + if (typeof options === 'function') { + callback = options + options = {} + } + + callback = callback || noop + + const socket = super.dial(ma, options) + const conn = new Connection(toPull.duplex(socket)) + + conn.getObservedAddrs = callbackify(socket.getObservedAddrs.bind(socket)) + conn.close = callbackify(socket.close.bind(socket)) + + socket.connected().then(callback).catch(callback) + + return conn + } + + createListener (options, handler) { + if (typeof options === 'function') { + handler = options + options = {} + } + + const server = super.createListener(options, socket => { + const conn = new Connection(toPull.duplex(socket)) + conn.getObservedAddrs = callbackify(socket.getObservedAddrs.bind(socket)) + handler(conn) + }) + + const proxy = { + listen: callbackify(server.listen.bind(server)), + close: callbackify(server.close.bind(server)), + getAddrs: callbackify(server.getAddrs.bind(server)), + getObservedAddrs: callbackify(() => server.getObservedAddrs()) + } + + return new Proxy(server, { get: (_, prop) => proxy[prop] || server[prop] }) + } +} + +module.exports = withIs(WebSocketsAdapter, { + className: 'WebSockets', + symbolName: '@libp2p/js-libp2p-websockets/websockets' +}) diff --git a/src/index.js b/src/index.js index e6b2947..0237dc6 100644 --- a/src/index.js +++ b/src/index.js @@ -9,10 +9,9 @@ const log = require('debug')('libp2p:websockets:transport') const createListener = require('./listener') class WebSockets { - async dial (ma, options) { + dial (ma, options) { log('dialing %s', ma) - const url = toUri(ma) - const socket = connect(url, { binary: true }) + const socket = connect(toUri(ma), { binary: true }) socket.getObservedAddrs = () => [ma] log('connected %s', ma) return socket @@ -23,9 +22,7 @@ class WebSockets { } filter (multiaddrs) { - if (!Array.isArray(multiaddrs)) { - multiaddrs = [multiaddrs] - } + multiaddrs = Array.isArray(multiaddrs) ? multiaddrs : [multiaddrs] return multiaddrs.filter((ma) => { if (ma.protoNames().includes('p2p-circuit')) { @@ -36,10 +33,12 @@ class WebSockets { ma = ma.decapsulate('ipfs') } - return mafmt.WebSockets.matches(ma) || - mafmt.WebSocketsSecure.matches(ma) + return mafmt.WebSockets.matches(ma) || mafmt.WebSocketsSecure.matches(ma) }) } } -module.exports = withIs(WebSockets, { className: 'WebSockets', symbolName: '@libp2p/js-libp2p-websockets/websockets' }) +module.exports = withIs(WebSockets, { + className: 'WebSockets', + symbolName: '@libp2p/js-libp2p-websockets/websockets' +}) diff --git a/test/adapter/compliance.node.js b/test/adapter/compliance.node.js new file mode 100644 index 0000000..e940090 --- /dev/null +++ b/test/adapter/compliance.node.js @@ -0,0 +1,24 @@ +/* eslint-env mocha */ +'use strict' + +const tests = require('interface-transport') +const multiaddr = require('multiaddr') +const WS = require('../../src/adapter') + +describe('compliance', () => { + tests({ + setup (callback) { + let ws = new WS() + const addrs = [ + multiaddr('/ip4/127.0.0.1/tcp/9091/ws'), + multiaddr('/ip4/127.0.0.1/tcp/9092/wss'), + multiaddr('/dns4/ipfs.io/tcp/9092/ws'), + multiaddr('/dns4/ipfs.io/tcp/9092/wss') + ] + callback(null, ws, addrs) + }, + teardown (callback) { + callback() + } + }) +}) diff --git a/test/adapter/node.js b/test/adapter/node.js new file mode 100644 index 0000000..7537ba9 --- /dev/null +++ b/test/adapter/node.js @@ -0,0 +1,563 @@ +/* eslint-env mocha */ +/* eslint max-nested-callbacks: ["error", 6] */ +'use strict' + +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const expect = chai.expect +chai.use(dirtyChai) +const multiaddr = require('multiaddr') +const pull = require('pull-stream') +const goodbye = require('pull-goodbye') + +const WS = require('../../src/adapter') + +require('./compliance.node') + +describe('instantiate the transport', () => { + it('create', () => { + const ws = new WS() + expect(ws).to.exist() + }) +}) + +describe('listen', () => { + describe('ip4', () => { + let ws + const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws') + + beforeEach(() => { + ws = new WS() + }) + + it('listen, check for callback', (done) => { + const listener = ws.createListener((conn) => { }) + + listener.listen(ma, () => { + listener.close(done) + }) + }) + + it('listen, check for listening event', (done) => { + const listener = ws.createListener((conn) => { }) + + listener.on('listening', () => { + listener.close(done) + }) + + listener.listen(ma) + }) + + it('listen, check for the close event', (done) => { + const listener = ws.createListener((conn) => { }) + + listener.on('listening', () => { + listener.on('close', done) + listener.close() + }) + + listener.listen(ma) + }) + + it('listen on addr with /ipfs/QmHASH', (done) => { + const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + + const listener = ws.createListener((conn) => { }) + + listener.listen(ma, () => { + listener.close(done) + }) + }) + + it.skip('close listener with connections, through timeout', (done) => { + // TODO `ws` closes all anyway, we need to make it not close + // first - https://github.com/diasdavid/simple-websocket-server + }) + + it.skip('listen on port 0', (done) => { + // TODO port 0 not supported yet + }) + + it.skip('listen on any Interface', (done) => { + // TODO 0.0.0.0 not supported yet + }) + + it('getAddrs', (done) => { + const listener = ws.createListener((conn) => { + }) + listener.listen(ma, () => { + listener.getAddrs((err, addrs) => { + expect(err).to.not.exist() + expect(addrs.length).to.equal(1) + expect(addrs[0]).to.deep.equal(ma) + listener.close(done) + }) + }) + }) + + it('getAddrs on port 0 listen', (done) => { + const addr = multiaddr(`/ip4/127.0.0.1/tcp/0/ws`) + const listener = ws.createListener((conn) => { + }) + listener.listen(addr, () => { + listener.getAddrs((err, addrs) => { + expect(err).to.not.exist() + expect(addrs.length).to.equal(1) + expect(addrs.map((a) => a.toOptions().port)).to.not.include('0') + listener.close(done) + }) + }) + }) + + it('getAddrs from listening on 0.0.0.0', (done) => { + const addr = multiaddr(`/ip4/0.0.0.0/tcp/9003/ws`) + const listener = ws.createListener((conn) => { + }) + listener.listen(addr, () => { + listener.getAddrs((err, addrs) => { + expect(err).to.not.exist() + expect(addrs.map((a) => a.toOptions().host)).to.not.include('0.0.0.0') + listener.close(done) + }) + }) + }) + + it('getAddrs from listening on 0.0.0.0 and port 0', (done) => { + const addr = multiaddr(`/ip4/0.0.0.0/tcp/0/ws`) + const listener = ws.createListener((conn) => { + }) + listener.listen(addr, () => { + listener.getAddrs((err, addrs) => { + expect(err).to.not.exist() + expect(addrs.map((a) => a.toOptions().host)).to.not.include('0.0.0.0') + expect(addrs.map((a) => a.toOptions().port)).to.not.include('0') + listener.close(done) + }) + }) + }) + + it('getAddrs preserves IPFS Id', (done) => { + const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + + const listener = ws.createListener((conn) => { }) + + listener.listen(ma, () => { + listener.getAddrs((err, addrs) => { + expect(err).to.not.exist() + expect(addrs.length).to.equal(1) + expect(addrs[0]).to.deep.equal(ma) + listener.close(done) + }) + }) + }) + }) + + describe('ip6', () => { + let ws + const ma = multiaddr('/ip6/::1/tcp/9091/ws') + + beforeEach(() => { + ws = new WS() + }) + + it('listen, check for callback', (done) => { + const listener = ws.createListener((conn) => { }) + + listener.listen(ma, () => { + listener.close(done) + }) + }) + + it('listen, check for listening event', (done) => { + const listener = ws.createListener((conn) => { }) + + listener.on('listening', () => { + listener.close(done) + }) + + listener.listen(ma) + }) + + it('listen, check for the close event', (done) => { + const listener = ws.createListener((conn) => { }) + + listener.on('listening', () => { + listener.on('close', done) + listener.close() + }) + + listener.listen(ma) + }) + + it('listen on addr with /ipfs/QmHASH', (done) => { + const ma = multiaddr('/ip6/::1/tcp/9091/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + + const listener = ws.createListener((conn) => { }) + + listener.listen(ma, () => { + listener.close(done) + }) + }) + }) +}) + +describe('dial', () => { + describe('ip4', () => { + let ws + let listener + const ma = multiaddr('/ip4/127.0.0.1/tcp/9091/ws') + + beforeEach((done) => { + ws = new WS() + listener = ws.createListener((conn) => { + pull(conn, conn) + }) + listener.listen(ma, done) + }) + + afterEach((done) => { + listener.close(done) + }) + + it('dial', (done) => { + const conn = ws.dial(ma) + + const s = goodbye({ + source: pull.values(['hey']), + sink: pull.collect((err, result) => { + expect(err).to.not.exist() + + expect(result).to.be.eql(['hey']) + done() + }) + }) + + pull(s, conn, s) + }) + + it('dial with IPFS Id', (done) => { + const ma = multiaddr('/ip4/127.0.0.1/tcp/9091/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + const conn = ws.dial(ma) + + const s = goodbye({ + source: pull.values(['hey']), + sink: pull.collect((err, result) => { + expect(err).to.not.exist() + + expect(result).to.be.eql(['hey']) + done() + }) + }) + + pull(s, conn, s) + }) + }) + + describe('ip6', () => { + let ws + let listener + const ma = multiaddr('/ip6/::1/tcp/9091') + + beforeEach((done) => { + ws = new WS() + listener = ws.createListener((conn) => { + pull(conn, conn) + }) + listener.listen(ma, done) + }) + + afterEach((done) => { + listener.close(done) + }) + + it('dial', (done) => { + const conn = ws.dial(ma) + + const s = goodbye({ + source: pull.values(['hey']), + sink: pull.collect((err, result) => { + expect(err).to.not.exist() + + expect(result).to.be.eql(['hey']) + done() + }) + }) + + pull(s, conn, s) + }) + + it('dial with IPFS Id', (done) => { + const ma = multiaddr('/ip6/::1/tcp/9091/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + const conn = ws.dial(ma) + + const s = goodbye({ + source: pull.values(['hey']), + sink: pull.collect((err, result) => { + expect(err).to.not.exist() + + expect(result).to.be.eql(['hey']) + done() + }) + }) + + pull(s, conn, s) + }) + }) +}) + +describe('filter addrs', () => { + let ws + + before(() => { + ws = new WS() + }) + + describe('filter valid addrs for this transport', function () { + it('should fail invalid WS addresses', function () { + const ma1 = multiaddr('/ip4/127.0.0.1/tcp/9090') + const ma2 = multiaddr('/ip4/127.0.0.1/udp/9090') + const ma3 = multiaddr('/ip6/::1/tcp/80') + const ma4 = multiaddr('/dnsaddr/ipfs.io/tcp/80') + + const valid = ws.filter([ma1, ma2, ma3, ma4]) + expect(valid.length).to.equal(0) + }) + + it('should filter correct ipv4 addresses', function () { + const ma1 = multiaddr('/ip4/127.0.0.1/tcp/80/ws') + const ma2 = multiaddr('/ip4/127.0.0.1/tcp/443/wss') + + const valid = ws.filter([ma1, ma2]) + expect(valid.length).to.equal(2) + expect(valid[0]).to.deep.equal(ma1) + expect(valid[1]).to.deep.equal(ma2) + }) + + it('should filter correct ipv4 addresses with ipfs id', function () { + const ma1 = multiaddr('/ip4/127.0.0.1/tcp/80/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + const ma2 = multiaddr('/ip4/127.0.0.1/tcp/80/wss/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + + const valid = ws.filter([ma1, ma2]) + expect(valid.length).to.equal(2) + expect(valid[0]).to.deep.equal(ma1) + expect(valid[1]).to.deep.equal(ma2) + }) + + it('should filter correct ipv6 address', function () { + const ma1 = multiaddr('/ip6/::1/tcp/80/ws') + const ma2 = multiaddr('/ip6/::1/tcp/443/wss') + + const valid = ws.filter([ma1, ma2]) + expect(valid.length).to.equal(2) + expect(valid[0]).to.deep.equal(ma1) + expect(valid[1]).to.deep.equal(ma2) + }) + + it('should filter correct ipv6 addresses with ipfs id', function () { + const ma1 = multiaddr('/ip6/::1/tcp/80/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + const ma2 = multiaddr('/ip6/::1/tcp/443/wss/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + + const valid = ws.filter([ma1, ma2]) + expect(valid.length).to.equal(2) + expect(valid[0]).to.deep.equal(ma1) + expect(valid[1]).to.deep.equal(ma2) + }) + + it('should filter correct dns address', function () { + const ma1 = multiaddr('/dnsaddr/ipfs.io/ws') + const ma2 = multiaddr('/dnsaddr/ipfs.io/tcp/80/ws') + const ma3 = multiaddr('/dnsaddr/ipfs.io/tcp/80/wss') + + const valid = ws.filter([ma1, ma2, ma3]) + expect(valid.length).to.equal(3) + expect(valid[0]).to.deep.equal(ma1) + expect(valid[1]).to.deep.equal(ma2) + expect(valid[2]).to.deep.equal(ma3) + }) + + it('should filter correct dns address with ipfs id', function () { + const ma1 = multiaddr('/dnsaddr/ipfs.io/tcp/80/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + const ma2 = multiaddr('/dnsaddr/ipfs.io/tcp/443/wss/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + + const valid = ws.filter([ma1, ma2]) + expect(valid.length).to.equal(2) + expect(valid[0]).to.deep.equal(ma1) + expect(valid[1]).to.deep.equal(ma2) + }) + + it('should filter correct dns4 address', function () { + const ma1 = multiaddr('/dns4/ipfs.io/tcp/80/ws') + const ma2 = multiaddr('/dns4/ipfs.io/tcp/443/wss') + + const valid = ws.filter([ma1, ma2]) + expect(valid.length).to.equal(2) + expect(valid[0]).to.deep.equal(ma1) + expect(valid[1]).to.deep.equal(ma2) + }) + + it('should filter correct dns6 address', function () { + const ma1 = multiaddr('/dns6/ipfs.io/tcp/80/ws') + const ma2 = multiaddr('/dns6/ipfs.io/tcp/443/wss') + + const valid = ws.filter([ma1, ma2]) + expect(valid.length).to.equal(2) + expect(valid[0]).to.deep.equal(ma1) + expect(valid[1]).to.deep.equal(ma2) + }) + + it('should filter correct dns6 address with ipfs id', function () { + const ma1 = multiaddr('/dns6/ipfs.io/tcp/80/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + const ma2 = multiaddr('/dns6/ipfs.io/tcp/443/wss/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + + const valid = ws.filter([ma1, ma2]) + expect(valid.length).to.equal(2) + expect(valid[0]).to.deep.equal(ma1) + expect(valid[1]).to.deep.equal(ma2) + }) + + it('should filter mixed addresses', function () { + const ma1 = multiaddr('/dns6/ipfs.io/tcp/80/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + const ma2 = multiaddr('/ip4/127.0.0.1/tcp/9090') + const ma3 = multiaddr('/ip4/127.0.0.1/udp/9090') + const ma4 = multiaddr('/dns6/ipfs.io/ws') + const mh5 = multiaddr('/ip4/127.0.0.1/tcp/9090/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw' + + '/p2p-circuit/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + + const valid = ws.filter([ma1, ma2, ma3, ma4, mh5]) + expect(valid.length).to.equal(2) + expect(valid[0]).to.deep.equal(ma1) + expect(valid[1]).to.deep.equal(ma4) + }) + }) + + it('filter a single addr for this transport', (done) => { + const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') + + const valid = ws.filter(ma) + expect(valid.length).to.equal(1) + expect(valid[0]).to.deep.equal(ma) + done() + }) +}) + +describe('valid Connection', () => { + const ma = multiaddr('/ip4/127.0.0.1/tcp/9092/ws') + + it('get observed addrs', (done) => { + let dialerObsAddrs + let listenerObsAddrs + + const ws = new WS() + + const listener = ws.createListener((conn) => { + expect(conn).to.exist() + + conn.getObservedAddrs((err, addrs) => { + expect(err).to.not.exist() + dialerObsAddrs = addrs + }) + + pull(conn, conn) + }) + + listener.listen(ma, () => { + const conn = ws.dial(ma) + + pull( + pull.empty(), + conn, + pull.onEnd(onEnd) + ) + + function onEnd () { + conn.getObservedAddrs((err, addrs) => { + expect(err).to.not.exist() + listenerObsAddrs = addrs + + listener.close(onClose) + + function onClose () { + expect(listenerObsAddrs[0]).to.deep.equal(ma) + expect(dialerObsAddrs.length).to.equal(0) + done() + } + }) + } + }) + }) + + it('get Peer Info', (done) => { + const ws = new WS() + + const listener = ws.createListener((conn) => { + expect(conn).to.exist() + + conn.getPeerInfo((err, peerInfo) => { + expect(err).to.exist() + }) + + pull(conn, conn) + }) + + listener.listen(ma, () => { + const conn = ws.dial(ma) + + pull( + pull.empty(), + conn, + pull.onEnd(onEnd) + ) + + function onEnd () { + conn.getPeerInfo((err, peerInfo) => { + expect(err).to.exist() + listener.close(done) + }) + } + }) + }) + + it('set Peer Info', (done) => { + const ws = new WS() + + const listener = ws.createListener((conn) => { + expect(conn).to.exist() + conn.setPeerInfo('a') + + conn.getPeerInfo((err, peerInfo) => { + expect(err).to.not.exist() + expect(peerInfo).to.equal('a') + }) + + pull(conn, conn) + }) + + listener.listen(ma, onListen) + + function onListen () { + const conn = ws.dial(ma) + conn.setPeerInfo('b') + + pull( + pull.empty(), + conn, + pull.onEnd(onEnd) + ) + + function onEnd () { + conn.getPeerInfo((err, peerInfo) => { + expect(err).to.not.exist() + expect(peerInfo).to.equal('b') + listener.close(done) + }) + } + } + }) +}) + +describe.skip('turbolence', () => { + it('dialer - emits error on the other end is terminated abruptly', (done) => { + }) + it('listener - emits error on the other end is terminated abruptly', (done) => { + }) +})