From e45fb87a5f547e1b528d80e51bb69dbefe342fc3 Mon Sep 17 00:00:00 2001 From: Fredrik Date: Mon, 18 Sep 2017 16:52:51 +0200 Subject: [PATCH] First take on a working TCP transport --- Cargo.toml | 2 +- libp2p-tcp-transport/Cargo.toml | 11 +++ libp2p-tcp-transport/src/lib.rs | 147 ++++++++++++++++++++++++++++++++ libp2p-transport/Cargo.toml | 3 +- libp2p-transport/src/lib.rs | 18 ++-- 5 files changed, 171 insertions(+), 10 deletions(-) create mode 100644 libp2p-tcp-transport/Cargo.toml create mode 100644 libp2p-tcp-transport/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index e8ef317d..ec970444 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,4 +4,4 @@ version = "0.1.0" authors = ["Parity Technologies "] [workspace] -members = ["libp2p-transport", "libp2p-host"] \ No newline at end of file +members = ["libp2p-transport", "libp2p-host", "libp2p-tcp-transport"] diff --git a/libp2p-tcp-transport/Cargo.toml b/libp2p-tcp-transport/Cargo.toml new file mode 100644 index 00000000..0a198a0c --- /dev/null +++ b/libp2p-tcp-transport/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "libp2p-tcp-transport" +version = "0.1.0" +authors = ["Parity Technologies "] + +[dependencies] +libp2p-transport = { path = "../libp2p-transport" } +futures = "0.1" +multiaddr = "0.2.0" +tokio-core = "0.1" +tokio-io = "0.1" diff --git a/libp2p-tcp-transport/src/lib.rs b/libp2p-tcp-transport/src/lib.rs new file mode 100644 index 00000000..642c56c6 --- /dev/null +++ b/libp2p-tcp-transport/src/lib.rs @@ -0,0 +1,147 @@ +extern crate libp2p_transport as transport; +extern crate tokio_core; +extern crate tokio_io; +extern crate multiaddr; +extern crate futures; + +use std::io::Error as IoError; +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use tokio_core::reactor::Handle; +use tokio_core::net::{TcpStream, TcpListener}; +use futures::Future; +use futures::stream::Stream; +use multiaddr::{Multiaddr, Protocol}; +use transport::Transport; + +pub struct TCP; + +impl Transport for TCP { + /// The raw connection. + type RawConn = TcpStream; + + /// The listener produces incoming connections. + type Listener = Box>; + + /// A future which indicates currently dialing to a peer. + type Dial = Box>; + + /// Listen on the given multi-addr. + /// Returns the address back if it isn't supported. + fn listen_on(&mut self, handle: &Handle, addr: Multiaddr) -> Result { + if let Ok(socket_addr) = multiaddr_to_socketaddr(&addr) { + Ok(Box::new(futures::future::result(TcpListener::bind(&socket_addr, handle)).map(|listener| { + // Pull out a stream of sockets for incoming connections + listener.incoming().map(|x| x.0) + }).flatten_stream())) + } else { + Err(addr) + } + } + + /// Dial to the given multi-addr. + /// Returns either a future which may resolve to a connection, + /// or gives back the multiaddress. + fn dial(&mut self, handle: &Handle, addr: Multiaddr) -> Result { + if let Ok(socket_addr) = multiaddr_to_socketaddr(&addr) { + Ok(Box::new(TcpStream::connect(&socket_addr, handle))) + } else { + Err(addr) + } + } +} + +// This type of logic should probably be moved into the multiaddr package +fn multiaddr_to_socketaddr(addr: &Multiaddr) -> Result { + let protocols = addr.protocol(); + match (protocols[0], protocols[1]) { + (Protocol::IP4, Protocol::TCP) => { + let bs = addr.as_slice(); + Ok(SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(bs[1], bs[2], bs[3], bs[4])), + (bs[6] as u16) << 8 | bs[7] as u16 + )) + }, + (Protocol::IP6, Protocol::TCP) => { + let bs = addr.as_slice(); + if let Ok(Some(s)) = Protocol::IP6.bytes_to_string(&bs[1..17]) { + if let Ok(ipv6addr) = s.parse() { + return Ok(SocketAddr::new(IpAddr::V6(ipv6addr), (bs[18] as u16) << 8 | bs[19] as u16)) + } + } + Err(addr) + }, + _ => Err(addr), + } +} + +#[test] +fn multiaddr_to_tcp_conversion() { + use std::net::{Ipv6Addr}; + + assert_eq!( + multiaddr_to_socketaddr(&Multiaddr::new("/ip4/127.0.0.1/tcp/12345").unwrap()), + Ok(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 12345)) + ); + assert_eq!( + multiaddr_to_socketaddr(&Multiaddr::new("/ip4/255.255.255.255/tcp/8080").unwrap()), + Ok(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(255, 255, 255, 255)), 8080)) + ); + assert_eq!( + multiaddr_to_socketaddr(&Multiaddr::new("/ip6/::1/tcp/12345").unwrap()), + Ok(SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)), 12345)) + ); + assert_eq!( + multiaddr_to_socketaddr(&Multiaddr::new("/ip6/ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff/tcp/8080").unwrap()), + Ok(SocketAddr::new(IpAddr::V6(Ipv6Addr::new(65535, 65535, 65535, 65535, 65535, 65535, 65535, 65535)), 8080)) + ); +} + +#[test] +fn communicating_between_dialer_and_listener() { + use tokio_core::reactor::Core; + use std::io::Write; + + /// This thread is running the listener + /// while the main thread runs the dialer + std::thread::spawn(move || { + let addr = Multiaddr::new("/ip4/127.0.0.1/tcp/12345").unwrap(); + let mut ev_loop = Core::new().unwrap(); + let handle = ev_loop.handle(); + let server = TCP.listen_on(&handle, addr).unwrap().for_each(|sock| { + // Define what to do with the socket that just connected to us + // Which in this case is read 3 bytes + let handle_conn = tokio_io::io::read_exact(sock, [0; 3]).map(|(_, buf)| { + println!("Actually read {:?}", buf); + assert_eq!(buf, [1,2,3]) + }).map_err(|err| { + panic!("IO error {:?}", err) + }); + + // Spawn the future as a concurrent task + handle.spawn(handle_conn); + + Ok(()) + }); + + // Spin up the server on the event loop + ev_loop.run(server).unwrap(); + }); + let addr = Multiaddr::new("/ip4/127.0.0.1/tcp/12345").unwrap(); + let mut ev_loop = Core::new().unwrap(); + let handle = ev_loop.handle(); + // Obtain a future socket through dialing + let socket = TCP.dial(&handle, addr.clone()).unwrap(); + // Define what to do with the socket once it's obtained + let action = socket.then(|sock| { + match sock { + Ok(mut s) => { + let written = s.write(&[0x1,0x2,0x3]).unwrap(); + Ok(written) + } + Err(x) => Err(x) + } + }); + // Execute the future in our event loop + ev_loop.run(action).unwrap(); + std::thread::sleep(std::time::Duration::from_millis(1000)); +} diff --git a/libp2p-transport/Cargo.toml b/libp2p-transport/Cargo.toml index 9cdc337f..75990294 100644 --- a/libp2p-transport/Cargo.toml +++ b/libp2p-transport/Cargo.toml @@ -6,4 +6,5 @@ authors = ["Parity Technologies "] [dependencies] multiaddr = "0.2.0" futures = "0.1" -tokio-io = "0.1" \ No newline at end of file +tokio-io = "0.1" +tokio-core = "0.1" diff --git a/libp2p-transport/src/lib.rs b/libp2p-transport/src/lib.rs index 0f1ea92a..38195e20 100644 --- a/libp2p-transport/src/lib.rs +++ b/libp2p-transport/src/lib.rs @@ -2,13 +2,17 @@ extern crate futures; extern crate tokio_io; +extern crate tokio_core; /// Multi-address re-export. pub extern crate multiaddr; -use futures::*; +use multiaddr::Multiaddr; +use futures::{IntoFuture, Future}; +use futures::stream::Stream; use std::io::Error as IoError; use tokio_io::{AsyncRead, AsyncWrite}; +use tokio_core::reactor::Handle; // Something more strongly-typed? pub type ProtocolId = String; @@ -17,7 +21,7 @@ pub type PeerId = String; /// A logical wire between us and a peer. We can read and write through this asynchronously. /// /// You can have multiple `Socket`s between you and any given peer. -pub trait Socket: AsyncRead + AsyncWrite { +pub trait Socket: AsyncRead + AsyncWrite { /// Get the protocol ID this socket uses. fn protocol_id(&self) -> ProtocolId; @@ -31,11 +35,9 @@ pub trait Conn { type Socket; /// Initiate a socket between you and the peer on the given protocol. - fn make_socket(&self, proto: ProtocolId) -> BoxFuture; + fn make_socket(&self, proto: ProtocolId) -> Box>; } -pub struct MultiAddr; // stub for multiaddr crate type. - /// A transport is a stream producing incoming connections. /// These are transports or wrappers around them. pub trait Transport { @@ -50,10 +52,10 @@ pub trait Transport { /// Listen on the given multi-addr. /// Returns the address back if it isn't supported. - fn listen_on(&mut self, addr: MultiAddr) -> Result; + fn listen_on(&mut self, handle: &Handle, addr: Multiaddr) -> Result; /// Dial to the given multi-addr. /// Returns either a future which may resolve to a connection, /// or gives back the multiaddress. - fn dial(&mut self, addr: MultiAddr) -> Result; -} \ No newline at end of file + fn dial(&mut self, handle: &Handle, addr: Multiaddr) -> Result; +}