Implement swarm

This commit is contained in:
Pierre Krieger
2018-01-03 14:19:24 +01:00
parent 2339bfd4d3
commit 641f8a09d7
5 changed files with 416 additions and 68 deletions

View File

@ -78,70 +78,55 @@ fn main() {
// successfully negotiated. The parameter is the raw socket (implements the AsyncRead
// and AsyncWrite traits), and the closure must return an implementation of
// `IntoFuture` that can yield any type of object.
Ok(length_delimited::Framed::new(socket))
Ok(length_delimited::Framed::<_, bytes::BytesMut>::new(socket))
}));
// We now have a `transport` variable that can be used either to dial nodes or listen to
// incoming connections, and that will automatically apply all the selected protocols on top
// of any opened stream.
// We use it to listen on the address.
let (listener, address) = transport
// Let's put this `transport` into a *swarm*. The swarm will handle all the incoming and
// outgoing connections for us.
let (swarm_controller, swarm_future) = swarm::swarm(transport, |socket, client_addr| {
println!("Successfully negotiated protocol with {}", client_addr);
// We loop forever in order to handle all the messages sent by the client.
loop_fn(socket, move |socket| {
let client_addr = client_addr.clone();
socket
.into_future()
.map_err(|(e, _)| e)
.and_then(move |(msg, rest)| {
if let Some(msg) = msg {
// One message has been received. We send it back to the client.
println!("Received a message from {}: {:?}\n => Sending back \
identical message to remote", client_addr, msg);
Box::new(rest.send(msg).map(|m| Loop::Continue(m)))
as Box<Future<Item = _, Error = _>>
} else {
// End of stream. Connection closed. Breaking the loop.
println!("Received EOF from {}\n => Dropping connection",
client_addr);
Box::new(Ok(Loop::Break(())).into_future())
as Box<Future<Item = _, Error = _>>
}
})
})
});
// We now use the controller to listen on the address.
let address = swarm_controller
.listen_on(swarm::Multiaddr::new(&listen_addr).expect("invalid multiaddr"))
// If the multiaddr protocol exists but is not supported, then we get an error containing
// the transport and the original multiaddress. Therefore we cannot directly use `unwrap()`
// or `expect()`, but have to add a `map_err()` beforehand.
.map_err(|(_, addr)| addr).expect("unsupported multiaddr");
// the original multiaddress.
.expect("unsupported multiaddr");
// The address we actually listen on can be different from the address that was passed to
// the `listen_on` function. For example if you pass `/ip4/0.0.0.0/tcp/0`, then the port `0`
// will be replaced with the actual port.
println!("Now listening on {:?}", address);
let future = listener
.for_each(|(socket, client_addr)| {
// This closure is called whenever a new connection has been received.
// `socket` is a future that will be triggered once the upgrade to secio, multiplex
// and echo is complete.
let client_addr = client_addr.to_string();
println!("Incoming connection from {}", client_addr);
socket
.and_then(move |socket| {
println!("Successfully negotiated protocol with {}", client_addr);
// We loop forever in order to handle all the messages sent by the client.
loop_fn(socket, move |socket| {
let client_addr = client_addr.clone();
socket.into_future()
.map_err(|(err, _)| err)
.and_then(move |(msg, rest)| {
if let Some(msg) = msg {
// One message has been received. We send it back to the client.
println!("Received a message from {}: {:?}\n => Sending back \
identical message to remote", client_addr, msg);
Box::new(rest.send(msg).map(|m| Loop::Continue(m)))
as Box<Future<Item = _, Error = _>>
} else {
// End of stream. Connection closed. Breaking the loop.
println!("Received EOF from {}\n => Dropping connection",
client_addr);
Box::new(Ok(Loop::Break(())).into_future())
as Box<Future<Item = _, Error = _>>
}
})
})
})
// We absorb errors from the future so that an error while processing a client
// (eg. if the client unexpectedly disconnects) doesn't propagate and stop the
// entire server.
.then(move |res| {
if let Err(err) = res {
println!("Error while processing client: {:?}", err);
}
Ok(())
})
});
// `future` is a future that contains all the behaviour that we want, but nothing has actually
// started yet. Because we created the `TcpConfig` with tokio, we need to run the future
// through the tokio core.
core.run(future).unwrap();
// `swarm_future` is a future that contains all the behaviour that we want, but nothing has
// actually started yet. Because we created the `TcpConfig` with tokio, we need to run the
// future through the tokio core.
core.run(swarm_future).unwrap();
}

