refactor(swarm): remove deprecated inject calls (#3264)

Finishes work first started with https://github.com/libp2p/rust-libp2p/issues/2832
This commit is contained in:
João Oliveira
2023-01-12 11:21:02 +00:00
committed by GitHub
parent 3cc824796d
commit 4c65c7d7c7
23 changed files with 1177 additions and 1376 deletions

View File

@ -21,11 +21,11 @@
//! [`ConnectionHandler`] handling direct connection upgraded through a relayed connection. //! [`ConnectionHandler`] handling direct connection upgraded through a relayed connection.
use libp2p_core::connection::ConnectionId; use libp2p_core::connection::ConnectionId;
use libp2p_core::upgrade::{DeniedUpgrade, InboundUpgrade, OutboundUpgrade}; use libp2p_core::upgrade::DeniedUpgrade;
use libp2p_swarm::handler::ConnectionEvent; use libp2p_swarm::handler::ConnectionEvent;
use libp2p_swarm::{ use libp2p_swarm::{
ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive,
NegotiatedSubstream, SubstreamProtocol, SubstreamProtocol,
}; };
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use void::Void; use void::Void;
@ -62,31 +62,8 @@ impl ConnectionHandler for Handler {
SubstreamProtocol::new(DeniedUpgrade, ()) SubstreamProtocol::new(DeniedUpgrade, ())
} }
fn inject_fully_negotiated_inbound(
&mut self,
_: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
_: Self::InboundOpenInfo,
) {
}
fn inject_fully_negotiated_outbound(
&mut self,
_: <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output,
_: Self::OutboundOpenInfo,
) {
}
fn on_behaviour_event(&mut self, _: Self::InEvent) {} fn on_behaviour_event(&mut self, _: Self::InEvent) {}
fn inject_dial_upgrade_error(
&mut self,
_: Self::OutboundOpenInfo,
_: ConnectionHandlerUpgrErr<
<Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Error,
>,
) {
}
fn connection_keep_alive(&self) -> KeepAlive { fn connection_keep_alive(&self) -> KeepAlive {
KeepAlive::No KeepAlive::No
} }

View File

@ -145,7 +145,7 @@ impl Behaviour {
} }
} }
hash_map::Entry::Vacant(_) => { hash_map::Entry::Vacant(_) => {
unreachable!("`inject_connection_closed` for unconnected peer.") unreachable!("`on_connection_closed` for unconnected peer.")
} }
}; };
} }

View File

@ -97,7 +97,7 @@ impl SubstreamHandler for Stream {
Stream::PendingRead(Framed::new(substream, RendezvousCodec::default())) Stream::PendingRead(Framed::new(substream, RendezvousCodec::default()))
} }
fn inject_event(self, event: Self::InEvent) -> Self { fn on_event(self, event: Self::InEvent) -> Self {
match (event, self) { match (event, self) {
(InEvent::RegisterResponse { ttl }, Stream::PendingBehaviour(substream)) => { (InEvent::RegisterResponse { ttl }, Stream::PendingBehaviour(substream)) => {
Stream::PendingSend(substream, Message::RegisterResponse(Ok(ttl))) Stream::PendingSend(substream, Message::RegisterResponse(Ok(ttl)))

View File

@ -94,7 +94,7 @@ impl SubstreamHandler for Stream {
})) }))
} }
fn inject_event(self, event: Self::InEvent) -> Self { fn on_event(self, event: Self::InEvent) -> Self {
void::unreachable(event) void::unreachable(event)
} }

View File

@ -29,10 +29,9 @@ use futures::future::{self, BoxFuture, Fuse, FusedFuture};
use futures::FutureExt; use futures::FutureExt;
use instant::Instant; use instant::Instant;
use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
use libp2p_swarm::handler::{InboundUpgradeSend, OutboundUpgradeSend}; use libp2p_swarm::handler::{ConnectionEvent, FullyNegotiatedInbound, FullyNegotiatedOutbound};
use libp2p_swarm::{ use libp2p_swarm::{
ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, ConnectionHandler, ConnectionHandlerEvent, KeepAlive, NegotiatedSubstream, SubstreamProtocol,
NegotiatedSubstream, SubstreamProtocol,
}; };
use std::collections::{HashMap, VecDeque}; use std::collections::{HashMap, VecDeque};
use std::fmt; use std::fmt;
@ -52,7 +51,7 @@ pub trait SubstreamHandler: Sized {
fn upgrade(open_info: Self::OpenInfo) fn upgrade(open_info: Self::OpenInfo)
-> SubstreamProtocol<PassthroughProtocol, Self::OpenInfo>; -> SubstreamProtocol<PassthroughProtocol, Self::OpenInfo>;
fn new(substream: NegotiatedSubstream, info: Self::OpenInfo) -> Self; fn new(substream: NegotiatedSubstream, info: Self::OpenInfo) -> Self;
fn inject_event(self, event: Self::InEvent) -> Self; fn on_event(self, event: Self::InEvent) -> Self;
fn advance(self, cx: &mut Context<'_>) -> Result<Next<Self, Self::OutEvent>, Self::Error>; fn advance(self, cx: &mut Context<'_>) -> Result<Next<Self, Self::OutEvent>, Self::Error>;
} }
@ -367,35 +366,47 @@ where
TInboundSubstreamHandler::upgrade(()) TInboundSubstreamHandler::upgrade(())
} }
fn inject_fully_negotiated_inbound( fn on_connection_event(
&mut self, &mut self,
protocol: <Self::InboundProtocol as InboundUpgradeSend>::Output, event: ConnectionEvent<
_: Self::InboundOpenInfo, Self::InboundProtocol,
Self::OutboundProtocol,
Self::InboundOpenInfo,
Self::OutboundOpenInfo,
>,
) { ) {
self.inbound_substreams.insert( match event {
self.next_inbound_substream_id.fetch_and_increment(), ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound {
TInboundSubstreamHandler::new(protocol, ()), protocol, ..
); }) => {
self.inbound_substreams.insert(
self.next_inbound_substream_id.fetch_and_increment(),
TInboundSubstreamHandler::new(protocol, ()),
);
}
ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound {
protocol,
info,
}) => {
self.outbound_substreams.insert(
self.next_outbound_substream_id.fetch_and_increment(),
TOutboundSubstreamHandler::new(protocol, info),
);
}
// TODO: Handle upgrade errors properly
ConnectionEvent::AddressChange(_)
| ConnectionEvent::ListenUpgradeError(_)
| ConnectionEvent::DialUpgradeError(_) => {}
}
} }
fn inject_fully_negotiated_outbound( fn on_behaviour_event(&mut self, event: Self::InEvent) {
&mut self,
protocol: <Self::OutboundProtocol as OutboundUpgradeSend>::Output,
info: Self::OutboundOpenInfo,
) {
self.outbound_substreams.insert(
self.next_outbound_substream_id.fetch_and_increment(),
TOutboundSubstreamHandler::new(protocol, info),
);
}
fn inject_event(&mut self, event: Self::InEvent) {
match event { match event {
InEvent::NewSubstream { open_info } => self.new_substreams.push_back(open_info), InEvent::NewSubstream { open_info } => self.new_substreams.push_back(open_info),
InEvent::NotifyInboundSubstream { id, message } => { InEvent::NotifyInboundSubstream { id, message } => {
match self.inbound_substreams.remove(&id) { match self.inbound_substreams.remove(&id) {
Some(handler) => { Some(handler) => {
let new_handler = handler.inject_event(message); let new_handler = handler.on_event(message);
self.inbound_substreams.insert(id, new_handler); self.inbound_substreams.insert(id, new_handler);
} }
@ -407,7 +418,7 @@ where
InEvent::NotifyOutboundSubstream { id, message } => { InEvent::NotifyOutboundSubstream { id, message } => {
match self.outbound_substreams.remove(&id) { match self.outbound_substreams.remove(&id) {
Some(handler) => { Some(handler) => {
let new_handler = handler.inject_event(message); let new_handler = handler.on_event(message);
self.outbound_substreams.insert(id, new_handler); self.outbound_substreams.insert(id, new_handler);
} }
@ -419,14 +430,6 @@ where
} }
} }
fn inject_dial_upgrade_error(
&mut self,
_: Self::OutboundOpenInfo,
_: ConnectionHandlerUpgrErr<Void>,
) {
// TODO: Handle upgrade errors properly
}
fn connection_keep_alive(&self) -> KeepAlive { fn connection_keep_alive(&self) -> KeepAlive {
// Rudimentary keep-alive handling, to be extended as needed as this abstraction is used more by other protocols. // Rudimentary keep-alive handling, to be extended as needed as this abstraction is used more by other protocols.
@ -537,7 +540,7 @@ impl SubstreamHandler for void::Void {
unreachable!("we should never yield a substream") unreachable!("we should never yield a substream")
} }
fn inject_event(self, event: Self::InEvent) -> Self { fn on_event(self, event: Self::InEvent) -> Self {
void::unreachable(event) void::unreachable(event)
} }

View File

@ -366,7 +366,7 @@ where
Err(oneshot::Canceled) => { Err(oneshot::Canceled) => {
// The inbound upgrade has errored or timed out reading // The inbound upgrade has errored or timed out reading
// or waiting for the request. The handler is informed // or waiting for the request. The handler is informed
// via `inject_listen_upgrade_error`. // via `on_connection_event` call with `ConnectionEvent::ListenUpgradeError`.
} }
} }
} }

View File

@ -1,15 +1,19 @@
# 0.31.0 # 0.32.0 [unreleased]
- Replace `NetworkBehaviour` Derive macro deprecated `inject_*` method implementations - Replace `NetworkBehaviour` Derive macro deprecated `inject_*` method implementations
with the new `on_swarm_event` and `on_connection_handler_event`. with the new `on_swarm_event` and `on_connection_handler_event`.
See [PR 3011]. See [PR 3011] and [PR 3264].
[PR 3011]: https://github.com/libp2p/rust-libp2p/pull/3011
[PR 3264]: https://github.com/libp2p/rust-libp2p/pull/3264
# 0.31.0
- Add `prelude` configuration option. - Add `prelude` configuration option.
The derive-macro generates code that needs to refer to various symbols. See [PR 3055]. The derive-macro generates code that needs to refer to various symbols. See [PR 3055].
- Update `rust-version` to reflect the actual MSRV: 1.60.0. See [PR 3090]. - Update `rust-version` to reflect the actual MSRV: 1.60.0. See [PR 3090].
[PR 3011]: https://github.com/libp2p/rust-libp2p/pull/3011
[PR 3055]: https://github.com/libp2p/rust-libp2p/pull/3055 [PR 3055]: https://github.com/libp2p/rust-libp2p/pull/3055
[PR 3090]: https://github.com/libp2p/rust-libp2p/pull/3090 [PR 3090]: https://github.com/libp2p/rust-libp2p/pull/3090

View File

@ -3,7 +3,7 @@ name = "libp2p-swarm-derive"
edition = "2021" edition = "2021"
rust-version = "1.60.0" rust-version = "1.60.0"
description = "Procedural macros of libp2p-swarm" description = "Procedural macros of libp2p-swarm"
version = "0.31.0" version = "0.32.0"
authors = ["Parity Technologies <admin@parity.io>"] authors = ["Parity Technologies <admin@parity.io>"]
license = "MIT" license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p" repository = "https://github.com/libp2p/rust-libp2p"

View File

@ -188,11 +188,23 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
.enumerate() .enumerate()
.map(|(field_n, field)| match field.ident { .map(|(field_n, field)| match field.ident {
Some(ref i) => quote! { Some(ref i) => quote! {
#[allow(deprecated)] self.#i.on_swarm_event(#from_swarm::ConnectionEstablished(#connection_established {
self.#i.inject_connection_established(&peer_id, &connection_id, endpoint, Some(&failed_addresses.into()), other_established);}, peer_id,
connection_id,
endpoint,
failed_addresses,
other_established,
}));
},
None => quote! { None => quote! {
#[allow(deprecated)] self.#field_n.on_swarm_event(#from_swarm::ConnectionEstablished(#connection_established {
self.#field_n.inject_connection_established(&peer_id, &connection_id, endpoint, Some(&failed_addresses.into()), other_established);}, peer_id,
connection_id,
endpoint,
failed_addresses,
other_established,
}));
},
}) })
}; };
@ -205,18 +217,29 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
.enumerate() .enumerate()
.map(|(field_n, field)| match field.ident { .map(|(field_n, field)| match field.ident {
Some(ref i) => quote! { Some(ref i) => quote! {
#[allow(deprecated)] self.#i.on_swarm_event(#from_swarm::AddressChange(#address_change {
self.#i.inject_address_change(&peer_id, &connection_id, old, new);}, peer_id,
connection_id,
old,
new,
}));
},
None => quote! { None => quote! {
#[allow(deprecated)] self.#field_n.on_swarm_event(#from_swarm::AddressChange(#address_change {
self.#field_n.inject_address_change(&peer_id, &connection_id, old, new);}, peer_id,
connection_id,
old,
new,
}));
},
}) })
}; };
// Build the list of statements to put in the body of `on_swarm_event()` // Build the list of statements to put in the body of `on_swarm_event()`
// for the `FromSwarm::ConnectionClosed` variant. // for the `FromSwarm::ConnectionClosed` variant.
let on_connection_closed_stmts = { let on_connection_closed_stmts = {
data_struct.fields data_struct
.fields
.iter() .iter()
.enumerate() .enumerate()
// The outmost handler belongs to the last behaviour. // The outmost handler belongs to the last behaviour.
@ -233,11 +256,23 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
}; };
let inject = match field.ident { let inject = match field.ident {
Some(ref i) => quote! { Some(ref i) => quote! {
#[allow(deprecated)] self.#i.on_swarm_event(#from_swarm::ConnectionClosed(#connection_closed {
self.#i.inject_connection_closed(&peer_id, &connection_id, endpoint, handler, remaining_established);}, peer_id,
connection_id,
endpoint,
handler,
remaining_established,
}));
},
None => quote! { None => quote! {
#[allow(deprecated)] self.#enum_n.on_swarm_event(#from_swarm::ConnectionClosed(#connection_closed {
self.#enum_n.inject_connection_closed(&peer_id, &connection_id, endpoint, handler, remaining_established);}, peer_id,
connection_id,
endpoint,
handler,
remaining_established,
}));
},
}; };
quote! { quote! {
@ -269,13 +304,20 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
let inject = match field.ident { let inject = match field.ident {
Some(ref i) => quote! { Some(ref i) => quote! {
#[allow(deprecated)] self.#i.on_swarm_event(#from_swarm::DialFailure(#dial_failure {
self.#i.inject_dial_failure(peer_id, handler, error);}, peer_id,
handler,
error,
}));
},
None => quote! { None => quote! {
#[allow(deprecated)] self.#enum_n.on_swarm_event(#from_swarm::DialFailure(#dial_failure {
self.#enum_n.inject_dial_failure(peer_id, handler, error);}, peer_id,
handler,
error,
}));
},
}; };
quote! { quote! {
#handler; #handler;
#inject; #inject;
@ -299,11 +341,19 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
let inject = match field.ident { let inject = match field.ident {
Some(ref i) => quote! { Some(ref i) => quote! {
#[allow(deprecated)] self.#i.on_swarm_event(#from_swarm::ListenFailure(#listen_failure {
self.#i.inject_listen_failure(local_addr, send_back_addr, handler);}, local_addr,
send_back_addr,
handler,
}));
},
None => quote! { None => quote! {
#[allow(deprecated)] self.#enum_n.on_swarm_event(#from_swarm::ListenFailure(#listen_failure {
self.#enum_n.inject_listen_failure(local_addr, send_back_addr, handler);}, local_addr,
send_back_addr,
handler,
}));
},
}; };
quote! { quote! {
@ -323,11 +373,15 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
.enumerate() .enumerate()
.map(|(field_n, field)| match field.ident { .map(|(field_n, field)| match field.ident {
Some(ref i) => quote! { Some(ref i) => quote! {
#[allow(deprecated)] self.#i.on_swarm_event(#from_swarm::NewListener(#new_listener {
self.#i.inject_new_listener(listener_id);}, listener_id,
}));
},
None => quote! { None => quote! {
#[allow(deprecated)] self.#field_n.on_swarm_event(#from_swarm::NewListener(#new_listener {
self.#field_n.inject_new_listener(listener_id);}, listener_id,
}));
},
}) })
}; };
@ -340,11 +394,17 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
.enumerate() .enumerate()
.map(|(field_n, field)| match field.ident { .map(|(field_n, field)| match field.ident {
Some(ref i) => quote! { Some(ref i) => quote! {
#[allow(deprecated)] self.#i.on_swarm_event(#from_swarm::NewListenAddr(#new_listen_addr {
self.#i.inject_new_listen_addr(listener_id, addr);}, listener_id,
addr,
}));
},
None => quote! { None => quote! {
#[allow(deprecated)] self.#field_n.on_swarm_event(#from_swarm::NewListenAddr(#new_listen_addr {
self.#field_n.inject_new_listen_addr(listener_id, addr);}, listener_id,
addr,
}));
},
}) })
}; };
@ -357,11 +417,17 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
.enumerate() .enumerate()
.map(|(field_n, field)| match field.ident { .map(|(field_n, field)| match field.ident {
Some(ref i) => quote! { Some(ref i) => quote! {
#[allow(deprecated)] self.#i.on_swarm_event(#from_swarm::ExpiredListenAddr(#expired_listen_addr {
self.#i.inject_expired_listen_addr(listener_id, addr);}, listener_id,
addr,
}));
},
None => quote! { None => quote! {
#[allow(deprecated)] self.#field_n.on_swarm_event(#from_swarm::ExpiredListenAddr(#expired_listen_addr {
self.#field_n.inject_expired_listen_addr(listener_id, addr);}, listener_id,
addr,
}));
},
}) })
}; };
@ -374,11 +440,15 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
.enumerate() .enumerate()
.map(|(field_n, field)| match field.ident { .map(|(field_n, field)| match field.ident {
Some(ref i) => quote! { Some(ref i) => quote! {
#[allow(deprecated)] self.#i.on_swarm_event(#from_swarm::NewExternalAddr(#new_external_addr {
self.#i.inject_new_external_addr(addr);}, addr,
}));
},
None => quote! { None => quote! {
#[allow(deprecated)] self.#field_n.on_swarm_event(#from_swarm::NewExternalAddr(#new_external_addr {
self.#field_n.inject_new_external_addr(addr);}, addr,
}));
},
}) })
}; };
@ -391,11 +461,15 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
.enumerate() .enumerate()
.map(|(field_n, field)| match field.ident { .map(|(field_n, field)| match field.ident {
Some(ref i) => quote! { Some(ref i) => quote! {
#[allow(deprecated)] self.#i.on_swarm_event(#from_swarm::ExpiredExternalAddr(#expired_external_addr {
self.#i.inject_expired_external_addr(addr);}, addr,
}));
},
None => quote! { None => quote! {
#[allow(deprecated)] self.#field_n.on_swarm_event(#from_swarm::ExpiredExternalAddr(#expired_external_addr {
self.#field_n.inject_expired_external_addr(addr);}, addr,
}));
},
}) })
}; };
@ -408,11 +482,17 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
.enumerate() .enumerate()
.map(|(field_n, field)| match field.ident { .map(|(field_n, field)| match field.ident {
Some(ref i) => quote! { Some(ref i) => quote! {
#[allow(deprecated)] self.#i.on_swarm_event(#from_swarm::ListenerError(#listener_error {
self.#i.inject_listener_error(listener_id, err);}, listener_id,
err,
}));
},
None => quote! { None => quote! {
#[allow(deprecated)] self.#field_n.on_swarm_event(#from_swarm::ListenerError(#listener_error {
self.#field_n.inject_listener_error(listener_id, err);}, listener_id,
err,
}));
},
}) })
}; };
@ -425,11 +505,17 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
.enumerate() .enumerate()
.map(|(field_n, field)| match field.ident { .map(|(field_n, field)| match field.ident {
Some(ref i) => quote! { Some(ref i) => quote! {
#[allow(deprecated)] self.#i.on_swarm_event(#from_swarm::ListenerClosed(#listener_closed {
self.#i.inject_listener_closed(listener_id, reason);}, listener_id,
reason,
}));
},
None => quote! { None => quote! {
#[allow(deprecated)] self.#field_n.on_swarm_event(#from_swarm::ListenerClosed(#listener_closed {
self.#field_n.inject_listener_closed(listener_id, reason);}, listener_id,
reason,
}));
},
}) })
}; };
@ -456,11 +542,9 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
Some(match field.ident { Some(match field.ident {
Some(ref i) => quote! { #elem => { Some(ref i) => quote! { #elem => {
#[allow(deprecated)] #trait_to_impl::on_connection_handler_event(&mut self.#i, peer_id, connection_id, ev) }},
#trait_to_impl::inject_event(&mut self.#i, peer_id, connection_id, ev) }},
None => quote! { #elem => { None => quote! { #elem => {
#[allow(deprecated)] #trait_to_impl::on_connection_handler_event(&mut self.#field_n, peer_id, connection_id, ev) }},
#trait_to_impl::inject_event(&mut self.#field_n, peer_id, connection_id, ev) }},
}) })
}); });

