Simplify trait bounds on NetworkBehaviour (#1405)

* Simplify trait bounds requirements

* More work

* Moar

* Finish

* Fix final tests

* More simplification

* Use separate traits for Inbound/Outbound

* Update gossipsub and remove warnings

* Add documentation to swarm

* Remove BoxSubstream

* Fix tests not compiling

* Fix stack overflow

* Address concerns

* For some reason my IDE ignored libp2p-kad
This commit is contained in:
Pierre Krieger 2020-02-07 16:29:30 +01:00 committed by GitHub
parent 69852a580b
commit 1eff4b9823
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 580 additions and 652 deletions

View File

@ -24,8 +24,9 @@ use futures::prelude::*;
use libp2p_core::identity;
use libp2p_core::multiaddr::multiaddr;
use libp2p_core::nodes::network::{Network, NetworkEvent, NetworkReachError, PeerState, UnknownPeerDialErr, IncomingError};
use libp2p_core::{PeerId, Transport, upgrade};
use libp2p_core::{muxing::StreamMuxerBox, PeerId, Transport, upgrade};
use libp2p_swarm::{
NegotiatedSubstream,
ProtocolsHandler,
KeepAlive,
SubstreamProtocol,
@ -37,22 +38,13 @@ use rand::seq::SliceRandom;
use std::{io, task::Context, task::Poll};
// TODO: replace with DummyProtocolsHandler after https://github.com/servo/rust-smallvec/issues/139 ?
struct TestHandler<TSubstream>(std::marker::PhantomData<TSubstream>);
#[derive(Default)]
struct TestHandler;
impl<TSubstream> Default for TestHandler<TSubstream> {
fn default() -> Self {
TestHandler(std::marker::PhantomData)
}
}
impl<TSubstream> ProtocolsHandler for TestHandler<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static
{
impl ProtocolsHandler for TestHandler {
type InEvent = (); // TODO: cannot be Void (https://github.com/servo/rust-smallvec/issues/139)
type OutEvent = (); // TODO: cannot be Void (https://github.com/servo/rust-smallvec/issues/139)
type Error = io::Error;
type Substream = TSubstream;
type InboundProtocol = upgrade::DeniedUpgrade;
type OutboundProtocol = upgrade::DeniedUpgrade;
type OutboundOpenInfo = (); // TODO: cannot be Void (https://github.com/servo/rust-smallvec/issues/139)
@ -63,12 +55,12 @@ where
fn inject_fully_negotiated_inbound(
&mut self,
_: <Self::InboundProtocol as upgrade::InboundUpgrade<Self::Substream>>::Output
_: <Self::InboundProtocol as upgrade::InboundUpgrade<NegotiatedSubstream>>::Output
) { panic!() }
fn inject_fully_negotiated_outbound(
&mut self,
_: <Self::OutboundProtocol as upgrade::OutboundUpgrade<Self::Substream>>::Output,
_: <Self::OutboundProtocol as upgrade::OutboundUpgrade<NegotiatedSubstream>>::Output,
_: Self::OutboundOpenInfo
) { panic!() }
@ -76,7 +68,7 @@ where
panic!()
}
fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, _: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as upgrade::OutboundUpgrade<Self::Substream>>::Error>) {
fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, _: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as upgrade::OutboundUpgrade<NegotiatedSubstream>>::Error>) {
}
@ -91,13 +83,14 @@ where
fn deny_incoming_connec() {
// Checks whether refusing an incoming connection on a swarm triggers the correct events.
let mut swarm1: Network<_, _, _, NodeHandlerWrapperBuilder<TestHandler<_>>, _, _> = {
let mut swarm1: Network<_, _, _, NodeHandlerWrapperBuilder<TestHandler>, _, _> = {
let local_key = identity::Keypair::generate_ed25519();
let local_public_key = local_key.public();
let transport = libp2p_tcp::TcpConfig::new()
.upgrade(upgrade::Version::V1)
.authenticate(libp2p_secio::SecioConfig::new(local_key))
.multiplex(libp2p_mplex::MplexConfig::new());
.multiplex(libp2p_mplex::MplexConfig::new())
.map(|(conn_info, muxer), _| (conn_info, StreamMuxerBox::new(muxer)));
Network::new(transport, local_public_key.into(), None)
};
@ -107,7 +100,8 @@ fn deny_incoming_connec() {
let transport = libp2p_tcp::TcpConfig::new()
.upgrade(upgrade::Version::V1)
.authenticate(libp2p_secio::SecioConfig::new(local_key))
.multiplex(libp2p_mplex::MplexConfig::new());
.multiplex(libp2p_mplex::MplexConfig::new())
.map(|(conn_info, muxer), _| (conn_info, StreamMuxerBox::new(muxer)));
Network::new(transport, local_public_key.into(), None)
};
@ -176,7 +170,8 @@ fn dial_self() {
// Gracefully close the connection to allow protocol
// negotiation to complete.
util::CloseMuxer::new(mplex).map_ok(move |mplex| (peer, mplex))
});
})
.map(|(conn_info, muxer), _| (conn_info, StreamMuxerBox::new(muxer)));
Network::new(transport, local_public_key.into(), None)
};
@ -241,13 +236,14 @@ fn dial_self_by_id() {
// Trying to dial self by passing the same `PeerId` shouldn't even be possible in the first
// place.
let mut swarm: Network<_, _, _, NodeHandlerWrapperBuilder<TestHandler<_>>, _, _> = {
let mut swarm: Network<_, _, _, NodeHandlerWrapperBuilder<TestHandler>, _, _> = {
let local_key = identity::Keypair::generate_ed25519();
let local_public_key = local_key.public();
let transport = libp2p_tcp::TcpConfig::new()
.upgrade(upgrade::Version::V1)
.authenticate(libp2p_secio::SecioConfig::new(local_key))
.multiplex(libp2p_mplex::MplexConfig::new());
.multiplex(libp2p_mplex::MplexConfig::new())
.map(|(conn_info, muxer), _| (conn_info, StreamMuxerBox::new(muxer)));
Network::new(transport, local_public_key.into(), None)
};
@ -265,7 +261,8 @@ fn multiple_addresses_err() {
let transport = libp2p_tcp::TcpConfig::new()
.upgrade(upgrade::Version::V1)
.authenticate(libp2p_secio::SecioConfig::new(local_key))
.multiplex(libp2p_mplex::MplexConfig::new());
.multiplex(libp2p_mplex::MplexConfig::new())
.map(|(conn_info, muxer), _| (conn_info, StreamMuxerBox::new(muxer)));
Network::new(transport, local_public_key.into(), None)
};

View File

@ -19,10 +19,11 @@
// DEALINGS IN THE SOFTWARE.
use futures::prelude::*;
use libp2p_core::{identity, upgrade, Transport};
use libp2p_core::{identity, muxing::StreamMuxerBox, upgrade, Transport};
use libp2p_core::nodes::{Network, NetworkEvent, Peer};
use libp2p_core::nodes::network::IncomingError;
use libp2p_swarm::{
NegotiatedSubstream,
ProtocolsHandler,
KeepAlive,
SubstreamProtocol,
@ -32,22 +33,13 @@ use libp2p_swarm::{
use std::{io, task::Context, task::Poll, time::Duration};
use wasm_timer::Delay;
struct TestHandler<TSubstream>(std::marker::PhantomData<TSubstream>);
#[derive(Default)]
struct TestHandler;
impl<TSubstream> Default for TestHandler<TSubstream> {
fn default() -> Self {
TestHandler(std::marker::PhantomData)
}
}
impl<TSubstream> ProtocolsHandler for TestHandler<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static
{
impl ProtocolsHandler for TestHandler {
type InEvent = (); // TODO: cannot be Void (https://github.com/servo/rust-smallvec/issues/139)
type OutEvent = (); // TODO: cannot be Void (https://github.com/servo/rust-smallvec/issues/139)
type Error = io::Error;
type Substream = TSubstream;
type InboundProtocol = upgrade::DeniedUpgrade;
type OutboundProtocol = upgrade::DeniedUpgrade;
type OutboundOpenInfo = (); // TODO: cannot be Void (https://github.com/servo/rust-smallvec/issues/139)
@ -58,12 +50,12 @@ where
fn inject_fully_negotiated_inbound(
&mut self,
_: <Self::InboundProtocol as upgrade::InboundUpgrade<Self::Substream>>::Output
_: <Self::InboundProtocol as upgrade::InboundUpgrade<NegotiatedSubstream>>::Output
) { panic!() }
fn inject_fully_negotiated_outbound(
&mut self,
_: <Self::OutboundProtocol as upgrade::OutboundUpgrade<Self::Substream>>::Output,
_: <Self::OutboundProtocol as upgrade::OutboundUpgrade<NegotiatedSubstream>>::Output,
_: Self::OutboundOpenInfo
) { panic!() }
@ -71,7 +63,7 @@ where
panic!()
}
fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, _: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as upgrade::OutboundUpgrade<Self::Substream>>::Error>) {
fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, _: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as upgrade::OutboundUpgrade<NegotiatedSubstream>>::Error>) {
}
@ -109,7 +101,8 @@ fn raw_swarm_simultaneous_connect() {
let transport = libp2p_tcp::TcpConfig::new()
.upgrade(upgrade::Version::V1Lazy)
.authenticate(libp2p_secio::SecioConfig::new(local_key))
.multiplex(libp2p_mplex::MplexConfig::new());
.multiplex(libp2p_mplex::MplexConfig::new())
.map(|(conn_info, muxer), _| (conn_info, StreamMuxerBox::new(muxer)));
Network::new(transport, local_public_key.into_peer_id(), None)
};
@ -119,7 +112,8 @@ fn raw_swarm_simultaneous_connect() {
let transport = libp2p_tcp::TcpConfig::new()
.upgrade(upgrade::Version::V1Lazy)
.authenticate(libp2p_secio::SecioConfig::new(local_key))
.multiplex(libp2p_mplex::MplexConfig::new());
.multiplex(libp2p_mplex::MplexConfig::new())
.map(|(conn_info, muxer), _| (conn_info, StreamMuxerBox::new(muxer)));
Network::new(transport, local_public_key.into_peer_id(), None)
};

View File

@ -82,9 +82,9 @@ fn main() -> Result<(), Box<dyn Error>> {
// Use the derive to generate delegating NetworkBehaviour impl and require the
// NetworkBehaviourEventProcess implementations below.
#[derive(NetworkBehaviour)]
struct MyBehaviour<TSubstream: AsyncRead + AsyncWrite> {
floodsub: Floodsub<TSubstream>,
mdns: Mdns<TSubstream>,
struct MyBehaviour {
floodsub: Floodsub,
mdns: Mdns,
// Struct fields which do not implement NetworkBehaviour need to be ignored
#[behaviour(ignore)]
@ -92,7 +92,7 @@ fn main() -> Result<(), Box<dyn Error>> {
ignored_member: bool,
}
impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<FloodsubEvent> for MyBehaviour<TSubstream> {
impl NetworkBehaviourEventProcess<FloodsubEvent> for MyBehaviour {
// Called when `floodsub` produces an event.
fn inject_event(&mut self, message: FloodsubEvent) {
if let FloodsubEvent::Message(message) = message {
@ -101,7 +101,7 @@ fn main() -> Result<(), Box<dyn Error>> {
}
}
impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<MdnsEvent> for MyBehaviour<TSubstream> {
impl NetworkBehaviourEventProcess<MdnsEvent> for MyBehaviour {
// Called when `mdns` produces an event.
fn inject_event(&mut self, event: MdnsEvent) {
match event {

View File

@ -56,15 +56,12 @@ fn main() -> Result<(), Box<dyn Error>> {
// We create a custom network behaviour that combines Kademlia and mDNS.
#[derive(NetworkBehaviour)]
struct MyBehaviour<TSubstream: AsyncRead + AsyncWrite> {
kademlia: Kademlia<TSubstream, MemoryStore>,
mdns: Mdns<TSubstream>
struct MyBehaviour {
kademlia: Kademlia<MemoryStore>,
mdns: Mdns
}
impl<T> NetworkBehaviourEventProcess<MdnsEvent> for MyBehaviour<T>
where
T: AsyncRead + AsyncWrite
{
impl NetworkBehaviourEventProcess<MdnsEvent> for MyBehaviour {
// Called when `mdns` produces an event.
fn inject_event(&mut self, event: MdnsEvent) {
if let MdnsEvent::Discovered(list) = event {
@ -75,10 +72,7 @@ fn main() -> Result<(), Box<dyn Error>> {
}
}
impl<T> NetworkBehaviourEventProcess<KademliaEvent> for MyBehaviour<T>
where
T: AsyncRead + AsyncWrite
{
impl NetworkBehaviourEventProcess<KademliaEvent> for MyBehaviour {
// Called when `kademlia` produces an event.
fn inject_event(&mut self, message: KademliaEvent) {
match message {
@ -153,10 +147,7 @@ fn main() -> Result<(), Box<dyn Error>> {
}))
}
fn handle_input_line<T>(kademlia: &mut Kademlia<T, MemoryStore>, line: String)
where
T: AsyncRead + AsyncWrite
{
fn handle_input_line(kademlia: &mut Kademlia<MemoryStore>, line: String) {
let mut args = line.split(" ");
match args.next() {

View File

@ -59,17 +59,6 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
let connected_point = quote!{::libp2p::core::ConnectedPoint};
let listener_id = quote!{::libp2p::core::nodes::ListenerId};
// Name of the type parameter that represents the substream.
let substream_generic = {
let mut n = "TSubstream".to_string();
// Avoid collisions.
while ast.generics.type_params().any(|tp| tp.ident == n) {
n.push('1');
}
let n = Ident::new(&n, name.span());
quote!{#n}
};
let poll_parameters = quote!{::libp2p::swarm::PollParameters};
// Build the generics.
@ -77,30 +66,22 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
let tp = ast.generics.type_params();
let lf = ast.generics.lifetimes();
let cst = ast.generics.const_params();
quote!{<#(#lf,)* #(#tp,)* #(#cst,)* #substream_generic>}
quote!{<#(#lf,)* #(#tp,)* #(#cst,)*>}
};
// Build the `where ...` clause of the trait implementation.
let where_clause = {
let mut additional = data_struct.fields.iter()
let additional = data_struct.fields.iter()
.filter(|x| !is_ignored(x))
.flat_map(|field| {
let ty = &field.ty;
vec![
quote!{#ty: #trait_to_impl},
quote!{Self: #net_behv_event_proc<<#ty as #trait_to_impl>::OutEvent>},
quote!{<<#ty as #trait_to_impl>::ProtocolsHandler as #into_protocols_handler>::Handler: #protocols_handler<Substream = #substream_generic>},
// Note: this bound is required because of https://github.com/rust-lang/rust/issues/55697
quote!{<<<#ty as #trait_to_impl>::ProtocolsHandler as #into_protocols_handler>::Handler as #protocols_handler>::InboundProtocol: ::libp2p::core::InboundUpgrade<::libp2p::core::Negotiated<#substream_generic>>},
quote!{<<<#ty as #trait_to_impl>::ProtocolsHandler as #into_protocols_handler>::Handler as #protocols_handler>::OutboundProtocol: ::libp2p::core::OutboundUpgrade<::libp2p::core::Negotiated<#substream_generic>>},
]
})
.collect::<Vec<_>>();
additional.push(quote!{#substream_generic: ::libp2p::futures::io::AsyncRead});
additional.push(quote!{#substream_generic: ::libp2p::futures::io::AsyncWrite});
additional.push(quote!{#substream_generic: Unpin});
if let Some(where_clause) = where_clause {
if where_clause.predicates.trailing_punct() {
Some(quote!{#where_clause #(#additional),*})

View File

@ -36,18 +36,18 @@ fn empty() {
fn one_field() {
#[allow(dead_code)]
#[derive(NetworkBehaviour)]
struct Foo<TSubstream> {
ping: libp2p::ping::Ping<TSubstream>,
struct Foo {
ping: libp2p::ping::Ping,
}
impl<TSubstream> libp2p::swarm::NetworkBehaviourEventProcess<libp2p::ping::PingEvent> for Foo<TSubstream> {
impl libp2p::swarm::NetworkBehaviourEventProcess<libp2p::ping::PingEvent> for Foo {
fn inject_event(&mut self, _: libp2p::ping::PingEvent) {
}
}
#[allow(dead_code)]
fn foo<TSubstream: libp2p::futures::AsyncRead + libp2p::futures::AsyncWrite + Send + Unpin + 'static>() {
require_net_behaviour::<Foo<TSubstream>>();
fn foo() {
require_net_behaviour::<Foo>();
}
}
@ -55,24 +55,24 @@ fn one_field() {
fn two_fields() {
#[allow(dead_code)]
#[derive(NetworkBehaviour)]
struct Foo<TSubstream> {
ping: libp2p::ping::Ping<TSubstream>,
identify: libp2p::identify::Identify<TSubstream>,
struct Foo {
ping: libp2p::ping::Ping,
identify: libp2p::identify::Identify,
}
impl<TSubstream> libp2p::swarm::NetworkBehaviourEventProcess<libp2p::identify::IdentifyEvent> for Foo<TSubstream> {
impl libp2p::swarm::NetworkBehaviourEventProcess<libp2p::identify::IdentifyEvent> for Foo {
fn inject_event(&mut self, _: libp2p::identify::IdentifyEvent) {
}
}
impl<TSubstream> libp2p::swarm::NetworkBehaviourEventProcess<libp2p::ping::PingEvent> for Foo<TSubstream> {
impl libp2p::swarm::NetworkBehaviourEventProcess<libp2p::ping::PingEvent> for Foo {
fn inject_event(&mut self, _: libp2p::ping::PingEvent) {
}
}
#[allow(dead_code)]
fn foo<TSubstream: libp2p::futures::AsyncRead + libp2p::futures::AsyncWrite + Send + Unpin + 'static>() {
require_net_behaviour::<Foo<TSubstream>>();
fn foo() {
require_net_behaviour::<Foo>();
}
}
@ -80,32 +80,32 @@ fn two_fields() {
fn three_fields() {
#[allow(dead_code)]
#[derive(NetworkBehaviour)]
struct Foo<TSubstream> {
ping: libp2p::ping::Ping<TSubstream>,
identify: libp2p::identify::Identify<TSubstream>,
kad: libp2p::kad::Kademlia<TSubstream, libp2p::kad::record::store::MemoryStore>,
struct Foo {
ping: libp2p::ping::Ping,
identify: libp2p::identify::Identify,
kad: libp2p::kad::Kademlia<libp2p::kad::record::store::MemoryStore>,
#[behaviour(ignore)]
foo: String,
}
impl<TSubstream> libp2p::swarm::NetworkBehaviourEventProcess<libp2p::ping::PingEvent> for Foo<TSubstream> {
impl libp2p::swarm::NetworkBehaviourEventProcess<libp2p::ping::PingEvent> for Foo {
fn inject_event(&mut self, _: libp2p::ping::PingEvent) {
}
}
impl<TSubstream> libp2p::swarm::NetworkBehaviourEventProcess<libp2p::identify::IdentifyEvent> for Foo<TSubstream> {
impl libp2p::swarm::NetworkBehaviourEventProcess<libp2p::identify::IdentifyEvent> for Foo {
fn inject_event(&mut self, _: libp2p::identify::IdentifyEvent) {
}
}
impl<TSubstream> libp2p::swarm::NetworkBehaviourEventProcess<libp2p::kad::KademliaEvent> for Foo<TSubstream> {
impl libp2p::swarm::NetworkBehaviourEventProcess<libp2p::kad::KademliaEvent> for Foo {
fn inject_event(&mut self, _: libp2p::kad::KademliaEvent) {
}
}
#[allow(dead_code)]
fn foo<TSubstream: libp2p::futures::AsyncRead + libp2p::futures::AsyncWrite + Send + Unpin + 'static>() {
require_net_behaviour::<Foo<TSubstream>>();
fn foo() {
require_net_behaviour::<Foo>();
}
}
@ -114,28 +114,28 @@ fn custom_polling() {
#[allow(dead_code)]
#[derive(NetworkBehaviour)]
#[behaviour(poll_method = "foo")]
struct Foo<TSubstream> {
ping: libp2p::ping::Ping<TSubstream>,
identify: libp2p::identify::Identify<TSubstream>,
struct Foo {
ping: libp2p::ping::Ping,
identify: libp2p::identify::Identify,
}
impl<TSubstream> libp2p::swarm::NetworkBehaviourEventProcess<libp2p::ping::PingEvent> for Foo<TSubstream> {
impl libp2p::swarm::NetworkBehaviourEventProcess<libp2p::ping::PingEvent> for Foo {
fn inject_event(&mut self, _: libp2p::ping::PingEvent) {
}
}
impl<TSubstream> libp2p::swarm::NetworkBehaviourEventProcess<libp2p::identify::IdentifyEvent> for Foo<TSubstream> {
impl libp2p::swarm::NetworkBehaviourEventProcess<libp2p::identify::IdentifyEvent> for Foo {
fn inject_event(&mut self, _: libp2p::identify::IdentifyEvent) {
}
}
impl<TSubstream> Foo<TSubstream> {
impl Foo {
fn foo<T>(&mut self, _: &mut std::task::Context) -> std::task::Poll<libp2p::swarm::NetworkBehaviourAction<T, ()>> { std::task::Poll::Pending }
}
#[allow(dead_code)]
fn foo<TSubstream: libp2p::futures::AsyncRead + libp2p::futures::AsyncWrite + Send + Unpin + 'static>() {
require_net_behaviour::<Foo<TSubstream>>();
fn foo() {
require_net_behaviour::<Foo>();
}
}
@ -144,24 +144,24 @@ fn custom_event_no_polling() {
#[allow(dead_code)]
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "Vec<String>")]
struct Foo<TSubstream> {
ping: libp2p::ping::Ping<TSubstream>,
identify: libp2p::identify::Identify<TSubstream>,
struct Foo {
ping: libp2p::ping::Ping,
identify: libp2p::identify::Identify,
}
impl<TSubstream> libp2p::swarm::NetworkBehaviourEventProcess<libp2p::ping::PingEvent> for Foo<TSubstream> {
impl libp2p::swarm::NetworkBehaviourEventProcess<libp2p::ping::PingEvent> for Foo {
fn inject_event(&mut self, _: libp2p::ping::PingEvent) {
}
}
impl<TSubstream> libp2p::swarm::NetworkBehaviourEventProcess<libp2p::identify::IdentifyEvent> for Foo<TSubstream> {
impl libp2p::swarm::NetworkBehaviourEventProcess<libp2p::identify::IdentifyEvent> for Foo {
fn inject_event(&mut self, _: libp2p::identify::IdentifyEvent) {
}
}
#[allow(dead_code)]
fn foo<TSubstream: libp2p::futures::AsyncRead + libp2p::futures::AsyncWrite + Send + Unpin + 'static>() {
require_net_behaviour::<Foo<TSubstream>>();
fn foo() {
require_net_behaviour::<Foo>();
}
}
@ -170,28 +170,28 @@ fn custom_event_and_polling() {
#[allow(dead_code)]
#[derive(NetworkBehaviour)]
#[behaviour(poll_method = "foo", out_event = "String")]
struct Foo<TSubstream> {
ping: libp2p::ping::Ping<TSubstream>,
identify: libp2p::identify::Identify<TSubstream>,
struct Foo {
ping: libp2p::ping::Ping,
identify: libp2p::identify::Identify,
}
impl<TSubstream> libp2p::swarm::NetworkBehaviourEventProcess<libp2p::ping::PingEvent> for Foo<TSubstream> {
impl libp2p::swarm::NetworkBehaviourEventProcess<libp2p::ping::PingEvent> for Foo {
fn inject_event(&mut self, _: libp2p::ping::PingEvent) {
}
}
impl<TSubstream> libp2p::swarm::NetworkBehaviourEventProcess<libp2p::identify::IdentifyEvent> for Foo<TSubstream> {
impl libp2p::swarm::NetworkBehaviourEventProcess<libp2p::identify::IdentifyEvent> for Foo {
fn inject_event(&mut self, _: libp2p::identify::IdentifyEvent) {
}
}
impl<TSubstream> Foo<TSubstream> {
impl Foo {
fn foo<T>(&mut self, _: &mut std::task::Context) -> std::task::Poll<libp2p::swarm::NetworkBehaviourAction<T, String>> { std::task::Poll::Pending }
}
#[allow(dead_code)]
fn foo<TSubstream: libp2p::futures::AsyncRead + libp2p::futures::AsyncWrite + Send + Unpin + 'static>() {
require_net_behaviour::<Foo<TSubstream>>();
fn foo() {
require_net_behaviour::<Foo>();
}
}
@ -199,25 +199,8 @@ fn custom_event_and_polling() {
fn where_clause() {
#[allow(dead_code)]
#[derive(NetworkBehaviour)]
struct Foo<TSubstream> where TSubstream: std::fmt::Debug {
ping: libp2p::ping::Ping<TSubstream>,
}
#[allow(dead_code)]
#[derive(NetworkBehaviour)]
struct Bar<TSubstream: std::fmt::Debug> {
ping: libp2p::ping::Ping<TSubstream>,
}
#[allow(dead_code)]
#[derive(NetworkBehaviour)]
struct Baz<TSubstream> where TSubstream: std::fmt::Debug + Clone, {
ping: libp2p::ping::Ping<TSubstream>,
}
#[allow(dead_code)]
#[derive(NetworkBehaviour)]
struct Qux<TSubstream: std::fmt::Debug> where TSubstream: Clone {
ping: libp2p::ping::Ping<TSubstream>,
struct Foo<T: Copy> {
ping: libp2p::ping::Ping,
bar: T,
}
}

View File

@ -30,14 +30,14 @@ use libp2p_swarm::{
};
use log::warn;
use smallvec::SmallVec;
use std::{cmp, fmt, io, iter, marker::PhantomData, mem, pin::Pin, time::Duration, task::Context, task::Poll};
use std::{cmp, fmt, io, iter, mem, pin::Pin, time::Duration, task::Context, task::Poll};
use wasm_timer::{Delay, Instant};
const MDNS_RESPONSE_TTL: std::time::Duration = Duration::from_secs(5 * 60);
/// A `NetworkBehaviour` for mDNS. Automatically discovers peers on the local network and adds
/// them to the topology.
pub struct Mdns<TSubstream> {
pub struct Mdns {
/// The inner service.
service: MaybeBusyMdnsService,
@ -51,9 +51,6 @@ pub struct Mdns<TSubstream> {
///
/// `None` if `discovered_nodes` is empty.
closest_expiration: Option<Delay>,
/// Marker to pin the generic.
marker: PhantomData<TSubstream>,
}
/// `MdnsService::next` takes ownership of `self`, returning a future that resolves with both itself
@ -86,14 +83,13 @@ impl fmt::Debug for MaybeBusyMdnsService {
}
}
impl<TSubstream> Mdns<TSubstream> {
impl Mdns {
/// Builds a new `Mdns` behaviour.
pub fn new() -> io::Result<Mdns<TSubstream>> {
pub fn new() -> io::Result<Mdns> {
Ok(Mdns {
service: MaybeBusyMdnsService::Free(MdnsService::new()?),
discovered_nodes: SmallVec::new(),
closest_expiration: None,
marker: PhantomData,
})
}
@ -174,11 +170,8 @@ impl fmt::Debug for ExpiredAddrsIter {
}
}
impl<TSubstream> NetworkBehaviour for Mdns<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite + Unpin,
{
type ProtocolsHandler = DummyProtocolsHandler<TSubstream>;
impl NetworkBehaviour for Mdns {
type ProtocolsHandler = DummyProtocolsHandler;
type OutEvent = MdnsEvent;
fn new_handler(&mut self) -> Self::ProtocolsHandler {
@ -344,7 +337,7 @@ where
}
}
impl<TSubstream> fmt::Debug for Mdns<TSubstream> {
impl fmt::Debug for Mdns {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("Mdns")
.field("service", &self.service)

View File

@ -22,7 +22,6 @@ use crate::protocol::{FloodsubConfig, FloodsubMessage, FloodsubRpc, FloodsubSubs
use crate::topic::Topic;
use cuckoofilter::CuckooFilter;
use fnv::FnvHashSet;
use futures::prelude::*;
use libp2p_core::{ConnectedPoint, Multiaddr, PeerId};
use libp2p_swarm::{
NetworkBehaviour,
@ -33,12 +32,12 @@ use libp2p_swarm::{
};
use rand;
use smallvec::SmallVec;
use std::{collections::VecDeque, iter, marker::PhantomData};
use std::{collections::VecDeque, iter};
use std::collections::hash_map::{DefaultHasher, HashMap};
use std::task::{Context, Poll};
/// Network behaviour that handles the floodsub protocol.
pub struct Floodsub<TSubstream> {
pub struct Floodsub {
/// Events that need to be yielded to the outside when polling.
events: VecDeque<NetworkBehaviourAction<FloodsubRpc, FloodsubEvent>>,
@ -60,12 +59,9 @@ pub struct Floodsub<TSubstream> {
// We keep track of the messages we received (in the format `hash(source ID, seq_no)`) so that
// we don't dispatch the same message twice if we receive it twice on the network.
received: CuckooFilter<DefaultHasher>,
/// Marker to pin the generics.
marker: PhantomData<TSubstream>,
}
impl<TSubstream> Floodsub<TSubstream> {
impl Floodsub {
/// Creates a `Floodsub`.
pub fn new(local_peer_id: PeerId) -> Self {
Floodsub {
@ -75,7 +71,6 @@ impl<TSubstream> Floodsub<TSubstream> {
connected_peers: HashMap::new(),
subscribed_topics: SmallVec::new(),
received: CuckooFilter::new(),
marker: PhantomData,
}
}
@ -108,9 +103,7 @@ impl<TSubstream> Floodsub<TSubstream> {
pub fn remove_node_from_partial_view(&mut self, peer_id: &PeerId) {
self.target_peers.remove(&peer_id);
}
}
impl<TSubstream> Floodsub<TSubstream> {
/// Subscribes to a topic.
///
/// Returns true if the subscription worked. Returns false if we were already subscribed.
@ -226,11 +219,8 @@ impl<TSubstream> Floodsub<TSubstream> {
}
}
impl<TSubstream> NetworkBehaviour for Floodsub<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{
type ProtocolsHandler = OneShotHandler<TSubstream, FloodsubConfig, FloodsubRpc, InnerMessage>;
impl NetworkBehaviour for Floodsub {
type ProtocolsHandler = OneShotHandler<FloodsubConfig, FloodsubRpc, InnerMessage>;
type OutEvent = FloodsubEvent;
fn new_handler(&mut self) -> Self::ProtocolsHandler {

View File

@ -19,6 +19,5 @@
// DEALINGS IN THE SOFTWARE.
fn main() {
prost_build::compile_protos(&["src/rpc.proto"], &["src"]).unwrap();
prost_build::compile_protos(&["src/rpc.proto"], &["src"]).unwrap();
}

View File

@ -38,7 +38,6 @@ use std::{
collections::HashSet,
collections::VecDeque,
iter,
marker::PhantomData,
sync::Arc,
task::{Context, Poll},
};
@ -47,7 +46,7 @@ use wasm_timer::{Instant, Interval};
mod tests;
/// Network behaviour that handles the gossipsub protocol.
pub struct Gossipsub<TSubstream> {
pub struct Gossipsub {
/// Configuration providing gossipsub performance parameters.
config: GossipsubConfig,
@ -84,12 +83,9 @@ pub struct Gossipsub<TSubstream> {
/// Heartbeat interval stream.
heartbeat: Interval,
/// Marker to pin the generics.
marker: PhantomData<TSubstream>,
}
impl<TSubstream> Gossipsub<TSubstream> {
impl Gossipsub {
/// Creates a `Gossipsub` struct given a set of parameters specified by `gs_config`.
pub fn new(local_peer_id: PeerId, gs_config: GossipsubConfig) -> Self {
let local_peer_id = if gs_config.no_source_id {
@ -118,7 +114,6 @@ impl<TSubstream> Gossipsub<TSubstream> {
Instant::now() + gs_config.heartbeat_initial_delay,
gs_config.heartbeat_interval,
),
marker: PhantomData,
}
}
@ -987,11 +982,8 @@ impl<TSubstream> Gossipsub<TSubstream> {
}
}
impl<TSubstream> NetworkBehaviour for Gossipsub<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
type ProtocolsHandler = GossipsubHandler<TSubstream>;
impl NetworkBehaviour for Gossipsub {
type ProtocolsHandler = GossipsubHandler;
type OutEvent = GossipsubEvent;
fn new_handler(&mut self) -> Self::ProtocolsHandler {

View File

@ -18,13 +18,11 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
// collection of tests for the gossipsub network behaviour
#[cfg(test)]
mod tests {
use super::super::*;
use async_std::net::TcpStream;
// helper functions for testing
@ -34,15 +32,11 @@ mod tests {
peer_no: usize,
topics: Vec<String>,
to_subscribe: bool,
) -> (
Gossipsub<TcpStream>,
Vec<PeerId>,
Vec<TopicHash>,
) {
) -> (Gossipsub, Vec<PeerId>, Vec<TopicHash>) {
// generate a default GossipsubConfig
let gs_config = GossipsubConfig::default();
// create a gossipsub struct
let mut gs: Gossipsub<TcpStream> = Gossipsub::new(PeerId::random(), gs_config);
let mut gs: Gossipsub = Gossipsub::new(PeerId::random(), gs_config);
let mut topic_hashes = vec![];
@ -62,7 +56,7 @@ mod tests {
for _ in 0..peer_no {
let peer = PeerId::random();
peers.push(peer.clone());
<Gossipsub<TcpStream> as NetworkBehaviour>::inject_connected(
<Gossipsub as NetworkBehaviour>::inject_connected(
&mut gs,
peer.clone(),
dummy_connected_point.clone(),
@ -572,7 +566,7 @@ mod tests {
// generate a default GossipsubConfig
let gs_config = GossipsubConfig::default();
// create a gossipsub struct
let mut gs: Gossipsub<usize> = Gossipsub::new(PeerId::random(), gs_config);
let mut gs: Gossipsub = Gossipsub::new(PeerId::random(), gs_config);
// create a topic and fill it with some peers
let topic_hash = Topic::new("Test".into()).no_hash().clone();
@ -584,25 +578,25 @@ mod tests {
gs.topic_peers.insert(topic_hash.clone(), peers.clone());
let random_peers =
Gossipsub::<usize>::get_random_peers(&gs.topic_peers, &topic_hash, 5, { |_| true });
Gossipsub::get_random_peers(&gs.topic_peers, &topic_hash, 5, { |_| true });
assert!(random_peers.len() == 5, "Expected 5 peers to be returned");
let random_peers =
Gossipsub::<usize>::get_random_peers(&gs.topic_peers, &topic_hash, 30, { |_| true });
Gossipsub::get_random_peers(&gs.topic_peers, &topic_hash, 30, { |_| true });
assert!(random_peers.len() == 20, "Expected 20 peers to be returned");
assert!(random_peers == peers, "Expected no shuffling");
let random_peers =
Gossipsub::<usize>::get_random_peers(&gs.topic_peers, &topic_hash, 20, { |_| true });
Gossipsub::get_random_peers(&gs.topic_peers, &topic_hash, 20, { |_| true });
assert!(random_peers.len() == 20, "Expected 20 peers to be returned");
assert!(random_peers == peers, "Expected no shuffling");
let random_peers =
Gossipsub::<usize>::get_random_peers(&gs.topic_peers, &topic_hash, 0, { |_| true });
Gossipsub::get_random_peers(&gs.topic_peers, &topic_hash, 0, { |_| true });
assert!(random_peers.len() == 0, "Expected 0 peers to be returned");
// test the filter
let random_peers =
Gossipsub::<usize>::get_random_peers(&gs.topic_peers, &topic_hash, 5, { |_| false });
Gossipsub::get_random_peers(&gs.topic_peers, &topic_hash, 5, { |_| false });
assert!(random_peers.len() == 0, "Expected 0 peers to be returned");
let random_peers =
Gossipsub::<usize>::get_random_peers(&gs.topic_peers, &topic_hash, 10, {
Gossipsub::get_random_peers(&gs.topic_peers, &topic_hash, 10, {
|peer| peers.contains(peer)
});
assert!(random_peers.len() == 10, "Expected 10 peers to be returned");

View File

@ -22,10 +22,11 @@ use crate::behaviour::GossipsubRpc;
use crate::protocol::{GossipsubCodec, ProtocolConfig};
use futures::prelude::*;
use futures_codec::Framed;
use libp2p_core::upgrade::{InboundUpgrade, Negotiated, OutboundUpgrade};
use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade};
use libp2p_swarm::protocols_handler::{
KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol,
};
use libp2p_swarm::NegotiatedSubstream;
use log::{debug, trace, warn};
use smallvec::SmallVec;
use std::{
@ -36,18 +37,15 @@ use std::{
};
/// Protocol Handler that manages a single long-lived substream with a peer.
pub struct GossipsubHandler<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
{
pub struct GossipsubHandler {
/// Upgrade configuration for the gossipsub protocol.
listen_protocol: SubstreamProtocol<ProtocolConfig>,
/// The single long-lived outbound substream.
outbound_substream: Option<OutboundSubstreamState<TSubstream>>,
outbound_substream: Option<OutboundSubstreamState>,
/// The single long-lived inbound substream.
inbound_substream: Option<InboundSubstreamState<TSubstream>>,
inbound_substream: Option<InboundSubstreamState>,
/// Queue of values that we want to send to the remote.
send_queue: SmallVec<[GossipsubRpc; 16]>,
@ -57,39 +55,30 @@ where
}
/// State of the inbound substream, opened either by us or by the remote.
enum InboundSubstreamState<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
{
enum InboundSubstreamState {
/// Waiting for a message from the remote. The idle state for an inbound substream.
WaitingInput(Framed<Negotiated<TSubstream>, GossipsubCodec>),
WaitingInput(Framed<NegotiatedSubstream, GossipsubCodec>),
/// The substream is being closed.
Closing(Framed<Negotiated<TSubstream>, GossipsubCodec>),
Closing(Framed<NegotiatedSubstream, GossipsubCodec>),
/// An error occurred during processing.
Poisoned,
}
/// State of the outbound substream, opened either by us or by the remote.
enum OutboundSubstreamState<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
{
enum OutboundSubstreamState {
/// Waiting for the user to send a message. The idle state for an outbound substream.
WaitingOutput(Framed<Negotiated<TSubstream>, GossipsubCodec>),
WaitingOutput(Framed<NegotiatedSubstream, GossipsubCodec>),
/// Waiting to send a message to the remote.
PendingSend(Framed<Negotiated<TSubstream>, GossipsubCodec>, GossipsubRpc),
PendingSend(Framed<NegotiatedSubstream, GossipsubCodec>, GossipsubRpc),
/// Waiting to flush the substream so that the data arrives to the remote.
PendingFlush(Framed<Negotiated<TSubstream>, GossipsubCodec>),
PendingFlush(Framed<NegotiatedSubstream, GossipsubCodec>),
/// The substream is being closed. Used by either substream.
_Closing(Framed<Negotiated<TSubstream>, GossipsubCodec>),
_Closing(Framed<NegotiatedSubstream, GossipsubCodec>),
/// An error occurred during processing.
Poisoned,
}
impl<TSubstream> GossipsubHandler<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
{
impl GossipsubHandler {
/// Builds a new `GossipsubHandler`.
pub fn new(protocol_id: impl Into<Cow<'static, [u8]>>, max_transmit_size: usize) -> Self {
GossipsubHandler {
@ -105,10 +94,7 @@ where
}
}
impl<TSubstream> Default for GossipsubHandler<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
{
impl Default for GossipsubHandler {
fn default() -> Self {
GossipsubHandler {
listen_protocol: SubstreamProtocol::new(ProtocolConfig::default()),
@ -120,14 +106,10 @@ where
}
}
impl<TSubstream> ProtocolsHandler for GossipsubHandler<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
impl ProtocolsHandler for GossipsubHandler {
type InEvent = GossipsubRpc;
type OutEvent = GossipsubRpc;
type Error = io::Error;
type Substream = TSubstream;
type InboundProtocol = ProtocolConfig;
type OutboundProtocol = ProtocolConfig;
type OutboundOpenInfo = GossipsubRpc;
@ -138,7 +120,7 @@ where
fn inject_fully_negotiated_inbound(
&mut self,
substream: <Self::InboundProtocol as InboundUpgrade<Negotiated<TSubstream>>>::Output,
substream: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
) {
// new inbound substream. Replace the current one, if it exists.
trace!("New inbound substream request");
@ -147,7 +129,7 @@ where
fn inject_fully_negotiated_outbound(
&mut self,
substream: <Self::OutboundProtocol as OutboundUpgrade<Negotiated<TSubstream>>>::Output,
substream: <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output,
message: Self::OutboundOpenInfo,
) {
// Should never establish a new outbound substream if one already exists.
@ -169,7 +151,7 @@ where
&mut self,
_: Self::OutboundOpenInfo,
_: ProtocolsHandlerUpgrErr<
<Self::OutboundProtocol as OutboundUpgrade<Self::Substream>>::Error,
<Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Error,
>,
) {
// Ignore upgrade errors for now.

View File

@ -33,20 +33,16 @@ use libp2p_core::{
identity,
multiaddr::Protocol,
muxing::StreamMuxerBox,
nodes::Substream,
transport::{boxed::Boxed, MemoryTransport},
upgrade, Multiaddr, PeerId, Transport,
transport::MemoryTransport,
upgrade, Multiaddr, Transport,
};
use libp2p_gossipsub::{Gossipsub, GossipsubConfig, GossipsubEvent, Topic};
use libp2p_plaintext::PlainText2Config;
use libp2p_swarm::Swarm;
use libp2p_yamux as yamux;
type TestSwarm =
Swarm<Boxed<(PeerId, StreamMuxerBox), Error>, Gossipsub<Substream<StreamMuxerBox>>>;
struct Graph {
pub nodes: Vec<(Multiaddr, TestSwarm)>,
pub nodes: Vec<(Multiaddr, Swarm<Gossipsub>)>,
}
impl Future for Graph {
@ -77,7 +73,7 @@ impl Graph {
.cycle()
.take(num_nodes)
.map(|_| build_node())
.collect::<Vec<(Multiaddr, TestSwarm)>>();
.collect::<Vec<(Multiaddr, Swarm<Gossipsub>)>>();
let mut connected_nodes = vec![not_connected_nodes.pop().unwrap()];
@ -137,7 +133,7 @@ impl Graph {
}
}
fn build_node() -> (Multiaddr, TestSwarm) {
fn build_node() -> (Multiaddr, Swarm<Gossipsub>) {
let key = identity::Keypair::generate_ed25519();
let public_key = key.public();
@ -148,7 +144,7 @@ fn build_node() -> (Multiaddr, TestSwarm) {
})
.multiplex(yamux::Config::default())
.map(|(p, m), _| (p, StreamMuxerBox::new(m)))
.map_err(|e| panic!("Failed to create transport: {:?}", e))
.map_err(|e| -> Error { panic!("Failed to create transport: {:?}", e) })
.boxed();
let peer_id = public_key.clone().into_peer_id();

View File

@ -23,10 +23,10 @@ use futures::prelude::*;
use libp2p_core::upgrade::{
InboundUpgrade,
OutboundUpgrade,
ReadOneError,
Negotiated
ReadOneError
};
use libp2p_swarm::{
NegotiatedSubstream,
KeepAlive,
SubstreamProtocol,
ProtocolsHandler,
@ -34,7 +34,7 @@ use libp2p_swarm::{
ProtocolsHandlerUpgrErr
};
use smallvec::SmallVec;
use std::{marker::PhantomData, pin::Pin, task::Context, task::Poll, time::Duration};
use std::{pin::Pin, task::Context, task::Poll, time::Duration};
use wasm_timer::Delay;
/// Delay between the moment we connect and the first time we identify.
@ -49,35 +49,32 @@ const TRY_AGAIN_ON_ERR: Duration = Duration::from_secs(60 * 60);
/// Outbound requests are sent periodically. The handler performs expects
/// at least one identification request to be answered by the remote before
/// permitting the underlying connection to be closed.
pub struct IdentifyHandler<TSubstream> {
pub struct IdentifyHandler {
/// Configuration for the protocol.
config: IdentifyProtocolConfig,
/// Pending events to yield.
events: SmallVec<[IdentifyHandlerEvent<TSubstream>; 4]>,
events: SmallVec<[IdentifyHandlerEvent; 4]>,
/// Future that fires when we need to identify the node again.
next_id: Delay,
/// Whether the handler should keep the connection alive.
keep_alive: KeepAlive,
/// Marker for strong typing.
marker: PhantomData<TSubstream>,
}
/// Event produced by the `IdentifyHandler`.
#[derive(Debug)]
pub enum IdentifyHandlerEvent<TSubstream> {
pub enum IdentifyHandlerEvent {
/// We obtained identification information from the remote
Identified(RemoteInfo),
/// We received a request for identification.
Identify(ReplySubstream<Negotiated<TSubstream>>),
Identify(ReplySubstream<NegotiatedSubstream>),
/// Failed to identify the remote.
IdentificationError(ProtocolsHandlerUpgrErr<ReadOneError>),
}
impl<TSubstream> IdentifyHandler<TSubstream> {
impl IdentifyHandler {
/// Creates a new `IdentifyHandler`.
pub fn new() -> Self {
IdentifyHandler {
@ -85,19 +82,14 @@ impl<TSubstream> IdentifyHandler<TSubstream> {
events: SmallVec::new(),
next_id: Delay::new(DELAY_TO_FIRST_ID),
keep_alive: KeepAlive::Yes,
marker: PhantomData,
}
}
}
impl<TSubstream> ProtocolsHandler for IdentifyHandler<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
impl ProtocolsHandler for IdentifyHandler {
type InEvent = ();
type OutEvent = IdentifyHandlerEvent<TSubstream>;
type OutEvent = IdentifyHandlerEvent;
type Error = ReadOneError;
type Substream = TSubstream;
type InboundProtocol = IdentifyProtocolConfig;
type OutboundProtocol = IdentifyProtocolConfig;
type OutboundOpenInfo = ();
@ -108,14 +100,14 @@ where
fn inject_fully_negotiated_inbound(
&mut self,
protocol: <Self::InboundProtocol as InboundUpgrade<Negotiated<TSubstream>>>::Output
protocol: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output
) {
self.events.push(IdentifyHandlerEvent::Identify(protocol))
}
fn inject_fully_negotiated_outbound(
&mut self,
protocol: <Self::OutboundProtocol as OutboundUpgrade<Negotiated<TSubstream>>>::Output,
protocol: <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output,
_info: Self::OutboundOpenInfo,
) {
self.events.push(IdentifyHandlerEvent::Identified(protocol));
@ -128,7 +120,7 @@ where
&mut self,
_info: Self::OutboundOpenInfo,
err: ProtocolsHandlerUpgrErr<
<Self::OutboundProtocol as OutboundUpgrade<Self::Substream>>::Error
<Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Error
>
) {
self.events.push(IdentifyHandlerEvent::IdentificationError(err));
@ -144,7 +136,7 @@ where
ProtocolsHandlerEvent<
Self::OutboundProtocol,
Self::OutboundOpenInfo,
IdentifyHandlerEvent<TSubstream>,
IdentifyHandlerEvent,
Self::Error,
>,
> {

View File

@ -26,9 +26,10 @@ use libp2p_core::{
Multiaddr,
PeerId,
PublicKey,
upgrade::{Negotiated, ReadOneError, UpgradeError}
upgrade::{ReadOneError, UpgradeError}
};
use libp2p_swarm::{
NegotiatedSubstream,
NetworkBehaviour,
NetworkBehaviourAction,
PollParameters,
@ -39,7 +40,7 @@ use std::{collections::HashMap, collections::VecDeque, io, pin::Pin, task::Conte
/// Network behaviour that automatically identifies nodes periodically, returns information
/// about them, and answers identify queries from other nodes.
pub struct Identify<TSubstream> {
pub struct Identify {
/// Protocol version to send back to remotes.
protocol_version: String,
/// Agent version to send back to remotes.
@ -49,17 +50,17 @@ pub struct Identify<TSubstream> {
/// For each peer we're connected to, the observed address to send back to it.
observed_addresses: HashMap<PeerId, Multiaddr>,
/// Pending replies to send.
pending_replies: VecDeque<Reply<TSubstream>>,
pending_replies: VecDeque<Reply>,
/// Pending events to be emitted when polled.
events: VecDeque<NetworkBehaviourAction<(), IdentifyEvent>>,
}
/// A pending reply to an inbound identification request.
enum Reply<TSubstream> {
enum Reply {
/// The reply is queued for sending.
Queued {
peer: PeerId,
io: ReplySubstream<Negotiated<TSubstream>>,
io: ReplySubstream<NegotiatedSubstream>,
observed: Multiaddr
},
/// The reply is being sent.
@ -69,7 +70,7 @@ enum Reply<TSubstream> {
}
}
impl<TSubstream> Identify<TSubstream> {
impl Identify {
/// Creates a new `Identify` network behaviour.
pub fn new(protocol_version: String, agent_version: String, local_public_key: PublicKey) -> Self {
Identify {
@ -83,11 +84,8 @@ impl<TSubstream> Identify<TSubstream> {
}
}
impl<TSubstream> NetworkBehaviour for Identify<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
type ProtocolsHandler = IdentifyHandler<TSubstream>;
impl NetworkBehaviour for Identify {
type ProtocolsHandler = IdentifyHandler;
type OutEvent = IdentifyEvent;
fn new_handler(&mut self) -> Self::ProtocolsHandler {

View File

@ -31,19 +31,18 @@ use crate::protocol::{KadConnectionType, KadPeer};
use crate::query::{Query, QueryId, QueryPool, QueryConfig, QueryPoolState};
use crate::record::{self, store::{self, RecordStore}, Record, ProviderRecord};
use fnv::{FnvHashMap, FnvHashSet};
use futures::prelude::*;
use libp2p_core::{ConnectedPoint, Multiaddr, PeerId};
use libp2p_swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters, ProtocolsHandler};
use log::{info, debug, warn};
use smallvec::SmallVec;
use std::{borrow::Cow, error, iter, marker::PhantomData, time::Duration};
use std::{borrow::Cow, error, iter, time::Duration};
use std::collections::VecDeque;
use std::num::NonZeroUsize;
use std::task::{Context, Poll};
use wasm_timer::Instant;
/// Network behaviour that handles Kademlia.
pub struct Kademlia<TSubstream, TStore> {
pub struct Kademlia<TStore> {
/// The Kademlia routing table.
kbuckets: KBucketsTable<kbucket::Key<PeerId>, Addresses>,
@ -75,9 +74,6 @@ pub struct Kademlia<TSubstream, TStore> {
/// Queued events to return when the behaviour is being polled.
queued_events: VecDeque<NetworkBehaviourAction<KademliaHandlerIn<QueryId>, KademliaEvent>>,
/// Marker to pin the generics.
marker: PhantomData<TSubstream>,
/// The record storage.
store: TStore,
}
@ -217,7 +213,7 @@ impl KademliaConfig {
}
}
impl<TSubstream, TStore> Kademlia<TSubstream, TStore>
impl<TStore> Kademlia<TStore>
where
for<'a> TStore: RecordStore<'a>
{
@ -255,7 +251,6 @@ where
put_record_job,
record_ttl: config.record_ttl,
provider_record_ttl: config.provider_record_ttl,
marker: PhantomData,
}
}
@ -1008,12 +1003,12 @@ where
}
}
impl<TSubstream, TStore> NetworkBehaviour for Kademlia<TSubstream, TStore>
impl<TStore> NetworkBehaviour for Kademlia<TStore>
where
TSubstream: AsyncRead + AsyncWrite + Unpin,
for<'a> TStore: RecordStore<'a>,
TStore: Send + 'static,
{
type ProtocolsHandler = KademliaHandler<TSubstream, QueryId>;
type ProtocolsHandler = KademliaHandler<QueryId>;
type OutEvent = KademliaEvent;
fn new_handler(&mut self) -> Self::ProtocolsHandler {

View File

@ -34,8 +34,7 @@ use libp2p_core::{
PeerId,
Transport,
identity,
transport::{MemoryTransport, boxed::Boxed},
nodes::Substream,
transport::MemoryTransport,
multiaddr::{Protocol, multiaddr},
muxing::StreamMuxerBox,
upgrade
@ -48,10 +47,7 @@ use rand::{Rng, random, thread_rng};
use std::{collections::{HashSet, HashMap}, io, num::NonZeroUsize, u64};
use multihash::{Multihash, Hash::SHA2256};
type TestSwarm = Swarm<
Boxed<(PeerId, StreamMuxerBox), io::Error>,
Kademlia<Substream<StreamMuxerBox>, MemoryStore>
>;
type TestSwarm = Swarm<Kademlia<MemoryStore>>;
/// Builds swarms, each listening on a port. Does *not* connect the nodes together.
fn build_nodes(num: usize) -> (u64, Vec<TestSwarm>) {
@ -71,7 +67,7 @@ fn build_nodes_with_config(num: usize, cfg: KademliaConfig) -> (u64, Vec<TestSwa
.authenticate(SecioConfig::new(local_key))
.multiplex(yamux::Config::default())
.map(|(p, m), _| (p, StreamMuxerBox::new(m)))
.map_err(|e| panic!("Failed to create transport: {:?}", e))
.map_err(|e| -> io::Error { panic!("Failed to create transport: {:?}", e); })
.boxed();
let local_id = local_public_key.clone().into_peer_id();

View File

@ -25,6 +25,7 @@ use crate::protocol::{
use crate::record::{self, Record};
use futures::prelude::*;
use libp2p_swarm::{
NegotiatedSubstream,
KeepAlive,
SubstreamProtocol,
ProtocolsHandler,
@ -33,7 +34,7 @@ use libp2p_swarm::{
};
use libp2p_core::{
either::EitherOutput,
upgrade::{self, InboundUpgrade, OutboundUpgrade, Negotiated}
upgrade::{self, InboundUpgrade, OutboundUpgrade}
};
use log::trace;
use std::{borrow::Cow, error, fmt, io, pin::Pin, task::Context, task::Poll, time::Duration};
@ -45,10 +46,7 @@ use wasm_timer::Instant;
/// make.
///
/// It also handles requests made by the remote.
pub struct KademliaHandler<TSubstream, TUserData>
where
TSubstream: AsyncRead + AsyncWrite + Unpin,
{
pub struct KademliaHandler<TUserData> {
/// Configuration for the Kademlia protocol.
config: KademliaProtocolConfig,
@ -59,51 +57,45 @@ where
next_connec_unique_id: UniqueConnecId,
/// List of active substreams with the state they are in.
substreams: Vec<SubstreamState<TSubstream, TUserData>>,
substreams: Vec<SubstreamState<TUserData>>,
/// Until when to keep the connection alive.
keep_alive: KeepAlive,
}
/// State of an active substream, opened either by us or by the remote.
enum SubstreamState<TSubstream, TUserData>
where
TSubstream: AsyncRead + AsyncWrite + Unpin,
{
enum SubstreamState<TUserData> {
/// We haven't started opening the outgoing substream yet.
/// Contains the request we want to send, and the user data if we expect an answer.
OutPendingOpen(KadRequestMsg, Option<TUserData>),
/// Waiting to send a message to the remote.
OutPendingSend(
KadOutStreamSink<Negotiated<TSubstream>>,
KadOutStreamSink<NegotiatedSubstream>,
KadRequestMsg,
Option<TUserData>,
),
/// Waiting to flush the substream so that the data arrives to the remote.
OutPendingFlush(KadOutStreamSink<Negotiated<TSubstream>>, Option<TUserData>),
OutPendingFlush(KadOutStreamSink<NegotiatedSubstream>, Option<TUserData>),
/// Waiting for an answer back from the remote.
// TODO: add timeout
OutWaitingAnswer(KadOutStreamSink<Negotiated<TSubstream>>, TUserData),
OutWaitingAnswer(KadOutStreamSink<NegotiatedSubstream>, TUserData),
/// An error happened on the substream and we should report the error to the user.
OutReportError(KademliaHandlerQueryErr, TUserData),
/// The substream is being closed.
OutClosing(KadOutStreamSink<Negotiated<TSubstream>>),
OutClosing(KadOutStreamSink<NegotiatedSubstream>),
/// Waiting for a request from the remote.
InWaitingMessage(UniqueConnecId, KadInStreamSink<Negotiated<TSubstream>>),
InWaitingMessage(UniqueConnecId, KadInStreamSink<NegotiatedSubstream>),
/// Waiting for the user to send a `KademliaHandlerIn` event containing the response.
InWaitingUser(UniqueConnecId, KadInStreamSink<Negotiated<TSubstream>>),
InWaitingUser(UniqueConnecId, KadInStreamSink<NegotiatedSubstream>),
/// Waiting to send an answer back to the remote.
InPendingSend(UniqueConnecId, KadInStreamSink<Negotiated<TSubstream>>, KadResponseMsg),
InPendingSend(UniqueConnecId, KadInStreamSink<NegotiatedSubstream>, KadResponseMsg),
/// Waiting to flush an answer back to the remote.
InPendingFlush(UniqueConnecId, KadInStreamSink<Negotiated<TSubstream>>),
InPendingFlush(UniqueConnecId, KadInStreamSink<NegotiatedSubstream>),
/// The substream is being closed.
InClosing(KadInStreamSink<Negotiated<TSubstream>>),
InClosing(KadInStreamSink<NegotiatedSubstream>),
}
impl<TSubstream, TUserData> SubstreamState<TSubstream, TUserData>
where
TSubstream: AsyncRead + AsyncWrite + Unpin,
{
impl<TUserData> SubstreamState<TUserData> {
/// Tries to close the substream.
///
/// If the substream is not ready to be closed, returns it back.
@ -379,10 +371,7 @@ pub struct KademliaRequestId {
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
struct UniqueConnecId(u64);
impl<TSubstream, TUserData> KademliaHandler<TSubstream, TUserData>
where
TSubstream: AsyncRead + AsyncWrite + Unpin,
{
impl<TUserData> KademliaHandler<TUserData> {
/// Create a `KademliaHandler` that only allows sending messages to the remote but denying
/// incoming connections.
pub fn dial_only() -> Self {
@ -415,25 +404,20 @@ where
}
}
impl<TSubstream, TUserData> Default for KademliaHandler<TSubstream, TUserData>
where
TSubstream: AsyncRead + AsyncWrite + Unpin,
{
impl<TUserData> Default for KademliaHandler<TUserData> {
#[inline]
fn default() -> Self {
KademliaHandler::dial_and_listen()
}
}
impl<TSubstream, TUserData> ProtocolsHandler for KademliaHandler<TSubstream, TUserData>
impl<TUserData> ProtocolsHandler for KademliaHandler<TUserData>
where
TSubstream: AsyncRead + AsyncWrite + Unpin,
TUserData: Clone,
TUserData: Clone + Send + 'static,
{
type InEvent = KademliaHandlerIn<TUserData>;
type OutEvent = KademliaHandlerEvent<TUserData>;
type Error = io::Error; // TODO: better error type?
type Substream = TSubstream;
type InboundProtocol = upgrade::EitherUpgrade<KademliaProtocolConfig, upgrade::DeniedUpgrade>;
type OutboundProtocol = KademliaProtocolConfig;
// Message of the request to send to the remote, and user data if we expect an answer.
@ -450,7 +434,7 @@ where
fn inject_fully_negotiated_outbound(
&mut self,
protocol: <Self::OutboundProtocol as OutboundUpgrade<Negotiated<TSubstream>>>::Output,
protocol: <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output,
(msg, user_data): Self::OutboundOpenInfo,
) {
self.substreams
@ -459,7 +443,7 @@ where
fn inject_fully_negotiated_inbound(
&mut self,
protocol: <Self::InboundProtocol as InboundUpgrade<Negotiated<TSubstream>>>::Output,
protocol: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
) {
// If `self.allow_listening` is false, then we produced a `DeniedUpgrade` and `protocol`
// is a `Void`.
@ -695,12 +679,12 @@ where
///
/// Returns the new state for that substream, an event to generate, and whether the substream
/// should be polled again.
fn advance_substream<TSubstream, TUserData>(
state: SubstreamState<TSubstream, TUserData>,
fn advance_substream<TUserData>(
state: SubstreamState<TUserData>,
upgrade: KademliaProtocolConfig,
cx: &mut Context,
) -> (
Option<SubstreamState<TSubstream, TUserData>>,
Option<SubstreamState<TUserData>>,
Option<
ProtocolsHandlerEvent<
KademliaProtocolConfig,
@ -711,8 +695,6 @@ fn advance_substream<TSubstream, TUserData>(
>,
bool,
)
where
TSubstream: AsyncRead + AsyncWrite + Unpin,
{
match state {
SubstreamState::OutPendingOpen(msg, user_data) => {

View File

@ -157,7 +157,7 @@ impl Error for PingFailure {
/// and answering ping queries.
///
/// If the remote doesn't respond, produces an error that closes the connection.
pub struct PingHandler<TSubstream> {
pub struct PingHandler {
/// Configuration options.
config: PingConfig,
/// The timer for when to send the next ping.
@ -167,10 +167,9 @@ pub struct PingHandler<TSubstream> {
pending_results: VecDeque<PingResult>,
/// The number of consecutive ping failures that occurred.
failures: u32,
_marker: std::marker::PhantomData<TSubstream>
}
impl<TSubstream> PingHandler<TSubstream> {
impl PingHandler {
/// Builds a new `PingHandler` with the given configuration.
pub fn new(config: PingConfig) -> Self {
PingHandler {
@ -178,19 +177,14 @@ impl<TSubstream> PingHandler<TSubstream> {
next_ping: Delay::new(Duration::new(0, 0)),
pending_results: VecDeque::with_capacity(2),
failures: 0,
_marker: std::marker::PhantomData
}
}
}
impl<TSubstream> ProtocolsHandler for PingHandler<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{
impl ProtocolsHandler for PingHandler {
type InEvent = Void;
type OutEvent = PingResult;
type Error = PingFailure;
type Substream = TSubstream;
type InboundProtocol = protocol::Ping;
type OutboundProtocol = protocol::Ping;
type OutboundOpenInfo = ();
@ -265,7 +259,6 @@ where
mod tests {
use super::*;
use async_std::net::TcpStream;
use futures::future;
use quickcheck::*;
use rand::Rng;
@ -279,7 +272,7 @@ mod tests {
}
}
fn tick(h: &mut PingHandler<TcpStream>)
fn tick(h: &mut PingHandler)
-> ProtocolsHandlerEvent<protocol::Ping, (), PingResult, PingFailure>
{
async_std::task::block_on(future::poll_fn(|cx| h.poll(cx) ))
@ -288,7 +281,7 @@ mod tests {
#[test]
fn ping_interval() {
fn prop(cfg: PingConfig, ping_rtt: Duration) -> bool {
let mut h = PingHandler::<TcpStream>::new(cfg);
let mut h = PingHandler::new(cfg);
// Send ping
match tick(&mut h) {
@ -318,7 +311,7 @@ mod tests {
#[test]
fn max_failures() {
let cfg = PingConfig::arbitrary(&mut StdGen::new(rand::thread_rng(), 100));
let mut h = PingHandler::<TcpStream>::new(cfg);
let mut h = PingHandler::new(cfg);
for _ in 0 .. h.config.max_failures.get() - 1 {
h.inject_dial_upgrade_error((), ProtocolsHandlerUpgrErr::Timeout);
match tick(&mut h) {

View File

@ -47,22 +47,20 @@ pub mod handler;
pub use handler::{PingConfig, PingResult, PingSuccess, PingFailure};
use handler::PingHandler;
use futures::prelude::*;
use libp2p_core::{ConnectedPoint, Multiaddr, PeerId};
use libp2p_swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters};
use std::{collections::VecDeque, marker::PhantomData, task::Context, task::Poll};
use std::{collections::VecDeque, task::Context, task::Poll};
use void::Void;
/// `Ping` is a [`NetworkBehaviour`] that responds to inbound pings and
/// periodically sends outbound pings on every established connection.
///
/// See the crate root documentation for more information.
pub struct Ping<TSubstream> {
pub struct Ping {
/// Configuration for outbound pings.
config: PingConfig,
/// Queue of events to yield to the swarm.
events: VecDeque<PingEvent>,
_marker: PhantomData<TSubstream>,
}
/// Event generated by the `Ping` network behaviour.
@ -74,28 +72,24 @@ pub struct PingEvent {
pub result: PingResult,
}
impl<TSubstream> Ping<TSubstream> {
impl Ping {
/// Creates a new `Ping` network behaviour with the given configuration.
pub fn new(config: PingConfig) -> Self {
Ping {
config,
events: VecDeque::new(),
_marker: PhantomData,
}
}
}
impl<TSubstream> Default for Ping<TSubstream> {
impl Default for Ping {
fn default() -> Self {
Ping::new(PingConfig::new())
}
}
impl<TSubstream> NetworkBehaviour for Ping<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{
type ProtocolsHandler = PingHandler<TSubstream>;
impl NetworkBehaviour for Ping {
type ProtocolsHandler = PingHandler;
type OutEvent = PingEvent;
fn new_handler(&mut self) -> Self::ProtocolsHandler {

View File

@ -26,11 +26,10 @@ use libp2p_core::{
identity,
muxing::StreamMuxerBox,
transport::{Transport, boxed::Boxed},
either::EitherError,
upgrade::{self, UpgradeError}
upgrade
};
use libp2p_ping::*;
use libp2p_secio::{SecioConfig, SecioError};
use libp2p_secio::SecioConfig;
use libp2p_swarm::Swarm;
use libp2p_tcp::TcpConfig;
use futures::{prelude::*, channel::mpsc};
@ -93,7 +92,7 @@ fn mk_transport() -> (
PeerId,
Boxed<
(PeerId, StreamMuxerBox),
EitherError<EitherError<io::Error, UpgradeError<SecioError>>, UpgradeError<io::Error>>
io::Error
>
) {
let id_keys = identity::Keypair::generate_ed25519();
@ -104,6 +103,7 @@ fn mk_transport() -> (
.authenticate(SecioConfig::new(id_keys))
.multiplex(libp2p_yamux::Config::default())
.map(|(peer, muxer), _| (peer, StreamMuxerBox::new(muxer)))
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
.boxed();
(peer_id, transport)
}

View File

@ -43,12 +43,12 @@ use std::{error, task::Context, task::Poll};
///
/// `#[behaviour(ignore)]` can be added on a struct field to disable generation of delegation to
/// the fields which do not implement `NetworkBehaviour`.
pub trait NetworkBehaviour {
pub trait NetworkBehaviour: Send + 'static {
/// Handler for all the protocols the network behaviour supports.
type ProtocolsHandler: IntoProtocolsHandler;
/// Event generated by the `NetworkBehaviour` and that the swarm will report back.
type OutEvent;
type OutEvent: Send + 'static;
/// Creates a new `ProtocolsHandler` for a connection with a peer.
///

View File

@ -55,6 +55,7 @@
mod behaviour;
mod registry;
mod upgrade;
pub mod protocols_handler;
pub mod toggle;
@ -77,28 +78,35 @@ pub use protocols_handler::{
SubstreamProtocol
};
/// Substream for which a protocol has been chosen.
///
/// Implements the [`AsyncRead`](futures::io::AsyncRead) and
/// [`AsyncWrite`](futures::io::AsyncWrite) traits.
pub type NegotiatedSubstream = Negotiated<Substream<StreamMuxerBox>>;
use protocols_handler::{NodeHandlerWrapperBuilder, NodeHandlerWrapperError};
use futures::{prelude::*, executor::{ThreadPool, ThreadPoolBuilder}};
use libp2p_core::{
Executor,
Transport, Multiaddr, Negotiated, PeerId, InboundUpgrade, OutboundUpgrade, UpgradeInfo, ProtocolName,
muxing::StreamMuxer,
Executor, Negotiated, Transport, Multiaddr, PeerId, ProtocolName,
muxing::{StreamMuxer, StreamMuxerBox},
nodes::{
ListenerId,
ListenerId, Substream,
collection::ConnectionInfo,
node::Substream,
network::{self, Network, NetworkEvent}
},
transport::TransportError
transport::{
boxed::Boxed as BoxTransport,
TransportError,
}
};
use registry::{Addresses, AddressIntoIter};
use smallvec::SmallVec;
use std::{error, fmt, ops::{Deref, DerefMut}, pin::Pin, task::{Context, Poll}};
use std::{error, fmt, io, ops::{Deref, DerefMut}, pin::Pin, task::{Context, Poll}};
use std::collections::HashSet;
use upgrade::UpgradeInfoSend as _;
/// Contains the state of the network, plus the way it should behave.
pub type Swarm<TTransport, TBehaviour, TConnInfo = PeerId> = ExpandedSwarm<
TTransport,
pub type Swarm<TBehaviour, TConnInfo = PeerId> = ExpandedSwarm<
TBehaviour,
<<<TBehaviour as NetworkBehaviour>::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent,
<<<TBehaviour as NetworkBehaviour>::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent,
@ -135,12 +143,9 @@ pub enum SwarmEvent<TBvEv> {
}
/// Contains the state of the network, plus the way it should behave.
pub struct ExpandedSwarm<TTransport, TBehaviour, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo = PeerId>
where
TTransport: Transport,
{
pub struct ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo = PeerId> {
network: Network<
TTransport,
BoxTransport<(TConnInfo, StreamMuxerBox), io::Error>,
TInEvent,
TOutEvent,
NodeHandlerWrapperBuilder<THandler>,
@ -170,10 +175,8 @@ where
send_event_to_complete: Option<(PeerId, TInEvent)>
}
impl<TTransport, TBehaviour, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo> Deref for
ExpandedSwarm<TTransport, TBehaviour, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo>
where
TTransport: Transport,
impl<TBehaviour, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo> Deref for
ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo>
{
type Target = TBehaviour;
@ -182,69 +185,50 @@ where
}
}
impl<TTransport, TBehaviour, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo> DerefMut for
ExpandedSwarm<TTransport, TBehaviour, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo>
where
TTransport: Transport,
impl<TBehaviour, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo> DerefMut for
ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo>
{
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.behaviour
}
}
impl<TTransport, TBehaviour, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo> Unpin for
ExpandedSwarm<TTransport, TBehaviour, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo>
where
TTransport: Transport,
impl<TBehaviour, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo> Unpin for
ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo>
{
}
impl<TTransport, TBehaviour, TMuxer, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo>
ExpandedSwarm<TTransport, TBehaviour, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo>
impl<TBehaviour, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo>
ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo>
where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
TConnInfo: ConnectionInfo<PeerId = PeerId> + fmt::Debug + Clone + Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send + 'static,
TMuxer::Substream: Send + 'static,
TTransport: Transport<Output = (TConnInfo, TMuxer)> + Clone,
TTransport::Error: Send + 'static,
TTransport::Listener: Send + 'static,
TTransport::ListenerUpgrade: Send + 'static,
TTransport::Dial: Send + 'static,
THandlerErr: error::Error + Send + 'static,
THandler: IntoProtocolsHandler + Send + 'static,
THandler::Handler: ProtocolsHandler<InEvent = TInEvent, OutEvent = TOutEvent, Substream = Substream<TMuxer>, Error = THandlerErr> + Send + 'static,
<THandler::Handler as ProtocolsHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary
<THandler::Handler as ProtocolsHandler>::InboundProtocol: InboundUpgrade<Negotiated<Substream<TMuxer>>> + Send + 'static,
<<THandler::Handler as ProtocolsHandler>::InboundProtocol as UpgradeInfo>::Info: Send + 'static,
<<THandler::Handler as ProtocolsHandler>::InboundProtocol as UpgradeInfo>::InfoIter: Send + 'static,
<<<THandler::Handler as ProtocolsHandler>::InboundProtocol as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send + 'static,
<<THandler::Handler as ProtocolsHandler>::InboundProtocol as InboundUpgrade<Negotiated<Substream<TMuxer>>>>::Error: Send + 'static,
<<THandler::Handler as ProtocolsHandler>::InboundProtocol as InboundUpgrade<Negotiated<Substream<TMuxer>>>>::Future: Send + 'static,
<THandler::Handler as ProtocolsHandler>::OutboundProtocol: OutboundUpgrade<Negotiated<Substream<TMuxer>>> + Send + 'static,
<<THandler::Handler as ProtocolsHandler>::OutboundProtocol as UpgradeInfo>::Info: Send + 'static,
<<THandler::Handler as ProtocolsHandler>::OutboundProtocol as UpgradeInfo>::InfoIter: Send + 'static,
<<<THandler::Handler as ProtocolsHandler>::OutboundProtocol as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send + 'static,
<<THandler::Handler as ProtocolsHandler>::OutboundProtocol as OutboundUpgrade<Negotiated<Substream<TMuxer>>>>::Future: Send + 'static,
<<THandler::Handler as ProtocolsHandler>::OutboundProtocol as OutboundUpgrade<Negotiated<Substream<TMuxer>>>>::Error: Send + 'static,
THandler::Handler: ProtocolsHandler<InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr>,
{
/// Builds a new `Swarm`.
pub fn new(transport: TTransport, behaviour: TBehaviour, local_peer_id: PeerId) -> Self {
pub fn new<TTransport, TMuxer>(transport: TTransport, behaviour: TBehaviour, local_peer_id: PeerId) -> Self
where
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send + 'static,
<TMuxer as StreamMuxer>::OutboundSubstream: Send + 'static,
<TMuxer as StreamMuxer>::Substream: Send + 'static,
TTransport: Transport<Output = (TConnInfo, TMuxer)> + Clone + Send + Sync + 'static,
TTransport::Error: Send + Sync + 'static,
TTransport::Listener: Send + 'static,
TTransport::ListenerUpgrade: Send + 'static,
TTransport::Dial: Send + 'static,
{
SwarmBuilder::new(transport, behaviour, local_peer_id)
.build()
}
/// Returns the transport passed when building this object.
pub fn transport(me: &Self) -> &TTransport {
me.network.transport()
}
/// Starts listening on the given address.
///
/// Returns an error if the address is not supported.
pub fn listen_on(me: &mut Self, addr: Multiaddr) -> Result<ListenerId, TransportError<TTransport::Error>> {
pub fn listen_on(me: &mut Self, addr: Multiaddr) -> Result<ListenerId, TransportError<io::Error>> {
me.network.listen_on(addr)
}
@ -258,7 +242,7 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
/// Tries to dial the given address.
///
/// Returns an error if the address is not supported.
pub fn dial_addr(me: &mut Self, addr: Multiaddr) -> Result<(), TransportError<TTransport::Error>> {
pub fn dial_addr(me: &mut Self, addr: Multiaddr) -> Result<(), TransportError<io::Error>> {
let handler = me.behaviour.new_handler();
me.network.dial(addr, handler.into_node_handler_builder())
}
@ -496,35 +480,14 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
}
}
impl<TTransport, TBehaviour, TMuxer, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo> Stream for
ExpandedSwarm<TTransport, TBehaviour, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo>
impl<TBehaviour, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo> Stream for
ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler, THandlerErr, TConnInfo>
where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
TMuxer: StreamMuxer + Send + Sync + 'static,
<TMuxer as StreamMuxer>::OutboundSubstream: Send + 'static,
<TMuxer as StreamMuxer>::Substream: Send + 'static,
TTransport: Transport<Output = (TConnInfo, TMuxer)> + Clone,
TTransport::Error: Send + 'static,
TTransport::Listener: Send + 'static,
TTransport::ListenerUpgrade: Send + 'static,
TTransport::Dial: Send + 'static,
THandlerErr: error::Error + Send + 'static,
THandler: IntoProtocolsHandler + Send + 'static,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
THandler::Handler: ProtocolsHandler<InEvent = TInEvent, OutEvent = TOutEvent, Substream = Substream<TMuxer>, Error = THandlerErr> + Send + 'static,
<THandler::Handler as ProtocolsHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary
<THandler::Handler as ProtocolsHandler>::InboundProtocol: InboundUpgrade<Negotiated<Substream<TMuxer>>> + Send + 'static,
<<THandler::Handler as ProtocolsHandler>::InboundProtocol as InboundUpgrade<Negotiated<Substream<TMuxer>>>>::Future: Send + 'static,
<<THandler::Handler as ProtocolsHandler>::InboundProtocol as InboundUpgrade<Negotiated<Substream<TMuxer>>>>::Error: Send + 'static,
<<THandler::Handler as ProtocolsHandler>::InboundProtocol as UpgradeInfo>::Info: Send + 'static,
<<THandler::Handler as ProtocolsHandler>::InboundProtocol as UpgradeInfo>::InfoIter: Send + 'static,
<<<THandler::Handler as ProtocolsHandler>::InboundProtocol as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send + 'static,
<THandler::Handler as ProtocolsHandler>::OutboundProtocol: OutboundUpgrade<Negotiated<Substream<TMuxer>>> + Send + 'static,
<<THandler::Handler as ProtocolsHandler>::OutboundProtocol as OutboundUpgrade<Negotiated<Substream<TMuxer>>>>::Future: Send + 'static,
<<THandler::Handler as ProtocolsHandler>::OutboundProtocol as OutboundUpgrade<Negotiated<Substream<TMuxer>>>>::Error: Send + 'static,
<<THandler::Handler as ProtocolsHandler>::OutboundProtocol as UpgradeInfo>::Info: Send + 'static,
<<THandler::Handler as ProtocolsHandler>::OutboundProtocol as UpgradeInfo>::InfoIter: Send + 'static,
<<<THandler::Handler as ProtocolsHandler>::OutboundProtocol as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send + 'static,
THandler::Handler: ProtocolsHandler<InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr>,
TConnInfo: ConnectionInfo<PeerId = PeerId> + fmt::Debug + Clone + Send + 'static,
{
type Item = TBehaviour::OutEvent;
@ -570,45 +533,35 @@ impl<'a> PollParameters for SwarmPollParameters<'a> {
}
}
pub struct SwarmBuilder<TTransport, TBehaviour> {
pub struct SwarmBuilder<TBehaviour, TConnInfo> {
incoming_limit: Option<u32>,
executor: Option<Box<dyn Executor + Send>>,
local_peer_id: PeerId,
transport: TTransport,
transport: BoxTransport<(TConnInfo, StreamMuxerBox), io::Error>,
behaviour: TBehaviour,
}
impl<TTransport, TBehaviour, TMuxer, TConnInfo> SwarmBuilder<TTransport, TBehaviour>
impl<TBehaviour, TConnInfo> SwarmBuilder<TBehaviour, TConnInfo>
where TBehaviour: NetworkBehaviour,
TMuxer: StreamMuxer + Send + Sync + 'static,
<TMuxer as StreamMuxer>::OutboundSubstream: Send + 'static,
<TMuxer as StreamMuxer>::Substream: Send + 'static,
TTransport: Transport<Output = (TConnInfo, TMuxer)> + Clone,
TTransport::Error: Send + 'static,
TTransport::Listener: Send + 'static,
TTransport::ListenerUpgrade: Send + 'static,
TTransport::Dial: Send + 'static,
TBehaviour::ProtocolsHandler: Send + 'static,
<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler: ProtocolsHandler<Substream = Substream<TMuxer>> + Send + 'static,
<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent: Send + 'static,
<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent: Send + 'static,
<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::Error: Send + 'static,
<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary
<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InboundProtocol: InboundUpgrade<Negotiated<Substream<TMuxer>>> + Send + 'static,
<<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InboundProtocol as UpgradeInfo>::Info: Send + 'static,
<<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InboundProtocol as UpgradeInfo>::InfoIter: Send + 'static,
<<<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InboundProtocol as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send + 'static,
<<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InboundProtocol as InboundUpgrade<Negotiated<Substream<TMuxer>>>>::Error: Send + 'static,
<<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InboundProtocol as InboundUpgrade<Negotiated<Substream<TMuxer>>>>::Future: Send + 'static,
<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutboundProtocol: OutboundUpgrade<Negotiated<Substream<TMuxer>>> + Send + 'static,
<<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutboundProtocol as UpgradeInfo>::Info: Send + 'static,
<<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutboundProtocol as UpgradeInfo>::InfoIter: Send + 'static,
<<<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutboundProtocol as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send + 'static,
<<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutboundProtocol as OutboundUpgrade<Negotiated<Substream<TMuxer>>>>::Future: Send + 'static,
<<<TBehaviour::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutboundProtocol as OutboundUpgrade<Negotiated<Substream<TMuxer>>>>::Error: Send + 'static,
TConnInfo: ConnectionInfo<PeerId = PeerId> + fmt::Debug + Clone + Send + 'static,
{
pub fn new(transport: TTransport, behaviour: TBehaviour, local_peer_id: PeerId) -> Self {
pub fn new<TTransport, TMuxer>(transport: TTransport, behaviour: TBehaviour, local_peer_id: PeerId) -> Self
where
TMuxer: StreamMuxer + Send + Sync + 'static,
TMuxer::OutboundSubstream: Send + 'static,
<TMuxer as StreamMuxer>::OutboundSubstream: Send + 'static,
<TMuxer as StreamMuxer>::Substream: Send + 'static,
TTransport: Transport<Output = (TConnInfo, TMuxer)> + Clone + Send + Sync + 'static,
TTransport::Error: Send + Sync + 'static,
TTransport::Listener: Send + 'static,
TTransport::ListenerUpgrade: Send + 'static,
TTransport::Dial: Send + 'static,
{
let transport = transport
.map(|(conn_info, muxer), _| (conn_info, StreamMuxerBox::new(muxer)))
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
.boxed();
SwarmBuilder {
incoming_limit: None,
local_peer_id,
@ -643,7 +596,7 @@ where TBehaviour: NetworkBehaviour,
self
}
pub fn build(mut self) -> Swarm<TTransport, TBehaviour, TConnInfo> {
pub fn build(mut self) -> Swarm<TBehaviour, TConnInfo> {
let supported_protocols = self.behaviour
.new_handler()
.inbound_protocol()
@ -686,58 +639,49 @@ where TBehaviour: NetworkBehaviour,
}
}
/// Dummy implementation of [`NetworkBehaviour`] that doesn't do anything.
#[derive(Clone, Default)]
pub struct DummyBehaviour {
}
impl NetworkBehaviour for DummyBehaviour {
type ProtocolsHandler = protocols_handler::DummyProtocolsHandler;
type OutEvent = void::Void;
fn new_handler(&mut self) -> Self::ProtocolsHandler {
protocols_handler::DummyProtocolsHandler::default()
}
fn addresses_of_peer(&mut self, _: &PeerId) -> Vec<Multiaddr> {
Vec::new()
}
fn inject_connected(&mut self, _: PeerId, _: libp2p_core::ConnectedPoint) {}
fn inject_disconnected(&mut self, _: &PeerId, _: libp2p_core::ConnectedPoint) {}
fn inject_node_event(&mut self, _: PeerId,
_: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent) {}
fn poll(&mut self, _: &mut Context, _: &mut impl PollParameters) ->
Poll<NetworkBehaviourAction<<Self::ProtocolsHandler as
ProtocolsHandler>::InEvent, Self::OutEvent>>
{
Poll::Pending
}
}
#[cfg(test)]
mod tests {
use crate::protocols_handler::{DummyProtocolsHandler, ProtocolsHandler};
use crate::{NetworkBehaviour, NetworkBehaviourAction, PollParameters, SwarmBuilder};
use crate::{DummyBehaviour, SwarmBuilder};
use libp2p_core::{
ConnectedPoint,
identity,
Multiaddr,
PeerId,
PublicKey,
transport::dummy::{DummyStream, DummyTransport}
};
use libp2p_mplex::Multiplex;
use futures::prelude::*;
use std::{marker::PhantomData, task::Context, task::Poll};
use void::Void;
#[derive(Clone)]
struct DummyBehaviour<TSubstream> {
marker: PhantomData<TSubstream>,
}
impl<TSubstream> NetworkBehaviour
for DummyBehaviour<TSubstream>
where TSubstream: AsyncRead + AsyncWrite + Unpin
{
type ProtocolsHandler = DummyProtocolsHandler<TSubstream>;
type OutEvent = Void;
fn new_handler(&mut self) -> Self::ProtocolsHandler {
DummyProtocolsHandler::default()
}
fn addresses_of_peer(&mut self, _: &PeerId) -> Vec<Multiaddr> {
Vec::new()
}
fn inject_connected(&mut self, _: PeerId, _: ConnectedPoint) {}
fn inject_disconnected(&mut self, _: &PeerId, _: ConnectedPoint) {}
fn inject_node_event(&mut self, _: PeerId,
_: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent) {}
fn poll(&mut self, _: &mut Context, _: &mut impl PollParameters) ->
Poll<NetworkBehaviourAction<<Self::ProtocolsHandler as
ProtocolsHandler>::InEvent, Self::OutEvent>>
{
Poll::Pending
}
}
fn get_random_id() -> PublicKey {
identity::Keypair::generate_ed25519().public()
@ -747,8 +691,7 @@ mod tests {
fn test_build_swarm() {
let id = get_random_id();
let transport = DummyTransport::<(PeerId, Multiplex<DummyStream>)>::new();
let behaviour = DummyBehaviour{marker: PhantomData};
let swarm = SwarmBuilder::new(transport, behaviour, id.into())
let swarm = SwarmBuilder::new(transport, DummyBehaviour {}, id.into())
.incoming_limit(Some(4)).build();
assert_eq!(swarm.network.incoming_limit(), Some(4));
}
@ -757,8 +700,7 @@ mod tests {
fn test_build_swarm_with_max_listeners_none() {
let id = get_random_id();
let transport = DummyTransport::<(PeerId, Multiplex<DummyStream>)>::new();
let behaviour = DummyBehaviour{marker: PhantomData};
let swarm = SwarmBuilder::new(transport, behaviour, id.into()).build();
let swarm = SwarmBuilder::new(transport, DummyBehaviour {}, id.into()).build();
assert!(swarm.network.incoming_limit().is_none())
}
}

View File

@ -44,12 +44,16 @@ mod node_handler;
mod one_shot;
mod select;
use futures::prelude::*;
pub use crate::upgrade::{
InboundUpgradeSend,
OutboundUpgradeSend,
UpgradeInfoSend,
};
use libp2p_core::{
ConnectedPoint,
Negotiated,
PeerId,
upgrade::{self, InboundUpgrade, OutboundUpgrade, UpgradeError},
upgrade::{self, UpgradeError},
};
use std::{cmp::Ordering, error, fmt, task::Context, task::Poll, time::Duration};
use wasm_timer::Instant;
@ -93,21 +97,19 @@ pub use select::{IntoProtocolsHandlerSelect, ProtocolsHandlerSelect};
/// Implementors of this trait should keep in mind that the connection can be closed at any time.
/// When a connection is closed gracefully, the substreams used by the handler may still
/// continue reading data until the remote closes its side of the connection.
pub trait ProtocolsHandler {
pub trait ProtocolsHandler: Send + 'static {
/// Custom event that can be received from the outside.
type InEvent;
type InEvent: Send + 'static;
/// Custom event that can be produced by the handler and that will be returned to the outside.
type OutEvent;
type OutEvent: Send + 'static;
/// The type of errors returned by [`ProtocolsHandler::poll`].
type Error: error::Error;
/// The type of substreams on which the protocol(s) are negotiated.
type Substream: AsyncRead + AsyncWrite + Unpin;
type Error: error::Error + Send + 'static;
/// The inbound upgrade for the protocol(s) used by the handler.
type InboundProtocol: InboundUpgrade<Negotiated<Self::Substream>>;
type InboundProtocol: InboundUpgradeSend + Send + 'static;
/// The outbound upgrade for the protocol(s) used by the handler.
type OutboundProtocol: OutboundUpgrade<Negotiated<Self::Substream>>;
type OutboundProtocol: OutboundUpgradeSend;
/// The type of additional information passed to an `OutboundSubstreamRequest`.
type OutboundOpenInfo;
type OutboundOpenInfo: Send + 'static;
/// The [`InboundUpgrade`] to apply on inbound substreams to negotiate the
/// desired protocols.
@ -121,7 +123,7 @@ pub trait ProtocolsHandler {
/// Injects the output of a successful upgrade on a new inbound substream.
fn inject_fully_negotiated_inbound(
&mut self,
protocol: <Self::InboundProtocol as InboundUpgrade<Negotiated<Self::Substream>>>::Output
protocol: <Self::InboundProtocol as InboundUpgradeSend>::Output
);
/// Injects the output of a successful upgrade on a new outbound substream.
@ -130,7 +132,7 @@ pub trait ProtocolsHandler {
/// [`ProtocolsHandlerEvent::OutboundSubstreamRequest`].
fn inject_fully_negotiated_outbound(
&mut self,
protocol: <Self::OutboundProtocol as OutboundUpgrade<Negotiated<Self::Substream>>>::Output,
protocol: <Self::OutboundProtocol as OutboundUpgradeSend>::Output,
info: Self::OutboundOpenInfo
);
@ -142,7 +144,7 @@ pub trait ProtocolsHandler {
&mut self,
info: Self::OutboundOpenInfo,
error: ProtocolsHandlerUpgrErr<
<Self::OutboundProtocol as OutboundUpgrade<Negotiated<Self::Substream>>>::Error
<Self::OutboundProtocol as OutboundUpgradeSend>::Error
>
);
@ -450,7 +452,7 @@ where
}
/// Prototype for a `ProtocolsHandler`.
pub trait IntoProtocolsHandler {
pub trait IntoProtocolsHandler: Send + 'static {
/// The protocols handler.
type Handler: ProtocolsHandler;

View File

@ -18,6 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::NegotiatedSubstream;
use crate::protocols_handler::{
KeepAlive,
SubstreamProtocol,
@ -25,33 +26,25 @@ use crate::protocols_handler::{
ProtocolsHandlerEvent,
ProtocolsHandlerUpgrErr
};
use futures::prelude::*;
use libp2p_core::{Negotiated, upgrade::{InboundUpgrade, OutboundUpgrade, DeniedUpgrade}};
use std::{marker::PhantomData, task::Context, task::Poll};
use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, DeniedUpgrade};
use std::task::{Context, Poll};
use void::Void;
/// Implementation of `ProtocolsHandler` that doesn't handle anything.
pub struct DummyProtocolsHandler<TSubstream> {
marker: PhantomData<TSubstream>,
pub struct DummyProtocolsHandler {
}
impl<TSubstream> Default for DummyProtocolsHandler<TSubstream> {
#[inline]
impl Default for DummyProtocolsHandler {
fn default() -> Self {
DummyProtocolsHandler {
marker: PhantomData,
}
}
}
impl<TSubstream> ProtocolsHandler for DummyProtocolsHandler<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite + Unpin,
{
impl ProtocolsHandler for DummyProtocolsHandler {
type InEvent = Void;
type OutEvent = Void;
type Error = Void;
type Substream = TSubstream;
type InboundProtocol = DeniedUpgrade;
type OutboundProtocol = DeniedUpgrade;
type OutboundOpenInfo = Void;
@ -64,14 +57,14 @@ where
#[inline]
fn inject_fully_negotiated_inbound(
&mut self,
_: <Self::InboundProtocol as InboundUpgrade<Negotiated<TSubstream>>>::Output
_: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output
) {
}
#[inline]
fn inject_fully_negotiated_outbound(
&mut self,
_: <Self::OutboundProtocol as OutboundUpgrade<Negotiated<TSubstream>>>::Output,
_: <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output,
_: Self::OutboundOpenInfo
) {
}
@ -80,7 +73,7 @@ where
fn inject_event(&mut self, _: Self::InEvent) {}
#[inline]
fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, _: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgrade<Negotiated<Self::Substream>>>::Error>) {}
fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, _: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Error>) {}
#[inline]
fn connection_keep_alive(&self) -> KeepAlive { KeepAlive::No }

View File

@ -18,6 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend};
use crate::protocols_handler::{
KeepAlive,
SubstreamProtocol,
@ -25,7 +26,7 @@ use crate::protocols_handler::{
ProtocolsHandlerEvent,
ProtocolsHandlerUpgrErr
};
use libp2p_core::{Negotiated, upgrade::{InboundUpgrade, OutboundUpgrade}};
use std::{marker::PhantomData, task::Context, task::Poll};
/// Wrapper around a protocol handler that turns the input event into something else.
@ -51,11 +52,12 @@ impl<TProtoHandler, TMap, TNewIn> ProtocolsHandler for MapInEvent<TProtoHandler,
where
TProtoHandler: ProtocolsHandler,
TMap: Fn(TNewIn) -> Option<TProtoHandler::InEvent>,
TNewIn: Send + 'static,
TMap: Send + 'static,
{
type InEvent = TNewIn;
type OutEvent = TProtoHandler::OutEvent;
type Error = TProtoHandler::Error;
type Substream = TProtoHandler::Substream;
type InboundProtocol = TProtoHandler::InboundProtocol;
type OutboundProtocol = TProtoHandler::OutboundProtocol;
type OutboundOpenInfo = TProtoHandler::OutboundOpenInfo;
@ -68,7 +70,7 @@ where
#[inline]
fn inject_fully_negotiated_inbound(
&mut self,
protocol: <Self::InboundProtocol as InboundUpgrade<Negotiated<Self::Substream>>>::Output
protocol: <Self::InboundProtocol as InboundUpgradeSend>::Output
) {
self.inner.inject_fully_negotiated_inbound(protocol)
}
@ -76,7 +78,7 @@ where
#[inline]
fn inject_fully_negotiated_outbound(
&mut self,
protocol: <Self::OutboundProtocol as OutboundUpgrade<Negotiated<Self::Substream>>>::Output,
protocol: <Self::OutboundProtocol as OutboundUpgradeSend>::Output,
info: Self::OutboundOpenInfo
) {
self.inner.inject_fully_negotiated_outbound(protocol, info)
@ -90,7 +92,7 @@ where
}
#[inline]
fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgrade<Negotiated<Self::Substream>>>::Error>) {
fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgradeSend>::Error>) {
self.inner.inject_dial_upgrade_error(info, error)
}

View File

@ -18,6 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend};
use crate::protocols_handler::{
KeepAlive,
SubstreamProtocol,
@ -25,7 +26,7 @@ use crate::protocols_handler::{
ProtocolsHandlerEvent,
ProtocolsHandlerUpgrErr
};
use libp2p_core::{Negotiated, upgrade::{InboundUpgrade, OutboundUpgrade}};
use std::task::{Context, Poll};
/// Wrapper around a protocol handler that turns the output event into something else.
@ -49,11 +50,12 @@ impl<TProtoHandler, TMap, TNewOut> ProtocolsHandler for MapOutEvent<TProtoHandle
where
TProtoHandler: ProtocolsHandler,
TMap: FnMut(TProtoHandler::OutEvent) -> TNewOut,
TNewOut: Send + 'static,
TMap: Send + 'static,
{
type InEvent = TProtoHandler::InEvent;
type OutEvent = TNewOut;
type Error = TProtoHandler::Error;
type Substream = TProtoHandler::Substream;
type InboundProtocol = TProtoHandler::InboundProtocol;
type OutboundProtocol = TProtoHandler::OutboundProtocol;
type OutboundOpenInfo = TProtoHandler::OutboundOpenInfo;
@ -66,7 +68,7 @@ where
#[inline]
fn inject_fully_negotiated_inbound(
&mut self,
protocol: <Self::InboundProtocol as InboundUpgrade<Negotiated<Self::Substream>>>::Output
protocol: <Self::InboundProtocol as InboundUpgradeSend>::Output
) {
self.inner.inject_fully_negotiated_inbound(protocol)
}
@ -74,7 +76,7 @@ where
#[inline]
fn inject_fully_negotiated_outbound(
&mut self,
protocol: <Self::OutboundProtocol as OutboundUpgrade<Negotiated<Self::Substream>>>::Output,
protocol: <Self::OutboundProtocol as OutboundUpgradeSend>::Output,
info: Self::OutboundOpenInfo
) {
self.inner.inject_fully_negotiated_outbound(protocol, info)
@ -86,7 +88,7 @@ where
}
#[inline]
fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgrade<Negotiated<Self::Substream>>>::Error>) {
fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgradeSend>::Error>) {
self.inner.inject_dial_upgrade_error(info, error)
}

View File

@ -18,6 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::upgrade::SendWrapper;
use crate::protocols_handler::{
KeepAlive,
ProtocolsHandler,
@ -25,10 +26,13 @@ use crate::protocols_handler::{
ProtocolsHandlerEvent,
ProtocolsHandlerUpgrErr
};
use futures::prelude::*;
use libp2p_core::{
ConnectedPoint,
PeerId,
muxing::StreamMuxerBox,
nodes::Substream,
nodes::collection::ConnectionInfo,
nodes::handled_node::{IntoNodeHandler, NodeHandler, NodeHandlerEndpoint, NodeHandlerEvent},
upgrade::{self, InboundUpgradeApply, OutboundUpgradeApply}
@ -102,17 +106,17 @@ where
handler: TProtoHandler,
/// Futures that upgrade incoming substreams.
negotiating_in:
Vec<(InboundUpgradeApply<TProtoHandler::Substream, TProtoHandler::InboundProtocol>, Delay)>,
Vec<(InboundUpgradeApply<Substream<StreamMuxerBox>, SendWrapper<TProtoHandler::InboundProtocol>>, Delay)>,
/// Futures that upgrade outgoing substreams. The first element of the tuple is the userdata
/// to pass back once successfully opened.
negotiating_out: Vec<(
TProtoHandler::OutboundOpenInfo,
OutboundUpgradeApply<TProtoHandler::Substream, TProtoHandler::OutboundProtocol>,
OutboundUpgradeApply<Substream<StreamMuxerBox>, SendWrapper<TProtoHandler::OutboundProtocol>>,
Delay,
)>,
/// For each outbound substream request, how to upgrade it. The first element of the tuple
/// is the unique identifier (see `unique_dial_upgrade_id`).
queued_dial_upgrades: Vec<(u64, (upgrade::Version, TProtoHandler::OutboundProtocol))>,
queued_dial_upgrades: Vec<(u64, (upgrade::Version, SendWrapper<TProtoHandler::OutboundProtocol>))>,
/// Unique identifier assigned to each queued dial upgrade.
unique_dial_upgrade_id: u64,
/// The currently planned connection & handler shutdown.
@ -184,7 +188,7 @@ where
type InEvent = TProtoHandler::InEvent;
type OutEvent = TProtoHandler::OutEvent;
type Error = NodeHandlerWrapperError<TProtoHandler::Error>;
type Substream = TProtoHandler::Substream;
type Substream = Substream<StreamMuxerBox>;
// The first element of the tuple is the unique upgrade identifier
// (see `unique_dial_upgrade_id`).
type OutboundOpenInfo = (u64, TProtoHandler::OutboundOpenInfo, Duration);
@ -198,7 +202,7 @@ where
NodeHandlerEndpoint::Listener => {
let protocol = self.handler.listen_protocol();
let timeout = protocol.timeout().clone();
let upgrade = upgrade::apply_inbound(substream, protocol.into_upgrade().1);
let upgrade = upgrade::apply_inbound(substream, SendWrapper(protocol.into_upgrade().1));
let timeout = Delay::new(timeout);
self.negotiating_in.push((upgrade, timeout));
}
@ -305,7 +309,8 @@ where
let id = self.unique_dial_upgrade_id;
let timeout = protocol.timeout().clone();
self.unique_dial_upgrade_id += 1;
self.queued_dial_upgrades.push((id, protocol.into_upgrade()));
let (version, upgrade) = protocol.into_upgrade();
self.queued_dial_upgrades.push((id, (version, SendWrapper(upgrade))));
return Poll::Ready(Ok(
NodeHandlerEvent::OutboundSubstreamRequest((id, info, timeout)),
));

View File

@ -18,6 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend};
use crate::protocols_handler::{
KeepAlive,
ProtocolsHandler,
@ -25,25 +26,24 @@ use crate::protocols_handler::{
ProtocolsHandlerUpgrErr,
SubstreamProtocol
};
use futures::prelude::*;
use libp2p_core::{Negotiated, upgrade::{InboundUpgrade, OutboundUpgrade}};
use smallvec::SmallVec;
use std::{error, marker::PhantomData, task::Context, task::Poll, time::Duration};
use std::{error, task::Context, task::Poll, time::Duration};
use wasm_timer::Instant;
/// Implementation of `ProtocolsHandler` that opens a new substream for each individual message.
///
/// This struct is meant to be a helper for other implementations to use.
// TODO: Debug
pub struct OneShotHandler<TSubstream, TInProto, TOutProto, TOutEvent>
pub struct OneShotHandler<TInProto, TOutProto, TOutEvent>
where
TOutProto: OutboundUpgrade<Negotiated<TSubstream>>,
TOutProto: OutboundUpgradeSend,
{
/// The upgrade for inbound substreams.
listen_protocol: SubstreamProtocol<TInProto>,
/// If `Some`, something bad happened and we should shut down the handler with an error.
pending_error:
Option<ProtocolsHandlerUpgrErr<<TOutProto as OutboundUpgrade<Negotiated<TSubstream>>>::Error>>,
Option<ProtocolsHandlerUpgrErr<<TOutProto as OutboundUpgradeSend>::Error>>,
/// Queue of events to produce in `poll()`.
events_out: SmallVec<[TOutEvent; 4]>,
/// Queue of outbound substreams to open.
@ -56,14 +56,12 @@ where
keep_alive: KeepAlive,
/// After the given duration has elapsed, an inactive connection will shutdown.
inactive_timeout: Duration,
/// Pin the `TSubstream` generic.
marker: PhantomData<TSubstream>,
}
impl<TSubstream, TInProto, TOutProto, TOutEvent>
OneShotHandler<TSubstream, TInProto, TOutProto, TOutEvent>
impl<TInProto, TOutProto, TOutEvent>
OneShotHandler<TInProto, TOutProto, TOutEvent>
where
TOutProto: OutboundUpgrade<Negotiated<TSubstream>>,
TOutProto: OutboundUpgradeSend,
{
/// Creates a `OneShotHandler`.
#[inline]
@ -80,7 +78,6 @@ where
max_dial_negotiated: 8,
keep_alive: KeepAlive::Yes,
inactive_timeout,
marker: PhantomData,
}
}
@ -116,11 +113,11 @@ where
}
}
impl<TSubstream, TInProto, TOutProto, TOutEvent> Default
for OneShotHandler<TSubstream, TInProto, TOutProto, TOutEvent>
impl<TInProto, TOutProto, TOutEvent> Default
for OneShotHandler<TInProto, TOutProto, TOutEvent>
where
TOutProto: OutboundUpgrade<Negotiated<TSubstream>>,
TInProto: InboundUpgrade<Negotiated<TSubstream>> + Default,
TOutProto: OutboundUpgradeSend,
TInProto: InboundUpgradeSend + Default,
{
#[inline]
fn default() -> Self {
@ -128,23 +125,22 @@ where
}
}
impl<TSubstream, TInProto, TOutProto, TOutEvent> ProtocolsHandler
for OneShotHandler<TSubstream, TInProto, TOutProto, TOutEvent>
impl<TInProto, TOutProto, TOutEvent> ProtocolsHandler
for OneShotHandler<TInProto, TOutProto, TOutEvent>
where
TSubstream: AsyncRead + AsyncWrite + Unpin,
TInProto: InboundUpgrade<Negotiated<TSubstream>>,
TOutProto: OutboundUpgrade<Negotiated<TSubstream>>,
TInProto: InboundUpgradeSend + Send + 'static,
TOutProto: OutboundUpgradeSend,
TInProto::Output: Into<TOutEvent>,
TOutProto::Output: Into<TOutEvent>,
TOutProto::Error: error::Error + 'static,
TOutProto::Error: error::Error + Send + 'static,
SubstreamProtocol<TInProto>: Clone,
TOutEvent: Send + 'static,
{
type InEvent = TOutProto;
type OutEvent = TOutEvent;
type Error = ProtocolsHandlerUpgrErr<
<Self::OutboundProtocol as OutboundUpgrade<Negotiated<Self::Substream>>>::Error,
<Self::OutboundProtocol as OutboundUpgradeSend>::Error,
>;
type Substream = TSubstream;
type InboundProtocol = TInProto;
type OutboundProtocol = TOutProto;
type OutboundOpenInfo = ();
@ -157,7 +153,7 @@ where
#[inline]
fn inject_fully_negotiated_inbound(
&mut self,
out: <Self::InboundProtocol as InboundUpgrade<Negotiated<Self::Substream>>>::Output,
out: <Self::InboundProtocol as InboundUpgradeSend>::Output,
) {
// If we're shutting down the connection for inactivity, reset the timeout.
if !self.keep_alive.is_yes() {
@ -170,7 +166,7 @@ where
#[inline]
fn inject_fully_negotiated_outbound(
&mut self,
out: <Self::OutboundProtocol as OutboundUpgrade<Negotiated<Self::Substream>>>::Output,
out: <Self::OutboundProtocol as OutboundUpgradeSend>::Output,
_: Self::OutboundOpenInfo,
) {
self.dial_negotiated -= 1;
@ -192,7 +188,7 @@ where
&mut self,
_: Self::OutboundOpenInfo,
error: ProtocolsHandlerUpgrErr<
<Self::OutboundProtocol as OutboundUpgrade<Negotiated<Self::Substream>>>::Error,
<Self::OutboundProtocol as OutboundUpgradeSend>::Error,
>,
) {
if self.pending_error.is_none() {

View File

@ -18,6 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::upgrade::{SendWrapper, InboundUpgradeSend, OutboundUpgradeSend};
use crate::protocols_handler::{
KeepAlive,
SubstreamProtocol,
@ -26,13 +27,12 @@ use crate::protocols_handler::{
ProtocolsHandlerEvent,
ProtocolsHandlerUpgrErr,
};
use futures::prelude::*;
use libp2p_core::{
ConnectedPoint,
Negotiated,
PeerId,
either::{EitherError, EitherOutput},
upgrade::{InboundUpgrade, OutboundUpgrade, EitherUpgrade, SelectUpgrade, UpgradeError}
upgrade::{EitherUpgrade, SelectUpgrade, UpgradeError}
};
use std::{cmp, task::Context, task::Poll};
@ -56,17 +56,10 @@ impl<TProto1, TProto2> IntoProtocolsHandlerSelect<TProto1, TProto2> {
}
}
impl<TProto1, TProto2, TSubstream> IntoProtocolsHandler for IntoProtocolsHandlerSelect<TProto1, TProto2>
impl<TProto1, TProto2> IntoProtocolsHandler for IntoProtocolsHandlerSelect<TProto1, TProto2>
where
TProto1: IntoProtocolsHandler,
TProto2: IntoProtocolsHandler,
TProto1::Handler: ProtocolsHandler<Substream = TSubstream>,
TProto2::Handler: ProtocolsHandler<Substream = TSubstream>,
TSubstream: AsyncRead + AsyncWrite + Unpin,
<TProto1::Handler as ProtocolsHandler>::InboundProtocol: InboundUpgrade<Negotiated<TSubstream>>,
<TProto2::Handler as ProtocolsHandler>::InboundProtocol: InboundUpgrade<Negotiated<TSubstream>>,
<TProto1::Handler as ProtocolsHandler>::OutboundProtocol: OutboundUpgrade<Negotiated<TSubstream>>,
<TProto2::Handler as ProtocolsHandler>::OutboundProtocol: OutboundUpgrade<Negotiated<TSubstream>>
{
type Handler = ProtocolsHandlerSelect<TProto1::Handler, TProto2::Handler>;
@ -78,7 +71,7 @@ where
}
fn inbound_protocol(&self) -> <Self::Handler as ProtocolsHandler>::InboundProtocol {
SelectUpgrade::new(self.proto1.inbound_protocol(), self.proto2.inbound_protocol())
SelectUpgrade::new(SendWrapper(self.proto1.inbound_protocol()), SendWrapper(self.proto2.inbound_protocol()))
}
}
@ -102,23 +95,16 @@ impl<TProto1, TProto2> ProtocolsHandlerSelect<TProto1, TProto2> {
}
}
impl<TSubstream, TProto1, TProto2>
ProtocolsHandler for ProtocolsHandlerSelect<TProto1, TProto2>
impl<TProto1, TProto2> ProtocolsHandler for ProtocolsHandlerSelect<TProto1, TProto2>
where
TProto1: ProtocolsHandler<Substream = TSubstream>,
TProto2: ProtocolsHandler<Substream = TSubstream>,
TSubstream: AsyncRead + AsyncWrite + Unpin,
TProto1::InboundProtocol: InboundUpgrade<Negotiated<TSubstream>>,
TProto2::InboundProtocol: InboundUpgrade<Negotiated<TSubstream>>,
TProto1::OutboundProtocol: OutboundUpgrade<Negotiated<TSubstream>>,
TProto2::OutboundProtocol: OutboundUpgrade<Negotiated<TSubstream>>
TProto1: ProtocolsHandler,
TProto2: ProtocolsHandler,
{
type InEvent = EitherOutput<TProto1::InEvent, TProto2::InEvent>;
type OutEvent = EitherOutput<TProto1::OutEvent, TProto2::OutEvent>;
type Error = EitherError<TProto1::Error, TProto2::Error>;
type Substream = TSubstream;
type InboundProtocol = SelectUpgrade<<TProto1 as ProtocolsHandler>::InboundProtocol, <TProto2 as ProtocolsHandler>::InboundProtocol>;
type OutboundProtocol = EitherUpgrade<TProto1::OutboundProtocol, TProto2::OutboundProtocol>;
type InboundProtocol = SelectUpgrade<SendWrapper<<TProto1 as ProtocolsHandler>::InboundProtocol>, SendWrapper<<TProto2 as ProtocolsHandler>::InboundProtocol>>;
type OutboundProtocol = EitherUpgrade<SendWrapper<TProto1::OutboundProtocol>, SendWrapper<TProto2::OutboundProtocol>>;
type OutboundOpenInfo = EitherOutput<TProto1::OutboundOpenInfo, TProto2::OutboundOpenInfo>;
#[inline]
@ -126,11 +112,11 @@ where
let proto1 = self.proto1.listen_protocol();
let proto2 = self.proto2.listen_protocol();
let timeout = std::cmp::max(proto1.timeout(), proto2.timeout()).clone();
let choice = SelectUpgrade::new(proto1.into_upgrade().1, proto2.into_upgrade().1);
let choice = SelectUpgrade::new(SendWrapper(proto1.into_upgrade().1), SendWrapper(proto2.into_upgrade().1));
SubstreamProtocol::new(choice).with_timeout(timeout)
}
fn inject_fully_negotiated_outbound(&mut self, protocol: <Self::OutboundProtocol as OutboundUpgrade<Negotiated<TSubstream>>>::Output, endpoint: Self::OutboundOpenInfo) {
fn inject_fully_negotiated_outbound(&mut self, protocol: <Self::OutboundProtocol as OutboundUpgradeSend>::Output, endpoint: Self::OutboundOpenInfo) {
match (protocol, endpoint) {
(EitherOutput::First(protocol), EitherOutput::First(info)) =>
self.proto1.inject_fully_negotiated_outbound(protocol, info),
@ -143,7 +129,7 @@ where
}
}
fn inject_fully_negotiated_inbound(&mut self, protocol: <Self::InboundProtocol as InboundUpgrade<Negotiated<TSubstream>>>::Output) {
fn inject_fully_negotiated_inbound(&mut self, protocol: <Self::InboundProtocol as InboundUpgradeSend>::Output) {
match protocol {
EitherOutput::First(protocol) =>
self.proto1.inject_fully_negotiated_inbound(protocol),
@ -161,7 +147,7 @@ where
}
#[inline]
fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgrade<Negotiated<Self::Substream>>>::Error>) {
fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, error: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgradeSend>::Error>) {
match (info, error) {
(EitherOutput::First(info), ProtocolsHandlerUpgrErr::Timer) => {
self.proto1.inject_dial_upgrade_error(info, ProtocolsHandlerUpgrErr::Timer)
@ -215,7 +201,7 @@ where
info,
}) => {
return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: protocol.map_upgrade(EitherUpgrade::A),
protocol: protocol.map_upgrade(|u| EitherUpgrade::A(SendWrapper(u))),
info: EitherOutput::First(info),
});
},
@ -234,7 +220,7 @@ where
info,
}) => {
return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest {
protocol: protocol.map_upgrade(EitherUpgrade::B),
protocol: protocol.map_upgrade(|u| EitherUpgrade::B(SendWrapper(u))),
info: EitherOutput::Second(info),
});
},

View File

@ -19,6 +19,7 @@
// DEALINGS IN THE SOFTWARE.
use crate::{NetworkBehaviour, NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters};
use crate::upgrade::{SendWrapper, InboundUpgradeSend, OutboundUpgradeSend};
use crate::protocols_handler::{
KeepAlive,
SubstreamProtocol,
@ -27,13 +28,13 @@ use crate::protocols_handler::{
ProtocolsHandlerUpgrErr,
IntoProtocolsHandler
};
use libp2p_core::{
ConnectedPoint,
PeerId,
Multiaddr,
Negotiated,
either::EitherOutput,
upgrade::{InboundUpgrade, OutboundUpgrade, DeniedUpgrade, EitherUpgrade}
upgrade::{DeniedUpgrade, EitherUpgrade}
};
use std::{error, task::Context, task::Poll};
@ -173,9 +174,9 @@ where
fn inbound_protocol(&self) -> <Self::Handler as ProtocolsHandler>::InboundProtocol {
if let Some(inner) = self.inner.as_ref() {
EitherUpgrade::A(inner.inbound_protocol())
EitherUpgrade::A(SendWrapper(inner.inbound_protocol()))
} else {
EitherUpgrade::B(DeniedUpgrade)
EitherUpgrade::B(SendWrapper(DeniedUpgrade))
}
}
}
@ -192,22 +193,21 @@ where
type InEvent = TInner::InEvent;
type OutEvent = TInner::OutEvent;
type Error = TInner::Error;
type Substream = TInner::Substream;
type InboundProtocol = EitherUpgrade<TInner::InboundProtocol, DeniedUpgrade>;
type InboundProtocol = EitherUpgrade<SendWrapper<TInner::InboundProtocol>, SendWrapper<DeniedUpgrade>>;
type OutboundProtocol = TInner::OutboundProtocol;
type OutboundOpenInfo = TInner::OutboundOpenInfo;
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol> {
if let Some(inner) = self.inner.as_ref() {
inner.listen_protocol().map_upgrade(EitherUpgrade::A)
inner.listen_protocol().map_upgrade(|u| EitherUpgrade::A(SendWrapper(u)))
} else {
SubstreamProtocol::new(EitherUpgrade::B(DeniedUpgrade))
SubstreamProtocol::new(EitherUpgrade::B(SendWrapper(DeniedUpgrade)))
}
}
fn inject_fully_negotiated_inbound(
&mut self,
out: <Self::InboundProtocol as InboundUpgrade<Negotiated<Self::Substream>>>::Output
out: <Self::InboundProtocol as InboundUpgradeSend>::Output
) {
let out = match out {
EitherOutput::First(out) => out,
@ -220,7 +220,7 @@ where
fn inject_fully_negotiated_outbound(
&mut self,
out: <Self::OutboundProtocol as OutboundUpgrade<Negotiated<Self::Substream>>>::Output,
out: <Self::OutboundProtocol as OutboundUpgradeSend>::Output,
info: Self::OutboundOpenInfo
) {
self.inner.as_mut().expect("Can't receive an outbound substream if disabled; QED")
@ -232,7 +232,7 @@ where
.inject_event(event)
}
fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, err: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgrade<Negotiated<Self::Substream>>>::Error>) {
fn inject_dial_upgrade_error(&mut self, info: Self::OutboundOpenInfo, err: ProtocolsHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgradeSend>::Error>) {
self.inner.as_mut().expect("Can't receive an outbound substream if disabled; QED")
.inject_dial_upgrade_error(info, err)
}

158
swarm/src/upgrade.rs Normal file
View File

@ -0,0 +1,158 @@
// Copyright 2020 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::NegotiatedSubstream;
use futures::prelude::*;
use libp2p_core::upgrade;
/// Implemented automatically on all types that implement [`UpgradeInfo`](upgrade::UpgradeInfo)
/// and `Send + 'static`.
///
/// Do not implement this trait yourself. Instead, please implement
/// [`UpgradeInfo`](upgrade::UpgradeInfo).
pub trait UpgradeInfoSend: Send + 'static {
/// Equivalent to [`UpgradeInfo::Info`](upgrade::UpgradeInfo::Info).
type Info: upgrade::ProtocolName + Clone + Send + 'static;
/// Equivalent to [`UpgradeInfo::InfoIter`](upgrade::UpgradeInfo::InfoIter).
type InfoIter: Iterator<Item = Self::Info> + Send + 'static;
/// Equivalent to [`UpgradeInfo::protocol_info`](upgrade::UpgradeInfo::protocol_info).
fn protocol_info(&self) -> Self::InfoIter;
}
impl<T> UpgradeInfoSend for T
where
T: upgrade::UpgradeInfo + Send + 'static,
T::Info: Send + 'static,
<T::InfoIter as IntoIterator>::IntoIter: Send + 'static,
{
type Info = T::Info;
type InfoIter = <T::InfoIter as IntoIterator>::IntoIter;
fn protocol_info(&self) -> Self::InfoIter {
upgrade::UpgradeInfo::protocol_info(self).into_iter()
}
}
/// Implemented automatically on all types that implement
/// [`OutboundUpgrade`](upgrade::OutboundUpgrade) and `Send + 'static`.
///
/// Do not implement this trait yourself. Instead, please implement
/// [`OutboundUpgrade`](upgrade::OutboundUpgrade).
pub trait OutboundUpgradeSend: UpgradeInfoSend {
/// Equivalent to [`OutboundUpgrade::Output`](upgrade::OutboundUpgrade::Output).
type Output: Send + 'static;
/// Equivalent to [`OutboundUpgrade::Error`](upgrade::OutboundUpgrade::Error).
type Error: Send + 'static;
/// Equivalent to [`OutboundUpgrade::Future`](upgrade::OutboundUpgrade::Future).
type Future: Future<Output = Result<Self::Output, Self::Error>> + Send + 'static;
/// Equivalent to [`OutboundUpgrade::upgrade_outbound`](upgrade::OutboundUpgrade::upgrade_outbound).
fn upgrade_outbound(self, socket: NegotiatedSubstream, info: Self::Info) -> Self::Future;
}
impl<T, TInfo> OutboundUpgradeSend for T
where
T: upgrade::OutboundUpgrade<NegotiatedSubstream, Info = TInfo> + UpgradeInfoSend<Info = TInfo>,
TInfo: upgrade::ProtocolName + Clone + Send + 'static,
T::Output: Send + 'static,
T::Error: Send + 'static,
T::Future: Send + 'static,
{
type Output = T::Output;
type Error = T::Error;
type Future = T::Future;
fn upgrade_outbound(self, socket: NegotiatedSubstream, info: TInfo) -> Self::Future {
upgrade::OutboundUpgrade::upgrade_outbound(self, socket, info)
}
}
/// Implemented automatically on all types that implement
/// [`InboundUpgrade`](upgrade::InboundUpgrade) and `Send + 'static`.
///
/// Do not implement this trait yourself. Instead, please implement
/// [`InboundUpgrade`](upgrade::InboundUpgrade).
pub trait InboundUpgradeSend: UpgradeInfoSend {
/// Equivalent to [`InboundUpgrade::Output`](upgrade::InboundUpgrade::Output).
type Output: Send + 'static;
/// Equivalent to [`InboundUpgrade::Error`](upgrade::InboundUpgrade::Error).
type Error: Send + 'static;
/// Equivalent to [`InboundUpgrade::Future`](upgrade::InboundUpgrade::Future).
type Future: Future<Output = Result<Self::Output, Self::Error>> + Send + 'static;
/// Equivalent to [`InboundUpgrade::upgrade_inbound`](upgrade::InboundUpgrade::upgrade_inbound).
fn upgrade_inbound(self, socket: NegotiatedSubstream, info: Self::Info) -> Self::Future;
}
impl<T, TInfo> InboundUpgradeSend for T
where
T: upgrade::InboundUpgrade<NegotiatedSubstream, Info = TInfo> + UpgradeInfoSend<Info = TInfo>,
TInfo: upgrade::ProtocolName + Clone + Send + 'static,
T::Output: Send + 'static,
T::Error: Send + 'static,
T::Future: Send + 'static,
{
type Output = T::Output;
type Error = T::Error;
type Future = T::Future;
fn upgrade_inbound(self, socket: NegotiatedSubstream, info: TInfo) -> Self::Future {
upgrade::InboundUpgrade::upgrade_inbound(self, socket, info)
}
}
/// Wraps around a type that implements [`OutboundUpgradeSend`], [`InboundUpgradeSend`], or
/// both, and implements [`OutboundUpgrade`](upgrade::OutboundUpgrade) and/or
/// [`InboundUpgrade`](upgrade::InboundUpgrade).
///
/// > **Note**: This struct is mostly an implementation detail of the library and normally
/// > doesn't need to be used directly.
pub struct SendWrapper<T>(pub T);
impl<T: UpgradeInfoSend> upgrade::UpgradeInfo for SendWrapper<T> {
type Info = T::Info;
type InfoIter = T::InfoIter;
fn protocol_info(&self) -> Self::InfoIter {
UpgradeInfoSend::protocol_info(&self.0)
}
}
impl<T: OutboundUpgradeSend> upgrade::OutboundUpgrade<NegotiatedSubstream> for SendWrapper<T> {
type Output = T::Output;
type Error = T::Error;
type Future = T::Future;
fn upgrade_outbound(self, socket: NegotiatedSubstream, info: T::Info) -> Self::Future {
OutboundUpgradeSend::upgrade_outbound(self.0, socket, info)
}
}
impl<T: InboundUpgradeSend> upgrade::InboundUpgrade<NegotiatedSubstream> for SendWrapper<T> {
type Output = T::Output;
type Error = T::Error;
type Future = T::Future;
fn upgrade_inbound(self, socket: NegotiatedSubstream, info: T::Info) -> Self::Future {
InboundUpgradeSend::upgrade_inbound(self.0, socket, info)
}
}