fix: reject connections when not running (#1146)

When the node is shutting down, new connections can still be received.

If this happens we can end up writing into the datastore when it's
been closed which throws an error.

Instead, if we're not running, have the connection manager close new
incoming connections.
This commit is contained in:
Alex Potsides
2022-01-26 10:52:23 +00:00
committed by GitHub
parent fc12973344
commit 902f10d58d
7 changed files with 61 additions and 7 deletions

View File

@ -223,6 +223,12 @@ class ConnectionManager extends EventEmitter {
* @param {Connection} connection * @param {Connection} connection
*/ */
async onConnect (connection) { async onConnect (connection) {
if (!this._started) {
// This can happen when we are in the process of shutting down the node
await connection.close()
return
}
const peerId = connection.remotePeer const peerId = connection.remotePeer
const peerIdStr = peerId.toB58String() const peerIdStr = peerId.toB58String()
const storedConn = this.connections.get(peerIdStr) const storedConn = this.connections.get(peerIdStr)
@ -251,6 +257,11 @@ class ConnectionManager extends EventEmitter {
* @returns {void} * @returns {void}
*/ */
onDisconnect (connection) { onDisconnect (connection) {
if (!this._started) {
// This can happen when we are in the process of shutting down the node
return
}
const peerId = connection.remotePeer.toB58String() const peerId = connection.remotePeer.toB58String()
let storedConn = this.connections.get(peerId) let storedConn = this.connections.get(peerId)

View File

@ -288,7 +288,7 @@ class PeerStoreAddressBook {
updatedPeer = await this._store.mergeOrCreate(peerId, { addresses }) updatedPeer = await this._store.mergeOrCreate(peerId, { addresses })
log(`added multiaddrs for ${peerId}`) log(`added multiaddrs for ${peerId.toB58String()}`)
} finally { } finally {
log('set release write lock') log('set release write lock')
release() release()

View File

@ -307,6 +307,8 @@ describe('Dialing (direct, TCP)', () => {
} }
}) })
await libp2p.start()
sinon.spy(libp2p.dialer, 'connectToPeer') sinon.spy(libp2p.dialer, 'connectToPeer')
try { try {
@ -329,6 +331,8 @@ describe('Dialing (direct, TCP)', () => {
} }
}) })
await libp2p.start()
sinon.spy(libp2p.dialer, 'connectToPeer') sinon.spy(libp2p.dialer, 'connectToPeer')
const connection = await libp2p.dial(remoteAddr) const connection = await libp2p.dial(remoteAddr)
@ -337,7 +341,7 @@ describe('Dialing (direct, TCP)', () => {
expect(stream).to.exist() expect(stream).to.exist()
expect(protocol).to.equal('/echo/1.0.0') expect(protocol).to.equal('/echo/1.0.0')
await connection.close() await connection.close()
expect(libp2p.dialer.connectToPeer.callCount).to.equal(1) expect(libp2p.dialer.connectToPeer.callCount).to.be.greaterThan(0)
}) })
it('should use the dialer for connecting to a peer', async () => { it('should use the dialer for connecting to a peer', async () => {
@ -350,6 +354,8 @@ describe('Dialing (direct, TCP)', () => {
} }
}) })
await libp2p.start()
sinon.spy(libp2p.dialer, 'connectToPeer') sinon.spy(libp2p.dialer, 'connectToPeer')
await libp2p.peerStore.addressBook.set(remotePeerId, remoteLibp2p.multiaddrs) await libp2p.peerStore.addressBook.set(remotePeerId, remoteLibp2p.multiaddrs)
@ -359,7 +365,7 @@ describe('Dialing (direct, TCP)', () => {
expect(stream).to.exist() expect(stream).to.exist()
expect(protocol).to.equal('/echo/1.0.0') expect(protocol).to.equal('/echo/1.0.0')
await connection.close() await connection.close()
expect(libp2p.dialer.connectToPeer.callCount).to.equal(1) expect(libp2p.dialer.connectToPeer.callCount).to.be.greaterThan(0)
}) })
it('should close all streams when the connection closes', async () => { it('should close all streams when the connection closes', async () => {
@ -372,6 +378,8 @@ describe('Dialing (direct, TCP)', () => {
} }
}) })
await libp2p.start()
// register some stream handlers to simulate several protocols // register some stream handlers to simulate several protocols
await libp2p.handle('/stream-count/1', ({ stream }) => pipe(stream, stream)) await libp2p.handle('/stream-count/1', ({ stream }) => pipe(stream, stream))
await libp2p.handle('/stream-count/2', ({ stream }) => pipe(stream, stream)) await libp2p.handle('/stream-count/2', ({ stream }) => pipe(stream, stream))
@ -397,8 +405,8 @@ describe('Dialing (direct, TCP)', () => {
// Verify stream count // Verify stream count
const remoteConn = remoteLibp2p.connectionManager.get(libp2p.peerId) const remoteConn = remoteLibp2p.connectionManager.get(libp2p.peerId)
expect(connection.streams).to.have.length(6) expect(connection.streams).to.have.length(5)
expect(remoteConn.streams).to.have.length(6) expect(remoteConn.streams).to.have.length(5)
// Close the connection and verify all streams have been closed // Close the connection and verify all streams have been closed
await connection.close() await connection.close()
@ -416,6 +424,8 @@ describe('Dialing (direct, TCP)', () => {
} }
}) })
await libp2p.start()
await expect(libp2p.dialProtocol(remotePeerId)) await expect(libp2p.dialProtocol(remotePeerId))
.to.eventually.be.rejectedWith(Error) .to.eventually.be.rejectedWith(Error)
.and.to.have.property('code', ErrorCodes.ERR_INVALID_PROTOCOLS_FOR_STREAM) .and.to.have.property('code', ErrorCodes.ERR_INVALID_PROTOCOLS_FOR_STREAM)
@ -435,6 +445,8 @@ describe('Dialing (direct, TCP)', () => {
} }
}) })
await libp2p.start()
const connection = await libp2p.dial(remoteAddr) const connection = await libp2p.dial(remoteAddr)
expect(connection).to.exist() expect(connection).to.exist()
expect(connection.stat.timeline.close).to.not.exist() expect(connection.stat.timeline.close).to.not.exist()
@ -452,6 +464,8 @@ describe('Dialing (direct, TCP)', () => {
} }
}) })
await libp2p.start()
const connection = await libp2p.dial(`${remoteAddr.toString()}`) const connection = await libp2p.dial(`${remoteAddr.toString()}`)
expect(connection).to.exist() expect(connection).to.exist()
expect(connection.stat.timeline.close).to.not.exist() expect(connection.stat.timeline.close).to.not.exist()
@ -474,6 +488,8 @@ describe('Dialing (direct, TCP)', () => {
sinon.spy(libp2p.upgrader.protector, 'protect') sinon.spy(libp2p.upgrader.protector, 'protect')
sinon.stub(remoteLibp2p.upgrader, 'protector').value(new Protector(swarmKeyBuffer)) sinon.stub(remoteLibp2p.upgrader, 'protector').value(new Protector(swarmKeyBuffer))
await libp2p.start()
const connection = await libp2p.dialer.connectToPeer(remoteAddr) const connection = await libp2p.dialer.connectToPeer(remoteAddr)
expect(connection).to.exist() expect(connection).to.exist()
const { stream, protocol } = await connection.newStream('/echo/1.0.0') const { stream, protocol } = await connection.newStream('/echo/1.0.0')
@ -492,8 +508,10 @@ describe('Dialing (direct, TCP)', () => {
connEncryption: [Crypto] connEncryption: [Crypto]
} }
}) })
const dials = 10
await libp2p.start()
const dials = 10
const fullAddress = remoteAddr.encapsulate(`/p2p/${remoteLibp2p.peerId.toB58String()}`) const fullAddress = remoteAddr.encapsulate(`/p2p/${remoteLibp2p.peerId.toB58String()}`)
await libp2p.peerStore.addressBook.set(remotePeerId, remoteLibp2p.multiaddrs) await libp2p.peerStore.addressBook.set(remotePeerId, remoteLibp2p.multiaddrs)
@ -522,6 +540,9 @@ describe('Dialing (direct, TCP)', () => {
connEncryption: [Crypto] connEncryption: [Crypto]
} }
}) })
await libp2p.start()
const dials = 10 const dials = 10
const error = new Error('Boom') const error = new Error('Boom')
sinon.stub(libp2p.transportManager, 'dial').callsFake(() => Promise.reject(error)) sinon.stub(libp2p.transportManager, 'dial').callsFake(() => Promise.reject(error))

