examples/file-sharing: Support binary files (#2786)

This commit is contained in:
qidu 2022-08-16 11:12:28 +08:00 committed by GitHub
parent cef505685c
commit 0e5a25dea8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -84,6 +84,7 @@ use futures::prelude::*;
use libp2p::core::{Multiaddr, PeerId}; use libp2p::core::{Multiaddr, PeerId};
use libp2p::multiaddr::Protocol; use libp2p::multiaddr::Protocol;
use std::error::Error; use std::error::Error;
use std::io::Write;
use std::path::PathBuf; use std::path::PathBuf;
#[async_std::main] #[async_std::main]
@ -134,8 +135,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
// Reply with the content of the file on incoming requests. // Reply with the content of the file on incoming requests.
Some(network::Event::InboundRequest { request, channel }) => { Some(network::Event::InboundRequest { request, channel }) => {
if request == name { if request == name {
let file_content = std::fs::read_to_string(&path)?; network_client
network_client.respond_file(file_content, channel).await; .respond_file(std::fs::read(&path)?, channel)
.await;
} }
} }
e => todo!("{:?}", e), e => todo!("{:?}", e),
@ -158,12 +160,12 @@ async fn main() -> Result<(), Box<dyn Error>> {
}); });
// Await the requests, ignore the remaining once a single one succeeds. // Await the requests, ignore the remaining once a single one succeeds.
let file = futures::future::select_ok(requests) let file_content = futures::future::select_ok(requests)
.await .await
.map_err(|_| "None of the providers returned file.")? .map_err(|_| "None of the providers returned file.")?
.0; .0;
println!("Content of file {}: {}", name, file); std::io::stdout().write_all(&file_content)?;
} }
} }
@ -337,7 +339,7 @@ mod network {
&mut self, &mut self,
peer: PeerId, peer: PeerId,
file_name: String, file_name: String,
) -> Result<String, Box<dyn Error + Send>> { ) -> Result<Vec<u8>, Box<dyn Error + Send>> {
let (sender, receiver) = oneshot::channel(); let (sender, receiver) = oneshot::channel();
self.sender self.sender
.send(Command::RequestFile { .send(Command::RequestFile {
@ -351,9 +353,16 @@ mod network {
} }
/// Respond with the provided file content to the given request. /// Respond with the provided file content to the given request.
pub async fn respond_file(&mut self, file: String, channel: ResponseChannel<FileResponse>) { pub async fn respond_file(
&mut self,
file: Vec<u8>,
channel: ResponseChannel<FileResponse>,
) {
self.sender self.sender
.send(Command::RespondFile { file, channel }) .send(Command::RespondFile {
file: file,
channel,
})
.await .await
.expect("Command receiver not to be dropped."); .expect("Command receiver not to be dropped.");
} }
@ -367,7 +376,7 @@ mod network {
pending_start_providing: HashMap<QueryId, oneshot::Sender<()>>, pending_start_providing: HashMap<QueryId, oneshot::Sender<()>>,
pending_get_providers: HashMap<QueryId, oneshot::Sender<HashSet<PeerId>>>, pending_get_providers: HashMap<QueryId, oneshot::Sender<HashSet<PeerId>>>,
pending_request_file: pending_request_file:
HashMap<RequestId, oneshot::Sender<Result<String, Box<dyn Error + Send>>>>, HashMap<RequestId, oneshot::Sender<Result<Vec<u8>, Box<dyn Error + Send>>>>,
} }
impl EventLoop { impl EventLoop {
@ -476,7 +485,7 @@ mod network {
)) => {} )) => {}
SwarmEvent::NewListenAddr { address, .. } => { SwarmEvent::NewListenAddr { address, .. } => {
let local_peer_id = *self.swarm.local_peer_id(); let local_peer_id = *self.swarm.local_peer_id();
println!( eprintln!(
"Local node is listening on {:?}", "Local node is listening on {:?}",
address.with(Protocol::P2p(local_peer_id.into())) address.with(Protocol::P2p(local_peer_id.into()))
); );
@ -500,7 +509,7 @@ mod network {
} }
} }
SwarmEvent::IncomingConnectionError { .. } => {} SwarmEvent::IncomingConnectionError { .. } => {}
SwarmEvent::Dialing(peer_id) => println!("Dialing {}", peer_id), SwarmEvent::Dialing(peer_id) => eprintln!("Dialing {}", peer_id),
e => panic!("{:?}", e), e => panic!("{:?}", e),
} }
} }
@ -625,10 +634,10 @@ mod network {
RequestFile { RequestFile {
file_name: String, file_name: String,
peer: PeerId, peer: PeerId,
sender: oneshot::Sender<Result<String, Box<dyn Error + Send>>>, sender: oneshot::Sender<Result<Vec<u8>, Box<dyn Error + Send>>>,
}, },
RespondFile { RespondFile {
file: String, file: Vec<u8>,
channel: ResponseChannel<FileResponse>, channel: ResponseChannel<FileResponse>,
}, },
} }
@ -650,7 +659,7 @@ mod network {
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
struct FileRequest(String); struct FileRequest(String);
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
pub struct FileResponse(String); pub struct FileResponse(Vec<u8>);
impl ProtocolName for FileExchangeProtocol { impl ProtocolName for FileExchangeProtocol {
fn protocol_name(&self) -> &[u8] { fn protocol_name(&self) -> &[u8] {
@ -689,13 +698,13 @@ mod network {
where where
T: AsyncRead + Unpin + Send, T: AsyncRead + Unpin + Send,
{ {
let vec = read_length_prefixed(io, 1_000_000).await?; let vec = read_length_prefixed(io, 500_000_000).await?; // update transfer maximum
if vec.is_empty() { if vec.is_empty() {
return Err(io::ErrorKind::UnexpectedEof.into()); return Err(io::ErrorKind::UnexpectedEof.into());
} }
Ok(FileResponse(String::from_utf8(vec).unwrap())) Ok(FileResponse(vec))
} }
async fn write_request<T>( async fn write_request<T>(