diff --git a/core/src/transport/map_err.rs b/core/src/transport/map_err.rs index 975c02bb..3008f0fd 100644 --- a/core/src/transport/map_err.rs +++ b/core/src/transport/map_err.rs @@ -40,33 +40,22 @@ impl MapErr { impl Transport for MapErr where - T: Transport + 'static, // TODO: 'static :-/ - F: FnOnce(IoError) -> IoError + Clone + 'static, // TODO: 'static :-/ + T: Transport, + F: FnOnce(IoError) -> IoError + Clone, { type Output = T::Output; type MultiaddrFuture = T::MultiaddrFuture; - type Listener = Box>; - type ListenerUpgrade = - Box>; - type Dial = Box>; + type Listener = MapErrListener; + type ListenerUpgrade = MapErrListenerUpgrade; + type Dial = MapErrDial; fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { let map = self.map; match self.transport.listen_on(addr) { Ok((stream, listen_addr)) => { - let map2 = map.clone(); - let stream = stream - .map(move |future| { - let map = map.clone(); - let future = future.into_future().map_err(move |err| map(err)); - Box::new(future) as Box<_> - }) - .map_err(move |err| { - let map = map2.clone(); - map(err) - }); - Ok((Box::new(stream), listen_addr)) + let stream = MapErrListener { inner: stream, map }; + Ok((stream, listen_addr)) } Err((transport, addr)) => Err((MapErr { transport, map }, addr)), } @@ -76,10 +65,7 @@ where let map = self.map; match self.transport.dial(addr) { - Ok(future) => { - let future = future.into_future().map_err(move |err| map(err)); - Ok(Box::new(future)) - } + Ok(future) => Ok(MapErrDial { inner: future, map: Some(map) }), Err((transport, addr)) => Err((MapErr { transport, map }, addr)), } } @@ -92,23 +78,166 @@ where impl MuxedTransport for MapErr where - T: MuxedTransport + 'static, // TODO: 'static :-/ - F: FnOnce(IoError) -> IoError + Clone + 'static, // TODO: 'static :-/ + T: MuxedTransport, + F: FnOnce(IoError) -> IoError + Clone, { - type Incoming = Box>; - type IncomingUpgrade = - Box>; + type Incoming = MapErrIncoming; + type IncomingUpgrade = MapErrIncomingUpgrade; + #[inline] fn next_incoming(self) -> Self::Incoming { - let map = self.map; - let map2 = map.clone(); - let future = self.transport - .next_incoming() - .map(move |upgrade| { - let future = upgrade.map_err(map); - Box::new(future) as Box<_> - }) - .map_err(map2); - Box::new(future) + MapErrIncoming { + inner: self.transport.next_incoming(), + map: Some(self.map), + } + } +} + +/// Listening stream for `MapErr`. +pub struct MapErrListener +where T: Transport { + inner: T::Listener, + map: F, +} + +impl Stream for MapErrListener +where T: Transport, + F: FnOnce(IoError) -> IoError + Clone, +{ + type Item = MapErrListenerUpgrade; + type Error = IoError; + + #[inline] + fn poll(&mut self) -> Poll, Self::Error> { + match try_ready!(self.inner.poll()) { + Some(value) => Ok(Async::Ready( + Some(MapErrListenerUpgrade { inner: value, map: Some(self.map.clone()) }))), + None => Ok(Async::Ready(None)) + } + } +} + +/// Listening upgrade future for `MapErr`. +pub struct MapErrListenerUpgrade +where T: Transport { + inner: T::ListenerUpgrade, + map: Option, +} + +impl Future for MapErrListenerUpgrade +where T: Transport, + F: FnOnce(IoError) -> IoError, +{ + type Item = (T::Output, T::MultiaddrFuture); + type Error = IoError; + + #[inline] + fn poll(&mut self) -> Poll { + match self.inner.poll() { + Ok(Async::Ready(value)) => { + Ok(Async::Ready(value)) + }, + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(err) => { + let map = self.map.take().expect("poll() called again after error"); + Err(map(err)) + } + } + } +} + +/// Dialing future for `MapErr`. +pub struct MapErrDial +where T: Transport, + F: FnOnce(IoError) -> IoError, +{ + inner: T::Dial, + map: Option, +} + +impl Future for MapErrDial +where T: Transport, + F: FnOnce(IoError) -> IoError, +{ + type Item = (T::Output, T::MultiaddrFuture); + type Error = IoError; + + #[inline] + fn poll(&mut self) -> Poll { + match self.inner.poll() { + Ok(Async::Ready(value)) => { + Ok(Async::Ready(value)) + }, + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(err) => { + let map = self.map.take().expect("poll() called again after error"); + Err(map(err)) + } + } + } +} + +/// Incoming future for `MapErr`. +pub struct MapErrIncoming +where T: MuxedTransport +{ + inner: T::Incoming, + map: Option, +} + +impl Future for MapErrIncoming +where T: MuxedTransport, + F: FnOnce(IoError) -> IoError, +{ + type Item = MapErrIncomingUpgrade; + type Error = IoError; + + #[inline] + fn poll(&mut self) -> Poll { + match self.inner.poll() { + Ok(Async::Ready(value)) => { + let map = self.map.take().expect("poll() called again after error"); + let value = MapErrIncomingUpgrade { + inner: value, + map: Some(map), + }; + Ok(Async::Ready(value)) + }, + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(err) => { + let map = self.map.take().expect("poll() called again after error"); + Err(map(err)) + } + } + } +} + +/// Incoming upgrade future for `MapErr`. +pub struct MapErrIncomingUpgrade +where T: MuxedTransport +{ + inner: T::IncomingUpgrade, + map: Option, +} + +impl Future for MapErrIncomingUpgrade +where T: MuxedTransport, + F: FnOnce(IoError) -> IoError, +{ + type Item = (T::Output, T::MultiaddrFuture); + type Error = IoError; + + #[inline] + fn poll(&mut self) -> Poll { + match self.inner.poll() { + Ok(Async::Ready(value)) => { + Ok(Async::Ready(value)) + }, + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(err) => { + let map = self.map.take().expect("poll() called again after error"); + Err(map(err)) + } + } } }