From 3966988bcc310a032cf7b2d9dbd887ce345c177e Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Thu, 7 Feb 2019 18:53:59 +0400 Subject: [PATCH] new HTTPClient interface for subscriptions --- rpc/client/helpers.go | 20 ++++++-- rpc/client/httpclient.go | 101 +++++++++++--------------------------- rpc/client/interface.go | 17 ++++++- rpc/client/localclient.go | 45 +++++++++++++++-- 4 files changed, 100 insertions(+), 83 deletions(-) diff --git a/rpc/client/helpers.go b/rpc/client/helpers.go index ec63fb3b..e024a778 100644 --- a/rpc/client/helpers.go +++ b/rpc/client/helpers.go @@ -5,6 +5,7 @@ import ( "time" "github.com/pkg/errors" + ctypes "github.com/tendermint/tendermint/rpc/core/types" "github.com/tendermint/tendermint/types" ) @@ -60,8 +61,17 @@ func WaitForOneEvent(c EventsClient, evtTyp string, timeout time.Duration) (type ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() + eventCh := make(chan *ctypes.ResultEvent, 1) + errCh := make(chan error, 1) + callback := func(event *ctypes.ResultEvent, err error) { + if err != nil { + errCh <- err + } else { + eventCh <- event + } + } // register for the next event of this type - sub, err := c.Subscribe(ctx, subscriber, types.QueryForEvent(evtTyp)) + err := c.Subscribe(ctx, subscriber, types.QueryForEvent(evtTyp).String(), callback) if err != nil { return nil, errors.Wrap(err, "failed to subscribe") } @@ -69,10 +79,10 @@ func WaitForOneEvent(c EventsClient, evtTyp string, timeout time.Duration) (type defer c.UnsubscribeAll(ctx, subscriber) select { - case msg := <-sub.Out(): - return msg.Data().(types.TMEventData), nil - case <-sub.Cancelled(): - return nil, errors.New("subscription was cancelled") + case event := <-eventCh: + return event.Data.(types.TMEventData), nil + case err := <-errCh: + return nil, err case <-ctx.Done(): return nil, errors.New("timed out waiting for event") } diff --git a/rpc/client/httpclient.go b/rpc/client/httpclient.go index 8bec9e5e..b94a372d 100644 --- a/rpc/client/httpclient.go +++ b/rpc/client/httpclient.go @@ -8,7 +8,6 @@ import ( amino "github.com/tendermint/go-amino" cmn "github.com/tendermint/tendermint/libs/common" - tmpubsub "github.com/tendermint/tendermint/libs/pubsub" ctypes "github.com/tendermint/tendermint/rpc/core/types" rpcclient "github.com/tendermint/tendermint/rpc/lib/client" "github.com/tendermint/tendermint/types" @@ -249,28 +248,6 @@ func (c *HTTP) Validators(height *int64) (*ctypes.ResultValidators, error) { /** websocket event stuff here... **/ -type subscription struct { - out chan tmpubsub.Message - cancelled chan struct{} - - mtx sync.RWMutex - err error -} - -func (s *subscription) Out() <-chan tmpubsub.Message { - return s.out -} - -func (s *subscription) Cancelled() <-chan struct{} { - return s.cancelled -} - -func (s *subscription) Err() error { - s.mtx.RLock() - defer s.mtx.RUnlock() - return s.err -} - type WSEvents struct { cmn.BaseService cdc *amino.Codec @@ -279,8 +256,8 @@ type WSEvents struct { ws *rpcclient.WSClient mtx sync.RWMutex - // query -> subscription - subscriptions map[string]*subscription + // query -> EventCallback + subscriptions map[string]EventCallback } func newWSEvents(cdc *amino.Codec, remote, endpoint string) *WSEvents { @@ -288,7 +265,7 @@ func newWSEvents(cdc *amino.Codec, remote, endpoint string) *WSEvents { cdc: cdc, endpoint: endpoint, remote: remote, - subscriptions: make(map[string]*subscription), + subscriptions: make(map[string]EventCallback), } wsEvents.BaseService = *cmn.NewBaseService(nil, "WSEvents", wsEvents) @@ -318,77 +295,56 @@ func (w *WSEvents) OnStop() { } } -// TODO: remove -func (w *WSEvents) NumClients() int { - return 1 -} +// Subscribe implements EventsClient by using WSClient to subscribe given +// subscriber to query. +func (w *WSEvents) Subscribe(ctx context.Context, subscriber, query string, + callback EventCallback) error { -// TODO: remove -func (w *WSEvents) NumClientSubscriptions(clientID string) int { - return len(w.subscriptions) -} - -func (w *WSEvents) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, outCapacity ...int) (types.Subscription, error) { - q := query.String() - - err := w.ws.Subscribe(ctx, q) - if err != nil { - return nil, err - } - - outCap := 1 - if len(outCapacity) > 0 && outCapacity[0] >= 0 { - outCap = outCapacity[0] + if err := w.ws.Subscribe(ctx, query); err != nil { + return err } w.mtx.Lock() // subscriber param is ignored because Tendermint will override it with // remote IP anyway. - w.subscriptions[q] = &subscription{ - out: make(chan tmpubsub.Message, outCap), - cancelled: make(chan struct{}), - } + w.subscriptions[query] = callback w.mtx.Unlock() - return w.subscriptions[q], nil + return nil } -func (w *WSEvents) Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error { - q := query.String() - - err := w.ws.Unsubscribe(ctx, q) - if err != nil { +// Unsubscribe implements EventsClient by using WSClient to unsubscribe given +// subscriber from query. +func (w *WSEvents) Unsubscribe(ctx context.Context, subscriber, query string) error { + if err := w.ws.Unsubscribe(ctx, query); err != nil { return err } w.mtx.Lock() - sub, ok := w.subscriptions[q] + callback, ok := w.subscriptions[query] if ok { - close(sub.cancelled) - sub.mtx.Lock() - sub.err = errors.New("unsubscribed") - sub.mtx.Unlock() - delete(w.subscriptions, q) + // TODO: ErrUnsubscribed + callback(nil, errors.New("unsubscribed")) + delete(w.subscriptions, query) } w.mtx.Unlock() return nil } +// UnsubscribeAll implements EventsClient by using WSClient to unsubscribe +// given subscriber from all the queries. func (w *WSEvents) UnsubscribeAll(ctx context.Context, subscriber string) error { - err := w.ws.UnsubscribeAll(ctx) - if err != nil { + if err := w.ws.UnsubscribeAll(ctx); err != nil { return err } w.mtx.Lock() - for _, sub := range w.subscriptions { - close(sub.cancelled) - sub.mtx.Lock() - sub.err = errors.New("unsubscribed") - sub.mtx.Unlock() + for _, callback := range w.subscriptions { + // TODO: ErrUnsubscribed + callback(nil, errors.New("unsubscribed")) } - w.subscriptions = make(map[string]*subscription) + w.subscriptions = make(map[string]EventCallback) w.mtx.Unlock() return nil @@ -417,6 +373,7 @@ func (w *WSEvents) eventListener() { } if resp.Error != nil { w.Logger.Error("WS error", "err", resp.Error.Error()) + // TODO: if err==ErrUnsubscribed, make sure to call user's callback continue } result := new(ctypes.ResultEvent) @@ -428,8 +385,8 @@ func (w *WSEvents) eventListener() { // NOTE: writing also happens inside mutex so we can't close a channel in // Unsubscribe/UnsubscribeAll. w.mtx.RLock() - if sub, ok := w.subscriptions[result.Query]; ok { - sub.out <- tmpubsub.NewMessage(result.Data, result.Tags) + if callback, ok := w.subscriptions[result.Query]; ok { + callback(result, nil) } w.mtx.RUnlock() case <-w.Quit(): diff --git a/rpc/client/interface.go b/rpc/client/interface.go index 7477225e..5e53b8d8 100644 --- a/rpc/client/interface.go +++ b/rpc/client/interface.go @@ -21,6 +21,8 @@ implementation. */ import ( + "context" + cmn "github.com/tendermint/tendermint/libs/common" ctypes "github.com/tendermint/tendermint/rpc/core/types" "github.com/tendermint/tendermint/types" @@ -88,10 +90,23 @@ type NetworkClient interface { Health() (*ctypes.ResultHealth, error) } +// EventCallback is used by Subscribe to deliver events. +type EventCallback func(event *ctypes.ResultEvent, err error) + // EventsClient is reactive, you can subscribe to any message, given the proper // string. see tendermint/types/events.go type EventsClient interface { - types.EventBusSubscriber + // Subscribe subscribes given subscriber to query. When a matching event is + // published, callback is called (err is nil). When/if subscription is + // terminated, callback is called again with non-nil error. + // + // ctx cannot be used to unsubscribe. To unsubscribe, use either Unsubscribe + // or UnsubscribeAll. + Subscribe(ctx context.Context, subscriber, query string, callback EventCallback) error + // Unsubscribe unsubscribes given subscriber from query. + Unsubscribe(ctx context.Context, subscriber, query string) error + // UnsubscribeAll unsubscribes given subscriber from all the queries. + UnsubscribeAll(ctx context.Context, subscriber string) error } // MempoolClient shows us data about current mempool state. diff --git a/rpc/client/localclient.go b/rpc/client/localclient.go index 33a1ce22..a839f11c 100644 --- a/rpc/client/localclient.go +++ b/rpc/client/localclient.go @@ -3,8 +3,10 @@ package client import ( "context" + "github.com/pkg/errors" + cmn "github.com/tendermint/tendermint/libs/common" - tmpubsub "github.com/tendermint/tendermint/libs/pubsub" + tmquery "github.com/tendermint/tendermint/libs/pubsub/query" nm "github.com/tendermint/tendermint/node" "github.com/tendermint/tendermint/rpc/core" ctypes "github.com/tendermint/tendermint/rpc/core/types" @@ -140,14 +142,47 @@ func (Local) TxSearch(query string, prove bool, page, perPage int) (*ctypes.Resu return core.TxSearch(query, prove, page, perPage) } -func (c *Local) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, outCapacity ...int) (types.Subscription, error) { - return c.EventBus.Subscribe(ctx, subscriber, query, outCapacity...) +// Subscribe implements EventsClient by using local eventBus to subscribe given +// subscriber to query. +func (c *Local) Subscribe(ctx context.Context, subscriber, query string, callback EventCallback) error { + q, err := tmquery.New(query) + if err != nil { + return errors.Wrap(err, "failed to parse query") + } + sub, err := c.EventBus.Subscribe(ctx, subscriber, q) + if err != nil { + return errors.Wrap(err, "failed to subscribe") + } + + go func() { + for { + select { + case msg := <-sub.Out(): + // can panic + callback(&ctypes.ResultEvent{Query: query, Data: msg.Data(), Tags: msg.Tags()}, nil) + case <-sub.Cancelled(): + // can panic + callback(nil, sub.Err()) + return + } + } + }() + + return nil } -func (c *Local) Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error { - return c.EventBus.Unsubscribe(ctx, subscriber, query) +// Unsubscribe implements EventsClient by using local eventBus to unsubscribe +// given subscriber from query. +func (c *Local) Unsubscribe(ctx context.Context, subscriber, query string) error { + q, err := tmquery.New(query) + if err != nil { + return errors.Wrap(err, "failed to parse query") + } + return c.EventBus.Unsubscribe(ctx, subscriber, q) } +// UnsubscribeAll implements EventsClient by using local eventBus to +// unsubscribe given subscriber from all queries. func (c *Local) UnsubscribeAll(ctx context.Context, subscriber string) error { return c.EventBus.UnsubscribeAll(ctx, subscriber) }