mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-27 08:41:36 +00:00
core/: Remove DisconnectedPeer::set_connected and Pool::add (#2195)
This logic seems to be a leftover of https://github.com/libp2p/rust-libp2p/pull/889 and unused today.
This commit is contained in:
@ -20,8 +20,8 @@
|
||||
|
||||
use super::{
|
||||
handler::{THandlerError, THandlerInEvent, THandlerOutEvent},
|
||||
Connected, ConnectedPoint, Connection, ConnectionError, ConnectionHandler,
|
||||
IntoConnectionHandler, PendingConnectionError, Substream,
|
||||
Connected, ConnectedPoint, ConnectionError, ConnectionHandler, IntoConnectionHandler,
|
||||
PendingConnectionError, Substream,
|
||||
};
|
||||
use crate::{muxing::StreamMuxer, Executor};
|
||||
use fnv::FnvHashMap;
|
||||
@ -276,40 +276,6 @@ impl<H: IntoConnectionHandler, TE> Manager<H, TE> {
|
||||
ConnectionId(task_id)
|
||||
}
|
||||
|
||||
/// Adds an existing connection to the manager.
|
||||
pub fn add<M>(&mut self, conn: Connection<M, H::Handler>, info: Connected) -> ConnectionId
|
||||
where
|
||||
H: IntoConnectionHandler + Send + 'static,
|
||||
H::Handler: ConnectionHandler<Substream = Substream<M>> + Send + 'static,
|
||||
<H::Handler as ConnectionHandler>::OutboundOpenInfo: Send + 'static,
|
||||
TE: error::Error + Send + 'static,
|
||||
M: StreamMuxer + Send + Sync + 'static,
|
||||
M::OutboundSubstream: Send + 'static,
|
||||
{
|
||||
let task_id = self.next_task_id;
|
||||
self.next_task_id.0 += 1;
|
||||
|
||||
let (tx, rx) = mpsc::channel(self.task_command_buffer_size);
|
||||
self.tasks.insert(
|
||||
task_id,
|
||||
TaskInfo {
|
||||
sender: tx,
|
||||
state: TaskState::Established(info),
|
||||
},
|
||||
);
|
||||
|
||||
let task: Pin<Box<Task<Pin<Box<future::Pending<_>>>, _, _, _>>> =
|
||||
Box::pin(Task::established(task_id, self.events_tx.clone(), rx, conn));
|
||||
|
||||
if let Some(executor) = &mut self.executor {
|
||||
executor.exec(task);
|
||||
} else {
|
||||
self.local_spawns.push(task);
|
||||
}
|
||||
|
||||
ConnectionId(task_id)
|
||||
}
|
||||
|
||||
/// Gets an entry for a managed connection, if it exists.
|
||||
pub fn entry(&mut self, id: ConnectionId) -> Option<Entry<'_, THandlerInEvent<H>>> {
|
||||
if let hash_map::Entry::Occupied(task) = self.tasks.entry(id.0) {
|
||||
|
@ -130,24 +130,6 @@ where
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a task for an existing node we are already connected to.
|
||||
pub fn established(
|
||||
id: TaskId,
|
||||
events: mpsc::Sender<Event<H, E>>,
|
||||
commands: mpsc::Receiver<Command<THandlerInEvent<H>>>,
|
||||
connection: Connection<M, H::Handler>,
|
||||
) -> Self {
|
||||
Task {
|
||||
id,
|
||||
events,
|
||||
commands: commands.fuse(),
|
||||
state: State::Established {
|
||||
connection,
|
||||
event: None,
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The state associated with the `Task` of a connection.
|
||||
|
@ -20,11 +20,10 @@
|
||||
|
||||
use crate::{
|
||||
connection::{
|
||||
self,
|
||||
handler::{THandlerError, THandlerInEvent, THandlerOutEvent},
|
||||
manager::{self, Manager, ManagerConfig},
|
||||
Connected, Connection, ConnectionError, ConnectionHandler, ConnectionId, ConnectionLimit,
|
||||
IncomingInfo, IntoConnectionHandler, OutgoingInfo, PendingConnectionError, Substream,
|
||||
Connected, ConnectionError, ConnectionHandler, ConnectionId, ConnectionLimit, IncomingInfo,
|
||||
IntoConnectionHandler, OutgoingInfo, PendingConnectionError, Substream,
|
||||
},
|
||||
muxing::StreamMuxer,
|
||||
ConnectedPoint, PeerId,
|
||||
@ -313,37 +312,6 @@ impl<THandler: IntoConnectionHandler, TTransErr> Pool<THandler, TTransErr> {
|
||||
id
|
||||
}
|
||||
|
||||
/// Adds an existing established connection to the pool.
|
||||
///
|
||||
/// Returns the assigned connection ID on success. An error is returned
|
||||
/// if the configured maximum number of established connections for the
|
||||
/// connected peer has been reached.
|
||||
pub fn add<TMuxer>(
|
||||
&mut self,
|
||||
c: Connection<TMuxer, THandler::Handler>,
|
||||
i: Connected,
|
||||
) -> Result<ConnectionId, ConnectionLimit>
|
||||
where
|
||||
THandler: IntoConnectionHandler + Send + 'static,
|
||||
THandler::Handler:
|
||||
ConnectionHandler<Substream = connection::Substream<TMuxer>> + Send + 'static,
|
||||
<THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send + 'static,
|
||||
TTransErr: error::Error + Send + 'static,
|
||||
TMuxer: StreamMuxer + Send + Sync + 'static,
|
||||
TMuxer::OutboundSubstream: Send + 'static,
|
||||
{
|
||||
self.counters.check_max_established(&i.endpoint)?;
|
||||
self.counters
|
||||
.check_max_established_per_peer(self.num_peer_established(&i.peer_id))?;
|
||||
let id = self.manager.add(c, i.clone());
|
||||
self.counters.inc_established(&i.endpoint);
|
||||
self.established
|
||||
.entry(i.peer_id)
|
||||
.or_default()
|
||||
.insert(id, i.endpoint);
|
||||
Ok(id)
|
||||
}
|
||||
|
||||
/// Gets an entry representing a connection in the pool.
|
||||
///
|
||||
/// Returns `None` if the pool has no connection with the given ID.
|
||||
|
@ -21,9 +21,9 @@
|
||||
use super::{DialError, DialingOpts, Network};
|
||||
use crate::{
|
||||
connection::{
|
||||
handler::THandlerInEvent, pool::Pool, Connected, ConnectedPoint, Connection,
|
||||
ConnectionHandler, ConnectionId, ConnectionLimit, EstablishedConnection,
|
||||
EstablishedConnectionIter, IntoConnectionHandler, PendingConnection, Substream,
|
||||
handler::THandlerInEvent, pool::Pool, ConnectedPoint, ConnectionHandler, ConnectionId,
|
||||
ConnectionLimit, EstablishedConnection, EstablishedConnectionIter, IntoConnectionHandler,
|
||||
PendingConnection, Substream,
|
||||
},
|
||||
Multiaddr, PeerId, StreamMuxer, Transport,
|
||||
};
|
||||
@ -472,44 +472,6 @@ where
|
||||
pub fn into_peer(self) -> Peer<'a, TTrans, THandler> {
|
||||
Peer::Disconnected(self)
|
||||
}
|
||||
|
||||
/// Moves the peer into a connected state by supplying an existing
|
||||
/// established connection.
|
||||
///
|
||||
/// No event is generated for this action.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
/// Panics if `connected.peer_id` does not identify the current peer.
|
||||
pub fn set_connected<TMuxer>(
|
||||
self,
|
||||
connected: Connected,
|
||||
connection: Connection<TMuxer, THandler::Handler>,
|
||||
) -> Result<ConnectedPeer<'a, TTrans, THandler>, ConnectionLimit>
|
||||
where
|
||||
THandler: Send + 'static,
|
||||
TTrans::Error: Send + 'static,
|
||||
THandler::Handler: ConnectionHandler<Substream = Substream<TMuxer>> + Send,
|
||||
<THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send,
|
||||
<THandler::Handler as ConnectionHandler>::Error: error::Error + Send + 'static,
|
||||
TMuxer: StreamMuxer + Send + Sync + 'static,
|
||||
TMuxer::OutboundSubstream: Send,
|
||||
{
|
||||
if connected.peer_id != self.peer_id {
|
||||
panic!(
|
||||
"Invalid peer ID given: {:?}. Expected: {:?}",
|
||||
connected.peer_id, self.peer_id
|
||||
)
|
||||
}
|
||||
|
||||
self.network
|
||||
.pool
|
||||
.add(connection, connected)
|
||||
.map(move |_id| ConnectedPeer {
|
||||
network: self.network,
|
||||
peer_id: self.peer_id,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// The (internal) state of a `DialingAttempt`, tracking the
|
||||
|
Reference in New Issue
Block a user