diff --git a/protocols/request-response/CHANGELOG.md b/protocols/request-response/CHANGELOG.md index e26d112f..8de90b3c 100644 --- a/protocols/request-response/CHANGELOG.md +++ b/protocols/request-response/CHANGELOG.md @@ -2,9 +2,6 @@ - Fixed connection keep-alive, permitting connections to close due to inactivity. -- Added `RequestResponse::throttled` to wrap the behaviour into one that - enforces limits on inbound and outbound requests per peer. The limits - have to be known upfront by all nodes. - Bump `libp2p-core` and `libp2p-swarm` dependencies. # 0.1.1 diff --git a/protocols/request-response/src/lib.rs b/protocols/request-response/src/lib.rs index 2bd7e57d..e7a728b3 100644 --- a/protocols/request-response/src/lib.rs +++ b/protocols/request-response/src/lib.rs @@ -70,11 +70,13 @@ pub mod codec; pub mod handler; -pub mod throttled; + +// Disabled until #1706 is fixed: +// pub mod throttled; +// pub use throttled::Throttled; pub use codec::{RequestResponseCodec, ProtocolName}; pub use handler::ProtocolSupport; -pub use throttled::Throttled; use futures::{ channel::oneshot, @@ -311,10 +313,11 @@ where } } - /// Wrap this behaviour in [`Throttled`] to limit the number of concurrent requests per peer. - pub fn throttled(self) -> Throttled { - Throttled::new(self) - } +// Disabled until #1706 is fixed. +// /// Wrap this behaviour in [`Throttled`] to limit the number of concurrent requests per peer. +// pub fn throttled(self) -> Throttled { +// Throttled::new(self) +// } /// Initiates sending a request. /// diff --git a/protocols/request-response/tests/ping.rs b/protocols/request-response/tests/ping.rs index 0efec0a1..11a8601d 100644 --- a/protocols/request-response/tests/ping.rs +++ b/protocols/request-response/tests/ping.rs @@ -35,7 +35,8 @@ use libp2p_swarm::Swarm; use libp2p_tcp::TcpConfig; use futures::{prelude::*, channel::mpsc}; use rand::{self, Rng}; -use std::{collections::HashSet, io, iter, num::NonZeroU16}; +use std::{io, iter}; +// use std::{collections::HashSet, num::NonZeroU16}; // Disabled until #1706 is fixed. /// Exercises a simple ping protocol. #[test] @@ -116,201 +117,202 @@ fn ping_protocol() { let () = async_std::task::block_on(peer2); } -/// Like `ping_protocol`, but throttling concurrent requests. -#[test] -fn ping_protocol_throttled() { - let ping = Ping("ping".to_string().into_bytes()); - let pong = Pong("pong".to_string().into_bytes()); - - let protocols = iter::once((PingProtocol(), ProtocolSupport::Full)); - let cfg = RequestResponseConfig::default(); - - let (peer1_id, trans) = mk_transport(); - let ping_proto1 = RequestResponse::new(PingCodec(), protocols.clone(), cfg.clone()).throttled(); - let mut swarm1 = Swarm::new(trans, ping_proto1, peer1_id.clone()); - - let (peer2_id, trans) = mk_transport(); - let ping_proto2 = RequestResponse::new(PingCodec(), protocols, cfg).throttled(); - let mut swarm2 = Swarm::new(trans, ping_proto2, peer2_id.clone()); - - let (mut tx, mut rx) = mpsc::channel::(1); - - let addr = "/ip4/127.0.0.1/tcp/0".parse().unwrap(); - Swarm::listen_on(&mut swarm1, addr).unwrap(); - - let expected_ping = ping.clone(); - let expected_pong = pong.clone(); - - let limit: u16 = rand::thread_rng().gen_range(1, 10); - swarm1.set_default_limit(NonZeroU16::new(limit).unwrap()); - swarm2.set_default_limit(NonZeroU16::new(limit).unwrap()); - - let peer1 = async move { - while let Some(_) = swarm1.next().now_or_never() {} - - let l = Swarm::listeners(&swarm1).next().unwrap(); - tx.send(l.clone()).await.unwrap(); - - loop { - match swarm1.next().await { - throttled::Event::Event(RequestResponseEvent::Message { - peer, - message: RequestResponseMessage::Request { request, channel } - }) => { - assert_eq!(&request, &expected_ping); - assert_eq!(&peer, &peer2_id); - swarm1.send_response(channel, pong.clone()); - }, - e => panic!("Peer1: Unexpected event: {:?}", e) - } - } - }; - - let num_pings: u8 = rand::thread_rng().gen_range(1, 100); - - let peer2 = async move { - let mut count = 0; - let addr = rx.next().await.unwrap(); - swarm2.add_address(&peer1_id, addr.clone()); - let mut blocked = false; - let mut req_ids = HashSet::new(); - - loop { - if !blocked { - while let Some(id) = swarm2.send_request(&peer1_id, ping.clone()).ok() { - req_ids.insert(id); - } - blocked = true; - } - match swarm2.next().await { - throttled::Event::ResumeSending(peer) => { - assert_eq!(peer, peer1_id); - blocked = false - } - throttled::Event::Event(RequestResponseEvent::Message { - peer, - message: RequestResponseMessage::Response { request_id, response } - }) => { - count += 1; - assert_eq!(&response, &expected_pong); - assert_eq!(&peer, &peer1_id); - assert!(req_ids.remove(&request_id)); - if count >= num_pings { - break - } - } - e => panic!("Peer2: Unexpected event: {:?}", e) - } - } - }; - - async_std::task::spawn(Box::pin(peer1)); - let () = async_std::task::block_on(peer2); -} - -#[test] -fn ping_protocol_limit_violation() { - let ping = Ping("ping".to_string().into_bytes()); - let pong = Pong("pong".to_string().into_bytes()); - - let protocols = iter::once((PingProtocol(), ProtocolSupport::Full)); - let cfg = RequestResponseConfig::default(); - - let (peer1_id, trans) = mk_transport(); - let ping_proto1 = RequestResponse::new(PingCodec(), protocols.clone(), cfg.clone()).throttled(); - let mut swarm1 = Swarm::new(trans, ping_proto1, peer1_id.clone()); - - let (peer2_id, trans) = mk_transport(); - let ping_proto2 = RequestResponse::new(PingCodec(), protocols, cfg).throttled(); - let mut swarm2 = Swarm::new(trans, ping_proto2, peer2_id.clone()); - - let (mut tx, mut rx) = mpsc::channel::(1); - - let addr = "/ip4/127.0.0.1/tcp/0".parse().unwrap(); - Swarm::listen_on(&mut swarm1, addr).unwrap(); - - let expected_ping = ping.clone(); - let expected_pong = pong.clone(); - - swarm2.set_default_limit(NonZeroU16::new(2).unwrap()); - - let peer1 = async move { - while let Some(_) = swarm1.next().now_or_never() {} - - let l = Swarm::listeners(&swarm1).next().unwrap(); - tx.send(l.clone()).await.unwrap(); - - let mut pending_responses = Vec::new(); - - loop { - match swarm1.next().await { - throttled::Event::Event(RequestResponseEvent::Message { - peer, - message: RequestResponseMessage::Request { request, channel } - }) => { - assert_eq!(&request, &expected_ping); - assert_eq!(&peer, &peer2_id); - pending_responses.push((channel, pong.clone())); - }, - throttled::Event::TooManyInboundRequests(p) => { - assert_eq!(p, peer2_id); - break - } - e => panic!("Peer1: Unexpected event: {:?}", e) - } - if pending_responses.len() >= 2 { - for (channel, pong) in pending_responses.drain(..) { - swarm1.send_response(channel, pong) - } - } - } - }; - - let num_pings: u8 = rand::thread_rng().gen_range(1, 100); - - let peer2 = async move { - let mut count = 0; - let addr = rx.next().await.unwrap(); - swarm2.add_address(&peer1_id, addr.clone()); - let mut blocked = false; - let mut req_ids = HashSet::new(); - - loop { - if !blocked { - while let Some(id) = swarm2.send_request(&peer1_id, ping.clone()).ok() { - req_ids.insert(id); - } - blocked = true; - } - match swarm2.next().await { - throttled::Event::ResumeSending(peer) => { - assert_eq!(peer, peer1_id); - blocked = false - } - throttled::Event::Event(RequestResponseEvent::Message { - peer, - message: RequestResponseMessage::Response { request_id, response } - }) => { - count += 1; - assert_eq!(&response, &expected_pong); - assert_eq!(&peer, &peer1_id); - assert!(req_ids.remove(&request_id)); - if count >= num_pings { - break - } - } - throttled::Event::Event(RequestResponseEvent::OutboundFailure { error, .. }) => { - assert!(matches!(error, OutboundFailure::ConnectionClosed)); - break - } - e => panic!("Peer2: Unexpected event: {:?}", e) - } - } - }; - - async_std::task::spawn(Box::pin(peer1)); - let () = async_std::task::block_on(peer2); -} +// Disabled until #1706 is fixed. +///// Like `ping_protocol`, but throttling concurrent requests. +//#[test] +//fn ping_protocol_throttled() { +// let ping = Ping("ping".to_string().into_bytes()); +// let pong = Pong("pong".to_string().into_bytes()); +// +// let protocols = iter::once((PingProtocol(), ProtocolSupport::Full)); +// let cfg = RequestResponseConfig::default(); +// +// let (peer1_id, trans) = mk_transport(); +// let ping_proto1 = RequestResponse::new(PingCodec(), protocols.clone(), cfg.clone()).throttled(); +// let mut swarm1 = Swarm::new(trans, ping_proto1, peer1_id.clone()); +// +// let (peer2_id, trans) = mk_transport(); +// let ping_proto2 = RequestResponse::new(PingCodec(), protocols, cfg).throttled(); +// let mut swarm2 = Swarm::new(trans, ping_proto2, peer2_id.clone()); +// +// let (mut tx, mut rx) = mpsc::channel::(1); +// +// let addr = "/ip4/127.0.0.1/tcp/0".parse().unwrap(); +// Swarm::listen_on(&mut swarm1, addr).unwrap(); +// +// let expected_ping = ping.clone(); +// let expected_pong = pong.clone(); +// +// let limit: u16 = rand::thread_rng().gen_range(1, 10); +// swarm1.set_default_limit(NonZeroU16::new(limit).unwrap()); +// swarm2.set_default_limit(NonZeroU16::new(limit).unwrap()); +// +// let peer1 = async move { +// while let Some(_) = swarm1.next().now_or_never() {} +// +// let l = Swarm::listeners(&swarm1).next().unwrap(); +// tx.send(l.clone()).await.unwrap(); +// +// loop { +// match swarm1.next().await { +// throttled::Event::Event(RequestResponseEvent::Message { +// peer, +// message: RequestResponseMessage::Request { request, channel } +// }) => { +// assert_eq!(&request, &expected_ping); +// assert_eq!(&peer, &peer2_id); +// swarm1.send_response(channel, pong.clone()); +// }, +// e => panic!("Peer1: Unexpected event: {:?}", e) +// } +// } +// }; +// +// let num_pings: u8 = rand::thread_rng().gen_range(1, 100); +// +// let peer2 = async move { +// let mut count = 0; +// let addr = rx.next().await.unwrap(); +// swarm2.add_address(&peer1_id, addr.clone()); +// let mut blocked = false; +// let mut req_ids = HashSet::new(); +// +// loop { +// if !blocked { +// while let Some(id) = swarm2.send_request(&peer1_id, ping.clone()).ok() { +// req_ids.insert(id); +// } +// blocked = true; +// } +// match swarm2.next().await { +// throttled::Event::ResumeSending(peer) => { +// assert_eq!(peer, peer1_id); +// blocked = false +// } +// throttled::Event::Event(RequestResponseEvent::Message { +// peer, +// message: RequestResponseMessage::Response { request_id, response } +// }) => { +// count += 1; +// assert_eq!(&response, &expected_pong); +// assert_eq!(&peer, &peer1_id); +// assert!(req_ids.remove(&request_id)); +// if count >= num_pings { +// break +// } +// } +// e => panic!("Peer2: Unexpected event: {:?}", e) +// } +// } +// }; +// +// async_std::task::spawn(Box::pin(peer1)); +// let () = async_std::task::block_on(peer2); +//} +// +//#[test] +//fn ping_protocol_limit_violation() { +// let ping = Ping("ping".to_string().into_bytes()); +// let pong = Pong("pong".to_string().into_bytes()); +// +// let protocols = iter::once((PingProtocol(), ProtocolSupport::Full)); +// let cfg = RequestResponseConfig::default(); +// +// let (peer1_id, trans) = mk_transport(); +// let ping_proto1 = RequestResponse::new(PingCodec(), protocols.clone(), cfg.clone()).throttled(); +// let mut swarm1 = Swarm::new(trans, ping_proto1, peer1_id.clone()); +// +// let (peer2_id, trans) = mk_transport(); +// let ping_proto2 = RequestResponse::new(PingCodec(), protocols, cfg).throttled(); +// let mut swarm2 = Swarm::new(trans, ping_proto2, peer2_id.clone()); +// +// let (mut tx, mut rx) = mpsc::channel::(1); +// +// let addr = "/ip4/127.0.0.1/tcp/0".parse().unwrap(); +// Swarm::listen_on(&mut swarm1, addr).unwrap(); +// +// let expected_ping = ping.clone(); +// let expected_pong = pong.clone(); +// +// swarm2.set_default_limit(NonZeroU16::new(2).unwrap()); +// +// let peer1 = async move { +// while let Some(_) = swarm1.next().now_or_never() {} +// +// let l = Swarm::listeners(&swarm1).next().unwrap(); +// tx.send(l.clone()).await.unwrap(); +// +// let mut pending_responses = Vec::new(); +// +// loop { +// match swarm1.next().await { +// throttled::Event::Event(RequestResponseEvent::Message { +// peer, +// message: RequestResponseMessage::Request { request, channel } +// }) => { +// assert_eq!(&request, &expected_ping); +// assert_eq!(&peer, &peer2_id); +// pending_responses.push((channel, pong.clone())); +// }, +// throttled::Event::TooManyInboundRequests(p) => { +// assert_eq!(p, peer2_id); +// break +// } +// e => panic!("Peer1: Unexpected event: {:?}", e) +// } +// if pending_responses.len() >= 2 { +// for (channel, pong) in pending_responses.drain(..) { +// swarm1.send_response(channel, pong) +// } +// } +// } +// }; +// +// let num_pings: u8 = rand::thread_rng().gen_range(1, 100); +// +// let peer2 = async move { +// let mut count = 0; +// let addr = rx.next().await.unwrap(); +// swarm2.add_address(&peer1_id, addr.clone()); +// let mut blocked = false; +// let mut req_ids = HashSet::new(); +// +// loop { +// if !blocked { +// while let Some(id) = swarm2.send_request(&peer1_id, ping.clone()).ok() { +// req_ids.insert(id); +// } +// blocked = true; +// } +// match swarm2.next().await { +// throttled::Event::ResumeSending(peer) => { +// assert_eq!(peer, peer1_id); +// blocked = false +// } +// throttled::Event::Event(RequestResponseEvent::Message { +// peer, +// message: RequestResponseMessage::Response { request_id, response } +// }) => { +// count += 1; +// assert_eq!(&response, &expected_pong); +// assert_eq!(&peer, &peer1_id); +// assert!(req_ids.remove(&request_id)); +// if count >= num_pings { +// break +// } +// } +// throttled::Event::Event(RequestResponseEvent::OutboundFailure { error, .. }) => { +// assert!(matches!(error, OutboundFailure::ConnectionClosed)); +// break +// } +// e => panic!("Peer2: Unexpected event: {:?}", e) +// } +// } +// }; +// +// async_std::task::spawn(Box::pin(peer1)); +// let () = async_std::task::block_on(peer2); +//} fn mk_transport() -> (PeerId, Boxed<(PeerId, StreamMuxerBox), io::Error>) { let id_keys = identity::Keypair::generate_ed25519();