diff --git a/docs/app-dev/subscribing-to-events-via-websocket.md b/docs/app-dev/subscribing-to-events-via-websocket.md index 890b061b..be35f49c 100644 --- a/docs/app-dev/subscribing-to-events-via-websocket.md +++ b/docs/app-dev/subscribing-to-events-via-websocket.md @@ -40,7 +40,6 @@ Response: ``` { "jsonrpc": "2.0", - "id": "0#event", "result": { "query": "tm.event='ValidatorSetUpdates'", "data": { diff --git a/lite/proxy/wrapper.go b/lite/proxy/wrapper.go index 2d333e9f..55d9659d 100644 --- a/lite/proxy/wrapper.go +++ b/lite/proxy/wrapper.go @@ -2,7 +2,6 @@ package proxy import ( "context" - "fmt" cmn "github.com/tendermint/tendermint/libs/common" @@ -170,7 +169,7 @@ func (w Wrapper) SubscribeWS(ctx *rpctypes.Context, query string) (*ctypes.Resul ctx.WSConn.TryWriteRPCResponse( rpctypes.NewRPCSuccessResponse( ctx.WSConn.Codec(), - rpctypes.JSONRPCStringID(fmt.Sprintf("%v#event", ctx.JSONReq.ID)), + ctx.JSONReq.ID, resultEvent, )) case <-w.Client.Quit(): diff --git a/rpc/client/httpclient.go b/rpc/client/httpclient.go index 85f065b6..1b496bb6 100644 --- a/rpc/client/httpclient.go +++ b/rpc/client/httpclient.go @@ -11,6 +11,7 @@ import ( amino "github.com/tendermint/go-amino" cmn "github.com/tendermint/tendermint/libs/common" + "github.com/tendermint/tendermint/libs/log" tmpubsub "github.com/tendermint/tendermint/libs/pubsub" ctypes "github.com/tendermint/tendermint/rpc/core/types" rpcclient "github.com/tendermint/tendermint/rpc/lib/client" @@ -100,6 +101,10 @@ func NewHTTP(remote, wsEndpoint string) *HTTP { var _ Client = (*HTTP)(nil) +func (c *HTTP) SetLogger(l log.Logger) { + c.WSEvents.SetLogger(l) +} + // NewBatch creates a new batch client for this HTTP client. func (c *HTTP) NewBatch() *BatchHTTP { rpcBatch := c.rpc.NewRequestBatch() @@ -376,6 +381,7 @@ func (w *WSEvents) OnStart() error { w.redoSubscriptionsAfter(0 * time.Second) })) w.ws.SetCodec(w.cdc) + w.ws.SetLogger(w.Logger) err := w.ws.Start() if err != nil { diff --git a/rpc/client/rpc_test.go b/rpc/client/rpc_test.go index de5e18f1..fd0504b9 100644 --- a/rpc/client/rpc_test.go +++ b/rpc/client/rpc_test.go @@ -17,6 +17,7 @@ import ( "github.com/tendermint/tendermint/crypto/ed25519" "github.com/tendermint/tendermint/crypto/tmhash" cmn "github.com/tendermint/tendermint/libs/common" + "github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/privval" "github.com/tendermint/tendermint/rpc/client" ctypes "github.com/tendermint/tendermint/rpc/core/types" @@ -26,7 +27,9 @@ import ( func getHTTPClient() *client.HTTP { rpcAddr := rpctest.GetConfig().RPC.ListenAddress - return client.NewHTTP(rpcAddr, "/websocket") + c := client.NewHTTP(rpcAddr, "/websocket") + c.SetLogger(log.TestingLogger()) + return c } func getLocalClient() *client.Local { diff --git a/rpc/core/events.go b/rpc/core/events.go index acb90b46..c8256356 100644 --- a/rpc/core/events.go +++ b/rpc/core/events.go @@ -180,7 +180,7 @@ func Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, er ctx.WSConn.TryWriteRPCResponse( rpctypes.NewRPCSuccessResponse( ctx.WSConn.Codec(), - rpctypes.JSONRPCStringID(fmt.Sprintf("%v#event", ctx.JSONReq.ID)), + ctx.JSONReq.ID, resultEvent, )) case <-sub.Cancelled(): @@ -192,8 +192,8 @@ func Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, er reason = sub.Err().Error() } ctx.WSConn.TryWriteRPCResponse( - rpctypes.RPCServerError(rpctypes.JSONRPCStringID( - fmt.Sprintf("%v#event", ctx.JSONReq.ID)), + rpctypes.RPCServerError( + ctx.JSONReq.ID, fmt.Errorf("subscription was cancelled (reason: %s)", reason), )) } diff --git a/rpc/lib/client/http_json_client.go b/rpc/lib/client/http_json_client.go index 4656a450..51d91469 100644 --- a/rpc/lib/client/http_json_client.go +++ b/rpc/lib/client/http_json_client.go @@ -215,7 +215,9 @@ func (b *JSONRPCRequestBatch) Send() ([]interface{}, error) { // Call enqueues a request to call the given RPC method with the specified // parameters, in the same way that the `JSONRPCClient.Call` function would. func (b *JSONRPCRequestBatch) Call(method string, params map[string]interface{}, result interface{}) (interface{}, error) { + b.mtx.Lock() id := b.client.nextRequestID() + b.mtx.Unlock() request, err := types.MapToRequest(b.client.cdc, id, method, params) if err != nil { diff --git a/rpc/lib/client/ws_client.go b/rpc/lib/client/ws_client.go index 7a9a9611..4592acae 100644 --- a/rpc/lib/client/ws_client.go +++ b/rpc/lib/client/ws_client.go @@ -153,7 +153,7 @@ func OnReconnect(cb func()) func(*WSClient) { // String returns WS client full address. func (c *WSClient) String() string { - return fmt.Sprintf("%s (%s)", c.Address, c.Endpoint) + return fmt.Sprintf("WSClient{%s (%s)}", c.Address, c.Endpoint) } // OnStart implements cmn.Service by dialing a server and creating read and @@ -484,7 +484,7 @@ func (c *WSClient) readRoutine() { continue } - c.Logger.Info("got response", "id", response.ID, "resp", response.Result) + c.Logger.Info("got response", "id", response.ID, "result", fmt.Sprintf("%X", response.Result)) // Combine a non-blocking read on BaseService.Quit with a non-blocking write on ResponsesCh to avoid blocking // c.wg.Wait() in c.Stop(). Note we rely on Quit being closed so that it sends unlimited Quit signals to stop // both readRoutine and writeRoutine diff --git a/rpc/lib/server/http_json_handler.go b/rpc/lib/server/http_json_handler.go index 6798bb8b..8fa0fbd2 100644 --- a/rpc/lib/server/http_json_handler.go +++ b/rpc/lib/server/http_json_handler.go @@ -57,7 +57,7 @@ func makeJSONRPCHandler(funcMap map[string]*RPCFunc, cdc *amino.Codec, logger lo // A Notification is a Request object without an "id" member. // The Server MUST NOT reply to a Notification, including those that are within a batch request. if request.ID == nil { - logger.Debug("HTTPJSONRPC received a notification, skipping... (please send a non-empty ID if you want to call a method)") + logger.Debug("HTTPJSONRPC received a notification, skipping... (please send a non-empty ID if you want to call a method)", "req", request) continue } if len(r.URL.Path) > 1 { diff --git a/rpc/lib/server/http_server.go b/rpc/lib/server/http_server.go index c97739bd..ead3dd37 100644 --- a/rpc/lib/server/http_server.go +++ b/rpc/lib/server/http_server.go @@ -160,7 +160,7 @@ func RecoverAndLogHandler(handler http.Handler, logger log.Logger) http.Handler "Panic in RPC HTTP handler", "err", e, "stack", string(debug.Stack()), ) - WriteRPCResponseHTTPError(rww, http.StatusInternalServerError, types.RPCInternalError(types.JSONRPCStringID(""), e.(error))) + WriteRPCResponseHTTPError(rww, http.StatusInternalServerError, types.RPCInternalError(types.JSONRPCIntID(0), e.(error))) } } diff --git a/rpc/lib/server/ws_handler.go b/rpc/lib/server/ws_handler.go index 48a489fd..c4a44c8f 100644 --- a/rpc/lib/server/ws_handler.go +++ b/rpc/lib/server/ws_handler.go @@ -276,6 +276,8 @@ func (wsc *wsConnection) Context() context.Context { // Read from the socket and subscribe to or unsubscribe from events func (wsc *wsConnection) readRoutine() { + var request types.RPCRequest + defer func() { if r := recover(); r != nil { err, ok := r.(error) @@ -283,7 +285,7 @@ func (wsc *wsConnection) readRoutine() { err = fmt.Errorf("WSJSONRPC: %v", r) } wsc.Logger.Error("Panic in WSJSONRPC handler", "err", err, "stack", string(debug.Stack())) - wsc.WriteRPCResponse(types.RPCInternalError(types.JSONRPCIntID(0), err)) + wsc.WriteRPCResponse(types.RPCInternalError(request.ID, err)) go wsc.readRoutine() } else { wsc.baseConn.Close() // nolint: errcheck @@ -315,7 +317,6 @@ func (wsc *wsConnection) readRoutine() { return } - var request types.RPCRequest err = json.Unmarshal(in, &request) if err != nil { wsc.WriteRPCResponse(types.RPCParseError(errors.Wrap(err, "error unmarshalling request"))) @@ -324,8 +325,8 @@ func (wsc *wsConnection) readRoutine() { // A Notification is a Request object without an "id" member. // The Server MUST NOT reply to a Notification, including those that are within a batch request. - if request.ID == types.JSONRPCIntID(0) { - wsc.Logger.Debug("WSJSONRPC received a notification, skipping... (please send a non-empty ID if you want to call a method)") + if request.ID == nil { + wsc.Logger.Debug("WSJSONRPC received a notification, skipping... (please send a non-empty ID if you want to call a method)", "req", request) continue } @@ -402,7 +403,7 @@ func (wsc *wsConnection) writeRoutine() { if err != nil { wsc.Logger.Error("Failed to marshal RPCResponse to JSON", "err", err) } else if err = wsc.writeMessageWithDeadline(websocket.TextMessage, jsonBytes); err != nil { - wsc.Logger.Error("Failed to write response", "err", err) + wsc.Logger.Error("Failed to write response", "msg", msg, "err", err) wsc.Stop() return } diff --git a/rpc/lib/server/ws_handler_test.go b/rpc/lib/server/ws_handler_test.go index 375ea415..dc648cc0 100644 --- a/rpc/lib/server/ws_handler_test.go +++ b/rpc/lib/server/ws_handler_test.go @@ -22,6 +22,7 @@ func TestWebsocketManagerHandler(t *testing.T) { d := websocket.Dialer{} c, dialResp, err := d.Dial("ws://"+s.Listener.Addr().String()+"/websocket", nil) require.NoError(t, err) + defer dialResp.Body.Close() // nolint: errcheck if got, want := dialResp.StatusCode, http.StatusSwitchingProtocols; got != want { t.Errorf("dialResp.StatusCode = %q, want %q", got, want) diff --git a/rpc/lib/types/types.go b/rpc/lib/types/types.go index d8faa161..a015cf81 100644 --- a/rpc/lib/types/types.go +++ b/rpc/lib/types/types.go @@ -61,7 +61,7 @@ type RPCRequest struct { func (request *RPCRequest) UnmarshalJSON(data []byte) error { unsafeReq := &struct { JSONRPC string `json:"jsonrpc"` - ID interface{} `json:"id"` + ID interface{} `json:"id,omitempty"` Method string `json:"method"` Params json.RawMessage `json:"params"` // must be map[string]interface{} or []interface{} }{} @@ -93,7 +93,7 @@ func NewRPCRequest(id jsonrpcid, method string, params json.RawMessage) RPCReque } func (req RPCRequest) String() string { - return fmt.Sprintf("[%s %s]", req.ID, req.Method) + return fmt.Sprintf("RPCRequest{%s %s/%X}", req.ID, req.Method, req.Params) } func MapToRequest(cdc *amino.Codec, id jsonrpcid, method string, params map[string]interface{}) (RPCRequest, error) { @@ -205,9 +205,9 @@ func NewRPCErrorResponse(id jsonrpcid, code int, msg string, data string) RPCRes func (resp RPCResponse) String() string { if resp.Error == nil { - return fmt.Sprintf("[%s %v]", resp.ID, resp.Result) + return fmt.Sprintf("RPCResponse{%s %v}", resp.ID, resp.Result) } - return fmt.Sprintf("[%s %s]", resp.ID, resp.Error) + return fmt.Sprintf("RPCResponse{%s %v}", resp.ID, resp.Error) } // From the JSON-RPC 2.0 spec: