diff --git a/core/Cargo.toml b/core/Cargo.toml index c2b85f01..4f039c35 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -48,10 +48,8 @@ libp2p-mplex = { version = "0.12.0", path = "../muxers/mplex" } libp2p-secio = { version = "0.12.0", path = "../protocols/secio" } rand = "0.6" quickcheck = "0.8" -tokio = "0.1" -wasm-timer = "0.1" +wasm-timer = "0.2" assert_matches = "1.3" -tokio-mock-task = "0.1" [features] default = ["secp256k1"] diff --git a/core/src/nodes/listeners.rs b/core/src/nodes/listeners.rs index b9c8ebbf..861f3e75 100644 --- a/core/src/nodes/listeners.rs +++ b/core/src/nodes/listeners.rs @@ -51,32 +51,30 @@ use std::{collections::VecDeque, fmt, pin::Pin}; /// listeners.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap()).unwrap(); /// /// // The `listeners` will now generate events when polled. -/// let future = listeners.for_each(move |event| { -/// match event { -/// ListenersEvent::NewAddress { listener_id, listen_addr } => { -/// println!("Listener {:?} is listening at address {}", listener_id, listen_addr); -/// }, -/// ListenersEvent::AddressExpired { listener_id, listen_addr } => { -/// println!("Listener {:?} is no longer listening at address {}", listener_id, listen_addr); -/// }, -/// ListenersEvent::Closed { listener_id, .. } => { -/// println!("Listener {:?} has been closed", listener_id); -/// }, -/// ListenersEvent::Error { listener_id, error } => { -/// println!("Listener {:?} has experienced an error: {}", listener_id, error); -/// }, -/// ListenersEvent::Incoming { listener_id, upgrade, local_addr, .. } => { -/// println!("Listener {:?} has a new connection on {}", listener_id, local_addr); -/// // We don't do anything with the newly-opened connection, but in a real-life -/// // program you probably want to use it! -/// drop(upgrade); -/// }, -/// }; -/// -/// Ok(()) -/// }); -/// -/// tokio::run(future.map_err(|_| ())); +/// futures::executor::block_on(async move { +/// while let Some(event) = listeners.next().await { +/// match event { +/// ListenersEvent::NewAddress { listener_id, listen_addr } => { +/// println!("Listener {:?} is listening at address {}", listener_id, listen_addr); +/// }, +/// ListenersEvent::AddressExpired { listener_id, listen_addr } => { +/// println!("Listener {:?} is no longer listening at address {}", listener_id, listen_addr); +/// }, +/// ListenersEvent::Closed { listener_id, .. } => { +/// println!("Listener {:?} has been closed", listener_id); +/// }, +/// ListenersEvent::Error { listener_id, error } => { +/// println!("Listener {:?} has experienced an error: {}", listener_id, error); +/// }, +/// ListenersEvent::Incoming { listener_id, upgrade, local_addr, .. } => { +/// println!("Listener {:?} has a new connection on {}", listener_id, local_addr); +/// // We don't do anything with the newly-opened connection, but in a real-life +/// // program you probably want to use it! +/// drop(upgrade); +/// }, +/// } +/// } +/// }) /// # } /// ``` pub struct ListenersStream @@ -358,7 +356,6 @@ mod tests { use super::*; use crate::transport::{self, ListenerEvent}; use assert_matches::assert_matches; - use tokio::runtime::current_thread::Runtime; use std::{io, iter::FromIterator}; use futures::{future::{self}, stream}; use crate::PeerId; diff --git a/core/src/transport/memory.rs b/core/src/transport/memory.rs index e53a1f2b..ad3312e6 100644 --- a/core/src/transport/memory.rs +++ b/core/src/transport/memory.rs @@ -19,7 +19,7 @@ // DEALINGS IN THE SOFTWARE. use crate::{Transport, transport::{TransportError, ListenerEvent}}; -use bytes::{Bytes, IntoBuf}; +use bytes::IntoBuf; use fnv::FnvHashMap; use futures::{future::{self, Ready}, prelude::*, channel::mpsc, task::Context, task::Poll}; use lazy_static::lazy_static; @@ -29,7 +29,7 @@ use rw_stream_sink::RwStreamSink; use std::{collections::hash_map::Entry, error, fmt, io, num::NonZeroU64, pin::Pin}; lazy_static! { - static ref HUB: Mutex>>> = + static ref HUB: Mutex>>>> = Mutex::new(FnvHashMap::default()); } @@ -39,13 +39,13 @@ pub struct MemoryTransport; /// Connection to a `MemoryTransport` currently being opened. pub struct DialFuture { - sender: mpsc::Sender>, - channel_to_send: Option>, - channel_to_return: Option>, + sender: mpsc::Sender>>, + channel_to_send: Option>>, + channel_to_return: Option>>, } impl Future for DialFuture { - type Output = Result, MemoryTransportError>; + type Output = Result>, MemoryTransportError>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { match self.sender.poll_ready(cx) { @@ -67,7 +67,7 @@ impl Future for DialFuture { } impl Transport for MemoryTransport { - type Output = Channel; + type Output = Channel>; type Error = MemoryTransportError; type Listener = Listener; type ListenerUpgrade = Ready>; @@ -168,13 +168,13 @@ pub struct Listener { /// The address we are listening on. addr: Multiaddr, /// Receives incoming connections. - receiver: mpsc::Receiver>, + receiver: mpsc::Receiver>>, /// Generate `ListenerEvent::NewAddress` to inform about our listen address. tell_listen_addr: bool } impl Stream for Listener { - type Item = Result, MemoryTransportError>>>, MemoryTransportError>; + type Item = Result>, MemoryTransportError>>>, MemoryTransportError>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { if self.tell_listen_addr { @@ -230,7 +230,7 @@ pub type Channel = RwStreamSink>; /// A channel represents an established, in-memory, logical connection between two endpoints. /// /// Implements `Sink` and `Stream`. -pub struct Chan { +pub struct Chan> { incoming: mpsc::Receiver, outgoing: mpsc::Sender, } diff --git a/core/tests/network_dial_error.rs b/core/tests/network_dial_error.rs index e6ff2d2b..976ec980 100644 --- a/core/tests/network_dial_error.rs +++ b/core/tests/network_dial_error.rs @@ -34,7 +34,7 @@ use libp2p_swarm::{ protocols_handler::NodeHandlerWrapperBuilder }; use rand::seq::SliceRandom; -use std::io; +use std::{io, task::Context, task::Poll}; // TODO: replace with DummyProtocolsHandler after https://github.com/servo/rust-smallvec/issues/139 ? struct TestHandler(std::marker::PhantomData); @@ -47,7 +47,7 @@ impl Default for TestHandler { impl ProtocolsHandler for TestHandler where - TSubstream: futures::PollRead + futures::PollWrite + TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static { type InEvent = (); // TODO: cannot be Void (https://github.com/servo/rust-smallvec/issues/139) type OutEvent = (); // TODO: cannot be Void (https://github.com/servo/rust-smallvec/issues/139) @@ -82,7 +82,7 @@ where fn connection_keep_alive(&self) -> KeepAlive { KeepAlive::No } - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll, Self::Error> { + fn poll(&mut self, _: &mut Context) -> Poll> { Poll::Pending } } @@ -113,26 +113,27 @@ fn deny_incoming_connec() { swarm1.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap(); - let address = - if let Poll::Ready(NetworkEvent::NewListenerAddress { listen_addr, .. }) = swarm1.poll() { - listen_addr + let address = futures::executor::block_on(future::poll_fn(|cx| { + if let Poll::Ready(NetworkEvent::NewListenerAddress { listen_addr, .. }) = swarm1.poll(cx) { + Poll::Ready(listen_addr) } else { panic!("Was expecting the listen address to be reported") - }; + } + })); swarm2 .peer(swarm1.local_peer_id().clone()) .into_not_connected().unwrap() .connect(address.clone(), TestHandler::default().into_node_handler_builder()); - let future = future::poll_fn(|| -> Poll> { - match swarm1.poll() { + futures::executor::block_on(future::poll_fn(|cx| -> Poll> { + match swarm1.poll(cx) { Poll::Ready(NetworkEvent::IncomingConnection(inc)) => drop(inc), Poll::Ready(_) => unreachable!(), Poll::Pending => (), } - match swarm2.poll() { + match swarm2.poll(cx) { Poll::Ready(NetworkEvent::DialError { new_state: PeerState::NotConnected, peer_id, @@ -148,9 +149,7 @@ fn deny_incoming_connec() { } Poll::Pending - }); - - tokio::runtime::current_thread::Runtime::new().unwrap().block_on(future).unwrap(); + })).unwrap(); } #[test] @@ -176,31 +175,30 @@ fn dial_self() { .and_then(|(peer, mplex), _| { // Gracefully close the connection to allow protocol // negotiation to complete. - util::CloseMuxer::new(mplex).map(move |mplex| (peer, mplex)) + util::CloseMuxer::new(mplex).map_ok(move |mplex| (peer, mplex)) }); Network::new(transport, local_public_key.into()) }; swarm.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap(); - let (address, mut swarm) = - future::lazy(move || { - if let Poll::Ready(NetworkEvent::NewListenerAddress { listen_addr, .. }) = swarm.poll() { + let (address, mut swarm) = futures::executor::block_on( + future::lazy(move |cx| { + if let Poll::Ready(NetworkEvent::NewListenerAddress { listen_addr, .. }) = swarm.poll(cx) { Ok::<_, void::Void>((listen_addr, swarm)) } else { panic!("Was expecting the listen address to be reported") } - }) - .wait() + })) .unwrap(); swarm.dial(address.clone(), TestHandler::default().into_node_handler_builder()).unwrap(); let mut got_dial_err = false; let mut got_inc_err = false; - let future = future::poll_fn(|| -> Poll> { + futures::executor::block_on(future::poll_fn(|cx| -> Poll> { loop { - match swarm.poll() { + match swarm.poll(cx) { Poll::Ready(NetworkEvent::UnknownPeerDialError { multiaddr, error: UnknownPeerDialErr::FoundLocalPeerId, @@ -210,7 +208,7 @@ fn dial_self() { assert!(!got_dial_err); got_dial_err = true; if got_inc_err { - return Ok(Poll::Ready(())); + return Poll::Ready(Ok(())); } }, Poll::Ready(NetworkEvent::IncomingConnectionError { @@ -222,7 +220,7 @@ fn dial_self() { assert!(!got_inc_err); got_inc_err = true; if got_dial_err { - return Ok(Poll::Ready(())); + return Poll::Ready(Ok(())); } }, Poll::Ready(NetworkEvent::IncomingConnection(inc)) => { @@ -235,9 +233,7 @@ fn dial_self() { Poll::Pending => break Poll::Pending, } } - }); - - tokio::runtime::current_thread::Runtime::new().unwrap().block_on(future).unwrap(); + })).unwrap(); } #[test] @@ -288,9 +284,9 @@ fn multiple_addresses_err() { .connect_iter(addresses.clone(), TestHandler::default().into_node_handler_builder()) .unwrap(); - let future = future::poll_fn(|| -> Poll> { + futures::executor::block_on(future::poll_fn(|cx| -> Poll> { loop { - match swarm.poll() { + match swarm.poll(cx) { Poll::Ready(NetworkEvent::DialError { new_state, peer_id, @@ -302,7 +298,7 @@ fn multiple_addresses_err() { assert_eq!(multiaddr, expected); if addresses.is_empty() { assert_eq!(new_state, PeerState::NotConnected); - return Ok(Poll::Ready(())); + return Poll::Ready(Ok(())); } else { match new_state { PeerState::Dialing { num_pending_addresses } => { @@ -316,7 +312,5 @@ fn multiple_addresses_err() { Poll::Pending => break Poll::Pending, } } - }); - - tokio::runtime::current_thread::Runtime::new().unwrap().block_on(future).unwrap(); + })).unwrap(); } diff --git a/core/tests/network_simult.rs b/core/tests/network_simult.rs index 612875db..7d7a247a 100644 --- a/core/tests/network_simult.rs +++ b/core/tests/network_simult.rs @@ -31,8 +31,8 @@ use libp2p_swarm::{ ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, }; -use std::{io, time::Duration}; -use wasm_timer::{Delay, Instant}; +use std::{io, pin::Pin, task::Context, task::Poll, time::Duration}; +use wasm_timer::Delay; // TODO: replace with DummyProtocolsHandler after https://github.com/servo/rust-smallvec/issues/139 ? struct TestHandler(std::marker::PhantomData); @@ -45,7 +45,7 @@ impl Default for TestHandler { impl ProtocolsHandler for TestHandler where - TSubstream: futures::PollRead + futures::PollWrite + TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static { type InEvent = (); // TODO: cannot be Void (https://github.com/servo/rust-smallvec/issues/139) type OutEvent = (); // TODO: cannot be Void (https://github.com/servo/rust-smallvec/issues/139) @@ -80,7 +80,7 @@ where fn connection_keep_alive(&self) -> KeepAlive { KeepAlive::Yes } - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll, Self::Error> { + fn poll(&mut self, _: &mut Context) -> Poll> { Poll::Pending } } @@ -116,7 +116,7 @@ fn raw_swarm_simultaneous_connect() { .and_then(|(peer, mplex), _| { // Gracefully close the connection to allow protocol // negotiation to complete. - util::CloseMuxer::new(mplex).map(move |mplex| (peer, mplex)) + util::CloseMuxer::new(mplex).map_ok(move |mplex| (peer, mplex)) }); Network::new(transport, local_public_key.into_peer_id()) }; @@ -131,7 +131,7 @@ fn raw_swarm_simultaneous_connect() { .and_then(|(peer, mplex), _| { // Gracefully close the connection to allow protocol // negotiation to complete. - util::CloseMuxer::new(mplex).map(move |mplex| (peer, mplex)) + util::CloseMuxer::new(mplex).map_ok(move |mplex| (peer, mplex)) }); Network::new(transport, local_public_key.into_peer_id()) }; @@ -139,17 +139,17 @@ fn raw_swarm_simultaneous_connect() { swarm1.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap(); swarm2.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap(); - let (swarm1_listen_addr, swarm2_listen_addr, mut swarm1, mut swarm2) = - future::lazy(move || { + let (swarm1_listen_addr, swarm2_listen_addr, mut swarm1, mut swarm2) = futures::executor::block_on( + future::lazy(move |cx| { let swarm1_listen_addr = - if let Poll::Ready(NetworkEvent::NewListenerAddress { listen_addr, .. }) = swarm1.poll() { + if let Poll::Ready(NetworkEvent::NewListenerAddress { listen_addr, .. }) = swarm1.poll(cx) { listen_addr } else { panic!("Was expecting the listen address to be reported") }; let swarm2_listen_addr = - if let Poll::Ready(NetworkEvent::NewListenerAddress { listen_addr, .. }) = swarm2.poll() { + if let Poll::Ready(NetworkEvent::NewListenerAddress { listen_addr, .. }) = swarm2.poll(cx) { listen_addr } else { panic!("Was expecting the listen address to be reported") @@ -157,19 +157,16 @@ fn raw_swarm_simultaneous_connect() { Ok::<_, void::Void>((swarm1_listen_addr, swarm2_listen_addr, swarm1, swarm2)) }) - .wait() - .unwrap(); - - let mut reactor = tokio::runtime::current_thread::Runtime::new().unwrap(); + ).unwrap(); loop { let mut swarm1_step = 0; let mut swarm2_step = 0; - let mut swarm1_dial_start = Delay::new(Instant::now() + Duration::new(0, rand::random::() % 50_000_000)); - let mut swarm2_dial_start = Delay::new(Instant::now() + Duration::new(0, rand::random::() % 50_000_000)); + let mut swarm1_dial_start = Delay::new(Duration::new(0, rand::random::() % 50_000_000)); + let mut swarm2_dial_start = Delay::new(Duration::new(0, rand::random::() % 50_000_000)); - let future = future::poll_fn(|| -> Poll { + let future = future::poll_fn(|cx| -> Poll { loop { let mut swarm1_not_ready = false; let mut swarm2_not_ready = false; @@ -178,7 +175,7 @@ fn raw_swarm_simultaneous_connect() { // handle other nodes, which may delay the processing. if swarm1_step == 0 { - match swarm1_dial_start.poll().unwrap() { + match Future::poll(Pin::new(&mut swarm1_dial_start), cx) { Poll::Ready(_) => { let handler = TestHandler::default().into_node_handler_builder(); swarm1.peer(swarm2.local_peer_id().clone()) @@ -192,7 +189,7 @@ fn raw_swarm_simultaneous_connect() { } if swarm2_step == 0 { - match swarm2_dial_start.poll().unwrap() { + match Future::poll(Pin::new(&mut swarm2_dial_start), cx) { Poll::Ready(_) => { let handler = TestHandler::default().into_node_handler_builder(); swarm2.peer(swarm1.local_peer_id().clone()) @@ -206,7 +203,7 @@ fn raw_swarm_simultaneous_connect() { } if rand::random::() < 0.1 { - match swarm1.poll() { + match swarm1.poll(cx) { Poll::Ready(NetworkEvent::IncomingConnectionError { error: IncomingError::DeniedLowerPriority, .. }) => { @@ -218,7 +215,7 @@ fn raw_swarm_simultaneous_connect() { if swarm1_step == 0 { // The connection was established before // swarm1 started dialing; discard the test run. - return Ok(Poll::Ready(false)) + return Poll::Ready(false) } assert_eq!(swarm1_step, 1); swarm1_step = 2; @@ -237,7 +234,7 @@ fn raw_swarm_simultaneous_connect() { } if rand::random::() < 0.1 { - match swarm2.poll() { + match swarm2.poll(cx) { Poll::Ready(NetworkEvent::IncomingConnectionError { error: IncomingError::DeniedLowerPriority, .. }) => { @@ -249,7 +246,7 @@ fn raw_swarm_simultaneous_connect() { if swarm2_step == 0 { // The connection was established before // swarm2 started dialing; discard the test run. - return Ok(Poll::Ready(false)) + return Poll::Ready(false) } assert_eq!(swarm2_step, 1); swarm2_step = 2; @@ -269,7 +266,7 @@ fn raw_swarm_simultaneous_connect() { // TODO: make sure that >= 5 is correct if swarm1_step + swarm2_step >= 5 { - return Ok(Poll::Ready(true)); + return Poll::Ready(true); } if swarm1_not_ready && swarm2_not_ready { @@ -278,7 +275,7 @@ fn raw_swarm_simultaneous_connect() { } }); - if reactor.block_on(future).unwrap() { + if futures::executor::block_on(future) { // The test exercised what we wanted to exercise: a simultaneous connect. break } else { diff --git a/core/tests/transport_upgrade.rs b/core/tests/transport_upgrade.rs index 8ac012da..bee0c8d7 100644 --- a/core/tests/transport_upgrade.rs +++ b/core/tests/transport_upgrade.rs @@ -22,13 +22,13 @@ mod util; use futures::prelude::*; use libp2p_core::identity; -use libp2p_core::transport::{Transport, MemoryTransport, ListenerEvent}; +use libp2p_core::transport::{Transport, MemoryTransport}; use libp2p_core::upgrade::{self, UpgradeInfo, Negotiated, InboundUpgrade, OutboundUpgrade}; use libp2p_mplex::MplexConfig; use libp2p_secio::SecioConfig; use multiaddr::Multiaddr; use rand::random; -use std::io; +use std::{io, pin::Pin}; #[derive(Clone)] struct HelloUpgrade {} @@ -44,30 +44,36 @@ impl UpgradeInfo for HelloUpgrade { impl InboundUpgrade for HelloUpgrade where - C: AsyncRead + AsyncWrite + Send + 'static + C: AsyncRead + AsyncWrite + Send + Unpin + 'static { type Output = Negotiated; type Error = io::Error; - type Future = Box + Send>; + type Future = Pin> + Send>>; - fn upgrade_inbound(self, socket: Negotiated, _: Self::Info) -> Self::Future { - Box::new(nio::read_exact(socket, [0u8; 5]).map(|(io, buf)| { + fn upgrade_inbound(self, mut socket: Negotiated, _: Self::Info) -> Self::Future { + Box::pin(async move { + let mut buf = [0u8; 5]; + socket.read_exact(&mut buf).await.unwrap(); assert_eq!(&buf[..], "hello".as_bytes()); - io - })) + Ok(socket) + }) } } impl OutboundUpgrade for HelloUpgrade where - C: AsyncWrite + AsyncRead + Send + 'static, + C: AsyncWrite + AsyncRead + Send + Unpin + 'static, { type Output = Negotiated; type Error = io::Error; - type Future = Box + Send>; + type Future = Pin> + Send>>; - fn upgrade_outbound(self, socket: Negotiated, _: Self::Info) -> Self::Future { - Box::new(nio::write_all(socket, "hello").map(|(io, _)| io)) + fn upgrade_outbound(self, mut socket: Negotiated, _: Self::Info) -> Self::Future { + Box::pin(async move { + socket.write_all(b"hello").await.unwrap(); + socket.flush().await.unwrap(); + Ok(socket) + }) } } @@ -85,7 +91,7 @@ fn upgrade_pipeline() { .and_then(|(peer, mplex), _| { // Gracefully close the connection to allow protocol // negotiation to complete. - util::CloseMuxer::new(mplex).map(move |mplex| (peer, mplex)) + util::CloseMuxer::new(mplex).map_ok(move |mplex| (peer, mplex)) }); let dialer_keys = identity::Keypair::generate_ed25519(); @@ -100,27 +106,31 @@ fn upgrade_pipeline() { .and_then(|(peer, mplex), _| { // Gracefully close the connection to allow protocol // negotiation to complete. - util::CloseMuxer::new(mplex).map(move |mplex| (peer, mplex)) + util::CloseMuxer::new(mplex).map_ok(move |mplex| (peer, mplex)) }); let listen_addr: Multiaddr = format!("/memory/{}", random::()).parse().unwrap(); - let listener = listener_transport.listen_on(listen_addr.clone()).unwrap() - .filter_map(ListenerEvent::into_upgrade) - .for_each(move |(upgrade, _remote_addr)| { - let dialer = dialer_id.clone(); - upgrade.map(move |(peer, _mplex)| { - assert_eq!(peer, dialer) - }) - }) - .map_err(|e| panic!("Listener error: {}", e)); + + async_std::task::spawn({ + let listen_addr = listen_addr.clone(); + let dialer_id = dialer_id.clone(); + async move { + let mut listener = listener_transport.listen_on(listen_addr).unwrap(); + loop { + let (upgrade, _remote_addr) = match listener.next().await.unwrap().unwrap().into_upgrade() { + Some(u) => u, + None => continue + }; - let dialer = dialer_transport.dial(listen_addr).unwrap() - .map(move |(peer, _mplex)| { - assert_eq!(peer, listener_id) - }); + let (peer, _mplex) = upgrade.await.unwrap(); + assert_eq!(peer, dialer_id); + } + } + }); - let mut rt = tokio::runtime::Runtime::new().unwrap(); - rt.spawn(listener); - rt.block_on(dialer).unwrap() + async_std::task::block_on(async move { + let (peer, _mplex) = dialer_transport.dial(listen_addr).unwrap().await.unwrap(); + assert_eq!(peer, listener_id); + }); } diff --git a/core/tests/util.rs b/core/tests/util.rs index 69b1f936..395e0d9c 100644 --- a/core/tests/util.rs +++ b/core/tests/util.rs @@ -29,11 +29,11 @@ where { type Output = Result; - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { loop { match std::mem::replace(&mut self.state, CloseMuxerState::Done) { CloseMuxerState::Close(muxer) => { - if muxer.close()?.is_not_ready() { + if !muxer.close(cx)?.is_ready() { self.state = CloseMuxerState::Close(muxer); return Poll::Pending } @@ -45,3 +45,5 @@ where } } +impl Unpin for CloseMuxer { +}