rpc: subscribe on reconnection (#689)

* rpc: subscribe on reconnection

* rpc: fix unit tests
This commit is contained in:
Alexandre Thibault 2017-09-29 11:31:39 +02:00 committed by Zach Ramsay
parent b61f5482d4
commit ce36a0111a
3 changed files with 17 additions and 2 deletions

View File

@ -305,6 +305,14 @@ func (w *WSEvents) RemoveListener(listenerID string) {
w.EventSwitch.RemoveListener(listenerID) w.EventSwitch.RemoveListener(listenerID)
} }
// After being reconnected, it is necessary to redo subscription
// to server otherwise no data will be automatically received
func (w *WSEvents) redoSubscriptions() {
for event, _ := range w.evtCount {
w.subscribe(event)
}
}
// eventListener is an infinite loop pulling all websocket events // eventListener is an infinite loop pulling all websocket events
// and pushing them to the EventSwitch. // and pushing them to the EventSwitch.
// //
@ -327,6 +335,8 @@ func (w *WSEvents) eventListener() {
// before cleaning up the w.ws stuff // before cleaning up the w.ws stuff
w.done <- true w.done <- true
return return
case <-w.ws.ReconnectCh:
w.redoSubscriptions()
} }
} }
} }

View File

@ -43,6 +43,7 @@ type WSClient struct {
// user facing channels, closed only when the client is being stopped. // user facing channels, closed only when the client is being stopped.
ResultsCh chan json.RawMessage ResultsCh chan json.RawMessage
ErrorsCh chan error ErrorsCh chan error
ReconnectCh chan bool
// internal channels // internal channels
send chan types.RPCRequest // user requests send chan types.RPCRequest // user requests
@ -139,6 +140,7 @@ func (c *WSClient) OnStart() error {
c.ResultsCh = make(chan json.RawMessage) c.ResultsCh = make(chan json.RawMessage)
c.ErrorsCh = make(chan error) c.ErrorsCh = make(chan error)
c.ReconnectCh = make(chan bool)
c.send = make(chan types.RPCRequest) c.send = make(chan types.RPCRequest)
// 1 additional error may come from the read/write // 1 additional error may come from the read/write
@ -254,6 +256,7 @@ func (c *WSClient) reconnect() error {
c.Logger.Error("failed to redial", "err", err) c.Logger.Error("failed to redial", "err", err)
} else { } else {
c.Logger.Info("reconnected") c.Logger.Info("reconnected")
c.ReconnectCh <- true
return nil return nil
} }

View File

@ -186,6 +186,8 @@ func callWgDoneOnResult(t *testing.T, c *WSClient, wg *sync.WaitGroup) {
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
case <-c.ReconnectCh:
t.Log("Reconnected")
case <-c.Quit: case <-c.Quit:
return return
} }