feat: move Identify I/O from NetworkBehaviour to ConnectionHandler (#3208)

Addresses #2885
This commit is contained in:
João Oliveira
2022-12-13 20:24:31 +00:00
committed by GitHub
parent c39d25ea08
commit f80c7141ab
4 changed files with 292 additions and 226 deletions

View File

@ -1,7 +1,12 @@
# 0.42.0 [unreleased] # 0.42.0 [unreleased]
- Move I/O from `Behaviour` to `Handler`. Handle `Behaviour`'s Identify and Push requests independently by incoming order,
previously Push requests were prioritized. see [PR 3208].
- Update to `libp2p-swarm` `v0.42.0`. - Update to `libp2p-swarm` `v0.42.0`.
[PR 3208]: https://github.com/libp2p/rust-libp2p/pull/3208
# 0.41.0 # 0.41.0
- Change default `cache_size` of `Config` to 100. See [PR 2995]. - Change default `cache_size` of `Config` to 100. See [PR 2995].

View File

@ -18,24 +18,21 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use crate::handler::{self, Proto, Push}; use crate::handler::{self, InEvent, Proto};
use crate::protocol::{Info, ReplySubstream, UpgradeError}; use crate::protocol::{Info, Protocol, UpgradeError};
use futures::prelude::*;
use libp2p_core::{ use libp2p_core::{
connection::ConnectionId, multiaddr::Protocol, ConnectedPoint, Multiaddr, PeerId, PublicKey, connection::ConnectionId, multiaddr, ConnectedPoint, Multiaddr, PeerId, PublicKey,
}; };
use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm}; use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm};
use libp2p_swarm::{ use libp2p_swarm::{
dial_opts::DialOpts, AddressScore, ConnectionHandler, ConnectionHandlerUpgrErr, DialError, dial_opts::DialOpts, AddressScore, ConnectionHandler, ConnectionHandlerUpgrErr, DialError,
IntoConnectionHandler, NegotiatedSubstream, NetworkBehaviour, NetworkBehaviourAction, IntoConnectionHandler, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters,
NotifyHandler, PollParameters,
}; };
use lru::LruCache; use lru::LruCache;
use std::num::NonZeroUsize; use std::num::NonZeroUsize;
use std::{ use std::{
collections::{HashMap, HashSet, VecDeque}, collections::{HashMap, HashSet, VecDeque},
iter::FromIterator, iter::FromIterator,
pin::Pin,
task::Context, task::Context,
task::Poll, task::Poll,
time::Duration, time::Duration,
@ -51,30 +48,23 @@ pub struct Behaviour {
config: Config, config: Config,
/// For each peer we're connected to, the observed address to send back to it. /// For each peer we're connected to, the observed address to send back to it.
connected: HashMap<PeerId, HashMap<ConnectionId, Multiaddr>>, connected: HashMap<PeerId, HashMap<ConnectionId, Multiaddr>>,
/// Pending replies to send. /// Pending requests to be fulfilled, either `Handler` requests for `Behaviour` info
pending_replies: VecDeque<Reply>, /// to address identification requests, or push requests to peers
/// with current information about the local peer.
requests: Vec<Request>,
/// Pending events to be emitted when polled. /// Pending events to be emitted when polled.
events: VecDeque<NetworkBehaviourAction<Event, Proto>>, events: VecDeque<NetworkBehaviourAction<Event, Proto>>,
/// Peers to which an active push with current information about
/// the local peer should be sent.
pending_push: HashSet<PeerId>,
/// The addresses of all peers that we have discovered. /// The addresses of all peers that we have discovered.
discovered_peers: PeerCache, discovered_peers: PeerCache,
} }
/// A pending reply to an inbound identification request. /// A `Behaviour` request to be fulfilled, either `Handler` requests for `Behaviour` info
enum Reply { /// to address identification requests, or push requests to peers
/// The reply is queued for sending. /// with current information about the local peer.
Queued { #[derive(Debug, PartialEq, Eq)]
peer: PeerId, struct Request {
io: ReplySubstream<NegotiatedSubstream>, peer_id: PeerId,
observed: Multiaddr, protocol: Protocol,
},
/// The reply is being sent.
Sending {
peer: PeerId,
io: Pin<Box<dyn Future<Output = Result<(), UpgradeError>> + Send>>,
},
} }
/// Configuration for the [`identify::Behaviour`](Behaviour). /// Configuration for the [`identify::Behaviour`](Behaviour).
@ -184,9 +174,8 @@ impl Behaviour {
Self { Self {
config, config,
connected: HashMap::new(), connected: HashMap::new(),
pending_replies: VecDeque::new(), requests: Vec::new(),
events: VecDeque::new(), events: VecDeque::new(),
pending_push: HashSet::new(),
discovered_peers, discovered_peers,
} }
} }
@ -197,7 +186,13 @@ impl Behaviour {
I: IntoIterator<Item = PeerId>, I: IntoIterator<Item = PeerId>,
{ {
for p in peers { for p in peers {
if self.pending_push.insert(p) && !self.connected.contains_key(&p) { let request = Request {
peer_id: p,
protocol: Protocol::Push,
};
if !self.requests.contains(&request) {
self.requests.push(request);
let handler = self.new_handler(); let handler = self.new_handler();
self.events.push_back(NetworkBehaviourAction::Dial { self.events.push_back(NetworkBehaviourAction::Dial {
opts: DialOpts::peer_id(p).build(), opts: DialOpts::peer_id(p).build(),
@ -240,13 +235,19 @@ impl NetworkBehaviour for Behaviour {
type OutEvent = Event; type OutEvent = Event;
fn new_handler(&mut self) -> Self::ConnectionHandler { fn new_handler(&mut self) -> Self::ConnectionHandler {
Proto::new(self.config.initial_delay, self.config.interval) Proto::new(
self.config.initial_delay,
self.config.interval,
self.config.local_public_key.clone(),
self.config.protocol_version.clone(),
self.config.agent_version.clone(),
)
} }
fn on_connection_handler_event( fn on_connection_handler_event(
&mut self, &mut self,
peer_id: PeerId, peer_id: PeerId,
connection: ConnectionId, connection_id: ConnectionId,
event: <<Self::ConnectionHandler as IntoConnectionHandler>::Handler as ConnectionHandler>::OutEvent, event: <<Self::ConnectionHandler as IntoConnectionHandler>::Handler as ConnectionHandler>::OutEvent,
) { ) {
match event { match event {
@ -271,26 +272,22 @@ impl NetworkBehaviour for Behaviour {
score: AddressScore::Finite(1), score: AddressScore::Finite(1),
}); });
} }
handler::Event::Identification(peer) => {
self.events
.push_back(NetworkBehaviourAction::GenerateEvent(Event::Sent {
peer_id: peer,
}));
}
handler::Event::IdentificationPushed => { handler::Event::IdentificationPushed => {
self.events self.events
.push_back(NetworkBehaviourAction::GenerateEvent(Event::Pushed { .push_back(NetworkBehaviourAction::GenerateEvent(Event::Pushed {
peer_id, peer_id,
})); }));
} }
handler::Event::Identify(sender) => { handler::Event::Identify => {
let observed = self self.requests.push(Request {
.connected peer_id,
.get(&peer_id) protocol: Protocol::Identify(connection_id),
.and_then(|addrs| addrs.get(&connection))
.expect(
"`on_connection_handler_event` is only called \
with an established connection and calling `NetworkBehaviour::on_event` \
with `FromSwarm::ConnectionEstablished ensures there is an entry; qed",
);
self.pending_replies.push_back(Reply::Queued {
peer: peer_id,
io: sender,
observed: observed.clone(),
}); });
} }
handler::Event::IdentificationError(error) => { handler::Event::IdentificationError(error) => {
@ -305,99 +302,41 @@ impl NetworkBehaviour for Behaviour {
fn poll( fn poll(
&mut self, &mut self,
cx: &mut Context<'_>, _cx: &mut Context<'_>,
params: &mut impl PollParameters, params: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> { ) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
if let Some(event) = self.events.pop_front() { if let Some(event) = self.events.pop_front() {
return Poll::Ready(event); return Poll::Ready(event);
} }
// Check for a pending active push to perform. // Check for pending requests.
let peer_push = self.pending_push.iter().find_map(|peer| { match self.requests.pop() {
self.connected.get(peer).map(|conns| { Some(Request {
let observed_addr = conns peer_id,
.values() protocol: Protocol::Push,
.next() }) => Poll::Ready(NetworkBehaviourAction::NotifyHandler {
.expect("connected peer has a connection")
.clone();
let listen_addrs = listen_addrs(params);
let protocols = supported_protocols(params);
let info = Info {
public_key: self.config.local_public_key.clone(),
protocol_version: self.config.protocol_version.clone(),
agent_version: self.config.agent_version.clone(),
listen_addrs,
protocols,
observed_addr,
};
(*peer, Push(info))
})
});
if let Some((peer_id, push)) = peer_push {
self.pending_push.remove(&peer_id);
return Poll::Ready(NetworkBehaviourAction::NotifyHandler {
peer_id, peer_id,
event: push,
handler: NotifyHandler::Any, handler: NotifyHandler::Any,
}); event: InEvent {
listen_addrs: listen_addrs(params),
supported_protocols: supported_protocols(params),
protocol: Protocol::Push,
},
}),
Some(Request {
peer_id,
protocol: Protocol::Identify(connection_id),
}) => Poll::Ready(NetworkBehaviourAction::NotifyHandler {
peer_id,
handler: NotifyHandler::One(connection_id),
event: InEvent {
listen_addrs: listen_addrs(params),
supported_protocols: supported_protocols(params),
protocol: Protocol::Identify(connection_id),
},
}),
None => Poll::Pending,
} }
// Check for pending replies to send.
if let Some(r) = self.pending_replies.pop_front() {
let mut sending = 0;
let to_send = self.pending_replies.len() + 1;
let mut reply = Some(r);
loop {
match reply {
Some(Reply::Queued { peer, io, observed }) => {
let info = Info {
listen_addrs: listen_addrs(params),
protocols: supported_protocols(params),
public_key: self.config.local_public_key.clone(),
protocol_version: self.config.protocol_version.clone(),
agent_version: self.config.agent_version.clone(),
observed_addr: observed,
};
let io = Box::pin(io.send(info));
reply = Some(Reply::Sending { peer, io });
}
Some(Reply::Sending { peer, mut io }) => {
sending += 1;
match Future::poll(Pin::new(&mut io), cx) {
Poll::Ready(Ok(())) => {
let event = Event::Sent { peer_id: peer };
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
}
Poll::Pending => {
self.pending_replies.push_back(Reply::Sending { peer, io });
if sending == to_send {
// All remaining futures are NotReady
break;
} else {
reply = self.pending_replies.pop_front();
}
}
Poll::Ready(Err(err)) => {
let event = Event::Error {
peer_id: peer,
error: ConnectionHandlerUpgrErr::Upgrade(
libp2p_core::upgrade::UpgradeError::Apply(err),
),
};
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event));
}
}
}
None => unreachable!(),
}
}
}
Poll::Pending
} }
fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec<Multiaddr> { fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec<Multiaddr> {
@ -417,7 +356,13 @@ impl NetworkBehaviour for Behaviour {
}) => { }) => {
if remaining_established == 0 { if remaining_established == 0 {
self.connected.remove(&peer_id); self.connected.remove(&peer_id);
self.pending_push.remove(&peer_id); self.requests.retain(|request| {
request
!= &Request {
peer_id,
protocol: Protocol::Push,
}
});
} else if let Some(addrs) = self.connected.get_mut(&peer_id) { } else if let Some(addrs) = self.connected.get_mut(&peer_id) {
addrs.remove(&connection_id); addrs.remove(&connection_id);
} }
@ -425,7 +370,13 @@ impl NetworkBehaviour for Behaviour {
FromSwarm::DialFailure(DialFailure { peer_id, error, .. }) => { FromSwarm::DialFailure(DialFailure { peer_id, error, .. }) => {
if let Some(peer_id) = peer_id { if let Some(peer_id) = peer_id {
if !self.connected.contains_key(&peer_id) { if !self.connected.contains_key(&peer_id) {
self.pending_push.remove(&peer_id); self.requests.retain(|request| {
request
!= &Request {
peer_id,
protocol: Protocol::Push,
}
});
} }
} }
@ -437,14 +388,17 @@ impl NetworkBehaviour for Behaviour {
} }
} }
} }
FromSwarm::NewListenAddr(_) => { FromSwarm::NewListenAddr(_) | FromSwarm::ExpiredListenAddr(_) => {
if self.config.push_listen_addr_updates { if self.config.push_listen_addr_updates {
self.pending_push.extend(self.connected.keys()); for p in self.connected.keys() {
} let request = Request {
} peer_id: *p,
FromSwarm::ExpiredListenAddr(_) => { protocol: Protocol::Push,
if self.config.push_listen_addr_updates { };
self.pending_push.extend(self.connected.keys()); if !self.requests.contains(&request) {
self.requests.push(request);
}
}
} }
} }
FromSwarm::AddressChange(_) FromSwarm::AddressChange(_)
@ -509,7 +463,7 @@ fn listen_addrs(params: &impl PollParameters) -> Vec<Multiaddr> {
/// the given peer_id. If there is no peer_id for the peer in the mutiaddr, this returns true. /// the given peer_id. If there is no peer_id for the peer in the mutiaddr, this returns true.
fn multiaddr_matches_peer_id(addr: &Multiaddr, peer_id: &PeerId) -> bool { fn multiaddr_matches_peer_id(addr: &Multiaddr, peer_id: &PeerId) -> bool {
let last_component = addr.iter().last(); let last_component = addr.iter().last();
if let Some(Protocol::P2p(multi_addr_peer_id)) = last_component { if let Some(multiaddr::Protocol::P2p(multi_addr_peer_id)) = last_component {
return multi_addr_peer_id == *peer_id.as_ref(); return multi_addr_peer_id == *peer_id.as_ref();
} }
true true
@ -557,6 +511,7 @@ impl PeerCache {
mod tests { mod tests {
use super::*; use super::*;
use futures::pin_mut; use futures::pin_mut;
use futures::prelude::*;
use libp2p_core::{identity, muxing::StreamMuxerBox, transport, upgrade, PeerId, Transport}; use libp2p_core::{identity, muxing::StreamMuxerBox, transport, upgrade, PeerId, Transport};
use libp2p_mplex::MplexConfig; use libp2p_mplex::MplexConfig;
use libp2p_noise as noise; use libp2p_noise as noise;
@ -618,7 +573,7 @@ mod tests {
// nb. Either swarm may receive the `Identified` event first, upon which // nb. Either swarm may receive the `Identified` event first, upon which
// it will permit the connection to be closed, as defined by // it will permit the connection to be closed, as defined by
// `IdentifyHandler::connection_keep_alive`. Hence the test succeeds if // `Handler::connection_keep_alive`. Hence the test succeeds if
// either `Identified` event arrives correctly. // either `Identified` event arrives correctly.
async_std::task::block_on(async move { async_std::task::block_on(async move {
loop { loop {
@ -835,8 +790,8 @@ mod tests {
let addr_without_peer_id: Multiaddr = addr.clone(); let addr_without_peer_id: Multiaddr = addr.clone();
let mut addr_with_other_peer_id = addr.clone(); let mut addr_with_other_peer_id = addr.clone();
addr.push(Protocol::P2p(peer_id.into())); addr.push(multiaddr::Protocol::P2p(peer_id.into()));
addr_with_other_peer_id.push(Protocol::P2p(other_peer_id.into())); addr_with_other_peer_id.push(multiaddr::Protocol::P2p(other_peer_id.into()));
assert!(multiaddr_matches_peer_id(&addr, &peer_id)); assert!(multiaddr_matches_peer_id(&addr, &peer_id));
assert!(!multiaddr_matches_peer_id( assert!(!multiaddr_matches_peer_id(

View File

@ -19,14 +19,15 @@
// DEALINGS IN THE SOFTWARE. // DEALINGS IN THE SOFTWARE.
use crate::protocol::{ use crate::protocol::{
InboundPush, Info, OutboundPush, Protocol, PushProtocol, ReplySubstream, UpgradeError, self, Identify, InboundPush, Info, OutboundPush, Protocol, Push, UpgradeError,
}; };
use futures::future::BoxFuture; use futures::future::BoxFuture;
use futures::prelude::*; use futures::prelude::*;
use futures::stream::FuturesUnordered;
use futures_timer::Delay; use futures_timer::Delay;
use libp2p_core::either::{EitherError, EitherOutput}; use libp2p_core::either::{EitherError, EitherOutput};
use libp2p_core::upgrade::{EitherUpgrade, SelectUpgrade}; use libp2p_core::upgrade::{EitherUpgrade, SelectUpgrade};
use libp2p_core::{ConnectedPoint, PeerId}; use libp2p_core::{ConnectedPoint, Multiaddr, PeerId, PublicKey};
use libp2p_swarm::handler::{ use libp2p_swarm::handler::{
ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound,
}; };
@ -36,18 +37,31 @@ use libp2p_swarm::{
}; };
use log::warn; use log::warn;
use smallvec::SmallVec; use smallvec::SmallVec;
use std::collections::VecDeque;
use std::{io, pin::Pin, task::Context, task::Poll, time::Duration}; use std::{io, pin::Pin, task::Context, task::Poll, time::Duration};
pub struct Proto { pub struct Proto {
initial_delay: Duration, initial_delay: Duration,
interval: Duration, interval: Duration,
public_key: PublicKey,
protocol_version: String,
agent_version: String,
} }
impl Proto { impl Proto {
pub fn new(initial_delay: Duration, interval: Duration) -> Self { pub fn new(
initial_delay: Duration,
interval: Duration,
public_key: PublicKey,
protocol_version: String,
agent_version: String,
) -> Self {
Proto { Proto {
initial_delay, initial_delay,
interval, interval,
public_key,
protocol_version,
agent_version,
} }
} }
} }
@ -55,12 +69,25 @@ impl Proto {
impl IntoConnectionHandler for Proto { impl IntoConnectionHandler for Proto {
type Handler = Handler; type Handler = Handler;
fn into_handler(self, remote_peer_id: &PeerId, _endpoint: &ConnectedPoint) -> Self::Handler { fn into_handler(self, remote_peer_id: &PeerId, endpoint: &ConnectedPoint) -> Self::Handler {
Handler::new(self.initial_delay, self.interval, *remote_peer_id) let observed_addr = match endpoint {
ConnectedPoint::Dialer { address, .. } => address,
ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr,
};
Handler::new(
self.initial_delay,
self.interval,
*remote_peer_id,
self.public_key,
self.protocol_version,
self.agent_version,
observed_addr.clone(),
)
} }
fn inbound_protocol(&self) -> <Self::Handler as ConnectionHandler>::InboundProtocol { fn inbound_protocol(&self) -> <Self::Handler as ConnectionHandler>::InboundProtocol {
SelectUpgrade::new(Protocol, PushProtocol::inbound()) SelectUpgrade::new(Identify, Push::inbound())
} }
} }
@ -74,14 +101,16 @@ pub struct Handler {
inbound_identify_push: Option<BoxFuture<'static, Result<Info, UpgradeError>>>, inbound_identify_push: Option<BoxFuture<'static, Result<Info, UpgradeError>>>,
/// Pending events to yield. /// Pending events to yield.
events: SmallVec< events: SmallVec<
[ConnectionHandlerEvent< [ConnectionHandlerEvent<EitherUpgrade<Identify, Push<OutboundPush>>, (), Event, io::Error>;
EitherUpgrade<Protocol, PushProtocol<OutboundPush>>, 4],
(),
Event,
io::Error,
>; 4],
>, >,
/// Streams awaiting `BehaviourInfo` to then send identify requests.
reply_streams: VecDeque<NegotiatedSubstream>,
/// Pending identification replies, awaiting being sent.
pending_replies: FuturesUnordered<BoxFuture<'static, Result<PeerId, UpgradeError>>>,
/// Future that fires when we need to identify the node again. /// Future that fires when we need to identify the node again.
trigger_next_identify: Delay, trigger_next_identify: Delay,
@ -90,36 +119,75 @@ pub struct Handler {
/// The interval of `trigger_next_identify`, i.e. the recurrent delay. /// The interval of `trigger_next_identify`, i.e. the recurrent delay.
interval: Duration, interval: Duration,
/// The public key of the local peer.
public_key: PublicKey,
/// Application-specific version of the protocol family used by the peer,
/// e.g. `ipfs/1.0.0` or `polkadot/1.0.0`.
protocol_version: String,
/// Name and version of the peer, similar to the `User-Agent` header in
/// the HTTP protocol.
agent_version: String,
/// Address observed by or for the remote.
observed_addr: Multiaddr,
} }
/// Event produced by the `IdentifyHandler`. /// An event from `Behaviour` with the information requested by the `Handler`.
#[derive(Debug)]
pub struct InEvent {
/// The addresses that the peer is listening on.
pub listen_addrs: Vec<Multiaddr>,
/// The list of protocols supported by the peer, e.g. `/ipfs/ping/1.0.0`.
pub supported_protocols: Vec<String>,
/// The protocol w.r.t. the information requested.
pub protocol: Protocol,
}
/// Event produced by the `Handler`.
#[derive(Debug)] #[derive(Debug)]
#[allow(clippy::large_enum_variant)] #[allow(clippy::large_enum_variant)]
pub enum Event { pub enum Event {
/// We obtained identification information from the remote. /// We obtained identification information from the remote.
Identified(Info), Identified(Info),
/// We replied to an identification request from the remote.
Identification(PeerId),
/// We actively pushed our identification information to the remote. /// We actively pushed our identification information to the remote.
IdentificationPushed, IdentificationPushed,
/// We received a request for identification. /// We received a request for identification.
Identify(ReplySubstream<NegotiatedSubstream>), Identify,
/// Failed to identify the remote. /// Failed to identify the remote, or to reply to an identification request.
IdentificationError(ConnectionHandlerUpgrErr<UpgradeError>), IdentificationError(ConnectionHandlerUpgrErr<UpgradeError>),
} }
/// Identifying information of the local node that is pushed to a remote.
#[derive(Debug)]
pub struct Push(pub Info);
impl Handler { impl Handler {
/// Creates a new `IdentifyHandler`. /// Creates a new `Handler`.
pub fn new(initial_delay: Duration, interval: Duration, remote_peer_id: PeerId) -> Self { pub fn new(
initial_delay: Duration,
interval: Duration,
remote_peer_id: PeerId,
public_key: PublicKey,
protocol_version: String,
agent_version: String,
observed_addr: Multiaddr,
) -> Self {
Self { Self {
remote_peer_id, remote_peer_id,
inbound_identify_push: Default::default(), inbound_identify_push: Default::default(),
events: SmallVec::new(), events: SmallVec::new(),
reply_streams: VecDeque::new(),
pending_replies: FuturesUnordered::new(),
trigger_next_identify: Delay::new(initial_delay), trigger_next_identify: Delay::new(initial_delay),
keep_alive: KeepAlive::Yes, keep_alive: KeepAlive::Yes,
interval, interval,
public_key,
protocol_version,
agent_version,
observed_addr,
} }
} }
@ -133,9 +201,18 @@ impl Handler {
>, >,
) { ) {
match output { match output {
EitherOutput::First(substream) => self EitherOutput::First(substream) => {
.events self.events
.push(ConnectionHandlerEvent::Custom(Event::Identify(substream))), .push(ConnectionHandlerEvent::Custom(Event::Identify));
if !self.reply_streams.is_empty() {
warn!(
"New inbound identify request from {} while a previous one \
is still pending. Queueing the new one.",
self.remote_peer_id,
);
}
self.reply_streams.push_back(substream);
}
EitherOutput::Second(fut) => { EitherOutput::Second(fut) => {
if self.inbound_identify_push.replace(fut).is_some() { if self.inbound_identify_push.replace(fut).is_some() {
warn!( warn!(
@ -195,26 +272,58 @@ impl Handler {
} }
impl ConnectionHandler for Handler { impl ConnectionHandler for Handler {
type InEvent = Push; type InEvent = InEvent;
type OutEvent = Event; type OutEvent = Event;
type Error = io::Error; type Error = io::Error;
type InboundProtocol = SelectUpgrade<Protocol, PushProtocol<InboundPush>>; type InboundProtocol = SelectUpgrade<Identify, Push<InboundPush>>;
type OutboundProtocol = EitherUpgrade<Protocol, PushProtocol<OutboundPush>>; type OutboundProtocol = EitherUpgrade<Identify, Push<OutboundPush>>;
type OutboundOpenInfo = (); type OutboundOpenInfo = ();
type InboundOpenInfo = (); type InboundOpenInfo = ();
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> { fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
SubstreamProtocol::new(SelectUpgrade::new(Protocol, PushProtocol::inbound()), ()) SubstreamProtocol::new(SelectUpgrade::new(Identify, Push::inbound()), ())
} }
fn on_behaviour_event(&mut self, Push(push): Self::InEvent) { fn on_behaviour_event(
self.events &mut self,
.push(ConnectionHandlerEvent::OutboundSubstreamRequest { InEvent {
protocol: SubstreamProtocol::new( listen_addrs,
EitherUpgrade::B(PushProtocol::outbound(push)), supported_protocols,
(), protocol,
), }: Self::InEvent,
}); ) {
let info = Info {
public_key: self.public_key.clone(),
protocol_version: self.protocol_version.clone(),
agent_version: self.agent_version.clone(),
listen_addrs,
protocols: supported_protocols,
observed_addr: self.observed_addr.clone(),
};
match protocol {
Protocol::Push => {
self.events
.push(ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(
EitherUpgrade::B(Push::outbound(info)),
(),
),
});
}
Protocol::Identify(_) => {
let substream = self
.reply_streams
.pop_front()
.expect("A BehaviourInfo reply should have a matching substream.");
let peer = self.remote_peer_id;
let fut = Box::pin(async move {
protocol::send(substream, info).await?;
Ok(peer)
});
self.pending_replies.push(fut);
}
}
} }
fn connection_keep_alive(&self) -> KeepAlive { fn connection_keep_alive(&self) -> KeepAlive {
@ -237,7 +346,7 @@ impl ConnectionHandler for Handler {
Poll::Ready(()) => { Poll::Ready(()) => {
self.trigger_next_identify.reset(self.interval); self.trigger_next_identify.reset(self.interval);
let ev = ConnectionHandlerEvent::OutboundSubstreamRequest { let ev = ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(EitherUpgrade::A(Protocol), ()), protocol: SubstreamProtocol::new(EitherUpgrade::A(Identify), ()),
}; };
return Poll::Ready(ev); return Poll::Ready(ev);
} }
@ -255,7 +364,18 @@ impl ConnectionHandler for Handler {
} }
} }
Poll::Pending // Check for pending replies to send.
match self.pending_replies.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(peer_id))) => Poll::Ready(ConnectionHandlerEvent::Custom(
Event::Identification(peer_id),
)),
Poll::Ready(Some(Err(err))) => Poll::Ready(ConnectionHandlerEvent::Custom(
Event::IdentificationError(ConnectionHandlerUpgrErr::Upgrade(
libp2p_core::upgrade::UpgradeError::Apply(err),
)),
)),
Poll::Ready(None) | Poll::Pending => Poll::Pending,
}
} }
fn on_connection_event( fn on_connection_event(

View File

@ -22,13 +22,14 @@ use crate::structs_proto;
use asynchronous_codec::{FramedRead, FramedWrite}; use asynchronous_codec::{FramedRead, FramedWrite};
use futures::{future::BoxFuture, prelude::*}; use futures::{future::BoxFuture, prelude::*};
use libp2p_core::{ use libp2p_core::{
connection::ConnectionId,
identity, multiaddr, identity, multiaddr,
upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}, upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo},
Multiaddr, PublicKey, Multiaddr, PublicKey,
}; };
use log::trace; use log::trace;
use std::convert::TryFrom; use std::convert::TryFrom;
use std::{fmt, io, iter, pin::Pin}; use std::{io, iter, pin::Pin};
use thiserror::Error; use thiserror::Error;
use void::Void; use void::Void;
@ -38,25 +39,32 @@ pub const PROTOCOL_NAME: &[u8; 14] = b"/ipfs/id/1.0.0";
pub const PUSH_PROTOCOL_NAME: &[u8; 19] = b"/ipfs/id/push/1.0.0"; pub const PUSH_PROTOCOL_NAME: &[u8; 19] = b"/ipfs/id/push/1.0.0";
/// The type of the Substream protocol.
#[derive(Debug, PartialEq, Eq)]
pub enum Protocol {
Identify(ConnectionId),
Push,
}
/// Substream upgrade protocol for `/ipfs/id/1.0.0`. /// Substream upgrade protocol for `/ipfs/id/1.0.0`.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct Protocol; pub struct Identify;
/// Substream upgrade protocol for `/ipfs/id/push/1.0.0`. /// Substream upgrade protocol for `/ipfs/id/push/1.0.0`.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct PushProtocol<T>(T); pub struct Push<T>(T);
pub struct InboundPush(); pub struct InboundPush();
pub struct OutboundPush(Info); pub struct OutboundPush(Info);
impl PushProtocol<InboundPush> { impl Push<InboundPush> {
pub fn inbound() -> Self { pub fn inbound() -> Self {
PushProtocol(InboundPush()) Push(InboundPush())
} }
} }
impl PushProtocol<OutboundPush> { impl Push<OutboundPush> {
pub fn outbound(info: Info) -> Self { pub fn outbound(info: Info) -> Self {
PushProtocol(OutboundPush(info)) Push(OutboundPush(info))
} }
} }
@ -79,31 +87,7 @@ pub struct Info {
pub observed_addr: Multiaddr, pub observed_addr: Multiaddr,
} }
/// The substream on which a reply is expected to be sent. impl UpgradeInfo for Identify {
pub struct ReplySubstream<T> {
inner: T,
}
impl<T> fmt::Debug for ReplySubstream<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_tuple("ReplySubstream").finish()
}
}
impl<T> ReplySubstream<T>
where
T: AsyncWrite + Unpin,
{
/// Sends back the requested information on the substream.
///
/// Consumes the substream, returning a future that resolves
/// when the reply has been sent on the underlying connection.
pub async fn send(self, info: Info) -> Result<(), UpgradeError> {
send(self.inner, info).await.map_err(Into::into)
}
}
impl UpgradeInfo for Protocol {
type Info = &'static [u8]; type Info = &'static [u8];
type InfoIter = iter::Once<Self::Info>; type InfoIter = iter::Once<Self::Info>;
@ -112,17 +96,17 @@ impl UpgradeInfo for Protocol {
} }
} }
impl<C> InboundUpgrade<C> for Protocol { impl<C> InboundUpgrade<C> for Identify {
type Output = ReplySubstream<C>; type Output = C;
type Error = UpgradeError; type Error = UpgradeError;
type Future = future::Ready<Result<Self::Output, UpgradeError>>; type Future = future::Ready<Result<Self::Output, UpgradeError>>;
fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future { fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future {
future::ok(ReplySubstream { inner: socket }) future::ok(socket)
} }
} }
impl<C> OutboundUpgrade<C> for Protocol impl<C> OutboundUpgrade<C> for Identify
where where
C: AsyncRead + AsyncWrite + Unpin + Send + 'static, C: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{ {
@ -135,7 +119,7 @@ where
} }
} }
impl<T> UpgradeInfo for PushProtocol<T> { impl<T> UpgradeInfo for Push<T> {
type Info = &'static [u8]; type Info = &'static [u8];
type InfoIter = iter::Once<Self::Info>; type InfoIter = iter::Once<Self::Info>;
@ -144,7 +128,7 @@ impl<T> UpgradeInfo for PushProtocol<T> {
} }
} }
impl<C> InboundUpgrade<C> for PushProtocol<InboundPush> impl<C> InboundUpgrade<C> for Push<InboundPush>
where where
C: AsyncRead + AsyncWrite + Unpin + Send + 'static, C: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{ {
@ -158,7 +142,7 @@ where
} }
} }
impl<C> OutboundUpgrade<C> for PushProtocol<OutboundPush> impl<C> OutboundUpgrade<C> for Push<OutboundPush>
where where
C: AsyncWrite + Unpin + Send + 'static, C: AsyncWrite + Unpin + Send + 'static,
{ {
@ -171,7 +155,7 @@ where
} }
} }
async fn send<T>(io: T, info: Info) -> Result<(), UpgradeError> pub(crate) async fn send<T>(io: T, info: Info) -> Result<(), UpgradeError>
where where
T: AsyncWrite + Unpin, T: AsyncWrite + Unpin,
{ {
@ -316,10 +300,11 @@ mod tests {
.await .await
.unwrap(); .unwrap();
let sender = apply_inbound(socket, Protocol).await.unwrap(); let sender = apply_inbound(socket, Identify).await.unwrap();
sender send(
.send(Info { sender,
Info {
public_key: send_pubkey, public_key: send_pubkey,
protocol_version: "proto_version".to_owned(), protocol_version: "proto_version".to_owned(),
agent_version: "agent_version".to_owned(), agent_version: "agent_version".to_owned(),
@ -329,16 +314,17 @@ mod tests {
], ],
protocols: vec!["proto1".to_string(), "proto2".to_string()], protocols: vec!["proto1".to_string(), "proto2".to_string()],
observed_addr: "/ip4/100.101.102.103/tcp/5000".parse().unwrap(), observed_addr: "/ip4/100.101.102.103/tcp/5000".parse().unwrap(),
}) },
.await )
.unwrap(); .await
.unwrap();
}); });
async_std::task::block_on(async move { async_std::task::block_on(async move {
let mut transport = tcp::async_io::Transport::default(); let mut transport = tcp::async_io::Transport::default();
let socket = transport.dial(rx.await.unwrap()).unwrap().await.unwrap(); let socket = transport.dial(rx.await.unwrap()).unwrap().await.unwrap();
let info = apply_outbound(socket, Protocol, upgrade::Version::V1) let info = apply_outbound(socket, Identify, upgrade::Version::V1)
.await .await
.unwrap(); .unwrap();
assert_eq!( assert_eq!(