Switch MemoryTransport to Vec<u8> and fix tests (#1274)

This commit is contained in:
Pierre Krieger
2019-10-21 15:14:31 +00:00
committed by GitHub
parent abe2f2afc1
commit b1f31111e0
7 changed files with 127 additions and 129 deletions

View File

@ -48,10 +48,8 @@ libp2p-mplex = { version = "0.12.0", path = "../muxers/mplex" }
libp2p-secio = { version = "0.12.0", path = "../protocols/secio" }
rand = "0.6"
quickcheck = "0.8"
tokio = "0.1"
wasm-timer = "0.1"
wasm-timer = "0.2"
assert_matches = "1.3"
tokio-mock-task = "0.1"
[features]
default = ["secp256k1"]

View File

@ -51,7 +51,8 @@ use std::{collections::VecDeque, fmt, pin::Pin};
/// listeners.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap()).unwrap();
///
/// // The `listeners` will now generate events when polled.
/// let future = listeners.for_each(move |event| {
/// futures::executor::block_on(async move {
/// while let Some(event) = listeners.next().await {
/// match event {
/// ListenersEvent::NewAddress { listener_id, listen_addr } => {
/// println!("Listener {:?} is listening at address {}", listener_id, listen_addr);
@ -71,12 +72,9 @@ use std::{collections::VecDeque, fmt, pin::Pin};
/// // program you probably want to use it!
/// drop(upgrade);
/// },
/// };
///
/// Ok(())
/// });
///
/// tokio::run(future.map_err(|_| ()));
/// }
/// }
/// })
/// # }
/// ```
pub struct ListenersStream<TTrans>
@ -358,7 +356,6 @@ mod tests {
use super::*;
use crate::transport::{self, ListenerEvent};
use assert_matches::assert_matches;
use tokio::runtime::current_thread::Runtime;
use std::{io, iter::FromIterator};
use futures::{future::{self}, stream};
use crate::PeerId;

View File

@ -19,7 +19,7 @@
// DEALINGS IN THE SOFTWARE.
use crate::{Transport, transport::{TransportError, ListenerEvent}};
use bytes::{Bytes, IntoBuf};
use bytes::IntoBuf;
use fnv::FnvHashMap;
use futures::{future::{self, Ready}, prelude::*, channel::mpsc, task::Context, task::Poll};
use lazy_static::lazy_static;
@ -29,7 +29,7 @@ use rw_stream_sink::RwStreamSink;
use std::{collections::hash_map::Entry, error, fmt, io, num::NonZeroU64, pin::Pin};
lazy_static! {
static ref HUB: Mutex<FnvHashMap<NonZeroU64, mpsc::Sender<Channel<Bytes>>>> =
static ref HUB: Mutex<FnvHashMap<NonZeroU64, mpsc::Sender<Channel<Vec<u8>>>>> =
Mutex::new(FnvHashMap::default());
}
@ -39,13 +39,13 @@ pub struct MemoryTransport;
/// Connection to a `MemoryTransport` currently being opened.
pub struct DialFuture {
sender: mpsc::Sender<Channel<Bytes>>,
channel_to_send: Option<Channel<Bytes>>,
channel_to_return: Option<Channel<Bytes>>,
sender: mpsc::Sender<Channel<Vec<u8>>>,
channel_to_send: Option<Channel<Vec<u8>>>,
channel_to_return: Option<Channel<Vec<u8>>>,
}
impl Future for DialFuture {
type Output = Result<Channel<Bytes>, MemoryTransportError>;
type Output = Result<Channel<Vec<u8>>, MemoryTransportError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
match self.sender.poll_ready(cx) {
@ -67,7 +67,7 @@ impl Future for DialFuture {
}
impl Transport for MemoryTransport {
type Output = Channel<Bytes>;
type Output = Channel<Vec<u8>>;
type Error = MemoryTransportError;
type Listener = Listener;
type ListenerUpgrade = Ready<Result<Self::Output, Self::Error>>;
@ -168,13 +168,13 @@ pub struct Listener {
/// The address we are listening on.
addr: Multiaddr,
/// Receives incoming connections.
receiver: mpsc::Receiver<Channel<Bytes>>,
receiver: mpsc::Receiver<Channel<Vec<u8>>>,
/// Generate `ListenerEvent::NewAddress` to inform about our listen address.
tell_listen_addr: bool
}
impl Stream for Listener {
type Item = Result<ListenerEvent<Ready<Result<Channel<Bytes>, MemoryTransportError>>>, MemoryTransportError>;
type Item = Result<ListenerEvent<Ready<Result<Channel<Vec<u8>>, MemoryTransportError>>>, MemoryTransportError>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
if self.tell_listen_addr {
@ -230,7 +230,7 @@ pub type Channel<T> = RwStreamSink<Chan<T>>;
/// A channel represents an established, in-memory, logical connection between two endpoints.
///
/// Implements `Sink` and `Stream`.
pub struct Chan<T = Bytes> {
pub struct Chan<T = Vec<u8>> {
incoming: mpsc::Receiver<T>,
outgoing: mpsc::Sender<T>,
}

View File

@ -34,7 +34,7 @@ use libp2p_swarm::{
protocols_handler::NodeHandlerWrapperBuilder
};
use rand::seq::SliceRandom;
use std::io;
use std::{io, task::Context, task::Poll};
// TODO: replace with DummyProtocolsHandler after https://github.com/servo/rust-smallvec/issues/139 ?
struct TestHandler<TSubstream>(std::marker::PhantomData<TSubstream>);
@ -47,7 +47,7 @@ impl<TSubstream> Default for TestHandler<TSubstream> {
impl<TSubstream> ProtocolsHandler for TestHandler<TSubstream>
where
TSubstream: futures::PollRead + futures::PollWrite
TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static
{
type InEvent = (); // TODO: cannot be Void (https://github.com/servo/rust-smallvec/issues/139)
type OutEvent = (); // TODO: cannot be Void (https://github.com/servo/rust-smallvec/issues/139)
@ -82,7 +82,7 @@ where
fn connection_keep_alive(&self) -> KeepAlive { KeepAlive::No }
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent, Self::Error>, Self::Error> {
fn poll(&mut self, _: &mut Context) -> Poll<ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent, Self::Error>> {
Poll::Pending
}
}
@ -113,26 +113,27 @@ fn deny_incoming_connec() {
swarm1.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap();
let address =
if let Poll::Ready(NetworkEvent::NewListenerAddress { listen_addr, .. }) = swarm1.poll() {
listen_addr
let address = futures::executor::block_on(future::poll_fn(|cx| {
if let Poll::Ready(NetworkEvent::NewListenerAddress { listen_addr, .. }) = swarm1.poll(cx) {
Poll::Ready(listen_addr)
} else {
panic!("Was expecting the listen address to be reported")
};
}
}));
swarm2
.peer(swarm1.local_peer_id().clone())
.into_not_connected().unwrap()
.connect(address.clone(), TestHandler::default().into_node_handler_builder());
let future = future::poll_fn(|| -> Poll<Result<(), io::Error>> {
match swarm1.poll() {
futures::executor::block_on(future::poll_fn(|cx| -> Poll<Result<(), io::Error>> {
match swarm1.poll(cx) {
Poll::Ready(NetworkEvent::IncomingConnection(inc)) => drop(inc),
Poll::Ready(_) => unreachable!(),
Poll::Pending => (),
}
match swarm2.poll() {
match swarm2.poll(cx) {
Poll::Ready(NetworkEvent::DialError {
new_state: PeerState::NotConnected,
peer_id,
@ -148,9 +149,7 @@ fn deny_incoming_connec() {
}
Poll::Pending
});
tokio::runtime::current_thread::Runtime::new().unwrap().block_on(future).unwrap();
})).unwrap();
}
#[test]
@ -176,31 +175,30 @@ fn dial_self() {
.and_then(|(peer, mplex), _| {
// Gracefully close the connection to allow protocol
// negotiation to complete.
util::CloseMuxer::new(mplex).map(move |mplex| (peer, mplex))
util::CloseMuxer::new(mplex).map_ok(move |mplex| (peer, mplex))
});
Network::new(transport, local_public_key.into())
};
swarm.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap();
let (address, mut swarm) =
future::lazy(move || {
if let Poll::Ready(NetworkEvent::NewListenerAddress { listen_addr, .. }) = swarm.poll() {
let (address, mut swarm) = futures::executor::block_on(
future::lazy(move |cx| {
if let Poll::Ready(NetworkEvent::NewListenerAddress { listen_addr, .. }) = swarm.poll(cx) {
Ok::<_, void::Void>((listen_addr, swarm))
} else {
panic!("Was expecting the listen address to be reported")
}
})
.wait()
}))
.unwrap();
swarm.dial(address.clone(), TestHandler::default().into_node_handler_builder()).unwrap();
let mut got_dial_err = false;
let mut got_inc_err = false;
let future = future::poll_fn(|| -> Poll<Result<(), io::Error>> {
futures::executor::block_on(future::poll_fn(|cx| -> Poll<Result<(), io::Error>> {
loop {
match swarm.poll() {
match swarm.poll(cx) {
Poll::Ready(NetworkEvent::UnknownPeerDialError {
multiaddr,
error: UnknownPeerDialErr::FoundLocalPeerId,
@ -210,7 +208,7 @@ fn dial_self() {
assert!(!got_dial_err);
got_dial_err = true;
if got_inc_err {
return Ok(Poll::Ready(()));
return Poll::Ready(Ok(()));
}
},
Poll::Ready(NetworkEvent::IncomingConnectionError {
@ -222,7 +220,7 @@ fn dial_self() {
assert!(!got_inc_err);
got_inc_err = true;
if got_dial_err {
return Ok(Poll::Ready(()));
return Poll::Ready(Ok(()));
}
},
Poll::Ready(NetworkEvent::IncomingConnection(inc)) => {
@ -235,9 +233,7 @@ fn dial_self() {
Poll::Pending => break Poll::Pending,
}
}
});
tokio::runtime::current_thread::Runtime::new().unwrap().block_on(future).unwrap();
})).unwrap();
}
#[test]
@ -288,9 +284,9 @@ fn multiple_addresses_err() {
.connect_iter(addresses.clone(), TestHandler::default().into_node_handler_builder())
.unwrap();
let future = future::poll_fn(|| -> Poll<Result<(), io::Error>> {
futures::executor::block_on(future::poll_fn(|cx| -> Poll<Result<(), io::Error>> {
loop {
match swarm.poll() {
match swarm.poll(cx) {
Poll::Ready(NetworkEvent::DialError {
new_state,
peer_id,
@ -302,7 +298,7 @@ fn multiple_addresses_err() {
assert_eq!(multiaddr, expected);
if addresses.is_empty() {
assert_eq!(new_state, PeerState::NotConnected);
return Ok(Poll::Ready(()));
return Poll::Ready(Ok(()));
} else {
match new_state {
PeerState::Dialing { num_pending_addresses } => {
@ -316,7 +312,5 @@ fn multiple_addresses_err() {
Poll::Pending => break Poll::Pending,
}
}
});
tokio::runtime::current_thread::Runtime::new().unwrap().block_on(future).unwrap();
})).unwrap();
}

View File

@ -31,8 +31,8 @@ use libp2p_swarm::{
ProtocolsHandlerEvent,
ProtocolsHandlerUpgrErr,
};
use std::{io, time::Duration};
use wasm_timer::{Delay, Instant};
use std::{io, pin::Pin, task::Context, task::Poll, time::Duration};
use wasm_timer::Delay;
// TODO: replace with DummyProtocolsHandler after https://github.com/servo/rust-smallvec/issues/139 ?
struct TestHandler<TSubstream>(std::marker::PhantomData<TSubstream>);
@ -45,7 +45,7 @@ impl<TSubstream> Default for TestHandler<TSubstream> {
impl<TSubstream> ProtocolsHandler for TestHandler<TSubstream>
where
TSubstream: futures::PollRead + futures::PollWrite
TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static
{
type InEvent = (); // TODO: cannot be Void (https://github.com/servo/rust-smallvec/issues/139)
type OutEvent = (); // TODO: cannot be Void (https://github.com/servo/rust-smallvec/issues/139)
@ -80,7 +80,7 @@ where
fn connection_keep_alive(&self) -> KeepAlive { KeepAlive::Yes }
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent, Self::Error>, Self::Error> {
fn poll(&mut self, _: &mut Context) -> Poll<ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent, Self::Error>> {
Poll::Pending
}
}
@ -116,7 +116,7 @@ fn raw_swarm_simultaneous_connect() {
.and_then(|(peer, mplex), _| {
// Gracefully close the connection to allow protocol
// negotiation to complete.
util::CloseMuxer::new(mplex).map(move |mplex| (peer, mplex))
util::CloseMuxer::new(mplex).map_ok(move |mplex| (peer, mplex))
});
Network::new(transport, local_public_key.into_peer_id())
};
@ -131,7 +131,7 @@ fn raw_swarm_simultaneous_connect() {
.and_then(|(peer, mplex), _| {
// Gracefully close the connection to allow protocol
// negotiation to complete.
util::CloseMuxer::new(mplex).map(move |mplex| (peer, mplex))
util::CloseMuxer::new(mplex).map_ok(move |mplex| (peer, mplex))
});
Network::new(transport, local_public_key.into_peer_id())
};
@ -139,17 +139,17 @@ fn raw_swarm_simultaneous_connect() {
swarm1.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap();
swarm2.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap();
let (swarm1_listen_addr, swarm2_listen_addr, mut swarm1, mut swarm2) =
future::lazy(move || {
let (swarm1_listen_addr, swarm2_listen_addr, mut swarm1, mut swarm2) = futures::executor::block_on(
future::lazy(move |cx| {
let swarm1_listen_addr =
if let Poll::Ready(NetworkEvent::NewListenerAddress { listen_addr, .. }) = swarm1.poll() {
if let Poll::Ready(NetworkEvent::NewListenerAddress { listen_addr, .. }) = swarm1.poll(cx) {
listen_addr
} else {
panic!("Was expecting the listen address to be reported")
};
let swarm2_listen_addr =
if let Poll::Ready(NetworkEvent::NewListenerAddress { listen_addr, .. }) = swarm2.poll() {
if let Poll::Ready(NetworkEvent::NewListenerAddress { listen_addr, .. }) = swarm2.poll(cx) {
listen_addr
} else {
panic!("Was expecting the listen address to be reported")
@ -157,19 +157,16 @@ fn raw_swarm_simultaneous_connect() {
Ok::<_, void::Void>((swarm1_listen_addr, swarm2_listen_addr, swarm1, swarm2))
})
.wait()
.unwrap();
let mut reactor = tokio::runtime::current_thread::Runtime::new().unwrap();
).unwrap();
loop {
let mut swarm1_step = 0;
let mut swarm2_step = 0;
let mut swarm1_dial_start = Delay::new(Instant::now() + Duration::new(0, rand::random::<u32>() % 50_000_000));
let mut swarm2_dial_start = Delay::new(Instant::now() + Duration::new(0, rand::random::<u32>() % 50_000_000));
let mut swarm1_dial_start = Delay::new(Duration::new(0, rand::random::<u32>() % 50_000_000));
let mut swarm2_dial_start = Delay::new(Duration::new(0, rand::random::<u32>() % 50_000_000));
let future = future::poll_fn(|| -> Poll<bool, io::Error> {
let future = future::poll_fn(|cx| -> Poll<bool> {
loop {
let mut swarm1_not_ready = false;
let mut swarm2_not_ready = false;
@ -178,7 +175,7 @@ fn raw_swarm_simultaneous_connect() {
// handle other nodes, which may delay the processing.
if swarm1_step == 0 {
match swarm1_dial_start.poll().unwrap() {
match Future::poll(Pin::new(&mut swarm1_dial_start), cx) {
Poll::Ready(_) => {
let handler = TestHandler::default().into_node_handler_builder();
swarm1.peer(swarm2.local_peer_id().clone())
@ -192,7 +189,7 @@ fn raw_swarm_simultaneous_connect() {
}
if swarm2_step == 0 {
match swarm2_dial_start.poll().unwrap() {
match Future::poll(Pin::new(&mut swarm2_dial_start), cx) {
Poll::Ready(_) => {
let handler = TestHandler::default().into_node_handler_builder();
swarm2.peer(swarm1.local_peer_id().clone())
@ -206,7 +203,7 @@ fn raw_swarm_simultaneous_connect() {
}
if rand::random::<f32>() < 0.1 {
match swarm1.poll() {
match swarm1.poll(cx) {
Poll::Ready(NetworkEvent::IncomingConnectionError {
error: IncomingError::DeniedLowerPriority, ..
}) => {
@ -218,7 +215,7 @@ fn raw_swarm_simultaneous_connect() {
if swarm1_step == 0 {
// The connection was established before
// swarm1 started dialing; discard the test run.
return Ok(Poll::Ready(false))
return Poll::Ready(false)
}
assert_eq!(swarm1_step, 1);
swarm1_step = 2;
@ -237,7 +234,7 @@ fn raw_swarm_simultaneous_connect() {
}
if rand::random::<f32>() < 0.1 {
match swarm2.poll() {
match swarm2.poll(cx) {
Poll::Ready(NetworkEvent::IncomingConnectionError {
error: IncomingError::DeniedLowerPriority, ..
}) => {
@ -249,7 +246,7 @@ fn raw_swarm_simultaneous_connect() {
if swarm2_step == 0 {
// The connection was established before
// swarm2 started dialing; discard the test run.
return Ok(Poll::Ready(false))
return Poll::Ready(false)
}
assert_eq!(swarm2_step, 1);
swarm2_step = 2;
@ -269,7 +266,7 @@ fn raw_swarm_simultaneous_connect() {
// TODO: make sure that >= 5 is correct
if swarm1_step + swarm2_step >= 5 {
return Ok(Poll::Ready(true));
return Poll::Ready(true);
}
if swarm1_not_ready && swarm2_not_ready {
@ -278,7 +275,7 @@ fn raw_swarm_simultaneous_connect() {
}
});
if reactor.block_on(future).unwrap() {
if futures::executor::block_on(future) {
// The test exercised what we wanted to exercise: a simultaneous connect.
break
} else {

View File

@ -22,13 +22,13 @@ mod util;
use futures::prelude::*;
use libp2p_core::identity;
use libp2p_core::transport::{Transport, MemoryTransport, ListenerEvent};
use libp2p_core::transport::{Transport, MemoryTransport};
use libp2p_core::upgrade::{self, UpgradeInfo, Negotiated, InboundUpgrade, OutboundUpgrade};
use libp2p_mplex::MplexConfig;
use libp2p_secio::SecioConfig;
use multiaddr::Multiaddr;
use rand::random;
use std::io;
use std::{io, pin::Pin};
#[derive(Clone)]
struct HelloUpgrade {}
@ -44,30 +44,36 @@ impl UpgradeInfo for HelloUpgrade {
impl<C> InboundUpgrade<C> for HelloUpgrade
where
C: AsyncRead + AsyncWrite + Send + 'static
C: AsyncRead + AsyncWrite + Send + Unpin + 'static
{
type Output = Negotiated<C>;
type Error = io::Error;
type Future = Box<dyn Future<Item = Self::Output, Error = Self::Error> + Send>;
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
fn upgrade_inbound(self, socket: Negotiated<C>, _: Self::Info) -> Self::Future {
Box::new(nio::read_exact(socket, [0u8; 5]).map(|(io, buf)| {
fn upgrade_inbound(self, mut socket: Negotiated<C>, _: Self::Info) -> Self::Future {
Box::pin(async move {
let mut buf = [0u8; 5];
socket.read_exact(&mut buf).await.unwrap();
assert_eq!(&buf[..], "hello".as_bytes());
io
}))
Ok(socket)
})
}
}
impl<C> OutboundUpgrade<C> for HelloUpgrade
where
C: AsyncWrite + AsyncRead + Send + 'static,
C: AsyncWrite + AsyncRead + Send + Unpin + 'static,
{
type Output = Negotiated<C>;
type Error = io::Error;
type Future = Box<dyn Future<Item = Self::Output, Error = Self::Error> + Send>;
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
fn upgrade_outbound(self, socket: Negotiated<C>, _: Self::Info) -> Self::Future {
Box::new(nio::write_all(socket, "hello").map(|(io, _)| io))
fn upgrade_outbound(self, mut socket: Negotiated<C>, _: Self::Info) -> Self::Future {
Box::pin(async move {
socket.write_all(b"hello").await.unwrap();
socket.flush().await.unwrap();
Ok(socket)
})
}
}
@ -85,7 +91,7 @@ fn upgrade_pipeline() {
.and_then(|(peer, mplex), _| {
// Gracefully close the connection to allow protocol
// negotiation to complete.
util::CloseMuxer::new(mplex).map(move |mplex| (peer, mplex))
util::CloseMuxer::new(mplex).map_ok(move |mplex| (peer, mplex))
});
let dialer_keys = identity::Keypair::generate_ed25519();
@ -100,27 +106,31 @@ fn upgrade_pipeline() {
.and_then(|(peer, mplex), _| {
// Gracefully close the connection to allow protocol
// negotiation to complete.
util::CloseMuxer::new(mplex).map(move |mplex| (peer, mplex))
util::CloseMuxer::new(mplex).map_ok(move |mplex| (peer, mplex))
});
let listen_addr: Multiaddr = format!("/memory/{}", random::<u64>()).parse().unwrap();
let listener = listener_transport.listen_on(listen_addr.clone()).unwrap()
.filter_map(ListenerEvent::into_upgrade)
.for_each(move |(upgrade, _remote_addr)| {
let dialer = dialer_id.clone();
upgrade.map(move |(peer, _mplex)| {
assert_eq!(peer, dialer)
})
})
.map_err(|e| panic!("Listener error: {}", e));
let dialer = dialer_transport.dial(listen_addr).unwrap()
.map(move |(peer, _mplex)| {
assert_eq!(peer, listener_id)
async_std::task::spawn({
let listen_addr = listen_addr.clone();
let dialer_id = dialer_id.clone();
async move {
let mut listener = listener_transport.listen_on(listen_addr).unwrap();
loop {
let (upgrade, _remote_addr) = match listener.next().await.unwrap().unwrap().into_upgrade() {
Some(u) => u,
None => continue
};
let (peer, _mplex) = upgrade.await.unwrap();
assert_eq!(peer, dialer_id);
}
}
});
let mut rt = tokio::runtime::Runtime::new().unwrap();
rt.spawn(listener);
rt.block_on(dialer).unwrap()
async_std::task::block_on(async move {
let (peer, _mplex) = dialer_transport.dial(listen_addr).unwrap().await.unwrap();
assert_eq!(peer, listener_id);
});
}

View File

@ -29,11 +29,11 @@ where
{
type Output = Result<M, M::Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
loop {
match std::mem::replace(&mut self.state, CloseMuxerState::Done) {
CloseMuxerState::Close(muxer) => {
if muxer.close()?.is_not_ready() {
if !muxer.close(cx)?.is_ready() {
self.state = CloseMuxerState::Close(muxer);
return Poll::Pending
}
@ -45,3 +45,5 @@ where
}
}
impl<M> Unpin for CloseMuxer<M> {
}