mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-12 01:21:21 +00:00
[WIP] Integrate multiplex in the examples
This commit is contained in:
@ -6,6 +6,7 @@ authors = ["pierre <pierre.krieger1708@gmail.com>"]
|
||||
[dependencies]
|
||||
bytes = "0.4"
|
||||
futures = "0.1"
|
||||
multiplex = { path = "../multiplex-rs" }
|
||||
libp2p-secio = { path = "../libp2p-secio" }
|
||||
libp2p-swarm = { path = "../libp2p-swarm" }
|
||||
libp2p-tcp-transport = { path = "../libp2p-tcp-transport" }
|
||||
|
@ -20,6 +20,7 @@
|
||||
|
||||
extern crate bytes;
|
||||
extern crate futures;
|
||||
extern crate multiplex;
|
||||
extern crate libp2p_secio as secio;
|
||||
extern crate libp2p_swarm as swarm;
|
||||
extern crate libp2p_tcp_transport as tcp;
|
||||
@ -52,6 +53,10 @@ fn main() {
|
||||
}
|
||||
})
|
||||
|
||||
.with_upgrade(multiplex::MultiplexConfig);
|
||||
let transport: swarm::ConnectionReuse<_, _> = transport.into();
|
||||
|
||||
let transport = transport
|
||||
// On top of plaintext or secio, we use the "echo" protocol, which is a custom protocol
|
||||
// just for this example.
|
||||
// For this purpose, we create a `SimpleProtocol` struct.
|
||||
@ -68,17 +73,20 @@ fn main() {
|
||||
// of any opened stream.
|
||||
|
||||
// We use it to dial `/ip4/127.0.0.1/tcp/10333`.
|
||||
let dialer = transport.dial(swarm::Multiaddr::new("/ip4/127.0.0.1/tcp/10333").unwrap())
|
||||
let dialer = transport.dial_and_listen(swarm::Multiaddr::new("/ip4/127.0.0.1/tcp/10333").unwrap())
|
||||
.unwrap_or_else(|_| panic!("unsupported multiaddr protocol ; should never happen"))
|
||||
.and_then(|echo| {
|
||||
.and_then(|(echo, incoming)| {
|
||||
// `echo` is what the closure used when initializing "echo" returns.
|
||||
// Consequently, please note that the `send` method is available only because the type
|
||||
// `length_delimited::Framed` has a `send` method.
|
||||
echo.send("hello world".into())
|
||||
echo.send("hello world".into()).map(Option::Some)
|
||||
.select(incoming.for_each(|_| { println!("opened"); Ok(()) }).map(|()| None))
|
||||
.map(|(n, _)| n)
|
||||
.map_err(|(e, _)| e)
|
||||
})
|
||||
.and_then(|echo| {
|
||||
// The message has been successfully sent. Now wait for an answer.
|
||||
echo.into_future()
|
||||
echo.unwrap().into_future()
|
||||
.map(|(msg, rest)| {
|
||||
println!("received: {:?}", msg);
|
||||
rest
|
||||
|
@ -181,4 +181,4 @@ pub use self::connection_reuse::ConnectionReuse;
|
||||
pub use self::multiaddr::Multiaddr;
|
||||
pub use self::muxing::StreamMuxer;
|
||||
pub use self::transport::{ConnectionUpgrade, PlainTextConfig, Transport, UpgradedNode, OrUpgrade};
|
||||
pub use self::transport::{Endpoint, SimpleProtocol};
|
||||
pub use self::transport::{Endpoint, SimpleProtocol, MuxedTransport};
|
||||
|
@ -786,7 +786,7 @@ impl<'a, T, C> UpgradedNode<T, C>
|
||||
negotiated.map(|(upgrade_id, conn)| (upgrade_id, conn, upgrade2))
|
||||
}
|
||||
.and_then(|(upgrade_id, connection, upgrade)| {
|
||||
upgrade.upgrade(connection, upgrade_id)
|
||||
upgrade.upgrade(connection, upgrade_id, Endpoint::Dialer)
|
||||
});
|
||||
|
||||
let in_stream = in_stream
|
||||
@ -803,7 +803,7 @@ impl<'a, T, C> UpgradedNode<T, C>
|
||||
.map_err(|err| IoError::new(IoErrorKind::Other, err))
|
||||
})
|
||||
.and_then(|(upgrade_id, connection, upgrade)| {
|
||||
upgrade.upgrade(connection, upgrade_id)
|
||||
upgrade.upgrade(connection, upgrade_id, Endpoint::Listener)
|
||||
});
|
||||
|
||||
dialer.map(|d| (d, Box::new(in_stream) as Box<Stream<Item = _, Error = _>>))
|
||||
|
@ -217,7 +217,7 @@ impl<T> OutboundFuture<T> {
|
||||
}
|
||||
|
||||
fn nonce_to_id(id: usize, end: Endpoint) -> u32 {
|
||||
id as u32 * 2 + if end == Endpoint::Dialer { 1 } else { 0 }
|
||||
id as u32 * 2 + if end == Endpoint::Dialer { 50 } else { 0 }
|
||||
}
|
||||
|
||||
impl<T: AsyncWrite> Future for OutboundFuture<T> {
|
||||
@ -330,6 +330,7 @@ impl<T: AsyncRead + AsyncWrite> StreamMuxer for Multiplex<T> {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
pub struct MultiplexConfig;
|
||||
|
||||
impl<C> ConnectionUpgrade<C> for MultiplexConfig
|
||||
|
@ -95,6 +95,7 @@ pub fn dialer_select_proto_serial<'a, R, I, P>(
|
||||
})
|
||||
// Once read, analyze the response.
|
||||
.and_then(|(message, rest, proto_name, proto_value)| {
|
||||
if message.is_none() { println!("empty"); }
|
||||
let message = message.ok_or(ProtocolChoiceError::UnexpectedMessage)?;
|
||||
|
||||
match message {
|
||||
@ -107,7 +108,7 @@ pub fn dialer_select_proto_serial<'a, R, I, P>(
|
||||
ListenerToDialerMessage::NotAvailable => {
|
||||
Ok(Loop::Continue(rest))
|
||||
},
|
||||
_ => Err(ProtocolChoiceError::UnexpectedMessage)
|
||||
_ => { println!("c {:?}", message); Err(ProtocolChoiceError::UnexpectedMessage) }
|
||||
}
|
||||
})
|
||||
})
|
||||
@ -140,7 +141,7 @@ pub fn dialer_select_proto_parallel<'a, R, I, M, P>(
|
||||
.and_then(move |(msg, dialer)| {
|
||||
let list = match msg {
|
||||
Some(ListenerToDialerMessage::ProtocolsListResponse { list }) => list,
|
||||
_ => return Err(ProtocolChoiceError::UnexpectedMessage),
|
||||
_ => { println!("b {:?}", msg); return Err(ProtocolChoiceError::UnexpectedMessage) },
|
||||
};
|
||||
|
||||
let mut found = None;
|
||||
@ -174,7 +175,7 @@ pub fn dialer_select_proto_parallel<'a, R, I, M, P>(
|
||||
Some(ListenerToDialerMessage::ProtocolAck { ref name }) if name == &proto_name => {
|
||||
Ok((proto_val, dialer.into_inner()))
|
||||
}
|
||||
_ => Err(ProtocolChoiceError::UnexpectedMessage),
|
||||
_ => { println!("a {:?}", msg); Err(ProtocolChoiceError::UnexpectedMessage) },
|
||||
});
|
||||
|
||||
// The "Rust doesn't have impl Trait yet" tax.
|
||||
|
Reference in New Issue
Block a user