Mirror of https://github.com/fluencelabs/rust-libp2p (synced 2025-04-25 19:02:13 +00:00)
Remove spaces before semicolons (#591)
Parent: 585c90a33c
Commit: 45cd7db6e9
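
For illustration, a minimal sketch of the kind of edit this commit applies throughout the tree: the stray space before a semicolon inside comments and log/string literals is dropped. The function and message below are hypothetical examples, not taken from the diff that follows.

    fn main() {
        // Before this commit the message would read: "entry removed from self.nodes ; qed"
        // After this commit it reads:                "entry removed from self.nodes; qed"
        let proof = "entry removed from self.nodes; qed";
        println!("{}", proof);
    }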
@@ -192,8 +192,8 @@ impl<'a, TInEvent, TOutEvent, THandler> CollectionReachEvent<'a, TInEvent, TOutE
 let ret_value = if let Some(former_task_id) = former_task_id {
 self.parent.inner.task(former_task_id)
 .expect("whenever we receive a TaskClosed event or close a node, we remove the \
-corresponding entry from self.nodes ; therefore all elements in \
-self.nodes are valid tasks in the HandledNodesTasks ; qed")
+corresponding entry from self.nodes; therefore all elements in \
+self.nodes are valid tasks in the HandledNodesTasks; qed")
 .close();
 let _former_other_state = self.parent.tasks.remove(&former_task_id);
 debug_assert_eq!(_former_other_state, Some(TaskState::Connected(self.peer_id.clone())));
@@ -237,10 +237,10 @@ impl<'a, TInEvent, TOutEvent, THandler> Drop for CollectionReachEvent<'a, TInEve
 let task_state = self.parent.tasks.remove(&self.id);
 debug_assert!(if let Some(TaskState::Pending) = task_state { true } else { false });
 self.parent.inner.task(self.id)
-.expect("we create the CollectionReachEvent with a valid task id ; the \
+.expect("we create the CollectionReachEvent with a valid task id; the \
 CollectionReachEvent mutably borrows the collection, therefore nothing \
-can delete this task during the lifetime of the CollectionReachEvent ; \
-therefore the task is still valid when we delete it ; qed")
+can delete this task during the lifetime of the CollectionReachEvent; \
+therefore the task is still valid when we delete it; qed")
 .close();
 }
 }
@@ -304,9 +304,9 @@ impl<TInEvent, TOutEvent, THandler> CollectionStream<TInEvent, TOutEvent, THandl
 entry.remove();
 self.inner.task(id.0)
 .expect("whenever we receive a TaskClosed event or interrupt a task, we \
-remove the corresponding entry from self.tasks ; therefore all \
+remove the corresponding entry from self.tasks; therefore all \
 elements in self.tasks are valid tasks in the \
-HandledNodesTasks ; qed")
+HandledNodesTasks; qed")
 .close();
 
 Ok(())
@@ -381,7 +381,7 @@ impl<TInEvent, TOutEvent, THandler> CollectionStream<TInEvent, TOutEvent, THandl
 })
 },
 (Some(TaskState::Pending), _, _) => {
-// TODO: this variant shouldn't happen ; prove this
+// TODO: this variant shouldn't happen; prove this
 panic!()
 },
 (Some(TaskState::Connected(peer_id)), Ok(()), _handler) => {
@@ -402,9 +402,9 @@ impl<TInEvent, TOutEvent, THandler> CollectionStream<TInEvent, TOutEvent, THandl
 })
 },
 (None, _, _) => {
-panic!("self.tasks is always kept in sync with the tasks in self.inner ; \
+panic!("self.tasks is always kept in sync with the tasks in self.inner; \
 when we add a task in self.inner we add a corresponding entry in \
-self.tasks, and remove the entry only when the task is closed ; \
+self.tasks, and remove the entry only when the task is closed; \
 qed")
 },
 }
@@ -420,9 +420,9 @@ impl<TInEvent, TOutEvent, THandler> CollectionStream<TInEvent, TOutEvent, THandl
 let peer_id = match self.tasks.get(&id) {
 Some(TaskState::Connected(peer_id)) => peer_id.clone(),
 _ => panic!("we can only receive NodeEvent events from a task after we \
-received a corresponding NodeReached event from that same task ; \
+received a corresponding NodeReached event from that same task; \
 when we receive a NodeReached event, we ensure that the entry in \
-self.tasks is switched to the Connected state ; qed"),
+self.tasks is switched to the Connected state; qed"),
 };
 
 Async::Ready(CollectionEvent::NodeEvent {
@@ -457,8 +457,8 @@ impl<'a, TInEvent> PeerMut<'a, TInEvent> {
 let old_task_id = self.nodes.remove(&peer_id);
 debug_assert_eq!(old_task_id, Some(self.inner.id()));
 } else {
-panic!("a PeerMut can only be created if an entry is present in nodes ; an entry in \
-nodes always matched a Connected entry in tasks ; qed");
+panic!("a PeerMut can only be created if an entry is present in nodes; an entry in \
+nodes always matched a Connected entry in tasks; qed");
 };
 
 self.inner.close();
@@ -25,7 +25,7 @@ use std::io::Error as IoError;
 
 /// Handler for the substreams of a node.
 // TODO: right now it is possible for a node handler to be built, then shut down right after if we
-// realize we dialed the wrong peer for example ; this could be surprising and should either
+// realize we dialed the wrong peer for example; this could be surprising and should either
 // be documented or changed (favouring the "documented" right now)
 pub trait NodeHandler {
 /// Custom event that can be received from the outside.
@@ -407,7 +407,7 @@ where
 node.inject_event(event);
 },
 Ok(Async::Ready(None)) => {
-// Node closed by the external API ; start shutdown process.
+// Node closed by the external API; start shutdown process.
 node.shutdown();
 break;
 }
@@ -481,7 +481,7 @@ where
 if actual_peer_id == expected_peer_id {
 Ok((actual_peer_id, muxer))
 } else {
-let msg = format!("public key mismatch ; expected = {:?} ; obtained = {:?}",
+let msg = format!("public key mismatch; expected = {:?}; obtained = {:?}",
 expected_peer_id, actual_peer_id);
 Err(IoError::new(IoErrorKind::Other, msg))
 }
@@ -573,10 +573,10 @@ where
 }) => {
 let endpoint = self.reach_attempts.connected_points.remove(&peer_id)
 .expect("We insert into connected_points whenever a connection is \
-opened and remove only when a connection is closed ; the \
+opened and remove only when a connection is closed; the \
 underlying API is guaranteed to always deliver a connection \
 closed message after it has been opened, and no two closed \
-messages ; qed");
+messages; qed");
 debug_assert!(!self.reach_attempts.out_reach_attempts.contains_key(&peer_id));
 action = Default::default();
 out_event = RawSwarmEvent::NodeError {
@@ -588,10 +588,10 @@ where
 Async::Ready(CollectionEvent::NodeClosed { peer_id }) => {
 let endpoint = self.reach_attempts.connected_points.remove(&peer_id)
 .expect("We insert into connected_points whenever a connection is \
-opened and remove only when a connection is closed ; the \
+opened and remove only when a connection is closed; the \
 underlying API is guaranteed to always deliver a connection \
 closed message after it has been opened, and no two closed \
-messages ; qed");
+messages; qed");
 debug_assert!(!self.reach_attempts.out_reach_attempts.contains_key(&peer_id));
 action = Default::default();
 out_event = RawSwarmEvent::NodeClosed { peer_id, endpoint };
@@ -607,15 +607,15 @@ where
 }
 
 if let Some(interrupt) = action.interrupt {
-// TODO: improve proof or remove ; this is too complicated right now
+// TODO: improve proof or remove; this is too complicated right now
 self.active_nodes
 .interrupt(interrupt)
-.expect("interrupt is guaranteed to be gathered from `out_reach_attempts` ;
+.expect("interrupt is guaranteed to be gathered from `out_reach_attempts`;
 we insert in out_reach_attempts only when we call \
 active_nodes.add_reach_attempt, and we remove only when we call \
-interrupt or when a reach attempt succeeds or errors ; therefore the \
+interrupt or when a reach attempt succeeds or errors; therefore the \
 out_reach_attempts should always be in sync with the actual \
-attempts ; qed");
+attempts; qed");
 }
 
 return Async::Ready(out_event);
@@ -688,9 +688,9 @@ where
 if outcome == CollectionNodeAccept::ReplacedExisting {
 let closed_endpoint = closed_endpoint
 .expect("We insert into connected_points whenever a connection is opened and \
-remove only when a connection is closed ; the underlying API is \
+remove only when a connection is closed; the underlying API is \
 guaranteed to always deliver a connection closed message after it has \
-been opened, and no two closed messages ; qed");
+been opened, and no two closed messages; qed");
 return (action, RawSwarmEvent::Replaced {
 peer_id,
 endpoint: opened_endpoint,
@@ -726,9 +726,9 @@ where
 if outcome == CollectionNodeAccept::ReplacedExisting {
 let closed_endpoint = closed_endpoint
 .expect("We insert into connected_points whenever a connection is opened and \
-remove only when a connection is closed ; the underlying API is guaranteed \
+remove only when a connection is closed; the underlying API is guaranteed \
 to always deliver a connection closed message after it has been opened, \
-and no two closed messages ; qed");
+and no two closed messages; qed");
 return (Default::default(), RawSwarmEvent::Replaced {
 peer_id,
 endpoint: opened_endpoint,
@@ -740,7 +740,7 @@ where
 }
 
 // We didn't find any entry in neither the outgoing connections not ingoing connections.
-// TODO: improve proof or remove ; this is too complicated right now
+// TODO: improve proof or remove; this is too complicated right now
 panic!("The API of collection guarantees that the id sent back in NodeReached (which is where \
 we call handle_node_reached) is one that was passed to add_reach_attempt. Whenever we \
 call add_reach_attempt, we also insert at the same time an entry either in \
@@ -817,7 +817,7 @@ where TTrans: Transport
 }
 
 // The id was neither in the outbound list nor the inbound list.
-// TODO: improve proof or remove ; this is too complicated right now
+// TODO: improve proof or remove; this is too complicated right now
 panic!("The API of collection guarantees that the id sent back in ReachError events \
 (which is where we call handle_reach_error) is one that was passed to \
 add_reach_attempt. Whenever we call add_reach_attempt, we also insert \
@@ -999,7 +999,7 @@ impl<'a, TInEvent> PeerConnected<'a, TInEvent> {
 /// Closes the connection to this node.
 ///
 /// No `NodeClosed` message will be generated for this node.
-// TODO: consider returning a `PeerNotConnected` ; however this makes all the borrows things
+// TODO: consider returning a `PeerNotConnected`; however this makes all the borrows things
 // much more annoying to deal with
 pub fn close(self) {
 self.connected_points.remove(&self.peer_id);
@@ -1011,9 +1011,9 @@ impl<'a, TInEvent> PeerConnected<'a, TInEvent> {
 pub fn endpoint(&self) -> &ConnectedPoint {
 self.connected_points.get(&self.peer_id)
 .expect("We insert into connected_points whenever a connection is opened and remove \
-only when a connection is closed ; the underlying API is guaranteed to always \
+only when a connection is closed; the underlying API is guaranteed to always \
 deliver a connection closed message after it has been opened, and no two \
-closed messages ; qed")
+closed messages; qed")
 }
 
 /// Sends an event to the node.
@@ -1031,13 +1031,13 @@ pub struct PeerPendingConnect<'a, TInEvent: 'a, TOutEvent: 'a, THandler: 'a> {
 
 impl<'a, TInEvent, TOutEvent, THandler> PeerPendingConnect<'a, TInEvent, TOutEvent, THandler> {
 /// Interrupt this connection attempt.
-// TODO: consider returning a PeerNotConnected ; however that is really pain in terms of
+// TODO: consider returning a PeerNotConnected; however that is really pain in terms of
 // borrows
 #[inline]
 pub fn interrupt(self) {
 let attempt = self.attempt.remove();
 if let Err(_) = self.active_nodes.interrupt(attempt.id) {
-// TODO: improve proof or remove ; this is too complicated right now
+// TODO: improve proof or remove; this is too complicated right now
 panic!("We retreived this attempt.id from out_reach_attempts. We insert in \
 out_reach_attempts only at the same time as we call add_reach_attempt. \
 Whenever we receive a NodeReached, NodeReplaced or ReachError event, which \
@@ -329,7 +329,7 @@ where C: AsyncRead + AsyncWrite
 let mut inner = self.inner.lock();
 
 if inner.opened_substreams.len() >= inner.config.max_substreams {
-debug!("Refused substream ; reached maximum number of substreams {}", inner.config.max_substreams);
+debug!("Refused substream; reached maximum number of substreams {}", inner.config.max_substreams);
 return Err(IoError::new(IoErrorKind::ConnectionRefused,
 "exceeded maximum number of open substreams"));
 }
@@ -460,7 +460,7 @@ where C: AsyncRead + AsyncWrite
 Ok(Async::Ready(Some(data))) => substream.current_data = data,
 Ok(Async::Ready(None)) => return Ok(Async::Ready(0)),
 Ok(Async::NotReady) => {
-// There was no data packet in the buffer about this substream ; maybe it's
+// There was no data packet in the buffer about this substream; maybe it's
 // because it has been closed.
 if inner.opened_substreams.contains(&(substream.num, substream.endpoint)) {
 return Ok(Async::NotReady)
@@ -345,7 +345,7 @@ impl FloodSubController {
 let topics = topics.into_iter();
 
 if log_enabled!(Level::Debug) {
-debug!("Queuing sub/unsub message ; sub = {:?} ; unsub = {:?}",
+debug!("Queuing sub/unsub message; sub = {:?}; unsub = {:?}",
 topics.clone().filter(|t| t.1)
 .map(|t| t.0.hash().clone().into_string())
 .collect::<Vec<_>>(),
@@ -389,7 +389,7 @@ impl FloodSubController {
 {
 let topics = topics.into_iter().collect::<Vec<_>>();
 
-debug!("Queueing publish message ; topics = {:?} ; data_len = {:?}",
+debug!("Queueing publish message; topics = {:?}; data_len = {:?}",
 topics.iter().map(|t| t.hash().clone().into_string()).collect::<Vec<_>>(),
 data.len());
 
@@ -554,7 +554,7 @@ fn handle_packet_received(
 let mut input = match protobuf::parse_from_bytes::<rpc_proto::RPC>(&bytes) {
 Ok(msg) => msg,
 Err(err) => {
-debug!("Failed to parse protobuf message ; err = {:?}", err);
+debug!("Failed to parse protobuf message; err = {:?}", err);
 return Err(err.into());
 }
 };
@@ -588,7 +588,7 @@ fn handle_packet_received(
 .lock()
 .insert(hash((from.clone(), publish.take_seqno())))
 {
-trace!("Skipping message because we had already received it ; payload = {} bytes",
+trace!("Skipping message because we had already received it; payload = {} bytes",
 publish.get_data().len());
 continue;
 }
@@ -609,7 +609,7 @@ fn handle_packet_received(
 .map(|h| TopicHash::from_raw(h))
 .collect::<Vec<_>>();
 
-trace!("Processing message for topics {:?} ; payload = {} bytes",
+trace!("Processing message for topics {:?}; payload = {} bytes",
 topics,
 publish.get_data().len());
 
@@ -87,7 +87,7 @@ where
 
 let bytes = message
 .write_to_bytes()
-.expect("writing protobuf failed ; should never happen");
+.expect("writing protobuf failed; should never happen");
 
 let future = self.inner.send(bytes).map(|_| ());
 Box::new(future) as Box<_>
@@ -142,7 +142,7 @@ where
 let (info, observed_addr) = match parse_proto_msg(msg) {
 Ok(v) => v,
 Err(err) => {
-debug!("Failed to parse protobuf message ; error = {:?}", err);
+debug!("Failed to parse protobuf message; error = {:?}", err);
 return Err(err.into());
 }
 };
@@ -192,7 +192,7 @@ where F: FnMut(&PeerId) -> Fut + Send + 'a,
 fn gen_random_id(my_id: &PeerId, bucket_num: usize) -> Result<PeerId, ()> {
 let my_id_len = my_id.as_bytes().len();
 
-// TODO: this 2 is magic here ; it is the length of the hash of the multihash
+// TODO: this 2 is magic here; it is the length of the hash of the multihash
 let bits_diff = bucket_num + 1;
 if bits_diff > 8 * (my_id_len - 2) {
 return Err(());
@@ -232,7 +232,7 @@ where F: FnMut(&PeerId) -> Fut + 'a,
 Fut: IntoFuture<Item = KadConnecController, Error = IoError> + 'a,
 Fut::Future: Send,
 {
-debug!("Start query for {:?} ; num results = {}", searched_key, num_results);
+debug!("Start query for {:?}; num results = {}", searched_key, num_results);
 
 // State of the current iterative process.
 struct State<'a, F> {
@@ -322,7 +322,7 @@ where F: FnMut(&PeerId) -> Fut + 'a,
 to_contact
 };
 
-debug!("New query round ; {} queries in progress ; contacting {} new peers",
+debug!("New query round; {} queries in progress; contacting {} new peers",
 state.current_attempts_fut.len(),
 to_contact.len());
 
@@ -449,7 +449,7 @@ where F: FnMut(&PeerId) -> Fut + 'a,
 
 } else {
 if !local_nearest_node_updated {
-trace!("Loop didn't update closer node ; jumping to step 2");
+trace!("Loop didn't update closer node; jumping to step 2");
 state.stage = Stage::SecondStep;
 }
 }
@@ -88,7 +88,7 @@ impl KadPeer {
 // Builds a `KadPeer` from its raw protobuf equivalent.
 // TODO: use TryFrom once stable
 fn from_peer(peer: &mut protobuf_structs::dht::Message_Peer) -> Result<KadPeer, IoError> {
-// TODO: this is in fact a CID ; not sure if this should be handled in `from_bytes` or
+// TODO: this is in fact a CID; not sure if this should be handled in `from_bytes` or
 // as a special case here
 let node_id = PeerId::from_bytes(peer.get_id().to_vec())
 .map_err(|_| IoError::new(IoErrorKind::InvalidData, "invalid peer id"))?;
@@ -339,7 +339,7 @@ fn proto_to_msg(mut message: protobuf_structs::dht::Message) -> Result<KadMsg, I
 
 } else {
 // TODO: for now we don't parse the peer properly, so it is possible that we get
-// parsing errors for peers even when they are valid ; we ignore these
+// parsing errors for peers even when they are valid; we ignore these
 // errors for now, but ultimately we should just error altogether
 let closer_peers = message.mut_closerPeers()
 .iter_mut()
@@ -362,7 +362,7 @@ fn proto_to_msg(mut message: protobuf_structs::dht::Message) -> Result<KadMsg, I
 
 } else {
 // TODO: for now we don't parse the peer properly, so it is possible that we get
-// parsing errors for peers even when they are valid ; we ignore these
+// parsing errors for peers even when they are valid; we ignore these
 // errors for now, but ultimately we should just error altogether
 let closer_peers = message.mut_closerPeers()
 .iter_mut()
@@ -382,7 +382,7 @@ fn proto_to_msg(mut message: protobuf_structs::dht::Message) -> Result<KadMsg, I
 
 protobuf_structs::dht::Message_MessageType::ADD_PROVIDER => {
 // TODO: for now we don't parse the peer properly, so it is possible that we get
-// parsing errors for peers even when they are valid ; we ignore these
+// parsing errors for peers even when they are valid; we ignore these
 // errors for now, but ultimately we should just error altogether
 let provider_peer = message.mut_providerPeers()
 .iter_mut()
@@ -100,7 +100,7 @@ where
 fn gen_random_id(my_id: &PeerId, bucket_num: usize) -> Result<PeerId, ()> {
 let my_id_len = my_id.as_bytes().len();
 
-// TODO: this 2 is magic here ; it is the length of the hash of the multihash
+// TODO: this 2 is magic here; it is the length of the hash of the multihash
 let bits_diff = bucket_num + 1;
 if bits_diff > 8 * (my_id_len - 2) {
 return Err(());
@@ -137,7 +137,7 @@ where
 FBuckets: Fn(PeerId) -> Vec<PeerId> + 'a + Clone,
 FFindNode: Fn(Multiaddr, PeerId) -> Box<Future<Item = Vec<protocol::Peer>, Error = IoError> + Send> + 'a + Clone,
 {
-debug!("Start query for {:?} ; num results = {}", searched_key, num_results);
+debug!("Start query for {:?}; num results = {}", searched_key, num_results);
 
 // State of the current iterative process.
 struct State<'a> {
@@ -230,7 +230,7 @@ where
 to_contact
 };
 
-debug!("New query round ; {} queries in progress ; contacting {} new peers",
+debug!("New query round; {} queries in progress; contacting {} new peers",
 state.current_attempts_fut.len(),
 to_contact.len());
 
@@ -350,7 +350,7 @@ where
 
 } else {
 if !local_nearest_node_updated {
-trace!("Loop didn't update closer node ; jumping to step 2");
+trace!("Loop didn't update closer node; jumping to step 2");
 state.stage = Stage::SecondStep;
 }
 }
@@ -311,7 +311,7 @@ where TSocket: AsyncRead + AsyncWrite
 PingListenerState::Listening => {
 match self.inner.poll() {
 Ok(Async::Ready(Some(payload))) => {
-debug!("Received ping (payload={:?}) ; sending back", payload);
+debug!("Received ping (payload={:?}); sending back", payload);
 self.state = PingListenerState::Sending(payload.freeze())
 },
 Ok(Async::Ready(None)) => self.state = PingListenerState::Closing,
@@ -324,7 +324,7 @@ where
 .and_then(|context| {
 // Generate our nonce.
 let context = context.with_local()?;
-trace!("starting handshake ; local nonce = {:?}", context.state.nonce);
+trace!("starting handshake; local nonce = {:?}", context.state.nonce);
 Ok(context)
 })
 .and_then(|context| {
@@ -346,7 +346,7 @@ where
 return Err(err.into())
 },
 };
-trace!("received proposition from remote ; pubkey = {:?} ; nonce = {:?}",
+trace!("received proposition from remote; pubkey = {:?}; nonce = {:?}",
 context.state.public_key, context.state.nonce);
 Ok((socket, context))
 })
@@ -436,7 +436,7 @@ where
 let remote_exch = match protobuf_parse_from_bytes::<Exchange>(&raw) {
 Ok(e) => e,
 Err(err) => {
-debug!("failed to parse remote's exchange protobuf ; {:?}", err);
+debug!("failed to parse remote's exchange protobuf; {:?}", err);
 return Err(SecioError::HandshakeParsingFailure);
 }
 };
@@ -306,7 +306,7 @@ impl SecioKeyPair {
 SecioKeyPairInner::Secp256k1 { ref private } => {
 let secp = secp256k1::Secp256k1::with_caps(secp256k1::ContextFlag::SignOnly);
 let pubkey = secp256k1::key::PublicKey::from_secret_key(&secp, private)
-.expect("wrong secp256k1 private key ; type safety violated");
+.expect("wrong secp256k1 private key; type safety violated");
 PublicKey::Secp256k1(pubkey.serialize_vec(&secp, true).to_vec())
 }
 }
@@ -21,7 +21,7 @@
 //! General-purpose key-value storage.
 //! The keys are strings, and the values are of any type you want.
 //!
-//! > **Note**: This crate is meant to be a utility for the implementation of other crates ; it
+//! > **Note**: This crate is meant to be a utility for the implementation of other crates; it
 //! > does not directly participate in the stack of libp2p.
 //!
 //! This crate provides the `Datastore` trait, whose template parameter is the type of the value.
@@ -179,7 +179,7 @@ where
 }
 }
 
-// How to resolve ; to an IPv4 address or an IPv6 address?
+// How to resolve; to an IPv4 address or an IPv6 address?
 #[derive(Debug, Copy, Clone, PartialEq, Eq)]
 enum ResolveTy {
 Dns4,
@@ -206,7 +206,7 @@ where
 OwnedMessage::Binary(data) => Ok(data),
 OwnedMessage::Text(data) => Ok(data.into_bytes()),
 // TODO: pings and pongs and close messages need to be
-// answered ; and this is really hard ; for now we produce
+// answered; and this is really hard; for now we produce
 // an error when that happens
 _ => Err(IoError::new(IoErrorKind::Other, "unimplemented")),
 }