Simplify the mplex codec implementation (#445)

This commit is contained in:
Pierre Krieger
2018-09-03 15:41:36 +02:00
committed by GitHub
parent b5acf226f9
commit 35e005879b

View File

@ -18,7 +18,6 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use std::cmp;
use std::io::{Error as IoError, ErrorKind as IoErrorKind}; use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::mem; use std::mem;
use bytes::{BufMut, BytesMut}; use bytes::{BufMut, BytesMut};
@ -79,7 +78,7 @@ pub struct Codec {
enum CodecDecodeState { enum CodecDecodeState {
Begin, Begin,
HasHeader(u32), HasHeader(u32),
HasHeaderAndLen(u32, usize, BytesMut), HasHeaderAndLen(u32, usize),
Poisoned, Poisoned,
} }
@ -114,10 +113,11 @@ impl Decoder for Codec {
match self.varint_decoder.decode(src)? { match self.varint_decoder.decode(src)? {
Some(len) => { Some(len) => {
if len as usize > MAX_FRAME_SIZE { if len as usize > MAX_FRAME_SIZE {
return Err(IoErrorKind::InvalidData.into()); let msg = format!("Mplex frame length {} exceeds maximum", len);
return Err(IoError::new(IoErrorKind::InvalidData, msg));
} }
self.decoder_state = CodecDecodeState::HasHeaderAndLen(header, len as usize, BytesMut::with_capacity(len as usize)); self.decoder_state = CodecDecodeState::HasHeaderAndLen(header, len as usize);
}, },
None => { None => {
self.decoder_state = CodecDecodeState::HasHeader(header); self.decoder_state = CodecDecodeState::HasHeader(header);
@ -125,18 +125,15 @@ impl Decoder for Codec {
}, },
} }
}, },
CodecDecodeState::HasHeaderAndLen(header, len, mut buf) => { CodecDecodeState::HasHeaderAndLen(header, len) => {
debug_assert!(len == 0 || buf.len() < len); if src.len() < len {
let to_transfer = cmp::min(src.len(), len - buf.len()); self.decoder_state = CodecDecodeState::HasHeaderAndLen(header, len);
let to_reserve = len - src.len();
buf.put(src.split_to(to_transfer)); // TODO: more optimal? src.reserve(to_reserve);
if buf.len() < len {
self.decoder_state = CodecDecodeState::HasHeaderAndLen(header, len, buf);
return Ok(None); return Ok(None);
} }
self.decoder_state = CodecDecodeState::Begin; let buf = src.split_to(len);
let substream_id = (header >> 3) as u32; let substream_id = (header >> 3) as u32;
let out = match header & 7 { let out = match header & 7 {
0 => Elem::Open { substream_id }, 0 => Elem::Open { substream_id },
@ -146,14 +143,18 @@ impl Decoder for Codec {
4 => Elem::Close { substream_id, endpoint: Endpoint::Dialer }, 4 => Elem::Close { substream_id, endpoint: Endpoint::Dialer },
5 => Elem::Reset { substream_id, endpoint: Endpoint::Listener }, 5 => Elem::Reset { substream_id, endpoint: Endpoint::Listener },
6 => Elem::Reset { substream_id, endpoint: Endpoint::Dialer }, 6 => Elem::Reset { substream_id, endpoint: Endpoint::Dialer },
_ => return Err(IoErrorKind::InvalidData.into()), _ => {
let msg = format!("Invalid mplex header value 0x{:x}", header);
return Err(IoError::new(IoErrorKind::InvalidData, msg));
},
}; };
self.decoder_state = CodecDecodeState::Begin;
return Ok(Some(out)); return Ok(Some(out));
}, },
CodecDecodeState::Poisoned => { CodecDecodeState::Poisoned => {
return Err(IoErrorKind::InvalidData.into()); return Err(IoError::new(IoErrorKind::InvalidData, "Mplex codec poisoned"));
} }
} }
} }