Full support for multiple connections per peer in libp2p-swarm. (#1519)

* [libp2p-swarm] Make the multiple connections per peer first-class.

This commit makes the notion of multiple connections per peer
first-class in the API of libp2p-swarm, introducing the new
callbacks `inject_connection_established` and
`inject_connection_closed`. The `endpoint` parameter from
`inject_connected` and `inject_disconnected` is removed,
since the first connection to open may not be the last
connection to close, i.e. it cannot be guaranteed,
as was previously the case, that the endpoints passed
to these callbacks match up.

* Have identify track all addresses.

So that identify requests can be answered with the correct
observed address of the connection on which the request
arrives.

* Cleanup

* Cleanup

* Improve the `Peer` state API.

* Remove connection ID from `SwarmEvent::Dialing`.

* Mark `DialPeerCondition` non-exhaustive.

* Re-encapsulate `NetworkConfig`.

To retain the possibility of not re-exposing all
network configuration choices, thereby providing
a more convenient API on the \`SwarmBuilder\`.

* Rework Swarm::dial API.

* Update CHANGELOG.

* Doc formatting tweaks.
This commit is contained in:
Roman Borschel
2020-03-31 15:41:13 +02:00
committed by GitHub
parent bc455038fc
commit be970466b3
18 changed files with 491 additions and 239 deletions

View File

@ -72,18 +72,34 @@ pub trait NetworkBehaviour: Send + 'static {
/// address should be the most likely to be reachable.
fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr>;
/// Indicates the behaviour that we connected to the node with the given peer id through the
/// given endpoint.
/// Indicates the behaviour that we connected to the node with the given peer id.
///
/// This node now has a handler (as spawned by `new_handler`) running in the background.
fn inject_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint);
///
/// This method is only called when the connection to the peer is
/// established, preceded by `inject_connection_established`.
fn inject_connected(&mut self, peer_id: &PeerId);
/// Indicates the behaviour that we disconnected from the node with the given peer id. The
/// endpoint is the one we used to be connected to.
/// Indicates the behaviour that we disconnected from the node with the given peer id.
///
/// There is no handler running anymore for this node. Any event that has been sent to it may
/// or may not have been processed by the handler.
fn inject_disconnected(&mut self, peer_id: &PeerId, endpoint: ConnectedPoint);
///
/// This method is only called when the last established connection to the peer
/// is closed, preceded by `inject_connection_closed`.
fn inject_disconnected(&mut self, peer_id: &PeerId);
/// Informs the behaviour about a newly established connection to a peer.
fn inject_connection_established(&mut self, _: &PeerId, _: &ConnectionId, _: &ConnectedPoint)
{}
/// Informs the behaviour about a closed connection to a peer.
///
/// A call to this method is always paired with an earlier call to
/// `inject_connection_established` with the same peer ID, connection ID and
/// endpoint.
fn inject_connection_closed(&mut self, _: &PeerId, _: &ConnectionId, _: &ConnectedPoint)
{}
/// Informs the behaviour about an event generated by the handler dedicated to the peer identified by `peer_id`.
/// for the behaviour.
@ -204,6 +220,8 @@ pub enum NetworkBehaviourAction<TInEvent, TOutEvent> {
DialPeer {
/// The peer to try reach.
peer_id: PeerId,
/// The condition for initiating a new dialing attempt.
condition: DialPeerCondition,
},
/// Instructs the `Swarm` to send an event to the handler dedicated to a
@ -253,3 +271,37 @@ pub enum NotifyHandler {
All
}
/// The available conditions under which a new dialing attempt to
/// a peer is initiated when requested by [`NetworkBehaviourAction::DialPeer`].
#[derive(Debug, Copy, Clone)]
#[non_exhaustive]
pub enum DialPeerCondition {
/// A new dialing attempt is initiated _only if_ the peer is currently
/// considered disconnected, i.e. there is no established connection
/// and no ongoing dialing attempt.
///
/// If there is an ongoing dialing attempt, the addresses reported by
/// [`NetworkBehaviour::addresses_of_peer`] are added to the ongoing
/// dialing attempt, ignoring duplicates.
Disconnected,
/// A new dialing attempt is initiated _only if_ there is currently
/// no ongoing dialing attempt, i.e. the peer is either considered
/// disconnected or connected but without an ongoing dialing attempt.
///
/// If there is an ongoing dialing attempt, the addresses reported by
/// [`NetworkBehaviour::addresses_of_peer`] are added to the ongoing
/// dialing attempt, ignoring duplicates.
///
/// This condition implies [`DialPeerCondition::Disconnected`].
NotDialing,
// TODO: Once multiple dialing attempts per peer are permitted.
// See https://github.com/libp2p/rust-libp2p/pull/1506.
// Always,
}
impl Default for DialPeerCondition {
fn default() -> Self {
DialPeerCondition::Disconnected
}
}

View File

@ -65,7 +65,8 @@ pub use behaviour::{
NetworkBehaviourAction,
NetworkBehaviourEventProcess,
PollParameters,
NotifyHandler
NotifyHandler,
DialPeerCondition
};
pub use protocols_handler::{
IntoProtocolsHandler,
@ -89,7 +90,6 @@ use futures::{
stream::FusedStream,
};
use libp2p_core::{
ConnectedPoint,
Executor,
Transport,
Multiaddr,
@ -99,6 +99,8 @@ use libp2p_core::{
ConnectionError,
ConnectionId,
ConnectionInfo,
ConnectionLimit,
ConnectedPoint,
EstablishedConnection,
IntoConnectionHandler,
ListenerId,
@ -108,7 +110,6 @@ use libp2p_core::{
transport::{TransportError, boxed::Boxed as BoxTransport},
muxing::{StreamMuxer, StreamMuxerBox},
network::{
DialError,
Network,
NetworkInfo,
NetworkEvent,
@ -201,12 +202,6 @@ pub enum SwarmEvent<TBvEv, THandleErr> {
/// Endpoint of the connection that has been closed.
endpoint: ConnectedPoint,
},
/// Starting to try to reach the given peer.
///
/// We are trying to connect to this peer until a [`ConnectionEstablished`](SwarmEvent::ConnectionEstablished)
/// event is reported, or a [`UnreachableAddr`](SwarmEvent::UnreachableAddr) event is reported
/// with `attempts_remaining` equal to 0.
Dialing(PeerId),
/// Tried to dial an address but it ended up being unreachaable.
UnreachableAddr {
/// `PeerId` that we were trying to reach.
@ -246,6 +241,13 @@ pub enum SwarmEvent<TBvEv, THandleErr> {
/// The listener error.
error: io::Error,
},
/// A new dialing attempt has been initiated.
///
/// A [`ConnectionEstablished`](SwarmEvent::ConnectionEstablished)
/// event is reported if the dialing attempt succeeds, otherwise a
/// [`UnreachableAddr`](SwarmEvent::UnreachableAddr) event is reported
/// with `attempts_remaining` equal to 0.
Dialing(PeerId),
}
/// Contains the state of the network, plus the way it should behave.
@ -367,31 +369,65 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
/// Tries to dial the given address.
///
/// Returns an error if the address is not supported.
pub fn dial_addr(me: &mut Self, addr: Multiaddr) -> Result<(), DialError<io::Error>> {
pub fn dial_addr(me: &mut Self, addr: Multiaddr) -> Result<(), ConnectionLimit> {
let handler = me.behaviour.new_handler();
me.network.dial(&addr, handler.into_node_handler_builder()).map(|_id| ())
}
/// Tries to reach the given peer using the elements in the topology.
/// Tries to initiate a dialing attempt to the given peer.
///
/// Has no effect if we are already connected to that peer, or if no address is known for the
/// peer.
pub fn dial(me: &mut Self, peer_id: PeerId) {
let addrs = me.behaviour.addresses_of_peer(&peer_id);
/// If a new dialing attempt has been initiated, `Ok(true)` is returned.
///
/// If there is an ongoing dialing attempt, the current addresses of the
/// peer, as reported by [`NetworkBehaviour::addresses_of_peer`] are added
/// to the ongoing dialing attempt, ignoring duplicates. In this case no
/// new dialing attempt is initiated.
///
/// If no new dialing attempt has been initiated, meaning there is an ongoing
/// dialing attempt or `addresses_of_peer` reports no addresses, `Ok(false)`
/// is returned.
pub fn dial(me: &mut Self, peer_id: &PeerId) -> Result<bool, ConnectionLimit> {
let mut addrs = me.behaviour.addresses_of_peer(peer_id).into_iter();
match me.network.peer(peer_id.clone()) {
Peer::Disconnected(peer) => {
let mut addrs = addrs.into_iter();
if let Some(first) = addrs.next() {
let handler = me.behaviour.new_handler().into_node_handler_builder();
if peer.connect(first, addrs, handler).is_err() {
me.behaviour.inject_dial_failure(&peer_id);
match peer.connect(first, addrs, handler) {
Ok(_) => return Ok(true),
Err(error) => {
log::debug!(
"New dialing attempt to disconnected peer {:?} failed: {:?}.",
peer_id, error);
me.behaviour.inject_dial_failure(&peer_id);
return Err(error)
}
}
}
Ok(false)
},
Peer::Connected(peer) => {
if let Some(first) = addrs.next() {
let handler = me.behaviour.new_handler().into_node_handler_builder();
match peer.connect(first, addrs, handler) {
Ok(_) => return Ok(true),
Err(error) => {
log::debug!(
"New dialing attempt to connected peer {:?} failed: {:?}.",
peer_id, error);
me.behaviour.inject_dial_failure(&peer_id);
return Err(error)
}
}
}
Ok(false)
}
Peer::Dialing(mut peer) => {
peer.connection().add_addresses(addrs)
peer.connection().add_addresses(addrs);
Ok(false)
},
Peer::Connected(_) | Peer::Local => {}
Peer::Local => {
Err(ConnectionLimit { current: 0, limit: 0 })
}
}
}
@ -498,35 +534,29 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
peer_id,
endpoint,
});
} else if num_established.get() == 1 {
this.behaviour.inject_connected(peer_id.clone(), endpoint.clone());
return Poll::Ready(SwarmEvent::ConnectionEstablished {
peer_id,
endpoint,
num_established,
});
} else {
// For now, secondary connections are not explicitly reported to
// the behaviour. A behaviour only gets awareness of the
// connections via the events emitted from the connection handlers.
log::trace!("Secondary connection established: {:?}; Total (peer): {}.",
log::debug!("Connection established: {:?}; Total (peer): {}.",
connection.connected(), num_established);
let endpoint = connection.endpoint().clone();
this.behaviour.inject_connection_established(&peer_id, &connection.id(), &endpoint);
if num_established.get() == 1 {
this.behaviour.inject_connected(&peer_id);
}
return Poll::Ready(SwarmEvent::ConnectionEstablished {
peer_id,
endpoint,
num_established,
peer_id, num_established, endpoint
});
}
},
Poll::Ready(NetworkEvent::ConnectionError { connected, error, num_established }) => {
log::debug!("Connection {:?} closed by {:?}", connected, error);
let peer_id = connected.peer_id().clone();
Poll::Ready(NetworkEvent::ConnectionError { id, connected, error, num_established }) => {
log::debug!("Connection {:?} closed: {:?}", connected, error);
let info = connected.info;
let endpoint = connected.endpoint;
this.behaviour.inject_connection_closed(info.peer_id(), &id, &endpoint);
if num_established == 0 {
this.behaviour.inject_disconnected(&peer_id, endpoint.clone());
this.behaviour.inject_disconnected(info.peer_id());
}
return Poll::Ready(SwarmEvent::ConnectionClosed {
peer_id,
peer_id: info.peer_id().clone(),
endpoint,
cause: error,
num_established,
@ -663,12 +693,40 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
Poll::Ready(NetworkBehaviourAction::DialAddress { address }) => {
let _ = ExpandedSwarm::dial_addr(&mut *this, address);
},
Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id }) => {
Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition }) => {
if this.banned_peers.contains(&peer_id) {
this.behaviour.inject_dial_failure(&peer_id);
} else {
ExpandedSwarm::dial(&mut *this, peer_id.clone());
return Poll::Ready(SwarmEvent::Dialing(peer_id))
let result = match condition {
DialPeerCondition::Disconnected
if this.network.is_disconnected(&peer_id) =>
{
ExpandedSwarm::dial(this, &peer_id)
}
DialPeerCondition::NotDialing
if !this.network.is_dialing(&peer_id) =>
{
ExpandedSwarm::dial(this, &peer_id)
}
_ => {
log::trace!("Condition for new dialing attempt to {:?} not met: {:?}",
peer_id, condition);
if let Some(mut peer) = this.network.peer(peer_id.clone()).into_dialing() {
let addrs = this.behaviour.addresses_of_peer(peer.id());
peer.connection().add_addresses(addrs);
}
Ok(false)
}
};
match result {
Ok(false) => {},
Ok(true) => return Poll::Ready(SwarmEvent::Dialing(peer_id)),
Err(err) => {
log::debug!("Initiating dialing attempt to {:?} failed: {:?}",
&peer_id, err);
this.behaviour.inject_dial_failure(&peer_id);
}
}
}
},
Poll::Ready(NetworkBehaviourAction::NotifyHandler { peer_id, handler, event }) => {
@ -922,28 +980,33 @@ impl<'a> PollParameters for SwarmPollParameters<'a> {
}
}
/// A `SwarmBuilder` provides an API for configuring and constructing a `Swarm`,
/// including the underlying [`Network`].
pub struct SwarmBuilder<TBehaviour, TConnInfo> {
local_peer_id: PeerId,
transport: BoxTransport<(TConnInfo, StreamMuxerBox), io::Error>,
behaviour: TBehaviour,
network: NetworkConfig,
network_config: NetworkConfig,
}
impl<TBehaviour, TConnInfo> SwarmBuilder<TBehaviour, TConnInfo>
where TBehaviour: NetworkBehaviour,
TConnInfo: ConnectionInfo<PeerId = PeerId> + fmt::Debug + Clone + Send + 'static,
{
pub fn new<TTransport, TMuxer>(transport: TTransport, behaviour: TBehaviour, local_peer_id: PeerId) -> Self
/// Creates a new `SwarmBuilder` from the given transport, behaviour and
/// local peer ID. The `Swarm` with its underlying `Network` is obtained
/// via [`SwarmBuilder::build`].
pub fn new<TTrans, TMuxer>(transport: TTrans, behaviour: TBehaviour, local_peer_id: PeerId) -> Self
where
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send + 'static,
<TMuxer as StreamMuxer>::OutboundSubstream: Send + 'static,
<TMuxer as StreamMuxer>::Substream: Send + 'static,
TTransport: Transport<Output = (TConnInfo, TMuxer)> + Clone + Send + Sync + 'static,
TTransport::Error: Send + Sync + 'static,
TTransport::Listener: Send + 'static,
TTransport::ListenerUpgrade: Send + 'static,
TTransport::Dial: Send + 'static,
TTrans: Transport<Output = (TConnInfo, TMuxer)> + Clone + Send + Sync + 'static,
TTrans::Error: Send + Sync + 'static,
TTrans::Listener: Send + 'static,
TTrans::ListenerUpgrade: Send + 'static,
TTrans::Dial: Send + 'static,
{
let transport = transport
.map(|(conn_info, muxer), _| (conn_info, StreamMuxerBox::new(muxer)))
@ -954,35 +1017,41 @@ where TBehaviour: NetworkBehaviour,
local_peer_id,
transport,
behaviour,
network: NetworkConfig::default(),
network_config: Default::default(),
}
}
pub fn incoming_limit(mut self, incoming_limit: usize) -> Self {
self.network.set_pending_incoming_limit(incoming_limit);
self
}
/// Sets the executor to use to spawn background tasks.
/// Configures the `Executor` to use for spawning background tasks.
///
/// By default, uses a threads pool.
pub fn executor(mut self, executor: impl Executor + Send + 'static) -> Self {
self.network.set_executor(Box::new(executor));
/// By default, unless another executor has been configured,
/// [`SwarmBuilder::build`] will try to set up a `ThreadPool`.
pub fn executor(mut self, e: Box<dyn Executor + Send>) -> Self {
self.network_config.set_executor(e);
self
}
/// Shortcut for calling `executor` with an object that calls the given closure.
pub fn executor_fn(mut self, executor: impl Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + 'static) -> Self {
struct SpawnImpl<F>(F);
impl<F: Fn(Pin<Box<dyn Future<Output = ()> + Send>>)> Executor for SpawnImpl<F> {
fn exec(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
(self.0)(f)
}
}
self.network.set_executor(Box::new(SpawnImpl(executor)));
/// Configures a limit for the number of simultaneous incoming
/// connection attempts.
pub fn incoming_connection_limit(mut self, n: usize) -> Self {
self.network_config.set_incoming_limit(n);
self
}
/// Configures a limit for the number of simultaneous outgoing
/// connection attempts.
pub fn outgoing_connection_limit(mut self, n: usize) -> Self {
self.network_config.set_outgoing_limit(n);
self
}
/// Configures a limit for the number of simultaneous
/// established connections per peer.
pub fn peer_connection_limit(mut self, n: usize) -> Self {
self.network_config.set_established_per_peer_limit(n);
self
}
/// Builds a `Swarm` with the current configuration.
pub fn build(mut self) -> Swarm<TBehaviour, TConnInfo> {
let supported_protocols = self.behaviour
.new_handler()
@ -992,9 +1061,10 @@ where TBehaviour: NetworkBehaviour,
.map(|info| info.protocol_name().to_vec())
.collect();
// If no executor has been explicitly configured, try to set up
// a thread pool.
if self.network.executor().is_none() {
let mut network_cfg = self.network_config;
// If no executor has been explicitly configured, try to set up a thread pool.
if network_cfg.executor().is_none() {
struct PoolWrapper(ThreadPool);
impl Executor for PoolWrapper {
fn exec(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
@ -1002,21 +1072,17 @@ where TBehaviour: NetworkBehaviour,
}
}
if let Some(executor) = ThreadPoolBuilder::new()
.name_prefix("libp2p-task-")
match ThreadPoolBuilder::new()
.name_prefix("libp2p-swarm-task-")
.create()
.ok()
.map(|tp| Box::new(PoolWrapper(tp)) as Box<_>)
{
self.network.set_executor(Box::new(executor));
Ok(executor) => { network_cfg.set_executor(Box::new(executor)); },
Err(err) => log::warn!("Failed to create executor thread pool: {:?}", err)
}
}
let network = Network::new(
self.transport,
self.local_peer_id,
self.network,
);
let network = Network::new(self.transport, self.local_peer_id, network_cfg);
ExpandedSwarm {
network,
@ -1047,9 +1113,13 @@ impl NetworkBehaviour for DummyBehaviour {
Vec::new()
}
fn inject_connected(&mut self, _: PeerId, _: libp2p_core::ConnectedPoint) {}
fn inject_connected(&mut self, _: &PeerId) {}
fn inject_disconnected(&mut self, _: &PeerId, _: libp2p_core::ConnectedPoint) {}
fn inject_connection_established(&mut self, _: &PeerId, _: &ConnectionId, _: &ConnectedPoint) {}
fn inject_disconnected(&mut self, _: &PeerId) {}
fn inject_connection_closed(&mut self, _: &PeerId, _: &ConnectionId, _: &ConnectedPoint) {}
fn inject_event(&mut self, _: PeerId, _: ConnectionId,
_: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent) {}
@ -1067,9 +1137,9 @@ impl NetworkBehaviour for DummyBehaviour {
mod tests {
use crate::{DummyBehaviour, SwarmBuilder};
use libp2p_core::{
identity,
PeerId,
PublicKey,
identity,
transport::dummy::{DummyStream, DummyTransport}
};
use libp2p_mplex::Multiplex;
@ -1084,7 +1154,8 @@ mod tests {
let transport = DummyTransport::<(PeerId, Multiplex<DummyStream>)>::new();
let behaviour = DummyBehaviour {};
let swarm = SwarmBuilder::new(transport, behaviour, id.into())
.incoming_limit(4).build();
.incoming_connection_limit(4)
.build();
assert_eq!(swarm.network.incoming_limit(), Some(4));
}

View File

@ -133,10 +133,10 @@ enum Shutdown {
/// Error generated by the `NodeHandlerWrapper`.
#[derive(Debug)]
pub enum NodeHandlerWrapperError<TErr> {
/// Error generated by the handler.
/// The connection handler encountered an error.
Handler(TErr),
/// The connection has been deemed useless and has been closed.
UselessTimeout,
/// The connection keep-alive timeout expired.
KeepAliveTimeout,
}
impl<TErr> From<TErr> for NodeHandlerWrapperError<TErr> {
@ -152,8 +152,8 @@ where
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
NodeHandlerWrapperError::Handler(err) => write!(f, "{}", err),
NodeHandlerWrapperError::UselessTimeout =>
write!(f, "Node has been closed due to inactivity"),
NodeHandlerWrapperError::KeepAliveTimeout =>
write!(f, "Connection closed due to expired keep-alive timeout."),
}
}
}
@ -165,7 +165,7 @@ where
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
match self {
NodeHandlerWrapperError::Handler(err) => Some(err),
NodeHandlerWrapperError::UselessTimeout => None,
NodeHandlerWrapperError::KeepAliveTimeout => None,
}
}
}
@ -314,9 +314,9 @@ where
if self.negotiating_in.is_empty() && self.negotiating_out.is_empty() {
match self.shutdown {
Shutdown::None => {},
Shutdown::Asap => return Poll::Ready(Err(NodeHandlerWrapperError::UselessTimeout)),
Shutdown::Asap => return Poll::Ready(Err(NodeHandlerWrapperError::KeepAliveTimeout)),
Shutdown::Later(ref mut delay, _) => match Future::poll(Pin::new(delay), cx) {
Poll::Ready(_) => return Poll::Ready(Err(NodeHandlerWrapperError::UselessTimeout)),
Poll::Ready(_) => return Poll::Ready(Err(NodeHandlerWrapperError::KeepAliveTimeout)),
Poll::Pending => {}
}
}

View File

@ -76,15 +76,27 @@ where
self.inner.as_mut().map(|b| b.addresses_of_peer(peer_id)).unwrap_or_else(Vec::new)
}
fn inject_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint) {
fn inject_connected(&mut self, peer_id: &PeerId) {
if let Some(inner) = self.inner.as_mut() {
inner.inject_connected(peer_id, endpoint)
inner.inject_connected(peer_id)
}
}
fn inject_disconnected(&mut self, peer_id: &PeerId, endpoint: ConnectedPoint) {
fn inject_disconnected(&mut self, peer_id: &PeerId) {
if let Some(inner) = self.inner.as_mut() {
inner.inject_disconnected(peer_id, endpoint)
inner.inject_disconnected(peer_id)
}
}
fn inject_connection_established(&mut self, peer_id: &PeerId, connection: &ConnectionId, endpoint: &ConnectedPoint) {
if let Some(inner) = self.inner.as_mut() {
inner.inject_connection_established(peer_id, connection, endpoint)
}
}
fn inject_connection_closed(&mut self, peer_id: &PeerId, connection: &ConnectionId, endpoint: &ConnectedPoint) {
if let Some(inner) = self.inner.as_mut() {
inner.inject_connection_closed(peer_id, connection, endpoint)
}
}