View File

@ -7,12 +7,18 @@
- Add `estblished_in` to `SwarmEvent::ConnectionEstablished`. See [PR 3134]. - Add `estblished_in` to `SwarmEvent::ConnectionEstablished`. See [PR 3134].
- Remove deprecated `inject_*` methods from `NetworkBehaviour` and `ConnectionHandler`.
see [PR 3264].
- Update to `libp2p-swarm-derive` `v0.32.0`.
- Remove type parameter from `PendingOutboundConnectionError` and `PendingInboundConnectionError`. - Remove type parameter from `PendingOutboundConnectionError` and `PendingInboundConnectionError`.
These two types are always used with `std::io::Error`. See [PR 3272]. These two types are always used with `std::io::Error`. See [PR 3272].
[PR 3170]: https://github.com/libp2p/rust-libp2p/pull/3170 [PR 3170]: https://github.com/libp2p/rust-libp2p/pull/3170
[PR 3134]: https://github.com/libp2p/rust-libp2p/pull/3134 [PR 3134]: https://github.com/libp2p/rust-libp2p/pull/3134
[PR 3153]: https://github.com/libp2p/rust-libp2p/pull/3153 [PR 3153]: https://github.com/libp2p/rust-libp2p/pull/3153
[PR 3264]: https://github.com/libp2p/rust-libp2p/pull/3264
[PR 3272]: https://github.com/libp2p/rust-libp2p/pull/3272 [PR 3272]: https://github.com/libp2p/rust-libp2p/pull/3272
# 0.41.1 # 0.41.1

View File

@ -17,7 +17,7 @@ futures = "0.3.1"
futures-timer = "3.0.2" futures-timer = "3.0.2"
instant = "0.1.11" instant = "0.1.11"
libp2p-core = { version = "0.39.0", path = "../core" } libp2p-core = { version = "0.39.0", path = "../core" }
libp2p-swarm-derive = { version = "0.31.0", path = "../swarm-derive", optional = true } libp2p-swarm-derive = { version = "0.32.0", path = "../swarm-derive", optional = true }
log = "0.4" log = "0.4"
pin-project = "1.0.0" pin-project = "1.0.0"
rand = "0.8" rand = "0.8"

View File

@ -140,7 +140,8 @@ pub trait NetworkBehaviour: 'static {
/// ///
/// The network behaviour (ie. the implementation of this trait) and the handlers it has spawned /// The network behaviour (ie. the implementation of this trait) and the handlers it has spawned
/// (ie. the objects returned by `new_handler`) can communicate by passing messages. Messages /// (ie. the objects returned by `new_handler`) can communicate by passing messages. Messages
/// sent from the handler to the behaviour are injected with [`NetworkBehaviour::inject_event`], /// sent from the handler to the behaviour are invoked with
/// [`NetworkBehaviour::on_connection_handler_event`],
/// and the behaviour can send a message to the handler by making [`NetworkBehaviour::poll`] /// and the behaviour can send a message to the handler by making [`NetworkBehaviour::poll`]
/// return [`NetworkBehaviourAction::NotifyHandler`]. /// return [`NetworkBehaviourAction::NotifyHandler`].
/// ///
@ -159,7 +160,7 @@ pub trait NetworkBehaviour: 'static {
} }
/// Informs the behaviour about an event from the [`Swarm`](crate::Swarm). /// Informs the behaviour about an event from the [`Swarm`](crate::Swarm).
fn on_swarm_event(&mut self, _event: FromSwarm<Self::ConnectionHandler>) {} fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>);
/// Informs the behaviour about an event generated by the [`ConnectionHandler`] dedicated to the /// Informs the behaviour about an event generated by the [`ConnectionHandler`] dedicated to the
/// peer identified by `peer_id`. for the behaviour. /// peer identified by `peer_id`. for the behaviour.
@ -175,209 +176,6 @@ pub trait NetworkBehaviour: 'static {
) { ) {
} }
/// Informs the behaviour about a newly established connection to a peer.
#[deprecated(
since = "0.40.2",
note = "Handle `FromSwarm::ConnectionEstablished` in `NetworkBehaviour::on_swarm_event` instead. The default implementation of this `inject_*` method delegates to it."
)]
fn inject_connection_established(
&mut self,
peer_id: &PeerId,
connection_id: &ConnectionId,
endpoint: &ConnectedPoint,
failed_addresses: Option<&Vec<Multiaddr>>,
other_established: usize,
) {
self.on_swarm_event(FromSwarm::ConnectionEstablished(ConnectionEstablished {
peer_id: *peer_id,
connection_id: *connection_id,
endpoint,
failed_addresses: failed_addresses
.map(|v| v.as_slice())
.unwrap_or_else(|| &[]),
other_established,
}));
}
/// Informs the behaviour about a closed connection to a peer.
///
/// A call to this method is always paired with an earlier call to
/// [`NetworkBehaviour::inject_connection_established`] with the same peer ID, connection ID and endpoint.
#[deprecated(
since = "0.40.2",
note = "Handle `FromSwarm::ConnectionClosed` in `NetworkBehaviour::on_swarm_event` instead. The default implementation of this `inject_*` method delegates to it."
)]
fn inject_connection_closed(
&mut self,
peer_id: &PeerId,
connection_id: &ConnectionId,
endpoint: &ConnectedPoint,
handler: <Self::ConnectionHandler as IntoConnectionHandler>::Handler,
remaining_established: usize,
) {
self.on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
peer_id: *peer_id,
connection_id: *connection_id,
endpoint,
handler,
remaining_established,
}));
}
/// Informs the behaviour that the [`ConnectedPoint`] of an existing connection has changed.
#[deprecated(
since = "0.40.2",
note = "Handle `FromSwarm::AddressChange` in `NetworkBehaviour::on_swarm_event` instead. The default implementation of this `inject_*` method delegates to it."
)]
fn inject_address_change(
&mut self,
peer_id: &PeerId,
connection_id: &ConnectionId,
old: &ConnectedPoint,
new: &ConnectedPoint,
) {
self.on_swarm_event(FromSwarm::AddressChange(AddressChange {
peer_id: *peer_id,
connection_id: *connection_id,
old,
new,
}));
}
/// Informs the behaviour about an event generated by the handler dedicated to the peer identified by `peer_id`.
/// for the behaviour.
///
/// The `peer_id` is guaranteed to be in a connected state. In other words,
/// [`NetworkBehaviour::inject_connection_established`] has previously been called with this `PeerId`.
#[deprecated(
since = "0.40.2",
note = "Implement `NetworkBehaviour::on_connection_handler_event` instead. The default implementation of this `inject_*` method delegates to it."
)]
fn inject_event(
&mut self,
peer_id: PeerId,
connection: ConnectionId,
event: <<Self::ConnectionHandler as IntoConnectionHandler>::Handler as ConnectionHandler>::OutEvent,
) {
self.on_connection_handler_event(peer_id, connection, event);
}
/// Indicates to the behaviour that the dial to a known or unknown node failed.
#[deprecated(
since = "0.40.2",
note = "Handle `InEvent::DialFailure` in `NetworkBehaviour::on_swarm_event` instead. The default implementation of this `inject_*` method delegates to it."
)]
fn inject_dial_failure(
&mut self,
peer_id: Option<PeerId>,
handler: Self::ConnectionHandler,
error: &DialError,
) {
self.on_swarm_event(FromSwarm::DialFailure(DialFailure {
peer_id,
handler,
error,
}));
}
/// Indicates to the behaviour that an error happened on an incoming connection during its
/// initial handshake.
///
/// This can include, for example, an error during the handshake of the encryption layer, or the
/// connection unexpectedly closed.
#[deprecated(
since = "0.40.2",
note = "Handle `FromSwarm::ListenFailure` in `NetworkBehaviour::on_swarm_event` instead. The default implementation of this `inject_*` method delegates to it."
)]
fn inject_listen_failure(
&mut self,
local_addr: &Multiaddr,
send_back_addr: &Multiaddr,
handler: Self::ConnectionHandler,
) {
self.on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
local_addr,
send_back_addr,
handler,
}));
}
/// Indicates to the behaviour that a new listener was created.
#[deprecated(
since = "0.40.2",
note = "Handle `FromSwarm::NewListener` in `NetworkBehaviour::on_swarm_event` instead. The default implementation of this `inject_*` method delegates to it."
)]
fn inject_new_listener(&mut self, id: ListenerId) {
self.on_swarm_event(FromSwarm::NewListener(NewListener { listener_id: id }));
}
/// Indicates to the behaviour that we have started listening on a new multiaddr.
#[deprecated(
since = "0.40.2",
note = "Handle `FromSwarm::NewListenAddr` in `NetworkBehaviour::on_swarm_event` instead. The default implementation of this `inject_*` method delegates to it."
)]
fn inject_new_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) {
self.on_swarm_event(FromSwarm::NewListenAddr(NewListenAddr {
listener_id: id,
addr,
}));
}
/// Indicates to the behaviour that a multiaddr we were listening on has expired,
/// which means that we are no longer listening on it.
#[deprecated(
since = "0.40.2",
note = "Handle `FromSwarm::ExpiredListenAddr` in `NetworkBehaviour::on_swarm_event` instead. The default implementation of this `inject_*` method delegates to it."
)]
fn inject_expired_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) {
self.on_swarm_event(FromSwarm::ExpiredListenAddr(ExpiredListenAddr {
listener_id: id,
addr,
}));
}
/// A listener experienced an error.
#[deprecated(
since = "0.40.2",
note = "Handle `FromSwarm::ListenerError` in `NetworkBehaviour::on_swarm_event` instead. The default implementation of this `inject_*` method delegates to it."
)]
fn inject_listener_error(&mut self, id: ListenerId, err: &(dyn std::error::Error + 'static)) {
self.on_swarm_event(FromSwarm::ListenerError(ListenerError {
listener_id: id,
err,
}));
}
/// A listener closed.
#[deprecated(
since = "0.40.2",
note = "Handle `FromSwarm::ListenerClosed` in `NetworkBehaviour::on_swarm_event` instead. The default implementation of this `inject_*` method delegates to it."
)]
fn inject_listener_closed(&mut self, id: ListenerId, reason: Result<(), &std::io::Error>) {
self.on_swarm_event(FromSwarm::ListenerClosed(ListenerClosed {
listener_id: id,
reason,
}));
}
/// Indicates to the behaviour that we have discovered a new external address for us.
#[deprecated(
since = "0.40.2",
note = "Handle `FromSwarm::NewExternalAddr` in `NetworkBehaviour::on_swarm_event` instead. The default implementation of this `inject_*` method delegates to it."
)]
fn inject_new_external_addr(&mut self, addr: &Multiaddr) {
self.on_swarm_event(FromSwarm::NewExternalAddr(NewExternalAddr { addr }));
}
/// Indicates to the behaviour that an external address was removed.
#[deprecated(
since = "0.40.2",
note = "Handle `FromSwarm::ExpiredExternalAddr` in `NetworkBehaviour::on_swarm_event` instead. The default implementation of this `inject_*` method delegates to it."
)]
fn inject_expired_external_addr(&mut self, addr: &Multiaddr) {
self.on_swarm_event(FromSwarm::ExpiredExternalAddr(ExpiredExternalAddr { addr }));
}
/// Polls for things that swarm should do. /// Polls for things that swarm should do.
/// ///
/// This API mimics the API of the `Stream` trait. The method may register the current task in /// This API mimics the API of the `Stream` trait. The method may register the current task in
@ -447,8 +245,8 @@ pub enum NetworkBehaviourAction<
/// Instructs the swarm to start a dial. /// Instructs the swarm to start a dial.
/// ///
/// On success, [`NetworkBehaviour::inject_connection_established`] is invoked. /// On success, [`NetworkBehaviour::on_swarm_event`] with `ConnectionEstablished` is invoked.
/// On failure, [`NetworkBehaviour::inject_dial_failure`] is invoked. /// On failure, [`NetworkBehaviour::on_swarm_event`] with `DialFailure` is invoked.
/// ///
/// Note that the provided handler is returned to the [`NetworkBehaviour`] on connection failure /// Note that the provided handler is returned to the [`NetworkBehaviour`] on connection failure
/// and connection closing. Thus it can be used to carry state, which otherwise would have to be /// and connection closing. Thus it can be used to carry state, which otherwise would have to be
@ -468,10 +266,11 @@ pub enum NetworkBehaviourAction<
/// # use libp2p_core::PeerId; /// # use libp2p_core::PeerId;
/// # use libp2p_plaintext::PlainText2Config; /// # use libp2p_plaintext::PlainText2Config;
/// # use libp2p_swarm::{ /// # use libp2p_swarm::{
/// # DialError, IntoConnectionHandler, KeepAlive, NegotiatedSubstream, /// # FromSwarm, DialFailure, DialError, IntoConnectionHandler, KeepAlive, NegotiatedSubstream,
/// # NetworkBehaviour, NetworkBehaviourAction, PollParameters, ConnectionHandler, /// # NetworkBehaviour, NetworkBehaviourAction, PollParameters, ConnectionHandler,
/// # ConnectionHandlerEvent, ConnectionHandlerUpgrErr, SubstreamProtocol, Swarm, SwarmEvent, /// # ConnectionHandlerEvent, ConnectionHandlerUpgrErr, SubstreamProtocol, Swarm, SwarmEvent,
/// # }; /// # };
/// # use libp2p_swarm::handler::ConnectionEvent;
/// # use libp2p_swarm::dial_opts::{DialOpts, PeerCondition}; /// # use libp2p_swarm::dial_opts::{DialOpts, PeerCondition};
/// # use libp2p_yamux as yamux; /// # use libp2p_yamux as yamux;
/// # use std::collections::VecDeque; /// # use std::collections::VecDeque;
@ -533,7 +332,7 @@ pub enum NetworkBehaviourAction<
/// # } /// # }
/// # /// #
/// # /// #
/// # fn inject_event( /// # fn on_connection_handler_event(
/// # &mut self, /// # &mut self,
/// # _: PeerId, /// # _: PeerId,
/// # _: ConnectionId, /// # _: ConnectionId,
@ -542,17 +341,17 @@ pub enum NetworkBehaviourAction<
/// # unreachable!(); /// # unreachable!();
/// # } /// # }
/// # /// #
/// fn inject_dial_failure( /// fn on_swarm_event(
/// &mut self, /// &mut self,
/// _: Option<PeerId>, /// event: FromSwarm<Self::ConnectionHandler>,
/// handler: Self::ConnectionHandler,
/// _: &DialError,
/// ) { /// ) {
/// // As expected, sending the message failed. But lucky us, we got the handler back, thus /// // As expected, sending the message failed. But lucky us, we got the handler back, thus
/// // the precious message is not lost and we can return it back to the user. /// // the precious message is not lost and we can return it back to the user.
/// let msg = handler.message.unwrap(); /// if let FromSwarm::DialFailure(DialFailure { handler, .. }) = event {
/// self.outbox_to_swarm /// let msg = handler.message.unwrap();
/// .push_back(NetworkBehaviourAction::GenerateEvent(msg)) /// self.outbox_to_swarm
/// .push_back(NetworkBehaviourAction::GenerateEvent(msg))
/// }
/// } /// }
/// # /// #
/// # fn poll( /// # fn poll(
@ -586,28 +385,17 @@ pub enum NetworkBehaviourAction<
/// # SubstreamProtocol::new(DeniedUpgrade, ()) /// # SubstreamProtocol::new(DeniedUpgrade, ())
/// # } /// # }
/// # /// #
/// # fn inject_fully_negotiated_inbound( /// # fn on_behaviour_event(&mut self, _event: Self::InEvent) {}
/// # &mut self,
/// # _: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
/// # _: Self::InboundOpenInfo,
/// # ) {
/// # }
/// # /// #
/// # fn inject_fully_negotiated_outbound( /// # fn on_connection_event(
/// # &mut self, /// # &mut self,
/// # _: <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output, /// # event: ConnectionEvent<
/// # _: Self::OutboundOpenInfo, /// # Self::InboundProtocol,
/// # ) { /// # Self::OutboundProtocol,
/// # } /// # Self::InboundOpenInfo,
/// # /// # Self::OutboundOpenInfo,
/// # fn inject_event(&mut self, _event: Self::InEvent) {} /// # >,
/// # /// # ) {}
/// # fn inject_dial_upgrade_error(
/// # &mut self,
/// # _: Self::OutboundOpenInfo,
/// # _: ConnectionHandlerUpgrErr<Void>,
/// # ) {
/// # }
/// # /// #
/// # fn connection_keep_alive(&self) -> KeepAlive { /// # fn connection_keep_alive(&self) -> KeepAlive {
/// # KeepAlive::Yes /// # KeepAlive::Yes
@ -641,7 +429,7 @@ pub enum NetworkBehaviourAction<
/// If the specified connection no longer exists, the event is silently dropped. /// If the specified connection no longer exists, the event is silently dropped.
/// ///
/// Typically the connection ID given is the same as the one passed to /// Typically the connection ID given is the same as the one passed to
/// [`NetworkBehaviour::inject_event`], i.e. whenever the behaviour wishes to /// [`NetworkBehaviour::on_connection_handler_event`], i.e. whenever the behaviour wishes to
/// respond to a request on the same connection (and possibly the same /// respond to a request on the same connection (and possibly the same
/// substream, as per the implementation of [`ConnectionHandler`]). /// substream, as per the implementation of [`ConnectionHandler`]).
/// ///
@ -1128,103 +916,3 @@ impl<'a, Handler: IntoConnectionHandler> FromSwarm<'a, Handler> {
} }
} }
} }
/// Helper function to call [`NetworkBehaviour`]'s `inject_*` methods given a `FromSwarm.
/// TODO: Remove this function when we remove the remaining `inject_*` calls
/// from [`Either`] and [`Toggle`].
pub(crate) fn inject_from_swarm<T: NetworkBehaviour>(
behaviour: &mut T,
event: FromSwarm<T::ConnectionHandler>,
) {
match event {
FromSwarm::ConnectionEstablished(ConnectionEstablished {
peer_id,
connection_id,
endpoint,
failed_addresses,
other_established,
}) => {
#[allow(deprecated)]
behaviour.inject_connection_established(
&peer_id,
&connection_id,
endpoint,
Some(&failed_addresses.into()),
other_established,
);
}
FromSwarm::ConnectionClosed(ConnectionClosed {
peer_id,
connection_id,
endpoint,
handler,
remaining_established,
}) => {
#[allow(deprecated)]
behaviour.inject_connection_closed(
&peer_id,
&connection_id,
endpoint,
handler,
remaining_established,
);
}
FromSwarm::AddressChange(AddressChange {
peer_id,
connection_id,
old,
new,
}) => {
#[allow(deprecated)]
behaviour.inject_address_change(&peer_id, &connection_id, old, new);
}
FromSwarm::DialFailure(DialFailure {
peer_id,
handler,
error,
}) => {
#[allow(deprecated)]
behaviour.inject_dial_failure(peer_id, handler, error);
}
FromSwarm::ListenFailure(ListenFailure {
local_addr,
send_back_addr,
handler,
}) => {
#[allow(deprecated)]
behaviour.inject_listen_failure(local_addr, send_back_addr, handler);
}
FromSwarm::NewListener(NewListener { listener_id }) => {
#[allow(deprecated)]
behaviour.inject_new_listener(listener_id);
}
FromSwarm::NewListenAddr(NewListenAddr { listener_id, addr }) => {
#[allow(deprecated)]
behaviour.inject_new_listen_addr(listener_id, addr);
}
FromSwarm::ExpiredListenAddr(ExpiredListenAddr { listener_id, addr }) => {
#[allow(deprecated)]
behaviour.inject_expired_listen_addr(listener_id, addr);
}
FromSwarm::ListenerError(ListenerError { listener_id, err }) => {
#[allow(deprecated)]
behaviour.inject_listener_error(listener_id, err);
}
FromSwarm::ListenerClosed(ListenerClosed {
listener_id,
reason,
}) => {
#[allow(deprecated)]
behaviour.inject_listener_closed(listener_id, reason);
}
FromSwarm::NewExternalAddr(NewExternalAddr { addr }) => {
#[allow(deprecated)]
behaviour.inject_new_external_addr(addr);
}
FromSwarm::ExpiredExternalAddr(ExpiredExternalAddr { addr }) =>
{
#[allow(deprecated)]
behaviour.inject_expired_external_addr(addr)
}
}
}