View File

@ -1,8 +1,9 @@
# libp2p-swarm
Transport and protocol upgrade system of *libp2p*.
Transport, protocol upgrade and swarm systems of *libp2p*.
This crate contains all the core traits and mechanisms of the transport system of *libp2p*.
This crate contains all the core traits and mechanisms of the transport and swarm systems
of *libp2p*.
# The `Transport` trait
@ -27,11 +28,12 @@ multiple times in a row in order to chain as many implementations as you want.
The `MuxedTransport` trait is an extension to the `Transport` trait, and is implemented on
transports that can receive incoming connections on streams that have been opened with `dial()`.
The trait provides the `dial_and_listen()` method, which returns both a dialer and a stream of
incoming connections.
The trait provides the `next_incoming()` method, which returns a future that will resolve to
the next substream that arrives from a dialed node.
> **Note**: This trait is mainly implemented for transports that provide stream muxing
> capabilities.
> capabilities, but it can also be implemented in a dummy way by returning an empty
> iterator.
# Connection upgrades
@ -57,7 +59,7 @@ A middleware can be applied on a transport by using the `with_upgrade` method of
`Transport` trait. The return value of this method also implements the `Transport` trait, which
means that you can call `dial()` and `listen_on()` on it in order to directly obtain an
upgraded connection or a listener that will yield upgraded connections. Similarly, the
`dial_and_listen()` method will automatically apply the upgrade on both the dialer and the
`next_incoming()` method will automatically apply the upgrade on both the dialer and the
listener. An error is produced if the remote doesn't support the protocol corresponding to the
connection upgrade.
@ -100,11 +102,11 @@ implement the `AsyncRead` and `AsyncWrite` traits. This means that that the retu
transport.
However the `UpgradedNode` struct returned by `with_upgrade` still provides methods named
`dial`, `listen_on`, and `dial_and_listen`, which will yield you a `Future` or a `Stream`,
`dial`, `listen_on`, and `next_incoming`, which will yield you a `Future` or a `Stream`,
which you can use to obtain the `Output`. This `Output` can then be used in a protocol-specific
way to use the protocol.
```no_run
```rust
extern crate futures;
extern crate libp2p_ping;
extern crate libp2p_swarm;
@ -115,7 +117,6 @@ use futures::Future;
use libp2p_ping::Ping;
use libp2p_swarm::Transport;
# fn main() {
let mut core = tokio_core::reactor::Core::new().unwrap();
let ping_finished_future = libp2p_tcp_transport::TcpConfig::new(core.handle())
@ -130,7 +131,6 @@ let ping_finished_future = libp2p_tcp_transport::TcpConfig::new(core.handle())
// Runs until the ping arrives.
core.run(ping_finished_future).unwrap();
# }
```
## Grouping protocols
@ -138,3 +138,40 @@ core.run(ping_finished_future).unwrap();
You can use the `.or_upgrade()` method to group multiple upgrades together. The return value
also implements the `ConnectionUpgrade` trait and will choose one of the protocols amongst the
ones supported.
# Swarm
Once you have created an object that implements the `Transport` trait, you can put it in a
*swarm*. This is done by calling the `swarm()` freestanding function with the transport
alongside with a function or a closure that will turn the output of the upgrade (usually an
actual protocol, as explained above) into a `Future` producing `()`.
```rust
extern crate futures;
extern crate libp2p_ping;
extern crate libp2p_swarm;
extern crate libp2p_tcp_transport;
extern crate tokio_core;
use futures::Future;
use libp2p_ping::Ping;
use libp2p_swarm::Transport;
let mut core = tokio_core::reactor::Core::new().unwrap();
let transport = libp2p_tcp_transport::TcpConfig::new(core.handle())
.with_dummy_muxing()
.with_upgrade(Ping);
let (swarm_controller, swarm_future) = libp2p_swarm::swarm(transport, |(mut pinger, service), client_addr| {
pinger.ping().map_err(|_| panic!())
.select(service).map_err(|_| panic!())
.map(|_| ())
});
// The `swarm_controller` can then be used to do some operations.
swarm_controller.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap());
// Runs until everything is finished.
core.run(swarm_future).unwrap();
```

View File

@ -21,9 +21,10 @@
// TODO: use this once stable ; for now we just copy-paste the content of the README.md
//#![doc(include = "../README.md")]
//! Transport and protocol upgrade system of *libp2p*.
//! Transport, protocol upgrade and swarm systems of *libp2p*.
//!
//! This crate contains all the core traits and mechanisms of the transport system of *libp2p*.
//! This crate contains all the core traits and mechanisms of the transport and swarm systems
//! of *libp2p*.
//!
//! # The `Transport` trait
//!
@ -163,6 +164,44 @@
//! also implements the `ConnectionUpgrade` trait and will choose one of the protocols amongst the
//! ones supported.
//!
//! # Swarm
//!
//! Once you have created an object that implements the `Transport` trait, you can put it in a
//! *swarm*. This is done by calling the `swarm()` freestanding function with the transport
//! alongside with a function or a closure that will turn the output of the upgrade (usually an
//! actual protocol, as explained above) into a `Future` producing `()`.
//!
//! ```no_run
//! extern crate futures;
//! extern crate libp2p_ping;
//! extern crate libp2p_swarm;
//! extern crate libp2p_tcp_transport;
//! extern crate tokio_core;
//!
//! use futures::Future;
//! use libp2p_ping::Ping;
//! use libp2p_swarm::Transport;
//!
//! # fn main() {
//! let mut core = tokio_core::reactor::Core::new().unwrap();
//!
//! let transport = libp2p_tcp_transport::TcpConfig::new(core.handle())
//! .with_dummy_muxing()
//! .with_upgrade(Ping);
//!
//! let (swarm_controller, swarm_future) = libp2p_swarm::swarm(transport, |(mut pinger, service), client_addr| {
//! pinger.ping().map_err(|_| panic!())
//! .select(service).map_err(|_| panic!())
//! .map(|_| ())
//! });
//!
//! // The `swarm_controller` can then be used to do some operations.
//! swarm_controller.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap());
//!
//! // Runs until everything is finished.
//! core.run(swarm_future).unwrap();
//! # }
//! ```
extern crate bytes;
#[macro_use]
@ -176,11 +215,13 @@ extern crate tokio_io;
pub extern crate multiaddr;
mod connection_reuse;
pub mod swarm;
pub mod muxing;
pub mod transport;
pub use self::connection_reuse::ConnectionReuse;
pub use self::multiaddr::Multiaddr;
pub use self::muxing::StreamMuxer;
pub use self::swarm::{swarm, SwarmController, SwarmFuture};
pub use self::transport::{ConnectionUpgrade, PlainTextConfig, Transport, UpgradedNode, OrUpgrade};
pub use self::transport::{Endpoint, SimpleProtocol, MuxedTransport, UpgradeExt};

