Have Transport::Listeners produce ListenerEvents. (#1032)

Replace the listener and address pair returned from `Transport::listen_on` with just a listener that produces `ListenerEvent` values which include upgrades as well as address changes.
This commit is contained in:
Toralf Wittner
2019-04-10 10:29:21 +02:00
committed by GitHub
parent 98b2517403
commit 6917b8f543
36 changed files with 935 additions and 701 deletions

View File

@ -20,8 +20,9 @@
//! Manage listening on multiple multiaddresses at once.
use crate::{Multiaddr, Transport, transport::TransportError};
use crate::{Multiaddr, Transport, transport::{TransportError, ListenerEvent}};
use futures::prelude::*;
use smallvec::SmallVec;
use std::{collections::VecDeque, fmt};
use void::Void;
@ -49,14 +50,17 @@ use void::Void;
/// // Ask the `listeners` to start listening on the given multiaddress.
/// listeners.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap()).unwrap();
///
/// // You can retrieve the list of active listeners with `listeners()`.
/// println!("Listening on: {:?}", listeners.listeners().collect::<Vec<_>>());
///
/// // The `listeners` will now generate events when polled.
/// let future = listeners.for_each(move |event| {
/// match event {
/// ListenersEvent::Closed { listen_addr, listener, result } => {
/// println!("Listener {} has been closed: {:?}", listen_addr, result);
/// ListenersEvent::NewAddress { listen_addr } => {
/// println!("Listener is listening at address {}", listen_addr);
/// },
/// ListenersEvent::AddressExpired { listen_addr } => {
/// println!("Listener is no longer listening at address {}", listen_addr);
/// },
/// ListenersEvent::Closed { result, .. } => {
/// println!("Listener has been closed: {:?}", result);
/// },
/// ListenersEvent::Incoming { upgrade, listen_addr, .. } => {
/// println!("A connection has arrived on {}", listen_addr);
@ -79,7 +83,7 @@ where
/// Transport used to spawn listeners.
transport: TTrans,
/// All the active listeners.
listeners: VecDeque<Listener<TTrans>>,
listeners: VecDeque<Listener<TTrans>>
}
/// A single active listener.
@ -90,8 +94,8 @@ where
{
/// The object that actually listens.
listener: TTrans::Listener,
/// Address it is listening on.
address: Multiaddr,
/// Addresses it is listening on.
addresses: SmallVec<[Multiaddr; 4]>
}
/// Event that can happen on the `ListenersStream`.
@ -99,6 +103,16 @@ pub enum ListenersEvent<TTrans>
where
TTrans: Transport,
{
/// A new address is being listened on.
NewAddress {
/// The new address that is being listened on.
listen_addr: Multiaddr
},
/// An address is no longer being listened on.
AddressExpired {
/// The new address that is being listened on.
listen_addr: Multiaddr
},
/// A connection is incoming on one of the listeners.
Incoming {
/// The produced upgrade.
@ -108,11 +122,8 @@ where
/// Address used to send back data to the incoming client.
send_back_addr: Multiaddr,
},
/// A listener has closed, either gracefully or with an error.
Closed {
/// Address of the listener which closed.
listen_addr: Multiaddr,
/// The listener that closed.
listener: TTrans::Listener,
/// The error that happened. `Ok` if gracefully closed.
@ -129,7 +140,7 @@ where
pub fn new(transport: TTrans) -> Self {
ListenersStream {
transport,
listeners: VecDeque::new(),
listeners: VecDeque::new()
}
}
@ -139,40 +150,30 @@ where
pub fn with_capacity(transport: TTrans, capacity: usize) -> Self {
ListenersStream {
transport,
listeners: VecDeque::with_capacity(capacity),
listeners: VecDeque::with_capacity(capacity)
}
}
/// Start listening on a multiaddress.
///
/// Returns an error if the transport doesn't support the given multiaddress.
pub fn listen_on(&mut self, addr: Multiaddr) -> Result<Multiaddr, TransportError<TTrans::Error>>
pub fn listen_on(&mut self, addr: Multiaddr) -> Result<(), TransportError<TTrans::Error>>
where
TTrans: Clone,
{
let (listener, new_addr) = self
.transport
.clone()
.listen_on(addr)?;
self.listeners.push_back(Listener {
listener,
address: new_addr.clone(),
});
Ok(new_addr)
let listener = self.transport.clone().listen_on(addr)?;
self.listeners.push_back(Listener { listener, addresses: SmallVec::new() });
Ok(())
}
/// Returns the transport passed when building this object.
#[inline]
pub fn transport(&self) -> &TTrans {
&self.transport
}
/// Returns an iterator that produces the list of addresses we're listening on.
#[inline]
pub fn listeners(&self) -> impl Iterator<Item = &Multiaddr> {
self.listeners.iter().map(|l| &l.address)
pub fn listen_addrs(&self) -> impl Iterator<Item = &Multiaddr> {
self.listeners.iter().flat_map(|l| l.addresses.iter())
}
/// Provides an API similar to `Stream`, except that it cannot error.
@ -186,28 +187,38 @@ where
remaining -= 1;
if remaining == 0 { break }
}
Ok(Async::Ready(Some((upgrade, send_back_addr)))) => {
let listen_addr = listener.address.clone();
Ok(Async::Ready(Some(ListenerEvent::Upgrade { upgrade, listen_addr, remote_addr }))) => {
debug_assert!(listener.addresses.contains(&listen_addr));
self.listeners.push_front(listener);
return Async::Ready(ListenersEvent::Incoming {
upgrade,
listen_addr,
send_back_addr,
});
send_back_addr: remote_addr
})
}
Ok(Async::Ready(Some(ListenerEvent::NewAddress(a)))) => {
if !listener.addresses.contains(&a) {
listener.addresses.push(a.clone());
}
self.listeners.push_front(listener);
return Async::Ready(ListenersEvent::NewAddress { listen_addr: a })
}
Ok(Async::Ready(Some(ListenerEvent::AddressExpired(a)))) => {
listener.addresses.retain(|x| x != &a);
self.listeners.push_front(listener);
return Async::Ready(ListenersEvent::AddressExpired { listen_addr: a })
}
Ok(Async::Ready(None)) => {
return Async::Ready(ListenersEvent::Closed {
listen_addr: listener.address,
listener: listener.listener,
result: Ok(()),
});
})
}
Err(err) => {
return Async::Ready(ListenersEvent::Closed {
listen_addr: listener.address,
listener: listener.listener,
result: Err(err),
});
})
}
}
}
@ -237,7 +248,7 @@ where
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
f.debug_struct("ListenersStream")
.field("transport", &self.transport)
.field("listeners", &self.listeners().collect::<Vec<_>>())
.field("listen_addrs", &self.listen_addrs().collect::<Vec<_>>())
.finish()
}
}
@ -249,19 +260,20 @@ where
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
match self {
ListenersEvent::Incoming {
ref listen_addr, ..
} => f
ListenersEvent::NewAddress { listen_addr } => f
.debug_struct("ListenersEvent::NewAddress")
.field("listen_addr", listen_addr)
.finish(),
ListenersEvent::AddressExpired { listen_addr } => f
.debug_struct("ListenersEvent::AddressExpired")
.field("listen_addr", listen_addr)
.finish(),
ListenersEvent::Incoming { listen_addr, .. } => f
.debug_struct("ListenersEvent::Incoming")
.field("listen_addr", listen_addr)
.finish(),
ListenersEvent::Closed {
ref listen_addr,
ref result,
..
} => f
ListenersEvent::Closed { result, .. } => f
.debug_struct("ListenersEvent::Closed")
.field("listen_addr", listen_addr)
.field("result", result)
.finish(),
}
@ -271,42 +283,29 @@ where
#[cfg(test)]
mod tests {
use super::*;
use crate::transport;
use crate::transport::{self, ListenerEvent};
use assert_matches::assert_matches;
use tokio::runtime::current_thread::Runtime;
use std::io;
use std::{io, iter::FromIterator};
use futures::{future::{self}, stream};
use crate::tests::dummy_transport::{DummyTransport, ListenerState};
use crate::tests::dummy_muxer::DummyMuxer;
use crate::PeerId;
fn set_listener_state(ls: &mut ListenersStream<DummyTransport>, idx: usize, state: ListenerState) {
let l = &mut ls.listeners[idx];
l.listener =
match state {
ListenerState::Error => {
let stream = stream::poll_fn(|| future::err(io::Error::new(io::ErrorKind::Other, "oh noes")).poll() );
Box::new(stream)
}
ListenerState::Ok(r#async) => {
match r#async {
Async::NotReady => {
let stream = stream::poll_fn(|| Ok(Async::NotReady));
Box::new(stream)
}
Async::Ready(Some(tup)) => {
let addr = l.address.clone();
let stream = stream::poll_fn(move || Ok( Async::Ready(Some(tup.clone())) ))
.map(move |stream| (future::ok(stream), addr.clone()));
Box::new(stream)
}
Async::Ready(None) => {
let stream = stream::empty();
Box::new(stream)
}
}
}
};
ls.listeners[idx].listener = match state {
ListenerState::Error =>
Box::new(stream::poll_fn(|| Err(io::Error::new(io::ErrorKind::Other, "oh noes")))),
ListenerState::Ok(state) => match state {
Async::NotReady => Box::new(stream::poll_fn(|| Ok(Async::NotReady))),
Async::Ready(Some(event)) => Box::new(stream::poll_fn(move || {
Ok(Async::Ready(Some(event.clone().map(future::ok))))
})),
Async::Ready(None) => Box::new(stream::empty())
}
ListenerState::Events(events) =>
Box::new(stream::iter_ok(events.into_iter().map(|e| e.map(future::ok))))
};
}
#[test]
@ -314,18 +313,27 @@ mod tests {
let mem_transport = transport::MemoryTransport::default();
let mut listeners = ListenersStream::new(mem_transport);
let actual_addr = listeners.listen_on("/memory/0".parse().unwrap()).unwrap();
listeners.listen_on("/memory/0".parse().unwrap()).unwrap();
let dial = mem_transport.dial(actual_addr.clone()).unwrap();
let address = {
let event = listeners.by_ref().wait().next().expect("some event").expect("no error");
if let ListenersEvent::NewAddress { listen_addr, .. } = event {
listen_addr
} else {
panic!("Was expecting the listen address to be reported")
}
};
let dial = mem_transport.dial(address.clone()).unwrap();
let future = listeners
.into_future()
.map_err(|(err, _)| err)
.and_then(|(event, _)| {
match event {
Some(ListenersEvent::Incoming { listen_addr, upgrade, send_back_addr }) => {
assert_eq!(listen_addr, actual_addr);
assert_eq!(send_back_addr, actual_addr);
Some(ListenersEvent::Incoming { listen_addr, upgrade, send_back_addr, .. }) => {
assert_eq!(listen_addr, address);
assert_eq!(send_back_addr, address);
upgrade.map(|_| ()).map_err(|_| panic!())
},
_ => panic!()
@ -348,17 +356,24 @@ mod tests {
#[test]
fn listener_stream_can_iterate_over_listeners() {
let t = DummyTransport::new();
let addr1 = "/ip4/127.0.0.1/tcp/1234".parse::<Multiaddr>().expect("bad multiaddr");
let addr2 = "/ip4/127.0.0.1/tcp/4321".parse::<Multiaddr>().expect("bad multiaddr");
let expected_addrs = vec![addr1.to_string(), addr2.to_string()];
let mut t = DummyTransport::new();
let addr1 = tcp4([127, 0, 0, 1], 1234);
let addr2 = tcp4([127, 0, 0, 1], 4321);
t.set_initial_listener_state(ListenerState::Events(vec![
ListenerEvent::NewAddress(addr1.clone()),
ListenerEvent::NewAddress(addr2.clone())
]));
let mut ls = ListenersStream::new(t);
ls.listen_on(addr1).expect("listen_on failed");
ls.listen_on(addr2).expect("listen_on failed");
ls.listen_on(tcp4([0, 0, 0, 0], 0)).expect("listen_on");
let listener_addrs = ls.listeners().map(|ma| ma.to_string() ).collect::<Vec<String>>();
assert_eq!(listener_addrs, expected_addrs);
assert_matches!(ls.by_ref().wait().next(), Some(Ok(ListenersEvent::NewAddress { listen_addr, .. })) => {
assert_eq!(addr1, listen_addr)
});
assert_matches!(ls.by_ref().wait().next(), Some(Ok(ListenersEvent::NewAddress { listen_addr, .. })) => {
assert_eq!(addr2, listen_addr)
})
}
#[test]
@ -371,7 +386,7 @@ mod tests {
#[test]
fn listener_stream_poll_with_listeners_that_arent_ready_is_not_ready() {
let t = DummyTransport::new();
let addr = "/ip4/127.0.0.1/tcp/1234".parse::<Multiaddr>().expect("bad multiaddr");
let addr = tcp4([127, 0, 0, 1], 1234);
let mut ls = ListenersStream::new(t);
ls.listen_on(addr).expect("listen_on failed");
set_listener_state(&mut ls, 0, ListenerState::Ok(Async::NotReady));
@ -385,54 +400,74 @@ mod tests {
let peer_id = PeerId::random();
let muxer = DummyMuxer::new();
let expected_output = (peer_id.clone(), muxer.clone());
t.set_initial_listener_state(ListenerState::Ok(Async::Ready(Some( (peer_id, muxer) ))));
t.set_initial_listener_state(ListenerState::Events(vec![
ListenerEvent::NewAddress(tcp4([127, 0, 0, 1], 9090)),
ListenerEvent::Upgrade {
upgrade: (peer_id.clone(), muxer.clone()),
listen_addr: tcp4([127, 0, 0, 1], 9090),
remote_addr: tcp4([127, 0, 0, 1], 32000)
},
ListenerEvent::Upgrade {
upgrade: (peer_id.clone(), muxer.clone()),
listen_addr: tcp4([127, 0, 0, 1], 9090),
remote_addr: tcp4([127, 0, 0, 1], 32000)
},
ListenerEvent::Upgrade {
upgrade: (peer_id.clone(), muxer.clone()),
listen_addr: tcp4([127, 0, 0, 1], 9090),
remote_addr: tcp4([127, 0, 0, 1], 32000)
}
]));
let mut ls = ListenersStream::new(t);
let addr1 = "/ip4/127.0.0.1/tcp/1234".parse::<Multiaddr>().expect("bad multiaddr");
let addr2 = "/ip4/127.0.0.2/tcp/4321".parse::<Multiaddr>().expect("bad multiaddr");
ls.listen_on(addr1).expect("listen_on works");
ls.listen_on(addr2).expect("listen_on works");
ls.listen_on(tcp4([127, 0, 0, 1], 1234)).expect("listen_on");
ls.listen_on(tcp4([127, 0, 0, 1], 4321)).expect("listen_on");
assert_eq!(ls.listeners.len(), 2);
assert_matches!(ls.poll(), Async::Ready(listeners_event) => {
assert_matches!(listeners_event, ListenersEvent::Incoming{mut upgrade, listen_addr, ..} => {
assert_eq!(listen_addr.to_string(), "/ip4/127.0.0.2/tcp/4321");
assert_matches!(upgrade.poll().unwrap(), Async::Ready(tup) => {
assert_eq!(tup, expected_output)
assert_matches!(ls.by_ref().wait().next(), Some(Ok(listeners_event)) => {
assert_matches!(listeners_event, ListenersEvent::NewAddress { .. })
});
assert_matches!(ls.by_ref().wait().next(), Some(Ok(listeners_event)) => {
assert_matches!(listeners_event, ListenersEvent::NewAddress { .. })
});
assert_matches!(ls.by_ref().wait().next(), Some(Ok(listeners_event)) => {
assert_matches!(listeners_event, ListenersEvent::Incoming { upgrade, .. } => {
assert_matches!(upgrade.wait(), Ok(output) => {
assert_eq!(output, expected_output)
});
})
});
assert_matches!(ls.poll(), Async::Ready(listeners_event) => {
assert_matches!(listeners_event, ListenersEvent::Incoming{mut upgrade, listen_addr, ..} => {
assert_eq!(listen_addr.to_string(), "/ip4/127.0.0.1/tcp/1234");
assert_matches!(upgrade.poll().unwrap(), Async::Ready(tup) => {
assert_eq!(tup, expected_output)
assert_matches!(ls.by_ref().wait().next(), Some(Ok(listeners_event)) => {
assert_matches!(listeners_event, ListenersEvent::Incoming { upgrade, .. } => {
assert_matches!(upgrade.wait(), Ok(output) => {
assert_eq!(output, expected_output)
});
})
});
set_listener_state(&mut ls, 1, ListenerState::Ok(Async::NotReady));
assert_matches!(ls.poll(), Async::Ready(listeners_event) => {
assert_matches!(listeners_event, ListenersEvent::Incoming{mut upgrade, listen_addr, ..} => {
assert_eq!(listen_addr.to_string(), "/ip4/127.0.0.1/tcp/1234");
assert_matches!(upgrade.poll().unwrap(), Async::Ready(tup) => {
assert_eq!(tup, expected_output)
assert_matches!(ls.by_ref().wait().next(), Some(Ok(listeners_event)) => {
assert_matches!(listeners_event, ListenersEvent::Incoming { upgrade, .. } => {
assert_matches!(upgrade.wait(), Ok(output) => {
assert_eq!(output, expected_output)
});
})
});
}
#[test]
fn listener_stream_poll_with_closed_listener_emits_closed_event() {
let t = DummyTransport::new();
let addr = "/ip4/127.0.0.1/tcp/1234".parse::<Multiaddr>().expect("bad multiaddr");
let addr = tcp4([127, 0, 0, 1], 1234);
let mut ls = ListenersStream::new(t);
ls.listen_on(addr).expect("listen_on failed");
set_listener_state(&mut ls, 0, ListenerState::Ok(Async::Ready(None)));
assert_matches!(ls.poll(), Async::Ready(listeners_event) => {
assert_matches!(ls.by_ref().wait().next(), Some(Ok(listeners_event)) => {
assert_matches!(listeners_event, ListenersEvent::Closed{..})
});
assert_eq!(ls.listeners.len(), 0); // it's gone
@ -443,92 +478,25 @@ mod tests {
let mut t = DummyTransport::new();
let peer_id = PeerId::random();
let muxer = DummyMuxer::new();
t.set_initial_listener_state(ListenerState::Ok(Async::Ready(Some( (peer_id, muxer) ))));
let addr = "/ip4/127.0.0.1/tcp/1234".parse::<Multiaddr>().expect("bad multiaddr");
let event = ListenerEvent::Upgrade {
upgrade: (peer_id, muxer),
listen_addr: tcp4([127, 0, 0, 1], 1234),
remote_addr: tcp4([127, 0, 0, 1], 32000)
};
t.set_initial_listener_state(ListenerState::Ok(Async::Ready(Some(event))));
let addr = tcp4([127, 0, 0, 1], 1234);
let mut ls = ListenersStream::new(t);
ls.listen_on(addr).expect("listen_on failed");
set_listener_state(&mut ls, 0, ListenerState::Error); // simulate an error on the socket
assert_matches!(ls.poll(), Async::Ready(listeners_event) => {
assert_matches!(ls.by_ref().wait().next(), Some(Ok(listeners_event)) => {
assert_matches!(listeners_event, ListenersEvent::Closed{..})
});
assert_eq!(ls.listeners.len(), 0); // it's gone
}
#[test]
fn listener_stream_poll_chatty_listeners_each_get_their_turn() {
let mut t = DummyTransport::new();
let peer_id = PeerId::random();
let muxer = DummyMuxer::new();
t.set_initial_listener_state(ListenerState::Ok(Async::Ready(Some( (peer_id.clone(), muxer) )))); let mut ls = ListenersStream::new(t);
// Create 4 Listeners
for n in 0..4 {
let addr = format!("/ip4/127.0.0.{}/tcp/{}", n, n).parse::<Multiaddr>().expect("bad multiaddr");
ls.listen_on(addr).expect("listen_on failed");
}
// Poll() processes listeners in reverse order. Each listener is polled
// in turn.
for n in (0..4).rev() {
assert_matches!(ls.poll(), Async::Ready(ListenersEvent::Incoming{listen_addr, ..}) => {
assert_eq!(listen_addr.to_string(), format!("/ip4/127.0.0.{}/tcp/{}", n, n))
})
}
// Doing it again yields them in the same order
for n in (0..4).rev() {
assert_matches!(ls.poll(), Async::Ready(ListenersEvent::Incoming{listen_addr, ..}) => {
assert_eq!(listen_addr.to_string(), format!("/ip4/127.0.0.{}/tcp/{}", n, n))
})
}
// Make last listener NotReady; it will become the first element and
// retried after trying the other Listeners.
set_listener_state(&mut ls, 3, ListenerState::Ok(Async::NotReady));
for n in (0..3).rev() {
assert_matches!(ls.poll(), Async::Ready(ListenersEvent::Incoming{listen_addr, ..}) => {
assert_eq!(listen_addr.to_string(), format!("/ip4/127.0.0.{}/tcp/{}", n, n))
})
}
for n in (0..3).rev() {
assert_matches!(ls.poll(), Async::Ready(ListenersEvent::Incoming{listen_addr, ..}) => {
assert_eq!(listen_addr.to_string(), format!("/ip4/127.0.0.{}/tcp/{}", n, n))
})
}
// Turning the last listener back on means we now have 4 "good"
// listeners, and each get their turn.
set_listener_state(
&mut ls, 3,
ListenerState::Ok(Async::Ready(Some( (peer_id, DummyMuxer::new()) )))
);
for n in (0..4).rev() {
assert_matches!(ls.poll(), Async::Ready(ListenersEvent::Incoming{listen_addr, ..}) => {
assert_eq!(listen_addr.to_string(), format!("/ip4/127.0.0.{}/tcp/{}", n, n))
})
}
}
#[test]
fn listener_stream_poll_processes_listeners_in_turn() {
let mut t = DummyTransport::new();
let peer_id = PeerId::random();
let muxer = DummyMuxer::new();
t.set_initial_listener_state(ListenerState::Ok(Async::Ready(Some( (peer_id, muxer) ))));
let mut ls = ListenersStream::new(t);
for n in 0..4 {
let addr = format!("/ip4/127.0.0.{}/tcp/{}", n, n).parse::<Multiaddr>().expect("bad multiaddr");
ls.listen_on(addr).expect("listen_on failed");
}
for n in (0..4).rev() {
assert_matches!(ls.poll(), Async::Ready(ListenersEvent::Incoming{listen_addr, ..}) => {
assert_eq!(listen_addr.to_string(), format!("/ip4/127.0.0.{}/tcp/{}", n, n));
});
set_listener_state(&mut ls, 0, ListenerState::Ok(Async::NotReady));
}
// All Listeners are NotReady, so poll yields NotReady
assert_matches!(ls.poll(), Async::NotReady);
fn tcp4(ip: [u8; 4], port: u16) -> Multiaddr {
let protos = std::iter::once(multiaddr::Protocol::Ip4(ip.into()))
.chain(std::iter::once(multiaddr::Protocol::Tcp(port)));
Multiaddr::from_iter(protos)
}
}

View File

@ -138,14 +138,24 @@ where
{
/// One of the listeners gracefully closed.
ListenerClosed {
/// Address of the listener which closed.
listen_addr: Multiaddr,
/// The listener which closed.
listener: TTrans::Listener,
/// The error that happened. `Ok` if gracefully closed.
result: Result<(), <TTrans::Listener as Stream>::Error>,
},
/// One of the listeners is now listening on an additional address.
NewListenerAddress {
/// The new address the listener is now also listening on.
listen_addr: Multiaddr
},
/// One of the listeners is no longer listening on some address.
ExpiredListenerAddress {
/// The expired address.
listen_addr: Multiaddr
},
/// A new connection arrived on a listener.
IncomingConnection(IncomingConnectionEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo, TPeerId>),
@ -154,7 +164,7 @@ where
/// This can include, for example, an error during the handshake of the encryption layer, or
/// the connection unexpectedly closed.
IncomingConnectionError {
/// Address of the listener which received the connection.
/// The address of the listener which received the connection.
listen_addr: Multiaddr,
/// Address used to send back data to the remote.
send_back_addr: Multiaddr,
@ -244,19 +254,32 @@ where
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
match *self {
RawSwarmEvent::ListenerClosed { ref listen_addr, ref result, .. } => {
f.debug_struct("ListenerClosed")
RawSwarmEvent::NewListenerAddress { ref listen_addr } => {
f.debug_struct("NewListenerAddress")
.field("listen_addr", listen_addr)
.finish()
}
RawSwarmEvent::ExpiredListenerAddress { ref listen_addr } => {
f.debug_struct("ExpiredListenerAddress")
.field("listen_addr", listen_addr)
.finish()
}
RawSwarmEvent::ListenerClosed { ref result, .. } => {
f.debug_struct("ListenerClosed")
.field("result", result)
.finish()
}
RawSwarmEvent::IncomingConnection( IncomingConnectionEvent { ref listen_addr, ref send_back_addr, .. } ) => {
RawSwarmEvent::IncomingConnection(ref event) => {
f.debug_struct("IncomingConnection")
.field("listen_addr", listen_addr)
.field("send_back_addr", send_back_addr)
.field("listen_addr", &event.listen_addr)
.field("send_back_addr", &event.send_back_addr)
.finish()
}
RawSwarmEvent::IncomingConnectionError { ref listen_addr, ref send_back_addr, ref error} => {
RawSwarmEvent::IncomingConnectionError {
ref listen_addr,
ref send_back_addr,
ref error
} => {
f.debug_struct("IncomingConnectionError")
.field("listen_addr", listen_addr)
.field("send_back_addr", send_back_addr)
@ -493,7 +516,7 @@ where TTrans: Transport
upgrade: TTrans::ListenerUpgrade,
/// PeerId of the local node.
local_peer_id: TPeerId,
/// Address of the listener which received the connection.
/// Addresses of the listener which received the connection.
listen_addr: Multiaddr,
/// Address used to send back data to the remote.
send_back_addr: Multiaddr,
@ -647,7 +670,7 @@ impl ConnectedPoint {
/// Information about an incoming connection currently being negotiated.
#[derive(Debug, Copy, Clone)]
pub struct IncomingInfo<'a> {
/// Address of the listener that received the connection.
/// Listener address that received the connection.
pub listen_addr: &'a Multiaddr,
/// Stack of protocols used to send back data to the remote.
pub send_back_addr: &'a Multiaddr,
@ -719,14 +742,13 @@ where
/// Start listening on the given multiaddress.
#[inline]
pub fn listen_on(&mut self, addr: Multiaddr) -> Result<Multiaddr, TransportError<TTrans::Error>> {
pub fn listen_on(&mut self, addr: Multiaddr) -> Result<(), TransportError<TTrans::Error>> {
self.listeners.listen_on(addr)
}
/// Returns an iterator that produces the list of addresses we're listening on.
#[inline]
pub fn listeners(&self) -> impl Iterator<Item = &Multiaddr> {
self.listeners.listeners()
/// Returns an iterator that produces the list of addresses we are listening on.
pub fn listen_addrs(&self) -> impl Iterator<Item = &Multiaddr> {
self.listeners.listen_addrs()
}
/// Returns limit on incoming connections.
@ -743,14 +765,13 @@ where
///
/// For each listener, calls `nat_traversal` with the observed address and returns the outcome.
#[inline]
pub fn nat_traversal<'a>(
&'a self,
observed_addr: &'a Multiaddr,
) -> impl Iterator<Item = Multiaddr> + 'a
where TMuxer: 'a,
THandler: 'a,
pub fn nat_traversal<'a>(&'a self, observed_addr: &'a Multiaddr)
-> impl Iterator<Item = Multiaddr> + 'a
where
TMuxer: 'a,
THandler: 'a,
{
self.listeners()
self.listen_addrs()
.flat_map(move |server| self.transport().nat_traversal(server, observed_addr))
}
@ -819,10 +840,7 @@ where
.filter_map(|&(_, ref endpoint)| {
match endpoint {
ConnectedPoint::Listener { listen_addr, send_back_addr } => {
Some(IncomingInfo {
listen_addr,
send_back_addr,
})
Some(IncomingInfo { listen_addr, send_back_addr })
},
ConnectedPoint::Dialer { .. } => None,
}
@ -986,30 +1004,26 @@ where
_ => {
match self.listeners.poll() {
Async::NotReady => (),
Async::Ready(ListenersEvent::Incoming {
upgrade, listen_addr, send_back_addr }) =>
{
Async::Ready(ListenersEvent::Incoming { upgrade, listen_addr, send_back_addr }) => {
let event = IncomingConnectionEvent {
upgrade,
local_peer_id:
self.reach_attempts.local_peer_id.clone(),
local_peer_id: self.reach_attempts.local_peer_id.clone(),
listen_addr,
send_back_addr,
active_nodes: &mut self.active_nodes,
other_reach_attempts: &mut self.reach_attempts.other_reach_attempts,
};
return Async::Ready(RawSwarmEvent::IncomingConnection(event));
},
Async::Ready(ListenersEvent::Closed {
listen_addr, listener, result }) =>
{
return Async::Ready(RawSwarmEvent::ListenerClosed {
listen_addr,
listener,
result,
});
}
Async::Ready(ListenersEvent::NewAddress { listen_addr }) => {
return Async::Ready(RawSwarmEvent::NewListenerAddress { listen_addr })
}
Async::Ready(ListenersEvent::AddressExpired { listen_addr }) => {
return Async::Ready(RawSwarmEvent::ExpiredListenerAddress { listen_addr })
}
Async::Ready(ListenersEvent::Closed { listener, result }) => {
return Async::Ready(RawSwarmEvent::ListenerClosed { listener, result })
}
}
}
}

View File

@ -26,6 +26,7 @@ use crate::tests::dummy_handler::{Handler, HandlerState, InEvent, OutEvent};
use crate::tests::dummy_transport::ListenerState;
use crate::tests::dummy_muxer::{DummyMuxer, DummyConnectionState};
use crate::nodes::NodeHandlerEvent;
use crate::transport::ListenerEvent;
use assert_matches::assert_matches;
use parking_lot::Mutex;
use std::sync::Arc;
@ -39,17 +40,6 @@ fn query_transport() {
assert_eq!(raw_swarm.transport(), &transport2);
}
#[test]
fn starts_listening() {
let mut raw_swarm = RawSwarm::<_, _, _, Handler, _>::new(DummyTransport::new(), PeerId::random());
let addr = "/ip4/127.0.0.1/tcp/1234".parse::<Multiaddr>().expect("bad multiaddr");
let addr2 = addr.clone();
assert!(raw_swarm.listen_on(addr).is_ok());
let listeners = raw_swarm.listeners().collect::<Vec<&Multiaddr>>();
assert_eq!(listeners.len(), 1);
assert_eq!(listeners[0], &addr2);
}
#[test]
fn local_node_peer() {
let peer_id = PeerId::random();
@ -59,9 +49,6 @@ fn local_node_peer() {
#[test]
fn nat_traversal_transforms_the_observed_address_according_to_the_transport_used() {
// the DummyTransport nat_traversal increments the port number by one for Ip4 addresses
let transport = DummyTransport::new();
let mut raw_swarm = RawSwarm::<_, _, _, Handler, _>::new(transport, PeerId::random());
let addr1 = "/ip4/127.0.0.1/tcp/1234".parse::<Multiaddr>().expect("bad multiaddr");
// An unrelated outside address is returned as-is, no transform
let outside_addr1 = "/memory/0".parse::<Multiaddr>().expect("bad multiaddr");
@ -69,8 +56,26 @@ fn nat_traversal_transforms_the_observed_address_according_to_the_transport_used
let addr2 = "/ip4/127.0.0.2/tcp/1234".parse::<Multiaddr>().expect("bad multiaddr");
let outside_addr2 = "/ip4/127.0.0.2/tcp/1234".parse::<Multiaddr>().expect("bad multiaddr");
raw_swarm.listen_on(addr1).unwrap();
raw_swarm.listen_on(addr2).unwrap();
// the DummyTransport nat_traversal increments the port number by one for Ip4 addresses
let mut transport = DummyTransport::new();
let events = vec![
ListenerEvent::NewAddress(addr1.clone()),
ListenerEvent::NewAddress(addr2.clone())
];
transport.set_initial_listener_state(ListenerState::Events(events));
let mut raw_swarm = RawSwarm::<_, _, _, Handler, _>::new(transport, PeerId::random());
raw_swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap()).unwrap();
let raw_swarm =
future::lazy(move || {
assert_matches!(raw_swarm.poll(), Async::Ready(RawSwarmEvent::NewListenerAddress {..}));
assert_matches!(raw_swarm.poll(), Async::Ready(RawSwarmEvent::NewListenerAddress {..}));
Ok::<_, void::Void>(raw_swarm)
})
.wait()
.unwrap();
let natted = raw_swarm
.nat_traversal(&outside_addr1)
@ -124,8 +129,15 @@ fn num_incoming_negotiated() {
let peer_id = PeerId::random();
let muxer = DummyMuxer::new();
// Set up listener to see an incoming connection
transport.set_initial_listener_state(ListenerState::Ok(Async::Ready(Some((peer_id, muxer)))));
let events = vec![
ListenerEvent::NewAddress("/ip4/127.0.0.1/tcp/1234".parse().unwrap()),
ListenerEvent::Upgrade {
upgrade: (peer_id.clone(), muxer.clone()),
listen_addr: "/ip4/127.0.0.1/tcp/1234".parse().unwrap(),
remote_addr: "/ip4/127.0.0.1/tcp/32111".parse().unwrap()
}
];
transport.set_initial_listener_state(ListenerState::Events(events));
let mut swarm = RawSwarm::<_, _, _, Handler, _>::new(transport, PeerId::random());
swarm.listen_on("/memory/0".parse().unwrap()).unwrap();
@ -138,10 +150,10 @@ fn num_incoming_negotiated() {
let swarm_fut = swarm.clone();
let fut = future::poll_fn(move || -> Poll<_, ()> {
let mut swarm_fut = swarm_fut.lock();
assert_matches!(swarm_fut.poll(), Async::Ready(RawSwarmEvent::NewListenerAddress {..}));
assert_matches!(swarm_fut.poll(), Async::Ready(RawSwarmEvent::IncomingConnection(incoming)) => {
incoming.accept(Handler::default());
});
Ok(Async::Ready(()))
});
rt.block_on(fut).expect("tokio works");
@ -407,10 +419,18 @@ fn limit_incoming_connections() {
let peer_id = PeerId::random();
let muxer = DummyMuxer::new();
let limit = 1;
transport.set_initial_listener_state(ListenerState::Ok(Async::Ready(
Some((peer_id, muxer)))));
let mut swarm = RawSwarm::<_, _, _, Handler, _>::new_with_incoming_limit(
transport, PeerId::random(), Some(limit));
let mut events = vec![ListenerEvent::NewAddress("/ip4/127.0.0.1/tcp/1234".parse().unwrap())];
events.extend(std::iter::repeat(
ListenerEvent::Upgrade {
upgrade: (peer_id.clone(), muxer.clone()),
listen_addr: "/ip4/127.0.0.1/tcp/1234".parse().unwrap(),
remote_addr: "/ip4/127.0.0.1/tcp/32111".parse().unwrap()
}
).take(10));
transport.set_initial_listener_state(ListenerState::Events(events));
let mut swarm = RawSwarm::<_, _, _, Handler, _>::new_with_incoming_limit(transport, PeerId::random(), Some(limit));
assert_eq!(swarm.incoming_limit(), Some(limit));
swarm.listen_on("/memory/0".parse().unwrap()).unwrap();
assert_eq!(swarm.incoming_negotiated().count(), 0);
@ -422,6 +442,7 @@ fn limit_incoming_connections() {
let fut = future::poll_fn(move || -> Poll<_, ()> {
let mut swarm_fut = swarm_fut.lock();
if i <= limit {
assert_matches!(swarm_fut.poll(), Async::Ready(RawSwarmEvent::NewListenerAddress {..}));
assert_matches!(swarm_fut.poll(),
Async::Ready(RawSwarmEvent::IncomingConnection(incoming)) => {
incoming.accept(Handler::default());
@ -431,9 +452,11 @@ fn limit_incoming_connections() {
Async::NotReady => (),
Async::Ready(x) => {
match x {
RawSwarmEvent::IncomingConnection(_) => (),
RawSwarmEvent::Connected { .. } => (),
_ => { panic!("Not expected event") },
RawSwarmEvent::NewListenerAddress {..} => {}
RawSwarmEvent::ExpiredListenerAddress {..} => {}
RawSwarmEvent::IncomingConnection(_) => {}
RawSwarmEvent::Connected {..} => {}
e => panic!("Not expected event: {:?}", e)
}
},
}