From 421b3c11d43fdd01008385348c2da59ec895dad0 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Fri, 8 Feb 2019 15:56:23 +0400 Subject: [PATCH] finalize HTTPClient events interface --- rpc/client/helpers.go | 14 +---------- rpc/client/httpclient.go | 51 ++++++++++++++++++++++++++------------- rpc/client/interface.go | 12 ++++----- rpc/client/localclient.go | 40 ++++++++++++++++++++++-------- rpc/core/events.go | 14 +++++++---- 5 files changed, 79 insertions(+), 52 deletions(-) diff --git a/rpc/client/helpers.go b/rpc/client/helpers.go index e024a778..4889b074 100644 --- a/rpc/client/helpers.go +++ b/rpc/client/helpers.go @@ -5,7 +5,6 @@ import ( "time" "github.com/pkg/errors" - ctypes "github.com/tendermint/tendermint/rpc/core/types" "github.com/tendermint/tendermint/types" ) @@ -61,17 +60,8 @@ func WaitForOneEvent(c EventsClient, evtTyp string, timeout time.Duration) (type ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() - eventCh := make(chan *ctypes.ResultEvent, 1) - errCh := make(chan error, 1) - callback := func(event *ctypes.ResultEvent, err error) { - if err != nil { - errCh <- err - } else { - eventCh <- event - } - } // register for the next event of this type - err := c.Subscribe(ctx, subscriber, types.QueryForEvent(evtTyp).String(), callback) + eventCh, err := c.Subscribe(ctx, subscriber, types.QueryForEvent(evtTyp).String()) if err != nil { return nil, errors.Wrap(err, "failed to subscribe") } @@ -81,8 +71,6 @@ func WaitForOneEvent(c EventsClient, evtTyp string, timeout time.Duration) (type select { case event := <-eventCh: return event.Data.(types.TMEventData), nil - case err := <-errCh: - return nil, err case <-ctx.Done(): return nil, errors.New("timed out waiting for event") } diff --git a/rpc/client/httpclient.go b/rpc/client/httpclient.go index b94a372d..cd2f3363 100644 --- a/rpc/client/httpclient.go +++ b/rpc/client/httpclient.go @@ -256,8 +256,8 @@ type WSEvents struct { ws *rpcclient.WSClient mtx sync.RWMutex - // query -> EventCallback - subscriptions map[string]EventCallback + // query -> chan + subscriptions map[string]chan ctypes.ResultEvent } func newWSEvents(cdc *amino.Codec, remote, endpoint string) *WSEvents { @@ -265,7 +265,7 @@ func newWSEvents(cdc *amino.Codec, remote, endpoint string) *WSEvents { cdc: cdc, endpoint: endpoint, remote: remote, - subscriptions: make(map[string]EventCallback), + subscriptions: make(map[string]chan ctypes.ResultEvent), } wsEvents.BaseService = *cmn.NewBaseService(nil, "WSEvents", wsEvents) @@ -298,19 +298,25 @@ func (w *WSEvents) OnStop() { // Subscribe implements EventsClient by using WSClient to subscribe given // subscriber to query. func (w *WSEvents) Subscribe(ctx context.Context, subscriber, query string, - callback EventCallback) error { + outCapacity ...int) (out <-chan ctypes.ResultEvent, err error) { if err := w.ws.Subscribe(ctx, query); err != nil { - return err + return nil, err } + outCap := 0 + if len(outCapacity) > 0 { + outCap = outCapacity[0] + } + + outc := make(chan ctypes.ResultEvent, outCap) w.mtx.Lock() // subscriber param is ignored because Tendermint will override it with // remote IP anyway. - w.subscriptions[query] = callback + w.subscriptions[query] = outc w.mtx.Unlock() - return nil + return outc, nil } // Unsubscribe implements EventsClient by using WSClient to unsubscribe given @@ -321,10 +327,9 @@ func (w *WSEvents) Unsubscribe(ctx context.Context, subscriber, query string) er } w.mtx.Lock() - callback, ok := w.subscriptions[query] + out, ok := w.subscriptions[query] if ok { - // TODO: ErrUnsubscribed - callback(nil, errors.New("unsubscribed")) + close(out) delete(w.subscriptions, query) } w.mtx.Unlock() @@ -340,11 +345,10 @@ func (w *WSEvents) UnsubscribeAll(ctx context.Context, subscriber string) error } w.mtx.Lock() - for _, callback := range w.subscriptions { - // TODO: ErrUnsubscribed - callback(nil, errors.New("unsubscribed")) + for _, out := range w.subscriptions { + close(out) } - w.subscriptions = make(map[string]EventCallback) + w.subscriptions = make(map[string]chan ctypes.ResultEvent) w.mtx.Unlock() return nil @@ -371,22 +375,35 @@ func (w *WSEvents) eventListener() { if !ok { return } + if resp.Error != nil { w.Logger.Error("WS error", "err", resp.Error.Error()) - // TODO: if err==ErrUnsubscribed, make sure to call user's callback + // we don't know which subscription failed, so redo all of them + // resubscribe with exponential timeout + w.redoSubscriptions() continue } + result := new(ctypes.ResultEvent) err := w.cdc.UnmarshalJSON(resp.Result, result) if err != nil { w.Logger.Error("failed to unmarshal response", "err", err) continue } + // NOTE: writing also happens inside mutex so we can't close a channel in // Unsubscribe/UnsubscribeAll. w.mtx.RLock() - if callback, ok := w.subscriptions[result.Query]; ok { - callback(result, nil) + if out, ok := w.subscriptions[result.Query]; ok { + if cap(out) == 0 { + out <- *result + } else { + select { + case out <- *result: + default: + w.Logger.Error("wanted to publish ResultEvent, but out channel is full", "ResultEvent", result, "query", result.Query) + } + } } w.mtx.RUnlock() case <-w.Quit(): diff --git a/rpc/client/interface.go b/rpc/client/interface.go index 5e53b8d8..b7f91dda 100644 --- a/rpc/client/interface.go +++ b/rpc/client/interface.go @@ -90,19 +90,17 @@ type NetworkClient interface { Health() (*ctypes.ResultHealth, error) } -// EventCallback is used by Subscribe to deliver events. -type EventCallback func(event *ctypes.ResultEvent, err error) - // EventsClient is reactive, you can subscribe to any message, given the proper // string. see tendermint/types/events.go type EventsClient interface { - // Subscribe subscribes given subscriber to query. When a matching event is - // published, callback is called (err is nil). When/if subscription is - // terminated, callback is called again with non-nil error. + // Subscribe subscribes given subscriber to query. Returns an unbuffered + // channel onto which events are published. An error is returned if it fails + // to subscribe. outCapacity can be used optionally to set capacity for the + // channel. Channel is never closed to prevent accidental reads. // // ctx cannot be used to unsubscribe. To unsubscribe, use either Unsubscribe // or UnsubscribeAll. - Subscribe(ctx context.Context, subscriber, query string, callback EventCallback) error + Subscribe(ctx context.Context, subscriber, query string, outCapacity ...int) (out <-chan ctypes.ResultEvent, err error) // Unsubscribe unsubscribes given subscriber from query. Unsubscribe(ctx context.Context, subscriber, query string) error // UnsubscribeAll unsubscribes given subscriber from all the queries. diff --git a/rpc/client/localclient.go b/rpc/client/localclient.go index a839f11c..ddb459f1 100644 --- a/rpc/client/localclient.go +++ b/rpc/client/localclient.go @@ -6,6 +6,7 @@ import ( "github.com/pkg/errors" cmn "github.com/tendermint/tendermint/libs/common" + tmpubsub "github.com/tendermint/tendermint/libs/pubsub" tmquery "github.com/tendermint/tendermint/libs/pubsub/query" nm "github.com/tendermint/tendermint/node" "github.com/tendermint/tendermint/rpc/core" @@ -144,31 +145,50 @@ func (Local) TxSearch(query string, prove bool, page, perPage int) (*ctypes.Resu // Subscribe implements EventsClient by using local eventBus to subscribe given // subscriber to query. -func (c *Local) Subscribe(ctx context.Context, subscriber, query string, callback EventCallback) error { +func (c *Local) Subscribe(ctx context.Context, subscriber, query string, outCapacity ...int) (out <-chan ctypes.ResultEvent, err error) { q, err := tmquery.New(query) if err != nil { - return errors.Wrap(err, "failed to parse query") + return nil, errors.Wrap(err, "failed to parse query") } sub, err := c.EventBus.Subscribe(ctx, subscriber, q) if err != nil { - return errors.Wrap(err, "failed to subscribe") + return nil, errors.Wrap(err, "failed to subscribe") } - go func() { + outCap := 0 + if len(outCapacity) > 0 { + outCap = outCapacity[0] + } + + outc := make(chan ctypes.ResultEvent, outCap) + go func(sub types.Subscription) { for { select { case msg := <-sub.Out(): - // can panic - callback(&ctypes.ResultEvent{Query: query, Data: msg.Data(), Tags: msg.Tags()}, nil) + if cap(out) == 0 { + outc <- ctypes.ResultEvent{Query: query, Data: msg.Data(), Tags: msg.Tags()} + } else { + select { + case outc <- ctypes.ResultEvent{Query: query, Data: msg.Data(), Tags: msg.Tags()}: + default: + // XXX: log error + } + } case <-sub.Cancelled(): - // can panic - callback(nil, sub.Err()) + if sub.Err() != tmpubsub.ErrUnsubscribed { + // resubscribe with exponential timeout + var err error + sub, err = c.EventBus.Subscribe(ctx, subscriber, q) + if err != nil { + // TODO + } + } return } } - }() + }(sub) - return nil + return outc, nil } // Unsubscribe implements EventsClient by using local eventBus to unsubscribe diff --git a/rpc/core/events.go b/rpc/core/events.go index f3254256..e2a7b3c4 100644 --- a/rpc/core/events.go +++ b/rpc/core/events.go @@ -6,6 +6,7 @@ import ( "github.com/pkg/errors" + tmpubsub "github.com/tendermint/tendermint/libs/pubsub" tmquery "github.com/tendermint/tendermint/libs/pubsub/query" ctypes "github.com/tendermint/tendermint/rpc/core/types" rpctypes "github.com/tendermint/tendermint/rpc/lib/types" @@ -125,11 +126,14 @@ func Subscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscri resultEvent, )) case <-sub.Cancelled(): - wsCtx.TryWriteRPCResponse( - rpctypes.RPCServerError(rpctypes.JSONRPCStringID( - fmt.Sprintf("%v#event", wsCtx.Request.ID)), - fmt.Errorf("subscription was cancelled (reason: %v)", sub.Err()), - )) + if sub.Err() != tmpubsub.ErrUnsubscribed { + // should not happen + wsCtx.TryWriteRPCResponse( + rpctypes.RPCServerError(rpctypes.JSONRPCStringID( + fmt.Sprintf("%v#event", wsCtx.Request.ID)), + fmt.Errorf("subscription was cancelled (reason: %v).", sub.Err()), + )) + } return } }