diff --git a/core/src/either.rs b/core/src/either.rs index 4b1b05d0..21718fed 100644 --- a/core/src/either.rs +++ b/core/src/either.rs @@ -18,7 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::{muxing::{Shutdown, StreamMuxer}, Multiaddr, ProtocolName}; +use crate::{muxing::StreamMuxer, Multiaddr, ProtocolName}; use futures::prelude::*; use std::{fmt, io::{Error as IoError, Read, Write}}; use tokio_io::{AsyncRead, AsyncWrite}; @@ -135,10 +135,10 @@ where type Substream = EitherOutput; type OutboundSubstream = EitherOutbound; - fn poll_inbound(&self) -> Poll, IoError> { + fn poll_inbound(&self) -> Poll { match self { - EitherOutput::First(inner) => inner.poll_inbound().map(|p| p.map(|o| o.map(EitherOutput::First))), - EitherOutput::Second(inner) => inner.poll_inbound().map(|p| p.map(|o| o.map(EitherOutput::Second))), + EitherOutput::First(inner) => inner.poll_inbound().map(|p| p.map(EitherOutput::First)), + EitherOutput::Second(inner) => inner.poll_inbound().map(|p| p.map(EitherOutput::Second)), } } @@ -149,13 +149,13 @@ where } } - fn poll_outbound(&self, substream: &mut Self::OutboundSubstream) -> Poll, IoError> { + fn poll_outbound(&self, substream: &mut Self::OutboundSubstream) -> Poll { match (self, substream) { (EitherOutput::First(ref inner), EitherOutbound::A(ref mut substream)) => { - inner.poll_outbound(substream).map(|p| p.map(|o| o.map(EitherOutput::First))) + inner.poll_outbound(substream).map(|p| p.map(EitherOutput::First)) }, (EitherOutput::Second(ref inner), EitherOutbound::B(ref mut substream)) => { - inner.poll_outbound(substream).map(|p| p.map(|o| o.map(EitherOutput::Second))) + inner.poll_outbound(substream).map(|p| p.map(EitherOutput::Second)) }, _ => panic!("Wrong API usage") } @@ -214,13 +214,13 @@ where } } - fn shutdown_substream(&self, sub: &mut Self::Substream, kind: Shutdown) -> Poll<(), IoError> { + fn shutdown_substream(&self, sub: &mut Self::Substream) -> Poll<(), IoError> { match (self, sub) { (EitherOutput::First(ref inner), EitherOutput::First(ref mut sub)) => { - inner.shutdown_substream(sub, kind) + inner.shutdown_substream(sub) }, (EitherOutput::Second(ref inner), EitherOutput::Second(ref mut sub)) => { - inner.shutdown_substream(sub, kind) + inner.shutdown_substream(sub) }, _ => panic!("Wrong API usage") } @@ -250,10 +250,10 @@ where } } - fn shutdown(&self, kind: Shutdown) -> Poll<(), IoError> { + fn close(&self) -> Poll<(), IoError> { match self { - EitherOutput::First(inner) => inner.shutdown(kind), - EitherOutput::Second(inner) => inner.shutdown(kind) + EitherOutput::First(inner) => inner.close(), + EitherOutput::Second(inner) => inner.close() } } diff --git a/core/src/muxing.rs b/core/src/muxing.rs index 07b6e304..a1c2552f 100644 --- a/core/src/muxing.rs +++ b/core/src/muxing.rs @@ -54,24 +54,24 @@ use fnv::FnvHashMap; use futures::{future, prelude::*, try_ready}; use parking_lot::Mutex; -use std::io::{Error as IoError, ErrorKind as IoErrorKind, Read, Write}; +use std::io::{self, Read, Write}; use std::ops::Deref; use std::fmt; use std::sync::atomic::{AtomicUsize, Ordering}; use tokio_io::{AsyncRead, AsyncWrite}; -/// Ways to shutdown a substream or stream muxer. -#[derive(Clone, Copy, Debug, PartialEq, Eq)] -pub enum Shutdown { - /// Shutdown inbound direction. - Inbound, - /// Shutdown outbound direction. - Outbound, - /// Shutdown everything. - All -} - /// Implemented on objects that can open and manage substreams. +/// +/// The state of a muxer, as exposed by this API, is the following: +/// +/// - A connection to the remote. The `is_remote_acknowledged`, `flush_all` and `close` methods +/// operate on this. +/// - A list of substreams that are open. The `poll_inbound`, `poll_outbound`, `read_substream`, +/// `write_substream`, `flush_substream`, `shutdown_substream` and `destroy_substream` methods +/// allow controlling these entries. +/// - A list of outbound substreams being opened. The `open_outbound`, `poll_outbound` and +/// `destroy_outbound` methods allow controlling these entries. +/// pub trait StreamMuxer { /// Type of the object that represents the raw substream where data can be read and written. type Substream; @@ -85,19 +85,20 @@ pub trait StreamMuxer { /// /// If `NotReady` is returned, then the current task will be notified once the muxer /// is ready to be polled, similar to the API of `Stream::poll()`. - /// However, only the latest task that was used to call this method may be notified. - fn poll_inbound(&self) -> Poll, IoError>; + /// Only the latest task that was used to call this method may be notified. + /// + /// An error can be generated if the connection has been closed. + fn poll_inbound(&self) -> Poll; - /// Opens a new outgoing substream, and produces a future that will be resolved when it becomes - /// available. + /// Opens a new outgoing substream, and produces the equivalent to a future that will be + /// resolved when it becomes available. + /// + /// The API of `OutboundSubstream` is totally opaque, and the object can only be interfaced + /// through the methods on the `StreamMuxer` trait. fn open_outbound(&self) -> Self::OutboundSubstream; /// Polls the outbound substream. /// - /// If this returns `Ok(Ready(None))`, that means that the outbound channel is closed and that - /// opening any further outbound substream will likely produce `None` as well. The existing - /// outbound substream attempts may however still succeed. - /// /// If `NotReady` is returned, then the current task will be notified once the substream /// is ready to be polled, similar to the API of `Future::poll()`. /// However, for each individual outbound substream, only the latest task that was used to @@ -105,10 +106,10 @@ pub trait StreamMuxer { /// /// May panic or produce an undefined result if an earlier polling of the same substream /// returned `Ready` or `Err`. - fn poll_outbound(&self, s: &mut Self::OutboundSubstream) -> Poll, IoError>; + fn poll_outbound(&self, s: &mut Self::OutboundSubstream) -> Poll; - /// Destroys an outbound substream. Use this after the outbound substream has finished, or if - /// you want to interrupt it. + /// Destroys an outbound substream future. Use this after the outbound substream has finished, + /// or if you want to interrupt it. fn destroy_outbound(&self, s: Self::OutboundSubstream); /// Reads data from a substream. The behaviour is the same as `tokio_io::AsyncRead::poll_read`. @@ -116,29 +117,51 @@ pub trait StreamMuxer { /// If `NotReady` is returned, then the current task will be notified once the substream /// is ready to be read. However, for each individual substream, only the latest task that /// was used to call this method may be notified. - fn read_substream(&self, s: &mut Self::Substream, buf: &mut [u8]) -> Poll; + /// + /// If `Async::Ready(0)` is returned, the substream has been closed by the remote and should + /// no longer be read afterwards. + /// + /// An error can be generated if the connection has been closed, or if a protocol misbehaviour + /// happened. + fn read_substream(&self, s: &mut Self::Substream, buf: &mut [u8]) -> Poll; /// Write data to a substream. The behaviour is the same as `tokio_io::AsyncWrite::poll_write`. /// /// If `NotReady` is returned, then the current task will be notified once the substream - /// is ready to be read. However, for each individual substream, only the latest task that - /// was used to call this method may be notified. - fn write_substream(&self, s: &mut Self::Substream, buf: &[u8]) -> Poll; + /// is ready to be read. For each individual substream, only the latest task that was used to + /// call this method may be notified. + /// + /// Calling `write_substream` does not guarantee that data will arrive to the remote. To + /// ensure that, you should call `flush_substream`. + /// + /// It is incorrect to call this method on a substream if you called `shutdown_substream` on + /// this substream earlier. + fn write_substream(&self, s: &mut Self::Substream, buf: &[u8]) -> Poll; /// Flushes a substream. The behaviour is the same as `tokio_io::AsyncWrite::poll_flush`. /// + /// After this method has been called, data written earlier on the substream is guaranteed to + /// be received by the remote. + /// /// If `NotReady` is returned, then the current task will be notified once the substream - /// is ready to be read. However, for each individual substream, only the latest task that - /// was used to call this method may be notified. - fn flush_substream(&self, s: &mut Self::Substream) -> Poll<(), IoError>; + /// is ready to be read. For each individual substream, only the latest task that was used to + /// call this method may be notified. + /// + /// > **Note**: This method may be implemented as a call to `flush_all`. + fn flush_substream(&self, s: &mut Self::Substream) -> Poll<(), io::Error>; - /// Attempts to shut down a substream. The behaviour is similar to + /// Attempts to shut down the writing side of a substream. The behaviour is similar to /// `tokio_io::AsyncWrite::shutdown`. /// - /// Shutting down a substream does not imply `flush_substream`. If you want to make sure - /// that the remote is immediately informed about the shutdown, use `flush_substream` or - /// `flush`. - fn shutdown_substream(&self, s: &mut Self::Substream, kind: Shutdown) -> Poll<(), IoError>; + /// Contrary to `AsyncWrite::shutdown`, shutting down a substream does not imply + /// `flush_substream`. If you want to make sure that the remote is immediately informed about + /// the shutdown, use `flush_substream` or `flush_all`. + /// + /// After this method has been called, you should no longer attempt to write to this substream. + /// + /// An error can be generated if the connection has been closed, or if a protocol misbehaviour + /// happened. + fn shutdown_substream(&self, s: &mut Self::Substream) -> Poll<(), io::Error>; /// Destroys a substream. fn destroy_substream(&self, s: Self::Substream); @@ -151,23 +174,26 @@ pub trait StreamMuxer { /// has potentially not received it yet. fn is_remote_acknowledged(&self) -> bool; - /// Shutdown this `StreamMuxer`. + /// Closes this `StreamMuxer`. /// - /// If supported, sends a hint to the remote that we may no longer open any further outbound - /// or inbound substream. Calling `poll_outbound` or `poll_inbound` afterwards may or may not - /// produce `None`. + /// After this has returned `Ok(Async::Ready(()))`, the muxer has become useless. All + /// subsequent reads must return either `EOF` or an error. All subsequent writes, shutdowns, + /// or polls must generate an error or be ignored. /// - /// Shutting down the muxer does not imply `flush_all`. If you want to make sure that the - /// remote is immediately informed about the shutdown, use `flush_all`. - fn shutdown(&self, kind: Shutdown) -> Poll<(), IoError>; + /// Calling this method implies `flush_all`. + /// + /// > **Note**: You are encouraged to call this method and wait for it to return `Ready`, so + /// > that the remote is properly informed of the shutdown. However, apart from + /// > properly informing the remote, there is no difference between this and + /// > immediately dropping the muxer. + fn close(&self) -> Poll<(), io::Error>; /// Flush this `StreamMuxer`. /// - /// This drains any write buffers of substreams and otherwise and delivers any pending shutdown - /// notifications due to `shutdown_substream` or `shutdown`. One may thus shutdown groups of - /// substreams followed by a final `flush_all` instead of having to do `flush_substream` for - /// each. - fn flush_all(&self) -> Poll<(), IoError>; + /// This drains any write buffers of substreams and delivers any pending shutdown notifications + /// due to `shutdown_substream` or `close`. One may thus shutdown groups of substreams + /// followed by a final `flush_all` instead of having to do `flush_substream` for each. + fn flush_all(&self) -> Poll<(), io::Error>; } /// Polls for an inbound from the muxer but wraps the output in an object that @@ -175,14 +201,14 @@ pub trait StreamMuxer { #[inline] pub fn inbound_from_ref_and_wrap

( muxer: P, -) -> impl Future>, Error = IoError> +) -> impl Future, Error = io::Error> where P: Deref + Clone, P::Target: StreamMuxer, { let muxer2 = muxer.clone(); future::poll_fn(move || muxer.poll_inbound()) - .map(|substream| substream.map(move |s| substream_from_ref(muxer2, s))) + .map(|substream| substream_from_ref(muxer2, substream)) } /// Same as `outbound_from_ref`, but wraps the output in an object that @@ -211,16 +237,15 @@ where P: Deref + Clone, P::Target: StreamMuxer, { - type Item = Option>; - type Error = IoError; + type Item = SubstreamRef

