mirror of
https://github.com/fluencelabs/tendermint
synced 2025-06-13 13:21:20 +00:00
Merge pull request #604 from tendermint/bugfix/ws-io-timeout
Biff up RPC WSClient
This commit is contained in:
@ -1,30 +1,28 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"time"
|
"time"
|
||||||
//"encoding/hex"
|
//"encoding/hex"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/gorilla/websocket"
|
rpcclient "github.com/tendermint/tendermint/rpc/lib/client"
|
||||||
"github.com/tendermint/go-wire"
|
cmn "github.com/tendermint/tmlibs/common"
|
||||||
_ "github.com/tendermint/tendermint/rpc/core/types" // Register RPCResponse > Result types
|
|
||||||
"github.com/tendermint/tendermint/rpc/lib/client"
|
|
||||||
"github.com/tendermint/tendermint/rpc/lib/types"
|
|
||||||
. "github.com/tendermint/tmlibs/common"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
ws := rpcclient.NewWSClient("127.0.0.1:46657", "/websocket")
|
wsc := rpcclient.NewWSClient("127.0.0.1:46657", "/websocket")
|
||||||
_, err := ws.Start()
|
_, err := wsc.Start()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
Exit(err.Error())
|
cmn.Exit(err.Error())
|
||||||
}
|
}
|
||||||
|
defer wsc.Stop()
|
||||||
|
|
||||||
// Read a bunch of responses
|
// Read a bunch of responses
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
_, ok := <-ws.ResultsCh
|
_, ok := <-wsc.ResultsCh
|
||||||
if !ok {
|
if !ok {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
@ -37,24 +35,14 @@ func main() {
|
|||||||
for i := 0; ; i++ {
|
for i := 0; ; i++ {
|
||||||
binary.BigEndian.PutUint64(buf, uint64(i))
|
binary.BigEndian.PutUint64(buf, uint64(i))
|
||||||
//txBytes := hex.EncodeToString(buf[:n])
|
//txBytes := hex.EncodeToString(buf[:n])
|
||||||
request, err := rpctypes.MapToRequest("fakeid",
|
|
||||||
"broadcast_tx",
|
|
||||||
map[string]interface{}{"tx": buf[:8]})
|
|
||||||
if err != nil {
|
|
||||||
Exit(err.Error())
|
|
||||||
}
|
|
||||||
reqBytes := wire.JSONBytes(request)
|
|
||||||
//fmt.Println("!!", string(reqBytes))
|
|
||||||
fmt.Print(".")
|
fmt.Print(".")
|
||||||
err = ws.WriteMessage(websocket.TextMessage, reqBytes)
|
err = wsc.Call(context.TODO(), "broadcast_tx", map[string]interface{}{"tx": buf[:8]})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
Exit(err.Error())
|
cmn.Exit(err.Error())
|
||||||
}
|
}
|
||||||
if i%1000 == 0 {
|
if i%1000 == 0 {
|
||||||
fmt.Println(i)
|
fmt.Println(i)
|
||||||
}
|
}
|
||||||
time.Sleep(time.Microsecond * 1000)
|
time.Sleep(time.Microsecond * 1000)
|
||||||
}
|
}
|
||||||
|
|
||||||
ws.Stop()
|
|
||||||
}
|
}
|
||||||
|
7
glide.lock
generated
7
glide.lock
generated
@ -1,5 +1,5 @@
|
|||||||
hash: 2c988aae9517b386ee911e4da5deb9f5034359b7e2ccf448952a3ddb9771222d
|
hash: 41581813ff97225a7feb86b5accb0fe4acb3e198b64592d7452240e9473c479f
|
||||||
updated: 2017-06-28T13:04:20.907047164+02:00
|
updated: 2017-08-03T19:17:16.410522485Z
|
||||||
imports:
|
imports:
|
||||||
- name: github.com/btcsuite/btcd
|
- name: github.com/btcsuite/btcd
|
||||||
version: b8df516b4b267acf2de46be593a9d948d1d2c420
|
version: b8df516b4b267acf2de46be593a9d948d1d2c420
|
||||||
@ -61,6 +61,8 @@ imports:
|
|||||||
version: 5ccdfb18c776b740aecaf085c4d9a2779199c279
|
version: 5ccdfb18c776b740aecaf085c4d9a2779199c279
|
||||||
- name: github.com/pkg/errors
|
- name: github.com/pkg/errors
|
||||||
version: 645ef00459ed84a119197bfb8d8205042c6df63d
|
version: 645ef00459ed84a119197bfb8d8205042c6df63d
|
||||||
|
- name: github.com/rcrowley/go-metrics
|
||||||
|
version: 1f30fe9094a513ce4c700b9a54458bbb0c96996c
|
||||||
- name: github.com/spf13/afero
|
- name: github.com/spf13/afero
|
||||||
version: 9be650865eab0c12963d8753212f4f9c66cdcf12
|
version: 9be650865eab0c12963d8753212f4f9c66cdcf12
|
||||||
subpackages:
|
subpackages:
|
||||||
@ -126,7 +128,6 @@ imports:
|
|||||||
- clist
|
- clist
|
||||||
- common
|
- common
|
||||||
- db
|
- db
|
||||||
- events
|
|
||||||
- flowrate
|
- flowrate
|
||||||
- log
|
- log
|
||||||
- merkle
|
- merkle
|
||||||
|
20
glide.yaml
20
glide.yaml
@ -7,11 +7,10 @@ import:
|
|||||||
- package: github.com/golang/protobuf
|
- package: github.com/golang/protobuf
|
||||||
subpackages:
|
subpackages:
|
||||||
- proto
|
- proto
|
||||||
- package: github.com/pelletier/go-toml
|
|
||||||
version: ^1.0.0
|
|
||||||
- package: github.com/gorilla/websocket
|
- package: github.com/gorilla/websocket
|
||||||
- package: github.com/pkg/errors
|
- package: github.com/pkg/errors
|
||||||
version: ~0.8.0
|
version: ~0.8.0
|
||||||
|
- package: github.com/rcrowley/go-metrics
|
||||||
- package: github.com/spf13/cobra
|
- package: github.com/spf13/cobra
|
||||||
- package: github.com/spf13/viper
|
- package: github.com/spf13/viper
|
||||||
- package: github.com/tendermint/abci
|
- package: github.com/tendermint/abci
|
||||||
@ -26,21 +25,15 @@ import:
|
|||||||
version: ~0.6.2
|
version: ~0.6.2
|
||||||
subpackages:
|
subpackages:
|
||||||
- data
|
- data
|
||||||
- package: github.com/tendermint/merkleeyes
|
|
||||||
version: ~0.2.4
|
|
||||||
subpackages:
|
|
||||||
- app
|
|
||||||
- iavl
|
|
||||||
- testutil
|
|
||||||
- package: github.com/tendermint/tmlibs
|
- package: github.com/tendermint/tmlibs
|
||||||
version: ~0.2.2
|
version: ~0.2.2
|
||||||
subpackages:
|
subpackages:
|
||||||
- autofile
|
- autofile
|
||||||
- cli
|
- cli
|
||||||
|
- cli/flags
|
||||||
- clist
|
- clist
|
||||||
- common
|
- common
|
||||||
- db
|
- db
|
||||||
- events
|
|
||||||
- flowrate
|
- flowrate
|
||||||
- log
|
- log
|
||||||
- merkle
|
- merkle
|
||||||
@ -53,7 +46,16 @@ import:
|
|||||||
subpackages:
|
subpackages:
|
||||||
- context
|
- context
|
||||||
- package: google.golang.org/grpc
|
- package: google.golang.org/grpc
|
||||||
|
- package: github.com/tendermint/merkleeyes
|
||||||
|
version: ~0.2.4
|
||||||
|
subpackages:
|
||||||
|
- app
|
||||||
|
- iavl
|
||||||
|
- testutil
|
||||||
testImport:
|
testImport:
|
||||||
|
- package: github.com/go-kit/kit
|
||||||
|
subpackages:
|
||||||
|
- log/term
|
||||||
- package: github.com/stretchr/testify
|
- package: github.com/stretchr/testify
|
||||||
subpackages:
|
subpackages:
|
||||||
- assert
|
- assert
|
||||||
|
@ -335,9 +335,9 @@ func (n *Node) startRPC() ([]net.Listener, error) {
|
|||||||
listeners := make([]net.Listener, len(listenAddrs))
|
listeners := make([]net.Listener, len(listenAddrs))
|
||||||
for i, listenAddr := range listenAddrs {
|
for i, listenAddr := range listenAddrs {
|
||||||
mux := http.NewServeMux()
|
mux := http.NewServeMux()
|
||||||
wm := rpcserver.NewWebsocketManager(rpccore.Routes, n.evsw)
|
|
||||||
rpcLogger := n.Logger.With("module", "rpc-server")
|
rpcLogger := n.Logger.With("module", "rpc-server")
|
||||||
wm.SetLogger(rpcLogger)
|
wm := rpcserver.NewWebsocketManager(rpccore.Routes, n.evsw)
|
||||||
|
wm.SetLogger(rpcLogger.With("protocol", "websocket"))
|
||||||
mux.HandleFunc("/websocket", wm.WebsocketHandler)
|
mux.HandleFunc("/websocket", wm.WebsocketHandler)
|
||||||
rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, rpcLogger)
|
rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, rpcLogger)
|
||||||
listener, err := rpcserver.StartHTTPServer(listenAddr, mux, rpcLogger)
|
listener, err := rpcserver.StartHTTPServer(listenAddr, mux, rpcLogger)
|
||||||
|
@ -1,13 +1,14 @@
|
|||||||
package client
|
package client
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
data "github.com/tendermint/go-wire/data"
|
data "github.com/tendermint/go-wire/data"
|
||||||
ctypes "github.com/tendermint/tendermint/rpc/core/types"
|
ctypes "github.com/tendermint/tendermint/rpc/core/types"
|
||||||
"github.com/tendermint/tendermint/rpc/lib/client"
|
rpcclient "github.com/tendermint/tendermint/rpc/lib/client"
|
||||||
"github.com/tendermint/tendermint/types"
|
"github.com/tendermint/tendermint/types"
|
||||||
events "github.com/tendermint/tmlibs/events"
|
events "github.com/tendermint/tmlibs/events"
|
||||||
)
|
)
|
||||||
@ -349,14 +350,14 @@ func (w *WSEvents) parseEvent(data []byte) (err error) {
|
|||||||
// no way of exposing these failures, so we panic.
|
// no way of exposing these failures, so we panic.
|
||||||
// is this right? or silently ignore???
|
// is this right? or silently ignore???
|
||||||
func (w *WSEvents) subscribe(event string) {
|
func (w *WSEvents) subscribe(event string) {
|
||||||
err := w.ws.Subscribe(event)
|
err := w.ws.Subscribe(context.TODO(), event)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WSEvents) unsubscribe(event string) {
|
func (w *WSEvents) unsubscribe(event string) {
|
||||||
err := w.ws.Unsubscribe(event)
|
err := w.ws.Unsubscribe(context.TODO(), event)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
@ -1,160 +1,456 @@
|
|||||||
package rpcclient
|
package rpcclient
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"math"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/gorilla/websocket"
|
"github.com/gorilla/websocket"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
metrics "github.com/rcrowley/go-metrics"
|
||||||
|
|
||||||
types "github.com/tendermint/tendermint/rpc/lib/types"
|
types "github.com/tendermint/tendermint/rpc/lib/types"
|
||||||
cmn "github.com/tendermint/tmlibs/common"
|
cmn "github.com/tendermint/tmlibs/common"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
wsResultsChannelCapacity = 10
|
defaultMaxReconnectAttempts = 25
|
||||||
wsErrorsChannelCapacity = 1
|
defaultWriteWait = 0
|
||||||
wsWriteTimeoutSeconds = 10
|
defaultReadWait = 0
|
||||||
|
defaultPingPeriod = 0
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// WSClient is a WebSocket client. The methods of WSClient are safe for use by
|
||||||
|
// multiple goroutines.
|
||||||
type WSClient struct {
|
type WSClient struct {
|
||||||
cmn.BaseService
|
cmn.BaseService
|
||||||
|
|
||||||
|
conn *websocket.Conn
|
||||||
|
|
||||||
Address string // IP:PORT or /path/to/socket
|
Address string // IP:PORT or /path/to/socket
|
||||||
Endpoint string // /websocket/url/endpoint
|
Endpoint string // /websocket/url/endpoint
|
||||||
Dialer func(string, string) (net.Conn, error)
|
Dialer func(string, string) (net.Conn, error)
|
||||||
*websocket.Conn
|
|
||||||
ResultsCh chan json.RawMessage // closes upon WSClient.Stop()
|
// Time between sending a ping and receiving a pong. See
|
||||||
ErrorsCh chan error // closes upon WSClient.Stop()
|
// https://godoc.org/github.com/rcrowley/go-metrics#Timer.
|
||||||
|
PingPongLatencyTimer metrics.Timer
|
||||||
|
|
||||||
|
// user facing channels, closed only when the client is being stopped.
|
||||||
|
ResultsCh chan json.RawMessage
|
||||||
|
ErrorsCh chan error
|
||||||
|
|
||||||
|
// internal channels
|
||||||
|
send chan types.RPCRequest // user requests
|
||||||
|
backlog chan types.RPCRequest // stores a single user request received during a conn failure
|
||||||
|
reconnectAfter chan error // reconnect requests
|
||||||
|
readRoutineQuit chan struct{} // a way for readRoutine to close writeRoutine
|
||||||
|
|
||||||
|
wg sync.WaitGroup
|
||||||
|
|
||||||
|
mtx sync.RWMutex
|
||||||
|
sentLastPingAt time.Time
|
||||||
|
reconnecting bool
|
||||||
|
|
||||||
|
// Maximum reconnect attempts (0 or greater; default: 25).
|
||||||
|
maxReconnectAttempts int
|
||||||
|
|
||||||
|
// Time allowed to write a message to the server. 0 means block until operation succeeds.
|
||||||
|
writeWait time.Duration
|
||||||
|
|
||||||
|
// Time allowed to read the next message from the server. 0 means block until operation succeeds.
|
||||||
|
readWait time.Duration
|
||||||
|
|
||||||
|
// Send pings to server with this period. Must be less than readWait. If 0, no pings will be sent.
|
||||||
|
pingPeriod time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
// create a new connection
|
// NewWSClient returns a new client. See the commentary on the func(*WSClient)
|
||||||
func NewWSClient(remoteAddr, endpoint string) *WSClient {
|
// functions for a detailed description of how to configure ping period and
|
||||||
|
// pong wait time. The endpoint argument must begin with a `/`.
|
||||||
|
func NewWSClient(remoteAddr, endpoint string, options ...func(*WSClient)) *WSClient {
|
||||||
addr, dialer := makeHTTPDialer(remoteAddr)
|
addr, dialer := makeHTTPDialer(remoteAddr)
|
||||||
wsClient := &WSClient{
|
c := &WSClient{
|
||||||
Address: addr,
|
Address: addr,
|
||||||
Dialer: dialer,
|
Dialer: dialer,
|
||||||
Endpoint: endpoint,
|
Endpoint: endpoint,
|
||||||
Conn: nil,
|
PingPongLatencyTimer: metrics.NewTimer(),
|
||||||
|
|
||||||
|
maxReconnectAttempts: defaultMaxReconnectAttempts,
|
||||||
|
readWait: defaultReadWait,
|
||||||
|
writeWait: defaultWriteWait,
|
||||||
|
pingPeriod: defaultPingPeriod,
|
||||||
}
|
}
|
||||||
wsClient.BaseService = *cmn.NewBaseService(nil, "WSClient", wsClient)
|
c.BaseService = *cmn.NewBaseService(nil, "WSClient", c)
|
||||||
return wsClient
|
for _, option := range options {
|
||||||
|
option(c)
|
||||||
|
}
|
||||||
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wsc *WSClient) String() string {
|
// MaxReconnectAttempts sets the maximum number of reconnect attempts before returning an error.
|
||||||
return wsc.Address + ", " + wsc.Endpoint
|
// It should only be used in the constructor and is not Goroutine-safe.
|
||||||
|
func MaxReconnectAttempts(max int) func(*WSClient) {
|
||||||
|
return func(c *WSClient) {
|
||||||
|
c.maxReconnectAttempts = max
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// OnStart implements cmn.BaseService interface
|
// ReadWait sets the amount of time to wait before a websocket read times out.
|
||||||
func (wsc *WSClient) OnStart() error {
|
// It should only be used in the constructor and is not Goroutine-safe.
|
||||||
wsc.BaseService.OnStart()
|
func ReadWait(readWait time.Duration) func(*WSClient) {
|
||||||
err := wsc.dial()
|
return func(c *WSClient) {
|
||||||
|
c.readWait = readWait
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// WriteWait sets the amount of time to wait before a websocket write times out.
|
||||||
|
// It should only be used in the constructor and is not Goroutine-safe.
|
||||||
|
func WriteWait(writeWait time.Duration) func(*WSClient) {
|
||||||
|
return func(c *WSClient) {
|
||||||
|
c.writeWait = writeWait
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// PingPeriod sets the duration for sending websocket pings.
|
||||||
|
// It should only be used in the constructor - not Goroutine-safe.
|
||||||
|
func PingPeriod(pingPeriod time.Duration) func(*WSClient) {
|
||||||
|
return func(c *WSClient) {
|
||||||
|
c.pingPeriod = pingPeriod
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// String returns WS client full address.
|
||||||
|
func (c *WSClient) String() string {
|
||||||
|
return fmt.Sprintf("%s (%s)", c.Address, c.Endpoint)
|
||||||
|
}
|
||||||
|
|
||||||
|
// OnStart implements cmn.Service by dialing a server and creating read and
|
||||||
|
// write routines.
|
||||||
|
func (c *WSClient) OnStart() error {
|
||||||
|
err := c.dial()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
wsc.ResultsCh = make(chan json.RawMessage, wsResultsChannelCapacity)
|
|
||||||
wsc.ErrorsCh = make(chan error, wsErrorsChannelCapacity)
|
c.ResultsCh = make(chan json.RawMessage)
|
||||||
go wsc.receiveEventsRoutine()
|
c.ErrorsCh = make(chan error)
|
||||||
|
|
||||||
|
c.send = make(chan types.RPCRequest)
|
||||||
|
// 1 additional error may come from the read/write
|
||||||
|
// goroutine depending on which failed first.
|
||||||
|
c.reconnectAfter = make(chan error, 1)
|
||||||
|
// capacity for 1 request. a user won't be able to send more because the send
|
||||||
|
// channel is unbuffered.
|
||||||
|
c.backlog = make(chan types.RPCRequest, 1)
|
||||||
|
|
||||||
|
c.startReadWriteRoutines()
|
||||||
|
go c.reconnectRoutine()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// OnReset implements cmn.BaseService interface
|
// OnStop implements cmn.Service.
|
||||||
func (wsc *WSClient) OnReset() error {
|
func (c *WSClient) OnStop() {}
|
||||||
return nil
|
|
||||||
|
// Stop overrides cmn.Service#Stop. There is no other way to wait until Quit
|
||||||
|
// channel is closed.
|
||||||
|
func (c *WSClient) Stop() bool {
|
||||||
|
success := c.BaseService.Stop()
|
||||||
|
// only close user-facing channels when we can't write to them
|
||||||
|
c.wg.Wait()
|
||||||
|
close(c.ResultsCh)
|
||||||
|
close(c.ErrorsCh)
|
||||||
|
return success
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wsc *WSClient) dial() error {
|
// IsReconnecting returns true if the client is reconnecting right now.
|
||||||
|
func (c *WSClient) IsReconnecting() bool {
|
||||||
|
c.mtx.RLock()
|
||||||
|
defer c.mtx.RUnlock()
|
||||||
|
return c.reconnecting
|
||||||
|
}
|
||||||
|
|
||||||
// Dial
|
// IsActive returns true if the client is running and not reconnecting.
|
||||||
|
func (c *WSClient) IsActive() bool {
|
||||||
|
return c.IsRunning() && !c.IsReconnecting()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send the given RPC request to the server. Results will be available on
|
||||||
|
// ResultsCh, errors, if any, on ErrorsCh. Will block until send succeeds or
|
||||||
|
// ctx.Done is closed.
|
||||||
|
func (c *WSClient) Send(ctx context.Context, request types.RPCRequest) error {
|
||||||
|
select {
|
||||||
|
case c.send <- request:
|
||||||
|
c.Logger.Info("sent a request", "req", request)
|
||||||
|
return nil
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Call the given method. See Send description.
|
||||||
|
func (c *WSClient) Call(ctx context.Context, method string, params map[string]interface{}) error {
|
||||||
|
request, err := types.MapToRequest("", method, params)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return c.Send(ctx, request)
|
||||||
|
}
|
||||||
|
|
||||||
|
// CallWithArrayParams the given method with params in a form of array. See
|
||||||
|
// Send description.
|
||||||
|
func (c *WSClient) CallWithArrayParams(ctx context.Context, method string, params []interface{}) error {
|
||||||
|
request, err := types.ArrayToRequest("", method, params)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return c.Send(ctx, request)
|
||||||
|
}
|
||||||
|
|
||||||
|
///////////////////////////////////////////////////////////////////////////////
|
||||||
|
// Private methods
|
||||||
|
|
||||||
|
func (c *WSClient) dial() error {
|
||||||
dialer := &websocket.Dialer{
|
dialer := &websocket.Dialer{
|
||||||
NetDial: wsc.Dialer,
|
NetDial: c.Dialer,
|
||||||
Proxy: http.ProxyFromEnvironment,
|
Proxy: http.ProxyFromEnvironment,
|
||||||
}
|
}
|
||||||
rHeader := http.Header{}
|
rHeader := http.Header{}
|
||||||
con, _, err := dialer.Dial("ws://"+wsc.Address+wsc.Endpoint, rHeader)
|
conn, _, err := dialer.Dial("ws://"+c.Address+c.Endpoint, rHeader)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// Set the ping/pong handlers
|
c.conn = conn
|
||||||
con.SetPingHandler(func(m string) error {
|
|
||||||
// NOTE: https://github.com/gorilla/websocket/issues/97
|
|
||||||
go con.WriteControl(websocket.PongMessage, []byte(m), time.Now().Add(time.Second*wsWriteTimeoutSeconds))
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
con.SetPongHandler(func(m string) error {
|
|
||||||
// NOTE: https://github.com/gorilla/websocket/issues/97
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
wsc.Conn = con
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// OnStop implements cmn.BaseService interface
|
// reconnect tries to redial up to maxReconnectAttempts with exponential
|
||||||
func (wsc *WSClient) OnStop() {
|
// backoff.
|
||||||
wsc.BaseService.OnStop()
|
func (c *WSClient) reconnect() error {
|
||||||
wsc.Conn.Close()
|
attempt := 0
|
||||||
// ResultsCh/ErrorsCh is closed in receiveEventsRoutine.
|
|
||||||
}
|
c.mtx.Lock()
|
||||||
|
c.reconnecting = true
|
||||||
|
c.mtx.Unlock()
|
||||||
|
defer func() {
|
||||||
|
c.mtx.Lock()
|
||||||
|
c.reconnecting = false
|
||||||
|
c.mtx.Unlock()
|
||||||
|
}()
|
||||||
|
|
||||||
func (wsc *WSClient) receiveEventsRoutine() {
|
|
||||||
for {
|
for {
|
||||||
_, data, err := wsc.ReadMessage()
|
c.Logger.Info("reconnecting", "attempt", attempt+1)
|
||||||
|
|
||||||
|
d := time.Duration(math.Exp2(float64(attempt)))
|
||||||
|
time.Sleep(d * time.Second)
|
||||||
|
|
||||||
|
err := c.dial()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
wsc.Logger.Info("WSClient failed to read message", "err", err, "data", string(data))
|
c.Logger.Error("failed to redial", "err", err)
|
||||||
wsc.Stop()
|
|
||||||
break
|
|
||||||
} else {
|
} else {
|
||||||
var response types.RPCResponse
|
c.Logger.Info("reconnected")
|
||||||
err := json.Unmarshal(data, &response)
|
return nil
|
||||||
if err != nil {
|
}
|
||||||
wsc.Logger.Info("WSClient failed to parse message", "err", err, "data", string(data))
|
|
||||||
wsc.ErrorsCh <- err
|
attempt++
|
||||||
continue
|
|
||||||
}
|
if attempt > c.maxReconnectAttempts {
|
||||||
if response.Error != "" {
|
return errors.Wrap(err, "reached maximum reconnect attempts")
|
||||||
wsc.ErrorsCh <- errors.Errorf(response.Error)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
wsc.ResultsCh <- *response.Result
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// this must be modified in the same go-routine that reads from the
|
|
||||||
// connection to avoid race conditions
|
|
||||||
wsc.Conn = nil
|
|
||||||
|
|
||||||
// Cleanup
|
|
||||||
close(wsc.ResultsCh)
|
|
||||||
close(wsc.ErrorsCh)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *WSClient) startReadWriteRoutines() {
|
||||||
|
c.wg.Add(2)
|
||||||
|
c.readRoutineQuit = make(chan struct{})
|
||||||
|
go c.readRoutine()
|
||||||
|
go c.writeRoutine()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *WSClient) processBacklog() error {
|
||||||
|
select {
|
||||||
|
case request := <-c.backlog:
|
||||||
|
if c.writeWait > 0 {
|
||||||
|
c.conn.SetWriteDeadline(time.Now().Add(c.writeWait))
|
||||||
|
}
|
||||||
|
err := c.conn.WriteJSON(request)
|
||||||
|
if err != nil {
|
||||||
|
c.Logger.Error("failed to resend request", "err", err)
|
||||||
|
c.reconnectAfter <- err
|
||||||
|
// requeue request
|
||||||
|
c.backlog <- request
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
c.Logger.Info("resend a request", "req", request)
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *WSClient) reconnectRoutine() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case originalError := <-c.reconnectAfter:
|
||||||
|
// wait until writeRoutine and readRoutine finish
|
||||||
|
c.wg.Wait()
|
||||||
|
err := c.reconnect()
|
||||||
|
if err != nil {
|
||||||
|
c.Logger.Error("failed to reconnect", "err", err, "original_err", originalError)
|
||||||
|
c.Stop()
|
||||||
|
return
|
||||||
|
} else {
|
||||||
|
// drain reconnectAfter
|
||||||
|
LOOP:
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-c.reconnectAfter:
|
||||||
|
default:
|
||||||
|
break LOOP
|
||||||
|
}
|
||||||
|
}
|
||||||
|
err = c.processBacklog()
|
||||||
|
if err == nil {
|
||||||
|
c.startReadWriteRoutines()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case <-c.Quit:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// The client ensures that there is at most one writer to a connection by
|
||||||
|
// executing all writes from this goroutine.
|
||||||
|
func (c *WSClient) writeRoutine() {
|
||||||
|
var ticker *time.Ticker
|
||||||
|
if c.pingPeriod > 0 {
|
||||||
|
// ticker with a predefined period
|
||||||
|
ticker = time.NewTicker(c.pingPeriod)
|
||||||
|
} else {
|
||||||
|
// ticker that never fires
|
||||||
|
ticker = &time.Ticker{C: make(<-chan time.Time)}
|
||||||
|
}
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
ticker.Stop()
|
||||||
|
c.conn.Close()
|
||||||
|
c.wg.Done()
|
||||||
|
}()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case request := <-c.send:
|
||||||
|
if c.writeWait > 0 {
|
||||||
|
c.conn.SetWriteDeadline(time.Now().Add(c.writeWait))
|
||||||
|
}
|
||||||
|
err := c.conn.WriteJSON(request)
|
||||||
|
if err != nil {
|
||||||
|
c.Logger.Error("failed to send request", "err", err)
|
||||||
|
c.reconnectAfter <- err
|
||||||
|
// add request to the backlog, so we don't lose it
|
||||||
|
c.backlog <- request
|
||||||
|
return
|
||||||
|
}
|
||||||
|
case <-ticker.C:
|
||||||
|
if c.writeWait > 0 {
|
||||||
|
c.conn.SetWriteDeadline(time.Now().Add(c.writeWait))
|
||||||
|
}
|
||||||
|
err := c.conn.WriteMessage(websocket.PingMessage, []byte{})
|
||||||
|
if err != nil {
|
||||||
|
c.Logger.Error("failed to write ping", "err", err)
|
||||||
|
c.reconnectAfter <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
c.mtx.Lock()
|
||||||
|
c.sentLastPingAt = time.Now()
|
||||||
|
c.mtx.Unlock()
|
||||||
|
c.Logger.Debug("sent ping")
|
||||||
|
case <-c.readRoutineQuit:
|
||||||
|
return
|
||||||
|
case <-c.Quit:
|
||||||
|
c.conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// The client ensures that there is at most one reader to a connection by
|
||||||
|
// executing all reads from this goroutine.
|
||||||
|
func (c *WSClient) readRoutine() {
|
||||||
|
defer func() {
|
||||||
|
c.conn.Close()
|
||||||
|
c.wg.Done()
|
||||||
|
}()
|
||||||
|
|
||||||
|
c.conn.SetPongHandler(func(string) error {
|
||||||
|
// gather latency stats
|
||||||
|
c.mtx.RLock()
|
||||||
|
t := c.sentLastPingAt
|
||||||
|
c.mtx.RUnlock()
|
||||||
|
c.PingPongLatencyTimer.UpdateSince(t)
|
||||||
|
|
||||||
|
c.Logger.Debug("got pong")
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
for {
|
||||||
|
// reset deadline for every message type (control or data)
|
||||||
|
if c.readWait > 0 {
|
||||||
|
c.conn.SetReadDeadline(time.Now().Add(c.readWait))
|
||||||
|
}
|
||||||
|
_, data, err := c.conn.ReadMessage()
|
||||||
|
if err != nil {
|
||||||
|
if !websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
c.Logger.Error("failed to read response", "err", err)
|
||||||
|
close(c.readRoutineQuit)
|
||||||
|
c.reconnectAfter <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var response types.RPCResponse
|
||||||
|
err = json.Unmarshal(data, &response)
|
||||||
|
if err != nil {
|
||||||
|
c.Logger.Error("failed to parse response", "err", err, "data", string(data))
|
||||||
|
c.ErrorsCh <- err
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if response.Error != "" {
|
||||||
|
c.ErrorsCh <- errors.Errorf(response.Error)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
c.Logger.Info("got response", "resp", response.Result)
|
||||||
|
c.ResultsCh <- *response.Result
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
///////////////////////////////////////////////////////////////////////////////
|
||||||
|
// Predefined methods
|
||||||
|
|
||||||
// Subscribe to an event. Note the server must have a "subscribe" route
|
// Subscribe to an event. Note the server must have a "subscribe" route
|
||||||
// defined.
|
// defined.
|
||||||
func (wsc *WSClient) Subscribe(eventid string) error {
|
func (c *WSClient) Subscribe(ctx context.Context, eventType string) error {
|
||||||
params := map[string]interface{}{"event": eventid}
|
params := map[string]interface{}{"event": eventType}
|
||||||
request, err := types.MapToRequest("", "subscribe", params)
|
return c.Call(ctx, "subscribe", params)
|
||||||
if err == nil {
|
|
||||||
err = wsc.WriteJSON(request)
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Unsubscribe from an event. Note the server must have a "unsubscribe" route
|
// Unsubscribe from an event. Note the server must have a "unsubscribe" route
|
||||||
// defined.
|
// defined.
|
||||||
func (wsc *WSClient) Unsubscribe(eventid string) error {
|
func (c *WSClient) Unsubscribe(ctx context.Context, eventType string) error {
|
||||||
params := map[string]interface{}{"event": eventid}
|
params := map[string]interface{}{"event": eventType}
|
||||||
request, err := types.MapToRequest("", "unsubscribe", params)
|
return c.Call(ctx, "unsubscribe", params)
|
||||||
if err == nil {
|
|
||||||
err = wsc.WriteJSON(request)
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Call asynchronously calls a given method by sending an RPCRequest to the
|
// UnsubscribeAll from all. Note the server must have a "unsubscribe_all" route
|
||||||
// server. Results will be available on ResultsCh, errors, if any, on ErrorsCh.
|
// defined.
|
||||||
func (wsc *WSClient) Call(method string, params map[string]interface{}) error {
|
func (c *WSClient) UnsubscribeAll(ctx context.Context) error {
|
||||||
request, err := types.MapToRequest("", method, params)
|
params := map[string]interface{}{}
|
||||||
if err == nil {
|
return c.Call(ctx, "unsubscribe_all", params)
|
||||||
err = wsc.WriteJSON(request)
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
193
rpc/lib/client/ws_client_test.go
Normal file
193
rpc/lib/client/ws_client_test.go
Normal file
@ -0,0 +1,193 @@
|
|||||||
|
package rpcclient
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"net"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/gorilla/websocket"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"github.com/tendermint/tmlibs/log"
|
||||||
|
|
||||||
|
types "github.com/tendermint/tendermint/rpc/lib/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
type myHandler struct {
|
||||||
|
closeConnAfterRead bool
|
||||||
|
mtx sync.RWMutex
|
||||||
|
}
|
||||||
|
|
||||||
|
var upgrader = websocket.Upgrader{
|
||||||
|
ReadBufferSize: 1024,
|
||||||
|
WriteBufferSize: 1024,
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *myHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
|
conn, err := upgrader.Upgrade(w, r, nil)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
defer conn.Close()
|
||||||
|
for {
|
||||||
|
messageType, _, err := conn.ReadMessage()
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
h.mtx.RLock()
|
||||||
|
if h.closeConnAfterRead {
|
||||||
|
conn.Close()
|
||||||
|
}
|
||||||
|
h.mtx.RUnlock()
|
||||||
|
|
||||||
|
res := json.RawMessage(`{}`)
|
||||||
|
emptyRespBytes, _ := json.Marshal(types.RPCResponse{Result: &res})
|
||||||
|
if err := conn.WriteMessage(messageType, emptyRespBytes); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWSClientReconnectsAfterReadFailure(t *testing.T) {
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
// start server
|
||||||
|
h := &myHandler{}
|
||||||
|
s := httptest.NewServer(h)
|
||||||
|
defer s.Close()
|
||||||
|
|
||||||
|
c := startClient(t, s.Listener.Addr())
|
||||||
|
defer c.Stop()
|
||||||
|
|
||||||
|
wg.Add(1)
|
||||||
|
go callWgDoneOnResult(t, c, &wg)
|
||||||
|
|
||||||
|
h.mtx.Lock()
|
||||||
|
h.closeConnAfterRead = true
|
||||||
|
h.mtx.Unlock()
|
||||||
|
|
||||||
|
// results in WS read error, no send retry because write succeeded
|
||||||
|
call(t, "a", c)
|
||||||
|
|
||||||
|
// expect to reconnect almost immediately
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
h.mtx.Lock()
|
||||||
|
h.closeConnAfterRead = false
|
||||||
|
h.mtx.Unlock()
|
||||||
|
|
||||||
|
// should succeed
|
||||||
|
call(t, "b", c)
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWSClientReconnectsAfterWriteFailure(t *testing.T) {
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
// start server
|
||||||
|
h := &myHandler{}
|
||||||
|
s := httptest.NewServer(h)
|
||||||
|
|
||||||
|
c := startClient(t, s.Listener.Addr())
|
||||||
|
defer c.Stop()
|
||||||
|
|
||||||
|
wg.Add(2)
|
||||||
|
go callWgDoneOnResult(t, c, &wg)
|
||||||
|
|
||||||
|
// hacky way to abort the connection before write
|
||||||
|
c.conn.Close()
|
||||||
|
|
||||||
|
// results in WS write error, the client should resend on reconnect
|
||||||
|
call(t, "a", c)
|
||||||
|
|
||||||
|
// expect to reconnect almost immediately
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
|
||||||
|
// should succeed
|
||||||
|
call(t, "b", c)
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWSClientReconnectFailure(t *testing.T) {
|
||||||
|
// start server
|
||||||
|
h := &myHandler{}
|
||||||
|
s := httptest.NewServer(h)
|
||||||
|
|
||||||
|
c := startClient(t, s.Listener.Addr())
|
||||||
|
defer c.Stop()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-c.ResultsCh:
|
||||||
|
case <-c.ErrorsCh:
|
||||||
|
case <-c.Quit:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// hacky way to abort the connection before write
|
||||||
|
c.conn.Close()
|
||||||
|
s.Close()
|
||||||
|
|
||||||
|
// results in WS write error
|
||||||
|
// provide timeout to avoid blocking
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||||
|
defer cancel()
|
||||||
|
c.Call(ctx, "a", make(map[string]interface{}))
|
||||||
|
|
||||||
|
// expect to reconnect almost immediately
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
// client should block on this
|
||||||
|
call(t, "b", c)
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// test that client blocks on the second send
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
t.Fatal("client should block on calling 'b' during reconnect")
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
t.Log("All good")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func startClient(t *testing.T, addr net.Addr) *WSClient {
|
||||||
|
c := NewWSClient(addr.String(), "/websocket")
|
||||||
|
_, err := c.Start()
|
||||||
|
require.Nil(t, err)
|
||||||
|
c.SetLogger(log.TestingLogger())
|
||||||
|
return c
|
||||||
|
}
|
||||||
|
|
||||||
|
func call(t *testing.T, method string, c *WSClient) {
|
||||||
|
err := c.Call(context.Background(), method, make(map[string]interface{}))
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func callWgDoneOnResult(t *testing.T, c *WSClient, wg *sync.WaitGroup) {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case res := <-c.ResultsCh:
|
||||||
|
if res != nil {
|
||||||
|
wg.Done()
|
||||||
|
}
|
||||||
|
case err := <-c.ErrorsCh:
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
case <-c.Quit:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -2,15 +2,18 @@ package rpc
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"context"
|
||||||
crand "crypto/rand"
|
crand "crypto/rand"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"os"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/go-kit/kit/log/term"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"github.com/tendermint/go-wire/data"
|
"github.com/tendermint/go-wire/data"
|
||||||
@ -75,8 +78,29 @@ func EchoDataBytesResult(v data.Bytes) (*ResultEchoDataBytes, error) {
|
|||||||
return &ResultEchoDataBytes{v}, nil
|
return &ResultEchoDataBytes{v}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestMain(m *testing.M) {
|
||||||
|
setup()
|
||||||
|
code := m.Run()
|
||||||
|
os.Exit(code)
|
||||||
|
}
|
||||||
|
|
||||||
|
var colorFn = func(keyvals ...interface{}) term.FgBgColor {
|
||||||
|
for i := 0; i < len(keyvals)-1; i += 2 {
|
||||||
|
if keyvals[i] == "socket" {
|
||||||
|
if keyvals[i+1] == "tcp" {
|
||||||
|
return term.FgBgColor{Fg: term.DarkBlue}
|
||||||
|
} else if keyvals[i+1] == "unix" {
|
||||||
|
return term.FgBgColor{Fg: term.DarkCyan}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return term.FgBgColor{}
|
||||||
|
}
|
||||||
|
|
||||||
// launch unix and tcp servers
|
// launch unix and tcp servers
|
||||||
func init() {
|
func setup() {
|
||||||
|
logger := log.NewTMLoggerWithColorFn(log.NewSyncWriter(os.Stdout), colorFn)
|
||||||
|
|
||||||
cmd := exec.Command("rm", "-f", unixSocket)
|
cmd := exec.Command("rm", "-f", unixSocket)
|
||||||
err := cmd.Start()
|
err := cmd.Start()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -86,25 +110,27 @@ func init() {
|
|||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tcpLogger := logger.With("socket", "tcp")
|
||||||
mux := http.NewServeMux()
|
mux := http.NewServeMux()
|
||||||
server.RegisterRPCFuncs(mux, Routes, log.TestingLogger())
|
server.RegisterRPCFuncs(mux, Routes, tcpLogger)
|
||||||
wm := server.NewWebsocketManager(Routes, nil)
|
wm := server.NewWebsocketManager(Routes, nil, server.ReadWait(5*time.Second), server.PingPeriod(1*time.Second))
|
||||||
wm.SetLogger(log.TestingLogger())
|
wm.SetLogger(tcpLogger)
|
||||||
mux.HandleFunc(websocketEndpoint, wm.WebsocketHandler)
|
mux.HandleFunc(websocketEndpoint, wm.WebsocketHandler)
|
||||||
go func() {
|
go func() {
|
||||||
_, err := server.StartHTTPServer(tcpAddr, mux, log.TestingLogger())
|
_, err := server.StartHTTPServer(tcpAddr, mux, tcpLogger)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
unixLogger := logger.With("socket", "unix")
|
||||||
mux2 := http.NewServeMux()
|
mux2 := http.NewServeMux()
|
||||||
server.RegisterRPCFuncs(mux2, Routes, log.TestingLogger())
|
server.RegisterRPCFuncs(mux2, Routes, unixLogger)
|
||||||
wm = server.NewWebsocketManager(Routes, nil)
|
wm = server.NewWebsocketManager(Routes, nil)
|
||||||
wm.SetLogger(log.TestingLogger())
|
wm.SetLogger(unixLogger)
|
||||||
mux2.HandleFunc(websocketEndpoint, wm.WebsocketHandler)
|
mux2.HandleFunc(websocketEndpoint, wm.WebsocketHandler)
|
||||||
go func() {
|
go func() {
|
||||||
_, err := server.StartHTTPServer(unixAddr, mux2, log.TestingLogger())
|
_, err := server.StartHTTPServer(unixAddr, mux2, unixLogger)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
@ -184,7 +210,7 @@ func echoViaWS(cl *client.WSClient, val string) (string, error) {
|
|||||||
params := map[string]interface{}{
|
params := map[string]interface{}{
|
||||||
"arg": val,
|
"arg": val,
|
||||||
}
|
}
|
||||||
err := cl.Call("echo", params)
|
err := cl.Call(context.Background(), "echo", params)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
@ -206,7 +232,7 @@ func echoBytesViaWS(cl *client.WSClient, bytes []byte) ([]byte, error) {
|
|||||||
params := map[string]interface{}{
|
params := map[string]interface{}{
|
||||||
"arg": bytes,
|
"arg": bytes,
|
||||||
}
|
}
|
||||||
err := cl.Call("echo_bytes", params)
|
err := cl.Call(context.Background(), "echo_bytes", params)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return []byte{}, err
|
return []byte{}, err
|
||||||
}
|
}
|
||||||
@ -250,6 +276,7 @@ func TestServersAndClientsBasic(t *testing.T) {
|
|||||||
testWithHTTPClient(t, cl2)
|
testWithHTTPClient(t, cl2)
|
||||||
|
|
||||||
cl3 := client.NewWSClient(addr, websocketEndpoint)
|
cl3 := client.NewWSClient(addr, websocketEndpoint)
|
||||||
|
cl3.SetLogger(log.TestingLogger())
|
||||||
_, err := cl3.Start()
|
_, err := cl3.Start()
|
||||||
require.Nil(t, err)
|
require.Nil(t, err)
|
||||||
fmt.Printf("=== testing server on %s using %v client", addr, cl3)
|
fmt.Printf("=== testing server on %s using %v client", addr, cl3)
|
||||||
@ -278,6 +305,7 @@ func TestQuotedStringArg(t *testing.T) {
|
|||||||
|
|
||||||
func TestWSNewWSRPCFunc(t *testing.T) {
|
func TestWSNewWSRPCFunc(t *testing.T) {
|
||||||
cl := client.NewWSClient(tcpAddr, websocketEndpoint)
|
cl := client.NewWSClient(tcpAddr, websocketEndpoint)
|
||||||
|
cl.SetLogger(log.TestingLogger())
|
||||||
_, err := cl.Start()
|
_, err := cl.Start()
|
||||||
require.Nil(t, err)
|
require.Nil(t, err)
|
||||||
defer cl.Stop()
|
defer cl.Stop()
|
||||||
@ -286,7 +314,7 @@ func TestWSNewWSRPCFunc(t *testing.T) {
|
|||||||
params := map[string]interface{}{
|
params := map[string]interface{}{
|
||||||
"arg": val,
|
"arg": val,
|
||||||
}
|
}
|
||||||
err = cl.Call("echo_ws", params)
|
err = cl.Call(context.Background(), "echo_ws", params)
|
||||||
require.Nil(t, err)
|
require.Nil(t, err)
|
||||||
|
|
||||||
select {
|
select {
|
||||||
@ -303,15 +331,14 @@ func TestWSNewWSRPCFunc(t *testing.T) {
|
|||||||
|
|
||||||
func TestWSHandlesArrayParams(t *testing.T) {
|
func TestWSHandlesArrayParams(t *testing.T) {
|
||||||
cl := client.NewWSClient(tcpAddr, websocketEndpoint)
|
cl := client.NewWSClient(tcpAddr, websocketEndpoint)
|
||||||
|
cl.SetLogger(log.TestingLogger())
|
||||||
_, err := cl.Start()
|
_, err := cl.Start()
|
||||||
require.Nil(t, err)
|
require.Nil(t, err)
|
||||||
defer cl.Stop()
|
defer cl.Stop()
|
||||||
|
|
||||||
val := "acbd"
|
val := "acbd"
|
||||||
params := []interface{}{val}
|
params := []interface{}{val}
|
||||||
request, err := types.ArrayToRequest("", "echo_ws", params)
|
err = cl.CallWithArrayParams(context.Background(), "echo_ws", params)
|
||||||
require.Nil(t, err)
|
|
||||||
err = cl.WriteJSON(request)
|
|
||||||
require.Nil(t, err)
|
require.Nil(t, err)
|
||||||
|
|
||||||
select {
|
select {
|
||||||
@ -326,6 +353,18 @@ func TestWSHandlesArrayParams(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestWSClientPingPong checks that a client & server exchange pings
|
||||||
|
// & pongs so connection stays alive.
|
||||||
|
func TestWSClientPingPong(t *testing.T) {
|
||||||
|
cl := client.NewWSClient(tcpAddr, websocketEndpoint)
|
||||||
|
cl.SetLogger(log.TestingLogger())
|
||||||
|
_, err := cl.Start()
|
||||||
|
require.Nil(t, err)
|
||||||
|
defer cl.Stop()
|
||||||
|
|
||||||
|
time.Sleep(3 * time.Second)
|
||||||
|
}
|
||||||
|
|
||||||
func randBytes(t *testing.T) []byte {
|
func randBytes(t *testing.T) []byte {
|
||||||
n := rand.Intn(10) + 2
|
n := rand.Intn(10) + 2
|
||||||
buf := make([]byte, n)
|
buf := make([]byte, n)
|
||||||
|
@ -21,7 +21,7 @@ import (
|
|||||||
"github.com/tendermint/tmlibs/log"
|
"github.com/tendermint/tmlibs/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Adds a route for each function in the funcMap, as well as general jsonrpc and websocket handlers for all functions.
|
// RegisterRPCFuncs adds a route for each function in the funcMap, as well as general jsonrpc and websocket handlers for all functions.
|
||||||
// "result" is the interface on which the result objects are registered, and is popualted with every RPCResponse
|
// "result" is the interface on which the result objects are registered, and is popualted with every RPCResponse
|
||||||
func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc, logger log.Logger) {
|
func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc, logger log.Logger) {
|
||||||
// HTTP endpoints
|
// HTTP endpoints
|
||||||
@ -36,7 +36,7 @@ func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc, logger lo
|
|||||||
//-------------------------------------
|
//-------------------------------------
|
||||||
// function introspection
|
// function introspection
|
||||||
|
|
||||||
// holds all type information for each function
|
// RPCFunc contains the introspected type information for a function
|
||||||
type RPCFunc struct {
|
type RPCFunc struct {
|
||||||
f reflect.Value // underlying rpc function
|
f reflect.Value // underlying rpc function
|
||||||
args []reflect.Type // type of each function arg
|
args []reflect.Type // type of each function arg
|
||||||
@ -45,12 +45,13 @@ type RPCFunc struct {
|
|||||||
ws bool // websocket only
|
ws bool // websocket only
|
||||||
}
|
}
|
||||||
|
|
||||||
// wraps a function for quicker introspection
|
// NewRPCFunc wraps a function for introspection.
|
||||||
// f is the function, args are comma separated argument names
|
// f is the function, args are comma separated argument names
|
||||||
func NewRPCFunc(f interface{}, args string) *RPCFunc {
|
func NewRPCFunc(f interface{}, args string) *RPCFunc {
|
||||||
return newRPCFunc(f, args, false)
|
return newRPCFunc(f, args, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewWSRPCFunc wraps a function for introspection and use in the websockets.
|
||||||
func NewWSRPCFunc(f interface{}, args string) *RPCFunc {
|
func NewWSRPCFunc(f interface{}, args string) *RPCFunc {
|
||||||
return newRPCFunc(f, args, true)
|
return newRPCFunc(f, args, true)
|
||||||
}
|
}
|
||||||
@ -337,10 +338,10 @@ func nonJsonToArg(ty reflect.Type, arg string) (reflect.Value, error, bool) {
|
|||||||
// rpc.websocket
|
// rpc.websocket
|
||||||
|
|
||||||
const (
|
const (
|
||||||
writeChanCapacity = 1000
|
defaultWSWriteChanCapacity = 1000
|
||||||
wsWriteTimeoutSeconds = 30 // each write times out after this.
|
defaultWSWriteWait = 10 * time.Second
|
||||||
wsReadTimeoutSeconds = 30 // connection times out if we haven't received *anything* in this long, not even pings.
|
defaultWSReadWait = 30 * time.Second
|
||||||
wsPingTickerSeconds = 10 // send a ping every PingTickerSeconds.
|
defaultWSPingPeriod = (defaultWSReadWait * 9) / 10
|
||||||
)
|
)
|
||||||
|
|
||||||
// a single websocket connection
|
// a single websocket connection
|
||||||
@ -349,95 +350,116 @@ const (
|
|||||||
type wsConnection struct {
|
type wsConnection struct {
|
||||||
cmn.BaseService
|
cmn.BaseService
|
||||||
|
|
||||||
remoteAddr string
|
remoteAddr string
|
||||||
baseConn *websocket.Conn
|
baseConn *websocket.Conn
|
||||||
writeChan chan types.RPCResponse
|
writeChan chan types.RPCResponse
|
||||||
readTimeout *time.Timer
|
|
||||||
pingTicker *time.Ticker
|
|
||||||
|
|
||||||
funcMap map[string]*RPCFunc
|
funcMap map[string]*RPCFunc
|
||||||
evsw events.EventSwitch
|
evsw events.EventSwitch
|
||||||
|
|
||||||
|
// write channel capacity
|
||||||
|
writeChanCapacity int
|
||||||
|
|
||||||
|
// each write times out after this.
|
||||||
|
writeWait time.Duration
|
||||||
|
|
||||||
|
// Connection times out if we haven't received *anything* in this long, not even pings.
|
||||||
|
readWait time.Duration
|
||||||
|
|
||||||
|
// Send pings to server with this period. Must be less than readWait, but greater than zero.
|
||||||
|
pingPeriod time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
// new websocket connection wrapper
|
// NewWSConnection wraps websocket.Conn. See the commentary on the
|
||||||
func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, evsw events.EventSwitch) *wsConnection {
|
// func(*wsConnection) functions for a detailed description of how to configure
|
||||||
|
// ping period and pong wait time.
|
||||||
|
// NOTE: if the write buffer is full, pongs may be dropped, which may cause clients to disconnect.
|
||||||
|
// see https://github.com/gorilla/websocket/issues/97
|
||||||
|
func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, evsw events.EventSwitch, options ...func(*wsConnection)) *wsConnection {
|
||||||
wsc := &wsConnection{
|
wsc := &wsConnection{
|
||||||
remoteAddr: baseConn.RemoteAddr().String(),
|
remoteAddr: baseConn.RemoteAddr().String(),
|
||||||
baseConn: baseConn,
|
baseConn: baseConn,
|
||||||
writeChan: make(chan types.RPCResponse, writeChanCapacity), // error when full.
|
funcMap: funcMap,
|
||||||
funcMap: funcMap,
|
evsw: evsw,
|
||||||
evsw: evsw,
|
writeWait: defaultWSWriteWait,
|
||||||
|
writeChanCapacity: defaultWSWriteChanCapacity,
|
||||||
|
readWait: defaultWSReadWait,
|
||||||
|
pingPeriod: defaultWSPingPeriod,
|
||||||
|
}
|
||||||
|
for _, option := range options {
|
||||||
|
option(wsc)
|
||||||
}
|
}
|
||||||
wsc.BaseService = *cmn.NewBaseService(nil, "wsConnection", wsc)
|
wsc.BaseService = *cmn.NewBaseService(nil, "wsConnection", wsc)
|
||||||
return wsc
|
return wsc
|
||||||
}
|
}
|
||||||
|
|
||||||
// wsc.Start() blocks until the connection closes.
|
// WriteWait sets the amount of time to wait before a websocket write times out.
|
||||||
func (wsc *wsConnection) OnStart() error {
|
// It should only be used in the constructor - not Goroutine-safe.
|
||||||
wsc.BaseService.OnStart()
|
func WriteWait(writeWait time.Duration) func(*wsConnection) {
|
||||||
|
return func(wsc *wsConnection) {
|
||||||
|
wsc.writeWait = writeWait
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// these must be set before the readRoutine is created, as it may
|
// WriteChanCapacity sets the capacity of the websocket write channel.
|
||||||
// call wsc.Stop(), which accesses these timers
|
// It should only be used in the constructor - not Goroutine-safe.
|
||||||
wsc.readTimeout = time.NewTimer(time.Second * wsReadTimeoutSeconds)
|
func WriteChanCapacity(cap int) func(*wsConnection) {
|
||||||
wsc.pingTicker = time.NewTicker(time.Second * wsPingTickerSeconds)
|
return func(wsc *wsConnection) {
|
||||||
|
wsc.writeChanCapacity = cap
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReadWait sets the amount of time to wait before a websocket read times out.
|
||||||
|
// It should only be used in the constructor - not Goroutine-safe.
|
||||||
|
func ReadWait(readWait time.Duration) func(*wsConnection) {
|
||||||
|
return func(wsc *wsConnection) {
|
||||||
|
wsc.readWait = readWait
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// PingPeriod sets the duration for sending websocket pings.
|
||||||
|
// It should only be used in the constructor - not Goroutine-safe.
|
||||||
|
func PingPeriod(pingPeriod time.Duration) func(*wsConnection) {
|
||||||
|
return func(wsc *wsConnection) {
|
||||||
|
wsc.pingPeriod = pingPeriod
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// OnStart starts the read and write routines. It blocks until the connection closes.
|
||||||
|
func (wsc *wsConnection) OnStart() error {
|
||||||
|
wsc.writeChan = make(chan types.RPCResponse, wsc.writeChanCapacity)
|
||||||
|
|
||||||
// Read subscriptions/unsubscriptions to events
|
// Read subscriptions/unsubscriptions to events
|
||||||
go wsc.readRoutine()
|
go wsc.readRoutine()
|
||||||
|
|
||||||
// Custom Ping handler to touch readTimeout
|
|
||||||
wsc.baseConn.SetPingHandler(func(m string) error {
|
|
||||||
// NOTE: https://github.com/gorilla/websocket/issues/97
|
|
||||||
go wsc.baseConn.WriteControl(websocket.PongMessage, []byte(m), time.Now().Add(time.Second*wsWriteTimeoutSeconds))
|
|
||||||
wsc.readTimeout.Reset(time.Second * wsReadTimeoutSeconds)
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
wsc.baseConn.SetPongHandler(func(m string) error {
|
|
||||||
// NOTE: https://github.com/gorilla/websocket/issues/97
|
|
||||||
wsc.readTimeout.Reset(time.Second * wsReadTimeoutSeconds)
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
go wsc.readTimeoutRoutine()
|
|
||||||
|
|
||||||
// Write responses, BLOCKING.
|
// Write responses, BLOCKING.
|
||||||
wsc.writeRoutine()
|
wsc.writeRoutine()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// OnStop unsubscribes from all events.
|
||||||
func (wsc *wsConnection) OnStop() {
|
func (wsc *wsConnection) OnStop() {
|
||||||
wsc.BaseService.OnStop()
|
|
||||||
if wsc.evsw != nil {
|
if wsc.evsw != nil {
|
||||||
wsc.evsw.RemoveListener(wsc.remoteAddr)
|
wsc.evsw.RemoveListener(wsc.remoteAddr)
|
||||||
}
|
}
|
||||||
wsc.readTimeout.Stop()
|
// Both read and write loops close the websocket connection when they exit their loops.
|
||||||
wsc.pingTicker.Stop()
|
// The writeChan is never closed, to allow WriteRPCResponse() to fail.
|
||||||
// The write loop closes the websocket connection
|
|
||||||
// when it exits its loop, and the read loop
|
|
||||||
// closes the writeChan
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wsc *wsConnection) readTimeoutRoutine() {
|
// GetRemoteAddr returns the remote address of the underlying connection.
|
||||||
select {
|
// It implements WSRPCConnection
|
||||||
case <-wsc.readTimeout.C:
|
|
||||||
wsc.Logger.Info("Stopping connection due to read timeout")
|
|
||||||
wsc.Stop()
|
|
||||||
case <-wsc.Quit:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Implements WSRPCConnection
|
|
||||||
func (wsc *wsConnection) GetRemoteAddr() string {
|
func (wsc *wsConnection) GetRemoteAddr() string {
|
||||||
return wsc.remoteAddr
|
return wsc.remoteAddr
|
||||||
}
|
}
|
||||||
|
|
||||||
// Implements WSRPCConnection
|
// GetEventSwitch returns the event switch.
|
||||||
|
// It implements WSRPCConnection
|
||||||
func (wsc *wsConnection) GetEventSwitch() events.EventSwitch {
|
func (wsc *wsConnection) GetEventSwitch() events.EventSwitch {
|
||||||
return wsc.evsw
|
return wsc.evsw
|
||||||
}
|
}
|
||||||
|
|
||||||
// Implements WSRPCConnection
|
// WriteRPCResponse pushes a response to the writeChan, and blocks until it is accepted.
|
||||||
// Blocking write to writeChan until service stops.
|
// It implements WSRPCConnection. It is Goroutine-safe.
|
||||||
// Goroutine-safe
|
|
||||||
func (wsc *wsConnection) WriteRPCResponse(resp types.RPCResponse) {
|
func (wsc *wsConnection) WriteRPCResponse(resp types.RPCResponse) {
|
||||||
select {
|
select {
|
||||||
case <-wsc.Quit:
|
case <-wsc.Quit:
|
||||||
@ -446,9 +468,8 @@ func (wsc *wsConnection) WriteRPCResponse(resp types.RPCResponse) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Implements WSRPCConnection
|
// TryWriteRPCResponse attempts to push a response to the writeChan, but does not block.
|
||||||
// Nonblocking write.
|
// It implements WSRPCConnection. It is Goroutine-safe
|
||||||
// Goroutine-safe
|
|
||||||
func (wsc *wsConnection) TryWriteRPCResponse(resp types.RPCResponse) bool {
|
func (wsc *wsConnection) TryWriteRPCResponse(resp types.RPCResponse) bool {
|
||||||
select {
|
select {
|
||||||
case <-wsc.Quit:
|
case <-wsc.Quit:
|
||||||
@ -462,27 +483,29 @@ func (wsc *wsConnection) TryWriteRPCResponse(resp types.RPCResponse) bool {
|
|||||||
|
|
||||||
// Read from the socket and subscribe to or unsubscribe from events
|
// Read from the socket and subscribe to or unsubscribe from events
|
||||||
func (wsc *wsConnection) readRoutine() {
|
func (wsc *wsConnection) readRoutine() {
|
||||||
// Do not close writeChan, to allow WriteRPCResponse() to fail.
|
defer func() {
|
||||||
// defer close(wsc.writeChan)
|
wsc.baseConn.Close()
|
||||||
|
}()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-wsc.Quit:
|
case <-wsc.Quit:
|
||||||
return
|
return
|
||||||
default:
|
default:
|
||||||
|
// reset deadline for every type of message (control or data)
|
||||||
|
wsc.baseConn.SetReadDeadline(time.Now().Add(wsc.readWait))
|
||||||
var in []byte
|
var in []byte
|
||||||
// Do not set a deadline here like below:
|
|
||||||
// wsc.baseConn.SetReadDeadline(time.Now().Add(time.Second * wsReadTimeoutSeconds))
|
|
||||||
// The client may not send anything for a while.
|
|
||||||
// We use `readTimeout` to handle read timeouts.
|
|
||||||
_, in, err := wsc.baseConn.ReadMessage()
|
_, in, err := wsc.baseConn.ReadMessage()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
wsc.Logger.Info("Failed to read from connection", "remote", wsc.remoteAddr, "err", err.Error())
|
if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
|
||||||
// an error reading the connection,
|
wsc.Logger.Info("Client closed the connection")
|
||||||
// kill the connection
|
} else {
|
||||||
|
wsc.Logger.Error("Failed to read request", "err", err)
|
||||||
|
}
|
||||||
wsc.Stop()
|
wsc.Stop()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
var request types.RPCRequest
|
var request types.RPCRequest
|
||||||
err = json.Unmarshal(in, &request)
|
err = json.Unmarshal(in, &request)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -529,15 +552,33 @@ func (wsc *wsConnection) readRoutine() {
|
|||||||
|
|
||||||
// receives on a write channel and writes out on the socket
|
// receives on a write channel and writes out on the socket
|
||||||
func (wsc *wsConnection) writeRoutine() {
|
func (wsc *wsConnection) writeRoutine() {
|
||||||
defer wsc.baseConn.Close()
|
pingTicker := time.NewTicker(wsc.pingPeriod)
|
||||||
|
defer func() {
|
||||||
|
pingTicker.Stop()
|
||||||
|
wsc.baseConn.Close()
|
||||||
|
}()
|
||||||
|
|
||||||
|
// https://github.com/gorilla/websocket/issues/97
|
||||||
|
pongs := make(chan string, 1)
|
||||||
|
wsc.baseConn.SetPingHandler(func(m string) error {
|
||||||
|
select {
|
||||||
|
case pongs <- m:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-wsc.Quit:
|
case m := <-pongs:
|
||||||
return
|
err := wsc.writeMessageWithDeadline(websocket.PongMessage, []byte(m))
|
||||||
case <-wsc.pingTicker.C:
|
if err != nil {
|
||||||
|
wsc.Logger.Info("Failed to write pong (client may disconnect)", "err", err)
|
||||||
|
}
|
||||||
|
case <-pingTicker.C:
|
||||||
err := wsc.writeMessageWithDeadline(websocket.PingMessage, []byte{})
|
err := wsc.writeMessageWithDeadline(websocket.PingMessage, []byte{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
wsc.Logger.Error("Failed to write ping message on websocket", "err", err)
|
wsc.Logger.Error("Failed to write ping", "err", err)
|
||||||
wsc.Stop()
|
wsc.Stop()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -547,11 +588,13 @@ func (wsc *wsConnection) writeRoutine() {
|
|||||||
wsc.Logger.Error("Failed to marshal RPCResponse to JSON", "err", err)
|
wsc.Logger.Error("Failed to marshal RPCResponse to JSON", "err", err)
|
||||||
} else {
|
} else {
|
||||||
if err = wsc.writeMessageWithDeadline(websocket.TextMessage, jsonBytes); err != nil {
|
if err = wsc.writeMessageWithDeadline(websocket.TextMessage, jsonBytes); err != nil {
|
||||||
wsc.Logger.Error("Failed to write response on websocket", "err", err)
|
wsc.Logger.Error("Failed to write response", "err", err)
|
||||||
wsc.Stop()
|
wsc.Stop()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
case <-wsc.Quit:
|
||||||
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -559,43 +602,46 @@ func (wsc *wsConnection) writeRoutine() {
|
|||||||
// All writes to the websocket must (re)set the write deadline.
|
// All writes to the websocket must (re)set the write deadline.
|
||||||
// If some writes don't set it while others do, they may timeout incorrectly (https://github.com/tendermint/tendermint/issues/553)
|
// If some writes don't set it while others do, they may timeout incorrectly (https://github.com/tendermint/tendermint/issues/553)
|
||||||
func (wsc *wsConnection) writeMessageWithDeadline(msgType int, msg []byte) error {
|
func (wsc *wsConnection) writeMessageWithDeadline(msgType int, msg []byte) error {
|
||||||
wsc.baseConn.SetWriteDeadline(time.Now().Add(time.Second * wsWriteTimeoutSeconds))
|
wsc.baseConn.SetWriteDeadline(time.Now().Add(wsc.writeWait))
|
||||||
return wsc.baseConn.WriteMessage(msgType, msg)
|
return wsc.baseConn.WriteMessage(msgType, msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
//----------------------------------------
|
//----------------------------------------
|
||||||
|
|
||||||
// Main manager for all websocket connections
|
// WebsocketManager is the main manager for all websocket connections.
|
||||||
// Holds the event switch
|
// It holds the event switch and a map of functions for routing.
|
||||||
// NOTE: The websocket path is defined externally, e.g. in node/node.go
|
// NOTE: The websocket path is defined externally, e.g. in node/node.go
|
||||||
type WebsocketManager struct {
|
type WebsocketManager struct {
|
||||||
websocket.Upgrader
|
websocket.Upgrader
|
||||||
funcMap map[string]*RPCFunc
|
funcMap map[string]*RPCFunc
|
||||||
evsw events.EventSwitch
|
evsw events.EventSwitch
|
||||||
logger log.Logger
|
logger log.Logger
|
||||||
|
wsConnOptions []func(*wsConnection)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewWebsocketManager(funcMap map[string]*RPCFunc, evsw events.EventSwitch) *WebsocketManager {
|
// NewWebsocketManager returns a new WebsocketManager that routes according to the given funcMap, listens on the given event switch,
|
||||||
|
// and connects to the server with the given connection options.
|
||||||
|
func NewWebsocketManager(funcMap map[string]*RPCFunc, evsw events.EventSwitch, wsConnOptions ...func(*wsConnection)) *WebsocketManager {
|
||||||
return &WebsocketManager{
|
return &WebsocketManager{
|
||||||
funcMap: funcMap,
|
funcMap: funcMap,
|
||||||
evsw: evsw,
|
evsw: evsw,
|
||||||
Upgrader: websocket.Upgrader{
|
Upgrader: websocket.Upgrader{
|
||||||
ReadBufferSize: 1024,
|
|
||||||
WriteBufferSize: 1024,
|
|
||||||
CheckOrigin: func(r *http.Request) bool {
|
CheckOrigin: func(r *http.Request) bool {
|
||||||
// TODO
|
// TODO ???
|
||||||
return true
|
return true
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
logger: log.NewNopLogger(),
|
logger: log.NewNopLogger(),
|
||||||
|
wsConnOptions: wsConnOptions,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetLogger sets the logger.
|
||||||
func (wm *WebsocketManager) SetLogger(l log.Logger) {
|
func (wm *WebsocketManager) SetLogger(l log.Logger) {
|
||||||
wm.logger = l
|
wm.logger = l
|
||||||
}
|
}
|
||||||
|
|
||||||
// Upgrade the request/response (via http.Hijack) and starts the wsConnection.
|
// WebsocketHandler upgrades the request/response (via http.Hijack) and starts the wsConnection.
|
||||||
func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Request) {
|
func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
wsConn, err := wm.Upgrade(w, r, nil)
|
wsConn, err := wm.Upgrade(w, r, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -605,8 +651,8 @@ func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Requ
|
|||||||
}
|
}
|
||||||
|
|
||||||
// register connection
|
// register connection
|
||||||
con := NewWSConnection(wsConn, wm.funcMap, wm.evsw)
|
con := NewWSConnection(wsConn, wm.funcMap, wm.evsw, wm.wsConnOptions...)
|
||||||
con.SetLogger(wm.logger)
|
con.SetLogger(wm.logger.With("remote", wsConn.RemoteAddr()))
|
||||||
wm.logger.Info("New websocket connection", "remote", con.remoteAddr)
|
wm.logger.Info("New websocket connection", "remote", con.remoteAddr)
|
||||||
con.Start() // Blocking
|
con.Start() // Blocking
|
||||||
}
|
}
|
||||||
|
@ -2,6 +2,7 @@ package rpctypes
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
events "github.com/tendermint/tmlibs/events"
|
events "github.com/tendermint/tmlibs/events"
|
||||||
@ -23,6 +24,10 @@ func NewRPCRequest(id string, method string, params json.RawMessage) RPCRequest
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (req RPCRequest) String() string {
|
||||||
|
return fmt.Sprintf("[%s %s]", req.ID, req.Method)
|
||||||
|
}
|
||||||
|
|
||||||
func MapToRequest(id string, method string, params map[string]interface{}) (RPCRequest, error) {
|
func MapToRequest(id string, method string, params map[string]interface{}) (RPCRequest, error) {
|
||||||
payload, err := json.Marshal(params)
|
payload, err := json.Marshal(params)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -70,6 +75,14 @@ func NewRPCResponse(id string, res interface{}, err string) RPCResponse {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (resp RPCResponse) String() string {
|
||||||
|
if resp.Error == "" {
|
||||||
|
return fmt.Sprintf("[%s %v]", resp.ID, resp.Result)
|
||||||
|
} else {
|
||||||
|
return fmt.Sprintf("[%s %s]", resp.ID, resp.Error)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
//----------------------------------------
|
//----------------------------------------
|
||||||
|
|
||||||
// *wsConnection implements this interface.
|
// *wsConnection implements this interface.
|
||||||
|
Reference in New Issue
Block a user