Implement Send everywhere (#458)

This commit is contained in:
Pierre Krieger
2018-09-06 09:54:35 +02:00
committed by GitHub
parent bca1083d9c
commit ea1f172397
36 changed files with 367 additions and 232 deletions

View File

@ -129,7 +129,7 @@ enum PeerState<D, M> where M: StreamMuxer {
// TODO: stronger Future type
Pending {
/// Future that produces the muxer.
future: Box<Future<Item = ((D, M), Multiaddr), Error = IoError>>,
future: Box<Future<Item = ((D, M), Multiaddr), Error = IoError> + Send>,
/// All the tasks to notify when `future` resolves.
notify: FnvHashMap<usize, task::Task>,
},
@ -164,15 +164,19 @@ where
impl<T, D, M> Transport for ConnectionReuse<T, D, M>
where
T: Transport + 'static, // TODO: 'static :(
T: Transport + Send + 'static, // TODO: 'static :(
T::Dial: Send,
T::MultiaddrFuture: Send,
T::Listener: Send,
T::ListenerUpgrade: Send,
T: Transport<Output = (D, M)> + Clone + 'static, // TODO: 'static :(
M: StreamMuxer + 'static,
D: Clone + 'static,
M: Send + Sync + StreamMuxer + 'static,
D: Send + Clone + 'static,
T: Clone,
{
type Output = (D, ConnectionReuseSubstream<T, D, M>);
type MultiaddrFuture = future::FutureResult<Multiaddr, IoError>;
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError>>;
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError> + Send>;
type ListenerUpgrade = FutureResult<(Self::Output, Self::MultiaddrFuture), IoError>;
type Dial = ConnectionReuseDial<T, D, M>;
@ -245,10 +249,14 @@ where
impl<T, D, M> MuxedTransport for ConnectionReuse<T, D, M>
where
T: Transport + 'static, // TODO: 'static :(
T: Transport + Send + 'static, // TODO: 'static :(
T::Dial: Send,
T::MultiaddrFuture: Send,
T::Listener: Send,
T::ListenerUpgrade: Send,
T: Transport<Output = (D, M)> + Clone + 'static, // TODO: 'static :(
M: StreamMuxer + 'static,
D: Clone + 'static,
M: Send + Sync + StreamMuxer + 'static,
D: Send + Clone + 'static,
T: Clone,
{
type Incoming = ConnectionReuseIncoming<T, D, M>;
@ -305,10 +313,10 @@ where
impl<T, D, M> Future for ConnectionReuseDial<T, D, M>
where
T: Transport<Output = (D, M)> + Clone,
M: StreamMuxer + 'static,
D: Clone + 'static,
<T as Transport>::Dial: 'static,
<T as Transport>::MultiaddrFuture: 'static,
M: Send + StreamMuxer + 'static,
D: Send + Clone + 'static,
<T as Transport>::Dial: Send + 'static,
<T as Transport>::MultiaddrFuture: Send + 'static,
{
type Item = ((D, ConnectionReuseSubstream<T, D, M>), FutureResult<Multiaddr, IoError>);
type Error = IoError;
@ -471,7 +479,7 @@ where
/// Identifier for this listener. Used to determine which connections were opened by it.
listener_id: u64,
/// Opened connections that need to be upgraded.
current_upgrades: FuturesUnordered<Box<Future<Item = (T::Output, Multiaddr), Error = IoError>>>,
current_upgrades: FuturesUnordered<Box<Future<Item = (T::Output, Multiaddr), Error = IoError> + Send>>,
/// Shared between the whole connection reuse mechanism.
shared: Arc<Mutex<Shared<T, D, M>>>,
@ -484,7 +492,7 @@ where
M: StreamMuxer,
D: Clone,
L: Stream<Item = Lu, Error = IoError>,
Lu: Future<Item = (T::Output, Multiaddr), Error = IoError> + 'static,
Lu: Future<Item = (T::Output, Multiaddr), Error = IoError> + Send + 'static,
{
type Item = FutureResult<((D, ConnectionReuseSubstream<T, D, M>), FutureResult<Multiaddr, IoError>), IoError>;
type Error = IoError;

View File

@ -41,7 +41,7 @@ pub fn swarm<T, H, F>(
) -> (SwarmController<T, F::Future>, SwarmEvents<T, F::Future, H>)
where
T: MuxedTransport + Clone + 'static, // TODO: 'static :-/
H: FnMut(T::Output, Box<Future<Item = Multiaddr, Error = IoError>>) -> F,
H: FnMut(T::Output, Box<Future<Item = Multiaddr, Error = IoError> + Send>) -> F,
F: IntoFuture<Item = (), Error = IoError>,
{
let shared = Arc::new(Mutex::new(Shared {
@ -105,6 +105,11 @@ where
impl<T, F> SwarmController<T, F>
where
T: MuxedTransport + Clone + 'static, // TODO: 'static :-/
T::Dial: Send,
T::MultiaddrFuture: Send,
T::Listener: Send,
T::ListenerUpgrade: Send,
T::Output: Send,
F: 'static,
{
/// Asks the swarm to dial the node with the given multiaddress. The connection is then
@ -119,6 +124,8 @@ where
-> Result<impl Future<Item = (), Error = IoError>, Multiaddr>
where
Du: Transport + 'static, // TODO: 'static :-/
Du::Dial: Send,
Du::MultiaddrFuture: Send,
Du::Output: Into<T::Output>,
{
self.dial_then(multiaddr, transport, |v| v)
@ -132,8 +139,10 @@ where
-> Result<impl Future<Item = (), Error = IoError>, Multiaddr>
where
Du: Transport + 'static, // TODO: 'static :-/
Du::Dial: Send,
Du::MultiaddrFuture: Send,
Du::Output: Into<T::Output>,
TThen: FnOnce(Result<(), IoError>) -> Result<(), IoError> + 'static,
TThen: FnOnce(Result<(), IoError>) -> Result<(), IoError> + Send + 'static,
{
trace!("Swarm dialing {}", multiaddr);
@ -148,12 +157,12 @@ where
let mut then = Box::new(move |val: Result<(), IoError>| {
let then = then.take().expect("The Boxed FnMut should only be called once");
then(val);
}) as Box<FnMut(_)>;
}) as Box<FnMut(_) + Send>;
let dial = dial.then(|result| {
match result {
Ok((output, client_addr)) => {
let client_addr = Box::new(client_addr) as Box<Future<Item = _, Error = _>>;
let client_addr = Box::new(client_addr) as Box<Future<Item = _, Error = _> + Send>;
Ok((output.into(), then, client_addr))
}
Err(err) => {
@ -206,12 +215,12 @@ where
let listener = Box::new(
listener.map(|f| {
let f = f.map(|(out, maf)| {
(out, Box::new(maf) as Box<Future<Item = Multiaddr, Error = IoError>>)
(out, Box::new(maf) as Box<Future<Item = Multiaddr, Error = IoError> + Send>)
});
Box::new(f) as Box<Future<Item = _, Error = _>>
Box::new(f) as Box<Future<Item = _, Error = _> + Send>
}),
) as Box<Stream<Item = _, Error = _>>;
) as Box<Stream<Item = _, Error = _> + Send>;
shared.listeners.push((new_addr.clone(), listener.into_future()));
if let Some(task) = shared.task_to_notify.take() {
task.notify();
@ -242,7 +251,9 @@ where
impl<T, H, If, F> Stream for SwarmEvents<T, F, H>
where
T: MuxedTransport + Clone + 'static, // TODO: 'static :-/,
H: FnMut(T::Output, Box<Future<Item = Multiaddr, Error = IoError>>) -> If,
T::MultiaddrFuture: Send,
T::IncomingUpgrade: Send,
H: FnMut(T::Output, Box<Future<Item = Multiaddr, Error = IoError> + Send>) -> If,
If: IntoFuture<Future = F, Item = (), Error = IoError>,
F: Future<Item = (), Error = IoError> + 'static, // TODO: 'static :-/
{
@ -259,7 +270,7 @@ where
debug!("Swarm received new multiplexed incoming connection");
shared.next_incoming = self.transport.clone().next_incoming();
let connec = connec.map(|(out, maf)| {
(out, Box::new(maf) as Box<Future<Item = Multiaddr, Error = IoError>>)
(out, Box::new(maf) as Box<Future<Item = Multiaddr, Error = IoError> + Send>)
});
shared.listeners_upgrade.push(Box::new(connec) as Box<_>);
}
@ -393,21 +404,21 @@ struct Shared<T, F> where T: MuxedTransport + 'static {
StreamFuture<
Box<
Stream<
Item = Box<Future<Item = (T::Output, Box<Future<Item = Multiaddr, Error = IoError>>), Error = IoError>>,
Item = Box<Future<Item = (T::Output, Box<Future<Item = Multiaddr, Error = IoError> + Send>), Error = IoError> + Send>,
Error = IoError,
>,
> + Send,
>,
>,
)>,
/// Futures that upgrade an incoming listening connection to a full connection.
listeners_upgrade:
Vec<Box<Future<Item = (T::Output, Box<Future<Item = Multiaddr, Error = IoError>>), Error = IoError>>>,
Vec<Box<Future<Item = (T::Output, Box<Future<Item = Multiaddr, Error = IoError> + Send>), Error = IoError> + Send>>,
/// Futures that dial a remote address.
///
/// Contains the address we dial, so that we can cancel it if necessary.
dialers: Vec<(Multiaddr, Box<Future<Item = (T::Output, Box<FnMut(Result<(), IoError>)>, Box<Future<Item = Multiaddr, Error = IoError>>), Error = IoError>>)>,
dialers: Vec<(Multiaddr, Box<Future<Item = (T::Output, Box<FnMut(Result<(), IoError>) + Send>, Box<Future<Item = Multiaddr, Error = IoError> + Send>), Error = IoError> + Send>)>,
/// List of futures produced by the swarm closure. Must be processed to the end.
to_process: Vec<F>,

View File

@ -40,15 +40,18 @@ pub struct AndThen<T, C> {
impl<T, C, F, O, Maf> Transport for AndThen<T, C>
where
T: Transport + 'static,
C: FnOnce(T::Output, Endpoint, T::MultiaddrFuture) -> F + Clone + 'static,
F: Future<Item = (O, Maf), Error = IoError> + 'static,
T::Dial: Send,
T::Listener: Send,
T::ListenerUpgrade: Send,
C: FnOnce(T::Output, Endpoint, T::MultiaddrFuture) -> F + Clone + Send + 'static,
F: Future<Item = (O, Maf), Error = IoError> + Send + 'static,
Maf: Future<Item = Multiaddr, Error = IoError> + 'static,
{
type Output = O;
type MultiaddrFuture = Maf;
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError>>;
type ListenerUpgrade = Box<Future<Item = (O, Self::MultiaddrFuture), Error = IoError>>;
type Dial = Box<Future<Item = (O, Self::MultiaddrFuture), Error = IoError>>;
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError> + Send>;
type ListenerUpgrade = Box<Future<Item = (O, Self::MultiaddrFuture), Error = IoError> + Send>;
type Dial = Box<Future<Item = (O, Self::MultiaddrFuture), Error = IoError> + Send>;
#[inline]
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
@ -116,12 +119,17 @@ where
impl<T, C, F, O, Maf> MuxedTransport for AndThen<T, C>
where
T: MuxedTransport + 'static,
C: FnOnce(T::Output, Endpoint, T::MultiaddrFuture) -> F + Clone + 'static,
F: Future<Item = (O, Maf), Error = IoError> + 'static,
T::Dial: Send,
T::Listener: Send,
T::ListenerUpgrade: Send,
T::Incoming: Send,
T::IncomingUpgrade: Send,
C: FnOnce(T::Output, Endpoint, T::MultiaddrFuture) -> F + Clone + Send + 'static,
F: Future<Item = (O, Maf), Error = IoError> + Send + 'static,
Maf: Future<Item = Multiaddr, Error = IoError> + 'static,
{
type Incoming = Box<Future<Item = Self::IncomingUpgrade, Error = IoError>>;
type IncomingUpgrade = Box<Future<Item = (O, Self::MultiaddrFuture), Error = IoError>>;
type Incoming = Box<Future<Item = Self::IncomingUpgrade, Error = IoError> + Send>;
type IncomingUpgrade = Box<Future<Item = (O, Self::MultiaddrFuture), Error = IoError> + Send>;
#[inline]
fn next_incoming(self) -> Self::Incoming {
@ -134,7 +142,7 @@ where
upgrade(connection, Endpoint::Listener, client_addr)
});
Box::new(future) as Box<Future<Item = _, Error = _>>
Box::new(future) as Box<Future<Item = _, Error = _> + Send>
});
Box::new(future) as Box<_>

View File

@ -85,26 +85,26 @@ impl<A, B> MuxedTransport for OrTransport<A, B>
where
A: MuxedTransport,
B: MuxedTransport,
A::Incoming: 'static, // TODO: meh :-/
B::Incoming: 'static, // TODO: meh :-/
A::IncomingUpgrade: 'static, // TODO: meh :-/
B::IncomingUpgrade: 'static, // TODO: meh :-/
A::Incoming: Send + 'static, // TODO: meh :-/
B::Incoming: Send + 'static, // TODO: meh :-/
A::IncomingUpgrade: Send + 'static, // TODO: meh :-/
B::IncomingUpgrade: Send + 'static, // TODO: meh :-/
A::Output: 'static, // TODO: meh :-/
B::Output: 'static, // TODO: meh :-/
{
type Incoming = Box<Future<Item = Self::IncomingUpgrade, Error = IoError>>;
type Incoming = Box<Future<Item = Self::IncomingUpgrade, Error = IoError> + Send>;
type IncomingUpgrade =
Box<Future<Item = (EitherOutput<A::Output, B::Output>, Self::MultiaddrFuture), Error = IoError>>;
Box<Future<Item = (EitherOutput<A::Output, B::Output>, Self::MultiaddrFuture), Error = IoError> + Send>;
#[inline]
fn next_incoming(self) -> Self::Incoming {
let first = self.0.next_incoming().map(|out| {
let fut = out.map(move |(v, addr)| (EitherOutput::First(v), future::Either::A(addr)));
Box::new(fut) as Box<Future<Item = _, Error = _>>
Box::new(fut) as Box<Future<Item = _, Error = _> + Send>
});
let second = self.1.next_incoming().map(|out| {
let fut = out.map(move |(v, addr)| (EitherOutput::Second(v), future::Either::B(addr)));
Box::new(fut) as Box<Future<Item = _, Error = _>>
Box::new(fut) as Box<Future<Item = _, Error = _> + Send>
});
let future = first.select(second).map(|(i, _)| i).map_err(|(e, _)| e);
Box::new(future) as Box<_>

View File

@ -42,13 +42,16 @@ impl<T, F> Map<T, F> {
impl<T, F, D> Transport for Map<T, F>
where
T: Transport + 'static, // TODO: 'static :-/
F: FnOnce(T::Output, Endpoint) -> D + Clone + 'static, // TODO: 'static :-/
T::Dial: Send,
T::Listener: Send,
T::ListenerUpgrade: Send,
F: FnOnce(T::Output, Endpoint) -> D + Clone + Send + 'static, // TODO: 'static :-/
{
type Output = D;
type MultiaddrFuture = T::MultiaddrFuture;
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError>>;
type ListenerUpgrade = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError>>;
type Dial = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError>>;
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError> + Send>;
type ListenerUpgrade = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError> + Send>;
type Dial = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError> + Send>;
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
let map = self.map;
@ -91,10 +94,15 @@ where
impl<T, F, D> MuxedTransport for Map<T, F>
where
T: MuxedTransport + 'static, // TODO: 'static :-/
F: FnOnce(T::Output, Endpoint) -> D + Clone + 'static, // TODO: 'static :-/
T::Dial: Send,
T::Listener: Send,
T::ListenerUpgrade: Send,
T::Incoming: Send,
T::IncomingUpgrade: Send,
F: FnOnce(T::Output, Endpoint) -> D + Clone + Send + 'static, // TODO: 'static :-/
{
type Incoming = Box<Future<Item = Self::IncomingUpgrade, Error = IoError>>;
type IncomingUpgrade = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError>>;
type Incoming = Box<Future<Item = Self::IncomingUpgrade, Error = IoError> + Send>;
type IncomingUpgrade = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError> + Send>;
fn next_incoming(self) -> Self::Incoming {
let map = self.map;

View File

@ -41,13 +41,14 @@ impl<T, F> MapErrDial<T, F> {
impl<T, F> Transport for MapErrDial<T, F>
where
T: Transport + 'static, // TODO: 'static :-/
F: FnOnce(IoError, Multiaddr) -> IoError + Clone + 'static, // TODO: 'static :-/
T::Dial: Send,
F: FnOnce(IoError, Multiaddr) -> IoError + Clone + Send + 'static, // TODO: 'static :-/
{
type Output = T::Output;
type MultiaddrFuture = T::MultiaddrFuture;
type Listener = T::Listener;
type ListenerUpgrade = T::ListenerUpgrade;
type Dial = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError>>;
type Dial = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError> + Send>;
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
match self.transport.listen_on(addr) {
@ -77,7 +78,8 @@ where
impl<T, F> MuxedTransport for MapErrDial<T, F>
where
T: MuxedTransport + 'static, // TODO: 'static :-/
F: FnOnce(IoError, Multiaddr) -> IoError + Clone + 'static, // TODO: 'static :-/
T::Dial: Send,
F: FnOnce(IoError, Multiaddr) -> IoError + Clone + Send + 'static, // TODO: 'static :-/
{
type Incoming = T::Incoming;
type IncomingUpgrade = T::IncomingUpgrade;

View File

@ -50,12 +50,12 @@ impl<T> Clone for Dialer<T> {
}
}
impl<T: IntoBuf + 'static> Transport for Dialer<T> {
impl<T: IntoBuf + Send + 'static> Transport for Dialer<T> {
type Output = Channel<T>;
type Listener = Box<Stream<Item=Self::ListenerUpgrade, Error=io::Error>>;
type Listener = Box<Stream<Item=Self::ListenerUpgrade, Error=io::Error> + Send>;
type ListenerUpgrade = FutureResult<(Self::Output, Self::MultiaddrFuture), io::Error>;
type MultiaddrFuture = FutureResult<Multiaddr, io::Error>;
type Dial = Box<Future<Item=(Self::Output, Self::MultiaddrFuture), Error=io::Error>>;
type Dial = Box<Future<Item=(Self::Output, Self::MultiaddrFuture), Error=io::Error> + Send>;
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
Err((self, addr))
@ -93,12 +93,12 @@ impl<T> Clone for Listener<T> {
}
}
impl<T: IntoBuf + 'static> Transport for Listener<T> {
impl<T: IntoBuf + Send + 'static> Transport for Listener<T> {
type Output = Channel<T>;
type Listener = Box<Stream<Item=Self::ListenerUpgrade, Error=io::Error>>;
type Listener = Box<Stream<Item=Self::ListenerUpgrade, Error=io::Error> + Send>;
type ListenerUpgrade = FutureResult<(Self::Output, Self::MultiaddrFuture), io::Error>;
type MultiaddrFuture = FutureResult<Multiaddr, io::Error>;
type Dial = Box<Future<Item=(Self::Output, Self::MultiaddrFuture), Error=io::Error>>;
type Dial = Box<Future<Item=(Self::Output, Self::MultiaddrFuture), Error=io::Error> + Send>;
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
if !is_memory_addr(&addr) {
@ -200,12 +200,12 @@ mod tests {
#[derive(Clone)]
struct Echo(mpsc::UnboundedSender<()>);
impl<Maf: 'static> ConnectionUpgrade<memory::Channel<Bytes>, Maf> for Echo {
impl<Maf: Send + 'static> ConnectionUpgrade<memory::Channel<Bytes>, Maf> for Echo {
type NamesIter = iter::Once<(Bytes, ())>;
type UpgradeIdentifier = ();
type Output = ();
type MultiaddrFuture = Maf;
type Future = Box<Future<Item=(Self::Output, Self::MultiaddrFuture), Error=io::Error>>;
type Future = Box<Future<Item=(Self::Output, Self::MultiaddrFuture), Error=io::Error> + Send>;
fn protocol_names(&self) -> Self::NamesIter {
iter::once(("/echo/1.0.0".into(), ()))

View File

@ -47,8 +47,15 @@ impl<T, C> UpgradedNode<T, C> {
impl<'a, T, C> UpgradedNode<T, C>
where
T: Transport + 'a,
T::Output: AsyncRead + AsyncWrite,
C: ConnectionUpgrade<T::Output, T::MultiaddrFuture> + 'a,
T::Dial: Send,
T::Listener: Send,
T::ListenerUpgrade: Send,
T::MultiaddrFuture: Send,
T::Output: Send + AsyncRead + AsyncWrite,
C: ConnectionUpgrade<T::Output, T::MultiaddrFuture> + Send + 'a,
C::NamesIter: Send,
C::Future: Send,
C::UpgradeIdentifier: Send,
{
/// Returns a reference to the inner `Transport`.
#[inline]
@ -65,7 +72,7 @@ where
pub fn dial(
self,
addr: Multiaddr,
) -> Result<Box<Future<Item = (C::Output, C::MultiaddrFuture), Error = IoError> + 'a>, (Self, Multiaddr)>
) -> Result<Box<Future<Item = (C::Output, C::MultiaddrFuture), Error = IoError> + Send + 'a>, (Self, Multiaddr)>
where
C::NamesIter: Clone, // TODO: not elegant
{
@ -101,13 +108,15 @@ where
self,
) -> Box<
Future<
Item = Box<Future<Item = (C::Output, C::MultiaddrFuture), Error = IoError> + 'a>,
Item = Box<Future<Item = (C::Output, C::MultiaddrFuture), Error = IoError> + Send + 'a>,
Error = IoError,
>
+ 'a,
+ Send + 'a,
>
where
T: MuxedTransport,
T::Incoming: Send,
T::IncomingUpgrade: Send,
C::NamesIter: Clone, // TODO: not elegant
C: Clone,
{
@ -119,7 +128,7 @@ where
apply(connection, upgrade, Endpoint::Listener, client_addr)
});
Box::new(future) as Box<Future<Item = _, Error = _>>
Box::new(future) as Box<Future<Item = _, Error = _> + Send>
});
Box::new(future) as Box<_>
@ -138,9 +147,10 @@ where
(
Box<
Stream<
Item = Box<Future<Item = (C::Output, C::MultiaddrFuture), Error = IoError> + 'a>,
Item = Box<Future<Item = (C::Output, C::MultiaddrFuture), Error = IoError> + Send + 'a>,
Error = IoError,
>
+ Send
+ 'a,
>,
Multiaddr,
@ -187,17 +197,22 @@ where
impl<T, C> Transport for UpgradedNode<T, C>
where
T: Transport + 'static,
T::Output: AsyncRead + AsyncWrite,
C: ConnectionUpgrade<T::Output, T::MultiaddrFuture> + 'static,
T::Dial: Send,
T::Listener: Send,
T::ListenerUpgrade: Send,
T::MultiaddrFuture: Send,
T::Output: Send + AsyncRead + AsyncWrite,
C: ConnectionUpgrade<T::Output, T::MultiaddrFuture> + Clone + Send + 'static,
C::MultiaddrFuture: Future<Item = Multiaddr, Error = IoError>,
C::NamesIter: Clone, // TODO: not elegant
C: Clone,
C::NamesIter: Clone + Send,
C::Future: Send,
C::UpgradeIdentifier: Send,
{
type Output = C::Output;
type MultiaddrFuture = C::MultiaddrFuture;
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError>>;
type ListenerUpgrade = Box<Future<Item = (C::Output, Self::MultiaddrFuture), Error = IoError>>;
type Dial = Box<Future<Item = (C::Output, Self::MultiaddrFuture), Error = IoError>>;
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError> + Send>;
type ListenerUpgrade = Box<Future<Item = (C::Output, Self::MultiaddrFuture), Error = IoError> + Send>;
type Dial = Box<Future<Item = (C::Output, Self::MultiaddrFuture), Error = IoError> + Send>;
#[inline]
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
@ -218,14 +233,21 @@ where
impl<T, C> MuxedTransport for UpgradedNode<T, C>
where
T: MuxedTransport + 'static,
T::Output: AsyncRead + AsyncWrite,
C: ConnectionUpgrade<T::Output, T::MultiaddrFuture> + 'static,
T::Dial: Send,
T::Listener: Send,
T::ListenerUpgrade: Send,
T::MultiaddrFuture: Send,
T::Output: Send + AsyncRead + AsyncWrite,
T::Incoming: Send,
T::IncomingUpgrade: Send,
C: ConnectionUpgrade<T::Output, T::MultiaddrFuture> + Clone + Send + 'static,
C::MultiaddrFuture: Future<Item = Multiaddr, Error = IoError>,
C::NamesIter: Clone, // TODO: not elegant
C: Clone,
C::NamesIter: Clone + Send,
C::Future: Send,
C::UpgradeIdentifier: Send,
{
type Incoming = Box<Future<Item = Self::IncomingUpgrade, Error = IoError>>;
type IncomingUpgrade = Box<Future<Item = (C::Output, Self::MultiaddrFuture), Error = IoError>>;
type Incoming = Box<Future<Item = Self::IncomingUpgrade, Error = IoError> + Send>;
type IncomingUpgrade = Box<Future<Item = (C::Output, Self::MultiaddrFuture), Error = IoError> + Send>;
#[inline]
fn next_incoming(self) -> Self::Incoming {

View File

@ -101,10 +101,17 @@ impl<T> UniqueConnec<T> {
#[inline]
pub fn dial<S, F, Du>(&self, swarm: &SwarmController<S, F>, multiaddr: &Multiaddr,
transport: Du) -> UniqueConnecFuture<T>
where T: Clone + 'static, // TODO: 'static :-/
where T: Clone + Send + 'static, // TODO: 'static :-/
Du: Transport + 'static, // TODO: 'static :-/
Du::Output: Into<S::Output>,
Du::Dial: Send,
Du::MultiaddrFuture: Send,
S: Clone + MuxedTransport,
S::Dial: Send,
S::Listener: Send,
S::ListenerUpgrade: Send,
S::Output: Send,
S::MultiaddrFuture: Send,
F: 'static,
{
self.dial_inner(swarm, multiaddr, transport, true)
@ -115,10 +122,17 @@ impl<T> UniqueConnec<T> {
#[inline]
pub fn dial_if_empty<S, F, Du>(&self, swarm: &SwarmController<S, F>, multiaddr: &Multiaddr,
transport: Du) -> UniqueConnecFuture<T>
where T: Clone + 'static, // TODO: 'static :-/
where T: Clone + Send + 'static, // TODO: 'static :-/
Du: Transport + 'static, // TODO: 'static :-/
Du::Output: Into<S::Output>,
Du::Dial: Send,
Du::MultiaddrFuture: Send,
S: Clone + MuxedTransport,
S::Dial: Send,
S::Listener: Send,
S::ListenerUpgrade: Send,
S::Output: Send,
S::MultiaddrFuture: Send,
F: 'static,
{
self.dial_inner(swarm, multiaddr, transport, false)
@ -127,10 +141,17 @@ impl<T> UniqueConnec<T> {
/// Inner implementation of `dial_*`.
fn dial_inner<S, F, Du>(&self, swarm: &SwarmController<S, F>, multiaddr: &Multiaddr,
transport: Du, dial_if_err: bool) -> UniqueConnecFuture<T>
where T: Clone + 'static, // TODO: 'static :-/
where T: Clone + Send + 'static, // TODO: 'static :-/
Du: Transport + 'static, // TODO: 'static :-/
Du::Output: Into<S::Output>,
Du::Dial: Send,
Du::MultiaddrFuture: Send,
S: Clone + MuxedTransport,
S::Dial: Send,
S::Listener: Send,
S::ListenerUpgrade: Send,
S::Output: Send,
S::MultiaddrFuture: Send,
F: 'static,
{
let mut inner = self.inner.lock();

View File

@ -64,18 +64,21 @@ pub struct LoopUpg<Inner> {
impl<State, Socket, Inner, Out, AddrFut> ConnectionUpgrade<(State, Socket), AddrFut>
for LoopUpg<Inner>
where
State: 'static,
Socket: AsyncRead + AsyncWrite + 'static,
State: Send + 'static,
Socket: AsyncRead + AsyncWrite + Send + 'static,
Inner: ConnectionUpgrade<
(State, Socket),
AddrFut,
Output = Loop<State, Socket, Out>,
MultiaddrFuture = AddrFut,
> + Clone
+ Send
+ 'static,
Inner::NamesIter: Clone + 'static,
AddrFut: 'static,
Out: 'static,
Inner::NamesIter: Clone + Send + 'static,
Inner::UpgradeIdentifier: Send,
Inner::Future: Send,
AddrFut: Send + 'static,
Out: Send + 'static,
{
type NamesIter = Inner::NamesIter;
type UpgradeIdentifier = Inner::UpgradeIdentifier;
@ -86,7 +89,7 @@ where
type Output = Out;
type MultiaddrFuture = AddrFut;
type Future = Box<Future<Item = (Out, Self::MultiaddrFuture), Error = IoError>>;
type Future = Box<Future<Item = (Out, Self::MultiaddrFuture), Error = IoError> + Send>;
fn upgrade(
self,

View File

@ -39,9 +39,9 @@ pub struct Map<U, F> {
impl<C, U, F, O, Maf> ConnectionUpgrade<C, Maf> for Map<U, F>
where
U: ConnectionUpgrade<C, Maf>,
U::Future: 'static, // TODO: 'static :(
U::Future: Send + 'static, // TODO: 'static :(
C: AsyncRead + AsyncWrite,
F: FnOnce(U::Output) -> O + 'static, // TODO: 'static :(
F: FnOnce(U::Output) -> O + Send + 'static, // TODO: 'static :(
{
type NamesIter = U::NamesIter;
type UpgradeIdentifier = U::UpgradeIdentifier;
@ -52,7 +52,7 @@ where
type Output = O;
type MultiaddrFuture = U::MultiaddrFuture;
type Future = Box<Future<Item = (O, Self::MultiaddrFuture), Error = IoError>>;
type Future = Box<Future<Item = (O, Self::MultiaddrFuture), Error = IoError> + Send>;
fn upgrade(
self,

View File

@ -42,11 +42,11 @@ pub struct MapAddr<U, F> {
impl<C, U, F, O, Maf> ConnectionUpgrade<C, Maf> for MapAddr<U, F>
where
U: ConnectionUpgrade<C, Maf>,
U::Future: 'static, // TODO: 'static :(
U::MultiaddrFuture: Future<Item = Multiaddr, Error = IoError> + 'static, // TODO: 'static :(
U::Output: 'static, // TODO: 'static :(
U::Future: Send + 'static, // TODO: 'static :(
U::MultiaddrFuture: Future<Item = Multiaddr, Error = IoError> + Send + 'static, // TODO: 'static :(
U::Output: Send + 'static, // TODO: 'static :(
C: AsyncRead + AsyncWrite,
F: FnOnce(U::Output, &Multiaddr) -> O + 'static, // TODO: 'static :(
F: FnOnce(U::Output, &Multiaddr) -> O + Send + 'static, // TODO: 'static :(
{
type NamesIter = U::NamesIter;
type UpgradeIdentifier = U::UpgradeIdentifier;
@ -57,7 +57,7 @@ where
type Output = O;
type MultiaddrFuture = future::FutureResult<Multiaddr, IoError>;
type Future = Box<Future<Item = (O, Self::MultiaddrFuture), Error = IoError>>;
type Future = Box<Future<Item = (O, Self::MultiaddrFuture), Error = IoError> + Send>;
fn upgrade(
self,

View File

@ -10,7 +10,7 @@ futures = { version = "0.1" }
log = "0.4"
smallvec = "0.5"
tokio-io = "0.1"
unsigned-varint = { version = "0.2", features = ["codec"] }
unsigned-varint = { version = "0.2.1", features = ["codec"] }
[dev-dependencies]
tokio-current-thread = "0.1"

View File

@ -13,7 +13,7 @@ log = "0.4"
parking_lot = "0.6"
tokio-codec = "0.1"
tokio-io = "0.1"
unsigned-varint = { version = "0.2", features = ["codec"] }
unsigned-varint = { version = "0.2.1", features = ["codec"] }
[dev-dependencies]
libp2p-tcp-transport = { path = "../../transports/tcp" }

View File

@ -18,4 +18,4 @@ protobuf = "2.0.2"
smallvec = "0.6.0"
tokio-codec = "0.1"
tokio-io = "0.1"
unsigned-varint = { version = "0.2", features = ["codec"] }
unsigned-varint = { version = "0.2.1", features = ["codec"] }

View File

@ -91,8 +91,8 @@ impl FloodSubUpgrade {
impl<C, Maf> ConnectionUpgrade<C, Maf> for FloodSubUpgrade
where
C: AsyncRead + AsyncWrite + 'static,
Maf: Future<Item = Multiaddr, Error = IoError> + 'static,
C: AsyncRead + AsyncWrite + Send + 'static,
Maf: Future<Item = Multiaddr, Error = IoError> + Send + 'static,
{
type NamesIter = iter::Once<(Bytes, Self::UpgradeIdentifier)>;
type UpgradeIdentifier = ();
@ -104,7 +104,7 @@ where
type Output = FloodSubFuture;
type MultiaddrFuture = future::FutureResult<Multiaddr, IoError>;
type Future = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError>>;
type Future = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError> + Send>;
#[inline]
fn upgrade(
@ -208,7 +208,7 @@ where
// TODO: what if multiple connections?
inner.remote_connections.write().remove(&remote_addr);
let future = future::ok(future::Loop::Break(()));
Box::new(future) as Box<Future<Item = _, Error = _>>
Box::new(future) as Box<Future<Item = _, Error = _> + Send>
}
}
})
@ -521,7 +521,7 @@ pub struct Message {
/// Implementation of `Future` that must be driven to completion in order for floodsub to work.
#[must_use = "futures do nothing unless polled"]
pub struct FloodSubFuture {
inner: Box<Future<Item = (), Error = IoError>>,
inner: Box<Future<Item = (), Error = IoError> + Send>,
}
impl Future for FloodSubFuture {

View File

@ -16,7 +16,7 @@ parking_lot = "0.6"
protobuf = "2.0.2"
tokio-codec = "0.1"
tokio-io = "0.1.0"
unsigned-varint = { version = "0.2", features = ["codec"] }
unsigned-varint = { version = "0.2.1", features = ["codec"] }
[dev-dependencies]
libp2p-tcp-transport = { path = "../../transports/tcp" }

View File

@ -48,7 +48,7 @@ impl<Trans> Clone for IdentifyTransport<Trans>
}
}
type CacheEntry = future::Shared<Box<Future<Item = IdentifyTransportOutcome, Error = IoError>>>;
type CacheEntry = future::Shared<Box<Future<Item = IdentifyTransportOutcome, Error = IoError> + Send>>;
impl<Trans> IdentifyTransport<Trans> {
/// Creates an `IdentifyTransport` that wraps around the given transport and peerstore.
@ -63,14 +63,18 @@ impl<Trans> IdentifyTransport<Trans> {
impl<Trans> Transport for IdentifyTransport<Trans>
where
Trans: Transport + Clone + 'static, // TODO: 'static :(
Trans::Output: AsyncRead + AsyncWrite,
Trans: Transport + Clone + Send + 'static, // TODO: 'static :(
Trans::Dial: Send,
Trans::Listener: Send,
Trans::ListenerUpgrade: Send,
Trans::MultiaddrFuture: Send,
Trans::Output: AsyncRead + AsyncWrite + Send,
{
type Output = IdentifyTransportOutput<Trans::Output>;
type MultiaddrFuture = future::FutureResult<Multiaddr, IoError>;
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError>>;
type ListenerUpgrade = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError>>;
type Dial = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError>>;
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError> + Send>;
type ListenerUpgrade = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError> + Send>;
type Dial = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError> + Send>;
#[inline]
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
@ -139,7 +143,7 @@ where
Ok((out, future::ok(client_addr)))
});
Box::new(fut) as Box<Future<Item = _, Error = _>>
Box::new(fut) as Box<Future<Item = _, Error = _> + Send>
});
Ok((Box::new(listener) as Box<_>, new_addr))
@ -213,11 +217,17 @@ where
impl<Trans> MuxedTransport for IdentifyTransport<Trans>
where
Trans: MuxedTransport + Clone + 'static,
Trans::Output: AsyncRead + AsyncWrite,
Trans: MuxedTransport + Clone + Send + 'static,
Trans::Dial: Send,
Trans::Listener: Send,
Trans::ListenerUpgrade: Send,
Trans::MultiaddrFuture: Send,
Trans::Output: AsyncRead + AsyncWrite + Send,
Trans::Incoming: Send,
Trans::IncomingUpgrade: Send,
{
type Incoming = Box<Future<Item = Self::IncomingUpgrade, Error = IoError>>;
type IncomingUpgrade = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError>>;
type Incoming = Box<Future<Item = Self::IncomingUpgrade, Error = IoError> + Send>;
type IncomingUpgrade = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError> + Send>;
#[inline]
fn next_incoming(self) -> Self::Incoming {
@ -274,7 +284,7 @@ where
Ok((out, future::ok(client_addr)))
});
Box::new(future) as Box<Future<Item = _, Error = _>>
Box::new(future) as Box<Future<Item = _, Error = _> + Send>
});
Box::new(future) as Box<_>
@ -286,7 +296,7 @@ pub struct IdentifyTransportOutput<S> {
/// The socket to communicate with the remote.
pub socket: S,
/// Outcome of the identification of the remote.
pub info: Box<Future<Item = IdentifyTransportOutcome, Error = IoError>>,
pub info: Box<Future<Item = IdentifyTransportOutcome, Error = IoError> + Send>,
}
/// Outcome of the identification of the remote.
@ -301,7 +311,7 @@ pub struct IdentifyTransportOutcome {
fn cache_entry<F, Fut>(cache: &Mutex<FnvHashMap<Multiaddr, CacheEntry>>, addr: Multiaddr, if_no_entry: F)
-> impl Future<Item = IdentifyTransportOutcome, Error = IoError>
where F: FnOnce() -> Fut,
Fut: Future<Item = IdentifyTransportOutcome, Error = IoError> + 'static,
Fut: Future<Item = IdentifyTransportOutcome, Error = IoError> + Send + 'static,
{
trace!("Looking up cache entry for {}", addr);
let mut cache = cache.lock();
@ -313,7 +323,7 @@ where F: FnOnce() -> Fut,
Entry::Vacant(entry) => {
trace!("No cache entry available");
let future = (Box::new(if_no_entry()) as Box<Future<Item = _, Error = _>>).shared();
let future = (Box::new(if_no_entry()) as Box<Future<Item = _, Error = _> + Send>).shared();
entry.insert(future.clone());
future::Either::B(future)
},

View File

@ -45,16 +45,21 @@ impl<Trans, AddrRes> PeerIdTransport<Trans, AddrRes> {
impl<Trans, AddrRes, AddrResOut> Transport for PeerIdTransport<Trans, AddrRes>
where
Trans: Transport + Clone + 'static, // TODO: 'static :(
Trans::Output: AsyncRead + AsyncWrite,
Trans: Transport + Clone + Send + 'static, // TODO: 'static :(
Trans::Dial: Send,
Trans::Listener: Send,
Trans::ListenerUpgrade: Send,
Trans::MultiaddrFuture: Send,
Trans::Output: AsyncRead + AsyncWrite + Send,
AddrRes: Fn(PeerId) -> AddrResOut + 'static, // TODO: 'static :(
AddrResOut: IntoIterator<Item = Multiaddr> + 'static, // TODO: 'static :(
AddrResOut::IntoIter: Send,
{
type Output = PeerIdTransportOutput<Trans::Output>;
type MultiaddrFuture = Box<Future<Item = Multiaddr, Error = IoError>>;
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError>>;
type ListenerUpgrade = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError>>;
type Dial = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError>>;
type MultiaddrFuture = Box<Future<Item = Multiaddr, Error = IoError> + Send>;
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError> + Send>;
type ListenerUpgrade = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError> + Send>;
type Dial = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError> + Send>;
#[inline]
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
@ -94,11 +99,11 @@ where
let peer_id = info.info.public_key.clone().into_peer_id();
debug!("Identified {} as {:?}", original_addr, peer_id);
AddrComponent::P2P(peer_id.into()).into()
})) as Box<Future<Item = _, Error = _>>;
})) as Box<Future<Item = _, Error = _> + Send>;
(out, real_addr)
});
Box::new(fut) as Box<Future<Item = _, Error = _>>
Box::new(fut) as Box<Future<Item = _, Error = _> + Send>
});
Ok((Box::new(listener) as Box<_>, listened_addr))
@ -155,7 +160,7 @@ where
original_addr: original_addr,
};
// Replace the multiaddress with the one of the form `/p2p/...` or `/ipfs/...`.
Ok((out, Box::new(future::ok(addr)) as Box<Future<Item = _, Error = _>>))
Ok((out, Box::new(future::ok(addr)) as Box<Future<Item = _, Error = _> + Send>))
});
Ok(Box::new(future) as Box<_>)
@ -195,7 +200,7 @@ where
let peer_id = info.info.public_key.clone().into_peer_id();
debug!("Identified {} as {:?}", original_addr, peer_id);
AddrComponent::P2P(peer_id.into()).into()
})) as Box<Future<Item = _, Error = _>>;
})) as Box<Future<Item = _, Error = _> + Send>;
(out, real_addr)
});
@ -212,13 +217,20 @@ where
impl<Trans, AddrRes, AddrResOut> MuxedTransport for PeerIdTransport<Trans, AddrRes>
where
Trans: MuxedTransport + Clone + 'static,
Trans::Output: AsyncRead + AsyncWrite,
Trans: MuxedTransport + Clone + Send + 'static,
Trans::Dial: Send,
Trans::Listener: Send,
Trans::ListenerUpgrade: Send,
Trans::MultiaddrFuture: Send,
Trans::Output: AsyncRead + AsyncWrite + Send,
Trans::Incoming: Send,
Trans::IncomingUpgrade: Send,
AddrRes: Fn(PeerId) -> AddrResOut + 'static, // TODO: 'static :(
AddrResOut: IntoIterator<Item = Multiaddr> + 'static, // TODO: 'static :(
AddrResOut::IntoIter: Send,
{
type Incoming = Box<Future<Item = Self::IncomingUpgrade, Error = IoError>>;
type IncomingUpgrade = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError>>;
type Incoming = Box<Future<Item = Self::IncomingUpgrade, Error = IoError> + Send>;
type IncomingUpgrade = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError> + Send>;
#[inline]
fn next_incoming(self) -> Self::Incoming {
@ -243,11 +255,11 @@ where
let peer_id = info.info.public_key.clone().into_peer_id();
debug!("Identified {} as {:?}", original_addr, peer_id);
AddrComponent::P2P(peer_id.into()).into()
})) as Box<Future<Item = _, Error = _>>;
})) as Box<Future<Item = _, Error = _> + Send>;
(out, real_addr)
});
Box::new(future) as Box<Future<Item = _, Error = _>>
Box::new(future) as Box<Future<Item = _, Error = _> + Send>
});
Box::new(future) as Box<_>
@ -261,7 +273,7 @@ pub struct PeerIdTransportOutput<S> {
/// Identification of the remote.
/// This may not be known immediately, hence why we use a future.
pub info: Box<Future<Item = IdentifyTransportOutcome, Error = IoError>>,
pub info: Box<Future<Item = IdentifyTransportOutcome, Error = IoError> + Send>,
/// Original address of the remote.
/// This layer turns the address of the remote into the `/p2p/...` form, but stores the
@ -313,8 +325,8 @@ mod tests {
impl Transport for UnderlyingTrans {
type Output = <TcpConfig as Transport>::Output;
type MultiaddrFuture = <TcpConfig as Transport>::MultiaddrFuture;
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError>>;
type ListenerUpgrade = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError>>;
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError> + Send>;
type ListenerUpgrade = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError> + Send>;
type Dial = <TcpConfig as Transport>::Dial;
#[inline]
fn listen_on(

View File

@ -61,7 +61,7 @@ pub struct IdentifySender<T> {
impl<'a, T> IdentifySender<T>
where
T: AsyncWrite + 'a,
T: AsyncWrite + Send + 'a,
{
/// Sends back information to the client. Returns a future that is signalled whenever the
/// info have been sent.
@ -69,7 +69,7 @@ where
self,
info: IdentifyInfo,
observed_addr: &Multiaddr,
) -> Box<Future<Item = (), Error = IoError> + 'a> {
) -> Box<Future<Item = (), Error = IoError> + Send + 'a> {
debug!("Sending identify info to client");
trace!("Sending: {:?}", info);
@ -113,14 +113,14 @@ pub struct IdentifyInfo {
impl<C, Maf> ConnectionUpgrade<C, Maf> for IdentifyProtocolConfig
where
C: AsyncRead + AsyncWrite + 'static,
Maf: Future<Item = Multiaddr, Error = IoError> + 'static,
C: AsyncRead + AsyncWrite + Send + 'static,
Maf: Future<Item = Multiaddr, Error = IoError> + Send + 'static,
{
type NamesIter = iter::Once<(Bytes, Self::UpgradeIdentifier)>;
type UpgradeIdentifier = ();
type Output = IdentifyOutput<C>;
type MultiaddrFuture = future::Either<future::FutureResult<Multiaddr, IoError>, Maf>;
type Future = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError>>;
type Future = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError> + Send>;
#[inline]
fn protocol_names(&self) -> Self::NamesIter {

View File

@ -24,7 +24,7 @@ smallvec = "0.5"
tokio-codec = "0.1"
tokio-io = "0.1"
tokio-timer = "0.2.6"
unsigned-varint = { version = "0.2", features = ["codec"] }
unsigned-varint = { version = "0.2.1", features = ["codec"] }
[dev-dependencies]
libp2p-tcp-transport = { path = "../../transports/tcp" }

View File

@ -76,8 +76,9 @@ impl KadSystem {
/// This future should be driven to completion by the caller.
pub fn start<'a, F, Fut>(config: KadSystemConfig<impl Iterator<Item = PeerId>>, access: F)
-> (KadSystem, impl Future<Item = (), Error = IoError> + 'a)
where F: FnMut(&PeerId) -> Fut + Clone + 'a,
where F: FnMut(&PeerId) -> Fut + Send + Clone + 'a,
Fut: IntoFuture<Item = KadConnecController, Error = IoError> + 'a,
Fut::Future: Send,
{
let system = KadSystem::without_init(config);
let init_future = system.perform_initialization(access);
@ -102,8 +103,9 @@ impl KadSystem {
/// Starts an initialization process.
pub fn perform_initialization<'a, F, Fut>(&self, access: F) -> impl Future<Item = (), Error = IoError> + 'a
where F: FnMut(&PeerId) -> Fut + Clone + 'a,
where F: FnMut(&PeerId) -> Fut + Send + Clone + 'a,
Fut: IntoFuture<Item = KadConnecController, Error = IoError> + 'a,
Fut::Future: Send,
{
let futures: Vec<_> = (0..256) // TODO: 256 is arbitrary
.map(|n| {
@ -147,8 +149,9 @@ impl KadSystem {
/// Starts a query for an iterative `FIND_NODE` request.
pub fn find_node<'a, F, Fut>(&self, searched_key: PeerId, access: F)
-> impl Stream<Item = KadQueryEvent<Vec<PeerId>>, Error = IoError> + 'a
where F: FnMut(&PeerId) -> Fut + 'a,
where F: FnMut(&PeerId) -> Fut + Send + 'a,
Fut: IntoFuture<Item = KadConnecController, Error = IoError> + 'a,
Fut::Future: Send,
{
query(access, &self.kbuckets, searched_key, self.parallelism as usize,
20, self.request_timeout) // TODO: arbitrary const
@ -162,14 +165,15 @@ impl KadSystem {
fn refresh<'a, F, Fut>(bucket_num: usize, access: F, kbuckets: &KBucketsTable<PeerId, ()>,
parallelism: usize, request_timeout: Duration)
-> impl Stream<Item = KadQueryEvent<()>, Error = IoError> + 'a
where F: FnMut(&PeerId) -> Fut + 'a,
where F: FnMut(&PeerId) -> Fut + Send + 'a,
Fut: IntoFuture<Item = KadConnecController, Error = IoError> + 'a,
Fut::Future: Send,
{
let peer_id = match gen_random_id(kbuckets.my_id(), bucket_num) {
Ok(p) => p,
Err(()) => {
let stream = stream::once(Ok(KadQueryEvent::Finished(())));
return Box::new(stream) as Box<Stream<Item = _, Error = _>>;
return Box::new(stream) as Box<Stream<Item = _, Error = _> + Send>;
},
};
@ -180,7 +184,7 @@ where F: FnMut(&PeerId) -> Fut + 'a,
KadQueryEvent::Finished(_) => KadQueryEvent::Finished(()),
}
});
Box::new(stream) as Box<Stream<Item = _, Error = _>>
Box::new(stream) as Box<Stream<Item = _, Error = _> + Send>
}
// Generates a random `PeerId` that belongs to the given bucket.
@ -227,6 +231,7 @@ fn query<'a, F, Fut>(
) -> impl Stream<Item = KadQueryEvent<Vec<PeerId>>, Error = IoError> + 'a
where F: FnMut(&PeerId) -> Fut + 'a,
Fut: IntoFuture<Item = KadConnecController, Error = IoError> + 'a,
Fut::Future: Send,
{
debug!("Start query for {:?} ; num results = {}", searched_key, num_results);
@ -240,7 +245,7 @@ where F: FnMut(&PeerId) -> Fut + 'a,
result: Vec<PeerId>,
// For each open connection, a future with the response of the remote.
// Note that don't use a `SmallVec` here because `select_all` produces a `Vec`.
current_attempts_fut: Vec<Box<Future<Item = Vec<protocol::KadPeer>, Error = IoError> + 'a>>,
current_attempts_fut: Vec<Box<Future<Item = Vec<protocol::KadPeer>, Error = IoError> + Send + 'a>>,
// For each open connection, the peer ID that we are connected to.
// Must always have the same length as `current_attempts_fut`.
current_attempts_addrs: SmallVec<[PeerId; 32]>,

View File

@ -65,11 +65,11 @@ impl KadConnecConfig {
impl<C, Maf> ConnectionUpgrade<C, Maf> for KadConnecConfig
where
C: AsyncRead + AsyncWrite + 'static, // TODO: 'static :-/
C: AsyncRead + AsyncWrite + Send + 'static, // TODO: 'static :-/
{
type Output = (
KadConnecController,
Box<Stream<Item = KadIncomingRequest, Error = IoError>>,
Box<Stream<Item = KadIncomingRequest, Error = IoError> + Send>,
);
type MultiaddrFuture = Maf;
type Future = future::Map<<KademliaProtocolConfig as ConnectionUpgrade<C, Maf>>::Future, fn((<KademliaProtocolConfig as ConnectionUpgrade<C, Maf>>::Output, Maf)) -> (Self::Output, Maf)>;
@ -191,8 +191,8 @@ impl KadFindNodeRespond {
}
// Builds a controller and stream from a stream/sink of raw messages.
fn build_from_sink_stream<'a, S>(connec: S) -> (KadConnecController, Box<Stream<Item = KadIncomingRequest, Error = IoError> + 'a>)
where S: Sink<SinkItem = KadMsg, SinkError = IoError> + Stream<Item = KadMsg, Error = IoError> + 'a
fn build_from_sink_stream<'a, S>(connec: S) -> (KadConnecController, Box<Stream<Item = KadIncomingRequest, Error = IoError> + Send + 'a>)
where S: Sink<SinkItem = KadMsg, SinkError = IoError> + Stream<Item = KadMsg, Error = IoError> + Send + 'a
{
let (tx, rx) = mpsc::unbounded();
let future = kademlia_handler(connec, rx);
@ -211,9 +211,9 @@ where S: Sink<SinkItem = KadMsg, SinkError = IoError> + Stream<Item = KadMsg, Er
fn kademlia_handler<'a, S>(
kad_bistream: S,
rq_rx: mpsc::UnboundedReceiver<(KadMsg, oneshot::Sender<KadMsg>)>,
) -> Box<Stream<Item = KadIncomingRequest, Error = IoError> + 'a>
) -> Box<Stream<Item = KadIncomingRequest, Error = IoError> + Send + 'a>
where
S: Stream<Item = KadMsg, Error = IoError> + Sink<SinkItem = KadMsg, SinkError = IoError> + 'a,
S: Stream<Item = KadMsg, Error = IoError> + Sink<SinkItem = KadMsg, SinkError = IoError> + Send + 'a,
{
let (kad_sink, kad_stream) = kad_bistream.split();
@ -255,7 +255,7 @@ where
Some(events
.into_future()
.map_err(|(err, _)| err)
.and_then(move |(message, events)| -> Box<Future<Item = _, Error = _>> {
.and_then(move |(message, events)| -> Box<Future<Item = _, Error = _> + Send> {
match message {
Some(EventSource::Finished) | None => {
let future = future::ok({
@ -397,7 +397,7 @@ where
}))
}).filter_map(|val| val);
Box::new(stream) as Box<Stream<Item = _, Error = IoError>>
Box::new(stream) as Box<Stream<Item = _, Error = IoError> + Send>
}
#[cfg(test)]

View File

@ -59,10 +59,10 @@ pub enum QueryEvent<TOut> {
pub fn find_node<'a, FBuckets, FFindNode>(
query_params: QueryParams<FBuckets, FFindNode>,
searched_key: PeerId,
) -> Box<Stream<Item = QueryEvent<Vec<PeerId>>, Error = IoError> + 'a>
) -> Box<Stream<Item = QueryEvent<Vec<PeerId>>, Error = IoError> + Send + 'a>
where
FBuckets: Fn(PeerId) -> Vec<PeerId> + 'a + Clone,
FFindNode: Fn(Multiaddr, PeerId) -> Box<Future<Item = Vec<protocol::Peer>, Error = IoError>> + 'a + Clone,
FFindNode: Fn(Multiaddr, PeerId) -> Box<Future<Item = Vec<protocol::Peer>, Error = IoError> + Send> + 'a + Clone,
{
query(query_params, searched_key, 20) // TODO: constant
}
@ -74,10 +74,10 @@ where
pub fn refresh<'a, FBuckets, FFindNode>(
query_params: QueryParams<FBuckets, FFindNode>,
bucket_num: usize,
) -> Box<Stream<Item = QueryEvent<()>, Error = IoError> + 'a>
) -> Box<Stream<Item = QueryEvent<()>, Error = IoError> + Send + 'a>
where
FBuckets: Fn(PeerId) -> Vec<PeerId> + 'a + Clone,
FFindNode: Fn(Multiaddr, PeerId) -> Box<Future<Item = Vec<protocol::Peer>, Error = IoError>> + 'a + Clone,
FFindNode: Fn(Multiaddr, PeerId) -> Box<Future<Item = Vec<protocol::Peer>, Error = IoError> + Send> + 'a + Clone,
{
let peer_id = match gen_random_id(&query_params.local_id, bucket_num) {
Ok(p) => p,
@ -132,10 +132,10 @@ fn query<'a, FBuckets, FFindNode>(
query_params: QueryParams<FBuckets, FFindNode>,
searched_key: PeerId,
num_results: usize,
) -> Box<Stream<Item = QueryEvent<Vec<PeerId>>, Error = IoError> + 'a>
) -> Box<Stream<Item = QueryEvent<Vec<PeerId>>, Error = IoError> + Send + 'a>
where
FBuckets: Fn(PeerId) -> Vec<PeerId> + 'a + Clone,
FFindNode: Fn(Multiaddr, PeerId) -> Box<Future<Item = Vec<protocol::Peer>, Error = IoError>> + 'a + Clone,
FFindNode: Fn(Multiaddr, PeerId) -> Box<Future<Item = Vec<protocol::Peer>, Error = IoError> + Send> + 'a + Clone,
{
debug!("Start query for {:?} ; num results = {}", searched_key, num_results);
@ -147,7 +147,7 @@ where
result: Vec<PeerId>,
// For each open connection, a future with the response of the remote.
// Note that don't use a `SmallVec` here because `select_all` produces a `Vec`.
current_attempts_fut: Vec<Box<Future<Item = Vec<protocol::Peer>, Error = IoError> + 'a>>,
current_attempts_fut: Vec<Box<Future<Item = Vec<protocol::Peer>, Error = IoError> + Send + 'a>>,
// For each open connection, the peer ID that we are connected to.
// Must always have the same length as `current_attempts_fut`.
current_attempts_addrs: SmallVec<[PeerId; 32]>,

View File

@ -65,10 +65,10 @@
//! .dial("127.0.0.1:12345".parse::<libp2p_core::Multiaddr>().unwrap()).unwrap_or_else(|_| panic!())
//! .and_then(|(out, _)| {
//! match out {
//! PingOutput::Ponger(processing) => Box::new(processing) as Box<Future<Item = _, Error = _>>,
//! PingOutput::Ponger(processing) => Box::new(processing) as Box<Future<Item = _, Error = _> + Send>,
//! PingOutput::Pinger { mut pinger, processing } => {
//! let f = pinger.ping().map_err(|_| panic!()).select(processing).map(|_| ()).map_err(|(err, _)| err);
//! Box::new(f) as Box<Future<Item = _, Error = _>>
//! Box::new(f) as Box<Future<Item = _, Error = _> + Send>
//! },
//! }
//! });
@ -118,15 +118,15 @@ pub enum PingOutput {
/// Object to use in order to ping the remote.
pinger: Pinger,
/// Future that drives the processing of the pings.
processing: Box<Future<Item = (), Error = IoError>>,
processing: Box<Future<Item = (), Error = IoError> + Send>,
},
/// We are on the listening side.
Ponger(Box<Future<Item = (), Error = IoError>>),
Ponger(Box<Future<Item = (), Error = IoError> + Send>),
}
impl<C, Maf> ConnectionUpgrade<C, Maf> for Ping
where
C: AsyncRead + AsyncWrite + 'static,
C: AsyncRead + AsyncWrite + Send + 'static,
{
type NamesIter = iter::Once<(Bytes, Self::UpgradeIdentifier)>;
type UpgradeIdentifier = ();
@ -158,7 +158,7 @@ where
}
/// Upgrades a connection from the dialer side.
fn upgrade_as_dialer(socket: impl AsyncRead + AsyncWrite + 'static) -> PingOutput {
fn upgrade_as_dialer(socket: impl AsyncRead + AsyncWrite + Send + 'static) -> PingOutput {
// # How does it work?
//
// All the actual processing is performed by the *ponger*.
@ -199,7 +199,7 @@ fn upgrade_as_dialer(socket: impl AsyncRead + AsyncWrite + 'static) -> PingOutpu
Box::new(
sink.send(payload)
.map(|sink| Loop::Continue((sink, stream))),
) as Box<Future<Item = _, Error = _>>
) as Box<Future<Item = _, Error = _> + Send>
}
Message::Received(payload) => {
// Received a payload from the remote.
@ -210,17 +210,17 @@ fn upgrade_as_dialer(socket: impl AsyncRead + AsyncWrite + 'static) -> PingOutpu
debug!("Received pong (payload={:?}) ; ping fufilled", payload);
let _ = fut.send(());
Box::new(Ok(Loop::Continue((sink, stream))).into_future())
as Box<Future<Item = _, Error = _>>
as Box<Future<Item = _, Error = _> + Send>
} else {
// Payload was unexpected. Closing connection.
debug!("Received invalid payload ({:?}) ; closing", payload);
Box::new(Ok(Loop::Break(())).into_future())
as Box<Future<Item = _, Error = _>>
as Box<Future<Item = _, Error = _> + Send>
}
}
}
} else {
Box::new(Ok(Loop::Break(())).into_future()) as Box<Future<Item = _, Error = _>>
Box::new(Ok(Loop::Break(())).into_future()) as Box<Future<Item = _, Error = _> + Send>
}
})
});
@ -232,7 +232,7 @@ fn upgrade_as_dialer(socket: impl AsyncRead + AsyncWrite + 'static) -> PingOutpu
}
/// Upgrades a connection from the listener side.
fn upgrade_as_listener(socket: impl AsyncRead + AsyncWrite + 'static) -> PingOutput {
fn upgrade_as_listener(socket: impl AsyncRead + AsyncWrite + Send + 'static) -> PingOutput {
let sink_stream = Framed::new(socket, Codec);
let (sink, stream) = sink_stream.split();
@ -247,10 +247,10 @@ fn upgrade_as_listener(socket: impl AsyncRead + AsyncWrite + 'static) -> PingOut
Box::new(
sink.send(payload.freeze())
.map(|sink| Loop::Continue((sink, stream))),
) as Box<Future<Item = _, Error = _>>
) as Box<Future<Item = _, Error = _> + Send>
} else {
// Connection was closed
Box::new(Ok(Loop::Break(())).into_future()) as Box<Future<Item = _, Error = _>>
Box::new(Ok(Loop::Break(())).into_future()) as Box<Future<Item = _, Error = _> + Send>
}
})
});
@ -269,7 +269,7 @@ impl Pinger {
///
/// **Note**: Please be aware that there is no timeout on the ping. You should handle the
/// timeout yourself when you call this function.
pub fn ping(&mut self) -> Box<Future<Item = (), Error = Box<Error + Send + Sync>>> {
pub fn ping(&mut self) -> Box<Future<Item = (), Error = Box<Error + Send + Sync>> + Send> {
let (tx, rx) = oneshot::channel();
let payload: [u8; 32] = self.rng.sample(Standard);

View File

@ -59,9 +59,9 @@ use {SecioKeyPair, SecioKeyPairInner};
pub fn handshake<'a, S: 'a>(
socket: S,
local_key: SecioKeyPair,
) -> Box<Future<Item = (FullCodec<S>, PublicKey, Vec<u8>), Error = SecioError> + 'a>
) -> Box<Future<Item = (FullCodec<S>, PublicKey, Vec<u8>), Error = SecioError> + Send + 'a>
where
S: AsyncRead + AsyncWrite,
S: AsyncRead + AsyncWrite + Send,
{
// TODO: could be rewritten as a coroutine once coroutines land in stable Rust

View File

@ -289,12 +289,12 @@ where
impl<S, Maf> libp2p_core::ConnectionUpgrade<S, Maf> for SecioConfig
where
S: AsyncRead + AsyncWrite + 'static, // TODO: 'static :(
Maf: 'static, // TODO: 'static :(
S: AsyncRead + AsyncWrite + Send + 'static, // TODO: 'static :(
Maf: Send + 'static, // TODO: 'static :(
{
type Output = SecioOutput<S>;
type MultiaddrFuture = Maf;
type Future = Box<Future<Item = (Self::Output, Maf), Error = IoError>>;
type Future = Box<Future<Item = (Self::Output, Maf), Error = IoError> + Send>;
type NamesIter = iter::Once<(Bytes, ())>;
type UpgradeIdentifier = ();
@ -342,7 +342,7 @@ pub struct SecioMiddleware<S> {
impl<S> SecioMiddleware<S>
where
S: AsyncRead + AsyncWrite,
S: AsyncRead + AsyncWrite + Send,
{
/// Attempts to perform a handshake on the given socket.
///
@ -351,7 +351,7 @@ where
pub fn handshake<'a>(
socket: S,
key_pair: SecioKeyPair,
) -> Box<Future<Item = (SecioMiddleware<S>, PublicKey, Vec<u8>), Error = SecioError> + 'a>
) -> Box<Future<Item = (SecioMiddleware<S>, PublicKey, Vec<u8>), Error = SecioError> + Send + 'a>
where
S: 'a,
{

View File

@ -62,8 +62,8 @@ where
C: AsyncRead + AsyncWrite,
F: Fn(C) -> O,
O: IntoFuture<Error = IoError>,
O::Future: 'static,
Maf: 'static,
O::Future: Send + 'static,
Maf: Send + 'static,
{
type NamesIter = iter::Once<(Bytes, ())>;
type UpgradeIdentifier = ();
@ -75,7 +75,7 @@ where
type Output = O::Item;
type MultiaddrFuture = Maf;
type Future = Box<Future<Item = (O::Item, Self::MultiaddrFuture), Error = IoError>>;
type Future = Box<Future<Item = (O::Item, Self::MultiaddrFuture), Error = IoError> + Send>;
#[inline]
fn upgrade(self, socket: C, _: (), _: Endpoint, client_addr: Maf) -> Self::Future {

View File

@ -94,13 +94,14 @@ where
impl<T> Transport for DnsConfig<T>
where
T: Transport + 'static, // TODO: 'static :-/
T: Transport + Send + 'static, // TODO: 'static :-/
T::Dial: Send,
{
type Output = T::Output;
type MultiaddrFuture = T::MultiaddrFuture;
type Listener = T::Listener;
type ListenerUpgrade = T::ListenerUpgrade;
type Dial = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError>>;
type Dial = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError> + Send>;
#[inline]
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {

View File

@ -157,13 +157,15 @@ where
impl<T> Transport for RateLimited<T>
where
T: Transport + 'static,
T::Output: AsyncRead + AsyncWrite,
T::Dial: Send,
T::MultiaddrFuture: Send,
T::Output: AsyncRead + AsyncWrite + Send,
{
type Output = Connection<T::Output>;
type MultiaddrFuture = T::MultiaddrFuture;
type Listener = Listener<T>;
type ListenerUpgrade = ListenerUpgrade<T>;
type Dial = Box<Future<Item = (Connection<T::Output>, Self::MultiaddrFuture), Error = io::Error>>;
type Dial = Box<Future<Item = (Connection<T::Output>, Self::MultiaddrFuture), Error = io::Error> + Send>;
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)>
where

View File

@ -15,4 +15,4 @@ protobuf = "2.0.2"
rand = "0.4"
tokio-codec = "0.1"
tokio-io = "0.1"
unsigned-varint = { version = "0.2", features = ["codec"] }
unsigned-varint = { version = "0.2.1", features = ["codec"] }

View File

@ -46,17 +46,21 @@ pub struct RelayConfig<T, P> {
// but otherwise the stream is not programmatically accessible.
pub enum Output<C> {
Stream(C),
Sealed(Box<Future<Item=(), Error=io::Error>>)
Sealed(Box<Future<Item=(), Error=io::Error> + Send>)
}
impl<C, T, P, S, Maf> ConnectionUpgrade<C, Maf> for RelayConfig<T, P>
where
C: AsyncRead + AsyncWrite + 'static,
T: Transport + Clone + 'static,
T::Output: AsyncRead + AsyncWrite,
P: Deref<Target=S> + Clone + 'static,
C: AsyncRead + AsyncWrite + Send + 'static,
T: Transport + Clone + Send + 'static,
T::Dial: Send,
T::Listener: Send,
T::ListenerUpgrade: Send,
T::MultiaddrFuture: Send,
T::Output: AsyncRead + AsyncWrite + Send,
P: Deref<Target=S> + Clone + Send + 'static,
S: 'static,
Maf: 'static,
Maf: Send + 'static,
for<'a> &'a S: Peerstore
{
type NamesIter = iter::Once<(Bytes, Self::UpgradeIdentifier)>;
@ -68,7 +72,7 @@ where
type Output = Output<C>;
type MultiaddrFuture = Maf;
type Future = Box<Future<Item=(Self::Output, Maf), Error=io::Error>>;
type Future = Box<Future<Item=(Self::Output, Maf), Error=io::Error> + Send>;
fn upgrade(self, conn: C, _: (), _: Endpoint, remote_addr: Maf) -> Self::Future {
let future = Io::new(conn).recv().and_then(move |(message, io)| {
@ -98,7 +102,11 @@ where
impl<T, P, S> RelayConfig<T, P>
where
T: Transport + Clone + 'static,
T::Output: AsyncRead + AsyncWrite,
T::Dial: Send, // TODO: remove
T::Listener: Send, // TODO: remove
T::ListenerUpgrade: Send, // TODO: remove
T::MultiaddrFuture: Send, // TODO: remove
T::Output: Send + AsyncRead + AsyncWrite,
P: Deref<Target = S> + Clone + 'static,
for<'a> &'a S: Peerstore,
{
@ -268,8 +276,8 @@ pub(crate) struct Source(pub(crate) CircuitRelay);
impl<C, Maf> ConnectionUpgrade<C, Maf> for Source
where
C: AsyncRead + AsyncWrite + 'static,
Maf: 'static,
C: AsyncRead + AsyncWrite + Send + 'static,
Maf: Send + 'static,
{
type NamesIter = iter::Once<(Bytes, Self::UpgradeIdentifier)>;
type UpgradeIdentifier = ();
@ -280,7 +288,7 @@ where
type Output = C;
type MultiaddrFuture = Maf;
type Future = Box<Future<Item=(Self::Output, Maf), Error=io::Error>>;
type Future = Box<Future<Item=(Self::Output, Maf), Error=io::Error> + Send>;
fn upgrade(self, conn: C, _: (), _: Endpoint, remote_addr: Maf) -> Self::Future {
let future = Io::new(conn)

View File

@ -39,17 +39,21 @@ pub struct RelayTransport<T, P> {
impl<T, P, S> Transport for RelayTransport<T, P>
where
T: Transport + Clone + 'static,
T::Output: AsyncRead + AsyncWrite,
T: Transport + Send + Clone + 'static,
T::Dial: Send,
T::Listener: Send,
T::ListenerUpgrade: Send,
T::MultiaddrFuture: Send,
T::Output: AsyncRead + AsyncWrite + Send,
P: Deref<Target=S> + Clone + 'static,
S: 'static,
for<'a> &'a S: Peerstore
{
type Output = T::Output;
type MultiaddrFuture = T::MultiaddrFuture;
type Listener = Box<Stream<Item=Self::ListenerUpgrade, Error=io::Error>>;
type ListenerUpgrade = Box<Future<Item=(Self::Output, Self::MultiaddrFuture), Error=io::Error>>;
type Dial = Box<Future<Item=(Self::Output, Self::MultiaddrFuture), Error=io::Error>>;
type Listener = Box<Stream<Item=Self::ListenerUpgrade, Error=io::Error> + Send>;
type ListenerUpgrade = Box<Future<Item=(Self::Output, Self::MultiaddrFuture), Error=io::Error> + Send>;
type Dial = Box<Future<Item=(Self::Output, Self::MultiaddrFuture), Error=io::Error> + Send>;
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
Err((self, addr))
@ -85,7 +89,11 @@ where
impl<T, P, S> RelayTransport<T, P>
where
T: Transport + Clone + 'static,
T::Output: AsyncRead + AsyncWrite,
T::Dial: Send,
T::Listener: Send,
T::ListenerUpgrade: Send,
T::MultiaddrFuture: Send,
T::Output: AsyncRead + AsyncWrite + Send,
P: Deref<Target=S> + Clone + 'static,
for<'a> &'a S: Peerstore
{

View File

@ -13,7 +13,9 @@ rw-stream-sink = { path = "../../misc/rw-stream-sink" }
tokio-io = "0.1"
[target.'cfg(not(target_os = "emscripten"))'.dependencies]
websocket = { version = "0.20.2", default-features = false, features = ["async", "async-ssl"] }
# TODO: restore the upstream version once the branch is merged
websocket = { git = "https://github.com/tomaka/rust-websocket", branch = "send", default-features = false, features = ["async", "async-ssl"] }
#websocket = { version = "0.20.2", default-features = false, features = ["async", "async-ssl"] }
[target.'cfg(target_os = "emscripten")'.dependencies]
stdweb = { version = "0.1.3", default-features = false }

View File

@ -54,10 +54,10 @@ impl BrowserWsConfig {
impl Transport for BrowserWsConfig {
type Output = BrowserWsConn;
type MultiaddrFuture = FutureResult<Multiaddr, IoError>;
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError>>; // TODO: use `!`
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError> + Send>; // TODO: use `!`
type ListenerUpgrade =
Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError>>; // TODO: use `!`
type Dial = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError>>;
Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError> + Send>; // TODO: use `!`
type Dial = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError> + Send>;
#[inline]
fn listen_on(self, a: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {

View File

@ -59,16 +59,20 @@ impl<T> Transport for WsConfig<T>
where
// TODO: this 'static is pretty arbitrary and is necessary because of the websocket library
T: Transport + 'static,
T::Dial: Send,
T::MultiaddrFuture: Send,
T::Listener: Send,
T::ListenerUpgrade: Send,
// TODO: this Send is pretty arbitrary and is necessary because of the websocket library
T::Output: AsyncRead + AsyncWrite + Send,
{
type Output = Box<AsyncStream>;
type MultiaddrFuture = Box<Future<Item = Multiaddr, Error = IoError>>;
type Output = Box<AsyncStream + Send>;
type MultiaddrFuture = Box<Future<Item = Multiaddr, Error = IoError> + Send>;
type Listener =
stream::Map<T::Listener, fn(<T as Transport>::ListenerUpgrade) -> Self::ListenerUpgrade>;
type ListenerUpgrade =
Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError>>;
type Dial = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError>>;
Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError> + Send>;
type Dial = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError> + Send>;
fn listen_on(
self,
@ -139,16 +143,16 @@ where
.map(|v| v.expect("we only take while this is Some"));
let read_write = RwStreamSink::new(framed_data);
Box::new(read_write) as Box<AsyncStream>
Box::new(read_write) as Box<AsyncStream + Send>
})
})
.map(|s| Box::new(Ok(s).into_future()) as Box<Future<Item = _, Error = _>>)
.map(|s| Box::new(Ok(s).into_future()) as Box<Future<Item = _, Error = _> + Send>)
.into_future()
.flatten()
.map(move |v| (v, Box::new(client_addr) as Box<Future<Item = _, Error = _>>))
.map(move |v| (v, Box::new(client_addr) as Box<Future<Item = _, Error = _> + Send>))
});
Box::new(upgraded) as Box<Future<Item = _, Error = _>>
Box::new(upgraded) as Box<Future<Item = _, Error = _> + Send>
});
Ok((listen, new_addr))
@ -198,7 +202,7 @@ where
addr.append(AddrComponent::WS);
};
addr
})) as Box<Future<Item = _, Error = _>>;
})) as Box<Future<Item = _, Error = _> + Send>;
ClientBuilder::new(&ws_addr)
.expect("generated ws address is always valid")
@ -223,7 +227,7 @@ where
}
});
let read_write = RwStreamSink::new(framed_data);
Box::new(read_write) as Box<AsyncStream>
Box::new(read_write) as Box<AsyncStream + Send>
})
.map(move |c| (c, client_addr))
});