From e4d43030ebe27045c3590c53c7da4f3b36c36e70 Mon Sep 17 00:00:00 2001 From: Shotaro Yamada Date: Mon, 24 Jun 2019 17:42:33 +0900 Subject: [PATCH] Make `::shutdown` imply flush (#1180) * Make `::shutdown` imply flush * Use try_ready * Apply suggestions from code review Co-Authored-By: Toralf Wittner --- core/src/muxing.rs | 25 ++++++++- muxers/mplex/tests/async_write.rs | 86 +++++++++++++++++++++++++++++++ 2 files changed, 109 insertions(+), 2 deletions(-) create mode 100644 muxers/mplex/tests/async_write.rs diff --git a/core/src/muxing.rs b/core/src/muxing.rs index 4e3c8957..28245666 100644 --- a/core/src/muxing.rs +++ b/core/src/muxing.rs @@ -338,6 +338,7 @@ where SubstreamRef { muxer, substream: Some(substream), + shutdown_state: ShutdownState::Shutdown, } } @@ -349,6 +350,13 @@ where { muxer: P, substream: Option<::Substream>, + shutdown_state: ShutdownState, +} + +enum ShutdownState { + Shutdown, + Flush, + Done, } impl

fmt::Debug for SubstreamRef

@@ -431,8 +439,21 @@ where #[inline] fn shutdown(&mut self) -> Poll<(), io::Error> { let s = self.substream.as_mut().expect("substream was empty"); - self.muxer.shutdown_substream(s).map_err(|e| e.into())?; - Ok(Async::Ready(())) + loop { + match self.shutdown_state { + ShutdownState::Shutdown => { + try_ready!(self.muxer.shutdown_substream(s).map_err(|e| e.into())); + self.shutdown_state = ShutdownState::Flush; + } + ShutdownState::Flush => { + try_ready!(self.muxer.flush_substream(s).map_err(|e| e.into())); + self.shutdown_state = ShutdownState::Done; + } + ShutdownState::Done => { + return Ok(Async::Ready(())); + } + } + } } #[inline] diff --git a/muxers/mplex/tests/async_write.rs b/muxers/mplex/tests/async_write.rs new file mode 100644 index 00000000..be1a2b5a --- /dev/null +++ b/muxers/mplex/tests/async_write.rs @@ -0,0 +1,86 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use libp2p_core::{muxing, Transport, transport::ListenerEvent}; +use libp2p_tcp::TcpConfig; +use futures::prelude::*; +use std::sync::{Arc, mpsc}; +use std::thread; +use tokio::runtime::current_thread::Runtime; + +#[test] +fn async_write() { + // Tests that `AsyncWrite::shutdown` implies flush. + + let (tx, rx) = mpsc::channel(); + + let bg_thread = thread::spawn(move || { + let transport = + TcpConfig::new().with_upgrade(libp2p_mplex::MplexConfig::new()); + + let mut listener = transport + .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()) + .unwrap(); + + let addr = listener.by_ref().wait() + .next() + .expect("some event") + .expect("no error") + .into_new_address() + .expect("listen address"); + + tx.send(addr).unwrap(); + + let future = listener + .filter_map(ListenerEvent::into_upgrade) + .into_future() + .map_err(|(err, _)| panic!("{:?}", err)) + .and_then(|(client, _)| client.unwrap().0) + .map_err(|err| panic!("{:?}", err)) + .and_then(|client| muxing::outbound_from_ref_and_wrap(Arc::new(client))) + .and_then(|client| { + tokio::io::read_to_end(client, vec![]) + }) + .and_then(|(_, msg)| { + assert_eq!(msg, b"hello world"); + Ok(()) + }); + + let mut rt = Runtime::new().unwrap(); + let _ = rt.block_on(future).unwrap(); + }); + + let transport = TcpConfig::new().with_upgrade(libp2p_mplex::MplexConfig::new()); + + let future = transport + .dial(rx.recv().unwrap()) + .unwrap() + .map_err(|err| panic!("{:?}", err)) + .and_then(|client| muxing::inbound_from_ref_and_wrap(Arc::new(client))) + .and_then(|server| tokio::io::write_all(server, b"hello world")) + .and_then(|(server, _)| { + tokio::io::shutdown(server) + }) + .map(|_| ()); + + let mut rt = Runtime::new().unwrap(); + let _ = rt.block_on(future).unwrap(); + bg_thread.join().unwrap(); +}