mirror of
https://github.com/fluencelabs/tendermint
synced 2025-06-15 14:21:22 +00:00
msgCountByPeer is a CMap
This commit is contained in:
@ -49,7 +49,7 @@ type PEXReactor struct {
|
|||||||
ensurePeersPeriod time.Duration
|
ensurePeersPeriod time.Duration
|
||||||
|
|
||||||
// tracks message count by peer, so we can prevent abuse
|
// tracks message count by peer, so we can prevent abuse
|
||||||
msgCountByPeer map[string]uint16
|
msgCountByPeer *cmn.CMap
|
||||||
maxMsgCountByPeer uint16
|
maxMsgCountByPeer uint16
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -58,7 +58,7 @@ func NewPEXReactor(b *AddrBook) *PEXReactor {
|
|||||||
r := &PEXReactor{
|
r := &PEXReactor{
|
||||||
book: b,
|
book: b,
|
||||||
ensurePeersPeriod: defaultEnsurePeersPeriod,
|
ensurePeersPeriod: defaultEnsurePeersPeriod,
|
||||||
msgCountByPeer: make(map[string]uint16),
|
msgCountByPeer: cmn.NewCMap(),
|
||||||
maxMsgCountByPeer: defaultMaxMsgCountByPeer,
|
maxMsgCountByPeer: defaultMaxMsgCountByPeer,
|
||||||
}
|
}
|
||||||
r.BaseReactor = *NewBaseReactor(log, "PEXReactor", r)
|
r.BaseReactor = *NewBaseReactor(log, "PEXReactor", r)
|
||||||
@ -122,7 +122,8 @@ func (r *PEXReactor) RemovePeer(p *Peer, reason interface{}) {
|
|||||||
func (r *PEXReactor) Receive(chID byte, src *Peer, msgBytes []byte) {
|
func (r *PEXReactor) Receive(chID byte, src *Peer, msgBytes []byte) {
|
||||||
srcAddr := src.Connection().RemoteAddress
|
srcAddr := src.Connection().RemoteAddress
|
||||||
srcAddrStr := srcAddr.String()
|
srcAddrStr := srcAddr.String()
|
||||||
r.msgCountByPeer[srcAddrStr]++
|
|
||||||
|
r.IncrementMsgCountForPeer(srcAddrStr)
|
||||||
if r.ReachedMaxMsgCountForPeer(srcAddrStr) {
|
if r.ReachedMaxMsgCountForPeer(srcAddrStr) {
|
||||||
log.Warn("Maximum number of messages reached for peer", "peer", srcAddrStr)
|
log.Warn("Maximum number of messages reached for peer", "peer", srcAddrStr)
|
||||||
// TODO remove src from peers?
|
// TODO remove src from peers?
|
||||||
@ -175,8 +176,20 @@ func (r *PEXReactor) SetMaxMsgCountByPeer(v uint16) {
|
|||||||
|
|
||||||
// ReachedMaxMsgCountForPeer returns true if we received too many
|
// ReachedMaxMsgCountForPeer returns true if we received too many
|
||||||
// messages from peer with address `addr`.
|
// messages from peer with address `addr`.
|
||||||
|
// NOTE: assumes the value in the CMap is non-nil
|
||||||
func (r *PEXReactor) ReachedMaxMsgCountForPeer(addr string) bool {
|
func (r *PEXReactor) ReachedMaxMsgCountForPeer(addr string) bool {
|
||||||
return r.msgCountByPeer[addr] >= r.maxMsgCountByPeer
|
return r.msgCountByPeer.Get(addr).(uint16) >= r.maxMsgCountByPeer
|
||||||
|
}
|
||||||
|
|
||||||
|
// Increment or initialize the msg count for the peer in the CMap
|
||||||
|
func (r *PEXReactor) IncrementMsgCountForPeer(addr string) {
|
||||||
|
var count uint16
|
||||||
|
countI := r.msgCountByPeer.Get(addr)
|
||||||
|
if countI != nil {
|
||||||
|
count = countI.(uint16)
|
||||||
|
}
|
||||||
|
count++
|
||||||
|
r.msgCountByPeer.Set(addr, count)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Ensures that sufficient peers are connected. (continuous)
|
// Ensures that sufficient peers are connected. (continuous)
|
||||||
@ -288,7 +301,7 @@ func (r *PEXReactor) flushMsgCountByPeer() {
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
r.msgCountByPeer = make(map[string]uint16)
|
r.msgCountByPeer.Clear()
|
||||||
case <-r.Quit:
|
case <-r.Quit:
|
||||||
ticker.Stop()
|
ticker.Stop()
|
||||||
return
|
return
|
||||||
|
Reference in New Issue
Block a user