Merge remote-tracking branch 'upstream/master' into muxer-trait

This commit is contained in:
Pierre Krieger
2017-12-04 11:38:38 +01:00
8 changed files with 270 additions and 39 deletions

26
example/README.md Normal file
View File

@@ -0,0 +1,26 @@
# Examples
Running one of the examples:
```sh
cargo run --example <name-of-the-example>
```
The follow examples exist:
- `echo-dialer` will attempt to connect to `/ip4/127.0.0.1/tcp/10333`, negotiate the `/echo/1.0.0`
protocol, then send the `"hello world"` message. Compatible with the `echo-server` example.
- `echo-server` will listen on `/ip4/0.0.0.0/tcp/10333`, negotiate the `/echo/1.0.0` protocol with
each incoming connection, then send back any entering message. Compatible with the `echo-dialer`
example.
## How the keys were generated
The keys used in the examples were generated like this:
```sh
openssl genrsa -out private.pem 2048
openssl rsa -in private.pem -outform DER -pubout -out public.der
openssl pkcs8 -in private.pem -topk8 -nocrypt -out private.pk8
rm private.pem # optional
```

View File

@@ -65,7 +65,7 @@ fn main() {
let future = with_echo.listen_on(swarm::multiaddr::Multiaddr::new("/ip4/0.0.0.0/tcp/10333").unwrap())
.map_err(|_| panic!())
.unwrap()
.unwrap().0
.for_each(|(socket, _)| {
loop_fn(socket, |socket| {
socket.into_future()

View File

@@ -1 +0,0 @@
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAw3Dq8sdKZ/Lq0AkCFRB+ywQXpubvHgscR+WyVV4tdQE+0OcJSC5hx5W+XLR/y21PTe/30f0oYP7oJv8rH2Mov1Gvops2l6efVqPA8ZggDRrAkotjLXXJggDimIGichRS9+izNi/Lit77H2bFLmlkTfrFOjibWrPP+XvoYRFN3B1gyUT5P1hARePlbb86dcd1e5l/x/lBDH7DJ+TxsY7li6HjgvlxK4jAXa9yzdkDvJOpScs+la7gGawwesDKoQ5dWyqlgT93cbXhwOHTUvownl0hwtYjiK9UGWW8ptn9/3ehYAyi6Kx/SqLJsXiJFlPg16KNunGBHL7VAFyYZ51NEwIDAQAB

View File

@@ -88,9 +88,9 @@ impl<T, C> Transport for ConnectionReuse<T, C>
type Listener = ConnectionReuseListener<Box<Stream<Item = (C::Output, Multiaddr), Error = IoError>>, C::Output>;
type Dial = Box<Future<Item = Self::RawConn, Error = IoError>>;
fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, (Self, Multiaddr)> {
let listener = match self.inner.listen_on(addr.clone()) {
Ok(l) => l,
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
let (listener, new_addr) = match self.inner.listen_on(addr.clone()) {
Ok((l, a)) => (l, a),
Err((inner, addr)) => {
return Err((ConnectionReuse {
inner: inner,
@@ -98,10 +98,12 @@ impl<T, C> Transport for ConnectionReuse<T, C>
}
};
Ok(ConnectionReuseListener {
let listener = ConnectionReuseListener {
listener: listener.fuse(),
connections: Vec::new(),
})
};
Ok((listener, new_addr))
}
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {

View File

@@ -58,10 +58,16 @@ pub trait Transport {
/// A future which indicates that we are currently dialing to a peer.
type Dial: IntoFuture<Item = Self::RawConn, Error = IoError>;
/// Listen on the given multi-addr.
/// Listen on the given multiaddr. Returns a stream of incoming connections, plus a modified
/// version of the `Multiaddr`. This new `Multiaddr` is the one that that should be advertised
/// to other nodes, instead of the one passed as parameter.
///
/// Returns the address back if it isn't supported.
fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, (Self, Multiaddr)>
///
/// > **Note**: The reason why we need to change the `Multiaddr` on success is to handle
/// > situations such as turning `/ip4/127.0.0.1/tcp/0` into
/// > `/ip4/127.0.0.1/tcp/<actual port>`.
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)>
where Self: Sized;
/// Dial to the given multi-addr.
@@ -108,7 +114,7 @@ impl Transport for DeniedTransport {
type Dial = Box<Future<Item = Self::RawConn, Error = IoError>>;
#[inline]
fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, (Self, Multiaddr)> {
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
Err((DeniedTransport, addr))
}
@@ -133,14 +139,14 @@ impl<A, B> Transport for OrTransport<A, B>
<B::Dial as IntoFuture>::Future,
>;
fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, (Self, Multiaddr)> {
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
let (first, addr) = match self.0.listen_on(addr) {
Ok(connec) => return Ok(EitherStream::First(connec)),
Ok((connec, addr)) => return Ok((EitherStream::First(connec), addr)),
Err(err) => err,
};
match self.1.listen_on(addr) {
Ok(connec) => Ok(EitherStream::Second(connec)),
Ok((connec, addr)) => Ok((EitherStream::Second(connec), addr)),
Err((second, addr)) => Err((OrTransport(first, second), addr)),
}
}
@@ -543,14 +549,14 @@ impl<'a, T, C> UpgradedNode<T, C>
pub fn listen_on(
self,
addr: Multiaddr,
) -> Result<Box<Stream<Item = (C::Output, Multiaddr), Error = IoError> + 'a>, (Self, Multiaddr)>
) -> Result<(Box<Stream<Item = (C::Output, Multiaddr), Error = IoError> + 'a>, Multiaddr), (Self, Multiaddr)>
where C::NamesIter: Clone, // TODO: not elegant
C: Clone
{
let upgrade = self.upgrade;
let listening_stream = match self.transports.listen_on(addr) {
Ok(l) => l,
let (listening_stream, new_addr) = match self.transports.listen_on(addr) {
Ok((l, new_addr)) => (l, new_addr),
Err((trans, addr)) => {
let builder = UpgradedNode {
transports: trans,
@@ -580,7 +586,7 @@ impl<'a, T, C> UpgradedNode<T, C>
upgrade.upgrade(connection, upgrade_id).map(|c| (c, client_addr))
});
Ok(Box::new(stream))
Ok((Box::new(stream), new_addr))
}
}
@@ -596,7 +602,7 @@ impl<T, C> Transport for UpgradedNode<T, C>
type Dial = Box<Future<Item = C::Output, Error = IoError>>;
#[inline]
fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, (Self, Multiaddr)>
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)>
where Self: Sized
{
self.listen_on(addr)

View File

@@ -62,12 +62,21 @@ impl Transport for Tcp {
/// Listen on the given multi-addr.
/// Returns the address back if it isn't supported.
fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, (Self, Multiaddr)> {
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
if let Ok(socket_addr) = multiaddr_to_socketaddr(&addr) {
Ok(Box::new(
futures::future::result(
TcpListener::bind(&socket_addr, &self.event_loop),
).map(|listener| {
let listener = TcpListener::bind(&socket_addr, &self.event_loop);
// We need to build the `Multiaddr` to return from this function. If an error happened,
// just return the original multiaddr.
let new_addr = match listener {
Ok(ref l) => if let Ok(new_s_addr) = l.local_addr() {
new_s_addr.to_multiaddr().expect("multiaddr generated from socket addr is \
always valid")
} else {
addr
}
Err(_) => addr,
};
let future = futures::future::result(listener).map(|listener| {
// Pull out a stream of sockets for incoming connections
listener.incoming().map(|(sock, addr)| {
let addr = addr.to_multiaddr()
@@ -75,8 +84,8 @@ impl Transport for Tcp {
(sock, addr)
})
})
.flatten_stream(),
))
.flatten_stream();
Ok((Box::new(future), new_addr))
} else {
Err((self, addr))
}
@@ -95,7 +104,7 @@ impl Transport for Tcp {
}
// This type of logic should probably be moved into the multiaddr package
fn multiaddr_to_socketaddr(addr: &Multiaddr) -> Result<SocketAddr, &Multiaddr> {
fn multiaddr_to_socketaddr(addr: &Multiaddr) -> Result<SocketAddr, ()> {
let protocols = addr.protocol();
// TODO: This is nonconforming (since a multiaddr could specify TCP first) but we can't fix that
@@ -118,9 +127,9 @@ fn multiaddr_to_socketaddr(addr: &Multiaddr) -> Result<SocketAddr, &Multiaddr> {
));
}
}
Err(addr)
Err(())
}
_ => Err(addr),
_ => Err(()),
}
}
@@ -192,7 +201,7 @@ mod tests {
let addr = Multiaddr::new("/ip4/127.0.0.1/tcp/12345").unwrap();
let tcp = Tcp::new(core.handle()).unwrap();
let handle = core.handle();
let listener = tcp.listen_on(addr).unwrap().for_each(|(sock, _)| {
let listener = tcp.listen_on(addr).unwrap().0.for_each(|(sock, _)| {
// Define what to do with the socket that just connected to us
// Which in this case is read 3 bytes
let handle_conn = tokio_io::io::read_exact(sock, [0; 3])
@@ -225,4 +234,16 @@ mod tests {
core.run(action).unwrap();
std::thread::sleep(std::time::Duration::from_millis(100));
}
#[test]
fn replace_port_0_in_returned_multiaddr() {
let core = Core::new().unwrap();
let tcp = Tcp::new(core.handle()).unwrap();
let addr = Multiaddr::new("/ip4/127.0.0.1/tcp/0").unwrap();
assert!(addr.to_string().contains("tcp/0"));
let (_, new_addr) = tcp.listen_on(addr).unwrap();
assert!(!new_addr.to_string().contains("tcp/0"));
}
}

View File

@@ -0,0 +1,92 @@
# Multistream-select
This crate implements the `multistream-select` protocol, which is the protocol used by libp2p
to negotiate which protocol to use with the remote.
> **Note**: This crate is used by the internals of *libp2p*, and it is not required to
> understand it in order to use *libp2p*.
Whenever a new connection or a new multiplexed substream is opened, libp2p uses
`multistream-select` to negotiate with the remote which protocol to use. After a protocol has
been successfully negotiated, the stream (ie. the connection or the multiplexed substream)
immediately stops using `multistream-select` and starts using the negotiated protocol.
## Protocol explanation
The dialer has two options available: either request the list of protocols that the listener
supports, or suggest a protocol. If a protocol is suggested, the listener can either accept (by
answering with the same protocol name) or refuse the choice (by answering "not available").
## Examples
For a dialer:
```rust
extern crate bytes;
extern crate futures;
extern crate multistream_select;
extern crate tokio_core;
use bytes::Bytes;
use multistream_select::dialer_select_proto;
use futures::{Future, Sink, Stream};
use tokio_core::net::TcpStream;
use tokio_core::reactor::Core;
let mut core = Core::new().unwrap();
#[derive(Debug, Copy, Clone)]
enum MyProto { Echo, Hello }
let client = TcpStream::connect(&"127.0.0.1:10333".parse().unwrap(), &core.handle())
.from_err()
.and_then(move |connec| {
let protos = vec![
(Bytes::from("/echo/1.0.0"), <Bytes as PartialEq>::eq, MyProto::Echo),
(Bytes::from("/hello/2.5.0"), <Bytes as PartialEq>::eq, MyProto::Hello),
]
.into_iter();
dialer_select_proto(connec, protos).map(|r| r.0)
});
let negotiated_protocol: MyProto = core.run(client).expect("failed to find a protocol");
println!("negotiated: {:?}", negotiated_protocol);
```
For a listener:
```rust
extern crate bytes;
extern crate futures;
extern crate multistream_select;
extern crate tokio_core;
use bytes::Bytes;
use multistream_select::listener_select_proto;
use futures::{Future, Sink, Stream};
use tokio_core::net::TcpListener;
use tokio_core::reactor::Core;
let mut core = Core::new().unwrap();
#[derive(Debug, Copy, Clone)]
enum MyProto { Echo, Hello }
let server = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &core.handle()).unwrap()
.incoming()
.from_err()
.and_then(move |(connec, _)| {
let protos = vec![
(Bytes::from("/echo/1.0.0"), <Bytes as PartialEq>::eq, MyProto::Echo),
(Bytes::from("/hello/2.5.0"), <Bytes as PartialEq>::eq, MyProto::Hello),
]
.into_iter();
listener_select_proto(connec, protos)
})
.for_each(|(proto, _connec)| {
println!("new remote with {:?} negotiated", proto);
Ok(())
});
core.run(server).expect("failed to run server");
```

View File

@@ -18,20 +18,105 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
// TODO: use this once stable ; for now we just copy-paste the content of the README.md
//#![doc(include = "../README.md")]
//! # Multistream-select
//!
//! Multistream-select is the "main" protocol of libp2p.
//! Whenever a connection opens between two peers, it starts talking in `multistream-select`.
//! This crate implements the `multistream-select` protocol, which is the protocol used by libp2p
//! to negotiate which protocol to use with the remote.
//!
//! The purpose of `multistream-select` is to choose which protocol we are going to use. As soon as
//! both sides agree on a given protocol, the socket immediately starts using it and multistream is
//! no longer relevant.
//! > **Note**: This crate is used by the internals of *libp2p*, and it is not required to
//! > understand it in order to use *libp2p*.
//!
//! However note that `multistream-select` is also sometimes used on top of another protocol such
//! as secio or multiplex. For example, two hosts can use `multistream-select` to decide to use
//! secio, then use `multistream-select` again (wrapped inside `secio`) to decide to use
//! `multiplex`, then use `multistream-select` one more time (wrapped inside `secio` and
//! `multiplex`) to decide to use the final actual protocol.
//! Whenever a new connection or a new multiplexed substream is opened, libp2p uses
//! `multistream-select` to negotiate with the remote which protocol to use. After a protocol has
//! been successfully negotiated, the stream (ie. the connection or the multiplexed substream)
//! immediately stops using `multistream-select` and starts using the negotiated protocol.
//!
//! ## Protocol explanation
//!
//! The dialer has two options available: either request the list of protocols that the listener
//! supports, or suggest a protocol. If a protocol is suggested, the listener can either accept (by
//! answering with the same protocol name) or refuse the choice (by answering "not available").
//!
//! ## Examples
//!
//! For a dialer:
//!
//! ```no_run
//! extern crate bytes;
//! extern crate futures;
//! extern crate multistream_select;
//! extern crate tokio_core;
//!
//! # fn main() {
//! use bytes::Bytes;
//! use multistream_select::dialer_select_proto;
//! use futures::{Future, Sink, Stream};
//! use tokio_core::net::TcpStream;
//! use tokio_core::reactor::Core;
//!
//! let mut core = Core::new().unwrap();
//!
//! #[derive(Debug, Copy, Clone)]
//! enum MyProto { Echo, Hello }
//!
//! let client = TcpStream::connect(&"127.0.0.1:10333".parse().unwrap(), &core.handle())
//! .from_err()
//! .and_then(move |connec| {
//! let protos = vec![
//! (Bytes::from("/echo/1.0.0"), <Bytes as PartialEq>::eq, MyProto::Echo),
//! (Bytes::from("/hello/2.5.0"), <Bytes as PartialEq>::eq, MyProto::Hello),
//! ]
//! .into_iter();
//! dialer_select_proto(connec, protos).map(|r| r.0)
//! });
//!
//! let negotiated_protocol: MyProto = core.run(client).expect("failed to find a protocol");
//! println!("negotiated: {:?}", negotiated_protocol);
//! # }
//! ```
//!
//! For a listener:
//!
//! ```no_run
//! extern crate bytes;
//! extern crate futures;
//! extern crate multistream_select;
//! extern crate tokio_core;
//!
//! # fn main() {
//! use bytes::Bytes;
//! use multistream_select::listener_select_proto;
//! use futures::{Future, Sink, Stream};
//! use tokio_core::net::TcpListener;
//! use tokio_core::reactor::Core;
//!
//! let mut core = Core::new().unwrap();
//!
//! #[derive(Debug, Copy, Clone)]
//! enum MyProto { Echo, Hello }
//!
//! let server = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &core.handle()).unwrap()
//! .incoming()
//! .from_err()
//! .and_then(move |(connec, _)| {
//! let protos = vec![
//! (Bytes::from("/echo/1.0.0"), <Bytes as PartialEq>::eq, MyProto::Echo),
//! (Bytes::from("/hello/2.5.0"), <Bytes as PartialEq>::eq, MyProto::Hello),
//! ]
//! .into_iter();
//! listener_select_proto(connec, protos)
//! })
//! .for_each(|(proto, _connec)| {
//! println!("new remote with {:?} negotiated", proto);
//! Ok(())
//! });
//!
//! core.run(server).expect("failed to run server");
//! # }
//! ```
extern crate bytes;
extern crate futures;