Replaced tokio_time::Deadline with tokio::timer::Timeout (#432)

This commit is contained in:
Eugene Chupriyanov
2018-08-28 16:38:08 +03:00
committed by Toralf Wittner
parent c77b1f5a0a
commit f5ce93c730
4 changed files with 19 additions and 18 deletions

View File

@ -23,7 +23,7 @@ rand = "0.4.2"
smallvec = "0.5" smallvec = "0.5"
tokio-codec = "0.1" tokio-codec = "0.1"
tokio-io = "0.1" tokio-io = "0.1"
tokio-timer = "0.2" tokio-timer = "0.2.6"
unsigned-varint = { version = "0.1", features = ["codec"] } unsigned-varint = { version = "0.1", features = ["codec"] }
[dev-dependencies] [dev-dependencies]

View File

@ -30,8 +30,8 @@ use smallvec::SmallVec;
use std::cmp::Ordering; use std::cmp::Ordering;
use std::io::{Error as IoError, ErrorKind as IoErrorKind}; use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::mem; use std::mem;
use std::time::{Duration, Instant}; use std::time::Duration;
use tokio_timer::Deadline; use tokio_timer::Timeout;
/// Prototype for a future Kademlia protocol running on a socket. /// Prototype for a future Kademlia protocol running on a socket.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@ -331,7 +331,7 @@ where F: FnMut(&PeerId) -> Fut + 'a,
.and_then(move |controller| { .and_then(move |controller| {
controller.find_node(&searched_key2) controller.find_node(&searched_key2)
}); });
let with_deadline = Deadline::new(current_attempt, Instant::now() + request_timeout) let with_deadline = Timeout::new(current_attempt, request_timeout)
.map_err(|err| { .map_err(|err| {
if let Some(err) = err.into_inner() { if let Some(err) = err.into_inner() {
err err

View File

@ -8,4 +8,4 @@ license = "MIT"
futures = "0.1" futures = "0.1"
libp2p-core = { path = "../core" } libp2p-core = { path = "../core" }
log = "0.4.1" log = "0.4.1"
tokio-timer = "0.2.0" tokio-timer = "0.2.6"

View File

@ -33,8 +33,9 @@ extern crate tokio_timer;
use futures::{Async, Future, Poll, Stream}; use futures::{Async, Future, Poll, Stream};
use libp2p_core::{Multiaddr, MuxedTransport, Transport}; use libp2p_core::{Multiaddr, MuxedTransport, Transport};
use std::io::{Error as IoError, ErrorKind as IoErrorKind}; use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::time::{Duration, Instant}; use std::time::Duration;
use tokio_timer::{Deadline, DeadlineError}; use tokio_timer::Timeout;
use tokio_timer::timeout::Error as TimeoutError;
/// Wraps around a `Transport` and adds a timeout to all the incoming and outgoing connections. /// Wraps around a `Transport` and adds a timeout to all the incoming and outgoing connections.
/// ///
@ -86,8 +87,8 @@ where
type Output = InnerTrans::Output; type Output = InnerTrans::Output;
type MultiaddrFuture = InnerTrans::MultiaddrFuture; type MultiaddrFuture = InnerTrans::MultiaddrFuture;
type Listener = TimeoutListener<InnerTrans::Listener>; type Listener = TimeoutListener<InnerTrans::Listener>;
type ListenerUpgrade = TokioTimerMapErr<Deadline<InnerTrans::ListenerUpgrade>>; type ListenerUpgrade = TokioTimerMapErr<Timeout<InnerTrans::ListenerUpgrade>>;
type Dial = TokioTimerMapErr<Deadline<InnerTrans::Dial>>; type Dial = TokioTimerMapErr<Timeout<InnerTrans::Dial>>;
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
match self.inner.listen_on(addr) { match self.inner.listen_on(addr) {
@ -114,7 +115,7 @@ where
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> { fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {
match self.inner.dial(addr) { match self.inner.dial(addr) {
Ok(dial) => Ok(TokioTimerMapErr { Ok(dial) => Ok(TokioTimerMapErr {
inner: Deadline::new(dial, Instant::now() + self.outgoing_timeout), inner: Timeout::new(dial, self.outgoing_timeout),
}), }),
Err((inner, addr)) => { Err((inner, addr)) => {
let transport = TransportTimeout { let transport = TransportTimeout {
@ -139,7 +140,7 @@ where
InnerTrans: MuxedTransport, InnerTrans: MuxedTransport,
{ {
type Incoming = TimeoutIncoming<InnerTrans::Incoming>; type Incoming = TimeoutIncoming<InnerTrans::Incoming>;
type IncomingUpgrade = TokioTimerMapErr<Deadline<InnerTrans::IncomingUpgrade>>; type IncomingUpgrade = TokioTimerMapErr<Timeout<InnerTrans::IncomingUpgrade>>;
#[inline] #[inline]
fn next_incoming(self) -> Self::Incoming { fn next_incoming(self) -> Self::Incoming {
@ -161,14 +162,14 @@ impl<InnerStream> Stream for TimeoutListener<InnerStream>
where where
InnerStream: Stream, InnerStream: Stream,
{ {
type Item = TokioTimerMapErr<Deadline<InnerStream::Item>>; type Item = TokioTimerMapErr<Timeout<InnerStream::Item>>;
type Error = InnerStream::Error; type Error = InnerStream::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
let inner_fut = try_ready!(self.inner.poll()); let inner_fut = try_ready!(self.inner.poll());
if let Some(inner_fut) = inner_fut { if let Some(inner_fut) = inner_fut {
let fut = TokioTimerMapErr { let fut = TokioTimerMapErr {
inner: Deadline::new(inner_fut, Instant::now() + self.timeout), inner: Timeout::new(inner_fut, self.timeout),
}; };
Ok(Async::Ready(Some(fut))) Ok(Async::Ready(Some(fut)))
} else { } else {
@ -188,19 +189,19 @@ impl<InnerFut> Future for TimeoutIncoming<InnerFut>
where where
InnerFut: Future, InnerFut: Future,
{ {
type Item = TokioTimerMapErr<Deadline<InnerFut::Item>>; type Item = TokioTimerMapErr<Timeout<InnerFut::Item>>;
type Error = InnerFut::Error; type Error = InnerFut::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let inner_fut = try_ready!(self.inner.poll()); let inner_fut = try_ready!(self.inner.poll());
let fut = TokioTimerMapErr { let fut = TokioTimerMapErr {
inner: Deadline::new(inner_fut, Instant::now() + self.timeout), inner: Timeout::new(inner_fut, self.timeout),
}; };
Ok(Async::Ready(fut)) Ok(Async::Ready(fut))
} }
} }
/// Wraps around a `Future`. Turns the error type from `DeadlineError<IoError>` to `IoError`. /// Wraps around a `Future`. Turns the error type from `TimeoutError<IoError>` to `IoError`.
// TODO: can be replaced with `impl Future` once `impl Trait` are fully stable in Rust // TODO: can be replaced with `impl Future` once `impl Trait` are fully stable in Rust
// (https://github.com/rust-lang/rust/issues/34511) // (https://github.com/rust-lang/rust/issues/34511)
pub struct TokioTimerMapErr<InnerFut> { pub struct TokioTimerMapErr<InnerFut> {
@ -209,13 +210,13 @@ pub struct TokioTimerMapErr<InnerFut> {
impl<InnerFut> Future for TokioTimerMapErr<InnerFut> impl<InnerFut> Future for TokioTimerMapErr<InnerFut>
where where
InnerFut: Future<Error = DeadlineError<IoError>>, InnerFut: Future<Error = TimeoutError<IoError>>,
{ {
type Item = InnerFut::Item; type Item = InnerFut::Item;
type Error = IoError; type Error = IoError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.inner.poll().map_err(|err: DeadlineError<IoError>| { self.inner.poll().map_err(|err: TimeoutError<IoError>| {
if err.is_inner() { if err.is_inner() {
err.into_inner().expect("ensured by is_inner()") err.into_inner().expect("ensured by is_inner()")
} else if err.is_elapsed() { } else if err.is_elapsed() {