mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-28 17:21:34 +00:00
Fix connection & handler shutdown when using KeepAlive::Now
. (#1072)
* Fix connection & handler shutdown when using `KeepAlive::Now`. Delay::new(Instant::now()) is never immediately ready, resulting in `KeepAlive::Now` having no effect, since the delay is re-created on every execution of `poll()` in the `NodeHandlerWrapper`. It can also send the node handler into a busy-loop, since every newly created Delay will trigger a task wakeup, which creates a new Delay with Instant::now(), and so forth. The use of `Delay::new(Instant::now())` for "immediate" connection shutdown is therefore removed here entirely. An important assumption is thereby that as long as the node handler has non-empty `negotiating_in` and `negotiating_out`, the handler is not dependent on such a Delay for task wakeup. * Trigger CI.
This commit is contained in:
@ -31,7 +31,7 @@ use crate::{
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
use futures::prelude::*;
|
use futures::prelude::*;
|
||||||
use std::{error, fmt, time::{Duration, Instant}};
|
use std::{error, fmt, time::Duration};
|
||||||
use tokio_timer::{Delay, Timeout};
|
use tokio_timer::{Delay, Timeout};
|
||||||
|
|
||||||
/// Prototype for a `NodeHandlerWrapper`.
|
/// Prototype for a `NodeHandlerWrapper`.
|
||||||
@ -64,7 +64,7 @@ where
|
|||||||
negotiating_out: Vec::new(),
|
negotiating_out: Vec::new(),
|
||||||
queued_dial_upgrades: Vec::new(),
|
queued_dial_upgrades: Vec::new(),
|
||||||
unique_dial_upgrade_id: 0,
|
unique_dial_upgrade_id: 0,
|
||||||
connection_shutdown: None,
|
shutdown: Shutdown::None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -85,7 +85,7 @@ where
|
|||||||
negotiating_out: Vec::new(),
|
negotiating_out: Vec::new(),
|
||||||
queued_dial_upgrades: Vec::new(),
|
queued_dial_upgrades: Vec::new(),
|
||||||
unique_dial_upgrade_id: 0,
|
unique_dial_upgrade_id: 0,
|
||||||
connection_shutdown: None,
|
shutdown: Shutdown::None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -112,9 +112,26 @@ where
|
|||||||
queued_dial_upgrades: Vec<(u64, TProtoHandler::OutboundProtocol)>,
|
queued_dial_upgrades: Vec<(u64, TProtoHandler::OutboundProtocol)>,
|
||||||
/// Unique identifier assigned to each queued dial upgrade.
|
/// Unique identifier assigned to each queued dial upgrade.
|
||||||
unique_dial_upgrade_id: u64,
|
unique_dial_upgrade_id: u64,
|
||||||
/// When a connection has been deemed useless, will contain `Some` with a `Delay` to when it
|
/// The currently planned connection & handler shutdown.
|
||||||
/// should be shut down.
|
shutdown: Shutdown,
|
||||||
connection_shutdown: Option<Delay>,
|
}
|
||||||
|
|
||||||
|
/// The options for a planned connection & handler shutdown.
|
||||||
|
///
|
||||||
|
/// A shutdown is planned anew based on the return value of
|
||||||
|
/// [`ProtocolsHandler::connection_keep_alive`] of the underlying handler
|
||||||
|
/// after every invocation of [`ProtocolsHandler::poll`].
|
||||||
|
///
|
||||||
|
/// A planned shutdown is always postponed for as long as there are ingoing
|
||||||
|
/// or outgoing substreams being negotiated, i.e. it is a graceful, "idle"
|
||||||
|
/// shutdown.
|
||||||
|
enum Shutdown {
|
||||||
|
/// No shutdown is planned.
|
||||||
|
None,
|
||||||
|
/// A shut down is planned as soon as possible.
|
||||||
|
Asap,
|
||||||
|
/// A shut down is planned for when a `Delay` has elapsed.
|
||||||
|
Later(Delay)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Error generated by the `NodeHandlerWrapper`.
|
/// Error generated by the `NodeHandlerWrapper`.
|
||||||
@ -257,10 +274,12 @@ where
|
|||||||
// calls on `self.handler`.
|
// calls on `self.handler`.
|
||||||
let poll_result = self.handler.poll()?;
|
let poll_result = self.handler.poll()?;
|
||||||
|
|
||||||
self.connection_shutdown = match self.handler.connection_keep_alive() {
|
// Ask the handler whether it wants the connection (and the handler itself)
|
||||||
KeepAlive::Until(expiration) => Some(Delay::new(expiration)),
|
// to be kept alive, which determines the planned shutdown, if any.
|
||||||
KeepAlive::Now => Some(Delay::new(Instant::now())),
|
self.shutdown = match self.handler.connection_keep_alive() {
|
||||||
KeepAlive::Forever => None,
|
KeepAlive::Until(t) => Shutdown::Later(Delay::new(t)),
|
||||||
|
KeepAlive::Now => Shutdown::Asap,
|
||||||
|
KeepAlive::Forever => Shutdown::None
|
||||||
};
|
};
|
||||||
|
|
||||||
match poll_result {
|
match poll_result {
|
||||||
@ -282,21 +301,18 @@ where
|
|||||||
Async::NotReady => (),
|
Async::NotReady => (),
|
||||||
};
|
};
|
||||||
|
|
||||||
// Check the `connection_shutdown`.
|
// Check if the connection (and handler) should be shut down.
|
||||||
if let Some(mut connection_shutdown) = self.connection_shutdown.take() {
|
// As long as we're still negotiating substreams, shutdown is always postponed.
|
||||||
// If we're negotiating substreams, let's delay the closing.
|
|
||||||
if self.negotiating_in.is_empty() && self.negotiating_out.is_empty() {
|
if self.negotiating_in.is_empty() && self.negotiating_out.is_empty() {
|
||||||
match connection_shutdown.poll() {
|
match self.shutdown {
|
||||||
Ok(Async::Ready(_)) | Err(_) => {
|
Shutdown::None => {},
|
||||||
return Err(NodeHandlerWrapperError::UselessTimeout);
|
Shutdown::Asap => return Err(NodeHandlerWrapperError::UselessTimeout),
|
||||||
},
|
Shutdown::Later(ref mut delay) => match delay.poll() {
|
||||||
Ok(Async::NotReady) => {
|
Ok(Async::Ready(_)) | Err(_) =>
|
||||||
self.connection_shutdown = Some(connection_shutdown);
|
return Err(NodeHandlerWrapperError::UselessTimeout),
|
||||||
|
Ok(Async::NotReady) => {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
self.connection_shutdown = Some(connection_shutdown);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(Async::NotReady)
|
Ok(Async::NotReady)
|
||||||
|
@ -76,7 +76,7 @@ where
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn connection_keep_alive(&self) -> KeepAlive { KeepAlive::Now }
|
fn connection_keep_alive(&self) -> KeepAlive { KeepAlive::Forever }
|
||||||
|
|
||||||
fn poll(&mut self) -> Poll<ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent>, Self::Error> {
|
fn poll(&mut self) -> Poll<ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent>, Self::Error> {
|
||||||
Ok(Async::NotReady)
|
Ok(Async::NotReady)
|
||||||
|
@ -220,60 +220,60 @@ pub enum IdentifyEvent {
|
|||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use crate::{Identify, IdentifyEvent};
|
use crate::{Identify, IdentifyEvent};
|
||||||
use futures::prelude::*;
|
use futures::{future, prelude::*};
|
||||||
use libp2p_core::identity;
|
|
||||||
use libp2p_core::{
|
use libp2p_core::{
|
||||||
|
identity,
|
||||||
|
PeerId,
|
||||||
upgrade::{self, OutboundUpgradeExt, InboundUpgradeExt},
|
upgrade::{self, OutboundUpgradeExt, InboundUpgradeExt},
|
||||||
|
muxing::StreamMuxer,
|
||||||
Multiaddr,
|
Multiaddr,
|
||||||
Swarm,
|
Swarm,
|
||||||
Transport
|
Transport
|
||||||
};
|
};
|
||||||
|
use libp2p_tcp::TcpConfig;
|
||||||
|
use libp2p_secio::SecioConfig;
|
||||||
|
use libp2p_mplex::MplexConfig;
|
||||||
use rand::Rng;
|
use rand::Rng;
|
||||||
use std::io;
|
use std::{fmt, io};
|
||||||
|
use tokio::runtime::current_thread;
|
||||||
|
|
||||||
|
fn transport() -> (identity::PublicKey, impl Transport<
|
||||||
|
Output = (PeerId, impl StreamMuxer<Substream = impl Send, OutboundSubstream = impl Send>),
|
||||||
|
Listener = impl Send,
|
||||||
|
ListenerUpgrade = impl Send,
|
||||||
|
Dial = impl Send,
|
||||||
|
Error = impl fmt::Debug
|
||||||
|
> + Clone) {
|
||||||
|
let id_keys = identity::Keypair::generate_ed25519();
|
||||||
|
let pubkey = id_keys.public();
|
||||||
|
let transport = TcpConfig::new()
|
||||||
|
.nodelay(true)
|
||||||
|
.with_upgrade(SecioConfig::new(id_keys))
|
||||||
|
.and_then(move |out, endpoint| {
|
||||||
|
let peer_id = out.remote_key.into_peer_id();
|
||||||
|
let peer_id2 = peer_id.clone();
|
||||||
|
let upgrade = MplexConfig::default()
|
||||||
|
.map_outbound(move |muxer| (peer_id, muxer))
|
||||||
|
.map_inbound(move |muxer| (peer_id2, muxer));
|
||||||
|
upgrade::apply(out.stream, upgrade, endpoint)
|
||||||
|
});
|
||||||
|
(pubkey, transport)
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn periodic_id_works() {
|
fn periodic_id_works() {
|
||||||
let node1_key = identity::Keypair::generate_ed25519();
|
let (mut swarm1, pubkey1) = {
|
||||||
let node1_public_key = node1_key.public();
|
let (pubkey, transport) = transport();
|
||||||
let node2_key = identity::Keypair::generate_ed25519();
|
let protocol = Identify::new("a".to_string(), "b".to_string(), pubkey.clone());
|
||||||
let node2_public_key = node2_key.public();
|
let swarm = Swarm::new(transport, protocol, pubkey.clone().into_peer_id());
|
||||||
|
(swarm, pubkey)
|
||||||
let mut swarm1 = {
|
|
||||||
// TODO: make creating the transport more elegant ; literaly half of the code of the test
|
|
||||||
// is about creating the transport
|
|
||||||
let local_peer_id = node1_public_key.clone().into_peer_id();
|
|
||||||
let transport = libp2p_tcp::TcpConfig::new()
|
|
||||||
.with_upgrade(libp2p_secio::SecioConfig::new(node1_key))
|
|
||||||
.and_then(move |out, endpoint| {
|
|
||||||
let peer_id = out.remote_key.into_peer_id();
|
|
||||||
let peer_id2 = peer_id.clone();
|
|
||||||
let upgrade = libp2p_mplex::MplexConfig::default()
|
|
||||||
.map_outbound(move |muxer| (peer_id, muxer))
|
|
||||||
.map_inbound(move |muxer| (peer_id2, muxer));
|
|
||||||
upgrade::apply(out.stream, upgrade, endpoint)
|
|
||||||
})
|
|
||||||
.map_err(|_| -> io::Error { panic!() });
|
|
||||||
|
|
||||||
Swarm::new(transport, Identify::new("a".to_string(), "b".to_string(), node1_public_key.clone()), local_peer_id)
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut swarm2 = {
|
let (mut swarm2, pubkey2) = {
|
||||||
// TODO: make creating the transport more elegant ; literaly half of the code of the test
|
let (pubkey, transport) = transport();
|
||||||
// is about creating the transport
|
let protocol = Identify::new("c".to_string(), "d".to_string(), pubkey.clone());
|
||||||
let local_peer_id = node2_public_key.clone().into();
|
let swarm = Swarm::new(transport, protocol, pubkey.clone().into_peer_id());
|
||||||
let transport = libp2p_tcp::TcpConfig::new()
|
(swarm, pubkey)
|
||||||
.with_upgrade(libp2p_secio::SecioConfig::new(node2_key))
|
|
||||||
.and_then(move |out, endpoint| {
|
|
||||||
let peer_id = out.remote_key.into_peer_id();
|
|
||||||
let peer_id2 = peer_id.clone();
|
|
||||||
let upgrade = libp2p_mplex::MplexConfig::default()
|
|
||||||
.map_outbound(move |muxer| (peer_id, muxer))
|
|
||||||
.map_inbound(move |muxer| (peer_id2, muxer));
|
|
||||||
upgrade::apply(out.stream, upgrade, endpoint)
|
|
||||||
})
|
|
||||||
.map_err(|_| -> io::Error { panic!() });
|
|
||||||
|
|
||||||
Swarm::new(transport, Identify::new("c".to_string(), "d".to_string(), node2_public_key.clone()), local_peer_id)
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let addr: Multiaddr = {
|
let addr: Multiaddr = {
|
||||||
@ -282,51 +282,45 @@ mod tests {
|
|||||||
};
|
};
|
||||||
|
|
||||||
Swarm::listen_on(&mut swarm1, addr.clone()).unwrap();
|
Swarm::listen_on(&mut swarm1, addr.clone()).unwrap();
|
||||||
Swarm::dial_addr(&mut swarm2, addr).unwrap();
|
Swarm::dial_addr(&mut swarm2, addr.clone()).unwrap();
|
||||||
|
|
||||||
let mut swarm1_good = false;
|
// nb. Either swarm may receive the `Identified` event first, upon which
|
||||||
let mut swarm2_good = false;
|
// it will permit the connection to be closed, as defined by
|
||||||
|
// `PeriodicIdHandler::connection_keep_alive`. Hence the test succeeds if
|
||||||
tokio::runtime::current_thread::Runtime::new()
|
// either `Identified` event arrives correctly.
|
||||||
.unwrap()
|
current_thread::Runtime::new().unwrap().block_on(
|
||||||
.block_on(futures::future::poll_fn(move || -> Result<_, io::Error> {
|
future::poll_fn(move || -> Result<_, io::Error> {
|
||||||
loop {
|
loop {
|
||||||
let mut swarm1_not_ready = false;
|
|
||||||
match swarm1.poll().unwrap() {
|
match swarm1.poll().unwrap() {
|
||||||
Async::Ready(Some(IdentifyEvent::Identified { info, .. })) => {
|
Async::Ready(Some(IdentifyEvent::Identified { info, .. })) => {
|
||||||
assert_eq!(info.public_key, node2_public_key);
|
assert_eq!(info.public_key, pubkey2);
|
||||||
assert_eq!(info.protocol_version, "c");
|
assert_eq!(info.protocol_version, "c");
|
||||||
assert_eq!(info.agent_version, "d");
|
assert_eq!(info.agent_version, "d");
|
||||||
assert!(!info.protocols.is_empty());
|
assert!(!info.protocols.is_empty());
|
||||||
assert!(info.listen_addrs.is_empty());
|
assert!(info.listen_addrs.is_empty());
|
||||||
swarm1_good = true;
|
return Ok(Async::Ready(()))
|
||||||
},
|
},
|
||||||
Async::Ready(Some(IdentifyEvent::SendBack { result: Ok(()), .. })) => (),
|
Async::Ready(Some(IdentifyEvent::SendBack { result: Ok(()), .. })) => (),
|
||||||
Async::Ready(_) => panic!(),
|
Async::Ready(e) => panic!("{:?}", e),
|
||||||
Async::NotReady => swarm1_not_ready = true,
|
Async::NotReady => {}
|
||||||
}
|
}
|
||||||
|
|
||||||
match swarm2.poll().unwrap() {
|
match swarm2.poll().unwrap() {
|
||||||
Async::Ready(Some(IdentifyEvent::Identified { info, .. })) => {
|
Async::Ready(Some(IdentifyEvent::Identified { info, .. })) => {
|
||||||
assert_eq!(info.public_key, node1_public_key);
|
assert_eq!(info.public_key, pubkey1);
|
||||||
assert_eq!(info.protocol_version, "a");
|
assert_eq!(info.protocol_version, "a");
|
||||||
assert_eq!(info.agent_version, "b");
|
assert_eq!(info.agent_version, "b");
|
||||||
assert!(!info.protocols.is_empty());
|
assert!(!info.protocols.is_empty());
|
||||||
assert_eq!(info.listen_addrs.len(), 1);
|
assert_eq!(info.listen_addrs.len(), 1);
|
||||||
swarm2_good = true;
|
return Ok(Async::Ready(()))
|
||||||
},
|
},
|
||||||
Async::Ready(Some(IdentifyEvent::SendBack { result: Ok(()), .. })) => (),
|
Async::Ready(Some(IdentifyEvent::SendBack { result: Ok(()), .. })) => (),
|
||||||
Async::Ready(_) => panic!(),
|
Async::Ready(e) => panic!("{:?}", e),
|
||||||
Async::NotReady if swarm1_not_ready => break,
|
Async::NotReady => break
|
||||||
Async::NotReady => ()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if swarm1_good && swarm2_good {
|
|
||||||
Ok(Async::Ready(()))
|
|
||||||
} else {
|
|
||||||
Ok(Async::NotReady)
|
Ok(Async::NotReady)
|
||||||
}
|
|
||||||
}))
|
}))
|
||||||
.unwrap();
|
.unwrap();
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user