Compare commits

..

1 Commits

Author SHA1 Message Date
10d7212373 fix: protocols change event
I was spelunking last night and found that pubsub was constantly having it's `onConnect` handler fired from the multicodec topology.

It was because bitswap was calling `dialProtocol` to connect to a peer. `dialProtocol` makes a `PeerInfo` out of the multiaddr it's given but it has no `protocols` i.e. `[]`. This is passed to `peerStore.update()` where we compare `[]` with an array of populated protocols. Obviously there is no intersection here so `change:protocols` was being emitted, even though no protocols were added or removed.

This logic needs to be improved and tested properly but I just wanted to send a PR to document my findings.
2020-02-06 09:23:30 +00:00
14 changed files with 84 additions and 109 deletions

View File

@ -1,13 +1,3 @@
<a name="0.27.3"></a>
## [0.27.3](https://github.com/libp2p/js-libp2p/compare/v0.27.2...v0.27.3) (2020-02-11)
### Bug Fixes
* dont allow multiaddr dials without a peer id ([#558](https://github.com/libp2p/js-libp2p/issues/558)) ([a317a8b](https://github.com/libp2p/js-libp2p/commit/a317a8b))
<a name="0.27.2"></a>
## [0.27.2](https://github.com/libp2p/js-libp2p/compare/v0.27.1...v0.27.2) (2020-02-05)

View File

@ -174,13 +174,15 @@ for (const [peerId, connections] of libp2p.connections) {
### dial
Dials to another peer in the network and establishes the connection.
`dial(peer, options)`
#### Parameters
| Name | Type | Description |
|------|------|-------------|
| peer | [`PeerInfo`][peer-info]\|[`PeerId`][peer-id]\|[`Multiaddr`][multiaddr]\|`string` | The peer to dial. If a [`Multiaddr`][multiaddr] or its string is provided, it **must** include the peer id |
| peer | [`PeerInfo`][peer-info]\|[`PeerId`][peer-id]\|[`Multiaddr`][multiaddr]\|`string` | peer to dial |
| [options] | `Object` | dial options |
| [options.signal] | [`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal) | An `AbortSignal` instance obtained from an [`AbortController`](https://developer.mozilla.org/en-US/docs/Web/API/AbortController) that can be used to abort the connection before it completes |
@ -215,7 +217,7 @@ Dials to another peer in the network and selects a protocol to communicate with
| Name | Type | Description |
|------|------|-------------|
| peer | [`PeerInfo`][peer-info]\|[`PeerId`][peer-id]\|[`Multiaddr`][multiaddr]\|`string` | The peer to dial. If a [`Multiaddr`][multiaddr] or its string is provided, it **must** include the peer id |
| peer | [`PeerInfo`][peer-info]\|[`PeerId`][peer-id]\|[`Multiaddr`][multiaddr]\|`string` | peer to dial |
| protocols | `String|Array<String>` | A list of protocols (or single protocol) to negotiate with. Protocols are attempted in order until a match is made. (e.g '/ipfs/bitswap/1.1.0') |
| [options] | `Object` | dial options |
| [options.signal] | [`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal) | An `AbortSignal` instance obtained from an [`AbortController`](https://developer.mozilla.org/en-US/docs/Web/API/AbortController) that can be used to abort the connection before it completes |

View File

@ -1,6 +1,6 @@
{
"name": "libp2p",
"version": "0.27.3",
"version": "0.27.2",
"description": "JavaScript implementation of libp2p, a modular peer to peer network stack",
"leadMaintainer": "Jacob Heun <jacobheun@gmail.com>",
"main": "src/index.js",
@ -83,10 +83,10 @@
"cids": "^0.7.1",
"delay": "^4.3.0",
"dirty-chai": "^2.0.1",
"interop-libp2p": "~0.0.1",
"it-concat": "^1.0.0",
"it-pair": "^1.0.0",
"it-pushable": "^1.4.0",
"interop-libp2p": "~0.0.1",
"libp2p-bootstrap": "^0.10.3",
"libp2p-delegated-content-routing": "^0.4.1",
"libp2p-delegated-peer-routing": "^0.4.0",

View File

@ -70,7 +70,7 @@ class Dialer {
async connectToPeer (peer, options = {}) {
const dialTarget = this._createDialTarget(peer)
if (dialTarget.addrs.length === 0) {
throw errCode(new Error('The dial request has no addresses'), codes.ERR_NO_VALID_ADDRESSES)
throw errCode(new Error('The dial request has no addresses'), 'ERR_NO_DIAL_MULTIADDRS')
}
const pendingDial = this._pendingDials.get(dialTarget.id) || this._createPendingDial(dialTarget, options)
@ -136,7 +136,7 @@ class Dialer {
*/
_createPendingDial (dialTarget, options) {
const dialAction = (addr, options) => {
if (options.signal.aborted) throw errCode(new Error('already aborted'), codes.ERR_ALREADY_ABORTED)
if (options.signal.aborted) throw errCode(new Error('already aborted'), 'ERR_ALREADY_ABORTED')
return this.transportManager.dial(addr, options)
}
@ -197,7 +197,8 @@ class Dialer {
try {
peer = PeerId.createFromCID(peer.getPeerId())
} catch (err) {
throw errCode(new Error('The multiaddr did not contain a valid peer id'), codes.ERR_INVALID_PEER)
// Couldn't get the PeerId, just use the address
return peer
}
}

View File

@ -12,7 +12,6 @@ exports.codes = {
ERR_CONNECTION_ENDED: 'ERR_CONNECTION_ENDED',
ERR_CONNECTION_FAILED: 'ERR_CONNECTION_FAILED',
ERR_NODE_NOT_STARTED: 'ERR_NODE_NOT_STARTED',
ERR_ALREADY_ABORTED: 'ERR_ALREADY_ABORTED',
ERR_NO_VALID_ADDRESSES: 'ERR_NO_VALID_ADDRESSES',
ERR_DISCOVERED_SELF: 'ERR_DISCOVERED_SELF',
ERR_DUPLICATE_TRANSPORT: 'ERR_DUPLICATE_TRANSPORT',

View File

@ -133,18 +133,15 @@ class PeerStore extends EventEmitter {
})
}
// Update protocols
// TODO: better track added and removed protocols
const protocolsIntersection = new Set(
[...recorded.protocols].filter((p) => peerInfo.protocols.has(p))
)
if (protocolsIntersection.size !== peerInfo.protocols.size ||
protocolsIntersection.size !== recorded.protocols.size) {
let isProtocolsChanged = false
for (const protocol of peerInfo.protocols) {
if (!recorded.protocols.has(protocol)) {
isProtocolsChanged = true
recorded.protocols.add(protocol)
}
}
if (isProtocolsChanged) {
this.emit('change:protocols', {
peerInfo: recorded,
protocols: Array.from(recorded.protocols)
@ -224,16 +221,12 @@ class PeerStore extends EventEmitter {
}
/**
* Returns the known multiaddrs for a given `PeerInfo`. All returned multiaddrs
* will include the encapsulated `PeerId` of the peer.
* Returns the known multiaddrs for a given `PeerInfo`
* @param {PeerInfo} peer
* @returns {Array<Multiaddr>}
*/
multiaddrsForPeer (peer) {
return this.put(peer, true).multiaddrs.toArray().map(addr => {
if (addr.getPeerId()) return addr
return addr.encapsulate(`/p2p/${peer.id.toB58String()}`)
})
return this.put(peer, true).multiaddrs.toArray()
}
}

View File

@ -43,7 +43,7 @@ describe('DHT subsystem operates correctly', () => {
remoteLibp2p.start()
])
remAddr = libp2p.peerStore.multiaddrsForPeer(remotePeerInfo)[0]
remAddr = remoteLibp2p.transportManager.getAddrs()[0]
})
afterEach(() => Promise.all([
@ -98,7 +98,7 @@ describe('DHT subsystem operates correctly', () => {
await libp2p.start()
await remoteLibp2p.start()
remAddr = libp2p.peerStore.multiaddrsForPeer(remotePeerInfo)[0]
remAddr = remoteLibp2p.transportManager.getAddrs()[0]
})
afterEach(() => Promise.all([

View File

@ -34,25 +34,20 @@ const Peers = require('../fixtures/peers')
const { createPeerInfo } = require('../utils/creators/peer')
const listenAddr = multiaddr('/ip4/127.0.0.1/tcp/0')
const unsupportedAddr = multiaddr('/ip4/127.0.0.1/tcp/9999/ws/p2p/QmckxVrJw1Yo8LqvmDJNUmdAsKtSbiKWmrXJFyKmUraBoN')
const unsupportedAddr = multiaddr('/ip4/127.0.0.1/tcp/9999/ws')
describe('Dialing (direct, TCP)', () => {
let remoteTM
let localTM
let peerStore
let remoteAddr
before(async () => {
const [remotePeerId] = await Promise.all([
PeerId.createFromJSON(Peers[0])
])
remoteTM = new TransportManager({
libp2p: {},
upgrader: mockUpgrader
})
remoteTM.add(Transport.prototype[Symbol.toStringTag], Transport)
peerStore = new PeerStore()
localTM = new TransportManager({
libp2p: {},
upgrader: mockUpgrader
@ -61,7 +56,7 @@ describe('Dialing (direct, TCP)', () => {
await remoteTM.listen([listenAddr])
remoteAddr = remoteTM.getAddrs()[0].encapsulate(`/p2p/${remotePeerId.toB58String()}`)
remoteAddr = remoteTM.getAddrs()[0]
})
after(() => remoteTM.close())
@ -71,7 +66,7 @@ describe('Dialing (direct, TCP)', () => {
})
it('should be able to connect to a remote node via its multiaddr', async () => {
const dialer = new Dialer({ transportManager: localTM, peerStore })
const dialer = new Dialer({ transportManager: localTM })
const connection = await dialer.connectToPeer(remoteAddr)
expect(connection).to.exist()
@ -79,7 +74,7 @@ describe('Dialing (direct, TCP)', () => {
})
it('should be able to connect to a remote node via its stringified multiaddr', async () => {
const dialer = new Dialer({ transportManager: localTM, peerStore })
const dialer = new Dialer({ transportManager: localTM })
const dialable = Dialer.getDialable(remoteAddr.toString())
const connection = await dialer.connectToPeer(dialable)
@ -88,7 +83,7 @@ describe('Dialing (direct, TCP)', () => {
})
it('should fail to connect to an unsupported multiaddr', async () => {
const dialer = new Dialer({ transportManager: localTM, peerStore })
const dialer = new Dialer({ transportManager: localTM })
await expect(dialer.connectToPeer(unsupportedAddr))
.to.eventually.be.rejectedWith(AggregateError)
@ -145,7 +140,6 @@ describe('Dialing (direct, TCP)', () => {
it('should abort dials on queue task timeout', async () => {
const dialer = new Dialer({
transportManager: localTM,
peerStore,
timeout: 50
})
sinon.stub(localTM, 'dial').callsFake(async (addr, options) => {
@ -230,7 +224,7 @@ describe('Dialing (direct, TCP)', () => {
remoteLibp2p.handle('/echo/1.0.0', ({ stream }) => pipe(stream, stream))
await remoteLibp2p.start()
remoteAddr = remoteLibp2p.transportManager.getAddrs()[0].encapsulate(`/p2p/${remotePeerId.toB58String()}`)
remoteAddr = remoteLibp2p.transportManager.getAddrs()[0]
})
afterEach(async () => {
@ -241,28 +235,6 @@ describe('Dialing (direct, TCP)', () => {
after(() => remoteLibp2p.stop())
it('should fail if no peer id is provided', async () => {
libp2p = new Libp2p({
peerInfo,
modules: {
transport: [Transport],
streamMuxer: [Muxer],
connEncryption: [Crypto]
}
})
sinon.spy(libp2p.dialer, 'connectToPeer')
try {
await libp2p.dial(remoteLibp2p.transportManager.getAddrs()[0])
} catch (err) {
expect(err).to.have.property('code', ErrorCodes.ERR_INVALID_PEER)
return
}
expect.fail('dial should have failed')
})
it('should use the dialer for connecting to a multiaddr', async () => {
libp2p = new Libp2p({
peerInfo,
@ -295,8 +267,10 @@ describe('Dialing (direct, TCP)', () => {
})
sinon.spy(libp2p.dialer, 'connectToPeer')
const remotePeer = new PeerInfo(remoteLibp2p.peerInfo.id)
remotePeer.multiaddrs.add(remoteAddr)
const connection = await libp2p.dial(remotePeerInfo)
const connection = await libp2p.dial(remotePeer)
expect(connection).to.exist()
const { stream, protocol } = await connection.newStream('/echo/1.0.0')
expect(stream).to.exist()
@ -332,7 +306,7 @@ describe('Dialing (direct, TCP)', () => {
}
})
const connection = await libp2p.dial(`${remoteAddr.toString()}`)
const connection = await libp2p.dial(`${remoteAddr.toString()}/p2p/${remotePeerInfo.id.toB58String()}`)
expect(connection).to.exist()
expect(connection.stat.timeline.close).to.not.exist()
await libp2p.hangUp(connection.remotePeer)
@ -363,6 +337,33 @@ describe('Dialing (direct, TCP)', () => {
expect(libp2p.upgrader.protector.protect.callCount).to.equal(1)
})
it('should coalesce parallel dials to the same peer (no id in multiaddr)', async () => {
libp2p = new Libp2p({
peerInfo,
modules: {
transport: [Transport],
streamMuxer: [Muxer],
connEncryption: [Crypto]
}
})
const dials = 10
const dialResults = await Promise.all([...new Array(dials)].map((_, index) => {
if (index % 2 === 0) return libp2p.dial(remoteLibp2p.peerInfo)
return libp2p.dial(remoteLibp2p.peerInfo.multiaddrs.toArray()[0])
}))
// All should succeed and we should have ten results
expect(dialResults).to.have.length(10)
for (const connection of dialResults) {
expect(Connection.isConnection(connection)).to.equal(true)
}
// We will have two connections, since the multiaddr dial doesn't have a peer id
expect(libp2p.connectionManager._connections.size).to.equal(2)
expect(remoteLibp2p.connectionManager._connections.size).to.equal(2)
})
it('should coalesce parallel dials to the same peer (id in multiaddr)', async () => {
libp2p = new Libp2p({
peerInfo,
@ -404,9 +405,10 @@ describe('Dialing (direct, TCP)', () => {
const error = new Error('Boom')
sinon.stub(libp2p.transportManager, 'dial').callsFake(() => Promise.reject(error))
const fullAddress = remoteAddr.encapsulate(`/p2p/${remoteLibp2p.peerInfo.id.toB58String()}`)
const dialResults = await pSettle([...new Array(dials)].map((_, index) => {
if (index % 2 === 0) return libp2p.dial(remoteLibp2p.peerInfo)
return libp2p.dial(remoteAddr)
return libp2p.dial(fullAddress)
}))
// All should succeed and we should have ten results

View File

@ -21,7 +21,6 @@ const { AbortError } = require('libp2p-interfaces/src/transport/errors')
const { codes: ErrorCodes } = require('../../src/errors')
const Constants = require('../../src/constants')
const Dialer = require('../../src/dialer')
const PeerStore = require('../../src/peer-store')
const TransportManager = require('../../src/transport-manager')
const Libp2p = require('../../src')
@ -30,15 +29,13 @@ const { MULTIADDRS_WEBSOCKETS } = require('../fixtures/browser')
const mockUpgrader = require('../utils/mockUpgrader')
const createMockConnection = require('../utils/mockConnection')
const { createPeerId } = require('../utils/creators/peer')
const unsupportedAddr = multiaddr('/ip4/127.0.0.1/tcp/9999/ws/p2p/QmckxVrJw1Yo8LqvmDJNUmdAsKtSbiKWmrXJFyKmUraBoN')
const unsupportedAddr = multiaddr('/ip4/127.0.0.1/tcp/9999/ws')
const remoteAddr = MULTIADDRS_WEBSOCKETS[0]
describe('Dialing (direct, WebSockets)', () => {
let localTM
let peerStore
before(() => {
peerStore = new PeerStore()
localTM = new TransportManager({
libp2p: {},
upgrader: mockUpgrader,
@ -52,13 +49,13 @@ describe('Dialing (direct, WebSockets)', () => {
})
it('should have appropriate defaults', () => {
const dialer = new Dialer({ transportManager: localTM, peerStore })
const dialer = new Dialer({ transportManager: localTM })
expect(dialer.concurrency).to.equal(Constants.MAX_PARALLEL_DIALS)
expect(dialer.timeout).to.equal(Constants.DIAL_TIMEOUT)
})
it('should limit the number of tokens it provides', () => {
const dialer = new Dialer({ transportManager: localTM, peerStore })
const dialer = new Dialer({ transportManager: localTM })
const maxPerPeer = Constants.MAX_PER_PEER_DIALS
expect(dialer.tokens).to.have.length(Constants.MAX_PARALLEL_DIALS)
const tokens = dialer.getTokens(maxPerPeer + 1)
@ -67,14 +64,14 @@ describe('Dialing (direct, WebSockets)', () => {
})
it('should not return tokens if non are left', () => {
const dialer = new Dialer({ transportManager: localTM, peerStore })
const dialer = new Dialer({ transportManager: localTM })
sinon.stub(dialer, 'tokens').value([])
const tokens = dialer.getTokens(1)
expect(tokens.length).to.equal(0)
})
it('should NOT be able to return a token twice', () => {
const dialer = new Dialer({ transportManager: localTM, peerStore })
const dialer = new Dialer({ transportManager: localTM })
const tokens = dialer.getTokens(1)
expect(tokens).to.have.length(1)
expect(dialer.tokens).to.have.length(Constants.MAX_PARALLEL_DIALS - 1)
@ -110,7 +107,7 @@ describe('Dialing (direct, WebSockets)', () => {
})
it('should fail to connect to an unsupported multiaddr', async () => {
const dialer = new Dialer({ transportManager: localTM, peerStore })
const dialer = new Dialer({ transportManager: localTM })
await expect(dialer.connectToPeer(unsupportedAddr))
.to.eventually.be.rejectedWith(AggregateError)

View File

@ -123,10 +123,11 @@ describe('Dialing (via relay, TCP)', () => {
})
it('dialer should stay connected to an already connected relay on hop failure', async () => {
const relayAddr = relayLibp2p.transportManager.getAddrs()[0]
const relayIdString = relayLibp2p.peerInfo.id.toB58String()
const relayAddr = relayLibp2p.transportManager.getAddrs()[0].encapsulate(`/p2p/${relayIdString}`)
const dialAddr = relayAddr
.encapsulate(`/p2p/${relayIdString}`)
.encapsulate(`/p2p-circuit/p2p/${dstLibp2p.peerInfo.id.toB58String()}`)
await srcLibp2p.dial(relayAddr)
@ -141,15 +142,16 @@ describe('Dialing (via relay, TCP)', () => {
})
it('destination peer should stay connected to an already connected relay on hop failure', async () => {
const relayAddr = relayLibp2p.transportManager.getAddrs()[0]
const relayIdString = relayLibp2p.peerInfo.id.toB58String()
const relayAddr = relayLibp2p.transportManager.getAddrs()[0].encapsulate(`/p2p/${relayIdString}`)
const dialAddr = relayAddr
.encapsulate(`/p2p/${relayIdString}`)
.encapsulate(`/p2p-circuit/p2p/${dstLibp2p.peerInfo.id.toB58String()}`)
// Connect the destination peer and the relay
const tcpAddrs = dstLibp2p.transportManager.getAddrs()
await dstLibp2p.transportManager.listen([multiaddr(`/p2p-circuit${relayAddr}`)])
await dstLibp2p.transportManager.listen([multiaddr(`/p2p-circuit${relayAddr}/p2p/${relayIdString}`)])
expect(dstLibp2p.transportManager.getAddrs()).to.have.deep.members([...tcpAddrs, dialAddr.decapsulate('p2p')])
// Tamper with the our multiaddrs for the circuit message

View File

@ -160,24 +160,6 @@ describe('peer-store', () => {
expect(removed).to.eql(true)
expect(peerStore.peers.size).to.equal(0)
})
it('should be able to remove a peer from store through its b58str id', async () => {
const [peerInfo] = await peerUtils.createPeerInfo()
const id = peerInfo.id
const ma1 = multiaddr('/ip4/127.0.0.1/tcp/4001')
const ma2 = multiaddr('/ip4/127.0.0.1/tcp/4002/ws')
peerInfo.multiaddrs.add(ma1)
peerInfo.multiaddrs.add(ma2)
const multiaddrs = peerStore.multiaddrsForPeer(peerInfo)
const expectedAddrs = [
ma1.encapsulate(`/p2p/${id.toB58String()}`),
ma2.encapsulate(`/p2p/${id.toB58String()}`)
]
expect(multiaddrs).to.eql(expectedAddrs)
})
})
describe('peer-store on discovery', () => {

View File

@ -26,6 +26,7 @@ const remoteListenAddr = multiaddr('/ip4/127.0.0.1/tcp/0')
describe('Pubsub subsystem is able to use different implementations', () => {
let peerInfo, remotePeerInfo
let libp2p, remoteLibp2p
let remAddr
beforeEach(async () => {
[peerInfo, remotePeerInfo] = await peerUtils.createPeerInfo({ number: 2 })
@ -72,8 +73,9 @@ describe('Pubsub subsystem is able to use different implementations', () => {
])
const libp2pId = libp2p.peerInfo.id.toB58String()
remAddr = remoteLibp2p.transportManager.getAddrs()[0]
const connection = await libp2p.dialProtocol(remotePeerInfo, multicodec)
const connection = await libp2p.dialProtocol(remAddr, multicodec)
expect(connection).to.exist()
libp2p.pubsub.subscribe(topic, (msg) => {

View File

@ -21,6 +21,7 @@ const remoteListenAddr = multiaddr('/ip4/127.0.0.1/tcp/0')
describe('Pubsub subsystem operates correctly', () => {
let peerInfo, remotePeerInfo
let libp2p, remoteLibp2p
let remAddr
beforeEach(async () => {
[peerInfo, remotePeerInfo] = await peerUtils.createPeerInfo({ number: 2 })
@ -43,6 +44,8 @@ describe('Pubsub subsystem operates correctly', () => {
libp2p.start(),
remoteLibp2p.start()
])
remAddr = remoteLibp2p.transportManager.getAddrs()[0]
})
afterEach(() => Promise.all([
@ -55,7 +58,7 @@ describe('Pubsub subsystem operates correctly', () => {
})
it('should get notified of connected peers on dial', async () => {
const connection = await libp2p.dialProtocol(remotePeerInfo, subsystemMulticodecs)
const connection = await libp2p.dialProtocol(remAddr, subsystemMulticodecs)
expect(connection).to.exist()
@ -71,7 +74,7 @@ describe('Pubsub subsystem operates correctly', () => {
const data = 'hey!'
const libp2pId = libp2p.peerInfo.id.toB58String()
await libp2p.dialProtocol(remotePeerInfo, subsystemMulticodecs)
await libp2p.dialProtocol(remAddr, subsystemMulticodecs)
let subscribedTopics = libp2p.pubsub.getTopics()
expect(subscribedTopics).to.not.include(topic)
@ -112,6 +115,8 @@ describe('Pubsub subsystem operates correctly', () => {
await libp2p.start()
await remoteLibp2p.start()
remAddr = remoteLibp2p.transportManager.getAddrs()[0]
})
afterEach(() => Promise.all([
@ -124,7 +129,7 @@ describe('Pubsub subsystem operates correctly', () => {
})
it('should get notified of connected peers after starting', async () => {
const connection = await libp2p.dial(remotePeerInfo)
const connection = await libp2p.dial(remAddr)
expect(connection).to.exist()
expect(libp2p.pubsub._pubsub.peers.size).to.be.eql(0)
@ -145,7 +150,7 @@ describe('Pubsub subsystem operates correctly', () => {
const topic = 'test-topic'
const data = 'hey!'
await libp2p.dial(remotePeerInfo)
await libp2p.dial(remAddr)
remoteLibp2p.pubsub.start()

View File

@ -29,7 +29,7 @@ describe('registrar on dial', () => {
}))
await remoteLibp2p.transportManager.listen([listenAddr])
remoteAddr = remoteLibp2p.transportManager.getAddrs()[0].encapsulate(`/p2p/${remotePeerInfo.id.toB58String()}`)
remoteAddr = remoteLibp2p.transportManager.getAddrs()[0]
})
after(async () => {