diff --git a/core/Cargo.toml b/core/Cargo.toml index d5406632..401d3fc3 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -16,24 +16,24 @@ bytes = "0.4" ed25519-dalek = "1.0.0-pre.2" failure = "0.1" fnv = "1.0" +futures = { version = "0.3.1", features = ["compat", "io-compat", "executor", "thread-pool"] } futures-timer = "0.3" lazy_static = "1.2" +libsecp256k1 = { version = "0.3.1", optional = true } log = "0.4" multiaddr = { package = "parity-multiaddr", version = "0.5.0", path = "../misc/multiaddr" } multihash = { package = "parity-multihash", version = "0.1.4", path = "../misc/multihash" } multistream-select = { version = "0.6.0", path = "../misc/multistream-select" } -futures-preview = { version = "0.3.0-alpha.18", features = ["compat", "io-compat"] } parking_lot = "0.9.0" protobuf = "2.8" quick-error = "1.2" rand = "0.7" rw-stream-sink = { version = "0.1.1", path = "../misc/rw-stream-sink" } -libsecp256k1 = { version = "0.3.1", optional = true } sha2 = "0.8.0" smallvec = "0.6" -wasm-timer = "0.1" unsigned-varint = "0.2" void = "1" +wasm-timer = "0.1" zeroize = "1" [target.'cfg(not(any(target_os = "emscripten", target_os = "unknown")))'.dependencies] @@ -41,15 +41,15 @@ ring = { version = "0.16.9", features = ["alloc", "std"], default-features = fal untrusted = "0.7.0" [dev-dependencies] -async-std = "0.99" -libp2p-swarm = { version = "0.3.0", path = "../swarm" } -libp2p-tcp = { version = "0.13.0", path = "../transports/tcp" } +assert_matches = "1.3" +async-std = "1.0" libp2p-mplex = { version = "0.13.0", path = "../muxers/mplex" } libp2p-secio = { version = "0.13.0", path = "../protocols/secio" } -rand = "0.7.2" +libp2p-swarm = { version = "0.3.0", path = "../swarm" } +libp2p-tcp = { version = "0.13.0", path = "../transports/tcp" } quickcheck = "0.9.0" +rand = "0.7.2" wasm-timer = "0.2" -assert_matches = "1.3" [features] default = ["secp256k1"] diff --git a/core/src/peer_id.rs b/core/src/peer_id.rs index 9ebb6829..ae659238 100644 --- a/core/src/peer_id.rs +++ b/core/src/peer_id.rs @@ -28,7 +28,7 @@ use std::{convert::TryFrom, fmt, str::FromStr}; /// automatically used as the peer id using an identity multihash. // // Note: see `from_public_key` for how this value will be used in the future. -const MAX_INLINE_KEY_LENGTH: usize = 42; +const _MAX_INLINE_KEY_LENGTH: usize = 42; /// Identifier of a peer of the network. /// diff --git a/core/src/upgrade/transfer.rs b/core/src/upgrade/transfer.rs index 57a92f0e..28a9c298 100644 --- a/core/src/upgrade/transfer.rs +++ b/core/src/upgrade/transfer.rs @@ -115,7 +115,7 @@ pub async fn read_varint(socket: &mut (impl AsyncRead + Unpin)) -> Result **Note**: Assumes that a variable-length prefix indicates the length of the message. This is /// > compatible with what `write_one` does. pub async fn read_one(socket: &mut (impl AsyncRead + Unpin), max_size: usize) - -> Result, ReadOneError> + -> Result, ReadOneError> { let len = read_varint(socket).await?; if len > max_size { @@ -171,7 +171,6 @@ impl error::Error for ReadOneError { #[cfg(test)] mod tests { use super::*; - use std::io::{self, Cursor}; #[test] fn write_one_works() { @@ -181,7 +180,7 @@ mod tests { let mut out = vec![0; 10_000]; futures::executor::block_on( - write_one(&mut Cursor::new(&mut out[..]), data.clone()) + write_one(&mut futures::io::Cursor::new(&mut out[..]), data.clone()) ).unwrap(); let (out_len, out_data) = unsigned_varint::decode::usize(&out).unwrap(); diff --git a/misc/rw-stream-sink/Cargo.toml b/misc/rw-stream-sink/Cargo.toml index 0ed7701b..2d4709cf 100644 --- a/misc/rw-stream-sink/Cargo.toml +++ b/misc/rw-stream-sink/Cargo.toml @@ -10,4 +10,8 @@ keywords = ["networking"] categories = ["network-programming", "asynchronous"] [dependencies] -futures-preview = "0.3.0-alpha.18" +bytes = "0.4.12" +futures = "0.3.1" + +[dev-dependencies] +async-std = "1.0" diff --git a/misc/rw-stream-sink/src/lib.rs b/misc/rw-stream-sink/src/lib.rs index c4e6ec63..8bcdd3a3 100644 --- a/misc/rw-stream-sink/src/lib.rs +++ b/misc/rw-stream-sink/src/lib.rs @@ -18,179 +18,195 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -//! This crate provides the `RwStreamSink` type. It wraps around a `Stream + Sink` that produces -//! and accepts byte arrays, and implements `PollRead` and `PollWrite`. +//! This crate provides the [`RwStreamSink`] type. It wraps around a [`Stream`] +//! and [`Sink`] that produces and accepts byte arrays, and implements +//! [`AsyncRead`] and [`AsyncWrite`]. //! -//! Each call to `write()` will send one packet on the sink. Calls to `read()` will read from -//! incoming packets. -//! -//! > **Note**: Although this crate is hosted in the libp2p repo, it is purely a utility crate and -//! > not at all specific to libp2p. +//! Each call to [`AsyncWrite::poll_write`] will send one packet to the sink. +//! Calls to [`AsyncRead::read`] will read from the stream's incoming packets. -use futures::prelude::*; -use std::{cmp, io, pin::Pin, task::Context, task::Poll}; +use bytes::{IntoBuf, Buf}; +use futures::{prelude::*, ready}; +use std::{io, pin::Pin, task::{Context, Poll}}; -/// Wraps around a `Stream + Sink` whose items are buffers. Implements `AsyncRead` and `AsyncWrite`. -/// -/// The `B` generic is the type of buffers that the `Sink` accepts. The `I` generic is the type of -/// buffer that the `Stream` generates. -pub struct RwStreamSink { +/// Wraps a [`Stream`] and [`Sink`] whose items are buffers. +/// Implements [`AsyncRead`] and [`AsyncWrite`]. +pub struct RwStreamSink +where + S: TryStream, + ::Ok: IntoBuf +{ inner: S, - current_item: Option>, + current_item: Option<<::Ok as IntoBuf>::Buf> } -impl RwStreamSink { +impl RwStreamSink +where + S: TryStream, + ::Ok: IntoBuf +{ /// Wraps around `inner`. - pub fn new(inner: S) -> RwStreamSink { + pub fn new(inner: S) -> Self { RwStreamSink { inner, current_item: None } } } impl AsyncRead for RwStreamSink where - S: TryStream, Error = io::Error> + Unpin, + S: TryStream + Unpin, + ::Ok: IntoBuf { - fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll> { + fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll> { // Grab the item to copy from. - let current_item = loop { + let item_to_copy = loop { if let Some(ref mut i) = self.current_item { - if !i.is_empty() { - break i; + if i.has_remaining() { + break i } } - - self.current_item = Some(match TryStream::try_poll_next(Pin::new(&mut self.inner), cx) { - Poll::Ready(Some(Ok(i))) => i, - Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err)), - Poll::Ready(None) => return Poll::Ready(Ok(0)), // EOF - Poll::Pending => return Poll::Pending, + self.current_item = Some(match ready!(self.inner.try_poll_next_unpin(cx)) { + Some(Ok(i)) => i.into_buf(), + Some(Err(e)) => return Poll::Ready(Err(e)), + None => return Poll::Ready(Ok(0)) // EOF }); }; // Copy it! - debug_assert!(!current_item.is_empty()); - let to_copy = cmp::min(buf.len(), current_item.len()); - buf[..to_copy].copy_from_slice(¤t_item[..to_copy]); - for _ in 0..to_copy { current_item.remove(0); } + debug_assert!(item_to_copy.has_remaining()); + let to_copy = std::cmp::min(buf.len(), item_to_copy.remaining()); + item_to_copy.take(to_copy).copy_to_slice(&mut buf[.. to_copy]); Poll::Ready(Ok(to_copy)) } } impl AsyncWrite for RwStreamSink where - S: Stream + Sink, Error = io::Error> + Unpin, + S: TryStream + Sink<::Ok, Error = io::Error> + Unpin, + ::Ok: IntoBuf + for<'r> From<&'r [u8]> { - fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll> { - match Sink::poll_ready(Pin::new(&mut self.inner), cx) { - Poll::Pending => return Poll::Pending, - Poll::Ready(Ok(())) => {} - Poll::Ready(Err(err)) => return Poll::Ready(Err(err)) - } - - let len = buf.len(); - match Sink::start_send(Pin::new(&mut self.inner), buf.into()) { - Ok(()) => Poll::Ready(Ok(len)), - Err(err) => Poll::Ready(Err(err)) + fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll> { + ready!(Pin::new(&mut self.inner).poll_ready(cx)?); + let n = buf.len(); + if let Err(e) = Pin::new(&mut self.inner).start_send(buf.into()) { + return Poll::Ready(Err(e)) } + Poll::Ready(Ok(n)) } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - Sink::poll_flush(Pin::new(&mut self.inner), cx) + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + Pin::new(&mut self.inner).poll_flush(cx) } - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - Sink::poll_close(Pin::new(&mut self.inner), cx) + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + Pin::new(&mut self.inner).poll_close(cx) } } -impl Unpin for RwStreamSink { -} +impl Unpin for RwStreamSink +where + S: TryStream, + ::Ok: IntoBuf +{} #[cfg(test)] mod tests { - use crate::RwStreamSink; - use futures::{prelude::*, stream, channel::mpsc::channel}; - use std::io::Read; + use async_std::task; + use bytes::Bytes; + use futures::{channel::mpsc, prelude::*, stream}; + use std::{pin::Pin, task::{Context, Poll}}; + use super::RwStreamSink; // This struct merges a stream and a sink and is quite useful for tests. struct Wrapper(St, Si); + impl Stream for Wrapper where - St: Stream, + St: Stream + Unpin, + Si: Unpin { type Item = St::Item; - type Error = St::Error; - fn poll(&mut self) -> Poll, Self::Error> { - self.0.poll() + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + self.0.poll_next_unpin(cx) } } - impl Sink for Wrapper + + impl Sink for Wrapper where - Si: Sink, + St: Unpin, + Si: Sink + Unpin, { - type SinkItem = Si::SinkItem; - type SinkError = Si::SinkError; - fn start_send( - &mut self, - item: Self::SinkItem, - ) -> StartSend { - self.1.start_send(item) + type Error = Si::Error; + + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + Pin::new(&mut self.1).poll_ready(cx) } - fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { - self.1.poll_complete() + + fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { + Pin::new(&mut self.1).start_send(item) } - fn close(&mut self) -> Poll<(), Self::SinkError> { - self.1.close() + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + Pin::new(&mut self.1).poll_flush(cx) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + Pin::new(&mut self.1).poll_close(cx) } } #[test] fn basic_reading() { - let (tx1, _) = channel::>(10); - let (tx2, rx2) = channel(10); + let (tx1, _) = mpsc::channel::>(10); + let (mut tx2, rx2) = mpsc::channel(10); - let mut wrapper = RwStreamSink::new(Wrapper(rx2.map_err(|_| panic!()), tx1)); + let mut wrapper = RwStreamSink::new(Wrapper(rx2.map(Ok), tx1)); - tx2.send(Bytes::from("hel")) - .and_then(|tx| tx.send(Bytes::from("lo wor"))) - .and_then(|tx| tx.send(Bytes::from("ld"))) - .wait() - .unwrap(); + task::block_on(async move { + tx2.send(Bytes::from("hel")).await.unwrap(); + tx2.send(Bytes::from("lo wor")).await.unwrap(); + tx2.send(Bytes::from("ld")).await.unwrap(); + tx2.close().await.unwrap(); - let mut data = Vec::new(); - wrapper.read_to_end(&mut data).unwrap(); - assert_eq!(data, b"hello world"); + let mut data = Vec::new(); + wrapper.read_to_end(&mut data).await.unwrap(); + assert_eq!(data, b"hello world"); + }) } #[test] fn skip_empty_stream_items() { let data: Vec<&[u8]> = vec![b"", b"foo", b"", b"bar", b"", b"baz", b""]; - let mut rws = RwStreamSink::new(stream::iter_ok::<_, std::io::Error>(data)); + let mut rws = RwStreamSink::new(stream::iter(data).map(Ok)); let mut buf = [0; 9]; - assert_eq!(3, rws.read(&mut buf).unwrap()); - assert_eq!(3, rws.read(&mut buf[3..]).unwrap()); - assert_eq!(3, rws.read(&mut buf[6..]).unwrap()); - assert_eq!(0, rws.read(&mut buf).unwrap()); - assert_eq!(b"foobarbaz", &buf[..]); + task::block_on(async move { + assert_eq!(3, rws.read(&mut buf).await.unwrap()); + assert_eq!(3, rws.read(&mut buf[3..]).await.unwrap()); + assert_eq!(3, rws.read(&mut buf[6..]).await.unwrap()); + assert_eq!(0, rws.read(&mut buf).await.unwrap()); + assert_eq!(b"foobarbaz", &buf[..]) + }) } #[test] fn partial_read() { let data: Vec<&[u8]> = vec![b"hell", b"o world"]; - let mut rws = RwStreamSink::new(stream::iter_ok::<_, std::io::Error>(data)); + let mut rws = RwStreamSink::new(stream::iter(data).map(Ok)); let mut buf = [0; 3]; - assert_eq!(3, rws.read(&mut buf).unwrap()); - assert_eq!(b"hel", &buf[..3]); - assert_eq!(0, rws.read(&mut buf[..0]).unwrap()); - assert_eq!(1, rws.read(&mut buf).unwrap()); - assert_eq!(b"l", &buf[..1]); - assert_eq!(3, rws.read(&mut buf).unwrap()); - assert_eq!(b"o w", &buf[..3]); - assert_eq!(0, rws.read(&mut buf[..0]).unwrap()); - assert_eq!(3, rws.read(&mut buf).unwrap()); - assert_eq!(b"orl", &buf[..3]); - assert_eq!(1, rws.read(&mut buf).unwrap()); - assert_eq!(b"d", &buf[..1]); - assert_eq!(0, rws.read(&mut buf).unwrap()); + task::block_on(async move { + assert_eq!(3, rws.read(&mut buf).await.unwrap()); + assert_eq!(b"hel", &buf[..3]); + assert_eq!(0, rws.read(&mut buf[..0]).await.unwrap()); + assert_eq!(1, rws.read(&mut buf).await.unwrap()); + assert_eq!(b"l", &buf[..1]); + assert_eq!(3, rws.read(&mut buf).await.unwrap()); + assert_eq!(b"o w", &buf[..3]); + assert_eq!(0, rws.read(&mut buf[..0]).await.unwrap()); + assert_eq!(3, rws.read(&mut buf).await.unwrap()); + assert_eq!(b"orl", &buf[..3]); + assert_eq!(1, rws.read(&mut buf).await.unwrap()); + assert_eq!(b"d", &buf[..1]); + assert_eq!(0, rws.read(&mut buf).await.unwrap()); + }) } } diff --git a/muxers/mplex/Cargo.toml b/muxers/mplex/Cargo.toml index 29c2bb24..f4656e8d 100644 --- a/muxers/mplex/Cargo.toml +++ b/muxers/mplex/Cargo.toml @@ -12,11 +12,11 @@ categories = ["network-programming", "asynchronous"] [dependencies] bytes = "0.4.5" fnv = "1.0" -futures_codec = "0.3.0" -futures-preview = "0.3.0-alpha.18" +futures = "0.3.1" +futures_codec = "0.3.1" libp2p-core = { version = "0.13.0", path = "../../core" } log = "0.4" -parking_lot = "0.8" +parking_lot = "0.9" unsigned-varint = { version = "0.2.3", features = ["futures-codec"] } [dev-dependencies] diff --git a/protocols/secio/Cargo.toml b/protocols/secio/Cargo.toml index a06495ea..80808c65 100644 --- a/protocols/secio/Cargo.toml +++ b/protocols/secio/Cargo.toml @@ -10,21 +10,21 @@ keywords = ["peer-to-peer", "libp2p", "networking"] categories = ["network-programming", "asynchronous"] [dependencies] -bytes = "0.4" -futures-preview = "0.3.0-alpha.18" -futures_codec = "0.3.0" +aes-ctr = "0.3" +aesni = { version = "0.6", features = ["nocheck"], optional = true } +bytes = "0.4.12" +ctr = "0.3" +futures = "0.3.1" +futures_codec = "0.3.1" +hmac = "0.7.0" +lazy_static = "1.2.0" libp2p-core = { version = "0.13.0", path = "../../core" } log = "0.4.6" protobuf = "2.8" rand = "0.6.5" -aes-ctr = "0.3" -aesni = { version = "0.6", features = ["nocheck"], optional = true } -twofish = "0.2.0" -ctr = "0.3" -lazy_static = "1.2.0" rw-stream-sink = { version = "0.1.1", path = "../../misc/rw-stream-sink" } sha2 = "0.8.0" -hmac = "0.7.0" +twofish = "0.2.0" unsigned-varint = { version = "0.2.3", features = ["futures-codec"] } [target.'cfg(not(target_arch = "wasm32"))'.dependencies] @@ -44,8 +44,8 @@ secp256k1 = [] aes-all = ["aesni"] [dev-dependencies] -async-std = "0.99" -criterion = "0.3.0" +async-std = "1.0" +criterion = "0.3" libp2p-mplex = { version = "0.13.0", path = "../../muxers/mplex" } libp2p-tcp = { version = "0.13.0", path = "../../transports/tcp" } diff --git a/protocols/secio/src/lib.rs b/protocols/secio/src/lib.rs index 9a0a103a..205198d9 100644 --- a/protocols/secio/src/lib.rs +++ b/protocols/secio/src/lib.rs @@ -148,7 +148,7 @@ impl SecioConfig { /// Output of the secio protocol. pub struct SecioOutput where - S: AsyncRead + AsyncWrite, + S: AsyncRead + AsyncWrite + Unpin { /// The encrypted stream. pub stream: RwStreamSink, fn(SecioError) -> io::Error>>, diff --git a/swarm/Cargo.toml b/swarm/Cargo.toml index 131c46be..55535691 100644 --- a/swarm/Cargo.toml +++ b/swarm/Cargo.toml @@ -10,7 +10,7 @@ keywords = ["peer-to-peer", "libp2p", "networking"] categories = ["network-programming", "asynchronous"] [dependencies] -futures-preview = "0.3.0-alpha.18" +futures = "0.3.1" libp2p-core = { version = "0.13.0", path = "../core" } smallvec = "0.6" wasm-timer = "0.2" diff --git a/transports/tcp/Cargo.toml b/transports/tcp/Cargo.toml index b3e7aac3..12244e13 100644 --- a/transports/tcp/Cargo.toml +++ b/transports/tcp/Cargo.toml @@ -10,11 +10,11 @@ keywords = ["peer-to-peer", "libp2p", "networking"] categories = ["network-programming", "asynchronous"] [dependencies] -async-std = "0.99" -bytes = "0.4" +async-std = "1.0" +bytes = "0.4.12" +futures = "0.3.1" +futures-timer = "2.0" get_if_addrs = "0.5.3" ipnet = "2.0.0" libp2p-core = { version = "0.13.0", path = "../../core" } log = "0.4.1" -futures-preview = "0.3.0-alpha.18" -futures-timer = "0.3" diff --git a/transports/tcp/src/lib.rs b/transports/tcp/src/lib.rs index a9e7740e..6de747ea 100644 --- a/transports/tcp/src/lib.rs +++ b/transports/tcp/src/lib.rs @@ -37,10 +37,7 @@ //! documentation of `swarm` and of libp2p in general to learn how to use the `Transport` trait. use async_std::net::TcpStream; -use futures::{ - future::{self, Ready}, - prelude::*, -}; +use futures::{future::{self, Ready}, prelude::*}; use futures_timer::Delay; use get_if_addrs::{IfAddr, get_if_addrs}; use ipnet::{IpNet, Ipv4Net, Ipv6Net}; @@ -449,7 +446,7 @@ impl Drop for TcpTransStream { mod tests { use futures::prelude::*; use libp2p_core::{Transport, multiaddr::{Multiaddr, Protocol}, transport::ListenerEvent}; - use std::{net::{IpAddr, Ipv4Addr, SocketAddr}, time::Duration}; + use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use super::{multiaddr_to_socketaddr, TcpConfig}; #[test]