// Copyright 2019 Parity Technologies (UK) Ltd. // // Permission is hereby granted, free of charge, to any person obtaining a // copy of this software and associated documentation files (the "Software"), // to deal in the Software without restriction, including without limitation // the rights to use, copy, modify, merge, publish, distribute, sublicense, // and/or sell copies of the Software, and to permit persons to whom the // Software is furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. use futures::future::{self, Ready}; use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, upgrade::Negotiated}; use std::iter; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_io::codec::length_delimited::Framed; use crate::error::PlainTextError; use void::Void; use futures::future::FutureResult; use crate::handshake::Remote; mod error; mod handshake; mod pb; #[derive(Debug, Copy, Clone)] pub struct PlainText1Config; impl UpgradeInfo for PlainText1Config { type Info = &'static [u8]; type InfoIter = iter::Once; fn protocol_info(&self) -> Self::InfoIter { iter::once(b"/plaintext/1.0.0") } } impl InboundUpgrade for PlainText1Config { type Output = Negotiated; type Error = Void; type Future = Ready, Self::Error>>; fn upgrade_inbound(self, i: Negotiated, _: Self::Info) -> Self::Future { future::ready(Ok(i)) } } impl OutboundUpgrade for PlainText1Config { type Output = Negotiated; type Error = Void; type Future = Ready, Self::Error>>; fn upgrade_outbound(self, i: Negotiated, _: Self::Info) -> Self::Future { future::ready(Ok(i)) } } #[derive(Clone)] pub struct PlainText2Config { pub local_public_key: identity::PublicKey, } impl UpgradeInfo for PlainText2Config { type Info = &'static [u8]; type InfoIter = iter::Once; fn protocol_info(&self) -> Self::InfoIter { iter::once(b"/plaintext/2.0.0") } } impl InboundUpgrade for PlainText2Config where C: AsyncRead + AsyncWrite + Send + 'static { type Output = (PeerId, PlainTextOutput>); type Error = PlainTextError; type Future = Box + Send>; fn upgrade_inbound(self, socket: Negotiated, _: Self::Info) -> Self::Future { Box::new(self.handshake(socket)) } } impl OutboundUpgrade for PlainText2Config where C: AsyncRead + AsyncWrite + Send + 'static { type Output = (PeerId, PlainTextOutput>); type Error = PlainTextError; type Future = Box + Send>; fn upgrade_outbound(self, socket: Negotiated, _: Self::Info) -> Self::Future { Box::new(self.handshake(socket)) } } impl PlainText2Config { fn handshake(self, socket: T) -> impl Future), Error = PlainTextError> where T: AsyncRead + AsyncWrite + Send + 'static { debug!("Starting plaintext upgrade"); PlainTextMiddleware::handshake(socket, self) .map(|(stream_sink, remote)| { let mapped = stream_sink.map_err(map_err as fn(_) -> _); ( remote.peer_id, PlainTextOutput { stream: RwStreamSink::new(mapped), remote_key: remote.public_key, } ) }) } } #[inline] fn map_err(err: io::Error) -> io::Error { debug!("error during plaintext handshake {:?}", err); io::Error::new(io::ErrorKind::InvalidData, err) } pub struct PlainTextMiddleware { inner: Framed, } impl PlainTextMiddleware where S: AsyncRead + AsyncWrite + Send, { fn handshake(socket: S, config: PlainText2Config) -> impl Future, Remote), Error = PlainTextError> { handshake::handshake(socket, config).map(|(inner, remote)| { (PlainTextMiddleware { inner }, remote) }) } } impl Sink for PlainTextMiddleware where S: AsyncRead + AsyncWrite, { type SinkItem = BytesMut; type SinkError = io::Error; #[inline] fn start_send(&mut self, item: Self::SinkItem) -> StartSend { self.inner.start_send(item) } #[inline] fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { self.inner.poll_complete() } #[inline] fn close(&mut self) -> Poll<(), Self::SinkError> { self.inner.close() } } impl Stream for PlainTextMiddleware where S: AsyncRead + AsyncWrite, { type Item = BytesMut; type Error = io::Error; #[inline] fn poll(&mut self) -> Poll, Self::Error> { self.inner.poll() } } /// Output of the plaintext protocol. pub struct PlainTextOutput where S: AsyncRead + AsyncWrite, { /// The plaintext stream. pub stream: RwStreamSink, fn(io::Error) -> io::Error>>, /// The public key of the remote. pub remote_key: PublicKey, } impl std::io::Read for PlainTextOutput { fn read(&mut self, buf: &mut [u8]) -> std::io::Result { self.stream.read(buf) } } impl AsyncRead for PlainTextOutput { unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { self.stream.prepare_uninitialized_buffer(buf) } } impl std::io::Write for PlainTextOutput { fn write(&mut self, buf: &[u8]) -> std::io::Result { self.stream.write(buf) } fn flush(&mut self) -> std::io::Result<()> { self.stream.flush() } } impl AsyncWrite for PlainTextOutput { fn shutdown(&mut self) -> Poll<(), io::Error> { self.stream.shutdown() } }