* New core

* Fix lifetime requirements

* Remove identify transport

* Address &mut & ref ref mut

* Fix whitespaces
This commit is contained in:
Pierre Krieger
2018-10-17 10:17:40 +01:00
committed by GitHub
parent 724d0f5d82
commit 5d1c54cc10
52 changed files with 849 additions and 1492 deletions

View File

@ -134,8 +134,7 @@ impl TcpConfig {
impl Transport for TcpConfig {
type Output = TcpTransStream;
type Listener = TcpListenStream;
type ListenerUpgrade = FutureResult<(Self::Output, Self::MultiaddrFuture), IoError>;
type MultiaddrFuture = FutureResult<Multiaddr, IoError>;
type ListenerUpgrade = FutureResult<Self::Output, IoError>;
type Dial = TcpDialFut;
fn listen_on(self, addr: Multiaddr) -> Result<(Self::Listener, Multiaddr), (Self, Multiaddr)> {
@ -181,7 +180,6 @@ impl Transport for TcpConfig {
Ok(TcpDialFut {
inner: TcpStream::connect(&socket_addr),
config: self,
addr: Some(addr),
})
} else {
debug!("Instantly refusing dialing {}, as it is invalid", addr);
@ -260,32 +258,21 @@ pub struct TcpDialFut {
inner: ConnectFuture,
/// Original configuration.
config: TcpConfig,
/// Address we're dialing. Extracted when the `Future` finishes.
addr: Option<Multiaddr>,
}
impl Future for TcpDialFut {
type Item = (TcpTransStream, FutureResult<Multiaddr, IoError>);
type Item = TcpTransStream;
type Error = IoError;
fn poll(&mut self) -> Poll<(TcpTransStream, FutureResult<Multiaddr, IoError>), IoError> {
fn poll(&mut self) -> Poll<TcpTransStream, IoError> {
match self.inner.poll() {
Ok(Async::Ready(stream)) => {
apply_config(&self.config, &stream)?;
let addr = self
.addr
.take()
.expect("TcpDialFut polled again after finished");
let out = TcpTransStream { inner: stream };
Ok(Async::Ready((out, future::ok(addr))))
Ok(Async::Ready(TcpTransStream { inner: stream }))
}
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(err) => {
let addr = self
.addr
.as_ref()
.expect("TcpDialFut polled again after finished");
debug!("Error while dialing {:?} => {:?}", addr, err);
debug!("Error while dialing => {:?}", err);
Err(err)
}
}
@ -300,13 +287,13 @@ pub struct TcpListenStream {
}
impl Stream for TcpListenStream {
type Item = FutureResult<(TcpTransStream, FutureResult<Multiaddr, IoError>), IoError>;
type Item = (FutureResult<TcpTransStream, IoError>, Multiaddr);
type Error = IoError;
fn poll(
&mut self,
) -> Poll<
Option<FutureResult<(TcpTransStream, FutureResult<Multiaddr, IoError>), IoError>>,
Option<(FutureResult<TcpTransStream, IoError>, Multiaddr)>,
IoError,
> {
let inner = match self.inner {
@ -316,28 +303,36 @@ impl Stream for TcpListenStream {
}
};
match inner.poll() {
Ok(Async::Ready(Some(sock))) => {
match apply_config(&self.config, &sock) {
Ok(()) => (),
Err(err) => return Ok(Async::Ready(Some(future::err(err)))),
};
loop {
match inner.poll() {
Ok(Async::Ready(Some(sock))) => {
let addr = match sock.peer_addr() {
// TODO: remove this expect()
Ok(addr) => addr
.to_multiaddr()
.expect("generating a multiaddr from a socket addr never fails"),
Err(err) => {
// If we can't get the address of the newly-opened socket, there's
// nothing we can except ignore this connection attempt.
error!("Ignored incoming because could't determine its \
address: {:?}", err);
continue
},
};
let addr = match sock.peer_addr() {
// TODO: remove this expect()
Ok(addr) => addr
.to_multiaddr()
.expect("generating a multiaddr from a socket addr never fails"),
Err(err) => return Ok(Async::Ready(Some(future::err(err)))),
};
match apply_config(&self.config, &sock) {
Ok(()) => (),
Err(err) => return Ok(Async::Ready(Some((future::err(err), addr)))),
};
debug!("Incoming connection from {}", addr);
let ret = future::ok((TcpTransStream { inner: sock }, future::ok(addr)));
Ok(Async::Ready(Some(ret)))
debug!("Incoming connection from {}", addr);
let ret = future::ok(TcpTransStream { inner: sock });
break Ok(Async::Ready(Some((ret, addr))))
}
Ok(Async::Ready(None)) => break Ok(Async::Ready(None)),
Ok(Async::NotReady) => break Ok(Async::NotReady),
Err(()) => unreachable!("sleep_on_error never produces an error"),
}
Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(()) => unreachable!("sleep_on_error never produces an error"),
}
}
}
@ -465,8 +460,8 @@ mod tests {
std::thread::spawn(move || {
let addr = "/ip4/127.0.0.1/tcp/12345".parse::<Multiaddr>().unwrap();
let tcp = TcpConfig::new();
let listener = tcp.listen_on(addr).unwrap().0.for_each(|sock| {
sock.and_then(|(sock, _)| {
let listener = tcp.listen_on(addr).unwrap().0.for_each(|(sock, _)| {
sock.and_then(|sock| {
// Define what to do with the socket that just connected to us
// Which in this case is read 3 bytes
let handle_conn = tokio_io::io::read_exact(sock, [0; 3])
@ -489,7 +484,7 @@ mod tests {
let socket = tcp.dial(addr.clone()).unwrap();
// Define what to do with the socket once it's obtained
let action = socket.then(|sock| -> Result<(), ()> {
sock.unwrap().0.write(&[0x1, 0x2, 0x3]).unwrap();
sock.unwrap().write(&[0x1, 0x2, 0x3]).unwrap();
Ok(())
});
// Execute the future in our event loop