Allow configuring the tasks executor (#1391)

* Allow configuring the tasks executor

* Minor tweaks

* Add executor_fn

* Create ThreadPool at the end if necessary

* WIP

* Don't depend on async-std and tokio in core

* Replace FutureObj with PinBoxFuture

* Some docs on Executor

* Fix tests

Co-authored-by: Toralf Wittner <tw@dtex.org>
Pierre Krieger 2020-01-20 14:18:35 +01:00 committed by GitHub
parent 0ed684ee30
commit f89683419a
7 changed files with 107 additions and 35 deletions


@@ -43,6 +43,8 @@ mod keys_proto {
 pub use multiaddr;
 pub type Negotiated<T> = futures::compat::Compat01As03<multistream_select::Negotiated<futures::compat::Compat<T>>>;
+use std::{future::Future, pin::Pin};
+
 mod peer_id;
 mod translation;
@@ -156,3 +158,32 @@ impl ConnectedPoint {
     }
 }
+
+/// Implemented on objects that can run a `Future` in the background.
+///
+/// > **Note**: While it may be tempting to implement this trait on types such as
+/// > [`futures::stream::FuturesUnordered`], please note that passing an `Executor` is
+/// > optional, and that `FuturesUnordered` (or a similar struct) will automatically
+/// > be used as a fallback by libp2p. The `Executor` trait should therefore only be
+/// > about running `Future`s in the background.
+pub trait Executor {
+    /// Run the given future in the background until it ends.
+    fn exec(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>);
+}
+
+impl<'a, T: ?Sized + Executor> Executor for &'a T {
+    fn exec(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
+        T::exec(&**self, f)
+    }
+}
+
+impl<'a, T: ?Sized + Executor> Executor for &'a mut T {
+    fn exec(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
+        T::exec(&**self, f)
+    }
+}
+
+impl<T: ?Sized + Executor> Executor for Box<T> {
+    fn exec(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
+        T::exec(&**self, f)
+    }
+}
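
For illustration, here is a minimal sketch of implementing `Executor` on top of async-std; the `AsyncStdExecutor` type and the async-std dependency are assumptions for this example, not part of the commit:

```rust
use libp2p_core::Executor;
use std::{future::Future, pin::Pin};

/// Hypothetical adapter; async-std is not a dependency of libp2p-core.
struct AsyncStdExecutor;

impl Executor for AsyncStdExecutor {
    fn exec(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
        // The boxed future is `Send + 'static`, which is exactly what
        // `async_std::task::spawn` requires; dropping the returned
        // handle detaches the task so it keeps running.
        async_std::task::spawn(future);
    }
}
```

Per the note in the doc comment, the trait is only about running futures in the background; types that merely collect futures, such as `FuturesUnordered`, should not implement it.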


@@ -19,6 +19,7 @@
 // DEALINGS IN THE SOFTWARE.
 
 use crate::{
+    Executor,
     PeerId,
     muxing::StreamMuxer,
     nodes::{
@@ -306,11 +307,11 @@ where
     TConnInfo: ConnectionInfo<PeerId = TPeerId>,
     TPeerId: Eq + Hash,
 {
-    /// Creates a new empty collection.
-    #[inline]
-    pub fn new() -> Self {
+    /// Creates a new empty collection. If `executor` is `Some`, uses the given executor to spawn
+    /// tasks. Otherwise, runs tasks locally.
+    pub fn new(executor: Option<Box<dyn Executor>>) -> Self {
         CollectionStream {
-            inner: tasks::Manager::new(),
+            inner: tasks::Manager::new(executor),
             nodes: Default::default(),
         }
     }


@@ -20,7 +20,7 @@
 use crate::muxing::StreamMuxer;
 use crate::{
-    ConnectedPoint, Multiaddr, PeerId, address_translation,
+    ConnectedPoint, Executor, Multiaddr, PeerId, address_translation,
     nodes::{
         collection::{
             CollectionEvent,
@@ -688,11 +688,11 @@ where
     TPeerId: Eq + Hash + Clone,
 {
     /// Creates a new node events stream.
-    pub fn new(transport: TTrans, local_peer_id: TPeerId) -> Self {
+    pub fn new(transport: TTrans, local_peer_id: TPeerId, executor: Option<Box<dyn Executor>>) -> Self {
         // TODO: with_capacity?
         Network {
             listeners: ListenersStream::new(transport),
-            active_nodes: CollectionStream::new(),
+            active_nodes: CollectionStream::new(executor),
             reach_attempts: ReachAttempts {
                 local_peer_id,
                 out_reach_attempts: Default::default(),
@@ -706,12 +706,12 @@ where
     /// Creates a new node event stream with incoming connections limit.
     pub fn new_with_incoming_limit(transport: TTrans,
-        local_peer_id: TPeerId, incoming_limit: Option<u32>) -> Self
+        local_peer_id: TPeerId, executor: Option<Box<dyn Executor>>, incoming_limit: Option<u32>) -> Self
     {
         Network {
             incoming_limit,
             listeners: ListenersStream::new(transport),
-            active_nodes: CollectionStream::new(),
+            active_nodes: CollectionStream::new(executor),
             reach_attempts: ReachAttempts {
                 local_peer_id,
                 out_reach_attempts: Default::default(),


@@ -19,7 +19,7 @@
 // DEALINGS IN THE SOFTWARE.
 
 use crate::{
-    PeerId,
+    Executor, PeerId,
     muxing::StreamMuxer,
     nodes::{
         handled_node::{HandledNode, IntoNodeHandler, NodeHandler},
@@ -27,7 +27,7 @@ use crate::{
     }
 };
 use fnv::FnvHashMap;
-use futures::{prelude::*, channel::mpsc, executor::ThreadPool, stream::FuturesUnordered};
+use futures::{prelude::*, channel::mpsc, stream::FuturesUnordered};
 use std::{collections::hash_map::{Entry, OccupiedEntry}, error, fmt, pin::Pin, task::Context, task::Poll};
 use super::{TaskId, task::{Task, FromTaskMessage, ToTaskMessage}, Error};
@@ -63,9 +63,9 @@ pub struct Manager<I, O, H, E, HE, T, C = PeerId> {
     /// Identifier for the next task to spawn.
     next_task_id: TaskId,
 
-    /// Threads pool where we spawn the nodes' tasks. If `None`, then we push tasks to the
+    /// Custom executor where we spawn the nodes' tasks. If `None`, then we push tasks to the
     /// `local_spawns` list instead.
-    threads_pool: Option<ThreadPool>,
+    executor: Option<Box<dyn Executor>>,
 
     /// If no executor is available, we move tasks to this set, and futures are polled on the
     /// current thread instead.
@@ -134,17 +134,14 @@ pub enum Event<'a, I, O, H, E, HE, T, C = PeerId> {
 }
 
 impl<I, O, H, E, HE, T, C> Manager<I, O, H, E, HE, T, C> {
-    /// Creates a new task manager.
-    pub fn new() -> Self {
+    /// Creates a new task manager. If `Some` is passed, uses the given executor to spawn tasks.
+    /// Otherwise, background tasks are executed locally when you call `poll`.
+    pub fn new(executor: Option<Box<dyn Executor>>) -> Self {
         let (tx, rx) = mpsc::channel(1);
-        let threads_pool = ThreadPool::builder()
-            .name_prefix("libp2p-nodes-")
-            .create().ok();
         Self {
             tasks: FnvHashMap::default(),
             next_task_id: TaskId(0),
-            threads_pool,
+            executor,
             local_spawns: FuturesUnordered::new(),
             events_tx: tx,
             events_rx: rx
@@ -176,8 +173,8 @@ impl<I, O, H, E, HE, T, C> Manager<I, O, H, E, HE, T, C> {
         self.tasks.insert(task_id, TaskInfo { sender: tx, user_data });
 
         let task = Box::pin(Task::new(task_id, self.events_tx.clone(), rx, future, handler));
-        if let Some(threads_pool) = &mut self.threads_pool {
-            threads_pool.spawn_ok(task);
+        if let Some(executor) = &self.executor {
+            executor.exec(task as Pin<Box<_>>)
         } else {
             self.local_spawns.push(task);
         }
@@ -212,8 +209,8 @@ impl<I, O, H, E, HE, T, C> Manager<I, O, H, E, HE, T, C> {
         let task: Task<Pin<Box<futures::future::Pending<_>>>, _, _, _, _, _, _> =
             Task::node(task_id, self.events_tx.clone(), rx, HandledNode::new(muxer, handler));
 
-        if let Some(threads_pool) = &mut self.threads_pool {
-            threads_pool.spawn_ok(Box::pin(task));
+        if let Some(executor) = &self.executor {
+            executor.exec(Box::pin(task))
         } else {
             self.local_spawns.push(Box::pin(task));
         }
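
Distilled from the two hunks above, the manager's spawn path follows this pattern; the `Spawner` type below is an illustrative stand-alone sketch, not code from this commit:

```rust
use futures::stream::FuturesUnordered;
use std::{future::Future, pin::Pin};

type BoxFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

/// Illustrative stand-in for the `Executor` trait defined in libp2p-core.
trait Executor {
    fn exec(&self, future: BoxFuture);
}

/// Sketch of the dispatch performed by `Manager` when spawning a task.
struct Spawner {
    executor: Option<Box<dyn Executor>>,
    local_spawns: FuturesUnordered<BoxFuture>,
}

impl Spawner {
    fn spawn(&mut self, task: BoxFuture) {
        if let Some(executor) = &self.executor {
            // A user-provided executor runs the task in the background.
            executor.exec(task);
        } else {
            // Fallback: queue the task; it only makes progress when the
            // owner polls `local_spawns` on the current thread.
            self.local_spawns.push(task);
        }
    }
}
```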


@@ -91,14 +91,14 @@ where
 fn deny_incoming_connec() {
     // Checks whether refusing an incoming connection on a swarm triggers the correct events.
 
-    let mut swarm1: Network<_, _, _, NodeHandlerWrapperBuilder<TestHandler<_>>, _> = {
+    let mut swarm1: Network<_, _, _, NodeHandlerWrapperBuilder<TestHandler<_>>, _, _> = {
         let local_key = identity::Keypair::generate_ed25519();
         let local_public_key = local_key.public();
         let transport = libp2p_tcp::TcpConfig::new()
             .upgrade(upgrade::Version::V1)
             .authenticate(libp2p_secio::SecioConfig::new(local_key))
             .multiplex(libp2p_mplex::MplexConfig::new());
-        Network::new(transport, local_public_key.into())
+        Network::new(transport, local_public_key.into(), None)
     };
 
     let mut swarm2 = {
@@ -108,7 +108,7 @@ fn deny_incoming_connec() {
             .upgrade(upgrade::Version::V1)
             .authenticate(libp2p_secio::SecioConfig::new(local_key))
             .multiplex(libp2p_mplex::MplexConfig::new());
-        Network::new(transport, local_public_key.into())
+        Network::new(transport, local_public_key.into(), None)
     };
 
     swarm1.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap();
@@ -177,7 +177,7 @@ fn dial_self() {
             // negotiation to complete.
             util::CloseMuxer::new(mplex).map_ok(move |mplex| (peer, mplex))
         });
-        Network::new(transport, local_public_key.into())
+        Network::new(transport, local_public_key.into(), None)
     };
 
     swarm.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap();
@@ -241,14 +241,14 @@ fn dial_self_by_id() {
     // Trying to dial self by passing the same `PeerId` shouldn't even be possible in the first
     // place.
 
-    let mut swarm: Network<_, _, _, NodeHandlerWrapperBuilder<TestHandler<_>>, _> = {
+    let mut swarm: Network<_, _, _, NodeHandlerWrapperBuilder<TestHandler<_>>, _, _> = {
         let local_key = identity::Keypair::generate_ed25519();
         let local_public_key = local_key.public();
         let transport = libp2p_tcp::TcpConfig::new()
             .upgrade(upgrade::Version::V1)
             .authenticate(libp2p_secio::SecioConfig::new(local_key))
             .multiplex(libp2p_mplex::MplexConfig::new());
-        Network::new(transport, local_public_key.into())
+        Network::new(transport, local_public_key.into(), None)
     };
 
     let peer_id = swarm.local_peer_id().clone();
@@ -266,7 +266,7 @@ fn multiple_addresses_err() {
             .upgrade(upgrade::Version::V1)
             .authenticate(libp2p_secio::SecioConfig::new(local_key))
             .multiplex(libp2p_mplex::MplexConfig::new());
-        Network::new(transport, local_public_key.into())
+        Network::new(transport, local_public_key.into(), None)
     };
 
     let mut addresses = Vec::new();


@@ -110,7 +110,7 @@ fn raw_swarm_simultaneous_connect() {
             .upgrade(upgrade::Version::V1Lazy)
             .authenticate(libp2p_secio::SecioConfig::new(local_key))
             .multiplex(libp2p_mplex::MplexConfig::new());
-        Network::new(transport, local_public_key.into_peer_id())
+        Network::new(transport, local_public_key.into_peer_id(), None)
    };
 
     let mut swarm2 = {
@@ -120,7 +120,7 @@ fn raw_swarm_simultaneous_connect() {
             .upgrade(upgrade::Version::V1Lazy)
             .authenticate(libp2p_secio::SecioConfig::new(local_key))
             .multiplex(libp2p_mplex::MplexConfig::new());
-        Network::new(transport, local_public_key.into_peer_id())
+        Network::new(transport, local_public_key.into_peer_id(), None)
     };
 
     swarm1.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap();


@@ -78,8 +78,9 @@ pub use protocols_handler::{
 };
 use protocols_handler::{NodeHandlerWrapperBuilder, NodeHandlerWrapperError};
 
-use futures::prelude::*;
+use futures::{prelude::*, executor::{ThreadPool, ThreadPoolBuilder}};
 use libp2p_core::{
+    Executor,
     Transport, Multiaddr, Negotiated, PeerId, InboundUpgrade, OutboundUpgrade, UpgradeInfo, ProtocolName,
     muxing::StreamMuxer,
     nodes::{
@@ -571,6 +572,7 @@ impl<'a> PollParameters for SwarmPollParameters<'a> {
 pub struct SwarmBuilder<TTransport, TBehaviour> {
     incoming_limit: Option<u32>,
+    executor: Option<Box<dyn Executor>>,
     local_peer_id: PeerId,
     transport: TTransport,
     behaviour: TBehaviour,
@@ -610,6 +612,7 @@ where TBehaviour: NetworkBehaviour,
         SwarmBuilder {
             incoming_limit: None,
             local_peer_id,
+            executor: None,
             transport,
             behaviour,
         }
@@ -620,6 +623,26 @@
         self
     }
 
+    /// Sets the executor to use to spawn background tasks.
+    ///
+    /// By default, a thread pool is used.
+    pub fn executor(mut self, executor: impl Executor + 'static) -> Self {
+        self.executor = Some(Box::new(executor));
+        self
+    }
+
+    /// Shortcut for calling `executor` with an object that calls the given closure.
+    pub fn executor_fn(mut self, executor: impl Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + 'static) -> Self {
+        struct SpawnImpl<F>(F);
+        impl<F: Fn(Pin<Box<dyn Future<Output = ()> + Send>>)> Executor for SpawnImpl<F> {
+            fn exec(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
+                (self.0)(f)
+            }
+        }
+        self.executor = Some(Box::new(SpawnImpl(executor)));
+        self
+    }
+
     pub fn build(mut self) -> Swarm<TTransport, TBehaviour, TConnInfo> {
         let supported_protocols = self.behaviour
             .new_handler()
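
A sketch of how a caller could plug in a runtime of their choice through `executor_fn`; it assumes `transport`, `behaviour` and `local_peer_id` were built earlier, that `SwarmBuilder::new` takes them in this order, and that async-std is available — none of which is shown in this commit:

```rust
// Route every libp2p background task onto async-std instead of the
// default thread pool created by `build()`.
let swarm = SwarmBuilder::new(transport, behaviour, local_peer_id)
    .executor_fn(|task| { async_std::task::spawn(task); })
    .build();
```

Because `executor_fn` wraps the closure in the private `SpawnImpl` type, any `Fn` closure works; the caller does not need to implement `Executor` themselves.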
@@ -629,7 +652,27 @@ where TBehaviour: NetworkBehaviour,
             .map(|info| info.protocol_name().to_vec())
             .collect();
 
-        let network = Network::new_with_incoming_limit(self.transport, self.local_peer_id, self.incoming_limit);
+        let executor = self.executor.or_else(|| {
+            struct PoolWrapper(ThreadPool);
+            impl Executor for PoolWrapper {
+                fn exec(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
+                    self.0.spawn_ok(f)
+                }
+            }
+
+            ThreadPoolBuilder::new()
+                .name_prefix("libp2p-task-")
+                .create()
+                .ok()
+                .map(|tp| Box::new(PoolWrapper(tp)) as Box<_>)
+        });
+
+        let network = Network::new_with_incoming_limit(
+            self.transport,
+            self.local_peer_id,
+            executor,
+            self.incoming_limit
+        );
 
         ExpandedSwarm {
             network,