Merge pull request #18 from libp2p/pull

[WIP] Move to pull-streams
This commit is contained in:
David Dias 2016-09-06 08:48:31 -04:00 committed by GitHub
commit 4b0f9e201c
8 changed files with 248 additions and 211 deletions

View File

@ -1,13 +1,61 @@
js-libp2p-websockets
====================
# js-libp2p-websockets
[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://ipn.io)
[![](https://img.shields.io/badge/project-IPFS-blue.svg?style=flat-square)](http://ipfs.io/)
[![](https://img.shields.io/badge/freenode-%23ipfs-blue.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23ipfs)
![](https://img.shields.io/badge/coverage-%3F-yellow.svg?style=flat-square)
[![Dependency Status](https://david-dm.org/libp2p/js-libp2p-websockets.svg?style=flat-square)](https://david-dm.org/libp2p/js-libp2p-websockets)
[![js-standard-style](https://img.shields.io/badge/code%20style-standard-brightgreen.svg?style=flat-square)](https://github.com/feross/standard)
[![Coverage Status](https://coveralls.io/repos/github/libp2p/js-libp2p-websockets/badge.svg?branch=master)](https://coveralls.io/github/libp2p/js-libp2p-websockets?branch=master)
[![Travis CI](https://travis-ci.org/libp2p/js-libp2p-websockets.svg?branch=master)](https://travis-ci.org/libp2p/js-libp2p-websockets)
[![Circle CI](https://circleci.com/gh/libp2p/js-libp2p-websockets.svg?style=svg)](https://circleci.com/gh/libp2p/js-libp2p-websockets)
[![Dependency Status](https://david-dm.org/libp2p/js-libp2p-websockets.svg?style=flat-square)](https://david-dm.org/libp2p/js-libp2p-websockets) [![js-standard-style](https://img.shields.io/badge/code%20style-standard-brightgreen.svg?style=flat-square)](https://github.com/feross/standard)
![](https://raw.githubusercontent.com/libp2p/interface-connection/master/img/badge.png)
![](https://raw.githubusercontent.com/libp2p/interface-transport/master/img/badge.png)
> JavaScript implementation of the WebSockets module that libp2p uses and that implements the interface-transport interface
## Description
`libp2p-websockets` is the WebSockets implementation compatible with libp2p.
**Note:** This module uses [pull-streams](https://pull-stream.github.io) for all stream based interfaces.
## Example
```
TODO
```
## Installation
### npm
```sh
> npm i libp2p-websockets
```
## This module uses `pull-streams`
We expose a streaming interface based on `pull-streams`, rather then on the Node.js core streams implementation (aka Node.js streams). `pull-streams` offers us a better mechanism for error handling and flow control guarantees. If you would like to know more about what took us to make this migration, see the discussion at this [issue](https://github.com/ipfs/js-ipfs/issues/362).
You can learn more about pull-streams at:
- [The history of Node.js streams, nodebp April 2014](https://www.youtube.com/watch?v=g5ewQEuXjsQ)
- [The history of streams, 2016](http://dominictarr.com/post/145135293917/history-of-streams)
- [pull-streams, the simple streaming primitive](http://dominictarr.com/post/149248845122/pull-streams-pull-streams-are-a-very-simple)
- [pull-streams documentation](https://pull-stream.github.io/)
### Converting `pull-streams` to Node.js Streams
If you are a Node.js streams user, you can convert a pull-stream to Node.js Stream using the module `pull-stream-to-stream`, giving you an instance of a Node.js stream that is linked to the pull-stream. Example:
```
const pullToStream = require('pull-stream-to-stream')
const nodeStreamInstance = pullToStream(pullStreamInstance)
// nodeStreamInstance is an instance of a Node.js Stream
```
To learn more about his utility, visit https://pull-stream.github.io/#pull-stream-to-stream
## API
[![](https://raw.githubusercontent.com/diasdavid/interface-transport/master/img/badge.png)](https://github.com/diasdavid/interface-transport)

View File

@ -2,6 +2,8 @@
const gulp = require('gulp')
const multiaddr = require('multiaddr')
const pull = require('pull-stream')
const WS = require('./src')
let listener
@ -10,7 +12,7 @@ gulp.task('test:browser:before', (done) => {
const ws = new WS()
const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws')
listener = ws.createListener((conn) => {
conn.pipe(conn)
pull(conn, conn)
})
listener.listen(ma, done)
})

View File

@ -22,7 +22,7 @@
],
"repository": {
"type": "git",
"url": "git+https://github.com/diasdavid/js-libp2p-websockets.git"
"url": "git+https://github.com/libp2p/js-libp2p-websockets.git"
},
"keywords": [
"IPFS"
@ -30,25 +30,25 @@
"author": "David Dias <daviddias@ipfs.io>",
"license": "MIT",
"bugs": {
"url": "https://github.com/diasdavid/js-libp2p-websockets/issues"
"url": "https://github.com/libp2p/js-libp2p-websockets/issues"
},
"homepage": "https://github.com/diasdavid/js-libp2p-websockets#readme",
"homepage": "https://github.com/libp2p/js-libp2p-websockets#readme",
"dependencies": {
"detect-node": "^2.0.3",
"interface-connection": "^0.1.8",
"interface-connection": "^0.2.1",
"lodash.contains": "^2.4.3",
"mafmt": "^2.1.0",
"run-parallel": "^1.1.6",
"simple-websocket": "^4.1.0",
"simple-websocket-server": "^0.1.4"
"mafmt": "^2.1.1",
"pull-ws": "^3.2.3"
},
"devDependencies": {
"aegir": "^6.0.0",
"multiaddr": "^2.0.2",
"aegir": "^6.0.1",
"chai": "^3.5.0",
"gulp": "^3.9.1",
"interface-transport": "^0.2.0",
"pre-commit": "^1.1.2"
"interface-transport": "^0.3.3",
"multiaddr": "^2.0.2",
"pre-commit": "^1.1.3",
"pull-goodbye": "0.0.1",
"pull-stream": "^3.4.3"
},
"contributors": [
"David Dias <daviddias.p@gmail.com>",
@ -56,4 +56,4 @@
"Friedel Ziegelmayer <dignifiedquire@gmail.com>",
"greenkeeperio-bot <support@greenkeeper.io>"
]
}
}

View File

@ -1,147 +1,55 @@
'use strict'
const debug = require('debug')
const log = debug('libp2p:websockets')
const SW = require('simple-websocket')
const isNode = require('detect-node')
let SWS
if (isNode) {
SWS = require('simple-websocket-server')
} else {
SWS = {}
}
const connect = require('pull-ws/client')
const mafmt = require('mafmt')
const contains = require('lodash.contains')
const Connection = require('interface-connection').Connection
const debug = require('debug')
const log = debug('libp2p:websockets:dialer')
const CLOSE_TIMEOUT = 2000
// const IPFS_CODE = 421
const createListener = require('./listener')
exports = module.exports = WebSockets
function WebSockets () {
if (!(this instanceof WebSockets)) {
return new WebSockets()
}
this.dial = function (ma, options, callback) {
module.exports = class WebSockets {
dial (ma, options, callback) {
if (typeof options === 'function') {
callback = options
options = {}
}
if (!callback) {
callback = function noop () {}
callback = () => {}
}
const maOpts = ma.toOptions()
const socket = new SW('ws://' + maOpts.host + ':' + maOpts.port)
const url = `ws://${maOpts.host}:${maOpts.port}`
log('dialing %s', url)
const socket = connect(url, {
binary: true,
onConnect: callback
})
const conn = new Connection(socket)
socket.on('timeout', () => {
conn.emit('timeout')
})
socket.on('error', (err) => {
callback(err)
conn.emit('error', err)
})
socket.on('connect', () => {
callback(null, conn)
conn.emit('connect')
})
conn.getObservedAddrs = (cb) => {
return cb(null, [ma])
}
conn.getObservedAddrs = (cb) => cb(null, [ma])
conn.close = (cb) => socket.close(cb)
return conn
}
this.createListener = (options, handler) => {
createListener (options, handler) {
if (typeof options === 'function') {
handler = options
options = {}
}
const listener = SWS.createServer((socket) => {
const conn = new Connection(socket)
conn.getObservedAddrs = (cb) => {
// TODO research if we can reuse the address in anyway
return cb(null, [])
}
handler(conn)
})
let listeningMultiaddr
listener._listen = listener.listen
listener.listen = (ma, callback) => {
if (!callback) {
callback = function noop () {}
}
listeningMultiaddr = ma
if (contains(ma.protoNames(), 'ipfs')) {
ma = ma.decapsulate('ipfs')
}
listener._listen(ma.toOptions(), callback)
}
listener._close = listener.close
listener.close = (options, callback) => {
if (typeof options === 'function') {
callback = options
options = { timeout: CLOSE_TIMEOUT }
}
if (!callback) { callback = function noop () {} }
if (!options) { options = { timeout: CLOSE_TIMEOUT } }
let closed = false
listener.once('close', () => {
closed = true
})
listener._close(callback)
setTimeout(() => {
if (closed) {
return
}
log('unable to close graciously, destroying conns')
Object.keys(listener.__connections).forEach((key) => {
log('destroying %s', key)
listener.__connections[key].destroy()
})
}, options.timeout || CLOSE_TIMEOUT)
}
// Keep track of open connections to destroy in case of timeout
listener.__connections = {}
listener.on('connection', (socket) => {
const key = (~~(Math.random() * 1e9)).toString(36) + Date.now()
listener.__connections[key] = socket
socket.on('close', () => {
delete listener.__connections[key]
})
})
listener.getAddrs = (callback) => {
callback(null, [listeningMultiaddr])
}
return listener
return createListener(options, handler)
}
this.filter = (multiaddrs) => {
filter (multiaddrs) {
if (!Array.isArray(multiaddrs)) {
multiaddrs = [multiaddrs]
}
return multiaddrs.filter((ma) => {
if (contains(ma.protoNames(), 'ipfs')) {
ma = ma.decapsulate('ipfs')

46
src/listener.js Normal file
View File

@ -0,0 +1,46 @@
'use strict'
const isNode = require('detect-node')
const Connection = require('interface-connection').Connection
const contains = require('lodash.contains')
// const IPFS_CODE = 421
let createServer
if (isNode) {
createServer = require('pull-ws/server')
} else {
createServer = () => {}
}
module.exports = (options, handler) => {
const listener = createServer((socket) => {
socket.getObservedAddrs = (cb) => {
// TODO research if we can reuse the address in anyway
return cb(null, [])
}
handler(new Connection(socket))
})
let listeningMultiaddr
listener._listen = listener.listen
listener.listen = (ma, cb) => {
cb = cb || (() => {})
listeningMultiaddr = ma
if (contains(ma.protoNames(), 'ipfs')) {
ma = ma.decapsulate('ipfs')
}
listener._listen(ma.toOptions(), cb)
}
listener.getAddrs = (cb) => {
cb(null, [listeningMultiaddr])
}
return listener
}

View File

@ -3,74 +3,67 @@
const expect = require('chai').expect
const multiaddr = require('multiaddr')
const pull = require('pull-stream')
const goodbye = require('pull-goodbye')
const WS = require('../src')
describe('libp2p-websockets', () => {
const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws')
let ws
let conn
it('create', (done) => {
beforeEach((done) => {
ws = new WS()
expect(ws).to.exist
done()
conn = ws.dial(ma, done)
})
it('echo', (done) => {
const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws')
const conn = ws.dial(ma)
const message = 'Hello World!'
conn.write(message)
conn.on('data', (data) => {
expect(data.toString()).to.equal(message)
conn.end()
done()
const s = goodbye({
source: pull.values([message]),
sink: pull.collect((err, results) => {
expect(err).to.not.exist
expect(results).to.be.eql([message])
done()
})
})
pull(s, conn, s)
})
describe('stress', () => {
it('one big write', (done) => {
const mh = multiaddr('/ip4/127.0.0.1/tcp/9090/ws')
const conn = ws.dial(mh)
const message = new Buffer(1000000).fill('a').toString('hex')
conn.write(message)
conn.on('data', (data) => {
expect(data.toString()).to.equal(message)
conn.end()
done()
const rawMessage = new Buffer(1000000).fill('a')
const s = goodbye({
source: pull.values([rawMessage]),
sink: pull.collect((err, results) => {
expect(err).to.not.exist
expect(results).to.be.eql([rawMessage])
done()
})
})
pull(s, conn, s)
})
it('many writes in 2 batches', (done) => {
const mh = multiaddr('/ip4/127.0.0.1/tcp/9090/ws')
const conn = ws.dial(mh)
let expected = ''
let counter = 0
while (++counter < 10000) {
conn.write(`${counter} `)
expected += `${counter} `
}
setTimeout(() => {
while (++counter < 20000) {
conn.write(`${counter} `)
expected += `${counter} `
}
conn.write('STOP')
}, 1000)
let result = ''
conn.on('data', (data) => {
if (data.toString() === 'STOP') {
conn.end()
return
}
result += data.toString()
it('many writes', (done) => {
const s = goodbye({
source: pull(
pull.infinite(),
pull.take(1000),
pull.map((val) => Buffer(val.toString()))
),
sink: pull.collect((err, result) => {
expect(err).to.not.exist
expect(result).to.have.length(1000)
done()
})
})
conn.on('end', () => {
expect(result).to.equal(expected)
done()
})
pull(s, conn, s)
})
})
})

23
test/compliance.node.js Normal file
View File

@ -0,0 +1,23 @@
/* eslint-env mocha */
'use strict'
const tests = require('interface-transport')
const multiaddr = require('multiaddr')
const Ws = require('../src')
describe('compliance', () => {
tests({
setup (cb) {
let ws = new Ws()
const addrs = [
multiaddr('/ip4/127.0.0.1/tcp/9091/ws'),
multiaddr('/ip4/127.0.0.1/tcp/9092/ws'),
multiaddr('/ip4/127.0.0.1/tcp/9093/ws')
]
cb(null, ws, addrs)
},
teardown (cb) {
cb()
}
})
})

View File

@ -3,19 +3,17 @@
const expect = require('chai').expect
const multiaddr = require('multiaddr')
const pull = require('pull-stream')
const goodbye = require('pull-goodbye')
const WS = require('../src')
require('./compliance.node')
describe('instantiate the transport', () => {
it('create', (done) => {
it('create', () => {
const ws = new WS()
expect(ws).to.exist
done()
})
it('create without new', (done) => {
const ws = WS()
expect(ws).to.exist
done()
})
})
@ -122,7 +120,7 @@ describe('dial', () => {
beforeEach((done) => {
ws = new WS()
listener = ws.createListener((conn) => {
conn.pipe(conn)
pull(conn, conn)
})
listener.listen(ma, done)
})
@ -133,12 +131,18 @@ describe('dial', () => {
it('dial on IPv4', (done) => {
const conn = ws.dial(ma)
conn.write('hey')
conn.end()
conn.on('data', (chunk) => {
expect(chunk.toString()).to.equal('hey')
const s = goodbye({
source: pull.values(['hey']),
sink: pull.collect((err, result) => {
expect(err).to.not.exist
expect(result).to.be.eql(['hey'])
done()
})
})
conn.on('end', done)
pull(s, conn, s)
})
it.skip('dial on IPv6', (done) => {
@ -148,12 +152,18 @@ describe('dial', () => {
it('dial on IPv4 with IPFS Id', (done) => {
const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw')
const conn = ws.dial(ma)
conn.write('hey')
conn.end()
conn.on('data', (chunk) => {
expect(chunk.toString()).to.equal('hey')
const s = goodbye({
source: pull.values(['hey']),
sink: pull.collect((err, result) => {
expect(err).to.not.exist
expect(result).to.be.eql(['hey'])
done()
})
})
conn.on('end', done)
pull(s, conn, s)
})
})
@ -204,13 +214,17 @@ describe('valid Connection', () => {
dialerObsAddrs = addrs
})
conn.pipe(conn)
pull(conn, conn)
})
listener.listen(ma, () => {
const conn = ws.dial(ma)
conn.on('end', onEnd)
pull(
pull.empty(),
conn,
pull.onEnd(onEnd)
)
function onEnd () {
conn.getObservedAddrs((err, addrs) => {
@ -218,7 +232,6 @@ describe('valid Connection', () => {
listenerObsAddrs = addrs
listener.close(onClose)
function onClose () {
expect(listenerObsAddrs[0]).to.deep.equal(ma)
expect(dialerObsAddrs.length).to.equal(0)
@ -226,8 +239,6 @@ describe('valid Connection', () => {
}
})
}
conn.resume()
conn.end()
})
})
@ -241,13 +252,17 @@ describe('valid Connection', () => {
expect(err).to.exist
})
conn.pipe(conn)
pull(conn, conn)
})
listener.listen(ma, () => {
const conn = ws.dial(ma)
conn.on('end', onEnd)
pull(
pull.empty(),
conn,
pull.onEnd(onEnd)
)
function onEnd () {
conn.getPeerInfo((err, peerInfo) => {
@ -255,8 +270,6 @@ describe('valid Connection', () => {
listener.close(done)
})
}
conn.resume()
conn.end()
})
})
@ -272,7 +285,7 @@ describe('valid Connection', () => {
expect(peerInfo).to.equal('a')
})
conn.pipe(conn)
pull(conn, conn)
})
listener.listen(ma, onListen)
@ -280,15 +293,19 @@ describe('valid Connection', () => {
const conn = ws.dial(ma)
conn.setPeerInfo('b')
conn.on('end', () => {
pull(
pull.empty(),
conn,
pull.onEnd(onEnd)
)
function onEnd () {
conn.getPeerInfo((err, peerInfo) => {
expect(err).to.not.exist
expect(peerInfo).to.equal('b')
listener.close(done)
})
})
conn.resume()
conn.end()
}
}
})
})