[tm-monitor] call latency callback in a separate goroutine

This commit is contained in:
Anton Kaliaev
2017-08-08 15:44:16 -04:00
parent 081bd0805e
commit 42f58ceb4b
2 changed files with 22 additions and 22 deletions

View File

@ -152,7 +152,7 @@ func (t *transacter) sendLoop(connIndex int) {
Params: &rawParamsJson, Params: &rawParamsJson,
}) })
if err != nil { if err != nil {
fmt.Printf("%v. Try increasing the connections count and reducing the rate.\n", errors.Wrap(err, "txs send failed")) fmt.Printf("%v. Try reducing the connections count and increasing the rate.\n", errors.Wrap(err, "txs send failed"))
os.Exit(1) os.Exit(1)
} }
@ -163,7 +163,7 @@ func (t *transacter) sendLoop(connIndex int) {
time.Sleep(time.Second - timeToSend) time.Sleep(time.Second - timeToSend)
logger.Info(fmt.Sprintf("sent %d transactions", t.Rate), "took", timeToSend) logger.Info(fmt.Sprintf("sent %d transactions", t.Rate), "took", timeToSend)
case <-pingsTicker.C: case <-pingsTicker.C:
// Right now go-rpc server closes the connection in the absence of pings // go-rpc server closes the connection in the absence of pings
c.SetWriteDeadline(time.Now().Add(sendTimeout)) c.SetWriteDeadline(time.Now().Add(sendTimeout))
if err := c.WriteMessage(websocket.PingMessage, []byte{}); err != nil { if err := c.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
logger.Error("failed to write ping message", "err", err) logger.Error("failed to write ping message", "err", err)

View File

@ -140,18 +140,6 @@ func (em *EventMeter) Stop() {
} }
} }
// StopAndCallDisconnectCallback stops the EventMeter and calls
// disconnectCallback if present.
func (em *EventMeter) StopAndCallDisconnectCallback() {
em.Stop()
em.mtx.Lock()
defer em.mtx.Unlock()
if em.disconnectCallback != nil {
go em.disconnectCallback()
}
}
// Subscribe for the given event type. Callback function will be called upon // Subscribe for the given event type. Callback function will be called upon
// receiving an event. // receiving an event.
func (em *EventMeter) Subscribe(eventType string, cb EventCallbackFunc) error { func (em *EventMeter) Subscribe(eventType string, cb EventCallbackFunc) error {
@ -245,11 +233,9 @@ func (em *EventMeter) receiveRoutine() {
} }
case <-latencyTicker.C: case <-latencyTicker.C:
if em.wsc.IsActive() { if em.wsc.IsActive() {
em.latencyCallback(em.wsc.PingPongLatencyTimer.Mean()) em.callLatencyCallback(em.wsc.PingPongLatencyTimer.Mean())
} }
case <-em.wsc.Quit: case <-em.wsc.Quit:
em.logger.Error("WebSocket client closed unexpectedly")
em.StopAndCallDisconnectCallback()
return return
case <-em.quit: case <-em.quit:
return return
@ -263,16 +249,14 @@ func (em *EventMeter) disconnectRoutine() {
select { select {
case <-ticker.C: case <-ticker.C:
if em.wsc.IsReconnecting() && em.subscribed { // notify user about disconnect only once if em.wsc.IsReconnecting() && em.subscribed { // notify user about disconnect only once
em.mtx.Lock() em.callDisconnectCallback()
if em.disconnectCallback != nil {
go em.disconnectCallback()
}
em.mtx.Unlock()
em.subscribed = false em.subscribed = false
} else if !em.wsc.IsReconnecting() && !em.subscribed { // resubscribe } else if !em.wsc.IsReconnecting() && !em.subscribed { // resubscribe
em.subscribe() em.subscribe()
em.subscribed = true em.subscribed = true
} }
case <-em.wsc.Quit:
return
case <-em.quit: case <-em.quit:
return return
} }
@ -304,3 +288,19 @@ func (em *EventMeter) updateMetric(eventType string, data events.EventData) {
go metric.callback(metric.Copy(), data) go metric.callback(metric.Copy(), data)
} }
} }
func (em *EventMeter) callDisconnectCallback() {
em.mtx.Lock()
if em.disconnectCallback != nil {
go em.disconnectCallback()
}
em.mtx.Unlock()
}
func (em *EventMeter) callLatencyCallback(meanLatencyNanoSeconds float64) {
em.mtx.Lock()
if em.latencyCallback != nil {
go em.latencyCallback(meanLatencyNanoSeconds)
}
em.mtx.Unlock()
}