Disable RequestResponse::throttled. (#1711)

Can be enabled again after #1706 is resolved.

Co-authored-by: Roman Borschel <romanb@users.noreply.github.com>
This commit is contained in:
Toralf Wittner
2020-08-18 14:29:38 +02:00
committed by GitHub
parent e1df920703
commit b4ad2d6297
3 changed files with 207 additions and 205 deletions

View File

@ -2,9 +2,6 @@
- Fixed connection keep-alive, permitting connections to close due - Fixed connection keep-alive, permitting connections to close due
to inactivity. to inactivity.
- Added `RequestResponse::throttled` to wrap the behaviour into one that
enforces limits on inbound and outbound requests per peer. The limits
have to be known upfront by all nodes.
- Bump `libp2p-core` and `libp2p-swarm` dependencies. - Bump `libp2p-core` and `libp2p-swarm` dependencies.
# 0.1.1 # 0.1.1

View File

@ -70,11 +70,13 @@
pub mod codec; pub mod codec;
pub mod handler; pub mod handler;
pub mod throttled;
// Disabled until #1706 is fixed:
// pub mod throttled;
// pub use throttled::Throttled;
pub use codec::{RequestResponseCodec, ProtocolName}; pub use codec::{RequestResponseCodec, ProtocolName};
pub use handler::ProtocolSupport; pub use handler::ProtocolSupport;
pub use throttled::Throttled;
use futures::{ use futures::{
channel::oneshot, channel::oneshot,
@ -311,10 +313,11 @@ where
} }
} }
/// Wrap this behaviour in [`Throttled`] to limit the number of concurrent requests per peer. // Disabled until #1706 is fixed.
pub fn throttled(self) -> Throttled<TCodec> { // /// Wrap this behaviour in [`Throttled`] to limit the number of concurrent requests per peer.
Throttled::new(self) // pub fn throttled(self) -> Throttled<TCodec> {
} // Throttled::new(self)
// }
/// Initiates sending a request. /// Initiates sending a request.
/// ///

View File

