From 902f10d58d1062e812eb27aa0e2256e3fde5d3f6 Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Wed, 26 Jan 2022 10:52:23 +0000 Subject: [PATCH] fix: reject connections when not running (#1146) When the node is shutting down, new connections can still be received. If this happens we can end up writing into the datastore when it's been closed which throws an error. Instead, if we're not running, have the connection manager close new incoming connections. --- src/connection-manager/index.js | 11 +++++++++++ src/peer-store/address-book.js | 2 +- test/dialing/direct.node.js | 31 ++++++++++++++++++++++++++----- test/dialing/direct.spec.js | 14 +++++++++++++- test/dialing/resolver.spec.js | 3 +++ test/registrar/registrar.spec.js | 4 ++++ test/upgrading/upgrader.spec.js | 3 +++ 7 files changed, 61 insertions(+), 7 deletions(-) diff --git a/src/connection-manager/index.js b/src/connection-manager/index.js index fa2cb402..eeae1559 100644 --- a/src/connection-manager/index.js +++ b/src/connection-manager/index.js @@ -223,6 +223,12 @@ class ConnectionManager extends EventEmitter { * @param {Connection} connection */ async onConnect (connection) { + if (!this._started) { + // This can happen when we are in the process of shutting down the node + await connection.close() + return + } + const peerId = connection.remotePeer const peerIdStr = peerId.toB58String() const storedConn = this.connections.get(peerIdStr) @@ -251,6 +257,11 @@ class ConnectionManager extends EventEmitter { * @returns {void} */ onDisconnect (connection) { + if (!this._started) { + // This can happen when we are in the process of shutting down the node + return + } + const peerId = connection.remotePeer.toB58String() let storedConn = this.connections.get(peerId) diff --git a/src/peer-store/address-book.js b/src/peer-store/address-book.js index b54ce981..391f9608 100644 --- a/src/peer-store/address-book.js +++ b/src/peer-store/address-book.js @@ -288,7 +288,7 @@ class PeerStoreAddressBook { updatedPeer = await this._store.mergeOrCreate(peerId, { addresses }) - log(`added multiaddrs for ${peerId}`) + log(`added multiaddrs for ${peerId.toB58String()}`) } finally { log('set release write lock') release() diff --git a/test/dialing/direct.node.js b/test/dialing/direct.node.js index 6842811a..81d8205d 100644 --- a/test/dialing/direct.node.js +++ b/test/dialing/direct.node.js @@ -307,6 +307,8 @@ describe('Dialing (direct, TCP)', () => { } }) + await libp2p.start() + sinon.spy(libp2p.dialer, 'connectToPeer') try { @@ -329,6 +331,8 @@ describe('Dialing (direct, TCP)', () => { } }) + await libp2p.start() + sinon.spy(libp2p.dialer, 'connectToPeer') const connection = await libp2p.dial(remoteAddr) @@ -337,7 +341,7 @@ describe('Dialing (direct, TCP)', () => { expect(stream).to.exist() expect(protocol).to.equal('/echo/1.0.0') await connection.close() - expect(libp2p.dialer.connectToPeer.callCount).to.equal(1) + expect(libp2p.dialer.connectToPeer.callCount).to.be.greaterThan(0) }) it('should use the dialer for connecting to a peer', async () => { @@ -350,6 +354,8 @@ describe('Dialing (direct, TCP)', () => { } }) + await libp2p.start() + sinon.spy(libp2p.dialer, 'connectToPeer') await libp2p.peerStore.addressBook.set(remotePeerId, remoteLibp2p.multiaddrs) @@ -359,7 +365,7 @@ describe('Dialing (direct, TCP)', () => { expect(stream).to.exist() expect(protocol).to.equal('/echo/1.0.0') await connection.close() - expect(libp2p.dialer.connectToPeer.callCount).to.equal(1) + expect(libp2p.dialer.connectToPeer.callCount).to.be.greaterThan(0) }) it('should close all streams when the connection closes', async () => { @@ -372,6 +378,8 @@ describe('Dialing (direct, TCP)', () => { } }) + await libp2p.start() + // register some stream handlers to simulate several protocols await libp2p.handle('/stream-count/1', ({ stream }) => pipe(stream, stream)) await libp2p.handle('/stream-count/2', ({ stream }) => pipe(stream, stream)) @@ -397,8 +405,8 @@ describe('Dialing (direct, TCP)', () => { // Verify stream count const remoteConn = remoteLibp2p.connectionManager.get(libp2p.peerId) - expect(connection.streams).to.have.length(6) - expect(remoteConn.streams).to.have.length(6) + expect(connection.streams).to.have.length(5) + expect(remoteConn.streams).to.have.length(5) // Close the connection and verify all streams have been closed await connection.close() @@ -416,6 +424,8 @@ describe('Dialing (direct, TCP)', () => { } }) + await libp2p.start() + await expect(libp2p.dialProtocol(remotePeerId)) .to.eventually.be.rejectedWith(Error) .and.to.have.property('code', ErrorCodes.ERR_INVALID_PROTOCOLS_FOR_STREAM) @@ -435,6 +445,8 @@ describe('Dialing (direct, TCP)', () => { } }) + await libp2p.start() + const connection = await libp2p.dial(remoteAddr) expect(connection).to.exist() expect(connection.stat.timeline.close).to.not.exist() @@ -452,6 +464,8 @@ describe('Dialing (direct, TCP)', () => { } }) + await libp2p.start() + const connection = await libp2p.dial(`${remoteAddr.toString()}`) expect(connection).to.exist() expect(connection.stat.timeline.close).to.not.exist() @@ -474,6 +488,8 @@ describe('Dialing (direct, TCP)', () => { sinon.spy(libp2p.upgrader.protector, 'protect') sinon.stub(remoteLibp2p.upgrader, 'protector').value(new Protector(swarmKeyBuffer)) + await libp2p.start() + const connection = await libp2p.dialer.connectToPeer(remoteAddr) expect(connection).to.exist() const { stream, protocol } = await connection.newStream('/echo/1.0.0') @@ -492,8 +508,10 @@ describe('Dialing (direct, TCP)', () => { connEncryption: [Crypto] } }) - const dials = 10 + await libp2p.start() + + const dials = 10 const fullAddress = remoteAddr.encapsulate(`/p2p/${remoteLibp2p.peerId.toB58String()}`) await libp2p.peerStore.addressBook.set(remotePeerId, remoteLibp2p.multiaddrs) @@ -522,6 +540,9 @@ describe('Dialing (direct, TCP)', () => { connEncryption: [Crypto] } }) + + await libp2p.start() + const dials = 10 const error = new Error('Boom') sinon.stub(libp2p.transportManager, 'dial').callsFake(() => Promise.reject(error)) diff --git a/test/dialing/direct.spec.js b/test/dialing/direct.spec.js index 40193a90..aa0e45bb 100644 --- a/test/dialing/direct.spec.js +++ b/test/dialing/direct.spec.js @@ -459,14 +459,18 @@ describe('Dialing (direct, WebSockets)', () => { sinon.spy(libp2p.dialer, 'connectToPeer') sinon.spy(libp2p.peerStore.addressBook, 'add') + await libp2p.start() + const connection = await libp2p.dial(remoteAddr) expect(connection).to.exist() const { stream, protocol } = await connection.newStream('/echo/1.0.0') expect(stream).to.exist() expect(protocol).to.equal('/echo/1.0.0') await connection.close() - expect(libp2p.dialer.connectToPeer.callCount).to.equal(1) + expect(libp2p.dialer.connectToPeer.callCount).to.be.at.least(1) expect(libp2p.peerStore.addressBook.add.callCount).to.be.at.least(1) + + await libp2p.stop() }) it('should run identify automatically after connecting', async () => { @@ -489,6 +493,8 @@ describe('Dialing (direct, WebSockets)', () => { sinon.spy(libp2p.identifyService, 'identify') sinon.spy(libp2p.upgrader, 'onConnection') + await libp2p.start() + const connection = await libp2p.dial(remoteAddr) expect(connection).to.exist() @@ -501,6 +507,8 @@ describe('Dialing (direct, WebSockets)', () => { await libp2p.identifyService.identify.firstCall.returnValue expect(libp2p.peerStore.protoBook.set.callCount).to.equal(1) + + await libp2p.stop() }) it('should be able to use hangup to close connections', async () => { @@ -520,11 +528,15 @@ describe('Dialing (direct, WebSockets)', () => { } }) + await libp2p.start() + const connection = await libp2p.dial(remoteAddr) expect(connection).to.exist() expect(connection.stat.timeline.close).to.not.exist() await libp2p.hangUp(connection.remotePeer) expect(connection.stat.timeline.close).to.exist() + + await libp2p.stop() }) it('should be able to use hangup when no connection exists', async () => { diff --git a/test/dialing/resolver.spec.js b/test/dialing/resolver.spec.js index f5088ae9..ab36881f 100644 --- a/test/dialing/resolver.spec.js +++ b/test/dialing/resolver.spec.js @@ -51,6 +51,9 @@ describe('Dialing (resolvable addresses)', () => { started: true, populateAddressBooks: false }) + + await libp2p.start() + await remoteLibp2p.start() }) afterEach(async () => { diff --git a/test/registrar/registrar.spec.js b/test/registrar/registrar.spec.js index 83f87fd6..fa9be0c8 100644 --- a/test/registrar/registrar.spec.js +++ b/test/registrar/registrar.spec.js @@ -123,6 +123,8 @@ describe('registrar', () => { } }) + await libp2p.start() + // Register protocol const identifier = await libp2p.registrar.register(topologyProps) const topology = libp2p.registrar.topologies.get(identifier) @@ -164,6 +166,8 @@ describe('registrar', () => { } }) + await libp2p.start() + // Register protocol const identifier = await libp2p.registrar.register(topologyProps) const topology = libp2p.registrar.topologies.get(identifier) diff --git a/test/upgrading/upgrader.spec.js b/test/upgrading/upgrader.spec.js index 6726b747..5caf63f2 100644 --- a/test/upgrading/upgrader.spec.js +++ b/test/upgrading/upgrader.spec.js @@ -403,6 +403,7 @@ describe('libp2p.upgrader', () => { connEncryption: [Crypto] } }) + await libp2p.start() const echoHandler = () => {} libp2p.handle(['/echo/1.0.0'], echoHandler) @@ -448,6 +449,8 @@ describe('libp2p.upgrader', () => { connectionGater }) + await libp2p.start() + const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer }) // Spy on emit for easy verification