From f5e7461cecb0a7f292af594b9d2bc965e215a93d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=BCdiger=20Klaehn?= Date: Tue, 28 Jan 2020 13:22:09 +0100 Subject: [PATCH] Implement private networks (#1385) * Add pnet protocol copied from plaintext protocol, since that seems to be the closest match * Minimalize the pnet protocol * WIP private swarms with fixed key * Different nonces for write and read * Use per stream write buffer to avoid allocations * Add parsing and formating of PSKs * Directly call handshake Also remove unneeded InboundUpgrade and OutboundUpgrade * Add HandshakeError * Add dedicated pnet example * Add tests for PSK parsing and formatting * Some more tests for the parsing, fail case * Add fingerprint To be able to check if a go-ipfs and rust-libp2p use the same key without having to dump the actual key. Not sure if there is a spec for this anywhere, but it is basically just copied from go-ipfs. * Minimize dependencies and remove dead code * Rename PSK to PreSharedKey and use pin_project * Add crypt_writer Basically a stripped down and modified version of async_std BufWriter that also encrypts using the given cipher. * cargo fmt * Actually get rid of the Unpin requirement * Rewrite flushing and remove written count from state * Add docs for pnet/lib.rs * Increase library version * Remove pnet example There will be a more elaborate and useful example in a different PR * Return pending on pending... also make doc text less ambiguous * Add debug assertions to check invariants of poll_flush_buf Also, clarify the invariants in the comments of that method --- Cargo.toml | 1 + protocols/pnet/Cargo.toml | 24 ++ protocols/pnet/src/crypt_writer.rs | 153 +++++++++++++ protocols/pnet/src/lib.rs | 349 +++++++++++++++++++++++++++++ src/lib.rs | 21 ++ 5 files changed, 548 insertions(+) create mode 100644 protocols/pnet/Cargo.toml create mode 100644 protocols/pnet/src/crypt_writer.rs create mode 100644 protocols/pnet/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index b41eb074..7b1f4c57 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ libp2p-floodsub = { version = "0.15.0", path = "protocols/floodsub" } libp2p-gossipsub = { version = "0.15.0", path = "./protocols/gossipsub" } libp2p-ping = { version = "0.15.0", path = "protocols/ping" } libp2p-plaintext = { version = "0.15.0", path = "protocols/plaintext" } +libp2p-pnet = { version = "0.15.0", path = "protocols/pnet" } libp2p-core = { version = "0.15.0", path = "core" } libp2p-core-derive = { version = "0.15.0", path = "misc/core-derive" } libp2p-secio = { version = "0.15.0", path = "protocols/secio", default-features = false } diff --git a/protocols/pnet/Cargo.toml b/protocols/pnet/Cargo.toml new file mode 100644 index 00000000..d9f15f68 --- /dev/null +++ b/protocols/pnet/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "libp2p-pnet" +edition = "2018" +description = "Private swarm support for libp2p" +version = "0.15.0" +authors = ["Parity Technologies "] +license = "MIT" +repository = "https://github.com/libp2p/rust-libp2p" +keywords = ["peer-to-peer", "libp2p", "networking"] +categories = ["network-programming", "asynchronous"] + +[dependencies] +futures = "0.3.1" +libp2p-core = { version = "0.15.0", path = "../../core" } +log = "0.4.8" +salsa20 = "0.3.0" +sha3 = "0.8" +rand = "0.7" +pin-project = "0.4.6" + +[dev-dependencies] +env_logger = "0.7.1" +quickcheck = "0.9.0" +futures-timer = "2.0" diff --git a/protocols/pnet/src/crypt_writer.rs b/protocols/pnet/src/crypt_writer.rs new file mode 100644 index 00000000..96a798a3 --- /dev/null +++ b/protocols/pnet/src/crypt_writer.rs @@ -0,0 +1,153 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use futures::{ + io::{self, AsyncWrite}, + ready, + task::{Context, Poll}, +}; +use log::trace; +use pin_project::pin_project; +use salsa20::{stream_cipher::SyncStreamCipher, XSalsa20}; +use std::{fmt, pin::Pin}; + +/// A writer that encrypts and forwards to an inner writer +#[pin_project] +pub struct CryptWriter { + #[pin] + inner: W, + buf: Vec, + cipher: XSalsa20, +} + +impl CryptWriter { + /// Creates a new `CryptWriter` with the specified buffer capacity. + pub fn with_capacity(capacity: usize, inner: W, cipher: XSalsa20) -> CryptWriter { + CryptWriter { + inner, + buf: Vec::with_capacity(capacity), + cipher, + } + } + + /// Gets a pinned mutable reference to the inner writer. + /// + /// It is inadvisable to directly write to the inner writer. + pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W> { + self.project().inner + } +} + +/// Write the contents of a Vec into an AsyncWrite. +/// +/// The handling 0 byte progress and the Interrupted error was taken from BufWriter in async_std. +/// +/// If this fn returns Ready(Ok(())), the buffer has been completely flushed and is empty. +fn poll_flush_buf( + inner: &mut Pin<&mut W>, + buf: &mut Vec, + cx: &mut Context<'_>, +) -> Poll> { + let mut ret = Poll::Ready(Ok(())); + let mut written = 0; + let len = buf.len(); + while written < len { + match inner.as_mut().poll_write(cx, &buf[written..]) { + Poll::Ready(Ok(n)) => { + if n > 0 { + // we made progress, so try again + written += n; + } else { + // we got Ok but got no progress whatsoever, so bail out so we don't spin writing 0 bytes. + ret = Poll::Ready(Err(io::Error::new( + io::ErrorKind::WriteZero, + "Failed to write buffered data", + ))); + break; + } + } + Poll::Ready(Err(e)) => { + // Interrupted is the only error that we consider to be recoverable by trying again + if e.kind() != io::ErrorKind::Interrupted { + // for any other error, don't try again + ret = Poll::Ready(Err(e)); + break; + } + } + Poll::Pending => { + ret = Poll::Pending; + break; + } + } + } + if written > 0 { + buf.drain(..written); + } + if let Poll::Ready(Ok(())) = ret { debug_assert!(buf.is_empty()); } + ret +} + +impl AsyncWrite for CryptWriter { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + let mut this = self.project(); + // completely flush the buffer, returning pending if not possible + ready!(poll_flush_buf(&mut this.inner, this.buf, cx))?; + // if we get here, the buffer is empty + debug_assert!(this.buf.is_empty()); + let res = Pin::new(&mut *this.buf).poll_write(cx, buf); + if let Poll::Ready(Ok(count)) = res { + this.cipher.apply_keystream(&mut this.buf[0..count]); + trace!("encrypted {} bytes", count); + } else { + debug_assert!(false); + }; + // flush immediately afterwards, but if we get a pending we don't care + if let Poll::Ready(Err(e)) = poll_flush_buf(&mut this.inner, this.buf, cx) { + Poll::Ready(Err(e)) + } else { + res + } + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + ready!(poll_flush_buf(&mut this.inner, this.buf, cx))?; + this.inner.poll_flush(cx) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + ready!(poll_flush_buf(&mut this.inner, this.buf, cx))?; + this.inner.poll_close(cx) + } +} + +impl fmt::Debug for CryptWriter { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("CryptWriter") + .field("writer", &self.inner) + .field("buf", &self.buf) + .finish() + } +} diff --git a/protocols/pnet/src/lib.rs b/protocols/pnet/src/lib.rs new file mode 100644 index 00000000..2b61b296 --- /dev/null +++ b/protocols/pnet/src/lib.rs @@ -0,0 +1,349 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! The `pnet` protocol implements *Pre-shared Key Based Private Networks in libp2p*, +//! as specified in [the spec](https://github.com/libp2p/specs/blob/master/pnet/Private-Networks-PSK-V1.md) +//! +//! Libp2p nodes configured with a pre-shared key can only communicate with other nodes with +//! the same key. +mod crypt_writer; +use crypt_writer::CryptWriter; +use futures::prelude::*; +use log::trace; +use pin_project::pin_project; +use rand::RngCore; +use salsa20::{ + stream_cipher::{NewStreamCipher, SyncStreamCipher}, + Salsa20, XSalsa20, +}; +use sha3::{digest::ExtendableOutput, Shake128}; +use std::{ + error, + fmt::{self, Write}, + io, + io::Error as IoError, + num::ParseIntError, + pin::Pin, + str::FromStr, + task::{Context, Poll}, +}; + +const KEY_SIZE: usize = 32; +const NONCE_SIZE: usize = 24; +const WRITE_BUFFER_SIZE: usize = 1024; +const FINGERPRINT_SIZE: usize = 16; + +/// A pre-shared key, consisting of 32 bytes of random data. +#[derive(Copy, Clone, PartialEq, Eq)] +pub struct PreSharedKey([u8; KEY_SIZE]); + +impl PreSharedKey { + /// Compute PreSharedKey fingerprint identical to the go-libp2p fingerprint. + /// The computation of the fingerprint is not specified in the spec. + /// + /// This provides a way to check that private keys are properly configured + /// without dumping the key itself to the console. + pub fn fingerprint(&self) -> Fingerprint { + use std::io::{Read, Write}; + let mut enc = [0u8; 64]; + let nonce: [u8; 8] = *b"finprint"; + let mut out = [0u8; 16]; + let mut cipher = Salsa20::new(&self.0.into(), &nonce.into()); + cipher.apply_keystream(&mut enc); + let mut hasher = Shake128::default(); + hasher.write_all(&enc).expect("shake128 failed"); + hasher.xof_result().read(&mut out).expect("shake128 failed"); + Fingerprint(out) + } +} + +fn parse_hex_key(s: &str) -> Result<[u8; KEY_SIZE], KeyParseError> { + if s.len() == KEY_SIZE * 2 { + let mut r = [0u8; KEY_SIZE]; + for i in 0..KEY_SIZE { + r[i] = u8::from_str_radix(&s[i * 2..i * 2 + 2], 16) + .map_err(KeyParseError::InvalidKeyChar)?; + } + Ok(r) + } else { + Err(KeyParseError::InvalidKeyLength) + } +} + +fn to_hex(bytes: &[u8]) -> String { + let mut hex = String::with_capacity(bytes.len() * 2); + + for byte in bytes { + write!(hex, "{:02x}", byte).expect("Can't fail on writing to string"); + } + + hex +} + +/// Parses a PreSharedKey from a key file +/// +/// currently supports only base16 encoding. +impl FromStr for PreSharedKey { + type Err = KeyParseError; + + fn from_str(s: &str) -> Result { + if let &[keytype, encoding, key] = s.lines().take(3).collect::>().as_slice() { + if keytype != "/key/swarm/psk/1.0.0/" { + return Err(KeyParseError::InvalidKeyType); + } + if encoding != "/base16/" { + return Err(KeyParseError::InvalidKeyEncoding); + } + parse_hex_key(key.trim_end()).map(PreSharedKey) + } else { + Err(KeyParseError::InvalidKeyFile) + } + } +} + +impl fmt::Debug for PreSharedKey { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_tuple("PreSharedKey") + .field(&to_hex(&self.0)) + .finish() + } +} + +/// Dumps a PreSharedKey in key file format compatible with go-libp2p +impl fmt::Display for PreSharedKey { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + writeln!(f, "/key/swarm/psk/1.0.0/")?; + writeln!(f, "/base16/")?; + writeln!(f, "{}", to_hex(&self.0)) + } +} + +/// A PreSharedKey fingerprint computed from a PreSharedKey +#[derive(Copy, Clone, PartialEq, Eq)] +pub struct Fingerprint([u8; FINGERPRINT_SIZE]); + +/// Dumps the fingerprint as hex +impl fmt::Display for Fingerprint { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", to_hex(&self.0)) + } +} + +/// Error when parsing a PreSharedKey +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum KeyParseError { + /// file does not have the expected structure + InvalidKeyFile, + /// unsupported key type + InvalidKeyType, + /// unsupported key encoding. Currently only base16 is supported + InvalidKeyEncoding, + /// Key is of the wrong length + InvalidKeyLength, + /// key string contains a char that is not consistent with the specified encoding + InvalidKeyChar(ParseIntError), +} + +/// Private network configuration +#[derive(Debug, Copy, Clone)] +pub struct PnetConfig { + /// the PreSharedKey to use for encryption + key: PreSharedKey, +} +impl PnetConfig { + pub fn new(key: PreSharedKey) -> Self { + Self { key } + } + + /// upgrade a connection to use pre shared key encryption. + /// + /// the upgrade works by both sides exchanging 24 byte nonces and then encrypting + /// subsequent traffic with XSalsa20 + pub async fn handshake( + self, + mut socket: TSocket, + ) -> Result, PnetError> + where + TSocket: AsyncRead + AsyncWrite + Send + Unpin + 'static, + { + trace!("exchanging nonces"); + let mut local_nonce = [0u8; NONCE_SIZE]; + let mut remote_nonce = [0u8; NONCE_SIZE]; + rand::thread_rng().fill_bytes(&mut local_nonce); + socket + .write_all(&local_nonce) + .await + .map_err(PnetError::HandshakeError)?; + socket + .read_exact(&mut remote_nonce) + .await + .map_err(PnetError::HandshakeError)?; + trace!("setting up ciphers"); + let write_cipher = XSalsa20::new(&self.key.0.into(), &local_nonce.into()); + let read_cipher = XSalsa20::new(&self.key.0.into(), &remote_nonce.into()); + Ok(PnetOutput::new(socket, write_cipher, read_cipher)) + } +} + +/// The result of a handshake. This implements AsyncRead and AsyncWrite and can therefore +/// be used as base for additional upgrades. +#[pin_project] +pub struct PnetOutput { + #[pin] + inner: CryptWriter, + read_cipher: XSalsa20, +} + +impl PnetOutput { + fn new(inner: S, write_cipher: XSalsa20, read_cipher: XSalsa20) -> Self { + Self { + inner: CryptWriter::with_capacity(WRITE_BUFFER_SIZE, inner, write_cipher), + read_cipher, + } + } +} + +impl AsyncRead for PnetOutput { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &mut [u8], + ) -> Poll> { + let this = self.project(); + let result = this.inner.get_pin_mut().poll_read(cx, buf); + if let Poll::Ready(Ok(size)) = &result { + trace!("read {} bytes", size); + this.read_cipher.apply_keystream(&mut buf[..*size]); + trace!("decrypted {} bytes", size); + } + result + } +} + +impl AsyncWrite for PnetOutput { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &[u8], + ) -> Poll> { + self.project().inner.poll_write(cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + self.project().inner.poll_flush(cx) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + self.project().inner.poll_close(cx) + } +} + +/// Error when writing or reading private swarms +#[derive(Debug)] +pub enum PnetError { + /// Error during handshake. + HandshakeError(IoError), + /// I/O error. + IoError(IoError), +} + +impl From for PnetError { + #[inline] + fn from(err: IoError) -> PnetError { + PnetError::IoError(err) + } +} + +impl error::Error for PnetError { + fn cause(&self) -> Option<&dyn error::Error> { + match *self { + PnetError::HandshakeError(ref err) => Some(err), + PnetError::IoError(ref err) => Some(err), + } + } +} + +impl fmt::Display for PnetError { + #[inline] + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { + match self { + PnetError::HandshakeError(e) => write!(f, "Handshake error: {}", e), + PnetError::IoError(e) => write!(f, "I/O error: {}", e), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use quickcheck::*; + + impl Arbitrary for PreSharedKey { + fn arbitrary(g: &mut G) -> PreSharedKey { + let mut key = [0; KEY_SIZE]; + g.fill_bytes(&mut key); + PreSharedKey(key) + } + } + + #[test] + fn psk_tostring_parse() { + fn prop(key: PreSharedKey) -> bool { + let text = key.to_string(); + text.parse::() + .map(|res| res == key) + .unwrap_or(false) + } + QuickCheck::new() + .tests(10) + .quickcheck(prop as fn(PreSharedKey) -> _); + } + + #[test] + fn psk_parse_failure() { + use KeyParseError::*; + assert_eq!("".parse::().unwrap_err(), InvalidKeyFile); + assert_eq!( + "a\nb\nc".parse::().unwrap_err(), + InvalidKeyType + ); + assert_eq!( + "/key/swarm/psk/1.0.0/\nx\ny" + .parse::() + .unwrap_err(), + InvalidKeyEncoding + ); + assert_eq!( + "/key/swarm/psk/1.0.0/\n/base16/\ny" + .parse::() + .unwrap_err(), + InvalidKeyLength + ); + } + + #[test] + fn fingerprint() { + // checked against go-ipfs output + let key = "/key/swarm/psk/1.0.0/\n/base16/\n6189c5cf0b87fb800c1a9feeda73c6ab5e998db48fb9e6a978575c770ceef683".parse::().unwrap(); + let expected = "45fc986bbc9388a11d939df26f730f0c"; + let actual = key.fingerprint().to_string(); + assert_eq!(expected, actual); + } +} diff --git a/src/lib.rs b/src/lib.rs index 2d762a80..94bf747c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -152,6 +152,7 @@ #![doc(html_logo_url = "https://libp2p.io/img/logo_small.png")] #![doc(html_favicon_url = "https://libp2p.io/img/favicon.png")] +use libp2p_pnet::{PnetConfig, PreSharedKey}; pub use bytes; pub use futures; #[doc(inline)] @@ -203,6 +204,8 @@ pub use libp2p_wasm_ext as wasm_ext; pub use libp2p_websocket as websocket; #[doc(inline)] pub use libp2p_yamux as yamux; +#[doc(inline)] +pub use libp2p_pnet as pnet; mod transport_ext; @@ -251,6 +254,24 @@ pub fn build_tcp_ws_secio_mplex_yamux(keypair: identity::Keypair) .timeout(Duration::from_secs(20))) } +/// Builds an implementation of `Transport` that is suitable for usage with the `Swarm`. +/// +/// The implementation supports TCP/IP, WebSockets over TCP/IP, secio as the encryption layer, +/// and mplex or yamux as the multiplexing layer. +/// +/// > **Note**: If you ever need to express the type of this `Transport`. +pub fn build_tcp_ws_pnet_secio_mplex_yamux(keypair: identity::Keypair, psk: PreSharedKey) + -> io::Result> + Send + Sync), Error = impl error::Error + Send, Listener = impl Send, Dial = impl Send, ListenerUpgrade = impl Send> + Clone> +{ + Ok(CommonTransport::new()? + .and_then(move |socket, _| PnetConfig::new(psk).handshake(socket)) + .upgrade(core::upgrade::Version::V1) + .authenticate(secio::SecioConfig::new(keypair)) + .multiplex(core::upgrade::SelectUpgrade::new(yamux::Config::default(), mplex::MplexConfig::new())) + .map(|(peer, muxer), _| (peer, core::muxing::StreamMuxerBox::new(muxer))) + .timeout(Duration::from_secs(20))) +} + /// Implementation of `Transport` that supports the most common protocols. /// /// The list currently is TCP/IP, DNS, and WebSockets. However this list could change in the