diff --git a/muxers/mplex/src/lib.rs b/muxers/mplex/src/lib.rs index 63afc1bd..6ebf5384 100644 --- a/muxers/mplex/src/lib.rs +++ b/muxers/mplex/src/lib.rs @@ -95,7 +95,7 @@ impl MplexConfig { } #[inline] - fn upgrade(self, i: C, endpoint: Endpoint) -> Multiplex + fn upgrade(self, i: C) -> Multiplex where C: AsyncRead + AsyncWrite { @@ -107,7 +107,7 @@ impl MplexConfig { config: self, buffer: Vec::with_capacity(cmp::min(max_buffer_len, 512)), opened_substreams: Default::default(), - next_outbound_stream_id: if endpoint == Endpoint::Dialer { 0 } else { 1 }, + next_outbound_stream_id: 0, notifier_read: Arc::new(Notifier { to_notify: Mutex::new(Default::default()), }), @@ -163,7 +163,7 @@ where type Future = future::FutureResult; fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future { - future::ok(self.upgrade(socket, Endpoint::Listener)) + future::ok(self.upgrade(socket)) } } @@ -176,7 +176,7 @@ where type Future = future::FutureResult; fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future { - future::ok(self.upgrade(socket, Endpoint::Dialer)) + future::ok(self.upgrade(socket)) } } @@ -200,7 +200,7 @@ struct MultiplexInner { // The `Endpoint` value denotes who initiated the substream from our point of view // (see note [StreamId]). opened_substreams: FnvHashSet<(u32, Endpoint)>, - // Id of the next outgoing substream. Should always increase by two. + // Id of the next outgoing substream. next_outbound_stream_id: u32, /// List of tasks to notify when a read event happens on the underlying stream. notifier_read: Arc, @@ -382,7 +382,8 @@ where C: AsyncRead + AsyncWrite // Assign a substream ID now. let substream_id = { let n = inner.next_outbound_stream_id; - inner.next_outbound_stream_id += 2; + inner.next_outbound_stream_id = inner.next_outbound_stream_id.checked_add(1) + .expect("Mplex substream ID overflowed"); n };