226
libp2p-swarm/src/swarm.rs Normal file
View File

@ -0,0 +1,226 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use std::io::Error as IoError;
use futures::{IntoFuture, Future, Stream, Async, Poll};
use futures::sync::mpsc;
use {ConnectionUpgrade, Multiaddr, MuxedTransport, UpgradedNode};
/// Creates a swarm.
///
/// Requires an upgraded transport, and a function or closure that will turn the upgrade into a
/// `Future` that produces a `()`.
///
/// Produces a `SwarmController` and an implementation of `Future`. The controller can be used to
/// control, and the `Future` must be driven to completion in order for things to work.
///
pub fn swarm<T, C, H, F>(upgraded: UpgradedNode<T, C>, handler: H)
-> (SwarmController<T, C>, SwarmFuture<T, C, H, F::Future>)
where T: MuxedTransport + Clone + 'static, // TODO: 'static :-/
C: ConnectionUpgrade<T::RawConn> + Clone + 'static, // TODO: 'static :-/
H: FnMut(C::Output, Multiaddr) -> F,
F: IntoFuture<Item = (), Error = IoError>,
{
let (new_dialers_tx, new_dialers_rx) = mpsc::unbounded();
let (new_listeners_tx, new_listeners_rx) = mpsc::unbounded();
let future = SwarmFuture {
upgraded: upgraded.clone(),
handler: handler,
new_listeners: new_listeners_rx,
next_incoming: upgraded.clone().next_incoming(),
listeners: Vec::new(),
listeners_upgrade: Vec::new(),
dialers: Vec::new(),
new_dialers: new_dialers_rx,
to_process: Vec::new(),
};
let controller = SwarmController {
upgraded: upgraded,
new_listeners: new_listeners_tx,
new_dialers: new_dialers_tx,
};
(controller, future)
}
/// Allows control of what the swarm is doing.
pub struct SwarmController<T, C>
where T: MuxedTransport + 'static, // TODO: 'static :-/
C: ConnectionUpgrade<T::RawConn> + 'static, // TODO: 'static :-/
{
upgraded: UpgradedNode<T, C>,
new_listeners: mpsc::UnboundedSender<Box<Stream<Item = (Box<Future<Item = C::Output, Error = IoError>>, Multiaddr), Error = IoError>>>,
new_dialers: mpsc::UnboundedSender<(Box<Future<Item = C::Output, Error = IoError>>, Multiaddr)>,
}
impl<T, C> SwarmController<T, C>
where T: MuxedTransport + Clone + 'static, // TODO: 'static :-/
C: ConnectionUpgrade<T::RawConn> + Clone + 'static, // TODO: 'static :-/
C::NamesIter: Clone, // TODO: not elegant
{
/// Asks the swarm to dial the node with the given multiaddress.
///
/// Once the connection has been open and upgraded, it will be given to the handler.
// TODO: consider returning a future so that errors can be processed?
pub fn dial(&self, multiaddr: Multiaddr) -> Result<(), Multiaddr> {
match self.upgraded.clone().dial(multiaddr.clone()) {
Ok(dial) => {
// Ignoring errors if the receiver has been closed, because in that situation
// nothing is going to be processed anyway.
let _ = self.new_dialers.unbounded_send((dial, multiaddr));
Ok(())
},
Err((_, multiaddr)) => {
Err(multiaddr)
},
}
}
/// Adds a multiaddr to listen on.
pub fn listen_on(&self, multiaddr: Multiaddr) -> Result<Multiaddr, Multiaddr> {
match self.upgraded.clone().listen_on(multiaddr) {
Ok((listener, new_addr)) => {
// Ignoring errors if the receiver has been closed, because in that situation
// nothing is going to be processed anyway.
let _ = self.new_listeners.unbounded_send(listener);
Ok(new_addr)
},
Err((_, multiaddr)) => {
Err(multiaddr)
},
}
}
}
/// Future that must be driven to completion in order for the swarm to work.
pub struct SwarmFuture<T, C, H, F>
where T: MuxedTransport + 'static, // TODO: 'static :-/
C: ConnectionUpgrade<T::RawConn> + 'static, // TODO: 'static :-/
{
upgraded: UpgradedNode<T, C>,
handler: H,
new_listeners: mpsc::UnboundedReceiver<Box<Stream<Item = (Box<Future<Item = C::Output, Error = IoError>>, Multiaddr), Error = IoError>>>,
next_incoming: Box<Future<Item = (C::Output, Multiaddr), Error = IoError>>,
listeners: Vec<Box<Stream<Item = (Box<Future<Item = C::Output, Error = IoError>>, Multiaddr), Error = IoError>>>,
listeners_upgrade: Vec<(Box<Future<Item = C::Output, Error = IoError>>, Multiaddr)>,
dialers: Vec<(Box<Future<Item = C::Output, Error = IoError>>, Multiaddr)>,
new_dialers: mpsc::UnboundedReceiver<(Box<Future<Item = C::Output, Error = IoError>>, Multiaddr)>,
to_process: Vec<F>,
}
impl<T, C, H, If, F> Future for SwarmFuture<T, C, H, F>
where T: MuxedTransport + Clone + 'static, // TODO: 'static :-/,
C: ConnectionUpgrade<T::RawConn> + Clone + 'static, // TODO: 'static :-/
H: FnMut(C::Output, Multiaddr) -> If,
If: IntoFuture<Future = F, Item = (), Error = IoError>,
F: Future<Item = (), Error = IoError>,
{
type Item = ();
type Error = IoError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let handler = &mut self.handler;
match self.next_incoming.poll() {
Ok(Async::Ready((connec, client_addr))) => {
self.next_incoming = self.upgraded.clone().next_incoming();
self.to_process.push(handler(connec, client_addr).into_future());
},
Ok(Async::NotReady) => {},
Err(err) => return Err(err),
};
match self.new_listeners.poll() {
Ok(Async::Ready(Some(new_listener))) => {
self.listeners.push(new_listener);
},
Ok(Async::Ready(None)) | Err(_) => {
// New listener sender has been closed.
},
Ok(Async::NotReady) => {},
};
match self.new_dialers.poll() {
Ok(Async::Ready(Some((new_dialer, multiaddr)))) => {
self.dialers.push((new_dialer, multiaddr));
},
Ok(Async::Ready(None)) | Err(_) => {
// New dialers sender has been closed.
},
Ok(Async::NotReady) => {},
};
for n in (0 .. self.listeners.len()).rev() {
let mut listener = self.listeners.swap_remove(n);
match listener.poll() {
Ok(Async::Ready(Some((upgrade, client_addr)))) => {
self.listeners.push(listener);
self.listeners_upgrade.push((upgrade, client_addr));
},
Ok(Async::NotReady) => {
self.listeners.push(listener);
},
Ok(Async::Ready(None)) => {},
Err(err) => return Err(err),
};
}
for n in (0 .. self.listeners_upgrade.len()).rev() {
let (mut upgrade, addr) = self.listeners_upgrade.swap_remove(n);
match upgrade.poll() {
Ok(Async::Ready(output)) => {
self.to_process.push(handler(output, addr).into_future());
},
Ok(Async::NotReady) => {
self.listeners_upgrade.push((upgrade, addr));
},
Err(err) => return Err(err),
}
}
for n in (0 .. self.dialers.len()).rev() {
let (mut dialer, addr) = self.dialers.swap_remove(n);
match dialer.poll() {
Ok(Async::Ready(output)) => {
self.to_process.push(handler(output, addr).into_future());
},
Ok(Async::NotReady) => {
self.dialers.push((dialer, addr));
},
Err(err) => return Err(err),
}
}
for n in (0 .. self.to_process.len()).rev() {
let mut to_process = self.to_process.swap_remove(n);
match to_process.poll() {
Ok(Async::Ready(())) => {},
Ok(Async::NotReady) => self.to_process.push(to_process),
Err(err) => return Err(err),
}
}
// TODO: we never return `Ok(Ready)` because there's no way to know whether
// `next_incoming()` can produce anything more in the future
Ok(Async::NotReady)
}
}

