diff --git a/.circleci/config.yml b/.circleci/config.yml index 9e15501c..3eaea96b 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -105,10 +105,6 @@ jobs: - checkout - restore_cache: key: integration-test-cache - - run: - command: cargo run --example ping-client -- /ip4/127.0.0.1/tcp/4001 - - run: - command: cargo run --example echo-dialer -- /ip4/127.0.0.1/tcp/10333 - save_cache: key: integration-test-cache paths: diff --git a/Cargo.toml b/Cargo.toml index 14bd9423..0d95e231 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,28 +47,6 @@ tokio-current-thread = "0.1" tokio-io = "0.1" tokio-stdin = "0.1" -[[example]] -name = "echo-dialer" - -[[example]] -name = "echo-server" - -[[example]] -name = "floodsub" - -[[example]] -name = "kademlia" - -[[example]] -name = "ping-client" - -[[example]] -name = "random_peerid" - -[[example]] -name = "relay" - - [workspace] members = [ "core", diff --git a/core/src/connection_reuse.rs b/core/src/connection_reuse.rs deleted file mode 100644 index 68287dc0..00000000 --- a/core/src/connection_reuse.rs +++ /dev/null @@ -1,816 +0,0 @@ -// Copyright 2018 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -//! Contains the `ConnectionReuse` struct. Stores open muxed connections to nodes so that dialing -//! a node reuses the same connection instead of opening a new one. -//! -//! A `ConnectionReuse` can only be created from an `UpgradedNode` whose `ConnectionUpgrade` -//! yields as `StreamMuxer`. -//! -//! # Behaviour -//! -//! The API exposed by the `ConnectionReuse` struct consists in the `Transport` trait -//! implementation, with the `dial` and `listen_on` methods. -//! -//! When called on a `ConnectionReuse`, the `listen_on` method will listen on the given -//! multiaddress (by using the underlying `Transport`), then will apply a `flat_map` on the -//! incoming connections so that we actually listen to the incoming substreams of each connection. -//! -//! When called on a `ConnectionReuse`, the `dial` method will try to use a connection that has -//! already been opened earlier, and open an outgoing substream on it. If none is available, it -//! will dial the given multiaddress. Dialed node can also spontaneously open new substreams with -//! us. In order to handle these new substreams you should use the `next_incoming` method of the -//! `MuxedTransport` trait. - -use fnv::FnvHashMap; -use futures::future::{self, FutureResult}; -use futures::{Async, Future, Poll, Stream, stream, task}; -use futures::stream::FuturesUnordered; -use multiaddr::Multiaddr; -use muxing::{self, StreamMuxer}; -use parking_lot::Mutex; -use std::collections::hash_map::Entry; -use std::io::{Error as IoError, ErrorKind as IoErrorKind, Read, Write}; -use std::mem; -use std::ops::{Deref, DerefMut}; -use std::sync::{Arc, atomic::AtomicUsize, atomic::Ordering}; -use tokio_io::{AsyncRead, AsyncWrite}; -use transport::{MuxedTransport, Transport}; - -/// Allows reusing the same muxed connection multiple times. -/// -/// Can be created from an `UpgradedNode` through the `From` trait. -/// -/// Implements the `Transport` trait. -pub struct ConnectionReuse -where - T: Transport, - T: Transport, - M: StreamMuxer, -{ - /// Struct shared between most of the `ConnectionReuse` infrastructure. - shared: Arc>>, -} - -impl Clone for ConnectionReuse -where - T: Transport, - T: Transport, - M: StreamMuxer, -{ - #[inline] - fn clone(&self) -> Self { - ConnectionReuse { - shared: self.shared.clone(), - } - } -} - -/// Struct shared between most of the `ConnectionReuse` infrastructure. -struct Shared -where - T: Transport, - T: Transport, - M: StreamMuxer, -{ - /// Underlying transport and connection upgrade, used when we need to dial or listen. - transport: T, - - /// All the connections that were opened, whether successful and/or active or not. - // TODO: this will grow forever - connections: FnvHashMap>, - - /// Tasks to notify when one or more new elements were added to `connections`. - notify_on_new_connec: FnvHashMap, - - /// Next `connection_id` to use when opening a connection. - next_connection_id: u64, - - /// Next `listener_id` for the next listener we create. - next_listener_id: u64, -} - -enum PeerState where M: StreamMuxer { - /// Connection is active and can be used to open substreams. - Active { - /// The muxer to open new substreams. - muxer: Arc, - /// Custom data passed to the output. - custom_data: D, - /// Future of the address of the client. - client_addr: Multiaddr, - /// Unique identifier for this connection in the `ConnectionReuse`. - connection_id: u64, - /// Number of open substreams. - num_substreams: u64, - /// Id of the listener that created this connection, or `None` if it was opened by a - /// dialer. - listener_id: Option, - }, - - /// Connection is pending. - // TODO: stronger Future type - Pending { - /// Future that produces the muxer. - future: Box + Send>, - /// All the tasks to notify when `future` resolves. - notify: FnvHashMap, - }, - - /// An earlier connection attempt errored. - Errored(IoError), - - /// The `PeerState` is poisonned. Happens if a panic happened while executing some of the - /// functions. - Poisonned, -} - -impl ConnectionReuse -where - T: Transport, - T: Transport, - M: StreamMuxer, -{ - #[inline] - pub(crate) fn new(node: T) -> ConnectionReuse { - ConnectionReuse { - shared: Arc::new(Mutex::new(Shared { - transport: node, - connections: Default::default(), - notify_on_new_connec: Default::default(), - next_connection_id: 0, - next_listener_id: 0, - })), - } - } -} - -impl Transport for ConnectionReuse -where - T: Transport + Send + 'static, // TODO: 'static :( - T::Dial: Send, - T::MultiaddrFuture: Send, - T::Listener: Send, - T::ListenerUpgrade: Send, - T: Transport + Clone + 'static, // TODO: 'static :( - M: Send + Sync + StreamMuxer + 'static, - D: Send + Clone + 'static, - T: Clone, -{ - type Output = (D, ConnectionReuseSubstream); - type MultiaddrFuture = future::FutureResult; - type Listener = Box + Send>; - type ListenerUpgrade = FutureResult<(Self::Output, Self::MultiaddrFuture), IoError>; - type Dial = ConnectionReuseDial; - - fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { - let mut shared = self.shared.lock(); - - let (listener, new_addr) = match shared.transport.clone().listen_on(addr.clone()) { - Ok((l, a)) => (l, a), - Err((_, addr)) => { - return Err(( - ConnectionReuse { - shared: self.shared.clone(), - }, - addr, - )); - } - }; - - let listener = listener - .map(|upgr| { - upgr.and_then(|(out, addr)| { - trace!("Waiting for remote's address as listener"); - addr.map(move |addr| (out, addr)) - }) - }) - .fuse(); - - let listener_id = shared.next_listener_id; - shared.next_listener_id += 1; - - let listener = ConnectionReuseListener { - shared: self.shared.clone(), - listener, - listener_id, - current_upgrades: FuturesUnordered::new(), - }; - - Ok((Box::new(listener) as Box<_>, new_addr)) - } - - #[inline] - fn dial(self, addr: Multiaddr) -> Result { - let mut shared = self.shared.lock(); - - // If an earlier attempt to dial this multiaddress failed, we clear the error. Otherwise - // the returned `Future` will immediately produce the error. - let must_clear = match shared.connections.get(&addr) { - Some(&PeerState::Errored(ref err)) => { - trace!("Clearing existing connection to {} which errored earlier: {:?}", addr, err); - true - }, - _ => false, - }; - if must_clear { - shared.connections.remove(&addr); - } - - Ok(ConnectionReuseDial { - outbound: None, - shared: self.shared.clone(), - addr, - }) - } - - #[inline] - fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { - self.shared.lock().transport.nat_traversal(server, observed) - } -} - -impl MuxedTransport for ConnectionReuse -where - T: Transport + Send + 'static, // TODO: 'static :( - T::Dial: Send, - T::MultiaddrFuture: Send, - T::Listener: Send, - T::ListenerUpgrade: Send, - T: Transport + Clone + 'static, // TODO: 'static :( - M: Send + Sync + StreamMuxer + 'static, - D: Send + Clone + 'static, - T: Clone, -{ - type Incoming = ConnectionReuseIncoming; - type IncomingUpgrade = - future::FutureResult<((D, ConnectionReuseSubstream), Self::MultiaddrFuture), IoError>; - - #[inline] - fn next_incoming(self) -> Self::Incoming { - ConnectionReuseIncoming { - shared: self.shared.clone(), - } - } -} - -static NEXT_TASK_ID: AtomicUsize = AtomicUsize::new(0); -// `TASK_ID` is used internally to uniquely identify each task. -task_local!{ - static TASK_ID: usize = NEXT_TASK_ID.fetch_add(1, Ordering::Relaxed) -} - -/// Implementation of `Future` for dialing a node. -pub struct ConnectionReuseDial -where - T: Transport, - T: Transport, - M: StreamMuxer, -{ - /// The future that will construct the substream, the connection id the muxer comes from, and - /// the `Future` of the client's multiaddr. - /// If `None`, we need to grab a new outbound substream from the muxer. - outbound: Option>, - - // Shared between the whole connection reuse mechanism. - shared: Arc>>, - - // The address we're trying to dial. - addr: Multiaddr, -} - -struct ConnectionReuseDialOut -where - M: StreamMuxer, -{ - /// Custom data for the connection. - custom_data: D, - /// The pending outbound substream. - stream: muxing::OutboundSubstreamRefWrapFuture>, - /// Id of the connection that was used to create the substream. - connection_id: u64, - /// Address of the remote. - client_addr: Multiaddr, -} - -impl Future for ConnectionReuseDial -where - T: Transport + Clone, - M: Send + StreamMuxer + 'static, - D: Send + Clone + 'static, - ::Dial: Send + 'static, - ::MultiaddrFuture: Send + 'static, -{ - type Item = ((D, ConnectionReuseSubstream), FutureResult); - type Error = IoError; - - fn poll(&mut self) -> Poll { - loop { - let should_kill_existing_muxer; - if let Some(mut outbound) = self.outbound.take() { - match outbound.stream.poll() { - Ok(Async::Ready(Some(inner))) => { - trace!("Opened new outgoing substream to {}", self.addr); - let substream = ConnectionReuseSubstream { - connection_id: outbound.connection_id, - shared: self.shared.clone(), - inner, - addr: outbound.client_addr.clone(), - }; - return Ok(Async::Ready(((outbound.custom_data, substream), future::ok(outbound.client_addr)))); - }, - Ok(Async::NotReady) => { - self.outbound = Some(outbound); - return Ok(Async::NotReady); - }, - Ok(Async::Ready(None)) => { - // The muxer can no longer produce outgoing substreams. - // Let's reopen a connection. - trace!("Closing existing connection to {} ; can't produce outgoing substreams", self.addr); - should_kill_existing_muxer = true; - }, - Err(err) => { - // If we get an error while opening a substream, we decide to ignore it - // and open a new muxer. - // If opening the muxer produces an error, *then* we will return it. - debug!("Error while opening outgoing substream to {}: {:?}", self.addr, err); - should_kill_existing_muxer = true; - }, - } - } else { - should_kill_existing_muxer = false; - } - - // If we reach this point, that means we have to fill `self.outbound`. - // If `should_kill_existing_muxer`, do not use any existing connection but create a - // new one instead. - let mut shared = self.shared.lock(); - let shared = &mut *shared; // Avoids borrow errors - - // TODO: could be optimized - if should_kill_existing_muxer { - shared.connections.remove(&self.addr); - } - let connec = match shared.connections.entry(self.addr.clone()) { - Entry::Occupied(e) => e.into_mut(), - Entry::Vacant(e) => { - // Build the connection. - let state = match shared.transport.clone().dial(self.addr.clone()) { - Ok(future) => { - trace!("Opened new connection to {:?}", self.addr); - let future = future.and_then(|(out, addr)| addr.map(move |a| (out, a))); - let future = Box::new(future); - PeerState::Pending { future, notify: Default::default() } - }, - Err(_) => { - trace!("Failed to open connection to {:?}, multiaddr not supported", self.addr); - let err = IoError::new(IoErrorKind::ConnectionRefused, "multiaddr not supported"); - PeerState::Errored(err) - }, - }; - - for task in shared.notify_on_new_connec.drain() { - task.1.notify(); - } - - e.insert(state) - }, - }; - - match mem::replace(&mut *connec, PeerState::Poisonned) { - PeerState::Active { muxer, custom_data, connection_id, listener_id, mut num_substreams, client_addr } => { - let outbound = muxing::outbound_from_ref_and_wrap(muxer.clone()); - num_substreams += 1; - *connec = PeerState::Active { muxer, custom_data: custom_data.clone(), connection_id, listener_id, num_substreams, client_addr: client_addr.clone() }; - trace!("Using existing connection to {} to open outbound substream", self.addr); - self.outbound = Some(ConnectionReuseDialOut { - custom_data, - stream: outbound, - connection_id, - client_addr, - }); - }, - PeerState::Pending { mut future, mut notify } => { - match future.poll() { - Ok(Async::Ready(((custom_data, muxer), client_addr))) => { - trace!("Successful new connection to {} ({})", self.addr, client_addr); - for task in notify { - task.1.notify(); - } - let muxer = Arc::new(muxer); - let first_outbound = muxing::outbound_from_ref_and_wrap(muxer.clone()); - let connection_id = shared.next_connection_id; - shared.next_connection_id += 1; - *connec = PeerState::Active { muxer, custom_data: custom_data.clone(), connection_id, num_substreams: 1, listener_id: None, client_addr: client_addr.clone() }; - self.outbound = Some(ConnectionReuseDialOut { - custom_data, - stream: first_outbound, - connection_id, - client_addr, - }); - }, - Ok(Async::NotReady) => { - notify.insert(TASK_ID.with(|&t| t), task::current()); - *connec = PeerState::Pending { future, notify }; - return Ok(Async::NotReady); - }, - Err(err) => { - trace!("Failed new connection to {}: {:?}", self.addr, err); - let io_err = IoError::new(err.kind(), err.to_string()); - *connec = PeerState::Errored(err); - return Err(io_err); - }, - } - }, - PeerState::Errored(err) => { - trace!("Existing new connection to {} errored earlier: {:?}", self.addr, err); - let io_err = IoError::new(err.kind(), err.to_string()); - *connec = PeerState::Errored(err); - return Err(io_err); - }, - PeerState::Poisonned => { - panic!("Poisonned peer state"); - }, - } - } - } -} - -impl Drop for ConnectionReuseDial -where - T: Transport, - T: Transport, - M: StreamMuxer, -{ - fn drop(&mut self) { - if let Some(outbound) = self.outbound.take() { - let mut shared = self.shared.lock(); - remove_one_substream(&mut *shared, outbound.connection_id, &outbound.client_addr); - } - } -} - -/// Implementation of `Stream` for the connections incoming from listening on a specific address. -pub struct ConnectionReuseListener -where - T: Transport, - T: Transport, - M: StreamMuxer, -{ - /// The main listener. - listener: stream::Fuse, - /// Identifier for this listener. Used to determine which connections were opened by it. - listener_id: u64, - /// Opened connections that need to be upgraded. - current_upgrades: FuturesUnordered + Send>>, - - /// Shared between the whole connection reuse mechanism. - shared: Arc>>, -} - -impl Stream for ConnectionReuseListener -where - T: Transport, - T: Transport, - M: StreamMuxer, - D: Clone, - L: Stream, - Lu: Future + Send + 'static, -{ - type Item = FutureResult<((D, ConnectionReuseSubstream), FutureResult), IoError>; - type Error = IoError; - - fn poll(&mut self) -> Poll, Self::Error> { - // Check for any incoming connection on the listening socket. - // Note that since `self.listener` is a `Fuse`, it's not a problem to continue polling even - // after it is finished or after it error'ed. - loop { - match self.listener.poll() { - Ok(Async::Ready(Some(upgrade))) => { - trace!("New incoming connection"); - self.current_upgrades.push(Box::new(upgrade)); - } - Ok(Async::NotReady) => break, - Ok(Async::Ready(None)) => { - debug!("Listener has been closed"); - break; - } - Err(err) => { - debug!("Error while polling listener: {:?}", err); - return Err(err); - } - }; - } - - // Process the connections being upgraded. - loop { - match self.current_upgrades.poll() { - Ok(Async::Ready(Some(((custom_data, muxer), client_addr)))) => { - // Successfully upgraded a new incoming connection. - trace!("New multiplexed connection from {}", client_addr); - let mut shared = self.shared.lock(); - let muxer = Arc::new(muxer); - let connection_id = shared.next_connection_id; - shared.next_connection_id += 1; - let state = PeerState::Active { muxer, custom_data, connection_id, listener_id: Some(self.listener_id), num_substreams: 1, client_addr: client_addr.clone() }; - shared.connections.insert(client_addr, state); - for to_notify in shared.notify_on_new_connec.drain() { - to_notify.1.notify(); - } - } - Ok(Async::Ready(None)) | Ok(Async::NotReady) => { - break; - }, - Err(err) => { - // Insert the rest of the pending upgrades, but not the current one. - debug!("Error while upgrading listener connection: {:?}", err); - return Ok(Async::Ready(Some(future::err(err)))); - } - } - } - - // Poll all the incoming connections on all the connections we opened. - let mut shared = self.shared.lock(); - match poll_incoming(&self.shared, &mut shared, Some(self.listener_id)) { - Ok(Async::Ready(None)) => { - if self.listener.is_done() && self.current_upgrades.is_empty() { - Ok(Async::Ready(None)) - } else { - Ok(Async::NotReady) - } - }, - Ok(Async::Ready(Some(substream))) => { - Ok(Async::Ready(Some(substream))) - }, - Ok(Async::NotReady) => { - Ok(Async::NotReady) - } - Err(err) => { - Ok(Async::Ready(Some(future::err(err)))) - } - } - } -} - -/// Implementation of `Future` that yields the next incoming substream from a dialed connection. -#[must_use = "futures do nothing unless polled"] -pub struct ConnectionReuseIncoming -where - T: Transport, - T: Transport, - M: StreamMuxer, -{ - // Shared between the whole connection reuse system. - shared: Arc>>, -} - -impl Future for ConnectionReuseIncoming -where - T: Transport, - T: Transport, - M: StreamMuxer, - D: Clone, -{ - type Item = future::FutureResult<((D, ConnectionReuseSubstream), future::FutureResult), IoError>; - type Error = IoError; - - #[inline] - fn poll(&mut self) -> Poll { - let mut shared = self.shared.lock(); - match poll_incoming(&self.shared, &mut shared, None) { - Ok(Async::Ready(Some(substream))) => { - Ok(Async::Ready(substream)) - }, - Ok(Async::Ready(None)) | Ok(Async::NotReady) => { - // TODO: will add an element to the list every time - shared.notify_on_new_connec.insert(TASK_ID.with(|&v| v), task::current()); - Ok(Async::NotReady) - }, - Err(err) => Err(err) - } - } -} - -/// Polls the incoming substreams on all the incoming connections that match the `listener`. -/// -/// Returns `Ready(None)` if no connection is matching the `listener`. Returns `NotReady` if -/// one or more connections are matching the `listener` but they are not ready. -fn poll_incoming(shared_arc: &Arc>>, shared: &mut Shared, listener: Option) - -> Poll), FutureResult), IoError>>, IoError> -where - T: Transport, - T: Transport, - M: StreamMuxer, - D: Clone, -{ - // Keys of the elements in `shared.connections` to remove afterwards. - let mut to_remove = Vec::new(); - // Substream to return, if any found. - let mut ret_value = None; - let mut found_one = false; - - for (addr, state) in shared.connections.iter_mut() { - match *state { - PeerState::Active { ref custom_data, ref muxer, ref mut num_substreams, connection_id, ref client_addr, listener_id } => { - if listener_id != listener { - continue; - } - found_one = true; - - match muxer.poll_inbound() { - Ok(Async::Ready(Some(inner))) => { - trace!("New incoming substream from {}", client_addr); - *num_substreams += 1; - let substream = ConnectionReuseSubstream { - inner: muxing::substream_from_ref(muxer.clone(), inner), - shared: shared_arc.clone(), - connection_id, - addr: client_addr.clone(), - }; - ret_value = Some(Ok(((custom_data.clone(), substream), future::ok(client_addr.clone())))); - break; - }, - Ok(Async::Ready(None)) => { - // The muxer isn't capable of opening any inbound stream anymore, so - // we close the connection entirely. - trace!("Removing existing connection to {} as it cannot open inbound anymore", addr); - to_remove.push(addr.clone()); - }, - Ok(Async::NotReady) => (), - Err(err) => { - // If an error happens while opening an inbound stream, we close the - // connection entirely. - trace!("Error while opening inbound substream to {}: {:?}", addr, err); - to_remove.push(addr.clone()); - ret_value = Some(Err(err)); - break; - }, - } - }, - PeerState::Pending { ref mut notify, .. } => { - notify.insert(TASK_ID.with(|&t| t), task::current()); - }, - PeerState::Errored(_) => {}, - PeerState::Poisonned => { - panic!("Poisonned peer state"); - }, - } - } - - for to_remove in to_remove { - shared.connections.remove(&to_remove); - } - - match ret_value { - Some(Ok(val)) => Ok(Async::Ready(Some(future::ok(val)))), - Some(Err(err)) => Err(err), - None => { - if found_one { - Ok(Async::NotReady) - } else { - Ok(Async::Ready(None)) - } - }, - } -} - -/// Removes one substream from an active connection. Closes the connection if necessary. -fn remove_one_substream(shared: &mut Shared, connec_id: u64, addr: &Multiaddr) -where - T: Transport, - T: Transport, - M: StreamMuxer, -{ - shared.connections.retain(|_, connec| { - if let PeerState::Active { connection_id, ref mut num_substreams, .. } = connec { - if *connection_id == connec_id { - *num_substreams -= 1; - if *num_substreams == 0 { - trace!("All substreams to {} closed ; closing main connection", addr); - return false; - } - } - } - - true - }); -} - -/// Wraps around the `Substream`. -pub struct ConnectionReuseSubstream -where - T: Transport, - T: Transport, - M: StreamMuxer, -{ - inner: muxing::SubstreamRef>, - shared: Arc>>, - /// Id this connection was created from. - connection_id: u64, - /// Address of the remote. - addr: Multiaddr, -} - -impl Deref for ConnectionReuseSubstream -where - T: Transport, - T: Transport, - M: StreamMuxer, -{ - type Target = muxing::SubstreamRef>; - - #[inline] - fn deref(&self) -> &Self::Target { - &self.inner - } -} - -impl DerefMut for ConnectionReuseSubstream -where - T: Transport, - T: Transport, - M: StreamMuxer, -{ - #[inline] - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.inner - } -} - -impl Read for ConnectionReuseSubstream -where - T: Transport, - T: Transport, - M: StreamMuxer, -{ - #[inline] - fn read(&mut self, buf: &mut [u8]) -> Result { - self.inner.read(buf) - } -} - -impl AsyncRead for ConnectionReuseSubstream -where - T: Transport, - T: Transport, - M: StreamMuxer, -{ -} - -impl Write for ConnectionReuseSubstream -where - T: Transport, - T: Transport, - M: StreamMuxer, -{ - #[inline] - fn write(&mut self, buf: &[u8]) -> Result { - self.inner.write(buf) - } - - #[inline] - fn flush(&mut self) -> Result<(), IoError> { - self.inner.flush() - } -} - -impl AsyncWrite for ConnectionReuseSubstream -where - T: Transport, - T: Transport, - M: StreamMuxer, -{ - #[inline] - fn shutdown(&mut self) -> Poll<(), IoError> { - self.inner.shutdown() - } -} - -impl Drop for ConnectionReuseSubstream -where - T: Transport, - T: Transport, - M: StreamMuxer, -{ - fn drop(&mut self) { - let mut shared = self.shared.lock(); - remove_one_substream(&mut *shared, self.connection_id, &self.addr); - } -} diff --git a/core/src/lib.rs b/core/src/lib.rs index 469b09b4..00292c3a 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -164,47 +164,6 @@ //! also implements the `ConnectionUpgrade` trait and will choose one of the protocols amongst the //! ones supported. //! -//! # Swarm -//! -//! Once you have created an object that implements the `Transport` trait, you can put it in a -//! *swarm*. This is done by calling the `swarm()` freestanding function with the transport -//! alongside with a function or a closure that will turn the output of the upgrade (usually an -//! actual protocol, as explained above) into a `Future` producing `()`. -//! -//! ```no_run -//! extern crate futures; -//! extern crate libp2p_ping; -//! extern crate libp2p_core; -//! extern crate libp2p_tcp_transport; -//! extern crate tokio_current_thread; -//! -//! use futures::{Future, Stream}; -//! use libp2p_ping::{Ping, PingOutput}; -//! use libp2p_core::Transport; -//! -//! # fn main() { -//! let transport = libp2p_tcp_transport::TcpConfig::new() -//! .with_dummy_muxing(); -//! -//! let (swarm_controller, swarm_future) = libp2p_core::swarm(transport.with_upgrade(Ping::default()), -//! |out, client_addr| { -//! match out { -//! PingOutput::Ponger(processing) => Box::new(processing) as Box>, -//! PingOutput::Pinger(mut pinger) => { -//! pinger.ping(()); -//! let f = pinger.into_future().map(|_| ()).map_err(|(err, _)| err); -//! Box::new(f) as Box> -//! }, -//! } -//! }); -//! -//! // The `swarm_controller` can then be used to do some operations. -//! swarm_controller.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap()); -//! -//! // Runs until everything is finished. -//! tokio_current_thread::block_on_all(swarm_future.for_each(|_| Ok(()))).unwrap(); -//! # } -//! ``` extern crate bs58; extern crate bytes; @@ -244,11 +203,9 @@ extern crate tokio_mock_task; /// Multi-address re-export. pub extern crate multiaddr; -mod connection_reuse; mod keys_proto; mod peer_id; mod public_key; -mod unique; #[cfg(test)] mod tests; @@ -256,16 +213,12 @@ mod tests; pub mod either; pub mod muxing; pub mod nodes; -pub mod swarm; pub mod transport; pub mod upgrade; -pub use self::connection_reuse::ConnectionReuse; pub use self::multiaddr::Multiaddr; pub use self::muxing::StreamMuxer; pub use self::peer_id::PeerId; pub use self::public_key::PublicKey; -pub use self::swarm::{swarm, SwarmController, SwarmEvents}; pub use self::transport::{MuxedTransport, Transport}; -pub use self::unique::{UniqueConnec, UniqueConnecFuture, UniqueConnecState}; pub use self::upgrade::{ConnectionUpgrade, Endpoint}; diff --git a/core/src/transport/memory.rs b/core/src/transport/memory.rs index 309dee4a..14d53111 100644 --- a/core/src/transport/memory.rs +++ b/core/src/transport/memory.rs @@ -190,91 +190,3 @@ impl Into>> for Chan { RwStreamSink::new(self) } } - -#[cfg(test)] -mod tests { - use bytes::Bytes; - use futures::{future::{self, Either, Loop}, prelude::*, sync::mpsc}; - use std::{io, iter}; - use {transport::memory, swarm, ConnectionUpgrade, Endpoint, Transport}; - use tokio_codec::{BytesCodec, Framed}; - use tokio_current_thread; - - #[test] - fn echo() { - #[derive(Clone)] - struct Echo(mpsc::UnboundedSender<()>); - - impl ConnectionUpgrade, Maf> for Echo { - type NamesIter = iter::Once<(Bytes, ())>; - type UpgradeIdentifier = (); - type Output = (); - type MultiaddrFuture = Maf; - type Future = Box + Send>; - - fn protocol_names(&self) -> Self::NamesIter { - iter::once(("/echo/1.0.0".into(), ())) - } - - fn upgrade(self, chan: memory::Channel, _: (), e: Endpoint, maf: Maf) -> Self::Future { - let chan = Framed::new(chan, BytesCodec::new()); - match e { - Endpoint::Listener => { - let future = future::loop_fn(chan, move |chan| { - chan.into_future() - .map_err(|(e, _)| e) - .and_then(move |(msg, chan)| { - if let Some(msg) = msg { - println!("listener received: {:?}", msg); - Either::A(chan.send(msg.freeze()).map(Loop::Continue)) - } else { - println!("listener received EOF at destination"); - Either::B(future::ok(Loop::Break(()))) - } - }) - }); - Box::new(future.map(move |()| ((), maf))) as Box<_> - } - Endpoint::Dialer => { - let future = chan.send("hello world".into()) - .and_then(|chan| { - chan.into_future().map_err(|(e, _)| e).map(|(n,_ )| n) - }) - .and_then(|msg| { - println!("dialer received: {:?}", msg.unwrap()); - self.0.send(()) - .map(|_| ()) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) - }); - Box::new(future.map(move |()| ((), maf))) as Box<_> - } - } - } - } - - let (finish_tx, finish_rx) = mpsc::unbounded(); - let echo = Echo(finish_tx); - - let (dialer, listener) = memory::connector(); - - let dialer = dialer.with_dummy_muxing().with_upgrade(echo.clone()); - let listener = listener.with_dummy_muxing().with_upgrade(echo); - - let (control, future) = swarm(listener, |sock, _addr| Ok(sock)); - - control.listen_on("/memory".parse().expect("/memory is a valid multiaddr")).unwrap(); - control.dial("/memory".parse().expect("/memory is a valid multiaddr"), dialer).unwrap(); - - let finish_rx = finish_rx.into_future() - .map(|_| ()) - .map_err(|((), _)| io::Error::new(io::ErrorKind::Other, "receive error")); - - let future = future - .for_each(|_| Ok(())) - .select(finish_rx) - .map(|_| ()) - .map_err(|(e, _)| e); - - tokio_current_thread::block_on_all(future).unwrap(); - } -} diff --git a/core/src/transport/mod.rs b/core/src/transport/mod.rs index 77354f20..7679e3f0 100644 --- a/core/src/transport/mod.rs +++ b/core/src/transport/mod.rs @@ -29,10 +29,8 @@ //! `UpgradedNode::or_upgrade` methods, you can combine multiple transports and/or upgrades //! together in a complex chain of protocols negotiation. -use connection_reuse::ConnectionReuse; use futures::prelude::*; use multiaddr::Multiaddr; -use muxing::StreamMuxer; use std::io::Error as IoError; use tokio_io::{AsyncRead, AsyncWrite}; use upgrade::{ConnectionUpgrade, Endpoint}; @@ -243,17 +241,6 @@ pub trait Transport { DummyMuxing::new(self) } - /// Turns this `Transport` into a `ConnectionReuse`. If the `Output` implements the - /// `StreamMuxer` trait, the returned object will implement `Transport` and `MuxedTransport`. - #[inline] - fn into_connection_reuse(self) -> ConnectionReuse - where - Self: Sized + Transport, - M: StreamMuxer, - { - ConnectionReuse::new(self) - } - /// Wraps around the `Transport` and makes it interruptible. #[inline] fn interruptible(self) -> (interruptible::Interruptible, interruptible::Interrupt) diff --git a/core/tests/lots_of_connec.rs b/core/tests/lots_of_connec.rs deleted file mode 100644 index 99a69a9e..00000000 --- a/core/tests/lots_of_connec.rs +++ /dev/null @@ -1,77 +0,0 @@ -// Copyright 2018 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -extern crate bytes; -extern crate futures; -extern crate libp2p_mplex as multiplex; -extern crate libp2p_core; -extern crate libp2p_tcp_transport; -extern crate rand; -extern crate tokio_current_thread; -extern crate tokio_io; - -use futures::{future, future::Future, Stream}; -use libp2p_core::Transport; -use libp2p_tcp_transport::TcpConfig; -use std::sync::{atomic, Arc}; - -#[test] -fn lots_of_swarms() { - let transport = TcpConfig::new().with_dummy_muxing(); - - let mut swarm_controllers = Vec::new(); - let mut swarm_futures = Vec::new(); - - let num_established = Arc::new(atomic::AtomicUsize::new(0)); - - for _ in 0 .. 200 + rand::random::() % 100 { - let esta = num_established.clone(); - let (ctrl, fut) = libp2p_core::swarm( - transport.clone(), - move |socket, _| { - esta.fetch_add(1, atomic::Ordering::SeqCst); - future::ok(socket).join(future::empty::<(), _>()).map(|_| ()) - } - ); - - swarm_controllers.push(ctrl); - swarm_futures.push(fut.for_each(|_| Ok(()))); - } - - let mut addresses = Vec::new(); - for ctrl in &swarm_controllers { - addresses.push(ctrl.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap()); - } - - let mut dial_fut = Vec::new(); - let num_to_establish = 150 + rand::random::() % 150; - for _ in 0 .. num_to_establish { - let ctrl = swarm_controllers.get(rand::random::() % swarm_controllers.len()).unwrap(); - let to_dial = addresses.get(rand::random::() % addresses.len()).unwrap(); - dial_fut.push(ctrl.dial(to_dial.clone(), transport.clone()).unwrap()); - } - - let select_swarm = future::select_all(swarm_futures).map(|_| ()).map_err(|(err, _, _)| err); - let select_dial = future::join_all(dial_fut).map(|_| ()); - let combined = select_swarm.select(select_dial).map(|_| ()).map_err(|(err, _)| err); - tokio_current_thread::block_on_all(combined).unwrap(); - - assert_eq!(num_established.load(atomic::Ordering::SeqCst), num_to_establish * 2); -} diff --git a/core/tests/multiplex.rs b/core/tests/multiplex.rs deleted file mode 100644 index 989216cc..00000000 --- a/core/tests/multiplex.rs +++ /dev/null @@ -1,261 +0,0 @@ -// Copyright 2018 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -extern crate bytes; -extern crate futures; -extern crate libp2p_mplex as multiplex; -extern crate libp2p_core; -extern crate libp2p_tcp_transport; -extern crate tokio_current_thread; -extern crate tokio_io; - -use bytes::BytesMut; -use futures::future::Future; -use futures::{Sink, Stream}; -use libp2p_core::{muxing, Multiaddr, MuxedTransport, Transport, transport}; -use std::sync::{atomic, Arc}; -use std::thread; -use tokio_io::codec::length_delimited::Framed; - -// Ensures that a transport is only ever used once for dialing. -#[derive(Debug)] -struct OnlyOnce(T, atomic::AtomicBool); -impl From for OnlyOnce { - fn from(tp: T) -> OnlyOnce { - OnlyOnce(tp, atomic::AtomicBool::new(false)) - } -} -impl Clone for OnlyOnce { - fn clone(&self) -> Self { - OnlyOnce( - self.0.clone(), - atomic::AtomicBool::new(self.1.load(atomic::Ordering::SeqCst)), - ) - } -} -impl Transport for OnlyOnce { - type Output = T::Output; - type MultiaddrFuture = T::MultiaddrFuture; - type Listener = T::Listener; - type ListenerUpgrade = T::ListenerUpgrade; - type Dial = T::Dial; - fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { - Ok(self.0.listen_on(addr).unwrap_or_else(|_| panic!())) - } - fn dial(self, addr: Multiaddr) -> Result { - assert!(!self.1.swap(true, atomic::Ordering::SeqCst)); - Ok(self.0.dial(addr).unwrap_or_else(|_| panic!())) - } - fn nat_traversal(&self, a: &Multiaddr, b: &Multiaddr) -> Option { - self.0.nat_traversal(a, b) - } -} - -#[test] -fn client_to_server_outbound() { - // A client opens a connection to a server, then an outgoing substream, then sends a message - // on that substream. - - let (tx, rx) = transport::connector(); - - let bg_thread = thread::spawn(move || { - let future = rx - .with_upgrade(multiplex::MplexConfig::new()) - .map(|val, _| ((), val)) - .into_connection_reuse() - .map(|((), val), _| val) - .listen_on("/memory".parse().unwrap()) - .unwrap_or_else(|_| panic!()).0 - .into_future() - .map_err(|(err, _)| err) - .and_then(|(client, _)| client.unwrap()) - .map(|client| client.0) - .map(|client| Framed::<_, BytesMut>::new(client)) - .and_then(|client| { - client - .into_future() - .map_err(|(err, _)| err) - .map(|(msg, _)| msg) - }) - .and_then(|msg| { - let msg = msg.unwrap(); - assert_eq!(msg, "hello world"); - Ok(()) - }); - - tokio_current_thread::block_on_all(future).unwrap(); - }); - - let future = tx - .with_upgrade(multiplex::MplexConfig::new()) - .dial("/memory".parse().unwrap()) - .unwrap_or_else(|_| panic!()) - .and_then(|client| muxing::outbound_from_ref_and_wrap(Arc::new(client.0))) - .map(|server| Framed::<_, BytesMut>::new(server.unwrap())) - .and_then(|server| server.send("hello world".into())) - .map(|_| ()); - - tokio_current_thread::block_on_all(future).unwrap(); - bg_thread.join().unwrap(); -} - -#[test] -fn connection_reused_for_dialing() { - // A client dials the same multiaddress twice in a row. We check that it uses two substreams - // instead of opening two different connections. - - let (tx, rx) = transport::connector(); - - let bg_thread = thread::spawn(move || { - let future = OnlyOnce::from(rx) - .with_upgrade(multiplex::MplexConfig::new()) - .map(|val, _| ((), val)) - .into_connection_reuse() - .map(|((), val), _| val) - .listen_on("/memory".parse().unwrap()) - .unwrap_or_else(|_| panic!()).0 - .into_future() - .map_err(|(err, _)| err) - .and_then(|(client, rest)| client.unwrap().map(move |c| (c.0, rest))) - .map(|(client, rest)| (Framed::<_, BytesMut>::new(client), rest)) - .and_then(|(client, rest)| { - client - .into_future() - .map(|v| (v, rest)) - .map_err(|(err, _)| err) - }) - .and_then(|((msg, _), rest)| { - let msg = msg.unwrap(); - assert_eq!(msg, "hello world"); - Ok(rest) - }) - .flatten_stream() - .into_future() - .map_err(|(err, _)| err) - .and_then(|(client, _)| client.unwrap()) - .map(|client| client.0) - .map(|client| Framed::<_, BytesMut>::new(client)) - .and_then(|client| client.into_future().map_err(|(err, _)| err)) - .and_then(|(msg, _)| { - let msg = msg.unwrap(); - assert_eq!(msg, "second message"); - Ok(()) - }); - - tokio_current_thread::block_on_all(future).unwrap(); - }); - - let transport = OnlyOnce::from(tx) - .with_upgrade(multiplex::MplexConfig::new()) - .map(|val, _| ((), val)) - .into_connection_reuse() - .map(|((), val), _| val); - - let future = transport - .clone() - .dial("/memory".parse().unwrap()) - .unwrap_or_else(|_| panic!()) - .map(|server| Framed::<_, BytesMut>::new(server.0)) - .and_then(|server| server.send("hello world".into())) - .and_then(|first_connec| { - transport - .clone() - .dial("/memory".parse().unwrap()) - .unwrap_or_else(|_| panic!()) - .map(|server| Framed::<_, BytesMut>::new(server.0)) - .map(|server| (first_connec, server)) - }) - .and_then(|(_first, second)| second.send("second message".into())) - .map(|_| ()); - - tokio_current_thread::block_on_all(future).unwrap(); - bg_thread.join().unwrap(); -} - -#[test] -fn use_opened_listen_to_dial() { - // A server waits for an incoming substream and a message on it, then opens an outgoing - // substream on that same connection, that the client has to accept. The client then sends a - // message on that new substream. - - let (tx, rx) = transport::connector(); - - let bg_thread = thread::spawn(move || { - let future = OnlyOnce::from(rx) - .with_upgrade(multiplex::MplexConfig::new()) - .listen_on("/memory".parse().unwrap()) - .unwrap_or_else(|_| panic!()).0 - .into_future() - .map_err(|(err, _)| err) - .and_then(|(client, _)| client.unwrap()) - .map(|client| Arc::new(client.0)) - .and_then(|c| { - let c2 = c.clone(); - muxing::inbound_from_ref_and_wrap(c.clone()).map(move |i| (c2, i)) - }) - .map(|(muxer, client)| (muxer, Framed::<_, BytesMut>::new(client.unwrap()))) - .and_then(|(muxer, client)| { - client - .into_future() - .map(move |msg| (muxer, msg)) - .map_err(|(err, _)| err) - }) - .and_then(|(muxer, (msg, _))| { - let msg = msg.unwrap(); - assert_eq!(msg, "hello world"); - muxing::outbound_from_ref_and_wrap(muxer) - }) - .map(|client| Framed::<_, BytesMut>::new(client.unwrap())) - .and_then(|client| client.into_future().map_err(|(err, _)| err)) - .and_then(|(msg, _)| { - let msg = msg.unwrap(); - assert_eq!(msg, "second message"); - Ok(()) - }); - - tokio_current_thread::block_on_all(future).unwrap(); - }); - - let transport = OnlyOnce::from(tx) - .with_upgrade(multiplex::MplexConfig::new()) - .map(|val, _| ((), val)) - .into_connection_reuse() - .map(|((), val), _| val); - - let future = transport - .clone() - .dial("/memory".parse().unwrap()) - .unwrap_or_else(|_| panic!()) - .map(|server| Framed::<_, BytesMut>::new(server.0)) - .and_then(|server| server.send("hello world".into())) - .and_then(|first_connec| { - transport - .clone() - .next_incoming() - .and_then(|server| server) - .map(|server| Framed::<_, BytesMut>::new(server.0)) - .map(|server| (first_connec, server)) - }) - .and_then(|(_first, second)| second.send("second message".into())) - .map(|_| ()); - - tokio_current_thread::block_on_all(future).unwrap(); - bg_thread.join().unwrap(); -} diff --git a/examples/echo-dialer.rs b/examples/echo-dialer.rs deleted file mode 100644 index 337abfe8..00000000 --- a/examples/echo-dialer.rs +++ /dev/null @@ -1,141 +0,0 @@ -// Copyright 2017 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -extern crate bytes; -extern crate env_logger; -extern crate futures; -extern crate libp2p; -extern crate tokio_codec; -extern crate tokio_current_thread; - -use futures::sync::oneshot; -use futures::{Future, Sink, Stream}; -use std::env; -use libp2p::SimpleProtocol; -use libp2p::core::Transport; -use libp2p::core::{upgrade, either::EitherOutput}; -use libp2p::tcp::TcpConfig; -use tokio_codec::{BytesCodec, Framed}; -use libp2p::websocket::WsConfig; - -fn main() { - env_logger::init(); - - // Determine which address to dial. - let target_addr = env::args() - .nth(1) - .unwrap_or("/ip4/127.0.0.1/tcp/10333".to_owned()); - - // We start by creating a `TcpConfig` that indicates that we want TCP/IP. - let transport = TcpConfig::new() - // In addition to TCP/IP, we also want to support the Websockets protocol on top of TCP/IP. - // The parameter passed to `WsConfig::new()` must be an implementation of `Transport` to be - // used for the underlying multiaddress. - .or_transport(WsConfig::new(TcpConfig::new())) - - // On top of TCP/IP, we will use either the plaintext protocol or the secio protocol, - // depending on which one the remote supports. - .with_upgrade({ - let plain_text = upgrade::PlainTextConfig; - - let secio = { - let private_key = include_bytes!("test-rsa-private-key.pk8"); - let public_key = include_bytes!("test-rsa-public-key.der").to_vec(); - let keypair = libp2p::secio::SecioKeyPair::rsa_from_pkcs8(private_key, public_key).unwrap(); - libp2p::secio::SecioConfig::new(keypair) - - }; - - upgrade::or( - upgrade::map(plain_text, |pt| EitherOutput::First(pt)), - upgrade::map(secio, |out: libp2p::secio::SecioOutput<_>| EitherOutput::Second(out.stream)) - ) - }) - - // On top of plaintext or secio, we will use the multiplex protocol. - .with_upgrade(libp2p::mplex::MplexConfig::new()) - // The object returned by the call to `with_upgrade(MplexConfig::new())` can't be used as a - // `Transport` because the output of the upgrade is not a stream but a controller for - // muxing. We have to explicitly call `into_connection_reuse()` in order to turn this into - // a `Transport`. - .map(|val, _| ((), val)) - .into_connection_reuse() - .map(|((), val), _| val); - - // Building a struct that represents the protocol that we are going to use for dialing. - let proto = SimpleProtocol::new("/echo/1.0.0", |socket| { - // This closure is called whenever a stream using the "echo" protocol has been - // successfully negotiated. The parameter is the raw socket (implements the AsyncRead - // and AsyncWrite traits), and the closure must return an implementation of - // `IntoFuture` that can yield any type of object. - Ok(Framed::new(socket, BytesCodec::new())) - }); - - let (finished_tx, finished_rx) = oneshot::channel(); - let mut finished_tx = Some(finished_tx); - - // Let's put this `transport` into a *swarm*. The swarm will handle all the incoming - // connections for us. The second parameter we pass is the connection upgrade that is accepted - // by the listening part. We don't want to accept anything, so we pass a dummy object that - // represents a connection that is always denied. - let (swarm_controller, swarm_future) = libp2p::core::swarm( - transport.clone().with_upgrade(proto.clone()), - |echo, _client_addr| { - // `echo` is what the closure used when initializing `proto` returns. - // Consequently, please note that the `send` method is available only because the type - // `length_delimited::Framed` has a `send` method. - println!("Sending \"hello world\" to listener"); - let finished_tx = finished_tx.take(); - echo.send("hello world".into()) - // Then listening for one message from the remote. - .and_then(|echo| { - echo.into_future().map_err(|(e, _)| e).map(|(n,_ )| n) - }) - .and_then(move |message| { - println!("Received message from listener: {:?}", message.unwrap()); - if let Some(finished_tx) = finished_tx { - finished_tx.send(()).unwrap(); - } - Ok(()) - }) - }, - ); - - // We now use the controller to dial to the address. - swarm_controller - .dial(target_addr.parse().expect("invalid multiaddr"), transport.with_upgrade(proto)) - // If the multiaddr protocol exists but is not supported, then we get an error containing - // the original multiaddress. - .expect("unsupported multiaddr"); - - // The address we actually listen on can be different from the address that was passed to - // the `listen_on` function. For example if you pass `/ip4/0.0.0.0/tcp/0`, then the port `0` - // will be replaced with the actual port. - - // `swarm_future` is a future that contains all the behaviour that we want, but nothing has - // actually started yet. Because we created the `TcpConfig` with tokio, we need to run the - // future through the tokio core. - let final_future = swarm_future - .for_each(|_| Ok(())) - .select(finished_rx.map_err(|_| unreachable!())) - .map(|_| ()) - .map_err(|(err, _)| err); - tokio_current_thread::block_on_all(final_future).unwrap(); -} diff --git a/examples/echo-server.rs b/examples/echo-server.rs deleted file mode 100644 index a1f22ce8..00000000 --- a/examples/echo-server.rs +++ /dev/null @@ -1,144 +0,0 @@ -// Copyright 2017 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -extern crate bytes; -extern crate env_logger; -extern crate futures; -extern crate libp2p; -extern crate tokio_codec; -extern crate tokio_current_thread; - -use futures::future::{loop_fn, Future, IntoFuture, Loop}; -use futures::{Sink, Stream}; -use std::env; -use libp2p::SimpleProtocol; -use libp2p::core::Transport; -use libp2p::core::{upgrade, either::EitherOutput}; -use libp2p::tcp::TcpConfig; -use tokio_codec::{BytesCodec, Framed}; -use libp2p::websocket::WsConfig; - -fn main() { - env_logger::init(); - - // Determine which address to listen to. - let listen_addr = env::args() - .nth(1) - .unwrap_or("/ip4/0.0.0.0/tcp/10333".to_owned()); - - // We start by creating a `TcpConfig` that indicates that we want TCP/IP. - let transport = TcpConfig::new() - // In addition to TCP/IP, we also want to support the Websockets protocol on top of TCP/IP. - // The parameter passed to `WsConfig::new()` must be an implementation of `Transport` to be - // used for the underlying multiaddress. - .or_transport(WsConfig::new(TcpConfig::new())) - - // On top of TCP/IP, we will use either the plaintext protocol or the secio protocol, - // depending on which one the remote supports. - .with_upgrade({ - let plain_text = upgrade::PlainTextConfig; - - let secio = { - let private_key = include_bytes!("test-rsa-private-key.pk8"); - let public_key = include_bytes!("test-rsa-public-key.der").to_vec(); - let keypair = libp2p::secio::SecioKeyPair::rsa_from_pkcs8(private_key, public_key).unwrap(); - libp2p::secio::SecioConfig::new(keypair) - }; - - upgrade::or( - upgrade::map(plain_text, |pt| EitherOutput::First(pt)), - upgrade::map(secio, |out: libp2p::secio::SecioOutput<_>| EitherOutput::Second(out.stream)) - ) - }) - - // On top of plaintext or secio, we will use the multiplex protocol. - .with_upgrade(libp2p::mplex::MplexConfig::new()) - // The object returned by the call to `with_upgrade(MplexConfig::new())` can't be used as a - // `Transport` because the output of the upgrade is not a stream but a controller for - // muxing. We have to explicitly call `into_connection_reuse()` in order to turn this into - // a `Transport`. - .map(|val, _| ((), val)) - .into_connection_reuse() - .map(|((), val), _| val); - - // We now have a `transport` variable that can be used either to dial nodes or listen to - // incoming connections, and that will automatically apply secio and multiplex on top - // of any opened stream. - - // We now prepare the protocol that we are going to negotiate with nodes that open a connection - // or substream to our server. - let proto = SimpleProtocol::new("/echo/1.0.0", |socket| { - // This closure is called whenever a stream using the "echo" protocol has been - // successfully negotiated. The parameter is the raw socket (implements the AsyncRead - // and AsyncWrite traits), and the closure must return an implementation of - // `IntoFuture` that can yield any type of object. - Ok(Framed::new(socket, BytesCodec::new())) - }); - - // Let's put this `transport` into a *swarm*. The swarm will handle all the incoming and - // outgoing connections for us. - let (swarm_controller, swarm_future) = libp2p::core::swarm( - transport.clone().with_upgrade(proto), - |socket, _client_addr| { - println!("Successfully negotiated protocol"); - - // The type of `socket` is exactly what the closure of `SimpleProtocol` returns. - - // We loop forever in order to handle all the messages sent by the client. - loop_fn(socket, move |socket| { - socket - .into_future() - .map_err(|(e, _)| e) - .and_then(move |(msg, rest)| { - if let Some(msg) = msg { - // One message has been received. We send it back to the client. - println!( - "Received a message: {:?}\n => Sending back \ - identical message to remote", msg - ); - Box::new(rest.send(msg.freeze()).map(|m| Loop::Continue(m))) - as Box> - } else { - // End of stream. Connection closed. Breaking the loop. - println!("Received EOF\n => Dropping connection"); - Box::new(Ok(Loop::Break(())).into_future()) - as Box> - } - }) - }) - }, - ); - - // We now use the controller to listen on the address. - let address = swarm_controller - .listen_on(listen_addr.parse().expect("invalid multiaddr")) - // If the multiaddr protocol exists but is not supported, then we get an error containing - // the original multiaddress. - .expect("unsupported multiaddr"); - // The address we actually listen on can be different from the address that was passed to - // the `listen_on` function. For example if you pass `/ip4/0.0.0.0/tcp/0`, then the port `0` - // will be replaced with the actual port. - println!("Now listening on {:?}", address); - - // `swarm_future` is a future that contains all the behaviour that we want, but nothing has - // actually started yet. Because we created the `TcpConfig` with tokio, we need to run the - // future through the tokio core. - tokio_current_thread::block_on_all(swarm_future.for_each(|_| Ok(()))).unwrap(); -} diff --git a/examples/floodsub.rs b/examples/floodsub.rs deleted file mode 100644 index 5f006a66..00000000 --- a/examples/floodsub.rs +++ /dev/null @@ -1,160 +0,0 @@ -// Copyright 2018 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -extern crate bytes; -extern crate env_logger; -extern crate futures; -extern crate libp2p; -extern crate rand; -extern crate tokio_current_thread; -extern crate tokio_io; -extern crate tokio_stdin; - -use futures::Stream; -use futures::future::Future; -use std::{env, mem}; -use libp2p::core::{either::EitherOutput, upgrade}; -use libp2p::core::{Multiaddr, Transport, PublicKey}; -use libp2p::peerstore::PeerId; -use libp2p::tcp::TcpConfig; -use libp2p::websocket::WsConfig; - -fn main() { - env_logger::init(); - - // Determine which address to listen to. - let listen_addr = env::args() - .nth(1) - .unwrap_or("/ip4/0.0.0.0/tcp/10050".to_owned()); - - // We start by creating a `TcpConfig` that indicates that we want TCP/IP. - let transport = TcpConfig::new() - // In addition to TCP/IP, we also want to support the Websockets protocol on top of TCP/IP. - // The parameter passed to `WsConfig::new()` must be an implementation of `Transport` to be - // used for the underlying multiaddress. - .or_transport(WsConfig::new(TcpConfig::new())) - - // On top of TCP/IP, we will use either the plaintext protocol or the secio protocol, - // depending on which one the remote supports. - .with_upgrade({ - let plain_text = upgrade::PlainTextConfig; - - let secio = { - let private_key = include_bytes!("test-rsa-private-key.pk8"); - let public_key = include_bytes!("test-rsa-public-key.der").to_vec(); - libp2p::secio::SecioConfig::new( - libp2p::secio::SecioKeyPair::rsa_from_pkcs8(private_key, public_key).unwrap() - ) - }; - - upgrade::or( - upgrade::map(plain_text, |pt| EitherOutput::First(pt)), - upgrade::map(secio, |out: libp2p::secio::SecioOutput<_>| EitherOutput::Second(out.stream)) - ) - }) - - // On top of plaintext or secio, we will use the multiplex protocol. - .with_upgrade(libp2p::mplex::MplexConfig::new()) - // The object returned by the call to `with_upgrade(MplexConfig::new())` can't be used as a - // `Transport` because the output of the upgrade is not a stream but a controller for - // muxing. We have to explicitly call `into_connection_reuse()` in order to turn this into - // a `Transport`. - .map(|val, _| ((), val)) - .into_connection_reuse() - .map(|((), val), _| val); - - // We now have a `transport` variable that can be used either to dial nodes or listen to - // incoming connections, and that will automatically apply secio and multiplex on top - // of any opened stream. - - // We now prepare the protocol that we are going to negotiate with nodes that open a connection - // or substream to our server. - let my_id = { - let key = (0..2048).map(|_| rand::random::()).collect::>(); - PeerId::from_public_key(PublicKey::Rsa(key)) - }; - - let (floodsub_upgrade, floodsub_rx) = libp2p::floodsub::FloodSubUpgrade::new(my_id); - - // Let's put this `transport` into a *swarm*. The swarm will handle all the incoming and - // outgoing connections for us. - let (swarm_controller, swarm_future) = libp2p::core::swarm( - transport.clone().with_upgrade(floodsub_upgrade.clone()), - |socket, _| { - println!("Successfully negotiated protocol"); - socket - }, - ); - - let address = swarm_controller - .listen_on(listen_addr.parse().expect("invalid multiaddr")) - .expect("unsupported multiaddr"); - println!("Now listening on {:?}", address); - - let topic = libp2p::floodsub::TopicBuilder::new("chat").build(); - - let floodsub_ctl = libp2p::floodsub::FloodSubController::new(&floodsub_upgrade); - floodsub_ctl.subscribe(&topic); - - let floodsub_rx = floodsub_rx.for_each(|msg| { - if let Ok(msg) = String::from_utf8(msg.data) { - println!("< {}", msg); - } - Ok(()) - }); - - let stdin = { - let mut buffer = Vec::new(); - tokio_stdin::spawn_stdin_stream_unbounded().for_each(move |msg| { - if msg != b'\r' && msg != b'\n' { - buffer.push(msg); - return Ok(()); - } else if buffer.is_empty() { - return Ok(()); - } - - let msg = String::from_utf8(mem::replace(&mut buffer, Vec::new())).unwrap(); - if msg.starts_with("/dial ") { - let target: Multiaddr = msg[6..].parse().unwrap(); - println!("*Dialing {}*", target); - swarm_controller - .dial( - target, - transport.clone().with_upgrade(floodsub_upgrade.clone()), - ) - .unwrap(); - } else { - floodsub_ctl.publish(&topic, msg.into_bytes()); - } - - Ok(()) - }) - }; - - let final_fut = swarm_future - .for_each(|_| Ok(())) - .select(floodsub_rx) - .map(|_| ()) - .map_err(|e| e.0) - .select(stdin.map_err(|_| unreachable!())) - .map(|_| ()) - .map_err(|e| e.0); - tokio_current_thread::block_on_all(final_fut).unwrap(); -} diff --git a/examples/kademlia.rs b/examples/kademlia.rs deleted file mode 100644 index f2c08d06..00000000 --- a/examples/kademlia.rs +++ /dev/null @@ -1,292 +0,0 @@ -// Copyright 2017 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -// Libp2p's code unfortunately produces very large types. Rust's default length limit for type -// names is not large enough, therefore we need this attribute. -#![type_length_limit = "4194304"] - -extern crate bigint; -extern crate bytes; -extern crate env_logger; -extern crate futures; -extern crate libp2p; -extern crate tokio_current_thread; -extern crate tokio_io; -extern crate multiaddr; - -use bigint::U512; -use futures::{Future, Stream}; -use libp2p::peerstore::{PeerAccess, PeerId, Peerstore}; -use multiaddr::Multiaddr; -use std::collections::HashMap; -use std::env; -use std::sync::{Arc, Mutex}; -use std::time::Duration; -use libp2p::core::{Transport, PublicKey, UniqueConnec}; -use libp2p::core::{upgrade, either::EitherOutput}; -use libp2p::kad::{KadConnecConfig, KadConnectionType, KadPeer, KadQueryEvent, KadSystem}; -use libp2p::kad::{KadSystemConfig, KadIncomingRequest}; -use libp2p::tcp::TcpConfig; - -fn main() { - env_logger::init(); - - // Determine which addresses to listen to. - let listen_addrs = { - let mut args = env::args().skip(1).collect::>(); - if args.is_empty() { - args.push("/ip4/0.0.0.0/tcp/0".to_owned()); - } - args - }; - - let peer_store = Arc::new(libp2p::peerstore::memory_peerstore::MemoryPeerstore::empty()); - ipfs_bootstrap(&*peer_store); - - // We create a `TcpConfig` that indicates that we want TCP/IP. - let transport = TcpConfig::new() - - // On top of TCP/IP, we will use either the plaintext protocol or the secio protocol, - // depending on which one the remote supports. - .with_upgrade({ - let plain_text = upgrade::PlainTextConfig; - - let secio = { - let private_key = include_bytes!("test-rsa-private-key.pk8"); - let public_key = include_bytes!("test-rsa-public-key.der").to_vec(); - libp2p::secio::SecioConfig::new( - libp2p::secio::SecioKeyPair::rsa_from_pkcs8(private_key, public_key).unwrap() - ) - }; - - upgrade::or( - upgrade::map(plain_text, |pt| EitherOutput::First(pt)), - upgrade::map(secio, |out: libp2p::secio::SecioOutput<_>| EitherOutput::Second(out.stream)) - ) - }) - - // On top of plaintext or secio, we will use the multiplex protocol. - .with_upgrade(libp2p::mplex::MplexConfig::new()) - // The object returned by the call to `with_upgrade(MplexConfig::new())` can't be used as a - // `Transport` because the output of the upgrade is not a stream but a controller for - // muxing. We have to explicitly call `into_connection_reuse()` in order to turn this into - // a `Transport`. - .map(|val, _| ((), val)) - .into_connection_reuse() - .map(|((), val), _| val); - - let addr_resolver = { - let peer_store = peer_store.clone(); - move |peer_id| { - peer_store - .peer(&peer_id) - .into_iter() - .flat_map(|peer| peer.addrs()) - .collect::>() - .into_iter() - } - }; - - let transport = libp2p::identify::PeerIdTransport::new(transport, addr_resolver) - .and_then({ - let peer_store = peer_store.clone(); - move |id_out, _, remote_addr| { - let socket = id_out.socket; - let original_addr = id_out.original_addr; - id_out.info.map(move |info| { - let peer_id = info.info.public_key.into_peer_id(); - peer_store.peer_or_create(&peer_id).add_addr(original_addr, Duration::from_secs(3600)); - (socket, remote_addr) - }) - } - }); - - // We now have a `transport` variable that can be used either to dial nodes or listen to - // incoming connections, and that will automatically apply secio and multiplex on top - // of any opened stream. - - let my_peer_id = PeerId::from_public_key(PublicKey::Rsa(include_bytes!("test-rsa-public-key.der").to_vec())); - println!("Local peer id is: {:?}", my_peer_id); - - let kad_system = Arc::new(KadSystem::without_init(KadSystemConfig { - parallelism: 3, - local_peer_id: my_peer_id.clone(), - kbuckets_timeout: Duration::from_secs(10), - request_timeout: Duration::from_secs(10), - known_initial_peers: peer_store.peers(), - })); - - let active_kad_connections = Arc::new(Mutex::new(HashMap::<_, UniqueConnec<_>>::new())); - - // Let's put this `transport` into a *swarm*. The swarm will handle all the incoming and - // outgoing connections for us. - let (swarm_controller, swarm_future) = libp2p::core::swarm( - transport.clone().with_upgrade(KadConnecConfig::new()), - { - let peer_store = peer_store.clone(); - let kad_system = kad_system.clone(); - let active_kad_connections = active_kad_connections.clone(); - move |(kad_ctrl, kad_stream), node_addr| { - let peer_store = peer_store.clone(); - let kad_system = kad_system.clone(); - let active_kad_connections = active_kad_connections.clone(); - node_addr.and_then(move |node_addr| { - let node_id = p2p_multiaddr_to_node_id(node_addr); - let node_id2 = node_id.clone(); - let fut = kad_stream.for_each(move |req| { - let peer_store = peer_store.clone(); - kad_system.update_kbuckets(node_id2.clone()); - match req { - KadIncomingRequest::FindNode { searched, responder } => { - let result = kad_system - .known_closest_peers(&searched) - .map(move |peer_id| { - let addrs = peer_store - .peer(&peer_id) - .into_iter() - .flat_map(|p| p.addrs()) - .collect::>(); - KadPeer { - node_id: peer_id.clone(), - multiaddrs: addrs, - connection_ty: KadConnectionType::Connected, // meh :-/ - } - }) - .collect::>(); - responder.respond(result); - }, - KadIncomingRequest::GetProviders { .. } => { - }, - KadIncomingRequest::AddProvider { .. } => { - }, - KadIncomingRequest::PingPong => { - } - }; - Ok(()) - }); - - let mut active_kad_connections = active_kad_connections.lock().unwrap(); - active_kad_connections - .entry(node_id) - .or_insert_with(Default::default) - .tie_or_passthrough(kad_ctrl, fut) - }) - } - } - ); - - for listen_addr in listen_addrs { - let addr = swarm_controller - .listen_on(listen_addr.parse().expect("invalid multiaddr")) - .expect("unsupported multiaddr"); - println!("Now listening on {:?}", addr); - } - - let finish_enum = kad_system - .find_node(my_peer_id.clone(), |peer| { - let addr = Multiaddr::from(libp2p::multiaddr::Protocol::P2p(peer.clone().into())); - active_kad_connections.lock().unwrap().entry(peer.clone()) - .or_insert_with(Default::default) - .dial(&swarm_controller, &addr, transport.clone().with_upgrade(KadConnecConfig::new())) - }) - .filter_map(move |event| { - match event { - KadQueryEvent::PeersReported(peers) => { - for peer in peers { - peer_store.peer_or_create(&peer.node_id) - .add_addrs(peer.multiaddrs, Duration::from_secs(3600)); - } - None - }, - KadQueryEvent::Finished(out) => Some(out), - } - }) - .into_future() - .map_err(|(err, _)| err) - .map(|(out, _)| out.unwrap()) - .and_then(|out| { - let local_hash = U512::from(my_peer_id.digest()); - println!("Results of peer discovery for {:?}:", my_peer_id); - for n in out { - let other_hash = U512::from(n.digest()); - let dist = 512 - (local_hash ^ other_hash).leading_zeros(); - println!("* {:?} (distance bits = {:?} (lower is better))", n, dist); - } - Ok(()) - }); - - // `swarm_future` is a future that contains all the behaviour that we want, but nothing has - // actually started yet. Because we created the `TcpConfig` with tokio, we need to run the - // future through the tokio core. - tokio_current_thread::block_on_all( - finish_enum - .select(swarm_future.for_each(|_| Ok(()))) - .map(|(n, _)| n) - .map_err(|(err, _)| err), - ).unwrap(); -} - -/// Expects a multiaddr of the format `/p2p/` and returns the node ID. -/// Panics if the format is not correct. -fn p2p_multiaddr_to_node_id(client_addr: Multiaddr) -> PeerId { - let (first, second); - { - let mut iter = client_addr.iter(); - first = iter.next(); - second = iter.next(); - } - match (first, second) { - (Some(libp2p::multiaddr::Protocol::P2p(node_id)), None) => - PeerId::from_multihash(node_id).expect("libp2p always reports a valid node id"), - _ => panic!("Reported multiaddress is in the wrong format ; programmer error") - } -} - -/// Stores initial addresses on the given peer store. Uses a very large timeout. -pub fn ipfs_bootstrap

(peer_store: P) -where - P: Peerstore + Clone, -{ - const ADDRESSES: &[&str] = &[ - "/ip4/127.0.0.1/tcp/4001/p2p/QmQRx32wQkw3hB45j4UDw8V9Ju4mGbxMyhs2m8mpFrFkur", - // TODO: add some bootstrap nodes here - ]; - - let ttl = Duration::from_secs(100 * 365 * 24 * 3600); - - for address in ADDRESSES.iter() { - let mut multiaddr = address - .parse::() - .expect("failed to parse hard-coded multiaddr"); - - let p2p_component = multiaddr.pop().expect("hard-coded multiaddr is empty"); - let peer = match p2p_component { - libp2p::multiaddr::Protocol::P2p(key) => { - PeerId::from_multihash(key).expect("invalid peer id") - } - _ => panic!("hard-coded multiaddr didn't end with /p2p/"), - }; - - peer_store - .clone() - .peer_or_create(&peer) - .add_addr(multiaddr, ttl.clone()); - } -} diff --git a/examples/ping-client.rs b/examples/ping-client.rs deleted file mode 100644 index 5ebe5c33..00000000 --- a/examples/ping-client.rs +++ /dev/null @@ -1,123 +0,0 @@ -// Copyright 2017 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -extern crate bytes; -extern crate env_logger; -extern crate futures; -extern crate libp2p; -extern crate tokio_current_thread; -extern crate tokio_io; - -use futures::{Future, Stream}; -use futures::sync::oneshot; -use std::env; -use libp2p::core::Transport; -use libp2p::core::{upgrade, either::EitherOutput}; -use libp2p::tcp::TcpConfig; - -fn main() { - env_logger::init(); - - // Determine which address to dial. - let target_addr = env::args() - .nth(1) - .unwrap_or("/ip4/127.0.0.1/tcp/4001".to_owned()); - - // We start by creating a `TcpConfig` that indicates that we want TCP/IP. - let transport = TcpConfig::new() - - // On top of TCP/IP, we will use either the plaintext protocol or the secio protocol, - // depending on which one the remote supports. - .with_upgrade({ - let plain_text = upgrade::PlainTextConfig; - - let secio = { - let private_key = include_bytes!("test-rsa-private-key.pk8"); - let public_key = include_bytes!("test-rsa-public-key.der").to_vec(); - libp2p::secio::SecioConfig::new( - libp2p::secio::SecioKeyPair::rsa_from_pkcs8(private_key, public_key).unwrap() - ) - }; - - upgrade::or( - upgrade::map(plain_text, |pt| EitherOutput::First(pt)), - upgrade::map(secio, |out: libp2p::secio::SecioOutput<_>| EitherOutput::Second(out.stream)) - ) - }) - - // On top of plaintext or secio, we will use the multiplex protocol. - .with_upgrade(libp2p::mplex::MplexConfig::new()) - // The object returned by the call to `with_upgrade(MplexConfig::new())` can't be used as a - // `Transport` because the output of the upgrade is not a stream but a controller for - // muxing. We have to explicitly call `into_connection_reuse()` in order to turn this into - // a `Transport`. - .map(|val, _| ((), val)) - .into_connection_reuse() - .map(|((), val), _| val); - - // Let's put this `transport` into a *swarm*. The swarm will handle all the incoming - // connections for us. The second parameter we pass is the connection upgrade that is accepted - // by the listening part. We don't want to accept anything, so we pass a dummy object that - // represents a connection that is always denied. - let (tx, rx) = oneshot::channel(); - let mut tx = Some(tx); - let (swarm_controller, swarm_future) = libp2p::core::swarm( - transport.clone().with_upgrade(libp2p::ping::Ping::default()), - |out, _client_addr| { - if let libp2p::ping::PingOutput::Pinger(mut pinger) = out { - let tx = tx.take(); - pinger.ping(()); - pinger - .into_future() - .map(move |_| { - println!("Received pong from the remote"); - if let Some(tx) = tx { - let _ = tx.send(()); - } - () - }) - .map_err(|(e, _)| e) - } else { - unreachable!() - } - }, - ); - - // We now use the controller to dial to the address. - swarm_controller - .dial(target_addr.parse().expect("invalid multiaddr"), - transport.with_upgrade(libp2p::ping::Ping::default())) - // If the multiaddr protocol exists but is not supported, then we get an error containing - // the original multiaddress. - .expect("unsupported multiaddr"); - - // The address we actually listen on can be different from the address that was passed to - // the `listen_on` function. For example if you pass `/ip4/0.0.0.0/tcp/0`, then the port `0` - // will be replaced with the actual port. - - // `swarm_future` is a future that contains all the behaviour that we want, but nothing has - // actually started yet. Because we created the `TcpConfig` with tokio, we need to run the - // future through the tokio core. - tokio_current_thread::block_on_all( - rx.select(swarm_future.for_each(|_| Ok(())).map_err(|_| unreachable!())) - .map_err(|(e, _)| e) - .map(|_| ()), - ).unwrap(); -} diff --git a/examples/random_peerid.rs b/examples/random_peerid.rs deleted file mode 100644 index 6fce007b..00000000 --- a/examples/random_peerid.rs +++ /dev/null @@ -1,32 +0,0 @@ -// Copyright 2018 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -extern crate libp2p; -extern crate rand; - -use libp2p::{PeerId, core::PublicKey}; - -fn main() { - let pid = { - let key = (0..2048).map(|_| rand::random::()).collect::>(); - PeerId::from_public_key(PublicKey::Rsa(key)) - }; - println!("{}", pid.to_base58()); -} diff --git a/examples/relay.rs b/examples/relay.rs deleted file mode 100644 index 0eab6c4b..00000000 --- a/examples/relay.rs +++ /dev/null @@ -1,224 +0,0 @@ -// Copyright 2018 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -//! Example runs -//! ============ -//! -//! As destination -//! -------------- -//! -//! relay listen \ -//! --self "QmcwnUP8cM2U4EeMW6g6nbFUQRyE6xXh65TPaZD9bqkhdK" \ -//! --listen "/ip4/127.0.0.1/tcp/10003" \ -//! --peer "QmXnxVaQoP8cPm2J5uN73GPEu3pCkJdYDNDCMZS8dMxTXL=/ip4/127.0.0.1/tcp/10002" -//! -//! As relay -//! -------- -//! -//! relay listen \ -//! --self "QmXnxVaQoP8cPm2J5uN73GPEu3pCkJdYDNDCMZS8dMxTXL" \ -//! --listen "/ip4/127.0.0.1/tcp/10002" \ -//! --peer "QmcwnUP8cM2U4EeMW6g6nbFUQRyE6xXh65TPaZD9bqkhdK=/ip4/127.0.0.1/tcp/10003" -//! -//! As source -//! --------- -//! -//! relay dial \ -//! --self "QmYJ46WjbwxLkrTGU1JZNEN3HnYbcuES8QahG1PAMCxFY8" \ -//! --destination "QmcwnUP8cM2U4EeMW6g6nbFUQRyE6xXh65TPaZD9bqkhdK" \ -//! --relay "QmXnxVaQoP8cPm2J5uN73GPEu3pCkJdYDNDCMZS8dMxTXL" \ -//! --peer "QmXnxVaQoP8cPm2J5uN73GPEu3pCkJdYDNDCMZS8dMxTXL=/ip4/127.0.0.1/tcp/10002" - -extern crate bytes; -extern crate env_logger; -extern crate futures; -extern crate libp2p; -extern crate libp2p_yamux; -extern crate rand; -#[macro_use] -extern crate structopt; -extern crate tokio_codec; -extern crate tokio_current_thread; - -use libp2p::SimpleProtocol; -use libp2p::core::Multiaddr; -use libp2p::core::transport::Transport; -use libp2p::core::upgrade; -use futures::{future::{self, Either, Loop, loop_fn}, prelude::*}; -use libp2p::peerstore::{PeerAccess, PeerId, Peerstore, memory_peerstore::MemoryPeerstore}; -use libp2p::relay::{RelayConfig, RelayTransport}; -use std::{error::Error, iter, str::FromStr, sync::Arc, time::Duration}; -use structopt::StructOpt; -use libp2p::tcp::TcpConfig; -use tokio_codec::{BytesCodec, Framed}; - -fn main() -> Result<(), Box> { - env_logger::init(); - match Options::from_args() { - Options::Dialer(opts) => run_dialer(opts), - Options::Listener(opts) => run_listener(opts) - } -} - -#[derive(Debug, StructOpt)] -#[structopt(name = "relay", about = "Usage example for /libp2p/relay/circuit/0.1.0")] -enum Options { - #[structopt(name = "dial")] - /// Run in dial mode. - Dialer(DialerOpts), - #[structopt(name = "listen")] - /// Run in listener mode. - Listener(ListenerOpts) -} - -#[derive(Debug, StructOpt)] -struct DialerOpts { - #[structopt(short = "s", long = "self", parse(try_from_str))] - /// The PeerId of this node. - me: PeerId, - #[structopt(short = "d", long = "destination", parse(try_from_str))] - /// The PeerId to dial. - dest: PeerId, - #[structopt(short = "r", long = "relay", parse(try_from_str))] - /// The PeerId of the relay node to use when dialing the destination. - relay: PeerId, - #[structopt(short = "p", long = "peer", parse(try_from_str = "parse_peer_addr"))] - /// A network peer known to this node (format: PeerId=Multiaddr). - /// For example: QmXnxVaQoP8cPm2J5uN73GPEu3pCkJdYDNDCMZS8dMxTXL=/ip4/127.0.0.1/tcp/12345 - peers: Vec<(PeerId, Multiaddr)> -} - -#[derive(Debug, StructOpt)] -struct ListenerOpts { - #[structopt(short = "s", long = "self", parse(try_from_str))] - /// The PeerId of this node. - me: PeerId, - #[structopt(short = "p", long = "peer", parse(try_from_str = "parse_peer_addr"))] - /// A network peer know to this node (format: PeerId=Multiaddr). - /// For example: QmXnxVaQoP8cPm2J5uN73GPEu3pCkJdYDNDCMZS8dMxTXL=/ip4/127.0.0.1/tcp/12345 - peers: Vec<(PeerId, Multiaddr)>, - #[structopt(short = "l", long = "listen", parse(try_from_str))] - /// The multiaddress to listen for incoming connections. - listen: Multiaddr -} - -fn run_dialer(opts: DialerOpts) -> Result<(), Box> { - let store = Arc::new(MemoryPeerstore::empty()); - for (p, a) in opts.peers { - store.peer_or_create(&p).add_addr(a, Duration::from_secs(600)) - } - - let transport = { - let tcp = TcpConfig::new() - .with_upgrade(libp2p_yamux::Config::default()) - .map(|val, _| ((), val)) - .into_connection_reuse() - .map(|((), val), _| val); - RelayTransport::new(opts.me, tcp, store, iter::once(opts.relay)).with_dummy_muxing() - }; - - let echo = SimpleProtocol::new("/echo/1.0.0", |socket| { - Ok(Framed::new(socket, BytesCodec::new())) - }); - - let (control, future) = libp2p::core::swarm(transport.clone().with_upgrade(echo.clone()), |socket, _| { - println!("sending \"hello world\""); - socket.send("hello world".into()) - .and_then(|socket| socket.into_future().map_err(|(e, _)| e).map(|(m, _)| m)) - .and_then(|message| { - println!("received message: {:?}", message); - Ok(()) - }) - }); - - let address = format!("/p2p-circuit/p2p/{}", opts.dest.to_base58()).parse()?; - - control.dial(address, transport.with_upgrade(echo)).map_err(|_| "failed to dial")?; - - tokio_current_thread::block_on_all(future.for_each(|_| Ok(()))).map_err(From::from) -} - -fn run_listener(opts: ListenerOpts) -> Result<(), Box> { - let store = Arc::new(MemoryPeerstore::empty()); - for (p, a) in opts.peers { - store.peer_or_create(&p).add_addr(a, Duration::from_secs(600)) - } - - let transport = TcpConfig::new() - .with_upgrade(libp2p_yamux::Config::default()) - .map(|val, _| ((), val)) - .into_connection_reuse() - .map(|((), val), _| val); - - let relay = RelayConfig::new(opts.me, transport.clone(), store); - - let echo = SimpleProtocol::new("/echo/1.0.0", |socket| { - Ok(Framed::new(socket, BytesCodec::new())) - }); - - let upgraded = transport.with_upgrade(relay) - .and_then(|out, endpoint, addr| { - match out { - libp2p::relay::Output::Sealed(future) => { - Either::A(future.map(|out| (Either::A(out), Either::A(addr)))) - } - libp2p::relay::Output::Stream(socket) => { - Either::B(upgrade::apply(socket, echo, endpoint, addr) - .map(|(out, addr)| (Either::B(out), Either::B(addr)))) - } - } - }); - - let (control, future) = libp2p::core::swarm(upgraded, |out, _| { - match out { - Either::A(()) => Either::A(future::ok(())), - Either::B(socket) => Either::B(loop_fn(socket, move |socket| { - socket.into_future() - .map_err(|(e, _)| e) - .and_then(move |(msg, socket)| { - if let Some(msg) = msg { - println!("received at destination: {:?}", msg); - Either::A(socket.send(msg.freeze()).map(Loop::Continue)) - } else { - println!("received EOF at destination"); - Either::B(future::ok(Loop::Break(()))) - } - }) - })) - } - }); - - control.listen_on(opts.listen).map_err(|_| "failed to listen")?; - tokio_current_thread::block_on_all(future.for_each(|_| Ok(()))).map_err(From::from) -} - -// Custom parsers /////////////////////////////////////////////////////////// - -fn parse_peer_addr(s: &str) -> Result<(PeerId, Multiaddr), Box> { - let mut iter = s.splitn(2, '='); - let p = iter.next() - .and_then(|s| PeerId::from_str(s).ok()) - .ok_or("missing or invalid PeerId")?; - let m = iter.next() - .and_then(|s| Multiaddr::from_str(s).ok()) - .ok_or("missing or invalid Multiaddr")?; - Ok((p, m)) -} - diff --git a/examples/test-rsa-private-key.pk8 b/examples/test-rsa-private-key.pk8 deleted file mode 100644 index 452b7af1..00000000 Binary files a/examples/test-rsa-private-key.pk8 and /dev/null differ diff --git a/examples/test-rsa-public-key.der b/examples/test-rsa-public-key.der deleted file mode 100644 index 9e62c93e..00000000 Binary files a/examples/test-rsa-public-key.der and /dev/null differ diff --git a/src/lib.rs b/src/lib.rs index 9f95f032..2d81fb74 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -159,7 +159,7 @@ pub extern crate libp2p_yamux as yamux; pub mod simple; -pub use self::core::{Transport, ConnectionUpgrade, PeerId, swarm}; +pub use self::core::{Transport, ConnectionUpgrade, PeerId}; pub use self::multiaddr::Multiaddr; pub use self::simple::SimpleProtocol; pub use self::transport_timeout::TransportTimeout;