mirror of
https://github.com/fluencelabs/tendermint
synced 2025-06-24 18:21:38 +00:00
Revert "delete everything" (includes everything non-go-crypto)
This reverts commit 96a3502
This commit is contained in:
43
rpc/lib/client/args_test.go
Normal file
43
rpc/lib/client/args_test.go
Normal file
@ -0,0 +1,43 @@
|
||||
package rpcclient
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/tendermint/go-amino"
|
||||
)
|
||||
|
||||
type Tx []byte
|
||||
|
||||
type Foo struct {
|
||||
Bar int
|
||||
Baz string
|
||||
}
|
||||
|
||||
func TestArgToJSON(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
require := require.New(t)
|
||||
|
||||
cases := []struct {
|
||||
input interface{}
|
||||
expected string
|
||||
}{
|
||||
{[]byte("1234"), "0x31323334"},
|
||||
{Tx("654"), "0x363534"},
|
||||
{Foo{7, "hello"}, `{"Bar":"7","Baz":"hello"}`},
|
||||
}
|
||||
|
||||
cdc := amino.NewCodec()
|
||||
|
||||
for i, tc := range cases {
|
||||
args := map[string]interface{}{"data": tc.input}
|
||||
err := argsToJSON(cdc, args)
|
||||
require.Nil(err, "%d: %+v", i, err)
|
||||
require.Equal(1, len(args), "%d", i)
|
||||
data, ok := args["data"].(string)
|
||||
require.True(ok, "%d: %#v", i, args["data"])
|
||||
assert.Equal(tc.expected, data, "%d", i)
|
||||
}
|
||||
}
|
219
rpc/lib/client/http_client.go
Normal file
219
rpc/lib/client/http_client.go
Normal file
@ -0,0 +1,219 @@
|
||||
package rpcclient
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"reflect"
|
||||
"strings"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/tendermint/go-amino"
|
||||
|
||||
types "github.com/tendermint/tendermint/rpc/lib/types"
|
||||
)
|
||||
|
||||
// HTTPClient is a common interface for JSONRPCClient and URIClient.
|
||||
type HTTPClient interface {
|
||||
Call(method string, params map[string]interface{}, result interface{}) (interface{}, error)
|
||||
Codec() *amino.Codec
|
||||
SetCodec(*amino.Codec)
|
||||
}
|
||||
|
||||
// TODO: Deprecate support for IP:PORT or /path/to/socket
|
||||
func makeHTTPDialer(remoteAddr string) (string, func(string, string) (net.Conn, error)) {
|
||||
parts := strings.SplitN(remoteAddr, "://", 2)
|
||||
var protocol, address string
|
||||
if len(parts) == 1 {
|
||||
// default to tcp if nothing specified
|
||||
protocol, address = "tcp", remoteAddr
|
||||
} else if len(parts) == 2 {
|
||||
protocol, address = parts[0], parts[1]
|
||||
} else {
|
||||
// return a invalid message
|
||||
msg := fmt.Sprintf("Invalid addr: %s", remoteAddr)
|
||||
return msg, func(_ string, _ string) (net.Conn, error) {
|
||||
return nil, errors.New(msg)
|
||||
}
|
||||
}
|
||||
// accept http as an alias for tcp
|
||||
if protocol == "http" {
|
||||
protocol = "tcp"
|
||||
}
|
||||
|
||||
// replace / with . for http requests (kvstore domain)
|
||||
trimmedAddress := strings.Replace(address, "/", ".", -1)
|
||||
return trimmedAddress, func(proto, addr string) (net.Conn, error) {
|
||||
return net.Dial(protocol, address)
|
||||
}
|
||||
}
|
||||
|
||||
// We overwrite the http.Client.Dial so we can do http over tcp or unix.
|
||||
// remoteAddr should be fully featured (eg. with tcp:// or unix://)
|
||||
func makeHTTPClient(remoteAddr string) (string, *http.Client) {
|
||||
address, dialer := makeHTTPDialer(remoteAddr)
|
||||
return "http://" + address, &http.Client{
|
||||
Transport: &http.Transport{
|
||||
Dial: dialer,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
//------------------------------------------------------------------------------------
|
||||
|
||||
// JSONRPCClient takes params as a slice
|
||||
type JSONRPCClient struct {
|
||||
address string
|
||||
client *http.Client
|
||||
cdc *amino.Codec
|
||||
}
|
||||
|
||||
// NewJSONRPCClient returns a JSONRPCClient pointed at the given address.
|
||||
func NewJSONRPCClient(remote string) *JSONRPCClient {
|
||||
address, client := makeHTTPClient(remote)
|
||||
return &JSONRPCClient{
|
||||
address: address,
|
||||
client: client,
|
||||
cdc: amino.NewCodec(),
|
||||
}
|
||||
}
|
||||
|
||||
func (c *JSONRPCClient) Call(method string, params map[string]interface{}, result interface{}) (interface{}, error) {
|
||||
request, err := types.MapToRequest(c.cdc, "jsonrpc-client", method, params)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
requestBytes, err := json.Marshal(request)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// log.Info(string(requestBytes))
|
||||
requestBuf := bytes.NewBuffer(requestBytes)
|
||||
// log.Info(Fmt("RPC request to %v (%v): %v", c.remote, method, string(requestBytes)))
|
||||
httpResponse, err := c.client.Post(c.address, "text/json", requestBuf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer httpResponse.Body.Close() // nolint: errcheck
|
||||
|
||||
responseBytes, err := ioutil.ReadAll(httpResponse.Body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// log.Info(Fmt("RPC response: %v", string(responseBytes)))
|
||||
return unmarshalResponseBytes(c.cdc, responseBytes, result)
|
||||
}
|
||||
|
||||
func (c *JSONRPCClient) Codec() *amino.Codec {
|
||||
return c.cdc
|
||||
}
|
||||
|
||||
func (c *JSONRPCClient) SetCodec(cdc *amino.Codec) {
|
||||
c.cdc = cdc
|
||||
}
|
||||
|
||||
//-------------------------------------------------------------
|
||||
|
||||
// URI takes params as a map
|
||||
type URIClient struct {
|
||||
address string
|
||||
client *http.Client
|
||||
cdc *amino.Codec
|
||||
}
|
||||
|
||||
func NewURIClient(remote string) *URIClient {
|
||||
address, client := makeHTTPClient(remote)
|
||||
return &URIClient{
|
||||
address: address,
|
||||
client: client,
|
||||
cdc: amino.NewCodec(),
|
||||
}
|
||||
}
|
||||
|
||||
func (c *URIClient) Call(method string, params map[string]interface{}, result interface{}) (interface{}, error) {
|
||||
values, err := argsToURLValues(c.cdc, params)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// log.Info(Fmt("URI request to %v (%v): %v", c.address, method, values))
|
||||
resp, err := c.client.PostForm(c.address+"/"+method, values)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close() // nolint: errcheck
|
||||
|
||||
responseBytes, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return unmarshalResponseBytes(c.cdc, responseBytes, result)
|
||||
}
|
||||
|
||||
func (c *URIClient) Codec() *amino.Codec {
|
||||
return c.cdc
|
||||
}
|
||||
|
||||
func (c *URIClient) SetCodec(cdc *amino.Codec) {
|
||||
c.cdc = cdc
|
||||
}
|
||||
|
||||
//------------------------------------------------
|
||||
|
||||
func unmarshalResponseBytes(cdc *amino.Codec, responseBytes []byte, result interface{}) (interface{}, error) {
|
||||
// Read response. If rpc/core/types is imported, the result will unmarshal
|
||||
// into the correct type.
|
||||
// log.Notice("response", "response", string(responseBytes))
|
||||
var err error
|
||||
response := &types.RPCResponse{}
|
||||
err = json.Unmarshal(responseBytes, response)
|
||||
if err != nil {
|
||||
return nil, errors.Errorf("Error unmarshalling rpc response: %v", err)
|
||||
}
|
||||
if response.Error != nil {
|
||||
return nil, errors.Errorf("Response error: %v", response.Error)
|
||||
}
|
||||
// Unmarshal the RawMessage into the result.
|
||||
err = cdc.UnmarshalJSON(response.Result, result)
|
||||
if err != nil {
|
||||
return nil, errors.Errorf("Error unmarshalling rpc response result: %v", err)
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func argsToURLValues(cdc *amino.Codec, args map[string]interface{}) (url.Values, error) {
|
||||
values := make(url.Values)
|
||||
if len(args) == 0 {
|
||||
return values, nil
|
||||
}
|
||||
err := argsToJSON(cdc, args)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for key, val := range args {
|
||||
values.Set(key, val.(string))
|
||||
}
|
||||
return values, nil
|
||||
}
|
||||
|
||||
func argsToJSON(cdc *amino.Codec, args map[string]interface{}) error {
|
||||
for k, v := range args {
|
||||
rt := reflect.TypeOf(v)
|
||||
isByteSlice := rt.Kind() == reflect.Slice && rt.Elem().Kind() == reflect.Uint8
|
||||
if isByteSlice {
|
||||
bytes := reflect.ValueOf(v).Bytes()
|
||||
args[k] = fmt.Sprintf("0x%X", bytes)
|
||||
continue
|
||||
}
|
||||
|
||||
data, err := cdc.MarshalJSON(v)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
args[k] = string(data)
|
||||
}
|
||||
return nil
|
||||
}
|
66
rpc/lib/client/integration_test.go
Normal file
66
rpc/lib/client/integration_test.go
Normal file
@ -0,0 +1,66 @@
|
||||
// +build release
|
||||
|
||||
// The code in here is comprehensive as an integration
|
||||
// test and is long, hence is only run before releases.
|
||||
|
||||
package rpcclient
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"net"
|
||||
"regexp"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/tendermint/tmlibs/log"
|
||||
)
|
||||
|
||||
func TestWSClientReconnectWithJitter(t *testing.T) {
|
||||
n := 8
|
||||
maxReconnectAttempts := 3
|
||||
// Max wait time is ceil(1+0.999) + ceil(2+0.999) + ceil(4+0.999) + ceil(...) = 2 + 3 + 5 = 10s + ...
|
||||
maxSleepTime := time.Second * time.Duration(((1<<uint(maxReconnectAttempts))-1)+maxReconnectAttempts)
|
||||
|
||||
var errNotConnected = errors.New("not connected")
|
||||
clientMap := make(map[int]*WSClient)
|
||||
buf := new(bytes.Buffer)
|
||||
logger := log.NewTMLogger(buf)
|
||||
for i := 0; i < n; i++ {
|
||||
c := NewWSClient("tcp://foo", "/websocket")
|
||||
c.Dialer = func(string, string) (net.Conn, error) {
|
||||
return nil, errNotConnected
|
||||
}
|
||||
c.SetLogger(logger)
|
||||
c.maxReconnectAttempts = maxReconnectAttempts
|
||||
// Not invoking defer c.Stop() because
|
||||
// after all the reconnect attempts have been
|
||||
// exhausted, c.Stop is implicitly invoked.
|
||||
clientMap[i] = c
|
||||
// Trigger the reconnect routine that performs exponential backoff.
|
||||
go c.reconnect()
|
||||
}
|
||||
|
||||
stopCount := 0
|
||||
time.Sleep(maxSleepTime)
|
||||
for key, c := range clientMap {
|
||||
if !c.IsActive() {
|
||||
delete(clientMap, key)
|
||||
stopCount += 1
|
||||
}
|
||||
}
|
||||
require.Equal(t, stopCount, n, "expecting all clients to have been stopped")
|
||||
|
||||
// Next we have to examine the logs to ensure that no single time was repeated
|
||||
backoffDurRegexp := regexp.MustCompile(`backoff_duration=(.+)`)
|
||||
matches := backoffDurRegexp.FindAll(buf.Bytes(), -1)
|
||||
seenMap := make(map[string]int)
|
||||
for i, match := range matches {
|
||||
if origIndex, seen := seenMap[string(match)]; seen {
|
||||
t.Errorf("Match #%d (%q) was seen originally at log entry #%d", i, match, origIndex)
|
||||
} else {
|
||||
seenMap[string(match)] = i
|
||||
}
|
||||
}
|
||||
}
|
494
rpc/lib/client/ws_client.go
Normal file
494
rpc/lib/client/ws_client.go
Normal file
@ -0,0 +1,494 @@
|
||||
package rpcclient
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/pkg/errors"
|
||||
metrics "github.com/rcrowley/go-metrics"
|
||||
|
||||
"github.com/tendermint/go-amino"
|
||||
types "github.com/tendermint/tendermint/rpc/lib/types"
|
||||
cmn "github.com/tendermint/tmlibs/common"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultMaxReconnectAttempts = 25
|
||||
defaultWriteWait = 0
|
||||
defaultReadWait = 0
|
||||
defaultPingPeriod = 0
|
||||
)
|
||||
|
||||
// WSClient is a WebSocket client. The methods of WSClient are safe for use by
|
||||
// multiple goroutines.
|
||||
type WSClient struct {
|
||||
cmn.BaseService
|
||||
|
||||
conn *websocket.Conn
|
||||
cdc *amino.Codec
|
||||
|
||||
Address string // IP:PORT or /path/to/socket
|
||||
Endpoint string // /websocket/url/endpoint
|
||||
Dialer func(string, string) (net.Conn, error)
|
||||
|
||||
// Time between sending a ping and receiving a pong. See
|
||||
// https://godoc.org/github.com/rcrowley/go-metrics#Timer.
|
||||
PingPongLatencyTimer metrics.Timer
|
||||
|
||||
// Single user facing channel to read RPCResponses from, closed only when the client is being stopped.
|
||||
ResponsesCh chan types.RPCResponse
|
||||
|
||||
// Callback, which will be called each time after successful reconnect.
|
||||
onReconnect func()
|
||||
|
||||
// internal channels
|
||||
send chan types.RPCRequest // user requests
|
||||
backlog chan types.RPCRequest // stores a single user request received during a conn failure
|
||||
reconnectAfter chan error // reconnect requests
|
||||
readRoutineQuit chan struct{} // a way for readRoutine to close writeRoutine
|
||||
|
||||
wg sync.WaitGroup
|
||||
|
||||
mtx sync.RWMutex
|
||||
sentLastPingAt time.Time
|
||||
reconnecting bool
|
||||
|
||||
// Maximum reconnect attempts (0 or greater; default: 25).
|
||||
maxReconnectAttempts int
|
||||
|
||||
// Time allowed to write a message to the server. 0 means block until operation succeeds.
|
||||
writeWait time.Duration
|
||||
|
||||
// Time allowed to read the next message from the server. 0 means block until operation succeeds.
|
||||
readWait time.Duration
|
||||
|
||||
// Send pings to server with this period. Must be less than readWait. If 0, no pings will be sent.
|
||||
pingPeriod time.Duration
|
||||
}
|
||||
|
||||
// NewWSClient returns a new client. See the commentary on the func(*WSClient)
|
||||
// functions for a detailed description of how to configure ping period and
|
||||
// pong wait time. The endpoint argument must begin with a `/`.
|
||||
func NewWSClient(remoteAddr, endpoint string, options ...func(*WSClient)) *WSClient {
|
||||
addr, dialer := makeHTTPDialer(remoteAddr)
|
||||
c := &WSClient{
|
||||
cdc: amino.NewCodec(),
|
||||
Address: addr,
|
||||
Dialer: dialer,
|
||||
Endpoint: endpoint,
|
||||
PingPongLatencyTimer: metrics.NewTimer(),
|
||||
|
||||
maxReconnectAttempts: defaultMaxReconnectAttempts,
|
||||
readWait: defaultReadWait,
|
||||
writeWait: defaultWriteWait,
|
||||
pingPeriod: defaultPingPeriod,
|
||||
}
|
||||
c.BaseService = *cmn.NewBaseService(nil, "WSClient", c)
|
||||
for _, option := range options {
|
||||
option(c)
|
||||
}
|
||||
return c
|
||||
}
|
||||
|
||||
// MaxReconnectAttempts sets the maximum number of reconnect attempts before returning an error.
|
||||
// It should only be used in the constructor and is not Goroutine-safe.
|
||||
func MaxReconnectAttempts(max int) func(*WSClient) {
|
||||
return func(c *WSClient) {
|
||||
c.maxReconnectAttempts = max
|
||||
}
|
||||
}
|
||||
|
||||
// ReadWait sets the amount of time to wait before a websocket read times out.
|
||||
// It should only be used in the constructor and is not Goroutine-safe.
|
||||
func ReadWait(readWait time.Duration) func(*WSClient) {
|
||||
return func(c *WSClient) {
|
||||
c.readWait = readWait
|
||||
}
|
||||
}
|
||||
|
||||
// WriteWait sets the amount of time to wait before a websocket write times out.
|
||||
// It should only be used in the constructor and is not Goroutine-safe.
|
||||
func WriteWait(writeWait time.Duration) func(*WSClient) {
|
||||
return func(c *WSClient) {
|
||||
c.writeWait = writeWait
|
||||
}
|
||||
}
|
||||
|
||||
// PingPeriod sets the duration for sending websocket pings.
|
||||
// It should only be used in the constructor - not Goroutine-safe.
|
||||
func PingPeriod(pingPeriod time.Duration) func(*WSClient) {
|
||||
return func(c *WSClient) {
|
||||
c.pingPeriod = pingPeriod
|
||||
}
|
||||
}
|
||||
|
||||
// OnReconnect sets the callback, which will be called every time after
|
||||
// successful reconnect.
|
||||
func OnReconnect(cb func()) func(*WSClient) {
|
||||
return func(c *WSClient) {
|
||||
c.onReconnect = cb
|
||||
}
|
||||
}
|
||||
|
||||
// String returns WS client full address.
|
||||
func (c *WSClient) String() string {
|
||||
return fmt.Sprintf("%s (%s)", c.Address, c.Endpoint)
|
||||
}
|
||||
|
||||
// OnStart implements cmn.Service by dialing a server and creating read and
|
||||
// write routines.
|
||||
func (c *WSClient) OnStart() error {
|
||||
err := c.dial()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.ResponsesCh = make(chan types.RPCResponse)
|
||||
|
||||
c.send = make(chan types.RPCRequest)
|
||||
// 1 additional error may come from the read/write
|
||||
// goroutine depending on which failed first.
|
||||
c.reconnectAfter = make(chan error, 1)
|
||||
// capacity for 1 request. a user won't be able to send more because the send
|
||||
// channel is unbuffered.
|
||||
c.backlog = make(chan types.RPCRequest, 1)
|
||||
|
||||
c.startReadWriteRoutines()
|
||||
go c.reconnectRoutine()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// OnStop implements cmn.Service.
|
||||
func (c *WSClient) OnStop() {}
|
||||
|
||||
// Stop overrides cmn.Service#Stop. There is no other way to wait until Quit
|
||||
// channel is closed.
|
||||
func (c *WSClient) Stop() error {
|
||||
if err := c.BaseService.Stop(); err != nil {
|
||||
return err
|
||||
}
|
||||
// only close user-facing channels when we can't write to them
|
||||
c.wg.Wait()
|
||||
close(c.ResponsesCh)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// IsReconnecting returns true if the client is reconnecting right now.
|
||||
func (c *WSClient) IsReconnecting() bool {
|
||||
c.mtx.RLock()
|
||||
defer c.mtx.RUnlock()
|
||||
return c.reconnecting
|
||||
}
|
||||
|
||||
// IsActive returns true if the client is running and not reconnecting.
|
||||
func (c *WSClient) IsActive() bool {
|
||||
return c.IsRunning() && !c.IsReconnecting()
|
||||
}
|
||||
|
||||
// Send the given RPC request to the server. Results will be available on
|
||||
// ResponsesCh, errors, if any, on ErrorsCh. Will block until send succeeds or
|
||||
// ctx.Done is closed.
|
||||
func (c *WSClient) Send(ctx context.Context, request types.RPCRequest) error {
|
||||
select {
|
||||
case c.send <- request:
|
||||
c.Logger.Info("sent a request", "req", request)
|
||||
return nil
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
// Call the given method. See Send description.
|
||||
func (c *WSClient) Call(ctx context.Context, method string, params map[string]interface{}) error {
|
||||
request, err := types.MapToRequest(c.cdc, "ws-client", method, params)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return c.Send(ctx, request)
|
||||
}
|
||||
|
||||
// CallWithArrayParams the given method with params in a form of array. See
|
||||
// Send description.
|
||||
func (c *WSClient) CallWithArrayParams(ctx context.Context, method string, params []interface{}) error {
|
||||
request, err := types.ArrayToRequest(c.cdc, "ws-client", method, params)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return c.Send(ctx, request)
|
||||
}
|
||||
|
||||
func (c *WSClient) Codec() *amino.Codec {
|
||||
return c.cdc
|
||||
}
|
||||
|
||||
func (c *WSClient) SetCodec(cdc *amino.Codec) {
|
||||
c.cdc = cdc
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
// Private methods
|
||||
|
||||
func (c *WSClient) dial() error {
|
||||
dialer := &websocket.Dialer{
|
||||
NetDial: c.Dialer,
|
||||
Proxy: http.ProxyFromEnvironment,
|
||||
}
|
||||
rHeader := http.Header{}
|
||||
conn, _, err := dialer.Dial("ws://"+c.Address+c.Endpoint, rHeader)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.conn = conn
|
||||
return nil
|
||||
}
|
||||
|
||||
// reconnect tries to redial up to maxReconnectAttempts with exponential
|
||||
// backoff.
|
||||
func (c *WSClient) reconnect() error {
|
||||
attempt := 0
|
||||
|
||||
c.mtx.Lock()
|
||||
c.reconnecting = true
|
||||
c.mtx.Unlock()
|
||||
defer func() {
|
||||
c.mtx.Lock()
|
||||
c.reconnecting = false
|
||||
c.mtx.Unlock()
|
||||
}()
|
||||
|
||||
for {
|
||||
jitterSeconds := time.Duration(cmn.RandFloat64() * float64(time.Second)) // 1s == (1e9 ns)
|
||||
backoffDuration := jitterSeconds + ((1 << uint(attempt)) * time.Second)
|
||||
|
||||
c.Logger.Info("reconnecting", "attempt", attempt+1, "backoff_duration", backoffDuration)
|
||||
time.Sleep(backoffDuration)
|
||||
|
||||
err := c.dial()
|
||||
if err != nil {
|
||||
c.Logger.Error("failed to redial", "err", err)
|
||||
} else {
|
||||
c.Logger.Info("reconnected")
|
||||
if c.onReconnect != nil {
|
||||
go c.onReconnect()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
attempt++
|
||||
|
||||
if attempt > c.maxReconnectAttempts {
|
||||
return errors.Wrap(err, "reached maximum reconnect attempts")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *WSClient) startReadWriteRoutines() {
|
||||
c.wg.Add(2)
|
||||
c.readRoutineQuit = make(chan struct{})
|
||||
go c.readRoutine()
|
||||
go c.writeRoutine()
|
||||
}
|
||||
|
||||
func (c *WSClient) processBacklog() error {
|
||||
select {
|
||||
case request := <-c.backlog:
|
||||
if c.writeWait > 0 {
|
||||
if err := c.conn.SetWriteDeadline(time.Now().Add(c.writeWait)); err != nil {
|
||||
c.Logger.Error("failed to set write deadline", "err", err)
|
||||
}
|
||||
}
|
||||
if err := c.conn.WriteJSON(request); err != nil {
|
||||
c.Logger.Error("failed to resend request", "err", err)
|
||||
c.reconnectAfter <- err
|
||||
// requeue request
|
||||
c.backlog <- request
|
||||
return err
|
||||
}
|
||||
c.Logger.Info("resend a request", "req", request)
|
||||
default:
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *WSClient) reconnectRoutine() {
|
||||
for {
|
||||
select {
|
||||
case originalError := <-c.reconnectAfter:
|
||||
// wait until writeRoutine and readRoutine finish
|
||||
c.wg.Wait()
|
||||
if err := c.reconnect(); err != nil {
|
||||
c.Logger.Error("failed to reconnect", "err", err, "original_err", originalError)
|
||||
c.Stop()
|
||||
return
|
||||
}
|
||||
// drain reconnectAfter
|
||||
LOOP:
|
||||
for {
|
||||
select {
|
||||
case <-c.reconnectAfter:
|
||||
default:
|
||||
break LOOP
|
||||
}
|
||||
}
|
||||
err := c.processBacklog()
|
||||
if err == nil {
|
||||
c.startReadWriteRoutines()
|
||||
}
|
||||
|
||||
case <-c.Quit():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// The client ensures that there is at most one writer to a connection by
|
||||
// executing all writes from this goroutine.
|
||||
func (c *WSClient) writeRoutine() {
|
||||
var ticker *time.Ticker
|
||||
if c.pingPeriod > 0 {
|
||||
// ticker with a predefined period
|
||||
ticker = time.NewTicker(c.pingPeriod)
|
||||
} else {
|
||||
// ticker that never fires
|
||||
ticker = &time.Ticker{C: make(<-chan time.Time)}
|
||||
}
|
||||
|
||||
defer func() {
|
||||
ticker.Stop()
|
||||
if err := c.conn.Close(); err != nil {
|
||||
// ignore error; it will trigger in tests
|
||||
// likely because it's closing an already closed connection
|
||||
}
|
||||
c.wg.Done()
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case request := <-c.send:
|
||||
if c.writeWait > 0 {
|
||||
if err := c.conn.SetWriteDeadline(time.Now().Add(c.writeWait)); err != nil {
|
||||
c.Logger.Error("failed to set write deadline", "err", err)
|
||||
}
|
||||
}
|
||||
if err := c.conn.WriteJSON(request); err != nil {
|
||||
c.Logger.Error("failed to send request", "err", err)
|
||||
c.reconnectAfter <- err
|
||||
// add request to the backlog, so we don't lose it
|
||||
c.backlog <- request
|
||||
return
|
||||
}
|
||||
case <-ticker.C:
|
||||
if c.writeWait > 0 {
|
||||
if err := c.conn.SetWriteDeadline(time.Now().Add(c.writeWait)); err != nil {
|
||||
c.Logger.Error("failed to set write deadline", "err", err)
|
||||
}
|
||||
}
|
||||
if err := c.conn.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
|
||||
c.Logger.Error("failed to write ping", "err", err)
|
||||
c.reconnectAfter <- err
|
||||
return
|
||||
}
|
||||
c.mtx.Lock()
|
||||
c.sentLastPingAt = time.Now()
|
||||
c.mtx.Unlock()
|
||||
c.Logger.Debug("sent ping")
|
||||
case <-c.readRoutineQuit:
|
||||
return
|
||||
case <-c.Quit():
|
||||
if err := c.conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")); err != nil {
|
||||
c.Logger.Error("failed to write message", "err", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// The client ensures that there is at most one reader to a connection by
|
||||
// executing all reads from this goroutine.
|
||||
func (c *WSClient) readRoutine() {
|
||||
defer func() {
|
||||
if err := c.conn.Close(); err != nil {
|
||||
// ignore error; it will trigger in tests
|
||||
// likely because it's closing an already closed connection
|
||||
}
|
||||
c.wg.Done()
|
||||
}()
|
||||
|
||||
c.conn.SetPongHandler(func(string) error {
|
||||
// gather latency stats
|
||||
c.mtx.RLock()
|
||||
t := c.sentLastPingAt
|
||||
c.mtx.RUnlock()
|
||||
c.PingPongLatencyTimer.UpdateSince(t)
|
||||
|
||||
c.Logger.Debug("got pong")
|
||||
return nil
|
||||
})
|
||||
|
||||
for {
|
||||
// reset deadline for every message type (control or data)
|
||||
if c.readWait > 0 {
|
||||
if err := c.conn.SetReadDeadline(time.Now().Add(c.readWait)); err != nil {
|
||||
c.Logger.Error("failed to set read deadline", "err", err)
|
||||
}
|
||||
}
|
||||
_, data, err := c.conn.ReadMessage()
|
||||
if err != nil {
|
||||
if !websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure) {
|
||||
return
|
||||
}
|
||||
|
||||
c.Logger.Error("failed to read response", "err", err)
|
||||
close(c.readRoutineQuit)
|
||||
c.reconnectAfter <- err
|
||||
return
|
||||
}
|
||||
|
||||
var response types.RPCResponse
|
||||
err = json.Unmarshal(data, &response)
|
||||
if err != nil {
|
||||
c.Logger.Error("failed to parse response", "err", err, "data", string(data))
|
||||
continue
|
||||
}
|
||||
c.Logger.Info("got response", "resp", response.Result)
|
||||
// Combine a non-blocking read on BaseService.Quit with a non-blocking write on ResponsesCh to avoid blocking
|
||||
// c.wg.Wait() in c.Stop(). Note we rely on Quit being closed so that it sends unlimited Quit signals to stop
|
||||
// both readRoutine and writeRoutine
|
||||
select {
|
||||
case <-c.Quit():
|
||||
case c.ResponsesCh <- response:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
// Predefined methods
|
||||
|
||||
// Subscribe to a query. Note the server must have a "subscribe" route
|
||||
// defined.
|
||||
func (c *WSClient) Subscribe(ctx context.Context, query string) error {
|
||||
params := map[string]interface{}{"query": query}
|
||||
return c.Call(ctx, "subscribe", params)
|
||||
}
|
||||
|
||||
// Unsubscribe from a query. Note the server must have a "unsubscribe" route
|
||||
// defined.
|
||||
func (c *WSClient) Unsubscribe(ctx context.Context, query string) error {
|
||||
params := map[string]interface{}{"query": query}
|
||||
return c.Call(ctx, "unsubscribe", params)
|
||||
}
|
||||
|
||||
// UnsubscribeAll from all. Note the server must have a "unsubscribe_all" route
|
||||
// defined.
|
||||
func (c *WSClient) UnsubscribeAll(ctx context.Context) error {
|
||||
params := map[string]interface{}{}
|
||||
return c.Call(ctx, "unsubscribe_all", params)
|
||||
}
|
224
rpc/lib/client/ws_client_test.go
Normal file
224
rpc/lib/client/ws_client_test.go
Normal file
@ -0,0 +1,224 @@
|
||||
package rpcclient
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/tendermint/tmlibs/log"
|
||||
|
||||
types "github.com/tendermint/tendermint/rpc/lib/types"
|
||||
)
|
||||
|
||||
var wsCallTimeout = 5 * time.Second
|
||||
|
||||
type myHandler struct {
|
||||
closeConnAfterRead bool
|
||||
mtx sync.RWMutex
|
||||
}
|
||||
|
||||
var upgrader = websocket.Upgrader{
|
||||
ReadBufferSize: 1024,
|
||||
WriteBufferSize: 1024,
|
||||
}
|
||||
|
||||
func (h *myHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
conn, err := upgrader.Upgrade(w, r, nil)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
defer conn.Close() // nolint: errcheck
|
||||
for {
|
||||
messageType, _, err := conn.ReadMessage()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
h.mtx.RLock()
|
||||
if h.closeConnAfterRead {
|
||||
if err := conn.Close(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
h.mtx.RUnlock()
|
||||
|
||||
res := json.RawMessage(`{}`)
|
||||
emptyRespBytes, _ := json.Marshal(types.RPCResponse{Result: res})
|
||||
if err := conn.WriteMessage(messageType, emptyRespBytes); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestWSClientReconnectsAfterReadFailure(t *testing.T) {
|
||||
var wg sync.WaitGroup
|
||||
|
||||
// start server
|
||||
h := &myHandler{}
|
||||
s := httptest.NewServer(h)
|
||||
defer s.Close()
|
||||
|
||||
c := startClient(t, s.Listener.Addr())
|
||||
defer c.Stop()
|
||||
|
||||
wg.Add(1)
|
||||
go callWgDoneOnResult(t, c, &wg)
|
||||
|
||||
h.mtx.Lock()
|
||||
h.closeConnAfterRead = true
|
||||
h.mtx.Unlock()
|
||||
|
||||
// results in WS read error, no send retry because write succeeded
|
||||
call(t, "a", c)
|
||||
|
||||
// expect to reconnect almost immediately
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
h.mtx.Lock()
|
||||
h.closeConnAfterRead = false
|
||||
h.mtx.Unlock()
|
||||
|
||||
// should succeed
|
||||
call(t, "b", c)
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func TestWSClientReconnectsAfterWriteFailure(t *testing.T) {
|
||||
var wg sync.WaitGroup
|
||||
|
||||
// start server
|
||||
h := &myHandler{}
|
||||
s := httptest.NewServer(h)
|
||||
|
||||
c := startClient(t, s.Listener.Addr())
|
||||
defer c.Stop()
|
||||
|
||||
wg.Add(2)
|
||||
go callWgDoneOnResult(t, c, &wg)
|
||||
|
||||
// hacky way to abort the connection before write
|
||||
if err := c.conn.Close(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
// results in WS write error, the client should resend on reconnect
|
||||
call(t, "a", c)
|
||||
|
||||
// expect to reconnect almost immediately
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
||||
// should succeed
|
||||
call(t, "b", c)
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func TestWSClientReconnectFailure(t *testing.T) {
|
||||
// start server
|
||||
h := &myHandler{}
|
||||
s := httptest.NewServer(h)
|
||||
|
||||
c := startClient(t, s.Listener.Addr())
|
||||
defer c.Stop()
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-c.ResponsesCh:
|
||||
case <-c.Quit():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// hacky way to abort the connection before write
|
||||
if err := c.conn.Close(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
s.Close()
|
||||
|
||||
// results in WS write error
|
||||
// provide timeout to avoid blocking
|
||||
ctx, cancel := context.WithTimeout(context.Background(), wsCallTimeout)
|
||||
defer cancel()
|
||||
if err := c.Call(ctx, "a", make(map[string]interface{})); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
// expect to reconnect almost immediately
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
// client should block on this
|
||||
call(t, "b", c)
|
||||
close(done)
|
||||
}()
|
||||
|
||||
// test that client blocks on the second send
|
||||
select {
|
||||
case <-done:
|
||||
t.Fatal("client should block on calling 'b' during reconnect")
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Log("All good")
|
||||
}
|
||||
}
|
||||
|
||||
func TestNotBlockingOnStop(t *testing.T) {
|
||||
timeout := 2 * time.Second
|
||||
s := httptest.NewServer(&myHandler{})
|
||||
c := startClient(t, s.Listener.Addr())
|
||||
c.Call(context.Background(), "a", make(map[string]interface{}))
|
||||
// Let the readRoutine get around to blocking
|
||||
time.Sleep(time.Second)
|
||||
passCh := make(chan struct{})
|
||||
go func() {
|
||||
// Unless we have a non-blocking write to ResponsesCh from readRoutine
|
||||
// this blocks forever ont the waitgroup
|
||||
c.Stop()
|
||||
passCh <- struct{}{}
|
||||
}()
|
||||
select {
|
||||
case <-passCh:
|
||||
// Pass
|
||||
case <-time.After(timeout):
|
||||
t.Fatalf("WSClient did failed to stop within %v seconds - is one of the read/write routines blocking?",
|
||||
timeout.Seconds())
|
||||
}
|
||||
}
|
||||
|
||||
func startClient(t *testing.T, addr net.Addr) *WSClient {
|
||||
c := NewWSClient(addr.String(), "/websocket")
|
||||
err := c.Start()
|
||||
require.Nil(t, err)
|
||||
c.SetLogger(log.TestingLogger())
|
||||
return c
|
||||
}
|
||||
|
||||
func call(t *testing.T, method string, c *WSClient) {
|
||||
err := c.Call(context.Background(), method, make(map[string]interface{}))
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func callWgDoneOnResult(t *testing.T, c *WSClient, wg *sync.WaitGroup) {
|
||||
for {
|
||||
select {
|
||||
case resp := <-c.ResponsesCh:
|
||||
if resp.Error != nil {
|
||||
t.Fatalf("unexpected error: %v", resp.Error)
|
||||
}
|
||||
if resp.Result != nil {
|
||||
wg.Done()
|
||||
}
|
||||
case <-c.Quit():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user