mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-05-17 21:31:20 +00:00
`Transport::listen_on` is an asynchronous operation. It returns immediately but the actual process of establishing a listening socket happens as part of `Transport::poll` which will return one or more `TransportEvent`s related to a particular `listen_on` call. Currently, `listen_on` returns a `ListenerId` which allows the user of the `Transport` interface to correlate the events with a particular `listen_on` call. This "user" is the `Swarm` runtime. Currently, a user of libp2p establishes a new listening socket by talking to the `Swarm::listen_on` interface and it is not possible to do the same thing via the `NetworkBehaviour` trait. Within the `NetworkBehaviour` trait, we emit _commands_ to the `Swarm` like `ToSwarm::Dial`. These commands don't have a "return value" like a synchronous function does and thus, if we were to add a `ToSwarm::ListenOn` command, it could not receive the `ListenerId` from the `Transport`. To fix this and to be consistent with our [coding guidelines](https://github.com/libp2p/rust-libp2p/blob/master/docs/coding-guidelines.md#allow-correlating-asynchronous-responses-to-their-requests) we change the interface of `Transport::listen_on` to require the user to pass in a `ListenerId`. This will allow us to construct a command in a `NetworkBehaviour` that remembers this ID which enables precise tracking of which events containing a `ListenerId` correlate which a particular `listen_on` command. This is especially important in the context of listening on wildcard addresses like `0.0.0.0` because we end up binding to multiple network interfaces and thus emit multiple events for a single `listen_on` call. Pull-Request: #3567.
126 lines
4.6 KiB
Rust
126 lines
4.6 KiB
Rust
// Copyright 2018 Parity Technologies (UK) Ltd.
|
|
//
|
|
// Permission is hereby granted, free of charge, to any person obtaining a
|
|
// copy of this software and associated documentation files (the "Software"),
|
|
// to deal in the Software without restriction, including without limitation
|
|
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
|
// and/or sell copies of the Software, and to permit persons to whom the
|
|
// Software is furnished to do so, subject to the following conditions:
|
|
//
|
|
// The above copyright notice and this permission notice shall be included in
|
|
// all copies or substantial portions of the Software.
|
|
//
|
|
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
|
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
|
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
|
// DEALINGS IN THE SOFTWARE.
|
|
|
|
use futures::prelude::*;
|
|
use instant::Instant;
|
|
use libp2p_swarm::StreamProtocol;
|
|
use rand::{distributions, prelude::*};
|
|
use std::{io, time::Duration};
|
|
|
|
pub const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/ipfs/ping/1.0.0");
|
|
|
|
/// The `Ping` protocol upgrade.
|
|
///
|
|
/// The ping protocol sends 32 bytes of random data in configurable
|
|
/// intervals over a single outbound substream, expecting to receive
|
|
/// the same bytes as a response. At the same time, incoming pings
|
|
/// on inbound substreams are answered by sending back the received bytes.
|
|
///
|
|
/// At most a single inbound and outbound substream is kept open at
|
|
/// any time. In case of a ping timeout or another error on a substream, the
|
|
/// substream is dropped. If a configurable number of consecutive
|
|
/// outbound pings fail, the connection is closed.
|
|
///
|
|
/// Successful pings report the round-trip time.
|
|
///
|
|
/// > **Note**: The round-trip time of a ping may be subject to delays induced
|
|
/// > by the underlying transport, e.g. in the case of TCP there is
|
|
/// > Nagle's algorithm, delayed acks and similar configuration options
|
|
/// > which can affect latencies especially on otherwise low-volume
|
|
/// > connections.
|
|
#[derive(Default, Debug, Copy, Clone)]
|
|
pub(crate) struct Ping;
|
|
const PING_SIZE: usize = 32;
|
|
|
|
/// Sends a ping and waits for the pong.
|
|
pub(crate) async fn send_ping<S>(mut stream: S) -> io::Result<(S, Duration)>
|
|
where
|
|
S: AsyncRead + AsyncWrite + Unpin,
|
|
{
|
|
let payload: [u8; PING_SIZE] = thread_rng().sample(distributions::Standard);
|
|
stream.write_all(&payload).await?;
|
|
stream.flush().await?;
|
|
let started = Instant::now();
|
|
let mut recv_payload = [0u8; PING_SIZE];
|
|
stream.read_exact(&mut recv_payload).await?;
|
|
if recv_payload == payload {
|
|
Ok((stream, started.elapsed()))
|
|
} else {
|
|
Err(io::Error::new(
|
|
io::ErrorKind::InvalidData,
|
|
"Ping payload mismatch",
|
|
))
|
|
}
|
|
}
|
|
|
|
/// Waits for a ping and sends a pong.
|
|
pub(crate) async fn recv_ping<S>(mut stream: S) -> io::Result<S>
|
|
where
|
|
S: AsyncRead + AsyncWrite + Unpin,
|
|
{
|
|
let mut payload = [0u8; PING_SIZE];
|
|
stream.read_exact(&mut payload).await?;
|
|
stream.write_all(&payload).await?;
|
|
stream.flush().await?;
|
|
Ok(stream)
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use super::*;
|
|
use futures::StreamExt;
|
|
use libp2p_core::{
|
|
multiaddr::multiaddr,
|
|
transport::{memory::MemoryTransport, ListenerId, Transport},
|
|
};
|
|
use rand::{thread_rng, Rng};
|
|
use std::time::Duration;
|
|
|
|
#[test]
|
|
fn ping_pong() {
|
|
let mem_addr = multiaddr![Memory(thread_rng().gen::<u64>())];
|
|
let mut transport = MemoryTransport::new().boxed();
|
|
transport.listen_on(ListenerId::next(), mem_addr).unwrap();
|
|
|
|
let listener_addr = transport
|
|
.select_next_some()
|
|
.now_or_never()
|
|
.and_then(|ev| ev.into_new_address())
|
|
.expect("MemoryTransport not listening on an address!");
|
|
|
|
async_std::task::spawn(async move {
|
|
let transport_event = transport.next().await.unwrap();
|
|
let (listener_upgrade, _) = transport_event.into_incoming().unwrap();
|
|
let conn = listener_upgrade.await.unwrap();
|
|
recv_ping(conn).await.unwrap();
|
|
});
|
|
|
|
async_std::task::block_on(async move {
|
|
let c = MemoryTransport::new()
|
|
.dial(listener_addr)
|
|
.unwrap()
|
|
.await
|
|
.unwrap();
|
|
let (_, rtt) = send_ping(c).await.unwrap();
|
|
assert!(rtt > Duration::from_secs(0));
|
|
});
|
|
}
|
|
}
|