mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-24 15:21:33 +00:00
Add an alternative to the swarm (#472)
* Rewrite the swarm * Small improvement to Debug of ListenersStream * Fix Swarm::Replaced never being produced * Fix logic problem when reaching a node in swarm * Small comment in swarm * Add closed_multiaddr to Replaced event * Add address to NodeClosed and NodeError * Fix concerns * Remove StreamMuxer::boxed
This commit is contained in:
@ -18,7 +18,9 @@ protobuf = "2.0.2"
|
||||
quick-error = "1.2"
|
||||
rw-stream-sink = { path = "../misc/rw-stream-sink" }
|
||||
smallvec = "0.5"
|
||||
tokio-executor = "0.1.4"
|
||||
tokio-io = "0.1"
|
||||
void = "1"
|
||||
|
||||
[dev-dependencies]
|
||||
libp2p-ping = { path = "../protocols/ping" }
|
||||
|
@ -210,6 +210,20 @@ where
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn close_inbound(&self) {
|
||||
match *self {
|
||||
EitherOutput::First(ref inner) => inner.close_inbound(),
|
||||
EitherOutput::Second(ref inner) => inner.close_inbound(),
|
||||
}
|
||||
}
|
||||
|
||||
fn close_outbound(&self) {
|
||||
match *self {
|
||||
EitherOutput::First(ref inner) => inner.close_outbound(),
|
||||
EitherOutput::Second(ref inner) => inner.close_outbound(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
|
@ -219,7 +219,9 @@ extern crate protobuf;
|
||||
extern crate quick_error;
|
||||
extern crate rw_stream_sink;
|
||||
extern crate smallvec;
|
||||
extern crate tokio_executor;
|
||||
extern crate tokio_io;
|
||||
extern crate void;
|
||||
|
||||
#[cfg(test)]
|
||||
extern crate rand;
|
||||
@ -243,6 +245,7 @@ mod unique;
|
||||
|
||||
pub mod either;
|
||||
pub mod muxing;
|
||||
pub mod nodes;
|
||||
pub mod swarm;
|
||||
pub mod transport;
|
||||
pub mod upgrade;
|
||||
|
@ -18,9 +18,12 @@
|
||||
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||
// DEALINGS IN THE SOFTWARE.
|
||||
|
||||
use fnv::FnvHashMap;
|
||||
use futures::{future, prelude::*};
|
||||
use parking_lot::Mutex;
|
||||
use std::io::{Error as IoError, Read, Write};
|
||||
use std::ops::Deref;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
|
||||
/// Implemented on objects that can open and manage substreams.
|
||||
@ -46,12 +49,17 @@ pub trait StreamMuxer {
|
||||
|
||||
/// Polls the outbound substream.
|
||||
///
|
||||
/// May panic or produce an undefined result if an earlier polling returned `Ready` or `Err`.
|
||||
/// If this returns `Ok(Ready(None))`, that means that the outbound channel is closed and that
|
||||
/// opening any further outbound substream will likely produce `None` as well. The existing
|
||||
/// outbound substream attempts may however still succeed.
|
||||
///
|
||||
/// If `NotReady` is returned, then the current task will be notified once the substream
|
||||
/// is ready to be polled, similar to the API of `Future::poll()`.
|
||||
/// However, for each individual outbound substream, only the latest task that was used to
|
||||
/// call this method may be notified.
|
||||
///
|
||||
/// May panic or produce an undefined result if an earlier polling of the same substream
|
||||
/// returned `Ready` or `Err`.
|
||||
fn poll_outbound(
|
||||
&self,
|
||||
substream: &mut Self::OutboundSubstream,
|
||||
@ -104,6 +112,14 @@ pub trait StreamMuxer {
|
||||
|
||||
/// Destroys a substream.
|
||||
fn destroy_substream(&self, substream: Self::Substream);
|
||||
|
||||
/// If supported, sends a hint to the remote that we may no longer accept any further inbound
|
||||
/// substream. Calling `poll_inbound` afterwards may or may not produce `None`.
|
||||
fn close_inbound(&self);
|
||||
|
||||
/// If supported, sends a hint to the remote that we may no longer open any further outbound
|
||||
/// substream. Calling `poll_outbound` afterwards may or may not produce `None`.
|
||||
fn close_outbound(&self);
|
||||
}
|
||||
|
||||
/// Polls for an inbound from the muxer but wraps the output in an object that
|
||||
@ -301,3 +317,206 @@ where
|
||||
.destroy_substream(self.substream.take().expect("substream was empty"))
|
||||
}
|
||||
}
|
||||
|
||||
/// Abstract `StreamMuxer`.
|
||||
pub struct StreamMuxerBox {
|
||||
inner: Box<StreamMuxer<Substream = usize, OutboundSubstream = usize> + Send + Sync>,
|
||||
}
|
||||
|
||||
impl StreamMuxerBox {
|
||||
/// Turns a stream muxer into a `StreamMuxerBox`.
|
||||
pub fn new<T>(muxer: T) -> StreamMuxerBox
|
||||
where
|
||||
T: StreamMuxer + Send + Sync + 'static,
|
||||
T::OutboundSubstream: Send,
|
||||
T::Substream: Send,
|
||||
{
|
||||
let wrap = Wrap {
|
||||
inner: muxer,
|
||||
substreams: Mutex::new(Default::default()),
|
||||
next_substream: AtomicUsize::new(0),
|
||||
outbound: Mutex::new(Default::default()),
|
||||
next_outbound: AtomicUsize::new(0),
|
||||
};
|
||||
|
||||
StreamMuxerBox {
|
||||
inner: Box::new(wrap),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl StreamMuxer for StreamMuxerBox {
|
||||
type Substream = usize; // TODO: use a newtype
|
||||
type OutboundSubstream = usize; // TODO: use a newtype
|
||||
|
||||
#[inline]
|
||||
fn poll_inbound(&self) -> Poll<Option<Self::Substream>, IoError> {
|
||||
self.inner.poll_inbound()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn open_outbound(&self) -> Self::OutboundSubstream {
|
||||
self.inner.open_outbound()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn poll_outbound(
|
||||
&self,
|
||||
substream: &mut Self::OutboundSubstream,
|
||||
) -> Poll<Option<Self::Substream>, IoError> {
|
||||
self.inner.poll_outbound(substream)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn destroy_outbound(&self, substream: Self::OutboundSubstream) {
|
||||
self.inner.destroy_outbound(substream)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn read_substream(
|
||||
&self,
|
||||
substream: &mut Self::Substream,
|
||||
buf: &mut [u8],
|
||||
) -> Result<usize, IoError>
|
||||
{
|
||||
self.inner.read_substream(substream, buf)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn write_substream(
|
||||
&self,
|
||||
substream: &mut Self::Substream,
|
||||
buf: &[u8],
|
||||
) -> Result<usize, IoError> {
|
||||
self.inner.write_substream(substream, buf)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn flush_substream(&self, substream: &mut Self::Substream) -> Result<(), IoError> {
|
||||
self.inner.flush_substream(substream)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn shutdown_substream(&self, substream: &mut Self::Substream) -> Poll<(), IoError> {
|
||||
self.inner.shutdown_substream(substream)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn destroy_substream(&self, substream: Self::Substream) {
|
||||
self.inner.destroy_substream(substream)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn close_inbound(&self) {
|
||||
self.inner.close_inbound()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn close_outbound(&self) {
|
||||
self.inner.close_outbound()
|
||||
}
|
||||
}
|
||||
|
||||
struct Wrap<T> where T: StreamMuxer {
|
||||
inner: T,
|
||||
substreams: Mutex<FnvHashMap<usize, T::Substream>>,
|
||||
next_substream: AtomicUsize,
|
||||
outbound: Mutex<FnvHashMap<usize, T::OutboundSubstream>>,
|
||||
next_outbound: AtomicUsize,
|
||||
}
|
||||
|
||||
impl<T> StreamMuxer for Wrap<T> where T: StreamMuxer {
|
||||
type Substream = usize; // TODO: use a newtype
|
||||
type OutboundSubstream = usize; // TODO: use a newtype
|
||||
|
||||
#[inline]
|
||||
fn poll_inbound(&self) -> Poll<Option<Self::Substream>, IoError> {
|
||||
match try_ready!(self.inner.poll_inbound()) {
|
||||
Some(substream) => {
|
||||
let id = self.next_substream.fetch_add(1, Ordering::Relaxed);
|
||||
self.substreams.lock().insert(id, substream);
|
||||
Ok(Async::Ready(Some(id)))
|
||||
},
|
||||
None => Ok(Async::Ready(None)),
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn open_outbound(&self) -> Self::OutboundSubstream {
|
||||
let outbound = self.inner.open_outbound();
|
||||
let id = self.next_outbound.fetch_add(1, Ordering::Relaxed);
|
||||
self.outbound.lock().insert(id, outbound);
|
||||
id
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn poll_outbound(
|
||||
&self,
|
||||
substream: &mut Self::OutboundSubstream,
|
||||
) -> Poll<Option<Self::Substream>, IoError> {
|
||||
let mut list = self.outbound.lock();
|
||||
match try_ready!(self.inner.poll_outbound(list.get_mut(substream).unwrap())) {
|
||||
Some(substream) => {
|
||||
let id = self.next_substream.fetch_add(1, Ordering::Relaxed);
|
||||
self.substreams.lock().insert(id, substream);
|
||||
Ok(Async::Ready(Some(id)))
|
||||
},
|
||||
None => Ok(Async::Ready(None)),
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn destroy_outbound(&self, substream: Self::OutboundSubstream) {
|
||||
let mut list = self.outbound.lock();
|
||||
self.inner.destroy_outbound(list.remove(&substream).unwrap())
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn read_substream(
|
||||
&self,
|
||||
substream: &mut Self::Substream,
|
||||
buf: &mut [u8],
|
||||
) -> Result<usize, IoError>
|
||||
{
|
||||
let mut list = self.substreams.lock();
|
||||
self.inner.read_substream(list.get_mut(substream).unwrap(), buf)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn write_substream(
|
||||
&self,
|
||||
substream: &mut Self::Substream,
|
||||
buf: &[u8],
|
||||
) -> Result<usize, IoError> {
|
||||
let mut list = self.substreams.lock();
|
||||
self.inner.write_substream(list.get_mut(substream).unwrap(), buf)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn flush_substream(&self, substream: &mut Self::Substream) -> Result<(), IoError> {
|
||||
let mut list = self.substreams.lock();
|
||||
self.inner.flush_substream(list.get_mut(substream).unwrap())
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn shutdown_substream(&self, substream: &mut Self::Substream) -> Poll<(), IoError> {
|
||||
let mut list = self.substreams.lock();
|
||||
self.inner.shutdown_substream(list.get_mut(substream).unwrap())
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn destroy_substream(&self, substream: Self::Substream) {
|
||||
let mut list = self.substreams.lock();
|
||||
self.inner.destroy_substream(list.remove(&substream).unwrap())
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn close_inbound(&self) {
|
||||
self.inner.close_inbound()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn close_outbound(&self) {
|
||||
self.inner.close_outbound()
|
||||
}
|
||||
}
|
||||
|
700
core/src/nodes/collection.rs
Normal file
700
core/src/nodes/collection.rs
Normal file
@ -0,0 +1,700 @@
|
||||
// Copyright 2018 Parity Technologies (UK) Ltd.
|
||||
//
|
||||
// Permission is hereby granted, free of charge, to any person obtaining a
|
||||
// copy of this software and associated documentation files (the "Software"),
|
||||
// to deal in the Software without restriction, including without limitation
|
||||
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
||||
// and/or sell copies of the Software, and to permit persons to whom the
|
||||
// Software is furnished to do so, subject to the following conditions:
|
||||
//
|
||||
// The above copyright notice and this permission notice shall be included in
|
||||
// all copies or substantial portions of the Software.
|
||||
//
|
||||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||
// DEALINGS IN THE SOFTWARE.
|
||||
|
||||
use fnv::FnvHashMap;
|
||||
use futures::{prelude::*, sync::mpsc, sync::oneshot, task};
|
||||
use muxing::StreamMuxer;
|
||||
use nodes::node::{NodeEvent, NodeStream, Substream};
|
||||
use smallvec::SmallVec;
|
||||
use std::collections::hash_map::{Entry, OccupiedEntry};
|
||||
use std::io::Error as IoError;
|
||||
use tokio_executor;
|
||||
use void::Void;
|
||||
use {Multiaddr, PeerId};
|
||||
|
||||
// TODO: make generic over PeerId
|
||||
|
||||
// Implementor notes
|
||||
// =================
|
||||
//
|
||||
// This collection of nodes spawns a task for each individual node to process. This means that
|
||||
// events happen on the background at the same time as the `CollectionStream` is being polled.
|
||||
//
|
||||
// In order to make the API non-racy and avoid issues, we totally separate the state in the
|
||||
// `CollectionStream` and the states that the task nodes can access. They are only allowed to
|
||||
// exchange messages. The state in the `CollectionStream` is therefore delayed compared to the
|
||||
// tasks, and is updated only when `poll()` is called.
|
||||
//
|
||||
// The only thing that we must be careful about is substreams, as they are "detached" from the
|
||||
// state of the `CollectionStream` and allowed to process in parallel. This is why there is no
|
||||
// "substream closed" event being reported, as it could potentially create confusions and race
|
||||
// conditions in the user's code. See similar comments in the documentation of `NodeStream`.
|
||||
|
||||
/// Implementation of `Stream` that handles a collection of nodes.
|
||||
// TODO: implement Debug
|
||||
pub struct CollectionStream<TMuxer, TUserData>
|
||||
where
|
||||
TMuxer: StreamMuxer,
|
||||
{
|
||||
/// List of nodes, with a sender allowing to communicate messages.
|
||||
nodes: FnvHashMap<PeerId, (ReachAttemptId, mpsc::UnboundedSender<ExtToInMessage>)>,
|
||||
/// Known state of a task. Tasks are identified by the reach attempt ID.
|
||||
tasks: FnvHashMap<ReachAttemptId, TaskKnownState>,
|
||||
/// Identifier for the next task to spawn.
|
||||
next_task_id: ReachAttemptId,
|
||||
|
||||
/// List of node tasks to spawn.
|
||||
// TODO: stronger typing?
|
||||
to_spawn: SmallVec<[Box<Future<Item = (), Error = ()> + Send>; 8]>,
|
||||
/// Task to notify when an element is added to `to_spawn`.
|
||||
to_notify: Option<task::Task>,
|
||||
|
||||
/// Sender to emit events to the outside. Meant to be cloned and sent to tasks.
|
||||
events_tx: mpsc::UnboundedSender<(InToExtMessage<TMuxer>, ReachAttemptId)>,
|
||||
/// Receiver side for the events.
|
||||
events_rx: mpsc::UnboundedReceiver<(InToExtMessage<TMuxer>, ReachAttemptId)>,
|
||||
|
||||
/// Instead of passing directly the user data when opening an outbound substream attempt, we
|
||||
/// store it here and pass a `usize` to the node. This makes it possible to instantly close
|
||||
/// some attempts if necessary.
|
||||
// TODO: use something else than hashmap? we often need to iterate over everything, and a
|
||||
// SmallVec may be better
|
||||
outbound_attempts: FnvHashMap<usize, (PeerId, TUserData)>,
|
||||
/// Identifier for the next entry in `outbound_attempts`.
|
||||
next_outbound_attempt: usize,
|
||||
}
|
||||
|
||||
/// State of a task, as known by the frontend (the `ColletionStream`). Asynchronous compared to
|
||||
/// the actual state.
|
||||
enum TaskKnownState {
|
||||
/// Task is attempting to reach a peer.
|
||||
Pending { interrupt: oneshot::Sender<()> },
|
||||
/// The user interrupted this task.
|
||||
Interrupted,
|
||||
/// The task is connected to a peer.
|
||||
Connected(PeerId),
|
||||
}
|
||||
|
||||
/// Event that can happen on the `CollectionStream`.
|
||||
// TODO: implement Debug
|
||||
pub enum CollectionEvent<TMuxer, TUserData>
|
||||
where
|
||||
TMuxer: StreamMuxer,
|
||||
{
|
||||
/// A connection to a node has succeeded.
|
||||
NodeReached {
|
||||
/// Identifier of the node.
|
||||
peer_id: PeerId,
|
||||
/// Identifier of the reach attempt that succeeded.
|
||||
id: ReachAttemptId,
|
||||
},
|
||||
|
||||
/// A connection to a node has succeeded and replaces a former connection.
|
||||
///
|
||||
/// The opened substreams of the former node will keep working (unless the remote decides to
|
||||
/// close them).
|
||||
NodeReplaced {
|
||||
/// Identifier of the node.
|
||||
peer_id: PeerId,
|
||||
/// Outbound substream attempts that have been closed in the process.
|
||||
closed_outbound_substreams: Vec<TUserData>,
|
||||
/// Identifier of the reach attempt that succeeded.
|
||||
id: ReachAttemptId,
|
||||
},
|
||||
|
||||
/// A connection to a node has been closed.
|
||||
///
|
||||
/// This happens once both the inbound and outbound channels are closed, and no more outbound
|
||||
/// substream attempt is pending.
|
||||
NodeClosed {
|
||||
/// Identifier of the node.
|
||||
peer_id: PeerId,
|
||||
},
|
||||
|
||||
/// A connection to a node has errored.
|
||||
NodeError {
|
||||
/// Identifier of the node.
|
||||
peer_id: PeerId,
|
||||
/// The error that happened.
|
||||
error: IoError,
|
||||
/// Pending outbound substreams that were cancelled.
|
||||
closed_outbound_substreams: Vec<TUserData>,
|
||||
},
|
||||
|
||||
/// An error happened on the future that was trying to reach a node.
|
||||
ReachError {
|
||||
/// Identifier of the reach attempt that failed.
|
||||
id: ReachAttemptId,
|
||||
/// Error that happened on the future.
|
||||
error: IoError,
|
||||
},
|
||||
|
||||
/// The multiaddress of the node has been resolved.
|
||||
NodeMultiaddr {
|
||||
/// Identifier of the node.
|
||||
peer_id: PeerId,
|
||||
/// Address that has been resolved, or error that occured on the substream.
|
||||
address: Result<Multiaddr, IoError>,
|
||||
},
|
||||
|
||||
/// A new inbound substream arrived.
|
||||
InboundSubstream {
|
||||
/// Identifier of the node.
|
||||
peer_id: PeerId,
|
||||
/// The newly-opened substream.
|
||||
substream: Substream<TMuxer>,
|
||||
},
|
||||
|
||||
/// An outbound substream has successfully been opened.
|
||||
OutboundSubstream {
|
||||
/// Identifier of the node.
|
||||
peer_id: PeerId,
|
||||
/// Identifier of the substream. Same as what was returned by `open_substream`.
|
||||
user_data: TUserData,
|
||||
/// The newly-opened substream.
|
||||
substream: Substream<TMuxer>,
|
||||
},
|
||||
|
||||
/// The inbound side of a muxer has been gracefully closed. No more inbound substreams will
|
||||
/// be produced.
|
||||
InboundClosed {
|
||||
/// Identifier of the node.
|
||||
peer_id: PeerId,
|
||||
},
|
||||
|
||||
/// An outbound substream couldn't be opened because the muxer is no longer capable of opening
|
||||
/// more substreams.
|
||||
OutboundClosed {
|
||||
/// Identifier of the node.
|
||||
peer_id: PeerId,
|
||||
/// Identifier of the substream. Same as what was returned by `open_substream`.
|
||||
user_data: TUserData,
|
||||
},
|
||||
}
|
||||
|
||||
/// Identifier for a future that attempts to reach a node.
|
||||
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
|
||||
pub struct ReachAttemptId(usize);
|
||||
|
||||
impl<TMuxer, TUserData> CollectionStream<TMuxer, TUserData>
|
||||
where
|
||||
TMuxer: StreamMuxer,
|
||||
{
|
||||
/// Creates a new empty collection.
|
||||
#[inline]
|
||||
pub fn new() -> Self {
|
||||
let (events_tx, events_rx) = mpsc::unbounded();
|
||||
|
||||
CollectionStream {
|
||||
nodes: Default::default(),
|
||||
tasks: Default::default(),
|
||||
next_task_id: ReachAttemptId(0),
|
||||
to_spawn: SmallVec::new(),
|
||||
to_notify: None,
|
||||
events_tx,
|
||||
events_rx,
|
||||
outbound_attempts: Default::default(),
|
||||
next_outbound_attempt: 0,
|
||||
}
|
||||
}
|
||||
|
||||
/// Adds to the collection a future that tries to reach a remote.
|
||||
///
|
||||
/// This method spawns a task dedicated to resolving this future and processing the node's
|
||||
/// events.
|
||||
pub fn add_reach_attempt<TFut, TAddrFut>(&mut self, future: TFut) -> ReachAttemptId
|
||||
where
|
||||
TFut: Future<Item = ((PeerId, TMuxer), TAddrFut), Error = IoError> + Send + 'static,
|
||||
TMuxer: Send + Sync + 'static,
|
||||
TMuxer::OutboundSubstream: Send,
|
||||
TMuxer::Substream: Send,
|
||||
TAddrFut: Future<Item = Multiaddr, Error = IoError> + Send + 'static,
|
||||
TUserData: Send + 'static,
|
||||
{
|
||||
let reach_attempt_id = self.next_task_id;
|
||||
self.next_task_id.0 += 1;
|
||||
|
||||
let (interrupt_tx, interrupt_rx) = oneshot::channel();
|
||||
self.tasks.insert(
|
||||
reach_attempt_id,
|
||||
TaskKnownState::Pending {
|
||||
interrupt: interrupt_tx,
|
||||
},
|
||||
);
|
||||
|
||||
let task = Box::new(NodeTask {
|
||||
inner: NodeTaskInner::Future {
|
||||
future,
|
||||
interrupt: interrupt_rx,
|
||||
},
|
||||
events_tx: self.events_tx.clone(),
|
||||
id: reach_attempt_id,
|
||||
});
|
||||
|
||||
self.to_spawn.push(task);
|
||||
|
||||
if let Some(task) = self.to_notify.take() {
|
||||
task.notify();
|
||||
}
|
||||
|
||||
reach_attempt_id
|
||||
}
|
||||
|
||||
/// Interrupts a reach attempt.
|
||||
///
|
||||
/// Returns `Ok` if something was interrupted, and `Err` if the ID is not or no longer valid.
|
||||
pub fn interrupt(&mut self, id: ReachAttemptId) -> Result<(), ()> {
|
||||
match self.tasks.entry(id) {
|
||||
Entry::Vacant(_) => return Err(()),
|
||||
Entry::Occupied(mut entry) => {
|
||||
match entry.get() {
|
||||
&TaskKnownState::Connected(_) => return Err(()),
|
||||
&TaskKnownState::Interrupted => return Err(()),
|
||||
&TaskKnownState::Pending { .. } => (),
|
||||
};
|
||||
|
||||
match entry.insert(TaskKnownState::Interrupted) {
|
||||
TaskKnownState::Pending { interrupt } => {
|
||||
let _ = interrupt.send(());
|
||||
}
|
||||
TaskKnownState::Interrupted | TaskKnownState::Connected(_) => unreachable!(),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Grants access to an object that allows controlling a node of the collection.
|
||||
///
|
||||
/// Returns `None` if we don't have a connection to this peer.
|
||||
#[inline]
|
||||
pub fn peer_mut(&mut self, id: &PeerId) -> Option<PeerMut<TUserData>>
|
||||
where
|
||||
TUserData: Send + 'static,
|
||||
{
|
||||
match self.nodes.entry(id.clone()) {
|
||||
Entry::Occupied(inner) => Some(PeerMut {
|
||||
inner,
|
||||
tasks: &mut self.tasks,
|
||||
next_outbound_attempt: &mut self.next_outbound_attempt,
|
||||
outbound_attempts: &mut self.outbound_attempts,
|
||||
}),
|
||||
Entry::Vacant(_) => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns true if we are connected to the given peer.
|
||||
///
|
||||
/// This will return true only after a `NodeReached` event has been produced by `poll()`.
|
||||
#[inline]
|
||||
pub fn has_connection(&self, id: &PeerId) -> bool {
|
||||
self.nodes.contains_key(id)
|
||||
}
|
||||
|
||||
/// Returns a list of all the active connections.
|
||||
///
|
||||
/// Does not include reach attempts that haven't reached any target yet.
|
||||
#[inline]
|
||||
pub fn connections(&self) -> impl Iterator<Item = &PeerId> {
|
||||
self.nodes.keys()
|
||||
}
|
||||
}
|
||||
|
||||
/// Access to a peer in the collection.
|
||||
pub struct PeerMut<'a, TUserData>
|
||||
where
|
||||
TUserData: Send + 'static,
|
||||
{
|
||||
next_outbound_attempt: &'a mut usize,
|
||||
outbound_attempts: &'a mut FnvHashMap<usize, (PeerId, TUserData)>,
|
||||
inner: OccupiedEntry<'a, PeerId, (ReachAttemptId, mpsc::UnboundedSender<ExtToInMessage>)>,
|
||||
tasks: &'a mut FnvHashMap<ReachAttemptId, TaskKnownState>,
|
||||
}
|
||||
|
||||
impl<'a, TUserData> PeerMut<'a, TUserData>
|
||||
where
|
||||
TUserData: Send + 'static,
|
||||
{
|
||||
/// Starts the process of opening a new outbound substream towards the peer.
|
||||
pub fn open_substream(&mut self, user_data: TUserData) {
|
||||
let id = *self.next_outbound_attempt;
|
||||
*self.next_outbound_attempt += 1;
|
||||
|
||||
self.outbound_attempts
|
||||
.insert(id, (self.inner.key().clone(), user_data));
|
||||
|
||||
let _ = self
|
||||
.inner
|
||||
.get_mut()
|
||||
.1
|
||||
.unbounded_send(ExtToInMessage::OpenSubstream(id));
|
||||
}
|
||||
|
||||
/// Closes the connections to this node.
|
||||
///
|
||||
/// This cancels all the attempted outgoing substream attempts, and returns them.
|
||||
///
|
||||
/// No event will be generated for this node.
|
||||
pub fn close(self) -> Vec<TUserData> {
|
||||
let (peer_id, (task_id, _)) = self.inner.remove_entry();
|
||||
let user_datas = extract_from_attempt(self.outbound_attempts, &peer_id);
|
||||
// Set the task to `Interrupted` so that we ignore further messages from this closed node.
|
||||
match self.tasks.insert(task_id, TaskKnownState::Interrupted) {
|
||||
Some(TaskKnownState::Connected(ref p)) if p == &peer_id => (),
|
||||
None
|
||||
| Some(TaskKnownState::Connected(_))
|
||||
| Some(TaskKnownState::Pending { .. })
|
||||
| Some(TaskKnownState::Interrupted) => panic!("Inconsistent state"),
|
||||
}
|
||||
user_datas
|
||||
}
|
||||
}
|
||||
|
||||
/// Extract from the hashmap the entries matching `node`.
|
||||
fn extract_from_attempt<TUserData>(
|
||||
outbound_attempts: &mut FnvHashMap<usize, (PeerId, TUserData)>,
|
||||
node: &PeerId,
|
||||
) -> Vec<TUserData> {
|
||||
let to_remove: Vec<usize> = outbound_attempts
|
||||
.iter()
|
||||
.filter(|(_, &(ref key, _))| key == node)
|
||||
.map(|(&k, _)| k)
|
||||
.collect();
|
||||
|
||||
let mut user_datas = Vec::with_capacity(to_remove.len());
|
||||
for to_remove in to_remove {
|
||||
let (_, user_data) = outbound_attempts.remove(&to_remove).unwrap();
|
||||
user_datas.push(user_data);
|
||||
}
|
||||
user_datas
|
||||
}
|
||||
|
||||
impl<TMuxer, TUserData> Stream for CollectionStream<TMuxer, TUserData>
|
||||
where
|
||||
TMuxer: StreamMuxer,
|
||||
{
|
||||
type Item = CollectionEvent<TMuxer, TUserData>;
|
||||
type Error = Void; // TODO: use ! once stable
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||
for to_spawn in self.to_spawn.drain() {
|
||||
tokio_executor::spawn(to_spawn);
|
||||
}
|
||||
|
||||
loop {
|
||||
return match self.events_rx.poll() {
|
||||
Ok(Async::Ready(Some((InToExtMessage::NodeEvent(event), task_id)))) => {
|
||||
let peer_id = match self.tasks.get(&task_id) {
|
||||
Some(TaskKnownState::Connected(ref peer_id)) => peer_id.clone(),
|
||||
Some(TaskKnownState::Interrupted) => continue, // Ignore messages from this task.
|
||||
None | Some(TaskKnownState::Pending { .. }) => panic!("State mismatch"),
|
||||
};
|
||||
|
||||
match event {
|
||||
NodeEvent::Multiaddr(address) => {
|
||||
Ok(Async::Ready(Some(CollectionEvent::NodeMultiaddr {
|
||||
peer_id,
|
||||
address,
|
||||
})))
|
||||
}
|
||||
NodeEvent::InboundSubstream { substream } => {
|
||||
Ok(Async::Ready(Some(CollectionEvent::InboundSubstream {
|
||||
peer_id,
|
||||
substream,
|
||||
})))
|
||||
}
|
||||
NodeEvent::OutboundSubstream {
|
||||
user_data,
|
||||
substream,
|
||||
} => {
|
||||
let (_peer_id, actual_data) = self
|
||||
.outbound_attempts
|
||||
.remove(&user_data)
|
||||
.expect("State inconsistency in collection outbound user data");
|
||||
debug_assert_eq!(_peer_id, peer_id);
|
||||
Ok(Async::Ready(Some(CollectionEvent::OutboundSubstream {
|
||||
peer_id,
|
||||
user_data: actual_data,
|
||||
substream,
|
||||
})))
|
||||
}
|
||||
NodeEvent::InboundClosed => {
|
||||
Ok(Async::Ready(Some(CollectionEvent::InboundClosed {
|
||||
peer_id,
|
||||
})))
|
||||
}
|
||||
NodeEvent::OutboundClosed { user_data } => {
|
||||
let (_peer_id, actual_data) = self
|
||||
.outbound_attempts
|
||||
.remove(&user_data)
|
||||
.expect("State inconsistency in collection outbound user data");
|
||||
debug_assert_eq!(_peer_id, peer_id);
|
||||
Ok(Async::Ready(Some(CollectionEvent::OutboundClosed {
|
||||
peer_id,
|
||||
user_data: actual_data,
|
||||
})))
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(Async::Ready(Some((InToExtMessage::NodeReached(peer_id, sender), task_id)))) => {
|
||||
match self
|
||||
.tasks
|
||||
.insert(task_id, TaskKnownState::Connected(peer_id.clone()))
|
||||
{
|
||||
Some(TaskKnownState::Pending { .. }) => (),
|
||||
Some(TaskKnownState::Interrupted) => continue,
|
||||
None | Some(TaskKnownState::Connected(_)) => panic!("Inconsistent state"),
|
||||
};
|
||||
|
||||
let replaced_node = self.nodes.insert(peer_id.clone(), (task_id, sender));
|
||||
let user_datas = extract_from_attempt(&mut self.outbound_attempts, &peer_id);
|
||||
if replaced_node.is_some() {
|
||||
Ok(Async::Ready(Some(CollectionEvent::NodeReplaced {
|
||||
peer_id,
|
||||
closed_outbound_substreams: user_datas,
|
||||
id: task_id,
|
||||
})))
|
||||
} else {
|
||||
Ok(Async::Ready(Some(CollectionEvent::NodeReached {
|
||||
peer_id,
|
||||
id: task_id,
|
||||
})))
|
||||
}
|
||||
}
|
||||
Ok(Async::Ready(Some((InToExtMessage::NodeClosed, task_id)))) => {
|
||||
let peer_id = match self.tasks.remove(&task_id) {
|
||||
Some(TaskKnownState::Connected(peer_id)) => peer_id.clone(),
|
||||
Some(TaskKnownState::Interrupted) => continue, // Ignore messages from this task.
|
||||
None | Some(TaskKnownState::Pending { .. }) => panic!("State mismatch"),
|
||||
};
|
||||
|
||||
let val = self.nodes.remove(&peer_id);
|
||||
debug_assert!(val.is_some());
|
||||
debug_assert!(
|
||||
extract_from_attempt(&mut self.outbound_attempts, &peer_id).is_empty()
|
||||
);
|
||||
Ok(Async::Ready(Some(CollectionEvent::NodeClosed { peer_id })))
|
||||
}
|
||||
Ok(Async::Ready(Some((InToExtMessage::NodeError(err), task_id)))) => {
|
||||
let peer_id = match self.tasks.remove(&task_id) {
|
||||
Some(TaskKnownState::Connected(peer_id)) => peer_id.clone(),
|
||||
Some(TaskKnownState::Interrupted) => continue, // Ignore messages from this task.
|
||||
None | Some(TaskKnownState::Pending { .. }) => panic!("State mismatch"),
|
||||
};
|
||||
|
||||
let val = self.nodes.remove(&peer_id);
|
||||
debug_assert!(val.is_some());
|
||||
let user_datas = extract_from_attempt(&mut self.outbound_attempts, &peer_id);
|
||||
Ok(Async::Ready(Some(CollectionEvent::NodeError {
|
||||
peer_id,
|
||||
error: err,
|
||||
closed_outbound_substreams: user_datas,
|
||||
})))
|
||||
}
|
||||
Ok(Async::Ready(Some((InToExtMessage::ReachError(err), task_id)))) => {
|
||||
match self.tasks.remove(&task_id) {
|
||||
Some(TaskKnownState::Interrupted) => continue,
|
||||
Some(TaskKnownState::Pending { .. }) => (),
|
||||
None | Some(TaskKnownState::Connected(_)) => panic!("Inconsistent state"),
|
||||
};
|
||||
|
||||
Ok(Async::Ready(Some(CollectionEvent::ReachError {
|
||||
id: task_id,
|
||||
error: err,
|
||||
})))
|
||||
}
|
||||
Ok(Async::NotReady) => {
|
||||
self.to_notify = Some(task::current());
|
||||
Ok(Async::NotReady)
|
||||
}
|
||||
Ok(Async::Ready(None)) => unreachable!("The tx is in self as well"),
|
||||
Err(()) => unreachable!("An unbounded receiver never errors"),
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Message to transmit from the public API to a task.
|
||||
#[derive(Debug, Clone)]
|
||||
enum ExtToInMessage {
|
||||
/// A new substream shall be opened.
|
||||
OpenSubstream(usize),
|
||||
}
|
||||
|
||||
/// Message to transmit from a task to the public API.
|
||||
enum InToExtMessage<TMuxer>
|
||||
where
|
||||
TMuxer: StreamMuxer,
|
||||
{
|
||||
/// A connection to a node has succeeded.
|
||||
/// Closing the returned sender will end the task.
|
||||
NodeReached(PeerId, mpsc::UnboundedSender<ExtToInMessage>),
|
||||
NodeClosed,
|
||||
NodeError(IoError),
|
||||
ReachError(IoError),
|
||||
/// An event from the node.
|
||||
NodeEvent(NodeEvent<TMuxer, usize>),
|
||||
}
|
||||
|
||||
/// Implementation of `Future` that handles a single node, and all the communications between
|
||||
/// the various components of the `CollectionStream`.
|
||||
struct NodeTask<TFut, TMuxer, TAddrFut>
|
||||
where
|
||||
TMuxer: StreamMuxer,
|
||||
{
|
||||
/// Sender to transmit events to the outside.
|
||||
events_tx: mpsc::UnboundedSender<(InToExtMessage<TMuxer>, ReachAttemptId)>,
|
||||
/// Inner state of the `NodeTask`.
|
||||
inner: NodeTaskInner<TFut, TMuxer, TAddrFut>,
|
||||
/// Identifier of the attempt.
|
||||
id: ReachAttemptId,
|
||||
}
|
||||
|
||||
enum NodeTaskInner<TFut, TMuxer, TAddrFut>
|
||||
where
|
||||
TMuxer: StreamMuxer,
|
||||
{
|
||||
/// Future to resolve to connect to the node.
|
||||
Future {
|
||||
/// The future that will attempt to reach the node.
|
||||
future: TFut,
|
||||
/// Allows interrupting the attempt.
|
||||
interrupt: oneshot::Receiver<()>,
|
||||
},
|
||||
|
||||
/// Fully functional node.
|
||||
Node {
|
||||
/// The object that is actually processing things.
|
||||
/// This is an `Option` because we need to be able to extract it.
|
||||
node: NodeStream<TMuxer, TAddrFut, usize>,
|
||||
/// Receiving end for events sent from the main `CollectionStream`.
|
||||
in_events_rx: mpsc::UnboundedReceiver<ExtToInMessage>,
|
||||
},
|
||||
}
|
||||
|
||||
impl<TFut, TMuxer, TAddrFut> Future for NodeTask<TFut, TMuxer, TAddrFut>
|
||||
where
|
||||
TMuxer: StreamMuxer,
|
||||
TFut: Future<Item = ((PeerId, TMuxer), TAddrFut), Error = IoError>,
|
||||
TAddrFut: Future<Item = Multiaddr, Error = IoError>,
|
||||
{
|
||||
type Item = ();
|
||||
type Error = ();
|
||||
|
||||
fn poll(&mut self) -> Poll<(), ()> {
|
||||
// Remember that this poll function is dedicated to a single node and is run
|
||||
// asynchronously.
|
||||
|
||||
// First, handle if we are still trying to reach a node.
|
||||
let new_state = if let NodeTaskInner::Future {
|
||||
ref mut future,
|
||||
ref mut interrupt,
|
||||
} = self.inner
|
||||
{
|
||||
match interrupt.poll() {
|
||||
Ok(Async::NotReady) => (),
|
||||
Ok(Async::Ready(())) | Err(_) => return Ok(Async::Ready(())),
|
||||
}
|
||||
|
||||
match future.poll() {
|
||||
Ok(Async::Ready(((peer_id, muxer), addr_fut))) => {
|
||||
let (sender, rx) = mpsc::unbounded();
|
||||
let event = InToExtMessage::NodeReached(peer_id, sender);
|
||||
let _ = self.events_tx.unbounded_send((event, self.id));
|
||||
|
||||
Some(NodeTaskInner::Node {
|
||||
node: NodeStream::new(muxer, addr_fut),
|
||||
in_events_rx: rx,
|
||||
})
|
||||
}
|
||||
Ok(Async::NotReady) => {
|
||||
return Ok(Async::NotReady);
|
||||
}
|
||||
Err(error) => {
|
||||
// End the task
|
||||
let event = InToExtMessage::ReachError(error);
|
||||
let _ = self.events_tx.unbounded_send((event, self.id));
|
||||
return Ok(Async::Ready(()));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
if let Some(new_state) = new_state {
|
||||
self.inner = new_state;
|
||||
}
|
||||
|
||||
// Then handle if we're a node.
|
||||
if let NodeTaskInner::Node {
|
||||
ref mut node,
|
||||
ref mut in_events_rx,
|
||||
} = self.inner
|
||||
{
|
||||
// Start by handling commands received from the outside of the task.
|
||||
loop {
|
||||
match in_events_rx.poll() {
|
||||
Ok(Async::Ready(Some(ExtToInMessage::OpenSubstream(user_data)))) => match node
|
||||
.open_substream(user_data)
|
||||
{
|
||||
Ok(()) => (),
|
||||
Err(user_data) => {
|
||||
let event =
|
||||
InToExtMessage::NodeEvent(NodeEvent::OutboundClosed { user_data });
|
||||
let _ = self.events_tx.unbounded_send((event, self.id));
|
||||
}
|
||||
},
|
||||
Ok(Async::Ready(None)) => {
|
||||
// Node closed by the external API ; end the task
|
||||
return Ok(Async::Ready(()));
|
||||
}
|
||||
Ok(Async::NotReady) => break,
|
||||
Err(()) => unreachable!("An unbounded receiver never errors"),
|
||||
}
|
||||
}
|
||||
|
||||
// Process the node.
|
||||
loop {
|
||||
match node.poll() {
|
||||
Ok(Async::NotReady) => break,
|
||||
Ok(Async::Ready(Some(event))) => {
|
||||
let event = InToExtMessage::NodeEvent(event);
|
||||
let _ = self.events_tx.unbounded_send((event, self.id));
|
||||
}
|
||||
Ok(Async::Ready(None)) => {
|
||||
let event = InToExtMessage::NodeClosed;
|
||||
let _ = self.events_tx.unbounded_send((event, self.id));
|
||||
return Ok(Async::Ready(())); // End the task.
|
||||
}
|
||||
Err(err) => {
|
||||
let event = InToExtMessage::NodeError(err);
|
||||
let _ = self.events_tx.unbounded_send((event, self.id));
|
||||
return Ok(Async::Ready(())); // End the task.
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Nothing's ready. The current task should have been registered by all of the inner
|
||||
// handlers.
|
||||
Ok(Async::NotReady)
|
||||
}
|
||||
}
|
221
core/src/nodes/listeners.rs
Normal file
221
core/src/nodes/listeners.rs
Normal file
@ -0,0 +1,221 @@
|
||||
// Copyright 2018 Parity Technologies (UK) Ltd.
|
||||
//
|
||||
// Permission is hereby granted, free of charge, to any person obtaining a
|
||||
// copy of this software and associated documentation files (the "Software"),
|
||||
// to deal in the Software without restriction, including without limitation
|
||||
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
||||
// and/or sell copies of the Software, and to permit persons to whom the
|
||||
// Software is furnished to do so, subject to the following conditions:
|
||||
//
|
||||
// The above copyright notice and this permission notice shall be included in
|
||||
// all copies or substantial portions of the Software.
|
||||
//
|
||||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||
// DEALINGS IN THE SOFTWARE.
|
||||
|
||||
use futures::{prelude::*, task};
|
||||
use std::fmt;
|
||||
use void::Void;
|
||||
use {Multiaddr, Transport};
|
||||
|
||||
/// Implementation of `Stream` that handles listeners.
|
||||
///
|
||||
/// The stream cannot produce errors.
|
||||
pub struct ListenersStream<TTrans>
|
||||
where
|
||||
TTrans: Transport,
|
||||
{
|
||||
/// Transport used to spawn listeners.
|
||||
transport: TTrans,
|
||||
/// All the active listeners.
|
||||
listeners: Vec<Listener<TTrans>>,
|
||||
/// Task to notify when we add a new listener to `listeners`, so that we start polling.
|
||||
to_notify: Option<task::Task>,
|
||||
}
|
||||
|
||||
/// A single active listener.
|
||||
struct Listener<TTrans>
|
||||
where
|
||||
TTrans: Transport,
|
||||
{
|
||||
/// The object that actually listens.
|
||||
listener: TTrans::Listener,
|
||||
/// Address it is listening on.
|
||||
address: Multiaddr,
|
||||
}
|
||||
|
||||
/// Event that can happen on the `ListenersStream`.
|
||||
pub enum ListenersEvent<TTrans>
|
||||
where
|
||||
TTrans: Transport,
|
||||
{
|
||||
/// A connection is incoming on one of the listeners.
|
||||
Incoming {
|
||||
/// The produced upgrade.
|
||||
upgrade: TTrans::ListenerUpgrade,
|
||||
/// Address of the listener which received the connection.
|
||||
listen_addr: Multiaddr,
|
||||
},
|
||||
|
||||
/// A listener has closed, either gracefully or with an error.
|
||||
Closed {
|
||||
/// Address of the listener which closed.
|
||||
listen_addr: Multiaddr,
|
||||
/// The listener that closed.
|
||||
listener: TTrans::Listener,
|
||||
/// The error that happened. `Ok` if gracefully closed.
|
||||
result: Result<(), <TTrans::Listener as Stream>::Error>,
|
||||
},
|
||||
}
|
||||
|
||||
impl<TTrans> ListenersStream<TTrans>
|
||||
where
|
||||
TTrans: Transport,
|
||||
{
|
||||
/// Starts a new stream of listeners.
|
||||
#[inline]
|
||||
pub fn new(transport: TTrans) -> Self {
|
||||
ListenersStream {
|
||||
transport,
|
||||
listeners: Vec::new(),
|
||||
to_notify: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Same as `new`, but pre-allocates enough memory for the given number of
|
||||
/// simultaneous listeners.
|
||||
#[inline]
|
||||
pub fn with_capacity(transport: TTrans, capacity: usize) -> Self {
|
||||
ListenersStream {
|
||||
transport,
|
||||
listeners: Vec::with_capacity(capacity),
|
||||
to_notify: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Start listening on a multiaddress.
|
||||
///
|
||||
/// Returns an error if the transport doesn't support the given multiaddress.
|
||||
pub fn listen_on(&mut self, addr: Multiaddr) -> Result<Multiaddr, Multiaddr>
|
||||
where
|
||||
TTrans: Clone,
|
||||
{
|
||||
let (listener, new_addr) = self
|
||||
.transport
|
||||
.clone()
|
||||
.listen_on(addr)
|
||||
.map_err(|(_, addr)| addr)?;
|
||||
|
||||
self.listeners.push(Listener {
|
||||
listener,
|
||||
address: new_addr.clone(),
|
||||
});
|
||||
|
||||
if let Some(task) = self.to_notify.take() {
|
||||
task.notify();
|
||||
}
|
||||
|
||||
Ok(new_addr)
|
||||
}
|
||||
|
||||
/// Returns the transport passed when building this object.
|
||||
#[inline]
|
||||
pub fn transport(&self) -> &TTrans {
|
||||
&self.transport
|
||||
}
|
||||
|
||||
/// Returns an iterator that produces the list of addresses we're listening on.
|
||||
#[inline]
|
||||
pub fn listeners(&self) -> impl Iterator<Item = &Multiaddr> {
|
||||
self.listeners.iter().map(|l| &l.address)
|
||||
}
|
||||
}
|
||||
|
||||
impl<TTrans> Stream for ListenersStream<TTrans>
|
||||
where
|
||||
TTrans: Transport,
|
||||
{
|
||||
type Item = ListenersEvent<TTrans>;
|
||||
type Error = Void; // TODO: use ! once stable
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||
// We remove each element from `listeners` one by one and add them back.
|
||||
for n in (0..self.listeners.len()).rev() {
|
||||
let mut listener = self.listeners.swap_remove(n);
|
||||
match listener.listener.poll() {
|
||||
Ok(Async::NotReady) => {
|
||||
self.listeners.push(listener);
|
||||
}
|
||||
Ok(Async::Ready(Some(upgrade))) => {
|
||||
let listen_addr = listener.address.clone();
|
||||
self.listeners.push(listener);
|
||||
return Ok(Async::Ready(Some(ListenersEvent::Incoming {
|
||||
upgrade,
|
||||
listen_addr,
|
||||
})));
|
||||
}
|
||||
Ok(Async::Ready(None)) => {
|
||||
return Ok(Async::Ready(Some(ListenersEvent::Closed {
|
||||
listen_addr: listener.address,
|
||||
listener: listener.listener,
|
||||
result: Ok(()),
|
||||
})));
|
||||
}
|
||||
Err(err) => {
|
||||
return Ok(Async::Ready(Some(ListenersEvent::Closed {
|
||||
listen_addr: listener.address,
|
||||
listener: listener.listener,
|
||||
result: Err(err),
|
||||
})));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// We register the current task to be waken up if a new listener is added.
|
||||
self.to_notify = Some(task::current());
|
||||
Ok(Async::NotReady)
|
||||
}
|
||||
}
|
||||
|
||||
impl<TTrans> fmt::Debug for ListenersStream<TTrans>
|
||||
where
|
||||
TTrans: Transport + fmt::Debug,
|
||||
{
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
|
||||
f.debug_struct("ListenersStream")
|
||||
.field("transport", &self.transport)
|
||||
.field("listeners", &self.listeners().collect::<Vec<_>>())
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl<TTrans> fmt::Debug for ListenersEvent<TTrans>
|
||||
where
|
||||
TTrans: Transport,
|
||||
<TTrans::Listener as Stream>::Error: fmt::Debug,
|
||||
{
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
|
||||
match self {
|
||||
ListenersEvent::Incoming {
|
||||
ref listen_addr, ..
|
||||
} => f
|
||||
.debug_struct("ListenersEvent::Incoming")
|
||||
.field("listen_addr", listen_addr)
|
||||
.finish(),
|
||||
ListenersEvent::Closed {
|
||||
ref listen_addr,
|
||||
ref result,
|
||||
..
|
||||
} => f
|
||||
.debug_struct("ListenersEvent::Closed")
|
||||
.field("listen_addr", listen_addr)
|
||||
.field("result", result)
|
||||
.finish(),
|
||||
}
|
||||
}
|
||||
}
|
24
core/src/nodes/mod.rs
Normal file
24
core/src/nodes/mod.rs
Normal file
@ -0,0 +1,24 @@
|
||||
// Copyright 2018 Parity Technologies (UK) Ltd.
|
||||
//
|
||||
// Permission is hereby granted, free of charge, to any person obtaining a
|
||||
// copy of this software and associated documentation files (the "Software"),
|
||||
// to deal in the Software without restriction, including without limitation
|
||||
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
||||
// and/or sell copies of the Software, and to permit persons to whom the
|
||||
// Software is furnished to do so, subject to the following conditions:
|
||||
//
|
||||
// The above copyright notice and this permission notice shall be included in
|
||||
// all copies or substantial portions of the Software.
|
||||
//
|
||||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||
// DEALINGS IN THE SOFTWARE.
|
||||
|
||||
pub mod collection;
|
||||
pub mod listeners;
|
||||
pub mod node;
|
||||
pub mod swarm;
|
355
core/src/nodes/node.rs
Normal file
355
core/src/nodes/node.rs
Normal file
@ -0,0 +1,355 @@
|
||||
// Copyright 2018 Parity Technologies (UK) Ltd.
|
||||
//
|
||||
// Permission is hereby granted, free of charge, to any person obtaining a
|
||||
// copy of this software and associated documentation files (the "Software"),
|
||||
// to deal in the Software without restriction, including without limitation
|
||||
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
||||
// and/or sell copies of the Software, and to permit persons to whom the
|
||||
// Software is furnished to do so, subject to the following conditions:
|
||||
//
|
||||
// The above copyright notice and this permission notice shall be included in
|
||||
// all copies or substantial portions of the Software.
|
||||
//
|
||||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||
// DEALINGS IN THE SOFTWARE.
|
||||
|
||||
use futures::{prelude::*, task};
|
||||
use muxing;
|
||||
use smallvec::SmallVec;
|
||||
use std::fmt;
|
||||
use std::io::Error as IoError;
|
||||
use std::sync::Arc;
|
||||
use Multiaddr;
|
||||
|
||||
// Implementor notes
|
||||
// =================
|
||||
//
|
||||
// In order to minimize the risk of bugs in higher-level code, we want to avoid as much as
|
||||
// possible having a racy API. The behaviour of methods should be well-defined and predictable.
|
||||
// As an example, calling the `multiaddr()` method should return `Some` only after a
|
||||
// `MultiaddrResolved` event has been emitted and never before, even if we technically already
|
||||
// know the address.
|
||||
//
|
||||
// In order to respect this coding practice, we should theoretically provide events such as "data
|
||||
// incoming on a substream", or "a substream is ready to be written". This would however make the
|
||||
// API of `NodeStream` really painful to use. Instead, we really want to provide an object that
|
||||
// implements the `AsyncRead` and `AsyncWrite` traits.
|
||||
//
|
||||
// This substream object raises the question of how to keep the `NodeStream` and the various
|
||||
// substreams in sync without exposing a racy API. The answer is that we don't. The state of the
|
||||
// node and the state of the substreams are totally detached, and they don't interact with each
|
||||
// other in any way. Destroying the `NodeStream` doesn't close the substreams, nor is there a
|
||||
// `close_substreams()` method or a "substream closed" event.
|
||||
|
||||
/// Implementation of `Stream` that handles a node.
|
||||
///
|
||||
/// The stream will receive substreams and can be used to open new outgoing substreams. Destroying
|
||||
/// the `NodeStream` will **not** close the existing substreams.
|
||||
///
|
||||
/// The stream will close once both the inbound and outbound channels are closed, and no more
|
||||
/// outbound substream attempt is pending.
|
||||
pub struct NodeStream<TMuxer, TAddrFut, TUserData>
|
||||
where
|
||||
TMuxer: muxing::StreamMuxer,
|
||||
{
|
||||
/// The muxer used to manage substreams.
|
||||
muxer: Arc<TMuxer>,
|
||||
/// If true, the inbound side of the muxer has closed earlier and should no longer be polled.
|
||||
inbound_finished: bool,
|
||||
/// If true, the outbound side of the muxer has closed earlier.
|
||||
outbound_finished: bool,
|
||||
/// Address of the node ; can be empty if the address hasn't been resolved yet.
|
||||
address: Addr<TAddrFut>,
|
||||
/// List of substreams we are currently opening.
|
||||
outbound_substreams: SmallVec<[(TUserData, TMuxer::OutboundSubstream); 8]>,
|
||||
/// Task to notify when a new element is added to `outbound_substreams`, so that we can start
|
||||
/// polling it.
|
||||
to_notify: Option<task::Task>,
|
||||
}
|
||||
|
||||
/// Address of the node.
|
||||
#[derive(Debug, Clone)]
|
||||
enum Addr<TAddrFut> {
|
||||
/// Future that will resolve the address.
|
||||
Future(TAddrFut),
|
||||
/// The address is now known.
|
||||
Resolved(Multiaddr),
|
||||
/// An error happened while resolving the future.
|
||||
Errored,
|
||||
}
|
||||
|
||||
/// A successfully opened substream.
|
||||
pub type Substream<TMuxer> = muxing::SubstreamRef<Arc<TMuxer>>;
|
||||
|
||||
/// Event that can happen on the `NodeStream`.
|
||||
pub enum NodeEvent<TMuxer, TUserData>
|
||||
where
|
||||
TMuxer: muxing::StreamMuxer,
|
||||
{
|
||||
/// The multiaddress future of the node has been resolved.
|
||||
///
|
||||
/// If this succeeded, after this event has been emitted calling `multiaddr()` will return
|
||||
/// `Some`.
|
||||
Multiaddr(Result<Multiaddr, IoError>),
|
||||
|
||||
/// A new inbound substream arrived.
|
||||
InboundSubstream {
|
||||
/// The newly-opened substream.
|
||||
substream: Substream<TMuxer>,
|
||||
},
|
||||
|
||||
/// An outbound substream has successfully been opened.
|
||||
OutboundSubstream {
|
||||
/// User data that has been passed to the `open_substream` method.
|
||||
user_data: TUserData,
|
||||
/// The newly-opened substream.
|
||||
substream: Substream<TMuxer>,
|
||||
},
|
||||
|
||||
/// An outbound substream couldn't be opened because the muxer is no longer capable of opening
|
||||
/// more substreams.
|
||||
OutboundClosed {
|
||||
/// User data that has been passed to the `open_substream` method.
|
||||
user_data: TUserData,
|
||||
},
|
||||
|
||||
/// The inbound side of the muxer has been closed. No more inbound substreams will be produced.
|
||||
InboundClosed,
|
||||
}
|
||||
|
||||
/// Identifier for a substream being opened.
|
||||
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
|
||||
pub struct OutboundSubstreamId(usize);
|
||||
|
||||
impl<TMuxer, TAddrFut, TUserData> NodeStream<TMuxer, TAddrFut, TUserData>
|
||||
where
|
||||
TMuxer: muxing::StreamMuxer,
|
||||
TAddrFut: Future<Item = Multiaddr, Error = IoError>,
|
||||
{
|
||||
/// Creates a new node events stream.
|
||||
#[inline]
|
||||
pub fn new(muxer: TMuxer, multiaddr_future: TAddrFut) -> Self {
|
||||
NodeStream {
|
||||
muxer: Arc::new(muxer),
|
||||
inbound_finished: false,
|
||||
outbound_finished: false,
|
||||
address: Addr::Future(multiaddr_future),
|
||||
outbound_substreams: SmallVec::new(),
|
||||
to_notify: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the multiaddress of the node, if already known.
|
||||
///
|
||||
/// This method will always return `None` before a successful `Multiaddr` event has been
|
||||
/// returned by `poll()`, and will always return `Some` afterwards.
|
||||
#[inline]
|
||||
pub fn multiaddr(&self) -> Option<&Multiaddr> {
|
||||
match self.address {
|
||||
Addr::Resolved(ref addr) => Some(addr),
|
||||
Addr::Future(_) | Addr::Errored => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Starts the process of opening a new outbound substream.
|
||||
///
|
||||
/// Returns an error if the outbound side of the muxer is closed.
|
||||
///
|
||||
/// After calling this method, polling the stream should eventually produce either an
|
||||
/// `OutboundSubstream` event or an `OutboundClosed` event containing the user data that has
|
||||
/// been passed to this method.
|
||||
pub fn open_substream(&mut self, user_data: TUserData) -> Result<(), TUserData> {
|
||||
if self.outbound_finished {
|
||||
return Err(user_data);
|
||||
}
|
||||
|
||||
let raw = self.muxer.open_outbound();
|
||||
self.outbound_substreams.push((user_data, raw));
|
||||
|
||||
if let Some(task) = self.to_notify.take() {
|
||||
task.notify();
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns true if the inbound channel of the muxer is closed.
|
||||
///
|
||||
/// If `true` is returned, then no more inbound substream will be produced.
|
||||
#[inline]
|
||||
pub fn is_inbound_closed(&self) -> bool {
|
||||
self.inbound_finished
|
||||
}
|
||||
|
||||
/// Returns true if the outbound channel of the muxer is closed.
|
||||
///
|
||||
/// If `true` is returned, then no more outbound substream can be opened. Calling
|
||||
/// `open_substream` will return an `Err`.
|
||||
#[inline]
|
||||
pub fn is_outbound_closed(&self) -> bool {
|
||||
self.outbound_finished
|
||||
}
|
||||
|
||||
/// Destroys the node stream and returns all the pending outbound substreams.
|
||||
pub fn close(mut self) -> Vec<TUserData> {
|
||||
let mut out = Vec::with_capacity(self.outbound_substreams.len());
|
||||
for (user_data, outbound) in self.outbound_substreams.drain() {
|
||||
out.push(user_data);
|
||||
self.muxer.destroy_outbound(outbound);
|
||||
}
|
||||
out
|
||||
}
|
||||
}
|
||||
|
||||
impl<TMuxer, TAddrFut, TUserData> Stream for NodeStream<TMuxer, TAddrFut, TUserData>
|
||||
where
|
||||
TMuxer: muxing::StreamMuxer,
|
||||
TAddrFut: Future<Item = Multiaddr, Error = IoError>,
|
||||
{
|
||||
type Item = NodeEvent<TMuxer, TUserData>;
|
||||
type Error = IoError;
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||
// Polling inbound substream.
|
||||
if !self.inbound_finished {
|
||||
match self.muxer.poll_inbound() {
|
||||
Ok(Async::Ready(Some(substream))) => {
|
||||
let substream = muxing::substream_from_ref(self.muxer.clone(), substream);
|
||||
return Ok(Async::Ready(Some(NodeEvent::InboundSubstream {
|
||||
substream,
|
||||
})));
|
||||
}
|
||||
Ok(Async::Ready(None)) => {
|
||||
self.inbound_finished = true;
|
||||
return Ok(Async::Ready(Some(NodeEvent::InboundClosed)));
|
||||
}
|
||||
Ok(Async::NotReady) => {}
|
||||
Err(err) => return Err(err),
|
||||
}
|
||||
}
|
||||
|
||||
// Polling outbound substreams.
|
||||
// We remove each element from `outbound_substreams` one by one and add them back.
|
||||
for n in (0..self.outbound_substreams.len()).rev() {
|
||||
let (user_data, mut outbound) = self.outbound_substreams.swap_remove(n);
|
||||
match self.muxer.poll_outbound(&mut outbound) {
|
||||
Ok(Async::Ready(Some(substream))) => {
|
||||
let substream = muxing::substream_from_ref(self.muxer.clone(), substream);
|
||||
self.muxer.destroy_outbound(outbound);
|
||||
return Ok(Async::Ready(Some(NodeEvent::OutboundSubstream {
|
||||
user_data,
|
||||
substream,
|
||||
})));
|
||||
}
|
||||
Ok(Async::Ready(None)) => {
|
||||
self.outbound_finished = true;
|
||||
self.muxer.destroy_outbound(outbound);
|
||||
return Ok(Async::Ready(Some(NodeEvent::OutboundClosed { user_data })));
|
||||
}
|
||||
Ok(Async::NotReady) => {
|
||||
self.outbound_substreams.push((user_data, outbound));
|
||||
}
|
||||
Err(err) => {
|
||||
self.muxer.destroy_outbound(outbound);
|
||||
return Err(err);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Check whether the multiaddress is resolved.
|
||||
{
|
||||
let poll = match self.address {
|
||||
Addr::Future(ref mut fut) => Some(fut.poll()),
|
||||
Addr::Resolved(_) | Addr::Errored => None,
|
||||
};
|
||||
|
||||
match poll {
|
||||
Some(Ok(Async::NotReady)) | None => {}
|
||||
Some(Ok(Async::Ready(addr))) => {
|
||||
self.address = Addr::Resolved(addr.clone());
|
||||
return Ok(Async::Ready(Some(NodeEvent::Multiaddr(Ok(addr)))));
|
||||
}
|
||||
Some(Err(err)) => {
|
||||
self.address = Addr::Errored;
|
||||
return Ok(Async::Ready(Some(NodeEvent::Multiaddr(Err(err)))));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Closing the node if there's no way we can do anything more.
|
||||
if self.inbound_finished && self.outbound_finished && self.outbound_substreams.is_empty() {
|
||||
return Ok(Async::Ready(None));
|
||||
}
|
||||
|
||||
// Nothing happened. Register our task to be notified and return.
|
||||
self.to_notify = Some(task::current());
|
||||
Ok(Async::NotReady)
|
||||
}
|
||||
}
|
||||
|
||||
impl<TMuxer, TAddrFut, TUserData> fmt::Debug for NodeStream<TMuxer, TAddrFut, TUserData>
|
||||
where
|
||||
TMuxer: muxing::StreamMuxer,
|
||||
TAddrFut: Future<Item = Multiaddr, Error = IoError>,
|
||||
{
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
|
||||
f.debug_struct("NodeStream")
|
||||
.field("address", &self.multiaddr())
|
||||
.field("inbound_finished", &self.inbound_finished)
|
||||
.field("outbound_finished", &self.outbound_finished)
|
||||
.field("outbound_substreams", &self.outbound_substreams.len())
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl<TMuxer, TAddrFut, TUserData> Drop for NodeStream<TMuxer, TAddrFut, TUserData>
|
||||
where
|
||||
TMuxer: muxing::StreamMuxer,
|
||||
{
|
||||
fn drop(&mut self) {
|
||||
// The substreams that were produced will continue to work, as the muxer is held in an Arc.
|
||||
// However we will no longer process any further inbound or outbound substream, and we
|
||||
// therefore close everything.
|
||||
for (_, outbound) in self.outbound_substreams.drain() {
|
||||
self.muxer.destroy_outbound(outbound);
|
||||
}
|
||||
if !self.inbound_finished {
|
||||
self.muxer.close_inbound();
|
||||
}
|
||||
if !self.outbound_finished {
|
||||
self.muxer.close_outbound();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO:
|
||||
/*impl<TTrans> fmt::Debug for NodeEvent<TTrans>
|
||||
where TTrans: Transport,
|
||||
<TTrans::Listener as Stream>::Error: fmt::Debug,
|
||||
{
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
|
||||
match self {
|
||||
NodeEvent::Incoming { ref listen_addr, .. } => {
|
||||
f.debug_struct("NodeEvent::Incoming")
|
||||
.field("listen_addr", listen_addr)
|
||||
.finish()
|
||||
},
|
||||
NodeEvent::Closed { ref listen_addr, .. } => {
|
||||
f.debug_struct("NodeEvent::Closed")
|
||||
.field("listen_addr", listen_addr)
|
||||
.finish()
|
||||
},
|
||||
NodeEvent::Error { ref listen_addr, ref error, .. } => {
|
||||
f.debug_struct("NodeEvent::Error")
|
||||
.field("listen_addr", listen_addr)
|
||||
.field("error", error)
|
||||
.finish()
|
||||
},
|
||||
}
|
||||
}
|
||||
}*/
|
1047
core/src/nodes/swarm.rs
Normal file
1047
core/src/nodes/swarm.rs
Normal file
File diff suppressed because it is too large
Load Diff
@ -126,7 +126,7 @@ pub trait Transport {
|
||||
/// Turns this `Transport` into an abstract boxed transport.
|
||||
#[inline]
|
||||
fn boxed(self) -> boxed::Boxed<Self::Output>
|
||||
where Self: Sized + MuxedTransport + Clone + Send + Sync + 'static,
|
||||
where Self: Sized + Clone + Send + Sync + 'static,
|
||||
Self::Dial: Send + 'static,
|
||||
Self::Listener: Send + 'static,
|
||||
Self::ListenerUpgrade: Send + 'static,
|
||||
|
@ -512,6 +512,14 @@ where C: AsyncRead + AsyncWrite
|
||||
elem.substream_id() != substream.num || elem.endpoint() == Some(substream.endpoint)
|
||||
})
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn close_inbound(&self) {
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn close_outbound(&self) {
|
||||
}
|
||||
}
|
||||
|
||||
/// Active attempt to open an outbound substream.
|
||||
|
@ -105,6 +105,14 @@ where
|
||||
#[inline]
|
||||
fn destroy_substream(&self, _substream: Self::Substream) {
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn close_inbound(&self) {
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn close_outbound(&self) {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
Reference in New Issue
Block a user