mirror of
https://github.com/fluencelabs/tendermint
synced 2025-05-11 14:34:46 +00:00
Merge pull request #634 from tendermint/config-p2p
p2p: put maxMsgPacketPayloadSize, recvRate, sendRate in config
This commit is contained in:
commit
b856c45b9c
@ -224,16 +224,28 @@ type P2PConfig struct {
|
|||||||
|
|
||||||
// Time to wait before flushing messages out on the connection. In ms
|
// Time to wait before flushing messages out on the connection. In ms
|
||||||
FlushThrottleTimeout int `mapstructure:"flush_throttle_timeout"`
|
FlushThrottleTimeout int `mapstructure:"flush_throttle_timeout"`
|
||||||
|
|
||||||
|
// Maximum size of a message packet payload
|
||||||
|
MaxMsgPacketPayloadSize int `mapstructure:"max_msg_packet_payload_size"`
|
||||||
|
|
||||||
|
// Rate at which packets can be sent (in bytes/second)
|
||||||
|
SendRate int64 `mapstructure:"send_rate"`
|
||||||
|
|
||||||
|
// Rate at which packets can be received (in bytes/second)
|
||||||
|
RecvRate int64 `mapstructure:"recv_rate"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// DefaultP2PConfig returns a default configuration for the peer-to-peer layer
|
// DefaultP2PConfig returns a default configuration for the peer-to-peer layer
|
||||||
func DefaultP2PConfig() *P2PConfig {
|
func DefaultP2PConfig() *P2PConfig {
|
||||||
return &P2PConfig{
|
return &P2PConfig{
|
||||||
ListenAddress: "tcp://0.0.0.0:46656",
|
ListenAddress: "tcp://0.0.0.0:46656",
|
||||||
AddrBook: "addrbook.json",
|
AddrBook: "addrbook.json",
|
||||||
AddrBookStrict: true,
|
AddrBookStrict: true,
|
||||||
MaxNumPeers: 50,
|
MaxNumPeers: 50,
|
||||||
FlushThrottleTimeout: 100,
|
FlushThrottleTimeout: 100,
|
||||||
|
MaxMsgPacketPayloadSize: 1024, // 1 kB
|
||||||
|
SendRate: 512000, // 500 kB/s
|
||||||
|
RecvRate: 512000, // 500 kB/s
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -22,15 +22,15 @@ const (
|
|||||||
updateState = 2 * time.Second
|
updateState = 2 * time.Second
|
||||||
pingTimeout = 40 * time.Second
|
pingTimeout = 40 * time.Second
|
||||||
|
|
||||||
// flushThrottle used here as a default.
|
// some of these defaults are written in the user config
|
||||||
// overwritten by the user config.
|
// flushThrottle, sendRate, recvRate
|
||||||
// TODO: remove
|
// TODO: remove values present in config
|
||||||
flushThrottle = 100 * time.Millisecond
|
defaultFlushThrottle = 100 * time.Millisecond
|
||||||
|
|
||||||
defaultSendQueueCapacity = 1
|
defaultSendQueueCapacity = 1
|
||||||
defaultSendRate = int64(512000) // 500KB/s
|
|
||||||
defaultRecvBufferCapacity = 4096
|
defaultRecvBufferCapacity = 4096
|
||||||
defaultRecvMessageCapacity = 22020096 // 21MB
|
defaultRecvMessageCapacity = 22020096 // 21MB
|
||||||
|
defaultSendRate = int64(512000) // 500KB/s
|
||||||
defaultRecvRate = int64(512000) // 500KB/s
|
defaultRecvRate = int64(512000) // 500KB/s
|
||||||
defaultSendTimeout = 10 * time.Second
|
defaultSendTimeout = 10 * time.Second
|
||||||
)
|
)
|
||||||
@ -94,15 +94,22 @@ type MConnConfig struct {
|
|||||||
SendRate int64 `mapstructure:"send_rate"`
|
SendRate int64 `mapstructure:"send_rate"`
|
||||||
RecvRate int64 `mapstructure:"recv_rate"`
|
RecvRate int64 `mapstructure:"recv_rate"`
|
||||||
|
|
||||||
|
maxMsgPacketPayloadSize int
|
||||||
|
|
||||||
flushThrottle time.Duration
|
flushThrottle time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (cfg *MConnConfig) maxMsgPacketTotalSize() int {
|
||||||
|
return cfg.maxMsgPacketPayloadSize + maxMsgPacketOverheadSize
|
||||||
|
}
|
||||||
|
|
||||||
// DefaultMConnConfig returns the default config.
|
// DefaultMConnConfig returns the default config.
|
||||||
func DefaultMConnConfig() *MConnConfig {
|
func DefaultMConnConfig() *MConnConfig {
|
||||||
return &MConnConfig{
|
return &MConnConfig{
|
||||||
SendRate: defaultSendRate,
|
SendRate: defaultSendRate,
|
||||||
RecvRate: defaultRecvRate,
|
RecvRate: defaultRecvRate,
|
||||||
flushThrottle: flushThrottle,
|
maxMsgPacketPayloadSize: defaultMaxMsgPacketPayloadSize,
|
||||||
|
flushThrottle: defaultFlushThrottle,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -342,7 +349,7 @@ func (c *MConnection) sendSomeMsgPackets() bool {
|
|||||||
// Block until .sendMonitor says we can write.
|
// Block until .sendMonitor says we can write.
|
||||||
// Once we're ready we send more than we asked for,
|
// Once we're ready we send more than we asked for,
|
||||||
// but amortized it should even out.
|
// but amortized it should even out.
|
||||||
c.sendMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.SendRate), true)
|
c.sendMonitor.Limit(c.config.maxMsgPacketTotalSize(), atomic.LoadInt64(&c.config.SendRate), true)
|
||||||
|
|
||||||
// Now send some msgPackets.
|
// Now send some msgPackets.
|
||||||
for i := 0; i < numBatchMsgPackets; i++ {
|
for i := 0; i < numBatchMsgPackets; i++ {
|
||||||
@ -400,7 +407,7 @@ func (c *MConnection) recvRoutine() {
|
|||||||
FOR_LOOP:
|
FOR_LOOP:
|
||||||
for {
|
for {
|
||||||
// Block until .recvMonitor says we can read.
|
// Block until .recvMonitor says we can read.
|
||||||
c.recvMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.RecvRate), true)
|
c.recvMonitor.Limit(c.config.maxMsgPacketTotalSize(), atomic.LoadInt64(&c.config.RecvRate), true)
|
||||||
|
|
||||||
/*
|
/*
|
||||||
// Peek into bufReader for debugging
|
// Peek into bufReader for debugging
|
||||||
@ -441,7 +448,7 @@ FOR_LOOP:
|
|||||||
c.Logger.Debug("Receive Pong")
|
c.Logger.Debug("Receive Pong")
|
||||||
case packetTypeMsg:
|
case packetTypeMsg:
|
||||||
pkt, n, err := msgPacket{}, int(0), error(nil)
|
pkt, n, err := msgPacket{}, int(0), error(nil)
|
||||||
wire.ReadBinaryPtr(&pkt, c.bufReader, maxMsgPacketTotalSize, &n, &err)
|
wire.ReadBinaryPtr(&pkt, c.bufReader, c.config.maxMsgPacketTotalSize(), &n, &err)
|
||||||
c.recvMonitor.Update(int(n))
|
c.recvMonitor.Update(int(n))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if c.IsRunning() {
|
if c.IsRunning() {
|
||||||
@ -547,6 +554,8 @@ type Channel struct {
|
|||||||
sending []byte
|
sending []byte
|
||||||
priority int
|
priority int
|
||||||
recentlySent int64 // exponential moving average
|
recentlySent int64 // exponential moving average
|
||||||
|
|
||||||
|
maxMsgPacketPayloadSize int
|
||||||
}
|
}
|
||||||
|
|
||||||
func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel {
|
func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel {
|
||||||
@ -555,12 +564,13 @@ func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel {
|
|||||||
cmn.PanicSanity("Channel default priority must be a postive integer")
|
cmn.PanicSanity("Channel default priority must be a postive integer")
|
||||||
}
|
}
|
||||||
return &Channel{
|
return &Channel{
|
||||||
conn: conn,
|
conn: conn,
|
||||||
desc: desc,
|
desc: desc,
|
||||||
id: desc.ID,
|
id: desc.ID,
|
||||||
sendQueue: make(chan []byte, desc.SendQueueCapacity),
|
sendQueue: make(chan []byte, desc.SendQueueCapacity),
|
||||||
recving: make([]byte, 0, desc.RecvBufferCapacity),
|
recving: make([]byte, 0, desc.RecvBufferCapacity),
|
||||||
priority: desc.Priority,
|
priority: desc.Priority,
|
||||||
|
maxMsgPacketPayloadSize: conn.config.maxMsgPacketPayloadSize,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -619,14 +629,15 @@ func (ch *Channel) isSendPending() bool {
|
|||||||
func (ch *Channel) nextMsgPacket() msgPacket {
|
func (ch *Channel) nextMsgPacket() msgPacket {
|
||||||
packet := msgPacket{}
|
packet := msgPacket{}
|
||||||
packet.ChannelID = byte(ch.id)
|
packet.ChannelID = byte(ch.id)
|
||||||
packet.Bytes = ch.sending[:cmn.MinInt(maxMsgPacketPayloadSize, len(ch.sending))]
|
maxSize := ch.maxMsgPacketPayloadSize
|
||||||
if len(ch.sending) <= maxMsgPacketPayloadSize {
|
packet.Bytes = ch.sending[:cmn.MinInt(maxSize, len(ch.sending))]
|
||||||
|
if len(ch.sending) <= maxSize {
|
||||||
packet.EOF = byte(0x01)
|
packet.EOF = byte(0x01)
|
||||||
ch.sending = nil
|
ch.sending = nil
|
||||||
atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize
|
atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize
|
||||||
} else {
|
} else {
|
||||||
packet.EOF = byte(0x00)
|
packet.EOF = byte(0x00)
|
||||||
ch.sending = ch.sending[cmn.MinInt(maxMsgPacketPayloadSize, len(ch.sending)):]
|
ch.sending = ch.sending[cmn.MinInt(maxSize, len(ch.sending)):]
|
||||||
}
|
}
|
||||||
return packet
|
return packet
|
||||||
}
|
}
|
||||||
@ -675,9 +686,9 @@ func (ch *Channel) updateStats() {
|
|||||||
//-----------------------------------------------------------------------------
|
//-----------------------------------------------------------------------------
|
||||||
|
|
||||||
const (
|
const (
|
||||||
maxMsgPacketPayloadSize = 1024
|
defaultMaxMsgPacketPayloadSize = 1024
|
||||||
|
|
||||||
maxMsgPacketOverheadSize = 10 // It's actually lower but good enough
|
maxMsgPacketOverheadSize = 10 // It's actually lower but good enough
|
||||||
maxMsgPacketTotalSize = maxMsgPacketPayloadSize + maxMsgPacketOverheadSize
|
|
||||||
packetTypePing = byte(0x01)
|
packetTypePing = byte(0x01)
|
||||||
packetTypePong = byte(0x02)
|
packetTypePong = byte(0x02)
|
||||||
packetTypeMsg = byte(0x03)
|
packetTypeMsg = byte(0x03)
|
||||||
|
@ -90,7 +90,13 @@ func NewSwitch(config *cfg.P2PConfig) *Switch {
|
|||||||
dialing: cmn.NewCMap(),
|
dialing: cmn.NewCMap(),
|
||||||
nodeInfo: nil,
|
nodeInfo: nil,
|
||||||
}
|
}
|
||||||
sw.peerConfig.MConfig.flushThrottle = time.Duration(config.FlushThrottleTimeout) * time.Millisecond // TODO: collapse the peerConfig into the config ?
|
|
||||||
|
// TODO: collapse the peerConfig into the config ?
|
||||||
|
sw.peerConfig.MConfig.flushThrottle = time.Duration(config.FlushThrottleTimeout) * time.Millisecond
|
||||||
|
sw.peerConfig.MConfig.SendRate = config.SendRate
|
||||||
|
sw.peerConfig.MConfig.RecvRate = config.RecvRate
|
||||||
|
sw.peerConfig.MConfig.maxMsgPacketPayloadSize = config.MaxMsgPacketPayloadSize
|
||||||
|
|
||||||
sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
|
sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
|
||||||
return sw
|
return sw
|
||||||
}
|
}
|
||||||
@ -176,7 +182,7 @@ func (sw *Switch) OnStart() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start listeners
|
// Start listeners
|
||||||
for _, listener := range sw.listeners {
|
for _, listener := range sw.listeners {
|
||||||
go sw.listenerRoutine(listener)
|
go sw.listenerRoutine(listener)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user