diff --git a/examples/file-sharing.rs b/examples/file-sharing.rs index ef7d1fdf..b38f4569 100644 --- a/examples/file-sharing.rs +++ b/examples/file-sharing.rs @@ -215,10 +215,7 @@ mod network { use libp2p::kad::record::store::MemoryStore; use libp2p::kad::{GetProvidersOk, Kademlia, KademliaEvent, QueryId, QueryResult}; use libp2p::multiaddr::Protocol; - use libp2p::request_response::{ - ProtocolSupport, RequestId, RequestResponse, RequestResponseCodec, RequestResponseEvent, - RequestResponseMessage, ResponseChannel, - }; + use libp2p::request_response::{self, ProtocolSupport, RequestId, ResponseChannel}; use libp2p::swarm::{ConnectionHandlerUpgrErr, NetworkBehaviour, Swarm, SwarmEvent}; use std::collections::{hash_map, HashMap, HashSet}; use std::iter; @@ -254,7 +251,7 @@ mod network { libp2p::development_transport(id_keys).await?, ComposedBehaviour { kademlia: Kademlia::new(peer_id, MemoryStore::new(peer_id)), - request_response: RequestResponse::new( + request_response: request_response::Behaviour::new( FileExchangeCodec(), iter::once((FileExchangeProtocol(), ProtocolSupport::Full)), Default::default(), @@ -459,9 +456,9 @@ mod network { )) => {} SwarmEvent::Behaviour(ComposedEvent::Kademlia(_)) => {} SwarmEvent::Behaviour(ComposedEvent::RequestResponse( - RequestResponseEvent::Message { message, .. }, + request_response::Event::Message { message, .. }, )) => match message { - RequestResponseMessage::Request { + request_response::Message::Request { request, channel, .. } => { self.event_sender @@ -472,7 +469,7 @@ mod network { .await .expect("Event receiver not to be dropped."); } - RequestResponseMessage::Response { + request_response::Message::Response { request_id, response, } => { @@ -484,7 +481,7 @@ mod network { } }, SwarmEvent::Behaviour(ComposedEvent::RequestResponse( - RequestResponseEvent::OutboundFailure { + request_response::Event::OutboundFailure { request_id, error, .. }, )) => { @@ -495,7 +492,7 @@ mod network { .send(Err(Box::new(error))); } SwarmEvent::Behaviour(ComposedEvent::RequestResponse( - RequestResponseEvent::ResponseSent { .. }, + request_response::Event::ResponseSent { .. }, )) => {} SwarmEvent::NewListenAddr { address, .. } => { let local_peer_id = *self.swarm.local_peer_id(); @@ -604,18 +601,18 @@ mod network { #[derive(NetworkBehaviour)] #[behaviour(out_event = "ComposedEvent")] struct ComposedBehaviour { - request_response: RequestResponse, + request_response: request_response::Behaviour, kademlia: Kademlia, } #[derive(Debug)] enum ComposedEvent { - RequestResponse(RequestResponseEvent), + RequestResponse(request_response::Event), Kademlia(KademliaEvent), } - impl From> for ComposedEvent { - fn from(event: RequestResponseEvent) -> Self { + impl From> for ComposedEvent { + fn from(event: request_response::Event) -> Self { ComposedEvent::RequestResponse(event) } } @@ -682,7 +679,7 @@ mod network { } #[async_trait] - impl RequestResponseCodec for FileExchangeCodec { + impl request_response::Codec for FileExchangeCodec { type Protocol = FileExchangeProtocol; type Request = FileRequest; type Response = FileResponse; diff --git a/protocols/autonat/src/behaviour.rs b/protocols/autonat/src/behaviour.rs index cc0adc51..a0d5ad4f 100644 --- a/protocols/autonat/src/behaviour.rs +++ b/protocols/autonat/src/behaviour.rs @@ -32,8 +32,7 @@ use libp2p_core::{ connection::ConnectionId, multiaddr::Protocol, ConnectedPoint, Endpoint, Multiaddr, PeerId, }; use libp2p_request_response::{ - ProtocolSupport, RequestId, RequestResponse, RequestResponseConfig, RequestResponseEvent, - RequestResponseMessage, ResponseChannel, + self as request_response, ProtocolSupport, RequestId, ResponseChannel, }; use libp2p_swarm::{ behaviour::{ @@ -167,7 +166,7 @@ pub struct Behaviour { local_peer_id: PeerId, // Inner behaviour for sending requests and receiving the response. - inner: RequestResponse, + inner: request_response::Behaviour, config: Config, @@ -218,9 +217,9 @@ pub struct Behaviour { impl Behaviour { pub fn new(local_peer_id: PeerId, config: Config) -> Self { let protocols = iter::once((AutoNatProtocol, ProtocolSupport::Full)); - let mut cfg = RequestResponseConfig::default(); + let mut cfg = request_response::Config::default(); cfg.set_request_timeout(config.timeout); - let inner = RequestResponse::new(AutoNatCodec, protocols, cfg); + let inner = request_response::Behaviour::new(AutoNatCodec, protocols, cfg); Self { local_peer_id, inner, @@ -419,7 +418,8 @@ impl Behaviour { } impl NetworkBehaviour for Behaviour { - type ConnectionHandler = as NetworkBehaviour>::ConnectionHandler; + type ConnectionHandler = + as NetworkBehaviour>::ConnectionHandler; type OutEvent = Event; fn poll(&mut self, cx: &mut Context<'_>, params: &mut impl PollParameters) -> Poll { @@ -432,21 +432,21 @@ impl NetworkBehaviour for Behaviour { match self.inner.poll(cx, params) { Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)) => { let (mut events, action) = match event { - RequestResponseEvent::Message { - message: RequestResponseMessage::Response { .. }, + request_response::Event::Message { + message: request_response::Message::Response { .. }, .. } - | RequestResponseEvent::OutboundFailure { .. } => { + | request_response::Event::OutboundFailure { .. } => { self.as_client().handle_event(params, event) } - RequestResponseEvent::Message { - message: RequestResponseMessage::Request { .. }, + request_response::Event::Message { + message: request_response::Message::Request { .. }, .. } - | RequestResponseEvent::InboundFailure { .. } => { + | request_response::Event::InboundFailure { .. } => { self.as_server().handle_event(params, event) } - RequestResponseEvent::ResponseSent { .. } => (VecDeque::new(), None), + request_response::Event::ResponseSent { .. } => (VecDeque::new(), None), }; self.pending_out_events.append(&mut events); if let Some(action) = action { @@ -542,12 +542,12 @@ type Action = NetworkBehaviourAction< ::ConnectionHandler, >; -// Trait implemented for `AsClient` as `AsServer` to handle events from the inner [`RequestResponse`] Protocol. +// Trait implemented for `AsClient` and `AsServer` to handle events from the inner [`request_response::Behaviour`] Protocol. trait HandleInnerEvent { fn handle_event( &mut self, params: &mut impl PollParameters, - event: RequestResponseEvent, + event: request_response::Event, ) -> (VecDeque, Option); } diff --git a/protocols/autonat/src/behaviour/as_client.rs b/protocols/autonat/src/behaviour/as_client.rs index cbb63f6a..43763029 100644 --- a/protocols/autonat/src/behaviour/as_client.rs +++ b/protocols/autonat/src/behaviour/as_client.rs @@ -28,9 +28,7 @@ use futures::FutureExt; use futures_timer::Delay; use instant::Instant; use libp2p_core::{connection::ConnectionId, Multiaddr, PeerId}; -use libp2p_request_response::{ - OutboundFailure, RequestId, RequestResponse, RequestResponseEvent, RequestResponseMessage, -}; +use libp2p_request_response::{self as request_response, OutboundFailure, RequestId}; use libp2p_swarm::{AddressScore, NetworkBehaviourAction, PollParameters}; use rand::{seq::SliceRandom, thread_rng}; use std::{ @@ -83,7 +81,7 @@ pub enum OutboundProbeEvent { /// View over [`super::Behaviour`] in a client role. pub struct AsClient<'a> { - pub inner: &'a mut RequestResponse, + pub inner: &'a mut request_response::Behaviour, pub local_peer_id: PeerId, pub config: &'a Config, pub connected: &'a HashMap>>, @@ -105,15 +103,15 @@ impl<'a> HandleInnerEvent for AsClient<'a> { fn handle_event( &mut self, params: &mut impl PollParameters, - event: RequestResponseEvent, + event: request_response::Event, ) -> (VecDeque, Option) { let mut events = VecDeque::new(); let mut action = None; match event { - RequestResponseEvent::Message { + request_response::Event::Message { peer, message: - RequestResponseMessage::Response { + request_response::Message::Response { request_id, response, }, @@ -160,7 +158,7 @@ impl<'a> HandleInnerEvent for AsClient<'a> { } } } - RequestResponseEvent::OutboundFailure { + request_response::Event::OutboundFailure { peer, error, request_id, diff --git a/protocols/autonat/src/behaviour/as_server.rs b/protocols/autonat/src/behaviour/as_server.rs index f858c48c..455ac3d1 100644 --- a/protocols/autonat/src/behaviour/as_server.rs +++ b/protocols/autonat/src/behaviour/as_server.rs @@ -25,8 +25,7 @@ use super::{ use instant::Instant; use libp2p_core::{connection::ConnectionId, multiaddr::Protocol, Multiaddr, PeerId}; use libp2p_request_response::{ - InboundFailure, RequestId, RequestResponse, RequestResponseEvent, RequestResponseMessage, - ResponseChannel, + self as request_response, InboundFailure, RequestId, ResponseChannel, }; use libp2p_swarm::{ dial_opts::{DialOpts, PeerCondition}, @@ -75,7 +74,7 @@ pub enum InboundProbeEvent { /// View over [`super::Behaviour`] in a server role. pub struct AsServer<'a> { - pub inner: &'a mut RequestResponse, + pub inner: &'a mut request_response::Behaviour, pub config: &'a Config, pub connected: &'a HashMap>>, pub probe_id: &'a mut ProbeId, @@ -98,15 +97,15 @@ impl<'a> HandleInnerEvent for AsServer<'a> { fn handle_event( &mut self, _params: &mut impl PollParameters, - event: RequestResponseEvent, + event: request_response::Event, ) -> (VecDeque, Option) { let mut events = VecDeque::new(); let mut action = None; match event { - RequestResponseEvent::Message { + request_response::Event::Message { peer, message: - RequestResponseMessage::Request { + request_response::Message::Request { request_id, request, channel, @@ -161,7 +160,7 @@ impl<'a> HandleInnerEvent for AsServer<'a> { } } } - RequestResponseEvent::InboundFailure { + request_response::Event::InboundFailure { peer, error, request_id, diff --git a/protocols/autonat/src/protocol.rs b/protocols/autonat/src/protocol.rs index f39c22aa..08ae4153 100644 --- a/protocols/autonat/src/protocol.rs +++ b/protocols/autonat/src/protocol.rs @@ -22,7 +22,7 @@ use crate::structs_proto; use async_trait::async_trait; use futures::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; use libp2p_core::{upgrade, Multiaddr, PeerId}; -use libp2p_request_response::{ProtocolName, RequestResponseCodec}; +use libp2p_request_response::{self as request_response, ProtocolName}; use prost::Message; use std::{convert::TryFrom, io}; @@ -42,7 +42,7 @@ impl ProtocolName for AutoNatProtocol { pub struct AutoNatCodec; #[async_trait] -impl RequestResponseCodec for AutoNatCodec { +impl request_response::Codec for AutoNatCodec { type Protocol = AutoNatProtocol; type Request = DialRequest; type Response = DialResponse; diff --git a/protocols/request-response/CHANGELOG.md b/protocols/request-response/CHANGELOG.md index dcc6d441..59eb17ac 100644 --- a/protocols/request-response/CHANGELOG.md +++ b/protocols/request-response/CHANGELOG.md @@ -1,7 +1,17 @@ # 0.24.0 [unreleased] +- Rename types as per [discussion 2174]. + `RequestResponse` has been renamed to `Behaviour`. + The `RequestResponse` prefix has been removed from various types like `RequestResponseEvent`. + Users should prefer importing the request_response protocol as a module (`use libp2p::request_response;`), + and refer to its types via `request_response::`. For example: `request_response::Behaviour` or `request_response::Event`. + See [PR 3159]. + - Update to `libp2p-swarm` `v0.42.0`. +[discussion 2174]: https://github.com/libp2p/rust-libp2p/discussions/2174 +[PR 3159]: https://github.com/libp2p/rust-libp2p/pull/3159 + # 0.23.0 - Update to `libp2p-core` `v0.38.0`. diff --git a/protocols/request-response/src/codec.rs b/protocols/request-response/src/codec.rs index 5345d200..c5af967d 100644 --- a/protocols/request-response/src/codec.rs +++ b/protocols/request-response/src/codec.rs @@ -25,8 +25,12 @@ use futures::prelude::*; use std::io; /// A `RequestResponseCodec` defines the request and response types -/// for a [`RequestResponse`](crate::RequestResponse) protocol or +/// for a request-response `Behaviour` protocol or /// protocol family and how they are encoded / decoded on an I/O stream. +#[deprecated( + since = "0.24.0", + note = "Use re-exports that omit `RequestResponse` prefix, i.e. `libp2p::request_response::Codec`" +)] #[async_trait] pub trait RequestResponseCodec { /// The type of protocol(s) or protocol versions being negotiated. @@ -78,3 +82,118 @@ pub trait RequestResponseCodec { where T: AsyncWrite + Unpin + Send; } + +/// A `Codec` defines the request and response types +/// for a request-response [`Behaviour`](crate::Behaviour) protocol or +/// protocol family and how they are encoded / decoded on an I/O stream. +#[async_trait] +pub trait Codec { + /// The type of protocol(s) or protocol versions being negotiated. + type Protocol: ProtocolName + Send + Clone; + /// The type of inbound and outbound requests. + type Request: Send; + /// The type of inbound and outbound responses. + type Response: Send; + + /// Reads a request from the given I/O stream according to the + /// negotiated protocol. + async fn read_request( + &mut self, + protocol: &Self::Protocol, + io: &mut T, + ) -> io::Result + where + T: AsyncRead + Unpin + Send; + + /// Reads a response from the given I/O stream according to the + /// negotiated protocol. + async fn read_response( + &mut self, + protocol: &Self::Protocol, + io: &mut T, + ) -> io::Result + where + T: AsyncRead + Unpin + Send; + + /// Writes a request to the given I/O stream according to the + /// negotiated protocol. + async fn write_request( + &mut self, + protocol: &Self::Protocol, + io: &mut T, + req: Self::Request, + ) -> io::Result<()> + where + T: AsyncWrite + Unpin + Send; + + /// Writes a response to the given I/O stream according to the + /// negotiated protocol. + async fn write_response( + &mut self, + protocol: &Self::Protocol, + io: &mut T, + res: Self::Response, + ) -> io::Result<()> + where + T: AsyncWrite + Unpin + Send; +} + +#[allow(deprecated)] +#[async_trait] +impl Codec for U +where + U: RequestResponseCodec + Send, + U::Protocol: Sync, +{ + type Protocol = U::Protocol; + + type Request = U::Request; + + type Response = U::Response; + + async fn read_request( + &mut self, + protocol: &Self::Protocol, + io: &mut T, + ) -> io::Result + where + T: AsyncRead + Unpin + Send, + { + self.read_request(protocol, io).await + } + + async fn read_response( + &mut self, + protocol: &Self::Protocol, + io: &mut T, + ) -> io::Result + where + T: AsyncRead + Unpin + Send, + { + self.read_response(protocol, io).await + } + + async fn write_request( + &mut self, + protocol: &Self::Protocol, + io: &mut T, + req: Self::Request, + ) -> io::Result<()> + where + T: AsyncWrite + Unpin + Send, + { + self.write_request(protocol, io, req).await + } + + async fn write_response( + &mut self, + protocol: &Self::Protocol, + io: &mut T, + res: Self::Response, + ) -> io::Result<()> + where + T: AsyncWrite + Unpin + Send, + { + self.write_response(protocol, io, res).await + } +} diff --git a/protocols/request-response/src/handler.rs b/protocols/request-response/src/handler.rs index 317da87d..3eba5b10 100644 --- a/protocols/request-response/src/handler.rs +++ b/protocols/request-response/src/handler.rs @@ -20,7 +20,7 @@ mod protocol; -use crate::codec::RequestResponseCodec; +use crate::codec::Codec; use crate::{RequestId, EMPTY_QUEUE_SHRINK_THRESHOLD}; use libp2p_swarm::handler::{ @@ -48,11 +48,16 @@ use std::{ time::Duration, }; -/// A connection handler of a `RequestResponse` protocol. -#[doc(hidden)] -pub struct RequestResponseHandler +#[deprecated( + since = "0.24.0", + note = "Use re-exports that omit `RequestResponse` prefix, i.e. `libp2p::request_response::handler::Handler`" +)] +pub type RequestResponseHandler = Handler; + +/// A connection handler for a request response [`Behaviour`](super::Behaviour) protocol. +pub struct Handler where - TCodec: RequestResponseCodec, + TCodec: Codec, { /// The supported inbound protocols. inbound_protocols: SmallVec<[TCodec::Protocol; 2]>, @@ -69,7 +74,7 @@ where /// A pending fatal error that results in the connection being closed. pending_error: Option>, /// Queue of events to emit in `poll()`. - pending_events: VecDeque>, + pending_events: VecDeque>, /// Outbound upgrades waiting to be emitted as an `OutboundSubstreamRequest`. outbound: VecDeque>, /// Inbound upgrades waiting for the incoming request. @@ -88,9 +93,9 @@ where inbound_request_id: Arc, } -impl RequestResponseHandler +impl Handler where - TCodec: RequestResponseCodec + Send + Clone + 'static, + TCodec: Codec + Send + Clone + 'static, { pub(super) fn new( inbound_protocols: SmallVec<[TCodec::Protocol; 2]>, @@ -125,10 +130,10 @@ where ) { if sent { self.pending_events - .push_back(RequestResponseHandlerEvent::ResponseSent(request_id)) + .push_back(Event::ResponseSent(request_id)) } else { self.pending_events - .push_back(RequestResponseHandlerEvent::ResponseOmission(request_id)) + .push_back(Event::ResponseOmission(request_id)) } } @@ -141,8 +146,7 @@ where ) { match error { ConnectionHandlerUpgrErr::Timeout => { - self.pending_events - .push_back(RequestResponseHandlerEvent::OutboundTimeout(info)); + self.pending_events.push_back(Event::OutboundTimeout(info)); } ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => { // The remote merely doesn't support the protocol(s) we requested. @@ -150,9 +154,8 @@ where // successfully communicate with other protocols already. // An event is reported to permit user code to react to the fact that // the remote peer does not support the requested protocol(s). - self.pending_events.push_back( - RequestResponseHandlerEvent::OutboundUnsupportedProtocols(info), - ); + self.pending_events + .push_back(Event::OutboundUnsupportedProtocols(info)); } _ => { // Anything else is considered a fatal error or misbehaviour of @@ -169,18 +172,17 @@ where >, ) { match error { - ConnectionHandlerUpgrErr::Timeout => self - .pending_events - .push_back(RequestResponseHandlerEvent::InboundTimeout(info)), + ConnectionHandlerUpgrErr::Timeout => { + self.pending_events.push_back(Event::InboundTimeout(info)) + } ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => { // The local peer merely doesn't support the protocol(s) requested. // This is no reason to close the connection, which may // successfully communicate with other protocols already. // An event is reported to permit user code to react to the fact that // the local peer does not support the requested protocol(s). - self.pending_events.push_back( - RequestResponseHandlerEvent::InboundUnsupportedProtocols(info), - ); + self.pending_events + .push_back(Event::InboundUnsupportedProtocols(info)); } _ => { // Anything else is considered a fatal error or misbehaviour of @@ -191,11 +193,16 @@ where } } -/// The events emitted by the [`RequestResponseHandler`]. -#[doc(hidden)] -pub enum RequestResponseHandlerEvent +#[deprecated( + since = "0.24.0", + note = "Use re-exports that omit `RequestResponse` prefix, i.e. `libp2p::request_response::handler::Event`" +)] +pub type RequestResponseHandlerEvent = Event; + +/// The events emitted by the [`Handler`]. +pub enum Event where - TCodec: RequestResponseCodec, + TCodec: Codec, { /// A request has been received. Request { @@ -225,58 +232,58 @@ where InboundUnsupportedProtocols(RequestId), } -impl fmt::Debug for RequestResponseHandlerEvent { +impl fmt::Debug for Event { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - RequestResponseHandlerEvent::Request { + Event::Request { request_id, request: _, sender: _, } => f - .debug_struct("RequestResponseHandlerEvent::Request") + .debug_struct("Event::Request") .field("request_id", request_id) .finish(), - RequestResponseHandlerEvent::Response { + Event::Response { request_id, response: _, } => f - .debug_struct("RequestResponseHandlerEvent::Response") + .debug_struct("Event::Response") .field("request_id", request_id) .finish(), - RequestResponseHandlerEvent::ResponseSent(request_id) => f - .debug_tuple("RequestResponseHandlerEvent::ResponseSent") + Event::ResponseSent(request_id) => f + .debug_tuple("Event::ResponseSent") .field(request_id) .finish(), - RequestResponseHandlerEvent::ResponseOmission(request_id) => f - .debug_tuple("RequestResponseHandlerEvent::ResponseOmission") + Event::ResponseOmission(request_id) => f + .debug_tuple("Event::ResponseOmission") .field(request_id) .finish(), - RequestResponseHandlerEvent::OutboundTimeout(request_id) => f - .debug_tuple("RequestResponseHandlerEvent::OutboundTimeout") + Event::OutboundTimeout(request_id) => f + .debug_tuple("Event::OutboundTimeout") .field(request_id) .finish(), - RequestResponseHandlerEvent::OutboundUnsupportedProtocols(request_id) => f - .debug_tuple("RequestResponseHandlerEvent::OutboundUnsupportedProtocols") + Event::OutboundUnsupportedProtocols(request_id) => f + .debug_tuple("Event::OutboundUnsupportedProtocols") .field(request_id) .finish(), - RequestResponseHandlerEvent::InboundTimeout(request_id) => f - .debug_tuple("RequestResponseHandlerEvent::InboundTimeout") + Event::InboundTimeout(request_id) => f + .debug_tuple("Event::InboundTimeout") .field(request_id) .finish(), - RequestResponseHandlerEvent::InboundUnsupportedProtocols(request_id) => f - .debug_tuple("RequestResponseHandlerEvent::InboundUnsupportedProtocols") + Event::InboundUnsupportedProtocols(request_id) => f + .debug_tuple("Event::InboundUnsupportedProtocols") .field(request_id) .finish(), } } } -impl ConnectionHandler for RequestResponseHandler +impl ConnectionHandler for Handler where - TCodec: RequestResponseCodec + Send + Clone + 'static, + TCodec: Codec + Send + Clone + 'static, { type InEvent = RequestProtocol; - type OutEvent = RequestResponseHandlerEvent; + type OutEvent = Event; type Error = ConnectionHandlerUpgrErr; type InboundProtocol = ResponseProtocol; type OutboundProtocol = RequestProtocol; @@ -309,7 +316,7 @@ where }; // The handler waits for the request to come in. It then emits - // `RequestResponseHandlerEvent::Request` together with a + // `Event::Request` together with a // `ResponseChannel`. self.inbound .push(rq_recv.map_ok(move |rq| (rq, rs_send)).boxed()); @@ -350,13 +357,11 @@ where Ok(((id, rq), rs_sender)) => { // We received an inbound request. self.keep_alive = KeepAlive::Yes; - return Poll::Ready(ConnectionHandlerEvent::Custom( - RequestResponseHandlerEvent::Request { - request_id: id, - request: rq, - sender: rs_sender, - }, - )); + return Poll::Ready(ConnectionHandlerEvent::Custom(Event::Request { + request_id: id, + request: rq, + sender: rs_sender, + })); } Err(oneshot::Canceled) => { // The inbound upgrade has errored or timed out reading @@ -409,11 +414,10 @@ where protocol: response, info: request_id, }) => { - self.pending_events - .push_back(RequestResponseHandlerEvent::Response { - request_id, - response, - }); + self.pending_events.push_back(Event::Response { + request_id, + response, + }); } ConnectionEvent::DialUpgradeError(dial_upgrade_error) => { self.on_dial_upgrade_error(dial_upgrade_error) diff --git a/protocols/request-response/src/handler/protocol.rs b/protocols/request-response/src/handler/protocol.rs index dda4ee00..84ef3657 100644 --- a/protocols/request-response/src/handler/protocol.rs +++ b/protocols/request-response/src/handler/protocol.rs @@ -23,7 +23,7 @@ //! receives a request and sends a response, whereas the //! outbound upgrade send a request and receives a response. -use crate::codec::RequestResponseCodec; +use crate::codec::Codec; use crate::RequestId; use futures::{channel::oneshot, future::BoxFuture, prelude::*}; @@ -67,7 +67,7 @@ impl ProtocolSupport { #[derive(Debug)] pub struct ResponseProtocol where - TCodec: RequestResponseCodec, + TCodec: Codec, { pub(crate) codec: TCodec, pub(crate) protocols: SmallVec<[TCodec::Protocol; 2]>, @@ -78,7 +78,7 @@ where impl UpgradeInfo for ResponseProtocol where - TCodec: RequestResponseCodec, + TCodec: Codec, { type Info = TCodec::Protocol; type InfoIter = smallvec::IntoIter<[Self::Info; 2]>; @@ -90,7 +90,7 @@ where impl InboundUpgrade for ResponseProtocol where - TCodec: RequestResponseCodec + Send + 'static, + TCodec: Codec + Send + 'static, { type Output = bool; type Error = io::Error; @@ -132,7 +132,7 @@ where /// Sends a request and receives a response. pub struct RequestProtocol where - TCodec: RequestResponseCodec, + TCodec: Codec, { pub(crate) codec: TCodec, pub(crate) protocols: SmallVec<[TCodec::Protocol; 2]>, @@ -142,7 +142,7 @@ where impl fmt::Debug for RequestProtocol where - TCodec: RequestResponseCodec, + TCodec: Codec, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("RequestProtocol") @@ -153,7 +153,7 @@ where impl UpgradeInfo for RequestProtocol where - TCodec: RequestResponseCodec, + TCodec: Codec, { type Info = TCodec::Protocol; type InfoIter = smallvec::IntoIter<[Self::Info; 2]>; @@ -165,7 +165,7 @@ where impl OutboundUpgrade for RequestProtocol where - TCodec: RequestResponseCodec + Send + 'static, + TCodec: Codec + Send + 'static, { type Output = TCodec::Response; type Error = io::Error; diff --git a/protocols/request-response/src/lib.rs b/protocols/request-response/src/lib.rs index 68c62125..f5fa3067 100644 --- a/protocols/request-response/src/lib.rs +++ b/protocols/request-response/src/lib.rs @@ -22,34 +22,34 @@ //! //! ## General Usage //! -//! [`RequestResponse`] is a `NetworkBehaviour` that implements a generic +//! The [`Behaviour`] struct is a [`NetworkBehaviour`] that implements a generic //! request/response protocol or protocol family, whereby each request is -//! sent over a new substream on a connection. `RequestResponse` is generic +//! sent over a new substream on a connection. `Behaviour` is generic //! over the actual messages being sent, which are defined in terms of a -//! [`RequestResponseCodec`]. Creating a request/response protocol thus amounts +//! [`Codec`]. Creating a request/response protocol thus amounts //! to providing an implementation of this trait which can then be -//! given to [`RequestResponse::new`]. Further configuration options are -//! available via the [`RequestResponseConfig`]. +//! given to [`Behaviour::new`]. Further configuration options are +//! available via the [`Config`]. //! -//! Requests are sent using [`RequestResponse::send_request`] and the -//! responses received as [`RequestResponseMessage::Response`] via -//! [`RequestResponseEvent::Message`]. +//! Requests are sent using [`Behaviour::send_request`] and the +//! responses received as [`Message::Response`] via +//! [`Event::Message`]. //! -//! Responses are sent using [`RequestResponse::send_response`] upon -//! receiving a [`RequestResponseMessage::Request`] via -//! [`RequestResponseEvent::Message`]. +//! Responses are sent using [`Behaviour::send_response`] upon +//! receiving a [`Message::Request`] via +//! [`Event::Message`]. //! //! ## Protocol Families //! -//! A single [`RequestResponse`] instance can be used with an entire +//! A single [`Behaviour`] instance can be used with an entire //! protocol family that share the same request and response types. -//! For that purpose, [`RequestResponseCodec::Protocol`] is typically +//! For that purpose, [`Codec::Protocol`] is typically //! instantiated with a sum type. //! //! ## Limited Protocol Support //! //! It is possible to only support inbound or outbound requests for -//! a particular protocol. This is achieved by instantiating `RequestResponse` +//! a particular protocol. This is achieved by instantiating `Behaviour` //! with protocols using [`ProtocolSupport::Inbound`] or //! [`ProtocolSupport::Outbound`]. Any subset of protocols of a protocol //! family can be configured in this way. Such protocols will not be @@ -61,11 +61,15 @@ pub mod codec; pub mod handler; -pub use codec::{ProtocolName, RequestResponseCodec}; +pub use codec::{Codec, ProtocolName}; + +#[allow(deprecated)] +pub use codec::RequestResponseCodec; + pub use handler::ProtocolSupport; use futures::channel::oneshot; -use handler::{RequestProtocol, RequestResponseHandler, RequestResponseHandlerEvent}; +use handler::{Handler, RequestProtocol}; use libp2p_core::{connection::ConnectionId, ConnectedPoint, Multiaddr, PeerId}; use libp2p_swarm::{ behaviour::{AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm}, @@ -81,9 +85,40 @@ use std::{ time::Duration, }; +#[deprecated( + since = "0.24.0", + note = "Use libp2p::request_response::Behaviour instead." +)] +pub type RequestResponse = Behaviour; + +#[deprecated( + since = "0.24.0", + note = "Use re-exports that omit `RequestResponse` prefix, i.e. `libp2p::request_response::Config`" +)] +pub type RequestResponseConfig = Config; + +#[deprecated( + since = "0.24.0", + note = "Use re-exports that omit `RequestResponse` prefix, i.e. `libp2p::request_response::Event`" +)] +pub type RequestResponseEvent = Event; + +#[deprecated( + since = "0.24.0", + note = "Use re-exports that omit `RequestResponse` prefix, i.e. `libp2p::request_response::Message`" +)] +pub type RequestResponseMessage = + Message; + +#[deprecated( + since = "0.24.0", + note = "Use re-exports that omit `RequestResponse` prefix, i.e. `libp2p::request_response::handler::Event`" +)] +pub type HandlerEvent = handler::Event; + /// An inbound request or response. #[derive(Debug)] -pub enum RequestResponseMessage { +pub enum Message { /// A request message. Request { /// The ID of this request. @@ -93,7 +128,7 @@ pub enum RequestResponseMessage, }, @@ -101,22 +136,22 @@ pub enum RequestResponseMessage { +pub enum Event { /// An incoming message (request or response). Message { /// The peer who sent the message. peer: PeerId, /// The incoming message. - message: RequestResponseMessage, + message: Message, }, /// An outbound request failed. OutboundFailure { @@ -191,7 +226,7 @@ impl std::error::Error for OutboundFailure {} pub enum InboundFailure { /// The inbound request timed out, either while reading the /// incoming request or before a response is sent, e.g. if - /// [`RequestResponse::send_response`] is not called in a + /// [`Behaviour::send_response`] is not called in a /// timely manner. Timeout, /// The connection closed before a response could be send. @@ -201,7 +236,7 @@ pub enum InboundFailure { UnsupportedProtocols, /// The local peer failed to respond to an inbound request /// due to the [`ResponseChannel`] being dropped instead of - /// being passed to [`RequestResponse::send_response`]. + /// being passed to [`Behaviour::send_response`]. ResponseOmission, } @@ -230,7 +265,7 @@ impl std::error::Error for InboundFailure {} /// A channel for sending a response to an inbound request. /// -/// See [`RequestResponse::send_response`]. +/// See [`Behaviour::send_response`]. #[derive(Debug)] pub struct ResponseChannel { sender: oneshot::Sender, @@ -238,8 +273,8 @@ pub struct ResponseChannel { impl ResponseChannel { /// Checks whether the response channel is still open, i.e. - /// the `RequestResponse` behaviour is still waiting for a - /// a response to be sent via [`RequestResponse::send_response`] + /// the `Behaviour` is still waiting for a + /// a response to be sent via [`Behaviour::send_response`] /// and this response channel. /// /// If the response channel is no longer open then the inbound @@ -255,7 +290,7 @@ impl ResponseChannel { /// inbound and likewise between two outbound requests. There is no /// uniqueness guarantee in a set of both inbound and outbound /// [`RequestId`]s nor in a set of inbound or outbound requests -/// originating from different [`RequestResponse`] behaviours. +/// originating from different [`Behaviour`]'s. #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] pub struct RequestId(u64); @@ -265,14 +300,14 @@ impl fmt::Display for RequestId { } } -/// The configuration for a `RequestResponse` protocol. +/// The configuration for a `Behaviour` protocol. #[derive(Debug, Clone)] -pub struct RequestResponseConfig { +pub struct Config { request_timeout: Duration, connection_keep_alive: Duration, } -impl Default for RequestResponseConfig { +impl Default for Config { fn default() -> Self { Self { connection_keep_alive: Duration::from_secs(10), @@ -281,7 +316,7 @@ impl Default for RequestResponseConfig { } } -impl RequestResponseConfig { +impl Config { /// Sets the keep-alive timeout of idle connections. pub fn set_connection_keep_alive(&mut self, v: Duration) -> &mut Self { self.connection_keep_alive = v; @@ -296,9 +331,9 @@ impl RequestResponseConfig { } /// A request/response protocol for some message codec. -pub struct RequestResponse +pub struct Behaviour where - TCodec: RequestResponseCodec + Clone + Send + 'static, + TCodec: Codec + Clone + Send + 'static, { /// The supported inbound protocols. inbound_protocols: SmallVec<[TCodec::Protocol; 2]>, @@ -309,16 +344,12 @@ where /// The next (inbound) request ID. next_inbound_id: Arc, /// The protocol configuration. - config: RequestResponseConfig, + config: Config, /// The protocol codec for reading and writing requests and responses. codec: TCodec, /// Pending events to return from `poll`. - pending_events: VecDeque< - NetworkBehaviourAction< - RequestResponseEvent, - RequestResponseHandler, - >, - >, + pending_events: + VecDeque, Handler>>, /// The currently connected peers, their pending outbound and inbound responses and their known, /// reachable addresses, if any. connected: HashMap>, @@ -329,13 +360,13 @@ where pending_outbound_requests: HashMap; 10]>>, } -impl RequestResponse +impl Behaviour where - TCodec: RequestResponseCodec + Clone + Send + 'static, + TCodec: Codec + Clone + Send + 'static, { - /// Creates a new `RequestResponse` behaviour for the given + /// Creates a new `Behaviour` for the given /// protocols, codec and configuration. - pub fn new(codec: TCodec, protocols: I, cfg: RequestResponseConfig) -> Self + pub fn new(codec: TCodec, protocols: I, cfg: Config) -> Self where I: IntoIterator, { @@ -349,7 +380,7 @@ where outbound_protocols.push(p.clone()); } } - RequestResponse { + Behaviour { inbound_protocols, outbound_protocols, next_request_id: RequestId(1), @@ -373,8 +404,8 @@ where /// > the `RequestResonse` protocol must either be embedded /// > in another `NetworkBehaviour` that provides peer and /// > address discovery, or known addresses of peers must be - /// > managed via [`RequestResponse::add_address`] and - /// > [`RequestResponse::remove_address`]. + /// > managed via [`Behaviour::add_address`] and + /// > [`Behaviour::remove_address`]. pub fn send_request(&mut self, peer: &PeerId, request: TCodec::Request) -> RequestId { let request_id = self.next_request_id(); let request = RequestProtocol { @@ -404,12 +435,12 @@ where /// If the [`ResponseChannel`] is already closed due to a timeout or the /// connection being closed, the response is returned as an `Err` for /// further handling. Once the response has been successfully sent on the - /// corresponding connection, [`RequestResponseEvent::ResponseSent`] is - /// emitted. In all other cases [`RequestResponseEvent::InboundFailure`] + /// corresponding connection, [`Event::ResponseSent`] is + /// emitted. In all other cases [`Event::InboundFailure`] /// will be or has been emitted. /// /// The provided `ResponseChannel` is obtained from an inbound - /// [`RequestResponseMessage::Request`]. + /// [`Message::Request`]. pub fn send_response( &mut self, ch: ResponseChannel, @@ -449,7 +480,7 @@ where } /// Checks whether an outbound request to the peer with the provided - /// [`PeerId`] initiated by [`RequestResponse::send_request`] is still + /// [`PeerId`] initiated by [`Behaviour::send_request`] is still /// pending, i.e. waiting for a response. pub fn is_pending_outbound(&self, peer: &PeerId, request_id: &RequestId) -> bool { // Check if request is already sent on established connection. @@ -473,7 +504,7 @@ where /// Checks whether an inbound request from the peer with the provided /// [`PeerId`] is still pending, i.e. waiting for a response by the local - /// node through [`RequestResponse::send_response`]. + /// node through [`Behaviour::send_response`]. pub fn is_pending_inbound(&self, peer: &PeerId, request_id: &RequestId) -> bool { self.connected .get(peer) @@ -644,7 +675,7 @@ where for request_id in connection.pending_outbound_responses { self.pending_events .push_back(NetworkBehaviourAction::GenerateEvent( - RequestResponseEvent::InboundFailure { + Event::InboundFailure { peer: peer_id, request_id, error: InboundFailure::ConnectionClosed, @@ -655,7 +686,7 @@ where for request_id in connection.pending_inbound_responses { self.pending_events .push_back(NetworkBehaviourAction::GenerateEvent( - RequestResponseEvent::OutboundFailure { + Event::OutboundFailure { peer: peer_id, request_id, error: OutboundFailure::ConnectionClosed, @@ -679,7 +710,7 @@ where for request in pending { self.pending_events .push_back(NetworkBehaviourAction::GenerateEvent( - RequestResponseEvent::OutboundFailure { + Event::OutboundFailure { peer, request_id: request.request_id, error: OutboundFailure::DialFailure, @@ -691,15 +722,15 @@ where } } -impl NetworkBehaviour for RequestResponse +impl NetworkBehaviour for Behaviour where - TCodec: RequestResponseCodec + Send + Clone + 'static, + TCodec: Codec + Send + Clone + 'static, { - type ConnectionHandler = RequestResponseHandler; - type OutEvent = RequestResponseEvent; + type ConnectionHandler = Handler; + type OutEvent = Event; fn new_handler(&mut self) -> Self::ConnectionHandler { - RequestResponseHandler::new( + Handler::new( self.inbound_protocols.clone(), self.codec.clone(), self.config.connection_keep_alive, @@ -748,7 +779,7 @@ where libp2p_swarm::ConnectionHandler>::OutEvent, ) { match event { - RequestResponseHandlerEvent::Response { + handler::Event::Response { request_id, response, } => { @@ -758,41 +789,43 @@ where "Expect request_id to be pending before receiving response.", ); - let message = RequestResponseMessage::Response { + let message = Message::Response { request_id, response, }; self.pending_events - .push_back(NetworkBehaviourAction::GenerateEvent( - RequestResponseEvent::Message { peer, message }, - )); + .push_back(NetworkBehaviourAction::GenerateEvent(Event::Message { + peer, + message, + })); } - RequestResponseHandlerEvent::Request { + handler::Event::Request { request_id, request, sender, } => { let channel = ResponseChannel { sender }; - let message = RequestResponseMessage::Request { + let message = Message::Request { request_id, request, channel, }; self.pending_events - .push_back(NetworkBehaviourAction::GenerateEvent( - RequestResponseEvent::Message { peer, message }, - )); + .push_back(NetworkBehaviourAction::GenerateEvent(Event::Message { + peer, + message, + })); match self.get_connection_mut(&peer, connection) { Some(connection) => { let inserted = connection.pending_outbound_responses.insert(request_id); debug_assert!(inserted, "Expect id of new request to be unknown."); } - // Connection closed after `RequestResponseEvent::Request` has been emitted. + // Connection closed after `Event::Request` has been emitted. None => { self.pending_events .push_back(NetworkBehaviourAction::GenerateEvent( - RequestResponseEvent::InboundFailure { + Event::InboundFailure { peer, request_id, error: InboundFailure::ConnectionClosed, @@ -801,7 +834,7 @@ where } } } - RequestResponseHandlerEvent::ResponseSent(request_id) => { + handler::Event::ResponseSent(request_id) => { let removed = self.remove_pending_outbound_response(&peer, connection, request_id); debug_assert!( removed, @@ -809,11 +842,12 @@ where ); self.pending_events - .push_back(NetworkBehaviourAction::GenerateEvent( - RequestResponseEvent::ResponseSent { peer, request_id }, - )); + .push_back(NetworkBehaviourAction::GenerateEvent(Event::ResponseSent { + peer, + request_id, + })); } - RequestResponseHandlerEvent::ResponseOmission(request_id) => { + handler::Event::ResponseOmission(request_id) => { let removed = self.remove_pending_outbound_response(&peer, connection, request_id); debug_assert!( removed, @@ -822,14 +856,14 @@ where self.pending_events .push_back(NetworkBehaviourAction::GenerateEvent( - RequestResponseEvent::InboundFailure { + Event::InboundFailure { peer, request_id, error: InboundFailure::ResponseOmission, }, )); } - RequestResponseHandlerEvent::OutboundTimeout(request_id) => { + handler::Event::OutboundTimeout(request_id) => { let removed = self.remove_pending_inbound_response(&peer, connection, &request_id); debug_assert!( removed, @@ -838,15 +872,15 @@ where self.pending_events .push_back(NetworkBehaviourAction::GenerateEvent( - RequestResponseEvent::OutboundFailure { + Event::OutboundFailure { peer, request_id, error: OutboundFailure::Timeout, }, )); } - RequestResponseHandlerEvent::InboundTimeout(request_id) => { - // Note: `RequestResponseHandlerEvent::InboundTimeout` is emitted both for timing + handler::Event::InboundTimeout(request_id) => { + // Note: `Event::InboundTimeout` is emitted both for timing // out to receive the request and for timing out sending the response. In the former // case the request is never added to `pending_outbound_responses` and thus one can // not assert the request_id to be present before removing it. @@ -854,14 +888,14 @@ where self.pending_events .push_back(NetworkBehaviourAction::GenerateEvent( - RequestResponseEvent::InboundFailure { + Event::InboundFailure { peer, request_id, error: InboundFailure::Timeout, }, )); } - RequestResponseHandlerEvent::OutboundUnsupportedProtocols(request_id) => { + handler::Event::OutboundUnsupportedProtocols(request_id) => { let removed = self.remove_pending_inbound_response(&peer, connection, &request_id); debug_assert!( removed, @@ -870,20 +904,20 @@ where self.pending_events .push_back(NetworkBehaviourAction::GenerateEvent( - RequestResponseEvent::OutboundFailure { + Event::OutboundFailure { peer, request_id, error: OutboundFailure::UnsupportedProtocols, }, )); } - RequestResponseHandlerEvent::InboundUnsupportedProtocols(request_id) => { + handler::Event::InboundUnsupportedProtocols(request_id) => { // Note: No need to call `self.remove_pending_outbound_response`, - // `RequestResponseHandlerEvent::Request` was never emitted for this request and + // `Event::Request` was never emitted for this request and // thus request was never added to `pending_outbound_responses`. self.pending_events .push_back(NetworkBehaviourAction::GenerateEvent( - RequestResponseEvent::InboundFailure { + Event::InboundFailure { peer, request_id, error: InboundFailure::UnsupportedProtocols, diff --git a/protocols/request-response/tests/ping.rs b/protocols/request-response/tests/ping.rs index 1e0af206..72c77c3a 100644 --- a/protocols/request-response/tests/ping.rs +++ b/protocols/request-response/tests/ping.rs @@ -18,7 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -//! Integration tests for the `RequestResponse` network behaviour. +//! Integration tests for the `Behaviour`. use async_trait::async_trait; use futures::{channel::mpsc, prelude::*, AsyncWriteExt}; @@ -43,10 +43,10 @@ fn is_response_outbound() { let offline_peer = PeerId::random(); let protocols = iter::once((PingProtocol(), ProtocolSupport::Full)); - let cfg = RequestResponseConfig::default(); + let cfg = Config::default(); let (peer1_id, trans) = mk_transport(); - let ping_proto1 = RequestResponse::new(PingCodec(), protocols, cfg); + let ping_proto1 = Behaviour::new(PingCodec(), protocols, cfg); let mut swarm1 = Swarm::without_executor(trans, ping_proto1, peer1_id); let request_id1 = swarm1 @@ -54,7 +54,7 @@ fn is_response_outbound() { .send_request(&offline_peer, ping.clone()); match futures::executor::block_on(swarm1.select_next_some()) { - SwarmEvent::Behaviour(RequestResponseEvent::OutboundFailure { + SwarmEvent::Behaviour(Event::OutboundFailure { peer, request_id: req_id, error: _error, @@ -82,14 +82,14 @@ fn ping_protocol() { let pong = Pong("pong".to_string().into_bytes()); let protocols = iter::once((PingProtocol(), ProtocolSupport::Full)); - let cfg = RequestResponseConfig::default(); + let cfg = Config::default(); let (peer1_id, trans) = mk_transport(); - let ping_proto1 = RequestResponse::new(PingCodec(), protocols.clone(), cfg.clone()); + let ping_proto1 = Behaviour::new(PingCodec(), protocols.clone(), cfg.clone()); let mut swarm1 = Swarm::without_executor(trans, ping_proto1, peer1_id); let (peer2_id, trans) = mk_transport(); - let ping_proto2 = RequestResponse::new(PingCodec(), protocols, cfg); + let ping_proto2 = Behaviour::new(PingCodec(), protocols, cfg); let mut swarm2 = Swarm::without_executor(trans, ping_proto2, peer2_id); let (mut tx, mut rx) = mpsc::channel::(1); @@ -104,10 +104,10 @@ fn ping_protocol() { loop { match swarm1.select_next_some().await { SwarmEvent::NewListenAddr { address, .. } => tx.send(address).await.unwrap(), - SwarmEvent::Behaviour(RequestResponseEvent::Message { + SwarmEvent::Behaviour(Event::Message { peer, message: - RequestResponseMessage::Request { + Message::Request { request, channel, .. }, }) => { @@ -118,7 +118,7 @@ fn ping_protocol() { .send_response(channel, pong.clone()) .unwrap(); } - SwarmEvent::Behaviour(RequestResponseEvent::ResponseSent { peer, .. }) => { + SwarmEvent::Behaviour(Event::ResponseSent { peer, .. }) => { assert_eq!(&peer, &peer2_id); } SwarmEvent::Behaviour(e) => panic!("Peer1: Unexpected event: {:?}", e), @@ -138,10 +138,10 @@ fn ping_protocol() { loop { match swarm2.select_next_some().await { - SwarmEvent::Behaviour(RequestResponseEvent::Message { + SwarmEvent::Behaviour(Event::Message { peer, message: - RequestResponseMessage::Response { + Message::Response { request_id, response, }, @@ -171,14 +171,14 @@ fn emits_inbound_connection_closed_failure() { let ping = Ping("ping".to_string().into_bytes()); let protocols = iter::once((PingProtocol(), ProtocolSupport::Full)); - let cfg = RequestResponseConfig::default(); + let cfg = Config::default(); let (peer1_id, trans) = mk_transport(); - let ping_proto1 = RequestResponse::new(PingCodec(), protocols.clone(), cfg.clone()); + let ping_proto1 = Behaviour::new(PingCodec(), protocols.clone(), cfg.clone()); let mut swarm1 = Swarm::without_executor(trans, ping_proto1, peer1_id); let (peer2_id, trans) = mk_transport(); - let ping_proto2 = RequestResponse::new(PingCodec(), protocols, cfg); + let ping_proto2 = Behaviour::new(PingCodec(), protocols, cfg); let mut swarm2 = Swarm::without_executor(trans, ping_proto2, peer2_id); let addr = "/ip4/127.0.0.1/tcp/0".parse().unwrap(); @@ -195,9 +195,9 @@ fn emits_inbound_connection_closed_failure() { let _channel = loop { futures::select!( event = swarm1.select_next_some() => match event { - SwarmEvent::Behaviour(RequestResponseEvent::Message { + SwarmEvent::Behaviour(Event::Message { peer, - message: RequestResponseMessage::Request { request, channel, .. } + message: Message::Request { request, channel, .. } }) => { assert_eq!(&request, &ping); assert_eq!(&peer, &peer2_id); @@ -219,7 +219,7 @@ fn emits_inbound_connection_closed_failure() { loop { match swarm1.select_next_some().await { - SwarmEvent::Behaviour(RequestResponseEvent::InboundFailure { + SwarmEvent::Behaviour(Event::InboundFailure { error: InboundFailure::ConnectionClosed, .. }) => break, @@ -240,14 +240,14 @@ fn emits_inbound_connection_closed_if_channel_is_dropped() { let ping = Ping("ping".to_string().into_bytes()); let protocols = iter::once((PingProtocol(), ProtocolSupport::Full)); - let cfg = RequestResponseConfig::default(); + let cfg = Config::default(); let (peer1_id, trans) = mk_transport(); - let ping_proto1 = RequestResponse::new(PingCodec(), protocols.clone(), cfg.clone()); + let ping_proto1 = Behaviour::new(PingCodec(), protocols.clone(), cfg.clone()); let mut swarm1 = Swarm::without_executor(trans, ping_proto1, peer1_id); let (peer2_id, trans) = mk_transport(); - let ping_proto2 = RequestResponse::new(PingCodec(), protocols, cfg); + let ping_proto2 = Behaviour::new(PingCodec(), protocols, cfg); let mut swarm2 = Swarm::without_executor(trans, ping_proto2, peer2_id); let addr = "/ip4/127.0.0.1/tcp/0".parse().unwrap(); @@ -264,9 +264,9 @@ fn emits_inbound_connection_closed_if_channel_is_dropped() { let event = loop { futures::select!( event = swarm1.select_next_some() => { - if let SwarmEvent::Behaviour(RequestResponseEvent::Message { + if let SwarmEvent::Behaviour(Event::Message { peer, - message: RequestResponseMessage::Request { request, channel, .. } + message: Message::Request { request, channel, .. } }) = event { assert_eq!(&request, &ping); assert_eq!(&peer, &peer2_id); @@ -284,7 +284,7 @@ fn emits_inbound_connection_closed_if_channel_is_dropped() { }; let error = match event { - RequestResponseEvent::OutboundFailure { error, .. } => error, + Event::OutboundFailure { error, .. } => error, e => panic!("unexpected event from peer 2: {:?}", e), }; @@ -324,7 +324,7 @@ impl ProtocolName for PingProtocol { } #[async_trait] -impl RequestResponseCodec for PingCodec { +impl libp2p_request_response::Codec for PingCodec { type Protocol = PingProtocol; type Request = Ping; type Response = Pong;