mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-07-02 03:01:33 +00:00
Use bounded channels in transport (#987)
* Implement DialFuture * Update with recommended changes to buffer size, `expect()` and `close()`
This commit is contained in:
@ -29,19 +29,50 @@ use rw_stream_sink::RwStreamSink;
|
|||||||
use std::{collections::hash_map::Entry, error, fmt, io, num::NonZeroU64};
|
use std::{collections::hash_map::Entry, error, fmt, io, num::NonZeroU64};
|
||||||
|
|
||||||
lazy_static! {
|
lazy_static! {
|
||||||
static ref HUB: Mutex<FnvHashMap<NonZeroU64, mpsc::UnboundedSender<Channel<Bytes>>>> = Mutex::new(FnvHashMap::default());
|
static ref HUB: Mutex<FnvHashMap<NonZeroU64, mpsc::Sender<Channel<Bytes>>>> = Mutex::new(FnvHashMap::default());
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Transport that supports `/memory/N` multiaddresses.
|
/// Transport that supports `/memory/N` multiaddresses.
|
||||||
#[derive(Debug, Copy, Clone, Default)]
|
#[derive(Debug, Copy, Clone, Default)]
|
||||||
pub struct MemoryTransport;
|
pub struct MemoryTransport;
|
||||||
|
|
||||||
|
/// Connection to a `MemoryTransport` currently being opened.
|
||||||
|
pub struct DialFuture {
|
||||||
|
sender: mpsc::Sender<Channel<Bytes>>,
|
||||||
|
channel_to_send: Option<Channel<Bytes>>,
|
||||||
|
channel_to_return: Option<Channel<Bytes>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Future for DialFuture {
|
||||||
|
type Item = Channel<Bytes>;
|
||||||
|
type Error = MemoryTransportError;
|
||||||
|
|
||||||
|
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||||
|
if let Some(c) = self.channel_to_send.take() {
|
||||||
|
match self.sender.start_send(c) {
|
||||||
|
Err(_) => return Err(MemoryTransportError::Unreachable),
|
||||||
|
Ok(AsyncSink::NotReady(t)) => {
|
||||||
|
self.channel_to_send = Some(t);
|
||||||
|
return Ok(Async::NotReady)
|
||||||
|
},
|
||||||
|
_ => (),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
match self.sender.close() {
|
||||||
|
Err(_) => Err(MemoryTransportError::Unreachable),
|
||||||
|
Ok(Async::NotReady) => Ok(Async::NotReady),
|
||||||
|
Ok(Async::Ready(_)) => Ok(Async::Ready(self.channel_to_return.take()
|
||||||
|
.expect("Future should not be polled again once complete"))),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl Transport for MemoryTransport {
|
impl Transport for MemoryTransport {
|
||||||
type Output = Channel<Bytes>;
|
type Output = Channel<Bytes>;
|
||||||
type Error = MemoryTransportError;
|
type Error = MemoryTransportError;
|
||||||
type Listener = Listener;
|
type Listener = Listener;
|
||||||
type ListenerUpgrade = FutureResult<Self::Output, Self::Error>;
|
type ListenerUpgrade = FutureResult<Self::Output, Self::Error>;
|
||||||
type Dial = FutureResult<Self::Output, Self::Error>;
|
type Dial = DialFuture;
|
||||||
|
|
||||||
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), TransportError<Self::Error>> {
|
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), TransportError<Self::Error>> {
|
||||||
let port = if let Ok(port) = parse_memory_addr(&addr) {
|
let port = if let Ok(port) = parse_memory_addr(&addr) {
|
||||||
@ -68,7 +99,7 @@ impl Transport for MemoryTransport {
|
|||||||
|
|
||||||
let actual_addr = Protocol::Memory(port.get()).into();
|
let actual_addr = Protocol::Memory(port.get()).into();
|
||||||
|
|
||||||
let (tx, rx) = mpsc::unbounded();
|
let (tx, rx) = mpsc::channel(2);
|
||||||
match hub.entry(port) {
|
match hub.entry(port) {
|
||||||
Entry::Occupied(_) => return Err(TransportError::Other(MemoryTransportError::Unreachable)),
|
Entry::Occupied(_) => return Err(TransportError::Other(MemoryTransportError::Unreachable)),
|
||||||
Entry::Vacant(e) => e.insert(tx),
|
Entry::Vacant(e) => e.insert(tx),
|
||||||
@ -82,7 +113,7 @@ impl Transport for MemoryTransport {
|
|||||||
Ok((listener, actual_addr))
|
Ok((listener, actual_addr))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
|
fn dial(self, addr: Multiaddr) -> Result<DialFuture, TransportError<Self::Error>> {
|
||||||
let port = if let Ok(port) = parse_memory_addr(&addr) {
|
let port = if let Ok(port) = parse_memory_addr(&addr) {
|
||||||
if let Some(port) = NonZeroU64::new(port) {
|
if let Some(port) = NonZeroU64::new(port) {
|
||||||
port
|
port
|
||||||
@ -94,20 +125,18 @@ impl Transport for MemoryTransport {
|
|||||||
};
|
};
|
||||||
|
|
||||||
let hub = HUB.lock();
|
let hub = HUB.lock();
|
||||||
let chan = if let Some(tx) = hub.get(&port) {
|
if let Some(sender) = hub.get(&port) {
|
||||||
let (a_tx, a_rx) = mpsc::unbounded();
|
let (a_tx, a_rx) = mpsc::channel(4096);
|
||||||
let (b_tx, b_rx) = mpsc::unbounded();
|
let (b_tx, b_rx) = mpsc::channel(4096);
|
||||||
let a = RwStreamSink::new(Chan { incoming: a_rx, outgoing: b_tx });
|
Ok(DialFuture {
|
||||||
let b = RwStreamSink::new(Chan { incoming: b_rx, outgoing: a_tx });
|
sender: sender.clone(),
|
||||||
if tx.unbounded_send(b).is_err() {
|
channel_to_send: Some(RwStreamSink::new(Chan { incoming: a_rx, outgoing: b_tx })),
|
||||||
return Err(TransportError::Other(MemoryTransportError::Unreachable));
|
channel_to_return: Some(RwStreamSink::new(Chan { incoming: b_rx, outgoing: a_tx })),
|
||||||
}
|
|
||||||
a
|
|
||||||
} else {
|
|
||||||
return Err(TransportError::Other(MemoryTransportError::Unreachable));
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok(future::ok(chan))
|
})
|
||||||
|
} else {
|
||||||
|
Err(TransportError::Other(MemoryTransportError::Unreachable))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
|
fn nat_traversal(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
|
||||||
@ -145,7 +174,7 @@ pub struct Listener {
|
|||||||
/// Port we're listening on.
|
/// Port we're listening on.
|
||||||
port: NonZeroU64,
|
port: NonZeroU64,
|
||||||
/// Receives incoming connections.
|
/// Receives incoming connections.
|
||||||
receiver: mpsc::UnboundedReceiver<Channel<Bytes>>,
|
receiver: mpsc::Receiver<Channel<Bytes>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Stream for Listener {
|
impl Stream for Listener {
|
||||||
@ -197,8 +226,8 @@ pub type Channel<T> = RwStreamSink<Chan<T>>;
|
|||||||
///
|
///
|
||||||
/// Implements `Sink` and `Stream`.
|
/// Implements `Sink` and `Stream`.
|
||||||
pub struct Chan<T = Bytes> {
|
pub struct Chan<T = Bytes> {
|
||||||
incoming: mpsc::UnboundedReceiver<T>,
|
incoming: mpsc::Receiver<T>,
|
||||||
outgoing: mpsc::UnboundedSender<T>,
|
outgoing: mpsc::Sender<T>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> Stream for Chan<T> {
|
impl<T> Stream for Chan<T> {
|
||||||
|
Reference in New Issue
Block a user