diff --git a/circular-buffer/src/lib.rs b/circular-buffer/src/lib.rs index c74505ee..d6fb6f05 100644 --- a/circular-buffer/src/lib.rs +++ b/circular-buffer/src/lib.rs @@ -29,8 +29,8 @@ extern crate smallvec; -use std::ops::Drop; use std::mem::ManuallyDrop; +use std::ops::Drop; pub use smallvec::Array; @@ -41,8 +41,8 @@ use owned_slice::OwnedSlice; /// elements of these slices would be leaked after the slice goes out of scope. `OwnedSlice` simply /// manually drops all its elements when it goes out of scope. pub mod owned_slice { - use std::ops::{Deref, DerefMut, Drop}; use std::mem::ManuallyDrop; + use std::ops::{Deref, DerefMut, Drop}; /// A slice that owns its elements, but not their storage. This is useful for things like /// `Vec::retain` and `CircularBuffer::pop_slice`, since these functions can return a slice but diff --git a/datastore/src/json_file.rs b/datastore/src/json_file.rs index ce7c44d8..2d1c4bd3 100644 --- a/datastore/src/json_file.rs +++ b/datastore/src/json_file.rs @@ -27,8 +27,8 @@ use futures::stream::{iter_ok, Stream}; use query::{naive_apply_query, Query}; use serde::Serialize; use serde::de::DeserializeOwned; -use serde_json::{from_reader, from_value, to_value, to_writer, Map}; use serde_json::value::Value; +use serde_json::{from_reader, from_value, to_value, to_writer, Map}; use std::borrow::Cow; use std::fs; use std::io::Cursor; @@ -264,11 +264,11 @@ where #[cfg(test)] mod tests { - use {Filter, FilterOp, FilterTy, Order, Query}; use Datastore; use JsonFileDatastore; use futures::{Future, Stream}; use tempfile::NamedTempFile; + use {Filter, FilterOp, FilterTy, Order, Query}; #[test] fn open_and_flush() { diff --git a/datastore/src/lib.rs b/datastore/src/lib.rs index 56b61a2d..5dc18fa0 100644 --- a/datastore/src/lib.rs +++ b/datastore/src/lib.rs @@ -113,8 +113,8 @@ use std::borrow::Cow; use std::io::Error as IoError; use std::ops::DerefMut; -mod query; mod json_file; +mod query; pub use self::json_file::{JsonFileDatastore, JsonFileDatastoreEntry}; pub use self::query::{Filter, FilterOp, FilterTy, Order, Query}; diff --git a/datastore/src/query.rs b/datastore/src/query.rs index 36828bec..f7062279 100644 --- a/datastore/src/query.rs +++ b/datastore/src/query.rs @@ -18,8 +18,8 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use futures::{Async, Future, Poll, Stream}; use futures::stream::{iter_ok, Skip as StreamSkip, Take as StreamTake}; +use futures::{Async, Future, Poll, Stream}; use std::borrow::Cow; use std::cmp::Ordering; use std::io::Error as IoError; diff --git a/dns/src/lib.rs b/dns/src/lib.rs index 40221c4e..4408871d 100644 --- a/dns/src/lib.rs +++ b/dns/src/lib.rs @@ -99,10 +99,10 @@ impl Transport for DnsConfig where T: Transport + 'static, // TODO: 'static :-/ { - type RawConn = T::RawConn; + type Output = T::Output; type Listener = T::Listener; type ListenerUpgrade = T::ListenerUpgrade; - type Dial = Box>; + type Dial = Box>; #[inline] fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { @@ -238,10 +238,10 @@ mod tests { #[derive(Clone)] struct CustomTransport; impl Transport for CustomTransport { - type RawConn = ::RawConn; + type Output = ::Output; type Listener = ::Listener; type ListenerUpgrade = ::ListenerUpgrade; - type Dial = Box>; + type Dial = Box>; #[inline] fn listen_on( diff --git a/example/examples/echo-dialer.rs b/example/examples/echo-dialer.rs index aaf1cba7..49b1b130 100644 --- a/example/examples/echo-dialer.rs +++ b/example/examples/echo-dialer.rs @@ -29,11 +29,11 @@ extern crate libp2p_websocket as websocket; extern crate tokio_core; extern crate tokio_io; -use futures::{Future, Sink, Stream}; use futures::sync::oneshot; +use futures::{Future, Sink, Stream}; use std::env; use swarm::Transport; -use swarm::upgrade::{self, DeniedConnectionUpgrade, SimpleProtocol, UpgradeExt}; +use swarm::upgrade::{self, DeniedConnectionUpgrade, SimpleProtocol}; use tcp::TcpConfig; use tokio_core::reactor::Core; use tokio_io::AsyncRead; @@ -73,7 +73,7 @@ fn main() { } }; - plain_text.or_upgrade(secio) + upgrade::or(plain_text, upgrade::map(secio, |(socket, _)| socket)) }) // On top of plaintext or secio, we will use the multiplex protocol. @@ -89,8 +89,7 @@ fn main() { // by the listening part. We don't want to accept anything, so we pass a dummy object that // represents a connection that is always denied. let (swarm_controller, swarm_future) = swarm::swarm( - transport, - DeniedConnectionUpgrade, + transport.clone().with_upgrade(DeniedConnectionUpgrade), |_socket, _client_addr| -> Result<(), _> { unreachable!("All incoming connections should have been denied") }, @@ -108,7 +107,7 @@ fn main() { // We now use the controller to dial to the address. let (finished_tx, finished_rx) = oneshot::channel(); swarm_controller - .dial_custom_handler(target_addr.parse().expect("invalid multiaddr"), proto, |echo, _| { + .dial_custom_handler(target_addr.parse().expect("invalid multiaddr"), transport.with_upgrade(proto), |echo, _| { // `echo` is what the closure used when initializing `proto` returns. // Consequently, please note that the `send` method is available only because the type // `length_delimited::Framed` has a `send` method. diff --git a/example/examples/echo-server.rs b/example/examples/echo-server.rs index bbed10bf..fcc7a672 100644 --- a/example/examples/echo-server.rs +++ b/example/examples/echo-server.rs @@ -33,7 +33,7 @@ use futures::future::{loop_fn, Future, IntoFuture, Loop}; use futures::{Sink, Stream}; use std::env; use swarm::Transport; -use swarm::upgrade::{self, SimpleProtocol, UpgradeExt}; +use swarm::upgrade::{self, SimpleProtocol}; use tcp::TcpConfig; use tokio_core::reactor::Core; use tokio_io::AsyncRead; @@ -72,7 +72,7 @@ fn main() { } }; - plain_text.or_upgrade(secio) + upgrade::or(plain_text, upgrade::map(secio, |(socket, _)| socket)) }) // On top of plaintext or secio, we will use the multiplex protocol. @@ -99,36 +99,39 @@ fn main() { // Let's put this `transport` into a *swarm*. The swarm will handle all the incoming and // outgoing connections for us. - let (swarm_controller, swarm_future) = swarm::swarm(transport, proto, |socket, client_addr| { - println!("Successfully negotiated protocol with {}", client_addr); + let (swarm_controller, swarm_future) = swarm::swarm( + transport.clone().with_upgrade(proto), + |socket, client_addr| { + println!("Successfully negotiated protocol with {}", client_addr); - // The type of `socket` is exactly what the closure of `SimpleProtocol` returns. + // The type of `socket` is exactly what the closure of `SimpleProtocol` returns. - // We loop forever in order to handle all the messages sent by the client. - loop_fn(socket, move |socket| { - let client_addr = client_addr.clone(); - socket - .into_future() - .map_err(|(e, _)| e) - .and_then(move |(msg, rest)| { - if let Some(msg) = msg { - // One message has been received. We send it back to the client. - println!( - "Received a message from {}: {:?}\n => Sending back \ - identical message to remote", - client_addr, msg - ); - Box::new(rest.send(msg.freeze()).map(|m| Loop::Continue(m))) - as Box> - } else { - // End of stream. Connection closed. Breaking the loop. - println!("Received EOF from {}\n => Dropping connection", client_addr); - Box::new(Ok(Loop::Break(())).into_future()) - as Box> - } - }) - }) - }); + // We loop forever in order to handle all the messages sent by the client. + loop_fn(socket, move |socket| { + let client_addr = client_addr.clone(); + socket + .into_future() + .map_err(|(e, _)| e) + .and_then(move |(msg, rest)| { + if let Some(msg) = msg { + // One message has been received. We send it back to the client. + println!( + "Received a message from {}: {:?}\n => Sending back \ + identical message to remote", + client_addr, msg + ); + Box::new(rest.send(msg.freeze()).map(|m| Loop::Continue(m))) + as Box> + } else { + // End of stream. Connection closed. Breaking the loop. + println!("Received EOF from {}\n => Dropping connection", client_addr); + Box::new(Ok(Loop::Break(())).into_future()) + as Box> + } + }) + }) + }, + ); // We now use the controller to listen on the address. let address = swarm_controller diff --git a/example/examples/floodsub.rs b/example/examples/floodsub.rs index fc0f26d6..58bba6df 100644 --- a/example/examples/floodsub.rs +++ b/example/examples/floodsub.rs @@ -33,12 +33,12 @@ extern crate tokio_core; extern crate tokio_io; extern crate tokio_stdin; -use futures::future::Future; use futures::Stream; +use futures::future::Future; use peerstore::PeerId; use std::{env, mem}; +use swarm::upgrade; use swarm::{Multiaddr, Transport}; -use swarm::upgrade::{self, UpgradeExt}; use tcp::TcpConfig; use tokio_core::reactor::Core; use websocket::WsConfig; @@ -75,7 +75,7 @@ fn main() { } }; - plain_text.or_upgrade(secio) + upgrade::or(plain_text, upgrade::map(secio, |(socket, _)| socket)) }) // On top of plaintext or secio, we will use the multiplex protocol. @@ -102,8 +102,7 @@ fn main() { // Let's put this `transport` into a *swarm*. The swarm will handle all the incoming and // outgoing connections for us. let (swarm_controller, swarm_future) = swarm::swarm( - transport, - floodsub_upgrade.clone(), + transport.clone().with_upgrade(floodsub_upgrade.clone()), |socket, client_addr| { println!("Successfully negotiated protocol with {}", client_addr); socket @@ -142,7 +141,10 @@ fn main() { let target: Multiaddr = msg[6..].parse().unwrap(); println!("*Dialing {}*", target); swarm_controller - .dial_to_handler(target, floodsub_upgrade.clone()) + .dial_to_handler( + target, + transport.clone().with_upgrade(floodsub_upgrade.clone()), + ) .unwrap(); } else { floodsub_ctl.publish(&topic, msg.into_bytes()); diff --git a/example/examples/kademlia.rs b/example/examples/kademlia.rs index 8b324589..347d8a23 100644 --- a/example/examples/kademlia.rs +++ b/example/examples/kademlia.rs @@ -40,7 +40,7 @@ use std::env; use std::sync::Arc; use std::time::Duration; use swarm::Transport; -use swarm::upgrade::{self, UpgradeExt}; +use swarm::upgrade; use tcp::TcpConfig; use tokio_core::reactor::Core; @@ -80,7 +80,7 @@ fn main() { } }; - plain_text.or_upgrade(secio) + upgrade::or(plain_text, upgrade::map(secio, |(socket, _)| socket)) }) // On top of plaintext or secio, we will use the multiplex protocol. @@ -116,9 +116,13 @@ fn main() { // Let's put this `transport` into a *swarm*. The swarm will handle all the incoming and // outgoing connections for us. - let (swarm_controller, swarm_future) = swarm::swarm(transport, proto, |upgrade, _| upgrade); + let (swarm_controller, swarm_future) = swarm::swarm( + transport.clone().with_upgrade(proto.clone()), + |upgrade, _| upgrade, + ); - let (kad_controller, _kad_init) = kad_ctl_proto.start(swarm_controller.clone()); + let (kad_controller, _kad_init) = + kad_ctl_proto.start(swarm_controller.clone(), transport.with_upgrade(proto)); for listen_addr in listen_addrs { let addr = swarm_controller diff --git a/example/examples/ping-client.rs b/example/examples/ping-client.rs index 27f89e87..b8eede4a 100644 --- a/example/examples/ping-client.rs +++ b/example/examples/ping-client.rs @@ -33,7 +33,7 @@ use futures::Future; use futures::sync::oneshot; use std::env; use swarm::Transport; -use swarm::upgrade::{self, DeniedConnectionUpgrade, UpgradeExt}; +use swarm::upgrade::{self, DeniedConnectionUpgrade}; use tcp::TcpConfig; use tokio_core::reactor::Core; @@ -65,7 +65,7 @@ fn main() { } }; - plain_text.or_upgrade(secio) + upgrade::or(plain_text, upgrade::map(secio, |(socket, _)| socket)) }) // On top of plaintext or secio, we will use the multiplex protocol. @@ -81,8 +81,7 @@ fn main() { // by the listening part. We don't want to accept anything, so we pass a dummy object that // represents a connection that is always denied. let (swarm_controller, swarm_future) = swarm::swarm( - transport, - DeniedConnectionUpgrade, + transport.clone().with_upgrade(DeniedConnectionUpgrade), |_socket, _client_addr| -> Result<(), _> { unreachable!("All incoming connections should have been denied") }, @@ -91,7 +90,8 @@ fn main() { // We now use the controller to dial to the address. let (tx, rx) = oneshot::channel(); swarm_controller - .dial_custom_handler(target_addr.parse().expect("invalid multiaddr"), ping::Ping, + .dial_custom_handler(target_addr.parse().expect("invalid multiaddr"), + transport.with_upgrade(ping::Ping), |(mut pinger, future), _| { let ping = pinger.ping().map_err(|_| unreachable!()).inspect(|_| { println!("Received pong from the remote"); diff --git a/floodsub/src/lib.rs b/floodsub/src/lib.rs index 40670626..0797588f 100644 --- a/floodsub/src/lib.rs +++ b/floodsub/src/lib.rs @@ -39,17 +39,11 @@ mod topic; pub use self::topic::{Topic, TopicBuilder, TopicHash}; -use std::fmt; -use std::hash::{Hash, Hasher}; -use std::io::{Error as IoError, ErrorKind as IoErrorKind}; -use std::iter; -use std::sync::Arc; -use std::sync::atomic::{AtomicUsize, Ordering}; -use bytes::{Bytes, BytesMut}; use byteorder::{BigEndian, WriteBytesExt}; +use bytes::{Bytes, BytesMut}; use fnv::{FnvHashMap, FnvHashSet, FnvHasher}; -use futures::{future, Future, Poll, Sink, Stream}; use futures::sync::mpsc; +use futures::{future, Future, Poll, Sink, Stream}; use libp2p_peerstore::PeerId; use libp2p_swarm::{ConnectionUpgrade, Endpoint}; use log::Level; @@ -57,6 +51,12 @@ use multiaddr::{AddrComponent, Multiaddr}; use parking_lot::{Mutex, RwLock}; use protobuf::Message as ProtobufMessage; use smallvec::SmallVec; +use std::fmt; +use std::hash::{Hash, Hasher}; +use std::io::{Error as IoError, ErrorKind as IoErrorKind}; +use std::iter; +use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; use tokio_io::{AsyncRead, AsyncWrite}; use varint::VarintCodec; diff --git a/identify/src/protocol.rs b/identify/src/protocol.rs index 8a34bae1..00b1d00d 100644 --- a/identify/src/protocol.rs +++ b/identify/src/protocol.rs @@ -29,8 +29,8 @@ use protobuf::repeated::RepeatedField; use std::io::{Error as IoError, ErrorKind as IoErrorKind}; use std::iter; use structs_proto; -use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::codec::Framed; +use tokio_io::{AsyncRead, AsyncWrite}; use varint::VarintCodec; /// Configuration for an upgrade to the identity protocol. @@ -238,11 +238,11 @@ mod tests { use self::libp2p_tcp_transport::TcpConfig; use self::tokio_core::reactor::Core; - use {IdentifyInfo, IdentifyOutput, IdentifyProtocolConfig}; use futures::{Future, Stream}; use libp2p_swarm::Transport; use std::sync::mpsc; use std::thread; + use {IdentifyInfo, IdentifyOutput, IdentifyProtocolConfig}; #[test] fn correct_transfer() { diff --git a/identify/src/transport.rs b/identify/src/transport.rs index 916b5fe0..93caebb1 100644 --- a/identify/src/transport.rs +++ b/identify/src/transport.rs @@ -26,6 +26,7 @@ use protocol::{IdentifyInfo, IdentifyOutput, IdentifyProtocolConfig}; use std::io::{Error as IoError, ErrorKind as IoErrorKind}; use std::ops::Deref; use std::time::Duration; +use tokio_io::{AsyncRead, AsyncWrite}; /// Implementation of `Transport`. See [the crate root description](index.html). #[derive(Debug, Clone)] @@ -59,13 +60,14 @@ impl IdentifyTransport { impl Transport for IdentifyTransport where Trans: Transport + Clone + 'static, // TODO: 'static :( + Trans::Output: AsyncRead + AsyncWrite, PStoreRef: Deref + Clone + 'static, // TODO: 'static :( for<'r> &'r PStore: Peerstore, { - type RawConn = Trans::RawConn; + type Output = Trans::Output; type Listener = Box>; - type ListenerUpgrade = Box>; - type Dial = Box>; + type ListenerUpgrade = Box>; + type Dial = Box>; #[inline] fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { @@ -275,11 +277,12 @@ where impl MuxedTransport for IdentifyTransport where Trans: MuxedTransport + Clone + 'static, + Trans::Output: AsyncRead + AsyncWrite, PStoreRef: Deref + Clone + 'static, for<'r> &'r PStore: Peerstore, { type Incoming = Box>; - type IncomingUpgrade = Box>; + type IncomingUpgrade = Box>; #[inline] fn next_incoming(self) -> Self::Incoming { @@ -391,8 +394,8 @@ mod tests { use self::tokio_core::reactor::Core; use IdentifyTransport; use futures::{Future, Stream}; - use libp2p_peerstore::{PeerAccess, PeerId, Peerstore}; use libp2p_peerstore::memory_peerstore::MemoryPeerstore; + use libp2p_peerstore::{PeerAccess, PeerId, Peerstore}; use libp2p_swarm::Transport; use multiaddr::{AddrComponent, Multiaddr}; use std::io::Error as IoError; @@ -410,9 +413,9 @@ mod tests { inner: TcpConfig, } impl Transport for UnderlyingTrans { - type RawConn = ::RawConn; + type Output = ::Output; type Listener = Box>; - type ListenerUpgrade = Box>; + type ListenerUpgrade = Box>; type Dial = ::Dial; #[inline] fn listen_on( diff --git a/kad/src/high_level.rs b/kad/src/high_level.rs index 85fd571e..e7779f8e 100644 --- a/kad/src/high_level.rs +++ b/kad/src/high_level.rs @@ -24,13 +24,12 @@ use bytes::Bytes; use fnv::FnvHashMap; -use futures::{self, future, Future}; use futures::sync::oneshot; +use futures::{self, future, Future}; use kad_server::{KadServerInterface, KademliaServerConfig, KademliaServerController}; use kbucket::{KBucketsPeerId, KBucketsTable, UpdateOutcome}; use libp2p_peerstore::{PeerAccess, PeerId, Peerstore}; -use libp2p_swarm::{Endpoint, MuxedTransport, SwarmController}; -use libp2p_swarm::ConnectionUpgrade; +use libp2p_swarm::{ConnectionUpgrade, Endpoint, MuxedTransport, SwarmController, Transport}; use multiaddr::Multiaddr; use parking_lot::Mutex; use protocol::ConnectionType; @@ -95,27 +94,28 @@ where } /// Turns the prototype into an actual controller by feeding it a swarm. - pub fn start( + pub fn start( self, - swarm: SwarmController, + swarm: SwarmController, + kademlia_transport: K, ) -> ( - KademliaController, + KademliaController, Box>, ) where P: Clone + Deref + 'static, // TODO: 'static :-/ for<'r> &'r Pc: Peerstore, - R: Clone + 'static, // TODO: 'static :-/ - T: Clone + MuxedTransport + 'static, // TODO: 'static :-/ - C: Clone + ConnectionUpgrade + 'static, // TODO: 'static :-/ - C::NamesIter: Clone, - C::Output: From, + R: Clone + 'static, // TODO: 'static :-/ + T: Clone + MuxedTransport + 'static, // TODO: 'static :-/ + T::Output: From, + K: Transport + Clone + 'static, // TODO: 'static :-/ { // TODO: initialization let controller = KademliaController { inner: self.inner.clone(), swarm_controller: swarm, + kademlia_transport, }; let init_future = { @@ -142,40 +142,40 @@ where /// Object that allows one to make queries on the Kademlia system. #[derive(Debug)] -pub struct KademliaController +pub struct KademliaController where - T: MuxedTransport + 'static, // TODO: 'static :-/ - C: ConnectionUpgrade + 'static, // TODO: 'static :-/ + T: MuxedTransport + 'static, // TODO: 'static :-/ { inner: Arc>, - swarm_controller: SwarmController, + swarm_controller: SwarmController, + kademlia_transport: K, } -impl Clone for KademliaController +impl Clone for KademliaController where T: Clone + MuxedTransport + 'static, // TODO: 'static :-/ - C: Clone + ConnectionUpgrade + 'static, // TODO: 'static :-/ + K: Clone, { #[inline] fn clone(&self) -> Self { KademliaController { inner: self.inner.clone(), swarm_controller: self.swarm_controller.clone(), + kademlia_transport: self.kademlia_transport.clone(), } } } -impl KademliaController +impl KademliaController where P: Deref, for<'r> &'r Pc: Peerstore, R: Clone, T: Clone + MuxedTransport + 'static, // TODO: 'static :-/ - C: Clone + ConnectionUpgrade + 'static, // TODO: 'static :-/ { /// Performs an iterative find node query on the network. /// - /// Will query the network for the peers that are the closest to `searched_key` and return + /// Will query the network for the peers that4 are the closest to `searched_key` and return /// the results. /// /// The algorithm used is a standard Kademlia algorithm. The details are not documented, so @@ -188,8 +188,8 @@ where where P: Clone + 'static, R: 'static, - C::NamesIter: Clone, - C::Output: From, + T::Output: From, + K: Transport + Clone + 'static, { query::find_node(self.clone(), searched_key) } @@ -214,10 +214,9 @@ impl KademliaUpgrade { /// Builds a connection upgrade from the controller. #[inline] - pub fn from_controller(ctl: &KademliaController) -> Self + pub fn from_controller(ctl: &KademliaController) -> Self where T: MuxedTransport, - C: ConnectionUpgrade, { KademliaUpgrade { inner: ctl.inner.clone(), @@ -408,15 +407,14 @@ where } } -impl query::QueryInterface for KademliaController +impl query::QueryInterface for KademliaController where P: Clone + Deref + 'static, // TODO: 'static :-/ for<'r> &'r Pc: Peerstore, - R: Clone + 'static, // TODO: 'static :-/ - T: Clone + MuxedTransport + 'static, // TODO: 'static :-/ - C: Clone + ConnectionUpgrade + 'static, // TODO: 'static :-/ - C::NamesIter: Clone, - C::Output: From, + R: Clone + 'static, // TODO: 'static :-/ + T: Clone + MuxedTransport + 'static, // TODO: 'static :-/ + T::Output: From, + K: Transport + Clone + 'static, { #[inline] fn local_id(&self) -> &PeerId { @@ -469,11 +467,9 @@ where } Entry::Vacant(entry) => { // Need to open a connection. - let proto = KademliaUpgrade { - inner: self.inner.clone(), - upgrade: KademliaServerConfig::new(self.inner.clone()), - }; - match self.swarm_controller.dial_to_handler(addr, proto) { + match self.swarm_controller + .dial_to_handler(addr, self.kademlia_transport.clone()) + { Ok(()) => (), Err(_addr) => { let fut = future::err(IoError::new( diff --git a/kad/src/kad_server.rs b/kad/src/kad_server.rs index 305cb42e..5f6487c8 100644 --- a/kad/src/kad_server.rs +++ b/kad/src/kad_server.rs @@ -36,8 +36,8 @@ //! `Arc` in order to be available whenever we need to request something from a node. use bytes::Bytes; -use futures::{future, Future, Sink, Stream}; use futures::sync::{mpsc, oneshot}; +use futures::{future, Future, Sink, Stream}; use libp2p_peerstore::PeerId; use libp2p_swarm::ConnectionUpgrade; use libp2p_swarm::Endpoint; diff --git a/kad/src/protocol.rs b/kad/src/protocol.rs index 73947fb7..c6ea9f30 100644 --- a/kad/src/protocol.rs +++ b/kad/src/protocol.rs @@ -26,8 +26,8 @@ //! used to send messages. use bytes::Bytes; -use futures::{Sink, Stream}; use futures::future; +use futures::{Sink, Stream}; use libp2p_peerstore::PeerId; use libp2p_swarm::{ConnectionUpgrade, Endpoint, Multiaddr}; use protobuf::{self, Message}; @@ -162,9 +162,9 @@ where } /// Custom trait that derives `Sink` and `Stream`, so that we can box it. -pub trait KadStreamSink - : Stream + Sink - { +pub trait KadStreamSink: + Stream + Sink +{ } impl KadStreamSink for T where diff --git a/mplex/src/lib.rs b/mplex/src/lib.rs index 57c611a3..d899437b 100644 --- a/mplex/src/lib.rs +++ b/mplex/src/lib.rs @@ -35,25 +35,25 @@ extern crate rand; extern crate tokio_io; extern crate varint; -mod read; -mod write; -mod shared; mod header; +mod read; +mod shared; +mod write; use bytes::Bytes; use circular_buffer::Array; -use futures::{Async, Future, Poll}; use futures::future::{self, FutureResult}; -use header::MultiplexHeader; -use swarm::muxing::StreamMuxer; -use swarm::{ConnectionUpgrade, Endpoint, Multiaddr}; +use futures::{Async, Future, Poll}; use futures_mutex::Mutex; +use header::MultiplexHeader; use read::{read_stream, MultiplexReadState}; use shared::{buf_from_slice, ByteBuf, MultiplexShared}; -use std::iter; use std::io::{self, Read, Write}; +use std::iter; use std::sync::Arc; use std::sync::atomic::{self, AtomicUsize}; +use swarm::muxing::StreamMuxer; +use swarm::{ConnectionUpgrade, Endpoint, Multiaddr}; use tokio_io::{AsyncRead, AsyncWrite}; use write::write_stream; diff --git a/mplex/src/read.rs b/mplex/src/read.rs index 48e503d1..8e367398 100644 --- a/mplex/src/read.rs +++ b/mplex/src/read.rs @@ -18,14 +18,14 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use {bytes, varint}; +use circular_buffer::Array; use futures::Async; use futures::task; use header::{MultiplexHeader, PacketType}; +use shared::SubstreamMetadata; use std::io; use tokio_io::AsyncRead; -use shared::SubstreamMetadata; -use circular_buffer::Array; +use {bytes, varint}; pub enum NextMultiplexState { NewStream(u32), diff --git a/mplex/src/shared.rs b/mplex/src/shared.rs index 670a3d55..8598c084 100644 --- a/mplex/src/shared.rs +++ b/mplex/src/shared.rs @@ -21,11 +21,11 @@ use read::MultiplexReadState; use write::MultiplexWriteState; -use circular_buffer::{Array, CircularBuffer}; -use std::collections::HashMap; -use bytes::Bytes; use arrayvec::ArrayVec; +use bytes::Bytes; +use circular_buffer::{Array, CircularBuffer}; use futures::task::Task; +use std::collections::HashMap; const BUF_SIZE: usize = 1024; diff --git a/mplex/src/write.rs b/mplex/src/write.rs index de5417a3..5f987a10 100644 --- a/mplex/src/write.rs +++ b/mplex/src/write.rs @@ -18,14 +18,14 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use shared::{ByteBuf, MultiplexShared, SubstreamMetadata}; use header::MultiplexHeader; +use shared::{ByteBuf, MultiplexShared, SubstreamMetadata}; use circular_buffer; -use varint; use futures::task; use std::io; use tokio_io::AsyncWrite; +use varint; #[derive(Copy, Clone, PartialEq, Eq, Debug)] pub enum RequestType { diff --git a/multistream-select/src/dialer_select.rs b/multistream-select/src/dialer_select.rs index b1d55fb4..53bf0e61 100644 --- a/multistream-select/src/dialer_select.rs +++ b/multistream-select/src/dialer_select.rs @@ -23,8 +23,8 @@ use ProtocolChoiceError; use bytes::Bytes; -use futures::{Future, Sink, Stream}; use futures::future::{loop_fn, result, Loop}; +use futures::{Future, Sink, Stream}; use protocol::Dialer; use protocol::DialerToListenerMessage; diff --git a/multistream-select/src/length_delimited.rs b/multistream-select/src/length_delimited.rs index 9ba5fac9..16e653a3 100644 --- a/multistream-select/src/length_delimited.rs +++ b/multistream-select/src/length_delimited.rs @@ -26,10 +26,10 @@ //! We purposely only support a frame length of under 64kiB. Frames most consist in a short //! protocol name, which is highly unlikely to be more than 64kiB long. -use std::io::{Error as IoError, ErrorKind as IoErrorKind}; -use std::marker::PhantomData; use futures::{Async, Poll, Sink, StartSend, Stream}; use smallvec::SmallVec; +use std::io::{Error as IoError, ErrorKind as IoErrorKind}; +use std::marker::PhantomData; use tokio_io::AsyncRead; /// Wraps around a `AsyncRead` and implements `Stream`. @@ -232,10 +232,10 @@ fn decode_length_prefix(buf: &[u8]) -> u16 { #[cfg(test)] mod tests { - use std::io::Cursor; - use std::io::ErrorKind; use futures::{Future, Stream}; use length_delimited::LengthDelimitedFramedRead; + use std::io::Cursor; + use std::io::ErrorKind; #[test] fn basic_read() { diff --git a/multistream-select/src/listener_select.rs b/multistream-select/src/listener_select.rs index 4af73e3d..63dba9e3 100644 --- a/multistream-select/src/listener_select.rs +++ b/multistream-select/src/listener_select.rs @@ -23,8 +23,8 @@ use ProtocolChoiceError; use bytes::Bytes; -use futures::{Future, Sink, Stream}; use futures::future::{err, loop_fn, Loop}; +use futures::{Future, Sink, Stream}; use protocol::DialerToListenerMessage; use protocol::Listener; diff --git a/multistream-select/src/protocol/dialer.rs b/multistream-select/src/protocol/dialer.rs index 163530f8..f78e16de 100644 --- a/multistream-select/src/protocol/dialer.rs +++ b/multistream-select/src/protocol/dialer.rs @@ -28,9 +28,9 @@ use protocol::ListenerToDialerMessage; use protocol::MULTISTREAM_PROTOCOL_WITH_LF; use protocol::MultistreamSelectError; use std::io::{BufRead, Cursor, Read}; -use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::codec::length_delimited::Builder as LengthDelimitedBuilder; use tokio_io::codec::length_delimited::FramedWrite as LengthDelimitedFramedWrite; +use tokio_io::{AsyncRead, AsyncWrite}; use varint; /// Wraps around a `AsyncRead+AsyncWrite`. Assumes that we're on the dialer's side. Produces and @@ -191,12 +191,12 @@ where #[cfg(test)] mod tests { extern crate tokio_core; - use bytes::Bytes; - use futures::{Sink, Stream}; - use futures::Future; - use protocol::{Dialer, DialerToListenerMessage, MultistreamSelectError}; use self::tokio_core::net::{TcpListener, TcpStream}; use self::tokio_core::reactor::Core; + use bytes::Bytes; + use futures::Future; + use futures::{Sink, Stream}; + use protocol::{Dialer, DialerToListenerMessage, MultistreamSelectError}; #[test] fn wrong_proto_name() { diff --git a/multistream-select/src/protocol/listener.rs b/multistream-select/src/protocol/listener.rs index 6d98a2d4..901e376f 100644 --- a/multistream-select/src/protocol/listener.rs +++ b/multistream-select/src/protocol/listener.rs @@ -27,9 +27,9 @@ use protocol::DialerToListenerMessage; use protocol::ListenerToDialerMessage; use protocol::MULTISTREAM_PROTOCOL_WITH_LF; use protocol::MultistreamSelectError; -use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::codec::length_delimited::Builder as LengthDelimitedBuilder; use tokio_io::codec::length_delimited::FramedWrite as LengthDelimitedFramedWrite; +use tokio_io::{AsyncRead, AsyncWrite}; use varint; /// Wraps around a `AsyncRead+AsyncWrite`. Assumes that we're on the listener's side. Produces and @@ -186,12 +186,12 @@ where #[cfg(test)] mod tests { extern crate tokio_core; - use bytes::Bytes; - use futures::{Sink, Stream}; - use futures::Future; - use protocol::{Dialer, Listener, ListenerToDialerMessage, MultistreamSelectError}; use self::tokio_core::net::{TcpListener, TcpStream}; use self::tokio_core::reactor::Core; + use bytes::Bytes; + use futures::Future; + use futures::{Sink, Stream}; + use protocol::{Dialer, Listener, ListenerToDialerMessage, MultistreamSelectError}; #[test] fn wrong_proto_name() { diff --git a/multistream-select/src/tests.rs b/multistream-select/src/tests.rs index bf479cd1..b2af03e4 100644 --- a/multistream-select/src/tests.rs +++ b/multistream-select/src/tests.rs @@ -24,16 +24,16 @@ extern crate tokio_core; -use {dialer_select_proto, listener_select_proto}; -use ProtocolChoiceError; -use bytes::Bytes; -use dialer_select::{dialer_select_proto_parallel, dialer_select_proto_serial}; -use futures::{Sink, Stream}; -use futures::Future; -use protocol::{Dialer, DialerToListenerMessage, Listener, ListenerToDialerMessage}; use self::tokio_core::net::TcpListener; use self::tokio_core::net::TcpStream; use self::tokio_core::reactor::Core; +use ProtocolChoiceError; +use bytes::Bytes; +use dialer_select::{dialer_select_proto_parallel, dialer_select_proto_serial}; +use futures::Future; +use futures::{Sink, Stream}; +use protocol::{Dialer, DialerToListenerMessage, Listener, ListenerToDialerMessage}; +use {dialer_select_proto, listener_select_proto}; #[test] fn negotiate_with_self_succeeds() { diff --git a/peerstore/src/json_peerstore.rs b/peerstore/src/json_peerstore.rs index 7372feed..3bcae3dc 100644 --- a/peerstore/src/json_peerstore.rs +++ b/peerstore/src/json_peerstore.rs @@ -22,6 +22,7 @@ use super::TTL; use PeerId; +use bs58; use datastore::{Datastore, JsonFileDatastore, JsonFileDatastoreEntry, Query}; use futures::{Future, Stream}; use multiaddr::Multiaddr; @@ -31,7 +32,6 @@ use std::io::Error as IoError; use std::iter; use std::path::PathBuf; use std::vec::IntoIter as VecIntoIter; -use bs58; /// Peerstore backend that uses a Json file. pub struct JsonPeerstore { diff --git a/peerstore/src/peer_info.rs b/peerstore/src/peer_info.rs index eacb762e..534e6467 100644 --- a/peerstore/src/peer_info.rs +++ b/peerstore/src/peer_info.rs @@ -28,9 +28,9 @@ use TTL; use multiaddr::Multiaddr; -use serde::{Deserialize, Deserializer, Serialize, Serializer}; use serde::de::Error as DeserializerError; use serde::ser::SerializeStruct; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; use std::cmp::Ordering; use std::time::{Duration, SystemTime, UNIX_EPOCH}; diff --git a/peerstore/src/peerstore.rs b/peerstore/src/peerstore.rs index ab82ac76..d3b6aff0 100644 --- a/peerstore/src/peerstore.rs +++ b/peerstore/src/peerstore.rs @@ -18,8 +18,8 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use {PeerId, TTL}; use multiaddr::Multiaddr; +use {PeerId, TTL}; /// Implemented on objects that store peers. /// diff --git a/ping/src/lib.rs b/ping/src/lib.rs index 4b6daf5b..665c9d32 100644 --- a/ping/src/lib.rs +++ b/ping/src/lib.rs @@ -89,9 +89,9 @@ extern crate rand; extern crate tokio_io; use bytes::{BufMut, Bytes, BytesMut}; -use futures::{Future, Sink, Stream}; use futures::future::{loop_fn, FutureResult, IntoFuture, Loop}; use futures::sync::{mpsc, oneshot}; +use futures::{Future, Sink, Stream}; use libp2p_swarm::{ConnectionUpgrade, Endpoint, Multiaddr}; use log::Level; use parking_lot::Mutex; @@ -102,8 +102,8 @@ use std::error::Error; use std::io::Error as IoError; use std::iter; use std::sync::Arc; -use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::codec::{Decoder, Encoder}; +use tokio_io::{AsyncRead, AsyncWrite}; /// Represents a prototype for an upgrade to handle the ping protocol. /// @@ -306,9 +306,9 @@ mod tests { use self::tokio_core::net::TcpStream; use self::tokio_core::reactor::Core; use super::Ping; - use futures::future::join_all; use futures::Future; use futures::Stream; + use futures::future::join_all; use libp2p_swarm::{ConnectionUpgrade, Endpoint}; #[test] diff --git a/ratelimit/src/lib.rs b/ratelimit/src/lib.rs index ea60c4c0..0786b052 100644 --- a/ratelimit/src/lib.rs +++ b/ratelimit/src/lib.rs @@ -140,8 +140,9 @@ pub struct ListenerUpgrade(RateLimited); impl Future for ListenerUpgrade where T: Transport + 'static, + T::Output: AsyncRead + AsyncWrite, { - type Item = (Connection, Multiaddr); + type Item = (Connection, Multiaddr); type Error = io::Error; fn poll(&mut self) -> Poll { @@ -157,9 +158,10 @@ pub struct Dial(RateLimited); impl IntoFuture for Dial where T: Transport + 'static, + T::Output: AsyncRead + AsyncWrite, { type Future = Box>; - type Item = (Connection, Multiaddr); + type Item = (Connection, Multiaddr); type Error = io::Error; fn into_future(self) -> Self::Future { @@ -176,8 +178,9 @@ where impl Transport for RateLimited where T: Transport + 'static, + T::Output: AsyncRead + AsyncWrite, { - type RawConn = Connection; + type Output = Connection; type Listener = Listener; type ListenerUpgrade = ListenerUpgrade; type Dial = Dial; diff --git a/rw-stream-sink/src/lib.rs b/rw-stream-sink/src/lib.rs index 9fe0779b..fe6e9eeb 100644 --- a/rw-stream-sink/src/lib.rs +++ b/rw-stream-sink/src/lib.rs @@ -34,12 +34,12 @@ extern crate bytes; extern crate futures; extern crate tokio_io; +use bytes::{Buf, IntoBuf}; +use futures::{Async, AsyncSink, Poll, Sink, Stream}; use std::cmp; use std::io::Error as IoError; use std::io::ErrorKind as IoErrorKind; use std::io::{Read, Write}; -use bytes::{Buf, IntoBuf}; -use futures::{Async, AsyncSink, Poll, Sink, Stream}; use tokio_io::{AsyncRead, AsyncWrite}; /// Wraps around a `Stream + Sink` whose items are buffers. Implements `AsyncRead` and `AsyncWrite`. @@ -164,11 +164,11 @@ where #[cfg(test)] mod tests { - use bytes::Bytes; - use futures::{Future, Poll, Sink, StartSend, Stream}; - use futures::sync::mpsc::channel; - use std::io::Read; use RwStreamSink; + use bytes::Bytes; + use futures::sync::mpsc::channel; + use futures::{Future, Poll, Sink, StartSend, Stream}; + use std::io::Read; // This struct merges a stream and a sink and is quite useful for tests. struct Wrapper(St, Si); diff --git a/secio/src/codec/mod.rs b/secio/src/codec/mod.rs index afc876b4..3495a95d 100644 --- a/secio/src/codec/mod.rs +++ b/secio/src/codec/mod.rs @@ -26,8 +26,8 @@ use self::encode::EncoderMiddleware; use crypto::symmetriccipher::SynchronousStreamCipher; use ring::hmac; -use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::codec::length_delimited; +use tokio_io::{AsyncRead, AsyncWrite}; mod decode; mod encode; @@ -59,6 +59,9 @@ where #[cfg(test)] mod tests { extern crate tokio_core; + use self::tokio_core::net::TcpListener; + use self::tokio_core::net::TcpStream; + use self::tokio_core::reactor::Core; use super::DecoderMiddleware; use super::EncoderMiddleware; use super::full_codec; @@ -66,16 +69,13 @@ mod tests { use crypto::aessafe::AesSafe256Encryptor; use crypto::blockmodes::CtrMode; use error::SecioError; - use futures::{Future, Sink, Stream}; use futures::sync::mpsc::channel; + use futures::{Future, Sink, Stream}; use rand; use ring::digest::SHA256; use ring::hmac::SigningKey; use ring::hmac::VerificationKey; use std::io::Error as IoError; - use self::tokio_core::net::TcpListener; - use self::tokio_core::net::TcpStream; - use self::tokio_core::reactor::Core; use tokio_io::codec::length_delimited::Framed; #[test] diff --git a/secio/src/handshake.rs b/secio/src/handshake.rs index d03d9683..f33762cb 100644 --- a/secio/src/handshake.rs +++ b/secio/src/handshake.rs @@ -30,19 +30,19 @@ use futures::stream::Stream; use keys_proto::{KeyType as KeyTypeProtobuf, PublicKey as PublicKeyProtobuf}; use protobuf::Message as ProtobufMessage; use protobuf::core::parse_from_bytes as protobuf_parse_from_bytes; -use ring::{agreement, digest, rand}; use ring::agreement::EphemeralPrivateKey; use ring::hmac::{SigningContext, SigningKey, VerificationKey}; use ring::rand::SecureRandom; -use ring::signature::{RSAKeyPair, RSASigningState, RSA_PKCS1_2048_8192_SHA256, RSA_PKCS1_SHA256}; use ring::signature::verify as signature_verify; +use ring::signature::{RSAKeyPair, RSASigningState, RSA_PKCS1_2048_8192_SHA256, RSA_PKCS1_SHA256}; +use ring::{agreement, digest, rand}; use std::cmp::{self, Ordering}; use std::io::{Error as IoError, ErrorKind as IoErrorKind}; use std::mem; use std::sync::Arc; use structs_proto::{Exchange, Propose}; -use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::codec::length_delimited; +use tokio_io::{AsyncRead, AsyncWrite}; use untrusted::Input as UntrustedInput; /// Performs a handshake on the given socket. @@ -526,6 +526,9 @@ fn stretch_key(key: &SigningKey, result: &mut [u8]) { #[cfg(test)] mod tests { extern crate tokio_core; + use self::tokio_core::net::TcpListener; + use self::tokio_core::net::TcpStream; + use self::tokio_core::reactor::Core; use super::handshake; use super::stretch_key; use futures::Future; @@ -534,9 +537,6 @@ mod tests { use ring::hmac::SigningKey; use ring::signature::RSAKeyPair; use std::sync::Arc; - use self::tokio_core::net::TcpListener; - use self::tokio_core::net::TcpStream; - use self::tokio_core::reactor::Core; use untrusted::Input; #[test] diff --git a/secio/src/lib.rs b/secio/src/lib.rs index 23c88417..349a7791 100644 --- a/secio/src/lib.rs +++ b/secio/src/lib.rs @@ -39,7 +39,7 @@ //! # fn main() { //! use futures::Future; //! use libp2p_secio::{SecioConfig, SecioKeyPair}; -//! use libp2p_swarm::{Multiaddr, Transport}; +//! use libp2p_swarm::{Multiaddr, Transport, upgrade}; //! use libp2p_tcp_transport::TcpConfig; //! use tokio_core::reactor::Core; //! use tokio_io::io::write_all; @@ -52,10 +52,12 @@ //! //let private_key = include_bytes!("test-private-key.pk8"); //! # let public_key = vec![]; //! //let public_key = include_bytes!("test-public-key.der").to_vec(); -//! SecioConfig { -//! // See the documentation of `SecioKeyPair`. +//! let upgrade = SecioConfig { +//! // See the documentation of `SecioKeyPair`. //! key: SecioKeyPair::rsa_from_pkcs8(private_key, public_key).unwrap(), -//! } +//! }; +//! +//! upgrade::map(upgrade, |(socket, _remote_key)| socket) //! }); //! //! let future = transport.dial("/ip4/127.0.0.1/tcp/12345".parse::().unwrap()) @@ -95,8 +97,8 @@ extern crate untrusted; pub use self::error::SecioError; use bytes::{Bytes, BytesMut}; -use futures::{Future, Poll, Sink, StartSend, Stream}; use futures::stream::MapErr as StreamMapErr; +use futures::{Future, Poll, Sink, StartSend, Stream}; use libp2p_swarm::Multiaddr; use ring::signature::RSAKeyPair; use rw_stream_sink::RwStreamSink; @@ -110,8 +112,8 @@ use untrusted::Input; mod algo_support; mod codec; mod error; -mod keys_proto; mod handshake; +mod keys_proto; mod structs_proto; /// Implementation of the `ConnectionUpgrade` trait of `libp2p_swarm`. Automatically applies @@ -179,16 +181,19 @@ enum SecioKeyPairInner { } #[derive(Debug, Clone)] -pub enum SecioPublicKey<'a> { +pub enum SecioPublicKey { /// DER format. - Rsa(&'a [u8]), + Rsa(Vec), } impl libp2p_swarm::ConnectionUpgrade for SecioConfig where S: AsyncRead + AsyncWrite + 'static, { - type Output = RwStreamSink, fn(SecioError) -> IoError>>; + type Output = ( + RwStreamSink, fn(SecioError) -> IoError>>, + SecioPublicKey, + ); type Future = Box>; type NamesIter = iter::Once<(Bytes, ())>; type UpgradeIdentifier = (); @@ -209,9 +214,9 @@ where info!(target: "libp2p-secio", "starting secio upgrade with {:?}", remote_addr); let fut = SecioMiddleware::handshake(incoming, self.key); - let wrapped = fut.map(|stream_sink| { + let wrapped = fut.map(|(stream_sink, pubkey)| { let mapped = stream_sink.map_err(map_err as fn(_) -> _); - RwStreamSink::new(mapped) + (RwStreamSink::new(mapped), pubkey) }).map_err(map_err); Box::new(wrapped) } @@ -229,7 +234,6 @@ fn map_err(err: SecioError) -> IoError { /// individually, so you are encouraged to group data in few frames if possible. pub struct SecioMiddleware { inner: codec::FullCodec, - remote_pubkey_der: Vec, } impl SecioMiddleware @@ -243,25 +247,18 @@ where pub fn handshake<'a>( socket: S, key_pair: SecioKeyPair, - ) -> Box, Error = SecioError> + 'a> + ) -> Box, SecioPublicKey), Error = SecioError> + 'a> where S: 'a, { let SecioKeyPairInner::Rsa { private, public } = key_pair.inner; - let fut = - handshake::handshake(socket, public, private).map(|(inner, pubkey)| SecioMiddleware { - inner: inner, - remote_pubkey_der: pubkey, - }); + let fut = handshake::handshake(socket, public, private).map(|(inner, pubkey)| { + let inner = SecioMiddleware { inner }; + (inner, SecioPublicKey::Rsa(pubkey)) + }); Box::new(fut) } - - /// Returns the public key of the remote in the `DER` format. - #[inline] - pub fn remote_public_key_der(&self) -> SecioPublicKey { - SecioPublicKey::Rsa(&self.remote_pubkey_der) - } } impl Sink for SecioMiddleware diff --git a/swarm/src/connection_reuse.rs b/swarm/src/connection_reuse.rs index b50a7b78..2764544b 100644 --- a/swarm/src/connection_reuse.rs +++ b/swarm/src/connection_reuse.rs @@ -42,14 +42,15 @@ use fnv::FnvHashMap; use futures::future::{self, Either, FutureResult, IntoFuture}; use futures::{Async, Future, Poll, Stream}; -use futures::stream::FuturesUnordered; use futures::stream::Fuse as StreamFuse; +use futures::stream::FuturesUnordered; use futures::sync::mpsc; use multiaddr::Multiaddr; use muxing::StreamMuxer; use parking_lot::Mutex; use std::io::{self, Error as IoError}; use std::sync::Arc; +use tokio_io::{AsyncRead, AsyncWrite}; use transport::{MuxedTransport, Transport, UpgradedNode}; use upgrade::ConnectionUpgrade; @@ -62,7 +63,8 @@ use upgrade::ConnectionUpgrade; pub struct ConnectionReuse where T: Transport, - C: ConnectionUpgrade, + T::Output: AsyncRead + AsyncWrite, + C: ConnectionUpgrade, C::Output: StreamMuxer, { // Underlying transport and connection upgrade for when we need to dial or listen. @@ -94,7 +96,8 @@ where impl From> for ConnectionReuse where T: Transport, - C: ConnectionUpgrade, + T::Output: AsyncRead + AsyncWrite, + C: ConnectionUpgrade, C::Output: StreamMuxer, { #[inline] @@ -115,16 +118,17 @@ where impl Transport for ConnectionReuse where - T: Transport + 'static, // TODO: 'static :( - C: ConnectionUpgrade + 'static, // TODO: 'static :( + T: Transport + 'static, // TODO: 'static :( + T::Output: AsyncRead + AsyncWrite, + C: ConnectionUpgrade + 'static, // TODO: 'static :( C: Clone, C::Output: StreamMuxer + Clone, C::NamesIter: Clone, // TODO: not elegant { - type RawConn = ::Substream; + type Output = ::Substream; type Listener = Box>; - type ListenerUpgrade = FutureResult<(Self::RawConn, Multiaddr), IoError>; - type Dial = Box>; + type ListenerUpgrade = FutureResult<(Self::Output, Multiaddr), IoError>; + type Dial = Box>; fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { let (listener, new_addr) = match self.inner.listen_on(addr.clone()) { @@ -215,8 +219,9 @@ where impl MuxedTransport for ConnectionReuse where - T: Transport + 'static, // TODO: 'static :( - C: ConnectionUpgrade + 'static, // TODO: 'static :( + T: Transport + 'static, // TODO: 'static :( + T::Output: AsyncRead + AsyncWrite, + C: ConnectionUpgrade + 'static, // TODO: 'static :( C: Clone, C::Output: StreamMuxer + Clone, C::NamesIter: Clone, // TODO: not elegant diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 730a8f79..274f3ce4 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -188,11 +188,12 @@ //! let transport = libp2p_tcp_transport::TcpConfig::new(core.handle()) //! .with_dummy_muxing(); //! -//! let (swarm_controller, swarm_future) = libp2p_swarm::swarm(transport, Ping, |(mut pinger, service), client_addr| { -//! pinger.ping().map_err(|_| panic!()) -//! .select(service).map_err(|_| panic!()) -//! .map(|_| ()) -//! }); +//! let (swarm_controller, swarm_future) = libp2p_swarm::swarm(transport.with_upgrade(Ping), +//! |(mut pinger, service), client_addr| { +//! pinger.ping().map_err(|_| panic!()) +//! .select(service).map_err(|_| panic!()) +//! .map(|_| ()) +//! }); //! //! // The `swarm_controller` can then be used to do some operations. //! swarm_controller.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap()); diff --git a/swarm/src/swarm.rs b/swarm/src/swarm.rs index 3a8242d4..07399a77 100644 --- a/swarm/src/swarm.rs +++ b/swarm/src/swarm.rs @@ -18,13 +18,12 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use std::fmt; -use std::io::Error as IoError; -use futures::{future, Async, Future, IntoFuture, Poll, Stream}; use futures::stream::{FuturesUnordered, StreamFuture}; use futures::sync::mpsc; -use transport::UpgradedNode; -use {ConnectionUpgrade, Multiaddr, MuxedTransport}; +use futures::{future, Async, Future, IntoFuture, Poll, Stream}; +use std::fmt; +use std::io::Error as IoError; +use {Multiaddr, MuxedTransport, Transport}; /// Creates a swarm. /// @@ -34,29 +33,24 @@ use {ConnectionUpgrade, Multiaddr, MuxedTransport}; /// Produces a `SwarmController` and an implementation of `Future`. The controller can be used to /// control, and the `Future` must be driven to completion in order for things to work. /// -pub fn swarm( +pub fn swarm( transport: T, - upgrade: C, handler: H, -) -> (SwarmController, SwarmFuture) +) -> (SwarmController, SwarmFuture) where T: MuxedTransport + Clone + 'static, // TODO: 'static :-/ - C: ConnectionUpgrade + Clone + 'static, // TODO: 'static :-/ - C::NamesIter: Clone, // TODO: not elegant - H: FnMut(C::Output, Multiaddr) -> F, + H: FnMut(T::Output, Multiaddr) -> F, F: IntoFuture, { let (new_dialers_tx, new_dialers_rx) = mpsc::unbounded(); let (new_listeners_tx, new_listeners_rx) = mpsc::unbounded(); let (new_toprocess_tx, new_toprocess_rx) = mpsc::unbounded(); - let upgraded = transport.clone().with_upgrade(upgrade); - let future = SwarmFuture { - upgraded: upgraded.clone(), + transport: transport.clone(), handler: handler, new_listeners: new_listeners_rx, - next_incoming: upgraded.clone().next_incoming(), + next_incoming: transport.clone().next_incoming(), listeners: FuturesUnordered::new(), listeners_upgrade: FuturesUnordered::new(), dialers: FuturesUnordered::new(), @@ -67,7 +61,6 @@ where let controller = SwarmController { transport: transport, - upgraded: upgraded, new_listeners: new_listeners_tx, new_dialers: new_dialers_tx, new_toprocess: new_toprocess_tx, @@ -77,46 +70,34 @@ where } /// Allows control of what the swarm is doing. -pub struct SwarmController +pub struct SwarmController where - T: MuxedTransport + 'static, // TODO: 'static :-/ - C: ConnectionUpgrade + 'static, // TODO: 'static :-/ + T: MuxedTransport + 'static, // TODO: 'static :-/ { transport: T, - upgraded: UpgradedNode, - new_listeners: mpsc::UnboundedSender< - Box< - Stream< - Item = Box>, - Error = IoError, - >, - >, - >, - new_dialers: mpsc::UnboundedSender>>, + new_listeners: mpsc::UnboundedSender, + new_dialers: mpsc::UnboundedSender>>, new_toprocess: mpsc::UnboundedSender>>, } -impl fmt::Debug for SwarmController +impl fmt::Debug for SwarmController where T: fmt::Debug + MuxedTransport + 'static, // TODO: 'static :-/ - C: fmt::Debug + ConnectionUpgrade + 'static, // TODO: 'static :-/ { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { fmt.debug_tuple("SwarmController") - .field(&self.upgraded) + .field(&self.transport) .finish() } } -impl Clone for SwarmController +impl Clone for SwarmController where T: MuxedTransport + Clone + 'static, // TODO: 'static :-/ - C: ConnectionUpgrade + 'static + Clone, // TODO: 'static :-/ { - fn clone(&self) -> SwarmController { + fn clone(&self) -> SwarmController { SwarmController { transport: self.transport.clone(), - upgraded: self.upgraded.clone(), new_listeners: self.new_listeners.clone(), new_dialers: self.new_dialers.clone(), new_toprocess: self.new_toprocess.clone(), @@ -124,31 +105,27 @@ where } } -impl SwarmController +impl SwarmController where T: MuxedTransport + Clone + 'static, // TODO: 'static :-/ - C: ConnectionUpgrade + Clone + 'static, // TODO: 'static :-/ - C::NamesIter: Clone, // TODO: not elegant { /// Asks the swarm to dial the node with the given multiaddress. The connection is then /// upgraded using the `upgrade`, and the output is sent to the handler that was passed when /// calling `swarm`. // TODO: consider returning a future so that errors can be processed? - pub fn dial_to_handler(&self, multiaddr: Multiaddr, upgrade: Du) -> Result<(), Multiaddr> + pub fn dial_to_handler(&self, multiaddr: Multiaddr, transport: Du) -> Result<(), Multiaddr> where - Du: ConnectionUpgrade + Clone + 'static, // TODO: 'static :-/ - Du::Output: Into, + Du: Transport + 'static, // TODO: 'static :-/ + Du::Output: Into, { trace!(target: "libp2p-swarm", "Swarm dialing {}", multiaddr); - match self.transport - .clone() - .with_upgrade(upgrade) - .dial(multiaddr.clone()) - { + match transport.dial(multiaddr.clone()) { Ok(dial) => { - let dial = Box::new(dial.map(|(d, client_addr)| (d.into(), client_addr))) - as Box>; + let dial = Box::new( + dial.into_future() + .map(|(d, client_addr)| (d.into(), client_addr)), + ) as Box>; // Ignoring errors if the receiver has been closed, because in that situation // nothing is going to be processed anyway. let _ = self.new_dialers.unbounded_send(dial); @@ -167,19 +144,19 @@ where pub fn dial_custom_handler( &self, multiaddr: Multiaddr, - upgrade: Du, + transport: Du, and_then: Df, ) -> Result<(), Multiaddr> where - Du: ConnectionUpgrade + 'static, // TODO: 'static :-/ - Df: FnOnce(Du::Output, Multiaddr) -> Dfu + 'static, // TODO: 'static :-/ + Du: Transport + 'static, // TODO: 'static :-/ + Df: FnOnce(Du::Output, Multiaddr) -> Dfu + 'static, // TODO: 'static :-/ Dfu: IntoFuture + 'static, // TODO: 'static :-/ { trace!(target: "libp2p-swarm", "Swarm dialing {} with custom handler", multiaddr); - match self.transport.clone().with_upgrade(upgrade).dial(multiaddr) { + match transport.dial(multiaddr) { Ok(dial) => { - let dial = Box::new(dial.and_then(|(d, m)| and_then(d, m))) as Box<_>; + let dial = Box::new(dial.into_future().and_then(|(d, m)| and_then(d, m))) as Box<_>; // Ignoring errors if the receiver has been closed, because in that situation // nothing is going to be processed anyway. let _ = self.new_toprocess.unbounded_send(dial); @@ -192,7 +169,7 @@ where /// Adds a multiaddr to listen on. All the incoming connections will use the `upgrade` that /// was passed to `swarm`. pub fn listen_on(&self, multiaddr: Multiaddr) -> Result { - match self.upgraded.clone().listen_on(multiaddr) { + match self.transport.clone().listen_on(multiaddr) { Ok((listener, new_addr)) => { trace!(target: "libp2p-swarm", "Swarm listening on {}", new_addr); // Ignoring errors if the receiver has been closed, because in that situation @@ -206,49 +183,37 @@ where } /// Future that must be driven to completion in order for the swarm to work. -pub struct SwarmFuture +pub struct SwarmFuture where - T: MuxedTransport + 'static, // TODO: 'static :-/ - C: ConnectionUpgrade + 'static, // TODO: 'static :-/ + T: MuxedTransport + 'static, // TODO: 'static :-/ { - upgraded: UpgradedNode, + transport: T, handler: H, - new_listeners: mpsc::UnboundedReceiver< - Box< - Stream< - Item = Box>, - Error = IoError, - >, - >, - >, - next_incoming: Box< - Future>, Error = IoError>, - >, + new_listeners: mpsc::UnboundedReceiver, + next_incoming: T::Incoming, listeners: FuturesUnordered< StreamFuture< Box< Stream< - Item = Box>, + Item = Box>, Error = IoError, >, >, >, >, listeners_upgrade: - FuturesUnordered>>, - dialers: FuturesUnordered>>, + FuturesUnordered>>, + dialers: FuturesUnordered>>, new_dialers: - mpsc::UnboundedReceiver>>, + mpsc::UnboundedReceiver>>, to_process: FuturesUnordered>>>, new_toprocess: mpsc::UnboundedReceiver>>, } -impl Future for SwarmFuture +impl Future for SwarmFuture where T: MuxedTransport + Clone + 'static, // TODO: 'static :-/, - C: ConnectionUpgrade + Clone + 'static, // TODO: 'static :-/ - C::NamesIter: Clone, // TODO: not elegant - H: FnMut(C::Output, Multiaddr) -> If, + H: FnMut(T::Output, Multiaddr) -> If, If: IntoFuture, F: Future, { @@ -262,19 +227,22 @@ where Ok(Async::Ready(connec)) => { debug!(target: "libp2p-swarm", "Swarm received new multiplexed \ incoming connection"); - self.next_incoming = self.upgraded.clone().next_incoming(); - self.listeners_upgrade.push(connec); + self.next_incoming = self.transport.clone().next_incoming(); + self.listeners_upgrade.push(Box::new(connec) as Box<_>); } Ok(Async::NotReady) => {} Err(err) => { debug!(target: "libp2p-swarm", "Error in multiplexed incoming \ connection: {:?}", err); - self.next_incoming = self.upgraded.clone().next_incoming(); + self.next_incoming = self.transport.clone().next_incoming(); } }; match self.new_listeners.poll() { Ok(Async::Ready(Some(new_listener))) => { + let new_listener = Box::new( + new_listener.map(|f| Box::new(f) as Box>), + ) as Box>; self.listeners.push(new_listener.into_future()); } Ok(Async::Ready(None)) | Err(_) => { diff --git a/swarm/src/transport/and_then.rs b/swarm/src/transport/and_then.rs new file mode 100644 index 00000000..b87bc012 --- /dev/null +++ b/swarm/src/transport/and_then.rs @@ -0,0 +1,141 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use futures::prelude::*; +use multiaddr::Multiaddr; +use std::io::Error as IoError; +use transport::{MuxedTransport, Transport}; +use upgrade::Endpoint; + +/// See the `Transport::and_then` method. +#[inline] +pub fn and_then(transport: T, upgrade: C) -> AndThen { + AndThen { transport, upgrade } +} + +/// See the `Transport::and_then` method. +#[derive(Debug, Clone)] +pub struct AndThen { + transport: T, + upgrade: C, +} + +impl Transport for AndThen +where + T: Transport + 'static, + C: FnOnce(T::Output, Endpoint, Multiaddr) -> F + Clone + 'static, + F: Future + 'static, +{ + type Output = O; + type Listener = Box>; + type ListenerUpgrade = Box>; + type Dial = Box>; + + #[inline] + fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { + let upgrade = self.upgrade; + + let (listening_stream, new_addr) = match self.transport.listen_on(addr) { + Ok((l, new_addr)) => (l, new_addr), + Err((trans, addr)) => { + let builder = AndThen { + transport: trans, + upgrade: upgrade, + }; + + return Err((builder, addr)); + } + }; + + // Try to negotiate the protocol. + // Note that failing to negotiate a protocol will never produce a future with an error. + // Instead the `stream` will produce `Ok(Err(...))`. + // `stream` can only produce an `Err` if `listening_stream` produces an `Err`. + let stream = listening_stream.map(move |connection| { + let upgrade = upgrade.clone(); + let future = connection.and_then(move |(stream, client_addr)| { + upgrade(stream, Endpoint::Listener, client_addr.clone()).map(|o| (o, client_addr)) + }); + + Box::new(future) as Box<_> + }); + + Ok((Box::new(stream), new_addr)) + } + + #[inline] + fn dial(self, addr: Multiaddr) -> Result { + let upgrade = self.upgrade; + + let dialed_fut = match self.transport.dial(addr.clone()) { + Ok(f) => f.into_future(), + Err((trans, addr)) => { + let builder = AndThen { + transport: trans, + upgrade: upgrade, + }; + + return Err((builder, addr)); + } + }; + + let future = dialed_fut + // Try to negotiate the protocol. + .and_then(move |(connection, client_addr)| { + upgrade(connection, Endpoint::Dialer, client_addr.clone()) + .map(|o| (o, client_addr)) + }); + + Ok(Box::new(future)) + } + + #[inline] + fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { + self.transport.nat_traversal(server, observed) + } +} + +impl MuxedTransport for AndThen +where + T: MuxedTransport + 'static, + C: FnOnce(T::Output, Endpoint, Multiaddr) -> F + Clone + 'static, + F: Future + 'static, +{ + type Incoming = Box>; + type IncomingUpgrade = Box>; + + #[inline] + fn next_incoming(self) -> Self::Incoming { + let upgrade = self.upgrade; + + let future = self.transport.next_incoming().map(|future| { + // Try to negotiate the protocol. + let future = future.and_then(move |(connection, client_addr)| { + let upgrade = upgrade.clone(); + upgrade(connection, Endpoint::Listener, client_addr.clone()) + .map(|o| (o, client_addr)) + }); + + Box::new(future) as Box> + }); + + Box::new(future) as Box<_> + } +} diff --git a/swarm/src/transport/choice.rs b/swarm/src/transport/choice.rs index dd37a121..dd981524 100644 --- a/swarm/src/transport/choice.rs +++ b/swarm/src/transport/choice.rs @@ -39,7 +39,7 @@ where A: Transport, B: Transport, { - type RawConn = EitherSocket; + type Output = EitherSocket; type Listener = EitherListenStream; type ListenerUpgrade = EitherListenUpgrade; type Dial = @@ -88,12 +88,12 @@ where B::Incoming: 'static, // TODO: meh :-/ A::IncomingUpgrade: 'static, // TODO: meh :-/ B::IncomingUpgrade: 'static, // TODO: meh :-/ - A::RawConn: 'static, // TODO: meh :-/ - B::RawConn: 'static, // TODO: meh :-/ + A::Output: 'static, // TODO: meh :-/ + B::Output: 'static, // TODO: meh :-/ { type Incoming = Box>; type IncomingUpgrade = - Box, Multiaddr), Error = IoError>>; + Box, Multiaddr), Error = IoError>>; #[inline] fn next_incoming(self) -> Self::Incoming { diff --git a/swarm/src/transport/denied.rs b/swarm/src/transport/denied.rs index b59df8bd..8a5e6325 100644 --- a/swarm/src/transport/denied.rs +++ b/swarm/src/transport/denied.rs @@ -22,8 +22,8 @@ use futures::future; use futures::prelude::*; use multiaddr::Multiaddr; use std::io::{self, Cursor}; -use transport::Transport; use transport::MuxedTransport; +use transport::Transport; /// Dummy implementation of `Transport` that just denies every single attempt. #[derive(Debug, Copy, Clone)] @@ -31,10 +31,10 @@ pub struct DeniedTransport; impl Transport for DeniedTransport { // TODO: could use `!` for associated types once stable - type RawConn = Cursor>; + type Output = Cursor>; type Listener = Box>; - type ListenerUpgrade = Box>; - type Dial = Box>; + type ListenerUpgrade = Box>; + type Dial = Box>; #[inline] fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { @@ -54,7 +54,7 @@ impl Transport for DeniedTransport { impl MuxedTransport for DeniedTransport { type Incoming = future::Empty; - type IncomingUpgrade = future::Empty<(Self::RawConn, Multiaddr), io::Error>; + type IncomingUpgrade = future::Empty<(Self::Output, Multiaddr), io::Error>; #[inline] fn next_incoming(self) -> Self::Incoming { diff --git a/swarm/src/transport/dummy.rs b/swarm/src/transport/dummy.rs index 8a3940fa..d0876bbd 100644 --- a/swarm/src/transport/dummy.rs +++ b/swarm/src/transport/dummy.rs @@ -40,7 +40,7 @@ where T: Transport, { type Incoming = future::Empty; - type IncomingUpgrade = future::Empty<(T::RawConn, Multiaddr), IoError>; + type IncomingUpgrade = future::Empty<(T::Output, Multiaddr), IoError>; fn next_incoming(self) -> Self::Incoming where @@ -54,7 +54,7 @@ impl Transport for DummyMuxing where T: Transport, { - type RawConn = T::RawConn; + type Output = T::Output; type Listener = T::Listener; type ListenerUpgrade = T::ListenerUpgrade; type Dial = T::Dial; diff --git a/swarm/src/transport/map.rs b/swarm/src/transport/map.rs new file mode 100644 index 00000000..94070f2f --- /dev/null +++ b/swarm/src/transport/map.rs @@ -0,0 +1,108 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use futures::prelude::*; +use multiaddr::Multiaddr; +use std::io::Error as IoError; +use transport::{MuxedTransport, Transport}; +use Endpoint; + +/// See `Transport::map`. +#[derive(Debug, Copy, Clone)] +pub struct Map { + transport: T, + map: F, +} + +impl Map { + /// Internal function that builds a `Map`. + #[inline] + pub(crate) fn new(transport: T, map: F) -> Map { + Map { transport, map } + } +} + +impl Transport for Map +where + T: Transport + 'static, // TODO: 'static :-/ + F: FnOnce(T::Output, Endpoint, Multiaddr) -> D + Clone + 'static, // TODO: 'static :-/ +{ + type Output = D; + type Listener = Box>; + type ListenerUpgrade = Box>; + type Dial = Box>; + + fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { + let map = self.map; + + match self.transport.listen_on(addr) { + Ok((stream, listen_addr)) => { + let stream = stream.map(move |future| { + let map = map.clone(); + let future = future + .into_future() + .map(move |(output, addr)| (map(output, Endpoint::Listener, addr.clone()), addr)); + Box::new(future) as Box<_> + }); + Ok((Box::new(stream), listen_addr)) + } + Err((transport, addr)) => Err((Map { transport, map }, addr)), + } + } + + fn dial(self, addr: Multiaddr) -> Result { + let map = self.map; + + match self.transport.dial(addr) { + Ok(future) => { + let future = future + .into_future() + .map(move |(output, addr)| (map(output, Endpoint::Dialer, addr.clone()), addr)); + Ok(Box::new(future)) + } + Err((transport, addr)) => Err((Map { transport, map }, addr)), + } + } + + #[inline] + fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { + self.transport.nat_traversal(server, observed) + } +} + +impl MuxedTransport for Map +where + T: MuxedTransport + 'static, // TODO: 'static :-/ + F: FnOnce(T::Output, Endpoint, Multiaddr) -> D + Clone + 'static, // TODO: 'static :-/ +{ + type Incoming = Box>; + type IncomingUpgrade = Box>; + + fn next_incoming(self) -> Self::Incoming { + let map = self.map; + let future = self.transport.next_incoming().map(move |upgrade| { + let future = upgrade.map(move |(output, addr)| { + (map(output, Endpoint::Listener, addr.clone()), addr) + }); + Box::new(future) as Box<_> + }); + Box::new(future) + } +} diff --git a/swarm/src/transport/mod.rs b/swarm/src/transport/mod.rs index 865ea389..6ed9c2ec 100644 --- a/swarm/src/transport/mod.rs +++ b/swarm/src/transport/mod.rs @@ -33,11 +33,13 @@ use futures::prelude::*; use multiaddr::Multiaddr; use std::io::Error as IoError; use tokio_io::{AsyncRead, AsyncWrite}; -use upgrade::ConnectionUpgrade; +use upgrade::{ConnectionUpgrade, Endpoint}; +pub mod and_then; pub mod choice; pub mod denied; pub mod dummy; +pub mod map; pub mod muxed; pub mod upgrade; @@ -59,7 +61,7 @@ pub use self::upgrade::UpgradedNode; /// > on `Foo`. pub trait Transport { /// The raw connection to a peer. - type RawConn: AsyncRead + AsyncWrite; + type Output; /// The listener produces incoming connections. /// @@ -71,10 +73,10 @@ pub trait Transport { /// After a connection has been received, we may need to do some asynchronous pre-processing /// on it (eg. an intermediary protocol negotiation). While this pre-processing takes place, we /// want to be able to continue polling on the listener. - type ListenerUpgrade: Future; + type ListenerUpgrade: Future; /// A future which indicates that we are currently dialing to a peer. - type Dial: IntoFuture; + type Dial: IntoFuture; /// Listen on the given multiaddr. Returns a stream of incoming connections, plus a modified /// version of the `Multiaddr`. This new `Multiaddr` is the one that that should be advertised @@ -108,6 +110,16 @@ pub trait Transport { /// doesn't recognize the protocols, or if `server` and `observed` are related. fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option; + /// Applies a function on the output of the `Transport`. + #[inline] + fn map(self, map: F) -> map::Map + where + Self: Sized, + F: FnOnce(Self::Output, Endpoint, Multiaddr) -> O + Clone + 'static, // TODO: 'static :-/ + { + map::Map::new(self, map) + } + /// Builds a new struct that implements `Transport` that contains both `self` and `other`. /// /// The returned object will redirect its calls to `self`, except that if `listen_on` or `dial` @@ -129,11 +141,27 @@ pub trait Transport { fn with_upgrade(self, upgrade: U) -> UpgradedNode where Self: Sized, - U: ConnectionUpgrade, + Self::Output: AsyncRead + AsyncWrite, + U: ConnectionUpgrade, { UpgradedNode::new(self, upgrade) } + /// Wraps this transport inside an upgrade. Whenever a connection that uses this transport + /// is established, it is wrapped inside the upgrade. + /// + /// > **Note**: The concept of an *upgrade* for example includes middlewares such *secio* + /// > (communication encryption), *multiplex*, but also a protocol handler. + #[inline] + fn and_then(self, upgrade: C) -> and_then::AndThen + where + Self: Sized, + C: FnOnce(Self::Output, Endpoint, Multiaddr) -> F + Clone + 'static, + F: Future + 'static, + { + and_then::and_then(self, upgrade) + } + /// Builds a dummy implementation of `MuxedTransport` that uses this transport. /// /// The resulting object will not actually use muxing. This means that dialing the same node diff --git a/swarm/src/transport/muxed.rs b/swarm/src/transport/muxed.rs index 42798d92..83e11340 100644 --- a/swarm/src/transport/muxed.rs +++ b/swarm/src/transport/muxed.rs @@ -30,7 +30,7 @@ pub trait MuxedTransport: Transport { /// Future resolving to a future that will resolve to an incoming connection. type Incoming: Future; /// Future resolving to an incoming connection. - type IncomingUpgrade: Future; + type IncomingUpgrade: Future; /// Returns the next incoming substream opened by a node that we dialed ourselves. /// diff --git a/swarm/src/transport/upgrade.rs b/swarm/src/transport/upgrade.rs index 63aade23..6c8ba0c3 100644 --- a/swarm/src/transport/upgrade.rs +++ b/swarm/src/transport/upgrade.rs @@ -18,16 +18,14 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use bytes::Bytes; use connection_reuse::ConnectionReuse; use futures::prelude::*; use multiaddr::Multiaddr; -use multistream_select; use muxing::StreamMuxer; -use std::io::{Error as IoError, ErrorKind as IoErrorKind}; +use std::io::Error as IoError; use tokio_io::{AsyncRead, AsyncWrite}; use transport::{MuxedTransport, Transport}; -use upgrade::{ConnectionUpgrade, Endpoint}; +use upgrade::{apply, ConnectionUpgrade, Endpoint}; /// Implements the `Transport` trait. Dials or listens, then upgrades any dialed or received /// connection. @@ -51,7 +49,8 @@ impl UpgradedNode { impl<'a, T, C> UpgradedNode where T: Transport + 'a, - C: ConnectionUpgrade + 'a, + T::Output: AsyncRead + AsyncWrite, + C: ConnectionUpgrade + 'a, { /// Turns this upgraded node into a `ConnectionReuse`. If the `Output` implements the /// `StreamMuxer` trait, the returned object will implement `Transport` and `MuxedTransport`. @@ -79,6 +78,8 @@ where self, addr: Multiaddr, ) -> Result + 'a>, (Self, Multiaddr)> + where + C::NamesIter: Clone, // TODO: not elegant { let upgrade = self.upgrade; @@ -97,39 +98,7 @@ where let future = dialed_fut // Try to negotiate the protocol. .and_then(move |(connection, client_addr)| { - let iter = upgrade.protocol_names() - .map(|(name, id)| (name, ::eq, id)); - debug!(target: "libp2p-swarm", "Starting protocol negotiation (dialer)"); - let negotiated = multistream_select::dialer_select_proto(connection, iter) - .map_err(|err| IoError::new(IoErrorKind::Other, err)); - negotiated.map(|(upgrade_id, conn)| (upgrade_id, conn, upgrade, client_addr)) - }) - .then(|negotiated| { - match negotiated { - Ok((_, _, _, ref client_addr)) => { - debug!(target: "libp2p-swarm", "Successfully negotiated protocol \ - upgrade with {}", client_addr) - }, - Err(ref err) => { - debug!(target: "libp2p-swarm", "Error while negotiated protocol \ - upgrade: {:?}", err) - }, - }; - negotiated - }) - .and_then(move |(upgrade_id, connection, upgrade, client_addr)| { - let f = upgrade.upgrade(connection, upgrade_id, Endpoint::Dialer, &client_addr); - debug!(target: "libp2p-swarm", "Trying to apply negotiated protocol with {}", - client_addr); - f.map(|v| (v, client_addr)) - }) - .then(|val| { - match val { - Ok(_) => debug!(target: "libp2p-swarm", "Successfully applied negotiated \ - protocol"), - Err(_) => debug!(target: "libp2p-swarm", "Failed to apply negotiated protocol"), - } - val + apply(connection, upgrade, Endpoint::Dialer, client_addr) }); Ok(Box::new(future)) @@ -144,9 +113,9 @@ where self, ) -> Box< Future< - Item = Box + 'a>, - Error = IoError, - > + Item = Box + 'a>, + Error = IoError, + > + 'a, > where @@ -158,44 +127,9 @@ where let future = self.transports.next_incoming().map(|future| { // Try to negotiate the protocol. - let future = future - .and_then(move |(connection, addr)| { - let iter = upgrade - .protocol_names() - .map::<_, fn(_) -> _>(|(name, id)| (name, ::eq, id)); - debug!(target: "libp2p-swarm", "Starting protocol negotiation (incoming)"); - let negotiated = multistream_select::listener_select_proto(connection, iter) - .map_err(|err| IoError::new(IoErrorKind::Other, err)); - negotiated.map(move |(upgrade_id, conn)| (upgrade_id, conn, upgrade, addr)) - }) - .then(|negotiated| { - match negotiated { - Ok((_, _, _, ref client_addr)) => { - debug!(target: "libp2p-swarm", "Successfully negotiated protocol \ - upgrade with {}", client_addr) - } - Err(ref err) => { - debug!(target: "libp2p-swarm", "Error while negotiated protocol \ - upgrade: {:?}", err) - } - }; - negotiated - }) - .and_then(move |(upgrade_id, connection, upgrade, addr)| { - let upg = upgrade.upgrade(connection, upgrade_id, Endpoint::Listener, &addr); - debug!(target: "libp2p-swarm", "Trying to apply negotiated protocol with {}", - addr); - upg.map(|u| (u, addr)) - }) - .then(|val| { - match val { - Ok(_) => debug!(target: "libp2p-swarm", "Successfully applied negotiated \ - protocol"), - Err(_) => debug!(target: "libp2p-swarm", "Failed to apply negotiated \ - protocol"), - } - val - }); + let future = future.and_then(move |(connection, client_addr)| { + apply(connection, upgrade, Endpoint::Listener, client_addr) + }); Box::new(future) as Box> }); @@ -216,9 +150,9 @@ where ( Box< Stream< - Item = Box + 'a>, - Error = IoError, - > + Item = Box + 'a>, + Error = IoError, + > + 'a, >, Multiaddr, @@ -250,38 +184,10 @@ where let stream = listening_stream.map(move |connection| { let upgrade = upgrade.clone(); let connection = connection - // Try to negotiate the protocol - .and_then(move |(connection, remote_addr)| { - let iter = upgrade.protocol_names() - .map::<_, fn(_) -> _>(|(n, t)| (n, ::eq, t)); - let remote_addr2 = remote_addr.clone(); - debug!(target: "libp2p-swarm", "Starting protocol negotiation (listener)"); - multistream_select::listener_select_proto(connection, iter) - .map_err(|err| IoError::new(IoErrorKind::Other, err)) - .then(move |negotiated| { - match negotiated { - Ok(_) => { - debug!(target: "libp2p-swarm", "Successfully negotiated \ - protocol upgrade with {}", remote_addr2) - }, - Err(ref err) => { - debug!(target: "libp2p-swarm", "Error while negotiated \ - protocol upgrade: {:?}", err) - }, - }; - negotiated - }) - .and_then(move |(upgrade_id, connection)| { - let fut = upgrade.upgrade( - connection, - upgrade_id, - Endpoint::Listener, - &remote_addr, - ); - fut.map(move |c| (c, remote_addr)) - }) - .into_future() - }); + // Try to negotiate the protocol. + .and_then(move |(connection, client_addr)| { + apply(connection, upgrade, Endpoint::Listener, client_addr) + }); Box::new(connection) as Box<_> }); @@ -293,12 +199,12 @@ where impl Transport for UpgradedNode where T: Transport + 'static, - C: ConnectionUpgrade + 'static, - C::Output: AsyncRead + AsyncWrite, + T::Output: AsyncRead + AsyncWrite, + C: ConnectionUpgrade + 'static, C::NamesIter: Clone, // TODO: not elegant C: Clone, { - type RawConn = C::Output; + type Output = C::Output; type Listener = Box>; type ListenerUpgrade = Box>; type Dial = Box>; @@ -322,8 +228,8 @@ where impl MuxedTransport for UpgradedNode where T: MuxedTransport + 'static, - C: ConnectionUpgrade + 'static, - C::Output: AsyncRead + AsyncWrite, + T::Output: AsyncRead + AsyncWrite, + C: ConnectionUpgrade + 'static, C::NamesIter: Clone, // TODO: not elegant C: Clone, { diff --git a/swarm/src/upgrade/apply.rs b/swarm/src/upgrade/apply.rs new file mode 100644 index 00000000..5a6b5cf2 --- /dev/null +++ b/swarm/src/upgrade/apply.rs @@ -0,0 +1,81 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use bytes::Bytes; +use futures::prelude::*; +use multiaddr::Multiaddr; +use multistream_select; +use std::io::{Error as IoError, ErrorKind as IoErrorKind}; +use tokio_io::{AsyncRead, AsyncWrite}; +use upgrade::{ConnectionUpgrade, Endpoint}; + +/// Applies a connection upgrade on a socket. +/// +/// Returns a `Future` that returns the outcome of the connection upgrade. +#[inline] +pub fn apply<'a, C, U>( + connection: C, + upgrade: U, + endpoint: Endpoint, + remote_addr: Multiaddr, +) -> Box + 'a> +where + U: ConnectionUpgrade + 'a, + U::NamesIter: Clone, // TODO: not elegant + C: AsyncRead + AsyncWrite + 'a, +{ + let iter = upgrade + .protocol_names() + .map::<_, fn(_) -> _>(|(n, t)| (n, ::eq, t)); + let remote_addr2 = remote_addr.clone(); + debug!(target: "libp2p-swarm", "Starting protocol negotiation"); + + let negotiation = match endpoint { + Endpoint::Listener => multistream_select::listener_select_proto(connection, iter), + Endpoint::Dialer => multistream_select::dialer_select_proto(connection, iter), + }; + + let future = negotiation + .map_err(|err| IoError::new(IoErrorKind::Other, err)) + .then(move |negotiated| { + match negotiated { + Ok(_) => debug!(target: "libp2p-swarm", "Successfully negotiated \ + protocol upgrade with {}", remote_addr2), + Err(ref err) => debug!(target: "libp2p-swarm", "Error while negotiated \ + protocol upgrade: {:?}", err), + }; + negotiated + }) + .and_then(move |(upgrade_id, connection)| { + let fut = upgrade.upgrade(connection, upgrade_id, endpoint, &remote_addr); + fut.map(move |c| (c, remote_addr)) + }) + .into_future() + .then(|val| { + match val { + Ok(_) => debug!(target: "libp2p-swarm", "Successfully applied negotiated \ + protocol"), + Err(_) => debug!(target: "libp2p-swarm", "Failed to apply negotiated protocol"), + } + val + }); + + Box::new(future) +} diff --git a/swarm/src/upgrade/choice.rs b/swarm/src/upgrade/choice.rs index 1fd5aaaf..a2a70ce5 100644 --- a/swarm/src/upgrade/choice.rs +++ b/swarm/src/upgrade/choice.rs @@ -26,16 +26,19 @@ use std::io::Error as IoError; use tokio_io::{AsyncRead, AsyncWrite}; use upgrade::{ConnectionUpgrade, Endpoint}; -/// See `transport::Transport::or_upgrade()`. +/// Builds a new `ConnectionUpgrade` that chooses between `A` and `B`. +/// +/// If both `A` and `B` are supported by the remote, then `A` will be chosen. +// TODO: write a test for this ^ +#[inline] +pub fn or(me: A, other: B) -> OrUpgrade { + OrUpgrade(me, other) +} + +/// See `upgrade::or`. #[derive(Debug, Copy, Clone)] pub struct OrUpgrade(A, B); -impl OrUpgrade { - pub fn new(a: A, b: B) -> OrUpgrade { - OrUpgrade(a, b) - } -} - impl ConnectionUpgrade for OrUpgrade where C: AsyncRead + AsyncWrite, diff --git a/swarm/src/upgrade/denied.rs b/swarm/src/upgrade/denied.rs index b5938965..577d2ba9 100644 --- a/swarm/src/upgrade/denied.rs +++ b/swarm/src/upgrade/denied.rs @@ -19,11 +19,11 @@ // DEALINGS IN THE SOFTWARE. use bytes::Bytes; -use upgrade::{ConnectionUpgrade, Endpoint}; use futures::prelude::*; use multiaddr::Multiaddr; use std::{io, iter}; use tokio_io::{AsyncRead, AsyncWrite}; +use upgrade::{ConnectionUpgrade, Endpoint}; /// Implementation of `ConnectionUpgrade` that always fails to negotiate. #[derive(Debug, Copy, Clone)] diff --git a/swarm/src/upgrade/map.rs b/swarm/src/upgrade/map.rs new file mode 100644 index 00000000..a0cda2e2 --- /dev/null +++ b/swarm/src/upgrade/map.rs @@ -0,0 +1,66 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use futures::{future, prelude::*}; +use multiaddr::Multiaddr; +use tokio_io::{AsyncRead, AsyncWrite}; +use upgrade::{ConnectionUpgrade, Endpoint}; + +/// Applies a closure on the output of a connection upgrade. +#[inline] +pub fn map(upgrade: U, map: F) -> Map { + Map { upgrade, map } +} + +/// Application of a closure on the output of a connection upgrade. +#[derive(Debug, Copy, Clone)] +pub struct Map { + upgrade: U, + map: F, +} + +impl ConnectionUpgrade for Map +where + U: ConnectionUpgrade, + C: AsyncRead + AsyncWrite, + F: FnOnce(U::Output) -> O, +{ + type NamesIter = U::NamesIter; + type UpgradeIdentifier = U::UpgradeIdentifier; + + fn protocol_names(&self) -> Self::NamesIter { + self.upgrade.protocol_names() + } + + type Output = O; + type Future = future::Map; + + fn upgrade( + self, + socket: C, + id: Self::UpgradeIdentifier, + ty: Endpoint, + remote_addr: &Multiaddr, + ) -> Self::Future { + self.upgrade + .upgrade(socket, id, ty, remote_addr) + .map(self.map) + } +} diff --git a/swarm/src/upgrade/mod.rs b/swarm/src/upgrade/mod.rs index 2db5df18..b2b973a8 100644 --- a/swarm/src/upgrade/mod.rs +++ b/swarm/src/upgrade/mod.rs @@ -18,14 +18,18 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -pub mod denied; -pub mod traits; +pub mod apply; pub mod choice; +pub mod denied; +pub mod map; pub mod plaintext; pub mod simple; +pub mod traits; -pub use self::choice::OrUpgrade; +pub use self::apply::apply; +pub use self::choice::{or, OrUpgrade}; pub use self::denied::DeniedConnectionUpgrade; +pub use self::map::map; pub use self::plaintext::PlainTextConfig; pub use self::simple::SimpleProtocol; -pub use self::traits::{ConnectionUpgrade, Endpoint, UpgradeExt}; +pub use self::traits::{ConnectionUpgrade, Endpoint}; diff --git a/swarm/src/upgrade/traits.rs b/swarm/src/upgrade/traits.rs index 465d1964..273236ce 100644 --- a/swarm/src/upgrade/traits.rs +++ b/swarm/src/upgrade/traits.rs @@ -23,7 +23,6 @@ use futures::future::Future; use multiaddr::Multiaddr; use std::io::Error as IoError; use tokio_io::{AsyncRead, AsyncWrite}; -use upgrade::choice::OrUpgrade; /// Type of connection for the upgrade. #[derive(Debug, Copy, Clone, PartialEq, Eq)] @@ -74,19 +73,3 @@ pub trait ConnectionUpgrade { remote_addr: &Multiaddr, ) -> Self::Future; } - -/// Extension trait for `ConnectionUpgrade`. Automatically implemented on everything. -pub trait UpgradeExt { - /// Builds a struct that will choose an upgrade between `self` and `other`, depending on what - /// the remote supports. - fn or_upgrade(self, other: T) -> OrUpgrade - where - Self: Sized; -} - -impl UpgradeExt for T { - #[inline] - fn or_upgrade(self, other: U) -> OrUpgrade { - OrUpgrade::new(self, other) - } -} diff --git a/swarm/tests/multiplex.rs b/swarm/tests/multiplex.rs index 4921e2a4..0ffed4ec 100644 --- a/swarm/tests/multiplex.rs +++ b/swarm/tests/multiplex.rs @@ -31,10 +31,10 @@ use futures::future::Future; use futures::{Sink, Stream}; use libp2p_swarm::{Multiaddr, MuxedTransport, StreamMuxer, Transport}; use libp2p_tcp_transport::TcpConfig; -use tokio_core::reactor::Core; -use tokio_io::codec::length_delimited::Framed; use std::sync::{atomic, mpsc}; use std::thread; +use tokio_core::reactor::Core; +use tokio_io::codec::length_delimited::Framed; // Ensures that a transport is only ever used once for dialing. #[derive(Debug)] @@ -53,7 +53,7 @@ impl Clone for OnlyOnce { } } impl Transport for OnlyOnce { - type RawConn = T::RawConn; + type Output = T::Output; type Listener = T::Listener; type ListenerUpgrade = T::ListenerUpgrade; type Dial = T::Dial; diff --git a/tcp-transport/src/lib.rs b/tcp-transport/src/lib.rs index e6343965..37eed216 100644 --- a/tcp-transport/src/lib.rs +++ b/tcp-transport/src/lib.rs @@ -57,15 +57,15 @@ extern crate multiaddr; extern crate tokio_core; extern crate tokio_io; -use std::io::Error as IoError; -use std::iter; -use std::net::SocketAddr; -use tokio_core::reactor::Handle; -use tokio_core::net::{TcpListener, TcpStream}; use futures::future::{self, Future, FutureResult, IntoFuture}; use futures::stream::Stream; use multiaddr::{AddrComponent, Multiaddr, ToMultiaddr}; +use std::io::Error as IoError; +use std::iter; +use std::net::SocketAddr; use swarm::Transport; +use tokio_core::net::{TcpListener, TcpStream}; +use tokio_core::reactor::Handle; /// Represents the configuration for a TCP/IP transport capability for libp2p. /// @@ -87,9 +87,9 @@ impl TcpConfig { } impl Transport for TcpConfig { - type RawConn = TcpStream; + type Output = TcpStream; type Listener = Box>; - type ListenerUpgrade = FutureResult<(Self::RawConn, Multiaddr), IoError>; + type ListenerUpgrade = FutureResult<(Self::Output, Multiaddr), IoError>; type Dial = Box>; /// Listen on the given multi-addr. @@ -195,14 +195,14 @@ fn multiaddr_to_socketaddr(addr: &Multiaddr) -> Result { #[cfg(test)] mod tests { use super::{multiaddr_to_socketaddr, TcpConfig}; - use std; - use std::net::{IpAddr, Ipv4Addr, SocketAddr}; - use tokio_core::reactor::Core; - use tokio_io; use futures::Future; use futures::stream::Stream; use multiaddr::Multiaddr; + use std; + use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use swarm::Transport; + use tokio_core::reactor::Core; + use tokio_io; #[test] fn multiaddr_to_tcp_conversion() { diff --git a/varint-rs/src/lib.rs b/varint-rs/src/lib.rs index 6898deb3..93b1f08b 100644 --- a/varint-rs/src/lib.rs +++ b/varint-rs/src/lib.rs @@ -35,12 +35,12 @@ use bytes::{BufMut, Bytes, BytesMut, IntoBuf}; use futures::{Async, Poll}; use num_bigint::BigUint; use num_traits::ToPrimitive; -use tokio_io::{AsyncRead, AsyncWrite}; -use tokio_io::codec::{Decoder, Encoder}; use std::io; use std::io::prelude::*; use std::marker::PhantomData; use std::mem; +use tokio_io::codec::{Decoder, Encoder}; +use tokio_io::{AsyncRead, AsyncWrite}; mod errors { error_chain! { @@ -106,7 +106,7 @@ macro_rules! impl_bits { (std::mem::size_of::<$t>() * 8) - self.leading_zeros() as usize } } - } + }; } impl_bits!(usize); @@ -139,7 +139,9 @@ macro_rules! impl_decoderstate { |a: $t, b| -> Option<$t> { a.checked_shl(b as u32) } ); }; - ($t:ty, $make_fn:expr) => { impl_decoderstate!($t, $make_fn, $make_fn); }; + ($t:ty, $make_fn:expr) => { + impl_decoderstate!($t, $make_fn, $make_fn); + }; ($t:ty, $make_fn:expr, $shift_fn:expr) => { impl DecoderHelper for $t { #[inline] @@ -148,13 +150,14 @@ macro_rules! impl_decoderstate { byte: u8, ) -> ::errors::Result> { let res = decoder.accumulator.take().and_then(|accumulator| { - let out = accumulator | match $shift_fn( - $make_fn(byte & 0x7F), - decoder.shift * USABLE_BITS_PER_BYTE, - ) { - Some(a) => a, - None => return Some(Err(ErrorKind::ParseError.into())), - }; + let out = accumulator + | match $shift_fn( + $make_fn(byte & 0x7F), + decoder.shift * USABLE_BITS_PER_BYTE, + ) { + Some(a) => a, + None => return Some(Err(ErrorKind::ParseError.into())), + }; decoder.shift += 1; if byte & 0x80 == 0 { @@ -174,7 +177,7 @@ macro_rules! impl_decoderstate { fn read( decoder: &mut DecoderState, - mut input: R + mut input: R, ) -> Poll, Error> { if decoder.accumulator == AccumulatorState::Finished { return Err(Error::with_chain( @@ -202,17 +205,19 @@ macro_rules! impl_decoderstate { Err(inner) => if decoder.accumulator == AccumulatorState::NotStarted { break Ok(Async::Ready(None)); } else { - break Err(Error::with_chain(inner, ErrorKind::ParseError)) + break Err(Error::with_chain(inner, ErrorKind::ParseError)); }, } } } } - } + }; } macro_rules! impl_encoderstate { - ($t:ty) => { impl_encoderstate!($t, <$t>::from); }; + ($t:ty) => { + impl_encoderstate!($t, <$t>::from); + }; ($t:ty, $make_fn:expr) => { impl EncoderHelper for $t { /// Write as much as possible of the inner integer to the output `AsyncWrite` @@ -227,11 +232,11 @@ macro_rules! impl_encoderstate { return None; } - let masked = (&encoder.source >> (encoder.cur_chunk * USABLE_BITS_PER_BYTE)) & - $make_fn((1 << USABLE_BITS_PER_BYTE) - 1usize); - let masked = masked.to_u8().expect( - "Masked with 0b0111_1111, is less than u8::MAX, QED", - ); + let masked = (&encoder.source >> (encoder.cur_chunk * USABLE_BITS_PER_BYTE)) + & $make_fn((1 << USABLE_BITS_PER_BYTE) - 1usize); + let masked = masked + .to_u8() + .expect("Masked with 0b0111_1111, is less than u8::MAX, QED"); if encoder.cur_chunk == last_chunk { Some(masked) @@ -258,9 +263,9 @@ macro_rules! impl_encoderstate { Ok(Async::Ready(WriteState::Pending(written))) }; } - Err(inner) => break Err( - Error::with_chain(inner, ErrorKind::WriteError) - ), + Err(inner) => { + break Err(Error::with_chain(inner, ErrorKind::WriteError)) + } } } else { break Ok(Async::Ready(WriteState::Done(written))); @@ -268,7 +273,7 @@ macro_rules! impl_encoderstate { } } } - } + }; } impl_encoderstate!(usize); @@ -315,8 +320,8 @@ enum AccumulatorState { impl AccumulatorState { fn take(&mut self) -> Option { - use std::mem; use AccumulatorState::*; + use std::mem; match mem::replace(self, AccumulatorState::Finished) { InProgress(inner) => Some(inner), @@ -511,15 +516,15 @@ pub fn encode(input: T) -> Bytes { #[cfg(test)] mod tests { use super::{decode, EncoderState, VarintDecoder}; - use tokio_io::codec::FramedRead; - use num_bigint::BigUint; use futures::{Future, Stream}; + use num_bigint::BigUint; + use tokio_io::codec::FramedRead; #[test] fn large_number_fails() { - use std::io::Cursor; - use futures::Async; use super::WriteState; + use futures::Async; + use std::io::Cursor; let mut out = vec![0u8; 10]; @@ -586,9 +591,9 @@ mod tests { #[test] fn can_encode_basic_biguint_async() { - use std::io::Cursor; - use futures::Async; use super::WriteState; + use futures::Async; + use std::io::Cursor; let mut out = vec![0u8; 2]; @@ -608,9 +613,9 @@ mod tests { #[test] fn can_encode_basic_usize_async() { - use std::io::Cursor; - use futures::Async; use super::WriteState; + use futures::Async; + use std::io::Cursor; let mut out = vec![0u8; 2]; diff --git a/websocket/src/browser.rs b/websocket/src/browser.rs index f889c3fe..e13e5f56 100644 --- a/websocket/src/browser.rs +++ b/websocket/src/browser.rs @@ -18,17 +18,17 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use futures::{Async, Future, Poll, Stream}; use futures::stream::Then as StreamThen; use futures::sync::{mpsc, oneshot}; +use futures::{Async, Future, Poll, Stream}; use multiaddr::{AddrComponent, Multiaddr}; use rw_stream_sink::RwStreamSink; use std::io::{Error as IoError, ErrorKind as IoErrorKind}; use std::io::{Read, Write}; use std::iter; use std::sync::{Arc, Mutex}; -use stdweb::{self, Reference}; use stdweb::web::TypedArray; +use stdweb::{self, Reference}; use swarm::Transport; use tokio_io::{AsyncRead, AsyncWrite}; @@ -52,10 +52,10 @@ impl BrowserWsConfig { } impl Transport for BrowserWsConfig { - type RawConn = BrowserWsConn; + type Output = BrowserWsConn; type Listener = Box>; // TODO: use `!` - type ListenerUpgrade = Box>; // TODO: use `!` - type Dial = Box>; + type ListenerUpgrade = Box>; // TODO: use `!` + type Dial = Box>; #[inline] fn listen_on(self, a: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> { diff --git a/websocket/src/desktop.rs b/websocket/src/desktop.rs index 4a6b92a2..16eb2b33 100644 --- a/websocket/src/desktop.rs +++ b/websocket/src/desktop.rs @@ -23,6 +23,7 @@ use multiaddr::{AddrComponent, Multiaddr}; use rw_stream_sink::RwStreamSink; use std::io::{Error as IoError, ErrorKind as IoErrorKind}; use swarm::Transport; +use tokio_io::{AsyncRead, AsyncWrite}; use websocket::client::builder::ClientBuilder; use websocket::message::OwnedMessage; use websocket::server::upgrade::async::IntoWs; @@ -59,13 +60,13 @@ where // TODO: this 'static is pretty arbitrary and is necessary because of the websocket library T: Transport + 'static, // TODO: this Send is pretty arbitrary and is necessary because of the websocket library - T::RawConn: Send, + T::Output: AsyncRead + AsyncWrite + Send, { - type RawConn = Box; + type Output = Box; type Listener = stream::Map::ListenerUpgrade) -> Self::ListenerUpgrade>; - type ListenerUpgrade = Box>; - type Dial = Box>; + type ListenerUpgrade = Box>; + type Dial = Box>; fn listen_on( self, diff --git a/websocket/src/lib.rs b/websocket/src/lib.rs index 2819968e..31dc2521 100644 --- a/websocket/src/lib.rs +++ b/websocket/src/lib.rs @@ -88,10 +88,10 @@ extern crate stdweb; #[cfg(not(target_os = "emscripten"))] extern crate websocket; -#[cfg(not(target_os = "emscripten"))] -mod desktop; #[cfg(target_os = "emscripten")] mod browser; +#[cfg(not(target_os = "emscripten"))] +mod desktop; #[cfg(target_os = "emscripten")] pub use self::browser::{BrowserWsConfig, BrowserWsConn};