From 384d15e24a311981d08aebbef92e42eccf369ad7 Mon Sep 17 00:00:00 2001 From: Vurich Date: Mon, 23 Oct 2017 11:45:35 +0200 Subject: [PATCH] Cleanup and remove unnecessary trait objects --- libp2p-host/src/lib.rs | 48 +++--- libp2p-tcp-transport/src/lib.rs | 276 +++++++++++++++++--------------- libp2p-transport/src/lib.rs | 13 +- 3 files changed, 187 insertions(+), 150 deletions(-) diff --git a/libp2p-host/src/lib.rs b/libp2p-host/src/lib.rs index fd09b1f6..652a0697 100644 --- a/libp2p-host/src/lib.rs +++ b/libp2p-host/src/lib.rs @@ -3,39 +3,45 @@ extern crate futures; extern crate libp2p_transport as transport; -use futures::{Future, IntoFuture, BoxFuture}; -use transport::{ProtocolId, MultiAddr, Socket}; +use futures::{Future, IntoFuture}; +use transport::{ProtocolId, Socket}; +use transport::multiaddr::Multiaddr; /// Produces a future for each incoming `Socket`. pub trait Handler { - type Future: IntoFuture; + type Future: IntoFuture; - /// Handle the incoming socket, producing a future which should resolve + /// Handle the incoming socket, producing a future which should resolve /// when the handler is finished. fn handle(&self, socket: S) -> Self::Future; - fn boxed(self) -> BoxHandler where - Self: Sized + Send + 'static, - ::Future: Send + 'static + fn boxed(self) -> BoxHandler + where + Self: Sized + Send + 'static, + ::Future: Send + 'static, { - BoxHandler(Box::new(move |socket| - self.handle(socket).into_future().boxed() - )) + BoxHandler(Box::new(move |socket| { + Box::new(self.handle(socket).into_future()) as _ + })) } } -impl Handler for F - where F: Fn(S) -> U, U: IntoFuture +impl Handler for F +where + F: Fn(S) -> U, + U: IntoFuture, { type Future = U; - fn handle(&self, socket: S) -> U { (self)(socket) } + fn handle(&self, socket: S) -> U { + (self)(socket) + } } /// A boxed handler. -pub struct BoxHandler(Box>>); +pub struct BoxHandler(Box>>>); impl Handler for BoxHandler { - type Future = BoxFuture<(), ()>; + type Future = Box>; fn handle(&self, socket: S) -> Self::Future { self.0.handle(socket) @@ -49,7 +55,7 @@ pub trait Mux: Sync { /// Attach an incoming socket. fn push(&self, socket: Self::Socket); - + /// Set the socket handler for a given protocol id. fn set_handler(&self, proto: ProtocolId, handler: BoxHandler); @@ -59,19 +65,21 @@ pub trait Mux: Sync { /// Unimplemented. Maps peer IDs to connected addresses, protocols, and data. pub trait PeerStore {} - + /// This is a common abstraction over the low-level bits of libp2p. /// /// It handles connecting over, adding and removing transports, /// wraps an arbitrary event loop, and manages protocol IDs. pub trait Host { type Socket: Socket; + type Mux: Mux; + type Multiaddrs: IntoIterator; /// Get a handle to the peer store. fn peer_store(&self) -> &PeerStore; /// Get a handle to the underlying muxer. - fn mux(&self) -> &Mux; + fn mux(&self) -> &Self::Mux; /// Set the socket handler for a given protocol id. fn set_handler(&self, proto: ProtocolId, handler: BoxHandler) { @@ -84,5 +92,5 @@ pub trait Host { } /// Addresses we're listening on. - fn listen_addrs(&self) -> Vec; -} \ No newline at end of file + fn listen_addrs(&self) -> Self::Multiaddrs; +} diff --git a/libp2p-tcp-transport/src/lib.rs b/libp2p-tcp-transport/src/lib.rs index a2986d59..4846de33 100644 --- a/libp2p-tcp-transport/src/lib.rs +++ b/libp2p-tcp-transport/src/lib.rs @@ -14,153 +14,179 @@ use multiaddr::{Multiaddr, Protocol}; use transport::Transport; pub struct Tcp { - pub event_loop: Core, + pub event_loop: Core, } impl Tcp { - pub fn new() -> Result { - Ok(Tcp { - event_loop: Core::new()?, - }) - } + pub fn new() -> Result { + Ok(Tcp { event_loop: Core::new()? }) + } } impl Transport for Tcp { - /// The raw connection. - type RawConn = TcpStream; + /// The raw connection. + type RawConn = TcpStream; - /// The listener produces incoming connections. - type Listener = Box>; + /// The listener produces incoming connections. + type Listener = Box>; - /// A future which indicates currently dialing to a peer. - type Dial = TcpStreamNew; + /// A future which indicates currently dialing to a peer. + type Dial = TcpStreamNew; - /// Listen on the given multi-addr. - /// Returns the address back if it isn't supported. - fn listen_on(&mut self, addr: Multiaddr) -> Result { - if let Ok(socket_addr) = multiaddr_to_socketaddr(&addr) { - Ok(Box::new(futures::future::result(TcpListener::bind(&socket_addr, &self.event_loop.handle())).map(|listener| { - // Pull out a stream of sockets for incoming connections - listener.incoming().map(|x| x.0) - }).flatten_stream())) - } else { - Err(addr) - } - } + /// Listen on the given multi-addr. + /// Returns the address back if it isn't supported. + fn listen_on(&mut self, addr: Multiaddr) -> Result { + if let Ok(socket_addr) = multiaddr_to_socketaddr(&addr) { + Ok(Box::new( + futures::future::result( + TcpListener::bind(&socket_addr, &self.event_loop.handle()), + ).map(|listener| { + // Pull out a stream of sockets for incoming connections + listener.incoming().map(|x| x.0) + }) + .flatten_stream(), + )) + } else { + Err(addr) + } + } - /// Dial to the given multi-addr. - /// Returns either a future which may resolve to a connection, - /// or gives back the multiaddress. - fn dial(&mut self, addr: Multiaddr) -> Result { - if let Ok(socket_addr) = multiaddr_to_socketaddr(&addr) { - Ok(TcpStream::connect(&socket_addr, &self.event_loop.handle())) - } else { - Err(addr) - } - } + /// Dial to the given multi-addr. + /// Returns either a future which may resolve to a connection, + /// or gives back the multiaddress. + fn dial(&mut self, addr: Multiaddr) -> Result { + if let Ok(socket_addr) = multiaddr_to_socketaddr(&addr) { + Ok(TcpStream::connect(&socket_addr, &self.event_loop.handle())) + } else { + Err(addr) + } + } } // This type of logic should probably be moved into the multiaddr package -fn multiaddr_to_socketaddr(addr: &Multiaddr) -> Result { - let protocols = addr.protocol(); - match (protocols[0], protocols[1]) { - (Protocol::IP4, Protocol::TCP) => { - let bs = addr.as_slice(); - Ok(SocketAddr::new( - IpAddr::V4(Ipv4Addr::new(bs[1], bs[2], bs[3], bs[4])), - (bs[6] as u16) << 8 | bs[7] as u16 - )) - }, - (Protocol::IP6, Protocol::TCP) => { - let bs = addr.as_slice(); - if let Ok(Some(s)) = Protocol::IP6.bytes_to_string(&bs[1..17]) { - if let Ok(ipv6addr) = s.parse() { - return Ok(SocketAddr::new(IpAddr::V6(ipv6addr), (bs[18] as u16) << 8 | bs[19] as u16)) - } - } - Err(addr) - }, - _ => Err(addr), - } +fn multiaddr_to_socketaddr(addr: &Multiaddr) -> Result { + let protocols = addr.protocol(); + + // TODO: This is nonconforming (since a multiaddr could specify TCP first) but we can't fix that + // until multiaddrs-rs is improved. + match (protocols[0], protocols[1]) { + (Protocol::IP4, Protocol::TCP) => { + let bs = addr.as_slice(); + Ok(SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(bs[1], bs[2], bs[3], bs[4])), + (bs[6] as u16) << 8 | bs[7] as u16, + )) + } + (Protocol::IP6, Protocol::TCP) => { + let bs = addr.as_slice(); + if let Ok(Some(s)) = Protocol::IP6.bytes_to_string(&bs[1..17]) { + if let Ok(ipv6addr) = s.parse() { + return Ok(SocketAddr::new( + IpAddr::V6(ipv6addr), + (bs[18] as u16) << 8 | bs[19] as u16, + )); + } + } + Err(addr) + } + _ => Err(addr), + } } #[cfg(test)] mod tests { - use super::{Tcp, multiaddr_to_socketaddr}; - use std; - use std::net::{IpAddr, Ipv4Addr, SocketAddr}; - use tokio_io; - use futures::Future; - use futures::stream::Stream; - use multiaddr::Multiaddr; - use transport::Transport; + use super::{Tcp, multiaddr_to_socketaddr}; + use std; + use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + use tokio_io; + use futures::Future; + use futures::stream::Stream; + use multiaddr::Multiaddr; + use transport::Transport; - #[test] - fn multiaddr_to_tcp_conversion() { - use std::net::{Ipv6Addr}; + #[test] + fn multiaddr_to_tcp_conversion() { + use std::net::Ipv6Addr; - assert_eq!( - multiaddr_to_socketaddr(&Multiaddr::new("/ip4/127.0.0.1/tcp/12345").unwrap()), - Ok(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 12345)) - ); - assert_eq!( - multiaddr_to_socketaddr(&Multiaddr::new("/ip4/255.255.255.255/tcp/8080").unwrap()), - Ok(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(255, 255, 255, 255)), 8080)) - ); - assert_eq!( - multiaddr_to_socketaddr(&Multiaddr::new("/ip6/::1/tcp/12345").unwrap()), - Ok(SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)), 12345)) - ); - assert_eq!( - multiaddr_to_socketaddr(&Multiaddr::new("/ip6/ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff/tcp/8080").unwrap()), - Ok(SocketAddr::new(IpAddr::V6(Ipv6Addr::new(65535, 65535, 65535, 65535, 65535, 65535, 65535, 65535)), 8080)) - ); - } + assert_eq!( + multiaddr_to_socketaddr(&Multiaddr::new("/ip4/127.0.0.1/tcp/12345").unwrap()), + Ok(SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), + 12345, + )) + ); + assert_eq!( + multiaddr_to_socketaddr(&Multiaddr::new("/ip4/255.255.255.255/tcp/8080").unwrap()), + Ok(SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(255, 255, 255, 255)), + 8080, + )) + ); + assert_eq!( + multiaddr_to_socketaddr(&Multiaddr::new("/ip6/::1/tcp/12345").unwrap()), + Ok(SocketAddr::new( + IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)), + 12345, + )) + ); + assert_eq!( + multiaddr_to_socketaddr(&Multiaddr::new( + "/ip6/ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff/tcp/8080", + ).unwrap()), + Ok(SocketAddr::new( + IpAddr::V6(Ipv6Addr::new( + 65535, + 65535, + 65535, + 65535, + 65535, + 65535, + 65535, + 65535, + )), + 8080, + )) + ); + } - #[test] - fn communicating_between_dialer_and_listener() { - use std::io::Write; + #[test] + fn communicating_between_dialer_and_listener() { + use std::io::Write; - /// This thread is running the listener - /// while the main thread runs the dialer - std::thread::spawn(move || { - let addr = Multiaddr::new("/ip4/127.0.0.1/tcp/12345").unwrap(); - let mut tcp = Tcp::new().unwrap(); - let handle = tcp.event_loop.handle(); - let listener = tcp.listen_on(addr).unwrap().for_each(|sock| { - // Define what to do with the socket that just connected to us - // Which in this case is read 3 bytes - let handle_conn = tokio_io::io::read_exact(sock, [0; 3]).map(|(_, buf)| { - assert_eq!(buf, [1,2,3]) - }).map_err(|err| { - panic!("IO error {:?}", err) - }); + std::thread::spawn(move || { + let addr = Multiaddr::new("/ip4/127.0.0.1/tcp/12345").unwrap(); + let mut tcp = Tcp::new().unwrap(); + let handle = tcp.event_loop.handle(); + let listener = tcp.listen_on(addr).unwrap().for_each(|sock| { + // Define what to do with the socket that just connected to us + // Which in this case is read 3 bytes + let handle_conn = tokio_io::io::read_exact(sock, [0; 3]) + .map(|(_, buf)| assert_eq!(buf, [1, 2, 3])) + .map_err(|err| panic!("IO error {:?}", err)); - // Spawn the future as a concurrent task - handle.spawn(handle_conn); + // Spawn the future as a concurrent task + handle.spawn(handle_conn); - Ok(()) - }); + Ok(()) + }); - tcp.event_loop.run(listener).unwrap(); - }); - std::thread::sleep(std::time::Duration::from_millis(100)); - let addr = Multiaddr::new("/ip4/127.0.0.1/tcp/12345").unwrap(); - let mut tcp = Tcp::new().unwrap(); - // Obtain a future socket through dialing - let socket = tcp.dial(addr.clone()).unwrap(); - // Define what to do with the socket once it's obtained - let action = socket.then(|sock| { - match sock { - Ok(mut s) => { - let written = s.write(&[0x1,0x2,0x3]).unwrap(); - Ok(written) - } - Err(x) => Err(x) - } - }); - // Execute the future in our event loop - tcp.event_loop.run(action).unwrap(); - std::thread::sleep(std::time::Duration::from_millis(100)); - } + tcp.event_loop.run(listener).unwrap(); + }); + std::thread::sleep(std::time::Duration::from_millis(100)); + let addr = Multiaddr::new("/ip4/127.0.0.1/tcp/12345").unwrap(); + let mut tcp = Tcp::new().unwrap(); + // Obtain a future socket through dialing + let socket = tcp.dial(addr.clone()).unwrap(); + // Define what to do with the socket once it's obtained + let action = socket.then(|sock| match sock { + Ok(mut s) => { + let written = s.write(&[0x1, 0x2, 0x3]).unwrap(); + Ok(written) + } + Err(x) => Err(x), + }); + // Execute the future in our event loop + tcp.event_loop.run(action).unwrap(); + std::thread::sleep(std::time::Duration::from_millis(100)); + } } diff --git a/libp2p-transport/src/lib.rs b/libp2p-transport/src/lib.rs index 8657d241..58dfe51f 100644 --- a/libp2p-transport/src/lib.rs +++ b/libp2p-transport/src/lib.rs @@ -19,21 +19,24 @@ pub type PeerId = String; /// A logical wire between us and a peer. We can read and write through this asynchronously. /// /// You can have multiple `Socket`s between you and any given peer. -pub trait Socket: AsyncRead + AsyncWrite { +pub trait Socket: AsyncRead + AsyncWrite + Sized { + type Conn: Conn; + /// Get the protocol ID this socket uses. fn protocol_id(&self) -> ProtocolId; /// Access the underlying connection. - fn conn(&self) -> &Conn; + fn conn(&self) -> &Self::Conn; } /// A connection between you and a peer. pub trait Conn { /// The socket type this connection manages. type Socket; + type SocketFuture: IntoFuture; /// Initiate a socket between you and the peer on the given protocol. - fn make_socket(&self, proto: ProtocolId) -> Box>; + fn make_socket(&self, proto: ProtocolId) -> Self::SocketFuture; } /// A transport is a stream producing incoming connections. @@ -43,10 +46,10 @@ pub trait Transport { type RawConn: AsyncRead + AsyncWrite; /// The listener produces incoming connections. - type Listener: Stream; + type Listener: Stream; /// A future which indicates currently dialing to a peer. - type Dial: IntoFuture; + type Dial: IntoFuture; /// Listen on the given multi-addr. /// Returns the address back if it isn't supported.