feat: add rust-libp2p-server to monorepo

Moves https://github.com/mxinden/rust-libp2p-server to the rust-libp2p monorepository.

> # Rust libp2p Server
>
> A rust-libp2p based server implementation running:
>
> - the [Kademlia protocol](https://github.com/libp2p/specs/tree/master/kad-dht)
>
> - the [Circuit Relay v2 protocol](https://github.com/libp2p/specs/blob/master/relay/circuit-v2.md)
>
> - the [AutoNAT protocol](https://github.com/libp2p/specs/blob/master/autonat/README.md)

Pull-Request: #4311.
This commit is contained in:
Max Inden
2023-08-21 20:49:42 +02:00
committed by GitHub
parent 72472d075f
commit e974efb755
12 changed files with 649 additions and 0 deletions

View File

@ -0,0 +1,78 @@
use libp2p::autonat;
use libp2p::identify;
use libp2p::kad::{record::store::MemoryStore, Kademlia, KademliaConfig};
use libp2p::ping;
use libp2p::relay;
use libp2p::swarm::behaviour::toggle::Toggle;
use libp2p::{identity, swarm::NetworkBehaviour, Multiaddr, PeerId};
use std::str::FromStr;
use std::time::Duration;
const BOOTNODES: [&str; 4] = [
"QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN",
"QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TbXJJ16u19uLTa",
"QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb",
"QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt",
];
#[derive(NetworkBehaviour)]
pub(crate) struct Behaviour {
relay: relay::Behaviour,
ping: ping::Behaviour,
identify: identify::Behaviour,
pub(crate) kademlia: Toggle<Kademlia<MemoryStore>>,
autonat: Toggle<autonat::Behaviour>,
}
impl Behaviour {
pub(crate) fn new(
pub_key: identity::PublicKey,
enable_kademlia: bool,
enable_autonat: bool,
) -> Self {
let kademlia = if enable_kademlia {
let mut kademlia_config = KademliaConfig::default();
// Instantly remove records and provider records.
//
// TODO: Replace hack with option to disable both.
kademlia_config.set_record_ttl(Some(Duration::from_secs(0)));
kademlia_config.set_provider_record_ttl(Some(Duration::from_secs(0)));
let mut kademlia = Kademlia::with_config(
pub_key.to_peer_id(),
MemoryStore::new(pub_key.to_peer_id()),
kademlia_config,
);
let bootaddr = Multiaddr::from_str("/dnsaddr/bootstrap.libp2p.io").unwrap();
for peer in &BOOTNODES {
kademlia.add_address(&PeerId::from_str(peer).unwrap(), bootaddr.clone());
}
kademlia.bootstrap().unwrap();
Some(kademlia)
} else {
None
}
.into();
let autonat = if enable_autonat {
Some(autonat::Behaviour::new(
PeerId::from(pub_key.clone()),
Default::default(),
))
} else {
None
}
.into();
Self {
relay: relay::Behaviour::new(PeerId::from(pub_key.clone()), Default::default()),
ping: ping::Behaviour::new(ping::Config::new()),
identify: identify::Behaviour::new(
identify::Config::new("ipfs/0.1.0".to_string(), pub_key).with_agent_version(
format!("rust-libp2p-server/{}", env!("CARGO_PKG_VERSION")),
),
),
kademlia,
autonat,
}
}
}

39
misc/server/src/config.rs Normal file
View File

@ -0,0 +1,39 @@
use libp2p::Multiaddr;
use serde_derive::Deserialize;
use std::error::Error;
use std::path::Path;
#[derive(Clone, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub(crate) struct Config {
pub(crate) identity: Identity,
pub(crate) addresses: Addresses,
}
impl Config {
pub(crate) fn from_file(path: &Path) -> Result<Self, Box<dyn Error>> {
Ok(serde_json::from_str(&std::fs::read_to_string(path)?)?)
}
}
#[derive(Clone, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub(crate) struct Identity {
#[serde(rename = "PeerID")]
pub(crate) peer_id: String,
pub(crate) priv_key: String,
}
#[derive(Clone, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub(crate) struct Addresses {
pub(crate) swarm: Vec<Multiaddr>,
pub(crate) append_announce: Vec<Multiaddr>,
}
impl zeroize::Zeroize for Config {
fn zeroize(&mut self) {
self.identity.peer_id.zeroize();
self.identity.priv_key.zeroize();
}
}

View File

