limit number of open connections

Refs #1740

also, expose limit option for number concurrent streams for gRPC
(unlimited by default)
This commit is contained in:
Anton Kaliaev
2018-06-20 18:38:42 +04:00
parent 3e1baf68f8
commit 936a655990
15 changed files with 245 additions and 38 deletions

View File

@ -17,7 +17,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tendermint/go-amino"
amino "github.com/tendermint/go-amino"
cmn "github.com/tendermint/tmlibs/common"
"github.com/tendermint/tmlibs/log"
@ -123,7 +123,7 @@ func setup() {
wm.SetLogger(tcpLogger)
mux.HandleFunc(websocketEndpoint, wm.WebsocketHandler)
go func() {
_, err := server.StartHTTPServer(tcpAddr, mux, tcpLogger)
_, err := server.StartHTTPServer(tcpAddr, mux, tcpLogger, server.Config{})
if err != nil {
panic(err)
}
@ -136,7 +136,7 @@ func setup() {
wm.SetLogger(unixLogger)
mux2.HandleFunc(websocketEndpoint, wm.WebsocketHandler)
go func() {
_, err := server.StartHTTPServer(unixAddr, mux2, unixLogger)
_, err := server.StartHTTPServer(unixAddr, mux2, unixLogger, server.Config{})
if err != nil {
panic(err)
}
@ -274,18 +274,18 @@ func TestServersAndClientsBasic(t *testing.T) {
serverAddrs := [...]string{tcpAddr, unixAddr}
for _, addr := range serverAddrs {
cl1 := client.NewURIClient(addr)
fmt.Printf("=== testing server on %s using %v client", addr, cl1)
fmt.Printf("=== testing server on %s using URI client", addr)
testWithHTTPClient(t, cl1)
cl2 := client.NewJSONRPCClient(addr)
fmt.Printf("=== testing server on %s using %v client", addr, cl2)
fmt.Printf("=== testing server on %s using JSONRPC client", addr)
testWithHTTPClient(t, cl2)
cl3 := client.NewWSClient(addr, websocketEndpoint)
cl3.SetLogger(log.TestingLogger())
err := cl3.Start()
require.Nil(t, err)
fmt.Printf("=== testing server on %s using %v client", addr, cl3)
fmt.Printf("=== testing server on %s using WS client", addr)
testWithWSClient(t, cl3)
cl3.Stop()
}

View File

@ -12,12 +12,20 @@ import (
"time"
"github.com/pkg/errors"
"golang.org/x/net/netutil"
types "github.com/tendermint/tendermint/rpc/lib/types"
"github.com/tendermint/tmlibs/log"
)
func StartHTTPServer(listenAddr string, handler http.Handler, logger log.Logger) (listener net.Listener, err error) {
// Config is an RPC server configuration.
type Config struct {
MaxOpenConnections int
}
// StartHTTPServer starts an HTTP server on listenAddr with the given handler.
// It wraps handler with RecoverAndLogHandler.
func StartHTTPServer(listenAddr string, handler http.Handler, logger log.Logger, config Config) (listener net.Listener, err error) {
var proto, addr string
parts := strings.SplitN(listenAddr, "://", 2)
if len(parts) != 2 {
@ -30,6 +38,9 @@ func StartHTTPServer(listenAddr string, handler http.Handler, logger log.Logger)
if err != nil {
return nil, errors.Errorf("Failed to listen on %v: %v", listenAddr, err)
}
if config.MaxOpenConnections > 0 {
listener = netutil.LimitListener(listener, config.MaxOpenConnections)
}
go func() {
err := http.Serve(
@ -41,7 +52,10 @@ func StartHTTPServer(listenAddr string, handler http.Handler, logger log.Logger)
return listener, nil
}
func StartHTTPAndTLSServer(listenAddr string, handler http.Handler, certFile, keyFile string, logger log.Logger) (listener net.Listener, err error) {
// StartHTTPAndTLSServer starts an HTTPS server on listenAddr with the given
// handler.
// It wraps handler with RecoverAndLogHandler.
func StartHTTPAndTLSServer(listenAddr string, handler http.Handler, certFile, keyFile string, logger log.Logger, config Config) (listener net.Listener, err error) {
var proto, addr string
parts := strings.SplitN(listenAddr, "://", 2)
if len(parts) != 2 {
@ -54,6 +68,9 @@ func StartHTTPAndTLSServer(listenAddr string, handler http.Handler, certFile, ke
if err != nil {
return nil, errors.Errorf("Failed to listen on %v: %v", listenAddr, err)
}
if config.MaxOpenConnections > 0 {
listener = netutil.LimitListener(listener, config.MaxOpenConnections)
}
go func() {
err := http.ServeTLS(

View File

@ -0,0 +1,62 @@
package rpcserver
import (
"fmt"
"io"
"io/ioutil"
"net/http"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/tendermint/tmlibs/log"
)
func TestMaxOpenConnections(t *testing.T) {
const max = 5 // max simultaneous connections
// Start the server.
var open int32
mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
if n := atomic.AddInt32(&open, 1); n > int32(max) {
t.Errorf("%d open connections, want <= %d", n, max)
}
defer atomic.AddInt32(&open, -1)
time.Sleep(10 * time.Millisecond)
fmt.Fprint(w, "some body")
})
l, err := StartHTTPServer("tcp://127.0.0.1:0", mux, log.TestingLogger(), Config{MaxOpenConnections: max})
if err != nil {
t.Fatal(err)
}
defer l.Close()
// Make N GET calls to the server.
attempts := max * 2
var wg sync.WaitGroup
var failed int32
for i := 0; i < attempts; i++ {
wg.Add(1)
go func() {
defer wg.Done()
c := http.Client{Timeout: 3 * time.Second}
r, err := c.Get("http://" + l.Addr().String())
if err != nil {
t.Log(err)
atomic.AddInt32(&failed, 1)
return
}
defer r.Body.Close()
io.Copy(ioutil.Discard, r.Body)
}()
}
wg.Wait()
// We expect some Gets to fail as the server's accept queue is filled,
// but most should succeed.
if int(failed) >= attempts/2 {
t.Errorf("%d requests failed within %d attempts", failed, attempts)
}
}

View File

@ -5,7 +5,7 @@ import (
"net/http"
"os"
"github.com/tendermint/go-amino"
amino "github.com/tendermint/go-amino"
rpcserver "github.com/tendermint/tendermint/rpc/lib/server"
cmn "github.com/tendermint/tmlibs/common"
"github.com/tendermint/tmlibs/log"
@ -28,7 +28,7 @@ func main() {
cdc := amino.NewCodec()
logger := log.NewTMLogger(log.NewSyncWriter(os.Stdout))
rpcserver.RegisterRPCFuncs(mux, routes, cdc, logger)
_, err := rpcserver.StartHTTPServer("0.0.0.0:8008", mux, logger)
_, err := rpcserver.StartHTTPServer("0.0.0.0:8008", mux, logger, rpcserver.Config{})
if err != nil {
cmn.Exit(err.Error())
}