Add libp2p-request-response protocol. (#1596)

* Add the libp2p-request-response protocol.

This crate provides a generic implementation for request/response
protocols, whereby each request is sent on a new substream.

* Fix OneShotHandler usage in floodsub.

* Custom ProtocolsHandler and multiple protocols.

  1. Implement a custom ProtocolsHandler instead of using
     the OneShotHandler for better control and error handling.
     In particular, all request/response sending/receiving is
     kept in the substreams upgrades and thus the background
     task of a connection.
  2. Support multiple protocols (usually protocol versions)
     with a single `RequestResponse` instance, with
     configurable inbound/outbound support.

* Small doc clarification.

* Remove unnecessary Sync bounds.

* Remove redundant Clone constraint.

* Update protocols/request-response/Cargo.toml

Co-authored-by: Toralf Wittner <tw@dtex.org>

* Update dev-dependencies.

* Update Cargo.tomls.

* Add changelog.

* Remove Sync bound from RequestResponseCodec::Protocol.

Apparently the compiler just needs some help with the scope
of borrows, which is unfortunate.

* Try async-trait.

* Allow checking whether a ResponseChannel is still open.

Also expand the commentary on `send_response` to indicate that
responses may be discard if they come in too late.

* Add `RequestResponse::is_pending`.

As an analogue of `ResponseChannel::is_open` for outbound requests.

* Revert now unnecessary changes to the OneShotHandler.

Since `libp2p-request-response` is no longer using it.

* Update CHANGELOG for libp2p-swarm.

Co-authored-by: Toralf Wittner <tw@dtex.org>
This commit is contained in:
Roman Borschel 2020-06-29 17:08:40 +02:00 committed by GitHub
parent 7270ed8721
commit eb8cb43508
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 1481 additions and 72 deletions

View File

@ -10,6 +10,7 @@
- [`libp2p-ping` CHANGELOG](protocols/ping/CHANGELOG.md) - [`libp2p-ping` CHANGELOG](protocols/ping/CHANGELOG.md)
- [`libp2p-plaintext` CHANGELOG](protocols/plaintext/CHANGELOG.md) - [`libp2p-plaintext` CHANGELOG](protocols/plaintext/CHANGELOG.md)
- [`libp2p-pnet` CHANGELOG](protocols/pnet/CHANGELOG.md) - [`libp2p-pnet` CHANGELOG](protocols/pnet/CHANGELOG.md)
- [`libp2p-request-response` CHANGELOG](protocols/request-response/CHANGELOG.md)
- [`libp2p-secio` CHANGELOG](protocols/secio/CHANGELOG.md) - [`libp2p-secio` CHANGELOG](protocols/secio/CHANGELOG.md)
- [`libp2p-swarm` CHANGELOG](swarm/CHANGELOG.md) - [`libp2p-swarm` CHANGELOG](swarm/CHANGELOG.md)
- [`libp2p-tcp` CHANGELOG](transports/tcp/CHANGELOG.md) - [`libp2p-tcp` CHANGELOG](transports/tcp/CHANGELOG.md)

View File

@ -23,6 +23,7 @@ default = [
"ping", "ping",
"plaintext", "plaintext",
"pnet", "pnet",
"request-response",
"secio", "secio",
"secp256k1", "secp256k1",
"tcp-async-std", "tcp-async-std",
@ -43,6 +44,7 @@ noise = ["libp2p-noise"]
ping = ["libp2p-ping"] ping = ["libp2p-ping"]
plaintext = ["libp2p-plaintext"] plaintext = ["libp2p-plaintext"]
pnet = ["libp2p-pnet"] pnet = ["libp2p-pnet"]
request-response = ["libp2p-request-response"]
secio = ["libp2p-secio"] secio = ["libp2p-secio"]
tcp-async-std = ["libp2p-tcp", "libp2p-tcp/async-std"] tcp-async-std = ["libp2p-tcp", "libp2p-tcp/async-std"]
tcp-tokio = ["libp2p-tcp", "libp2p-tcp/tokio"] tcp-tokio = ["libp2p-tcp", "libp2p-tcp/tokio"]
@ -67,6 +69,7 @@ libp2p-noise = { version = "0.19.1", path = "protocols/noise", optional = true }
libp2p-ping = { version = "0.19.3", path = "protocols/ping", optional = true } libp2p-ping = { version = "0.19.3", path = "protocols/ping", optional = true }
libp2p-plaintext = { version = "0.19.1", path = "protocols/plaintext", optional = true } libp2p-plaintext = { version = "0.19.1", path = "protocols/plaintext", optional = true }
libp2p-pnet = { version = "0.19.1", path = "protocols/pnet", optional = true } libp2p-pnet = { version = "0.19.1", path = "protocols/pnet", optional = true }
libp2p-request-response = { version = "0.1.0", path = "protocols/request-response", optional = true }
libp2p-secio = { version = "0.19.2", path = "protocols/secio", default-features = false, optional = true } libp2p-secio = { version = "0.19.2", path = "protocols/secio", default-features = false, optional = true }
libp2p-swarm = { version = "0.19.1", path = "swarm" } libp2p-swarm = { version = "0.19.1", path = "swarm" }
libp2p-uds = { version = "0.19.2", path = "transports/uds", optional = true } libp2p-uds = { version = "0.19.2", path = "transports/uds", optional = true }
@ -107,6 +110,7 @@ members = [
"protocols/noise", "protocols/noise",
"protocols/ping", "protocols/ping",
"protocols/plaintext", "protocols/plaintext",
"protocols/request-response",
"protocols/secio", "protocols/secio",
"swarm", "swarm",
"transports/dns", "transports/dns",

View File

@ -87,12 +87,12 @@ pub use self::{
/// ///
/// # Context /// # Context
/// ///
/// In situations where we provide a list of protocols that we support, the elements of that list are required to /// In situations where we provide a list of protocols that we support,
/// implement the [`ProtocolName`] trait. /// the elements of that list are required to implement the [`ProtocolName`] trait.
/// ///
/// Libp2p will call the [`ProtocolName::protocol_name`] trait method on each element of that list, and transmit the /// Libp2p will call [`ProtocolName::protocol_name`] on each element of that list, and transmit the
/// returned value on the network. If the remote accepts a given protocol, the element serves as the return value of /// returned value on the network. If the remote accepts a given protocol, the element
/// the function that performed the negotiation. /// serves as the return value of the function that performed the negotiation.
/// ///
/// # Example /// # Example
/// ///
@ -118,6 +118,9 @@ pub use self::{
/// ///
pub trait ProtocolName { pub trait ProtocolName {
/// The protocol name as bytes. Transmitted on the network. /// The protocol name as bytes. Transmitted on the network.
///
/// **Note:** Valid protocol names must start with `/` and
/// not exceed 140 bytes in length.
fn protocol_name(&self) -> &[u8]; fn protocol_name(&self) -> &[u8];
} }

View File

@ -289,7 +289,7 @@ impl NetworkBehaviour for Floodsub {
_connection: ConnectionId, _connection: ConnectionId,
event: InnerMessage, event: InnerMessage,
) { ) {
// We ignore successful sends event. // We ignore successful sends or timeouts.
let event = match event { let event = match event {
InnerMessage::Rx(event) => event, InnerMessage::Rx(event) => event,
InnerMessage::Sent => return, InnerMessage::Sent => return,

View File

@ -0,0 +1,4 @@
# 0.1.0
Initial release.

View File

@ -0,0 +1,25 @@
[package]
name = "libp2p-request-response"
edition = "2018"
description = "Generic Request/Response Protocols"
version = "0.1.0"
authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
keywords = ["peer-to-peer", "libp2p", "networking"]
categories = ["network-programming", "asynchronous"]
[dependencies]
async-trait = "0.1"
futures = "0.3.1"
libp2p-core = { version = "0.19.2", path = "../../core" }
libp2p-swarm = { version = "0.19.1", path = "../../swarm" }
smallvec = "1.4"
wasm-timer = "0.2"
[dev-dependencies]
async-std = "1.6.2"
libp2p-noise = { path = "../noise" }
libp2p-tcp = { path = "../../transports/tcp", features = ["async-std"] }
libp2p-yamux = { path = "../../muxers/yamux" }
rand = "0.7"

View File

@ -0,0 +1,66 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
pub use libp2p_core::ProtocolName;
use async_trait::async_trait;
use futures::prelude::*;
use std::io;
/// A `RequestResponseCodec` defines the request and response types
/// for a [`RequestResponse`](crate::RequestResponse) protocol or
/// protocol family and how they are encoded / decoded on an I/O stream.
#[async_trait]
pub trait RequestResponseCodec {
/// The type of protocol(s) or protocol versions being negotiated.
type Protocol: ProtocolName + Send + Clone;
/// The type of inbound and outbound requests.
type Request: Send;
/// The type of inbound and outbound responses.
type Response: Send;
/// Reads a request from the given I/O stream according to the
/// negotiated protocol.
async fn read_request<T>(&mut self, protocol: &Self::Protocol, io: &mut T)
-> io::Result<Self::Request>
where
T: AsyncRead + Unpin + Send;
/// Reads a response from the given I/O stream according to the
/// negotiated protocol.
async fn read_response<T>(&mut self, protocol: &Self::Protocol, io: &mut T)
-> io::Result<Self::Response>
where
T: AsyncRead + Unpin + Send;
/// Writes a request to the given I/O stream according to the
/// negotiated protocol.
async fn write_request<T>(&mut self, protocol: &Self::Protocol, io: &mut T, req: Self::Request)
-> io::Result<()>
where
T: AsyncWrite + Unpin + Send;
/// Writes a response to the given I/O stream according to the
/// negotiated protocol.
async fn write_response<T>(&mut self, protocol: &Self::Protocol, io: &mut T, res: Self::Response)
-> io::Result<()>
where
T: AsyncWrite + Unpin + Send;
}

View File

@ -0,0 +1,326 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
mod protocol;
use crate::{EMPTY_QUEUE_SHRINK_THRESHOLD, RequestId};
use crate::codec::RequestResponseCodec;
pub use protocol::{RequestProtocol, ResponseProtocol, ProtocolSupport};
use futures::{
channel::oneshot,
future::BoxFuture,
prelude::*,
stream::FuturesUnordered
};
use libp2p_core::{
upgrade::{UpgradeError, NegotiationError},
};
use libp2p_swarm::{
SubstreamProtocol,
protocols_handler::{
KeepAlive,
ProtocolsHandler,
ProtocolsHandlerEvent,
ProtocolsHandlerUpgrErr,
}
};
use smallvec::SmallVec;
use std::{
collections::VecDeque,
io,
time::Duration,
task::{Context, Poll}
};
use wasm_timer::Instant;
/// A connection handler of a `RequestResponse` protocol.
#[doc(hidden)]
pub struct RequestResponseHandler<TCodec>
where
TCodec: RequestResponseCodec,
{
/// The supported inbound protocols.
inbound_protocols: SmallVec<[TCodec::Protocol; 2]>,
/// The request/response message codec.
codec: TCodec,
/// The keep-alive timeout of idle connections. A connection is considered
/// idle if there are no outbound substreams.
keep_alive_timeout: Duration,
/// The timeout for inbound and outbound substreams (i.e. request
/// and response processing).
substream_timeout: Duration,
/// The current connection keep-alive.
keep_alive: KeepAlive,
/// A pending fatal error that results in the connection being closed.
pending_error: Option<ProtocolsHandlerUpgrErr<io::Error>>,
/// Queue of events to emit in `poll()`.
pending_events: VecDeque<RequestResponseHandlerEvent<TCodec>>,
/// Outbound upgrades waiting to be emitted as an `OutboundSubstreamRequest`.
outbound: VecDeque<RequestProtocol<TCodec>>,
/// Inbound upgrades waiting for the incoming request.
inbound: FuturesUnordered<BoxFuture<'static,
Result<
(TCodec::Request, oneshot::Sender<TCodec::Response>),
oneshot::Canceled
>>>,
}
impl<TCodec> RequestResponseHandler<TCodec>
where
TCodec: RequestResponseCodec,
{
pub(super) fn new(
inbound_protocols: SmallVec<[TCodec::Protocol; 2]>,
codec: TCodec,
keep_alive_timeout: Duration,
substream_timeout: Duration,
) -> Self {
Self {
inbound_protocols,
codec,
keep_alive: KeepAlive::Yes,
keep_alive_timeout,
substream_timeout,
outbound: VecDeque::new(),
inbound: FuturesUnordered::new(),
pending_events: VecDeque::new(),
pending_error: None,
}
}
}
/// The events emitted by the [`RequestResponseHandler`].
#[doc(hidden)]
pub enum RequestResponseHandlerEvent<TCodec>
where
TCodec: RequestResponseCodec
{
/// An inbound request.
Request {
request: TCodec::Request,
sender: oneshot::Sender<TCodec::Response>
},
/// An inbound response.
Response {
request_id: RequestId,
response: TCodec::Response
},
/// An outbound upgrade (i.e. request) timed out.
OutboundTimeout(RequestId),
/// An outbound request failed to negotiate a mutually supported protocol.
OutboundUnsupportedProtocols(RequestId),
/// An inbound request timed out.
InboundTimeout,
/// An inbound request failed to negotiate a mutually supported protocol.
InboundUnsupportedProtocols,
}
impl<TCodec> ProtocolsHandler for RequestResponseHandler<TCodec>
where
TCodec: RequestResponseCodec + Send + Clone + 'static,
{
type InEvent = RequestProtocol<TCodec>;
type OutEvent = RequestResponseHandlerEvent<TCodec>;
type Error = ProtocolsHandlerUpgrErr<io::Error>;
type InboundProtocol = ResponseProtocol<TCodec>;
type OutboundProtocol = RequestProtocol<TCodec>;
type OutboundOpenInfo = RequestId;
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
// A channel for notifying the handler when the inbound
// upgrade received the request.
let (rq_send, rq_recv) = oneshot::channel();
// A channel for notifying the inbound upgrade when the
// response is sent.
let (rs_send, rs_recv) = oneshot::channel();
// By keeping all I/O inside the `ResponseProtocol` and thus the
// inbound substream upgrade via above channels, we ensure that it
// is all subject to the configured timeout without extra bookkeeping
// for inbound substreams as well as their timeouts and also make the
// implementation of inbound and outbound upgrades symmetric in
// this sense.
let proto = ResponseProtocol {
protocols: self.inbound_protocols.clone(),
codec: self.codec.clone(),
request_sender: rq_send,
response_receiver: rs_recv,
};
// The handler waits for the request to come in. It then emits
// `RequestResponseHandlerEvent::Request` together with a
// `ResponseChannel`.
self.inbound.push(rq_recv.map_ok(move |rq| (rq, rs_send)).boxed());
SubstreamProtocol::new(proto).with_timeout(self.substream_timeout)
}
fn inject_fully_negotiated_inbound(
&mut self,
(): (),
) {
// Nothing to do, as the response has already been sent
// as part of the upgrade.
}
fn inject_fully_negotiated_outbound(
&mut self,
response: TCodec::Response,
request_id: RequestId,
) {
self.pending_events.push_back(
RequestResponseHandlerEvent::Response {
request_id, response
});
}
fn inject_event(&mut self, request: Self::InEvent) {
self.keep_alive = KeepAlive::Yes;
self.outbound.push_back(request);
}
fn inject_dial_upgrade_error(
&mut self,
info: RequestId,
error: ProtocolsHandlerUpgrErr<io::Error>,
) {
match error {
ProtocolsHandlerUpgrErr::Timeout => {
self.pending_events.push_back(
RequestResponseHandlerEvent::OutboundTimeout(info));
}
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => {
// The remote merely doesn't support the protocol(s) we requested.
// This is no reason to close the connection, which may
// successfully communicate with other protocols already.
// An event is reported to permit user code to react to the fact that
// the remote peer does not support the requested protocol(s).
self.pending_events.push_back(
RequestResponseHandlerEvent::OutboundUnsupportedProtocols(info));
}
_ => {
// Anything else is considered a fatal error or misbehaviour of
// the remote peer and results in closing the connection.
self.pending_error = Some(error);
}
}
}
fn inject_listen_upgrade_error(
&mut self,
error: ProtocolsHandlerUpgrErr<io::Error>
) {
match error {
ProtocolsHandlerUpgrErr::Timeout => {
self.pending_events.push_back(
RequestResponseHandlerEvent::InboundTimeout);
}
ProtocolsHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => {
// The local peer merely doesn't support the protocol(s) requested.
// This is no reason to close the connection, which may
// successfully communicate with other protocols already.
// An event is reported to permit user code to react to the fact that
// the local peer does not support the requested protocol(s).
self.pending_events.push_back(
RequestResponseHandlerEvent::InboundUnsupportedProtocols);
}
_ => {
// Anything else is considered a fatal error or misbehaviour of
// the remote peer and results in closing the connection.
self.pending_error = Some(error);
}
}
}
fn connection_keep_alive(&self) -> KeepAlive {
self.keep_alive
}
fn poll(
&mut self,
cx: &mut Context,
) -> Poll<
ProtocolsHandlerEvent<RequestProtocol<TCodec>, RequestId, Self::OutEvent, Self::Error>,
> {
// Check for a pending (fatal) error.
if let Some(err) = self.pending_error.take() {
// The handler will not be polled again by the `Swarm`.
return Poll::Ready(ProtocolsHandlerEvent::Close(err))
}
// Drain pending events.
if let Some(event) = self.pending_events.pop_front() {
return Poll::Ready(ProtocolsHandlerEvent::Custom(event))
} else if self.pending_events.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD {
self.pending_events.shrink_to_fit();
}
// Check for inbound requests.
while let Poll::Ready(Some(result)) = self.inbound.poll_next_unpin(cx) {
match result {
Ok((rq, rs_sender)) => {
// We received an inbound request.
self.keep_alive = KeepAlive::Yes;
return Poll::Ready(ProtocolsHandlerEvent::Custom(
RequestResponseHandlerEvent::Request {
request: rq, sender: rs_sender
}))
}
Err(oneshot::Canceled) => {
// The inbound upgrade has errored or timed out reading
// or waiting for the request. The handler is informed
// via `inject_listen_upgrade_error`.
}
}
}
// Emit outbound requests.
if let Some(request) = self.outbound.pop_front() {
let info = request.request_id;
return Poll::Ready(
ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(request)
.with_timeout(self.substream_timeout),
info,
},
)
}
debug_assert!(self.outbound.is_empty());
if self.outbound.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD {
self.outbound.shrink_to_fit();
}
if self.inbound.is_empty() {
// No new inbound or outbound requests. However, we may just have
// started the latest inbound or outbound upgrade(s), so make sure
// the keep-alive timeout is preceded by the substream timeout.
let until = Instant::now() + self.substream_timeout + self.keep_alive_timeout;
self.keep_alive = KeepAlive::Until(until);
}
Poll::Pending
}
}

View File

@ -0,0 +1,165 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! The definition of a request/response protocol via inbound
//! and outbound substream upgrades. The inbound upgrade
//! receives a request and sends a response, whereas the
//! outbound upgrade send a request and receives a response.
use crate::RequestId;
use crate::codec::RequestResponseCodec;
use futures::{
channel::oneshot,
future::BoxFuture,
prelude::*,
};
use libp2p_core::{
upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo},
};
use libp2p_swarm::{
NegotiatedSubstream,
};
use smallvec::SmallVec;
use std::io;
/// The level of support for a particular protocol.
#[derive(Debug, Clone)]
pub enum ProtocolSupport {
/// The protocol is only supported for inbound requests.
Inbound,
/// The protocol is only supported for outbound requests.
Outbound,
/// The protocol is supported for inbound and outbound requests.
Full
}
impl ProtocolSupport {
/// Whether inbound requests are supported.
pub fn inbound(&self) -> bool {
match self {
ProtocolSupport::Inbound | ProtocolSupport::Full => true,
ProtocolSupport::Outbound => false,
}
}
/// Whether outbound requests are supported.
pub fn outbound(&self) -> bool {
match self {
ProtocolSupport::Outbound | ProtocolSupport::Full => true,
ProtocolSupport::Inbound => false,
}
}
}
/// Response substream upgrade protocol.
///
/// Receives a request and sends a response.
#[derive(Debug)]
pub struct ResponseProtocol<TCodec>
where
TCodec: RequestResponseCodec
{
pub(crate) codec: TCodec,
pub(crate) protocols: SmallVec<[TCodec::Protocol; 2]>,
pub(crate) request_sender: oneshot::Sender<TCodec::Request>,
pub(crate) response_receiver: oneshot::Receiver<TCodec::Response>
}
impl<TCodec> UpgradeInfo for ResponseProtocol<TCodec>
where
TCodec: RequestResponseCodec
{
type Info = TCodec::Protocol;
type InfoIter = smallvec::IntoIter<[Self::Info; 2]>;
fn protocol_info(&self) -> Self::InfoIter {
self.protocols.clone().into_iter()
}
}
impl<TCodec> InboundUpgrade<NegotiatedSubstream> for ResponseProtocol<TCodec>
where
TCodec: RequestResponseCodec + Send + 'static,
{
type Output = ();
type Error = io::Error;
type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;
fn upgrade_inbound(mut self, mut io: NegotiatedSubstream, protocol: Self::Info) -> Self::Future {
async move {
let read = self.codec.read_request(&protocol, &mut io);
let request = read.await?;
if let Ok(()) = self.request_sender.send(request) {
if let Ok(response) = self.response_receiver.await {
let write = self.codec.write_response(&protocol, &mut io, response);
write.await?;
}
}
Ok(())
}.boxed()
}
}
/// Request substream upgrade protocol.
///
/// Sends a request and receives a response.
#[derive(Debug, Clone)]
pub struct RequestProtocol<TCodec>
where
TCodec: RequestResponseCodec
{
pub(crate) codec: TCodec,
pub(crate) protocols: SmallVec<[TCodec::Protocol; 2]>,
pub(crate) request_id: RequestId,
pub(crate) request: TCodec::Request,
}
impl<TCodec> UpgradeInfo for RequestProtocol<TCodec>
where
TCodec: RequestResponseCodec
{
type Info = TCodec::Protocol;
type InfoIter = smallvec::IntoIter<[Self::Info; 2]>;
fn protocol_info(&self) -> Self::InfoIter {
self.protocols.clone().into_iter()
}
}
impl<TCodec> OutboundUpgrade<NegotiatedSubstream> for RequestProtocol<TCodec>
where
TCodec: RequestResponseCodec + Send + 'static,
{
type Output = TCodec::Response;
type Error = io::Error;
type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;
fn upgrade_outbound(mut self, mut io: NegotiatedSubstream, protocol: Self::Info) -> Self::Future {
async move {
let write = self.codec.write_request(&protocol, &mut io, self.request);
write.await?;
let read = self.codec.read_response(&protocol, &mut io);
let response = read.await?;
Ok(response)
}.boxed()
}
}

View File

@ -0,0 +1,607 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! Generic request/response protocols.
//!
//! ## General Usage
//!
//! [`RequestResponse`] is a `NetworkBehaviour` that implements a generic
//! request/response protocol or protocol family, whereby each request is
//! sent over a new substream on a connection. `RequestResponse` is generic
//! over the actual messages being sent, which are defined in terms of a
//! [`RequestResponseCodec`]. Creating a request/response protocol thus amounts
//! to providing an implementation of this trait which can then be
//! given to [`RequestResponse::new`]. Further configuration options are
//! available via the [`RequestResponseConfig`].
//!
//! Requests are sent using [`RequestResponse::send_request`] and the
//! responses received as [`RequestResponseMessage::Response`] via
//! [`RequestResponseEvent::Message`].
//!
//! Responses are sent using [`RequestResponse::send_response`] upon
//! receiving a [`RequestResponseMessage::Request`] via
//! [`RequestResponseEvent::Message`].
//!
//! ## Protocol Families
//!
//! A single [`RequestResponse`] instance can be used with an entire
//! protocol family that share the same request and response types.
//! For that purpose, [`RequestResponseCodec::Protocol`] is typically
//! instantiated with a sum type.
//!
//! ## One-Way Protocols
//!
//! The implementation supports one-way protocols that do not
//! have responses. In these cases the [`RequestResponseCodec::Response`] can
//! be defined as `()` and [`RequestResponseCodec::read_response`] as well as
//! [`RequestResponseCodec::write_response`] given the obvious implementations.
//! Note that `RequestResponseMessage::Response` will still be emitted,
//! immediately after the request has been sent, since `RequestResponseCodec::read_response`
//! will not actually read anything from the given I/O stream.
//! [`RequestResponse::send_response`] need not be called for one-way protocols,
//! i.e. the [`ResponseChannel`] may just be dropped.
//!
//! ## Limited Protocol Support
//!
//! It is possible to only support inbound or outbound requests for
//! a particular protocol. This is achieved by instantiating `RequestResponse`
//! with protocols using [`ProtocolSupport::Inbound`] or
//! [`ProtocolSupport::Outbound`]. Any subset of protocols of a protocol
//! family can be configured in this way. Such protocols will not be
//! advertised during inbound respectively outbound protocol negotiation
//! on the substreams.
pub mod codec;
pub mod handler;
pub use codec::{RequestResponseCodec, ProtocolName};
pub use handler::ProtocolSupport;
use futures::{
channel::oneshot,
};
use handler::{
RequestProtocol,
RequestResponseHandler,
RequestResponseHandlerEvent,
};
use libp2p_core::{
ConnectedPoint,
Multiaddr,
PeerId,
connection::ConnectionId,
};
use libp2p_swarm::{
DialPeerCondition,
NetworkBehaviour,
NetworkBehaviourAction,
NotifyHandler,
PollParameters,
};
use smallvec::SmallVec;
use std::{
collections::{VecDeque, HashMap},
time::Duration,
task::{Context, Poll}
};
/// An inbound request or response.
#[derive(Debug)]
pub enum RequestResponseMessage<TRequest, TResponse> {
/// A request message.
Request {
/// The request message.
request: TRequest,
/// The sender of the request who is awaiting a response.
///
/// See [`RequestResponse::send_response`].
channel: ResponseChannel<TResponse>,
},
/// A response message.
Response {
/// The ID of the request that produced this response.
///
/// See [`RequestResponse::send_request`].
request_id: RequestId,
/// The response message.
response: TResponse
},
}
/// The events emitted by a [`RequestResponse`] protocol.
#[derive(Debug)]
pub enum RequestResponseEvent<TRequest, TResponse> {
/// An incoming message (request or response).
Message {
/// The peer who sent the message.
peer: PeerId,
/// The incoming message.
message: RequestResponseMessage<TRequest, TResponse>
},
/// An outbound request failed.
OutboundFailure {
/// The peer to whom the request was sent.
peer: PeerId,
/// The (local) ID of the failed request.
request_id: RequestId,
/// The error that occurred.
error: OutboundFailure,
},
/// An inbound request failed.
InboundFailure {
/// The peer from whom the request was received.
peer: PeerId,
/// The error that occurred.
error: InboundFailure,
},
}
/// Possible failures occurring in the context of sending
/// an outbound request and receiving the response.
#[derive(Debug)]
pub enum OutboundFailure {
/// The request could not be sent because a dialing attempt failed.
DialFailure,
/// The request timed out before a response was received.
///
/// It is not known whether the request may have been
/// received (and processed) by the remote peer.
Timeout,
/// The connection closed before a response was received.
///
/// It is not known whether the request may have been
/// received (and processed) by the remote peer.
ConnectionClosed,
/// The remote supports none of the requested protocols.
UnsupportedProtocols,
}
/// Possible failures occurring in the context of receiving an
/// inbound request and sending a response.
#[derive(Debug)]
pub enum InboundFailure {
/// The inbound request timed out, either while reading the
/// incoming request or before a response is sent, i.e. if
/// [`RequestResponse::send_response`] is not called in a
/// timely manner.
Timeout,
/// The local peer supports none of the requested protocols.
UnsupportedProtocols,
}
/// A channel for sending a response to an inbound request.
///
/// See [`RequestResponse::send_response`].
#[derive(Debug)]
pub struct ResponseChannel<TResponse> {
peer: PeerId,
sender: oneshot::Sender<TResponse>,
}
impl<TResponse> ResponseChannel<TResponse> {
/// Checks whether the response channel is still open, i.e.
/// the `RequestResponse` behaviour is still waiting for a
/// a response to be sent via [`RequestResponse::send_response`]
/// and this response channel.
///
/// If the response channel is no longer open then the inbound
/// request timed out waiting for the response.
pub fn is_open(&self) -> bool {
!self.sender.is_canceled()
}
}
/// The (local) ID of an outgoing request.
///
/// See [`RequestResponse::send_request`].
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub struct RequestId(u64);
/// The configuration for a `RequestResponse` protocol.
#[derive(Debug, Clone)]
pub struct RequestResponseConfig {
request_timeout: Duration,
connection_keep_alive: Duration,
}
impl Default for RequestResponseConfig {
fn default() -> Self {
Self {
connection_keep_alive: Duration::from_secs(10),
request_timeout: Duration::from_secs(10),
}
}
}
impl RequestResponseConfig {
/// Sets the keep-alive timeout of idle connections.
pub fn set_connection_keep_alive(&mut self, v: Duration) -> &mut Self {
self.connection_keep_alive = v;
self
}
/// Sets the timeout for inbound and outbound requests.
pub fn set_request_timeout(&mut self, v: Duration) -> &mut Self {
self.request_timeout = v;
self
}
}
/// A request/response protocol for some message codec.
pub struct RequestResponse<TCodec>
where
TCodec: RequestResponseCodec,
{
/// The supported inbound protocols.
inbound_protocols: SmallVec<[TCodec::Protocol; 2]>,
/// The supported outbound protocols.
outbound_protocols: SmallVec<[TCodec::Protocol; 2]>,
/// The next (local) request ID.
next_request_id: RequestId,
/// The protocol configuration.
config: RequestResponseConfig,
/// The protocol codec for reading and writing requests and responses.
codec: TCodec,
/// Pending events to return from `poll`.
pending_events: VecDeque<
NetworkBehaviourAction<
RequestProtocol<TCodec>,
RequestResponseEvent<TCodec::Request, TCodec::Response>>>,
/// The currently connected peers and their known, reachable addresses, if any.
connected: HashMap<PeerId, SmallVec<[Connection; 2]>>,
/// Externally managed addresses via `add_address` and `remove_address`.
addresses: HashMap<PeerId, SmallVec<[Multiaddr; 6]>>,
/// Requests that have not yet been sent and are waiting for a connection
/// to be established.
pending_requests: HashMap<PeerId, SmallVec<[RequestProtocol<TCodec>; 10]>>,
/// Responses that have not yet been received.
pending_responses: HashMap<RequestId, (PeerId, ConnectionId)>,
}
impl<TCodec> RequestResponse<TCodec>
where
TCodec: RequestResponseCodec + Clone,
{
/// Creates a new `RequestResponse` behaviour for the given
/// protocols, codec and configuration.
pub fn new<I>(codec: TCodec, protocols: I, cfg: RequestResponseConfig) -> Self
where
I: IntoIterator<Item = (TCodec::Protocol, ProtocolSupport)>
{
let mut inbound_protocols = SmallVec::new();
let mut outbound_protocols = SmallVec::new();
for (p, s) in protocols {
if s.inbound() {
inbound_protocols.push(p.clone());
}
if s.outbound() {
outbound_protocols.push(p.clone());
}
}
RequestResponse {
inbound_protocols,
outbound_protocols,
next_request_id: RequestId(1),
config: cfg,
codec,
pending_events: VecDeque::new(),
connected: HashMap::new(),
pending_requests: HashMap::new(),
pending_responses: HashMap::new(),
addresses: HashMap::new(),
}
}
/// Initiates sending a request.
///
/// If the targeted peer is currently not connected, a dialing
/// attempt is initiated and the request is sent as soon as a
/// connection is established.
///
/// > **Note**: In order for such a dialing attempt to succeed,
/// > the `RequestResonse` protocol must either be embedded
/// > in another `NetworkBehaviour` that provides peer and
/// > address discovery, or known addresses of peers must be
/// > managed via [`RequestResponse::add_address`] and
/// > [`RequestResponse::remove_address`].
pub fn send_request(&mut self, peer: &PeerId, request: TCodec::Request) -> RequestId {
let request_id = self.next_request_id();
let request = RequestProtocol {
request_id,
codec: self.codec.clone(),
protocols: self.outbound_protocols.clone(),
request,
};
if let Some(request) = self.try_send_request(peer, request) {
self.pending_events.push_back(NetworkBehaviourAction::DialPeer {
peer_id: peer.clone(),
condition: DialPeerCondition::Disconnected,
});
self.pending_requests.entry(peer.clone()).or_default().push(request);
}
request_id
}
/// Initiates sending a response to an inbound request.
///
/// If the `ResponseChannel` is already closed due to a timeout,
/// the response is discarded and eventually [`RequestResponseEvent::InboundFailure`]
/// is emitted by `RequestResponse::poll`.
///
/// The provided `ResponseChannel` is obtained from a
/// [`RequestResponseMessage::Request`].
pub fn send_response(&mut self, ch: ResponseChannel<TCodec::Response>, rs: TCodec::Response) {
// Fails only if the inbound upgrade timed out waiting for the response,
// in which case the handler emits `RequestResponseHandlerEvent::InboundTimeout`
// which in turn results in `RequestResponseEvent::InboundFailure`.
let _ = ch.sender.send(rs);
}
/// Adds a known address for a peer that can be used for
/// dialing attempts by the `Swarm`, i.e. is returned
/// by [`NetworkBehaviour::addresses_of_peer`].
///
/// Addresses added in this way are only removed by `remove_address`.
pub fn add_address(&mut self, peer: &PeerId, address: Multiaddr) {
self.addresses.entry(peer.clone()).or_default().push(address);
}
/// Removes an address of a peer previously added via `add_address`.
pub fn remove_address(&mut self, peer: &PeerId, address: &Multiaddr) {
let mut last = false;
if let Some(addresses) = self.addresses.get_mut(peer) {
addresses.retain(|a| a != address);
last = addresses.is_empty();
}
if last {
self.addresses.remove(peer);
}
}
/// Checks whether a peer is currently connected.
pub fn is_connected(&self, peer: &PeerId) -> bool {
self.connected.contains_key(peer)
}
/// Checks whether an outbound request initiated by
/// [`RequestResponse::send_request`] is still pending, i.e. waiting
/// for a response.
pub fn is_pending(&self, req_id: &RequestId) -> bool {
self.pending_responses.contains_key(req_id)
}
/// Returns the next request ID.
fn next_request_id(&mut self) -> RequestId {
let request_id = self.next_request_id;
self.next_request_id.0 += 1;
request_id
}
/// Tries to send a request by queueing an appropriate event to be
/// emitted to the `Swarm`. If the peer is not currently connected,
/// the given request is return unchanged.
fn try_send_request(&mut self, peer: &PeerId, request: RequestProtocol<TCodec>)
-> Option<RequestProtocol<TCodec>>
{
if let Some(connections) = self.connected.get(peer) {
let ix = (request.request_id.0 as usize) % connections.len();
let conn = connections[ix].id;
self.pending_responses.insert(request.request_id, (peer.clone(), conn));
self.pending_events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: peer.clone(),
handler: NotifyHandler::One(conn),
event: request
});
None
} else {
Some(request)
}
}
}
impl<TCodec> NetworkBehaviour for RequestResponse<TCodec>
where
TCodec: RequestResponseCodec + Send + Clone + 'static,
{
type ProtocolsHandler = RequestResponseHandler<TCodec>;
type OutEvent = RequestResponseEvent<TCodec::Request, TCodec::Response>;
fn new_handler(&mut self) -> Self::ProtocolsHandler {
RequestResponseHandler::new(
self.inbound_protocols.clone(),
self.codec.clone(),
self.config.connection_keep_alive,
self.config.request_timeout,
)
}
fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec<Multiaddr> {
let mut addresses = Vec::new();
if let Some(connections) = self.connected.get(peer) {
addresses.extend(connections.iter().filter_map(|c| c.address.clone()))
}
if let Some(more) = self.addresses.get(peer) {
addresses.extend(more.into_iter().cloned());
}
addresses
}
fn inject_connected(&mut self, peer: &PeerId) {
if let Some(pending) = self.pending_requests.remove(peer) {
for request in pending {
let request = self.try_send_request(peer, request);
assert!(request.is_none());
}
}
}
fn inject_connection_established(&mut self, peer: &PeerId, conn: &ConnectionId, endpoint: &ConnectedPoint) {
let address = match endpoint {
ConnectedPoint::Dialer { address } => Some(address.clone()),
ConnectedPoint::Listener { .. } => None
};
let connections = self.connected.entry(peer.clone()).or_default();
connections.push(Connection { id: *conn, address })
}
fn inject_connection_closed(&mut self, peer: &PeerId, conn: &ConnectionId, _: &ConnectedPoint) {
if let Some(connections) = self.connected.get_mut(peer) {
if let Some(pos) = connections.iter().position(|c| &c.id == conn) {
connections.remove(pos);
}
}
// Any pending responses of requests sent over this connection
// must be considered failed.
let failed = self.pending_responses.iter()
.filter_map(|(r, (p, c))|
if conn == c {
Some((p.clone(), *r))
} else {
None
})
.collect::<Vec<_>>();
for (peer, request_id) in failed {
self.pending_responses.remove(&request_id);
self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(
RequestResponseEvent::OutboundFailure {
peer,
request_id,
error: OutboundFailure::ConnectionClosed
}
));
}
}
fn inject_disconnected(&mut self, peer: &PeerId) {
self.connected.remove(peer);
}
fn inject_dial_failure(&mut self, peer: &PeerId) {
// If there are pending outgoing requests when a dial failure occurs,
// it is implied that we are not connected to the peer, since pending
// outgoing requests are drained when a connection is established and
// only created when a peer is not connected when a request is made.
// Thus these requests must be considered failed, even if there is
// another, concurrent dialing attempt ongoing.
if let Some(pending) = self.pending_requests.remove(peer) {
for request in pending {
self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(
RequestResponseEvent::OutboundFailure {
peer: peer.clone(),
request_id: request.request_id,
error: OutboundFailure::DialFailure
}
));
}
}
}
fn inject_event(
&mut self,
peer: PeerId,
_: ConnectionId,
event: RequestResponseHandlerEvent<TCodec>,
) {
match event {
RequestResponseHandlerEvent::Response { request_id, response } => {
self.pending_responses.remove(&request_id);
let message = RequestResponseMessage::Response { request_id, response };
self.pending_events.push_back(
NetworkBehaviourAction::GenerateEvent(
RequestResponseEvent::Message { peer, message }));
}
RequestResponseHandlerEvent::Request { request, sender } => {
let channel = ResponseChannel { peer: peer.clone(), sender };
let message = RequestResponseMessage::Request { request, channel };
self.pending_events.push_back(
NetworkBehaviourAction::GenerateEvent(
RequestResponseEvent::Message { peer, message }));
}
RequestResponseHandlerEvent::OutboundTimeout(request_id) => {
if let Some((peer, _conn)) = self.pending_responses.remove(&request_id) {
self.pending_events.push_back(
NetworkBehaviourAction::GenerateEvent(
RequestResponseEvent::OutboundFailure {
peer,
request_id,
error: OutboundFailure::Timeout,
}));
}
}
RequestResponseHandlerEvent::InboundTimeout => {
self.pending_events.push_back(
NetworkBehaviourAction::GenerateEvent(
RequestResponseEvent::InboundFailure {
peer,
error: InboundFailure::Timeout,
}));
}
RequestResponseHandlerEvent::OutboundUnsupportedProtocols(request_id) => {
self.pending_events.push_back(
NetworkBehaviourAction::GenerateEvent(
RequestResponseEvent::OutboundFailure {
peer,
request_id,
error: OutboundFailure::UnsupportedProtocols,
}));
}
RequestResponseHandlerEvent::InboundUnsupportedProtocols => {
self.pending_events.push_back(
NetworkBehaviourAction::GenerateEvent(
RequestResponseEvent::InboundFailure {
peer,
error: InboundFailure::UnsupportedProtocols,
}));
}
}
}
fn poll(&mut self, _: &mut Context, _: &mut impl PollParameters)
-> Poll<NetworkBehaviourAction<
RequestProtocol<TCodec>,
RequestResponseEvent<TCodec::Request, TCodec::Response>
>>
{
if let Some(ev) = self.pending_events.pop_front() {
return Poll::Ready(ev);
} else if self.pending_events.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD {
self.pending_events.shrink_to_fit();
}
Poll::Pending
}
}
/// Internal threshold for when to shrink the capacity
/// of empty queues. If the capacity of an empty queue
/// exceeds this threshold, the associated memory is
/// released.
const EMPTY_QUEUE_SHRINK_THRESHOLD: usize = 100;
/// Internal information tracked for an established connection.
struct Connection {
id: ConnectionId,
address: Option<Multiaddr>,
}

View File

@ -0,0 +1,195 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! Integration tests for the `RequestResponse` network behaviour.
use async_trait::async_trait;
use libp2p_core::{
Multiaddr,
PeerId,
identity,
muxing::StreamMuxerBox,
transport::{Transport, boxed::Boxed},
upgrade::{self, read_one, write_one}
};
use libp2p_noise::{NoiseConfig, X25519Spec, Keypair};
use libp2p_request_response::*;
use libp2p_swarm::Swarm;
use libp2p_tcp::TcpConfig;
use futures::{prelude::*, channel::mpsc};
use rand::{self, Rng};
use std::{io, iter};
/// Exercises a simple ping protocol.
#[test]
fn ping_protocol() {
let num_pings: u8 = rand::thread_rng().gen_range(1, 100);
let ping = Ping("ping".to_string().into_bytes());
let pong = Pong("pong".to_string().into_bytes());
let protocols = iter::once((PingProtocol(), ProtocolSupport::Full));
let cfg = RequestResponseConfig::default();
let (peer1_id, trans) = mk_transport();
let ping_proto1 = RequestResponse::new(PingCodec(), protocols.clone(), cfg.clone());
let mut swarm1 = Swarm::new(trans, ping_proto1, peer1_id.clone());
let (peer2_id, trans) = mk_transport();
let ping_proto2 = RequestResponse::new(PingCodec(), protocols, cfg);
let mut swarm2 = Swarm::new(trans, ping_proto2, peer2_id.clone());
let (mut tx, mut rx) = mpsc::channel::<Multiaddr>(1);
let addr = "/ip4/127.0.0.1/tcp/0".parse().unwrap();
Swarm::listen_on(&mut swarm1, addr).unwrap();
let expected_ping = ping.clone();
let expected_pong = pong.clone();
let peer1 = async move {
while let Some(_) = swarm1.next().now_or_never() {}
let l = Swarm::listeners(&swarm1).next().unwrap();
tx.send(l.clone()).await.unwrap();
loop {
match swarm1.next().await {
RequestResponseEvent::Message {
peer,
message: RequestResponseMessage::Request { request, channel }
} => {
assert_eq!(&request, &expected_ping);
assert_eq!(&peer, &peer2_id);
swarm1.send_response(channel, pong.clone());
},
e => panic!("Peer1: Unexpected event: {:?}", e)
}
}
};
let peer2 = async move {
let mut count = 0;
let addr = rx.next().await.unwrap();
swarm2.add_address(&peer1_id, addr.clone());
let mut req_id = swarm2.send_request(&peer1_id, ping.clone());
loop {
match swarm2.next().await {
RequestResponseEvent::Message {
peer,
message: RequestResponseMessage::Response { request_id, response }
} => {
count += 1;
assert_eq!(&response, &expected_pong);
assert_eq!(&peer, &peer1_id);
assert_eq!(req_id, request_id);
if count >= num_pings {
return
} else {
req_id = swarm2.send_request(&peer1_id, ping.clone());
}
},
e => panic!("Peer2: Unexpected event: {:?}", e)
}
}
};
async_std::task::spawn(Box::pin(peer1));
let () = async_std::task::block_on(peer2);
}
fn mk_transport() -> (PeerId, Boxed<(PeerId, StreamMuxerBox), io::Error>) {
let id_keys = identity::Keypair::generate_ed25519();
let peer_id = id_keys.public().into_peer_id();
let noise_keys = Keypair::<X25519Spec>::new().into_authentic(&id_keys).unwrap();
let transport = TcpConfig::new()
.nodelay(true)
.upgrade(upgrade::Version::V1)
.authenticate(NoiseConfig::xx(noise_keys).into_authenticated())
.multiplex(libp2p_yamux::Config::default())
.map(|(peer, muxer), _| (peer, StreamMuxerBox::new(muxer)))
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
.boxed();
(peer_id, transport)
}
// Simple Ping-Pong Protocol
#[derive(Debug, Clone)]
struct PingProtocol();
#[derive(Clone)]
struct PingCodec();
#[derive(Debug, Clone, PartialEq, Eq)]
struct Ping(Vec<u8>);
#[derive(Debug, Clone, PartialEq, Eq)]
struct Pong(Vec<u8>);
impl ProtocolName for PingProtocol {
fn protocol_name(&self) -> &[u8] {
"/ping/1".as_bytes()
}
}
#[async_trait]
impl RequestResponseCodec for PingCodec {
type Protocol = PingProtocol;
type Request = Ping;
type Response = Pong;
async fn read_request<T>(&mut self, _: &PingProtocol, io: &mut T)
-> io::Result<Self::Request>
where
T: AsyncRead + Unpin + Send
{
read_one(io, 1024)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
.map_ok(Ping)
.await
}
async fn read_response<T>(&mut self, _: &PingProtocol, io: &mut T)
-> io::Result<Self::Response>
where
T: AsyncRead + Unpin + Send
{
read_one(io, 1024)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
.map_ok(Pong)
.await
}
async fn write_request<T>(&mut self, _: &PingProtocol, io: &mut T, Ping(data): Ping)
-> io::Result<()>
where
T: AsyncWrite + Unpin + Send
{
write_one(io, data).await
}
async fn write_response<T>(&mut self, _: &PingProtocol, io: &mut T, Pong(data): Pong)
-> io::Result<()>
where
T: AsyncWrite + Unpin + Send
{
write_one(io, data).await
}
}

View File

@ -1,3 +1,10 @@
# 0.20.0 [????-??-??]
- Add `ProtocolsHandler::inject_listen_upgrade_error`, the inbound
analogue of `ProtocolsHandler::inject_dial_upgrade_error`, with an
empty default implementation. No implementation is required to
retain existing behaviour.
# 0.19.1 [2020-06-18] # 0.19.1 [2020-06-18]
- Bugfix: Fix MultiHandler panicking when empty - Bugfix: Fix MultiHandler panicking when empty

View File

@ -140,7 +140,7 @@ pub trait ProtocolsHandler: Send + 'static {
/// Injects an event coming from the outside in the handler. /// Injects an event coming from the outside in the handler.
fn inject_event(&mut self, event: Self::InEvent); fn inject_event(&mut self, event: Self::InEvent);
/// Indicates to the handler that upgrading a substream to the given protocol has failed. /// Indicates to the handler that upgrading an outbound substream to the given protocol has failed.
fn inject_dial_upgrade_error( fn inject_dial_upgrade_error(
&mut self, &mut self,
info: Self::OutboundOpenInfo, info: Self::OutboundOpenInfo,
@ -149,6 +149,14 @@ pub trait ProtocolsHandler: Send + 'static {
> >
); );
/// Indicates to the handler that upgrading an inbound substream to the given protocol has failed.
fn inject_listen_upgrade_error(
&mut self,
_: ProtocolsHandlerUpgrErr<
<Self::InboundProtocol as InboundUpgradeSend>::Error
>
) {}
/// Returns until when the connection should be kept alive. /// Returns until when the connection should be kept alive.
/// ///
/// This method is called by the `Swarm` after each invocation of /// This method is called by the `Swarm` after each invocation of
@ -236,7 +244,7 @@ pub struct SubstreamProtocol<TUpgrade> {
} }
impl<TUpgrade> SubstreamProtocol<TUpgrade> { impl<TUpgrade> SubstreamProtocol<TUpgrade> {
/// Create a new `ListenProtocol` from the given upgrade. /// Create a new `SubstreamProtocol` from the given upgrade.
/// ///
/// The default timeout for applying the given upgrade on a substream is /// The default timeout for applying the given upgrade on a substream is
/// 10 seconds. /// 10 seconds.

View File

@ -228,15 +228,26 @@ where
for n in (0..self.negotiating_in.len()).rev() { for n in (0..self.negotiating_in.len()).rev() {
let (mut in_progress, mut timeout) = self.negotiating_in.swap_remove(n); let (mut in_progress, mut timeout) = self.negotiating_in.swap_remove(n);
match Future::poll(Pin::new(&mut timeout), cx) { match Future::poll(Pin::new(&mut timeout), cx) {
Poll::Ready(_) => continue, Poll::Ready(Ok(_)) => {
let err = ProtocolsHandlerUpgrErr::Timeout;
self.handler.inject_listen_upgrade_error(err);
continue
}
Poll::Ready(Err(_)) => {
let err = ProtocolsHandlerUpgrErr::Timer;
self.handler.inject_listen_upgrade_error(err);
continue;
}
Poll::Pending => {}, Poll::Pending => {},
} }
match Future::poll(Pin::new(&mut in_progress), cx) { match Future::poll(Pin::new(&mut in_progress), cx) {
Poll::Ready(Ok(upgrade)) => Poll::Ready(Ok(upgrade)) =>
self.handler.inject_fully_negotiated_inbound(upgrade), self.handler.inject_fully_negotiated_inbound(upgrade),
Poll::Pending => self.negotiating_in.push((in_progress, timeout)), Poll::Pending => self.negotiating_in.push((in_progress, timeout)),
// TODO: return a diagnostic event? Poll::Ready(Err(err)) => {
Poll::Ready(Err(_err)) => {} let err = ProtocolsHandlerUpgrErr::Upgrade(err);
self.handler.inject_listen_upgrade_error(err);
}
} }
} }

View File

@ -31,23 +31,20 @@ use smallvec::SmallVec;
use std::{error, task::Context, task::Poll, time::Duration}; use std::{error, task::Context, task::Poll, time::Duration};
use wasm_timer::Instant; use wasm_timer::Instant;
/// Implementation of `ProtocolsHandler` that opens a new substream for each individual message. /// A `ProtocolsHandler` that opens a new substream for each request.
///
/// This struct is meant to be a helper for other implementations to use.
// TODO: Debug // TODO: Debug
pub struct OneShotHandler<TInProto, TOutProto, TOutEvent> pub struct OneShotHandler<TInbound, TOutbound, TEvent>
where where
TOutProto: OutboundUpgradeSend, TOutbound: OutboundUpgradeSend,
{ {
/// The upgrade for inbound substreams. /// The upgrade for inbound substreams.
listen_protocol: SubstreamProtocol<TInProto>, listen_protocol: SubstreamProtocol<TInbound>,
/// If `Some`, something bad happened and we should shut down the handler with an error. /// If `Some`, something bad happened and we should shut down the handler with an error.
pending_error: pending_error: Option<ProtocolsHandlerUpgrErr<<TOutbound as OutboundUpgradeSend>::Error>>,
Option<ProtocolsHandlerUpgrErr<<TOutProto as OutboundUpgradeSend>::Error>>,
/// Queue of events to produce in `poll()`. /// Queue of events to produce in `poll()`.
events_out: SmallVec<[TOutEvent; 4]>, events_out: SmallVec<[TEvent; 4]>,
/// Queue of outbound substreams to open. /// Queue of outbound substreams to open.
dial_queue: SmallVec<[TOutProto; 4]>, dial_queue: SmallVec<[TOutbound; 4]>,
/// Current number of concurrent outbound substreams being opened. /// Current number of concurrent outbound substreams being opened.
dial_negotiated: u32, dial_negotiated: u32,
/// Maximum number of concurrent outbound substreams being opened. Value is never modified. /// Maximum number of concurrent outbound substreams being opened. Value is never modified.
@ -58,15 +55,14 @@ where
config: OneShotHandlerConfig, config: OneShotHandlerConfig,
} }
impl<TInProto, TOutProto, TOutEvent> impl<TInbound, TOutbound, TEvent>
OneShotHandler<TInProto, TOutProto, TOutEvent> OneShotHandler<TInbound, TOutbound, TEvent>
where where
TOutProto: OutboundUpgradeSend, TOutbound: OutboundUpgradeSend,
{ {
/// Creates a `OneShotHandler`. /// Creates a `OneShotHandler`.
#[inline]
pub fn new( pub fn new(
listen_protocol: SubstreamProtocol<TInProto>, listen_protocol: SubstreamProtocol<TInbound>,
config: OneShotHandlerConfig, config: OneShotHandlerConfig,
) -> Self { ) -> Self {
OneShotHandler { OneShotHandler {
@ -77,12 +73,11 @@ where
dial_negotiated: 0, dial_negotiated: 0,
max_dial_negotiated: 8, max_dial_negotiated: 8,
keep_alive: KeepAlive::Yes, keep_alive: KeepAlive::Yes,
config config,
} }
} }
/// Returns the number of pending requests. /// Returns the number of pending requests.
#[inline]
pub fn pending_requests(&self) -> u32 { pub fn pending_requests(&self) -> u32 {
self.dial_negotiated + self.dial_queue.len() as u32 self.dial_negotiated + self.dial_queue.len() as u32
} }
@ -91,8 +86,7 @@ where
/// ///
/// > **Note**: If you modify the protocol, modifications will only applies to future inbound /// > **Note**: If you modify the protocol, modifications will only applies to future inbound
/// > substreams, not the ones already being negotiated. /// > substreams, not the ones already being negotiated.
#[inline] pub fn listen_protocol_ref(&self) -> &SubstreamProtocol<TInbound> {
pub fn listen_protocol_ref(&self) -> &SubstreamProtocol<TInProto> {
&self.listen_protocol &self.listen_protocol
} }
@ -100,26 +94,23 @@ where
/// ///
/// > **Note**: If you modify the protocol, modifications will only applies to future inbound /// > **Note**: If you modify the protocol, modifications will only applies to future inbound
/// > substreams, not the ones already being negotiated. /// > substreams, not the ones already being negotiated.
#[inline] pub fn listen_protocol_mut(&mut self) -> &mut SubstreamProtocol<TInbound> {
pub fn listen_protocol_mut(&mut self) -> &mut SubstreamProtocol<TInProto> {
&mut self.listen_protocol &mut self.listen_protocol
} }
/// Opens an outbound substream with `upgrade`. /// Opens an outbound substream with `upgrade`.
#[inline] pub fn send_request(&mut self, upgrade: TOutbound) {
pub fn send_request(&mut self, upgrade: TOutProto) {
self.keep_alive = KeepAlive::Yes; self.keep_alive = KeepAlive::Yes;
self.dial_queue.push(upgrade); self.dial_queue.push(upgrade);
} }
} }
impl<TInProto, TOutProto, TOutEvent> Default impl<TInbound, TOutbound, TEvent> Default
for OneShotHandler<TInProto, TOutProto, TOutEvent> for OneShotHandler<TInbound, TOutbound, TEvent>
where where
TOutProto: OutboundUpgradeSend, TOutbound: OutboundUpgradeSend,
TInProto: InboundUpgradeSend + Default, TInbound: InboundUpgradeSend + Default,
{ {
#[inline]
fn default() -> Self { fn default() -> Self {
OneShotHandler::new( OneShotHandler::new(
SubstreamProtocol::new(Default::default()), SubstreamProtocol::new(Default::default()),
@ -128,45 +119,42 @@ where
} }
} }
impl<TInProto, TOutProto, TOutEvent> ProtocolsHandler impl<TInbound, TOutbound, TEvent> ProtocolsHandler
for OneShotHandler<TInProto, TOutProto, TOutEvent> for OneShotHandler<TInbound, TOutbound, TEvent>
where where
TInProto: InboundUpgradeSend + Send + 'static, TInbound: InboundUpgradeSend + Send + 'static,
TOutProto: OutboundUpgradeSend, TOutbound: OutboundUpgradeSend,
TInProto::Output: Into<TOutEvent>, TInbound::Output: Into<TEvent>,
TOutProto::Output: Into<TOutEvent>, TOutbound::Output: Into<TEvent>,
TOutProto::Error: error::Error + Send + 'static, TOutbound::Error: error::Error + Send + 'static,
SubstreamProtocol<TInProto>: Clone, SubstreamProtocol<TInbound>: Clone,
TOutEvent: Send + 'static, TEvent: Send + 'static,
{ {
type InEvent = TOutProto; type InEvent = TOutbound;
type OutEvent = TOutEvent; type OutEvent = TEvent;
type Error = ProtocolsHandlerUpgrErr< type Error = ProtocolsHandlerUpgrErr<
<Self::OutboundProtocol as OutboundUpgradeSend>::Error, <Self::OutboundProtocol as OutboundUpgradeSend>::Error,
>; >;
type InboundProtocol = TInProto; type InboundProtocol = TInbound;
type OutboundProtocol = TOutProto; type OutboundProtocol = TOutbound;
type OutboundOpenInfo = (); type OutboundOpenInfo = ();
#[inline]
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> { fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
self.listen_protocol.clone() self.listen_protocol.clone()
} }
#[inline]
fn inject_fully_negotiated_inbound( fn inject_fully_negotiated_inbound(
&mut self, &mut self,
out: <Self::InboundProtocol as InboundUpgradeSend>::Output, out: <Self::InboundProtocol as InboundUpgradeSend>::Output,
) { ) {
// If we're shutting down the connection for inactivity, reset the timeout. // If we're shutting down the connection for inactivity, reset the timeout.
if !self.keep_alive.is_yes() { if !self.keep_alive.is_yes() {
self.keep_alive = KeepAlive::Until(Instant::now() + self.config.inactive_timeout); self.keep_alive = KeepAlive::Until(Instant::now() + self.config.keep_alive_timeout);
} }
self.events_out.push(out.into()); self.events_out.push(out.into());
} }
#[inline]
fn inject_fully_negotiated_outbound( fn inject_fully_negotiated_outbound(
&mut self, &mut self,
out: <Self::OutboundProtocol as OutboundUpgradeSend>::Output, out: <Self::OutboundProtocol as OutboundUpgradeSend>::Output,
@ -175,21 +163,19 @@ where
self.dial_negotiated -= 1; self.dial_negotiated -= 1;
if self.dial_negotiated == 0 && self.dial_queue.is_empty() { if self.dial_negotiated == 0 && self.dial_queue.is_empty() {
self.keep_alive = KeepAlive::Until(Instant::now() + self.config.inactive_timeout); self.keep_alive = KeepAlive::Until(Instant::now() + self.config.keep_alive_timeout);
} }
self.events_out.push(out.into()); self.events_out.push(out.into());
} }
#[inline]
fn inject_event(&mut self, event: Self::InEvent) { fn inject_event(&mut self, event: Self::InEvent) {
self.send_request(event); self.send_request(event);
} }
#[inline]
fn inject_dial_upgrade_error( fn inject_dial_upgrade_error(
&mut self, &mut self,
_: Self::OutboundOpenInfo, _info: Self::OutboundOpenInfo,
error: ProtocolsHandlerUpgrErr< error: ProtocolsHandlerUpgrErr<
<Self::OutboundProtocol as OutboundUpgradeSend>::Error, <Self::OutboundProtocol as OutboundUpgradeSend>::Error,
>, >,
@ -199,7 +185,6 @@ where
} }
} }
#[inline]
fn connection_keep_alive(&self) -> KeepAlive { fn connection_keep_alive(&self) -> KeepAlive {
self.keep_alive self.keep_alive
} }
@ -211,12 +196,12 @@ where
ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent, Self::Error>, ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent, Self::Error>,
> { > {
if let Some(err) = self.pending_error.take() { if let Some(err) = self.pending_error.take() {
return Poll::Ready(ProtocolsHandlerEvent::Close(err)); return Poll::Ready(ProtocolsHandlerEvent::Close(err))
} }
if !self.events_out.is_empty() { if !self.events_out.is_empty() {
return Poll::Ready(ProtocolsHandlerEvent::Custom( return Poll::Ready(ProtocolsHandlerEvent::Custom(
self.events_out.remove(0), self.events_out.remove(0)
)); ));
} else { } else {
self.events_out.shrink_to_fit(); self.events_out.shrink_to_fit();
@ -225,10 +210,11 @@ where
if !self.dial_queue.is_empty() { if !self.dial_queue.is_empty() {
if self.dial_negotiated < self.max_dial_negotiated { if self.dial_negotiated < self.max_dial_negotiated {
self.dial_negotiated += 1; self.dial_negotiated += 1;
let upgrade = self.dial_queue.remove(0);
return Poll::Ready( return Poll::Ready(
ProtocolsHandlerEvent::OutboundSubstreamRequest { ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(self.dial_queue.remove(0)) protocol: SubstreamProtocol::new(upgrade)
.with_timeout(self.config.substream_timeout), .with_timeout(self.config.outbound_substream_timeout),
info: (), info: (),
}, },
); );
@ -244,17 +230,18 @@ where
/// Configuration parameters for the `OneShotHandler` /// Configuration parameters for the `OneShotHandler`
#[derive(Debug)] #[derive(Debug)]
pub struct OneShotHandlerConfig { pub struct OneShotHandlerConfig {
/// After the given duration has elapsed, an inactive connection will shutdown. /// Keep-alive timeout for idle connections.
pub inactive_timeout: Duration, pub keep_alive_timeout: Duration,
/// Timeout duration for each newly opened outbound substream. /// Timeout for outbound substream upgrades.
pub substream_timeout: Duration, pub outbound_substream_timeout: Duration,
} }
impl Default for OneShotHandlerConfig { impl Default for OneShotHandlerConfig {
fn default() -> Self { fn default() -> Self {
let inactive_timeout = Duration::from_secs(10); OneShotHandlerConfig {
let substream_timeout = Duration::from_secs(10); keep_alive_timeout: Duration::from_secs(10),
OneShotHandlerConfig { inactive_timeout, substream_timeout } outbound_substream_timeout: Duration::from_secs(10),
}
} }
} }