mirror of
https://github.com/fluencelabs/tendermint
synced 2025-04-25 06:42:16 +00:00
Fixes https://github.com/tendermint/tendermint/issues/751. Adds jitter to our exponential backoff to mitigate a self DDOS vector. The jitter is a randomly picked percentage of a second whose purpose is to ensure that each exponential backoff retry occurs within (1<<attempts) == 2**attempts, but with the delay each client will have a random buffer time before it tries to reconnect instead of all at once reconnections that might even bring back the previous conditions that might have caused the dial to the WSServer to have failed e.g * Network outage * File descriptor exhaustion * False positives from firewalls etc
473 lines
12 KiB
Go
473 lines
12 KiB
Go
package rpcclient
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"math/rand"
|
|
"net"
|
|
"net/http"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/gorilla/websocket"
|
|
"github.com/pkg/errors"
|
|
metrics "github.com/rcrowley/go-metrics"
|
|
|
|
types "github.com/tendermint/tendermint/rpc/lib/types"
|
|
cmn "github.com/tendermint/tmlibs/common"
|
|
)
|
|
|
|
const (
|
|
defaultMaxReconnectAttempts = 25
|
|
defaultWriteWait = 0
|
|
defaultReadWait = 0
|
|
defaultPingPeriod = 0
|
|
)
|
|
|
|
// WSClient is a WebSocket client. The methods of WSClient are safe for use by
|
|
// multiple goroutines.
|
|
type WSClient struct {
|
|
cmn.BaseService
|
|
|
|
conn *websocket.Conn
|
|
|
|
Address string // IP:PORT or /path/to/socket
|
|
Endpoint string // /websocket/url/endpoint
|
|
Dialer func(string, string) (net.Conn, error)
|
|
|
|
// Time between sending a ping and receiving a pong. See
|
|
// https://godoc.org/github.com/rcrowley/go-metrics#Timer.
|
|
PingPongLatencyTimer metrics.Timer
|
|
|
|
// user facing channels, closed only when the client is being stopped.
|
|
ResultsCh chan json.RawMessage
|
|
ErrorsCh chan error
|
|
|
|
// Callback, which will be called each time after successful reconnect.
|
|
onReconnect func()
|
|
|
|
// internal channels
|
|
send chan types.RPCRequest // user requests
|
|
backlog chan types.RPCRequest // stores a single user request received during a conn failure
|
|
reconnectAfter chan error // reconnect requests
|
|
readRoutineQuit chan struct{} // a way for readRoutine to close writeRoutine
|
|
|
|
wg sync.WaitGroup
|
|
|
|
mtx sync.RWMutex
|
|
sentLastPingAt time.Time
|
|
reconnecting bool
|
|
|
|
// Maximum reconnect attempts (0 or greater; default: 25).
|
|
maxReconnectAttempts int
|
|
|
|
// Time allowed to write a message to the server. 0 means block until operation succeeds.
|
|
writeWait time.Duration
|
|
|
|
// Time allowed to read the next message from the server. 0 means block until operation succeeds.
|
|
readWait time.Duration
|
|
|
|
// Send pings to server with this period. Must be less than readWait. If 0, no pings will be sent.
|
|
pingPeriod time.Duration
|
|
}
|
|
|
|
// NewWSClient returns a new client. See the commentary on the func(*WSClient)
|
|
// functions for a detailed description of how to configure ping period and
|
|
// pong wait time. The endpoint argument must begin with a `/`.
|
|
func NewWSClient(remoteAddr, endpoint string, options ...func(*WSClient)) *WSClient {
|
|
addr, dialer := makeHTTPDialer(remoteAddr)
|
|
c := &WSClient{
|
|
Address: addr,
|
|
Dialer: dialer,
|
|
Endpoint: endpoint,
|
|
PingPongLatencyTimer: metrics.NewTimer(),
|
|
|
|
maxReconnectAttempts: defaultMaxReconnectAttempts,
|
|
readWait: defaultReadWait,
|
|
writeWait: defaultWriteWait,
|
|
pingPeriod: defaultPingPeriod,
|
|
}
|
|
c.BaseService = *cmn.NewBaseService(nil, "WSClient", c)
|
|
for _, option := range options {
|
|
option(c)
|
|
}
|
|
return c
|
|
}
|
|
|
|
// MaxReconnectAttempts sets the maximum number of reconnect attempts before returning an error.
|
|
// It should only be used in the constructor and is not Goroutine-safe.
|
|
func MaxReconnectAttempts(max int) func(*WSClient) {
|
|
return func(c *WSClient) {
|
|
c.maxReconnectAttempts = max
|
|
}
|
|
}
|
|
|
|
// ReadWait sets the amount of time to wait before a websocket read times out.
|
|
// It should only be used in the constructor and is not Goroutine-safe.
|
|
func ReadWait(readWait time.Duration) func(*WSClient) {
|
|
return func(c *WSClient) {
|
|
c.readWait = readWait
|
|
}
|
|
}
|
|
|
|
// WriteWait sets the amount of time to wait before a websocket write times out.
|
|
// It should only be used in the constructor and is not Goroutine-safe.
|
|
func WriteWait(writeWait time.Duration) func(*WSClient) {
|
|
return func(c *WSClient) {
|
|
c.writeWait = writeWait
|
|
}
|
|
}
|
|
|
|
// PingPeriod sets the duration for sending websocket pings.
|
|
// It should only be used in the constructor - not Goroutine-safe.
|
|
func PingPeriod(pingPeriod time.Duration) func(*WSClient) {
|
|
return func(c *WSClient) {
|
|
c.pingPeriod = pingPeriod
|
|
}
|
|
}
|
|
|
|
// OnReconnect sets the callback, which will be called every time after
|
|
// successful reconnect.
|
|
func OnReconnect(cb func()) func(*WSClient) {
|
|
return func(c *WSClient) {
|
|
c.onReconnect = cb
|
|
}
|
|
}
|
|
|
|
// String returns WS client full address.
|
|
func (c *WSClient) String() string {
|
|
return fmt.Sprintf("%s (%s)", c.Address, c.Endpoint)
|
|
}
|
|
|
|
// OnStart implements cmn.Service by dialing a server and creating read and
|
|
// write routines.
|
|
func (c *WSClient) OnStart() error {
|
|
err := c.dial()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
c.ResultsCh = make(chan json.RawMessage)
|
|
c.ErrorsCh = make(chan error)
|
|
|
|
c.send = make(chan types.RPCRequest)
|
|
// 1 additional error may come from the read/write
|
|
// goroutine depending on which failed first.
|
|
c.reconnectAfter = make(chan error, 1)
|
|
// capacity for 1 request. a user won't be able to send more because the send
|
|
// channel is unbuffered.
|
|
c.backlog = make(chan types.RPCRequest, 1)
|
|
|
|
c.startReadWriteRoutines()
|
|
go c.reconnectRoutine()
|
|
|
|
return nil
|
|
}
|
|
|
|
// OnStop implements cmn.Service.
|
|
func (c *WSClient) OnStop() {}
|
|
|
|
// Stop overrides cmn.Service#Stop. There is no other way to wait until Quit
|
|
// channel is closed.
|
|
func (c *WSClient) Stop() bool {
|
|
success := c.BaseService.Stop()
|
|
// only close user-facing channels when we can't write to them
|
|
c.wg.Wait()
|
|
close(c.ResultsCh)
|
|
close(c.ErrorsCh)
|
|
return success
|
|
}
|
|
|
|
// IsReconnecting returns true if the client is reconnecting right now.
|
|
func (c *WSClient) IsReconnecting() bool {
|
|
c.mtx.RLock()
|
|
defer c.mtx.RUnlock()
|
|
return c.reconnecting
|
|
}
|
|
|
|
// IsActive returns true if the client is running and not reconnecting.
|
|
func (c *WSClient) IsActive() bool {
|
|
return c.IsRunning() && !c.IsReconnecting()
|
|
}
|
|
|
|
// Send the given RPC request to the server. Results will be available on
|
|
// ResultsCh, errors, if any, on ErrorsCh. Will block until send succeeds or
|
|
// ctx.Done is closed.
|
|
func (c *WSClient) Send(ctx context.Context, request types.RPCRequest) error {
|
|
select {
|
|
case c.send <- request:
|
|
c.Logger.Info("sent a request", "req", request)
|
|
return nil
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
}
|
|
}
|
|
|
|
// Call the given method. See Send description.
|
|
func (c *WSClient) Call(ctx context.Context, method string, params map[string]interface{}) error {
|
|
request, err := types.MapToRequest("ws-client", method, params)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return c.Send(ctx, request)
|
|
}
|
|
|
|
// CallWithArrayParams the given method with params in a form of array. See
|
|
// Send description.
|
|
func (c *WSClient) CallWithArrayParams(ctx context.Context, method string, params []interface{}) error {
|
|
request, err := types.ArrayToRequest("ws-client", method, params)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return c.Send(ctx, request)
|
|
}
|
|
|
|
///////////////////////////////////////////////////////////////////////////////
|
|
// Private methods
|
|
|
|
func (c *WSClient) dial() error {
|
|
dialer := &websocket.Dialer{
|
|
NetDial: c.Dialer,
|
|
Proxy: http.ProxyFromEnvironment,
|
|
}
|
|
rHeader := http.Header{}
|
|
conn, _, err := dialer.Dial("ws://"+c.Address+c.Endpoint, rHeader)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
c.conn = conn
|
|
return nil
|
|
}
|
|
|
|
// reconnect tries to redial up to maxReconnectAttempts with exponential
|
|
// backoff.
|
|
func (c *WSClient) reconnect() error {
|
|
attempt := 0
|
|
|
|
c.mtx.Lock()
|
|
c.reconnecting = true
|
|
c.mtx.Unlock()
|
|
defer func() {
|
|
c.mtx.Lock()
|
|
c.reconnecting = false
|
|
c.mtx.Unlock()
|
|
}()
|
|
|
|
_1sAsNs := float64(time.Second.Nanoseconds())
|
|
for {
|
|
jitter := time.Duration(rand.Float64() * _1sAsNs)
|
|
backoffDuration := jitter + ((1 << uint(attempt)) * time.Second)
|
|
|
|
c.Logger.Info("reconnecting", "attempt", attempt+1, "backoff_duration", backoffDuration)
|
|
time.Sleep(backoffDuration)
|
|
|
|
err := c.dial()
|
|
if err != nil {
|
|
c.Logger.Error("failed to redial", "err", err)
|
|
} else {
|
|
c.Logger.Info("reconnected")
|
|
if c.onReconnect != nil {
|
|
go c.onReconnect()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
attempt++
|
|
|
|
if attempt > c.maxReconnectAttempts {
|
|
return errors.Wrap(err, "reached maximum reconnect attempts")
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *WSClient) startReadWriteRoutines() {
|
|
c.wg.Add(2)
|
|
c.readRoutineQuit = make(chan struct{})
|
|
go c.readRoutine()
|
|
go c.writeRoutine()
|
|
}
|
|
|
|
func (c *WSClient) processBacklog() error {
|
|
select {
|
|
case request := <-c.backlog:
|
|
if c.writeWait > 0 {
|
|
c.conn.SetWriteDeadline(time.Now().Add(c.writeWait))
|
|
}
|
|
err := c.conn.WriteJSON(request)
|
|
if err != nil {
|
|
c.Logger.Error("failed to resend request", "err", err)
|
|
c.reconnectAfter <- err
|
|
// requeue request
|
|
c.backlog <- request
|
|
return err
|
|
}
|
|
c.Logger.Info("resend a request", "req", request)
|
|
default:
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *WSClient) reconnectRoutine() {
|
|
for {
|
|
select {
|
|
case originalError := <-c.reconnectAfter:
|
|
// wait until writeRoutine and readRoutine finish
|
|
c.wg.Wait()
|
|
err := c.reconnect()
|
|
if err != nil {
|
|
c.Logger.Error("failed to reconnect", "err", err, "original_err", originalError)
|
|
c.Stop()
|
|
return
|
|
} else {
|
|
// drain reconnectAfter
|
|
LOOP:
|
|
for {
|
|
select {
|
|
case <-c.reconnectAfter:
|
|
default:
|
|
break LOOP
|
|
}
|
|
}
|
|
err = c.processBacklog()
|
|
if err == nil {
|
|
c.startReadWriteRoutines()
|
|
}
|
|
}
|
|
case <-c.Quit:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// The client ensures that there is at most one writer to a connection by
|
|
// executing all writes from this goroutine.
|
|
func (c *WSClient) writeRoutine() {
|
|
var ticker *time.Ticker
|
|
if c.pingPeriod > 0 {
|
|
// ticker with a predefined period
|
|
ticker = time.NewTicker(c.pingPeriod)
|
|
} else {
|
|
// ticker that never fires
|
|
ticker = &time.Ticker{C: make(<-chan time.Time)}
|
|
}
|
|
|
|
defer func() {
|
|
ticker.Stop()
|
|
c.conn.Close()
|
|
c.wg.Done()
|
|
}()
|
|
|
|
for {
|
|
select {
|
|
case request := <-c.send:
|
|
if c.writeWait > 0 {
|
|
c.conn.SetWriteDeadline(time.Now().Add(c.writeWait))
|
|
}
|
|
err := c.conn.WriteJSON(request)
|
|
if err != nil {
|
|
c.Logger.Error("failed to send request", "err", err)
|
|
c.reconnectAfter <- err
|
|
// add request to the backlog, so we don't lose it
|
|
c.backlog <- request
|
|
return
|
|
}
|
|
case <-ticker.C:
|
|
if c.writeWait > 0 {
|
|
c.conn.SetWriteDeadline(time.Now().Add(c.writeWait))
|
|
}
|
|
err := c.conn.WriteMessage(websocket.PingMessage, []byte{})
|
|
if err != nil {
|
|
c.Logger.Error("failed to write ping", "err", err)
|
|
c.reconnectAfter <- err
|
|
return
|
|
}
|
|
c.mtx.Lock()
|
|
c.sentLastPingAt = time.Now()
|
|
c.mtx.Unlock()
|
|
c.Logger.Debug("sent ping")
|
|
case <-c.readRoutineQuit:
|
|
return
|
|
case <-c.Quit:
|
|
c.conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// The client ensures that there is at most one reader to a connection by
|
|
// executing all reads from this goroutine.
|
|
func (c *WSClient) readRoutine() {
|
|
defer func() {
|
|
c.conn.Close()
|
|
c.wg.Done()
|
|
}()
|
|
|
|
c.conn.SetPongHandler(func(string) error {
|
|
// gather latency stats
|
|
c.mtx.RLock()
|
|
t := c.sentLastPingAt
|
|
c.mtx.RUnlock()
|
|
c.PingPongLatencyTimer.UpdateSince(t)
|
|
|
|
c.Logger.Debug("got pong")
|
|
return nil
|
|
})
|
|
|
|
for {
|
|
// reset deadline for every message type (control or data)
|
|
if c.readWait > 0 {
|
|
c.conn.SetReadDeadline(time.Now().Add(c.readWait))
|
|
}
|
|
_, data, err := c.conn.ReadMessage()
|
|
if err != nil {
|
|
if !websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure) {
|
|
return
|
|
}
|
|
|
|
c.Logger.Error("failed to read response", "err", err)
|
|
close(c.readRoutineQuit)
|
|
c.reconnectAfter <- err
|
|
return
|
|
}
|
|
|
|
var response types.RPCResponse
|
|
err = json.Unmarshal(data, &response)
|
|
if err != nil {
|
|
c.Logger.Error("failed to parse response", "err", err, "data", string(data))
|
|
c.ErrorsCh <- err
|
|
continue
|
|
}
|
|
if response.Error != nil {
|
|
c.ErrorsCh <- response.Error
|
|
continue
|
|
}
|
|
c.Logger.Info("got response", "resp", response.Result)
|
|
c.ResultsCh <- *response.Result
|
|
}
|
|
}
|
|
|
|
///////////////////////////////////////////////////////////////////////////////
|
|
// Predefined methods
|
|
|
|
// Subscribe to an event. Note the server must have a "subscribe" route
|
|
// defined.
|
|
func (c *WSClient) Subscribe(ctx context.Context, eventType string) error {
|
|
params := map[string]interface{}{"event": eventType}
|
|
return c.Call(ctx, "subscribe", params)
|
|
}
|
|
|
|
// Unsubscribe from an event. Note the server must have a "unsubscribe" route
|
|
// defined.
|
|
func (c *WSClient) Unsubscribe(ctx context.Context, eventType string) error {
|
|
params := map[string]interface{}{"event": eventType}
|
|
return c.Call(ctx, "unsubscribe", params)
|
|
}
|
|
|
|
// UnsubscribeAll from all. Note the server must have a "unsubscribe_all" route
|
|
// defined.
|
|
func (c *WSClient) UnsubscribeAll(ctx context.Context) error {
|
|
params := map[string]interface{}{}
|
|
return c.Call(ctx, "unsubscribe_all", params)
|
|
}
|