Split ConnectionUpgrade. (#642)

Introduce `InboundUpgrade` and `OutboundUpgrade`.
This commit is contained in:
Toralf Wittner
2018-11-15 17:41:11 +01:00
committed by Pierre Krieger
parent 466385a58a
commit 2e549884ef
52 changed files with 2010 additions and 1658 deletions

View File

@ -21,8 +21,11 @@
//! Contains the `IdentifyTransport` type.
use futures::prelude::*;
use libp2p_core::{Endpoint, Multiaddr, PeerId, PublicKey, Transport, muxing, upgrade::apply};
use protocol::{IdentifyOutput, IdentifyProtocolConfig};
use libp2p_core::{
Multiaddr, PeerId, PublicKey, muxing, Transport,
upgrade::{self, OutboundUpgradeApply, UpgradeError}
};
use protocol::{RemoteInfo, IdentifyProtocolConfig};
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::mem;
use std::sync::Arc;
@ -71,26 +74,7 @@ where
#[inline]
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
let (listener, new_addr) = match self.transport.listen_on(addr) {
Ok((l, a)) => (l, a),
Err((inner, addr)) => {
let id = IdentifyTransport {
transport: inner,
};
return Err((id, addr));
}
};
let listener = listener
.map(move |(upgrade, remote_addr)| {
let upgr = upgrade
.and_then(move |muxer| {
IdRetriever::new(muxer, IdentifyProtocolConfig, Endpoint::Listener)
});
(Box::new(upgr) as Box<Future<Item = _, Error = _> + Send>, remote_addr)
});
Ok((Box::new(listener) as Box<_>, new_addr))
Err((self, addr))
}
#[inline]
@ -107,7 +91,7 @@ where
};
let dial = dial.and_then(move |muxer| {
IdRetriever::new(muxer, IdentifyProtocolConfig, Endpoint::Dialer)
IdRetriever::new(muxer, IdentifyProtocolConfig).map_err(|e| e.into_io_error())
});
Ok(Box::new(dial) as Box<_>)
@ -126,9 +110,7 @@ where TMuxer: muxing::StreamMuxer + Send + Sync + 'static,
TMuxer::Substream: Send,
{
/// Internal state.
state: IdRetrieverState<TMuxer>,
/// Whether we're dialing or listening.
endpoint: Endpoint,
state: IdRetrieverState<TMuxer>
}
enum IdRetrieverState<TMuxer>
@ -138,7 +120,7 @@ where TMuxer: muxing::StreamMuxer + Send + Sync + 'static,
/// We are in the process of opening a substream with the remote.
OpeningSubstream(Arc<TMuxer>, muxing::OutboundSubstreamRefWrapFuture<Arc<TMuxer>>, IdentifyProtocolConfig),
/// We opened the substream and are currently negotiating the identify protocol.
NegotiatingIdentify(Arc<TMuxer>, apply::UpgradeApplyFuture<muxing::SubstreamRef<Arc<TMuxer>>, IdentifyProtocolConfig>),
NegotiatingIdentify(Arc<TMuxer>, OutboundUpgradeApply<muxing::SubstreamRef<Arc<TMuxer>>, IdentifyProtocolConfig>),
/// We retreived the remote's public key and are ready to yield it when polled again.
Finishing(Arc<TMuxer>, PublicKey),
/// Something bad happend, or the `Future` is finished, and shouldn't be polled again.
@ -150,13 +132,12 @@ where TMuxer: muxing::StreamMuxer + Send + Sync + 'static,
TMuxer::Substream: Send,
{
/// Creates a new `IdRetriever` ready to be polled.
fn new(muxer: TMuxer, config: IdentifyProtocolConfig, endpoint: Endpoint) -> Self {
fn new(muxer: TMuxer, config: IdentifyProtocolConfig) -> Self {
let muxer = Arc::new(muxer);
let opening = muxing::outbound_from_ref_and_wrap(muxer.clone());
IdRetriever {
state: IdRetrieverState::OpeningSubstream(muxer, opening, config),
endpoint,
state: IdRetrieverState::OpeningSubstream(muxer, opening, config)
}
}
}
@ -166,7 +147,7 @@ where TMuxer: muxing::StreamMuxer + Send + Sync + 'static,
TMuxer::Substream: Send,
{
type Item = (PeerId, TMuxer);
type Error = IoError;
type Error = UpgradeError<IoError>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
// This loop is here so that we can continue polling until we're ready.
@ -177,28 +158,24 @@ where TMuxer: muxing::StreamMuxer + Send + Sync + 'static,
IdRetrieverState::OpeningSubstream(muxer, mut opening, config) => {
match opening.poll() {
Ok(Async::Ready(Some(substream))) => {
let upgrade = apply::apply(substream, config, self.endpoint);
self.state = IdRetrieverState::NegotiatingIdentify(muxer, upgrade);
let upgrade = upgrade::apply_outbound(substream, config);
self.state = IdRetrieverState::NegotiatingIdentify(muxer, upgrade)
},
Ok(Async::Ready(None)) => {
return Err(IoError::new(IoErrorKind::Other, "remote refused our identify attempt"));
},
return Err(UpgradeError::Apply(IoError::new(IoErrorKind::Other, "remote refused our identify attempt")))
}
Ok(Async::NotReady) => {
self.state = IdRetrieverState::OpeningSubstream(muxer, opening, config);
return Ok(Async::NotReady);
},
Err(err) => return Err(err),
Err(err) => return Err(UpgradeError::Apply(err))
}
},
IdRetrieverState::NegotiatingIdentify(muxer, mut nego) => {
match nego.poll() {
Ok(Async::Ready(IdentifyOutput::RemoteInfo { info, .. })) => {
Ok(Async::Ready(RemoteInfo { info, .. })) => {
self.state = IdRetrieverState::Finishing(muxer, info.public_key);
},
Ok(Async::Ready(IdentifyOutput::Sender { .. })) => {
unreachable!("IdentifyOutput::Sender can never be the output from \
the dialing side");
},
Ok(Async::NotReady) => {
self.state = IdRetrieverState::NegotiatingIdentify(muxer, nego);
return Ok(Async::NotReady);

View File

@ -70,7 +70,6 @@ extern crate fnv;
extern crate futures;
extern crate libp2p_peerstore;
extern crate libp2p_core;
#[macro_use]
extern crate log;
extern crate multiaddr;
extern crate parking_lot;
@ -87,7 +86,7 @@ pub use self::listen_handler::IdentifyListenHandler;
pub use self::listen_layer::IdentifyListen;
pub use self::periodic_id_handler::{PeriodicIdentification, PeriodicIdentificationEvent};
pub use self::periodic_id_layer::{PeriodicIdentifyBehaviour, PeriodicIdentifyBehaviourEvent};
pub use self::protocol::{IdentifyInfo, IdentifyOutput};
pub use self::protocol::{IdentifyInfo, RemoteInfo};
pub use self::protocol::{IdentifyProtocolConfig, IdentifySender};
mod id_transport;

View File

@ -18,15 +18,16 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::{IdentifySender, IdentifyProtocolConfig};
use futures::prelude::*;
use libp2p_core::nodes::handled_node::NodeHandlerEndpoint;
use libp2p_core::nodes::protocols_handler::{ProtocolsHandler, ProtocolsHandlerEvent};
use libp2p_core::ConnectionUpgrade;
use libp2p_core::{
nodes::protocols_handler::{ProtocolsHandler, ProtocolsHandlerEvent},
upgrade::{DeniedUpgrade, InboundUpgrade}
};
use smallvec::SmallVec;
use std::io;
use tokio_io::{AsyncRead, AsyncWrite};
use void::Void;
use {IdentifySender, IdentifyOutput, IdentifyProtocolConfig};
use void::{Void, unreachable};
/// Protocol handler that identifies the remote at a regular period.
pub struct IdentifyListenHandler<TSubstream> {
@ -59,28 +60,29 @@ where
type InEvent = Void;
type OutEvent = IdentifySender<TSubstream>;
type Substream = TSubstream;
type Protocol = IdentifyProtocolConfig;
type InboundProtocol = IdentifyProtocolConfig;
type OutboundProtocol = DeniedUpgrade;
type OutboundOpenInfo = ();
#[inline]
fn listen_protocol(&self) -> Self::Protocol {
fn listen_protocol(&self) -> Self::InboundProtocol {
self.config.clone()
}
fn inject_fully_negotiated(
#[inline]
fn dialer_protocol(&self) -> Self::OutboundProtocol {
DeniedUpgrade
}
fn inject_fully_negotiated_inbound(
&mut self,
protocol: <Self::Protocol as ConnectionUpgrade<TSubstream>>::Output,
endpoint: NodeHandlerEndpoint<Self::OutboundOpenInfo>,
protocol: <Self::InboundProtocol as InboundUpgrade<TSubstream>>::Output
) {
match protocol {
IdentifyOutput::Sender { sender } => {
debug_assert!(if let NodeHandlerEndpoint::Listener = endpoint { true } else { false });
self.pending_result.push(sender);
}
IdentifyOutput::RemoteInfo { .. } => unreachable!(
"RemoteInfo can only be produced if we dial the protocol, but we never do that"
),
}
self.pending_result.push(protocol)
}
fn inject_fully_negotiated_outbound(&mut self, protocol: Void, _: Self::OutboundOpenInfo) {
unreachable(protocol)
}
#[inline]
@ -102,7 +104,7 @@ where
) -> Poll<
Option<
ProtocolsHandlerEvent<
Self::Protocol,
Self::OutboundProtocol,
Self::OutboundOpenInfo,
Self::OutEvent,
>,

View File

@ -18,18 +18,16 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use crate::{RemoteInfo, IdentifyProtocolConfig};
use futures::prelude::*;
use libp2p_core::nodes::handled_node::NodeHandlerEndpoint;
use libp2p_core::nodes::protocols_handler::{ProtocolsHandler, ProtocolsHandlerEvent};
use libp2p_core::upgrade::{self, toggleable::Toggleable};
use libp2p_core::{ConnectionUpgrade, Multiaddr};
use std::io;
use std::marker::PhantomData;
use std::time::{Duration, Instant};
use libp2p_core::{
nodes::protocols_handler::{ProtocolsHandler, ProtocolsHandlerEvent},
upgrade::{self, DeniedUpgrade, OutboundUpgrade, Toggleable}
};
use std::{io, marker::PhantomData, time::{Duration, Instant}};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_timer::Delay;
use void::Void;
use {IdentifyInfo, IdentifyOutput, IdentifyProtocolConfig};
use void::{Void, unreachable};
/// Delay between the moment we connect and the first time we identify.
const DELAY_TO_FIRST_ID: Duration = Duration::from_millis(500);
@ -59,13 +57,7 @@ pub struct PeriodicIdentification<TSubstream> {
#[derive(Debug)]
pub enum PeriodicIdentificationEvent {
/// We obtained identification information from the remote
Identified {
/// Information of the remote.
info: IdentifyInfo,
/// Address the remote observes us as.
observed_addr: Multiaddr,
},
Identified(RemoteInfo),
/// Failed to identify the remote.
IdentificationError(io::Error),
}
@ -90,36 +82,30 @@ where
type InEvent = Void;
type OutEvent = PeriodicIdentificationEvent;
type Substream = TSubstream;
type Protocol = Toggleable<IdentifyProtocolConfig>;
type InboundProtocol = DeniedUpgrade;
type OutboundProtocol = Toggleable<IdentifyProtocolConfig>;
type OutboundOpenInfo = ();
#[inline]
fn listen_protocol(&self) -> Self::Protocol {
let mut upgrade = self.config.clone();
upgrade.disable();
upgrade
fn listen_protocol(&self) -> Self::InboundProtocol {
DeniedUpgrade
}
fn inject_fully_negotiated(
#[inline]
fn dialer_protocol(&self) -> Self::OutboundProtocol {
self.config.clone()
}
fn inject_fully_negotiated_inbound(&mut self, protocol: Void) {
unreachable(protocol)
}
fn inject_fully_negotiated_outbound(
&mut self,
protocol: <Self::Protocol as ConnectionUpgrade<TSubstream>>::Output,
_endpoint: NodeHandlerEndpoint<Self::OutboundOpenInfo>,
protocol: <Self::OutboundProtocol as OutboundUpgrade<TSubstream>>::Output,
_info: Self::OutboundOpenInfo,
) {
match protocol {
IdentifyOutput::RemoteInfo {
info,
observed_addr,
} => {
self.pending_result = Some(PeriodicIdentificationEvent::Identified {
info,
observed_addr,
});
}
IdentifyOutput::Sender { .. } => unreachable!(
"Sender can only be produced if we listen for the identify \
protocol ; however we disable it in listen_protocol"
),
}
self.pending_result = Some(PeriodicIdentificationEvent::Identified(protocol))
}
#[inline]
@ -146,7 +132,7 @@ where
) -> Poll<
Option<
ProtocolsHandlerEvent<
Self::Protocol,
Self::OutboundProtocol,
Self::OutboundOpenInfo,
PeriodicIdentificationEvent,
>,

View File

@ -65,15 +65,12 @@ where
event: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent,
) {
match event {
PeriodicIdentificationEvent::Identified {
info,
observed_addr,
} => {
PeriodicIdentificationEvent::Identified(remote) => {
self.events
.push_back(PeriodicIdentifyBehaviourEvent::Identified {
peer_id: peer_id,
info: info,
observed_addr: observed_addr,
info: remote.info,
observed_addr: remote.observed_addr,
});
}
_ => (), // TODO: exhaustive pattern

View File

@ -19,8 +19,12 @@
// DEALINGS IN THE SOFTWARE.
use bytes::{Bytes, BytesMut};
use futures::{future, Future, Sink, Stream};
use libp2p_core::{ConnectionUpgrade, Endpoint, Multiaddr, PublicKey};
use futures::{future::{self, FutureResult}, Future, Sink, Stream};
use libp2p_core::{
Multiaddr, PublicKey,
upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}
};
use log::{debug, trace};
use protobuf::Message as ProtobufMessage;
use protobuf::parse_from_bytes as protobuf_parse_from_bytes;
use protobuf::RepeatedField;
@ -35,22 +39,14 @@ use unsigned_varint::codec;
#[derive(Debug, Clone)]
pub struct IdentifyProtocolConfig;
/// Output of the connection upgrade.
pub enum IdentifyOutput<T> {
/// We obtained information from the remote. Happens when we are the dialer.
RemoteInfo {
/// Information about the remote.
info: IdentifyInfo,
/// Address the remote sees for us.
observed_addr: Multiaddr,
},
#[derive(Debug, Clone)]
pub struct RemoteInfo {
/// Information about the remote.
pub info: IdentifyInfo,
/// Address the remote sees for us.
pub observed_addr: Multiaddr,
/// We opened a connection to the remote and need to send it information. Happens when we are
/// the listener.
Sender {
/// Object used to send identify info to the client.
sender: IdentifySender<T>,
},
_priv: ()
}
/// Object used to send back information to the client.
@ -64,11 +60,9 @@ where
{
/// Sends back information to the client. Returns a future that is signalled whenever the
/// info have been sent.
pub fn send(
self,
info: IdentifyInfo,
observed_addr: &Multiaddr,
) -> Box<Future<Item = (), Error = IoError> + Send + 'a> {
pub fn send(self, info: IdentifyInfo, observed_addr: &Multiaddr)
-> Box<Future<Item = (), Error = IoError> + Send + 'a>
{
debug!("Sending identify info to client");
trace!("Sending: {:?}", info);
@ -108,67 +102,73 @@ pub struct IdentifyInfo {
pub listen_addrs: Vec<Multiaddr>,
/// Protocols supported by the node, e.g. `/ipfs/ping/1.0.0`.
pub protocols: Vec<String>,
_priv: ()
}
impl<C> ConnectionUpgrade<C> for IdentifyProtocolConfig
where
C: AsyncRead + AsyncWrite + Send + 'static,
{
type NamesIter = iter::Once<(Bytes, Self::UpgradeIdentifier)>;
type UpgradeIdentifier = ();
type Output = IdentifyOutput<C>;
type Future = Box<Future<Item = Self::Output, Error = IoError> + Send>;
impl UpgradeInfo for IdentifyProtocolConfig {
type UpgradeId = ();
type NamesIter = iter::Once<(Bytes, Self::UpgradeId)>;
#[inline]
fn protocol_names(&self) -> Self::NamesIter {
iter::once((Bytes::from("/ipfs/id/1.0.0"), ()))
}
}
fn upgrade(self, socket: C, _: (), ty: Endpoint) -> Self::Future {
trace!("Upgrading connection as {:?}", ty);
impl<C> InboundUpgrade<C> for IdentifyProtocolConfig
where
C: AsyncRead + AsyncWrite + Send + 'static,
{
type Output = IdentifySender<C>;
type Error = IoError;
type Future = FutureResult<Self::Output, IoError>;
fn upgrade_inbound(self, socket: C, _: ()) -> Self::Future {
trace!("Upgrading inbound connection");
let socket = Framed::new(socket, codec::UviBytes::default());
let sender = IdentifySender { inner: socket };
future::ok(sender)
}
}
match ty {
Endpoint::Dialer => {
let future = socket
.into_future()
.map(|(msg, _)| msg)
.map_err(|(err, _)| err)
.and_then(|msg| {
debug!("Received identify message");
impl<C> OutboundUpgrade<C> for IdentifyProtocolConfig
where
C: AsyncRead + AsyncWrite + Send + 'static,
{
type Output = RemoteInfo;
type Error = IoError;
type Future = Box<dyn Future<Item=Self::Output, Error=IoError> + Send>;
if let Some(msg) = msg {
let (info, observed_addr) = match parse_proto_msg(msg) {
Ok(v) => v,
Err(err) => {
debug!("Failed to parse protobuf message; error = {:?}", err);
return Err(err.into());
}
};
trace!("Remote observes us as {:?}", observed_addr);
trace!("Information received: {:?}", info);
Ok(IdentifyOutput::RemoteInfo {
info,
observed_addr: observed_addr.clone(),
})
} else {
debug!("Identify protocol stream closed before receiving info");
Err(IoErrorKind::InvalidData.into())
fn upgrade_outbound(self, socket: C, _: ()) -> Self::Future {
let socket = Framed::new(socket, codec::UviBytes::<BytesMut>::default());
let future = socket
.into_future()
.map(|(msg, _)| msg)
.map_err(|(err, _)| err)
.and_then(|msg| {
debug!("Received identify message");
if let Some(msg) = msg {
let (info, observed_addr) = match parse_proto_msg(msg) {
Ok(v) => v,
Err(err) => {
debug!("Failed to parse protobuf message; error = {:?}", err);
return Err(err.into());
}
});
Box::new(future) as Box<_>
}
Endpoint::Listener => {
let sender = IdentifySender { inner: socket };
let future = future::ok(IdentifyOutput::Sender { sender });
Box::new(future) as Box<_>
}
}
};
trace!("Remote observes us as {:?}", observed_addr);
trace!("Information received: {:?}", info);
Ok(RemoteInfo {
info,
observed_addr: observed_addr.clone(),
_priv: ()
})
} else {
debug!("Identify protocol stream closed before receiving info");
Err(IoErrorKind::InvalidData.into())
}
});
Box::new(future) as Box<_>
}
}
@ -199,6 +199,7 @@ fn parse_proto_msg(msg: BytesMut) -> Result<(IdentifyInfo, Multiaddr), IoError>
agent_version: msg.take_agentVersion(),
listen_addrs: listen_addrs,
protocols: msg.take_protocols().into_vec(),
_priv: ()
};
Ok((info, observed_addr))
@ -216,10 +217,10 @@ mod tests {
use self::tokio::runtime::current_thread::Runtime;
use self::libp2p_tcp_transport::TcpConfig;
use futures::{Future, Stream};
use libp2p_core::{PublicKey, Transport};
use libp2p_core::{PublicKey, Transport, upgrade::{apply_outbound, apply_inbound}};
use std::sync::mpsc;
use std::thread;
use {IdentifyInfo, IdentifyOutput, IdentifyProtocolConfig};
use {IdentifyInfo, RemoteInfo, IdentifyProtocolConfig};
#[test]
fn correct_transfer() {
@ -229,19 +230,23 @@ mod tests {
let (tx, rx) = mpsc::channel();
let bg_thread = thread::spawn(move || {
let transport = TcpConfig::new().with_upgrade(IdentifyProtocolConfig);
let transport = TcpConfig::new();
let (listener, addr) = transport
.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap())
.unwrap();
tx.send(addr).unwrap();
let future = listener
.into_future()
.map_err(|(err, _)| err)
.and_then(|(client, _)| client.unwrap().0)
.and_then(|identify| match identify {
IdentifyOutput::Sender { sender, .. } => sender.send(
.and_then(|socket| {
apply_inbound(socket, IdentifyProtocolConfig).map_err(|e| e.into_io_error())
})
.and_then(|sender| {
sender.send(
IdentifyInfo {
public_key: PublicKey::Ed25519(vec![1, 2, 3, 4, 5, 7]),
protocol_version: "proto_version".to_owned(),
@ -251,47 +256,34 @@ mod tests {
"/ip6/::1/udp/1000".parse().unwrap(),
],
protocols: vec!["proto1".to_string(), "proto2".to_string()],
_priv: ()
},
&"/ip4/100.101.102.103/tcp/5000".parse().unwrap(),
),
_ => panic!(),
)
});
let mut rt = Runtime::new().unwrap();
let _ = rt.block_on(future).unwrap();
});
let transport = TcpConfig::new().with_upgrade(IdentifyProtocolConfig);
let transport = TcpConfig::new();
let future = transport
.dial(rx.recv().unwrap())
let future = transport.dial(rx.recv().unwrap())
.unwrap_or_else(|_| panic!())
.and_then(|identify| match identify {
IdentifyOutput::RemoteInfo {
info,
observed_addr,
} => {
assert_eq!(
observed_addr,
"/ip4/100.101.102.103/tcp/5000".parse().unwrap()
);
assert_eq!(info.public_key, PublicKey::Ed25519(vec![1, 2, 3, 4, 5, 7]));
assert_eq!(info.protocol_version, "proto_version");
assert_eq!(info.agent_version, "agent_version");
assert_eq!(
info.listen_addrs,
&[
"/ip4/80.81.82.83/tcp/500".parse().unwrap(),
"/ip6/::1/udp/1000".parse().unwrap()
]
);
assert_eq!(
info.protocols,
&["proto1".to_string(), "proto2".to_string()]
);
Ok(())
}
_ => panic!(),
.and_then(|socket| {
apply_outbound(socket, IdentifyProtocolConfig).map_err(|e| e.into_io_error())
})
.and_then(|RemoteInfo { info, observed_addr, .. }| {
assert_eq!(observed_addr, "/ip4/100.101.102.103/tcp/5000".parse().unwrap());
assert_eq!(info.public_key, PublicKey::Ed25519(vec![1, 2, 3, 4, 5, 7]));
assert_eq!(info.protocol_version, "proto_version");
assert_eq!(info.agent_version, "agent_version");
assert_eq!(info.listen_addrs,
&["/ip4/80.81.82.83/tcp/500".parse().unwrap(),
"/ip6/::1/udp/1000".parse().unwrap()]);
assert_eq!(info.protocols, &["proto1".to_string(), "proto2".to_string()]);
Ok(())
});
let mut rt = Runtime::new().unwrap();
let _ = rt.block_on(future).unwrap();
bg_thread.join().unwrap();