refactor(request-response): revise public API to follow naming convention (#3159)

This commit is contained in:
João Oliveira
2022-12-13 12:11:42 +00:00
committed by GitHub
parent cbf0a273cd
commit f828db60cb
11 changed files with 389 additions and 228 deletions

View File

@ -25,8 +25,12 @@ use futures::prelude::*;
use std::io;
/// A `RequestResponseCodec` defines the request and response types
/// for a [`RequestResponse`](crate::RequestResponse) protocol or
/// for a request-response `Behaviour` protocol or
/// protocol family and how they are encoded / decoded on an I/O stream.
#[deprecated(
since = "0.24.0",
note = "Use re-exports that omit `RequestResponse` prefix, i.e. `libp2p::request_response::Codec`"
)]
#[async_trait]
pub trait RequestResponseCodec {
/// The type of protocol(s) or protocol versions being negotiated.
@ -78,3 +82,118 @@ pub trait RequestResponseCodec {
where
T: AsyncWrite + Unpin + Send;
}
/// A `Codec` defines the request and response types
/// for a request-response [`Behaviour`](crate::Behaviour) protocol or
/// protocol family and how they are encoded / decoded on an I/O stream.
#[async_trait]
pub trait Codec {
/// The type of protocol(s) or protocol versions being negotiated.
type Protocol: ProtocolName + Send + Clone;
/// The type of inbound and outbound requests.
type Request: Send;
/// The type of inbound and outbound responses.
type Response: Send;
/// Reads a request from the given I/O stream according to the
/// negotiated protocol.
async fn read_request<T>(
&mut self,
protocol: &Self::Protocol,
io: &mut T,
) -> io::Result<Self::Request>
where
T: AsyncRead + Unpin + Send;
/// Reads a response from the given I/O stream according to the
/// negotiated protocol.
async fn read_response<T>(
&mut self,
protocol: &Self::Protocol,
io: &mut T,
) -> io::Result<Self::Response>
where
T: AsyncRead + Unpin + Send;
/// Writes a request to the given I/O stream according to the
/// negotiated protocol.
async fn write_request<T>(
&mut self,
protocol: &Self::Protocol,
io: &mut T,
req: Self::Request,
) -> io::Result<()>
where
T: AsyncWrite + Unpin + Send;
/// Writes a response to the given I/O stream according to the
/// negotiated protocol.
async fn write_response<T>(
&mut self,
protocol: &Self::Protocol,
io: &mut T,
res: Self::Response,
) -> io::Result<()>
where
T: AsyncWrite + Unpin + Send;
}
#[allow(deprecated)]
#[async_trait]
impl<U> Codec for U
where
U: RequestResponseCodec + Send,
U::Protocol: Sync,
{
type Protocol = U::Protocol;
type Request = U::Request;
type Response = U::Response;
async fn read_request<T>(
&mut self,
protocol: &Self::Protocol,
io: &mut T,
) -> io::Result<Self::Request>
where
T: AsyncRead + Unpin + Send,
{
self.read_request(protocol, io).await
}
async fn read_response<T>(
&mut self,
protocol: &Self::Protocol,
io: &mut T,
) -> io::Result<Self::Response>
where
T: AsyncRead + Unpin + Send,
{
self.read_response(protocol, io).await
}
async fn write_request<T>(
&mut self,
protocol: &Self::Protocol,
io: &mut T,
req: Self::Request,
) -> io::Result<()>
where
T: AsyncWrite + Unpin + Send,
{
self.write_request(protocol, io, req).await
}
async fn write_response<T>(
&mut self,
protocol: &Self::Protocol,
io: &mut T,
res: Self::Response,
) -> io::Result<()>
where
T: AsyncWrite + Unpin + Send,
{
self.write_response(protocol, io, res).await
}
}

View File

@ -20,7 +20,7 @@
mod protocol;
use crate::codec::RequestResponseCodec;
use crate::codec::Codec;
use crate::{RequestId, EMPTY_QUEUE_SHRINK_THRESHOLD};
use libp2p_swarm::handler::{
@ -48,11 +48,16 @@ use std::{
time::Duration,
};
/// A connection handler of a `RequestResponse` protocol.
#[doc(hidden)]
pub struct RequestResponseHandler<TCodec>
#[deprecated(
since = "0.24.0",
note = "Use re-exports that omit `RequestResponse` prefix, i.e. `libp2p::request_response::handler::Handler`"
)]
pub type RequestResponseHandler<TCodec> = Handler<TCodec>;
/// A connection handler for a request response [`Behaviour`](super::Behaviour) protocol.
pub struct Handler<TCodec>
where
TCodec: RequestResponseCodec,
TCodec: Codec,
{
/// The supported inbound protocols.
inbound_protocols: SmallVec<[TCodec::Protocol; 2]>,
@ -69,7 +74,7 @@ where
/// A pending fatal error that results in the connection being closed.
pending_error: Option<ConnectionHandlerUpgrErr<io::Error>>,
/// Queue of events to emit in `poll()`.
pending_events: VecDeque<RequestResponseHandlerEvent<TCodec>>,
pending_events: VecDeque<Event<TCodec>>,
/// Outbound upgrades waiting to be emitted as an `OutboundSubstreamRequest`.
outbound: VecDeque<RequestProtocol<TCodec>>,
/// Inbound upgrades waiting for the incoming request.
@ -88,9 +93,9 @@ where
inbound_request_id: Arc<AtomicU64>,
}
impl<TCodec> RequestResponseHandler<TCodec>
impl<TCodec> Handler<TCodec>
where
TCodec: RequestResponseCodec + Send + Clone + 'static,
TCodec: Codec + Send + Clone + 'static,
{
pub(super) fn new(
inbound_protocols: SmallVec<[TCodec::Protocol; 2]>,
@ -125,10 +130,10 @@ where
) {
if sent {
self.pending_events
.push_back(RequestResponseHandlerEvent::ResponseSent(request_id))
.push_back(Event::ResponseSent(request_id))
} else {
self.pending_events
.push_back(RequestResponseHandlerEvent::ResponseOmission(request_id))
.push_back(Event::ResponseOmission(request_id))
}
}
@ -141,8 +146,7 @@ where
) {
match error {
ConnectionHandlerUpgrErr::Timeout => {
self.pending_events
.push_back(RequestResponseHandlerEvent::OutboundTimeout(info));
self.pending_events.push_back(Event::OutboundTimeout(info));
}
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => {
// The remote merely doesn't support the protocol(s) we requested.
@ -150,9 +154,8 @@ where
// successfully communicate with other protocols already.
// An event is reported to permit user code to react to the fact that
// the remote peer does not support the requested protocol(s).
self.pending_events.push_back(
RequestResponseHandlerEvent::OutboundUnsupportedProtocols(info),
);
self.pending_events
.push_back(Event::OutboundUnsupportedProtocols(info));
}
_ => {
// Anything else is considered a fatal error or misbehaviour of
@ -169,18 +172,17 @@ where
>,
) {
match error {
ConnectionHandlerUpgrErr::Timeout => self
.pending_events
.push_back(RequestResponseHandlerEvent::InboundTimeout(info)),
ConnectionHandlerUpgrErr::Timeout => {
self.pending_events.push_back(Event::InboundTimeout(info))
}
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => {
// The local peer merely doesn't support the protocol(s) requested.
// This is no reason to close the connection, which may
// successfully communicate with other protocols already.
// An event is reported to permit user code to react to the fact that
// the local peer does not support the requested protocol(s).
self.pending_events.push_back(
RequestResponseHandlerEvent::InboundUnsupportedProtocols(info),
);
self.pending_events
.push_back(Event::InboundUnsupportedProtocols(info));
}
_ => {
// Anything else is considered a fatal error or misbehaviour of
@ -191,11 +193,16 @@ where
}
}
/// The events emitted by the [`RequestResponseHandler`].
#[doc(hidden)]
pub enum RequestResponseHandlerEvent<TCodec>
#[deprecated(
since = "0.24.0",
note = "Use re-exports that omit `RequestResponse` prefix, i.e. `libp2p::request_response::handler::Event`"
)]
pub type RequestResponseHandlerEvent<TCodec> = Event<TCodec>;
/// The events emitted by the [`Handler`].
pub enum Event<TCodec>
where
TCodec: RequestResponseCodec,
TCodec: Codec,
{
/// A request has been received.
Request {
@ -225,58 +232,58 @@ where
InboundUnsupportedProtocols(RequestId),
}
impl<TCodec: RequestResponseCodec> fmt::Debug for RequestResponseHandlerEvent<TCodec> {
impl<TCodec: Codec> fmt::Debug for Event<TCodec> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
RequestResponseHandlerEvent::Request {
Event::Request {
request_id,
request: _,
sender: _,
} => f
.debug_struct("RequestResponseHandlerEvent::Request")
.debug_struct("Event::Request")
.field("request_id", request_id)
.finish(),
RequestResponseHandlerEvent::Response {
Event::Response {
request_id,
response: _,
} => f
.debug_struct("RequestResponseHandlerEvent::Response")
.debug_struct("Event::Response")
.field("request_id", request_id)
.finish(),
RequestResponseHandlerEvent::ResponseSent(request_id) => f
.debug_tuple("RequestResponseHandlerEvent::ResponseSent")
Event::ResponseSent(request_id) => f
.debug_tuple("Event::ResponseSent")
.field(request_id)
.finish(),
RequestResponseHandlerEvent::ResponseOmission(request_id) => f
.debug_tuple("RequestResponseHandlerEvent::ResponseOmission")
Event::ResponseOmission(request_id) => f
.debug_tuple("Event::ResponseOmission")
.field(request_id)
.finish(),
RequestResponseHandlerEvent::OutboundTimeout(request_id) => f
.debug_tuple("RequestResponseHandlerEvent::OutboundTimeout")
Event::OutboundTimeout(request_id) => f
.debug_tuple("Event::OutboundTimeout")
.field(request_id)
.finish(),
RequestResponseHandlerEvent::OutboundUnsupportedProtocols(request_id) => f
.debug_tuple("RequestResponseHandlerEvent::OutboundUnsupportedProtocols")
Event::OutboundUnsupportedProtocols(request_id) => f
.debug_tuple("Event::OutboundUnsupportedProtocols")
.field(request_id)
.finish(),
RequestResponseHandlerEvent::InboundTimeout(request_id) => f
.debug_tuple("RequestResponseHandlerEvent::InboundTimeout")
Event::InboundTimeout(request_id) => f
.debug_tuple("Event::InboundTimeout")
.field(request_id)
.finish(),
RequestResponseHandlerEvent::InboundUnsupportedProtocols(request_id) => f
.debug_tuple("RequestResponseHandlerEvent::InboundUnsupportedProtocols")
Event::InboundUnsupportedProtocols(request_id) => f
.debug_tuple("Event::InboundUnsupportedProtocols")
.field(request_id)
.finish(),
}
}
}
impl<TCodec> ConnectionHandler for RequestResponseHandler<TCodec>
impl<TCodec> ConnectionHandler for Handler<TCodec>
where
TCodec: RequestResponseCodec + Send + Clone + 'static,
TCodec: Codec + Send + Clone + 'static,
{
type InEvent = RequestProtocol<TCodec>;
type OutEvent = RequestResponseHandlerEvent<TCodec>;
type OutEvent = Event<TCodec>;
type Error = ConnectionHandlerUpgrErr<io::Error>;
type InboundProtocol = ResponseProtocol<TCodec>;
type OutboundProtocol = RequestProtocol<TCodec>;
@ -309,7 +316,7 @@ where
};
// The handler waits for the request to come in. It then emits
// `RequestResponseHandlerEvent::Request` together with a
// `Event::Request` together with a
// `ResponseChannel`.
self.inbound
.push(rq_recv.map_ok(move |rq| (rq, rs_send)).boxed());
@ -350,13 +357,11 @@ where
Ok(((id, rq), rs_sender)) => {
// We received an inbound request.
self.keep_alive = KeepAlive::Yes;
return Poll::Ready(ConnectionHandlerEvent::Custom(
RequestResponseHandlerEvent::Request {
request_id: id,
request: rq,
sender: rs_sender,
},
));
return Poll::Ready(ConnectionHandlerEvent::Custom(Event::Request {
request_id: id,
request: rq,
sender: rs_sender,
}));
}
Err(oneshot::Canceled) => {
// The inbound upgrade has errored or timed out reading
@ -409,11 +414,10 @@ where
protocol: response,
info: request_id,
}) => {
self.pending_events
.push_back(RequestResponseHandlerEvent::Response {
request_id,
response,
});
self.pending_events.push_back(Event::Response {
request_id,
response,
});
}
ConnectionEvent::DialUpgradeError(dial_upgrade_error) => {
self.on_dial_upgrade_error(dial_upgrade_error)

View File

@ -23,7 +23,7 @@
//! receives a request and sends a response, whereas the
//! outbound upgrade send a request and receives a response.
use crate::codec::RequestResponseCodec;
use crate::codec::Codec;
use crate::RequestId;
use futures::{channel::oneshot, future::BoxFuture, prelude::*};
@ -67,7 +67,7 @@ impl ProtocolSupport {
#[derive(Debug)]
pub struct ResponseProtocol<TCodec>
where
TCodec: RequestResponseCodec,
TCodec: Codec,
{
pub(crate) codec: TCodec,
pub(crate) protocols: SmallVec<[TCodec::Protocol; 2]>,
@ -78,7 +78,7 @@ where
impl<TCodec> UpgradeInfo for ResponseProtocol<TCodec>
where
TCodec: RequestResponseCodec,
TCodec: Codec,
{
type Info = TCodec::Protocol;
type InfoIter = smallvec::IntoIter<[Self::Info; 2]>;
@ -90,7 +90,7 @@ where
impl<TCodec> InboundUpgrade<NegotiatedSubstream> for ResponseProtocol<TCodec>
where
TCodec: RequestResponseCodec + Send + 'static,
TCodec: Codec + Send + 'static,
{
type Output = bool;
type Error = io::Error;
@ -132,7 +132,7 @@ where
/// Sends a request and receives a response.
pub struct RequestProtocol<TCodec>
where
TCodec: RequestResponseCodec,
TCodec: Codec,
{
pub(crate) codec: TCodec,
pub(crate) protocols: SmallVec<[TCodec::Protocol; 2]>,
@ -142,7 +142,7 @@ where
impl<TCodec> fmt::Debug for RequestProtocol<TCodec>
where
TCodec: RequestResponseCodec,
TCodec: Codec,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RequestProtocol")
@ -153,7 +153,7 @@ where
impl<TCodec> UpgradeInfo for RequestProtocol<TCodec>
where
TCodec: RequestResponseCodec,
TCodec: Codec,
{
type Info = TCodec::Protocol;
type InfoIter = smallvec::IntoIter<[Self::Info; 2]>;
@ -165,7 +165,7 @@ where
impl<TCodec> OutboundUpgrade<NegotiatedSubstream> for RequestProtocol<TCodec>
where
TCodec: RequestResponseCodec + Send + 'static,
TCodec: Codec + Send + 'static,
{
type Output = TCodec::Response;
type Error = io::Error;

View File

@ -22,34 +22,34 @@
//!
//! ## General Usage
//!
//! [`RequestResponse`] is a `NetworkBehaviour` that implements a generic
//! The [`Behaviour`] struct is a [`NetworkBehaviour`] that implements a generic
//! request/response protocol or protocol family, whereby each request is
//! sent over a new substream on a connection. `RequestResponse` is generic
//! sent over a new substream on a connection. `Behaviour` is generic
//! over the actual messages being sent, which are defined in terms of a
//! [`RequestResponseCodec`]. Creating a request/response protocol thus amounts
//! [`Codec`]. Creating a request/response protocol thus amounts
//! to providing an implementation of this trait which can then be
//! given to [`RequestResponse::new`]. Further configuration options are
//! available via the [`RequestResponseConfig`].
//! given to [`Behaviour::new`]. Further configuration options are
//! available via the [`Config`].
//!
//! Requests are sent using [`RequestResponse::send_request`] and the
//! responses received as [`RequestResponseMessage::Response`] via
//! [`RequestResponseEvent::Message`].
//! Requests are sent using [`Behaviour::send_request`] and the
//! responses received as [`Message::Response`] via
//! [`Event::Message`].
//!
//! Responses are sent using [`RequestResponse::send_response`] upon
//! receiving a [`RequestResponseMessage::Request`] via
//! [`RequestResponseEvent::Message`].
//! Responses are sent using [`Behaviour::send_response`] upon
//! receiving a [`Message::Request`] via
//! [`Event::Message`].
//!
//! ## Protocol Families
//!
//! A single [`RequestResponse`] instance can be used with an entire
//! A single [`Behaviour`] instance can be used with an entire
//! protocol family that share the same request and response types.
//! For that purpose, [`RequestResponseCodec::Protocol`] is typically
//! For that purpose, [`Codec::Protocol`] is typically
//! instantiated with a sum type.
//!
//! ## Limited Protocol Support
//!
//! It is possible to only support inbound or outbound requests for
//! a particular protocol. This is achieved by instantiating `RequestResponse`
//! a particular protocol. This is achieved by instantiating `Behaviour`
//! with protocols using [`ProtocolSupport::Inbound`] or
//! [`ProtocolSupport::Outbound`]. Any subset of protocols of a protocol
//! family can be configured in this way. Such protocols will not be
@ -61,11 +61,15 @@
pub mod codec;
pub mod handler;
pub use codec::{ProtocolName, RequestResponseCodec};
pub use codec::{Codec, ProtocolName};
#[allow(deprecated)]
pub use codec::RequestResponseCodec;
pub use handler::ProtocolSupport;
use futures::channel::oneshot;
use handler::{RequestProtocol, RequestResponseHandler, RequestResponseHandlerEvent};
use handler::{Handler, RequestProtocol};
use libp2p_core::{connection::ConnectionId, ConnectedPoint, Multiaddr, PeerId};
use libp2p_swarm::{
behaviour::{AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm},
@ -81,9 +85,40 @@ use std::{
time::Duration,
};
#[deprecated(
since = "0.24.0",
note = "Use libp2p::request_response::Behaviour instead."
)]
pub type RequestResponse<TCodec> = Behaviour<TCodec>;
#[deprecated(
since = "0.24.0",
note = "Use re-exports that omit `RequestResponse` prefix, i.e. `libp2p::request_response::Config`"
)]
pub type RequestResponseConfig = Config;
#[deprecated(
since = "0.24.0",
note = "Use re-exports that omit `RequestResponse` prefix, i.e. `libp2p::request_response::Event`"
)]
pub type RequestResponseEvent<TRequest, TResponse> = Event<TRequest, TResponse>;
#[deprecated(
since = "0.24.0",
note = "Use re-exports that omit `RequestResponse` prefix, i.e. `libp2p::request_response::Message`"
)]
pub type RequestResponseMessage<TRequest, TResponse, TChannelResponse> =
Message<TRequest, TResponse, TChannelResponse>;
#[deprecated(
since = "0.24.0",
note = "Use re-exports that omit `RequestResponse` prefix, i.e. `libp2p::request_response::handler::Event`"
)]
pub type HandlerEvent<TCodec> = handler::Event<TCodec>;
/// An inbound request or response.
#[derive(Debug)]
pub enum RequestResponseMessage<TRequest, TResponse, TChannelResponse = TResponse> {
pub enum Message<TRequest, TResponse, TChannelResponse = TResponse> {
/// A request message.
Request {
/// The ID of this request.
@ -93,7 +128,7 @@ pub enum RequestResponseMessage<TRequest, TResponse, TChannelResponse = TRespons
/// The channel waiting for the response.
///
/// If this channel is dropped instead of being used to send a response
/// via [`RequestResponse::send_response`], a [`RequestResponseEvent::InboundFailure`]
/// via [`Behaviour::send_response`], a [`Event::InboundFailure`]
/// with [`InboundFailure::ResponseOmission`] is emitted.
channel: ResponseChannel<TChannelResponse>,
},
@ -101,22 +136,22 @@ pub enum RequestResponseMessage<TRequest, TResponse, TChannelResponse = TRespons
Response {
/// The ID of the request that produced this response.
///
/// See [`RequestResponse::send_request`].
/// See [`Behaviour::send_request`].
request_id: RequestId,
/// The response message.
response: TResponse,
},
}
/// The events emitted by a [`RequestResponse`] protocol.
/// The events emitted by a request-response [`Behaviour`].
#[derive(Debug)]
pub enum RequestResponseEvent<TRequest, TResponse, TChannelResponse = TResponse> {
pub enum Event<TRequest, TResponse, TChannelResponse = TResponse> {
/// An incoming message (request or response).
Message {
/// The peer who sent the message.
peer: PeerId,
/// The incoming message.
message: RequestResponseMessage<TRequest, TResponse, TChannelResponse>,
message: Message<TRequest, TResponse, TChannelResponse>,
},
/// An outbound request failed.
OutboundFailure {
@ -191,7 +226,7 @@ impl std::error::Error for OutboundFailure {}
pub enum InboundFailure {
/// The inbound request timed out, either while reading the
/// incoming request or before a response is sent, e.g. if
/// [`RequestResponse::send_response`] is not called in a
/// [`Behaviour::send_response`] is not called in a
/// timely manner.
Timeout,
/// The connection closed before a response could be send.
@ -201,7 +236,7 @@ pub enum InboundFailure {
UnsupportedProtocols,
/// The local peer failed to respond to an inbound request
/// due to the [`ResponseChannel`] being dropped instead of
/// being passed to [`RequestResponse::send_response`].
/// being passed to [`Behaviour::send_response`].
ResponseOmission,
}
@ -230,7 +265,7 @@ impl std::error::Error for InboundFailure {}
/// A channel for sending a response to an inbound request.
///
/// See [`RequestResponse::send_response`].
/// See [`Behaviour::send_response`].
#[derive(Debug)]
pub struct ResponseChannel<TResponse> {
sender: oneshot::Sender<TResponse>,
@ -238,8 +273,8 @@ pub struct ResponseChannel<TResponse> {
impl<TResponse> ResponseChannel<TResponse> {
/// Checks whether the response channel is still open, i.e.
/// the `RequestResponse` behaviour is still waiting for a
/// a response to be sent via [`RequestResponse::send_response`]
/// the `Behaviour` is still waiting for a
/// a response to be sent via [`Behaviour::send_response`]
/// and this response channel.
///
/// If the response channel is no longer open then the inbound
@ -255,7 +290,7 @@ impl<TResponse> ResponseChannel<TResponse> {
/// inbound and likewise between two outbound requests. There is no
/// uniqueness guarantee in a set of both inbound and outbound
/// [`RequestId`]s nor in a set of inbound or outbound requests
/// originating from different [`RequestResponse`] behaviours.
/// originating from different [`Behaviour`]'s.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub struct RequestId(u64);
@ -265,14 +300,14 @@ impl fmt::Display for RequestId {
}
}
/// The configuration for a `RequestResponse` protocol.
/// The configuration for a `Behaviour` protocol.
#[derive(Debug, Clone)]
pub struct RequestResponseConfig {
pub struct Config {
request_timeout: Duration,
connection_keep_alive: Duration,
}
impl Default for RequestResponseConfig {
impl Default for Config {
fn default() -> Self {
Self {
connection_keep_alive: Duration::from_secs(10),
@ -281,7 +316,7 @@ impl Default for RequestResponseConfig {
}
}
impl RequestResponseConfig {
impl Config {
/// Sets the keep-alive timeout of idle connections.
pub fn set_connection_keep_alive(&mut self, v: Duration) -> &mut Self {
self.connection_keep_alive = v;
@ -296,9 +331,9 @@ impl RequestResponseConfig {
}
/// A request/response protocol for some message codec.
pub struct RequestResponse<TCodec>
pub struct Behaviour<TCodec>
where
TCodec: RequestResponseCodec + Clone + Send + 'static,
TCodec: Codec + Clone + Send + 'static,
{
/// The supported inbound protocols.
inbound_protocols: SmallVec<[TCodec::Protocol; 2]>,
@ -309,16 +344,12 @@ where
/// The next (inbound) request ID.
next_inbound_id: Arc<AtomicU64>,
/// The protocol configuration.
config: RequestResponseConfig,
config: Config,
/// The protocol codec for reading and writing requests and responses.
codec: TCodec,
/// Pending events to return from `poll`.
pending_events: VecDeque<
NetworkBehaviourAction<
RequestResponseEvent<TCodec::Request, TCodec::Response>,
RequestResponseHandler<TCodec>,
>,
>,
pending_events:
VecDeque<NetworkBehaviourAction<Event<TCodec::Request, TCodec::Response>, Handler<TCodec>>>,
/// The currently connected peers, their pending outbound and inbound responses and their known,
/// reachable addresses, if any.
connected: HashMap<PeerId, SmallVec<[Connection; 2]>>,
@ -329,13 +360,13 @@ where
pending_outbound_requests: HashMap<PeerId, SmallVec<[RequestProtocol<TCodec>; 10]>>,
}
impl<TCodec> RequestResponse<TCodec>
impl<TCodec> Behaviour<TCodec>
where
TCodec: RequestResponseCodec + Clone + Send + 'static,
TCodec: Codec + Clone + Send + 'static,
{
/// Creates a new `RequestResponse` behaviour for the given
/// Creates a new `Behaviour` for the given
/// protocols, codec and configuration.
pub fn new<I>(codec: TCodec, protocols: I, cfg: RequestResponseConfig) -> Self
pub fn new<I>(codec: TCodec, protocols: I, cfg: Config) -> Self
where
I: IntoIterator<Item = (TCodec::Protocol, ProtocolSupport)>,
{
@ -349,7 +380,7 @@ where
outbound_protocols.push(p.clone());
}
}
RequestResponse {
Behaviour {
inbound_protocols,
outbound_protocols,
next_request_id: RequestId(1),
@ -373,8 +404,8 @@ where
/// > the `RequestResonse` protocol must either be embedded
/// > in another `NetworkBehaviour` that provides peer and
/// > address discovery, or known addresses of peers must be
/// > managed via [`RequestResponse::add_address`] and
/// > [`RequestResponse::remove_address`].
/// > managed via [`Behaviour::add_address`] and
/// > [`Behaviour::remove_address`].
pub fn send_request(&mut self, peer: &PeerId, request: TCodec::Request) -> RequestId {
let request_id = self.next_request_id();
let request = RequestProtocol {
@ -404,12 +435,12 @@ where
/// If the [`ResponseChannel`] is already closed due to a timeout or the
/// connection being closed, the response is returned as an `Err` for
/// further handling. Once the response has been successfully sent on the
/// corresponding connection, [`RequestResponseEvent::ResponseSent`] is
/// emitted. In all other cases [`RequestResponseEvent::InboundFailure`]
/// corresponding connection, [`Event::ResponseSent`] is
/// emitted. In all other cases [`Event::InboundFailure`]
/// will be or has been emitted.
///
/// The provided `ResponseChannel` is obtained from an inbound
/// [`RequestResponseMessage::Request`].
/// [`Message::Request`].
pub fn send_response(
&mut self,
ch: ResponseChannel<TCodec::Response>,
@ -449,7 +480,7 @@ where
}
/// Checks whether an outbound request to the peer with the provided
/// [`PeerId`] initiated by [`RequestResponse::send_request`] is still
/// [`PeerId`] initiated by [`Behaviour::send_request`] is still
/// pending, i.e. waiting for a response.
pub fn is_pending_outbound(&self, peer: &PeerId, request_id: &RequestId) -> bool {
// Check if request is already sent on established connection.
@ -473,7 +504,7 @@ where
/// Checks whether an inbound request from the peer with the provided
/// [`PeerId`] is still pending, i.e. waiting for a response by the local
/// node through [`RequestResponse::send_response`].
/// node through [`Behaviour::send_response`].
pub fn is_pending_inbound(&self, peer: &PeerId, request_id: &RequestId) -> bool {
self.connected
.get(peer)
@ -644,7 +675,7 @@ where
for request_id in connection.pending_outbound_responses {
self.pending_events
.push_back(NetworkBehaviourAction::GenerateEvent(
RequestResponseEvent::InboundFailure {
Event::InboundFailure {
peer: peer_id,
request_id,
error: InboundFailure::ConnectionClosed,
@ -655,7 +686,7 @@ where
for request_id in connection.pending_inbound_responses {
self.pending_events
.push_back(NetworkBehaviourAction::GenerateEvent(
RequestResponseEvent::OutboundFailure {
Event::OutboundFailure {
peer: peer_id,
request_id,
error: OutboundFailure::ConnectionClosed,
@ -679,7 +710,7 @@ where
for request in pending {
self.pending_events
.push_back(NetworkBehaviourAction::GenerateEvent(
RequestResponseEvent::OutboundFailure {
Event::OutboundFailure {
peer,
request_id: request.request_id,
error: OutboundFailure::DialFailure,
@ -691,15 +722,15 @@ where
}
}
impl<TCodec> NetworkBehaviour for RequestResponse<TCodec>
impl<TCodec> NetworkBehaviour for Behaviour<TCodec>
where
TCodec: RequestResponseCodec + Send + Clone + 'static,
TCodec: Codec + Send + Clone + 'static,
{
type ConnectionHandler = RequestResponseHandler<TCodec>;
type OutEvent = RequestResponseEvent<TCodec::Request, TCodec::Response>;
type ConnectionHandler = Handler<TCodec>;
type OutEvent = Event<TCodec::Request, TCodec::Response>;
fn new_handler(&mut self) -> Self::ConnectionHandler {
RequestResponseHandler::new(
Handler::new(
self.inbound_protocols.clone(),
self.codec.clone(),
self.config.connection_keep_alive,
@ -748,7 +779,7 @@ where
libp2p_swarm::ConnectionHandler>::OutEvent,
) {
match event {
RequestResponseHandlerEvent::Response {
handler::Event::Response {
request_id,
response,
} => {
@ -758,41 +789,43 @@ where
"Expect request_id to be pending before receiving response.",
);
let message = RequestResponseMessage::Response {
let message = Message::Response {
request_id,
response,
};
self.pending_events
.push_back(NetworkBehaviourAction::GenerateEvent(
RequestResponseEvent::Message { peer, message },
));
.push_back(NetworkBehaviourAction::GenerateEvent(Event::Message {
peer,
message,
}));
}
RequestResponseHandlerEvent::Request {
handler::Event::Request {
request_id,
request,
sender,
} => {
let channel = ResponseChannel { sender };
let message = RequestResponseMessage::Request {
let message = Message::Request {
request_id,
request,
channel,
};
self.pending_events
.push_back(NetworkBehaviourAction::GenerateEvent(
RequestResponseEvent::Message { peer, message },
));
.push_back(NetworkBehaviourAction::GenerateEvent(Event::Message {
peer,
message,
}));
match self.get_connection_mut(&peer, connection) {
Some(connection) => {
let inserted = connection.pending_outbound_responses.insert(request_id);
debug_assert!(inserted, "Expect id of new request to be unknown.");
}
// Connection closed after `RequestResponseEvent::Request` has been emitted.
// Connection closed after `Event::Request` has been emitted.
None => {
self.pending_events
.push_back(NetworkBehaviourAction::GenerateEvent(
RequestResponseEvent::InboundFailure {
Event::InboundFailure {
peer,
request_id,
error: InboundFailure::ConnectionClosed,
@ -801,7 +834,7 @@ where
}
}
}
RequestResponseHandlerEvent::ResponseSent(request_id) => {
handler::Event::ResponseSent(request_id) => {
let removed = self.remove_pending_outbound_response(&peer, connection, request_id);
debug_assert!(
removed,
@ -809,11 +842,12 @@ where
);
self.pending_events
.push_back(NetworkBehaviourAction::GenerateEvent(
RequestResponseEvent::ResponseSent { peer, request_id },
));
.push_back(NetworkBehaviourAction::GenerateEvent(Event::ResponseSent {
peer,
request_id,
}));
}
RequestResponseHandlerEvent::ResponseOmission(request_id) => {
handler::Event::ResponseOmission(request_id) => {
let removed = self.remove_pending_outbound_response(&peer, connection, request_id);
debug_assert!(
removed,
@ -822,14 +856,14 @@ where
self.pending_events
.push_back(NetworkBehaviourAction::GenerateEvent(
RequestResponseEvent::InboundFailure {
Event::InboundFailure {
peer,
request_id,
error: InboundFailure::ResponseOmission,
},
));
}
RequestResponseHandlerEvent::OutboundTimeout(request_id) => {
handler::Event::OutboundTimeout(request_id) => {
let removed = self.remove_pending_inbound_response(&peer, connection, &request_id);
debug_assert!(
removed,
@ -838,15 +872,15 @@ where
self.pending_events
.push_back(NetworkBehaviourAction::GenerateEvent(
RequestResponseEvent::OutboundFailure {
Event::OutboundFailure {
peer,
request_id,
error: OutboundFailure::Timeout,
},
));
}
RequestResponseHandlerEvent::InboundTimeout(request_id) => {
// Note: `RequestResponseHandlerEvent::InboundTimeout` is emitted both for timing
handler::Event::InboundTimeout(request_id) => {
// Note: `Event::InboundTimeout` is emitted both for timing
// out to receive the request and for timing out sending the response. In the former
// case the request is never added to `pending_outbound_responses` and thus one can
// not assert the request_id to be present before removing it.
@ -854,14 +888,14 @@ where
self.pending_events
.push_back(NetworkBehaviourAction::GenerateEvent(
RequestResponseEvent::InboundFailure {
Event::InboundFailure {
peer,
request_id,
error: InboundFailure::Timeout,
},
));
}
RequestResponseHandlerEvent::OutboundUnsupportedProtocols(request_id) => {
handler::Event::OutboundUnsupportedProtocols(request_id) => {
let removed = self.remove_pending_inbound_response(&peer, connection, &request_id);
debug_assert!(
removed,
@ -870,20 +904,20 @@ where
self.pending_events
.push_back(NetworkBehaviourAction::GenerateEvent(
RequestResponseEvent::OutboundFailure {
Event::OutboundFailure {
peer,
request_id,
error: OutboundFailure::UnsupportedProtocols,
},
));
}
RequestResponseHandlerEvent::InboundUnsupportedProtocols(request_id) => {
handler::Event::InboundUnsupportedProtocols(request_id) => {
// Note: No need to call `self.remove_pending_outbound_response`,
// `RequestResponseHandlerEvent::Request` was never emitted for this request and
// `Event::Request` was never emitted for this request and
// thus request was never added to `pending_outbound_responses`.
self.pending_events
.push_back(NetworkBehaviourAction::GenerateEvent(
RequestResponseEvent::InboundFailure {
Event::InboundFailure {
peer,
request_id,
error: InboundFailure::UnsupportedProtocols,