mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-23 14:51:34 +00:00
Replace listen_addr
with local_addr
. (#1223)
* Replace `listen_addr` with `local_addr`. In `ListenerUpgrade`, `ConnectedPoint` and other event types where we were previously using the listen address we now report the local address of an incoming connection. The reason being that it is difficult to get the listen address right. In case clients want to know, which listener produced an incoming connection upgrade they are advised to use the `ListenerId` for such purposes. * Update transports/tcp/src/lib.rs Co-Authored-By: Max Inden <mail@max-inden.de>
This commit is contained in:
committed by
Pierre Krieger
parent
56c14071d8
commit
9793c42859
@ -110,8 +110,8 @@ pub enum ConnectedPoint {
|
|||||||
},
|
},
|
||||||
/// We received the node.
|
/// We received the node.
|
||||||
Listener {
|
Listener {
|
||||||
/// Address of the listener that received the connection.
|
/// Local connection address.
|
||||||
listen_addr: Multiaddr,
|
local_addr: Multiaddr,
|
||||||
/// Stack of protocols used to send back data to the remote.
|
/// Stack of protocols used to send back data to the remote.
|
||||||
send_back_addr: Multiaddr,
|
send_back_addr: Multiaddr,
|
||||||
}
|
}
|
||||||
|
@ -22,7 +22,7 @@
|
|||||||
|
|
||||||
use crate::{Multiaddr, Transport, transport::{TransportError, ListenerEvent}};
|
use crate::{Multiaddr, Transport, transport::{TransportError, ListenerEvent}};
|
||||||
use futures::prelude::*;
|
use futures::prelude::*;
|
||||||
use log::{debug, warn};
|
use log::debug;
|
||||||
use smallvec::SmallVec;
|
use smallvec::SmallVec;
|
||||||
use std::{collections::VecDeque, fmt};
|
use std::{collections::VecDeque, fmt};
|
||||||
use void::Void;
|
use void::Void;
|
||||||
@ -66,8 +66,8 @@ use void::Void;
|
|||||||
/// ListenersEvent::Error { listener_id, error } => {
|
/// ListenersEvent::Error { listener_id, error } => {
|
||||||
/// println!("Listener {:?} has experienced an error: {}", listener_id, error);
|
/// println!("Listener {:?} has experienced an error: {}", listener_id, error);
|
||||||
/// },
|
/// },
|
||||||
/// ListenersEvent::Incoming { listener_id, upgrade, listen_addr, .. } => {
|
/// ListenersEvent::Incoming { listener_id, upgrade, local_addr, .. } => {
|
||||||
/// println!("Listener {:?} has a new connection on {}", listener_id, listen_addr);
|
/// println!("Listener {:?} has a new connection on {}", listener_id, local_addr);
|
||||||
/// // We don't do anything with the newly-opened connection, but in a real-life
|
/// // We don't do anything with the newly-opened connection, but in a real-life
|
||||||
/// // program you probably want to use it!
|
/// // program you probably want to use it!
|
||||||
/// drop(upgrade);
|
/// drop(upgrade);
|
||||||
@ -138,8 +138,8 @@ where
|
|||||||
listener_id: ListenerId,
|
listener_id: ListenerId,
|
||||||
/// The produced upgrade.
|
/// The produced upgrade.
|
||||||
upgrade: TTrans::ListenerUpgrade,
|
upgrade: TTrans::ListenerUpgrade,
|
||||||
/// Address of the listener which received the connection.
|
/// Local connection address.
|
||||||
listen_addr: Multiaddr,
|
local_addr: Multiaddr,
|
||||||
/// Address used to send back data to the incoming client.
|
/// Address used to send back data to the incoming client.
|
||||||
send_back_addr: Multiaddr,
|
send_back_addr: Multiaddr,
|
||||||
},
|
},
|
||||||
@ -233,18 +233,13 @@ where
|
|||||||
remaining -= 1;
|
remaining -= 1;
|
||||||
if remaining == 0 { break }
|
if remaining == 0 { break }
|
||||||
}
|
}
|
||||||
Ok(Async::Ready(Some(ListenerEvent::Upgrade { upgrade, listen_addr, remote_addr }))) => {
|
Ok(Async::Ready(Some(ListenerEvent::Upgrade { upgrade, local_addr, remote_addr }))) => {
|
||||||
if !listener.addresses.contains(&listen_addr) {
|
|
||||||
warn!("Transport reported listen address {} not in the list: {:?}",
|
|
||||||
listen_addr,
|
|
||||||
listener.addresses)
|
|
||||||
}
|
|
||||||
let id = listener.id;
|
let id = listener.id;
|
||||||
self.listeners.push_front(listener);
|
self.listeners.push_front(listener);
|
||||||
return Async::Ready(ListenersEvent::Incoming {
|
return Async::Ready(ListenersEvent::Incoming {
|
||||||
listener_id: id,
|
listener_id: id,
|
||||||
upgrade,
|
upgrade,
|
||||||
listen_addr,
|
local_addr,
|
||||||
send_back_addr: remote_addr
|
send_back_addr: remote_addr
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -332,10 +327,10 @@ where
|
|||||||
.field("listener_id", listener_id)
|
.field("listener_id", listener_id)
|
||||||
.field("listen_addr", listen_addr)
|
.field("listen_addr", listen_addr)
|
||||||
.finish(),
|
.finish(),
|
||||||
ListenersEvent::Incoming { listener_id, listen_addr, .. } => f
|
ListenersEvent::Incoming { listener_id, local_addr, .. } => f
|
||||||
.debug_struct("ListenersEvent::Incoming")
|
.debug_struct("ListenersEvent::Incoming")
|
||||||
.field("listener_id", listener_id)
|
.field("listener_id", listener_id)
|
||||||
.field("listen_addr", listen_addr)
|
.field("local_addr", local_addr)
|
||||||
.finish(),
|
.finish(),
|
||||||
ListenersEvent::Closed { listener_id, .. } => f
|
ListenersEvent::Closed { listener_id, .. } => f
|
||||||
.debug_struct("ListenersEvent::Closed")
|
.debug_struct("ListenersEvent::Closed")
|
||||||
@ -401,8 +396,8 @@ mod tests {
|
|||||||
.map_err(|(err, _)| err)
|
.map_err(|(err, _)| err)
|
||||||
.and_then(|(event, _)| {
|
.and_then(|(event, _)| {
|
||||||
match event {
|
match event {
|
||||||
Some(ListenersEvent::Incoming { listen_addr, upgrade, send_back_addr, .. }) => {
|
Some(ListenersEvent::Incoming { local_addr, upgrade, send_back_addr, .. }) => {
|
||||||
assert_eq!(listen_addr, address);
|
assert_eq!(local_addr, address);
|
||||||
assert_eq!(send_back_addr, address);
|
assert_eq!(send_back_addr, address);
|
||||||
upgrade.map(|_| ()).map_err(|_| panic!())
|
upgrade.map(|_| ()).map_err(|_| panic!())
|
||||||
},
|
},
|
||||||
@ -475,17 +470,17 @@ mod tests {
|
|||||||
ListenerEvent::NewAddress(tcp4([127, 0, 0, 1], 9090)),
|
ListenerEvent::NewAddress(tcp4([127, 0, 0, 1], 9090)),
|
||||||
ListenerEvent::Upgrade {
|
ListenerEvent::Upgrade {
|
||||||
upgrade: (peer_id.clone(), muxer.clone()),
|
upgrade: (peer_id.clone(), muxer.clone()),
|
||||||
listen_addr: tcp4([127, 0, 0, 1], 9090),
|
local_addr: tcp4([127, 0, 0, 1], 9090),
|
||||||
remote_addr: tcp4([127, 0, 0, 1], 32000)
|
remote_addr: tcp4([127, 0, 0, 1], 32000)
|
||||||
},
|
},
|
||||||
ListenerEvent::Upgrade {
|
ListenerEvent::Upgrade {
|
||||||
upgrade: (peer_id.clone(), muxer.clone()),
|
upgrade: (peer_id.clone(), muxer.clone()),
|
||||||
listen_addr: tcp4([127, 0, 0, 1], 9090),
|
local_addr: tcp4([127, 0, 0, 1], 9090),
|
||||||
remote_addr: tcp4([127, 0, 0, 1], 32000)
|
remote_addr: tcp4([127, 0, 0, 1], 32000)
|
||||||
},
|
},
|
||||||
ListenerEvent::Upgrade {
|
ListenerEvent::Upgrade {
|
||||||
upgrade: (peer_id.clone(), muxer.clone()),
|
upgrade: (peer_id.clone(), muxer.clone()),
|
||||||
listen_addr: tcp4([127, 0, 0, 1], 9090),
|
local_addr: tcp4([127, 0, 0, 1], 9090),
|
||||||
remote_addr: tcp4([127, 0, 0, 1], 32000)
|
remote_addr: tcp4([127, 0, 0, 1], 32000)
|
||||||
}
|
}
|
||||||
]));
|
]));
|
||||||
@ -550,7 +545,7 @@ mod tests {
|
|||||||
let muxer = DummyMuxer::new();
|
let muxer = DummyMuxer::new();
|
||||||
let event = ListenerEvent::Upgrade {
|
let event = ListenerEvent::Upgrade {
|
||||||
upgrade: (peer_id, muxer),
|
upgrade: (peer_id, muxer),
|
||||||
listen_addr: tcp4([127, 0, 0, 1], 1234),
|
local_addr: tcp4([127, 0, 0, 1], 1234),
|
||||||
remote_addr: tcp4([127, 0, 0, 1], 32000)
|
remote_addr: tcp4([127, 0, 0, 1], 32000)
|
||||||
};
|
};
|
||||||
t.set_initial_listener_state(ListenerState::Ok(Async::Ready(Some(event))));
|
t.set_initial_listener_state(ListenerState::Ok(Async::Ready(Some(event))));
|
||||||
|
@ -200,8 +200,8 @@ where
|
|||||||
/// This can include, for example, an error during the handshake of the encryption layer, or
|
/// This can include, for example, an error during the handshake of the encryption layer, or
|
||||||
/// the connection unexpectedly closed.
|
/// the connection unexpectedly closed.
|
||||||
IncomingConnectionError {
|
IncomingConnectionError {
|
||||||
/// The address of the listener which received the connection.
|
/// Local connection address.
|
||||||
listen_addr: Multiaddr,
|
local_addr: Multiaddr,
|
||||||
/// Address used to send back data to the remote.
|
/// Address used to send back data to the remote.
|
||||||
send_back_addr: Multiaddr,
|
send_back_addr: Multiaddr,
|
||||||
/// The error that happened.
|
/// The error that happened.
|
||||||
@ -315,13 +315,13 @@ where
|
|||||||
}
|
}
|
||||||
NetworkEvent::IncomingConnection(event) => {
|
NetworkEvent::IncomingConnection(event) => {
|
||||||
f.debug_struct("IncomingConnection")
|
f.debug_struct("IncomingConnection")
|
||||||
.field("listen_addr", &event.listen_addr)
|
.field("local_addr", &event.local_addr)
|
||||||
.field("send_back_addr", &event.send_back_addr)
|
.field("send_back_addr", &event.send_back_addr)
|
||||||
.finish()
|
.finish()
|
||||||
}
|
}
|
||||||
NetworkEvent::IncomingConnectionError { listen_addr, send_back_addr, error } => {
|
NetworkEvent::IncomingConnectionError { local_addr, send_back_addr, error } => {
|
||||||
f.debug_struct("IncomingConnectionError")
|
f.debug_struct("IncomingConnectionError")
|
||||||
.field("listen_addr", listen_addr)
|
.field("local_addr", local_addr)
|
||||||
.field("send_back_addr", send_back_addr)
|
.field("send_back_addr", send_back_addr)
|
||||||
.field("error", error)
|
.field("error", error)
|
||||||
.finish()
|
.finish()
|
||||||
@ -558,8 +558,8 @@ where TTrans: Transport
|
|||||||
upgrade: TTrans::ListenerUpgrade,
|
upgrade: TTrans::ListenerUpgrade,
|
||||||
/// PeerId of the local node.
|
/// PeerId of the local node.
|
||||||
local_peer_id: TPeerId,
|
local_peer_id: TPeerId,
|
||||||
/// Addresses of the listener which received the connection.
|
/// Local connection address.
|
||||||
listen_addr: Multiaddr,
|
local_addr: Multiaddr,
|
||||||
/// Address used to send back data to the remote.
|
/// Address used to send back data to the remote.
|
||||||
send_back_addr: Multiaddr,
|
send_back_addr: Multiaddr,
|
||||||
/// Reference to the `active_nodes` field of the `Network`.
|
/// Reference to the `active_nodes` field of the `Network`.
|
||||||
@ -630,14 +630,14 @@ where TTrans: Transport
|
|||||||
/// Returns the `IncomingInfo` corresponding to this incoming connection.
|
/// Returns the `IncomingInfo` corresponding to this incoming connection.
|
||||||
pub fn info(&self) -> IncomingInfo<'_> {
|
pub fn info(&self) -> IncomingInfo<'_> {
|
||||||
IncomingInfo {
|
IncomingInfo {
|
||||||
listen_addr: &self.listen_addr,
|
local_addr: &self.local_addr,
|
||||||
send_back_addr: &self.send_back_addr,
|
send_back_addr: &self.send_back_addr,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Address of the listener that received the connection.
|
/// Local connection address.
|
||||||
pub fn listen_addr(&self) -> &Multiaddr {
|
pub fn local_addr(&self) -> &Multiaddr {
|
||||||
&self.listen_addr
|
&self.local_addr
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Address used to send back data to the dialer.
|
/// Address used to send back data to the dialer.
|
||||||
@ -654,8 +654,8 @@ where TTrans: Transport
|
|||||||
/// Information about an incoming connection currently being negotiated.
|
/// Information about an incoming connection currently being negotiated.
|
||||||
#[derive(Debug, Copy, Clone)]
|
#[derive(Debug, Copy, Clone)]
|
||||||
pub struct IncomingInfo<'a> {
|
pub struct IncomingInfo<'a> {
|
||||||
/// Listener address that received the connection.
|
/// Local connection address.
|
||||||
pub listen_addr: &'a Multiaddr,
|
pub local_addr: &'a Multiaddr,
|
||||||
/// Stack of protocols used to send back data to the remote.
|
/// Stack of protocols used to send back data to the remote.
|
||||||
pub send_back_addr: &'a Multiaddr,
|
pub send_back_addr: &'a Multiaddr,
|
||||||
}
|
}
|
||||||
@ -664,7 +664,7 @@ impl<'a> IncomingInfo<'a> {
|
|||||||
/// Builds the `ConnectedPoint` corresponding to the incoming connection.
|
/// Builds the `ConnectedPoint` corresponding to the incoming connection.
|
||||||
pub fn to_connected_point(&self) -> ConnectedPoint {
|
pub fn to_connected_point(&self) -> ConnectedPoint {
|
||||||
ConnectedPoint::Listener {
|
ConnectedPoint::Listener {
|
||||||
listen_addr: self.listen_addr.clone(),
|
local_addr: self.local_addr.clone(),
|
||||||
send_back_addr: self.send_back_addr.clone(),
|
send_back_addr: self.send_back_addr.clone(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -831,8 +831,8 @@ where
|
|||||||
.iter()
|
.iter()
|
||||||
.filter_map(|&(_, ref endpoint)| {
|
.filter_map(|&(_, ref endpoint)| {
|
||||||
match endpoint {
|
match endpoint {
|
||||||
ConnectedPoint::Listener { listen_addr, send_back_addr } => {
|
ConnectedPoint::Listener { local_addr, send_back_addr } => {
|
||||||
Some(IncomingInfo { listen_addr, send_back_addr })
|
Some(IncomingInfo { local_addr, send_back_addr })
|
||||||
},
|
},
|
||||||
ConnectedPoint::Dialer { .. } => None,
|
ConnectedPoint::Dialer { .. } => None,
|
||||||
}
|
}
|
||||||
@ -1005,12 +1005,12 @@ where
|
|||||||
_ => {
|
_ => {
|
||||||
match self.listeners.poll() {
|
match self.listeners.poll() {
|
||||||
Async::NotReady => (),
|
Async::NotReady => (),
|
||||||
Async::Ready(ListenersEvent::Incoming { listener_id, upgrade, listen_addr, send_back_addr }) => {
|
Async::Ready(ListenersEvent::Incoming { listener_id, upgrade, local_addr, send_back_addr }) => {
|
||||||
let event = IncomingConnectionEvent {
|
let event = IncomingConnectionEvent {
|
||||||
listener_id,
|
listener_id,
|
||||||
upgrade,
|
upgrade,
|
||||||
local_peer_id: self.reach_attempts.local_peer_id.clone(),
|
local_peer_id: self.reach_attempts.local_peer_id.clone(),
|
||||||
listen_addr,
|
local_addr,
|
||||||
send_back_addr,
|
send_back_addr,
|
||||||
active_nodes: &mut self.active_nodes,
|
active_nodes: &mut self.active_nodes,
|
||||||
other_reach_attempts: &mut self.reach_attempts.other_reach_attempts,
|
other_reach_attempts: &mut self.reach_attempts.other_reach_attempts,
|
||||||
@ -1171,9 +1171,9 @@ where
|
|||||||
// If we have a lower peer ID than the incoming one, we drop an incoming connection.
|
// If we have a lower peer ID than the incoming one, we drop an incoming connection.
|
||||||
if event.would_replace() && has_dial_prio {
|
if event.would_replace() && has_dial_prio {
|
||||||
if let Some(ConnectedPoint::Dialer { .. }) = reach_attempts.connected_points.get(event.peer_id()) {
|
if let Some(ConnectedPoint::Dialer { .. }) = reach_attempts.connected_points.get(event.peer_id()) {
|
||||||
if let ConnectedPoint::Listener { listen_addr, send_back_addr } = opened_endpoint {
|
if let ConnectedPoint::Listener { local_addr, send_back_addr } = opened_endpoint {
|
||||||
return (Default::default(), NetworkEvent::IncomingConnectionError {
|
return (Default::default(), NetworkEvent::IncomingConnectionError {
|
||||||
listen_addr,
|
local_addr,
|
||||||
send_back_addr,
|
send_back_addr,
|
||||||
error: IncomingError::DeniedLowerPriority,
|
error: IncomingError::DeniedLowerPriority,
|
||||||
});
|
});
|
||||||
@ -1385,7 +1385,7 @@ where
|
|||||||
handler,
|
handler,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
ConnectedPoint::Listener { listen_addr, send_back_addr } => {
|
ConnectedPoint::Listener { local_addr, send_back_addr } => {
|
||||||
let error = match error {
|
let error = match error {
|
||||||
InternalReachErr::Transport(err) => IncomingError::Transport(err),
|
InternalReachErr::Transport(err) => IncomingError::Transport(err),
|
||||||
InternalReachErr::FoundLocalPeerId => IncomingError::FoundLocalPeerId,
|
InternalReachErr::FoundLocalPeerId => IncomingError::FoundLocalPeerId,
|
||||||
@ -1395,7 +1395,7 @@ where
|
|||||||
},
|
},
|
||||||
};
|
};
|
||||||
return (Default::default(), NetworkEvent::IncomingConnectionError {
|
return (Default::default(), NetworkEvent::IncomingConnectionError {
|
||||||
listen_addr,
|
local_addr,
|
||||||
send_back_addr,
|
send_back_addr,
|
||||||
error
|
error
|
||||||
});
|
});
|
||||||
|
@ -88,7 +88,7 @@ fn num_incoming_negotiated() {
|
|||||||
ListenerEvent::NewAddress("/ip4/127.0.0.1/tcp/1234".parse().unwrap()),
|
ListenerEvent::NewAddress("/ip4/127.0.0.1/tcp/1234".parse().unwrap()),
|
||||||
ListenerEvent::Upgrade {
|
ListenerEvent::Upgrade {
|
||||||
upgrade: (peer_id.clone(), muxer.clone()),
|
upgrade: (peer_id.clone(), muxer.clone()),
|
||||||
listen_addr: "/ip4/127.0.0.1/tcp/1234".parse().unwrap(),
|
local_addr: "/ip4/127.0.0.1/tcp/1234".parse().unwrap(),
|
||||||
remote_addr: "/ip4/127.0.0.1/tcp/32111".parse().unwrap()
|
remote_addr: "/ip4/127.0.0.1/tcp/32111".parse().unwrap()
|
||||||
}
|
}
|
||||||
];
|
];
|
||||||
@ -399,7 +399,7 @@ fn limit_incoming_connections() {
|
|||||||
events.extend(std::iter::repeat(
|
events.extend(std::iter::repeat(
|
||||||
ListenerEvent::Upgrade {
|
ListenerEvent::Upgrade {
|
||||||
upgrade: (peer_id.clone(), muxer.clone()),
|
upgrade: (peer_id.clone(), muxer.clone()),
|
||||||
listen_addr: "/ip4/127.0.0.1/tcp/1234".parse().unwrap(),
|
local_addr: "/ip4/127.0.0.1/tcp/1234".parse().unwrap(),
|
||||||
remote_addr: "/ip4/127.0.0.1/tcp/32111".parse().unwrap()
|
remote_addr: "/ip4/127.0.0.1/tcp/32111".parse().unwrap()
|
||||||
}
|
}
|
||||||
).take(10));
|
).take(10));
|
||||||
|
@ -93,9 +93,9 @@ where
|
|||||||
match self.stream.poll().map_err(EitherError::A)? {
|
match self.stream.poll().map_err(EitherError::A)? {
|
||||||
Async::Ready(Some(event)) => {
|
Async::Ready(Some(event)) => {
|
||||||
let event = match event {
|
let event = match event {
|
||||||
ListenerEvent::Upgrade { upgrade, listen_addr, remote_addr } => {
|
ListenerEvent::Upgrade { upgrade, local_addr, remote_addr } => {
|
||||||
let point = ConnectedPoint::Listener {
|
let point = ConnectedPoint::Listener {
|
||||||
listen_addr: listen_addr.clone(),
|
local_addr: local_addr.clone(),
|
||||||
send_back_addr: remote_addr.clone()
|
send_back_addr: remote_addr.clone()
|
||||||
};
|
};
|
||||||
ListenerEvent::Upgrade {
|
ListenerEvent::Upgrade {
|
||||||
@ -103,7 +103,7 @@ where
|
|||||||
inner: Either::A(upgrade),
|
inner: Either::A(upgrade),
|
||||||
args: Some((self.fun.clone(), point))
|
args: Some((self.fun.clone(), point))
|
||||||
},
|
},
|
||||||
listen_addr,
|
local_addr,
|
||||||
remote_addr
|
remote_addr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -77,9 +77,9 @@ where
|
|||||||
match self.stream.poll()? {
|
match self.stream.poll()? {
|
||||||
Async::Ready(Some(event)) => {
|
Async::Ready(Some(event)) => {
|
||||||
let event = match event {
|
let event = match event {
|
||||||
ListenerEvent::Upgrade { upgrade, listen_addr, remote_addr } => {
|
ListenerEvent::Upgrade { upgrade, local_addr, remote_addr } => {
|
||||||
let point = ConnectedPoint::Listener {
|
let point = ConnectedPoint::Listener {
|
||||||
listen_addr: listen_addr.clone(),
|
local_addr: local_addr.clone(),
|
||||||
send_back_addr: remote_addr.clone()
|
send_back_addr: remote_addr.clone()
|
||||||
};
|
};
|
||||||
ListenerEvent::Upgrade {
|
ListenerEvent::Upgrade {
|
||||||
@ -87,7 +87,7 @@ where
|
|||||||
inner: upgrade,
|
inner: upgrade,
|
||||||
args: Some((self.fun.clone(), point))
|
args: Some((self.fun.clone(), point))
|
||||||
},
|
},
|
||||||
listen_addr,
|
local_addr,
|
||||||
remote_addr
|
remote_addr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -192,7 +192,7 @@ impl Stream for Listener {
|
|||||||
};
|
};
|
||||||
let event = ListenerEvent::Upgrade {
|
let event = ListenerEvent::Upgrade {
|
||||||
upgrade: future::ok(channel),
|
upgrade: future::ok(channel),
|
||||||
listen_addr: self.addr.clone(),
|
local_addr: self.addr.clone(),
|
||||||
remote_addr: Protocol::Memory(self.port.get()).into()
|
remote_addr: Protocol::Memory(self.port.get()).into()
|
||||||
};
|
};
|
||||||
Ok(Async::Ready(Some(event)))
|
Ok(Async::Ready(Some(event)))
|
||||||
|
@ -241,8 +241,8 @@ pub enum ListenerEvent<T> {
|
|||||||
Upgrade {
|
Upgrade {
|
||||||
/// The upgrade.
|
/// The upgrade.
|
||||||
upgrade: T,
|
upgrade: T,
|
||||||
/// The listening address which produced this upgrade.
|
/// The local address which produced this upgrade.
|
||||||
listen_addr: Multiaddr,
|
local_addr: Multiaddr,
|
||||||
/// The remote address which produced this upgrade.
|
/// The remote address which produced this upgrade.
|
||||||
remote_addr: Multiaddr
|
remote_addr: Multiaddr
|
||||||
},
|
},
|
||||||
@ -256,8 +256,8 @@ impl<T> ListenerEvent<T> {
|
|||||||
/// based the the function's result.
|
/// based the the function's result.
|
||||||
pub fn map<U>(self, f: impl FnOnce(T) -> U) -> ListenerEvent<U> {
|
pub fn map<U>(self, f: impl FnOnce(T) -> U) -> ListenerEvent<U> {
|
||||||
match self {
|
match self {
|
||||||
ListenerEvent::Upgrade { upgrade, listen_addr, remote_addr } => {
|
ListenerEvent::Upgrade { upgrade, local_addr, remote_addr } => {
|
||||||
ListenerEvent::Upgrade { upgrade: f(upgrade), listen_addr, remote_addr }
|
ListenerEvent::Upgrade { upgrade: f(upgrade), local_addr, remote_addr }
|
||||||
}
|
}
|
||||||
ListenerEvent::NewAddress(a) => ListenerEvent::NewAddress(a),
|
ListenerEvent::NewAddress(a) => ListenerEvent::NewAddress(a),
|
||||||
ListenerEvent::AddressExpired(a) => ListenerEvent::AddressExpired(a)
|
ListenerEvent::AddressExpired(a) => ListenerEvent::AddressExpired(a)
|
||||||
|
@ -237,11 +237,11 @@ fn dial_self() {
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
Async::Ready(NetworkEvent::IncomingConnectionError {
|
Async::Ready(NetworkEvent::IncomingConnectionError {
|
||||||
listen_addr,
|
local_addr,
|
||||||
send_back_addr: _,
|
send_back_addr: _,
|
||||||
error: IncomingError::FoundLocalPeerId
|
error: IncomingError::FoundLocalPeerId
|
||||||
}) => {
|
}) => {
|
||||||
assert_eq!(address, listen_addr);
|
assert_eq!(address, local_addr);
|
||||||
assert!(!got_inc_err);
|
assert!(!got_inc_err);
|
||||||
got_inc_err = true;
|
got_inc_err = true;
|
||||||
if got_dial_err {
|
if got_dial_err {
|
||||||
@ -249,7 +249,7 @@ fn dial_self() {
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
Async::Ready(NetworkEvent::IncomingConnection(inc)) => {
|
Async::Ready(NetworkEvent::IncomingConnection(inc)) => {
|
||||||
assert_eq!(*inc.listen_addr(), address);
|
assert_eq!(*inc.local_addr(), address);
|
||||||
inc.accept(TestHandler::default().into_node_handler_builder());
|
inc.accept(TestHandler::default().into_node_handler_builder());
|
||||||
},
|
},
|
||||||
Async::Ready(ev) => {
|
Async::Ready(ev) => {
|
||||||
|
@ -415,60 +415,59 @@ pub struct TcpListenStream {
|
|||||||
config: TcpConfig
|
config: TcpConfig
|
||||||
}
|
}
|
||||||
|
|
||||||
// Map a `SocketAddr` to the corresponding `Multiaddr`.
|
// If we listen on all interfaces, find out to which interface the given
|
||||||
// If not found, check for host address changes.
|
// socket address belongs. In case we think the address is new, check
|
||||||
// This is a function rather than a method due to borrowing issues.
|
// all host interfaces again and report new and expired listen addresses.
|
||||||
fn map_addr(addr: &SocketAddr, addrs: &mut Addresses, pending: &mut Buffer, port: u16)
|
fn check_for_interface_changes(
|
||||||
-> Result<Multiaddr, io::Error>
|
socket_addr: &SocketAddr,
|
||||||
{
|
listen_port: u16,
|
||||||
match addrs {
|
listen_addrs: &mut Vec<(IpAddr, IpNet, Multiaddr)>,
|
||||||
Addresses::One(ref ma) => Ok(ma.clone()),
|
pending: &mut Buffer
|
||||||
Addresses::Many(ref mut addrs) => {
|
) -> Result<(), io::Error> {
|
||||||
// Check for exact match:
|
// Check for exact match:
|
||||||
if let Some((_, _, ma)) = addrs.iter().find(|(i, ..)| i == &addr.ip()) {
|
if listen_addrs.iter().find(|(ip, ..)| ip == &socket_addr.ip()).is_some() {
|
||||||
return Ok(ma.clone())
|
return Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
// No exact match => check netmask
|
// No exact match => check netmask
|
||||||
if let Some((_, _, ma)) = addrs.iter().find(|(_, i, _)| i.contains(&addr.ip())) {
|
if listen_addrs.iter().find(|(_, net, _)| net.contains(&socket_addr.ip())).is_some() {
|
||||||
return Ok(ma.clone())
|
return Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
// The local IP address of this socket is new to us.
|
// The local IP address of this socket is new to us.
|
||||||
// We need to check for changes in the set of host addresses and report new
|
// We check for changes in the set of host addresses and report new
|
||||||
// and expired addresses.
|
// and expired addresses.
|
||||||
//
|
//
|
||||||
// TODO: We do not detect expired addresses unless there is a new address.
|
// TODO: We do not detect expired addresses unless there is a new address.
|
||||||
|
let old_listen_addrs = std::mem::replace(listen_addrs, host_addresses(listen_port)?);
|
||||||
|
|
||||||
let new_addrs = host_addresses(port)?;
|
// Check for addresses no longer in use.
|
||||||
let old_addrs = std::mem::replace(addrs, new_addrs);
|
for (ip, _, ma) in old_listen_addrs.iter() {
|
||||||
|
if listen_addrs.iter().find(|(i, ..)| i == ip).is_none() {
|
||||||
// Check for addresses no longer in use.
|
debug!("Expired listen address: {}", ma);
|
||||||
for (ip, _, ma) in old_addrs.iter() {
|
pending.push_back(ListenerEvent::AddressExpired(ma.clone()));
|
||||||
if addrs.iter().find(|(i, ..)| i == ip).is_none() {
|
|
||||||
debug!("Expired listen address: {}", ma);
|
|
||||||
pending.push_back(ListenerEvent::AddressExpired(ma.clone()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check for new addresses.
|
|
||||||
for (ip, _, ma) in addrs.iter() {
|
|
||||||
if old_addrs.iter().find(|(i, ..)| i == ip).is_none() {
|
|
||||||
debug!("New listen address: {}", ma);
|
|
||||||
pending.push_back(ListenerEvent::NewAddress(ma.clone()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// We should now be able to find the listen address of the local socket address,
|
|
||||||
// if not something is seriously wrong and we report an error.
|
|
||||||
if addrs.iter().find(|(i, j, _)| i == &addr.ip() || j.contains(&addr.ip())).is_none() {
|
|
||||||
let msg = format!("{} does not match any listen address", addr.ip());
|
|
||||||
return Err(io::Error::new(io::ErrorKind::Other, msg))
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(ip_to_multiaddr(addr.ip(), port))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check for new addresses.
|
||||||
|
for (ip, _, ma) in listen_addrs.iter() {
|
||||||
|
if old_listen_addrs.iter().find(|(i, ..)| i == ip).is_none() {
|
||||||
|
debug!("New listen address: {}", ma);
|
||||||
|
pending.push_back(ListenerEvent::NewAddress(ma.clone()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// We should now be able to find the local address, if not something
|
||||||
|
// is seriously wrong and we report an error.
|
||||||
|
if listen_addrs.iter()
|
||||||
|
.find(|(ip, net, _)| ip == &socket_addr.ip() || net.contains(&socket_addr.ip()))
|
||||||
|
.is_none()
|
||||||
|
{
|
||||||
|
let msg = format!("{} does not match any listen address", socket_addr.ip());
|
||||||
|
return Err(io::Error::new(io::ErrorKind::Other, msg))
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Stream for TcpListenStream {
|
impl Stream for TcpListenStream {
|
||||||
@ -496,8 +495,13 @@ impl Stream for TcpListenStream {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let listen_addr = match sock.local_addr() {
|
let local_addr = match sock.local_addr() {
|
||||||
Ok(addr) => map_addr(&addr, &mut self.addrs, &mut self.pending, self.port)?,
|
Ok(sock_addr) => {
|
||||||
|
if let Addresses::Many(ref mut addrs) = self.addrs {
|
||||||
|
check_for_interface_changes(&sock_addr, self.port, addrs, &mut self.pending)?
|
||||||
|
}
|
||||||
|
ip_to_multiaddr(sock_addr.ip(), sock_addr.port())
|
||||||
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
debug!("Failed to get local address of incoming socket: {:?}", err);
|
debug!("Failed to get local address of incoming socket: {:?}", err);
|
||||||
continue
|
continue
|
||||||
@ -508,10 +512,10 @@ impl Stream for TcpListenStream {
|
|||||||
|
|
||||||
match apply_config(&self.config, &sock) {
|
match apply_config(&self.config, &sock) {
|
||||||
Ok(()) => {
|
Ok(()) => {
|
||||||
trace!("Incoming connection from {} on {}", remote_addr, listen_addr);
|
trace!("Incoming connection from {} at {}", remote_addr, local_addr);
|
||||||
self.pending.push_back(ListenerEvent::Upgrade {
|
self.pending.push_back(ListenerEvent::Upgrade {
|
||||||
upgrade: future::ok(TcpTransStream { inner: sock }),
|
upgrade: future::ok(TcpTransStream { inner: sock }),
|
||||||
listen_addr,
|
local_addr,
|
||||||
remote_addr
|
remote_addr
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -519,7 +523,7 @@ impl Stream for TcpListenStream {
|
|||||||
debug!("Error upgrading incoming connection from {}: {:?}", remote_addr, err);
|
debug!("Error upgrading incoming connection from {}: {:?}", remote_addr, err);
|
||||||
self.pending.push_back(ListenerEvent::Upgrade {
|
self.pending.push_back(ListenerEvent::Upgrade {
|
||||||
upgrade: future::err(err),
|
upgrade: future::err(err),
|
||||||
listen_addr,
|
local_addr,
|
||||||
remote_addr
|
remote_addr
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -160,7 +160,7 @@ where
|
|||||||
debug!("incoming connection on {}", self.addr);
|
debug!("incoming connection on {}", self.addr);
|
||||||
Ok(Async::Ready(Some(ListenerEvent::Upgrade {
|
Ok(Async::Ready(Some(ListenerEvent::Upgrade {
|
||||||
upgrade: future::ok(item),
|
upgrade: future::ok(item),
|
||||||
listen_addr: self.addr.clone(),
|
local_addr: self.addr.clone(),
|
||||||
remote_addr: self.addr.clone()
|
remote_addr: self.addr.clone()
|
||||||
})))
|
})))
|
||||||
}
|
}
|
||||||
|
@ -282,7 +282,7 @@ impl Stream for Listen {
|
|||||||
{
|
{
|
||||||
let upgrade: ffi::ConnectionEvent = upgrade.into();
|
let upgrade: ffi::ConnectionEvent = upgrade.into();
|
||||||
self.pending_events.push_back(ListenerEvent::Upgrade {
|
self.pending_events.push_back(ListenerEvent::Upgrade {
|
||||||
listen_addr: upgrade.local_addr().parse()?,
|
local_addr: upgrade.local_addr().parse()?,
|
||||||
remote_addr: upgrade.observed_addr().parse()?,
|
remote_addr: upgrade.observed_addr().parse()?,
|
||||||
upgrade: futures::future::ok(Connection::new(upgrade.connection())),
|
upgrade: futures::future::ok(Connection::new(upgrade.connection())),
|
||||||
});
|
});
|
||||||
|
@ -152,8 +152,8 @@ where
|
|||||||
a = a.with(proto.clone());
|
a = a.with(proto.clone());
|
||||||
ListenerEvent::AddressExpired(a)
|
ListenerEvent::AddressExpired(a)
|
||||||
}
|
}
|
||||||
ListenerEvent::Upgrade { upgrade, mut listen_addr, mut remote_addr } => {
|
ListenerEvent::Upgrade { upgrade, mut local_addr, mut remote_addr } => {
|
||||||
listen_addr = listen_addr.with(proto.clone());
|
local_addr = local_addr.with(proto.clone());
|
||||||
remote_addr = remote_addr.with(proto.clone());
|
remote_addr = remote_addr.with(proto.clone());
|
||||||
let remote1 = remote_addr.clone(); // used for logging
|
let remote1 = remote_addr.clone(); // used for logging
|
||||||
let remote2 = remote_addr.clone(); // used for logging
|
let remote2 = remote_addr.clone(); // used for logging
|
||||||
@ -206,7 +206,7 @@ where
|
|||||||
});
|
});
|
||||||
ListenerEvent::Upgrade {
|
ListenerEvent::Upgrade {
|
||||||
upgrade: Box::new(upgraded) as Box<dyn Future<Item = _, Error = _> + Send>,
|
upgrade: Box::new(upgraded) as Box<dyn Future<Item = _, Error = _> + Send>,
|
||||||
listen_addr,
|
local_addr,
|
||||||
remote_addr
|
remote_addr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user