2018-01-03 14:19:24 +01:00
|
|
|
// Copyright 2018 Parity Technologies (UK) Ltd.
|
|
|
|
//
|
|
|
|
// Permission is hereby granted, free of charge, to any person obtaining a
|
|
|
|
// copy of this software and associated documentation files (the "Software"),
|
|
|
|
// to deal in the Software without restriction, including without limitation
|
|
|
|
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
|
|
|
// and/or sell copies of the Software, and to permit persons to whom the
|
|
|
|
// Software is furnished to do so, subject to the following conditions:
|
|
|
|
//
|
|
|
|
// The above copyright notice and this permission notice shall be included in
|
|
|
|
// all copies or substantial portions of the Software.
|
|
|
|
//
|
|
|
|
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
|
|
|
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
|
|
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
|
|
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
|
|
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
|
|
|
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
|
|
|
// DEALINGS IN THE SOFTWARE.
|
|
|
|
|
2018-04-16 18:55:16 +09:00
|
|
|
use futures::stream::{FuturesUnordered, StreamFuture};
|
2018-01-03 14:19:24 +01:00
|
|
|
use futures::sync::mpsc;
|
2018-05-14 15:55:16 +02:00
|
|
|
use futures::{future, Async, Future, IntoFuture, Poll, Stream};
|
|
|
|
use std::fmt;
|
|
|
|
use std::io::Error as IoError;
|
|
|
|
use {Multiaddr, MuxedTransport, Transport};
|
2018-01-03 14:19:24 +01:00
|
|
|
|
|
|
|
/// Creates a swarm.
|
|
|
|
///
|
|
|
|
/// Requires an upgraded transport, and a function or closure that will turn the upgrade into a
|
|
|
|
/// `Future` that produces a `()`.
|
|
|
|
///
|
|
|
|
/// Produces a `SwarmController` and an implementation of `Future`. The controller can be used to
|
|
|
|
/// control, and the `Future` must be driven to completion in order for things to work.
|
|
|
|
///
|
2018-05-14 15:55:16 +02:00
|
|
|
pub fn swarm<T, H, F>(
|
2018-03-07 16:20:55 +01:00
|
|
|
transport: T,
|
|
|
|
handler: H,
|
2018-05-14 15:55:16 +02:00
|
|
|
) -> (SwarmController<T>, SwarmFuture<T, H, F::Future>)
|
2018-03-07 16:20:55 +01:00
|
|
|
where
|
|
|
|
T: MuxedTransport + Clone + 'static, // TODO: 'static :-/
|
2018-05-14 15:55:16 +02:00
|
|
|
H: FnMut(T::Output, Multiaddr) -> F,
|
2018-03-07 16:20:55 +01:00
|
|
|
F: IntoFuture<Item = (), Error = IoError>,
|
2018-01-03 14:19:24 +01:00
|
|
|
{
|
|
|
|
let (new_dialers_tx, new_dialers_rx) = mpsc::unbounded();
|
|
|
|
let (new_listeners_tx, new_listeners_rx) = mpsc::unbounded();
|
2018-01-03 15:46:45 +01:00
|
|
|
let (new_toprocess_tx, new_toprocess_rx) = mpsc::unbounded();
|
|
|
|
|
2018-01-03 14:19:24 +01:00
|
|
|
let future = SwarmFuture {
|
2018-05-14 15:55:16 +02:00
|
|
|
transport: transport.clone(),
|
2018-01-03 14:19:24 +01:00
|
|
|
handler: handler,
|
|
|
|
new_listeners: new_listeners_rx,
|
2018-05-14 15:55:16 +02:00
|
|
|
next_incoming: transport.clone().next_incoming(),
|
2018-04-16 18:55:16 +09:00
|
|
|
listeners: FuturesUnordered::new(),
|
|
|
|
listeners_upgrade: FuturesUnordered::new(),
|
|
|
|
dialers: FuturesUnordered::new(),
|
2018-01-03 14:19:24 +01:00
|
|
|
new_dialers: new_dialers_rx,
|
2018-04-16 18:55:16 +09:00
|
|
|
to_process: FuturesUnordered::new(),
|
2018-01-03 15:46:45 +01:00
|
|
|
new_toprocess: new_toprocess_rx,
|
2018-01-03 14:19:24 +01:00
|
|
|
};
|
|
|
|
|
|
|
|
let controller = SwarmController {
|
2018-01-03 15:46:45 +01:00
|
|
|
transport: transport,
|
2018-01-03 14:19:24 +01:00
|
|
|
new_listeners: new_listeners_tx,
|
|
|
|
new_dialers: new_dialers_tx,
|
2018-01-03 15:46:45 +01:00
|
|
|
new_toprocess: new_toprocess_tx,
|
2018-01-03 14:19:24 +01:00
|
|
|
};
|
|
|
|
|
|
|
|
(controller, future)
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Allows control of what the swarm is doing.
|
2018-05-14 15:55:16 +02:00
|
|
|
pub struct SwarmController<T>
|
2018-03-07 16:20:55 +01:00
|
|
|
where
|
2018-05-14 15:55:16 +02:00
|
|
|
T: MuxedTransport + 'static, // TODO: 'static :-/
|
2018-01-03 14:19:24 +01:00
|
|
|
{
|
2018-01-03 15:46:45 +01:00
|
|
|
transport: T,
|
2018-05-14 15:55:16 +02:00
|
|
|
new_listeners: mpsc::UnboundedSender<T::Listener>,
|
|
|
|
new_dialers: mpsc::UnboundedSender<Box<Future<Item = (T::Output, Multiaddr), Error = IoError>>>,
|
2018-01-03 15:46:45 +01:00
|
|
|
new_toprocess: mpsc::UnboundedSender<Box<Future<Item = (), Error = IoError>>>,
|
2018-01-03 14:19:24 +01:00
|
|
|
}
|
|
|
|
|
2018-05-14 15:55:16 +02:00
|
|
|
impl<T> fmt::Debug for SwarmController<T>
|
2018-03-15 15:18:21 +01:00
|
|
|
where
|
|
|
|
T: fmt::Debug + MuxedTransport + 'static, // TODO: 'static :-/
|
|
|
|
{
|
|
|
|
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
|
|
|
|
fmt.debug_tuple("SwarmController")
|
2018-05-14 15:55:16 +02:00
|
|
|
.field(&self.transport)
|
2018-03-15 15:18:21 +01:00
|
|
|
.finish()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-05-14 15:55:16 +02:00
|
|
|
impl<T> Clone for SwarmController<T>
|
2018-03-15 11:58:11 +01:00
|
|
|
where
|
|
|
|
T: MuxedTransport + Clone + 'static, // TODO: 'static :-/
|
|
|
|
{
|
2018-05-14 15:55:16 +02:00
|
|
|
fn clone(&self) -> SwarmController<T> {
|
2018-03-15 11:58:11 +01:00
|
|
|
SwarmController {
|
|
|
|
transport: self.transport.clone(),
|
|
|
|
new_listeners: self.new_listeners.clone(),
|
|
|
|
new_dialers: self.new_dialers.clone(),
|
|
|
|
new_toprocess: self.new_toprocess.clone(),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-05-14 15:55:16 +02:00
|
|
|
impl<T> SwarmController<T>
|
2018-03-07 16:20:55 +01:00
|
|
|
where
|
|
|
|
T: MuxedTransport + Clone + 'static, // TODO: 'static :-/
|
2018-01-03 14:19:24 +01:00
|
|
|
{
|
2018-01-03 15:46:45 +01:00
|
|
|
/// Asks the swarm to dial the node with the given multiaddress. The connection is then
|
|
|
|
/// upgraded using the `upgrade`, and the output is sent to the handler that was passed when
|
|
|
|
/// calling `swarm`.
|
2018-01-03 14:19:24 +01:00
|
|
|
// TODO: consider returning a future so that errors can be processed?
|
2018-05-14 15:55:16 +02:00
|
|
|
pub fn dial_to_handler<Du>(&self, multiaddr: Multiaddr, transport: Du) -> Result<(), Multiaddr>
|
2018-03-07 16:20:55 +01:00
|
|
|
where
|
2018-05-14 15:55:16 +02:00
|
|
|
Du: Transport + 'static, // TODO: 'static :-/
|
|
|
|
Du::Output: Into<T::Output>,
|
2018-01-03 15:46:45 +01:00
|
|
|
{
|
2018-05-17 15:14:13 +02:00
|
|
|
trace!("Swarm dialing {}", multiaddr);
|
2018-03-16 16:39:02 +01:00
|
|
|
|
2018-05-14 15:55:16 +02:00
|
|
|
match transport.dial(multiaddr.clone()) {
|
2018-01-03 14:19:24 +01:00
|
|
|
Ok(dial) => {
|
2018-05-14 15:55:16 +02:00
|
|
|
let dial = Box::new(
|
2018-05-17 13:09:22 +02:00
|
|
|
dial.map(|(d, client_addr)| (d.into(), client_addr)),
|
2018-05-14 15:55:16 +02:00
|
|
|
) as Box<Future<Item = _, Error = _>>;
|
2018-01-03 14:19:24 +01:00
|
|
|
// Ignoring errors if the receiver has been closed, because in that situation
|
|
|
|
// nothing is going to be processed anyway.
|
2018-03-07 10:49:11 +01:00
|
|
|
let _ = self.new_dialers.unbounded_send(dial);
|
2018-01-03 14:19:24 +01:00
|
|
|
Ok(())
|
2018-03-07 16:20:55 +01:00
|
|
|
}
|
|
|
|
Err((_, multiaddr)) => Err(multiaddr),
|
2018-01-03 14:19:24 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-01-03 15:46:45 +01:00
|
|
|
/// Asks the swarm to dial the node with the given multiaddress. The connection is then
|
|
|
|
/// upgraded using the `upgrade`, and the output is then passed to `and_then`.
|
|
|
|
///
|
|
|
|
/// Contrary to `dial_to_handler`, the output of the upgrade is not given to the handler that
|
|
|
|
/// was passed at initialization.
|
|
|
|
// TODO: consider returning a future so that errors can be processed?
|
2018-03-07 16:20:55 +01:00
|
|
|
pub fn dial_custom_handler<Du, Df, Dfu>(
|
|
|
|
&self,
|
|
|
|
multiaddr: Multiaddr,
|
2018-05-14 15:55:16 +02:00
|
|
|
transport: Du,
|
2018-03-07 16:20:55 +01:00
|
|
|
and_then: Df,
|
|
|
|
) -> Result<(), Multiaddr>
|
|
|
|
where
|
2018-05-14 15:55:16 +02:00
|
|
|
Du: Transport + 'static, // TODO: 'static :-/
|
|
|
|
Df: FnOnce(Du::Output, Multiaddr) -> Dfu + 'static, // TODO: 'static :-/
|
2018-03-07 16:20:55 +01:00
|
|
|
Dfu: IntoFuture<Item = (), Error = IoError> + 'static, // TODO: 'static :-/
|
2018-01-03 15:46:45 +01:00
|
|
|
{
|
2018-05-17 15:14:13 +02:00
|
|
|
trace!("Swarm dialing {} with custom handler", multiaddr);
|
2018-03-16 16:39:02 +01:00
|
|
|
|
2018-05-14 15:55:16 +02:00
|
|
|
match transport.dial(multiaddr) {
|
2018-01-03 15:46:45 +01:00
|
|
|
Ok(dial) => {
|
2018-05-17 13:09:22 +02:00
|
|
|
let dial = Box::new(dial.and_then(|(d, m)| and_then(d, m))) as Box<_>;
|
2018-01-03 15:46:45 +01:00
|
|
|
// Ignoring errors if the receiver has been closed, because in that situation
|
|
|
|
// nothing is going to be processed anyway.
|
|
|
|
let _ = self.new_toprocess.unbounded_send(dial);
|
|
|
|
Ok(())
|
2018-03-07 16:20:55 +01:00
|
|
|
}
|
|
|
|
Err((_, multiaddr)) => Err(multiaddr),
|
2018-01-03 15:46:45 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Adds a multiaddr to listen on. All the incoming connections will use the `upgrade` that
|
|
|
|
/// was passed to `swarm`.
|
2018-01-03 14:19:24 +01:00
|
|
|
pub fn listen_on(&self, multiaddr: Multiaddr) -> Result<Multiaddr, Multiaddr> {
|
2018-05-14 15:55:16 +02:00
|
|
|
match self.transport.clone().listen_on(multiaddr) {
|
2018-01-03 14:19:24 +01:00
|
|
|
Ok((listener, new_addr)) => {
|
2018-05-17 15:14:13 +02:00
|
|
|
trace!("Swarm listening on {}", new_addr);
|
2018-01-03 14:19:24 +01:00
|
|
|
// Ignoring errors if the receiver has been closed, because in that situation
|
|
|
|
// nothing is going to be processed anyway.
|
|
|
|
let _ = self.new_listeners.unbounded_send(listener);
|
|
|
|
Ok(new_addr)
|
2018-03-07 16:20:55 +01:00
|
|
|
}
|
|
|
|
Err((_, multiaddr)) => Err(multiaddr),
|
2018-01-03 14:19:24 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Future that must be driven to completion in order for the swarm to work.
|
2018-05-14 15:55:16 +02:00
|
|
|
pub struct SwarmFuture<T, H, F>
|
2018-03-07 16:20:55 +01:00
|
|
|
where
|
2018-05-14 15:55:16 +02:00
|
|
|
T: MuxedTransport + 'static, // TODO: 'static :-/
|
2018-01-03 14:19:24 +01:00
|
|
|
{
|
2018-05-14 15:55:16 +02:00
|
|
|
transport: T,
|
2018-01-03 14:19:24 +01:00
|
|
|
handler: H,
|
2018-05-14 15:55:16 +02:00
|
|
|
new_listeners: mpsc::UnboundedReceiver<T::Listener>,
|
|
|
|
next_incoming: T::Incoming,
|
2018-04-16 18:55:16 +09:00
|
|
|
listeners: FuturesUnordered<
|
|
|
|
StreamFuture<
|
|
|
|
Box<
|
|
|
|
Stream<
|
2018-05-14 15:55:16 +02:00
|
|
|
Item = Box<Future<Item = (T::Output, Multiaddr), Error = IoError>>,
|
2018-04-16 18:55:16 +09:00
|
|
|
Error = IoError,
|
|
|
|
>,
|
2018-03-07 16:20:55 +01:00
|
|
|
>,
|
|
|
|
>,
|
|
|
|
>,
|
2018-04-16 18:55:16 +09:00
|
|
|
listeners_upgrade:
|
2018-05-14 15:55:16 +02:00
|
|
|
FuturesUnordered<Box<Future<Item = (T::Output, Multiaddr), Error = IoError>>>,
|
|
|
|
dialers: FuturesUnordered<Box<Future<Item = (T::Output, Multiaddr), Error = IoError>>>,
|
2018-03-07 16:20:55 +01:00
|
|
|
new_dialers:
|
2018-05-14 15:55:16 +02:00
|
|
|
mpsc::UnboundedReceiver<Box<Future<Item = (T::Output, Multiaddr), Error = IoError>>>,
|
2018-04-16 18:55:16 +09:00
|
|
|
to_process: FuturesUnordered<future::Either<F, Box<Future<Item = (), Error = IoError>>>>,
|
2018-01-03 15:46:45 +01:00
|
|
|
new_toprocess: mpsc::UnboundedReceiver<Box<Future<Item = (), Error = IoError>>>,
|
2018-01-03 14:19:24 +01:00
|
|
|
}
|
|
|
|
|
2018-05-14 15:55:16 +02:00
|
|
|
impl<T, H, If, F> Future for SwarmFuture<T, H, F>
|
2018-03-07 16:20:55 +01:00
|
|
|
where
|
|
|
|
T: MuxedTransport + Clone + 'static, // TODO: 'static :-/,
|
2018-05-14 15:55:16 +02:00
|
|
|
H: FnMut(T::Output, Multiaddr) -> If,
|
2018-03-07 16:20:55 +01:00
|
|
|
If: IntoFuture<Future = F, Item = (), Error = IoError>,
|
|
|
|
F: Future<Item = (), Error = IoError>,
|
2018-01-03 14:19:24 +01:00
|
|
|
{
|
|
|
|
type Item = ();
|
|
|
|
type Error = IoError;
|
|
|
|
|
|
|
|
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
|
|
|
let handler = &mut self.handler;
|
|
|
|
|
|
|
|
match self.next_incoming.poll() {
|
2018-03-07 11:51:52 +01:00
|
|
|
Ok(Async::Ready(connec)) => {
|
2018-05-17 15:14:13 +02:00
|
|
|
debug!("Swarm received new multiplexed incoming connection");
|
2018-05-14 15:55:16 +02:00
|
|
|
self.next_incoming = self.transport.clone().next_incoming();
|
|
|
|
self.listeners_upgrade.push(Box::new(connec) as Box<_>);
|
2018-03-07 16:20:55 +01:00
|
|
|
}
|
|
|
|
Ok(Async::NotReady) => {}
|
2018-03-16 16:39:02 +01:00
|
|
|
Err(err) => {
|
2018-05-17 15:14:13 +02:00
|
|
|
debug!("Error in multiplexed incoming connection: {:?}", err);
|
2018-05-14 15:55:16 +02:00
|
|
|
self.next_incoming = self.transport.clone().next_incoming();
|
2018-03-07 16:20:55 +01:00
|
|
|
}
|
2018-01-03 14:19:24 +01:00
|
|
|
};
|
|
|
|
|
|
|
|
match self.new_listeners.poll() {
|
|
|
|
Ok(Async::Ready(Some(new_listener))) => {
|
2018-05-14 15:55:16 +02:00
|
|
|
let new_listener = Box::new(
|
|
|
|
new_listener.map(|f| Box::new(f) as Box<Future<Item = _, Error = _>>),
|
|
|
|
) as Box<Stream<Item = _, Error = _>>;
|
2018-04-16 18:55:16 +09:00
|
|
|
self.listeners.push(new_listener.into_future());
|
2018-03-07 16:20:55 +01:00
|
|
|
}
|
2018-01-03 14:19:24 +01:00
|
|
|
Ok(Async::Ready(None)) | Err(_) => {
|
|
|
|
// New listener sender has been closed.
|
2018-03-07 16:20:55 +01:00
|
|
|
}
|
|
|
|
Ok(Async::NotReady) => {}
|
2018-01-03 14:19:24 +01:00
|
|
|
};
|
|
|
|
|
|
|
|
match self.new_dialers.poll() {
|
2018-03-07 10:49:11 +01:00
|
|
|
Ok(Async::Ready(Some(new_dialer))) => {
|
|
|
|
self.dialers.push(new_dialer);
|
2018-03-07 16:20:55 +01:00
|
|
|
}
|
2018-01-03 14:19:24 +01:00
|
|
|
Ok(Async::Ready(None)) | Err(_) => {
|
|
|
|
// New dialers sender has been closed.
|
2018-03-07 16:20:55 +01:00
|
|
|
}
|
|
|
|
Ok(Async::NotReady) => {}
|
2018-01-03 14:19:24 +01:00
|
|
|
};
|
|
|
|
|
2018-01-03 15:46:45 +01:00
|
|
|
match self.new_toprocess.poll() {
|
|
|
|
Ok(Async::Ready(Some(new_toprocess))) => {
|
|
|
|
self.to_process.push(future::Either::B(new_toprocess));
|
2018-03-07 16:20:55 +01:00
|
|
|
}
|
2018-01-03 15:46:45 +01:00
|
|
|
Ok(Async::Ready(None)) | Err(_) => {
|
|
|
|
// New to-process sender has been closed.
|
2018-03-07 16:20:55 +01:00
|
|
|
}
|
|
|
|
Ok(Async::NotReady) => {}
|
2018-01-03 15:46:45 +01:00
|
|
|
};
|
|
|
|
|
2018-04-16 18:55:16 +09:00
|
|
|
match self.listeners.poll() {
|
|
|
|
Ok(Async::Ready(Some((Some(upgrade), remaining)))) => {
|
2018-05-17 15:14:13 +02:00
|
|
|
trace!("Swarm received new connection on listener socket");
|
2018-04-16 18:55:16 +09:00
|
|
|
self.listeners_upgrade.push(upgrade);
|
|
|
|
self.listeners.push(remaining.into_future());
|
|
|
|
}
|
|
|
|
Err((err, _)) => {
|
2018-05-17 15:14:13 +02:00
|
|
|
warn!("Error in listener: {:?}", err);
|
2018-04-16 18:55:16 +09:00
|
|
|
}
|
|
|
|
_ => {}
|
2018-01-03 14:19:24 +01:00
|
|
|
}
|
|
|
|
|
2018-04-16 18:55:16 +09:00
|
|
|
match self.listeners_upgrade.poll() {
|
|
|
|
Ok(Async::Ready(Some((output, client_addr)))) => {
|
|
|
|
debug!(
|
|
|
|
"Successfully upgraded incoming connection with {}",
|
|
|
|
client_addr
|
|
|
|
);
|
|
|
|
self.to_process.push(future::Either::A(
|
|
|
|
handler(output, client_addr).into_future(),
|
|
|
|
));
|
2018-01-03 14:19:24 +01:00
|
|
|
}
|
2018-04-16 18:55:16 +09:00
|
|
|
Err(err) => {
|
2018-05-17 15:14:13 +02:00
|
|
|
warn!("Error in listener upgrade: {:?}", err);
|
2018-04-16 18:55:16 +09:00
|
|
|
}
|
|
|
|
_ => {}
|
2018-01-03 14:19:24 +01:00
|
|
|
}
|
|
|
|
|
2018-04-16 18:55:16 +09:00
|
|
|
match self.dialers.poll() {
|
|
|
|
Ok(Async::Ready(Some((output, addr)))) => {
|
|
|
|
trace!("Successfully upgraded dialed connection with {}", addr);
|
|
|
|
self.to_process
|
|
|
|
.push(future::Either::A(handler(output, addr).into_future()));
|
|
|
|
}
|
|
|
|
Err(err) => {
|
2018-05-17 15:14:13 +02:00
|
|
|
warn!("Error in dialer upgrade: {:?}", err);
|
2018-01-03 14:19:24 +01:00
|
|
|
}
|
2018-04-16 18:55:16 +09:00
|
|
|
_ => {}
|
2018-01-03 14:19:24 +01:00
|
|
|
}
|
|
|
|
|
2018-04-16 18:55:16 +09:00
|
|
|
match self.to_process.poll() {
|
|
|
|
Ok(Async::Ready(Some(()))) => {
|
2018-05-17 15:14:13 +02:00
|
|
|
trace!("Future returned by swarm handler driven to completion");
|
2018-04-16 18:55:16 +09:00
|
|
|
}
|
|
|
|
Err(err) => {
|
2018-05-17 15:14:13 +02:00
|
|
|
warn!("Error in processing: {:?}", err);
|
2018-01-03 14:19:24 +01:00
|
|
|
}
|
2018-04-16 18:55:16 +09:00
|
|
|
_ => {}
|
2018-01-03 14:19:24 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
// TODO: we never return `Ok(Ready)` because there's no way to know whether
|
|
|
|
// `next_incoming()` can produce anything more in the future
|
|
|
|
Ok(Async::NotReady)
|
|
|
|
}
|
|
|
|
}
|