@ -35,7 +35,8 @@ use libp2p_swarm::Swarm;
use libp2p_tcp::TcpConfig; use libp2p_tcp::TcpConfig;
use futures::{prelude::*, channel::mpsc}; use futures::{prelude::*, channel::mpsc};
use rand::{self, Rng}; use rand::{self, Rng};
use std::{collections::HashSet, io, iter, num::NonZeroU16}; use std::{io, iter};
// use std::{collections::HashSet, num::NonZeroU16}; // Disabled until #1706 is fixed.
/// Exercises a simple ping protocol. /// Exercises a simple ping protocol.
#[test] #[test]
@ -116,201 +117,202 @@ fn ping_protocol() {
let () = async_std::task::block_on(peer2); let () = async_std::task::block_on(peer2);
} }
/// Like `ping_protocol`, but throttling concurrent requests. // Disabled until #1706 is fixed.
#[test] ///// Like `ping_protocol`, but throttling concurrent requests.
fn ping_protocol_throttled() { //#[test]
let ping = Ping("ping".to_string().into_bytes()); //fn ping_protocol_throttled() {
let pong = Pong("pong".to_string().into_bytes()); // let ping = Ping("ping".to_string().into_bytes());
// let pong = Pong("pong".to_string().into_bytes());
let protocols = iter::once((PingProtocol(), ProtocolSupport::Full)); //
let cfg = RequestResponseConfig::default(); // let protocols = iter::once((PingProtocol(), ProtocolSupport::Full));
// let cfg = RequestResponseConfig::default();
let (peer1_id, trans) = mk_transport(); //
let ping_proto1 = RequestResponse::new(PingCodec(), protocols.clone(), cfg.clone()).throttled(); // let (peer1_id, trans) = mk_transport();
let mut swarm1 = Swarm::new(trans, ping_proto1, peer1_id.clone()); // let ping_proto1 = RequestResponse::new(PingCodec(), protocols.clone(), cfg.clone()).throttled();
// let mut swarm1 = Swarm::new(trans, ping_proto1, peer1_id.clone());
let (peer2_id, trans) = mk_transport(); //
let ping_proto2 = RequestResponse::new(PingCodec(), protocols, cfg).throttled(); // let (peer2_id, trans) = mk_transport();
let mut swarm2 = Swarm::new(trans, ping_proto2, peer2_id.clone()); // let ping_proto2 = RequestResponse::new(PingCodec(), protocols, cfg).throttled();
// let mut swarm2 = Swarm::new(trans, ping_proto2, peer2_id.clone());
let (mut tx, mut rx) = mpsc::channel::<Multiaddr>(1); //
// let (mut tx, mut rx) = mpsc::channel::<Multiaddr>(1);
let addr = "/ip4/127.0.0.1/tcp/0".parse().unwrap(); //
Swarm::listen_on(&mut swarm1, addr).unwrap(); // let addr = "/ip4/127.0.0.1/tcp/0".parse().unwrap();
// Swarm::listen_on(&mut swarm1, addr).unwrap();
let expected_ping = ping.clone(); //
let expected_pong = pong.clone(); // let expected_ping = ping.clone();
// let expected_pong = pong.clone();
let limit: u16 = rand::thread_rng().gen_range(1, 10); //
swarm1.set_default_limit(NonZeroU16::new(limit).unwrap()); // let limit: u16 = rand::thread_rng().gen_range(1, 10);
swarm2.set_default_limit(NonZeroU16::new(limit).unwrap()); // swarm1.set_default_limit(NonZeroU16::new(limit).unwrap());
// swarm2.set_default_limit(NonZeroU16::new(limit).unwrap());
let peer1 = async move { //
while let Some(_) = swarm1.next().now_or_never() {} // let peer1 = async move {
// while let Some(_) = swarm1.next().now_or_never() {}
let l = Swarm::listeners(&swarm1).next().unwrap(); //
tx.send(l.clone()).await.unwrap(); // let l = Swarm::listeners(&swarm1).next().unwrap();
// tx.send(l.clone()).await.unwrap();
loop { //
match swarm1.next().await { // loop {
throttled::Event::Event(RequestResponseEvent::Message { // match swarm1.next().await {
peer, // throttled::Event::Event(RequestResponseEvent::Message {
message: RequestResponseMessage::Request { request, channel } // peer,
}) => { // message: RequestResponseMessage::Request { request, channel }
assert_eq!(&request, &expected_ping); // }) => {
assert_eq!(&peer, &peer2_id); // assert_eq!(&request, &expected_ping);
swarm1.send_response(channel, pong.clone()); // assert_eq!(&peer, &peer2_id);
}, // swarm1.send_response(channel, pong.clone());
e => panic!("Peer1: Unexpected event: {:?}", e) // },
} // e => panic!("Peer1: Unexpected event: {:?}", e)
} // }
}; // }
// };
let num_pings: u8 = rand::thread_rng().gen_range(1, 100); //
// let num_pings: u8 = rand::thread_rng().gen_range(1, 100);
let peer2 = async move { //
let mut count = 0; // let peer2 = async move {
let addr = rx.next().await.unwrap(); // let mut count = 0;
swarm2.add_address(&peer1_id, addr.clone()); // let addr = rx.next().await.unwrap();
let mut blocked = false; // swarm2.add_address(&peer1_id, addr.clone());
let mut req_ids = HashSet::new(); // let mut blocked = false;
// let mut req_ids = HashSet::new();
loop { //
if !blocked { // loop {
while let Some(id) = swarm2.send_request(&peer1_id, ping.clone()).ok() { // if !blocked {
req_ids.insert(id); // while let Some(id) = swarm2.send_request(&peer1_id, ping.clone()).ok() {
} // req_ids.insert(id);
blocked = true; // }
} // blocked = true;
match swarm2.next().await { // }
throttled::Event::ResumeSending(peer) => { // match swarm2.next().await {
assert_eq!(peer, peer1_id); // throttled::Event::ResumeSending(peer) => {
blocked = false // assert_eq!(peer, peer1_id);
} // blocked = false
throttled::Event::Event(RequestResponseEvent::Message { // }
peer, // throttled::Event::Event(RequestResponseEvent::Message {
message: RequestResponseMessage::Response { request_id, response } // peer,
}) => { // message: RequestResponseMessage::Response { request_id, response }
count += 1; // }) => {
assert_eq!(&response, &expected_pong); // count += 1;
assert_eq!(&peer, &peer1_id); // assert_eq!(&response, &expected_pong);
assert!(req_ids.remove(&request_id)); // assert_eq!(&peer, &peer1_id);
if count >= num_pings { // assert!(req_ids.remove(&request_id));
break // if count >= num_pings {
} // break
} // }
e => panic!("Peer2: Unexpected event: {:?}", e) // }
} // e => panic!("Peer2: Unexpected event: {:?}", e)
} // }
}; // }
// };
async_std::task::spawn(Box::pin(peer1)); //
let () = async_std::task::block_on(peer2); // async_std::task::spawn(Box::pin(peer1));
} // let () = async_std::task::block_on(peer2);
//}
#[test] //
fn ping_protocol_limit_violation() { //#[test]
let ping = Ping("ping".to_string().into_bytes()); //fn ping_protocol_limit_violation() {
let pong = Pong("pong".to_string().into_bytes()); // let ping = Ping("ping".to_string().into_bytes());
// let pong = Pong("pong".to_string().into_bytes());
let protocols = iter::once((PingProtocol(), ProtocolSupport::Full)); //
let cfg = RequestResponseConfig::default(); // let protocols = iter::once((PingProtocol(), ProtocolSupport::Full));
// let cfg = RequestResponseConfig::default();
let (peer1_id, trans) = mk_transport(); //
let ping_proto1 = RequestResponse::new(PingCodec(), protocols.clone(), cfg.clone()).throttled(); // let (peer1_id, trans) = mk_transport();
let mut swarm1 = Swarm::new(trans, ping_proto1, peer1_id.clone()); // let ping_proto1 = RequestResponse::new(PingCodec(), protocols.clone(), cfg.clone()).throttled();
// let mut swarm1 = Swarm::new(trans, ping_proto1, peer1_id.clone());
let (peer2_id, trans) = mk_transport(); //
let ping_proto2 = RequestResponse::new(PingCodec(), protocols, cfg).throttled(); // let (peer2_id, trans) = mk_transport();
let mut swarm2 = Swarm::new(trans, ping_proto2, peer2_id.clone()); // let ping_proto2 = RequestResponse::new(PingCodec(), protocols, cfg).throttled();
// let mut swarm2 = Swarm::new(trans, ping_proto2, peer2_id.clone());
let (mut tx, mut rx) = mpsc::channel::<Multiaddr>(1); //
// let (mut tx, mut rx) = mpsc::channel::<Multiaddr>(1);
let addr = "/ip4/127.0.0.1/tcp/0".parse().unwrap(); //
Swarm::listen_on(&mut swarm1, addr).unwrap(); // let addr = "/ip4/127.0.0.1/tcp/0".parse().unwrap();
// Swarm::listen_on(&mut swarm1, addr).unwrap();
let expected_ping = ping.clone(); //
let expected_pong = pong.clone(); // let expected_ping = ping.clone();
// let expected_pong = pong.clone();
swarm2.set_default_limit(NonZeroU16::new(2).unwrap()); //
// swarm2.set_default_limit(NonZeroU16::new(2).unwrap());
let peer1 = async move { //
while let Some(_) = swarm1.next().now_or_never() {} // let peer1 = async move {
// while let Some(_) = swarm1.next().now_or_never() {}
let l = Swarm::listeners(&swarm1).next().unwrap(); //
tx.send(l.clone()).await.unwrap(); // let l = Swarm::listeners(&swarm1).next().unwrap();
// tx.send(l.clone()).await.unwrap();
let mut pending_responses = Vec::new(); //
// let mut pending_responses = Vec::new();
loop { //
match swarm1.next().await { // loop {
throttled::Event::Event(RequestResponseEvent::Message { // match swarm1.next().await {
peer, // throttled::Event::Event(RequestResponseEvent::Message {
message: RequestResponseMessage::Request { request, channel } // peer,
}) => { // message: RequestResponseMessage::Request { request, channel }
assert_eq!(&request, &expected_ping); // }) => {
assert_eq!(&peer, &peer2_id); // assert_eq!(&request, &expected_ping);
pending_responses.push((channel, pong.clone())); // assert_eq!(&peer, &peer2_id);
}, // pending_responses.push((channel, pong.clone()));
throttled::Event::TooManyInboundRequests(p) => { // },
assert_eq!(p, peer2_id); // throttled::Event::TooManyInboundRequests(p) => {
break // assert_eq!(p, peer2_id);
} // break
e => panic!("Peer1: Unexpected event: {:?}", e) // }
} // e => panic!("Peer1: Unexpected event: {:?}", e)
if pending_responses.len() >= 2 { // }
for (channel, pong) in pending_responses.drain(..) { // if pending_responses.len() >= 2 {
swarm1.send_response(channel, pong) // for (channel, pong) in pending_responses.drain(..) {
} // swarm1.send_response(channel, pong)
} // }
} // }
}; // }
// };
let num_pings: u8 = rand::thread_rng().gen_range(1, 100); //
// let num_pings: u8 = rand::thread_rng().gen_range(1, 100);
let peer2 = async move { //
let mut count = 0; // let peer2 = async move {
let addr = rx.next().await.unwrap(); // let mut count = 0;
swarm2.add_address(&peer1_id, addr.clone()); // let addr = rx.next().await.unwrap();
let mut blocked = false; // swarm2.add_address(&peer1_id, addr.clone());
let mut req_ids = HashSet::new(); // let mut blocked = false;
// let mut req_ids = HashSet::new();
loop { //
if !blocked { // loop {
while let Some(id) = swarm2.send_request(&peer1_id, ping.clone()).ok() { // if !blocked {
req_ids.insert(id); // while let Some(id) = swarm2.send_request(&peer1_id, ping.clone()).ok() {
} // req_ids.insert(id);
blocked = true; // }
} // blocked = true;
match swarm2.next().await { // }
throttled::Event::ResumeSending(peer) => { // match swarm2.next().await {
assert_eq!(peer, peer1_id); // throttled::Event::ResumeSending(peer) => {
blocked = false // assert_eq!(peer, peer1_id);
} // blocked = false
throttled::Event::Event(RequestResponseEvent::Message { // }
peer, // throttled::Event::Event(RequestResponseEvent::Message {
message: RequestResponseMessage::Response { request_id, response } // peer,
}) => { // message: RequestResponseMessage::Response { request_id, response }
count += 1; // }) => {
assert_eq!(&response, &expected_pong); // count += 1;
assert_eq!(&peer, &peer1_id); // assert_eq!(&response, &expected_pong);
assert!(req_ids.remove(&request_id)); // assert_eq!(&peer, &peer1_id);
if count >= num_pings { // assert!(req_ids.remove(&request_id));
break // if count >= num_pings {
} // break
} // }
throttled::Event::Event(RequestResponseEvent::OutboundFailure { error, .. }) => { // }
assert!(matches!(error, OutboundFailure::ConnectionClosed)); // throttled::Event::Event(RequestResponseEvent::OutboundFailure { error, .. }) => {
break // assert!(matches!(error, OutboundFailure::ConnectionClosed));
} // break
e => panic!("Peer2: Unexpected event: {:?}", e) // }
} // e => panic!("Peer2: Unexpected event: {:?}", e)
} // }
}; // }
// };
async_std::task::spawn(Box::pin(peer1)); //
let () = async_std::task::block_on(peer2); // async_std::task::spawn(Box::pin(peer1));
} // let () = async_std::task::block_on(peer2);
//}
fn mk_transport() -> (PeerId, Boxed<(PeerId, StreamMuxerBox), io::Error>) { fn mk_transport() -> (PeerId, Boxed<(PeerId, StreamMuxerBox), io::Error>) {
let id_keys = identity::Keypair::generate_ed25519(); let id_keys = identity::Keypair::generate_ed25519();