Cleanup and remove unnecessary trait objects

This commit is contained in:
Vurich 2017-10-23 11:45:35 +02:00
parent 0e4375fc90
commit 384d15e24a
3 changed files with 187 additions and 150 deletions

View File

@ -3,39 +3,45 @@
extern crate futures; extern crate futures;
extern crate libp2p_transport as transport; extern crate libp2p_transport as transport;
use futures::{Future, IntoFuture, BoxFuture}; use futures::{Future, IntoFuture};
use transport::{ProtocolId, MultiAddr, Socket}; use transport::{ProtocolId, Socket};
use transport::multiaddr::Multiaddr;
/// Produces a future for each incoming `Socket`. /// Produces a future for each incoming `Socket`.
pub trait Handler<S: Socket> { pub trait Handler<S: Socket> {
type Future: IntoFuture<Item=(), Error=()>; type Future: IntoFuture<Item = (), Error = ()>;
/// Handle the incoming socket, producing a future which should resolve /// Handle the incoming socket, producing a future which should resolve
/// when the handler is finished. /// when the handler is finished.
fn handle(&self, socket: S) -> Self::Future; fn handle(&self, socket: S) -> Self::Future;
fn boxed(self) -> BoxHandler<S> where fn boxed(self) -> BoxHandler<S>
Self: Sized + Send + 'static, where
<Self::Future as IntoFuture>::Future: Send + 'static Self: Sized + Send + 'static,
<Self::Future as IntoFuture>::Future: Send + 'static,
{ {
BoxHandler(Box::new(move |socket| BoxHandler(Box::new(move |socket| {
self.handle(socket).into_future().boxed() Box::new(self.handle(socket).into_future()) as _
)) }))
} }
} }
impl<S: Socket, F, U> Handler<S> for F impl<S: Socket, F, U> Handler<S> for F
where F: Fn(S) -> U, U: IntoFuture<Item=(), Error=()> where
F: Fn(S) -> U,
U: IntoFuture<Item = (), Error = ()>,
{ {
type Future = U; type Future = U;
fn handle(&self, socket: S) -> U { (self)(socket) } fn handle(&self, socket: S) -> U {
(self)(socket)
}
} }
/// A boxed handler. /// A boxed handler.
pub struct BoxHandler<S: Socket>(Box<Handler<S, Future=BoxFuture<(), ()>>>); pub struct BoxHandler<S: Socket>(Box<Handler<S, Future = Box<Future<Item = (), Error = ()>>>>);
impl<S: Socket> Handler<S> for BoxHandler<S> { impl<S: Socket> Handler<S> for BoxHandler<S> {
type Future = BoxFuture<(), ()>; type Future = Box<Future<Item = (), Error = ()>>;
fn handle(&self, socket: S) -> Self::Future { fn handle(&self, socket: S) -> Self::Future {
self.0.handle(socket) self.0.handle(socket)
@ -49,7 +55,7 @@ pub trait Mux: Sync {
/// Attach an incoming socket. /// Attach an incoming socket.
fn push(&self, socket: Self::Socket); fn push(&self, socket: Self::Socket);
/// Set the socket handler for a given protocol id. /// Set the socket handler for a given protocol id.
fn set_handler(&self, proto: ProtocolId, handler: BoxHandler<Self::Socket>); fn set_handler(&self, proto: ProtocolId, handler: BoxHandler<Self::Socket>);
@ -59,19 +65,21 @@ pub trait Mux: Sync {
/// Unimplemented. Maps peer IDs to connected addresses, protocols, and data. /// Unimplemented. Maps peer IDs to connected addresses, protocols, and data.
pub trait PeerStore {} pub trait PeerStore {}
/// This is a common abstraction over the low-level bits of libp2p. /// This is a common abstraction over the low-level bits of libp2p.
/// ///
/// It handles connecting over, adding and removing transports, /// It handles connecting over, adding and removing transports,
/// wraps an arbitrary event loop, and manages protocol IDs. /// wraps an arbitrary event loop, and manages protocol IDs.
pub trait Host { pub trait Host {
type Socket: Socket; type Socket: Socket;
type Mux: Mux<Socket = Self::Socket>;
type Multiaddrs: IntoIterator<Item = Multiaddr>;
/// Get a handle to the peer store. /// Get a handle to the peer store.
fn peer_store(&self) -> &PeerStore; fn peer_store(&self) -> &PeerStore;
/// Get a handle to the underlying muxer. /// Get a handle to the underlying muxer.
fn mux(&self) -> &Mux<Socket=Self::Socket>; fn mux(&self) -> &Self::Mux;
/// Set the socket handler for a given protocol id. /// Set the socket handler for a given protocol id.
fn set_handler(&self, proto: ProtocolId, handler: BoxHandler<Self::Socket>) { fn set_handler(&self, proto: ProtocolId, handler: BoxHandler<Self::Socket>) {
@ -84,5 +92,5 @@ pub trait Host {
} }
/// Addresses we're listening on. /// Addresses we're listening on.
fn listen_addrs(&self) -> Vec<MultiAddr>; fn listen_addrs(&self) -> Self::Multiaddrs;
} }

View File

@ -14,153 +14,179 @@ use multiaddr::{Multiaddr, Protocol};
use transport::Transport; use transport::Transport;
pub struct Tcp { pub struct Tcp {
pub event_loop: Core, pub event_loop: Core,
} }
impl Tcp { impl Tcp {
pub fn new() -> Result<Tcp, IoError> { pub fn new() -> Result<Tcp, IoError> {
Ok(Tcp { Ok(Tcp { event_loop: Core::new()? })
event_loop: Core::new()?, }
})
}
} }
impl Transport for Tcp { impl Transport for Tcp {
/// The raw connection. /// The raw connection.
type RawConn = TcpStream; type RawConn = TcpStream;
/// The listener produces incoming connections. /// The listener produces incoming connections.
type Listener = Box<Stream<Item=Self::RawConn, Error=IoError>>; type Listener = Box<Stream<Item = Self::RawConn, Error = IoError>>;
/// A future which indicates currently dialing to a peer. /// A future which indicates currently dialing to a peer.
type Dial = TcpStreamNew; type Dial = TcpStreamNew;
/// Listen on the given multi-addr. /// Listen on the given multi-addr.
/// Returns the address back if it isn't supported. /// Returns the address back if it isn't supported.
fn listen_on(&mut self, addr: Multiaddr) -> Result<Self::Listener, Multiaddr> { fn listen_on(&mut self, addr: Multiaddr) -> Result<Self::Listener, Multiaddr> {
if let Ok(socket_addr) = multiaddr_to_socketaddr(&addr) { if let Ok(socket_addr) = multiaddr_to_socketaddr(&addr) {
Ok(Box::new(futures::future::result(TcpListener::bind(&socket_addr, &self.event_loop.handle())).map(|listener| { Ok(Box::new(
// Pull out a stream of sockets for incoming connections futures::future::result(
listener.incoming().map(|x| x.0) TcpListener::bind(&socket_addr, &self.event_loop.handle()),
}).flatten_stream())) ).map(|listener| {
} else { // Pull out a stream of sockets for incoming connections
Err(addr) listener.incoming().map(|x| x.0)
} })
} .flatten_stream(),
))
} else {
Err(addr)
}
}
/// Dial to the given multi-addr. /// Dial to the given multi-addr.
/// Returns either a future which may resolve to a connection, /// Returns either a future which may resolve to a connection,
/// or gives back the multiaddress. /// or gives back the multiaddress.
fn dial(&mut self, addr: Multiaddr) -> Result<Self::Dial, Multiaddr> { fn dial(&mut self, addr: Multiaddr) -> Result<Self::Dial, Multiaddr> {
if let Ok(socket_addr) = multiaddr_to_socketaddr(&addr) { if let Ok(socket_addr) = multiaddr_to_socketaddr(&addr) {
Ok(TcpStream::connect(&socket_addr, &self.event_loop.handle())) Ok(TcpStream::connect(&socket_addr, &self.event_loop.handle()))
} else { } else {
Err(addr) Err(addr)
} }
} }
} }
// This type of logic should probably be moved into the multiaddr package // This type of logic should probably be moved into the multiaddr package
fn multiaddr_to_socketaddr(addr: &Multiaddr) -> Result<SocketAddr, &Multiaddr> { fn multiaddr_to_socketaddr(addr: &Multiaddr) -> Result<SocketAddr, &Multiaddr> {
let protocols = addr.protocol(); let protocols = addr.protocol();
match (protocols[0], protocols[1]) {
(Protocol::IP4, Protocol::TCP) => { // TODO: This is nonconforming (since a multiaddr could specify TCP first) but we can't fix that
let bs = addr.as_slice(); // until multiaddrs-rs is improved.
Ok(SocketAddr::new( match (protocols[0], protocols[1]) {
IpAddr::V4(Ipv4Addr::new(bs[1], bs[2], bs[3], bs[4])), (Protocol::IP4, Protocol::TCP) => {
(bs[6] as u16) << 8 | bs[7] as u16 let bs = addr.as_slice();
)) Ok(SocketAddr::new(
}, IpAddr::V4(Ipv4Addr::new(bs[1], bs[2], bs[3], bs[4])),
(Protocol::IP6, Protocol::TCP) => { (bs[6] as u16) << 8 | bs[7] as u16,
let bs = addr.as_slice(); ))
if let Ok(Some(s)) = Protocol::IP6.bytes_to_string(&bs[1..17]) { }
if let Ok(ipv6addr) = s.parse() { (Protocol::IP6, Protocol::TCP) => {
return Ok(SocketAddr::new(IpAddr::V6(ipv6addr), (bs[18] as u16) << 8 | bs[19] as u16)) let bs = addr.as_slice();
} if let Ok(Some(s)) = Protocol::IP6.bytes_to_string(&bs[1..17]) {
} if let Ok(ipv6addr) = s.parse() {
Err(addr) return Ok(SocketAddr::new(
}, IpAddr::V6(ipv6addr),
_ => Err(addr), (bs[18] as u16) << 8 | bs[19] as u16,
} ));
}
}
Err(addr)
}
_ => Err(addr),
}
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::{Tcp, multiaddr_to_socketaddr}; use super::{Tcp, multiaddr_to_socketaddr};
use std; use std;
use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use tokio_io; use tokio_io;
use futures::Future; use futures::Future;
use futures::stream::Stream; use futures::stream::Stream;
use multiaddr::Multiaddr; use multiaddr::Multiaddr;
use transport::Transport; use transport::Transport;
#[test] #[test]
fn multiaddr_to_tcp_conversion() { fn multiaddr_to_tcp_conversion() {
use std::net::{Ipv6Addr}; use std::net::Ipv6Addr;
assert_eq!( assert_eq!(
multiaddr_to_socketaddr(&Multiaddr::new("/ip4/127.0.0.1/tcp/12345").unwrap()), multiaddr_to_socketaddr(&Multiaddr::new("/ip4/127.0.0.1/tcp/12345").unwrap()),
Ok(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 12345)) Ok(SocketAddr::new(
); IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
assert_eq!( 12345,
multiaddr_to_socketaddr(&Multiaddr::new("/ip4/255.255.255.255/tcp/8080").unwrap()), ))
Ok(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(255, 255, 255, 255)), 8080)) );
); assert_eq!(
assert_eq!( multiaddr_to_socketaddr(&Multiaddr::new("/ip4/255.255.255.255/tcp/8080").unwrap()),
multiaddr_to_socketaddr(&Multiaddr::new("/ip6/::1/tcp/12345").unwrap()), Ok(SocketAddr::new(
Ok(SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)), 12345)) IpAddr::V4(Ipv4Addr::new(255, 255, 255, 255)),
); 8080,
assert_eq!( ))
multiaddr_to_socketaddr(&Multiaddr::new("/ip6/ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff/tcp/8080").unwrap()), );
Ok(SocketAddr::new(IpAddr::V6(Ipv6Addr::new(65535, 65535, 65535, 65535, 65535, 65535, 65535, 65535)), 8080)) assert_eq!(
); multiaddr_to_socketaddr(&Multiaddr::new("/ip6/::1/tcp/12345").unwrap()),
} Ok(SocketAddr::new(
IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)),
12345,
))
);
assert_eq!(
multiaddr_to_socketaddr(&Multiaddr::new(
"/ip6/ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff/tcp/8080",
).unwrap()),
Ok(SocketAddr::new(
IpAddr::V6(Ipv6Addr::new(
65535,
65535,
65535,
65535,
65535,
65535,
65535,
65535,
)),
8080,
))
);
}
#[test] #[test]
fn communicating_between_dialer_and_listener() { fn communicating_between_dialer_and_listener() {
use std::io::Write; use std::io::Write;
/// This thread is running the listener std::thread::spawn(move || {
/// while the main thread runs the dialer let addr = Multiaddr::new("/ip4/127.0.0.1/tcp/12345").unwrap();
std::thread::spawn(move || { let mut tcp = Tcp::new().unwrap();
let addr = Multiaddr::new("/ip4/127.0.0.1/tcp/12345").unwrap(); let handle = tcp.event_loop.handle();
let mut tcp = Tcp::new().unwrap(); let listener = tcp.listen_on(addr).unwrap().for_each(|sock| {
let handle = tcp.event_loop.handle(); // Define what to do with the socket that just connected to us
let listener = tcp.listen_on(addr).unwrap().for_each(|sock| { // Which in this case is read 3 bytes
// Define what to do with the socket that just connected to us let handle_conn = tokio_io::io::read_exact(sock, [0; 3])
// Which in this case is read 3 bytes .map(|(_, buf)| assert_eq!(buf, [1, 2, 3]))
let handle_conn = tokio_io::io::read_exact(sock, [0; 3]).map(|(_, buf)| { .map_err(|err| panic!("IO error {:?}", err));
assert_eq!(buf, [1,2,3])
}).map_err(|err| {
panic!("IO error {:?}", err)
});
// Spawn the future as a concurrent task // Spawn the future as a concurrent task
handle.spawn(handle_conn); handle.spawn(handle_conn);
Ok(()) Ok(())
}); });
tcp.event_loop.run(listener).unwrap(); tcp.event_loop.run(listener).unwrap();
}); });
std::thread::sleep(std::time::Duration::from_millis(100)); std::thread::sleep(std::time::Duration::from_millis(100));
let addr = Multiaddr::new("/ip4/127.0.0.1/tcp/12345").unwrap(); let addr = Multiaddr::new("/ip4/127.0.0.1/tcp/12345").unwrap();
let mut tcp = Tcp::new().unwrap(); let mut tcp = Tcp::new().unwrap();
// Obtain a future socket through dialing // Obtain a future socket through dialing
let socket = tcp.dial(addr.clone()).unwrap(); let socket = tcp.dial(addr.clone()).unwrap();
// Define what to do with the socket once it's obtained // Define what to do with the socket once it's obtained
let action = socket.then(|sock| { let action = socket.then(|sock| match sock {
match sock { Ok(mut s) => {
Ok(mut s) => { let written = s.write(&[0x1, 0x2, 0x3]).unwrap();
let written = s.write(&[0x1,0x2,0x3]).unwrap(); Ok(written)
Ok(written) }
} Err(x) => Err(x),
Err(x) => Err(x) });
} // Execute the future in our event loop
}); tcp.event_loop.run(action).unwrap();
// Execute the future in our event loop std::thread::sleep(std::time::Duration::from_millis(100));
tcp.event_loop.run(action).unwrap(); }
std::thread::sleep(std::time::Duration::from_millis(100));
}
} }

