mirror of
https://github.com/fluencelabs/tendermint
synced 2025-06-14 22:01:20 +00:00
rpc: comments
This commit is contained in:
@ -92,24 +92,32 @@ func NewWSClient(remoteAddr, endpoint string, options ...func(*WSClient)) *WSCli
|
|||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// MaxReconnectAttempts sets the maximum number of reconnect attempts before returning an error.
|
||||||
|
// It should only be used in the constructor and is not Goroutine-safe.
|
||||||
func MaxReconnectAttempts(max int) func(*WSClient) {
|
func MaxReconnectAttempts(max int) func(*WSClient) {
|
||||||
return func(c *WSClient) {
|
return func(c *WSClient) {
|
||||||
c.maxReconnectAttempts = max
|
c.maxReconnectAttempts = max
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ReadWait sets the amount of time to wait before a websocket read times out.
|
||||||
|
// It should only be used in the constructor and is not Goroutine-safe.
|
||||||
func ReadWait(readWait time.Duration) func(*WSClient) {
|
func ReadWait(readWait time.Duration) func(*WSClient) {
|
||||||
return func(c *WSClient) {
|
return func(c *WSClient) {
|
||||||
c.readWait = readWait
|
c.readWait = readWait
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WriteWait sets the amount of time to wait before a websocket write times out.
|
||||||
|
// It should only be used in the constructor and is not Goroutine-safe.
|
||||||
func WriteWait(writeWait time.Duration) func(*WSClient) {
|
func WriteWait(writeWait time.Duration) func(*WSClient) {
|
||||||
return func(c *WSClient) {
|
return func(c *WSClient) {
|
||||||
c.writeWait = writeWait
|
c.writeWait = writeWait
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PingPeriod sets the duration for sending websocket pings.
|
||||||
|
// It should only be used in the constructor - not Goroutine-safe.
|
||||||
func PingPeriod(pingPeriod time.Duration) func(*WSClient) {
|
func PingPeriod(pingPeriod time.Duration) func(*WSClient) {
|
||||||
return func(c *WSClient) {
|
return func(c *WSClient) {
|
||||||
c.pingPeriod = pingPeriod
|
c.pingPeriod = pingPeriod
|
||||||
|
@ -32,6 +32,7 @@ func (h *myHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
defer conn.Close()
|
||||||
for {
|
for {
|
||||||
messageType, _, err := conn.ReadMessage()
|
messageType, _, err := conn.ReadMessage()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -21,7 +21,7 @@ import (
|
|||||||
"github.com/tendermint/tmlibs/log"
|
"github.com/tendermint/tmlibs/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Adds a route for each function in the funcMap, as well as general jsonrpc and websocket handlers for all functions.
|
// RegisterRPCFuncs adds a route for each function in the funcMap, as well as general jsonrpc and websocket handlers for all functions.
|
||||||
// "result" is the interface on which the result objects are registered, and is popualted with every RPCResponse
|
// "result" is the interface on which the result objects are registered, and is popualted with every RPCResponse
|
||||||
func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc, logger log.Logger) {
|
func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc, logger log.Logger) {
|
||||||
// HTTP endpoints
|
// HTTP endpoints
|
||||||
@ -36,7 +36,7 @@ func RegisterRPCFuncs(mux *http.ServeMux, funcMap map[string]*RPCFunc, logger lo
|
|||||||
//-------------------------------------
|
//-------------------------------------
|
||||||
// function introspection
|
// function introspection
|
||||||
|
|
||||||
// holds all type information for each function
|
// RPCFunc contains the introspected type information for a function
|
||||||
type RPCFunc struct {
|
type RPCFunc struct {
|
||||||
f reflect.Value // underlying rpc function
|
f reflect.Value // underlying rpc function
|
||||||
args []reflect.Type // type of each function arg
|
args []reflect.Type // type of each function arg
|
||||||
@ -45,12 +45,13 @@ type RPCFunc struct {
|
|||||||
ws bool // websocket only
|
ws bool // websocket only
|
||||||
}
|
}
|
||||||
|
|
||||||
// wraps a function for quicker introspection
|
// NewRPCFunc wraps a function for introspection.
|
||||||
// f is the function, args are comma separated argument names
|
// f is the function, args are comma separated argument names
|
||||||
func NewRPCFunc(f interface{}, args string) *RPCFunc {
|
func NewRPCFunc(f interface{}, args string) *RPCFunc {
|
||||||
return newRPCFunc(f, args, false)
|
return newRPCFunc(f, args, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewWSRPCFunc wraps a function for introspection and use in the websockets.
|
||||||
func NewWSRPCFunc(f interface{}, args string) *RPCFunc {
|
func NewWSRPCFunc(f interface{}, args string) *RPCFunc {
|
||||||
return newRPCFunc(f, args, true)
|
return newRPCFunc(f, args, true)
|
||||||
}
|
}
|
||||||
@ -372,6 +373,8 @@ type wsConnection struct {
|
|||||||
// NewWSConnection wraps websocket.Conn. See the commentary on the
|
// NewWSConnection wraps websocket.Conn. See the commentary on the
|
||||||
// func(*wsConnection) functions for a detailed description of how to configure
|
// func(*wsConnection) functions for a detailed description of how to configure
|
||||||
// ping period and pong wait time.
|
// ping period and pong wait time.
|
||||||
|
// NOTE: if the write buffer is full, pongs may be dropped, which may cause clients to disconnect.
|
||||||
|
// see https://github.com/gorilla/websocket/issues/97
|
||||||
func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, evsw events.EventSwitch, options ...func(*wsConnection)) *wsConnection {
|
func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, evsw events.EventSwitch, options ...func(*wsConnection)) *wsConnection {
|
||||||
wsc := &wsConnection{
|
wsc := &wsConnection{
|
||||||
remoteAddr: baseConn.RemoteAddr().String(),
|
remoteAddr: baseConn.RemoteAddr().String(),
|
||||||
@ -390,31 +393,39 @@ func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, evsw
|
|||||||
return wsc
|
return wsc
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WriteWait sets the amount of time to wait before a websocket write times out.
|
||||||
|
// It should only be used in the constructor - not Goroutine-safe.
|
||||||
func WriteWait(writeWait time.Duration) func(*wsConnection) {
|
func WriteWait(writeWait time.Duration) func(*wsConnection) {
|
||||||
return func(wsc *wsConnection) {
|
return func(wsc *wsConnection) {
|
||||||
wsc.writeWait = writeWait
|
wsc.writeWait = writeWait
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WriteChanCapacity sets the capacity of the websocket write channel.
|
||||||
|
// It should only be used in the constructor - not Goroutine-safe.
|
||||||
func WriteChanCapacity(cap int) func(*wsConnection) {
|
func WriteChanCapacity(cap int) func(*wsConnection) {
|
||||||
return func(wsc *wsConnection) {
|
return func(wsc *wsConnection) {
|
||||||
wsc.writeChanCapacity = cap
|
wsc.writeChanCapacity = cap
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ReadWait sets the amount of time to wait before a websocket read times out.
|
||||||
|
// It should only be used in the constructor - not Goroutine-safe.
|
||||||
func ReadWait(readWait time.Duration) func(*wsConnection) {
|
func ReadWait(readWait time.Duration) func(*wsConnection) {
|
||||||
return func(wsc *wsConnection) {
|
return func(wsc *wsConnection) {
|
||||||
wsc.readWait = readWait
|
wsc.readWait = readWait
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PingPeriod sets the duration for sending websocket pings.
|
||||||
|
// It should only be used in the constructor - not Goroutine-safe.
|
||||||
func PingPeriod(pingPeriod time.Duration) func(*wsConnection) {
|
func PingPeriod(pingPeriod time.Duration) func(*wsConnection) {
|
||||||
return func(wsc *wsConnection) {
|
return func(wsc *wsConnection) {
|
||||||
wsc.pingPeriod = pingPeriod
|
wsc.pingPeriod = pingPeriod
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// wsc.Start() blocks until the connection closes.
|
// OnStart starts the read and write routines. It blocks until the connection closes.
|
||||||
func (wsc *wsConnection) OnStart() error {
|
func (wsc *wsConnection) OnStart() error {
|
||||||
wsc.writeChan = make(chan types.RPCResponse, wsc.writeChanCapacity)
|
wsc.writeChan = make(chan types.RPCResponse, wsc.writeChanCapacity)
|
||||||
|
|
||||||
@ -426,27 +437,29 @@ func (wsc *wsConnection) OnStart() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// OnStop unsubscribes from all events.
|
||||||
func (wsc *wsConnection) OnStop() {
|
func (wsc *wsConnection) OnStop() {
|
||||||
if wsc.evsw != nil {
|
if wsc.evsw != nil {
|
||||||
wsc.evsw.RemoveListener(wsc.remoteAddr)
|
wsc.evsw.RemoveListener(wsc.remoteAddr)
|
||||||
}
|
}
|
||||||
// The write loop closes the websocket connection when it exits its loop, and
|
// Both read and write loops close the websocket connection when they exit their loops.
|
||||||
// the read loop closes the writeChan.
|
// The writeChan is never closed, to allow WriteRPCResponse() to fail.
|
||||||
}
|
}
|
||||||
|
|
||||||
// Implements WSRPCConnection
|
// GetRemoteAddr returns the remote address of the underlying connection.
|
||||||
|
// It implements WSRPCConnection
|
||||||
func (wsc *wsConnection) GetRemoteAddr() string {
|
func (wsc *wsConnection) GetRemoteAddr() string {
|
||||||
return wsc.remoteAddr
|
return wsc.remoteAddr
|
||||||
}
|
}
|
||||||
|
|
||||||
// Implements WSRPCConnection
|
// GetEventSwitch returns the event switch.
|
||||||
|
// It implements WSRPCConnection
|
||||||
func (wsc *wsConnection) GetEventSwitch() events.EventSwitch {
|
func (wsc *wsConnection) GetEventSwitch() events.EventSwitch {
|
||||||
return wsc.evsw
|
return wsc.evsw
|
||||||
}
|
}
|
||||||
|
|
||||||
// Implements WSRPCConnection
|
// WriteRPCResponse pushes a response to the writeChan, and blocks until it is accepted.
|
||||||
// Blocking write to writeChan until service stops.
|
// It implements WSRPCConnection. It is Goroutine-safe.
|
||||||
// Goroutine-safe
|
|
||||||
func (wsc *wsConnection) WriteRPCResponse(resp types.RPCResponse) {
|
func (wsc *wsConnection) WriteRPCResponse(resp types.RPCResponse) {
|
||||||
select {
|
select {
|
||||||
case <-wsc.Quit:
|
case <-wsc.Quit:
|
||||||
@ -455,9 +468,8 @@ func (wsc *wsConnection) WriteRPCResponse(resp types.RPCResponse) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Implements WSRPCConnection
|
// TryWriteRPCResponse attempts to push a response to the writeChan, but does not block.
|
||||||
// Nonblocking write.
|
// It implements WSRPCConnection. It is Goroutine-safe
|
||||||
// Goroutine-safe
|
|
||||||
func (wsc *wsConnection) TryWriteRPCResponse(resp types.RPCResponse) bool {
|
func (wsc *wsConnection) TryWriteRPCResponse(resp types.RPCResponse) bool {
|
||||||
select {
|
select {
|
||||||
case <-wsc.Quit:
|
case <-wsc.Quit:
|
||||||
@ -475,8 +487,6 @@ func (wsc *wsConnection) readRoutine() {
|
|||||||
wsc.baseConn.Close()
|
wsc.baseConn.Close()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// Do not close writeChan, to allow WriteRPCResponse() to fail.
|
|
||||||
// defer close(wsc.writeChan)
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-wsc.Quit:
|
case <-wsc.Quit:
|
||||||
@ -598,8 +608,8 @@ func (wsc *wsConnection) writeMessageWithDeadline(msgType int, msg []byte) error
|
|||||||
|
|
||||||
//----------------------------------------
|
//----------------------------------------
|
||||||
|
|
||||||
// Main manager for all websocket connections
|
// WebsocketManager is the main manager for all websocket connections.
|
||||||
// Holds the event switch
|
// It holds the event switch and a map of functions for routing.
|
||||||
// NOTE: The websocket path is defined externally, e.g. in node/node.go
|
// NOTE: The websocket path is defined externally, e.g. in node/node.go
|
||||||
type WebsocketManager struct {
|
type WebsocketManager struct {
|
||||||
websocket.Upgrader
|
websocket.Upgrader
|
||||||
@ -609,6 +619,8 @@ type WebsocketManager struct {
|
|||||||
wsConnOptions []func(*wsConnection)
|
wsConnOptions []func(*wsConnection)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewWebsocketManager returns a new WebsocketManager that routes according to the given funcMap, listens on the given event switch,
|
||||||
|
// and connects to the server with the given connection options.
|
||||||
func NewWebsocketManager(funcMap map[string]*RPCFunc, evsw events.EventSwitch, wsConnOptions ...func(*wsConnection)) *WebsocketManager {
|
func NewWebsocketManager(funcMap map[string]*RPCFunc, evsw events.EventSwitch, wsConnOptions ...func(*wsConnection)) *WebsocketManager {
|
||||||
return &WebsocketManager{
|
return &WebsocketManager{
|
||||||
funcMap: funcMap,
|
funcMap: funcMap,
|
||||||
@ -624,11 +636,12 @@ func NewWebsocketManager(funcMap map[string]*RPCFunc, evsw events.EventSwitch, w
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetLogger sets the logger.
|
||||||
func (wm *WebsocketManager) SetLogger(l log.Logger) {
|
func (wm *WebsocketManager) SetLogger(l log.Logger) {
|
||||||
wm.logger = l
|
wm.logger = l
|
||||||
}
|
}
|
||||||
|
|
||||||
// Upgrade the request/response (via http.Hijack) and starts the wsConnection.
|
// WebsocketHandler upgrades the request/response (via http.Hijack) and starts the wsConnection.
|
||||||
func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Request) {
|
func (wm *WebsocketManager) WebsocketHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
wsConn, err := wm.Upgrade(w, r, nil)
|
wsConn, err := wm.Upgrade(w, r, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
Reference in New Issue
Block a user