mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-16 11:31:22 +00:00
[identify] Implement /ipfs/id/push/1.0.0 alongside some refactoring. (#1999)
* Implement /ipfs/id/push/1.0.0 alongside some refactoring. * Implement /ipfs/id/push/1.0.0, i.e. the ability to actively push information of the local peer to specific remotes. * Make the initial delay as well as the recurring delay for the periodic identification requests configurable, introducing `IdentifyConfig`. * Fix test. * Fix example. * Update protocols/identify/src/identify.rs Co-authored-by: Max Inden <mail@max-inden.de> * Update protocols/identify/src/identify.rs Co-authored-by: Max Inden <mail@max-inden.de> * Update versions and changelogs. Co-authored-by: Max Inden <mail@max-inden.de>
This commit is contained in:
@ -30,16 +30,44 @@ use prost::Message;
|
||||
use std::convert::TryFrom;
|
||||
use std::{fmt, io, iter, pin::Pin};
|
||||
|
||||
/// Configuration for an upgrade to the `Identify` protocol.
|
||||
/// Substream upgrade protocol for `/ipfs/id/1.0.0`.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct IdentifyProtocolConfig;
|
||||
pub struct IdentifyProtocol;
|
||||
|
||||
/// Substream upgrade protocol for `/ipfs/id/push/1.0.0`.
|
||||
#[derive(Debug, Clone)]
|
||||
#[non_exhaustive]
|
||||
pub struct RemoteInfo {
|
||||
/// Information about the remote.
|
||||
pub info: IdentifyInfo,
|
||||
/// Address the remote sees for us.
|
||||
pub struct IdentifyPushProtocol<T>(T);
|
||||
pub struct InboundPush();
|
||||
pub struct OutboundPush(IdentifyInfo);
|
||||
|
||||
impl IdentifyPushProtocol<InboundPush> {
|
||||
pub fn inbound() -> Self {
|
||||
IdentifyPushProtocol(InboundPush())
|
||||
}
|
||||
}
|
||||
|
||||
impl IdentifyPushProtocol<OutboundPush> {
|
||||
pub fn outbound(info: IdentifyInfo) -> Self {
|
||||
IdentifyPushProtocol(OutboundPush(info))
|
||||
}
|
||||
}
|
||||
|
||||
/// Information of a peer sent in protocol messages.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct IdentifyInfo {
|
||||
/// The public key of the local peer.
|
||||
pub public_key: PublicKey,
|
||||
/// Application-specific version of the protocol family used by the peer,
|
||||
/// e.g. `ipfs/1.0.0` or `polkadot/1.0.0`.
|
||||
pub protocol_version: String,
|
||||
/// Name and version of the peer, similar to the `User-Agent` header in
|
||||
/// the HTTP protocol.
|
||||
pub agent_version: String,
|
||||
/// The addresses that the peer is listening on.
|
||||
pub listen_addrs: Vec<Multiaddr>,
|
||||
/// The list of protocols supported by the peer, e.g. `/ipfs/ping/1.0.0`.
|
||||
pub protocols: Vec<String>,
|
||||
/// Address observed by or for the remote.
|
||||
pub observed_addr: Multiaddr,
|
||||
}
|
||||
|
||||
@ -60,56 +88,14 @@ where
|
||||
{
|
||||
/// Sends back the requested information on the substream.
|
||||
///
|
||||
/// Consumes the substream, returning a `ReplyFuture` that resolves
|
||||
/// Consumes the substream, returning a future that resolves
|
||||
/// when the reply has been sent on the underlying connection.
|
||||
pub fn send(mut self, info: IdentifyInfo, observed_addr: &Multiaddr)
|
||||
-> impl Future<Output = Result<(), io::Error>>
|
||||
{
|
||||
debug!("Sending identify info to client");
|
||||
trace!("Sending: {:?}", info);
|
||||
|
||||
let listen_addrs = info.listen_addrs
|
||||
.into_iter()
|
||||
.map(|addr| addr.to_vec())
|
||||
.collect();
|
||||
|
||||
let pubkey_bytes = info.public_key.into_protobuf_encoding();
|
||||
|
||||
let message = structs_proto::Identify {
|
||||
agent_version: Some(info.agent_version),
|
||||
protocol_version: Some(info.protocol_version),
|
||||
public_key: Some(pubkey_bytes),
|
||||
listen_addrs,
|
||||
observed_addr: Some(observed_addr.to_vec()),
|
||||
protocols: info.protocols
|
||||
};
|
||||
|
||||
async move {
|
||||
let mut bytes = Vec::with_capacity(message.encoded_len());
|
||||
message.encode(&mut bytes).expect("Vec<u8> provides capacity as needed");
|
||||
upgrade::write_one(&mut self.inner, &bytes).await
|
||||
}
|
||||
pub async fn send(self, info: IdentifyInfo) -> io::Result<()> {
|
||||
send(self.inner, info).await
|
||||
}
|
||||
}
|
||||
|
||||
/// Information of a peer sent in `Identify` protocol responses.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct IdentifyInfo {
|
||||
/// The public key underlying the peer's `PeerId`.
|
||||
pub public_key: PublicKey,
|
||||
/// Version of the protocol family used by the peer, e.g. `ipfs/1.0.0`
|
||||
/// or `polkadot/1.0.0`.
|
||||
pub protocol_version: String,
|
||||
/// Name and version of the peer, similar to the `User-Agent` header in
|
||||
/// the HTTP protocol.
|
||||
pub agent_version: String,
|
||||
/// The addresses that the peer is listening on.
|
||||
pub listen_addrs: Vec<Multiaddr>,
|
||||
/// The list of protocols supported by the peer, e.g. `/ipfs/ping/1.0.0`.
|
||||
pub protocols: Vec<String>,
|
||||
}
|
||||
|
||||
impl UpgradeInfo for IdentifyProtocolConfig {
|
||||
impl UpgradeInfo for IdentifyProtocol {
|
||||
type Info = &'static [u8];
|
||||
type InfoIter = iter::Once<Self::Info>;
|
||||
|
||||
@ -118,59 +104,119 @@ impl UpgradeInfo for IdentifyProtocolConfig {
|
||||
}
|
||||
}
|
||||
|
||||
impl<C> InboundUpgrade<C> for IdentifyProtocolConfig
|
||||
where
|
||||
C: AsyncRead + AsyncWrite + Unpin,
|
||||
{
|
||||
impl<C> InboundUpgrade<C> for IdentifyProtocol {
|
||||
type Output = ReplySubstream<C>;
|
||||
type Error = io::Error;
|
||||
type Future = future::Ready<Result<Self::Output, io::Error>>;
|
||||
|
||||
fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future {
|
||||
trace!("Upgrading inbound connection");
|
||||
future::ok(ReplySubstream { inner: socket })
|
||||
}
|
||||
}
|
||||
|
||||
impl<C> OutboundUpgrade<C> for IdentifyProtocolConfig
|
||||
impl<C> OutboundUpgrade<C> for IdentifyProtocol
|
||||
where
|
||||
C: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
||||
{
|
||||
type Output = RemoteInfo;
|
||||
type Error = upgrade::ReadOneError;
|
||||
type Output = IdentifyInfo;
|
||||
type Error = io::Error;
|
||||
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
|
||||
|
||||
fn upgrade_outbound(self, mut socket: C, _: Self::Info) -> Self::Future {
|
||||
Box::pin(async move {
|
||||
socket.close().await?;
|
||||
let msg = upgrade::read_one(&mut socket, 4096).await?;
|
||||
let (info, observed_addr) = match parse_proto_msg(msg) {
|
||||
Ok(v) => v,
|
||||
Err(err) => {
|
||||
debug!("Failed to parse protobuf message; error = {:?}", err);
|
||||
return Err(err.into())
|
||||
}
|
||||
};
|
||||
|
||||
trace!("Remote observes us as {:?}", observed_addr);
|
||||
trace!("Information received: {:?}", info);
|
||||
|
||||
Ok(RemoteInfo {
|
||||
info,
|
||||
observed_addr,
|
||||
})
|
||||
})
|
||||
fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future {
|
||||
recv(socket).boxed()
|
||||
}
|
||||
}
|
||||
|
||||
// Turns a protobuf message into an `IdentifyInfo` and an observed address. If something bad
|
||||
// happens, turn it into an `io::Error`.
|
||||
fn parse_proto_msg(msg: impl AsRef<[u8]>) -> Result<(IdentifyInfo, Multiaddr), io::Error> {
|
||||
impl<T> UpgradeInfo for IdentifyPushProtocol<T> {
|
||||
type Info = &'static [u8];
|
||||
type InfoIter = iter::Once<Self::Info>;
|
||||
|
||||
fn protocol_info(&self) -> Self::InfoIter {
|
||||
iter::once(b"/ipfs/id/push/1.0.0")
|
||||
}
|
||||
}
|
||||
|
||||
impl<C> InboundUpgrade<C> for IdentifyPushProtocol<InboundPush>
|
||||
where
|
||||
C: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
||||
{
|
||||
type Output = IdentifyInfo;
|
||||
type Error = io::Error;
|
||||
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
|
||||
|
||||
fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future {
|
||||
recv(socket).boxed()
|
||||
}
|
||||
}
|
||||
|
||||
impl<C> OutboundUpgrade<C> for IdentifyPushProtocol<OutboundPush>
|
||||
where
|
||||
C: AsyncWrite + Unpin + Send + 'static,
|
||||
{
|
||||
type Output = ();
|
||||
type Error = io::Error;
|
||||
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
|
||||
|
||||
fn upgrade_outbound(self, socket: C, _: Self::Info) -> Self::Future {
|
||||
send(socket, self.0.0).boxed()
|
||||
}
|
||||
}
|
||||
|
||||
async fn send<T>(mut io: T, info: IdentifyInfo) -> io::Result<()>
|
||||
where
|
||||
T: AsyncWrite + Unpin
|
||||
{
|
||||
trace!("Sending: {:?}", info);
|
||||
|
||||
let listen_addrs = info.listen_addrs
|
||||
.into_iter()
|
||||
.map(|addr| addr.to_vec())
|
||||
.collect();
|
||||
|
||||
let pubkey_bytes = info.public_key.into_protobuf_encoding();
|
||||
|
||||
let message = structs_proto::Identify {
|
||||
agent_version: Some(info.agent_version),
|
||||
protocol_version: Some(info.protocol_version),
|
||||
public_key: Some(pubkey_bytes),
|
||||
listen_addrs,
|
||||
observed_addr: Some(info.observed_addr.to_vec()),
|
||||
protocols: info.protocols
|
||||
};
|
||||
|
||||
let mut bytes = Vec::with_capacity(message.encoded_len());
|
||||
message.encode(&mut bytes).expect("Vec<u8> provides capacity as needed");
|
||||
upgrade::write_one(&mut io, &bytes).await
|
||||
}
|
||||
|
||||
async fn recv<T>(mut socket: T) -> io::Result<IdentifyInfo>
|
||||
where
|
||||
T: AsyncRead + AsyncWrite + Unpin
|
||||
{
|
||||
socket.close().await?;
|
||||
|
||||
let msg = upgrade::read_one(&mut socket, 4096)
|
||||
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
|
||||
.await?;
|
||||
|
||||
let info = match parse_proto_msg(msg) {
|
||||
Ok(v) => v,
|
||||
Err(err) => {
|
||||
debug!("Invalid message: {:?}", err);
|
||||
return Err(err.into())
|
||||
}
|
||||
};
|
||||
|
||||
trace!("Received: {:?}", info);
|
||||
|
||||
Ok(info)
|
||||
}
|
||||
|
||||
/// Turns a protobuf message into an `IdentifyInfo`.
|
||||
fn parse_proto_msg(msg: impl AsRef<[u8]>) -> Result<IdentifyInfo, io::Error> {
|
||||
match structs_proto::Identify::decode(msg.as_ref()) {
|
||||
Ok(msg) => {
|
||||
// Turn a `Vec<u8>` into a `Multiaddr`. If something bad happens, turn it into
|
||||
// an `io::Error`.
|
||||
fn bytes_to_multiaddr(bytes: Vec<u8>) -> Result<Multiaddr, io::Error> {
|
||||
fn parse_multiaddr(bytes: Vec<u8>) -> Result<Multiaddr, io::Error> {
|
||||
Multiaddr::try_from(bytes)
|
||||
.map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))
|
||||
}
|
||||
@ -178,7 +224,7 @@ fn parse_proto_msg(msg: impl AsRef<[u8]>) -> Result<(IdentifyInfo, Multiaddr), i
|
||||
let listen_addrs = {
|
||||
let mut addrs = Vec::new();
|
||||
for addr in msg.listen_addrs.into_iter() {
|
||||
addrs.push(bytes_to_multiaddr(addr)?);
|
||||
addrs.push(parse_multiaddr(addr)?);
|
||||
}
|
||||
addrs
|
||||
};
|
||||
@ -186,16 +232,17 @@ fn parse_proto_msg(msg: impl AsRef<[u8]>) -> Result<(IdentifyInfo, Multiaddr), i
|
||||
let public_key = PublicKey::from_protobuf_encoding(&msg.public_key.unwrap_or_default())
|
||||
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
|
||||
|
||||
let observed_addr = bytes_to_multiaddr(msg.observed_addr.unwrap_or_default())?;
|
||||
let observed_addr = parse_multiaddr(msg.observed_addr.unwrap_or_default())?;
|
||||
let info = IdentifyInfo {
|
||||
public_key,
|
||||
protocol_version: msg.protocol_version.unwrap_or_default(),
|
||||
agent_version: msg.agent_version.unwrap_or_default(),
|
||||
listen_addrs,
|
||||
protocols: msg.protocols
|
||||
protocols: msg.protocols,
|
||||
observed_addr,
|
||||
};
|
||||
|
||||
Ok((info, observed_addr))
|
||||
Ok(info)
|
||||
}
|
||||
|
||||
Err(err) => Err(io::Error::new(io::ErrorKind::InvalidData, err)),
|
||||
@ -204,7 +251,6 @@ fn parse_proto_msg(msg: impl AsRef<[u8]>) -> Result<(IdentifyInfo, Multiaddr), i
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::protocol::{IdentifyInfo, RemoteInfo, IdentifyProtocolConfig};
|
||||
use libp2p_tcp::TcpConfig;
|
||||
use futures::{prelude::*, channel::oneshot};
|
||||
use libp2p_core::{
|
||||
@ -212,6 +258,7 @@ mod tests {
|
||||
Transport,
|
||||
upgrade::{self, apply_outbound, apply_inbound}
|
||||
};
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn correct_transfer() {
|
||||
@ -236,8 +283,13 @@ mod tests {
|
||||
.expect("listen address");
|
||||
tx.send(addr).unwrap();
|
||||
|
||||
let socket = listener.next().await.unwrap().unwrap().into_upgrade().unwrap().0.await.unwrap();
|
||||
let sender = apply_inbound(socket, IdentifyProtocolConfig).await.unwrap();
|
||||
let socket = listener
|
||||
.next().await.unwrap().unwrap()
|
||||
.into_upgrade().unwrap()
|
||||
.0.await.unwrap();
|
||||
|
||||
let sender = apply_inbound(socket, IdentifyProtocol).await.unwrap();
|
||||
|
||||
sender.send(
|
||||
IdentifyInfo {
|
||||
public_key: send_pubkey,
|
||||
@ -248,8 +300,8 @@ mod tests {
|
||||
"/ip6/::1/udp/1000".parse().unwrap(),
|
||||
],
|
||||
protocols: vec!["proto1".to_string(), "proto2".to_string()],
|
||||
observed_addr: "/ip4/100.101.102.103/tcp/5000".parse().unwrap(),
|
||||
},
|
||||
&"/ip4/100.101.102.103/tcp/5000".parse().unwrap(),
|
||||
).await.unwrap();
|
||||
});
|
||||
|
||||
@ -257,9 +309,12 @@ mod tests {
|
||||
let transport = TcpConfig::new();
|
||||
|
||||
let socket = transport.dial(rx.await.unwrap()).unwrap().await.unwrap();
|
||||
let RemoteInfo { info, observed_addr, .. } =
|
||||
apply_outbound(socket, IdentifyProtocolConfig, upgrade::Version::V1).await.unwrap();
|
||||
assert_eq!(observed_addr, "/ip4/100.101.102.103/tcp/5000".parse().unwrap());
|
||||
let info = apply_outbound(
|
||||
socket,
|
||||
IdentifyProtocol,
|
||||
upgrade::Version::V1
|
||||
).await.unwrap();
|
||||
assert_eq!(info.observed_addr, "/ip4/100.101.102.103/tcp/5000".parse().unwrap());
|
||||
assert_eq!(info.public_key, recv_pubkey);
|
||||
assert_eq!(info.protocol_version, "proto_version");
|
||||
assert_eq!(info.agent_version, "agent_version");
|
||||
|
Reference in New Issue
Block a user