// Copyright 2021 COMIT Network.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

//! A generic [`ConnectionHandler`] that delegates the handling of substreams to [`SubstreamHandler`]s.
//!
//! This module is an attempt to simplify the implementation of protocols by freeing implementations from dealing with aspects such as concurrent substreams.
//! Particularly for outbound substreams, it greatly simplifies the definition of protocols through the [`FutureSubstream`] helper.
//!
//! At the moment, this module is an implementation detail of the rendezvous protocol but the intent is for it to be provided as a generic module that is accessible to other protocols as well.
use futures::future::{self, BoxFuture, Fuse, FusedFuture}; use futures::FutureExt; use instant::Instant; use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use libp2p_swarm::handler::{ConnectionEvent, FullyNegotiatedInbound, FullyNegotiatedOutbound}; use libp2p_swarm::{ ConnectionHandler, ConnectionHandlerEvent, KeepAlive, NegotiatedSubstream, SubstreamProtocol, }; use std::collections::{HashMap, VecDeque}; use std::fmt; use std::future::Future; use std::hash::Hash; use std::task::{Context, Poll}; use std::time::Duration; use void::Void; /// Handles a substream throughout its lifetime. pub trait SubstreamHandler: Sized { type InEvent; type OutEvent; type Error; type OpenInfo; fn upgrade(open_info: Self::OpenInfo) -> SubstreamProtocol; fn new(substream: NegotiatedSubstream, info: Self::OpenInfo) -> Self; fn on_event(self, event: Self::InEvent) -> Self; fn advance(self, cx: &mut Context<'_>) -> Result, Self::Error>; } /// The result of advancing a [`SubstreamHandler`]. pub enum Next { /// Return the given event and set the handler into `next_state`. EmitEvent { event: TEvent, next_state: TState }, /// The handler currently cannot do any more work, set its state back into `next_state`. Pending { next_state: TState }, /// The handler performed some work and wants to continue in the given state. /// /// This variant is useful because it frees the handler from implementing a loop internally. Continue { next_state: TState }, /// The handler finished. 
Done, } impl Next { pub fn map_state( self, map: impl FnOnce(TState) -> TNextState, ) -> Next { match self { Next::EmitEvent { event, next_state } => Next::EmitEvent { event, next_state: map(next_state), }, Next::Pending { next_state } => Next::Pending { next_state: map(next_state), }, Next::Continue { next_state } => Next::Pending { next_state: map(next_state), }, Next::Done => Next::Done, } } } #[derive(Debug, Hash, Eq, PartialEq, Clone, Copy)] pub struct InboundSubstreamId(u64); impl InboundSubstreamId { fn fetch_and_increment(&mut self) -> Self { let next_id = *self; self.0 += 1; next_id } } impl fmt::Display for InboundSubstreamId { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{}", self.0) } } #[derive(Debug, Hash, Eq, PartialEq, Clone, Copy)] pub struct OutboundSubstreamId(u64); impl OutboundSubstreamId { fn fetch_and_increment(&mut self) -> Self { let next_id = *self; self.0 += 1; next_id } } impl fmt::Display for OutboundSubstreamId { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{}", self.0) } } pub struct PassthroughProtocol { ident: Option<&'static [u8]>, } impl PassthroughProtocol { pub fn new(ident: &'static [u8]) -> Self { Self { ident: Some(ident) } } } impl UpgradeInfo for PassthroughProtocol { type Info = &'static [u8]; type InfoIter = std::option::IntoIter; fn protocol_info(&self) -> Self::InfoIter { self.ident.into_iter() } } impl InboundUpgrade for PassthroughProtocol { type Output = C; type Error = Void; type Future = BoxFuture<'static, Result>; fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future { match self.ident { Some(_) => future::ready(Ok(socket)).boxed(), None => future::pending().boxed(), } } } impl OutboundUpgrade for PassthroughProtocol { type Output = C; type Error = Void; type Future = BoxFuture<'static, Result>; fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future { match self.ident { Some(_) => future::ready(Ok(socket)).boxed(), None => 
future::pending().boxed(), } } } /// An implementation of [`ConnectionHandler`] that delegates to individual [`SubstreamHandler`]s. pub struct SubstreamConnectionHandler { inbound_substreams: HashMap, outbound_substreams: HashMap, next_inbound_substream_id: InboundSubstreamId, next_outbound_substream_id: OutboundSubstreamId, new_substreams: VecDeque, initial_keep_alive_deadline: Instant, } impl SubstreamConnectionHandler { pub fn new(initial_keep_alive: Duration) -> Self { Self { inbound_substreams: Default::default(), outbound_substreams: Default::default(), next_inbound_substream_id: InboundSubstreamId(0), next_outbound_substream_id: OutboundSubstreamId(0), new_substreams: Default::default(), initial_keep_alive_deadline: Instant::now() + initial_keep_alive, } } } impl SubstreamConnectionHandler { pub fn new_outbound_only(initial_keep_alive: Duration) -> Self { Self { inbound_substreams: Default::default(), outbound_substreams: Default::default(), next_inbound_substream_id: InboundSubstreamId(0), next_outbound_substream_id: OutboundSubstreamId(0), new_substreams: Default::default(), initial_keep_alive_deadline: Instant::now() + initial_keep_alive, } } } impl SubstreamConnectionHandler { pub fn new_inbound_only(initial_keep_alive: Duration) -> Self { Self { inbound_substreams: Default::default(), outbound_substreams: Default::default(), next_inbound_substream_id: InboundSubstreamId(0), next_outbound_substream_id: OutboundSubstreamId(0), new_substreams: Default::default(), initial_keep_alive_deadline: Instant::now() + initial_keep_alive, } } } /// Poll all substreams within the given HashMap. /// /// This is defined as a separate function because we call it with two different fields stored within [`SubstreamConnectionHandler`]. 
fn poll_substreams( substreams: &mut HashMap, cx: &mut Context<'_>, ) -> Poll> where TSubstream: SubstreamHandler, TId: Copy + Eq + Hash + fmt::Display, { let substream_ids = substreams.keys().copied().collect::>(); 'loop_substreams: for id in substream_ids { let mut handler = substreams .remove(&id) .expect("we just got the key out of the map"); let (next_state, poll) = 'loop_handler: loop { match handler.advance(cx) { Ok(Next::EmitEvent { next_state, event }) => { break (next_state, Poll::Ready(Ok((id, event)))) } Ok(Next::Pending { next_state }) => break (next_state, Poll::Pending), Ok(Next::Continue { next_state }) => { handler = next_state; continue 'loop_handler; } Ok(Next::Done) => { log::debug!("Substream handler {} finished", id); continue 'loop_substreams; } Err(e) => return Poll::Ready(Err((id, e))), } }; substreams.insert(id, next_state); return poll; } Poll::Pending } /// Event sent from the [`libp2p_swarm::NetworkBehaviour`] to the [`SubstreamConnectionHandler`]. #[allow(clippy::enum_variant_names)] #[derive(Debug)] pub enum InEvent { /// Open a new substream using the provided `open_info`. /// /// For "client-server" protocols, this is typically the initial message to be sent to the other party. NewSubstream { open_info: I }, NotifyInboundSubstream { id: InboundSubstreamId, message: TInboundEvent, }, NotifyOutboundSubstream { id: OutboundSubstreamId, message: TOutboundEvent, }, } /// Event produced by the [`SubstreamConnectionHandler`] for the corresponding [`libp2p_swarm::NetworkBehaviour`]. #[derive(Debug)] pub enum OutEvent { /// An inbound substream produced an event. InboundEvent { id: InboundSubstreamId, message: TInbound, }, /// An outbound substream produced an event. OutboundEvent { id: OutboundSubstreamId, message: TOutbound, }, /// An inbound substream errored irrecoverably. InboundError { id: InboundSubstreamId, error: TInboundError, }, /// An outbound substream errored irrecoverably. 
OutboundError { id: OutboundSubstreamId, error: TOutboundError, }, } impl< TInboundInEvent, TInboundOutEvent, TOutboundInEvent, TOutboundOutEvent, TOutboundOpenInfo, TInboundError, TOutboundError, TInboundSubstreamHandler, TOutboundSubstreamHandler, > ConnectionHandler for SubstreamConnectionHandler< TInboundSubstreamHandler, TOutboundSubstreamHandler, TOutboundOpenInfo, > where TInboundSubstreamHandler: SubstreamHandler< InEvent = TInboundInEvent, OutEvent = TInboundOutEvent, Error = TInboundError, OpenInfo = (), >, TOutboundSubstreamHandler: SubstreamHandler< InEvent = TOutboundInEvent, OutEvent = TOutboundOutEvent, Error = TOutboundError, OpenInfo = TOutboundOpenInfo, >, TInboundInEvent: fmt::Debug + Send + 'static, TInboundOutEvent: fmt::Debug + Send + 'static, TOutboundInEvent: fmt::Debug + Send + 'static, TOutboundOutEvent: fmt::Debug + Send + 'static, TOutboundOpenInfo: fmt::Debug + Send + 'static, TInboundError: fmt::Debug + Send + 'static, TOutboundError: fmt::Debug + Send + 'static, TInboundSubstreamHandler: Send + 'static, TOutboundSubstreamHandler: Send + 'static, { type InEvent = InEvent; type OutEvent = OutEvent; type Error = Void; type InboundProtocol = PassthroughProtocol; type OutboundProtocol = PassthroughProtocol; type InboundOpenInfo = (); type OutboundOpenInfo = TOutboundOpenInfo; fn listen_protocol(&self) -> SubstreamProtocol { TInboundSubstreamHandler::upgrade(()) } fn on_connection_event( &mut self, event: ConnectionEvent< Self::InboundProtocol, Self::OutboundProtocol, Self::InboundOpenInfo, Self::OutboundOpenInfo, >, ) { match event { ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound { protocol, .. 
}) => { self.inbound_substreams.insert( self.next_inbound_substream_id.fetch_and_increment(), TInboundSubstreamHandler::new(protocol, ()), ); } ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { protocol, info, }) => { self.outbound_substreams.insert( self.next_outbound_substream_id.fetch_and_increment(), TOutboundSubstreamHandler::new(protocol, info), ); } // TODO: Handle upgrade errors properly ConnectionEvent::AddressChange(_) | ConnectionEvent::ListenUpgradeError(_) | ConnectionEvent::DialUpgradeError(_) => {} } } fn on_behaviour_event(&mut self, event: Self::InEvent) { match event { InEvent::NewSubstream { open_info } => self.new_substreams.push_back(open_info), InEvent::NotifyInboundSubstream { id, message } => { match self.inbound_substreams.remove(&id) { Some(handler) => { let new_handler = handler.on_event(message); self.inbound_substreams.insert(id, new_handler); } None => { log::debug!("Substream with ID {} not found", id); } } } InEvent::NotifyOutboundSubstream { id, message } => { match self.outbound_substreams.remove(&id) { Some(handler) => { let new_handler = handler.on_event(message); self.outbound_substreams.insert(id, new_handler); } None => { log::debug!("Substream with ID {} not found", id); } } } } } fn connection_keep_alive(&self) -> KeepAlive { // Rudimentary keep-alive handling, to be extended as needed as this abstraction is used more by other protocols. 
if Instant::now() < self.initial_keep_alive_deadline { return KeepAlive::Yes; } if self.inbound_substreams.is_empty() && self.outbound_substreams.is_empty() && self.new_substreams.is_empty() { return KeepAlive::No; } KeepAlive::Yes } fn poll( &mut self, cx: &mut Context<'_>, ) -> Poll< ConnectionHandlerEvent< Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent, Self::Error, >, > { if let Some(open_info) = self.new_substreams.pop_front() { return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol: TOutboundSubstreamHandler::upgrade(open_info), }); } match poll_substreams(&mut self.inbound_substreams, cx) { Poll::Ready(Ok((id, message))) => { return Poll::Ready(ConnectionHandlerEvent::Custom(OutEvent::InboundEvent { id, message, })) } Poll::Ready(Err((id, error))) => { return Poll::Ready(ConnectionHandlerEvent::Custom(OutEvent::InboundError { id, error, })) } Poll::Pending => {} } match poll_substreams(&mut self.outbound_substreams, cx) { Poll::Ready(Ok((id, message))) => { return Poll::Ready(ConnectionHandlerEvent::Custom(OutEvent::OutboundEvent { id, message, })) } Poll::Ready(Err((id, error))) => { return Poll::Ready(ConnectionHandlerEvent::Custom(OutEvent::OutboundError { id, error, })) } Poll::Pending => {} } Poll::Pending } } /// A helper struct for substream handlers that can be implemented as async functions. /// /// This only works for substreams without an `InEvent` because - once constructed - the state of an inner future is opaque. 
pub struct FutureSubstream { future: Fuse>>, } impl FutureSubstream { pub fn new(future: impl Future> + Send + 'static) -> Self { Self { future: future.boxed().fuse(), } } pub fn advance(mut self, cx: &mut Context<'_>) -> Result, TError> { if self.future.is_terminated() { return Ok(Next::Done); } match self.future.poll_unpin(cx) { Poll::Ready(Ok(event)) => Ok(Next::EmitEvent { event, next_state: self, }), Poll::Ready(Err(error)) => Err(error), Poll::Pending => Ok(Next::Pending { next_state: self }), } } } impl SubstreamHandler for void::Void { type InEvent = void::Void; type OutEvent = void::Void; type Error = void::Void; type OpenInfo = (); fn new(_: NegotiatedSubstream, _: Self::OpenInfo) -> Self { unreachable!("we should never yield a substream") } fn on_event(self, event: Self::InEvent) -> Self { void::unreachable(event) } fn advance(self, _: &mut Context<'_>) -> Result, Self::Error> { void::unreachable(self) } fn upgrade( open_info: Self::OpenInfo, ) -> SubstreamProtocol { SubstreamProtocol::new(PassthroughProtocol { ident: None }, open_info) } }