mirror of
https://github.com/fluencelabs/js-libp2p-tcp
synced 2025-04-25 21:42:30 +00:00
84 lines
2.5 KiB
JavaScript
84 lines
2.5 KiB
JavaScript
|
'use strict'
|
||
|
|
||
|
const abortable = require('abortable-iterator')
|
||
|
const log = require('debug')('libp2p:tcp:socket')
|
||
|
const toIterable = require('stream-to-it')
|
||
|
const toMultiaddr = require('./ip-port-to-multiaddr')
|
||
|
const { CLOSE_TIMEOUT } = require('./constants')
|
||
|
|
||
|
// Convert a socket into a MultiaddrConnection
|
||
|
// https://github.com/libp2p/interface-transport#multiaddrconnection
|
||
|
module.exports = (socket, options) => {
|
||
|
options = options || {}
|
||
|
|
||
|
const { sink, source } = toIterable.duplex(socket)
|
||
|
const maConn = {
|
||
|
async sink (source) {
|
||
|
if (options.signal) {
|
||
|
source = abortable(source, options.signal)
|
||
|
}
|
||
|
|
||
|
try {
|
||
|
await sink((async function * () {
|
||
|
for await (const chunk of source) {
|
||
|
// Convert BufferList to Buffer
|
||
|
yield Buffer.isBuffer(chunk) ? chunk : chunk.slice()
|
||
|
}
|
||
|
})())
|
||
|
} catch (err) {
|
||
|
// If aborted we can safely ignore
|
||
|
if (err.type !== 'aborted') {
|
||
|
// If the source errored the socket will already have been destroyed by
|
||
|
// toIterable.duplex(). If the socket errored it will already be
|
||
|
// destroyed. There's nothing to do here except log the error & return.
|
||
|
log(err)
|
||
|
}
|
||
|
}
|
||
|
},
|
||
|
|
||
|
source: options.signal ? abortable(source, options.signal) : source,
|
||
|
|
||
|
conn: socket,
|
||
|
|
||
|
localAddr: toMultiaddr(socket.localAddress, socket.localPort),
|
||
|
|
||
|
// If the remote address was passed, use it - it may have the peer ID encapsulated
|
||
|
remoteAddr: options.remoteAddr || toMultiaddr(socket.remoteAddress, socket.remotePort),
|
||
|
|
||
|
timeline: { open: Date.now() },
|
||
|
|
||
|
close () {
|
||
|
if (socket.destroyed) return
|
||
|
|
||
|
return new Promise((resolve, reject) => {
|
||
|
const start = Date.now()
|
||
|
|
||
|
// Attempt to end the socket. If it takes longer to close than the
|
||
|
// timeout, destroy it manually.
|
||
|
const timeout = setTimeout(() => {
|
||
|
const { host, port } = maConn.remoteAddr.toOptions()
|
||
|
log('timeout closing socket to %s:%s after %dms, destroying it manually',
|
||
|
host, port, Date.now() - start)
|
||
|
|
||
|
if (socket.destroyed) {
|
||
|
log('%s:%s is already destroyed', host, port)
|
||
|
} else {
|
||
|
socket.destroy()
|
||
|
}
|
||
|
|
||
|
resolve()
|
||
|
}, CLOSE_TIMEOUT)
|
||
|
|
||
|
socket.once('close', () => clearTimeout(timeout))
|
||
|
socket.end(err => {
|
||
|
maConn.timeline.close = Date.now()
|
||
|
if (err) return reject(err)
|
||
|
resolve()
|
||
|
})
|
||
|
})
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return maConn
|
||
|
}
|