mirror of
https://github.com/fluencelabs/tendermint
synced 2025-06-25 02:31:46 +00:00
Fix WSClient blocking in the readRoutine after Stop() as it tries to write to ResultsCh
This commit is contained in:
@ -52,6 +52,7 @@ type WSClient struct {
|
|||||||
backlog chan types.RPCRequest // stores a single user request received during a conn failure
|
backlog chan types.RPCRequest // stores a single user request received during a conn failure
|
||||||
reconnectAfter chan error // reconnect requests
|
reconnectAfter chan error // reconnect requests
|
||||||
readRoutineQuit chan struct{} // a way for readRoutine to close writeRoutine
|
readRoutineQuit chan struct{} // a way for readRoutine to close writeRoutine
|
||||||
|
writeRoutineQuit chan struct{} // a way for writeRoutine to close readRoutine (on <-BaseService.Quit)
|
||||||
|
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
|
|
||||||
@ -282,6 +283,7 @@ func (c *WSClient) reconnect() error {
|
|||||||
func (c *WSClient) startReadWriteRoutines() {
|
func (c *WSClient) startReadWriteRoutines() {
|
||||||
c.wg.Add(2)
|
c.wg.Add(2)
|
||||||
c.readRoutineQuit = make(chan struct{})
|
c.readRoutineQuit = make(chan struct{})
|
||||||
|
c.writeRoutineQuit = make(chan struct{})
|
||||||
go c.readRoutine()
|
go c.readRoutine()
|
||||||
go c.writeRoutine()
|
go c.writeRoutine()
|
||||||
}
|
}
|
||||||
@ -387,6 +389,9 @@ func (c *WSClient) writeRoutine() {
|
|||||||
case <-c.readRoutineQuit:
|
case <-c.readRoutineQuit:
|
||||||
return
|
return
|
||||||
case <-c.Quit:
|
case <-c.Quit:
|
||||||
|
// We need to fan out the quit message from the single BaseService Quit Channel to the readRoutine
|
||||||
|
// Use a non-blocking close rather than a send in case readRoutine is in the process of quitting
|
||||||
|
close(c.writeRoutineQuit)
|
||||||
c.conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
|
c.conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -441,7 +446,12 @@ func (c *WSClient) readRoutine() {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
c.Logger.Info("got response", "resp", response.Result)
|
c.Logger.Info("got response", "resp", response.Result)
|
||||||
c.ResultsCh <- *response.Result
|
// Combine a non-blocking read on writeRoutineQuit with a non-blocking write on ResultsCh to avoid blocking
|
||||||
|
// c.wg.Wait() in c.Stop()
|
||||||
|
select {
|
||||||
|
case <-c.writeRoutineQuit:
|
||||||
|
case c.ResultsCh <- *response.Result:
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -162,6 +162,29 @@ func TestWSClientReconnectFailure(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestNotBlockingOnStop(t *testing.T) {
|
||||||
|
timeout := 2 *time.Second
|
||||||
|
s := httptest.NewServer(&myHandler{})
|
||||||
|
c := startClient(t, s.Listener.Addr())
|
||||||
|
c.Call(context.Background(), "a", make(map[string]interface{}))
|
||||||
|
// Let the readRoutine get around to blocking
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
passCh := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
// Unless we have a non-blocking write to ResultsCh from readRoutine
|
||||||
|
// this blocks forever ont the waitgroup
|
||||||
|
c.Stop()
|
||||||
|
passCh <- struct{}{}
|
||||||
|
}()
|
||||||
|
select {
|
||||||
|
case <-passCh:
|
||||||
|
// Pass
|
||||||
|
case <-time.After(timeout):
|
||||||
|
t.Fatalf("WSClient did failed to stop within %v seconds - is one of the read/write routines blocking?",
|
||||||
|
timeout.Seconds())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func startClient(t *testing.T, addr net.Addr) *WSClient {
|
func startClient(t *testing.T, addr net.Addr) *WSClient {
|
||||||
c := NewWSClient(addr.String(), "/websocket")
|
c := NewWSClient(addr.String(), "/websocket")
|
||||||
_, err := c.Start()
|
_, err := c.Start()
|
||||||
|
Reference in New Issue
Block a user