Merge pull request #788 from tendermint/feature/548-indexing-tags

new pubsub package
This commit is contained in:
Ethan Buchman
2017-11-08 01:02:48 +00:00
committed by GitHub
50 changed files with 1526 additions and 988 deletions

View File

@ -31,7 +31,7 @@ func TestHeaderEvents(t *testing.T) {
defer c.Stop()
}
evtTyp := types.EventStringNewBlockHeader()
evtTyp := types.EventNewBlockHeader
evt, err := client.WaitForOneEvent(c, evtTyp, 1*time.Second)
require.Nil(err, "%d: %+v", i, err)
_, ok := evt.Unwrap().(types.EventDataNewBlockHeader)
@ -54,20 +54,20 @@ func TestBlockEvents(t *testing.T) {
// listen for a new block; ensure height increases by 1
var firstBlockHeight int
for i := 0; i < 3; i++ {
evtTyp := types.EventStringNewBlock()
for j := 0; j < 3; j++ {
evtTyp := types.EventNewBlock
evt, err := client.WaitForOneEvent(c, evtTyp, 1*time.Second)
require.Nil(err, "%d: %+v", i, err)
require.Nil(err, "%d: %+v", j, err)
blockEvent, ok := evt.Unwrap().(types.EventDataNewBlock)
require.True(ok, "%d: %#v", i, evt)
require.True(ok, "%d: %#v", j, evt)
block := blockEvent.Block
if i == 0 {
if j == 0 {
firstBlockHeight = block.Header.Height
continue
}
require.Equal(block.Header.Height, firstBlockHeight+i)
require.Equal(block.Header.Height, firstBlockHeight+j)
}
}
}
@ -86,7 +86,7 @@ func TestTxEventsSentWithBroadcastTxAsync(t *testing.T) {
// make the tx
_, _, tx := MakeTxKV()
evtTyp := types.EventStringTx(types.Tx(tx))
evtTyp := types.EventTx
// send async
txres, err := c.BroadcastTxAsync(tx)
@ -119,9 +119,9 @@ func TestTxEventsSentWithBroadcastTxSync(t *testing.T) {
// make the tx
_, _, tx := MakeTxKV()
evtTyp := types.EventStringTx(types.Tx(tx))
evtTyp := types.EventTx
// send async
// send sync
txres, err := c.BroadcastTxSync(tx)
require.Nil(err, "%+v", err)
require.True(txres.Code.IsOK())

View File

@ -1,12 +1,12 @@
package client
import (
"context"
"fmt"
"time"
"github.com/pkg/errors"
"github.com/tendermint/tendermint/types"
cmn "github.com/tendermint/tmlibs/common"
events "github.com/tendermint/tmlibs/events"
)
// Waiter is informed of current height, decided whether to quit early
@ -56,33 +56,25 @@ func WaitForHeight(c StatusClient, h int, waiter Waiter) error {
// when the timeout duration has expired.
//
// This handles subscribing and unsubscribing under the hood
func WaitForOneEvent(evsw types.EventSwitch,
evtTyp string, timeout time.Duration) (types.TMEventData, error) {
listener := cmn.RandStr(12)
evts, quit := make(chan events.EventData, 10), make(chan bool, 1)
// start timeout count-down
go func() {
time.Sleep(timeout)
quit <- true
}()
func WaitForOneEvent(c EventsClient, evtTyp string, timeout time.Duration) (types.TMEventData, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
evts := make(chan interface{}, 1)
// register for the next event of this type
evsw.AddListenerForEvent(listener, evtTyp, func(data events.EventData) {
evts <- data
})
query := fmt.Sprintf("%s='%s'", types.EventTypeKey, evtTyp)
err := c.Subscribe(ctx, query, evts)
if err != nil {
return types.TMEventData{}, errors.Wrap(err, "failed to subscribe")
}
// make sure to unregister after the test is over
defer evsw.RemoveListenerForEvent(evtTyp, listener)
// defer evsw.RemoveListener(listener) // this also works
defer c.Unsubscribe(ctx, query)
select {
case <-quit:
return types.TMEventData{}, errors.New("timed out waiting for event")
case evt := <-evts:
tmevt, ok := evt.(types.TMEventData)
if ok {
return tmevt, nil
}
return types.TMEventData{}, errors.Errorf("Got unexpected event type: %#v", evt)
return evt.(types.TMEventData), nil
case <-ctx.Done():
return types.TMEventData{}, errors.New("timed out waiting for event")
}
}

View File

@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"sync"
"github.com/pkg/errors"
@ -11,7 +12,7 @@ import (
ctypes "github.com/tendermint/tendermint/rpc/core/types"
rpcclient "github.com/tendermint/tendermint/rpc/lib/client"
"github.com/tendermint/tendermint/types"
events "github.com/tendermint/tmlibs/events"
cmn "github.com/tendermint/tmlibs/common"
)
/*
@ -40,10 +41,9 @@ func NewHTTP(remote, wsEndpoint string) *HTTP {
}
var (
_ Client = (*HTTP)(nil)
_ NetworkClient = (*HTTP)(nil)
_ types.EventSwitch = (*HTTP)(nil)
_ types.EventSwitch = (*WSEvents)(nil)
_ Client = (*HTTP)(nil)
_ NetworkClient = (*HTTP)(nil)
_ EventsClient = (*HTTP)(nil)
)
func (c *HTTP) Status() (*ctypes.ResultStatus, error) {
@ -186,128 +186,113 @@ func (c *HTTP) Validators(height *int) (*ctypes.ResultValidators, error) {
/** websocket event stuff here... **/
type WSEvents struct {
types.EventSwitch
cmn.BaseService
remote string
endpoint string
ws *rpcclient.WSClient
subscriptions map[string]chan<- interface{}
mtx sync.RWMutex
// used for signaling the goroutine that feeds ws -> EventSwitch
quit chan bool
done chan bool
// used to maintain counts of actively listened events
// so we can properly subscribe/unsubscribe
// FIXME: thread-safety???
// FIXME: reuse code from tmlibs/events???
evtCount map[string]int // count how many time each event is subscribed
listeners map[string][]string // keep track of which events each listener is listening to
}
func newWSEvents(remote, endpoint string) *WSEvents {
return &WSEvents{
EventSwitch: types.NewEventSwitch(),
endpoint: endpoint,
remote: remote,
quit: make(chan bool, 1),
done: make(chan bool, 1),
evtCount: map[string]int{},
listeners: map[string][]string{},
wsEvents := &WSEvents{
endpoint: endpoint,
remote: remote,
quit: make(chan bool, 1),
done: make(chan bool, 1),
subscriptions: make(map[string]chan<- interface{}),
}
wsEvents.BaseService = *cmn.NewBaseService(nil, "WSEvents", wsEvents)
return wsEvents
}
// Start is the only way I could think the extend OnStart from
// events.eventSwitch. If only it wasn't private...
// BaseService.Start -> eventSwitch.OnStart -> WSEvents.Start
func (w *WSEvents) Start() (bool, error) {
st, err := w.EventSwitch.Start()
// if we did start, then OnStart here...
if st && err == nil {
ws := rpcclient.NewWSClient(w.remote, w.endpoint, rpcclient.OnReconnect(func() {
w.redoSubscriptions()
}))
_, err = ws.Start()
if err == nil {
w.ws = ws
go w.eventListener()
}
ws := rpcclient.NewWSClient(w.remote, w.endpoint, rpcclient.OnReconnect(func() {
w.redoSubscriptions()
}))
started, err := ws.Start()
if err == nil {
w.ws = ws
go w.eventListener()
}
return st, errors.Wrap(err, "StartWSEvent")
return started, errors.Wrap(err, "StartWSEvent")
}
// Stop wraps the BaseService/eventSwitch actions as Start does
func (w *WSEvents) Stop() bool {
stop := w.EventSwitch.Stop()
if stop {
// send a message to quit to stop the eventListener
w.quit <- true
<-w.done
w.ws.Stop()
w.ws = nil
}
return stop
// send a message to quit to stop the eventListener
w.quit <- true
<-w.done
w.ws.Stop()
w.ws = nil
return true
}
/** TODO: more intelligent subscriptions! **/
func (w *WSEvents) AddListenerForEvent(listenerID, event string, cb events.EventCallback) {
// no one listening -> subscribe
if w.evtCount[event] == 0 {
w.subscribe(event)
func (w *WSEvents) Subscribe(ctx context.Context, query string, out chan<- interface{}) error {
if ch := w.getSubscription(query); ch != nil {
return errors.New("already subscribed")
}
// if this listener was already listening to this event, return early
for _, s := range w.listeners[listenerID] {
if event == s {
return
}
err := w.ws.Subscribe(ctx, query)
if err != nil {
return errors.Wrap(err, "failed to subscribe")
}
// otherwise, add this event to this listener
w.evtCount[event] += 1
w.listeners[listenerID] = append(w.listeners[listenerID], event)
w.EventSwitch.AddListenerForEvent(listenerID, event, cb)
w.mtx.Lock()
w.subscriptions[query] = out
w.mtx.Unlock()
return nil
}
func (w *WSEvents) RemoveListenerForEvent(event string, listenerID string) {
// if this listener is listening already, splice it out
found := false
l := w.listeners[listenerID]
for i, s := range l {
if event == s {
found = true
w.listeners[listenerID] = append(l[:i], l[i+1:]...)
break
}
}
// if the listener wasn't already listening to the event, exit early
if !found {
return
func (w *WSEvents) Unsubscribe(ctx context.Context, query string) error {
err := w.ws.Unsubscribe(ctx, query)
if err != nil {
return err
}
// now we can update the subscriptions
w.evtCount[event] -= 1
if w.evtCount[event] == 0 {
w.unsubscribe(event)
w.mtx.Lock()
defer w.mtx.Unlock()
ch, ok := w.subscriptions[query]
if ok {
close(ch)
delete(w.subscriptions, query)
}
w.EventSwitch.RemoveListenerForEvent(event, listenerID)
return nil
}
func (w *WSEvents) RemoveListener(listenerID string) {
// remove all counts for this listener
for _, s := range w.listeners[listenerID] {
w.evtCount[s] -= 1
if w.evtCount[s] == 0 {
w.unsubscribe(s)
}
func (w *WSEvents) UnsubscribeAll(ctx context.Context) error {
err := w.ws.UnsubscribeAll(ctx)
if err != nil {
return err
}
w.listeners[listenerID] = nil
// then let the switch do it's magic
w.EventSwitch.RemoveListener(listenerID)
w.mtx.Lock()
defer w.mtx.Unlock()
for _, ch := range w.subscriptions {
close(ch)
}
w.subscriptions = make(map[string]chan<- interface{})
return nil
}
// After being reconnected, it is necessary to redo subscription
// to server otherwise no data will be automatically received
// After being reconnected, it is necessary to redo subscription to server
// otherwise no data will be automatically received.
func (w *WSEvents) redoSubscriptions() {
for event, _ := range w.evtCount {
w.subscribe(event)
for query := range w.subscriptions {
// NOTE: no timeout for resubscribing
// FIXME: better logging/handling of errors??
w.ws.Subscribe(context.Background(), query)
}
}
@ -325,10 +310,15 @@ func (w *WSEvents) eventListener() {
fmt.Printf("ws err: %+v\n", resp.Error.Error())
continue
}
err := w.parseEvent(*resp.Result)
result := new(ctypes.ResultEvent)
err := json.Unmarshal(resp.Result, result)
if err != nil {
// FIXME: better logging/handling of errors??
fmt.Printf("ws result: %+v\n", err)
// ignore silently (eg. subscribe, unsubscribe and maybe other events)
// TODO: ?
continue
}
if ch := w.getSubscription(result.Query); ch != nil {
ch <- result.Data
}
case <-w.quit:
// send a message so we can wait for the routine to exit
@ -339,34 +329,8 @@ func (w *WSEvents) eventListener() {
}
}
// parseEvent unmarshals the json message and converts it into
// some implementation of types.TMEventData, and sends it off
// on the merry way to the EventSwitch
func (w *WSEvents) parseEvent(data []byte) (err error) {
result := new(ctypes.ResultEvent)
err = json.Unmarshal(data, result)
if err != nil {
// ignore silently (eg. subscribe, unsubscribe and maybe other events)
// TODO: ?
return nil
}
// looks good! let's fire this baby!
w.EventSwitch.FireEvent(result.Name, result.Data)
return nil
}
// no way of exposing these failures, so we panic.
// is this right? or silently ignore???
func (w *WSEvents) subscribe(event string) {
err := w.ws.Subscribe(context.TODO(), event)
if err != nil {
panic(err)
}
}
func (w *WSEvents) unsubscribe(event string) {
err := w.ws.Unsubscribe(context.TODO(), event)
if err != nil {
panic(err)
}
func (w *WSEvents) getSubscription(query string) chan<- interface{} {
w.mtx.RLock()
defer w.mtx.RUnlock()
return w.subscriptions[query]
}

View File

@ -20,9 +20,12 @@ implementation.
package client
import (
"context"
data "github.com/tendermint/go-wire/data"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
"github.com/tendermint/tendermint/types"
cmn "github.com/tendermint/tmlibs/common"
)
// ABCIClient groups together the functionality that principally
@ -64,14 +67,12 @@ type StatusClient interface {
// if you want to listen for events, test if it also
// implements events.EventSwitch
type Client interface {
cmn.Service
ABCIClient
SignClient
HistoryClient
StatusClient
// this Client is reactive, you can subscribe to any TMEventData
// type, given the proper string. see tendermint/types/events.go
types.EventSwitch
EventsClient
}
// NetworkClient is general info about the network state. May not
@ -83,3 +84,11 @@ type NetworkClient interface {
NetInfo() (*ctypes.ResultNetInfo, error)
DumpConsensusState() (*ctypes.ResultDumpConsensusState, error)
}
// EventsClient is reactive, you can subscribe to any message, given the proper
// string. see tendermint/types/events.go
type EventsClient interface {
Subscribe(ctx context.Context, query string, out chan<- interface{}) error
Unsubscribe(ctx context.Context, query string) error
UnsubscribeAll(ctx context.Context) error
}

View File

@ -1,22 +1,32 @@
package client
import (
"context"
"github.com/pkg/errors"
data "github.com/tendermint/go-wire/data"
nm "github.com/tendermint/tendermint/node"
"github.com/tendermint/tendermint/rpc/core"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
"github.com/tendermint/tendermint/types"
tmquery "github.com/tendermint/tmlibs/pubsub/query"
)
const (
// event bus subscriber
subscriber = "rpc-localclient"
)
/*
Local is a Client implementation that directly executes the rpc
functions on a given node, without going through HTTP or GRPC
functions on a given node, without going through HTTP or GRPC.
This implementation is useful for:
* Running tests against a node in-process without the overhead
of going through an http server
* Communication between an ABCI app and tendermin core when they
* Communication between an ABCI app and Tendermint core when they
are compiled in process.
For real clients, you probably want to use client.HTTP. For more
@ -24,7 +34,9 @@ powerful control during testing, you probably want the "client/mock" package.
*/
type Local struct {
node *nm.Node
types.EventSwitch
*types.EventBus
subscriptions map[string]*tmquery.Query
}
// NewLocal configures a client that calls the Node directly.
@ -33,24 +45,26 @@ type Local struct {
// you can only have one node per process. So make sure test cases
// don't run in parallel, or try to simulate an entire network in
// one process...
func NewLocal(node *nm.Node) Local {
func NewLocal(node *nm.Node) *Local {
node.ConfigureRPC()
return Local{
node: node,
EventSwitch: node.EventSwitch(),
return &Local{
node: node,
EventBus: node.EventBus(),
subscriptions: make(map[string]*tmquery.Query),
}
}
var (
_ Client = Local{}
_ Client = (*Local)(nil)
_ NetworkClient = Local{}
_ EventsClient = (*Local)(nil)
)
func (c Local) Status() (*ctypes.ResultStatus, error) {
func (Local) Status() (*ctypes.ResultStatus, error) {
return core.Status()
}
func (c Local) ABCIInfo() (*ctypes.ResultABCIInfo, error) {
func (Local) ABCIInfo() (*ctypes.ResultABCIInfo, error) {
return core.ABCIInfo()
}
@ -58,54 +72,86 @@ func (c Local) ABCIQuery(path string, data data.Bytes) (*ctypes.ResultABCIQuery,
return c.ABCIQueryWithOptions(path, data, DefaultABCIQueryOptions)
}
func (c Local) ABCIQueryWithOptions(path string, data data.Bytes, opts ABCIQueryOptions) (*ctypes.ResultABCIQuery, error) {
func (Local) ABCIQueryWithOptions(path string, data data.Bytes, opts ABCIQueryOptions) (*ctypes.ResultABCIQuery, error) {
return core.ABCIQuery(path, data, opts.Height, opts.Trusted)
}
func (c Local) BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
func (Local) BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
return core.BroadcastTxCommit(tx)
}
func (c Local) BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
func (Local) BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
return core.BroadcastTxAsync(tx)
}
func (c Local) BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
func (Local) BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
return core.BroadcastTxSync(tx)
}
func (c Local) NetInfo() (*ctypes.ResultNetInfo, error) {
func (Local) NetInfo() (*ctypes.ResultNetInfo, error) {
return core.NetInfo()
}
func (c Local) DumpConsensusState() (*ctypes.ResultDumpConsensusState, error) {
func (Local) DumpConsensusState() (*ctypes.ResultDumpConsensusState, error) {
return core.DumpConsensusState()
}
func (c Local) DialSeeds(seeds []string) (*ctypes.ResultDialSeeds, error) {
func (Local) DialSeeds(seeds []string) (*ctypes.ResultDialSeeds, error) {
return core.UnsafeDialSeeds(seeds)
}
func (c Local) BlockchainInfo(minHeight, maxHeight int) (*ctypes.ResultBlockchainInfo, error) {
func (Local) BlockchainInfo(minHeight, maxHeight int) (*ctypes.ResultBlockchainInfo, error) {
return core.BlockchainInfo(minHeight, maxHeight)
}
func (c Local) Genesis() (*ctypes.ResultGenesis, error) {
func (Local) Genesis() (*ctypes.ResultGenesis, error) {
return core.Genesis()
}
func (c Local) Block(height *int) (*ctypes.ResultBlock, error) {
func (Local) Block(height *int) (*ctypes.ResultBlock, error) {
return core.Block(height)
}
func (c Local) Commit(height *int) (*ctypes.ResultCommit, error) {
func (Local) Commit(height *int) (*ctypes.ResultCommit, error) {
return core.Commit(height)
}
func (c Local) Validators(height *int) (*ctypes.ResultValidators, error) {
func (Local) Validators(height *int) (*ctypes.ResultValidators, error) {
return core.Validators(height)
}
func (c Local) Tx(hash []byte, prove bool) (*ctypes.ResultTx, error) {
func (Local) Tx(hash []byte, prove bool) (*ctypes.ResultTx, error) {
return core.Tx(hash, prove)
}
func (c *Local) Subscribe(ctx context.Context, query string, out chan<- interface{}) error {
q, err := tmquery.New(query)
if err != nil {
return errors.Wrap(err, "failed to subscribe")
}
if err = c.EventBus.Subscribe(ctx, subscriber, q, out); err != nil {
return errors.Wrap(err, "failed to subscribe")
}
c.subscriptions[query] = q
return nil
}
func (c *Local) Unsubscribe(ctx context.Context, query string) error {
q, ok := c.subscriptions[query]
if !ok {
return errors.New("subscription not found")
}
if err := c.EventBus.Unsubscribe(ctx, subscriber, q); err != nil {
return errors.Wrap(err, "failed to unsubscribe")
}
delete(c.subscriptions, query)
return nil
}
func (c *Local) UnsubscribeAll(ctx context.Context) error {
if err := c.EventBus.UnsubscribeAll(ctx, subscriber); err != nil {
return errors.Wrap(err, "failed to unsubscribe")
}
c.subscriptions = make(map[string]*tmquery.Query)
return nil
}

View File

@ -21,6 +21,7 @@ import (
"github.com/tendermint/tendermint/rpc/core"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
"github.com/tendermint/tendermint/types"
cmn "github.com/tendermint/tmlibs/common"
)
// Client wraps arbitrary implementations of the various interfaces.
@ -33,8 +34,8 @@ type Client struct {
client.SignClient
client.HistoryClient
client.StatusClient
// create a mock with types.NewEventSwitch()
types.EventSwitch
client.EventsClient
cmn.Service
}
var _ client.Client = Client{}

View File

@ -18,7 +18,7 @@ func getHTTPClient() *client.HTTP {
return client.NewHTTP(rpcAddr, "/websocket")
}
func getLocalClient() client.Local {
func getLocalClient() *client.Local {
return client.NewLocal(node)
}

View File

@ -1,9 +1,15 @@
package core
import (
"context"
"time"
"github.com/pkg/errors"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
rpctypes "github.com/tendermint/tendermint/rpc/lib/types"
"github.com/tendermint/tendermint/types"
tmtypes "github.com/tendermint/tendermint/types"
tmquery "github.com/tendermint/tmlibs/pubsub/query"
)
// Subscribe for events via WebSocket.
@ -33,14 +39,35 @@ import (
// | event | string | "" | true | Event name |
//
// <aside class="notice">WebSocket only</aside>
func Subscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultSubscribe, error) {
logger.Info("Subscribe to event", "remote", wsCtx.GetRemoteAddr(), "event", event)
types.AddListenerForEvent(wsCtx.GetEventSwitch(), wsCtx.GetRemoteAddr(), event, func(msg types.TMEventData) {
// NOTE: EventSwitch callbacks must be nonblocking
// NOTE: RPCResponses of subscribed events have id suffix "#event"
tmResult := &ctypes.ResultEvent{event, msg}
wsCtx.TryWriteRPCResponse(rpctypes.NewRPCSuccessResponse(wsCtx.Request.ID+"#event", tmResult))
})
func Subscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscribe, error) {
addr := wsCtx.GetRemoteAddr()
logger.Info("Subscribe to query", "remote", addr, "query", query)
q, err := tmquery.New(query)
if err != nil {
return nil, errors.Wrap(err, "failed to parse a query")
}
err = wsCtx.AddSubscription(query, q)
if err != nil {
return nil, errors.Wrap(err, "failed to add subscription")
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()
ch := make(chan interface{})
err = eventBus.Subscribe(ctx, addr, q, ch)
if err != nil {
return nil, errors.Wrap(err, "failed to subscribe")
}
go func() {
for event := range ch {
tmResult := &ctypes.ResultEvent{query, event.(tmtypes.TMEventData)}
wsCtx.TryWriteRPCResponse(rpctypes.NewRPCSuccessResponse(wsCtx.Request.ID+"#event", tmResult))
}
}()
return &ctypes.ResultSubscribe{}, nil
}
@ -71,8 +98,21 @@ func Subscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultSubscri
// | event | string | "" | true | Event name |
//
// <aside class="notice">WebSocket only</aside>
func Unsubscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultUnsubscribe, error) {
logger.Info("Unsubscribe to event", "remote", wsCtx.GetRemoteAddr(), "event", event)
wsCtx.GetEventSwitch().RemoveListenerForEvent(event, wsCtx.GetRemoteAddr())
func Unsubscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultUnsubscribe, error) {
addr := wsCtx.GetRemoteAddr()
logger.Info("Unsubscribe from query", "remote", addr, "query", query)
q, ok := wsCtx.DeleteSubscription(query)
if !ok {
return nil, errors.New("subscription not found")
}
eventBus.Unsubscribe(context.Background(), addr, q.(*tmquery.Query))
return &ctypes.ResultUnsubscribe{}, nil
}
func UnsubscribeAll(wsCtx rpctypes.WSRPCContext) (*ctypes.ResultUnsubscribe, error) {
addr := wsCtx.GetRemoteAddr()
logger.Info("Unsubscribe from all", "remote", addr)
eventBus.UnsubscribeAll(context.Background(), addr)
wsCtx.DeleteAllSubscriptions()
return &ctypes.ResultUnsubscribe{}, nil
}

View File

@ -1,9 +1,12 @@
package core
import (
"context"
"fmt"
"time"
"github.com/pkg/errors"
abci "github.com/tendermint/abci/types"
data "github.com/tendermint/go-wire/data"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
@ -147,21 +150,27 @@ func BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
// |-----------+------+---------+----------+-----------------|
// | tx | Tx | nil | true | The transaction |
func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
// subscribe to tx being committed in block
deliverTxResCh := make(chan types.EventDataTx, 1)
types.AddListenerForEvent(eventSwitch, "rpc", types.EventStringTx(tx), func(data types.TMEventData) {
deliverTxResCh <- data.Unwrap().(types.EventDataTx)
})
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()
deliverTxResCh := make(chan interface{})
q := types.EventQueryTx(tx)
err := eventBus.Subscribe(ctx, "mempool", q, deliverTxResCh)
if err != nil {
err = errors.Wrap(err, "failed to subscribe to tx")
logger.Error("Error on broadcastTxCommit", "err", err)
return nil, fmt.Errorf("Error on broadcastTxCommit: %v", err)
}
defer eventBus.Unsubscribe(context.Background(), "mempool", q)
// broadcast the tx and register checktx callback
checkTxResCh := make(chan *abci.Response, 1)
err := mempool.CheckTx(tx, func(res *abci.Response) {
err = mempool.CheckTx(tx, func(res *abci.Response) {
checkTxResCh <- res
})
if err != nil {
logger.Error("err", "err", err)
return nil, fmt.Errorf("Error broadcasting transaction: %v", err)
logger.Error("Error on broadcastTxCommit", "err", err)
return nil, fmt.Errorf("Error on broadcastTxCommit: %v", err)
}
checkTxRes := <-checkTxResCh
checkTxR := checkTxRes.GetCheckTx()
@ -179,7 +188,8 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
// TODO: configurable?
timer := time.NewTimer(60 * 2 * time.Second)
select {
case deliverTxRes := <-deliverTxResCh:
case deliverTxResMsg := <-deliverTxResCh:
deliverTxRes := deliverTxResMsg.(types.TMEventData).Unwrap().(types.EventDataTx)
// The tx was included in a block.
deliverTxR := &abci.ResponseDeliverTx{
Code: deliverTxRes.Code,

View File

@ -36,7 +36,6 @@ type P2P interface {
var (
// external, thread safe interfaces
eventSwitch types.EventSwitch
proxyAppQuery proxy.AppConnQuery
// interfaces defined in types and above
@ -51,14 +50,11 @@ var (
addrBook *p2p.AddrBook
txIndexer txindex.TxIndexer
consensusReactor *consensus.ConsensusReactor
eventBus *types.EventBus // thread safe
logger log.Logger
)
func SetEventSwitch(evsw types.EventSwitch) {
eventSwitch = evsw
}
func SetBlockStore(bs types.BlockStore) {
blockStore = bs
}
@ -102,3 +98,7 @@ func SetConsensusReactor(conR *consensus.ConsensusReactor) {
func SetLogger(l log.Logger) {
logger = l
}
func SetEventBus(b *types.EventBus) {
eventBus = b
}

View File

@ -7,8 +7,9 @@ import (
// TODO: better system than "unsafe" prefix
var Routes = map[string]*rpc.RPCFunc{
// subscribe/unsubscribe are reserved for websocket events.
"subscribe": rpc.NewWSRPCFunc(Subscribe, "event"),
"unsubscribe": rpc.NewWSRPCFunc(Unsubscribe, "event"),
"subscribe": rpc.NewWSRPCFunc(Subscribe, "query"),
"unsubscribe": rpc.NewWSRPCFunc(Unsubscribe, "query"),
"unsubscribe_all": rpc.NewWSRPCFunc(UnsubscribeAll, ""),
// info API
"status": rpc.NewRPCFunc(Status, ""),

View File

@ -140,6 +140,6 @@ type ResultSubscribe struct{}
type ResultUnsubscribe struct{}
type ResultEvent struct {
Name string `json:"name"`
Data types.TMEventData `json:"data"`
Query string `json:"query"`
Data types.TMEventData `json:"data"`
}

View File

@ -153,7 +153,7 @@ func unmarshalResponseBytes(responseBytes []byte, result interface{}) (interface
return nil, errors.Errorf("Response error: %v", response.Error)
}
// unmarshal the RawMessage into the result
err = json.Unmarshal(*response.Result, result)
err = json.Unmarshal(response.Result, result)
if err != nil {
return nil, errors.Errorf("Error unmarshalling rpc response result: %v", err)
}

View File

@ -449,17 +449,17 @@ func (c *WSClient) readRoutine() {
///////////////////////////////////////////////////////////////////////////////
// Predefined methods
// Subscribe to an event. Note the server must have a "subscribe" route
// Subscribe to a query. Note the server must have a "subscribe" route
// defined.
func (c *WSClient) Subscribe(ctx context.Context, eventType string) error {
params := map[string]interface{}{"event": eventType}
func (c *WSClient) Subscribe(ctx context.Context, query string) error {
params := map[string]interface{}{"query": query}
return c.Call(ctx, "subscribe", params)
}
// Unsubscribe from an event. Note the server must have a "unsubscribe" route
// Unsubscribe from a query. Note the server must have a "unsubscribe" route
// defined.
func (c *WSClient) Unsubscribe(ctx context.Context, eventType string) error {
params := map[string]interface{}{"event": eventType}
func (c *WSClient) Unsubscribe(ctx context.Context, query string) error {
params := map[string]interface{}{"query": query}
return c.Call(ctx, "unsubscribe", params)
}

View File

@ -46,7 +46,7 @@ func (h *myHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.mtx.RUnlock()
res := json.RawMessage(`{}`)
emptyRespBytes, _ := json.Marshal(types.RPCResponse{Result: &res})
emptyRespBytes, _ := json.Marshal(types.RPCResponse{Result: res})
if err := conn.WriteMessage(messageType, emptyRespBytes); err != nil {
return
}
@ -204,7 +204,7 @@ func callWgDoneOnResult(t *testing.T, c *WSClient, wg *sync.WaitGroup) {
if resp.Error != nil {
t.Fatalf("unexpected error: %v", resp.Error)
}
if *resp.Result != nil {
if resp.Result != nil {
wg.Done()
}
case <-c.Quit:

View File

@ -77,7 +77,7 @@ Now start the server:
```
mux := http.NewServeMux()
rpcserver.RegisterRPCFuncs(mux, Routes)
wm := rpcserver.NewWebsocketManager(Routes, nil)
wm := rpcserver.NewWebsocketManager(Routes)
mux.HandleFunc("/websocket", wm.WebsocketHandler)
logger := log.NewTMLogger(log.NewSyncWriter(os.Stdout))
go func() {

View File

@ -114,7 +114,7 @@ func setup() {
tcpLogger := logger.With("socket", "tcp")
mux := http.NewServeMux()
server.RegisterRPCFuncs(mux, Routes, tcpLogger)
wm := server.NewWebsocketManager(Routes, nil, server.ReadWait(5*time.Second), server.PingPeriod(1*time.Second))
wm := server.NewWebsocketManager(Routes, server.ReadWait(5*time.Second), server.PingPeriod(1*time.Second))
wm.SetLogger(tcpLogger)
mux.HandleFunc(websocketEndpoint, wm.WebsocketHandler)
go func() {
@ -127,7 +127,7 @@ func setup() {
unixLogger := logger.With("socket", "unix")
mux2 := http.NewServeMux()
server.RegisterRPCFuncs(mux2, Routes, unixLogger)
wm = server.NewWebsocketManager(Routes, nil)
wm = server.NewWebsocketManager(Routes)
wm.SetLogger(unixLogger)
mux2.HandleFunc(websocketEndpoint, wm.WebsocketHandler)
go func() {
@ -223,7 +223,7 @@ func echoViaWS(cl *client.WSClient, val string) (string, error) {
}
result := new(ResultEcho)
err = json.Unmarshal(*msg.Result, result)
err = json.Unmarshal(msg.Result, result)
if err != nil {
return "", nil
}
@ -247,7 +247,7 @@ func echoBytesViaWS(cl *client.WSClient, bytes []byte) ([]byte, error) {
}
result := new(ResultEchoBytes)
err = json.Unmarshal(*msg.Result, result)
err = json.Unmarshal(msg.Result, result)
if err != nil {
return []byte{}, nil
}
@ -328,7 +328,7 @@ func TestWSNewWSRPCFunc(t *testing.T) {
t.Fatal(err)
}
result := new(ResultEcho)
err = json.Unmarshal(*msg.Result, result)
err = json.Unmarshal(msg.Result, result)
require.Nil(t, err)
got := result.Value
assert.Equal(t, got, val)
@ -353,7 +353,7 @@ func TestWSHandlesArrayParams(t *testing.T) {
t.Fatalf("%+v", err)
}
result := new(ResultEcho)
err = json.Unmarshal(*msg.Result, result)
err = json.Unmarshal(msg.Result, result)
require.Nil(t, err)
got := result.Value
assert.Equal(t, got, val)

View File

@ -18,7 +18,6 @@ import (
types "github.com/tendermint/tendermint/rpc/lib/types"
cmn "github.com/tendermint/tmlibs/common"
events "github.com/tendermint/tmlibs/events"
"github.com/tendermint/tmlibs/log"
)
@ -350,9 +349,10 @@ const (
defaultWSPingPeriod = (defaultWSReadWait * 9) / 10
)
// a single websocket connection
// contains listener id, underlying ws connection,
// and the event switch for subscribing to events
// a single websocket connection contains listener id, underlying ws
// connection, and the event switch for subscribing to events.
//
// In case of an error, the connection is stopped.
type wsConnection struct {
cmn.BaseService
@ -361,7 +361,8 @@ type wsConnection struct {
writeChan chan types.RPCResponse
funcMap map[string]*RPCFunc
evsw events.EventSwitch
subscriptions map[string]interface{}
// write channel capacity
writeChanCapacity int
@ -374,19 +375,23 @@ type wsConnection struct {
// Send pings to server with this period. Must be less than readWait, but greater than zero.
pingPeriod time.Duration
// called before stopping the connection.
onDisconnect func(remoteAddr string)
}
// NewWSConnection wraps websocket.Conn. See the commentary on the
// func(*wsConnection) functions for a detailed description of how to configure
// ping period and pong wait time.
// NOTE: if the write buffer is full, pongs may be dropped, which may cause clients to disconnect.
// see https://github.com/gorilla/websocket/issues/97
func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, evsw events.EventSwitch, options ...func(*wsConnection)) *wsConnection {
// NewWSConnection wraps websocket.Conn.
//
// See the commentary on the func(*wsConnection) functions for a detailed
// description of how to configure ping period and pong wait time. NOTE: if the
// write buffer is full, pongs may be dropped, which may cause clients to
// disconnect. see https://github.com/gorilla/websocket/issues/97
func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, options ...func(*wsConnection)) *wsConnection {
wsc := &wsConnection{
remoteAddr: baseConn.RemoteAddr().String(),
baseConn: baseConn,
funcMap: funcMap,
evsw: evsw,
subscriptions: make(map[string]interface{}),
writeWait: defaultWSWriteWait,
writeChanCapacity: defaultWSWriteChanCapacity,
readWait: defaultWSReadWait,
@ -431,7 +436,16 @@ func PingPeriod(pingPeriod time.Duration) func(*wsConnection) {
}
}
// OnStart starts the read and write routines. It blocks until the connection closes.
// OnDisconnect called before stopping the connection.
// It should only be used in the constructor - not Goroutine-safe.
func OnDisconnect(cb func(remoteAddr string)) func(*wsConnection) {
return func(wsc *wsConnection) {
wsc.onDisconnect = cb
}
}
// OnStart implements cmn.Service by starting the read and write routines. It
// blocks until the connection closes.
func (wsc *wsConnection) OnStart() error {
wsc.writeChan = make(chan types.RPCResponse, wsc.writeChanCapacity)
@ -443,13 +457,13 @@ func (wsc *wsConnection) OnStart() error {
return nil
}
// OnStop unsubscribes from all events.
// OnStop implements cmn.Service by calling OnDisconnect callback.
func (wsc *wsConnection) OnStop() {
if wsc.evsw != nil {
wsc.evsw.RemoveListener(wsc.remoteAddr)
}
// Both read and write loops close the websocket connection when they exit their loops.
// The writeChan is never closed, to allow WriteRPCResponse() to fail.
if wsc.onDisconnect != nil {
wsc.onDisconnect(wsc.remoteAddr)
}
}
// GetRemoteAddr returns the remote address of the underlying connection.
@ -458,12 +472,6 @@ func (wsc *wsConnection) GetRemoteAddr() string {
return wsc.remoteAddr
}
// GetEventSwitch returns the event switch.
// It implements WSRPCConnection
func (wsc *wsConnection) GetEventSwitch() events.EventSwitch {
return wsc.evsw
}
// WriteRPCResponse pushes a response to the writeChan, and blocks until it is accepted.
// It implements WSRPCConnection. It is Goroutine-safe.
func (wsc *wsConnection) WriteRPCResponse(resp types.RPCResponse) {
@ -487,6 +495,28 @@ func (wsc *wsConnection) TryWriteRPCResponse(resp types.RPCResponse) bool {
}
}
func (wsc *wsConnection) AddSubscription(query string, data interface{}) error {
if _, ok := wsc.subscriptions[query]; ok {
return errors.New("Already subscribed")
}
wsc.subscriptions[query] = data
return nil
}
func (wsc *wsConnection) DeleteSubscription(query string) (interface{}, bool) {
data, ok := wsc.subscriptions[query]
if ok {
delete(wsc.subscriptions, query)
return data, true
}
return nil, false
}
func (wsc *wsConnection) DeleteAllSubscriptions() {
wsc.subscriptions = make(map[string]interface{})
}
// Read from the socket and subscribe to or unsubscribe from events
func (wsc *wsConnection) readRoutine() {
defer func() {
@ -644,17 +674,16 @@ func (wsc *wsConnection) writeMessageWithDeadline(msgType int, msg []byte) error
type WebsocketManager struct {
websocket.Upgrader
funcMap map[string]*RPCFunc
evsw events.EventSwitch
logger log.Logger
wsConnOptions []func(*wsConnection)
}
// NewWebsocketManager returns a new WebsocketManager that routes according to the given funcMap, listens on the given event switch,
// and connects to the server with the given connection options.
func NewWebsocketManager(funcMap map[string]*RPCFunc, evsw events.EventSwitch, wsConnOptions ...func(*wsConnection)) *WebsocketManager {
// NewWebsocketManager returns a new WebsocketManager that routes according to
// the given funcMap and connects to the server with the given connection
// options.
func NewWebsocketManager(funcMap map[string]*RPCFunc, wsConnOptions ...func(*wsConnection)) *WebsocketManager {
return &WebsocketManager{
funcMap: funcMap,
evsw: evsw,
Upgrader: websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
// TODO ???
@ -681,7 +710,7 @@ func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Requ
}
// register connection
con := NewWSConnection(wsConn, wm.funcMap, wm.evsw, wm.wsConnOptions...)
con := NewWSConnection(wsConn, wm.funcMap, wm.wsConnOptions...)
con.SetLogger(wm.logger.With("remote", wsConn.RemoteAddr()))
wm.logger.Info("New websocket connection", "remote", con.remoteAddr)
con.Start() // Blocking

View File

@ -6,7 +6,6 @@ import (
"strings"
"github.com/pkg/errors"
events "github.com/tendermint/tmlibs/events"
)
//----------------------------------------
@ -68,14 +67,14 @@ func (err RPCError) Error() string {
}
type RPCResponse struct {
JSONRPC string `json:"jsonrpc"`
ID string `json:"id"`
Result *json.RawMessage `json:"result,omitempty"`
Error *RPCError `json:"error,omitempty"`
JSONRPC string `json:"jsonrpc"`
ID string `json:"id"`
Result json.RawMessage `json:"result,omitempty"`
Error *RPCError `json:"error,omitempty"`
}
func NewRPCSuccessResponse(id string, res interface{}) RPCResponse {
var raw *json.RawMessage
var rawMsg json.RawMessage
if res != nil {
var js []byte
@ -83,11 +82,10 @@ func NewRPCSuccessResponse(id string, res interface{}) RPCResponse {
if err != nil {
return RPCInternalError(id, errors.Wrap(err, "Error marshalling response"))
}
rawMsg := json.RawMessage(js)
raw = &rawMsg
rawMsg = json.RawMessage(js)
}
return RPCResponse{JSONRPC: "2.0", ID: id, Result: raw}
return RPCResponse{JSONRPC: "2.0", ID: id, Result: rawMsg}
}
func NewRPCErrorResponse(id string, code int, msg string, data string) RPCResponse {
@ -99,7 +97,7 @@ func NewRPCErrorResponse(id string, code int, msg string, data string) RPCRespon
}
func (resp RPCResponse) String() string {
if resp.Error == nil {
if resp.Error != nil {
return fmt.Sprintf("[%s %v]", resp.ID, resp.Result)
} else {
return fmt.Sprintf("[%s %s]", resp.ID, resp.Error)
@ -135,9 +133,12 @@ func RPCServerError(id string, err error) RPCResponse {
// *wsConnection implements this interface.
type WSRPCConnection interface {
GetRemoteAddr() string
GetEventSwitch() events.EventSwitch
WriteRPCResponse(resp RPCResponse)
TryWriteRPCResponse(resp RPCResponse) bool
AddSubscription(string, interface{}) error
DeleteSubscription(string) (interface{}, bool)
DeleteAllSubscriptions()
}
// websocket-only RPCFuncs take this as the first parameter.