mirror of
https://github.com/fluencelabs/tendermint
synced 2025-06-27 19:51:41 +00:00
merge go-event-meter into tm-monitor (Refs #4)
https://github.com/tendermint/go-event-meter
This commit is contained in:
300
tm-monitor/eventmeter/eventmeter.go
Normal file
300
tm-monitor/eventmeter/eventmeter.go
Normal file
@ -0,0 +1,300 @@
|
||||
package eventmeter
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
metrics "github.com/rcrowley/go-metrics"
|
||||
events "github.com/tendermint/go-events"
|
||||
client "github.com/tendermint/go-rpc/client"
|
||||
log15 "github.com/tendermint/log15"
|
||||
)
|
||||
|
||||
// Log allows you to set your own logger.
|
||||
var Log log15.Logger
|
||||
|
||||
//------------------------------------------------------
|
||||
// Generic system to subscribe to events and record their frequency
|
||||
//------------------------------------------------------
|
||||
|
||||
//------------------------------------------------------
|
||||
// Meter for a particular event
|
||||
|
||||
// Closure to enable side effects from receiving an event
|
||||
type EventCallbackFunc func(em *EventMetric, data events.EventData)
|
||||
|
||||
// Metrics for a given event
|
||||
type EventMetric struct {
|
||||
ID string `json:"id"`
|
||||
Started time.Time `json:"start_time"`
|
||||
LastHeard time.Time `json:"last_heard"`
|
||||
MinDuration int64 `json:"min_duration"`
|
||||
MaxDuration int64 `json:"max_duration"`
|
||||
|
||||
// tracks event count and rate
|
||||
meter metrics.Meter
|
||||
|
||||
// filled in from the Meter
|
||||
Count int64 `json:"count"`
|
||||
Rate1 float64 `json:"rate_1" wire:"unsafe"`
|
||||
Rate5 float64 `json:"rate_5" wire:"unsafe"`
|
||||
Rate15 float64 `json:"rate_15" wire:"unsafe"`
|
||||
RateMean float64 `json:"rate_mean" wire:"unsafe"`
|
||||
|
||||
// so the event can have effects in the event-meter's consumer.
|
||||
// runs in a go routine
|
||||
callback EventCallbackFunc
|
||||
}
|
||||
|
||||
func (metric *EventMetric) Copy() *EventMetric {
|
||||
metric2 := *metric
|
||||
metric2.meter = metric.meter.Snapshot()
|
||||
return &metric2
|
||||
}
|
||||
|
||||
// called on GetMetric
|
||||
func (metric *EventMetric) fillMetric() *EventMetric {
|
||||
metric.Count = metric.meter.Count()
|
||||
metric.Rate1 = metric.meter.Rate1()
|
||||
metric.Rate5 = metric.meter.Rate5()
|
||||
metric.Rate15 = metric.meter.Rate15()
|
||||
metric.RateMean = metric.meter.RateMean()
|
||||
return metric
|
||||
}
|
||||
|
||||
//------------------------------------------------------
|
||||
// Websocket client and event meter for many events
|
||||
|
||||
const maxPingsPerPong = 30 // if we haven't received a pong in this many attempted pings we kill the conn
|
||||
|
||||
// Get the eventID and data out of the raw json received over the go-rpc websocket
|
||||
type EventUnmarshalFunc func(b json.RawMessage) (string, events.EventData, error)
|
||||
|
||||
// Closure to enable side effects from receiving a pong
|
||||
type LatencyCallbackFunc func(meanLatencyNanoSeconds float64)
|
||||
|
||||
// Closure to notify consumer that the connection died
|
||||
type DisconnectCallbackFunc func()
|
||||
|
||||
// Each node gets an event meter to track events for that node
|
||||
type EventMeter struct {
|
||||
wsc *client.WSClient
|
||||
|
||||
mtx sync.Mutex
|
||||
events map[string]*EventMetric
|
||||
|
||||
// to record ws latency
|
||||
timer metrics.Timer
|
||||
lastPing time.Time
|
||||
receivedPong bool
|
||||
latencyCallback LatencyCallbackFunc
|
||||
disconnectCallback DisconnectCallbackFunc
|
||||
|
||||
unmarshalEvent EventUnmarshalFunc
|
||||
|
||||
quit chan struct{}
|
||||
}
|
||||
|
||||
func NewEventMeter(addr string, unmarshalEvent EventUnmarshalFunc) *EventMeter {
|
||||
em := &EventMeter{
|
||||
wsc: client.NewWSClient(addr, "/websocket"),
|
||||
events: make(map[string]*EventMetric),
|
||||
timer: metrics.NewTimer(),
|
||||
receivedPong: true,
|
||||
unmarshalEvent: unmarshalEvent,
|
||||
quit: make(chan struct{}),
|
||||
}
|
||||
return em
|
||||
}
|
||||
|
||||
func (em *EventMeter) String() string {
|
||||
return em.wsc.Address
|
||||
}
|
||||
|
||||
func (em *EventMeter) Start() error {
|
||||
if _, err := em.wsc.Start(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
em.wsc.Conn.SetPongHandler(func(m string) error {
|
||||
// NOTE: https://github.com/gorilla/websocket/issues/97
|
||||
em.mtx.Lock()
|
||||
defer em.mtx.Unlock()
|
||||
em.receivedPong = true
|
||||
em.timer.UpdateSince(em.lastPing)
|
||||
if em.latencyCallback != nil {
|
||||
go em.latencyCallback(em.timer.Mean())
|
||||
}
|
||||
return nil
|
||||
})
|
||||
go em.receiveRoutine()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (em *EventMeter) Stop() {
|
||||
<-em.quit
|
||||
|
||||
em.RegisterDisconnectCallback(nil) // so we don't try and reconnect
|
||||
em.wsc.Stop() // close(wsc.Quit)
|
||||
}
|
||||
|
||||
func (em *EventMeter) StopAndReconnect() {
|
||||
em.wsc.Stop()
|
||||
|
||||
em.mtx.Lock()
|
||||
defer em.mtx.Unlock()
|
||||
if em.disconnectCallback != nil {
|
||||
go em.disconnectCallback()
|
||||
}
|
||||
}
|
||||
|
||||
func (em *EventMeter) Subscribe(eventID string, cb EventCallbackFunc) error {
|
||||
em.mtx.Lock()
|
||||
defer em.mtx.Unlock()
|
||||
|
||||
if _, ok := em.events[eventID]; ok {
|
||||
return fmt.Errorf("subscribtion already exists")
|
||||
}
|
||||
if err := em.wsc.Subscribe(eventID); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
metric := &EventMetric{
|
||||
ID: eventID,
|
||||
Started: time.Now(),
|
||||
MinDuration: 1 << 62,
|
||||
meter: metrics.NewMeter(),
|
||||
callback: cb,
|
||||
}
|
||||
em.events[eventID] = metric
|
||||
return nil
|
||||
}
|
||||
|
||||
func (em *EventMeter) Unsubscribe(eventID string) error {
|
||||
em.mtx.Lock()
|
||||
defer em.mtx.Unlock()
|
||||
if err := em.wsc.Unsubscribe(eventID); err != nil {
|
||||
return err
|
||||
}
|
||||
// XXX: should we persist or save this info first?
|
||||
delete(em.events, eventID)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Fill in the latest data for an event and return a copy
|
||||
func (em *EventMeter) GetMetric(eventID string) (*EventMetric, error) {
|
||||
em.mtx.Lock()
|
||||
defer em.mtx.Unlock()
|
||||
metric, ok := em.events[eventID]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("Unknown event %s", eventID)
|
||||
}
|
||||
return metric.fillMetric().Copy(), nil
|
||||
}
|
||||
|
||||
// Return the average latency over the websocket
|
||||
func (em *EventMeter) Latency() float64 {
|
||||
em.mtx.Lock()
|
||||
defer em.mtx.Unlock()
|
||||
return em.timer.Mean()
|
||||
}
|
||||
|
||||
func (em *EventMeter) RegisterLatencyCallback(f LatencyCallbackFunc) {
|
||||
em.mtx.Lock()
|
||||
defer em.mtx.Unlock()
|
||||
em.latencyCallback = f
|
||||
}
|
||||
|
||||
func (em *EventMeter) RegisterDisconnectCallback(f DisconnectCallbackFunc) {
|
||||
em.mtx.Lock()
|
||||
defer em.mtx.Unlock()
|
||||
em.disconnectCallback = f
|
||||
}
|
||||
|
||||
//------------------------------------------------------
|
||||
|
||||
func (em *EventMeter) receiveRoutine() {
|
||||
pingTime := time.Second * 1
|
||||
pingTicker := time.NewTicker(pingTime)
|
||||
pingAttempts := 0 // if this hits maxPingsPerPong we kill the conn
|
||||
var err error
|
||||
for {
|
||||
select {
|
||||
case <-pingTicker.C:
|
||||
if pingAttempts, err = em.pingForLatency(pingAttempts); err != nil {
|
||||
Log.Error("Failed to write ping message on websocket", err)
|
||||
em.StopAndReconnect()
|
||||
return
|
||||
} else if pingAttempts >= maxPingsPerPong {
|
||||
Log.Error(fmt.Sprintf("Have not received a pong in %v", time.Duration(pingAttempts)*pingTime))
|
||||
em.StopAndReconnect()
|
||||
return
|
||||
}
|
||||
case r := <-em.wsc.ResultsCh:
|
||||
if r == nil {
|
||||
em.StopAndReconnect()
|
||||
return
|
||||
}
|
||||
eventID, data, err := em.unmarshalEvent(r)
|
||||
if err != nil {
|
||||
Log.Error(err.Error())
|
||||
continue
|
||||
}
|
||||
if eventID != "" {
|
||||
em.updateMetric(eventID, data)
|
||||
}
|
||||
case <-em.wsc.Quit:
|
||||
em.StopAndReconnect()
|
||||
return
|
||||
case <-em.quit:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (em *EventMeter) pingForLatency(pingAttempts int) (int, error) {
|
||||
em.mtx.Lock()
|
||||
defer em.mtx.Unlock()
|
||||
|
||||
// ping to record latency
|
||||
if !em.receivedPong {
|
||||
return pingAttempts + 1, nil
|
||||
}
|
||||
|
||||
em.lastPing = time.Now()
|
||||
em.receivedPong = false
|
||||
err := em.wsc.Conn.WriteMessage(websocket.PingMessage, []byte{})
|
||||
if err != nil {
|
||||
return pingAttempts, err
|
||||
}
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
func (em *EventMeter) updateMetric(eventID string, data events.EventData) {
|
||||
em.mtx.Lock()
|
||||
defer em.mtx.Unlock()
|
||||
|
||||
metric, ok := em.events[eventID]
|
||||
if !ok {
|
||||
// we already unsubscribed, or got an unexpected event
|
||||
return
|
||||
}
|
||||
|
||||
last := metric.LastHeard
|
||||
metric.LastHeard = time.Now()
|
||||
metric.meter.Mark(1)
|
||||
dur := int64(metric.LastHeard.Sub(last))
|
||||
if dur < metric.MinDuration {
|
||||
metric.MinDuration = dur
|
||||
}
|
||||
if !last.IsZero() && dur > metric.MaxDuration {
|
||||
metric.MaxDuration = dur
|
||||
}
|
||||
|
||||
if metric.callback != nil {
|
||||
go metric.callback(metric.Copy(), data)
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user