make RPC server's ping period and pong wait configurable via options

This commit is contained in:
Anton Kaliaev 2017-08-07 18:29:55 -04:00
parent 57eee2466b
commit c14b39da5f
No known key found for this signature in database
GPG Key ID: 7B6881D965918214
2 changed files with 54 additions and 29 deletions

View File

@ -31,6 +31,8 @@ const (
unixAddr = "unix://" + unixSocket unixAddr = "unix://" + unixSocket
websocketEndpoint = "/websocket/endpoint" websocketEndpoint = "/websocket/endpoint"
testPongWait = 2 * time.Second
) )
type ResultEcho struct { type ResultEcho struct {
@ -113,7 +115,7 @@ func setup() {
tcpLogger := logger.With("socket", "tcp") tcpLogger := logger.With("socket", "tcp")
mux := http.NewServeMux() mux := http.NewServeMux()
server.RegisterRPCFuncs(mux, Routes, tcpLogger) server.RegisterRPCFuncs(mux, Routes, tcpLogger)
wm := server.NewWebsocketManager(Routes, nil) wm := server.NewWebsocketManager(Routes, nil, server.PingPong((testPongWait*9)/10, testPongWait))
wm.SetLogger(tcpLogger) wm.SetLogger(tcpLogger)
mux.HandleFunc(websocketEndpoint, wm.WebsocketHandler) mux.HandleFunc(websocketEndpoint, wm.WebsocketHandler)
go func() { go func() {
@ -276,7 +278,7 @@ func TestServersAndClientsBasic(t *testing.T) {
testWithHTTPClient(t, cl2) testWithHTTPClient(t, cl2)
cl3 := client.NewWSClient(addr, websocketEndpoint) cl3 := client.NewWSClient(addr, websocketEndpoint)
cl3.SetLogger(log.TestingLogger()) cl3.SetLogger(log.TestingLogger())
_, err := cl3.Start() _, err := cl3.Start()
require.Nil(t, err) require.Nil(t, err)
fmt.Printf("=== testing server on %s using %v client", addr, cl3) fmt.Printf("=== testing server on %s using %v client", addr, cl3)
@ -305,7 +307,7 @@ func TestQuotedStringArg(t *testing.T) {
func TestWSNewWSRPCFunc(t *testing.T) { func TestWSNewWSRPCFunc(t *testing.T) {
cl := client.NewWSClient(tcpAddr, websocketEndpoint) cl := client.NewWSClient(tcpAddr, websocketEndpoint)
cl.SetLogger(log.TestingLogger()) cl.SetLogger(log.TestingLogger())
_, err := cl.Start() _, err := cl.Start()
require.Nil(t, err) require.Nil(t, err)
defer cl.Stop() defer cl.Stop()
@ -331,7 +333,7 @@ func TestWSNewWSRPCFunc(t *testing.T) {
func TestWSHandlesArrayParams(t *testing.T) { func TestWSHandlesArrayParams(t *testing.T) {
cl := client.NewWSClient(tcpAddr, websocketEndpoint) cl := client.NewWSClient(tcpAddr, websocketEndpoint)
cl.SetLogger(log.TestingLogger()) cl.SetLogger(log.TestingLogger())
_, err := cl.Start() _, err := cl.Start()
require.Nil(t, err) require.Nil(t, err)
defer cl.Stop() defer cl.Stop()
@ -356,17 +358,13 @@ func TestWSHandlesArrayParams(t *testing.T) {
// TestWSClientPingPong checks that a client & server exchange pings // TestWSClientPingPong checks that a client & server exchange pings
// & pongs so connection stays alive. // & pongs so connection stays alive.
func TestWSClientPingPong(t *testing.T) { func TestWSClientPingPong(t *testing.T) {
if testing.Short() {
t.Skip("skipping ping pong in short mode")
}
cl := client.NewWSClient(tcpAddr, websocketEndpoint) cl := client.NewWSClient(tcpAddr, websocketEndpoint)
cl.SetLogger(log.TestingLogger()) cl.SetLogger(log.TestingLogger())
_, err := cl.Start() _, err := cl.Start()
require.Nil(t, err) require.Nil(t, err)
defer cl.Stop() defer cl.Stop()
time.Sleep(35 * time.Second) time.Sleep((testPongWait * 11) / 10)
} }
func randBytes(t *testing.T) []byte { func randBytes(t *testing.T) []byte {

View File

@ -337,10 +337,10 @@ func nonJsonToArg(ty reflect.Type, arg string) (reflect.Value, error, bool) {
// rpc.websocket // rpc.websocket
const ( const (
writeChanCapacity = 1000 writeChanCapacity = 1000
wsWriteTimeoutSeconds = 30 // each write times out after this. wsWriteWait = 30 * time.Second // each write times out after this.
wsReadTimeoutSeconds = 30 // connection times out if we haven't received *anything* in this long, not even pings. defaultWSPongWait = 30 * time.Second
wsPingTickerSeconds = 10 // send a ping every PingTickerSeconds. defaultWSPingPeriod = 10 * time.Second
) )
// a single websocket connection // a single websocket connection
@ -357,29 +357,54 @@ type wsConnection struct {
funcMap map[string]*RPCFunc funcMap map[string]*RPCFunc
evsw events.EventSwitch evsw events.EventSwitch
// Connection times out if we haven't received *anything* in this long, not even pings.
pongWait time.Duration
// Send pings to server with this period. Must be less than pongWait.
pingPeriod time.Duration
} }
// new websocket connection wrapper // NewWSConnection wraps websocket.Conn. See the commentary on the
func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, evsw events.EventSwitch) *wsConnection { // func(*wsConnection) functions for a detailed description of how to configure
// ping period and pong wait time.
func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, evsw events.EventSwitch, options ...func(*wsConnection)) *wsConnection {
wsc := &wsConnection{ wsc := &wsConnection{
remoteAddr: baseConn.RemoteAddr().String(), remoteAddr: baseConn.RemoteAddr().String(),
baseConn: baseConn, baseConn: baseConn,
writeChan: make(chan types.RPCResponse, writeChanCapacity), // error when full. writeChan: make(chan types.RPCResponse, writeChanCapacity), // error when full.
funcMap: funcMap, funcMap: funcMap,
evsw: evsw, evsw: evsw,
pongWait: defaultWSPongWait,
pingPeriod: defaultWSPingPeriod,
}
for _, option := range options {
option(wsc)
} }
wsc.BaseService = *cmn.NewBaseService(nil, "wsConnection", wsc) wsc.BaseService = *cmn.NewBaseService(nil, "wsConnection", wsc)
return wsc return wsc
} }
// PingPong allows changing ping period and pong wait time. If ping period
// greater or equal to pong wait time, panic will be thrown.
func PingPong(pingPeriod, pongWait time.Duration) func(*wsConnection) {
return func(wsc *wsConnection) {
if pingPeriod >= pongWait {
panic(fmt.Sprintf("ping period (%v) must be less than pong wait time (%v)", pingPeriod, pongWait))
}
wsc.pingPeriod = pingPeriod
wsc.pongWait = pongWait
}
}
// wsc.Start() blocks until the connection closes. // wsc.Start() blocks until the connection closes.
func (wsc *wsConnection) OnStart() error { func (wsc *wsConnection) OnStart() error {
wsc.BaseService.OnStart() wsc.BaseService.OnStart()
// these must be set before the readRoutine is created, as it may // these must be set before the readRoutine is created, as it may
// call wsc.Stop(), which accesses these timers // call wsc.Stop(), which accesses these timers
wsc.readTimeout = time.NewTimer(time.Second * wsReadTimeoutSeconds) wsc.readTimeout = time.NewTimer(wsc.pongWait)
wsc.pingTicker = time.NewTicker(time.Second * wsPingTickerSeconds) wsc.pingTicker = time.NewTicker(wsc.pingPeriod)
// Read subscriptions/unsubscriptions to events // Read subscriptions/unsubscriptions to events
go wsc.readRoutine() go wsc.readRoutine()
@ -387,13 +412,13 @@ func (wsc *wsConnection) OnStart() error {
// Custom Ping handler to touch readTimeout // Custom Ping handler to touch readTimeout
wsc.baseConn.SetPingHandler(func(m string) error { wsc.baseConn.SetPingHandler(func(m string) error {
// NOTE: https://github.com/gorilla/websocket/issues/97 // NOTE: https://github.com/gorilla/websocket/issues/97
go wsc.baseConn.WriteControl(websocket.PongMessage, []byte(m), time.Now().Add(time.Second*wsWriteTimeoutSeconds)) go wsc.baseConn.WriteControl(websocket.PongMessage, []byte(m), time.Now().Add(wsWriteWait))
wsc.readTimeout.Reset(time.Second * wsReadTimeoutSeconds) wsc.readTimeout.Reset(wsc.pongWait)
return nil return nil
}) })
wsc.baseConn.SetPongHandler(func(m string) error { wsc.baseConn.SetPongHandler(func(m string) error {
// NOTE: https://github.com/gorilla/websocket/issues/97 // NOTE: https://github.com/gorilla/websocket/issues/97
wsc.readTimeout.Reset(time.Second * wsReadTimeoutSeconds) wsc.readTimeout.Reset(wsc.pongWait)
return nil return nil
}) })
go wsc.readTimeoutRoutine() go wsc.readTimeoutRoutine()
@ -472,7 +497,7 @@ func (wsc *wsConnection) readRoutine() {
default: default:
var in []byte var in []byte
// Do not set a deadline here like below: // Do not set a deadline here like below:
// wsc.baseConn.SetReadDeadline(time.Now().Add(time.Second * wsReadTimeoutSeconds)) // wsc.baseConn.SetReadDeadline(time.Now().Add(wsc.pongWait))
// The client may not send anything for a while. // The client may not send anything for a while.
// We use `readTimeout` to handle read timeouts. // We use `readTimeout` to handle read timeouts.
_, in, err := wsc.baseConn.ReadMessage() _, in, err := wsc.baseConn.ReadMessage()
@ -559,7 +584,7 @@ func (wsc *wsConnection) writeRoutine() {
// All writes to the websocket must (re)set the write deadline. // All writes to the websocket must (re)set the write deadline.
// If some writes don't set it while others do, they may timeout incorrectly (https://github.com/tendermint/tendermint/issues/553) // If some writes don't set it while others do, they may timeout incorrectly (https://github.com/tendermint/tendermint/issues/553)
func (wsc *wsConnection) writeMessageWithDeadline(msgType int, msg []byte) error { func (wsc *wsConnection) writeMessageWithDeadline(msgType int, msg []byte) error {
wsc.baseConn.SetWriteDeadline(time.Now().Add(time.Second * wsWriteTimeoutSeconds)) wsc.baseConn.SetWriteDeadline(time.Now().Add(wsWriteWait))
return wsc.baseConn.WriteMessage(msgType, msg) return wsc.baseConn.WriteMessage(msgType, msg)
} }
@ -570,12 +595,13 @@ func (wsc *wsConnection) writeMessageWithDeadline(msgType int, msg []byte) error
// NOTE: The websocket path is defined externally, e.g. in node/node.go // NOTE: The websocket path is defined externally, e.g. in node/node.go
type WebsocketManager struct { type WebsocketManager struct {
websocket.Upgrader websocket.Upgrader
funcMap map[string]*RPCFunc funcMap map[string]*RPCFunc
evsw events.EventSwitch evsw events.EventSwitch
logger log.Logger logger log.Logger
wsConnOptions []func(*wsConnection)
} }
func NewWebsocketManager(funcMap map[string]*RPCFunc, evsw events.EventSwitch) *WebsocketManager { func NewWebsocketManager(funcMap map[string]*RPCFunc, evsw events.EventSwitch, wsConnOptions ...func(*wsConnection)) *WebsocketManager {
return &WebsocketManager{ return &WebsocketManager{
funcMap: funcMap, funcMap: funcMap,
evsw: evsw, evsw: evsw,
@ -587,7 +613,8 @@ func NewWebsocketManager(funcMap map[string]*RPCFunc, evsw events.EventSwitch) *
return true return true
}, },
}, },
logger: log.NewNopLogger(), logger: log.NewNopLogger(),
wsConnOptions: wsConnOptions,
} }
} }
@ -605,7 +632,7 @@ func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Requ
} }
// register connection // register connection
con := NewWSConnection(wsConn, wm.funcMap, wm.evsw) con := NewWSConnection(wsConn, wm.funcMap, wm.evsw, wm.wsConnOptions...)
con.SetLogger(wm.logger) con.SetLogger(wm.logger)
wm.logger.Info("New websocket connection", "remote", con.remoteAddr) wm.logger.Info("New websocket connection", "remote", con.remoteAddr)
con.Start() // Blocking con.Start() // Blocking