limit number of /subscribe clients and queries per client

Add the following config variables (under [rpc] section):
  * max_subscription_clients
  * max_subscriptions_per_client
  * timeout_broadcast_tx_commit

Fixes #2826
This commit is contained in:
Anton Kaliaev 2019-02-05 14:50:36 +04:00
parent 5b7034a329
commit 0cddfdc81d
No known key found for this signature in database
GPG Key ID: 7B6881D965918214
11 changed files with 110 additions and 2 deletions

View File

@ -323,6 +323,17 @@ type RPCConfig struct {
// Should be < {ulimit -Sn} - {MaxNumInboundPeers} - {MaxNumOutboundPeers} - {N of wal, db and other open files} // Should be < {ulimit -Sn} - {MaxNumInboundPeers} - {MaxNumOutboundPeers} - {N of wal, db and other open files}
// 1024 - 40 - 10 - 50 = 924 = ~900 // 1024 - 40 - 10 - 50 = 924 = ~900
MaxOpenConnections int `mapstructure:"max_open_connections"` MaxOpenConnections int `mapstructure:"max_open_connections"`
// Maximum number of unique clientIDs that can /subscribe
MaxSubscriptionClients int `mapstructure:"max_subscription_clients"`
// Maximum number of unique queries a given client can /subscribe to (note
// all calls to /broadcast_tx_commit uses the same client, so this would
// limit the number of /broadcast_tx_commit calls that can be open at once)
MaxSubscriptionsPerClient int `mapstructure:"max_subscriptions_per_client"`
// How long to wait for a tx to be committed during /broadcast_tx_commit
TimeoutBroadcastTxCommit time.Duration `mapstructure:"timeout_broadcast_tx_commit"`
} }
// DefaultRPCConfig returns a default configuration for the RPC server // DefaultRPCConfig returns a default configuration for the RPC server
@ -337,6 +348,10 @@ func DefaultRPCConfig() *RPCConfig {
Unsafe: false, Unsafe: false,
MaxOpenConnections: 900, MaxOpenConnections: 900,
MaxSubscriptionClients: 100,
MaxSubscriptionsPerClient: 5,
TimeoutBroadcastTxCommit: 10 * time.Second,
} }
} }
@ -358,6 +373,15 @@ func (cfg *RPCConfig) ValidateBasic() error {
if cfg.MaxOpenConnections < 0 { if cfg.MaxOpenConnections < 0 {
return errors.New("max_open_connections can't be negative") return errors.New("max_open_connections can't be negative")
} }
if cfg.MaxSubscriptionClients < 0 {
return errors.New("max_subscription_clients can't be negative")
}
if cfg.MaxSubscriptionsPerClient < 0 {
return errors.New("max_subscriptions_per_client can't be negative")
}
if cfg.TimeoutBroadcastTxCommit < 0 {
return errors.New("timeout_broadcast_tx_commit can't be negative")
}
return nil return nil
} }

View File

@ -162,6 +162,17 @@ unsafe = {{ .RPC.Unsafe }}
# 1024 - 40 - 10 - 50 = 924 = ~900 # 1024 - 40 - 10 - 50 = 924 = ~900
max_open_connections = {{ .RPC.MaxOpenConnections }} max_open_connections = {{ .RPC.MaxOpenConnections }}
# Maximum number of unique clientIDs that can /subscribe
max_subscription_clients = {{ .RPC.MaxSubscriptionClients }}
# Maximum number of unique queries a given client can /subscribe to (note
# all calls to /broadcast_tx_commit uses the same client, so this would
# limit the number of /broadcast_tx_commit calls that can be open at once)
max_subscriptions_per_client = {{ .RPC.MaxSubscriptionsPerClient }}
# How long to wait for a tx to be committed during /broadcast_tx_commit.
timeout_broadcast_tx_commit = "{{ .RPC.TimeoutBroadcastTxCommit }}"
##### peer to peer configuration options ##### ##### peer to peer configuration options #####
[p2p] [p2p]

