rust-libp2p/examples/file-sharing.rs
Max Inden 220f84a97f
swarm/: Enable advanced dialing requests (#2317)
Enable advanced dialing requests both on `Swarm` and via
`NetworkBehaviourAction`. Users can now trigger a dial with a specific
set of addresses, optionally extended via
`NetworkBehaviour::addresses_of_peer`. In addition the whole process is
now modelled in a type safe way via the builder pattern.

Example of a `NetworkBehaviour` requesting a dial to a specific peer
with a set of addresses additionally extended through
`NetworkBehaviour::addresses_of_peer`:

```rust
NetworkBehaviourAction::Dial {
    opts: DialOpts::peer_id(peer_id)
              .condition(PeerCondition::Always)
              .addresses(addresses)
              .extend_addresses_through_behaviour()
              .build(),
    handler,
}
```

Example of a user requesting a dial to an unknown peer with a single
address via `Swarm`:

```rust
swarm1.dial(
    DialOpts::unknown_peer_id()
        .address(addr2.clone())
        .build()
)
```
2021-11-15 14:17:23 +01:00

729 lines
26 KiB
Rust

// Copyright 2021 Protocol Labs.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! # File sharing example
//!
//! Basic file sharing application with peers either providing or locating and
//! getting files by name.
//!
//! While obviously showcasing how to build a basic file sharing application,
//! the actual goal of this example is **to show how to integrate rust-libp2p
//! into a larger application**.
//!
//! ## Sample plot
//!
//! Assuming there are 3 nodes, A, B and C. A and B each provide a file while C
//! retrieves a file.
//!
//! Provider nodes A and B each provide a file, file FA and FB respectively.
//! They do so by advertising themselves as a provider for their file on a DHT
//! via [`libp2p-kad`]. The two, among other nodes of the network, are
//! interconnected via the DHT.
//!
//! Node C can locate the providers for file FA or FB on the DHT via
//! [`libp2p-kad`] without being connected to the specific node providing the
//! file, but any node of the DHT. Node C then connects to the corresponding
//! node and requests the file content of the file via
//! [`libp2p-request-response`].
//!
//! ## Architectural properties
//!
//! - Clean clonable async/await interface ([`Client`]) to interact with the
//! network layer.
//!
//! - Single task driving the network layer, no locks required.
//!
//! ## Usage
//!
//! A two node setup with one node providing the file and one node requesting the file.
//!
//! 1. Run command below in one terminal.
//!
//! ```
//! cargo run --example file-sharing -- \
//! --listen-address /ip4/127.0.0.1/tcp/40837 \
//! --secret-key-seed 1 \
//! provide \
//! --path <path-to-your-file> \
//! --name <name-for-others-to-find-your-file>
//! ```
//!
//! 2. Run command below in another terminal.
//!
//! ```
//! cargo run --example file-sharing -- \
//! --peer /ip4/127.0.0.1/tcp/40837/p2p/12D3KooWPjceQrSwdWXPyLLeABRXmuqt69Rg3sBYbU1Nft9HyQ6X \
//! get \
//! --name <name-for-others-to-find-your-file>
//! ```
//!
//! Note: The client does not need to be directly connected to the providing
//! peer, as long as both are connected to some node on the same DHT.
use async_std::io;
use async_std::task::spawn;
use futures::prelude::*;
use libp2p::core::{Multiaddr, PeerId};
use libp2p::multiaddr::Protocol;
use std::error::Error;
use std::path::PathBuf;
use structopt::StructOpt;
#[async_std::main]
async fn main() -> Result<(), Box<dyn Error>> {
env_logger::init();
let opt = Opt::from_args();
let (mut network_client, mut network_events, network_event_loop) =
network::new(opt.secret_key_seed).await?;
// Spawn the network task for it to run in the background.
spawn(network_event_loop.run());
// In case a listen address was provided use it, otherwise listen on any
// address.
match opt.listen_address {
Some(addr) => network_client
.start_listening(addr)
.await
.expect("Listening not to fail."),
None => network_client
.start_listening("/ip4/0.0.0.0/tcp/0".parse()?)
.await
.expect("Listening not to fail."),
};
// In case the user provided an address of a peer on the CLI, dial it.
if let Some(addr) = opt.peer {
let peer_id = match addr.iter().last() {
Some(Protocol::P2p(hash)) => PeerId::from_multihash(hash).expect("Valid hash."),
_ => return Err("Expect peer multiaddr to contain peer ID.".into()),
};
network_client
.dial(peer_id, addr)
.await
.expect("Dial to succeed");
}
match opt.argument {
// Providing a file.
CliArgument::Provide { path, name } => {
// Advertise oneself as a provider of the file on the DHT.
network_client.start_providing(name.clone()).await;
loop {
match network_events.next().await {
// Reply with the content of the file on incoming requests.
Some(network::Event::InboundRequest { request, channel }) => {
if request == name {
let file_content = std::fs::read_to_string(&path)?;
network_client.respond_file(file_content, channel).await;
}
}
_ => todo!(),
}
}
}
// Locating and getting a file.
CliArgument::Get { name } => {
// Locate all nodes providing the file.
let providers = network_client.get_providers(name.clone()).await;
if providers.is_empty() {
return Err(format!("Could not find provider for file {}.", name).into());
}
// Request the content of the file from each node.
let requests = providers.into_iter().map(|p| {
let mut network_client = network_client.clone();
let name = name.clone();
async move { network_client.request_file(p, name).await }.boxed()
});
// Await the requests, ignore the remaining once a single one succeeds.
let file = futures::future::select_ok(requests)
.await
.map_err(|_| "None of the providers returned file.")?
.0;
println!("Content of file {}: {}", name, file);
}
}
Ok(())
}
#[derive(Debug, StructOpt)]
#[structopt(name = "libp2p file sharing example")]
struct Opt {
/// Fixed value to generate deterministic peer ID.
#[structopt(long)]
secret_key_seed: Option<u8>,
#[structopt(long)]
peer: Option<Multiaddr>,
#[structopt(long)]
listen_address: Option<Multiaddr>,
#[structopt(subcommand)]
argument: CliArgument,
}
#[derive(Debug, StructOpt)]
enum CliArgument {
Provide {
#[structopt(long)]
path: PathBuf,
#[structopt(long)]
name: String,
},
Get {
#[structopt(long)]
name: String,
},
}
/// The network module, encapsulating all network related logic.
mod network {
use super::*;
use async_trait::async_trait;
use futures::channel::{mpsc, oneshot};
use libp2p::core::either::EitherError;
use libp2p::core::upgrade::{read_length_prefixed, write_length_prefixed, ProtocolName};
use libp2p::identity;
use libp2p::identity::ed25519;
use libp2p::kad::record::store::MemoryStore;
use libp2p::kad::{GetProvidersOk, Kademlia, KademliaEvent, QueryId, QueryResult};
use libp2p::multiaddr::Protocol;
use libp2p::request_response::{
ProtocolSupport, RequestId, RequestResponse, RequestResponseCodec, RequestResponseEvent,
RequestResponseMessage, ResponseChannel,
};
use libp2p::swarm::{ProtocolsHandlerUpgrErr, SwarmBuilder, SwarmEvent};
use libp2p::{NetworkBehaviour, Swarm};
use std::collections::{HashMap, HashSet};
use std::iter;
/// Creates the network components, namely:
///
/// - The network client to interact with the network layer from anywhere
/// within your application.
///
/// - The network event stream, e.g. for incoming requests.
///
/// - The network task driving the network itself.
pub async fn new(
secret_key_seed: Option<u8>,
) -> Result<(Client, impl Stream<Item = Event>, EventLoop), Box<dyn Error>> {
// Create a public/private key pair, either random or based on a seed.
let id_keys = match secret_key_seed {
Some(seed) => {
let mut bytes = [0u8; 32];
bytes[0] = seed;
let secret_key = ed25519::SecretKey::from_bytes(&mut bytes).expect(
"this returns `Err` only if the length is wrong; the length is correct; qed",
);
identity::Keypair::Ed25519(secret_key.into())
}
None => identity::Keypair::generate_ed25519(),
};
let peer_id = id_keys.public().to_peer_id();
// Build the Swarm, connecting the lower layer transport logic with the
// higher layer network behaviour logic.
let swarm = SwarmBuilder::new(
libp2p::development_transport(id_keys).await?,
ComposedBehaviour {
kademlia: Kademlia::new(peer_id, MemoryStore::new(peer_id)),
request_response: RequestResponse::new(
FileExchangeCodec(),
iter::once((FileExchangeProtocol(), ProtocolSupport::Full)),
Default::default(),
),
},
peer_id,
)
.build();
let (command_sender, command_receiver) = mpsc::channel(0);
let (event_sender, event_receiver) = mpsc::channel(0);
Ok((
Client {
sender: command_sender,
},
event_receiver,
EventLoop::new(swarm, command_receiver, event_sender),
))
}
#[derive(Clone)]
pub struct Client {
sender: mpsc::Sender<Command>,
}
impl Client {
/// Listen for incoming connections on the given address.
pub async fn start_listening(
&mut self,
addr: Multiaddr,
) -> Result<(), Box<dyn Error + Send>> {
let (sender, receiver) = oneshot::channel();
self.sender
.send(Command::StartListening { addr, sender })
.await
.expect("Command receiver not to be dropped.");
receiver.await.expect("Sender not to be dropped.")
}
/// Dial the given peer at the given address.
pub async fn dial(
&mut self,
peer_id: PeerId,
peer_addr: Multiaddr,
) -> Result<(), Box<dyn Error + Send>> {
let (sender, receiver) = oneshot::channel();
self.sender
.send(Command::Dial {
peer_id,
peer_addr,
sender,
})
.await
.expect("Command receiver not to be dropped.");
receiver.await.expect("Sender not to be dropped.")
}
/// Advertise the local node as the provider of the given file on the DHT.
pub async fn start_providing(&mut self, file_name: String) {
let (sender, receiver) = oneshot::channel();
self.sender
.send(Command::StartProviding { file_name, sender })
.await
.expect("Command receiver not to be dropped.");
receiver.await.expect("Sender not to be dropped.");
}
/// Find the providers for the given file on the DHT.
pub async fn get_providers(&mut self, file_name: String) -> HashSet<PeerId> {
let (sender, receiver) = oneshot::channel();
self.sender
.send(Command::GetProviders { file_name, sender })
.await
.expect("Command receiver not to be dropped.");
receiver.await.expect("Sender not to be dropped.")
}
/// Request the content of the given file from the given peer.
pub async fn request_file(
&mut self,
peer: PeerId,
file_name: String,
) -> Result<String, Box<dyn Error + Send>> {
let (sender, receiver) = oneshot::channel();
self.sender
.send(Command::RequestFile {
file_name,
peer,
sender,
})
.await
.expect("Command receiver not to be dropped.");
receiver.await.expect("Sender not be dropped.")
}
/// Respond with the provided file content to the given request.
pub async fn respond_file(&mut self, file: String, channel: ResponseChannel<FileResponse>) {
self.sender
.send(Command::RespondFile { file, channel })
.await
.expect("Command receiver not to be dropped.");
}
}
pub struct EventLoop {
swarm: Swarm<ComposedBehaviour>,
command_receiver: mpsc::Receiver<Command>,
event_sender: mpsc::Sender<Event>,
pending_dial: HashMap<PeerId, oneshot::Sender<Result<(), Box<dyn Error + Send>>>>,
pending_start_providing: HashMap<QueryId, oneshot::Sender<()>>,
pending_get_providers: HashMap<QueryId, oneshot::Sender<HashSet<PeerId>>>,
pending_request_file:
HashMap<RequestId, oneshot::Sender<Result<String, Box<dyn Error + Send>>>>,
}
impl EventLoop {
fn new(
swarm: Swarm<ComposedBehaviour>,
command_receiver: mpsc::Receiver<Command>,
event_sender: mpsc::Sender<Event>,
) -> Self {
Self {
swarm,
command_receiver,
event_sender,
pending_dial: Default::default(),
pending_start_providing: Default::default(),
pending_get_providers: Default::default(),
pending_request_file: Default::default(),
}
}
pub async fn run(mut self) {
loop {
futures::select! {
event = self.swarm.next() => self.handle_event(event.expect("Swarm stream to be infinite.")).await ,
command = self.command_receiver.next() => match command {
Some(c) => self.handle_command(c).await,
// Command channel closed, thus shutting down the network event loop.
None=> return,
},
}
}
}
async fn handle_event(
&mut self,
event: SwarmEvent<
ComposedEvent,
EitherError<ProtocolsHandlerUpgrErr<io::Error>, io::Error>,
>,
) {
match event {
SwarmEvent::Behaviour(ComposedEvent::Kademlia(
KademliaEvent::OutboundQueryCompleted {
id,
result: QueryResult::StartProviding(_),
..
},
)) => {
let sender: oneshot::Sender<()> = self
.pending_start_providing
.remove(&id)
.expect("Completed query to be previously pending.");
let _ = sender.send(());
}
SwarmEvent::Behaviour(ComposedEvent::Kademlia(
KademliaEvent::OutboundQueryCompleted {
id,
result: QueryResult::GetProviders(Ok(GetProvidersOk { providers, .. })),
..
},
)) => {
let _ = self
.pending_get_providers
.remove(&id)
.expect("Completed query to be previously pending.")
.send(providers);
}
SwarmEvent::Behaviour(ComposedEvent::Kademlia(_)) => {}
SwarmEvent::Behaviour(ComposedEvent::RequestResponse(
RequestResponseEvent::Message { message, .. },
)) => match message {
RequestResponseMessage::Request {
request, channel, ..
} => {
self.event_sender
.send(Event::InboundRequest {
request: request.0,
channel,
})
.await
.expect("Event receiver not to be dropped.");
}
RequestResponseMessage::Response {
request_id,
response,
} => {
let _ = self
.pending_request_file
.remove(&request_id)
.expect("Request to still be pending.")
.send(Ok(response.0));
}
},
SwarmEvent::Behaviour(ComposedEvent::RequestResponse(
RequestResponseEvent::OutboundFailure {
request_id, error, ..
},
)) => {
let _ = self
.pending_request_file
.remove(&request_id)
.expect("Request to still be pending.")
.send(Err(Box::new(error)));
}
SwarmEvent::Behaviour(ComposedEvent::RequestResponse(
RequestResponseEvent::ResponseSent { .. },
)) => {}
SwarmEvent::NewListenAddr { address, .. } => {
let local_peer_id = *self.swarm.local_peer_id();
println!(
"Local node is listening on {:?}",
address.with(Protocol::P2p(local_peer_id.into()))
);
}
SwarmEvent::IncomingConnection { .. } => {}
SwarmEvent::ConnectionEstablished {
peer_id, endpoint, ..
} => {
if endpoint.is_dialer() {
if let Some(sender) = self.pending_dial.remove(&peer_id) {
let _ = sender.send(Ok(()));
}
}
}
SwarmEvent::ConnectionClosed { .. } => {}
SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
if let Some(peer_id) = peer_id {
if let Some(sender) = self.pending_dial.remove(&peer_id) {
let _ = sender.send(Err(Box::new(error)));
}
}
}
e => panic!("{:?}", e),
}
}
async fn handle_command(&mut self, command: Command) {
match command {
Command::StartListening { addr, sender } => {
let _ = match self.swarm.listen_on(addr) {
Ok(_) => sender.send(Ok(())),
Err(e) => sender.send(Err(Box::new(e))),
};
}
Command::Dial {
peer_id,
peer_addr,
sender,
} => {
if self.pending_dial.contains_key(&peer_id) {
todo!("Already dialing peer.");
} else {
self.swarm
.behaviour_mut()
.kademlia
.add_address(&peer_id, peer_addr.clone());
match self
.swarm
.dial(peer_addr.with(Protocol::P2p(peer_id.into())))
{
Ok(()) => {
self.pending_dial.insert(peer_id, sender);
}
Err(e) => {
let _ = sender.send(Err(Box::new(e)));
}
}
}
}
Command::StartProviding { file_name, sender } => {
let query_id = self
.swarm
.behaviour_mut()
.kademlia
.start_providing(file_name.into_bytes().into())
.expect("No store error.");
self.pending_start_providing.insert(query_id, sender);
}
Command::GetProviders { file_name, sender } => {
let query_id = self
.swarm
.behaviour_mut()
.kademlia
.get_providers(file_name.into_bytes().into());
self.pending_get_providers.insert(query_id, sender);
}
Command::RequestFile {
file_name,
peer,
sender,
} => {
let request_id = self
.swarm
.behaviour_mut()
.request_response
.send_request(&peer, FileRequest(file_name));
self.pending_request_file.insert(request_id, sender);
}
Command::RespondFile { file, channel } => {
self.swarm
.behaviour_mut()
.request_response
.send_response(channel, FileResponse(file))
.expect("Connection to peer to be still open.");
}
}
}
}
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "ComposedEvent")]
struct ComposedBehaviour {
request_response: RequestResponse<FileExchangeCodec>,
kademlia: Kademlia<MemoryStore>,
}
#[derive(Debug)]
enum ComposedEvent {
RequestResponse(RequestResponseEvent<FileRequest, FileResponse>),
Kademlia(KademliaEvent),
}
impl From<RequestResponseEvent<FileRequest, FileResponse>> for ComposedEvent {
fn from(event: RequestResponseEvent<FileRequest, FileResponse>) -> Self {
ComposedEvent::RequestResponse(event)
}
}
impl From<KademliaEvent> for ComposedEvent {
fn from(event: KademliaEvent) -> Self {
ComposedEvent::Kademlia(event)
}
}
#[derive(Debug)]
enum Command {
StartListening {
addr: Multiaddr,
sender: oneshot::Sender<Result<(), Box<dyn Error + Send>>>,
},
Dial {
peer_id: PeerId,
peer_addr: Multiaddr,
sender: oneshot::Sender<Result<(), Box<dyn Error + Send>>>,
},
StartProviding {
file_name: String,
sender: oneshot::Sender<()>,
},
GetProviders {
file_name: String,
sender: oneshot::Sender<HashSet<PeerId>>,
},
RequestFile {
file_name: String,
peer: PeerId,
sender: oneshot::Sender<Result<String, Box<dyn Error + Send>>>,
},
RespondFile {
file: String,
channel: ResponseChannel<FileResponse>,
},
}
pub enum Event {
InboundRequest {
request: String,
channel: ResponseChannel<FileResponse>,
},
}
// Simple file exchange protocol
#[derive(Debug, Clone)]
struct FileExchangeProtocol();
#[derive(Clone)]
struct FileExchangeCodec();
#[derive(Debug, Clone, PartialEq, Eq)]
struct FileRequest(String);
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FileResponse(String);
impl ProtocolName for FileExchangeProtocol {
fn protocol_name(&self) -> &[u8] {
"/file-exchange/1".as_bytes()
}
}
#[async_trait]
impl RequestResponseCodec for FileExchangeCodec {
type Protocol = FileExchangeProtocol;
type Request = FileRequest;
type Response = FileResponse;
async fn read_request<T>(
&mut self,
_: &FileExchangeProtocol,
io: &mut T,
) -> io::Result<Self::Request>
where
T: AsyncRead + Unpin + Send,
{
let vec = read_length_prefixed(io, 1_000_000).await?;
if vec.is_empty() {
return Err(io::ErrorKind::UnexpectedEof.into());
}
Ok(FileRequest(String::from_utf8(vec).unwrap()))
}
async fn read_response<T>(
&mut self,
_: &FileExchangeProtocol,
io: &mut T,
) -> io::Result<Self::Response>
where
T: AsyncRead + Unpin + Send,
{
let vec = read_length_prefixed(io, 1_000_000).await?;
if vec.is_empty() {
return Err(io::ErrorKind::UnexpectedEof.into());
}
Ok(FileResponse(String::from_utf8(vec).unwrap()))
}
async fn write_request<T>(
&mut self,
_: &FileExchangeProtocol,
io: &mut T,
FileRequest(data): FileRequest,
) -> io::Result<()>
where
T: AsyncWrite + Unpin + Send,
{
write_length_prefixed(io, data).await?;
io.close().await?;
Ok(())
}
async fn write_response<T>(
&mut self,
_: &FileExchangeProtocol,
io: &mut T,
FileResponse(data): FileResponse,
) -> io::Result<()>
where
T: AsyncWrite + Unpin + Send,
{
write_length_prefixed(io, data).await?;
io.close().await?;
Ok(())
}
}
}