fix: upgrader should not need muxers (#517)

* fix: upgrader should not need muxers

* chore: address review

* chore: apply suggestions from code review

Co-Authored-By: Jacob Heun <jacobheun@gmail.com>
This commit is contained in:
Vasco Santos 2019-12-16 11:26:38 +00:00 committed by Jacob Heun
parent 48fd64182b
commit 5d7ee50e76
2 changed files with 91 additions and 46 deletions

View File

@ -64,7 +64,7 @@ class Upgrader {
async upgradeInbound (maConn) { async upgradeInbound (maConn) {
let encryptedConn let encryptedConn
let remotePeer let remotePeer
let muxedConnection let upgradedConn
let Muxer let Muxer
let cryptoProtocol let cryptoProtocol
let setPeer let setPeer
@ -94,7 +94,11 @@ class Upgrader {
} = await this._encryptInbound(this.localPeer, protectedConn, this.cryptos)) } = await this._encryptInbound(this.localPeer, protectedConn, this.cryptos))
// Multiplex the connection // Multiplex the connection
;({ stream: muxedConnection, Muxer } = await this._multiplexInbound(encryptedConn, this.muxers)) if (this.muxers.size) {
({ stream: upgradedConn, Muxer } = await this._multiplexInbound(encryptedConn, this.muxers))
} else {
upgradedConn = encryptedConn
}
} catch (err) { } catch (err) {
log.error('Failed to upgrade inbound connection', err) log.error('Failed to upgrade inbound connection', err)
await maConn.close(err) await maConn.close(err)
@ -112,7 +116,7 @@ class Upgrader {
cryptoProtocol, cryptoProtocol,
direction: 'inbound', direction: 'inbound',
maConn, maConn,
muxedConnection, upgradedConn,
Muxer, Muxer,
remotePeer remotePeer
}) })
@ -134,7 +138,7 @@ class Upgrader {
let encryptedConn let encryptedConn
let remotePeer let remotePeer
let muxedConnection let upgradedConn
let cryptoProtocol let cryptoProtocol
let Muxer let Muxer
let setPeer let setPeer
@ -164,7 +168,11 @@ class Upgrader {
} = await this._encryptOutbound(this.localPeer, protectedConn, remotePeerId, this.cryptos)) } = await this._encryptOutbound(this.localPeer, protectedConn, remotePeerId, this.cryptos))
// Multiplex the connection // Multiplex the connection
;({ stream: muxedConnection, Muxer } = await this._multiplexOutbound(encryptedConn, this.muxers)) if (this.muxers.size) {
({ stream: upgradedConn, Muxer } = await this._multiplexOutbound(encryptedConn, this.muxers))
} else {
upgradedConn = encryptedConn
}
} catch (err) { } catch (err) {
log.error('Failed to upgrade outbound connection', err) log.error('Failed to upgrade outbound connection', err)
await maConn.close(err) await maConn.close(err)
@ -182,7 +190,7 @@ class Upgrader {
cryptoProtocol, cryptoProtocol,
direction: 'outbound', direction: 'outbound',
maConn, maConn,
muxedConnection, upgradedConn,
Muxer, Muxer,
remotePeer remotePeer
}) })
@ -195,7 +203,7 @@ class Upgrader {
* @param {string} cryptoProtocol The crypto protocol that was negotiated * @param {string} cryptoProtocol The crypto protocol that was negotiated
* @param {string} direction One of ['inbound', 'outbound'] * @param {string} direction One of ['inbound', 'outbound']
* @param {MultiaddrConnection} maConn The transport layer connection * @param {MultiaddrConnection} maConn The transport layer connection
* @param {*} muxedConnection A duplex connection returned from multiplexer selection * @param {*} upgradedConn A duplex connection returned from multiplexer and/or crypto selection
* @param {Muxer} Muxer The muxer to be used for muxing * @param {Muxer} Muxer The muxer to be used for muxing
* @param {PeerId} remotePeer The peer the connection is with * @param {PeerId} remotePeer The peer the connection is with
* @returns {Connection} * @returns {Connection}
@ -204,49 +212,52 @@ class Upgrader {
cryptoProtocol, cryptoProtocol,
direction, direction,
maConn, maConn,
muxedConnection, upgradedConn,
Muxer, Muxer,
remotePeer remotePeer
}) { }) {
// Create the muxer let muxer, newStream
const muxer = new Muxer({
// Run anytime a remote stream is created
onStream: async muxedStream => {
const mss = new Multistream.Listener(muxedStream)
try {
const { stream, protocol } = await mss.handle(Array.from(this.protocols.keys()))
log('%s: incoming stream opened on %s', direction, protocol)
if (this.metrics) this.metrics.trackStream({ stream, remotePeer, protocol })
connection.addStream(stream, protocol)
this._onStream({ connection, stream, protocol })
} catch (err) {
log.error(err)
}
},
// Run anytime a stream closes
onStreamEnd: muxedStream => {
connection.removeStream(muxedStream.id)
}
})
const newStream = async protocols => { if (Muxer) {
log('%s: starting new stream on %s', direction, protocols) // Create the muxer
const muxedStream = muxer.newStream() muxer = new Muxer({
const mss = new Multistream.Dialer(muxedStream) // Run anytime a remote stream is created
try { onStream: async muxedStream => {
const { stream, protocol } = await mss.select(protocols) const mss = new Multistream.Listener(muxedStream)
if (this.metrics) this.metrics.trackStream({ stream, remotePeer, protocol }) try {
return { stream: { ...muxedStream, ...stream }, protocol } const { stream, protocol } = await mss.handle(Array.from(this.protocols.keys()))
} catch (err) { log('%s: incoming stream opened on %s', direction, protocol)
log.error('could not create new stream', err) if (this.metrics) this.metrics.trackStream({ stream, remotePeer, protocol })
throw errCode(err, codes.ERR_UNSUPPORTED_PROTOCOL) connection.addStream(stream, protocol)
this._onStream({ connection, stream, protocol })
} catch (err) {
log.error(err)
}
},
// Run anytime a stream closes
onStreamEnd: muxedStream => {
connection.removeStream(muxedStream.id)
}
})
newStream = async protocols => {
log('%s: starting new stream on %s', direction, protocols)
const muxedStream = muxer.newStream()
const mss = new Multistream.Dialer(muxedStream)
try {
const { stream, protocol } = await mss.select(protocols)
if (this.metrics) this.metrics.trackStream({ stream, remotePeer, protocol })
return { stream: { ...muxedStream, ...stream }, protocol }
} catch (err) {
log.error('could not create new stream', err)
throw errCode(err, codes.ERR_UNSUPPORTED_PROTOCOL)
}
} }
// Pipe all data through the muxer
pipe(upgradedConn, muxer, upgradedConn)
} }
// Pipe all data through the muxer
pipe(muxedConnection, muxer, muxedConnection)
maConn.timeline.upgraded = Date.now()
const _timeline = maConn.timeline const _timeline = maConn.timeline
maConn.timeline = new Proxy(_timeline, { maConn.timeline = new Proxy(_timeline, {
set: (...args) => { set: (...args) => {
@ -258,6 +269,11 @@ class Upgrader {
return Reflect.set(...args) return Reflect.set(...args)
} }
}) })
maConn.timeline.upgraded = Date.now()
const errConnectionNotMultiplexed = () => {
throw errCode(new Error('connection is not multiplexed'), 'ERR_CONNECTION_NOT_MULTIPLEXED')
}
// Create the connection // Create the connection
const connection = new Connection({ const connection = new Connection({
@ -268,11 +284,11 @@ class Upgrader {
stat: { stat: {
direction, direction,
timeline: maConn.timeline, timeline: maConn.timeline,
multiplexer: Muxer.multicodec, multiplexer: Muxer && Muxer.multicodec,
encryption: cryptoProtocol encryption: cryptoProtocol
}, },
newStream, newStream: newStream || errConnectionNotMultiplexed,
getStreams: () => muxer.streams, getStreams: () => muxer ? muxer.streams : errConnectionNotMultiplexed,
close: err => maConn.close(err) close: err => maConn.close(err)
}) })

View File

@ -116,6 +116,35 @@ describe('Upgrader', () => {
expect(result).to.eql([hello]) expect(result).to.eql([hello])
}) })
it('should upgrade with only crypto', async () => {
const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer })
// No available muxers
const muxers = new Map()
sinon.stub(localUpgrader, 'muxers').value(muxers)
sinon.stub(remoteUpgrader, 'muxers').value(muxers)
const cryptos = new Map([[Crypto.protocol, Crypto]])
sinon.stub(localUpgrader, 'cryptos').value(cryptos)
sinon.stub(remoteUpgrader, 'cryptos').value(cryptos)
const connections = await Promise.all([
localUpgrader.upgradeOutbound(outbound),
remoteUpgrader.upgradeInbound(inbound)
])
expect(connections).to.have.length(2)
await expect(connections[0].newStream('/echo/1.0.0')).to.be.rejected()
// Verify the MultiaddrConnection close method is called
sinon.spy(inbound, 'close')
sinon.spy(outbound, 'close')
await Promise.all(connections.map(conn => conn.close()))
expect(inbound.close.callCount).to.equal(1)
expect(outbound.close.callCount).to.equal(1)
})
it('should use a private connection protector when provided', async () => { it('should use a private connection protector when provided', async () => {
const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer }) const { inbound, outbound } = mockMultiaddrConnPair({ addrs, remotePeer })