Compare commits

...

12 Commits

Author SHA1 Message Date
Sean Braithwaite
9d41770a99 Close rdy channel
+ close `rdy` channel to ensure that calls to `<-ready()` will
    always return once the routine is ready
2019-08-21 21:37:53 +02:00
Sean Braithwaite
f81c319ece Add some docs 2019-08-13 19:29:28 +02:00
Sean Braithwaite
78d4c3b88a fixes based on feedback 2019-08-13 17:57:17 +02:00
Sean Braithwaite
2c8cbfc26a linter fixes 2019-08-08 17:42:46 +02:00
Sean Braithwaite
acbfe67fb8 set logger 2019-08-08 16:56:39 +02:00
Sean Braithwaite
aeac4743cc typo fix 2019-08-08 16:54:25 +02:00
Sean Braithwaite
e826ca3c49 demuxer cleanup 2019-08-08 16:48:07 +02:00
Sean Braithwaite
5b880fbcff cleanup events 2019-08-08 15:53:02 +02:00
Sean Braithwaite
c081b60ef6 Solidify API:
+ use `trySend` to replicate peer sending
    + expose `next()` as a chan of events as output
    + expose `final()` as a chan of error, for the final error
    + add `ready()` as chan struct when routine is ready
2019-08-08 15:12:11 +02:00
Sean Braithwaite
e4913f533a Fix race condition in shutdown:
+ ensure that we stop accepting messages once `stop` has been called
      to avoid the case in which we attempt to write to a channel which
      has already been closed
2019-08-06 18:00:14 +02:00
Sean Braithwaite
0cf9f86292 Modification based on feedback
+ `routine.send` returns false when the routine is not running
    + this prevents panics from sending to channels which have already
    been closed
    + Make output channels routine-specific, removing the risk of someone
    writing to a channel which was closed by another routine.
    + consistency changes between the routines and the demuxer
2019-08-06 13:27:15 +02:00
Sean Braithwaite
d1671d6175 blockchain v2: routines
+ Include an implementation of the routines specified in ADR-43
    along with a demuxer and some dummy reactor code
2019-08-03 09:19:32 +02:00
7 changed files with 748 additions and 0 deletions
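
Taken together, the commit messages above describe a non-blocking lifecycle for a routine: start it, wait on ready(), feed it events with trySend (which returns false rather than panicking or blocking), then stop it and read the terminal error from final(). A minimal in-package sketch of that lifecycle, assuming only the Routine API introduced in routine.go below; the exampleLifecycle name and the no-op handler are illustrative, not part of this change:

package v2

import "fmt"

// exampleLifecycle is a hypothetical illustration of the lifecycle described
// in the commits above: start the routine, block on ready(), send events with
// trySend, then stop and read the terminal error from final().
func exampleLifecycle() {
    rt := newRoutine("example", func(event Event) (Events, error) {
        return Events{}, nil // no-op handler, for illustration only
    })

    go rt.start()
    <-rt.ready() // rdy is closed once the routine is running, so this always returns

    if !rt.trySend(struct{}{}) {
        fmt.Println("routine not running, stopping, or input channel full")
    }

    rt.stop()                 // closes the input/error channels and waits for the routine to wind down
    fmt.Println(<-rt.final()) // terminal error, e.g. "stopped"
}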

blockchain/v2/demuxer.go Normal file (+166 lines)

@@ -0,0 +1,166 @@
// nolint:unused
package v2

import (
    "fmt"
    "sync/atomic"

    "github.com/tendermint/tendermint/libs/log"
)

type scFull struct{}
type pcFull struct{}

const demuxerBufferSize = 10

type demuxer struct {
    input     chan Event
    scheduler *Routine
    processor *Routine
    fin       chan error
    stopped   chan struct{}
    rdy       chan struct{}
    running   *uint32
    stopping  *uint32
    logger    log.Logger
}

func newDemuxer(scheduler *Routine, processor *Routine) *demuxer {
    return &demuxer{
        input:     make(chan Event, demuxerBufferSize),
        scheduler: scheduler,
        processor: processor,
        stopped:   make(chan struct{}, 1),
        fin:       make(chan error, 1),
        rdy:       make(chan struct{}, 1),
        running:   new(uint32),
        stopping:  new(uint32),
        logger:    log.NewNopLogger(),
    }
}

func (dm *demuxer) setLogger(logger log.Logger) {
    dm.logger = logger
}

func (dm *demuxer) start() {
    starting := atomic.CompareAndSwapUint32(dm.running, uint32(0), uint32(1))
    if !starting {
        panic("Routine has already started")
    }
    dm.logger.Info("demuxer: run")
    dm.rdy <- struct{}{}
    for {
        if !dm.isRunning() {
            break
        }
        select {
        case event, ok := <-dm.input:
            if !ok {
                dm.logger.Info("demuxer: stopping")
                dm.terminate(fmt.Errorf("stopped"))
                dm.stopped <- struct{}{}
                return
            }
            oEvents, err := dm.handle(event)
            if err != nil {
                dm.terminate(err)
                return
            }
            for _, event := range oEvents {
                dm.input <- event
            }
        case event, ok := <-dm.scheduler.next():
            if !ok {
                dm.logger.Info("demuxer: scheduler output closed")
                continue
            }
            oEvents, err := dm.handle(event)
            if err != nil {
                dm.terminate(err)
                return
            }
            for _, event := range oEvents {
                dm.input <- event
            }
        case event, ok := <-dm.processor.next():
            if !ok {
                dm.logger.Info("demuxer: processor output closed")
                continue
            }
            oEvents, err := dm.handle(event)
            if err != nil {
                dm.terminate(err)
                return
            }
            for _, event := range oEvents {
                dm.input <- event
            }
        }
    }
}

func (dm *demuxer) handle(event Event) (Events, error) {
    received := dm.scheduler.trySend(event)
    if !received {
        return Events{scFull{}}, nil // backpressure
    }

    received = dm.processor.trySend(event)
    if !received {
        return Events{pcFull{}}, nil // backpressure
    }

    return Events{}, nil
}

func (dm *demuxer) trySend(event Event) bool {
    if !dm.isRunning() || dm.isStopping() {
        dm.logger.Info("demuxer isn't running")
        return false
    }
    select {
    case dm.input <- event:
        return true
    default:
        dm.logger.Info("demuxer channel was full")
        return false
    }
}

func (dm *demuxer) isRunning() bool {
    return atomic.LoadUint32(dm.running) == 1
}

func (dm *demuxer) isStopping() bool {
    return atomic.LoadUint32(dm.stopping) == 1
}

func (dm *demuxer) ready() chan struct{} {
    return dm.rdy
}

func (dm *demuxer) stop() {
    if !dm.isRunning() {
        return
    }
    stopping := atomic.CompareAndSwapUint32(dm.stopping, uint32(0), uint32(1))
    if !stopping {
        panic("Demuxer has already stopped")
    }
    dm.logger.Info("demuxer stop")
    close(dm.input)
    <-dm.stopped
}

func (dm *demuxer) terminate(reason error) {
    stopped := atomic.CompareAndSwapUint32(dm.running, uint32(1), uint32(0))
    if !stopped {
        panic("called terminate but already terminated")
    }
    dm.fin <- reason
}

func (dm *demuxer) final() chan error {
    return dm.fin
}

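For orientation, here is a hedged sketch of how the demuxer above sits between two routines; Reactor.Start in reactor.go below follows the same pattern. The exampleDemuxer name and the no-op handlers are illustrative only.

package v2

// exampleDemuxer wires a scheduler and a processor routine into a demuxer,
// starts all three, waits for readiness, and shuts them down again.
func exampleDemuxer() error {
    scheduler := newRoutine("scheduler", func(event Event) (Events, error) { return Events{}, nil })
    processor := newRoutine("processor", func(event Event) (Events, error) { return Events{}, nil })
    dm := newDemuxer(scheduler, processor)

    go scheduler.start()
    go processor.start()
    go dm.start()
    <-scheduler.ready()
    <-processor.ready()
    <-dm.ready()

    // trySend never blocks or panics: it returns false when the demuxer is not
    // running, is stopping, or its buffered input (demuxerBufferSize) is full,
    // so callers can shed load instead of deadlocking.
    _ = dm.trySend(struct{}{})

    dm.stop()
    scheduler.stop()
    processor.stop()
    return <-dm.final() // terminal error, e.g. "stopped"
}
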
blockchain/v2/metrics.go Normal file (+124 lines)

@@ -0,0 +1,124 @@
package v2

import (
    "github.com/go-kit/kit/metrics"
    "github.com/go-kit/kit/metrics/discard"
    "github.com/go-kit/kit/metrics/prometheus"
    stdprometheus "github.com/prometheus/client_golang/prometheus"
)

const (
    // MetricsSubsystem is a subsystem shared by all metrics exposed by this
    // package.
    MetricsSubsystem = "blockchain"
)

// Metrics contains metrics exposed by this package.
type Metrics struct {
    // events_in
    EventsIn metrics.Counter
    // events_handled
    EventsHandled metrics.Counter
    // events_out
    EventsOut metrics.Counter
    // errors_in
    ErrorsIn metrics.Counter
    // errors_handled
    ErrorsHandled metrics.Counter
    // errors_out
    ErrorsOut metrics.Counter
    // events_shed
    EventsShed metrics.Counter
    // events_sent
    EventsSent metrics.Counter
    // errors_sent
    ErrorsSent metrics.Counter
    // errors_shed
    ErrorsShed metrics.Counter
}

// Can we burn in the routine name here?
func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
    labels := []string{}
    for i := 0; i < len(labelsAndValues); i += 2 {
        labels = append(labels, labelsAndValues[i])
    }
    return &Metrics{
        EventsIn: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
            Namespace: namespace,
            Subsystem: MetricsSubsystem,
            Name:      "events_in",
            Help:      "Events read from the channel.",
        }, labels).With(labelsAndValues...),
        EventsHandled: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
            Namespace: namespace,
            Subsystem: MetricsSubsystem,
            Name:      "events_handled",
            Help:      "Events handled",
        }, labels).With(labelsAndValues...),
        EventsOut: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
            Namespace: namespace,
            Subsystem: MetricsSubsystem,
            Name:      "events_out",
            Help:      "Events output from routine.",
        }, labels).With(labelsAndValues...),
        ErrorsIn: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
            Namespace: namespace,
            Subsystem: MetricsSubsystem,
            Name:      "errors_in",
            Help:      "Errors read from the channel.",
        }, labels).With(labelsAndValues...),
        ErrorsHandled: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
            Namespace: namespace,
            Subsystem: MetricsSubsystem,
            Name:      "errors_handled",
            Help:      "Errors handled.",
        }, labels).With(labelsAndValues...),
        ErrorsOut: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
            Namespace: namespace,
            Subsystem: MetricsSubsystem,
            Name:      "errors_out",
            Help:      "Errors output from routine.",
        }, labels).With(labelsAndValues...),
        ErrorsSent: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
            Namespace: namespace,
            Subsystem: MetricsSubsystem,
            Name:      "errors_sent",
            Help:      "Errors sent to routine.",
        }, labels).With(labelsAndValues...),
        ErrorsShed: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
            Namespace: namespace,
            Subsystem: MetricsSubsystem,
            Name:      "errors_shed",
            Help:      "Errors dropped from sending.",
        }, labels).With(labelsAndValues...),
        EventsSent: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
            Namespace: namespace,
            Subsystem: MetricsSubsystem,
            Name:      "events_sent",
            Help:      "Events sent to routine.",
        }, labels).With(labelsAndValues...),
        EventsShed: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
            Namespace: namespace,
            Subsystem: MetricsSubsystem,
            Name:      "events_shed",
            Help:      "Events dropped from sending.",
        }, labels).With(labelsAndValues...),
    }
}

// NopMetrics returns no-op Metrics.
func NopMetrics() *Metrics {
    return &Metrics{
        EventsIn:      discard.NewCounter(),
        EventsHandled: discard.NewCounter(),
        EventsOut:     discard.NewCounter(),
        ErrorsIn:      discard.NewCounter(),
        ErrorsHandled: discard.NewCounter(),
        ErrorsOut:     discard.NewCounter(),
        EventsShed:    discard.NewCounter(),
        EventsSent:    discard.NewCounter(),
        ErrorsSent:    discard.NewCounter(),
        ErrorsShed:    discard.NewCounter(),
    }
}

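The "Can we burn in the routine name here?" comment above asks whether the routine name could be fixed as a Prometheus label when the counters are constructed. A hedged sketch of what that could look like, using the setMetrics hook from routine.go below; the exampleMetrics helper and the "tendermint" namespace are assumptions, not part of this change:

package v2

// exampleMetrics is a hypothetical helper that burns the "routine" label in
// at construction time, so that the metrics.EventsIn.With("routine", name).Add(1)
// calls in routine.go resolve against a declared label.
func exampleMetrics(name string, prometheusEnabled bool) *Routine {
    rt := newRoutine(name, func(event Event) (Events, error) { return Events{}, nil })
    if prometheusEnabled {
        rt.setMetrics(PrometheusMetrics("tendermint", "routine", name))
    } else {
        rt.setMetrics(NopMetrics()) // the default: every counter is discarded
    }
    return rt
}
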
blockchain/v2/reactor.go Normal file (+97 lines)

@@ -0,0 +1,97 @@
package v2

import (
    "fmt"
    "time"

    "github.com/tendermint/tendermint/libs/log"
)

type timeCheck struct {
    time time.Time
}

func schedulerHandle(event Event) (Events, error) {
    switch event.(type) {
    case timeCheck:
        fmt.Println("scheduler handle timeCheck")
    case Event:
        fmt.Println("scheduler handle testEvent")
    }
    return Events{}, nil
}

func processorHandle(event Event) (Events, error) {
    switch event.(type) {
    case timeCheck:
        fmt.Println("processor handle timeCheck")
    case Event:
        fmt.Println("processor handle event")
    }
    return Events{}, nil
}

type Reactor struct {
    demuxer   *demuxer
    scheduler *Routine
    processor *Routine
    ticker    *time.Ticker
}

// nolint:unused
func (r *Reactor) setLogger(logger log.Logger) {
    r.scheduler.setLogger(logger)
    r.processor.setLogger(logger)
    r.demuxer.setLogger(logger)
}

func (r *Reactor) Start() {
    r.scheduler = newRoutine("scheduler", schedulerHandle)
    r.processor = newRoutine("processor", processorHandle)
    r.demuxer = newDemuxer(r.scheduler, r.processor)
    r.ticker = time.NewTicker(1 * time.Second)

    go r.scheduler.start()
    go r.processor.start()
    go r.demuxer.start()

    <-r.scheduler.ready()
    <-r.processor.ready()
    <-r.demuxer.ready()

    go func() {
        for t := range r.ticker.C {
            r.demuxer.trySend(timeCheck{t})
        }
    }()
}

func (r *Reactor) Wait() {
    fmt.Println("completed routines")
    r.Stop()
}

func (r *Reactor) Stop() {
    fmt.Println("reactor stopping")

    r.ticker.Stop()
    r.demuxer.stop()
    r.scheduler.stop()
    r.processor.stop()
    // todo: accumulator
    // todo: io

    fmt.Println("reactor stopped")
}

func (r *Reactor) Receive(event Event) {
    fmt.Println("receive event")
    sent := r.demuxer.trySend(event)
    if !sent {
        fmt.Println("demuxer is full")
    }
}

func (r *Reactor) AddPeer() {
    // TODO: add peer event and send to demuxer
}

blockchain/v2/reactor_test.go Normal file (+17 lines)

@@ -0,0 +1,17 @@
package v2

import "testing"

// XXX: This makes assumptions about the message routing
func TestReactor(t *testing.T) {
    reactor := Reactor{}
    reactor.Start()
    script := Events{
        struct{}{},
    }

    for _, event := range script {
        reactor.Receive(event)
    }
    reactor.Wait()
}

blockchain/v2/routine.go Normal file (+200 lines)

@@ -0,0 +1,200 @@
package v2

import (
    "fmt"
    "sync/atomic"

    "github.com/tendermint/tendermint/libs/log"
)

// TODO
// * revisit panic conditions
// * audit log levels
// * Convert routine to an interface with a concrete implementation

type handleFunc = func(event Event) (Events, error)

// A Routine models a finite state machine as a serialized stream of events
// processed by a handle function. The Routine structure provides the
// concurrency and messaging guarantees. Events sent via `trySend` are handled
// by the `handle` function and emitted on the `next()` channel. Calling
// `stop()` on a routine concludes processing of all sent events and produces
// a `final()` error representing the terminal state.
type Routine struct {
    name     string
    input    chan Event
    errors   chan error
    out      chan Event
    stopped  chan struct{}
    rdy      chan struct{}
    fin      chan error
    running  *uint32
    handle   handleFunc
    logger   log.Logger
    metrics  *Metrics
    stopping *uint32
}

func newRoutine(name string, handleFunc handleFunc) *Routine {
    return &Routine{
        name:     name,
        input:    make(chan Event, 1),
        handle:   handleFunc,
        errors:   make(chan error, 1),
        out:      make(chan Event, 1),
        stopped:  make(chan struct{}, 1),
        rdy:      make(chan struct{}, 1),
        fin:      make(chan error, 1),
        running:  new(uint32),
        stopping: new(uint32),
        logger:   log.NewNopLogger(),
        metrics:  NopMetrics(),
    }
}

func (rt *Routine) setLogger(logger log.Logger) {
    rt.logger = logger
}

// nolint:unused
func (rt *Routine) setMetrics(metrics *Metrics) {
    rt.metrics = metrics
}

func (rt *Routine) start() {
    rt.logger.Info(fmt.Sprintf("%s: run\n", rt.name))
    starting := atomic.CompareAndSwapUint32(rt.running, uint32(0), uint32(1))
    if !starting {
        panic("Routine has already started")
    }
    close(rt.rdy)
    errorsDrained := false
    for {
        if !rt.isRunning() {
            rt.logger.Info(fmt.Sprintf("%s: breaking because not running\n", rt.name))
            break
        }
        select {
        case iEvent, ok := <-rt.input:
            rt.metrics.EventsIn.With("routine", rt.name).Add(1)
            if !ok {
                if !errorsDrained {
                    rt.logger.Info(fmt.Sprintf("%s: waiting for errors to drain\n", rt.name))
                    continue // wait for errors to be drained
                }
                rt.logger.Info(fmt.Sprintf("%s: stopping\n", rt.name))
                close(rt.stopped)
                rt.terminate(fmt.Errorf("stopped"))
                return
            }
            oEvents, err := rt.handle(iEvent)
            rt.metrics.EventsHandled.With("routine", rt.name).Add(1)
            if err != nil {
                rt.terminate(err)
                return
            }
            rt.metrics.EventsOut.With("routine", rt.name).Add(float64(len(oEvents)))
            rt.logger.Info(fmt.Sprintf("%s handled %d events\n", rt.name, len(oEvents)))
            for _, event := range oEvents {
                rt.logger.Info(fmt.Sprintln("writing back to output"))
                rt.out <- event
            }
        case iEvent, ok := <-rt.errors:
            rt.metrics.ErrorsIn.With("routine", rt.name).Add(1)
            if !ok {
                rt.logger.Info(fmt.Sprintf("%s: errors closed\n", rt.name))
                errorsDrained = true
                continue
            }
            oEvents, err := rt.handle(iEvent)
            rt.metrics.ErrorsHandled.With("routine", rt.name).Add(1)
            if err != nil {
                rt.terminate(err)
                return
            }
            rt.metrics.ErrorsOut.With("routine", rt.name).Add(float64(len(oEvents)))
            for _, event := range oEvents {
                rt.out <- event
            }
        }
    }
}

func (rt *Routine) feedback() {
    for event := range rt.out {
        rt.trySend(event)
    }
}

func (rt *Routine) trySend(event Event) bool {
    if !rt.isRunning() || rt.isStopping() {
        return false
    }

    rt.logger.Info(fmt.Sprintf("%s: sending %+v", rt.name, event))
    if err, ok := event.(error); ok {
        select {
        case rt.errors <- err:
            rt.metrics.ErrorsSent.With("routine", rt.name).Add(1)
            return true
        default:
            rt.metrics.ErrorsShed.With("routine", rt.name).Add(1)
            rt.logger.Info(fmt.Sprintf("%s: errors channel was full\n", rt.name))
            return false
        }
    } else {
        select {
        case rt.input <- event:
            rt.metrics.EventsSent.With("routine", rt.name).Add(1)
            return true
        default:
            rt.metrics.EventsShed.With("routine", rt.name).Add(1)
            rt.logger.Info(fmt.Sprintf("%s: channel was full\n", rt.name))
            return false
        }
    }
}

func (rt *Routine) isRunning() bool {
    return atomic.LoadUint32(rt.running) == 1
}

func (rt *Routine) isStopping() bool {
    return atomic.LoadUint32(rt.stopping) == 1
}

func (rt *Routine) ready() chan struct{} {
    return rt.rdy
}

func (rt *Routine) next() chan Event {
    return rt.out
}

func (rt *Routine) stop() {
    if !rt.isRunning() {
        return
    }

    rt.logger.Info(fmt.Sprintf("%s: stop\n", rt.name))
    stopping := atomic.CompareAndSwapUint32(rt.stopping, uint32(0), uint32(1))
    if !stopping {
        panic("Routine has already stopped")
    }

    close(rt.input)
    close(rt.errors)
    <-rt.stopped
}

func (rt *Routine) final() chan error {
    return rt.fin
}

func (rt *Routine) terminate(reason error) {
    stopped := atomic.CompareAndSwapUint32(rt.running, uint32(1), uint32(0))
    if !stopped {
        panic("called stop but already stopped")
    }
    rt.fin <- reason
}

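As the doc comment on Routine notes, the first error returned by the handle function becomes the routine's terminal state, delivered on final(); the tests in the next file exercise exactly this with simpleHandler. A minimal sketch of that path, where errExampleDone and exampleTerminal are hypothetical names:

package v2

import "errors"

// errExampleDone is a sentinel used only for this sketch.
var errExampleDone = errors.New("done")

// exampleTerminal shows the terminal-state behaviour described in the Routine
// doc comment: the first error returned from the handle function stops the
// routine and is surfaced on final().
func exampleTerminal() error {
    rt := newRoutine("terminal", func(event Event) (Events, error) {
        return Events{}, errExampleDone // any handled event ends the routine
    })
    go rt.start()
    <-rt.ready()
    rt.trySend(struct{}{})
    return <-rt.final() // errExampleDone
}
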
blockchain/v2/routine_test.go Normal file (+140 lines)

@@ -0,0 +1,140 @@
package v2

import (
    "fmt"
    "testing"
    "time"

    "github.com/stretchr/testify/assert"

    "github.com/tendermint/tendermint/libs/log"
)

type eventA struct{}
type eventB struct{}
type errEvent struct{}

var done = fmt.Errorf("done")

func simpleHandler(event Event) (Events, error) {
    switch event.(type) {
    case eventA:
        return Events{eventB{}}, nil
    case eventB:
        return Events{}, done
    }
    return Events{}, nil
}

func TestRoutine(t *testing.T) {
    routine := newRoutine("simpleRoutine", simpleHandler)

    assert.False(t, routine.isRunning(),
        "expected an initialized routine to not be running")

    go routine.start()
    go routine.feedback()
    <-routine.ready()

    assert.True(t, routine.trySend(eventA{}),
        "expected sending to a ready routine to succeed")

    assert.Equal(t, done, <-routine.final(),
        "expected the final event to be done")
}

func TestRoutineSend(t *testing.T) {
    routine := newRoutine("simpleRoutine", simpleHandler)

    assert.False(t, routine.trySend(eventA{}),
        "expected sending to an unstarted routine to fail")

    go routine.start()
    go routine.feedback()
    <-routine.ready()

    assert.True(t, routine.trySend(eventA{}),
        "expected sending to a running routine to succeed")

    routine.stop()

    assert.False(t, routine.trySend(eventA{}),
        "expected sending to a stopped routine to fail")
}

type finalCount struct {
    count int
}

func (f finalCount) Error() string {
    return "end"
}

func genStatefulHandler(maxCount int) handleFunc {
    counter := 0
    return func(event Event) (Events, error) {
        // golint fixme
        switch event.(type) {
        case eventA:
            counter += 1
            if counter >= maxCount {
                return Events{}, finalCount{counter}
            }
            return Events{eventA{}}, nil
        case eventB:
            return Events{}, nil
        }
        return Events{}, nil
    }
}

func TestStatefulRoutine(t *testing.T) {
    count := 10
    handler := genStatefulHandler(count)
    routine := newRoutine("statefulRoutine", handler)
    routine.setLogger(log.TestingLogger())

    go routine.start()
    go routine.feedback()
    <-routine.ready()

    assert.True(t, routine.trySend(eventA{}),
        "expected sending to a started routine to succeed")

    final := <-routine.final()
    fnl, ok := final.(finalCount)
    if ok {
        assert.Equal(t, count, fnl.count,
            "expected the routine to count to 10")
    } else {
        t.Fail()
    }
}

func handleWithErrors(event Event) (Events, error) {
    switch event.(type) {
    case eventA:
        return Events{}, nil
    case errEvent:
        return Events{}, done
    }
    return Events{}, nil
}

func TestErrorSaturation(t *testing.T) {
    routine := newRoutine("errorRoutine", handleWithErrors)
    go routine.start()
    <-routine.ready()
    go func() {
        for {
            routine.trySend(eventA{})
            time.Sleep(10 * time.Millisecond)
        }
    }()

    assert.True(t, routine.trySend(errEvent{}),
        "expected send to succeed even when saturated")

    assert.Equal(t, done, <-routine.final())
}

blockchain/v2/types.go Normal file (+4 lines)

@@ -0,0 +1,4 @@
package v2

type Event interface{}
type Events []Event