make some params configurable

This commit is contained in:
Ethan Buchman
2016-03-02 22:32:39 +00:00
committed by Jae Kwon
parent 7f6aad20fb
commit f28f791fff
4 changed files with 70 additions and 20 deletions

View File

@ -22,8 +22,6 @@ const (
idleTimeoutMinutes = 5
updateStatsSeconds = 2
pingTimeoutSeconds = 40
defaultSendRate = 512000 // 500KB/s
defaultRecvRate = 512000 // 500KB/s
flushThrottleMS = 100
defaultSendQueueCapacity = 1
defaultRecvBufferCapacity = 4096
@ -31,6 +29,13 @@ const (
defaultSendTimeoutSeconds = 10
)
// config keys
const (
sendRateKey = "p2p_send_rate"
recvRateKey = "p2p_recv_rate"
maxPayloadSizeKey = "p2p_max_msg_packet_payload_size"
)
type receiveCbFunc func(chID byte, msgBytes []byte)
type errorCbFunc func(interface{})
@ -94,8 +99,8 @@ func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive recei
bufWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
sendMonitor: flow.New(0, 0),
recvMonitor: flow.New(0, 0),
sendRate: defaultSendRate,
recvRate: defaultRecvRate,
sendRate: int64(config.GetInt(sendRateKey)),
recvRate: int64(config.GetInt(recvRateKey)),
send: make(chan struct{}, 1),
pong: make(chan struct{}),
onReceive: onReceive,
@ -314,7 +319,7 @@ func (c *MConnection) sendSomeMsgPackets() bool {
// Block until .sendMonitor says we can write.
// Once we're ready we send more than we asked for,
// but amortized it should even out.
c.sendMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.sendRate), true)
c.sendMonitor.Limit(maxMsgPacketTotalSize(), atomic.LoadInt64(&c.sendRate), true)
// Now send some msgPackets.
for i := 0; i < numBatchMsgPackets; i++ {
@ -372,7 +377,7 @@ func (c *MConnection) recvRoutine() {
FOR_LOOP:
for {
// Block until .recvMonitor says we can read.
c.recvMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.recvRate), true)
c.recvMonitor.Limit(maxMsgPacketTotalSize(), atomic.LoadInt64(&c.recvRate), true)
/*
// Peek into bufReader for debugging
@ -413,7 +418,7 @@ FOR_LOOP:
log.Info("Receive Pong")
case packetTypeMsg:
pkt, n, err := msgPacket{}, int(0), error(nil)
wire.ReadBinaryPtr(&pkt, c.bufReader, maxMsgPacketTotalSize, &n, &err)
wire.ReadBinaryPtr(&pkt, c.bufReader, maxMsgPacketTotalSize(), &n, &err)
c.recvMonitor.Update(int(n))
if err != nil {
if c.IsRunning() {
@ -593,14 +598,15 @@ func (ch *Channel) isSendPending() bool {
func (ch *Channel) nextMsgPacket() msgPacket {
packet := msgPacket{}
packet.ChannelID = byte(ch.id)
packet.Bytes = ch.sending[:MinInt(maxMsgPacketPayloadSize, len(ch.sending))]
if len(ch.sending) <= maxMsgPacketPayloadSize {
maxPayloadSize := config.GetInt(maxPayloadSizeKey)
packet.Bytes = ch.sending[:MinInt(maxPayloadSize, len(ch.sending))]
if len(ch.sending) <= maxPayloadSize {
packet.EOF = byte(0x01)
ch.sending = nil
atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize
} else {
packet.EOF = byte(0x00)
ch.sending = ch.sending[MinInt(maxMsgPacketPayloadSize, len(ch.sending)):]
ch.sending = ch.sending[MinInt(maxPayloadSize, len(ch.sending)):]
}
return packet
}
@ -644,10 +650,12 @@ func (ch *Channel) updateStats() {
//-----------------------------------------------------------------------------
func maxMsgPacketTotalSize() int {
return config.GetInt(maxPayloadSizeKey) + maxMsgPacketOverheadSize
}
const (
maxMsgPacketPayloadSize = 1024
maxMsgPacketOverheadSize = 10 // It's actually lower but good enough
maxMsgPacketTotalSize = maxMsgPacketPayloadSize + maxMsgPacketOverheadSize
packetTypePing = byte(0x01)
packetTypePong = byte(0x02)
packetTypeMsg = byte(0x03)