mirror of
https://github.com/fluencelabs/js-libp2p-tcp
synced 2025-06-12 23:41:31 +00:00
feat: change api to async / await (#112)
BREAKING CHANGE: All places in the API that used callbacks are now replaced with async/await. The API has also been updated according to the latest `interface-transport` version, https://github.com/libp2p/interface-transport/tree/v0.6.0#api.
This commit is contained in:
8
src/constants.js
Normal file
8
src/constants.js
Normal file
@ -0,0 +1,8 @@
|
||||
'use strict'
|
||||
|
||||
// p2p multi-address code
|
||||
exports.CODE_P2P = 421
|
||||
exports.CODE_CIRCUIT = 290
|
||||
|
||||
// Time to wait for a connection to close gracefully before destroying it manually
|
||||
exports.CLOSE_TIMEOUT = 2000
|
@ -1,33 +0,0 @@
|
||||
'use strict'
|
||||
|
||||
const multiaddr = require('multiaddr')
|
||||
const Address6 = require('ip-address').Address6
|
||||
const debug = require('debug')
|
||||
const log = debug('libp2p:tcp:get-multiaddr')
|
||||
|
||||
module.exports = (socket) => {
|
||||
let ma
|
||||
|
||||
try {
|
||||
if (socket.remoteFamily === 'IPv6') {
|
||||
const addr = new Address6(socket.remoteAddress)
|
||||
|
||||
if (addr.v4) {
|
||||
const ip4 = addr.to4().correctForm()
|
||||
ma = multiaddr('/ip4/' + ip4 +
|
||||
'/tcp/' + socket.remotePort
|
||||
)
|
||||
} else {
|
||||
ma = multiaddr('/ip6/' + socket.remoteAddress +
|
||||
'/tcp/' + socket.remotePort
|
||||
)
|
||||
}
|
||||
} else {
|
||||
ma = multiaddr('/ip4/' + socket.remoteAddress +
|
||||
'/tcp/' + socket.remotePort)
|
||||
}
|
||||
} catch (err) {
|
||||
log(err)
|
||||
}
|
||||
return ma
|
||||
}
|
168
src/index.js
168
src/index.js
@ -1,83 +1,137 @@
|
||||
'use strict'
|
||||
|
||||
const net = require('net')
|
||||
const toPull = require('stream-to-pull-stream')
|
||||
const mafmt = require('mafmt')
|
||||
const withIs = require('class-is')
|
||||
const includes = require('lodash.includes')
|
||||
const isFunction = require('lodash.isfunction')
|
||||
const Connection = require('interface-connection').Connection
|
||||
const once = require('once')
|
||||
const debug = require('debug')
|
||||
const log = debug('libp2p:tcp:dial')
|
||||
|
||||
const errCode = require('err-code')
|
||||
const log = require('debug')('libp2p:tcp')
|
||||
const toConnection = require('./socket-to-conn')
|
||||
const createListener = require('./listener')
|
||||
const { AbortError } = require('abortable-iterator')
|
||||
const { CODE_CIRCUIT, CODE_P2P } = require('./constants')
|
||||
const assert = require('assert')
|
||||
|
||||
function noop () {}
|
||||
|
||||
/**
|
||||
* @class TCP
|
||||
*/
|
||||
class TCP {
|
||||
dial (ma, options, callback) {
|
||||
if (isFunction(options)) {
|
||||
callback = options
|
||||
options = {}
|
||||
}
|
||||
|
||||
callback = once(callback || noop)
|
||||
|
||||
const cOpts = ma.toOptions()
|
||||
log('Connecting to %s %s', cOpts.port, cOpts.host)
|
||||
|
||||
const rawSocket = net.connect(cOpts)
|
||||
|
||||
rawSocket.once('timeout', () => {
|
||||
log('timeout')
|
||||
rawSocket.emit('error', new Error('Timeout'))
|
||||
})
|
||||
|
||||
rawSocket.once('error', callback)
|
||||
|
||||
rawSocket.once('connect', () => {
|
||||
rawSocket.removeListener('error', callback)
|
||||
callback()
|
||||
})
|
||||
|
||||
const socket = toPull.duplex(rawSocket)
|
||||
|
||||
const conn = new Connection(socket)
|
||||
|
||||
conn.getObservedAddrs = (callback) => {
|
||||
return callback(null, [ma])
|
||||
}
|
||||
/**
|
||||
* @constructor
|
||||
* @param {object} options
|
||||
* @param {Upgrader} options.upgrader
|
||||
*/
|
||||
constructor ({ upgrader }) {
|
||||
assert(upgrader, 'An upgrader must be provided. See https://github.com/libp2p/interface-transport#upgrader.')
|
||||
this._upgrader = upgrader
|
||||
}
|
||||
|
||||
/**
|
||||
* @async
|
||||
* @param {Multiaddr} ma
|
||||
* @param {object} options
|
||||
* @param {AbortSignal} options.signal Used to abort dial requests
|
||||
* @returns {Connection} An upgraded Connection
|
||||
*/
|
||||
async dial (ma, options) {
|
||||
options = options || {}
|
||||
const socket = await this._connect(ma, options)
|
||||
const maConn = toConnection(socket, { remoteAddr: ma, signal: options.signal })
|
||||
log('new outbound connection %s', maConn.remoteAddr)
|
||||
const conn = await this._upgrader.upgradeOutbound(maConn)
|
||||
log('outbound connection %s upgraded', maConn.remoteAddr)
|
||||
return conn
|
||||
}
|
||||
|
||||
/**
|
||||
* @private
|
||||
* @param {Multiaddr} ma
|
||||
* @param {object} options
|
||||
* @param {AbortSignal} options.signal Used to abort dial requests
|
||||
* @returns {Promise<Socket>} Resolves a TCP Socket
|
||||
*/
|
||||
_connect (ma, options = {}) {
|
||||
if (options.signal && options.signal.aborted) {
|
||||
throw new AbortError()
|
||||
}
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
const start = Date.now()
|
||||
const cOpts = ma.toOptions()
|
||||
|
||||
log('dialing %s:%s', cOpts.host, cOpts.port)
|
||||
const rawSocket = net.connect(cOpts)
|
||||
|
||||
const onError = err => {
|
||||
err.message = `connection error ${cOpts.host}:${cOpts.port}: ${err.message}`
|
||||
done(err)
|
||||
}
|
||||
|
||||
const onTimeout = () => {
|
||||
log('connnection timeout %s:%s', cOpts.host, cOpts.port)
|
||||
const err = errCode(new Error(`connection timeout after ${Date.now() - start}ms`), 'ERR_CONNECT_TIMEOUT')
|
||||
// Note: this will result in onError() being called
|
||||
rawSocket.emit('error', err)
|
||||
}
|
||||
|
||||
const onConnect = () => {
|
||||
log('connection opened %s:%s', cOpts.host, cOpts.port)
|
||||
done()
|
||||
}
|
||||
|
||||
const onAbort = () => {
|
||||
log('connection aborted %s:%s', cOpts.host, cOpts.port)
|
||||
rawSocket.destroy()
|
||||
done(new AbortError())
|
||||
}
|
||||
|
||||
const done = err => {
|
||||
rawSocket.removeListener('error', onError)
|
||||
rawSocket.removeListener('timeout', onTimeout)
|
||||
rawSocket.removeListener('connect', onConnect)
|
||||
options.signal && options.signal.removeEventListener('abort', onAbort)
|
||||
|
||||
if (err) return reject(err)
|
||||
resolve(rawSocket)
|
||||
}
|
||||
|
||||
rawSocket.on('error', onError)
|
||||
rawSocket.on('timeout', onTimeout)
|
||||
rawSocket.on('connect', onConnect)
|
||||
options.signal && options.signal.addEventListener('abort', onAbort)
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a TCP listener. The provided `handler` function will be called
|
||||
* anytime a new incoming Connection has been successfully upgraded via
|
||||
* `upgrader.upgradeInbound`.
|
||||
* @param {*} [options]
|
||||
* @param {function(Connection)} handler
|
||||
* @returns {Listener} A TCP listener
|
||||
*/
|
||||
createListener (options, handler) {
|
||||
if (isFunction(options)) {
|
||||
if (typeof options === 'function') {
|
||||
handler = options
|
||||
options = {}
|
||||
}
|
||||
|
||||
handler = handler || noop
|
||||
|
||||
return createListener(handler)
|
||||
options = options || {}
|
||||
return createListener({ handler, upgrader: this._upgrader }, options)
|
||||
}
|
||||
|
||||
/**
|
||||
* Takes a list of `Multiaddr`s and returns only valid TCP addresses
|
||||
* @param {Multiaddr[]} multiaddrs
|
||||
* @returns {Multiaddr[]} Valid TCP multiaddrs
|
||||
*/
|
||||
filter (multiaddrs) {
|
||||
if (!Array.isArray(multiaddrs)) {
|
||||
multiaddrs = [multiaddrs]
|
||||
}
|
||||
multiaddrs = Array.isArray(multiaddrs) ? multiaddrs : [multiaddrs]
|
||||
|
||||
return multiaddrs.filter((ma) => {
|
||||
if (includes(ma.protoNames(), 'p2p-circuit')) {
|
||||
return multiaddrs.filter(ma => {
|
||||
if (ma.protoCodes().includes(CODE_CIRCUIT)) {
|
||||
return false
|
||||
}
|
||||
|
||||
if (includes(ma.protoNames(), 'ipfs')) {
|
||||
ma = ma.decapsulate('ipfs')
|
||||
}
|
||||
|
||||
return mafmt.TCP.matches(ma)
|
||||
return mafmt.TCP.matches(ma.decapsulateCode(CODE_P2P))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
30
src/ip-port-to-multiaddr.js
Normal file
30
src/ip-port-to-multiaddr.js
Normal file
@ -0,0 +1,30 @@
|
||||
'use strict'
|
||||
|
||||
const multiaddr = require('multiaddr')
|
||||
const { Address4, Address6 } = require('ip-address')
|
||||
|
||||
module.exports = (ip, port) => {
|
||||
if (typeof ip !== 'string') {
|
||||
throw new Error('invalid ip')
|
||||
}
|
||||
|
||||
port = parseInt(port)
|
||||
|
||||
if (isNaN(port)) {
|
||||
throw new Error('invalid port')
|
||||
}
|
||||
|
||||
if (new Address4(ip).isValid()) {
|
||||
return multiaddr(`/ip4/${ip}/tcp/${port}`)
|
||||
}
|
||||
|
||||
const ip6 = new Address6(ip)
|
||||
|
||||
if (ip6.isValid()) {
|
||||
return ip6.is4()
|
||||
? multiaddr(`/ip4/${ip6.to4().correctForm()}/tcp/${port}`)
|
||||
: multiaddr(`/ip6/${ip}/tcp/${port}`)
|
||||
}
|
||||
|
||||
throw new Error('invalid ip')
|
||||
}
|
188
src/listener.js
188
src/listener.js
@ -1,156 +1,122 @@
|
||||
'use strict'
|
||||
|
||||
const multiaddr = require('multiaddr')
|
||||
const Connection = require('interface-connection').Connection
|
||||
const os = require('os')
|
||||
const includes = require('lodash.includes')
|
||||
const net = require('net')
|
||||
const toPull = require('stream-to-pull-stream')
|
||||
const EventEmitter = require('events').EventEmitter
|
||||
const debug = require('debug')
|
||||
const log = debug('libp2p:tcp:listen')
|
||||
const EventEmitter = require('events')
|
||||
const log = require('debug')('libp2p:tcp:listener')
|
||||
const toConnection = require('./socket-to-conn')
|
||||
const { CODE_P2P } = require('./constants')
|
||||
const ProtoFamily = { ip4: 'IPv4', ip6: 'IPv6' }
|
||||
|
||||
const getMultiaddr = require('./get-multiaddr')
|
||||
|
||||
const IPFS_CODE = 421
|
||||
const CLOSE_TIMEOUT = 2000
|
||||
|
||||
function noop () {}
|
||||
|
||||
module.exports = (handler) => {
|
||||
module.exports = ({ handler, upgrader }, options) => {
|
||||
const listener = new EventEmitter()
|
||||
|
||||
const server = net.createServer((socket) => {
|
||||
// Avoid uncaught errors cause by unstable connections
|
||||
socket.on('error', noop)
|
||||
const server = net.createServer(async socket => {
|
||||
// Avoid uncaught errors caused by unstable connections
|
||||
socket.on('error', err => log('socket error', err))
|
||||
|
||||
const addr = getMultiaddr(socket)
|
||||
if (!addr) {
|
||||
if (socket.remoteAddress === undefined) {
|
||||
log('connection closed before p2p connection made')
|
||||
} else {
|
||||
log('error interpreting incoming p2p connection')
|
||||
}
|
||||
return
|
||||
}
|
||||
const maConn = toConnection(socket)
|
||||
log('new inbound connection %s', maConn.remoteAddr)
|
||||
|
||||
log('new connection', addr.toString())
|
||||
const conn = await upgrader.upgradeInbound(maConn)
|
||||
log('inbound connection %s upgraded', maConn.remoteAddr)
|
||||
|
||||
const s = toPull.duplex(socket)
|
||||
trackConn(server, maConn)
|
||||
|
||||
s.getObservedAddrs = (cb) => {
|
||||
cb(null, [addr])
|
||||
}
|
||||
|
||||
trackSocket(server, socket)
|
||||
|
||||
const conn = new Connection(s)
|
||||
handler(conn)
|
||||
if (handler) handler(conn)
|
||||
listener.emit('connection', conn)
|
||||
})
|
||||
|
||||
server.on('listening', () => listener.emit('listening'))
|
||||
server.on('error', (err) => listener.emit('error', err))
|
||||
server.on('close', () => listener.emit('close'))
|
||||
server
|
||||
.on('listening', () => listener.emit('listening'))
|
||||
.on('error', err => listener.emit('error', err))
|
||||
.on('close', () => listener.emit('close'))
|
||||
|
||||
// Keep track of open connections to destroy in case of timeout
|
||||
server.__connections = {}
|
||||
server.__connections = []
|
||||
|
||||
listener.close = (options, callback) => {
|
||||
if (typeof options === 'function') {
|
||||
callback = options
|
||||
options = {}
|
||||
}
|
||||
callback = callback || noop
|
||||
options = options || {}
|
||||
listener.close = () => {
|
||||
if (!server.listening) return
|
||||
|
||||
const timeout = setTimeout(() => {
|
||||
log('unable to close graciously, destroying conns')
|
||||
Object.keys(server.__connections).forEach((key) => {
|
||||
log('destroying %s', key)
|
||||
server.__connections[key].destroy()
|
||||
})
|
||||
}, options.timeout || CLOSE_TIMEOUT)
|
||||
|
||||
server.close(callback)
|
||||
|
||||
server.once('close', () => {
|
||||
clearTimeout(timeout)
|
||||
return new Promise((resolve, reject) => {
|
||||
server.__connections.forEach(maConn => maConn.close())
|
||||
server.close(err => err ? reject(err) : resolve())
|
||||
})
|
||||
}
|
||||
|
||||
let ipfsId
|
||||
let listeningAddr
|
||||
let peerId, listeningAddr
|
||||
|
||||
listener.listen = (ma, callback) => {
|
||||
listener.listen = ma => {
|
||||
listeningAddr = ma
|
||||
if (includes(ma.protoNames(), 'ipfs')) {
|
||||
ipfsId = getIpfsId(ma)
|
||||
listeningAddr = ma.decapsulate('ipfs')
|
||||
peerId = ma.getPeerId()
|
||||
|
||||
if (peerId) {
|
||||
listeningAddr = ma.decapsulateCode(CODE_P2P)
|
||||
}
|
||||
|
||||
const lOpts = listeningAddr.toOptions()
|
||||
log('Listening on %s %s', lOpts.port, lOpts.host)
|
||||
return server.listen(lOpts.port, lOpts.host, callback)
|
||||
return new Promise((resolve, reject) => {
|
||||
const { host, port } = listeningAddr.toOptions()
|
||||
server.listen(port, host, err => {
|
||||
if (err) return reject(err)
|
||||
log('Listening on %s %s', port, host)
|
||||
resolve()
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
listener.getAddrs = (callback) => {
|
||||
const multiaddrs = []
|
||||
listener.getAddrs = () => {
|
||||
let addrs = []
|
||||
const address = server.address()
|
||||
|
||||
if (!address) {
|
||||
return callback(new Error('Listener is not ready yet'))
|
||||
throw new Error('Listener is not ready yet')
|
||||
}
|
||||
|
||||
// Because TCP will only return the IPv6 version
|
||||
// we need to capture from the passed multiaddr
|
||||
if (listeningAddr.toString().indexOf('ip4') !== -1) {
|
||||
let m = listeningAddr.decapsulate('tcp')
|
||||
m = m.encapsulate('/tcp/' + address.port)
|
||||
if (ipfsId) {
|
||||
m = m.encapsulate('/ipfs/' + ipfsId)
|
||||
}
|
||||
|
||||
if (m.toString().indexOf('0.0.0.0') !== -1) {
|
||||
const netInterfaces = os.networkInterfaces()
|
||||
Object.keys(netInterfaces).forEach((niKey) => {
|
||||
netInterfaces[niKey].forEach((ni) => {
|
||||
if (ni.family === 'IPv4') {
|
||||
multiaddrs.push(multiaddr(m.toString().replace('0.0.0.0', ni.address)))
|
||||
}
|
||||
})
|
||||
})
|
||||
} else {
|
||||
multiaddrs.push(m)
|
||||
}
|
||||
if (listeningAddr.toString().startsWith('/ip4')) {
|
||||
addrs = addrs.concat(getMulitaddrs('ip4', address.address, address.port))
|
||||
} else if (address.family === 'IPv6') {
|
||||
addrs = addrs.concat(getMulitaddrs('ip6', address.address, address.port))
|
||||
}
|
||||
|
||||
if (address.family === 'IPv6') {
|
||||
let ma = multiaddr('/ip6/' + address.address + '/tcp/' + address.port)
|
||||
if (ipfsId) {
|
||||
ma = ma.encapsulate('/ipfs/' + ipfsId)
|
||||
}
|
||||
|
||||
multiaddrs.push(ma)
|
||||
}
|
||||
|
||||
callback(null, multiaddrs)
|
||||
return addrs.map(ma => peerId ? ma.encapsulate(`/p2p/${peerId}`) : ma)
|
||||
}
|
||||
|
||||
return listener
|
||||
}
|
||||
|
||||
function getIpfsId (ma) {
|
||||
return ma.stringTuples().filter((tuple) => {
|
||||
return tuple[0] === IPFS_CODE
|
||||
})[0][1]
|
||||
function getMulitaddrs (proto, ip, port) {
|
||||
const toMa = ip => multiaddr(`/${proto}/${ip}/tcp/${port}`)
|
||||
return (isAnyAddr(ip) ? getNetworkAddrs(ProtoFamily[proto]) : [ip]).map(toMa)
|
||||
}
|
||||
|
||||
function trackSocket (server, socket) {
|
||||
const key = `${socket.remoteAddress}:${socket.remotePort}`
|
||||
server.__connections[key] = socket
|
||||
|
||||
socket.on('close', () => {
|
||||
delete server.__connections[key]
|
||||
})
|
||||
function isAnyAddr (ip) {
|
||||
return ['0.0.0.0', '::'].includes(ip)
|
||||
}
|
||||
|
||||
/**
|
||||
* @private
|
||||
* @param {string} family One of ['IPv6', 'IPv4']
|
||||
* @returns {string[]} an array of ip address strings
|
||||
*/
|
||||
function getNetworkAddrs (family) {
|
||||
return Object.values(os.networkInterfaces()).reduce((addresses, netAddrs) => {
|
||||
netAddrs.forEach(netAddr => {
|
||||
// Add the ip of each matching network interface
|
||||
if (netAddr.family === family) addresses.push(netAddr.address)
|
||||
})
|
||||
return addresses
|
||||
}, [])
|
||||
}
|
||||
|
||||
function trackConn (server, maConn) {
|
||||
server.__connections.push(maConn)
|
||||
|
||||
const untrackConn = () => {
|
||||
server.__connections = server.__connections.filter(c => c !== maConn)
|
||||
}
|
||||
|
||||
maConn.conn.once('close', untrackConn)
|
||||
}
|
||||
|
83
src/socket-to-conn.js
Normal file
83
src/socket-to-conn.js
Normal file
@ -0,0 +1,83 @@
|
||||
'use strict'
|
||||
|
||||
const abortable = require('abortable-iterator')
|
||||
const log = require('debug')('libp2p:tcp:socket')
|
||||
const toIterable = require('stream-to-it')
|
||||
const toMultiaddr = require('./ip-port-to-multiaddr')
|
||||
const { CLOSE_TIMEOUT } = require('./constants')
|
||||
|
||||
// Convert a socket into a MultiaddrConnection
|
||||
// https://github.com/libp2p/interface-transport#multiaddrconnection
|
||||
module.exports = (socket, options) => {
|
||||
options = options || {}
|
||||
|
||||
const { sink, source } = toIterable.duplex(socket)
|
||||
const maConn = {
|
||||
async sink (source) {
|
||||
if (options.signal) {
|
||||
source = abortable(source, options.signal)
|
||||
}
|
||||
|
||||
try {
|
||||
await sink((async function * () {
|
||||
for await (const chunk of source) {
|
||||
// Convert BufferList to Buffer
|
||||
yield Buffer.isBuffer(chunk) ? chunk : chunk.slice()
|
||||
}
|
||||
})())
|
||||
} catch (err) {
|
||||
// If aborted we can safely ignore
|
||||
if (err.type !== 'aborted') {
|
||||
// If the source errored the socket will already have been destroyed by
|
||||
// toIterable.duplex(). If the socket errored it will already be
|
||||
// destroyed. There's nothing to do here except log the error & return.
|
||||
log(err)
|
||||
}
|
||||
}
|
||||
},
|
||||
|
||||
source: options.signal ? abortable(source, options.signal) : source,
|
||||
|
||||
conn: socket,
|
||||
|
||||
localAddr: toMultiaddr(socket.localAddress, socket.localPort),
|
||||
|
||||
// If the remote address was passed, use it - it may have the peer ID encapsulated
|
||||
remoteAddr: options.remoteAddr || toMultiaddr(socket.remoteAddress, socket.remotePort),
|
||||
|
||||
timeline: { open: Date.now() },
|
||||
|
||||
close () {
|
||||
if (socket.destroyed) return
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
const start = Date.now()
|
||||
|
||||
// Attempt to end the socket. If it takes longer to close than the
|
||||
// timeout, destroy it manually.
|
||||
const timeout = setTimeout(() => {
|
||||
const { host, port } = maConn.remoteAddr.toOptions()
|
||||
log('timeout closing socket to %s:%s after %dms, destroying it manually',
|
||||
host, port, Date.now() - start)
|
||||
|
||||
if (socket.destroyed) {
|
||||
log('%s:%s is already destroyed', host, port)
|
||||
} else {
|
||||
socket.destroy()
|
||||
}
|
||||
|
||||
resolve()
|
||||
}, CLOSE_TIMEOUT)
|
||||
|
||||
socket.once('close', () => clearTimeout(timeout))
|
||||
socket.end(err => {
|
||||
maConn.timeline.close = Date.now()
|
||||
if (err) return reject(err)
|
||||
resolve()
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
return maConn
|
||||
}
|
Reference in New Issue
Block a user