Compare commits

..

82 Commits

Author SHA1 Message Date
d7a0246d54 Release v0.9.2. 2016-03-23 08:32:07 +00:00
1344caa4d2 Merge pull request #33 from diasdavid/tests/add-multiplex-to-the-mix
add multiplex to the mix
2016-03-23 08:28:43 +00:00
52519cd4e9 add multiplex tests 2016-03-23 08:17:16 +00:00
5bfbd8d972 restructure test files 2016-03-23 08:14:37 +00:00
28bd4ab187 Release v0.9.1. 2016-03-23 07:01:54 +00:00
35b5ac6573 Merge pull request #32 from diasdavid/tests/websockets-mania
fix dirty identify bug
2016-03-22 17:36:50 +00:00
2bd536e861 fix dirty identify bug 2016-03-22 17:31:58 +00:00
a2084f01e3 Merge pull request #31 from diasdavid/clean/browser-tests
WIP: clean browser tests, add badgers
2016-03-22 12:07:13 +00:00
8c6ef52504 clean browser tests, add badgers 2016-03-22 12:01:46 +00:00
72f205e2b9 Merge pull request #29 from xicombd/browser-tests
Browser tests
2016-03-22 10:00:28 +00:00
e41694d308 Add more tests 2016-03-20 18:50:30 +00:00
caf3a0180a Add browser tests 2016-03-20 18:22:54 +00:00
879d736686 Merge pull request #30 from xicombd/dial-fixes
Dial fixes
2016-03-20 18:22:00 +00:00
c726c252df Return conn on swarm.dial 2016-03-20 18:11:16 +00:00
f2df5586d5 Update libp2p-websockets and add test for callback's conn 2016-03-20 17:57:14 +00:00
9d149caf15 Release v0.9.0. 2016-03-15 12:57:14 +00:00
689bd4b190 Merge pull request #28 from diasdavid/tests/ws+tcp
All together now 🎵
2016-03-15 12:24:39 +00:00
9a29b01ad3 it wooorks™ 2016-03-15 12:20:42 +00:00
63569849c0 Release v0.8.1. 2016-03-15 11:21:54 +00:00
c4f87f0a33 Merge pull request #27 from diasdavid/tests/websockets
add websockets to the battery of tests, everything checks out
2016-03-15 11:18:07 +00:00
eda78d7a76 add websockets to the battery of tests, everything checks out 2016-03-15 11:14:55 +00:00
6bcf48ff39 Release v0.8.0. 2016-03-15 10:25:27 +00:00
8615a50bc9 Merge pull request #26 from diasdavid/feature/filter-addrs
filter the multiaddrs that are passed to avoid collision
2016-03-15 10:08:43 +00:00
b83e5dd8dc complete 2016-03-15 09:57:06 +00:00
4036ea4b1a Release v0.7.0. 2016-03-11 14:56:16 +00:00
d65a0901b9 Merge pull request #25 from diasdavid/fix/silly-passthrough-bug
silly passthrough bug
2016-03-11 14:55:52 +00:00
412bda731b fixed 2016-03-11 14:47:39 +00:00
69bd386afc Release v0.6.0. 2016-03-10 21:00:28 +00:00
849d38e8e0 update readme badges 2016-03-10 21:00:15 +00:00
bb7f7399b2 Merge pull request #20 from diasdavid/update/simplify
simplify libp2p-swarm
2016-03-10 20:41:23 +00:00
990111980b woot 2016-03-10 20:36:07 +00:00
366b6ef382 design notes 2016-03-10 15:17:07 +00:00
e8de55bc28 update the docs 2016-03-10 14:38:22 +00:00
f8e14e4ddf stream multiplexing done, starting on identify refactor 2016-03-07 15:22:36 +00:00
9d8ee67c61 high level API working + tests 2016-03-06 08:40:49 +00:00
c8f2fdd077 internal transport interface + libp2p-tcp tests 2016-03-05 11:27:57 +00:00
1fd6a3885d remove coverage folder and solve new linting issue 2016-03-03 12:21:01 +00:00
c651dd2aec remove coverage folder and solve new linting issue 2016-03-03 12:18:00 +00:00
d688893268 Merge pull request #19 from diasdavid/update/mocha-istanbul-standard
update swarm
2016-03-03 12:09:06 +00:00
0e636597ee update swarm 2016-03-03 12:08:46 +00:00
b54b7edeeb Merge pull request #18 from RichardLitt/patch-1
Freejs => Freenode
2015-12-29 00:29:30 +01:00
aea0940b2d Freejs => Freenode
See https://github.com/ipfs/community/issues/93
2015-12-28 17:18:08 -05:00
60028daf14 Merge pull request #17 from masylum/feature/remove-v4-to-v6-hack
Removed ipv6/ipv4 hack
2015-11-02 04:32:31 +00:00
c39eb4a830 Removed ipv6 to ipv4 hack 2015-11-01 19:57:20 +01:00
3b9465de92 Release v0.5.5. 2015-10-29 00:27:59 +00:00
c827bd8470 package.json 2015-10-29 00:27:49 +00:00
555f2199df Release v0.5.4. 2015-10-29 00:27:18 +00:00
a9942b014b Merge pull request #16 from diasdavid/rn
update readme and package.json
2015-10-29 00:26:53 +00:00
348615b5aa update readme and package.json 2015-10-29 00:26:18 +00:00
f309d4f7b7 Release v0.5.3. 2015-09-28 16:12:01 +01:00
1c8fbb2c5b Merge branch 'masylum-patch-1' 2015-09-28 16:11:45 +01:00
789fdcfdc3 merge #11 2015-09-28 16:11:16 +01:00
f53124393c Release v0.5.2. 2015-09-28 04:01:45 +01:00
81c6ab013d Merge pull request #14 from masylum/implement-close
Implemented `close` and improved the tests with it
2015-09-28 04:00:12 +01:00
adb5ce19b1 Release v0.5.1. 2015-09-26 21:12:46 +01:00
4feaa8b187 Merge pull request #13 from masylum/patch-3
Do not allow undefined `peerInfo`
2015-09-26 20:59:14 +01:00
5c53540e92 Implemented close and improved the tests with it 2015-09-26 20:12:13 +02:00
8dc46da80f Protect from peers without supported transports
# What

Trying to run compliance tests from the kad router module. I've tried to port the new swarm API but forgot to add a transport. The tests ended up blowing up instead of failing gracefully.

# How to test

```js
peerOne = new Peer(Id.create(), [multiaddr('/ip4/127.0.0.1/tcp/8090')])
peerTwo = new Peer(Id.create(), [multiaddr('/ip4/127.0.0.1/tcp/8091')])
swarm = new Swarm(peerZero)
swarm.dial(peerTwo, {}, function (err) {
  console.log(err);
});
```

This just work and display the error.
2015-09-26 16:55:34 +02:00
0514b0034b Do not allow undefined peerInfo
# What

The code assumes that `peerInfo` exists, the API doesn't.

# How to test

```js
swarm = new Swarm()
swarm.addTransport('tcp', tcp, { multiaddr: mh }, {}, {port: 8095})
```

This shouldn't explode because of `self.peerInfo.multiaddrs`.
2015-09-26 16:24:46 +02:00
cd53344441 Release v0.5.0. 2015-09-23 23:12:50 +01:00
490022cf02 ready to publish new version 2015-09-23 23:12:35 +01:00
4d9d8c94c7 Merge pull request #10 from diasdavid/revisit
Revisit Swarm - multitransport + upgrades - https://github.com/diasdavid/node-ipfs-swarm/issues/8
2015-09-23 23:09:27 +01:00
1ba8e80d4d rm laf 2015-09-23 20:34:31 +01:00
5b7a6051ad comment undone tests 2015-09-23 20:07:55 +01:00
5fe94446d8 rm old test file 2015-09-23 20:07:05 +01:00
92b499df82 fix readme typos and missing links 2015-09-23 20:06:10 +01:00
e6bcde41fb change cov 2015-09-23 19:58:14 +01:00
fb37b4dec9 clear unused console logs 2015-09-23 19:57:37 +01:00
2000827273 add identify 2015-09-23 19:14:29 +01:00
0bcbe63005 rm old code 2015-09-23 17:26:26 +01:00
168d01befd stream muxer for connection reuse test 2015-09-23 17:25:21 +01:00
0040be765d add spdy + test 2015-09-23 17:11:32 +01:00
416e107d64 quick fix for travis 2015-09-22 17:50:41 +01:00
59b00f6886 use warmed up connection + test 2015-09-22 17:27:37 +01:00
8e8d8e9093 dial on a protocol + test 2015-09-22 16:50:42 +01:00
5e4cca52c0 dial a conn + test 2015-09-22 16:16:21 +01:00
e1df0b9ecd add transport and close listener test 2015-09-22 14:31:23 +01:00
73a6a4fd45 adding transports works 2015-09-21 19:56:42 +01:00
1833ded0f7 making progress 2015-09-21 16:46:04 +01:00
544e4a4165 update README with new candidate interface 2015-09-20 21:08:28 +01:00
7aae02581c Merge pull request #7 from Dignifiedquire/identify-tests
identify: Fix some issues with updateSelf.
2015-09-14 19:23:24 +01:00
17f40911db identify: Fix some issues with updateSelf. 2015-08-04 12:10:17 +02:00
27 changed files with 1700 additions and 822 deletions

3
.gitignore vendored
View File

@ -1,3 +1,4 @@
# Logs
logs
*.log
@ -27,6 +28,6 @@ build/Release
node_modules
coverage
.jshintrc
.jshintignore

View File

@ -10,3 +10,10 @@ before_install:
script:
- npm run lint
- npm test
addons:
firefox: 'latest'
before_script:
- export DISPLAY=:99.0
- sh -e /etc/init.d/xvfb start

168
README.md
View File

