mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-22 14:21:33 +00:00
Implement swarm
This commit is contained in:
@ -3,10 +3,10 @@ members = [
|
||||
"multihash",
|
||||
"multistream-select",
|
||||
"datastore",
|
||||
"libp2p-host",
|
||||
"example",
|
||||
"libp2p-peerstore",
|
||||
"libp2p-secio",
|
||||
"libp2p-transport",
|
||||
"libp2p-swarm",
|
||||
"libp2p-tcp-transport",
|
||||
"rw-stream-sink",
|
||||
]
|
||||
|
10
README.md
10
README.md
@ -13,12 +13,14 @@ Architecture of the crates of this repository:
|
||||
|
||||
- `datastore`: Utility library whose API provides a key-value storage with multiple possible
|
||||
backends. Used by `peerstore`.
|
||||
- `libp2p-host`: Stub. Will probably get reworked or removed.
|
||||
- `example`: Example usages of this library.
|
||||
- `libp2p-peerstore`: Generic storage for information about remote peers (their multiaddresses and
|
||||
their public key), with multiple possible backends. Each multiaddress also has a time-to-live.
|
||||
- `libp2p-secio`: Implementation of the `secio` protocol. Encrypts communications.
|
||||
- `libp2p-tcp-transport`: Implementation of the `Transport` trait for TCP/IP.
|
||||
- `libp2p-transport`: Contains the `Transport` trait. Will probably get reworked or removed.
|
||||
Used by `libp2p-swarm`.
|
||||
- `libp2p-secio`: Implementation of the `secio` protocol. Encrypts communications. Implements the
|
||||
`ConnectionUpgrade` trait of `libp2p-swarm`.
|
||||
- `libp2p-swarm`: Core library that contains all the traits of *libp2p* and plugs things together.
|
||||
- `libp2p-tcp-transport`: Implementation of the `Transport` trait of `libp2p-swarm` for TCP/IP.
|
||||
- `multihash`: Utility library that allows one to represent and manipulate
|
||||
[*multihashes*](https://github.com/multiformats/multihash). A *multihash* is a combination of a
|
||||
hash and its hashing algorithm.
|
||||
|
15
example/Cargo.toml
Normal file
15
example/Cargo.toml
Normal file
@ -0,0 +1,15 @@
|
||||
[package]
|
||||
name = "example"
|
||||
version = "0.1.0"
|
||||
authors = ["pierre <pierre.krieger1708@gmail.com>"]
|
||||
|
||||
[dependencies]
|
||||
bytes = "0.4"
|
||||
futures = "0.1"
|
||||
libp2p-secio = { path = "../libp2p-secio" }
|
||||
libp2p-swarm = { path = "../libp2p-swarm" }
|
||||
libp2p-tcp-transport = { path = "../libp2p-tcp-transport" }
|
||||
ring = { version = "0.12.1", features = ["rsa_signing"] }
|
||||
tokio-core = "0.1"
|
||||
tokio-io = "0.1"
|
||||
untrusted = "0.6"
|
105
example/examples/echo-dialer.rs
Normal file
105
example/examples/echo-dialer.rs
Normal file
@ -0,0 +1,105 @@
|
||||
// Copyright 2017 Parity Technologies (UK) Ltd.
|
||||
//
|
||||
// Permission is hereby granted, free of charge, to any person obtaining a
|
||||
// copy of this software and associated documentation files (the "Software"),
|
||||
// to deal in the Software without restriction, including without limitation
|
||||
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
||||
// and/or sell copies of the Software, and to permit persons to whom the
|
||||
// Software is furnished to do so, subject to the following conditions:
|
||||
//
|
||||
// The above copyright notice and this permission notice shall be included in
|
||||
// all copies or substantial portions of the Software.
|
||||
//
|
||||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||
// DEALINGS IN THE SOFTWARE.
|
||||
|
||||
extern crate bytes;
|
||||
extern crate futures;
|
||||
extern crate libp2p_secio as secio;
|
||||
extern crate libp2p_swarm as swarm;
|
||||
extern crate libp2p_tcp_transport as tcp;
|
||||
extern crate ring;
|
||||
extern crate tokio_core;
|
||||
extern crate tokio_io;
|
||||
extern crate untrusted;
|
||||
|
||||
use bytes::Bytes;
|
||||
use futures::future::{Future, FutureResult, IntoFuture};
|
||||
use futures::{Stream, Sink};
|
||||
use ring::signature::RSAKeyPair;
|
||||
use std::io::Error as IoError;
|
||||
use std::iter;
|
||||
use std::sync::Arc;
|
||||
use swarm::{Transport, ConnectionUpgrade};
|
||||
use tcp::Tcp;
|
||||
use tokio_core::reactor::Core;
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
use tokio_io::codec::length_delimited;
|
||||
use untrusted::Input;
|
||||
|
||||
fn main() {
|
||||
let mut core = Core::new().unwrap();
|
||||
let tcp = Tcp::new(core.handle()).unwrap();
|
||||
|
||||
let with_secio = tcp
|
||||
.with_upgrade(swarm::PlainText)
|
||||
.or_upgrade({
|
||||
let private_key = {
|
||||
let pkcs8 = include_bytes!("test-private-key.pk8");
|
||||
Arc::new(RSAKeyPair::from_pkcs8(Input::from(&pkcs8[..])).unwrap())
|
||||
};
|
||||
let public_key = include_bytes!("test-public-key.der").to_vec();
|
||||
|
||||
secio::SecioConnUpgrade {
|
||||
local_public_key: public_key,
|
||||
local_private_key: private_key,
|
||||
}
|
||||
});
|
||||
|
||||
let with_echo = with_secio.with_upgrade(Echo);
|
||||
|
||||
let dialer = with_echo.dial(swarm::multiaddr::Multiaddr::new("/ip4/127.0.0.1/tcp/10333").unwrap())
|
||||
.map_err(|_| panic!())
|
||||
.unwrap()
|
||||
.and_then(|f| {
|
||||
f.send("hello world".into())
|
||||
})
|
||||
.and_then(|f| {
|
||||
f.into_future()
|
||||
.map(|(msg, rest)| {
|
||||
println!("received: {:?}", msg);
|
||||
rest
|
||||
})
|
||||
.map_err(|(err, _)| err)
|
||||
});
|
||||
|
||||
core.run(dialer).unwrap();
|
||||
}
|
||||
|
||||
// TODO: copy-pasted from echo-server
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
pub struct Echo;
|
||||
impl<C> ConnectionUpgrade<C> for Echo
|
||||
where C: AsyncRead + AsyncWrite
|
||||
{
|
||||
type NamesIter = iter::Once<(Bytes, Self::UpgradeIdentifier)>;
|
||||
type UpgradeIdentifier = ();
|
||||
|
||||
#[inline]
|
||||
fn protocol_names(&self) -> Self::NamesIter {
|
||||
iter::once(("/echo/1.0.0".into(), ()))
|
||||
}
|
||||
|
||||
type Output = length_delimited::Framed<C>;
|
||||
type Future = FutureResult<Self::Output, IoError>;
|
||||
|
||||
#[inline]
|
||||
fn upgrade(self, socket: C, _: Self::UpgradeIdentifier) -> Self::Future {
|
||||
Ok(length_delimited::Framed::new(socket)).into_future()
|
||||
}
|
||||
}
|
107
example/examples/echo-server.rs
Normal file
107
example/examples/echo-server.rs
Normal file
@ -0,0 +1,107 @@
|
||||
// Copyright 2017 Parity Technologies (UK) Ltd.
|
||||
//
|
||||
// Permission is hereby granted, free of charge, to any person obtaining a
|
||||
// copy of this software and associated documentation files (the "Software"),
|
||||
// to deal in the Software without restriction, including without limitation
|
||||
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
||||
// and/or sell copies of the Software, and to permit persons to whom the
|
||||
// Software is furnished to do so, subject to the following conditions:
|
||||
//
|
||||
// The above copyright notice and this permission notice shall be included in
|
||||
// all copies or substantial portions of the Software.
|
||||
//
|
||||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||
// DEALINGS IN THE SOFTWARE.
|
||||
|
||||
extern crate bytes;
|
||||
extern crate futures;
|
||||
extern crate libp2p_secio as secio;
|
||||
extern crate libp2p_swarm as swarm;
|
||||
extern crate libp2p_tcp_transport as tcp;
|
||||
extern crate ring;
|
||||
extern crate tokio_core;
|
||||
extern crate tokio_io;
|
||||
extern crate untrusted;
|
||||
|
||||
use bytes::Bytes;
|
||||
use futures::future::{Future, FutureResult, IntoFuture, loop_fn, Loop};
|
||||
use futures::{Stream, Sink};
|
||||
use ring::signature::RSAKeyPair;
|
||||
use std::io::Error as IoError;
|
||||
use std::iter;
|
||||
use std::sync::Arc;
|
||||
use swarm::{Transport, ConnectionUpgrade};
|
||||
use tcp::Tcp;
|
||||
use tokio_core::reactor::Core;
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
use tokio_io::codec::length_delimited;
|
||||
use untrusted::Input;
|
||||
|
||||
fn main() {
|
||||
let mut core = Core::new().unwrap();
|
||||
let tcp = Tcp::new(core.handle()).unwrap();
|
||||
|
||||
let with_secio = tcp
|
||||
.with_upgrade(swarm::PlainText)
|
||||
.or_upgrade({
|
||||
let private_key = {
|
||||
let pkcs8 = include_bytes!("test-private-key.pk8");
|
||||
Arc::new(RSAKeyPair::from_pkcs8(Input::from(&pkcs8[..])).unwrap())
|
||||
};
|
||||
let public_key = include_bytes!("test-public-key.der").to_vec();
|
||||
|
||||
secio::SecioConnUpgrade {
|
||||
local_public_key: public_key,
|
||||
local_private_key: private_key,
|
||||
}
|
||||
});
|
||||
|
||||
let with_echo = with_secio.with_upgrade(Echo);
|
||||
|
||||
let future = with_echo.listen_on(swarm::multiaddr::Multiaddr::new("/ip4/0.0.0.0/tcp/10333").unwrap())
|
||||
.map_err(|_| panic!())
|
||||
.unwrap()
|
||||
.for_each(|socket| {
|
||||
loop_fn(socket, |socket| {
|
||||
socket.into_future()
|
||||
.map_err(|(err, _)| err)
|
||||
.and_then(|(msg, rest)| {
|
||||
if let Some(msg) = msg {
|
||||
Box::new(rest.send(msg).map(|m| Loop::Continue(m))) as Box<Future<Item = _, Error = _>>
|
||||
} else {
|
||||
Box::new(Ok(Loop::Break(())).into_future()) as Box<Future<Item = _, Error = _>>
|
||||
}
|
||||
})
|
||||
})
|
||||
});
|
||||
|
||||
core.run(future).unwrap();
|
||||
}
|
||||
|
||||
// TODO: copy-pasted from echo-dialer
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
pub struct Echo;
|
||||
impl<C> ConnectionUpgrade<C> for Echo
|
||||
where C: AsyncRead + AsyncWrite
|
||||
{
|
||||
type NamesIter = iter::Once<(Bytes, Self::UpgradeIdentifier)>;
|
||||
type UpgradeIdentifier = ();
|
||||
|
||||
#[inline]
|
||||
fn protocol_names(&self) -> Self::NamesIter {
|
||||
iter::once(("/echo/1.0.0".into(), ()))
|
||||
}
|
||||
|
||||
type Output = length_delimited::Framed<C>;
|
||||
type Future = FutureResult<Self::Output, IoError>;
|
||||
|
||||
#[inline]
|
||||
fn upgrade(self, socket: C, _: Self::UpgradeIdentifier) -> Self::Future {
|
||||
Ok(length_delimited::Framed::new(socket)).into_future()
|
||||
}
|
||||
}
|
BIN
example/examples/test-private-key.pk8
Normal file
BIN
example/examples/test-private-key.pk8
Normal file
Binary file not shown.
BIN
example/examples/test-public-key.der
Normal file
BIN
example/examples/test-public-key.der
Normal file
Binary file not shown.
1
example/examples/test-public-key.pem
Normal file
1
example/examples/test-public-key.pem
Normal file
@ -0,0 +1 @@
|
||||
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAw3Dq8sdKZ/Lq0AkCFRB+ywQXpubvHgscR+WyVV4tdQE+0OcJSC5hx5W+XLR/y21PTe/30f0oYP7oJv8rH2Mov1Gvops2l6efVqPA8ZggDRrAkotjLXXJggDimIGichRS9+izNi/Lit77H2bFLmlkTfrFOjibWrPP+XvoYRFN3B1gyUT5P1hARePlbb86dcd1e5l/x/lBDH7DJ+TxsY7li6HjgvlxK4jAXa9yzdkDvJOpScs+la7gGawwesDKoQ5dWyqlgT93cbXhwOHTUvownl0hwtYjiK9UGWW8ptn9/3ehYAyi6Kx/SqLJsXiJFlPg16KNunGBHL7VAFyYZ51NEwIDAQAB
|
0
example/src/lib.rs
Normal file
0
example/src/lib.rs
Normal file
@ -1,96 +0,0 @@
|
||||
//! "Host" abstraction for transports, listener addresses, peer store.
|
||||
|
||||
extern crate futures;
|
||||
extern crate libp2p_transport as transport;
|
||||
|
||||
use futures::{Future, IntoFuture};
|
||||
use transport::{ProtocolId, Socket};
|
||||
use transport::multiaddr::Multiaddr;
|
||||
|
||||
/// Produces a future for each incoming `Socket`.
|
||||
pub trait Handler<S: Socket> {
|
||||
type Future: IntoFuture<Item = (), Error = ()>;
|
||||
|
||||
/// Handle the incoming socket, producing a future which should resolve
|
||||
/// when the handler is finished.
|
||||
fn handle(&self, socket: S) -> Self::Future;
|
||||
fn boxed(self) -> BoxHandler<S>
|
||||
where
|
||||
Self: Sized + Send + 'static,
|
||||
<Self::Future as IntoFuture>::Future: Send + 'static,
|
||||
{
|
||||
BoxHandler(Box::new(move |socket| {
|
||||
Box::new(self.handle(socket).into_future()) as _
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: Socket, F, U> Handler<S> for F
|
||||
where
|
||||
F: Fn(S) -> U,
|
||||
U: IntoFuture<Item = (), Error = ()>,
|
||||
{
|
||||
type Future = U;
|
||||
|
||||
fn handle(&self, socket: S) -> U {
|
||||
(self)(socket)
|
||||
}
|
||||
}
|
||||
|
||||
/// A boxed handler.
|
||||
pub struct BoxHandler<S: Socket>(Box<Handler<S, Future = Box<Future<Item = (), Error = ()>>>>);
|
||||
|
||||
impl<S: Socket> Handler<S> for BoxHandler<S> {
|
||||
type Future = Box<Future<Item = (), Error = ()>>;
|
||||
|
||||
fn handle(&self, socket: S) -> Self::Future {
|
||||
self.0.handle(socket)
|
||||
}
|
||||
}
|
||||
|
||||
/// Multiplexes sockets onto handlers by protocol-id.
|
||||
pub trait Mux: Sync {
|
||||
/// The socket type this manages.
|
||||
type Socket: Socket;
|
||||
|
||||
/// Attach an incoming socket.
|
||||
fn push(&self, socket: Self::Socket);
|
||||
|
||||
/// Set the socket handler for a given protocol id.
|
||||
fn set_handler(&self, proto: ProtocolId, handler: BoxHandler<Self::Socket>);
|
||||
|
||||
/// Remove the socket handler for the given protocol id, returning the old handler if it existed.
|
||||
fn remove_handler(&self, proto: &ProtocolId) -> Option<BoxHandler<Self::Socket>>;
|
||||
}
|
||||
|
||||
/// Unimplemented. Maps peer IDs to connected addresses, protocols, and data.
|
||||
pub trait PeerStore {}
|
||||
|
||||
/// This is a common abstraction over the low-level bits of libp2p.
|
||||
///
|
||||
/// It handles connecting over, adding and removing transports,
|
||||
/// wraps an arbitrary event loop, and manages protocol IDs.
|
||||
pub trait Host {
|
||||
type Socket: Socket;
|
||||
type Mux: Mux<Socket = Self::Socket>;
|
||||
type Multiaddrs: IntoIterator<Item = Multiaddr>;
|
||||
|
||||
/// Get a handle to the peer store.
|
||||
fn peer_store(&self) -> &PeerStore;
|
||||
|
||||
/// Get a handle to the underlying muxer.
|
||||
fn mux(&self) -> &Self::Mux;
|
||||
|
||||
/// Set the socket handler for a given protocol id.
|
||||
fn set_handler(&self, proto: ProtocolId, handler: BoxHandler<Self::Socket>) {
|
||||
self.mux().set_handler(proto, handler);
|
||||
}
|
||||
|
||||
/// Remove the socket handler for the given protocol id, returning the old handler if it existed.
|
||||
fn remove_handler(&self, proto: &ProtocolId) -> Option<BoxHandler<Self::Socket>> {
|
||||
self.mux().remove_handler(proto)
|
||||
}
|
||||
|
||||
/// Addresses we're listening on.
|
||||
fn listen_addrs(&self) -> Self::Multiaddrs;
|
||||
}
|
@ -6,10 +6,12 @@ authors = ["Parity Technologies <admin@parity.io>"]
|
||||
[dependencies]
|
||||
bytes = "0.4"
|
||||
futures = "0.1"
|
||||
libp2p-swarm = { path = "../libp2p-swarm" }
|
||||
protobuf = "1.4.2"
|
||||
rand = "0.3.17"
|
||||
ring = { version = "0.12.1", features = ["rsa_signing"] }
|
||||
rust-crypto = "^0.2"
|
||||
rw-stream-sink = { path = "../rw-stream-sink" }
|
||||
tokio-core = "0.1.6"
|
||||
tokio-io = "0.1.0"
|
||||
untrusted = "0.6.0"
|
||||
|
@ -27,23 +27,32 @@
|
||||
//! method will perform a handshake with the host, and return a future that corresponds to the
|
||||
//! moment when the handshake succeeds or errored. On success, the future produces a
|
||||
//! `SecioMiddleware` that implements `Sink` and `Stream` and can be used to send packets of data.
|
||||
//!
|
||||
//! However for integration with the rest of `libp2p` you are encouraged to use the
|
||||
//! `SecioConnUpgrade` struct instead. This struct implements the `ConnectionUpgrade` trait and
|
||||
//! will automatically apply secio on any incoming or outgoing connection.
|
||||
|
||||
extern crate bytes;
|
||||
extern crate crypto;
|
||||
extern crate futures;
|
||||
extern crate libp2p_swarm;
|
||||
extern crate protobuf;
|
||||
extern crate rand;
|
||||
extern crate ring;
|
||||
extern crate rw_stream_sink;
|
||||
extern crate tokio_core;
|
||||
extern crate tokio_io;
|
||||
extern crate untrusted;
|
||||
|
||||
pub use self::error::SecioError;
|
||||
|
||||
use bytes::BytesMut;
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use futures::{Future, Poll, StartSend, Sink, Stream};
|
||||
use futures::stream::MapErr as StreamMapErr;
|
||||
use ring::signature::RSAKeyPair;
|
||||
use std::io::Error as IoError;
|
||||
use rw_stream_sink::RwStreamSink;
|
||||
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
|
||||
use std::iter;
|
||||
use std::sync::Arc;
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
|
||||
@ -54,6 +63,55 @@ mod keys_proto;
|
||||
mod handshake;
|
||||
mod structs_proto;
|
||||
|
||||
/// Implementation of the `ConnectionUpgrade` trait of `libp2p_swarm`. Automatically applies any
|
||||
/// secio on any connection.
|
||||
#[derive(Clone)]
|
||||
pub struct SecioConnUpgrade {
|
||||
/// Public key of the local node. Must match `local_private_key` or an error will happen during
|
||||
/// the handshake.
|
||||
pub local_public_key: Vec<u8>,
|
||||
/// Private key that will be used to prove the identity of the local node.
|
||||
pub local_private_key: Arc<RSAKeyPair>,
|
||||
}
|
||||
|
||||
impl<S> libp2p_swarm::ConnectionUpgrade<S> for SecioConnUpgrade
|
||||
where S: AsyncRead + AsyncWrite + 'static
|
||||
{
|
||||
type Output = RwStreamSink<
|
||||
StreamMapErr<
|
||||
SecioMiddleware<S>,
|
||||
fn(SecioError) -> IoError,
|
||||
>,
|
||||
>;
|
||||
type Future = Box<Future<Item = Self::Output, Error = IoError>>;
|
||||
type NamesIter = iter::Once<(Bytes, ())>;
|
||||
type UpgradeIdentifier = ();
|
||||
|
||||
#[inline]
|
||||
fn protocol_names(&self) -> Self::NamesIter {
|
||||
iter::once(("/secio/1.0.0".into(), ()))
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn upgrade(self, incoming: S, _: ()) -> Self::Future {
|
||||
let fut = SecioMiddleware::handshake(
|
||||
incoming,
|
||||
self.local_public_key.clone(),
|
||||
self.local_private_key.clone(),
|
||||
);
|
||||
let wrapped = fut.map(|stream_sink| {
|
||||
let mapped = stream_sink.map_err(map_err as fn(_) -> _);
|
||||
RwStreamSink::new(mapped)
|
||||
}).map_err(map_err);
|
||||
Box::new(wrapped)
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn map_err(err: SecioError) -> IoError {
|
||||
IoError::new(IoErrorKind::InvalidData, err)
|
||||
}
|
||||
|
||||
/// Wraps around an object that implements `AsyncRead` and `AsyncWrite`.
|
||||
///
|
||||
/// Implements `Sink` and `Stream` whose items are frames of data. Each frame is encoded
|
||||
|
@ -1,10 +1,12 @@
|
||||
[package]
|
||||
name = "libp2p-host"
|
||||
name = "libp2p-swarm"
|
||||
version = "0.1.0"
|
||||
authors = ["Parity Technologies <admin@parity.io>"]
|
||||
|
||||
[dependencies]
|
||||
bytes = "0.4"
|
||||
multiaddr = "0.2.0"
|
||||
multistream-select = { path = "../multistream-select" }
|
||||
futures = "0.1"
|
||||
tokio-core = "0.1"
|
||||
tokio-io = "0.1"
|
||||
libp2p-transport = { path = "../libp2p-transport" }
|
34
libp2p-swarm/src/lib.rs
Normal file
34
libp2p-swarm/src/lib.rs
Normal file
@ -0,0 +1,34 @@
|
||||
// Copyright 2017 Parity Technologies (UK) Ltd.
|
||||
//
|
||||
// Permission is hereby granted, free of charge, to any person obtaining a
|
||||
// copy of this software and associated documentation files (the "Software"),
|
||||
// to deal in the Software without restriction, including without limitation
|
||||
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
||||
// and/or sell copies of the Software, and to permit persons to whom the
|
||||
// Software is furnished to do so, subject to the following conditions:
|
||||
//
|
||||
// The above copyright notice and this permission notice shall be included in
|
||||
// all copies or substantial portions of the Software.
|
||||
//
|
||||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||
// DEALINGS IN THE SOFTWARE.
|
||||
|
||||
//! Transport and I/O primitives for libp2p.
|
||||
|
||||
extern crate bytes;
|
||||
#[macro_use]
|
||||
extern crate futures;
|
||||
extern crate multistream_select;
|
||||
extern crate tokio_io;
|
||||
|
||||
/// Multi-address re-export.
|
||||
pub extern crate multiaddr;
|
||||
|
||||
pub mod transport;
|
||||
|
||||
pub use self::transport::{ConnectionUpgrade, PlainText, Transport, UpgradedNode, OrUpgrade};
|
611
libp2p-swarm/src/transport.rs
Normal file
611
libp2p-swarm/src/transport.rs
Normal file
@ -0,0 +1,611 @@
|
||||
// Copyright 2017 Parity Technologies (UK) Ltd.
|
||||
//
|
||||
// Permission is hereby granted, free of charge, to any person obtaining a
|
||||
// copy of this software and associated documentation files (the "Software"),
|
||||
// to deal in the Software without restriction, including without limitation
|
||||
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
||||
// and/or sell copies of the Software, and to permit persons to whom the
|
||||
// Software is furnished to do so, subject to the following conditions:
|
||||
//
|
||||
// The above copyright notice and this permission notice shall be included in
|
||||
// all copies or substantial portions of the Software.
|
||||
//
|
||||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||
// DEALINGS IN THE SOFTWARE.
|
||||
|
||||
//! Handles entering a connection with a peer.
|
||||
//!
|
||||
//! The two main elements of this module are the `Transport` and `ConnectionUpgrade` traits.
|
||||
//! `Transport` is implemented on objects that allow dialing and listening. `ConnectionUpgrade` is
|
||||
//! implemented on objects that make it possible to upgrade a connection (for example by adding an
|
||||
//! encryption middleware to the connection).
|
||||
//!
|
||||
//! Thanks to the `Transport::or_transport`, `Transport::with_upgrade` and
|
||||
//! `UpgradeNode::or_upgrade` methods, you can combine multiple transports and/or upgrades together
|
||||
//! in a complex chain of protocols negotiation.
|
||||
|
||||
use bytes::Bytes;
|
||||
use futures::{Stream, Poll, Async};
|
||||
use futures::future::{IntoFuture, Future, ok as future_ok, FutureResult};
|
||||
use multiaddr::Multiaddr;
|
||||
use multistream_select;
|
||||
use std::io::{Cursor, Error as IoError, Read, Write};
|
||||
use std::iter;
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
|
||||
/// A transport is an object that can be used to produce connections by listening or dialing a
|
||||
/// peer.
|
||||
///
|
||||
/// This trait is implemented on concrete transports (eg. TCP, UDP, etc.), but also on wrappers
|
||||
/// around them.
|
||||
///
|
||||
/// > **Note**: The methods of this trait use `self` and not `&self` or `&mut self`. In other
|
||||
/// > words, listening or dialing consumes the transport object. This has been designed
|
||||
/// > so that you would implement this trait on `&Foo` or `&mut Foo` instead of directly
|
||||
/// > on `Foo`.
|
||||
pub trait Transport {
|
||||
/// The raw connection to a peer.
|
||||
type RawConn: AsyncRead + AsyncWrite;
|
||||
|
||||
/// The listener produces incoming connections.
|
||||
type Listener: Stream<Item = Self::RawConn, Error = IoError>;
|
||||
|
||||
/// A future which indicates that we are currently dialing to a peer.
|
||||
type Dial: IntoFuture<Item = Self::RawConn, Error = IoError>;
|
||||
|
||||
/// Listen on the given multi-addr.
|
||||
///
|
||||
/// Returns the address back if it isn't supported.
|
||||
fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, (Self, Multiaddr)>
|
||||
where Self: Sized;
|
||||
|
||||
/// Dial to the given multi-addr.
|
||||
///
|
||||
/// Returns either a future which may resolve to a connection, or gives back the multiaddress.
|
||||
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> where Self: Sized;
|
||||
|
||||
/// Builds a new struct that implements `Transport` that contains both `self` and `other`.
|
||||
///
|
||||
/// The returned object will redirect its calls to `self`, except that if `listen_on` or `dial`
|
||||
/// return an error then `other` will be tried.
|
||||
#[inline]
|
||||
fn or_transport<T>(self, other: T) -> OrTransport<Self, T>
|
||||
where Self: Sized
|
||||
{
|
||||
OrTransport(self, other)
|
||||
}
|
||||
|
||||
/// Wraps this transport inside an upgrade. Whenever a connection that uses this transport
|
||||
/// is established, it is wrapped inside the upgrade.
|
||||
///
|
||||
/// > **Note**: The concept of an *upgrade* for example includes middlewares such *secio*
|
||||
/// > (communication encryption), *multiplex*, but also a protocol handler.
|
||||
#[inline]
|
||||
fn with_upgrade<U>(self, upgrade: U) -> UpgradedNode<Self, U>
|
||||
where Self: Sized,
|
||||
U: ConnectionUpgrade<Self::RawConn>
|
||||
{
|
||||
UpgradedNode {
|
||||
transports: self,
|
||||
upgrade: upgrade,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Dummy implementation of `Transport` that just denies every single attempt.
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
pub struct DeniedTransport;
|
||||
|
||||
impl Transport for DeniedTransport {
|
||||
// TODO: could use `!` for associated types once stable
|
||||
type RawConn = Cursor<Vec<u8>>;
|
||||
type Listener = Box<Stream<Item = Self::RawConn, Error = IoError>>;
|
||||
type Dial = Box<Future<Item = Self::RawConn, Error = IoError>>;
|
||||
|
||||
#[inline]
|
||||
fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, (Self, Multiaddr)> {
|
||||
Err((DeniedTransport, addr))
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {
|
||||
Err((DeniedTransport, addr))
|
||||
}
|
||||
}
|
||||
|
||||
/// Struct returned by `or_transport()`.
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
pub struct OrTransport<A, B>(A, B);
|
||||
|
||||
impl<A, B> Transport for OrTransport<A, B>
|
||||
where A: Transport,
|
||||
B: Transport
|
||||
{
|
||||
type RawConn = EitherSocket<A::RawConn, B::RawConn>;
|
||||
type Listener = EitherStream<A::Listener, B::Listener>;
|
||||
type Dial = EitherTransportFuture<
|
||||
<A::Dial as IntoFuture>::Future,
|
||||
<B::Dial as IntoFuture>::Future,
|
||||
>;
|
||||
|
||||
fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, (Self, Multiaddr)> {
|
||||
let (first, addr) = match self.0.listen_on(addr) {
|
||||
Ok(connec) => return Ok(EitherStream::First(connec)),
|
||||
Err(err) => err,
|
||||
};
|
||||
|
||||
match self.1.listen_on(addr) {
|
||||
Ok(connec) => Ok(EitherStream::Second(connec)),
|
||||
Err((second, addr)) => Err((OrTransport(first, second), addr)),
|
||||
}
|
||||
}
|
||||
|
||||
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {
|
||||
let (first, addr) = match self.0.dial(addr) {
|
||||
Ok(connec) => return Ok(EitherTransportFuture::First(connec.into_future())),
|
||||
Err(err) => err,
|
||||
};
|
||||
|
||||
match self.1.dial(addr) {
|
||||
Ok(connec) => Ok(EitherTransportFuture::Second(connec.into_future())),
|
||||
Err((second, addr)) => Err((OrTransport(first, second), addr)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Implements `Stream` and dispatches all method calls to either `First` or `Second`.
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
pub enum EitherStream<A, B> {
|
||||
First(A),
|
||||
Second(B),
|
||||
}
|
||||
|
||||
impl<A, B> Stream for EitherStream<A, B>
|
||||
where A: Stream<Error = IoError>,
|
||||
B: Stream<Error = IoError>
|
||||
{
|
||||
type Item = EitherSocket<A::Item, B::Item>;
|
||||
type Error = IoError;
|
||||
|
||||
#[inline]
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||
match self {
|
||||
&mut EitherStream::First(ref mut a) => {
|
||||
a.poll().map(|i| i.map(|v| v.map(EitherSocket::First)))
|
||||
}
|
||||
&mut EitherStream::Second(ref mut a) => {
|
||||
a.poll().map(|i| i.map(|v| v.map(EitherSocket::Second)))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Implements `Future` and redirects calls to either `First` or `Second`.
|
||||
///
|
||||
/// Additionally, the output will be wrapped inside a `EitherSocket`.
|
||||
///
|
||||
/// > **Note**: This type is needed because of the lack of `-> impl Trait` in Rust. It can be
|
||||
/// > removed eventually.
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
pub enum EitherTransportFuture<A, B> {
|
||||
First(A),
|
||||
Second(B),
|
||||
}
|
||||
|
||||
impl<A, B> Future for EitherTransportFuture<A, B>
|
||||
where A: Future<Error = IoError>,
|
||||
B: Future<Error = IoError>
|
||||
{
|
||||
type Item = EitherSocket<A::Item, B::Item>;
|
||||
type Error = IoError;
|
||||
|
||||
#[inline]
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
match self {
|
||||
&mut EitherTransportFuture::First(ref mut a) => {
|
||||
let item = try_ready!(a.poll());
|
||||
Ok(Async::Ready(EitherSocket::First(item)))
|
||||
}
|
||||
&mut EitherTransportFuture::Second(ref mut b) => {
|
||||
let item = try_ready!(b.poll());
|
||||
Ok(Async::Ready(EitherSocket::Second(item)))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Implements `AsyncRead` and `AsyncWrite` and dispatches all method calls to either `First` or
|
||||
/// `Second`.
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
pub enum EitherSocket<A, B> {
|
||||
First(A),
|
||||
Second(B),
|
||||
}
|
||||
|
||||
impl<A, B> AsyncRead for EitherSocket<A, B>
|
||||
where A: AsyncRead,
|
||||
B: AsyncRead
|
||||
{
|
||||
#[inline]
|
||||
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
|
||||
match self {
|
||||
&EitherSocket::First(ref a) => a.prepare_uninitialized_buffer(buf),
|
||||
&EitherSocket::Second(ref b) => b.prepare_uninitialized_buffer(buf),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<A, B> Read for EitherSocket<A, B>
|
||||
where A: Read,
|
||||
B: Read
|
||||
{
|
||||
#[inline]
|
||||
fn read(&mut self, buf: &mut [u8]) -> Result<usize, IoError> {
|
||||
match self {
|
||||
&mut EitherSocket::First(ref mut a) => a.read(buf),
|
||||
&mut EitherSocket::Second(ref mut b) => b.read(buf),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<A, B> AsyncWrite for EitherSocket<A, B>
|
||||
where A: AsyncWrite,
|
||||
B: AsyncWrite
|
||||
{
|
||||
#[inline]
|
||||
fn shutdown(&mut self) -> Poll<(), IoError> {
|
||||
match self {
|
||||
&mut EitherSocket::First(ref mut a) => a.shutdown(),
|
||||
&mut EitherSocket::Second(ref mut b) => b.shutdown(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<A, B> Write for EitherSocket<A, B>
|
||||
where A: Write,
|
||||
B: Write
|
||||
{
|
||||
#[inline]
|
||||
fn write(&mut self, buf: &[u8]) -> Result<usize, IoError> {
|
||||
match self {
|
||||
&mut EitherSocket::First(ref mut a) => a.write(buf),
|
||||
&mut EitherSocket::Second(ref mut b) => b.write(buf),
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn flush(&mut self) -> Result<(), IoError> {
|
||||
match self {
|
||||
&mut EitherSocket::First(ref mut a) => a.flush(),
|
||||
&mut EitherSocket::Second(ref mut b) => b.flush(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Implemented on structs that describe a possible upgrade to a connection between two peers.
|
||||
///
|
||||
/// The generic `C` is the type of the incoming connection before it is upgraded.
|
||||
///
|
||||
/// > **Note**: The `upgrade` method of this trait uses `self` and not `&self` or `&mut self`.
|
||||
/// > This has been designed so that you would implement this trait on `&Foo` or
|
||||
/// > `&mut Foo` instead of directly on `Foo`.
|
||||
pub trait ConnectionUpgrade<C: AsyncRead + AsyncWrite> {
|
||||
/// Iterator returned by `protocol_names`.
|
||||
type NamesIter: Iterator<Item = (Bytes, Self::UpgradeIdentifier)>;
|
||||
/// Type that serves as an identifier for the protocol. This type only exists to be returned
|
||||
/// by the `NamesIter` and then be passed to `upgrade`.
|
||||
///
|
||||
/// This is only useful on implementations that dispatch between multiple possible upgrades.
|
||||
/// Any basic implementation will probably just use the `()` type.
|
||||
type UpgradeIdentifier;
|
||||
|
||||
/// Returns the name of the protocols to advertise to the remote.
|
||||
fn protocol_names(&self) -> Self::NamesIter;
|
||||
|
||||
/// Type of the stream that has been upgraded. Generally wraps around `C` and `Self`.
|
||||
///
|
||||
/// > **Note**: For upgrades that add an intermediary layer (such as `secio` or `multiplex`),
|
||||
/// > this associated type must implement `AsyncRead + AsyncWrite`.
|
||||
type Output;
|
||||
/// Type of the future that will resolve to `Self::Output`.
|
||||
type Future: Future<Item = Self::Output, Error = IoError>;
|
||||
|
||||
/// This method is called after protocol negotiation has been performed.
|
||||
///
|
||||
/// Because performing the upgrade may not be instantaneous (eg. it may require a handshake),
|
||||
/// this function returns a future instead of the direct output.
|
||||
fn upgrade(self, socket: C, id: Self::UpgradeIdentifier) -> Self::Future;
|
||||
}
|
||||
|
||||
/// See `or_upgrade()`.
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
pub struct OrUpgrade<A, B>(A, B);
|
||||
|
||||
impl<C, A, B> ConnectionUpgrade<C> for OrUpgrade<A, B>
|
||||
where C: AsyncRead + AsyncWrite,
|
||||
A: ConnectionUpgrade<C>,
|
||||
B: ConnectionUpgrade<C>
|
||||
{
|
||||
type NamesIter = NamesIterChain<A::NamesIter, B::NamesIter>;
|
||||
type UpgradeIdentifier = EitherUpgradeIdentifier<A::UpgradeIdentifier, B::UpgradeIdentifier>;
|
||||
|
||||
#[inline]
|
||||
fn protocol_names(&self) -> Self::NamesIter {
|
||||
NamesIterChain {
|
||||
first: self.0.protocol_names(),
|
||||
second: self.1.protocol_names(),
|
||||
}
|
||||
}
|
||||
|
||||
type Output = EitherSocket<A::Output, B::Output>;
|
||||
type Future = EitherConnUpgrFuture<A::Future, B::Future>;
|
||||
|
||||
#[inline]
|
||||
fn upgrade(self, socket: C, id: Self::UpgradeIdentifier) -> Self::Future {
|
||||
match id {
|
||||
EitherUpgradeIdentifier::First(id) => {
|
||||
EitherConnUpgrFuture::First(self.0.upgrade(socket, id))
|
||||
}
|
||||
EitherUpgradeIdentifier::Second(id) => {
|
||||
EitherConnUpgrFuture::Second(self.1.upgrade(socket, id))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Internal struct used by the `OrUpgrade` trait.
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
pub enum EitherUpgradeIdentifier<A, B> {
|
||||
First(A),
|
||||
Second(B),
|
||||
}
|
||||
|
||||
/// Implements `Future` and redirects calls to either `First` or `Second`.
|
||||
///
|
||||
/// Additionally, the output will be wrapped inside a `EitherSocket`.
|
||||
///
|
||||
/// > **Note**: This type is needed because of the lack of `-> impl Trait` in Rust. It can be
|
||||
/// > removed eventually.
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
pub enum EitherConnUpgrFuture<A, B> {
|
||||
First(A),
|
||||
Second(B),
|
||||
}
|
||||
|
||||
impl<A, B> Future for EitherConnUpgrFuture<A, B>
|
||||
where A: Future<Error = IoError>,
|
||||
B: Future<Error = IoError>
|
||||
{
|
||||
type Item = EitherSocket<A::Item, B::Item>;
|
||||
type Error = IoError;
|
||||
|
||||
#[inline]
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
match self {
|
||||
&mut EitherConnUpgrFuture::First(ref mut a) => {
|
||||
let item = try_ready!(a.poll());
|
||||
Ok(Async::Ready(EitherSocket::First(item)))
|
||||
}
|
||||
&mut EitherConnUpgrFuture::Second(ref mut b) => {
|
||||
let item = try_ready!(b.poll());
|
||||
Ok(Async::Ready(EitherSocket::Second(item)))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Internal type used by the `OrUpgrade` struct.
|
||||
///
|
||||
/// > **Note**: This type is needed because of the lack of `-> impl Trait` in Rust. It can be
|
||||
/// > removed eventually.
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
pub struct NamesIterChain<A, B> {
|
||||
first: A,
|
||||
second: B,
|
||||
}
|
||||
|
||||
impl<A, B, AId, BId> Iterator for NamesIterChain<A, B>
|
||||
where A: Iterator<Item = (Bytes, AId)>,
|
||||
B: Iterator<Item = (Bytes, BId)>
|
||||
{
|
||||
type Item = (Bytes, EitherUpgradeIdentifier<AId, BId>);
|
||||
|
||||
#[inline]
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
if let Some((name, id)) = self.first.next() {
|
||||
return Some((name, EitherUpgradeIdentifier::First(id)));
|
||||
}
|
||||
if let Some((name, id)) = self.second.next() {
|
||||
return Some((name, EitherUpgradeIdentifier::Second(id)));
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn size_hint(&self) -> (usize, Option<usize>) {
|
||||
let (min1, max1) = self.first.size_hint();
|
||||
let (min2, max2) = self.second.size_hint();
|
||||
let max = match (max1, max2) {
|
||||
(Some(max1), Some(max2)) => max1.checked_add(max2),
|
||||
_ => None,
|
||||
};
|
||||
(min1.saturating_add(min2), max)
|
||||
}
|
||||
}
|
||||
|
||||
/// Implementation of the `ConnectionUpgrade` that negotiates the `/plaintext/1.0.0` protocol and
|
||||
/// simply passes communications through without doing anything more.
|
||||
///
|
||||
/// > **Note**: Generally used as an alternative to `secio` if a security layer is not desirable.
|
||||
// TODO: move `PlainText` to a separate crate?
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
pub struct PlainText;
|
||||
|
||||
impl<C> ConnectionUpgrade<C> for PlainText
|
||||
where C: AsyncRead + AsyncWrite
|
||||
{
|
||||
type Output = C;
|
||||
type Future = FutureResult<C, IoError>;
|
||||
type UpgradeIdentifier = ();
|
||||
type NamesIter = iter::Once<(Bytes, ())>;
|
||||
|
||||
#[inline]
|
||||
fn upgrade(self, i: C, _: ()) -> Self::Future {
|
||||
future_ok(i)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn protocol_names(&self) -> Self::NamesIter {
|
||||
iter::once((Bytes::from("/plaintext/1.0.0"), ()))
|
||||
}
|
||||
}
|
||||
|
||||
/// Implements the `Transport` trait. Dials or listens, then upgrades any dialed or received
|
||||
/// connection.
|
||||
///
|
||||
/// See the `Transport::with_upgrade` method.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct UpgradedNode<T, C> {
|
||||
transports: T,
|
||||
upgrade: C,
|
||||
}
|
||||
|
||||
impl<'a, T, C> UpgradedNode<T, C>
|
||||
where T: Transport + 'a,
|
||||
C: ConnectionUpgrade<T::RawConn> + 'a
|
||||
{
|
||||
/// Builds a new struct that implements `ConnectionUpgrade` that contains both `self` and
|
||||
/// `other_upg`.
|
||||
///
|
||||
/// The returned object will try to negotiate either the protocols of `self` or the protocols
|
||||
/// of `other_upg`, then upgrade the connection to the negogiated protocol.
|
||||
#[inline]
|
||||
pub fn or_upgrade<D>(self, other_upg: D) -> UpgradedNode<T, OrUpgrade<C, D>>
|
||||
where D: ConnectionUpgrade<T::RawConn> + 'a
|
||||
{
|
||||
UpgradedNode {
|
||||
transports: self.transports,
|
||||
upgrade: OrUpgrade(self.upgrade, other_upg),
|
||||
}
|
||||
}
|
||||
|
||||
/// Tries to dial on the `Multiaddr` using the transport that was passed to `new`, then upgrade
|
||||
/// the connection.
|
||||
///
|
||||
/// Note that this does the same as `Transport::dial`, but with less restrictions on the trait
|
||||
/// requirements.
|
||||
#[inline]
|
||||
pub fn dial(
|
||||
self,
|
||||
addr: Multiaddr,
|
||||
) -> Result<Box<Future<Item = C::Output, Error = IoError> + 'a>, (Self, Multiaddr)> {
|
||||
let upgrade = self.upgrade;
|
||||
|
||||
let dialed_fut = match self.transports.dial(addr) {
|
||||
Ok(f) => f.into_future(),
|
||||
Err((trans, addr)) => {
|
||||
let builder = UpgradedNode {
|
||||
transports: trans,
|
||||
upgrade: upgrade,
|
||||
};
|
||||
|
||||
return Err((builder, addr));
|
||||
}
|
||||
};
|
||||
|
||||
let future = dialed_fut
|
||||
// Try to negotiate the protocol.
|
||||
.and_then(move |connection| {
|
||||
let iter = upgrade.protocol_names()
|
||||
.map(|(name, id)| (name, <Bytes as PartialEq>::eq, id));
|
||||
let negotiated = multistream_select::dialer_select_proto(connection, iter)
|
||||
.map_err(|err| panic!("{:?}", err)); // TODO:
|
||||
negotiated.map(|(upgrade_id, conn)| (upgrade_id, conn, upgrade))
|
||||
})
|
||||
.and_then(|(upgrade_id, connection, upgrade)| {
|
||||
upgrade.upgrade(connection, upgrade_id)
|
||||
});
|
||||
|
||||
Ok(Box::new(future))
|
||||
}
|
||||
|
||||
/// Start listening on the multiaddr using the transport that was passed to `new`.
|
||||
/// Then whenever a connection is opened, it is upgraded.
|
||||
///
|
||||
/// Note that this does the same as `Transport::listen_on`, but with less restrictions on the
|
||||
/// trait requirements.
|
||||
#[inline]
|
||||
pub fn listen_on(
|
||||
self,
|
||||
addr: Multiaddr,
|
||||
) -> Result<Box<Stream<Item = C::Output, Error = IoError> + 'a>, (Self, Multiaddr)>
|
||||
where C::NamesIter: Clone, // TODO: not elegant
|
||||
C: Clone
|
||||
{
|
||||
let upgrade = self.upgrade;
|
||||
|
||||
let listening_stream = match self.transports.listen_on(addr) {
|
||||
Ok(l) => l,
|
||||
Err((trans, addr)) => {
|
||||
let builder = UpgradedNode {
|
||||
transports: trans,
|
||||
upgrade: upgrade,
|
||||
};
|
||||
|
||||
return Err((builder, addr));
|
||||
}
|
||||
};
|
||||
|
||||
let stream = listening_stream
|
||||
// Try to negotiate the protocol.
|
||||
.and_then(move |connection| {
|
||||
let upgrade = upgrade.clone();
|
||||
#[inline]
|
||||
fn iter_map<T>((n, t): (Bytes, T)) -> (Bytes, fn(&Bytes,&Bytes)->bool, T) {
|
||||
(n, <Bytes as PartialEq>::eq, t)
|
||||
}
|
||||
let iter = upgrade.protocol_names().map(iter_map);
|
||||
let negotiated = multistream_select::listener_select_proto(connection, iter)
|
||||
.map_err(|err| panic!("{:?}", err)); // TODO:
|
||||
negotiated.map(move |(upgrade_id, conn)| (upgrade_id, conn, upgrade))
|
||||
.map_err(|_| panic!()) // TODO:
|
||||
})
|
||||
.map_err(|_| panic!()) // TODO:
|
||||
.and_then(|(upgrade_id, connection, upgrade)| {
|
||||
upgrade.upgrade(connection, upgrade_id)
|
||||
});
|
||||
|
||||
Ok(Box::new(stream))
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, C> Transport for UpgradedNode<T, C>
|
||||
where T: Transport + 'static,
|
||||
C: ConnectionUpgrade<T::RawConn> + 'static,
|
||||
C::Output: AsyncRead + AsyncWrite,
|
||||
C::NamesIter: Clone, // TODO: not elegant
|
||||
C: Clone
|
||||
{
|
||||
type RawConn = C::Output;
|
||||
type Listener = Box<Stream<Item = C::Output, Error = IoError>>;
|
||||
type Dial = Box<Future<Item = C::Output, Error = IoError>>;
|
||||
|
||||
#[inline]
|
||||
fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, (Self, Multiaddr)>
|
||||
where Self: Sized
|
||||
{
|
||||
self.listen_on(addr)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)>
|
||||
where Self: Sized
|
||||
{
|
||||
self.dial(addr)
|
||||
}
|
||||
}
|
@ -4,7 +4,7 @@ version = "0.1.0"
|
||||
authors = ["Parity Technologies <admin@parity.io>"]
|
||||
|
||||
[dependencies]
|
||||
libp2p-transport = { path = "../libp2p-transport" }
|
||||
libp2p-swarm = { path = "../libp2p-swarm" }
|
||||
futures = "0.1"
|
||||
multiaddr = "0.2.0"
|
||||
tokio-core = "0.1"
|
||||
|
@ -1,4 +1,26 @@
|
||||
extern crate libp2p_transport as transport;
|
||||
// Copyright 2017 Parity Technologies (UK) Ltd.
|
||||
//
|
||||
// Permission is hereby granted, free of charge, to any person obtaining a
|
||||
// copy of this software and associated documentation files (the "Software"),
|
||||
// to deal in the Software without restriction, including without limitation
|
||||
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
||||
// and/or sell copies of the Software, and to permit persons to whom the
|
||||
// Software is furnished to do so, subject to the following conditions:
|
||||
//
|
||||
// The above copyright notice and this permission notice shall be included in
|
||||
// all copies or substantial portions of the Software.
|
||||
//
|
||||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||
// DEALINGS IN THE SOFTWARE.
|
||||
|
||||
//! Implementation of the libp2p `Transport` trait for TCP/IP.
|
||||
|
||||
extern crate libp2p_swarm as swarm;
|
||||
extern crate tokio_core;
|
||||
extern crate tokio_io;
|
||||
extern crate multiaddr;
|
||||
@ -6,20 +28,25 @@ extern crate futures;
|
||||
|
||||
use std::io::Error as IoError;
|
||||
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
|
||||
use tokio_core::reactor::Core;
|
||||
use tokio_core::reactor::Handle;
|
||||
use tokio_core::net::{TcpStream, TcpListener, TcpStreamNew};
|
||||
use futures::Future;
|
||||
use futures::stream::Stream;
|
||||
use multiaddr::{Multiaddr, Protocol};
|
||||
use transport::Transport;
|
||||
use swarm::Transport;
|
||||
|
||||
/// Represents a TCP/IP transport capability for libp2p.
|
||||
///
|
||||
/// Each `Tcp` struct is tied to a tokio reactor. The TCP sockets created by libp2p will need to
|
||||
/// be progressed by running the futures and streams obtained by libp2p through the tokio reactor.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Tcp {
|
||||
pub event_loop: Core,
|
||||
event_loop: Handle,
|
||||
}
|
||||
|
||||
impl Tcp {
|
||||
pub fn new() -> Result<Tcp, IoError> {
|
||||
Ok(Tcp { event_loop: Core::new()? })
|
||||
pub fn new(handle: Handle) -> Result<Tcp, IoError> {
|
||||
Ok(Tcp { event_loop: handle })
|
||||
}
|
||||
}
|
||||
|
||||
@ -35,11 +62,11 @@ impl Transport for Tcp {
|
||||
|
||||
/// Listen on the given multi-addr.
|
||||
/// Returns the address back if it isn't supported.
|
||||
fn listen_on(&mut self, addr: Multiaddr) -> Result<Self::Listener, Multiaddr> {
|
||||
fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, (Self, Multiaddr)> {
|
||||
if let Ok(socket_addr) = multiaddr_to_socketaddr(&addr) {
|
||||
Ok(Box::new(
|
||||
futures::future::result(
|
||||
TcpListener::bind(&socket_addr, &self.event_loop.handle()),
|
||||
TcpListener::bind(&socket_addr, &self.event_loop),
|
||||
).map(|listener| {
|
||||
// Pull out a stream of sockets for incoming connections
|
||||
listener.incoming().map(|x| x.0)
|
||||
@ -47,18 +74,18 @@ impl Transport for Tcp {
|
||||
.flatten_stream(),
|
||||
))
|
||||
} else {
|
||||
Err(addr)
|
||||
Err((self, addr))
|
||||
}
|
||||
}
|
||||
|
||||
/// Dial to the given multi-addr.
|
||||
/// Returns either a future which may resolve to a connection,
|
||||
/// or gives back the multiaddress.
|
||||
fn dial(&mut self, addr: Multiaddr) -> Result<Self::Dial, Multiaddr> {
|
||||
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {
|
||||
if let Ok(socket_addr) = multiaddr_to_socketaddr(&addr) {
|
||||
Ok(TcpStream::connect(&socket_addr, &self.event_loop.handle()))
|
||||
Ok(TcpStream::connect(&socket_addr, &self.event_loop))
|
||||
} else {
|
||||
Err(addr)
|
||||
Err((self, addr))
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -98,11 +125,12 @@ mod tests {
|
||||
use super::{Tcp, multiaddr_to_socketaddr};
|
||||
use std;
|
||||
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
|
||||
use tokio_core::reactor::Core;
|
||||
use tokio_io;
|
||||
use futures::Future;
|
||||
use futures::stream::Stream;
|
||||
use multiaddr::Multiaddr;
|
||||
use transport::Transport;
|
||||
use swarm::Transport;
|
||||
|
||||
#[test]
|
||||
fn multiaddr_to_tcp_conversion() {
|
||||
@ -156,9 +184,10 @@ mod tests {
|
||||
use std::io::Write;
|
||||
|
||||
std::thread::spawn(move || {
|
||||
let mut core = Core::new().unwrap();
|
||||
let addr = Multiaddr::new("/ip4/127.0.0.1/tcp/12345").unwrap();
|
||||
let mut tcp = Tcp::new().unwrap();
|
||||
let handle = tcp.event_loop.handle();
|
||||
let tcp = Tcp::new(core.handle()).unwrap();
|
||||
let handle = core.handle();
|
||||
let listener = tcp.listen_on(addr).unwrap().for_each(|sock| {
|
||||
// Define what to do with the socket that just connected to us
|
||||
// Which in this case is read 3 bytes
|
||||
@ -172,11 +201,12 @@ mod tests {
|
||||
Ok(())
|
||||
});
|
||||
|
||||
tcp.event_loop.run(listener).unwrap();
|
||||
core.run(listener).unwrap();
|
||||
});
|
||||
std::thread::sleep(std::time::Duration::from_millis(100));
|
||||
let addr = Multiaddr::new("/ip4/127.0.0.1/tcp/12345").unwrap();
|
||||
let mut tcp = Tcp::new().unwrap();
|
||||
let mut core = Core::new().unwrap();
|
||||
let tcp = Tcp::new(core.handle()).unwrap();
|
||||
// Obtain a future socket through dialing
|
||||
let socket = tcp.dial(addr.clone()).unwrap();
|
||||
// Define what to do with the socket once it's obtained
|
||||
@ -188,7 +218,7 @@ mod tests {
|
||||
Err(x) => Err(x),
|
||||
});
|
||||
// Execute the future in our event loop
|
||||
tcp.event_loop.run(action).unwrap();
|
||||
core.run(action).unwrap();
|
||||
std::thread::sleep(std::time::Duration::from_millis(100));
|
||||
}
|
||||
}
|
||||
|
@ -1,9 +0,0 @@
|
||||
[package]
|
||||
name = "libp2p-transport"
|
||||
version = "0.1.0"
|
||||
authors = ["Parity Technologies <admin@parity.io>"]
|
||||
|
||||
[dependencies]
|
||||
multiaddr = "0.2.0"
|
||||
futures = "0.1"
|
||||
tokio-io = "0.1"
|
@ -1,62 +0,0 @@
|
||||
//! Transport and I/O primitives for libp2p.
|
||||
|
||||
extern crate futures;
|
||||
extern crate tokio_io;
|
||||
|
||||
/// Multi-address re-export.
|
||||
pub extern crate multiaddr;
|
||||
|
||||
use multiaddr::Multiaddr;
|
||||
use futures::{IntoFuture, Future};
|
||||
use futures::stream::Stream;
|
||||
use std::io::Error as IoError;
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
|
||||
// Something more strongly-typed?
|
||||
pub type ProtocolId = String;
|
||||
pub type PeerId = String;
|
||||
|
||||
/// A logical wire between us and a peer. We can read and write through this asynchronously.
|
||||
///
|
||||
/// You can have multiple `Socket`s between you and any given peer.
|
||||
pub trait Socket: AsyncRead + AsyncWrite + Sized {
|
||||
type Conn: Conn<Socket = Self>;
|
||||
|
||||
/// Get the protocol ID this socket uses.
|
||||
fn protocol_id(&self) -> ProtocolId;
|
||||
|
||||
/// Access the underlying connection.
|
||||
fn conn(&self) -> &Self::Conn;
|
||||
}
|
||||
|
||||
/// A connection between you and a peer.
|
||||
pub trait Conn {
|
||||
/// The socket type this connection manages.
|
||||
type Socket;
|
||||
type SocketFuture: IntoFuture<Item = Self::Socket, Error = IoError>;
|
||||
|
||||
/// Initiate a socket between you and the peer on the given protocol.
|
||||
fn make_socket(&self, proto: ProtocolId) -> Self::SocketFuture;
|
||||
}
|
||||
|
||||
/// A transport is a stream producing incoming connections.
|
||||
/// These are transports or wrappers around them.
|
||||
pub trait Transport {
|
||||
/// The raw connection.
|
||||
type RawConn: AsyncRead + AsyncWrite;
|
||||
|
||||
/// The listener produces incoming connections.
|
||||
type Listener: Stream<Item = Self::RawConn>;
|
||||
|
||||
/// A future which indicates currently dialing to a peer.
|
||||
type Dial: IntoFuture<Item = Self::RawConn, Error = IoError>;
|
||||
|
||||
/// Listen on the given multi-addr.
|
||||
/// Returns the address back if it isn't supported.
|
||||
fn listen_on(&mut self, addr: Multiaddr) -> Result<Self::Listener, Multiaddr>;
|
||||
|
||||
/// Dial to the given multi-addr.
|
||||
/// Returns either a future which may resolve to a connection,
|
||||
/// or gives back the multiaddress.
|
||||
fn dial(&mut self, addr: Multiaddr) -> Result<Self::Dial, Multiaddr>;
|
||||
}
|
Reference in New Issue
Block a user