View File

@ -118,6 +118,18 @@ pub trait Transport {
upgrade: upgrade,
}
}
/// Builds a dummy implementation of `MuxedTransport` that uses this transport.
///
/// The resulting object will not actually use muxing. This means that dialing the same node
/// twice will result in two different connections instead of two substreams on the same
/// connection.
#[inline]
fn with_dummy_muxing(self) -> DummyMuxing<Self>
where Self: Sized
{
DummyMuxing { inner: self }
}
}
/// Extension trait for `Transport`. Implemented on structs that provide a `Transport` on which
@ -684,6 +696,53 @@ where
}
}
/// Dummy implementation of `MuxedTransport` that uses an inner `Transport`.
#[derive(Debug, Copy, Clone)]
pub struct DummyMuxing<T> {
inner: T,
}
impl<T> MuxedTransport for DummyMuxing<T>
where T: Transport
{
type Incoming = future::Empty<(T::RawConn, Multiaddr), IoError>;
fn next_incoming(self) -> Self::Incoming
where Self: Sized
{
future::empty()
}
}
impl<T> Transport for DummyMuxing<T>
where T: Transport
{
type RawConn = T::RawConn;
type Listener = T::Listener;
type ListenerUpgrade = T::ListenerUpgrade;
type Dial = T::Dial;
#[inline]
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)>
where
Self: Sized
{
self.inner.listen_on(addr).map_err(|(inner, addr)| {
(DummyMuxing { inner }, addr)
})
}
#[inline]
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)>
where
Self: Sized
{
self.inner.dial(addr).map_err(|(inner, addr)| {
(DummyMuxing { inner }, addr)
})
}
}
/// Implements the `Transport` trait. Dials or listens, then upgrades any dialed or received
/// connection.
///