mirror of
https://github.com/fluencelabs/tendermint
synced 2025-06-07 10:31:19 +00:00
refactor MConnection#sendBytes
This commit is contained in:
parent
06d219db8e
commit
ebe23f1379
@ -557,14 +557,12 @@ func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel {
|
|||||||
// Goroutine-safe
|
// Goroutine-safe
|
||||||
// Times out (and returns false) after defaultSendTimeout
|
// Times out (and returns false) after defaultSendTimeout
|
||||||
func (ch *Channel) sendBytes(bytes []byte) bool {
|
func (ch *Channel) sendBytes(bytes []byte) bool {
|
||||||
timeout := time.NewTimer(defaultSendTimeout)
|
|
||||||
select {
|
select {
|
||||||
case <-timeout.C:
|
|
||||||
// timeout
|
|
||||||
return false
|
|
||||||
case ch.sendQueue <- bytes:
|
case ch.sendQueue <- bytes:
|
||||||
atomic.AddInt32(&ch.sendQueueSize, 1)
|
atomic.AddInt32(&ch.sendQueueSize, 1)
|
||||||
return true
|
return true
|
||||||
|
case <-time.After(defaultSendTimeout):
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -19,7 +19,7 @@ func createMConnection(conn net.Conn) *p2p.MConnection {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func createMConnectionWithCallbacks(conn net.Conn, onReceive func(chID byte, msgBytes []byte), onError func(r interface{})) *p2p.MConnection {
|
func createMConnectionWithCallbacks(conn net.Conn, onReceive func(chID byte, msgBytes []byte), onError func(r interface{})) *p2p.MConnection {
|
||||||
chDescs := []*p2p.ChannelDescriptor{&p2p.ChannelDescriptor{ID: 0x01, Priority: 1}}
|
chDescs := []*p2p.ChannelDescriptor{&p2p.ChannelDescriptor{ID: 0x01, Priority: 1, SendQueueCapacity: 1}}
|
||||||
return p2p.NewMConnection(conn, chDescs, onReceive, onError)
|
return p2p.NewMConnection(conn, chDescs, onReceive, onError)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -37,13 +37,18 @@ func TestMConnectionSend(t *testing.T) {
|
|||||||
|
|
||||||
msg := "Ant-Man"
|
msg := "Ant-Man"
|
||||||
assert.True(mconn.Send(0x01, msg))
|
assert.True(mconn.Send(0x01, msg))
|
||||||
assert.False(mconn.CanSend(0x01))
|
assert.False(mconn.CanSend(0x01), "CanSend should return false because queue is full")
|
||||||
|
// assert.False(mconn.Send(0x01, msg), "Send should return false because queue is full")
|
||||||
|
// assert.False(mconn.TrySend(0x01, msg), "TrySend should return false because queue is full")
|
||||||
server.Read(make([]byte, len(msg)))
|
server.Read(make([]byte, len(msg)))
|
||||||
assert.True(mconn.CanSend(0x01))
|
assert.True(mconn.CanSend(0x01))
|
||||||
|
|
||||||
msg = "Spider-Man"
|
msg = "Spider-Man"
|
||||||
assert.True(mconn.TrySend(0x01, msg))
|
assert.True(mconn.TrySend(0x01, msg))
|
||||||
server.Read(make([]byte, len(msg)))
|
server.Read(make([]byte, len(msg)))
|
||||||
|
|
||||||
|
assert.False(mconn.CanSend(0x05), "CanSend should return false because channel is unknown")
|
||||||
|
assert.False(mconn.Send(0x05, "Absorbing Man"), "Send should return false because channel is unknown")
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMConnectionReceive(t *testing.T) {
|
func TestMConnectionReceive(t *testing.T) {
|
||||||
|
21
peer_test.go
21
peer_test.go
@ -55,6 +55,27 @@ func TestPeerWithoutAuthEnc(t *testing.T) {
|
|||||||
assert.True(p.IsRunning())
|
assert.True(p.IsRunning())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestPeerSend(t *testing.T) {
|
||||||
|
assert, require := assert.New(t), require.New(t)
|
||||||
|
|
||||||
|
config := DefaultPeerConfig()
|
||||||
|
config.AuthEnc = false
|
||||||
|
|
||||||
|
// simulate remote peer
|
||||||
|
rp := &remotePeer{PrivKey: crypto.GenPrivKeyEd25519(), Config: config}
|
||||||
|
rp.Start()
|
||||||
|
defer rp.Stop()
|
||||||
|
|
||||||
|
p, err := createOutboundPeerAndPerformHandshake(rp.Addr(), config)
|
||||||
|
require.Nil(err)
|
||||||
|
|
||||||
|
p.Start()
|
||||||
|
defer p.Stop()
|
||||||
|
|
||||||
|
assert.True(p.CanSend(0x01))
|
||||||
|
assert.True(p.Send(0x01, "Asylum"))
|
||||||
|
}
|
||||||
|
|
||||||
func createOutboundPeerAndPerformHandshake(addr *NetAddress, config *PeerConfig) (*Peer, error) {
|
func createOutboundPeerAndPerformHandshake(addr *NetAddress, config *PeerConfig) (*Peer, error) {
|
||||||
chDescs := []*ChannelDescriptor{
|
chDescs := []*ChannelDescriptor{
|
||||||
&ChannelDescriptor{ID: 0x01, Priority: 1},
|
&ChannelDescriptor{ID: 0x01, Priority: 1},
|
||||||
|
Loading…
x
Reference in New Issue
Block a user