diff --git a/core/src/either.rs b/core/src/either.rs index a3c24ade..bbb68123 100644 --- a/core/src/either.rs +++ b/core/src/either.rs @@ -18,7 +18,12 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::{muxing::StreamMuxer, ProtocolName, transport::ListenerEvent}; +use crate::{ + muxing::StreamMuxer, + ProtocolName, + transport::{Transport, ListenerEvent, TransportError}, + Multiaddr +}; use futures::{prelude::*, io::{IoSlice, IoSliceMut}}; use pin_project::{pin_project, project}; use std::{fmt, io::{Error as IoError}, pin::Pin, task::Context, task::Poll}; @@ -442,3 +447,53 @@ impl ProtocolName for EitherName { } } } + +#[derive(Debug, Copy, Clone)] +pub enum EitherTransport { + Left(A), + Right(B), +} + +impl Transport for EitherTransport +where + B: Transport, + A: Transport, +{ + type Output = EitherOutput; + type Error = EitherError; + type Listener = EitherListenStream; + type ListenerUpgrade = EitherFuture; + type Dial = EitherFuture; + + fn listen_on(self, addr: Multiaddr) -> Result> { + use TransportError::*; + match self { + EitherTransport::Left(a) => match a.listen_on(addr) { + Ok(listener) => Ok(EitherListenStream::First(listener)), + Err(MultiaddrNotSupported(addr)) => Err(MultiaddrNotSupported(addr)), + Err(Other(err)) => Err(Other(EitherError::A(err))), + }, + EitherTransport::Right(b) => match b.listen_on(addr) { + Ok(listener) => Ok(EitherListenStream::Second(listener)), + Err(MultiaddrNotSupported(addr)) => Err(MultiaddrNotSupported(addr)), + Err(Other(err)) => Err(Other(EitherError::B(err))), + }, + } + } + + fn dial(self, addr: Multiaddr) -> Result> { + use TransportError::*; + match self { + EitherTransport::Left(a) => match a.dial(addr) { + Ok(connec) => Ok(EitherFuture::First(connec)), + Err(MultiaddrNotSupported(addr)) => Err(MultiaddrNotSupported(addr)), + Err(Other(err)) => Err(Other(EitherError::A(err))), + }, + EitherTransport::Right(b) => match b.dial(addr) { + Ok(connec) => Ok(EitherFuture::Second(connec)), + Err(MultiaddrNotSupported(addr)) => Err(MultiaddrNotSupported(addr)), + Err(Other(err)) => Err(Other(EitherError::B(err))), + }, + } + } +} diff --git a/examples/ipfs-private.rs b/examples/ipfs-private.rs new file mode 100644 index 00000000..9d347685 --- /dev/null +++ b/examples/ipfs-private.rs @@ -0,0 +1,308 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! A minimal node that can interact with ipfs +//! +//! This node implements the gossipsub, ping and identify protocols. It supports +//! the ipfs private swarms feature by reading the pre shared key file `swarm.key` +//! from the IPFS_PATH environment variable or from the default location. +//! +//! You can pass any number of nodes to be dialed. +//! +//! On startup, this example will show a list of addresses that you can dial +//! from a go-ipfs or js-ipfs node. +//! +//! You can ping this node, or use pubsub (gossipsub) on the topic "chat". For this +//! to work, the ipfs node needs to be configured to use gossipsub. +use async_std::{io, task}; +use futures::{future, prelude::*}; +use libp2p::{ + core::{either::EitherTransport, transport::upgrade::Version, StreamMuxer}, + gossipsub::{self, Gossipsub, GossipsubConfigBuilder, GossipsubEvent}, + identify::{Identify, IdentifyEvent}, + identity, + multiaddr::Protocol, + ping::{self, Ping, PingConfig, PingEvent}, + pnet::{PnetConfig, PreSharedKey}, + secio::SecioConfig, + swarm::NetworkBehaviourEventProcess, + tcp::TcpConfig, + yamux::Config as YamuxConfig, + Multiaddr, NetworkBehaviour, PeerId, Swarm, Transport, +}; +use std::{ + env, + error::Error, + fs, + path::Path, + str::FromStr, + task::{Context, Poll}, + time::Duration, +}; + +/// Builds the transport that serves as a common ground for all connections. +pub fn build_transport( + key_pair: identity::Keypair, + psk: Option, +) -> impl Transport< + Output = ( + PeerId, + impl StreamMuxer< + OutboundSubstream = impl Send, + Substream = impl Send, + Error = impl Into, + > + Send + + Sync, + ), + Error = impl Error + Send, + Listener = impl Send, + Dial = impl Send, + ListenerUpgrade = impl Send, +> + Clone { + let secio_config = SecioConfig::new(key_pair); + let yamux_config = YamuxConfig::default(); + + let base_transport = TcpConfig::new().nodelay(true); + let maybe_encrypted = match psk { + Some(psk) => EitherTransport::Left( + base_transport.and_then(move |socket, _| PnetConfig::new(psk).handshake(socket)), + ), + None => EitherTransport::Right(base_transport), + }; + maybe_encrypted + .upgrade(Version::V1) + .authenticate(secio_config) + .multiplex(yamux_config) + .timeout(Duration::from_secs(20)) +} + +/// Get the current ipfs repo path, either from the IPFS_PATH environment variable or +/// from the default $HOME/.ipfs +fn get_ipfs_path() -> Box { + env::var("IPFS_PATH") + .map(|ipfs_path| Path::new(&ipfs_path).into()) + .unwrap_or_else(|_| { + env::var("HOME") + .map(|home| Path::new(&home).join(".ipfs")) + .expect("could not determine home directory") + .into() + }) +} + +/// Read the pre shared key file from the given ipfs directory +fn get_psk(path: Box) -> std::io::Result> { + let swarm_key_file = path.join("swarm.key"); + match fs::read_to_string(swarm_key_file) { + Ok(text) => Ok(Some(text)), + Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None), + Err(e) => Err(e), + } +} + +/// for a multiaddr that ends with a peer id, this strips this suffix. Rust-libp2p +/// only supports dialing to an address without providing the peer id. +fn strip_peer_id(addr: &mut Multiaddr) { + let last = addr.pop(); + match last { + Some(Protocol::P2p(peer_id)) => { + let mut addr = Multiaddr::empty(); + addr.push(Protocol::P2p(peer_id)); + println!( + "removing peer id {} so this address can be dialed by rust-libp2p", + addr + ); + } + Some(other) => addr.push(other), + _ => {} + } +} + +/// parse a legacy multiaddr (replace ipfs with p2p), and strip the peer id +/// so it can be dialed by rust-libp2p +fn parse_legacy_multiaddr(text: &str) -> Result> { + let sanitized = text + .split('/') + .map(|part| if part == "ipfs" { "p2p" } else { part }) + .collect::>() + .join("/"); + let mut res = Multiaddr::from_str(&sanitized)?; + strip_peer_id(&mut res); + Ok(res) +} + +fn main() -> Result<(), Box> { + env_logger::init(); + + let ipfs_path: Box = get_ipfs_path(); + println!("using IPFS_PATH {:?}", ipfs_path); + let psk: Option = get_psk(ipfs_path)? + .map(|text| PreSharedKey::from_str(&text)) + .transpose()?; + + // Create a random PeerId + let local_key = identity::Keypair::generate_ed25519(); + let local_peer_id = PeerId::from(local_key.public()); + println!("using random peer id: {:?}", local_peer_id); + for psk in psk { + println!("using swarm key with fingerprint: {}", psk.fingerprint()); + } + + // Set up a an encrypted DNS-enabled TCP Transport over and Yamux protocol + let transport = build_transport(local_key.clone(), psk); + + // Create a Gosspipsub topic + let gossipsub_topic = gossipsub::Topic::new("chat".into()); + + // We create a custom network behaviour that combines gossipsub, ping and identify. + #[derive(NetworkBehaviour)] + struct MyBehaviour { + gossipsub: Gossipsub, + identify: Identify, + ping: Ping, + } + + impl NetworkBehaviourEventProcess + for MyBehaviour + { + // Called when `identify` produces an event. + fn inject_event(&mut self, event: IdentifyEvent) { + println!("identify: {:?}", event); + } + } + + impl NetworkBehaviourEventProcess + for MyBehaviour + { + // Called when `gossipsub` produces an event. + fn inject_event(&mut self, event: GossipsubEvent) { + match event { + GossipsubEvent::Message(peer_id, id, message) => println!( + "Got message: {} with id: {} from peer: {:?}", + String::from_utf8_lossy(&message.data), + id, + peer_id + ), + _ => {} + } + } + } + + impl NetworkBehaviourEventProcess + for MyBehaviour + { + // Called when `ping` produces an event. + fn inject_event(&mut self, event: PingEvent) { + use ping::handler::{PingFailure, PingSuccess}; + match event { + PingEvent { + peer, + result: Result::Ok(PingSuccess::Ping { rtt }), + } => { + println!( + "ping: rtt to {} is {} ms", + peer.to_base58(), + rtt.as_millis() + ); + } + PingEvent { + peer, + result: Result::Ok(PingSuccess::Pong), + } => { + println!("ping: pong from {}", peer.to_base58()); + } + PingEvent { + peer, + result: Result::Err(PingFailure::Timeout), + } => { + println!("ping: timeout to {}", peer.to_base58()); + } + PingEvent { + peer, + result: Result::Err(PingFailure::Other { error }), + } => { + println!("ping: failure with {}: {}", peer.to_base58(), error); + } + } + } + } + + // Create a Swarm to manage peers and events + let mut swarm = { + let gossipsub_config = GossipsubConfigBuilder::default() + .max_transmit_size(262144) + .build(); + let mut behaviour = MyBehaviour { + gossipsub: Gossipsub::new(local_peer_id.clone(), gossipsub_config), + identify: Identify::new( + "/ipfs/0.1.0".into(), + "rust-ipfs-example".into(), + local_key.public(), + ), + ping: Ping::new(PingConfig::new()), + }; + + println!("Subscribing to {:?}", gossipsub_topic); + behaviour.gossipsub.subscribe(gossipsub_topic.clone()); + Swarm::new(transport, behaviour, local_peer_id.clone()) + }; + + // Reach out to other nodes if specified + for to_dial in std::env::args().skip(1) { + let addr: Multiaddr = parse_legacy_multiaddr(&to_dial)?; + Swarm::dial_addr(&mut swarm, addr)?; + println!("Dialed {:?}", to_dial) + } + + // Read full lines from stdin + let mut stdin = io::BufReader::new(io::stdin()).lines(); + + // Listen on all interfaces and whatever port the OS assigns + Swarm::listen_on(&mut swarm, "/ip4/0.0.0.0/tcp/0".parse()?)?; + + // Kick it off + let mut listening = false; + task::block_on(future::poll_fn(move |cx: &mut Context| { + loop { + match stdin.try_poll_next_unpin(cx)? { + Poll::Ready(Some(line)) => { + swarm.gossipsub.publish(&gossipsub_topic, line.as_bytes()); + } + Poll::Ready(None) => panic!("Stdin closed"), + Poll::Pending => break, + } + } + loop { + match swarm.poll_next_unpin(cx) { + Poll::Ready(Some(event)) => println!("{:?}", event), + Poll::Ready(None) => return Poll::Ready(Ok(())), + Poll::Pending => { + if !listening { + for addr in Swarm::listeners(&swarm) { + println!("Address {}/ipfs/{}", addr, local_peer_id); + listening = true; + } + } + break; + } + } + } + Poll::Pending + })) +} diff --git a/protocols/pnet/src/lib.rs b/protocols/pnet/src/lib.rs index 2b61b296..ab9b0bc3 100644 --- a/protocols/pnet/src/lib.rs +++ b/protocols/pnet/src/lib.rs @@ -161,6 +161,21 @@ pub enum KeyParseError { InvalidKeyChar(ParseIntError), } +impl fmt::Display for KeyParseError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{:?}", self) + } +} + +impl error::Error for KeyParseError { + fn source(&self) -> Option<&(dyn error::Error + 'static)> { + match *self { + KeyParseError::InvalidKeyChar(ref err) => Some(err), + _ => None, + } + } +} + /// Private network configuration #[derive(Debug, Copy, Clone)] pub struct PnetConfig {