; + type Error = io::Error; fn poll(&mut self) -> Poll { match self.inner.poll() { - Ok(Async::Ready(Some(substream))) => { + Ok(Async::Ready(substream)) => { let out = substream_from_ref(self.inner.muxer.clone(), substream); - Ok(Async::Ready(Some(out))) + Ok(Async::Ready(out)) } - Ok(Async::Ready(None)) => Ok(Async::Ready(None)), Ok(Async::NotReady) => Ok(Async::NotReady), Err(err) => Err(err), } @@ -256,8 +281,8 @@ where P: Deref, P::Target: StreamMuxer, { - type Item = Option<::Substream>; - type Error = IoError; + type Item = ::Substream; + type Error = io::Error; #[inline] fn poll(&mut self) -> Poll { @@ -323,11 +348,11 @@ where P::Target: StreamMuxer, { #[inline] - fn read(&mut self, buf: &mut [u8]) -> Result { + fn read(&mut self, buf: &mut [u8]) -> Result { let s = self.substream.as_mut().expect("substream was empty"); match self.muxer.read_substream(s, buf)? { Async::Ready(n) => Ok(n), - Async::NotReady => Err(IoErrorKind::WouldBlock.into()) + Async::NotReady => Err(io::ErrorKind::WouldBlock.into()) } } } @@ -338,7 +363,7 @@ where P::Target: StreamMuxer, { #[inline] - fn poll_read(&mut self, buf: &mut [u8]) -> Poll { + fn poll_read(&mut self, buf: &mut [u8]) -> Poll { let s = self.substream.as_mut().expect("substream was empty"); self.muxer.read_substream(s, buf) } @@ -350,20 +375,20 @@ where P::Target: StreamMuxer, { #[inline] - fn write(&mut self, buf: &[u8]) -> Result { + fn write(&mut self, buf: &[u8]) -> Result { let s = self.substream.as_mut().expect("substream was empty"); match self.muxer.write_substream(s, buf)? { Async::Ready(n) => Ok(n), - Async::NotReady => Err(IoErrorKind::WouldBlock.into()) + Async::NotReady => Err(io::ErrorKind::WouldBlock.into()) } } #[inline] - fn flush(&mut self) -> Result<(), IoError> { + fn flush(&mut self) -> Result<(), io::Error> { let s = self.substream.as_mut().expect("substream was empty"); match self.muxer.flush_substream(s)? { Async::Ready(()) => Ok(()), - Async::NotReady => Err(IoErrorKind::WouldBlock.into()) + Async::NotReady => Err(io::ErrorKind::WouldBlock.into()) } } } @@ -374,20 +399,20 @@ where P::Target: StreamMuxer, { #[inline] - fn poll_write(&mut self, buf: &[u8]) -> Poll { + fn poll_write(&mut self, buf: &[u8]) -> Poll { let s = self.substream.as_mut().expect("substream was empty"); self.muxer.write_substream(s, buf) } #[inline] - fn shutdown(&mut self) -> Poll<(), IoError> { + fn shutdown(&mut self) -> Poll<(), io::Error> { let s = self.substream.as_mut().expect("substream was empty"); - self.muxer.shutdown_substream(s, Shutdown::All)?; + self.muxer.shutdown_substream(s)?; Ok(Async::Ready(())) } #[inline] - fn poll_flush(&mut self) -> Poll<(), IoError> { + fn poll_flush(&mut self) -> Poll<(), io::Error> { let s = self.substream.as_mut().expect("substream was empty"); self.muxer.flush_substream(s) } @@ -436,7 +461,7 @@ impl StreamMuxer for StreamMuxerBox { type OutboundSubstream = usize; // TODO: use a newtype #[inline] - fn poll_inbound(&self) -> Poll, IoError> { + fn poll_inbound(&self) -> Poll { self.inner.poll_inbound() } @@ -446,7 +471,7 @@ impl StreamMuxer for StreamMuxerBox { } #[inline] - fn poll_outbound(&self, s: &mut Self::OutboundSubstream) -> Poll, IoError> { + fn poll_outbound(&self, s: &mut Self::OutboundSubstream) -> Poll { self.inner.poll_outbound(s) } @@ -456,23 +481,23 @@ impl StreamMuxer for StreamMuxerBox { } #[inline] - fn read_substream(&self, s: &mut Self::Substream, buf: &mut [u8]) -> Poll { + fn read_substream(&self, s: &mut Self::Substream, buf: &mut [u8]) -> Poll { self.inner.read_substream(s, buf) } #[inline] - fn write_substream(&self, s: &mut Self::Substream, buf: &[u8]) -> Poll { + fn write_substream(&self, s: &mut Self::Substream, buf: &[u8]) -> Poll { self.inner.write_substream(s, buf) } #[inline] - fn flush_substream(&self, s: &mut Self::Substream) -> Poll<(), IoError> { + fn flush_substream(&self, s: &mut Self::Substream) -> Poll<(), io::Error> { self.inner.flush_substream(s) } #[inline] - fn shutdown_substream(&self, s: &mut Self::Substream, kind: Shutdown) -> Poll<(), IoError> { - self.inner.shutdown_substream(s, kind) + fn shutdown_substream(&self, s: &mut Self::Substream) -> Poll<(), io::Error> { + self.inner.shutdown_substream(s) } #[inline] @@ -481,8 +506,8 @@ impl StreamMuxer for StreamMuxerBox { } #[inline] - fn shutdown(&self, kind: Shutdown) -> Poll<(), IoError> { - self.inner.shutdown(kind) + fn close(&self) -> Poll<(), io::Error> { + self.inner.close() } #[inline] @@ -491,7 +516,7 @@ impl StreamMuxer for StreamMuxerBox { } #[inline] - fn flush_all(&self) -> Poll<(), IoError> { + fn flush_all(&self) -> Poll<(), io::Error> { self.inner.flush_all() } } @@ -509,15 +534,11 @@ impl StreamMuxer for Wrap where T: StreamMuxer { type OutboundSubstream = usize; // TODO: use a newtype #[inline] - fn poll_inbound(&self) -> Poll, IoError> { - match try_ready!(self.inner.poll_inbound()) { - Some(substream) => { - let id = self.next_substream.fetch_add(1, Ordering::Relaxed); - self.substreams.lock().insert(id, substream); - Ok(Async::Ready(Some(id))) - }, - None => Ok(Async::Ready(None)), - } + fn poll_inbound(&self) -> Poll { + let substream = try_ready!(self.inner.poll_inbound()); + let id = self.next_substream.fetch_add(1, Ordering::Relaxed); + self.substreams.lock().insert(id, substream); + Ok(Async::Ready(id)) } #[inline] @@ -532,16 +553,12 @@ impl StreamMuxer for Wrap where T: StreamMuxer { fn poll_outbound( &self, substream: &mut Self::OutboundSubstream, - ) -> Poll, IoError> { + ) -> Poll { let mut list = self.outbound.lock(); - match try_ready!(self.inner.poll_outbound(list.get_mut(substream).unwrap())) { - Some(substream) => { - let id = self.next_substream.fetch_add(1, Ordering::Relaxed); - self.substreams.lock().insert(id, substream); - Ok(Async::Ready(Some(id))) - }, - None => Ok(Async::Ready(None)), - } + let substream = try_ready!(self.inner.poll_outbound(list.get_mut(substream).unwrap())); + let id = self.next_substream.fetch_add(1, Ordering::Relaxed); + self.substreams.lock().insert(id, substream); + Ok(Async::Ready(id)) } #[inline] @@ -551,27 +568,27 @@ impl StreamMuxer for Wrap where T: StreamMuxer { } #[inline] - fn read_substream(&self, s: &mut Self::Substream, buf: &mut [u8]) -> Poll { + fn read_substream(&self, s: &mut Self::Substream, buf: &mut [u8]) -> Poll { let mut list = self.substreams.lock(); self.inner.read_substream(list.get_mut(s).unwrap(), buf) } #[inline] - fn write_substream(&self, s: &mut Self::Substream, buf: &[u8]) -> Poll { + fn write_substream(&self, s: &mut Self::Substream, buf: &[u8]) -> Poll { let mut list = self.substreams.lock(); self.inner.write_substream(list.get_mut(s).unwrap(), buf) } #[inline] - fn flush_substream(&self, s: &mut Self::Substream) -> Poll<(), IoError> { + fn flush_substream(&self, s: &mut Self::Substream) -> Poll<(), io::Error> { let mut list = self.substreams.lock(); self.inner.flush_substream(list.get_mut(s).unwrap()) } #[inline] - fn shutdown_substream(&self, s: &mut Self::Substream, kind: Shutdown) -> Poll<(), IoError> { + fn shutdown_substream(&self, s: &mut Self::Substream) -> Poll<(), io::Error> { let mut list = self.substreams.lock(); - self.inner.shutdown_substream(list.get_mut(s).unwrap(), kind) + self.inner.shutdown_substream(list.get_mut(s).unwrap()) } #[inline] @@ -581,8 +598,8 @@ impl StreamMuxer for Wrap where T: StreamMuxer { } #[inline] - fn shutdown(&self, kind: Shutdown) -> Poll<(), IoError> { - self.inner.shutdown(kind) + fn close(&self) -> Poll<(), io::Error> { + self.inner.close() } #[inline] @@ -591,7 +608,7 @@ impl StreamMuxer for Wrap where T: StreamMuxer { } #[inline] - fn flush_all(&self) -> Poll<(), IoError> { + fn flush_all(&self) -> Poll<(), io::Error> { self.inner.flush_all() } } diff --git a/core/src/nodes/collection.rs b/core/src/nodes/collection.rs index 7e323788..2ba0f6ae 100644 --- a/core/src/nodes/collection.rs +++ b/core/src/nodes/collection.rs @@ -72,21 +72,10 @@ pub enum CollectionEvent<'a, TInEvent:'a , TOutEvent: 'a, THandler: 'a, TReachEr /// the connection. NodeReached(CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TPeerId>), - /// A connection to a node has been closed. - /// - /// This happens once both the inbound and outbound channels are closed, and no more outbound - /// substream attempt is pending. - NodeClosed { - /// Identifier of the node. - peer_id: TPeerId, - /// User data that was passed when accepting. - user_data: TUserData, - }, - /// A connection to a node has errored. /// /// Can only happen after a node has been successfully reached. - NodeError { + NodeClosed { /// Identifier of the node. peer_id: TPeerId, /// The error that happened. @@ -129,16 +118,10 @@ where TOutEvent: fmt::Debug, .field(inner) .finish() }, - CollectionEvent::NodeClosed { ref peer_id, ref user_data } => { + CollectionEvent::NodeClosed { ref peer_id, ref error, ref user_data } => { f.debug_struct("CollectionEvent::NodeClosed") .field("peer_id", peer_id) .field("user_data", user_data) - .finish() - }, - CollectionEvent::NodeError { ref peer_id, ref error, ref user_data } => { - f.debug_struct("CollectionEvent::NodeError") - .field("peer_id", peer_id) - .field("user_data", user_data) .field("error", error) .finish() }, @@ -395,48 +378,34 @@ where let user_data = task.into_user_data(); match (user_data, result, handler) { - (TaskState::Pending, Err(TaskClosedEvent::Reach(err)), Some(handler)) => { + (TaskState::Pending, TaskClosedEvent::Reach(err), Some(handler)) => { Async::Ready(CollectionEvent::ReachError { id: ReachAttemptId(id), error: err, handler, }) }, - (TaskState::Pending, Ok(()), _) => { - panic!("The API of HandledNodesTasks guarantees that a task cannot \ - gracefully closed before being connected to a node, in which case \ - its state should be Connected and not Pending; QED"); - }, - (TaskState::Pending, Err(TaskClosedEvent::Node(_)), _) => { + (TaskState::Pending, TaskClosedEvent::Node(_), _) => { panic!("We switch the task state to Connected once we're connected, and \ a TaskClosedEvent::Node can only happen after we're \ connected; QED"); }, - (TaskState::Pending, Err(TaskClosedEvent::Reach(_)), None) => { + (TaskState::Pending, TaskClosedEvent::Reach(_), None) => { // TODO: this could be improved in the API of HandledNodesTasks panic!("The HandledNodesTasks is guaranteed to always return the handler \ when producing a TaskClosedEvent::Reach error"); }, - (TaskState::Connected(peer_id, user_data), Ok(()), _handler) => { + (TaskState::Connected(peer_id, user_data), TaskClosedEvent::Node(err), _handler) => { debug_assert!(_handler.is_none()); let _node_task_id = self.nodes.remove(&peer_id); debug_assert_eq!(_node_task_id, Some(id)); Async::Ready(CollectionEvent::NodeClosed { - peer_id, - user_data, - }) - }, - (TaskState::Connected(peer_id, user_data), Err(TaskClosedEvent::Node(err)), _handler) => { - debug_assert!(_handler.is_none()); - let _node_task_id = self.nodes.remove(&peer_id); - debug_assert_eq!(_node_task_id, Some(id)); - Async::Ready(CollectionEvent::NodeError { peer_id, error: err, user_data, }) }, - (TaskState::Connected(_, _), Err(TaskClosedEvent::Reach(_)), _) => { + (TaskState::Connected(_, _), TaskClosedEvent::Reach(_), _) => { panic!("A TaskClosedEvent::Reach can only happen before we are connected \ to a node; therefore the TaskState won't be Connected; QED"); }, diff --git a/core/src/nodes/collection/tests.rs b/core/src/nodes/collection/tests.rs index 58125047..2b10ebec 100644 --- a/core/src/nodes/collection/tests.rs +++ b/core/src/nodes/collection/tests.rs @@ -129,12 +129,12 @@ fn events_in_a_node_reaches_the_collection_stream() { let task_peer_id = PeerId::random(); let mut handler = Handler::default(); - handler.state = Some(HandlerState::Ready(Some(NodeHandlerEvent::Custom(OutEvent::Custom("init"))))); + handler.state = Some(HandlerState::Ready(NodeHandlerEvent::Custom(OutEvent::Custom("init")))); let handler_states = vec![ HandlerState::Err, - HandlerState::Ready(Some(NodeHandlerEvent::Custom(OutEvent::Custom("from handler 3") ))), - HandlerState::Ready(Some(NodeHandlerEvent::Custom(OutEvent::Custom("from handler 2") ))), - HandlerState::Ready(Some(NodeHandlerEvent::Custom(OutEvent::Custom("from handler 1") ))), + HandlerState::Ready(NodeHandlerEvent::Custom(OutEvent::Custom("from handler 3") )), + HandlerState::Ready(NodeHandlerEvent::Custom(OutEvent::Custom("from handler 2") )), + HandlerState::Ready(NodeHandlerEvent::Custom(OutEvent::Custom("from handler 1") )), ]; handler.next_states = handler_states; @@ -265,55 +265,7 @@ fn task_closed_with_error_when_task_is_connected_yields_node_error() { rt.block_on(future::poll_fn(move || -> Poll<_, ()> { let mut cs = cs_fut.lock(); assert_matches!(cs.poll(), Async::Ready(collection_ev) => { - assert_matches!(collection_ev, CollectionEvent::NodeError{..}); - }); - Ok(Async::Ready(())) - })).expect("tokio works"); -} - -#[test] -fn task_closed_ok_when_task_is_connected_yields_node_closed() { - let cs = Arc::new(Mutex::new(TestCollectionStream::new())); - let peer_id = PeerId::random(); - let muxer = DummyMuxer::new(); - let task_inner_fut = future::ok((peer_id.clone(), muxer)); - let mut handler = Handler::default(); - handler.next_states = vec![HandlerState::Ready(None)]; // triggered when sending a NextState event - - cs.lock().add_reach_attempt(task_inner_fut, handler); - let mut rt = Builder::new().core_threads(1).build().unwrap(); - - // Kick it off - let cs_fut = cs.clone(); - rt.block_on(future::poll_fn(move || -> Poll<_, ()> { - let mut cs = cs_fut.lock(); - assert_matches!(cs.poll(), Async::NotReady); - // send an event so the Handler errors in two polls - cs.broadcast_event(&InEvent::NextState); - Ok(Async::Ready(())) - })).expect("tokio works"); - - // Accept the new node - let cs_fut = cs.clone(); - rt.block_on(future::poll_fn(move || -> Poll<_, ()> { - let mut cs = cs_fut.lock(); - // NodeReached, accept the connection so the task transitions from Pending to Connected - assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeReached(reach_ev)) => { - reach_ev.accept(()); - }); - Ok(Async::Ready(())) - })).expect("tokio works"); - - assert!(cs.lock().has_connection(&peer_id)); - - // Next poll, the Handler returns Async::Ready(None) because of the - // NextState message sent before. - let cs_fut = cs.clone(); - rt.block_on(future::poll_fn(move || -> Poll<_, ()> { - let mut cs = cs_fut.lock(); - // Node is closed normally: TaskClosed, Ok(()) - assert_matches!(cs.poll(), Async::Ready(CollectionEvent::NodeClosed{ peer_id: peer_id_in_event, .. }) => { - assert_eq!(peer_id_in_event, peer_id); + assert_matches!(collection_ev, CollectionEvent::NodeClosed{..}); }); Ok(Async::Ready(())) })).expect("tokio works"); diff --git a/core/src/nodes/handled_node.rs b/core/src/nodes/handled_node.rs index c231d4f8..a78e0659 100644 --- a/core/src/nodes/handled_node.rs +++ b/core/src/nodes/handled_node.rs @@ -19,8 +19,8 @@ // DEALINGS IN THE SOFTWARE. use crate::muxing::StreamMuxer; -use crate::nodes::node::{NodeEvent, NodeStream, Substream}; -use futures::{prelude::*, stream::Fuse}; +use crate::nodes::node::{NodeEvent, NodeStream, Substream, Close}; +use futures::prelude::*; use std::{error, fmt, io}; mod tests; @@ -53,31 +53,12 @@ pub trait NodeHandler { /// multiple times. fn inject_substream(&mut self, substream: Self::Substream, endpoint: NodeHandlerEndpoint); - /// Indicates to the handler that the inbound part of the muxer has been closed, and that - /// therefore no more inbound substream will be produced. - fn inject_inbound_closed(&mut self); - - /// Indicates to the handler that an outbound substream failed to open because the outbound - /// part of the muxer has been closed. - /// - /// # Panic - /// - /// Implementations are allowed to panic if `user_data` doesn't correspond to what was returned - /// earlier when polling, or is used multiple times. - fn inject_outbound_closed(&mut self, user_data: Self::OutboundOpenInfo); - /// Injects an event coming from the outside into the handler. fn inject_event(&mut self, event: Self::InEvent); - /// Indicates to the node that it should shut down. After that, it is expected that `poll()` - /// returns `Ready(NodeHandlerEvent::Shutdown)` as soon as possible. + /// Should behave like `Stream::poll()`. /// - /// This method allows an implementation to perform a graceful shutdown of the substreams, and - /// send back various events. - fn shutdown(&mut self); - - /// Should behave like `Stream::poll()`. Should close if no more event can be produced and the - /// node should be closed. + /// Returning an error will close the connection to the remote. fn poll(&mut self) -> Poll, Self::Error>; } @@ -114,9 +95,6 @@ pub enum NodeHandlerEvent { /// Require a new outbound substream to be opened with the remote. OutboundSubstreamRequest(TOutboundOpenInfo), - /// Gracefully shut down the connection to the node. - Shutdown, - /// Other event. Custom(TCustom), } @@ -132,7 +110,6 @@ impl NodeHandlerEvent { NodeHandlerEvent::OutboundSubstreamRequest(val) => { NodeHandlerEvent::OutboundSubstreamRequest(map(val)) }, - NodeHandlerEvent::Shutdown => NodeHandlerEvent::Shutdown, NodeHandlerEvent::Custom(val) => NodeHandlerEvent::Custom(val), } } @@ -146,7 +123,6 @@ impl NodeHandlerEvent { NodeHandlerEvent::OutboundSubstreamRequest(val) => { NodeHandlerEvent::OutboundSubstreamRequest(val) }, - NodeHandlerEvent::Shutdown => NodeHandlerEvent::Shutdown, NodeHandlerEvent::Custom(val) => NodeHandlerEvent::Custom(map(val)), } } @@ -159,13 +135,9 @@ where THandler: NodeHandler>, { /// Node that handles the muxing. - node: Fuse>, + node: NodeStream, /// Handler that processes substreams. handler: THandler, - /// If true, `handler` has returned `Ready(None)` and therefore shouldn't be polled again. - handler_is_done: bool, - // True, if the node is shutting down. - is_shutting_down: bool } impl fmt::Debug for HandledNode @@ -177,8 +149,6 @@ where f.debug_struct("HandledNode") .field("node", &self.node) .field("handler", &self.handler) - .field("handler_is_done", &self.handler_is_done) - .field("is_shutting_down", &self.is_shutting_down) .finish() } } @@ -192,157 +162,71 @@ where #[inline] pub fn new(muxer: TMuxer, handler: THandler) -> Self { HandledNode { - node: NodeStream::new(muxer).fuse(), + node: NodeStream::new(muxer), handler, - handler_is_done: false, - is_shutting_down: false } } /// Returns a reference to the `NodeHandler` - pub fn handler(&self) -> &THandler{ + pub fn handler(&self) -> &THandler { &self.handler } /// Returns a mutable reference to the `NodeHandler` - pub fn handler_mut(&mut self) -> &mut THandler{ + pub fn handler_mut(&mut self) -> &mut THandler { &mut self.handler } - /// Injects an event to the handler. Has no effect if the handler has already shut down, - /// either by itself or after `shutdown()` has been called. + /// Injects an event to the handler. Has no effect if the handler is closing. #[inline] pub fn inject_event(&mut self, event: THandler::InEvent) { - if !self.handler_is_done { - self.handler.inject_event(event); - } + self.handler.inject_event(event); } /// Returns `true` if the remote has shown any sign of activity after the muxer has been open. /// /// See `StreamMuxer::is_remote_acknowledged`. pub fn is_remote_acknowledged(&self) -> bool { - self.node.get_ref().is_remote_acknowledged() - } - - /// Returns true if the inbound channel of the muxer is open. - /// - /// If `true` is returned, more inbound substream will be received. - #[inline] - pub fn is_inbound_open(&self) -> bool { - self.node.get_ref().is_inbound_open() - } - - /// Returns true if the outbound channel of the muxer is open. - /// - /// If `true` is returned, more outbound substream will be opened. - #[inline] - pub fn is_outbound_open(&self) -> bool { - self.node.get_ref().is_outbound_open() - } - - /// Returns true if the handled node is in the process of shutting down. - #[inline] - pub fn is_shutting_down(&self) -> bool { - self.is_shutting_down + self.node.is_remote_acknowledged() } /// Indicates to the handled node that it should shut down. After calling this method, the /// `Stream` will end in the not-so-distant future. - /// - /// After this method returns, `is_shutting_down()` should return true. - pub fn shutdown(&mut self) { - self.node.get_mut().shutdown_all(); - for user_data in self.node.get_mut().cancel_outgoing() { - self.handler.inject_outbound_closed(user_data); - } - if !self.handler_is_done { - self.handler.shutdown(); - } - self.is_shutting_down = true; + pub fn close(self) -> Close { + self.node.close().0 } -} -impl Stream for HandledNode -where - TMuxer: StreamMuxer, - THandler: NodeHandler>, -{ - type Item = THandler::OutEvent; - type Error = HandledNodeError; - - fn poll(&mut self) -> Poll, Self::Error> { + /// API similar to `Future::poll` that polls the node for events. + pub fn poll(&mut self) -> Poll> { loop { - if self.node.is_done() && self.handler_is_done { - return Ok(Async::Ready(None)); - } - let mut node_not_ready = false; match self.node.poll().map_err(HandledNodeError::Node)? { Async::NotReady => node_not_ready = true, - Async::Ready(Some(NodeEvent::InboundSubstream { substream })) => { - if !self.handler_is_done { - self.handler.inject_substream(substream, NodeHandlerEndpoint::Listener) - } + Async::Ready(NodeEvent::InboundSubstream { substream }) => { + self.handler.inject_substream(substream, NodeHandlerEndpoint::Listener) } - Async::Ready(Some(NodeEvent::OutboundSubstream { user_data, substream })) => { + Async::Ready(NodeEvent::OutboundSubstream { user_data, substream }) => { let endpoint = NodeHandlerEndpoint::Dialer(user_data); - if !self.handler_is_done { - self.handler.inject_substream(substream, endpoint) - } - } - Async::Ready(None) => { - if !self.is_shutting_down { - self.is_shutting_down = true; - if !self.handler_is_done { - self.handler.shutdown() - } - } - } - Async::Ready(Some(NodeEvent::OutboundClosed { user_data })) => { - if !self.handler_is_done { - self.handler.inject_outbound_closed(user_data) - } - } - Async::Ready(Some(NodeEvent::InboundClosed)) => { - if !self.handler_is_done { - self.handler.inject_inbound_closed() - } + self.handler.inject_substream(substream, endpoint) } } - match if self.handler_is_done { Async::Ready(NodeHandlerEvent::Shutdown) } else { self.handler.poll().map_err(HandledNodeError::Handler)? } { + match self.handler.poll().map_err(HandledNodeError::Handler)? { Async::NotReady => { if node_not_ready { break } } Async::Ready(NodeHandlerEvent::OutboundSubstreamRequest(user_data)) => { - if self.node.get_ref().is_outbound_open() { - match self.node.get_mut().open_substream(user_data) { - Ok(()) => (), - Err(user_data) => { - self.handler.inject_outbound_closed(user_data) - }, - } - } else { - self.handler.inject_outbound_closed(user_data); - } + self.node.open_substream(user_data); } Async::Ready(NodeHandlerEvent::Custom(event)) => { - return Ok(Async::Ready(Some(event))); - } - Async::Ready(NodeHandlerEvent::Shutdown) => { - self.handler_is_done = true; - if !self.is_shutting_down { - self.is_shutting_down = true; - self.node.get_mut().cancel_outgoing(); - self.node.get_mut().shutdown_all(); - } + return Ok(Async::Ready(event)); } } } + Ok(Async::NotReady) } } diff --git a/core/src/nodes/handled_node/tests.rs b/core/src/nodes/handled_node/tests.rs index 337b71ff..ee138c2e 100644 --- a/core/src/nodes/handled_node/tests.rs +++ b/core/src/nodes/handled_node/tests.rs @@ -22,10 +22,8 @@ use super::*; use assert_matches::assert_matches; -use tokio::runtime::current_thread; use crate::tests::dummy_muxer::{DummyMuxer, DummyConnectionState}; use crate::tests::dummy_handler::{Handler, HandlerState, InEvent, OutEvent, TestHandledNode}; -use std::{io, marker::PhantomData}; struct TestBuilder { muxer: DummyMuxer, @@ -68,7 +66,7 @@ impl TestBuilder { fn handled_node(&mut self) -> TestHandledNode { let mut h = HandledNode::new(self.muxer.clone(), self.handler.clone()); if self.want_open_substream { - h.node.get_mut().open_substream(self.substream_user_data).expect("open substream should work"); + h.node.open_substream(self.substream_user_data); } h } @@ -79,75 +77,9 @@ fn set_next_handler_outbound_state( handled_node: &mut TestHandledNode, next_sta handled_node.handler.next_outbound_state = Some(next_state); } -#[test] -fn proper_shutdown() { - struct ShutdownHandler { - did_substream_attempt: bool, - inbound_closed: bool, - substream_attempt_cancelled: bool, - shutdown_called: bool, - marker: PhantomData - } - impl NodeHandler for ShutdownHandler { - type InEvent = (); - type OutEvent = (); - type Substream = T; - type Error = io::Error; - type OutboundOpenInfo = (); - fn inject_substream(&mut self, _: Self::Substream, _: NodeHandlerEndpoint) { panic!() } - fn inject_inbound_closed(&mut self) { - assert!(!self.inbound_closed); - self.inbound_closed = true; - } - fn inject_outbound_closed(&mut self, _: ()) { - assert!(!self.substream_attempt_cancelled); - self.substream_attempt_cancelled = true; - } - fn inject_event(&mut self, _: Self::InEvent) { panic!() } - fn shutdown(&mut self) { - assert!(self.inbound_closed); - assert!(self.substream_attempt_cancelled); - self.shutdown_called = true; - } - fn poll(&mut self) -> Poll, io::Error> { - if self.shutdown_called { - Ok(Async::Ready(NodeHandlerEvent::Shutdown)) - } else if !self.did_substream_attempt { - self.did_substream_attempt = true; - Ok(Async::Ready(NodeHandlerEvent::OutboundSubstreamRequest(()))) - } else { - Ok(Async::NotReady) - } - } - } - - impl Drop for ShutdownHandler { - fn drop(&mut self) { - if self.did_substream_attempt { - assert!(self.shutdown_called); - } - } - } - - // Test that `shutdown()` is properly called on the handler once a node stops. - let mut muxer = DummyMuxer::new(); - muxer.set_inbound_connection_state(DummyConnectionState::Closed); - muxer.set_outbound_connection_state(DummyConnectionState::Closed); - let handled = HandledNode::new(muxer, ShutdownHandler { - did_substream_attempt: false, - inbound_closed: false, - substream_attempt_cancelled: false, - shutdown_called: false, - marker: PhantomData, - }); - - current_thread::Runtime::new().unwrap().block_on(handled.for_each(|_| Ok(()))).unwrap(); -} - #[test] fn can_inject_event() { let mut handled = TestBuilder::new() - .with_muxer_inbound_state(DummyConnectionState::Closed) .handled_node(); let event = InEvent::Custom("banana"); @@ -155,97 +87,9 @@ fn can_inject_event() { assert_eq!(handled.handler().events, vec![event]); } -#[test] -fn knows_if_inbound_is_closed() { - let mut handled = TestBuilder::new() - .with_muxer_inbound_state(DummyConnectionState::Closed) - .with_handler_state(HandlerState::Ready(None)) // or we get into an infinite loop - .handled_node(); - handled.poll().expect("poll failed"); - assert!(!handled.is_inbound_open()) -} - -#[test] -fn knows_if_outbound_is_closed() { - let mut handled = TestBuilder::new() - .with_muxer_inbound_state(DummyConnectionState::Pending) - .with_muxer_outbound_state(DummyConnectionState::Closed) - .with_handler_state(HandlerState::Ready(None)) // or we get into an infinite loop - .with_open_substream(987) // without at least one substream we do not poll_outbound so we never get the event - .handled_node(); - - handled.poll().expect("poll failed"); - assert!(!handled.is_outbound_open()); -} - -#[test] -fn is_shutting_down_is_true_when_called_shutdown_on_the_handled_node() { - let mut handled = TestBuilder::new() - .with_handler_state(HandlerState::Ready(None)) // Stop the loop towards the end of the first run - .handled_node(); - assert!(!handled.is_shutting_down()); - handled.poll().expect("poll should work"); - handled.shutdown(); - assert!(handled.is_shutting_down()); -} - -#[test] -fn is_shutting_down_is_true_when_in_and_outbounds_are_closed() { - let mut handled = TestBuilder::new() - .with_muxer_inbound_state(DummyConnectionState::Closed) - .with_muxer_outbound_state(DummyConnectionState::Closed) - .with_open_substream(123) // avoid infinite loop - .handled_node(); - - handled.poll().expect("poll should work"); - - // Shutting down (in- and outbound are closed, and the handler is shutdown) - assert!(handled.is_shutting_down()); -} - -#[test] -fn is_shutting_down_is_true_when_handler_is_gone() { - // when in-/outbound NodeStreams are open or Async::Ready(None) we reach the handlers `poll()` and initiate shutdown. - let mut handled = TestBuilder::new() - .with_muxer_inbound_state(DummyConnectionState::Pending) - .with_muxer_outbound_state(DummyConnectionState::Pending) - .with_handler_state(HandlerState::Ready(None)) // avoid infinite loop - .handled_node(); - - handled.poll().expect("poll should work"); - - assert!(handled.is_shutting_down()); -} - -#[test] -fn is_shutting_down_is_true_when_handler_is_gone_even_if_in_and_outbounds_are_open() { - let mut handled = TestBuilder::new() - .with_muxer_inbound_state(DummyConnectionState::Opened) - .with_muxer_outbound_state(DummyConnectionState::Opened) - .with_open_substream(123) - .with_handler_state(HandlerState::Ready(None)) - .handled_node(); - - handled.poll().expect("poll should work"); - - assert!(handled.is_shutting_down()); -} - -#[test] -fn poll_with_unready_node_stream_polls_handler() { - let mut handled = TestBuilder::new() - // make NodeStream return NotReady - .with_muxer_inbound_state(DummyConnectionState::Pending) - // make Handler return return Ready(None) so we break the infinite loop - .with_handler_state(HandlerState::Ready(None)) - .handled_node(); - - assert_matches!(handled.poll(), Ok(Async::Ready(None))); -} - #[test] fn poll_with_unready_node_stream_and_handler_emits_custom_event() { - let expected_event = Some(NodeHandlerEvent::Custom(OutEvent::Custom("pineapple"))); + let expected_event = NodeHandlerEvent::Custom(OutEvent::Custom("pineapple")); let mut handled = TestBuilder::new() // make NodeStream return NotReady .with_muxer_inbound_state(DummyConnectionState::Pending) @@ -253,77 +97,36 @@ fn poll_with_unready_node_stream_and_handler_emits_custom_event() { .with_handler_state(HandlerState::Ready(expected_event)) .handled_node(); - assert_matches!(handled.poll(), Ok(Async::Ready(Some(event))) => { + assert_matches!(handled.poll(), Ok(Async::Ready(event)) => { assert_matches!(event, OutEvent::Custom("pineapple")) }); } #[test] fn handler_emits_outbound_closed_when_opening_new_substream_on_closed_node() { - let open_event = Some(NodeHandlerEvent::OutboundSubstreamRequest(456)); + let open_event = NodeHandlerEvent::OutboundSubstreamRequest(456); let mut handled = TestBuilder::new() .with_muxer_inbound_state(DummyConnectionState::Pending) - .with_muxer_outbound_state(DummyConnectionState::Closed) + .with_muxer_outbound_state(DummyConnectionState::Pending) .with_handler_state(HandlerState::Ready(open_event)) .handled_node(); set_next_handler_outbound_state( &mut handled, - HandlerState::Ready(Some(NodeHandlerEvent::Custom(OutEvent::Custom("pear")))) + HandlerState::Ready(NodeHandlerEvent::Custom(OutEvent::Custom("pear"))) ); handled.poll().expect("poll works"); - assert_eq!(handled.handler().events, vec![InEvent::OutboundClosed]); -} - -#[test] -fn poll_returns_not_ready_when_node_stream_and_handler_is_not_ready() { - let mut handled = TestBuilder::new() - .with_muxer_inbound_state(DummyConnectionState::Closed) - .with_muxer_outbound_state(DummyConnectionState::Closed) - .with_open_substream(12) - .with_handler_state(HandlerState::NotReady) - .handled_node(); - - // Under the hood, this is what happens when calling `poll()`: - // - we reach `node.poll_inbound()` and because the connection is - // closed, `inbound_finished` is set to true. - // - an Async::Ready(NodeEvent::InboundClosed) is yielded (also calls - // `inject_inbound_close`, but that's irrelevant here) - // - back in `poll()` we call `handler.poll()` which does nothing because - // `HandlerState` is `NotReady`: loop continues - // - polls the node again which now skips the inbound block because - // `inbound_finished` is true. - // - Now `poll_outbound()` is called which returns `Async::Ready(None)` - // and sets `outbound_finished` to true. …calls destroy_outbound and - // yields Ready(OutboundClosed) …so the HandledNode calls - // `inject_outbound_closed`. - // - Now we have `inbound_finished` and `outbound_finished` set (and no - // more outbound substreams). - // - Next we poll the handler again which again does nothing because - // HandlerState is NotReady (and the node is still there) - // - HandledNode polls the node again: we skip inbound and there are no - // more outbound substreams so we skip that too; the addr is now - // Resolved so that part is skipped too - // - We reach the last section and the NodeStream yields Async::Ready(None) - // - Back in HandledNode the Async::Ready(None) triggers a shutdown - // – …and causes the Handler to yield Async::Ready(None) - // – which in turn makes the HandledNode to yield Async::Ready(None) as well - assert_matches!(handled.poll(), Ok(Async::Ready(None))); - assert_eq!(handled.handler().events, vec![ - InEvent::InboundClosed, InEvent::OutboundClosed - ]); } #[test] fn poll_yields_inbound_closed_event() { let mut h = TestBuilder::new() - .with_muxer_inbound_state(DummyConnectionState::Closed) + .with_muxer_inbound_state(DummyConnectionState::Pending) .with_handler_state(HandlerState::Err) // stop the loop .handled_node(); assert_eq!(h.handler().events, vec![]); let _ = h.poll(); - assert_eq!(h.handler().events, vec![InEvent::InboundClosed]); } #[test] @@ -331,13 +134,12 @@ fn poll_yields_outbound_closed_event() { let mut h = TestBuilder::new() .with_muxer_inbound_state(DummyConnectionState::Pending) .with_open_substream(32) - .with_muxer_outbound_state(DummyConnectionState::Closed) + .with_muxer_outbound_state(DummyConnectionState::Pending) .with_handler_state(HandlerState::Err) // stop the loop .handled_node(); assert_eq!(h.handler().events, vec![]); let _ = h.poll(); - assert_eq!(h.handler().events, vec![InEvent::OutboundClosed]); } #[test] diff --git a/core/src/nodes/handled_node_tasks.rs b/core/src/nodes/handled_node_tasks.rs index 76c4780c..367d5255 100644 --- a/core/src/nodes/handled_node_tasks.rs +++ b/core/src/nodes/handled_node_tasks.rs @@ -23,7 +23,7 @@ use crate::{ muxing::StreamMuxer, nodes::{ handled_node::{HandledNode, HandledNodeError, NodeHandler}, - node::Substream + node::{Close, Substream} } }; use fnv::FnvHashMap; @@ -158,7 +158,7 @@ pub enum HandledNodesEvent<'a, TInEvent, TOutEvent, TIntoHandler, TReachErr, THa /// The task that has been closed. task: ClosedTask, /// What happened. - result: Result<(), TaskClosedEvent>, + result: TaskClosedEvent, /// If the task closed before reaching the node, this contains the handler that was passed /// to `add_reach_attempt`. handler: Option, @@ -494,7 +494,7 @@ enum InToExtMessage { /// A connection to a node has succeeded. NodeReached(TPeerId), /// The task closed. - TaskClosed(Result<(), TaskClosedEvent>, Option), + TaskClosed(TaskClosedEvent, Option), /// An event from the node. NodeEvent(TOutEvent), } @@ -540,6 +540,9 @@ where /// Fully functional node. Node(HandledNode), + /// Node closing. + Closing(Close), + /// A panic happened while polling. Poisoned, } @@ -556,7 +559,7 @@ where type Error = (); fn poll(&mut self) -> Poll<(), ()> { - loop { + 'outer_loop: loop { match mem::replace(&mut self.inner, NodeTaskInner::Poisoned) { // First possibility: we are still trying to reach a node. NodeTaskInner::Future { mut future, handler, mut events_buffer } => { @@ -582,9 +585,7 @@ where for event in events_buffer { node.inject_event(event); } - if self.events_tx.unbounded_send((event, self.id)).is_err() { - node.shutdown(); - } + let _ = self.events_tx.unbounded_send((event, self.id)); self.inner = NodeTaskInner::Node(node); } Ok(Async::NotReady) => { @@ -593,7 +594,7 @@ where }, Err(err) => { // End the task - let event = InToExtMessage::TaskClosed(Err(TaskClosedEvent::Reach(err)), Some(handler)); + let event = InToExtMessage::TaskClosed(TaskClosedEvent::Reach(err), Some(handler)); let _ = self.events_tx.unbounded_send((event, self.id)); return Ok(Async::Ready(())); } @@ -614,9 +615,9 @@ where self.taken_over.push(take_over); }, Ok(Async::Ready(None)) => { - // Node closed by the external API; start shutdown process. - node.shutdown(); - break; + // Node closed by the external API; start closing. + self.inner = NodeTaskInner::Closing(node.close()); + continue 'outer_loop; } Err(()) => unreachable!("An unbounded receiver never errors"), } @@ -634,19 +635,12 @@ where self.inner = NodeTaskInner::Node(node); return Ok(Async::NotReady); }, - Ok(Async::Ready(Some(event))) => { + Ok(Async::Ready(event)) => { let event = InToExtMessage::NodeEvent(event); - if self.events_tx.unbounded_send((event, self.id)).is_err() { - node.shutdown(); - } - } - Ok(Async::Ready(None)) => { - let event = InToExtMessage::TaskClosed(Ok(()), None); let _ = self.events_tx.unbounded_send((event, self.id)); - return Ok(Async::Ready(())); // End the task. } Err(err) => { - let event = InToExtMessage::TaskClosed(Err(TaskClosedEvent::Node(err)), None); + let event = InToExtMessage::TaskClosed(TaskClosedEvent::Node(err), None); let _ = self.events_tx.unbounded_send((event, self.id)); return Ok(Async::Ready(())); // End the task. } @@ -654,6 +648,18 @@ where } }, + NodeTaskInner::Closing(mut closing) => { + match closing.poll() { + Ok(Async::Ready(())) | Err(_) => { + return Ok(Async::Ready(())); // End the task. + }, + Ok(Async::NotReady) => { + self.inner = NodeTaskInner::Closing(closing); + return Ok(Async::NotReady); + } + } + }, + // This happens if a previous poll has ended unexpectedly. The API of futures // guarantees that we shouldn't be polled again. NodeTaskInner::Poisoned => panic!("the node task panicked or errored earlier") diff --git a/core/src/nodes/node.rs b/core/src/nodes/node.rs index 30710c74..35929fd3 100644 --- a/core/src/nodes/node.rs +++ b/core/src/nodes/node.rs @@ -37,10 +37,9 @@ use std::sync::Arc; // implements the `AsyncRead` and `AsyncWrite` traits. // // This substream object raises the question of how to keep the `NodeStream` and the various -// substreams in sync without exposing a racy API. The answer is that we don't. The state of the -// node and the state of the substreams are totally detached, and they don't interact with each -// other in any way. Destroying the `NodeStream` doesn't close the substreams, nor is there a -// `close_substreams()` method or a "substream closed" event. +// substreams in sync without exposing a racy API. The answer is that the `NodeStream` holds +// ownership of the connection. Shutting node the `NodeStream` or destroying it will close all the +// existing substreams. The user of the `NodeStream` should be aware of that. /// Implementation of `Stream` that handles a node. /// @@ -55,30 +54,19 @@ where { /// The muxer used to manage substreams. muxer: Arc, - /// Tracks the state of the muxers inbound direction. - inbound_state: StreamState, - /// Tracks the state of the muxers outbound direction. - outbound_state: StreamState, /// List of substreams we are currently opening. outbound_substreams: SmallVec<[(TUserData, TMuxer::OutboundSubstream); 8]>, } +/// Future that signals the remote that we have closed the connection. +pub struct Close { + /// Muxer to close. + muxer: Arc, +} + /// A successfully opened substream. pub type Substream = muxing::SubstreamRef>; -// Track state of stream muxer per direction. -#[derive(Clone, Copy, Debug, PartialEq, Eq)] -enum StreamState { - // direction is open - Open, - // direction is shutting down - Shutdown, - // direction has shutdown and is flushing - Flush, - // direction is closed - Closed -} - /// Event that can happen on the `NodeStream`. pub enum NodeEvent where @@ -86,7 +74,8 @@ where { /// A new inbound substream arrived. InboundSubstream { - /// The newly-opened substream. + /// The newly-opened substream. Will return EOF of an error if the `NodeStream` is + /// destroyed or `close_graceful` is called. substream: Substream, }, @@ -94,19 +83,10 @@ where OutboundSubstream { /// User data that has been passed to the `open_substream` method. user_data: TUserData, - /// The newly-opened substream. + /// The newly-opened substream. Will return EOF of an error if the `NodeStream` is + /// destroyed or `close_graceful` is called. substream: Substream, }, - - /// An outbound substream couldn't be opened because the muxer is no longer capable of opening - /// more substreams. - OutboundClosed { - /// User data that has been passed to the `open_substream` method. - user_data: TUserData, - }, - - /// The inbound side of the muxer has been closed. No more inbound substreams will be produced. - InboundClosed, } /// Identifier for a substream being opened. @@ -122,45 +102,18 @@ where pub fn new(muxer: TMuxer) -> Self { NodeStream { muxer: Arc::new(muxer), - inbound_state: StreamState::Open, - outbound_state: StreamState::Open, outbound_substreams: SmallVec::new(), } } /// Starts the process of opening a new outbound substream. /// - /// Returns an error if the outbound side of the muxer is closed. - /// /// After calling this method, polling the stream should eventually produce either an /// `OutboundSubstream` event or an `OutboundClosed` event containing the user data that has /// been passed to this method. - pub fn open_substream(&mut self, user_data: TUserData) -> Result<(), TUserData> { - if self.outbound_state != StreamState::Open { - return Err(user_data); - } - + pub fn open_substream(&mut self, user_data: TUserData) { let raw = self.muxer.open_outbound(); self.outbound_substreams.push((user_data, raw)); - - Ok(()) - } - - /// Returns true if the inbound channel of the muxer is open. - /// - /// If `true` is returned, more inbound substream will be produced. - #[inline] - pub fn is_inbound_open(&self) -> bool { - self.inbound_state == StreamState::Open - } - - /// Returns true if the outbound channel of the muxer is open. - /// - /// If `true` is returned, more outbound substream can be opened. Otherwise, calling - /// `open_substream` will return an `Err`. - #[inline] - pub fn is_outbound_open(&self) -> bool { - self.outbound_state == StreamState::Open } /// Returns `true` if the remote has shown any sign of activity after the muxer has been open. @@ -170,9 +123,13 @@ where self.muxer.is_remote_acknowledged() } - /// Destroys the node stream and returns all the pending outbound substreams. - pub fn close(mut self) -> Vec { - self.cancel_outgoing() + /// Destroys the node stream and returns all the pending outbound substreams, plus an object + /// that signals the remote that we shut down the connection. + #[must_use] + pub fn close(mut self) -> (Close, Vec) { + let substreams = self.cancel_outgoing(); + let close = Close { muxer: self.muxer.clone() }; + (close, substreams) } /// Destroys all outbound streams and returns the corresponding user data. @@ -185,104 +142,17 @@ where out } - /// Trigger node shutdown. - /// - /// After this, `NodeStream::poll` will eventually produce `None`, when both endpoints are - /// closed. - pub fn shutdown_all(&mut self) { - if self.inbound_state == StreamState::Open { - self.inbound_state = StreamState::Shutdown - } - if self.outbound_state == StreamState::Open { - self.outbound_state = StreamState::Shutdown - } - } - - // If in progress, drive this node's stream muxer shutdown to completion. - fn poll_shutdown(&mut self) -> Poll<(), IoError> { - use self::StreamState::*; - loop { - match (self.inbound_state, self.outbound_state) { - (Open, Open) | (Open, Closed) | (Closed, Open) | (Closed, Closed) => { - return Ok(Async::Ready(())) - } - (Shutdown, Shutdown) => { - if let Async::Ready(()) = self.muxer.shutdown(muxing::Shutdown::All)? { - self.inbound_state = StreamState::Flush; - self.outbound_state = StreamState::Flush; - continue - } - return Ok(Async::NotReady) - } - (Shutdown, _) => { - if let Async::Ready(()) = self.muxer.shutdown(muxing::Shutdown::Inbound)? { - self.inbound_state = StreamState::Flush; - continue - } - return Ok(Async::NotReady) - } - (_, Shutdown) => { - if let Async::Ready(()) = self.muxer.shutdown(muxing::Shutdown::Outbound)? { - self.outbound_state = StreamState::Flush; - continue - } - return Ok(Async::NotReady) - } - (Flush, Open) => { - if let Async::Ready(()) = self.muxer.flush_all()? { - self.inbound_state = StreamState::Closed; - continue - } - return Ok(Async::NotReady) - } - (Open, Flush) => { - if let Async::Ready(()) = self.muxer.flush_all()? { - self.outbound_state = StreamState::Closed; - continue - } - return Ok(Async::NotReady) - } - (Flush, Flush) | (Flush, Closed) | (Closed, Flush) => { - if let Async::Ready(()) = self.muxer.flush_all()? { - self.inbound_state = StreamState::Closed; - self.outbound_state = StreamState::Closed; - continue - } - return Ok(Async::NotReady) - } - } - } - } -} - -impl Stream for NodeStream -where - TMuxer: muxing::StreamMuxer, -{ - type Item = NodeEvent; - type Error = IoError; - - fn poll(&mut self) -> Poll, Self::Error> { - // Drive the shutdown process, if any. - if self.poll_shutdown()?.is_not_ready() { - return Ok(Async::NotReady) - } - + /// Provides an API similar to `Future`. + pub fn poll(&mut self) -> Poll, IoError> { // Polling inbound substream. - if self.inbound_state == StreamState::Open { - match self.muxer.poll_inbound()? { - Async::Ready(Some(substream)) => { - let substream = muxing::substream_from_ref(self.muxer.clone(), substream); - return Ok(Async::Ready(Some(NodeEvent::InboundSubstream { - substream, - }))); - } - Async::Ready(None) => { - self.inbound_state = StreamState::Closed; - return Ok(Async::Ready(Some(NodeEvent::InboundClosed))); - } - Async::NotReady => {} + match self.muxer.poll_inbound()? { + Async::Ready(substream) => { + let substream = muxing::substream_from_ref(self.muxer.clone(), substream); + return Ok(Async::Ready(NodeEvent::InboundSubstream { + substream, + })); } + Async::NotReady => {} } // Polling outbound substreams. @@ -290,18 +160,13 @@ where for n in (0..self.outbound_substreams.len()).rev() { let (user_data, mut outbound) = self.outbound_substreams.swap_remove(n); match self.muxer.poll_outbound(&mut outbound) { - Ok(Async::Ready(Some(substream))) => { + Ok(Async::Ready(substream)) => { let substream = muxing::substream_from_ref(self.muxer.clone(), substream); self.muxer.destroy_outbound(outbound); - return Ok(Async::Ready(Some(NodeEvent::OutboundSubstream { + return Ok(Async::Ready(NodeEvent::OutboundSubstream { user_data, substream, - }))); - } - Ok(Async::Ready(None)) => { - self.outbound_state = StreamState::Closed; - self.muxer.destroy_outbound(outbound); - return Ok(Async::Ready(Some(NodeEvent::OutboundClosed { user_data }))); + })); } Ok(Async::NotReady) => { self.outbound_substreams.push((user_data, outbound)); @@ -312,13 +177,6 @@ where } } } - // Closing the node if there's no way we can do anything more. - if self.inbound_state == StreamState::Closed - && self.outbound_state == StreamState::Closed - && self.outbound_substreams.is_empty() - { - return Ok(Async::Ready(None)) - } // Nothing happened. Register our task to be notified and return. Ok(Async::NotReady) @@ -331,8 +189,6 @@ where { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { f.debug_struct("NodeStream") - .field("inbound_state", &self.inbound_state) - .field("outbound_state", &self.outbound_state) .field("outbound_substreams", &self.outbound_substreams.len()) .finish() } @@ -352,6 +208,28 @@ where } } +impl Future for Close +where + TMuxer: muxing::StreamMuxer, +{ + type Item = (); + type Error = IoError; + + fn poll(&mut self) -> Poll { + self.muxer.close() + } +} + +impl fmt::Debug for Close +where + TMuxer: muxing::StreamMuxer, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { + f.debug_struct("Close") + .finish() + } +} + impl fmt::Debug for NodeEvent where TMuxer: muxing::StreamMuxer, @@ -371,15 +249,6 @@ where .field("substream", substream) .finish() }, - NodeEvent::OutboundClosed { user_data } => { - f.debug_struct("NodeEvent::OutboundClosed") - .field("user_data", user_data) - .finish() - }, - NodeEvent::InboundClosed => { - f.debug_struct("NodeEvent::InboundClosed") - .finish() - }, } } } @@ -397,78 +266,14 @@ mod node_stream { NodeStream::<_, Vec>::new(muxer) } - #[test] - fn can_open_outbound_substreams_until_an_outbound_channel_is_closed() { - let mut muxer = DummyMuxer::new(); - muxer.set_outbound_connection_state(DummyConnectionState::Closed); - let mut ns = NodeStream::<_, Vec>::new(muxer); - - // open first substream works - assert!(ns.open_substream(vec![1,2,3]).is_ok()); - - // Given the state we set on the DummyMuxer, once we poll() we'll get an - // `OutboundClosed` which will make subsequent calls to `open_substream` fail - let out = ns.poll(); - assert_matches!(out, Ok(Async::Ready(Some(node_event))) => { - assert_matches!(node_event, NodeEvent::OutboundClosed{user_data} => { - assert_eq!(user_data, vec![1,2,3]) - }) - }); - - // Opening a second substream fails because `outbound_state` is no longer open. - assert_matches!(ns.open_substream(vec![22]), Err(user_data) => { - assert_eq!(user_data, vec![22]); - }); - } - - #[test] - fn query_inbound_outbound_state() { - let ns = build_node_stream(); - assert!(ns.is_inbound_open()); - assert!(ns.is_outbound_open()); - } - - #[test] - fn query_inbound_state() { - let mut muxer = DummyMuxer::new(); - muxer.set_inbound_connection_state(DummyConnectionState::Closed); - let mut ns = NodeStream::<_, Vec>::new(muxer); - - assert_matches!(ns.poll(), Ok(Async::Ready(Some(node_event))) => { - assert_matches!(node_event, NodeEvent::InboundClosed) - }); - - assert!(!ns.is_inbound_open()); - } - - #[test] - fn query_outbound_state() { - let mut muxer = DummyMuxer::new(); - muxer.set_outbound_connection_state(DummyConnectionState::Closed); - let mut ns = NodeStream::<_, Vec>::new(muxer); - - assert!(ns.is_outbound_open()); - - ns.open_substream(vec![1]).unwrap(); - let poll_result = ns.poll(); - - assert_matches!(poll_result, Ok(Async::Ready(Some(node_event))) => { - assert_matches!(node_event, NodeEvent::OutboundClosed{user_data} => { - assert_eq!(user_data, vec![1]) - }) - }); - - assert!(!ns.is_outbound_open(), "outbound connection should be closed after polling"); - } - #[test] fn closing_a_node_stream_destroys_substreams_and_returns_submitted_user_data() { let mut ns = build_node_stream(); - ns.open_substream(vec![2]).unwrap(); - ns.open_substream(vec![3]).unwrap(); - ns.open_substream(vec![5]).unwrap(); + ns.open_substream(vec![2]); + ns.open_substream(vec![3]); + ns.open_substream(vec![5]); let user_data_submitted = ns.close(); - assert_eq!(user_data_submitted, vec![ + assert_eq!(user_data_submitted.1, vec![ vec![2], vec![3], vec![5] ]); } @@ -489,70 +294,27 @@ mod node_stream { }); } - #[test] - fn poll_closes_the_node_stream_when_no_more_work_can_be_done() { - let mut muxer = DummyMuxer::new(); - // ensure muxer.poll_inbound() returns Async::Ready(None) - muxer.set_inbound_connection_state(DummyConnectionState::Closed); - // ensure muxer.poll_outbound() returns Async::Ready(None) - muxer.set_outbound_connection_state(DummyConnectionState::Closed); - let mut ns = NodeStream::<_, Vec>::new(muxer); - ns.open_substream(vec![]).unwrap(); - ns.poll().unwrap(); // poll_inbound() - ns.poll().unwrap(); // poll_outbound() - ns.poll().unwrap(); // resolve the address - // Nothing more to do, the NodeStream should be closed - assert_matches!(ns.poll(), Ok(Async::Ready(None))); - } - - #[test] - fn poll_sets_up_substreams_yielding_them_in_reverse_order() { - let mut muxer = DummyMuxer::new(); - // ensure muxer.poll_inbound() returns Async::Ready(None) - muxer.set_inbound_connection_state(DummyConnectionState::Closed); - // ensure muxer.poll_outbound() returns Async::Ready(Some(substream)) - muxer.set_outbound_connection_state(DummyConnectionState::Opened); - let mut ns = NodeStream::<_, Vec>::new(muxer); - ns.open_substream(vec![1]).unwrap(); - ns.open_substream(vec![2]).unwrap(); - ns.poll().unwrap(); // poll_inbound() - - // poll() sets up second outbound substream - assert_matches!(ns.poll(), Ok(Async::Ready(Some(node_event))) => { - assert_matches!(node_event, NodeEvent::OutboundSubstream{ user_data, substream:_ } => { - assert_eq!(user_data, vec![2]); - }) - }); - // Next poll() sets up first outbound substream - assert_matches!(ns.poll(), Ok(Async::Ready(Some(node_event))) => { - assert_matches!(node_event, NodeEvent::OutboundSubstream{ user_data, substream: _ } => { - assert_eq!(user_data, vec![1]); - }) - }); - } - #[test] fn poll_keeps_outbound_substreams_when_the_outgoing_connection_is_not_ready() { let mut muxer = DummyMuxer::new(); - // ensure muxer.poll_inbound() returns Async::Ready(None) - muxer.set_inbound_connection_state(DummyConnectionState::Closed); + // ensure muxer.poll_inbound() returns Async::NotReady + muxer.set_inbound_connection_state(DummyConnectionState::Pending); // ensure muxer.poll_outbound() returns Async::NotReady muxer.set_outbound_connection_state(DummyConnectionState::Pending); let mut ns = NodeStream::<_, Vec>::new(muxer); - ns.open_substream(vec![1]).unwrap(); + ns.open_substream(vec![1]); ns.poll().unwrap(); // poll past inbound ns.poll().unwrap(); // poll outbound - assert!(ns.is_outbound_open()); assert!(format!("{:?}", ns).contains("outbound_substreams: 1")); } #[test] fn poll_returns_incoming_substream() { let mut muxer = DummyMuxer::new(); - // ensure muxer.poll_inbound() returns Async::Ready(Some(subs)) + // ensure muxer.poll_inbound() returns Async::Ready(subs) muxer.set_inbound_connection_state(DummyConnectionState::Opened); let mut ns = NodeStream::<_, Vec>::new(muxer); - assert_matches!(ns.poll(), Ok(Async::Ready(Some(node_event))) => { + assert_matches!(ns.poll(), Ok(Async::Ready(node_event)) => { assert_matches!(node_event, NodeEvent::InboundSubstream{ substream: _ }); }); } diff --git a/core/src/nodes/raw_swarm.rs b/core/src/nodes/raw_swarm.rs index 5461857f..5045242d 100644 --- a/core/src/nodes/raw_swarm.rs +++ b/core/src/nodes/raw_swarm.rs @@ -180,19 +180,8 @@ where endpoint: ConnectedPoint, }, - /// A connection to a node has been closed. - /// - /// This happens once both the inbound and outbound channels are closed, and no more outbound - /// substream attempt is pending. - NodeClosed { - /// Identifier of the node. - peer_id: TPeerId, - /// Endpoint we were connected to. - endpoint: ConnectedPoint, - }, - /// The handler of a node has produced an error. - NodeError { + NodeClosed { /// Identifier of the node. peer_id: TPeerId, /// Endpoint we were connected to. @@ -280,14 +269,8 @@ where .field("endpoint", endpoint) .finish() } - RawSwarmEvent::NodeClosed { ref peer_id, ref endpoint } => { + RawSwarmEvent::NodeClosed { ref peer_id, ref endpoint, ref error } => { f.debug_struct("NodeClosed") - .field("peer_id", peer_id) - .field("endpoint", endpoint) - .finish() - } - RawSwarmEvent::NodeError { ref peer_id, ref endpoint, ref error } => { - f.debug_struct("NodeError") .field("peer_id", peer_id) .field("endpoint", endpoint) .field("error", error) @@ -995,7 +978,7 @@ where action = a; out_event = e; } - Async::Ready(CollectionEvent::NodeError { + Async::Ready(CollectionEvent::NodeClosed { peer_id, error, .. @@ -1007,22 +990,12 @@ where closed message after it has been opened, and no two closed \ messages; QED"); action = Default::default(); - out_event = RawSwarmEvent::NodeError { + out_event = RawSwarmEvent::NodeClosed { peer_id, endpoint, error, }; } - Async::Ready(CollectionEvent::NodeClosed { peer_id, .. }) => { - let endpoint = self.reach_attempts.connected_points.remove(&peer_id) - .expect("We insert into connected_points whenever a connection is \ - opened and remove only when a connection is closed; the \ - underlying API is guaranteed to always deliver a connection \ - closed message after it has been opened, and no two closed \ - messages; QED"); - action = Default::default(); - out_event = RawSwarmEvent::NodeClosed { peer_id, endpoint }; - } Async::Ready(CollectionEvent::NodeEvent { peer, event }) => { action = Default::default(); out_event = RawSwarmEvent::NodeEvent { peer_id: peer.id().clone(), event }; diff --git a/core/src/nodes/raw_swarm/tests.rs b/core/src/nodes/raw_swarm/tests.rs index e0139c30..d62415c6 100644 --- a/core/src/nodes/raw_swarm/tests.rs +++ b/core/src/nodes/raw_swarm/tests.rs @@ -158,7 +158,7 @@ fn broadcasted_events_reach_active_nodes() { muxer.set_outbound_connection_state(DummyConnectionState::Opened); let addr = "/ip4/127.0.0.1/tcp/1234".parse::().expect("bad multiaddr"); let mut handler = Handler::default(); - handler.next_states = vec![HandlerState::Ready(Some(NodeHandlerEvent::Custom(OutEvent::Custom("from handler 1") ))),]; + handler.next_states = vec![HandlerState::Ready(NodeHandlerEvent::Custom(OutEvent::Custom("from handler 1") )),]; let dial_result = swarm.dial(addr, handler); assert!(dial_result.is_ok()); @@ -379,60 +379,6 @@ fn yields_node_error_when_there_is_an_error_after_successful_connect() { })).expect("tokio works"); } - // Poll again. It is going to be a NodeError because of how the - // handler's next state was set up. - let swarm_fut = swarm.clone(); - let expected_peer_id = peer_id.clone(); - rt.block_on(future::poll_fn(move || -> Poll<_, ()> { - let mut swarm = swarm_fut.lock(); - assert_matches!(swarm.poll(), Async::Ready(RawSwarmEvent::NodeError { peer_id, .. }) => { - assert_eq!(peer_id, expected_peer_id); - }); - Ok(Async::Ready(())) - })).expect("tokio works"); -} - -#[test] -fn yields_node_closed_when_the_node_closes_after_successful_connect() { - let mut transport = DummyTransport::new(); - let peer_id = PeerId::random(); - transport.set_next_peer_id(&peer_id); - let swarm = Arc::new(Mutex::new(RawSwarm::<_, _, _, Handler, _>::new(transport, PeerId::random()))); - - { - // Set up an outgoing connection with a PeerId we know - let swarm1 = swarm.clone(); - let mut swarm1 = swarm1.lock(); - let peer = swarm1.peer(peer_id.clone()); - let addr = "/unix/reachable".parse().expect("bad multiaddr"); - let mut handler = Handler::default(); - // Force handler to close - handler.next_states = vec![ HandlerState::Ready(None) ]; - peer.into_not_connected().unwrap().connect(addr, handler); - } - - // Ensure we run on a single thread - let mut rt = Builder::new().core_threads(1).build().unwrap(); - - // Drive it forward until we connect to the node. - let mut keep_polling = true; - while keep_polling { - let swarm_fut = swarm.clone(); - keep_polling = rt.block_on(future::poll_fn(move || -> Poll<_, ()> { - let mut swarm = swarm_fut.lock(); - // Push the Handler into the closed state on the next poll - swarm.broadcast_event(&InEvent::NextState); - match swarm.poll() { - Async::NotReady => Ok(Async::Ready(true)), - Async::Ready(event) => { - assert_matches!(event, RawSwarmEvent::Connected { .. }); - // We're connected, we can move on - Ok(Async::Ready(false)) - }, - } - })).expect("tokio works"); - } - // Poll again. It is going to be a NodeClosed because of how the // handler's next state was set up. let swarm_fut = swarm.clone(); diff --git a/core/src/protocols_handler/dummy.rs b/core/src/protocols_handler/dummy.rs index d543b6d4..5b354b59 100644 --- a/core/src/protocols_handler/dummy.rs +++ b/core/src/protocols_handler/dummy.rs @@ -33,7 +33,6 @@ use void::Void; /// Implementation of `ProtocolsHandler` that doesn't handle anything. pub struct DummyProtocolsHandler { - shutting_down: bool, marker: PhantomData, } @@ -41,7 +40,6 @@ impl Default for DummyProtocolsHandler { #[inline] fn default() -> Self { DummyProtocolsHandler { - shutting_down: false, marker: PhantomData, } } @@ -85,17 +83,9 @@ where #[inline] fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, _: ProtocolsHandlerUpgrErr<>::Error>) {} - #[inline] - fn inject_inbound_closed(&mut self) {} - #[inline] fn connection_keep_alive(&self) -> KeepAlive { KeepAlive::Now } - #[inline] - fn shutdown(&mut self) { - self.shutting_down = true; - } - #[inline] fn poll( &mut self, @@ -103,10 +93,6 @@ where ProtocolsHandlerEvent, Void, > { - if self.shutting_down { - Ok(Async::Ready(ProtocolsHandlerEvent::Shutdown)) - } else { - Ok(Async::NotReady) - } + Ok(Async::NotReady) } } diff --git a/core/src/protocols_handler/fuse.rs b/core/src/protocols_handler/fuse.rs deleted file mode 100644 index 54cbee62..00000000 --- a/core/src/protocols_handler/fuse.rs +++ /dev/null @@ -1,205 +0,0 @@ -// Copyright 2019 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -use crate::{ - either::EitherOutput, - protocols_handler::{KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr}, - upgrade::{ - DeniedUpgrade, - EitherUpgrade, - InboundUpgrade, - OutboundUpgrade, - } -}; -use futures::prelude::*; -use std::mem; - -/// Wrapper around a protocol handler and ignores all further method calls once it has shut down. -#[derive(Debug, Copy, Clone)] -pub struct Fuse { - inner: State, -} - -#[derive(Debug, Copy, Clone)] -enum State { - Normal(TProtoHandler), - ShuttingDown(TProtoHandler), - Shutdown, -} - -impl State { - fn as_ref(&self) -> Option<&TProtoHandler> { - match self { - State::Normal(h) => Some(h), - State::ShuttingDown(h) => Some(h), - State::Shutdown => None, - } - } - - fn as_mut(&mut self) -> Option<&mut TProtoHandler> { - match self { - State::Normal(h) => Some(h), - State::ShuttingDown(h) => Some(h), - State::Shutdown => None, - } - } -} - -impl Fuse { - /// Creates a `Fuse`. - #[inline] - pub(crate) fn new(inner: TProtoHandler) -> Self { - Fuse { - inner: State::Normal(inner), - } - } - - /// Returns true if `shutdown()` has been called in the past, or if polling has returned - /// `Shutdown` in the past. - pub fn is_shutting_down_or_shutdown(&self) -> bool { - match self.inner { - State::Normal(_) => false, - State::ShuttingDown(_) => true, - State::Shutdown => true, - } - } - - /// Returns true if polling has returned `Shutdown` in the past. - #[inline] - pub fn is_shutdown(&self) -> bool { - if let State::Shutdown = self.inner { - true - } else { - false - } - } -} - -impl ProtocolsHandler for Fuse -where - TProtoHandler: ProtocolsHandler, -{ - type InEvent = TProtoHandler::InEvent; - type OutEvent = TProtoHandler::OutEvent; - type Error = TProtoHandler::Error; - type Substream = TProtoHandler::Substream; - type InboundProtocol = EitherUpgrade; - type OutboundProtocol = TProtoHandler::OutboundProtocol; - type OutboundOpenInfo = TProtoHandler::OutboundOpenInfo; - - #[inline] - fn listen_protocol(&self) -> Self::InboundProtocol { - if let Some(inner) = self.inner.as_ref() { - EitherUpgrade::A(inner.listen_protocol()) - } else { - EitherUpgrade::B(DeniedUpgrade) - } - } - - #[inline] - fn inject_fully_negotiated_inbound( - &mut self, - protocol: >::Output - ) { - match (protocol, self.inner.as_mut()) { - (EitherOutput::First(proto), Some(inner)) => { - inner.inject_fully_negotiated_inbound(proto) - }, - (EitherOutput::Second(_), None) => {} - (EitherOutput::First(_), None) => {} // Can happen if we shut down during an upgrade. - (EitherOutput::Second(_), Some(_)) => { - panic!("Wrong API usage; an upgrade was passed to a different object that the \ - one that asked for the upgrade") - }, - } - } - - #[inline] - fn inject_fully_negotiated_outbound( - &mut self, - protocol: >::Output, - info: Self::OutboundOpenInfo - ) { - if let Some(inner) = self.inner.as_mut() { - inner.inject_fully_negotiated_outbound(protocol, info) - } - } - - #[inline] - fn inject_event(&mut self, event: Self::InEvent) { - if let Some(inner) = self.inner.as_mut() { - inner.inject_event(event) - } - } - - #[inline] - fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr<>::Error>) { - if let Some(inner) = self.inner.as_mut() { - inner.inject_dial_upgrade_error(info, error) - } - } - - #[inline] - fn inject_inbound_closed(&mut self) { - if let Some(inner) = self.inner.as_mut() { - inner.inject_inbound_closed() - } - } - - #[inline] - fn connection_keep_alive(&self) -> KeepAlive { - if let Some(inner) = self.inner.as_ref() { - inner.connection_keep_alive() - } else { - KeepAlive::Now - } - } - - #[inline] - fn shutdown(&mut self) { - self.inner = match mem::replace(&mut self.inner, State::Shutdown) { - State::Normal(mut inner) => { - inner.shutdown(); - State::ShuttingDown(inner) - }, - s @ State::ShuttingDown(_) => s, - s @ State::Shutdown => s, - }; - } - - #[inline] - fn poll( - &mut self, - ) -> Poll< - ProtocolsHandlerEvent, - Self::Error, - > { - let poll = match self.inner.as_mut() { - Some(i) => i.poll(), - None => return Ok(Async::Ready(ProtocolsHandlerEvent::Shutdown)), - }; - - if let Ok(Async::Ready(ProtocolsHandlerEvent::Shutdown)) = poll { - self.inner = State::Shutdown; - } - - poll - } -} diff --git a/core/src/protocols_handler/map_in.rs b/core/src/protocols_handler/map_in.rs index 42e8ca1f..d8510a8d 100644 --- a/core/src/protocols_handler/map_in.rs +++ b/core/src/protocols_handler/map_in.rs @@ -94,21 +94,11 @@ where self.inner.inject_dial_upgrade_error(info, error) } - #[inline] - fn inject_inbound_closed(&mut self) { - self.inner.inject_inbound_closed() - } - #[inline] fn connection_keep_alive(&self) -> KeepAlive { self.inner.connection_keep_alive() } - #[inline] - fn shutdown(&mut self) { - self.inner.shutdown() - } - #[inline] fn poll( &mut self, diff --git a/core/src/protocols_handler/map_out.rs b/core/src/protocols_handler/map_out.rs index f8f122ca..89f1b07b 100644 --- a/core/src/protocols_handler/map_out.rs +++ b/core/src/protocols_handler/map_out.rs @@ -89,21 +89,11 @@ where self.inner.inject_dial_upgrade_error(info, error) } - #[inline] - fn inject_inbound_closed(&mut self) { - self.inner.inject_inbound_closed() - } - #[inline] fn connection_keep_alive(&self) -> KeepAlive { self.inner.connection_keep_alive() } - #[inline] - fn shutdown(&mut self) { - self.inner.shutdown() - } - #[inline] fn poll( &mut self, @@ -114,7 +104,6 @@ where Ok(self.inner.poll()?.map(|ev| { match ev { ProtocolsHandlerEvent::Custom(ev) => ProtocolsHandlerEvent::Custom((self.map)(ev)), - ProtocolsHandlerEvent::Shutdown => ProtocolsHandlerEvent::Shutdown, ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info } => { ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info } } diff --git a/core/src/protocols_handler/mod.rs b/core/src/protocols_handler/mod.rs index ff4e922e..14de1eeb 100644 --- a/core/src/protocols_handler/mod.rs +++ b/core/src/protocols_handler/mod.rs @@ -44,15 +44,13 @@ use std::{cmp::Ordering, error, fmt, time::Duration, time::Instant}; use tokio_io::{AsyncRead, AsyncWrite}; pub use self::dummy::DummyProtocolsHandler; -pub use self::fuse::Fuse; pub use self::map_in::MapInEvent; pub use self::map_out::MapOutEvent; -pub use self::node_handler::{NodeHandlerWrapper, NodeHandlerWrapperBuilder}; +pub use self::node_handler::{NodeHandlerWrapper, NodeHandlerWrapperBuilder, NodeHandlerWrapperError}; pub use self::one_shot::OneShotHandler; pub use self::select::{IntoProtocolsHandlerSelect, ProtocolsHandlerSelect}; mod dummy; -mod fuse; mod map_in; mod map_out; mod node_handler; @@ -138,10 +136,6 @@ pub trait ProtocolsHandler { /// Indicates to the handler that upgrading a substream to the given protocol has failed. fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr<>::Error>); - /// Indicates to the handler that the inbound part of the muxer has been closed, and that - /// therefore no more inbound substreams will be produced. - fn inject_inbound_closed(&mut self); - /// Returns until when the connection should be kept alive. /// /// If returns `Until`, that indicates that this connection may invoke `shutdown()` after the @@ -160,20 +154,9 @@ pub trait ProtocolsHandler { /// After `shutdown()` is called, the result of this method doesn't matter anymore. fn connection_keep_alive(&self) -> KeepAlive; - /// Indicates to the node that it should shut down. After that, it is expected that `poll()` - /// returns `Ready(ProtocolsHandlerEvent::Shutdown)` as soon as possible. + /// Should behave like `Stream::poll()`. /// - /// This method allows an implementation to perform a graceful shutdown of the substreams, and - /// send back various events. - fn shutdown(&mut self); - - /// Should behave like `Stream::poll()`. Should close if no more event can be produced and the - /// node should be closed. - /// - /// > **Note**: If this handler is combined with other handlers, as soon as `poll()` returns - /// > `Ok(Async::Ready(ProtocolsHandlerEvent::Shutdown))`, all the other handlers - /// > will receive a call to `shutdown()` and will eventually be closed and - /// > destroyed. + /// Returning an error will close the connection to the remote. fn poll(&mut self) -> Poll, Self::Error>; /// Adds a closure that turns the input event into something else. @@ -196,16 +179,6 @@ pub trait ProtocolsHandler { MapOutEvent::new(self, map) } - /// Wraps around `self`. When `poll()` returns `Shutdown`, any further call to any method will - /// be ignored. - #[inline] - fn fuse(self) -> Fuse - where - Self: Sized, - { - Fuse::new(self) - } - /// Builds an implementation of `ProtocolsHandler` that handles both this protocol and the /// other one together. #[inline] @@ -251,11 +224,6 @@ pub enum ProtocolsHandlerEvent { info: TOutboundOpenInfo, }, - /// Perform a graceful shutdown of the connection to the remote. - /// - /// Should be returned after `shutdown()` has been called. - Shutdown, - /// Other event. Custom(TCustom), } @@ -280,7 +248,6 @@ impl info: map(info), } } - ProtocolsHandlerEvent::Shutdown => ProtocolsHandlerEvent::Shutdown, ProtocolsHandlerEvent::Custom(val) => ProtocolsHandlerEvent::Custom(val), } } @@ -301,7 +268,6 @@ impl info, } } - ProtocolsHandlerEvent::Shutdown => ProtocolsHandlerEvent::Shutdown, ProtocolsHandlerEvent::Custom(val) => ProtocolsHandlerEvent::Custom(val), } } @@ -319,7 +285,6 @@ impl ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info } => { ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info } } - ProtocolsHandlerEvent::Shutdown => ProtocolsHandlerEvent::Shutdown, ProtocolsHandlerEvent::Custom(val) => ProtocolsHandlerEvent::Custom(map(val)), } } diff --git a/core/src/protocols_handler/node_handler.rs b/core/src/protocols_handler/node_handler.rs index 9d1c0d26..9519c792 100644 --- a/core/src/protocols_handler/node_handler.rs +++ b/core/src/protocols_handler/node_handler.rs @@ -31,7 +31,7 @@ use crate::{ } }; use futures::prelude::*; -use std::time::{Duration, Instant}; +use std::{error, fmt, time::Duration, time::Instant}; use tokio_timer::{Delay, Timeout}; /// Prototype for a `NodeHandlerWrapper`. @@ -145,6 +145,46 @@ where connection_shutdown: Option, } +/// Error generated by the `NodeHandlerWrapper`. +#[derive(Debug)] +pub enum NodeHandlerWrapperError { + /// Error generated by the handler. + Handler(TErr), + /// The connection has been deemed useless and has been closed. + UselessTimeout, +} + +impl From for NodeHandlerWrapperError { + fn from(err: TErr) -> NodeHandlerWrapperError { + NodeHandlerWrapperError::Handler(err) + } +} + +impl fmt::Display for NodeHandlerWrapperError +where + TErr: fmt::Display +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + NodeHandlerWrapperError::Handler(err) => write!(f, "{}", err), + NodeHandlerWrapperError::UselessTimeout => + write!(f, "Node has been closed due to inactivity"), + } + } +} + +impl error::Error for NodeHandlerWrapperError +where + TErr: error::Error + 'static +{ + fn source(&self) -> Option<&(dyn error::Error + 'static)> { + match self { + NodeHandlerWrapperError::Handler(err) => Some(err), + NodeHandlerWrapperError::UselessTimeout => None, + } + } +} + impl NodeHandler for NodeHandlerWrapper where TProtoHandler: ProtocolsHandler, @@ -153,7 +193,7 @@ where { type InEvent = TProtoHandler::InEvent; type OutEvent = TProtoHandler::OutEvent; - type Error = TProtoHandler::Error; + type Error = NodeHandlerWrapperError; type Substream = TProtoHandler::Substream; // The first element of the tuple is the unique upgrade identifier // (see `unique_dial_upgrade_id`). @@ -192,42 +232,11 @@ where } } - #[inline] - fn inject_inbound_closed(&mut self) { - self.handler.inject_inbound_closed(); - } - - fn inject_outbound_closed(&mut self, user_data: Self::OutboundOpenInfo) { - let pos = match self - .queued_dial_upgrades - .iter() - .position(|(id, _)| id == &user_data.0) - { - Some(p) => p, - None => { - debug_assert!( - false, - "Received an outbound closed error with an invalid upgrade ID" - ); - return; - } - }; - - self.queued_dial_upgrades.remove(pos); - self.handler - .inject_dial_upgrade_error(user_data.1, ProtocolsHandlerUpgrErr::MuxerDeniedSubstream); - } - #[inline] fn inject_event(&mut self, event: Self::InEvent) { self.handler.inject_event(event); } - #[inline] - fn shutdown(&mut self) { - self.handler.shutdown(); - } - fn poll(&mut self) -> Poll, Self::Error> { // Continue negotiation of newly-opened substreams on the listening side. // We remove each element from `negotiating_in` one by one and add them back if not ready. @@ -273,55 +282,47 @@ where // Poll the handler at the end so that we see the consequences of the method calls on // `self.handler`. - loop { - let poll_result = self.handler.poll()?; + let poll_result = self.handler.poll()?; - self.connection_shutdown = match self.handler.connection_keep_alive() { - KeepAlive::Until(expiration) => Some(Delay::new(expiration)), - KeepAlive::Now => Some(Delay::new(Instant::now())), - KeepAlive::Forever => None, - }; + self.connection_shutdown = match self.handler.connection_keep_alive() { + KeepAlive::Until(expiration) => Some(Delay::new(expiration)), + KeepAlive::Now => Some(Delay::new(Instant::now())), + KeepAlive::Forever => None, + }; - match poll_result { - Async::Ready(ProtocolsHandlerEvent::Custom(event)) => { - return Ok(Async::Ready(NodeHandlerEvent::Custom(event))); - } - Async::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { - upgrade, - info, - }) => { - let id = self.unique_dial_upgrade_id; - self.unique_dial_upgrade_id += 1; - self.queued_dial_upgrades.push((id, upgrade)); - return Ok(Async::Ready( - NodeHandlerEvent::OutboundSubstreamRequest((id, info)), - )); - } - Async::Ready(ProtocolsHandlerEvent::Shutdown) => { - return Ok(Async::Ready(NodeHandlerEvent::Shutdown)) - }, - Async::NotReady => (), - }; - - // Check the `connection_shutdown`. - if let Some(mut connection_shutdown) = self.connection_shutdown.take() { - // If we're negotiating substreams, let's delay the closing. - if self.negotiating_in.is_empty() && self.negotiating_out.is_empty() { - match connection_shutdown.poll() { - Ok(Async::Ready(_)) | Err(_) => { - self.shutdown(); - continue; // We need to poll the handler again. - }, - Ok(Async::NotReady) => { - self.connection_shutdown = Some(connection_shutdown); - } - } - } else { - self.connection_shutdown = Some(connection_shutdown); - } + match poll_result { + Async::Ready(ProtocolsHandlerEvent::Custom(event)) => { + return Ok(Async::Ready(NodeHandlerEvent::Custom(event))); } + Async::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { + upgrade, + info, + }) => { + let id = self.unique_dial_upgrade_id; + self.unique_dial_upgrade_id += 1; + self.queued_dial_upgrades.push((id, upgrade)); + return Ok(Async::Ready( + NodeHandlerEvent::OutboundSubstreamRequest((id, info)), + )); + } + Async::NotReady => (), + }; - break; + // Check the `connection_shutdown`. + if let Some(mut connection_shutdown) = self.connection_shutdown.take() { + // If we're negotiating substreams, let's delay the closing. + if self.negotiating_in.is_empty() && self.negotiating_out.is_empty() { + match connection_shutdown.poll() { + Ok(Async::Ready(_)) | Err(_) => { + return Err(NodeHandlerWrapperError::UselessTimeout); + }, + Ok(Async::NotReady) => { + self.connection_shutdown = Some(connection_shutdown); + } + } + } else { + self.connection_shutdown = Some(connection_shutdown); + } } Ok(Async::NotReady) diff --git a/core/src/protocols_handler/one_shot.rs b/core/src/protocols_handler/one_shot.rs index 9d2a6251..c0a50456 100644 --- a/core/src/protocols_handler/one_shot.rs +++ b/core/src/protocols_handler/one_shot.rs @@ -188,20 +188,12 @@ where self.pending_error = Some(error); } } - - #[inline] - fn inject_inbound_closed(&mut self) {} - + #[inline] fn connection_keep_alive(&self) -> KeepAlive { self.keep_alive } - #[inline] - fn shutdown(&mut self) { - self.shutting_down = true; - } - fn poll(&mut self) -> Poll, Self::Error> { if let Some(err) = self.pending_error.take() { return Err(err); @@ -213,10 +205,6 @@ where self.events_out.shrink_to_fit(); } - if self.shutting_down && self.dial_negotiated == 0 { - return Ok(Async::Ready(ProtocolsHandlerEvent::Shutdown)); - } - if !self.dial_queue.is_empty() { if !self.shutting_down && self.dial_negotiated < self.max_dial_negotiated { self.dial_negotiated += 1; diff --git a/core/src/protocols_handler/select.rs b/core/src/protocols_handler/select.rs index a34c00cb..4e62afa4 100644 --- a/core/src/protocols_handler/select.rs +++ b/core/src/protocols_handler/select.rs @@ -24,7 +24,6 @@ use crate::{ either::EitherOutput, protocols_handler::{ KeepAlive, - Fuse, IntoProtocolsHandler, ProtocolsHandler, ProtocolsHandlerEvent, @@ -78,8 +77,8 @@ where fn into_handler(self, remote_peer_id: &PeerId) -> Self::Handler { ProtocolsHandlerSelect { - proto1: self.proto1.into_handler(remote_peer_id).fuse(), - proto2: self.proto2.into_handler(remote_peer_id).fuse(), + proto1: self.proto1.into_handler(remote_peer_id), + proto2: self.proto2.into_handler(remote_peer_id), } } } @@ -88,9 +87,9 @@ where #[derive(Debug, Clone)] pub struct ProtocolsHandlerSelect { /// The first protocol. - proto1: Fuse, + proto1: TProto1, /// The second protocol. - proto2: Fuse, + proto2: TProto2, } impl ProtocolsHandlerSelect { @@ -98,8 +97,8 @@ impl ProtocolsHandlerSelect { #[inline] pub(crate) fn new(proto1: TProto1, proto2: TProto2) -> Self { ProtocolsHandlerSelect { - proto1: Fuse::new(proto1), - proto2: Fuse::new(proto2), + proto1, + proto2, } } } @@ -119,7 +118,7 @@ where type OutEvent = EitherOutput; type Error = EitherError; type Substream = TSubstream; - type InboundProtocol = SelectUpgrade< as ProtocolsHandler>::InboundProtocol, as ProtocolsHandler>::InboundProtocol>; + type InboundProtocol = SelectUpgrade<::InboundProtocol, ::InboundProtocol>; type OutboundProtocol = EitherUpgrade; type OutboundOpenInfo = EitherOutput; @@ -160,12 +159,6 @@ where } } - #[inline] - fn inject_inbound_closed(&mut self) { - self.proto1.inject_inbound_closed(); - self.proto2.inject_inbound_closed(); - } - #[inline] fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr<>::Error>) { match (info, error) { @@ -213,12 +206,6 @@ where cmp::max(self.proto1.connection_keep_alive(), self.proto2.connection_keep_alive()) } - #[inline] - fn shutdown(&mut self) { - self.proto1.shutdown(); - self.proto2.shutdown(); - } - fn poll(&mut self) -> Poll, Self::Error> { loop { match self.proto1.poll().map_err(EitherError::A)? { @@ -231,9 +218,6 @@ where info: EitherOutput::First(info), })); }, - Async::Ready(ProtocolsHandlerEvent::Shutdown) => { - self.proto2.shutdown(); - }, Async::NotReady => () }; @@ -247,22 +231,12 @@ where info: EitherOutput::Second(info), })); }, - Async::Ready(ProtocolsHandlerEvent::Shutdown) => { - if !self.proto1.is_shutting_down_or_shutdown() { - self.proto1.shutdown(); - continue; - } - }, Async::NotReady => () }; break; } - if self.proto1.is_shutdown() && self.proto2.is_shutdown() { - Ok(Async::Ready(ProtocolsHandlerEvent::Shutdown)) - } else { - Ok(Async::NotReady) - } + Ok(Async::NotReady) } } diff --git a/core/src/swarm.rs b/core/src/swarm.rs index baa0d49a..dfc27ff9 100644 --- a/core/src/swarm.rs +++ b/core/src/swarm.rs @@ -49,7 +49,7 @@ use crate::{ node::Substream, raw_swarm::{self, RawSwarm, RawSwarmEvent} }, - protocols_handler::{NodeHandlerWrapperBuilder, NodeHandlerWrapper, IntoProtocolsHandler, ProtocolsHandler}, + protocols_handler::{NodeHandlerWrapperBuilder, NodeHandlerWrapper, NodeHandlerWrapperError, IntoProtocolsHandler, ProtocolsHandler}, transport::TransportError, }; use futures::prelude::*; @@ -68,7 +68,7 @@ where TTransport: Transport, <<::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent, <<::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent, NodeHandlerWrapperBuilder, - <<::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::Error, + NodeHandlerWrapperError<<<::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::Error>, >, /// Handles which nodes to connect to and how to handle the events sent back by the protocol @@ -261,10 +261,7 @@ where TBehaviour: NetworkBehaviour, Async::Ready(RawSwarmEvent::Connected { peer_id, endpoint }) => { self.behaviour.inject_connected(peer_id, endpoint); }, - Async::Ready(RawSwarmEvent::NodeClosed { peer_id, endpoint }) => { - self.behaviour.inject_disconnected(&peer_id, endpoint); - }, - Async::Ready(RawSwarmEvent::NodeError { peer_id, endpoint, .. }) => { + Async::Ready(RawSwarmEvent::NodeClosed { peer_id, endpoint, .. }) => { self.behaviour.inject_disconnected(&peer_id, endpoint); }, Async::Ready(RawSwarmEvent::Replaced { peer_id, closed_endpoint, endpoint }) => { diff --git a/core/src/tests/dummy_handler.rs b/core/src/tests/dummy_handler.rs index 6c26d65a..8f666f17 100644 --- a/core/src/tests/dummy_handler.rs +++ b/core/src/tests/dummy_handler.rs @@ -54,7 +54,7 @@ impl Default for Handler { #[derive(Debug, PartialEq, Clone)] pub(crate) enum HandlerState { NotReady, - Ready(Option>), + Ready(NodeHandlerEvent), Err, } @@ -64,10 +64,6 @@ pub(crate) enum InEvent { Custom(&'static str), /// A substream request with a dummy payload Substream(Option), - /// Request closing of the outbound substream - OutboundClosed, - /// Request closing of the inbound substreams - InboundClosed, /// Request the handler to move to the next state NextState, } @@ -98,27 +94,18 @@ impl NodeHandler for Handler { }; self.events.push(InEvent::Substream(user_data)); } - fn inject_inbound_closed(&mut self) { - self.events.push(InEvent::InboundClosed); - } - fn inject_outbound_closed(&mut self, _: usize) { - self.events.push(InEvent::OutboundClosed); - if let Some(ref state) = self.next_outbound_state { - self.state = Some(state.clone()); - } - } fn inject_event(&mut self, inevent: Self::InEvent) { self.events.push(inevent.clone()); match inevent { InEvent::Custom(s) => { - self.state = Some(HandlerState::Ready(Some(NodeHandlerEvent::Custom( + self.state = Some(HandlerState::Ready(NodeHandlerEvent::Custom( OutEvent::Custom(s), - )))) + ))) } InEvent::Substream(Some(user_data)) => { - self.state = Some(HandlerState::Ready(Some( + self.state = Some(HandlerState::Ready( NodeHandlerEvent::OutboundSubstreamRequest(user_data), - ))) + )) } InEvent::NextState => { let next_state = self.next_states.pop(); @@ -127,15 +114,11 @@ impl NodeHandler for Handler { _ => unreachable!(), } } - fn shutdown(&mut self) { - self.state = Some(HandlerState::Ready(None)); - } fn poll(&mut self) -> Poll, IoError> { match self.state.take() { Some(ref state) => match state { HandlerState::NotReady => Ok(Async::NotReady), - HandlerState::Ready(None) => Ok(Async::Ready(NodeHandlerEvent::Shutdown)), - HandlerState::Ready(Some(event)) => Ok(Async::Ready(event.clone())), + HandlerState::Ready(event) => Ok(Async::Ready(event.clone())), HandlerState::Err => Err(io::Error::new(io::ErrorKind::Other, "oh noes")), }, None => Ok(Async::NotReady), diff --git a/core/src/tests/dummy_muxer.rs b/core/src/tests/dummy_muxer.rs index 7df8729d..f0962ff8 100644 --- a/core/src/tests/dummy_muxer.rs +++ b/core/src/tests/dummy_muxer.rs @@ -23,7 +23,7 @@ //! desired way when testing other components. use futures::prelude::*; -use crate::muxing::{Shutdown, StreamMuxer}; +use crate::muxing::StreamMuxer; use std::io::Error as IoError; /// Substream type @@ -39,8 +39,7 @@ pub struct DummyOutboundSubstream {} #[derive(Debug, PartialEq, Clone)] pub enum DummyConnectionState { Pending, // use this to trigger the Async::NotReady code path - Closed, // use this to trigger the Async::Ready(None) code path - Opened, // use this to trigger the Async::Ready(Some(_)) code path + Opened, // use this to trigger the Async::Ready(_) code path } #[derive(Debug, PartialEq, Clone)] struct DummyConnection { @@ -56,14 +55,14 @@ pub struct DummyMuxer{ impl DummyMuxer { /// Create a new `DummyMuxer` where the inbound substream is set to `Pending` - /// and the (single) outbound substream to `Closed`. + /// and the (single) outbound substream to `Pending`. pub fn new() -> Self { DummyMuxer { in_connection: DummyConnection { state: DummyConnectionState::Pending, }, out_connection: DummyConnection { - state: DummyConnectionState::Closed, + state: DummyConnectionState::Pending, }, } } @@ -80,11 +79,10 @@ impl DummyMuxer { impl StreamMuxer for DummyMuxer { type Substream = DummySubstream; type OutboundSubstream = DummyOutboundSubstream; - fn poll_inbound(&self) -> Poll, IoError> { + fn poll_inbound(&self) -> Poll { match self.in_connection.state { DummyConnectionState::Pending => Ok(Async::NotReady), - DummyConnectionState::Closed => Ok(Async::Ready(None)), - DummyConnectionState::Opened => Ok(Async::Ready(Some(Self::Substream {}))), + DummyConnectionState::Opened => Ok(Async::Ready(Self::Substream {})), } } fn open_outbound(&self) -> Self::OutboundSubstream { @@ -93,11 +91,10 @@ impl StreamMuxer for DummyMuxer { fn poll_outbound( &self, _substream: &mut Self::OutboundSubstream, - ) -> Poll, IoError> { + ) -> Poll { match self.out_connection.state { DummyConnectionState::Pending => Ok(Async::NotReady), - DummyConnectionState::Closed => Ok(Async::Ready(None)), - DummyConnectionState::Opened => Ok(Async::Ready(Some(Self::Substream {}))), + DummyConnectionState::Opened => Ok(Async::Ready(Self::Substream {})), } } fn destroy_outbound(&self, _: Self::OutboundSubstream) {} @@ -110,12 +107,12 @@ impl StreamMuxer for DummyMuxer { fn flush_substream(&self, _: &mut Self::Substream) -> Poll<(), IoError> { unreachable!() } - fn shutdown_substream(&self, _: &mut Self::Substream, _: Shutdown) -> Poll<(), IoError> { + fn shutdown_substream(&self, _: &mut Self::Substream) -> Poll<(), IoError> { unreachable!() } fn destroy_substream(&self, _: Self::Substream) {} fn is_remote_acknowledged(&self) -> bool { true } - fn shutdown(&self, _: Shutdown) -> Poll<(), IoError> { + fn close(&self) -> Poll<(), IoError> { Ok(Async::Ready(())) } fn flush_all(&self) -> Poll<(), IoError> { diff --git a/core/tests/raw_swarm_dial_error.rs b/core/tests/raw_swarm_dial_error.rs index a9a5a855..b65b1663 100644 --- a/core/tests/raw_swarm_dial_error.rs +++ b/core/tests/raw_swarm_dial_error.rs @@ -28,11 +28,11 @@ use rand::seq::SliceRandom; use std::io; // TODO: replace with DummyProtocolsHandler after https://github.com/servo/rust-smallvec/issues/139 ? -struct TestHandler(std::marker::PhantomData, bool); +struct TestHandler(std::marker::PhantomData); impl Default for TestHandler { fn default() -> Self { - TestHandler(std::marker::PhantomData, false) + TestHandler(std::marker::PhantomData) } } @@ -71,18 +71,10 @@ where } - fn inject_inbound_closed(&mut self) {} - fn connection_keep_alive(&self) -> KeepAlive { KeepAlive::Now } - fn shutdown(&mut self) { self.1 = true; } - fn poll(&mut self) -> Poll, Self::Error> { - if self.1 { - Ok(Async::Ready(ProtocolsHandlerEvent::Shutdown)) - } else { - Ok(Async::NotReady) - } + Ok(Async::NotReady) } } diff --git a/core/tests/raw_swarm_simult.rs b/core/tests/raw_swarm_simult.rs index 94a82af3..297c3a06 100644 --- a/core/tests/raw_swarm_simult.rs +++ b/core/tests/raw_swarm_simult.rs @@ -27,11 +27,11 @@ use std::{io, time::Duration, time::Instant}; use tokio_timer::Delay; // TODO: replace with DummyProtocolsHandler after https://github.com/servo/rust-smallvec/issues/139 ? -struct TestHandler(std::marker::PhantomData, bool); +struct TestHandler(std::marker::PhantomData); impl Default for TestHandler { fn default() -> Self { - TestHandler(std::marker::PhantomData, false) + TestHandler(std::marker::PhantomData) } } @@ -70,18 +70,10 @@ where } - fn inject_inbound_closed(&mut self) {} - fn connection_keep_alive(&self) -> KeepAlive { KeepAlive::Now } - fn shutdown(&mut self) { self.1 = true; } - fn poll(&mut self) -> Poll, Self::Error> { - if self.1 { - Ok(Async::Ready(ProtocolsHandlerEvent::Shutdown)) - } else { - Ok(Async::NotReady) - } + Ok(Async::NotReady) } } diff --git a/muxers/mplex/src/lib.rs b/muxers/mplex/src/lib.rs index a882aa34..e5bbaa97 100644 --- a/muxers/mplex/src/lib.rs +++ b/muxers/mplex/src/lib.rs @@ -27,7 +27,6 @@ use bytes::Bytes; use libp2p_core::{ Endpoint, StreamMuxer, - muxing::Shutdown, upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo} }; use log::{debug, trace}; @@ -247,8 +246,8 @@ task_local!{ /// Processes elements in `inner` until one matching `filter` is found. /// /// If `NotReady` is returned, the current task is scheduled for later, just like with any `Poll`. -/// `Ready(Some())` is almost always returned. `Ready(None)` is returned if the stream is EOF. -fn next_match(inner: &mut MultiplexInner, mut filter: F) -> Poll, IoError> +/// `Ready(Some())` is almost always returned. An error is returned if the stream is EOF. +fn next_match(inner: &mut MultiplexInner, mut filter: F) -> Poll where C: AsyncRead + AsyncWrite, F: FnMut(&codec::Elem) -> Option, { @@ -264,7 +263,7 @@ where C: AsyncRead + AsyncWrite, } inner.buffer.remove(offset); - return Ok(Async::Ready(Some(out))); + return Ok(Async::Ready(out)); } loop { @@ -286,7 +285,7 @@ where C: AsyncRead + AsyncWrite, let elem = match inner.inner.poll_stream_notify(&inner.notifier_read, 0) { Ok(Async::Ready(Some(item))) => item, - Ok(Async::Ready(None)) => return Ok(Async::Ready(None)), + Ok(Async::Ready(None)) => return Err(IoErrorKind::BrokenPipe.into()), Ok(Async::NotReady) => { inner.notifier_read.to_notify.lock().insert(TASK_ID.with(|&t| t), task::current()); return Ok(Async::NotReady); @@ -315,7 +314,7 @@ where C: AsyncRead + AsyncWrite, } if let Some(out) = filter(&elem) { - return Ok(Async::Ready(Some(out))); + return Ok(Async::Ready(out)); } else { let endpoint = elem.endpoint().unwrap_or(Endpoint::Dialer); if inner.opened_substreams.contains(&(elem.substream_id(), !endpoint)) || elem.is_open_msg() { @@ -352,7 +351,7 @@ where C: AsyncRead + AsyncWrite type Substream = Substream; type OutboundSubstream = OutboundSubstream; - fn poll_inbound(&self) -> Poll, IoError> { + fn poll_inbound(&self) -> Poll { let mut inner = self.inner.lock(); if inner.opened_substreams.len() >= inner.config.max_substreams { @@ -368,16 +367,12 @@ where C: AsyncRead + AsyncWrite } })); - if let Some(num) = num { - debug!("Successfully opened inbound substream {}", num); - Ok(Async::Ready(Some(Substream { - current_data: Bytes::new(), - num, - endpoint: Endpoint::Listener, - }))) - } else { - Ok(Async::Ready(None)) - } + debug!("Successfully opened inbound substream {}", num); + Ok(Async::Ready(Substream { + current_data: Bytes::new(), + num, + endpoint: Endpoint::Listener, + })) } fn open_outbound(&self) -> Self::OutboundSubstream { @@ -399,7 +394,7 @@ where C: AsyncRead + AsyncWrite } } - fn poll_outbound(&self, substream: &mut Self::OutboundSubstream) -> Poll, IoError> { + fn poll_outbound(&self, substream: &mut Self::OutboundSubstream) -> Poll { loop { let mut inner = self.inner.lock(); @@ -444,11 +439,11 @@ where C: AsyncRead + AsyncWrite OutboundSubstreamState::Flush => { debug!("Successfully opened outbound substream {}", substream.num); substream.state = OutboundSubstreamState::Done; - return Ok(Async::Ready(Some(Substream { + return Ok(Async::Ready(Substream { num: substream.num, current_data: Bytes::new(), endpoint: Endpoint::Dialer, - }))); + })); }, OutboundSubstreamState::Done => unreachable!(), } @@ -485,8 +480,7 @@ where C: AsyncRead + AsyncWrite // We're in a loop, so all we need to do is set `substream.current_data` to the data we // just read and wait for the next iteration. match next_data_poll { - Ok(Async::Ready(Some(data))) => substream.current_data = data, - Ok(Async::Ready(None)) => return Ok(Async::Ready(0)), + Ok(Async::Ready(data)) => substream.current_data = data, Ok(Async::NotReady) => { // There was no data packet in the buffer about this substream; maybe it's // because it has been closed. @@ -534,7 +528,7 @@ where C: AsyncRead + AsyncWrite } } - fn shutdown_substream(&self, sub: &mut Self::Substream, _: Shutdown) -> Poll<(), IoError> { + fn shutdown_substream(&self, sub: &mut Self::Substream) -> Poll<(), IoError> { let elem = codec::Elem::Close { substream_id: sub.num, endpoint: sub.endpoint, @@ -555,7 +549,7 @@ where C: AsyncRead + AsyncWrite } #[inline] - fn shutdown(&self, _: Shutdown) -> Poll<(), IoError> { + fn close(&self) -> Poll<(), IoError> { let inner = &mut *self.inner.lock(); try_ready!(inner.inner.close_notify(&inner.notifier_write, 0)); inner.is_shutdown = true; diff --git a/muxers/mplex/tests/two_peers.rs b/muxers/mplex/tests/two_peers.rs index 18d5030b..4f82e71b 100644 --- a/muxers/mplex/tests/two_peers.rs +++ b/muxers/mplex/tests/two_peers.rs @@ -49,7 +49,7 @@ fn client_to_server_outbound() { .and_then(|(client, _)| client.unwrap().0) .map_err(|err| panic!("{:?}", err)) .and_then(|client| muxing::outbound_from_ref_and_wrap(Arc::new(client))) - .map(|client| Builder::new().new_read(client.unwrap())) + .map(|client| Builder::new().new_read(client)) .and_then(|client| { client .into_future() @@ -73,7 +73,7 @@ fn client_to_server_outbound() { .unwrap() .map_err(|err| panic!("{:?}", err)) .and_then(|client| muxing::inbound_from_ref_and_wrap(Arc::new(client))) - .map(|server| Builder::new().new_write(server.unwrap())) + .map(|server| Builder::new().new_write(server)) .and_then(|server| server.send("hello world".into())) .map(|_| ()); @@ -103,7 +103,7 @@ fn client_to_server_inbound() { .and_then(|(client, _)| client.unwrap().0) .map_err(|err| panic!("{:?}", err)) .and_then(|client| muxing::inbound_from_ref_and_wrap(Arc::new(client))) - .map(|client| Builder::new().new_read(client.unwrap())) + .map(|client| Builder::new().new_read(client)) .and_then(|client| { client .into_future() @@ -127,7 +127,7 @@ fn client_to_server_inbound() { .unwrap() .map_err(|err| panic!("{:?}", err)) .and_then(|client| muxing::outbound_from_ref_and_wrap(Arc::new(client))) - .map(|server| Builder::new().new_write(server.unwrap())) + .map(|server| Builder::new().new_write(server)) .and_then(|server| server.send("hello world".into())) .map(|_| ()); diff --git a/muxers/yamux/Cargo.toml b/muxers/yamux/Cargo.toml index 13ea7391..4b3136ea 100644 --- a/muxers/yamux/Cargo.toml +++ b/muxers/yamux/Cargo.toml @@ -14,4 +14,4 @@ futures = "0.1" libp2p-core = { version = "0.4.0", path = "../../core" } log = "0.4" tokio-io = "0.1" -yamux = "0.1.1" +yamux = "0.1.9" diff --git a/muxers/yamux/src/lib.rs b/muxers/yamux/src/lib.rs index 68cda050..18746edc 100644 --- a/muxers/yamux/src/lib.rs +++ b/muxers/yamux/src/lib.rs @@ -22,7 +22,7 @@ //! [specification](https://github.com/hashicorp/yamux/blob/master/spec.md). use futures::{future::{self, FutureResult}, prelude::*}; -use libp2p_core::{muxing::Shutdown, upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}}; +use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use log::debug; use std::{io, iter, sync::atomic}; use std::io::{Error as IoError}; @@ -35,7 +35,8 @@ impl Yamux where C: AsyncRead + AsyncWrite + 'static { - pub fn new(c: C, cfg: yamux::Config, mode: yamux::Mode) -> Self { + pub fn new(c: C, mut cfg: yamux::Config, mode: yamux::Mode) -> Self { + cfg.set_read_after_close(false); Yamux(yamux::Connection::new(c, cfg, mode), atomic::AtomicBool::new(false)) } } @@ -48,17 +49,17 @@ where type OutboundSubstream = FutureResult, io::Error>; #[inline] - fn poll_inbound(&self) -> Poll, IoError> { + fn poll_inbound(&self) -> Poll { match self.0.poll() { Err(e) => { debug!("connection error: {}", e); Err(io::Error::new(io::ErrorKind::Other, e)) } Ok(Async::NotReady) => Ok(Async::NotReady), - Ok(Async::Ready(None)) => Ok(Async::Ready(None)), + Ok(Async::Ready(None)) => Err(io::ErrorKind::BrokenPipe.into()), Ok(Async::Ready(Some(stream))) => { self.1.store(true, atomic::Ordering::Release); - Ok(Async::Ready(Some(stream))) + Ok(Async::Ready(stream)) } } } @@ -70,8 +71,12 @@ where } #[inline] - fn poll_outbound(&self, substream: &mut Self::OutboundSubstream) -> Poll, IoError> { - substream.poll() + fn poll_outbound(&self, substream: &mut Self::OutboundSubstream) -> Poll { + match substream.poll()? { + Async::Ready(Some(s)) => Ok(Async::Ready(s)), + Async::Ready(None) => Err(io::ErrorKind::BrokenPipe.into()), + Async::NotReady => Ok(Async::NotReady), + } } #[inline] @@ -98,7 +103,7 @@ where } #[inline] - fn shutdown_substream(&self, sub: &mut Self::Substream, _: Shutdown) -> Poll<(), IoError> { + fn shutdown_substream(&self, sub: &mut Self::Substream) -> Poll<(), IoError> { sub.shutdown() } @@ -112,7 +117,7 @@ where } #[inline] - fn shutdown(&self, _: Shutdown) -> Poll<(), IoError> { + fn close(&self) -> Poll<(), IoError> { self.0.close() } diff --git a/protocols/identify/src/id_transport.rs b/protocols/identify/src/id_transport.rs index 7965758b..be39d3d2 100644 --- a/protocols/identify/src/id_transport.rs +++ b/protocols/identify/src/id_transport.rs @@ -27,7 +27,7 @@ use libp2p_core::{ transport::{TransportError, upgrade::TransportUpgradeError}, upgrade::{self, OutboundUpgradeApply, UpgradeError} }; -use std::io::{Error as IoError, ErrorKind as IoErrorKind}; +use std::io::Error as IoError; use std::mem; use std::sync::Arc; @@ -149,13 +149,10 @@ where TMuxer: muxing::StreamMuxer + Send + Sync + 'static, match mem::replace(&mut self.state, IdRetrieverState::Poisoned) { IdRetrieverState::OpeningSubstream(muxer, mut opening, config) => { match opening.poll() { - Ok(Async::Ready(Some(substream))) => { + Ok(Async::Ready(substream)) => { let upgrade = upgrade::apply_outbound(substream, config); self.state = IdRetrieverState::NegotiatingIdentify(muxer, upgrade) }, - Ok(Async::Ready(None)) => { - return Err(UpgradeError::Apply(IoError::new(IoErrorKind::Other, "remote refused our identify attempt"))) - } Ok(Async::NotReady) => { self.state = IdRetrieverState::OpeningSubstream(muxer, opening, config); return Ok(Async::NotReady); diff --git a/protocols/identify/src/listen_handler.rs b/protocols/identify/src/listen_handler.rs index e30ba0bd..eebaa793 100644 --- a/protocols/identify/src/listen_handler.rs +++ b/protocols/identify/src/listen_handler.rs @@ -35,9 +35,6 @@ pub struct IdentifyListenHandler { /// List of senders to yield to the user. pending_result: SmallVec<[IdentifySender; 4]>, - - /// True if `shutdown` has been called. - shutdown: bool, } impl IdentifyListenHandler { @@ -47,7 +44,6 @@ impl IdentifyListenHandler { IdentifyListenHandler { config: IdentifyProtocolConfig, pending_result: SmallVec::new(), - shutdown: false, } } } @@ -83,9 +79,6 @@ where #[inline] fn inject_event(&mut self, _: Self::InEvent) {} - #[inline] - fn inject_inbound_closed(&mut self) {} - #[inline] fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, _: ProtocolsHandlerUpgrErr<>::Error>) {} @@ -94,11 +87,6 @@ where KeepAlive::Now } - #[inline] - fn shutdown(&mut self) { - self.shutdown = true; - } - fn poll( &mut self, ) -> Poll< @@ -115,10 +103,6 @@ where ))); } - if self.shutdown { - Ok(Async::Ready(ProtocolsHandlerEvent::Shutdown)) - } else { - Ok(Async::NotReady) - } + Ok(Async::NotReady) } } diff --git a/protocols/identify/src/periodic_id_handler.rs b/protocols/identify/src/periodic_id_handler.rs index 9dd8cccc..35e12b0e 100644 --- a/protocols/identify/src/periodic_id_handler.rs +++ b/protocols/identify/src/periodic_id_handler.rs @@ -45,9 +45,8 @@ pub struct PeriodicIdHandler { /// it the next time `poll()` is invoked. pending_result: Option, - /// Future that fires when we need to identify the node again. If `None`, means that we should - /// shut down. - next_id: Option, + /// Future that fires when we need to identify the node again. + next_id: Delay, /// If `true`, we have started an identification of the remote at least once in the past. first_id_happened: bool, @@ -72,7 +71,7 @@ impl PeriodicIdHandler { PeriodicIdHandler { config: IdentifyProtocolConfig, pending_result: None, - next_id: Some(Delay::new(Instant::now() + DELAY_TO_FIRST_ID)), + next_id: Delay::new(Instant::now() + DELAY_TO_FIRST_ID), first_id_happened: false, marker: PhantomData, } @@ -112,16 +111,11 @@ where #[inline] fn inject_event(&mut self, _: Self::InEvent) {} - #[inline] - fn inject_inbound_closed(&mut self) {} - #[inline] fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, err: ProtocolsHandlerUpgrErr<>::Error>) { self.pending_result = Some(PeriodicIdHandlerEvent::IdentificationError(err)); self.first_id_happened = true; - if let Some(ref mut next_id) = self.next_id { - next_id.reset(Instant::now() + TRY_AGAIN_ON_ERR); - } + self.next_id.reset(Instant::now() + TRY_AGAIN_ON_ERR); } #[inline] @@ -133,11 +127,6 @@ where } } - #[inline] - fn shutdown(&mut self) { - self.next_id = None; - } - fn poll( &mut self, ) -> Poll< @@ -154,16 +143,11 @@ where ))); } - let next_id = match self.next_id { - Some(ref mut nid) => nid, - None => return Ok(Async::Ready(ProtocolsHandlerEvent::Shutdown)), - }; - // Poll the future that fires when we need to identify the node again. - match next_id.poll()? { + match self.next_id.poll()? { Async::NotReady => Ok(Async::NotReady), Async::Ready(()) => { - next_id.reset(Instant::now() + DELAY_TO_NEXT_ID); + self.next_id.reset(Instant::now() + DELAY_TO_NEXT_ID); let upgrade = self.config.clone(); let ev = ProtocolsHandlerEvent::OutboundSubstreamRequest { upgrade, info: () }; Ok(Async::Ready(ev)) diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index 24a67f85..00f66a53 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -42,10 +42,6 @@ where /// Configuration for the Kademlia protocol. config: KademliaProtocolConfig, - /// If true, we are trying to shut down the existing Kademlia substream and should refuse any - /// incoming connection. - shutting_down: bool, - /// If false, we always refuse incoming Kademlia substreams. allow_listening: bool, @@ -321,7 +317,6 @@ where fn with_allow_listening(allow_listening: bool) -> Self { KademliaHandler { config: Default::default(), - shutting_down: false, allow_listening, next_connec_unique_id: UniqueConnecId(0), substreams: Vec::new(), @@ -368,10 +363,6 @@ where protocol: >::Output, (msg, user_data): Self::OutboundOpenInfo, ) { - if self.shutting_down { - return; - } - self.substreams .push(SubstreamState::OutPendingSend(protocol, msg, user_data)); } @@ -387,10 +378,6 @@ where EitherOutput::Second(p) => void::unreachable(p), }; - if self.shutting_down { - return; - } - debug_assert!(self.allow_listening); let connec_unique_id = self.next_connec_unique_id; self.next_connec_unique_id.0 += 1; @@ -476,9 +463,6 @@ where } } - #[inline] - fn inject_inbound_closed(&mut self) {} - #[inline] fn inject_dial_upgrade_error( &mut self, @@ -498,33 +482,12 @@ where self.keep_alive } - #[inline] - fn shutdown(&mut self) { - self.shutting_down = true; - } - fn poll( &mut self, ) -> Poll< ProtocolsHandlerEvent, io::Error, > { - // Special case if shutting down. - if self.shutting_down { - for n in (0..self.substreams.len()).rev() { - match self.substreams.swap_remove(n).try_close() { - AsyncSink::Ready => (), - AsyncSink::NotReady(stream) => self.substreams.push(stream), - } - } - - if self.substreams.is_empty() { - return Ok(Async::Ready(ProtocolsHandlerEvent::Shutdown)); - } else { - return Ok(Async::NotReady); - } - } - // We remove each element from `substreams` one by one and add them back. for n in (0..self.substreams.len()).rev() { let mut substream = self.substreams.swap_remove(n); diff --git a/protocols/ping/src/handler.rs b/protocols/ping/src/handler.rs index 3b8ffbc5..87949605 100644 --- a/protocols/ping/src/handler.rs +++ b/protocols/ping/src/handler.rs @@ -95,10 +95,6 @@ where void::unreachable(event) } - fn inject_inbound_closed(&mut self) { - self.inner.inject_inbound_closed() - } - fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr) { self.inner.inject_dial_upgrade_error(info, error) } @@ -107,10 +103,6 @@ where self.inner.connection_keep_alive() } - fn shutdown(&mut self) { - self.inner.shutdown(); - } - fn poll( &mut self, ) -> Poll<