View File

@ -18,9 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use crate::behaviour::{ use crate::behaviour::{self, NetworkBehaviour, NetworkBehaviourAction, PollParameters};
self, inject_from_swarm, NetworkBehaviour, NetworkBehaviourAction, PollParameters,
};
use crate::handler::either::IntoEitherHandler; use crate::handler::either::IntoEitherHandler;
use either::Either; use either::Either;
use libp2p_core::{Multiaddr, PeerId}; use libp2p_core::{Multiaddr, PeerId};
@ -51,26 +49,20 @@ where
fn on_swarm_event(&mut self, event: behaviour::FromSwarm<Self::ConnectionHandler>) { fn on_swarm_event(&mut self, event: behaviour::FromSwarm<Self::ConnectionHandler>) {
match self { match self {
Either::Left(b) => inject_from_swarm( Either::Left(b) => b.on_swarm_event(event.map_handler(
b, |h| h.unwrap_left(),
event.map_handler( |h| match h {
|h| h.unwrap_left(), Either::Left(h) => h,
|h| match h { Either::Right(_) => unreachable!(),
Either::Left(h) => h, },
Either::Right(_) => unreachable!(), )),
}, Either::Right(b) => b.on_swarm_event(event.map_handler(
), |h| h.unwrap_right(),
), |h| match h {
Either::Right(b) => inject_from_swarm( Either::Right(h) => h,
b, Either::Left(_) => unreachable!(),
event.map_handler( },
|h| h.unwrap_right(), )),
|h| match h {
Either::Right(h) => h,
Either::Left(_) => unreachable!(),
},
),
),
} }
} }
@ -82,12 +74,10 @@ where
) { ) {
match (self, event) { match (self, event) {
(Either::Left(left), Either::Left(event)) => { (Either::Left(left), Either::Left(event)) => {
#[allow(deprecated)] left.on_connection_handler_event(peer_id, connection_id, event);
left.inject_event(peer_id, connection_id, event);
} }
(Either::Right(right), Either::Right(event)) => { (Either::Right(right), Either::Right(event)) => {
#[allow(deprecated)] right.on_connection_handler_event(peer_id, connection_id, event);
right.inject_event(peer_id, connection_id, event);
} }
_ => unreachable!(), _ => unreachable!(),
} }

View File

@ -18,11 +18,11 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use crate::behaviour::{inject_from_swarm, FromSwarm}; use crate::behaviour::FromSwarm;
use crate::handler::{ use crate::handler::{
ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, AddressChange, ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent,
DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, IntoConnectionHandler, ConnectionHandlerUpgrErr, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
KeepAlive, ListenUpgradeError, SubstreamProtocol, IntoConnectionHandler, KeepAlive, ListenUpgradeError, SubstreamProtocol,
}; };
use crate::upgrade::SendWrapper; use crate::upgrade::SendWrapper;
use crate::{NetworkBehaviour, NetworkBehaviourAction, PollParameters}; use crate::{NetworkBehaviour, NetworkBehaviourAction, PollParameters};
@ -87,7 +87,7 @@ where
fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) { fn on_swarm_event(&mut self, event: FromSwarm<Self::ConnectionHandler>) {
if let Some(behaviour) = &mut self.inner { if let Some(behaviour) = &mut self.inner {
if let Some(event) = event.maybe_map_handler(|h| h.inner, |h| h.inner) { if let Some(event) = event.maybe_map_handler(|h| h.inner, |h| h.inner) {
inject_from_swarm(behaviour, event); behaviour.on_swarm_event(event);
} }
} }
} }
@ -99,8 +99,7 @@ where
event: crate::THandlerOutEvent<Self>, event: crate::THandlerOutEvent<Self>,
) { ) {
if let Some(behaviour) = &mut self.inner { if let Some(behaviour) = &mut self.inner {
#[allow(deprecated)] behaviour.on_connection_handler_event(peer_id, connection_id, event)
behaviour.inject_event(peer_id, connection_id, event)
} }
} }
@ -176,13 +175,17 @@ where
}; };
if let Either::Left(info) = info { if let Either::Left(info) = info {
#[allow(deprecated)]
self.inner self.inner
.as_mut() .as_mut()
.expect("Can't receive an inbound substream if disabled; QED") .expect("Can't receive an inbound substream if disabled; QED")
.inject_fully_negotiated_inbound(out, info) .on_connection_event(ConnectionEvent::FullyNegotiatedInbound(
FullyNegotiatedInbound {
protocol: out,
info,
},
));
} else { } else {
panic!("Unexpected Either::Right in enabled `inject_fully_negotiated_inbound`.") panic!("Unexpected Either::Right in enabled `on_fully_negotiated_inbound`.")
} }
} }
@ -199,11 +202,11 @@ where
(None, Either::Right(())) => return, (None, Either::Right(())) => return,
(Some(_), Either::Right(())) => panic!( (Some(_), Either::Right(())) => panic!(
"Unexpected `Either::Right` inbound info through \ "Unexpected `Either::Right` inbound info through \
`inject_listen_upgrade_error` in enabled state.", `on_listen_upgrade_error` in enabled state.",
), ),
(None, Either::Left(_)) => panic!( (None, Either::Left(_)) => panic!(
"Unexpected `Either::Left` inbound info through \ "Unexpected `Either::Left` inbound info through \
`inject_listen_upgrade_error` in disabled state.", `on_listen_upgrade_error` in disabled state.",
), ),
}; };
@ -218,8 +221,10 @@ where
} }
}; };
#[allow(deprecated)] inner.on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError {
inner.inject_listen_upgrade_error(info, err) info,
error: err,
}));
} }
} }
@ -251,11 +256,10 @@ where
} }
fn on_behaviour_event(&mut self, event: Self::InEvent) { fn on_behaviour_event(&mut self, event: Self::InEvent) {
#[allow(deprecated)]
self.inner self.inner
.as_mut() .as_mut()
.expect("Can't receive events if disabled; QED") .expect("Can't receive events if disabled; QED")
.inject_event(event) .on_behaviour_event(event)
} }
fn connection_keep_alive(&self) -> KeepAlive { fn connection_keep_alive(&self) -> KeepAlive {
@ -299,28 +303,31 @@ where
ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound {
protocol: out, protocol: out,
info, info,
}) => }) => self
{ .inner
#[allow(deprecated)] .as_mut()
self.inner .expect("Can't receive an outbound substream if disabled; QED")
.as_mut() .on_connection_event(ConnectionEvent::FullyNegotiatedOutbound(
.expect("Can't receive an outbound substream if disabled; QED") FullyNegotiatedOutbound {
.inject_fully_negotiated_outbound(out, info) protocol: out,
} info,
},
)),
ConnectionEvent::AddressChange(address_change) => { ConnectionEvent::AddressChange(address_change) => {
if let Some(inner) = self.inner.as_mut() { if let Some(inner) = self.inner.as_mut() {
#[allow(deprecated)] inner.on_connection_event(ConnectionEvent::AddressChange(AddressChange {
inner.inject_address_change(address_change.new_address) new_address: address_change.new_address,
}));
} }
} }
ConnectionEvent::DialUpgradeError(DialUpgradeError { info, error: err }) => ConnectionEvent::DialUpgradeError(DialUpgradeError { info, error: err }) => self
{ .inner
#[allow(deprecated)] .as_mut()
self.inner .expect("Can't receive an outbound substream if disabled; QED")
.as_mut() .on_connection_event(ConnectionEvent::DialUpgradeError(DialUpgradeError {
.expect("Can't receive an outbound substream if disabled; QED") info,
.inject_dial_upgrade_error(info, err) error: err,
} })),
ConnectionEvent::ListenUpgradeError(listen_upgrade_error) => { ConnectionEvent::ListenUpgradeError(listen_upgrade_error) => {
self.on_listen_upgrade_error(listen_upgrade_error) self.on_listen_upgrade_error(listen_upgrade_error)
} }

