mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-24 15:21:33 +00:00
protocols/request-response: Close response stream even if response cannot be sent (#1987)
Co-authored-by: Roman Borschel <romanb@users.noreply.github.com> Co-authored-by: Max Inden <mail@max-inden.de>
This commit is contained in:
@ -106,6 +106,7 @@ where
|
|||||||
let write = self.codec.write_response(&protocol, &mut io, response);
|
let write = self.codec.write_response(&protocol, &mut io, response);
|
||||||
write.await?;
|
write.await?;
|
||||||
} else {
|
} else {
|
||||||
|
io.close().await?;
|
||||||
return Ok(false)
|
return Ok(false)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -163,7 +163,7 @@ pub enum RequestResponseEvent<TRequest, TResponse, TChannelResponse = TResponse>
|
|||||||
|
|
||||||
/// Possible failures occurring in the context of sending
|
/// Possible failures occurring in the context of sending
|
||||||
/// an outbound request and receiving the response.
|
/// an outbound request and receiving the response.
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone, PartialEq)]
|
||||||
pub enum OutboundFailure {
|
pub enum OutboundFailure {
|
||||||
/// The request could not be sent because a dialing attempt failed.
|
/// The request could not be sent because a dialing attempt failed.
|
||||||
DialFailure,
|
DialFailure,
|
||||||
@ -183,7 +183,7 @@ pub enum OutboundFailure {
|
|||||||
|
|
||||||
/// Possible failures occurring in the context of receiving an
|
/// Possible failures occurring in the context of receiving an
|
||||||
/// inbound request and sending a response.
|
/// inbound request and sending a response.
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone, PartialEq)]
|
||||||
pub enum InboundFailure {
|
pub enum InboundFailure {
|
||||||
/// The inbound request timed out, either while reading the
|
/// The inbound request timed out, either while reading the
|
||||||
/// incoming request or before a response is sent, e.g. if
|
/// incoming request or before a response is sent, e.g. if
|
||||||
|
@ -47,8 +47,8 @@ fn is_response_outbound() {
|
|||||||
let cfg = RequestResponseConfig::default();
|
let cfg = RequestResponseConfig::default();
|
||||||
|
|
||||||
let (peer1_id, trans) = mk_transport();
|
let (peer1_id, trans) = mk_transport();
|
||||||
let ping_proto1 = RequestResponse::new(PingCodec(), protocols.clone(), cfg.clone());
|
let ping_proto1 = RequestResponse::new(PingCodec(), protocols, cfg);
|
||||||
let mut swarm1 = Swarm::new(trans, ping_proto1, peer1_id.clone());
|
let mut swarm1 = Swarm::new(trans, ping_proto1, peer1_id);
|
||||||
|
|
||||||
let request_id1 = swarm1.send_request(&offline_peer, ping.clone());
|
let request_id1 = swarm1.send_request(&offline_peer, ping.clone());
|
||||||
|
|
||||||
@ -60,7 +60,7 @@ fn is_response_outbound() {
|
|||||||
e => panic!("Peer: Unexpected event: {:?}", e),
|
e => panic!("Peer: Unexpected event: {:?}", e),
|
||||||
}
|
}
|
||||||
|
|
||||||
let request_id2 = swarm1.send_request(&offline_peer, ping.clone());
|
let request_id2 = swarm1.send_request(&offline_peer, ping);
|
||||||
|
|
||||||
assert!(!swarm1.is_pending_outbound(&offline_peer, &request_id1));
|
assert!(!swarm1.is_pending_outbound(&offline_peer, &request_id1));
|
||||||
assert!(swarm1.is_pending_outbound(&offline_peer, &request_id2));
|
assert!(swarm1.is_pending_outbound(&offline_peer, &request_id2));
|
||||||
@ -77,11 +77,11 @@ fn ping_protocol() {
|
|||||||
|
|
||||||
let (peer1_id, trans) = mk_transport();
|
let (peer1_id, trans) = mk_transport();
|
||||||
let ping_proto1 = RequestResponse::new(PingCodec(), protocols.clone(), cfg.clone());
|
let ping_proto1 = RequestResponse::new(PingCodec(), protocols.clone(), cfg.clone());
|
||||||
let mut swarm1 = Swarm::new(trans, ping_proto1, peer1_id.clone());
|
let mut swarm1 = Swarm::new(trans, ping_proto1, peer1_id);
|
||||||
|
|
||||||
let (peer2_id, trans) = mk_transport();
|
let (peer2_id, trans) = mk_transport();
|
||||||
let ping_proto2 = RequestResponse::new(PingCodec(), protocols, cfg);
|
let ping_proto2 = RequestResponse::new(PingCodec(), protocols, cfg);
|
||||||
let mut swarm2 = Swarm::new(trans, ping_proto2, peer2_id.clone());
|
let mut swarm2 = Swarm::new(trans, ping_proto2, peer2_id);
|
||||||
|
|
||||||
let (mut tx, mut rx) = mpsc::channel::<Multiaddr>(1);
|
let (mut tx, mut rx) = mpsc::channel::<Multiaddr>(1);
|
||||||
|
|
||||||
@ -157,17 +157,17 @@ fn emits_inbound_connection_closed_failure() {
|
|||||||
|
|
||||||
let (peer1_id, trans) = mk_transport();
|
let (peer1_id, trans) = mk_transport();
|
||||||
let ping_proto1 = RequestResponse::new(PingCodec(), protocols.clone(), cfg.clone());
|
let ping_proto1 = RequestResponse::new(PingCodec(), protocols.clone(), cfg.clone());
|
||||||
let mut swarm1 = Swarm::new(trans, ping_proto1, peer1_id.clone());
|
let mut swarm1 = Swarm::new(trans, ping_proto1, peer1_id);
|
||||||
|
|
||||||
let (peer2_id, trans) = mk_transport();
|
let (peer2_id, trans) = mk_transport();
|
||||||
let ping_proto2 = RequestResponse::new(PingCodec(), protocols, cfg);
|
let ping_proto2 = RequestResponse::new(PingCodec(), protocols, cfg);
|
||||||
let mut swarm2 = Swarm::new(trans, ping_proto2, peer2_id.clone());
|
let mut swarm2 = Swarm::new(trans, ping_proto2, peer2_id);
|
||||||
|
|
||||||
let addr = "/ip4/127.0.0.1/tcp/0".parse().unwrap();
|
let addr = "/ip4/127.0.0.1/tcp/0".parse().unwrap();
|
||||||
Swarm::listen_on(&mut swarm1, addr).unwrap();
|
Swarm::listen_on(&mut swarm1, addr).unwrap();
|
||||||
|
|
||||||
futures::executor::block_on(async move {
|
futures::executor::block_on(async move {
|
||||||
while let Some(_) = swarm1.next().now_or_never() {}
|
while swarm1.next().now_or_never().is_some() {}
|
||||||
let addr1 = Swarm::listeners(&swarm1).next().unwrap();
|
let addr1 = Swarm::listeners(&swarm1).next().unwrap();
|
||||||
|
|
||||||
swarm2.add_address(&peer1_id, addr1.clone());
|
swarm2.add_address(&peer1_id, addr1.clone());
|
||||||
@ -201,6 +201,64 @@ fn emits_inbound_connection_closed_failure() {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// We expect the substream to be properly closed when response channel is dropped.
|
||||||
|
/// Since the ping protocol used here expects a response, the sender considers this
|
||||||
|
/// early close as a protocol violation which results in the connection being closed.
|
||||||
|
/// If the substream were not properly closed when dropped, the sender would instead
|
||||||
|
/// run into a timeout waiting for the response.
|
||||||
|
#[test]
|
||||||
|
fn emits_inbound_connection_closed_if_channel_is_dropped() {
|
||||||
|
let ping = Ping("ping".to_string().into_bytes());
|
||||||
|
|
||||||
|
let protocols = iter::once((PingProtocol(), ProtocolSupport::Full));
|
||||||
|
let cfg = RequestResponseConfig::default();
|
||||||
|
|
||||||
|
let (peer1_id, trans) = mk_transport();
|
||||||
|
let ping_proto1 = RequestResponse::new(PingCodec(), protocols.clone(), cfg.clone());
|
||||||
|
let mut swarm1 = Swarm::new(trans, ping_proto1, peer1_id);
|
||||||
|
|
||||||
|
let (peer2_id, trans) = mk_transport();
|
||||||
|
let ping_proto2 = RequestResponse::new(PingCodec(), protocols, cfg);
|
||||||
|
let mut swarm2 = Swarm::new(trans, ping_proto2, peer2_id);
|
||||||
|
|
||||||
|
let addr = "/ip4/127.0.0.1/tcp/0".parse().unwrap();
|
||||||
|
Swarm::listen_on(&mut swarm1, addr).unwrap();
|
||||||
|
|
||||||
|
futures::executor::block_on(async move {
|
||||||
|
while swarm1.next().now_or_never().is_some() {}
|
||||||
|
let addr1 = Swarm::listeners(&swarm1).next().unwrap();
|
||||||
|
|
||||||
|
swarm2.add_address(&peer1_id, addr1.clone());
|
||||||
|
swarm2.send_request(&peer1_id, ping.clone());
|
||||||
|
|
||||||
|
// Wait for swarm 1 to receive request by swarm 2.
|
||||||
|
let event = loop {
|
||||||
|
futures::select!(
|
||||||
|
event = swarm1.next().fuse() => if let RequestResponseEvent::Message {
|
||||||
|
peer,
|
||||||
|
message: RequestResponseMessage::Request { request, channel, .. }
|
||||||
|
} = event {
|
||||||
|
assert_eq!(&request, &ping);
|
||||||
|
assert_eq!(&peer, &peer2_id);
|
||||||
|
|
||||||
|
drop(channel);
|
||||||
|
continue;
|
||||||
|
},
|
||||||
|
event = swarm2.next().fuse() => {
|
||||||
|
break event;
|
||||||
|
},
|
||||||
|
)
|
||||||
|
};
|
||||||
|
|
||||||
|
let error = match event {
|
||||||
|
RequestResponseEvent::OutboundFailure { error, .. } => error,
|
||||||
|
e => panic!("unexpected event from peer 2: {:?}", e)
|
||||||
|
};
|
||||||
|
|
||||||
|
assert_eq!(error, OutboundFailure::ConnectionClosed);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn ping_protocol_throttled() {
|
fn ping_protocol_throttled() {
|
||||||
let ping = Ping("ping".to_string().into_bytes());
|
let ping = Ping("ping".to_string().into_bytes());
|
||||||
@ -211,11 +269,11 @@ fn ping_protocol_throttled() {
|
|||||||
|
|
||||||
let (peer1_id, trans) = mk_transport();
|
let (peer1_id, trans) = mk_transport();
|
||||||
let ping_proto1 = RequestResponse::throttled(PingCodec(), protocols.clone(), cfg.clone());
|
let ping_proto1 = RequestResponse::throttled(PingCodec(), protocols.clone(), cfg.clone());
|
||||||
let mut swarm1 = Swarm::new(trans, ping_proto1, peer1_id.clone());
|
let mut swarm1 = Swarm::new(trans, ping_proto1, peer1_id);
|
||||||
|
|
||||||
let (peer2_id, trans) = mk_transport();
|
let (peer2_id, trans) = mk_transport();
|
||||||
let ping_proto2 = RequestResponse::throttled(PingCodec(), protocols, cfg);
|
let ping_proto2 = RequestResponse::throttled(PingCodec(), protocols, cfg);
|
||||||
let mut swarm2 = Swarm::new(trans, ping_proto2, peer2_id.clone());
|
let mut swarm2 = Swarm::new(trans, ping_proto2, peer2_id);
|
||||||
|
|
||||||
let (mut tx, mut rx) = mpsc::channel::<Multiaddr>(1);
|
let (mut tx, mut rx) = mpsc::channel::<Multiaddr>(1);
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user