From e5f23c74c0efc93bc703273a00fb52bdccd8c016 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 22 May 2018 18:58:27 +0200 Subject: [PATCH] Remove dial_custom_handler (#203) * Remove dial_custom_handler * Rename dial_to_handler to dial --- core/src/swarm.rs | 33 +--------------------------- kad/src/high_level.rs | 2 +- libp2p/examples/echo-dialer.rs | 39 ++++++++++++++++++---------------- libp2p/examples/floodsub.rs | 2 +- libp2p/examples/ping-client.rs | 29 +++++++++++++------------ libp2p/examples/relay.rs | 14 ++++++------ 6 files changed, 45 insertions(+), 74 deletions(-) diff --git a/core/src/swarm.rs b/core/src/swarm.rs index 83172754..f261ae5b 100644 --- a/core/src/swarm.rs +++ b/core/src/swarm.rs @@ -113,7 +113,7 @@ where /// upgraded using the `upgrade`, and the output is sent to the handler that was passed when /// calling `swarm`. // TODO: consider returning a future so that errors can be processed? - pub fn dial_to_handler(&self, multiaddr: Multiaddr, transport: Du) -> Result<(), Multiaddr> + pub fn dial(&self, multiaddr: Multiaddr, transport: Du) -> Result<(), Multiaddr> where Du: Transport + 'static, // TODO: 'static :-/ Du::Output: Into, @@ -134,37 +134,6 @@ where } } - /// Asks the swarm to dial the node with the given multiaddress. The connection is then - /// upgraded using the `upgrade`, and the output is then passed to `and_then`. - /// - /// Contrary to `dial_to_handler`, the output of the upgrade is not given to the handler that - /// was passed at initialization. - // TODO: consider returning a future so that errors can be processed? - pub fn dial_custom_handler( - &self, - multiaddr: Multiaddr, - transport: Du, - and_then: Df, - ) -> Result<(), Multiaddr> - where - Du: Transport + 'static, // TODO: 'static :-/ - Df: FnOnce(Du::Output, Multiaddr) -> Dfu + 'static, // TODO: 'static :-/ - Dfu: IntoFuture + 'static, // TODO: 'static :-/ - { - trace!("Swarm dialing {} with custom handler", multiaddr); - - match transport.dial(multiaddr) { - Ok(dial) => { - let dial = Box::new(dial.and_then(|(d, m)| and_then(d, m))) as Box<_>; - // Ignoring errors if the receiver has been closed, because in that situation - // nothing is going to be processed anyway. - let _ = self.new_toprocess.unbounded_send(dial); - Ok(()) - } - Err((_, multiaddr)) => Err(multiaddr), - } - } - /// Adds a multiaddr to listen on. All the incoming connections will use the `upgrade` that /// was passed to `swarm`. pub fn listen_on(&self, multiaddr: Multiaddr) -> Result { diff --git a/kad/src/high_level.rs b/kad/src/high_level.rs index 5eb22885..46f60d97 100644 --- a/kad/src/high_level.rs +++ b/kad/src/high_level.rs @@ -468,7 +468,7 @@ where Entry::Vacant(entry) => { // Need to open a connection. match self.swarm_controller - .dial_to_handler(addr, self.kademlia_transport.clone()) + .dial(addr, self.kademlia_transport.clone()) { Ok(()) => (), Err(_addr) => { diff --git a/libp2p/examples/echo-dialer.rs b/libp2p/examples/echo-dialer.rs index 4573366c..59d38155 100644 --- a/libp2p/examples/echo-dialer.rs +++ b/libp2p/examples/echo-dialer.rs @@ -29,7 +29,7 @@ use futures::sync::oneshot; use futures::{Future, Sink, Stream}; use std::env; use libp2p::core::Transport; -use libp2p::core::upgrade::{self, DeniedConnectionUpgrade, SimpleProtocol}; +use libp2p::core::upgrade::{self, SimpleProtocol}; use libp2p::tcp::TcpConfig; use tokio_core::reactor::Core; use tokio_io::AsyncRead; @@ -80,17 +80,6 @@ fn main() { // a `Transport`. .into_connection_reuse(); - // Let's put this `transport` into a *swarm*. The swarm will handle all the incoming - // connections for us. The second parameter we pass is the connection upgrade that is accepted - // by the listening part. We don't want to accept anything, so we pass a dummy object that - // represents a connection that is always denied. - let (swarm_controller, swarm_future) = libp2p::core::swarm( - transport.clone().with_upgrade(DeniedConnectionUpgrade), - |_socket, _client_addr| -> Result<(), _> { - unreachable!("All incoming connections should have been denied") - }, - ); - // Building a struct that represents the protocol that we are going to use for dialing. let proto = SimpleProtocol::new("/echo/1.0.0", |socket| { // This closure is called whenever a stream using the "echo" protocol has been @@ -100,25 +89,39 @@ fn main() { Ok(AsyncRead::framed(socket, BytesCodec::new())) }); - // We now use the controller to dial to the address. let (finished_tx, finished_rx) = oneshot::channel(); - swarm_controller - .dial_custom_handler(target_addr.parse().expect("invalid multiaddr"), transport.with_upgrade(proto), |echo, _| { + let mut finished_tx = Some(finished_tx); + + // Let's put this `transport` into a *swarm*. The swarm will handle all the incoming + // connections for us. The second parameter we pass is the connection upgrade that is accepted + // by the listening part. We don't want to accept anything, so we pass a dummy object that + // represents a connection that is always denied. + let (swarm_controller, swarm_future) = libp2p::core::swarm( + transport.clone().with_upgrade(proto.clone()), + |echo, _client_addr| { // `echo` is what the closure used when initializing `proto` returns. // Consequently, please note that the `send` method is available only because the type // `length_delimited::Framed` has a `send` method. println!("Sending \"hello world\" to listener"); + let finished_tx = finished_tx.take(); echo.send("hello world".into()) // Then listening for one message from the remote. .and_then(|echo| { echo.into_future().map_err(|(e, _)| e).map(|(n,_ )| n) }) - .and_then(|message| { + .and_then(move |message| { println!("Received message from listener: {:?}", message.unwrap()); - finished_tx.send(()).unwrap(); + if let Some(finished_tx) = finished_tx { + finished_tx.send(()).unwrap(); + } Ok(()) }) - }) + }, + ); + + // We now use the controller to dial to the address. + swarm_controller + .dial(target_addr.parse().expect("invalid multiaddr"), transport.with_upgrade(proto)) // If the multiaddr protocol exists but is not supported, then we get an error containing // the original multiaddress. .expect("unsupported multiaddr"); diff --git a/libp2p/examples/floodsub.rs b/libp2p/examples/floodsub.rs index 5fc92eaa..8eb30e4f 100644 --- a/libp2p/examples/floodsub.rs +++ b/libp2p/examples/floodsub.rs @@ -135,7 +135,7 @@ fn main() { let target: Multiaddr = msg[6..].parse().unwrap(); println!("*Dialing {}*", target); swarm_controller - .dial_to_handler( + .dial( target, transport.clone().with_upgrade(floodsub_upgrade.clone()), ) diff --git a/libp2p/examples/ping-client.rs b/libp2p/examples/ping-client.rs index b5891562..7056f5e3 100644 --- a/libp2p/examples/ping-client.rs +++ b/libp2p/examples/ping-client.rs @@ -29,7 +29,7 @@ use futures::Future; use futures::sync::oneshot; use std::env; use libp2p::core::Transport; -use libp2p::core::upgrade::{self, DeniedConnectionUpgrade}; +use libp2p::core::upgrade; use libp2p::tcp::TcpConfig; use tokio_core::reactor::Core; @@ -76,25 +76,26 @@ fn main() { // connections for us. The second parameter we pass is the connection upgrade that is accepted // by the listening part. We don't want to accept anything, so we pass a dummy object that // represents a connection that is always denied. + let (tx, rx) = oneshot::channel(); + let mut tx = Some(tx); let (swarm_controller, swarm_future) = libp2p::core::swarm( - transport.clone().with_upgrade(DeniedConnectionUpgrade), - |_socket, _client_addr| -> Result<(), _> { - unreachable!("All incoming connections should have been denied") + transport.clone().with_upgrade(libp2p::ping::Ping), + |(mut pinger, future), _client_addr| { + let tx = tx.take(); + let ping = pinger.ping().map_err(|_| unreachable!()).inspect(move |_| { + println!("Received pong from the remote"); + if let Some(tx) = tx { + let _ = tx.send(()); + } + }); + ping.select(future).map(|_| ()).map_err(|(e, _)| e) }, ); // We now use the controller to dial to the address. - let (tx, rx) = oneshot::channel(); swarm_controller - .dial_custom_handler(target_addr.parse().expect("invalid multiaddr"), - transport.with_upgrade(libp2p::ping::Ping), - |(mut pinger, future), _| { - let ping = pinger.ping().map_err(|_| unreachable!()).inspect(|_| { - println!("Received pong from the remote"); - let _ = tx.send(()); - }); - ping.select(future).map(|_| ()).map_err(|(e, _)| e) - }) + .dial(target_addr.parse().expect("invalid multiaddr"), + transport.with_upgrade(libp2p::ping::Ping)) // If the multiaddr protocol exists but is not supported, then we get an error containing // the original multiaddress. .expect("unsupported multiaddr"); diff --git a/libp2p/examples/relay.rs b/libp2p/examples/relay.rs index 832f4b6d..6e080407 100644 --- a/libp2p/examples/relay.rs +++ b/libp2p/examples/relay.rs @@ -131,17 +131,11 @@ fn run_dialer(opts: DialerOpts) -> Result<(), Box> { RelayTransport::new(opts.me, tcp, store, iter::once(opts.relay)).with_dummy_muxing() }; - let (control, future) = libp2p::core::swarm(transport.clone(), |_, _| { - future::ok(()) - }); - let echo = SimpleProtocol::new("/echo/1.0.0", |socket| { Ok(AsyncRead::framed(socket, BytesCodec::new())) }); - let address = format!("/p2p-circuit/p2p/{}", opts.dest.to_base58()).parse()?; - - control.dial_custom_handler(address, transport.with_upgrade(echo), |socket, _| { + let (control, future) = libp2p::core::swarm(transport.clone().with_upgrade(echo.clone()), |socket, _| { println!("sending \"hello world\""); socket.send("hello world".into()) .and_then(|socket| socket.into_future().map_err(|(e, _)| e).map(|(m, _)| m)) @@ -149,7 +143,11 @@ fn run_dialer(opts: DialerOpts) -> Result<(), Box> { println!("received message: {:?}", message); Ok(()) }) - }).map_err(|_| "failed to dial")?; + }); + + let address = format!("/p2p-circuit/p2p/{}", opts.dest.to_base58()).parse()?; + + control.dial(address, transport.with_upgrade(echo)).map_err(|_| "failed to dial")?; core.run(future).map_err(From::from) }