View File

@ -27,7 +27,10 @@ pub use error::{
PendingOutboundConnectionError, PendingOutboundConnectionError,
}; };
use crate::handler::ConnectionHandler; use crate::handler::{
AddressChange, ConnectionEvent, ConnectionHandler, DialUpgradeError, FullyNegotiatedInbound,
FullyNegotiatedOutbound, ListenUpgradeError,
};
use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend, SendWrapper}; use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend, SendWrapper};
use crate::{ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, SubstreamProtocol}; use crate::{ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, SubstreamProtocol};
use futures::stream::FuturesUnordered; use futures::stream::FuturesUnordered;
@ -150,8 +153,7 @@ where
/// Notifies the connection handler of an event. /// Notifies the connection handler of an event.
pub fn on_behaviour_event(&mut self, event: THandler::InEvent) { pub fn on_behaviour_event(&mut self, event: THandler::InEvent) {
#[allow(deprecated)] self.handler.on_behaviour_event(event);
self.handler.inject_event(event);
} }
/// Begins an orderly shutdown of the connection, returning the connection /// Begins an orderly shutdown of the connection, returning the connection
@ -180,9 +182,13 @@ where
loop { loop {
match requested_substreams.poll_next_unpin(cx) { match requested_substreams.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(()))) => continue, Poll::Ready(Some(Ok(()))) => continue,
Poll::Ready(Some(Err(user_data))) => { Poll::Ready(Some(Err(info))) => {
#[allow(deprecated)] handler.on_connection_event(ConnectionEvent::DialUpgradeError(
handler.inject_dial_upgrade_error(user_data, ConnectionHandlerUpgrErr::Timeout); DialUpgradeError {
info,
error: ConnectionHandlerUpgrErr::Timeout,
},
));
continue; continue;
} }
Poll::Ready(None) | Poll::Pending => {} Poll::Ready(None) | Poll::Pending => {}
@ -209,14 +215,16 @@ where
// In case the [`ConnectionHandler`] can not make any more progress, poll the negotiating outbound streams. // In case the [`ConnectionHandler`] can not make any more progress, poll the negotiating outbound streams.
match negotiating_out.poll_next_unpin(cx) { match negotiating_out.poll_next_unpin(cx) {
Poll::Pending | Poll::Ready(None) => {} Poll::Pending | Poll::Ready(None) => {}
Poll::Ready(Some((user_data, Ok(upgrade)))) => { Poll::Ready(Some((info, Ok(protocol)))) => {
#[allow(deprecated)] handler.on_connection_event(ConnectionEvent::FullyNegotiatedOutbound(
handler.inject_fully_negotiated_outbound(upgrade, user_data); FullyNegotiatedOutbound { protocol, info },
));
continue; continue;
} }
Poll::Ready(Some((user_data, Err(err)))) => { Poll::Ready(Some((info, Err(error)))) => {
#[allow(deprecated)] handler.on_connection_event(ConnectionEvent::DialUpgradeError(
handler.inject_dial_upgrade_error(user_data, err); DialUpgradeError { info, error },
));
continue; continue;
} }
} }
@ -225,14 +233,16 @@ where
// make any more progress, poll the negotiating inbound streams. // make any more progress, poll the negotiating inbound streams.
match negotiating_in.poll_next_unpin(cx) { match negotiating_in.poll_next_unpin(cx) {
Poll::Pending | Poll::Ready(None) => {} Poll::Pending | Poll::Ready(None) => {}
Poll::Ready(Some((user_data, Ok(upgrade)))) => { Poll::Ready(Some((info, Ok(protocol)))) => {
#[allow(deprecated)] handler.on_connection_event(ConnectionEvent::FullyNegotiatedInbound(
handler.inject_fully_negotiated_inbound(upgrade, user_data); FullyNegotiatedInbound { protocol, info },
));
continue; continue;
} }
Poll::Ready(Some((user_data, Err(err)))) => { Poll::Ready(Some((info, Err(error)))) => {
#[allow(deprecated)] handler.on_connection_event(ConnectionEvent::ListenUpgradeError(
handler.inject_listen_upgrade_error(user_data, err); ListenUpgradeError { info, error },
));
continue; continue;
} }
} }
@ -279,8 +289,9 @@ where
match muxing.poll_unpin(cx)? { match muxing.poll_unpin(cx)? {
Poll::Pending => {} Poll::Pending => {}
Poll::Ready(StreamMuxerEvent::AddressChange(address)) => { Poll::Ready(StreamMuxerEvent::AddressChange(address)) => {
#[allow(deprecated)] handler.on_connection_event(ConnectionEvent::AddressChange(AddressChange {
handler.inject_address_change(&address); new_address: &address,
}));
return Poll::Ready(Ok(Event::AddressChange(address))); return Poll::Ready(Ok(Event::AddressChange(address)));
} }
} }
@ -757,34 +768,35 @@ mod tests {
SubstreamProtocol::new(DeniedUpgrade, ()).with_timeout(self.upgrade_timeout) SubstreamProtocol::new(DeniedUpgrade, ()).with_timeout(self.upgrade_timeout)
} }
fn inject_fully_negotiated_inbound( fn on_connection_event(
&mut self, &mut self,
protocol: <Self::InboundProtocol as InboundUpgradeSend>::Output, event: ConnectionEvent<
_: Self::InboundOpenInfo, Self::InboundProtocol,
Self::OutboundProtocol,
Self::InboundOpenInfo,
Self::OutboundOpenInfo,
>,
) { ) {
void::unreachable(protocol) match event {
ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound {
protocol,
..
}) => void::unreachable(protocol),
ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound {
protocol,
..
}) => void::unreachable(protocol),
ConnectionEvent::DialUpgradeError(DialUpgradeError { error, .. }) => {
self.error = Some(error)
}
ConnectionEvent::AddressChange(_) | ConnectionEvent::ListenUpgradeError(_) => {}
}
} }
fn inject_fully_negotiated_outbound( fn on_behaviour_event(&mut self, event: Self::InEvent) {
&mut self,
protocol: <Self::OutboundProtocol as OutboundUpgradeSend>::Output,
_: Self::OutboundOpenInfo,
) {
void::unreachable(protocol)
}
fn inject_event(&mut self, event: Self::InEvent) {
void::unreachable(event) void::unreachable(event)
} }
fn inject_dial_upgrade_error(
&mut self,
_: Self::OutboundOpenInfo,
error: ConnectionHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgradeSend>::Error>,
) {
self.error = Some(error)
}
fn connection_keep_alive(&self) -> KeepAlive { fn connection_keep_alive(&self) -> KeepAlive {
KeepAlive::Yes KeepAlive::Yes
} }

View File

@ -116,102 +116,6 @@ pub trait ConnectionHandler: Send + 'static {
/// > This allows a remote to put the list of supported protocols in a cache. /// > This allows a remote to put the list of supported protocols in a cache.
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo>; fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo>;
/// Injects the output of a successful upgrade on a new inbound substream.
///
/// Note that it is up to the [`ConnectionHandler`] implementation to manage the lifetime of the
/// negotiated inbound substreams. E.g. the implementation has to enforce a limit on the number
/// of simultaneously open negotiated inbound substreams. In other words it is up to the
/// [`ConnectionHandler`] implementation to stop a malicious remote node to open and keep alive
/// an excessive amount of inbound substreams.
#[deprecated(
since = "0.41.0",
note = "Handle `ConnectionEvent::FullyNegotiatedInbound` on `ConnectionHandler::on_connection_event` instead.
The default implemention of this `inject_*` method delegates to it."
)]
fn inject_fully_negotiated_inbound(
&mut self,
protocol: <Self::InboundProtocol as InboundUpgradeSend>::Output,
info: Self::InboundOpenInfo,
) {
self.on_connection_event(ConnectionEvent::FullyNegotiatedInbound(
FullyNegotiatedInbound { protocol, info },
))
}
/// Injects the output of a successful upgrade on a new outbound substream.
///
/// The second argument is the information that was previously passed to
/// [`ConnectionHandlerEvent::OutboundSubstreamRequest`].
#[deprecated(
since = "0.41.0",
note = "Handle `ConnectionEvent::FullyNegotiatedOutbound` on `ConnectionHandler::on_connection_event` instead.
The default implemention of this `inject_*` method delegates to it."
)]
fn inject_fully_negotiated_outbound(
&mut self,
protocol: <Self::OutboundProtocol as OutboundUpgradeSend>::Output,
info: Self::OutboundOpenInfo,
) {
self.on_connection_event(ConnectionEvent::FullyNegotiatedOutbound(
FullyNegotiatedOutbound { protocol, info },
))
}
/// Injects an event coming from the outside in the handler.
#[deprecated(
since = "0.41.0",
note = "Implement `ConnectionHandler::on_behaviour_event` instead. The default implementation of `inject_event` delegates to it."
)]
fn inject_event(&mut self, event: Self::InEvent) {
self.on_behaviour_event(event);
}
/// Notifies the handler of a change in the address of the remote.
#[deprecated(
since = "0.41.0",
note = "Handle `ConnectionEvent::AddressChange` on `ConnectionHandler::on_connection_event` instead.
The default implemention of this `inject_*` method delegates to it."
)]
fn inject_address_change(&mut self, new_address: &Multiaddr) {
self.on_connection_event(ConnectionEvent::AddressChange(AddressChange {
new_address,
}))
}
/// Indicates to the handler that upgrading an outbound substream to the given protocol has failed.
#[deprecated(
since = "0.41.0",
note = "Handle `ConnectionEvent::DialUpgradeError` on `ConnectionHandler::on_connection_event` instead.
The default implemention of this `inject_*` method delegates to it."
)]
fn inject_dial_upgrade_error(
&mut self,
info: Self::OutboundOpenInfo,
error: ConnectionHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgradeSend>::Error>,
) {
self.on_connection_event(ConnectionEvent::DialUpgradeError(DialUpgradeError {
info,
error,
}))
}
/// Indicates to the handler that upgrading an inbound substream to the given protocol has failed.
#[deprecated(
since = "0.41.0",
note = "Handle `ConnectionEvent::ListenUpgradeError` on `ConnectionHandler::on_connection_event` instead.
The default implemention of this `inject_*` method delegates to it."
)]
fn inject_listen_upgrade_error(
&mut self,
info: Self::InboundOpenInfo,
error: ConnectionHandlerUpgrErr<<Self::InboundProtocol as InboundUpgradeSend>::Error>,
) {
self.on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError {
info,
error,
}))
}
/// Returns until when the connection should be kept alive. /// Returns until when the connection should be kept alive.
/// ///
/// This method is called by the `Swarm` after each invocation of /// This method is called by the `Swarm` after each invocation of
@ -279,18 +183,17 @@ pub trait ConnectionHandler: Send + 'static {
} }
/// Informs the handler about an event from the [`NetworkBehaviour`](super::NetworkBehaviour). /// Informs the handler about an event from the [`NetworkBehaviour`](super::NetworkBehaviour).
fn on_behaviour_event(&mut self, _event: Self::InEvent) {} fn on_behaviour_event(&mut self, _event: Self::InEvent);
fn on_connection_event( fn on_connection_event(
&mut self, &mut self,
_event: ConnectionEvent< event: ConnectionEvent<
Self::InboundProtocol, Self::InboundProtocol,
Self::OutboundProtocol, Self::OutboundProtocol,
Self::InboundOpenInfo, Self::InboundOpenInfo,
Self::OutboundOpenInfo, Self::OutboundOpenInfo,
>, >,
) { );
}
} }
/// Enumeration with the list of the possible stream events /// Enumeration with the list of the possible stream events

View File

