Set task correctly when not blocked

This commit is contained in:
Vurich
2017-12-11 17:57:11 +01:00
parent 496100cdb5
commit 624d2238f9
2 changed files with 34 additions and 33 deletions

View File

@ -356,6 +356,7 @@ where
mod tests { mod tests {
use super::*; use super::*;
use std::io; use std::io;
use tokio_io::io as tokio;
#[test] #[test]
fn can_use_one_stream() { fn can_use_one_stream() {
@ -367,7 +368,7 @@ mod tests {
let mut substream = mplex.clone().outbound().wait().unwrap(); let mut substream = mplex.clone().outbound().wait().unwrap();
assert!(substream.write(message).is_ok()); assert!(tokio::write_all(&mut substream, message).wait().is_ok());
let id = substream.id(); let id = substream.id();
@ -394,7 +395,7 @@ mod tests {
let mut buf = vec![0; message.len()]; let mut buf = vec![0; message.len()];
assert!(substream.read(&mut buf).is_ok()); assert!(tokio::read(&mut substream, &mut buf).wait().is_ok());
assert_eq!(&buf, message); assert_eq!(&buf, message);
} }
@ -404,35 +405,31 @@ mod tests {
let mplex = Multiplex::dial(stream); let mplex = Multiplex::dial(stream);
let mut outbound: Vec<Substream<_>> = vec![ let mut outbound: Vec<Substream<_>> = vec![];
mplex.clone().outbound().wait().unwrap(),
mplex.clone().outbound().wait().unwrap(), for _ in 0..5 {
mplex.clone().outbound().wait().unwrap(), outbound.push(mplex.clone().outbound().wait().unwrap());
mplex.clone().outbound().wait().unwrap(), }
mplex.clone().outbound().wait().unwrap(),
];
outbound.sort_by_key(|a| a.id()); outbound.sort_by_key(|a| a.id());
for (i, substream) in outbound.iter_mut().enumerate() { for (i, substream) in outbound.iter_mut().enumerate() {
assert!(substream.write(i.to_string().as_bytes()).is_ok()); assert!(tokio::write_all(substream, i.to_string().as_bytes()).wait().is_ok());
} }
let stream = io::Cursor::new(mplex.state.lock().stream.get_ref().clone()); let stream = io::Cursor::new(mplex.state.lock().stream.get_ref().clone());
let mplex = Multiplex::listen(stream); let mplex = Multiplex::listen(stream);
let mut inbound: Vec<Substream<_>> = vec![ let mut inbound: Vec<Substream<_>> = vec![];
mplex.clone().inbound().wait().unwrap(),
mplex.clone().inbound().wait().unwrap(), for _ in 0..5 {
mplex.clone().inbound().wait().unwrap(), inbound.push(mplex.clone().inbound().wait().unwrap());
mplex.clone().inbound().wait().unwrap(), }
mplex.clone().inbound().wait().unwrap(),
];
inbound.sort_by_key(|a| a.id()); inbound.sort_by_key(|a| a.id());
for (substream, outbound) in inbound.iter_mut().zip(outbound.iter()) { for (mut substream, outbound) in inbound.iter_mut().zip(outbound.iter()) {
let id = outbound.id(); let id = outbound.id();
assert_eq!(id, substream.id()); assert_eq!(id, substream.id());
assert_eq!( assert_eq!(
@ -443,7 +440,7 @@ mod tests {
); );
let mut buf = [0; 3]; let mut buf = [0; 3];
assert_eq!(substream.read(&mut buf).unwrap(), 1); assert_eq!(tokio::read(&mut substream, &mut buf).wait().unwrap().2, 1);
} }
} }
@ -494,7 +491,7 @@ mod tests {
let mut buf = vec![0; message.len()]; let mut buf = vec![0; message.len()];
assert!(substream.read(&mut buf).is_ok()); assert!(tokio::read(&mut substream, &mut buf).wait().is_ok());
assert_eq!(&buf, message); assert_eq!(&buf, message);
} }
@ -541,7 +538,7 @@ mod tests {
assert_eq!(substream.id(), 0); assert_eq!(substream.id(), 0);
assert_eq!(substream.name(), None); assert_eq!(substream.name(), None);
assert_eq!(substream.read(&mut [0; 100][..]).unwrap(), 0); assert_eq!(tokio::read(&mut substream, &mut [0; 100][..]).wait().unwrap().2, 0);
} }
#[test] #[test]
@ -589,7 +586,7 @@ mod tests {
for _ in 0..20 { for _ in 0..20 {
let mut buf = [0; 1]; let mut buf = [0; 1];
assert_eq!(substream.read(&mut buf[..]).unwrap(), 1); assert_eq!(tokio::read(&mut substream, &mut buf[..]).wait().unwrap().2, 1);
out.push(buf[0]); out.push(buf[0]);
} }

View File

@ -95,6 +95,21 @@ pub fn read_stream<'a, O: Into<Option<(u32, &'a mut [u8])>>, T: AsyncRead>(
Err(io::ErrorKind::WouldBlock.into()) Err(io::ErrorKind::WouldBlock.into())
}; };
if let Some((ref mut id, ..)) = stream_data {
// TODO: We can do this only hashing `id` once using the entry API.
let write = lock.open_streams
.get(id)
.and_then(SubstreamMetadata::write_task)
.cloned();
lock.open_streams.insert(
*id,
SubstreamMetadata::Open {
read: Some(task::current()),
write,
},
);
}
loop { loop {
match lock.read_state.take().unwrap_or_default() { match lock.read_state.take().unwrap_or_default() {
Header { Header {
@ -341,17 +356,6 @@ pub fn read_stream<'a, O: Into<Option<(u32, &'a mut [u8])>>, T: AsyncRead>(
task.notify(); task.notify();
} }
let write = lock.open_streams
.get(id)
.and_then(SubstreamMetadata::write_task)
.cloned();
lock.open_streams.insert(
*id,
SubstreamMetadata::Open {
read: Some(task::current()),
write,
},
);
// We cannot make progress here, another stream has to accept this packet // We cannot make progress here, another stream has to accept this packet
return on_block; return on_block;
} }