Upgrade libp2p-kad to stable futures (#1254)

* Upgrade libp2p-kad to stable futures

* Fix comment
Author: Pierre Krieger
Date: 2019-09-26 10:11:16 +02:00 (committed by GitHub)
Parent: d7e9ba473b
Commit: 7f5868472d
5 changed files with 177 additions and 140 deletions

protocols/kad/Cargo.toml

@@ -14,7 +14,8 @@ arrayvec = "0.4.7"
 bytes = "0.4"
 either = "1.5"
 fnv = "1.0"
-futures = "0.1"
+futures_codec = "0.2"
+futures-preview = "0.3.0-alpha.18"
 log = "0.4"
 libp2p-core = { version = "0.12.0", path = "../../core" }
 libp2p-swarm = { version = "0.2.0", path = "../../swarm" }
@@ -24,9 +25,7 @@ protobuf = "2.3"
 rand = "0.6.0"
 sha2 = "0.8.0"
 smallvec = "0.6"
-tokio-codec = "0.1"
-tokio-io = "0.1"
-wasm-timer = "0.1"
+wasm-timer = "0.2"
 uint = "0.8"
 unsigned-varint = { git = "https://github.com/tomaka/unsigned-varint", branch = "futures-codec", features = ["codec"] }
 void = "1.0"
@@ -37,4 +36,3 @@ libp2p-tcp = { version = "0.12.0", path = "../../transports/tcp" }
 libp2p-yamux = { version = "0.12.0", path = "../../muxers/yamux" }
 quickcheck = "0.8"
 rand = "0.6.0"
-tokio = "0.1"

protocols/kad/src/behaviour.rs

@@ -39,7 +39,7 @@ use smallvec::SmallVec;
 use std::{borrow::Cow, error, iter, marker::PhantomData, time::Duration};
 use std::collections::VecDeque;
 use std::num::NonZeroUsize;
-use tokio_io::{AsyncRead, AsyncWrite};
+use std::task::{Context, Poll};
 use wasm_timer::Instant;

 /// Network behaviour that handles Kademlia.
@@ -1010,7 +1010,7 @@ where
 impl<TSubstream, TStore> NetworkBehaviour for Kademlia<TSubstream, TStore>
 where
-    TSubstream: AsyncRead + AsyncWrite,
+    TSubstream: AsyncRead + AsyncWrite + Unpin,
     for<'a> TStore: RecordStore<'a>,
 {
     type ProtocolsHandler = KademliaHandler<TSubstream, QueryId>;
@@ -1304,7 +1304,7 @@ where
         };
     }

-    fn poll(&mut self, parameters: &mut impl PollParameters) -> Async<
+    fn poll(&mut self, cx: &mut Context, parameters: &mut impl PollParameters) -> Poll<
         NetworkBehaviourAction<
             <Self::ProtocolsHandler as ProtocolsHandler>::InEvent,
             Self::OutEvent,
@@ -1319,7 +1319,7 @@ where
         if let Some(mut job) = self.add_provider_job.take() {
             let num = usize::min(JOBS_MAX_NEW_QUERIES, jobs_query_capacity);
             for _ in 0 .. num {
-                if let Async::Ready(r) = job.poll(&mut self.store, now) {
+                if let Poll::Ready(r) = job.poll(cx, &mut self.store, now) {
                     self.start_add_provider(r.key, AddProviderContext::Republish)
                 } else {
                     break
@@ -1333,7 +1333,7 @@ where
         if let Some(mut job) = self.put_record_job.take() {
             let num = usize::min(JOBS_MAX_NEW_QUERIES, jobs_query_capacity);
             for _ in 0 .. num {
-                if let Async::Ready(r) = job.poll(&mut self.store, now) {
+                if let Poll::Ready(r) = job.poll(cx, &mut self.store, now) {
                     let context = if r.publisher.as_ref() == Some(self.kbuckets.local_key().preimage()) {
                         PutRecordContext::Republish
                     } else {
@@ -1350,7 +1350,7 @@ where
         loop {
             // Drain queued events first.
             if let Some(event) = self.queued_events.pop_front() {
-                return Async::Ready(event);
+                return Poll::Ready(event);
             }

             // Drain applied pending entries from the routing table.
@@ -1361,7 +1361,7 @@ where
                     addresses: value,
                     old_peer: entry.evicted.map(|n| n.key.into_preimage())
                 };
-                return Async::Ready(NetworkBehaviourAction::GenerateEvent(event))
+                return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event))
             }

             // Look for a finished query.
@@ -1369,12 +1369,12 @@ where
             match self.queries.poll(now) {
                 QueryPoolState::Finished(q) => {
                     if let Some(event) = self.query_finished(q, parameters) {
-                        return Async::Ready(NetworkBehaviourAction::GenerateEvent(event))
+                        return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event))
                     }
                 }
                 QueryPoolState::Timeout(q) => {
                     if let Some(event) = self.query_timeout(q) {
-                        return Async::Ready(NetworkBehaviourAction::GenerateEvent(event))
+                        return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event))
                     }
                 }
                 QueryPoolState::Waiting(Some((query, peer_id))) => {
@@ -1406,7 +1406,7 @@ where
             // If no new events have been queued either, signal `NotReady` to
             // be polled again later.
             if self.queued_events.is_empty() {
-                return Async::NotReady
+                return Poll::Pending
             }
         }
     }
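
The shape of the new `poll` above — take a `&mut Context`, drain queued events, fall back to `Poll::Pending` — is the standard futures-0.3 pattern. A self-contained sketch (the `Event` type and queue are stand-ins for illustration, not the commit's code):

```rust
use std::collections::VecDeque;
use std::task::{Context, Poll};

// Stand-in event type for illustration only.
struct Event;

// Drain queued events first; otherwise report Pending. A real behaviour
// must also arrange for cx.waker() to be woken when new events arrive;
// Kademlia gets this because its timers and substreams register the
// waker whenever they themselves return Pending.
fn poll_events(queue: &mut VecDeque<Event>, _cx: &mut Context<'_>) -> Poll<Event> {
    if let Some(event) = queue.pop_front() {
        return Poll::Ready(event);
    }
    Poll::Pending
}
```
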

protocols/kad/src/handler.rs

@@ -36,8 +36,7 @@ use libp2p_core::{
     upgrade::{self, InboundUpgrade, OutboundUpgrade, Negotiated}
 };
 use log::trace;
-use std::{borrow::Cow, error, fmt, io, time::Duration};
-use tokio_io::{AsyncRead, AsyncWrite};
+use std::{borrow::Cow, error, fmt, io, pin::Pin, task::Context, task::Poll, time::Duration};
 use wasm_timer::Instant;

 /// Protocol handler that handles Kademlia communications with the remote.
@@ -48,7 +47,7 @@ use wasm_timer::Instant;
 /// It also handles requests made by the remote.
 pub struct KademliaHandler<TSubstream, TUserData>
 where
-    TSubstream: AsyncRead + AsyncWrite,
+    TSubstream: AsyncRead + AsyncWrite + Unpin,
 {
     /// Configuration for the Kademlia protocol.
     config: KademliaProtocolConfig,
@@ -69,7 +68,7 @@ where
 /// State of an active substream, opened either by us or by the remote.
 enum SubstreamState<TSubstream, TUserData>
 where
-    TSubstream: AsyncRead + AsyncWrite,
+    TSubstream: AsyncRead + AsyncWrite + Unpin,
 {
     /// We haven't started opening the outgoing substream yet.
     /// Contains the request we want to send, and the user data if we expect an answer.
@@ -103,29 +102,29 @@ where
 impl<TSubstream, TUserData> SubstreamState<TSubstream, TUserData>
 where
-    TSubstream: AsyncRead + AsyncWrite,
+    TSubstream: AsyncRead + AsyncWrite + Unpin,
 {
-    /// Consumes this state and tries to close the substream.
+    /// Tries to close the substream.
     ///
     /// If the substream is not ready to be closed, returns it back.
-    fn try_close(self) -> AsyncSink<Self> {
+    fn try_close(&mut self, cx: &mut Context) -> Poll<()> {
         match self {
             SubstreamState::OutPendingOpen(_, _)
-            | SubstreamState::OutReportError(_, _) => AsyncSink::Ready,
-            SubstreamState::OutPendingSend(mut stream, _, _)
-            | SubstreamState::OutPendingFlush(mut stream, _)
-            | SubstreamState::OutWaitingAnswer(mut stream, _)
-            | SubstreamState::OutClosing(mut stream) => match stream.close() {
-                Ok(Async::Ready(())) | Err(_) => AsyncSink::Ready,
-                Ok(Async::NotReady) => AsyncSink::NotReady(SubstreamState::OutClosing(stream)),
+            | SubstreamState::OutReportError(_, _) => Poll::Ready(()),
+            SubstreamState::OutPendingSend(ref mut stream, _, _)
+            | SubstreamState::OutPendingFlush(ref mut stream, _)
+            | SubstreamState::OutWaitingAnswer(ref mut stream, _)
+            | SubstreamState::OutClosing(ref mut stream) => match Sink::poll_close(Pin::new(stream), cx) {
+                Poll::Ready(_) => Poll::Ready(()),
+                Poll::Pending => Poll::Pending,
             },
-            SubstreamState::InWaitingMessage(_, mut stream)
-            | SubstreamState::InWaitingUser(_, mut stream)
-            | SubstreamState::InPendingSend(_, mut stream, _)
-            | SubstreamState::InPendingFlush(_, mut stream)
-            | SubstreamState::InClosing(mut stream) => match stream.close() {
-                Ok(Async::Ready(())) | Err(_) => AsyncSink::Ready,
-                Ok(Async::NotReady) => AsyncSink::NotReady(SubstreamState::InClosing(stream)),
+            SubstreamState::InWaitingMessage(_, ref mut stream)
+            | SubstreamState::InWaitingUser(_, ref mut stream)
+            | SubstreamState::InPendingSend(_, ref mut stream, _)
+            | SubstreamState::InPendingFlush(_, ref mut stream)
+            | SubstreamState::InClosing(ref mut stream) => match Sink::poll_close(Pin::new(stream), cx) {
+                Poll::Ready(_) => Poll::Ready(()),
+                Poll::Pending => Poll::Pending,
             },
         }
     }
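
In futures 0.3 a substream is closed by repeatedly calling `Sink::poll_close` on a pinned mutable reference instead of 0.1's `close()`; because the state is now borrowed (`ref mut`) rather than consumed, nothing needs to be "returned back". A generic sketch of the idiom, assuming any `Sink + Unpin` (not the commit's code):

```rust
use futures::prelude::*;
use std::pin::Pin;
use std::task::{Context, Poll};

// Close a sink, folding errors into "closed", as try_close does above.
fn poll_close_ignoring_errors<S, T>(sink: &mut S, cx: &mut Context<'_>) -> Poll<()>
where
    S: Sink<T> + Unpin,
{
    match Sink::poll_close(Pin::new(sink), cx) {
        Poll::Ready(_) => Poll::Ready(()), // Ok(()) or Err(_): treat as closed
        Poll::Pending => Poll::Pending,
    }
}
```
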
@@ -382,7 +381,7 @@ struct UniqueConnecId(u64);
 impl<TSubstream, TUserData> KademliaHandler<TSubstream, TUserData>
 where
-    TSubstream: AsyncRead + AsyncWrite,
+    TSubstream: AsyncRead + AsyncWrite + Unpin,
 {
     /// Create a `KademliaHandler` that only allows sending messages to the remote but denying
     /// incoming connections.
@@ -418,7 +417,7 @@ where
 impl<TSubstream, TUserData> Default for KademliaHandler<TSubstream, TUserData>
 where
-    TSubstream: AsyncRead + AsyncWrite,
+    TSubstream: AsyncRead + AsyncWrite + Unpin,
 {
     #[inline]
     fn default() -> Self {
@@ -428,7 +427,7 @@ where
 impl<TSubstream, TUserData> ProtocolsHandler for KademliaHandler<TSubstream, TUserData>
 where
-    TSubstream: AsyncRead + AsyncWrite,
+    TSubstream: AsyncRead + AsyncWrite + Unpin,
     TUserData: Clone,
 {
     type InEvent = KademliaHandlerIn<TUserData>;
@@ -485,7 +484,10 @@ where
                     _ => false,
                 });
                 if let Some(pos) = pos {
-                    let _ = self.substreams.remove(pos).try_close();
+                    // TODO: we don't properly close down the substream
+                    let waker = futures::task::noop_waker();
+                    let mut cx = Context::from_waker(&waker);
+                    let _ = self.substreams.remove(pos).try_close(&mut cx);
                 }
             }
             KademliaHandlerIn::FindNodeReq { key, user_data } => {
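
Calling a poll-style function outside of a task requires manufacturing a `Context`, which is what the `noop_waker` above does: the substream gets one best-effort close and nothing is ever woken afterwards (hence the TODO). A standalone sketch of the same trick:

```rust
use futures::task::noop_waker;
use std::future::Future;
use std::task::{Context, Poll};

fn main() {
    // A Context whose waker does nothing: good enough for a single
    // best-effort poll, but anything that returns Pending here will
    // never be woken to make further progress.
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);

    let mut fut = Box::pin(async { 21 * 2 });
    assert_eq!(fut.as_mut().poll(&mut cx), Poll::Ready(42));
}
```
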
@@ -639,22 +641,22 @@ where
     fn poll(
         &mut self,
+        cx: &mut Context,
     ) -> Poll<
         ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent, Self::Error>,
-        io::Error,
     > {
         // We remove each element from `substreams` one by one and add them back.
         for n in (0..self.substreams.len()).rev() {
             let mut substream = self.substreams.swap_remove(n);
             loop {
-                match advance_substream(substream, self.config.clone()) {
+                match advance_substream(substream, self.config.clone(), cx) {
                     (Some(new_state), Some(event), _) => {
                         self.substreams.push(new_state);
-                        return Ok(Async::Ready(event));
+                        return Poll::Ready(event);
                     }
                     (None, Some(event), _) => {
-                        return Ok(Async::Ready(event));
+                        return Poll::Ready(event);
                     }
                     (Some(new_state), None, false) => {
                         self.substreams.push(new_state);
@@ -677,7 +679,7 @@ where
             self.keep_alive = KeepAlive::Yes;
         }

-        Ok(Async::NotReady)
+        Poll::Pending
     }
 }
@@ -688,6 +690,7 @@ where
 fn advance_substream<TSubstream, TUserData>(
     state: SubstreamState<TSubstream, TUserData>,
     upgrade: KademliaProtocolConfig,
+    cx: &mut Context,
 ) -> (
     Option<SubstreamState<TSubstream, TUserData>>,
     Option<
@@ -695,12 +698,13 @@ fn advance_substream<TSubstream, TUserData>(
             KademliaProtocolConfig,
             (KadRequestMsg, Option<TUserData>),
             KademliaHandlerEvent<TUserData>,
-            io::Error,
         >,
     >,
     bool,
 )
 where
-    TSubstream: AsyncRead + AsyncWrite,
+    TSubstream: AsyncRead + AsyncWrite + Unpin,
 {
     match state {
         SubstreamState::OutPendingOpen(msg, user_data) => {
@@ -711,17 +715,14 @@ where
             (None, Some(ev), false)
         }
         SubstreamState::OutPendingSend(mut substream, msg, user_data) => {
-            match substream.start_send(msg) {
-                Ok(AsyncSink::Ready) => (
+            match Sink::poll_ready(Pin::new(&mut substream), cx) {
+                Poll::Ready(Ok(())) => {
+                    match Sink::start_send(Pin::new(&mut substream), msg) {
+                        Ok(()) => (
                     Some(SubstreamState::OutPendingFlush(substream, user_data)),
                     None,
                     true,
                 ),
-                Ok(AsyncSink::NotReady(msg)) => (
-                    Some(SubstreamState::OutPendingSend(substream, msg, user_data)),
-                    None,
-                    false,
-                ),
                 Err(error) => {
                     let event = if let Some(user_data) = user_data {
                         Some(ProtocolsHandlerEvent::Custom(KademliaHandlerEvent::QueryError {
@@ -732,13 +733,32 @@ where
                         None
                     };
+                            (None, event, false)
+                        }
+                    }
+                },
+                Poll::Pending => (
+                    Some(SubstreamState::OutPendingSend(substream, msg, user_data)),
+                    None,
+                    false,
+                ),
+                Poll::Ready(Err(error)) => {
+                    let event = if let Some(user_data) = user_data {
+                        Some(ProtocolsHandlerEvent::Custom(KademliaHandlerEvent::QueryError {
+                            error: KademliaHandlerQueryErr::Io(error),
+                            user_data
+                        }))
+                    } else {
+                        None
+                    };
                     (None, event, false)
                 }
             }
         }
         SubstreamState::OutPendingFlush(mut substream, user_data) => {
-            match substream.poll_complete() {
-                Ok(Async::Ready(())) => {
+            match Sink::poll_flush(Pin::new(&mut substream), cx) {
+                Poll::Ready(Ok(())) => {
                     if let Some(user_data) = user_data {
                         (
                             Some(SubstreamState::OutWaitingAnswer(substream, user_data)),
@@ -749,12 +769,12 @@ where
                         (Some(SubstreamState::OutClosing(substream)), None, true)
                     }
                 }
-                Ok(Async::NotReady) => (
+                Poll::Pending => (
                     Some(SubstreamState::OutPendingFlush(substream, user_data)),
                     None,
                     false,
                 ),
-                Err(error) => {
+                Poll::Ready(Err(error)) => {
                     let event = if let Some(user_data) = user_data {
                         Some(ProtocolsHandlerEvent::Custom(KademliaHandlerEvent::QueryError {
                             error: KademliaHandlerQueryErr::Io(error),
@@ -768,8 +788,8 @@ where
             }
         }
-        SubstreamState::OutWaitingAnswer(mut substream, user_data) => match substream.poll() {
-            Ok(Async::Ready(Some(msg))) => {
+        SubstreamState::OutWaitingAnswer(mut substream, user_data) => match Stream::poll_next(Pin::new(&mut substream), cx) {
+            Poll::Ready(Some(Ok(msg))) => {
                 let new_state = SubstreamState::OutClosing(substream);
                 let event = process_kad_response(msg, user_data);
                 (
@@ -778,19 +798,19 @@ where
                     true,
                 )
             }
-            Ok(Async::NotReady) => (
+            Poll::Pending => (
                 Some(SubstreamState::OutWaitingAnswer(substream, user_data)),
                 None,
                 false,
             ),
-            Err(error) => {
+            Poll::Ready(Some(Err(error))) => {
                 let event = KademliaHandlerEvent::QueryError {
                     error: KademliaHandlerQueryErr::Io(error),
                     user_data,
                 };
                 (None, Some(ProtocolsHandlerEvent::Custom(event)), false)
             }
-            Ok(Async::Ready(None)) => {
+            Poll::Ready(None) => {
                 let event = KademliaHandlerEvent::QueryError {
                     error: KademliaHandlerQueryErr::Io(io::ErrorKind::UnexpectedEof.into()),
                     user_data,
@@ -802,13 +822,13 @@ where
             let event = KademliaHandlerEvent::QueryError { error, user_data };
             (None, Some(ProtocolsHandlerEvent::Custom(event)), false)
         }
-        SubstreamState::OutClosing(mut stream) => match stream.close() {
-            Ok(Async::Ready(())) => (None, None, false),
-            Ok(Async::NotReady) => (Some(SubstreamState::OutClosing(stream)), None, false),
-            Err(_) => (None, None, false),
+        SubstreamState::OutClosing(mut stream) => match Sink::poll_close(Pin::new(&mut stream), cx) {
+            Poll::Ready(Ok(())) => (None, None, false),
+            Poll::Pending => (Some(SubstreamState::OutClosing(stream)), None, false),
+            Poll::Ready(Err(_)) => (None, None, false),
         },
-        SubstreamState::InWaitingMessage(id, mut substream) => match substream.poll() {
-            Ok(Async::Ready(Some(msg))) => {
+        SubstreamState::InWaitingMessage(id, mut substream) => match Stream::poll_next(Pin::new(&mut substream), cx) {
+            Poll::Ready(Some(Ok(msg))) => {
                 if let Ok(ev) = process_kad_request(msg, id) {
                     (
                         Some(SubstreamState::InWaitingUser(id, substream)),
@@ -819,16 +839,16 @@ where
                 (Some(SubstreamState::InClosing(substream)), None, true)
                 }
             }
-            Ok(Async::NotReady) => (
+            Poll::Pending => (
                 Some(SubstreamState::InWaitingMessage(id, substream)),
                 None,
                 false,
             ),
-            Ok(Async::Ready(None)) => {
+            Poll::Ready(None) => {
                 trace!("Inbound substream: EOF");
                 (None, None, false)
             }
-            Err(e) => {
+            Poll::Ready(Some(Err(e))) => {
                 trace!("Inbound substream error: {:?}", e);
                 (None, None, false)
             },
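
Reading a message changes shape as well: 0.1's `stream.poll()` returning `Result<Async<Option<T>>, E>` becomes `Stream::poll_next` returning `Poll<Option<Result<T, E>>>`, so EOF (`Ready(None)`) and errors (`Ready(Some(Err(_)))`) swap nesting. A generic sketch, assuming any `TryStream` of messages (not the commit's code):

```rust
use futures::prelude::*;
use std::task::{Context, Poll};

// What the OutWaitingAnswer/InWaitingMessage arms above decide between:
enum Read<M, E> {
    Msg(M),    // Poll::Ready(Some(Ok(msg)))
    Eof,       // Poll::Ready(None)          — 0.1 spelled this Ok(Async::Ready(None))
    Failed(E), // Poll::Ready(Some(Err(e)))  — 0.1 spelled this Err(e)
}

fn poll_one<S>(stream: &mut S, cx: &mut Context<'_>) -> Poll<Read<S::Ok, S::Error>>
where
    S: TryStream + Unpin,
{
    match stream.try_poll_next_unpin(cx) {
        Poll::Ready(Some(Ok(msg))) => Poll::Ready(Read::Msg(msg)),
        Poll::Ready(Some(Err(e))) => Poll::Ready(Read::Failed(e)),
        Poll::Ready(None) => Poll::Ready(Read::Eof),
        Poll::Pending => Poll::Pending,
    }
}
```
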
@@ -838,36 +858,39 @@ where
             None,
             false,
         ),
-        SubstreamState::InPendingSend(id, mut substream, msg) => match substream.start_send(msg) {
-            Ok(AsyncSink::Ready) => (
+        SubstreamState::InPendingSend(id, mut substream, msg) => match Sink::poll_ready(Pin::new(&mut substream), cx) {
+            Poll::Ready(Ok(())) => match Sink::start_send(Pin::new(&mut substream), msg) {
+                Ok(()) => (
                 Some(SubstreamState::InPendingFlush(id, substream)),
                 None,
                 true,
            ),
-            Ok(AsyncSink::NotReady(msg)) => (
+                Err(_) => (None, None, false),
+            },
+            Poll::Pending => (
                 Some(SubstreamState::InPendingSend(id, substream, msg)),
                 None,
                 false,
             ),
-            Err(_) => (None, None, false),
-        },
-        SubstreamState::InPendingFlush(id, mut substream) => match substream.poll_complete() {
-            Ok(Async::Ready(())) => (
+            Poll::Ready(Err(_)) => (None, None, false),
+        }
+        SubstreamState::InPendingFlush(id, mut substream) => match Sink::poll_flush(Pin::new(&mut substream), cx) {
+            Poll::Ready(Ok(())) => (
                 Some(SubstreamState::InWaitingMessage(id, substream)),
                 None,
                 true,
             ),
-            Ok(Async::NotReady) => (
+            Poll::Pending => (
                 Some(SubstreamState::InPendingFlush(id, substream)),
                 None,
                 false,
             ),
-            Err(_) => (None, None, false),
+            Poll::Ready(Err(_)) => (None, None, false),
         },
-        SubstreamState::InClosing(mut stream) => match stream.close() {
-            Ok(Async::Ready(())) => (None, None, false),
-            Ok(Async::NotReady) => (Some(SubstreamState::InClosing(stream)), None, false),
-            Err(_) => (None, None, false),
+        SubstreamState::InClosing(mut stream) => match Sink::poll_close(Pin::new(&mut stream), cx) {
+            Poll::Ready(Ok(())) => (None, None, false),
+            Poll::Pending => (Some(SubstreamState::InClosing(stream)), None, false),
+            Poll::Ready(Err(_)) => (None, None, false),
         },
     }
 }
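
The largest mechanical change in `advance_substream` is the send path: futures 0.1's single `start_send(msg)` (which could hand the message back via `AsyncSink::NotReady(msg)`) splits into `poll_ready` followed by `start_send`, with `poll_flush` replacing `poll_complete`. A generic sketch of that protocol, where returning the message on `Pending` mirrors the handler re-storing `OutPendingSend`/`InPendingSend` (illustration only):

```rust
use futures::prelude::*;
use std::pin::Pin;
use std::task::{Context, Poll};

// Try to enqueue one message. Ok(None): accepted, flush next.
// Ok(Some(msg)): sink not ready, caller keeps the message and retries.
fn try_start_send<S, T>(sink: &mut S, msg: T, cx: &mut Context<'_>) -> Result<Option<T>, S::Error>
where
    S: Sink<T> + Unpin,
{
    match Sink::poll_ready(Pin::new(sink), cx) {
        Poll::Ready(Ok(())) => {
            Sink::start_send(Pin::new(sink), msg)?;
            Ok(None)
        }
        Poll::Ready(Err(e)) => Err(e),
        Poll::Pending => Ok(Some(msg)),
    }
}
```
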

protocols/kad/src/jobs.rs

@@ -65,6 +65,8 @@ use crate::record::{self, Record, ProviderRecord, store::RecordStore};
 use libp2p_core::PeerId;
 use futures::prelude::*;
 use std::collections::HashSet;
+use std::pin::Pin;
+use std::task::{Context, Poll};
 use std::time::Duration;
 use std::vec;
 use wasm_timer::{Instant, Delay};
@@ -96,16 +98,18 @@ impl<T> PeriodicJob<T> {
     /// Cuts short the remaining delay, if the job is currently waiting
     /// for the delay to expire.
     fn asap(&mut self) {
-        if let PeriodicJobState::Waiting(delay) = &mut self.state {
-            delay.reset(Instant::now() - Duration::from_secs(1))
+        if let PeriodicJobState::Waiting(delay, deadline) = &mut self.state {
+            let new_deadline = Instant::now() - Duration::from_secs(1);
+            *deadline = new_deadline;
+            delay.reset_at(new_deadline);
         }
     }

     /// Returns `true` if the job is currently not running but ready
     /// to be run, `false` otherwise.
-    fn is_ready(&mut self, now: Instant) -> bool {
-        if let PeriodicJobState::Waiting(delay) = &mut self.state {
-            if now >= delay.deadline() || delay.poll().map(|a| a.is_ready()).unwrap_or(false) {
+    fn is_ready(&mut self, cx: &mut Context, now: Instant) -> bool {
+        if let PeriodicJobState::Waiting(delay, deadline) = &mut self.state {
+            if now >= *deadline || !Future::poll(Pin::new(delay), cx).is_pending() {
                 return true
             }
         }
@@ -117,7 +121,7 @@ impl<T> PeriodicJob<T> {
 #[derive(Debug)]
 enum PeriodicJobState<T> {
     Running(T),
-    Waiting(Delay)
+    Waiting(Delay, Instant)
 }

 //////////////////////////////////////////////////////////////////////////////
@@ -143,7 +147,8 @@ impl PutRecordJob {
         record_ttl: Option<Duration>,
     ) -> Self {
         let now = Instant::now();
-        let delay = Delay::new(now + replicate_interval);
+        let deadline = now + replicate_interval;
+        let delay = Delay::new_at(deadline);
         let next_publish = publish_interval.map(|i| now + i);
         Self {
             local_id,
@@ -153,7 +158,7 @@ impl PutRecordJob {
             skipped: HashSet::new(),
             inner: PeriodicJob {
                 interval: replicate_interval,
-                state: PeriodicJobState::Waiting(delay)
+                state: PeriodicJobState::Waiting(delay, deadline)
             }
         }
     }
@@ -185,11 +190,11 @@ impl PutRecordJob {
     /// Must be called in the context of a task. When `NotReady` is returned,
     /// the current task is registered to be notified when the job is ready
     /// to be run.
-    pub fn poll<T>(&mut self, store: &mut T, now: Instant) -> Async<Record>
+    pub fn poll<T>(&mut self, cx: &mut Context, store: &mut T, now: Instant) -> Poll<Record>
     where
         for<'a> T: RecordStore<'a>
     {
-        if self.inner.is_ready(now) {
+        if self.inner.is_ready(cx, now) {
             let publish = self.next_publish.map_or(false, |t_pub| now >= t_pub);
             let records = store.records()
                 .filter_map(|r| {
@@ -224,7 +229,7 @@ impl PutRecordJob {
                 if r.is_expired(now) {
                     store.remove(&r.key)
                 } else {
-                    return Async::Ready(r)
+                    return Poll::Ready(r)
                 }
             } else {
                 break
@@ -232,12 +237,13 @@ impl PutRecordJob {
             }

             // Wait for the next run.
-            let delay = Delay::new(now + self.inner.interval);
-            self.inner.state = PeriodicJobState::Waiting(delay);
-            assert!(!self.inner.is_ready(now));
+            let deadline = now + self.inner.interval;
+            let delay = Delay::new_at(deadline);
+            self.inner.state = PeriodicJobState::Waiting(delay, deadline);
+            assert!(!self.inner.is_ready(cx, now));
         }

-        Async::NotReady
+        Poll::Pending
     }
 }
@@ -256,7 +262,10 @@ impl AddProviderJob {
         Self {
             inner: PeriodicJob {
                 interval,
-                state: PeriodicJobState::Waiting(Delay::new(now + interval))
+                state: {
+                    let deadline = now + interval;
+                    PeriodicJobState::Waiting(Delay::new_at(deadline), deadline)
+                }
             }
         }
     }
@@ -279,11 +288,11 @@ impl AddProviderJob {
     /// Must be called in the context of a task. When `NotReady` is returned,
     /// the current task is registered to be notified when the job is ready
     /// to be run.
-    pub fn poll<T>(&mut self, store: &mut T, now: Instant) -> Async<ProviderRecord>
+    pub fn poll<T>(&mut self, cx: &mut Context, store: &mut T, now: Instant) -> Poll<ProviderRecord>
     where
         for<'a> T: RecordStore<'a>
     {
-        if self.inner.is_ready(now) {
+        if self.inner.is_ready(cx, now) {
             let records = store.provided()
                 .map(|r| r.into_owned())
                 .collect::<Vec<_>>()
@@ -297,19 +306,20 @@ impl AddProviderJob {
                 if r.is_expired(now) {
                     store.remove_provider(&r.key, &r.provider)
                 } else {
-                    return Async::Ready(r)
+                    return Poll::Ready(r)
                 }
             } else {
                 break
             }
         }

-            let delay = Delay::new(now + self.inner.interval);
-            self.inner.state = PeriodicJobState::Waiting(delay);
-            assert!(!self.inner.is_ready(now));
+            let deadline = now + self.inner.interval;
+            let delay = Delay::new_at(deadline);
+            self.inner.state = PeriodicJobState::Waiting(delay, deadline);
+            assert!(!self.inner.is_ready(cx, now));
         }

-        Async::NotReady
+        Poll::Pending
     }
 }
@@ -360,11 +370,11 @@ mod tests {
         // All (non-expired) records in the store must be yielded by the job.
         for r in store.records().map(|r| r.into_owned()).collect::<Vec<_>>() {
             if !r.is_expired(now) {
-                assert_eq!(job.poll(&mut store, now), Async::Ready(r));
+                assert_eq!(job.poll(&mut store, now), Poll::Ready(r));
                 assert!(job.is_running());
             }
         }
-        assert_eq!(job.poll(&mut store, now), Async::NotReady);
+        assert_eq!(job.poll(&mut store, now), Poll::Pending);
         assert!(!job.is_running());
     }
@@ -390,11 +400,11 @@ mod tests {
         // All (non-expired) records in the store must be yielded by the job.
         for r in store.provided().map(|r| r.into_owned()).collect::<Vec<_>>() {
             if !r.is_expired(now) {
-                assert_eq!(job.poll(&mut store, now), Async::Ready(r));
+                assert_eq!(job.poll(&mut store, now), Poll::Ready(r));
                 assert!(job.is_running());
             }
         }
-        assert_eq!(job.poll(&mut store, now), Async::NotReady);
+        assert_eq!(job.poll(&mut store, now), Poll::Pending);
         assert!(!job.is_running());
     }

protocols/kad/src/protocol.rs

@@ -34,14 +34,13 @@ use bytes::BytesMut;
 use codec::UviBytes;
 use crate::protobuf_structs::dht as proto;
 use crate::record::{self, Record};
-use futures::{future::{self, FutureResult}, sink, stream, Sink, Stream};
+use futures::prelude::*;
+use futures_codec::Framed;
 use libp2p_core::{Multiaddr, PeerId};
 use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, Negotiated};
 use protobuf::{self, Message};
 use std::{borrow::Cow, convert::TryFrom, time::Duration};
 use std::{io, iter};
-use tokio_codec::Framed;
-use tokio_io::{AsyncRead, AsyncWrite};
 use unsigned_varint::codec;
 use wasm_timer::Instant;
@@ -176,10 +175,10 @@ impl UpgradeInfo for KademliaProtocolConfig {
 impl<C> InboundUpgrade<C> for KademliaProtocolConfig
 where
-    C: AsyncRead + AsyncWrite,
+    C: AsyncRead + AsyncWrite + Unpin,
 {
     type Output = KadInStreamSink<Negotiated<C>>;
-    type Future = FutureResult<Self::Output, io::Error>;
+    type Future = future::Ready<Result<Self::Output, io::Error>>;
     type Error = io::Error;

     #[inline]
@@ -189,14 +188,17 @@ where
         future::ok(
             Framed::new(incoming, codec)
-                .from_err()
-                .with::<_, fn(_) -> _, _>(|response| {
+                .err_into()
+                .with::<_, _, fn(_) -> _, _>(|response| {
                     let proto_struct = resp_msg_to_proto(response);
-                    proto_struct.write_to_bytes().map_err(invalid_data)
+                    future::ready(proto_struct.write_to_bytes().map_err(invalid_data))
                 })
-                .and_then::<fn(_) -> _, _>(|bytes| {
-                    let request = protobuf::parse_from_bytes(&bytes)?;
-                    proto_to_req_msg(request)
+                .and_then::<_, fn(_) -> _>(|bytes| {
+                    let request = match protobuf::parse_from_bytes(&bytes) {
+                        Ok(r) => r,
+                        Err(err) => return future::ready(Err(err.into()))
+                    };
+                    future::ready(proto_to_req_msg(request))
                 }),
         )
     }
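
In futures 0.3 the `with` and `and_then` combinators take closures that return futures rather than `Result`s, so plain fallible conversions get wrapped in `future::ready(..)`; this is also what puts the extra type parameters and the `future::Ready<...>` members into the `KadStreamSink` alias below. A standalone sketch of the reshaping (names are illustrative, not the commit's):

```rust
use futures::future::{self, Ready};
use std::io;

// A fallible conversion as futures 0.1 combinators expected it...
fn encode_01(msg: &str) -> Result<Vec<u8>, io::Error> {
    Ok(msg.as_bytes().to_vec())
}

// ...and the same conversion shaped for 0.3's `SinkExt::with` /
// `TryStreamExt::and_then`, which expect a future back:
fn encode_03(msg: &str) -> Ready<Result<Vec<u8>, io::Error>> {
    future::ready(encode_01(msg))
}
```
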
@@ -204,10 +206,10 @@ where
 impl<C> OutboundUpgrade<C> for KademliaProtocolConfig
 where
-    C: AsyncRead + AsyncWrite,
+    C: AsyncRead + AsyncWrite + Unpin,
 {
     type Output = KadOutStreamSink<Negotiated<C>>;
-    type Future = FutureResult<Self::Output, io::Error>;
+    type Future = future::Ready<Result<Self::Output, io::Error>>;
     type Error = io::Error;

     #[inline]
@@ -217,14 +219,17 @@ where
         future::ok(
             Framed::new(incoming, codec)
-                .from_err()
-                .with::<_, fn(_) -> _, _>(|request| {
+                .err_into()
+                .with::<_, _, fn(_) -> _, _>(|request| {
                     let proto_struct = req_msg_to_proto(request);
-                    proto_struct.write_to_bytes().map_err(invalid_data)
+                    future::ready(proto_struct.write_to_bytes().map_err(invalid_data))
                 })
-                .and_then::<fn(_) -> _, _>(|bytes| {
-                    let response = protobuf::parse_from_bytes(&bytes)?;
-                    proto_to_resp_msg(response)
+                .and_then::<_, fn(_) -> _>(|bytes| {
+                    let response = match protobuf::parse_from_bytes(&bytes) {
+                        Ok(r) => r,
+                        Err(err) => return future::ready(Err(err.into()))
+                    };
+                    future::ready(proto_to_resp_msg(response))
                 }),
         )
     }
@@ -238,13 +243,14 @@ pub type KadOutStreamSink<S> = KadStreamSink<S, KadRequestMsg, KadResponseMsg>;
 pub type KadStreamSink<S, A, B> = stream::AndThen<
     sink::With<
-        stream::FromErr<Framed<S, UviBytes<Vec<u8>>>, io::Error>,
+        stream::ErrInto<Framed<S, UviBytes<Vec<u8>>>, io::Error>,
+        Vec<u8>,
         A,
-        fn(A) -> Result<Vec<u8>, io::Error>,
-        Result<Vec<u8>, io::Error>,
+        future::Ready<Result<Vec<u8>, io::Error>>,
+        fn(A) -> future::Ready<Result<Vec<u8>, io::Error>>,
     >,
-    fn(BytesMut) -> Result<B, io::Error>,
-    Result<B, io::Error>,
+    future::Ready<Result<B, io::Error>>,
+    fn(BytesMut) -> future::Ready<Result<B, io::Error>>,
 >;

 /// Request that we can send to a peer or that we received from a peer.