refactor: wip switch to it-ws and async iterators

License: MIT
Signed-off-by: Alan Shaw <alan.shaw@protocol.ai>
This commit is contained in:
Alan Shaw 2019-03-29 07:53:08 +00:00
parent 18885e9cd3
commit 980f750d14
No known key found for this signature in database
GPG Key ID: AFC4442246B75B6F
4 changed files with 73 additions and 131 deletions

View File

@ -43,9 +43,9 @@
"class-is": "^1.1.0", "class-is": "^1.1.0",
"debug": "^4.1.1", "debug": "^4.1.1",
"interface-connection": "~0.3.2", "interface-connection": "~0.3.2",
"it-ws": "^1.0.0",
"mafmt": "^6.0.4", "mafmt": "^6.0.4",
"multiaddr-to-uri": "^4.0.1", "multiaddr-to-uri": "^4.0.1"
"pull-ws": "hugomrdias/pull-ws#fix/bundle-size"
}, },
"devDependencies": { "devDependencies": {
"aegir": "^18.0.3", "aegir": "^18.0.3",

View File

@ -1,47 +1,24 @@
'use strict' 'use strict'
const connect = require('pull-ws/client') const connect = require('it-ws/client')
const mafmt = require('mafmt') const mafmt = require('mafmt')
const withIs = require('class-is') const withIs = require('class-is')
const Connection = require('interface-connection').Connection
const toUri = require('multiaddr-to-uri') const toUri = require('multiaddr-to-uri')
const debug = require('debug') const log = require('debug')('libp2p:websockets:transport')
const log = debug('libp2p:websockets:dialer')
const createListener = require('./listener') const createListener = require('./listener')
class WebSockets { class WebSockets {
dial (ma, options, callback) { async dial (ma, options) {
if (typeof options === 'function') { log('dialing %s', ma)
callback = options
options = {}
}
callback = callback || function () { }
const url = toUri(ma) const url = toUri(ma)
log('dialing %s', url) const socket = connect(url, { binary: true })
const socket = connect(url, { socket.getObservedAddrs = () => [ma]
binary: true, log('connected %s', ma)
onConnect: (err) => { return socket
callback(err)
}
})
const conn = new Connection(socket)
conn.getObservedAddrs = (cb) => cb(null, [ma])
conn.close = (cb) => socket.close(cb)
return conn
} }
createListener (options, handler) { createListener (options, handler) {
if (typeof options === 'function') {
handler = options
options = {}
}
return createListener(options, handler) return createListener(options, handler)
} }

View File

@ -1,43 +1,35 @@
'use strict' 'use strict'
const Connection = require('interface-connection').Connection
const multiaddr = require('multiaddr') const multiaddr = require('multiaddr')
const os = require('os') const os = require('os')
function noop () {} const createServer = require('it-ws/server')
const createServer = require('pull-ws/server') || noop
module.exports = (options, handler) => { module.exports = (options, handler) => {
const listener = createServer(options, (socket) => { const server = createServer(options, socket => {
socket.getObservedAddrs = (callback) => { socket.getObservedAddrs = () => []
// TODO research if we can reuse the address in anyway handler(socket)
return callback(null, [])
}
handler(new Connection(socket))
}) })
let listeningMultiaddr let listeningMultiaddr
listener._listen = listener.listen const listen = server.listen
listener.listen = (ma, callback) => { server.listen = ma => {
callback = callback || noop
listeningMultiaddr = ma listeningMultiaddr = ma
if (ma.protoNames().includes('ipfs')) { if (ma.protoNames().includes('ipfs')) {
ma = ma.decapsulate('ipfs') ma = ma.decapsulate('ipfs')
} }
listener._listen(ma.toOptions(), callback) return listen(ma.toOptions())
} }
listener.getAddrs = (callback) => { server.getAddrs = async () => {
const multiaddrs = [] const multiaddrs = []
const address = listener.address() const address = server.address()
if (!address) { if (!address) {
return callback(new Error('Listener is not ready yet')) throw new Error('Listener is not ready yet')
} }
let ipfsId = listeningMultiaddr.getPeerId() let ipfsId = listeningMultiaddr.getPeerId()
@ -65,8 +57,8 @@ module.exports = (options, handler) => {
} }
} }
callback(null, multiaddrs) return multiaddrs
} }
return listener return server
} }

View File

@ -30,19 +30,18 @@ describe('listen', () => {
ws = new WS() ws = new WS()
}) })
it('listen, check for callback', (done) => { it('listen, check for promise', async () => {
const listener = ws.createListener((conn) => { }) const listener = ws.createListener((conn) => { })
await listener.listen(ma)
listener.listen(ma, () => { await listener.close()
listener.close(done)
})
}) })
it('listen, check for listening event', (done) => { it('listen, check for listening event', (done) => {
const listener = ws.createListener((conn) => { }) const listener = ws.createListener((conn) => { })
listener.on('listening', () => { listener.on('listening', async () => {
listener.close(done) await listener.close()
done()
}) })
listener.listen(ma) listener.listen(ma)
@ -59,14 +58,12 @@ describe('listen', () => {
listener.listen(ma) listener.listen(ma)
}) })
it('listen on addr with /ipfs/QmHASH', (done) => { it('listen on addr with /ipfs/QmHASH', async () => {
const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw')
const listener = ws.createListener((conn) => { }) const listener = ws.createListener((conn) => { })
listener.listen(ma, () => { await listener.listen(ma)
listener.close(done) await listener.close()
})
}) })
it.skip('close listener with connections, through timeout', (done) => { it.skip('close listener with connections, through timeout', (done) => {
@ -82,73 +79,53 @@ describe('listen', () => {
// TODO 0.0.0.0 not supported yet // TODO 0.0.0.0 not supported yet
}) })
it('getAddrs', (done) => { it('getAddrs', async () => {
const listener = ws.createListener((conn) => { const listener = ws.createListener((conn) => { })
}) await listener.listen(ma)
listener.listen(ma, () => { const addrs = await listener.getAddrs()
listener.getAddrs((err, addrs) => {
expect(err).to.not.exist()
expect(addrs.length).to.equal(1) expect(addrs.length).to.equal(1)
expect(addrs[0]).to.deep.equal(ma) expect(addrs[0]).to.deep.equal(ma)
listener.close(done) await listener.close()
})
})
}) })
it('getAddrs on port 0 listen', (done) => { it('getAddrs on port 0 listen', async () => {
const addr = multiaddr(`/ip4/127.0.0.1/tcp/0/ws`) const addr = multiaddr(`/ip4/127.0.0.1/tcp/0/ws`)
const listener = ws.createListener((conn) => { const listener = ws.createListener((conn) => { })
}) await listener.listen(addr)
listener.listen(addr, () => { const addrs = await listener.getAddrs()
listener.getAddrs((err, addrs) => {
expect(err).to.not.exist()
expect(addrs.length).to.equal(1) expect(addrs.length).to.equal(1)
expect(addrs.map((a) => a.toOptions().port)).to.not.include('0') expect(addrs.map((a) => a.toOptions().port)).to.not.include('0')
listener.close(done) await listener.close()
})
})
}) })
it('getAddrs from listening on 0.0.0.0', (done) => { it('getAddrs from listening on 0.0.0.0', async () => {
const addr = multiaddr(`/ip4/0.0.0.0/tcp/9003/ws`) const addr = multiaddr(`/ip4/0.0.0.0/tcp/9003/ws`)
const listener = ws.createListener((conn) => { const listener = ws.createListener((conn) => { })
}) await listener.listen(addr)
listener.listen(addr, () => { const addrs = await listener.getAddrs()
listener.getAddrs((err, addrs) => {
expect(err).to.not.exist()
expect(addrs.map((a) => a.toOptions().host)).to.not.include('0.0.0.0') expect(addrs.map((a) => a.toOptions().host)).to.not.include('0.0.0.0')
listener.close(done) await listener.close()
})
})
}) })
it('getAddrs from listening on 0.0.0.0 and port 0', (done) => { it('getAddrs from listening on 0.0.0.0 and port 0', async () => {
const addr = multiaddr(`/ip4/0.0.0.0/tcp/0/ws`) const addr = multiaddr(`/ip4/0.0.0.0/tcp/0/ws`)
const listener = ws.createListener((conn) => { const listener = ws.createListener((conn) => { })
}) await listener.listen(addr)
listener.listen(addr, () => { const addrs = await listener.getAddrs()
listener.getAddrs((err, addrs) => {
expect(err).to.not.exist()
expect(addrs.map((a) => a.toOptions().host)).to.not.include('0.0.0.0') expect(addrs.map((a) => a.toOptions().host)).to.not.include('0.0.0.0')
expect(addrs.map((a) => a.toOptions().port)).to.not.include('0') expect(addrs.map((a) => a.toOptions().port)).to.not.include('0')
listener.close(done) await listener.close()
})
})
}) })
it('getAddrs preserves IPFS Id', (done) => { it('getAddrs preserves IPFS Id', async () => {
const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') const ma = multiaddr('/ip4/127.0.0.1/tcp/9090/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw')
const listener = ws.createListener((conn) => { }) const listener = ws.createListener((conn) => { })
listener.listen(ma, () => { await listener.listen(ma)
listener.getAddrs((err, addrs) => { const addrs = await listener.getAddrs()
expect(err).to.not.exist()
expect(addrs.length).to.equal(1) expect(addrs.length).to.equal(1)
expect(addrs[0]).to.deep.equal(ma) expect(addrs[0]).to.deep.equal(ma)
listener.close(done) await listener.close()
})
})
}) })
}) })
@ -160,19 +137,18 @@ describe('listen', () => {
ws = new WS() ws = new WS()
}) })
it('listen, check for callback', (done) => { it('listen, check for promise', async () => {
const listener = ws.createListener((conn) => { }) const listener = ws.createListener((conn) => { })
await listener.listen(ma)
listener.listen(ma, () => { await listener.close()
listener.close(done)
})
}) })
it('listen, check for listening event', (done) => { it('listen, check for listening event', (done) => {
const listener = ws.createListener((conn) => { }) const listener = ws.createListener((conn) => { })
listener.on('listening', () => { listener.on('listening', async () => {
listener.close(done) await listener.close()
done()
}) })
listener.listen(ma) listener.listen(ma)
@ -189,14 +165,11 @@ describe('listen', () => {
listener.listen(ma) listener.listen(ma)
}) })
it('listen on addr with /ipfs/QmHASH', (done) => { it('listen on addr with /ipfs/QmHASH', async () => {
const ma = multiaddr('/ip6/::1/tcp/9091/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') const ma = multiaddr('/ip6/::1/tcp/9091/ws/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw')
const listener = ws.createListener((conn) => { }) const listener = ws.createListener((conn) => { })
await listener.listen(ma)
listener.listen(ma, () => { await listener.close()
listener.close(done)
})
}) })
}) })
}) })