Fix multiplex waking and make it so that inbound streams can be handled concurrently with negotiating the outbound stream

This commit is contained in:
Vurich
2017-12-14 17:37:32 +01:00
parent 5ddda08170
commit 19f0c8f3ef
12 changed files with 914 additions and 301 deletions

View File

@ -7,6 +7,7 @@ members = [
"libp2p-tcp-transport",
"multistream-select",
"datastore",
"futures-mutex",
"rw-stream-sink",
"circular-buffer",
"varint-rs",

View File

@ -20,16 +20,16 @@
extern crate bytes;
extern crate futures;
extern crate multiplex;
extern crate libp2p_secio as secio;
extern crate libp2p_swarm as swarm;
extern crate libp2p_tcp_transport as tcp;
extern crate multiplex;
extern crate tokio_core;
extern crate tokio_io;
use bytes::BytesMut;
use futures::{Stream, Sink, Future};
use swarm::{Transport, SimpleProtocol};
use futures::{Future, Sink, Stream};
use swarm::{SimpleProtocol, Transport};
use tcp::TcpConfig;
use tokio_core::reactor::Core;
use tokio_io::codec::length_delimited;
@ -73,20 +73,29 @@ fn main() {
// of any opened stream.
// We use it to dial `/ip4/127.0.0.1/tcp/10333`.
let dialer = transport.dial_and_listen(swarm::Multiaddr::new("/ip4/127.0.0.1/tcp/10333").unwrap())
let dialer = transport
.dial_and_listen(swarm::Multiaddr::new("/ip4/127.0.0.1/tcp/10333").unwrap())
.unwrap_or_else(|_| panic!("unsupported multiaddr protocol ; should never happen"))
.and_then(|(echo, incoming)| {
.and_then(|(incoming, echo)| {
// `echo` is what the closure used when initializing "echo" returns.
// Consequently, please note that the `send` method is available only because the type
// `length_delimited::Framed` has a `send` method.
echo.send("hello world".into()).map(Option::Some)
.select(incoming.for_each(|_| { println!("opened"); Ok(()) }).map(|()| None))
echo.and_then(|echo| echo.send("hello world".into()).map(Option::Some))
.select(
incoming
.for_each(|_| {
println!("opened");
Ok(())
})
.map(|()| None),
)
.map(|(n, _)| n)
.map_err(|(e, _)| e)
})
.and_then(|echo| {
// The message has been successfully sent. Now wait for an answer.
echo.unwrap().into_future()
echo.unwrap()
.into_future()
.map(|(msg, rest)| {
println!("received: {:?}", msg);
rest

15
futures-mutex/Cargo.toml Normal file
View File

@ -0,0 +1,15 @@
[package]
name = "futures-mutex"
version = "0.2.0"
authors = ["Adam Jacobus <adam.m.jacobus@gmail.com>"]
license = "Apache-2.0"
description = "A Mutex for the Future(s)"
readme = "README.md"
documentation = "https://docs.rs/futures-mutex"
repository = "https://github.com/proman21/futures-mutex"
keywords = ["future", "futures", "mutex", "concurrency"]
categories = ["asynchronous", "concurrency"]
[dependencies]
futures = "0.1.14"
crossbeam = "0.2.10"

201
futures-mutex/LICENSE Normal file
View File

@ -0,0 +1,201 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "{}"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright {yyyy} {name of copyright owner}
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

28
futures-mutex/README.md Normal file
View File

@ -0,0 +1,28 @@
# futures-mutex
*A Mutex for the Future(s)*
[![Crates.io](https://img.shields.io/crates/v/futures-mutex.svg)](https://crates.io/crates/futures-mutex)
[![Docs.rs](https://docs.rs/futures-mutex/badge.svg)](https://docs.rs/futures-mutex)
## Usage
Add this to your `Cargo.toml`:
```toml
[dependencies]
futures-mutex = "0.2.0"
```
Then, add this to your crate:
```rust
extern crate futures_mutex;
```
`FutMutex<T>` follows a very similar API to [`futures::sync::BiLock`](https://docs.rs/futures/0.1.11/futures/sync/struct.BiLock.html), however it can have more than two handles.
## License
`futures-mutex` is distributed under the Apache License v2.0. See the LICENSE
file for the full text of the license.

319
futures-mutex/src/lib.rs Normal file
View File

@ -0,0 +1,319 @@
//! A Mutex for the Future(s)
//!
//! API is similar to [`futures::sync::BiLock`](https://docs.rs/futures/0.1.11/futures/sync/struct.BiLock.html)
//! However, it can be cloned into as many handles as desired.
//!
//! ```
//! extern crate futures;
//! extern crate futures_mutex;
//!
//! use futures::{Future, Poll, Async};
//! use futures_mutex::Mutex;
//!
//! struct AddTwo {
//! lock: Mutex<usize>
//! }
//!
//! impl Future for AddTwo {
//! type Item = usize;
//! type Error = ();
//! fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
//! match self.lock.poll_lock() {
//! Async::Ready(mut g) => {
//! *g += 2;
//! Ok(Async::Ready(*g))
//! },
//! Async::NotReady => Ok(Async::NotReady)
//! }
//! }
//! }
//!
//! fn main() {
//! let lock1: Mutex<usize> = Mutex::new(0);
//! let lock2 = lock1.clone();
//!
//! let future = AddTwo { lock: lock2 };
//!
//! // This future will return the current value and the recovered lock.
//! let used_lock = lock1.lock().map(|b| (*b, b.unlock()));
//!
//! let _ = future.join(used_lock).map(|(add_two, (value, _))| {
//! assert_eq!(add_two, value);
//! }).wait().unwrap();
//! }
//! ```
extern crate futures;
extern crate crossbeam;
use std::ops::{Deref, DerefMut};
use std::mem;
use std::sync::Arc;
use std::sync::atomic::{Ordering, AtomicBool};
use std::cell::UnsafeCell;
use crossbeam::sync::MsQueue;
use futures::task::{current, Task};
use futures::{Future, Poll, Async};
#[derive(Debug)]
struct Inner<T> {
wait_queue: MsQueue<Task>,
locked: AtomicBool,
data: UnsafeCell<T>
}
impl<T> Drop for Inner<T> {
fn drop(&mut self) {
assert!(!self.locked.load(Ordering::SeqCst))
}
}
unsafe impl<T: Send> Send for Inner<T> {}
unsafe impl<T: Send> Sync for Inner<T> {}
/// A Mutex designed for use inside Futures. Works like `BiLock<T>` from the `futures` crate, but
/// with more than 2 handles.
///
/// **THIS IS NOT A GENRAL PURPOSE MUTEX! IF YOU CALL `poll_lock` OR `lock` OUTSIDE THE CONTEXT OF A TASK, IT WILL PANIC AND EAT YOUR LAUNDRY.**
///
/// This type provides a Mutex that will track tasks that are requesting the Mutex, and will unpark
/// them in the order they request the lock.
///
/// *As of now, there is no strong guarantee that a particular handle of the lock won't be starved. Hopefully the use of the queue will prevent this, but I haven't tried to test that.*
#[derive(Debug)]
pub struct Mutex<T> {
inner: Arc<Inner<T>>
}
impl<T> Mutex<T> {
/// Create a new Mutex wrapping around a value `t`
pub fn new(t: T) -> Mutex<T> {
let inner = Arc::new(Inner {
wait_queue: MsQueue::new(),
locked: AtomicBool::new(false),
data: UnsafeCell::new(t)
});
Mutex {
inner: inner
}
}
/// This will attempt a non-blocking lock on the mutex, returning `Async::NotReady` if it
/// can't be acquired.
///
/// This function will return immediatly with a `MutexGuard` if the lock was acquired
/// successfully. When it drops, the lock will be unlocked.
///
/// If it can't acquire the lock, it will schedule the current task to be unparked when it
/// might be able lock the mutex again.
///
/// # Panics
///
/// This function will panic if called outside the context of a future's task.
pub fn poll_lock(&self) -> Async<MutexGuard<T>> {
if self.inner.locked.compare_and_swap(false, true, Ordering::SeqCst) {
self.inner.wait_queue.push(current());
Async::NotReady
} else {
Async::Ready(MutexGuard{ inner: self })
}
}
/// Convert this lock into a future that resolves to a guard that allows access to the data.
/// This function returns `MutexAcquire<T>`, which resolves to a `MutexGuard<T>`
/// guard type.
///
/// The returned future will never return an error.
pub fn lock(&self) -> MutexAcquire<T> {
MutexAcquire {
inner: self
}
}
/// We unlock the mutex and wait for someone to lock. We try and unpark as many tasks as we
/// can to prevents dead tasks from deadlocking the mutex, or tasks that have finished their
/// critical section and were awakened.
fn unlock(&self) {
if !self.inner.locked.swap(false, Ordering::SeqCst) {
panic!("Tried to unlock an already unlocked Mutex, something has gone terribly wrong");
}
while !self.inner.locked.load(Ordering::SeqCst) {
match self.inner.wait_queue.try_pop() {
Some(task) => task.notify(),
None => return
}
}
}
}
impl<T> Clone for Mutex<T> {
fn clone(&self) -> Mutex<T> {
Mutex {
inner: self.inner.clone()
}
}
}
/// Returned RAII guard from the `poll_lock` method.
///
/// This structure acts as a sentinel to the data in the `Mutex<T>` itself,
/// implementing `Deref` and `DerefMut` to `T`. When dropped, the lock will be
/// unlocked.
#[derive(Debug)]
pub struct MutexGuard<'a, T: 'a> {
inner: &'a Mutex<T>
}
impl<'a, T> Deref for MutexGuard<'a, T> {
type Target = T;
fn deref(&self) -> &Self::Target {
unsafe { &*self.inner.inner.data.get() }
}
}
impl<'a, T> DerefMut for MutexGuard<'a, T> {
fn deref_mut(&mut self) -> &mut T {
unsafe { &mut *self.inner.inner.data.get() }
}
}
impl<'a, T> Drop for MutexGuard<'a, T> {
fn drop(&mut self) {
self.inner.unlock();
}
}
/// Future returned by `Mutex::lock` which resolves to a guard when a lock is acquired.
#[derive(Debug)]
pub struct MutexAcquire<'a, T: 'a> {
inner: &'a Mutex<T>
}
impl<'a, T> Future for MutexAcquire<'a, T> {
type Item = MutexGuard<'a, T>;
type Error = ();
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
Ok(self.inner.poll_lock())
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures::executor::{self, Notify};
use futures::future;
use futures::stream::{self, Stream};
use std::thread;
pub fn unpark_noop() -> Arc<Notify> {
struct Foo;
impl Notify for Foo {
fn notify(&self, id: usize) {}
}
Arc::new(Foo)
}
#[test]
fn simple() {
let future = future::lazy(|| {
let lock1 = Mutex::new(1);
let lock2 = lock1.clone();
let lock3 = lock1.clone();
let mut guard = match lock1.poll_lock() {
Async::Ready(g) => g,
Async::NotReady => panic!("poll not ready"),
};
assert_eq!(*guard, 1);
*guard = 2;
assert!(lock1.poll_lock().is_not_ready());
assert!(lock2.poll_lock().is_not_ready());
assert!(lock3.poll_lock().is_not_ready());
drop(guard);
assert!(lock1.poll_lock().is_ready());
assert!(lock2.poll_lock().is_ready());
assert!(lock3.poll_lock().is_ready());
let guard = match lock2.poll_lock() {
Async::Ready(g) => g,
Async::NotReady => panic!("poll not ready"),
};
assert_eq!(*guard, 2);
assert!(lock1.poll_lock().is_not_ready());
assert!(lock2.poll_lock().is_not_ready());
assert!(lock3.poll_lock().is_not_ready());
Ok::<(), ()>(())
});
assert!(executor::spawn(future)
.poll_future_notify(unpark_noop())
.expect("failure in poll")
.is_ready());
}
#[test]
fn concurrent() {
const N: usize = 10000;
let lock1 = Mutex::new(0);
let lock2 = lock1.clone();
let a = Increment {
a: Some(lock1),
remaining: N,
};
let b = stream::iter_ok((0..N).map(Ok::<_, ()>)).fold(lock2, |b, _n| {
b.lock().map(|mut b| {
*b += 1;
b.unlock()
})
});
let t1 = thread::spawn(move || a.wait());
let b = b.wait().expect("b error");
let a = t1.join().unwrap().expect("a error");
match a.poll_lock() {
Async::Ready(l) => assert_eq!(*l, 2 * N),
Async::NotReady => panic!("poll not ready"),
}
match b.poll_lock() {
Async::Ready(l) => assert_eq!(*l, 2 * N),
Async::NotReady => panic!("poll not ready"),
}
struct Increment {
remaining: usize,
a: Option<Mutex<usize>>,
}
impl Future for Increment {
type Item = Mutex<usize>;
type Error = ();
fn poll(&mut self) -> Poll<Mutex<usize>, ()> {
loop {
if self.remaining == 0 {
return Ok(self.a.take().unwrap().into())
}
let a = self.a.as_ref().unwrap();
let mut a = match a.poll_lock() {
Async::Ready(l) => l,
Async::NotReady => return Ok(Async::NotReady),
};
self.remaining -= 1;
*a += 1;
}
}
}
}
}

View File

@ -43,14 +43,14 @@
//! TODO: this whole code is a dummy and should be rewritten after the design has been properly
//! figured out.
use futures::{Future, Stream, Async, Poll};
use futures::{Async, Future, Poll, Stream};
use futures::stream::Fuse as StreamFuse;
use futures::stream;
use multiaddr::Multiaddr;
use muxing::StreamMuxer;
use smallvec::SmallVec;
use std::io::Error as IoError;
use transport::{Transport, ConnectionUpgrade, UpgradedNode, MuxedTransport};
use transport::{ConnectionUpgrade, MuxedTransport, Transport, UpgradedNode};
/// Allows reusing the same muxed connection multiple times.
///
@ -59,16 +59,18 @@ use transport::{Transport, ConnectionUpgrade, UpgradedNode, MuxedTransport};
/// Implements the `Transport` trait.
#[derive(Clone)]
pub struct ConnectionReuse<T, C>
where T: Transport,
C: ConnectionUpgrade<T::RawConn>
where
T: Transport,
C: ConnectionUpgrade<T::RawConn>,
{
// Underlying transport and connection upgrade for when we need to dial or listen.
inner: UpgradedNode<T, C>,
}
impl<T, C> From<UpgradedNode<T, C>> for ConnectionReuse<T, C>
where T: Transport,
C: ConnectionUpgrade<T::RawConn>
where
T: Transport,
C: ConnectionUpgrade<T::RawConn>,
{
#[inline]
fn from(node: UpgradedNode<T, C>) -> ConnectionReuse<T, C> {
@ -77,20 +79,16 @@ impl<T, C> From<UpgradedNode<T, C>> for ConnectionReuse<T, C>
}
impl<T, C> Transport for ConnectionReuse<T, C>
where T: Transport + 'static, // TODO: 'static :(
where
T: Transport + 'static, // TODO: 'static :(
C: ConnectionUpgrade<T::RawConn> + 'static, // TODO: 'static :(
C: Clone,
C::Output: StreamMuxer + Clone,
C::NamesIter: Clone // TODO: not elegant
C::NamesIter: Clone, // TODO: not elegant
{
type RawConn = <C::Output as StreamMuxer>::Substream;
type Listener = ConnectionReuseListener<
Box<
Stream<
Item = (C::Output, Multiaddr),
Error = IoError,
>,
>,
Box<Stream<Item = (C::Output, Multiaddr), Error = IoError>>,
C::Output,
>;
type Dial = Box<Future<Item = Self::RawConn, Error = IoError>>;
@ -125,51 +123,54 @@ impl<T, C> Transport for ConnectionReuse<T, C>
}
impl<T, C> MuxedTransport for ConnectionReuse<T, C>
where T: Transport + 'static, // TODO: 'static :(
where
T: Transport + 'static, // TODO: 'static :(
C: ConnectionUpgrade<T::RawConn> + 'static, // TODO: 'static :(
C: Clone,
C::Output: StreamMuxer + Clone,
C::NamesIter: Clone // TODO: not elegant
C::NamesIter: Clone, // TODO: not elegant
{
type Incoming = Box<Stream<Item = <C::Output as StreamMuxer>::Substream, Error = IoError>>;
type DialAndListen = Box<Future<Item = (Self::RawConn, Self::Incoming), Error = IoError>>;
type Incoming = stream::AndThen<
stream::Repeat<C::Output, IoError>,
fn(C::Output)
-> <<C as ConnectionUpgrade<T::RawConn>>::Output as StreamMuxer>::InboundSubstream,
<<C as ConnectionUpgrade<T::RawConn>>::Output as StreamMuxer>::InboundSubstream,
>;
type Outgoing =
<<C as ConnectionUpgrade<T::RawConn>>::Output as StreamMuxer>::OutboundSubstream;
type DialAndListen = Box<Future<Item = (Self::Incoming, Self::Outgoing), Error = IoError>>;
fn dial_and_listen(self, addr: Multiaddr) -> Result<Self::DialAndListen, (Self, Multiaddr)> {
let muxer_dial = match self.inner.dial(addr) {
Ok(l) => l,
Err((inner, addr)) => {
return Err((ConnectionReuse { inner: inner }, addr));
}
};
let future = muxer_dial
.and_then(|muxer| {
let dial = muxer.clone().outbound();
dial.map(|d| (d, muxer))
self.inner
.dial(addr)
.map_err(|(inner, addr)| (ConnectionReuse { inner: inner }, addr))
.map(|fut| {
fut.map(|muxer| {
(
stream::repeat(muxer.clone()).and_then(StreamMuxer::inbound as fn(_) -> _),
muxer.outbound(),
)
})
.and_then(|(dial, muxer)| {
let listener = stream::repeat(muxer).and_then(|muxer| muxer.inbound());
let listener = Box::new(listener) as Box<Stream<Item = _, Error = _>>;
Ok((dial, listener))
});
Ok(Box::new(future) as Box<_>)
})
.map(|fut| Box::new(fut) as _)
}
}
/// Implementation of `Stream<Item = (impl AsyncRead + AsyncWrite, Multiaddr)` for the
/// `ConnectionReuse` struct.
pub struct ConnectionReuseListener<S, M>
where S: Stream<Item = (M, Multiaddr), Error = IoError>,
M: StreamMuxer
where
S: Stream<Item = (M, Multiaddr), Error = IoError>,
M: StreamMuxer,
{
listener: StreamFuse<S>,
connections: Vec<(M, <M as StreamMuxer>::InboundSubstream, Multiaddr)>,
}
impl<S, M> Stream for ConnectionReuseListener<S, M>
where S: Stream<Item = (M, Multiaddr), Error = IoError>,
M: StreamMuxer + Clone + 'static // TODO: 'static :(
where
S: Stream<Item = (M, Multiaddr), Error = IoError>,
M: StreamMuxer + Clone + 'static, // TODO: 'static :(
{
type Item = (M::Substream, Multiaddr);
type Error = IoError;
@ -207,7 +208,7 @@ impl<S, M> Stream for ConnectionReuseListener<S, M>
*next_incoming = new_next;
return Ok(Async::Ready(Some((incoming, client_addr.clone()))));
}
Ok(Async::NotReady) => {},
Ok(Async::NotReady) => {}
Err(_) => {
connections_to_drop.push(index);
}

View File

@ -30,8 +30,8 @@
//! in a complex chain of protocols negotiation.
use bytes::Bytes;
use futures::{Stream, Poll, Async};
use futures::future::{IntoFuture, Future, ok as future_ok, FutureResult, FromErr};
use futures::{Async, Poll, Stream};
use futures::future::{self, FromErr, Future, FutureResult, IntoFuture};
use multiaddr::Multiaddr;
use multistream_select;
use muxing::StreamMuxer;
@ -70,12 +70,15 @@ pub trait Transport {
/// > situations such as turning `/ip4/127.0.0.1/tcp/0` into
/// > `/ip4/127.0.0.1/tcp/<actual port>`.
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)>
where Self: Sized;
where
Self: Sized;
/// Dial to the given multi-addr.
///
/// Returns either a future which may resolve to a connection, or gives back the multiaddress.
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> where Self: Sized;
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)>
where
Self: Sized;
/// Builds a new struct that implements `Transport` that contains both `self` and `other`.
///
@ -83,7 +86,8 @@ pub trait Transport {
/// return an error then `other` will be tried.
#[inline]
fn or_transport<T>(self, other: T) -> OrTransport<Self, T>
where Self: Sized
where
Self: Sized,
{
OrTransport(self, other)
}
@ -95,8 +99,9 @@ pub trait Transport {
/// > (communication encryption), *multiplex*, but also a protocol handler.
#[inline]
fn with_upgrade<U>(self, upgrade: U) -> UpgradedNode<Self, U>
where Self: Sized,
U: ConnectionUpgrade<Self::RawConn>
where
Self: Sized,
U: ConnectionUpgrade<Self::RawConn>,
{
UpgradedNode {
transports: self,
@ -111,14 +116,18 @@ pub trait MuxedTransport: Transport {
/// Produces substreams on the dialed connection.
type Incoming: Stream<Item = Self::RawConn, Error = IoError>;
/// Future indicating when dialing succeeded.
type DialAndListen: Future<Item = (Self::RawConn, Self::Incoming), Error = IoError>;
/// Future resolving to an outgoing connection
type Outgoing: Future<Item = Self::RawConn, Error = IoError>;
/// Future resolving to a tuple of `(Incoming, Outgoing)`
type DialAndListen: Future<Item = (Self::Incoming, Self::Outgoing), Error = IoError>;
/// Dial to the given multi-addr, and listen to incoming substreams on the dialed connection.
///
/// Returns either a future which may resolve to a connection, or gives back the multiaddress.
fn dial_and_listen(self, addr: Multiaddr) -> Result<Self::DialAndListen, (Self, Multiaddr)>
where Self: Sized;
where
Self: Sized;
}
/// Dummy implementation of `Transport` that just denies every single attempt.
@ -145,7 +154,8 @@ impl Transport for DeniedTransport {
impl MuxedTransport for DeniedTransport {
// TODO: could use `!` once stable
type Incoming = Box<Stream<Item = Self::RawConn, Error = IoError>>;
type DialAndListen = Box<Future<Item = (Self::RawConn, Self::Incoming), Error = IoError>>;
type Outgoing = Box<Future<Item = Self::RawConn, Error = IoError>>;
type DialAndListen = Box<Future<Item = (Self::Incoming, Self::Outgoing), Error = IoError>>;
#[inline]
fn dial_and_listen(self, addr: Multiaddr) -> Result<Self::DialAndListen, (Self, Multiaddr)> {
@ -158,15 +168,14 @@ impl MuxedTransport for DeniedTransport {
pub struct OrTransport<A, B>(A, B);
impl<A, B> Transport for OrTransport<A, B>
where A: Transport,
B: Transport
where
A: Transport,
B: Transport,
{
type RawConn = EitherSocket<A::RawConn, B::RawConn>;
type Listener = EitherListenStream<A::Listener, B::Listener>;
type Dial = EitherTransportFuture<
<A::Dial as IntoFuture>::Future,
<B::Dial as IntoFuture>::Future,
>;
type Dial =
EitherTransportFuture<<A::Dial as IntoFuture>::Future, <B::Dial as IntoFuture>::Future>;
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
let (first, addr) = match self.0.listen_on(addr) {
@ -206,7 +215,8 @@ impl<F> SimpleProtocol<F> {
/// Builds a `SimpleProtocol`.
#[inline]
pub fn new<N>(name: N, upgrade: F) -> SimpleProtocol<F>
where N: Into<Bytes>
where
N: Into<Bytes>,
{
SimpleProtocol {
name: name.into(),
@ -226,33 +236,47 @@ impl<F> Clone for SimpleProtocol<F> {
}
impl<A, B> MuxedTransport for OrTransport<A, B>
where A: MuxedTransport,
where
A: MuxedTransport,
B: MuxedTransport,
A::DialAndListen: 'static,
B::DialAndListen: 'static,
{
type Incoming = EitherIncomingStream<A::Incoming, B::Incoming>;
type DialAndListen = EitherMuxedTransportFuture<A::DialAndListen, B::DialAndListen>;
type Outgoing = future::Either<
future::Map<A::Outgoing, fn(A::RawConn) -> Self::RawConn>,
future::Map<B::Outgoing, fn(B::RawConn) -> Self::RawConn>,
>;
type DialAndListen = Box<Future<Item = (Self::Incoming, Self::Outgoing), Error = IoError>>;
fn dial_and_listen(self, addr: Multiaddr)
-> Result<Self::DialAndListen, (Self, Multiaddr)>
{
fn dial_and_listen(self, addr: Multiaddr) -> Result<Self::DialAndListen, (Self, Multiaddr)> {
let (first, addr) = match self.0.dial_and_listen(addr) {
Ok(connec) => {
return Ok(EitherMuxedTransportFuture::First(connec));
},
return Ok(Box::new(connec.map(|(inc, out)| {
(
EitherIncomingStream::First(inc),
future::Either::A(out.map(EitherSocket::First as fn(_) -> _)),
)
})));
}
Err(err) => err,
};
match self.1.dial_and_listen(addr) {
Ok(connec) => {
Ok(EitherMuxedTransportFuture::Second(connec))
},
Ok(connec) => Ok(Box::new(connec.map(|(inc, out)| {
(
EitherIncomingStream::Second(inc),
future::Either::B(out.map(EitherSocket::Second as fn(_) -> _)),
)
}))),
Err((second, addr)) => Err((OrTransport(first, second), addr)),
}
}
}
impl<C, F, O> ConnectionUpgrade<C> for SimpleProtocol<F>
where C: AsyncRead + AsyncWrite,
where
C: AsyncRead + AsyncWrite,
F: Fn(C) -> O,
O: IntoFuture<Error = IoError>,
{
@ -282,8 +306,9 @@ pub enum EitherListenStream<A, B> {
}
impl<A, B, Sa, Sb> Stream for EitherListenStream<A, B>
where A: Stream<Item = (Sa, Multiaddr), Error = IoError>,
B: Stream<Item = (Sb, Multiaddr), Error = IoError>
where
A: Stream<Item = (Sa, Multiaddr), Error = IoError>,
B: Stream<Item = (Sb, Multiaddr), Error = IoError>,
{
type Item = (EitherSocket<Sa, Sb>, Multiaddr);
type Error = IoError;
@ -291,12 +316,10 @@ impl<A, B, Sa, Sb> Stream for EitherListenStream<A, B>
#[inline]
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match self {
&mut EitherListenStream::First(ref mut a) => {
a.poll().map(|i| i.map(|v| v.map(|(s, a)| (EitherSocket::First(s), a))))
}
&mut EitherListenStream::Second(ref mut a) => {
a.poll().map(|i| i.map(|v| v.map(|(s, a)| (EitherSocket::Second(s), a))))
}
&mut EitherListenStream::First(ref mut a) => a.poll()
.map(|i| i.map(|v| v.map(|(s, a)| (EitherSocket::First(s), a)))),
&mut EitherListenStream::Second(ref mut a) => a.poll()
.map(|i| i.map(|v| v.map(|(s, a)| (EitherSocket::Second(s), a)))),
}
}
}
@ -309,8 +332,9 @@ pub enum EitherIncomingStream<A, B> {
}
impl<A, B, Sa, Sb> Stream for EitherIncomingStream<A, B>
where A: Stream<Item = Sa, Error = IoError>,
B: Stream<Item = Sb, Error = IoError>
where
A: Stream<Item = Sa, Error = IoError>,
B: Stream<Item = Sb, Error = IoError>,
{
type Item = EitherSocket<Sa, Sb>;
type Error = IoError;
@ -341,8 +365,9 @@ pub enum EitherTransportFuture<A, B> {
}
impl<A, B> Future for EitherTransportFuture<A, B>
where A: Future<Error = IoError>,
B: Future<Error = IoError>
where
A: Future<Error = IoError>,
B: Future<Error = IoError>,
{
type Item = EitherSocket<A::Item, B::Item>;
type Error = IoError;
@ -362,40 +387,6 @@ impl<A, B> Future for EitherTransportFuture<A, B>
}
}
/// Implements `Future` and redirects calls to either `First` or `Second`.
///
/// Additionally, the output will be wrapped inside a `EitherSocket` and a `EitherIncomingStream`.
///
/// > **Note**: This type is needed because of the lack of `-> impl Trait` in Rust. It can be
/// > removed eventually.
#[derive(Debug, Copy, Clone)]
pub enum EitherMuxedTransportFuture<A, B> {
First(A),
Second(B),
}
impl<A, B, Da, Db, Sa, Sb> Future for EitherMuxedTransportFuture<A, B>
where A: Future<Item = (Da, Sa), Error = IoError>,
B: Future<Item = (Db, Sb), Error = IoError>
{
type Item = (EitherSocket<Da, Db>, EitherIncomingStream<Sa, Sb>);
type Error = IoError;
#[inline]
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self {
&mut EitherMuxedTransportFuture::First(ref mut a) => {
let (dial, listen) = try_ready!(a.poll());
Ok(Async::Ready((EitherSocket::First(dial), EitherIncomingStream::First(listen))))
}
&mut EitherMuxedTransportFuture::Second(ref mut b) => {
let (dial, listen) = try_ready!(b.poll());
Ok(Async::Ready((EitherSocket::Second(dial), EitherIncomingStream::Second(listen))))
}
}
}
}
/// Implements `AsyncRead` and `AsyncWrite` and dispatches all method calls to either `First` or
/// `Second`.
#[derive(Debug, Copy, Clone)]
@ -405,8 +396,9 @@ pub enum EitherSocket<A, B> {
}
impl<A, B> AsyncRead for EitherSocket<A, B>
where A: AsyncRead,
B: AsyncRead
where
A: AsyncRead,
B: AsyncRead,
{
#[inline]
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
@ -418,8 +410,9 @@ impl<A, B> AsyncRead for EitherSocket<A, B>
}
impl<A, B> Read for EitherSocket<A, B>
where A: Read,
B: Read
where
A: Read,
B: Read,
{
#[inline]
fn read(&mut self, buf: &mut [u8]) -> Result<usize, IoError> {
@ -431,8 +424,9 @@ impl<A, B> Read for EitherSocket<A, B>
}
impl<A, B> AsyncWrite for EitherSocket<A, B>
where A: AsyncWrite,
B: AsyncWrite
where
A: AsyncWrite,
B: AsyncWrite,
{
#[inline]
fn shutdown(&mut self) -> Poll<(), IoError> {
@ -444,8 +438,9 @@ impl<A, B> AsyncWrite for EitherSocket<A, B>
}
impl<A, B> Write for EitherSocket<A, B>
where A: Write,
B: Write
where
A: Write,
B: Write,
{
#[inline]
fn write(&mut self, buf: &[u8]) -> Result<usize, IoError> {
@ -465,7 +460,8 @@ impl<A, B> Write for EitherSocket<A, B>
}
impl<A, B> StreamMuxer for EitherSocket<A, B>
where A: StreamMuxer,
where
A: StreamMuxer,
B: StreamMuxer,
{
type Substream = EitherSocket<A::Substream, B::Substream>;
@ -521,8 +517,7 @@ pub trait ConnectionUpgrade<C: AsyncRead + AsyncWrite> {
///
/// Because performing the upgrade may not be instantaneous (eg. it may require a handshake),
/// this function returns a future instead of the direct output.
fn upgrade(self, socket: C, id: Self::UpgradeIdentifier, ty: Endpoint)
-> Self::Future;
fn upgrade(self, socket: C, id: Self::UpgradeIdentifier, ty: Endpoint) -> Self::Future;
}
/// Type of connection for the upgrade.
@ -539,9 +534,10 @@ pub enum Endpoint {
pub struct OrUpgrade<A, B>(A, B);
impl<C, A, B> ConnectionUpgrade<C> for OrUpgrade<A, B>
where C: AsyncRead + AsyncWrite,
where
C: AsyncRead + AsyncWrite,
A: ConnectionUpgrade<C>,
B: ConnectionUpgrade<C>
B: ConnectionUpgrade<C>,
{
type NamesIter = NamesIterChain<A::NamesIter, B::NamesIter>;
type UpgradeIdentifier = EitherUpgradeIdentifier<A::UpgradeIdentifier, B::UpgradeIdentifier>;
@ -558,9 +554,7 @@ impl<C, A, B> ConnectionUpgrade<C> for OrUpgrade<A, B>
type Future = EitherConnUpgrFuture<A::Future, B::Future>;
#[inline]
fn upgrade(self, socket: C, id: Self::UpgradeIdentifier, ty: Endpoint)
-> Self::Future
{
fn upgrade(self, socket: C, id: Self::UpgradeIdentifier, ty: Endpoint) -> Self::Future {
match id {
EitherUpgradeIdentifier::First(id) => {
EitherConnUpgrFuture::First(self.0.upgrade(socket, id, ty))
@ -592,8 +586,9 @@ pub enum EitherConnUpgrFuture<A, B> {
}
impl<A, B> Future for EitherConnUpgrFuture<A, B>
where A: Future<Error = IoError>,
B: Future<Error = IoError>
where
A: Future<Error = IoError>,
B: Future<Error = IoError>,
{
type Item = EitherSocket<A::Item, B::Item>;
type Error = IoError;
@ -624,8 +619,9 @@ pub struct NamesIterChain<A, B> {
}
impl<A, B, AId, BId> Iterator for NamesIterChain<A, B>
where A: Iterator<Item = (Bytes, AId)>,
B: Iterator<Item = (Bytes, BId)>
where
A: Iterator<Item = (Bytes, AId)>,
B: Iterator<Item = (Bytes, BId)>,
{
type Item = (Bytes, EitherUpgradeIdentifier<AId, BId>);
@ -661,7 +657,8 @@ impl<A, B, AId, BId> Iterator for NamesIterChain<A, B>
pub struct PlainTextConfig;
impl<C> ConnectionUpgrade<C> for PlainTextConfig
where C: AsyncRead + AsyncWrite
where
C: AsyncRead + AsyncWrite,
{
type Output = C;
type Future = FutureResult<C, IoError>;
@ -670,7 +667,7 @@ impl<C> ConnectionUpgrade<C> for PlainTextConfig
#[inline]
fn upgrade(self, i: C, _: (), _: Endpoint) -> Self::Future {
future_ok(i)
future::ok(i)
}
#[inline]
@ -690,8 +687,9 @@ pub struct UpgradedNode<T, C> {
}
impl<'a, T, C> UpgradedNode<T, C>
where T: Transport + 'a,
C: ConnectionUpgrade<T::RawConn> + 'a
where
T: Transport + 'a,
C: ConnectionUpgrade<T::RawConn> + 'a,
{
/// Builds a new struct that implements `ConnectionUpgrade` that contains both `self` and
/// `other_upg`.
@ -700,7 +698,8 @@ impl<'a, T, C> UpgradedNode<T, C>
/// of `other_upg`, then upgrade the connection to the negogiated protocol.
#[inline]
pub fn or_upgrade<D>(self, other_upg: D) -> UpgradedNode<T, OrUpgrade<C, D>>
where D: ConnectionUpgrade<T::RawConn> + 'a
where
D: ConnectionUpgrade<T::RawConn> + 'a,
{
UpgradedNode {
transports: self.transports,
@ -754,35 +753,53 @@ impl<'a, T, C> UpgradedNode<T, C>
///
/// Note that this does the same as `MuxedTransport::dial_and_listen`, but with less
/// restrictions on the trait requirements.
pub fn dial_and_listen(self, addr: Multiaddr)
-> Result<Box<Future<Item = (C::Output, Box<Stream<Item = C::Output, Error = IoError> + 'a>), Error = IoError> + 'a>, (Self, Multiaddr)>
where T: MuxedTransport,
pub fn dial_and_listen(
self,
addr: Multiaddr,
) -> Result<
Box<
Future<
Item = (
Box<Stream<Item = C::Output, Error = IoError> + 'a>,
Box<Future<Item = C::Output, Error = IoError> + 'a>,
),
Error = IoError,
>
+ 'a,
>,
(Self, Multiaddr),
>
where
T: MuxedTransport,
C::NamesIter: Clone, // TODO: not elegant
C: Clone
C: Clone,
{
let upgrade = self.upgrade;
let upgrade2 = upgrade.clone();
let dialed_fut = match self.transports.dial_and_listen(addr) {
Ok(f) => f,
Err((trans, addr)) => {
self.transports
.dial_and_listen(addr)
.map_err(move |(trans, addr)| {
let builder = UpgradedNode {
transports: trans,
upgrade: upgrade,
};
return Err((builder, addr));
}
};
let upgrade2 = upgrade.clone();
(builder, addr)
})
.map(move |dialed_fut| {
let dialed_fut = dialed_fut
// Try to negotiate the protocol.
.and_then(move |(dialer, in_stream)| {
.map(move |(in_stream, dialer)| {
let upgrade = upgrade2.clone();
let dialer = {
let iter = upgrade2.protocol_names()
.map(|(name, id)| (name, <Bytes as PartialEq>::eq, id));
let negotiated = multistream_select::dialer_select_proto(dialer, iter)
.map_err(|err| IoError::new(IoErrorKind::Other, err));
let negotiated = dialer.and_then(|dialer| {
multistream_select::dialer_select_proto(dialer, iter)
.map_err(|err| IoError::new(IoErrorKind::Other, err))
});
negotiated.map(|(upgrade_id, conn)| (upgrade_id, conn, upgrade2))
}
.and_then(|(upgrade_id, connection, upgrade)| {
@ -793,12 +810,15 @@ impl<'a, T, C> UpgradedNode<T, C>
// Try to negotiate the protocol.
.and_then(move |connection| {
let upgrade = upgrade.clone();
#[inline]
fn iter_map<T>((n, t): (Bytes, T)) -> (Bytes, fn(&Bytes,&Bytes)->bool, T) {
let iter = upgrade.protocol_names()
.map((|(n, t)| {
(n, <Bytes as PartialEq>::eq, t)
}
let iter = upgrade.protocol_names().map(iter_map);
let negotiated = multistream_select::listener_select_proto(connection, iter);
}) as fn(_) -> _);
let negotiated = multistream_select::listener_select_proto(
connection,
iter,
);
negotiated.map(move |(upgrade_id, conn)| (upgrade_id, conn, upgrade))
.map_err(|err| IoError::new(IoErrorKind::Other, err))
})
@ -806,10 +826,14 @@ impl<'a, T, C> UpgradedNode<T, C>
upgrade.upgrade(connection, upgrade_id, Endpoint::Listener)
});
dialer.map(|d| (d, Box::new(in_stream) as Box<Stream<Item = _, Error = _>>))
(
Box::new(in_stream) as Box<Stream<Item = _, Error = _>>,
Box::new(dialer) as Box<Future<Item = _, Error = _>>,
)
});
Ok(Box::new(dialed_fut) as Box<_>)
Box::new(dialed_fut) as _
})
}
/// Start listening on the multiaddr using the transport that was passed to `new`.
@ -822,11 +846,15 @@ impl<'a, T, C> UpgradedNode<T, C>
self,
addr: Multiaddr,
) -> Result<
(Box<Stream<Item = (C::Output, Multiaddr), Error = IoError> + 'a>, Multiaddr),
(
Box<Stream<Item = (C::Output, Multiaddr), Error = IoError> + 'a>,
Multiaddr,
),
(Self, Multiaddr),
>
where C::NamesIter: Clone, // TODO: not elegant
C: Clone
where
C::NamesIter: Clone, // TODO: not elegant
C: Clone,
{
let upgrade = self.upgrade;
@ -865,11 +893,12 @@ impl<'a, T, C> UpgradedNode<T, C>
}
impl<T, C> Transport for UpgradedNode<T, C>
where T: Transport + 'static,
where
T: Transport + 'static,
C: ConnectionUpgrade<T::RawConn> + 'static,
C::Output: AsyncRead + AsyncWrite,
C::NamesIter: Clone, // TODO: not elegant
C: Clone
C: Clone,
{
type RawConn = C::Output;
type Listener = Box<Stream<Item = (C::Output, Multiaddr), Error = IoError>>;
@ -887,17 +916,20 @@ impl<T, C> Transport for UpgradedNode<T, C>
}
impl<T, C> MuxedTransport for UpgradedNode<T, C>
where T: MuxedTransport + 'static,
where
T: MuxedTransport + 'static,
C: ConnectionUpgrade<T::RawConn> + 'static,
C::Output: AsyncRead + AsyncWrite,
C::NamesIter: Clone, // TODO: not elegant
C: Clone
C: Clone,
{
type Incoming = Box<Stream<Item = C::Output, Error = IoError>>;
type DialAndListen = Box<Future<Item = (Self::RawConn, Self::Incoming), Error = IoError>>;
type Outgoing = Box<Future<Item = Self::RawConn, Error = IoError>>;
type DialAndListen = Box<Future<Item = (Self::Incoming, Self::Outgoing), Error = IoError>>;
#[inline]
fn dial_and_listen(self, addr: Multiaddr) -> Result<Self::DialAndListen, (Self, Multiaddr)> {
// Calls an inherent function above
self.dial_and_listen(addr)
}
}

View File

@ -15,3 +15,4 @@ rand = "0.3.17"
libp2p-swarm = { path = "../libp2p-swarm" }
varint = { path = "../varint-rs" }
error-chain = "0.11.0"
futures-mutex = { path = "../futures-mutex" }

View File

@ -23,6 +23,7 @@ extern crate bytes;
#[macro_use]
extern crate error_chain;
extern crate futures;
extern crate futures_mutex;
extern crate libp2p_swarm as swarm;
extern crate num_bigint;
extern crate num_traits;
@ -42,7 +43,7 @@ use futures::future::{self, FutureResult};
use header::MultiplexHeader;
use swarm::muxing::StreamMuxer;
use swarm::{ConnectionUpgrade, Endpoint};
use parking_lot::Mutex;
use futures_mutex::Mutex;
use read::{read_stream, MultiplexReadState};
use shared::{buf_from_slice, ByteBuf, MultiplexShared};
use std::iter;
@ -74,7 +75,7 @@ pub struct Substream<T> {
impl<T> Drop for Substream<T> {
fn drop(&mut self) {
let mut lock = self.state.lock();
let mut lock = self.state.lock().wait().expect("This should never fail");
lock.close_stream(self.id);
}
@ -110,9 +111,9 @@ impl<T> Substream<T> {
// TODO: We always zero the buffer, we should delegate to the inner stream.
impl<T: AsyncRead> Read for Substream<T> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let mut lock = match self.state.try_lock() {
Some(lock) => lock,
None => return Err(io::ErrorKind::WouldBlock.into()),
let mut lock = match self.state.poll_lock() {
Async::Ready(lock) => lock,
Async::NotReady => return Err(io::ErrorKind::WouldBlock.into()),
};
read_stream(&mut lock, (self.id, buf))
@ -123,7 +124,10 @@ impl<T: AsyncRead> AsyncRead for Substream<T> {}
impl<T: AsyncWrite> Write for Substream<T> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let mut lock = self.state.try_lock().ok_or(io::ErrorKind::WouldBlock)?;
let mut lock = match self.state.poll_lock() {
Async::Ready(lock) => lock,
Async::NotReady => return Err(io::ErrorKind::WouldBlock.into()),
};
let mut buffer = self.buffer
.take()
@ -143,11 +147,12 @@ impl<T: AsyncWrite> Write for Substream<T> {
}
fn flush(&mut self) -> io::Result<()> {
self.state
.try_lock()
.ok_or(io::ErrorKind::WouldBlock)?
.stream
.flush()
let mut lock = match self.state.poll_lock() {
Async::Ready(lock) => lock,
Async::NotReady => return Err(io::ErrorKind::WouldBlock.into()),
};
lock.stream.flush()
}
}
@ -167,9 +172,9 @@ impl<T: AsyncRead> Future for InboundFuture<T> {
type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let mut lock = match self.state.try_lock() {
Some(lock) => lock,
None => return Ok(Async::NotReady),
let mut lock = match self.state.poll_lock() {
Async::Ready(lock) => lock,
Async::NotReady => return Ok(Async::NotReady),
};
// Attempt to make progress, but don't block if we can't
@ -217,7 +222,7 @@ impl<T> OutboundFuture<T> {
}
fn nonce_to_id(id: usize, end: Endpoint) -> u32 {
id as u32 * 2 + if end == Endpoint::Dialer { 50 } else { 0 }
id as u32 * 2 + if end == Endpoint::Dialer { 0 } else { 1 }
}
impl<T: AsyncWrite> Future for OutboundFuture<T> {
@ -225,9 +230,9 @@ impl<T: AsyncWrite> Future for OutboundFuture<T> {
type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let mut lock = match self.state.try_lock() {
Some(lock) => lock,
None => return Ok(Async::NotReady),
let mut lock = match self.state.poll_lock() {
Async::Ready(lock) => lock,
Async::NotReady => return Ok(Async::NotReady),
};
loop {
@ -380,7 +385,7 @@ mod tests {
Some(id.to_string())
);
let stream = io::Cursor::new(mplex.state.lock().stream.get_ref().clone());
let stream = io::Cursor::new(mplex.state.lock().wait().unwrap().stream.get_ref().clone());
let mplex = Multiplex::listen(stream);
@ -422,7 +427,7 @@ mod tests {
);
}
let stream = io::Cursor::new(mplex.state.lock().stream.get_ref().clone());
let stream = io::Cursor::new(mplex.state.lock().wait().unwrap().stream.get_ref().clone());
let mplex = Multiplex::listen(stream);

View File

@ -116,7 +116,7 @@ pub fn read_stream<'a, O: Into<Option<(u32, &'a mut [u8])>>, T: AsyncRead>(
let header = if let Some(header) = header {
header
} else {
return Ok(0);
return Ok(on_block.unwrap_or(0));
};
let MultiplexHeader {
@ -183,7 +183,7 @@ pub fn read_stream<'a, O: Into<Option<(u32, &'a mut [u8])>>, T: AsyncRead>(
let length = if let Some(length) = length {
length
} else {
return Ok(0);
return Ok(on_block.unwrap_or(0));
};
lock.read_state = match next {
@ -208,7 +208,8 @@ pub fn read_stream<'a, O: Into<Option<(u32, &'a mut [u8])>>, T: AsyncRead>(
let is_open = lock.open_streams
.get(&substream_id)
.map(SubstreamMetadata::open)
.unwrap_or_else(|| lock.to_open.contains_key(&substream_id));
.unwrap_or(false);
//.unwrap_or_else(|| lock.to_open.contains_key(&substream_id));
if is_open {
Some(MultiplexReadState::ParsingMessageBody {
@ -286,20 +287,16 @@ pub fn read_stream<'a, O: Into<Option<(u32, &'a mut [u8])>>, T: AsyncRead>(
if remaining_bytes == 0 {
lock.read_state = None;
return on_block;
} else if substream_id == *id {
let number_read = *on_block.as_ref().unwrap_or(&0);
if buf.len() == 0 {
return Ok(0);
} else if number_read >= buf.len() {
if number_read >= buf.len() {
lock.read_state = Some(ParsingMessageBody {
substream_id,
remaining_bytes,
});
return on_block;
return Ok(number_read);
}
let read_result = {

View File

@ -92,6 +92,10 @@ pub fn write_stream<T: AsyncWrite>(
let id = write_request.header.substream_id;
if buf.get_ref().len() as u64 - buf.position() == 0 {
return Ok(0);
}
match (request.request_type, write_request.request_type) {
(RequestType::Substream, RequestType::Substream) if request.header.substream_id != id => {
use std::mem;