diff --git a/Cargo.toml b/Cargo.toml
index 516ab03c..2fc3facc 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -58,21 +58,22 @@ members = [
"misc/multistream-select",
"misc/peer-id-generator",
"misc/rw-stream-sink",
- "transports/dns",
+ "muxers/mplex",
+ "muxers/yamux",
"protocols/floodsub",
"protocols/identify",
"protocols/kad",
- "protocols/ping",
"protocols/observed",
- "transports/relay",
+ "protocols/ping",
+ "protocols/plaintext",
"protocols/secio",
- "muxers/mplex",
- "muxers/yamux",
- "stores/peerstore",
"stores/datastore",
- "transports/tcp",
- "transports/uds",
- "transports/websocket",
- "transports/timeout",
+ "stores/peerstore",
+ "transports/dns",
"transports/ratelimit",
+ "transports/relay",
+ "transports/tcp",
+ "transports/timeout",
+ "transports/uds",
+ "transports/websocket"
]
diff --git a/core/src/either.rs b/core/src/either.rs
index 0566dbd3..17827062 100644
--- a/core/src/either.rs
+++ b/core/src/either.rs
@@ -18,11 +18,42 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
+use crate::{muxing::{Shutdown, StreamMuxer}, Multiaddr};
use futures::prelude::*;
-use muxing::{Shutdown, StreamMuxer};
-use std::io::{Error as IoError, Read, Write};
+use std::{fmt, io::{Error as IoError, Read, Write}};
use tokio_io::{AsyncRead, AsyncWrite};
-use Multiaddr;
+
+#[derive(Debug, Copy, Clone)]
+pub enum EitherError {
+ A(A),
+ B(B)
+}
+
+impl fmt::Display for EitherError
+where
+ A: fmt::Display,
+ B: fmt::Display
+{
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ match self {
+ EitherError::A(a) => a.fmt(f),
+ EitherError::B(b) => b.fmt(f)
+ }
+ }
+}
+
+impl std::error::Error for EitherError
+where
+ A: fmt::Debug + std::error::Error,
+ B: fmt::Debug + std::error::Error
+{
+ fn cause(&self) -> Option<&dyn std::error::Error> {
+ match self {
+ EitherError::A(a) => a.cause(),
+ EitherError::B(b) => b.cause()
+ }
+ }
+}
/// Implements `AsyncRead` and `AsyncWrite` and dispatches all method calls to
/// either `First` or `Second`.
diff --git a/core/src/lib.rs b/core/src/lib.rs
index 11d279b4..dff458ab 100644
--- a/core/src/lib.rs
+++ b/core/src/lib.rs
@@ -89,7 +89,7 @@
//!
//! # fn main() {
//! let tcp_transport = libp2p_tcp_transport::TcpConfig::new();
-//! let upgraded = tcp_transport.with_upgrade(libp2p_core::upgrade::PlainTextConfig);
+//! let upgraded = tcp_transport.with_upgrade(libp2p_core::upgrade::DeniedUpgrade);
//!
//! // upgraded.dial(...) // automatically applies the plain text protocol on the socket
//! # }
@@ -132,31 +132,28 @@
//! extern crate tokio;
//!
//! use futures::{Future, Stream};
-//! use libp2p_ping::protocol::{Ping, PingOutput};
-//! use libp2p_core::Transport;
+//! use libp2p_ping::protocol::Ping;
+//! use libp2p_core::{Transport, upgrade::apply_outbound};
//! use tokio::runtime::current_thread::Runtime;
//!
//! # fn main() {
-//! let ping_finished_future = libp2p_tcp_transport::TcpConfig::new()
-//! // We have a `TcpConfig` struct that implements `Transport`, and apply a `Ping` upgrade on it.
-//! .with_upgrade(Ping::default())
+//! let ping_dialer = libp2p_tcp_transport::TcpConfig::new()
+//! // We have a `TcpConfig` struct that implements `Dialer`, and apply a `Ping` upgrade on it.
+//! .and_then(|socket, _| {
+//! apply_outbound(socket, Ping::default()).map_err(|e| e.into_io_error())
+//! })
//! // TODO: right now the only available protocol is ping, but we want to replace it with
//! // something that is more simple to use
-//! .dial("127.0.0.1:12345".parse::().unwrap()).unwrap_or_else(|_| panic!())
-//! .and_then(|out| {
-//! match out {
-//! PingOutput::Ponger(processing) => Box::new(processing) as Box>,
-//! PingOutput::Pinger(mut pinger) => {
-//! pinger.ping(());
-//! let f = pinger.into_future().map(|_| ()).map_err(|(err, _)| err);
-//! Box::new(f) as Box>
-//! },
-//! }
+//! .dial("/ip4/127.0.0.1/tcp/12345".parse::().unwrap()).unwrap_or_else(|_| panic!())
+//! .and_then(|mut pinger| {
+//! pinger.ping(());
+//! let f = pinger.into_future().map(|_| ()).map_err(|(e, _)| e);
+//! Box::new(f) as Box>
//! });
//!
//! // Runs until the ping arrives.
//! let mut rt = Runtime::new().unwrap();
-//! let _ = rt.block_on(ping_finished_future).unwrap();
+//! let _ = rt.block_on(ping_dialer).unwrap();
//! # }
//! ```
//!
@@ -221,4 +218,24 @@ pub use self::muxing::StreamMuxer;
pub use self::peer_id::PeerId;
pub use self::public_key::PublicKey;
pub use self::transport::Transport;
-pub use self::upgrade::{ConnectionUpgrade, Endpoint};
+pub use self::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, UpgradeError};
+
+#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
+pub enum Endpoint {
+ /// The socket comes from a dialer.
+ Dialer,
+ /// The socket comes from a listener.
+ Listener,
+}
+
+impl std::ops::Not for Endpoint {
+ type Output = Endpoint;
+
+ fn not(self) -> Self::Output {
+ match self {
+ Endpoint::Dialer => Endpoint::Listener,
+ Endpoint::Listener => Endpoint::Dialer
+ }
+ }
+}
+
diff --git a/core/src/nodes/collection.rs b/core/src/nodes/collection.rs
index aadd25a4..6ba78435 100644
--- a/core/src/nodes/collection.rs
+++ b/core/src/nodes/collection.rs
@@ -18,15 +18,19 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
+use crate::{
+ PeerId,
+ muxing::StreamMuxer,
+ nodes::{
+ node::Substream,
+ handled_node_tasks::{HandledNodesEvent, HandledNodesTasks},
+ handled_node_tasks::{Task as HandledNodesTask, TaskId},
+ handled_node::NodeHandler
+ }
+};
use fnv::FnvHashMap;
use futures::prelude::*;
-use muxing::StreamMuxer;
-use nodes::node::Substream;
-use nodes::handled_node_tasks::{HandledNodesEvent, HandledNodesTasks};
-use nodes::handled_node_tasks::{Task as HandledNodesTask, TaskId};
-use nodes::handled_node::NodeHandler;
use std::{collections::hash_map::Entry, fmt, io, mem};
-use PeerId;
// TODO: make generic over PeerId
@@ -276,7 +280,8 @@ impl CollectionStream(&mut self, future: TFut, handler: THandler)
-> ReachAttemptId
where
- TFut: Future- + Send + 'static,
+ TFut: Future
- + Send + 'static,
+ TFut::Error: std::error::Error + Send + Sync + 'static,
THandler: NodeHandler, InEvent = TInEvent, OutEvent = TOutEvent> + Send + 'static,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
diff --git a/core/src/nodes/handled_node_tasks.rs b/core/src/nodes/handled_node_tasks.rs
index a33ce3fd..44dc8adf 100644
--- a/core/src/nodes/handled_node_tasks.rs
+++ b/core/src/nodes/handled_node_tasks.rs
@@ -18,18 +18,25 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
+use crate::{
+ PeerId,
+ muxing::StreamMuxer,
+ nodes::{
+ handled_node::{HandledNode, NodeHandler},
+ node::Substream
+ }
+};
use fnv::FnvHashMap;
use futures::{prelude::*, stream, sync::mpsc};
-use muxing::StreamMuxer;
-use nodes::node::Substream;
-use nodes::handled_node::{HandledNode, NodeHandler};
use smallvec::SmallVec;
-use std::collections::hash_map::{Entry, OccupiedEntry};
-use std::io::Error as IoError;
-use std::{fmt, mem};
+use std::{
+ collections::hash_map::{Entry, OccupiedEntry},
+ fmt,
+ io::{self, Error as IoError},
+ mem
+};
use tokio_executor;
use void::Void;
-use PeerId;
// TODO: make generic over PeerId
@@ -135,10 +142,10 @@ impl HandledNodesTasks(&mut self, future: TFut, handler: THandler)
- -> TaskId
+ pub fn add_reach_attempt(&mut self, future: TFut, handler: THandler) -> TaskId
where
- TFut: Future
- + Send + 'static,
+ TFut: Future
- + Send + 'static,
+ TFut::Error: std::error::Error + Send + Sync + 'static,
THandler: NodeHandler, InEvent = TInEvent, OutEvent = TOutEvent> + Send + 'static,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
@@ -349,7 +356,8 @@ impl Future for
NodeTask
where
TMuxer: StreamMuxer,
- TFut: Future
- ,
+ TFut: Future
- ,
+ TFut::Error: std::error::Error + Send + Sync + 'static,
THandler: NodeHandler, InEvent = TInEvent, OutEvent = TOutEvent>,
{
type Item = ();
@@ -388,7 +396,8 @@ where
},
Err(err) => {
// End the task
- let event = InToExtMessage::TaskClosed(Err(err), Some(handler));
+ let ioerr = IoError::new(io::ErrorKind::Other, err);
+ let event = InToExtMessage::TaskClosed(Err(ioerr), Some(handler));
let _ = self.events_tx.unbounded_send((event, self.id));
return Ok(Async::Ready(()));
}
@@ -464,6 +473,7 @@ mod tests {
use tests::dummy_muxer::{DummyMuxer, DummyConnectionState};
use tokio::runtime::Builder;
use tokio::runtime::current_thread::Runtime;
+ use void::Void;
use {PeerId, PublicKey};
type TestNodeTask = NodeTask<
@@ -570,7 +580,7 @@ mod tests {
let peer_id = PublicKey::Rsa((0 .. 2048).map(|_| -> u8 { random() }).collect()).into_peer_id();
let mut task_ids = Vec::new();
for _i in 0..self.task_count {
- let fut = future::ok((peer_id.clone(), self.muxer.clone()));
+ let fut = future::ok::<_, Void>((peer_id.clone(), self.muxer.clone()));
task_ids.push(
handled_nodes.add_reach_attempt(fut, self.handler.clone())
);
@@ -719,7 +729,7 @@ mod tests {
assert_eq!(handled_nodes.tasks().count(), 0);
assert_eq!(handled_nodes.to_spawn.len(), 0);
- handled_nodes.add_reach_attempt( future::empty(), Handler::default() );
+ handled_nodes.add_reach_attempt( future::empty::<_, Void>(), Handler::default() );
assert_eq!(handled_nodes.tasks().count(), 1);
assert_eq!(handled_nodes.to_spawn.len(), 1);
diff --git a/core/src/nodes/protocols_handler.rs b/core/src/nodes/protocols_handler.rs
index 33581245..c5c6e0d7 100644
--- a/core/src/nodes/protocols_handler.rs
+++ b/core/src/nodes/protocols_handler.rs
@@ -18,15 +18,26 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
-use either::EitherOutput;
+use crate::{
+ either::{EitherError, EitherOutput},
+ nodes::handled_node::{NodeHandler, NodeHandlerEndpoint, NodeHandlerEvent},
+ upgrade::{
+ self,
+ InboundUpgrade,
+ InboundUpgradeExt,
+ OutboundUpgrade,
+ OutboundUpgradeExt,
+ UpgradeInfo,
+ InboundUpgradeApply,
+ OutboundUpgradeApply,
+ DeniedUpgrade
+ }
+};
use futures::prelude::*;
-use nodes::handled_node::{NodeHandler, NodeHandlerEndpoint, NodeHandlerEvent};
use std::{io, marker::PhantomData, time::Duration};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_timer::Timeout;
-use upgrade::{self, apply::UpgradeApplyFuture, DeniedConnectionUpgrade};
use void::Void;
-use {ConnectionUpgrade, Endpoint};
/// Handler for a set of protocols for a specific connection with a remote.
///
@@ -81,7 +92,9 @@ pub trait ProtocolsHandler {
/// The type of the substream that contains the raw data.
type Substream: AsyncRead + AsyncWrite;
/// The upgrade for the protocol or protocols handled by this handler.
- type Protocol: ConnectionUpgrade;
+ type InboundProtocol: InboundUpgrade;
+ /// The upgrade for the protocol or protocols handled by this handler.
+ type OutboundProtocol: OutboundUpgrade;
/// Information about a substream. Can be sent to the handler through a `NodeHandlerEndpoint`,
/// and will be passed back in `inject_substream` or `inject_outbound_closed`.
type OutboundOpenInfo;
@@ -92,15 +105,22 @@ pub trait ProtocolsHandler {
/// > context you wouldn't accept one in particular (eg. only allow one substream at
/// > a time for a given protocol). The reason is that remotes are allowed to put the
/// > list of supported protocols in a cache in order to avoid spurious queries.
- fn listen_protocol(&self) -> Self::Protocol;
+ fn listen_protocol(&self) -> Self::InboundProtocol;
+
+ fn dialer_protocol(&self) -> Self::OutboundProtocol;
/// Injects a fully-negotiated substream in the handler.
///
/// This method is called when a substream has been successfully opened and negotiated.
- fn inject_fully_negotiated(
+ fn inject_fully_negotiated_inbound(
&mut self,
- protocol: >::Output,
- endpoint: NodeHandlerEndpoint,
+ protocol: >::Output
+ );
+
+ fn inject_fully_negotiated_outbound(
+ &mut self,
+ protocol: >::Output,
+ info: Self::OutboundOpenInfo
);
/// Injects an event coming from the outside in the handler.
@@ -126,12 +146,7 @@ pub trait ProtocolsHandler {
/// > **Note**: If this handler is combined with other handlers, as soon as `poll()` returns
/// > `Ok(Async::Ready(None))`, all the other handlers will receive a call to
/// > `shutdown()` and will eventually be closed and destroyed.
- fn poll(
- &mut self,
- ) -> Poll<
- Option>,
- io::Error,
- >;
+ fn poll(&mut self) -> Poll