remove EventSubscriber

This commit is contained in:
Anton Kaliaev 2019-02-11 12:48:39 +04:00
parent 421b3c11d4
commit a801b14850
No known key found for this signature in database
GPG Key ID: 7B6881D965918214
5 changed files with 37 additions and 52 deletions

View File

@ -1,7 +1,9 @@
package proxy package proxy
import ( import (
"context"
"net/http" "net/http"
"time"
amino "github.com/tendermint/go-amino" amino "github.com/tendermint/go-amino"
"github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/log"
@ -34,7 +36,14 @@ func StartProxy(c rpcclient.Client, listenAddr string, logger log.Logger, maxOpe
mux := http.NewServeMux() mux := http.NewServeMux()
rpcserver.RegisterRPCFuncs(mux, r, cdc, logger) rpcserver.RegisterRPCFuncs(mux, r, cdc, logger)
wm := rpcserver.NewWebsocketManager(r, cdc, rpcserver.EventSubscriber(c)) wm := rpcserver.NewWebsocketManager(r, cdc, rpcserver.OnDisconnect(func(remoteAddr string) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err := c.UnsubscribeAll(ctx, remoteAddr)
if err != nil {
logger.Error("Failed to unsubscribe from events", "err", err)
}
}))
wm.SetLogger(logger) wm.SetLogger(logger)
core.SetLogger(logger) core.SetLogger(logger)
mux.HandleFunc(wsEndpoint, wm.WebsocketHandler) mux.HandleFunc(wsEndpoint, wm.WebsocketHandler)
@ -51,13 +60,11 @@ func StartProxy(c rpcclient.Client, listenAddr string, logger log.Logger, maxOpe
// //
// if we want security, the client must implement it as a secure client // if we want security, the client must implement it as a secure client
func RPCRoutes(c rpcclient.Client) map[string]*rpcserver.RPCFunc { func RPCRoutes(c rpcclient.Client) map[string]*rpcserver.RPCFunc {
return map[string]*rpcserver.RPCFunc{ return map[string]*rpcserver.RPCFunc{
// Subscribe/unsubscribe are reserved for websocket events. // Subscribe/unsubscribe are reserved for websocket events.
// We can just use the core tendermint impl, which uses the "subscribe": rpcserver.NewWSRPCFunc(c.Subscribe, "query"),
// EventSwitch we registered in NewWebsocketManager above "unsubscribe": rpcserver.NewWSRPCFunc(c.Unsubscribe, "query"),
"subscribe": rpcserver.NewWSRPCFunc(core.Subscribe, "query"), "unsubscribe_all": rpcserver.NewWSRPCFunc(c.UnsubscribeAll, ""),
"unsubscribe": rpcserver.NewWSRPCFunc(core.Unsubscribe, "query"),
// info API // info API
"status": rpcserver.NewRPCFunc(c.Status, ""), "status": rpcserver.NewRPCFunc(c.Status, ""),

View File

@ -678,8 +678,17 @@ func (n *Node) startRPC() ([]net.Listener, error) {
for i, listenAddr := range listenAddrs { for i, listenAddr := range listenAddrs {
mux := http.NewServeMux() mux := http.NewServeMux()
rpcLogger := n.Logger.With("module", "rpc-server") rpcLogger := n.Logger.With("module", "rpc-server")
wm := rpcserver.NewWebsocketManager(rpccore.Routes, coreCodec, rpcserver.EventSubscriber(n.eventBus)) wmLogger := rpcLogger.With("protocol", "websocket")
wm.SetLogger(rpcLogger.With("protocol", "websocket")) wm := rpcserver.NewWebsocketManager(rpccore.Routes, coreCodec,
rpcserver.OnDisconnect(func(remoteAddr string) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err := n.eventBus.UnsubscribeAll(ctx, remoteAddr)
if err != nil {
wmLogger.Error("Failed to unsubscribe addr from events", "addr", remoteAddr, "err", err)
}
}))
wm.SetLogger(wmLogger)
mux.HandleFunc("/websocket", wm.WebsocketHandler) mux.HandleFunc("/websocket", wm.WebsocketHandler)
rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, coreCodec, rpcLogger) rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, coreCodec, rpcLogger)

View File

@ -10,7 +10,6 @@ import (
tmquery "github.com/tendermint/tendermint/libs/pubsub/query" tmquery "github.com/tendermint/tendermint/libs/pubsub/query"
ctypes "github.com/tendermint/tendermint/rpc/core/types" ctypes "github.com/tendermint/tendermint/rpc/core/types"
rpctypes "github.com/tendermint/tendermint/rpc/lib/types" rpctypes "github.com/tendermint/tendermint/rpc/lib/types"
tmtypes "github.com/tendermint/tendermint/types"
) )
// Subscribe for events via WebSocket. // Subscribe for events via WebSocket.
@ -94,9 +93,9 @@ import (
func Subscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscribe, error) { func Subscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscribe, error) {
addr := wsCtx.GetRemoteAddr() addr := wsCtx.GetRemoteAddr()
if eventBusFor(wsCtx).NumClients() > MaxSubscriptionClients { if eventBus.NumClients() > MaxSubscriptionClients {
return nil, fmt.Errorf("max_subscription_clients %d reached", MaxSubscriptionClients) return nil, fmt.Errorf("max_subscription_clients %d reached", MaxSubscriptionClients)
} else if eventBusFor(wsCtx).NumClientSubscriptions(addr) > MaxSubscriptionsPerClient { } else if eventBus.NumClientSubscriptions(addr) > MaxSubscriptionsPerClient {
return nil, fmt.Errorf("max_subscriptions_per_client %d reached", MaxSubscriptionsPerClient) return nil, fmt.Errorf("max_subscriptions_per_client %d reached", MaxSubscriptionsPerClient)
} }
@ -109,7 +108,7 @@ func Subscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscri
ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout) ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout)
defer cancel() defer cancel()
sub, err := eventBusFor(wsCtx).Subscribe(ctx, addr, q) sub, err := eventBus.Subscribe(ctx, addr, q)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -179,7 +178,7 @@ func Unsubscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultUnsub
if err != nil { if err != nil {
return nil, errors.Wrap(err, "failed to parse query") return nil, errors.Wrap(err, "failed to parse query")
} }
err = eventBusFor(wsCtx).Unsubscribe(context.Background(), addr, q) err = eventBus.Unsubscribe(context.Background(), addr, q)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -213,17 +212,9 @@ func Unsubscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultUnsub
func UnsubscribeAll(wsCtx rpctypes.WSRPCContext) (*ctypes.ResultUnsubscribe, error) { func UnsubscribeAll(wsCtx rpctypes.WSRPCContext) (*ctypes.ResultUnsubscribe, error) {
addr := wsCtx.GetRemoteAddr() addr := wsCtx.GetRemoteAddr()
logger.Info("Unsubscribe from all", "remote", addr) logger.Info("Unsubscribe from all", "remote", addr)
err := eventBusFor(wsCtx).UnsubscribeAll(context.Background(), addr) err := eventBus.UnsubscribeAll(context.Background(), addr)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &ctypes.ResultUnsubscribe{}, nil return &ctypes.ResultUnsubscribe{}, nil
} }
func eventBusFor(wsCtx rpctypes.WSRPCContext) tmtypes.EventBusSubscriber {
es := wsCtx.GetEventSubscriber()
if es == nil {
es = eventBus
}
return es
}

View File

@ -2,7 +2,6 @@ package rpcserver
import ( import (
"bytes" "bytes"
"context"
"encoding/hex" "encoding/hex"
"encoding/json" "encoding/json"
"fmt" "fmt"
@ -434,8 +433,8 @@ type wsConnection struct {
// Send pings to server with this period. Must be less than readWait, but greater than zero. // Send pings to server with this period. Must be less than readWait, but greater than zero.
pingPeriod time.Duration pingPeriod time.Duration
// object that is used to subscribe / unsubscribe from events // callback which is called upon disconnect
eventSub types.EventSubscriber onDisconnect func(remoteAddr string)
} }
// NewWSConnection wraps websocket.Conn. // NewWSConnection wraps websocket.Conn.
@ -468,12 +467,11 @@ func NewWSConnection(
return wsc return wsc
} }
// EventSubscriber sets object that is used to subscribe / unsubscribe from // OnDisconnect sets a callback which is used upon disconnect - not
// events - not Goroutine-safe. If none given, default node's eventBus will be // Goroutine-safe. Nop by default.
// used. func OnDisconnect(onDisconnect func(remoteAddr string)) func(*wsConnection) {
func EventSubscriber(eventSub types.EventSubscriber) func(*wsConnection) {
return func(wsc *wsConnection) { return func(wsc *wsConnection) {
wsc.eventSub = eventSub wsc.onDisconnect = onDisconnect
} }
} }
@ -527,8 +525,8 @@ func (wsc *wsConnection) OnStop() {
// Both read and write loops close the websocket connection when they exit their loops. // Both read and write loops close the websocket connection when they exit their loops.
// The writeChan is never closed, to allow WriteRPCResponse() to fail. // The writeChan is never closed, to allow WriteRPCResponse() to fail.
if wsc.eventSub != nil { if wsc.onDisconnect != nil {
wsc.eventSub.UnsubscribeAll(context.TODO(), wsc.remoteAddr) wsc.onDisconnect(wsc.remoteAddr)
} }
} }
@ -538,11 +536,6 @@ func (wsc *wsConnection) GetRemoteAddr() string {
return wsc.remoteAddr return wsc.remoteAddr
} }
// GetEventSubscriber implements WSRPCConnection by returning event subscriber.
func (wsc *wsConnection) GetEventSubscriber() types.EventSubscriber {
return wsc.eventSub
}
// WriteRPCResponse pushes a response to the writeChan, and blocks until it is accepted. // WriteRPCResponse pushes a response to the writeChan, and blocks until it is accepted.
// It implements WSRPCConnection. It is Goroutine-safe. // It implements WSRPCConnection. It is Goroutine-safe.
func (wsc *wsConnection) WriteRPCResponse(resp types.RPCResponse) { func (wsc *wsConnection) WriteRPCResponse(resp types.RPCResponse) {

View File

@ -1,7 +1,6 @@
package rpctypes package rpctypes
import ( import (
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"reflect" "reflect"
@ -10,9 +9,6 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
amino "github.com/tendermint/go-amino" amino "github.com/tendermint/go-amino"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
tmtypes "github.com/tendermint/tendermint/types"
) )
// a wrapper to emulate a sum type: jsonrpcid = string | int // a wrapper to emulate a sum type: jsonrpcid = string | int
@ -241,7 +237,6 @@ type WSRPCConnection interface {
GetRemoteAddr() string GetRemoteAddr() string
WriteRPCResponse(resp RPCResponse) WriteRPCResponse(resp RPCResponse)
TryWriteRPCResponse(resp RPCResponse) bool TryWriteRPCResponse(resp RPCResponse) bool
GetEventSubscriber() EventSubscriber
Codec() *amino.Codec Codec() *amino.Codec
} }
@ -251,16 +246,6 @@ type WSRPCContext struct {
WSRPCConnection WSRPCConnection
} }
// EventSubscriber mirrors tendermint/tendermint/types.EventBusSubscriber
type EventSubscriber interface {
Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, outCapacity ...int) (tmtypes.Subscription, error)
Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error
UnsubscribeAll(ctx context.Context, subscriber string) error
NumClients() int
NumClientSubscriptions(clientID string) int
}
//---------------------------------------- //----------------------------------------
// SOCKETS // SOCKETS
// //