mirror of
https://github.com/fluencelabs/tendermint
synced 2025-07-31 20:21:56 +00:00
remove TryBroadcast
This commit is contained in:
@@ -202,44 +202,21 @@ func (sw *Switch) OnStop() {
|
|||||||
|
|
||||||
// Broadcast runs a go routine for each attempted send, which will block
|
// Broadcast runs a go routine for each attempted send, which will block
|
||||||
// trying to send for defaultSendTimeoutSeconds. Returns a channel
|
// trying to send for defaultSendTimeoutSeconds. Returns a channel
|
||||||
// which receives broadcast result for each attempted send (success=false if times out).
|
// which receives success values for each attempted send (false if times out).
|
||||||
// NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved.
|
// NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved.
|
||||||
// TODO: Something more intelligent.
|
// TODO: Something more intelligent.
|
||||||
|
func (sw *Switch) Broadcast(chID byte, msg interface{}) chan bool {
|
||||||
type BroadcastResult struct {
|
successChan := make(chan bool, len(sw.peers.List()))
|
||||||
PeerKey string
|
|
||||||
Success bool
|
|
||||||
}
|
|
||||||
|
|
||||||
func (sw *Switch) Broadcast(chID byte, msg interface{}) chan BroadcastResult {
|
|
||||||
successChan := make(chan BroadcastResult, len(sw.peers.List()))
|
|
||||||
sw.Logger.Debug("Broadcast", "channel", chID, "msg", msg)
|
sw.Logger.Debug("Broadcast", "channel", chID, "msg", msg)
|
||||||
for _, peer := range sw.peers.List() {
|
for _, peer := range sw.peers.List() {
|
||||||
go func(peer Peer) {
|
go func(peer Peer) {
|
||||||
success := peer.Send(chID, msg)
|
success := peer.Send(chID, msg)
|
||||||
successChan <- BroadcastResult{peer.Key(), success}
|
successChan <- success
|
||||||
}(peer)
|
}(peer)
|
||||||
}
|
}
|
||||||
return successChan
|
return successChan
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sw *Switch) TryBroadcast(chID byte, msg interface{}) chan BroadcastResult {
|
|
||||||
successChan := make(chan BroadcastResult, len(sw.peers.List()))
|
|
||||||
sw.Logger.Debug("TryBroadcast", "channel", chID, "msg", msg)
|
|
||||||
for _, peer := range sw.peers.List() {
|
|
||||||
success := peer.TrySend(chID, msg)
|
|
||||||
if success {
|
|
||||||
successChan <- BroadcastResult{peer.Key(), success}
|
|
||||||
} else {
|
|
||||||
go func(peer Peer) {
|
|
||||||
success := peer.Send(chID, msg)
|
|
||||||
successChan <- BroadcastResult{peer.Key(), success}
|
|
||||||
}(peer)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return successChan
|
|
||||||
}
|
|
||||||
|
|
||||||
// NumPeers returns the count of outbound/inbound and outbound-dialing peers.
|
// NumPeers returns the count of outbound/inbound and outbound-dialing peers.
|
||||||
func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
|
func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
|
||||||
peers := sw.peers.List()
|
peers := sw.peers.List()
|
||||||
|
@@ -128,17 +128,14 @@ func TestSwitches(t *testing.T) {
|
|||||||
ch0Msg := "channel zero"
|
ch0Msg := "channel zero"
|
||||||
ch1Msg := "channel foo"
|
ch1Msg := "channel foo"
|
||||||
ch2Msg := "channel bar"
|
ch2Msg := "channel bar"
|
||||||
ch3Msg := "channel baz"
|
|
||||||
|
|
||||||
s1.Broadcast(byte(0x00), ch0Msg)
|
s1.Broadcast(byte(0x00), ch0Msg)
|
||||||
s1.Broadcast(byte(0x01), ch1Msg)
|
s1.Broadcast(byte(0x01), ch1Msg)
|
||||||
s1.Broadcast(byte(0x02), ch2Msg)
|
s1.Broadcast(byte(0x02), ch2Msg)
|
||||||
s1.TryBroadcast(byte(0x03), ch3Msg)
|
|
||||||
|
|
||||||
assertMsgReceivedWithTimeout(t, ch0Msg, byte(0x00), s2.Reactor("foo").(*TestReactor), 10*time.Millisecond, 5*time.Second)
|
assertMsgReceivedWithTimeout(t, ch0Msg, byte(0x00), s2.Reactor("foo").(*TestReactor), 10*time.Millisecond, 5*time.Second)
|
||||||
assertMsgReceivedWithTimeout(t, ch1Msg, byte(0x01), s2.Reactor("foo").(*TestReactor), 10*time.Millisecond, 5*time.Second)
|
assertMsgReceivedWithTimeout(t, ch1Msg, byte(0x01), s2.Reactor("foo").(*TestReactor), 10*time.Millisecond, 5*time.Second)
|
||||||
assertMsgReceivedWithTimeout(t, ch2Msg, byte(0x02), s2.Reactor("bar").(*TestReactor), 10*time.Millisecond, 5*time.Second)
|
assertMsgReceivedWithTimeout(t, ch2Msg, byte(0x02), s2.Reactor("bar").(*TestReactor), 10*time.Millisecond, 5*time.Second)
|
||||||
assertMsgReceivedWithTimeout(t, ch3Msg, byte(0x03), s2.Reactor("bar").(*TestReactor), 10*time.Millisecond, 5*time.Second)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func assertMsgReceivedWithTimeout(t *testing.T, msg string, channel byte, reactor *TestReactor, checkPeriod, timeout time.Duration) {
|
func assertMsgReceivedWithTimeout(t *testing.T, msg string, channel byte, reactor *TestReactor, checkPeriod, timeout time.Duration) {
|
||||||
@@ -331,11 +328,8 @@ func BenchmarkSwitches(b *testing.B) {
|
|||||||
for i := 0; i < b.N; i++ {
|
for i := 0; i < b.N; i++ {
|
||||||
chID := byte(i % 4)
|
chID := byte(i % 4)
|
||||||
successChan := s1.Broadcast(chID, "test data")
|
successChan := s1.Broadcast(chID, "test data")
|
||||||
for res := range successChan {
|
for s := range successChan {
|
||||||
if !s1.peers.Has(res.PeerKey) {
|
if s {
|
||||||
b.Error("Unexpected peerKey: " + res.PeerKey)
|
|
||||||
}
|
|
||||||
if res.Success {
|
|
||||||
numSuccess++
|
numSuccess++
|
||||||
} else {
|
} else {
|
||||||
numFailure++
|
numFailure++
|
||||||
|
Reference in New Issue
Block a user