mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-29 09:41:34 +00:00
Run rustfmt on files that aren't really touched (#289)
This commit is contained in:
@ -203,8 +203,11 @@ fn resolve_dns(
|
||||
|
||||
resolver.resolve(name).and_then(move |addrs| {
|
||||
if log_enabled!(Level::Trace) {
|
||||
trace!("DNS component resolution: {} => {:?}",
|
||||
debug_name.expect("trace log level was enabled"), addrs);
|
||||
trace!(
|
||||
"DNS component resolution: {} => {:?}",
|
||||
debug_name.expect("trace log level was enabled"),
|
||||
addrs
|
||||
);
|
||||
}
|
||||
|
||||
addrs
|
||||
@ -226,11 +229,11 @@ fn resolve_dns(
|
||||
mod tests {
|
||||
extern crate libp2p_tcp_transport;
|
||||
use self::libp2p_tcp_transport::TcpConfig;
|
||||
use DnsConfig;
|
||||
use futures::future;
|
||||
use multiaddr::{AddrComponent, Multiaddr};
|
||||
use std::io::Error as IoError;
|
||||
use swarm::Transport;
|
||||
use DnsConfig;
|
||||
|
||||
#[test]
|
||||
fn basic_resolve() {
|
||||
|
@ -21,10 +21,10 @@
|
||||
//! Contains the `dialer_select_proto` code, which allows selecting a protocol thanks to
|
||||
//! `multistream-select` for the dialer.
|
||||
|
||||
use ProtocolChoiceError;
|
||||
use bytes::Bytes;
|
||||
use futures::future::{loop_fn, result, Loop};
|
||||
use futures::{Future, Sink, Stream};
|
||||
use ProtocolChoiceError;
|
||||
|
||||
use protocol::Dialer;
|
||||
use protocol::DialerToListenerMessage;
|
||||
|
@ -21,10 +21,10 @@
|
||||
//! Contains the `listener_select_proto` code, which allows selecting a protocol thanks to
|
||||
//! `multistream-select` for the listener.
|
||||
|
||||
use ProtocolChoiceError;
|
||||
use bytes::Bytes;
|
||||
use futures::future::{err, loop_fn, Loop};
|
||||
use futures::{Future, Sink, Stream};
|
||||
use ProtocolChoiceError;
|
||||
|
||||
use protocol::DialerToListenerMessage;
|
||||
use protocol::Listener;
|
||||
|
@ -25,8 +25,8 @@ use futures::{Async, AsyncSink, Future, Poll, Sink, StartSend, Stream};
|
||||
use length_delimited::LengthDelimitedFramedRead;
|
||||
use protocol::DialerToListenerMessage;
|
||||
use protocol::ListenerToDialerMessage;
|
||||
use protocol::MULTISTREAM_PROTOCOL_WITH_LF;
|
||||
use protocol::MultistreamSelectError;
|
||||
use protocol::MULTISTREAM_PROTOCOL_WITH_LF;
|
||||
use std::io::{BufRead, Cursor, Read};
|
||||
use tokio_io::codec::length_delimited::Builder as LengthDelimitedBuilder;
|
||||
use tokio_io::codec::length_delimited::FramedWrite as LengthDelimitedFramedWrite;
|
||||
|
@ -25,8 +25,8 @@ use futures::{Async, AsyncSink, Future, Poll, Sink, StartSend, Stream};
|
||||
use length_delimited::LengthDelimitedFramedRead;
|
||||
use protocol::DialerToListenerMessage;
|
||||
use protocol::ListenerToDialerMessage;
|
||||
use protocol::MULTISTREAM_PROTOCOL_WITH_LF;
|
||||
use protocol::MultistreamSelectError;
|
||||
use protocol::MULTISTREAM_PROTOCOL_WITH_LF;
|
||||
use tokio_io::codec::length_delimited::Builder as LengthDelimitedBuilder;
|
||||
use tokio_io::codec::length_delimited::FramedWrite as LengthDelimitedFramedWrite;
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
|
@ -27,12 +27,12 @@ extern crate tokio_core;
|
||||
use self::tokio_core::net::TcpListener;
|
||||
use self::tokio_core::net::TcpStream;
|
||||
use self::tokio_core::reactor::Core;
|
||||
use ProtocolChoiceError;
|
||||
use bytes::Bytes;
|
||||
use dialer_select::{dialer_select_proto_parallel, dialer_select_proto_serial};
|
||||
use futures::Future;
|
||||
use futures::{Sink, Stream};
|
||||
use protocol::{Dialer, DialerToListenerMessage, Listener, ListenerToDialerMessage};
|
||||
use ProtocolChoiceError;
|
||||
use {dialer_select_proto, listener_select_proto};
|
||||
|
||||
#[test]
|
||||
|
@ -21,7 +21,6 @@
|
||||
//! Implementation of the `Peerstore` trait that uses a single JSON file as backend.
|
||||
|
||||
use super::TTL;
|
||||
use PeerId;
|
||||
use bs58;
|
||||
use datastore::{Datastore, JsonFileDatastore, JsonFileDatastoreEntry, Query};
|
||||
use futures::{Future, Stream};
|
||||
@ -32,6 +31,7 @@ use std::io::Error as IoError;
|
||||
use std::iter;
|
||||
use std::path::PathBuf;
|
||||
use std::vec::IntoIter as VecIntoIter;
|
||||
use PeerId;
|
||||
|
||||
/// Peerstore backend that uses a Json file.
|
||||
pub struct JsonPeerstore {
|
||||
|
@ -21,7 +21,6 @@
|
||||
//! Implementation of the `Peerstore` trait that simple stores peers in memory.
|
||||
|
||||
use super::TTL;
|
||||
use PeerId;
|
||||
use multiaddr::Multiaddr;
|
||||
use owning_ref::OwningRefMut;
|
||||
use peer_info::{AddAddrBehaviour, PeerInfo};
|
||||
@ -30,6 +29,7 @@ use std::collections::HashMap;
|
||||
use std::iter;
|
||||
use std::sync::{Mutex, MutexGuard};
|
||||
use std::vec::IntoIter as VecIntoIter;
|
||||
use PeerId;
|
||||
|
||||
/// Implementation of the `Peerstore` trait that simply stores the peer information in memory.
|
||||
#[derive(Debug)]
|
||||
|
@ -26,13 +26,13 @@
|
||||
//! If the `PeerInfo` struct ever gets exposed to the public API of the crate, we may want to give
|
||||
//! more thoughts about this.
|
||||
|
||||
use TTL;
|
||||
use multiaddr::Multiaddr;
|
||||
use serde::de::Error as DeserializerError;
|
||||
use serde::ser::SerializeStruct;
|
||||
use serde::{Deserialize, Deserializer, Serialize, Serializer};
|
||||
use std::cmp::Ordering;
|
||||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||
use TTL;
|
||||
|
||||
/// Information about a peer.
|
||||
#[derive(Debug, Clone, Default, PartialEq, Eq)]
|
||||
@ -186,12 +186,10 @@ mod tests {
|
||||
#[test]
|
||||
fn ser_and_deser() {
|
||||
let peer_info = PeerInfo {
|
||||
addrs: vec![
|
||||
(
|
||||
"/ip4/0.0.0.0/tcp/0".parse::<Multiaddr>().unwrap(),
|
||||
UNIX_EPOCH,
|
||||
),
|
||||
],
|
||||
addrs: vec![(
|
||||
"/ip4/0.0.0.0/tcp/0".parse::<Multiaddr>().unwrap(),
|
||||
UNIX_EPOCH,
|
||||
)],
|
||||
};
|
||||
let serialized = serde_json::to_string(&peer_info).unwrap();
|
||||
let deserialized: PeerInfo = serde_json::from_str(&serialized).unwrap();
|
||||
|
@ -19,8 +19,8 @@
|
||||
// DEALINGS IN THE SOFTWARE.
|
||||
|
||||
use multiaddr::Multiaddr;
|
||||
use {PeerId, TTL};
|
||||
use std::time::Duration;
|
||||
use {PeerId, TTL};
|
||||
|
||||
/// Implemented on objects that store peers.
|
||||
///
|
||||
|
@ -95,8 +95,8 @@ use futures::sync::{mpsc, oneshot};
|
||||
use futures::{Future, Sink, Stream};
|
||||
use libp2p_core::{ConnectionUpgrade, Endpoint};
|
||||
use parking_lot::Mutex;
|
||||
use rand::Rand;
|
||||
use rand::os::OsRng;
|
||||
use rand::Rand;
|
||||
use std::collections::HashMap;
|
||||
use std::error::Error;
|
||||
use std::io::Error as IoError;
|
||||
@ -161,8 +161,7 @@ where
|
||||
// TODO: can't figure out how to make it work without using an Arc/Mutex
|
||||
let expected_pongs = Arc::new(Mutex::new(HashMap::with_capacity(4)));
|
||||
|
||||
let sink_stream = Framed::new(socket, Codec)
|
||||
.map(|msg| Message::Received(msg.freeze()));
|
||||
let sink_stream = Framed::new(socket, Codec).map(|msg| Message::Received(msg.freeze()));
|
||||
let (sink, stream) = sink_stream.split();
|
||||
|
||||
let future = loop_fn((sink, stream.select(rx)), move |(sink, stream)| {
|
||||
@ -290,9 +289,9 @@ mod tests {
|
||||
use self::tokio_core::net::TcpStream;
|
||||
use self::tokio_core::reactor::Core;
|
||||
use super::Ping;
|
||||
use futures::future::{self, join_all};
|
||||
use futures::Future;
|
||||
use futures::Stream;
|
||||
use futures::future::{self, join_all};
|
||||
use libp2p_core::{ConnectionUpgrade, Endpoint, Multiaddr};
|
||||
use std::io::Error as IoError;
|
||||
|
||||
|
@ -24,11 +24,11 @@ use bytes::BytesMut;
|
||||
use crypto::symmetriccipher::SynchronousStreamCipher;
|
||||
|
||||
use error::SecioError;
|
||||
use futures::sink::Sink;
|
||||
use futures::stream::Stream;
|
||||
use futures::Async;
|
||||
use futures::Poll;
|
||||
use futures::StartSend;
|
||||
use futures::sink::Sink;
|
||||
use futures::stream::Stream;
|
||||
use ring::hmac;
|
||||
|
||||
/// Wraps around a `Stream<Item = BytesMut>`. The buffers produced by the underlying stream
|
||||
@ -53,7 +53,7 @@ impl<S> DecoderMiddleware<S> {
|
||||
raw_stream: S,
|
||||
cipher: Box<SynchronousStreamCipher>,
|
||||
hmac_key: hmac::VerificationKey,
|
||||
hmac_num_bytes: usize, // TODO: remove this parameter
|
||||
hmac_num_bytes: usize, // TODO: remove this parameter
|
||||
) -> DecoderMiddleware<S> {
|
||||
DecoderMiddleware {
|
||||
cipher_state: cipher,
|
||||
|
@ -22,10 +22,10 @@
|
||||
|
||||
use bytes::BytesMut;
|
||||
use crypto::symmetriccipher::SynchronousStreamCipher;
|
||||
use futures::Poll;
|
||||
use futures::StartSend;
|
||||
use futures::sink::Sink;
|
||||
use futures::stream::Stream;
|
||||
use futures::Poll;
|
||||
use futures::StartSend;
|
||||
use ring::hmac;
|
||||
|
||||
/// Wraps around a `Sink`. Encodes the buffers passed to it and passes it to the underlying sink.
|
||||
|
@ -63,9 +63,9 @@ mod tests {
|
||||
use self::tokio_core::net::TcpListener;
|
||||
use self::tokio_core::net::TcpStream;
|
||||
use self::tokio_core::reactor::Core;
|
||||
use super::full_codec;
|
||||
use super::DecoderMiddleware;
|
||||
use super::EncoderMiddleware;
|
||||
use super::full_codec;
|
||||
use bytes::BytesMut;
|
||||
use crypto::aessafe::AesSafe256Encryptor;
|
||||
use crypto::blockmodes::CtrMode;
|
||||
|
@ -23,18 +23,18 @@ use bytes::BytesMut;
|
||||
use codec::{full_codec, FullCodec};
|
||||
use crypto::aes::{ctr, KeySize};
|
||||
use error::SecioError;
|
||||
use futures::Future;
|
||||
use futures::future;
|
||||
use futures::sink::Sink;
|
||||
use futures::stream::Stream;
|
||||
use futures::Future;
|
||||
use libp2p_core::PublicKey;
|
||||
use protobuf::Message as ProtobufMessage;
|
||||
use protobuf::parse_from_bytes as protobuf_parse_from_bytes;
|
||||
use protobuf::Message as ProtobufMessage;
|
||||
use ring::agreement::EphemeralPrivateKey;
|
||||
use ring::hmac::{SigningContext, SigningKey, VerificationKey};
|
||||
use ring::rand::SecureRandom;
|
||||
use ring::signature::verify as signature_verify;
|
||||
use ring::signature::{RSASigningState, RSA_PKCS1_2048_8192_SHA256, RSA_PKCS1_SHA256, ED25519};
|
||||
use ring::signature::{ED25519, RSASigningState, RSA_PKCS1_2048_8192_SHA256, RSA_PKCS1_SHA256};
|
||||
use ring::{agreement, digest, rand};
|
||||
#[cfg(feature = "secp256k1")]
|
||||
use secp256k1;
|
||||
|
@ -101,13 +101,13 @@ extern crate untrusted;
|
||||
pub use self::error::SecioError;
|
||||
|
||||
#[cfg(feature = "secp256k1")]
|
||||
use asn1_der::{DerObject, traits::FromDerEncoded, traits::FromDerObject};
|
||||
use asn1_der::{traits::FromDerEncoded, traits::FromDerObject, DerObject};
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use futures::stream::MapErr as StreamMapErr;
|
||||
use futures::{Future, Poll, Sink, StartSend, Stream};
|
||||
use libp2p_core::{PeerId, PublicKey};
|
||||
use ring::signature::{Ed25519KeyPair, RSAKeyPair};
|
||||
use ring::rand::SystemRandom;
|
||||
use ring::signature::{Ed25519KeyPair, RSAKeyPair};
|
||||
use rw_stream_sink::RwStreamSink;
|
||||
use std::error::Error;
|
||||
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
|
||||
@ -179,7 +179,8 @@ impl SecioKeyPair {
|
||||
|
||||
/// Builds a `SecioKeyPair` from a PKCS8 ED25519 private key.
|
||||
pub fn ed25519_from_pkcs8<K>(key: K) -> Result<SecioKeyPair, Box<Error + Send + Sync>>
|
||||
where K: AsRef<[u8]>
|
||||
where
|
||||
K: AsRef<[u8]>,
|
||||
{
|
||||
let key_pair =
|
||||
Ed25519KeyPair::from_pkcs8(Input::from(key.as_ref())).map_err(|err| Box::new(err))?;
|
||||
@ -202,48 +203,48 @@ impl SecioKeyPair {
|
||||
/// Builds a `SecioKeyPair` from a raw secp256k1 32 bytes private key.
|
||||
#[cfg(feature = "secp256k1")]
|
||||
pub fn secp256k1_raw_key<K>(key: K) -> Result<SecioKeyPair, Box<Error + Send + Sync>>
|
||||
where K: AsRef<[u8]>
|
||||
where
|
||||
K: AsRef<[u8]>,
|
||||
{
|
||||
let secp = secp256k1::Secp256k1::with_caps(secp256k1::ContextFlag::None);
|
||||
let private = secp256k1::key::SecretKey::from_slice(&secp, key.as_ref())?;
|
||||
|
||||
Ok(SecioKeyPair {
|
||||
inner: SecioKeyPairInner::Secp256k1 {
|
||||
private,
|
||||
},
|
||||
inner: SecioKeyPairInner::Secp256k1 { private },
|
||||
})
|
||||
}
|
||||
|
||||
/// Builds a `SecioKeyPair` from a secp256k1 private key in DER format.
|
||||
#[cfg(feature = "secp256k1")]
|
||||
pub fn secp256k1_from_der<K>(key: K) -> Result<SecioKeyPair, Box<Error + Send + Sync>>
|
||||
where K: AsRef<[u8]>
|
||||
where
|
||||
K: AsRef<[u8]>,
|
||||
{
|
||||
// See ECPrivateKey in https://tools.ietf.org/html/rfc5915
|
||||
let obj: Vec<DerObject> = FromDerEncoded::with_der_encoded(key.as_ref())
|
||||
.map_err(|err| err.to_string())?;
|
||||
let priv_key_obj = obj.into_iter().nth(1).ok_or("Not enough elements in DER".to_string())?;
|
||||
let private_key: Vec<u8> = FromDerObject::from_der_object(priv_key_obj)
|
||||
.map_err(|err| err.to_string())?;
|
||||
let obj: Vec<DerObject> =
|
||||
FromDerEncoded::with_der_encoded(key.as_ref()).map_err(|err| err.to_string())?;
|
||||
let priv_key_obj = obj.into_iter()
|
||||
.nth(1)
|
||||
.ok_or("Not enough elements in DER".to_string())?;
|
||||
let private_key: Vec<u8> =
|
||||
FromDerObject::from_der_object(priv_key_obj).map_err(|err| err.to_string())?;
|
||||
SecioKeyPair::secp256k1_raw_key(&private_key)
|
||||
}
|
||||
|
||||
/// Returns the public key corresponding to this key pair.
|
||||
pub fn to_public_key(&self) -> PublicKey {
|
||||
match self.inner {
|
||||
SecioKeyPairInner::Rsa { ref public, .. } => {
|
||||
PublicKey::Rsa(public.clone())
|
||||
},
|
||||
SecioKeyPairInner::Rsa { ref public, .. } => PublicKey::Rsa(public.clone()),
|
||||
SecioKeyPairInner::Ed25519 { ref key_pair } => {
|
||||
PublicKey::Ed25519(key_pair.public_key_bytes().to_vec())
|
||||
},
|
||||
}
|
||||
#[cfg(feature = "secp256k1")]
|
||||
SecioKeyPairInner::Secp256k1 { ref private } => {
|
||||
let secp = secp256k1::Secp256k1::with_caps(secp256k1::ContextFlag::SignOnly);
|
||||
let pubkey = secp256k1::key::PublicKey::from_secret_key(&secp, private)
|
||||
.expect("wrong secp256k1 private key ; type safety violated");
|
||||
PublicKey::Secp256k1(pubkey.serialize_vec(&secp, true).to_vec())
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -269,14 +270,13 @@ enum SecioKeyPairInner {
|
||||
key_pair: Arc<Ed25519KeyPair>,
|
||||
},
|
||||
#[cfg(feature = "secp256k1")]
|
||||
Secp256k1 {
|
||||
private: secp256k1::key::SecretKey,
|
||||
},
|
||||
Secp256k1 { private: secp256k1::key::SecretKey },
|
||||
}
|
||||
|
||||
/// Output of the secio protocol.
|
||||
pub struct SecioOutput<S>
|
||||
where S: AsyncRead + AsyncWrite
|
||||
where
|
||||
S: AsyncRead + AsyncWrite,
|
||||
{
|
||||
/// The encrypted stream.
|
||||
pub stream: RwStreamSink<StreamMapErr<SecioMiddleware<S>, fn(SecioError) -> IoError>>,
|
||||
@ -288,8 +288,8 @@ where S: AsyncRead + AsyncWrite
|
||||
|
||||
impl<S, Maf> libp2p_core::ConnectionUpgrade<S, Maf> for SecioConfig
|
||||
where
|
||||
S: AsyncRead + AsyncWrite + 'static, // TODO: 'static :(
|
||||
Maf: 'static, // TODO: 'static :(
|
||||
S: AsyncRead + AsyncWrite + 'static, // TODO: 'static :(
|
||||
Maf: 'static, // TODO: 'static :(
|
||||
{
|
||||
type Output = SecioOutput<S>;
|
||||
type MultiaddrFuture = Maf;
|
||||
|
@ -135,7 +135,8 @@ impl Transport for TcpConfig {
|
||||
// If so, we instantly refuse dialing instead of going through the kernel.
|
||||
if socket_addr.port() != 0 && !socket_addr.ip().is_unspecified() {
|
||||
debug!("Dialing {}", addr);
|
||||
let fut = TcpStream::connect(&socket_addr, &self.event_loop).map(|t| (t, future::ok(addr)));
|
||||
let fut = TcpStream::connect(&socket_addr, &self.event_loop)
|
||||
.map(|t| (t, future::ok(addr)));
|
||||
Ok(Box::new(fut) as Box<_>)
|
||||
} else {
|
||||
debug!("Instantly refusing dialing {}, as it is invalid", addr);
|
||||
@ -198,8 +199,8 @@ fn multiaddr_to_socketaddr(addr: &Multiaddr) -> Result<SocketAddr, ()> {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::{multiaddr_to_socketaddr, TcpConfig};
|
||||
use futures::Future;
|
||||
use futures::stream::Stream;
|
||||
use futures::Future;
|
||||
use multiaddr::Multiaddr;
|
||||
use std;
|
||||
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
|
||||
@ -245,14 +246,7 @@ mod tests {
|
||||
.unwrap()),
|
||||
Ok(SocketAddr::new(
|
||||
IpAddr::V6(Ipv6Addr::new(
|
||||
65535,
|
||||
65535,
|
||||
65535,
|
||||
65535,
|
||||
65535,
|
||||
65535,
|
||||
65535,
|
||||
65535,
|
||||
65535, 65535, 65535, 65535, 65535, 65535, 65535, 65535,
|
||||
)),
|
||||
8080,
|
||||
))
|
||||
|
@ -30,10 +30,10 @@ extern crate libp2p_core;
|
||||
extern crate log;
|
||||
extern crate tokio_timer;
|
||||
|
||||
use futures::{Async, Future, Poll, Stream};
|
||||
use libp2p_core::{Multiaddr, MuxedTransport, Transport};
|
||||
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
|
||||
use std::time::{Duration, Instant};
|
||||
use futures::{Future, Poll, Async, Stream};
|
||||
use libp2p_core::{Transport, Multiaddr, MuxedTransport};
|
||||
use tokio_timer::{Deadline, DeadlineError};
|
||||
|
||||
/// Wraps around a `Transport` and adds a timeout to all the incoming and outgoing connections.
|
||||
@ -64,7 +64,7 @@ impl<InnerTrans> TransportTimeout<InnerTrans> {
|
||||
TransportTimeout {
|
||||
inner: trans,
|
||||
outgoing_timeout: timeout,
|
||||
incoming_timeout: Duration::from_secs(100 * 365 * 24 * 3600), // 100 years
|
||||
incoming_timeout: Duration::from_secs(100 * 365 * 24 * 3600), // 100 years
|
||||
}
|
||||
}
|
||||
|
||||
@ -73,14 +73,15 @@ impl<InnerTrans> TransportTimeout<InnerTrans> {
|
||||
pub fn with_ingoing_timeout(trans: InnerTrans, timeout: Duration) -> Self {
|
||||
TransportTimeout {
|
||||
inner: trans,
|
||||
outgoing_timeout: Duration::from_secs(100 * 365 * 24 * 3600), // 100 years
|
||||
outgoing_timeout: Duration::from_secs(100 * 365 * 24 * 3600), // 100 years
|
||||
incoming_timeout: timeout,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<InnerTrans> Transport for TransportTimeout<InnerTrans>
|
||||
where InnerTrans: Transport,
|
||||
where
|
||||
InnerTrans: Transport,
|
||||
{
|
||||
type Output = InnerTrans::Output;
|
||||
type MultiaddrFuture = InnerTrans::MultiaddrFuture;
|
||||
@ -97,7 +98,7 @@ where InnerTrans: Transport,
|
||||
};
|
||||
|
||||
Ok((listener, addr))
|
||||
},
|
||||
}
|
||||
Err((inner, addr)) => {
|
||||
let transport = TransportTimeout {
|
||||
inner,
|
||||
@ -112,11 +113,9 @@ where InnerTrans: Transport,
|
||||
|
||||
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {
|
||||
match self.inner.dial(addr) {
|
||||
Ok(dial) => {
|
||||
Ok(TokioTimerMapErr {
|
||||
inner: Deadline::new(dial, Instant::now() + self.outgoing_timeout)
|
||||
})
|
||||
},
|
||||
Ok(dial) => Ok(TokioTimerMapErr {
|
||||
inner: Deadline::new(dial, Instant::now() + self.outgoing_timeout),
|
||||
}),
|
||||
Err((inner, addr)) => {
|
||||
let transport = TransportTimeout {
|
||||
inner,
|
||||
@ -136,7 +135,8 @@ where InnerTrans: Transport,
|
||||
}
|
||||
|
||||
impl<InnerTrans> MuxedTransport for TransportTimeout<InnerTrans>
|
||||
where InnerTrans: MuxedTransport
|
||||
where
|
||||
InnerTrans: MuxedTransport,
|
||||
{
|
||||
type Incoming = TimeoutIncoming<InnerTrans::Incoming>;
|
||||
type IncomingUpgrade = TokioTimerMapErr<Deadline<InnerTrans::IncomingUpgrade>>;
|
||||
@ -158,7 +158,8 @@ pub struct TimeoutListener<InnerStream> {
|
||||
}
|
||||
|
||||
impl<InnerStream> Stream for TimeoutListener<InnerStream>
|
||||
where InnerStream: Stream,
|
||||
where
|
||||
InnerStream: Stream,
|
||||
{
|
||||
type Item = TokioTimerMapErr<Deadline<InnerStream::Item>>;
|
||||
type Error = InnerStream::Error;
|
||||
@ -167,7 +168,7 @@ where InnerStream: Stream,
|
||||
let inner_fut = try_ready!(self.inner.poll());
|
||||
if let Some(inner_fut) = inner_fut {
|
||||
let fut = TokioTimerMapErr {
|
||||
inner: Deadline::new(inner_fut, Instant::now() + self.timeout)
|
||||
inner: Deadline::new(inner_fut, Instant::now() + self.timeout),
|
||||
};
|
||||
Ok(Async::Ready(Some(fut)))
|
||||
} else {
|
||||
@ -184,7 +185,8 @@ pub struct TimeoutIncoming<InnerFut> {
|
||||
}
|
||||
|
||||
impl<InnerFut> Future for TimeoutIncoming<InnerFut>
|
||||
where InnerFut: Future,
|
||||
where
|
||||
InnerFut: Future,
|
||||
{
|
||||
type Item = TokioTimerMapErr<Deadline<InnerFut::Item>>;
|
||||
type Error = InnerFut::Error;
|
||||
@ -192,7 +194,7 @@ where InnerFut: Future,
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
let inner_fut = try_ready!(self.inner.poll());
|
||||
let fut = TokioTimerMapErr {
|
||||
inner: Deadline::new(inner_fut, Instant::now() + self.timeout)
|
||||
inner: Deadline::new(inner_fut, Instant::now() + self.timeout),
|
||||
};
|
||||
Ok(Async::Ready(fut))
|
||||
}
|
||||
@ -206,25 +208,25 @@ pub struct TokioTimerMapErr<InnerFut> {
|
||||
}
|
||||
|
||||
impl<InnerFut> Future for TokioTimerMapErr<InnerFut>
|
||||
where InnerFut: Future<Error = DeadlineError<IoError>>
|
||||
where
|
||||
InnerFut: Future<Error = DeadlineError<IoError>>,
|
||||
{
|
||||
type Item = InnerFut::Item;
|
||||
type Error = IoError;
|
||||
|
||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||
self.inner.poll()
|
||||
.map_err(|err: DeadlineError<IoError>| {
|
||||
if err.is_inner() {
|
||||
err.into_inner().expect("ensured by is_inner()")
|
||||
} else if err.is_elapsed() {
|
||||
debug!("timeout elapsed for connection");
|
||||
IoErrorKind::TimedOut.into()
|
||||
} else {
|
||||
assert!(err.is_timer());
|
||||
debug!("tokio timer error in timeout wrapper");
|
||||
let err = err.into_timer().expect("ensure by is_timer()");
|
||||
IoError::new(IoErrorKind::Other, err)
|
||||
}
|
||||
})
|
||||
self.inner.poll().map_err(|err: DeadlineError<IoError>| {
|
||||
if err.is_inner() {
|
||||
err.into_inner().expect("ensured by is_inner()")
|
||||
} else if err.is_elapsed() {
|
||||
debug!("timeout elapsed for connection");
|
||||
IoErrorKind::TimedOut.into()
|
||||
} else {
|
||||
assert!(err.is_timer());
|
||||
debug!("tokio timer error in timeout wrapper");
|
||||
let err = err.into_timer().expect("ensure by is_timer()");
|
||||
IoError::new(IoErrorKind::Other, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -20,7 +20,7 @@
|
||||
|
||||
use futures::stream::Then as StreamThen;
|
||||
use futures::sync::{mpsc, oneshot};
|
||||
use futures::{Async, future, Future, Poll, Stream, future::FutureResult};
|
||||
use futures::{future, future::FutureResult, Async, Future, Poll, Stream};
|
||||
use multiaddr::{AddrComponent, Multiaddr};
|
||||
use rw_stream_sink::RwStreamSink;
|
||||
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
|
||||
@ -55,7 +55,8 @@ impl Transport for BrowserWsConfig {
|
||||
type Output = BrowserWsConn;
|
||||
type MultiaddrFuture = FutureResult<Multiaddr, IoError>;
|
||||
type Listener = Box<Stream<Item = Self::ListenerUpgrade, Error = IoError>>; // TODO: use `!`
|
||||
type ListenerUpgrade = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError>>; // TODO: use `!`
|
||||
type ListenerUpgrade =
|
||||
Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError>>; // TODO: use `!`
|
||||
type Dial = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError>>;
|
||||
|
||||
#[inline]
|
||||
|
@ -66,7 +66,8 @@ where
|
||||
type MultiaddrFuture = Box<Future<Item = Multiaddr, Error = IoError>>;
|
||||
type Listener =
|
||||
stream::Map<T::Listener, fn(<T as Transport>::ListenerUpgrade) -> Self::ListenerUpgrade>;
|
||||
type ListenerUpgrade = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError>>;
|
||||
type ListenerUpgrade =
|
||||
Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError>>;
|
||||
type Dial = Box<Future<Item = (Self::Output, Self::MultiaddrFuture), Error = IoError>>;
|
||||
|
||||
fn listen_on(
|
||||
@ -159,8 +160,10 @@ where
|
||||
Some(AddrComponent::WS) => false,
|
||||
Some(AddrComponent::WSS) => true,
|
||||
_ => {
|
||||
trace!("Ignoring dial attempt for {} because it is not a websocket multiaddr",
|
||||
original_addr);
|
||||
trace!(
|
||||
"Ignoring dial attempt for {} because it is not a websocket multiaddr",
|
||||
original_addr
|
||||
);
|
||||
return Err((self, original_addr));
|
||||
}
|
||||
};
|
||||
@ -172,9 +175,10 @@ where
|
||||
let inner_dial = match self.transport.dial(inner_addr) {
|
||||
Ok(d) => d,
|
||||
Err((transport, old_addr)) => {
|
||||
debug!("Failed to dial {} because {} is not supported by the underlying transport",
|
||||
original_addr,
|
||||
old_addr);
|
||||
debug!(
|
||||
"Failed to dial {} because {} is not supported by the underlying transport",
|
||||
original_addr, old_addr
|
||||
);
|
||||
return Err((
|
||||
WsConfig {
|
||||
transport: transport,
|
||||
@ -221,9 +225,7 @@ where
|
||||
let read_write = RwStreamSink::new(framed_data);
|
||||
Box::new(read_write) as Box<AsyncStream>
|
||||
})
|
||||
.map(move |c| {
|
||||
(c, client_addr)
|
||||
})
|
||||
.map(move |c| (c, client_addr))
|
||||
});
|
||||
|
||||
Ok(Box::new(dial) as Box<_>)
|
||||
@ -289,10 +291,10 @@ mod tests {
|
||||
extern crate libp2p_tcp_transport as tcp;
|
||||
extern crate tokio_core;
|
||||
use self::tokio_core::reactor::Core;
|
||||
use WsConfig;
|
||||
use futures::{Future, Stream};
|
||||
use multiaddr::Multiaddr;
|
||||
use swarm::Transport;
|
||||
use WsConfig;
|
||||
|
||||
#[test]
|
||||
fn dialer_connects_to_listener_ipv4() {
|
||||
|
Reference in New Issue
Block a user