feat(pull): migrate to pull streams

This commit is contained in:
dignifiedquire 2016-08-11 14:50:44 +02:00 committed by David Dias
parent 3c3a7077f6
commit 3f58dca09a
7 changed files with 188 additions and 199 deletions

View File

@ -2,6 +2,8 @@
const gulp = require('gulp')
const multiaddr = require('multiaddr')
const pull = require('pull-stream')
const WS = require('./src')
let listener
@ -10,7 +12,7 @@ gulp.task('test:browser:before', (done) => {
const ws = new WS()
const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws')
listener = ws.createListener((conn) => {
conn.pipe(conn)
pull(conn, conn)
})
listener.listen(ma, done)
})

View File

@ -35,20 +35,20 @@
"homepage": "https://github.com/libp2p/js-libp2p-websockets#readme",
"dependencies": {
"detect-node": "^2.0.3",
"interface-connection": "^0.1.8",
"interface-connection": "^0.2.1",
"lodash.contains": "^2.4.3",
"mafmt": "^2.1.1",
"run-parallel": "^1.1.6",
"simple-websocket": "^4.1.0",
"simple-websocket-server": "^0.1.4"
"pull-ws": "^3.2.3"
},
"devDependencies": {
"aegir": "^6.0.1",
"multiaddr": "^2.0.2",
"chai": "^3.5.0",
"gulp": "^3.9.1",
"interface-transport": "^0.2.0",
"pre-commit": "^1.1.3"
"interface-transport": "^0.3.1",
"multiaddr": "^2.0.2",
"pre-commit": "^1.1.3",
"pull-goodbye": "0.0.1",
"pull-stream": "^3.4.3"
},
"contributors": [
"David Dias <daviddias.p@gmail.com>",

View File

@ -1,147 +1,55 @@
'use strict'
const debug = require('debug')
const log = debug('libp2p:websockets')
const SW = require('simple-websocket')
const isNode = require('detect-node')
let SWS
if (isNode) {
SWS = require('simple-websocket-server')
} else {
SWS = {}
}
const connect = require('pull-ws/client')
const mafmt = require('mafmt')
const contains = require('lodash.contains')
const Connection = require('interface-connection').Connection
const debug = require('debug')
const log = debug('libp2p:websockets:dialer')
const CLOSE_TIMEOUT = 2000
// const IPFS_CODE = 421
const createListener = require('./listener')
exports = module.exports = WebSockets
function WebSockets () {
if (!(this instanceof WebSockets)) {
return new WebSockets()
}
this.dial = function (ma, options, callback) {
module.exports = class WebSockets {
dial (ma, options, callback) {
if (typeof options === 'function') {
callback = options
options = {}
}
if (!callback) {
callback = function noop () {}
callback = () => {}
}
const maOpts = ma.toOptions()
const socket = new SW('ws://' + maOpts.host + ':' + maOpts.port)
const url = `ws://${maOpts.host}:${maOpts.port}`
log('dialing %s', url)
const socket = connect(url, {
binary: true,
onConnect: callback
})
const conn = new Connection(socket)
socket.on('timeout', () => {
conn.emit('timeout')
})
socket.on('error', (err) => {
callback(err)
conn.emit('error', err)
})
socket.on('connect', () => {
callback(null, conn)
conn.emit('connect')
})
conn.getObservedAddrs = (cb) => {
return cb(null, [ma])
}
conn.getObservedAddrs = (cb) => cb(null, [ma])
conn.close = (cb) => socket.close(cb)
return conn
}
this.createListener = (options, handler) => {
createListener (options, handler) {
if (typeof options === 'function') {
handler = options
options = {}
}
const listener = SWS.createServer((socket) => {
const conn = new Connection(socket)
conn.getObservedAddrs = (cb) => {
// TODO research if we can reuse the address in anyway
return cb(null, [])
}
handler(conn)
})
let listeningMultiaddr
listener._listen = listener.listen
listener.listen = (ma, callback) => {
if (!callback) {
callback = function noop () {}
}
listeningMultiaddr = ma
if (contains(ma.protoNames(), 'ipfs')) {
ma = ma.decapsulate('ipfs')
}
listener._listen(ma.toOptions(), callback)
}
listener._close = listener.close
listener.close = (options, callback) => {
if (typeof options === 'function') {
callback = options
options = { timeout: CLOSE_TIMEOUT }
}
if (!callback) { callback = function noop () {} }
if (!options) { options = { timeout: CLOSE_TIMEOUT } }
let closed = false
listener.once('close', () => {
closed = true
})
listener._close(callback)
setTimeout(() => {
if (closed) {
return
}
log('unable to close graciously, destroying conns')
Object.keys(listener.__connections).forEach((key) => {
log('destroying %s', key)
listener.__connections[key].destroy()
})
}, options.timeout || CLOSE_TIMEOUT)
}
// Keep track of open connections to destroy in case of timeout
listener.__connections = {}
listener.on('connection', (socket) => {
const key = (~~(Math.random() * 1e9)).toString(36) + Date.now()
listener.__connections[key] = socket
socket.on('close', () => {
delete listener.__connections[key]
})
})
listener.getAddrs = (callback) => {
callback(null, [listeningMultiaddr])
}
return listener
return createListener(options, handler)
}
this.filter = (multiaddrs) => {
filter (multiaddrs) {
if (!Array.isArray(multiaddrs)) {
multiaddrs = [multiaddrs]
}
return multiaddrs.filter((ma) => {
if (contains(ma.protoNames(), 'ipfs')) {
ma = ma.decapsulate('ipfs')

46
src/listener.js Normal file
View File

@ -0,0 +1,46 @@
'use strict'
const isNode = require('detect-node')
const Connection = require('interface-connection').Connection
const contains = require('lodash.contains')
// const IPFS_CODE = 421
let createServer
if (isNode) {
createServer = require('pull-ws/server')
} else {
createServer = () => {}
}
module.exports = (options, handler) => {
const listener = createServer((socket) => {
socket.getObservedAddrs = (cb) => {
// TODO research if we can reuse the address in anyway
return cb(null, [])
}
handler(new Connection(socket))
})
let listeningMultiaddr
listener._listen = listener.listen
listener.listen = (ma, cb) => {
cb = cb || (() => {})
listeningMultiaddr = ma
if (contains(ma.protoNames(), 'ipfs')) {
ma = ma.decapsulate('ipfs')
}
listener._listen(ma.toOptions(), cb)
}
listener.getAddrs = (cb) => {
cb(null, [listeningMultiaddr])
}
return listener
}

View File

@ -3,74 +3,67 @@
const expect = require('chai').expect
const multiaddr = require('multiaddr')
const pull = require('pull-stream')
const goodbye = require('pull-goodbye')
const WS = require('../src')
describe('libp2p-websockets', () => {
const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws')
let ws
let conn
it('create', (done) => {
beforeEach((done) => {
ws = new WS()
expect(ws).to.exist
done()
conn = ws.dial(ma, done)
})
it('echo', (done) => {
const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws')
const conn = ws.dial(ma)
const message = 'Hello World!'
conn.write(message)
conn.on('data', (data) => {
expect(data.toString()).to.equal(message)
conn.end()
done()
const s = goodbye({
source: pull.values([message]),
sink: pull.collect((err, results) => {
expect(err).to.not.exist
expect(results).to.be.eql([message])
done()
})
})
pull(s, conn, s)
})
describe('stress', () => {
it('one big write', (done) => {
const mh = multiaddr('/ip4/127.0.0.1/tcp/9090/ws')
const conn = ws.dial(mh)
const message = new Buffer(1000000).fill('a').toString('hex')
conn.write(message)
conn.on('data', (data) => {
expect(data.toString()).to.equal(message)
conn.end()
done()
const rawMessage = new Buffer(1000000).fill('a')
const s = goodbye({
source: pull.values([rawMessage]),
sink: pull.collect((err, results) => {
expect(err).to.not.exist
expect(results).to.be.eql([rawMessage])
done()
})
})
pull(s, conn, s)
})
it('many writes in 2 batches', (done) => {
const mh = multiaddr('/ip4/127.0.0.1/tcp/9090/ws')
const conn = ws.dial(mh)
let expected = ''
let counter = 0
while (++counter < 10000) {
conn.write(`${counter} `)
expected += `${counter} `
}
setTimeout(() => {
while (++counter < 20000) {
conn.write(`${counter} `)
expected += `${counter} `
}
conn.write('STOP')
}, 1000)
let result = ''
conn.on('data', (data) => {
if (data.toString() === 'STOP') {
conn.end()
return
}
result += data.toString()
it('many writes', (done) => {
const s = goodbye({
source: pull(
pull.infinite(),
pull.take(1000),
pull.map((val) => Buffer(val.toString()))
),
sink: pull.collect((err, result) => {
expect(err).to.not.exist
expect(result).to.have.length(1000)
done()
})
})
conn.on('end', () => {
expect(result).to.equal(expected)
done()
})
pull(s, conn, s)
})
})
})

23
test/compliance.node.js Normal file
View File

@ -0,0 +1,23 @@
/* eslint-env mocha */
'use strict'
const tests = require('interface-transport')
const multiaddr = require('multiaddr')
const Ws = require('../src')
describe('compliance', () => {
tests({
setup (cb) {
let ws = new Ws()
const addrs = [
multiaddr('/ip4/127.0.0.1/tcp/9091/ws'),
multiaddr('/ip4/127.0.0.1/tcp/9092/ws'),
multiaddr('/ip4/127.0.0.1/tcp/9093/ws')
]
cb(null, ws, addrs)
},
teardown (cb) {
cb()
}
})
})

View File

@ -3,19 +3,17 @@
const expect = require('chai').expect
const multiaddr = require('multiaddr')
const pull = require('pull-stream')
const goodbye = require('pull-goodbye')
const WS = require('../src')
require('./compliance.node')
describe('instantiate the transport', () => {
it('create', (done) => {
it('create', () => {
const ws = new WS()
expect(ws).to.exist
done()
})
it('create without new', (done) => {
const ws = WS()
expect(ws).to.exist
done()
})
})
@ -122,7 +120,7 @@ describe('dial', () => {
beforeEach((done) => {
ws = new WS()
listener = ws.createListener((conn) => {
conn.pipe(conn)
pull(conn, conn)
})
listener.listen(ma, done)
})
@ -133,12 +131,18 @@ describe('dial', () => {
it('dial on IPv4', (done) => {
const conn = ws.dial(ma)
conn.write('hey')
conn.end()
conn.on('data', (chunk) => {
expect(chunk.toString()).to.equal('hey')
const s = goodbye({
source: pull.values(['hey']),
sink: pull.collect((err, result) => {
expect(err).to.not.exist
expect(result).to.be.eql(['hey'])
done()
})
})
conn.on('end', done)
pull(s, conn, s)
})
it.skip('dial on IPv6', (done) => {
@ -148,12 +152,18 @@ describe('dial', () => {
it('dial on IPv4 with IPFS Id', (done) => {
const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw')
const conn = ws.dial(ma)
conn.write('hey')
conn.end()
conn.on('data', (chunk) => {
expect(chunk.toString()).to.equal('hey')
const s = goodbye({
source: pull.values(['hey']),
sink: pull.collect((err, result) => {
expect(err).to.not.exist
expect(result).to.be.eql(['hey'])
done()
})
})
conn.on('end', done)
pull(s, conn, s)
})
})
@ -204,13 +214,17 @@ describe('valid Connection', () => {
dialerObsAddrs = addrs
})
conn.pipe(conn)
pull(conn, conn)
})
listener.listen(ma, () => {
const conn = ws.dial(ma)
conn.on('end', onEnd)
pull(
pull.empty(),
conn,
pull.onEnd(onEnd)
)
function onEnd () {
conn.getObservedAddrs((err, addrs) => {
@ -218,7 +232,6 @@ describe('valid Connection', () => {
listenerObsAddrs = addrs
listener.close(onClose)
function onClose () {
expect(listenerObsAddrs[0]).to.deep.equal(ma)
expect(dialerObsAddrs.length).to.equal(0)
@ -226,8 +239,6 @@ describe('valid Connection', () => {
}
})
}
conn.resume()
conn.end()
})
})
@ -241,13 +252,17 @@ describe('valid Connection', () => {
expect(err).to.exist
})
conn.pipe(conn)
pull(conn, conn)
})
listener.listen(ma, () => {
const conn = ws.dial(ma)
conn.on('end', onEnd)
pull(
pull.empty(),
conn,
pull.onEnd(onEnd)
)
function onEnd () {
conn.getPeerInfo((err, peerInfo) => {
@ -255,8 +270,6 @@ describe('valid Connection', () => {
listener.close(done)
})
}
conn.resume()
conn.end()
})
})
@ -272,7 +285,7 @@ describe('valid Connection', () => {
expect(peerInfo).to.equal('a')
})
conn.pipe(conn)
pull(conn, conn)
})
listener.listen(ma, onListen)
@ -280,15 +293,19 @@ describe('valid Connection', () => {
const conn = ws.dial(ma)
conn.setPeerInfo('b')
conn.on('end', () => {
pull(
pull.empty(),
conn,
pull.onEnd(onEnd)
)
function onEnd () {
conn.getPeerInfo((err, peerInfo) => {
expect(err).to.not.exist
expect(peerInfo).to.equal('b')
listener.close(done)
})
})
conn.resume()
conn.end()
}
}
})
})