From d2eac80b598896e4df829bc6c87781778e516886 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Fri, 1 Mar 2019 18:51:35 +0400 Subject: [PATCH] save commit --- rpc/core/abci.go | 5 ++- rpc/core/blocks.go | 9 +++-- rpc/core/consensus.go | 9 +++-- rpc/core/dev.go | 13 +++++-- rpc/core/events.go | 24 ++++++------ rpc/core/health.go | 3 +- rpc/core/mempool.go | 14 +++---- rpc/core/net.go | 9 +++-- rpc/core/status.go | 3 +- rpc/core/tx.go | 5 ++- rpc/lib/server/handlers.go | 67 ++++++++++++++++----------------- rpc/lib/server/handlers_test.go | 4 +- rpc/lib/server/parse_test.go | 15 +++----- rpc/lib/types/types.go | 34 ++++++++++++++--- 14 files changed, 122 insertions(+), 92 deletions(-) diff --git a/rpc/core/abci.go b/rpc/core/abci.go index aa6089b6..ce15ac14 100644 --- a/rpc/core/abci.go +++ b/rpc/core/abci.go @@ -5,6 +5,7 @@ import ( cmn "github.com/tendermint/tendermint/libs/common" "github.com/tendermint/tendermint/proxy" ctypes "github.com/tendermint/tendermint/rpc/core/types" + rpctypes "github.com/tendermint/tendermint/rpc/lib/types" ) // Query the application for some information. @@ -52,7 +53,7 @@ import ( // | data | []byte | false | true | Data | // | height | int64 | 0 | false | Height (0 means latest) | // | prove | bool | false | false | Includes proof if true | -func ABCIQuery(path string, data cmn.HexBytes, height int64, prove bool) (*ctypes.ResultABCIQuery, error) { +func ABCIQuery(ctx *rpctypes.Context, path string, data cmn.HexBytes, height int64, prove bool) (*ctypes.ResultABCIQuery, error) { resQuery, err := proxyAppQuery.QuerySync(abci.RequestQuery{ Path: path, Data: data, @@ -96,7 +97,7 @@ func ABCIQuery(path string, data cmn.HexBytes, height int64, prove bool) (*ctype // "jsonrpc": "2.0" // } // ``` -func ABCIInfo() (*ctypes.ResultABCIInfo, error) { +func ABCIInfo(ctx *rpctypes.Context) (*ctypes.ResultABCIInfo, error) { resInfo, err := proxyAppQuery.InfoSync(proxy.RequestInfo) if err != nil { return nil, err diff --git a/rpc/core/blocks.go b/rpc/core/blocks.go index 906aea7b..40b6811d 100644 --- a/rpc/core/blocks.go +++ b/rpc/core/blocks.go @@ -5,6 +5,7 @@ import ( cmn "github.com/tendermint/tendermint/libs/common" ctypes "github.com/tendermint/tendermint/rpc/core/types" + rpctypes "github.com/tendermint/tendermint/rpc/lib/types" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" ) @@ -68,7 +69,7 @@ import ( // ``` // // -func BlockchainInfo(minHeight, maxHeight int64) (*ctypes.ResultBlockchainInfo, error) { +func BlockchainInfo(ctx *rpctypes.Context, minHeight, maxHeight int64) (*ctypes.ResultBlockchainInfo, error) { // maximum 20 block metas const limit int64 = 20 @@ -226,7 +227,7 @@ func filterMinMax(height, min, max, limit int64) (int64, int64, error) { // "jsonrpc": "2.0" // } // ``` -func Block(heightPtr *int64) (*ctypes.ResultBlock, error) { +func Block(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultBlock, error) { storeHeight := blockStore.Height() height, err := getHeight(storeHeight, heightPtr) if err != nil { @@ -313,7 +314,7 @@ func Block(heightPtr *int64) (*ctypes.ResultBlock, error) { // "jsonrpc": "2.0" // } // ``` -func Commit(heightPtr *int64) (*ctypes.ResultCommit, error) { +func Commit(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultCommit, error) { storeHeight := blockStore.Height() height, err := getHeight(storeHeight, heightPtr) if err != nil { @@ -372,7 +373,7 @@ func Commit(heightPtr *int64) (*ctypes.ResultCommit, error) { // ] // } // ``` -func BlockResults(heightPtr *int64) (*ctypes.ResultBlockResults, error) { +func BlockResults(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultBlockResults, error) { storeHeight := blockStore.Height() height, err := getHeight(storeHeight, heightPtr) if err != nil { diff --git a/rpc/core/consensus.go b/rpc/core/consensus.go index 81694b7e..2fff86dc 100644 --- a/rpc/core/consensus.go +++ b/rpc/core/consensus.go @@ -3,6 +3,7 @@ package core import ( cm "github.com/tendermint/tendermint/consensus" ctypes "github.com/tendermint/tendermint/rpc/core/types" + rpctypes "github.com/tendermint/tendermint/rpc/lib/types" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" ) @@ -47,7 +48,7 @@ import ( // "jsonrpc": "2.0" // } // ``` -func Validators(heightPtr *int64) (*ctypes.ResultValidators, error) { +func Validators(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultValidators, error) { // The latest validator that we know is the // NextValidator of the last block. height := consensusState.GetState().LastBlockHeight + 1 @@ -200,7 +201,7 @@ func Validators(heightPtr *int64) (*ctypes.ResultValidators, error) { // } // } // ``` -func DumpConsensusState() (*ctypes.ResultDumpConsensusState, error) { +func DumpConsensusState(ctx *rpctypes.Context) (*ctypes.ResultDumpConsensusState, error) { // Get Peer consensus states. peers := p2pPeers.Peers().List() peerStates := make([]ctypes.PeerStateInfo, len(peers)) @@ -277,7 +278,7 @@ func DumpConsensusState() (*ctypes.ResultDumpConsensusState, error) { // } //} //``` -func ConsensusState() (*ctypes.ResultConsensusState, error) { +func ConsensusState(ctx *rpctypes.Context, ) (*ctypes.ResultConsensusState, error) { // Get self round state. bz, err := consensusState.GetRoundStateSimpleJSON() return &ctypes.ResultConsensusState{RoundState: bz}, err @@ -320,7 +321,7 @@ func ConsensusState() (*ctypes.ResultConsensusState, error) { // } // } // ``` -func ConsensusParams(heightPtr *int64) (*ctypes.ResultConsensusParams, error) { +func ConsensusParams(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultConsensusParams, error) { height := consensusState.GetState().LastBlockHeight + 1 height, err := getHeight(height, heightPtr) if err != nil { diff --git a/rpc/core/dev.go b/rpc/core/dev.go index 0b515476..71f284f8 100644 --- a/rpc/core/dev.go +++ b/rpc/core/dev.go @@ -5,16 +5,19 @@ import ( "runtime/pprof" ctypes "github.com/tendermint/tendermint/rpc/core/types" + rpctypes "github.com/tendermint/tendermint/rpc/lib/types" ) -func UnsafeFlushMempool() (*ctypes.ResultUnsafeFlushMempool, error) { +// UnsafeFlushMempool removes all transactions from the mempool. +func UnsafeFlushMempool(ctx *rpctypes.Context) (*ctypes.ResultUnsafeFlushMempool, error) { mempool.Flush() return &ctypes.ResultUnsafeFlushMempool{}, nil } var profFile *os.File -func UnsafeStartCPUProfiler(filename string) (*ctypes.ResultUnsafeProfile, error) { +// UnsafeStartCPUProfiler starts a pprof profiler using the given filename. +func UnsafeStartCPUProfiler(ctx *rpctypes.Context, filename string) (*ctypes.ResultUnsafeProfile, error) { var err error profFile, err = os.Create(filename) if err != nil { @@ -27,7 +30,8 @@ func UnsafeStartCPUProfiler(filename string) (*ctypes.ResultUnsafeProfile, error return &ctypes.ResultUnsafeProfile{}, nil } -func UnsafeStopCPUProfiler() (*ctypes.ResultUnsafeProfile, error) { +// UnsafeStopCPUProfiler stops the running pprof profiler. +func UnsafeStopCPUProfiler(ctx *rpctypes.Context) (*ctypes.ResultUnsafeProfile, error) { pprof.StopCPUProfile() if err := profFile.Close(); err != nil { return nil, err @@ -35,7 +39,8 @@ func UnsafeStopCPUProfiler() (*ctypes.ResultUnsafeProfile, error) { return &ctypes.ResultUnsafeProfile{}, nil } -func UnsafeWriteHeapProfile(filename string) (*ctypes.ResultUnsafeProfile, error) { +// UnsafeWriteHeapProfile dumps a heap profile to the given filename. +func UnsafeWriteHeapProfile(ctx *rpctypes.Context, filename string) (*ctypes.ResultUnsafeProfile, error) { memProfFile, err := os.Create(filename) if err != nil { return nil, err diff --git a/rpc/core/events.go b/rpc/core/events.go index 5188b871..10f50a73 100644 --- a/rpc/core/events.go +++ b/rpc/core/events.go @@ -90,8 +90,8 @@ import ( // | query | string | "" | true | Query | // // -func Subscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscribe, error) { - addr := wsCtx.GetRemoteAddr() +func Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, error) { + addr := ctx.RemoteAddr() if eventBus.NumClients() > config.MaxSubscriptionClients { return nil, fmt.Errorf("max_subscription_clients %d reached", config.MaxSubscriptionClients) @@ -118,19 +118,19 @@ func Subscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscri select { case msg := <-sub.Out(): resultEvent := &ctypes.ResultEvent{Query: query, Data: msg.Data(), Tags: msg.Tags()} - wsCtx.TryWriteRPCResponse( + ctx.WSConn.TryWriteRPCResponse( rpctypes.NewRPCSuccessResponse( - wsCtx.Codec(), - rpctypes.JSONRPCStringID(fmt.Sprintf("%v#event", wsCtx.Request.ID)), + ctx.WSConn.Codec(), + rpctypes.JSONRPCStringID(fmt.Sprintf("%v#event", ctx.JSONReq.ID)), resultEvent, )) case <-sub.Cancelled(): if sub.Err() != tmpubsub.ErrUnsubscribed { // should not happen - wsCtx.TryWriteRPCResponse( + ctx.WSConn.TryWriteRPCResponse( rpctypes.RPCServerError(rpctypes.JSONRPCStringID( - fmt.Sprintf("%v#event", wsCtx.Request.ID)), - fmt.Errorf("subscription was cancelled (reason: %v).", sub.Err()), + fmt.Sprintf("%v#event", ctx.JSONReq.ID)), + fmt.Errorf("subscription was cancelled (reason: %v)", sub.Err()), )) } return @@ -171,8 +171,8 @@ func Subscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscri // | query | string | "" | true | Query | // // -func Unsubscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultUnsubscribe, error) { - addr := wsCtx.GetRemoteAddr() +func Unsubscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultUnsubscribe, error) { + addr := ctx.RemoteAddr() logger.Info("Unsubscribe from query", "remote", addr, "query", query) q, err := tmquery.New(query) if err != nil { @@ -209,8 +209,8 @@ func Unsubscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultUnsub // ``` // // -func UnsubscribeAll(wsCtx rpctypes.WSRPCContext) (*ctypes.ResultUnsubscribe, error) { - addr := wsCtx.GetRemoteAddr() +func UnsubscribeAll(ctx *rpctypes.Context) (*ctypes.ResultUnsubscribe, error) { + addr := ctx.RemoteAddr() logger.Info("Unsubscribe from all", "remote", addr) err := eventBus.UnsubscribeAll(context.Background(), addr) if err != nil { diff --git a/rpc/core/health.go b/rpc/core/health.go index eeb8686b..41186a04 100644 --- a/rpc/core/health.go +++ b/rpc/core/health.go @@ -2,6 +2,7 @@ package core import ( ctypes "github.com/tendermint/tendermint/rpc/core/types" + rpctypes "github.com/tendermint/tendermint/rpc/lib/types" ) // Get node health. Returns empty result (200 OK) on success, no response - in @@ -31,6 +32,6 @@ import ( // "jsonrpc": "2.0" // } // ``` -func Health() (*ctypes.ResultHealth, error) { +func Health(ctx *rpctypes.Context) (*ctypes.ResultHealth, error) { return &ctypes.ResultHealth{}, nil } diff --git a/rpc/core/mempool.go b/rpc/core/mempool.go index 48855b01..d1994697 100644 --- a/rpc/core/mempool.go +++ b/rpc/core/mempool.go @@ -9,6 +9,7 @@ import ( abci "github.com/tendermint/tendermint/abci/types" ctypes "github.com/tendermint/tendermint/rpc/core/types" + rpctypes "github.com/tendermint/tendermint/rpc/lib/types" "github.com/tendermint/tendermint/types" ) @@ -52,7 +53,7 @@ import ( // | Parameter | Type | Default | Required | Description | // |-----------+------+---------+----------+-----------------| // | tx | Tx | nil | true | The transaction | -func BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { +func BroadcastTxAsync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) { err := mempool.CheckTx(tx, nil) if err != nil { return nil, err @@ -97,7 +98,7 @@ func BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { // | Parameter | Type | Default | Required | Description | // |-----------+------+---------+----------+-----------------| // | tx | Tx | nil | true | The transaction | -func BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { +func BroadcastTxSync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) { resCh := make(chan *abci.Response, 1) err := mempool.CheckTx(tx, func(res *abci.Response) { resCh <- res @@ -164,9 +165,8 @@ func BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { // | Parameter | Type | Default | Required | Description | // |-----------+------+---------+----------+-----------------| // | tx | Tx | nil | true | The transaction | -func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { - // XXX: should be the remote IP address of the caller - subscriber := "mempool" +func BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { + subscriber := ctx.RemoteAddr() if eventBus.NumClients() > config.MaxSubscriptionClients { return nil, fmt.Errorf("max_subscription_clients %d reached", config.MaxSubscriptionClients) @@ -269,7 +269,7 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { // |-----------+------+---------+----------+--------------------------------------| // | limit | int | 30 | false | Maximum number of entries (max: 100) | // ``` -func UnconfirmedTxs(limit int) (*ctypes.ResultUnconfirmedTxs, error) { +func UnconfirmedTxs(ctx *rpctypes.Context, limit int) (*ctypes.ResultUnconfirmedTxs, error) { // reuse per_page validator limit = validatePerPage(limit) @@ -306,6 +306,6 @@ func UnconfirmedTxs(limit int) (*ctypes.ResultUnconfirmedTxs, error) { // "jsonrpc": "2.0" // } // ``` -func NumUnconfirmedTxs() (*ctypes.ResultUnconfirmedTxs, error) { +func NumUnconfirmedTxs(ctx *rpctypes.Context) (*ctypes.ResultUnconfirmedTxs, error) { return &ctypes.ResultUnconfirmedTxs{N: mempool.Size()}, nil } diff --git a/rpc/core/net.go b/rpc/core/net.go index 56e9624d..5f2c9e21 100644 --- a/rpc/core/net.go +++ b/rpc/core/net.go @@ -7,6 +7,7 @@ import ( "github.com/tendermint/tendermint/p2p" ctypes "github.com/tendermint/tendermint/rpc/core/types" + rpctypes "github.com/tendermint/tendermint/rpc/lib/types" ) // Get network info. @@ -42,7 +43,7 @@ import ( // "jsonrpc": "2.0" // } // ``` -func NetInfo() (*ctypes.ResultNetInfo, error) { +func NetInfo(ctx *rpctypes.Context) (*ctypes.ResultNetInfo, error) { peers := []ctypes.Peer{} for _, peer := range p2pPeers.Peers().List() { nodeInfo, ok := peer.NodeInfo().(p2p.DefaultNodeInfo) @@ -67,7 +68,7 @@ func NetInfo() (*ctypes.ResultNetInfo, error) { }, nil } -func UnsafeDialSeeds(seeds []string) (*ctypes.ResultDialSeeds, error) { +func UnsafeDialSeeds(ctx *rpctypes.Context, seeds []string) (*ctypes.ResultDialSeeds, error) { if len(seeds) == 0 { return &ctypes.ResultDialSeeds{}, errors.New("No seeds provided") } @@ -80,7 +81,7 @@ func UnsafeDialSeeds(seeds []string) (*ctypes.ResultDialSeeds, error) { return &ctypes.ResultDialSeeds{Log: "Dialing seeds in progress. See /net_info for details"}, nil } -func UnsafeDialPeers(peers []string, persistent bool) (*ctypes.ResultDialPeers, error) { +func UnsafeDialPeers(ctx *rpctypes.Context, peers []string, persistent bool) (*ctypes.ResultDialPeers, error) { if len(peers) == 0 { return &ctypes.ResultDialPeers{}, errors.New("No peers provided") } @@ -135,6 +136,6 @@ func UnsafeDialPeers(peers []string, persistent bool) (*ctypes.ResultDialPeers, // "jsonrpc": "2.0" // } // ``` -func Genesis() (*ctypes.ResultGenesis, error) { +func Genesis(ctx *rpctypes.Context) (*ctypes.ResultGenesis, error) { return &ctypes.ResultGenesis{Genesis: genDoc}, nil } diff --git a/rpc/core/status.go b/rpc/core/status.go index 224857d0..f200d5f7 100644 --- a/rpc/core/status.go +++ b/rpc/core/status.go @@ -7,6 +7,7 @@ import ( cmn "github.com/tendermint/tendermint/libs/common" "github.com/tendermint/tendermint/p2p" ctypes "github.com/tendermint/tendermint/rpc/core/types" + rpctypes "github.com/tendermint/tendermint/rpc/lib/types" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" ) @@ -70,7 +71,7 @@ import ( // } // } // ``` -func Status() (*ctypes.ResultStatus, error) { +func Status(ctx *rpctypes.Context) (*ctypes.ResultStatus, error) { var latestHeight int64 = -1 if consensusReactor.FastSync() { latestHeight = blockStore.Height() diff --git a/rpc/core/tx.go b/rpc/core/tx.go index f1bfd56a..57715161 100644 --- a/rpc/core/tx.go +++ b/rpc/core/tx.go @@ -7,6 +7,7 @@ import ( tmquery "github.com/tendermint/tendermint/libs/pubsub/query" ctypes "github.com/tendermint/tendermint/rpc/core/types" + rpctypes "github.com/tendermint/tendermint/rpc/lib/types" "github.com/tendermint/tendermint/state/txindex/null" "github.com/tendermint/tendermint/types" ) @@ -76,7 +77,7 @@ import ( // - `index`: `int` - index of the transaction // - `height`: `int` - height of the block where this transaction was in // - `hash`: `[]byte` - hash of the transaction -func Tx(hash []byte, prove bool) (*ctypes.ResultTx, error) { +func Tx(ctx *rpctypes.Context, hash []byte, prove bool) (*ctypes.ResultTx, error) { // if index is disabled, return error if _, ok := txIndexer.(*null.TxIndex); ok { @@ -182,7 +183,7 @@ func Tx(hash []byte, prove bool) (*ctypes.ResultTx, error) { // - `index`: `int` - index of the transaction // - `height`: `int` - height of the block where this transaction was in // - `hash`: `[]byte` - hash of the transaction -func TxSearch(query string, prove bool, page, perPage int) (*ctypes.ResultTxSearch, error) { +func TxSearch(ctx *rpctypes.Context, query string, prove bool, page, perPage int) (*ctypes.ResultTxSearch, error) { // if index is disabled, return error if _, ok := txIndexer.(*null.TxIndex); ok { return nil, fmt.Errorf("Transaction indexing is disabled") diff --git a/rpc/lib/server/handlers.go b/rpc/lib/server/handlers.go index d3967727..cc9ee130 100644 --- a/rpc/lib/server/handlers.go +++ b/rpc/lib/server/handlers.go @@ -128,20 +128,26 @@ func makeJSONRPCHandler(funcMap map[string]*RPCFunc, cdc *amino.Codec, logger lo WriteRPCResponseHTTP(w, types.RPCInvalidRequestError(request.ID, errors.Errorf("Path %s is invalid", r.URL.Path))) return } + rpcFunc := funcMap[request.Method] if rpcFunc == nil || rpcFunc.ws { WriteRPCResponseHTTP(w, types.RPCMethodNotFoundError(request.ID)) return } - var args []reflect.Value + + ctx := &types.Context{JSONReq: &request, HTTPReq: r} + args := []reflect.Value{reflect.ValueOf(ctx)} if len(request.Params) > 0 { - args, err = jsonParamsToArgsRPC(rpcFunc, cdc, request.Params) + fnArgs, err := jsonParamsToArgs(rpcFunc, cdc, request.Params) if err != nil { WriteRPCResponseHTTP(w, types.RPCInvalidParamsError(request.ID, errors.Wrap(err, "Error converting json params to arguments"))) return } + args = append(args, fnArgs...) } + returns := rpcFunc.f.Call(args) + logger.Info("HTTPJSONRPC", "method", request.Method, "args", args, "returns", returns) result, err := unreflectResult(returns) if err != nil { @@ -204,13 +210,14 @@ func arrayParamsToArgs(rpcFunc *RPCFunc, cdc *amino.Codec, params []json.RawMess return values, nil } -// `raw` is unparsed json (from json.RawMessage) encoding either a map or an array. -// `argsOffset` should be 0 for RPC calls, and 1 for WS requests, where len(rpcFunc.args) != len(rpcFunc.argNames). +// raw is unparsed json (from json.RawMessage) encoding either a map or an +// array. // // Example: -// rpcFunc.args = [rpctypes.WSRPCContext string] +// rpcFunc.args = [rpctypes.Context string] // rpcFunc.argNames = ["arg"] -func jsonParamsToArgs(rpcFunc *RPCFunc, cdc *amino.Codec, raw []byte, argsOffset int) ([]reflect.Value, error) { +func jsonParamsToArgs(rpcFunc *RPCFunc, cdc *amino.Codec, raw []byte) ([]reflect.Value, error) { + const argsOffset = 1 // TODO: Make more efficient, perhaps by checking the first character for '{' or '['? // First, try to get the map. @@ -231,20 +238,6 @@ func jsonParamsToArgs(rpcFunc *RPCFunc, cdc *amino.Codec, raw []byte, argsOffset return nil, errors.Errorf("Unknown type for JSON params: %v. Expected map or array", err) } -// Convert a []interface{} OR a map[string]interface{} to properly typed values -func jsonParamsToArgsRPC(rpcFunc *RPCFunc, cdc *amino.Codec, params json.RawMessage) ([]reflect.Value, error) { - return jsonParamsToArgs(rpcFunc, cdc, params, 0) -} - -// Same as above, but with the first param the websocket connection -func jsonParamsToArgsWS(rpcFunc *RPCFunc, cdc *amino.Codec, params json.RawMessage, wsCtx types.WSRPCContext) ([]reflect.Value, error) { - values, err := jsonParamsToArgs(rpcFunc, cdc, params, 1) - if err != nil { - return nil, err - } - return append([]reflect.Value{reflect.ValueOf(wsCtx)}, values...), nil -} - // rpc.json //----------------------------------------------------------------------------- // rpc.http @@ -257,15 +250,23 @@ func makeHTTPHandler(rpcFunc *RPCFunc, cdc *amino.Codec, logger log.Logger) func WriteRPCResponseHTTP(w, types.RPCMethodNotFoundError(types.JSONRPCStringID(""))) } } + // All other endpoints return func(w http.ResponseWriter, r *http.Request) { logger.Debug("HTTP HANDLER", "req", r) - args, err := httpParamsToArgs(rpcFunc, cdc, r) + + ctx := &types.Context{HTTPReq: r} + args := []reflect.Value{reflect.ValueOf(ctx)} + + fnArgs, err := httpParamsToArgs(rpcFunc, cdc, r) if err != nil { WriteRPCResponseHTTP(w, types.RPCInvalidParamsError(types.JSONRPCStringID(""), errors.Wrap(err, "Error converting http params to arguments"))) return } + args = append(args, fnArgs...) + returns := rpcFunc.f.Call(args) + logger.Info("HTTPRestRPC", "method", r.URL.Path, "args", args, "returns", returns) result, err := unreflectResult(returns) if err != nil { @@ -621,27 +622,23 @@ func (wsc *wsConnection) readRoutine() { } // Now, fetch the RPCFunc and execute it. - rpcFunc := wsc.funcMap[request.Method] if rpcFunc == nil { wsc.WriteRPCResponse(types.RPCMethodNotFoundError(request.ID)) continue } - var args []reflect.Value - if rpcFunc.ws { - wsCtx := types.WSRPCContext{Request: request, WSRPCConnection: wsc} - if len(request.Params) > 0 { - args, err = jsonParamsToArgsWS(rpcFunc, wsc.cdc, request.Params, wsCtx) - } - } else { - if len(request.Params) > 0 { - args, err = jsonParamsToArgsRPC(rpcFunc, wsc.cdc, request.Params) + + ctx := &types.Context{JSONReq: &request, WSConn: wsc} + args := []reflect.Value{reflect.ValueOf(ctx)} + if len(request.Params) > 0 { + fnArgs, err := jsonParamsToArgs(rpcFunc, wsc.cdc, request.Params) + if err != nil { + wsc.WriteRPCResponse(types.RPCInternalError(request.ID, errors.Wrap(err, "Error converting json params to arguments"))) + continue } + args = append(args, fnArgs...) } - if err != nil { - wsc.WriteRPCResponse(types.RPCInternalError(request.ID, errors.Wrap(err, "Error converting json params to arguments"))) - continue - } + returns := rpcFunc.f.Call(args) // TODO: Need to encode args/returns to string if we want to log them diff --git a/rpc/lib/server/handlers_test.go b/rpc/lib/server/handlers_test.go index b1d3c788..f8ad0610 100644 --- a/rpc/lib/server/handlers_test.go +++ b/rpc/lib/server/handlers_test.go @@ -28,7 +28,7 @@ import ( func testMux() *http.ServeMux { funcMap := map[string]*rs.RPCFunc{ - "c": rs.NewRPCFunc(func(s string, i int) (string, error) { return "foo", nil }, "s,i"), + "c": rs.NewRPCFunc(func(ctx *types.Context, s string, i int) (string, error) { return "foo", nil }, "s,i"), } cdc := amino.NewCodec() mux := http.NewServeMux() @@ -195,7 +195,7 @@ func TestWebsocketManagerHandler(t *testing.T) { func newWSServer() *httptest.Server { funcMap := map[string]*rs.RPCFunc{ - "c": rs.NewWSRPCFunc(func(wsCtx types.WSRPCContext, s string, i int) (string, error) { return "foo", nil }, "s,i"), + "c": rs.NewWSRPCFunc(func(ctx *types.Context, s string, i int) (string, error) { return "foo", nil }, "s,i"), } wm := rs.NewWebsocketManager(funcMap, amino.NewCodec()) wm.SetLogger(log.TestingLogger()) diff --git a/rpc/lib/server/parse_test.go b/rpc/lib/server/parse_test.go index 7b0aacdb..8d00f1fe 100644 --- a/rpc/lib/server/parse_test.go +++ b/rpc/lib/server/parse_test.go @@ -137,8 +137,6 @@ func TestParseJSONArray(t *testing.T) { } func TestParseJSONRPC(t *testing.T) { - assert := assert.New(t) - demo := func(height int, name string) {} call := NewRPCFunc(demo, "height,name") cdc := amino.NewCodec() @@ -162,14 +160,14 @@ func TestParseJSONRPC(t *testing.T) { for idx, tc := range cases { i := strconv.Itoa(idx) data := []byte(tc.raw) - vals, err := jsonParamsToArgs(call, cdc, data, 0) + vals, err := jsonParamsToArgs(call, cdc, data) if tc.fail { - assert.NotNil(err, i) + assert.NotNil(t, err, i) } else { - assert.Nil(err, "%s: %+v", i, err) - if assert.Equal(2, len(vals), i) { - assert.Equal(tc.height, vals[0].Int(), i) - assert.Equal(tc.name, vals[1].String(), i) + assert.Nil(t, err, "%s: %+v", i, err) + if assert.Equal(t, 2, len(vals), i) { + assert.Equal(t, tc.height, vals[0].Int(), i) + assert.Equal(t, tc.name, vals[1].String(), i) } } @@ -177,7 +175,6 @@ func TestParseJSONRPC(t *testing.T) { } func TestParseURI(t *testing.T) { - demo := func(height int, name string) {} call := NewRPCFunc(demo, "height,name") cdc := amino.NewCodec() diff --git a/rpc/lib/types/types.go b/rpc/lib/types/types.go index ceb7be83..cbfee46b 100644 --- a/rpc/lib/types/types.go +++ b/rpc/lib/types/types.go @@ -3,6 +3,7 @@ package rpctypes import ( "encoding/json" "fmt" + "net/http" "reflect" "strings" @@ -232,22 +233,45 @@ func RPCServerError(id jsonrpcid, err error) RPCResponse { //---------------------------------------- -// *wsConnection implements this interface. +// WSRPCConnection represents a websocket connection. type WSRPCConnection interface { + // GetRemoteAddr returns a remote address of the connection. GetRemoteAddr() string + // WriteRPCResponse writes the resp onto connection (BLOCKING). WriteRPCResponse(resp RPCResponse) + // TryWriteRPCResponse tries to write the resp onto connection (NON-BLOCKING). TryWriteRPCResponse(resp RPCResponse) bool + // Codec returns an Amino codec used. Codec() *amino.Codec } -// websocket-only RPCFuncs take this as the first parameter. -type WSRPCContext struct { - Request RPCRequest - WSRPCConnection +// Context is the first parameter for all functions. It carries a json-rpc +// request, http request and websocket connection. +// +// - JSONReq is non-nil when JSONRPC is called over websocket. +// - WSConn is non-nil when we're connected via a websocket. +// - HTTPReq is non-nil when URI or JSONRPC is called over HTTP. +type Context struct { + // json-rpc request + JSONReq *RPCRequest + // websocket connection + WSConn WSRPCConnection + // http request + HTTPReq *http.Request +} + +// RemoteAddr returns either HTTPReq#RemoteAddr or result of the +// WSConn#GetRemoteAddr(). +func (ctx *Context) RemoteAddr() string { + if ctx.HTTPReq != nil { + return ctx.HTTPReq.RemoteAddr + } + return ctx.WSConn.GetRemoteAddr() } //---------------------------------------- // SOCKETS + // // Determine if its a unix or tcp socket. // If tcp, must specify the port; `0.0.0.0` will return incorrectly as "unix" since there's no port