diff --git a/Cargo.toml b/Cargo.toml index ef17a95e..421bd81b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ members = [ "libp2p-tcp-transport", "multistream-select", "datastore", + "futures-mutex", "rw-stream-sink", "circular-buffer", "varint-rs", diff --git a/example/examples/echo-dialer.rs b/example/examples/echo-dialer.rs index fa2098af..c247b28b 100644 --- a/example/examples/echo-dialer.rs +++ b/example/examples/echo-dialer.rs @@ -1,35 +1,35 @@ // Copyright 2017 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the // Software is furnished to do so, subject to the following conditions: // -// The above copyright notice and this permission notice shall be included in +// The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. extern crate bytes; extern crate futures; -extern crate multiplex; extern crate libp2p_secio as secio; extern crate libp2p_swarm as swarm; extern crate libp2p_tcp_transport as tcp; +extern crate multiplex; extern crate tokio_core; extern crate tokio_io; use bytes::BytesMut; -use futures::{Stream, Sink, Future}; -use swarm::{Transport, SimpleProtocol}; +use futures::{Future, Sink, Stream}; +use swarm::{SimpleProtocol, Transport}; use tcp::TcpConfig; use tokio_core::reactor::Core; use tokio_io::codec::length_delimited; @@ -73,20 +73,29 @@ fn main() { // of any opened stream. // We use it to dial `/ip4/127.0.0.1/tcp/10333`. - let dialer = transport.dial_and_listen(swarm::Multiaddr::new("/ip4/127.0.0.1/tcp/10333").unwrap()) + let dialer = transport + .dial_and_listen(swarm::Multiaddr::new("/ip4/127.0.0.1/tcp/10333").unwrap()) .unwrap_or_else(|_| panic!("unsupported multiaddr protocol ; should never happen")) - .and_then(|(echo, incoming)| { + .and_then(|(incoming, echo)| { // `echo` is what the closure used when initializing "echo" returns. // Consequently, please note that the `send` method is available only because the type // `length_delimited::Framed` has a `send` method. - echo.send("hello world".into()).map(Option::Some) - .select(incoming.for_each(|_| { println!("opened"); Ok(()) }).map(|()| None)) + echo.and_then(|echo| echo.send("hello world".into()).map(Option::Some)) + .select( + incoming + .for_each(|_| { + println!("opened"); + Ok(()) + }) + .map(|()| None), + ) .map(|(n, _)| n) .map_err(|(e, _)| e) }) .and_then(|echo| { // The message has been successfully sent. Now wait for an answer. - echo.unwrap().into_future() + echo.unwrap() + .into_future() .map(|(msg, rest)| { println!("received: {:?}", msg); rest diff --git a/futures-mutex/Cargo.toml b/futures-mutex/Cargo.toml new file mode 100644 index 00000000..6ba96685 --- /dev/null +++ b/futures-mutex/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "futures-mutex" +version = "0.2.0" +authors = ["Adam Jacobus "] +license = "Apache-2.0" +description = "A Mutex for the Future(s)" +readme = "README.md" +documentation = "https://docs.rs/futures-mutex" +repository = "https://github.com/proman21/futures-mutex" +keywords = ["future", "futures", "mutex", "concurrency"] +categories = ["asynchronous", "concurrency"] + +[dependencies] +futures = "0.1.14" +crossbeam = "0.2.10" diff --git a/futures-mutex/LICENSE b/futures-mutex/LICENSE new file mode 100644 index 00000000..5e0fd33c --- /dev/null +++ b/futures-mutex/LICENSE @@ -0,0 +1,201 @@ +Apache License +Version 2.0, January 2004 +http://www.apache.org/licenses/ + +TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + +1. Definitions. + +"License" shall mean the terms and conditions for use, reproduction, +and distribution as defined by Sections 1 through 9 of this document. + +"Licensor" shall mean the copyright owner or entity authorized by +the copyright owner that is granting the License. + +"Legal Entity" shall mean the union of the acting entity and all +other entities that control, are controlled by, or are under common +control with that entity. For the purposes of this definition, +"control" means (i) the power, direct or indirect, to cause the +direction or management of such entity, whether by contract or +otherwise, or (ii) ownership of fifty percent (50%) or more of the +outstanding shares, or (iii) beneficial ownership of such entity. + +"You" (or "Your") shall mean an individual or Legal Entity +exercising permissions granted by this License. + +"Source" form shall mean the preferred form for making modifications, +including but not limited to software source code, documentation +source, and configuration files. + +"Object" form shall mean any form resulting from mechanical +transformation or translation of a Source form, including but +not limited to compiled object code, generated documentation, +and conversions to other media types. + +"Work" shall mean the work of authorship, whether in Source or +Object form, made available under the License, as indicated by a +copyright notice that is included in or attached to the work +(an example is provided in the Appendix below). + +"Derivative Works" shall mean any work, whether in Source or Object +form, that is based on (or derived from) the Work and for which the +editorial revisions, annotations, elaborations, or other modifications +represent, as a whole, an original work of authorship. For the purposes +of this License, Derivative Works shall not include works that remain +separable from, or merely link (or bind by name) to the interfaces of, +the Work and Derivative Works thereof. + +"Contribution" shall mean any work of authorship, including +the original version of the Work and any modifications or additions +to that Work or Derivative Works thereof, that is intentionally +submitted to Licensor for inclusion in the Work by the copyright owner +or by an individual or Legal Entity authorized to submit on behalf of +the copyright owner. For the purposes of this definition, "submitted" +means any form of electronic, verbal, or written communication sent +to the Licensor or its representatives, including but not limited to +communication on electronic mailing lists, source code control systems, +and issue tracking systems that are managed by, or on behalf of, the +Licensor for the purpose of discussing and improving the Work, but +excluding communication that is conspicuously marked or otherwise +designated in writing by the copyright owner as "Not a Contribution." + +"Contributor" shall mean Licensor and any individual or Legal Entity +on behalf of whom a Contribution has been received by Licensor and +subsequently incorporated within the Work. + +2. Grant of Copyright License. Subject to the terms and conditions of +this License, each Contributor hereby grants to You a perpetual, +worldwide, non-exclusive, no-charge, royalty-free, irrevocable +copyright license to reproduce, prepare Derivative Works of, +publicly display, publicly perform, sublicense, and distribute the +Work and such Derivative Works in Source or Object form. + +3. Grant of Patent License. Subject to the terms and conditions of +this License, each Contributor hereby grants to You a perpetual, +worldwide, non-exclusive, no-charge, royalty-free, irrevocable +(except as stated in this section) patent license to make, have made, +use, offer to sell, sell, import, and otherwise transfer the Work, +where such license applies only to those patent claims licensable +by such Contributor that are necessarily infringed by their +Contribution(s) alone or by combination of their Contribution(s) +with the Work to which such Contribution(s) was submitted. If You +institute patent litigation against any entity (including a +cross-claim or counterclaim in a lawsuit) alleging that the Work +or a Contribution incorporated within the Work constitutes direct +or contributory patent infringement, then any patent licenses +granted to You under this License for that Work shall terminate +as of the date such litigation is filed. + +4. Redistribution. You may reproduce and distribute copies of the +Work or Derivative Works thereof in any medium, with or without +modifications, and in Source or Object form, provided that You +meet the following conditions: + +(a) You must give any other recipients of the Work or +Derivative Works a copy of this License; and + +(b) You must cause any modified files to carry prominent notices +stating that You changed the files; and + +(c) You must retain, in the Source form of any Derivative Works +that You distribute, all copyright, patent, trademark, and +attribution notices from the Source form of the Work, +excluding those notices that do not pertain to any part of +the Derivative Works; and + +(d) If the Work includes a "NOTICE" text file as part of its +distribution, then any Derivative Works that You distribute must +include a readable copy of the attribution notices contained +within such NOTICE file, excluding those notices that do not +pertain to any part of the Derivative Works, in at least one +of the following places: within a NOTICE text file distributed +as part of the Derivative Works; within the Source form or +documentation, if provided along with the Derivative Works; or, +within a display generated by the Derivative Works, if and +wherever such third-party notices normally appear. The contents +of the NOTICE file are for informational purposes only and +do not modify the License. You may add Your own attribution +notices within Derivative Works that You distribute, alongside +or as an addendum to the NOTICE text from the Work, provided +that such additional attribution notices cannot be construed +as modifying the License. + +You may add Your own copyright statement to Your modifications and +may provide additional or different license terms and conditions +for use, reproduction, or distribution of Your modifications, or +for any such Derivative Works as a whole, provided Your use, +reproduction, and distribution of the Work otherwise complies with +the conditions stated in this License. + +5. Submission of Contributions. Unless You explicitly state otherwise, +any Contribution intentionally submitted for inclusion in the Work +by You to the Licensor shall be under the terms and conditions of +this License, without any additional terms or conditions. +Notwithstanding the above, nothing herein shall supersede or modify +the terms of any separate license agreement you may have executed +with Licensor regarding such Contributions. + +6. Trademarks. This License does not grant permission to use the trade +names, trademarks, service marks, or product names of the Licensor, +except as required for reasonable and customary use in describing the +origin of the Work and reproducing the content of the NOTICE file. + +7. Disclaimer of Warranty. Unless required by applicable law or +agreed to in writing, Licensor provides the Work (and each +Contributor provides its Contributions) on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +implied, including, without limitation, any warranties or conditions +of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A +PARTICULAR PURPOSE. You are solely responsible for determining the +appropriateness of using or redistributing the Work and assume any +risks associated with Your exercise of permissions under this License. + +8. Limitation of Liability. In no event and under no legal theory, +whether in tort (including negligence), contract, or otherwise, +unless required by applicable law (such as deliberate and grossly +negligent acts) or agreed to in writing, shall any Contributor be +liable to You for damages, including any direct, indirect, special, +incidental, or consequential damages of any character arising as a +result of this License or out of the use or inability to use the +Work (including but not limited to damages for loss of goodwill, +work stoppage, computer failure or malfunction, or any and all +other commercial damages or losses), even if such Contributor +has been advised of the possibility of such damages. + +9. Accepting Warranty or Additional Liability. While redistributing +the Work or Derivative Works thereof, You may choose to offer, +and charge a fee for, acceptance of support, warranty, indemnity, +or other liability obligations and/or rights consistent with this +License. However, in accepting such obligations, You may act only +on Your own behalf and on Your sole responsibility, not on behalf +of any other Contributor, and only if You agree to indemnify, +defend, and hold each Contributor harmless for any liability +incurred by, or claims asserted against, such Contributor by reason +of your accepting any such warranty or additional liability. + +END OF TERMS AND CONDITIONS + +APPENDIX: How to apply the Apache License to your work. + +To apply the Apache License to your work, attach the following +boilerplate notice, with the fields enclosed by brackets "{}" +replaced with your own identifying information. (Don't include +the brackets!) The text should be enclosed in the appropriate +comment syntax for the file format. We also recommend that a +file or class name and description of purpose be included on the +same "printed page" as the copyright notice for easier +identification within third-party archives. + +Copyright {yyyy} {name of copyright owner} + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/futures-mutex/README.md b/futures-mutex/README.md new file mode 100644 index 00000000..a97e7644 --- /dev/null +++ b/futures-mutex/README.md @@ -0,0 +1,28 @@ +# futures-mutex + +*A Mutex for the Future(s)* + +[![Crates.io](https://img.shields.io/crates/v/futures-mutex.svg)](https://crates.io/crates/futures-mutex) +[![Docs.rs](https://docs.rs/futures-mutex/badge.svg)](https://docs.rs/futures-mutex) + +## Usage + +Add this to your `Cargo.toml`: + +```toml +[dependencies] +futures-mutex = "0.2.0" +``` + +Then, add this to your crate: + +```rust +extern crate futures_mutex; +``` + +`FutMutex` follows a very similar API to [`futures::sync::BiLock`](https://docs.rs/futures/0.1.11/futures/sync/struct.BiLock.html), however it can have more than two handles. + +## License + +`futures-mutex` is distributed under the Apache License v2.0. See the LICENSE +file for the full text of the license. diff --git a/futures-mutex/src/lib.rs b/futures-mutex/src/lib.rs new file mode 100644 index 00000000..9ba4b57a --- /dev/null +++ b/futures-mutex/src/lib.rs @@ -0,0 +1,319 @@ +//! A Mutex for the Future(s) +//! +//! API is similar to [`futures::sync::BiLock`](https://docs.rs/futures/0.1.11/futures/sync/struct.BiLock.html) +//! However, it can be cloned into as many handles as desired. +//! +//! ``` +//! extern crate futures; +//! extern crate futures_mutex; +//! +//! use futures::{Future, Poll, Async}; +//! use futures_mutex::Mutex; +//! +//! struct AddTwo { +//! lock: Mutex +//! } +//! +//! impl Future for AddTwo { +//! type Item = usize; +//! type Error = (); +//! fn poll(&mut self) -> Poll { +//! match self.lock.poll_lock() { +//! Async::Ready(mut g) => { +//! *g += 2; +//! Ok(Async::Ready(*g)) +//! }, +//! Async::NotReady => Ok(Async::NotReady) +//! } +//! } +//! } +//! +//! fn main() { +//! let lock1: Mutex = Mutex::new(0); +//! let lock2 = lock1.clone(); +//! +//! let future = AddTwo { lock: lock2 }; +//! +//! // This future will return the current value and the recovered lock. +//! let used_lock = lock1.lock().map(|b| (*b, b.unlock())); +//! +//! let _ = future.join(used_lock).map(|(add_two, (value, _))| { +//! assert_eq!(add_two, value); +//! }).wait().unwrap(); +//! } +//! ``` + +extern crate futures; +extern crate crossbeam; + +use std::ops::{Deref, DerefMut}; +use std::mem; +use std::sync::Arc; +use std::sync::atomic::{Ordering, AtomicBool}; +use std::cell::UnsafeCell; +use crossbeam::sync::MsQueue; +use futures::task::{current, Task}; +use futures::{Future, Poll, Async}; + +#[derive(Debug)] +struct Inner { + wait_queue: MsQueue, + locked: AtomicBool, + data: UnsafeCell +} + +impl Drop for Inner { + fn drop(&mut self) { + assert!(!self.locked.load(Ordering::SeqCst)) + } +} + +unsafe impl Send for Inner {} +unsafe impl Sync for Inner {} + +/// A Mutex designed for use inside Futures. Works like `BiLock` from the `futures` crate, but +/// with more than 2 handles. +/// +/// **THIS IS NOT A GENRAL PURPOSE MUTEX! IF YOU CALL `poll_lock` OR `lock` OUTSIDE THE CONTEXT OF A TASK, IT WILL PANIC AND EAT YOUR LAUNDRY.** +/// +/// This type provides a Mutex that will track tasks that are requesting the Mutex, and will unpark +/// them in the order they request the lock. +/// +/// *As of now, there is no strong guarantee that a particular handle of the lock won't be starved. Hopefully the use of the queue will prevent this, but I haven't tried to test that.* +#[derive(Debug)] +pub struct Mutex { + inner: Arc> +} + +impl Mutex { + /// Create a new Mutex wrapping around a value `t` + pub fn new(t: T) -> Mutex { + let inner = Arc::new(Inner { + wait_queue: MsQueue::new(), + locked: AtomicBool::new(false), + data: UnsafeCell::new(t) + }); + + Mutex { + inner: inner + } + } + + /// This will attempt a non-blocking lock on the mutex, returning `Async::NotReady` if it + /// can't be acquired. + /// + /// This function will return immediatly with a `MutexGuard` if the lock was acquired + /// successfully. When it drops, the lock will be unlocked. + /// + /// If it can't acquire the lock, it will schedule the current task to be unparked when it + /// might be able lock the mutex again. + /// + /// # Panics + /// + /// This function will panic if called outside the context of a future's task. + pub fn poll_lock(&self) -> Async> { + if self.inner.locked.compare_and_swap(false, true, Ordering::SeqCst) { + self.inner.wait_queue.push(current()); + Async::NotReady + } else { + Async::Ready(MutexGuard{ inner: self }) + } + } + + /// Convert this lock into a future that resolves to a guard that allows access to the data. + /// This function returns `MutexAcquire`, which resolves to a `MutexGuard` + /// guard type. + /// + /// The returned future will never return an error. + pub fn lock(&self) -> MutexAcquire { + MutexAcquire { + inner: self + } + } + + /// We unlock the mutex and wait for someone to lock. We try and unpark as many tasks as we + /// can to prevents dead tasks from deadlocking the mutex, or tasks that have finished their + /// critical section and were awakened. + fn unlock(&self) { + if !self.inner.locked.swap(false, Ordering::SeqCst) { + panic!("Tried to unlock an already unlocked Mutex, something has gone terribly wrong"); + } + + while !self.inner.locked.load(Ordering::SeqCst) { + match self.inner.wait_queue.try_pop() { + Some(task) => task.notify(), + None => return + } + } + } +} + +impl Clone for Mutex { + fn clone(&self) -> Mutex { + Mutex { + inner: self.inner.clone() + } + } +} + +/// Returned RAII guard from the `poll_lock` method. +/// +/// This structure acts as a sentinel to the data in the `Mutex` itself, +/// implementing `Deref` and `DerefMut` to `T`. When dropped, the lock will be +/// unlocked. +#[derive(Debug)] +pub struct MutexGuard<'a, T: 'a> { + inner: &'a Mutex +} + +impl<'a, T> Deref for MutexGuard<'a, T> { + type Target = T; + fn deref(&self) -> &Self::Target { + unsafe { &*self.inner.inner.data.get() } + } +} + +impl<'a, T> DerefMut for MutexGuard<'a, T> { + fn deref_mut(&mut self) -> &mut T { + unsafe { &mut *self.inner.inner.data.get() } + } +} + +impl<'a, T> Drop for MutexGuard<'a, T> { + fn drop(&mut self) { + self.inner.unlock(); + } +} + +/// Future returned by `Mutex::lock` which resolves to a guard when a lock is acquired. +#[derive(Debug)] +pub struct MutexAcquire<'a, T: 'a> { + inner: &'a Mutex +} + +impl<'a, T> Future for MutexAcquire<'a, T> { + type Item = MutexGuard<'a, T>; + type Error = (); + + fn poll(&mut self) -> Poll { + Ok(self.inner.poll_lock()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use futures::executor::{self, Notify}; + use futures::future; + use futures::stream::{self, Stream}; + use std::thread; + + pub fn unpark_noop() -> Arc { + struct Foo; + + impl Notify for Foo { + fn notify(&self, id: usize) {} + } + + Arc::new(Foo) + } + + #[test] + fn simple() { + let future = future::lazy(|| { + let lock1 = Mutex::new(1); + let lock2 = lock1.clone(); + let lock3 = lock1.clone(); + + let mut guard = match lock1.poll_lock() { + Async::Ready(g) => g, + Async::NotReady => panic!("poll not ready"), + }; + assert_eq!(*guard, 1); + *guard = 2; + + assert!(lock1.poll_lock().is_not_ready()); + assert!(lock2.poll_lock().is_not_ready()); + assert!(lock3.poll_lock().is_not_ready()); + drop(guard); + + assert!(lock1.poll_lock().is_ready()); + assert!(lock2.poll_lock().is_ready()); + assert!(lock3.poll_lock().is_ready()); + + let guard = match lock2.poll_lock() { + Async::Ready(g) => g, + Async::NotReady => panic!("poll not ready"), + }; + assert_eq!(*guard, 2); + + assert!(lock1.poll_lock().is_not_ready()); + assert!(lock2.poll_lock().is_not_ready()); + assert!(lock3.poll_lock().is_not_ready()); + + Ok::<(), ()>(()) + }); + + assert!(executor::spawn(future) + .poll_future_notify(unpark_noop()) + .expect("failure in poll") + .is_ready()); + } + + #[test] + fn concurrent() { + const N: usize = 10000; + let lock1 = Mutex::new(0); + let lock2 = lock1.clone(); + + let a = Increment { + a: Some(lock1), + remaining: N, + }; + let b = stream::iter_ok((0..N).map(Ok::<_, ()>)).fold(lock2, |b, _n| { + b.lock().map(|mut b| { + *b += 1; + b.unlock() + }) + }); + + let t1 = thread::spawn(move || a.wait()); + let b = b.wait().expect("b error"); + let a = t1.join().unwrap().expect("a error"); + + match a.poll_lock() { + Async::Ready(l) => assert_eq!(*l, 2 * N), + Async::NotReady => panic!("poll not ready"), + } + match b.poll_lock() { + Async::Ready(l) => assert_eq!(*l, 2 * N), + Async::NotReady => panic!("poll not ready"), + } + + struct Increment { + remaining: usize, + a: Option>, + } + + impl Future for Increment { + type Item = Mutex; + type Error = (); + + fn poll(&mut self) -> Poll, ()> { + loop { + if self.remaining == 0 { + return Ok(self.a.take().unwrap().into()) + } + + let a = self.a.as_ref().unwrap(); + let mut a = match a.poll_lock() { + Async::Ready(l) => l, + Async::NotReady => return Ok(Async::NotReady), + }; + self.remaining -= 1; + *a += 1; + } + } + } + } +} diff --git a/libp2p-swarm/src/connection_reuse.rs b/libp2p-swarm/src/connection_reuse.rs index d8a20bd7..04ecc894 100644 --- a/libp2p-swarm/src/connection_reuse.rs +++ b/libp2p-swarm/src/connection_reuse.rs @@ -43,14 +43,14 @@ //! TODO: this whole code is a dummy and should be rewritten after the design has been properly //! figured out. -use futures::{Future, Stream, Async, Poll}; +use futures::{Async, Future, Poll, Stream}; use futures::stream::Fuse as StreamFuse; use futures::stream; use multiaddr::Multiaddr; use muxing::StreamMuxer; use smallvec::SmallVec; use std::io::Error as IoError; -use transport::{Transport, ConnectionUpgrade, UpgradedNode, MuxedTransport}; +use transport::{ConnectionUpgrade, MuxedTransport, Transport, UpgradedNode}; /// Allows reusing the same muxed connection multiple times. /// @@ -59,16 +59,18 @@ use transport::{Transport, ConnectionUpgrade, UpgradedNode, MuxedTransport}; /// Implements the `Transport` trait. #[derive(Clone)] pub struct ConnectionReuse - where T: Transport, - C: ConnectionUpgrade +where + T: Transport, + C: ConnectionUpgrade, { // Underlying transport and connection upgrade for when we need to dial or listen. inner: UpgradedNode, } impl From> for ConnectionReuse - where T: Transport, - C: ConnectionUpgrade +where + T: Transport, + C: ConnectionUpgrade, { #[inline] fn from(node: UpgradedNode) -> ConnectionReuse { @@ -77,20 +79,16 @@ impl From> for ConnectionReuse } impl Transport for ConnectionReuse - where T: Transport + 'static, // TODO: 'static :( - C: ConnectionUpgrade + 'static, // TODO: 'static :( - C: Clone, - C::Output: StreamMuxer + Clone, - C::NamesIter: Clone // TODO: not elegant +where + T: Transport + 'static, // TODO: 'static :( + C: ConnectionUpgrade + 'static, // TODO: 'static :( + C: Clone, + C::Output: StreamMuxer + Clone, + C::NamesIter: Clone, // TODO: not elegant { type RawConn = ::Substream; type Listener = ConnectionReuseListener< - Box< - Stream< - Item = (C::Output, Multiaddr), - Error = IoError, - >, - >, + Box>, C::Output, >; type Dial = Box>; @@ -125,51 +123,54 @@ impl Transport for ConnectionReuse } impl MuxedTransport for ConnectionReuse - where T: Transport + 'static, // TODO: 'static :( - C: ConnectionUpgrade + 'static, // TODO: 'static :( - C: Clone, - C::Output: StreamMuxer + Clone, - C::NamesIter: Clone // TODO: not elegant +where + T: Transport + 'static, // TODO: 'static :( + C: ConnectionUpgrade + 'static, // TODO: 'static :( + C: Clone, + C::Output: StreamMuxer + Clone, + C::NamesIter: Clone, // TODO: not elegant { - type Incoming = Box::Substream, Error = IoError>>; - type DialAndListen = Box>; + type Incoming = stream::AndThen< + stream::Repeat, + fn(C::Output) + -> <>::Output as StreamMuxer>::InboundSubstream, + <>::Output as StreamMuxer>::InboundSubstream, + >; + type Outgoing = + <>::Output as StreamMuxer>::OutboundSubstream; + type DialAndListen = Box>; fn dial_and_listen(self, addr: Multiaddr) -> Result { - let muxer_dial = match self.inner.dial(addr) { - Ok(l) => l, - Err((inner, addr)) => { - return Err((ConnectionReuse { inner: inner }, addr)); - } - }; - - let future = muxer_dial - .and_then(|muxer| { - let dial = muxer.clone().outbound(); - dial.map(|d| (d, muxer)) + self.inner + .dial(addr) + .map_err(|(inner, addr)| (ConnectionReuse { inner: inner }, addr)) + .map(|fut| { + fut.map(|muxer| { + ( + stream::repeat(muxer.clone()).and_then(StreamMuxer::inbound as fn(_) -> _), + muxer.outbound(), + ) + }) }) - .and_then(|(dial, muxer)| { - let listener = stream::repeat(muxer).and_then(|muxer| muxer.inbound()); - let listener = Box::new(listener) as Box>; - Ok((dial, listener)) - }); - - Ok(Box::new(future) as Box<_>) + .map(|fut| Box::new(fut) as _) } } /// Implementation of `Stream - where S: Stream, - M: StreamMuxer +where + S: Stream, + M: StreamMuxer, { listener: StreamFuse, connections: Vec<(M, ::InboundSubstream, Multiaddr)>, } impl Stream for ConnectionReuseListener - where S: Stream, - M: StreamMuxer + Clone + 'static // TODO: 'static :( +where + S: Stream, + M: StreamMuxer + Clone + 'static, // TODO: 'static :( { type Item = (M::Substream, Multiaddr); type Error = IoError; @@ -207,7 +208,7 @@ impl Stream for ConnectionReuseListener *next_incoming = new_next; return Ok(Async::Ready(Some((incoming, client_addr.clone())))); } - Ok(Async::NotReady) => {}, + Ok(Async::NotReady) => {} Err(_) => { connections_to_drop.push(index); } diff --git a/libp2p-swarm/src/transport.rs b/libp2p-swarm/src/transport.rs index 870248e1..cc0e2819 100644 --- a/libp2p-swarm/src/transport.rs +++ b/libp2p-swarm/src/transport.rs @@ -30,8 +30,8 @@ //! in a complex chain of protocols negotiation. use bytes::Bytes; -use futures::{Stream, Poll, Async}; -use futures::future::{IntoFuture, Future, ok as future_ok, FutureResult, FromErr}; +use futures::{Async, Poll, Stream}; +use futures::future::{self, FromErr, Future, FutureResult, IntoFuture}; use multiaddr::Multiaddr; use multistream_select; use muxing::StreamMuxer; @@ -67,15 +67,18 @@ pub trait Transport { /// Returns the address back if it isn't supported. /// /// > **Note**: The reason why we need to change the `Multiaddr` on success is to handle - /// > situations such as turning `/ip4/127.0.0.1/tcp/0` into - /// > `/ip4/127.0.0.1/tcp/`. + /// > situations such as turning `/ip4/127.0.0.1/tcp/0` into + /// > `/ip4/127.0.0.1/tcp/`. fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> - where Self: Sized; + where + Self: Sized; /// Dial to the given multi-addr. /// /// Returns either a future which may resolve to a connection, or gives back the multiaddress. - fn dial(self, addr: Multiaddr) -> Result where Self: Sized; + fn dial(self, addr: Multiaddr) -> Result + where + Self: Sized; /// Builds a new struct that implements `Transport` that contains both `self` and `other`. /// @@ -83,7 +86,8 @@ pub trait Transport { /// return an error then `other` will be tried. #[inline] fn or_transport(self, other: T) -> OrTransport - where Self: Sized + where + Self: Sized, { OrTransport(self, other) } @@ -95,8 +99,9 @@ pub trait Transport { /// > (communication encryption), *multiplex*, but also a protocol handler. #[inline] fn with_upgrade(self, upgrade: U) -> UpgradedNode - where Self: Sized, - U: ConnectionUpgrade + where + Self: Sized, + U: ConnectionUpgrade, { UpgradedNode { transports: self, @@ -111,14 +116,18 @@ pub trait MuxedTransport: Transport { /// Produces substreams on the dialed connection. type Incoming: Stream; - /// Future indicating when dialing succeeded. - type DialAndListen: Future; + /// Future resolving to an outgoing connection + type Outgoing: Future; + + /// Future resolving to a tuple of `(Incoming, Outgoing)` + type DialAndListen: Future; /// Dial to the given multi-addr, and listen to incoming substreams on the dialed connection. /// /// Returns either a future which may resolve to a connection, or gives back the multiaddress. fn dial_and_listen(self, addr: Multiaddr) -> Result - where Self: Sized; + where + Self: Sized; } /// Dummy implementation of `Transport` that just denies every single attempt. @@ -145,7 +154,8 @@ impl Transport for DeniedTransport { impl MuxedTransport for DeniedTransport { // TODO: could use `!` once stable type Incoming = Box>; - type DialAndListen = Box>; + type Outgoing = Box>; + type DialAndListen = Box>; #[inline] fn dial_and_listen(self, addr: Multiaddr) -> Result { @@ -158,15 +168,14 @@ impl MuxedTransport for DeniedTransport { pub struct OrTransport(A, B); impl Transport for OrTransport - where A: Transport, - B: Transport +where + A: Transport, + B: Transport, { type RawConn = EitherSocket; type Listener = EitherListenStream; - type Dial = EitherTransportFuture< - ::Future, - ::Future, - >; + type Dial = + EitherTransportFuture<::Future, ::Future>; fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { let (first, addr) = match self.0.listen_on(addr) { @@ -206,7 +215,8 @@ impl SimpleProtocol { /// Builds a `SimpleProtocol`. #[inline] pub fn new(name: N, upgrade: F) -> SimpleProtocol - where N: Into + where + N: Into, { SimpleProtocol { name: name.into(), @@ -226,52 +236,66 @@ impl Clone for SimpleProtocol { } impl MuxedTransport for OrTransport - where A: MuxedTransport, - B: MuxedTransport, +where + A: MuxedTransport, + B: MuxedTransport, + A::DialAndListen: 'static, + B::DialAndListen: 'static, { type Incoming = EitherIncomingStream; - type DialAndListen = EitherMuxedTransportFuture; + type Outgoing = future::Either< + future::Map Self::RawConn>, + future::Map Self::RawConn>, + >; + type DialAndListen = Box>; - fn dial_and_listen(self, addr: Multiaddr) - -> Result - { + fn dial_and_listen(self, addr: Multiaddr) -> Result { let (first, addr) = match self.0.dial_and_listen(addr) { Ok(connec) => { - return Ok(EitherMuxedTransportFuture::First(connec)); - }, + return Ok(Box::new(connec.map(|(inc, out)| { + ( + EitherIncomingStream::First(inc), + future::Either::A(out.map(EitherSocket::First as fn(_) -> _)), + ) + }))); + } Err(err) => err, }; match self.1.dial_and_listen(addr) { - Ok(connec) => { - Ok(EitherMuxedTransportFuture::Second(connec)) - }, + Ok(connec) => Ok(Box::new(connec.map(|(inc, out)| { + ( + EitherIncomingStream::Second(inc), + future::Either::B(out.map(EitherSocket::Second as fn(_) -> _)), + ) + }))), Err((second, addr)) => Err((OrTransport(first, second), addr)), } } } impl ConnectionUpgrade for SimpleProtocol - where C: AsyncRead + AsyncWrite, - F: Fn(C) -> O, - O: IntoFuture, +where + C: AsyncRead + AsyncWrite, + F: Fn(C) -> O, + O: IntoFuture, { - type NamesIter = iter::Once<(Bytes, ())>; - type UpgradeIdentifier = (); + type NamesIter = iter::Once<(Bytes, ())>; + type UpgradeIdentifier = (); - #[inline] - fn protocol_names(&self) -> Self::NamesIter { - iter::once((self.name.clone(), ())) - } + #[inline] + fn protocol_names(&self) -> Self::NamesIter { + iter::once((self.name.clone(), ())) + } - type Output = O::Item; - type Future = FromErr; + type Output = O::Item; + type Future = FromErr; - #[inline] - fn upgrade(self, socket: C, _: (), _: Endpoint) -> Self::Future { + #[inline] + fn upgrade(self, socket: C, _: (), _: Endpoint) -> Self::Future { let upgrade = &self.upgrade; upgrade(socket).into_future().from_err() - } + } } /// Implements `Stream` and dispatches all method calls to either `First` or `Second`. @@ -282,8 +306,9 @@ pub enum EitherListenStream { } impl Stream for EitherListenStream - where A: Stream, - B: Stream +where + A: Stream, + B: Stream, { type Item = (EitherSocket, Multiaddr); type Error = IoError; @@ -291,12 +316,10 @@ impl Stream for EitherListenStream #[inline] fn poll(&mut self) -> Poll, Self::Error> { match self { - &mut EitherListenStream::First(ref mut a) => { - a.poll().map(|i| i.map(|v| v.map(|(s, a)| (EitherSocket::First(s), a)))) - } - &mut EitherListenStream::Second(ref mut a) => { - a.poll().map(|i| i.map(|v| v.map(|(s, a)| (EitherSocket::Second(s), a)))) - } + &mut EitherListenStream::First(ref mut a) => a.poll() + .map(|i| i.map(|v| v.map(|(s, a)| (EitherSocket::First(s), a)))), + &mut EitherListenStream::Second(ref mut a) => a.poll() + .map(|i| i.map(|v| v.map(|(s, a)| (EitherSocket::Second(s), a)))), } } } @@ -309,8 +332,9 @@ pub enum EitherIncomingStream { } impl Stream for EitherIncomingStream - where A: Stream, - B: Stream +where + A: Stream, + B: Stream, { type Item = EitherSocket; type Error = IoError; @@ -341,8 +365,9 @@ pub enum EitherTransportFuture { } impl Future for EitherTransportFuture - where A: Future, - B: Future +where + A: Future, + B: Future, { type Item = EitherSocket; type Error = IoError; @@ -362,40 +387,6 @@ impl Future for EitherTransportFuture } } -/// Implements `Future` and redirects calls to either `First` or `Second`. -/// -/// Additionally, the output will be wrapped inside a `EitherSocket` and a `EitherIncomingStream`. -/// -/// > **Note**: This type is needed because of the lack of `-> impl Trait` in Rust. It can be -/// > removed eventually. -#[derive(Debug, Copy, Clone)] -pub enum EitherMuxedTransportFuture { - First(A), - Second(B), -} - -impl Future for EitherMuxedTransportFuture - where A: Future, - B: Future -{ - type Item = (EitherSocket, EitherIncomingStream); - type Error = IoError; - - #[inline] - fn poll(&mut self) -> Poll { - match self { - &mut EitherMuxedTransportFuture::First(ref mut a) => { - let (dial, listen) = try_ready!(a.poll()); - Ok(Async::Ready((EitherSocket::First(dial), EitherIncomingStream::First(listen)))) - } - &mut EitherMuxedTransportFuture::Second(ref mut b) => { - let (dial, listen) = try_ready!(b.poll()); - Ok(Async::Ready((EitherSocket::Second(dial), EitherIncomingStream::Second(listen)))) - } - } - } -} - /// Implements `AsyncRead` and `AsyncWrite` and dispatches all method calls to either `First` or /// `Second`. #[derive(Debug, Copy, Clone)] @@ -405,8 +396,9 @@ pub enum EitherSocket { } impl AsyncRead for EitherSocket - where A: AsyncRead, - B: AsyncRead +where + A: AsyncRead, + B: AsyncRead, { #[inline] unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { @@ -418,8 +410,9 @@ impl AsyncRead for EitherSocket } impl Read for EitherSocket - where A: Read, - B: Read +where + A: Read, + B: Read, { #[inline] fn read(&mut self, buf: &mut [u8]) -> Result { @@ -431,8 +424,9 @@ impl Read for EitherSocket } impl AsyncWrite for EitherSocket - where A: AsyncWrite, - B: AsyncWrite +where + A: AsyncWrite, + B: AsyncWrite, { #[inline] fn shutdown(&mut self) -> Poll<(), IoError> { @@ -444,8 +438,9 @@ impl AsyncWrite for EitherSocket } impl Write for EitherSocket - where A: Write, - B: Write +where + A: Write, + B: Write, { #[inline] fn write(&mut self, buf: &[u8]) -> Result { @@ -465,8 +460,9 @@ impl Write for EitherSocket } impl StreamMuxer for EitherSocket - where A: StreamMuxer, - B: StreamMuxer, +where + A: StreamMuxer, + B: StreamMuxer, { type Substream = EitherSocket; type InboundSubstream = EitherTransportFuture; @@ -521,8 +517,7 @@ pub trait ConnectionUpgrade { /// /// Because performing the upgrade may not be instantaneous (eg. it may require a handshake), /// this function returns a future instead of the direct output. - fn upgrade(self, socket: C, id: Self::UpgradeIdentifier, ty: Endpoint) - -> Self::Future; + fn upgrade(self, socket: C, id: Self::UpgradeIdentifier, ty: Endpoint) -> Self::Future; } /// Type of connection for the upgrade. @@ -539,9 +534,10 @@ pub enum Endpoint { pub struct OrUpgrade(A, B); impl ConnectionUpgrade for OrUpgrade - where C: AsyncRead + AsyncWrite, - A: ConnectionUpgrade, - B: ConnectionUpgrade +where + C: AsyncRead + AsyncWrite, + A: ConnectionUpgrade, + B: ConnectionUpgrade, { type NamesIter = NamesIterChain; type UpgradeIdentifier = EitherUpgradeIdentifier; @@ -558,9 +554,7 @@ impl ConnectionUpgrade for OrUpgrade type Future = EitherConnUpgrFuture; #[inline] - fn upgrade(self, socket: C, id: Self::UpgradeIdentifier, ty: Endpoint) - -> Self::Future - { + fn upgrade(self, socket: C, id: Self::UpgradeIdentifier, ty: Endpoint) -> Self::Future { match id { EitherUpgradeIdentifier::First(id) => { EitherConnUpgrFuture::First(self.0.upgrade(socket, id, ty)) @@ -592,8 +586,9 @@ pub enum EitherConnUpgrFuture { } impl Future for EitherConnUpgrFuture - where A: Future, - B: Future +where + A: Future, + B: Future, { type Item = EitherSocket; type Error = IoError; @@ -624,8 +619,9 @@ pub struct NamesIterChain { } impl Iterator for NamesIterChain - where A: Iterator, - B: Iterator +where + A: Iterator, + B: Iterator, { type Item = (Bytes, EitherUpgradeIdentifier); @@ -661,7 +657,8 @@ impl Iterator for NamesIterChain pub struct PlainTextConfig; impl ConnectionUpgrade for PlainTextConfig - where C: AsyncRead + AsyncWrite +where + C: AsyncRead + AsyncWrite, { type Output = C; type Future = FutureResult; @@ -670,7 +667,7 @@ impl ConnectionUpgrade for PlainTextConfig #[inline] fn upgrade(self, i: C, _: (), _: Endpoint) -> Self::Future { - future_ok(i) + future::ok(i) } #[inline] @@ -690,8 +687,9 @@ pub struct UpgradedNode { } impl<'a, T, C> UpgradedNode - where T: Transport + 'a, - C: ConnectionUpgrade + 'a +where + T: Transport + 'a, + C: ConnectionUpgrade + 'a, { /// Builds a new struct that implements `ConnectionUpgrade` that contains both `self` and /// `other_upg`. @@ -700,7 +698,8 @@ impl<'a, T, C> UpgradedNode /// of `other_upg`, then upgrade the connection to the negogiated protocol. #[inline] pub fn or_upgrade(self, other_upg: D) -> UpgradedNode> - where D: ConnectionUpgrade + 'a + where + D: ConnectionUpgrade + 'a, { UpgradedNode { transports: self.transports, @@ -733,17 +732,17 @@ impl<'a, T, C> UpgradedNode }; let future = dialed_fut - // Try to negotiate the protocol. - .and_then(move |connection| { - let iter = upgrade.protocol_names() - .map(|(name, id)| (name, ::eq, id)); - let negotiated = multistream_select::dialer_select_proto(connection, iter) - .map_err(|err| IoError::new(IoErrorKind::Other, err)); - negotiated.map(|(upgrade_id, conn)| (upgrade_id, conn, upgrade)) - }) - .and_then(|(upgrade_id, connection, upgrade)| { - upgrade.upgrade(connection, upgrade_id, Endpoint::Dialer) - }); + // Try to negotiate the protocol. + .and_then(move |connection| { + let iter = upgrade.protocol_names() + .map(|(name, id)| (name, ::eq, id)); + let negotiated = multistream_select::dialer_select_proto(connection, iter) + .map_err(|err| IoError::new(IoErrorKind::Other, err)); + negotiated.map(|(upgrade_id, conn)| (upgrade_id, conn, upgrade)) + }) + .and_then(|(upgrade_id, connection, upgrade)| { + upgrade.upgrade(connection, upgrade_id, Endpoint::Dialer) + }); Ok(Box::new(future)) } @@ -754,62 +753,87 @@ impl<'a, T, C> UpgradedNode /// /// Note that this does the same as `MuxedTransport::dial_and_listen`, but with less /// restrictions on the trait requirements. - pub fn dial_and_listen(self, addr: Multiaddr) - -> Result + 'a>), Error = IoError> + 'a>, (Self, Multiaddr)> - where T: MuxedTransport, - C::NamesIter: Clone, // TODO: not elegant - C: Clone + pub fn dial_and_listen( + self, + addr: Multiaddr, + ) -> Result< + Box< + Future< + Item = ( + Box + 'a>, + Box + 'a>, + ), + Error = IoError, + > + + 'a, + >, + (Self, Multiaddr), + > + where + T: MuxedTransport, + C::NamesIter: Clone, // TODO: not elegant + C: Clone, { let upgrade = self.upgrade; + let upgrade2 = upgrade.clone(); - let dialed_fut = match self.transports.dial_and_listen(addr) { - Ok(f) => f, - Err((trans, addr)) => { + self.transports + .dial_and_listen(addr) + .map_err(move |(trans, addr)| { let builder = UpgradedNode { transports: trans, upgrade: upgrade, }; - return Err((builder, addr)); - } - }; + (builder, addr) + }) + .map(move |dialed_fut| { + let dialed_fut = dialed_fut + // Try to negotiate the protocol. + .map(move |(in_stream, dialer)| { + let upgrade = upgrade2.clone(); - let upgrade2 = upgrade.clone(); - let dialed_fut = dialed_fut - // Try to negotiate the protocol. - .and_then(move |(dialer, in_stream)| { - let dialer = { - let iter = upgrade2.protocol_names() - .map(|(name, id)| (name, ::eq, id)); - let negotiated = multistream_select::dialer_select_proto(dialer, iter) - .map_err(|err| IoError::new(IoErrorKind::Other, err)); - negotiated.map(|(upgrade_id, conn)| (upgrade_id, conn, upgrade2)) - } - .and_then(|(upgrade_id, connection, upgrade)| { - upgrade.upgrade(connection, upgrade_id, Endpoint::Dialer) - }); + let dialer = { + let iter = upgrade2.protocol_names() + .map(|(name, id)| (name, ::eq, id)); + let negotiated = dialer.and_then(|dialer| { + multistream_select::dialer_select_proto(dialer, iter) + .map_err(|err| IoError::new(IoErrorKind::Other, err)) + }); + negotiated.map(|(upgrade_id, conn)| (upgrade_id, conn, upgrade2)) + } + .and_then(|(upgrade_id, connection, upgrade)| { + upgrade.upgrade(connection, upgrade_id, Endpoint::Dialer) + }); - let in_stream = in_stream - // Try to negotiate the protocol. - .and_then(move |connection| { - let upgrade = upgrade.clone(); - #[inline] - fn iter_map((n, t): (Bytes, T)) -> (Bytes, fn(&Bytes,&Bytes)->bool, T) { - (n, ::eq, t) - } - let iter = upgrade.protocol_names().map(iter_map); - let negotiated = multistream_select::listener_select_proto(connection, iter); - negotiated.map(move |(upgrade_id, conn)| (upgrade_id, conn, upgrade)) - .map_err(|err| IoError::new(IoErrorKind::Other, err)) - }) - .and_then(|(upgrade_id, connection, upgrade)| { - upgrade.upgrade(connection, upgrade_id, Endpoint::Listener) - }); + let in_stream = in_stream + // Try to negotiate the protocol. + .and_then(move |connection| { + let upgrade = upgrade.clone(); - dialer.map(|d| (d, Box::new(in_stream) as Box>)) - }); - - Ok(Box::new(dialed_fut) as Box<_>) + let iter = upgrade.protocol_names() + .map((|(n, t)| { + (n, ::eq, t) + }) as fn(_) -> _); + let negotiated = multistream_select::listener_select_proto( + connection, + iter, + ); + negotiated.map(move |(upgrade_id, conn)| (upgrade_id, conn, upgrade)) + .map_err(|err| IoError::new(IoErrorKind::Other, err)) + }) + .and_then(|(upgrade_id, connection, upgrade)| { + upgrade.upgrade(connection, upgrade_id, Endpoint::Listener) + }); + + ( + Box::new(in_stream) as Box>, + Box::new(dialer) as Box>, + ) + }); + + Box::new(dialed_fut) as _ + }) } /// Start listening on the multiaddr using the transport that was passed to `new`. @@ -822,11 +846,15 @@ impl<'a, T, C> UpgradedNode self, addr: Multiaddr, ) -> Result< - (Box + 'a>, Multiaddr), + ( + Box + 'a>, + Multiaddr, + ), (Self, Multiaddr), > - where C::NamesIter: Clone, // TODO: not elegant - C: Clone + where + C::NamesIter: Clone, // TODO: not elegant + C: Clone, { let upgrade = self.upgrade; @@ -843,33 +871,34 @@ impl<'a, T, C> UpgradedNode }; let stream = listening_stream - // Try to negotiate the protocol. - .and_then(move |(connection, client_addr)| { - let upgrade = upgrade.clone(); - #[inline] - fn iter_map((n, t): (Bytes, T)) -> (Bytes, fn(&Bytes,&Bytes)->bool, T) { - (n, ::eq, t) - } - let iter = upgrade.protocol_names().map(iter_map); - let negotiated = multistream_select::listener_select_proto(connection, iter); - negotiated.map(move |(upgrade_id, conn)| (upgrade_id, conn, upgrade, client_addr)) - .map_err(|err| IoError::new(IoErrorKind::Other, err)) - }) - .and_then(|(upgrade_id, connection, upgrade, client_addr)| { - upgrade.upgrade(connection, upgrade_id, Endpoint::Listener) - .map(|s| (s, client_addr)) - }); + // Try to negotiate the protocol. + .and_then(move |(connection, client_addr)| { + let upgrade = upgrade.clone(); + #[inline] + fn iter_map((n, t): (Bytes, T)) -> (Bytes, fn(&Bytes,&Bytes)->bool, T) { + (n, ::eq, t) + } + let iter = upgrade.protocol_names().map(iter_map); + let negotiated = multistream_select::listener_select_proto(connection, iter); + negotiated.map(move |(upgrade_id, conn)| (upgrade_id, conn, upgrade, client_addr)) + .map_err(|err| IoError::new(IoErrorKind::Other, err)) + }) + .and_then(|(upgrade_id, connection, upgrade, client_addr)| { + upgrade.upgrade(connection, upgrade_id, Endpoint::Listener) + .map(|s| (s, client_addr)) + }); Ok((Box::new(stream), new_addr)) } } impl Transport for UpgradedNode - where T: Transport + 'static, - C: ConnectionUpgrade + 'static, - C::Output: AsyncRead + AsyncWrite, - C::NamesIter: Clone, // TODO: not elegant - C: Clone +where + T: Transport + 'static, + C: ConnectionUpgrade + 'static, + C::Output: AsyncRead + AsyncWrite, + C::NamesIter: Clone, // TODO: not elegant + C: Clone, { type RawConn = C::Output; type Listener = Box>; @@ -887,17 +916,20 @@ impl Transport for UpgradedNode } impl MuxedTransport for UpgradedNode - where T: MuxedTransport + 'static, - C: ConnectionUpgrade + 'static, - C::Output: AsyncRead + AsyncWrite, - C::NamesIter: Clone, // TODO: not elegant - C: Clone +where + T: MuxedTransport + 'static, + C: ConnectionUpgrade + 'static, + C::Output: AsyncRead + AsyncWrite, + C::NamesIter: Clone, // TODO: not elegant + C: Clone, { type Incoming = Box>; - type DialAndListen = Box>; + type Outgoing = Box>; + type DialAndListen = Box>; #[inline] fn dial_and_listen(self, addr: Multiaddr) -> Result { + // Calls an inherent function above self.dial_and_listen(addr) } } diff --git a/multiplex-rs/Cargo.toml b/multiplex-rs/Cargo.toml index 57dccefd..8cedadc6 100644 --- a/multiplex-rs/Cargo.toml +++ b/multiplex-rs/Cargo.toml @@ -15,3 +15,4 @@ rand = "0.3.17" libp2p-swarm = { path = "../libp2p-swarm" } varint = { path = "../varint-rs" } error-chain = "0.11.0" +futures-mutex = { path = "../futures-mutex" } \ No newline at end of file diff --git a/multiplex-rs/src/lib.rs b/multiplex-rs/src/lib.rs index d7ebd113..11d9eacf 100644 --- a/multiplex-rs/src/lib.rs +++ b/multiplex-rs/src/lib.rs @@ -23,6 +23,7 @@ extern crate bytes; #[macro_use] extern crate error_chain; extern crate futures; +extern crate futures_mutex; extern crate libp2p_swarm as swarm; extern crate num_bigint; extern crate num_traits; @@ -42,7 +43,7 @@ use futures::future::{self, FutureResult}; use header::MultiplexHeader; use swarm::muxing::StreamMuxer; use swarm::{ConnectionUpgrade, Endpoint}; -use parking_lot::Mutex; +use futures_mutex::Mutex; use read::{read_stream, MultiplexReadState}; use shared::{buf_from_slice, ByteBuf, MultiplexShared}; use std::iter; @@ -74,7 +75,7 @@ pub struct Substream { impl Drop for Substream { fn drop(&mut self) { - let mut lock = self.state.lock(); + let mut lock = self.state.lock().wait().expect("This should never fail"); lock.close_stream(self.id); } @@ -110,9 +111,9 @@ impl Substream { // TODO: We always zero the buffer, we should delegate to the inner stream. impl Read for Substream { fn read(&mut self, buf: &mut [u8]) -> io::Result { - let mut lock = match self.state.try_lock() { - Some(lock) => lock, - None => return Err(io::ErrorKind::WouldBlock.into()), + let mut lock = match self.state.poll_lock() { + Async::Ready(lock) => lock, + Async::NotReady => return Err(io::ErrorKind::WouldBlock.into()), }; read_stream(&mut lock, (self.id, buf)) @@ -123,7 +124,10 @@ impl AsyncRead for Substream {} impl Write for Substream { fn write(&mut self, buf: &[u8]) -> io::Result { - let mut lock = self.state.try_lock().ok_or(io::ErrorKind::WouldBlock)?; + let mut lock = match self.state.poll_lock() { + Async::Ready(lock) => lock, + Async::NotReady => return Err(io::ErrorKind::WouldBlock.into()), + }; let mut buffer = self.buffer .take() @@ -143,11 +147,12 @@ impl Write for Substream { } fn flush(&mut self) -> io::Result<()> { - self.state - .try_lock() - .ok_or(io::ErrorKind::WouldBlock)? - .stream - .flush() + let mut lock = match self.state.poll_lock() { + Async::Ready(lock) => lock, + Async::NotReady => return Err(io::ErrorKind::WouldBlock.into()), + }; + + lock.stream.flush() } } @@ -167,9 +172,9 @@ impl Future for InboundFuture { type Error = io::Error; fn poll(&mut self) -> Poll { - let mut lock = match self.state.try_lock() { - Some(lock) => lock, - None => return Ok(Async::NotReady), + let mut lock = match self.state.poll_lock() { + Async::Ready(lock) => lock, + Async::NotReady => return Ok(Async::NotReady), }; // Attempt to make progress, but don't block if we can't @@ -217,7 +222,7 @@ impl OutboundFuture { } fn nonce_to_id(id: usize, end: Endpoint) -> u32 { - id as u32 * 2 + if end == Endpoint::Dialer { 50 } else { 0 } + id as u32 * 2 + if end == Endpoint::Dialer { 0 } else { 1 } } impl Future for OutboundFuture { @@ -225,9 +230,9 @@ impl Future for OutboundFuture { type Error = io::Error; fn poll(&mut self) -> Poll { - let mut lock = match self.state.try_lock() { - Some(lock) => lock, - None => return Ok(Async::NotReady), + let mut lock = match self.state.poll_lock() { + Async::Ready(lock) => lock, + Async::NotReady => return Ok(Async::NotReady), }; loop { @@ -380,7 +385,7 @@ mod tests { Some(id.to_string()) ); - let stream = io::Cursor::new(mplex.state.lock().stream.get_ref().clone()); + let stream = io::Cursor::new(mplex.state.lock().wait().unwrap().stream.get_ref().clone()); let mplex = Multiplex::listen(stream); @@ -422,7 +427,7 @@ mod tests { ); } - let stream = io::Cursor::new(mplex.state.lock().stream.get_ref().clone()); + let stream = io::Cursor::new(mplex.state.lock().wait().unwrap().stream.get_ref().clone()); let mplex = Multiplex::listen(stream); diff --git a/multiplex-rs/src/read.rs b/multiplex-rs/src/read.rs index 5df00b14..7337011e 100644 --- a/multiplex-rs/src/read.rs +++ b/multiplex-rs/src/read.rs @@ -116,7 +116,7 @@ pub fn read_stream<'a, O: Into>, T: AsyncRead>( let header = if let Some(header) = header { header } else { - return Ok(0); + return Ok(on_block.unwrap_or(0)); }; let MultiplexHeader { @@ -183,7 +183,7 @@ pub fn read_stream<'a, O: Into>, T: AsyncRead>( let length = if let Some(length) = length { length } else { - return Ok(0); + return Ok(on_block.unwrap_or(0)); }; lock.read_state = match next { @@ -208,7 +208,8 @@ pub fn read_stream<'a, O: Into>, T: AsyncRead>( let is_open = lock.open_streams .get(&substream_id) .map(SubstreamMetadata::open) - .unwrap_or_else(|| lock.to_open.contains_key(&substream_id)); + .unwrap_or(false); + //.unwrap_or_else(|| lock.to_open.contains_key(&substream_id)); if is_open { Some(MultiplexReadState::ParsingMessageBody { @@ -286,20 +287,16 @@ pub fn read_stream<'a, O: Into>, T: AsyncRead>( if remaining_bytes == 0 { lock.read_state = None; - - return on_block; } else if substream_id == *id { let number_read = *on_block.as_ref().unwrap_or(&0); - if buf.len() == 0 { - return Ok(0); - } else if number_read >= buf.len() { + if number_read >= buf.len() { lock.read_state = Some(ParsingMessageBody { substream_id, remaining_bytes, }); - return on_block; + return Ok(number_read); } let read_result = { diff --git a/multiplex-rs/src/write.rs b/multiplex-rs/src/write.rs index ab5c151f..5eaf40f1 100644 --- a/multiplex-rs/src/write.rs +++ b/multiplex-rs/src/write.rs @@ -92,6 +92,10 @@ pub fn write_stream( let id = write_request.header.substream_id; + if buf.get_ref().len() as u64 - buf.position() == 0 { + return Ok(0); + } + match (request.request_type, write_request.request_type) { (RequestType::Substream, RequestType::Substream) if request.header.substream_id != id => { use std::mem;