diff --git a/README.md b/README.md index fd01b380..e74cf802 100644 --- a/README.md +++ b/README.md @@ -1 +1,4 @@ -# go-rpc \ No newline at end of file +# go-rpc + +HTTP RPC server supporting calls via uri params, jsonrpc, and jsonrpc over websockets + diff --git a/client/http_client.go b/client/http_client.go new file mode 100644 index 00000000..133f2b72 --- /dev/null +++ b/client/http_client.go @@ -0,0 +1,133 @@ +package rpcclient + +import ( + "bytes" + "errors" + "io/ioutil" + "net/http" + "net/url" + "strings" + + . "github.com/tendermint/go-common" + "github.com/tendermint/go-rpc/types" + "github.com/tendermint/go-wire" +) + +// JSON rpc takes params as a slice +type ClientJSONRPC struct { + remote string +} + +func NewClientJSONRPC(remote string) *ClientJSONRPC { + return &ClientJSONRPC{remote} +} + +func (c *ClientJSONRPC) Call(method string, params []interface{}) (interface{}, error) { + return CallHTTP_JSONRPC(c.remote, method, params) +} + +// URI takes params as a map +type ClientURI struct { + remote string +} + +func NewClientURI(remote string) *ClientURI { + if !strings.HasSuffix(remote, "/") { + remote = remote + "/" + } + return &ClientURI{remote} +} + +func (c *ClientURI) Call(method string, params map[string]interface{}) (interface{}, error) { + return CallHTTP_URI(c.remote, method, params) +} + +func CallHTTP_JSONRPC(remote string, method string, params []interface{}) (interface{}, error) { + // Make request and get responseBytes + request := rpctypes.RPCRequest{ + JSONRPC: "2.0", + Method: method, + Params: params, + ID: "", + } + requestBytes := wire.JSONBytes(request) + requestBuf := bytes.NewBuffer(requestBytes) + log.Info(Fmt("RPC request to %v: %v", remote, string(requestBytes))) + httpResponse, err := http.Post(remote, "text/json", requestBuf) + if err != nil { + return nil, err + } + defer httpResponse.Body.Close() + responseBytes, err := ioutil.ReadAll(httpResponse.Body) + if err != nil { + return nil, err + } + log.Info(Fmt("RPC response: %v", string(responseBytes))) + return unmarshalResponseBytes(responseBytes) +} + +func CallHTTP_URI(remote string, method string, params map[string]interface{}) (interface{}, error) { + values, err := argsToURLValues(params) + if err != nil { + return nil, err + } + log.Info(Fmt("URI request to %v: %v", remote, values)) + resp, err := http.PostForm(remote+method, values) + if err != nil { + return nil, err + } + defer resp.Body.Close() + responseBytes, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + return unmarshalResponseBytes(responseBytes) +} + +//------------------------------------------------ + +func unmarshalResponseBytes(responseBytes []byte) (interface{}, error) { + // read response + // if rpc/core/types is imported, the result will unmarshal + // into the correct type + var err error + response := &rpctypes.RPCResponse{} + wire.ReadJSON(response, responseBytes, &err) + if err != nil { + return nil, err + } + errorStr := response.Error + if errorStr != "" { + return nil, errors.New(errorStr) + } + return response.Result, err +} + +func argsToURLValues(args map[string]interface{}) (url.Values, error) { + values := make(url.Values) + if len(args) == 0 { + return values, nil + } + err := argsToJson(args) + if err != nil { + return nil, err + } + for key, val := range args { + values.Set(key, val.(string)) + } + return values, nil +} + +func argsToJson(args map[string]interface{}) error { + var n int + var err error + for k, v := range args { + buf := new(bytes.Buffer) + wire.WriteJSON(v, buf, &n, &err) + if err != nil { + return err + } + args[k] = buf.String() + } + return nil +} diff --git a/client/log.go b/client/log.go new file mode 100644 index 00000000..8b33e2f1 --- /dev/null +++ b/client/log.go @@ -0,0 +1,7 @@ +package rpcclient + +import ( + "github.com/tendermint/log15" +) + +var log = log15.New("module", "rpcclient") diff --git a/client/ws_client.go b/client/ws_client.go new file mode 100644 index 00000000..ae2b324a --- /dev/null +++ b/client/ws_client.go @@ -0,0 +1,119 @@ +package rpcclient + +import ( + "net/http" + "time" + + "github.com/gorilla/websocket" + . "github.com/tendermint/go-common" + "github.com/tendermint/go-rpc/types" + "github.com/tendermint/go-wire" +) + +const ( + wsResultsChannelCapacity = 10 + wsWriteTimeoutSeconds = 10 +) + +type WSClient struct { + QuitService + Address string + *websocket.Conn + ResultsCh chan rpctypes.Result // closes upon WSClient.Stop() +} + +// create a new connection +func NewWSClient(addr string) *WSClient { + wsClient := &WSClient{ + Address: addr, + Conn: nil, + ResultsCh: make(chan rpctypes.Result, wsResultsChannelCapacity), + } + wsClient.QuitService = *NewQuitService(log, "WSClient", wsClient) + return wsClient +} + +func (wsc *WSClient) OnStart() error { + wsc.QuitService.OnStart() + err := wsc.dial() + if err != nil { + return err + } + go wsc.receiveEventsRoutine() + return nil +} + +func (wsc *WSClient) dial() error { + // Dial + dialer := websocket.DefaultDialer + rHeader := http.Header{} + con, _, err := dialer.Dial(wsc.Address, rHeader) + if err != nil { + return err + } + // Set the ping/pong handlers + con.SetPingHandler(func(m string) error { + // NOTE: https://github.com/gorilla/websocket/issues/97 + log.Debug("Client received ping, writing pong") + go con.WriteControl(websocket.PongMessage, []byte(m), time.Now().Add(time.Second*wsWriteTimeoutSeconds)) + return nil + }) + con.SetPongHandler(func(m string) error { + log.Debug("Client received pong") + // NOTE: https://github.com/gorilla/websocket/issues/97 + return nil + }) + wsc.Conn = con + return nil +} + +func (wsc *WSClient) OnStop() { + wsc.QuitService.OnStop() + // ResultsCh is closed in receiveEventsRoutine. +} + +func (wsc *WSClient) receiveEventsRoutine() { + for { + log.Notice("Waiting for wsc message ...") + _, data, err := wsc.ReadMessage() + if err != nil { + log.Info("WSClient failed to read message", "error", err, "data", string(data)) + wsc.Stop() + break + } else { + var response rpctypes.RPCResponse + wire.ReadJSON(&response, data, &err) + if err != nil { + log.Info("WSClient failed to parse message", "error", err, "data", string(data)) + wsc.Stop() + break + } + wsc.ResultsCh <- response.Result + } + } + + // Cleanup + close(wsc.ResultsCh) +} + +// subscribe to an event +func (wsc *WSClient) Subscribe(eventid string) error { + err := wsc.WriteJSON(rpctypes.RPCRequest{ + JSONRPC: "2.0", + ID: "", + Method: "subscribe", + Params: []interface{}{eventid}, + }) + return err +} + +// unsubscribe from an event +func (wsc *WSClient) Unsubscribe(eventid string) error { + err := wsc.WriteJSON(rpctypes.RPCRequest{ + JSONRPC: "2.0", + ID: "", + Method: "unsubscribe", + Params: []interface{}{eventid}, + }) + return err +} diff --git a/server/handlers.go b/server/handlers.go new file mode 100644 index 00000000..64c15180 --- /dev/null +++ b/server/handlers.go @@ -0,0 +1,553 @@ +package rpcserver + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "net/http" + "reflect" + "sort" + "time" + + "github.com/gorilla/websocket" + . "github.com/tendermint/go-common" + "github.com/tendermint/go-events" + . "github.com/tendermint/go-rpc/types" + "github.com/tendermint/go-wire" +) + +func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc) { + // HTTP endpoints + for funcName, rpcFunc := range funcMap { + mux.HandleFunc("/"+funcName, makeHTTPHandler(rpcFunc)) + } + + // JSONRPC endpoints + mux.HandleFunc("/", makeJSONRPCHandler(funcMap)) +} + +//------------------------------------- +// function introspection + +// holds all type information for each function +type RPCFunc struct { + f reflect.Value // underlying rpc function + args []reflect.Type // type of each function arg + returns []reflect.Type // type of each return arg + argNames []string // name of each argument + ws bool // websocket only +} + +// wraps a function for quicker introspection +func NewRPCFunc(f interface{}, args []string) *RPCFunc { + return &RPCFunc{ + f: reflect.ValueOf(f), + args: funcArgTypes(f), + returns: funcReturnTypes(f), + argNames: args, + ws: false, + } +} + +func NewWSRPCFunc(f interface{}, args []string) *RPCFunc { + return &RPCFunc{ + f: reflect.ValueOf(f), + args: funcArgTypes(f), + returns: funcReturnTypes(f), + argNames: args, + ws: true, + } +} + +// return a function's argument types +func funcArgTypes(f interface{}) []reflect.Type { + t := reflect.TypeOf(f) + n := t.NumIn() + typez := make([]reflect.Type, n) + for i := 0; i < n; i++ { + typez[i] = t.In(i) + } + return typez +} + +// return a function's return types +func funcReturnTypes(f interface{}) []reflect.Type { + t := reflect.TypeOf(f) + n := t.NumOut() + typez := make([]reflect.Type, n) + for i := 0; i < n; i++ { + typez[i] = t.Out(i) + } + return typez +} + +// function introspection +//----------------------------------------------------------------------------- +// rpc.json + +// jsonrpc calls grab the given method's function info and runs reflect.Call +func makeJSONRPCHandler(funcMap map[string]*RPCFunc) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + b, _ := ioutil.ReadAll(r.Body) + // if its an empty request (like from a browser), + // just display a list of functions + if len(b) == 0 { + writeListOfEndpoints(w, r, funcMap) + return + } + + var request RPCRequest + err := json.Unmarshal(b, &request) + if err != nil { + WriteRPCResponseHTTP(w, NewRPCResponse("", nil, err.Error())) + return + } + if len(r.URL.Path) > 1 { + WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, fmt.Sprintf("Invalid JSONRPC endpoint %s", r.URL.Path))) + return + } + rpcFunc := funcMap[request.Method] + if rpcFunc == nil { + WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, "RPC method unknown: "+request.Method)) + return + } + if rpcFunc.ws { + WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, "RPC method is only for websockets: "+request.Method)) + return + } + args, err := jsonParamsToArgs(rpcFunc, request.Params) + if err != nil { + WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, err.Error())) + return + } + returns := rpcFunc.f.Call(args) + log.Info("HTTPJSONRPC", "method", request.Method, "args", args, "returns", returns) + result, err := unreflectResult(returns) + if err != nil { + WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, nil, err.Error())) + return + } + WriteRPCResponseHTTP(w, NewRPCResponse(request.ID, result, "")) + } +} + +// Convert a list of interfaces to properly typed values +func jsonParamsToArgs(rpcFunc *RPCFunc, params []interface{}) ([]reflect.Value, error) { + if len(rpcFunc.argNames) != len(params) { + return nil, errors.New(fmt.Sprintf("Expected %v parameters (%v), got %v (%v)", + len(rpcFunc.argNames), rpcFunc.argNames, len(params), params)) + } + values := make([]reflect.Value, len(params)) + for i, p := range params { + ty := rpcFunc.args[i] + v, err := _jsonObjectToArg(ty, p) + if err != nil { + return nil, err + } + values[i] = v + } + return values, nil +} + +// Same as above, but with the first param the websocket connection +func jsonParamsToArgsWS(rpcFunc *RPCFunc, params []interface{}, wsCtx WSRPCContext) ([]reflect.Value, error) { + if len(rpcFunc.argNames) != len(params) { + return nil, errors.New(fmt.Sprintf("Expected %v parameters (%v), got %v (%v)", + len(rpcFunc.argNames)-1, rpcFunc.argNames[1:], len(params), params)) + } + values := make([]reflect.Value, len(params)+1) + values[0] = reflect.ValueOf(wsCtx) + for i, p := range params { + ty := rpcFunc.args[i+1] + v, err := _jsonObjectToArg(ty, p) + if err != nil { + return nil, err + } + values[i+1] = v + } + return values, nil +} + +func _jsonObjectToArg(ty reflect.Type, object interface{}) (reflect.Value, error) { + var err error + v := reflect.New(ty) + wire.ReadJSONObjectPtr(v.Interface(), object, &err) + if err != nil { + return v, err + } + v = v.Elem() + return v, nil +} + +// rpc.json +//----------------------------------------------------------------------------- +// rpc.http + +// convert from a function name to the http handler +func makeHTTPHandler(rpcFunc *RPCFunc) func(http.ResponseWriter, *http.Request) { + // Exception for websocket endpoints + if rpcFunc.ws { + return func(w http.ResponseWriter, r *http.Request) { + WriteRPCResponseHTTP(w, NewRPCResponse("", nil, "This RPC method is only for websockets")) + } + } + // All other endpoints + return func(w http.ResponseWriter, r *http.Request) { + args, err := httpParamsToArgs(rpcFunc, r) + if err != nil { + WriteRPCResponseHTTP(w, NewRPCResponse("", nil, err.Error())) + return + } + returns := rpcFunc.f.Call(args) + log.Info("HTTPRestRPC", "method", r.URL.Path, "args", args, "returns", returns) + result, err := unreflectResult(returns) + if err != nil { + WriteRPCResponseHTTP(w, NewRPCResponse("", nil, err.Error())) + return + } + WriteRPCResponseHTTP(w, NewRPCResponse("", result, "")) + } +} + +// Covert an http query to a list of properly typed values. +// To be properly decoded the arg must be a concrete type from tendermint (if its an interface). +func httpParamsToArgs(rpcFunc *RPCFunc, r *http.Request) ([]reflect.Value, error) { + argTypes := rpcFunc.args + argNames := rpcFunc.argNames + + var err error + values := make([]reflect.Value, len(argNames)) + for i, name := range argNames { + ty := argTypes[i] + arg := GetParam(r, name) + values[i], err = _jsonStringToArg(ty, arg) + if err != nil { + return nil, err + } + } + return values, nil +} + +func _jsonStringToArg(ty reflect.Type, arg string) (reflect.Value, error) { + var err error + v := reflect.New(ty) + wire.ReadJSONPtr(v.Interface(), []byte(arg), &err) + if err != nil { + return v, err + } + v = v.Elem() + return v, nil +} + +// rpc.http +//----------------------------------------------------------------------------- +// rpc.websocket + +const ( + writeChanCapacity = 1000 + wsWriteTimeoutSeconds = 30 // each write times out after this + wsReadTimeoutSeconds = 30 // connection times out if we haven't received *anything* in this long, not even pings. + wsPingTickerSeconds = 10 // send a ping every PingTickerSeconds. +) + +// a single websocket connection +// contains listener id, underlying ws connection, +// and the event switch for subscribing to events +type wsConnection struct { + QuitService + + remoteAddr string + baseConn *websocket.Conn + writeChan chan RPCResponse + readTimeout *time.Timer + pingTicker *time.Ticker + + funcMap map[string]*RPCFunc + evsw *events.EventSwitch +} + +// new websocket connection wrapper +func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, evsw *events.EventSwitch) *wsConnection { + wsc := &wsConnection{ + remoteAddr: baseConn.RemoteAddr().String(), + baseConn: baseConn, + writeChan: make(chan RPCResponse, writeChanCapacity), // error when full. + funcMap: funcMap, + evsw: evsw, + } + wsc.QuitService = *NewQuitService(log, "wsConnection", wsc) + return wsc +} + +// wsc.Start() blocks until the connection closes. +func (wsc *wsConnection) OnStart() error { + wsc.QuitService.OnStart() + + // Read subscriptions/unsubscriptions to events + go wsc.readRoutine() + + // Custom Ping handler to touch readTimeout + wsc.readTimeout = time.NewTimer(time.Second * wsReadTimeoutSeconds) + wsc.pingTicker = time.NewTicker(time.Second * wsPingTickerSeconds) + wsc.baseConn.SetPingHandler(func(m string) error { + // NOTE: https://github.com/gorilla/websocket/issues/97 + go wsc.baseConn.WriteControl(websocket.PongMessage, []byte(m), time.Now().Add(time.Second*wsWriteTimeoutSeconds)) + wsc.readTimeout.Reset(time.Second * wsReadTimeoutSeconds) + return nil + }) + wsc.baseConn.SetPongHandler(func(m string) error { + // NOTE: https://github.com/gorilla/websocket/issues/97 + wsc.readTimeout.Reset(time.Second * wsReadTimeoutSeconds) + return nil + }) + go wsc.readTimeoutRoutine() + + // Write responses, BLOCKING. + wsc.writeRoutine() + return nil +} + +func (wsc *wsConnection) OnStop() { + wsc.QuitService.OnStop() + wsc.evsw.RemoveListener(wsc.remoteAddr) + wsc.readTimeout.Stop() + wsc.pingTicker.Stop() + // The write loop closes the websocket connection + // when it exits its loop, and the read loop + // closes the writeChan +} + +func (wsc *wsConnection) readTimeoutRoutine() { + select { + case <-wsc.readTimeout.C: + log.Notice("Stopping connection due to read timeout") + wsc.Stop() + case <-wsc.Quit: + return + } +} + +// Implements WSRPCConnection +func (wsc *wsConnection) GetRemoteAddr() string { + return wsc.remoteAddr +} + +// Implements WSRPCConnection +func (wsc *wsConnection) GetEventSwitch() *events.EventSwitch { + return wsc.evsw +} + +// Implements WSRPCConnection +// Blocking write to writeChan until service stops. +func (wsc *wsConnection) WriteRPCResponse(resp RPCResponse) { + select { + case <-wsc.Quit: + return + case wsc.writeChan <- resp: + } +} + +// Implements WSRPCConnection +// Nonblocking write. +func (wsc *wsConnection) TryWriteRPCResponse(resp RPCResponse) bool { + select { + case <-wsc.Quit: + return false + case wsc.writeChan <- resp: + return true + default: + return false + } +} + +// Read from the socket and subscribe to or unsubscribe from events +func (wsc *wsConnection) readRoutine() { + // Do not close writeChan, to allow WriteRPCResponse() to fail. + // defer close(wsc.writeChan) + + for { + select { + case <-wsc.Quit: + return + default: + var in []byte + // Do not set a deadline here like below: + // wsc.baseConn.SetReadDeadline(time.Now().Add(time.Second * wsReadTimeoutSeconds)) + // The client may not send anything for a while. + // We use `readTimeout` to handle read timeouts. + _, in, err := wsc.baseConn.ReadMessage() + if err != nil { + log.Notice("Failed to read from connection", "remote", wsc.remoteAddr) + // an error reading the connection, + // kill the connection + wsc.Stop() + return + } + var request RPCRequest + err = json.Unmarshal(in, &request) + if err != nil { + errStr := fmt.Sprintf("Error unmarshaling data: %s", err.Error()) + wsc.WriteRPCResponse(NewRPCResponse(request.ID, nil, errStr)) + continue + } + + // Now, fetch the RPCFunc and execute it. + + rpcFunc := wsc.funcMap[request.Method] + if rpcFunc == nil { + wsc.WriteRPCResponse(NewRPCResponse(request.ID, nil, "RPC method unknown: "+request.Method)) + continue + } + var args []reflect.Value + if rpcFunc.ws { + wsCtx := WSRPCContext{Request: request, WSRPCConnection: wsc} + args, err = jsonParamsToArgsWS(rpcFunc, request.Params, wsCtx) + } else { + args, err = jsonParamsToArgs(rpcFunc, request.Params) + } + if err != nil { + wsc.WriteRPCResponse(NewRPCResponse(request.ID, nil, err.Error())) + continue + } + returns := rpcFunc.f.Call(args) + log.Info("WSJSONRPC", "method", request.Method, "args", args, "returns", returns) + result, err := unreflectResult(returns) + if err != nil { + wsc.WriteRPCResponse(NewRPCResponse(request.ID, nil, err.Error())) + continue + } else { + wsc.WriteRPCResponse(NewRPCResponse(request.ID, result, "")) + continue + } + + } + } +} + +// receives on a write channel and writes out on the socket +func (wsc *wsConnection) writeRoutine() { + defer wsc.baseConn.Close() + var n, err = int(0), error(nil) + for { + select { + case <-wsc.Quit: + return + case <-wsc.pingTicker.C: + err := wsc.baseConn.WriteMessage(websocket.PingMessage, []byte{}) + if err != nil { + log.Error("Failed to write ping message on websocket", "error", err) + wsc.Stop() + return + } + case msg := <-wsc.writeChan: + buf := new(bytes.Buffer) + wire.WriteJSON(msg, buf, &n, &err) + if err != nil { + log.Error("Failed to marshal RPCResponse to JSON", "error", err) + } else { + wsc.baseConn.SetWriteDeadline(time.Now().Add(time.Second * wsWriteTimeoutSeconds)) + bufBytes := buf.Bytes() + if err = wsc.baseConn.WriteMessage(websocket.TextMessage, bufBytes); err != nil { + log.Warn("Failed to write response on websocket", "error", err) + wsc.Stop() + return + } + } + } + } +} + +//---------------------------------------- + +// Main manager for all websocket connections +// Holds the event switch +// NOTE: The websocket path is defined externally, e.g. in node/node.go +type WebsocketManager struct { + websocket.Upgrader + funcMap map[string]*RPCFunc + evsw *events.EventSwitch +} + +func NewWebsocketManager(funcMap map[string]*RPCFunc, evsw *events.EventSwitch) *WebsocketManager { + return &WebsocketManager{ + funcMap: funcMap, + evsw: evsw, + Upgrader: websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + CheckOrigin: func(r *http.Request) bool { + // TODO + return true + }, + }, + } +} + +// Upgrade the request/response (via http.Hijack) and starts the wsConnection. +func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Request) { + wsConn, err := wm.Upgrade(w, r, nil) + if err != nil { + // TODO - return http error + log.Error("Failed to upgrade to websocket connection", "error", err) + return + } + + // register connection + con := NewWSConnection(wsConn, wm.funcMap, wm.evsw) + log.Notice("New websocket connection", "remote", con.remoteAddr) + con.Start() // Blocking +} + +// rpc.websocket +//----------------------------------------------------------------------------- + +// returns is result struct and error. If error is not nil, return it +func unreflectResult(returns []reflect.Value) (interface{}, error) { + errV := returns[1] + if errV.Interface() != nil { + return nil, fmt.Errorf("%v", errV.Interface()) + } + return returns[0].Interface(), nil +} + +// writes a list of available rpc endpoints as an html page +func writeListOfEndpoints(w http.ResponseWriter, r *http.Request, funcMap map[string]*RPCFunc) { + noArgNames := []string{} + argNames := []string{} + for name, funcData := range funcMap { + if len(funcData.args) == 0 { + noArgNames = append(noArgNames, name) + } else { + argNames = append(argNames, name) + } + } + sort.Strings(noArgNames) + sort.Strings(argNames) + buf := new(bytes.Buffer) + buf.WriteString("
") + buf.WriteString("