mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-12 01:21:21 +00:00
core/src: Remove poll_broadcast connection notification mechanism (#1527)
* core/src: Remove poll_broadcast connection notification mechanism The `Network::poll_broadcast` function has not proven to be useful. This commit removes the mechanism all the way down to the connection manager. With `poll_broadcast` gone there is no mechanism left to send commands to pending connections. Thereby command buffering for pending connections is not needed anymore and is thus removed in this commit as well. * core/src/connection/manager.rs: Remove warning comment Co-Authored-By: Pierre Krieger <pierre.krieger1708@gmail.com> Co-authored-by: Pierre Krieger <pierre.krieger1708@gmail.com>
This commit is contained in:
@ -133,7 +133,7 @@ where
|
||||
/// the associated user data.
|
||||
#[derive(Debug)]
|
||||
struct TaskInfo<I, C> {
|
||||
/// channel endpoint to send messages to the task
|
||||
/// Channel endpoint to send messages to the task.
|
||||
sender: mpsc::Sender<task::Command<I>>,
|
||||
/// The state of the task as seen by the `Manager`.
|
||||
state: TaskState<C>,
|
||||
@ -286,40 +286,6 @@ impl<I, O, H, TE, HE, C> Manager<I, O, H, TE, HE, C> {
|
||||
ConnectionId(task_id)
|
||||
}
|
||||
|
||||
/// Notifies the handlers of all managed connections of an event.
|
||||
///
|
||||
/// This function is "atomic", in the sense that if `Poll::Pending` is
|
||||
/// returned then no event has been sent.
|
||||
#[must_use]
|
||||
pub fn poll_broadcast(&mut self, event: &I, cx: &mut Context) -> Poll<()>
|
||||
where
|
||||
I: Clone
|
||||
{
|
||||
for task in self.tasks.values_mut() {
|
||||
if let Poll::Pending = task.sender.poll_ready(cx) { // (*)
|
||||
return Poll::Pending;
|
||||
}
|
||||
}
|
||||
|
||||
for (id, task) in self.tasks.iter_mut() {
|
||||
let cmd = task::Command::NotifyHandler(event.clone());
|
||||
match task.sender.start_send(cmd) {
|
||||
Ok(()) => {},
|
||||
Err(e) if e.is_full() => unreachable!("by (*)"),
|
||||
Err(e) if e.is_disconnected() => {
|
||||
// The background task ended. The manager will eventually be
|
||||
// informed through an `Error` event from the task.
|
||||
log::trace!("Connection dropped: {:?}", id);
|
||||
},
|
||||
Err(e) => {
|
||||
log::error!("Unexpected error: {:?}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Poll::Ready(())
|
||||
}
|
||||
|
||||
/// Gets an entry for a managed connection, if it exists.
|
||||
pub fn entry(&mut self, id: ConnectionId) -> Option<Entry<'_, I, C>> {
|
||||
if let hash_map::Entry::Occupied(task) = self.tasks.entry(id.0) {
|
||||
|
@ -87,7 +87,7 @@ where
|
||||
commands: stream::Fuse<mpsc::Receiver<Command<I>>>,
|
||||
|
||||
/// Inner state of this `Task`.
|
||||
state: State<F, M, H, I, O, E, C>,
|
||||
state: State<F, M, H, O, E, C>,
|
||||
}
|
||||
|
||||
impl<F, M, H, I, O, E, C> Task<F, M, H, I, O, E, C>
|
||||
@ -111,7 +111,6 @@ where
|
||||
state: State::Pending {
|
||||
future: Box::pin(future),
|
||||
handler,
|
||||
events: Vec::new()
|
||||
},
|
||||
}
|
||||
}
|
||||
@ -133,7 +132,7 @@ where
|
||||
}
|
||||
|
||||
/// The state associated with the `Task` of a connection.
|
||||
enum State<F, M, H, I, O, E, C>
|
||||
enum State<F, M, H, O, E, C>
|
||||
where
|
||||
M: StreamMuxer,
|
||||
H: IntoConnectionHandler<C>,
|
||||
@ -146,12 +145,6 @@ where
|
||||
future: Pin<Box<F>>,
|
||||
/// The intended handler for the established connection.
|
||||
handler: H,
|
||||
/// While we are dialing the future, we need to buffer the events received via
|
||||
/// `Command::NotifyHandler` so that they get delivered to the `handler`
|
||||
/// once the connection is established. We can't leave these in `Task::receiver`
|
||||
/// because we have to detect if the connection attempt has been aborted (by
|
||||
/// dropping the corresponding `sender` owned by the manager).
|
||||
events: Vec<I>
|
||||
},
|
||||
|
||||
/// The connection is established and a new event is ready to be emitted.
|
||||
@ -198,30 +191,29 @@ where
|
||||
|
||||
'poll: loop {
|
||||
match std::mem::replace(&mut this.state, State::Done) {
|
||||
State::Pending { mut future, handler, mut events } => {
|
||||
// Process commands from the manager.
|
||||
loop {
|
||||
match Stream::poll_next(Pin::new(&mut this.commands), cx) {
|
||||
Poll::Pending => break,
|
||||
Poll::Ready(None) => return Poll::Ready(()),
|
||||
Poll::Ready(Some(Command::NotifyHandler(event))) =>
|
||||
events.push(event),
|
||||
}
|
||||
State::Pending { mut future, handler } => {
|
||||
// Check if the manager aborted this task by dropping the `commands`
|
||||
// channel sender side.
|
||||
match Stream::poll_next(Pin::new(&mut this.commands), cx) {
|
||||
Poll::Pending => {},
|
||||
Poll::Ready(None) => return Poll::Ready(()),
|
||||
Poll::Ready(Some(Command::NotifyHandler(_))) => unreachable!(
|
||||
"Manager does not allow sending commands to pending tasks.",
|
||||
)
|
||||
}
|
||||
// Check if the connection succeeded.
|
||||
match Future::poll(Pin::new(&mut future), cx) {
|
||||
Poll::Ready(Ok((info, muxer))) => {
|
||||
let mut c = Connection::new(muxer, handler.into_handler(&info));
|
||||
for event in events {
|
||||
c.inject_event(event)
|
||||
}
|
||||
this.state = State::EstablishedReady {
|
||||
connection: Some(c),
|
||||
connection: Some(Connection::new(
|
||||
muxer,
|
||||
handler.into_handler(&info),
|
||||
)),
|
||||
event: Event::Established { id, info }
|
||||
}
|
||||
}
|
||||
Poll::Pending => {
|
||||
this.state = State::Pending { future, handler, events };
|
||||
this.state = State::Pending { future, handler };
|
||||
return Poll::Pending
|
||||
}
|
||||
Poll::Ready(Err(error)) => {
|
||||
@ -338,4 +330,3 @@ where
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -330,18 +330,6 @@ where
|
||||
id
|
||||
}
|
||||
|
||||
/// Sends an event to all nodes.
|
||||
///
|
||||
/// This function is "atomic", in the sense that if `Poll::Pending` is returned then no event
|
||||
/// has been sent to any node yet.
|
||||
#[must_use]
|
||||
pub fn poll_broadcast(&mut self, event: &TInEvent, cx: &mut Context) -> Poll<()>
|
||||
where
|
||||
TInEvent: Clone
|
||||
{
|
||||
self.manager.poll_broadcast(event, cx)
|
||||
}
|
||||
|
||||
/// Adds an existing established connection to the pool.
|
||||
///
|
||||
/// Returns the assigned connection ID on success. An error is returned
|
||||
|
@ -269,18 +269,6 @@ where
|
||||
})
|
||||
}
|
||||
|
||||
/// Notifies the connection handler of _every_ connection of _every_ peer of an event.
|
||||
///
|
||||
/// This function is "atomic", in the sense that if `Poll::Pending` is returned then no event
|
||||
/// has been sent to any node yet.
|
||||
#[must_use]
|
||||
pub fn poll_broadcast(&mut self, event: &TInEvent, cx: &mut Context) -> Poll<()>
|
||||
where
|
||||
TInEvent: Clone
|
||||
{
|
||||
self.pool.poll_broadcast(event, cx)
|
||||
}
|
||||
|
||||
/// Returns a list of all connected peers, i.e. peers to whom the `Network`
|
||||
/// has at least one established connection.
|
||||
pub fn connected_peers(&self) -> impl Iterator<Item = &TPeerId> {
|
||||
@ -641,4 +629,3 @@ impl NetworkConfig {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user