@ -0,0 +1,131 @@
// Copyright 2022 Protocol Labs.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
use hyper::http::StatusCode;
use hyper::service::Service;
use hyper::{Body, Method, Request, Response, Server};
use log::{error, info};
use prometheus_client::encoding::text::encode;
use prometheus_client::registry::Registry;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
const METRICS_CONTENT_TYPE: &str = "application/openmetrics-text;charset=utf-8;version=1.0.0";
pub(crate) async fn metrics_server(registry: Registry) -> Result<(), std::io::Error> {
// Serve on localhost.
let addr = ([127, 0, 0, 1], 8080).into();
// Use the tokio runtime to run the hyper server.
let rt = tokio::runtime::Runtime::new()?;
rt.block_on(async {
let server = Server::bind(&addr).serve(MakeMetricService::new(registry));
info!("Metrics server on http://{}/metrics", server.local_addr());
if let Err(e) = server.await {
error!("server error: {}", e);
}
Ok(())
})
}
pub(crate) struct MetricService {
reg: Arc<Mutex<Registry>>,
}
type SharedRegistry = Arc<Mutex<Registry>>;
impl MetricService {
fn get_reg(&mut self) -> SharedRegistry {
Arc::clone(&self.reg)
}
fn respond_with_metrics(&mut self) -> Response<String> {
let mut response: Response<String> = Response::default();
response.headers_mut().insert(
hyper::header::CONTENT_TYPE,
METRICS_CONTENT_TYPE.try_into().unwrap(),
);
let reg = self.get_reg();
encode(&mut response.body_mut(), &reg.lock().unwrap()).unwrap();
*response.status_mut() = StatusCode::OK;
response
}
fn respond_with_404_not_found(&mut self) -> Response<String> {
Response::builder()
.status(StatusCode::NOT_FOUND)
.body("Not found try localhost:[port]/metrics".to_string())
.unwrap()
}
}
impl Service<Request<Body>> for MetricService {
type Response = Response<String>;
type Error = hyper::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: Request<Body>) -> Self::Future {
let req_path = req.uri().path();
let req_method = req.method();
let resp = if (req_method == Method::GET) && (req_path == "/metrics") {
// Encode and serve metrics from registry.
self.respond_with_metrics()
} else {
self.respond_with_404_not_found()
};
Box::pin(async { Ok(resp) })
}
}
pub(crate) struct MakeMetricService {
reg: SharedRegistry,
}
impl MakeMetricService {
pub(crate) fn new(registry: Registry) -> MakeMetricService {
MakeMetricService {
reg: Arc::new(Mutex::new(registry)),
}
}
}
impl<T> Service<T> for MakeMetricService {
type Response = MetricService;
type Error = hyper::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
fn poll_ready(&mut self, _: &mut Context) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, _: T) -> Self::Future {
let reg = self.reg.clone();
let fut = async move { Ok(MetricService { reg }) };
Box::pin(fut)
}
}

213
misc/server/src/main.rs Normal file
View File

