bring back transparent websocket (Refs #945)

This commit is contained in:
Anton Kaliaev
2017-12-09 23:40:51 -06:00
parent b50cef742d
commit 950a64f756
12 changed files with 155 additions and 206 deletions

View File

@ -2,6 +2,7 @@ package rpcserver
import (
"bytes"
"context"
"encoding/hex"
"encoding/json"
"fmt"
@ -366,8 +367,6 @@ type wsConnection struct {
funcMap map[string]*RPCFunc
subscriptions map[string]interface{}
// write channel capacity
writeChanCapacity int
@ -380,8 +379,8 @@ type wsConnection struct {
// Send pings to server with this period. Must be less than readWait, but greater than zero.
pingPeriod time.Duration
// called before stopping the connection.
onDisconnect func(remoteAddr string)
// object that is used to subscribe / unsubscribe from events
eventSub types.EventSubscriber
}
// NewWSConnection wraps websocket.Conn.
@ -395,7 +394,6 @@ func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, opti
remoteAddr: baseConn.RemoteAddr().String(),
baseConn: baseConn,
funcMap: funcMap,
subscriptions: make(map[string]interface{}),
writeWait: defaultWSWriteWait,
writeChanCapacity: defaultWSWriteChanCapacity,
readWait: defaultWSReadWait,
@ -408,6 +406,15 @@ func NewWSConnection(baseConn *websocket.Conn, funcMap map[string]*RPCFunc, opti
return wsc
}
// EventSubscriber sets object that is used to subscribe / unsubscribe from
// events - not Goroutine-safe. If none given, default node's eventBus will be
// used.
func EventSubscriber(eventSub types.EventSubscriber) func(*wsConnection) {
return func(wsc *wsConnection) {
wsc.eventSub = eventSub
}
}
// WriteWait sets the amount of time to wait before a websocket write times out.
// It should only be used in the constructor - not Goroutine-safe.
func WriteWait(writeWait time.Duration) func(*wsConnection) {
@ -440,14 +447,6 @@ func PingPeriod(pingPeriod time.Duration) func(*wsConnection) {
}
}
// OnDisconnect called before stopping the connection.
// It should only be used in the constructor - not Goroutine-safe.
func OnDisconnect(cb func(remoteAddr string)) func(*wsConnection) {
return func(wsc *wsConnection) {
wsc.onDisconnect = cb
}
}
// OnStart implements cmn.Service by starting the read and write routines. It
// blocks until the connection closes.
func (wsc *wsConnection) OnStart() error {
@ -461,12 +460,12 @@ func (wsc *wsConnection) OnStart() error {
return nil
}
// OnStop implements cmn.Service by calling OnDisconnect callback.
// OnStop implements cmn.Service by unsubscribing remoteAddr from all subscriptions.
func (wsc *wsConnection) OnStop() {
// Both read and write loops close the websocket connection when they exit their loops.
// The writeChan is never closed, to allow WriteRPCResponse() to fail.
if wsc.onDisconnect != nil {
wsc.onDisconnect(wsc.remoteAddr)
if wsc.eventSub != nil {
wsc.eventSub.UnsubscribeAll(context.TODO(), wsc.remoteAddr)
}
}
@ -476,6 +475,11 @@ func (wsc *wsConnection) GetRemoteAddr() string {
return wsc.remoteAddr
}
// GetEventSubscriber implements WSRPCConnection by returning event subscriber.
func (wsc *wsConnection) GetEventSubscriber() types.EventSubscriber {
return wsc.eventSub
}
// WriteRPCResponse pushes a response to the writeChan, and blocks until it is accepted.
// It implements WSRPCConnection. It is Goroutine-safe.
func (wsc *wsConnection) WriteRPCResponse(resp types.RPCResponse) {
@ -499,28 +503,6 @@ func (wsc *wsConnection) TryWriteRPCResponse(resp types.RPCResponse) bool {
}
}
func (wsc *wsConnection) AddSubscription(query string, data interface{}) error {
if _, ok := wsc.subscriptions[query]; ok {
return errors.New("Already subscribed")
}
wsc.subscriptions[query] = data
return nil
}
func (wsc *wsConnection) DeleteSubscription(query string) (interface{}, bool) {
data, ok := wsc.subscriptions[query]
if ok {
delete(wsc.subscriptions, query)
return data, true
}
return nil, false
}
func (wsc *wsConnection) DeleteAllSubscriptions() {
wsc.subscriptions = make(map[string]interface{})
}
// Read from the socket and subscribe to or unsubscribe from events
func (wsc *wsConnection) readRoutine() {
defer func() {