fixes from my own review

This commit is contained in:
Anton Kaliaev
2017-10-27 00:01:00 +03:00
parent f6539737de
commit 1c1c68df8d
5 changed files with 26 additions and 14 deletions

View File

@ -15,6 +15,7 @@ FEATURES:
- Tooling to run multiple blockchains/apps, possibly in a single process - Tooling to run multiple blockchains/apps, possibly in a single process
- State syncing (without transaction replay) - State syncing (without transaction replay)
- Add authentication and rate-limitting to the RPC - Add authentication and rate-limitting to the RPC
- new unsubscribe_all WebSocket RPC endpoint
IMPROVEMENTS: IMPROVEMENTS:
- Improve subtleties around mempool caching and logic - Improve subtleties around mempool caching and logic

View File

@ -328,7 +328,7 @@ func (conR *ConsensusReactor) FastSync() bool {
// broadcastNewRoundStepsAndVotes subscribes for new round steps and votes // broadcastNewRoundStepsAndVotes subscribes for new round steps and votes
// using the event bus and broadcasts events to peers upon receiving them. // using the event bus and broadcasts events to peers upon receiving them.
func (conR *ConsensusReactor) broadcastNewRoundStepsAndVotes() error { func (conR *ConsensusReactor) broadcastNewRoundStepsAndVotes() error {
subscriber := "consensus-reactor" const subscriber = "consensus-reactor"
ctx := context.Background() ctx := context.Background()
// new round steps // new round steps

View File

@ -20,6 +20,11 @@ import (
dbm "github.com/tendermint/tmlibs/db" dbm "github.com/tendermint/tmlibs/db"
) )
const (
// event bus subscriber
subscriber = "replay-file"
)
//-------------------------------------------------------- //--------------------------------------------------------
// replay messages interactively or all at once // replay messages interactively or all at once
@ -47,11 +52,11 @@ func (cs *ConsensusState) ReplayFile(file string, console bool) error {
newStepCh := make(chan interface{}, 1) newStepCh := make(chan interface{}, 1)
ctx := context.Background() ctx := context.Background()
err := cs.eventBus.Subscribe(ctx, "replay-file", types.EventQueryNewRoundStep, newStepCh) err := cs.eventBus.Subscribe(ctx, subscriber, types.EventQueryNewRoundStep, newStepCh)
if err != nil { if err != nil {
return errors.Errorf("failed to subscribe replay-file to %v", types.EventQueryNewRoundStep) return errors.Errorf("failed to subscribe %s to %v", subscriber, types.EventQueryNewRoundStep)
} }
defer cs.eventBus.Unsubscribe(ctx, "replay-file", types.EventQueryNewRoundStep) defer cs.eventBus.Unsubscribe(ctx, subscriber, types.EventQueryNewRoundStep)
// just open the file for reading, no need to use wal // just open the file for reading, no need to use wal
fp, err := os.OpenFile(file, os.O_RDONLY, 0666) fp, err := os.OpenFile(file, os.O_RDONLY, 0666)
@ -208,11 +213,11 @@ func (pb *playback) replayConsoleLoop() int {
// ensure all new step events are regenerated as expected // ensure all new step events are regenerated as expected
newStepCh := make(chan interface{}, 1) newStepCh := make(chan interface{}, 1)
err := pb.cs.eventBus.Subscribe(ctx, "replay-file", types.EventQueryNewRoundStep, newStepCh) err := pb.cs.eventBus.Subscribe(ctx, subscriber, types.EventQueryNewRoundStep, newStepCh)
if err != nil { if err != nil {
cmn.Exit(fmt.Sprintf("failed to subscribe replay-file to %v", types.EventQueryNewRoundStep)) cmn.Exit(fmt.Sprintf("failed to subscribe %s to %v", subscriber, types.EventQueryNewRoundStep))
} }
defer pb.cs.eventBus.Unsubscribe(ctx, "replay-file", types.EventQueryNewRoundStep) defer pb.cs.eventBus.Unsubscribe(ctx, subscriber, types.EventQueryNewRoundStep)
if len(tokens) == 1 { if len(tokens) == 1 {
pb.replayReset(1, newStepCh) pb.replayReset(1, newStepCh)

View File

@ -291,9 +291,10 @@ func (w *WSEvents) UnsubscribeAll(ctx context.Context) error {
// After being reconnected, it is necessary to redo subscription to server // After being reconnected, it is necessary to redo subscription to server
// otherwise no data will be automatically received. // otherwise no data will be automatically received.
func (w *WSEvents) redoSubscriptions() { func (w *WSEvents) redoSubscriptions() {
for query, out := range w.subscriptions { for query := range w.subscriptions {
// NOTE: no timeout for reconnect // NOTE: no timeout for resubscribing
w.Subscribe(context.Background(), query, out) // FIXME: better logging/handling of errors??
w.ws.Subscribe(context.Background(), query)
} }
} }

View File

@ -13,6 +13,11 @@ import (
tmquery "github.com/tendermint/tmlibs/pubsub/query" tmquery "github.com/tendermint/tmlibs/pubsub/query"
) )
const (
// event bus subscriber
subscriber = "rpc-localclient"
)
/* /*
Local is a Client implementation that directly executes the rpc Local is a Client implementation that directly executes the rpc
functions on a given node, without going through HTTP or GRPC. functions on a given node, without going through HTTP or GRPC.
@ -67,7 +72,7 @@ func (c Local) ABCIQuery(path string, data data.Bytes) (*ctypes.ResultABCIQuery,
return c.ABCIQueryWithOptions(path, data, DefaultABCIQueryOptions) return c.ABCIQueryWithOptions(path, data, DefaultABCIQueryOptions)
} }
func (c Local) ABCIQueryWithOptions(path string, data data.Bytes, opts ABCIQueryOptions) (*ctypes.ResultABCIQuery, error) { func (Local) ABCIQueryWithOptions(path string, data data.Bytes, opts ABCIQueryOptions) (*ctypes.ResultABCIQuery, error) {
return core.ABCIQuery(path, data, opts.Height, opts.Trusted) return core.ABCIQuery(path, data, opts.Height, opts.Trusted)
} }
@ -124,7 +129,7 @@ func (c *Local) Subscribe(ctx context.Context, query string, out chan<- interfac
if err != nil { if err != nil {
return errors.Wrap(err, "failed to subscribe") return errors.Wrap(err, "failed to subscribe")
} }
if err = c.EventBus.Subscribe(ctx, "rpclocalclient", q, out); err != nil { if err = c.EventBus.Subscribe(ctx, subscriber, q, out); err != nil {
return errors.Wrap(err, "failed to subscribe") return errors.Wrap(err, "failed to subscribe")
} }
c.subscriptions[query] = q c.subscriptions[query] = q
@ -136,7 +141,7 @@ func (c *Local) Unsubscribe(ctx context.Context, query string) error {
if !ok { if !ok {
return errors.New("subscription not found") return errors.New("subscription not found")
} }
if err := c.EventBus.Unsubscribe(ctx, "rpclocalclient", q); err != nil { if err := c.EventBus.Unsubscribe(ctx, subscriber, q); err != nil {
return errors.Wrap(err, "failed to unsubscribe") return errors.Wrap(err, "failed to unsubscribe")
} }
delete(c.subscriptions, query) delete(c.subscriptions, query)
@ -144,7 +149,7 @@ func (c *Local) Unsubscribe(ctx context.Context, query string) error {
} }
func (c *Local) UnsubscribeAll(ctx context.Context) error { func (c *Local) UnsubscribeAll(ctx context.Context) error {
if err := c.EventBus.UnsubscribeAll(ctx, "rpclocalclient"); err != nil { if err := c.EventBus.UnsubscribeAll(ctx, subscriber); err != nil {
return errors.Wrap(err, "failed to unsubscribe") return errors.Wrap(err, "failed to unsubscribe")
} }
c.subscriptions = make(map[string]*tmquery.Query) c.subscriptions = make(map[string]*tmquery.Query)