mirror of
https://github.com/fluencelabs/tendermint
synced 2025-06-02 16:11:21 +00:00
backlog must always have higher priority
This commit is contained in:
parent
797acbe911
commit
236489aecf
@ -247,6 +247,24 @@ func (c *WSClient) startReadWriteRoutines() {
|
|||||||
go c.writeRoutine()
|
go c.writeRoutine()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *WSClient) processBacklog() error {
|
||||||
|
select {
|
||||||
|
case request := <-c.backlog:
|
||||||
|
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
|
||||||
|
err := c.conn.WriteJSON(request)
|
||||||
|
if err != nil {
|
||||||
|
c.Logger.Error("failed to resend request", "err", err)
|
||||||
|
c.reconnectAfter <- err
|
||||||
|
// requeue request
|
||||||
|
c.backlog <- request
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
c.Logger.Info("resend a request", "req", request)
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (c *WSClient) reconnectRoutine() {
|
func (c *WSClient) reconnectRoutine() {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
@ -268,7 +286,10 @@ func (c *WSClient) reconnectRoutine() {
|
|||||||
break LOOP
|
break LOOP
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
c.startReadWriteRoutines()
|
err = c.processBacklog()
|
||||||
|
if err == nil {
|
||||||
|
c.startReadWriteRoutines()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
case <-c.Quit:
|
case <-c.Quit:
|
||||||
return
|
return
|
||||||
@ -288,17 +309,6 @@ func (c *WSClient) writeRoutine() {
|
|||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case request := <-c.backlog:
|
|
||||||
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
|
|
||||||
err := c.conn.WriteJSON(request)
|
|
||||||
if err != nil {
|
|
||||||
c.Logger.Error("failed to resend request", "err", err)
|
|
||||||
c.reconnectAfter <- err
|
|
||||||
// add request to the backlog, so we don't lose it
|
|
||||||
c.backlog <- request
|
|
||||||
return
|
|
||||||
}
|
|
||||||
c.Logger.Info("resend a request", "req", request)
|
|
||||||
case request := <-c.send:
|
case request := <-c.send:
|
||||||
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
|
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
|
||||||
err := c.conn.WriteJSON(request)
|
err := c.conn.WriteJSON(request)
|
||||||
|
@ -138,7 +138,10 @@ func TestWSClientReconnectFailure(t *testing.T) {
|
|||||||
s.Close()
|
s.Close()
|
||||||
|
|
||||||
// results in WS write error
|
// results in WS write error
|
||||||
call(t, "a", c)
|
// provide timeout to avoid blocking
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||||
|
defer cancel()
|
||||||
|
c.Call(ctx, "a", make(map[string]interface{}))
|
||||||
|
|
||||||
// expect to reconnect almost immediately
|
// expect to reconnect almost immediately
|
||||||
time.Sleep(10 * time.Millisecond)
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user