2019-07-25 16:02:47 +02:00
|
|
|
package v2
|
|
|
|
|
|
|
|
import (
|
|
|
|
"fmt"
|
2019-07-31 13:58:21 +02:00
|
|
|
"sync/atomic"
|
2019-08-02 10:58:39 +02:00
|
|
|
|
|
|
|
"github.com/tendermint/tendermint/libs/log"
|
2019-07-25 16:02:47 +02:00
|
|
|
)
|
|
|
|
|
2019-07-30 12:17:51 +02:00
|
|
|
// TODO
|
2019-08-03 08:32:52 +02:00
|
|
|
// * revisit panic conditions
|
|
|
|
// * audit log levels
|
|
|
|
// * maybe routine should be an interface and the concret tpe should be handlerRoutine
|
2019-07-30 12:17:51 +02:00
|
|
|
|
2019-08-03 08:32:52 +02:00
|
|
|
// Adding Metrics
|
|
|
|
// we need a metrics definition
|
2019-07-31 13:58:21 +02:00
|
|
|
type handleFunc = func(event Event) (Events, error)
|
2019-07-27 10:58:58 +02:00
|
|
|
|
2019-07-29 08:45:59 +02:00
|
|
|
type Routine struct {
|
2019-07-29 17:56:56 +02:00
|
|
|
name string
|
|
|
|
input chan Event
|
|
|
|
errors chan error
|
|
|
|
output chan Event
|
|
|
|
stopped chan struct{}
|
2019-07-31 13:58:21 +02:00
|
|
|
finished chan error
|
|
|
|
running *uint32
|
2019-07-29 17:56:56 +02:00
|
|
|
handle handleFunc
|
2019-08-03 08:32:52 +02:00
|
|
|
logger log.Logger
|
|
|
|
metrics *Metrics
|
2019-07-25 16:02:47 +02:00
|
|
|
}
|
|
|
|
|
2019-08-03 07:50:58 +02:00
|
|
|
func newRoutine(name string, output chan Event, handleFunc handleFunc) *Routine {
|
2019-07-29 08:45:59 +02:00
|
|
|
return &Routine{
|
2019-07-29 17:56:56 +02:00
|
|
|
name: name,
|
|
|
|
input: make(chan Event, 1),
|
2019-08-02 10:58:39 +02:00
|
|
|
handle: handleFunc,
|
2019-07-29 17:56:56 +02:00
|
|
|
errors: make(chan error, 1),
|
|
|
|
output: output,
|
|
|
|
stopped: make(chan struct{}, 1),
|
2019-07-31 13:58:21 +02:00
|
|
|
finished: make(chan error, 1),
|
|
|
|
running: new(uint32),
|
2019-08-03 07:50:58 +02:00
|
|
|
logger: log.NewNopLogger(),
|
2019-08-03 08:32:52 +02:00
|
|
|
metrics: NopMetrics(),
|
2019-07-27 10:58:58 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-08-03 07:50:58 +02:00
|
|
|
func (rt *Routine) setLogger(logger log.Logger) {
|
|
|
|
rt.logger = logger
|
|
|
|
}
|
|
|
|
|
2019-08-03 08:32:52 +02:00
|
|
|
func (rt *Routine) setMetrics(metrics *Metrics) {
|
|
|
|
rt.metrics = metrics
|
|
|
|
}
|
|
|
|
|
2019-07-29 08:45:59 +02:00
|
|
|
func (rt *Routine) run() {
|
2019-08-02 10:58:39 +02:00
|
|
|
rt.logger.Info(fmt.Sprintf("%s: run\n", rt.name))
|
2019-07-31 13:58:21 +02:00
|
|
|
starting := atomic.CompareAndSwapUint32(rt.running, uint32(0), uint32(1))
|
|
|
|
if !starting {
|
|
|
|
panic("Routine has already started")
|
|
|
|
}
|
|
|
|
errorsDrained := false
|
2019-07-25 16:02:47 +02:00
|
|
|
for {
|
2019-07-31 13:58:21 +02:00
|
|
|
if !rt.isRunning() {
|
|
|
|
break
|
|
|
|
}
|
2019-07-29 12:40:59 +02:00
|
|
|
select {
|
|
|
|
case iEvent, ok := <-rt.input:
|
2019-08-03 08:32:52 +02:00
|
|
|
rt.metrics.EventsIn.With("routine", rt.name).Add(1)
|
2019-07-29 12:40:59 +02:00
|
|
|
if !ok {
|
2019-07-31 13:58:21 +02:00
|
|
|
if !errorsDrained {
|
|
|
|
continue // wait for errors to be drainned
|
|
|
|
}
|
2019-08-02 10:58:39 +02:00
|
|
|
rt.logger.Info(fmt.Sprintf("%s: stopping\n", rt.name))
|
2019-07-29 12:40:59 +02:00
|
|
|
rt.stopped <- struct{}{}
|
2019-07-29 12:47:43 +02:00
|
|
|
return
|
2019-07-29 12:40:59 +02:00
|
|
|
}
|
2019-07-31 13:58:21 +02:00
|
|
|
oEvents, err := rt.handle(iEvent)
|
2019-08-03 08:32:52 +02:00
|
|
|
rt.metrics.EventsHandled.With("routine", rt.name).Add(1)
|
2019-07-31 13:58:21 +02:00
|
|
|
if err != nil {
|
|
|
|
rt.terminate(err)
|
|
|
|
return
|
|
|
|
}
|
2019-08-03 08:32:52 +02:00
|
|
|
rt.metrics.EventsOut.With("routine", rt.name).Add(float64(len(oEvents)))
|
2019-08-02 10:58:39 +02:00
|
|
|
rt.logger.Info(fmt.Sprintf("%s handled %d events\n", rt.name, len(oEvents)))
|
2019-07-29 12:40:59 +02:00
|
|
|
for _, event := range oEvents {
|
2019-08-02 10:58:39 +02:00
|
|
|
rt.logger.Info(fmt.Sprintln("writting back to output"))
|
2019-07-29 12:40:59 +02:00
|
|
|
rt.output <- event
|
|
|
|
}
|
|
|
|
case iEvent, ok := <-rt.errors:
|
2019-08-03 08:32:52 +02:00
|
|
|
rt.metrics.ErrorsIn.With("routine", rt.name).Add(1)
|
2019-07-29 12:40:59 +02:00
|
|
|
if !ok {
|
2019-08-02 10:58:39 +02:00
|
|
|
rt.logger.Info(fmt.Sprintf("%s: errors closed\n", rt.name))
|
2019-07-31 13:58:21 +02:00
|
|
|
errorsDrained = true
|
2019-07-29 12:40:59 +02:00
|
|
|
continue
|
|
|
|
}
|
2019-07-31 13:58:21 +02:00
|
|
|
oEvents, err := rt.handle(iEvent)
|
2019-08-03 08:32:52 +02:00
|
|
|
rt.metrics.ErrorsHandled.With("routine", rt.name).Add(1)
|
2019-07-31 13:58:21 +02:00
|
|
|
if err != nil {
|
|
|
|
rt.terminate(err)
|
|
|
|
return
|
|
|
|
}
|
2019-08-03 08:32:52 +02:00
|
|
|
rt.metrics.ErrorsOut.With("routine", rt.name).Add(float64(len(oEvents)))
|
2019-07-29 12:40:59 +02:00
|
|
|
for _, event := range oEvents {
|
|
|
|
rt.output <- event
|
|
|
|
}
|
2019-07-27 00:25:32 +02:00
|
|
|
}
|
2019-07-25 16:02:47 +02:00
|
|
|
}
|
|
|
|
}
|
2019-07-29 17:56:56 +02:00
|
|
|
func (rt *Routine) feedback() {
|
|
|
|
for event := range rt.output {
|
|
|
|
rt.send(event)
|
|
|
|
}
|
|
|
|
}
|
2019-07-25 16:02:47 +02:00
|
|
|
|
2019-07-29 08:45:59 +02:00
|
|
|
func (rt *Routine) send(event Event) bool {
|
2019-07-29 17:56:56 +02:00
|
|
|
if err, ok := event.(error); ok {
|
2019-07-29 12:40:59 +02:00
|
|
|
select {
|
|
|
|
case rt.errors <- err:
|
2019-08-03 08:32:52 +02:00
|
|
|
rt.metrics.ErrorsSent.With("routine", rt.name).Add(1)
|
2019-07-29 12:40:59 +02:00
|
|
|
return true
|
|
|
|
default:
|
2019-08-03 08:32:52 +02:00
|
|
|
rt.metrics.ErrorsShed.With("routine", rt.name).Add(1)
|
2019-08-02 10:58:39 +02:00
|
|
|
rt.logger.Info(fmt.Sprintf("%s: errors channel was full\n", rt.name))
|
2019-07-29 12:40:59 +02:00
|
|
|
return false
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
select {
|
|
|
|
case rt.input <- event:
|
2019-08-03 08:32:52 +02:00
|
|
|
rt.metrics.EventsSent.With("routine", rt.name).Add(1)
|
2019-07-29 12:40:59 +02:00
|
|
|
return true
|
|
|
|
default:
|
2019-08-03 08:32:52 +02:00
|
|
|
rt.metrics.EventsShed.With("routine", rt.name).Add(1)
|
2019-08-02 10:58:39 +02:00
|
|
|
rt.logger.Info(fmt.Sprintf("%s: channel was full\n", rt.name))
|
2019-07-29 12:40:59 +02:00
|
|
|
return false
|
|
|
|
}
|
2019-07-27 00:25:32 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-07-31 13:58:21 +02:00
|
|
|
func (rt *Routine) isRunning() bool {
|
|
|
|
return atomic.LoadUint32(rt.running) == 1
|
|
|
|
}
|
|
|
|
|
|
|
|
// rename flush?
|
2019-07-29 08:45:59 +02:00
|
|
|
func (rt *Routine) stop() {
|
2019-08-02 10:58:39 +02:00
|
|
|
// XXX: what if already stopped?
|
|
|
|
rt.logger.Info(fmt.Sprintf("%s: stop\n", rt.name))
|
2019-07-29 08:45:59 +02:00
|
|
|
close(rt.input)
|
2019-07-31 13:58:21 +02:00
|
|
|
close(rt.errors)
|
2019-08-02 10:58:39 +02:00
|
|
|
<-rt.stopped
|
2019-07-31 13:58:21 +02:00
|
|
|
rt.terminate(fmt.Errorf("routine stopped"))
|
|
|
|
}
|
|
|
|
|
|
|
|
func (rt *Routine) terminate(reason error) {
|
|
|
|
stopped := atomic.CompareAndSwapUint32(rt.running, uint32(1), uint32(0))
|
|
|
|
if !stopped {
|
|
|
|
panic("called stop but already stopped")
|
|
|
|
}
|
|
|
|
rt.finished <- reason
|
2019-07-29 08:45:59 +02:00
|
|
|
}
|
|
|
|
|
2019-07-29 17:56:56 +02:00
|
|
|
// XXX: this should probably produced the finished
|
|
|
|
// channel and let the caller deicde how long to wait
|
2019-07-31 13:58:21 +02:00
|
|
|
func (rt *Routine) wait() error {
|
|
|
|
return <-rt.finished
|
2019-07-29 17:56:56 +02:00
|
|
|
}
|