Merge branch 'master' of https://github.com/libp2p/interface-transport into remote/connection

This commit is contained in:
Jacob Heun 2019-10-18 13:40:37 +02:00
commit f399a680a6
No known key found for this signature in database
GPG Key ID: CA5A94C15809879F
18 changed files with 1075 additions and 0 deletions

8
.gitignore vendored Normal file
View File

@ -0,0 +1,8 @@
**/node_modules/
**/*.log
package-lock.json
# Coverage directory used by tools like istanbul
coverage
docs
dist

34
.npmignore Normal file
View File

@ -0,0 +1,34 @@
**/node_modules/
**/*.log
test/repo-tests*
# Logs
logs
*.log
coverage
# Runtime data
pids
*.pid
*.seed
# Directory for instrumented libs generated by jscoverage/JSCover
lib-cov
# Coverage directory used by tools like istanbul
coverage
# Grunt intermediate storage (http://gruntjs.com/creating-plugins#storing-task-files)
.grunt
# node-waf configuration
.lock-wscript
build
# Dependency directory
# https://www.npmjs.org/doc/misc/npm-faq.html#should-i-check-my-node_modules-folder-into-git
node_modules
test

10
.travis.yml Normal file
View File

@ -0,0 +1,10 @@
sudo: false
language: node_js
node_js:
- "10"
before_install:
- npm install -g npm
script:
- npm run lint

171
CHANGELOG.md Normal file
View File

@ -0,0 +1,171 @@
<a name="0.7.0"></a>
# [0.7.0](https://github.com/libp2p/interface-transport/compare/v0.6.1...v0.7.0) (2019-09-19)
### Features
* timeline and close checking ([#55](https://github.com/libp2p/interface-transport/issues/55)) ([993ca1c](https://github.com/libp2p/interface-transport/commit/993ca1c))
<a name="0.6.1"></a>
## [0.6.1](https://github.com/libp2p/interface-transport/compare/v0.6.0...v0.6.1) (2019-09-16)
### Bug Fixes
* **test:** close with timeout ([#54](https://github.com/libp2p/interface-transport/issues/54)) ([583f02d](https://github.com/libp2p/interface-transport/commit/583f02d))
<a name="0.6.0"></a>
# [0.6.0](https://github.com/libp2p/interface-transport/compare/v0.5.2...v0.6.0) (2019-09-06)
### Features
* add upgrader support to transports ([#53](https://github.com/libp2p/interface-transport/issues/53)) ([a5ad120](https://github.com/libp2p/interface-transport/commit/a5ad120))
### BREAKING CHANGES
* Transports must now be passed and use an `Upgrader` instance. See the Readme for usage. Compliance test suites will now need to pass `options` from `common.setup(options)` to their Transport constructor.
* docs: update readme to include upgrader
* docs: update readme to include MultiaddrConnection ref
* feat: add upgrader spy to test suite
* test: validate returned value of spy
<a name="0.5.2"></a>
## [0.5.2](https://github.com/libp2p/interface-transport/compare/v0.5.1...v0.5.2) (2019-06-11)
<a name="0.5.1"></a>
## [0.5.1](https://github.com/libp2p/interface-transport/compare/v0.5.0...v0.5.1) (2019-05-01)
### Bug Fixes
* move dirty-chai to dependencies ([#52](https://github.com/libp2p/interface-transport/issues/52)) ([f9a7908](https://github.com/libp2p/interface-transport/commit/f9a7908))
<a name="0.5.0"></a>
# [0.5.0](https://github.com/libp2p/interface-transport/compare/v0.4.0...v0.5.0) (2019-04-29)
### Reverts
* "feat: make listen take an array of addrs ([#46](https://github.com/libp2p/interface-transport/issues/46))" ([#51](https://github.com/libp2p/interface-transport/issues/51)) ([030195e](https://github.com/libp2p/interface-transport/commit/030195e))
<a name="0.4.0"></a>
# [0.4.0](https://github.com/libp2p/interface-transport/compare/v0.3.7...v0.4.0) (2019-04-19)
### Features
* add type to AbortError ([#45](https://github.com/libp2p/interface-transport/issues/45)) ([4fd37bb](https://github.com/libp2p/interface-transport/commit/4fd37bb))
* callbacks -> async / await ([#44](https://github.com/libp2p/interface-transport/issues/44)) ([b30ee5f](https://github.com/libp2p/interface-transport/commit/b30ee5f))
* make listen take an array of addrs ([#46](https://github.com/libp2p/interface-transport/issues/46)) ([1dc5baa](https://github.com/libp2p/interface-transport/commit/1dc5baa))
### BREAKING CHANGES
* All places in the API that used callbacks are now replaced with async/await
* test: add tests for canceling dials
* feat: Adapter class
<a name="0.3.7"></a>
## [0.3.7](https://github.com/libp2p/interface-transport/compare/v0.3.6...v0.3.7) (2018-11-29)
<a name="0.3.6"></a>
## [0.3.6](https://github.com/libp2p/interface-transport/compare/v0.3.5...v0.3.6) (2018-04-05)
<a name="0.3.5"></a>
## [0.3.5](https://github.com/libp2p/interface-transport/compare/v0.3.4...v0.3.5) (2017-03-21)
### Bug Fixes
* wrong main path in package.json ([54b83a7](https://github.com/libp2p/interface-transport/commit/54b83a7))
<a name="0.3.4"></a>
## [0.3.4](https://github.com/libp2p/interface-transport/compare/v0.3.3...v0.3.4) (2017-02-09)
<a name="0.3.3"></a>
## [0.3.3](https://github.com/libp2p/interface-transport/compare/v0.3.2...v0.3.3) (2016-09-06)
<a name="0.3.2"></a>
## [0.3.2](https://github.com/libp2p/interface-transport/compare/v0.3.1...v0.3.2) (2016-09-06)
### Bug Fixes
* **dial-test:** ensure goodbye works over tcp ([e1346da](https://github.com/libp2p/interface-transport/commit/e1346da))
<a name="0.3.1"></a>
## [0.3.1](https://github.com/libp2p/interface-transport/compare/v0.3.0...v0.3.1) (2016-09-05)
### Bug Fixes
* **package.json:** point to right main ([ace6150](https://github.com/libp2p/interface-transport/commit/ace6150))
<a name="0.3.0"></a>
# [0.3.0](https://github.com/libp2p/interface-transport/compare/v0.2.0...v0.3.0) (2016-09-05)
### Bug Fixes
* **tests:** add place holder test script for releases ([8e9f7cf](https://github.com/libp2p/interface-transport/commit/8e9f7cf))
### Features
* **dialer:** remove conn from on connect callback ([1bd20d9](https://github.com/libp2p/interface-transport/commit/1bd20d9))
* **spec:** update the dial interface to cope with new pull additions ([2e12166](https://github.com/libp2p/interface-transport/commit/2e12166))
* **tests:** add dial and listen tests ([d50224d](https://github.com/libp2p/interface-transport/commit/d50224d))
<a name="0.2.0"></a>
# [0.2.0](https://github.com/libp2p/interface-transport/compare/v0.1.1...v0.2.0) (2016-06-16)
<a name="0.1.1"></a>
## [0.1.1](https://github.com/libp2p/interface-transport/compare/v0.1.0...v0.1.1) (2015-12-12)
<a name="0.1.0"></a>
# 0.1.0 (2015-09-17)

22
LICENSE Normal file
View File

@ -0,0 +1,22 @@
The MIT License (MIT)
Copyright (c) 2015 David Dias
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

227
README.md Normal file
View File

@ -0,0 +1,227 @@
interface-transport
===================
[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://protocol.ai)
[![](https://img.shields.io/badge/project-libp2p-yellow.svg?style=flat-square)](http://libp2p.io/)
[![](https://img.shields.io/badge/freenode-%23libp2p-yellow.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23libp2p)
[![Discourse posts](https://img.shields.io/discourse/https/discuss.libp2p.io/posts.svg)](https://discuss.libp2p.io)
[![](https://img.shields.io/travis/libp2p/interface-transport.svg?style=flat-square)](https://travis-ci.com/libp2p/interface-transport)
[![Dependency Status](https://david-dm.org/libp2p/interface-transport.svg?style=flat-square)](https://david-dm.org/libp2p/interface-transport)
[![js-standard-style](https://img.shields.io/badge/code%20style-standard-brightgreen.svg?style=flat-square)](https://github.com/feross/standard)
> A test suite and interface you can use to implement a libp2p transport. A libp2p transport is understood as something that offers a dial and listen interface.
The primary goal of this module is to enable developers to pick and swap their transport module as they see fit for their libp2p installation, without having to go through shims or compatibility issues. This module and test suite were heavily inspired by abstract-blob-store, interface-stream-muxer and others.
Publishing a test suite as a module lets multiple modules all ensure compatibility since they use the same test suite.
The purpose of this interface is not to reinvent any wheels when it comes to dialing and listening to transports. Instead, it tries to provide a uniform API for several transports through a shimmed interface.
## Lead Maintainer
[Jacob Heun](https://github.com/jacobheun/)
# Modules that implement the interface
- [js-libp2p-tcp](https://github.com/libp2p/js-libp2p-tcp)
- [js-libp2p-webrtc-star](https://github.com/libp2p/js-libp2p-webrtc-star)
- [js-libp2p-webrtc-direct](https://github.com/libp2p/js-libp2p-webrtc-direct)
- [js-libp2p-websocket-star](https://github.com/libp2p/js-libp2p-websocket-star)
- [js-libp2p-websockets](https://github.com/libp2p/js-libp2p-websockets)
- [js-libp2p-utp](https://github.com/libp2p/js-libp2p-utp)
- [webrtc-explorer](https://github.com/diasdavid/webrtc-explorer)
# Badge
Include this badge in your readme if you make a module that is compatible with the interface-transport API. You can validate this by running the tests.
![](https://raw.githubusercontent.com/diasdavid/interface-transport/master/img/badge.png)
# How to use the battery of tests
## Node.js
```js
/* eslint-env mocha */
'use strict'
const tests = require('interface-transport')
const multiaddr = require('multiaddr')
const YourTransport = require('../src')
describe('compliance', () => {
tests({
setup (options) {
let transport = new YourTransport(options)
const addrs = [
multiaddr('valid-multiaddr-for-your-transport'),
multiaddr('valid-multiaddr2-for-your-transport')
]
const network = require('my-network-lib')
const connect = network.connect
const connector = {
delay (delayMs) {
// Add a delay in the connection mechanism for the transport
// (this is used by the dial tests)
network.connect = (...args) => setTimeout(() => connect(...args), delayMs)
},
restore () {
// Restore the connection mechanism to normal
network.connect = connect
}
}
return { transport, addrs, connector }
},
teardown () {
// Clean up any resources created by setup()
}
})
})
```
# API
A valid transport (one that follows the interface defined) must implement the following API:
**Table of contents:**
- type: `Transport`
- `new Transport({ upgrader, ...[options] })`
- `<Promise> transport.dial(multiaddr, [options])`
- `<Multiaddr[]> transport.filter(multiaddrs)`
- `transport.createListener([options], handlerFunction)`
- type: `transport.Listener`
- event: 'listening'
- event: 'close'
- event: 'connection'
- event: 'error'
- `<Promise> listener.listen(multiaddr)`
- `listener.getAddrs()`
- `<Promise> listener.close([options])`
### Types
#### Upgrader
Upgraders have 2 methods: `upgradeOutbound` and `upgradeInbound`.
- `upgradeOutbound` must be called and returned by `transport.dial`.
- `upgradeInbound` must be called and the results must be passed to the `createListener` `handlerFunction` and the `connection` event handler, anytime a new connection is created.
```js
const connection = await upgrader.upgradeOutbound(multiaddrConnection)
const connection = await upgrader.upgradeInbound(multiaddrConnection)
```
The `Upgrader` methods take a [MultiaddrConnection](#multiaddrconnection) and will return an `interface-connection` instance.
#### MultiaddrConnection
- `MultiaddrConnection`
- `sink<function(source)>`: A [streaming iterable sink](https://gist.github.com/alanshaw/591dc7dd54e4f99338a347ef568d6ee9#sink-it)
- `source<AsyncIterator>`: A [streaming iterable source](https://gist.github.com/alanshaw/591dc7dd54e4f99338a347ef568d6ee9#source-it)
- `close<function(Error)>`: A method for closing the connection
- `conn`: The raw connection of the transport, such as a TCP socket.
- `remoteAddr<Multiaddr>`: The remote `Multiaddr` of the connection.
- `[localAddr<Multiaddr>]`: An optional local `Multiaddr` of the connection.
- `timeline<object>`: A hash map of connection time events
- `open<number>`: The time in ticks the connection was opened
- `close<number>`: The time in ticks the connection was closed
### Creating a transport instance
- `const transport = new Transport({ upgrader, ...[options] })`
Creates a new Transport instance. `options` is an JavaScript object that should include the necessary parameters for the transport instance. Options **MUST** include an `Upgrader` instance, as Transports will use this to return `interface-connection` instances from `transport.dial` and the listener `handlerFunction`.
**Note: Why is it important to instantiate a transport -** Some transports have state that can be shared between the dialing and listening parts. For example with libp2p-webrtc-star, in order to dial a peer, the peer must be part of some signaling network that is shared with the listener.
### Dial to another peer
- `const connection = await transport.dial(multiaddr, [options])`
This method uses a transport to dial a Peer listening on `multiaddr`.
`multiaddr` must be of the type [`multiaddr`](https://www.npmjs.com/multiaddr).
`[options]` the options that may be passed to the dial. Must support the `signal` option (see below)
Dial **MUST** call and return `upgrader.upgradeOutbound(multiaddrConnection)`. The upgrader will return an [interface-connection](https://github.com/libp2p/interface-connection) instance.
The dial may throw an `Error` instance if there was a problem connecting to the `multiaddr`.
### Canceling a dial
Dials may be cancelled using an `AbortController`:
```Javascript
const AbortController = require('abort-controller')
const { AbortError } = require('interface-transport')
const controller = new AbortController()
try {
const conn = await mytransport.dial(ma, { signal: controller.signal })
// Do stuff with conn here ...
} catch (err) {
if(err.code === AbortError.code) {
// Dial was aborted, just bail out
return
}
throw err
}
// ----
// In some other part of the code:
controller.abort()
// ----
```
### Filtering Addresses
- `const supportedAddrs = await transport.filter(multiaddrs)`
When using a transport its important to be able to filter out `multiaddr`s that the transport doesn't support. A transport instance provides a filter method to return only the valid addresses it supports.
`multiaddrs` must be an array of type [`multiaddr`](https://www.npmjs.com/multiaddr).
Filter returns an array of `multiaddr`.
### Create a listener
- `const listener = transport.createListener([options], handlerFunction)`
This method creates a listener on the transport. Implementations **MUST** call `upgrader.upgradeInbound(multiaddrConnection)` and pass its results to the `handlerFunction` and any emitted `connection` events.
`options` is an optional object that contains the properties the listener must have, in order to properly listen on a given transport/socket.
`handlerFunction` is a function called each time a new connection is received. It must follow the following signature: `function (conn) {}`, where `conn` is a connection that follows the [`interface-connection`](https://github.com/diasdavid/interface-connection).
The listener object created may emit the following events:
- `listening` - when the listener is ready for incoming connections
- `close` - when the listener is closed
- `connection` - (`conn`) each time an incoming connection is received
- `error` - (`err`) each time there is an error on the connection
### Start a listener
- `await listener.listen(multiaddr)`
This method puts the listener in `listening` mode, waiting for incoming connections.
`multiaddr` is the address that the listener should bind to.
### Get listener addrs
- `listener.getAddrs()`
This method returns the addresses on which this listener is listening. Useful when listening on port 0 or any interface (0.0.0.0).
### Stop a listener
- `await listener.close([options])`
This method closes the listener so that no more connections can be opened on this transport instance.
`options` is an optional object that may contain the following properties:
- `timeout` - A timeout value (in ms) after which all connections on this transport will be destroyed if the transport is not able to close gracefully. (e.g { timeout: 1000 })

BIN
img/badge.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 5.1 KiB

BIN
img/badge.sketch Normal file

Binary file not shown.

19
img/badge.svg Normal file

File diff suppressed because one or more lines are too long

After

Width:  |  Height:  |  Size: 9.5 KiB

65
package.json Normal file
View File

@ -0,0 +1,65 @@
{
"name": "interface-transport",
"version": "0.7.0",
"description": "A test suite and interface you can use to implement a transport.",
"leadMaintainer": "Jacob Heun <jacobheun@gmail.com>",
"repository": {
"type": "git",
"url": "https://github.com/libp2p/interface-transport.git"
},
"main": "src/index.js",
"files": [
"dist",
"src"
],
"scripts": {
"lint": "aegir lint",
"test": "aegir test",
"build": "aegir build",
"release": "aegir release --no-test",
"release-minor": "aegir release --type minor --no-test",
"release-major": "aegir release --type major --no-test",
"coverage": "exit(0)",
"coverage-publish": "exit(0)"
},
"pre-push": [
"lint"
],
"keywords": [
"IPFS"
],
"license": "MIT",
"bugs": {
"url": "https://github.com/libp2p/interface-transport/issues"
},
"homepage": "https://github.com/libp2p/interface-transport",
"devDependencies": {
"aegir": "^20.0.0"
},
"dependencies": {
"abort-controller": "^3.0.0",
"async-iterator-to-pull-stream": "^1.3.0",
"chai": "^4.2.0",
"dirty-chai": "^2.0.1",
"interface-connection": "~0.3.3",
"it-goodbye": "^2.0.0",
"it-pipe": "^1.0.0",
"multiaddr": "^7.0.0",
"pull-stream": "^3.6.9",
"sinon": "^7.4.2",
"streaming-iterables": "^4.1.0"
},
"contributors": [
"Alan Shaw <alan.shaw@protocol.ai>",
"David Dias <daviddias.p@gmail.com>",
"Friedel Ziegelmayer <dignifiedquire@gmail.com>",
"Jacob Heun <jacobheun@gmail.com>",
"João Santos <joaosantos15@users.noreply.github.com>",
"Maciej Krüger <mkg20001@gmail.com>",
"Richard Littauer <richard.littauer@gmail.com>",
"Vasco Santos <vasco.santos@ua.pt>",
"dirkmc <dirkmdev@gmail.com>",
"dmitriy ryajov <dryajov@dmitriys-MBP.HomeNET>",
"greenkeeperio-bot <support@greenkeeper.io>"
]
}

80
src/adapter.js Normal file
View File

@ -0,0 +1,80 @@
'use strict'
const { Connection } = require('interface-connection')
const toPull = require('async-iterator-to-pull-stream')
const error = require('pull-stream/sources/error')
const drain = require('pull-stream/sinks/drain')
const noop = () => {}
function callbackify (fn) {
return async function (...args) {
let cb = args.pop()
if (typeof cb !== 'function') {
args.push(cb)
cb = noop
}
let res
try {
res = await fn(...args)
} catch (err) {
return cb(err)
}
cb(null, res)
}
}
// Legacy adapter to old transport & connection interface
class Adapter {
constructor (transport) {
this.transport = transport
}
dial (ma, options, callback) {
if (typeof options === 'function') {
callback = options
options = {}
}
callback = callback || noop
const conn = new Connection()
this.transport.dial(ma, options)
.then(socket => {
conn.setInnerConn(toPull.duplex(socket))
conn.getObservedAddrs = callbackify(socket.getObservedAddrs.bind(socket))
conn.close = callbackify(socket.close.bind(socket))
callback(null, conn)
})
.catch(err => {
conn.setInnerConn({ sink: drain(), source: error(err) })
callback(err)
})
return conn
}
createListener (options, handler) {
if (typeof options === 'function') {
handler = options
options = {}
}
const server = this.transport.createListener(options, socket => {
const conn = new Connection(toPull.duplex(socket))
conn.getObservedAddrs = callbackify(socket.getObservedAddrs.bind(socket))
handler(conn)
})
const proxy = {
listen: callbackify(server.listen.bind(server)),
close: callbackify(server.close.bind(server)),
getAddrs: callbackify(server.getAddrs.bind(server)),
getObservedAddrs: callbackify(() => server.getObservedAddrs())
}
return new Proxy(server, { get: (_, prop) => proxy[prop] || server[prop] })
}
}
module.exports = Adapter

203
src/dial-test.js Normal file
View File

@ -0,0 +1,203 @@
/* eslint-env mocha */
'use strict'
const chai = require('chai')
const dirtyChai = require('dirty-chai')
const expect = chai.expect
chai.use(dirtyChai)
const { isValidTick } = require('./utils')
const goodbye = require('it-goodbye')
const { collect } = require('streaming-iterables')
const pipe = require('it-pipe')
const AbortController = require('abort-controller')
const AbortError = require('./errors').AbortError
const sinon = require('sinon')
module.exports = (common) => {
const upgrader = {
_upgrade (multiaddrConnection) {
['sink', 'source', 'remoteAddr', 'conn', 'timeline', 'close'].forEach(prop => {
expect(multiaddrConnection).to.have.property(prop)
})
expect(isValidTick(multiaddrConnection.timeline.open)).to.equal(true)
return multiaddrConnection
},
upgradeOutbound (multiaddrConnection) {
return upgrader._upgrade(multiaddrConnection)
},
upgradeInbound (multiaddrConnection) {
return upgrader._upgrade(multiaddrConnection)
}
}
describe('dial', () => {
let addrs
let transport
let connector
let listener
before(async () => {
({ addrs, transport, connector } = await common.setup({ upgrader }))
})
after(() => common.teardown && common.teardown())
beforeEach(() => {
listener = transport.createListener((conn) => pipe(conn, conn))
return listener.listen(addrs[0])
})
afterEach(() => {
sinon.restore()
return listener.close()
})
it('simple', async () => {
const upgradeSpy = sinon.spy(upgrader, 'upgradeOutbound')
const conn = await transport.dial(addrs[0])
const s = goodbye({ source: ['hey'], sink: collect })
const result = await pipe(s, conn, s)
expect(upgradeSpy.callCount).to.equal(1)
expect(upgradeSpy.returned(conn)).to.equal(true)
expect(result.length).to.equal(1)
expect(result[0].toString()).to.equal('hey')
})
it('can close connections', async () => {
const upgradeSpy = sinon.spy(upgrader, 'upgradeOutbound')
const conn = await transport.dial(addrs[0])
expect(upgradeSpy.callCount).to.equal(1)
expect(upgradeSpy.returned(conn)).to.equal(true)
await conn.close()
expect(isValidTick(conn.timeline.close)).to.equal(true)
})
it('to non existent listener', async () => {
const upgradeSpy = sinon.spy(upgrader, 'upgradeOutbound')
try {
await transport.dial(addrs[1])
} catch (_) {
expect(upgradeSpy.callCount).to.equal(0)
// Success: expected an error to be throw
return
}
expect.fail('Did not throw error attempting to connect to non-existent listener')
})
it('abort before dialing throws AbortError', async () => {
const upgradeSpy = sinon.spy(upgrader, 'upgradeOutbound')
const controller = new AbortController()
controller.abort()
const socket = transport.dial(addrs[0], { signal: controller.signal })
try {
await socket
} catch (err) {
expect(upgradeSpy.callCount).to.equal(0)
expect(err.code).to.eql(AbortError.code)
expect(err.type).to.eql(AbortError.type)
return
}
expect.fail('Did not throw error with code ' + AbortError.code)
})
it('abort while dialing throws AbortError', async () => {
const upgradeSpy = sinon.spy(upgrader, 'upgradeOutbound')
// Add a delay to connect() so that we can abort while the dial is in
// progress
connector.delay(100)
const controller = new AbortController()
const socket = transport.dial(addrs[0], { signal: controller.signal })
setTimeout(() => controller.abort(), 50)
try {
await socket
} catch (err) {
expect(upgradeSpy.callCount).to.equal(0)
expect(err.code).to.eql(AbortError.code)
expect(err.type).to.eql(AbortError.type)
return
} finally {
connector.restore()
}
expect.fail('Did not throw error with code ' + AbortError.code)
})
it('abort while reading throws AbortError', async () => {
// Add a delay to the response from the server
async function * delayedResponse (source) {
for await (const val of source) {
await new Promise((resolve) => setTimeout(resolve, 1000))
yield val
}
}
const delayedListener = transport.createListener(async (conn) => {
await pipe(conn, delayedResponse, conn)
})
await delayedListener.listen(addrs[1])
// Create an abort signal and dial the socket
const controller = new AbortController()
const socket = await transport.dial(addrs[1], { signal: controller.signal })
try {
// Set a timeout to abort before the server responds
setTimeout(() => controller.abort(), 100)
// An AbortError should be thrown before the pipe completes
const s = goodbye({ source: ['hey'], sink: collect })
await pipe(s, socket, s)
} catch (err) {
expect(err.code).to.eql(AbortError.code)
expect(err.type).to.eql(AbortError.type)
return
} finally {
await delayedListener.close()
}
expect.fail('Did not throw error with code ' + AbortError.code)
})
it('abort while writing does not throw AbortError', async () => {
// Record values received by the listener
const recorded = []
async function * recorderTransform (source) {
for await (const val of source) {
recorded.push(val)
yield val
}
}
const recordListener = transport.createListener(async (conn) => {
await pipe(conn, recorderTransform, conn)
})
await recordListener.listen(addrs[1])
// Create an abort signal and dial the socket
const controller = new AbortController()
const socket = await transport.dial(addrs[1], { signal: controller.signal })
// Set a timeout to abort before writing has completed
setTimeout(() => controller.abort(), 100)
try {
// The pipe should write to the socket until aborted
await pipe(
async function * () {
yield 'hey'
await new Promise((resolve) => setTimeout(resolve, 200))
yield 'there'
},
socket)
expect(recorded.length).to.eql(1)
expect(recorded[0].toString()).to.eql('hey')
} finally {
await recordListener.close()
}
})
})
}

21
src/errors.js Normal file
View File

@ -0,0 +1,21 @@
'use strict'
class AbortError extends Error {
constructor () {
super('The operation was aborted')
this.code = AbortError.code
this.type = AbortError.type
}
static get code () {
return 'ABORT_ERR'
}
static get type () {
return 'aborted'
}
}
module.exports = {
AbortError
}

37
src/filter-test.js Normal file
View File

@ -0,0 +1,37 @@
/* eslint-env mocha */
'use strict'
const chai = require('chai')
const dirtyChai = require('dirty-chai')
const expect = chai.expect
chai.use(dirtyChai)
module.exports = (common) => {
const upgrader = {
_upgrade (multiaddrConnection) {
return multiaddrConnection
},
upgradeOutbound (multiaddrConnection) {
return upgrader._upgrade(multiaddrConnection)
},
upgradeInbound (multiaddrConnection) {
return upgrader._upgrade(multiaddrConnection)
}
}
describe('filter', () => {
let addrs
let transport
before(async () => {
({ addrs, transport } = await common.setup({ upgrader }))
})
after(() => common.teardown && common.teardown())
it('filters addresses', () => {
const filteredAddrs = transport.filter(addrs)
expect(filteredAddrs).to.eql(addrs)
})
})
}

17
src/index.js Normal file
View File

@ -0,0 +1,17 @@
/* eslint-env mocha */
'use strict'
const dial = require('./dial-test')
const listen = require('./listen-test')
const filter = require('./filter-test')
module.exports = (common) => {
describe('interface-transport', () => {
dial(common)
listen(common)
filter(common)
})
}
module.exports.AbortError = require('./errors').AbortError
module.exports.Adapter = require('./adapter')

144
src/listen-test.js Normal file
View File

@ -0,0 +1,144 @@
/* eslint max-nested-callbacks: ["error", 8] */
/* eslint-env mocha */
'use strict'
const chai = require('chai')
const dirtyChai = require('dirty-chai')
const expect = chai.expect
chai.use(dirtyChai)
const sinon = require('sinon')
const pipe = require('it-pipe')
const { isValidTick } = require('./utils')
module.exports = (common) => {
const upgrader = {
_upgrade (multiaddrConnection) {
['sink', 'source', 'remoteAddr', 'conn', 'timeline', 'close'].forEach(prop => {
expect(multiaddrConnection).to.have.property(prop)
})
expect(isValidTick(multiaddrConnection.timeline.open)).to.equal(true)
return multiaddrConnection
},
upgradeOutbound (multiaddrConnection) {
return upgrader._upgrade(multiaddrConnection)
},
upgradeInbound (multiaddrConnection) {
return upgrader._upgrade(multiaddrConnection)
}
}
describe('listen', () => {
let addrs
let transport
before(async () => {
({ transport, addrs } = await common.setup({ upgrader }))
})
after(() => common.teardown && common.teardown())
afterEach(() => {
sinon.restore()
})
it('simple', async () => {
const listener = transport.createListener((conn) => {})
await listener.listen(addrs[0])
await listener.close()
})
it('close listener with connections, through timeout', async () => {
const upgradeSpy = sinon.spy(upgrader, 'upgradeInbound')
const listenerConns = []
const listener = transport.createListener((conn) => {
listenerConns.push(conn)
expect(upgradeSpy.returned(conn)).to.equal(true)
pipe(conn, conn)
})
// Listen
await listener.listen(addrs[0])
// Create two connections to the listener
const [socket1] = await Promise.all([
transport.dial(addrs[0]),
transport.dial(addrs[0])
])
// Give the listener a chance to finish its upgrade
await new Promise(resolve => setTimeout(resolve, 0))
// Wait for the data send and close to finish
await Promise.all([
pipe(
[Buffer.from('Some data that is never handled')],
socket1
),
// Closer the listener (will take a couple of seconds to time out)
listener.close()
])
await socket1.close()
expect(isValidTick(socket1.timeline.close)).to.equal(true)
listenerConns.forEach(conn => {
expect(isValidTick(conn.timeline.close)).to.equal(true)
})
// 2 dials = 2 connections upgraded
expect(upgradeSpy.callCount).to.equal(2)
})
describe('events', () => {
it('connection', (done) => {
const upgradeSpy = sinon.spy(upgrader, 'upgradeInbound')
const listener = transport.createListener()
listener.on('connection', async (conn) => {
expect(upgradeSpy.returned(conn)).to.equal(true)
expect(upgradeSpy.callCount).to.equal(1)
expect(conn).to.exist()
await listener.close()
done()
})
;(async () => {
await listener.listen(addrs[0])
await transport.dial(addrs[0])
})()
})
it('listening', (done) => {
const listener = transport.createListener()
listener.on('listening', async () => {
await listener.close()
done()
})
listener.listen(addrs[0])
})
it('error', (done) => {
const listener = transport.createListener()
listener.on('error', async (err) => {
expect(err).to.exist()
await listener.close()
done()
})
listener.emit('error', new Error('my err'))
})
it('close', (done) => {
const listener = transport.createListener()
listener.on('close', done)
;(async () => {
await listener.listen(addrs[0])
await listener.close()
})()
})
})
})
}

16
src/utils/index.js Normal file
View File

@ -0,0 +1,16 @@
'use strict'
module.exports = {
/**
* A tick is considered valid if it happened between now
* and `ms` milliseconds ago
* @param {number} date Time in ticks
* @param {number} ms max milliseconds that should have expired
* @returns {boolean}
*/
isValidTick: function isValidTick (date, ms = 5000) {
const now = Date.now()
if (date > now - ms && date <= now) return true
return false
}
}

1
test/transport.spec.js Normal file
View File

@ -0,0 +1 @@
'use strict'