mirror of
https://github.com/fluencelabs/rust-libp2p
synced 2025-06-17 20:11:22 +00:00
fix(relay): send correct PeerId
to client in outbound STOP message
Previously, the relay server would erroneously send its own `PeerId` in the STOP message to the client upon an incoming relay connection. This is obviously wrong and results in failed connection upgrades in other implementations. Pull-Request: #3767.
This commit is contained in:
@ -1,8 +1,12 @@
|
|||||||
## 0.15.2 - unreleased
|
## 0.15.2 - unreleased
|
||||||
|
|
||||||
|
- Send correct `PeerId` in outbound STOP message to client.
|
||||||
|
See [PR 3767].
|
||||||
|
|
||||||
- As a relay, when forwarding data between relay-connection-source and -destination and vice versa, flush write side when read currently has no more data available.
|
- As a relay, when forwarding data between relay-connection-source and -destination and vice versa, flush write side when read currently has no more data available.
|
||||||
See [PR 3765].
|
See [PR 3765].
|
||||||
|
|
||||||
|
[PR 3767]: https://github.com/libp2p/rust-libp2p/pull/3767
|
||||||
[PR 3765]: https://github.com/libp2p/rust-libp2p/pull/3765
|
[PR 3765]: https://github.com/libp2p/rust-libp2p/pull/3765
|
||||||
|
|
||||||
## 0.15.1
|
## 0.15.1
|
||||||
|
@ -520,7 +520,6 @@ impl NetworkBehaviour for Behaviour {
|
|||||||
event: Either::Left(handler::In::NegotiateOutboundConnect {
|
event: Either::Left(handler::In::NegotiateOutboundConnect {
|
||||||
circuit_id,
|
circuit_id,
|
||||||
inbound_circuit_req,
|
inbound_circuit_req,
|
||||||
relay_peer_id: self.local_peer_id,
|
|
||||||
src_peer_id: event_source,
|
src_peer_id: event_source,
|
||||||
src_connection_id: connection,
|
src_connection_id: connection,
|
||||||
}),
|
}),
|
||||||
|
@ -69,7 +69,6 @@ pub enum In {
|
|||||||
NegotiateOutboundConnect {
|
NegotiateOutboundConnect {
|
||||||
circuit_id: CircuitId,
|
circuit_id: CircuitId,
|
||||||
inbound_circuit_req: inbound_hop::CircuitReq,
|
inbound_circuit_req: inbound_hop::CircuitReq,
|
||||||
relay_peer_id: PeerId,
|
|
||||||
src_peer_id: PeerId,
|
src_peer_id: PeerId,
|
||||||
src_connection_id: ConnectionId,
|
src_connection_id: ConnectionId,
|
||||||
},
|
},
|
||||||
@ -112,13 +111,11 @@ impl fmt::Debug for In {
|
|||||||
In::NegotiateOutboundConnect {
|
In::NegotiateOutboundConnect {
|
||||||
circuit_id,
|
circuit_id,
|
||||||
inbound_circuit_req: _,
|
inbound_circuit_req: _,
|
||||||
relay_peer_id,
|
|
||||||
src_peer_id,
|
src_peer_id,
|
||||||
src_connection_id,
|
src_connection_id,
|
||||||
} => f
|
} => f
|
||||||
.debug_struct("In::NegotiateOutboundConnect")
|
.debug_struct("In::NegotiateOutboundConnect")
|
||||||
.field("circuit_id", circuit_id)
|
.field("circuit_id", circuit_id)
|
||||||
.field("relay_peer_id", relay_peer_id)
|
|
||||||
.field("src_peer_id", src_peer_id)
|
.field("src_peer_id", src_peer_id)
|
||||||
.field("src_connection_id", src_connection_id)
|
.field("src_connection_id", src_connection_id)
|
||||||
.finish(),
|
.finish(),
|
||||||
@ -655,7 +652,6 @@ impl ConnectionHandler for Handler {
|
|||||||
In::NegotiateOutboundConnect {
|
In::NegotiateOutboundConnect {
|
||||||
circuit_id,
|
circuit_id,
|
||||||
inbound_circuit_req,
|
inbound_circuit_req,
|
||||||
relay_peer_id,
|
|
||||||
src_peer_id,
|
src_peer_id,
|
||||||
src_connection_id,
|
src_connection_id,
|
||||||
} => {
|
} => {
|
||||||
@ -663,7 +659,7 @@ impl ConnectionHandler for Handler {
|
|||||||
.push_back(ConnectionHandlerEvent::OutboundSubstreamRequest {
|
.push_back(ConnectionHandlerEvent::OutboundSubstreamRequest {
|
||||||
protocol: SubstreamProtocol::new(
|
protocol: SubstreamProtocol::new(
|
||||||
outbound_stop::Upgrade {
|
outbound_stop::Upgrade {
|
||||||
relay_peer_id,
|
src_peer_id,
|
||||||
max_circuit_duration: self.config.max_circuit_duration,
|
max_circuit_duration: self.config.max_circuit_duration,
|
||||||
max_circuit_bytes: self.config.max_circuit_bytes,
|
max_circuit_bytes: self.config.max_circuit_bytes,
|
||||||
},
|
},
|
||||||
|
@ -199,10 +199,7 @@ impl Handler {
|
|||||||
});
|
});
|
||||||
|
|
||||||
self.queued_events.push_back(ConnectionHandlerEvent::Custom(
|
self.queued_events.push_back(ConnectionHandlerEvent::Custom(
|
||||||
Event::InboundCircuitEstablished {
|
Event::InboundCircuitEstablished { src_peer_id, limit },
|
||||||
src_peer_id: self.remote_peer_id,
|
|
||||||
limit,
|
|
||||||
},
|
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
Reservation::None => {
|
Reservation::None => {
|
||||||
|
@ -32,7 +32,7 @@ use std::time::Duration;
|
|||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
|
|
||||||
pub struct Upgrade {
|
pub struct Upgrade {
|
||||||
pub relay_peer_id: PeerId,
|
pub src_peer_id: PeerId,
|
||||||
pub max_circuit_duration: Duration,
|
pub max_circuit_duration: Duration,
|
||||||
pub max_circuit_bytes: u64,
|
pub max_circuit_bytes: u64,
|
||||||
}
|
}
|
||||||
@ -55,7 +55,7 @@ impl upgrade::OutboundUpgrade<NegotiatedSubstream> for Upgrade {
|
|||||||
let msg = proto::StopMessage {
|
let msg = proto::StopMessage {
|
||||||
type_pb: proto::StopMessageType::CONNECT,
|
type_pb: proto::StopMessageType::CONNECT,
|
||||||
peer: Some(proto::Peer {
|
peer: Some(proto::Peer {
|
||||||
id: self.relay_peer_id.to_bytes(),
|
id: self.src_peer_id.to_bytes(),
|
||||||
addrs: vec![],
|
addrs: vec![],
|
||||||
}),
|
}),
|
||||||
limit: Some(proto::Limit {
|
limit: Some(proto::Limit {
|
||||||
|
@ -203,34 +203,46 @@ fn connect() {
|
|||||||
relay_peer_id,
|
relay_peer_id,
|
||||||
false, // No renewal.
|
false, // No renewal.
|
||||||
));
|
));
|
||||||
spawn_swarm_on_pool(&pool, dst);
|
|
||||||
|
|
||||||
let mut src = build_client();
|
let mut src = build_client();
|
||||||
|
let src_peer_id = *src.local_peer_id();
|
||||||
|
|
||||||
src.dial(dst_addr).unwrap();
|
src.dial(dst_addr).unwrap();
|
||||||
|
|
||||||
pool.run_until(async {
|
pool.run_until(futures::future::join(
|
||||||
|
connection_established_to(src, relay_peer_id, dst_peer_id),
|
||||||
|
connection_established_to(dst, relay_peer_id, src_peer_id),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn connection_established_to(mut swarm: Swarm<Client>, relay_peer_id: PeerId, other: PeerId) {
|
||||||
loop {
|
loop {
|
||||||
match src.select_next_some().await {
|
match swarm.select_next_some().await {
|
||||||
SwarmEvent::Dialing(peer_id) if peer_id == relay_peer_id => {}
|
SwarmEvent::Dialing(peer_id) if peer_id == relay_peer_id => {}
|
||||||
SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == relay_peer_id => {}
|
SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == relay_peer_id => {}
|
||||||
SwarmEvent::Behaviour(ClientEvent::Ping(ping::Event { peer, .. }))
|
SwarmEvent::Behaviour(ClientEvent::Ping(ping::Event { peer, .. })) if peer == other => {
|
||||||
if peer == dst_peer_id =>
|
|
||||||
{
|
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
SwarmEvent::Behaviour(ClientEvent::Relay(
|
SwarmEvent::Behaviour(ClientEvent::Relay(
|
||||||
relay::client::Event::OutboundCircuitEstablished { .. },
|
relay::client::Event::OutboundCircuitEstablished { .. },
|
||||||
)) => {}
|
)) => {}
|
||||||
|
SwarmEvent::Behaviour(ClientEvent::Relay(
|
||||||
|
relay::client::Event::InboundCircuitEstablished { src_peer_id, .. },
|
||||||
|
)) => {
|
||||||
|
assert_eq!(src_peer_id, other);
|
||||||
|
}
|
||||||
SwarmEvent::Behaviour(ClientEvent::Ping(ping::Event { peer, .. }))
|
SwarmEvent::Behaviour(ClientEvent::Ping(ping::Event { peer, .. }))
|
||||||
if peer == relay_peer_id => {}
|
if peer == relay_peer_id => {}
|
||||||
SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == dst_peer_id => {
|
SwarmEvent::ConnectionEstablished { peer_id, .. } if peer_id == other => break,
|
||||||
break
|
SwarmEvent::IncomingConnection { send_back_addr, .. } => {
|
||||||
|
let peer_id_from_addr =
|
||||||
|
PeerId::try_from_multiaddr(&send_back_addr).expect("to have /p2p");
|
||||||
|
|
||||||
|
assert_eq!(peer_id_from_addr, other)
|
||||||
}
|
}
|
||||||
e => panic!("{e:?}"),
|
e => panic!("{e:?}"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
Reference in New Issue
Block a user