diff --git a/protocols/request-response/CHANGELOG.md b/protocols/request-response/CHANGELOG.md index 3f993ea9..7685ecab 100644 --- a/protocols/request-response/CHANGELOG.md +++ b/protocols/request-response/CHANGELOG.md @@ -2,6 +2,10 @@ - Update `libp2p-swarm` and `libp2p-core`. +- Emit `InboundFailure::ConnectionClosed` for inbound requests that failed due + to the underlying connection closing. + [PR 1886](https://github.com/libp2p/rust-libp2p/pull/1886). + # 0.7.0 [2020-12-08] - Refine emitted events for inbound requests, introducing diff --git a/protocols/request-response/src/lib.rs b/protocols/request-response/src/lib.rs index 286c3f29..0a106210 100644 --- a/protocols/request-response/src/lib.rs +++ b/protocols/request-response/src/lib.rs @@ -87,7 +87,7 @@ use libp2p_swarm::{ }; use smallvec::SmallVec; use std::{ - collections::{VecDeque, HashMap}, + collections::{HashMap, HashSet, VecDeque}, fmt, time::Duration, sync::{atomic::AtomicU64, Arc}, @@ -141,14 +141,6 @@ pub enum RequestResponseEvent error: OutboundFailure, }, /// An inbound request failed. - /// - /// > **Note**: The case whereby a connection on which a response is sent - /// > closes after [`RequestResponse::send_response`] already succeeded - /// > but before the response could be sent on the connection is reflected - /// > by there being no [`RequestResponseEvent::ResponseSent`] event. - /// > Code interested in ensuring a response has been successfully - /// > handed to the transport layer, e.g. before continuing with the next - /// > step in a multi-step protocol, should listen to these events. InboundFailure { /// The peer from whom the request was received. peer: PeerId, @@ -198,6 +190,8 @@ pub enum InboundFailure { /// [`RequestResponse::send_response`] is not called in a /// timely manner. Timeout, + /// The connection closed before a response could be send. + ConnectionClosed, /// The local peer supports none of the protocols requested /// by the remote. UnsupportedProtocols, @@ -297,15 +291,14 @@ where NetworkBehaviourAction< RequestProtocol, RequestResponseEvent>>, - /// The currently connected peers and their known, reachable addresses, if any. + /// The currently connected peers, their pending outbound and inbound responses and their known, + /// reachable addresses, if any. connected: HashMap>, /// Externally managed addresses via `add_address` and `remove_address`. addresses: HashMap>, /// Requests that have not yet been sent and are waiting for a connection /// to be established. - pending_requests: HashMap; 10]>>, - /// Responses that have not yet been received. - pending_responses: HashMap + pending_outbound_requests: HashMap; 10]>>, } impl RequestResponse @@ -337,8 +330,7 @@ where codec, pending_events: VecDeque::new(), connected: HashMap::new(), - pending_requests: HashMap::new(), - pending_responses: HashMap::new(), + pending_outbound_requests: HashMap::new(), addresses: HashMap::new(), } } @@ -382,7 +374,7 @@ where peer_id: peer.clone(), condition: DialPeerCondition::Disconnected, }); - self.pending_requests.entry(peer.clone()).or_default().push(request); + self.pending_outbound_requests.entry(peer.clone()).or_default().push(request); } request_id @@ -390,11 +382,12 @@ where /// Initiates sending a response to an inbound request. /// - /// If the `ResponseChannel` is already closed due to a timeout or - /// the connection being closed, the response is returned as an `Err` - /// for further handling. Once the response has been successfully sent - /// on the corresponding connection, [`RequestResponseEvent::ResponseSent`] - /// is emitted. + /// If the [`ResponseChannel`] is already closed due to a timeout or the + /// connection being closed, the response is returned as an `Err` for + /// further handling. Once the response has been successfully sent on the + /// corresponding connection, [`RequestResponseEvent::ResponseSent`] is + /// emitted. In all other cases [`RequestResponseEvent::InboundFailure`] + /// will be or has been emitted. /// /// The provided `ResponseChannel` is obtained from an inbound /// [`RequestResponseMessage::Request`]. @@ -434,11 +427,22 @@ where } } - /// Checks whether an outbound request initiated by - /// [`RequestResponse::send_request`] is still pending, i.e. waiting - /// for a response. - pub fn is_pending_outbound(&self, req_id: &RequestId) -> bool { - self.pending_responses.contains_key(req_id) + /// Checks whether an outbound request to the peer with the provided + /// [`PeerId`] initiated by [`RequestResponse::send_request`] is still + /// pending, i.e. waiting for a response. + pub fn is_pending_outbound(&self, peer: &PeerId, request_id: &RequestId) -> bool { + self.connected.get(peer) + .map(|cs| cs.iter().any(|c| c.pending_inbound_responses.contains(request_id))) + .unwrap_or(false) + } + + /// Checks whether an inbound request from the peer with the provided + /// [`PeerId`] is still pending, i.e. waiting for a response by the local + /// node through [`RequestResponse::send_response`]. + pub fn is_pending_inbound(&self, peer: &PeerId, request_id: &RequestId) -> bool { + self.connected.get(peer) + .map(|cs| cs.iter().any(|c| c.pending_outbound_responses.contains(request_id))) + .unwrap_or(false) } /// Returns the next request ID. @@ -454,16 +458,16 @@ where fn try_send_request(&mut self, peer: &PeerId, request: RequestProtocol) -> Option> { - if let Some(connections) = self.connected.get(peer) { + if let Some(connections) = self.connected.get_mut(peer) { if connections.is_empty() { return Some(request) } let ix = (request.request_id.0 as usize) % connections.len(); - let conn = connections[ix].id; - self.pending_responses.insert(request.request_id, (peer.clone(), conn)); + let conn = &mut connections[ix]; + conn.pending_inbound_responses.insert(request.request_id); self.pending_events.push_back(NetworkBehaviourAction::NotifyHandler { peer_id: peer.clone(), - handler: NotifyHandler::One(conn), + handler: NotifyHandler::One(conn.id), event: request }); None @@ -471,6 +475,50 @@ where Some(request) } } + + /// Remove pending outbound response for the given peer and connection. + /// + /// Returns `true` if the provided connection to the given peer is still + /// alive and the [`RequestId`] was previously present and is now removed. + /// Returns `false` otherwise. + fn remove_pending_outbound_response( + &mut self, + peer: &PeerId, + connection: ConnectionId, + request: RequestId, + ) -> bool { + self.get_connection_mut(peer, connection) + .map(|c| c.pending_outbound_responses.remove(&request)) + .unwrap_or(false) + } + + /// Remove pending inbound response for the given peer and connection. + /// + /// Returns `true` if the provided connection to the given peer is still + /// alive and the [`RequestId`] was previously present and is now removed. + /// Returns `false` otherwise. + fn remove_pending_inbound_response( + &mut self, + peer: &PeerId, + connection: ConnectionId, + request: &RequestId, + ) -> bool { + self.get_connection_mut(peer, connection) + .map(|c| c.pending_inbound_responses.remove(request)) + .unwrap_or(false) + } + + /// Returns a mutable reference to the connection in `self.connected` + /// corresponding to the given [`PeerId`] and [`ConnectionId`]. + fn get_connection_mut( + &mut self, + peer: &PeerId, + connection: ConnectionId, + ) -> Option<&mut Connection> { + self.connected.get_mut(peer).and_then(|connections| { + connections.iter_mut().find(|c| c.id == connection) + }) + } } impl NetworkBehaviour for RequestResponse @@ -502,7 +550,7 @@ where } fn inject_connected(&mut self, peer: &PeerId) { - if let Some(pending) = self.pending_requests.remove(peer) { + if let Some(pending) = self.pending_outbound_requests.remove(peer) { for request in pending { let request = self.try_send_request(peer, request); assert!(request.is_none()); @@ -515,33 +563,44 @@ where ConnectedPoint::Dialer { address } => Some(address.clone()), ConnectedPoint::Listener { .. } => None }; - let connections = self.connected.entry(peer.clone()).or_default(); - connections.push(Connection { id: *conn, address }) + self.connected.entry(peer.clone()) + .or_default() + .push(Connection::new(*conn, address)); } - fn inject_connection_closed(&mut self, peer: &PeerId, conn: &ConnectionId, _: &ConnectedPoint) { - if let Some(connections) = self.connected.get_mut(peer) { - if let Some(pos) = connections.iter().position(|c| &c.id == conn) { - connections.remove(pos); - } + fn inject_connection_closed(&mut self, peer_id: &PeerId, conn: &ConnectionId, _: &ConnectedPoint) { + let connections = self.connected.get_mut(peer_id) + .expect("Expected some established connection to peer before closing."); + + let connection = connections.iter() + .position(|c| &c.id == conn) + .map(|p: usize| connections.remove(p)) + .expect("Expected connection to be established before closing."); + + if connections.is_empty() { + self.connected.remove(peer_id); } - let pending_events = &mut self.pending_events; + for request_id in connection.pending_outbound_responses { + self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent( + RequestResponseEvent::InboundFailure { + peer: peer_id.clone(), + request_id, + error: InboundFailure::ConnectionClosed + } + )); - // Any pending responses of requests sent over this connection must be considered failed. - self.pending_responses.retain(|rid, (peer, cid)| { - if conn != cid { - return true - } - pending_events.push_back(NetworkBehaviourAction::GenerateEvent( + } + + for request_id in connection.pending_inbound_responses { + self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent( RequestResponseEvent::OutboundFailure { - peer: peer.clone(), - request_id: *rid, + peer: peer_id.clone(), + request_id, error: OutboundFailure::ConnectionClosed } )); - false - }); + } } fn inject_disconnected(&mut self, peer: &PeerId) { @@ -555,7 +614,7 @@ where // only created when a peer is not connected when a request is made. // Thus these requests must be considered failed, even if there is // another, concurrent dialing attempt ongoing. - if let Some(pending) = self.pending_requests.remove(peer) { + if let Some(pending) = self.pending_outbound_requests.remove(peer) { for request in pending { self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent( RequestResponseEvent::OutboundFailure { @@ -571,12 +630,17 @@ where fn inject_event( &mut self, peer: PeerId, - _: ConnectionId, + connection: ConnectionId, event: RequestResponseHandlerEvent, ) { match event { RequestResponseHandlerEvent::Response { request_id, response } => { - self.pending_responses.remove(&request_id); + let removed = self.remove_pending_inbound_response(&peer, connection, &request_id); + debug_assert!( + removed, + "Expect request_id to be pending before receiving response.", + ); + let message = RequestResponseMessage::Response { request_id, response }; self.pending_events.push_back( NetworkBehaviourAction::GenerateEvent( @@ -586,15 +650,41 @@ where let channel = ResponseChannel { request_id, peer: peer.clone(), sender }; let message = RequestResponseMessage::Request { request_id, request, channel }; self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent( - RequestResponseEvent::Message { peer, message } + RequestResponseEvent::Message { peer: peer.clone(), message } )); + + match self.get_connection_mut(&peer, connection) { + Some(connection) => { + let inserted = connection.pending_outbound_responses.insert(request_id); + debug_assert!(inserted, "Expect id of new request to be unknown."); + }, + // Connection closed after `RequestResponseEvent::Request` has been emitted. + None => { + self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent( + RequestResponseEvent::InboundFailure { + peer: peer.clone(), + request_id, + error: InboundFailure::ConnectionClosed + } + )); + } + } } RequestResponseHandlerEvent::ResponseSent(request_id) => { + let removed = self.remove_pending_outbound_response(&peer, connection, request_id); + debug_assert!(removed, "Expect request_id to be pending before response is sent."); + self.pending_events.push_back( NetworkBehaviourAction::GenerateEvent( RequestResponseEvent::ResponseSent { peer, request_id })); } RequestResponseHandlerEvent::ResponseOmission(request_id) => { + let removed = self.remove_pending_outbound_response(&peer, connection, request_id); + debug_assert!( + removed, + "Expect request_id to be pending before response is omitted.", + ); + self.pending_events.push_back( NetworkBehaviourAction::GenerateEvent( RequestResponseEvent::InboundFailure { @@ -604,17 +694,24 @@ where })); } RequestResponseHandlerEvent::OutboundTimeout(request_id) => { - if let Some((peer, _conn)) = self.pending_responses.remove(&request_id) { - self.pending_events.push_back( - NetworkBehaviourAction::GenerateEvent( - RequestResponseEvent::OutboundFailure { - peer, - request_id, - error: OutboundFailure::Timeout, - })); - } + let removed = self.remove_pending_inbound_response(&peer, connection, &request_id); + debug_assert!(removed, "Expect request_id to be pending before request times out."); + + self.pending_events.push_back( + NetworkBehaviourAction::GenerateEvent( + RequestResponseEvent::OutboundFailure { + peer, + request_id, + error: OutboundFailure::Timeout, + })); } RequestResponseHandlerEvent::InboundTimeout(request_id) => { + // Note: `RequestResponseHandlerEvent::InboundTimeout` is emitted both for timing + // out to receive the request and for timing out sending the response. In the former + // case the request is never added to `pending_outbound_responses` and thus one can + // not assert the request_id to be present before removing it. + self.remove_pending_outbound_response(&peer, connection, request_id); + self.pending_events.push_back( NetworkBehaviourAction::GenerateEvent( RequestResponseEvent::InboundFailure { @@ -624,6 +721,12 @@ where })); } RequestResponseHandlerEvent::OutboundUnsupportedProtocols(request_id) => { + let removed = self.remove_pending_inbound_response(&peer, connection, &request_id); + debug_assert!( + removed, + "Expect request_id to be pending before failing to connect.", + ); + self.pending_events.push_back( NetworkBehaviourAction::GenerateEvent( RequestResponseEvent::OutboundFailure { @@ -633,6 +736,9 @@ where })); } RequestResponseHandlerEvent::InboundUnsupportedProtocols(request_id) => { + // Note: No need to call `self.remove_pending_outbound_response`, + // `RequestResponseHandlerEvent::Request` was never emitted for this request and + // thus request was never added to `pending_outbound_responses`. self.pending_events.push_back( NetworkBehaviourAction::GenerateEvent( RequestResponseEvent::InboundFailure { @@ -670,4 +776,22 @@ const EMPTY_QUEUE_SHRINK_THRESHOLD: usize = 100; struct Connection { id: ConnectionId, address: Option, + /// Pending outbound responses where corresponding inbound requests have + /// been received on this connection and emitted via `poll` but have not yet + /// been answered. + pending_outbound_responses: HashSet, + /// Pending inbound responses for previously sent requests on this + /// connection. + pending_inbound_responses: HashSet +} + +impl Connection { + fn new(id: ConnectionId, address: Option) -> Self { + Self { + id, + address, + pending_outbound_responses: Default::default(), + pending_inbound_responses: Default::default(), + } + } } diff --git a/protocols/request-response/src/throttled.rs b/protocols/request-response/src/throttled.rs index 8c12564c..b5555726 100644 --- a/protocols/request-response/src/throttled.rs +++ b/protocols/request-response/src/throttled.rs @@ -345,8 +345,17 @@ where /// Are we waiting for a response to the given request? /// /// See [`RequestResponse::is_pending_outbound`] for details. - pub fn is_pending_outbound(&self, p: &RequestId) -> bool { - self.behaviour.is_pending_outbound(p) + pub fn is_pending_outbound(&self, p: &PeerId, r: &RequestId) -> bool { + self.behaviour.is_pending_outbound(p, r) + } + + + /// Is the remote waiting for the local node to respond to the given + /// request? + /// + /// See [`RequestResponse::is_pending_inbound`] for details. + pub fn is_pending_inbound(&self, p: &PeerId, r: &RequestId) -> bool { + self.behaviour.is_pending_inbound(p, r) } /// Send a credit grant to the given peer. @@ -523,15 +532,10 @@ where info.send_budget.remaining += credit; info.send_budget.grant = Some(id); } - match self.behaviour.send_response(channel, Message::ack(id)) { - Err(_) => log::debug! { - "{:08x}: Failed to send ack for credit grant {}.", - self.id, id - }, - Ok(()) => { - info.send_budget.received.insert(request_id); - } - } + // Note: Failing to send a response to a credit grant is + // handled along with other inbound failures further below. + let _ = self.behaviour.send_response(channel, Message::ack(id)); + info.send_budget.received.insert(request_id); } continue } diff --git a/protocols/request-response/tests/ping.rs b/protocols/request-response/tests/ping.rs index de4d1983..8f029756 100644 --- a/protocols/request-response/tests/ping.rs +++ b/protocols/request-response/tests/ping.rs @@ -33,7 +33,7 @@ use libp2p_noise::{NoiseConfig, X25519Spec, Keypair}; use libp2p_request_response::*; use libp2p_swarm::Swarm; use libp2p_tcp::TcpConfig; -use futures::{prelude::*, channel::mpsc}; +use futures::{prelude::*, channel::mpsc, executor::LocalPool, task::SpawnExt}; use rand::{self, Rng}; use std::{io, iter}; use std::{collections::HashSet, num::NonZeroU16}; @@ -122,6 +122,59 @@ fn ping_protocol() { let () = async_std::task::block_on(peer2); } +#[test] +fn emits_inbound_connection_closed_failure() { + let ping = Ping("ping".to_string().into_bytes()); + + let protocols = iter::once((PingProtocol(), ProtocolSupport::Full)); + let cfg = RequestResponseConfig::default(); + + let (peer1_id, trans) = mk_transport(); + let ping_proto1 = RequestResponse::new(PingCodec(), protocols.clone(), cfg.clone()); + let mut swarm1 = Swarm::new(trans, ping_proto1, peer1_id.clone()); + + let (peer2_id, trans) = mk_transport(); + let ping_proto2 = RequestResponse::new(PingCodec(), protocols, cfg); + let mut swarm2 = Swarm::new(trans, ping_proto2, peer2_id.clone()); + + let addr = "/ip4/127.0.0.1/tcp/0".parse().unwrap(); + Swarm::listen_on(&mut swarm1, addr).unwrap(); + + futures::executor::block_on(async move { + while let Some(_) = swarm1.next().now_or_never() {} + let addr1 = Swarm::listeners(&swarm1).next().unwrap(); + + swarm2.add_address(&peer1_id, addr1.clone()); + swarm2.send_request(&peer1_id, ping.clone()); + + // Wait for swarm 1 to receive request by swarm 2. + let _channel = loop { + futures::select!( + event = swarm1.next().fuse() => match event { + RequestResponseEvent::Message { + peer, + message: RequestResponseMessage::Request { request, channel, .. } + } => { + assert_eq!(&request, &ping); + assert_eq!(&peer, &peer2_id); + break channel; + }, + e => panic!("Peer1: Unexpected event: {:?}", e) + }, + event = swarm2.next().fuse() => panic!("Peer2: Unexpected event: {:?}", event), + ) + }; + + // Drop swarm 2 in order for the connection between swarm 1 and 2 to close. + drop(swarm2); + + match swarm1.next().await { + RequestResponseEvent::InboundFailure { error: InboundFailure::ConnectionClosed, ..} => {}, + e => panic!("Peer1: Unexpected event: {:?}", e) + } + }); +} + #[test] fn ping_protocol_throttled() { let ping = Ping("ping".to_string().into_bytes()); @@ -215,12 +268,14 @@ fn ping_protocol_throttled() { } } e => panic!("Peer2: Unexpected event: {:?}", e) + } } }; - async_std::task::spawn(Box::pin(peer1)); - let () = async_std::task::block_on(peer2); + let mut pool = LocalPool::new(); + pool.spawner().spawn(peer1.boxed()).unwrap(); + pool.run_until(peer2); } fn mk_transport() -> (PeerId, transport::Boxed<(PeerId, StreamMuxerBox)>) { @@ -302,4 +357,3 @@ impl RequestResponseCodec for PingCodec { write_one(io, data).await } } -