mirror of
https://github.com/fluencelabs/tendermint
synced 2025-06-25 18:51:39 +00:00
Merge branch 'rpc_jae' into develop
Conflicts: node/node.go rpc/core/accounts.go rpc/core_client/client.go rpc/handlers.go rpc/http_server.go rpc/test/helpers.go rpc/test/http_rpc_test.go rpc/test/json_rpc_test.go
This commit is contained in:
178
rpc/handlers.go
178
rpc/handlers.go
@ -1,12 +1,16 @@
|
||||
package rpc
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/tendermint/tendermint/binary"
|
||||
"github.com/tendermint/tendermint/events"
|
||||
"golang.org/x/net/websocket"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"reflect"
|
||||
"time"
|
||||
)
|
||||
|
||||
func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc) {
|
||||
@ -19,6 +23,15 @@ func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc) {
|
||||
mux.HandleFunc("/", makeJSONRPCHandler(funcMap))
|
||||
}
|
||||
|
||||
func RegisterEventsHandler(mux *http.ServeMux, evsw *events.EventSwitch) {
|
||||
// websocket endpoint
|
||||
w := NewWebsocketManager(evsw)
|
||||
mux.Handle("/events", websocket.Handler(w.eventsHandler))
|
||||
}
|
||||
|
||||
//-------------------------------------
|
||||
// function introspection
|
||||
|
||||
// holds all type information for each function
|
||||
type RPCFunc struct {
|
||||
f reflect.Value // underlying rpc function
|
||||
@ -27,6 +40,7 @@ type RPCFunc struct {
|
||||
argNames []string // name of each argument
|
||||
}
|
||||
|
||||
// wraps a function for quicker introspection
|
||||
func NewRPCFunc(f interface{}, args []string) *RPCFunc {
|
||||
return &RPCFunc{
|
||||
f: reflect.ValueOf(f),
|
||||
@ -36,6 +50,7 @@ func NewRPCFunc(f interface{}, args []string) *RPCFunc {
|
||||
}
|
||||
}
|
||||
|
||||
// return a function's argument types
|
||||
func funcArgTypes(f interface{}) []reflect.Type {
|
||||
t := reflect.TypeOf(f)
|
||||
n := t.NumIn()
|
||||
@ -46,6 +61,7 @@ func funcArgTypes(f interface{}) []reflect.Type {
|
||||
return types
|
||||
}
|
||||
|
||||
// return a function's return types
|
||||
func funcReturnTypes(f interface{}) []reflect.Type {
|
||||
t := reflect.TypeOf(f)
|
||||
n := t.NumOut()
|
||||
@ -56,12 +72,17 @@ func funcReturnTypes(f interface{}) []reflect.Type {
|
||||
return types
|
||||
}
|
||||
|
||||
// function introspection
|
||||
//-----------------------------------------------------------------------------
|
||||
// rpc.json
|
||||
|
||||
// jsonrpc calls grab the given method's function info and runs reflect.Call
|
||||
func makeJSONRPCHandler(funcMap map[string]*RPCFunc) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
if len(r.URL.Path) > 1 {
|
||||
WriteRPCResponse(w, NewRPCResponse(nil, fmt.Sprintf("Invalid JSONRPC endpoint %s", r.URL.Path)))
|
||||
return
|
||||
}
|
||||
b, _ := ioutil.ReadAll(r.Body)
|
||||
var request RPCRequest
|
||||
err := json.Unmarshal(b, &request)
|
||||
@ -69,7 +90,6 @@ func makeJSONRPCHandler(funcMap map[string]*RPCFunc) http.HandlerFunc {
|
||||
WriteRPCResponse(w, NewRPCResponse(nil, err.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
rpcFunc := funcMap[request.Method]
|
||||
if rpcFunc == nil {
|
||||
WriteRPCResponse(w, NewRPCResponse(nil, "RPC method unknown: "+request.Method))
|
||||
@ -169,6 +189,162 @@ func _jsonStringToArg(ty reflect.Type, arg string) (reflect.Value, error) {
|
||||
|
||||
// rpc.http
|
||||
//-----------------------------------------------------------------------------
|
||||
// rpc.websocket
|
||||
|
||||
// for requests coming in
|
||||
type WsRequest struct {
|
||||
Type string // subscribe or unsubscribe
|
||||
Event string
|
||||
}
|
||||
|
||||
// for responses going out
|
||||
type WsResponse struct {
|
||||
Event string
|
||||
Data interface{}
|
||||
Error string
|
||||
}
|
||||
|
||||
// a single websocket connection
|
||||
// contains the listeners id
|
||||
type Connection struct {
|
||||
id string
|
||||
wsCon *websocket.Conn
|
||||
writeChan chan WsResponse
|
||||
quitChan chan struct{}
|
||||
failedSends uint
|
||||
}
|
||||
|
||||
// new websocket connection wrapper
|
||||
func NewConnection(con *websocket.Conn) *Connection {
|
||||
return &Connection{
|
||||
id: con.RemoteAddr().String(),
|
||||
wsCon: con,
|
||||
writeChan: make(chan WsResponse, WriteChanBuffer), // buffered. we keep track when its full
|
||||
}
|
||||
}
|
||||
|
||||
// close the connection
|
||||
func (c *Connection) Close() {
|
||||
c.wsCon.Close()
|
||||
close(c.writeChan)
|
||||
close(c.quitChan)
|
||||
}
|
||||
|
||||
// main manager for all websocket connections
|
||||
// holds the event switch
|
||||
type WebsocketManager struct {
|
||||
ew *events.EventSwitch
|
||||
cons map[string]*Connection
|
||||
}
|
||||
|
||||
func NewWebsocketManager(ew *events.EventSwitch) *WebsocketManager {
|
||||
return &WebsocketManager{
|
||||
ew: ew,
|
||||
cons: make(map[string]*Connection),
|
||||
}
|
||||
}
|
||||
|
||||
func (w *WebsocketManager) eventsHandler(con *websocket.Conn) {
|
||||
// register connection
|
||||
c := NewConnection(con)
|
||||
w.cons[con.RemoteAddr().String()] = c
|
||||
|
||||
// read subscriptions/unsubscriptions to events
|
||||
go w.read(c)
|
||||
// write responses
|
||||
go w.write(c)
|
||||
}
|
||||
|
||||
const (
|
||||
WsConnectionReaperSeconds = 5
|
||||
MaxFailedSendsSeconds = 10
|
||||
WriteChanBuffer = 10
|
||||
)
|
||||
|
||||
// read from the socket and subscribe to or unsubscribe from events
|
||||
func (w *WebsocketManager) read(con *Connection) {
|
||||
reaper := time.Tick(time.Second * WsConnectionReaperSeconds)
|
||||
for {
|
||||
select {
|
||||
case <-reaper:
|
||||
if con.failedSends > MaxFailedSendsSeconds {
|
||||
// sending has failed too many times.
|
||||
// kill the connection
|
||||
con.quitChan <- struct{}{}
|
||||
}
|
||||
default:
|
||||
var in []byte
|
||||
if err := websocket.Message.Receive(con.wsCon, &in); err != nil {
|
||||
// an error reading the connection,
|
||||
// so kill the connection
|
||||
con.quitChan <- struct{}{}
|
||||
}
|
||||
var req WsRequest
|
||||
err := json.Unmarshal(in, &req)
|
||||
if err != nil {
|
||||
errStr := fmt.Sprintf("Error unmarshaling data: %s", err.Error())
|
||||
con.writeChan <- WsResponse{Error: errStr}
|
||||
}
|
||||
switch req.Type {
|
||||
case "subscribe":
|
||||
w.ew.AddListenerForEvent(con.id, req.Event, func(msg interface{}) {
|
||||
resp := WsResponse{
|
||||
Event: req.Event,
|
||||
Data: msg,
|
||||
}
|
||||
select {
|
||||
case con.writeChan <- resp:
|
||||
// yay
|
||||
con.failedSends = 0
|
||||
default:
|
||||
// channel is full
|
||||
// if this happens too many times,
|
||||
// close connection
|
||||
con.failedSends += 1
|
||||
}
|
||||
})
|
||||
case "unsubscribe":
|
||||
if req.Event != "" {
|
||||
w.ew.RemoveListenerForEvent(req.Event, con.id)
|
||||
} else {
|
||||
w.ew.RemoveListener(con.id)
|
||||
}
|
||||
default:
|
||||
con.writeChan <- WsResponse{Error: "Unknown request type: " + req.Type}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// receives on a write channel and writes out to the socket
|
||||
func (w *WebsocketManager) write(con *Connection) {
|
||||
n, err := new(int64), new(error)
|
||||
for {
|
||||
select {
|
||||
case msg := <-con.writeChan:
|
||||
buf := new(bytes.Buffer)
|
||||
binary.WriteJSON(msg, buf, n, err)
|
||||
if *err != nil {
|
||||
log.Error("Failed to write JSON WsResponse", "error", err)
|
||||
} else {
|
||||
websocket.Message.Send(con.wsCon, buf.Bytes())
|
||||
}
|
||||
case <-con.quitChan:
|
||||
w.closeConn(con)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// close a connection and delete from manager
|
||||
func (w *WebsocketManager) closeConn(con *Connection) {
|
||||
con.Close()
|
||||
delete(w.cons, con.id)
|
||||
}
|
||||
|
||||
// rpc.websocket
|
||||
//-----------------------------------------------------------------------------
|
||||
|
||||
// returns is Response struct and error. If error is not nil, return it
|
||||
func unreflectResponse(returns []reflect.Value) (interface{}, error) {
|
||||
|
Reference in New Issue
Block a user