View File

@ -19,21 +19,24 @@ pub type PeerId = String;
/// A logical wire between us and a peer. We can read and write through this asynchronously. /// A logical wire between us and a peer. We can read and write through this asynchronously.
/// ///
/// You can have multiple `Socket`s between you and any given peer. /// You can have multiple `Socket`s between you and any given peer.
pub trait Socket: AsyncRead + AsyncWrite { pub trait Socket: AsyncRead + AsyncWrite + Sized {
type Conn: Conn<Socket = Self>;
/// Get the protocol ID this socket uses. /// Get the protocol ID this socket uses.
fn protocol_id(&self) -> ProtocolId; fn protocol_id(&self) -> ProtocolId;
/// Access the underlying connection. /// Access the underlying connection.
fn conn(&self) -> &Conn<Socket=Self>; fn conn(&self) -> &Self::Conn;
} }
/// A connection between you and a peer. /// A connection between you and a peer.
pub trait Conn { pub trait Conn {
/// The socket type this connection manages. /// The socket type this connection manages.
type Socket; type Socket;
type SocketFuture: IntoFuture<Item = Self::Socket, Error = IoError>;
/// Initiate a socket between you and the peer on the given protocol. /// Initiate a socket between you and the peer on the given protocol.
fn make_socket(&self, proto: ProtocolId) -> Box<Future<Item=Self::Socket, Error=IoError>>; fn make_socket(&self, proto: ProtocolId) -> Self::SocketFuture;
} }
/// A transport is a stream producing incoming connections. /// A transport is a stream producing incoming connections.
@ -43,10 +46,10 @@ pub trait Transport {
type RawConn: AsyncRead + AsyncWrite; type RawConn: AsyncRead + AsyncWrite;
/// The listener produces incoming connections. /// The listener produces incoming connections.
type Listener: Stream<Item=Self::RawConn>; type Listener: Stream<Item = Self::RawConn>;
/// A future which indicates currently dialing to a peer. /// A future which indicates currently dialing to a peer.
type Dial: IntoFuture<Item=Self::RawConn, Error=IoError>; type Dial: IntoFuture<Item = Self::RawConn, Error = IoError>;
/// Listen on the given multi-addr. /// Listen on the given multi-addr.
/// Returns the address back if it isn't supported. /// Returns the address back if it isn't supported.