mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-27 00:31:35 +00:00
Update to soketto v0.4.0 (#1603)
* Update to soketto v0.4.0 * Remove patch section from Cargo.toml Co-authored-by: Roman Borschel <romanb@users.noreply.github.com>
This commit is contained in:
@ -11,7 +11,6 @@ categories = ["network-programming", "asynchronous"]
|
|||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
async-tls = "0.7.0"
|
async-tls = "0.7.0"
|
||||||
bytes = "0.5"
|
|
||||||
either = "1.5.3"
|
either = "1.5.3"
|
||||||
futures = "0.3.1"
|
futures = "0.3.1"
|
||||||
libp2p-core = { version = "0.19.0", path = "../../core" }
|
libp2p-core = { version = "0.19.0", path = "../../core" }
|
||||||
@ -19,10 +18,10 @@ log = "0.4.8"
|
|||||||
quicksink = "0.1"
|
quicksink = "0.1"
|
||||||
rustls = "0.17.0"
|
rustls = "0.17.0"
|
||||||
rw-stream-sink = "0.2.0"
|
rw-stream-sink = "0.2.0"
|
||||||
soketto = { version = "0.3", features = ["deflate"] }
|
soketto = { version = "0.4", features = ["deflate"] }
|
||||||
url = "2.1"
|
url = "2.1"
|
||||||
webpki = "0.21"
|
webpki = "0.21"
|
||||||
webpki-roots = "0.18"
|
webpki-roots = "0.18"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
libp2p-tcp = { version = "0.19.0", path = "../tcp" }
|
libp2p-tcp = { version = "0.19.0", path = "../tcp", features = ["async-std"] }
|
||||||
|
@ -19,7 +19,6 @@
|
|||||||
// DEALINGS IN THE SOFTWARE.
|
// DEALINGS IN THE SOFTWARE.
|
||||||
|
|
||||||
use async_tls::{client, server};
|
use async_tls::{client, server};
|
||||||
use bytes::BytesMut;
|
|
||||||
use crate::{error::Error, tls};
|
use crate::{error::Error, tls};
|
||||||
use either::Either;
|
use either::Either;
|
||||||
use futures::{future::BoxFuture, prelude::*, ready, stream::BoxStream};
|
use futures::{future::BoxFuture, prelude::*, ready, stream::BoxStream};
|
||||||
@ -30,8 +29,8 @@ use libp2p_core::{
|
|||||||
transport::{ListenerEvent, TransportError}
|
transport::{ListenerEvent, TransportError}
|
||||||
};
|
};
|
||||||
use log::{debug, trace};
|
use log::{debug, trace};
|
||||||
use soketto::{connection, data, extension::deflate::Deflate, handshake};
|
use soketto::{connection, extension::deflate::Deflate, handshake};
|
||||||
use std::{convert::TryInto, fmt, io, pin::Pin, task::Context, task::Poll};
|
use std::{convert::TryInto, fmt, io, mem, pin::Pin, task::Context, task::Poll};
|
||||||
use url::Url;
|
use url::Url;
|
||||||
|
|
||||||
/// Max. number of payload bytes of a single frame.
|
/// Max. number of payload bytes of a single frame.
|
||||||
@ -406,36 +405,55 @@ fn location_to_multiaddr<T>(location: &str) -> Result<Multiaddr, Error<T>> {
|
|||||||
|
|
||||||
/// The websocket connection.
|
/// The websocket connection.
|
||||||
pub struct Connection<T> {
|
pub struct Connection<T> {
|
||||||
receiver: BoxStream<'static, Result<data::Incoming, connection::Error>>,
|
receiver: BoxStream<'static, Result<IncomingData, connection::Error>>,
|
||||||
sender: Pin<Box<dyn Sink<OutgoingData, Error = connection::Error> + Send>>,
|
sender: Pin<Box<dyn Sink<OutgoingData, Error = connection::Error> + Send>>,
|
||||||
_marker: std::marker::PhantomData<T>
|
_marker: std::marker::PhantomData<T>
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Data received over the websocket connection.
|
/// Data received over the websocket connection.
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct IncomingData(data::Incoming);
|
pub enum IncomingData {
|
||||||
|
/// Binary application data.
|
||||||
|
Binary(Vec<u8>),
|
||||||
|
/// UTF-8 encoded application data.
|
||||||
|
Text(Vec<u8>),
|
||||||
|
/// PONG control frame data.
|
||||||
|
Pong(Vec<u8>)
|
||||||
|
}
|
||||||
|
|
||||||
impl IncomingData {
|
impl IncomingData {
|
||||||
|
pub fn is_data(&self) -> bool {
|
||||||
|
self.is_binary() || self.is_text()
|
||||||
|
}
|
||||||
|
|
||||||
pub fn is_binary(&self) -> bool {
|
pub fn is_binary(&self) -> bool {
|
||||||
self.0.is_binary()
|
if let IncomingData::Binary(_) = self { true } else { false }
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn is_text(&self) -> bool {
|
pub fn is_text(&self) -> bool {
|
||||||
self.0.is_text()
|
if let IncomingData::Text(_) = self { true } else { false }
|
||||||
}
|
|
||||||
|
|
||||||
pub fn is_data(&self) -> bool {
|
|
||||||
self.0.is_data()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn is_pong(&self) -> bool {
|
pub fn is_pong(&self) -> bool {
|
||||||
self.0.is_pong()
|
if let IncomingData::Pong(_) = self { true } else { false }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn into_bytes(self) -> Vec<u8> {
|
||||||
|
match self {
|
||||||
|
IncomingData::Binary(d) => d,
|
||||||
|
IncomingData::Text(d) => d,
|
||||||
|
IncomingData::Pong(d) => d
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl AsRef<[u8]> for IncomingData {
|
impl AsRef<[u8]> for IncomingData {
|
||||||
fn as_ref(&self) -> &[u8] {
|
fn as_ref(&self) -> &[u8] {
|
||||||
self.0.as_ref()
|
match self {
|
||||||
|
IncomingData::Binary(d) => d,
|
||||||
|
IncomingData::Text(d) => d,
|
||||||
|
IncomingData::Pong(d) => d
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -443,12 +461,12 @@ impl AsRef<[u8]> for IncomingData {
|
|||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub enum OutgoingData {
|
pub enum OutgoingData {
|
||||||
/// Send some bytes.
|
/// Send some bytes.
|
||||||
Binary(BytesMut),
|
Binary(Vec<u8>),
|
||||||
/// Send a PING message.
|
/// Send a PING message.
|
||||||
Ping(BytesMut),
|
Ping(Vec<u8>),
|
||||||
/// Send an unsolicited PONG message.
|
/// Send an unsolicited PONG message.
|
||||||
/// (Incoming PINGs are answered automatically.)
|
/// (Incoming PINGs are answered automatically.)
|
||||||
Pong(BytesMut)
|
Pong(Vec<u8>)
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> fmt::Debug for Connection<T> {
|
impl<T> fmt::Debug for Connection<T> {
|
||||||
@ -469,13 +487,13 @@ where
|
|||||||
sender.send_binary_mut(x).await?
|
sender.send_binary_mut(x).await?
|
||||||
}
|
}
|
||||||
quicksink::Action::Send(OutgoingData::Ping(x)) => {
|
quicksink::Action::Send(OutgoingData::Ping(x)) => {
|
||||||
let data = x.as_ref().try_into().map_err(|_| {
|
let data = x[..].try_into().map_err(|_| {
|
||||||
io::Error::new(io::ErrorKind::InvalidInput, "PING data must be < 126 bytes")
|
io::Error::new(io::ErrorKind::InvalidInput, "PING data must be < 126 bytes")
|
||||||
})?;
|
})?;
|
||||||
sender.send_ping(data).await?
|
sender.send_ping(data).await?
|
||||||
}
|
}
|
||||||
quicksink::Action::Send(OutgoingData::Pong(x)) => {
|
quicksink::Action::Send(OutgoingData::Pong(x)) => {
|
||||||
let data = x.as_ref().try_into().map_err(|_| {
|
let data = x[..].try_into().map_err(|_| {
|
||||||
io::Error::new(io::ErrorKind::InvalidInput, "PONG data must be < 126 bytes")
|
io::Error::new(io::ErrorKind::InvalidInput, "PONG data must be < 126 bytes")
|
||||||
})?;
|
})?;
|
||||||
sender.send_pong(data).await?
|
sender.send_pong(data).await?
|
||||||
@ -485,26 +503,41 @@ where
|
|||||||
}
|
}
|
||||||
Ok(sender)
|
Ok(sender)
|
||||||
});
|
});
|
||||||
|
let stream = stream::unfold((Vec::new(), receiver), |(mut data, mut receiver)| async {
|
||||||
|
match receiver.receive(&mut data).await {
|
||||||
|
Ok(soketto::Incoming::Data(soketto::Data::Text(_))) => {
|
||||||
|
Some((Ok(IncomingData::Text(mem::take(&mut data))), (data, receiver)))
|
||||||
|
}
|
||||||
|
Ok(soketto::Incoming::Data(soketto::Data::Binary(_))) => {
|
||||||
|
Some((Ok(IncomingData::Binary(mem::take(&mut data))), (data, receiver)))
|
||||||
|
}
|
||||||
|
Ok(soketto::Incoming::Pong(pong)) => {
|
||||||
|
Some((Ok(IncomingData::Pong(Vec::from(pong))), (data, receiver)))
|
||||||
|
}
|
||||||
|
Err(connection::Error::Closed) => None,
|
||||||
|
Err(e) => Some((Err(e), (data, receiver)))
|
||||||
|
}
|
||||||
|
});
|
||||||
Connection {
|
Connection {
|
||||||
receiver: connection::into_stream(receiver).boxed(),
|
receiver: stream.boxed(),
|
||||||
sender: Box::pin(sink),
|
sender: Box::pin(sink),
|
||||||
_marker: std::marker::PhantomData
|
_marker: std::marker::PhantomData
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Send binary application data to the remote.
|
/// Send binary application data to the remote.
|
||||||
pub fn send_data(&mut self, data: impl Into<BytesMut>) -> sink::Send<'_, Self, OutgoingData> {
|
pub fn send_data(&mut self, data: Vec<u8>) -> sink::Send<'_, Self, OutgoingData> {
|
||||||
self.send(OutgoingData::Binary(data.into()))
|
self.send(OutgoingData::Binary(data))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Send a PING to the remote.
|
/// Send a PING to the remote.
|
||||||
pub fn send_ping(&mut self, data: impl Into<BytesMut>) -> sink::Send<'_, Self, OutgoingData> {
|
pub fn send_ping(&mut self, data: Vec<u8>) -> sink::Send<'_, Self, OutgoingData> {
|
||||||
self.send(OutgoingData::Ping(data.into()))
|
self.send(OutgoingData::Ping(data))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Send an unsolicited PONG to the remote.
|
/// Send an unsolicited PONG to the remote.
|
||||||
pub fn send_pong(&mut self, data: impl Into<BytesMut>) -> sink::Send<'_, Self, OutgoingData> {
|
pub fn send_pong(&mut self, data: Vec<u8>) -> sink::Send<'_, Self, OutgoingData> {
|
||||||
self.send(OutgoingData::Pong(data.into()))
|
self.send(OutgoingData::Pong(data))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -517,7 +550,7 @@ where
|
|||||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||||
let item = ready!(self.receiver.poll_next_unpin(cx));
|
let item = ready!(self.receiver.poll_next_unpin(cx));
|
||||||
let item = item.map(|result| {
|
let item = item.map(|result| {
|
||||||
result.map(IncomingData).map_err(|e| io::Error::new(io::ErrorKind::Other, e))
|
result.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
|
||||||
});
|
});
|
||||||
Poll::Ready(item)
|
Poll::Ready(item)
|
||||||
}
|
}
|
||||||
|
@ -24,7 +24,6 @@ pub mod error;
|
|||||||
pub mod framed;
|
pub mod framed;
|
||||||
pub mod tls;
|
pub mod tls;
|
||||||
|
|
||||||
use bytes::BytesMut;
|
|
||||||
use error::Error;
|
use error::Error;
|
||||||
use framed::Connection;
|
use framed::Connection;
|
||||||
use futures::{future::BoxFuture, prelude::*, stream::BoxStream, ready};
|
use futures::{future::BoxFuture, prelude::*, stream::BoxStream, ready};
|
||||||
@ -142,13 +141,13 @@ impl<T> Stream for BytesConnection<T>
|
|||||||
where
|
where
|
||||||
T: AsyncRead + AsyncWrite + Send + Unpin + 'static
|
T: AsyncRead + AsyncWrite + Send + Unpin + 'static
|
||||||
{
|
{
|
||||||
type Item = io::Result<BytesMut>;
|
type Item = io::Result<Vec<u8>>;
|
||||||
|
|
||||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||||
loop {
|
loop {
|
||||||
if let Some(item) = ready!(self.0.try_poll_next_unpin(cx)?) {
|
if let Some(item) = ready!(self.0.try_poll_next_unpin(cx)?) {
|
||||||
if item.is_data() {
|
if item.is_data() {
|
||||||
return Poll::Ready(Some(Ok(BytesMut::from(item.as_ref()))))
|
return Poll::Ready(Some(Ok(item.into_bytes())))
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
return Poll::Ready(None)
|
return Poll::Ready(None)
|
||||||
@ -157,7 +156,7 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> Sink<BytesMut> for BytesConnection<T>
|
impl<T> Sink<Vec<u8>> for BytesConnection<T>
|
||||||
where
|
where
|
||||||
T: AsyncRead + AsyncWrite + Send + Unpin + 'static
|
T: AsyncRead + AsyncWrite + Send + Unpin + 'static
|
||||||
{
|
{
|
||||||
@ -167,7 +166,7 @@ where
|
|||||||
Pin::new(&mut self.0).poll_ready(cx)
|
Pin::new(&mut self.0).poll_ready(cx)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn start_send(mut self: Pin<&mut Self>, item: BytesMut) -> io::Result<()> {
|
fn start_send(mut self: Pin<&mut Self>, item: Vec<u8>) -> io::Result<()> {
|
||||||
Pin::new(&mut self.0).start_send(framed::OutgoingData::Binary(item))
|
Pin::new(&mut self.0).start_send(framed::OutgoingData::Binary(item))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user