diff --git a/node/node.go b/node/node.go index 5620f1bb..892a0c80 100644 --- a/node/node.go +++ b/node/node.go @@ -658,9 +658,7 @@ func (n *Node) ConfigureRPC() { rpccore.SetConsensusReactor(n.consensusReactor) rpccore.SetEventBus(n.eventBus) rpccore.SetLogger(n.Logger.With("module", "rpc")) - rpccore.MaxSubscriptionClients = n.config.RPC.MaxSubscriptionClients - rpccore.MaxSubscriptionsPerClient = n.config.RPC.MaxSubscriptionsPerClient - rpccore.TimeoutBroadcastTxCommit = n.config.RPC.TimeoutBroadcastTxCommit + rpccore.SetConfig(*n.config.RPC) } func (n *Node) startRPC() ([]net.Listener, error) { diff --git a/rpc/client/httpclient.go b/rpc/client/httpclient.go index 79b41246..49c9796e 100644 --- a/rpc/client/httpclient.go +++ b/rpc/client/httpclient.go @@ -295,7 +295,9 @@ func (w *WSEvents) OnStop() { } // Subscribe implements EventsClient by using WSClient to subscribe given -// subscriber to query. +// subscriber to query. By default, returns a channel with cap=1. Error is +// returned if it fails to subscribe. +// Channel is never closed to prevent clients from seeing an erroneus event. func (w *WSEvents) Subscribe(ctx context.Context, subscriber, query string, outCapacity ...int) (out <-chan ctypes.ResultEvent, err error) { @@ -303,7 +305,7 @@ func (w *WSEvents) Subscribe(ctx context.Context, subscriber, query string, return nil, err } - outCap := 0 + outCap := 1 if len(outCapacity) > 0 { outCap = outCapacity[0] } diff --git a/rpc/client/localclient.go b/rpc/client/localclient.go index 5b86a92f..080ca5d7 100644 --- a/rpc/client/localclient.go +++ b/rpc/client/localclient.go @@ -7,6 +7,7 @@ import ( "github.com/pkg/errors" cmn "github.com/tendermint/tendermint/libs/common" + "github.com/tendermint/tendermint/libs/log" tmpubsub "github.com/tendermint/tendermint/libs/pubsub" tmquery "github.com/tendermint/tendermint/libs/pubsub/query" nm "github.com/tendermint/tendermint/node" @@ -31,6 +32,7 @@ powerful control during testing, you probably want the "client/mock" package. */ type Local struct { *types.EventBus + Logger log.Logger } // NewLocal configures a client that calls the Node directly. @@ -43,6 +45,7 @@ func NewLocal(node *nm.Node) *Local { node.ConfigureRPC() return &Local{ EventBus: node.EventBus(), + Logger: log.NewNopLogger(), } } @@ -52,6 +55,11 @@ var ( _ EventsClient = (*Local)(nil) ) +// SetLogger allows to set a logger on the client. +func (c *Local) SetLogger(l log.Logger) { + c.Logger = l +} + func (Local) Status() (*ctypes.ResultStatus, error) { return core.Status() } @@ -145,7 +153,9 @@ func (Local) TxSearch(query string, prove bool, page, perPage int) (*ctypes.Resu } // Subscribe implements EventsClient by using local eventBus to subscribe given -// subscriber to query. +// subscriber to query.By default, returns a channel with cap=1. Error is +// returned if it fails to subscribe. +// Channel is never closed to prevent clients from seeing an erroneus event. func (c *Local) Subscribe(ctx context.Context, subscriber, query string, outCapacity ...int) (out <-chan ctypes.ResultEvent, err error) { q, err := tmquery.New(query) if err != nil { @@ -156,7 +166,7 @@ func (c *Local) Subscribe(ctx context.Context, subscriber, query string, outCapa return nil, errors.Wrap(err, "failed to subscribe") } - outCap := 0 + outCap := 1 if len(outCapacity) > 0 { outCap = outCapacity[0] } @@ -166,18 +176,20 @@ func (c *Local) Subscribe(ctx context.Context, subscriber, query string, outCapa for { select { case msg := <-sub.Out(): + result := ctypes.ResultEvent{Query: query, Data: msg.Data(), Tags: msg.Tags()} if cap(outc) == 0 { - outc <- ctypes.ResultEvent{Query: query, Data: msg.Data(), Tags: msg.Tags()} + outc <- result } else { select { - case outc <- ctypes.ResultEvent{Query: query, Data: msg.Data(), Tags: msg.Tags()}: + case outc <- result: default: - // XXX: client has missed an event. inform it somehow! + c.Logger.Error("wanted to publish ResultEvent, but out channel is full", "result", result, "query", result.Query) } } case <-sub.Cancelled(): if sub.Err() != tmpubsub.ErrUnsubscribed { // resubscribe + c.Logger.Error("subscription was cancelled, resubscribing...", "err", err, "query", query) var err error for { if !c.IsRunning() { diff --git a/rpc/core/events.go b/rpc/core/events.go index 5139cd38..5188b871 100644 --- a/rpc/core/events.go +++ b/rpc/core/events.go @@ -93,10 +93,10 @@ import ( func Subscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscribe, error) { addr := wsCtx.GetRemoteAddr() - if eventBus.NumClients() > MaxSubscriptionClients { - return nil, fmt.Errorf("max_subscription_clients %d reached", MaxSubscriptionClients) - } else if eventBus.NumClientSubscriptions(addr) > MaxSubscriptionsPerClient { - return nil, fmt.Errorf("max_subscriptions_per_client %d reached", MaxSubscriptionsPerClient) + if eventBus.NumClients() > config.MaxSubscriptionClients { + return nil, fmt.Errorf("max_subscription_clients %d reached", config.MaxSubscriptionClients) + } else if eventBus.NumClientSubscriptions(addr) > config.MaxSubscriptionsPerClient { + return nil, fmt.Errorf("max_subscriptions_per_client %d reached", config.MaxSubscriptionsPerClient) } logger.Info("Subscribe to query", "remote", addr, "query", query) diff --git a/rpc/core/mempool.go b/rpc/core/mempool.go index b894d75d..48855b01 100644 --- a/rpc/core/mempool.go +++ b/rpc/core/mempool.go @@ -168,10 +168,10 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { // XXX: should be the remote IP address of the caller subscriber := "mempool" - if eventBus.NumClients() > MaxSubscriptionClients { - return nil, fmt.Errorf("max_subscription_clients %d reached", MaxSubscriptionClients) - } else if eventBus.NumClientSubscriptions(subscriber) > MaxSubscriptionsPerClient { - return nil, fmt.Errorf("max_subscriptions_per_client %d reached", MaxSubscriptionsPerClient) + if eventBus.NumClients() > config.MaxSubscriptionClients { + return nil, fmt.Errorf("max_subscription_clients %d reached", config.MaxSubscriptionClients) + } else if eventBus.NumClientSubscriptions(subscriber) > config.MaxSubscriptionsPerClient { + return nil, fmt.Errorf("max_subscriptions_per_client %d reached", config.MaxSubscriptionsPerClient) } // Subscribe to tx being committed in block. @@ -223,7 +223,7 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { DeliverTx: abci.ResponseDeliverTx{}, Hash: tx.Hash(), }, err - case <-time.After(TimeoutBroadcastTxCommit): + case <-time.After(config.TimeoutBroadcastTxCommit): err = errors.New("Timed out waiting for tx to be included in a block") logger.Error("Error on broadcastTxCommit", "err", err) return &ctypes.ResultBroadcastTxCommit{ diff --git a/rpc/core/pipe.go b/rpc/core/pipe.go index 34355223..0b760344 100644 --- a/rpc/core/pipe.go +++ b/rpc/core/pipe.go @@ -1,8 +1,7 @@ package core import ( - "time" - + cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/consensus" "github.com/tendermint/tendermint/crypto" dbm "github.com/tendermint/tendermint/libs/db" @@ -74,14 +73,7 @@ var ( logger log.Logger - // config variables - - // MaxSubscriptionClients mirrors RPCConfig.MaxSubscriptionClients - MaxSubscriptionClients int - // MaxSubscriptionsPerClient mirrors RPCConfig.MaxSubscriptionsPerClient - MaxSubscriptionsPerClient int - // TimeoutBroadcastTxCommit mirrors RPCConfig.TimeoutBroadcastTxCommit - TimeoutBroadcastTxCommit time.Duration + config cfg.RPCConfig ) func SetStateDB(db dbm.DB) { @@ -144,6 +136,11 @@ func SetEventBus(b *types.EventBus) { eventBus = b } +// SetConfig sets an RPCConfig. +func SetConfig(c cfg.RPCConfig) { + config = c +} + func validatePage(page, perPage, totalCount int) int { if perPage < 1 { return 1