@ -19,9 +19,9 @@
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use crate::handler::{ use crate::handler::{
AddressChange, ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr,
ConnectionHandlerUpgrErr, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, InboundUpgradeSend,
IntoConnectionHandler, KeepAlive, ListenUpgradeError, SubstreamProtocol, IntoConnectionHandler, KeepAlive, ListenUpgradeError, OutboundUpgradeSend, SubstreamProtocol,
}; };
use crate::upgrade::SendWrapper; use crate::upgrade::SendWrapper;
use either::Either; use either::Either;
@ -90,6 +90,190 @@ impl<L, R> IntoEitherHandler<L, R> {
} }
} }
impl<LIP, RIP, LIOI, RIOI>
FullyNegotiatedInbound<EitherUpgrade<SendWrapper<LIP>, SendWrapper<RIP>>, Either<LIOI, RIOI>>
where
RIP: InboundUpgradeSend,
LIP: InboundUpgradeSend,
{
fn transpose(
self,
) -> Either<FullyNegotiatedInbound<LIP, LIOI>, FullyNegotiatedInbound<RIP, RIOI>> {
match self {
FullyNegotiatedInbound {
protocol: EitherOutput::First(protocol),
info: Either::Left(info),
} => Either::Left(FullyNegotiatedInbound { protocol, info }),
FullyNegotiatedInbound {
protocol: EitherOutput::Second(protocol),
info: Either::Right(info),
} => Either::Right(FullyNegotiatedInbound { protocol, info }),
_ => unreachable!(),
}
}
}
impl<LOP, ROP, LOOI, ROOI>
FullyNegotiatedOutbound<EitherUpgrade<SendWrapper<LOP>, SendWrapper<ROP>>, Either<LOOI, ROOI>>
where
LOP: OutboundUpgradeSend,
ROP: OutboundUpgradeSend,
{
fn transpose(
self,
) -> Either<FullyNegotiatedOutbound<LOP, LOOI>, FullyNegotiatedOutbound<ROP, ROOI>> {
match self {
FullyNegotiatedOutbound {
protocol: EitherOutput::First(protocol),
info: Either::Left(info),
} => Either::Left(FullyNegotiatedOutbound { protocol, info }),
FullyNegotiatedOutbound {
protocol: EitherOutput::Second(protocol),
info: Either::Right(info),
} => Either::Right(FullyNegotiatedOutbound { protocol, info }),
_ => unreachable!(),
}
}
}
impl<LOP, ROP, LOOI, ROOI>
DialUpgradeError<Either<LOOI, ROOI>, EitherUpgrade<SendWrapper<LOP>, SendWrapper<ROP>>>
where
LOP: OutboundUpgradeSend,
ROP: OutboundUpgradeSend,
{
fn transpose(self) -> Either<DialUpgradeError<LOOI, LOP>, DialUpgradeError<ROOI, ROP>> {
match self {
DialUpgradeError {
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::A(error))),
info: Either::Left(info),
} => Either::Left(DialUpgradeError {
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(error)),
info,
}),
DialUpgradeError {
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::B(error))),
info: Either::Right(info),
} => Either::Right(DialUpgradeError {
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(error)),
info,
}),
DialUpgradeError {
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(error)),
info: Either::Left(info),
} => Either::Left(DialUpgradeError {
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(error)),
info,
}),
DialUpgradeError {
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(error)),
info: Either::Right(info),
} => Either::Right(DialUpgradeError {
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(error)),
info,
}),
DialUpgradeError {
error: ConnectionHandlerUpgrErr::Timer,
info: Either::Left(info),
} => Either::Left(DialUpgradeError {
error: ConnectionHandlerUpgrErr::Timer,
info,
}),
DialUpgradeError {
error: ConnectionHandlerUpgrErr::Timer,
info: Either::Right(info),
} => Either::Right(DialUpgradeError {
error: ConnectionHandlerUpgrErr::Timer,
info,
}),
DialUpgradeError {
error: ConnectionHandlerUpgrErr::Timeout,
info: Either::Left(info),
} => Either::Left(DialUpgradeError {
error: ConnectionHandlerUpgrErr::Timeout,
info,
}),
DialUpgradeError {
error: ConnectionHandlerUpgrErr::Timeout,
info: Either::Right(info),
} => Either::Right(DialUpgradeError {
error: ConnectionHandlerUpgrErr::Timeout,
info,
}),
_ => unreachable!(),
}
}
}
impl<LIP, RIP, LIOI, RIOI>
ListenUpgradeError<Either<LIOI, RIOI>, EitherUpgrade<SendWrapper<LIP>, SendWrapper<RIP>>>
where
RIP: InboundUpgradeSend,
LIP: InboundUpgradeSend,
{
fn transpose(self) -> Either<ListenUpgradeError<LIOI, LIP>, ListenUpgradeError<RIOI, RIP>> {
match self {
ListenUpgradeError {
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::A(error))),
info: Either::Left(info),
} => Either::Left(ListenUpgradeError {
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(error)),
info,
}),
ListenUpgradeError {
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::B(error))),
info: Either::Right(info),
} => Either::Right(ListenUpgradeError {
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(error)),
info,
}),
ListenUpgradeError {
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(error)),
info: Either::Left(info),
} => Either::Left(ListenUpgradeError {
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(error)),
info,
}),
ListenUpgradeError {
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(error)),
info: Either::Right(info),
} => Either::Right(ListenUpgradeError {
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(error)),
info,
}),
ListenUpgradeError {
error: ConnectionHandlerUpgrErr::Timer,
info: Either::Left(info),
} => Either::Left(ListenUpgradeError {
error: ConnectionHandlerUpgrErr::Timer,
info,
}),
ListenUpgradeError {
error: ConnectionHandlerUpgrErr::Timer,
info: Either::Right(info),
} => Either::Right(ListenUpgradeError {
error: ConnectionHandlerUpgrErr::Timer,
info,
}),
ListenUpgradeError {
error: ConnectionHandlerUpgrErr::Timeout,
info: Either::Left(info),
} => Either::Left(ListenUpgradeError {
error: ConnectionHandlerUpgrErr::Timeout,
info,
}),
ListenUpgradeError {
error: ConnectionHandlerUpgrErr::Timeout,
info: Either::Right(info),
} => Either::Right(ListenUpgradeError {
error: ConnectionHandlerUpgrErr::Timeout,
info,
}),
_ => unreachable!(),
}
}
}
/// Implementation of a [`ConnectionHandler`] that represents either of two [`ConnectionHandler`] /// Implementation of a [`ConnectionHandler`] that represents either of two [`ConnectionHandler`]
/// implementations. /// implementations.
impl<L, R> ConnectionHandler for Either<L, R> impl<L, R> ConnectionHandler for Either<L, R>
@ -122,10 +306,8 @@ where
fn on_behaviour_event(&mut self, event: Self::InEvent) { fn on_behaviour_event(&mut self, event: Self::InEvent) {
match (self, event) { match (self, event) {
#[allow(deprecated)] (Either::Left(handler), Either::Left(event)) => handler.on_behaviour_event(event),
(Either::Left(handler), Either::Left(event)) => handler.inject_event(event), (Either::Right(handler), Either::Right(event)) => handler.on_behaviour_event(event),
#[allow(deprecated)]
(Either::Right(handler), Either::Right(event)) => handler.inject_event(event),
_ => unreachable!(), _ => unreachable!(),
} }
} }
@ -174,188 +356,62 @@ where
>, >,
) { ) {
match event { match event {
ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { ConnectionEvent::FullyNegotiatedInbound(fully_negotiated_inbound) => {
protocol: output, match (fully_negotiated_inbound.transpose(), self) {
info, (Either::Left(fully_negotiated_inbound), Either::Left(handler)) => handler
}) => match (self, output, info) { .on_connection_event(ConnectionEvent::FullyNegotiatedInbound(
(Either::Left(handler), EitherOutput::First(output), Either::Left(info)) => fully_negotiated_inbound,
{ )),
#[allow(deprecated)] (Either::Right(fully_negotiated_inbound), Either::Right(handler)) => handler
handler.inject_fully_negotiated_outbound(output, info) .on_connection_event(ConnectionEvent::FullyNegotiatedInbound(
} fully_negotiated_inbound,
(Either::Right(handler), EitherOutput::Second(output), Either::Right(info)) => )),
{
#[allow(deprecated)]
handler.inject_fully_negotiated_outbound(output, info)
}
_ => unreachable!(),
},
ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound {
protocol: output,
info,
}) => match (self, output, info) {
(Either::Left(handler), EitherOutput::First(output), Either::Left(info)) =>
{
#[allow(deprecated)]
handler.inject_fully_negotiated_inbound(output, info)
}
(Either::Right(handler), EitherOutput::Second(output), Either::Right(info)) =>
{
#[allow(deprecated)]
handler.inject_fully_negotiated_inbound(output, info)
}
_ => unreachable!(),
},
ConnectionEvent::AddressChange(AddressChange { new_address: addr }) => match self {
#[allow(deprecated)]
Either::Left(handler) => handler.inject_address_change(addr),
#[allow(deprecated)]
Either::Right(handler) => handler.inject_address_change(addr),
},
ConnectionEvent::DialUpgradeError(DialUpgradeError { info, error }) => match error {
ConnectionHandlerUpgrErr::Timer => match (self, info) {
(Either::Left(handler), Either::Left(info)) => {
#[allow(deprecated)]
handler.inject_dial_upgrade_error(info, ConnectionHandlerUpgrErr::Timer);
}
(Either::Right(handler), Either::Right(info)) => {
#[allow(deprecated)]
handler.inject_dial_upgrade_error(info, ConnectionHandlerUpgrErr::Timer);
}
_ => unreachable!(), _ => unreachable!(),
},
ConnectionHandlerUpgrErr::Timeout => match (self, info) {
(Either::Left(handler), Either::Left(info)) => {
#[allow(deprecated)]
handler.inject_dial_upgrade_error(info, ConnectionHandlerUpgrErr::Timeout);
}
(Either::Right(handler), Either::Right(info)) => {
#[allow(deprecated)]
handler.inject_dial_upgrade_error(info, ConnectionHandlerUpgrErr::Timeout);
}
_ => unreachable!(),
},
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(error)) => {
match (self, info) {
(Either::Left(handler), Either::Left(info)) => {
#[allow(deprecated)]
handler.inject_dial_upgrade_error(
info,
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(error)),
);
}
(Either::Right(handler), Either::Right(info)) => {
#[allow(deprecated)]
handler.inject_dial_upgrade_error(
info,
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(error)),
);
}
_ => unreachable!(),
}
}
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::A(e))) => {
match (self, info) {
(Either::Left(handler), Either::Left(info)) => {
#[allow(deprecated)]
handler.inject_dial_upgrade_error(
info,
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)),
);
}
_ => unreachable!(),
}
}
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::B(e))) => {
match (self, info) {
(Either::Right(handler), Either::Right(info)) => {
#[allow(deprecated)]
handler.inject_dial_upgrade_error(
info,
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)),
);
}
_ => unreachable!(),
}
}
},
ConnectionEvent::ListenUpgradeError(ListenUpgradeError { info, error }) => {
match error {
ConnectionHandlerUpgrErr::Timer => match (self, info) {
(Either::Left(handler), Either::Left(info)) => {
#[allow(deprecated)]
handler
.inject_listen_upgrade_error(info, ConnectionHandlerUpgrErr::Timer);
}
(Either::Right(handler), Either::Right(info)) => {
#[allow(deprecated)]
handler
.inject_listen_upgrade_error(info, ConnectionHandlerUpgrErr::Timer);
}
_ => unreachable!(),
},
ConnectionHandlerUpgrErr::Timeout => match (self, info) {
(Either::Left(handler), Either::Left(info)) => {
#[allow(deprecated)]
handler.inject_listen_upgrade_error(
info,
ConnectionHandlerUpgrErr::Timeout,
);
}
(Either::Right(handler), Either::Right(info)) => {
#[allow(deprecated)]
handler.inject_listen_upgrade_error(
info,
ConnectionHandlerUpgrErr::Timeout,
);
}
_ => unreachable!(),
},
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(error)) => {
match (self, info) {
(Either::Left(handler), Either::Left(info)) => {
#[allow(deprecated)]
handler.inject_listen_upgrade_error(
info,
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(error)),
);
}
(Either::Right(handler), Either::Right(info)) => {
#[allow(deprecated)]
handler.inject_listen_upgrade_error(
info,
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(error)),
);
}
_ => unreachable!(),
}
}
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::A(e))) => {
match (self, info) {
(Either::Left(handler), Either::Left(info)) => {
#[allow(deprecated)]
handler.inject_listen_upgrade_error(
info,
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)),
);
}
_ => unreachable!(),
}
}
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::B(e))) => {
match (self, info) {
(Either::Right(handler), Either::Right(info)) => {
#[allow(deprecated)]
handler.inject_listen_upgrade_error(
info,
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)),
);
}
_ => unreachable!(),
}
}
} }
} }
ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => {
match (fully_negotiated_outbound.transpose(), self) {
(Either::Left(fully_negotiated_outbound), Either::Left(handler)) => handler
.on_connection_event(ConnectionEvent::FullyNegotiatedOutbound(
fully_negotiated_outbound,
)),
(Either::Right(fully_negotiated_outbound), Either::Right(handler)) => handler
.on_connection_event(ConnectionEvent::FullyNegotiatedOutbound(
fully_negotiated_outbound,
)),
_ => unreachable!(),
}
}
ConnectionEvent::DialUpgradeError(dial_upgrade_error) => {
match (dial_upgrade_error.transpose(), self) {
(Either::Left(dial_upgrade_error), Either::Left(handler)) => handler
.on_connection_event(ConnectionEvent::DialUpgradeError(dial_upgrade_error)),
(Either::Right(dial_upgrade_error), Either::Right(handler)) => handler
.on_connection_event(ConnectionEvent::DialUpgradeError(dial_upgrade_error)),
_ => unreachable!(),
}
}
ConnectionEvent::ListenUpgradeError(listen_upgrade_error) => {
match (listen_upgrade_error.transpose(), self) {
(Either::Left(listen_upgrade_error), Either::Left(handler)) => handler
.on_connection_event(ConnectionEvent::ListenUpgradeError(
listen_upgrade_error,
)),
(Either::Right(listen_upgrade_error), Either::Right(handler)) => handler
.on_connection_event(ConnectionEvent::ListenUpgradeError(
listen_upgrade_error,
)),
_ => unreachable!(),
}
}
ConnectionEvent::AddressChange(address_change) => match self {
Either::Left(handler) => {
handler.on_connection_event(ConnectionEvent::AddressChange(address_change))
}
Either::Right(handler) => {
handler.on_connection_event(ConnectionEvent::AddressChange(address_change))
}
},
} }
} }
} }

View File

@ -19,9 +19,7 @@
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use crate::handler::{ use crate::handler::{
AddressChange, ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError, ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, KeepAlive, SubstreamProtocol,
FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive, ListenUpgradeError,
SubstreamProtocol,
}; };
use std::{fmt::Debug, marker::PhantomData, task::Context, task::Poll}; use std::{fmt::Debug, marker::PhantomData, task::Context, task::Poll};
@ -65,8 +63,7 @@ where
fn on_behaviour_event(&mut self, event: TNewIn) { fn on_behaviour_event(&mut self, event: TNewIn) {
if let Some(event) = (self.map)(event) { if let Some(event) = (self.map)(event) {
#[allow(deprecated)] self.inner.on_behaviour_event(event);
self.inner.inject_event(event);
} }
} }
@ -97,35 +94,6 @@ where
Self::OutboundOpenInfo, Self::OutboundOpenInfo,
>, >,
) { ) {
match event { self.inner.on_connection_event(event);
ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound { protocol, info }) =>
{
#[allow(deprecated)]
self.inner.inject_fully_negotiated_inbound(protocol, info)
}
ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound {
protocol,
info,
}) =>
{
#[allow(deprecated)]
self.inner.inject_fully_negotiated_outbound(protocol, info)
}
ConnectionEvent::AddressChange(AddressChange { new_address }) =>
{
#[allow(deprecated)]
self.inner.inject_address_change(new_address)
}
ConnectionEvent::DialUpgradeError(DialUpgradeError { info, error }) =>
{
#[allow(deprecated)]
self.inner.inject_dial_upgrade_error(info, error)
}
ConnectionEvent::ListenUpgradeError(ListenUpgradeError { info, error }) =>
{
#[allow(deprecated)]
self.inner.inject_listen_upgrade_error(info, error)
}
}
} }
} }

View File

@ -19,9 +19,7 @@
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use crate::handler::{ use crate::handler::{
AddressChange, ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError, ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, KeepAlive, SubstreamProtocol,
FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive, ListenUpgradeError,
SubstreamProtocol,
}; };
use std::fmt::Debug; use std::fmt::Debug;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
@ -59,8 +57,7 @@ where
} }
fn on_behaviour_event(&mut self, event: Self::InEvent) { fn on_behaviour_event(&mut self, event: Self::InEvent) {
#[allow(deprecated)] self.inner.on_behaviour_event(event)
self.inner.inject_event(event)
} }
fn connection_keep_alive(&self) -> KeepAlive { fn connection_keep_alive(&self) -> KeepAlive {
@ -96,35 +93,6 @@ where
Self::OutboundOpenInfo, Self::OutboundOpenInfo,
>, >,
) { ) {
match event { self.inner.on_connection_event(event);
ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound { protocol, info }) =>
{
#[allow(deprecated)]
self.inner.inject_fully_negotiated_inbound(protocol, info)
}
ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound {
protocol,
info,
}) =>
{
#[allow(deprecated)]
self.inner.inject_fully_negotiated_outbound(protocol, info)
}
ConnectionEvent::AddressChange(AddressChange { new_address }) =>
{
#[allow(deprecated)]
self.inner.inject_address_change(new_address)
}
ConnectionEvent::DialUpgradeError(DialUpgradeError { info, error }) =>
{
#[allow(deprecated)]
self.inner.inject_dial_upgrade_error(info, error)
}
ConnectionEvent::ListenUpgradeError(ListenUpgradeError { info, error }) =>
{
#[allow(deprecated)]
self.inner.inject_listen_upgrade_error(info, error)
}
}
} }
} }

View File