@ -1,82 +1,158 @@
ipfs-swarm Node.js implementation
=================================
libp2p-swarm JavaScript implementation
======================================
[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://ipn.io) [![](https://img.shields.io/badge/project-IPFS-blue.svg?style=flat-square)](http://ipfs.io/) [![](https://img.shields.io/badge/freenode-%23ipfs-blue.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23ipfs) [![Build Status](https://img.shields.io/travis/diasdavid/node-ipfs-swarm/master.svg?style=flat-square)](https://travis-ci.org/diasdavid/node-ipfs-swarm)
[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://ipn.io)
[![](https://img.shields.io/badge/project-IPFS-blue.svg?style=flat-square)](http://ipfs.io/)
[![](https://img.shields.io/badge/freenode-%23ipfs-blue.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23ipfs)
[![Build Status](https://img.shields.io/travis/diasdavid/js-libp2p-swarm/master.svg?style=flat-square)](https://travis-ci.org/diasdavid/js-libp2p-swarm)
![](https://img.shields.io/badge/coverage-%3F-yellow.svg?style=flat-square)
[![Dependency Status](https://david-dm.org/diasdavid/js-libp2p-swarm.svg?style=flat-square)](https://david-dm.org/ipfs/js-libp2p-swarm)
[![js-standard-style](https://img.shields.io/badge/code%20style-standard-brightgreen.svg?style=flat-square)](https://github.com/feross/standard)
> IPFS swarm implementation in Node.js
> libp2p swarm implementation in JavaScript
# Description
ipfs-swarm is an abstraction for the network layer on IPFS. It offers an API to open streams between peers on a specific protocol.
libp2p-swarm is a connection abstraction that is able to leverage several transports and connection upgrades, such as congestion control, channel encryption, multiplexing several streams in one connection, and more. It does this by bringing protocol multiplexing to the application level (instead of the traditional Port level) using multicodec and multistream.
Ref spec (WIP) - https://github.com/diasdavid/specs/blob/protocol-spec/protocol/layers.md#network-layer
libp2p-swarm is used by libp2p but it can be also used as a standalone module.
# Usage
### Create a new Swarm
## Install
```javascript
var Swarm = require('ipfs-swarm')
libp2p-swarm is available on npm and so, like any other npm module, just:
var s = new Swarm([port]) // `port` defalts to 4001
```bash
> npm install libp2p-swarm --save
```
### Set the swarm to listen for incoming streams
## API
```javascript
s.listen([port], [callback]) // `port` defaults to 4001, `callback` gets called when the socket starts listening
#### Create a libp2p Swarm
And use it in your Node.js code as:
```JavaScript
const Swarm = require('libp2p-swarm')
const sw = new Swarm(peerInfo)
```
### Close the listener/socket and every open stream that was multiplexed on it
peerInfo is a [PeerInfo](https://github.com/diasdavid/js-peer-info) object that represents the peer creating this swarm instance.
```javascript
s.closeListener()
```
### Transports
### Register a protocol to be handled by an incoming stream
##### `swarm.transport.add(key, transport, options, callback)`
```javascript
s.registerHandler('/name/protocol/you/want/version', function (stream) {})
```
libp2p-swarm expects transports that implement [interface-transport](https://github.com/diasdavid/abstract-transport). For example [libp2p-tcp](https://github.com/diasdavid/js-libp2p-tcp).
### Open a new connection
- `key` - the transport identifier
- `transport` -
- `options`
- `callback`
Used when we want to make sure we can connect to a given peer, but do not intend to establish a stream with any of the services offered right away.
##### `swarm.transport.dial(key, multiaddrs, callback)`
```
s.openConnection(peerConnection, function (err) {})
```
Dial to a peer on a specific transport.
- `key`
- `multiaddrs`
- `callback`
### Dial a new stream
##### `swarm.transport.listen(key, options, handler, callback)`
```
s.openStream(peerInfo, protocol, function (err, stream) {})
```
Set a transport to start listening mode.
peerInfo must be a [`ipfs-peer`](https://www.npmjs.com/package/ipfs-peer) object, contaning both peer-id and multiaddrs.
- `key`
- `options`
- `handler`
- `callback`
## Events emitted
##### `swarm.transport.close(key, callback)`
```
.on('error')
Close the listeners of a given transport.
.on('connection')
.on('connection-unknown') // used by Identify to start the Identify protocol from listener to dialer
```
- `key`
- `callback`
## Identify protocol
### Connection
The Identify protocol is an integral part to Swarm. It enables peers to share observedAddrs, identities and other possible address available. This enables us to do better NAT traversal.
##### `swarm.connection.addUpgrade()`
To instantiate Identify:
A connection upgrade must be able to receive and return something that implements the [interface-connection](https://github.com/diasdavid/interface-connection) specification.
```
var Identify = require('ipfs-swarm/identify')
> **WIP**
var i = new Identify(swarmInstance, peerSelf)
```
##### `swarm.connection.addStreamMuxer(muxer)`
`swarmInstance` must be an Instance of swarm and `peerSelf` must be a instance of `ipfs-peer` that represents the peer that instantiated this Identify
Upgrading a connection to use a stream muxer is still considered an upgrade, but a special case since once this connection is applied, the returned obj will implement the [interface-stream-muxer](https://github.com/diasdavid/interface-stream-muxer) spec.
Identify emits a `peer-update` event each time it receives information from another peer.
- `muxer`
##### `swarm.connection.reuse()`
Enable the identify protocol
### `swarm.dial(pi, protocol, callback)`
dial uses the best transport (whatever works first, in the future we can have some criteria), and jump starts the connection until the point we have to negotiate the protocol. If a muxer is available, then drop the muxer onto that connection. Good to warm up connections or to check for connectivity. If we have already a muxer for that peerInfo, than do nothing.
- `pi` - peer info project
- `protocol`
- `callback`
### `swarm.handle(protocol, handler)`
handle a new protocol.
- `protocol`
- `handler` - function called when we receive a dial on `protocol. Signature must be `function (conn) {}`
### `swarm.close(callback)`
close all the listeners and muxers.
- `callback`
# Design
## Multitransport
libp2p is designed to support multiple transports at the same time. While peers are identified by their ID (which are generated from their public keys), the addresses of each pair may vary, depending the device where they are being run or the network in which they are accessible through.
In order for a transport to be supported, it has to follow the [interface-transport](https://github.com/diasdavid/interface-transport) spec.
## Connection upgrades
Each connection in libp2p follows the [interface-connection](https://github.com/diasdavid/interface-connection) spec. This design decision enables libp2p to have upgradable transports.
We think of `upgrade` as a very important notion when we are talking about connections, we can see mechanisms like: stream multiplexing, congestion control, encrypted channels, multipath, simulcast, etc, as `upgrades` to a connection. A connection can be a simple and with no guarantees, drop a packet on the network with a destination thing, a transport in the other hand can be a connection and or a set of different upgrades that are mounted on top of each other, giving extra functionality to that connection and therefore `upgrading` it.
Types of upgrades to a connection:
- encrypted channel (with TLS for e.g)
- congestion flow (some transports don't have it by default)
- multipath (open several connections and abstract it as a single connection)
- simulcast (still really thinking this one through, it might be interesting to send a packet through different connections under some hard network circumstances)
- stream-muxer - this a special case, because once we upgrade a connection to a stream-muxer, we can open more streams (multiplex them) on a single stream, also enabling us to reuse the underlying dialed transport
We also want to enable flexibility when it comes to upgrading a connection, for example, we might that all dialed transports pass through the encrypted channel upgrade, but not the congestion flow, specially when a transport might have already some underlying properties (UDP vs TCP vs WebRTC vs every other transport protocol)
## Identify
Identify is a protocol that Swarms mounts on top of itself, to identify the connections between any two peers. E.g:
- a) peer A dials a conn to peer B
- b) that conn gets upgraded to a stream multiplexer that both peers agree
- c) peer B executes de identify protocol
- d) peer B now can open streams to peer A, knowing which is the identity of peer A
In addition to this, we also share the 'observed addresses' by the other peer, which is extremely useful information for different kinds of network topologies.
## Notes
To avoid the confusion between connection, stream, transport, and other names that represent an abstraction of data flow between two points, we use terms as:
- connection - something that implements the transversal expectations of a stream between two peers, including the benefits of using a stream plus having a way to do half duplex, full duplex
- transport - something that as a dial/listen interface and return objs that implement a connection interface

View File

@ -1,25 +0,0 @@
// var Identify = require('./../src/identify')
var Swarm = require('./../src')
var Peer = require('ipfs-peer')
var Id = require('ipfs-peer-id')
var multiaddr = require('multiaddr')
var a = new Swarm()
a.port = 4000
// a.listen()
// var peerA = new Peer(Id.create(), [multiaddr('/ip4/127.0.0.1/tcp/' + a.port)])
// attention, peerB Id isn't going to match, but whateves
var peerB = new Peer(Id.create(), [multiaddr('/ip4/127.0.0.1/tcp/4001')])
// var i = new Identify(a, peerA)
// i.on('thenews', function (news) {
// console.log('such news')
// })
a.openStream(peerB, '/ipfs/sparkles/1.2.3', function (err, stream) {
if (err) {
return console.log(err)
}
console.log('WoHoo, dialed a stream')
})

View File

@ -1,28 +0,0 @@
// var Identify = require('./../src/identify')
var Swarm = require('./../src')
// var Peer = require('ipfs-peer')
// var Id = require('ipfs-peer-id')
// var multiaddr = require('multiaddr')
var b = new Swarm()
b.port = 4001
// var peerB = new Peer(Id.create(), [multiaddr('/ip4/127.0.0.1/tcp/' + b.port)])
// var i = new Identify(b, peerB)
// i.on('thenews', function (news) {
// console.log('such news')
// })
b.on('error', function (err) {
console.log(err)
})
b.listen()
b.registerHandler('/ipfs/sparkles/1.2.3', function (stream) {
// if (err) {
// return console.log(err)
// }
console.log('woop got a stream')
})

54
karma.conf.js Normal file
View File

@ -0,0 +1,54 @@
const path = require('path')
module.exports = function (config) {
const nodeForgePath = path.resolve(__dirname, 'node_modules/peer-id/deps/forge.bundle.js')
config.set({
basePath: '',
frameworks: ['mocha'],
files: [
nodeForgePath,
'tests/browser-nodejs/browser.js'
],
preprocessors: {
'tests/*': ['webpack'],
'tests/browser-nodejs/*': ['webpack']
},
webpack: {
resolve: {
extensions: ['', '.js', '.json']
},
externals: {
fs: '{}',
'node-forge': 'forge'
},
node: {
Buffer: true
},
module: {
loaders: [
{ test: /\.json$/, loader: 'json' }
]
}
},
webpackMiddleware: {
noInfo: true,
stats: {
colors: true
}
},
reporters: ['spec'],
port: 9876,
colors: true,
logLevel: config.LOG_INFO,
autoWatch: false,
browsers: process.env.TRAVIS ? ['Firefox'] : ['Chrome'],
captureTimeout: 60000,
browserNoActivityTimeout: 20000,
singleRun: true
})
}

View File

@ -1,18 +1,18 @@
{
"name": "ipfs-swarm",
"version": "0.4.1",
"description": "IPFS swarm implementation in Node.js",
"name": "libp2p-swarm",
"version": "0.9.2",
"description": "libp2p swarm implementation in JavaScript",
"main": "src/index.js",
"scripts": {
"test": "./node_modules/.bin/lab tests/*-test.js",
"coverage": "./node_modules/.bin/lab -t 100 tests/*-test.js",
"codestyle": "./node_modules/.bin/standard --format",
"lint": "./node_modules/.bin/standard",
"validate": "npm ls"
"test:node": "mocha tests/*-test.js",
"test:browser": "node tests/browser-nodejs/test.js",
"test": "npm run test:node && npm run test:browser",
"coverage": "istanbul cover --print both -- _mocha tests/*-test.js",
"lint": "standard"
},
"repository": {
"type": "git",
"url": "https://github.com/diasdavid/node-ipfs-swarm.git"
"url": "https://github.com/diasdavid/js-libp2p-swarm.git"
},
"keywords": [
"IPFS"
@ -20,34 +20,45 @@
"author": "David Dias <daviddias@ipfs.io>",
"license": "MIT",
"bugs": {
"url": "https://github.com/diasdavid/node-ipfs-swarm/issues"
"url": "https://github.com/diasdavid/js-libp2p-swarm/issues"
},
"homepage": "https://github.com/diasdavid/node-ipfs-swarm",
"homepage": "https://github.com/diasdavid/js-libp2p-swarm",
"pre-commit": [
"codestyle",
"lint",
"test"
],
"engines": {
"node": "^4.0.0"
"node": "^4.3.0"
},
"devDependencies": {
"code": "^1.4.1",
"lab": "^5.13.0",
"precommit-hook": "^3.0.0",
"sinon": "^1.15.4",
"standard": "^4.5.2",
"stream-pair": "^1.0.3"
"bl": "^1.1.2",
"buffer-loader": "0.0.1",
"chai": "^3.5.0",
"istanbul": "^0.4.2",
"json-loader": "^0.5.4",
"karma": "^0.13.22",
"karma-chrome-launcher": "^0.2.2",
"karma-firefox-launcher": "^0.1.7",
"karma-mocha": "^0.2.2",
"karma-spec-reporter": "0.0.24",
"karma-webpack": "^1.7.0",
"libp2p-multiplex": "^0.2.1",
"libp2p-spdy": "^0.2.3",
"libp2p-tcp": "^0.4.0",
"libp2p-websockets": "^0.2.1",
"mocha": "^2.4.5",
"multiaddr": "^1.3.0",
"peer-id": "^0.6.0",
"peer-info": "^0.6.0",
"pre-commit": "^1.1.2",
"standard": "^6.0.7",
"stream-pair": "^1.0.3",
"webpack": "^2.1.0-beta.4"
},
"dependencies": {
"async": "^1.3.0",
"ip-address": "^4.0.0",
"ipfs-logger": "^0.1.0",
"ipfs-peer": "^0.3.0",
"ipfs-peer-id": "^0.3.0",
"multiaddr": "^1.0.0",
"multiplex-stream-muxer": "^0.2.0",
"duplex-passthrough": "github:diasdavid/duplex-passthrough",
"ip-address": "^5.0.2",
"multistream-select": "^0.6.1",
"protocol-buffers-stream": "^1.2.0",
"spdy-stream-muxer": "^0.6.0"
"protocol-buffers-stream": "^1.2.0"
}
}

103
src/identify.js Normal file
View File

@ -0,0 +1,103 @@
/*
* Identify is one of the protocols swarms speaks in order to
* broadcast and learn about the ip:port pairs a specific peer
* is available through and to know when a new stream muxer is
* established, so a conn can be reused
*/
const multistream = require('multistream-select')
const fs = require('fs')
const path = require('path')
const Info = require('peer-info')
const Id = require('peer-id')
const multiaddr = require('multiaddr')
const isNode = !global.window
const identity = isNode
? fs.readFileSync(path.join(__dirname, 'identify.proto'))
: require('buffer!./identify.proto')
const pbStream = require('protocol-buffers-stream')(identity)
exports = module.exports
exports.multicodec = '/ipfs/identify/1.0.0'
exports.exec = (rawConn, muxer, peerInfo, callback) => {
// 1. open a stream
// 2. multistream into identify
// 3. send what I see from this other peer (extract fro conn)
// 4. receive what the other peer sees from me
// 4. callback with (err, peerInfo)
const conn = muxer.newStream()
var msI = new multistream.Interactive()
msI.handle(conn, () => {
msI.select(exports.multicodec, (err, ds) => {
if (err) {
return callback(err)
}
var pbs = pbStream()
pbs.on('identify', (msg) => {
if (msg.observedAddr.length > 0) {
peerInfo.multiaddr.addSafe(msg.observedAddr)
}
const peerId = Id.createFromPubKey(msg.publicKey)
const otherPeerInfo = new Info(peerId)
msg.listenAddrs.forEach((ma) => {
otherPeerInfo.multiaddr.add(multiaddr(ma))
})
callback(null, otherPeerInfo)
})
const obsMultiaddr = rawConn.getObservedAddrs()[0]
pbs.identify({
protocolVersion: 'na',
agentVersion: 'na',
publicKey: peerInfo.id.pubKey,
listenAddrs: peerInfo.multiaddrs.map((mh) => { return mh.buffer }),
observedAddr: obsMultiaddr ? obsMultiaddr.buffer : new Buffer('')
})
pbs.pipe(ds).pipe(pbs)
pbs.finalize()
})
})
}
exports.handler = (peerInfo, swarm) => {
return function (conn) {
// 1. receive incoming observed info about me
// 2. update my own information (on peerInfo)
// 3. send back what I see from the other (get from swarm.muxedConns[incPeerID].conn.getObservedAddrs()
var pbs = pbStream()
pbs.on('identify', function (msg) {
if (msg.observedAddr.length > 0) {
peerInfo.multiaddr.addSafe(msg.observedAddr)
}
const peerId = Id.createFromPubKey(msg.publicKey)
const conn = swarm.muxedConns[peerId.toB58String()].conn
const obsMultiaddr = conn.getObservedAddrs()[0]
pbs.identify({
protocolVersion: 'na',
agentVersion: 'na',
publicKey: peerInfo.id.pubKey,
listenAddrs: peerInfo.multiaddrs.map(function (ma) {
return ma.buffer
}),
observedAddr: obsMultiaddr ? obsMultiaddr.buffer : new Buffer('')
})
pbs.finalize()
})
pbs.pipe(conn).pipe(pbs)
}
}

View File

@ -1,141 +0,0 @@
/*
* Identify is one of the protocols swarms speaks in order to broadcast and learn
* about the ip:port pairs a specific peer is available through
*/
var Interactive = require('multistream-select').Interactive
var EventEmmiter = require('events').EventEmitter
var util = require('util')
var protobufs = require('protocol-buffers-stream')
var fs = require('fs')
var schema = fs.readFileSync(__dirname + '/identify.proto')
var v6 = require('ip-address').v6
var Id = require('ipfs-peer-id')
var multiaddr = require('multiaddr')
exports = module.exports = Identify
util.inherits(Identify, EventEmmiter)
function Identify (swarm, peerSelf) {
var self = this
self.createProtoStream = protobufs(schema)
swarm.registerHandler('/ipfs/identify/1.0.0', function (stream) {
var ps = self.createProtoStream()
ps.on('identify', function (msg) {
updateSelf(peerSelf, msg.observedAddr)
var peerId = Id.createFromPubKey(msg.publicKey)
var socket = swarm.connections[peerId.toB58String()].socket
var mh = getMultiaddr(socket)
ps.identify({
protocolVersion: 'na',
agentVersion: 'na',
publicKey: peerSelf.id.pubKey,
listenAddrs: peerSelf.multiaddrs.map(function (mh) {return mh.buffer}),
observedAddr: mh.buffer
})
self.emit('peer-update', {
peerId: peerId,
listenAddrs: msg.listenAddrs.map(function (mhb) {return multiaddr(mhb)})
})
ps.finalize()
})
ps.pipe(stream).pipe(ps)
})
swarm.on('connection-unknown', function (conn, socket) {
conn.dialStream(function (err, stream) {
if (err) { return console.log(err) }
var msi = new Interactive()
msi.handle(stream, function () {
msi.select('/ipfs/identify/1.0.0', function (err, ds) {
if (err) { return console.log(err) }
var ps = self.createProtoStream()
ps.on('identify', function (msg) {
var peerId = Id.createFromPubKey(msg.publicKey)
updateSelf(peerSelf, msg.observedAddr)
swarm.connections[peerId.toB58String()] = {
conn: conn,
socket: socket
}
self.emit('peer-update', {
peerId: peerId,
listenAddrs: msg.listenAddrs.map(function (mhb) {return multiaddr(mhb)})
})
})
var mh = getMultiaddr(socket)
ps.identify({
protocolVersion: 'na',
agentVersion: 'na',
publicKey: peerSelf.id.pubKey,
listenAddrs: peerSelf.multiaddrs.map(function (mh) {return mh.buffer}),
observedAddr: mh.buffer
})
ps.pipe(ds).pipe(ps)
ps.finalize()
})
})
})
})
}
function getMultiaddr (socket) {
var mh
if (~socket.remoteAddress.indexOf(':')) {
var addr = new v6.Address(socket.remoteAddress)
if (addr.v4) {
var ip4 = socket.remoteAddress.split(':')[3]
mh = multiaddr('/ip4/' + ip4 + '/tcp/' + socket.remotePort)
} else {
mh = multiaddr('/ip6/' + socket.remoteAddress + '/tcp/' + socket.remotePort)
}
} else {
mh = multiaddr('/ip4/' + socket.remoteAddress + '/tcp/' + socket.remotePort)
}
return mh
}
function updateSelf (peerSelf, observedAddr) {
var omh = multiaddr(observedAddr)
if (!peerSelf.previousObservedAddrs) {
peerSelf.previousObservedAddrs = []
}
for (var i = 0; i < peerSelf.previousObservedAddrs.length; i++) {
if (peerSelf.previousObservedAddrs[i].toString() === omh.toString()) {
peerSelf.previousObservedAddrs.splice(i, 1)
addToSelf()
return
}
}
peerSelf.previousObservedAddrs.push(omh)
function addToSelf () {
var isIn = false
peerSelf.multiaddrs.forEach(function (mh) {
if (mh.toString() === omh.toString()) {
isIn = true
}
})
if (!isIn) {
peerSelf.multiaddrs.push(omh)
}
}
}

View File

@ -1,4 +1,323 @@
var Swarm = require('./swarm')
const multistream = require('multistream-select')
const identify = require('./identify')
const DuplexPassThrough = require('duplex-passthrough')
exports = module.exports = Swarm
exports.singleton = new Swarm()
function Swarm (peerInfo) {
if (!(this instanceof Swarm)) {
return new Swarm(peerInfo)
}
if (!peerInfo) {
throw new Error('You must provide a value for `peerInfo`')
}
// transports --
// { key: transport }; e.g { tcp: <tcp> }
this.transports = {}
this.transport = {}
this.transport.add = (key, transport, options, callback) => {
if (typeof options === 'function') {
callback = options
options = {}
}
if (!callback) { callback = noop }
if (this.transports[key]) {
throw new Error('There is already a transport with this key')
}
this.transports[key] = transport
callback()
}
this.transport.dial = (key, multiaddrs, callback) => {
const t = this.transports[key]
if (!Array.isArray(multiaddrs)) {
multiaddrs = [multiaddrs]
}
// a) filter the multiaddrs that are actually valid for this transport (use a func from the transport itself) (maybe even make the transport do that)
multiaddrs = t.filter(multiaddrs)
// b) if multiaddrs.length = 1, return the conn from the
// transport, otherwise, create a passthrough
if (multiaddrs.length === 1) {
const conn = t.dial(multiaddrs.shift(), {ready: () => {
const cb = callback
callback = noop // this is done to avoid connection drops as connect errors
cb(null, conn)
}})
conn.once('error', () => {
callback(new Error('failed to connect to every multiaddr'))
})
return conn
}
// c) multiaddrs should already be a filtered list
// specific for the transport we are using
const pt = new DuplexPassThrough()
next(multiaddrs.shift())
return pt
function next (multiaddr) {
const conn = t.dial(multiaddr, {ready: () => {
pt.wrapStream(conn)
const cb = callback
callback = noop // this is done to avoid connection drops as connect errors
cb(null, pt)
}})
conn.once('error', () => {
if (multiaddrs.length === 0) {
return callback(new Error('failed to connect to every multiaddr'))
}
next(multiaddrs.shift())
})
}
}
this.transport.listen = (key, options, handler, callback) => {
// if no callback is passed, we pass conns to connHandler
if (!handler) { handler = connHandler }
const multiaddrs = this.transports[key].filter(peerInfo.multiaddrs)
this.transports[key].createListener(multiaddrs, handler, (err, maUpdate) => {
if (err) {
return callback(err)
}
if (maUpdate) {
// because we can listen on port 0...
peerInfo.multiaddr.replace(multiaddrs, maUpdate)
}
callback()
})
}
this.transport.close = (key, callback) => {
this.transports[key].close(callback)
}
// connections --
// { peerIdB58: { conn: <conn> }}
this.conns = {}
// {
// peerIdB58: {
// muxer: <muxer>
// conn: <transport socket> // to extract info required for the Identify Protocol
// }
// }
this.muxedConns = {}
// { protocol: handler }
this.protocols = {}
this.connection = {}
this.connection.addUpgrade = () => {}
// { muxerCodec: <muxer> } e.g { '/spdy/0.3.1': spdy }
this.muxers = {}
this.connection.addStreamMuxer = (muxer) => {
// for dialing
this.muxers[muxer.multicodec] = muxer
// for listening
this.handle(muxer.multicodec, (conn) => {
const muxedConn = muxer(conn, true)
muxedConn.on('stream', (conn) => {
connHandler(conn)
})
// if identify is enabled, attempt to do it for muxer reuse
if (this.identify) {
identify.exec(conn, muxedConn, peerInfo, (err, pi) => {
if (err) {
return console.log('Identify exec failed', err)
}
this.muxedConns[pi.id.toB58String()] = {}
this.muxedConns[pi.id.toB58String()].muxer = muxedConn
this.muxedConns[pi.id.toB58String()].conn = conn // to be able to extract addrs
})
}
})
}
// enable the Identify protocol
this.identify = false
this.connection.reuse = () => {
this.identify = true
this.handle(identify.multicodec, identify.handler(peerInfo, this))
}
const self = this // prefered this to bind
// incomming connection handler
function connHandler (conn) {
var msS = new multistream.Select()
Object.keys(self.protocols).forEach((protocol) => {
if (!protocol) { return }
msS.addHandler(protocol, self.protocols[protocol])
})
msS.handle(conn)
}
// higher level (public) API
this.dial = (pi, protocol, callback) => {
var pt = null
if (typeof protocol === 'function') {
callback = protocol
protocol = null
} else {
pt = new DuplexPassThrough()
}
const b58Id = pi.id.toB58String()
if (!this.muxedConns[b58Id]) {
if (!this.conns[b58Id]) {
attemptDial(pi, (err, conn) => {
if (err) {
return callback(err)
}
gotWarmedUpConn(conn)
})
} else {
const conn = this.conns[b58Id]
this.conns[b58Id] = undefined
gotWarmedUpConn(conn)
}
} else {
gotMuxer(this.muxedConns[b58Id].muxer)
}
return pt
function gotWarmedUpConn (conn) {
attemptMuxerUpgrade(conn, (err, muxer) => {
if (!protocol) {
if (err) {
self.conns[b58Id] = conn
}
return callback()
}
if (err) {
// couldn't upgrade to Muxer, it is ok
protocolHandshake(conn, protocol, callback)
} else {
gotMuxer(muxer)
}
})
}
function gotMuxer (muxer) {
openConnInMuxedConn(muxer, (conn) => {
protocolHandshake(conn, protocol, callback)
})
}
function attemptDial (pi, cb) {
const tKeys = Object.keys(self.transports)
nextTransport(tKeys.shift())
function nextTransport (key) {
const multiaddrs = pi.multiaddrs.slice()
self.transport.dial(key, multiaddrs, (err, conn) => {
if (err) {
if (tKeys.length === 0) {
return cb(new Error('Could not dial in any of the transports'))
}
return nextTransport(tKeys.shift())
}
cb(null, conn)
})
}
}
function attemptMuxerUpgrade (conn, cb) {
const muxers = Object.keys(self.muxers)
if (muxers.length === 0) {
return cb(new Error('no muxers available'))
}
// 1. try to handshake in one of the muxers available
// 2. if succeeds
// - add the muxedConn to the list of muxedConns
// - add incomming new streams to connHandler
nextMuxer(muxers.shift())
function nextMuxer (key) {
var msI = new multistream.Interactive()
msI.handle(conn, function () {
msI.select(key, (err, conn) => {
if (err) {
if (muxers.length === 0) {
cb(new Error('could not upgrade to stream muxing'))
} else {
nextMuxer(muxers.shift())
}
return
}
const muxedConn = self.muxers[key](conn, false)
self.muxedConns[b58Id] = {}
self.muxedConns[b58Id].muxer = muxedConn
self.muxedConns[b58Id].conn = conn
// in case identify is on
muxedConn.on('stream', connHandler)
cb(null, muxedConn)
})
})
}
}
function openConnInMuxedConn (muxer, cb) {
cb(muxer.newStream())
}
function protocolHandshake (conn, protocol, cb) {
var msI = new multistream.Interactive()
msI.handle(conn, function () {
msI.select(protocol, (err, conn) => {
if (err) {
return callback(err)
}
pt.wrapStream(conn)
callback(null, pt)
})
})
}
}
this.handle = (protocol, handler) => {
this.protocols[protocol] = handler
}
this.close = (callback) => {
var count = 0
Object.keys(this.muxedConns).forEach((key) => {
this.muxedConns[key].muxer.end()
})
Object.keys(this.transports).forEach((key) => {
this.transports[key].close(() => {
if (++count === Object.keys(this.transports).length) {
callback()
}
})
})
}
}
function noop () {}

View File

@ -1,2 +0,0 @@
exports = module.exports = require('spdy-stream-muxer')
// exports = module.exports = require('multiplex-stream-muxer')

View File

@ -1,189 +0,0 @@
var tcp = require('net')
var Select = require('multistream-select').Select
var Interactive = require('multistream-select').Interactive
var Muxer = require('./stream-muxer')
var log = require('ipfs-logger').group('swarm')
var async = require('async')
var EventEmitter = require('events').EventEmitter
var util = require('util')
exports = module.exports = Swarm
util.inherits(Swarm, EventEmitter)
function Swarm () {
var self = this
if (!(self instanceof Swarm)) {
throw new Error('Swarm must be called with new')
}
self.port = parseInt(process.env.IPFS_SWARM_PORT, 10) || 4001
self.connections = {} // {peerIdB58: {conn: <>, socket: <>}
self.handles = {}
// set the listener
self.listen = function (port, ready) {
if (!ready) {
ready = function noop () {}
}
if (typeof port === 'function') {
ready = port
} else if (port) { self.port = port }
//
self.listener = tcp.createServer(function (socket) {
errorUp(self, socket)
var ms = new Select()
ms.handle(socket)
ms.addHandler('/spdy/3.1.0', function (ds) {
log.info('Negotiated spdy with incoming socket')
var conn = new Muxer().attach(ds, true)
// attach multistream handlers to incoming streams
conn.on('stream', registerHandles)
errorUp(self, conn)
// FOR IDENTIFY
self.emit('connection-unknown', conn, socket)
// IDENTIFY DOES THIS FOR US
// conn.on('close', function () { delete self.connections[conn.peerId] })
})
}).listen(self.port, ready)
errorUp(self, self.listener)
}
// interface
// open stream account for connection reuse
self.openConnection = function (peer, cb) {
// If no connection open yet, open it
if (!self.connections[peer.id.toB58String()]) {
// Establish a socket with one of the addresses
var socket
async.eachSeries(peer.multiaddrs, function (multiaddr, next) {
if (socket) { return next() }
var tmp = tcp.connect(multiaddr.toOptions(), function () {
socket = tmp
errorUp(self, socket)
next()
})
tmp.once('error', function (err) {
log.warn(multiaddr.toString(), 'on', peer.id.toB58String(), 'not available', err)
next()
})
}, function done () {
if (!socket) {
return cb(new Error('Not able to open a scoket with peer - ',
peer.id.toB58String()))
}
gotSocket(socket)
})
} else {
cb()
}
// do the spdy people dance (multistream-select into spdy)
function gotSocket (socket) {
var msi = new Interactive()
msi.handle(socket, function () {
msi.select('/spdy/3.1.0', function (err, ds) {
if (err) { cb(err) }
var conn = new Muxer().attach(ds, false)
conn.on('stream', registerHandles)
self.connections[peer.id.toB58String()] = {
conn: conn,
socket: socket
}
conn.on('close', function () { delete self.connections[peer.id.toB58String()]})
errorUp(self, conn)
cb()
})
})
}
}
self.openStream = function (peer, protocol, cb) {
self.openConnection(peer, function (err) {
if (err) {
return cb(err)
}
// spawn new muxed stream
var conn = self.connections[peer.id.toB58String()].conn
conn.dialStream(function (err, stream) {
if (err) { return cb(err) }
errorUp(self, stream)
// negotiate desired protocol
var msi = new Interactive()
msi.handle(stream, function () {
msi.select(protocol, function (err, ds) {
if (err) { return cb(err) }
peer.lastSeen = new Date()
cb(null, ds) // return the stream
})
})
})
})
}
self.registerHandler = function (protocol, handlerFunc) {
if (self.handles[protocol]) {
return handlerFunc(new Error('Handle for protocol already exists', protocol))
}
self.handles[protocol] = handlerFunc
log.info('Registered handler for protocol:', protocol)
}
self.closeConns = function (cb) {
var keys = Object.keys(self.connections)
var number = keys.length
if (number === 0) { cb() }
var c = new Counter(number, cb)
keys.forEach(function (key) {
self.connections[key].conn.end()
c.hit()
})
}
self.closeListener = function (cb) {
self.listener.close(cb)
}
function registerHandles (stream) {
log.info('Registering protocol handlers on new stream')
errorUp(self, stream)
var msH = new Select()
msH.handle(stream)
Object.keys(self.handles).forEach(function (protocol) {
msH.addHandler(protocol, self.handles[protocol])
})
}
}
function errorUp (self, emitter) {
emitter.on('error', function (err) {
self.emit('error', err)
})
}
function Counter (target, callback) {
var c = 0
this.hit = count
function count () {
c += 1
if (c === target) { callback() }
}
}

12
tests/00-basic-test.js Normal file
View File

@ -0,0 +1,12 @@
/* eslint-env mocha */
const expect = require('chai').expect
const Swarm = require('../src')
describe('basics', () => {
it('throws on missing peerInfo', (done) => {
expect(Swarm).to.throw(Error)
done()
})
})

View File

@ -0,0 +1,165 @@
/* eslint-env mocha */
const expect = require('chai').expect
const multiaddr = require('multiaddr')
const Peer = require('peer-info')
const Swarm = require('../src')
const TCP = require('libp2p-tcp')
const bl = require('bl')
describe('transport - tcp', function () {
this.timeout(10000)
var swarmA
var swarmB
var peerA = new Peer()
var peerB = new Peer()
before((done) => {
peerA.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9888'))
peerB.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9999'))
swarmA = new Swarm(peerA)
swarmB = new Swarm(peerB)
done()
})
it('add', (done) => {
swarmA.transport.add('tcp', new TCP())
expect(Object.keys(swarmA.transports).length).to.equal(1)
swarmB.transport.add('tcp', new TCP(), () => {
expect(Object.keys(swarmB.transports).length).to.equal(1)
done()
})
})
it('listen', (done) => {
var count = 0
swarmA.transport.listen('tcp', {}, (conn) => {
conn.pipe(conn)
}, ready)
swarmB.transport.listen('tcp', {}, (conn) => {
conn.pipe(conn)
}, ready)
function ready () {
if (++count === 2) {
expect(peerA.multiaddrs.length).to.equal(1)
expect(peerA.multiaddrs[0]).to.deep.equal(multiaddr('/ip4/127.0.0.1/tcp/9888'))
expect(peerB.multiaddrs.length).to.equal(1)
expect(peerB.multiaddrs[0]).to.deep.equal(multiaddr('/ip4/127.0.0.1/tcp/9999'))
done()
}
}
})
it('dial to a multiaddr', (done) => {
const conn = swarmA.transport.dial('tcp', multiaddr('/ip4/127.0.0.1/tcp/9999'), (err, conn) => {
expect(err).to.not.exist
})
conn.pipe(bl((err, data) => {
expect(err).to.not.exist
done()
}))
conn.write('hey')
conn.end()
})
it('dial to set of multiaddr, only one is available', (done) => {
const conn = swarmA.transport.dial('tcp', [
multiaddr('/ip4/127.0.0.1/tcp/9910/websockets'), // not valid on purpose
multiaddr('/ip4/127.0.0.1/tcp/9910'),
multiaddr('/ip4/127.0.0.1/tcp/9999'),
multiaddr('/ip4/127.0.0.1/tcp/9309')
], (err, conn) => {
expect(err).to.not.exist
})
conn.pipe(bl((err, data) => {
expect(err).to.not.exist
done()
}))
conn.write('hey')
conn.end()
})
it('close', (done) => {
var count = 0
swarmA.transport.close('tcp', closed)
swarmB.transport.close('tcp', closed)
function closed () {
if (++count === 2) {
done()
}
}
})
it('support port 0', (done) => {
var swarm
var peer = new Peer()
peer.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/0'))
swarm = new Swarm(peer)
swarm.transport.add('tcp', new TCP())
swarm.transport.listen('tcp', {}, (conn) => {
conn.pipe(conn)
}, ready)
function ready () {
expect(peer.multiaddrs.length).to.equal(1)
expect(peer.multiaddrs[0]).to.not.deep.equal(multiaddr('/ip4/127.0.0.1/tcp/0'))
swarm.close(done)
}
})
it('support addr /ip4/0.0.0.0/tcp/9050', (done) => {
var swarm
var peer = new Peer()
peer.multiaddr.add(multiaddr('/ip4/0.0.0.0/tcp/9050'))
swarm = new Swarm(peer)
swarm.transport.add('tcp', new TCP())
swarm.transport.listen('tcp', {}, (conn) => {
conn.pipe(conn)
}, ready)
function ready () {
expect(peer.multiaddrs.length).to.equal(1)
expect(peer.multiaddrs[0]).to.deep.equal(multiaddr('/ip4/0.0.0.0/tcp/9050'))
swarm.close(done)
}
})
it('support addr /ip4/0.0.0.0/tcp/0', (done) => {
var swarm
var peer = new Peer()
peer.multiaddr.add(multiaddr('/ip4/0.0.0.0/tcp/0'))
swarm = new Swarm(peer)
swarm.transport.add('tcp', new TCP())
swarm.transport.listen('tcp', {}, (conn) => {
conn.pipe(conn)
}, ready)
function ready () {
expect(peer.multiaddrs.length).to.equal(1)
expect(peer.multiaddrs[0]).to.not.deep.equal(multiaddr('/ip4/0.0.0.0/tcp/0'))
swarm.close(done)
}
})
it('listen in several addrs', (done) => {
var swarm
var peer = new Peer()
peer.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9001'))
peer.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9002'))
peer.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9003'))
swarm = new Swarm(peer)
swarm.transport.add('tcp', new TCP())
swarm.transport.listen('tcp', {}, (conn) => {
conn.pipe(conn)
}, ready)
function ready () {
expect(peer.multiaddrs.length).to.equal(3)
swarm.close(done)
}
})
})

View File

@ -0,0 +1,12 @@
/* eslint-env mocha */
describe('transport - utp', function () {
this.timeout(10000)
before((done) => { done() })
it.skip('add', (done) => {})
it.skip('listen', (done) => {})
it.skip('dial', (done) => {})
it.skip('close', (done) => {})
})

View File

@ -0,0 +1,92 @@
/* eslint-env mocha */
const expect = require('chai').expect
const multiaddr = require('multiaddr')
const Peer = require('peer-info')
const Swarm = require('../src')
const WebSockets = require('libp2p-websockets')
const bl = require('bl')
describe('transport - websockets', function () {
this.timeout(10000)
var swarmA
var swarmB
var peerA = new Peer()
var peerB = new Peer()
before((done) => {
peerA.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9888/websockets'))
peerB.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9999/websockets'))
swarmA = new Swarm(peerA)
swarmB = new Swarm(peerB)
done()
})
it('add', (done) => {
swarmA.transport.add('ws', new WebSockets())
expect(Object.keys(swarmA.transports).length).to.equal(1)
swarmB.transport.add('ws', new WebSockets(), () => {
expect(Object.keys(swarmB.transports).length).to.equal(1)
done()
})
})
it('listen', (done) => {
var count = 0
swarmA.transport.listen('ws', {}, (conn) => {
conn.pipe(conn)
}, ready)
swarmB.transport.listen('ws', {}, (conn) => {
conn.pipe(conn)
}, ready)
function ready () {
if (++count === 2) {
expect(peerA.multiaddrs.length).to.equal(1)
expect(peerA.multiaddrs[0]).to.deep.equal(multiaddr('/ip4/127.0.0.1/tcp/9888/websockets'))
expect(peerB.multiaddrs.length).to.equal(1)
expect(peerB.multiaddrs[0]).to.deep.equal(multiaddr('/ip4/127.0.0.1/tcp/9999/websockets'))
done()
}
}
})
it('dial', (done) => {
const conn = swarmA.transport.dial('ws', multiaddr('/ip4/127.0.0.1/tcp/9999/websockets'), (err, conn) => {
expect(err).to.not.exist
})
conn.pipe(bl((err, data) => {
expect(err).to.not.exist
done()
}))
conn.write('hey')
conn.end()
})
it('dial (conn from callback)', (done) => {
swarmA.transport.dial('ws', multiaddr('/ip4/127.0.0.1/tcp/9999/websockets'), (err, conn) => {
expect(err).to.not.exist
conn.pipe(bl((err, data) => {
expect(err).to.not.exist
done()
}))
conn.write('hey')
conn.end()
})
})
it('close', (done) => {
var count = 0
swarmA.transport.close('ws', closed)
swarmB.transport.close('ws', closed)
function closed () {
if (++count === 2) {
done()
}
}
})
})

View File

@ -0,0 +1,130 @@
/* eslint-env mocha */
const expect = require('chai').expect
const multiaddr = require('multiaddr')
const Peer = require('peer-info')
const Swarm = require('../src')
const TCP = require('libp2p-tcp')
const multiplex = require('libp2p-spdy')
describe('stream muxing with multiplex (on TCP)', function () {
this.timeout(20000)
var swarmA
var peerA
var swarmB
var peerB
var swarmC
var peerC
before((done) => {
peerA = new Peer()
peerB = new Peer()
peerC = new Peer()
// console.log('peer A', peerA.id.toB58String())
// console.log('peer B', peerB.id.toB58String())
// console.log('peer C', peerC.id.toB58String())
peerA.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9001'))
peerB.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9002'))
peerC.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9003'))
swarmA = new Swarm(peerA)
swarmB = new Swarm(peerB)
swarmC = new Swarm(peerC)
swarmA.transport.add('tcp', new TCP())
swarmA.transport.listen('tcp', {}, null, ready)
swarmB.transport.add('tcp', new TCP())
swarmB.transport.listen('tcp', {}, null, ready)
swarmC.transport.add('tcp', new TCP())
swarmC.transport.listen('tcp', {}, null, ready)
var counter = 0
function ready () {
if (++counter === 3) {
done()
}
}
})
after((done) => {
var counter = 0
swarmA.close(closed)
swarmB.close(closed)
swarmC.close(closed)
function closed () {
if (++counter === 3) {
done()
}
}
})
it('add', (done) => {
swarmA.connection.addStreamMuxer(multiplex)
swarmB.connection.addStreamMuxer(multiplex)
swarmC.connection.addStreamMuxer(multiplex)
done()
})
it('handle + dial on protocol', (done) => {
swarmB.handle('/abacaxi/1.0.0', (conn) => {
conn.pipe(conn)
})
swarmA.dial(peerB, '/abacaxi/1.0.0', (err, conn) => {
expect(err).to.not.exist
expect(Object.keys(swarmA.muxedConns).length).to.equal(1)
conn.end()
conn.on('data', () => {}) // let it flow.. let it flooooow
conn.on('end', done)
})
})
it('dial to warm conn', (done) => {
swarmB.dial(peerA, (err) => {
expect(err).to.not.exist
expect(Object.keys(swarmB.conns).length).to.equal(0)
expect(Object.keys(swarmB.muxedConns).length).to.equal(1)
done()
})
})
it('dial on protocol, reuse warmed conn', (done) => {
swarmA.handle('/papaia/1.0.0', (conn) => {
conn.pipe(conn)
})
swarmB.dial(peerA, '/papaia/1.0.0', (err, conn) => {
expect(err).to.not.exist
expect(Object.keys(swarmB.conns).length).to.equal(0)
expect(Object.keys(swarmB.muxedConns).length).to.equal(1)
conn.end()
conn.on('data', () => {}) // let it flow.. let it flooooow
conn.on('end', done)
})
})
it('enable identify to reuse incomming muxed conn', (done) => {
swarmA.connection.reuse()
swarmC.connection.reuse()
swarmC.dial(peerA, (err) => {
expect(err).to.not.exist
setTimeout(() => {
expect(Object.keys(swarmC.muxedConns).length).to.equal(1)
expect(Object.keys(swarmA.muxedConns).length).to.equal(2)
done()
}, 500)
})
})
})

View File

@ -0,0 +1,130 @@
/* eslint-env mocha */
const expect = require('chai').expect
const multiaddr = require('multiaddr')
const Peer = require('peer-info')
const Swarm = require('../src')
const TCP = require('libp2p-tcp')
const spdy = require('libp2p-spdy')
describe('stream muxing with spdy (on TCP)', function () {
this.timeout(20000)
var swarmA
var peerA
var swarmB
var peerB
var swarmC
var peerC
before((done) => {
peerA = new Peer()
peerB = new Peer()
peerC = new Peer()
// console.log('peer A', peerA.id.toB58String())
// console.log('peer B', peerB.id.toB58String())
// console.log('peer C', peerC.id.toB58String())
peerA.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9001'))
peerB.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9002'))
peerC.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9003'))
swarmA = new Swarm(peerA)
swarmB = new Swarm(peerB)
swarmC = new Swarm(peerC)
swarmA.transport.add('tcp', new TCP())
swarmA.transport.listen('tcp', {}, null, ready)
swarmB.transport.add('tcp', new TCP())
swarmB.transport.listen('tcp', {}, null, ready)
swarmC.transport.add('tcp', new TCP())
swarmC.transport.listen('tcp', {}, null, ready)
var counter = 0
function ready () {
if (++counter === 3) {
done()
}
}
})
after((done) => {
var counter = 0
swarmA.close(closed)
swarmB.close(closed)
swarmC.close(closed)
function closed () {
if (++counter === 3) {
done()
}
}
})
it('add', (done) => {
swarmA.connection.addStreamMuxer(spdy)
swarmB.connection.addStreamMuxer(spdy)
swarmC.connection.addStreamMuxer(spdy)
done()
})
it('handle + dial on protocol', (done) => {
swarmB.handle('/abacaxi/1.0.0', (conn) => {
conn.pipe(conn)
})
swarmA.dial(peerB, '/abacaxi/1.0.0', (err, conn) => {
expect(err).to.not.exist
expect(Object.keys(swarmA.muxedConns).length).to.equal(1)
conn.end()
conn.on('data', () => {}) // let it flow.. let it flooooow
conn.on('end', done)
})
})
it('dial to warm conn', (done) => {
swarmB.dial(peerA, (err) => {
expect(err).to.not.exist
expect(Object.keys(swarmB.conns).length).to.equal(0)
expect(Object.keys(swarmB.muxedConns).length).to.equal(1)
done()
})
})
it('dial on protocol, reuse warmed conn', (done) => {
swarmA.handle('/papaia/1.0.0', (conn) => {
conn.pipe(conn)
})
swarmB.dial(peerA, '/papaia/1.0.0', (err, conn) => {
expect(err).to.not.exist
expect(Object.keys(swarmB.conns).length).to.equal(0)
expect(Object.keys(swarmB.muxedConns).length).to.equal(1)
conn.end()
conn.on('data', () => {}) // let it flow.. let it flooooow
conn.on('end', done)
})
})
it('enable identify to reuse incomming muxed conn', (done) => {
swarmA.connection.reuse()
swarmC.connection.reuse()
swarmC.dial(peerA, (err) => {
expect(err).to.not.exist
setTimeout(() => {
expect(Object.keys(swarmC.muxedConns).length).to.equal(1)
expect(Object.keys(swarmA.muxedConns).length).to.equal(2)
done()
}, 500)
})
})
})

View File

@ -0,0 +1,12 @@
/* eslint-env mocha */
describe('secio conn upgrade (on TCP)', function () {
this.timeout(20000)
before((done) => { done() })
after((done) => { done() })
it.skip('add', (done) => {})
it.skip('dial', (done) => {})
it.skip('tls on a muxed stream (not the full conn)', (done) => {})
})

View File

@ -0,0 +1,10 @@
/* eslint-env mocha */
describe('tls conn upgrade (on TCP)', function () {
before((done) => { done() })
after((done) => { done() })
it.skip('add', (done) => {})
it.skip('dial', (done) => {})
it.skip('tls on a muxed stream (not the full conn)', (done) => {})
})

View File

@ -0,0 +1,105 @@
/* eslint-env mocha */
const expect = require('chai').expect
const multiaddr = require('multiaddr')
const Peer = require('peer-info')
const Swarm = require('../src')
const TCP = require('libp2p-tcp')
describe('high level API - 1st without stream multiplexing (on TCP)', function () {
this.timeout(20000)
var swarmA
var peerA
var swarmB
var peerB
before((done) => {
peerA = new Peer()
peerB = new Peer()
peerA.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9001'))
peerB.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9002'))
swarmA = new Swarm(peerA)
swarmB = new Swarm(peerB)
swarmA.transport.add('tcp', new TCP())
swarmA.transport.listen('tcp', {}, null, ready)
swarmB.transport.add('tcp', new TCP())
swarmB.transport.listen('tcp', {}, null, ready)
var counter = 0
function ready () {
if (++counter === 2) {
done()
}
}
})
after((done) => {
var counter = 0
swarmA.close(closed)
swarmB.close(closed)
function closed () {
if (++counter === 2) {
done()
}
}
})
it('handle a protocol', (done) => {
swarmB.handle('/bananas/1.0.0', (conn) => {
conn.pipe(conn)
})
expect(Object.keys(swarmB.protocols).length).to.equal(1)
done()
})
it('dial on protocol', (done) => {
swarmB.handle('/pineapple/1.0.0', (conn) => {
conn.pipe(conn)
})
swarmA.dial(peerB, '/pineapple/1.0.0', (err, conn) => {
expect(err).to.not.exist
conn.end()
conn.on('data', () => {}) // let it flow.. let it flooooow
conn.on('end', done)
})
})
it('dial on protocol (returned conn)', (done) => {
swarmB.handle('/apples/1.0.0', (conn) => {
conn.pipe(conn)
})
const conn = swarmA.dial(peerB, '/apples/1.0.0', (err) => {
expect(err).to.not.exist
})
conn.end()
conn.on('data', () => {}) // let it flow.. let it flooooow
conn.on('end', done)
})
it('dial to warm a conn', (done) => {
swarmA.dial(peerB, (err) => {
expect(err).to.not.exist
done()
})
})
it('dial on protocol, reuse warmed conn', (done) => {
swarmA.dial(peerB, '/bananas/1.0.0', (err, conn) => {
expect(err).to.not.exist
conn.end()
conn.on('data', () => {}) // let it flow.. let it flooooow
conn.on('end', done)
})
})
})

View File

@ -0,0 +1,196 @@
/* eslint-env mocha */
const expect = require('chai').expect
const multiaddr = require('multiaddr')
const Peer = require('peer-info')
const Swarm = require('../src')
const TCP = require('libp2p-tcp')
const WebSockets = require('libp2p-websockets')
const spdy = require('libp2p-spdy')
describe('high level API - with everything mixed all together!', function () {
this.timeout(20000)
var swarmA // tcp
var peerA
var swarmB // tcp+ws
var peerB
var swarmC // tcp+ws
var peerC
var swarmD // ws
var peerD
var swarmE // ws
var peerE
before((done) => {
peerA = new Peer()
peerB = new Peer()
peerC = new Peer()
peerD = new Peer()
peerE = new Peer()
// console.log('peer A', peerA.id.toB58String())
// console.log('peer B', peerB.id.toB58String())
// console.log('peer C', peerC.id.toB58String())
swarmA = new Swarm(peerA)
swarmB = new Swarm(peerB)
swarmC = new Swarm(peerC)
swarmD = new Swarm(peerD)
swarmE = new Swarm(peerE)
done()
})
after((done) => {
var counter = 0
swarmA.close(closed)
swarmB.close(closed)
swarmC.close(closed)
swarmD.close(closed)
swarmE.close(closed)
function closed () {
if (++counter === 4) {
done()
}
}
})
it('add tcp', (done) => {
peerA.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/0'))
peerB.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/0'))
peerC.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/0'))
swarmA.transport.add('tcp', new TCP())
swarmA.transport.listen('tcp', {}, null, ready)
swarmB.transport.add('tcp', new TCP())
swarmB.transport.listen('tcp', {}, null, ready)
swarmC.transport.add('tcp', new TCP())
swarmC.transport.listen('tcp', {}, null, ready)
var counter = 0
function ready () {
if (++counter === 3) {
done()
}
}
})
it.skip('add utp', (done) => {})
it('add websockets', (done) => {
peerB.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9012/websockets'))
peerC.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9022/websockets'))
peerD.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9032/websockets'))
peerE.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9042/websockets'))
swarmB.transport.add('ws', new WebSockets())
swarmB.transport.listen('ws', {}, null, ready)
swarmC.transport.add('ws', new WebSockets())
swarmC.transport.listen('ws', {}, null, ready)
swarmD.transport.add('ws', new WebSockets())
swarmD.transport.listen('ws', {}, null, ready)
swarmE.transport.add('ws', new WebSockets())
swarmE.transport.listen('ws', {}, null, ready)
var counter = 0
function ready () {
if (++counter === 4) {
done()
}
}
})
it('add spdy', (done) => {
swarmA.connection.addStreamMuxer(spdy)
swarmB.connection.addStreamMuxer(spdy)
swarmC.connection.addStreamMuxer(spdy)
swarmD.connection.addStreamMuxer(spdy)
swarmE.connection.addStreamMuxer(spdy)
swarmA.connection.reuse()
swarmB.connection.reuse()
swarmC.connection.reuse()
swarmD.connection.reuse()
swarmE.connection.reuse()
done()
})
it.skip('add multiplex', (done) => {})
it('dial from tcp to tcp+ws', (done) => {
swarmB.handle('/anona/1.0.0', (conn) => {
conn.pipe(conn)
})
swarmA.dial(peerB, '/anona/1.0.0', (err, conn) => {
expect(err).to.not.exist
expect(Object.keys(swarmA.muxedConns).length).to.equal(1)
conn.end()
conn.on('data', () => {}) // let it flow.. let it flooooow
conn.on('end', done)
})
})
it('dial from ws to ws', (done) => {
swarmE.handle('/abacaxi/1.0.0', (conn) => {
conn.pipe(conn)
})
swarmD.dial(peerE, '/abacaxi/1.0.0', (err, conn) => {
expect(err).to.not.exist
expect(Object.keys(swarmD.muxedConns).length).to.equal(1)
conn.end()
conn.on('data', () => {}) // let it flow.. let it flooooow
conn.on('end', () => {
setTimeout(() => {
expect(Object.keys(swarmE.muxedConns).length).to.equal(1)
done()
}, 1000)
})
})
})
it('dial from tcp to tcp+ws (returned conn)', (done) => {
swarmB.handle('/grapes/1.0.0', (conn) => {
conn.pipe(conn)
})
const conn = swarmA.dial(peerB, '/grapes/1.0.0', (err, conn) => {
expect(err).to.not.exist
expect(Object.keys(swarmA.muxedConns).length).to.equal(1)
})
conn.end()
conn.on('data', () => {}) // let it flow.. let it flooooow
conn.on('end', done)
})
it('dial from tcp+ws to tcp+ws', (done) => {
swarmC.handle('/mamao/1.0.0', (conn) => {
conn.pipe(conn)
})
swarmA.dial(peerC, '/mamao/1.0.0', (err, conn) => {
expect(err).to.not.exist
expect(Object.keys(swarmA.muxedConns).length).to.equal(2)
conn.end()
conn.on('data', () => {}) // let it flow.. let it flooooow
conn.on('end', done)
})
})
})

View File

@ -0,0 +1,123 @@
/* eslint-env mocha */
const expect = require('chai').expect
const multiaddr = require('multiaddr')
const Id = require('peer-id')
const Peer = require('peer-info')
const Swarm = require('./../../src')
const WebSockets = require('libp2p-websockets')
const bl = require('bl')
describe('basics', () => {
it('throws on missing peerInfo', (done) => {
expect(Swarm).to.throw(Error)
done()
})
})
describe('transport - websockets', function () {
this.timeout(10000)
var swarm
before((done) => {
const b58IdSrc = 'QmYzgdesgjdvD3okTPGZT9NPmh1BuH5FfTVNKjsvaAprhb'
// use a pre generated Id to save time
const idSrc = Id.createFromB58String(b58IdSrc)
const peerSrc = new Peer(idSrc)
swarm = new Swarm(peerSrc)
done()
})
it('add', (done) => {
swarm.transport.add('ws', new WebSockets(), () => {
expect(Object.keys(swarm.transports).length).to.equal(1)
done()
})
})
it('dial', (done) => {
const ma = multiaddr('/ip4/127.0.0.1/tcp/9100/websockets')
const conn = swarm.transport.dial('ws', ma, (err, conn) => {
expect(err).to.not.exist
})
conn.pipe(bl((err, data) => {
expect(err).to.not.exist
expect(data.toString()).to.equal('hey')
done()
}))
conn.write('hey')
conn.end()
})
})
describe('high level API - 1st without stream multiplexing (on websockets)', function () {
this.timeout(10000)
var swarm
var peerDst
before((done) => {
const b58IdSrc = 'QmYzgdesgjdvD3okTPGZT9NPmh1BuH5FfTVNKjsvaAprhb'
// use a pre generated Id to save time
const idSrc = Id.createFromB58String(b58IdSrc)
const peerSrc = new Peer(idSrc)
swarm = new Swarm(peerSrc)
done()
})
after((done) => {
done()
// swarm.close(done)
})
it('add ws', (done) => {
swarm.transport.add('ws', new WebSockets())
expect(Object.keys(swarm.transports).length).to.equal(1)
done()
})
it('create Dst peer info', (done) => {
const b58IdDst = 'QmYzgdesgjdvD3okTPGZT9NPmh1BuH5FfTVNKjsvaAprhb'
// use a pre generated Id to save time
const idDst = Id.createFromB58String(b58IdDst)
peerDst = new Peer(idDst)
const ma = multiaddr('/ip4/127.0.0.1/tcp/9200/websockets')
peerDst.multiaddr.add(ma)
done()
})
it('dial on protocol', (done) => {
swarm.dial(peerDst, '/echo/1.0.0', (err, conn) => {
expect(err).to.not.exist
conn.pipe(bl((err, data) => {
expect(err).to.not.exist
expect(data.toString()).to.equal('hey')
done()
}))
conn.write('hey')
conn.end()
})
})
it('dial to warm a conn', (done) => {
swarm.dial(peerDst, (err) => {
expect(err).to.not.exist
done()
})
})
it('dial on protocol, reuse warmed conn', (done) => {
swarm.dial(peerDst, '/echo/1.0.0', (err, conn) => {
expect(err).to.not.exist
conn.end()
conn.on('data', () => {}) // let it flow.. let it flooooow
conn.on('end', done)
})
})
})

View File

@ -0,0 +1,65 @@
const Server = require('karma').Server
const path = require('path')
const Peer = require('peer-info')
const Id = require('peer-id')
const WebSockets = require('libp2p-websockets')
const Swarm = require('./../../src')
const multiaddr = require('multiaddr')
var swarmA
var swarmB
function createListeners (done) {
function createListenerA (cb) {
const b58IdA = 'QmWg2L4Fucx1x4KXJTfKHGixBJvveubzcd7DdhB2Mqwfh1'
const peerA = new Peer(Id.createFromB58String(b58IdA))
const maA = multiaddr('/ip4/127.0.0.1/tcp/9100/websockets')
peerA.multiaddr.add(maA)
swarmA = new Swarm(peerA)
swarmA.transport.add('ws', new WebSockets())
swarmA.transport.listen('ws', {}, echo, cb)
}
function createListenerB (cb) {
const b58IdB = 'QmRy1iU6BHmG5Hd8rnPhPL98cy1W1przUSTAMcGDq9yAAV'
const maB = multiaddr('/ip4/127.0.0.1/tcp/9200/websockets')
const peerB = new Peer(Id.createFromB58String(b58IdB))
peerB.multiaddr.add(maB)
swarmB = new Swarm(peerB)
swarmB.transport.add('ws', new WebSockets())
swarmB.transport.listen('ws', {}, null, cb)
swarmB.handle('/echo/1.0.0', echo)
}
var count = 0
const ready = () => ++count === 2 ? done() : null
createListenerA(ready)
createListenerB(ready)
function echo (conn) {
conn.pipe(conn)
}
}
function stop (done) {
var count = 0
const ready = () => ++count === 2 ? done() : null
swarmA.transport.close('ws', ready)
swarmB.transport.close('ws', ready)
}
function run (done) {
const karma = new Server({
configFile: path.join(__dirname, '../../karma.conf.js')
}, done)
karma.start()
}
createListeners(() => run(() => stop(() => null)))

View File

@ -1,139 +0,0 @@
var Lab = require('lab')
var Code = require('code')
var lab = exports.lab = Lab.script()
var experiment = lab.experiment
var test = lab.test
var beforeEach = lab.beforeEach
var afterEach = lab.afterEach
var expect = Code.expect
var Muxer = require('./../src/stream-muxer.js')
var multistream = require('multistream-select')
var Interactive = multistream.Interactive
var Select = multistream.Select
var streamPair = require('stream-pair')
beforeEach(function (done) {
done()
})
afterEach(function (done) {
done()
})
experiment('MULTISTREAM AND STREAM MUXER', function () {
test('Open a socket and multistream-select it into spdy', function (done) {
var pair = streamPair.create()
var msI = new Interactive()
var msS = new Select()
var dialerMuxer = new Muxer()
var listenerMuxer = new Muxer()
msS.handle(pair.other)
msS.addHandler('/spdy/0.3.1', function (stream) {
var listenerConn = listenerMuxer.attach(stream, true)
expect(typeof listenerConn).to.be.equal('object')
done()
})
msI.handle(pair, function () {
msI.select('/spdy/0.3.1', function (err, stream) {
expect(err).to.not.be.instanceof(Error)
var dialerConn = dialerMuxer.attach(stream, false)
expect(typeof dialerConn).to.be.equal('object')
})
})
})
test('socket->ms-select into spdy->stream from dialer->ms-select into other protocol', function (done) {
var pair = streamPair.create()
var msI = new Interactive()
var msS = new Select()
var dialerMuxer = new Muxer()
var listenerMuxer = new Muxer()
msS.handle(pair.other)
msS.addHandler('/spdy/0.3.1', function (stream) {
var listenerConn = listenerMuxer.attach(stream, true)
listenerConn.on('stream', function (stream) {
stream.on('data', function (chunk) {
expect(chunk.toString()).to.equal('mux all the streams')
done()
})
})
})
msI.handle(pair, function () {
msI.select('/spdy/0.3.1', function (err, stream) {
expect(err).to.not.be.instanceof(Error)
var dialerConn = dialerMuxer.attach(stream, false)
dialerConn.dialStream(function (err, stream) {
expect(err).to.not.be.instanceof(Error)
stream.write('mux all the streams')
})
})
})
})
test('socket->ms-select into spdy->stream from listener->ms-select into another protocol', function (done) {
var pair = streamPair.create()
var msI = new Interactive()
var msS = new Select()
var dialerMuxer = new Muxer()
var listenerMuxer = new Muxer()
msS.handle(pair.other)
msS.addHandler('/spdy/0.3.1', function (stream) {
var listenerConn = listenerMuxer.attach(stream, true)
listenerConn.on('stream', function (stream) {
stream.on('data', function (chunk) {
expect(chunk.toString()).to.equal('mux all the streams')
listenerConn.dialStream(function (err, stream) {
expect(err).to.not.be.instanceof(Error)
var msI2 = new Interactive()
msI2.handle(stream, function () {
msI2.select('/other/protocol', function (err, stream) {
expect(err).to.not.be.instanceof(Error)
stream.write('the other protocol')
})
})
})
})
})
})
msI.handle(pair, function () {
msI.select('/spdy/0.3.1', function (err, stream) {
expect(err).to.not.be.instanceof(Error)
var dialerConn = dialerMuxer.attach(stream, false)
dialerConn.dialStream(function (err, stream) {
expect(err).to.not.be.instanceof(Error)
stream.write('mux all the streams')
})
dialerConn.on('stream', function (stream) {
var msS2 = new Select()
msS2.handle(stream)
msS2.addHandler('/other/protocol', function (stream) {
stream.on('data', function (chunk) {
expect(chunk.toString()).to.equal('the other protocol')
done()
})
})
})
})
})
})
})

View File

@ -1,221 +0,0 @@
var Lab = require('lab')
var Code = require('code')
var sinon = require('sinon')
var lab = exports.lab = Lab.script()
var experiment = lab.experiment
var test = lab.test
var beforeEach = lab.beforeEach
var afterEach = lab.afterEach
var expect = Code.expect
var multiaddr = require('multiaddr')
var Id = require('ipfs-peer-id')
var Peer = require('ipfs-peer')
var Swarm = require('../src/')
var Identify = require('../src/identify')
var swarmA
var swarmB
var peerA
var peerB
beforeEach(function (done) {
swarmA = new Swarm()
swarmB = new Swarm()
var c = new Counter(2, done)
swarmA.listen(8100, function () {
peerA = new Peer(Id.create(), [multiaddr('/ip4/127.0.0.1/tcp/' + swarmA.port)])
c.hit()
})
swarmB.listen(8101, function () {
peerB = new Peer(Id.create(), [multiaddr('/ip4/127.0.0.1/tcp/' + swarmB.port)])
c.hit()
})
})
afterEach(function (done) {
swarmA.closeListener()
swarmB.closeListener()
done()
})
experiment('BASICS', function () {
experiment('Swarm', function () {
test('enforces instantiation with new', function (done) {
expect(function () {
Swarm()
}).to.throw('Swarm must be called with new')
done()
})
test('parses $IPFS_SWARM_PORT', function (done) {
process.env.IPFS_SWARM_PORT = 1111
var swarm = new Swarm()
expect(swarm.port).to.be.equal(1111)
process.env.IPFS_SWARM_PORT = undefined
done()
})
})
experiment('Swarm.listen', function (done) {
test('handles missing port', function (done) {
var swarm = new Swarm()
swarm.listen(done)
})
test('handles passed in port', function (done) {
var swarm = new Swarm()
swarm.listen(1234)
expect(swarm.port).to.be.equal(1234)
done()
})
})
experiment('Swarm.registerHandler', function () {
test('throws when registering a protcol handler twice', function (done) {
var swarm = new Swarm()
swarm.registerHandler('/sparkles/1.1.1', function () {})
swarm.registerHandler('/sparkles/1.1.1', function (err) {
expect(err).to.be.an.instanceOf(Error)
expect(err.message).to.be.equal('Handle for protocol already exists')
done()
})
})
})
experiment('Swarm.closeConns', function () {
test('calls end on all connections', function (done) {
swarmA.openConnection(peerB, function () {
var key = Object.keys(swarmA.connections)[0]
sinon.spy(swarmA.connections[key].conn, 'end')
swarmA.closeConns(function () {
expect(swarmA.connections[key].conn.end.called).to.be.equal(true)
done()
})
})
})
})
})
experiment('BASE', function () {
test('Open a stream', function (done) {
var protocol = '/sparkles/3.3.3'
var c = new Counter(2, done)
swarmB.registerHandler(protocol, function (stream) {
c.hit()
})
swarmA.openStream(peerB, protocol, function (err, stream) {
expect(err).to.not.be.instanceof(Error)
c.hit()
})
})
test('Reuse connection (from dialer)', function (done) {
var protocol = '/sparkles/3.3.3'
swarmB.registerHandler(protocol, function (stream) {})
swarmA.openStream(peerB, protocol, function (err, stream) {
expect(err).to.not.be.instanceof(Error)
swarmA.openStream(peerB, protocol, function (err, stream) {
expect(err).to.not.be.instanceof(Error)
expect(swarmA.connections.length === 1)
done()
})
})
})
test('Check for lastSeen', function (done) {
var protocol = '/sparkles/3.3.3'
swarmB.registerHandler(protocol, function (stream) {})
swarmA.openStream(peerB, protocol, function (err, stream) {
expect(err).to.not.be.instanceof(Error)
expect(peerB.lastSeen).to.be.instanceof(Date)
done()
})
})
})
experiment('IDENTIFY', function () {
test('Attach Identify, open a stream, see a peer update', function (done) {
swarmA.on('error', function (err) {
console.log('A - ', err)
})
swarmB.on('error', function (err) {
console.log('B - ', err)
})
var protocol = '/sparkles/3.3.3'
var identifyA = new Identify(swarmA, peerA)
var identifyB = new Identify(swarmB, peerB)
setTimeout(function () {
swarmB.registerHandler(protocol, function (stream) {})
swarmA.openStream(peerB, protocol, function (err, stream) {
expect(err).to.not.be.instanceof(Error)
})
identifyB.on('peer-update', function (answer) {
done()
})
identifyA.on('peer-update', function (answer) {})
}, 500)
})
test('Attach Identify, open a stream, reuse stream', function (done) {
console.log('\n\n\n')
var protocol = '/sparkles/3.3.3'
var identifyA = new Identify(swarmA, peerA)
var identifyB = new Identify(swarmB, peerB)
swarmA.registerHandler(protocol, function (stream) {})
swarmB.registerHandler(protocol, function (stream) {})
swarmA.openStream(peerB, protocol, function (err, stream) {
expect(err).to.not.be.instanceof(Error)
})
identifyB.on('peer-update', function (answer) {
expect(Object.keys(swarmB.connections).length).to.equal(1)
swarmB.openStream(peerA, protocol, function (err, stream) {
expect(err).to.not.be.instanceof(Error)
expect(Object.keys(swarmB.connections).length).to.equal(1)
done()
})
})
identifyA.on('peer-update', function (answer) {})
})
})
experiment('HARDNESS', function () {})
function Counter (target, callback) {
var c = 0
this.hit = count
function count () {
c += 1
if (c === target) {
callback()
}
}
}
// function checkErr (err) {
// console.log('err')
// expect(err).to.be.instanceof(Error)
// }