View File

@ -111,6 +111,17 @@ unsafe = false
# 1024 - 40 - 10 - 50 = 924 = ~900 # 1024 - 40 - 10 - 50 = 924 = ~900
max_open_connections = 900 max_open_connections = 900
# Maximum number of unique clientIDs that can /subscribe
max_subscription_clients = 100
# Maximum number of unique queries a given client can /subscribe to (note
# all calls to /broadcast_tx_commit uses the same client, so this would
# limit the number of /broadcast_tx_commit calls that can be open at once)
max_subscriptions_per_client = 5
# How long to wait for a tx to be committed during /broadcast_tx_commit.
timeout_broadcast_tx_commit = "10s"
##### peer to peer configuration options ##### ##### peer to peer configuration options #####
[p2p] [p2p]

View File

@ -242,6 +242,18 @@ func (s *Server) UnsubscribeAll(ctx context.Context, clientID string) error {
} }
} }
func (s *Server) NumClients() int {
s.mtx.RLock()
defer s.mtx.RUnlock()
return len(s.subscriptions)
}
func (s *Server) NumClientSubscriptions(clientID string) int {
s.mtx.RLock()
defer s.mtx.RUnlock()
return len(s.subscriptions[clientID])
}
// Publish publishes the given message. An error will be returned to the caller // Publish publishes the given message. An error will be returned to the caller
// if the context is canceled. // if the context is canceled.
func (s *Server) Publish(ctx context.Context, msg interface{}) error { func (s *Server) Publish(ctx context.Context, msg interface{}) error {

View File

@ -658,6 +658,9 @@ func (n *Node) ConfigureRPC() {
rpccore.SetConsensusReactor(n.consensusReactor) rpccore.SetConsensusReactor(n.consensusReactor)
rpccore.SetEventBus(n.eventBus) rpccore.SetEventBus(n.eventBus)
rpccore.SetLogger(n.Logger.With("module", "rpc")) rpccore.SetLogger(n.Logger.With("module", "rpc"))
rpccore.MaxSubscriptionClients = n.config.RPC.MaxSubscriptionClients
rpccore.MaxSubscriptionsPerClient = n.config.RPC.MaxSubscriptionsPerClient
rpccore.TimeoutBroadcastTxCommit = n.config.RPC.TimeoutBroadcastTxCommit
} }
func (n *Node) startRPC() ([]net.Listener, error) { func (n *Node) startRPC() ([]net.Listener, error) {

View File

@ -318,6 +318,16 @@ func (w *WSEvents) OnStop() {
} }
} }
// TODO: remove
func (w *WSEvents) NumClients() int {
return 1
}
// TODO: remove
func (w *WSEvents) NumClientSubscriptions(clientID string) int {
return len(w.subscriptions)
}
func (w *WSEvents) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, outCapacity ...int) (types.Subscription, error) { func (w *WSEvents) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, outCapacity ...int) (types.Subscription, error) {
q := query.String() q := query.String()

View File

@ -92,6 +92,13 @@ import (
// <aside class="notice">WebSocket only</aside> // <aside class="notice">WebSocket only</aside>
func Subscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscribe, error) { func Subscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscribe, error) {
addr := wsCtx.GetRemoteAddr() addr := wsCtx.GetRemoteAddr()
if eventBusFor(wsCtx).NumClients() > MaxSubscriptionClients {
return nil, fmt.Errorf("max_subscription_clients %d reached", MaxSubscriptionClients)
} else if eventBusFor(wsCtx).NumClientSubscriptions(addr) > MaxSubscriptionsPerClient {
return nil, fmt.Errorf("max_subscriptions_per_client %d reached", MaxSubscriptionsPerClient)
}
logger.Info("Subscribe to query", "remote", addr, "query", query) logger.Info("Subscribe to query", "remote", addr, "query", query)
q, err := tmquery.New(query) q, err := tmquery.New(query)

View File

@ -166,17 +166,26 @@ func BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
// |-----------+------+---------+----------+-----------------| // |-----------+------+---------+----------+-----------------|
// | tx | Tx | nil | true | The transaction | // | tx | Tx | nil | true | The transaction |
func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
// XXX: should be the remote IP address of the caller
subscriber := "mempool"
if eventBus.NumClients() > MaxSubscriptionClients {
return nil, fmt.Errorf("max_subscription_clients %d reached", MaxSubscriptionClients)
} else if eventBus.NumClientSubscriptions(subscriber) > MaxSubscriptionsPerClient {
return nil, fmt.Errorf("max_subscriptions_per_client %d reached", MaxSubscriptionsPerClient)
}
// Subscribe to tx being committed in block. // Subscribe to tx being committed in block.
ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout) ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout)
defer cancel() defer cancel()
q := types.EventQueryTxFor(tx) q := types.EventQueryTxFor(tx)
deliverTxSub, err := eventBus.Subscribe(ctx, "mempool", q) deliverTxSub, err := eventBus.Subscribe(ctx, subscriber, q)
if err != nil { if err != nil {
err = errors.Wrap(err, "failed to subscribe to tx") err = errors.Wrap(err, "failed to subscribe to tx")
logger.Error("Error on broadcast_tx_commit", "err", err) logger.Error("Error on broadcast_tx_commit", "err", err)
return nil, err return nil, err
} }
defer eventBus.Unsubscribe(context.Background(), "mempool", q) defer eventBus.Unsubscribe(context.Background(), subscriber, q)
// Broadcast tx and wait for CheckTx result // Broadcast tx and wait for CheckTx result
checkTxResCh := make(chan *abci.Response, 1) checkTxResCh := make(chan *abci.Response, 1)

