Address some review comments on #1328

This commit is contained in:
Pierre Krieger
2020-01-02 12:59:10 +01:00
parent ff0d2d55bd
commit 23fc6ee524
2 changed files with 26 additions and 4 deletions

View File

@ -19,7 +19,7 @@
// DEALINGS IN THE SOFTWARE.
use crate::{muxing::StreamMuxer, ProtocolName, transport::ListenerEvent};
use futures::prelude::*;
use futures::{prelude::*, io::{IoSlice, IoSliceMut}};
use pin_project::{pin_project, project};
use std::{fmt, io::{Error as IoError}, pin::Pin, task::Context, task::Poll};
@ -77,6 +77,17 @@ where
EitherOutput::Second(b) => AsyncRead::poll_read(b, cx, buf),
}
}
#[project]
fn poll_read_vectored(self: Pin<&mut Self>, cx: &mut Context, bufs: &mut [IoSliceMut])
-> Poll<Result<usize, IoError>>
{
#[project]
match self.project() {
EitherOutput::First(a) => AsyncRead::poll_read_vectored(a, cx, bufs),
EitherOutput::Second(b) => AsyncRead::poll_read_vectored(b, cx, bufs),
}
}
}
impl<A, B> AsyncWrite for EitherOutput<A, B>
@ -93,6 +104,17 @@ where
}
}
#[project]
fn poll_write_vectored(self: Pin<&mut Self>, cx: &mut Context, bufs: &[IoSlice])
-> Poll<Result<usize, IoError>>
{
#[project]
match self.project() {
EitherOutput::First(a) => AsyncWrite::poll_write_vectored(a, cx, bufs),
EitherOutput::Second(b) => AsyncWrite::poll_write_vectored(b, cx, bufs),
}
}
#[project]
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), IoError>> {
#[project]

View File

@ -27,7 +27,7 @@ use crate::{
}
};
use fnv::FnvHashMap;
use futures::{prelude::*, channel::mpsc, executor::ThreadPool, stream::FuturesUnordered, task::SpawnExt as _};
use futures::{prelude::*, channel::mpsc, executor::ThreadPool, stream::FuturesUnordered};
use std::{collections::hash_map::{Entry, OccupiedEntry}, error, fmt, pin::Pin, task::Context, task::Poll};
use super::{TaskId, task::{Task, FromTaskMessage, ToTaskMessage}, Error};
@ -177,7 +177,7 @@ impl<I, O, H, E, HE, T, C> Manager<I, O, H, E, HE, T, C> {
let task = Box::pin(Task::new(task_id, self.events_tx.clone(), rx, future, handler));
if let Some(threads_pool) = &mut self.threads_pool {
threads_pool.spawn(task).expect("spawning a task on a thread pool never fails; qed");
threads_pool.spawn_ok(task);
} else {
self.local_spawns.push(task);
}
@ -213,7 +213,7 @@ impl<I, O, H, E, HE, T, C> Manager<I, O, H, E, HE, T, C> {
Task::node(task_id, self.events_tx.clone(), rx, HandledNode::new(muxer, handler));
if let Some(threads_pool) = &mut self.threads_pool {
threads_pool.spawn(Box::pin(task)).expect("spawning a task on a threads pool never fails; qed");
threads_pool.spawn_ok(Box::pin(task));
} else {
self.local_spawns.push(Box::pin(task));
}