diff --git a/CHANGELOG.md b/CHANGELOG.md index 8968a7a7..f0ba675a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,8 +11,8 @@ This release is primarily about the new pubsub implementation, dubbed `pubsub 2. like configurable limits on the number of active RPC subscriptions at a time (`max_subscription_clients`). Pubsub 2.0 is an improved version of the older pubsub that is non-blocking and has a nicer API. Note the improved pubsub API also resulted in some improvements to the HTTPClient interface and the API for WebSocket subscriptions. -This release also adds a configurable limit to the mempool size, `max_txs_bytes`, with -default 1GB, and includes many smaller improvements and bug-fixes. +This release also adds a configurable limit to the mempool size (`max_txs_bytes`, default 1GB) +and a configurable timeout for the `/broadcast_tx_commit` endpoint. See the [v0.31.0 Milestone](https://github.com/tendermint/tendermint/milestone/19?closed=1) for @@ -30,6 +30,7 @@ program](https://hackerone.com/tendermint). the subscription. - [rpc] [\#3269](https://github.com/tendermint/tendermint/issues/2826) Limit number of unique clientIDs with open subscriptions. Configurable via `rpc.max_subscription_clients` - [rpc] [\#3269](https://github.com/tendermint/tendermint/issues/2826) Limit number of unique queries a given client can subscribe to at once. Configurable via `rpc.max_subscriptions_per_client`. + - [rpc] [\#3435](https://github.com/tendermint/tendermint/issues/3435) Default ReadTimeout and WriteTimeout changed to 10s. WriteTimeout can increased by setting `rpc.timeout_broadcast_tx_commit` in the config. - [rpc/client] [\#3269](https://github.com/tendermint/tendermint/issues/3269) Update `EventsClient` interface to reflect new pubsub/eventBus API [ADR-33](https://github.com/tendermint/tendermint/blob/develop/docs/architecture/adr-033-pubsub.md). This includes `Subscribe`, `Unsubscribe`, and `UnsubscribeAll` methods. * Apps @@ -43,6 +44,7 @@ program](https://hackerone.com/tendermint). * TrapSignal should not be responsible for blocking thread of execution - [libs/db] [\#3397](https://github.com/tendermint/tendermint/pull/3397) Add possibility to `Close()` `Batch` to prevent memory leak when using ClevelDB. (@Stumble) - [types] [\#3354](https://github.com/tendermint/tendermint/issues/3354) Remove RoundState from EventDataRoundState + - [rpc] [\#3435](https://github.com/tendermint/tendermint/issues/3435) `StartHTTPServer` / `StartHTTPAndTLSServer` now require a Config (use `rpcserver.DefaultConfig`) * Blockchain Protocol diff --git a/config/config.go b/config/config.go index 540012a5..8342921a 100644 --- a/config/config.go +++ b/config/config.go @@ -7,7 +7,6 @@ import ( "time" "github.com/pkg/errors" - rpcserver "github.com/tendermint/tendermint/rpc/lib/server" ) const ( @@ -336,6 +335,9 @@ type RPCConfig struct { MaxSubscriptionsPerClient int `mapstructure:"max_subscriptions_per_client"` // How long to wait for a tx to be committed during /broadcast_tx_commit + // WARNING: Using a value larger than 10s will result in increasing the + // global HTTP write timeout, which applies to all connections and endpoints. + // See https://github.com/tendermint/tendermint/issues/3435 TimeoutBroadcastTxCommit time.Duration `mapstructure:"timeout_broadcast_tx_commit"` } @@ -385,9 +387,6 @@ func (cfg *RPCConfig) ValidateBasic() error { if cfg.TimeoutBroadcastTxCommit < 0 { return errors.New("timeout_broadcast_tx_commit can't be negative") } - if cfg.TimeoutBroadcastTxCommit > rpcserver.WriteTimeout { - return fmt.Errorf("timeout_broadcast_tx_commit can't be greater than rpc server's write timeout: %v", rpcserver.WriteTimeout) - } return nil } diff --git a/config/toml.go b/config/toml.go index 9ce7e76c..a0b651d9 100644 --- a/config/toml.go +++ b/config/toml.go @@ -176,6 +176,9 @@ max_subscription_clients = {{ .RPC.MaxSubscriptionClients }} max_subscriptions_per_client = {{ .RPC.MaxSubscriptionsPerClient }} # How long to wait for a tx to be committed during /broadcast_tx_commit. +# WARNING: Using a value larger than 10s will result in increasing the +# global HTTP write timeout, which applies to all connections and endpoints. +# See https://github.com/tendermint/tendermint/issues/3435 timeout_broadcast_tx_commit = "{{ .RPC.TimeoutBroadcastTxCommit }}" ##### peer to peer configuration options ##### diff --git a/docs/tendermint-core/configuration.md b/docs/tendermint-core/configuration.md index f1ac753a..aa275c7a 100644 --- a/docs/tendermint-core/configuration.md +++ b/docs/tendermint-core/configuration.md @@ -122,6 +122,9 @@ max_subscription_clients = 100 max_subscriptions_per_client = 5 # How long to wait for a tx to be committed during /broadcast_tx_commit. +# WARNING: Using a value larger than 10s will result in increasing the +# global HTTP write timeout, which applies to all connections and endpoints. +# See https://github.com/tendermint/tendermint/issues/3435 timeout_broadcast_tx_commit = "10s" ##### peer to peer configuration options ##### diff --git a/lite/proxy/proxy.go b/lite/proxy/proxy.go index 020e5753..d3c16d4a 100644 --- a/lite/proxy/proxy.go +++ b/lite/proxy/proxy.go @@ -45,11 +45,13 @@ func StartProxy(c rpcclient.Client, listenAddr string, logger log.Logger, maxOpe core.SetLogger(logger) mux.HandleFunc(wsEndpoint, wm.WebsocketHandler) - l, err := rpcserver.Listen(listenAddr, rpcserver.Config{MaxOpenConnections: maxOpenConnections}) + config := rpcserver.DefaultConfig() + config.MaxOpenConnections = maxOpenConnections + l, err := rpcserver.Listen(listenAddr, config) if err != nil { return err } - return rpcserver.StartHTTPServer(l, mux, logger) + return rpcserver.StartHTTPServer(l, mux, logger, config) } // RPCRoutes just routes everything to the given client, as if it were diff --git a/node/node.go b/node/node.go index f3f9dca3..8f71fa31 100644 --- a/node/node.go +++ b/node/node.go @@ -689,9 +689,18 @@ func (n *Node) startRPC() ([]net.Listener, error) { mux.HandleFunc("/websocket", wm.WebsocketHandler) rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, coreCodec, rpcLogger) + config := rpcserver.DefaultConfig() + config.MaxOpenConnections = n.config.RPC.MaxOpenConnections + // If necessary adjust global WriteTimeout to ensure it's greater than + // TimeoutBroadcastTxCommit. + // See https://github.com/tendermint/tendermint/issues/3435 + if config.WriteTimeout <= n.config.RPC.TimeoutBroadcastTxCommit { + config.WriteTimeout = n.config.RPC.TimeoutBroadcastTxCommit + 1*time.Second + } + listener, err := rpcserver.Listen( listenAddr, - rpcserver.Config{MaxOpenConnections: n.config.RPC.MaxOpenConnections}, + config, ) if err != nil { return nil, err @@ -711,6 +720,7 @@ func (n *Node) startRPC() ([]net.Listener, error) { listener, rootHandler, rpcLogger, + config, ) listeners[i] = listener } @@ -718,8 +728,9 @@ func (n *Node) startRPC() ([]net.Listener, error) { // we expose a simplified api over grpc for convenience to app devs grpcListenAddr := n.config.RPC.GRPCListenAddress if grpcListenAddr != "" { - listener, err := rpcserver.Listen( - grpcListenAddr, rpcserver.Config{MaxOpenConnections: n.config.RPC.GRPCMaxOpenConnections}) + config := rpcserver.DefaultConfig() + config.MaxOpenConnections = n.config.RPC.MaxOpenConnections + listener, err := rpcserver.Listen(grpcListenAddr, config) if err != nil { return nil, err } diff --git a/rpc/core/events.go b/rpc/core/events.go index 3ea33fa8..6bc5ecc7 100644 --- a/rpc/core/events.go +++ b/rpc/core/events.go @@ -105,8 +105,7 @@ func Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, er if err != nil { return nil, errors.Wrap(err, "failed to parse query") } - - subCtx, cancel := context.WithTimeout(context.Background(), subscribeTimeout) + subCtx, cancel := context.WithTimeout(ctx.Context(), SubscribeTimeout) defer cancel() sub, err := eventBus.Subscribe(subCtx, addr, q) if err != nil { diff --git a/rpc/core/mempool.go b/rpc/core/mempool.go index 6ebdbcfc..967466e7 100644 --- a/rpc/core/mempool.go +++ b/rpc/core/mempool.go @@ -197,7 +197,7 @@ func BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadc } // Subscribe to tx being committed in block. - subCtx, cancel := context.WithTimeout(context.Background(), subscribeTimeout) + subCtx, cancel := context.WithTimeout(ctx.Context(), SubscribeTimeout) defer cancel() q := types.EventQueryTxFor(tx) deliverTxSub, err := eventBus.Subscribe(subCtx, subscriber, q) diff --git a/rpc/core/pipe.go b/rpc/core/pipe.go index 0b760344..ad8afdef 100644 --- a/rpc/core/pipe.go +++ b/rpc/core/pipe.go @@ -1,6 +1,8 @@ package core import ( + "time" + cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/consensus" "github.com/tendermint/tendermint/crypto" @@ -9,7 +11,6 @@ import ( mempl "github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/proxy" - rpcserver "github.com/tendermint/tendermint/rpc/lib/server" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/state/txindex" "github.com/tendermint/tendermint/types" @@ -19,9 +20,11 @@ const ( // see README defaultPerPage = 30 maxPerPage = 100 -) -var subscribeTimeout = rpcserver.WriteTimeout / 2 + // SubscribeTimeout is the maximum time we wait to subscribe for an event. + // must be less than the server's write timeout (see rpcserver.DefaultConfig) + SubscribeTimeout = 5 * time.Second +) //---------------------------------------------- // These interfaces are used by RPC and must be thread safe diff --git a/rpc/lib/rpc_test.go b/rpc/lib/rpc_test.go index 68c134a7..3fa4de47 100644 --- a/rpc/lib/rpc_test.go +++ b/rpc/lib/rpc_test.go @@ -121,11 +121,12 @@ func setup() { wm := server.NewWebsocketManager(Routes, RoutesCdc, server.ReadWait(5*time.Second), server.PingPeriod(1*time.Second)) wm.SetLogger(tcpLogger) mux.HandleFunc(websocketEndpoint, wm.WebsocketHandler) - listener1, err := server.Listen(tcpAddr, server.Config{}) + config := server.DefaultConfig() + listener1, err := server.Listen(tcpAddr, config) if err != nil { panic(err) } - go server.StartHTTPServer(listener1, mux, tcpLogger) + go server.StartHTTPServer(listener1, mux, tcpLogger, config) unixLogger := logger.With("socket", "unix") mux2 := http.NewServeMux() @@ -133,11 +134,11 @@ func setup() { wm = server.NewWebsocketManager(Routes, RoutesCdc) wm.SetLogger(unixLogger) mux2.HandleFunc(websocketEndpoint, wm.WebsocketHandler) - listener2, err := server.Listen(unixAddr, server.Config{}) + listener2, err := server.Listen(unixAddr, config) if err != nil { panic(err) } - go server.StartHTTPServer(listener2, mux2, unixLogger) + go server.StartHTTPServer(listener2, mux2, unixLogger, config) // wait for servers to start time.Sleep(time.Second * 2) diff --git a/rpc/lib/server/handlers.go b/rpc/lib/server/handlers.go index 36ea47da..6391b009 100644 --- a/rpc/lib/server/handlers.go +++ b/rpc/lib/server/handlers.go @@ -2,6 +2,7 @@ package rpcserver import ( "bytes" + "context" "encoding/hex" "encoding/json" "fmt" @@ -439,6 +440,9 @@ type wsConnection struct { // callback which is called upon disconnect onDisconnect func(remoteAddr string) + + ctx context.Context + cancel context.CancelFunc } // NewWSConnection wraps websocket.Conn. @@ -532,6 +536,10 @@ func (wsc *wsConnection) OnStop() { if wsc.onDisconnect != nil { wsc.onDisconnect(wsc.remoteAddr) } + + if wsc.ctx != nil { + wsc.cancel() + } } // GetRemoteAddr returns the remote address of the underlying connection. @@ -569,6 +577,16 @@ func (wsc *wsConnection) Codec() *amino.Codec { return wsc.cdc } +// Context returns the connection's context. +// The context is canceled when the client's connection closes. +func (wsc *wsConnection) Context() context.Context { + if wsc.ctx != nil { + return wsc.ctx + } + wsc.ctx, wsc.cancel = context.WithCancel(context.Background()) + return wsc.ctx +} + // Read from the socket and subscribe to or unsubscribe from events func (wsc *wsConnection) readRoutine() { defer func() { diff --git a/rpc/lib/server/http_server.go b/rpc/lib/server/http_server.go index 9db69b6f..c4bb6fa1 100644 --- a/rpc/lib/server/http_server.go +++ b/rpc/lib/server/http_server.go @@ -18,9 +18,23 @@ import ( types "github.com/tendermint/tendermint/rpc/lib/types" ) -// Config is an RPC server configuration. +// Config is a RPC server configuration. type Config struct { + // see netutil.LimitListener MaxOpenConnections int + // mirrors http.Server#ReadTimeout + ReadTimeout time.Duration + // mirrors http.Server#WriteTimeout + WriteTimeout time.Duration +} + +// DefaultConfig returns a default configuration. +func DefaultConfig() *Config { + return &Config{ + MaxOpenConnections: 0, // unlimited + ReadTimeout: 10 * time.Second, + WriteTimeout: 10 * time.Second, + } } const ( @@ -30,25 +44,17 @@ const ( // same as the net/http default maxHeaderBytes = 1 << 20 - - // Timeouts for reading/writing to the http connection. - // Public so handlers can read them - - // /broadcast_tx_commit has it's own timeout, which should - // be less than the WriteTimeout here. - // TODO: use a config instead. - ReadTimeout = 3 * time.Second - WriteTimeout = 20 * time.Second ) // StartHTTPServer takes a listener and starts an HTTP server with the given handler. // It wraps handler with RecoverAndLogHandler. // NOTE: This function blocks - you may want to call it in a go-routine. -func StartHTTPServer(listener net.Listener, handler http.Handler, logger log.Logger) error { +func StartHTTPServer(listener net.Listener, handler http.Handler, logger log.Logger, config *Config) error { logger.Info(fmt.Sprintf("Starting RPC HTTP server on %s", listener.Addr())) s := &http.Server{ Handler: RecoverAndLogHandler(maxBytesHandler{h: handler, n: maxBodyBytes}, logger), - ReadTimeout: ReadTimeout, - WriteTimeout: WriteTimeout, + ReadTimeout: config.ReadTimeout, + WriteTimeout: config.WriteTimeout, MaxHeaderBytes: maxHeaderBytes, } err := s.Serve(listener) @@ -64,13 +70,14 @@ func StartHTTPAndTLSServer( handler http.Handler, certFile, keyFile string, logger log.Logger, + config *Config, ) error { logger.Info(fmt.Sprintf("Starting RPC HTTPS server on %s (cert: %q, key: %q)", listener.Addr(), certFile, keyFile)) s := &http.Server{ Handler: RecoverAndLogHandler(maxBytesHandler{h: handler, n: maxBodyBytes}, logger), - ReadTimeout: ReadTimeout, - WriteTimeout: WriteTimeout, + ReadTimeout: config.ReadTimeout, + WriteTimeout: config.WriteTimeout, MaxHeaderBytes: maxHeaderBytes, } err := s.ServeTLS(listener, certFile, keyFile) @@ -180,7 +187,7 @@ func (h maxBytesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // Listen starts a new net.Listener on the given address. // It returns an error if the address is invalid or the call to Listen() fails. -func Listen(addr string, config Config) (listener net.Listener, err error) { +func Listen(addr string, config *Config) (listener net.Listener, err error) { parts := strings.SplitN(addr, "://", 2) if len(parts) != 2 { return nil, errors.Errorf( diff --git a/rpc/lib/server/http_server_test.go b/rpc/lib/server/http_server_test.go index 6b852afa..7f47a30b 100644 --- a/rpc/lib/server/http_server_test.go +++ b/rpc/lib/server/http_server_test.go @@ -30,10 +30,12 @@ func TestMaxOpenConnections(t *testing.T) { time.Sleep(10 * time.Millisecond) fmt.Fprint(w, "some body") }) - l, err := Listen("tcp://127.0.0.1:0", Config{MaxOpenConnections: max}) + config := DefaultConfig() + config.MaxOpenConnections = max + l, err := Listen("tcp://127.0.0.1:0", config) require.NoError(t, err) defer l.Close() - go StartHTTPServer(l, mux, log.TestingLogger()) + go StartHTTPServer(l, mux, log.TestingLogger(), config) // Make N GET calls to the server. attempts := max * 2 @@ -64,15 +66,17 @@ func TestMaxOpenConnections(t *testing.T) { } func TestStartHTTPAndTLSServer(t *testing.T) { + config := DefaultConfig() + config.MaxOpenConnections = 1 // set up fixtures listenerAddr := "tcp://0.0.0.0:0" - listener, err := Listen(listenerAddr, Config{MaxOpenConnections: 1}) + listener, err := Listen(listenerAddr, config) require.NoError(t, err) mux := http.NewServeMux() mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {}) // test failure - err = StartHTTPAndTLSServer(listener, mux, "", "", log.TestingLogger()) + err = StartHTTPAndTLSServer(listener, mux, "", "", log.TestingLogger(), config) require.IsType(t, (*os.PathError)(nil), err) // TODO: test that starting the server can actually work diff --git a/rpc/lib/test/main.go b/rpc/lib/test/main.go index 3afc1ac1..2e433b90 100644 --- a/rpc/lib/test/main.go +++ b/rpc/lib/test/main.go @@ -36,9 +36,10 @@ func main() { cmn.TrapSignal(logger, func() {}) rpcserver.RegisterRPCFuncs(mux, routes, cdc, logger) - listener, err := rpcserver.Listen("0.0.0.0:8008", rpcserver.Config{}) + config := rpcserver.DefaultConfig() + listener, err := rpcserver.Listen("0.0.0.0:8008", config) if err != nil { cmn.Exit(err.Error()) } - rpcserver.StartHTTPServer(listener, mux, logger) + rpcserver.StartHTTPServer(listener, mux, logger, config) } diff --git a/rpc/lib/types/types.go b/rpc/lib/types/types.go index 21623e41..14317d43 100644 --- a/rpc/lib/types/types.go +++ b/rpc/lib/types/types.go @@ -1,6 +1,7 @@ package rpctypes import ( + "context" "encoding/json" "fmt" "net/http" @@ -243,6 +244,8 @@ type WSRPCConnection interface { TryWriteRPCResponse(resp RPCResponse) bool // Codec returns an Amino codec used. Codec() *amino.Codec + // Context returns the connection's context. + Context() context.Context } // Context is the first parameter for all functions. It carries a json-rpc @@ -260,8 +263,12 @@ type Context struct { HTTPReq *http.Request } -// RemoteAddr returns either HTTPReq#RemoteAddr or result of the -// WSConn#GetRemoteAddr(). +// RemoteAddr returns the remote address (usually a string "IP:port"). +// If neither HTTPReq nor WSConn is set, an empty string is returned. +// HTTP: +// http.Request#RemoteAddr +// WS: +// result of GetRemoteAddr func (ctx *Context) RemoteAddr() string { if ctx.HTTPReq != nil { return ctx.HTTPReq.RemoteAddr @@ -271,6 +278,22 @@ func (ctx *Context) RemoteAddr() string { return "" } +// Context returns the request's context. +// The returned context is always non-nil; it defaults to the background context. +// HTTP: +// The context is canceled when the client's connection closes, the request +// is canceled (with HTTP/2), or when the ServeHTTP method returns. +// WS: +// The context is canceled when the client's connections closes. +func (ctx *Context) Context() context.Context { + if ctx.HTTPReq != nil { + return ctx.HTTPReq.Context() + } else if ctx.WSConn != nil { + return ctx.WSConn.Context() + } + return context.Background() +} + //---------------------------------------- // SOCKETS diff --git a/tools/tm-monitor/rpc.go b/tools/tm-monitor/rpc.go index 1a08a9ec..4412e6e0 100644 --- a/tools/tm-monitor/rpc.go +++ b/tools/tm-monitor/rpc.go @@ -17,11 +17,12 @@ func startRPC(listenAddr string, m *monitor.Monitor, logger log.Logger) net.List wm := rpc.NewWebsocketManager(routes, nil) mux.HandleFunc("/websocket", wm.WebsocketHandler) rpc.RegisterRPCFuncs(mux, routes, cdc, logger) - listener, err := rpc.Listen(listenAddr, rpc.Config{}) + config := rpc.DefaultConfig() + listener, err := rpc.Listen(listenAddr, config) if err != nil { panic(err) } - go rpc.StartHTTPServer(listener, mux, logger) + go rpc.StartHTTPServer(listener, mux, logger, config) return listener }