mirror of
https://github.com/fluencelabs/js-libp2p-interfaces
synced 2025-07-08 01:31:51 +00:00
Compare commits
7 Commits
v0.5.2
...
feat/strea
Author | SHA1 | Date | |
---|---|---|---|
3d14678de8 | |||
d7e113b3db | |||
39af3ae7fa | |||
bbf1b556bc | |||
d168c7d531 | |||
349c1174db | |||
e14844315b |
16
CHANGELOG.md
16
CHANGELOG.md
@ -1,3 +1,19 @@
|
|||||||
|
<a name="0.6.0"></a>
|
||||||
|
# [0.6.0](https://github.com/libp2p/js-interfaces/compare/v0.5.2...v0.6.0) (2020-10-05)
|
||||||
|
|
||||||
|
|
||||||
|
### Features
|
||||||
|
|
||||||
|
* update pubsub getMsgId return type to Uint8Array ([#65](https://github.com/libp2p/js-interfaces/issues/65)) ([e148443](https://github.com/libp2p/js-interfaces/commit/e148443))
|
||||||
|
|
||||||
|
|
||||||
|
### BREAKING CHANGES
|
||||||
|
|
||||||
|
* new getMsgId return type is not backwards compatible with prior `string`
|
||||||
|
return type.
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
<a name="0.5.2"></a>
|
<a name="0.5.2"></a>
|
||||||
## [0.5.2](https://github.com/libp2p/js-interfaces/compare/v0.3.1...v0.5.2) (2020-09-30)
|
## [0.5.2](https://github.com/libp2p/js-interfaces/compare/v0.3.1...v0.5.2) (2020-09-30)
|
||||||
|
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "libp2p-interfaces",
|
"name": "libp2p-interfaces",
|
||||||
"version": "0.5.2",
|
"version": "0.6.0",
|
||||||
"description": "Interfaces for JS Libp2p",
|
"description": "Interfaces for JS Libp2p",
|
||||||
"leadMaintainer": "Jacob Heun <jacobheun@gmail.com>",
|
"leadMaintainer": "Jacob Heun <jacobheun@gmail.com>",
|
||||||
"main": "src/index.js",
|
"main": "src/index.js",
|
||||||
@ -53,7 +53,6 @@
|
|||||||
"it-pipe": "^1.1.0",
|
"it-pipe": "^1.1.0",
|
||||||
"it-pushable": "^1.4.0",
|
"it-pushable": "^1.4.0",
|
||||||
"libp2p-crypto": "^0.18.0",
|
"libp2p-crypto": "^0.18.0",
|
||||||
"libp2p-tcp": "^0.15.0",
|
|
||||||
"multiaddr": "^8.0.0",
|
"multiaddr": "^8.0.0",
|
||||||
"multibase": "^3.0.0",
|
"multibase": "^3.0.0",
|
||||||
"p-defer": "^3.0.0",
|
"p-defer": "^3.0.0",
|
||||||
|
@ -209,6 +209,8 @@ class Connection {
|
|||||||
* @return {Promise<void>}
|
* @return {Promise<void>}
|
||||||
*/
|
*/
|
||||||
async close () {
|
async close () {
|
||||||
|
this.streams.map(s => s.close && s.close())
|
||||||
|
|
||||||
if (this.stat.status === Status.CLOSED) {
|
if (this.stat.status === Status.CLOSED) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
4
src/pubsub/index.d.ts
vendored
4
src/pubsub/index.d.ts
vendored
@ -184,9 +184,9 @@ declare class PubsubBaseProtocol {
|
|||||||
* The default msgID implementation
|
* The default msgID implementation
|
||||||
* Child class can override this.
|
* Child class can override this.
|
||||||
* @param {RPC.Message} msg the message object
|
* @param {RPC.Message} msg the message object
|
||||||
* @returns {string} message id as string
|
* @returns {Uint8Array} message id as bytes
|
||||||
*/
|
*/
|
||||||
getMsgId(msg: any): string;
|
getMsgId(msg: any): Uint8Array;
|
||||||
/**
|
/**
|
||||||
* Whether to accept a message from a peer
|
* Whether to accept a message from a peer
|
||||||
* Override to create a graylist
|
* Override to create a graylist
|
||||||
|
@ -437,7 +437,7 @@ class PubsubBaseProtocol extends EventEmitter {
|
|||||||
* The default msgID implementation
|
* The default msgID implementation
|
||||||
* Child class can override this.
|
* Child class can override this.
|
||||||
* @param {RPC.Message} msg the message object
|
* @param {RPC.Message} msg the message object
|
||||||
* @returns {string} message id as string
|
* @returns {Uint8Array} message id as bytes
|
||||||
*/
|
*/
|
||||||
getMsgId (msg) {
|
getMsgId (msg) {
|
||||||
return utils.msgId(msg.from, msg.seqno)
|
return utils.msgId(msg.from, msg.seqno)
|
||||||
|
2
src/pubsub/utils.d.ts
vendored
2
src/pubsub/utils.d.ts
vendored
@ -1,5 +1,5 @@
|
|||||||
export function randomSeqno(): Uint8Array;
|
export function randomSeqno(): Uint8Array;
|
||||||
export function msgId(from: string, seqno: Uint8Array): string;
|
export function msgId(from: string, seqno: Uint8Array): Uint8Array;
|
||||||
export function anyMatch(a: any[] | Set<any>, b: any[] | Set<any>): boolean;
|
export function anyMatch(a: any[] | Set<any>, b: any[] | Set<any>): boolean;
|
||||||
export function ensureArray(maybeArray: any): any[];
|
export function ensureArray(maybeArray: any): any[];
|
||||||
export function normalizeInRpcMessage(message: any, peerId: string): any;
|
export function normalizeInRpcMessage(message: any, peerId: string): any;
|
||||||
|
@ -3,6 +3,7 @@
|
|||||||
const randomBytes = require('libp2p-crypto/src/random-bytes')
|
const randomBytes = require('libp2p-crypto/src/random-bytes')
|
||||||
const uint8ArrayToString = require('uint8arrays/to-string')
|
const uint8ArrayToString = require('uint8arrays/to-string')
|
||||||
const uint8ArrayFromString = require('uint8arrays/from-string')
|
const uint8ArrayFromString = require('uint8arrays/from-string')
|
||||||
|
const PeerId = require('peer-id')
|
||||||
exports = module.exports
|
exports = module.exports
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -20,11 +21,15 @@ exports.randomSeqno = () => {
|
|||||||
*
|
*
|
||||||
* @param {string} from
|
* @param {string} from
|
||||||
* @param {Uint8Array} seqno
|
* @param {Uint8Array} seqno
|
||||||
* @returns {string}
|
* @returns {Uint8Array}
|
||||||
* @private
|
* @private
|
||||||
*/
|
*/
|
||||||
exports.msgId = (from, seqno) => {
|
exports.msgId = (from, seqno) => {
|
||||||
return from + uint8ArrayToString(seqno, 'base16')
|
const fromBytes = PeerId.createFromB58String(from).id
|
||||||
|
const msgId = new Uint8Array(fromBytes.length + seqno.length)
|
||||||
|
msgId.set(fromBytes, 0)
|
||||||
|
msgId.set(seqno, fromBytes.length)
|
||||||
|
return msgId
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -2,17 +2,17 @@
|
|||||||
/* eslint max-nested-callbacks: ["error", 8] */
|
/* eslint max-nested-callbacks: ["error", 8] */
|
||||||
'use strict'
|
'use strict'
|
||||||
|
|
||||||
|
const chai = require('chai')
|
||||||
|
const expect = chai.expect
|
||||||
|
chai.use(require('dirty-chai'))
|
||||||
|
|
||||||
const pair = require('it-pair/duplex')
|
const pair = require('it-pair/duplex')
|
||||||
const pipe = require('it-pipe')
|
const pipe = require('it-pipe')
|
||||||
const { consume } = require('streaming-iterables')
|
const { consume } = require('streaming-iterables')
|
||||||
const Tcp = require('libp2p-tcp')
|
|
||||||
const multiaddr = require('multiaddr')
|
|
||||||
const abortable = require('abortable-iterator')
|
const abortable = require('abortable-iterator')
|
||||||
const AbortController = require('abort-controller')
|
const AbortController = require('abort-controller')
|
||||||
const uint8arrayFromString = require('uint8arrays/from-string')
|
const uint8arrayFromString = require('uint8arrays/from-string')
|
||||||
|
|
||||||
const mh = multiaddr('/ip4/127.0.0.1/tcp/0')
|
|
||||||
|
|
||||||
function pause (ms) {
|
function pause (ms) {
|
||||||
return new Promise(resolve => setTimeout(resolve, ms))
|
return new Promise(resolve => setTimeout(resolve, ms))
|
||||||
}
|
}
|
||||||
@ -38,33 +38,31 @@ module.exports = (common) => {
|
|||||||
Muxer = await common.setup()
|
Muxer = await common.setup()
|
||||||
})
|
})
|
||||||
|
|
||||||
it('closing underlying socket closes streams (tcp)', async () => {
|
it('closing underlying socket closes streams', async () => {
|
||||||
const mockConn = muxer => ({
|
const mockConn = muxer => ({
|
||||||
newStream: (...args) => muxer.newStream(...args)
|
newStream: (...args) => muxer.newStream(...args)
|
||||||
})
|
})
|
||||||
|
|
||||||
const mockUpgrade = () => maConn => {
|
const mockUpgrade = maConn => {
|
||||||
const muxer = new Muxer(stream => pipe(stream, stream))
|
const muxer = new Muxer(stream => pipe(stream, stream))
|
||||||
pipe(maConn, muxer, maConn)
|
pipe(maConn, muxer, maConn)
|
||||||
return mockConn(muxer)
|
return mockConn(muxer)
|
||||||
}
|
}
|
||||||
|
|
||||||
const mockUpgrader = () => ({
|
const [local, remote] = pair()
|
||||||
upgradeInbound: mockUpgrade(),
|
const controller = new AbortController()
|
||||||
upgradeOutbound: mockUpgrade()
|
const abortableRemote = abortable.duplex(remote, controller.signal, {
|
||||||
|
returnOnAbort: true
|
||||||
})
|
})
|
||||||
|
|
||||||
const tcp = new Tcp({ upgrader: mockUpgrader() })
|
mockUpgrade(abortableRemote)
|
||||||
const tcpListener = tcp.createListener()
|
const dialerConn = mockUpgrade(local)
|
||||||
|
|
||||||
await tcpListener.listen(mh)
|
|
||||||
const dialerConn = await tcp.dial(tcpListener.getAddrs()[0])
|
|
||||||
|
|
||||||
const s1 = await dialerConn.newStream()
|
const s1 = await dialerConn.newStream()
|
||||||
const s2 = await dialerConn.newStream()
|
const s2 = await dialerConn.newStream()
|
||||||
|
|
||||||
// close the listener in a bit
|
// close the remote in a bit
|
||||||
setTimeout(() => tcpListener.close(), 50)
|
setTimeout(() => controller.abort(), 50)
|
||||||
|
|
||||||
const s1Result = pipe(infiniteRandom, s1, consume)
|
const s1Result = pipe(infiniteRandom, s1, consume)
|
||||||
const s2Result = pipe(infiniteRandom, s2, consume)
|
const s2Result = pipe(infiniteRandom, s2, consume)
|
||||||
@ -115,5 +113,69 @@ module.exports = (common) => {
|
|||||||
// These should now all resolve without error
|
// These should now all resolve without error
|
||||||
await Promise.all(streamResults)
|
await Promise.all(streamResults)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
it('can close a stream for writing', (done) => {
|
||||||
|
const p = pair()
|
||||||
|
const dialer = new Muxer()
|
||||||
|
const data = [randomBuffer(), randomBuffer()]
|
||||||
|
|
||||||
|
const listener = new Muxer(async stream => {
|
||||||
|
// Immediate close for write
|
||||||
|
await stream.closeWrite()
|
||||||
|
|
||||||
|
const results = await pipe(stream, async (source) => {
|
||||||
|
const data = []
|
||||||
|
for await (const chunk of source) {
|
||||||
|
data.push(chunk.slice())
|
||||||
|
}
|
||||||
|
return data
|
||||||
|
})
|
||||||
|
expect(results).to.eql(data)
|
||||||
|
|
||||||
|
try {
|
||||||
|
await stream.sink([randomBuffer()])
|
||||||
|
} catch (err) {
|
||||||
|
expect(err).to.exist()
|
||||||
|
return done()
|
||||||
|
}
|
||||||
|
expect.fail('should not support writing to closed writer')
|
||||||
|
})
|
||||||
|
|
||||||
|
pipe(p[0], dialer, p[0])
|
||||||
|
pipe(p[1], listener, p[1])
|
||||||
|
|
||||||
|
const stream = dialer.newStream()
|
||||||
|
stream.sink(data)
|
||||||
|
})
|
||||||
|
|
||||||
|
it('can close a stream for reading', (done) => {
|
||||||
|
const p = pair()
|
||||||
|
const dialer = new Muxer()
|
||||||
|
const data = [randomBuffer(), randomBuffer()]
|
||||||
|
|
||||||
|
const listener = new Muxer(async stream => {
|
||||||
|
const results = await pipe(stream, async (source) => {
|
||||||
|
const data = []
|
||||||
|
for await (const chunk of source) {
|
||||||
|
data.push(chunk.slice())
|
||||||
|
}
|
||||||
|
return data
|
||||||
|
})
|
||||||
|
expect(results).to.eql(data)
|
||||||
|
done()
|
||||||
|
})
|
||||||
|
|
||||||
|
pipe(p[0], dialer, p[0])
|
||||||
|
pipe(p[1], listener, p[1])
|
||||||
|
|
||||||
|
const stream = dialer.newStream()
|
||||||
|
stream.closeRead()
|
||||||
|
|
||||||
|
// Source should be done
|
||||||
|
;(async () => {
|
||||||
|
expect(await stream.source.next()).to.eql({ done: true })
|
||||||
|
stream.sink(data)
|
||||||
|
})()
|
||||||
|
})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -15,15 +15,11 @@ describe('utils', () => {
|
|||||||
expect(first).to.not.eql(second)
|
expect(first).to.not.eql(second)
|
||||||
})
|
})
|
||||||
|
|
||||||
it('msgId', () => {
|
|
||||||
expect(utils.msgId('hello', uint8ArrayFromString('world'))).to.be.eql('hello776f726c64')
|
|
||||||
})
|
|
||||||
|
|
||||||
it('msgId should not generate same ID for two different Uint8Arrays', () => {
|
it('msgId should not generate same ID for two different Uint8Arrays', () => {
|
||||||
const peerId = 'QmPNdSYk5Rfpo5euNqwtyizzmKXMNHdXeLjTQhcN4yfX22'
|
const peerId = 'QmPNdSYk5Rfpo5euNqwtyizzmKXMNHdXeLjTQhcN4yfX22'
|
||||||
const msgId0 = utils.msgId(peerId, uint8ArrayFromString('15603533e990dfde', 'base16'))
|
const msgId0 = utils.msgId(peerId, uint8ArrayFromString('15603533e990dfde', 'base16'))
|
||||||
const msgId1 = utils.msgId(peerId, uint8ArrayFromString('15603533e990dfe0', 'base16'))
|
const msgId1 = utils.msgId(peerId, uint8ArrayFromString('15603533e990dfe0', 'base16'))
|
||||||
expect(msgId0).to.not.eql(msgId1)
|
expect(msgId0).to.not.deep.equal(msgId1)
|
||||||
})
|
})
|
||||||
|
|
||||||
it('anyMatch', () => {
|
it('anyMatch', () => {
|
||||||
|
Reference in New Issue
Block a user