diff --git a/lite/proxy/proxy.go b/lite/proxy/proxy.go index 93c2c04d..9a3598a6 100644 --- a/lite/proxy/proxy.go +++ b/lite/proxy/proxy.go @@ -3,7 +3,6 @@ package proxy import ( "context" "net/http" - "time" amino "github.com/tendermint/go-amino" "github.com/tendermint/tendermint/libs/log" @@ -36,14 +35,12 @@ func StartProxy(c rpcclient.Client, listenAddr string, logger log.Logger, maxOpe mux := http.NewServeMux() rpcserver.RegisterRPCFuncs(mux, r, cdc, logger) - wm := rpcserver.NewWebsocketManager(r, cdc, rpcserver.OnDisconnect(func(remoteAddr string) { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - err := c.UnsubscribeAll(ctx, remoteAddr) - if err != nil { + unsubscribeFromAllEvents := func(remoteAddr string) { + if err := c.UnsubscribeAll(context.Background(), remoteAddr); err != nil { logger.Error("Failed to unsubscribe from events", "err", err) } - })) + } + wm := rpcserver.NewWebsocketManager(r, cdc, rpcserver.OnDisconnect(unsubscribeFromAllEvents)) wm.SetLogger(logger) core.SetLogger(logger) mux.HandleFunc(wsEndpoint, wm.WebsocketHandler) diff --git a/lite/proxy/wrapper.go b/lite/proxy/wrapper.go index 1f73b85e..7d7c26bb 100644 --- a/lite/proxy/wrapper.go +++ b/lite/proxy/wrapper.go @@ -3,7 +3,6 @@ package proxy import ( "context" "fmt" - "time" cmn "github.com/tendermint/tendermint/libs/common" @@ -153,9 +152,7 @@ func (w Wrapper) Commit(height *int64) (*ctypes.ResultCommit, error) { // SubscribeWS subscribes for events using the given query and remote address as // a subscriber, but does not verify responses (FIXME)! func (w Wrapper) SubscribeWS(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscribe, error) { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - out, err := w.Client.Subscribe(ctx, wsCtx.GetRemoteAddr(), query) + out, err := w.Client.Subscribe(context.Background(), wsCtx.GetRemoteAddr(), query) if err != nil { return nil, err } diff --git a/node/node.go b/node/node.go index 061517c0..5620f1bb 100644 --- a/node/node.go +++ b/node/node.go @@ -681,9 +681,7 @@ func (n *Node) startRPC() ([]net.Listener, error) { wmLogger := rpcLogger.With("protocol", "websocket") wm := rpcserver.NewWebsocketManager(rpccore.Routes, coreCodec, rpcserver.OnDisconnect(func(remoteAddr string) { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - err := n.eventBus.UnsubscribeAll(ctx, remoteAddr) + err := n.eventBus.UnsubscribeAll(context.Background(), remoteAddr) if err != nil { wmLogger.Error("Failed to unsubscribe addr from events", "addr", remoteAddr, "err", err) } diff --git a/rpc/client/httpclient.go b/rpc/client/httpclient.go index 756bc127..79b41246 100644 --- a/rpc/client/httpclient.go +++ b/rpc/client/httpclient.go @@ -3,6 +3,7 @@ package client import ( "context" "sync" + "time" "github.com/pkg/errors" @@ -351,10 +352,14 @@ func (w *WSEvents) UnsubscribeAll(ctx context.Context, subscriber string) error // After being reconnected, it is necessary to redo subscription to server // otherwise no data will be automatically received. func (w *WSEvents) redoSubscriptions() { + const timeout = 5 * time.Second for q := range w.subscriptions { - // NOTE: no timeout for resubscribing - // FIXME: better logging/handling of errors?? - w.ws.Subscribe(context.Background(), q) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + _ = w.ws.Subscribe(ctx, q) + // FIXME: err is either ErrAlreadySubscribed or max client (subscriptions per + // client) reached. + // We can ignore ErrAlreadySubscribed, but need to retry in the second case. + cancel() } } @@ -373,7 +378,7 @@ func (w *WSEvents) eventListener() { if resp.Error != nil { w.Logger.Error("WS error", "err", resp.Error.Error()) // we don't know which subscription failed, so redo all of them - // resubscribe with exponential timeout + // ErrAlreadySubscribed can be ignored w.redoSubscriptions() continue } diff --git a/rpc/client/localclient.go b/rpc/client/localclient.go index 7e9edb0f..5b86a92f 100644 --- a/rpc/client/localclient.go +++ b/rpc/client/localclient.go @@ -2,6 +2,7 @@ package client import ( "context" + "time" "github.com/pkg/errors" @@ -171,19 +172,29 @@ func (c *Local) Subscribe(ctx context.Context, subscriber, query string, outCapa select { case outc <- ctypes.ResultEvent{Query: query, Data: msg.Data(), Tags: msg.Tags()}: default: - // XXX: log error + // XXX: client has missed an event. inform it somehow! } } case <-sub.Cancelled(): if sub.Err() != tmpubsub.ErrUnsubscribed { - // resubscribe with exponential timeout + // resubscribe var err error - sub, err = c.EventBus.Subscribe(ctx, subscriber, q) - if err != nil { - // TODO + for { + if !c.IsRunning() { + return + } + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + sub, err = c.EventBus.Subscribe(ctx, subscriber, q) + if err == nil { + break + } } } return + case <-c.Quit(): + return } } }(sub)