diff --git a/core/src/connection/manager.rs b/core/src/connection/manager.rs index c0a4af44..c366a071 100644 --- a/core/src/connection/manager.rs +++ b/core/src/connection/manager.rs @@ -133,7 +133,7 @@ where /// the associated user data. #[derive(Debug)] struct TaskInfo { - /// channel endpoint to send messages to the task + /// Channel endpoint to send messages to the task. sender: mpsc::Sender>, /// The state of the task as seen by the `Manager`. state: TaskState, @@ -286,40 +286,6 @@ impl Manager { ConnectionId(task_id) } - /// Notifies the handlers of all managed connections of an event. - /// - /// This function is "atomic", in the sense that if `Poll::Pending` is - /// returned then no event has been sent. - #[must_use] - pub fn poll_broadcast(&mut self, event: &I, cx: &mut Context) -> Poll<()> - where - I: Clone - { - for task in self.tasks.values_mut() { - if let Poll::Pending = task.sender.poll_ready(cx) { // (*) - return Poll::Pending; - } - } - - for (id, task) in self.tasks.iter_mut() { - let cmd = task::Command::NotifyHandler(event.clone()); - match task.sender.start_send(cmd) { - Ok(()) => {}, - Err(e) if e.is_full() => unreachable!("by (*)"), - Err(e) if e.is_disconnected() => { - // The background task ended. The manager will eventually be - // informed through an `Error` event from the task. - log::trace!("Connection dropped: {:?}", id); - }, - Err(e) => { - log::error!("Unexpected error: {:?}", e); - } - } - } - - Poll::Ready(()) - } - /// Gets an entry for a managed connection, if it exists. pub fn entry(&mut self, id: ConnectionId) -> Option> { if let hash_map::Entry::Occupied(task) = self.tasks.entry(id.0) { diff --git a/core/src/connection/manager/task.rs b/core/src/connection/manager/task.rs index 9149e89e..4272722d 100644 --- a/core/src/connection/manager/task.rs +++ b/core/src/connection/manager/task.rs @@ -87,7 +87,7 @@ where commands: stream::Fuse>>, /// Inner state of this `Task`. - state: State, + state: State, } impl Task @@ -111,7 +111,6 @@ where state: State::Pending { future: Box::pin(future), handler, - events: Vec::new() }, } } @@ -133,7 +132,7 @@ where } /// The state associated with the `Task` of a connection. -enum State +enum State where M: StreamMuxer, H: IntoConnectionHandler, @@ -146,12 +145,6 @@ where future: Pin>, /// The intended handler for the established connection. handler: H, - /// While we are dialing the future, we need to buffer the events received via - /// `Command::NotifyHandler` so that they get delivered to the `handler` - /// once the connection is established. We can't leave these in `Task::receiver` - /// because we have to detect if the connection attempt has been aborted (by - /// dropping the corresponding `sender` owned by the manager). - events: Vec }, /// The connection is established and a new event is ready to be emitted. @@ -198,30 +191,29 @@ where 'poll: loop { match std::mem::replace(&mut this.state, State::Done) { - State::Pending { mut future, handler, mut events } => { - // Process commands from the manager. - loop { - match Stream::poll_next(Pin::new(&mut this.commands), cx) { - Poll::Pending => break, - Poll::Ready(None) => return Poll::Ready(()), - Poll::Ready(Some(Command::NotifyHandler(event))) => - events.push(event), - } + State::Pending { mut future, handler } => { + // Check if the manager aborted this task by dropping the `commands` + // channel sender side. + match Stream::poll_next(Pin::new(&mut this.commands), cx) { + Poll::Pending => {}, + Poll::Ready(None) => return Poll::Ready(()), + Poll::Ready(Some(Command::NotifyHandler(_))) => unreachable!( + "Manager does not allow sending commands to pending tasks.", + ) } // Check if the connection succeeded. match Future::poll(Pin::new(&mut future), cx) { Poll::Ready(Ok((info, muxer))) => { - let mut c = Connection::new(muxer, handler.into_handler(&info)); - for event in events { - c.inject_event(event) - } this.state = State::EstablishedReady { - connection: Some(c), + connection: Some(Connection::new( + muxer, + handler.into_handler(&info), + )), event: Event::Established { id, info } } } Poll::Pending => { - this.state = State::Pending { future, handler, events }; + this.state = State::Pending { future, handler }; return Poll::Pending } Poll::Ready(Err(error)) => { @@ -338,4 +330,3 @@ where } } } - diff --git a/core/src/connection/pool.rs b/core/src/connection/pool.rs index e6441beb..300c649a 100644 --- a/core/src/connection/pool.rs +++ b/core/src/connection/pool.rs @@ -330,18 +330,6 @@ where id } - /// Sends an event to all nodes. - /// - /// This function is "atomic", in the sense that if `Poll::Pending` is returned then no event - /// has been sent to any node yet. - #[must_use] - pub fn poll_broadcast(&mut self, event: &TInEvent, cx: &mut Context) -> Poll<()> - where - TInEvent: Clone - { - self.manager.poll_broadcast(event, cx) - } - /// Adds an existing established connection to the pool. /// /// Returns the assigned connection ID on success. An error is returned diff --git a/core/src/network.rs b/core/src/network.rs index 52d0da80..39ea68ed 100644 --- a/core/src/network.rs +++ b/core/src/network.rs @@ -269,18 +269,6 @@ where }) } - /// Notifies the connection handler of _every_ connection of _every_ peer of an event. - /// - /// This function is "atomic", in the sense that if `Poll::Pending` is returned then no event - /// has been sent to any node yet. - #[must_use] - pub fn poll_broadcast(&mut self, event: &TInEvent, cx: &mut Context) -> Poll<()> - where - TInEvent: Clone - { - self.pool.poll_broadcast(event, cx) - } - /// Returns a list of all connected peers, i.e. peers to whom the `Network` /// has at least one established connection. pub fn connected_peers(&self) -> impl Iterator { @@ -641,4 +629,3 @@ impl NetworkConfig { self } } -