@ -22,14 +22,15 @@
//! indexed by some key. //! indexed by some key.
use crate::handler::{ use crate::handler::{
ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, IntoConnectionHandler, AddressChange, ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent,
KeepAlive, SubstreamProtocol, ConnectionHandlerUpgrErr, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
IntoConnectionHandler, KeepAlive, ListenUpgradeError, SubstreamProtocol,
}; };
use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend, UpgradeInfoSend}; use crate::upgrade::{InboundUpgradeSend, OutboundUpgradeSend, UpgradeInfoSend};
use crate::NegotiatedSubstream; use crate::NegotiatedSubstream;
use futures::{future::BoxFuture, prelude::*}; use futures::{future::BoxFuture, prelude::*};
use libp2p_core::upgrade::{NegotiationError, ProtocolError, ProtocolName, UpgradeError}; use libp2p_core::upgrade::{NegotiationError, ProtocolError, ProtocolName, UpgradeError};
use libp2p_core::{ConnectedPoint, Multiaddr, PeerId}; use libp2p_core::{ConnectedPoint, PeerId};
use rand::Rng; use rand::Rng;
use std::{ use std::{
cmp, cmp,
@ -62,7 +63,7 @@ where
impl<K, H> MultiHandler<K, H> impl<K, H> MultiHandler<K, H>
where where
K: Hash + Eq, K: Clone + Debug + Hash + Eq + Send + 'static,
H: ConnectionHandler, H: ConnectionHandler,
{ {
/// Create and populate a `MultiHandler` from the given handler iterator. /// Create and populate a `MultiHandler` from the given handler iterator.
@ -82,6 +83,134 @@ where
)?; )?;
Ok(m) Ok(m)
} }
fn on_listen_upgrade_error(
&mut self,
ListenUpgradeError { error, mut info }: ListenUpgradeError<
<Self as ConnectionHandler>::InboundOpenInfo,
<Self as ConnectionHandler>::InboundProtocol,
>,
) {
match error {
ConnectionHandlerUpgrErr::Timer => {
for (k, h) in &mut self.handlers {
if let Some(i) = info.take(k) {
h.on_connection_event(ConnectionEvent::ListenUpgradeError(
ListenUpgradeError {
info: i,
error: ConnectionHandlerUpgrErr::Timer,
},
));
}
}
}
ConnectionHandlerUpgrErr::Timeout => {
for (k, h) in &mut self.handlers {
if let Some(i) = info.take(k) {
h.on_connection_event(ConnectionEvent::ListenUpgradeError(
ListenUpgradeError {
info: i,
error: ConnectionHandlerUpgrErr::Timeout,
},
));
}
}
}
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => {
for (k, h) in &mut self.handlers {
if let Some(i) = info.take(k) {
h.on_connection_event(ConnectionEvent::ListenUpgradeError(
ListenUpgradeError {
info: i,
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(
NegotiationError::Failed,
)),
},
));
}
}
}
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(
NegotiationError::ProtocolError(e),
)) => match e {
ProtocolError::IoError(e) => {
for (k, h) in &mut self.handlers {
if let Some(i) = info.take(k) {
let e = NegotiationError::ProtocolError(ProtocolError::IoError(
e.kind().into(),
));
h.on_connection_event(ConnectionEvent::ListenUpgradeError(
ListenUpgradeError {
info: i,
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(
e,
)),
},
));
}
}
}
ProtocolError::InvalidMessage => {
for (k, h) in &mut self.handlers {
if let Some(i) = info.take(k) {
let e = NegotiationError::ProtocolError(ProtocolError::InvalidMessage);
h.on_connection_event(ConnectionEvent::ListenUpgradeError(
ListenUpgradeError {
info: i,
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(
e,
)),
},
));
}
}
}
ProtocolError::InvalidProtocol => {
for (k, h) in &mut self.handlers {
if let Some(i) = info.take(k) {
let e = NegotiationError::ProtocolError(ProtocolError::InvalidProtocol);
h.on_connection_event(ConnectionEvent::ListenUpgradeError(
ListenUpgradeError {
info: i,
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(
e,
)),
},
));
}
}
}
ProtocolError::TooManyProtocols => {
for (k, h) in &mut self.handlers {
if let Some(i) = info.take(k) {
let e =
NegotiationError::ProtocolError(ProtocolError::TooManyProtocols);
h.on_connection_event(ConnectionEvent::ListenUpgradeError(
ListenUpgradeError {
info: i,
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(
e,
)),
},
));
}
}
}
},
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply((k, e))) => {
if let Some(h) = self.handlers.get_mut(&k) {
if let Some(i) = info.take(&k) {
h.on_connection_event(ConnectionEvent::ListenUpgradeError(
ListenUpgradeError {
info: i,
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)),
},
));
}
}
}
}
}
} }
impl<K, H> ConnectionHandler for MultiHandler<K, H> impl<K, H> ConnectionHandler for MultiHandler<K, H>
@ -121,164 +250,79 @@ where
SubstreamProtocol::new(upgrade, info).with_timeout(timeout) SubstreamProtocol::new(upgrade, info).with_timeout(timeout)
} }
fn inject_fully_negotiated_outbound( fn on_connection_event(
&mut self, &mut self,
protocol: <Self::OutboundProtocol as OutboundUpgradeSend>::Output, event: ConnectionEvent<
(key, arg): Self::OutboundOpenInfo, Self::InboundProtocol,
Self::OutboundProtocol,
Self::InboundOpenInfo,
Self::OutboundOpenInfo,
>,
) { ) {
if let Some(h) = self.handlers.get_mut(&key) { match event {
#[allow(deprecated)] ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound {
h.inject_fully_negotiated_outbound(protocol, arg) protocol,
} else { info: (key, arg),
log::error!("inject_fully_negotiated_outbound: no handler for key") }) => {
} if let Some(h) = self.handlers.get_mut(&key) {
} h.on_connection_event(ConnectionEvent::FullyNegotiatedOutbound(
FullyNegotiatedOutbound {
fn inject_fully_negotiated_inbound( protocol,
&mut self, info: arg,
(key, arg): <Self::InboundProtocol as InboundUpgradeSend>::Output, },
mut info: Self::InboundOpenInfo, ));
) { } else {
if let Some(h) = self.handlers.get_mut(&key) { log::error!("FullyNegotiatedOutbound: no handler for key")
if let Some(i) = info.take(&key) { }
#[allow(deprecated)] }
h.inject_fully_negotiated_inbound(arg, i) ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound {
protocol: (key, arg),
mut info,
}) => {
if let Some(h) = self.handlers.get_mut(&key) {
if let Some(i) = info.take(&key) {
h.on_connection_event(ConnectionEvent::FullyNegotiatedInbound(
FullyNegotiatedInbound {
protocol: arg,
info: i,
},
));
}
} else {
log::error!("FullyNegotiatedInbound: no handler for key")
}
}
ConnectionEvent::AddressChange(AddressChange { new_address }) => {
for h in self.handlers.values_mut() {
h.on_connection_event(ConnectionEvent::AddressChange(AddressChange {
new_address,
}));
}
}
ConnectionEvent::DialUpgradeError(DialUpgradeError {
info: (key, arg),
error,
}) => {
if let Some(h) = self.handlers.get_mut(&key) {
h.on_connection_event(ConnectionEvent::DialUpgradeError(DialUpgradeError {
info: arg,
error,
}));
} else {
log::error!("DialUpgradeError: no handler for protocol")
}
}
ConnectionEvent::ListenUpgradeError(listen_upgrade_error) => {
self.on_listen_upgrade_error(listen_upgrade_error)
} }
} else {
log::error!("inject_fully_negotiated_inbound: no handler for key")
} }
} }
fn on_behaviour_event(&mut self, (key, event): Self::InEvent) { fn on_behaviour_event(&mut self, (key, event): Self::InEvent) {
if let Some(h) = self.handlers.get_mut(&key) { if let Some(h) = self.handlers.get_mut(&key) {
#[allow(deprecated)] h.on_behaviour_event(event)
h.inject_event(event)
} else { } else {
log::error!("inject_event: no handler for key") log::error!("on_behaviour_event: no handler for key")
}
}
fn inject_address_change(&mut self, addr: &Multiaddr) {
for h in self.handlers.values_mut() {
#[allow(deprecated)]
h.inject_address_change(addr)
}
}
fn inject_dial_upgrade_error(
&mut self,
(key, arg): Self::OutboundOpenInfo,
error: ConnectionHandlerUpgrErr<<Self::OutboundProtocol as OutboundUpgradeSend>::Error>,
) {
if let Some(h) = self.handlers.get_mut(&key) {
#[allow(deprecated)]
h.inject_dial_upgrade_error(arg, error)
} else {
log::error!("inject_dial_upgrade_error: no handler for protocol")
}
}
fn inject_listen_upgrade_error(
&mut self,
mut info: Self::InboundOpenInfo,
error: ConnectionHandlerUpgrErr<<Self::InboundProtocol as InboundUpgradeSend>::Error>,
) {
match error {
ConnectionHandlerUpgrErr::Timer => {
for (k, h) in &mut self.handlers {
if let Some(i) = info.take(k) {
#[allow(deprecated)]
h.inject_listen_upgrade_error(i, ConnectionHandlerUpgrErr::Timer)
}
}
}
ConnectionHandlerUpgrErr::Timeout => {
for (k, h) in &mut self.handlers {
if let Some(i) = info.take(k) {
#[allow(deprecated)]
h.inject_listen_upgrade_error(i, ConnectionHandlerUpgrErr::Timeout)
}
}
}
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => {
for (k, h) in &mut self.handlers {
if let Some(i) = info.take(k) {
#[allow(deprecated)]
h.inject_listen_upgrade_error(
i,
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(
NegotiationError::Failed,
)),
)
}
}
}
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(
NegotiationError::ProtocolError(e),
)) => match e {
ProtocolError::IoError(e) => {
for (k, h) in &mut self.handlers {
if let Some(i) = info.take(k) {
let e = NegotiationError::ProtocolError(ProtocolError::IoError(
e.kind().into(),
));
#[allow(deprecated)]
h.inject_listen_upgrade_error(
i,
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(e)),
)
}
}
}
ProtocolError::InvalidMessage => {
for (k, h) in &mut self.handlers {
if let Some(i) = info.take(k) {
let e = NegotiationError::ProtocolError(ProtocolError::InvalidMessage);
#[allow(deprecated)]
h.inject_listen_upgrade_error(
i,
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(e)),
)
}
}
}
ProtocolError::InvalidProtocol => {
for (k, h) in &mut self.handlers {
if let Some(i) = info.take(k) {
let e = NegotiationError::ProtocolError(ProtocolError::InvalidProtocol);
#[allow(deprecated)]
h.inject_listen_upgrade_error(
i,
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(e)),
)
}
}
}
ProtocolError::TooManyProtocols => {
for (k, h) in &mut self.handlers {
if let Some(i) = info.take(k) {
let e =
NegotiationError::ProtocolError(ProtocolError::TooManyProtocols);
#[allow(deprecated)]
h.inject_listen_upgrade_error(
i,
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(e)),
)
}
}
}
},
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply((k, e))) => {
if let Some(h) = self.handlers.get_mut(&k) {
if let Some(i) = info.take(&k) {
#[allow(deprecated)]
h.inject_listen_upgrade_error(
i,
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)),
)
}
}
}
} }
} }

View File

