rust-libp2p/core/src/swarm.rs

364 lines
14 KiB
Rust
Raw Normal View History

2018-01-03 14:19:24 +01:00
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use futures::stream::{FuturesUnordered, StreamFuture};
use futures::sync::{mpsc, oneshot};
use futures::{future, Async, Future, IntoFuture, Poll, Stream};
use std::fmt;
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use {Multiaddr, MuxedTransport, Transport};
2018-01-03 14:19:24 +01:00
/// Creates a swarm.
///
/// Requires an upgraded transport, and a function or closure that will turn the upgrade into a
/// `Future` that produces a `()`.
///
/// Produces a `SwarmController` and an implementation of `Future`. The controller can be used to
/// control, and the `Future` must be driven to completion in order for things to work.
///
pub fn swarm<T, H, F>(
transport: T,
handler: H,
) -> (SwarmController<T>, SwarmFuture<T, H, F::Future>)
where
T: MuxedTransport + Clone + 'static, // TODO: 'static :-/
H: FnMut(T::Output, Box<Future<Item = Multiaddr, Error = IoError>>) -> F,
F: IntoFuture<Item = (), Error = IoError>,
2018-01-03 14:19:24 +01:00
{
let (new_dialers_tx, new_dialers_rx) = mpsc::unbounded();
let (new_listeners_tx, new_listeners_rx) = mpsc::unbounded();
let (new_toprocess_tx, new_toprocess_rx) = mpsc::unbounded();
2018-01-03 14:19:24 +01:00
let future = SwarmFuture {
transport: transport.clone(),
2018-01-03 14:19:24 +01:00
handler: handler,
new_listeners: new_listeners_rx,
next_incoming: transport.clone().next_incoming(),
listeners: FuturesUnordered::new(),
listeners_upgrade: FuturesUnordered::new(),
dialers: FuturesUnordered::new(),
2018-01-03 14:19:24 +01:00
new_dialers: new_dialers_rx,
to_process: FuturesUnordered::new(),
new_toprocess: new_toprocess_rx,
2018-01-03 14:19:24 +01:00
};
let controller = SwarmController {
transport: transport,
2018-01-03 14:19:24 +01:00
new_listeners: new_listeners_tx,
new_dialers: new_dialers_tx,
new_toprocess: new_toprocess_tx,
2018-01-03 14:19:24 +01:00
};
(controller, future)
}
/// Allows control of what the swarm is doing.
pub struct SwarmController<T>
where
T: MuxedTransport + 'static, // TODO: 'static :-/
2018-01-03 14:19:24 +01:00
{
transport: T,
new_listeners: mpsc::UnboundedSender<T::Listener>,
new_dialers: mpsc::UnboundedSender<Box<Future<Item = (T::Output, oneshot::Sender<Result<(), IoError>>, Box<Future<Item = Multiaddr, Error = IoError>>), Error = ()>>>,
new_toprocess: mpsc::UnboundedSender<Box<Future<Item = (), Error = IoError>>>,
2018-01-03 14:19:24 +01:00
}
impl<T> fmt::Debug for SwarmController<T>
Implement Kademlia peer discovery (#120) * Impl Clone for SwarmController and remove 'static * Implement Kademlia * Implement ConnectionReuse correctly * Implement ConnectionReuse correctly * Add some tests and fixes * Remove useless boolean in active_connections * Correctly run tests * Optimize the processing * Rustfmt on libp2p-kad * Improve Kademlia example * Next incoming is now in two steps * Some work * Remove log * Fix dialing a node even if we already have a connection * Add a proper PeerId to Peerstore * Turn identify into a transport layer * Expose the dialed multiaddress * Add identified nodes to the peerstore * Allow configuring the TTL of the addresses * Split identify in two modules * Some comments and tweaks * Run rustfmt * More work * Add test and bugfix * Fix everything * Start transition to new identify system * More work * Minor style * Start implementation of Kademlia server upgrade * Continue implementing the Kademlia server * Start reimplementing high-level kademlia code * Continue reimplementing high-level code * More work * More work * More work * Fix wrong address reported when dialing * Make it work * Remove cluster_level field everywhere * Fix bug in varint-rs when encoding * More work * More work * More work * More work * More work * Bugfix * More work * Implement ping * Style in kademlia_handler * More work * Better error handling in query.rs * More work * More work * More work * Debug impls * Some cleanup in swarm * More work * Clean up changes in swarm * Unpublish the kbucket module * Fix examples and some warnings * Fix websocket browser code * Rustfmt on libp2p-kad * Kad initialization process * Add logging to the example * Fix concerns * Fix style
2018-03-15 15:18:21 +01:00
where
T: fmt::Debug + MuxedTransport + 'static, // TODO: 'static :-/
{
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_tuple("SwarmController")
.field(&self.transport)
Implement Kademlia peer discovery (#120) * Impl Clone for SwarmController and remove 'static * Implement Kademlia * Implement ConnectionReuse correctly * Implement ConnectionReuse correctly * Add some tests and fixes * Remove useless boolean in active_connections * Correctly run tests * Optimize the processing * Rustfmt on libp2p-kad * Improve Kademlia example * Next incoming is now in two steps * Some work * Remove log * Fix dialing a node even if we already have a connection * Add a proper PeerId to Peerstore * Turn identify into a transport layer * Expose the dialed multiaddress * Add identified nodes to the peerstore * Allow configuring the TTL of the addresses * Split identify in two modules * Some comments and tweaks * Run rustfmt * More work * Add test and bugfix * Fix everything * Start transition to new identify system * More work * Minor style * Start implementation of Kademlia server upgrade * Continue implementing the Kademlia server * Start reimplementing high-level kademlia code * Continue reimplementing high-level code * More work * More work * More work * Fix wrong address reported when dialing * Make it work * Remove cluster_level field everywhere * Fix bug in varint-rs when encoding * More work * More work * More work * More work * More work * Bugfix * More work * Implement ping * Style in kademlia_handler * More work * Better error handling in query.rs * More work * More work * More work * Debug impls * Some cleanup in swarm * More work * Clean up changes in swarm * Unpublish the kbucket module * Fix examples and some warnings * Fix websocket browser code * Rustfmt on libp2p-kad * Kad initialization process * Add logging to the example * Fix concerns * Fix style
2018-03-15 15:18:21 +01:00
.finish()
}
}
impl<T> Clone for SwarmController<T>
where
T: MuxedTransport + Clone + 'static, // TODO: 'static :-/
{
fn clone(&self) -> SwarmController<T> {
SwarmController {
transport: self.transport.clone(),
new_listeners: self.new_listeners.clone(),
new_dialers: self.new_dialers.clone(),
new_toprocess: self.new_toprocess.clone(),
}
}
}
impl<T> SwarmController<T>
where
T: MuxedTransport + Clone + 'static, // TODO: 'static :-/
2018-01-03 14:19:24 +01:00
{
/// Asks the swarm to dial the node with the given multiaddress. The connection is then
/// upgraded using the `upgrade`, and the output is sent to the handler that was passed when
/// calling `swarm`.
///
/// Returns a future that is signalled once the closure in the `swarm` has returned its future.
/// Therefore if the closure in the swarm has some side effect (eg. write something in a
/// variable), this side effect will be observable when this future succeeds.
pub fn dial<Du>(&self, multiaddr: Multiaddr, transport: Du)
-> Result<impl Future<Item = (), Error = IoError>, Multiaddr>
where
Du: Transport + 'static, // TODO: 'static :-/
Du::Output: Into<T::Output>,
{
trace!("Swarm dialing {}", multiaddr);
match transport.dial(multiaddr.clone()) {
2018-01-03 14:19:24 +01:00
Ok(dial) => {
let (tx, rx) = oneshot::channel();
let dial = dial.then(|result| {
match result {
Ok((output, client_addr)) => {
let client_addr = Box::new(client_addr) as Box<Future<Item = _, Error = _>>;
Ok((output.into(), tx, client_addr))
}
Err(err) => {
debug!("Error in dialer upgrade: {:?}", err);
let _ = tx.send(Err(err));
Err(())
}
}
});
2018-01-03 14:19:24 +01:00
// Ignoring errors if the receiver has been closed, because in that situation
// nothing is going to be processed anyway.
let _ = self.new_dialers.unbounded_send(Box::new(dial) as Box<_>);
Ok(rx.then(|result| {
match result {
Ok(Ok(())) => Ok(()),
Ok(Err(err)) => Err(err),
Err(_) => Err(IoError::new(IoErrorKind::ConnectionAborted,
"dial cancelled the swarm future has been destroyed")),
}
}))
}
Err((_, multiaddr)) => Err(multiaddr),
2018-01-03 14:19:24 +01:00
}
}
/// Adds a multiaddr to listen on. All the incoming connections will use the `upgrade` that
/// was passed to `swarm`.
2018-01-03 14:19:24 +01:00
pub fn listen_on(&self, multiaddr: Multiaddr) -> Result<Multiaddr, Multiaddr> {
match self.transport.clone().listen_on(multiaddr) {
2018-01-03 14:19:24 +01:00
Ok((listener, new_addr)) => {
trace!("Swarm listening on {}", new_addr);
2018-01-03 14:19:24 +01:00
// Ignoring errors if the receiver has been closed, because in that situation
// nothing is going to be processed anyway.
let _ = self.new_listeners.unbounded_send(listener);
Ok(new_addr)
}
Err((_, multiaddr)) => Err(multiaddr),
2018-01-03 14:19:24 +01:00
}
}
}
/// Future that must be driven to completion in order for the swarm to work.
pub struct SwarmFuture<T, H, F>
where
T: MuxedTransport + 'static, // TODO: 'static :-/
2018-01-03 14:19:24 +01:00
{
transport: T,
2018-01-03 14:19:24 +01:00
handler: H,
new_listeners: mpsc::UnboundedReceiver<T::Listener>,
next_incoming: T::Incoming,
listeners: FuturesUnordered<
StreamFuture<
Box<
Stream<
Item = Box<Future<Item = (T::Output, Box<Future<Item = Multiaddr, Error = IoError>>), Error = IoError>>,
Error = IoError,
>,
>,
>,
>,
listeners_upgrade:
FuturesUnordered<Box<Future<Item = (T::Output, Box<Future<Item = Multiaddr, Error = IoError>>), Error = IoError>>>,
dialers: FuturesUnordered<Box<Future<Item = (T::Output, oneshot::Sender<Result<(), IoError>>, Box<Future<Item = Multiaddr, Error = IoError>>), Error = ()>>>,
new_dialers:
mpsc::UnboundedReceiver<Box<Future<Item = (T::Output, oneshot::Sender<Result<(), IoError>>, Box<Future<Item = Multiaddr, Error = IoError>>), Error = ()>>>,
to_process: FuturesUnordered<future::Either<F, Box<Future<Item = (), Error = IoError>>>>,
new_toprocess: mpsc::UnboundedReceiver<Box<Future<Item = (), Error = IoError>>>,
2018-01-03 14:19:24 +01:00
}
impl<T, H, If, F> Future for SwarmFuture<T, H, F>
where
T: MuxedTransport + Clone + 'static, // TODO: 'static :-/,
H: FnMut(T::Output, Box<Future<Item = Multiaddr, Error = IoError>>) -> If,
If: IntoFuture<Future = F, Item = (), Error = IoError>,
F: Future<Item = (), Error = IoError>,
2018-01-03 14:19:24 +01:00
{
type Item = ();
type Error = IoError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let handler = &mut self.handler;
loop {
match self.next_incoming.poll() {
Ok(Async::Ready(connec)) => {
debug!("Swarm received new multiplexed incoming connection");
self.next_incoming = self.transport.clone().next_incoming();
let connec = connec.map(|(out, maf)| {
(out, Box::new(maf) as Box<Future<Item = Multiaddr, Error = IoError>>)
});
self.listeners_upgrade.push(Box::new(connec) as Box<_>);
}
Ok(Async::NotReady) => break,
Err(err) => {
debug!("Error in multiplexed incoming connection: {:?}", err);
self.next_incoming = self.transport.clone().next_incoming();
break;
}
}
2018-06-22 11:02:47 +02:00
}
2018-01-03 14:19:24 +01:00
2018-06-22 11:02:47 +02:00
loop {
match self.new_listeners.poll() {
Ok(Async::Ready(Some(new_listener))) => {
let new_listener = Box::new(
new_listener.map(|f| {
let f = f.map(|(out, maf)| {
(out, Box::new(maf) as Box<Future<Item = Multiaddr, Error = IoError>>)
});
2018-06-22 11:02:47 +02:00
Box::new(f) as Box<Future<Item = _, Error = _>>
}),
) as Box<Stream<Item = _, Error = _>>;
self.listeners.push(new_listener.into_future());
}
Ok(Async::Ready(None)) | Err(_) => {
// New listener sender has been closed.
break;
}
Ok(Async::NotReady) => break,
}
2018-06-22 11:02:47 +02:00
}
2018-01-03 14:19:24 +01:00
2018-06-22 11:02:47 +02:00
loop {
match self.new_dialers.poll() {
Ok(Async::Ready(Some(new_dialer))) => {
self.dialers.push(new_dialer);
}
Ok(Async::Ready(None)) | Err(_) => {
// New dialers sender has been closed.
break
}
Ok(Async::NotReady) => break,
}
2018-06-22 11:02:47 +02:00
}
2018-01-03 14:19:24 +01:00
2018-06-22 11:02:47 +02:00
loop {
match self.new_toprocess.poll() {
Ok(Async::Ready(Some(new_toprocess))) => {
self.to_process.push(future::Either::B(new_toprocess));
}
Ok(Async::Ready(None)) | Err(_) => {
// New to-process sender has been closed.
break
}
Ok(Async::NotReady) => break,
}
2018-06-22 11:02:47 +02:00
}
loop {
match self.listeners.poll() {
Ok(Async::Ready(Some((Some(upgrade), remaining)))) => {
trace!("Swarm received new connection on listener socket");
self.listeners_upgrade.push(upgrade);
self.listeners.push(remaining.into_future());
}
Err((err, _)) => {
2018-06-04 15:53:58 +02:00
debug!("Error in listener: {:?}", err);
break
}
_ => break
}
2018-01-03 14:19:24 +01:00
}
2018-06-22 11:02:47 +02:00
loop {
match self.listeners_upgrade.poll() {
Ok(Async::Ready(Some((output, client_addr)))) => {
debug!("Successfully upgraded incoming connection");
self.to_process.push(future::Either::A(
handler(output, client_addr).into_future(),
));
}
Err(err) => {
debug!("Error in listener upgrade: {:?}", err);
break;
}
_ => break
}
2018-01-03 14:19:24 +01:00
}
2018-06-22 11:02:47 +02:00
loop {
match self.dialers.poll() {
Ok(Async::Ready(Some((output, notifier, addr)))) => {
2018-06-22 11:02:47 +02:00
trace!("Successfully upgraded dialed connection");
self.to_process
.push(future::Either::A(handler(output, addr).into_future()));
let _ = notifier.send(Ok(()));
2018-06-22 11:02:47 +02:00
}
Err(()) => break,
_ => break,
2018-01-03 14:19:24 +01:00
}
}
2018-06-22 11:02:47 +02:00
loop {
match self.to_process.poll() {
Ok(Async::Ready(Some(()))) => {
trace!("Future returned by swarm handler driven to completion");
}
Err(err) => {
debug!("Error in processing: {:?}", err);
}
_ => break,
2018-01-03 14:19:24 +01:00
}
}
// TODO: we never return `Ok(Ready)` because there's no way to know whether
// `next_incoming()` can produce anything more in the future
Ok(Async::NotReady)
}
}
2018-07-14 13:31:22 +02:00
#[cfg(test)]
mod tests {
use futures::future;
use transport::DeniedTransport;
use swarm;
#[test]
fn transport_error_propagation_listen() {
let (swarm_ctrl, _swarm_future) = swarm(DeniedTransport, |_, _| future::empty());
assert!(swarm_ctrl.listen_on("/ip4/127.0.0.1/tcp/10000".parse().unwrap()).is_err());
}
#[test]
fn transport_error_propagation_dial() {
let (swarm_ctrl, _swarm_future) = swarm(DeniedTransport, |_, _| future::empty());
let addr = "/ip4/127.0.0.1/tcp/10000".parse().unwrap();
assert!(swarm_ctrl.dial(addr, DeniedTransport).is_err());
}
}