change Subscribe#out cap to 1

and replace config vars with RPCConfig
This commit is contained in:
Anton Kaliaev
2019-02-28 16:51:29 +03:00
parent ee8884b24f
commit f45b01aeff
6 changed files with 38 additions and 29 deletions

View File

@ -658,9 +658,7 @@ func (n *Node) ConfigureRPC() {
rpccore.SetConsensusReactor(n.consensusReactor) rpccore.SetConsensusReactor(n.consensusReactor)
rpccore.SetEventBus(n.eventBus) rpccore.SetEventBus(n.eventBus)
rpccore.SetLogger(n.Logger.With("module", "rpc")) rpccore.SetLogger(n.Logger.With("module", "rpc"))
rpccore.MaxSubscriptionClients = n.config.RPC.MaxSubscriptionClients rpccore.SetConfig(*n.config.RPC)
rpccore.MaxSubscriptionsPerClient = n.config.RPC.MaxSubscriptionsPerClient
rpccore.TimeoutBroadcastTxCommit = n.config.RPC.TimeoutBroadcastTxCommit
} }
func (n *Node) startRPC() ([]net.Listener, error) { func (n *Node) startRPC() ([]net.Listener, error) {

View File

@ -295,7 +295,9 @@ func (w *WSEvents) OnStop() {
} }
// Subscribe implements EventsClient by using WSClient to subscribe given // Subscribe implements EventsClient by using WSClient to subscribe given
// subscriber to query. // subscriber to query. By default, returns a channel with cap=1. Error is
// returned if it fails to subscribe.
// Channel is never closed to prevent clients from seeing an erroneus event.
func (w *WSEvents) Subscribe(ctx context.Context, subscriber, query string, func (w *WSEvents) Subscribe(ctx context.Context, subscriber, query string,
outCapacity ...int) (out <-chan ctypes.ResultEvent, err error) { outCapacity ...int) (out <-chan ctypes.ResultEvent, err error) {
@ -303,7 +305,7 @@ func (w *WSEvents) Subscribe(ctx context.Context, subscriber, query string,
return nil, err return nil, err
} }
outCap := 0 outCap := 1
if len(outCapacity) > 0 { if len(outCapacity) > 0 {
outCap = outCapacity[0] outCap = outCapacity[0]
} }

View File

@ -7,6 +7,7 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
cmn "github.com/tendermint/tendermint/libs/common" cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/log"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub" tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
tmquery "github.com/tendermint/tendermint/libs/pubsub/query" tmquery "github.com/tendermint/tendermint/libs/pubsub/query"
nm "github.com/tendermint/tendermint/node" nm "github.com/tendermint/tendermint/node"
@ -31,6 +32,7 @@ powerful control during testing, you probably want the "client/mock" package.
*/ */
type Local struct { type Local struct {
*types.EventBus *types.EventBus
Logger log.Logger
} }
// NewLocal configures a client that calls the Node directly. // NewLocal configures a client that calls the Node directly.
@ -43,6 +45,7 @@ func NewLocal(node *nm.Node) *Local {
node.ConfigureRPC() node.ConfigureRPC()
return &Local{ return &Local{
EventBus: node.EventBus(), EventBus: node.EventBus(),
Logger: log.NewNopLogger(),
} }
} }
@ -52,6 +55,11 @@ var (
_ EventsClient = (*Local)(nil) _ EventsClient = (*Local)(nil)
) )
// SetLogger allows to set a logger on the client.
func (c *Local) SetLogger(l log.Logger) {
c.Logger = l
}
func (Local) Status() (*ctypes.ResultStatus, error) { func (Local) Status() (*ctypes.ResultStatus, error) {
return core.Status() return core.Status()
} }
@ -145,7 +153,9 @@ func (Local) TxSearch(query string, prove bool, page, perPage int) (*ctypes.Resu
} }
// Subscribe implements EventsClient by using local eventBus to subscribe given // Subscribe implements EventsClient by using local eventBus to subscribe given
// subscriber to query. // subscriber to query.By default, returns a channel with cap=1. Error is
// returned if it fails to subscribe.
// Channel is never closed to prevent clients from seeing an erroneus event.
func (c *Local) Subscribe(ctx context.Context, subscriber, query string, outCapacity ...int) (out <-chan ctypes.ResultEvent, err error) { func (c *Local) Subscribe(ctx context.Context, subscriber, query string, outCapacity ...int) (out <-chan ctypes.ResultEvent, err error) {
q, err := tmquery.New(query) q, err := tmquery.New(query)
if err != nil { if err != nil {
@ -156,7 +166,7 @@ func (c *Local) Subscribe(ctx context.Context, subscriber, query string, outCapa
return nil, errors.Wrap(err, "failed to subscribe") return nil, errors.Wrap(err, "failed to subscribe")
} }
outCap := 0 outCap := 1
if len(outCapacity) > 0 { if len(outCapacity) > 0 {
outCap = outCapacity[0] outCap = outCapacity[0]
} }
@ -166,18 +176,20 @@ func (c *Local) Subscribe(ctx context.Context, subscriber, query string, outCapa
for { for {
select { select {
case msg := <-sub.Out(): case msg := <-sub.Out():
result := ctypes.ResultEvent{Query: query, Data: msg.Data(), Tags: msg.Tags()}
if cap(outc) == 0 { if cap(outc) == 0 {
outc <- ctypes.ResultEvent{Query: query, Data: msg.Data(), Tags: msg.Tags()} outc <- result
} else { } else {
select { select {
case outc <- ctypes.ResultEvent{Query: query, Data: msg.Data(), Tags: msg.Tags()}: case outc <- result:
default: default:
// XXX: client has missed an event. inform it somehow! c.Logger.Error("wanted to publish ResultEvent, but out channel is full", "result", result, "query", result.Query)
} }
} }
case <-sub.Cancelled(): case <-sub.Cancelled():
if sub.Err() != tmpubsub.ErrUnsubscribed { if sub.Err() != tmpubsub.ErrUnsubscribed {
// resubscribe // resubscribe
c.Logger.Error("subscription was cancelled, resubscribing...", "err", err, "query", query)
var err error var err error
for { for {
if !c.IsRunning() { if !c.IsRunning() {

View File

@ -93,10 +93,10 @@ import (
func Subscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscribe, error) { func Subscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscribe, error) {
addr := wsCtx.GetRemoteAddr() addr := wsCtx.GetRemoteAddr()
if eventBus.NumClients() > MaxSubscriptionClients { if eventBus.NumClients() > config.MaxSubscriptionClients {
return nil, fmt.Errorf("max_subscription_clients %d reached", MaxSubscriptionClients) return nil, fmt.Errorf("max_subscription_clients %d reached", config.MaxSubscriptionClients)
} else if eventBus.NumClientSubscriptions(addr) > MaxSubscriptionsPerClient { } else if eventBus.NumClientSubscriptions(addr) > config.MaxSubscriptionsPerClient {
return nil, fmt.Errorf("max_subscriptions_per_client %d reached", MaxSubscriptionsPerClient) return nil, fmt.Errorf("max_subscriptions_per_client %d reached", config.MaxSubscriptionsPerClient)
} }
logger.Info("Subscribe to query", "remote", addr, "query", query) logger.Info("Subscribe to query", "remote", addr, "query", query)

View File

@ -168,10 +168,10 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
// XXX: should be the remote IP address of the caller // XXX: should be the remote IP address of the caller
subscriber := "mempool" subscriber := "mempool"
if eventBus.NumClients() > MaxSubscriptionClients { if eventBus.NumClients() > config.MaxSubscriptionClients {
return nil, fmt.Errorf("max_subscription_clients %d reached", MaxSubscriptionClients) return nil, fmt.Errorf("max_subscription_clients %d reached", config.MaxSubscriptionClients)
} else if eventBus.NumClientSubscriptions(subscriber) > MaxSubscriptionsPerClient { } else if eventBus.NumClientSubscriptions(subscriber) > config.MaxSubscriptionsPerClient {
return nil, fmt.Errorf("max_subscriptions_per_client %d reached", MaxSubscriptionsPerClient) return nil, fmt.Errorf("max_subscriptions_per_client %d reached", config.MaxSubscriptionsPerClient)
} }
// Subscribe to tx being committed in block. // Subscribe to tx being committed in block.
@ -223,7 +223,7 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
DeliverTx: abci.ResponseDeliverTx{}, DeliverTx: abci.ResponseDeliverTx{},
Hash: tx.Hash(), Hash: tx.Hash(),
}, err }, err
case <-time.After(TimeoutBroadcastTxCommit): case <-time.After(config.TimeoutBroadcastTxCommit):
err = errors.New("Timed out waiting for tx to be included in a block") err = errors.New("Timed out waiting for tx to be included in a block")
logger.Error("Error on broadcastTxCommit", "err", err) logger.Error("Error on broadcastTxCommit", "err", err)
return &ctypes.ResultBroadcastTxCommit{ return &ctypes.ResultBroadcastTxCommit{

View File

@ -1,8 +1,7 @@
package core package core
import ( import (
"time" cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/consensus" "github.com/tendermint/tendermint/consensus"
"github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/crypto"
dbm "github.com/tendermint/tendermint/libs/db" dbm "github.com/tendermint/tendermint/libs/db"
@ -74,14 +73,7 @@ var (
logger log.Logger logger log.Logger
// config variables config cfg.RPCConfig
// MaxSubscriptionClients mirrors RPCConfig.MaxSubscriptionClients
MaxSubscriptionClients int
// MaxSubscriptionsPerClient mirrors RPCConfig.MaxSubscriptionsPerClient
MaxSubscriptionsPerClient int
// TimeoutBroadcastTxCommit mirrors RPCConfig.TimeoutBroadcastTxCommit
TimeoutBroadcastTxCommit time.Duration
) )
func SetStateDB(db dbm.DB) { func SetStateDB(db dbm.DB) {
@ -144,6 +136,11 @@ func SetEventBus(b *types.EventBus) {
eventBus = b eventBus = b
} }
// SetConfig sets an RPCConfig.
func SetConfig(c cfg.RPCConfig) {
config = c
}
func validatePage(page, perPage, totalCount int) int { func validatePage(page, perPage, totalCount int) int {
if perPage < 1 { if perPage < 1 {
return 1 return 1