@ -19,12 +19,14 @@
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use crate::handler::{ use crate::handler::{
ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, AddressChange, ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent,
DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, IntoConnectionHandler, ConnectionHandlerUpgrErr, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
KeepAlive, ListenUpgradeError, SubstreamProtocol, InboundUpgradeSend, IntoConnectionHandler, KeepAlive, ListenUpgradeError, OutboundUpgradeSend,
SubstreamProtocol,
}; };
use crate::upgrade::SendWrapper; use crate::upgrade::SendWrapper;
use either::Either;
use libp2p_core::{ use libp2p_core::{
either::{EitherError, EitherOutput}, either::{EitherError, EitherOutput},
upgrade::{EitherUpgrade, NegotiationError, ProtocolError, SelectUpgrade, UpgradeError}, upgrade::{EitherUpgrade, NegotiationError, ProtocolError, SelectUpgrade, UpgradeError},
@ -98,136 +100,135 @@ impl<TProto1, TProto2> ConnectionHandlerSelect<TProto1, TProto2> {
} }
} }
impl<S1OOI, S2OOI, S1OP, S2OP>
FullyNegotiatedOutbound<
EitherUpgrade<SendWrapper<S1OP>, SendWrapper<S2OP>>,
EitherOutput<S1OOI, S2OOI>,
>
where
S1OP: OutboundUpgradeSend,
S2OP: OutboundUpgradeSend,
S1OOI: Send + 'static,
S2OOI: Send + 'static,
{
fn transpose(
self,
) -> Either<FullyNegotiatedOutbound<S1OP, S1OOI>, FullyNegotiatedOutbound<S2OP, S2OOI>> {
match self {
FullyNegotiatedOutbound {
protocol: EitherOutput::First(protocol),
info: EitherOutput::First(info),
} => Either::Left(FullyNegotiatedOutbound { protocol, info }),
FullyNegotiatedOutbound {
protocol: EitherOutput::Second(protocol),
info: EitherOutput::Second(info),
} => Either::Right(FullyNegotiatedOutbound { protocol, info }),
_ => panic!("wrong API usage: the protocol doesn't match the upgrade info"),
}
}
}
impl<S1IP, S1IOI, S2IP, S2IOI>
FullyNegotiatedInbound<SelectUpgrade<SendWrapper<S1IP>, SendWrapper<S2IP>>, (S1IOI, S2IOI)>
where
S1IP: InboundUpgradeSend,
S2IP: InboundUpgradeSend,
{
fn transpose(
self,
) -> Either<FullyNegotiatedInbound<S1IP, S1IOI>, FullyNegotiatedInbound<S2IP, S2IOI>> {
match self {
FullyNegotiatedInbound {
protocol: EitherOutput::First(protocol),
info: (i1, _i2),
} => Either::Left(FullyNegotiatedInbound { protocol, info: i1 }),
FullyNegotiatedInbound {
protocol: EitherOutput::Second(protocol),
info: (_i1, i2),
} => Either::Right(FullyNegotiatedInbound { protocol, info: i2 }),
}
}
}
impl<S1OOI, S2OOI, S1OP, S2OP>
DialUpgradeError<
EitherOutput<S1OOI, S2OOI>,
EitherUpgrade<SendWrapper<S1OP>, SendWrapper<S2OP>>,
>
where
S1OP: OutboundUpgradeSend,
S2OP: OutboundUpgradeSend,
S1OOI: Send + 'static,
S2OOI: Send + 'static,
{
fn transpose(self) -> Either<DialUpgradeError<S1OOI, S1OP>, DialUpgradeError<S2OOI, S2OP>> {
match self {
DialUpgradeError {
info: EitherOutput::First(info),
error: ConnectionHandlerUpgrErr::Timer,
} => Either::Left(DialUpgradeError {
info,
error: ConnectionHandlerUpgrErr::Timer,
}),
DialUpgradeError {
info: EitherOutput::First(info),
error: ConnectionHandlerUpgrErr::Timeout,
} => Either::Left(DialUpgradeError {
info,
error: ConnectionHandlerUpgrErr::Timeout,
}),
DialUpgradeError {
info: EitherOutput::First(info),
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(err)),
} => Either::Left(DialUpgradeError {
info,
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(err)),
}),
DialUpgradeError {
info: EitherOutput::First(info),
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::A(err))),
} => Either::Left(DialUpgradeError {
info,
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(err)),
}),
DialUpgradeError {
info: EitherOutput::Second(info),
error: ConnectionHandlerUpgrErr::Timer,
} => Either::Right(DialUpgradeError {
info,
error: ConnectionHandlerUpgrErr::Timer,
}),
DialUpgradeError {
info: EitherOutput::Second(info),
error: ConnectionHandlerUpgrErr::Timeout,
} => Either::Right(DialUpgradeError {
info,
error: ConnectionHandlerUpgrErr::Timeout,
}),
DialUpgradeError {
info: EitherOutput::Second(info),
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(err)),
} => Either::Right(DialUpgradeError {
info,
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(err)),
}),
DialUpgradeError {
info: EitherOutput::Second(info),
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::B(err))),
} => Either::Right(DialUpgradeError {
info,
error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(err)),
}),
_ => panic!("Wrong API usage; the upgrade error doesn't match the outbound open info"),
}
}
}
impl<TProto1, TProto2> ConnectionHandlerSelect<TProto1, TProto2> impl<TProto1, TProto2> ConnectionHandlerSelect<TProto1, TProto2>
where where
TProto1: ConnectionHandler, TProto1: ConnectionHandler,
TProto2: ConnectionHandler, TProto2: ConnectionHandler,
{ {
fn on_fully_negotiated_outbound(
&mut self,
FullyNegotiatedOutbound {
protocol,
info: endpoint,
}: FullyNegotiatedOutbound<
<Self as ConnectionHandler>::OutboundProtocol,
<Self as ConnectionHandler>::OutboundOpenInfo,
>,
) {
match (protocol, endpoint) {
(EitherOutput::First(protocol), EitherOutput::First(info)) =>
{
#[allow(deprecated)]
self.proto1.inject_fully_negotiated_outbound(protocol, info)
}
(EitherOutput::Second(protocol), EitherOutput::Second(info)) =>
{
#[allow(deprecated)]
self.proto2.inject_fully_negotiated_outbound(protocol, info)
}
(EitherOutput::First(_), EitherOutput::Second(_)) => {
panic!("wrong API usage: the protocol doesn't match the upgrade info")
}
(EitherOutput::Second(_), EitherOutput::First(_)) => {
panic!("wrong API usage: the protocol doesn't match the upgrade info")
}
}
}
fn on_fully_negotiated_inbound(
&mut self,
FullyNegotiatedInbound {
protocol,
info: (i1, i2),
}: FullyNegotiatedInbound<
<Self as ConnectionHandler>::InboundProtocol,
<Self as ConnectionHandler>::InboundOpenInfo,
>,
) {
match protocol {
EitherOutput::First(protocol) =>
{
#[allow(deprecated)]
self.proto1.inject_fully_negotiated_inbound(protocol, i1)
}
EitherOutput::Second(protocol) =>
{
#[allow(deprecated)]
self.proto2.inject_fully_negotiated_inbound(protocol, i2)
}
}
}
fn on_dial_upgrade_error(
&mut self,
DialUpgradeError { info, error }: DialUpgradeError<
<Self as ConnectionHandler>::OutboundOpenInfo,
<Self as ConnectionHandler>::OutboundProtocol,
>,
) {
match (info, error) {
#[allow(deprecated)]
(EitherOutput::First(info), ConnectionHandlerUpgrErr::Timer) => self
.proto1
.inject_dial_upgrade_error(info, ConnectionHandlerUpgrErr::Timer),
#[allow(deprecated)]
(EitherOutput::First(info), ConnectionHandlerUpgrErr::Timeout) => self
.proto1
.inject_dial_upgrade_error(info, ConnectionHandlerUpgrErr::Timeout),
#[allow(deprecated)]
(
EitherOutput::First(info),
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(err)),
) => self.proto1.inject_dial_upgrade_error(
info,
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(err)),
),
#[allow(deprecated)]
(
EitherOutput::First(info),
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::A(err))),
) => self.proto1.inject_dial_upgrade_error(
info,
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(err)),
),
(
EitherOutput::First(_),
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::B(_))),
) => {
panic!("Wrong API usage; the upgrade error doesn't match the outbound open info");
}
#[allow(deprecated)]
(EitherOutput::Second(info), ConnectionHandlerUpgrErr::Timeout) => self
.proto2
.inject_dial_upgrade_error(info, ConnectionHandlerUpgrErr::Timeout),
#[allow(deprecated)]
(EitherOutput::Second(info), ConnectionHandlerUpgrErr::Timer) => self
.proto2
.inject_dial_upgrade_error(info, ConnectionHandlerUpgrErr::Timer),
#[allow(deprecated)]
(
EitherOutput::Second(info),
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(err)),
) => self.proto2.inject_dial_upgrade_error(
info,
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(err)),
),
#[allow(deprecated)]
(
EitherOutput::Second(info),
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::B(err))),
) => self.proto2.inject_dial_upgrade_error(
info,
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(err)),
),
(
EitherOutput::Second(_),
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::A(_))),
) => {
panic!("Wrong API usage; the upgrade error doesn't match the outbound open info");
}
}
}
fn on_listen_upgrade_error( fn on_listen_upgrade_error(
&mut self, &mut self,
ListenUpgradeError { ListenUpgradeError {
@ -240,36 +241,47 @@ where
) { ) {
match error { match error {
ConnectionHandlerUpgrErr::Timer => { ConnectionHandlerUpgrErr::Timer => {
#[allow(deprecated)]
self.proto1 self.proto1
.inject_listen_upgrade_error(i1, ConnectionHandlerUpgrErr::Timer); .on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError {
#[allow(deprecated)] info: i1,
error: ConnectionHandlerUpgrErr::Timer,
}));
self.proto2 self.proto2
.inject_listen_upgrade_error(i2, ConnectionHandlerUpgrErr::Timer) .on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError {
info: i2,
error: ConnectionHandlerUpgrErr::Timer,
}));
} }
ConnectionHandlerUpgrErr::Timeout => { ConnectionHandlerUpgrErr::Timeout => {
#[allow(deprecated)]
self.proto1 self.proto1
.inject_listen_upgrade_error(i1, ConnectionHandlerUpgrErr::Timeout); .on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError {
#[allow(deprecated)] info: i1,
error: ConnectionHandlerUpgrErr::Timeout,
}));
self.proto2 self.proto2
.inject_listen_upgrade_error(i2, ConnectionHandlerUpgrErr::Timeout) .on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError {
info: i2,
error: ConnectionHandlerUpgrErr::Timeout,
}));
} }
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => { ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(NegotiationError::Failed)) => {
#[allow(deprecated)] self.proto1
self.proto1.inject_listen_upgrade_error( .on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError {
i1, info: i1,
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select( error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(
NegotiationError::Failed, NegotiationError::Failed,
)), )),
); }));
#[allow(deprecated)]
self.proto2.inject_listen_upgrade_error( self.proto2
i2, .on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError {
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select( info: i2,
NegotiationError::Failed, error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(
)), NegotiationError::Failed,
); )),
}));
} }
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select( ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(
NegotiationError::ProtocolError(e), NegotiationError::ProtocolError(e),
@ -295,32 +307,30 @@ where
e2 = NegotiationError::ProtocolError(ProtocolError::TooManyProtocols) e2 = NegotiationError::ProtocolError(ProtocolError::TooManyProtocols)
} }
} }
#[allow(deprecated)] self.proto1
self.proto1.inject_listen_upgrade_error( .on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError {
i1, info: i1,
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(e1)), error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(e1)),
); }));
#[allow(deprecated)] self.proto2
self.proto2.inject_listen_upgrade_error( .on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError {
i2, info: i2,
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(e2)), error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Select(e2)),
) }));
} }
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::A(e))) => ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::A(e))) => {
{ self.proto1
#[allow(deprecated)] .on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError {
self.proto1.inject_listen_upgrade_error( info: i1,
i1, error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)),
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)), }));
)
} }
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::B(e))) => ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(EitherError::B(e))) => {
{ self.proto2
#[allow(deprecated)] .on_connection_event(ConnectionEvent::ListenUpgradeError(ListenUpgradeError {
self.proto2.inject_listen_upgrade_error( info: i2,
i2, error: ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)),
ConnectionHandlerUpgrErr::Upgrade(UpgradeError::Apply(e)), }));
)
} }
} }
} }
@ -357,10 +367,8 @@ where
fn on_behaviour_event(&mut self, event: Self::InEvent) { fn on_behaviour_event(&mut self, event: Self::InEvent) {
match event { match event {
#[allow(deprecated)] EitherOutput::First(event) => self.proto1.on_behaviour_event(event),
EitherOutput::First(event) => self.proto1.inject_event(event), EitherOutput::Second(event) => self.proto2.on_behaviour_event(event),
#[allow(deprecated)]
EitherOutput::Second(event) => self.proto2.inject_event(event),
} }
} }
@ -430,19 +438,45 @@ where
) { ) {
match event { match event {
ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => { ConnectionEvent::FullyNegotiatedOutbound(fully_negotiated_outbound) => {
self.on_fully_negotiated_outbound(fully_negotiated_outbound) match fully_negotiated_outbound.transpose() {
Either::Left(f) => self
.proto1
.on_connection_event(ConnectionEvent::FullyNegotiatedOutbound(f)),
Either::Right(f) => self
.proto2
.on_connection_event(ConnectionEvent::FullyNegotiatedOutbound(f)),
}
} }
ConnectionEvent::FullyNegotiatedInbound(fully_negotiated_inbound) => { ConnectionEvent::FullyNegotiatedInbound(fully_negotiated_inbound) => {
self.on_fully_negotiated_inbound(fully_negotiated_inbound) match fully_negotiated_inbound.transpose() {
Either::Left(f) => self
.proto1
.on_connection_event(ConnectionEvent::FullyNegotiatedInbound(f)),
Either::Right(f) => self
.proto2
.on_connection_event(ConnectionEvent::FullyNegotiatedInbound(f)),
}
} }
ConnectionEvent::AddressChange(address) => { ConnectionEvent::AddressChange(address) => {
#[allow(deprecated)] self.proto1
self.proto1.inject_address_change(address.new_address); .on_connection_event(ConnectionEvent::AddressChange(AddressChange {
#[allow(deprecated)] new_address: address.new_address,
self.proto2.inject_address_change(address.new_address) }));
self.proto2
.on_connection_event(ConnectionEvent::AddressChange(AddressChange {
new_address: address.new_address,
}));
} }
ConnectionEvent::DialUpgradeError(dial_upgrade_error) => { ConnectionEvent::DialUpgradeError(dial_upgrade_error) => {
self.on_dial_upgrade_error(dial_upgrade_error) match dial_upgrade_error.transpose() {
Either::Left(err) => self
.proto1
.on_connection_event(ConnectionEvent::DialUpgradeError(err)),
Either::Right(err) => self
.proto2
.on_connection_event(ConnectionEvent::DialUpgradeError(err)),
}
} }
ConnectionEvent::ListenUpgradeError(listen_upgrade_error) => { ConnectionEvent::ListenUpgradeError(listen_upgrade_error) => {
self.on_listen_upgrade_error(listen_upgrade_error) self.on_listen_upgrade_error(listen_upgrade_error)

View File

@ -101,8 +101,10 @@ pub mod derive_prelude {
} }
pub use behaviour::{ pub use behaviour::{
CloseConnection, ExternalAddresses, ListenAddresses, NetworkBehaviour, NetworkBehaviourAction, AddressChange, CloseConnection, ConnectionClosed, DialFailure, ExpiredExternalAddr,
NotifyHandler, PollParameters, ExpiredListenAddr, ExternalAddresses, FromSwarm, ListenAddresses, ListenFailure,
ListenerClosed, ListenerError, NetworkBehaviour, NetworkBehaviourAction, NewExternalAddr,
NewListenAddr, NotifyHandler, PollParameters,
}; };
pub use connection::pool::{ConnectionCounters, ConnectionLimits}; pub use connection::pool::{ConnectionCounters, ConnectionLimits};
pub use connection::{ pub use connection::{
@ -462,8 +464,10 @@ where
/// Depending on the underlying transport, one listener may have multiple listening addresses. /// Depending on the underlying transport, one listener may have multiple listening addresses.
pub fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<io::Error>> { pub fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<io::Error>> {
let id = self.transport.listen_on(addr)?; let id = self.transport.listen_on(addr)?;
#[allow(deprecated)] self.behaviour
self.behaviour.inject_new_listener(id); .on_swarm_event(FromSwarm::NewListener(behaviour::NewListener {
listener_id: id,
}));
Ok(id) Ok(id)
} }
@ -487,9 +491,9 @@ where
/// # use libp2p_swarm::dummy; /// # use libp2p_swarm::dummy;
/// # /// #
/// let mut swarm = Swarm::without_executor( /// let mut swarm = Swarm::without_executor(
/// DummyTransport::new().boxed(), /// DummyTransport::new().boxed(),
/// dummy::Behaviour, /// dummy::Behaviour,
/// PeerId::random(), /// PeerId::random(),
/// ); /// );
/// ///
/// // Dial a known peer. /// // Dial a known peer.
@ -524,8 +528,12 @@ where
if !should_dial { if !should_dial {
let e = DialError::DialPeerConditionFalse(condition); let e = DialError::DialPeerConditionFalse(condition);
#[allow(deprecated)] self.behaviour
self.behaviour.inject_dial_failure(peer_id, handler, &e); .on_swarm_event(FromSwarm::DialFailure(DialFailure {
peer_id,
handler,
error: &e,
}));
return Err(e); return Err(e);
} }
@ -534,9 +542,13 @@ where
// Check if peer is banned. // Check if peer is banned.
if self.banned_peers.contains(&peer_id) { if self.banned_peers.contains(&peer_id) {
let error = DialError::Banned; let error = DialError::Banned;
#[allow(deprecated)]
self.behaviour self.behaviour
.inject_dial_failure(Some(peer_id), handler, &error); .on_swarm_event(FromSwarm::DialFailure(DialFailure {
peer_id: Some(peer_id),
handler,
error: &error,
}));
return Err(error); return Err(error);
} }
} }
@ -558,8 +570,12 @@ where
if addresses.is_empty() { if addresses.is_empty() {
let error = DialError::NoAddresses; let error = DialError::NoAddresses;
#[allow(deprecated)] self.behaviour
self.behaviour.inject_dial_failure(peer_id, handler, &error); .on_swarm_event(FromSwarm::DialFailure(DialFailure {
peer_id,
handler,
error: &error,
}));
return Err(error); return Err(error);
}; };
@ -599,8 +615,13 @@ where
Ok(_connection_id) => Ok(()), Ok(_connection_id) => Ok(()),
Err((connection_limit, handler)) => { Err((connection_limit, handler)) => {
let error = DialError::ConnectionLimit(connection_limit); let error = DialError::ConnectionLimit(connection_limit);
#[allow(deprecated)] self.behaviour
self.behaviour.inject_dial_failure(peer_id, handler, &error); .on_swarm_event(FromSwarm::DialFailure(DialFailure {
peer_id,
handler,
error: &error,
}));
Err(error) Err(error)
} }
} }
@ -641,15 +662,17 @@ where
let result = self.external_addrs.add(a.clone(), s); let result = self.external_addrs.add(a.clone(), s);
let expired = match &result { let expired = match &result {
AddAddressResult::Inserted { expired } => { AddAddressResult::Inserted { expired } => {
#[allow(deprecated)] self.behaviour
self.behaviour.inject_new_external_addr(&a); .on_swarm_event(FromSwarm::NewExternalAddr(NewExternalAddr { addr: &a }));
expired expired
} }
AddAddressResult::Updated { expired } => expired, AddAddressResult::Updated { expired } => expired,
}; };
for a in expired { for a in expired {
#[allow(deprecated)] self.behaviour
self.behaviour.inject_expired_external_addr(&a.addr); .on_swarm_event(FromSwarm::ExpiredExternalAddr(ExpiredExternalAddr {
addr: &a.addr,
}));
} }
result result
} }
@ -662,8 +685,8 @@ where
/// otherwise. /// otherwise.
pub fn remove_external_address(&mut self, addr: &Multiaddr) -> bool { pub fn remove_external_address(&mut self, addr: &Multiaddr) -> bool {
if self.external_addrs.remove(addr) { if self.external_addrs.remove(addr) {
#[allow(deprecated)] self.behaviour
self.behaviour.inject_expired_external_addr(addr); .on_swarm_event(FromSwarm::ExpiredExternalAddr(ExpiredExternalAddr { addr }));
true true
} else { } else {
false false
@ -769,15 +792,23 @@ where
); );
let failed_addresses = concurrent_dial_errors let failed_addresses = concurrent_dial_errors
.as_ref() .as_ref()
.map(|es| es.iter().map(|(a, _)| a).cloned().collect()); .map(|es| {
#[allow(deprecated)] es.iter()
self.behaviour.inject_connection_established( .map(|(a, _)| a)
&peer_id, .cloned()
&id, .collect::<Vec<Multiaddr>>()
&endpoint, })
failed_addresses.as_ref(), .unwrap_or_default();
non_banned_established, self.behaviour
); .on_swarm_event(FromSwarm::ConnectionEstablished(
behaviour::ConnectionEstablished {
peer_id,
connection_id: id,
endpoint: &endpoint,
failed_addresses: &failed_addresses,
other_established: non_banned_established,
},
));
return Some(SwarmEvent::ConnectionEstablished { return Some(SwarmEvent::ConnectionEstablished {
peer_id, peer_id,
num_established, num_established,
@ -795,8 +826,12 @@ where
} => { } => {
let error = error.into(); let error = error.into();
#[allow(deprecated)] self.behaviour
self.behaviour.inject_dial_failure(peer, handler, &error); .on_swarm_event(FromSwarm::DialFailure(DialFailure {
peer_id: peer,
handler,
error: &error,
}));
if let Some(peer) = peer { if let Some(peer) = peer {
log::debug!("Connection attempt to {:?} failed with {:?}.", peer, error,); log::debug!("Connection attempt to {:?} failed with {:?}.", peer, error,);
@ -817,9 +852,12 @@ where
handler, handler,
} => { } => {
log::debug!("Incoming connection failed: {:?}", error); log::debug!("Incoming connection failed: {:?}", error);
#[allow(deprecated)]
self.behaviour self.behaviour
.inject_listen_failure(&local_addr, &send_back_addr, handler); .on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
local_addr: &local_addr,
send_back_addr: &send_back_addr,
handler,
}));
return Some(SwarmEvent::IncomingConnectionError { return Some(SwarmEvent::IncomingConnectionError {
local_addr, local_addr,
send_back_addr, send_back_addr,
@ -858,14 +896,14 @@ where
.into_iter() .into_iter()
.filter(|conn_id| !self.banned_peer_connections.contains(conn_id)) .filter(|conn_id| !self.banned_peer_connections.contains(conn_id))
.count(); .count();
#[allow(deprecated)] self.behaviour
self.behaviour.inject_connection_closed( .on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
&peer_id, peer_id,
&id, connection_id: id,
&endpoint, endpoint: &endpoint,
handler, handler,
remaining_non_banned, remaining_established: remaining_non_banned,
); }));
} }
return Some(SwarmEvent::ConnectionClosed { return Some(SwarmEvent::ConnectionClosed {
peer_id, peer_id,
@ -878,8 +916,8 @@ where
if self.banned_peer_connections.contains(&id) { if self.banned_peer_connections.contains(&id) {
log::debug!("Ignoring event from banned peer: {} {:?}.", peer_id, id); log::debug!("Ignoring event from banned peer: {} {:?}.", peer_id, id);
} else { } else {
#[allow(deprecated)] self.behaviour
self.behaviour.inject_event(peer_id, id, event); .on_connection_handler_event(peer_id, id, event);
} }
} }
PoolEvent::AddressChange { PoolEvent::AddressChange {
@ -889,13 +927,13 @@ where
old_endpoint, old_endpoint,
} => { } => {
if !self.banned_peer_connections.contains(&id) { if !self.banned_peer_connections.contains(&id) {
#[allow(deprecated)] self.behaviour
self.behaviour.inject_address_change( .on_swarm_event(FromSwarm::AddressChange(AddressChange {
&peer_id, peer_id,
&id, connection_id: id,
&old_endpoint, old: &old_endpoint,
&new_endpoint, new: &new_endpoint,
); }));
} }
} }
} }
@ -933,9 +971,12 @@ where
}); });
} }
Err((connection_limit, handler)) => { Err((connection_limit, handler)) => {
#[allow(deprecated)]
self.behaviour self.behaviour
.inject_listen_failure(&local_addr, &send_back_addr, handler); .on_swarm_event(FromSwarm::ListenFailure(ListenFailure {
local_addr: &local_addr,
send_back_addr: &send_back_addr,
handler,
}));
log::warn!("Incoming connection rejected: {:?}", connection_limit); log::warn!("Incoming connection rejected: {:?}", connection_limit);
} }
}; };
@ -949,9 +990,11 @@ where
if !addrs.contains(&listen_addr) { if !addrs.contains(&listen_addr) {
addrs.push(listen_addr.clone()) addrs.push(listen_addr.clone())
} }
#[allow(deprecated)]
self.behaviour self.behaviour
.inject_new_listen_addr(listener_id, &listen_addr); .on_swarm_event(FromSwarm::NewListenAddr(NewListenAddr {
listener_id,
addr: &listen_addr,
}));
return Some(SwarmEvent::NewListenAddr { return Some(SwarmEvent::NewListenAddr {
listener_id, listener_id,
address: listen_addr, address: listen_addr,
@ -969,9 +1012,11 @@ where
if let Some(addrs) = self.listened_addrs.get_mut(&listener_id) { if let Some(addrs) = self.listened_addrs.get_mut(&listener_id) {
addrs.retain(|a| a != &listen_addr); addrs.retain(|a| a != &listen_addr);
} }
#[allow(deprecated)]
self.behaviour self.behaviour
.inject_expired_listen_addr(listener_id, &listen_addr); .on_swarm_event(FromSwarm::ExpiredListenAddr(ExpiredListenAddr {
listener_id,
addr: &listen_addr,
}));
return Some(SwarmEvent::ExpiredListenAddr { return Some(SwarmEvent::ExpiredListenAddr {
listener_id, listener_id,
address: listen_addr, address: listen_addr,
@ -984,17 +1029,15 @@ where
log::debug!("Listener {:?}; Closed by {:?}.", listener_id, reason); log::debug!("Listener {:?}; Closed by {:?}.", listener_id, reason);
let addrs = self.listened_addrs.remove(&listener_id).unwrap_or_default(); let addrs = self.listened_addrs.remove(&listener_id).unwrap_or_default();
for addr in addrs.iter() { for addr in addrs.iter() {
#[allow(deprecated)] self.behaviour.on_swarm_event(FromSwarm::ExpiredListenAddr(
self.behaviour.inject_expired_listen_addr(listener_id, addr); ExpiredListenAddr { listener_id, addr },
));
} }
#[allow(deprecated)] self.behaviour
self.behaviour.inject_listener_closed( .on_swarm_event(FromSwarm::ListenerClosed(ListenerClosed {
listener_id, listener_id,
match &reason { reason: reason.as_ref().copied(),
Ok(()) => Ok(()), }));
Err(err) => Err(err),
},
);
return Some(SwarmEvent::ListenerClosed { return Some(SwarmEvent::ListenerClosed {
listener_id, listener_id,
addresses: addrs.to_vec(), addresses: addrs.to_vec(),
@ -1002,8 +1045,11 @@ where
}); });
} }
TransportEvent::ListenerError { listener_id, error } => { TransportEvent::ListenerError { listener_id, error } => {
#[allow(deprecated)] self.behaviour
self.behaviour.inject_listener_error(listener_id, &error); .on_swarm_event(FromSwarm::ListenerError(ListenerError {
listener_id,
err: &error,
}));
return Some(SwarmEvent::ListenerError { listener_id, error }); return Some(SwarmEvent::ListenerError { listener_id, error });
} }
} }
@ -1797,12 +1843,12 @@ mod tests {
/// Establishes multiple connections between two peers, /// Establishes multiple connections between two peers,
/// after which one peer bans the other. /// after which one peer bans the other.
/// ///
/// The test expects both behaviours to be notified via pairs of /// The test expects both behaviours to be notified via calls to [`NetworkBehaviour::on_swarm_event`]
/// [`NetworkBehaviour::inject_connection_established`] / [`NetworkBehaviour::inject_connection_closed`] /// with pairs of [`FromSwarm::ConnectionEstablished`] / [`FromSwarm::ConnectionClosed`]
/// calls while unbanned. /// while unbanned.
/// ///
/// While the ban is in effect, further dials occur. For these connections no /// While the ban is in effect, further dials occur. For these connections no
/// [`NetworkBehaviour::inject_connection_established`], [`NetworkBehaviour::inject_connection_closed`] /// [`FromSwarm::ConnectionEstablished`], [`FromSwarm::ConnectionClosed`]
/// calls should be registered. /// calls should be registered.
#[test] #[test]
fn test_connect_disconnect_ban() { fn test_connect_disconnect_ban() {
@ -1920,8 +1966,8 @@ mod tests {
/// Establishes multiple connections between two peers, /// Establishes multiple connections between two peers,
/// after which one peer disconnects the other using [`Swarm::disconnect_peer_id`]. /// after which one peer disconnects the other using [`Swarm::disconnect_peer_id`].
/// ///
/// The test expects both behaviours to be notified via pairs of /// The test expects both behaviours to be notified via calls to [`NetworkBehaviour::on_swarm_event`]
/// [`NetworkBehaviour::inject_connection_established`] / [`NetworkBehaviour::inject_connection_closed`] calls. /// with pairs of [`FromSwarm::ConnectionEstablished`] / [`FromSwarm::ConnectionClosed`]
#[test] #[test]
fn test_swarm_disconnect() { fn test_swarm_disconnect() {
// Since the test does not try to open any substreams, we can // Since the test does not try to open any substreams, we can
@ -1986,8 +2032,8 @@ mod tests {
/// after which one peer disconnects the other /// after which one peer disconnects the other
/// using [`NetworkBehaviourAction::CloseConnection`] returned by a [`NetworkBehaviour`]. /// using [`NetworkBehaviourAction::CloseConnection`] returned by a [`NetworkBehaviour`].
/// ///
/// The test expects both behaviours to be notified via pairs of /// The test expects both behaviours to be notified via calls to [`NetworkBehaviour::on_swarm_event`]
/// [`NetworkBehaviour::inject_connection_established`] / [`NetworkBehaviour::inject_connection_closed`] calls. /// with pairs of [`FromSwarm::ConnectionEstablished`] / [`FromSwarm::ConnectionClosed`]
#[test] #[test]
fn test_behaviour_disconnect_all() { fn test_behaviour_disconnect_all() {
// Since the test does not try to open any substreams, we can // Since the test does not try to open any substreams, we can
@ -2054,8 +2100,8 @@ mod tests {
/// after which one peer closes a single connection /// after which one peer closes a single connection
/// using [`NetworkBehaviourAction::CloseConnection`] returned by a [`NetworkBehaviour`]. /// using [`NetworkBehaviourAction::CloseConnection`] returned by a [`NetworkBehaviour`].
/// ///
/// The test expects both behaviours to be notified via pairs of /// The test expects both behaviours to be notified via calls to [`NetworkBehaviour::on_swarm_event`]
/// [`NetworkBehaviour::inject_connection_established`] / [`NetworkBehaviour::inject_connection_closed`] calls. /// with pairs of [`FromSwarm::ConnectionEstablished`] / [`FromSwarm::ConnectionClosed`]
#[test] #[test]
fn test_behaviour_disconnect_one() { fn test_behaviour_disconnect_one() {
// Since the test does not try to open any substreams, we can // Since the test does not try to open any substreams, we can

View File

@ -127,7 +127,7 @@ where
pub addresses_of_peer: Vec<PeerId>, pub addresses_of_peer: Vec<PeerId>,
pub on_connection_established: Vec<(PeerId, ConnectionId, ConnectedPoint, usize)>, pub on_connection_established: Vec<(PeerId, ConnectionId, ConnectedPoint, usize)>,
pub on_connection_closed: Vec<(PeerId, ConnectionId, ConnectedPoint, usize)>, pub on_connection_closed: Vec<(PeerId, ConnectionId, ConnectedPoint, usize)>,
pub on_event: Vec<( pub on_connection_handler_event: Vec<(
PeerId, PeerId,
ConnectionId, ConnectionId,
<<TInner::ConnectionHandler as IntoConnectionHandler>::Handler as ConnectionHandler>::OutEvent, <<TInner::ConnectionHandler as IntoConnectionHandler>::Handler as ConnectionHandler>::OutEvent,
@ -155,7 +155,7 @@ where
addresses_of_peer: Vec::new(), addresses_of_peer: Vec::new(),
on_connection_established: Vec::new(), on_connection_established: Vec::new(),
on_connection_closed: Vec::new(), on_connection_closed: Vec::new(),
on_event: Vec::new(), on_connection_handler_event: Vec::new(),
on_dial_failure: Vec::new(), on_dial_failure: Vec::new(),
on_new_listener: Vec::new(), on_new_listener: Vec::new(),
on_new_listen_addr: Vec::new(), on_new_listen_addr: Vec::new(),
@ -173,7 +173,7 @@ where
self.addresses_of_peer = Vec::new(); self.addresses_of_peer = Vec::new();
self.on_connection_established = Vec::new(); self.on_connection_established = Vec::new();
self.on_connection_closed = Vec::new(); self.on_connection_closed = Vec::new();
self.on_event = Vec::new(); self.on_connection_handler_event = Vec::new();
self.on_dial_failure = Vec::new(); self.on_dial_failure = Vec::new();
self.on_new_listen_addr = Vec::new(); self.on_new_listen_addr = Vec::new();
self.on_new_external_addr = Vec::new(); self.on_new_external_addr = Vec::new();
@ -290,15 +290,14 @@ where
endpoint.clone(), endpoint.clone(),
other_established, other_established,
)); ));
let errors = Some(failed_addresses.to_vec()); self.inner
#[allow(deprecated)] .on_swarm_event(FromSwarm::ConnectionEstablished(ConnectionEstablished {
self.inner.inject_connection_established( peer_id,
&peer_id, connection_id,
&connection_id, endpoint,
endpoint, failed_addresses,
errors.as_ref(), other_established,
other_established, }));
);
} }
fn on_connection_closed( fn on_connection_closed(
@ -343,8 +342,8 @@ where
.iter() .iter()
.any(|(peer, conn_id, endpoint, _)| (peer, conn_id, endpoint) .any(|(peer, conn_id, endpoint, _)| (peer, conn_id, endpoint)
== (&peer_id, &connection_id, endpoint)), == (&peer_id, &connection_id, endpoint)),
"`inject_connection_closed` is called only for connections for \ "`on_swarm_event` with `FromSwarm::ConnectionClosed is called only for connections for\
which `inject_connection_established` was called first." which `on_swarm_event` with `FromSwarm::ConnectionEstablished` was called first."
); );
self.on_connection_closed.push(( self.on_connection_closed.push((
peer_id, peer_id,
@ -352,14 +351,14 @@ where
endpoint.clone(), endpoint.clone(),
remaining_established, remaining_established,
)); ));
#[allow(deprecated)] self.inner
self.inner.inject_connection_closed( .on_swarm_event(FromSwarm::ConnectionClosed(ConnectionClosed {
&peer_id, peer_id,
&connection_id, connection_id,
endpoint, endpoint,
handler, handler,
remaining_established, remaining_established,
); }));
} }
} }
@ -395,47 +394,60 @@ where
error, error,
}) => { }) => {
self.on_dial_failure.push(peer_id); self.on_dial_failure.push(peer_id);
#[allow(deprecated)] self.inner
self.inner.inject_dial_failure(peer_id, handler, error); .on_swarm_event(FromSwarm::DialFailure(DialFailure {
peer_id,
handler,
error,
}));
} }
FromSwarm::NewListener(NewListener { listener_id }) => { FromSwarm::NewListener(NewListener { listener_id }) => {
self.on_new_listener.push(listener_id); self.on_new_listener.push(listener_id);
#[allow(deprecated)] self.inner
self.inner.inject_new_listener(listener_id); .on_swarm_event(FromSwarm::NewListener(NewListener { listener_id }));
} }
FromSwarm::NewListenAddr(NewListenAddr { listener_id, addr }) => { FromSwarm::NewListenAddr(NewListenAddr { listener_id, addr }) => {
self.on_new_listen_addr.push((listener_id, addr.clone())); self.on_new_listen_addr.push((listener_id, addr.clone()));
#[allow(deprecated)] self.inner
self.inner.inject_new_listen_addr(listener_id, addr); .on_swarm_event(FromSwarm::NewListenAddr(NewListenAddr {
listener_id,
addr,
}));
} }
FromSwarm::ExpiredListenAddr(ExpiredListenAddr { listener_id, addr }) => { FromSwarm::ExpiredListenAddr(ExpiredListenAddr { listener_id, addr }) => {
self.on_expired_listen_addr self.on_expired_listen_addr
.push((listener_id, addr.clone())); .push((listener_id, addr.clone()));
#[allow(deprecated)] self.inner
self.inner.inject_expired_listen_addr(listener_id, addr); .on_swarm_event(FromSwarm::ExpiredListenAddr(ExpiredListenAddr {
listener_id,
addr,
}));
} }
FromSwarm::NewExternalAddr(NewExternalAddr { addr }) => { FromSwarm::NewExternalAddr(NewExternalAddr { addr }) => {
self.on_new_external_addr.push(addr.clone()); self.on_new_external_addr.push(addr.clone());
#[allow(deprecated)] self.inner
self.inner.inject_new_external_addr(addr); .on_swarm_event(FromSwarm::NewExternalAddr(NewExternalAddr { addr }));
} }
FromSwarm::ExpiredExternalAddr(ExpiredExternalAddr { addr }) => { FromSwarm::ExpiredExternalAddr(ExpiredExternalAddr { addr }) => {
self.on_expired_external_addr.push(addr.clone()); self.on_expired_external_addr.push(addr.clone());
#[allow(deprecated)] self.inner
self.inner.inject_expired_external_addr(addr); .on_swarm_event(FromSwarm::ExpiredExternalAddr(ExpiredExternalAddr { addr }));
} }
FromSwarm::ListenerError(ListenerError { listener_id, err }) => { FromSwarm::ListenerError(ListenerError { listener_id, err }) => {
self.on_listener_error.push(listener_id); self.on_listener_error.push(listener_id);
#[allow(deprecated)] self.inner
self.inner.inject_listener_error(listener_id, err); .on_swarm_event(FromSwarm::ListenerError(ListenerError { listener_id, err }));
} }
FromSwarm::ListenerClosed(ListenerClosed { FromSwarm::ListenerClosed(ListenerClosed {
listener_id, listener_id,
reason, reason,
}) => { }) => {
self.on_listener_closed.push((listener_id, reason.is_ok())); self.on_listener_closed.push((listener_id, reason.is_ok()));
#[allow(deprecated)] self.inner
self.inner.inject_listener_closed(listener_id, reason); .on_swarm_event(FromSwarm::ListenerClosed(ListenerClosed {
listener_id,
reason,
}));
} }
_ => {} _ => {}
} }
@ -461,9 +473,8 @@ where
"`on_connection_handler_event` is never called for closed connections." "`on_connection_handler_event` is never called for closed connections."
); );
self.on_event.push((p, c, e.clone())); self.on_connection_handler_event.push((p, c, e.clone()));
#[allow(deprecated)] self.inner.on_connection_handler_event(p, c, e);
self.inner.inject_event(p, c, e);
} }
fn poll( fn poll(