diff --git a/example/examples/echo-server.rs b/example/examples/echo-server.rs index 4334bef0..e32f4537 100644 --- a/example/examples/echo-server.rs +++ b/example/examples/echo-server.rs @@ -78,70 +78,55 @@ fn main() { // successfully negotiated. The parameter is the raw socket (implements the AsyncRead // and AsyncWrite traits), and the closure must return an implementation of // `IntoFuture` that can yield any type of object. - Ok(length_delimited::Framed::new(socket)) + Ok(length_delimited::Framed::<_, bytes::BytesMut>::new(socket)) })); // We now have a `transport` variable that can be used either to dial nodes or listen to // incoming connections, and that will automatically apply all the selected protocols on top // of any opened stream. - // We use it to listen on the address. - let (listener, address) = transport + // Let's put this `transport` into a *swarm*. The swarm will handle all the incoming and + // outgoing connections for us. + let (swarm_controller, swarm_future) = swarm::swarm(transport, |socket, client_addr| { + println!("Successfully negotiated protocol with {}", client_addr); + + // We loop forever in order to handle all the messages sent by the client. + loop_fn(socket, move |socket| { + let client_addr = client_addr.clone(); + socket + .into_future() + .map_err(|(e, _)| e) + .and_then(move |(msg, rest)| { + if let Some(msg) = msg { + // One message has been received. We send it back to the client. + println!("Received a message from {}: {:?}\n => Sending back \ + identical message to remote", client_addr, msg); + Box::new(rest.send(msg).map(|m| Loop::Continue(m))) + as Box> + } else { + // End of stream. Connection closed. Breaking the loop. + println!("Received EOF from {}\n => Dropping connection", + client_addr); + Box::new(Ok(Loop::Break(())).into_future()) + as Box> + } + }) + }) + }); + + // We now use the controller to listen on the address. + let address = swarm_controller .listen_on(swarm::Multiaddr::new(&listen_addr).expect("invalid multiaddr")) // If the multiaddr protocol exists but is not supported, then we get an error containing - // the transport and the original multiaddress. Therefore we cannot directly use `unwrap()` - // or `expect()`, but have to add a `map_err()` beforehand. - .map_err(|(_, addr)| addr).expect("unsupported multiaddr"); + // the original multiaddress. + .expect("unsupported multiaddr"); + // The address we actually listen on can be different from the address that was passed to + // the `listen_on` function. For example if you pass `/ip4/0.0.0.0/tcp/0`, then the port `0` + // will be replaced with the actual port. println!("Now listening on {:?}", address); - let future = listener - .for_each(|(socket, client_addr)| { - // This closure is called whenever a new connection has been received. - // `socket` is a future that will be triggered once the upgrade to secio, multiplex - // and echo is complete. - let client_addr = client_addr.to_string(); - println!("Incoming connection from {}", client_addr); - - socket - .and_then(move |socket| { - println!("Successfully negotiated protocol with {}", client_addr); - - // We loop forever in order to handle all the messages sent by the client. - loop_fn(socket, move |socket| { - let client_addr = client_addr.clone(); - socket.into_future() - .map_err(|(err, _)| err) - .and_then(move |(msg, rest)| { - if let Some(msg) = msg { - // One message has been received. We send it back to the client. - println!("Received a message from {}: {:?}\n => Sending back \ - identical message to remote", client_addr, msg); - Box::new(rest.send(msg).map(|m| Loop::Continue(m))) - as Box> - } else { - // End of stream. Connection closed. Breaking the loop. - println!("Received EOF from {}\n => Dropping connection", - client_addr); - Box::new(Ok(Loop::Break(())).into_future()) - as Box> - } - }) - }) - }) - - // We absorb errors from the future so that an error while processing a client - // (eg. if the client unexpectedly disconnects) doesn't propagate and stop the - // entire server. - .then(move |res| { - if let Err(err) = res { - println!("Error while processing client: {:?}", err); - } - Ok(()) - }) - }); - - // `future` is a future that contains all the behaviour that we want, but nothing has actually - // started yet. Because we created the `TcpConfig` with tokio, we need to run the future - // through the tokio core. - core.run(future).unwrap(); + // `swarm_future` is a future that contains all the behaviour that we want, but nothing has + // actually started yet. Because we created the `TcpConfig` with tokio, we need to run the + // future through the tokio core. + core.run(swarm_future).unwrap(); } diff --git a/libp2p-swarm/README.md b/libp2p-swarm/README.md index 1c6f02fc..bfe02a29 100644 --- a/libp2p-swarm/README.md +++ b/libp2p-swarm/README.md @@ -1,8 +1,9 @@ # libp2p-swarm -Transport and protocol upgrade system of *libp2p*. +Transport, protocol upgrade and swarm systems of *libp2p*. -This crate contains all the core traits and mechanisms of the transport system of *libp2p*. +This crate contains all the core traits and mechanisms of the transport and swarm systems +of *libp2p*. # The `Transport` trait @@ -27,11 +28,12 @@ multiple times in a row in order to chain as many implementations as you want. The `MuxedTransport` trait is an extension to the `Transport` trait, and is implemented on transports that can receive incoming connections on streams that have been opened with `dial()`. -The trait provides the `dial_and_listen()` method, which returns both a dialer and a stream of -incoming connections. +The trait provides the `next_incoming()` method, which returns a future that will resolve to +the next substream that arrives from a dialed node. > **Note**: This trait is mainly implemented for transports that provide stream muxing -> capabilities. +> capabilities, but it can also be implemented in a dummy way by returning an empty +> iterator. # Connection upgrades @@ -57,7 +59,7 @@ A middleware can be applied on a transport by using the `with_upgrade` method of `Transport` trait. The return value of this method also implements the `Transport` trait, which means that you can call `dial()` and `listen_on()` on it in order to directly obtain an upgraded connection or a listener that will yield upgraded connections. Similarly, the -`dial_and_listen()` method will automatically apply the upgrade on both the dialer and the +`next_incoming()` method will automatically apply the upgrade on both the dialer and the listener. An error is produced if the remote doesn't support the protocol corresponding to the connection upgrade. @@ -100,11 +102,11 @@ implement the `AsyncRead` and `AsyncWrite` traits. This means that that the retu transport. However the `UpgradedNode` struct returned by `with_upgrade` still provides methods named -`dial`, `listen_on`, and `dial_and_listen`, which will yield you a `Future` or a `Stream`, +`dial`, `listen_on`, and `next_incoming`, which will yield you a `Future` or a `Stream`, which you can use to obtain the `Output`. This `Output` can then be used in a protocol-specific way to use the protocol. -```no_run +```rust extern crate futures; extern crate libp2p_ping; extern crate libp2p_swarm; @@ -115,7 +117,6 @@ use futures::Future; use libp2p_ping::Ping; use libp2p_swarm::Transport; -# fn main() { let mut core = tokio_core::reactor::Core::new().unwrap(); let ping_finished_future = libp2p_tcp_transport::TcpConfig::new(core.handle()) @@ -130,7 +131,6 @@ let ping_finished_future = libp2p_tcp_transport::TcpConfig::new(core.handle()) // Runs until the ping arrives. core.run(ping_finished_future).unwrap(); -# } ``` ## Grouping protocols @@ -138,3 +138,40 @@ core.run(ping_finished_future).unwrap(); You can use the `.or_upgrade()` method to group multiple upgrades together. The return value also implements the `ConnectionUpgrade` trait and will choose one of the protocols amongst the ones supported. + +# Swarm + +Once you have created an object that implements the `Transport` trait, you can put it in a +*swarm*. This is done by calling the `swarm()` freestanding function with the transport +alongside with a function or a closure that will turn the output of the upgrade (usually an +actual protocol, as explained above) into a `Future` producing `()`. + +```rust +extern crate futures; +extern crate libp2p_ping; +extern crate libp2p_swarm; +extern crate libp2p_tcp_transport; +extern crate tokio_core; + +use futures::Future; +use libp2p_ping::Ping; +use libp2p_swarm::Transport; + +let mut core = tokio_core::reactor::Core::new().unwrap(); + +let transport = libp2p_tcp_transport::TcpConfig::new(core.handle()) + .with_dummy_muxing() + .with_upgrade(Ping); + +let (swarm_controller, swarm_future) = libp2p_swarm::swarm(transport, |(mut pinger, service), client_addr| { + pinger.ping().map_err(|_| panic!()) + .select(service).map_err(|_| panic!()) + .map(|_| ()) +}); + +// The `swarm_controller` can then be used to do some operations. +swarm_controller.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap()); + +// Runs until everything is finished. +core.run(swarm_future).unwrap(); +``` diff --git a/libp2p-swarm/src/lib.rs b/libp2p-swarm/src/lib.rs index 59e1fb6d..e96e77ea 100644 --- a/libp2p-swarm/src/lib.rs +++ b/libp2p-swarm/src/lib.rs @@ -21,9 +21,10 @@ // TODO: use this once stable ; for now we just copy-paste the content of the README.md //#![doc(include = "../README.md")] -//! Transport and protocol upgrade system of *libp2p*. +//! Transport, protocol upgrade and swarm systems of *libp2p*. //! -//! This crate contains all the core traits and mechanisms of the transport system of *libp2p*. +//! This crate contains all the core traits and mechanisms of the transport and swarm systems +//! of *libp2p*. //! //! # The `Transport` trait //! @@ -163,6 +164,44 @@ //! also implements the `ConnectionUpgrade` trait and will choose one of the protocols amongst the //! ones supported. //! +//! # Swarm +//! +//! Once you have created an object that implements the `Transport` trait, you can put it in a +//! *swarm*. This is done by calling the `swarm()` freestanding function with the transport +//! alongside with a function or a closure that will turn the output of the upgrade (usually an +//! actual protocol, as explained above) into a `Future` producing `()`. +//! +//! ```no_run +//! extern crate futures; +//! extern crate libp2p_ping; +//! extern crate libp2p_swarm; +//! extern crate libp2p_tcp_transport; +//! extern crate tokio_core; +//! +//! use futures::Future; +//! use libp2p_ping::Ping; +//! use libp2p_swarm::Transport; +//! +//! # fn main() { +//! let mut core = tokio_core::reactor::Core::new().unwrap(); +//! +//! let transport = libp2p_tcp_transport::TcpConfig::new(core.handle()) +//! .with_dummy_muxing() +//! .with_upgrade(Ping); +//! +//! let (swarm_controller, swarm_future) = libp2p_swarm::swarm(transport, |(mut pinger, service), client_addr| { +//! pinger.ping().map_err(|_| panic!()) +//! .select(service).map_err(|_| panic!()) +//! .map(|_| ()) +//! }); +//! +//! // The `swarm_controller` can then be used to do some operations. +//! swarm_controller.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap()); +//! +//! // Runs until everything is finished. +//! core.run(swarm_future).unwrap(); +//! # } +//! ``` extern crate bytes; #[macro_use] @@ -176,11 +215,13 @@ extern crate tokio_io; pub extern crate multiaddr; mod connection_reuse; +pub mod swarm; pub mod muxing; pub mod transport; pub use self::connection_reuse::ConnectionReuse; pub use self::multiaddr::Multiaddr; pub use self::muxing::StreamMuxer; +pub use self::swarm::{swarm, SwarmController, SwarmFuture}; pub use self::transport::{ConnectionUpgrade, PlainTextConfig, Transport, UpgradedNode, OrUpgrade}; pub use self::transport::{Endpoint, SimpleProtocol, MuxedTransport, UpgradeExt}; diff --git a/libp2p-swarm/src/swarm.rs b/libp2p-swarm/src/swarm.rs new file mode 100644 index 00000000..95d9bb34 --- /dev/null +++ b/libp2p-swarm/src/swarm.rs @@ -0,0 +1,226 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use std::io::Error as IoError; +use futures::{IntoFuture, Future, Stream, Async, Poll}; +use futures::sync::mpsc; +use {ConnectionUpgrade, Multiaddr, MuxedTransport, UpgradedNode}; + +/// Creates a swarm. +/// +/// Requires an upgraded transport, and a function or closure that will turn the upgrade into a +/// `Future` that produces a `()`. +/// +/// Produces a `SwarmController` and an implementation of `Future`. The controller can be used to +/// control, and the `Future` must be driven to completion in order for things to work. +/// +pub fn swarm(upgraded: UpgradedNode, handler: H) + -> (SwarmController, SwarmFuture) + where T: MuxedTransport + Clone + 'static, // TODO: 'static :-/ + C: ConnectionUpgrade + Clone + 'static, // TODO: 'static :-/ + H: FnMut(C::Output, Multiaddr) -> F, + F: IntoFuture, +{ + let (new_dialers_tx, new_dialers_rx) = mpsc::unbounded(); + let (new_listeners_tx, new_listeners_rx) = mpsc::unbounded(); + + let future = SwarmFuture { + upgraded: upgraded.clone(), + handler: handler, + new_listeners: new_listeners_rx, + next_incoming: upgraded.clone().next_incoming(), + listeners: Vec::new(), + listeners_upgrade: Vec::new(), + dialers: Vec::new(), + new_dialers: new_dialers_rx, + to_process: Vec::new(), + }; + + let controller = SwarmController { + upgraded: upgraded, + new_listeners: new_listeners_tx, + new_dialers: new_dialers_tx, + }; + + (controller, future) +} + +/// Allows control of what the swarm is doing. +pub struct SwarmController + where T: MuxedTransport + 'static, // TODO: 'static :-/ + C: ConnectionUpgrade + 'static, // TODO: 'static :-/ +{ + upgraded: UpgradedNode, + new_listeners: mpsc::UnboundedSender>, Multiaddr), Error = IoError>>>, + new_dialers: mpsc::UnboundedSender<(Box>, Multiaddr)>, +} + +impl SwarmController + where T: MuxedTransport + Clone + 'static, // TODO: 'static :-/ + C: ConnectionUpgrade + Clone + 'static, // TODO: 'static :-/ + C::NamesIter: Clone, // TODO: not elegant +{ + /// Asks the swarm to dial the node with the given multiaddress. + /// + /// Once the connection has been open and upgraded, it will be given to the handler. + // TODO: consider returning a future so that errors can be processed? + pub fn dial(&self, multiaddr: Multiaddr) -> Result<(), Multiaddr> { + match self.upgraded.clone().dial(multiaddr.clone()) { + Ok(dial) => { + // Ignoring errors if the receiver has been closed, because in that situation + // nothing is going to be processed anyway. + let _ = self.new_dialers.unbounded_send((dial, multiaddr)); + Ok(()) + }, + Err((_, multiaddr)) => { + Err(multiaddr) + }, + } + } + + /// Adds a multiaddr to listen on. + pub fn listen_on(&self, multiaddr: Multiaddr) -> Result { + match self.upgraded.clone().listen_on(multiaddr) { + Ok((listener, new_addr)) => { + // Ignoring errors if the receiver has been closed, because in that situation + // nothing is going to be processed anyway. + let _ = self.new_listeners.unbounded_send(listener); + Ok(new_addr) + }, + Err((_, multiaddr)) => { + Err(multiaddr) + }, + } + } +} + +/// Future that must be driven to completion in order for the swarm to work. +pub struct SwarmFuture + where T: MuxedTransport + 'static, // TODO: 'static :-/ + C: ConnectionUpgrade + 'static, // TODO: 'static :-/ +{ + upgraded: UpgradedNode, + handler: H, + new_listeners: mpsc::UnboundedReceiver>, Multiaddr), Error = IoError>>>, + next_incoming: Box>, + listeners: Vec>, Multiaddr), Error = IoError>>>, + listeners_upgrade: Vec<(Box>, Multiaddr)>, + dialers: Vec<(Box>, Multiaddr)>, + new_dialers: mpsc::UnboundedReceiver<(Box>, Multiaddr)>, + to_process: Vec, +} + +impl Future for SwarmFuture + where T: MuxedTransport + Clone + 'static, // TODO: 'static :-/, + C: ConnectionUpgrade + Clone + 'static, // TODO: 'static :-/ + H: FnMut(C::Output, Multiaddr) -> If, + If: IntoFuture, + F: Future, +{ + type Item = (); + type Error = IoError; + + fn poll(&mut self) -> Poll { + let handler = &mut self.handler; + + match self.next_incoming.poll() { + Ok(Async::Ready((connec, client_addr))) => { + self.next_incoming = self.upgraded.clone().next_incoming(); + self.to_process.push(handler(connec, client_addr).into_future()); + }, + Ok(Async::NotReady) => {}, + Err(err) => return Err(err), + }; + + match self.new_listeners.poll() { + Ok(Async::Ready(Some(new_listener))) => { + self.listeners.push(new_listener); + }, + Ok(Async::Ready(None)) | Err(_) => { + // New listener sender has been closed. + }, + Ok(Async::NotReady) => {}, + }; + + match self.new_dialers.poll() { + Ok(Async::Ready(Some((new_dialer, multiaddr)))) => { + self.dialers.push((new_dialer, multiaddr)); + }, + Ok(Async::Ready(None)) | Err(_) => { + // New dialers sender has been closed. + }, + Ok(Async::NotReady) => {}, + }; + + for n in (0 .. self.listeners.len()).rev() { + let mut listener = self.listeners.swap_remove(n); + match listener.poll() { + Ok(Async::Ready(Some((upgrade, client_addr)))) => { + self.listeners.push(listener); + self.listeners_upgrade.push((upgrade, client_addr)); + }, + Ok(Async::NotReady) => { + self.listeners.push(listener); + }, + Ok(Async::Ready(None)) => {}, + Err(err) => return Err(err), + }; + } + + for n in (0 .. self.listeners_upgrade.len()).rev() { + let (mut upgrade, addr) = self.listeners_upgrade.swap_remove(n); + match upgrade.poll() { + Ok(Async::Ready(output)) => { + self.to_process.push(handler(output, addr).into_future()); + }, + Ok(Async::NotReady) => { + self.listeners_upgrade.push((upgrade, addr)); + }, + Err(err) => return Err(err), + } + } + + for n in (0 .. self.dialers.len()).rev() { + let (mut dialer, addr) = self.dialers.swap_remove(n); + match dialer.poll() { + Ok(Async::Ready(output)) => { + self.to_process.push(handler(output, addr).into_future()); + }, + Ok(Async::NotReady) => { + self.dialers.push((dialer, addr)); + }, + Err(err) => return Err(err), + } + } + + for n in (0 .. self.to_process.len()).rev() { + let mut to_process = self.to_process.swap_remove(n); + match to_process.poll() { + Ok(Async::Ready(())) => {}, + Ok(Async::NotReady) => self.to_process.push(to_process), + Err(err) => return Err(err), + } + } + + // TODO: we never return `Ok(Ready)` because there's no way to know whether + // `next_incoming()` can produce anything more in the future + Ok(Async::NotReady) + } +} diff --git a/libp2p-swarm/src/transport.rs b/libp2p-swarm/src/transport.rs index 2e959eb8..8099f988 100644 --- a/libp2p-swarm/src/transport.rs +++ b/libp2p-swarm/src/transport.rs @@ -118,6 +118,18 @@ pub trait Transport { upgrade: upgrade, } } + + /// Builds a dummy implementation of `MuxedTransport` that uses this transport. + /// + /// The resulting object will not actually use muxing. This means that dialing the same node + /// twice will result in two different connections instead of two substreams on the same + /// connection. + #[inline] + fn with_dummy_muxing(self) -> DummyMuxing + where Self: Sized + { + DummyMuxing { inner: self } + } } /// Extension trait for `Transport`. Implemented on structs that provide a `Transport` on which @@ -684,6 +696,53 @@ where } } +/// Dummy implementation of `MuxedTransport` that uses an inner `Transport`. +#[derive(Debug, Copy, Clone)] +pub struct DummyMuxing { + inner: T, +} + +impl MuxedTransport for DummyMuxing + where T: Transport +{ + type Incoming = future::Empty<(T::RawConn, Multiaddr), IoError>; + + fn next_incoming(self) -> Self::Incoming + where Self: Sized + { + future::empty() + } +} + +impl Transport for DummyMuxing + where T: Transport +{ + type RawConn = T::RawConn; + type Listener = T::Listener; + type ListenerUpgrade = T::ListenerUpgrade; + type Dial = T::Dial; + + #[inline] + fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> + where + Self: Sized + { + self.inner.listen_on(addr).map_err(|(inner, addr)| { + (DummyMuxing { inner }, addr) + }) + } + + #[inline] + fn dial(self, addr: Multiaddr) -> Result + where + Self: Sized + { + self.inner.dial(addr).map_err(|(inner, addr)| { + (DummyMuxing { inner }, addr) + }) + } +} + /// Implements the `Transport` trait. Dials or listens, then upgrades any dialed or received /// connection. ///