From 2339bfd4d30668d8cbba3b8d32f0305cd03be8cb Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Wed, 3 Jan 2018 14:23:03 +0100 Subject: [PATCH 01/12] Fix the documentation libp2p-swarm --- libp2p-swarm/src/lib.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/libp2p-swarm/src/lib.rs b/libp2p-swarm/src/lib.rs index 31e2fc66..59e1fb6d 100644 --- a/libp2p-swarm/src/lib.rs +++ b/libp2p-swarm/src/lib.rs @@ -48,11 +48,12 @@ //! The `MuxedTransport` trait is an extension to the `Transport` trait, and is implemented on //! transports that can receive incoming connections on streams that have been opened with `dial()`. //! -//! The trait provides the `dial_and_listen()` method, which returns both a dialer and a stream of -//! incoming connections. +//! The trait provides the `next_incoming()` method, which returns a future that will resolve to +//! the next substream that arrives from a dialed node. //! //! > **Note**: This trait is mainly implemented for transports that provide stream muxing -//! > capabilities. +//! > capabilities, but it can also be implemented in a dummy way by returning an empty +//! > iterator. //! //! # Connection upgrades //! @@ -78,7 +79,7 @@ //! `Transport` trait. The return value of this method also implements the `Transport` trait, which //! means that you can call `dial()` and `listen_on()` on it in order to directly obtain an //! upgraded connection or a listener that will yield upgraded connections. Similarly, the -//! `dial_and_listen()` method will automatically apply the upgrade on both the dialer and the +//! `next_incoming()` method will automatically apply the upgrade on both the dialer and the //! listener. An error is produced if the remote doesn't support the protocol corresponding to the //! connection upgrade. //! @@ -123,7 +124,7 @@ //! transport. //! //! However the `UpgradedNode` struct returned by `with_upgrade` still provides methods named -//! `dial`, `listen_on`, and `dial_and_listen`, which will yield you a `Future` or a `Stream`, +//! `dial`, `listen_on`, and `next_incoming`, which will yield you a `Future` or a `Stream`, //! which you can use to obtain the `Output`. This `Output` can then be used in a protocol-specific //! way to use the protocol. //! From 641f8a09d7e4c4d5a3f6fadbbf4b8a0f99e6e462 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Wed, 3 Jan 2018 14:19:24 +0100 Subject: [PATCH 02/12] Implement swarm --- example/examples/echo-server.rs | 97 ++++++-------- libp2p-swarm/README.md | 57 ++++++-- libp2p-swarm/src/lib.rs | 45 ++++++- libp2p-swarm/src/swarm.rs | 226 ++++++++++++++++++++++++++++++++ libp2p-swarm/src/transport.rs | 59 +++++++++ 5 files changed, 416 insertions(+), 68 deletions(-) create mode 100644 libp2p-swarm/src/swarm.rs diff --git a/example/examples/echo-server.rs b/example/examples/echo-server.rs index 4334bef0..e32f4537 100644 --- a/example/examples/echo-server.rs +++ b/example/examples/echo-server.rs @@ -78,70 +78,55 @@ fn main() { // successfully negotiated. The parameter is the raw socket (implements the AsyncRead // and AsyncWrite traits), and the closure must return an implementation of // `IntoFuture` that can yield any type of object. - Ok(length_delimited::Framed::new(socket)) + Ok(length_delimited::Framed::<_, bytes::BytesMut>::new(socket)) })); // We now have a `transport` variable that can be used either to dial nodes or listen to // incoming connections, and that will automatically apply all the selected protocols on top // of any opened stream. - // We use it to listen on the address. - let (listener, address) = transport + // Let's put this `transport` into a *swarm*. The swarm will handle all the incoming and + // outgoing connections for us. + let (swarm_controller, swarm_future) = swarm::swarm(transport, |socket, client_addr| { + println!("Successfully negotiated protocol with {}", client_addr); + + // We loop forever in order to handle all the messages sent by the client. + loop_fn(socket, move |socket| { + let client_addr = client_addr.clone(); + socket + .into_future() + .map_err(|(e, _)| e) + .and_then(move |(msg, rest)| { + if let Some(msg) = msg { + // One message has been received. We send it back to the client. + println!("Received a message from {}: {:?}\n => Sending back \ + identical message to remote", client_addr, msg); + Box::new(rest.send(msg).map(|m| Loop::Continue(m))) + as Box> + } else { + // End of stream. Connection closed. Breaking the loop. + println!("Received EOF from {}\n => Dropping connection", + client_addr); + Box::new(Ok(Loop::Break(())).into_future()) + as Box> + } + }) + }) + }); + + // We now use the controller to listen on the address. + let address = swarm_controller .listen_on(swarm::Multiaddr::new(&listen_addr).expect("invalid multiaddr")) // If the multiaddr protocol exists but is not supported, then we get an error containing - // the transport and the original multiaddress. Therefore we cannot directly use `unwrap()` - // or `expect()`, but have to add a `map_err()` beforehand. - .map_err(|(_, addr)| addr).expect("unsupported multiaddr"); + // the original multiaddress. + .expect("unsupported multiaddr"); + // The address we actually listen on can be different from the address that was passed to + // the `listen_on` function. For example if you pass `/ip4/0.0.0.0/tcp/0`, then the port `0` + // will be replaced with the actual port. println!("Now listening on {:?}", address); - let future = listener - .for_each(|(socket, client_addr)| { - // This closure is called whenever a new connection has been received. - // `socket` is a future that will be triggered once the upgrade to secio, multiplex - // and echo is complete. - let client_addr = client_addr.to_string(); - println!("Incoming connection from {}", client_addr); - - socket - .and_then(move |socket| { - println!("Successfully negotiated protocol with {}", client_addr); - - // We loop forever in order to handle all the messages sent by the client. - loop_fn(socket, move |socket| { - let client_addr = client_addr.clone(); - socket.into_future() - .map_err(|(err, _)| err) - .and_then(move |(msg, rest)| { - if let Some(msg) = msg { - // One message has been received. We send it back to the client. - println!("Received a message from {}: {:?}\n => Sending back \ - identical message to remote", client_addr, msg); - Box::new(rest.send(msg).map(|m| Loop::Continue(m))) - as Box> - } else { - // End of stream. Connection closed. Breaking the loop. - println!("Received EOF from {}\n => Dropping connection", - client_addr); - Box::new(Ok(Loop::Break(())).into_future()) - as Box> - } - }) - }) - }) - - // We absorb errors from the future so that an error while processing a client - // (eg. if the client unexpectedly disconnects) doesn't propagate and stop the - // entire server. - .then(move |res| { - if let Err(err) = res { - println!("Error while processing client: {:?}", err); - } - Ok(()) - }) - }); - - // `future` is a future that contains all the behaviour that we want, but nothing has actually - // started yet. Because we created the `TcpConfig` with tokio, we need to run the future - // through the tokio core. - core.run(future).unwrap(); + // `swarm_future` is a future that contains all the behaviour that we want, but nothing has + // actually started yet. Because we created the `TcpConfig` with tokio, we need to run the + // future through the tokio core. + core.run(swarm_future).unwrap(); } diff --git a/libp2p-swarm/README.md b/libp2p-swarm/README.md index 1c6f02fc..bfe02a29 100644 --- a/libp2p-swarm/README.md +++ b/libp2p-swarm/README.md @@ -1,8 +1,9 @@ # libp2p-swarm -Transport and protocol upgrade system of *libp2p*. +Transport, protocol upgrade and swarm systems of *libp2p*. -This crate contains all the core traits and mechanisms of the transport system of *libp2p*. +This crate contains all the core traits and mechanisms of the transport and swarm systems +of *libp2p*. # The `Transport` trait @@ -27,11 +28,12 @@ multiple times in a row in order to chain as many implementations as you want. The `MuxedTransport` trait is an extension to the `Transport` trait, and is implemented on transports that can receive incoming connections on streams that have been opened with `dial()`. -The trait provides the `dial_and_listen()` method, which returns both a dialer and a stream of -incoming connections. +The trait provides the `next_incoming()` method, which returns a future that will resolve to +the next substream that arrives from a dialed node. > **Note**: This trait is mainly implemented for transports that provide stream muxing -> capabilities. +> capabilities, but it can also be implemented in a dummy way by returning an empty +> iterator. # Connection upgrades @@ -57,7 +59,7 @@ A middleware can be applied on a transport by using the `with_upgrade` method of `Transport` trait. The return value of this method also implements the `Transport` trait, which means that you can call `dial()` and `listen_on()` on it in order to directly obtain an upgraded connection or a listener that will yield upgraded connections. Similarly, the -`dial_and_listen()` method will automatically apply the upgrade on both the dialer and the +`next_incoming()` method will automatically apply the upgrade on both the dialer and the listener. An error is produced if the remote doesn't support the protocol corresponding to the connection upgrade. @@ -100,11 +102,11 @@ implement the `AsyncRead` and `AsyncWrite` traits. This means that that the retu transport. However the `UpgradedNode` struct returned by `with_upgrade` still provides methods named -`dial`, `listen_on`, and `dial_and_listen`, which will yield you a `Future` or a `Stream`, +`dial`, `listen_on`, and `next_incoming`, which will yield you a `Future` or a `Stream`, which you can use to obtain the `Output`. This `Output` can then be used in a protocol-specific way to use the protocol. -```no_run +```rust extern crate futures; extern crate libp2p_ping; extern crate libp2p_swarm; @@ -115,7 +117,6 @@ use futures::Future; use libp2p_ping::Ping; use libp2p_swarm::Transport; -# fn main() { let mut core = tokio_core::reactor::Core::new().unwrap(); let ping_finished_future = libp2p_tcp_transport::TcpConfig::new(core.handle()) @@ -130,7 +131,6 @@ let ping_finished_future = libp2p_tcp_transport::TcpConfig::new(core.handle()) // Runs until the ping arrives. core.run(ping_finished_future).unwrap(); -# } ``` ## Grouping protocols @@ -138,3 +138,40 @@ core.run(ping_finished_future).unwrap(); You can use the `.or_upgrade()` method to group multiple upgrades together. The return value also implements the `ConnectionUpgrade` trait and will choose one of the protocols amongst the ones supported. + +# Swarm + +Once you have created an object that implements the `Transport` trait, you can put it in a +*swarm*. This is done by calling the `swarm()` freestanding function with the transport +alongside with a function or a closure that will turn the output of the upgrade (usually an +actual protocol, as explained above) into a `Future` producing `()`. + +```rust +extern crate futures; +extern crate libp2p_ping; +extern crate libp2p_swarm; +extern crate libp2p_tcp_transport; +extern crate tokio_core; + +use futures::Future; +use libp2p_ping::Ping; +use libp2p_swarm::Transport; + +let mut core = tokio_core::reactor::Core::new().unwrap(); + +let transport = libp2p_tcp_transport::TcpConfig::new(core.handle()) + .with_dummy_muxing() + .with_upgrade(Ping); + +let (swarm_controller, swarm_future) = libp2p_swarm::swarm(transport, |(mut pinger, service), client_addr| { + pinger.ping().map_err(|_| panic!()) + .select(service).map_err(|_| panic!()) + .map(|_| ()) +}); + +// The `swarm_controller` can then be used to do some operations. +swarm_controller.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap()); + +// Runs until everything is finished. +core.run(swarm_future).unwrap(); +``` diff --git a/libp2p-swarm/src/lib.rs b/libp2p-swarm/src/lib.rs index 59e1fb6d..e96e77ea 100644 --- a/libp2p-swarm/src/lib.rs +++ b/libp2p-swarm/src/lib.rs @@ -21,9 +21,10 @@ // TODO: use this once stable ; for now we just copy-paste the content of the README.md //#![doc(include = "../README.md")] -//! Transport and protocol upgrade system of *libp2p*. +//! Transport, protocol upgrade and swarm systems of *libp2p*. //! -//! This crate contains all the core traits and mechanisms of the transport system of *libp2p*. +//! This crate contains all the core traits and mechanisms of the transport and swarm systems +//! of *libp2p*. //! //! # The `Transport` trait //! @@ -163,6 +164,44 @@ //! also implements the `ConnectionUpgrade` trait and will choose one of the protocols amongst the //! ones supported. //! +//! # Swarm +//! +//! Once you have created an object that implements the `Transport` trait, you can put it in a +//! *swarm*. This is done by calling the `swarm()` freestanding function with the transport +//! alongside with a function or a closure that will turn the output of the upgrade (usually an +//! actual protocol, as explained above) into a `Future` producing `()`. +//! +//! ```no_run +//! extern crate futures; +//! extern crate libp2p_ping; +//! extern crate libp2p_swarm; +//! extern crate libp2p_tcp_transport; +//! extern crate tokio_core; +//! +//! use futures::Future; +//! use libp2p_ping::Ping; +//! use libp2p_swarm::Transport; +//! +//! # fn main() { +//! let mut core = tokio_core::reactor::Core::new().unwrap(); +//! +//! let transport = libp2p_tcp_transport::TcpConfig::new(core.handle()) +//! .with_dummy_muxing() +//! .with_upgrade(Ping); +//! +//! let (swarm_controller, swarm_future) = libp2p_swarm::swarm(transport, |(mut pinger, service), client_addr| { +//! pinger.ping().map_err(|_| panic!()) +//! .select(service).map_err(|_| panic!()) +//! .map(|_| ()) +//! }); +//! +//! // The `swarm_controller` can then be used to do some operations. +//! swarm_controller.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap()); +//! +//! // Runs until everything is finished. +//! core.run(swarm_future).unwrap(); +//! # } +//! ``` extern crate bytes; #[macro_use] @@ -176,11 +215,13 @@ extern crate tokio_io; pub extern crate multiaddr; mod connection_reuse; +pub mod swarm; pub mod muxing; pub mod transport; pub use self::connection_reuse::ConnectionReuse; pub use self::multiaddr::Multiaddr; pub use self::muxing::StreamMuxer; +pub use self::swarm::{swarm, SwarmController, SwarmFuture}; pub use self::transport::{ConnectionUpgrade, PlainTextConfig, Transport, UpgradedNode, OrUpgrade}; pub use self::transport::{Endpoint, SimpleProtocol, MuxedTransport, UpgradeExt}; diff --git a/libp2p-swarm/src/swarm.rs b/libp2p-swarm/src/swarm.rs new file mode 100644 index 00000000..95d9bb34 --- /dev/null +++ b/libp2p-swarm/src/swarm.rs @@ -0,0 +1,226 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use std::io::Error as IoError; +use futures::{IntoFuture, Future, Stream, Async, Poll}; +use futures::sync::mpsc; +use {ConnectionUpgrade, Multiaddr, MuxedTransport, UpgradedNode}; + +/// Creates a swarm. +/// +/// Requires an upgraded transport, and a function or closure that will turn the upgrade into a +/// `Future` that produces a `()`. +/// +/// Produces a `SwarmController` and an implementation of `Future`. The controller can be used to +/// control, and the `Future` must be driven to completion in order for things to work. +/// +pub fn swarm(upgraded: UpgradedNode, handler: H) + -> (SwarmController, SwarmFuture) + where T: MuxedTransport + Clone + 'static, // TODO: 'static :-/ + C: ConnectionUpgrade + Clone + 'static, // TODO: 'static :-/ + H: FnMut(C::Output, Multiaddr) -> F, + F: IntoFuture, +{ + let (new_dialers_tx, new_dialers_rx) = mpsc::unbounded(); + let (new_listeners_tx, new_listeners_rx) = mpsc::unbounded(); + + let future = SwarmFuture { + upgraded: upgraded.clone(), + handler: handler, + new_listeners: new_listeners_rx, + next_incoming: upgraded.clone().next_incoming(), + listeners: Vec::new(), + listeners_upgrade: Vec::new(), + dialers: Vec::new(), + new_dialers: new_dialers_rx, + to_process: Vec::new(), + }; + + let controller = SwarmController { + upgraded: upgraded, + new_listeners: new_listeners_tx, + new_dialers: new_dialers_tx, + }; + + (controller, future) +} + +/// Allows control of what the swarm is doing. +pub struct SwarmController + where T: MuxedTransport + 'static, // TODO: 'static :-/ + C: ConnectionUpgrade + 'static, // TODO: 'static :-/ +{ + upgraded: UpgradedNode, + new_listeners: mpsc::UnboundedSender>, Multiaddr), Error = IoError>>>, + new_dialers: mpsc::UnboundedSender<(Box>, Multiaddr)>, +} + +impl SwarmController + where T: MuxedTransport + Clone + 'static, // TODO: 'static :-/ + C: ConnectionUpgrade + Clone + 'static, // TODO: 'static :-/ + C::NamesIter: Clone, // TODO: not elegant +{ + /// Asks the swarm to dial the node with the given multiaddress. + /// + /// Once the connection has been open and upgraded, it will be given to the handler. + // TODO: consider returning a future so that errors can be processed? + pub fn dial(&self, multiaddr: Multiaddr) -> Result<(), Multiaddr> { + match self.upgraded.clone().dial(multiaddr.clone()) { + Ok(dial) => { + // Ignoring errors if the receiver has been closed, because in that situation + // nothing is going to be processed anyway. + let _ = self.new_dialers.unbounded_send((dial, multiaddr)); + Ok(()) + }, + Err((_, multiaddr)) => { + Err(multiaddr) + }, + } + } + + /// Adds a multiaddr to listen on. + pub fn listen_on(&self, multiaddr: Multiaddr) -> Result { + match self.upgraded.clone().listen_on(multiaddr) { + Ok((listener, new_addr)) => { + // Ignoring errors if the receiver has been closed, because in that situation + // nothing is going to be processed anyway. + let _ = self.new_listeners.unbounded_send(listener); + Ok(new_addr) + }, + Err((_, multiaddr)) => { + Err(multiaddr) + }, + } + } +} + +/// Future that must be driven to completion in order for the swarm to work. +pub struct SwarmFuture + where T: MuxedTransport + 'static, // TODO: 'static :-/ + C: ConnectionUpgrade + 'static, // TODO: 'static :-/ +{ + upgraded: UpgradedNode, + handler: H, + new_listeners: mpsc::UnboundedReceiver>, Multiaddr), Error = IoError>>>, + next_incoming: Box>, + listeners: Vec>, Multiaddr), Error = IoError>>>, + listeners_upgrade: Vec<(Box>, Multiaddr)>, + dialers: Vec<(Box>, Multiaddr)>, + new_dialers: mpsc::UnboundedReceiver<(Box>, Multiaddr)>, + to_process: Vec, +} + +impl Future for SwarmFuture + where T: MuxedTransport + Clone + 'static, // TODO: 'static :-/, + C: ConnectionUpgrade + Clone + 'static, // TODO: 'static :-/ + H: FnMut(C::Output, Multiaddr) -> If, + If: IntoFuture, + F: Future, +{ + type Item = (); + type Error = IoError; + + fn poll(&mut self) -> Poll { + let handler = &mut self.handler; + + match self.next_incoming.poll() { + Ok(Async::Ready((connec, client_addr))) => { + self.next_incoming = self.upgraded.clone().next_incoming(); + self.to_process.push(handler(connec, client_addr).into_future()); + }, + Ok(Async::NotReady) => {}, + Err(err) => return Err(err), + }; + + match self.new_listeners.poll() { + Ok(Async::Ready(Some(new_listener))) => { + self.listeners.push(new_listener); + }, + Ok(Async::Ready(None)) | Err(_) => { + // New listener sender has been closed. + }, + Ok(Async::NotReady) => {}, + }; + + match self.new_dialers.poll() { + Ok(Async::Ready(Some((new_dialer, multiaddr)))) => { + self.dialers.push((new_dialer, multiaddr)); + }, + Ok(Async::Ready(None)) | Err(_) => { + // New dialers sender has been closed. + }, + Ok(Async::NotReady) => {}, + }; + + for n in (0 .. self.listeners.len()).rev() { + let mut listener = self.listeners.swap_remove(n); + match listener.poll() { + Ok(Async::Ready(Some((upgrade, client_addr)))) => { + self.listeners.push(listener); + self.listeners_upgrade.push((upgrade, client_addr)); + }, + Ok(Async::NotReady) => { + self.listeners.push(listener); + }, + Ok(Async::Ready(None)) => {}, + Err(err) => return Err(err), + }; + } + + for n in (0 .. self.listeners_upgrade.len()).rev() { + let (mut upgrade, addr) = self.listeners_upgrade.swap_remove(n); + match upgrade.poll() { + Ok(Async::Ready(output)) => { + self.to_process.push(handler(output, addr).into_future()); + }, + Ok(Async::NotReady) => { + self.listeners_upgrade.push((upgrade, addr)); + }, + Err(err) => return Err(err), + } + } + + for n in (0 .. self.dialers.len()).rev() { + let (mut dialer, addr) = self.dialers.swap_remove(n); + match dialer.poll() { + Ok(Async::Ready(output)) => { + self.to_process.push(handler(output, addr).into_future()); + }, + Ok(Async::NotReady) => { + self.dialers.push((dialer, addr)); + }, + Err(err) => return Err(err), + } + } + + for n in (0 .. self.to_process.len()).rev() { + let mut to_process = self.to_process.swap_remove(n); + match to_process.poll() { + Ok(Async::Ready(())) => {}, + Ok(Async::NotReady) => self.to_process.push(to_process), + Err(err) => return Err(err), + } + } + + // TODO: we never return `Ok(Ready)` because there's no way to know whether + // `next_incoming()` can produce anything more in the future + Ok(Async::NotReady) + } +} diff --git a/libp2p-swarm/src/transport.rs b/libp2p-swarm/src/transport.rs index 2e959eb8..8099f988 100644 --- a/libp2p-swarm/src/transport.rs +++ b/libp2p-swarm/src/transport.rs @@ -118,6 +118,18 @@ pub trait Transport { upgrade: upgrade, } } + + /// Builds a dummy implementation of `MuxedTransport` that uses this transport. + /// + /// The resulting object will not actually use muxing. This means that dialing the same node + /// twice will result in two different connections instead of two substreams on the same + /// connection. + #[inline] + fn with_dummy_muxing(self) -> DummyMuxing + where Self: Sized + { + DummyMuxing { inner: self } + } } /// Extension trait for `Transport`. Implemented on structs that provide a `Transport` on which @@ -684,6 +696,53 @@ where } } +/// Dummy implementation of `MuxedTransport` that uses an inner `Transport`. +#[derive(Debug, Copy, Clone)] +pub struct DummyMuxing { + inner: T, +} + +impl MuxedTransport for DummyMuxing + where T: Transport +{ + type Incoming = future::Empty<(T::RawConn, Multiaddr), IoError>; + + fn next_incoming(self) -> Self::Incoming + where Self: Sized + { + future::empty() + } +} + +impl Transport for DummyMuxing + where T: Transport +{ + type RawConn = T::RawConn; + type Listener = T::Listener; + type ListenerUpgrade = T::ListenerUpgrade; + type Dial = T::Dial; + + #[inline] + fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> + where + Self: Sized + { + self.inner.listen_on(addr).map_err(|(inner, addr)| { + (DummyMuxing { inner }, addr) + }) + } + + #[inline] + fn dial(self, addr: Multiaddr) -> Result + where + Self: Sized + { + self.inner.dial(addr).map_err(|(inner, addr)| { + (DummyMuxing { inner }, addr) + }) + } +} + /// Implements the `Transport` trait. Dials or listens, then upgrades any dialed or received /// connection. /// From 13ba95e282bd58c96691e795de87696e0e77edb2 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Wed, 3 Jan 2018 15:46:45 +0100 Subject: [PATCH 03/12] Separate between listener, dial_to_handler and dial_custom_handler --- example/examples/echo-server.rs | 29 ++++++------- libp2p-swarm/README.md | 5 +-- libp2p-swarm/src/lib.rs | 5 +-- libp2p-swarm/src/swarm.rs | 74 +++++++++++++++++++++++++++------ 4 files changed, 81 insertions(+), 32 deletions(-) diff --git a/example/examples/echo-server.rs b/example/examples/echo-server.rs index e32f4537..99e7aa62 100644 --- a/example/examples/echo-server.rs +++ b/example/examples/echo-server.rs @@ -68,28 +68,29 @@ fn main() { // `Transport` because the output of the upgrade is not a stream but a controller for // muxing. We have to explicitly call `into_connection_reuse()` in order to turn this into // a `Transport`. - .into_connection_reuse() - - // On top of both mutiplex and plaintext/secio, we use the "echo" protocol, which is a - // custom protocol just for this example. - // For this purpose, we create a `SimpleProtocol` struct. - .with_upgrade(SimpleProtocol::new("/echo/1.0.0", |socket| { - // This closure is called whenever a stream using the "echo" protocol has been - // successfully negotiated. The parameter is the raw socket (implements the AsyncRead - // and AsyncWrite traits), and the closure must return an implementation of - // `IntoFuture` that can yield any type of object. - Ok(length_delimited::Framed::<_, bytes::BytesMut>::new(socket)) - })); + .into_connection_reuse(); // We now have a `transport` variable that can be used either to dial nodes or listen to - // incoming connections, and that will automatically apply all the selected protocols on top + // incoming connections, and that will automatically apply secio and multiplex on top // of any opened stream. + // We now prepare the protocol that we are going to negotiate with nodes that open a connection + // or substream to our server. + let proto = SimpleProtocol::new("/echo/1.0.0", |socket| { + // This closure is called whenever a stream using the "echo" protocol has been + // successfully negotiated. The parameter is the raw socket (implements the AsyncRead + // and AsyncWrite traits), and the closure must return an implementation of + // `IntoFuture` that can yield any type of object. + Ok(length_delimited::Framed::<_, bytes::BytesMut>::new(socket)) + }); + // Let's put this `transport` into a *swarm*. The swarm will handle all the incoming and // outgoing connections for us. - let (swarm_controller, swarm_future) = swarm::swarm(transport, |socket, client_addr| { + let (swarm_controller, swarm_future) = swarm::swarm(transport, proto, |socket, client_addr| { println!("Successfully negotiated protocol with {}", client_addr); + // The type of `socket` is exactly what the closure of `SimpleProtocol` returns. + // We loop forever in order to handle all the messages sent by the client. loop_fn(socket, move |socket| { let client_addr = client_addr.clone(); diff --git a/libp2p-swarm/README.md b/libp2p-swarm/README.md index bfe02a29..cd13dde4 100644 --- a/libp2p-swarm/README.md +++ b/libp2p-swarm/README.md @@ -160,10 +160,9 @@ use libp2p_swarm::Transport; let mut core = tokio_core::reactor::Core::new().unwrap(); let transport = libp2p_tcp_transport::TcpConfig::new(core.handle()) - .with_dummy_muxing() - .with_upgrade(Ping); + .with_dummy_muxing(); -let (swarm_controller, swarm_future) = libp2p_swarm::swarm(transport, |(mut pinger, service), client_addr| { +let (swarm_controller, swarm_future) = libp2p_swarm::swarm(transport, Ping, |(mut pinger, service), client_addr| { pinger.ping().map_err(|_| panic!()) .select(service).map_err(|_| panic!()) .map(|_| ()) diff --git a/libp2p-swarm/src/lib.rs b/libp2p-swarm/src/lib.rs index e96e77ea..b7da8738 100644 --- a/libp2p-swarm/src/lib.rs +++ b/libp2p-swarm/src/lib.rs @@ -186,10 +186,9 @@ //! let mut core = tokio_core::reactor::Core::new().unwrap(); //! //! let transport = libp2p_tcp_transport::TcpConfig::new(core.handle()) -//! .with_dummy_muxing() -//! .with_upgrade(Ping); +//! .with_dummy_muxing(); //! -//! let (swarm_controller, swarm_future) = libp2p_swarm::swarm(transport, |(mut pinger, service), client_addr| { +//! let (swarm_controller, swarm_future) = libp2p_swarm::swarm(transport, Ping, |(mut pinger, service), client_addr| { //! pinger.ping().map_err(|_| panic!()) //! .select(service).map_err(|_| panic!()) //! .map(|_| ()) diff --git a/libp2p-swarm/src/swarm.rs b/libp2p-swarm/src/swarm.rs index 95d9bb34..8d1e27bd 100644 --- a/libp2p-swarm/src/swarm.rs +++ b/libp2p-swarm/src/swarm.rs @@ -19,7 +19,7 @@ // DEALINGS IN THE SOFTWARE. use std::io::Error as IoError; -use futures::{IntoFuture, Future, Stream, Async, Poll}; +use futures::{IntoFuture, Future, Stream, Async, Poll, future}; use futures::sync::mpsc; use {ConnectionUpgrade, Multiaddr, MuxedTransport, UpgradedNode}; @@ -31,7 +31,7 @@ use {ConnectionUpgrade, Multiaddr, MuxedTransport, UpgradedNode}; /// Produces a `SwarmController` and an implementation of `Future`. The controller can be used to /// control, and the `Future` must be driven to completion in order for things to work. /// -pub fn swarm(upgraded: UpgradedNode, handler: H) +pub fn swarm(transport: T, upgrade: C, handler: H) -> (SwarmController, SwarmFuture) where T: MuxedTransport + Clone + 'static, // TODO: 'static :-/ C: ConnectionUpgrade + Clone + 'static, // TODO: 'static :-/ @@ -40,6 +40,9 @@ pub fn swarm(upgraded: UpgradedNode, handler: H) { let (new_dialers_tx, new_dialers_rx) = mpsc::unbounded(); let (new_listeners_tx, new_listeners_rx) = mpsc::unbounded(); + let (new_toprocess_tx, new_toprocess_rx) = mpsc::unbounded(); + + let upgraded = transport.clone().with_upgrade(upgrade); let future = SwarmFuture { upgraded: upgraded.clone(), @@ -51,12 +54,15 @@ pub fn swarm(upgraded: UpgradedNode, handler: H) dialers: Vec::new(), new_dialers: new_dialers_rx, to_process: Vec::new(), + new_toprocess: new_toprocess_rx, }; let controller = SwarmController { + transport: transport, upgraded: upgraded, new_listeners: new_listeners_tx, new_dialers: new_dialers_tx, + new_toprocess: new_toprocess_tx, }; (controller, future) @@ -67,9 +73,11 @@ pub struct SwarmController where T: MuxedTransport + 'static, // TODO: 'static :-/ C: ConnectionUpgrade + 'static, // TODO: 'static :-/ { + transport: T, upgraded: UpgradedNode, new_listeners: mpsc::UnboundedSender>, Multiaddr), Error = IoError>>>, new_dialers: mpsc::UnboundedSender<(Box>, Multiaddr)>, + new_toprocess: mpsc::UnboundedSender>>, } impl SwarmController @@ -77,13 +85,17 @@ impl SwarmController C: ConnectionUpgrade + Clone + 'static, // TODO: 'static :-/ C::NamesIter: Clone, // TODO: not elegant { - /// Asks the swarm to dial the node with the given multiaddress. - /// - /// Once the connection has been open and upgraded, it will be given to the handler. + /// Asks the swarm to dial the node with the given multiaddress. The connection is then + /// upgraded using the `upgrade`, and the output is sent to the handler that was passed when + /// calling `swarm`. // TODO: consider returning a future so that errors can be processed? - pub fn dial(&self, multiaddr: Multiaddr) -> Result<(), Multiaddr> { - match self.upgraded.clone().dial(multiaddr.clone()) { + pub fn dial_to_handler(&self, multiaddr: Multiaddr, upgrade: Du) -> Result<(), Multiaddr> + where Du: ConnectionUpgrade + Clone + 'static, // TODO: 'static :-/ + Du::Output: Into, + { + match self.transport.clone().with_upgrade(upgrade).dial(multiaddr.clone()) { Ok(dial) => { + let dial = Box::new(dial.map(Into::into)) as Box>; // Ignoring errors if the receiver has been closed, because in that situation // nothing is going to be processed anyway. let _ = self.new_dialers.unbounded_send((dial, multiaddr)); @@ -95,7 +107,34 @@ impl SwarmController } } - /// Adds a multiaddr to listen on. + /// Asks the swarm to dial the node with the given multiaddress. The connection is then + /// upgraded using the `upgrade`, and the output is then passed to `and_then`. + /// + /// Contrary to `dial_to_handler`, the output of the upgrade is not given to the handler that + /// was passed at initialization. + // TODO: consider returning a future so that errors can be processed? + pub fn dial_custom_handler(&self, multiaddr: Multiaddr, upgrade: Du, and_then: Df) + -> Result<(), Multiaddr> + where Du: ConnectionUpgrade + 'static, // TODO: 'static :-/ + Df: FnOnce(Du::Output) -> Dfu + 'static, // TODO: 'static :-/ + Dfu: IntoFuture + 'static, // TODO: 'static :-/ + { + match self.transport.clone().with_upgrade(upgrade).dial(multiaddr) { + Ok(dial) => { + let dial = Box::new(dial.and_then(and_then)) as Box<_>; + // Ignoring errors if the receiver has been closed, because in that situation + // nothing is going to be processed anyway. + let _ = self.new_toprocess.unbounded_send(dial); + Ok(()) + }, + Err((_, multiaddr)) => { + Err(multiaddr) + }, + } + } + + /// Adds a multiaddr to listen on. All the incoming connections will use the `upgrade` that + /// was passed to `swarm`. pub fn listen_on(&self, multiaddr: Multiaddr) -> Result { match self.upgraded.clone().listen_on(multiaddr) { Ok((listener, new_addr)) => { @@ -124,7 +163,8 @@ pub struct SwarmFuture listeners_upgrade: Vec<(Box>, Multiaddr)>, dialers: Vec<(Box>, Multiaddr)>, new_dialers: mpsc::UnboundedReceiver<(Box>, Multiaddr)>, - to_process: Vec, + to_process: Vec>>>, + new_toprocess: mpsc::UnboundedReceiver>>, } impl Future for SwarmFuture @@ -143,7 +183,7 @@ impl Future for SwarmFuture match self.next_incoming.poll() { Ok(Async::Ready((connec, client_addr))) => { self.next_incoming = self.upgraded.clone().next_incoming(); - self.to_process.push(handler(connec, client_addr).into_future()); + self.to_process.push(future::Either::A(handler(connec, client_addr).into_future())); }, Ok(Async::NotReady) => {}, Err(err) => return Err(err), @@ -169,6 +209,16 @@ impl Future for SwarmFuture Ok(Async::NotReady) => {}, }; + match self.new_toprocess.poll() { + Ok(Async::Ready(Some(new_toprocess))) => { + self.to_process.push(future::Either::B(new_toprocess)); + }, + Ok(Async::Ready(None)) | Err(_) => { + // New to-process sender has been closed. + }, + Ok(Async::NotReady) => {}, + }; + for n in (0 .. self.listeners.len()).rev() { let mut listener = self.listeners.swap_remove(n); match listener.poll() { @@ -188,7 +238,7 @@ impl Future for SwarmFuture let (mut upgrade, addr) = self.listeners_upgrade.swap_remove(n); match upgrade.poll() { Ok(Async::Ready(output)) => { - self.to_process.push(handler(output, addr).into_future()); + self.to_process.push(future::Either::A(handler(output, addr).into_future())); }, Ok(Async::NotReady) => { self.listeners_upgrade.push((upgrade, addr)); @@ -201,7 +251,7 @@ impl Future for SwarmFuture let (mut dialer, addr) = self.dialers.swap_remove(n); match dialer.poll() { Ok(Async::Ready(output)) => { - self.to_process.push(handler(output, addr).into_future()); + self.to_process.push(future::Either::A(handler(output, addr).into_future())); }, Ok(Async::NotReady) => { self.dialers.push((dialer, addr)); From 4025405e490971cb7600e18f3bc0f299e3353698 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Wed, 3 Jan 2018 17:46:52 +0100 Subject: [PATCH 04/12] Add a README note about the stabilization of impl Trait --- README.md | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/README.md b/README.md index e389a7c8..cab1a6ab 100644 --- a/README.md +++ b/README.md @@ -28,3 +28,17 @@ Architecture of the crates of this repository: upgrade. - `rw-stream-sink`: Utility library that makes it possible to wrap around a tokio `Stream + Sink` of bytes and implements `AsyncRead + AsyncWrite`. + +## About the `impl Trait` syntax + +Right now a lot of code of this library uses `Box` or `Box` objects, or forces +`'static` lifetime bounds. + +This is caused by the lack of a stable `impl Trait` syntax in the Rust language. Once this syntax +is fully implemented and stabilized, it will be possible to change this code to use plain and +non-static objects instead of boxes. + +Progress for the `impl Trait` syntax can be tracked in [this issue of the Rust repository](https://github.com/rust-lang/rust/issues/34511). + +Once this syntax is stable in the nightly version, we will consider requiring the nightly version +of the compiler and switching to this syntax. From c39d0e71456b2eeabebbfe21f7d10c4529dec79a Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Wed, 13 Dec 2017 12:08:40 +0100 Subject: [PATCH 05/12] Implement the identify protocol --- Cargo.toml | 4 + README.md | 2 + libp2p-identify/Cargo.toml | 17 + libp2p-identify/regen_structs_proto.sh | 12 + libp2p-identify/src/lib.rs | 215 ++++++++++ libp2p-identify/src/structs_proto.rs | 550 +++++++++++++++++++++++++ libp2p-identify/structs.proto | 23 ++ 7 files changed, 823 insertions(+) create mode 100644 libp2p-identify/Cargo.toml create mode 100755 libp2p-identify/regen_structs_proto.sh create mode 100644 libp2p-identify/src/lib.rs create mode 100644 libp2p-identify/src/structs_proto.rs create mode 100644 libp2p-identify/structs.proto diff --git a/Cargo.toml b/Cargo.toml index f1fdf589..d1ec4f90 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,9 @@ [workspace] members = [ + "multistream-select", + "datastore", + "example", + "libp2p-identify", "libp2p-peerstore", "libp2p-ping", "libp2p-secio", diff --git a/README.md b/README.md index cab1a6ab..c025f7d7 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,8 @@ Architecture of the crates of this repository: - `datastore`: Utility library whose API provides a key-value storage with multiple possible backends. Used by `peerstore`. - `example`: Example usages of this library. +- `libp2p-identify`: Protocol implementation that allows a node A to query another node B what + information B knows about A. Implements the `ConnectionUpgrade` trait of `libp2p-swarm`. - `libp2p-peerstore`: Generic storage for information about remote peers (their multiaddresses and their public key), with multiple possible backends. Each multiaddress also has a time-to-live. Used by `libp2p-swarm`. diff --git a/libp2p-identify/Cargo.toml b/libp2p-identify/Cargo.toml new file mode 100644 index 00000000..200e2604 --- /dev/null +++ b/libp2p-identify/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "libp2p-identify" +version = "0.1.0" +authors = ["Parity Technologies "] + +[dependencies] +bytes = "0.4" +futures = "0.1" +libp2p-peerstore = { path = "../libp2p-peerstore" } +libp2p-swarm = { path = "../libp2p-swarm" } +multiaddr = "0.2.0" +protobuf = "1.4.2" +tokio-io = "0.1.0" + +[dev-dependencies] +libp2p-tcp-transport = { path = "../libp2p-tcp-transport" } +tokio-core = "0.1.0" diff --git a/libp2p-identify/regen_structs_proto.sh b/libp2p-identify/regen_structs_proto.sh new file mode 100755 index 00000000..7c837cdf --- /dev/null +++ b/libp2p-identify/regen_structs_proto.sh @@ -0,0 +1,12 @@ +#!/bin/sh + +# This script regenerates the `src/structs_proto.rs` and `src/keys_proto.rs` files from +# `structs.proto` and `keys.proto`. + +sudo docker run --rm -v `pwd`:/usr/code:z -w /usr/code rust /bin/bash -c " \ + apt-get update; \ + apt-get install -y protobuf-compiler; \ + cargo install protobuf; \ + protoc --rust_out . structs.proto" + +mv -f structs.rs ./src/structs_proto.rs diff --git a/libp2p-identify/src/lib.rs b/libp2p-identify/src/lib.rs new file mode 100644 index 00000000..cca90427 --- /dev/null +++ b/libp2p-identify/src/lib.rs @@ -0,0 +1,215 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! Implementation of the `/ipfs/id/1.0.0` protocol. Allows a node A to query another node B which +//! information B knows about A. Also includes the addresses B is listening on. +//! +//! When two nodes connect to each other, the listening half sends a message to the dialing half, +//! indicating the information, and then the protocol stops. + +extern crate bytes; +extern crate futures; +extern crate multiaddr; +extern crate libp2p_peerstore; +extern crate libp2p_swarm; +extern crate protobuf; +extern crate tokio_io; + +use bytes::{Bytes, BytesMut}; +use futures::{Future, Stream, Sink}; +use libp2p_swarm::{ConnectionUpgrade, Endpoint}; +use multiaddr::Multiaddr; +use protobuf::Message as ProtobufMessage; +use protobuf::core::parse_from_bytes as protobuf_parse_from_bytes; +use protobuf::repeated::RepeatedField; +use std::io::{Error as IoError, ErrorKind as IoErrorKind}; +use std::iter; +use tokio_io::{AsyncRead, AsyncWrite}; +use tokio_io::codec::length_delimited; + +mod structs_proto; + +/// Prototype for an upgrade to the identity protocol. +#[derive(Debug, Clone)] +pub struct IdentifyProtocol { + pub information: IdentifyInfo, +} + +/// Information sent from the listener to the dialer. +#[derive(Debug, Clone)] +pub struct IdentifyInfo { + /// Public key of the node. + pub public_key: Vec, + /// Version of the "global" protocol, eg. `ipfs/1.0.0`. + pub protocol_version: String, + /// Name and version. Can be thought as similar to the `User-Agent` header of HTTP. + pub agent_version: String, + /// Addresses that are listened on. + pub listen_addrs: Vec, + /// Address that the server uses to communicate with the dialer. + pub observed_addr: Multiaddr, + /// Protocols supported by the remote. + pub protocols: Vec, +} + +impl ConnectionUpgrade for IdentifyProtocol + where C: AsyncRead + AsyncWrite + 'static +{ + type NamesIter = iter::Once<(Bytes, Self::UpgradeIdentifier)>; + type UpgradeIdentifier = (); + + #[inline] + fn protocol_names(&self) -> Self::NamesIter { + iter::once((Bytes::from("/ipfs/id/1.0.0"), ())) + } + + type Output = Option; + type Future = Box>; + + fn upgrade(self, socket: C, _: (), ty: Endpoint) -> Self::Future { + // TODO: use jack's varint library instead + let socket = length_delimited::Builder::new().length_field_length(1).new_framed(socket); + + match ty { + Endpoint::Dialer => { + let future = socket.into_future() + .map(|(msg, _)| msg) + .map_err(|(err, _)| err) + .and_then(|msg| if let Some(msg) = msg { + Ok(Some(parse_proto_msg(msg)?)) + } else { + Ok(None) + }); + + Box::new(future) as Box<_> + } + + Endpoint::Listener => { + let info = self.information; + + let listen_addrs = info.listen_addrs + .into_iter() + .map(|addr| addr.to_string().into_bytes()) + .collect(); + + let mut message = structs_proto::Identify::new(); + message.set_agentVersion(info.agent_version); + message.set_protocolVersion(info.protocol_version); + message.set_publicKey(info.public_key); + message.set_listenAddrs(listen_addrs); + message.set_observedAddr(info.observed_addr.to_string().into_bytes()); + message.set_protocols(RepeatedField::from_vec(info.protocols)); + + let bytes = message.write_to_bytes().expect("we control the protobuf message"); + let future = socket.send(bytes).map(|_| None); + Box::new(future) as Box<_> + } + } + } +} + +// Turns a protobuf message into an `IdentifyInfo`. If something bad happens, turn it into +// an `IoError`. +fn parse_proto_msg(msg: BytesMut) -> Result { + match protobuf_parse_from_bytes::(&msg) { + Ok(mut msg) => { + let listen_addrs = { + let mut addrs = Vec::new(); + for addr in msg.take_listenAddrs().into_iter() { + addrs.push(bytes_to_multiaddr(addr)?); + } + addrs + }; + + let observed_addr = bytes_to_multiaddr(msg.take_observedAddr())?; + + Ok(IdentifyInfo { + public_key: msg.take_publicKey(), + protocol_version: msg.take_protocolVersion(), + agent_version: msg.take_agentVersion(), + listen_addrs: listen_addrs, + observed_addr: observed_addr, + protocols: msg.take_protocols().into_vec(), + }) + } + + Err(err) => { + Err(IoError::new(IoErrorKind::InvalidData, err)) + } + } +} + +// Turn a `Vec` into a `Multiaddr`. If something bad happens, turn it into an `IoError`. +fn bytes_to_multiaddr(bytes: Vec) -> Result { + let string = match String::from_utf8(bytes) { + Ok(b) => b, + Err(err) => return Err(IoError::new(IoErrorKind::InvalidData, err)), + }; + + match Multiaddr::new(&string) { + Ok(b) => Ok(b), + Err(err) => return Err(IoError::new(IoErrorKind::InvalidData, err)), + } +} + +#[cfg(test)] +mod tests { + extern crate libp2p_tcp_transport; + extern crate tokio_core; + + use self::libp2p_tcp_transport::TcpConfig; + use self::tokio_core::reactor::Core; + use IdentifyInfo; + use IdentifyProtocol; + use futures::{IntoFuture, Future, Stream}; + use libp2p_swarm::Transport; + use libp2p_swarm::multiaddr::Multiaddr; + + #[test] + fn basic() { + let mut core = Core::new().unwrap(); + let tcp = TcpConfig::new(core.handle()); + let with_proto = tcp.with_upgrade(IdentifyProtocol { + information: IdentifyInfo { + public_key: vec![1, 2, 3, 4], + protocol_version: "ipfs/1.0.0".to_owned(), + agent_version: "agent/version".to_owned(), + listen_addrs: vec![Multiaddr::new("/ip4/5.6.7.8/tcp/12345").unwrap()], + observed_addr: Multiaddr::new("/ip4/1.2.3.4/tcp/9876").unwrap(), + protocols: vec!["ping".to_owned(), "kad".to_owned()], + }, + }); + + let (server, addr) = with_proto.clone() + .listen_on(Multiaddr::new("/ip4/127.0.0.1/tcp/0").unwrap()) + .unwrap(); + let server = server.into_future() + .map(|(n, _)| n) + .map_err(|(err, _)| err); + let dialer = with_proto.dial(addr) + .unwrap() + .into_future(); + + let (recv, should_be_empty) = core.run(dialer.join(server)).unwrap(); + assert!(should_be_empty.unwrap().0.unwrap().is_none()); + let recv = recv.unwrap(); + assert_eq!(recv.public_key, &[1, 2, 3, 4]); + } +} diff --git a/libp2p-identify/src/structs_proto.rs b/libp2p-identify/src/structs_proto.rs new file mode 100644 index 00000000..616a5b94 --- /dev/null +++ b/libp2p-identify/src/structs_proto.rs @@ -0,0 +1,550 @@ +// This file is generated. Do not edit +// @generated + +// https://github.com/Manishearth/rust-clippy/issues/702 +#![allow(unknown_lints)] +#![allow(clippy)] + +#![cfg_attr(rustfmt, rustfmt_skip)] + +#![allow(box_pointers)] +#![allow(dead_code)] +#![allow(missing_docs)] +#![allow(non_camel_case_types)] +#![allow(non_snake_case)] +#![allow(non_upper_case_globals)] +#![allow(trivial_casts)] +#![allow(unsafe_code)] +#![allow(unused_imports)] +#![allow(unused_results)] + +use protobuf::Message as Message_imported_for_functions; +use protobuf::ProtobufEnum as ProtobufEnum_imported_for_functions; + +#[derive(PartialEq,Clone,Default)] +pub struct Identify { + // message fields + protocolVersion: ::protobuf::SingularField<::std::string::String>, + agentVersion: ::protobuf::SingularField<::std::string::String>, + publicKey: ::protobuf::SingularField<::std::vec::Vec>, + listenAddrs: ::protobuf::RepeatedField<::std::vec::Vec>, + observedAddr: ::protobuf::SingularField<::std::vec::Vec>, + protocols: ::protobuf::RepeatedField<::std::string::String>, + // special fields + unknown_fields: ::protobuf::UnknownFields, + cached_size: ::protobuf::CachedSize, +} + +// see codegen.rs for the explanation why impl Sync explicitly +unsafe impl ::std::marker::Sync for Identify {} + +impl Identify { + pub fn new() -> Identify { + ::std::default::Default::default() + } + + pub fn default_instance() -> &'static Identify { + static mut instance: ::protobuf::lazy::Lazy = ::protobuf::lazy::Lazy { + lock: ::protobuf::lazy::ONCE_INIT, + ptr: 0 as *const Identify, + }; + unsafe { + instance.get(Identify::new) + } + } + + // optional string protocolVersion = 5; + + pub fn clear_protocolVersion(&mut self) { + self.protocolVersion.clear(); + } + + pub fn has_protocolVersion(&self) -> bool { + self.protocolVersion.is_some() + } + + // Param is passed by value, moved + pub fn set_protocolVersion(&mut self, v: ::std::string::String) { + self.protocolVersion = ::protobuf::SingularField::some(v); + } + + // Mutable pointer to the field. + // If field is not initialized, it is initialized with default value first. + pub fn mut_protocolVersion(&mut self) -> &mut ::std::string::String { + if self.protocolVersion.is_none() { + self.protocolVersion.set_default(); + } + self.protocolVersion.as_mut().unwrap() + } + + // Take field + pub fn take_protocolVersion(&mut self) -> ::std::string::String { + self.protocolVersion.take().unwrap_or_else(|| ::std::string::String::new()) + } + + pub fn get_protocolVersion(&self) -> &str { + match self.protocolVersion.as_ref() { + Some(v) => &v, + None => "", + } + } + + fn get_protocolVersion_for_reflect(&self) -> &::protobuf::SingularField<::std::string::String> { + &self.protocolVersion + } + + fn mut_protocolVersion_for_reflect(&mut self) -> &mut ::protobuf::SingularField<::std::string::String> { + &mut self.protocolVersion + } + + // optional string agentVersion = 6; + + pub fn clear_agentVersion(&mut self) { + self.agentVersion.clear(); + } + + pub fn has_agentVersion(&self) -> bool { + self.agentVersion.is_some() + } + + // Param is passed by value, moved + pub fn set_agentVersion(&mut self, v: ::std::string::String) { + self.agentVersion = ::protobuf::SingularField::some(v); + } + + // Mutable pointer to the field. + // If field is not initialized, it is initialized with default value first. + pub fn mut_agentVersion(&mut self) -> &mut ::std::string::String { + if self.agentVersion.is_none() { + self.agentVersion.set_default(); + } + self.agentVersion.as_mut().unwrap() + } + + // Take field + pub fn take_agentVersion(&mut self) -> ::std::string::String { + self.agentVersion.take().unwrap_or_else(|| ::std::string::String::new()) + } + + pub fn get_agentVersion(&self) -> &str { + match self.agentVersion.as_ref() { + Some(v) => &v, + None => "", + } + } + + fn get_agentVersion_for_reflect(&self) -> &::protobuf::SingularField<::std::string::String> { + &self.agentVersion + } + + fn mut_agentVersion_for_reflect(&mut self) -> &mut ::protobuf::SingularField<::std::string::String> { + &mut self.agentVersion + } + + // optional bytes publicKey = 1; + + pub fn clear_publicKey(&mut self) { + self.publicKey.clear(); + } + + pub fn has_publicKey(&self) -> bool { + self.publicKey.is_some() + } + + // Param is passed by value, moved + pub fn set_publicKey(&mut self, v: ::std::vec::Vec) { + self.publicKey = ::protobuf::SingularField::some(v); + } + + // Mutable pointer to the field. + // If field is not initialized, it is initialized with default value first. + pub fn mut_publicKey(&mut self) -> &mut ::std::vec::Vec { + if self.publicKey.is_none() { + self.publicKey.set_default(); + } + self.publicKey.as_mut().unwrap() + } + + // Take field + pub fn take_publicKey(&mut self) -> ::std::vec::Vec { + self.publicKey.take().unwrap_or_else(|| ::std::vec::Vec::new()) + } + + pub fn get_publicKey(&self) -> &[u8] { + match self.publicKey.as_ref() { + Some(v) => &v, + None => &[], + } + } + + fn get_publicKey_for_reflect(&self) -> &::protobuf::SingularField<::std::vec::Vec> { + &self.publicKey + } + + fn mut_publicKey_for_reflect(&mut self) -> &mut ::protobuf::SingularField<::std::vec::Vec> { + &mut self.publicKey + } + + // repeated bytes listenAddrs = 2; + + pub fn clear_listenAddrs(&mut self) { + self.listenAddrs.clear(); + } + + // Param is passed by value, moved + pub fn set_listenAddrs(&mut self, v: ::protobuf::RepeatedField<::std::vec::Vec>) { + self.listenAddrs = v; + } + + // Mutable pointer to the field. + pub fn mut_listenAddrs(&mut self) -> &mut ::protobuf::RepeatedField<::std::vec::Vec> { + &mut self.listenAddrs + } + + // Take field + pub fn take_listenAddrs(&mut self) -> ::protobuf::RepeatedField<::std::vec::Vec> { + ::std::mem::replace(&mut self.listenAddrs, ::protobuf::RepeatedField::new()) + } + + pub fn get_listenAddrs(&self) -> &[::std::vec::Vec] { + &self.listenAddrs + } + + fn get_listenAddrs_for_reflect(&self) -> &::protobuf::RepeatedField<::std::vec::Vec> { + &self.listenAddrs + } + + fn mut_listenAddrs_for_reflect(&mut self) -> &mut ::protobuf::RepeatedField<::std::vec::Vec> { + &mut self.listenAddrs + } + + // optional bytes observedAddr = 4; + + pub fn clear_observedAddr(&mut self) { + self.observedAddr.clear(); + } + + pub fn has_observedAddr(&self) -> bool { + self.observedAddr.is_some() + } + + // Param is passed by value, moved + pub fn set_observedAddr(&mut self, v: ::std::vec::Vec) { + self.observedAddr = ::protobuf::SingularField::some(v); + } + + // Mutable pointer to the field. + // If field is not initialized, it is initialized with default value first. + pub fn mut_observedAddr(&mut self) -> &mut ::std::vec::Vec { + if self.observedAddr.is_none() { + self.observedAddr.set_default(); + } + self.observedAddr.as_mut().unwrap() + } + + // Take field + pub fn take_observedAddr(&mut self) -> ::std::vec::Vec { + self.observedAddr.take().unwrap_or_else(|| ::std::vec::Vec::new()) + } + + pub fn get_observedAddr(&self) -> &[u8] { + match self.observedAddr.as_ref() { + Some(v) => &v, + None => &[], + } + } + + fn get_observedAddr_for_reflect(&self) -> &::protobuf::SingularField<::std::vec::Vec> { + &self.observedAddr + } + + fn mut_observedAddr_for_reflect(&mut self) -> &mut ::protobuf::SingularField<::std::vec::Vec> { + &mut self.observedAddr + } + + // repeated string protocols = 3; + + pub fn clear_protocols(&mut self) { + self.protocols.clear(); + } + + // Param is passed by value, moved + pub fn set_protocols(&mut self, v: ::protobuf::RepeatedField<::std::string::String>) { + self.protocols = v; + } + + // Mutable pointer to the field. + pub fn mut_protocols(&mut self) -> &mut ::protobuf::RepeatedField<::std::string::String> { + &mut self.protocols + } + + // Take field + pub fn take_protocols(&mut self) -> ::protobuf::RepeatedField<::std::string::String> { + ::std::mem::replace(&mut self.protocols, ::protobuf::RepeatedField::new()) + } + + pub fn get_protocols(&self) -> &[::std::string::String] { + &self.protocols + } + + fn get_protocols_for_reflect(&self) -> &::protobuf::RepeatedField<::std::string::String> { + &self.protocols + } + + fn mut_protocols_for_reflect(&mut self) -> &mut ::protobuf::RepeatedField<::std::string::String> { + &mut self.protocols + } +} + +impl ::protobuf::Message for Identify { + fn is_initialized(&self) -> bool { + true + } + + fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream) -> ::protobuf::ProtobufResult<()> { + while !is.eof()? { + let (field_number, wire_type) = is.read_tag_unpack()?; + match field_number { + 5 => { + ::protobuf::rt::read_singular_string_into(wire_type, is, &mut self.protocolVersion)?; + }, + 6 => { + ::protobuf::rt::read_singular_string_into(wire_type, is, &mut self.agentVersion)?; + }, + 1 => { + ::protobuf::rt::read_singular_bytes_into(wire_type, is, &mut self.publicKey)?; + }, + 2 => { + ::protobuf::rt::read_repeated_bytes_into(wire_type, is, &mut self.listenAddrs)?; + }, + 4 => { + ::protobuf::rt::read_singular_bytes_into(wire_type, is, &mut self.observedAddr)?; + }, + 3 => { + ::protobuf::rt::read_repeated_string_into(wire_type, is, &mut self.protocols)?; + }, + _ => { + ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?; + }, + }; + } + ::std::result::Result::Ok(()) + } + + // Compute sizes of nested messages + #[allow(unused_variables)] + fn compute_size(&self) -> u32 { + let mut my_size = 0; + if let Some(ref v) = self.protocolVersion.as_ref() { + my_size += ::protobuf::rt::string_size(5, &v); + } + if let Some(ref v) = self.agentVersion.as_ref() { + my_size += ::protobuf::rt::string_size(6, &v); + } + if let Some(ref v) = self.publicKey.as_ref() { + my_size += ::protobuf::rt::bytes_size(1, &v); + } + for value in &self.listenAddrs { + my_size += ::protobuf::rt::bytes_size(2, &value); + }; + if let Some(ref v) = self.observedAddr.as_ref() { + my_size += ::protobuf::rt::bytes_size(4, &v); + } + for value in &self.protocols { + my_size += ::protobuf::rt::string_size(3, &value); + }; + my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields()); + self.cached_size.set(my_size); + my_size + } + + fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream) -> ::protobuf::ProtobufResult<()> { + if let Some(ref v) = self.protocolVersion.as_ref() { + os.write_string(5, &v)?; + } + if let Some(ref v) = self.agentVersion.as_ref() { + os.write_string(6, &v)?; + } + if let Some(ref v) = self.publicKey.as_ref() { + os.write_bytes(1, &v)?; + } + for v in &self.listenAddrs { + os.write_bytes(2, &v)?; + }; + if let Some(ref v) = self.observedAddr.as_ref() { + os.write_bytes(4, &v)?; + } + for v in &self.protocols { + os.write_string(3, &v)?; + }; + os.write_unknown_fields(self.get_unknown_fields())?; + ::std::result::Result::Ok(()) + } + + fn get_cached_size(&self) -> u32 { + self.cached_size.get() + } + + fn get_unknown_fields(&self) -> &::protobuf::UnknownFields { + &self.unknown_fields + } + + fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields { + &mut self.unknown_fields + } + + fn as_any(&self) -> &::std::any::Any { + self as &::std::any::Any + } + fn as_any_mut(&mut self) -> &mut ::std::any::Any { + self as &mut ::std::any::Any + } + fn into_any(self: Box) -> ::std::boxed::Box<::std::any::Any> { + self + } + + fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor { + ::protobuf::MessageStatic::descriptor_static(None::) + } +} + +impl ::protobuf::MessageStatic for Identify { + fn new() -> Identify { + Identify::new() + } + + fn descriptor_static(_: ::std::option::Option) -> &'static ::protobuf::reflect::MessageDescriptor { + static mut descriptor: ::protobuf::lazy::Lazy<::protobuf::reflect::MessageDescriptor> = ::protobuf::lazy::Lazy { + lock: ::protobuf::lazy::ONCE_INIT, + ptr: 0 as *const ::protobuf::reflect::MessageDescriptor, + }; + unsafe { + descriptor.get(|| { + let mut fields = ::std::vec::Vec::new(); + fields.push(::protobuf::reflect::accessor::make_singular_field_accessor::<_, ::protobuf::types::ProtobufTypeString>( + "protocolVersion", + Identify::get_protocolVersion_for_reflect, + Identify::mut_protocolVersion_for_reflect, + )); + fields.push(::protobuf::reflect::accessor::make_singular_field_accessor::<_, ::protobuf::types::ProtobufTypeString>( + "agentVersion", + Identify::get_agentVersion_for_reflect, + Identify::mut_agentVersion_for_reflect, + )); + fields.push(::protobuf::reflect::accessor::make_singular_field_accessor::<_, ::protobuf::types::ProtobufTypeBytes>( + "publicKey", + Identify::get_publicKey_for_reflect, + Identify::mut_publicKey_for_reflect, + )); + fields.push(::protobuf::reflect::accessor::make_repeated_field_accessor::<_, ::protobuf::types::ProtobufTypeBytes>( + "listenAddrs", + Identify::get_listenAddrs_for_reflect, + Identify::mut_listenAddrs_for_reflect, + )); + fields.push(::protobuf::reflect::accessor::make_singular_field_accessor::<_, ::protobuf::types::ProtobufTypeBytes>( + "observedAddr", + Identify::get_observedAddr_for_reflect, + Identify::mut_observedAddr_for_reflect, + )); + fields.push(::protobuf::reflect::accessor::make_repeated_field_accessor::<_, ::protobuf::types::ProtobufTypeString>( + "protocols", + Identify::get_protocols_for_reflect, + Identify::mut_protocols_for_reflect, + )); + ::protobuf::reflect::MessageDescriptor::new::( + "Identify", + fields, + file_descriptor_proto() + ) + }) + } + } +} + +impl ::protobuf::Clear for Identify { + fn clear(&mut self) { + self.clear_protocolVersion(); + self.clear_agentVersion(); + self.clear_publicKey(); + self.clear_listenAddrs(); + self.clear_observedAddr(); + self.clear_protocols(); + self.unknown_fields.clear(); + } +} + +impl ::std::fmt::Debug for Identify { + fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { + ::protobuf::text_format::fmt(self, f) + } +} + +impl ::protobuf::reflect::ProtobufValue for Identify { + fn as_ref(&self) -> ::protobuf::reflect::ProtobufValueRef { + ::protobuf::reflect::ProtobufValueRef::Message(self) + } +} + +static file_descriptor_proto_data: &'static [u8] = b"\ + \n\rstructs.proto\"\xda\x01\n\x08Identify\x12(\n\x0fprotocolVersion\x18\ + \x05\x20\x01(\tR\x0fprotocolVersion\x12\"\n\x0cagentVersion\x18\x06\x20\ + \x01(\tR\x0cagentVersion\x12\x1c\n\tpublicKey\x18\x01\x20\x01(\x0cR\tpub\ + licKey\x12\x20\n\x0blistenAddrs\x18\x02\x20\x03(\x0cR\x0blistenAddrs\x12\ + \"\n\x0cobservedAddr\x18\x04\x20\x01(\x0cR\x0cobservedAddr\x12\x1c\n\tpr\ + otocols\x18\x03\x20\x03(\tR\tprotocolsJ\xc2\t\n\x06\x12\x04\0\0\x16\x01\ + \n\n\n\x02\x04\0\x12\x04\0\0\x16\x01\n\n\n\x03\x04\0\x01\x12\x03\0\x08\ + \x10\nX\n\x04\x04\0\x02\0\x12\x03\x02\x02&\x1a8\x20protocolVersion\x20de\ + termines\x20compatibility\x20between\x20peers\n\"\x11\x20e.g.\x20ipfs/1.\ + 0.0\n\n\x0c\n\x05\x04\0\x02\0\x04\x12\x03\x02\x02\n\n\x0c\n\x05\x04\0\ + \x02\0\x05\x12\x03\x02\x0b\x11\n\x0c\n\x05\x04\0\x02\0\x01\x12\x03\x02\ + \x12!\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x02$%\n\x9f\x01\n\x04\x04\0\ + \x02\x01\x12\x03\x06\x02#\x1a|\x20agentVersion\x20is\x20like\x20a\x20Use\ + rAgent\x20string\x20in\x20browsers,\x20or\x20client\x20version\x20in\x20\ + bittorrent\n\x20includes\x20the\x20client\x20name\x20and\x20client.\n\"\ + \x14\x20e.g.\x20go-ipfs/0.1.0\n\n\x0c\n\x05\x04\0\x02\x01\x04\x12\x03\ + \x06\x02\n\n\x0c\n\x05\x04\0\x02\x01\x05\x12\x03\x06\x0b\x11\n\x0c\n\x05\ + \x04\0\x02\x01\x01\x12\x03\x06\x12\x1e\n\x0c\n\x05\x04\0\x02\x01\x03\x12\ + \x03\x06!\"\n\xe3\x01\n\x04\x04\0\x02\x02\x12\x03\x0b\x02\x1f\x1a\xd5\ + \x01\x20publicKey\x20is\x20this\x20node's\x20public\x20key\x20(which\x20\ + also\x20gives\x20its\x20node.ID)\n\x20-\x20may\x20not\x20need\x20to\x20b\ + e\x20sent,\x20as\x20secure\x20channel\x20implies\x20it\x20has\x20been\ + \x20sent.\n\x20-\x20then\x20again,\x20if\x20we\x20change\x20/\x20disable\ + \x20secure\x20channel,\x20may\x20still\x20want\x20it.\n\n\x0c\n\x05\x04\ + \0\x02\x02\x04\x12\x03\x0b\x02\n\n\x0c\n\x05\x04\0\x02\x02\x05\x12\x03\ + \x0b\x0b\x10\n\x0c\n\x05\x04\0\x02\x02\x01\x12\x03\x0b\x11\x1a\n\x0c\n\ + \x05\x04\0\x02\x02\x03\x12\x03\x0b\x1d\x1e\n]\n\x04\x04\0\x02\x03\x12\ + \x03\x0e\x02!\x1aP\x20listenAddrs\x20are\x20the\x20multiaddrs\x20the\x20\ + sender\x20node\x20listens\x20for\x20open\x20connections\x20on\n\n\x0c\n\ + \x05\x04\0\x02\x03\x04\x12\x03\x0e\x02\n\n\x0c\n\x05\x04\0\x02\x03\x05\ + \x12\x03\x0e\x0b\x10\n\x0c\n\x05\x04\0\x02\x03\x01\x12\x03\x0e\x11\x1c\n\ + \x0c\n\x05\x04\0\x02\x03\x03\x12\x03\x0e\x1f\x20\n\x81\x02\n\x04\x04\0\ + \x02\x04\x12\x03\x13\x02\"\x1a\xf3\x01\x20oservedAddr\x20is\x20the\x20mu\ + ltiaddr\x20of\x20the\x20remote\x20endpoint\x20that\x20the\x20sender\x20n\ + ode\x20perceives\n\x20this\x20is\x20useful\x20information\x20to\x20conve\ + y\x20to\x20the\x20other\x20side,\x20as\x20it\x20helps\x20the\x20remote\ + \x20endpoint\n\x20determine\x20whether\x20its\x20connection\x20to\x20the\ + \x20local\x20peer\x20goes\x20through\x20NAT.\n\n\x0c\n\x05\x04\0\x02\x04\ + \x04\x12\x03\x13\x02\n\n\x0c\n\x05\x04\0\x02\x04\x05\x12\x03\x13\x0b\x10\ + \n\x0c\n\x05\x04\0\x02\x04\x01\x12\x03\x13\x11\x1d\n\x0c\n\x05\x04\0\x02\ + \x04\x03\x12\x03\x13\x20!\n\x0b\n\x04\x04\0\x02\x05\x12\x03\x15\x02\x20\ + \n\x0c\n\x05\x04\0\x02\x05\x04\x12\x03\x15\x02\n\n\x0c\n\x05\x04\0\x02\ + \x05\x05\x12\x03\x15\x0b\x11\n\x0c\n\x05\x04\0\x02\x05\x01\x12\x03\x15\ + \x12\x1b\n\x0c\n\x05\x04\0\x02\x05\x03\x12\x03\x15\x1e\x1f\ +"; + +static mut file_descriptor_proto_lazy: ::protobuf::lazy::Lazy<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::lazy::Lazy { + lock: ::protobuf::lazy::ONCE_INIT, + ptr: 0 as *const ::protobuf::descriptor::FileDescriptorProto, +}; + +fn parse_descriptor_proto() -> ::protobuf::descriptor::FileDescriptorProto { + ::protobuf::parse_from_bytes(file_descriptor_proto_data).unwrap() +} + +pub fn file_descriptor_proto() -> &'static ::protobuf::descriptor::FileDescriptorProto { + unsafe { + file_descriptor_proto_lazy.get(|| { + parse_descriptor_proto() + }) + } +} diff --git a/libp2p-identify/structs.proto b/libp2p-identify/structs.proto new file mode 100644 index 00000000..0ff074e6 --- /dev/null +++ b/libp2p-identify/structs.proto @@ -0,0 +1,23 @@ +message Identify { + // protocolVersion determines compatibility between peers + optional string protocolVersion = 5; // e.g. ipfs/1.0.0 + + // agentVersion is like a UserAgent string in browsers, or client version in bittorrent + // includes the client name and client. + optional string agentVersion = 6; // e.g. go-ipfs/0.1.0 + + // publicKey is this node's public key (which also gives its node.ID) + // - may not need to be sent, as secure channel implies it has been sent. + // - then again, if we change / disable secure channel, may still want it. + optional bytes publicKey = 1; + + // listenAddrs are the multiaddrs the sender node listens for open connections on + repeated bytes listenAddrs = 2; + + // oservedAddr is the multiaddr of the remote endpoint that the sender node perceives + // this is useful information to convey to the other side, as it helps the remote endpoint + // determine whether its connection to the local peer goes through NAT. + optional bytes observedAddr = 4; + + repeated string protocols = 3; +} From aa2c4a2a452781cc8b216291d5d7d9dc397185b2 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Thu, 4 Jan 2018 12:09:39 +0100 Subject: [PATCH 06/12] Fix some concerns --- libp2p-identify/src/lib.rs | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/libp2p-identify/src/lib.rs b/libp2p-identify/src/lib.rs index cca90427..ea1d6087 100644 --- a/libp2p-identify/src/lib.rs +++ b/libp2p-identify/src/lib.rs @@ -117,7 +117,12 @@ impl ConnectionUpgrade for IdentifyProtocol message.set_observedAddr(info.observed_addr.to_string().into_bytes()); message.set_protocols(RepeatedField::from_vec(info.protocols)); - let bytes = message.write_to_bytes().expect("we control the protobuf message"); + let bytes = message.write_to_bytes() + .expect("writing protobuf failed ; should never happen"); + + // On the server side, after sending the information to the client we make the + // future produce a `None`. If we were on the client side, this would contain the + // information received by the server. let future = socket.send(bytes).map(|_| None); Box::new(future) as Box<_> } @@ -163,7 +168,7 @@ fn bytes_to_multiaddr(bytes: Vec) -> Result { Err(err) => return Err(IoError::new(IoErrorKind::InvalidData, err)), }; - match Multiaddr::new(&string) { + match string.parse() { Ok(b) => Ok(b), Err(err) => return Err(IoError::new(IoErrorKind::InvalidData, err)), } @@ -180,7 +185,6 @@ mod tests { use IdentifyProtocol; use futures::{IntoFuture, Future, Stream}; use libp2p_swarm::Transport; - use libp2p_swarm::multiaddr::Multiaddr; #[test] fn basic() { @@ -191,24 +195,24 @@ mod tests { public_key: vec![1, 2, 3, 4], protocol_version: "ipfs/1.0.0".to_owned(), agent_version: "agent/version".to_owned(), - listen_addrs: vec![Multiaddr::new("/ip4/5.6.7.8/tcp/12345").unwrap()], - observed_addr: Multiaddr::new("/ip4/1.2.3.4/tcp/9876").unwrap(), + listen_addrs: vec!["/ip4/5.6.7.8/tcp/12345".parse().unwrap()], + observed_addr: "/ip4/1.2.3.4/tcp/9876".parse().unwrap(), protocols: vec!["ping".to_owned(), "kad".to_owned()], }, }); let (server, addr) = with_proto.clone() - .listen_on(Multiaddr::new("/ip4/127.0.0.1/tcp/0").unwrap()) + .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()) .unwrap(); let server = server.into_future() - .map(|(n, _)| n) - .map_err(|(err, _)| err); + .map_err(|(err, _)| err) + .and_then(|(n, _)| n.unwrap().0); let dialer = with_proto.dial(addr) .unwrap() .into_future(); let (recv, should_be_empty) = core.run(dialer.join(server)).unwrap(); - assert!(should_be_empty.unwrap().0.unwrap().is_none()); + assert!(should_be_empty.is_none()); let recv = recv.unwrap(); assert_eq!(recv.public_key, &[1, 2, 3, 4]); } From ed5a24c591818c173e70cf088ea04c0a1e7311f2 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Thu, 4 Jan 2018 12:11:21 +0100 Subject: [PATCH 07/12] More concerns --- libp2p-identify/src/lib.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/libp2p-identify/src/lib.rs b/libp2p-identify/src/lib.rs index ea1d6087..4b85b2ab 100644 --- a/libp2p-identify/src/lib.rs +++ b/libp2p-identify/src/lib.rs @@ -74,15 +74,14 @@ impl ConnectionUpgrade for IdentifyProtocol { type NamesIter = iter::Once<(Bytes, Self::UpgradeIdentifier)>; type UpgradeIdentifier = (); + type Output = Option; + type Future = Box>; #[inline] fn protocol_names(&self) -> Self::NamesIter { iter::once((Bytes::from("/ipfs/id/1.0.0"), ())) } - type Output = Option; - type Future = Box>; - fn upgrade(self, socket: C, _: (), ty: Endpoint) -> Self::Future { // TODO: use jack's varint library instead let socket = length_delimited::Builder::new().length_field_length(1).new_framed(socket); From 6b7fc9508e1581bffe2779f7a75ab8b57cc247d6 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Thu, 4 Jan 2018 17:18:49 +0100 Subject: [PATCH 08/12] Update the echo dialer example --- example/examples/echo-dialer.rs | 80 ++++++++++++++++----------------- libp2p-swarm/src/lib.rs | 1 + libp2p-swarm/src/transport.rs | 23 ++++++++++ 3 files changed, 64 insertions(+), 40 deletions(-) diff --git a/example/examples/echo-dialer.rs b/example/examples/echo-dialer.rs index 06dea677..85023651 100644 --- a/example/examples/echo-dialer.rs +++ b/example/examples/echo-dialer.rs @@ -30,7 +30,7 @@ extern crate tokio_io; use bytes::BytesMut; use futures::{Future, Sink, Stream}; use std::env; -use swarm::{UpgradeExt, SimpleProtocol, Transport, MuxedTransport}; +use swarm::{UpgradeExt, SimpleProtocol, Transport, DeniedConnectionUpgrade}; use tcp::TcpConfig; use tokio_core::reactor::Core; use tokio_io::codec::length_delimited; @@ -70,51 +70,51 @@ fn main() { // a `Transport`. .into_connection_reuse(); - let transport_with_echo = transport - .clone() - // On top of plaintext or secio, we use the "echo" protocol, which is a custom protocol - // just for this example. - // For this purpose, we create a `SimpleProtocol` struct. - .with_upgrade(SimpleProtocol::new("/echo/1.0.0", |socket| { - // This closure is called whenever a stream using the "echo" protocol has been - // successfully negotiated. The parameter is the raw socket (implements the AsyncRead - // and AsyncWrite traits), and the closure must return an implementation of - // `IntoFuture` that can yield any type of object. - Ok(length_delimited::Framed::<_, BytesMut>::new(socket)) - })); + // Let's put this `transport` into a *swarm*. The swarm will handle all the incoming + // connections for us. The second parameter we pass is the connection upgrade that is accepted + // by the listening part. We don't want to accept anything, so we pass a dummy object that + // represents a connection that is always denied. + let (swarm_controller, swarm_future) = swarm::swarm(transport, DeniedConnectionUpgrade, + |_socket, _client_addr| -> Result<(), _> { + unreachable!("All incoming connections should have been denied") + }); - // We now have a `transport` variable that can be used either to dial nodes or listen to - // incoming connections, and that will automatically apply all the selected protocols on top - // of any opened stream. + // Building a struct that represents the protocol that we are going to use for dialing. + let proto = SimpleProtocol::new("/echo/1.0.0", |socket| { + // This closure is called whenever a stream using the "echo" protocol has been + // successfully negotiated. The parameter is the raw socket (implements the AsyncRead + // and AsyncWrite traits), and the closure must return an implementation of + // `IntoFuture` that can yield any type of object. + Ok(length_delimited::Framed::<_, BytesMut>::new(socket)) + }); - // We use it to dial the address. - let dialer = transport_with_echo - .dial(swarm::Multiaddr::new(&target_addr).expect("invalid multiaddr")) - // If the multiaddr protocol exists but is not supported, then we get an error containing - // the transport and the original multiaddress. Therefore we cannot directly use `unwrap()` - // or `expect()`, but have to add a `map_err()` beforehand. - .map_err(|(_, addr)| addr).expect("unsupported multiaddr") - - .and_then(|echo| { - // `echo` is what the closure used when initializing "echo" returns. + // We now use the controller to dial to the address. + swarm_controller + .dial_custom_handler(target_addr.parse().expect("invalid multiaddr"), proto, |echo| { + // `echo` is what the closure used when initializing `proto` returns. // Consequently, please note that the `send` method is available only because the type // `length_delimited::Framed` has a `send` method. println!("Sending \"hello world\" to listener"); echo.send("hello world".into()) - }) - .and_then(|echo| { - // The message has been successfully sent. Now wait for an answer. - echo.into_future() - .map(|(msg, rest)| { - println!("Received message from listener: {:?}", msg); - rest + // Then listening for one message from the remote. + .and_then(|echo| { + echo.into_future().map_err(|(e, _)| e).map(|(n,_ )| n) }) - .map_err(|(err, _)| err) - }); + .and_then(|message| { + println!("Received message from listener: {:?}", message.unwrap()); + Ok(()) + }) + }) + // If the multiaddr protocol exists but is not supported, then we get an error containing + // the original multiaddress. + .expect("unsupported multiaddr"); - // `dialer` is a future that contains all the behaviour that we want, but nothing has actually - // started yet. Because we created the `TcpConfig` with tokio, we need to run the future - // through the tokio core. - core.run(dialer.map(|_| ()).select(transport.incoming().for_each(|_| Ok(())))) - .unwrap_or_else(|_| panic!()); + // The address we actually listen on can be different from the address that was passed to + // the `listen_on` function. For example if you pass `/ip4/0.0.0.0/tcp/0`, then the port `0` + // will be replaced with the actual port. + + // `swarm_future` is a future that contains all the behaviour that we want, but nothing has + // actually started yet. Because we created the `TcpConfig` with tokio, we need to run the + // future through the tokio core. + core.run(swarm_future).unwrap(); } diff --git a/libp2p-swarm/src/lib.rs b/libp2p-swarm/src/lib.rs index b7da8738..fd5041f5 100644 --- a/libp2p-swarm/src/lib.rs +++ b/libp2p-swarm/src/lib.rs @@ -224,3 +224,4 @@ pub use self::muxing::StreamMuxer; pub use self::swarm::{swarm, SwarmController, SwarmFuture}; pub use self::transport::{ConnectionUpgrade, PlainTextConfig, Transport, UpgradedNode, OrUpgrade}; pub use self::transport::{Endpoint, SimpleProtocol, MuxedTransport, UpgradeExt}; +pub use self::transport::{DeniedConnectionUpgrade}; diff --git a/libp2p-swarm/src/transport.rs b/libp2p-swarm/src/transport.rs index 8099f988..80405b60 100644 --- a/libp2p-swarm/src/transport.rs +++ b/libp2p-swarm/src/transport.rs @@ -534,6 +534,29 @@ pub enum Endpoint { Listener, } +/// Implementation of `ConnectionUpgrade` that always fails to negotiate. +#[derive(Debug, Copy, Clone)] +pub struct DeniedConnectionUpgrade; + +impl ConnectionUpgrade for DeniedConnectionUpgrade + where C: AsyncRead + AsyncWrite +{ + type NamesIter = iter::Empty<(Bytes, ())>; + type UpgradeIdentifier = (); // TODO: could use `!` + type Output = (); // TODO: could use `!` + type Future = Box>; // TODO: could use `!` + + #[inline] + fn protocol_names(&self) -> Self::NamesIter { + iter::empty() + } + + #[inline] + fn upgrade(self, _: C, _: Self::UpgradeIdentifier, _: Endpoint) -> Self::Future { + unreachable!("the denied connection upgrade always fails to negotiate") + } +} + /// Extension trait for `ConnectionUpgrade`. Automatically implemented on everything. pub trait UpgradeExt { /// Builds a struct that will choose an upgrade between `self` and `other`, depending on what From c4cbfe4452dd5f13d281310c0f7033a635e58493 Mon Sep 17 00:00:00 2001 From: Vurich Date: Fri, 15 Dec 2017 17:38:10 +0100 Subject: [PATCH 09/12] Fix varint panicking on large numbers --- varint-rs/src/lib.rs | 87 +++++++++++++++++++++++++++++++++----------- 1 file changed, 66 insertions(+), 21 deletions(-) diff --git a/varint-rs/src/lib.rs b/varint-rs/src/lib.rs index 4a0a4855..7c660bf6 100644 --- a/varint-rs/src/lib.rs +++ b/varint-rs/src/lib.rs @@ -123,34 +123,48 @@ pub trait EncoderHelper: Sized { /// Helper trait to allow multiple integer types to be encoded pub trait DecoderHelper: Sized { /// Decode a single byte - fn decode_one(decoder: &mut DecoderState, byte: u8) -> Option; + fn decode_one(decoder: &mut DecoderState, byte: u8) -> errors::Result>; /// Read as much of the varint as possible fn read(decoder: &mut DecoderState, input: R) -> Poll, Error>; } macro_rules! impl_decoderstate { - ($t:ty) => { impl_decoderstate!($t, <$t>::from, |v| v); }; + ($t:ty) => { + impl_decoderstate!( + $t, + |a| a as $t, + |a: $t, b| -> Option<$t> { a.checked_shl(b as u32) } + ); + }; ($t:ty, $make_fn:expr) => { impl_decoderstate!($t, $make_fn, $make_fn); }; - ($t:ty, $make_fn:expr, $make_shift_fn:expr) => { + ($t:ty, $make_fn:expr, $shift_fn:expr) => { impl DecoderHelper for $t { #[inline] - fn decode_one(decoder: &mut DecoderState, byte: u8) -> Option<$t> { - decoder.accumulator.take().and_then(|accumulator| { - let out = accumulator | - ( - $make_fn(byte & 0x7F) << - $make_shift_fn(decoder.shift * USABLE_BITS_PER_BYTE) - ); + fn decode_one(decoder: &mut DecoderState, byte: u8) -> ::errors::Result> { + let res = decoder.accumulator.take().and_then(|accumulator| { + let out = accumulator | match $shift_fn( + $make_fn(byte & 0x7F), + decoder.shift * USABLE_BITS_PER_BYTE, + ) { + Some(a) => a, + None => return Some(Err(ErrorKind::ParseError.into())), + }; decoder.shift += 1; if byte & 0x80 == 0 { - Some(out) + Some(Ok(out)) } else { decoder.accumulator = AccumulatorState::InProgress(out); None } - }) + }); + + match res { + Some(Ok(number)) => Ok(Some(number)), + Some(Err(err)) => Err(err), + None => Ok(None), + } } fn read( @@ -173,7 +187,7 @@ macro_rules! impl_decoderstate { match input.read_exact(&mut buffer) { Ok(()) => { - if let Some(out) = Self::decode_one(decoder, buffer[0]) { + if let Some(out) = Self::decode_one(decoder, buffer[0])? { break Ok(Async::Ready(Some(out))); } } @@ -258,9 +272,9 @@ impl_encoderstate!(u64, (|val| val as u64)); impl_encoderstate!(u32, (|val| val as u32)); impl_decoderstate!(usize); -impl_decoderstate!(BigUint); -impl_decoderstate!(u64, (|val| val as u64)); -impl_decoderstate!(u32, (|val| val as u32)); +impl_decoderstate!(BigUint, BigUint::from, |a, b| Some(a << b)); +impl_decoderstate!(u64); +impl_decoderstate!(u32); impl EncoderState { pub fn source(&self) -> &T { @@ -368,7 +382,8 @@ impl Decoder for VarintDecoder { // We know that the length is not 0, so this cannot fail. let first_byte = src.split_to(1)[0]; let mut state = self.state.take().unwrap_or_default(); - let out = T::decode_one(&mut state, first_byte); + let out = T::decode_one(&mut state, first_byte) + .map_err(|_| io::Error::from(io::ErrorKind::Other))?; if let Some(out) = out { break Ok(Some(out)); @@ -390,10 +405,12 @@ pub fn decode(mut input: R) -> errors::Resu match input.read_exact(&mut buffer) { Ok(()) => { - if let Some(out) = T::decode_one(&mut decoder, buffer[0]) { - break Ok(out); - } - } + if let Some(out) = T::decode_one(&mut decoder, buffer[0]) + .map_err(|_| io::Error::from(io::ErrorKind::Other))? + { + break Ok(out); + } + } Err(inner) => break Err(Error::with_chain(inner, ErrorKind::ParseError)), } } @@ -417,6 +434,34 @@ mod tests { use num_bigint::BigUint; use futures::{Future, Stream}; + #[test] + fn large_number_fails() { + use std::io::Cursor; + use futures::Async; + use super::WriteState; + + let mut out = vec![0u8; 10]; + + { + let writable: Cursor<&mut [_]> = Cursor::new(&mut out); + + let mut state = EncoderState::new(::std::u64::MAX); + + assert_eq!( + state.write(writable).unwrap(), + Async::Ready(WriteState::Done(10)) + ); + } + + let result: Result, _> = FramedRead::new(&out[..], VarintDecoder::new()) + .into_future() + .map(|(out, _)| out) + .map_err(|(out, _)| out) + .wait(); + + assert!(result.is_err()); + } + #[test] fn can_decode_basic_biguint() { assert_eq!( From 8fa8fa3e7b863f1a11c16c0454326270127404b4 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Mon, 8 Jan 2018 16:25:12 +0100 Subject: [PATCH 10/12] More concerns --- libp2p-identify/src/lib.rs | 45 ++++++++++++++++++++++---------------- 1 file changed, 26 insertions(+), 19 deletions(-) diff --git a/libp2p-identify/src/lib.rs b/libp2p-identify/src/lib.rs index 4b85b2ab..54d4c57e 100644 --- a/libp2p-identify/src/lib.rs +++ b/libp2p-identify/src/lib.rs @@ -52,6 +52,16 @@ pub struct IdentifyProtocol { pub information: IdentifyInfo, } +impl IdentifyProtocol { + /// Builds a new `IdentifyProtocol`. + #[inline] + pub fn new(information: IdentifyInfo) -> IdentifyProtocol { + IdentifyProtocol { + information + } + } +} + /// Information sent from the listener to the dialer. #[derive(Debug, Clone)] pub struct IdentifyInfo { @@ -162,15 +172,14 @@ fn parse_proto_msg(msg: BytesMut) -> Result { // Turn a `Vec` into a `Multiaddr`. If something bad happens, turn it into an `IoError`. fn bytes_to_multiaddr(bytes: Vec) -> Result { - let string = match String::from_utf8(bytes) { - Ok(b) => b, - Err(err) => return Err(IoError::new(IoErrorKind::InvalidData, err)), - }; - - match string.parse() { - Ok(b) => Ok(b), - Err(err) => return Err(IoError::new(IoErrorKind::InvalidData, err)), - } + String::from_utf8(bytes) + .map_err(|err| { + IoError::new(IoErrorKind::InvalidData, err) + }) + .and_then(|s| { + s.parse() + .map_err(|err| IoError::new(IoErrorKind::InvalidData, err)) + }) } #[cfg(test)] @@ -189,16 +198,14 @@ mod tests { fn basic() { let mut core = Core::new().unwrap(); let tcp = TcpConfig::new(core.handle()); - let with_proto = tcp.with_upgrade(IdentifyProtocol { - information: IdentifyInfo { - public_key: vec![1, 2, 3, 4], - protocol_version: "ipfs/1.0.0".to_owned(), - agent_version: "agent/version".to_owned(), - listen_addrs: vec!["/ip4/5.6.7.8/tcp/12345".parse().unwrap()], - observed_addr: "/ip4/1.2.3.4/tcp/9876".parse().unwrap(), - protocols: vec!["ping".to_owned(), "kad".to_owned()], - }, - }); + let with_proto = tcp.with_upgrade(IdentifyProtocol::new(IdentifyInfo { + public_key: vec![1, 2, 3, 4], + protocol_version: "ipfs/1.0.0".to_owned(), + agent_version: "agent/version".to_owned(), + listen_addrs: vec!["/ip4/5.6.7.8/tcp/12345".parse().unwrap()], + observed_addr: "/ip4/1.2.3.4/tcp/9876".parse().unwrap(), + protocols: vec!["ping".to_owned(), "kad".to_owned()], + })); let (server, addr) = with_proto.clone() .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()) From 01b474d35007cdc01e83ec9a69a74e79d5f37728 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 9 Jan 2018 11:52:12 +0100 Subject: [PATCH 11/12] Fix using wrong multistream endpoint --- libp2p-swarm/src/swarm.rs | 3 +++ libp2p-swarm/src/transport.rs | 8 +++++--- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/libp2p-swarm/src/swarm.rs b/libp2p-swarm/src/swarm.rs index 8d1e27bd..7f1cea45 100644 --- a/libp2p-swarm/src/swarm.rs +++ b/libp2p-swarm/src/swarm.rs @@ -35,6 +35,7 @@ pub fn swarm(transport: T, upgrade: C, handler: H) -> (SwarmController, SwarmFuture) where T: MuxedTransport + Clone + 'static, // TODO: 'static :-/ C: ConnectionUpgrade + Clone + 'static, // TODO: 'static :-/ + C::NamesIter: Clone, // TODO: not elegant H: FnMut(C::Output, Multiaddr) -> F, F: IntoFuture, { @@ -170,6 +171,7 @@ pub struct SwarmFuture impl Future for SwarmFuture where T: MuxedTransport + Clone + 'static, // TODO: 'static :-/, C: ConnectionUpgrade + Clone + 'static, // TODO: 'static :-/ + C::NamesIter: Clone, // TODO: not elegant H: FnMut(C::Output, Multiaddr) -> If, If: IntoFuture, F: Future, @@ -186,6 +188,7 @@ impl Future for SwarmFuture self.to_process.push(future::Either::A(handler(connec, client_addr).into_future())); }, Ok(Async::NotReady) => {}, + // TODO: may not be the best idea because we're killing the whole server Err(err) => return Err(err), }; diff --git a/libp2p-swarm/src/transport.rs b/libp2p-swarm/src/transport.rs index 8099f988..15042a6d 100644 --- a/libp2p-swarm/src/transport.rs +++ b/libp2p-swarm/src/transport.rs @@ -811,7 +811,9 @@ where /// This function returns the next incoming substream. You are strongly encouraged to call it /// if you have a muxed transport. pub fn next_incoming(self) -> Box + 'a> - where T: MuxedTransport + where T: MuxedTransport, + C::NamesIter: Clone, // TODO: not elegant + C: Clone, { let upgrade = self.upgrade; @@ -819,8 +821,8 @@ where // Try to negotiate the protocol. .and_then(move |(connection, addr)| { let iter = upgrade.protocol_names() - .map(|(name, id)| (name, ::eq, id)); - let negotiated = multistream_select::dialer_select_proto(connection, iter) + .map::<_, fn(_) -> _>(|(name, id)| (name, ::eq, id)); + let negotiated = multistream_select::listener_select_proto(connection, iter) .map_err(|err| IoError::new(IoErrorKind::Other, err)); negotiated.map(|(upgrade_id, conn)| (upgrade_id, conn, upgrade, addr)) }) From acbe1d0386ea13e6c4c08ddce5208905d12bda2d Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Thu, 28 Dec 2017 18:07:49 +0100 Subject: [PATCH 12/12] No longer use deprecated function Multiaddr::new --- example/examples/echo-server.rs | 2 +- libp2p-peerstore/README.md | 4 +- libp2p-peerstore/src/lib.rs | 4 +- libp2p-peerstore/src/peer_info.rs | 2 +- libp2p-peerstore/src/peerstore_tests.rs | 14 ++-- libp2p-ping/README.md | 2 +- libp2p-ping/src/lib.rs | 2 +- libp2p-secio/README.md | 2 +- libp2p-secio/src/lib.rs | 2 +- libp2p-swarm/README.md | 2 +- libp2p-swarm/src/lib.rs | 2 +- libp2p-tcp-transport/src/lib.rs | 23 +++--- rust-multiaddr/README.md | 2 +- rust-multiaddr/src/lib.rs | 96 ++++++++++--------------- rust-multiaddr/tests/lib.rs | 26 +++---- 15 files changed, 81 insertions(+), 104 deletions(-) diff --git a/example/examples/echo-server.rs b/example/examples/echo-server.rs index 99e7aa62..d185c364 100644 --- a/example/examples/echo-server.rs +++ b/example/examples/echo-server.rs @@ -117,7 +117,7 @@ fn main() { // We now use the controller to listen on the address. let address = swarm_controller - .listen_on(swarm::Multiaddr::new(&listen_addr).expect("invalid multiaddr")) + .listen_on(listen_addr.parse().expect("invalid multiaddr")) // If the multiaddr protocol exists but is not supported, then we get an error containing // the original multiaddress. .expect("unsupported multiaddr"); diff --git a/libp2p-peerstore/README.md b/libp2p-peerstore/README.md index cbde2e99..f5d3c9cb 100644 --- a/libp2p-peerstore/README.md +++ b/libp2p-peerstore/README.md @@ -34,7 +34,7 @@ let peer_id = vec![1, 2, 3, 4]; // `peer_or_create` mutably borrows the peerstore, so we have to do it in a local scope. let mut peer = peerstore.peer_or_create(&peer_id); peer.set_pub_key(vec![60, 90, 120, 150]); - peer.add_addr(Multiaddr::new("/ip4/10.11.12.13/tcp/20000").unwrap(), + peer.add_addr("/ip4/10.11.12.13/tcp/20000".parse::().unwrap(), Duration::from_millis(5000)); } @@ -43,6 +43,6 @@ let peer_id = vec![1, 2, 3, 4]; let mut peer = peerstore.peer(&peer_id).expect("peer doesn't exist in the peerstore"); assert_eq!(peer.get_pub_key().unwrap(), &[60, 90, 120, 150]); assert_eq!(peer.addrs().collect::>(), - &[Multiaddr::new("/ip4/10.11.12.13/tcp/20000").unwrap()]); + &["/ip4/10.11.12.13/tcp/20000".parse::().unwrap()]); } ``` diff --git a/libp2p-peerstore/src/lib.rs b/libp2p-peerstore/src/lib.rs index 6a329eaa..ea7598c0 100644 --- a/libp2p-peerstore/src/lib.rs +++ b/libp2p-peerstore/src/lib.rs @@ -55,7 +55,7 @@ //! // `peer_or_create` mutably borrows the peerstore, so we have to do it in a local scope. //! let mut peer = peerstore.peer_or_create(&peer_id); //! peer.set_pub_key(vec![60, 90, 120, 150]); -//! peer.add_addr(Multiaddr::new("/ip4/10.11.12.13/tcp/20000").unwrap(), +//! peer.add_addr("/ip4/10.11.12.13/tcp/20000".parse::().unwrap(), //! Duration::from_millis(5000)); //! } //! @@ -64,7 +64,7 @@ //! let mut peer = peerstore.peer(&peer_id).expect("peer doesn't exist in the peerstore"); //! assert_eq!(peer.get_pub_key().unwrap(), &[60, 90, 120, 150]); //! assert_eq!(peer.addrs().collect::>(), -//! &[Multiaddr::new("/ip4/10.11.12.13/tcp/20000").unwrap()]); +//! &["/ip4/10.11.12.13/tcp/20000".parse::().unwrap()]); //! } //! # } //! ``` diff --git a/libp2p-peerstore/src/peer_info.rs b/libp2p-peerstore/src/peer_info.rs index ff23d3af..924952d7 100644 --- a/libp2p-peerstore/src/peer_info.rs +++ b/libp2p-peerstore/src/peer_info.rs @@ -162,7 +162,7 @@ impl<'de> Deserialize<'de> for PeerInfo { let addrs = { let mut out = Vec::with_capacity(interm.addrs.len()); for (addr, since_epoch) in interm.addrs { - let addr = match Multiaddr::new(&addr) { + let addr = match addr.parse::() { Ok(a) => a, Err(err) => return Err(DeserializerError::custom(err)), }; diff --git a/libp2p-peerstore/src/peerstore_tests.rs b/libp2p-peerstore/src/peerstore_tests.rs index 5c54e41f..08dc30a9 100644 --- a/libp2p-peerstore/src/peerstore_tests.rs +++ b/libp2p-peerstore/src/peerstore_tests.rs @@ -62,7 +62,7 @@ macro_rules! peerstore_tests { $($stmt;)* let peer_store = $create_peerstore; let peer_id = multihash::encode(multihash::Hash::SHA2512, &[1, 2, 3]).unwrap(); - let addr = Multiaddr::new("/ip4/0.0.0.0/tcp/0").unwrap(); + let addr = "/ip4/0.0.0.0/tcp/0".parse::().unwrap(); peer_store.peer_or_create(&peer_id).add_addr(addr.clone(), Duration::from_millis(5000)); @@ -76,7 +76,7 @@ macro_rules! peerstore_tests { $($stmt;)* let peer_store = $create_peerstore; let peer_id = multihash::encode(multihash::Hash::SHA2512, &[1, 2, 3]).unwrap(); - let addr = Multiaddr::new("/ip4/0.0.0.0/tcp/0").unwrap(); + let addr = "/ip4/0.0.0.0/tcp/0".parse::().unwrap(); peer_store.peer_or_create(&peer_id).add_addr(addr.clone(), Duration::from_millis(0)); thread::sleep(Duration::from_millis(2)); @@ -90,7 +90,7 @@ macro_rules! peerstore_tests { $($stmt;)* let peer_store = $create_peerstore; let peer_id = multihash::encode(multihash::Hash::SHA2512, &[1, 2, 3]).unwrap(); - let addr = Multiaddr::new("/ip4/0.0.0.0/tcp/0").unwrap(); + let addr = "/ip4/0.0.0.0/tcp/0".parse::().unwrap(); peer_store.peer_or_create(&peer_id).add_addr(addr.clone(), Duration::from_millis(5000)); peer_store.peer(&peer_id).unwrap().clear_addrs(); @@ -105,8 +105,8 @@ macro_rules! peerstore_tests { let peer_store = $create_peerstore; let peer_id = multihash::encode(multihash::Hash::SHA2512, &[1, 2, 3]).unwrap(); - let addr1 = Multiaddr::new("/ip4/0.0.0.0/tcp/0").unwrap(); - let addr2 = Multiaddr::new("/ip4/0.0.0.1/tcp/0").unwrap(); + let addr1 = "/ip4/0.0.0.0/tcp/0".parse::().unwrap(); + let addr2 = "/ip4/0.0.0.1/tcp/0".parse::().unwrap(); peer_store.peer_or_create(&peer_id).add_addr(addr1.clone(), Duration::from_millis(5000)); peer_store.peer_or_create(&peer_id).add_addr(addr2.clone(), Duration::from_millis(5000)); @@ -124,8 +124,8 @@ macro_rules! peerstore_tests { let peer_store = $create_peerstore; let peer_id = multihash::encode(multihash::Hash::SHA2512, &[1, 2, 3]).unwrap(); - let addr1 = Multiaddr::new("/ip4/0.0.0.0/tcp/0").unwrap(); - let addr2 = Multiaddr::new("/ip4/0.0.0.1/tcp/0").unwrap(); + let addr1 = "/ip4/0.0.0.0/tcp/0".parse::().unwrap(); + let addr2 = "/ip4/0.0.0.1/tcp/0".parse::().unwrap(); peer_store.peer_or_create(&peer_id).add_addr(addr1.clone(), Duration::from_millis(5000)); peer_store.peer_or_create(&peer_id).add_addr(addr2.clone(), Duration::from_millis(5000)); diff --git a/libp2p-ping/README.md b/libp2p-ping/README.md index 620235c7..a88f5987 100644 --- a/libp2p-ping/README.md +++ b/libp2p-ping/README.md @@ -43,7 +43,7 @@ let mut core = tokio_core::reactor::Core::new().unwrap(); let ping_finished_future = libp2p_tcp_transport::TcpConfig::new(core.handle()) .with_upgrade(Ping) - .dial(libp2p_swarm::Multiaddr::new("127.0.0.1:12345").unwrap()).unwrap_or_else(|_| panic!()) + .dial("127.0.0.1:12345".parse::().unwrap()).unwrap_or_else(|_| panic!()) .and_then(|(mut pinger, service)| { pinger.ping().map_err(|_| panic!()).select(service).map_err(|_| panic!()) }); diff --git a/libp2p-ping/src/lib.rs b/libp2p-ping/src/lib.rs index 98c30f29..21c64ea7 100644 --- a/libp2p-ping/src/lib.rs +++ b/libp2p-ping/src/lib.rs @@ -67,7 +67,7 @@ //! //! let ping_finished_future = libp2p_tcp_transport::TcpConfig::new(core.handle()) //! .with_upgrade(Ping) -//! .dial(libp2p_swarm::Multiaddr::new("127.0.0.1:12345").unwrap()).unwrap_or_else(|_| panic!()) +//! .dial("127.0.0.1:12345".parse::().unwrap()).unwrap_or_else(|_| panic!()) //! .and_then(|(mut pinger, service)| { //! pinger.ping().map_err(|_| panic!()).select(service).map_err(|_| panic!()) //! }); diff --git a/libp2p-secio/README.md b/libp2p-secio/README.md index a39e8752..37b07204 100644 --- a/libp2p-secio/README.md +++ b/libp2p-secio/README.md @@ -37,7 +37,7 @@ let transport = TcpConfig::new(core.handle()) } }); -let future = transport.dial(Multiaddr::new("/ip4/127.0.0.1/tcp/12345").unwrap()) +let future = transport.dial("/ip4/127.0.0.1/tcp/12345".parse::().unwrap()) .unwrap_or_else(|_| panic!("Unable to dial node")) .and_then(|connection| { // Sends "hello world" on the connection, will be encrypted. diff --git a/libp2p-secio/src/lib.rs b/libp2p-secio/src/lib.rs index bcece16f..95c0cbcc 100644 --- a/libp2p-secio/src/lib.rs +++ b/libp2p-secio/src/lib.rs @@ -58,7 +58,7 @@ //! } //! }); //! -//! let future = transport.dial(Multiaddr::new("/ip4/127.0.0.1/tcp/12345").unwrap()) +//! let future = transport.dial("/ip4/127.0.0.1/tcp/12345".parse::().unwrap()) //! .unwrap_or_else(|_| panic!("Unable to dial node")) //! .and_then(|connection| { //! // Sends "hello world" on the connection, will be encrypted. diff --git a/libp2p-swarm/README.md b/libp2p-swarm/README.md index cd13dde4..62ebae13 100644 --- a/libp2p-swarm/README.md +++ b/libp2p-swarm/README.md @@ -124,7 +124,7 @@ let ping_finished_future = libp2p_tcp_transport::TcpConfig::new(core.handle()) .with_upgrade(Ping) // TODO: right now the only available protocol is ping, but we want to replace it with // something that is more simple to use - .dial(libp2p_swarm::Multiaddr::new("127.0.0.1:12345").unwrap()).unwrap_or_else(|_| panic!()) + .dial("127.0.0.1:12345".parse::().unwrap()).unwrap_or_else(|_| panic!()) .and_then(|(mut pinger, service)| { pinger.ping().map_err(|_| panic!()).select(service).map_err(|_| panic!()) }); diff --git a/libp2p-swarm/src/lib.rs b/libp2p-swarm/src/lib.rs index fd5041f5..2171f665 100644 --- a/libp2p-swarm/src/lib.rs +++ b/libp2p-swarm/src/lib.rs @@ -148,7 +148,7 @@ //! .with_upgrade(Ping) //! // TODO: right now the only available protocol is ping, but we want to replace it with //! // something that is more simple to use -//! .dial(libp2p_swarm::Multiaddr::new("127.0.0.1:12345").unwrap()).unwrap_or_else(|_| panic!()) +//! .dial("127.0.0.1:12345".parse::().unwrap()).unwrap_or_else(|_| panic!()) //! .and_then(|(mut pinger, service)| { //! pinger.ping().map_err(|_| panic!()).select(service).map_err(|_| panic!()) //! }); diff --git a/libp2p-tcp-transport/src/lib.rs b/libp2p-tcp-transport/src/lib.rs index cc311769..f7e9ed85 100644 --- a/libp2p-tcp-transport/src/lib.rs +++ b/libp2p-tcp-transport/src/lib.rs @@ -168,33 +168,32 @@ mod tests { fn multiaddr_to_tcp_conversion() { use std::net::Ipv6Addr; - assert!(multiaddr_to_socketaddr(&Multiaddr::new("/ip4/127.0.0.1/udp/1234").unwrap()).is_err()); + assert!(multiaddr_to_socketaddr(&"/ip4/127.0.0.1/udp/1234".parse::().unwrap()).is_err()); assert_eq!( - multiaddr_to_socketaddr(&Multiaddr::new("/ip4/127.0.0.1/tcp/12345").unwrap()), + multiaddr_to_socketaddr(&"/ip4/127.0.0.1/tcp/12345".parse::().unwrap()), Ok(SocketAddr::new( IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 12345, )) ); assert_eq!( - multiaddr_to_socketaddr(&Multiaddr::new("/ip4/255.255.255.255/tcp/8080").unwrap()), + multiaddr_to_socketaddr(&"/ip4/255.255.255.255/tcp/8080".parse::().unwrap()), Ok(SocketAddr::new( IpAddr::V4(Ipv4Addr::new(255, 255, 255, 255)), 8080, )) ); assert_eq!( - multiaddr_to_socketaddr(&Multiaddr::new("/ip6/::1/tcp/12345").unwrap()), + multiaddr_to_socketaddr(&"/ip6/::1/tcp/12345".parse::().unwrap()), Ok(SocketAddr::new( IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)), 12345, )) ); assert_eq!( - multiaddr_to_socketaddr(&Multiaddr::new( - "/ip6/ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff/tcp/8080", - ).unwrap()), + multiaddr_to_socketaddr(&"/ip6/ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff/tcp/8080" + .parse::().unwrap()), Ok(SocketAddr::new( IpAddr::V6(Ipv6Addr::new( 65535, @@ -217,7 +216,7 @@ mod tests { std::thread::spawn(move || { let mut core = Core::new().unwrap(); - let addr = Multiaddr::new("/ip4/127.0.0.1/tcp/12345").unwrap(); + let addr = "/ip4/127.0.0.1/tcp/12345".parse::().unwrap(); let tcp = TcpConfig::new(core.handle()); let handle = core.handle(); let listener = tcp.listen_on(addr).unwrap().0.for_each(|(sock, _)| { @@ -238,7 +237,7 @@ mod tests { core.run(listener).unwrap(); }); std::thread::sleep(std::time::Duration::from_millis(100)); - let addr = Multiaddr::new("/ip4/127.0.0.1/tcp/12345").unwrap(); + let addr = "/ip4/127.0.0.1/tcp/12345".parse::().unwrap(); let mut core = Core::new().unwrap(); let tcp = TcpConfig::new(core.handle()); // Obtain a future socket through dialing @@ -261,7 +260,7 @@ mod tests { let core = Core::new().unwrap(); let tcp = TcpConfig::new(core.handle()); - let addr = Multiaddr::new("/ip4/127.0.0.1/tcp/0").unwrap(); + let addr = "/ip4/127.0.0.1/tcp/0".parse::().unwrap(); assert!(addr.to_string().contains("tcp/0")); let (_, new_addr) = tcp.listen_on(addr).unwrap(); @@ -273,7 +272,7 @@ mod tests { let core = Core::new().unwrap(); let tcp = TcpConfig::new(core.handle()); - let addr = Multiaddr::new("/ip6/::1/tcp/0").unwrap(); + let addr: Multiaddr = "/ip6/::1/tcp/0".parse().unwrap(); assert!(addr.to_string().contains("tcp/0")); let (_, new_addr) = tcp.listen_on(addr).unwrap(); @@ -285,7 +284,7 @@ mod tests { let core = Core::new().unwrap(); let tcp = TcpConfig::new(core.handle()); - let addr = Multiaddr::new("/ip4/127.0.0.1/tcp/12345/tcp/12345").unwrap(); + let addr = "/ip4/127.0.0.1/tcp/12345/tcp/12345".parse::().unwrap(); assert!(tcp.listen_on(addr).is_err()); } } diff --git a/rust-multiaddr/README.md b/rust-multiaddr/README.md index d611edc7..56e5a80f 100644 --- a/rust-multiaddr/README.md +++ b/rust-multiaddr/README.md @@ -38,7 +38,7 @@ extern crate multiaddr; use multiaddr::{Multiaddr, ToMultiaddr}; -let address = Multiaddr::new("/ip4/127.0.0.1/udp/1234").unwrap(); +let address = "/ip4/127.0.0.1/udp/1234".parse::().unwrap(); // or directly from a string let other = "/ip4/127.0.0.1".to_multiaddr().unwrap(); diff --git a/rust-multiaddr/src/lib.rs b/rust-multiaddr/src/lib.rs index 617556ee..f80adb2d 100644 --- a/rust-multiaddr/src/lib.rs +++ b/rust-multiaddr/src/lib.rs @@ -38,7 +38,7 @@ impl fmt::Display for Multiaddr { /// ``` /// use multiaddr::Multiaddr; /// - /// let address = Multiaddr::new("/ip4/127.0.0.1/udt").unwrap(); + /// let address: Multiaddr = "/ip4/127.0.0.1/udt".parse().unwrap(); /// assert_eq!(address.to_string(), "/ip4/127.0.0.1/udt"); /// ``` /// @@ -52,51 +52,6 @@ impl fmt::Display for Multiaddr { } impl Multiaddr { - /// Create a new multiaddr based on a string representation, like - /// `/ip4/127.0.0.1/udp/1234`. - /// - /// # Examples - /// - /// Simple construction - /// - /// ``` - /// use multiaddr::Multiaddr; - /// - /// let address = Multiaddr::new("/ip4/127.0.0.1/udp/1234").unwrap(); - /// assert_eq!(address.to_bytes(), [ - /// 4, 127, 0, 0, 1, - /// 17, 4, 210 - /// ]); - /// ``` - /// - #[deprecated(note = "Use `string.parse()` instead")] - pub fn new(input: &str) -> Result { - let mut bytes = Vec::new(); - - let mut parts = input.split('/'); - // A multiaddr must start with `/` - if !parts.next().ok_or(Error::InvalidMultiaddr)?.is_empty() { - return Err(Error::InvalidMultiaddr); - } - - while let Some(part) = parts.next() { - let protocol: ProtocolId = part.parse()?; - let addr_component = match protocol.size() { - ProtocolArgSize::Fixed { bytes: 0 } => { - protocol.parse_data("")? // TODO: bad design - }, - _ => { - let data = parts.next().ok_or(Error::MissingAddress)?; - protocol.parse_data(data)? - }, - }; - - addr_component.write_bytes(&mut bytes).expect("writing to a Vec never fails"); - } - - Ok(Multiaddr { bytes: bytes }) - } - /// Return a copy to disallow changing the bytes directly pub fn to_bytes(&self) -> Vec { self.bytes.to_owned() @@ -116,7 +71,7 @@ impl Multiaddr { /// ``` /// use multiaddr::{Multiaddr, ProtocolId}; /// - /// let address = Multiaddr::new("/ip4/127.0.0.1").unwrap(); + /// let address: Multiaddr = "/ip4/127.0.0.1".parse().unwrap(); /// assert_eq!(address.protocol(), vec![ProtocolId::IP4]); /// ``` /// @@ -133,9 +88,9 @@ impl Multiaddr { /// ``` /// use multiaddr::Multiaddr; /// - /// let address = Multiaddr::new("/ip4/127.0.0.1").unwrap(); + /// let address: Multiaddr = "/ip4/127.0.0.1".parse().unwrap(); /// let nested = address.encapsulate("/udt").unwrap(); - /// assert_eq!(nested, Multiaddr::new("/ip4/127.0.0.1/udt").unwrap()); + /// assert_eq!(nested, "/ip4/127.0.0.1/udt".parse().unwrap()); /// ``` /// pub fn encapsulate(&self, input: T) -> Result { @@ -154,9 +109,9 @@ impl Multiaddr { /// ``` /// use multiaddr::{Multiaddr, ToMultiaddr}; /// - /// let address = Multiaddr::new("/ip4/127.0.0.1/udt/sctp/5678").unwrap(); + /// let address: Multiaddr = "/ip4/127.0.0.1/udt/sctp/5678".parse().unwrap(); /// let unwrapped = address.decapsulate("/udt").unwrap(); - /// assert_eq!(unwrapped, Multiaddr::new("/ip4/127.0.0.1").unwrap()); + /// assert_eq!(unwrapped, "/ip4/127.0.0.1".parse().unwrap()); /// /// assert_eq!( /// address.decapsulate("/udt").unwrap(), @@ -281,8 +236,31 @@ impl FromStr for Multiaddr { type Err = Error; #[inline] - fn from_str(s: &str) -> Result { - Multiaddr::new(s) + fn from_str(input: &str) -> Result { + let mut bytes = Vec::new(); + + let mut parts = input.split('/'); + // A multiaddr must start with `/` + if !parts.next().ok_or(Error::InvalidMultiaddr)?.is_empty() { + return Err(Error::InvalidMultiaddr); + } + + while let Some(part) = parts.next() { + let protocol: ProtocolId = part.parse()?; + let addr_component = match protocol.size() { + ProtocolArgSize::Fixed { bytes: 0 } => { + protocol.parse_data("")? // TODO: bad design + }, + _ => { + let data = parts.next().ok_or(Error::MissingAddress)?; + protocol.parse_data(data)? + }, + }; + + addr_component.write_bytes(&mut bytes).expect("writing to a Vec never fails"); + } + + Ok(Multiaddr { bytes: bytes }) } } @@ -337,14 +315,14 @@ impl ToMultiaddr for SocketAddr { impl ToMultiaddr for SocketAddrV4 { fn to_multiaddr(&self) -> Result { - Multiaddr::new(&format!("/ip4/{}/tcp/{}", self.ip(), self.port())) + format!("/ip4/{}/tcp/{}", self.ip(), self.port()).parse() } } impl ToMultiaddr for SocketAddrV6 { fn to_multiaddr(&self) -> Result { // TODO: Should how should we handle `flowinfo` and `scope_id`? - Multiaddr::new(&format!("/ip6/{}/tcp/{}", self.ip(), self.port())) + format!("/ip6/{}/tcp/{}", self.ip(), self.port()).parse() } } @@ -359,25 +337,25 @@ impl ToMultiaddr for IpAddr { impl ToMultiaddr for Ipv4Addr { fn to_multiaddr(&self) -> Result { - Multiaddr::new(&format!("/ip4/{}", &self)) + format!("/ip4/{}", &self).parse() } } impl ToMultiaddr for Ipv6Addr { fn to_multiaddr(&self) -> Result { - Multiaddr::new(&format!("/ip6/{}", &self)) + format!("/ip6/{}", &self).parse() } } impl ToMultiaddr for String { fn to_multiaddr(&self) -> Result { - Multiaddr::new(self) + self.parse() } } impl<'a> ToMultiaddr for &'a str { fn to_multiaddr(&self) -> Result { - Multiaddr::new(self) + self.parse() } } diff --git a/rust-multiaddr/tests/lib.rs b/rust-multiaddr/tests/lib.rs index 6525e23f..fc4604d1 100644 --- a/rust-multiaddr/tests/lib.rs +++ b/rust-multiaddr/tests/lib.rs @@ -17,20 +17,20 @@ fn protocol_to_name() { fn assert_bytes(source: &str, target: &str, protocols: Vec) -> () { - let address = Multiaddr::new(source).unwrap(); + let address = source.parse::().unwrap(); assert_eq!(hex::encode(address.to_bytes().as_slice()), target); - assert_eq!(address.protocol(), protocols); + assert_eq!(address.iter().map(|addr| addr.protocol_id()).collect::>(), protocols); } fn ma_valid(source: &str, target: &str, protocols: Vec) -> () { assert_bytes(source, target, protocols); - assert_eq!(Multiaddr::new(source).unwrap().to_string(), source); + assert_eq!(source.parse::().unwrap().to_string(), source); } #[test] fn multiaddr_eq() { - let m1 = Multiaddr::new("/ip4/127.0.0.1/udp/1234").unwrap(); - let m2 = Multiaddr::new("/ip4/127.0.0.1/tcp/1234").unwrap(); - let m3 = Multiaddr::new("/ip4/127.0.0.1/tcp/1234").unwrap(); + let m1 = "/ip4/127.0.0.1/udp/1234".parse::().unwrap(); + let m2 = "/ip4/127.0.0.1/tcp/1234".parse::().unwrap(); + let m3 = "/ip4/127.0.0.1/tcp/1234".parse::().unwrap(); assert_ne!(m1, m2); assert_ne!(m2, m1); @@ -135,7 +135,7 @@ fn construct_fail() { "/p2p-circuit/50"]; for address in &addresses { - assert!(Multiaddr::new(address).is_err(), address.to_string()); + assert!(address.parse::().is_err(), address.to_string()); } } @@ -143,17 +143,17 @@ fn construct_fail() { #[test] fn to_multiaddr() { assert_eq!(Ipv4Addr::new(127, 0, 0, 1).to_multiaddr().unwrap(), - Multiaddr::new("/ip4/127.0.0.1").unwrap()); + "/ip4/127.0.0.1".parse::().unwrap()); assert_eq!(Ipv6Addr::new(0x2601, 0x9, 0x4f81, 0x9700, 0x803e, 0xca65, 0x66e8, 0xc21) .to_multiaddr() .unwrap(), - Multiaddr::new("/ip6/2601:9:4f81:9700:803e:ca65:66e8:c21").unwrap()); + "/ip6/2601:9:4f81:9700:803e:ca65:66e8:c21".parse::().unwrap()); assert_eq!("/ip4/127.0.0.1/tcp/1234".to_string().to_multiaddr().unwrap(), - Multiaddr::new("/ip4/127.0.0.1/tcp/1234").unwrap()); + "/ip4/127.0.0.1/tcp/1234".parse::().unwrap()); assert_eq!("/ip6/2601:9:4f81:9700:803e:ca65:66e8:c21".to_multiaddr().unwrap(), - Multiaddr::new("/ip6/2601:9:4f81:9700:803e:ca65:66e8:c21").unwrap()); + "/ip6/2601:9:4f81:9700:803e:ca65:66e8:c21".parse::().unwrap()); assert_eq!(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 1234).to_multiaddr().unwrap(), - Multiaddr::new("/ip4/127.0.0.1/tcp/1234").unwrap()); + "/ip4/127.0.0.1/tcp/1234".parse::().unwrap()); assert_eq!(SocketAddrV6::new(Ipv6Addr::new(0x2601, 0x9, 0x4f81, @@ -167,5 +167,5 @@ fn to_multiaddr() { 0) .to_multiaddr() .unwrap(), - Multiaddr::new("/ip6/2601:9:4f81:9700:803e:ca65:66e8:c21/tcp/1234").unwrap()); + "/ip6/2601:9:4f81:9700:803e:ca65:66e8:c21/tcp/1234".parse::().unwrap()); }