// Copyright 2019 Parity Technologies (UK) Ltd. // // Permission is hereby granted, free of charge, to any person obtaining a // copy of this software and associated documentation files (the "Software"), // to deal in the Software without restriction, including without limitation // the rights to use, copy, modify, merge, publish, distribute, sublicense, // and/or sell copies of the Software, and to permit persons to whom the // Software is furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. use crate::error::PlainTextError; use crate::handshake::Remote; use bytes::BytesMut; use futures::future::{self, Ready}; use futures::prelude::*; use futures::{future::BoxFuture, Sink, Stream}; use futures_codec::Framed; use libp2p_core::{ identity, InboundUpgrade, OutboundUpgrade, UpgradeInfo, upgrade::Negotiated, PeerId, PublicKey, }; use log::debug; use rw_stream_sink::RwStreamSink; use std::{io, iter, pin::Pin, task::{Context, Poll}}; use unsigned_varint::codec::UviBytes; use void::Void; mod error; mod handshake; mod pb; /// `PlainText1Config` is an insecure connection handshake for testing purposes only. /// /// > **Note**: Given that `PlainText1Config` has no notion of exchanging peer identity information it is not compatible /// > with the `libp2p_core::transport::upgrade::Builder` pattern. See /// > [`PlainText2Config`](struct.PlainText2Config.html) if compatibility is needed. Even though not compatible with the /// > Builder pattern one can still do an upgrade *manually*: /// /// ``` /// # use libp2p_core::transport::{ Transport, memory::MemoryTransport }; /// # use libp2p_plaintext::PlainText1Config; /// # /// MemoryTransport::default() /// .and_then(move |io, endpoint| { /// libp2p_core::upgrade::apply( /// io, /// PlainText1Config{}, /// endpoint, /// libp2p_core::transport::upgrade::Version::V1, /// ) /// }) /// .map(|plaintext, _endpoint| { /// unimplemented!(); /// // let peer_id = somehow_derive_peer_id(); /// // return (peer_id, plaintext); /// }); /// ``` #[derive(Debug, Copy, Clone)] pub struct PlainText1Config; impl UpgradeInfo for PlainText1Config { type Info = &'static [u8]; type InfoIter = iter::Once; fn protocol_info(&self) -> Self::InfoIter { iter::once(b"/plaintext/1.0.0") } } impl InboundUpgrade for PlainText1Config { type Output = Negotiated; type Error = Void; type Future = Ready, Self::Error>>; fn upgrade_inbound(self, i: Negotiated, _: Self::Info) -> Self::Future { future::ready(Ok(i)) } } impl OutboundUpgrade for PlainText1Config { type Output = Negotiated; type Error = Void; type Future = Ready, Self::Error>>; fn upgrade_outbound(self, i: Negotiated, _: Self::Info) -> Self::Future { future::ready(Ok(i)) } } /// `PlainText2Config` is an insecure connection handshake for testing purposes only, implementing /// the libp2p plaintext connection handshake specification. #[derive(Clone)] pub struct PlainText2Config { pub local_public_key: identity::PublicKey, } impl UpgradeInfo for PlainText2Config { type Info = &'static [u8]; type InfoIter = iter::Once; fn protocol_info(&self) -> Self::InfoIter { iter::once(b"/plaintext/2.0.0") } } impl InboundUpgrade for PlainText2Config where C: AsyncRead + AsyncWrite + Send + Unpin + 'static { type Output = (PeerId, PlainTextOutput>); type Error = PlainTextError; type Future = BoxFuture<'static, Result>; fn upgrade_inbound(self, socket: Negotiated, _: Self::Info) -> Self::Future { Box::pin(self.handshake(socket)) } } impl OutboundUpgrade for PlainText2Config where C: AsyncRead + AsyncWrite + Send + Unpin + 'static { type Output = (PeerId, PlainTextOutput>); type Error = PlainTextError; type Future = BoxFuture<'static, Result>; fn upgrade_outbound(self, socket: Negotiated, _: Self::Info) -> Self::Future { Box::pin(self.handshake(socket)) } } impl PlainText2Config { async fn handshake(self, socket: T) -> Result<(PeerId, PlainTextOutput), PlainTextError> where T: AsyncRead + AsyncWrite + Send + Unpin + 'static { debug!("Starting plaintext upgrade"); let (stream_sink, remote) = PlainTextMiddleware::handshake(socket, self).await?; let mapped = stream_sink.map_err(map_err as fn(_) -> _); Ok(( remote.peer_id, PlainTextOutput { stream: RwStreamSink::new(mapped), remote_key: remote.public_key, } )) } } fn map_err(err: io::Error) -> io::Error { debug!("error during plaintext handshake {:?}", err); io::Error::new(io::ErrorKind::InvalidData, err) } pub struct PlainTextMiddleware { inner: Framed>, } impl PlainTextMiddleware where S: AsyncRead + AsyncWrite + Send + Unpin, { async fn handshake(socket: S, config: PlainText2Config) -> Result<(PlainTextMiddleware, Remote), PlainTextError> { let (inner, remote) = handshake::handshake(socket, config).await?; Ok((PlainTextMiddleware { inner }, remote)) } } impl Sink for PlainTextMiddleware where S: AsyncRead + AsyncWrite + Unpin, { type Error = io::Error; fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { Sink::poll_ready(Pin::new(&mut self.inner), cx) } fn start_send(mut self: Pin<&mut Self>, item: BytesMut) -> Result<(), Self::Error> { Sink::start_send(Pin::new(&mut self.inner), item) } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { Sink::poll_flush(Pin::new(&mut self.inner), cx) } fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { Sink::poll_close(Pin::new(&mut self.inner), cx) } } impl Stream for PlainTextMiddleware where S: AsyncRead + AsyncWrite + Unpin, { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Stream::poll_next(Pin::new(&mut self.inner), cx) } } /// Output of the plaintext protocol. pub struct PlainTextOutput where S: AsyncRead + AsyncWrite + Unpin, { /// The plaintext stream. pub stream: RwStreamSink, fn(io::Error) -> io::Error>>, /// The public key of the remote. pub remote_key: PublicKey, } impl AsyncRead for PlainTextOutput { fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context, buf: &mut [u8]) -> Poll> { AsyncRead::poll_read(Pin::new(&mut self.stream), cx, buf) } } impl AsyncWrite for PlainTextOutput { fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll> { AsyncWrite::poll_write(Pin::new(&mut self.stream), cx, buf) } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { AsyncWrite::poll_flush(Pin::new(&mut self.stream), cx) } fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { AsyncWrite::poll_close(Pin::new(&mut self.stream), cx) } }