From fda4b37931daf60f3c4d929dcbd709b4cfc47b33 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Thu, 7 Dec 2017 18:06:38 +0100 Subject: [PATCH] Add comments in the examples --- example/examples/echo-dialer.rs | 54 ++++++++++++++++++++-------- example/examples/echo-server.rs | 64 ++++++++++++++++++++++++++------- 2 files changed, 91 insertions(+), 27 deletions(-) diff --git a/example/examples/echo-dialer.rs b/example/examples/echo-dialer.rs index eb6e84f3..73b90a89 100644 --- a/example/examples/echo-dialer.rs +++ b/example/examples/echo-dialer.rs @@ -34,10 +34,15 @@ use tokio_core::reactor::Core; use tokio_io::codec::length_delimited; fn main() { + // We start by building the tokio engine that will run all the sockets. let mut core = Core::new().unwrap(); - let tcp = TcpConfig::new(core.handle()); - - let with_secio = tcp + + // Now let's build the transport stack. + // We start by creating a `TcpConfig` that indicates that we want TCP/IP. + let transport = TcpConfig::new(core.handle()) + + // On top of TCP/IP, we will use either the plaintext protocol or the secio protocol, + // depending on which one the remote supports. .with_upgrade(swarm::PlainTextConfig) .or_upgrade({ let private_key = include_bytes!("test-private-key.pk8"); @@ -45,19 +50,35 @@ fn main() { secio::SecioConfig { key: secio::SecioKeyPair::rsa_from_pkcs8(private_key, public_key).unwrap(), } - }); - - let with_echo = with_secio.with_upgrade(SimpleProtocol::new("/echo/1.0.0", |socket| { - Ok(length_delimited::Framed::<_, BytesMut>::new(socket)) - })); - - let dialer = with_echo.dial(swarm::multiaddr::Multiaddr::new("/ip4/127.0.0.1/tcp/10333").unwrap()) - .unwrap_or_else(|_| panic!()) - .and_then(|f| { - f.send("hello world".into()) }) - .and_then(|f| { - f.into_future() + + // On top of plaintext or secio, we use the "echo" protocol, which is a custom protocol + // just for this example. + // For this purpose, we create a `SimpleProtocol` struct. + .with_upgrade(SimpleProtocol::new("/echo/1.0.0", |socket| { + // This closure is called whenever a stream using the "echo" protocol has been + // successfully negotiated. The parameter is the raw socket (implements the AsyncRead + // and AsyncWrite traits), and the closure must return an implementation of + // `IntoFuture` that can yield any type of object. + Ok(length_delimited::Framed::<_, BytesMut>::new(socket)) + })); + + // We now have a `transport` variable that can be used either to dial nodes or listen to + // incoming connections, and that will automatically apply all the selected protocols on top + // of any opened stream. + + // We use it to dial `/ip4/127.0.0.1/tcp/10333`. + let dialer = transport.dial(swarm::Multiaddr::new("/ip4/127.0.0.1/tcp/10333").unwrap()) + .unwrap_or_else(|_| panic!("unsupported multiaddr protocol ; should never happen")) + .and_then(|echo| { + // `echo` is what the closure used when initializing "echo" returns. + // Consequently, please note that the `send` method is available only because the type + // `length_delimited::Framed` has a `send` method. + echo.send("hello world".into()) + }) + .and_then(|echo| { + // The message has been successfully sent. Now wait for an answer. + echo.into_future() .map(|(msg, rest)| { println!("received: {:?}", msg); rest @@ -65,5 +86,8 @@ fn main() { .map_err(|(err, _)| err) }); + // `dialer` is a future that contains all the behaviour that we want, but nothing has actually + // started yet. Because we created the `TcpConfig` with tokio, we need to run the future + // through the tokio core. core.run(dialer).unwrap(); } diff --git a/example/examples/echo-server.rs b/example/examples/echo-server.rs index 8cb6fa7b..5c8fb209 100644 --- a/example/examples/echo-server.rs +++ b/example/examples/echo-server.rs @@ -34,10 +34,15 @@ use tokio_core::reactor::Core; use tokio_io::codec::length_delimited; fn main() { + // We start by building the tokio engine that will run all the sockets. let mut core = Core::new().unwrap(); - let tcp = TcpConfig::new(core.handle()); - let with_secio = tcp + // Now let's build the transport stack. + // We start by creating a `TcpConfig` that indicates that we want TCP/IP. + let transport = TcpConfig::new(core.handle()) + + // On top of TCP/IP, we will use either the plaintext protocol or the secio protocol, + // depending on which one the remote supports. .with_upgrade(swarm::PlainTextConfig) .or_upgrade({ let private_key = include_bytes!("test-private-key.pk8"); @@ -45,27 +50,62 @@ fn main() { secio::SecioConfig { key: secio::SecioKeyPair::rsa_from_pkcs8(private_key, public_key).unwrap(), } - }); + }) + + // On top of plaintext or secio, we use the "echo" protocol, which is a custom protocol + // just for this example. + // For this purpose, we create a `SimpleProtocol` struct. + .with_upgrade(SimpleProtocol::new("/echo/1.0.0", |socket| { + // This closure is called whenever a stream using the "echo" protocol has been + // successfully negotiated. The parameter is the raw socket (implements the AsyncRead + // and AsyncWrite traits), and the closure must return an implementation of + // `IntoFuture` that can yield any type of object. + Ok(length_delimited::Framed::new(socket)) + })); - let with_echo = with_secio.with_upgrade(SimpleProtocol::new("/echo/1.0.0", |socket| { - Ok(length_delimited::Framed::new(socket)) - })); + // We now have a `transport` variable that can be used either to dial nodes or listen to + // incoming connections, and that will automatically apply all the selected protocols on top + // of any opened stream. - let future = with_echo.listen_on(swarm::multiaddr::Multiaddr::new("/ip4/0.0.0.0/tcp/10333").unwrap()) - .unwrap_or_else(|_| panic!()).0 - .for_each(|(socket, _)| { - loop_fn(socket, |socket| { + // We use it to listen on `/ip4/127.0.0.1/tcp/10333`. + let future = transport.listen_on(swarm::Multiaddr::new("/ip4/0.0.0.0/tcp/10333").unwrap()) + .unwrap_or_else(|_| panic!("unsupported multiaddr protocol ; should never happen")).0 + + .for_each(|(socket, client_addr)| { + // This closure is called whenever a new connection has been received and successfully + // upgraded to use secio/plaintext and echo. + let client_addr = client_addr.to_string(); + println!("Received connection from {}", client_addr); + + // We loop forever in order to handle all the messages sent by the client. + let client_finished = loop_fn(socket, |socket| { socket.into_future() .map_err(|(err, _)| err) .and_then(|(msg, rest)| { if let Some(msg) = msg { - Box::new(rest.send(msg).map(|m| Loop::Continue(m))) as Box> + // One message has been received. We send it back to the client. + Box::new(rest.send(msg).map(|m| Loop::Continue(m))) + as Box> } else { - Box::new(Ok(Loop::Break(())).into_future()) as Box> + // End of stream. Connection closed. Breaking the loop. + Box::new(Ok(Loop::Break(())).into_future()) + as Box> } }) + }); + + // We absorb errors from the `client_finished` future so that an error while processing + // a client doesn't propagate and stop the entire server. + client_finished.then(move |res| { + if let Err(err) = res { + println!("error while processing client {}: {:?}", client_addr, err); + } + Ok(()) }) }); + // `future` is a future that contains all the behaviour that we want, but nothing has actually + // started yet. Because we created the `TcpConfig` with tokio, we need to run the future + // through the tokio core. core.run(future).unwrap(); }