View File

@ -459,14 +459,18 @@ describe('Dialing (direct, WebSockets)', () => {
sinon.spy(libp2p.dialer, 'connectToPeer') sinon.spy(libp2p.dialer, 'connectToPeer')
sinon.spy(libp2p.peerStore.addressBook, 'add') sinon.spy(libp2p.peerStore.addressBook, 'add')
await libp2p.start()
const connection = await libp2p.dial(remoteAddr) const connection = await libp2p.dial(remoteAddr)
expect(connection).to.exist() expect(connection).to.exist()
const { stream, protocol } = await connection.newStream('/echo/1.0.0') const { stream, protocol } = await connection.newStream('/echo/1.0.0')
expect(stream).to.exist() expect(stream).to.exist()
expect(protocol).to.equal('/echo/1.0.0') expect(protocol).to.equal('/echo/1.0.0')
await connection.close() await connection.close()
expect(libp2p.dialer.connectToPeer.callCount).to.equal(1) expect(libp2p.dialer.connectToPeer.callCount).to.be.at.least(1)
expect(libp2p.peerStore.addressBook.add.callCount).to.be.at.least(1) expect(libp2p.peerStore.addressBook.add.callCount).to.be.at.least(1)
await libp2p.stop()
}) })
it('should run identify automatically after connecting', async () => { it('should run identify automatically after connecting', async () => {
@ -489,6 +493,8 @@ describe('Dialing (direct, WebSockets)', () => {
sinon.spy(libp2p.identifyService, 'identify') sinon.spy(libp2p.identifyService, 'identify')
sinon.spy(libp2p.upgrader, 'onConnection') sinon.spy(libp2p.upgrader, 'onConnection')
await libp2p.start()
const connection = await libp2p.dial(remoteAddr) const connection = await libp2p.dial(remoteAddr)
expect(connection).to.exist() expect(connection).to.exist()
@ -501,6 +507,8 @@ describe('Dialing (direct, WebSockets)', () => {
await libp2p.identifyService.identify.firstCall.returnValue await libp2p.identifyService.identify.firstCall.returnValue
expect(libp2p.peerStore.protoBook.set.callCount).to.equal(1) expect(libp2p.peerStore.protoBook.set.callCount).to.equal(1)
await libp2p.stop()
}) })
it('should be able to use hangup to close connections', async () => { it('should be able to use hangup to close connections', async () => {
@ -520,11 +528,15 @@ describe('Dialing (direct, WebSockets)', () => {
} }
}) })
await libp2p.start()
const connection = await libp2p.dial(remoteAddr) const connection = await libp2p.dial(remoteAddr)
expect(connection).to.exist() expect(connection).to.exist()
expect(connection.stat.timeline.close).to.not.exist() expect(connection.stat.timeline.close).to.not.exist()
await libp2p.hangUp(connection.remotePeer) await libp2p.hangUp(connection.remotePeer)
expect(connection.stat.timeline.close).to.exist() expect(connection.stat.timeline.close).to.exist()
await libp2p.stop()
}) })
it('should be able to use hangup when no connection exists', async () => { it('should be able to use hangup when no connection exists', async () => {

View File

@ -51,6 +51,9 @@ describe('Dialing (resolvable addresses)', () => {
started: true, started: true,
populateAddressBooks: false populateAddressBooks: false
}) })
await libp2p.start()
await remoteLibp2p.start()
}) })
afterEach(async () => { afterEach(async () => {

View File

@ -123,6 +123,8 @@ describe('registrar', () => {
} }
}) })
await libp2p.start()
// Register protocol // Register protocol
const identifier = await libp2p.registrar.register(topologyProps) const identifier = await libp2p.registrar.register(topologyProps)
const topology = libp2p.registrar.topologies.get(identifier) const topology = libp2p.registrar.topologies.get(identifier)
@ -164,6 +166,8 @@ describe('registrar', () => {
} }
}) })
await libp2p.start()
// Register protocol // Register protocol
const identifier = await libp2p.registrar.register(topologyProps) const identifier = await libp2p.registrar.register(topologyProps)
const topology = libp2p.registrar.topologies.get(identifier) const topology = libp2p.registrar.topologies.get(identifier)

View File

@ -403,6 +403,7 @@ describe('libp2p.upgrader', () => {
connEncryption: [Crypto] connEncryption: [Crypto]
} }
}) })
await libp2p.start()
const echoHandler = () => {} const echoHandler = () => {}
libp2p.handle(['/echo/1.0.0'], echoHandler) libp2p.handle(['/echo/1.0.0'], echoHandler)
@ -448,6 +449,8 @@ describe('libp2p.upgrader', () => {
connectionGater connectionGater
}) })
await libp2p.start()
const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer }) const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer })
// Spy on emit for easy verification // Spy on emit for easy verification