mirror of
https://github.com/fluencelabs/js-libp2p-websockets
synced 2025-06-13 04:51:36 +00:00
fix: add error event handler (#118)
This commit is contained in:
@ -48,6 +48,7 @@
|
|||||||
"mafmt": "^8.0.1",
|
"mafmt": "^8.0.1",
|
||||||
"multiaddr": "^8.1.1",
|
"multiaddr": "^8.1.1",
|
||||||
"multiaddr-to-uri": "^6.0.0",
|
"multiaddr-to-uri": "^6.0.0",
|
||||||
|
"p-defer": "^3.0.0",
|
||||||
"p-timeout": "^3.2.0"
|
"p-timeout": "^3.2.0"
|
||||||
},
|
},
|
||||||
"devDependencies": {
|
"devDependencies": {
|
||||||
|
28
src/index.js
28
src/index.js
@ -4,8 +4,11 @@ const connect = require('it-ws/client')
|
|||||||
const withIs = require('class-is')
|
const withIs = require('class-is')
|
||||||
const toUri = require('multiaddr-to-uri')
|
const toUri = require('multiaddr-to-uri')
|
||||||
const { AbortError } = require('abortable-iterator')
|
const { AbortError } = require('abortable-iterator')
|
||||||
|
const pDefer = require('p-defer')
|
||||||
|
|
||||||
const log = require('debug')('libp2p:websockets')
|
const debug = require('debug')
|
||||||
|
const log = debug('libp2p:websockets')
|
||||||
|
log.error = debug('libp2p:websockets:error')
|
||||||
const env = require('ipfs-utils/src/env')
|
const env = require('ipfs-utils/src/env')
|
||||||
|
|
||||||
const createListener = require('./listener')
|
const createListener = require('./listener')
|
||||||
@ -63,10 +66,24 @@ class WebSockets {
|
|||||||
const cOpts = ma.toOptions()
|
const cOpts = ma.toOptions()
|
||||||
log('dialing %s:%s', cOpts.host, cOpts.port)
|
log('dialing %s:%s', cOpts.host, cOpts.port)
|
||||||
|
|
||||||
|
const errorPromise = pDefer()
|
||||||
|
const errfn = (err) => {
|
||||||
|
const msg = `connection error: ${err.message}`
|
||||||
|
log.error(msg)
|
||||||
|
|
||||||
|
errorPromise.reject(err)
|
||||||
|
}
|
||||||
|
|
||||||
const rawSocket = connect(toUri(ma), Object.assign({ binary: true }, options))
|
const rawSocket = connect(toUri(ma), Object.assign({ binary: true }, options))
|
||||||
|
|
||||||
|
if (rawSocket.socket.on) {
|
||||||
|
rawSocket.socket.on('error', errfn)
|
||||||
|
} else {
|
||||||
|
rawSocket.socket.onerror = errfn
|
||||||
|
}
|
||||||
|
|
||||||
if (!options.signal) {
|
if (!options.signal) {
|
||||||
await rawSocket.connected()
|
await Promise.race([rawSocket.connected(), errorPromise.promise])
|
||||||
|
|
||||||
log('connected %s', ma)
|
log('connected %s', ma)
|
||||||
return rawSocket
|
return rawSocket
|
||||||
@ -77,7 +94,10 @@ class WebSockets {
|
|||||||
const abort = new Promise((resolve, reject) => {
|
const abort = new Promise((resolve, reject) => {
|
||||||
onAbort = () => {
|
onAbort = () => {
|
||||||
reject(new AbortError())
|
reject(new AbortError())
|
||||||
rawSocket.close()
|
// FIXME: https://github.com/libp2p/js-libp2p-websockets/issues/121
|
||||||
|
setTimeout(() => {
|
||||||
|
rawSocket.close()
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Already aborted?
|
// Already aborted?
|
||||||
@ -86,7 +106,7 @@ class WebSockets {
|
|||||||
})
|
})
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await Promise.race([abort, rawSocket.connected()])
|
await Promise.race([abort, errorPromise.promise, rawSocket.connected()])
|
||||||
} finally {
|
} finally {
|
||||||
options.signal.removeEventListener('abort', onAbort)
|
options.signal.removeEventListener('abort', onAbort)
|
||||||
}
|
}
|
||||||
|
11
test/node.js
11
test/node.js
@ -5,6 +5,7 @@
|
|||||||
const https = require('https')
|
const https = require('https')
|
||||||
const fs = require('fs')
|
const fs = require('fs')
|
||||||
|
|
||||||
|
const AbortController = require('abort-controller').default
|
||||||
const { expect } = require('aegir/utils/chai')
|
const { expect } = require('aegir/utils/chai')
|
||||||
const multiaddr = require('multiaddr')
|
const multiaddr = require('multiaddr')
|
||||||
const goodbye = require('it-goodbye')
|
const goodbye = require('it-goodbye')
|
||||||
@ -224,6 +225,16 @@ describe('dial', () => {
|
|||||||
expect(result).to.be.eql([uint8ArrayFromString('hey')])
|
expect(result).to.be.eql([uint8ArrayFromString('hey')])
|
||||||
})
|
})
|
||||||
|
|
||||||
|
it('dial should throw on immediate abort', async () => {
|
||||||
|
const ma = multiaddr('/ip4/127.0.0.1/tcp/0/ws')
|
||||||
|
const controller = new AbortController()
|
||||||
|
|
||||||
|
const conn = ws.dial(ma, { signal: controller.signal })
|
||||||
|
controller.abort()
|
||||||
|
|
||||||
|
await expect(conn).to.eventually.be.rejected()
|
||||||
|
})
|
||||||
|
|
||||||
it('should resolve port 0', async () => {
|
it('should resolve port 0', async () => {
|
||||||
const ma = multiaddr('/ip4/127.0.0.1/tcp/0/ws')
|
const ma = multiaddr('/ip4/127.0.0.1/tcp/0/ws')
|
||||||
const ws = new WS({ upgrader: mockUpgrader })
|
const ws = new WS({ upgrader: mockUpgrader })
|
||||||
|
Reference in New Issue
Block a user