diff --git a/core/src/connection_reuse.rs b/core/src/connection_reuse.rs index f2e35842..68287dc0 100644 --- a/core/src/connection_reuse.rs +++ b/core/src/connection_reuse.rs @@ -129,7 +129,7 @@ enum PeerState where M: StreamMuxer { // TODO: stronger Future type Pending { /// Future that produces the muxer. - future: Box>, + future: Box + Send>, /// All the tasks to notify when `future` resolves. notify: FnvHashMap, }, @@ -164,15 +164,19 @@ where impl Transport for ConnectionReuse where - T: Transport + 'static, // TODO: 'static :( + T: Transport + Send + 'static, // TODO: 'static :( + T::Dial: Send, + T::MultiaddrFuture: Send, + T::Listener: Send, + T::ListenerUpgrade: Send, T: Transport + Clone + 'static, // TODO: 'static :( - M: StreamMuxer + 'static, - D: Clone + 'static, + M: Send + Sync + StreamMuxer + 'static, + D: Send + Clone + 'static, T: Clone, { type Output = (D, ConnectionReuseSubstream); type MultiaddrFuture = future::FutureResult; - type Listener = Box>; + type Listener = Box + Send>; type ListenerUpgrade = FutureResult<(Self::Output, Self::MultiaddrFuture), IoError>; type Dial = ConnectionReuseDial; @@ -245,10 +249,14 @@ where impl MuxedTransport for ConnectionReuse where - T: Transport + 'static, // TODO: 'static :( + T: Transport + Send + 'static, // TODO: 'static :( + T::Dial: Send, + T::MultiaddrFuture: Send, + T::Listener: Send, + T::ListenerUpgrade: Send, T: Transport + Clone + 'static, // TODO: 'static :( - M: StreamMuxer + 'static, - D: Clone + 'static, + M: Send + Sync + StreamMuxer + 'static, + D: Send + Clone + 'static, T: Clone, { type Incoming = ConnectionReuseIncoming; @@ -305,10 +313,10 @@ where impl Future for ConnectionReuseDial where T: Transport + Clone, - M: StreamMuxer + 'static, - D: Clone + 'static, - ::Dial: 'static, - ::MultiaddrFuture: 'static, + M: Send + StreamMuxer + 'static, + D: Send + Clone + 'static, + ::Dial: Send + 'static, + ::MultiaddrFuture: Send + 'static, { type Item = ((D, ConnectionReuseSubstream), FutureResult); type Error = IoError; @@ -471,7 +479,7 @@ where /// Identifier for this listener. Used to determine which connections were opened by it. listener_id: u64, /// Opened connections that need to be upgraded. - current_upgrades: FuturesUnordered>>, + current_upgrades: FuturesUnordered + Send>>, /// Shared between the whole connection reuse mechanism. shared: Arc>>, @@ -484,7 +492,7 @@ where M: StreamMuxer, D: Clone, L: Stream, - Lu: Future + 'static, + Lu: Future + Send + 'static, { type Item = FutureResult<((D, ConnectionReuseSubstream), FutureResult), IoError>; type Error = IoError; diff --git a/core/src/swarm.rs b/core/src/swarm.rs index 35f58857..f7d95ee5 100644 --- a/core/src/swarm.rs +++ b/core/src/swarm.rs @@ -41,7 +41,7 @@ pub fn swarm( ) -> (SwarmController, SwarmEvents) where T: MuxedTransport + Clone + 'static, // TODO: 'static :-/ - H: FnMut(T::Output, Box>) -> F, + H: FnMut(T::Output, Box + Send>) -> F, F: IntoFuture, { let shared = Arc::new(Mutex::new(Shared { @@ -105,6 +105,11 @@ where impl SwarmController where T: MuxedTransport + Clone + 'static, // TODO: 'static :-/ + T::Dial: Send, + T::MultiaddrFuture: Send, + T::Listener: Send, + T::ListenerUpgrade: Send, + T::Output: Send, F: 'static, { /// Asks the swarm to dial the node with the given multiaddress. The connection is then @@ -119,6 +124,8 @@ where -> Result, Multiaddr> where Du: Transport + 'static, // TODO: 'static :-/ + Du::Dial: Send, + Du::MultiaddrFuture: Send, Du::Output: Into, { self.dial_then(multiaddr, transport, |v| v) @@ -132,8 +139,10 @@ where -> Result, Multiaddr> where Du: Transport + 'static, // TODO: 'static :-/ + Du::Dial: Send, + Du::MultiaddrFuture: Send, Du::Output: Into, - TThen: FnOnce(Result<(), IoError>) -> Result<(), IoError> + 'static, + TThen: FnOnce(Result<(), IoError>) -> Result<(), IoError> + Send + 'static, { trace!("Swarm dialing {}", multiaddr); @@ -148,12 +157,12 @@ where let mut then = Box::new(move |val: Result<(), IoError>| { let then = then.take().expect("The Boxed FnMut should only be called once"); then(val); - }) as Box; + }) as Box; let dial = dial.then(|result| { match result { Ok((output, client_addr)) => { - let client_addr = Box::new(client_addr) as Box>; + let client_addr = Box::new(client_addr) as Box + Send>; Ok((output.into(), then, client_addr)) } Err(err) => { @@ -206,12 +215,12 @@ where let listener = Box::new( listener.map(|f| { let f = f.map(|(out, maf)| { - (out, Box::new(maf) as Box>) + (out, Box::new(maf) as Box + Send>) }); - Box::new(f) as Box> + Box::new(f) as Box + Send> }), - ) as Box>; + ) as Box + Send>; shared.listeners.push((new_addr.clone(), listener.into_future())); if let Some(task) = shared.task_to_notify.take() { task.notify(); @@ -242,7 +251,9 @@ where impl Stream for SwarmEvents where T: MuxedTransport + Clone + 'static, // TODO: 'static :-/, - H: FnMut(T::Output, Box>) -> If, + T::MultiaddrFuture: Send, + T::IncomingUpgrade: Send, + H: FnMut(T::Output, Box + Send>) -> If, If: IntoFuture, F: Future + 'static, // TODO: 'static :-/ { @@ -259,7 +270,7 @@ where debug!("Swarm received new multiplexed incoming connection"); shared.next_incoming = self.transport.clone().next_incoming(); let connec = connec.map(|(out, maf)| { - (out, Box::new(maf) as Box>) + (out, Box::new(maf) as Box + Send>) }); shared.listeners_upgrade.push(Box::new(connec) as Box<_>); } @@ -393,21 +404,21 @@ struct Shared where T: MuxedTransport + 'static { StreamFuture< Box< Stream< - Item = Box>), Error = IoError>>, + Item = Box + Send>), Error = IoError> + Send>, Error = IoError, - >, + > + Send, >, >, )>, /// Futures that upgrade an incoming listening connection to a full connection. listeners_upgrade: - Vec>), Error = IoError>>>, + Vec + Send>), Error = IoError> + Send>>, /// Futures that dial a remote address. /// /// Contains the address we dial, so that we can cancel it if necessary. - dialers: Vec<(Multiaddr, Box)>, Box>), Error = IoError>>)>, + dialers: Vec<(Multiaddr, Box) + Send>, Box + Send>), Error = IoError> + Send>)>, /// List of futures produced by the swarm closure. Must be processed to the end. to_process: Vec, diff --git a/core/src/transport/and_then.rs b/core/src/transport/and_then.rs index 94b341bf..dcfb6664 100644 --- a/core/src/transport/and_then.rs +++ b/core/src/transport/and_then.rs @@ -40,15 +40,18 @@ pub struct AndThen { impl Transport for AndThen where T: Transport + 'static, - C: FnOnce(T::Output, Endpoint, T::MultiaddrFuture) -> F + Clone + 'static, - F: Future + 'static, + T::Dial: Send, + T::Listener: Send, + T::ListenerUpgrade: Send, + C: FnOnce(T::Output, Endpoint, T::MultiaddrFuture) -> F + Clone + Send + 'static, + F: Future + Send + 'static, Maf: Future + 'static, { type Output = O; type MultiaddrFuture = Maf; - type Listener = Box>; - type ListenerUpgrade = Box>; - type Dial = Box>; + type Listener = Box + Send>; + type ListenerUpgrade = Box + Send>; + type Dial = Box + Send>; #[inline] fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { @@ -116,12 +119,17 @@ where impl MuxedTransport for AndThen where T: MuxedTransport + 'static, - C: FnOnce(T::Output, Endpoint, T::MultiaddrFuture) -> F + Clone + 'static, - F: Future + 'static, + T::Dial: Send, + T::Listener: Send, + T::ListenerUpgrade: Send, + T::Incoming: Send, + T::IncomingUpgrade: Send, + C: FnOnce(T::Output, Endpoint, T::MultiaddrFuture) -> F + Clone + Send + 'static, + F: Future + Send + 'static, Maf: Future + 'static, { - type Incoming = Box>; - type IncomingUpgrade = Box>; + type Incoming = Box + Send>; + type IncomingUpgrade = Box + Send>; #[inline] fn next_incoming(self) -> Self::Incoming { @@ -134,7 +142,7 @@ where upgrade(connection, Endpoint::Listener, client_addr) }); - Box::new(future) as Box> + Box::new(future) as Box + Send> }); Box::new(future) as Box<_> diff --git a/core/src/transport/choice.rs b/core/src/transport/choice.rs index e977bd8f..78288cf9 100644 --- a/core/src/transport/choice.rs +++ b/core/src/transport/choice.rs @@ -85,26 +85,26 @@ impl MuxedTransport for OrTransport where A: MuxedTransport, B: MuxedTransport, - A::Incoming: 'static, // TODO: meh :-/ - B::Incoming: 'static, // TODO: meh :-/ - A::IncomingUpgrade: 'static, // TODO: meh :-/ - B::IncomingUpgrade: 'static, // TODO: meh :-/ + A::Incoming: Send + 'static, // TODO: meh :-/ + B::Incoming: Send + 'static, // TODO: meh :-/ + A::IncomingUpgrade: Send + 'static, // TODO: meh :-/ + B::IncomingUpgrade: Send + 'static, // TODO: meh :-/ A::Output: 'static, // TODO: meh :-/ B::Output: 'static, // TODO: meh :-/ { - type Incoming = Box>; + type Incoming = Box + Send>; type IncomingUpgrade = - Box, Self::MultiaddrFuture), Error = IoError>>; + Box, Self::MultiaddrFuture), Error = IoError> + Send>; #[inline] fn next_incoming(self) -> Self::Incoming { let first = self.0.next_incoming().map(|out| { let fut = out.map(move |(v, addr)| (EitherOutput::First(v), future::Either::A(addr))); - Box::new(fut) as Box> + Box::new(fut) as Box + Send> }); let second = self.1.next_incoming().map(|out| { let fut = out.map(move |(v, addr)| (EitherOutput::Second(v), future::Either::B(addr))); - Box::new(fut) as Box> + Box::new(fut) as Box + Send> }); let future = first.select(second).map(|(i, _)| i).map_err(|(e, _)| e); Box::new(future) as Box<_> diff --git a/core/src/transport/map.rs b/core/src/transport/map.rs index 77bf43c3..d4e09068 100644 --- a/core/src/transport/map.rs +++ b/core/src/transport/map.rs @@ -42,13 +42,16 @@ impl Map { impl Transport for Map where T: Transport + 'static, // TODO: 'static :-/ - F: FnOnce(T::Output, Endpoint) -> D + Clone + 'static, // TODO: 'static :-/ + T::Dial: Send, + T::Listener: Send, + T::ListenerUpgrade: Send, + F: FnOnce(T::Output, Endpoint) -> D + Clone + Send + 'static, // TODO: 'static :-/ { type Output = D; type MultiaddrFuture = T::MultiaddrFuture; - type Listener = Box>; - type ListenerUpgrade = Box>; - type Dial = Box>; + type Listener = Box + Send>; + type ListenerUpgrade = Box + Send>; + type Dial = Box + Send>; fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { let map = self.map; @@ -91,10 +94,15 @@ where impl MuxedTransport for Map where T: MuxedTransport + 'static, // TODO: 'static :-/ - F: FnOnce(T::Output, Endpoint) -> D + Clone + 'static, // TODO: 'static :-/ + T::Dial: Send, + T::Listener: Send, + T::ListenerUpgrade: Send, + T::Incoming: Send, + T::IncomingUpgrade: Send, + F: FnOnce(T::Output, Endpoint) -> D + Clone + Send + 'static, // TODO: 'static :-/ { - type Incoming = Box>; - type IncomingUpgrade = Box>; + type Incoming = Box + Send>; + type IncomingUpgrade = Box + Send>; fn next_incoming(self) -> Self::Incoming { let map = self.map; diff --git a/core/src/transport/map_err_dial.rs b/core/src/transport/map_err_dial.rs index c90f7349..f99ff8e9 100644 --- a/core/src/transport/map_err_dial.rs +++ b/core/src/transport/map_err_dial.rs @@ -41,13 +41,14 @@ impl MapErrDial { impl Transport for MapErrDial where T: Transport + 'static, // TODO: 'static :-/ - F: FnOnce(IoError, Multiaddr) -> IoError + Clone + 'static, // TODO: 'static :-/ + T::Dial: Send, + F: FnOnce(IoError, Multiaddr) -> IoError + Clone + Send + 'static, // TODO: 'static :-/ { type Output = T::Output; type MultiaddrFuture = T::MultiaddrFuture; type Listener = T::Listener; type ListenerUpgrade = T::ListenerUpgrade; - type Dial = Box>; + type Dial = Box + Send>; fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { match self.transport.listen_on(addr) { @@ -77,7 +78,8 @@ where impl MuxedTransport for MapErrDial where T: MuxedTransport + 'static, // TODO: 'static :-/ - F: FnOnce(IoError, Multiaddr) -> IoError + Clone + 'static, // TODO: 'static :-/ + T::Dial: Send, + F: FnOnce(IoError, Multiaddr) -> IoError + Clone + Send + 'static, // TODO: 'static :-/ { type Incoming = T::Incoming; type IncomingUpgrade = T::IncomingUpgrade; diff --git a/core/src/transport/memory.rs b/core/src/transport/memory.rs index ffbb2069..2b152dba 100644 --- a/core/src/transport/memory.rs +++ b/core/src/transport/memory.rs @@ -50,12 +50,12 @@ impl Clone for Dialer { } } -impl Transport for Dialer { +impl Transport for Dialer { type Output = Channel; - type Listener = Box>; + type Listener = Box + Send>; type ListenerUpgrade = FutureResult<(Self::Output, Self::MultiaddrFuture), io::Error>; type MultiaddrFuture = FutureResult; - type Dial = Box>; + type Dial = Box + Send>; fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { Err((self, addr)) @@ -93,12 +93,12 @@ impl Clone for Listener { } } -impl Transport for Listener { +impl Transport for Listener { type Output = Channel; - type Listener = Box>; + type Listener = Box + Send>; type ListenerUpgrade = FutureResult<(Self::Output, Self::MultiaddrFuture), io::Error>; type MultiaddrFuture = FutureResult; - type Dial = Box>; + type Dial = Box + Send>; fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { if !is_memory_addr(&addr) { @@ -200,12 +200,12 @@ mod tests { #[derive(Clone)] struct Echo(mpsc::UnboundedSender<()>); - impl ConnectionUpgrade, Maf> for Echo { + impl ConnectionUpgrade, Maf> for Echo { type NamesIter = iter::Once<(Bytes, ())>; type UpgradeIdentifier = (); type Output = (); type MultiaddrFuture = Maf; - type Future = Box>; + type Future = Box + Send>; fn protocol_names(&self) -> Self::NamesIter { iter::once(("/echo/1.0.0".into(), ())) diff --git a/core/src/transport/upgrade.rs b/core/src/transport/upgrade.rs index f1a4fd17..8d9fc621 100644 --- a/core/src/transport/upgrade.rs +++ b/core/src/transport/upgrade.rs @@ -47,8 +47,15 @@ impl UpgradedNode { impl<'a, T, C> UpgradedNode where T: Transport + 'a, - T::Output: AsyncRead + AsyncWrite, - C: ConnectionUpgrade + 'a, + T::Dial: Send, + T::Listener: Send, + T::ListenerUpgrade: Send, + T::MultiaddrFuture: Send, + T::Output: Send + AsyncRead + AsyncWrite, + C: ConnectionUpgrade + Send + 'a, + C::NamesIter: Send, + C::Future: Send, + C::UpgradeIdentifier: Send, { /// Returns a reference to the inner `Transport`. #[inline] @@ -65,7 +72,7 @@ where pub fn dial( self, addr: Multiaddr, - ) -> Result + 'a>, (Self, Multiaddr)> + ) -> Result + Send + 'a>, (Self, Multiaddr)> where C::NamesIter: Clone, // TODO: not elegant { @@ -101,13 +108,15 @@ where self, ) -> Box< Future< - Item = Box + 'a>, + Item = Box + Send + 'a>, Error = IoError, > - + 'a, + + Send + 'a, > where T: MuxedTransport, + T::Incoming: Send, + T::IncomingUpgrade: Send, C::NamesIter: Clone, // TODO: not elegant C: Clone, { @@ -119,7 +128,7 @@ where apply(connection, upgrade, Endpoint::Listener, client_addr) }); - Box::new(future) as Box> + Box::new(future) as Box + Send> }); Box::new(future) as Box<_> @@ -138,9 +147,10 @@ where ( Box< Stream< - Item = Box + 'a>, + Item = Box + Send + 'a>, Error = IoError, > + + Send + 'a, >, Multiaddr, @@ -187,17 +197,22 @@ where impl Transport for UpgradedNode where T: Transport + 'static, - T::Output: AsyncRead + AsyncWrite, - C: ConnectionUpgrade + 'static, + T::Dial: Send, + T::Listener: Send, + T::ListenerUpgrade: Send, + T::MultiaddrFuture: Send, + T::Output: Send + AsyncRead + AsyncWrite, + C: ConnectionUpgrade + Clone + Send + 'static, C::MultiaddrFuture: Future, - C::NamesIter: Clone, // TODO: not elegant - C: Clone, + C::NamesIter: Clone + Send, + C::Future: Send, + C::UpgradeIdentifier: Send, { type Output = C::Output; type MultiaddrFuture = C::MultiaddrFuture; - type Listener = Box>; - type ListenerUpgrade = Box>; - type Dial = Box>; + type Listener = Box + Send>; + type ListenerUpgrade = Box + Send>; + type Dial = Box + Send>; #[inline] fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { @@ -218,14 +233,21 @@ where impl MuxedTransport for UpgradedNode where T: MuxedTransport + 'static, - T::Output: AsyncRead + AsyncWrite, - C: ConnectionUpgrade + 'static, + T::Dial: Send, + T::Listener: Send, + T::ListenerUpgrade: Send, + T::MultiaddrFuture: Send, + T::Output: Send + AsyncRead + AsyncWrite, + T::Incoming: Send, + T::IncomingUpgrade: Send, + C: ConnectionUpgrade + Clone + Send + 'static, C::MultiaddrFuture: Future, - C::NamesIter: Clone, // TODO: not elegant - C: Clone, + C::NamesIter: Clone + Send, + C::Future: Send, + C::UpgradeIdentifier: Send, { - type Incoming = Box>; - type IncomingUpgrade = Box>; + type Incoming = Box + Send>; + type IncomingUpgrade = Box + Send>; #[inline] fn next_incoming(self) -> Self::Incoming { diff --git a/core/src/unique.rs b/core/src/unique.rs index 084596f0..0e621587 100644 --- a/core/src/unique.rs +++ b/core/src/unique.rs @@ -101,10 +101,17 @@ impl UniqueConnec { #[inline] pub fn dial(&self, swarm: &SwarmController, multiaddr: &Multiaddr, transport: Du) -> UniqueConnecFuture - where T: Clone + 'static, // TODO: 'static :-/ + where T: Clone + Send + 'static, // TODO: 'static :-/ Du: Transport + 'static, // TODO: 'static :-/ Du::Output: Into, + Du::Dial: Send, + Du::MultiaddrFuture: Send, S: Clone + MuxedTransport, + S::Dial: Send, + S::Listener: Send, + S::ListenerUpgrade: Send, + S::Output: Send, + S::MultiaddrFuture: Send, F: 'static, { self.dial_inner(swarm, multiaddr, transport, true) @@ -115,10 +122,17 @@ impl UniqueConnec { #[inline] pub fn dial_if_empty(&self, swarm: &SwarmController, multiaddr: &Multiaddr, transport: Du) -> UniqueConnecFuture - where T: Clone + 'static, // TODO: 'static :-/ + where T: Clone + Send + 'static, // TODO: 'static :-/ Du: Transport + 'static, // TODO: 'static :-/ Du::Output: Into, + Du::Dial: Send, + Du::MultiaddrFuture: Send, S: Clone + MuxedTransport, + S::Dial: Send, + S::Listener: Send, + S::ListenerUpgrade: Send, + S::Output: Send, + S::MultiaddrFuture: Send, F: 'static, { self.dial_inner(swarm, multiaddr, transport, false) @@ -127,10 +141,17 @@ impl UniqueConnec { /// Inner implementation of `dial_*`. fn dial_inner(&self, swarm: &SwarmController, multiaddr: &Multiaddr, transport: Du, dial_if_err: bool) -> UniqueConnecFuture - where T: Clone + 'static, // TODO: 'static :-/ + where T: Clone + Send + 'static, // TODO: 'static :-/ Du: Transport + 'static, // TODO: 'static :-/ Du::Output: Into, + Du::Dial: Send, + Du::MultiaddrFuture: Send, S: Clone + MuxedTransport, + S::Dial: Send, + S::Listener: Send, + S::ListenerUpgrade: Send, + S::Output: Send, + S::MultiaddrFuture: Send, F: 'static, { let mut inner = self.inner.lock(); diff --git a/core/src/upgrade/loop_upg.rs b/core/src/upgrade/loop_upg.rs index f3e67c5d..faf761c0 100644 --- a/core/src/upgrade/loop_upg.rs +++ b/core/src/upgrade/loop_upg.rs @@ -64,18 +64,21 @@ pub struct LoopUpg { impl ConnectionUpgrade<(State, Socket), AddrFut> for LoopUpg where - State: 'static, - Socket: AsyncRead + AsyncWrite + 'static, + State: Send + 'static, + Socket: AsyncRead + AsyncWrite + Send + 'static, Inner: ConnectionUpgrade< (State, Socket), AddrFut, Output = Loop, MultiaddrFuture = AddrFut, > + Clone + + Send + 'static, - Inner::NamesIter: Clone + 'static, - AddrFut: 'static, - Out: 'static, + Inner::NamesIter: Clone + Send + 'static, + Inner::UpgradeIdentifier: Send, + Inner::Future: Send, + AddrFut: Send + 'static, + Out: Send + 'static, { type NamesIter = Inner::NamesIter; type UpgradeIdentifier = Inner::UpgradeIdentifier; @@ -86,7 +89,7 @@ where type Output = Out; type MultiaddrFuture = AddrFut; - type Future = Box>; + type Future = Box + Send>; fn upgrade( self, diff --git a/core/src/upgrade/map.rs b/core/src/upgrade/map.rs index 98eb4e3f..92f365ec 100644 --- a/core/src/upgrade/map.rs +++ b/core/src/upgrade/map.rs @@ -39,9 +39,9 @@ pub struct Map { impl ConnectionUpgrade for Map where U: ConnectionUpgrade, - U::Future: 'static, // TODO: 'static :( + U::Future: Send + 'static, // TODO: 'static :( C: AsyncRead + AsyncWrite, - F: FnOnce(U::Output) -> O + 'static, // TODO: 'static :( + F: FnOnce(U::Output) -> O + Send + 'static, // TODO: 'static :( { type NamesIter = U::NamesIter; type UpgradeIdentifier = U::UpgradeIdentifier; @@ -52,7 +52,7 @@ where type Output = O; type MultiaddrFuture = U::MultiaddrFuture; - type Future = Box>; + type Future = Box + Send>; fn upgrade( self, diff --git a/core/src/upgrade/map_addr.rs b/core/src/upgrade/map_addr.rs index 9a60fb02..4b6de993 100644 --- a/core/src/upgrade/map_addr.rs +++ b/core/src/upgrade/map_addr.rs @@ -42,11 +42,11 @@ pub struct MapAddr { impl ConnectionUpgrade for MapAddr where U: ConnectionUpgrade, - U::Future: 'static, // TODO: 'static :( - U::MultiaddrFuture: Future + 'static, // TODO: 'static :( - U::Output: 'static, // TODO: 'static :( + U::Future: Send + 'static, // TODO: 'static :( + U::MultiaddrFuture: Future + Send + 'static, // TODO: 'static :( + U::Output: Send + 'static, // TODO: 'static :( C: AsyncRead + AsyncWrite, - F: FnOnce(U::Output, &Multiaddr) -> O + 'static, // TODO: 'static :( + F: FnOnce(U::Output, &Multiaddr) -> O + Send + 'static, // TODO: 'static :( { type NamesIter = U::NamesIter; type UpgradeIdentifier = U::UpgradeIdentifier; @@ -57,7 +57,7 @@ where type Output = O; type MultiaddrFuture = future::FutureResult; - type Future = Box>; + type Future = Box + Send>; fn upgrade( self, diff --git a/misc/multistream-select/Cargo.toml b/misc/multistream-select/Cargo.toml index 3a9c3fc3..fcdcb6d0 100644 --- a/misc/multistream-select/Cargo.toml +++ b/misc/multistream-select/Cargo.toml @@ -10,7 +10,7 @@ futures = { version = "0.1" } log = "0.4" smallvec = "0.5" tokio-io = "0.1" -unsigned-varint = { version = "0.2", features = ["codec"] } +unsigned-varint = { version = "0.2.1", features = ["codec"] } [dev-dependencies] tokio-current-thread = "0.1" diff --git a/muxers/mplex/Cargo.toml b/muxers/mplex/Cargo.toml index 4692f251..0f2fbf82 100644 --- a/muxers/mplex/Cargo.toml +++ b/muxers/mplex/Cargo.toml @@ -13,7 +13,7 @@ log = "0.4" parking_lot = "0.6" tokio-codec = "0.1" tokio-io = "0.1" -unsigned-varint = { version = "0.2", features = ["codec"] } +unsigned-varint = { version = "0.2.1", features = ["codec"] } [dev-dependencies] libp2p-tcp-transport = { path = "../../transports/tcp" } diff --git a/protocols/floodsub/Cargo.toml b/protocols/floodsub/Cargo.toml index e543b52a..6c696e8a 100644 --- a/protocols/floodsub/Cargo.toml +++ b/protocols/floodsub/Cargo.toml @@ -18,4 +18,4 @@ protobuf = "2.0.2" smallvec = "0.6.0" tokio-codec = "0.1" tokio-io = "0.1" -unsigned-varint = { version = "0.2", features = ["codec"] } +unsigned-varint = { version = "0.2.1", features = ["codec"] } diff --git a/protocols/floodsub/src/lib.rs b/protocols/floodsub/src/lib.rs index f90fb127..f2b74c4b 100644 --- a/protocols/floodsub/src/lib.rs +++ b/protocols/floodsub/src/lib.rs @@ -91,8 +91,8 @@ impl FloodSubUpgrade { impl ConnectionUpgrade for FloodSubUpgrade where - C: AsyncRead + AsyncWrite + 'static, - Maf: Future + 'static, + C: AsyncRead + AsyncWrite + Send + 'static, + Maf: Future + Send + 'static, { type NamesIter = iter::Once<(Bytes, Self::UpgradeIdentifier)>; type UpgradeIdentifier = (); @@ -104,7 +104,7 @@ where type Output = FloodSubFuture; type MultiaddrFuture = future::FutureResult; - type Future = Box>; + type Future = Box + Send>; #[inline] fn upgrade( @@ -208,7 +208,7 @@ where // TODO: what if multiple connections? inner.remote_connections.write().remove(&remote_addr); let future = future::ok(future::Loop::Break(())); - Box::new(future) as Box> + Box::new(future) as Box + Send> } } }) @@ -521,7 +521,7 @@ pub struct Message { /// Implementation of `Future` that must be driven to completion in order for floodsub to work. #[must_use = "futures do nothing unless polled"] pub struct FloodSubFuture { - inner: Box>, + inner: Box + Send>, } impl Future for FloodSubFuture { diff --git a/protocols/identify/Cargo.toml b/protocols/identify/Cargo.toml index 1df3e221..365bf099 100644 --- a/protocols/identify/Cargo.toml +++ b/protocols/identify/Cargo.toml @@ -16,7 +16,7 @@ parking_lot = "0.6" protobuf = "2.0.2" tokio-codec = "0.1" tokio-io = "0.1.0" -unsigned-varint = { version = "0.2", features = ["codec"] } +unsigned-varint = { version = "0.2.1", features = ["codec"] } [dev-dependencies] libp2p-tcp-transport = { path = "../../transports/tcp" } diff --git a/protocols/identify/src/identify_transport.rs b/protocols/identify/src/identify_transport.rs index bee1d5bb..dc039c0d 100644 --- a/protocols/identify/src/identify_transport.rs +++ b/protocols/identify/src/identify_transport.rs @@ -48,7 +48,7 @@ impl Clone for IdentifyTransport } } -type CacheEntry = future::Shared>>; +type CacheEntry = future::Shared + Send>>; impl IdentifyTransport { /// Creates an `IdentifyTransport` that wraps around the given transport and peerstore. @@ -63,14 +63,18 @@ impl IdentifyTransport { impl Transport for IdentifyTransport where - Trans: Transport + Clone + 'static, // TODO: 'static :( - Trans::Output: AsyncRead + AsyncWrite, + Trans: Transport + Clone + Send + 'static, // TODO: 'static :( + Trans::Dial: Send, + Trans::Listener: Send, + Trans::ListenerUpgrade: Send, + Trans::MultiaddrFuture: Send, + Trans::Output: AsyncRead + AsyncWrite + Send, { type Output = IdentifyTransportOutput; type MultiaddrFuture = future::FutureResult; - type Listener = Box>; - type ListenerUpgrade = Box>; - type Dial = Box>; + type Listener = Box + Send>; + type ListenerUpgrade = Box + Send>; + type Dial = Box + Send>; #[inline] fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { @@ -139,7 +143,7 @@ where Ok((out, future::ok(client_addr))) }); - Box::new(fut) as Box> + Box::new(fut) as Box + Send> }); Ok((Box::new(listener) as Box<_>, new_addr)) @@ -213,11 +217,17 @@ where impl MuxedTransport for IdentifyTransport where - Trans: MuxedTransport + Clone + 'static, - Trans::Output: AsyncRead + AsyncWrite, + Trans: MuxedTransport + Clone + Send + 'static, + Trans::Dial: Send, + Trans::Listener: Send, + Trans::ListenerUpgrade: Send, + Trans::MultiaddrFuture: Send, + Trans::Output: AsyncRead + AsyncWrite + Send, + Trans::Incoming: Send, + Trans::IncomingUpgrade: Send, { - type Incoming = Box>; - type IncomingUpgrade = Box>; + type Incoming = Box + Send>; + type IncomingUpgrade = Box + Send>; #[inline] fn next_incoming(self) -> Self::Incoming { @@ -274,7 +284,7 @@ where Ok((out, future::ok(client_addr))) }); - Box::new(future) as Box> + Box::new(future) as Box + Send> }); Box::new(future) as Box<_> @@ -286,7 +296,7 @@ pub struct IdentifyTransportOutput { /// The socket to communicate with the remote. pub socket: S, /// Outcome of the identification of the remote. - pub info: Box>, + pub info: Box + Send>, } /// Outcome of the identification of the remote. @@ -301,7 +311,7 @@ pub struct IdentifyTransportOutcome { fn cache_entry(cache: &Mutex>, addr: Multiaddr, if_no_entry: F) -> impl Future where F: FnOnce() -> Fut, - Fut: Future + 'static, + Fut: Future + Send + 'static, { trace!("Looking up cache entry for {}", addr); let mut cache = cache.lock(); @@ -313,7 +323,7 @@ where F: FnOnce() -> Fut, Entry::Vacant(entry) => { trace!("No cache entry available"); - let future = (Box::new(if_no_entry()) as Box>).shared(); + let future = (Box::new(if_no_entry()) as Box + Send>).shared(); entry.insert(future.clone()); future::Either::B(future) }, diff --git a/protocols/identify/src/peer_id_transport.rs b/protocols/identify/src/peer_id_transport.rs index e96a2dd3..1d1d3536 100644 --- a/protocols/identify/src/peer_id_transport.rs +++ b/protocols/identify/src/peer_id_transport.rs @@ -45,16 +45,21 @@ impl PeerIdTransport { impl Transport for PeerIdTransport where - Trans: Transport + Clone + 'static, // TODO: 'static :( - Trans::Output: AsyncRead + AsyncWrite, + Trans: Transport + Clone + Send + 'static, // TODO: 'static :( + Trans::Dial: Send, + Trans::Listener: Send, + Trans::ListenerUpgrade: Send, + Trans::MultiaddrFuture: Send, + Trans::Output: AsyncRead + AsyncWrite + Send, AddrRes: Fn(PeerId) -> AddrResOut + 'static, // TODO: 'static :( AddrResOut: IntoIterator + 'static, // TODO: 'static :( + AddrResOut::IntoIter: Send, { type Output = PeerIdTransportOutput; - type MultiaddrFuture = Box>; - type Listener = Box>; - type ListenerUpgrade = Box>; - type Dial = Box>; + type MultiaddrFuture = Box + Send>; + type Listener = Box + Send>; + type ListenerUpgrade = Box + Send>; + type Dial = Box + Send>; #[inline] fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { @@ -94,11 +99,11 @@ where let peer_id = info.info.public_key.clone().into_peer_id(); debug!("Identified {} as {:?}", original_addr, peer_id); AddrComponent::P2P(peer_id.into()).into() - })) as Box>; + })) as Box + Send>; (out, real_addr) }); - Box::new(fut) as Box> + Box::new(fut) as Box + Send> }); Ok((Box::new(listener) as Box<_>, listened_addr)) @@ -155,7 +160,7 @@ where original_addr: original_addr, }; // Replace the multiaddress with the one of the form `/p2p/...` or `/ipfs/...`. - Ok((out, Box::new(future::ok(addr)) as Box>)) + Ok((out, Box::new(future::ok(addr)) as Box + Send>)) }); Ok(Box::new(future) as Box<_>) @@ -195,7 +200,7 @@ where let peer_id = info.info.public_key.clone().into_peer_id(); debug!("Identified {} as {:?}", original_addr, peer_id); AddrComponent::P2P(peer_id.into()).into() - })) as Box>; + })) as Box + Send>; (out, real_addr) }); @@ -212,13 +217,20 @@ where impl MuxedTransport for PeerIdTransport where - Trans: MuxedTransport + Clone + 'static, - Trans::Output: AsyncRead + AsyncWrite, + Trans: MuxedTransport + Clone + Send + 'static, + Trans::Dial: Send, + Trans::Listener: Send, + Trans::ListenerUpgrade: Send, + Trans::MultiaddrFuture: Send, + Trans::Output: AsyncRead + AsyncWrite + Send, + Trans::Incoming: Send, + Trans::IncomingUpgrade: Send, AddrRes: Fn(PeerId) -> AddrResOut + 'static, // TODO: 'static :( AddrResOut: IntoIterator + 'static, // TODO: 'static :( + AddrResOut::IntoIter: Send, { - type Incoming = Box>; - type IncomingUpgrade = Box>; + type Incoming = Box + Send>; + type IncomingUpgrade = Box + Send>; #[inline] fn next_incoming(self) -> Self::Incoming { @@ -243,11 +255,11 @@ where let peer_id = info.info.public_key.clone().into_peer_id(); debug!("Identified {} as {:?}", original_addr, peer_id); AddrComponent::P2P(peer_id.into()).into() - })) as Box>; + })) as Box + Send>; (out, real_addr) }); - Box::new(future) as Box> + Box::new(future) as Box + Send> }); Box::new(future) as Box<_> @@ -261,7 +273,7 @@ pub struct PeerIdTransportOutput { /// Identification of the remote. /// This may not be known immediately, hence why we use a future. - pub info: Box>, + pub info: Box + Send>, /// Original address of the remote. /// This layer turns the address of the remote into the `/p2p/...` form, but stores the @@ -313,8 +325,8 @@ mod tests { impl Transport for UnderlyingTrans { type Output = ::Output; type MultiaddrFuture = ::MultiaddrFuture; - type Listener = Box>; - type ListenerUpgrade = Box>; + type Listener = Box + Send>; + type ListenerUpgrade = Box + Send>; type Dial = ::Dial; #[inline] fn listen_on( diff --git a/protocols/identify/src/protocol.rs b/protocols/identify/src/protocol.rs index 0ed2bab2..31506a02 100644 --- a/protocols/identify/src/protocol.rs +++ b/protocols/identify/src/protocol.rs @@ -61,7 +61,7 @@ pub struct IdentifySender { impl<'a, T> IdentifySender where - T: AsyncWrite + 'a, + T: AsyncWrite + Send + 'a, { /// Sends back information to the client. Returns a future that is signalled whenever the /// info have been sent. @@ -69,7 +69,7 @@ where self, info: IdentifyInfo, observed_addr: &Multiaddr, - ) -> Box + 'a> { + ) -> Box + Send + 'a> { debug!("Sending identify info to client"); trace!("Sending: {:?}", info); @@ -113,14 +113,14 @@ pub struct IdentifyInfo { impl ConnectionUpgrade for IdentifyProtocolConfig where - C: AsyncRead + AsyncWrite + 'static, - Maf: Future + 'static, + C: AsyncRead + AsyncWrite + Send + 'static, + Maf: Future + Send + 'static, { type NamesIter = iter::Once<(Bytes, Self::UpgradeIdentifier)>; type UpgradeIdentifier = (); type Output = IdentifyOutput; type MultiaddrFuture = future::Either, Maf>; - type Future = Box>; + type Future = Box + Send>; #[inline] fn protocol_names(&self) -> Self::NamesIter { diff --git a/protocols/kad/Cargo.toml b/protocols/kad/Cargo.toml index 8f38f9ca..e733f5e6 100644 --- a/protocols/kad/Cargo.toml +++ b/protocols/kad/Cargo.toml @@ -24,7 +24,7 @@ smallvec = "0.5" tokio-codec = "0.1" tokio-io = "0.1" tokio-timer = "0.2.6" -unsigned-varint = { version = "0.2", features = ["codec"] } +unsigned-varint = { version = "0.2.1", features = ["codec"] } [dev-dependencies] libp2p-tcp-transport = { path = "../../transports/tcp" } diff --git a/protocols/kad/src/high_level.rs b/protocols/kad/src/high_level.rs index 90e8c430..d838ed6d 100644 --- a/protocols/kad/src/high_level.rs +++ b/protocols/kad/src/high_level.rs @@ -76,8 +76,9 @@ impl KadSystem { /// This future should be driven to completion by the caller. pub fn start<'a, F, Fut>(config: KadSystemConfig>, access: F) -> (KadSystem, impl Future + 'a) - where F: FnMut(&PeerId) -> Fut + Clone + 'a, - Fut: IntoFuture + 'a, + where F: FnMut(&PeerId) -> Fut + Send + Clone + 'a, + Fut: IntoFuture + 'a, + Fut::Future: Send, { let system = KadSystem::without_init(config); let init_future = system.perform_initialization(access); @@ -102,8 +103,9 @@ impl KadSystem { /// Starts an initialization process. pub fn perform_initialization<'a, F, Fut>(&self, access: F) -> impl Future + 'a - where F: FnMut(&PeerId) -> Fut + Clone + 'a, + where F: FnMut(&PeerId) -> Fut + Send + Clone + 'a, Fut: IntoFuture + 'a, + Fut::Future: Send, { let futures: Vec<_> = (0..256) // TODO: 256 is arbitrary .map(|n| { @@ -147,8 +149,9 @@ impl KadSystem { /// Starts a query for an iterative `FIND_NODE` request. pub fn find_node<'a, F, Fut>(&self, searched_key: PeerId, access: F) -> impl Stream>, Error = IoError> + 'a - where F: FnMut(&PeerId) -> Fut + 'a, + where F: FnMut(&PeerId) -> Fut + Send + 'a, Fut: IntoFuture + 'a, + Fut::Future: Send, { query(access, &self.kbuckets, searched_key, self.parallelism as usize, 20, self.request_timeout) // TODO: arbitrary const @@ -162,14 +165,15 @@ impl KadSystem { fn refresh<'a, F, Fut>(bucket_num: usize, access: F, kbuckets: &KBucketsTable, parallelism: usize, request_timeout: Duration) -> impl Stream, Error = IoError> + 'a -where F: FnMut(&PeerId) -> Fut + 'a, +where F: FnMut(&PeerId) -> Fut + Send + 'a, Fut: IntoFuture + 'a, + Fut::Future: Send, { let peer_id = match gen_random_id(kbuckets.my_id(), bucket_num) { Ok(p) => p, Err(()) => { let stream = stream::once(Ok(KadQueryEvent::Finished(()))); - return Box::new(stream) as Box>; + return Box::new(stream) as Box + Send>; }, }; @@ -180,7 +184,7 @@ where F: FnMut(&PeerId) -> Fut + 'a, KadQueryEvent::Finished(_) => KadQueryEvent::Finished(()), } }); - Box::new(stream) as Box> + Box::new(stream) as Box + Send> } // Generates a random `PeerId` that belongs to the given bucket. @@ -227,6 +231,7 @@ fn query<'a, F, Fut>( ) -> impl Stream>, Error = IoError> + 'a where F: FnMut(&PeerId) -> Fut + 'a, Fut: IntoFuture + 'a, + Fut::Future: Send, { debug!("Start query for {:?} ; num results = {}", searched_key, num_results); @@ -240,7 +245,7 @@ where F: FnMut(&PeerId) -> Fut + 'a, result: Vec, // For each open connection, a future with the response of the remote. // Note that don't use a `SmallVec` here because `select_all` produces a `Vec`. - current_attempts_fut: Vec, Error = IoError> + 'a>>, + current_attempts_fut: Vec, Error = IoError> + Send + 'a>>, // For each open connection, the peer ID that we are connected to. // Must always have the same length as `current_attempts_fut`. current_attempts_addrs: SmallVec<[PeerId; 32]>, diff --git a/protocols/kad/src/kad_server.rs b/protocols/kad/src/kad_server.rs index 3f3343b4..d00a264d 100644 --- a/protocols/kad/src/kad_server.rs +++ b/protocols/kad/src/kad_server.rs @@ -65,11 +65,11 @@ impl KadConnecConfig { impl ConnectionUpgrade for KadConnecConfig where - C: AsyncRead + AsyncWrite + 'static, // TODO: 'static :-/ + C: AsyncRead + AsyncWrite + Send + 'static, // TODO: 'static :-/ { type Output = ( KadConnecController, - Box>, + Box + Send>, ); type MultiaddrFuture = Maf; type Future = future::Map<>::Future, fn((>::Output, Maf)) -> (Self::Output, Maf)>; @@ -191,8 +191,8 @@ impl KadFindNodeRespond { } // Builds a controller and stream from a stream/sink of raw messages. -fn build_from_sink_stream<'a, S>(connec: S) -> (KadConnecController, Box + 'a>) -where S: Sink + Stream + 'a +fn build_from_sink_stream<'a, S>(connec: S) -> (KadConnecController, Box + Send + 'a>) +where S: Sink + Stream + Send + 'a { let (tx, rx) = mpsc::unbounded(); let future = kademlia_handler(connec, rx); @@ -211,9 +211,9 @@ where S: Sink + Stream( kad_bistream: S, rq_rx: mpsc::UnboundedReceiver<(KadMsg, oneshot::Sender)>, -) -> Box + 'a> +) -> Box + Send + 'a> where - S: Stream + Sink + 'a, + S: Stream + Sink + Send + 'a, { let (kad_sink, kad_stream) = kad_bistream.split(); @@ -255,7 +255,7 @@ where Some(events .into_future() .map_err(|(err, _)| err) - .and_then(move |(message, events)| -> Box> { + .and_then(move |(message, events)| -> Box + Send> { match message { Some(EventSource::Finished) | None => { let future = future::ok({ @@ -397,7 +397,7 @@ where })) }).filter_map(|val| val); - Box::new(stream) as Box> + Box::new(stream) as Box + Send> } #[cfg(test)] diff --git a/protocols/kad/src/query.rs b/protocols/kad/src/query.rs index 6098a63c..fec81e5c 100644 --- a/protocols/kad/src/query.rs +++ b/protocols/kad/src/query.rs @@ -59,10 +59,10 @@ pub enum QueryEvent { pub fn find_node<'a, FBuckets, FFindNode>( query_params: QueryParams, searched_key: PeerId, -) -> Box>, Error = IoError> + 'a> +) -> Box>, Error = IoError> + Send + 'a> where FBuckets: Fn(PeerId) -> Vec + 'a + Clone, - FFindNode: Fn(Multiaddr, PeerId) -> Box, Error = IoError>> + 'a + Clone, + FFindNode: Fn(Multiaddr, PeerId) -> Box, Error = IoError> + Send> + 'a + Clone, { query(query_params, searched_key, 20) // TODO: constant } @@ -74,10 +74,10 @@ where pub fn refresh<'a, FBuckets, FFindNode>( query_params: QueryParams, bucket_num: usize, -) -> Box, Error = IoError> + 'a> +) -> Box, Error = IoError> + Send + 'a> where FBuckets: Fn(PeerId) -> Vec + 'a + Clone, - FFindNode: Fn(Multiaddr, PeerId) -> Box, Error = IoError>> + 'a + Clone, + FFindNode: Fn(Multiaddr, PeerId) -> Box, Error = IoError> + Send> + 'a + Clone, { let peer_id = match gen_random_id(&query_params.local_id, bucket_num) { Ok(p) => p, @@ -132,10 +132,10 @@ fn query<'a, FBuckets, FFindNode>( query_params: QueryParams, searched_key: PeerId, num_results: usize, -) -> Box>, Error = IoError> + 'a> +) -> Box>, Error = IoError> + Send + 'a> where FBuckets: Fn(PeerId) -> Vec + 'a + Clone, - FFindNode: Fn(Multiaddr, PeerId) -> Box, Error = IoError>> + 'a + Clone, + FFindNode: Fn(Multiaddr, PeerId) -> Box, Error = IoError> + Send> + 'a + Clone, { debug!("Start query for {:?} ; num results = {}", searched_key, num_results); @@ -147,7 +147,7 @@ where result: Vec, // For each open connection, a future with the response of the remote. // Note that don't use a `SmallVec` here because `select_all` produces a `Vec`. - current_attempts_fut: Vec, Error = IoError> + 'a>>, + current_attempts_fut: Vec, Error = IoError> + Send + 'a>>, // For each open connection, the peer ID that we are connected to. // Must always have the same length as `current_attempts_fut`. current_attempts_addrs: SmallVec<[PeerId; 32]>, diff --git a/protocols/ping/src/lib.rs b/protocols/ping/src/lib.rs index adcd4e99..a9cec286 100644 --- a/protocols/ping/src/lib.rs +++ b/protocols/ping/src/lib.rs @@ -65,10 +65,10 @@ //! .dial("127.0.0.1:12345".parse::().unwrap()).unwrap_or_else(|_| panic!()) //! .and_then(|(out, _)| { //! match out { -//! PingOutput::Ponger(processing) => Box::new(processing) as Box>, +//! PingOutput::Ponger(processing) => Box::new(processing) as Box + Send>, //! PingOutput::Pinger { mut pinger, processing } => { //! let f = pinger.ping().map_err(|_| panic!()).select(processing).map(|_| ()).map_err(|(err, _)| err); -//! Box::new(f) as Box> +//! Box::new(f) as Box + Send> //! }, //! } //! }); @@ -118,15 +118,15 @@ pub enum PingOutput { /// Object to use in order to ping the remote. pinger: Pinger, /// Future that drives the processing of the pings. - processing: Box>, + processing: Box + Send>, }, /// We are on the listening side. - Ponger(Box>), + Ponger(Box + Send>), } impl ConnectionUpgrade for Ping where - C: AsyncRead + AsyncWrite + 'static, + C: AsyncRead + AsyncWrite + Send + 'static, { type NamesIter = iter::Once<(Bytes, Self::UpgradeIdentifier)>; type UpgradeIdentifier = (); @@ -158,7 +158,7 @@ where } /// Upgrades a connection from the dialer side. -fn upgrade_as_dialer(socket: impl AsyncRead + AsyncWrite + 'static) -> PingOutput { +fn upgrade_as_dialer(socket: impl AsyncRead + AsyncWrite + Send + 'static) -> PingOutput { // # How does it work? // // All the actual processing is performed by the *ponger*. @@ -199,7 +199,7 @@ fn upgrade_as_dialer(socket: impl AsyncRead + AsyncWrite + 'static) -> PingOutpu Box::new( sink.send(payload) .map(|sink| Loop::Continue((sink, stream))), - ) as Box> + ) as Box + Send> } Message::Received(payload) => { // Received a payload from the remote. @@ -210,17 +210,17 @@ fn upgrade_as_dialer(socket: impl AsyncRead + AsyncWrite + 'static) -> PingOutpu debug!("Received pong (payload={:?}) ; ping fufilled", payload); let _ = fut.send(()); Box::new(Ok(Loop::Continue((sink, stream))).into_future()) - as Box> + as Box + Send> } else { // Payload was unexpected. Closing connection. debug!("Received invalid payload ({:?}) ; closing", payload); Box::new(Ok(Loop::Break(())).into_future()) - as Box> + as Box + Send> } } } } else { - Box::new(Ok(Loop::Break(())).into_future()) as Box> + Box::new(Ok(Loop::Break(())).into_future()) as Box + Send> } }) }); @@ -232,7 +232,7 @@ fn upgrade_as_dialer(socket: impl AsyncRead + AsyncWrite + 'static) -> PingOutpu } /// Upgrades a connection from the listener side. -fn upgrade_as_listener(socket: impl AsyncRead + AsyncWrite + 'static) -> PingOutput { +fn upgrade_as_listener(socket: impl AsyncRead + AsyncWrite + Send + 'static) -> PingOutput { let sink_stream = Framed::new(socket, Codec); let (sink, stream) = sink_stream.split(); @@ -247,10 +247,10 @@ fn upgrade_as_listener(socket: impl AsyncRead + AsyncWrite + 'static) -> PingOut Box::new( sink.send(payload.freeze()) .map(|sink| Loop::Continue((sink, stream))), - ) as Box> + ) as Box + Send> } else { // Connection was closed - Box::new(Ok(Loop::Break(())).into_future()) as Box> + Box::new(Ok(Loop::Break(())).into_future()) as Box + Send> } }) }); @@ -269,7 +269,7 @@ impl Pinger { /// /// **Note**: Please be aware that there is no timeout on the ping. You should handle the /// timeout yourself when you call this function. - pub fn ping(&mut self) -> Box>> { + pub fn ping(&mut self) -> Box> + Send> { let (tx, rx) = oneshot::channel(); let payload: [u8; 32] = self.rng.sample(Standard); diff --git a/protocols/secio/src/handshake.rs b/protocols/secio/src/handshake.rs index 7c98f72e..c7da0143 100644 --- a/protocols/secio/src/handshake.rs +++ b/protocols/secio/src/handshake.rs @@ -59,9 +59,9 @@ use {SecioKeyPair, SecioKeyPairInner}; pub fn handshake<'a, S: 'a>( socket: S, local_key: SecioKeyPair, -) -> Box, PublicKey, Vec), Error = SecioError> + 'a> +) -> Box, PublicKey, Vec), Error = SecioError> + Send + 'a> where - S: AsyncRead + AsyncWrite, + S: AsyncRead + AsyncWrite + Send, { // TODO: could be rewritten as a coroutine once coroutines land in stable Rust diff --git a/protocols/secio/src/lib.rs b/protocols/secio/src/lib.rs index c8452e52..0f88f94e 100644 --- a/protocols/secio/src/lib.rs +++ b/protocols/secio/src/lib.rs @@ -289,12 +289,12 @@ where impl libp2p_core::ConnectionUpgrade for SecioConfig where - S: AsyncRead + AsyncWrite + 'static, // TODO: 'static :( - Maf: 'static, // TODO: 'static :( + S: AsyncRead + AsyncWrite + Send + 'static, // TODO: 'static :( + Maf: Send + 'static, // TODO: 'static :( { type Output = SecioOutput; type MultiaddrFuture = Maf; - type Future = Box>; + type Future = Box + Send>; type NamesIter = iter::Once<(Bytes, ())>; type UpgradeIdentifier = (); @@ -342,7 +342,7 @@ pub struct SecioMiddleware { impl SecioMiddleware where - S: AsyncRead + AsyncWrite, + S: AsyncRead + AsyncWrite + Send, { /// Attempts to perform a handshake on the given socket. /// @@ -351,7 +351,7 @@ where pub fn handshake<'a>( socket: S, key_pair: SecioKeyPair, - ) -> Box, PublicKey, Vec), Error = SecioError> + 'a> + ) -> Box, PublicKey, Vec), Error = SecioError> + Send + 'a> where S: 'a, { diff --git a/src/simple.rs b/src/simple.rs index 5cebe43e..f640af86 100644 --- a/src/simple.rs +++ b/src/simple.rs @@ -62,8 +62,8 @@ where C: AsyncRead + AsyncWrite, F: Fn(C) -> O, O: IntoFuture, - O::Future: 'static, - Maf: 'static, + O::Future: Send + 'static, + Maf: Send + 'static, { type NamesIter = iter::Once<(Bytes, ())>; type UpgradeIdentifier = (); @@ -75,7 +75,7 @@ where type Output = O::Item; type MultiaddrFuture = Maf; - type Future = Box>; + type Future = Box + Send>; #[inline] fn upgrade(self, socket: C, _: (), _: Endpoint, client_addr: Maf) -> Self::Future { diff --git a/transports/dns/src/lib.rs b/transports/dns/src/lib.rs index 24e7d4e9..cdff70b0 100644 --- a/transports/dns/src/lib.rs +++ b/transports/dns/src/lib.rs @@ -94,13 +94,14 @@ where impl Transport for DnsConfig where - T: Transport + 'static, // TODO: 'static :-/ + T: Transport + Send + 'static, // TODO: 'static :-/ + T::Dial: Send, { type Output = T::Output; type MultiaddrFuture = T::MultiaddrFuture; type Listener = T::Listener; type ListenerUpgrade = T::ListenerUpgrade; - type Dial = Box>; + type Dial = Box + Send>; #[inline] fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { diff --git a/transports/ratelimit/src/lib.rs b/transports/ratelimit/src/lib.rs index c5fa55a7..9fc86a24 100644 --- a/transports/ratelimit/src/lib.rs +++ b/transports/ratelimit/src/lib.rs @@ -157,13 +157,15 @@ where impl Transport for RateLimited where T: Transport + 'static, - T::Output: AsyncRead + AsyncWrite, + T::Dial: Send, + T::MultiaddrFuture: Send, + T::Output: AsyncRead + AsyncWrite + Send, { type Output = Connection; type MultiaddrFuture = T::MultiaddrFuture; type Listener = Listener; type ListenerUpgrade = ListenerUpgrade; - type Dial = Box, Self::MultiaddrFuture), Error = io::Error>>; + type Dial = Box, Self::MultiaddrFuture), Error = io::Error> + Send>; fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> where diff --git a/transports/relay/Cargo.toml b/transports/relay/Cargo.toml index edecf4cf..0fa426b2 100644 --- a/transports/relay/Cargo.toml +++ b/transports/relay/Cargo.toml @@ -15,4 +15,4 @@ protobuf = "2.0.2" rand = "0.4" tokio-codec = "0.1" tokio-io = "0.1" -unsigned-varint = { version = "0.2", features = ["codec"] } +unsigned-varint = { version = "0.2.1", features = ["codec"] } diff --git a/transports/relay/src/protocol.rs b/transports/relay/src/protocol.rs index fc8c5779..b08ba981 100644 --- a/transports/relay/src/protocol.rs +++ b/transports/relay/src/protocol.rs @@ -46,17 +46,21 @@ pub struct RelayConfig { // but otherwise the stream is not programmatically accessible. pub enum Output { Stream(C), - Sealed(Box>) + Sealed(Box + Send>) } impl ConnectionUpgrade for RelayConfig where - C: AsyncRead + AsyncWrite + 'static, - T: Transport + Clone + 'static, - T::Output: AsyncRead + AsyncWrite, - P: Deref + Clone + 'static, + C: AsyncRead + AsyncWrite + Send + 'static, + T: Transport + Clone + Send + 'static, + T::Dial: Send, + T::Listener: Send, + T::ListenerUpgrade: Send, + T::MultiaddrFuture: Send, + T::Output: AsyncRead + AsyncWrite + Send, + P: Deref + Clone + Send + 'static, S: 'static, - Maf: 'static, + Maf: Send + 'static, for<'a> &'a S: Peerstore { type NamesIter = iter::Once<(Bytes, Self::UpgradeIdentifier)>; @@ -68,7 +72,7 @@ where type Output = Output; type MultiaddrFuture = Maf; - type Future = Box>; + type Future = Box + Send>; fn upgrade(self, conn: C, _: (), _: Endpoint, remote_addr: Maf) -> Self::Future { let future = Io::new(conn).recv().and_then(move |(message, io)| { @@ -98,7 +102,11 @@ where impl RelayConfig where T: Transport + Clone + 'static, - T::Output: AsyncRead + AsyncWrite, + T::Dial: Send, // TODO: remove + T::Listener: Send, // TODO: remove + T::ListenerUpgrade: Send, // TODO: remove + T::MultiaddrFuture: Send, // TODO: remove + T::Output: Send + AsyncRead + AsyncWrite, P: Deref + Clone + 'static, for<'a> &'a S: Peerstore, { @@ -268,8 +276,8 @@ pub(crate) struct Source(pub(crate) CircuitRelay); impl ConnectionUpgrade for Source where - C: AsyncRead + AsyncWrite + 'static, - Maf: 'static, + C: AsyncRead + AsyncWrite + Send + 'static, + Maf: Send + 'static, { type NamesIter = iter::Once<(Bytes, Self::UpgradeIdentifier)>; type UpgradeIdentifier = (); @@ -280,7 +288,7 @@ where type Output = C; type MultiaddrFuture = Maf; - type Future = Box>; + type Future = Box + Send>; fn upgrade(self, conn: C, _: (), _: Endpoint, remote_addr: Maf) -> Self::Future { let future = Io::new(conn) diff --git a/transports/relay/src/transport.rs b/transports/relay/src/transport.rs index c3c85595..fe2c8b95 100644 --- a/transports/relay/src/transport.rs +++ b/transports/relay/src/transport.rs @@ -39,17 +39,21 @@ pub struct RelayTransport { impl Transport for RelayTransport where - T: Transport + Clone + 'static, - T::Output: AsyncRead + AsyncWrite, + T: Transport + Send + Clone + 'static, + T::Dial: Send, + T::Listener: Send, + T::ListenerUpgrade: Send, + T::MultiaddrFuture: Send, + T::Output: AsyncRead + AsyncWrite + Send, P: Deref + Clone + 'static, S: 'static, for<'a> &'a S: Peerstore { type Output = T::Output; type MultiaddrFuture = T::MultiaddrFuture; - type Listener = Box>; - type ListenerUpgrade = Box>; - type Dial = Box>; + type Listener = Box + Send>; + type ListenerUpgrade = Box + Send>; + type Dial = Box + Send>; fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { Err((self, addr)) @@ -85,7 +89,11 @@ where impl RelayTransport where T: Transport + Clone + 'static, - T::Output: AsyncRead + AsyncWrite, + T::Dial: Send, + T::Listener: Send, + T::ListenerUpgrade: Send, + T::MultiaddrFuture: Send, + T::Output: AsyncRead + AsyncWrite + Send, P: Deref + Clone + 'static, for<'a> &'a S: Peerstore { diff --git a/transports/websocket/Cargo.toml b/transports/websocket/Cargo.toml index 99e1927d..1a24209d 100644 --- a/transports/websocket/Cargo.toml +++ b/transports/websocket/Cargo.toml @@ -13,7 +13,9 @@ rw-stream-sink = { path = "../../misc/rw-stream-sink" } tokio-io = "0.1" [target.'cfg(not(target_os = "emscripten"))'.dependencies] -websocket = { version = "0.20.2", default-features = false, features = ["async", "async-ssl"] } +# TODO: restore the upstream version once the branch is merged +websocket = { git = "https://github.com/tomaka/rust-websocket", branch = "send", default-features = false, features = ["async", "async-ssl"] } +#websocket = { version = "0.20.2", default-features = false, features = ["async", "async-ssl"] } [target.'cfg(target_os = "emscripten")'.dependencies] stdweb = { version = "0.1.3", default-features = false } diff --git a/transports/websocket/src/browser.rs b/transports/websocket/src/browser.rs index 15f28629..3bae2b70 100644 --- a/transports/websocket/src/browser.rs +++ b/transports/websocket/src/browser.rs @@ -54,10 +54,10 @@ impl BrowserWsConfig { impl Transport for BrowserWsConfig { type Output = BrowserWsConn; type MultiaddrFuture = FutureResult; - type Listener = Box>; // TODO: use `!` + type Listener = Box + Send>; // TODO: use `!` type ListenerUpgrade = - Box>; // TODO: use `!` - type Dial = Box>; + Box + Send>; // TODO: use `!` + type Dial = Box + Send>; #[inline] fn listen_on(self, a: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { diff --git a/transports/websocket/src/desktop.rs b/transports/websocket/src/desktop.rs index 9a741bf0..ece2943e 100644 --- a/transports/websocket/src/desktop.rs +++ b/transports/websocket/src/desktop.rs @@ -59,16 +59,20 @@ impl Transport for WsConfig where // TODO: this 'static is pretty arbitrary and is necessary because of the websocket library T: Transport + 'static, + T::Dial: Send, + T::MultiaddrFuture: Send, + T::Listener: Send, + T::ListenerUpgrade: Send, // TODO: this Send is pretty arbitrary and is necessary because of the websocket library T::Output: AsyncRead + AsyncWrite + Send, { - type Output = Box; - type MultiaddrFuture = Box>; + type Output = Box; + type MultiaddrFuture = Box + Send>; type Listener = stream::Map::ListenerUpgrade) -> Self::ListenerUpgrade>; type ListenerUpgrade = - Box>; - type Dial = Box>; + Box + Send>; + type Dial = Box + Send>; fn listen_on( self, @@ -139,16 +143,16 @@ where .map(|v| v.expect("we only take while this is Some")); let read_write = RwStreamSink::new(framed_data); - Box::new(read_write) as Box + Box::new(read_write) as Box }) }) - .map(|s| Box::new(Ok(s).into_future()) as Box>) + .map(|s| Box::new(Ok(s).into_future()) as Box + Send>) .into_future() .flatten() - .map(move |v| (v, Box::new(client_addr) as Box>)) + .map(move |v| (v, Box::new(client_addr) as Box + Send>)) }); - Box::new(upgraded) as Box> + Box::new(upgraded) as Box + Send> }); Ok((listen, new_addr)) @@ -198,7 +202,7 @@ where addr.append(AddrComponent::WS); }; addr - })) as Box>; + })) as Box + Send>; ClientBuilder::new(&ws_addr) .expect("generated ws address is always valid") @@ -223,7 +227,7 @@ where } }); let read_write = RwStreamSink::new(framed_data); - Box::new(read_write) as Box + Box::new(read_write) as Box }) .map(move |c| (c, client_addr)) });