mirror of
https://github.com/fluencelabs/js-libp2p-utils
synced 2025-06-26 23:11:45 +00:00
feat: stream to multiaddr connection converter (#2)
This commit is contained in:
11
package.json
11
package.json
@ -29,14 +29,19 @@
|
|||||||
},
|
},
|
||||||
"homepage": "https://github.com/libp2p/js-libp2p-utils#readme",
|
"homepage": "https://github.com/libp2p/js-libp2p-utils#readme",
|
||||||
"devDependencies": {
|
"devDependencies": {
|
||||||
"aegir": "^20.3.1",
|
"aegir": "^20.6.0",
|
||||||
"chai": "^4.2.0",
|
"chai": "^4.2.0",
|
||||||
"dirty-chai": "^2.0.1"
|
"dirty-chai": "^2.0.1",
|
||||||
|
"it-pair": "^1.0.0",
|
||||||
|
"it-pipe": "^1.1.0",
|
||||||
|
"streaming-iterables": "^4.1.2"
|
||||||
},
|
},
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
|
"abortable-iterator": "^3.0.0",
|
||||||
|
"debug": "^4.1.1",
|
||||||
"err-code": "^2.0.0",
|
"err-code": "^2.0.0",
|
||||||
"ip-address": "^6.1.0",
|
"ip-address": "^6.1.0",
|
||||||
"multiaddr": "^7.1.0"
|
"multiaddr": "^7.3.0"
|
||||||
},
|
},
|
||||||
"contributors": [
|
"contributors": [
|
||||||
"Vasco Santos <vasco.santos@moxy.studio>"
|
"Vasco Santos <vasco.santos@moxy.studio>"
|
||||||
|
49
src/stream-to-ma-conn.js
Normal file
49
src/stream-to-ma-conn.js
Normal file
@ -0,0 +1,49 @@
|
|||||||
|
'use strict'
|
||||||
|
|
||||||
|
const abortable = require('abortable-iterator')
|
||||||
|
const log = require('debug')('libp2p:stream:converter')
|
||||||
|
|
||||||
|
// Convert a duplex iterable into a MultiaddrConnection
|
||||||
|
// https://github.com/libp2p/interface-transport#multiaddrconnection
|
||||||
|
module.exports = ({ stream, remoteAddr, localAddr }, options = {}) => {
|
||||||
|
const { sink, source } = stream
|
||||||
|
const maConn = {
|
||||||
|
async sink (source) {
|
||||||
|
if (options.signal) {
|
||||||
|
source = abortable(source, options.signal)
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
await sink(source)
|
||||||
|
} catch (err) {
|
||||||
|
// If aborted we can safely ignore
|
||||||
|
if (err.type !== 'aborted') {
|
||||||
|
// If the source errored the socket will already have been destroyed by
|
||||||
|
// toIterable.duplex(). If the socket errored it will already be
|
||||||
|
// destroyed. There's nothing to do here except log the error & return.
|
||||||
|
log(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
close()
|
||||||
|
},
|
||||||
|
|
||||||
|
source: options.signal ? abortable(source, options.signal) : source,
|
||||||
|
conn: stream,
|
||||||
|
localAddr,
|
||||||
|
remoteAddr,
|
||||||
|
timeline: { open: Date.now() },
|
||||||
|
|
||||||
|
close () {
|
||||||
|
sink([])
|
||||||
|
close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function close () {
|
||||||
|
if (!maConn.timeline.close) {
|
||||||
|
maConn.timeline.close = Date.now()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return maConn
|
||||||
|
}
|
56
test/stream-to-ma-conn.spec.js
Normal file
56
test/stream-to-ma-conn.spec.js
Normal file
@ -0,0 +1,56 @@
|
|||||||
|
/* eslint-env mocha */
|
||||||
|
'use strict'
|
||||||
|
|
||||||
|
const chai = require('chai')
|
||||||
|
const dirtyChai = require('dirty-chai')
|
||||||
|
const expect = chai.expect
|
||||||
|
chai.use(dirtyChai)
|
||||||
|
|
||||||
|
const pair = require('it-pair')
|
||||||
|
const pipe = require('it-pipe')
|
||||||
|
const { collect } = require('streaming-iterables')
|
||||||
|
const multiaddr = require('multiaddr')
|
||||||
|
|
||||||
|
const streamToMaConn = require('../src/stream-to-ma-conn')
|
||||||
|
|
||||||
|
describe('Convert stream into a multiaddr connection', () => {
|
||||||
|
it('converts a stream and adds the provided metadata', () => {
|
||||||
|
const stream = pair()
|
||||||
|
const localAddr = multiaddr('/ip4/101.45.75.219/tcp/6000')
|
||||||
|
const remoteAddr = multiaddr('/ip4/100.46.74.201/tcp/6002')
|
||||||
|
|
||||||
|
const maConn = streamToMaConn({
|
||||||
|
stream,
|
||||||
|
localAddr,
|
||||||
|
remoteAddr
|
||||||
|
})
|
||||||
|
|
||||||
|
expect(maConn).to.exist()
|
||||||
|
expect(maConn.sink).to.exist()
|
||||||
|
expect(maConn.source).to.exist()
|
||||||
|
expect(maConn.localAddr).to.eql(localAddr)
|
||||||
|
expect(maConn.remoteAddr).to.eql(remoteAddr)
|
||||||
|
expect(maConn.timeline).to.exist()
|
||||||
|
expect(maConn.timeline.open).to.exist()
|
||||||
|
expect(maConn.timeline.close).to.not.exist()
|
||||||
|
|
||||||
|
maConn.close()
|
||||||
|
expect(maConn.timeline.close).to.exist()
|
||||||
|
})
|
||||||
|
|
||||||
|
it('can stream data over the multiaddr connection', async () => {
|
||||||
|
const stream = pair()
|
||||||
|
const maConn = streamToMaConn({ stream })
|
||||||
|
|
||||||
|
const data = 'hey'
|
||||||
|
const streamData = await pipe(
|
||||||
|
[data],
|
||||||
|
maConn,
|
||||||
|
collect
|
||||||
|
)
|
||||||
|
|
||||||
|
expect(streamData).to.eql([data])
|
||||||
|
// underlying stream end closes the connection
|
||||||
|
expect(maConn.timeline.close).to.exist()
|
||||||
|
})
|
||||||
|
})
|
Reference in New Issue
Block a user