mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-04-25 11:02:12 +00:00
[multistream-select] Fix panic with V1Lazy
(regression) and more convenient transport boxing. (#1783)
* [multistream-select] Fix panic with V1Lazy and add integration tests. Fixes a panic when using the `V1Lazy` negotiation protocol, a regression introduced in https://github.com/libp2p/rust-libp2p/pull/1484. Thereby adds integration tests for a transport upgrade with both `V1` and `V1Lazy` to the `multistream-select` crate to prevent future regressions. * Cleanup. * Update changelog.
This commit is contained in:
parent
2a5c1832a1
commit
3e31ea9337
@ -25,14 +25,13 @@
|
|||||||
//! any desired protocols. The rest of the module defines combinators for
|
//! any desired protocols. The rest of the module defines combinators for
|
||||||
//! modifying a transport through composition with other transports or protocol upgrades.
|
//! modifying a transport through composition with other transports or protocol upgrades.
|
||||||
|
|
||||||
use crate::ConnectedPoint;
|
use crate::{ConnectedPoint, ConnectionInfo, muxing::{StreamMuxer, StreamMuxerBox}};
|
||||||
use futures::prelude::*;
|
use futures::prelude::*;
|
||||||
use multiaddr::Multiaddr;
|
use multiaddr::Multiaddr;
|
||||||
use std::{error::Error, fmt};
|
use std::{error::Error, fmt};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
pub mod and_then;
|
pub mod and_then;
|
||||||
pub mod boxed;
|
|
||||||
pub mod choice;
|
pub mod choice;
|
||||||
pub mod dummy;
|
pub mod dummy;
|
||||||
pub mod map;
|
pub mod map;
|
||||||
@ -41,8 +40,10 @@ pub mod memory;
|
|||||||
pub mod timeout;
|
pub mod timeout;
|
||||||
pub mod upgrade;
|
pub mod upgrade;
|
||||||
|
|
||||||
|
mod boxed;
|
||||||
mod optional;
|
mod optional;
|
||||||
|
|
||||||
|
pub use self::boxed::Boxed;
|
||||||
pub use self::choice::OrTransport;
|
pub use self::choice::OrTransport;
|
||||||
pub use self::memory::MemoryTransport;
|
pub use self::memory::MemoryTransport;
|
||||||
pub use self::optional::OptionalTransport;
|
pub use self::optional::OptionalTransport;
|
||||||
@ -128,14 +129,24 @@ pub trait Transport {
|
|||||||
where
|
where
|
||||||
Self: Sized;
|
Self: Sized;
|
||||||
|
|
||||||
/// Turns the transport into an abstract boxed (i.e. heap-allocated) transport.
|
/// Boxes an authenticated, multiplexed transport, including the
|
||||||
fn boxed(self) -> boxed::Boxed<Self::Output, Self::Error>
|
/// `StreamMuxer` and transport errors.
|
||||||
where Self: Sized + Clone + Send + Sync + 'static,
|
fn boxed<I, M>(self) -> boxed::Boxed<(I, StreamMuxerBox), std::io::Error>
|
||||||
|
where
|
||||||
|
Self: Transport<Output = (I, M)> + Sized + Clone + Send + Sync + 'static,
|
||||||
Self::Dial: Send + 'static,
|
Self::Dial: Send + 'static,
|
||||||
Self::Listener: Send + 'static,
|
Self::Listener: Send + 'static,
|
||||||
Self::ListenerUpgrade: Send + 'static,
|
Self::ListenerUpgrade: Send + 'static,
|
||||||
|
Self::Error: Send + Sync,
|
||||||
|
I: ConnectionInfo,
|
||||||
|
M: StreamMuxer + Send + Sync + 'static,
|
||||||
|
M::Substream: Send + 'static,
|
||||||
|
M::OutboundSubstream: Send + 'static
|
||||||
|
|
||||||
{
|
{
|
||||||
boxed::boxed(self)
|
boxed::boxed(
|
||||||
|
self.map(|(i, m), _| (i, StreamMuxerBox::new(m)))
|
||||||
|
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Applies a function on the connections created by the transport.
|
/// Applies a function on the connections created by the transport.
|
||||||
|
@ -24,7 +24,6 @@ use multiaddr::Multiaddr;
|
|||||||
use std::{error, fmt, pin::Pin, sync::Arc};
|
use std::{error, fmt, pin::Pin, sync::Arc};
|
||||||
|
|
||||||
/// See the `Transport::boxed` method.
|
/// See the `Transport::boxed` method.
|
||||||
#[inline]
|
|
||||||
pub fn boxed<T>(transport: T) -> Boxed<T::Output, T::Error>
|
pub fn boxed<T>(transport: T) -> Boxed<T::Output, T::Error>
|
||||||
where
|
where
|
||||||
T: Transport + Clone + Send + Sync + 'static,
|
T: Transport + Clone + Send + Sync + 'static,
|
||||||
@ -37,9 +36,14 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type Dial<O, E> = Pin<Box<dyn Future<Output = Result<O, E>> + Send>>;
|
/// See the `Transport::boxed` method.
|
||||||
pub type Listener<O, E> = Pin<Box<dyn Stream<Item = Result<ListenerEvent<ListenerUpgrade<O, E>, E>, E>> + Send>>;
|
pub struct Boxed<O, E> {
|
||||||
pub type ListenerUpgrade<O, E> = Pin<Box<dyn Future<Output = Result<O, E>> + Send>>;
|
inner: Arc<dyn Abstract<O, E> + Send + Sync>,
|
||||||
|
}
|
||||||
|
|
||||||
|
type Dial<O, E> = Pin<Box<dyn Future<Output = Result<O, E>> + Send>>;
|
||||||
|
type Listener<O, E> = Pin<Box<dyn Stream<Item = Result<ListenerEvent<ListenerUpgrade<O, E>, E>, E>> + Send>>;
|
||||||
|
type ListenerUpgrade<O, E> = Pin<Box<dyn Future<Output = Result<O, E>> + Send>>;
|
||||||
|
|
||||||
trait Abstract<O, E> {
|
trait Abstract<O, E> {
|
||||||
fn listen_on(&self, addr: Multiaddr) -> Result<Listener<O, E>, TransportError<E>>;
|
fn listen_on(&self, addr: Multiaddr) -> Result<Listener<O, E>, TransportError<E>>;
|
||||||
@ -68,11 +72,6 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// See the `Transport::boxed` method.
|
|
||||||
pub struct Boxed<O, E> {
|
|
||||||
inner: Arc<dyn Abstract<O, E> + Send + Sync>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<O, E> fmt::Debug for Boxed<O, E> {
|
impl<O, E> fmt::Debug for Boxed<O, E> {
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
write!(f, "BoxedTransport")
|
write!(f, "BoxedTransport")
|
||||||
|
@ -36,22 +36,11 @@ use libp2p_core::{
|
|||||||
use libp2p_noise as noise;
|
use libp2p_noise as noise;
|
||||||
use rand::Rng;
|
use rand::Rng;
|
||||||
use rand::seq::SliceRandom;
|
use rand::seq::SliceRandom;
|
||||||
use std::{io, error::Error, fmt, task::Poll};
|
use std::{io, task::Poll};
|
||||||
use util::TestHandler;
|
use util::TestHandler;
|
||||||
|
|
||||||
type TestNetwork = Network<TestTransport, (), (), TestHandler>;
|
type TestNetwork = Network<TestTransport, (), (), TestHandler>;
|
||||||
type TestTransport = transport::boxed::Boxed<(PeerId, StreamMuxerBox), BoxError>;
|
type TestTransport = transport::Boxed<(PeerId, StreamMuxerBox), io::Error>;
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
struct BoxError(Box<dyn Error + Send + 'static>);
|
|
||||||
|
|
||||||
impl Error for BoxError {}
|
|
||||||
|
|
||||||
impl fmt::Display for BoxError {
|
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
||||||
write!(f, "Transport error: {}", self.0)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn new_network(cfg: NetworkConfig) -> TestNetwork {
|
fn new_network(cfg: NetworkConfig) -> TestNetwork {
|
||||||
let local_key = identity::Keypair::generate_ed25519();
|
let local_key = identity::Keypair::generate_ed25519();
|
||||||
@ -61,13 +50,11 @@ fn new_network(cfg: NetworkConfig) -> TestNetwork {
|
|||||||
.upgrade(upgrade::Version::V1)
|
.upgrade(upgrade::Version::V1)
|
||||||
.authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated())
|
.authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated())
|
||||||
.multiplex(libp2p_mplex::MplexConfig::new())
|
.multiplex(libp2p_mplex::MplexConfig::new())
|
||||||
.map(|(conn_info, muxer), _| (conn_info, StreamMuxerBox::new(muxer)))
|
|
||||||
.and_then(|(peer, mplex), _| {
|
.and_then(|(peer, mplex), _| {
|
||||||
// Gracefully close the connection to allow protocol
|
// Gracefully close the connection to allow protocol
|
||||||
// negotiation to complete.
|
// negotiation to complete.
|
||||||
util::CloseMuxer::new(mplex).map_ok(move |mplex| (peer, mplex))
|
util::CloseMuxer::new(mplex).map_ok(move |mplex| (peer, mplex))
|
||||||
})
|
})
|
||||||
.map_err(|e| BoxError(Box::new(e)))
|
|
||||||
.boxed();
|
.boxed();
|
||||||
TestNetwork::new(transport, local_public_key.into(), cfg)
|
TestNetwork::new(transport, local_public_key.into(), cfg)
|
||||||
}
|
}
|
||||||
|
@ -1,5 +1,8 @@
|
|||||||
# 0.8.3 [unreleased]
|
# 0.8.3 [unreleased]
|
||||||
|
|
||||||
|
- Fix a regression resulting in a panic with the `V1Lazy` protocol.
|
||||||
|
[PR 1783](https://github.com/libp2p/rust-libp2p/pull/1783).
|
||||||
|
|
||||||
- Fix a potential deadlock during protocol negotiation due
|
- Fix a potential deadlock during protocol negotiation due
|
||||||
to a missing flush, potentially resulting in sporadic protocol
|
to a missing flush, potentially resulting in sporadic protocol
|
||||||
upgrade timeouts.
|
upgrade timeouts.
|
||||||
|
@ -19,6 +19,10 @@ unsigned-varint = "0.5"
|
|||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
async-std = "1.6.2"
|
async-std = "1.6.2"
|
||||||
|
env_logger = "*"
|
||||||
|
libp2p-core = { path = "../../core" }
|
||||||
|
libp2p-mplex = { path = "../../muxers/mplex" }
|
||||||
|
libp2p-plaintext = { path = "../../protocols/plaintext" }
|
||||||
quickcheck = "0.9.0"
|
quickcheck = "0.9.0"
|
||||||
rand = "0.7.2"
|
rand = "0.7.2"
|
||||||
rw-stream-sink = "0.2.1"
|
rw-stream-sink = "0.2.1"
|
||||||
|
@ -127,6 +127,7 @@ impl<TInner> Negotiated<TInner> {
|
|||||||
|
|
||||||
if let Message::Header(v) = &msg {
|
if let Message::Header(v) = &msg {
|
||||||
if *v == version {
|
if *v == version {
|
||||||
|
*this.state = State::Expecting { io, protocol, version };
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
134
misc/multistream-select/tests/transport.rs
Normal file
134
misc/multistream-select/tests/transport.rs
Normal file
@ -0,0 +1,134 @@
|
|||||||
|
// Copyright 2020 Parity Technologies (UK) Ltd.
|
||||||
|
//
|
||||||
|
// Permission is hereby granted, free of charge, to any person obtaining a
|
||||||
|
// copy of this software and associated documentation files (the "Software"),
|
||||||
|
// to deal in the Software without restriction, including without limitation
|
||||||
|
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
||||||
|
// and/or sell copies of the Software, and to permit persons to whom the
|
||||||
|
// Software is furnished to do so, subject to the following conditions:
|
||||||
|
//
|
||||||
|
// The above copyright notice and this permission notice shall be included in
|
||||||
|
// all copies or substantial portions of the Software.
|
||||||
|
//
|
||||||
|
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||||
|
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||||
|
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||||
|
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||||
|
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||||
|
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||||
|
// DEALINGS IN THE SOFTWARE.
|
||||||
|
|
||||||
|
use libp2p_core::{
|
||||||
|
connection::{ConnectionHandler, ConnectionHandlerEvent, Substream, SubstreamEndpoint},
|
||||||
|
identity,
|
||||||
|
muxing::StreamMuxerBox,
|
||||||
|
upgrade,
|
||||||
|
multiaddr::Protocol,
|
||||||
|
Multiaddr,
|
||||||
|
Network,
|
||||||
|
network::{NetworkEvent, NetworkConfig},
|
||||||
|
PeerId,
|
||||||
|
Transport,
|
||||||
|
transport::{self, MemoryTransport}
|
||||||
|
};
|
||||||
|
use libp2p_mplex::MplexConfig;
|
||||||
|
use libp2p_plaintext::PlainText2Config;
|
||||||
|
use futures::{channel::oneshot, ready, prelude::*};
|
||||||
|
use rand::random;
|
||||||
|
use std::{io, task::{Context, Poll}};
|
||||||
|
|
||||||
|
type TestTransport = transport::Boxed<(PeerId, StreamMuxerBox), io::Error>;
|
||||||
|
type TestNetwork = Network<TestTransport, (), (), TestHandler>;
|
||||||
|
|
||||||
|
fn mk_transport(up: upgrade::Version) -> (PeerId, TestTransport) {
|
||||||
|
let keys = identity::Keypair::generate_ed25519();
|
||||||
|
let id = keys.public().into_peer_id();
|
||||||
|
(id, MemoryTransport::default()
|
||||||
|
.upgrade(up)
|
||||||
|
.authenticate(PlainText2Config { local_public_key: keys.public() })
|
||||||
|
.multiplex(MplexConfig::default())
|
||||||
|
.boxed())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Tests the transport upgrade process with all supported
|
||||||
|
/// upgrade protocol versions.
|
||||||
|
#[test]
|
||||||
|
fn transport_upgrade() {
|
||||||
|
let _ = env_logger::try_init();
|
||||||
|
|
||||||
|
fn run(up: upgrade::Version) {
|
||||||
|
let (dialer_id, dialer_transport) = mk_transport(up);
|
||||||
|
let (listener_id, listener_transport) = mk_transport(up);
|
||||||
|
|
||||||
|
let listen_addr = Multiaddr::from(Protocol::Memory(random::<u64>()));
|
||||||
|
|
||||||
|
let mut dialer = TestNetwork::new(dialer_transport, dialer_id, NetworkConfig::default());
|
||||||
|
let mut listener = TestNetwork::new(listener_transport, listener_id, NetworkConfig::default());
|
||||||
|
|
||||||
|
listener.listen_on(listen_addr).unwrap();
|
||||||
|
let (addr_sender, addr_receiver) = oneshot::channel();
|
||||||
|
|
||||||
|
let client = async move {
|
||||||
|
let addr = addr_receiver.await.unwrap();
|
||||||
|
dialer.dial(&addr, TestHandler()).unwrap();
|
||||||
|
futures::future::poll_fn(move |cx| {
|
||||||
|
loop {
|
||||||
|
match ready!(dialer.poll(cx)) {
|
||||||
|
NetworkEvent::ConnectionEstablished { .. } => {
|
||||||
|
return Poll::Ready(())
|
||||||
|
}
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}).await
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut addr_sender = Some(addr_sender);
|
||||||
|
let server = futures::future::poll_fn(move |cx| {
|
||||||
|
loop {
|
||||||
|
match ready!(listener.poll(cx)) {
|
||||||
|
NetworkEvent::NewListenerAddress { listen_addr, .. } => {
|
||||||
|
addr_sender.take().unwrap().send(listen_addr).unwrap();
|
||||||
|
}
|
||||||
|
NetworkEvent::IncomingConnection { connection, .. } => {
|
||||||
|
listener.accept(connection, TestHandler()).unwrap();
|
||||||
|
}
|
||||||
|
NetworkEvent::ConnectionEstablished { .. } => {
|
||||||
|
return Poll::Ready(())
|
||||||
|
}
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
async_std::task::block_on(future::select(Box::pin(server), Box::pin(client)));
|
||||||
|
}
|
||||||
|
|
||||||
|
run(upgrade::Version::V1);
|
||||||
|
run(upgrade::Version::V1Lazy);
|
||||||
|
}
|
||||||
|
|
||||||
|
struct TestHandler();
|
||||||
|
|
||||||
|
impl ConnectionHandler for TestHandler {
|
||||||
|
type InEvent = ();
|
||||||
|
type OutEvent = ();
|
||||||
|
type Error = io::Error;
|
||||||
|
type Substream = Substream<StreamMuxerBox>;
|
||||||
|
type OutboundOpenInfo = ();
|
||||||
|
|
||||||
|
fn inject_substream(&mut self, _: Self::Substream, _: SubstreamEndpoint<Self::OutboundOpenInfo>)
|
||||||
|
{}
|
||||||
|
|
||||||
|
fn inject_event(&mut self, _: Self::InEvent)
|
||||||
|
{}
|
||||||
|
|
||||||
|
fn inject_address_change(&mut self, _: &Multiaddr)
|
||||||
|
{}
|
||||||
|
|
||||||
|
fn poll(&mut self, _: &mut Context<'_>)
|
||||||
|
-> Poll<Result<ConnectionHandlerEvent<Self::OutboundOpenInfo, Self::OutEvent>, Self::Error>>
|
||||||
|
{
|
||||||
|
Poll::Pending
|
||||||
|
}
|
||||||
|
}
|
@ -23,14 +23,13 @@ use log::debug;
|
|||||||
use quickcheck::{QuickCheck, TestResult};
|
use quickcheck::{QuickCheck, TestResult};
|
||||||
use rand::{random, seq::SliceRandom, SeedableRng};
|
use rand::{random, seq::SliceRandom, SeedableRng};
|
||||||
use std::{
|
use std::{
|
||||||
io::Error,
|
|
||||||
pin::Pin,
|
pin::Pin,
|
||||||
task::{Context, Poll},
|
task::{Context, Poll},
|
||||||
time::Duration,
|
time::Duration,
|
||||||
};
|
};
|
||||||
|
|
||||||
use libp2p_core::{
|
use libp2p_core::{
|
||||||
identity, multiaddr::Protocol, muxing::StreamMuxerBox, transport::MemoryTransport, upgrade,
|
identity, multiaddr::Protocol, transport::MemoryTransport, upgrade,
|
||||||
Multiaddr, Transport,
|
Multiaddr, Transport,
|
||||||
};
|
};
|
||||||
use libp2p_gossipsub::{
|
use libp2p_gossipsub::{
|
||||||
@ -151,10 +150,7 @@ fn build_node() -> (Multiaddr, Swarm<Gossipsub>) {
|
|||||||
.authenticate(PlainText2Config {
|
.authenticate(PlainText2Config {
|
||||||
local_public_key: public_key.clone(),
|
local_public_key: public_key.clone(),
|
||||||
})
|
})
|
||||||
.multiplex(yamux::Config::default())
|
.multiplex(yamux::Config::default());
|
||||||
.map(|(p, m), _| (p, StreamMuxerBox::new(m)))
|
|
||||||
.map_err(|e| -> Error { panic!("Failed to create transport: {:?}", e) })
|
|
||||||
.boxed();
|
|
||||||
|
|
||||||
let peer_id = public_key.clone().into_peer_id();
|
let peer_id = public_key.clone().into_peer_id();
|
||||||
|
|
||||||
|
@ -38,7 +38,6 @@ use libp2p_core::{
|
|||||||
identity,
|
identity,
|
||||||
transport::MemoryTransport,
|
transport::MemoryTransport,
|
||||||
multiaddr::{Protocol, Multiaddr, multiaddr},
|
multiaddr::{Protocol, Multiaddr, multiaddr},
|
||||||
muxing::StreamMuxerBox,
|
|
||||||
upgrade
|
upgrade
|
||||||
};
|
};
|
||||||
use libp2p_noise as noise;
|
use libp2p_noise as noise;
|
||||||
@ -46,7 +45,7 @@ use libp2p_swarm::Swarm;
|
|||||||
use libp2p_yamux as yamux;
|
use libp2p_yamux as yamux;
|
||||||
use quickcheck::*;
|
use quickcheck::*;
|
||||||
use rand::{Rng, random, thread_rng, rngs::StdRng, SeedableRng};
|
use rand::{Rng, random, thread_rng, rngs::StdRng, SeedableRng};
|
||||||
use std::{collections::{HashSet, HashMap}, time::Duration, io, num::NonZeroUsize, u64};
|
use std::{collections::{HashSet, HashMap}, time::Duration, num::NonZeroUsize, u64};
|
||||||
use multihash::{wrap, Code, Multihash};
|
use multihash::{wrap, Code, Multihash};
|
||||||
|
|
||||||
type TestSwarm = Swarm<Kademlia<MemoryStore>>;
|
type TestSwarm = Swarm<Kademlia<MemoryStore>>;
|
||||||
@ -62,10 +61,7 @@ fn build_node_with_config(cfg: KademliaConfig) -> (Multiaddr, TestSwarm) {
|
|||||||
let transport = MemoryTransport::default()
|
let transport = MemoryTransport::default()
|
||||||
.upgrade(upgrade::Version::V1)
|
.upgrade(upgrade::Version::V1)
|
||||||
.authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated())
|
.authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated())
|
||||||
.multiplex(yamux::Config::default())
|
.multiplex(yamux::Config::default());
|
||||||
.map(|(p, m), _| (p, StreamMuxerBox::new(m)))
|
|
||||||
.map_err(|e| -> io::Error { panic!("Failed to create transport: {:?}", e); })
|
|
||||||
.boxed();
|
|
||||||
|
|
||||||
let local_id = local_public_key.clone().into_peer_id();
|
let local_id = local_public_key.clone().into_peer_id();
|
||||||
let store = MemoryStore::new(local_id.clone());
|
let store = MemoryStore::new(local_id.clone());
|
||||||
|
@ -25,7 +25,7 @@ use libp2p_core::{
|
|||||||
PeerId,
|
PeerId,
|
||||||
identity,
|
identity,
|
||||||
muxing::StreamMuxerBox,
|
muxing::StreamMuxerBox,
|
||||||
transport::{Transport, boxed::Boxed},
|
transport::{self, Transport},
|
||||||
upgrade
|
upgrade
|
||||||
};
|
};
|
||||||
use libp2p_mplex as mplex;
|
use libp2p_mplex as mplex;
|
||||||
@ -196,7 +196,7 @@ fn max_failures() {
|
|||||||
|
|
||||||
fn mk_transport(muxer: MuxerChoice) -> (
|
fn mk_transport(muxer: MuxerChoice) -> (
|
||||||
PeerId,
|
PeerId,
|
||||||
Boxed<
|
transport::Boxed<
|
||||||
(PeerId, StreamMuxerBox),
|
(PeerId, StreamMuxerBox),
|
||||||
io::Error
|
io::Error
|
||||||
>
|
>
|
||||||
@ -204,8 +204,7 @@ fn mk_transport(muxer: MuxerChoice) -> (
|
|||||||
let id_keys = identity::Keypair::generate_ed25519();
|
let id_keys = identity::Keypair::generate_ed25519();
|
||||||
let peer_id = id_keys.public().into_peer_id();
|
let peer_id = id_keys.public().into_peer_id();
|
||||||
let noise_keys = noise::Keypair::<noise::X25519Spec>::new().into_authentic(&id_keys).unwrap();
|
let noise_keys = noise::Keypair::<noise::X25519Spec>::new().into_authentic(&id_keys).unwrap();
|
||||||
|
(peer_id, TcpConfig::new()
|
||||||
let transport = TcpConfig::new()
|
|
||||||
.nodelay(true)
|
.nodelay(true)
|
||||||
.upgrade(upgrade::Version::V1)
|
.upgrade(upgrade::Version::V1)
|
||||||
.authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated())
|
.authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated())
|
||||||
@ -215,11 +214,7 @@ fn mk_transport(muxer: MuxerChoice) -> (
|
|||||||
MuxerChoice::Mplex =>
|
MuxerChoice::Mplex =>
|
||||||
upgrade::EitherUpgrade::B(mplex::MplexConfig::default()),
|
upgrade::EitherUpgrade::B(mplex::MplexConfig::default()),
|
||||||
})
|
})
|
||||||
.map(|(peer, muxer), _| (peer, StreamMuxerBox::new(muxer)))
|
.boxed())
|
||||||
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
|
|
||||||
.boxed();
|
|
||||||
|
|
||||||
(peer_id, transport)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Copy, Clone)]
|
#[derive(Debug, Copy, Clone)]
|
||||||
|
@ -26,7 +26,7 @@ use libp2p_core::{
|
|||||||
PeerId,
|
PeerId,
|
||||||
identity,
|
identity,
|
||||||
muxing::StreamMuxerBox,
|
muxing::StreamMuxerBox,
|
||||||
transport::{Transport, boxed::Boxed},
|
transport::{self, Transport},
|
||||||
upgrade::{self, read_one, write_one}
|
upgrade::{self, read_one, write_one}
|
||||||
};
|
};
|
||||||
use libp2p_noise::{NoiseConfig, X25519Spec, Keypair};
|
use libp2p_noise::{NoiseConfig, X25519Spec, Keypair};
|
||||||
@ -213,19 +213,16 @@ fn ping_protocol_throttled() {
|
|||||||
let () = async_std::task::block_on(peer2);
|
let () = async_std::task::block_on(peer2);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn mk_transport() -> (PeerId, Boxed<(PeerId, StreamMuxerBox), io::Error>) {
|
fn mk_transport() -> (PeerId, transport::Boxed<(PeerId, StreamMuxerBox), io::Error>) {
|
||||||
let id_keys = identity::Keypair::generate_ed25519();
|
let id_keys = identity::Keypair::generate_ed25519();
|
||||||
let peer_id = id_keys.public().into_peer_id();
|
let peer_id = id_keys.public().into_peer_id();
|
||||||
let noise_keys = Keypair::<X25519Spec>::new().into_authentic(&id_keys).unwrap();
|
let noise_keys = Keypair::<X25519Spec>::new().into_authentic(&id_keys).unwrap();
|
||||||
let transport = TcpConfig::new()
|
(peer_id, TcpConfig::new()
|
||||||
.nodelay(true)
|
.nodelay(true)
|
||||||
.upgrade(upgrade::Version::V1)
|
.upgrade(upgrade::Version::V1)
|
||||||
.authenticate(NoiseConfig::xx(noise_keys).into_authenticated())
|
.authenticate(NoiseConfig::xx(noise_keys).into_authenticated())
|
||||||
.multiplex(libp2p_yamux::Config::default())
|
.multiplex(libp2p_yamux::Config::default())
|
||||||
.map(|(peer, muxer), _| (peer, StreamMuxerBox::new(muxer)))
|
.boxed())
|
||||||
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
|
|
||||||
.boxed();
|
|
||||||
(peer_id, transport)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Simple Ping-Pong Protocol
|
// Simple Ping-Pong Protocol
|
||||||
|
@ -303,8 +303,8 @@ pub fn build_tcp_ws_noise_mplex_yamux(keypair: identity::Keypair)
|
|||||||
.upgrade(core::upgrade::Version::V1)
|
.upgrade(core::upgrade::Version::V1)
|
||||||
.authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated())
|
.authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated())
|
||||||
.multiplex(core::upgrade::SelectUpgrade::new(yamux::Config::default(), mplex::MplexConfig::new()))
|
.multiplex(core::upgrade::SelectUpgrade::new(yamux::Config::default(), mplex::MplexConfig::new()))
|
||||||
.map(|(peer, muxer), _| (peer, core::muxing::StreamMuxerBox::new(muxer)))
|
.timeout(std::time::Duration::from_secs(20))
|
||||||
.timeout(std::time::Duration::from_secs(20)))
|
.boxed())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Builds an implementation of `Transport` that is suitable for usage with the `Swarm`.
|
/// Builds an implementation of `Transport` that is suitable for usage with the `Swarm`.
|
||||||
@ -335,6 +335,6 @@ pub fn build_tcp_ws_pnet_noise_mplex_yamux(keypair: identity::Keypair, psk: PreS
|
|||||||
.upgrade(core::upgrade::Version::V1)
|
.upgrade(core::upgrade::Version::V1)
|
||||||
.authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated())
|
.authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated())
|
||||||
.multiplex(core::upgrade::SelectUpgrade::new(yamux::Config::default(), mplex::MplexConfig::new()))
|
.multiplex(core::upgrade::SelectUpgrade::new(yamux::Config::default(), mplex::MplexConfig::new()))
|
||||||
.map(|(peer, muxer), _| (peer, core::muxing::StreamMuxerBox::new(muxer)))
|
.timeout(std::time::Duration::from_secs(20))
|
||||||
.timeout(std::time::Duration::from_secs(20)))
|
.boxed())
|
||||||
}
|
}
|
||||||
|
@ -110,7 +110,7 @@ use libp2p_core::{
|
|||||||
PendingConnectionError,
|
PendingConnectionError,
|
||||||
Substream
|
Substream
|
||||||
},
|
},
|
||||||
transport::{TransportError, boxed::Boxed as BoxTransport},
|
transport::{self, TransportError},
|
||||||
muxing::{StreamMuxer, StreamMuxerBox},
|
muxing::{StreamMuxer, StreamMuxerBox},
|
||||||
network::{
|
network::{
|
||||||
Network,
|
Network,
|
||||||
@ -261,7 +261,7 @@ where
|
|||||||
TConnInfo: ConnectionInfo<PeerId = PeerId>,
|
TConnInfo: ConnectionInfo<PeerId = PeerId>,
|
||||||
{
|
{
|
||||||
network: Network<
|
network: Network<
|
||||||
BoxTransport<(TConnInfo, StreamMuxerBox), io::Error>,
|
transport::Boxed<(TConnInfo, StreamMuxerBox), io::Error>,
|
||||||
TInEvent,
|
TInEvent,
|
||||||
TOutEvent,
|
TOutEvent,
|
||||||
NodeHandlerWrapperBuilder<THandler>,
|
NodeHandlerWrapperBuilder<THandler>,
|
||||||
@ -972,7 +972,7 @@ impl<'a> PollParameters for SwarmPollParameters<'a> {
|
|||||||
/// including the underlying [`Network`].
|
/// including the underlying [`Network`].
|
||||||
pub struct SwarmBuilder<TBehaviour, TConnInfo> {
|
pub struct SwarmBuilder<TBehaviour, TConnInfo> {
|
||||||
local_peer_id: PeerId,
|
local_peer_id: PeerId,
|
||||||
transport: BoxTransport<(TConnInfo, StreamMuxerBox), io::Error>,
|
transport: transport::Boxed<(TConnInfo, StreamMuxerBox), io::Error>,
|
||||||
behaviour: TBehaviour,
|
behaviour: TBehaviour,
|
||||||
network_config: NetworkConfig,
|
network_config: NetworkConfig,
|
||||||
}
|
}
|
||||||
@ -996,14 +996,9 @@ where TBehaviour: NetworkBehaviour,
|
|||||||
TTrans::ListenerUpgrade: Send + 'static,
|
TTrans::ListenerUpgrade: Send + 'static,
|
||||||
TTrans::Dial: Send + 'static,
|
TTrans::Dial: Send + 'static,
|
||||||
{
|
{
|
||||||
let transport = transport
|
|
||||||
.map(|(conn_info, muxer), _| (conn_info, StreamMuxerBox::new(muxer)))
|
|
||||||
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
|
|
||||||
.boxed();
|
|
||||||
|
|
||||||
SwarmBuilder {
|
SwarmBuilder {
|
||||||
local_peer_id,
|
local_peer_id,
|
||||||
transport,
|
transport: transport.boxed(),
|
||||||
behaviour,
|
behaviour,
|
||||||
network_config: Default::default(),
|
network_config: Default::default(),
|
||||||
}
|
}
|
||||||
@ -1220,10 +1215,7 @@ mod tests {
|
|||||||
let transport = transport::MemoryTransport::default()
|
let transport = transport::MemoryTransport::default()
|
||||||
.upgrade(upgrade::Version::V1)
|
.upgrade(upgrade::Version::V1)
|
||||||
.authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated())
|
.authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated())
|
||||||
.multiplex(libp2p_mplex::MplexConfig::new())
|
.multiplex(libp2p_mplex::MplexConfig::new());
|
||||||
.map(|(p, m), _| (p, StreamMuxerBox::new(m)))
|
|
||||||
.map_err(|e| -> io::Error { panic!("Failed to create transport: {:?}", e); })
|
|
||||||
.boxed();
|
|
||||||
let behaviour = CallTraceBehaviour::new(MockBehaviour::new(handler_proto));
|
let behaviour = CallTraceBehaviour::new(MockBehaviour::new(handler_proto));
|
||||||
SwarmBuilder::new(transport, behaviour, pubkey.into()).build()
|
SwarmBuilder::new(transport, behaviour, pubkey.into()).build()
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user