no timeout for unsubscribe

but 1s Local (5s HTTP) timeout for resubscribe
This commit is contained in:
Anton Kaliaev
2019-02-11 16:07:29 +04:00
parent 898b4ed138
commit d245a1eb7d
5 changed files with 31 additions and 23 deletions

View File

@ -3,7 +3,6 @@ package proxy
import ( import (
"context" "context"
"net/http" "net/http"
"time"
amino "github.com/tendermint/go-amino" amino "github.com/tendermint/go-amino"
"github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/log"
@ -36,14 +35,12 @@ func StartProxy(c rpcclient.Client, listenAddr string, logger log.Logger, maxOpe
mux := http.NewServeMux() mux := http.NewServeMux()
rpcserver.RegisterRPCFuncs(mux, r, cdc, logger) rpcserver.RegisterRPCFuncs(mux, r, cdc, logger)
wm := rpcserver.NewWebsocketManager(r, cdc, rpcserver.OnDisconnect(func(remoteAddr string) { unsubscribeFromAllEvents := func(remoteAddr string) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) if err := c.UnsubscribeAll(context.Background(), remoteAddr); err != nil {
defer cancel()
err := c.UnsubscribeAll(ctx, remoteAddr)
if err != nil {
logger.Error("Failed to unsubscribe from events", "err", err) logger.Error("Failed to unsubscribe from events", "err", err)
} }
})) }
wm := rpcserver.NewWebsocketManager(r, cdc, rpcserver.OnDisconnect(unsubscribeFromAllEvents))
wm.SetLogger(logger) wm.SetLogger(logger)
core.SetLogger(logger) core.SetLogger(logger)
mux.HandleFunc(wsEndpoint, wm.WebsocketHandler) mux.HandleFunc(wsEndpoint, wm.WebsocketHandler)

View File

@ -3,7 +3,6 @@ package proxy
import ( import (
"context" "context"
"fmt" "fmt"
"time"
cmn "github.com/tendermint/tendermint/libs/common" cmn "github.com/tendermint/tendermint/libs/common"
@ -153,9 +152,7 @@ func (w Wrapper) Commit(height *int64) (*ctypes.ResultCommit, error) {
// SubscribeWS subscribes for events using the given query and remote address as // SubscribeWS subscribes for events using the given query and remote address as
// a subscriber, but does not verify responses (FIXME)! // a subscriber, but does not verify responses (FIXME)!
func (w Wrapper) SubscribeWS(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscribe, error) { func (w Wrapper) SubscribeWS(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscribe, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) out, err := w.Client.Subscribe(context.Background(), wsCtx.GetRemoteAddr(), query)
defer cancel()
out, err := w.Client.Subscribe(ctx, wsCtx.GetRemoteAddr(), query)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -681,9 +681,7 @@ func (n *Node) startRPC() ([]net.Listener, error) {
wmLogger := rpcLogger.With("protocol", "websocket") wmLogger := rpcLogger.With("protocol", "websocket")
wm := rpcserver.NewWebsocketManager(rpccore.Routes, coreCodec, wm := rpcserver.NewWebsocketManager(rpccore.Routes, coreCodec,
rpcserver.OnDisconnect(func(remoteAddr string) { rpcserver.OnDisconnect(func(remoteAddr string) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) err := n.eventBus.UnsubscribeAll(context.Background(), remoteAddr)
defer cancel()
err := n.eventBus.UnsubscribeAll(ctx, remoteAddr)
if err != nil { if err != nil {
wmLogger.Error("Failed to unsubscribe addr from events", "addr", remoteAddr, "err", err) wmLogger.Error("Failed to unsubscribe addr from events", "addr", remoteAddr, "err", err)
} }

View File

@ -3,6 +3,7 @@ package client
import ( import (
"context" "context"
"sync" "sync"
"time"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -351,10 +352,14 @@ func (w *WSEvents) UnsubscribeAll(ctx context.Context, subscriber string) error
// After being reconnected, it is necessary to redo subscription to server // After being reconnected, it is necessary to redo subscription to server
// otherwise no data will be automatically received. // otherwise no data will be automatically received.
func (w *WSEvents) redoSubscriptions() { func (w *WSEvents) redoSubscriptions() {
const timeout = 5 * time.Second
for q := range w.subscriptions { for q := range w.subscriptions {
// NOTE: no timeout for resubscribing ctx, cancel := context.WithTimeout(context.Background(), timeout)
// FIXME: better logging/handling of errors?? _ = w.ws.Subscribe(ctx, q)
w.ws.Subscribe(context.Background(), q) // FIXME: err is either ErrAlreadySubscribed or max client (subscriptions per
// client) reached.
// We can ignore ErrAlreadySubscribed, but need to retry in the second case.
cancel()
} }
} }
@ -373,7 +378,7 @@ func (w *WSEvents) eventListener() {
if resp.Error != nil { if resp.Error != nil {
w.Logger.Error("WS error", "err", resp.Error.Error()) w.Logger.Error("WS error", "err", resp.Error.Error())
// we don't know which subscription failed, so redo all of them // we don't know which subscription failed, so redo all of them
// resubscribe with exponential timeout // ErrAlreadySubscribed can be ignored
w.redoSubscriptions() w.redoSubscriptions()
continue continue
} }

View File

@ -2,6 +2,7 @@ package client
import ( import (
"context" "context"
"time"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -171,19 +172,29 @@ func (c *Local) Subscribe(ctx context.Context, subscriber, query string, outCapa
select { select {
case outc <- ctypes.ResultEvent{Query: query, Data: msg.Data(), Tags: msg.Tags()}: case outc <- ctypes.ResultEvent{Query: query, Data: msg.Data(), Tags: msg.Tags()}:
default: default:
// XXX: log error // XXX: client has missed an event. inform it somehow!
} }
} }
case <-sub.Cancelled(): case <-sub.Cancelled():
if sub.Err() != tmpubsub.ErrUnsubscribed { if sub.Err() != tmpubsub.ErrUnsubscribed {
// resubscribe with exponential timeout // resubscribe
var err error var err error
sub, err = c.EventBus.Subscribe(ctx, subscriber, q) for {
if err != nil { if !c.IsRunning() {
// TODO return
}
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
sub, err = c.EventBus.Subscribe(ctx, subscriber, q)
if err == nil {
break
}
} }
} }
return return
case <-c.Quit():
return
} }
} }
}(sub) }(sub)