diff --git a/protocols/secio/src/codec/encode.rs b/protocols/secio/src/codec/encode.rs index 86b616ec..6cd3addc 100644 --- a/protocols/secio/src/codec/encode.rs +++ b/protocols/secio/src/codec/encode.rs @@ -22,10 +22,7 @@ use bytes::BytesMut; use super::StreamCipher; -use futures::sink::Sink; -use futures::stream::Stream; -use futures::Poll; -use futures::StartSend; +use futures::prelude::*; use ring::hmac; /// Wraps around a `Sink`. Encodes the buffers passed to it and passes it to the underlying sink. @@ -39,18 +36,16 @@ pub struct EncoderMiddleware { cipher_state: StreamCipher, hmac_key: hmac::SigningKey, raw_sink: S, + pending: Option // buffer encrypted data which can not be sent right away } impl EncoderMiddleware { - pub fn new( - raw_sink: S, - cipher: StreamCipher, - hmac_key: hmac::SigningKey, - ) -> EncoderMiddleware { + pub fn new(raw: S, cipher: StreamCipher, key: hmac::SigningKey) -> EncoderMiddleware { EncoderMiddleware { cipher_state: cipher, - hmac_key, - raw_sink, + hmac_key: key, + raw_sink: raw, + pending: None } } } @@ -63,22 +58,42 @@ where type SinkError = S::SinkError; fn start_send(&mut self, mut data_buf: Self::SinkItem) -> StartSend { - // TODO if SinkError gets refactor to SecioError, - // then use try_apply_keystream + if let Some(data) = self.pending.take() { + if let AsyncSink::NotReady(data) = self.raw_sink.start_send(data)? { + self.pending = Some(data); + return Ok(AsyncSink::NotReady(data_buf)) + } + } + debug_assert!(self.pending.is_none()); + // TODO if SinkError gets refactor to SecioError, then use try_apply_keystream self.cipher_state.apply_keystream(&mut data_buf[..]); let signature = hmac::sign(&self.hmac_key, &data_buf[..]); data_buf.extend_from_slice(signature.as_ref()); - self.raw_sink.start_send(data_buf) - + if let AsyncSink::NotReady(data) = self.raw_sink.start_send(data_buf)? { + self.pending = Some(data) + } + Ok(AsyncSink::Ready) } #[inline] fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { + if let Some(data) = self.pending.take() { + if let AsyncSink::NotReady(data) = self.raw_sink.start_send(data)? { + self.pending = Some(data); + return Ok(Async::NotReady) + } + } self.raw_sink.poll_complete() } #[inline] fn close(&mut self) -> Poll<(), Self::SinkError> { + if let Some(data) = self.pending.take() { + if let AsyncSink::NotReady(data) = self.raw_sink.start_send(data)? { + self.pending = Some(data); + return Ok(Async::NotReady) + } + } self.raw_sink.close() } }