new HTTPClient interface for subscriptions

This commit is contained in:
Anton Kaliaev 2019-02-07 18:53:59 +04:00
parent 0cddfdc81d
commit 3966988bcc
No known key found for this signature in database
GPG Key ID: 7B6881D965918214
4 changed files with 100 additions and 83 deletions

View File

@ -5,6 +5,7 @@ import (
"time" "time"
"github.com/pkg/errors" "github.com/pkg/errors"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
@ -60,8 +61,17 @@ func WaitForOneEvent(c EventsClient, evtTyp string, timeout time.Duration) (type
ctx, cancel := context.WithTimeout(context.Background(), timeout) ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel() defer cancel()
eventCh := make(chan *ctypes.ResultEvent, 1)
errCh := make(chan error, 1)
callback := func(event *ctypes.ResultEvent, err error) {
if err != nil {
errCh <- err
} else {
eventCh <- event
}
}
// register for the next event of this type // register for the next event of this type
sub, err := c.Subscribe(ctx, subscriber, types.QueryForEvent(evtTyp)) err := c.Subscribe(ctx, subscriber, types.QueryForEvent(evtTyp).String(), callback)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "failed to subscribe") return nil, errors.Wrap(err, "failed to subscribe")
} }
@ -69,10 +79,10 @@ func WaitForOneEvent(c EventsClient, evtTyp string, timeout time.Duration) (type
defer c.UnsubscribeAll(ctx, subscriber) defer c.UnsubscribeAll(ctx, subscriber)
select { select {
case msg := <-sub.Out(): case event := <-eventCh:
return msg.Data().(types.TMEventData), nil return event.Data.(types.TMEventData), nil
case <-sub.Cancelled(): case err := <-errCh:
return nil, errors.New("subscription was cancelled") return nil, err
case <-ctx.Done(): case <-ctx.Done():
return nil, errors.New("timed out waiting for event") return nil, errors.New("timed out waiting for event")
} }

View File

@ -8,7 +8,6 @@ import (
amino "github.com/tendermint/go-amino" amino "github.com/tendermint/go-amino"
cmn "github.com/tendermint/tendermint/libs/common" cmn "github.com/tendermint/tendermint/libs/common"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
ctypes "github.com/tendermint/tendermint/rpc/core/types" ctypes "github.com/tendermint/tendermint/rpc/core/types"
rpcclient "github.com/tendermint/tendermint/rpc/lib/client" rpcclient "github.com/tendermint/tendermint/rpc/lib/client"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
@ -249,28 +248,6 @@ func (c *HTTP) Validators(height *int64) (*ctypes.ResultValidators, error) {
/** websocket event stuff here... **/ /** websocket event stuff here... **/
type subscription struct {
out chan tmpubsub.Message
cancelled chan struct{}
mtx sync.RWMutex
err error
}
func (s *subscription) Out() <-chan tmpubsub.Message {
return s.out
}
func (s *subscription) Cancelled() <-chan struct{} {
return s.cancelled
}
func (s *subscription) Err() error {
s.mtx.RLock()
defer s.mtx.RUnlock()
return s.err
}
type WSEvents struct { type WSEvents struct {
cmn.BaseService cmn.BaseService
cdc *amino.Codec cdc *amino.Codec
@ -279,8 +256,8 @@ type WSEvents struct {
ws *rpcclient.WSClient ws *rpcclient.WSClient
mtx sync.RWMutex mtx sync.RWMutex
// query -> subscription // query -> EventCallback
subscriptions map[string]*subscription subscriptions map[string]EventCallback
} }
func newWSEvents(cdc *amino.Codec, remote, endpoint string) *WSEvents { func newWSEvents(cdc *amino.Codec, remote, endpoint string) *WSEvents {
@ -288,7 +265,7 @@ func newWSEvents(cdc *amino.Codec, remote, endpoint string) *WSEvents {
cdc: cdc, cdc: cdc,
endpoint: endpoint, endpoint: endpoint,
remote: remote, remote: remote,
subscriptions: make(map[string]*subscription), subscriptions: make(map[string]EventCallback),
} }
wsEvents.BaseService = *cmn.NewBaseService(nil, "WSEvents", wsEvents) wsEvents.BaseService = *cmn.NewBaseService(nil, "WSEvents", wsEvents)
@ -318,77 +295,56 @@ func (w *WSEvents) OnStop() {
} }
} }
// TODO: remove // Subscribe implements EventsClient by using WSClient to subscribe given
func (w *WSEvents) NumClients() int { // subscriber to query.
return 1 func (w *WSEvents) Subscribe(ctx context.Context, subscriber, query string,
} callback EventCallback) error {
// TODO: remove if err := w.ws.Subscribe(ctx, query); err != nil {
func (w *WSEvents) NumClientSubscriptions(clientID string) int { return err
return len(w.subscriptions)
}
func (w *WSEvents) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, outCapacity ...int) (types.Subscription, error) {
q := query.String()
err := w.ws.Subscribe(ctx, q)
if err != nil {
return nil, err
}
outCap := 1
if len(outCapacity) > 0 && outCapacity[0] >= 0 {
outCap = outCapacity[0]
} }
w.mtx.Lock() w.mtx.Lock()
// subscriber param is ignored because Tendermint will override it with // subscriber param is ignored because Tendermint will override it with
// remote IP anyway. // remote IP anyway.
w.subscriptions[q] = &subscription{ w.subscriptions[query] = callback
out: make(chan tmpubsub.Message, outCap),
cancelled: make(chan struct{}),
}
w.mtx.Unlock() w.mtx.Unlock()
return w.subscriptions[q], nil return nil
} }
func (w *WSEvents) Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error { // Unsubscribe implements EventsClient by using WSClient to unsubscribe given
q := query.String() // subscriber from query.
func (w *WSEvents) Unsubscribe(ctx context.Context, subscriber, query string) error {
err := w.ws.Unsubscribe(ctx, q) if err := w.ws.Unsubscribe(ctx, query); err != nil {
if err != nil {
return err return err
} }
w.mtx.Lock() w.mtx.Lock()
sub, ok := w.subscriptions[q] callback, ok := w.subscriptions[query]
if ok { if ok {
close(sub.cancelled) // TODO: ErrUnsubscribed
sub.mtx.Lock() callback(nil, errors.New("unsubscribed"))
sub.err = errors.New("unsubscribed") delete(w.subscriptions, query)
sub.mtx.Unlock()
delete(w.subscriptions, q)
} }
w.mtx.Unlock() w.mtx.Unlock()
return nil return nil
} }
// UnsubscribeAll implements EventsClient by using WSClient to unsubscribe
// given subscriber from all the queries.
func (w *WSEvents) UnsubscribeAll(ctx context.Context, subscriber string) error { func (w *WSEvents) UnsubscribeAll(ctx context.Context, subscriber string) error {
err := w.ws.UnsubscribeAll(ctx) if err := w.ws.UnsubscribeAll(ctx); err != nil {
if err != nil {
return err return err
} }
w.mtx.Lock() w.mtx.Lock()
for _, sub := range w.subscriptions { for _, callback := range w.subscriptions {
close(sub.cancelled) // TODO: ErrUnsubscribed
sub.mtx.Lock() callback(nil, errors.New("unsubscribed"))
sub.err = errors.New("unsubscribed")
sub.mtx.Unlock()
} }
w.subscriptions = make(map[string]*subscription) w.subscriptions = make(map[string]EventCallback)
w.mtx.Unlock() w.mtx.Unlock()
return nil return nil
@ -417,6 +373,7 @@ func (w *WSEvents) eventListener() {
} }
if resp.Error != nil { if resp.Error != nil {
w.Logger.Error("WS error", "err", resp.Error.Error()) w.Logger.Error("WS error", "err", resp.Error.Error())
// TODO: if err==ErrUnsubscribed, make sure to call user's callback
continue continue
} }
result := new(ctypes.ResultEvent) result := new(ctypes.ResultEvent)
@ -428,8 +385,8 @@ func (w *WSEvents) eventListener() {
// NOTE: writing also happens inside mutex so we can't close a channel in // NOTE: writing also happens inside mutex so we can't close a channel in
// Unsubscribe/UnsubscribeAll. // Unsubscribe/UnsubscribeAll.
w.mtx.RLock() w.mtx.RLock()
if sub, ok := w.subscriptions[result.Query]; ok { if callback, ok := w.subscriptions[result.Query]; ok {
sub.out <- tmpubsub.NewMessage(result.Data, result.Tags) callback(result, nil)
} }
w.mtx.RUnlock() w.mtx.RUnlock()
case <-w.Quit(): case <-w.Quit():

View File

@ -21,6 +21,8 @@ implementation.
*/ */
import ( import (
"context"
cmn "github.com/tendermint/tendermint/libs/common" cmn "github.com/tendermint/tendermint/libs/common"
ctypes "github.com/tendermint/tendermint/rpc/core/types" ctypes "github.com/tendermint/tendermint/rpc/core/types"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
@ -88,10 +90,23 @@ type NetworkClient interface {
Health() (*ctypes.ResultHealth, error) Health() (*ctypes.ResultHealth, error)
} }
// EventCallback is used by Subscribe to deliver events.
type EventCallback func(event *ctypes.ResultEvent, err error)
// EventsClient is reactive, you can subscribe to any message, given the proper // EventsClient is reactive, you can subscribe to any message, given the proper
// string. see tendermint/types/events.go // string. see tendermint/types/events.go
type EventsClient interface { type EventsClient interface {
types.EventBusSubscriber // Subscribe subscribes given subscriber to query. When a matching event is
// published, callback is called (err is nil). When/if subscription is
// terminated, callback is called again with non-nil error.
//
// ctx cannot be used to unsubscribe. To unsubscribe, use either Unsubscribe
// or UnsubscribeAll.
Subscribe(ctx context.Context, subscriber, query string, callback EventCallback) error
// Unsubscribe unsubscribes given subscriber from query.
Unsubscribe(ctx context.Context, subscriber, query string) error
// UnsubscribeAll unsubscribes given subscriber from all the queries.
UnsubscribeAll(ctx context.Context, subscriber string) error
} }
// MempoolClient shows us data about current mempool state. // MempoolClient shows us data about current mempool state.

View File

@ -3,8 +3,10 @@ package client
import ( import (
"context" "context"
"github.com/pkg/errors"
cmn "github.com/tendermint/tendermint/libs/common" cmn "github.com/tendermint/tendermint/libs/common"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub" tmquery "github.com/tendermint/tendermint/libs/pubsub/query"
nm "github.com/tendermint/tendermint/node" nm "github.com/tendermint/tendermint/node"
"github.com/tendermint/tendermint/rpc/core" "github.com/tendermint/tendermint/rpc/core"
ctypes "github.com/tendermint/tendermint/rpc/core/types" ctypes "github.com/tendermint/tendermint/rpc/core/types"
@ -140,14 +142,47 @@ func (Local) TxSearch(query string, prove bool, page, perPage int) (*ctypes.Resu
return core.TxSearch(query, prove, page, perPage) return core.TxSearch(query, prove, page, perPage)
} }
func (c *Local) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, outCapacity ...int) (types.Subscription, error) { // Subscribe implements EventsClient by using local eventBus to subscribe given
return c.EventBus.Subscribe(ctx, subscriber, query, outCapacity...) // subscriber to query.
func (c *Local) Subscribe(ctx context.Context, subscriber, query string, callback EventCallback) error {
q, err := tmquery.New(query)
if err != nil {
return errors.Wrap(err, "failed to parse query")
}
sub, err := c.EventBus.Subscribe(ctx, subscriber, q)
if err != nil {
return errors.Wrap(err, "failed to subscribe")
}
go func() {
for {
select {
case msg := <-sub.Out():
// can panic
callback(&ctypes.ResultEvent{Query: query, Data: msg.Data(), Tags: msg.Tags()}, nil)
case <-sub.Cancelled():
// can panic
callback(nil, sub.Err())
return
}
}
}()
return nil
} }
func (c *Local) Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error { // Unsubscribe implements EventsClient by using local eventBus to unsubscribe
return c.EventBus.Unsubscribe(ctx, subscriber, query) // given subscriber from query.
func (c *Local) Unsubscribe(ctx context.Context, subscriber, query string) error {
q, err := tmquery.New(query)
if err != nil {
return errors.Wrap(err, "failed to parse query")
}
return c.EventBus.Unsubscribe(ctx, subscriber, q)
} }
// UnsubscribeAll implements EventsClient by using local eventBus to
// unsubscribe given subscriber from all queries.
func (c *Local) UnsubscribeAll(ctx context.Context, subscriber string) error { func (c *Local) UnsubscribeAll(ctx context.Context, subscriber string) error {
return c.EventBus.UnsubscribeAll(ctx, subscriber) return c.EventBus.UnsubscribeAll(ctx, subscriber)
} }