diff --git a/CHANGELOG.md b/CHANGELOG.md index c5be0877..055c7fb6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ FEATURES: - Tooling to run multiple blockchains/apps, possibly in a single process - State syncing (without transaction replay) - Add authentication and rate-limitting to the RPC +- new unsubscribe_all WebSocket RPC endpoint IMPROVEMENTS: - Improve subtleties around mempool caching and logic diff --git a/consensus/reactor.go b/consensus/reactor.go index 88f3e328..1568e37a 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -328,7 +328,7 @@ func (conR *ConsensusReactor) FastSync() bool { // broadcastNewRoundStepsAndVotes subscribes for new round steps and votes // using the event bus and broadcasts events to peers upon receiving them. func (conR *ConsensusReactor) broadcastNewRoundStepsAndVotes() error { - subscriber := "consensus-reactor" + const subscriber = "consensus-reactor" ctx := context.Background() // new round steps diff --git a/consensus/replay_file.go b/consensus/replay_file.go index 3bdd349e..6b52b5b0 100644 --- a/consensus/replay_file.go +++ b/consensus/replay_file.go @@ -20,6 +20,11 @@ import ( dbm "github.com/tendermint/tmlibs/db" ) +const ( + // event bus subscriber + subscriber = "replay-file" +) + //-------------------------------------------------------- // replay messages interactively or all at once @@ -47,11 +52,11 @@ func (cs *ConsensusState) ReplayFile(file string, console bool) error { newStepCh := make(chan interface{}, 1) ctx := context.Background() - err := cs.eventBus.Subscribe(ctx, "replay-file", types.EventQueryNewRoundStep, newStepCh) + err := cs.eventBus.Subscribe(ctx, subscriber, types.EventQueryNewRoundStep, newStepCh) if err != nil { - return errors.Errorf("failed to subscribe replay-file to %v", types.EventQueryNewRoundStep) + return errors.Errorf("failed to subscribe %s to %v", subscriber, types.EventQueryNewRoundStep) } - defer cs.eventBus.Unsubscribe(ctx, "replay-file", types.EventQueryNewRoundStep) + defer cs.eventBus.Unsubscribe(ctx, subscriber, types.EventQueryNewRoundStep) // just open the file for reading, no need to use wal fp, err := os.OpenFile(file, os.O_RDONLY, 0666) @@ -208,11 +213,11 @@ func (pb *playback) replayConsoleLoop() int { // ensure all new step events are regenerated as expected newStepCh := make(chan interface{}, 1) - err := pb.cs.eventBus.Subscribe(ctx, "replay-file", types.EventQueryNewRoundStep, newStepCh) + err := pb.cs.eventBus.Subscribe(ctx, subscriber, types.EventQueryNewRoundStep, newStepCh) if err != nil { - cmn.Exit(fmt.Sprintf("failed to subscribe replay-file to %v", types.EventQueryNewRoundStep)) + cmn.Exit(fmt.Sprintf("failed to subscribe %s to %v", subscriber, types.EventQueryNewRoundStep)) } - defer pb.cs.eventBus.Unsubscribe(ctx, "replay-file", types.EventQueryNewRoundStep) + defer pb.cs.eventBus.Unsubscribe(ctx, subscriber, types.EventQueryNewRoundStep) if len(tokens) == 1 { pb.replayReset(1, newStepCh) diff --git a/rpc/client/httpclient.go b/rpc/client/httpclient.go index 82fdded4..31a80f58 100644 --- a/rpc/client/httpclient.go +++ b/rpc/client/httpclient.go @@ -291,9 +291,10 @@ func (w *WSEvents) UnsubscribeAll(ctx context.Context) error { // After being reconnected, it is necessary to redo subscription to server // otherwise no data will be automatically received. func (w *WSEvents) redoSubscriptions() { - for query, out := range w.subscriptions { - // NOTE: no timeout for reconnect - w.Subscribe(context.Background(), query, out) + for query := range w.subscriptions { + // NOTE: no timeout for resubscribing + // FIXME: better logging/handling of errors?? + w.ws.Subscribe(context.Background(), query) } } diff --git a/rpc/client/localclient.go b/rpc/client/localclient.go index 1fea2afb..55a0e0fb 100644 --- a/rpc/client/localclient.go +++ b/rpc/client/localclient.go @@ -13,6 +13,11 @@ import ( tmquery "github.com/tendermint/tmlibs/pubsub/query" ) +const ( + // event bus subscriber + subscriber = "rpc-localclient" +) + /* Local is a Client implementation that directly executes the rpc functions on a given node, without going through HTTP or GRPC. @@ -67,7 +72,7 @@ func (c Local) ABCIQuery(path string, data data.Bytes) (*ctypes.ResultABCIQuery, return c.ABCIQueryWithOptions(path, data, DefaultABCIQueryOptions) } -func (c Local) ABCIQueryWithOptions(path string, data data.Bytes, opts ABCIQueryOptions) (*ctypes.ResultABCIQuery, error) { +func (Local) ABCIQueryWithOptions(path string, data data.Bytes, opts ABCIQueryOptions) (*ctypes.ResultABCIQuery, error) { return core.ABCIQuery(path, data, opts.Height, opts.Trusted) } @@ -124,7 +129,7 @@ func (c *Local) Subscribe(ctx context.Context, query string, out chan<- interfac if err != nil { return errors.Wrap(err, "failed to subscribe") } - if err = c.EventBus.Subscribe(ctx, "rpclocalclient", q, out); err != nil { + if err = c.EventBus.Subscribe(ctx, subscriber, q, out); err != nil { return errors.Wrap(err, "failed to subscribe") } c.subscriptions[query] = q @@ -136,7 +141,7 @@ func (c *Local) Unsubscribe(ctx context.Context, query string) error { if !ok { return errors.New("subscription not found") } - if err := c.EventBus.Unsubscribe(ctx, "rpclocalclient", q); err != nil { + if err := c.EventBus.Unsubscribe(ctx, subscriber, q); err != nil { return errors.Wrap(err, "failed to unsubscribe") } delete(c.subscriptions, query) @@ -144,7 +149,7 @@ func (c *Local) Unsubscribe(ctx context.Context, query string) error { } func (c *Local) UnsubscribeAll(ctx context.Context) error { - if err := c.EventBus.UnsubscribeAll(ctx, "rpclocalclient"); err != nil { + if err := c.EventBus.UnsubscribeAll(ctx, subscriber); err != nil { return errors.Wrap(err, "failed to unsubscribe") } c.subscriptions = make(map[string]*tmquery.Query)