mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-05-08 00:52:18 +00:00
Merge pull request #80 from tomaka/fix-multi-connec
Change API to allow multiple simultaneous clients
This commit is contained in:
commit
642d18e1ac
@ -95,62 +95,49 @@ fn main() {
|
|||||||
println!("Now listening on {:?}", address);
|
println!("Now listening on {:?}", address);
|
||||||
|
|
||||||
let future = listener
|
let future = listener
|
||||||
.filter_map(|(socket, client_addr)| {
|
|
||||||
let client_addr = client_addr.to_string();
|
|
||||||
|
|
||||||
// This closure is called whenever a new connection has been received. The `socket`
|
|
||||||
// is a `Result<..., IoError>` which contains an error if for example protocol
|
|
||||||
// negotiation or the secio handshake failed. We handle this situation by printing a
|
|
||||||
// message on stderr and ignoring the connection.
|
|
||||||
match socket {
|
|
||||||
Ok(s) => Some((s, client_addr)),
|
|
||||||
Err(err) => {
|
|
||||||
eprintln!("Failed connection attempt from {}\n => Error: {:?}",
|
|
||||||
client_addr, err);
|
|
||||||
None
|
|
||||||
},
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
.for_each(|(socket, client_addr)| {
|
.for_each(|(socket, client_addr)| {
|
||||||
// This closure is called whenever a new connection has been received and successfully
|
// This closure is called whenever a new connection has been received.
|
||||||
// upgraded to use secio/plaintext and echo.
|
// `socket` is a future that will be triggered once the upgrade to secio, multiplex
|
||||||
println!("Successfully negotiated protocol with {}", client_addr);
|
// and echo is complete.
|
||||||
|
let client_addr = client_addr.to_string();
|
||||||
|
println!("Incoming connection from {}", client_addr);
|
||||||
|
|
||||||
// We loop forever in order to handle all the messages sent by the client.
|
socket
|
||||||
let client_finished = {
|
.and_then(move |socket| {
|
||||||
let client_addr = client_addr.clone();
|
println!("Successfully negotiated protocol with {}", client_addr);
|
||||||
loop_fn(socket, move |socket| {
|
|
||||||
let client_addr = client_addr.clone();
|
// We loop forever in order to handle all the messages sent by the client.
|
||||||
socket.into_future()
|
loop_fn(socket, move |socket| {
|
||||||
.map_err(|(err, _)| err)
|
let client_addr = client_addr.clone();
|
||||||
.and_then(move |(msg, rest)| {
|
socket.into_future()
|
||||||
if let Some(msg) = msg {
|
.map_err(|(err, _)| err)
|
||||||
// One message has been received. We send it back to the client.
|
.and_then(move |(msg, rest)| {
|
||||||
println!("Received a message from {}: {:?}\n => Sending back \
|
if let Some(msg) = msg {
|
||||||
identical message to remote", client_addr, msg);
|
// One message has been received. We send it back to the client.
|
||||||
Box::new(rest.send(msg).map(|m| Loop::Continue(m)))
|
println!("Received a message from {}: {:?}\n => Sending back \
|
||||||
as Box<Future<Item = _, Error = _>>
|
identical message to remote", client_addr, msg);
|
||||||
} else {
|
Box::new(rest.send(msg).map(|m| Loop::Continue(m)))
|
||||||
// End of stream. Connection closed. Breaking the loop.
|
as Box<Future<Item = _, Error = _>>
|
||||||
println!("Received EOF from {}\n => Dropping connection",
|
} else {
|
||||||
client_addr);
|
// End of stream. Connection closed. Breaking the loop.
|
||||||
Box::new(Ok(Loop::Break(())).into_future())
|
println!("Received EOF from {}\n => Dropping connection",
|
||||||
as Box<Future<Item = _, Error = _>>
|
client_addr);
|
||||||
}
|
Box::new(Ok(Loop::Break(())).into_future())
|
||||||
})
|
as Box<Future<Item = _, Error = _>>
|
||||||
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
})
|
})
|
||||||
};
|
|
||||||
|
|
||||||
// We absorb errors from the `client_finished` future so that an error while processing
|
// We absorb errors from the future so that an error while processing a client
|
||||||
// a client (eg. if the client unexpectedly disconnects) doesn't propagate and stop the
|
// (eg. if the client unexpectedly disconnects) doesn't propagate and stop the
|
||||||
// entire server.
|
// entire server.
|
||||||
client_finished.then(move |res| {
|
.then(move |res| {
|
||||||
if let Err(err) = res {
|
if let Err(err) = res {
|
||||||
println!("Error while processing client {}: {:?}", client_addr, err);
|
println!("Error while processing client: {:?}", err);
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
})
|
})
|
||||||
});
|
});
|
||||||
|
|
||||||
// `future` is a future that contains all the behaviour that we want, but nothing has actually
|
// `future` is a future that contains all the behaviour that we want, but nothing has actually
|
||||||
|
@ -44,6 +44,7 @@
|
|||||||
//! figured out.
|
//! figured out.
|
||||||
|
|
||||||
use futures::{Async, Future, Poll, Stream};
|
use futures::{Async, Future, Poll, Stream};
|
||||||
|
use futures::future::{IntoFuture, FutureResult};
|
||||||
use futures::stream::Fuse as StreamFuse;
|
use futures::stream::Fuse as StreamFuse;
|
||||||
use futures::stream;
|
use futures::stream;
|
||||||
use multiaddr::Multiaddr;
|
use multiaddr::Multiaddr;
|
||||||
@ -87,15 +88,8 @@ where
|
|||||||
C::NamesIter: Clone, // TODO: not elegant
|
C::NamesIter: Clone, // TODO: not elegant
|
||||||
{
|
{
|
||||||
type RawConn = <C::Output as StreamMuxer>::Substream;
|
type RawConn = <C::Output as StreamMuxer>::Substream;
|
||||||
type Listener = ConnectionReuseListener<
|
type Listener = Box<Stream<Item = (Self::ListenerUpgrade, Multiaddr), Error = IoError>>;
|
||||||
Box<
|
type ListenerUpgrade = FutureResult<Self::RawConn, IoError>;
|
||||||
Stream<
|
|
||||||
Item = (Result<C::Output, IoError>, Multiaddr),
|
|
||||||
Error = IoError,
|
|
||||||
>,
|
|
||||||
>,
|
|
||||||
C::Output,
|
|
||||||
>;
|
|
||||||
type Dial = Box<Future<Item = Self::RawConn, Error = IoError>>;
|
type Dial = Box<Future<Item = Self::RawConn, Error = IoError>>;
|
||||||
|
|
||||||
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
|
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
|
||||||
@ -108,10 +102,11 @@ where
|
|||||||
|
|
||||||
let listener = ConnectionReuseListener {
|
let listener = ConnectionReuseListener {
|
||||||
listener: listener.fuse(),
|
listener: listener.fuse(),
|
||||||
|
current_upgrades: Vec::new(),
|
||||||
connections: Vec::new(),
|
connections: Vec::new(),
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok((listener, new_addr))
|
Ok((Box::new(listener) as Box<_>, new_addr))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {
|
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, (Self, Multiaddr)> {
|
||||||
@ -163,34 +158,29 @@ where
|
|||||||
|
|
||||||
/// Implementation of `Stream<Item = (impl AsyncRead + AsyncWrite, Multiaddr)` for the
|
/// Implementation of `Stream<Item = (impl AsyncRead + AsyncWrite, Multiaddr)` for the
|
||||||
/// `ConnectionReuse` struct.
|
/// `ConnectionReuse` struct.
|
||||||
pub struct ConnectionReuseListener<S, M>
|
pub struct ConnectionReuseListener<S, F, M>
|
||||||
where
|
where
|
||||||
S: Stream<Item = (Result<M, IoError>, Multiaddr), Error = IoError>,
|
S: Stream<Item = (F, Multiaddr), Error = IoError>,
|
||||||
M: StreamMuxer
|
F: Future<Item = M, Error = IoError>,
|
||||||
|
M: StreamMuxer,
|
||||||
{
|
{
|
||||||
listener: StreamFuse<S>,
|
listener: StreamFuse<S>,
|
||||||
|
current_upgrades: Vec<(F, Multiaddr)>,
|
||||||
connections: Vec<(M, <M as StreamMuxer>::InboundSubstream, Multiaddr)>,
|
connections: Vec<(M, <M as StreamMuxer>::InboundSubstream, Multiaddr)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S, M> Stream for ConnectionReuseListener<S, M>
|
impl<S, F, M> Stream for ConnectionReuseListener<S, F, M>
|
||||||
where S: Stream<Item = (Result<M, IoError>, Multiaddr), Error = IoError>,
|
where S: Stream<Item = (F, Multiaddr), Error = IoError>,
|
||||||
|
F: Future<Item = M, Error = IoError>,
|
||||||
M: StreamMuxer + Clone + 'static // TODO: 'static :(
|
M: StreamMuxer + Clone + 'static // TODO: 'static :(
|
||||||
{
|
{
|
||||||
type Item = (Result<M::Substream, IoError>, Multiaddr);
|
type Item = (FutureResult<M::Substream, IoError>, Multiaddr);
|
||||||
type Error = IoError;
|
type Error = IoError;
|
||||||
|
|
||||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||||
match self.listener.poll() {
|
match self.listener.poll() {
|
||||||
Ok(Async::Ready(Some((upgrade, client_addr)))) => {
|
Ok(Async::Ready(Some((upgrade, client_addr)))) => {
|
||||||
match upgrade {
|
self.current_upgrades.push((upgrade, client_addr));
|
||||||
Ok(upgrade) => {
|
|
||||||
let next_incoming = upgrade.clone().inbound();
|
|
||||||
self.connections.push((upgrade, next_incoming, client_addr));
|
|
||||||
},
|
|
||||||
Err(err) => {
|
|
||||||
return Ok(Async::Ready(Some((Err(err), client_addr))));
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
Ok(Async::NotReady) => (),
|
Ok(Async::NotReady) => (),
|
||||||
Ok(Async::Ready(None)) => {
|
Ok(Async::Ready(None)) => {
|
||||||
@ -208,7 +198,37 @@ where S: Stream<Item = (Result<M, IoError>, Multiaddr), Error = IoError>,
|
|||||||
// Most of the time, this array will contain 0 or 1 elements, but sometimes it may contain
|
// Most of the time, this array will contain 0 or 1 elements, but sometimes it may contain
|
||||||
// more and we don't want to panic if that happens. With 8 elements, we can be pretty
|
// more and we don't want to panic if that happens. With 8 elements, we can be pretty
|
||||||
// confident that this is never going to spill into a `Vec`.
|
// confident that this is never going to spill into a `Vec`.
|
||||||
let mut connections_to_drop: SmallVec<[_; 8]> = SmallVec::new();
|
let mut upgrades_to_drop: SmallVec<[_; 8]> = SmallVec::new();
|
||||||
|
let mut early_ret = None;
|
||||||
|
|
||||||
|
for (index, &mut (ref mut current_upgrade, ref mut client_addr)) in
|
||||||
|
self.current_upgrades.iter_mut().enumerate()
|
||||||
|
{
|
||||||
|
match current_upgrade.poll() {
|
||||||
|
Ok(Async::Ready(muxer)) => {
|
||||||
|
let next_incoming = muxer.clone().inbound();
|
||||||
|
self.connections.push((muxer, next_incoming, client_addr.clone()));
|
||||||
|
upgrades_to_drop.push(index);
|
||||||
|
},
|
||||||
|
Ok(Async::NotReady) => {},
|
||||||
|
Err(err) => {
|
||||||
|
upgrades_to_drop.push(index);
|
||||||
|
early_ret = Some(Async::Ready(Some((Err(err).into_future(), client_addr.clone()))));
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for &index in upgrades_to_drop.iter().rev() {
|
||||||
|
self.current_upgrades.swap_remove(index);
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(early_ret) = early_ret {
|
||||||
|
return Ok(early_ret);
|
||||||
|
}
|
||||||
|
|
||||||
|
// We reuse `upgrades_to_drop`.
|
||||||
|
upgrades_to_drop.clear();
|
||||||
|
let mut connections_to_drop = upgrades_to_drop;
|
||||||
|
|
||||||
for (index, &mut (ref mut muxer, ref mut next_incoming, ref client_addr)) in
|
for (index, &mut (ref mut muxer, ref mut next_incoming, ref client_addr)) in
|
||||||
self.connections.iter_mut().enumerate()
|
self.connections.iter_mut().enumerate()
|
||||||
@ -217,7 +237,7 @@ where S: Stream<Item = (Result<M, IoError>, Multiaddr), Error = IoError>,
|
|||||||
Ok(Async::Ready(incoming)) => {
|
Ok(Async::Ready(incoming)) => {
|
||||||
let mut new_next = muxer.clone().inbound();
|
let mut new_next = muxer.clone().inbound();
|
||||||
*next_incoming = new_next;
|
*next_incoming = new_next;
|
||||||
return Ok(Async::Ready(Some((Ok(incoming), client_addr.clone()))));
|
return Ok(Async::Ready(Some((Ok(incoming).into_future(), client_addr.clone()))));
|
||||||
}
|
}
|
||||||
Ok(Async::NotReady) => {}
|
Ok(Async::NotReady) => {}
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
|
@ -26,8 +26,8 @@
|
|||||||
//! encryption middleware to the connection).
|
//! encryption middleware to the connection).
|
||||||
//!
|
//!
|
||||||
//! Thanks to the `Transport::or_transport`, `Transport::with_upgrade` and
|
//! Thanks to the `Transport::or_transport`, `Transport::with_upgrade` and
|
||||||
//! `UpgradeNode::or_upgrade` methods, you can combine multiple transports and/or upgrades together
|
//! `UpgradedNode::or_upgrade` methods, you can combine multiple transports and/or upgrades
|
||||||
//! in a complex chain of protocols negotiation.
|
//! together in a complex chain of protocols negotiation.
|
||||||
|
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use connection_reuse::ConnectionReuse;
|
use connection_reuse::ConnectionReuse;
|
||||||
@ -56,7 +56,16 @@ pub trait Transport {
|
|||||||
type RawConn: AsyncRead + AsyncWrite;
|
type RawConn: AsyncRead + AsyncWrite;
|
||||||
|
|
||||||
/// The listener produces incoming connections.
|
/// The listener produces incoming connections.
|
||||||
type Listener: Stream<Item = (Result<Self::RawConn, IoError>, Multiaddr), Error = IoError>;
|
///
|
||||||
|
/// An item should be produced whenever a connection is received at the lowest level of the
|
||||||
|
/// transport stack. The item is a `Future` that is signalled once some pre-processing has
|
||||||
|
/// taken place, and that connection has been upgraded to the wanted protocols.
|
||||||
|
type Listener: Stream<Item = (Self::ListenerUpgrade, Multiaddr), Error = IoError>;
|
||||||
|
|
||||||
|
/// After a connection has been received, we may need to do some asynchronous pre-processing
|
||||||
|
/// on it (eg. an intermediary protocol negotiation). While this pre-processing takes place, we
|
||||||
|
/// want to be able to continue polling on the listener.
|
||||||
|
type ListenerUpgrade: Future<Item = Self::RawConn, Error = IoError>;
|
||||||
|
|
||||||
/// A future which indicates that we are currently dialing to a peer.
|
/// A future which indicates that we are currently dialing to a peer.
|
||||||
type Dial: IntoFuture<Item = Self::RawConn, Error = IoError>;
|
type Dial: IntoFuture<Item = Self::RawConn, Error = IoError>;
|
||||||
@ -138,7 +147,8 @@ pub struct DeniedTransport;
|
|||||||
impl Transport for DeniedTransport {
|
impl Transport for DeniedTransport {
|
||||||
// TODO: could use `!` for associated types once stable
|
// TODO: could use `!` for associated types once stable
|
||||||
type RawConn = Cursor<Vec<u8>>;
|
type RawConn = Cursor<Vec<u8>>;
|
||||||
type Listener = Box<Stream<Item = (Result<Self::RawConn, IoError>, Multiaddr), Error = IoError>>;
|
type Listener = Box<Stream<Item = (Self::ListenerUpgrade, Multiaddr), Error = IoError>>;
|
||||||
|
type ListenerUpgrade = Box<Future<Item = Self::RawConn, Error = IoError>>;
|
||||||
type Dial = Box<Future<Item = Self::RawConn, Error = IoError>>;
|
type Dial = Box<Future<Item = Self::RawConn, Error = IoError>>;
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
@ -175,6 +185,7 @@ where
|
|||||||
{
|
{
|
||||||
type RawConn = EitherSocket<A::RawConn, B::RawConn>;
|
type RawConn = EitherSocket<A::RawConn, B::RawConn>;
|
||||||
type Listener = EitherListenStream<A::Listener, B::Listener>;
|
type Listener = EitherListenStream<A::Listener, B::Listener>;
|
||||||
|
type ListenerUpgrade = EitherTransportFuture<A::ListenerUpgrade, B::ListenerUpgrade>;
|
||||||
type Dial =
|
type Dial =
|
||||||
EitherTransportFuture<<A::Dial as IntoFuture>::Future, <B::Dial as IntoFuture>::Future>;
|
EitherTransportFuture<<A::Dial as IntoFuture>::Future, <B::Dial as IntoFuture>::Future>;
|
||||||
|
|
||||||
@ -308,19 +319,19 @@ pub enum EitherListenStream<A, B> {
|
|||||||
|
|
||||||
impl<AStream, BStream, AInner, BInner> Stream for EitherListenStream<AStream, BStream>
|
impl<AStream, BStream, AInner, BInner> Stream for EitherListenStream<AStream, BStream>
|
||||||
where
|
where
|
||||||
AStream: Stream<Item = (Result<AInner, IoError>, Multiaddr), Error = IoError>,
|
AStream: Stream<Item = (AInner, Multiaddr), Error = IoError>,
|
||||||
BStream: Stream<Item = (Result<BInner, IoError>, Multiaddr), Error = IoError>,
|
BStream: Stream<Item = (BInner, Multiaddr), Error = IoError>,
|
||||||
{
|
{
|
||||||
type Item = (Result<EitherSocket<AInner, BInner>, IoError>, Multiaddr);
|
type Item = (EitherTransportFuture<AInner, BInner>, Multiaddr);
|
||||||
type Error = IoError;
|
type Error = IoError;
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||||
match self {
|
match self {
|
||||||
&mut EitherListenStream::First(ref mut a) => a.poll()
|
&mut EitherListenStream::First(ref mut a) => a.poll()
|
||||||
.map(|i| i.map(|v| v.map(|(s, a)| (s.map(EitherSocket::First), a)))),
|
.map(|i| i.map(|v| v.map(|(s, a)| (EitherTransportFuture::First(s), a)))),
|
||||||
&mut EitherListenStream::Second(ref mut a) => a.poll()
|
&mut EitherListenStream::Second(ref mut a) => a.poll()
|
||||||
.map(|i| i.map(|v| v.map(|(s, a)| (s.map(EitherSocket::Second), a)))),
|
.map(|i| i.map(|v| v.map(|(s, a)| (EitherTransportFuture::Second(s), a)))),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -853,7 +864,7 @@ where
|
|||||||
self,
|
self,
|
||||||
addr: Multiaddr,
|
addr: Multiaddr,
|
||||||
) -> Result<
|
) -> Result<
|
||||||
(Box<Stream<Item = (Result<C::Output, IoError>, Multiaddr), Error = IoError> + 'a>, Multiaddr),
|
(Box<Stream<Item = (Box<Future<Item = C::Output, Error = IoError> + 'a>, Multiaddr), Error = IoError> + 'a>, Multiaddr),
|
||||||
(Self, Multiaddr),
|
(Self, Multiaddr),
|
||||||
>
|
>
|
||||||
where
|
where
|
||||||
@ -879,30 +890,22 @@ where
|
|||||||
// Instead the `stream` will produce `Ok(Err(...))`.
|
// Instead the `stream` will produce `Ok(Err(...))`.
|
||||||
// `stream` can only produce an `Err` if `listening_stream` produces an `Err`.
|
// `stream` can only produce an `Err` if `listening_stream` produces an `Err`.
|
||||||
let stream = listening_stream
|
let stream = listening_stream
|
||||||
// Try to negotiate the protocol
|
.map(move |(connection, client_addr)| {
|
||||||
.and_then(move |(connection, client_addr)| {
|
let upgrade = upgrade.clone();
|
||||||
// Turn the `Result<impl AsyncRead + AsyncWrite, IoError>` into
|
let connection = connection
|
||||||
// a `Result<impl Future<Item = impl AsyncRead + AsyncWrite, Error = IoError>, IoError>`
|
// Try to negotiate the protocol
|
||||||
let connection = connection.map(|connection| {
|
.and_then(move |connection| {
|
||||||
let upgrade = upgrade.clone();
|
let iter = upgrade.protocol_names()
|
||||||
let iter = upgrade.protocol_names()
|
.map::<_, fn(_) -> _>(|(n, t)| (n, <Bytes as PartialEq>::eq, t));
|
||||||
.map::<_, fn(_) -> _>(|(n, t)| (n, <Bytes as PartialEq>::eq, t));
|
multistream_select::listener_select_proto(connection, iter)
|
||||||
multistream_select::listener_select_proto(connection, iter)
|
.map_err(|err| IoError::new(IoErrorKind::Other, err))
|
||||||
.map_err(|err| IoError::new(IoErrorKind::Other, err))
|
.and_then(|(upgrade_id, connection)| {
|
||||||
.and_then(|(upgrade_id, connection)| {
|
upgrade.upgrade(connection, upgrade_id, Endpoint::Listener)
|
||||||
upgrade.upgrade(connection, upgrade_id, Endpoint::Listener)
|
})
|
||||||
})
|
.into_future()
|
||||||
});
|
});
|
||||||
|
|
||||||
connection
|
(Box::new(connection) as Box<_>, client_addr)
|
||||||
.into_future()
|
|
||||||
.flatten()
|
|
||||||
.then(move |nego_res| {
|
|
||||||
match nego_res {
|
|
||||||
Ok(upgraded) => Ok((Ok(upgraded), client_addr)),
|
|
||||||
Err(err) => Ok((Err(err), client_addr)),
|
|
||||||
}
|
|
||||||
})
|
|
||||||
});
|
});
|
||||||
|
|
||||||
Ok((Box::new(stream), new_addr))
|
Ok((Box::new(stream), new_addr))
|
||||||
@ -918,7 +921,8 @@ where
|
|||||||
C: Clone,
|
C: Clone,
|
||||||
{
|
{
|
||||||
type RawConn = C::Output;
|
type RawConn = C::Output;
|
||||||
type Listener = Box<Stream<Item = (Result<C::Output, IoError>, Multiaddr), Error = IoError>>;
|
type Listener = Box<Stream<Item = (Self::ListenerUpgrade, Multiaddr), Error = IoError>>;
|
||||||
|
type ListenerUpgrade = Box<Future<Item = C::Output, Error = IoError>>;
|
||||||
type Dial = Box<Future<Item = C::Output, Error = IoError>>;
|
type Dial = Box<Future<Item = C::Output, Error = IoError>>;
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
|
@ -59,7 +59,7 @@ use std::io::Error as IoError;
|
|||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use tokio_core::reactor::Handle;
|
use tokio_core::reactor::Handle;
|
||||||
use tokio_core::net::{TcpStream, TcpListener, TcpStreamNew};
|
use tokio_core::net::{TcpStream, TcpListener, TcpStreamNew};
|
||||||
use futures::Future;
|
use futures::future::{self, Future, FutureResult, IntoFuture};
|
||||||
use futures::stream::Stream;
|
use futures::stream::Stream;
|
||||||
use multiaddr::{Multiaddr, AddrComponent, ToMultiaddr};
|
use multiaddr::{Multiaddr, AddrComponent, ToMultiaddr};
|
||||||
use swarm::Transport;
|
use swarm::Transport;
|
||||||
@ -84,13 +84,9 @@ impl TcpConfig {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl Transport for TcpConfig {
|
impl Transport for TcpConfig {
|
||||||
/// The raw connection.
|
|
||||||
type RawConn = TcpStream;
|
type RawConn = TcpStream;
|
||||||
|
type Listener = Box<Stream<Item = (Self::ListenerUpgrade, Multiaddr), Error = IoError>>;
|
||||||
/// The listener produces incoming connections.
|
type ListenerUpgrade = FutureResult<Self::RawConn, IoError>;
|
||||||
type Listener = Box<Stream<Item = (Result<Self::RawConn, IoError>, Multiaddr), Error = IoError>>;
|
|
||||||
|
|
||||||
/// A future which indicates currently dialing to a peer.
|
|
||||||
type Dial = TcpStreamNew;
|
type Dial = TcpStreamNew;
|
||||||
|
|
||||||
/// Listen on the given multi-addr.
|
/// Listen on the given multi-addr.
|
||||||
@ -109,12 +105,13 @@ impl Transport for TcpConfig {
|
|||||||
}
|
}
|
||||||
Err(_) => addr,
|
Err(_) => addr,
|
||||||
};
|
};
|
||||||
let future = futures::future::result(listener).map(|listener| {
|
|
||||||
|
let future = future::result(listener).map(|listener| {
|
||||||
// Pull out a stream of sockets for incoming connections
|
// Pull out a stream of sockets for incoming connections
|
||||||
listener.incoming().map(|(sock, addr)| {
|
listener.incoming().map(|(sock, addr)| {
|
||||||
let addr = addr.to_multiaddr()
|
let addr = addr.to_multiaddr()
|
||||||
.expect("generating a multiaddr from a socket addr never fails");
|
.expect("generating a multiaddr from a socket addr never fails");
|
||||||
(Ok(sock), addr)
|
(Ok(sock).into_future(), addr)
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
.flatten_stream();
|
.flatten_stream();
|
||||||
@ -224,16 +221,18 @@ mod tests {
|
|||||||
let tcp = TcpConfig::new(core.handle());
|
let tcp = TcpConfig::new(core.handle());
|
||||||
let handle = core.handle();
|
let handle = core.handle();
|
||||||
let listener = tcp.listen_on(addr).unwrap().0.for_each(|(sock, _)| {
|
let listener = tcp.listen_on(addr).unwrap().0.for_each(|(sock, _)| {
|
||||||
// Define what to do with the socket that just connected to us
|
sock.and_then(|sock| {
|
||||||
// Which in this case is read 3 bytes
|
// Define what to do with the socket that just connected to us
|
||||||
let handle_conn = tokio_io::io::read_exact(sock.unwrap(), [0; 3])
|
// Which in this case is read 3 bytes
|
||||||
.map(|(_, buf)| assert_eq!(buf, [1, 2, 3]))
|
let handle_conn = tokio_io::io::read_exact(sock, [0; 3])
|
||||||
.map_err(|err| panic!("IO error {:?}", err));
|
.map(|(_, buf)| assert_eq!(buf, [1, 2, 3]))
|
||||||
|
.map_err(|err| panic!("IO error {:?}", err));
|
||||||
|
|
||||||
// Spawn the future as a concurrent task
|
// Spawn the future as a concurrent task
|
||||||
handle.spawn(handle_conn);
|
handle.spawn(handle_conn);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
})
|
||||||
});
|
});
|
||||||
|
|
||||||
core.run(listener).unwrap();
|
core.run(listener).unwrap();
|
||||||
|
Loading…
x
Reference in New Issue
Block a user