From f9a5f17f91297a77890e882afdbb2d64e5a42999 Mon Sep 17 00:00:00 2001 From: Sean Braithwaite Date: Sat, 3 Aug 2019 08:32:52 +0200 Subject: [PATCH] Add metrics --- blockchain/v2/metrics.go | 124 +++++++++++++++++++++++++++++++++++++++ blockchain/v2/routine.go | 28 +++++++-- 2 files changed, 146 insertions(+), 6 deletions(-) create mode 100644 blockchain/v2/metrics.go diff --git a/blockchain/v2/metrics.go b/blockchain/v2/metrics.go new file mode 100644 index 00000000..d865e736 --- /dev/null +++ b/blockchain/v2/metrics.go @@ -0,0 +1,124 @@ +package v2 + +import ( + "github.com/go-kit/kit/metrics" + "github.com/go-kit/kit/metrics/discard" + "github.com/go-kit/kit/metrics/prometheus" + stdprometheus "github.com/prometheus/client_golang/prometheus" +) + +const ( + // MetricsSubsystem is a subsystem shared by all metrics exposed by this + // package. + MetricsSubsystem = "blockchain" +) + +// Metrics contains metrics exposed by this package. +type Metrics struct { + // events_in + EventsIn metrics.Counter + // events_in + EventsHandled metrics.Counter + // events_out + EventsOut metrics.Counter + // errors_in + ErrorsIn metrics.Counter + // errors_handled + ErrorsHandled metrics.Counter + // errors_out + ErrorsOut metrics.Counter + // events_shed + EventsShed metrics.Counter + // events_sent + EventsSent metrics.Counter + // errors_sent + ErrorsSent metrics.Counter + // errors_shed + ErrorsShed metrics.Counter +} + +// Can we burn in the routine name here? +func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { + labels := []string{} + for i := 0; i < len(labelsAndValues); i += 2 { + labels = append(labels, labelsAndValues[i]) + } + return &Metrics{ + EventsIn: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "events_in", + Help: "Events read from the channel.", + }, labels).With(labelsAndValues...), + EventsHandled: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "events_handled", + Help: "Events handled", + }, labels).With(labelsAndValues...), + EventsOut: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "events_out", + Help: "Events output from routine.", + }, labels).With(labelsAndValues...), + ErrorsIn: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "errors_in", + Help: "Errors read from the channel.", + }, labels).With(labelsAndValues...), + ErrorsHandled: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "errors_handled", + Help: "Errors handled.", + }, labels).With(labelsAndValues...), + ErrorsOut: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "errors_out", + Help: "Errors output from routine.", + }, labels).With(labelsAndValues...), + ErrorsSent: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "errors_sent", + Help: "Errors sent to routine.", + }, labels).With(labelsAndValues...), + ErrorsShed: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "errors_shed", + Help: "Errors dropped from sending.", + }, labels).With(labelsAndValues...), + EventsSent: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "events_sent", + Help: "Events sent to routine.", + }, labels).With(labelsAndValues...), + EventsShed: prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: namespace, + Subsystem: MetricsSubsystem, + Name: "events_shed", + Help: "Events dropped from sending.", + }, labels).With(labelsAndValues...), + } +} + +// NopMetrics returns no-op Metrics. +func NopMetrics() *Metrics { + return &Metrics{ + EventsIn: discard.NewCounter(), + EventsHandled: discard.NewCounter(), + EventsOut: discard.NewCounter(), + ErrorsIn: discard.NewCounter(), + ErrorsHandled: discard.NewCounter(), + ErrorsOut: discard.NewCounter(), + EventsShed: discard.NewCounter(), + EventsSent: discard.NewCounter(), + ErrorsSent: discard.NewCounter(), + ErrorsShed: discard.NewCounter(), + } +} diff --git a/blockchain/v2/routine.go b/blockchain/v2/routine.go index b346b0af..31b91892 100644 --- a/blockchain/v2/routine.go +++ b/blockchain/v2/routine.go @@ -8,23 +8,25 @@ import ( ) // TODO -// metrics -// revisit panic conditions -// audit log levels -// maybe routine should be an interface and the concret tpe should be handlerRoutine +// * revisit panic conditions +// * audit log levels +// * maybe routine should be an interface and the concret tpe should be handlerRoutine +// Adding Metrics +// we need a metrics definition type handleFunc = func(event Event) (Events, error) type Routine struct { name string input chan Event errors chan error - logger log.Logger output chan Event stopped chan struct{} finished chan error running *uint32 handle handleFunc + logger log.Logger + metrics *Metrics } func newRoutine(name string, output chan Event, handleFunc handleFunc) *Routine { @@ -38,6 +40,7 @@ func newRoutine(name string, output chan Event, handleFunc handleFunc) *Routine finished: make(chan error, 1), running: new(uint32), logger: log.NewNopLogger(), + metrics: NopMetrics(), } } @@ -45,6 +48,10 @@ func (rt *Routine) setLogger(logger log.Logger) { rt.logger = logger } +func (rt *Routine) setMetrics(metrics *Metrics) { + rt.metrics = metrics +} + func (rt *Routine) run() { rt.logger.Info(fmt.Sprintf("%s: run\n", rt.name)) starting := atomic.CompareAndSwapUint32(rt.running, uint32(0), uint32(1)) @@ -58,6 +65,7 @@ func (rt *Routine) run() { } select { case iEvent, ok := <-rt.input: + rt.metrics.EventsIn.With("routine", rt.name).Add(1) if !ok { if !errorsDrained { continue // wait for errors to be drainned @@ -67,27 +75,31 @@ func (rt *Routine) run() { return } oEvents, err := rt.handle(iEvent) + rt.metrics.EventsHandled.With("routine", rt.name).Add(1) if err != nil { rt.terminate(err) return } - + rt.metrics.EventsOut.With("routine", rt.name).Add(float64(len(oEvents))) rt.logger.Info(fmt.Sprintf("%s handled %d events\n", rt.name, len(oEvents))) for _, event := range oEvents { rt.logger.Info(fmt.Sprintln("writting back to output")) rt.output <- event } case iEvent, ok := <-rt.errors: + rt.metrics.ErrorsIn.With("routine", rt.name).Add(1) if !ok { rt.logger.Info(fmt.Sprintf("%s: errors closed\n", rt.name)) errorsDrained = true continue } oEvents, err := rt.handle(iEvent) + rt.metrics.ErrorsHandled.With("routine", rt.name).Add(1) if err != nil { rt.terminate(err) return } + rt.metrics.ErrorsOut.With("routine", rt.name).Add(float64(len(oEvents))) for _, event := range oEvents { rt.output <- event } @@ -104,16 +116,20 @@ func (rt *Routine) send(event Event) bool { if err, ok := event.(error); ok { select { case rt.errors <- err: + rt.metrics.ErrorsSent.With("routine", rt.name).Add(1) return true default: + rt.metrics.ErrorsShed.With("routine", rt.name).Add(1) rt.logger.Info(fmt.Sprintf("%s: errors channel was full\n", rt.name)) return false } } else { select { case rt.input <- event: + rt.metrics.EventsSent.With("routine", rt.name).Add(1) return true default: + rt.metrics.EventsShed.With("routine", rt.name).Add(1) rt.logger.Info(fmt.Sprintf("%s: channel was full\n", rt.name)) return false }