Remove dial_custom_handler (#203)

* Remove dial_custom_handler

* Rename dial_to_handler to dial
This commit is contained in:
Pierre Krieger 2018-05-22 18:58:27 +02:00 committed by GitHub
parent 6d41923ca5
commit e5f23c74c0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 45 additions and 74 deletions

View File

@ -113,7 +113,7 @@ where
/// upgraded using the `upgrade`, and the output is sent to the handler that was passed when /// upgraded using the `upgrade`, and the output is sent to the handler that was passed when
/// calling `swarm`. /// calling `swarm`.
// TODO: consider returning a future so that errors can be processed? // TODO: consider returning a future so that errors can be processed?
pub fn dial_to_handler<Du>(&self, multiaddr: Multiaddr, transport: Du) -> Result<(), Multiaddr> pub fn dial<Du>(&self, multiaddr: Multiaddr, transport: Du) -> Result<(), Multiaddr>
where where
Du: Transport + 'static, // TODO: 'static :-/ Du: Transport + 'static, // TODO: 'static :-/
Du::Output: Into<T::Output>, Du::Output: Into<T::Output>,
@ -134,37 +134,6 @@ where
} }
} }
/// Asks the swarm to dial the node with the given multiaddress. The connection is then
/// upgraded using the `upgrade`, and the output is then passed to `and_then`.
///
/// Contrary to `dial_to_handler`, the output of the upgrade is not given to the handler that
/// was passed at initialization.
// TODO: consider returning a future so that errors can be processed?
pub fn dial_custom_handler<Du, Df, Dfu>(
&self,
multiaddr: Multiaddr,
transport: Du,
and_then: Df,
) -> Result<(), Multiaddr>
where
Du: Transport + 'static, // TODO: 'static :-/
Df: FnOnce(Du::Output, Multiaddr) -> Dfu + 'static, // TODO: 'static :-/
Dfu: IntoFuture<Item = (), Error = IoError> + 'static, // TODO: 'static :-/
{
trace!("Swarm dialing {} with custom handler", multiaddr);
match transport.dial(multiaddr) {
Ok(dial) => {
let dial = Box::new(dial.and_then(|(d, m)| and_then(d, m))) as Box<_>;
// Ignoring errors if the receiver has been closed, because in that situation
// nothing is going to be processed anyway.
let _ = self.new_toprocess.unbounded_send(dial);
Ok(())
}
Err((_, multiaddr)) => Err(multiaddr),
}
}
/// Adds a multiaddr to listen on. All the incoming connections will use the `upgrade` that /// Adds a multiaddr to listen on. All the incoming connections will use the `upgrade` that
/// was passed to `swarm`. /// was passed to `swarm`.
pub fn listen_on(&self, multiaddr: Multiaddr) -> Result<Multiaddr, Multiaddr> { pub fn listen_on(&self, multiaddr: Multiaddr) -> Result<Multiaddr, Multiaddr> {

View File

@ -468,7 +468,7 @@ where
Entry::Vacant(entry) => { Entry::Vacant(entry) => {
// Need to open a connection. // Need to open a connection.
match self.swarm_controller match self.swarm_controller
.dial_to_handler(addr, self.kademlia_transport.clone()) .dial(addr, self.kademlia_transport.clone())
{ {
Ok(()) => (), Ok(()) => (),
Err(_addr) => { Err(_addr) => {

View File

@ -29,7 +29,7 @@ use futures::sync::oneshot;
use futures::{Future, Sink, Stream}; use futures::{Future, Sink, Stream};
use std::env; use std::env;
use libp2p::core::Transport; use libp2p::core::Transport;
use libp2p::core::upgrade::{self, DeniedConnectionUpgrade, SimpleProtocol}; use libp2p::core::upgrade::{self, SimpleProtocol};
use libp2p::tcp::TcpConfig; use libp2p::tcp::TcpConfig;
use tokio_core::reactor::Core; use tokio_core::reactor::Core;
use tokio_io::AsyncRead; use tokio_io::AsyncRead;
@ -80,17 +80,6 @@ fn main() {
// a `Transport`. // a `Transport`.
.into_connection_reuse(); .into_connection_reuse();
// Let's put this `transport` into a *swarm*. The swarm will handle all the incoming
// connections for us. The second parameter we pass is the connection upgrade that is accepted
// by the listening part. We don't want to accept anything, so we pass a dummy object that
// represents a connection that is always denied.
let (swarm_controller, swarm_future) = libp2p::core::swarm(
transport.clone().with_upgrade(DeniedConnectionUpgrade),
|_socket, _client_addr| -> Result<(), _> {
unreachable!("All incoming connections should have been denied")
},
);
// Building a struct that represents the protocol that we are going to use for dialing. // Building a struct that represents the protocol that we are going to use for dialing.
let proto = SimpleProtocol::new("/echo/1.0.0", |socket| { let proto = SimpleProtocol::new("/echo/1.0.0", |socket| {
// This closure is called whenever a stream using the "echo" protocol has been // This closure is called whenever a stream using the "echo" protocol has been
@ -100,25 +89,39 @@ fn main() {
Ok(AsyncRead::framed(socket, BytesCodec::new())) Ok(AsyncRead::framed(socket, BytesCodec::new()))
}); });
// We now use the controller to dial to the address.
let (finished_tx, finished_rx) = oneshot::channel(); let (finished_tx, finished_rx) = oneshot::channel();
swarm_controller let mut finished_tx = Some(finished_tx);
.dial_custom_handler(target_addr.parse().expect("invalid multiaddr"), transport.with_upgrade(proto), |echo, _| {
// Let's put this `transport` into a *swarm*. The swarm will handle all the incoming
// connections for us. The second parameter we pass is the connection upgrade that is accepted
// by the listening part. We don't want to accept anything, so we pass a dummy object that
// represents a connection that is always denied.
let (swarm_controller, swarm_future) = libp2p::core::swarm(
transport.clone().with_upgrade(proto.clone()),
|echo, _client_addr| {
// `echo` is what the closure used when initializing `proto` returns. // `echo` is what the closure used when initializing `proto` returns.
// Consequently, please note that the `send` method is available only because the type // Consequently, please note that the `send` method is available only because the type
// `length_delimited::Framed` has a `send` method. // `length_delimited::Framed` has a `send` method.
println!("Sending \"hello world\" to listener"); println!("Sending \"hello world\" to listener");
let finished_tx = finished_tx.take();
echo.send("hello world".into()) echo.send("hello world".into())
// Then listening for one message from the remote. // Then listening for one message from the remote.
.and_then(|echo| { .and_then(|echo| {
echo.into_future().map_err(|(e, _)| e).map(|(n,_ )| n) echo.into_future().map_err(|(e, _)| e).map(|(n,_ )| n)
}) })
.and_then(|message| { .and_then(move |message| {
println!("Received message from listener: {:?}", message.unwrap()); println!("Received message from listener: {:?}", message.unwrap());
if let Some(finished_tx) = finished_tx {
finished_tx.send(()).unwrap(); finished_tx.send(()).unwrap();
}
Ok(()) Ok(())
}) })
}) },
);
// We now use the controller to dial to the address.
swarm_controller
.dial(target_addr.parse().expect("invalid multiaddr"), transport.with_upgrade(proto))
// If the multiaddr protocol exists but is not supported, then we get an error containing // If the multiaddr protocol exists but is not supported, then we get an error containing
// the original multiaddress. // the original multiaddress.
.expect("unsupported multiaddr"); .expect("unsupported multiaddr");

View File

@ -135,7 +135,7 @@ fn main() {
let target: Multiaddr = msg[6..].parse().unwrap(); let target: Multiaddr = msg[6..].parse().unwrap();
println!("*Dialing {}*", target); println!("*Dialing {}*", target);
swarm_controller swarm_controller
.dial_to_handler( .dial(
target, target,
transport.clone().with_upgrade(floodsub_upgrade.clone()), transport.clone().with_upgrade(floodsub_upgrade.clone()),
) )

View File

@ -29,7 +29,7 @@ use futures::Future;
use futures::sync::oneshot; use futures::sync::oneshot;
use std::env; use std::env;
use libp2p::core::Transport; use libp2p::core::Transport;
use libp2p::core::upgrade::{self, DeniedConnectionUpgrade}; use libp2p::core::upgrade;
use libp2p::tcp::TcpConfig; use libp2p::tcp::TcpConfig;
use tokio_core::reactor::Core; use tokio_core::reactor::Core;
@ -76,25 +76,26 @@ fn main() {
// connections for us. The second parameter we pass is the connection upgrade that is accepted // connections for us. The second parameter we pass is the connection upgrade that is accepted
// by the listening part. We don't want to accept anything, so we pass a dummy object that // by the listening part. We don't want to accept anything, so we pass a dummy object that
// represents a connection that is always denied. // represents a connection that is always denied.
let (tx, rx) = oneshot::channel();
let mut tx = Some(tx);
let (swarm_controller, swarm_future) = libp2p::core::swarm( let (swarm_controller, swarm_future) = libp2p::core::swarm(
transport.clone().with_upgrade(DeniedConnectionUpgrade), transport.clone().with_upgrade(libp2p::ping::Ping),
|_socket, _client_addr| -> Result<(), _> { |(mut pinger, future), _client_addr| {
unreachable!("All incoming connections should have been denied") let tx = tx.take();
let ping = pinger.ping().map_err(|_| unreachable!()).inspect(move |_| {
println!("Received pong from the remote");
if let Some(tx) = tx {
let _ = tx.send(());
}
});
ping.select(future).map(|_| ()).map_err(|(e, _)| e)
}, },
); );
// We now use the controller to dial to the address. // We now use the controller to dial to the address.
let (tx, rx) = oneshot::channel();
swarm_controller swarm_controller
.dial_custom_handler(target_addr.parse().expect("invalid multiaddr"), .dial(target_addr.parse().expect("invalid multiaddr"),
transport.with_upgrade(libp2p::ping::Ping), transport.with_upgrade(libp2p::ping::Ping))
|(mut pinger, future), _| {
let ping = pinger.ping().map_err(|_| unreachable!()).inspect(|_| {
println!("Received pong from the remote");
let _ = tx.send(());
});
ping.select(future).map(|_| ()).map_err(|(e, _)| e)
})
// If the multiaddr protocol exists but is not supported, then we get an error containing // If the multiaddr protocol exists but is not supported, then we get an error containing
// the original multiaddress. // the original multiaddress.
.expect("unsupported multiaddr"); .expect("unsupported multiaddr");

View File

@ -131,17 +131,11 @@ fn run_dialer(opts: DialerOpts) -> Result<(), Box<Error>> {
RelayTransport::new(opts.me, tcp, store, iter::once(opts.relay)).with_dummy_muxing() RelayTransport::new(opts.me, tcp, store, iter::once(opts.relay)).with_dummy_muxing()
}; };
let (control, future) = libp2p::core::swarm(transport.clone(), |_, _| {
future::ok(())
});
let echo = SimpleProtocol::new("/echo/1.0.0", |socket| { let echo = SimpleProtocol::new("/echo/1.0.0", |socket| {
Ok(AsyncRead::framed(socket, BytesCodec::new())) Ok(AsyncRead::framed(socket, BytesCodec::new()))
}); });
let address = format!("/p2p-circuit/p2p/{}", opts.dest.to_base58()).parse()?; let (control, future) = libp2p::core::swarm(transport.clone().with_upgrade(echo.clone()), |socket, _| {
control.dial_custom_handler(address, transport.with_upgrade(echo), |socket, _| {
println!("sending \"hello world\""); println!("sending \"hello world\"");
socket.send("hello world".into()) socket.send("hello world".into())
.and_then(|socket| socket.into_future().map_err(|(e, _)| e).map(|(m, _)| m)) .and_then(|socket| socket.into_future().map_err(|(e, _)| e).map(|(m, _)| m))
@ -149,7 +143,11 @@ fn run_dialer(opts: DialerOpts) -> Result<(), Box<Error>> {
println!("received message: {:?}", message); println!("received message: {:?}", message);
Ok(()) Ok(())
}) })
}).map_err(|_| "failed to dial")?; });
let address = format!("/p2p-circuit/p2p/{}", opts.dest.to_base58()).parse()?;
control.dial(address, transport.with_upgrade(echo)).map_err(|_| "failed to dial")?;
core.run(future).map_err(From::from) core.run(future).map_err(From::from)
} }