feat: add upgrader support to transports (#53)

BREAKING CHANGE: Transports must now be passed and use an `Upgrader` instance. See the Readme for usage. Compliance test suites will now need to pass `options` from `common.setup(options)` to their Transport constructor.

* docs: update readme to include upgrader

* docs: update readme to include MultiaddrConnection ref

* feat: add upgrader spy to test suite

* test: validate returned value of spy
This commit is contained in:
Jacob Heun 2019-09-06 09:44:17 +02:00 committed by GitHub
parent 259fc58622
commit a5ad120b60
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 101 additions and 20 deletions

View File

@ -53,8 +53,8 @@ const YourTransport = require('../src')
describe('compliance', () => { describe('compliance', () => {
tests({ tests({
setup () { setup (options) {
let transport = new YourTransport() let transport = new YourTransport(options)
const addrs = [ const addrs = [
multiaddr('valid-multiaddr-for-your-transport'), multiaddr('valid-multiaddr-for-your-transport'),
@ -84,10 +84,6 @@ describe('compliance', () => {
}) })
``` ```
## Go
> WIP
# API # API
A valid transport (one that follows the interface defined) must implement the following API: A valid transport (one that follows the interface defined) must implement the following API:
@ -95,7 +91,7 @@ A valid transport (one that follows the interface defined) must implement the fo
**Table of contents:** **Table of contents:**
- type: `Transport` - type: `Transport`
- `new Transport([options])` - `new Transport({ upgrader, ...[options] })`
- `<Promise> transport.dial(multiaddr, [options])` - `<Promise> transport.dial(multiaddr, [options])`
- `transport.createListener([options], handlerFunction)` - `transport.createListener([options], handlerFunction)`
- type: `transport.Listener` - type: `transport.Listener`
@ -107,17 +103,39 @@ A valid transport (one that follows the interface defined) must implement the fo
- `listener.getAddrs()` - `listener.getAddrs()`
- `<Promise> listener.close([options])` - `<Promise> listener.close([options])`
### Types
#### Upgrader
Upgraders have 2 methods: `upgradeOutbound` and `upgradeInbound`.
- `upgradeOutbound` must be called and returned by `transport.dial`.
- `upgradeInbound` must be called and the results must be passed to the `createListener` `handlerFunction` and the `connection` event handler, anytime a new connection is created.
```js
const connection = await upgrader.upgradeOutbound(multiaddrConnection)
const connection = await upgrader.upgradeInbound(multiaddrConnection)
```
The `Upgrader` methods take a [MultiaddrConnection](#multiaddrconnection) and will return an `interface-connection` instance.
#### MultiaddrConnection
- `MultiaddrConnection`
- `sink<function(source)>`: A [streaming iterable sink](https://gist.github.com/alanshaw/591dc7dd54e4f99338a347ef568d6ee9#sink-it)
- `source<AsyncIterator>`: A [streaming iterable source](https://gist.github.com/alanshaw/591dc7dd54e4f99338a347ef568d6ee9#source-it)
- `conn`: The raw connection of the transport, such as a TCP socket.
- `remoteAddr<Multiaddr>`: The remote `Multiaddr` of the connection.
### Creating a transport instance ### Creating a transport instance
- `JavaScript` - `const transport = new Transport([options])` - `JavaScript` - `const transport = new Transport({ upgrader, ...[options] })`
Creates a new Transport instance. `options` is an optional JavaScript object that should include the necessary parameters for the transport instance. Creates a new Transport instance. `options` is an JavaScript object that should include the necessary parameters for the transport instance. Options **MUST** include an `Upgrader` instance, as Transports will use this to return `interface-connection` instances from `transport.dial` and the listener `handlerFunction`.
**Note: Why is it important to instantiate a transport -** Some transports have state that can be shared between the dialing and listening parts. For example with libp2p-webrtc-star, in order to dial a peer, the peer must be part of some signaling network that is shared with the listener. **Note: Why is it important to instantiate a transport -** Some transports have state that can be shared between the dialing and listening parts. For example with libp2p-webrtc-star, in order to dial a peer, the peer must be part of some signaling network that is shared with the listener.
### Dial to another peer ### Dial to another peer
- `JavaScript` - `const conn = await transport.dial(multiaddr, [options])` - `JavaScript` - `const connection = await transport.dial(multiaddr, [options])`
This method uses a transport to dial a Peer listening on `multiaddr`. This method uses a transport to dial a Peer listening on `multiaddr`.
@ -125,7 +143,7 @@ This method uses a transport to dial a Peer listening on `multiaddr`.
`[options]` the options that may be passed to the dial. Must support the `signal` option (see below) `[options]` the options that may be passed to the dial. Must support the `signal` option (see below)
`conn` must implement the [interface-connection](https://github.com/libp2p/interface-connection) interface. Dial **MUST** call and return `upgrader.upgradeOutbound(multiaddrConnection)`. The upgrader will return an [interface-connection](https://github.com/libp2p/interface-connection) instance.
The dial may throw an `Error` instance if there was a problem connecting to the `multiaddr`. The dial may throw an `Error` instance if there was a problem connecting to the `multiaddr`.
@ -158,7 +176,7 @@ try {
- `JavaScript` - `const listener = transport.createListener([options], handlerFunction)` - `JavaScript` - `const listener = transport.createListener([options], handlerFunction)`
This method creates a listener on the transport. This method creates a listener on the transport. Implementations **MUST** call `upgrader.upgradeInbound(multiaddrConnection)` and pass its results to the `handlerFunction` and any emitted `connection` events.
`options` is an optional object that contains the properties the listener must have, in order to properly listen on a given transport/socket. `options` is an optional object that contains the properties the listener must have, in order to properly listen on a given transport/socket.

View File

@ -34,7 +34,7 @@
}, },
"homepage": "https://github.com/libp2p/interface-transport", "homepage": "https://github.com/libp2p/interface-transport",
"devDependencies": { "devDependencies": {
"aegir": "^18.2.2" "aegir": "^20.0.0"
}, },
"dependencies": { "dependencies": {
"abort-controller": "^3.0.0", "abort-controller": "^3.0.0",
@ -44,9 +44,10 @@
"interface-connection": "~0.3.3", "interface-connection": "~0.3.3",
"it-goodbye": "^2.0.0", "it-goodbye": "^2.0.0",
"it-pipe": "^1.0.0", "it-pipe": "^1.0.0",
"multiaddr": "^6.0.6", "multiaddr": "^7.0.0",
"pull-stream": "^3.6.9", "pull-stream": "^3.6.9",
"streaming-iterables": "^4.0.2" "sinon": "^7.4.2",
"streaming-iterables": "^4.1.0"
}, },
"contributors": [ "contributors": [
"Alan Shaw <alan.shaw@protocol.ai>", "Alan Shaw <alan.shaw@protocol.ai>",

View File

@ -11,8 +11,26 @@ const { collect } = require('streaming-iterables')
const pipe = require('it-pipe') const pipe = require('it-pipe')
const AbortController = require('abort-controller') const AbortController = require('abort-controller')
const AbortError = require('./errors').AbortError const AbortError = require('./errors').AbortError
const sinon = require('sinon')
module.exports = (common) => { module.exports = (common) => {
const upgrader = {
upgradeOutbound (multiaddrConnection) {
['sink', 'source', 'remoteAddr', 'conn'].forEach(prop => {
expect(multiaddrConnection).to.have.property(prop)
})
return { sink: multiaddrConnection.sink, source: multiaddrConnection.source }
},
upgradeInbound (multiaddrConnection) {
['sink', 'source', 'remoteAddr', 'conn'].forEach(prop => {
expect(multiaddrConnection).to.have.property(prop)
})
return { sink: multiaddrConnection.sink, source: multiaddrConnection.source }
}
}
describe('dial', () => { describe('dial', () => {
let addrs let addrs
let transport let transport
@ -20,7 +38,7 @@ module.exports = (common) => {
let listener let listener
before(async () => { before(async () => {
({ addrs, transport, connector } = await common.setup()) ({ addrs, transport, connector } = await common.setup({ upgrader }))
}) })
after(() => common.teardown && common.teardown()) after(() => common.teardown && common.teardown())
@ -30,23 +48,31 @@ module.exports = (common) => {
return listener.listen(addrs[0]) return listener.listen(addrs[0])
}) })
afterEach(() => listener.close()) afterEach(() => {
sinon.restore()
return listener.close()
})
it('simple', async () => { it('simple', async () => {
const upgradeSpy = sinon.spy(upgrader, 'upgradeOutbound')
const conn = await transport.dial(addrs[0]) const conn = await transport.dial(addrs[0])
const s = goodbye({ source: ['hey'], sink: collect }) const s = goodbye({ source: ['hey'], sink: collect })
const result = await pipe(s, conn, s) const result = await pipe(s, conn, s)
expect(upgradeSpy.callCount).to.equal(1)
expect(upgradeSpy.returned(conn)).to.equal(true)
expect(result.length).to.equal(1) expect(result.length).to.equal(1)
expect(result[0].toString()).to.equal('hey') expect(result[0].toString()).to.equal('hey')
}) })
it('to non existent listener', async () => { it('to non existent listener', async () => {
const upgradeSpy = sinon.spy(upgrader, 'upgradeOutbound')
try { try {
await transport.dial(addrs[1]) await transport.dial(addrs[1])
} catch (_) { } catch (_) {
expect(upgradeSpy.callCount).to.equal(0)
// Success: expected an error to be throw // Success: expected an error to be throw
return return
} }
@ -54,6 +80,7 @@ module.exports = (common) => {
}) })
it('abort before dialing throws AbortError', async () => { it('abort before dialing throws AbortError', async () => {
const upgradeSpy = sinon.spy(upgrader, 'upgradeOutbound')
const controller = new AbortController() const controller = new AbortController()
controller.abort() controller.abort()
const socket = transport.dial(addrs[0], { signal: controller.signal }) const socket = transport.dial(addrs[0], { signal: controller.signal })
@ -61,6 +88,7 @@ module.exports = (common) => {
try { try {
await socket await socket
} catch (err) { } catch (err) {
expect(upgradeSpy.callCount).to.equal(0)
expect(err.code).to.eql(AbortError.code) expect(err.code).to.eql(AbortError.code)
expect(err.type).to.eql(AbortError.type) expect(err.type).to.eql(AbortError.type)
return return
@ -69,6 +97,7 @@ module.exports = (common) => {
}) })
it('abort while dialing throws AbortError', async () => { it('abort while dialing throws AbortError', async () => {
const upgradeSpy = sinon.spy(upgrader, 'upgradeOutbound')
// Add a delay to connect() so that we can abort while the dial is in // Add a delay to connect() so that we can abort while the dial is in
// progress // progress
connector.delay(100) connector.delay(100)
@ -80,6 +109,7 @@ module.exports = (common) => {
try { try {
await socket await socket
} catch (err) { } catch (err) {
expect(upgradeSpy.callCount).to.equal(0)
expect(err.code).to.eql(AbortError.code) expect(err.code).to.eql(AbortError.code)
expect(err.type).to.eql(AbortError.type) expect(err.type).to.eql(AbortError.type)
return return

View File

@ -6,20 +6,42 @@ const chai = require('chai')
const dirtyChai = require('dirty-chai') const dirtyChai = require('dirty-chai')
const expect = chai.expect const expect = chai.expect
chai.use(dirtyChai) chai.use(dirtyChai)
const sinon = require('sinon')
const pipe = require('it-pipe') const pipe = require('it-pipe')
module.exports = (common) => { module.exports = (common) => {
const upgrader = {
upgradeOutbound (multiaddrConnection) {
['sink', 'source', 'remoteAddr', 'conn'].forEach(prop => {
expect(multiaddrConnection).to.have.property(prop)
})
return { sink: multiaddrConnection.sink, source: multiaddrConnection.source }
},
upgradeInbound (multiaddrConnection) {
['sink', 'source', 'remoteAddr', 'conn'].forEach(prop => {
expect(multiaddrConnection).to.have.property(prop)
})
return { sink: multiaddrConnection.sink, source: multiaddrConnection.source }
}
}
describe('listen', () => { describe('listen', () => {
let addrs let addrs
let transport let transport
before(async () => { before(async () => {
({ transport, addrs } = await common.setup()) ({ transport, addrs } = await common.setup({ upgrader }))
}) })
after(() => common.teardown && common.teardown()) after(() => common.teardown && common.teardown())
afterEach(() => {
sinon.restore()
})
it('simple', async () => { it('simple', async () => {
const listener = transport.createListener((conn) => {}) const listener = transport.createListener((conn) => {})
await listener.listen(addrs[0]) await listener.listen(addrs[0])
@ -27,12 +49,16 @@ module.exports = (common) => {
}) })
it('close listener with connections, through timeout', async () => { it('close listener with connections, through timeout', async () => {
const upgradeSpy = sinon.spy(upgrader, 'upgradeInbound')
let finish let finish
let done = new Promise((resolve) => { const done = new Promise((resolve) => {
finish = resolve finish = resolve
}) })
const listener = transport.createListener((conn) => pipe(conn, conn)) const listener = transport.createListener((conn) => {
expect(upgradeSpy.returned(conn)).to.equal(true)
pipe(conn, conn)
})
// Listen // Listen
await listener.listen(addrs[0]) await listener.listen(addrs[0])
@ -53,13 +79,19 @@ module.exports = (common) => {
// Pipe should have completed // Pipe should have completed
await done await done
// 2 dials = 2 connections upgraded
expect(upgradeSpy.callCount).to.equal(2)
}) })
describe('events', () => { describe('events', () => {
it('connection', (done) => { it('connection', (done) => {
const upgradeSpy = sinon.spy(upgrader, 'upgradeInbound')
const listener = transport.createListener() const listener = transport.createListener()
listener.on('connection', async (conn) => { listener.on('connection', async (conn) => {
expect(upgradeSpy.returned(conn)).to.equal(true)
expect(upgradeSpy.callCount).to.equal(1)
expect(conn).to.exist() expect(conn).to.exist()
await listener.close() await listener.close()
done() done()