package rpcclient

import (
	"context"
	"encoding/json"
	"fmt"
	"net"
	"net/http"
	"sync"
	"time"

	"github.com/gorilla/websocket"
	"github.com/pkg/errors"
	metrics "github.com/rcrowley/go-metrics"

	"github.com/tendermint/go-amino"
	types "github.com/tendermint/tendermint/rpc/lib/types"
	cmn "github.com/tendermint/tmlibs/common"
)

const (
	defaultMaxReconnectAttempts = 25
	defaultWriteWait            = 0
	defaultReadWait             = 0
	defaultPingPeriod           = 0
)

// WSClient is a WebSocket client. The methods of WSClient are safe for use by
// multiple goroutines.
type WSClient struct {
	cmn.BaseService

	conn *websocket.Conn
	cdc  *amino.Codec

	Address  string // IP:PORT or /path/to/socket
	Endpoint string // /websocket/url/endpoint
	Dialer   func(string, string) (net.Conn, error)

	// Time between sending a ping and receiving a pong. See
	// https://godoc.org/github.com/rcrowley/go-metrics#Timer.
	PingPongLatencyTimer metrics.Timer

	// Single user-facing channel to read RPCResponses from. It is closed only
	// when the client is being stopped.
	ResponsesCh chan types.RPCResponse

	// Callback, which will be called each time after a successful reconnect.
	onReconnect func()

	// internal channels
	send            chan types.RPCRequest // user requests
	backlog         chan types.RPCRequest // stores a single user request received during a conn failure
	reconnectAfter  chan error            // reconnect requests
	readRoutineQuit chan struct{}         // a way for readRoutine to close writeRoutine

	wg sync.WaitGroup

	mtx            sync.RWMutex
	sentLastPingAt time.Time
	reconnecting   bool

	// Maximum reconnect attempts (0 or greater; default: 25).
	maxReconnectAttempts int

	// Time allowed to write a message to the server. 0 means block until the operation succeeds.
	writeWait time.Duration

	// Time allowed to read the next message from the server. 0 means block until the operation succeeds.
	readWait time.Duration

	// Send pings to the server with this period. Must be less than readWait. If 0, no pings will be sent.
	pingPeriod time.Duration
}

// NewWSClient returns a new client. See the commentary on the func(*WSClient)
// functions for a detailed description of how to configure ping period and
// pong wait time. The endpoint argument must begin with a `/`.
func NewWSClient(remoteAddr, endpoint string, options ...func(*WSClient)) *WSClient {
	addr, dialer := makeHTTPDialer(remoteAddr)
	c := &WSClient{
		cdc:                  amino.NewCodec(),
		Address:              addr,
		Dialer:               dialer,
		Endpoint:             endpoint,
		PingPongLatencyTimer: metrics.NewTimer(),

		maxReconnectAttempts: defaultMaxReconnectAttempts,
		readWait:             defaultReadWait,
		writeWait:            defaultWriteWait,
		pingPeriod:           defaultPingPeriod,
	}
	c.BaseService = *cmn.NewBaseService(nil, "WSClient", c)
	for _, option := range options {
		option(c)
	}
	return c
}

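// wsClientUsageSketch is a minimal, illustrative usage sketch (not part of the
// original API). The remote address, the option values and the error handling
// below are assumptions for demonstration purposes only.
func wsClientUsageSketch() error {
	c := NewWSClient("localhost:26657", "/websocket",
		MaxReconnectAttempts(10),
		PingPeriod(30*time.Second),
		OnReconnect(func() {
			// re-subscribe here if the server drops subscriptions on reconnect
		}),
	)
	if err := c.Start(); err != nil {
		return err
	}
	defer c.Stop() // nolint: errcheck

	// Responses (including server-side errors) arrive on ResponsesCh until the
	// client is stopped, which closes the channel.
	for resp := range c.ResponsesCh {
		_ = resp // inspect resp here
	}
	return nil
}
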
// MaxReconnectAttempts sets the maximum number of reconnect attempts before returning an error.
// It should only be used in the constructor and is not Goroutine-safe.
func MaxReconnectAttempts(max int) func(*WSClient) {
	return func(c *WSClient) {
		c.maxReconnectAttempts = max
	}
}

// ReadWait sets the amount of time to wait before a websocket read times out.
// It should only be used in the constructor and is not Goroutine-safe.
func ReadWait(readWait time.Duration) func(*WSClient) {
	return func(c *WSClient) {
		c.readWait = readWait
	}
}

// WriteWait sets the amount of time to wait before a websocket write times out.
// It should only be used in the constructor and is not Goroutine-safe.
func WriteWait(writeWait time.Duration) func(*WSClient) {
	return func(c *WSClient) {
		c.writeWait = writeWait
	}
}

// PingPeriod sets the duration for sending websocket pings.
// It should only be used in the constructor and is not Goroutine-safe.
func PingPeriod(pingPeriod time.Duration) func(*WSClient) {
	return func(c *WSClient) {
		c.pingPeriod = pingPeriod
	}
}

// OnReconnect sets the callback, which will be called every time after a
// successful reconnect.
func OnReconnect(cb func()) func(*WSClient) {
	return func(c *WSClient) {
		c.onReconnect = cb
	}
}

// String returns the WS client's full address.
func (c *WSClient) String() string {
	return fmt.Sprintf("%s (%s)", c.Address, c.Endpoint)
}

// OnStart implements cmn.Service by dialing a server and creating read and
// write routines.
func (c *WSClient) OnStart() error {
	err := c.dial()
	if err != nil {
		return err
	}

	c.ResponsesCh = make(chan types.RPCResponse)

	c.send = make(chan types.RPCRequest)
	// 1 additional error may come from the read/write
	// goroutine depending on which failed first.
	c.reconnectAfter = make(chan error, 1)
	// Capacity for 1 request. A user won't be able to send more because the
	// send channel is unbuffered.
	c.backlog = make(chan types.RPCRequest, 1)

	c.startReadWriteRoutines()
	go c.reconnectRoutine()

	return nil
}

// OnStop implements cmn.Service.
func (c *WSClient) OnStop() {}

// Stop overrides cmn.Service#Stop. There is no other way to wait until the
// Quit channel is closed.
func (c *WSClient) Stop() error {
	if err := c.BaseService.Stop(); err != nil {
		return err
	}
	// only close user-facing channels when we can no longer write to them
	c.wg.Wait()
	close(c.ResponsesCh)

	return nil
}

// IsReconnecting returns true if the client is reconnecting right now.
func (c *WSClient) IsReconnecting() bool {
	c.mtx.RLock()
	defer c.mtx.RUnlock()
	return c.reconnecting
}

// IsActive returns true if the client is running and not reconnecting.
func (c *WSClient) IsActive() bool {
	return c.IsRunning() && !c.IsReconnecting()
}

// Send the given RPC request to the server. Results (and any errors returned
// by the server) will be available on ResponsesCh. Will block until the send
// succeeds or ctx.Done is closed.
func (c *WSClient) Send(ctx context.Context, request types.RPCRequest) error {
	select {
	case c.send <- request:
		c.Logger.Info("sent a request", "req", request)
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// Call the given method. See Send description.
func (c *WSClient) Call(ctx context.Context, method string, params map[string]interface{}) error {
	request, err := types.MapToRequest(c.cdc, "ws-client", method, params)
	if err != nil {
		return err
	}
	return c.Send(ctx, request)
}

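// callWithTimeoutSketch is an illustrative sketch (not part of the original
// API). It shows that Send/Call honour the passed context: if the request
// cannot be handed off before the deadline, ctx.Err() is returned. The method
// name and empty params below are assumptions.
func callWithTimeoutSketch(c *WSClient) error {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	return c.Call(ctx, "abci_info", map[string]interface{}{})
}
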
// CallWithArrayParams calls the given method with params given as an array.
// See Send description.
func (c *WSClient) CallWithArrayParams(ctx context.Context, method string, params []interface{}) error {
	request, err := types.ArrayToRequest(c.cdc, "ws-client", method, params)
	if err != nil {
		return err
	}
	return c.Send(ctx, request)
}

// Codec returns the amino codec used by the client.
func (c *WSClient) Codec() *amino.Codec {
	return c.cdc
}

// SetCodec sets the amino codec used by the client.
func (c *WSClient) SetCodec(cdc *amino.Codec) {
	c.cdc = cdc
}

///////////////////////////////////////////////////////////////////////////////
// Private methods

func (c *WSClient) dial() error {
	dialer := &websocket.Dialer{
		NetDial: c.Dialer,
		Proxy:   http.ProxyFromEnvironment,
	}
	rHeader := http.Header{}
	conn, _, err := dialer.Dial("ws://"+c.Address+c.Endpoint, rHeader)
	if err != nil {
		return err
	}
	c.conn = conn
	return nil
}

// reconnect tries to redial up to maxReconnectAttempts with exponential
// backoff: roughly 1s, 2s, 4s, ... between attempts, plus up to 1s of random
// jitter.
func (c *WSClient) reconnect() error {
	attempt := 0

	c.mtx.Lock()
	c.reconnecting = true
	c.mtx.Unlock()
	defer func() {
		c.mtx.Lock()
		c.reconnecting = false
		c.mtx.Unlock()
	}()

	for {
		jitterSeconds := time.Duration(cmn.RandFloat64() * float64(time.Second)) // 1s == (1e9 ns)
		backoffDuration := jitterSeconds + ((1 << uint(attempt)) * time.Second)

		c.Logger.Info("reconnecting", "attempt", attempt+1, "backoff_duration", backoffDuration)
		time.Sleep(backoffDuration)

		err := c.dial()
		if err != nil {
			c.Logger.Error("failed to redial", "err", err)
		} else {
			c.Logger.Info("reconnected")
			if c.onReconnect != nil {
				go c.onReconnect()
			}
			return nil
		}

		attempt++

		if attempt > c.maxReconnectAttempts {
			return errors.Wrap(err, "reached maximum reconnect attempts")
		}
	}
}

func (c *WSClient) startReadWriteRoutines() {
	c.wg.Add(2)
	c.readRoutineQuit = make(chan struct{})
	go c.readRoutine()
	go c.writeRoutine()
}

func (c *WSClient) processBacklog() error {
	select {
	case request := <-c.backlog:
		if c.writeWait > 0 {
			if err := c.conn.SetWriteDeadline(time.Now().Add(c.writeWait)); err != nil {
				c.Logger.Error("failed to set write deadline", "err", err)
			}
		}
		if err := c.conn.WriteJSON(request); err != nil {
			c.Logger.Error("failed to resend request", "err", err)
			c.reconnectAfter <- err
			// requeue request
			c.backlog <- request
			return err
		}
		c.Logger.Info("resend a request", "req", request)
	default:
	}
	return nil
}

func (c *WSClient) reconnectRoutine() {
	for {
		select {
		case originalError := <-c.reconnectAfter:
			// wait until writeRoutine and readRoutine finish
			c.wg.Wait()
			if err := c.reconnect(); err != nil {
				c.Logger.Error("failed to reconnect", "err", err, "original_err", originalError)
				c.Stop()
				return
			}
			// drain reconnectAfter
		LOOP:
			for {
				select {
				case <-c.reconnectAfter:
				default:
					break LOOP
				}
			}
			err := c.processBacklog()
			if err == nil {
				c.startReadWriteRoutines()
			}

		case <-c.Quit():
			return
		}
	}
}

// The client ensures that there is at most one writer to a connection by
// executing all writes from this goroutine.
func (c *WSClient) writeRoutine() {
	var ticker *time.Ticker
	if c.pingPeriod > 0 {
		// ticker with a predefined period
		ticker = time.NewTicker(c.pingPeriod)
	} else {
		// ticker that never fires
		ticker = &time.Ticker{C: make(<-chan time.Time)}
	}

	defer func() {
		ticker.Stop()
		if err := c.conn.Close(); err != nil {
			// ignore error; it will trigger in tests
			// likely because it's closing an already closed connection
		}
		c.wg.Done()
	}()

	for {
		select {
		case request := <-c.send:
			if c.writeWait > 0 {
				if err := c.conn.SetWriteDeadline(time.Now().Add(c.writeWait)); err != nil {
					c.Logger.Error("failed to set write deadline", "err", err)
				}
			}
			if err := c.conn.WriteJSON(request); err != nil {
				c.Logger.Error("failed to send request", "err", err)
				c.reconnectAfter <- err
				// add request to the backlog, so we don't lose it
				c.backlog <- request
				return
			}
		case <-ticker.C:
			if c.writeWait > 0 {
				if err := c.conn.SetWriteDeadline(time.Now().Add(c.writeWait)); err != nil {
					c.Logger.Error("failed to set write deadline", "err", err)
				}
			}
			if err := c.conn.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
				c.Logger.Error("failed to write ping", "err", err)
				c.reconnectAfter <- err
				return
			}
			c.mtx.Lock()
			c.sentLastPingAt = time.Now()
			c.mtx.Unlock()
			c.Logger.Debug("sent ping")
		case <-c.readRoutineQuit:
			return
		case <-c.Quit():
			if err := c.conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")); err != nil {
				c.Logger.Error("failed to write message", "err", err)
			}
			return
		}
	}
}

// The client ensures that there is at most one reader to a connection by
// executing all reads from this goroutine.
func (c *WSClient) readRoutine() {
	defer func() {
		if err := c.conn.Close(); err != nil {
			// ignore error; it will trigger in tests
			// likely because it's closing an already closed connection
		}
		c.wg.Done()
	}()

	c.conn.SetPongHandler(func(string) error {
		// gather latency stats
		c.mtx.RLock()
		t := c.sentLastPingAt
		c.mtx.RUnlock()
		c.PingPongLatencyTimer.UpdateSince(t)

		c.Logger.Debug("got pong")
		return nil
	})

	for {
		// reset deadline for every message type (control or data)
		if c.readWait > 0 {
			if err := c.conn.SetReadDeadline(time.Now().Add(c.readWait)); err != nil {
				c.Logger.Error("failed to set read deadline", "err", err)
			}
		}
		_, data, err := c.conn.ReadMessage()
		if err != nil {
			if !websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure) {
				return
			}

			c.Logger.Error("failed to read response", "err", err)
			close(c.readRoutineQuit)
			c.reconnectAfter <- err
			return
		}

		var response types.RPCResponse
		err = json.Unmarshal(data, &response)
		if err != nil {
			c.Logger.Error("failed to parse response", "err", err, "data", string(data))
			continue
		}
		c.Logger.Info("got response", "resp", response.Result)
		// Combine a receive on BaseService.Quit with a send on ResponsesCh so
		// this routine cannot block c.wg.Wait() in c.Stop() forever. Note we
		// rely on Quit being closed, so that it delivers unlimited Quit
		// signals to stop both readRoutine and writeRoutine.
		select {
		case <-c.Quit():
		case c.ResponsesCh <- response:
		}
	}
}

///////////////////////////////////////////////////////////////////////////////
// Predefined methods

// Subscribe to a query. Note the server must have a "subscribe" route
// defined.
func (c *WSClient) Subscribe(ctx context.Context, query string) error {
	params := map[string]interface{}{"query": query}
	return c.Call(ctx, "subscribe", params)
}

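// subscribeSketch is an illustrative sketch (not part of the original API).
// The query string below is an assumption about the server's pubsub query
// language; the server's reply (success or error) arrives on ResponsesCh.
func subscribeSketch(ctx context.Context, c *WSClient) error {
	if err := c.Subscribe(ctx, "tm.event = 'NewBlock'"); err != nil {
		return err
	}
	// ... read matching events from c.ResponsesCh ...
	return c.Unsubscribe(ctx, "tm.event = 'NewBlock'")
}
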
// Unsubscribe from a query. Note the server must have an "unsubscribe" route
// defined.
func (c *WSClient) Unsubscribe(ctx context.Context, query string) error {
	params := map[string]interface{}{"query": query}
	return c.Call(ctx, "unsubscribe", params)
}

// UnsubscribeAll from all subscriptions. Note the server must have an
// "unsubscribe_all" route defined.
func (c *WSClient) UnsubscribeAll(ctx context.Context) error {
	params := map[string]interface{}{}
	return c.Call(ctx, "unsubscribe_all", params)
}