diff --git a/core/src/upgrade/mod.rs b/core/src/upgrade/mod.rs index 702e7f22..50032bd3 100644 --- a/core/src/upgrade/mod.rs +++ b/core/src/upgrade/mod.rs @@ -74,7 +74,7 @@ pub use self::{ error::UpgradeError, map::{MapInboundUpgrade, MapOutboundUpgrade, MapInboundUpgradeErr, MapOutboundUpgradeErr}, select::SelectUpgrade, - transfer::{write_one, WriteOne, read_one, ReadOne, read_one_then, ReadOneThen, ReadOneError, request_response, RequestResponse}, + transfer::{write_one, WriteOne, read_one, ReadOne, read_one_then, ReadOneThen, ReadOneError, request_response, RequestResponse, read_respond, ReadRespond}, }; /// Types serving as protocol names. diff --git a/core/src/upgrade/transfer.rs b/core/src/upgrade/transfer.rs index 1134c5cb..20a2d7f4 100644 --- a/core/src/upgrade/transfer.rs +++ b/core/src/upgrade/transfer.rs @@ -167,9 +167,21 @@ where type Item = Vec; type Error = ReadOneError; + fn poll(&mut self) -> Poll { + Ok(self.inner.poll()?.map(|(_, out)| out)) + } +} + +impl Future for ReadOneInner +where + TSocket: AsyncRead, +{ + type Item = (TSocket, Vec); + type Error = ReadOneError; + fn poll(&mut self) -> Poll { loop { - match mem::replace(&mut self.inner, ReadOneInner::Poisoned) { + match mem::replace(self, ReadOneInner::Poisoned) { ReadOneInner::ReadLen { mut socket, mut len_buf, @@ -203,10 +215,9 @@ where data_buf[.. n].copy_from_slice(&data_start[.. n]); let mut data_buf = io::Window::new(data_buf); data_buf.set_start(data_start.len()); - self.inner = - ReadOneInner::ReadRest(io::read_exact(socket, data_buf)); + *self = ReadOneInner::ReadRest(io::read_exact(socket, data_buf)); } else { - self.inner = ReadOneInner::ReadLen { + *self = ReadOneInner::ReadLen { socket, len_buf, max_size, @@ -214,7 +225,7 @@ where } } Async::NotReady => { - self.inner = ReadOneInner::ReadLen { + *self = ReadOneInner::ReadLen { socket, len_buf, max_size, @@ -225,11 +236,11 @@ where } ReadOneInner::ReadRest(mut inner) => { match inner.poll()? { - Async::Ready((_, data)) => { - return Ok(Async::Ready(data.into_inner())); + Async::Ready((socket, data)) => { + return Ok(Async::Ready((socket, data.into_inner()))); } Async::NotReady => { - self.inner = ReadOneInner::ReadRest(inner); + *self = ReadOneInner::ReadRest(inner); return Ok(Async::NotReady); } } @@ -280,33 +291,40 @@ impl error::Error for ReadOneError { } /// Similar to `read_one`, but applies a transformation on the output buffer. +/// +/// > **Note**: The `param` parameter is an arbitrary value that will be passed back to `then`. +/// > This parameter is normally not necessary, as we could just pass a closure that has +/// > ownership of any data we want. In practice, though, this would make the +/// > `ReadRespond` type impossible to express as a concrete type. Once the `impl Trait` +/// > syntax is allowed within traits, we can remove this parameter. #[inline] -pub fn read_one_then( +pub fn read_one_then( socket: TSocket, max_size: usize, + param: TParam, then: TThen, -) -> ReadOneThen +) -> ReadOneThen where TSocket: AsyncRead, - TThen: FnOnce(Vec) -> Result, + TThen: FnOnce(Vec, TParam) -> Result, TErr: From, { ReadOneThen { inner: read_one(socket, max_size), - then: Some(then), + then: Some((param, then)), } } /// Future that makes `read_one_then` work. -pub struct ReadOneThen { +pub struct ReadOneThen { inner: ReadOne, - then: Option, + then: Option<(TParam, TThen)>, } -impl Future for ReadOneThen +impl Future for ReadOneThen where TSocket: AsyncRead, - TThen: FnOnce(Vec) -> Result, + TThen: FnOnce(Vec, TParam) -> Result, TErr: From, { type Item = TOut; @@ -315,8 +333,60 @@ where fn poll(&mut self) -> Poll { match self.inner.poll()? { Async::Ready(buffer) => { - let then = self.then.take().expect("Future was polled after it was finished"); - Ok(Async::Ready(then(buffer)?)) + let (param, then) = self.then.take() + .expect("Future was polled after it was finished"); + Ok(Async::Ready(then(buffer, param)?)) + }, + Async::NotReady => Ok(Async::NotReady), + } + } +} + +/// Similar to `read_one`, but applies a transformation on the output buffer. +/// +/// > **Note**: The `param` parameter is an arbitrary value that will be passed back to `then`. +/// > This parameter is normally not necessary, as we could just pass a closure that has +/// > ownership of any data we want. In practice, though, this would make the +/// > `ReadRespond` type impossible to express as a concrete type. Once the `impl Trait` +/// > syntax is allowed within traits, we can remove this parameter. +#[inline] +pub fn read_respond( + socket: TSocket, + max_size: usize, + param: TParam, + then: TThen, +) -> ReadRespond +where + TSocket: AsyncRead, + TThen: FnOnce(TSocket, Vec, TParam) -> Result, + TErr: From, +{ + ReadRespond { + inner: read_one(socket, max_size).inner, + then: Some((then, param)), + } +} + +/// Future that makes `read_respond` work. +pub struct ReadRespond { + inner: ReadOneInner, + then: Option<(TThen, TParam)>, +} + +impl Future for ReadRespond +where + TSocket: AsyncRead, + TThen: FnOnce(TSocket, Vec, TParam) -> Result, + TErr: From, +{ + type Item = TOut; + type Error = TErr; + + fn poll(&mut self) -> Poll { + match self.inner.poll()? { + Async::Ready((socket, buffer)) => { + let (then, param) = self.then.take().expect("Future was polled after it was finished"); + Ok(Async::Ready(then(socket, buffer, param)?)) }, Async::NotReady => Ok(Async::NotReady), } @@ -325,43 +395,50 @@ where /// Send a message to the given socket, then shuts down the writing side, then reads an answer. /// -/// This combines `write_one` followed with `read_one`. +/// This combines `write_one` followed with `read_one_then`. +/// +/// > **Note**: The `param` parameter is an arbitrary value that will be passed back to `then`. +/// > This parameter is normally not necessary, as we could just pass a closure that has +/// > ownership of any data we want. In practice, though, this would make the +/// > `ReadRespond` type impossible to express as a concrete type. Once the `impl Trait` +/// > syntax is allowed within traits, we can remove this parameter. #[inline] -pub fn request_response( +pub fn request_response( socket: TSocket, data: TData, max_size: usize, + param: TParam, then: TThen, -) -> RequestResponse +) -> RequestResponse where TSocket: AsyncRead + AsyncWrite, TData: AsRef<[u8]>, - TThen: FnOnce(Vec) -> Result, + TThen: FnOnce(Vec, TParam) -> Result, { RequestResponse { - inner: RequestResponseInner::Write(write_one(socket, data).inner, max_size, then), + inner: RequestResponseInner::Write(write_one(socket, data).inner, max_size, param, then), } } /// Future that makes `request_response` work. -pub struct RequestResponse> { - inner: RequestResponseInner, +pub struct RequestResponse> { + inner: RequestResponseInner, } -enum RequestResponseInner { +enum RequestResponseInner { // We need to write data to the socket. - Write(WriteOneInner, usize, TThen), + Write(WriteOneInner, usize, TParam, TThen), // We need to read the message. - Read(ReadOneThen), + Read(ReadOneThen), // An error happened during the processing. Poisoned, } -impl Future for RequestResponse +impl Future for RequestResponse where TSocket: AsyncRead + AsyncWrite, TData: AsRef<[u8]>, - TThen: FnOnce(Vec) -> Result, + TThen: FnOnce(Vec, TParam) -> Result, TErr: From, { type Item = TOut; @@ -370,14 +447,14 @@ where fn poll(&mut self) -> Poll { loop { match mem::replace(&mut self.inner, RequestResponseInner::Poisoned) { - RequestResponseInner::Write(mut inner, max_size, then) => { + RequestResponseInner::Write(mut inner, max_size, param, then) => { match inner.poll().map_err(ReadOneError::Io)? { Async::Ready(socket) => { self.inner = - RequestResponseInner::Read(read_one_then(socket, max_size, then)); + RequestResponseInner::Read(read_one_then(socket, max_size, param, then)); } Async::NotReady => { - self.inner = RequestResponseInner::Write(inner, max_size, then); + self.inner = RequestResponseInner::Write(inner, max_size, param, then); return Ok(Async::NotReady); } } @@ -428,7 +505,7 @@ mod tests { let mut in_buffer = len_buf.to_vec(); in_buffer.extend_from_slice(&original_data); - let future = read_one_then(Cursor::new(in_buffer), 10_000, move |out| -> Result<_, ReadOneError> { + let future = read_one_then(Cursor::new(in_buffer), 10_000, (), move |out, ()| -> Result<_, ReadOneError> { assert_eq!(out, original_data); Ok(()) }); @@ -438,7 +515,7 @@ mod tests { #[test] fn read_one_zero_len() { - let future = read_one_then(Cursor::new(vec![0]), 10_000, move |out| -> Result<_, ReadOneError> { + let future = read_one_then(Cursor::new(vec![0]), 10_000, (), move |out, ()| -> Result<_, ReadOneError> { assert!(out.is_empty()); Ok(()) }); @@ -454,7 +531,7 @@ mod tests { let mut in_buffer = len_buf.to_vec(); in_buffer.extend((0..5000).map(|_| 0)); - let future = read_one_then(Cursor::new(in_buffer), 100, move |_| -> Result<_, ReadOneError> { + let future = read_one_then(Cursor::new(in_buffer), 100, (), move |_, ()| -> Result<_, ReadOneError> { Ok(()) }); diff --git a/protocols/floodsub/src/protocol.rs b/protocols/floodsub/src/protocol.rs index 0f8727af..5c72794b 100644 --- a/protocols/floodsub/src/protocol.rs +++ b/protocols/floodsub/src/protocol.rs @@ -53,11 +53,11 @@ where { type Output = FloodsubRpc; type Error = FloodsubDecodeError; - type Future = upgrade::ReadOneThen) -> Result>; + type Future = upgrade::ReadOneThen, ()) -> Result>; #[inline] fn upgrade_inbound(self, socket: TSocket, _: Self::Info) -> Self::Future { - upgrade::read_one_then(socket, 2048, |packet| { + upgrade::read_one_then(socket, 2048, (), |packet, ()| { let mut rpc: rpc_proto::RPC = protobuf::parse_from_bytes(&packet)?; let mut messages = Vec::with_capacity(rpc.get_publish().len());