protocols/request-response: Emit InboundFailure::ConnectionClosed (#1886)

A user of libp2p-request-response is guaranteed to receive an additional event
after receiving a request via `RequestResponseEvent::Message`.

After receiving the request:

- If the user responds in time and the connection is still alive, the
user can expect a `ResponseSent`.

- If the user drops the response channel, the user can expect an
`InboundFailure::ResponseOmission`.

- If the user does not respond in time, the user can expect an
`InboundFailure::Timeout`.

Thus far the user did not receive an event when the connection to the
peer closes. With this commit:

- If the connection to the peer closes before the users calls
`send_response` or after the user calls `send_response` but before the
response can be send on the network, the user can expect an
`InboundFailure::ConnectionClosed`.
This commit is contained in:
Max Inden
2020-12-16 16:03:35 +01:00
committed by GitHub
parent 23b0aa016f
commit 3af5ba45f3
4 changed files with 263 additions and 77 deletions

View File

@ -87,7 +87,7 @@ use libp2p_swarm::{
};
use smallvec::SmallVec;
use std::{
collections::{VecDeque, HashMap},
collections::{HashMap, HashSet, VecDeque},
fmt,
time::Duration,
sync::{atomic::AtomicU64, Arc},
@ -141,14 +141,6 @@ pub enum RequestResponseEvent<TRequest, TResponse, TChannelResponse = TResponse>
error: OutboundFailure,
},
/// An inbound request failed.
///
/// > **Note**: The case whereby a connection on which a response is sent
/// > closes after [`RequestResponse::send_response`] already succeeded
/// > but before the response could be sent on the connection is reflected
/// > by there being no [`RequestResponseEvent::ResponseSent`] event.
/// > Code interested in ensuring a response has been successfully
/// > handed to the transport layer, e.g. before continuing with the next
/// > step in a multi-step protocol, should listen to these events.
InboundFailure {
/// The peer from whom the request was received.
peer: PeerId,
@ -198,6 +190,8 @@ pub enum InboundFailure {
/// [`RequestResponse::send_response`] is not called in a
/// timely manner.
Timeout,
/// The connection closed before a response could be send.
ConnectionClosed,
/// The local peer supports none of the protocols requested
/// by the remote.
UnsupportedProtocols,
@ -297,15 +291,14 @@ where
NetworkBehaviourAction<
RequestProtocol<TCodec>,
RequestResponseEvent<TCodec::Request, TCodec::Response>>>,
/// The currently connected peers and their known, reachable addresses, if any.
/// The currently connected peers, their pending outbound and inbound responses and their known,
/// reachable addresses, if any.
connected: HashMap<PeerId, SmallVec<[Connection; 2]>>,
/// Externally managed addresses via `add_address` and `remove_address`.
addresses: HashMap<PeerId, SmallVec<[Multiaddr; 6]>>,
/// Requests that have not yet been sent and are waiting for a connection
/// to be established.
pending_requests: HashMap<PeerId, SmallVec<[RequestProtocol<TCodec>; 10]>>,
/// Responses that have not yet been received.
pending_responses: HashMap<RequestId, (PeerId, ConnectionId)>
pending_outbound_requests: HashMap<PeerId, SmallVec<[RequestProtocol<TCodec>; 10]>>,
}
impl<TCodec> RequestResponse<TCodec>
@ -337,8 +330,7 @@ where
codec,
pending_events: VecDeque::new(),
connected: HashMap::new(),
pending_requests: HashMap::new(),
pending_responses: HashMap::new(),
pending_outbound_requests: HashMap::new(),
addresses: HashMap::new(),
}
}
@ -382,7 +374,7 @@ where
peer_id: peer.clone(),
condition: DialPeerCondition::Disconnected,
});
self.pending_requests.entry(peer.clone()).or_default().push(request);
self.pending_outbound_requests.entry(peer.clone()).or_default().push(request);
}
request_id
@ -390,11 +382,12 @@ where
/// Initiates sending a response to an inbound request.
///
/// If the `ResponseChannel` is already closed due to a timeout or
/// the connection being closed, the response is returned as an `Err`
/// for further handling. Once the response has been successfully sent
/// on the corresponding connection, [`RequestResponseEvent::ResponseSent`]
/// is emitted.
/// If the [`ResponseChannel`] is already closed due to a timeout or the
/// connection being closed, the response is returned as an `Err` for
/// further handling. Once the response has been successfully sent on the
/// corresponding connection, [`RequestResponseEvent::ResponseSent`] is
/// emitted. In all other cases [`RequestResponseEvent::InboundFailure`]
/// will be or has been emitted.
///
/// The provided `ResponseChannel` is obtained from an inbound
/// [`RequestResponseMessage::Request`].
@ -434,11 +427,22 @@ where
}
}
/// Checks whether an outbound request initiated by
/// [`RequestResponse::send_request`] is still pending, i.e. waiting
/// for a response.
pub fn is_pending_outbound(&self, req_id: &RequestId) -> bool {
self.pending_responses.contains_key(req_id)
/// Checks whether an outbound request to the peer with the provided
/// [`PeerId`] initiated by [`RequestResponse::send_request`] is still
/// pending, i.e. waiting for a response.
pub fn is_pending_outbound(&self, peer: &PeerId, request_id: &RequestId) -> bool {
self.connected.get(peer)
.map(|cs| cs.iter().any(|c| c.pending_inbound_responses.contains(request_id)))
.unwrap_or(false)
}
/// Checks whether an inbound request from the peer with the provided
/// [`PeerId`] is still pending, i.e. waiting for a response by the local
/// node through [`RequestResponse::send_response`].
pub fn is_pending_inbound(&self, peer: &PeerId, request_id: &RequestId) -> bool {
self.connected.get(peer)
.map(|cs| cs.iter().any(|c| c.pending_outbound_responses.contains(request_id)))
.unwrap_or(false)
}
/// Returns the next request ID.
@ -454,16 +458,16 @@ where
fn try_send_request(&mut self, peer: &PeerId, request: RequestProtocol<TCodec>)
-> Option<RequestProtocol<TCodec>>
{
if let Some(connections) = self.connected.get(peer) {
if let Some(connections) = self.connected.get_mut(peer) {
if connections.is_empty() {
return Some(request)
}
let ix = (request.request_id.0 as usize) % connections.len();
let conn = connections[ix].id;
self.pending_responses.insert(request.request_id, (peer.clone(), conn));
let conn = &mut connections[ix];
conn.pending_inbound_responses.insert(request.request_id);
self.pending_events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: peer.clone(),
handler: NotifyHandler::One(conn),
handler: NotifyHandler::One(conn.id),
event: request
});
None
@ -471,6 +475,50 @@ where
Some(request)
}
}
/// Remove pending outbound response for the given peer and connection.
///
/// Returns `true` if the provided connection to the given peer is still
/// alive and the [`RequestId`] was previously present and is now removed.
/// Returns `false` otherwise.
fn remove_pending_outbound_response(
&mut self,
peer: &PeerId,
connection: ConnectionId,
request: RequestId,
) -> bool {
self.get_connection_mut(peer, connection)
.map(|c| c.pending_outbound_responses.remove(&request))
.unwrap_or(false)
}
/// Remove pending inbound response for the given peer and connection.
///
/// Returns `true` if the provided connection to the given peer is still
/// alive and the [`RequestId`] was previously present and is now removed.
/// Returns `false` otherwise.
fn remove_pending_inbound_response(
&mut self,
peer: &PeerId,
connection: ConnectionId,
request: &RequestId,
) -> bool {
self.get_connection_mut(peer, connection)
.map(|c| c.pending_inbound_responses.remove(request))
.unwrap_or(false)
}
/// Returns a mutable reference to the connection in `self.connected`
/// corresponding to the given [`PeerId`] and [`ConnectionId`].
fn get_connection_mut(
&mut self,
peer: &PeerId,
connection: ConnectionId,
) -> Option<&mut Connection> {
self.connected.get_mut(peer).and_then(|connections| {
connections.iter_mut().find(|c| c.id == connection)
})
}
}
impl<TCodec> NetworkBehaviour for RequestResponse<TCodec>
@ -502,7 +550,7 @@ where
}
fn inject_connected(&mut self, peer: &PeerId) {
if let Some(pending) = self.pending_requests.remove(peer) {
if let Some(pending) = self.pending_outbound_requests.remove(peer) {
for request in pending {
let request = self.try_send_request(peer, request);
assert!(request.is_none());
@ -515,33 +563,44 @@ where
ConnectedPoint::Dialer { address } => Some(address.clone()),
ConnectedPoint::Listener { .. } => None
};
let connections = self.connected.entry(peer.clone()).or_default();
connections.push(Connection { id: *conn, address })
self.connected.entry(peer.clone())
.or_default()
.push(Connection::new(*conn, address));
}
fn inject_connection_closed(&mut self, peer: &PeerId, conn: &ConnectionId, _: &ConnectedPoint) {
if let Some(connections) = self.connected.get_mut(peer) {
if let Some(pos) = connections.iter().position(|c| &c.id == conn) {
connections.remove(pos);
}
fn inject_connection_closed(&mut self, peer_id: &PeerId, conn: &ConnectionId, _: &ConnectedPoint) {
let connections = self.connected.get_mut(peer_id)
.expect("Expected some established connection to peer before closing.");
let connection = connections.iter()
.position(|c| &c.id == conn)
.map(|p: usize| connections.remove(p))
.expect("Expected connection to be established before closing.");
if connections.is_empty() {
self.connected.remove(peer_id);
}
let pending_events = &mut self.pending_events;
for request_id in connection.pending_outbound_responses {
self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(
RequestResponseEvent::InboundFailure {
peer: peer_id.clone(),
request_id,
error: InboundFailure::ConnectionClosed
}
));
// Any pending responses of requests sent over this connection must be considered failed.
self.pending_responses.retain(|rid, (peer, cid)| {
if conn != cid {
return true
}
pending_events.push_back(NetworkBehaviourAction::GenerateEvent(
}
for request_id in connection.pending_inbound_responses {
self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(
RequestResponseEvent::OutboundFailure {
peer: peer.clone(),
request_id: *rid,
peer: peer_id.clone(),
request_id,
error: OutboundFailure::ConnectionClosed
}
));
false
});
}
}
fn inject_disconnected(&mut self, peer: &PeerId) {
@ -555,7 +614,7 @@ where
// only created when a peer is not connected when a request is made.
// Thus these requests must be considered failed, even if there is
// another, concurrent dialing attempt ongoing.
if let Some(pending) = self.pending_requests.remove(peer) {
if let Some(pending) = self.pending_outbound_requests.remove(peer) {
for request in pending {
self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(
RequestResponseEvent::OutboundFailure {
@ -571,12 +630,17 @@ where
fn inject_event(
&mut self,
peer: PeerId,
_: ConnectionId,
connection: ConnectionId,
event: RequestResponseHandlerEvent<TCodec>,
) {
match event {
RequestResponseHandlerEvent::Response { request_id, response } => {
self.pending_responses.remove(&request_id);
let removed = self.remove_pending_inbound_response(&peer, connection, &request_id);
debug_assert!(
removed,
"Expect request_id to be pending before receiving response.",
);
let message = RequestResponseMessage::Response { request_id, response };
self.pending_events.push_back(
NetworkBehaviourAction::GenerateEvent(
@ -586,15 +650,41 @@ where
let channel = ResponseChannel { request_id, peer: peer.clone(), sender };
let message = RequestResponseMessage::Request { request_id, request, channel };
self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(
RequestResponseEvent::Message { peer, message }
RequestResponseEvent::Message { peer: peer.clone(), message }
));
match self.get_connection_mut(&peer, connection) {
Some(connection) => {
let inserted = connection.pending_outbound_responses.insert(request_id);
debug_assert!(inserted, "Expect id of new request to be unknown.");
},
// Connection closed after `RequestResponseEvent::Request` has been emitted.
None => {
self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(
RequestResponseEvent::InboundFailure {
peer: peer.clone(),
request_id,
error: InboundFailure::ConnectionClosed
}
));
}
}
}
RequestResponseHandlerEvent::ResponseSent(request_id) => {
let removed = self.remove_pending_outbound_response(&peer, connection, request_id);
debug_assert!(removed, "Expect request_id to be pending before response is sent.");
self.pending_events.push_back(
NetworkBehaviourAction::GenerateEvent(
RequestResponseEvent::ResponseSent { peer, request_id }));
}
RequestResponseHandlerEvent::ResponseOmission(request_id) => {
let removed = self.remove_pending_outbound_response(&peer, connection, request_id);
debug_assert!(
removed,
"Expect request_id to be pending before response is omitted.",
);
self.pending_events.push_back(
NetworkBehaviourAction::GenerateEvent(
RequestResponseEvent::InboundFailure {
@ -604,17 +694,24 @@ where
}));
}
RequestResponseHandlerEvent::OutboundTimeout(request_id) => {
if let Some((peer, _conn)) = self.pending_responses.remove(&request_id) {
self.pending_events.push_back(
NetworkBehaviourAction::GenerateEvent(
RequestResponseEvent::OutboundFailure {
peer,
request_id,
error: OutboundFailure::Timeout,
}));
}
let removed = self.remove_pending_inbound_response(&peer, connection, &request_id);
debug_assert!(removed, "Expect request_id to be pending before request times out.");
self.pending_events.push_back(
NetworkBehaviourAction::GenerateEvent(
RequestResponseEvent::OutboundFailure {
peer,
request_id,
error: OutboundFailure::Timeout,
}));
}
RequestResponseHandlerEvent::InboundTimeout(request_id) => {
// Note: `RequestResponseHandlerEvent::InboundTimeout` is emitted both for timing
// out to receive the request and for timing out sending the response. In the former
// case the request is never added to `pending_outbound_responses` and thus one can
// not assert the request_id to be present before removing it.
self.remove_pending_outbound_response(&peer, connection, request_id);
self.pending_events.push_back(
NetworkBehaviourAction::GenerateEvent(
RequestResponseEvent::InboundFailure {
@ -624,6 +721,12 @@ where
}));
}
RequestResponseHandlerEvent::OutboundUnsupportedProtocols(request_id) => {
let removed = self.remove_pending_inbound_response(&peer, connection, &request_id);
debug_assert!(
removed,
"Expect request_id to be pending before failing to connect.",
);
self.pending_events.push_back(
NetworkBehaviourAction::GenerateEvent(
RequestResponseEvent::OutboundFailure {
@ -633,6 +736,9 @@ where
}));
}
RequestResponseHandlerEvent::InboundUnsupportedProtocols(request_id) => {
// Note: No need to call `self.remove_pending_outbound_response`,
// `RequestResponseHandlerEvent::Request` was never emitted for this request and
// thus request was never added to `pending_outbound_responses`.
self.pending_events.push_back(
NetworkBehaviourAction::GenerateEvent(
RequestResponseEvent::InboundFailure {
@ -670,4 +776,22 @@ const EMPTY_QUEUE_SHRINK_THRESHOLD: usize = 100;
struct Connection {
id: ConnectionId,
address: Option<Multiaddr>,
/// Pending outbound responses where corresponding inbound requests have
/// been received on this connection and emitted via `poll` but have not yet
/// been answered.
pending_outbound_responses: HashSet<RequestId>,
/// Pending inbound responses for previously sent requests on this
/// connection.
pending_inbound_responses: HashSet<RequestId>
}
impl Connection {
fn new(id: ConnectionId, address: Option<Multiaddr>) -> Self {
Self {
id,
address,
pending_outbound_responses: Default::default(),
pending_inbound_responses: Default::default(),
}
}
}

View File

@ -345,8 +345,17 @@ where
/// Are we waiting for a response to the given request?
///
/// See [`RequestResponse::is_pending_outbound`] for details.
pub fn is_pending_outbound(&self, p: &RequestId) -> bool {
self.behaviour.is_pending_outbound(p)
pub fn is_pending_outbound(&self, p: &PeerId, r: &RequestId) -> bool {
self.behaviour.is_pending_outbound(p, r)
}
/// Is the remote waiting for the local node to respond to the given
/// request?
///
/// See [`RequestResponse::is_pending_inbound`] for details.
pub fn is_pending_inbound(&self, p: &PeerId, r: &RequestId) -> bool {
self.behaviour.is_pending_inbound(p, r)
}
/// Send a credit grant to the given peer.
@ -523,15 +532,10 @@ where
info.send_budget.remaining += credit;
info.send_budget.grant = Some(id);
}
match self.behaviour.send_response(channel, Message::ack(id)) {
Err(_) => log::debug! {
"{:08x}: Failed to send ack for credit grant {}.",
self.id, id
},
Ok(()) => {
info.send_budget.received.insert(request_id);
}
}
// Note: Failing to send a response to a credit grant is
// handled along with other inbound failures further below.
let _ = self.behaviour.send_response(channel, Message::ack(id));
info.send_budget.received.insert(request_id);
}
continue
}