follow new transport and connection spec

This commit is contained in:
David Dias 2016-06-19 06:35:31 +01:00
parent 0ee73f062e
commit bbc92b998f
7 changed files with 410 additions and 145 deletions

11
.aegir.js Normal file
View File

@ -0,0 +1,11 @@
'use strict'
module.exports = {
webpack: {
resolve: {
},
externals: {
'simple-websocket-server': '{}'
}
}
}

View File

@ -4,12 +4,10 @@ js-libp2p-websockets
[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://ipn.io)
[![](https://img.shields.io/badge/freenode-%23ipfs-blue.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23ipfs)
![](https://img.shields.io/badge/coverage-%3F-yellow.svg?style=flat-square)
[![Dependency Status](https://david-dm.org/diasdavid/js-libp2p-websockets.svg?style=flat-square)](https://david-dm.org/diasdavid/js-libp2p-websockets)
[![Dependency Status](https://david-dm.org/libp2p/js-libp2p-websockets.svg?style=flat-square)](https://david-dm.org/libp2p/js-libp2p-websockets)
[![js-standard-style](https://img.shields.io/badge/code%20style-standard-brightgreen.svg?style=flat-square)](https://github.com/feross/standard)
![](https://raw.githubusercontent.com/diasdavid/interace-connection/master/img/badge.png)
![](https://raw.githubusercontent.com/diasdavid/interface-transport/master/img/badge.png)
> JavaScript implementation of the WebSockets module that libp2p uses and that implements the interface-transport interface
![](https://raw.githubusercontent.com/libp2p/interface-connection/master/img/badge.png)
![](https://raw.githubusercontent.com/libp2p/interface-transport/master/img/badge.png)
> JavaScript implementation of the WebSockets module that libp2p uses and that implements the interface-transport interface

View File

@ -2,20 +2,21 @@
const gulp = require('gulp')
const multiaddr = require('multiaddr')
const WSlibp2p = require('./src')
const WS = require('./src')
let ws
let listener
gulp.task('test:browser:before', (done) => {
ws = new WSlibp2p()
const mh = multiaddr('/ip4/127.0.0.1/tcp/9090/ws')
ws.createListener(mh, (socket) => {
socket.pipe(socket)
}, done)
const ws = new WS()
const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws')
listener = ws.createListener((conn) => {
conn.pipe(conn)
})
listener.listen(ma, done)
})
gulp.task('test:browser:after', (done) => {
ws.close(done)
listener.close(done)
})
require('aegir/gulp')(gulp)

View File

@ -34,18 +34,20 @@
},
"homepage": "https://github.com/diasdavid/js-libp2p-websockets#readme",
"dependencies": {
"detect-node": "^2.0.3",
"interface-connection": "^0.1.3",
"lodash.contains": "^2.4.3",
"mafmt": "^2.1.0",
"multiaddr": "^2.0.2",
"run-parallel": "^1.1.6",
"simple-websocket": "github:diasdavid/simple-websocket#ec31437"
"simple-websocket": "^4.1.0",
"simple-websocket-server": "^0.1.4"
},
"devDependencies": {
"aegir": "^3.2.0",
"multiaddr": "^2.0.2",
"chai": "^3.5.0",
"aegir": "^3.0.1",
"gulp": "^3.9.1",
"interface-connection": "0.0.3",
"interface-transport": "^0.1.1",
"interface-transport": "^0.2.0",
"pre-commit": "^1.1.2"
},
"contributors": [
@ -53,4 +55,4 @@
"Francisco Baio Dias <xicombd@gmail.com>",
"Friedel Ziegelmayer <dignifiedquire@gmail.com>"
]
}
}

View File

@ -2,10 +2,20 @@
const debug = require('debug')
const log = debug('libp2p:websockets')
const SWS = require('simple-websocket')
const SW = require('simple-websocket')
const isNode = require('detect-node')
let SWS
if (isNode) {
SWS = require('simple-websocket-server')
} else {
SWS = {}
}
const mafmt = require('mafmt')
const parallel = require('run-parallel')
const contains = require('lodash.contains')
const Connection = require('interface-connection').Connection
const CLOSE_TIMEOUT = 2000
// const IPFS_CODE = 421
exports = module.exports = WebSockets
@ -14,66 +24,118 @@ function WebSockets () {
return new WebSockets()
}
const listeners = []
this.dial = function (multiaddr, options) {
if (!options) {
this.dial = function (ma, options, callback) {
if (typeof options === 'function') {
callback = options
options = {}
}
options.ready = options.ready || function noop () {}
const maOpts = multiaddr.toOptions()
const conn = new SWS('ws://' + maOpts.host + ':' + maOpts.port)
conn.on('connect', options.ready)
conn.getObservedAddrs = () => {
return [multiaddr]
if (!callback) {
callback = function noop () {}
}
const maOpts = ma.toOptions()
const socket = new SW('ws://' + maOpts.host + ':' + maOpts.port)
const conn = new Connection(socket)
socket.on('timeout', () => {
conn.emit('timeout')
})
socket.on('error', (err) => {
callback(err)
conn.emit('error', err)
})
socket.on('connect', () => {
callback(null, conn)
conn.emit('connect')
})
conn.getObservedAddrs = (cb) => {
return cb(null, [ma])
}
return conn
}
this.createListener = (multiaddrs, options, handler, callback) => {
this.createListener = (options, handler) => {
if (typeof options === 'function') {
callback = handler
handler = options
options = {}
}
if (!Array.isArray(multiaddrs)) {
multiaddrs = [multiaddrs]
}
const listener = SWS.createServer((socket) => {
const conn = new Connection(socket)
var count = 0
conn.getObservedAddrs = (cb) => {
// TODO research if we can reuse the address in anyway
return cb(null, [])
}
handler(conn)
})
multiaddrs.forEach((m) => {
if (contains(m.protoNames(), 'ipfs')) {
m = m.decapsulate('ipfs')
let listeningMultiaddr
listener._listen = listener.listen
listener.listen = (ma, callback) => {
if (!callback) {
callback = function noop () {}
}
const listener = SWS.createServer((conn) => {
conn.getObservedAddrs = () => {
return [] // TODO think if it makes sense for WebSockets
}
handler(conn)
})
listeningMultiaddr = ma
listener.listen(m.toOptions().port, () => {
if (++count === multiaddrs.length) {
callback()
}
})
listeners.push(listener)
})
}
if (contains(ma.protoNames(), 'ipfs')) {
ma = ma.decapsulate('ipfs')
}
this.close = (callback) => {
if (listeners.length === 0) {
log('Called close with no active listeners')
return callback()
listener._listen(ma.toOptions(), callback)
}
parallel(listeners.map((listener) => {
return (cb) => listener.close(cb)
}), callback)
listener._close = listener.close
listener.close = (options, callback) => {
if (typeof options === 'function') {
callback = options
options = { timeout: CLOSE_TIMEOUT }
}
if (!callback) { callback = function noop () {} }
if (!options) { options = { timeout: CLOSE_TIMEOUT } }
let closed = false
listener.once('close', () => {
closed = true
})
listener._close(callback)
setTimeout(() => {
if (closed) {
return
}
log('unable to close graciously, destroying conns')
Object.keys(listener.__connections).forEach((key) => {
log('destroying %s', key)
listener.__connections[key].destroy()
})
}, options.timeout || CLOSE_TIMEOUT)
}
// Keep track of open connections to destroy in case of timeout
listener.__connections = {}
listener.on('connection', (socket) => {
const key = (~~(Math.random() * 1e9)).toString(36) + Date.now()
listener.__connections[key] = socket
socket.on('close', () => {
delete listener.__connections[key]
})
})
listener.getAddrs = (callback) => {
callback(null, [listeningMultiaddr])
}
return listener
}
this.filter = (multiaddrs) => {

View File

@ -3,21 +3,20 @@
const expect = require('chai').expect
const multiaddr = require('multiaddr')
const WSlibp2p = require('../src')
const WS = require('../src')
describe('libp2p-websockets', function () {
this.timeout(10000)
var ws
describe('libp2p-websockets', () => {
let ws
it('create', (done) => {
ws = new WSlibp2p()
ws = new WS()
expect(ws).to.exist
done()
})
it('echo', (done) => {
const mh = multiaddr('/ip4/127.0.0.1/tcp/9090/ws')
const conn = ws.dial(mh)
const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws')
const conn = ws.dial(ma)
const message = 'Hello World!'
conn.write(message)
conn.on('data', (data) => {

View File

@ -3,98 +3,290 @@
const expect = require('chai').expect
const multiaddr = require('multiaddr')
const WSlibp2p = require('../src')
describe('libp2p-websockets', function () {
this.timeout(10000)
var ws
const WS = require('../src')
describe('instantiate the transport', () => {
it('create', (done) => {
ws = new WSlibp2p()
const ws = new WS()
expect(ws).to.exist
done()
})
it('listen and dial', (done) => {
const mh = multiaddr('/ip4/127.0.0.1/tcp/9090/ws')
ws.createListener(mh, (socket) => {
expect(socket).to.exist
socket.end()
ws.close(done)
}, () => {
const conn = ws.dial(mh)
conn.end()
})
})
it('listen on several', (done) => {
const mh1 = multiaddr('/ip4/127.0.0.1/tcp/9090/ws')
const mh2 = multiaddr('/ip4/127.0.0.1/tcp/9091/ws')
const ws = new WSlibp2p()
ws.createListener([mh1, mh2], (socket) => {}, () => {
ws.close(done)
})
})
it('get observed addrs', (done) => {
const mh = multiaddr('/ip4/127.0.0.1/tcp/9090/ws')
ws.createListener(mh, (socket) => {
expect(socket).to.exist
socket.end()
expect(socket.getObservedAddrs()).to.deep.equal([])
ws.close(done)
}, () => {
const conn = ws.dial(mh)
conn.end()
})
})
it('filter', (done) => {
const mh1 = multiaddr('/ip4/127.0.0.1/tcp/9090')
const mh2 = multiaddr('/ip4/127.0.0.1/udp/9090')
const mh3 = multiaddr('/ip4/127.0.0.1/tcp/9090/ws')
const valid = ws.filter([mh1, mh2, mh3])
expect(valid.length).to.equal(1)
expect(valid[0]).to.deep.equal(mh3)
it('create without new', (done) => {
const ws = WS()
expect(ws).to.exist
done()
})
})
it('echo', (done) => {
const mh = multiaddr('/ip4/127.0.0.1/tcp/9090/ws')
ws.createListener(mh, (conn) => {
conn.pipe(conn)
}, () => {
const conn = ws.dial(mh)
const message = 'Hello World!'
conn.write(message)
conn.on('data', (data) => {
expect(data.toString()).to.equal(message)
conn.end()
ws.close(done)
describe('listen', () => {
let ws
const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws')
beforeEach(() => {
ws = new WS()
})
it('listen, check for callback', (done) => {
const listener = ws.createListener((conn) => {})
listener.listen(ma, () => {
listener.close(done)
})
})
it('listen, check for listening event', (done) => {
const listener = ws.createListener((conn) => {})
listener.on('listening', () => {
listener.close(done)
})
listener.listen(ma)
})
it('listen, check for the close event', (done) => {
const listener = ws.createListener((conn) => {})
listener.on('listening', () => {
listener.on('close', done)
listener.close()
})
listener.listen(ma)
})
it('listen on addr with /ipfs/QmHASH', (done) => {
const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw')
const listener = ws.createListener((conn) => {})
listener.listen(ma, () => {
listener.close(done)
})
})
it.skip('close listener with connections, through timeout', (done) => {
// TODO `ws` closes all anyway, we need to make it not close
// first - https://github.com/diasdavid/simple-websocket-server
})
it.skip('listen on port 0', (done) => {
// TODO port 0 not supported yet
})
it.skip('listen on IPv6 addr', (done) => {
// TODO IPv6 not supported yet
})
it.skip('listen on any Interface', (done) => {
// TODO 0.0.0.0 not supported yet
})
it('getAddrs', (done) => {
const listener = ws.createListener((conn) => {})
listener.listen(ma, () => {
listener.getAddrs((err, addrs) => {
expect(err).to.not.exist
expect(addrs.length).to.equal(1)
expect(addrs[0]).to.deep.equal(ma)
listener.close(done)
})
})
})
it('echo with connect event and send', (done) => {
const mh = multiaddr('/ip4/127.0.0.1/tcp/9090/ws')
ws.createListener(mh, (conn) => {
conn.pipe(conn)
}, () => {
const message = 'Hello World!'
it.skip('getAddrs on port 0 listen', (done) => {
// TODO port 0 not supported yet
})
const conn = ws.dial(mh, {
ready: () => {
conn.send(message)
}
})
it.skip('getAddrs from listening on 0.0.0.0', (done) => {
// TODO 0.0.0.0 not supported yet
})
conn.on('data', (data) => {
expect(data.toString()).to.equal(message)
conn.end()
ws.close(done)
it.skip('getAddrs from listening on 0.0.0.0 and port 0', (done) => {
// TODO 0.0.0.0 or port 0 not supported yet
})
it('getAddrs preserves IPFS Id', (done) => {
const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw')
const listener = ws.createListener((conn) => {})
listener.listen(ma, () => {
listener.getAddrs((err, addrs) => {
expect(err).to.not.exist
expect(addrs.length).to.equal(1)
expect(addrs[0]).to.deep.equal(ma)
listener.close(done)
})
})
})
})
describe('dial', () => {
let ws
let listener
const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws')
beforeEach((done) => {
ws = new WS()
listener = ws.createListener((conn) => {
conn.pipe(conn)
})
listener.listen(ma, done)
})
afterEach((done) => {
listener.close(done)
})
it('dial on IPv4', (done) => {
const conn = ws.dial(ma)
conn.write('hey')
conn.end()
conn.on('data', (chunk) => {
expect(chunk.toString()).to.equal('hey')
})
conn.on('end', done)
})
it.skip('dial on IPv6', (done) => {
// TODO IPv6 not supported yet
})
it('dial on IPv4 with IPFS Id', (done) => {
const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw')
const conn = ws.dial(ma)
conn.write('hey')
conn.end()
conn.on('data', (chunk) => {
expect(chunk.toString()).to.equal('hey')
})
conn.on('end', done)
})
})
describe('filter addrs', () => {
let ws
before(() => {
ws = new WS()
})
it('filter valid addrs for this transport', (done) => {
const mh1 = multiaddr('/ip4/127.0.0.1/tcp/9090')
const mh2 = multiaddr('/ip4/127.0.0.1/udp/9090')
const mh3 = multiaddr('/ip4/127.0.0.1/tcp/9090/ws')
const mh4 = multiaddr('/ip4/127.0.0.1/tcp/9090/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw')
const valid = ws.filter([mh1, mh2, mh3, mh4])
expect(valid.length).to.equal(2)
expect(valid[0]).to.deep.equal(mh3)
expect(valid[1]).to.deep.equal(mh4)
done()
})
it('filter a single addr for this transport', (done) => {
const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw')
const valid = ws.filter(ma)
expect(valid.length).to.equal(1)
expect(valid[0]).to.deep.equal(ma)
done()
})
})
describe('valid Connection', () => {
const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws')
it('get observed addrs', (done) => {
let dialerObsAddrs
let listenerObsAddrs
const ws = new WS()
const listener = ws.createListener((conn) => {
expect(conn).to.exist
conn.getObservedAddrs((err, addrs) => {
expect(err).to.not.exist
dialerObsAddrs = addrs
})
conn.pipe(conn)
})
listener.listen(ma, () => {
const conn = ws.dial(ma)
conn.on('end', () => {
conn.getObservedAddrs((err, addrs) => {
expect(err).to.not.exist
listenerObsAddrs = addrs
listener.close(() => {
expect(listenerObsAddrs[0]).to.deep.equal(ma)
expect(dialerObsAddrs.length).to.equal(0)
done()
})
})
})
conn.resume()
conn.end()
})
})
it('get Peer Info', (done) => {
const ws = new WS()
const listener = ws.createListener((conn) => {
expect(conn).to.exist
conn.getPeerInfo((err, peerInfo) => {
expect(err).to.exist
})
conn.pipe(conn)
})
listener.listen(ma, () => {
const conn = ws.dial(ma)
conn.on('end', () => {
conn.getPeerInfo((err, peerInfo) => {
expect(err).to.exit
listener.close(done)
})
})
conn.resume()
conn.end()
})
})
it('set Peer Info', (done) => {
const ws = new WS()
const listener = ws.createListener((conn) => {
expect(conn).to.exist
conn.setPeerInfo('a')
conn.getPeerInfo((err, peerInfo) => {
expect(err).to.not.exist
expect(peerInfo).to.equal('a')
})
conn.pipe(conn)
})
listener.listen(ma, () => {
const conn = ws.dial(ma)
conn.setPeerInfo('b')
conn.on('end', () => {
conn.getPeerInfo((err, peerInfo) => {
expect(err).to.not.exist
expect(peerInfo).to.equal('b')
listener.close(done)
})
})
conn.resume()
conn.end()
})
})
})
describe.skip('turbolence', () => {
it('dialer - emits error on the other end is terminated abruptly', (done) => {})
it('listener - emits error on the other end is terminated abruptly', (done) => {})
})