From 6b7fc9508e1581bffe2779f7a75ab8b57cc247d6 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Thu, 4 Jan 2018 17:18:49 +0100 Subject: [PATCH] Update the echo dialer example --- example/examples/echo-dialer.rs | 80 ++++++++++++++++----------------- libp2p-swarm/src/lib.rs | 1 + libp2p-swarm/src/transport.rs | 23 ++++++++++ 3 files changed, 64 insertions(+), 40 deletions(-) diff --git a/example/examples/echo-dialer.rs b/example/examples/echo-dialer.rs index 06dea677..85023651 100644 --- a/example/examples/echo-dialer.rs +++ b/example/examples/echo-dialer.rs @@ -30,7 +30,7 @@ extern crate tokio_io; use bytes::BytesMut; use futures::{Future, Sink, Stream}; use std::env; -use swarm::{UpgradeExt, SimpleProtocol, Transport, MuxedTransport}; +use swarm::{UpgradeExt, SimpleProtocol, Transport, DeniedConnectionUpgrade}; use tcp::TcpConfig; use tokio_core::reactor::Core; use tokio_io::codec::length_delimited; @@ -70,51 +70,51 @@ fn main() { // a `Transport`. .into_connection_reuse(); - let transport_with_echo = transport - .clone() - // On top of plaintext or secio, we use the "echo" protocol, which is a custom protocol - // just for this example. - // For this purpose, we create a `SimpleProtocol` struct. - .with_upgrade(SimpleProtocol::new("/echo/1.0.0", |socket| { - // This closure is called whenever a stream using the "echo" protocol has been - // successfully negotiated. The parameter is the raw socket (implements the AsyncRead - // and AsyncWrite traits), and the closure must return an implementation of - // `IntoFuture` that can yield any type of object. - Ok(length_delimited::Framed::<_, BytesMut>::new(socket)) - })); + // Let's put this `transport` into a *swarm*. The swarm will handle all the incoming + // connections for us. The second parameter we pass is the connection upgrade that is accepted + // by the listening part. We don't want to accept anything, so we pass a dummy object that + // represents a connection that is always denied. + let (swarm_controller, swarm_future) = swarm::swarm(transport, DeniedConnectionUpgrade, + |_socket, _client_addr| -> Result<(), _> { + unreachable!("All incoming connections should have been denied") + }); - // We now have a `transport` variable that can be used either to dial nodes or listen to - // incoming connections, and that will automatically apply all the selected protocols on top - // of any opened stream. + // Building a struct that represents the protocol that we are going to use for dialing. + let proto = SimpleProtocol::new("/echo/1.0.0", |socket| { + // This closure is called whenever a stream using the "echo" protocol has been + // successfully negotiated. The parameter is the raw socket (implements the AsyncRead + // and AsyncWrite traits), and the closure must return an implementation of + // `IntoFuture` that can yield any type of object. + Ok(length_delimited::Framed::<_, BytesMut>::new(socket)) + }); - // We use it to dial the address. - let dialer = transport_with_echo - .dial(swarm::Multiaddr::new(&target_addr).expect("invalid multiaddr")) - // If the multiaddr protocol exists but is not supported, then we get an error containing - // the transport and the original multiaddress. Therefore we cannot directly use `unwrap()` - // or `expect()`, but have to add a `map_err()` beforehand. - .map_err(|(_, addr)| addr).expect("unsupported multiaddr") - - .and_then(|echo| { - // `echo` is what the closure used when initializing "echo" returns. + // We now use the controller to dial to the address. + swarm_controller + .dial_custom_handler(target_addr.parse().expect("invalid multiaddr"), proto, |echo| { + // `echo` is what the closure used when initializing `proto` returns. // Consequently, please note that the `send` method is available only because the type // `length_delimited::Framed` has a `send` method. println!("Sending \"hello world\" to listener"); echo.send("hello world".into()) - }) - .and_then(|echo| { - // The message has been successfully sent. Now wait for an answer. - echo.into_future() - .map(|(msg, rest)| { - println!("Received message from listener: {:?}", msg); - rest + // Then listening for one message from the remote. + .and_then(|echo| { + echo.into_future().map_err(|(e, _)| e).map(|(n,_ )| n) }) - .map_err(|(err, _)| err) - }); + .and_then(|message| { + println!("Received message from listener: {:?}", message.unwrap()); + Ok(()) + }) + }) + // If the multiaddr protocol exists but is not supported, then we get an error containing + // the original multiaddress. + .expect("unsupported multiaddr"); - // `dialer` is a future that contains all the behaviour that we want, but nothing has actually - // started yet. Because we created the `TcpConfig` with tokio, we need to run the future - // through the tokio core. - core.run(dialer.map(|_| ()).select(transport.incoming().for_each(|_| Ok(())))) - .unwrap_or_else(|_| panic!()); + // The address we actually listen on can be different from the address that was passed to + // the `listen_on` function. For example if you pass `/ip4/0.0.0.0/tcp/0`, then the port `0` + // will be replaced with the actual port. + + // `swarm_future` is a future that contains all the behaviour that we want, but nothing has + // actually started yet. Because we created the `TcpConfig` with tokio, we need to run the + // future through the tokio core. + core.run(swarm_future).unwrap(); } diff --git a/libp2p-swarm/src/lib.rs b/libp2p-swarm/src/lib.rs index b7da8738..fd5041f5 100644 --- a/libp2p-swarm/src/lib.rs +++ b/libp2p-swarm/src/lib.rs @@ -224,3 +224,4 @@ pub use self::muxing::StreamMuxer; pub use self::swarm::{swarm, SwarmController, SwarmFuture}; pub use self::transport::{ConnectionUpgrade, PlainTextConfig, Transport, UpgradedNode, OrUpgrade}; pub use self::transport::{Endpoint, SimpleProtocol, MuxedTransport, UpgradeExt}; +pub use self::transport::{DeniedConnectionUpgrade}; diff --git a/libp2p-swarm/src/transport.rs b/libp2p-swarm/src/transport.rs index 8099f988..80405b60 100644 --- a/libp2p-swarm/src/transport.rs +++ b/libp2p-swarm/src/transport.rs @@ -534,6 +534,29 @@ pub enum Endpoint { Listener, } +/// Implementation of `ConnectionUpgrade` that always fails to negotiate. +#[derive(Debug, Copy, Clone)] +pub struct DeniedConnectionUpgrade; + +impl ConnectionUpgrade for DeniedConnectionUpgrade + where C: AsyncRead + AsyncWrite +{ + type NamesIter = iter::Empty<(Bytes, ())>; + type UpgradeIdentifier = (); // TODO: could use `!` + type Output = (); // TODO: could use `!` + type Future = Box>; // TODO: could use `!` + + #[inline] + fn protocol_names(&self) -> Self::NamesIter { + iter::empty() + } + + #[inline] + fn upgrade(self, _: C, _: Self::UpgradeIdentifier, _: Endpoint) -> Self::Future { + unreachable!("the denied connection upgrade always fails to negotiate") + } +} + /// Extension trait for `ConnectionUpgrade`. Automatically implemented on everything. pub trait UpgradeExt { /// Builds a struct that will choose an upgrade between `self` and `other`, depending on what