feat: callbacks -> async / await (#44)

* feat: callbacks -> async / await

BREAKING CHANGE: All places in the API that used callbacks are now replaced with async/await

* test: add tests for canceling dials

* feat: Adapter class
This commit is contained in:
dirkmc 2019-04-18 01:35:06 +08:00 committed by Jacob Heun
parent 6e99899b3d
commit b30ee5f6af
7 changed files with 291 additions and 156 deletions

111
README.md
View File

@ -10,9 +10,9 @@ The primary goal of this module is to enable developers to pick and swap their t
Publishing a test suite as a module lets multiple modules all ensure compatibility since they use the same test suite. Publishing a test suite as a module lets multiple modules all ensure compatibility since they use the same test suite.
The purpose of this interface is not to reinvent any wheels when it comes to dialing and listening to transports. Instead, it tries to uniform several transports through a shimmed interface. The purpose of this interface is not to reinvent any wheels when it comes to dialing and listening to transports. Instead, it tries to provide a uniform API for several transports through a shimmed interface.
The API is presented with both Node.js and Go primitives, however, there are no actual limitations for it to be extended for any other language, pushing forward the cross compatibility and interop through diferent stacks. The API is presented with both Node.js and Go primitives, however there are no actual limitations for it to be extended for any other language, pushing forward the cross compatibility and interop through diferent stacks.
## Lead Maintainer ## Lead Maintainer
@ -48,16 +48,32 @@ const YourTransport = require('../src')
describe('compliance', () => { describe('compliance', () => {
tests({ tests({
setup (cb) { setup () {
let t = new YourTransport() let transport = new YourTransport()
const addrs = [ const addrs = [
multiaddr('valid-multiaddr-for-your-transport'), multiaddr('valid-multiaddr-for-your-transport'),
multiaddr('valid-multiaddr2-for-your-transport') multiaddr('valid-multiaddr2-for-your-transport')
] ]
cb(null, t, addrs)
const network = require('my-network-lib')
const connect = network.connect
const connector = {
delay (delayMs) {
// Add a delay in the connection mechanism for the transport
// (this is used by the dial tests)
network.connect = (...args) => setTimeout(() => connect(...args), 100)
},
restore () {
// Restore the connection mechanism to normal
network.connect = connect
}
}
return { transport, addrs, connector }
}, },
teardown (cb) { teardown () {
cb() // Clean up any resources created by setup()
} }
}) })
}) })
@ -69,50 +85,73 @@ describe('compliance', () => {
# API # API
A valid (read: that follows the interface defined) transport, must implement the following API. A valid transport (one that follows the interface defined) must implement the following API:
**Table of contents:** **Table of contents:**
- type: `Transport` - type: `Transport`
- `new Transport([options])` - `new Transport([options])`
- `transport.dial(multiaddr, [options, callback])` - `<Promise> transport.dial(multiaddr, [options])`
- `transport.createListener([options], handlerFunction)` - `transport.createListener([options], handlerFunction)`
- type: `transport.Listener` - type: `transport.Listener`
- event: 'listening' - event: 'listening'
- event: 'close' - event: 'close'
- event: 'connection' - event: 'connection'
- event: 'error' - event: 'error'
- `listener.listen(multiaddr, [callback])` - `<Promise> listener.listen(multiaddr)`
- `listener.getAddrs(callback)` - `listener.getAddrs()`
- `listener.close([options])` - `<Promise> listener.close([options])`
### Creating a transport instance ### Creating a transport instance
- `JavaScript` - `var transport = new Transport([options])` - `JavaScript` - `var transport = new Transport([options])`
Creates a new Transport instance. `options` is a optional JavaScript object, might include the necessary parameters for the transport instance. Creates a new Transport instance. `options` is an optional JavaScript object that should include the necessary parameters for the transport instance.
**Note: Why is it important to instantiate a transport -** Some transports have state that can be shared between the dialing and listening parts. One example is a libp2p-webrtc-star (or pretty much any other WebRTC flavour transport), where that, in order to dial, a peer needs to be part of some signalling network that is shared also with the listener. **Note: Why is it important to instantiate a transport -** Some transports have state that can be shared between the dialing and listening parts. For example with libp2p-webrtc-star, in order to dial a peer, the peer must be part of some signaling network that is shared with the listener.
### Dial to another peer ### Dial to another peer
- `JavaScript` - `var conn = transport.dial(multiaddr, [options, callback])` - `JavaScript` - `const conn = await transport.dial(multiaddr, [options])`
This method dials a transport to the Peer listening on `multiaddr`. This method uses a transport to dial a Peer listening on `multiaddr`.
`multiaddr` must be of the type [`multiaddr`](https://www.npmjs.com/multiaddr). `multiaddr` must be of the type [`multiaddr`](https://www.npmjs.com/multiaddr).
`stream` must implements the [interface-connection](https://github.com/libp2p/interface-connection) interface. `[options]` the options that may be passed to the dial. Must support the `signal` option (see below)
`[options]` is an optional argument, which can be used by some implementations `conn` must implement the [interface-connection](https://github.com/libp2p/interface-connection) interface.
`callback` should follow the `function (err)` signature. The dial may throw an `Error` instance if there was a problem connecting to the `multiaddr`.
`err` is an `Error` instance to signal that the dial was unsuccessful, this error can be a 'timeout' or simply 'error'. ### Canceling a dial
Dials may be cancelled using an `AbortController`:
```Javascript
const AbortController = require('abort-controller')
const { AbortError } = require('interface-transport')
const controller = new AbortController()
try {
const conn = await mytransport.dial(ma, { signal: controller.signal })
// Do stuff with conn here ...
} catch (err) {
if(err.code === AbortError.code) {
// Dial was aborted, just bail out
return
}
throw err
}
// ----
// In some other part of the code:
controller.abort()
// ----
```
### Create a listener ### Create a listener
- `JavaScript` - `var listener = transport.createListener([options], handlerFunction)` - `JavaScript` - `const listener = transport.createListener([options], handlerFunction)`
This method creates a listener on the transport. This method creates a listener on the transport.
@ -120,37 +159,33 @@ This method creates a listener on the transport.
`handlerFunction` is a function called each time a new connection is received. It must follow the following signature: `function (conn) {}`, where `conn` is a connection that follows the [`interface-connection`](https://github.com/diasdavid/interface-connection). `handlerFunction` is a function called each time a new connection is received. It must follow the following signature: `function (conn) {}`, where `conn` is a connection that follows the [`interface-connection`](https://github.com/diasdavid/interface-connection).
The listener object created, can emit the following events: The listener object created may emit the following events:
- `listening` - - `listening` - when the listener is ready for incoming connections
- `close` - - `close` - when the listener is closed
- `connection` - - `connection` - (`conn`) each time an incoming connection is received
- `error` - - `error` - (`err`) each time there is an error on the connection
### Start a listener ### Start a listener
- `JavaScript` - `listener.listen(multiaddr, [callback])` - `JavaScript` - `await listener.listen(multiaddr)`
This method puts the listener in `listening` mode, waiting for incoming connections. This method puts the listener in `listening` mode, waiting for incoming connections.
`multiaddr` is the address where the listener should bind to. `multiaddr` is the address that the listener should bind to.
`callback` is a function called once the listener is ready.
### Get listener addrs ### Get listener addrs
- `JavaScript` - `listener.getAddrs(callback)` - `JavaScript` - `listener.getAddrs()`
This method retrieves the addresses in which this listener is listening. Useful for when listening on port 0 or any interface (0.0.0.0). This method returns the addresses on which this listener is listening. Useful when listening on port 0 or any interface (0.0.0.0).
### Stop a listener ### Stop a listener
- `JavaScript` - `listener.close([options, callback])` - `JavaScript` - `await listener.close([options])`
This method closes the listener so that no more connections can be open on this transport instance. This method closes the listener so that no more connections can be opened on this transport instance.
`options` is an optional object that might contain the following properties: `options` is an optional object that may contain the following properties:
- `timeout` - A timeout value (in ms) that fires and destroys all the connections on this transport if the transport is not able to close graciously. (e.g { timeout: 1000 }) - `timeout` - A timeout value (in ms) after which all connections on this transport will be destroyed if the transport is not able to close gracefully. (e.g { timeout: 1000 })
`callback` is function that gets called when the listener is closed. It is optional.

View File

@ -38,11 +38,15 @@
"dirty-chai": "^2.0.1" "dirty-chai": "^2.0.1"
}, },
"dependencies": { "dependencies": {
"abort-controller": "^3.0.0",
"async-iterator-to-pull-stream": "^1.3.0",
"chai": "^4.2.0", "chai": "^4.2.0",
"interface-connection": "~0.3.3",
"it-goodbye": "^2.0.0",
"it-pipe": "^1.0.0",
"multiaddr": "^5.0.2", "multiaddr": "^5.0.2",
"pull-goodbye": "~0.0.2", "pull-stream": "^3.6.9",
"pull-serializer": "~0.3.2", "streaming-iterables": "^4.0.2"
"pull-stream": "^3.6.9"
}, },
"contributors": [ "contributors": [
"David Dias <daviddias.p@gmail.com>", "David Dias <daviddias.p@gmail.com>",

80
src/adapter.js Normal file
View File

@ -0,0 +1,80 @@
'use strict'
const { Connection } = require('interface-connection')
const toPull = require('async-iterator-to-pull-stream')
const error = require('pull-stream/sources/error')
const drain = require('pull-stream/sinks/drain')
const noop = () => {}
function callbackify (fn) {
return async function (...args) {
let cb = args.pop()
if (typeof cb !== 'function') {
args.push(cb)
cb = noop
}
let res
try {
res = await fn(...args)
} catch (err) {
return cb(err)
}
cb(null, res)
}
}
// Legacy adapter to old transport & connection interface
class Adapter {
constructor (transport) {
this.transport = transport
}
dial (ma, options, callback) {
if (typeof options === 'function') {
callback = options
options = {}
}
callback = callback || noop
const conn = new Connection()
this.transport.dial(ma, options)
.then(socket => {
conn.setInnerConn(toPull.duplex(socket))
conn.getObservedAddrs = callbackify(socket.getObservedAddrs.bind(socket))
conn.close = callbackify(socket.close.bind(socket))
callback(null, conn)
})
.catch(err => {
conn.setInnerConn({ sink: drain(), source: error(err) })
callback(err)
})
return conn
}
createListener (options, handler) {
if (typeof options === 'function') {
handler = options
options = {}
}
const server = this.transport.createListener(options, socket => {
const conn = new Connection(toPull.duplex(socket))
conn.getObservedAddrs = callbackify(socket.getObservedAddrs.bind(socket))
handler(conn)
})
const proxy = {
listen: callbackify(server.listen.bind(server)),
close: callbackify(server.close.bind(server)),
getAddrs: callbackify(server.getAddrs.bind(server)),
getObservedAddrs: callbackify(() => server.getObservedAddrs())
}
return new Proxy(server, { get: (_, prop) => proxy[prop] || server[prop] })
}
}
module.exports = Adapter

View File

@ -5,69 +5,86 @@ const chai = require('chai')
const dirtyChai = require('dirty-chai') const dirtyChai = require('dirty-chai')
const expect = chai.expect const expect = chai.expect
chai.use(dirtyChai) chai.use(dirtyChai)
const pull = require('pull-stream')
const goodbye = require('pull-goodbye') const goodbye = require('it-goodbye')
const serializer = require('pull-serializer') const { collect } = require('streaming-iterables')
const pipe = require('it-pipe')
const AbortController = require('abort-controller')
const AbortError = require('./errors').AbortError
module.exports = (common) => { module.exports = (common) => {
describe('dial', () => { describe('dial', () => {
let addrs let addrs
let transport let transport
let connector
let listener let listener
before((done) => { before(async () => {
common.setup((err, _transport, _addrs) => { ({ addrs, transport, connector } = await common.setup())
if (err) return done(err)
transport = _transport
addrs = _addrs
done()
})
}) })
after((done) => { after(() => common.teardown && common.teardown())
common.teardown(done)
beforeEach(() => {
listener = transport.createListener((conn) => pipe(conn, conn))
return listener.listen(addrs[0])
}) })
beforeEach((done) => { afterEach(() => listener.close())
listener = transport.createListener((conn) => {
pull(conn, conn) it('simple', async () => {
}) const conn = await transport.dial(addrs[0])
listener.listen(addrs[0], done)
const s = goodbye({ source: ['hey'], sink: collect })
const result = await pipe(s, conn, s)
expect(result.length).to.equal(1)
expect(result[0].toString()).to.equal('hey')
}) })
afterEach((done) => { it('to non existent listener', async () => {
listener.close(done) try {
await transport.dial(addrs[1])
} catch (_) {
// Success: expected an error to be throw
return
}
expect.fail('Did not throw error attempting to connect to non-existent listener')
}) })
it('simple', (done) => { it('cancel before dialing', async () => {
const s = serializer(goodbye({ const controller = new AbortController()
source: pull.values(['hey']), controller.abort()
sink: pull.collect((err, values) => { const socket = transport.dial(addrs[0], { signal: controller.signal })
expect(err).to.not.exist()
expect(
values
).to.be.eql(
['hey']
)
done()
})
}))
pull( try {
s, await socket
transport.dial(addrs[0]), } catch (err) {
s expect(err.code).to.eql(AbortError.code)
) return
}
expect.fail('Did not throw error with code ' + AbortError.code)
}) })
it('to non existent listener', (done) => { it('cancel while dialing', async () => {
pull( // Add a delay to connect() so that we can cancel while the dial is in
transport.dial(addrs[1]), // progress
pull.onEnd((err) => { connector.delay(100)
expect(err).to.exist()
done() const controller = new AbortController()
}) const socket = transport.dial(addrs[0], { signal: controller.signal })
) setTimeout(() => controller.abort(), 50)
try {
await socket
} catch (err) {
expect(err.code).to.eql(AbortError.code)
return
} finally {
connector.restore()
}
expect.fail('Did not throw error with code ' + AbortError.code)
}) })
}) })
} }

16
src/errors.js Normal file
View File

@ -0,0 +1,16 @@
'use strict'
class AbortError extends Error {
constructor () {
super('AbortError')
this.code = AbortError.code
}
static get code () {
return 'ABORT_ERR'
}
}
module.exports = {
AbortError
}

View File

@ -10,3 +10,6 @@ module.exports = (common) => {
listen(common) listen(common)
}) })
} }
module.exports.AbortError = require('./errors').AbortError
module.exports.Adapter = require('./adapter')

View File

@ -7,118 +7,98 @@ const dirtyChai = require('dirty-chai')
const expect = chai.expect const expect = chai.expect
chai.use(dirtyChai) chai.use(dirtyChai)
const pull = require('pull-stream') const pipe = require('it-pipe')
module.exports = (common) => { module.exports = (common) => {
describe('listen', () => { describe('listen', () => {
let addrs let addrs
let transport let transport
before((done) => { before(async () => {
common.setup((err, _transport, _addrs) => { ({ transport, addrs } = await common.setup())
if (err) return done(err)
transport = _transport
addrs = _addrs
done()
})
}) })
after((done) => { after(() => common.teardown && common.teardown())
common.teardown(done)
})
it('simple', (done) => { it('simple', async () => {
const listener = transport.createListener((conn) => {}) const listener = transport.createListener((conn) => {})
listener.listen(addrs[0], () => { await listener.listen(addrs[0])
listener.close(done) await listener.close()
})
}) })
it('close listener with connections, through timeout', (done) => { it('close listener with connections, through timeout', async () => {
const finish = plan(3, done) let finish
const listener = transport.createListener((conn) => { let done = new Promise((resolve) => {
pull(conn, conn) finish = resolve
}) })
listener.listen(addrs[0], () => { const listener = transport.createListener((conn) => pipe(conn, conn))
const socket1 = transport.dial(addrs[0], () => {
listener.close(finish)
})
pull( // Listen
transport.dial(addrs[0]), await listener.listen(addrs[0])
pull.onEnd(() => {
finish()
})
)
pull( // Create two connections to the listener
pull.values([Buffer.from('Some data that is never handled')]), const socket1 = await transport.dial(addrs[0])
socket1, await transport.dial(addrs[0])
pull.onEnd(() => {
finish() pipe(
}) [Buffer.from('Some data that is never handled')],
) socket1
).then(() => {
finish()
}) })
// Closer the listener (will take a couple of seconds to time out)
await listener.close()
// Pipe should have completed
await done
}) })
describe('events', () => { describe('events', () => {
// eslint-disable-next-line it('connection', (done) => {
// TODO: figure out why it fails in the full test suite
it.skip('connection', (done) => {
const finish = plan(2, done)
const listener = transport.createListener() const listener = transport.createListener()
listener.on('connection', (conn) => { listener.on('connection', async (conn) => {
expect(conn).to.exist() expect(conn).to.exist()
finish() await listener.close()
done()
}) })
listener.listen(addrs[0], () => { ;(async () => {
transport.dial(addrs[0], () => { await listener.listen(addrs[0])
listener.close(finish) await transport.dial(addrs[0])
}) })()
})
}) })
it('listening', (done) => { it('listening', (done) => {
const listener = transport.createListener() const listener = transport.createListener()
listener.on('listening', () => { listener.on('listening', async () => {
listener.close(done) await listener.close()
done()
}) })
listener.listen(addrs[0]) listener.listen(addrs[0])
}) })
// eslint-disable-next-line it('error', (done) => {
// TODO: how to get the listener to emit an error?
it.skip('error', (done) => {
const listener = transport.createListener() const listener = transport.createListener()
listener.on('error', (err) => { listener.on('error', async (err) => {
expect(err).to.exist() expect(err).to.exist()
listener.close(done) await listener.close()
done()
}) })
listener.emit('error', new Error('my err'))
}) })
it('close', (done) => { it('close', (done) => {
const finish = plan(2, done)
const listener = transport.createListener() const listener = transport.createListener()
listener.on('close', finish) listener.on('close', done)
listener.listen(addrs[0], () => { ;(async () => {
listener.close(finish) await listener.listen(addrs[0])
}) await listener.close()
})()
}) })
}) })
}) })
} }
function plan (n, done) {
let i = 0
return (err) => {
if (err) return done(err)
i++
if (i === n) done()
}
}