diff --git a/example/Cargo.toml b/example/Cargo.toml index 9dbb378e..4ae24fc3 100644 --- a/example/Cargo.toml +++ b/example/Cargo.toml @@ -6,6 +6,7 @@ authors = ["pierre "] [dependencies] bytes = "0.4" futures = "0.1" +multiplex = { path = "../multiplex-rs" } libp2p-secio = { path = "../libp2p-secio" } libp2p-swarm = { path = "../libp2p-swarm" } libp2p-tcp-transport = { path = "../libp2p-tcp-transport" } diff --git a/example/examples/echo-dialer.rs b/example/examples/echo-dialer.rs index 73b90a89..fa2098af 100644 --- a/example/examples/echo-dialer.rs +++ b/example/examples/echo-dialer.rs @@ -20,6 +20,7 @@ extern crate bytes; extern crate futures; +extern crate multiplex; extern crate libp2p_secio as secio; extern crate libp2p_swarm as swarm; extern crate libp2p_tcp_transport as tcp; @@ -52,6 +53,10 @@ fn main() { } }) + .with_upgrade(multiplex::MultiplexConfig); + let transport: swarm::ConnectionReuse<_, _> = transport.into(); + + let transport = transport // On top of plaintext or secio, we use the "echo" protocol, which is a custom protocol // just for this example. // For this purpose, we create a `SimpleProtocol` struct. @@ -68,17 +73,20 @@ fn main() { // of any opened stream. // We use it to dial `/ip4/127.0.0.1/tcp/10333`. - let dialer = transport.dial(swarm::Multiaddr::new("/ip4/127.0.0.1/tcp/10333").unwrap()) + let dialer = transport.dial_and_listen(swarm::Multiaddr::new("/ip4/127.0.0.1/tcp/10333").unwrap()) .unwrap_or_else(|_| panic!("unsupported multiaddr protocol ; should never happen")) - .and_then(|echo| { + .and_then(|(echo, incoming)| { // `echo` is what the closure used when initializing "echo" returns. // Consequently, please note that the `send` method is available only because the type // `length_delimited::Framed` has a `send` method. - echo.send("hello world".into()) + echo.send("hello world".into()).map(Option::Some) + .select(incoming.for_each(|_| { println!("opened"); Ok(()) }).map(|()| None)) + .map(|(n, _)| n) + .map_err(|(e, _)| e) }) .and_then(|echo| { // The message has been successfully sent. Now wait for an answer. - echo.into_future() + echo.unwrap().into_future() .map(|(msg, rest)| { println!("received: {:?}", msg); rest diff --git a/libp2p-swarm/src/lib.rs b/libp2p-swarm/src/lib.rs index e0fdaa29..a0b104b4 100644 --- a/libp2p-swarm/src/lib.rs +++ b/libp2p-swarm/src/lib.rs @@ -181,4 +181,4 @@ pub use self::connection_reuse::ConnectionReuse; pub use self::multiaddr::Multiaddr; pub use self::muxing::StreamMuxer; pub use self::transport::{ConnectionUpgrade, PlainTextConfig, Transport, UpgradedNode, OrUpgrade}; -pub use self::transport::{Endpoint, SimpleProtocol}; +pub use self::transport::{Endpoint, SimpleProtocol, MuxedTransport}; diff --git a/libp2p-swarm/src/transport.rs b/libp2p-swarm/src/transport.rs index 03b78bd1..870248e1 100644 --- a/libp2p-swarm/src/transport.rs +++ b/libp2p-swarm/src/transport.rs @@ -786,7 +786,7 @@ impl<'a, T, C> UpgradedNode negotiated.map(|(upgrade_id, conn)| (upgrade_id, conn, upgrade2)) } .and_then(|(upgrade_id, connection, upgrade)| { - upgrade.upgrade(connection, upgrade_id) + upgrade.upgrade(connection, upgrade_id, Endpoint::Dialer) }); let in_stream = in_stream @@ -803,7 +803,7 @@ impl<'a, T, C> UpgradedNode .map_err(|err| IoError::new(IoErrorKind::Other, err)) }) .and_then(|(upgrade_id, connection, upgrade)| { - upgrade.upgrade(connection, upgrade_id) + upgrade.upgrade(connection, upgrade_id, Endpoint::Listener) }); dialer.map(|d| (d, Box::new(in_stream) as Box>)) diff --git a/multiplex-rs/src/lib.rs b/multiplex-rs/src/lib.rs index 8c47bf26..d7ebd113 100644 --- a/multiplex-rs/src/lib.rs +++ b/multiplex-rs/src/lib.rs @@ -217,7 +217,7 @@ impl OutboundFuture { } fn nonce_to_id(id: usize, end: Endpoint) -> u32 { - id as u32 * 2 + if end == Endpoint::Dialer { 1 } else { 0 } + id as u32 * 2 + if end == Endpoint::Dialer { 50 } else { 0 } } impl Future for OutboundFuture { @@ -330,6 +330,7 @@ impl StreamMuxer for Multiplex { } } +#[derive(Debug, Copy, Clone)] pub struct MultiplexConfig; impl ConnectionUpgrade for MultiplexConfig diff --git a/multistream-select/src/dialer_select.rs b/multistream-select/src/dialer_select.rs index b05451d7..7657b6ef 100644 --- a/multistream-select/src/dialer_select.rs +++ b/multistream-select/src/dialer_select.rs @@ -95,6 +95,7 @@ pub fn dialer_select_proto_serial<'a, R, I, P>( }) // Once read, analyze the response. .and_then(|(message, rest, proto_name, proto_value)| { + if message.is_none() { println!("empty"); } let message = message.ok_or(ProtocolChoiceError::UnexpectedMessage)?; match message { @@ -107,7 +108,7 @@ pub fn dialer_select_proto_serial<'a, R, I, P>( ListenerToDialerMessage::NotAvailable => { Ok(Loop::Continue(rest)) }, - _ => Err(ProtocolChoiceError::UnexpectedMessage) + _ => { println!("c {:?}", message); Err(ProtocolChoiceError::UnexpectedMessage) } } }) }) @@ -140,7 +141,7 @@ pub fn dialer_select_proto_parallel<'a, R, I, M, P>( .and_then(move |(msg, dialer)| { let list = match msg { Some(ListenerToDialerMessage::ProtocolsListResponse { list }) => list, - _ => return Err(ProtocolChoiceError::UnexpectedMessage), + _ => { println!("b {:?}", msg); return Err(ProtocolChoiceError::UnexpectedMessage) }, }; let mut found = None; @@ -174,7 +175,7 @@ pub fn dialer_select_proto_parallel<'a, R, I, M, P>( Some(ListenerToDialerMessage::ProtocolAck { ref name }) if name == &proto_name => { Ok((proto_val, dialer.into_inner())) } - _ => Err(ProtocolChoiceError::UnexpectedMessage), + _ => { println!("a {:?}", msg); Err(ProtocolChoiceError::UnexpectedMessage) }, }); // The "Rust doesn't have impl Trait yet" tax.