From 496100cdb56bb39a19e62caf15eb0b82ccb521e6 Mon Sep 17 00:00:00 2001 From: Vurich Date: Mon, 11 Dec 2017 16:38:59 +0100 Subject: [PATCH] MultiplexEnd->Endpoint --- multiplex-rs/src/header.rs | 44 +++++++++++++++++--------------------- multiplex-rs/src/lib.rs | 34 ++++++++++++++--------------- 2 files changed, 37 insertions(+), 41 deletions(-) diff --git a/multiplex-rs/src/header.rs b/multiplex-rs/src/header.rs index 74408c3a..e3a01de1 100644 --- a/multiplex-rs/src/header.rs +++ b/multiplex-rs/src/header.rs @@ -18,6 +18,8 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use swarm::Endpoint; + const FLAG_BITS: usize = 3; const FLAG_MASK: usize = (1usize << FLAG_BITS) - 1; @@ -29,12 +31,6 @@ pub mod errors { } } -#[derive(Copy, Clone, PartialEq, Eq, Debug)] -pub enum MultiplexEnd { - Initiator, - Receiver, -} - #[derive(Copy, Clone, PartialEq, Eq, Debug)] pub struct MultiplexHeader { pub packet_type: PacketType, @@ -44,9 +40,9 @@ pub struct MultiplexHeader { #[derive(Copy, Clone, PartialEq, Eq, Debug)] pub enum PacketType { Open, - Close(MultiplexEnd), - Reset(MultiplexEnd), - Message(MultiplexEnd), + Close(Endpoint), + Reset(Endpoint), + Message(Endpoint), } impl MultiplexHeader { @@ -57,21 +53,21 @@ impl MultiplexHeader { } } - pub fn close(id: u32, end: MultiplexEnd) -> Self { + pub fn close(id: u32, end: Endpoint) -> Self { MultiplexHeader { substream_id: id, packet_type: PacketType::Close(end), } } - pub fn reset(id: u32, end: MultiplexEnd) -> Self { + pub fn reset(id: u32, end: Endpoint) -> Self { MultiplexHeader { substream_id: id, packet_type: PacketType::Reset(end), } } - pub fn message(id: u32, end: MultiplexEnd) -> Self { + pub fn message(id: u32, end: Endpoint) -> Self { MultiplexHeader { substream_id: id, packet_type: PacketType::Message(end), @@ -96,14 +92,14 @@ impl MultiplexHeader { let packet_type = match flags { 0 => PacketType::Open, - 1 => PacketType::Message(MultiplexEnd::Receiver), - 2 => PacketType::Message(MultiplexEnd::Initiator), + 1 => PacketType::Message(Endpoint::Listener), + 2 => PacketType::Message(Endpoint::Dialer), - 3 => PacketType::Close(MultiplexEnd::Receiver), - 4 => PacketType::Close(MultiplexEnd::Initiator), + 3 => PacketType::Close(Endpoint::Listener), + 4 => PacketType::Close(Endpoint::Dialer), - 5 => PacketType::Reset(MultiplexEnd::Receiver), - 6 => PacketType::Reset(MultiplexEnd::Initiator), + 5 => PacketType::Reset(Endpoint::Listener), + 6 => PacketType::Reset(Endpoint::Dialer), _ => { use std::io; @@ -128,14 +124,14 @@ impl MultiplexHeader { let packet_type_id = match self.packet_type { PacketType::Open => 0, - PacketType::Message(MultiplexEnd::Receiver) => 1, - PacketType::Message(MultiplexEnd::Initiator) => 2, + PacketType::Message(Endpoint::Listener) => 1, + PacketType::Message(Endpoint::Dialer) => 2, - PacketType::Close(MultiplexEnd::Receiver) => 3, - PacketType::Close(MultiplexEnd::Initiator) => 4, + PacketType::Close(Endpoint::Listener) => 3, + PacketType::Close(Endpoint::Dialer) => 4, - PacketType::Reset(MultiplexEnd::Receiver) => 5, - PacketType::Reset(MultiplexEnd::Initiator) => 6, + PacketType::Reset(Endpoint::Listener) => 5, + PacketType::Reset(Endpoint::Dialer) => 6, }; let substream_id = (self.substream_id as u64) << FLAG_BITS; diff --git a/multiplex-rs/src/lib.rs b/multiplex-rs/src/lib.rs index 7e962915..2393ddee 100644 --- a/multiplex-rs/src/lib.rs +++ b/multiplex-rs/src/lib.rs @@ -39,9 +39,9 @@ mod header; use bytes::Bytes; use futures::{Async, Future, Poll}; use futures::future::{self, FutureResult}; -use header::{MultiplexEnd, MultiplexHeader}; +use header::MultiplexHeader; use swarm::muxing::StreamMuxer; -use swarm::ConnectionUpgrade; +use swarm::{ConnectionUpgrade, Endpoint}; use parking_lot::Mutex; use read::{read_stream, MultiplexReadState}; use shared::{buf_from_slice, ByteBuf, MultiplexShared}; @@ -66,7 +66,7 @@ use write::write_stream; pub struct Substream { id: u32, - end: MultiplexEnd, + end: Endpoint, name: Option, state: Arc>>, buffer: Option>, @@ -83,7 +83,7 @@ impl Drop for Substream { impl Substream { fn new>>( id: u32, - end: MultiplexEnd, + end: Endpoint, name: B, state: Arc>>, ) -> Self { @@ -158,7 +158,7 @@ impl AsyncWrite for Substream { } pub struct InboundFuture { - end: MultiplexEnd, + end: Endpoint, state: Arc>>, } @@ -216,8 +216,8 @@ impl OutboundFuture { } } -fn nonce_to_id(id: usize, end: MultiplexEnd) -> u32 { - id as u32 * 2 + if end == MultiplexEnd::Initiator { 1 } else { 0 } +fn nonce_to_id(id: usize, end: Endpoint) -> u32 { + id as u32 * 2 + if end == Endpoint::Dialer { 1 } else { 0 } } impl Future for OutboundFuture { @@ -276,7 +276,7 @@ impl Future for OutboundFuture { pub struct MultiplexMetadata { nonce: AtomicUsize, - end: MultiplexEnd, + end: Endpoint, } pub struct Multiplex { @@ -294,7 +294,7 @@ impl Clone for Multiplex { } impl Multiplex { - pub fn new(stream: T, end: MultiplexEnd) -> Self { + pub fn new(stream: T, end: Endpoint) -> Self { Multiplex { meta: Arc::new(MultiplexMetadata { nonce: AtomicUsize::new(0), @@ -305,11 +305,11 @@ impl Multiplex { } pub fn dial(stream: T) -> Self { - Self::new(stream, MultiplexEnd::Initiator) + Self::new(stream, Endpoint::Dialer) } pub fn listen(stream: T) -> Self { - Self::new(stream, MultiplexEnd::Receiver) + Self::new(stream, Endpoint::Listener) } } @@ -342,8 +342,8 @@ where type NamesIter = iter::Once<(Bytes, ())>; #[inline] - fn upgrade(self, i: C, _: ()) -> Self::Future { - future::ok(Multiplex::dial(i)) + fn upgrade(self, i: C, _: (), end: Endpoint) -> Self::Future { + future::ok(Multiplex::new(i, end)) } #[inline] @@ -466,7 +466,7 @@ mod tests { .chain( varint::encode( // ID for an unopened stream: 1 - MultiplexHeader::message(1, MultiplexEnd::Initiator).to_u64(), + MultiplexHeader::message(1, Endpoint::Dialer).to_u64(), ).into_iter(), ) // Body: `dummy_length` of zeroes @@ -477,7 +477,7 @@ mod tests { .chain( varint::encode( // ID for an opened stream: 0 - MultiplexHeader::message(0, MultiplexEnd::Initiator).to_u64(), + MultiplexHeader::message(0, Endpoint::Dialer).to_u64(), ).into_iter(), ) .chain(varint::encode(message.len())) @@ -516,7 +516,7 @@ mod tests { .chain( varint::encode( // ID for an unopened stream: 1 - MultiplexHeader::close(0, MultiplexEnd::Initiator).to_u64(), + MultiplexHeader::close(0, Endpoint::Dialer).to_u64(), ).into_iter(), ) .chain(varint::encode(dummy_length)) @@ -526,7 +526,7 @@ mod tests { .chain( varint::encode( // ID for an opened stream: 0 - MultiplexHeader::message(0, MultiplexEnd::Initiator).to_u64(), + MultiplexHeader::message(0, Endpoint::Dialer).to_u64(), ).into_iter(), ) .chain(varint::encode(dummy_length))