View File

@ -1,6 +1,8 @@
package core package core
import ( import (
"time"
"github.com/tendermint/tendermint/consensus" "github.com/tendermint/tendermint/consensus"
"github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/crypto"
dbm "github.com/tendermint/tendermint/libs/db" dbm "github.com/tendermint/tendermint/libs/db"
@ -71,6 +73,11 @@ var (
mempool *mempl.Mempool mempool *mempl.Mempool
logger log.Logger logger log.Logger
// XXX: godoc comment
MaxSubscriptionClients int
MaxSubscriptionsPerClient int
TimeoutBroadcastTxCommit time.Duration
) )
func SetStateDB(db dbm.DB) { func SetStateDB(db dbm.DB) {

View File

@ -256,6 +256,9 @@ type EventSubscriber interface {
Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, outCapacity ...int) (tmtypes.Subscription, error) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, outCapacity ...int) (tmtypes.Subscription, error)
Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error
UnsubscribeAll(ctx context.Context, subscriber string) error UnsubscribeAll(ctx context.Context, subscriber string) error
NumClients() int
NumClientSubscriptions(clientID string) int
} }
//---------------------------------------- //----------------------------------------

View File

@ -15,6 +15,9 @@ type EventBusSubscriber interface {
Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, outCapacity ...int) (Subscription, error) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, outCapacity ...int) (Subscription, error)
Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error
UnsubscribeAll(ctx context.Context, subscriber string) error UnsubscribeAll(ctx context.Context, subscriber string) error
NumClients() int
NumClientSubscriptions(clientID string) int
} }
type Subscription interface { type Subscription interface {
@ -58,6 +61,14 @@ func (b *EventBus) OnStop() {
b.pubsub.Stop() b.pubsub.Stop()
} }
func (b *EventBus) NumClients() int {
return b.pubsub.NumClients()
}
func (b *EventBus) NumClientSubscriptions(clientID string) int {
return b.pubsub.NumClientSubscriptions(clientID)
}
func (b *EventBus) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, outCapacity ...int) (Subscription, error) { func (b *EventBus) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, outCapacity ...int) (Subscription, error) {
return b.pubsub.Subscribe(ctx, subscriber, query, outCapacity...) return b.pubsub.Subscribe(ctx, subscriber, query, outCapacity...)
} }