finalize HTTPClient events interface

This commit is contained in:
Anton Kaliaev
2019-02-08 15:56:23 +04:00
parent 3966988bcc
commit 421b3c11d4
5 changed files with 79 additions and 52 deletions

View File

@ -5,7 +5,6 @@ import (
"time" "time"
"github.com/pkg/errors" "github.com/pkg/errors"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
@ -61,17 +60,8 @@ func WaitForOneEvent(c EventsClient, evtTyp string, timeout time.Duration) (type
ctx, cancel := context.WithTimeout(context.Background(), timeout) ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel() defer cancel()
eventCh := make(chan *ctypes.ResultEvent, 1)
errCh := make(chan error, 1)
callback := func(event *ctypes.ResultEvent, err error) {
if err != nil {
errCh <- err
} else {
eventCh <- event
}
}
// register for the next event of this type // register for the next event of this type
err := c.Subscribe(ctx, subscriber, types.QueryForEvent(evtTyp).String(), callback) eventCh, err := c.Subscribe(ctx, subscriber, types.QueryForEvent(evtTyp).String())
if err != nil { if err != nil {
return nil, errors.Wrap(err, "failed to subscribe") return nil, errors.Wrap(err, "failed to subscribe")
} }
@ -81,8 +71,6 @@ func WaitForOneEvent(c EventsClient, evtTyp string, timeout time.Duration) (type
select { select {
case event := <-eventCh: case event := <-eventCh:
return event.Data.(types.TMEventData), nil return event.Data.(types.TMEventData), nil
case err := <-errCh:
return nil, err
case <-ctx.Done(): case <-ctx.Done():
return nil, errors.New("timed out waiting for event") return nil, errors.New("timed out waiting for event")
} }

View File

@ -256,8 +256,8 @@ type WSEvents struct {
ws *rpcclient.WSClient ws *rpcclient.WSClient
mtx sync.RWMutex mtx sync.RWMutex
// query -> EventCallback // query -> chan
subscriptions map[string]EventCallback subscriptions map[string]chan ctypes.ResultEvent
} }
func newWSEvents(cdc *amino.Codec, remote, endpoint string) *WSEvents { func newWSEvents(cdc *amino.Codec, remote, endpoint string) *WSEvents {
@ -265,7 +265,7 @@ func newWSEvents(cdc *amino.Codec, remote, endpoint string) *WSEvents {
cdc: cdc, cdc: cdc,
endpoint: endpoint, endpoint: endpoint,
remote: remote, remote: remote,
subscriptions: make(map[string]EventCallback), subscriptions: make(map[string]chan ctypes.ResultEvent),
} }
wsEvents.BaseService = *cmn.NewBaseService(nil, "WSEvents", wsEvents) wsEvents.BaseService = *cmn.NewBaseService(nil, "WSEvents", wsEvents)
@ -298,19 +298,25 @@ func (w *WSEvents) OnStop() {
// Subscribe implements EventsClient by using WSClient to subscribe given // Subscribe implements EventsClient by using WSClient to subscribe given
// subscriber to query. // subscriber to query.
func (w *WSEvents) Subscribe(ctx context.Context, subscriber, query string, func (w *WSEvents) Subscribe(ctx context.Context, subscriber, query string,
callback EventCallback) error { outCapacity ...int) (out <-chan ctypes.ResultEvent, err error) {
if err := w.ws.Subscribe(ctx, query); err != nil { if err := w.ws.Subscribe(ctx, query); err != nil {
return err return nil, err
} }
outCap := 0
if len(outCapacity) > 0 {
outCap = outCapacity[0]
}
outc := make(chan ctypes.ResultEvent, outCap)
w.mtx.Lock() w.mtx.Lock()
// subscriber param is ignored because Tendermint will override it with // subscriber param is ignored because Tendermint will override it with
// remote IP anyway. // remote IP anyway.
w.subscriptions[query] = callback w.subscriptions[query] = outc
w.mtx.Unlock() w.mtx.Unlock()
return nil return outc, nil
} }
// Unsubscribe implements EventsClient by using WSClient to unsubscribe given // Unsubscribe implements EventsClient by using WSClient to unsubscribe given
@ -321,10 +327,9 @@ func (w *WSEvents) Unsubscribe(ctx context.Context, subscriber, query string) er
} }
w.mtx.Lock() w.mtx.Lock()
callback, ok := w.subscriptions[query] out, ok := w.subscriptions[query]
if ok { if ok {
// TODO: ErrUnsubscribed close(out)
callback(nil, errors.New("unsubscribed"))
delete(w.subscriptions, query) delete(w.subscriptions, query)
} }
w.mtx.Unlock() w.mtx.Unlock()
@ -340,11 +345,10 @@ func (w *WSEvents) UnsubscribeAll(ctx context.Context, subscriber string) error
} }
w.mtx.Lock() w.mtx.Lock()
for _, callback := range w.subscriptions { for _, out := range w.subscriptions {
// TODO: ErrUnsubscribed close(out)
callback(nil, errors.New("unsubscribed"))
} }
w.subscriptions = make(map[string]EventCallback) w.subscriptions = make(map[string]chan ctypes.ResultEvent)
w.mtx.Unlock() w.mtx.Unlock()
return nil return nil
@ -371,22 +375,35 @@ func (w *WSEvents) eventListener() {
if !ok { if !ok {
return return
} }
if resp.Error != nil { if resp.Error != nil {
w.Logger.Error("WS error", "err", resp.Error.Error()) w.Logger.Error("WS error", "err", resp.Error.Error())
// TODO: if err==ErrUnsubscribed, make sure to call user's callback // we don't know which subscription failed, so redo all of them
// resubscribe with exponential timeout
w.redoSubscriptions()
continue continue
} }
result := new(ctypes.ResultEvent) result := new(ctypes.ResultEvent)
err := w.cdc.UnmarshalJSON(resp.Result, result) err := w.cdc.UnmarshalJSON(resp.Result, result)
if err != nil { if err != nil {
w.Logger.Error("failed to unmarshal response", "err", err) w.Logger.Error("failed to unmarshal response", "err", err)
continue continue
} }
// NOTE: writing also happens inside mutex so we can't close a channel in // NOTE: writing also happens inside mutex so we can't close a channel in
// Unsubscribe/UnsubscribeAll. // Unsubscribe/UnsubscribeAll.
w.mtx.RLock() w.mtx.RLock()
if callback, ok := w.subscriptions[result.Query]; ok { if out, ok := w.subscriptions[result.Query]; ok {
callback(result, nil) if cap(out) == 0 {
out <- *result
} else {
select {
case out <- *result:
default:
w.Logger.Error("wanted to publish ResultEvent, but out channel is full", "ResultEvent", result, "query", result.Query)
}
}
} }
w.mtx.RUnlock() w.mtx.RUnlock()
case <-w.Quit(): case <-w.Quit():

View File

@ -90,19 +90,17 @@ type NetworkClient interface {
Health() (*ctypes.ResultHealth, error) Health() (*ctypes.ResultHealth, error)
} }
// EventCallback is used by Subscribe to deliver events.
type EventCallback func(event *ctypes.ResultEvent, err error)
// EventsClient is reactive, you can subscribe to any message, given the proper // EventsClient is reactive, you can subscribe to any message, given the proper
// string. see tendermint/types/events.go // string. see tendermint/types/events.go
type EventsClient interface { type EventsClient interface {
// Subscribe subscribes given subscriber to query. When a matching event is // Subscribe subscribes given subscriber to query. Returns an unbuffered
// published, callback is called (err is nil). When/if subscription is // channel onto which events are published. An error is returned if it fails
// terminated, callback is called again with non-nil error. // to subscribe. outCapacity can be used optionally to set capacity for the
// channel. Channel is never closed to prevent accidental reads.
// //
// ctx cannot be used to unsubscribe. To unsubscribe, use either Unsubscribe // ctx cannot be used to unsubscribe. To unsubscribe, use either Unsubscribe
// or UnsubscribeAll. // or UnsubscribeAll.
Subscribe(ctx context.Context, subscriber, query string, callback EventCallback) error Subscribe(ctx context.Context, subscriber, query string, outCapacity ...int) (out <-chan ctypes.ResultEvent, err error)
// Unsubscribe unsubscribes given subscriber from query. // Unsubscribe unsubscribes given subscriber from query.
Unsubscribe(ctx context.Context, subscriber, query string) error Unsubscribe(ctx context.Context, subscriber, query string) error
// UnsubscribeAll unsubscribes given subscriber from all the queries. // UnsubscribeAll unsubscribes given subscriber from all the queries.

View File

@ -6,6 +6,7 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
cmn "github.com/tendermint/tendermint/libs/common" cmn "github.com/tendermint/tendermint/libs/common"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
tmquery "github.com/tendermint/tendermint/libs/pubsub/query" tmquery "github.com/tendermint/tendermint/libs/pubsub/query"
nm "github.com/tendermint/tendermint/node" nm "github.com/tendermint/tendermint/node"
"github.com/tendermint/tendermint/rpc/core" "github.com/tendermint/tendermint/rpc/core"
@ -144,31 +145,50 @@ func (Local) TxSearch(query string, prove bool, page, perPage int) (*ctypes.Resu
// Subscribe implements EventsClient by using local eventBus to subscribe given // Subscribe implements EventsClient by using local eventBus to subscribe given
// subscriber to query. // subscriber to query.
func (c *Local) Subscribe(ctx context.Context, subscriber, query string, callback EventCallback) error { func (c *Local) Subscribe(ctx context.Context, subscriber, query string, outCapacity ...int) (out <-chan ctypes.ResultEvent, err error) {
q, err := tmquery.New(query) q, err := tmquery.New(query)
if err != nil { if err != nil {
return errors.Wrap(err, "failed to parse query") return nil, errors.Wrap(err, "failed to parse query")
} }
sub, err := c.EventBus.Subscribe(ctx, subscriber, q) sub, err := c.EventBus.Subscribe(ctx, subscriber, q)
if err != nil { if err != nil {
return errors.Wrap(err, "failed to subscribe") return nil, errors.Wrap(err, "failed to subscribe")
} }
go func() { outCap := 0
if len(outCapacity) > 0 {
outCap = outCapacity[0]
}
outc := make(chan ctypes.ResultEvent, outCap)
go func(sub types.Subscription) {
for { for {
select { select {
case msg := <-sub.Out(): case msg := <-sub.Out():
// can panic if cap(out) == 0 {
callback(&ctypes.ResultEvent{Query: query, Data: msg.Data(), Tags: msg.Tags()}, nil) outc <- ctypes.ResultEvent{Query: query, Data: msg.Data(), Tags: msg.Tags()}
} else {
select {
case outc <- ctypes.ResultEvent{Query: query, Data: msg.Data(), Tags: msg.Tags()}:
default:
// XXX: log error
}
}
case <-sub.Cancelled(): case <-sub.Cancelled():
// can panic if sub.Err() != tmpubsub.ErrUnsubscribed {
callback(nil, sub.Err()) // resubscribe with exponential timeout
var err error
sub, err = c.EventBus.Subscribe(ctx, subscriber, q)
if err != nil {
// TODO
}
}
return return
} }
} }
}() }(sub)
return nil return outc, nil
} }
// Unsubscribe implements EventsClient by using local eventBus to unsubscribe // Unsubscribe implements EventsClient by using local eventBus to unsubscribe

View File

@ -6,6 +6,7 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
tmquery "github.com/tendermint/tendermint/libs/pubsub/query" tmquery "github.com/tendermint/tendermint/libs/pubsub/query"
ctypes "github.com/tendermint/tendermint/rpc/core/types" ctypes "github.com/tendermint/tendermint/rpc/core/types"
rpctypes "github.com/tendermint/tendermint/rpc/lib/types" rpctypes "github.com/tendermint/tendermint/rpc/lib/types"
@ -125,11 +126,14 @@ func Subscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscri
resultEvent, resultEvent,
)) ))
case <-sub.Cancelled(): case <-sub.Cancelled():
wsCtx.TryWriteRPCResponse( if sub.Err() != tmpubsub.ErrUnsubscribed {
rpctypes.RPCServerError(rpctypes.JSONRPCStringID( // should not happen
fmt.Sprintf("%v#event", wsCtx.Request.ID)), wsCtx.TryWriteRPCResponse(
fmt.Errorf("subscription was cancelled (reason: %v)", sub.Err()), rpctypes.RPCServerError(rpctypes.JSONRPCStringID(
)) fmt.Sprintf("%v#event", wsCtx.Request.ID)),
fmt.Errorf("subscription was cancelled (reason: %v).", sub.Err()),
))
}
return return
} }
} }