mirror of
https://github.com/fluencelabs/tendermint
synced 2025-04-30 09:12:14 +00:00
Merge pull request #896 from tendermint/normalize-priority-and-id
normalize priority and id and remove pointers in ChannelDescriptor
This commit is contained in:
commit
94e400a5d6
6
glide.lock
generated
6
glide.lock
generated
@ -1,5 +1,5 @@
|
||||
hash: 223d8e42a118e7861cb673ea58a035e99d3a98c94e4b71fb52998d320f9c3b49
|
||||
updated: 2017-11-22T07:33:50.996598926-08:00
|
||||
updated: 2017-11-25T22:00:24.612202481-08:00
|
||||
imports:
|
||||
- name: github.com/btcsuite/btcd
|
||||
version: 8cea3866d0f7fb12d567a20744942c0d078c7d15
|
||||
@ -159,9 +159,7 @@ imports:
|
||||
- lex/httplex
|
||||
- trace
|
||||
- name: golang.org/x/sys
|
||||
version: 82aafbf43bf885069dc71b7e7c2f9d7a614d47da
|
||||
subpackages:
|
||||
- unix
|
||||
version: b98136db334ff9cb24f28a68e3be3cb6608f7630
|
||||
- name: golang.org/x/text
|
||||
version: 88f656faf3f37f690df1a32515b479415e1a6769
|
||||
subpackages:
|
||||
|
@ -149,9 +149,8 @@ func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onRec
|
||||
var channels = []*Channel{}
|
||||
|
||||
for _, desc := range chDescs {
|
||||
descCopy := *desc // copy the desc else unsafe access across connections
|
||||
channel := newChannel(mconn, &descCopy)
|
||||
channelsIdx[channel.id] = channel
|
||||
channel := newChannel(mconn, *desc)
|
||||
channelsIdx[channel.desc.ID] = channel
|
||||
channels = append(channels, channel)
|
||||
}
|
||||
mconn.channels = channels
|
||||
@ -375,7 +374,7 @@ func (c *MConnection) sendMsgPacket() bool {
|
||||
continue
|
||||
}
|
||||
// Get ratio, and keep track of lowest ratio.
|
||||
ratio := float32(channel.recentlySent) / float32(channel.priority)
|
||||
ratio := float32(channel.recentlySent) / float32(channel.desc.Priority)
|
||||
if ratio < leastRatio {
|
||||
leastRatio = ratio
|
||||
leastChannel = channel
|
||||
@ -519,10 +518,10 @@ func (c *MConnection) Status() ConnectionStatus {
|
||||
status.Channels = make([]ChannelStatus, len(c.channels))
|
||||
for i, channel := range c.channels {
|
||||
status.Channels[i] = ChannelStatus{
|
||||
ID: channel.id,
|
||||
ID: channel.desc.ID,
|
||||
SendQueueCapacity: cap(channel.sendQueue),
|
||||
SendQueueSize: int(channel.sendQueueSize), // TODO use atomic
|
||||
Priority: channel.priority,
|
||||
Priority: channel.desc.Priority,
|
||||
RecentlySent: channel.recentlySent,
|
||||
}
|
||||
}
|
||||
@ -539,7 +538,7 @@ type ChannelDescriptor struct {
|
||||
RecvMessageCapacity int
|
||||
}
|
||||
|
||||
func (chDesc *ChannelDescriptor) FillDefaults() {
|
||||
func (chDesc ChannelDescriptor) FillDefaults() (filled ChannelDescriptor) {
|
||||
if chDesc.SendQueueCapacity == 0 {
|
||||
chDesc.SendQueueCapacity = defaultSendQueueCapacity
|
||||
}
|
||||
@ -549,36 +548,34 @@ func (chDesc *ChannelDescriptor) FillDefaults() {
|
||||
if chDesc.RecvMessageCapacity == 0 {
|
||||
chDesc.RecvMessageCapacity = defaultRecvMessageCapacity
|
||||
}
|
||||
filled = chDesc
|
||||
return
|
||||
}
|
||||
|
||||
// TODO: lowercase.
|
||||
// NOTE: not goroutine-safe.
|
||||
type Channel struct {
|
||||
conn *MConnection
|
||||
desc *ChannelDescriptor
|
||||
id byte
|
||||
desc ChannelDescriptor
|
||||
sendQueue chan []byte
|
||||
sendQueueSize int32 // atomic.
|
||||
recving []byte
|
||||
sending []byte
|
||||
priority int
|
||||
recentlySent int64 // exponential moving average
|
||||
|
||||
maxMsgPacketPayloadSize int
|
||||
}
|
||||
|
||||
func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel {
|
||||
desc.FillDefaults()
|
||||
func newChannel(conn *MConnection, desc ChannelDescriptor) *Channel {
|
||||
desc = desc.FillDefaults()
|
||||
if desc.Priority <= 0 {
|
||||
cmn.PanicSanity("Channel default priority must be a postive integer")
|
||||
}
|
||||
return &Channel{
|
||||
conn: conn,
|
||||
desc: desc,
|
||||
id: desc.ID,
|
||||
sendQueue: make(chan []byte, desc.SendQueueCapacity),
|
||||
recving: make([]byte, 0, desc.RecvBufferCapacity),
|
||||
priority: desc.Priority,
|
||||
maxMsgPacketPayloadSize: conn.config.maxMsgPacketPayloadSize,
|
||||
}
|
||||
}
|
||||
@ -637,7 +634,7 @@ func (ch *Channel) isSendPending() bool {
|
||||
// Not goroutine-safe
|
||||
func (ch *Channel) nextMsgPacket() msgPacket {
|
||||
packet := msgPacket{}
|
||||
packet.ChannelID = byte(ch.id)
|
||||
packet.ChannelID = byte(ch.desc.ID)
|
||||
maxSize := ch.maxMsgPacketPayloadSize
|
||||
packet.Bytes = ch.sending[:cmn.MinInt(maxSize, len(ch.sending))]
|
||||
if len(ch.sending) <= maxSize {
|
||||
|
Loading…
x
Reference in New Issue
Block a user