@ -0,0 +1,213 @@
use base64::Engine;
use clap::Parser;
use futures::executor::block_on;
use futures::future::Either;
use futures::stream::StreamExt;
use futures_timer::Delay;
use libp2p::core::muxing::StreamMuxerBox;
use libp2p::core::upgrade;
use libp2p::dns;
use libp2p::identify;
use libp2p::identity;
use libp2p::identity::PeerId;
use libp2p::kad;
use libp2p::metrics::{Metrics, Recorder};
use libp2p::noise;
use libp2p::quic;
use libp2p::swarm::{SwarmBuilder, SwarmEvent};
use libp2p::tcp;
use libp2p::yamux;
use libp2p::Transport;
use log::{debug, info};
use prometheus_client::metrics::info::Info;
use prometheus_client::registry::Registry;
use std::error::Error;
use std::io;
use std::path::PathBuf;
use std::str::FromStr;
use std::task::Poll;
use std::thread;
use std::time::Duration;
use zeroize::Zeroizing;
mod behaviour;
mod config;
mod http_service;
const BOOTSTRAP_INTERVAL: Duration = Duration::from_secs(5 * 60);
#[derive(Debug, Parser)]
#[clap(name = "libp2p server", about = "A rust-libp2p server binary.")]
struct Opts {
/// Path to IPFS config file.
#[clap(long)]
config: PathBuf,
/// Metric endpoint path.
#[clap(long, default_value = "/metrics")]
metrics_path: String,
/// Whether to run the libp2p Kademlia protocol and join the IPFS DHT.
#[clap(long)]
enable_kademlia: bool,
/// Whether to run the libp2p Autonat protocol.
#[clap(long)]
enable_autonat: bool,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
env_logger::init();
let opt = Opts::parse();
let config = Zeroizing::new(config::Config::from_file(opt.config.as_path())?);
let (local_peer_id, local_keypair) = {
let keypair = identity::Keypair::from_protobuf_encoding(&Zeroizing::new(
base64::engine::general_purpose::STANDARD
.decode(config.identity.priv_key.as_bytes())?,
))?;
let peer_id = keypair.public().into();
assert_eq!(
PeerId::from_str(&config.identity.peer_id)?,
peer_id,
"Expect peer id derived from private key and peer id retrieved from config to match."
);
(peer_id, keypair)
};
println!("Local peer id: {local_peer_id}");
let transport = {
let tcp_transport =
tcp::tokio::Transport::new(tcp::Config::new().port_reuse(true).nodelay(true))
.upgrade(upgrade::Version::V1)
.authenticate(noise::Config::new(&local_keypair)?)
.multiplex(yamux::Config::default())
.timeout(Duration::from_secs(20));
let quic_transport = {
let mut config = quic::Config::new(&local_keypair);
config.support_draft_29 = true;
quic::tokio::Transport::new(config)
};
dns::TokioDnsConfig::system(libp2p::core::transport::OrTransport::new(
quic_transport,
tcp_transport,
))?
.map(|either_output, _| match either_output {
Either::Left((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)),
Either::Right((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)),
})
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
.boxed()
};
let behaviour = behaviour::Behaviour::new(
local_keypair.public(),
opt.enable_kademlia,
opt.enable_autonat,
);
let mut swarm = SwarmBuilder::with_tokio_executor(transport, behaviour, local_peer_id).build();
if config.addresses.swarm.is_empty() {
log::warn!("No listen addresses configured.");
}
for address in &config.addresses.swarm {
match swarm.listen_on(address.clone()) {
Ok(_) => {}
Err(e @ libp2p::TransportError::MultiaddrNotSupported(_)) => {
log::warn!("Failed to listen on {address}, continuing anyways, {e}")
}
Err(e) => return Err(e.into()),
}
}
if config.addresses.append_announce.is_empty() {
log::warn!("No external addresses configured.");
}
for address in &config.addresses.append_announce {
swarm.add_external_address(address.clone())
}
log::info!(
"External addresses: {:?}",
swarm.external_addresses().collect::<Vec<_>>()
);
let mut metric_registry = Registry::default();
let metrics = Metrics::new(&mut metric_registry);
let build_info = Info::new(vec![("version".to_string(), env!("CARGO_PKG_VERSION"))]);
metric_registry.register(
"build",
"A metric with a constant '1' value labeled by version",
build_info,
);
thread::spawn(move || block_on(http_service::metrics_server(metric_registry)));
let mut bootstrap_timer = Delay::new(BOOTSTRAP_INTERVAL);
loop {
if let Poll::Ready(()) = futures::poll!(&mut bootstrap_timer) {
bootstrap_timer.reset(BOOTSTRAP_INTERVAL);
let _ = swarm
.behaviour_mut()
.kademlia
.as_mut()
.map(|k| k.bootstrap());
}
let event = swarm.next().await.expect("Swarm not to terminate.");
metrics.record(&event);
match event {
SwarmEvent::Behaviour(behaviour::BehaviourEvent::Identify(e)) => {
info!("{:?}", e);
metrics.record(&e);
if let identify::Event::Received {
peer_id,
info:
identify::Info {
listen_addrs,
protocols,
..
},
} = e
{
if protocols.iter().any(|p| *p == kad::PROTOCOL_NAME) {
for addr in listen_addrs {
swarm
.behaviour_mut()
.kademlia
.as_mut()
.map(|k| k.add_address(&peer_id, addr));
}
}
}
}
SwarmEvent::Behaviour(behaviour::BehaviourEvent::Ping(e)) => {
debug!("{:?}", e);
metrics.record(&e);
}
SwarmEvent::Behaviour(behaviour::BehaviourEvent::Kademlia(e)) => {
debug!("{:?}", e);
metrics.record(&e);
}
SwarmEvent::Behaviour(behaviour::BehaviourEvent::Relay(e)) => {
info!("{:?}", e);
metrics.record(&e)
}
SwarmEvent::Behaviour(behaviour::BehaviourEvent::Autonat(e)) => {
info!("{:?}", e);
// TODO: Add metric recording for `NatStatus`.
// metrics.record(&e)
}
SwarmEvent::NewListenAddr { address, .. } => {
println!("Listening on {address:?}");
}
_ => {}
}
}
}