diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 29400929..a8998c99 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -7,7 +7,7 @@ Special thanks to external contributors on this release: ### BREAKING CHANGES: * CLI/RPC/Config -- [httpclient] Update Subscribe interface to reflect new pubsub/eventBus API [ADR-33](https://github.com/tendermint/tendermint/blob/develop/docs/architecture/adr-033-pubsub.md) +- [rpc/client] Update Subscribe interface to reflect new pubsub/eventBus API [ADR-33](https://github.com/tendermint/tendermint/blob/develop/docs/architecture/adr-033-pubsub.md) * Apps @@ -27,6 +27,7 @@ Special thanks to external contributors on this release: - [config] \#2920 Remove `consensus.blocktime_iota` parameter - [genesis] \#2920 Add `time_iota_ms` to block's consensus parameters (not exposed to the application) - [genesis] \#2920 Rename `consensus_params.block_size` to `consensus_params.block` +- [lite] add `/unsubscribe_all` endpoint, which allows you to unsubscribe from all events ### IMPROVEMENTS: - [libs/common] \#3238 exit with zero (0) code upon receiving SIGTERM/SIGINT @@ -41,7 +42,6 @@ Special thanks to external contributors on this release: - leveldb.aliveiters ### BUG FIXES: - - [p2p/conn] \#3347 Reject all-zero shared secrets in the Diffie-Hellman step of secret-connection - [libs/pubsub] \#951, \#1880 use non-blocking send when dispatching messages [ADR-33](https://github.com/tendermint/tendermint/blob/develop/docs/architecture/adr-033-pubsub.md) - [p2p] \#3369 do not panic when filter times out diff --git a/config/config.go b/config/config.go index cfd76060..540012a5 100644 --- a/config/config.go +++ b/config/config.go @@ -7,6 +7,7 @@ import ( "time" "github.com/pkg/errors" + rpcserver "github.com/tendermint/tendermint/rpc/lib/server" ) const ( @@ -323,6 +324,19 @@ type RPCConfig struct { // Should be < {ulimit -Sn} - {MaxNumInboundPeers} - {MaxNumOutboundPeers} - {N of wal, db and other open files} // 1024 - 40 - 10 - 50 = 924 = ~900 MaxOpenConnections int `mapstructure:"max_open_connections"` + + // Maximum number of unique clientIDs that can /subscribe + // If you're using /broadcast_tx_commit, set to the estimated maximum number + // of broadcast_tx_commit calls per block. + MaxSubscriptionClients int `mapstructure:"max_subscription_clients"` + + // Maximum number of unique queries a given client can /subscribe to + // If you're using GRPC (or Local RPC client) and /broadcast_tx_commit, set + // to the estimated maximum number of broadcast_tx_commit calls per block. + MaxSubscriptionsPerClient int `mapstructure:"max_subscriptions_per_client"` + + // How long to wait for a tx to be committed during /broadcast_tx_commit + TimeoutBroadcastTxCommit time.Duration `mapstructure:"timeout_broadcast_tx_commit"` } // DefaultRPCConfig returns a default configuration for the RPC server @@ -337,6 +351,10 @@ func DefaultRPCConfig() *RPCConfig { Unsafe: false, MaxOpenConnections: 900, + + MaxSubscriptionClients: 100, + MaxSubscriptionsPerClient: 5, + TimeoutBroadcastTxCommit: 10 * time.Second, } } @@ -358,6 +376,18 @@ func (cfg *RPCConfig) ValidateBasic() error { if cfg.MaxOpenConnections < 0 { return errors.New("max_open_connections can't be negative") } + if cfg.MaxSubscriptionClients < 0 { + return errors.New("max_subscription_clients can't be negative") + } + if cfg.MaxSubscriptionsPerClient < 0 { + return errors.New("max_subscriptions_per_client can't be negative") + } + if cfg.TimeoutBroadcastTxCommit < 0 { + return errors.New("timeout_broadcast_tx_commit can't be negative") + } + if cfg.TimeoutBroadcastTxCommit > rpcserver.WriteTimeout { + return fmt.Errorf("timeout_broadcast_tx_commit can't be greater than rpc server's write timeout: %v", rpcserver.WriteTimeout) + } return nil } diff --git a/config/toml.go b/config/toml.go index 45b9a671..9ce7e76c 100644 --- a/config/toml.go +++ b/config/toml.go @@ -165,6 +165,19 @@ unsafe = {{ .RPC.Unsafe }} # 1024 - 40 - 10 - 50 = 924 = ~900 max_open_connections = {{ .RPC.MaxOpenConnections }} +# Maximum number of unique clientIDs that can /subscribe +# If you're using /broadcast_tx_commit, set to the estimated maximum number +# of broadcast_tx_commit calls per block. +max_subscription_clients = {{ .RPC.MaxSubscriptionClients }} + +# Maximum number of unique queries a given client can /subscribe to +# If you're using GRPC (or Local RPC client) and /broadcast_tx_commit, set to +# the estimated # maximum number of broadcast_tx_commit calls per block. +max_subscriptions_per_client = {{ .RPC.MaxSubscriptionsPerClient }} + +# How long to wait for a tx to be committed during /broadcast_tx_commit. +timeout_broadcast_tx_commit = "{{ .RPC.TimeoutBroadcastTxCommit }}" + ##### peer to peer configuration options ##### [p2p] diff --git a/docs/tendermint-core/configuration.md b/docs/tendermint-core/configuration.md index 4e188aae..f1ac753a 100644 --- a/docs/tendermint-core/configuration.md +++ b/docs/tendermint-core/configuration.md @@ -111,6 +111,19 @@ unsafe = false # 1024 - 40 - 10 - 50 = 924 = ~900 max_open_connections = 900 +# Maximum number of unique clientIDs that can /subscribe +# If you're using /broadcast_tx_commit, set to the estimated maximum number +# of broadcast_tx_commit calls per block. +max_subscription_clients = 100 + +# Maximum number of unique queries a given client can /subscribe to +# If you're using GRPC (or Local RPC client) and /broadcast_tx_commit, set to +# the estimated # maximum number of broadcast_tx_commit calls per block. +max_subscriptions_per_client = 5 + +# How long to wait for a tx to be committed during /broadcast_tx_commit. +timeout_broadcast_tx_commit = "10s" + ##### peer to peer configuration options ##### [p2p] diff --git a/libs/pubsub/pubsub.go b/libs/pubsub/pubsub.go index 8d4d1fb0..f78dac1b 100644 --- a/libs/pubsub/pubsub.go +++ b/libs/pubsub/pubsub.go @@ -241,6 +241,20 @@ func (s *Server) UnsubscribeAll(ctx context.Context, clientID string) error { } } +// NumClients returns the number of clients. +func (s *Server) NumClients() int { + s.mtx.RLock() + defer s.mtx.RUnlock() + return len(s.subscriptions) +} + +// NumClientSubscriptions returns the number of subscriptions the client has. +func (s *Server) NumClientSubscriptions(clientID string) int { + s.mtx.RLock() + defer s.mtx.RUnlock() + return len(s.subscriptions[clientID]) +} + // Publish publishes the given message. An error will be returned to the caller // if the context is canceled. func (s *Server) Publish(ctx context.Context, msg interface{}) error { diff --git a/libs/pubsub/pubsub_test.go b/libs/pubsub/pubsub_test.go index e2bd50e6..88447756 100644 --- a/libs/pubsub/pubsub_test.go +++ b/libs/pubsub/pubsub_test.go @@ -29,6 +29,10 @@ func TestSubscribe(t *testing.T) { ctx := context.Background() subscription, err := s.Subscribe(ctx, clientID, query.Empty{}) require.NoError(t, err) + + assert.Equal(t, 1, s.NumClients()) + assert.Equal(t, 1, s.NumClientSubscriptions(clientID)) + err = s.Publish(ctx, "Ka-Zar") require.NoError(t, err) assertReceive(t, "Ka-Zar", subscription.Out()) diff --git a/lite/proxy/proxy.go b/lite/proxy/proxy.go index 39baf5a4..020e5753 100644 --- a/lite/proxy/proxy.go +++ b/lite/proxy/proxy.go @@ -1,6 +1,7 @@ package proxy import ( + "context" "net/http" amino "github.com/tendermint/go-amino" @@ -34,7 +35,12 @@ func StartProxy(c rpcclient.Client, listenAddr string, logger log.Logger, maxOpe mux := http.NewServeMux() rpcserver.RegisterRPCFuncs(mux, r, cdc, logger) - wm := rpcserver.NewWebsocketManager(r, cdc, rpcserver.EventSubscriber(c)) + unsubscribeFromAllEvents := func(remoteAddr string) { + if err := c.UnsubscribeAll(context.Background(), remoteAddr); err != nil { + logger.Error("Failed to unsubscribe from events", "err", err) + } + } + wm := rpcserver.NewWebsocketManager(r, cdc, rpcserver.OnDisconnect(unsubscribeFromAllEvents)) wm.SetLogger(logger) core.SetLogger(logger) mux.HandleFunc(wsEndpoint, wm.WebsocketHandler) @@ -51,13 +57,11 @@ func StartProxy(c rpcclient.Client, listenAddr string, logger log.Logger, maxOpe // // if we want security, the client must implement it as a secure client func RPCRoutes(c rpcclient.Client) map[string]*rpcserver.RPCFunc { - return map[string]*rpcserver.RPCFunc{ // Subscribe/unsubscribe are reserved for websocket events. - // We can just use the core tendermint impl, which uses the - // EventSwitch we registered in NewWebsocketManager above - "subscribe": rpcserver.NewWSRPCFunc(core.Subscribe, "query"), - "unsubscribe": rpcserver.NewWSRPCFunc(core.Unsubscribe, "query"), + "subscribe": rpcserver.NewWSRPCFunc(c.(Wrapper).SubscribeWS, "query"), + "unsubscribe": rpcserver.NewWSRPCFunc(c.(Wrapper).UnsubscribeWS, "query"), + "unsubscribe_all": rpcserver.NewWSRPCFunc(c.(Wrapper).UnsubscribeAllWS, ""), // info API "status": rpcserver.NewRPCFunc(c.Status, ""), diff --git a/lite/proxy/wrapper.go b/lite/proxy/wrapper.go index c90cdb27..2d333e9f 100644 --- a/lite/proxy/wrapper.go +++ b/lite/proxy/wrapper.go @@ -1,12 +1,16 @@ package proxy import ( + "context" + "fmt" + cmn "github.com/tendermint/tendermint/libs/common" "github.com/tendermint/tendermint/crypto/merkle" "github.com/tendermint/tendermint/lite" rpcclient "github.com/tendermint/tendermint/rpc/client" ctypes "github.com/tendermint/tendermint/rpc/core/types" + rpctypes "github.com/tendermint/tendermint/rpc/lib/types" ) var _ rpcclient.Client = Wrapper{} @@ -149,6 +153,55 @@ func (w Wrapper) RegisterOpDecoder(typ string, dec merkle.OpDecoder) { w.prt.RegisterOpDecoder(typ, dec) } +// SubscribeWS subscribes for events using the given query and remote address as +// a subscriber, but does not verify responses (UNSAFE)! +func (w Wrapper) SubscribeWS(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, error) { + out, err := w.Client.Subscribe(context.Background(), ctx.RemoteAddr(), query) + if err != nil { + return nil, err + } + + go func() { + for { + select { + case resultEvent := <-out: + // XXX(melekes) We should have a switch here that performs a validation + // depending on the event's type. + ctx.WSConn.TryWriteRPCResponse( + rpctypes.NewRPCSuccessResponse( + ctx.WSConn.Codec(), + rpctypes.JSONRPCStringID(fmt.Sprintf("%v#event", ctx.JSONReq.ID)), + resultEvent, + )) + case <-w.Client.Quit(): + return + } + } + }() + + return &ctypes.ResultSubscribe{}, nil +} + +// UnsubscribeWS calls original client's Unsubscribe using remote address as a +// subscriber. +func (w Wrapper) UnsubscribeWS(ctx *rpctypes.Context, query string) (*ctypes.ResultUnsubscribe, error) { + err := w.Client.Unsubscribe(context.Background(), ctx.RemoteAddr(), query) + if err != nil { + return nil, err + } + return &ctypes.ResultUnsubscribe{}, nil +} + +// UnsubscribeAllWS calls original client's UnsubscribeAll using remote address +// as a subscriber. +func (w Wrapper) UnsubscribeAllWS(ctx *rpctypes.Context) (*ctypes.ResultUnsubscribe, error) { + err := w.Client.UnsubscribeAll(context.Background(), ctx.RemoteAddr()) + if err != nil { + return nil, err + } + return &ctypes.ResultUnsubscribe{}, nil +} + // // WrappedSwitch creates a websocket connection that auto-verifies any info // // coming through before passing it along. // // diff --git a/node/node.go b/node/node.go index 2b803502..f3f9dca3 100644 --- a/node/node.go +++ b/node/node.go @@ -26,6 +26,7 @@ import ( cmn "github.com/tendermint/tendermint/libs/common" dbm "github.com/tendermint/tendermint/libs/db" "github.com/tendermint/tendermint/libs/log" + tmpubsub "github.com/tendermint/tendermint/libs/pubsub" mempl "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/p2p/pex" @@ -658,6 +659,7 @@ func (n *Node) ConfigureRPC() { rpccore.SetConsensusReactor(n.consensusReactor) rpccore.SetEventBus(n.eventBus) rpccore.SetLogger(n.Logger.With("module", "rpc")) + rpccore.SetConfig(*n.config.RPC) } func (n *Node) startRPC() ([]net.Listener, error) { @@ -675,8 +677,15 @@ func (n *Node) startRPC() ([]net.Listener, error) { for i, listenAddr := range listenAddrs { mux := http.NewServeMux() rpcLogger := n.Logger.With("module", "rpc-server") - wm := rpcserver.NewWebsocketManager(rpccore.Routes, coreCodec, rpcserver.EventSubscriber(n.eventBus)) - wm.SetLogger(rpcLogger.With("protocol", "websocket")) + wmLogger := rpcLogger.With("protocol", "websocket") + wm := rpcserver.NewWebsocketManager(rpccore.Routes, coreCodec, + rpcserver.OnDisconnect(func(remoteAddr string) { + err := n.eventBus.UnsubscribeAll(context.Background(), remoteAddr) + if err != nil && err != tmpubsub.ErrSubscriptionNotFound { + wmLogger.Error("Failed to unsubscribe addr from events", "addr", remoteAddr, "err", err) + } + })) + wm.SetLogger(wmLogger) mux.HandleFunc("/websocket", wm.WebsocketHandler) rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, coreCodec, rpcLogger) diff --git a/rpc/client/event_test.go b/rpc/client/event_test.go index 7b00d6ea..b0a40fc2 100644 --- a/rpc/client/event_test.go +++ b/rpc/client/event_test.go @@ -129,3 +129,9 @@ func testTxEventsSent(t *testing.T, broadcastMethod string) { }) } } + +// Test HTTPClient resubscribes upon disconnect && subscription error. +// Test Local client resubscribes upon subscription error. +func TestClientsResubscribe(t *testing.T) { + // TODO(melekes) +} diff --git a/rpc/client/helpers.go b/rpc/client/helpers.go index ec63fb3b..4889b074 100644 --- a/rpc/client/helpers.go +++ b/rpc/client/helpers.go @@ -61,7 +61,7 @@ func WaitForOneEvent(c EventsClient, evtTyp string, timeout time.Duration) (type defer cancel() // register for the next event of this type - sub, err := c.Subscribe(ctx, subscriber, types.QueryForEvent(evtTyp)) + eventCh, err := c.Subscribe(ctx, subscriber, types.QueryForEvent(evtTyp).String()) if err != nil { return nil, errors.Wrap(err, "failed to subscribe") } @@ -69,10 +69,8 @@ func WaitForOneEvent(c EventsClient, evtTyp string, timeout time.Duration) (type defer c.UnsubscribeAll(ctx, subscriber) select { - case msg := <-sub.Out(): - return msg.Data().(types.TMEventData), nil - case <-sub.Cancelled(): - return nil, errors.New("subscription was cancelled") + case event := <-eventCh: + return event.Data.(types.TMEventData), nil case <-ctx.Done(): return nil, errors.New("timed out waiting for event") } diff --git a/rpc/client/httpclient.go b/rpc/client/httpclient.go index a1dee991..e982292e 100644 --- a/rpc/client/httpclient.go +++ b/rpc/client/httpclient.go @@ -2,11 +2,14 @@ package client import ( "context" + "strings" "sync" + "time" "github.com/pkg/errors" amino "github.com/tendermint/go-amino" + cmn "github.com/tendermint/tendermint/libs/common" tmpubsub "github.com/tendermint/tendermint/libs/pubsub" ctypes "github.com/tendermint/tendermint/rpc/core/types" @@ -15,13 +18,18 @@ import ( ) /* -HTTP is a Client implementation that communicates -with a tendermint node over json rpc and websockets. +HTTP is a Client implementation that communicates with a tendermint node over +json rpc and websockets. -This is the main implementation you probably want to use in -production code. There are other implementations when calling -the tendermint node in-process (local), or when you want to mock -out the server for test code (mock). +This is the main implementation you probably want to use in production code. +There are other implementations when calling the tendermint node in-process +(Local), or when you want to mock out the server for test code (mock). + +You can subscribe for any event published by Tendermint using Subscribe method. +Note delivery is best-effort. If you don't read events fast enough or network +is slow, Tendermint might cancel the subscription. The client will attempt to +resubscribe (you don't need to do anything). It will keep trying every second +indefinitely until successful. */ type HTTP struct { remote string @@ -249,28 +257,6 @@ func (c *HTTP) Validators(height *int64) (*ctypes.ResultValidators, error) { /** websocket event stuff here... **/ -type subscription struct { - out chan tmpubsub.Message - cancelled chan struct{} - - mtx sync.RWMutex - err error -} - -func (s *subscription) Out() <-chan tmpubsub.Message { - return s.out -} - -func (s *subscription) Cancelled() <-chan struct{} { - return s.cancelled -} - -func (s *subscription) Err() error { - s.mtx.RLock() - defer s.mtx.RUnlock() - return s.err -} - type WSEvents struct { cmn.BaseService cdc *amino.Codec @@ -279,8 +265,8 @@ type WSEvents struct { ws *rpcclient.WSClient mtx sync.RWMutex - // query -> subscription - subscriptions map[string]*subscription + // query -> chan + subscriptions map[string]chan ctypes.ResultEvent } func newWSEvents(cdc *amino.Codec, remote, endpoint string) *WSEvents { @@ -288,16 +274,18 @@ func newWSEvents(cdc *amino.Codec, remote, endpoint string) *WSEvents { cdc: cdc, endpoint: endpoint, remote: remote, - subscriptions: make(map[string]*subscription), + subscriptions: make(map[string]chan ctypes.ResultEvent), } wsEvents.BaseService = *cmn.NewBaseService(nil, "WSEvents", wsEvents) return wsEvents } +// OnStart implements cmn.Service by starting WSClient and event loop. func (w *WSEvents) OnStart() error { w.ws = rpcclient.NewWSClient(w.remote, w.endpoint, rpcclient.OnReconnect(func() { - w.redoSubscriptions() + // resubscribe immediately + w.redoSubscriptionsAfter(0 * time.Second) })) w.ws.SetCodec(w.cdc) @@ -310,75 +298,63 @@ func (w *WSEvents) OnStart() error { return nil } -// Stop wraps the BaseService/eventSwitch actions as Start does +// OnStop implements cmn.Service by stopping WSClient. func (w *WSEvents) OnStop() { - err := w.ws.Stop() - if err != nil { - w.Logger.Error("failed to stop WSClient", "err", err) - } + _ = w.ws.Stop() } -func (w *WSEvents) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, outCapacity ...int) (types.Subscription, error) { - q := query.String() +// Subscribe implements EventsClient by using WSClient to subscribe given +// subscriber to query. By default, returns a channel with cap=1. Error is +// returned if it fails to subscribe. +// Channel is never closed to prevent clients from seeing an erroneus event. +func (w *WSEvents) Subscribe(ctx context.Context, subscriber, query string, + outCapacity ...int) (out <-chan ctypes.ResultEvent, err error) { - err := w.ws.Subscribe(ctx, q) - if err != nil { + if err := w.ws.Subscribe(ctx, query); err != nil { return nil, err } outCap := 1 - if len(outCapacity) > 0 && outCapacity[0] >= 0 { + if len(outCapacity) > 0 { outCap = outCapacity[0] } + outc := make(chan ctypes.ResultEvent, outCap) w.mtx.Lock() // subscriber param is ignored because Tendermint will override it with // remote IP anyway. - w.subscriptions[q] = &subscription{ - out: make(chan tmpubsub.Message, outCap), - cancelled: make(chan struct{}), - } + w.subscriptions[query] = outc w.mtx.Unlock() - return w.subscriptions[q], nil + return outc, nil } -func (w *WSEvents) Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error { - q := query.String() - - err := w.ws.Unsubscribe(ctx, q) - if err != nil { +// Unsubscribe implements EventsClient by using WSClient to unsubscribe given +// subscriber from query. +func (w *WSEvents) Unsubscribe(ctx context.Context, subscriber, query string) error { + if err := w.ws.Unsubscribe(ctx, query); err != nil { return err } w.mtx.Lock() - sub, ok := w.subscriptions[q] + _, ok := w.subscriptions[query] if ok { - close(sub.cancelled) - sub.mtx.Lock() - sub.err = errors.New("unsubscribed") - sub.mtx.Unlock() - delete(w.subscriptions, q) + delete(w.subscriptions, query) } w.mtx.Unlock() return nil } +// UnsubscribeAll implements EventsClient by using WSClient to unsubscribe +// given subscriber from all the queries. func (w *WSEvents) UnsubscribeAll(ctx context.Context, subscriber string) error { - err := w.ws.UnsubscribeAll(ctx) - if err != nil { + if err := w.ws.UnsubscribeAll(ctx); err != nil { return err } w.mtx.Lock() - for _, sub := range w.subscriptions { - close(sub.cancelled) - sub.mtx.Lock() - sub.err = errors.New("unsubscribed") - sub.mtx.Unlock() - } - w.subscriptions = make(map[string]*subscription) + w.subscriptions = make(map[string]chan ctypes.ResultEvent) w.mtx.Unlock() return nil @@ -386,18 +362,21 @@ func (w *WSEvents) UnsubscribeAll(ctx context.Context, subscriber string) error // After being reconnected, it is necessary to redo subscription to server // otherwise no data will be automatically received. -func (w *WSEvents) redoSubscriptions() { +func (w *WSEvents) redoSubscriptionsAfter(d time.Duration) { + time.Sleep(d) + for q := range w.subscriptions { - // NOTE: no timeout for resubscribing - // FIXME: better logging/handling of errors?? - w.ws.Subscribe(context.Background(), q) + err := w.ws.Subscribe(context.Background(), q) + if err != nil { + w.Logger.Error("Failed to resubscribe", "err", err) + } } } -// eventListener is an infinite loop pulling all websocket events -// and pushing them to the EventSwitch. -// -// the goroutine only stops by closing quit +func isErrAlreadySubscribed(err error) bool { + return strings.Contains(err.Error(), tmpubsub.ErrAlreadySubscribed.Error()) +} + func (w *WSEvents) eventListener() { for { select { @@ -405,21 +384,39 @@ func (w *WSEvents) eventListener() { if !ok { return } + if resp.Error != nil { w.Logger.Error("WS error", "err", resp.Error.Error()) + // Error can be ErrAlreadySubscribed or max client (subscriptions per + // client) reached or Tendermint exited. + // We can ignore ErrAlreadySubscribed, but need to retry in other + // cases. + if !isErrAlreadySubscribed(resp.Error) { + // Resubscribe after 1 second to give Tendermint time to restart (if + // crashed). + w.redoSubscriptionsAfter(1 * time.Second) + } continue } + result := new(ctypes.ResultEvent) err := w.cdc.UnmarshalJSON(resp.Result, result) if err != nil { w.Logger.Error("failed to unmarshal response", "err", err) continue } - // NOTE: writing also happens inside mutex so we can't close a channel in - // Unsubscribe/UnsubscribeAll. + w.mtx.RLock() - if sub, ok := w.subscriptions[result.Query]; ok { - sub.out <- tmpubsub.NewMessage(result.Data, result.Tags) + if out, ok := w.subscriptions[result.Query]; ok { + if cap(out) == 0 { + out <- *result + } else { + select { + case out <- *result: + default: + w.Logger.Error("wanted to publish ResultEvent, but out channel is full", "result", result, "query", result.Query) + } + } } w.mtx.RUnlock() case <-w.Quit(): diff --git a/rpc/client/interface.go b/rpc/client/interface.go index 7477225e..605d84ba 100644 --- a/rpc/client/interface.go +++ b/rpc/client/interface.go @@ -21,6 +21,8 @@ implementation. */ import ( + "context" + cmn "github.com/tendermint/tendermint/libs/common" ctypes "github.com/tendermint/tendermint/rpc/core/types" "github.com/tendermint/tendermint/types" @@ -91,7 +93,18 @@ type NetworkClient interface { // EventsClient is reactive, you can subscribe to any message, given the proper // string. see tendermint/types/events.go type EventsClient interface { - types.EventBusSubscriber + // Subscribe subscribes given subscriber to query. Returns a channel with + // cap=1 onto which events are published. An error is returned if it fails to + // subscribe. outCapacity can be used optionally to set capacity for the + // channel. Channel is never closed to prevent accidental reads. + // + // ctx cannot be used to unsubscribe. To unsubscribe, use either Unsubscribe + // or UnsubscribeAll. + Subscribe(ctx context.Context, subscriber, query string, outCapacity ...int) (out <-chan ctypes.ResultEvent, err error) + // Unsubscribe unsubscribes given subscriber from query. + Unsubscribe(ctx context.Context, subscriber, query string) error + // UnsubscribeAll unsubscribes given subscriber from all the queries. + UnsubscribeAll(ctx context.Context, subscriber string) error } // MempoolClient shows us data about current mempool state. diff --git a/rpc/client/localclient.go b/rpc/client/localclient.go index 33a1ce22..976c9892 100644 --- a/rpc/client/localclient.go +++ b/rpc/client/localclient.go @@ -2,12 +2,18 @@ package client import ( "context" + "time" + + "github.com/pkg/errors" cmn "github.com/tendermint/tendermint/libs/common" + "github.com/tendermint/tendermint/libs/log" tmpubsub "github.com/tendermint/tendermint/libs/pubsub" + tmquery "github.com/tendermint/tendermint/libs/pubsub/query" nm "github.com/tendermint/tendermint/node" "github.com/tendermint/tendermint/rpc/core" ctypes "github.com/tendermint/tendermint/rpc/core/types" + rpctypes "github.com/tendermint/tendermint/rpc/lib/types" "github.com/tendermint/tendermint/types" ) @@ -24,9 +30,17 @@ are compiled in process. For real clients, you probably want to use client.HTTP. For more powerful control during testing, you probably want the "client/mock" package. + +You can subscribe for any event published by Tendermint using Subscribe method. +Note delivery is best-effort. If you don't read events fast enough, Tendermint +might cancel the subscription. The client will attempt to resubscribe (you +don't need to do anything). It will keep trying indefinitely with exponential +backoff (10ms -> 20ms -> 40ms) until successful. */ type Local struct { *types.EventBus + Logger log.Logger + ctx *rpctypes.Context } // NewLocal configures a client that calls the Node directly. @@ -39,113 +53,189 @@ func NewLocal(node *nm.Node) *Local { node.ConfigureRPC() return &Local{ EventBus: node.EventBus(), + Logger: log.NewNopLogger(), + ctx: &rpctypes.Context{}, } } var ( _ Client = (*Local)(nil) - _ NetworkClient = Local{} + _ NetworkClient = (*Local)(nil) _ EventsClient = (*Local)(nil) ) -func (Local) Status() (*ctypes.ResultStatus, error) { - return core.Status() +// SetLogger allows to set a logger on the client. +func (c *Local) SetLogger(l log.Logger) { + c.Logger = l } -func (Local) ABCIInfo() (*ctypes.ResultABCIInfo, error) { - return core.ABCIInfo() +func (c *Local) Status() (*ctypes.ResultStatus, error) { + return core.Status(c.ctx) +} + +func (c *Local) ABCIInfo() (*ctypes.ResultABCIInfo, error) { + return core.ABCIInfo(c.ctx) } func (c *Local) ABCIQuery(path string, data cmn.HexBytes) (*ctypes.ResultABCIQuery, error) { return c.ABCIQueryWithOptions(path, data, DefaultABCIQueryOptions) } -func (Local) ABCIQueryWithOptions(path string, data cmn.HexBytes, opts ABCIQueryOptions) (*ctypes.ResultABCIQuery, error) { - return core.ABCIQuery(path, data, opts.Height, opts.Prove) +func (c *Local) ABCIQueryWithOptions(path string, data cmn.HexBytes, opts ABCIQueryOptions) (*ctypes.ResultABCIQuery, error) { + return core.ABCIQuery(c.ctx, path, data, opts.Height, opts.Prove) } -func (Local) BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { - return core.BroadcastTxCommit(tx) +func (c *Local) BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { + return core.BroadcastTxCommit(c.ctx, tx) } -func (Local) BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { - return core.BroadcastTxAsync(tx) +func (c *Local) BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { + return core.BroadcastTxAsync(c.ctx, tx) } -func (Local) BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { - return core.BroadcastTxSync(tx) +func (c *Local) BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { + return core.BroadcastTxSync(c.ctx, tx) } -func (Local) UnconfirmedTxs(limit int) (*ctypes.ResultUnconfirmedTxs, error) { - return core.UnconfirmedTxs(limit) +func (c *Local) UnconfirmedTxs(limit int) (*ctypes.ResultUnconfirmedTxs, error) { + return core.UnconfirmedTxs(c.ctx, limit) } -func (Local) NumUnconfirmedTxs() (*ctypes.ResultUnconfirmedTxs, error) { - return core.NumUnconfirmedTxs() +func (c *Local) NumUnconfirmedTxs() (*ctypes.ResultUnconfirmedTxs, error) { + return core.NumUnconfirmedTxs(c.ctx) } -func (Local) NetInfo() (*ctypes.ResultNetInfo, error) { - return core.NetInfo() +func (c *Local) NetInfo() (*ctypes.ResultNetInfo, error) { + return core.NetInfo(c.ctx) } -func (Local) DumpConsensusState() (*ctypes.ResultDumpConsensusState, error) { - return core.DumpConsensusState() +func (c *Local) DumpConsensusState() (*ctypes.ResultDumpConsensusState, error) { + return core.DumpConsensusState(c.ctx) } -func (Local) ConsensusState() (*ctypes.ResultConsensusState, error) { - return core.ConsensusState() +func (c *Local) ConsensusState() (*ctypes.ResultConsensusState, error) { + return core.ConsensusState(c.ctx) } -func (Local) Health() (*ctypes.ResultHealth, error) { - return core.Health() +func (c *Local) Health() (*ctypes.ResultHealth, error) { + return core.Health(c.ctx) } -func (Local) DialSeeds(seeds []string) (*ctypes.ResultDialSeeds, error) { - return core.UnsafeDialSeeds(seeds) +func (c *Local) DialSeeds(seeds []string) (*ctypes.ResultDialSeeds, error) { + return core.UnsafeDialSeeds(c.ctx, seeds) } -func (Local) DialPeers(peers []string, persistent bool) (*ctypes.ResultDialPeers, error) { - return core.UnsafeDialPeers(peers, persistent) +func (c *Local) DialPeers(peers []string, persistent bool) (*ctypes.ResultDialPeers, error) { + return core.UnsafeDialPeers(c.ctx, peers, persistent) } -func (Local) BlockchainInfo(minHeight, maxHeight int64) (*ctypes.ResultBlockchainInfo, error) { - return core.BlockchainInfo(minHeight, maxHeight) +func (c *Local) BlockchainInfo(minHeight, maxHeight int64) (*ctypes.ResultBlockchainInfo, error) { + return core.BlockchainInfo(c.ctx, minHeight, maxHeight) } -func (Local) Genesis() (*ctypes.ResultGenesis, error) { - return core.Genesis() +func (c *Local) Genesis() (*ctypes.ResultGenesis, error) { + return core.Genesis(c.ctx) } -func (Local) Block(height *int64) (*ctypes.ResultBlock, error) { - return core.Block(height) +func (c *Local) Block(height *int64) (*ctypes.ResultBlock, error) { + return core.Block(c.ctx, height) } -func (Local) BlockResults(height *int64) (*ctypes.ResultBlockResults, error) { - return core.BlockResults(height) +func (c *Local) BlockResults(height *int64) (*ctypes.ResultBlockResults, error) { + return core.BlockResults(c.ctx, height) } -func (Local) Commit(height *int64) (*ctypes.ResultCommit, error) { - return core.Commit(height) +func (c *Local) Commit(height *int64) (*ctypes.ResultCommit, error) { + return core.Commit(c.ctx, height) } -func (Local) Validators(height *int64) (*ctypes.ResultValidators, error) { - return core.Validators(height) +func (c *Local) Validators(height *int64) (*ctypes.ResultValidators, error) { + return core.Validators(c.ctx, height) } -func (Local) Tx(hash []byte, prove bool) (*ctypes.ResultTx, error) { - return core.Tx(hash, prove) +func (c *Local) Tx(hash []byte, prove bool) (*ctypes.ResultTx, error) { + return core.Tx(c.ctx, hash, prove) } -func (Local) TxSearch(query string, prove bool, page, perPage int) (*ctypes.ResultTxSearch, error) { - return core.TxSearch(query, prove, page, perPage) +func (c *Local) TxSearch(query string, prove bool, page, perPage int) (*ctypes.ResultTxSearch, error) { + return core.TxSearch(c.ctx, query, prove, page, perPage) } -func (c *Local) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, outCapacity ...int) (types.Subscription, error) { - return c.EventBus.Subscribe(ctx, subscriber, query, outCapacity...) +func (c *Local) Subscribe(ctx context.Context, subscriber, query string, outCapacity ...int) (out <-chan ctypes.ResultEvent, err error) { + q, err := tmquery.New(query) + if err != nil { + return nil, errors.Wrap(err, "failed to parse query") + } + sub, err := c.EventBus.Subscribe(ctx, subscriber, q) + if err != nil { + return nil, errors.Wrap(err, "failed to subscribe") + } + + outCap := 1 + if len(outCapacity) > 0 { + outCap = outCapacity[0] + } + + outc := make(chan ctypes.ResultEvent, outCap) + go c.eventsRoutine(sub, subscriber, q, outc) + + return outc, nil } -func (c *Local) Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error { - return c.EventBus.Unsubscribe(ctx, subscriber, query) +func (c *Local) eventsRoutine(sub types.Subscription, subscriber string, q tmpubsub.Query, outc chan<- ctypes.ResultEvent) { + for { + select { + case msg := <-sub.Out(): + result := ctypes.ResultEvent{Query: q.String(), Data: msg.Data(), Tags: msg.Tags()} + if cap(outc) == 0 { + outc <- result + } else { + select { + case outc <- result: + default: + c.Logger.Error("wanted to publish ResultEvent, but out channel is full", "result", result, "query", result.Query) + } + } + case <-sub.Cancelled(): + if sub.Err() == tmpubsub.ErrUnsubscribed { + return + } + + c.Logger.Error("subscription was cancelled, resubscribing...", "err", sub.Err(), "query", q.String()) + sub = c.resubscribe(subscriber, q) + if sub == nil { // client was stopped + return + } + case <-c.Quit(): + return + } + } +} + +// Try to resubscribe with exponential backoff. +func (c *Local) resubscribe(subscriber string, q tmpubsub.Query) types.Subscription { + attempts := 0 + for { + if !c.IsRunning() { + return nil + } + + sub, err := c.EventBus.Subscribe(context.Background(), subscriber, q) + if err == nil { + return sub + } + + attempts++ + time.Sleep((10 << uint(attempts)) * time.Millisecond) // 10ms -> 20ms -> 40ms + } +} + +func (c *Local) Unsubscribe(ctx context.Context, subscriber, query string) error { + q, err := tmquery.New(query) + if err != nil { + return errors.Wrap(err, "failed to parse query") + } + return c.EventBus.Unsubscribe(ctx, subscriber, q) } func (c *Local) UnsubscribeAll(ctx context.Context, subscriber string) error { diff --git a/rpc/client/mock/client.go b/rpc/client/mock/client.go index ef2d4f19..9c0eb75b 100644 --- a/rpc/client/mock/client.go +++ b/rpc/client/mock/client.go @@ -21,6 +21,7 @@ import ( "github.com/tendermint/tendermint/rpc/client" "github.com/tendermint/tendermint/rpc/core" ctypes "github.com/tendermint/tendermint/rpc/core/types" + rpctypes "github.com/tendermint/tendermint/rpc/lib/types" "github.com/tendermint/tendermint/types" ) @@ -76,11 +77,11 @@ func (c Call) GetResponse(args interface{}) (interface{}, error) { } func (c Client) Status() (*ctypes.ResultStatus, error) { - return core.Status() + return core.Status(&rpctypes.Context{}) } func (c Client) ABCIInfo() (*ctypes.ResultABCIInfo, error) { - return core.ABCIInfo() + return core.ABCIInfo(&rpctypes.Context{}) } func (c Client) ABCIQuery(path string, data cmn.HexBytes) (*ctypes.ResultABCIQuery, error) { @@ -88,49 +89,49 @@ func (c Client) ABCIQuery(path string, data cmn.HexBytes) (*ctypes.ResultABCIQue } func (c Client) ABCIQueryWithOptions(path string, data cmn.HexBytes, opts client.ABCIQueryOptions) (*ctypes.ResultABCIQuery, error) { - return core.ABCIQuery(path, data, opts.Height, opts.Prove) + return core.ABCIQuery(&rpctypes.Context{}, path, data, opts.Height, opts.Prove) } func (c Client) BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { - return core.BroadcastTxCommit(tx) + return core.BroadcastTxCommit(&rpctypes.Context{}, tx) } func (c Client) BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { - return core.BroadcastTxAsync(tx) + return core.BroadcastTxAsync(&rpctypes.Context{}, tx) } func (c Client) BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { - return core.BroadcastTxSync(tx) + return core.BroadcastTxSync(&rpctypes.Context{}, tx) } func (c Client) NetInfo() (*ctypes.ResultNetInfo, error) { - return core.NetInfo() + return core.NetInfo(&rpctypes.Context{}) } func (c Client) DialSeeds(seeds []string) (*ctypes.ResultDialSeeds, error) { - return core.UnsafeDialSeeds(seeds) + return core.UnsafeDialSeeds(&rpctypes.Context{}, seeds) } func (c Client) DialPeers(peers []string, persistent bool) (*ctypes.ResultDialPeers, error) { - return core.UnsafeDialPeers(peers, persistent) + return core.UnsafeDialPeers(&rpctypes.Context{}, peers, persistent) } func (c Client) BlockchainInfo(minHeight, maxHeight int64) (*ctypes.ResultBlockchainInfo, error) { - return core.BlockchainInfo(minHeight, maxHeight) + return core.BlockchainInfo(&rpctypes.Context{}, minHeight, maxHeight) } func (c Client) Genesis() (*ctypes.ResultGenesis, error) { - return core.Genesis() + return core.Genesis(&rpctypes.Context{}) } func (c Client) Block(height *int64) (*ctypes.ResultBlock, error) { - return core.Block(height) + return core.Block(&rpctypes.Context{}, height) } func (c Client) Commit(height *int64) (*ctypes.ResultCommit, error) { - return core.Commit(height) + return core.Commit(&rpctypes.Context{}, height) } func (c Client) Validators(height *int64) (*ctypes.ResultValidators, error) { - return core.Validators(height) + return core.Validators(&rpctypes.Context{}, height) } diff --git a/rpc/core/abci.go b/rpc/core/abci.go index aa6089b6..ce15ac14 100644 --- a/rpc/core/abci.go +++ b/rpc/core/abci.go @@ -5,6 +5,7 @@ import ( cmn "github.com/tendermint/tendermint/libs/common" "github.com/tendermint/tendermint/proxy" ctypes "github.com/tendermint/tendermint/rpc/core/types" + rpctypes "github.com/tendermint/tendermint/rpc/lib/types" ) // Query the application for some information. @@ -52,7 +53,7 @@ import ( // | data | []byte | false | true | Data | // | height | int64 | 0 | false | Height (0 means latest) | // | prove | bool | false | false | Includes proof if true | -func ABCIQuery(path string, data cmn.HexBytes, height int64, prove bool) (*ctypes.ResultABCIQuery, error) { +func ABCIQuery(ctx *rpctypes.Context, path string, data cmn.HexBytes, height int64, prove bool) (*ctypes.ResultABCIQuery, error) { resQuery, err := proxyAppQuery.QuerySync(abci.RequestQuery{ Path: path, Data: data, @@ -96,7 +97,7 @@ func ABCIQuery(path string, data cmn.HexBytes, height int64, prove bool) (*ctype // "jsonrpc": "2.0" // } // ``` -func ABCIInfo() (*ctypes.ResultABCIInfo, error) { +func ABCIInfo(ctx *rpctypes.Context) (*ctypes.ResultABCIInfo, error) { resInfo, err := proxyAppQuery.InfoSync(proxy.RequestInfo) if err != nil { return nil, err diff --git a/rpc/core/blocks.go b/rpc/core/blocks.go index 906aea7b..40b6811d 100644 --- a/rpc/core/blocks.go +++ b/rpc/core/blocks.go @@ -5,6 +5,7 @@ import ( cmn "github.com/tendermint/tendermint/libs/common" ctypes "github.com/tendermint/tendermint/rpc/core/types" + rpctypes "github.com/tendermint/tendermint/rpc/lib/types" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" ) @@ -68,7 +69,7 @@ import ( // ``` // // -func BlockchainInfo(minHeight, maxHeight int64) (*ctypes.ResultBlockchainInfo, error) { +func BlockchainInfo(ctx *rpctypes.Context, minHeight, maxHeight int64) (*ctypes.ResultBlockchainInfo, error) { // maximum 20 block metas const limit int64 = 20 @@ -226,7 +227,7 @@ func filterMinMax(height, min, max, limit int64) (int64, int64, error) { // "jsonrpc": "2.0" // } // ``` -func Block(heightPtr *int64) (*ctypes.ResultBlock, error) { +func Block(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultBlock, error) { storeHeight := blockStore.Height() height, err := getHeight(storeHeight, heightPtr) if err != nil { @@ -313,7 +314,7 @@ func Block(heightPtr *int64) (*ctypes.ResultBlock, error) { // "jsonrpc": "2.0" // } // ``` -func Commit(heightPtr *int64) (*ctypes.ResultCommit, error) { +func Commit(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultCommit, error) { storeHeight := blockStore.Height() height, err := getHeight(storeHeight, heightPtr) if err != nil { @@ -372,7 +373,7 @@ func Commit(heightPtr *int64) (*ctypes.ResultCommit, error) { // ] // } // ``` -func BlockResults(heightPtr *int64) (*ctypes.ResultBlockResults, error) { +func BlockResults(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultBlockResults, error) { storeHeight := blockStore.Height() height, err := getHeight(storeHeight, heightPtr) if err != nil { diff --git a/rpc/core/consensus.go b/rpc/core/consensus.go index 81694b7e..b8a91f10 100644 --- a/rpc/core/consensus.go +++ b/rpc/core/consensus.go @@ -3,6 +3,7 @@ package core import ( cm "github.com/tendermint/tendermint/consensus" ctypes "github.com/tendermint/tendermint/rpc/core/types" + rpctypes "github.com/tendermint/tendermint/rpc/lib/types" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" ) @@ -47,7 +48,7 @@ import ( // "jsonrpc": "2.0" // } // ``` -func Validators(heightPtr *int64) (*ctypes.ResultValidators, error) { +func Validators(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultValidators, error) { // The latest validator that we know is the // NextValidator of the last block. height := consensusState.GetState().LastBlockHeight + 1 @@ -200,7 +201,7 @@ func Validators(heightPtr *int64) (*ctypes.ResultValidators, error) { // } // } // ``` -func DumpConsensusState() (*ctypes.ResultDumpConsensusState, error) { +func DumpConsensusState(ctx *rpctypes.Context) (*ctypes.ResultDumpConsensusState, error) { // Get Peer consensus states. peers := p2pPeers.Peers().List() peerStates := make([]ctypes.PeerStateInfo, len(peers)) @@ -277,7 +278,7 @@ func DumpConsensusState() (*ctypes.ResultDumpConsensusState, error) { // } //} //``` -func ConsensusState() (*ctypes.ResultConsensusState, error) { +func ConsensusState(ctx *rpctypes.Context) (*ctypes.ResultConsensusState, error) { // Get self round state. bz, err := consensusState.GetRoundStateSimpleJSON() return &ctypes.ResultConsensusState{RoundState: bz}, err @@ -320,7 +321,7 @@ func ConsensusState() (*ctypes.ResultConsensusState, error) { // } // } // ``` -func ConsensusParams(heightPtr *int64) (*ctypes.ResultConsensusParams, error) { +func ConsensusParams(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultConsensusParams, error) { height := consensusState.GetState().LastBlockHeight + 1 height, err := getHeight(height, heightPtr) if err != nil { diff --git a/rpc/core/dev.go b/rpc/core/dev.go index 0b515476..71f284f8 100644 --- a/rpc/core/dev.go +++ b/rpc/core/dev.go @@ -5,16 +5,19 @@ import ( "runtime/pprof" ctypes "github.com/tendermint/tendermint/rpc/core/types" + rpctypes "github.com/tendermint/tendermint/rpc/lib/types" ) -func UnsafeFlushMempool() (*ctypes.ResultUnsafeFlushMempool, error) { +// UnsafeFlushMempool removes all transactions from the mempool. +func UnsafeFlushMempool(ctx *rpctypes.Context) (*ctypes.ResultUnsafeFlushMempool, error) { mempool.Flush() return &ctypes.ResultUnsafeFlushMempool{}, nil } var profFile *os.File -func UnsafeStartCPUProfiler(filename string) (*ctypes.ResultUnsafeProfile, error) { +// UnsafeStartCPUProfiler starts a pprof profiler using the given filename. +func UnsafeStartCPUProfiler(ctx *rpctypes.Context, filename string) (*ctypes.ResultUnsafeProfile, error) { var err error profFile, err = os.Create(filename) if err != nil { @@ -27,7 +30,8 @@ func UnsafeStartCPUProfiler(filename string) (*ctypes.ResultUnsafeProfile, error return &ctypes.ResultUnsafeProfile{}, nil } -func UnsafeStopCPUProfiler() (*ctypes.ResultUnsafeProfile, error) { +// UnsafeStopCPUProfiler stops the running pprof profiler. +func UnsafeStopCPUProfiler(ctx *rpctypes.Context) (*ctypes.ResultUnsafeProfile, error) { pprof.StopCPUProfile() if err := profFile.Close(); err != nil { return nil, err @@ -35,7 +39,8 @@ func UnsafeStopCPUProfiler() (*ctypes.ResultUnsafeProfile, error) { return &ctypes.ResultUnsafeProfile{}, nil } -func UnsafeWriteHeapProfile(filename string) (*ctypes.ResultUnsafeProfile, error) { +// UnsafeWriteHeapProfile dumps a heap profile to the given filename. +func UnsafeWriteHeapProfile(ctx *rpctypes.Context, filename string) (*ctypes.ResultUnsafeProfile, error) { memProfFile, err := os.Create(filename) if err != nil { return nil, err diff --git a/rpc/core/events.go b/rpc/core/events.go index 22c7ea78..3ea33fa8 100644 --- a/rpc/core/events.go +++ b/rpc/core/events.go @@ -6,10 +6,10 @@ import ( "github.com/pkg/errors" + tmpubsub "github.com/tendermint/tendermint/libs/pubsub" tmquery "github.com/tendermint/tendermint/libs/pubsub/query" ctypes "github.com/tendermint/tendermint/rpc/core/types" rpctypes "github.com/tendermint/tendermint/rpc/lib/types" - tmtypes "github.com/tendermint/tendermint/types" ) // Subscribe for events via WebSocket. @@ -90,8 +90,15 @@ import ( // | query | string | "" | true | Query | // // -func Subscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscribe, error) { - addr := wsCtx.GetRemoteAddr() +func Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, error) { + addr := ctx.RemoteAddr() + + if eventBus.NumClients() >= config.MaxSubscriptionClients { + return nil, fmt.Errorf("max_subscription_clients %d reached", config.MaxSubscriptionClients) + } else if eventBus.NumClientSubscriptions(addr) >= config.MaxSubscriptionsPerClient { + return nil, fmt.Errorf("max_subscriptions_per_client %d reached", config.MaxSubscriptionsPerClient) + } + logger.Info("Subscribe to query", "remote", addr, "query", query) q, err := tmquery.New(query) @@ -99,9 +106,9 @@ func Subscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscri return nil, errors.Wrap(err, "failed to parse query") } - ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout) + subCtx, cancel := context.WithTimeout(context.Background(), subscribeTimeout) defer cancel() - sub, err := eventBusFor(wsCtx).Subscribe(ctx, addr, q) + sub, err := eventBus.Subscribe(subCtx, addr, q) if err != nil { return nil, err } @@ -111,18 +118,26 @@ func Subscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscri select { case msg := <-sub.Out(): resultEvent := &ctypes.ResultEvent{Query: query, Data: msg.Data(), Tags: msg.Tags()} - wsCtx.TryWriteRPCResponse( + ctx.WSConn.TryWriteRPCResponse( rpctypes.NewRPCSuccessResponse( - wsCtx.Codec(), - rpctypes.JSONRPCStringID(fmt.Sprintf("%v#event", wsCtx.Request.ID)), + ctx.WSConn.Codec(), + rpctypes.JSONRPCStringID(fmt.Sprintf("%v#event", ctx.JSONReq.ID)), resultEvent, )) case <-sub.Cancelled(): - wsCtx.TryWriteRPCResponse( - rpctypes.RPCServerError(rpctypes.JSONRPCStringID( - fmt.Sprintf("%v#event", wsCtx.Request.ID)), - fmt.Errorf("subscription was cancelled (reason: %v)", sub.Err()), - )) + if sub.Err() != tmpubsub.ErrUnsubscribed { + var reason string + if sub.Err() == nil { + reason = "Tendermint exited" + } else { + reason = sub.Err().Error() + } + ctx.WSConn.TryWriteRPCResponse( + rpctypes.RPCServerError(rpctypes.JSONRPCStringID( + fmt.Sprintf("%v#event", ctx.JSONReq.ID)), + fmt.Errorf("subscription was cancelled (reason: %s)", reason), + )) + } return } } @@ -161,14 +176,14 @@ func Subscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscri // | query | string | "" | true | Query | // // -func Unsubscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultUnsubscribe, error) { - addr := wsCtx.GetRemoteAddr() +func Unsubscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultUnsubscribe, error) { + addr := ctx.RemoteAddr() logger.Info("Unsubscribe from query", "remote", addr, "query", query) q, err := tmquery.New(query) if err != nil { return nil, errors.Wrap(err, "failed to parse query") } - err = eventBusFor(wsCtx).Unsubscribe(context.Background(), addr, q) + err = eventBus.Unsubscribe(context.Background(), addr, q) if err != nil { return nil, err } @@ -199,20 +214,12 @@ func Unsubscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultUnsub // ``` // // -func UnsubscribeAll(wsCtx rpctypes.WSRPCContext) (*ctypes.ResultUnsubscribe, error) { - addr := wsCtx.GetRemoteAddr() +func UnsubscribeAll(ctx *rpctypes.Context) (*ctypes.ResultUnsubscribe, error) { + addr := ctx.RemoteAddr() logger.Info("Unsubscribe from all", "remote", addr) - err := eventBusFor(wsCtx).UnsubscribeAll(context.Background(), addr) + err := eventBus.UnsubscribeAll(context.Background(), addr) if err != nil { return nil, err } return &ctypes.ResultUnsubscribe{}, nil } - -func eventBusFor(wsCtx rpctypes.WSRPCContext) tmtypes.EventBusSubscriber { - es := wsCtx.GetEventSubscriber() - if es == nil { - es = eventBus - } - return es -} diff --git a/rpc/core/health.go b/rpc/core/health.go index eeb8686b..41186a04 100644 --- a/rpc/core/health.go +++ b/rpc/core/health.go @@ -2,6 +2,7 @@ package core import ( ctypes "github.com/tendermint/tendermint/rpc/core/types" + rpctypes "github.com/tendermint/tendermint/rpc/lib/types" ) // Get node health. Returns empty result (200 OK) on success, no response - in @@ -31,6 +32,6 @@ import ( // "jsonrpc": "2.0" // } // ``` -func Health() (*ctypes.ResultHealth, error) { +func Health(ctx *rpctypes.Context) (*ctypes.ResultHealth, error) { return &ctypes.ResultHealth{}, nil } diff --git a/rpc/core/mempool.go b/rpc/core/mempool.go index 42aa56af..6ebdbcfc 100644 --- a/rpc/core/mempool.go +++ b/rpc/core/mempool.go @@ -9,7 +9,7 @@ import ( abci "github.com/tendermint/tendermint/abci/types" ctypes "github.com/tendermint/tendermint/rpc/core/types" - rpcserver "github.com/tendermint/tendermint/rpc/lib/server" + rpctypes "github.com/tendermint/tendermint/rpc/lib/types" "github.com/tendermint/tendermint/types" ) @@ -59,7 +59,7 @@ import ( // | Parameter | Type | Default | Required | Description | // |-----------+------+---------+----------+-----------------| // | tx | Tx | nil | true | The transaction | -func BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { +func BroadcastTxAsync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) { err := mempool.CheckTx(tx, nil) if err != nil { return nil, err @@ -108,7 +108,7 @@ func BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { // | Parameter | Type | Default | Required | Description | // |-----------+------+---------+----------+-----------------| // | tx | Tx | nil | true | The transaction | -func BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { +func BroadcastTxSync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) { resCh := make(chan *abci.Response, 1) err := mempool.CheckTx(tx, func(res *abci.Response) { resCh <- res @@ -128,6 +128,11 @@ func BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { // Returns with the responses from CheckTx and DeliverTx. // +// IMPORTANT: use only for testing and development. In production, use +// BroadcastTxSync or BroadcastTxAsync. You can subscribe for the transaction +// result using JSONRPC via a websocket. See +// https://tendermint.com/docs/app-dev/subscribing-to-events-via-websocket.html +// // CONTRACT: only returns error if mempool.CheckTx() errs or if we timeout // waiting for tx to commit. // @@ -182,18 +187,26 @@ func BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { // | Parameter | Type | Default | Required | Description | // |-----------+------+---------+----------+-----------------| // | tx | Tx | nil | true | The transaction | -func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { +func BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { + subscriber := ctx.RemoteAddr() + + if eventBus.NumClients() >= config.MaxSubscriptionClients { + return nil, fmt.Errorf("max_subscription_clients %d reached", config.MaxSubscriptionClients) + } else if eventBus.NumClientSubscriptions(subscriber) >= config.MaxSubscriptionsPerClient { + return nil, fmt.Errorf("max_subscriptions_per_client %d reached", config.MaxSubscriptionsPerClient) + } + // Subscribe to tx being committed in block. - ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout) + subCtx, cancel := context.WithTimeout(context.Background(), subscribeTimeout) defer cancel() q := types.EventQueryTxFor(tx) - deliverTxSub, err := eventBus.Subscribe(ctx, "mempool", q) + deliverTxSub, err := eventBus.Subscribe(subCtx, subscriber, q) if err != nil { err = errors.Wrap(err, "failed to subscribe to tx") logger.Error("Error on broadcast_tx_commit", "err", err) return nil, err } - defer eventBus.Unsubscribe(context.Background(), "mempool", q) + defer eventBus.Unsubscribe(context.Background(), subscriber, q) // Broadcast tx and wait for CheckTx result checkTxResCh := make(chan *abci.Response, 1) @@ -215,8 +228,6 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { } // Wait for the tx to be included in a block or timeout. - // TODO: configurable? - var deliverTxTimeout = rpcserver.WriteTimeout / 2 select { case msg := <-deliverTxSub.Out(): // The tx was included in a block. deliverTxRes := msg.Data().(types.EventDataTx) @@ -227,14 +238,20 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { Height: deliverTxRes.Height, }, nil case <-deliverTxSub.Cancelled(): - err = errors.New("deliverTxSub was cancelled. Did the Tendermint stop?") + var reason string + if deliverTxSub.Err() == nil { + reason = "Tendermint exited" + } else { + reason = deliverTxSub.Err().Error() + } + err = fmt.Errorf("deliverTxSub was cancelled (reason: %s)", reason) logger.Error("Error on broadcastTxCommit", "err", err) return &ctypes.ResultBroadcastTxCommit{ CheckTx: *checkTxRes, DeliverTx: abci.ResponseDeliverTx{}, Hash: tx.Hash(), }, err - case <-time.After(deliverTxTimeout): + case <-time.After(config.TimeoutBroadcastTxCommit): err = errors.New("Timed out waiting for tx to be included in a block") logger.Error("Error on broadcastTxCommit", "err", err) return &ctypes.ResultBroadcastTxCommit{ @@ -281,7 +298,8 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { // | Parameter | Type | Default | Required | Description | // |-----------+------+---------+----------+--------------------------------------| // | limit | int | 30 | false | Maximum number of entries (max: 100) | -func UnconfirmedTxs(limit int) (*ctypes.ResultUnconfirmedTxs, error) { +// ``` +func UnconfirmedTxs(ctx *rpctypes.Context, limit int) (*ctypes.ResultUnconfirmedTxs, error) { // reuse per_page validator limit = validatePerPage(limit) @@ -323,7 +341,7 @@ func UnconfirmedTxs(limit int) (*ctypes.ResultUnconfirmedTxs, error) { // } // } // ``` -func NumUnconfirmedTxs() (*ctypes.ResultUnconfirmedTxs, error) { +func NumUnconfirmedTxs(ctx *rpctypes.Context) (*ctypes.ResultUnconfirmedTxs, error) { return &ctypes.ResultUnconfirmedTxs{ Count: mempool.Size(), Total: mempool.Size(), diff --git a/rpc/core/net.go b/rpc/core/net.go index e920ea7c..23bc40e8 100644 --- a/rpc/core/net.go +++ b/rpc/core/net.go @@ -7,6 +7,7 @@ import ( "github.com/tendermint/tendermint/p2p" ctypes "github.com/tendermint/tendermint/rpc/core/types" + rpctypes "github.com/tendermint/tendermint/rpc/lib/types" ) // Get network info. @@ -153,7 +154,7 @@ import ( // ... // } // ``` -func NetInfo() (*ctypes.ResultNetInfo, error) { +func NetInfo(ctx *rpctypes.Context) (*ctypes.ResultNetInfo, error) { out, in, _ := p2pPeers.NumPeers() peers := make([]ctypes.Peer, 0, out+in) for _, peer := range p2pPeers.Peers().List() { @@ -179,7 +180,7 @@ func NetInfo() (*ctypes.ResultNetInfo, error) { }, nil } -func UnsafeDialSeeds(seeds []string) (*ctypes.ResultDialSeeds, error) { +func UnsafeDialSeeds(ctx *rpctypes.Context, seeds []string) (*ctypes.ResultDialSeeds, error) { if len(seeds) == 0 { return &ctypes.ResultDialSeeds{}, errors.New("No seeds provided") } @@ -192,7 +193,7 @@ func UnsafeDialSeeds(seeds []string) (*ctypes.ResultDialSeeds, error) { return &ctypes.ResultDialSeeds{Log: "Dialing seeds in progress. See /net_info for details"}, nil } -func UnsafeDialPeers(peers []string, persistent bool) (*ctypes.ResultDialPeers, error) { +func UnsafeDialPeers(ctx *rpctypes.Context, peers []string, persistent bool) (*ctypes.ResultDialPeers, error) { if len(peers) == 0 { return &ctypes.ResultDialPeers{}, errors.New("No peers provided") } @@ -247,6 +248,6 @@ func UnsafeDialPeers(peers []string, persistent bool) (*ctypes.ResultDialPeers, // "jsonrpc": "2.0" // } // ``` -func Genesis() (*ctypes.ResultGenesis, error) { +func Genesis(ctx *rpctypes.Context) (*ctypes.ResultGenesis, error) { return &ctypes.ResultGenesis{Genesis: genDoc}, nil } diff --git a/rpc/core/pipe.go b/rpc/core/pipe.go index 23649544..0b760344 100644 --- a/rpc/core/pipe.go +++ b/rpc/core/pipe.go @@ -1,6 +1,7 @@ package core import ( + cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/consensus" "github.com/tendermint/tendermint/crypto" dbm "github.com/tendermint/tendermint/libs/db" @@ -71,6 +72,8 @@ var ( mempool *mempl.Mempool logger log.Logger + + config cfg.RPCConfig ) func SetStateDB(db dbm.DB) { @@ -133,6 +136,11 @@ func SetEventBus(b *types.EventBus) { eventBus = b } +// SetConfig sets an RPCConfig. +func SetConfig(c cfg.RPCConfig) { + config = c +} + func validatePage(page, perPage, totalCount int) int { if perPage < 1 { return 1 diff --git a/rpc/core/status.go b/rpc/core/status.go index ae22ecd3..aab86466 100644 --- a/rpc/core/status.go +++ b/rpc/core/status.go @@ -7,6 +7,7 @@ import ( cmn "github.com/tendermint/tendermint/libs/common" "github.com/tendermint/tendermint/p2p" ctypes "github.com/tendermint/tendermint/rpc/core/types" + rpctypes "github.com/tendermint/tendermint/rpc/lib/types" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" ) @@ -70,7 +71,7 @@ import ( // } // } // ``` -func Status() (*ctypes.ResultStatus, error) { +func Status(ctx *rpctypes.Context) (*ctypes.ResultStatus, error) { var latestHeight int64 if consensusReactor.FastSync() { latestHeight = blockStore.Height() diff --git a/rpc/core/tx.go b/rpc/core/tx.go index aa439218..575553f8 100644 --- a/rpc/core/tx.go +++ b/rpc/core/tx.go @@ -7,6 +7,7 @@ import ( tmquery "github.com/tendermint/tendermint/libs/pubsub/query" ctypes "github.com/tendermint/tendermint/rpc/core/types" + rpctypes "github.com/tendermint/tendermint/rpc/lib/types" "github.com/tendermint/tendermint/state/txindex/null" "github.com/tendermint/tendermint/types" ) @@ -77,7 +78,7 @@ import ( // - `index`: `int` - index of the transaction // - `height`: `int` - height of the block where this transaction was in // - `hash`: `[]byte` - hash of the transaction -func Tx(hash []byte, prove bool) (*ctypes.ResultTx, error) { +func Tx(ctx *rpctypes.Context, hash []byte, prove bool) (*ctypes.ResultTx, error) { // if index is disabled, return error if _, ok := txIndexer.(*null.TxIndex); ok { @@ -183,7 +184,7 @@ func Tx(hash []byte, prove bool) (*ctypes.ResultTx, error) { // - `index`: `int` - index of the transaction // - `height`: `int` - height of the block where this transaction was in // - `hash`: `[]byte` - hash of the transaction -func TxSearch(query string, prove bool, page, perPage int) (*ctypes.ResultTxSearch, error) { +func TxSearch(ctx *rpctypes.Context, query string, prove bool, page, perPage int) (*ctypes.ResultTxSearch, error) { // if index is disabled, return error if _, ok := txIndexer.(*null.TxIndex); ok { return nil, fmt.Errorf("Transaction indexing is disabled") diff --git a/rpc/grpc/api.go b/rpc/grpc/api.go index 0b840e3e..741d63af 100644 --- a/rpc/grpc/api.go +++ b/rpc/grpc/api.go @@ -5,6 +5,7 @@ import ( abci "github.com/tendermint/tendermint/abci/types" core "github.com/tendermint/tendermint/rpc/core" + rpctypes "github.com/tendermint/tendermint/rpc/lib/types" ) type broadcastAPI struct { @@ -16,12 +17,14 @@ func (bapi *broadcastAPI) Ping(ctx context.Context, req *RequestPing) (*Response } func (bapi *broadcastAPI) BroadcastTx(ctx context.Context, req *RequestBroadcastTx) (*ResponseBroadcastTx, error) { - res, err := core.BroadcastTxCommit(req.Tx) + // NOTE: there's no way to get client's remote address + // see https://stackoverflow.com/questions/33684570/session-and-remote-ip-address-in-grpc-go + res, err := core.BroadcastTxCommit(&rpctypes.Context{}, req.Tx) if err != nil { return nil, err } - return &ResponseBroadcastTx{ + return &ResponseBroadcastTx{ CheckTx: &abci.ResponseCheckTx{ Code: res.CheckTx.Code, Data: res.CheckTx.Data, diff --git a/rpc/grpc/client_server.go b/rpc/grpc/client_server.go index 2bc89864..922016dd 100644 --- a/rpc/grpc/client_server.go +++ b/rpc/grpc/client_server.go @@ -14,7 +14,8 @@ type Config struct { MaxOpenConnections int } -// StartGRPCServer starts a new gRPC BroadcastAPIServer using the given net.Listener. +// StartGRPCServer starts a new gRPC BroadcastAPIServer using the given +// net.Listener. // NOTE: This function blocks - you may want to call it in a go-routine. func StartGRPCServer(ln net.Listener) error { grpcServer := grpc.NewServer() diff --git a/rpc/grpc/grpc_test.go b/rpc/grpc/grpc_test.go index b82e5222..1e1f2540 100644 --- a/rpc/grpc/grpc_test.go +++ b/rpc/grpc/grpc_test.go @@ -25,9 +25,8 @@ func TestMain(m *testing.M) { } func TestBroadcastTx(t *testing.T) { - require := require.New(t) res, err := rpctest.GetGRPCClient().BroadcastTx(context.Background(), &core_grpc.RequestBroadcastTx{Tx: []byte("this is a tx")}) - require.Nil(err, "%+v", err) - require.EqualValues(0, res.CheckTx.Code) - require.EqualValues(0, res.DeliverTx.Code) + require.NoError(t, err) + require.EqualValues(t, 0, res.CheckTx.Code) + require.EqualValues(t, 0, res.DeliverTx.Code) } diff --git a/rpc/lib/rpc_test.go b/rpc/lib/rpc_test.go index 794ab462..68c134a7 100644 --- a/rpc/lib/rpc_test.go +++ b/rpc/lib/rpc_test.go @@ -63,23 +63,23 @@ var Routes = map[string]*server.RPCFunc{ // Amino codec required to encode/decode everything above. var RoutesCdc = amino.NewCodec() -func EchoResult(v string) (*ResultEcho, error) { +func EchoResult(ctx *types.Context, v string) (*ResultEcho, error) { return &ResultEcho{v}, nil } -func EchoWSResult(wsCtx types.WSRPCContext, v string) (*ResultEcho, error) { +func EchoWSResult(ctx *types.Context, v string) (*ResultEcho, error) { return &ResultEcho{v}, nil } -func EchoIntResult(v int) (*ResultEchoInt, error) { +func EchoIntResult(ctx *types.Context, v int) (*ResultEchoInt, error) { return &ResultEchoInt{v}, nil } -func EchoBytesResult(v []byte) (*ResultEchoBytes, error) { +func EchoBytesResult(ctx *types.Context, v []byte) (*ResultEchoBytes, error) { return &ResultEchoBytes{v}, nil } -func EchoDataBytesResult(v cmn.HexBytes) (*ResultEchoDataBytes, error) { +func EchoDataBytesResult(ctx *types.Context, v cmn.HexBytes) (*ResultEchoDataBytes, error) { return &ResultEchoDataBytes{v}, nil } diff --git a/rpc/lib/server/handlers.go b/rpc/lib/server/handlers.go index 80eb4308..36ea47da 100644 --- a/rpc/lib/server/handlers.go +++ b/rpc/lib/server/handlers.go @@ -2,7 +2,6 @@ package rpcserver import ( "bytes" - "context" "encoding/hex" "encoding/json" "fmt" @@ -129,20 +128,26 @@ func makeJSONRPCHandler(funcMap map[string]*RPCFunc, cdc *amino.Codec, logger lo WriteRPCResponseHTTP(w, types.RPCInvalidRequestError(request.ID, errors.Errorf("Path %s is invalid", r.URL.Path))) return } + rpcFunc := funcMap[request.Method] if rpcFunc == nil || rpcFunc.ws { WriteRPCResponseHTTP(w, types.RPCMethodNotFoundError(request.ID)) return } - var args []reflect.Value + + ctx := &types.Context{JSONReq: &request, HTTPReq: r} + args := []reflect.Value{reflect.ValueOf(ctx)} if len(request.Params) > 0 { - args, err = jsonParamsToArgsRPC(rpcFunc, cdc, request.Params) + fnArgs, err := jsonParamsToArgs(rpcFunc, cdc, request.Params) if err != nil { WriteRPCResponseHTTP(w, types.RPCInvalidParamsError(request.ID, errors.Wrap(err, "Error converting json params to arguments"))) return } + args = append(args, fnArgs...) } + returns := rpcFunc.f.Call(args) + logger.Info("HTTPJSONRPC", "method", request.Method, "args", args, "returns", returns) result, err := unreflectResult(returns) if err != nil { @@ -205,13 +210,14 @@ func arrayParamsToArgs(rpcFunc *RPCFunc, cdc *amino.Codec, params []json.RawMess return values, nil } -// `raw` is unparsed json (from json.RawMessage) encoding either a map or an array. -// `argsOffset` should be 0 for RPC calls, and 1 for WS requests, where len(rpcFunc.args) != len(rpcFunc.argNames). +// raw is unparsed json (from json.RawMessage) encoding either a map or an +// array. // // Example: -// rpcFunc.args = [rpctypes.WSRPCContext string] +// rpcFunc.args = [rpctypes.Context string] // rpcFunc.argNames = ["arg"] -func jsonParamsToArgs(rpcFunc *RPCFunc, cdc *amino.Codec, raw []byte, argsOffset int) ([]reflect.Value, error) { +func jsonParamsToArgs(rpcFunc *RPCFunc, cdc *amino.Codec, raw []byte) ([]reflect.Value, error) { + const argsOffset = 1 // TODO: Make more efficient, perhaps by checking the first character for '{' or '['? // First, try to get the map. @@ -232,20 +238,6 @@ func jsonParamsToArgs(rpcFunc *RPCFunc, cdc *amino.Codec, raw []byte, argsOffset return nil, errors.Errorf("Unknown type for JSON params: %v. Expected map or array", err) } -// Convert a []interface{} OR a map[string]interface{} to properly typed values -func jsonParamsToArgsRPC(rpcFunc *RPCFunc, cdc *amino.Codec, params json.RawMessage) ([]reflect.Value, error) { - return jsonParamsToArgs(rpcFunc, cdc, params, 0) -} - -// Same as above, but with the first param the websocket connection -func jsonParamsToArgsWS(rpcFunc *RPCFunc, cdc *amino.Codec, params json.RawMessage, wsCtx types.WSRPCContext) ([]reflect.Value, error) { - values, err := jsonParamsToArgs(rpcFunc, cdc, params, 1) - if err != nil { - return nil, err - } - return append([]reflect.Value{reflect.ValueOf(wsCtx)}, values...), nil -} - // rpc.json //----------------------------------------------------------------------------- // rpc.http @@ -258,15 +250,23 @@ func makeHTTPHandler(rpcFunc *RPCFunc, cdc *amino.Codec, logger log.Logger) func WriteRPCResponseHTTP(w, types.RPCMethodNotFoundError(types.JSONRPCStringID(""))) } } + // All other endpoints return func(w http.ResponseWriter, r *http.Request) { logger.Debug("HTTP HANDLER", "req", r) - args, err := httpParamsToArgs(rpcFunc, cdc, r) + + ctx := &types.Context{HTTPReq: r} + args := []reflect.Value{reflect.ValueOf(ctx)} + + fnArgs, err := httpParamsToArgs(rpcFunc, cdc, r) if err != nil { WriteRPCResponseHTTP(w, types.RPCInvalidParamsError(types.JSONRPCStringID(""), errors.Wrap(err, "Error converting http params to arguments"))) return } + args = append(args, fnArgs...) + returns := rpcFunc.f.Call(args) + logger.Info("HTTPRestRPC", "method", r.URL.Path, "args", args, "returns", returns) result, err := unreflectResult(returns) if err != nil { @@ -280,10 +280,13 @@ func makeHTTPHandler(rpcFunc *RPCFunc, cdc *amino.Codec, logger log.Logger) func // Covert an http query to a list of properly typed values. // To be properly decoded the arg must be a concrete type from tendermint (if its an interface). func httpParamsToArgs(rpcFunc *RPCFunc, cdc *amino.Codec, r *http.Request) ([]reflect.Value, error) { - values := make([]reflect.Value, len(rpcFunc.args)) + // skip types.Context + const argsOffset = 1 + + values := make([]reflect.Value, len(rpcFunc.argNames)) for i, name := range rpcFunc.argNames { - argType := rpcFunc.args[i] + argType := rpcFunc.args[i+argsOffset] values[i] = reflect.Zero(argType) // set default for that type @@ -434,8 +437,8 @@ type wsConnection struct { // Send pings to server with this period. Must be less than readWait, but greater than zero. pingPeriod time.Duration - // object that is used to subscribe / unsubscribe from events - eventSub types.EventSubscriber + // callback which is called upon disconnect + onDisconnect func(remoteAddr string) } // NewWSConnection wraps websocket.Conn. @@ -468,12 +471,11 @@ func NewWSConnection( return wsc } -// EventSubscriber sets object that is used to subscribe / unsubscribe from -// events - not Goroutine-safe. If none given, default node's eventBus will be -// used. -func EventSubscriber(eventSub types.EventSubscriber) func(*wsConnection) { +// OnDisconnect sets a callback which is used upon disconnect - not +// Goroutine-safe. Nop by default. +func OnDisconnect(onDisconnect func(remoteAddr string)) func(*wsConnection) { return func(wsc *wsConnection) { - wsc.eventSub = eventSub + wsc.onDisconnect = onDisconnect } } @@ -527,8 +529,8 @@ func (wsc *wsConnection) OnStop() { // Both read and write loops close the websocket connection when they exit their loops. // The writeChan is never closed, to allow WriteRPCResponse() to fail. - if wsc.eventSub != nil { - wsc.eventSub.UnsubscribeAll(context.TODO(), wsc.remoteAddr) + if wsc.onDisconnect != nil { + wsc.onDisconnect(wsc.remoteAddr) } } @@ -538,11 +540,6 @@ func (wsc *wsConnection) GetRemoteAddr() string { return wsc.remoteAddr } -// GetEventSubscriber implements WSRPCConnection by returning event subscriber. -func (wsc *wsConnection) GetEventSubscriber() types.EventSubscriber { - return wsc.eventSub -} - // WriteRPCResponse pushes a response to the writeChan, and blocks until it is accepted. // It implements WSRPCConnection. It is Goroutine-safe. func (wsc *wsConnection) WriteRPCResponse(resp types.RPCResponse) { @@ -628,27 +625,23 @@ func (wsc *wsConnection) readRoutine() { } // Now, fetch the RPCFunc and execute it. - rpcFunc := wsc.funcMap[request.Method] if rpcFunc == nil { wsc.WriteRPCResponse(types.RPCMethodNotFoundError(request.ID)) continue } - var args []reflect.Value - if rpcFunc.ws { - wsCtx := types.WSRPCContext{Request: request, WSRPCConnection: wsc} - if len(request.Params) > 0 { - args, err = jsonParamsToArgsWS(rpcFunc, wsc.cdc, request.Params, wsCtx) - } - } else { - if len(request.Params) > 0 { - args, err = jsonParamsToArgsRPC(rpcFunc, wsc.cdc, request.Params) + + ctx := &types.Context{JSONReq: &request, WSConn: wsc} + args := []reflect.Value{reflect.ValueOf(ctx)} + if len(request.Params) > 0 { + fnArgs, err := jsonParamsToArgs(rpcFunc, wsc.cdc, request.Params) + if err != nil { + wsc.WriteRPCResponse(types.RPCInternalError(request.ID, errors.Wrap(err, "Error converting json params to arguments"))) + continue } + args = append(args, fnArgs...) } - if err != nil { - wsc.WriteRPCResponse(types.RPCInternalError(request.ID, errors.Wrap(err, "Error converting json params to arguments"))) - continue - } + returns := rpcFunc.f.Call(args) // TODO: Need to encode args/returns to string if we want to log them diff --git a/rpc/lib/server/handlers_test.go b/rpc/lib/server/handlers_test.go index b1d3c788..f8ad0610 100644 --- a/rpc/lib/server/handlers_test.go +++ b/rpc/lib/server/handlers_test.go @@ -28,7 +28,7 @@ import ( func testMux() *http.ServeMux { funcMap := map[string]*rs.RPCFunc{ - "c": rs.NewRPCFunc(func(s string, i int) (string, error) { return "foo", nil }, "s,i"), + "c": rs.NewRPCFunc(func(ctx *types.Context, s string, i int) (string, error) { return "foo", nil }, "s,i"), } cdc := amino.NewCodec() mux := http.NewServeMux() @@ -195,7 +195,7 @@ func TestWebsocketManagerHandler(t *testing.T) { func newWSServer() *httptest.Server { funcMap := map[string]*rs.RPCFunc{ - "c": rs.NewWSRPCFunc(func(wsCtx types.WSRPCContext, s string, i int) (string, error) { return "foo", nil }, "s,i"), + "c": rs.NewWSRPCFunc(func(ctx *types.Context, s string, i int) (string, error) { return "foo", nil }, "s,i"), } wm := rs.NewWebsocketManager(funcMap, amino.NewCodec()) wm.SetLogger(log.TestingLogger()) diff --git a/rpc/lib/server/parse_test.go b/rpc/lib/server/parse_test.go index 7b0aacdb..9196bb71 100644 --- a/rpc/lib/server/parse_test.go +++ b/rpc/lib/server/parse_test.go @@ -10,24 +10,23 @@ import ( "github.com/stretchr/testify/assert" amino "github.com/tendermint/go-amino" cmn "github.com/tendermint/tendermint/libs/common" + types "github.com/tendermint/tendermint/rpc/lib/types" ) func TestParseJSONMap(t *testing.T) { - assert := assert.New(t) - input := []byte(`{"value":"1234","height":22}`) // naive is float,string var p1 map[string]interface{} err := json.Unmarshal(input, &p1) - if assert.Nil(err) { + if assert.Nil(t, err) { h, ok := p1["height"].(float64) - if assert.True(ok, "%#v", p1["height"]) { - assert.EqualValues(22, h) + if assert.True(t, ok, "%#v", p1["height"]) { + assert.EqualValues(t, 22, h) } v, ok := p1["value"].(string) - if assert.True(ok, "%#v", p1["value"]) { - assert.EqualValues("1234", v) + if assert.True(t, ok, "%#v", p1["value"]) { + assert.EqualValues(t, "1234", v) } } @@ -38,14 +37,14 @@ func TestParseJSONMap(t *testing.T) { "height": &tmp, } err = json.Unmarshal(input, &p2) - if assert.Nil(err) { + if assert.Nil(t, err) { h, ok := p2["height"].(float64) - if assert.True(ok, "%#v", p2["height"]) { - assert.EqualValues(22, h) + if assert.True(t, ok, "%#v", p2["height"]) { + assert.EqualValues(t, 22, h) } v, ok := p2["value"].(string) - if assert.True(ok, "%#v", p2["value"]) { - assert.EqualValues("1234", v) + if assert.True(t, ok, "%#v", p2["value"]) { + assert.EqualValues(t, "1234", v) } } @@ -60,14 +59,14 @@ func TestParseJSONMap(t *testing.T) { Value: &cmn.HexBytes{}, } err = json.Unmarshal(input, &p3) - if assert.Nil(err) { + if assert.Nil(t, err) { h, ok := p3.Height.(*int) - if assert.True(ok, "%#v", p3.Height) { - assert.Equal(22, *h) + if assert.True(t, ok, "%#v", p3.Height) { + assert.Equal(t, 22, *h) } v, ok := p3.Value.(*cmn.HexBytes) - if assert.True(ok, "%#v", p3.Value) { - assert.EqualValues([]byte{0x12, 0x34}, *v) + if assert.True(t, ok, "%#v", p3.Value) { + assert.EqualValues(t, []byte{0x12, 0x34}, *v) } } @@ -77,46 +76,44 @@ func TestParseJSONMap(t *testing.T) { Height int `json:"height"` }{} err = json.Unmarshal(input, &p4) - if assert.Nil(err) { - assert.EqualValues(22, p4.Height) - assert.EqualValues([]byte{0x12, 0x34}, p4.Value) + if assert.Nil(t, err) { + assert.EqualValues(t, 22, p4.Height) + assert.EqualValues(t, []byte{0x12, 0x34}, p4.Value) } // so, let's use this trick... // dynamic keys on map, and we can deserialize to the desired types var p5 map[string]*json.RawMessage err = json.Unmarshal(input, &p5) - if assert.Nil(err) { + if assert.Nil(t, err) { var h int err = json.Unmarshal(*p5["height"], &h) - if assert.Nil(err) { - assert.Equal(22, h) + if assert.Nil(t, err) { + assert.Equal(t, 22, h) } var v cmn.HexBytes err = json.Unmarshal(*p5["value"], &v) - if assert.Nil(err) { - assert.Equal(cmn.HexBytes{0x12, 0x34}, v) + if assert.Nil(t, err) { + assert.Equal(t, cmn.HexBytes{0x12, 0x34}, v) } } } func TestParseJSONArray(t *testing.T) { - assert := assert.New(t) - input := []byte(`["1234",22]`) // naive is float,string var p1 []interface{} err := json.Unmarshal(input, &p1) - if assert.Nil(err) { + if assert.Nil(t, err) { v, ok := p1[0].(string) - if assert.True(ok, "%#v", p1[0]) { - assert.EqualValues("1234", v) + if assert.True(t, ok, "%#v", p1[0]) { + assert.EqualValues(t, "1234", v) } h, ok := p1[1].(float64) - if assert.True(ok, "%#v", p1[1]) { - assert.EqualValues(22, h) + if assert.True(t, ok, "%#v", p1[1]) { + assert.EqualValues(t, 22, h) } } @@ -124,22 +121,20 @@ func TestParseJSONArray(t *testing.T) { tmp := 0 p2 := []interface{}{&cmn.HexBytes{}, &tmp} err = json.Unmarshal(input, &p2) - if assert.Nil(err) { + if assert.Nil(t, err) { v, ok := p2[0].(*cmn.HexBytes) - if assert.True(ok, "%#v", p2[0]) { - assert.EqualValues([]byte{0x12, 0x34}, *v) + if assert.True(t, ok, "%#v", p2[0]) { + assert.EqualValues(t, []byte{0x12, 0x34}, *v) } h, ok := p2[1].(*int) - if assert.True(ok, "%#v", p2[1]) { - assert.EqualValues(22, *h) + if assert.True(t, ok, "%#v", p2[1]) { + assert.EqualValues(t, 22, *h) } } } func TestParseJSONRPC(t *testing.T) { - assert := assert.New(t) - - demo := func(height int, name string) {} + demo := func(ctx *types.Context, height int, name string) {} call := NewRPCFunc(demo, "height,name") cdc := amino.NewCodec() @@ -162,14 +157,14 @@ func TestParseJSONRPC(t *testing.T) { for idx, tc := range cases { i := strconv.Itoa(idx) data := []byte(tc.raw) - vals, err := jsonParamsToArgs(call, cdc, data, 0) + vals, err := jsonParamsToArgs(call, cdc, data) if tc.fail { - assert.NotNil(err, i) + assert.NotNil(t, err, i) } else { - assert.Nil(err, "%s: %+v", i, err) - if assert.Equal(2, len(vals), i) { - assert.Equal(tc.height, vals[0].Int(), i) - assert.Equal(tc.name, vals[1].String(), i) + assert.Nil(t, err, "%s: %+v", i, err) + if assert.Equal(t, 2, len(vals), i) { + assert.Equal(t, tc.height, vals[0].Int(), i) + assert.Equal(t, tc.name, vals[1].String(), i) } } @@ -177,8 +172,7 @@ func TestParseJSONRPC(t *testing.T) { } func TestParseURI(t *testing.T) { - - demo := func(height int, name string) {} + demo := func(ctx *types.Context, height int, name string) {} call := NewRPCFunc(demo, "height,name") cdc := amino.NewCodec() diff --git a/rpc/lib/test/main.go b/rpc/lib/test/main.go index b2f94580..3afc1ac1 100644 --- a/rpc/lib/test/main.go +++ b/rpc/lib/test/main.go @@ -6,16 +6,18 @@ import ( "os" amino "github.com/tendermint/go-amino" + cmn "github.com/tendermint/tendermint/libs/common" "github.com/tendermint/tendermint/libs/log" rpcserver "github.com/tendermint/tendermint/rpc/lib/server" + rpctypes "github.com/tendermint/tendermint/rpc/lib/types" ) var routes = map[string]*rpcserver.RPCFunc{ "hello_world": rpcserver.NewRPCFunc(HelloWorld, "name,num"), } -func HelloWorld(name string, num int) (Result, error) { +func HelloWorld(ctx *rpctypes.Context, name string, num int) (Result, error) { return Result{fmt.Sprintf("hi %s %d", name, num)}, nil } diff --git a/rpc/lib/types/types.go b/rpc/lib/types/types.go index d4e82b10..21623e41 100644 --- a/rpc/lib/types/types.go +++ b/rpc/lib/types/types.go @@ -1,18 +1,15 @@ package rpctypes import ( - "context" "encoding/json" "fmt" + "net/http" "reflect" "strings" "github.com/pkg/errors" amino "github.com/tendermint/go-amino" - - tmpubsub "github.com/tendermint/tendermint/libs/pubsub" - tmtypes "github.com/tendermint/tendermint/types" ) // a wrapper to emulate a sum type: jsonrpcid = string | int @@ -236,30 +233,47 @@ func RPCServerError(id jsonrpcid, err error) RPCResponse { //---------------------------------------- -// *wsConnection implements this interface. +// WSRPCConnection represents a websocket connection. type WSRPCConnection interface { + // GetRemoteAddr returns a remote address of the connection. GetRemoteAddr() string + // WriteRPCResponse writes the resp onto connection (BLOCKING). WriteRPCResponse(resp RPCResponse) + // TryWriteRPCResponse tries to write the resp onto connection (NON-BLOCKING). TryWriteRPCResponse(resp RPCResponse) bool - GetEventSubscriber() EventSubscriber + // Codec returns an Amino codec used. Codec() *amino.Codec } -// websocket-only RPCFuncs take this as the first parameter. -type WSRPCContext struct { - Request RPCRequest - WSRPCConnection +// Context is the first parameter for all functions. It carries a json-rpc +// request, http request and websocket connection. +// +// - JSONReq is non-nil when JSONRPC is called over websocket or HTTP. +// - WSConn is non-nil when we're connected via a websocket. +// - HTTPReq is non-nil when URI or JSONRPC is called over HTTP. +type Context struct { + // json-rpc request + JSONReq *RPCRequest + // websocket connection + WSConn WSRPCConnection + // http request + HTTPReq *http.Request } -// EventSubscriber mirrors tendermint/tendermint/types.EventBusSubscriber -type EventSubscriber interface { - Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, outCapacity ...int) (tmtypes.Subscription, error) - Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error - UnsubscribeAll(ctx context.Context, subscriber string) error +// RemoteAddr returns either HTTPReq#RemoteAddr or result of the +// WSConn#GetRemoteAddr(). +func (ctx *Context) RemoteAddr() string { + if ctx.HTTPReq != nil { + return ctx.HTTPReq.RemoteAddr + } else if ctx.WSConn != nil { + return ctx.WSConn.GetRemoteAddr() + } + return "" } //---------------------------------------- // SOCKETS + // // Determine if its a unix or tcp socket. // If tcp, must specify the port; `0.0.0.0` will return incorrectly as "unix" since there's no port diff --git a/types/event_bus.go b/types/event_bus.go index 2aa84a4a..da959090 100644 --- a/types/event_bus.go +++ b/types/event_bus.go @@ -15,6 +15,9 @@ type EventBusSubscriber interface { Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, outCapacity ...int) (Subscription, error) Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error UnsubscribeAll(ctx context.Context, subscriber string) error + + NumClients() int + NumClientSubscriptions(clientID string) int } type Subscription interface { @@ -58,6 +61,14 @@ func (b *EventBus) OnStop() { b.pubsub.Stop() } +func (b *EventBus) NumClients() int { + return b.pubsub.NumClients() +} + +func (b *EventBus) NumClientSubscriptions(clientID string) int { + return b.pubsub.NumClientSubscriptions(clientID) +} + func (b *EventBus) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, outCapacity ...int) (Subscription, error) { return b.pubsub.Subscribe(ctx, subscriber, query, outCapacity...) }