Merge pull request #948 from tendermint/945-transparent-websocket

bring back transparent websocket (Refs #945)
This commit is contained in:
Ethan Buchman 2017-12-10 19:05:32 -05:00 committed by GitHub
commit 3019b9f320
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 161 additions and 206 deletions

View File

@ -27,6 +27,14 @@ BUG FIXES:
- Graceful handling/recovery for apps that have non-determinism or fail to halt - Graceful handling/recovery for apps that have non-determinism or fail to halt
- Graceful handling/recovery for violations of safety, or liveness - Graceful handling/recovery for violations of safety, or liveness
## 0.14.0 (TBD)
BREAKING CHANGES:
- rpc/client: changed Subscribe/Unsubscribe/UnsubscribeAll funcs signatures to be identical to event bus.
IMPROVEMENTS:
- rpc/client: can act as event bus subscriber (See https://github.com/tendermint/tendermint/issues/945).
## 0.13.0 (December 6, 2017) ## 0.13.0 (December 6, 2017)
BREAKING CHANGES: BREAKING CHANGES:

6
glide.lock generated
View File

@ -1,5 +1,5 @@
hash: 09fc7f59ca6b718fe236368bb55f4801455295cfe455ea5865d544ee4dcfdc08 hash: f420f1f858100218dad50997d939eaaf129ff654a0648a47ddc60d626ab0b8e9
updated: 2017-12-06T03:31:34.476581624-05:00 updated: 2017-12-10T05:37:46.41123196Z
imports: imports:
- name: github.com/btcsuite/btcd - name: github.com/btcsuite/btcd
version: 2e60448ffcc6bf78332d1fe590260095f554dd78 version: 2e60448ffcc6bf78332d1fe590260095f554dd78
@ -129,7 +129,7 @@ imports:
subpackages: subpackages:
- iavl - iavl
- name: github.com/tendermint/tmlibs - name: github.com/tendermint/tmlibs
version: bfcc0217f120d3bee6730ba0789d2eb72fc2e889 version: e4ef2835f0081c2ece83b9c1f777cf071f956e81
subpackages: subpackages:
- autofile - autofile
- cli - cli

View File

@ -34,7 +34,7 @@ import:
subpackages: subpackages:
- iavl - iavl
- package: github.com/tendermint/tmlibs - package: github.com/tendermint/tmlibs
version: ~0.5.0 version: e4ef2835f0081c2ece83b9c1f777cf071f956e81
subpackages: subpackages:
- autofile - autofile
- cli - cli

View File

@ -2,7 +2,6 @@ package node
import ( import (
"bytes" "bytes"
"context"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
@ -441,13 +440,7 @@ func (n *Node) startRPC() ([]net.Listener, error) {
for i, listenAddr := range listenAddrs { for i, listenAddr := range listenAddrs {
mux := http.NewServeMux() mux := http.NewServeMux()
rpcLogger := n.Logger.With("module", "rpc-server") rpcLogger := n.Logger.With("module", "rpc-server")
onDisconnect := rpcserver.OnDisconnect(func(remoteAddr string) { wm := rpcserver.NewWebsocketManager(rpccore.Routes, rpcserver.EventSubscriber(n.eventBus))
err := n.eventBus.UnsubscribeAll(context.Background(), remoteAddr)
if err != nil {
rpcLogger.Error("Error unsubsribing from all on disconnect", "err", err)
}
})
wm := rpcserver.NewWebsocketManager(rpccore.Routes, onDisconnect)
wm.SetLogger(rpcLogger.With("protocol", "websocket")) wm.SetLogger(rpcLogger.With("protocol", "websocket"))
mux.HandleFunc("/websocket", wm.WebsocketHandler) mux.HandleFunc("/websocket", wm.WebsocketHandler)
rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, rpcLogger) rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, rpcLogger)

View File

@ -2,7 +2,6 @@ package client
import ( import (
"context" "context"
"fmt"
"time" "time"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -57,19 +56,20 @@ func WaitForHeight(c StatusClient, h int64, waiter Waiter) error {
// //
// This handles subscribing and unsubscribing under the hood // This handles subscribing and unsubscribing under the hood
func WaitForOneEvent(c EventsClient, evtTyp string, timeout time.Duration) (types.TMEventData, error) { func WaitForOneEvent(c EventsClient, evtTyp string, timeout time.Duration) (types.TMEventData, error) {
const subscriber = "helpers"
ctx, cancel := context.WithTimeout(context.Background(), timeout) ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel() defer cancel()
evts := make(chan interface{}, 1) evts := make(chan interface{}, 1)
// register for the next event of this type // register for the next event of this type
query := fmt.Sprintf("%s='%s'", types.EventTypeKey, evtTyp) query := types.QueryForEvent(evtTyp)
err := c.Subscribe(ctx, query, evts) err := c.Subscribe(ctx, subscriber, query, evts)
if err != nil { if err != nil {
return types.TMEventData{}, errors.Wrap(err, "failed to subscribe") return types.TMEventData{}, errors.Wrap(err, "failed to subscribe")
} }
// make sure to unregister after the test is over // make sure to unregister after the test is over
defer c.Unsubscribe(ctx, query) defer c.UnsubscribeAll(ctx, subscriber)
select { select {
case evt := <-evts: case evt := <-evts:

View File

@ -3,7 +3,6 @@ package client
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"fmt"
"sync" "sync"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -13,6 +12,7 @@ import (
rpcclient "github.com/tendermint/tendermint/rpc/lib/client" rpcclient "github.com/tendermint/tendermint/rpc/lib/client"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
cmn "github.com/tendermint/tmlibs/common" cmn "github.com/tendermint/tmlibs/common"
tmpubsub "github.com/tendermint/tmlibs/pubsub"
) )
/* /*
@ -204,20 +204,14 @@ type WSEvents struct {
endpoint string endpoint string
ws *rpcclient.WSClient ws *rpcclient.WSClient
subscriptions map[string]chan<- interface{}
mtx sync.RWMutex mtx sync.RWMutex
subscriptions map[string]chan<- interface{}
// used for signaling the goroutine that feeds ws -> EventSwitch
quit chan bool
done chan bool
} }
func newWSEvents(remote, endpoint string) *WSEvents { func newWSEvents(remote, endpoint string) *WSEvents {
wsEvents := &WSEvents{ wsEvents := &WSEvents{
endpoint: endpoint, endpoint: endpoint,
remote: remote, remote: remote,
quit: make(chan bool, 1),
done: make(chan bool, 1),
subscriptions: make(map[string]chan<- interface{}), subscriptions: make(map[string]chan<- interface{}),
} }
@ -225,87 +219,86 @@ func newWSEvents(remote, endpoint string) *WSEvents {
return wsEvents return wsEvents
} }
// Start is the only way I could think the extend OnStart from func (w *WSEvents) OnStart() error {
// events.eventSwitch. If only it wasn't private... w.ws = rpcclient.NewWSClient(w.remote, w.endpoint, rpcclient.OnReconnect(func() {
// BaseService.Start -> eventSwitch.OnStart -> WSEvents.Start
func (w *WSEvents) Start() error {
ws := rpcclient.NewWSClient(w.remote, w.endpoint, rpcclient.OnReconnect(func() {
w.redoSubscriptions() w.redoSubscriptions()
})) }))
err := ws.Start() err := w.ws.Start()
if err == nil { if err != nil {
w.ws = ws return err
go w.eventListener()
} }
return err
go w.eventListener()
return nil
} }
// Stop wraps the BaseService/eventSwitch actions as Start does // Stop wraps the BaseService/eventSwitch actions as Start does
func (w *WSEvents) Stop() error { func (w *WSEvents) OnStop() {
// send a message to quit to stop the eventListener err := w.ws.Stop()
w.quit <- true
<-w.done
w.ws.Stop()
w.ws = nil
return nil
}
func (w *WSEvents) Subscribe(ctx context.Context, query string, out chan<- interface{}) error {
if ch := w.getSubscription(query); ch != nil {
return errors.New("already subscribed")
}
err := w.ws.Subscribe(ctx, query)
if err != nil { if err != nil {
return errors.Wrap(err, "failed to subscribe") w.Logger.Error("failed to stop WSClient", "err", err)
} }
w.mtx.Lock()
w.subscriptions[query] = out
w.mtx.Unlock()
return nil
} }
func (w *WSEvents) Unsubscribe(ctx context.Context, query string) error { func (w *WSEvents) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, out chan<- interface{}) error {
err := w.ws.Unsubscribe(ctx, query) q := query.String()
err := w.ws.Subscribe(ctx, q)
if err != nil { if err != nil {
return err return err
} }
w.mtx.Lock() w.mtx.Lock()
defer w.mtx.Unlock() // subscriber param is ignored because Tendermint will override it with
ch, ok := w.subscriptions[query] // remote IP anyway.
if ok { w.subscriptions[q] = out
close(ch) w.mtx.Unlock()
delete(w.subscriptions, query)
}
return nil return nil
} }
func (w *WSEvents) UnsubscribeAll(ctx context.Context) error { func (w *WSEvents) Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error {
q := query.String()
err := w.ws.Unsubscribe(ctx, q)
if err != nil {
return err
}
w.mtx.Lock()
ch, ok := w.subscriptions[q]
if ok {
close(ch)
delete(w.subscriptions, q)
}
w.mtx.Unlock()
return nil
}
func (w *WSEvents) UnsubscribeAll(ctx context.Context, subscriber string) error {
err := w.ws.UnsubscribeAll(ctx) err := w.ws.UnsubscribeAll(ctx)
if err != nil { if err != nil {
return err return err
} }
w.mtx.Lock() w.mtx.Lock()
defer w.mtx.Unlock()
for _, ch := range w.subscriptions { for _, ch := range w.subscriptions {
close(ch) close(ch)
} }
w.subscriptions = make(map[string]chan<- interface{}) w.subscriptions = make(map[string]chan<- interface{})
w.mtx.Unlock()
return nil return nil
} }
// After being reconnected, it is necessary to redo subscription to server // After being reconnected, it is necessary to redo subscription to server
// otherwise no data will be automatically received. // otherwise no data will be automatically received.
func (w *WSEvents) redoSubscriptions() { func (w *WSEvents) redoSubscriptions() {
for query := range w.subscriptions { for q := range w.subscriptions {
// NOTE: no timeout for resubscribing // NOTE: no timeout for resubscribing
// FIXME: better logging/handling of errors?? // FIXME: better logging/handling of errors??
w.ws.Subscribe(context.Background(), query) w.ws.Subscribe(context.Background(), q)
} }
} }
@ -316,34 +309,29 @@ func (w *WSEvents) redoSubscriptions() {
func (w *WSEvents) eventListener() { func (w *WSEvents) eventListener() {
for { for {
select { select {
case resp := <-w.ws.ResponsesCh: case resp, ok := <-w.ws.ResponsesCh:
// res is json.RawMessage if !ok {
return
}
if resp.Error != nil { if resp.Error != nil {
// FIXME: better logging/handling of errors?? w.Logger.Error("WS error", "err", resp.Error.Error())
fmt.Printf("ws err: %+v\n", resp.Error.Error())
continue continue
} }
result := new(ctypes.ResultEvent) result := new(ctypes.ResultEvent)
err := json.Unmarshal(resp.Result, result) err := json.Unmarshal(resp.Result, result)
if err != nil { if err != nil {
// ignore silently (eg. subscribe, unsubscribe and maybe other events) w.Logger.Error("failed to unmarshal response", "err", err)
// TODO: ?
continue continue
} }
if ch := w.getSubscription(result.Query); ch != nil { // NOTE: writing also happens inside mutex so we can't close a channel in
// Unsubscribe/UnsubscribeAll.
w.mtx.RLock()
if ch, ok := w.subscriptions[result.Query]; ok {
ch <- result.Data ch <- result.Data
} }
case <-w.quit: w.mtx.RUnlock()
// send a message so we can wait for the routine to exit case <-w.Quit:
// before cleaning up the w.ws stuff
w.done <- true
return return
} }
} }
} }
func (w *WSEvents) getSubscription(query string) chan<- interface{} {
w.mtx.RLock()
defer w.mtx.RUnlock()
return w.subscriptions[query]
}

View File

@ -20,8 +20,6 @@ implementation.
package client package client
import ( import (
"context"
data "github.com/tendermint/go-wire/data" data "github.com/tendermint/go-wire/data"
ctypes "github.com/tendermint/tendermint/rpc/core/types" ctypes "github.com/tendermint/tendermint/rpc/core/types"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
@ -89,7 +87,5 @@ type NetworkClient interface {
// EventsClient is reactive, you can subscribe to any message, given the proper // EventsClient is reactive, you can subscribe to any message, given the proper
// string. see tendermint/types/events.go // string. see tendermint/types/events.go
type EventsClient interface { type EventsClient interface {
Subscribe(ctx context.Context, query string, out chan<- interface{}) error types.EventBusSubscriber
Unsubscribe(ctx context.Context, query string) error
UnsubscribeAll(ctx context.Context) error
} }

View File

@ -3,19 +3,12 @@ package client
import ( import (
"context" "context"
"github.com/pkg/errors"
data "github.com/tendermint/go-wire/data" data "github.com/tendermint/go-wire/data"
nm "github.com/tendermint/tendermint/node" nm "github.com/tendermint/tendermint/node"
"github.com/tendermint/tendermint/rpc/core" "github.com/tendermint/tendermint/rpc/core"
ctypes "github.com/tendermint/tendermint/rpc/core/types" ctypes "github.com/tendermint/tendermint/rpc/core/types"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
tmquery "github.com/tendermint/tmlibs/pubsub/query" tmpubsub "github.com/tendermint/tmlibs/pubsub"
)
const (
// event bus subscriber
subscriber = "rpc-localclient"
) )
/* /*
@ -33,10 +26,7 @@ For real clients, you probably want to use client.HTTP. For more
powerful control during testing, you probably want the "client/mock" package. powerful control during testing, you probably want the "client/mock" package.
*/ */
type Local struct { type Local struct {
node *nm.Node
*types.EventBus *types.EventBus
subscriptions map[string]*tmquery.Query
} }
// NewLocal configures a client that calls the Node directly. // NewLocal configures a client that calls the Node directly.
@ -48,9 +38,7 @@ type Local struct {
func NewLocal(node *nm.Node) *Local { func NewLocal(node *nm.Node) *Local {
node.ConfigureRPC() node.ConfigureRPC()
return &Local{ return &Local{
node: node, EventBus: node.EventBus(),
EventBus: node.EventBus(),
subscriptions: make(map[string]*tmquery.Query),
} }
} }
@ -68,7 +56,7 @@ func (Local) ABCIInfo() (*ctypes.ResultABCIInfo, error) {
return core.ABCIInfo() return core.ABCIInfo()
} }
func (c Local) ABCIQuery(path string, data data.Bytes) (*ctypes.ResultABCIQuery, error) { func (c *Local) ABCIQuery(path string, data data.Bytes) (*ctypes.ResultABCIQuery, error) {
return c.ABCIQueryWithOptions(path, data, DefaultABCIQueryOptions) return c.ABCIQueryWithOptions(path, data, DefaultABCIQueryOptions)
} }
@ -128,34 +116,14 @@ func (Local) TxSearch(query string, prove bool) ([]*ctypes.ResultTx, error) {
return core.TxSearch(query, prove) return core.TxSearch(query, prove)
} }
func (c *Local) Subscribe(ctx context.Context, query string, out chan<- interface{}) error { func (c *Local) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, out chan<- interface{}) error {
q, err := tmquery.New(query) return c.EventBus.Subscribe(ctx, subscriber, query, out)
if err != nil {
return errors.Wrap(err, "failed to subscribe")
}
if err = c.EventBus.Subscribe(ctx, subscriber, q, out); err != nil {
return errors.Wrap(err, "failed to subscribe")
}
c.subscriptions[query] = q
return nil
} }
func (c *Local) Unsubscribe(ctx context.Context, query string) error { func (c *Local) Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error {
q, ok := c.subscriptions[query] return c.EventBus.Unsubscribe(ctx, subscriber, query)
if !ok {
return errors.New("subscription not found")
}
if err := c.EventBus.Unsubscribe(ctx, subscriber, q); err != nil {
return errors.Wrap(err, "failed to unsubscribe")
}
delete(c.subscriptions, query)
return nil
} }
func (c *Local) UnsubscribeAll(ctx context.Context) error { func (c *Local) UnsubscribeAll(ctx context.Context, subscriber string) error {
if err := c.EventBus.UnsubscribeAll(ctx, subscriber); err != nil { return c.EventBus.UnsubscribeAll(ctx, subscriber)
return errors.Wrap(err, "failed to unsubscribe")
}
c.subscriptions = make(map[string]*tmquery.Query)
return nil
} }

View File

@ -44,20 +44,15 @@ func Subscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscri
q, err := tmquery.New(query) q, err := tmquery.New(query)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "failed to parse a query") return nil, errors.Wrap(err, "failed to parse query")
}
err = wsCtx.AddSubscription(query, q)
if err != nil {
return nil, errors.Wrap(err, "failed to add subscription")
} }
ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout) ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout)
defer cancel() defer cancel()
ch := make(chan interface{}) ch := make(chan interface{})
err = eventBus.Subscribe(ctx, addr, q, ch) err = eventBusFor(wsCtx).Subscribe(ctx, addr, q, ch)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "failed to subscribe") return nil, err
} }
go func() { go func() {
@ -100,18 +95,31 @@ func Subscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscri
func Unsubscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultUnsubscribe, error) { func Unsubscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultUnsubscribe, error) {
addr := wsCtx.GetRemoteAddr() addr := wsCtx.GetRemoteAddr()
logger.Info("Unsubscribe from query", "remote", addr, "query", query) logger.Info("Unsubscribe from query", "remote", addr, "query", query)
q, ok := wsCtx.DeleteSubscription(query) q, err := tmquery.New(query)
if !ok { if err != nil {
return nil, errors.New("subscription not found") return nil, errors.Wrap(err, "failed to parse query")
}
err = eventBusFor(wsCtx).Unsubscribe(context.Background(), addr, q)
if err != nil {
return nil, err
} }
eventBus.Unsubscribe(context.Background(), addr, q.(*tmquery.Query))
return &ctypes.ResultUnsubscribe{}, nil return &ctypes.ResultUnsubscribe{}, nil
} }
func UnsubscribeAll(wsCtx rpctypes.WSRPCContext) (*ctypes.ResultUnsubscribe, error) { func UnsubscribeAll(wsCtx rpctypes.WSRPCContext) (*ctypes.ResultUnsubscribe, error) {
addr := wsCtx.GetRemoteAddr() addr := wsCtx.GetRemoteAddr()
logger.Info("Unsubscribe from all", "remote", addr) logger.Info("Unsubscribe from all", "remote", addr)
eventBus.UnsubscribeAll(context.Background(), addr) err := eventBusFor(wsCtx).UnsubscribeAll(context.Background(), addr)
wsCtx.DeleteAllSubscriptions() if err != nil {
return nil, err
}
return &ctypes.ResultUnsubscribe{}, nil return &ctypes.ResultUnsubscribe{}, nil
} }
func eventBusFor(wsCtx rpctypes.WSRPCContext) tmtypes.EventBusSubscriber {
es := wsCtx.GetEventSubscriber()
if es == nil {
es = eventBus
}
return es
}

View File

@ -2,6 +2,7 @@ package rpcserver
import ( import (
"bytes" "bytes"
"context"
"encoding/hex" "encoding/hex"
"encoding/json" "encoding/json"
"fmt" "fmt"
@ -366,8 +367,6 @@ type wsConnection struct {
funcMap map[string]*RPCFunc funcMap map[string]*RPCFunc
subscriptions map[string]interface{}
// write channel capacity // write channel capacity
writeChanCapacity int writeChanCapacity int
@ -380,8 +379,8 @@ type wsConnection struct {
// Send pings to server with this period. Must be less than readWait, but greater than zero. // Send pings to server with this period. Must be less than readWait, but greater than zero.
pingPeriod time.Duration pingPeriod time.Duration
// called before stopping the connection. // object that is used to subscribe / unsubscribe from events
onDisconnect func(remoteAddr string) eventSub types.EventSubscriber
} }
// NewWSConnection wraps websocket.Conn. // NewWSConnection wraps websocket.Conn.
@ -395,7 +394,6 @@ func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, opti
remoteAddr: baseConn.RemoteAddr().String(), remoteAddr: baseConn.RemoteAddr().String(),
baseConn: baseConn, baseConn: baseConn,
funcMap: funcMap, funcMap: funcMap,
subscriptions: make(map[string]interface{}),
writeWait: defaultWSWriteWait, writeWait: defaultWSWriteWait,
writeChanCapacity: defaultWSWriteChanCapacity, writeChanCapacity: defaultWSWriteChanCapacity,
readWait: defaultWSReadWait, readWait: defaultWSReadWait,
@ -408,6 +406,15 @@ func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, opti
return wsc return wsc
} }
// EventSubscriber sets object that is used to subscribe / unsubscribe from
// events - not Goroutine-safe. If none given, default node's eventBus will be
// used.
func EventSubscriber(eventSub types.EventSubscriber) func(*wsConnection) {
return func(wsc *wsConnection) {
wsc.eventSub = eventSub
}
}
// WriteWait sets the amount of time to wait before a websocket write times out. // WriteWait sets the amount of time to wait before a websocket write times out.
// It should only be used in the constructor - not Goroutine-safe. // It should only be used in the constructor - not Goroutine-safe.
func WriteWait(writeWait time.Duration) func(*wsConnection) { func WriteWait(writeWait time.Duration) func(*wsConnection) {
@ -440,14 +447,6 @@ func PingPeriod(pingPeriod time.Duration) func(*wsConnection) {
} }
} }
// OnDisconnect called before stopping the connection.
// It should only be used in the constructor - not Goroutine-safe.
func OnDisconnect(cb func(remoteAddr string)) func(*wsConnection) {
return func(wsc *wsConnection) {
wsc.onDisconnect = cb
}
}
// OnStart implements cmn.Service by starting the read and write routines. It // OnStart implements cmn.Service by starting the read and write routines. It
// blocks until the connection closes. // blocks until the connection closes.
func (wsc *wsConnection) OnStart() error { func (wsc *wsConnection) OnStart() error {
@ -461,12 +460,12 @@ func (wsc *wsConnection) OnStart() error {
return nil return nil
} }
// OnStop implements cmn.Service by calling OnDisconnect callback. // OnStop implements cmn.Service by unsubscribing remoteAddr from all subscriptions.
func (wsc *wsConnection) OnStop() { func (wsc *wsConnection) OnStop() {
// Both read and write loops close the websocket connection when they exit their loops. // Both read and write loops close the websocket connection when they exit their loops.
// The writeChan is never closed, to allow WriteRPCResponse() to fail. // The writeChan is never closed, to allow WriteRPCResponse() to fail.
if wsc.onDisconnect != nil { if wsc.eventSub != nil {
wsc.onDisconnect(wsc.remoteAddr) wsc.eventSub.UnsubscribeAll(context.TODO(), wsc.remoteAddr)
} }
} }
@ -476,6 +475,11 @@ func (wsc *wsConnection) GetRemoteAddr() string {
return wsc.remoteAddr return wsc.remoteAddr
} }
// GetEventSubscriber implements WSRPCConnection by returning event subscriber.
func (wsc *wsConnection) GetEventSubscriber() types.EventSubscriber {
return wsc.eventSub
}
// WriteRPCResponse pushes a response to the writeChan, and blocks until it is accepted. // WriteRPCResponse pushes a response to the writeChan, and blocks until it is accepted.
// It implements WSRPCConnection. It is Goroutine-safe. // It implements WSRPCConnection. It is Goroutine-safe.
func (wsc *wsConnection) WriteRPCResponse(resp types.RPCResponse) { func (wsc *wsConnection) WriteRPCResponse(resp types.RPCResponse) {
@ -499,28 +503,6 @@ func (wsc *wsConnection) TryWriteRPCResponse(resp types.RPCResponse) bool {
} }
} }
func (wsc *wsConnection) AddSubscription(query string, data interface{}) error {
if _, ok := wsc.subscriptions[query]; ok {
return errors.New("Already subscribed")
}
wsc.subscriptions[query] = data
return nil
}
func (wsc *wsConnection) DeleteSubscription(query string) (interface{}, bool) {
data, ok := wsc.subscriptions[query]
if ok {
delete(wsc.subscriptions, query)
return data, true
}
return nil, false
}
func (wsc *wsConnection) DeleteAllSubscriptions() {
wsc.subscriptions = make(map[string]interface{})
}
// Read from the socket and subscribe to or unsubscribe from events // Read from the socket and subscribe to or unsubscribe from events
func (wsc *wsConnection) readRoutine() { func (wsc *wsConnection) readRoutine() {
defer func() { defer func() {

View File

@ -1,11 +1,13 @@
package rpctypes package rpctypes
import ( import (
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"strings" "strings"
"github.com/pkg/errors" "github.com/pkg/errors"
tmpubsub "github.com/tendermint/tmlibs/pubsub"
) )
//---------------------------------------- //----------------------------------------
@ -135,10 +137,14 @@ type WSRPCConnection interface {
GetRemoteAddr() string GetRemoteAddr() string
WriteRPCResponse(resp RPCResponse) WriteRPCResponse(resp RPCResponse)
TryWriteRPCResponse(resp RPCResponse) bool TryWriteRPCResponse(resp RPCResponse) bool
GetEventSubscriber() EventSubscriber
}
AddSubscription(string, interface{}) error // EventSubscriber mirros tendermint/tendermint/types.EventBusSubscriber
DeleteSubscription(string) (interface{}, bool) type EventSubscriber interface {
DeleteAllSubscriptions() Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, out chan<- interface{}) error
Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error
UnsubscribeAll(ctx context.Context, subscriber string) error
} }
// websocket-only RPCFuncs take this as the first parameter. // websocket-only RPCFuncs take this as the first parameter.

View File

@ -12,6 +12,12 @@ import (
const defaultCapacity = 1000 const defaultCapacity = 1000
type EventBusSubscriber interface {
Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, out chan<- interface{}) error
Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error
UnsubscribeAll(ctx context.Context, subscriber string) error
}
// EventBus is a common bus for all events going through the system. All calls // EventBus is a common bus for all events going through the system. All calls
// are proxied to underlying pubsub server. All events must be published using // are proxied to underlying pubsub server. All events must be published using
// EventBus to ensure correct data types. // EventBus to ensure correct data types.

View File

@ -146,32 +146,32 @@ const (
) )
var ( var (
EventQueryBond = queryForEvent(EventBond) EventQueryBond = QueryForEvent(EventBond)
EventQueryUnbond = queryForEvent(EventUnbond) EventQueryUnbond = QueryForEvent(EventUnbond)
EventQueryRebond = queryForEvent(EventRebond) EventQueryRebond = QueryForEvent(EventRebond)
EventQueryDupeout = queryForEvent(EventDupeout) EventQueryDupeout = QueryForEvent(EventDupeout)
EventQueryFork = queryForEvent(EventFork) EventQueryFork = QueryForEvent(EventFork)
EventQueryNewBlock = queryForEvent(EventNewBlock) EventQueryNewBlock = QueryForEvent(EventNewBlock)
EventQueryNewBlockHeader = queryForEvent(EventNewBlockHeader) EventQueryNewBlockHeader = QueryForEvent(EventNewBlockHeader)
EventQueryNewRound = queryForEvent(EventNewRound) EventQueryNewRound = QueryForEvent(EventNewRound)
EventQueryNewRoundStep = queryForEvent(EventNewRoundStep) EventQueryNewRoundStep = QueryForEvent(EventNewRoundStep)
EventQueryTimeoutPropose = queryForEvent(EventTimeoutPropose) EventQueryTimeoutPropose = QueryForEvent(EventTimeoutPropose)
EventQueryCompleteProposal = queryForEvent(EventCompleteProposal) EventQueryCompleteProposal = QueryForEvent(EventCompleteProposal)
EventQueryPolka = queryForEvent(EventPolka) EventQueryPolka = QueryForEvent(EventPolka)
EventQueryUnlock = queryForEvent(EventUnlock) EventQueryUnlock = QueryForEvent(EventUnlock)
EventQueryLock = queryForEvent(EventLock) EventQueryLock = QueryForEvent(EventLock)
EventQueryRelock = queryForEvent(EventRelock) EventQueryRelock = QueryForEvent(EventRelock)
EventQueryTimeoutWait = queryForEvent(EventTimeoutWait) EventQueryTimeoutWait = QueryForEvent(EventTimeoutWait)
EventQueryVote = queryForEvent(EventVote) EventQueryVote = QueryForEvent(EventVote)
EventQueryProposalHeartbeat = queryForEvent(EventProposalHeartbeat) EventQueryProposalHeartbeat = QueryForEvent(EventProposalHeartbeat)
EventQueryTx = queryForEvent(EventTx) EventQueryTx = QueryForEvent(EventTx)
) )
func EventQueryTxFor(tx Tx) tmpubsub.Query { func EventQueryTxFor(tx Tx) tmpubsub.Query {
return tmquery.MustParse(fmt.Sprintf("%s='%s' AND %s='%X'", EventTypeKey, EventTx, TxHashKey, tx.Hash())) return tmquery.MustParse(fmt.Sprintf("%s='%s' AND %s='%X'", EventTypeKey, EventTx, TxHashKey, tx.Hash()))
} }
func queryForEvent(eventType string) tmpubsub.Query { func QueryForEvent(eventType string) tmpubsub.Query {
return tmquery.MustParse(fmt.Sprintf("%s='%s'", EventTypeKey, eventType)) return tmquery.MustParse(fmt.Sprintf("%s='%s'", EventTypeKey, eventType))
} }