Make <SubstreamRef as AsyncWrite>::shutdown imply flush (#1180)

* Make `<SubstreamRef as AsyncWrite>::shutdown` imply flush

* Use try_ready

* Apply suggestions from code review

Co-Authored-By: Toralf Wittner <tw@dtex.org>
This commit is contained in:
Shotaro Yamada
2019-06-24 17:42:33 +09:00
committed by Pierre Krieger
parent 69bd0dfffb
commit e4d43030eb
2 changed files with 109 additions and 2 deletions

View File

@ -338,6 +338,7 @@ where
SubstreamRef {
muxer,
substream: Some(substream),
shutdown_state: ShutdownState::Shutdown,
}
}
@ -349,6 +350,13 @@ where
{
muxer: P,
substream: Option<<P::Target as StreamMuxer>::Substream>,
shutdown_state: ShutdownState,
}
enum ShutdownState {
Shutdown,
Flush,
Done,
}
impl<P> fmt::Debug for SubstreamRef<P>
@ -431,8 +439,21 @@ where
#[inline]
fn shutdown(&mut self) -> Poll<(), io::Error> {
let s = self.substream.as_mut().expect("substream was empty");
self.muxer.shutdown_substream(s).map_err(|e| e.into())?;
Ok(Async::Ready(()))
loop {
match self.shutdown_state {
ShutdownState::Shutdown => {
try_ready!(self.muxer.shutdown_substream(s).map_err(|e| e.into()));
self.shutdown_state = ShutdownState::Flush;
}
ShutdownState::Flush => {
try_ready!(self.muxer.flush_substream(s).map_err(|e| e.into()));
self.shutdown_state = ShutdownState::Done;
}
ShutdownState::Done => {
return Ok(Async::Ready(()));
}
}
}
}
#[inline]

View File

@ -0,0 +1,86 @@
// Copyright 2019 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use libp2p_core::{muxing, Transport, transport::ListenerEvent};
use libp2p_tcp::TcpConfig;
use futures::prelude::*;
use std::sync::{Arc, mpsc};
use std::thread;
use tokio::runtime::current_thread::Runtime;
#[test]
fn async_write() {
// Tests that `AsyncWrite::shutdown` implies flush.
let (tx, rx) = mpsc::channel();
let bg_thread = thread::spawn(move || {
let transport =
TcpConfig::new().with_upgrade(libp2p_mplex::MplexConfig::new());
let mut listener = transport
.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap())
.unwrap();
let addr = listener.by_ref().wait()
.next()
.expect("some event")
.expect("no error")
.into_new_address()
.expect("listen address");
tx.send(addr).unwrap();
let future = listener
.filter_map(ListenerEvent::into_upgrade)
.into_future()
.map_err(|(err, _)| panic!("{:?}", err))
.and_then(|(client, _)| client.unwrap().0)
.map_err(|err| panic!("{:?}", err))
.and_then(|client| muxing::outbound_from_ref_and_wrap(Arc::new(client)))
.and_then(|client| {
tokio::io::read_to_end(client, vec![])
})
.and_then(|(_, msg)| {
assert_eq!(msg, b"hello world");
Ok(())
});
let mut rt = Runtime::new().unwrap();
let _ = rt.block_on(future).unwrap();
});
let transport = TcpConfig::new().with_upgrade(libp2p_mplex::MplexConfig::new());
let future = transport
.dial(rx.recv().unwrap())
.unwrap()
.map_err(|err| panic!("{:?}", err))
.and_then(|client| muxing::inbound_from_ref_and_wrap(Arc::new(client)))
.and_then(|server| tokio::io::write_all(server, b"hello world"))
.and_then(|(server, _)| {
tokio::io::shutdown(server)
})
.map(|_| ());
let mut rt = Runtime::new().unwrap();
let _ = rt.block_on(future).unwrap();
bg_thread.join().unwrap();
}