mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-07-31 00:41:59 +00:00
Update libp2p-uds to futures 0.3 (#1308)
* Update libp2p-uds to futures 0.3 * Some clean-up
This commit is contained in:
@@ -10,13 +10,10 @@ keywords = ["peer-to-peer", "libp2p", "networking"]
|
||||
categories = ["network-programming", "asynchronous"]
|
||||
|
||||
[target.'cfg(all(unix, not(any(target_os = "emscripten", target_os = "unknown"))))'.dependencies]
|
||||
async-std = "1.0"
|
||||
libp2p-core = { version = "0.13.0", path = "../../core" }
|
||||
log = "0.4.1"
|
||||
futures-preview = "0.3.0-alpha.18"
|
||||
romio = "0.3.0-alpha.9"
|
||||
futures = "0.3.1"
|
||||
|
||||
[target.'cfg(all(unix, not(any(target_os = "emscripten", target_os = "unknown"))))'.dev-dependencies]
|
||||
tempfile = "3.0"
|
||||
|
||||
[dev-dependencies]
|
||||
async-std = "0.99"
|
||||
|
@@ -44,16 +44,16 @@
|
||||
|
||||
#![cfg(all(unix, not(any(target_os = "emscripten", target_os = "unknown"))))]
|
||||
|
||||
use futures::{prelude::*, ready, future::Ready};
|
||||
use async_std::os::unix::net::{UnixListener, UnixStream};
|
||||
use futures::{prelude::*, future::Ready};
|
||||
use futures::stream::Stream;
|
||||
use log::debug;
|
||||
use romio::uds::{UnixListener, UnixStream};
|
||||
use std::{io, path::PathBuf, pin::Pin, task::Context, task::Poll};
|
||||
use libp2p_core::{
|
||||
Transport,
|
||||
multiaddr::{Protocol, Multiaddr},
|
||||
transport::{ListenerEvent, TransportError}
|
||||
};
|
||||
use log::debug;
|
||||
use std::{io, path::PathBuf, pin::Pin};
|
||||
|
||||
/// Represents the configuration for a Unix domain sockets transport capability for libp2p.
|
||||
///
|
||||
@@ -74,27 +74,38 @@ impl UdsConfig {
|
||||
impl Transport for UdsConfig {
|
||||
type Output = UnixStream;
|
||||
type Error = io::Error;
|
||||
type Listener = ListenerStream<romio::uds::Incoming>;
|
||||
type Listener = Pin<Box<dyn Stream<Item = Result<ListenerEvent<Self::ListenerUpgrade>, Self::Error>> + Send>>;
|
||||
type ListenerUpgrade = Ready<Result<Self::Output, io::Error>>;
|
||||
type Dial = romio::uds::ConnectFuture;
|
||||
type Dial = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
|
||||
|
||||
fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
|
||||
if let Ok(path) = multiaddr_to_path(&addr) {
|
||||
let listener = UnixListener::bind(&path);
|
||||
// We need to build the `Multiaddr` to return from this function. If an error happened,
|
||||
// just return the original multiaddr.
|
||||
match listener {
|
||||
Ok(listener) => {
|
||||
debug!("Now listening on {}", addr);
|
||||
let future = ListenerStream {
|
||||
stream: listener.incoming(),
|
||||
addr: addr.clone(),
|
||||
tell_new_addr: true
|
||||
};
|
||||
Ok(future)
|
||||
}
|
||||
Err(_) => return Err(TransportError::MultiaddrNotSupported(addr)),
|
||||
}
|
||||
Ok(Box::pin(async move { UnixListener::bind(&path).await }
|
||||
.map_ok(move |listener| {
|
||||
stream::once({
|
||||
let addr = addr.clone();
|
||||
async move {
|
||||
debug!("Now listening on {}", addr);
|
||||
Ok(ListenerEvent::NewAddress(addr))
|
||||
}
|
||||
}).chain(stream::unfold(listener, move |listener| {
|
||||
let addr = addr.clone();
|
||||
async move {
|
||||
let (stream, _) = match listener.accept().await {
|
||||
Ok(v) => v,
|
||||
Err(err) => return Some((Err(err), listener))
|
||||
};
|
||||
debug!("incoming connection on {}", addr);
|
||||
let event = ListenerEvent::Upgrade {
|
||||
upgrade: future::ok(stream),
|
||||
local_addr: addr.clone(),
|
||||
remote_addr: addr.clone()
|
||||
};
|
||||
Some((Ok(event), listener))
|
||||
}
|
||||
}))
|
||||
})
|
||||
.try_flatten_stream()))
|
||||
} else {
|
||||
Err(TransportError::MultiaddrNotSupported(addr))
|
||||
}
|
||||
@@ -103,7 +114,7 @@ impl Transport for UdsConfig {
|
||||
fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
|
||||
if let Ok(path) = multiaddr_to_path(&addr) {
|
||||
debug!("Dialing {}", addr);
|
||||
Ok(UnixStream::connect(&path))
|
||||
Ok(Box::pin(async move { UnixStream::connect(&path).await }))
|
||||
} else {
|
||||
Err(TransportError::MultiaddrNotSupported(addr))
|
||||
}
|
||||
@@ -135,38 +146,6 @@ fn multiaddr_to_path(addr: &Multiaddr) -> Result<PathBuf, ()> {
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
pub struct ListenerStream<T> {
|
||||
stream: T,
|
||||
addr: Multiaddr,
|
||||
tell_new_addr: bool
|
||||
}
|
||||
|
||||
impl<T> Stream for ListenerStream<T>
|
||||
where
|
||||
T: TryStream + Unpin
|
||||
{
|
||||
type Item = Result<ListenerEvent<future::Ready<Result<T::Ok, T::Error>>>, T::Error>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
if self.tell_new_addr {
|
||||
self.tell_new_addr = false;
|
||||
return Poll::Ready(Some(Ok(ListenerEvent::NewAddress(self.addr.clone()))))
|
||||
}
|
||||
|
||||
match ready!(TryStream::try_poll_next(Pin::new(&mut self.stream), cx)) {
|
||||
Some(item) => {
|
||||
debug!("incoming connection on {}", self.addr);
|
||||
Poll::Ready(Some(Ok(ListenerEvent::Upgrade {
|
||||
upgrade: future::ready(item),
|
||||
local_addr: self.addr.clone(),
|
||||
remote_addr: self.addr.clone()
|
||||
})))
|
||||
}
|
||||
None => Poll::Ready(None)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::{multiaddr_to_path, UdsConfig};
|
||||
|
Reference in New Issue
Block a user