remove ID from responses from subscribe

Fixes #2949
This commit is contained in:
Anton Kaliaev 2019-08-01 16:00:12 +04:00
parent 9c79fad3da
commit 34fd20009b
No known key found for this signature in database
GPG Key ID: 7B6881D965918214
12 changed files with 31 additions and 20 deletions

View File

@ -40,7 +40,6 @@ Response:
```
{
"jsonrpc": "2.0",
"id": "0#event",
"result": {
"query": "tm.event='ValidatorSetUpdates'",
"data": {

View File

@ -2,7 +2,6 @@ package proxy
import (
"context"
"fmt"
cmn "github.com/tendermint/tendermint/libs/common"
@ -170,7 +169,7 @@ func (w Wrapper) SubscribeWS(ctx *rpctypes.Context, query string) (*ctypes.Resul
ctx.WSConn.TryWriteRPCResponse(
rpctypes.NewRPCSuccessResponse(
ctx.WSConn.Codec(),
rpctypes.JSONRPCStringID(fmt.Sprintf("%v#event", ctx.JSONReq.ID)),
ctx.JSONReq.ID,
resultEvent,
))
case <-w.Client.Quit():

View File

@ -11,6 +11,7 @@ import (
amino "github.com/tendermint/go-amino"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/log"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
rpcclient "github.com/tendermint/tendermint/rpc/lib/client"
@ -100,6 +101,10 @@ func NewHTTP(remote, wsEndpoint string) *HTTP {
var _ Client = (*HTTP)(nil)
func (c *HTTP) SetLogger(l log.Logger) {
c.WSEvents.SetLogger(l)
}
// NewBatch creates a new batch client for this HTTP client.
func (c *HTTP) NewBatch() *BatchHTTP {
rpcBatch := c.rpc.NewRequestBatch()
@ -376,6 +381,7 @@ func (w *WSEvents) OnStart() error {
w.redoSubscriptionsAfter(0 * time.Second)
}))
w.ws.SetCodec(w.cdc)
w.ws.SetLogger(w.Logger)
err := w.ws.Start()
if err != nil {

View File

@ -17,6 +17,7 @@ import (
"github.com/tendermint/tendermint/crypto/ed25519"
"github.com/tendermint/tendermint/crypto/tmhash"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/privval"
"github.com/tendermint/tendermint/rpc/client"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
@ -26,7 +27,9 @@ import (
func getHTTPClient() *client.HTTP {
rpcAddr := rpctest.GetConfig().RPC.ListenAddress
return client.NewHTTP(rpcAddr, "/websocket")
c := client.NewHTTP(rpcAddr, "/websocket")
c.SetLogger(log.TestingLogger())
return c
}
func getLocalClient() *client.Local {

View File

@ -180,7 +180,7 @@ func Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, er
ctx.WSConn.TryWriteRPCResponse(
rpctypes.NewRPCSuccessResponse(
ctx.WSConn.Codec(),
rpctypes.JSONRPCStringID(fmt.Sprintf("%v#event", ctx.JSONReq.ID)),
ctx.JSONReq.ID,
resultEvent,
))
case <-sub.Cancelled():
@ -192,8 +192,8 @@ func Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, er
reason = sub.Err().Error()
}
ctx.WSConn.TryWriteRPCResponse(
rpctypes.RPCServerError(rpctypes.JSONRPCStringID(
fmt.Sprintf("%v#event", ctx.JSONReq.ID)),
rpctypes.RPCServerError(
ctx.JSONReq.ID,
fmt.Errorf("subscription was cancelled (reason: %s)", reason),
))
}

View File

@ -215,7 +215,9 @@ func (b *JSONRPCRequestBatch) Send() ([]interface{}, error) {
// Call enqueues a request to call the given RPC method with the specified
// parameters, in the same way that the `JSONRPCClient.Call` function would.
func (b *JSONRPCRequestBatch) Call(method string, params map[string]interface{}, result interface{}) (interface{}, error) {
b.mtx.Lock()
id := b.client.nextRequestID()
b.mtx.Unlock()
request, err := types.MapToRequest(b.client.cdc, id, method, params)
if err != nil {

View File

@ -153,7 +153,7 @@ func OnReconnect(cb func()) func(*WSClient) {
// String returns WS client full address.
func (c *WSClient) String() string {
return fmt.Sprintf("%s (%s)", c.Address, c.Endpoint)
return fmt.Sprintf("WSClient{%s (%s)}", c.Address, c.Endpoint)
}
// OnStart implements cmn.Service by dialing a server and creating read and
@ -484,7 +484,7 @@ func (c *WSClient) readRoutine() {
continue
}
c.Logger.Info("got response", "id", response.ID, "resp", response.Result)
c.Logger.Info("got response", "id", response.ID, "result", fmt.Sprintf("%X", response.Result))
// Combine a non-blocking read on BaseService.Quit with a non-blocking write on ResponsesCh to avoid blocking
// c.wg.Wait() in c.Stop(). Note we rely on Quit being closed so that it sends unlimited Quit signals to stop
// both readRoutine and writeRoutine

View File

@ -57,7 +57,7 @@ func makeJSONRPCHandler(funcMap map[string]*RPCFunc, cdc *amino.Codec, logger lo
// A Notification is a Request object without an "id" member.
// The Server MUST NOT reply to a Notification, including those that are within a batch request.
if request.ID == nil {
logger.Debug("HTTPJSONRPC received a notification, skipping... (please send a non-empty ID if you want to call a method)")
logger.Debug("HTTPJSONRPC received a notification, skipping... (please send a non-empty ID if you want to call a method)", "req", request)
continue
}
if len(r.URL.Path) > 1 {

View File

@ -160,7 +160,7 @@ func RecoverAndLogHandler(handler http.Handler, logger log.Logger) http.Handler
"Panic in RPC HTTP handler", "err", e, "stack",
string(debug.Stack()),
)
WriteRPCResponseHTTPError(rww, http.StatusInternalServerError, types.RPCInternalError(types.JSONRPCStringID(""), e.(error)))
WriteRPCResponseHTTPError(rww, http.StatusInternalServerError, types.RPCInternalError(types.JSONRPCIntID(0), e.(error)))
}
}

View File

@ -276,6 +276,8 @@ func (wsc *wsConnection) Context() context.Context {
// Read from the socket and subscribe to or unsubscribe from events
func (wsc *wsConnection) readRoutine() {
var request types.RPCRequest
defer func() {
if r := recover(); r != nil {
err, ok := r.(error)
@ -283,7 +285,7 @@ func (wsc *wsConnection) readRoutine() {
err = fmt.Errorf("WSJSONRPC: %v", r)
}
wsc.Logger.Error("Panic in WSJSONRPC handler", "err", err, "stack", string(debug.Stack()))
wsc.WriteRPCResponse(types.RPCInternalError(types.JSONRPCIntID(0), err))
wsc.WriteRPCResponse(types.RPCInternalError(request.ID, err))
go wsc.readRoutine()
} else {
wsc.baseConn.Close() // nolint: errcheck
@ -315,7 +317,6 @@ func (wsc *wsConnection) readRoutine() {
return
}
var request types.RPCRequest
err = json.Unmarshal(in, &request)
if err != nil {
wsc.WriteRPCResponse(types.RPCParseError(errors.Wrap(err, "error unmarshalling request")))
@ -324,8 +325,8 @@ func (wsc *wsConnection) readRoutine() {
// A Notification is a Request object without an "id" member.
// The Server MUST NOT reply to a Notification, including those that are within a batch request.
if request.ID == types.JSONRPCIntID(0) {
wsc.Logger.Debug("WSJSONRPC received a notification, skipping... (please send a non-empty ID if you want to call a method)")
if request.ID == nil {
wsc.Logger.Debug("WSJSONRPC received a notification, skipping... (please send a non-empty ID if you want to call a method)", "req", request)
continue
}
@ -402,7 +403,7 @@ func (wsc *wsConnection) writeRoutine() {
if err != nil {
wsc.Logger.Error("Failed to marshal RPCResponse to JSON", "err", err)
} else if err = wsc.writeMessageWithDeadline(websocket.TextMessage, jsonBytes); err != nil {
wsc.Logger.Error("Failed to write response", "err", err)
wsc.Logger.Error("Failed to write response", "msg", msg, "err", err)
wsc.Stop()
return
}

View File

@ -22,6 +22,7 @@ func TestWebsocketManagerHandler(t *testing.T) {
d := websocket.Dialer{}
c, dialResp, err := d.Dial("ws://"+s.Listener.Addr().String()+"/websocket", nil)
require.NoError(t, err)
defer dialResp.Body.Close() // nolint: errcheck
if got, want := dialResp.StatusCode, http.StatusSwitchingProtocols; got != want {
t.Errorf("dialResp.StatusCode = %q, want %q", got, want)

View File

@ -61,7 +61,7 @@ type RPCRequest struct {
func (request *RPCRequest) UnmarshalJSON(data []byte) error {
unsafeReq := &struct {
JSONRPC string `json:"jsonrpc"`
ID interface{} `json:"id"`
ID interface{} `json:"id,omitempty"`
Method string `json:"method"`
Params json.RawMessage `json:"params"` // must be map[string]interface{} or []interface{}
}{}
@ -93,7 +93,7 @@ func NewRPCRequest(id jsonrpcid, method string, params json.RawMessage) RPCReque
}
func (req RPCRequest) String() string {
return fmt.Sprintf("[%s %s]", req.ID, req.Method)
return fmt.Sprintf("RPCRequest{%s %s/%X}", req.ID, req.Method, req.Params)
}
func MapToRequest(cdc *amino.Codec, id jsonrpcid, method string, params map[string]interface{}) (RPCRequest, error) {
@ -205,9 +205,9 @@ func NewRPCErrorResponse(id jsonrpcid, code int, msg string, data string) RPCRes
func (resp RPCResponse) String() string {
if resp.Error == nil {
return fmt.Sprintf("[%s %v]", resp.ID, resp.Result)
return fmt.Sprintf("RPCResponse{%s %v}", resp.ID, resp.Result)
}
return fmt.Sprintf("[%s %s]", resp.ID, resp.Error)
return fmt.Sprintf("RPCResponse{%s %v}", resp.ID, resp.Error)
}
// From the JSON-RPC 2.0 spec: