mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-05-12 10:57:13 +00:00
listen_on now returns a new Multiaddr on success
This commit is contained in:
parent
2dafcdf896
commit
fd7b86ddcb
@ -65,7 +65,7 @@ fn main() {
|
|||||||
|
|
||||||
let future = with_echo.listen_on(swarm::multiaddr::Multiaddr::new("/ip4/0.0.0.0/tcp/10333").unwrap())
|
let future = with_echo.listen_on(swarm::multiaddr::Multiaddr::new("/ip4/0.0.0.0/tcp/10333").unwrap())
|
||||||
.map_err(|_| panic!())
|
.map_err(|_| panic!())
|
||||||
.unwrap()
|
.unwrap().0
|
||||||
.for_each(|socket| {
|
.for_each(|socket| {
|
||||||
loop_fn(socket, |socket| {
|
loop_fn(socket, |socket| {
|
||||||
socket.into_future()
|
socket.into_future()
|
||||||
|
@ -58,10 +58,16 @@ pub trait Transport {
|
|||||||
/// A future which indicates that we are currently dialing to a peer.
|
/// A future which indicates that we are currently dialing to a peer.
|
||||||
type Dial: IntoFuture<Item = Self::RawConn, Error = IoError>;
|
type Dial: IntoFuture<Item = Self::RawConn, Error = IoError>;
|
||||||
|
|
||||||
/// Listen on the given multi-addr.
|
/// Listen on the given multiaddr. Returns a stream of incoming connections, plus a modified
|
||||||
|
/// version of the `Multiaddr`. This new `Multiaddr` is the one that that should be advertised
|
||||||
|
/// to other nodes, instead of the one passed as parameter.
|
||||||
///
|
///
|
||||||
/// Returns the address back if it isn't supported.
|
/// Returns the address back if it isn't supported.
|
||||||
fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, (Self, Multiaddr)>
|
///
|
||||||
|
/// > **Note**: The reason why we need to change the `Multiaddr` on success is to handle
|
||||||
|
/// > situations such as turning `/ip4/127.0.0.1/tcp/0` into
|
||||||
|
/// > `/ip4/127.0.0.1/tcp/<actual port>`.
|
||||||
|
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)>
|
||||||
where Self: Sized;
|
where Self: Sized;
|
||||||
|
|
||||||
/// Dial to the given multi-addr.
|
/// Dial to the given multi-addr.
|
||||||
@ -108,7 +114,7 @@ impl Transport for DeniedTransport {
|
|||||||
type Dial = Box<Future<Item = Self::RawConn, Error = IoError>>;
|
type Dial = Box<Future<Item = Self::RawConn, Error = IoError>>;
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, (Self, Multiaddr)> {
|
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
|
||||||
Err((DeniedTransport, addr))
|
Err((DeniedTransport, addr))
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -133,14 +139,14 @@ impl<A, B> Transport for OrTransport<A, B>
|
|||||||
<B::Dial as IntoFuture>::Future,
|
<B::Dial as IntoFuture>::Future,
|
||||||
>;
|
>;
|
||||||
|
|
||||||
fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, (Self, Multiaddr)> {
|
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
|
||||||
let (first, addr) = match self.0.listen_on(addr) {
|
let (first, addr) = match self.0.listen_on(addr) {
|
||||||
Ok(connec) => return Ok(EitherStream::First(connec)),
|
Ok((connec, addr)) => return Ok((EitherStream::First(connec), addr)),
|
||||||
Err(err) => err,
|
Err(err) => err,
|
||||||
};
|
};
|
||||||
|
|
||||||
match self.1.listen_on(addr) {
|
match self.1.listen_on(addr) {
|
||||||
Ok(connec) => Ok(EitherStream::Second(connec)),
|
Ok((connec, addr)) => Ok((EitherStream::Second(connec), addr)),
|
||||||
Err((second, addr)) => Err((OrTransport(first, second), addr)),
|
Err((second, addr)) => Err((OrTransport(first, second), addr)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -543,14 +549,14 @@ impl<'a, T, C> UpgradedNode<T, C>
|
|||||||
pub fn listen_on(
|
pub fn listen_on(
|
||||||
self,
|
self,
|
||||||
addr: Multiaddr,
|
addr: Multiaddr,
|
||||||
) -> Result<Box<Stream<Item = C::Output, Error = IoError> + 'a>, (Self, Multiaddr)>
|
) -> Result<(Box<Stream<Item = C::Output, Error = IoError> + 'a>, Multiaddr), (Self, Multiaddr)>
|
||||||
where C::NamesIter: Clone, // TODO: not elegant
|
where C::NamesIter: Clone, // TODO: not elegant
|
||||||
C: Clone
|
C: Clone
|
||||||
{
|
{
|
||||||
let upgrade = self.upgrade;
|
let upgrade = self.upgrade;
|
||||||
|
|
||||||
let listening_stream = match self.transports.listen_on(addr) {
|
let (listening_stream, new_addr) = match self.transports.listen_on(addr) {
|
||||||
Ok(l) => l,
|
Ok((l, new_addr)) => (l, new_addr),
|
||||||
Err((trans, addr)) => {
|
Err((trans, addr)) => {
|
||||||
let builder = UpgradedNode {
|
let builder = UpgradedNode {
|
||||||
transports: trans,
|
transports: trans,
|
||||||
@ -580,7 +586,7 @@ impl<'a, T, C> UpgradedNode<T, C>
|
|||||||
upgrade.upgrade(connection, upgrade_id)
|
upgrade.upgrade(connection, upgrade_id)
|
||||||
});
|
});
|
||||||
|
|
||||||
Ok(Box::new(stream))
|
Ok((Box::new(stream), new_addr))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -596,7 +602,7 @@ impl<T, C> Transport for UpgradedNode<T, C>
|
|||||||
type Dial = Box<Future<Item = C::Output, Error = IoError>>;
|
type Dial = Box<Future<Item = C::Output, Error = IoError>>;
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, (Self, Multiaddr)>
|
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)>
|
||||||
where Self: Sized
|
where Self: Sized
|
||||||
{
|
{
|
||||||
self.listen_on(addr)
|
self.listen_on(addr)
|
||||||
|
@ -62,17 +62,26 @@ impl Transport for Tcp {
|
|||||||
|
|
||||||
/// Listen on the given multi-addr.
|
/// Listen on the given multi-addr.
|
||||||
/// Returns the address back if it isn't supported.
|
/// Returns the address back if it isn't supported.
|
||||||
fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, (Self, Multiaddr)> {
|
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
|
||||||
if let Ok(socket_addr) = multiaddr_to_socketaddr(&addr) {
|
if let Ok(socket_addr) = multiaddr_to_socketaddr(&addr) {
|
||||||
Ok(Box::new(
|
let listener = TcpListener::bind(&socket_addr, &self.event_loop);
|
||||||
futures::future::result(
|
// We need to build the `Multiaddr` to return from this function. If an error happened,
|
||||||
TcpListener::bind(&socket_addr, &self.event_loop),
|
// just return the original multiaddr.
|
||||||
).map(|listener| {
|
let new_addr = match listener {
|
||||||
|
Ok(ref l) => if let Ok(new_s_addr) = l.local_addr() {
|
||||||
|
Multiaddr::new(&format!("/ip4/{}/tcp/{}", new_s_addr.ip(), new_s_addr.port()))
|
||||||
|
.expect("manually-generated multiaddr is always valid")
|
||||||
|
} else {
|
||||||
|
addr
|
||||||
|
}
|
||||||
|
Err(_) => addr,
|
||||||
|
};
|
||||||
|
let future = futures::future::result(listener).map(|listener| {
|
||||||
// Pull out a stream of sockets for incoming connections
|
// Pull out a stream of sockets for incoming connections
|
||||||
listener.incoming().map(|x| x.0)
|
listener.incoming().map(|x| x.0)
|
||||||
})
|
})
|
||||||
.flatten_stream(),
|
.flatten_stream();
|
||||||
))
|
Ok((Box::new(future), new_addr))
|
||||||
} else {
|
} else {
|
||||||
Err((self, addr))
|
Err((self, addr))
|
||||||
}
|
}
|
||||||
@ -91,7 +100,7 @@ impl Transport for Tcp {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// This type of logic should probably be moved into the multiaddr package
|
// This type of logic should probably be moved into the multiaddr package
|
||||||
fn multiaddr_to_socketaddr(addr: &Multiaddr) -> Result<SocketAddr, &Multiaddr> {
|
fn multiaddr_to_socketaddr(addr: &Multiaddr) -> Result<SocketAddr, ()> {
|
||||||
let protocols = addr.protocol();
|
let protocols = addr.protocol();
|
||||||
|
|
||||||
// TODO: This is nonconforming (since a multiaddr could specify TCP first) but we can't fix that
|
// TODO: This is nonconforming (since a multiaddr could specify TCP first) but we can't fix that
|
||||||
@ -114,9 +123,9 @@ fn multiaddr_to_socketaddr(addr: &Multiaddr) -> Result<SocketAddr, &Multiaddr> {
|
|||||||
));
|
));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(addr)
|
Err(())
|
||||||
}
|
}
|
||||||
_ => Err(addr),
|
_ => Err(()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -188,7 +197,7 @@ mod tests {
|
|||||||
let addr = Multiaddr::new("/ip4/127.0.0.1/tcp/12345").unwrap();
|
let addr = Multiaddr::new("/ip4/127.0.0.1/tcp/12345").unwrap();
|
||||||
let tcp = Tcp::new(core.handle()).unwrap();
|
let tcp = Tcp::new(core.handle()).unwrap();
|
||||||
let handle = core.handle();
|
let handle = core.handle();
|
||||||
let listener = tcp.listen_on(addr).unwrap().for_each(|sock| {
|
let listener = tcp.listen_on(addr).unwrap().0.for_each(|sock| {
|
||||||
// Define what to do with the socket that just connected to us
|
// Define what to do with the socket that just connected to us
|
||||||
// Which in this case is read 3 bytes
|
// Which in this case is read 3 bytes
|
||||||
let handle_conn = tokio_io::io::read_exact(sock, [0; 3])
|
let handle_conn = tokio_io::io::read_exact(sock, [0; 3])
|
||||||
@ -221,4 +230,16 @@ mod tests {
|
|||||||
core.run(action).unwrap();
|
core.run(action).unwrap();
|
||||||
std::thread::sleep(std::time::Duration::from_millis(100));
|
std::thread::sleep(std::time::Duration::from_millis(100));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn replace_port_0_in_returned_multiaddr() {
|
||||||
|
let core = Core::new().unwrap();
|
||||||
|
let tcp = Tcp::new(core.handle()).unwrap();
|
||||||
|
|
||||||
|
let addr = Multiaddr::new("/ip4/127.0.0.1/tcp/0").unwrap();
|
||||||
|
assert!(addr.to_string().contains("tcp/0"));
|
||||||
|
|
||||||
|
let (_, new_addr) = tcp.listen_on(addr).unwrap();
|
||||||
|
assert!(!new_addr.to_string().contains("tcp/0"));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user