mirror of
https://github.com/fluencelabs/tendermint
synced 2025-07-30 19:51:58 +00:00
Compare commits
12 Commits
updateconf
...
brapse/blo
Author | SHA1 | Date | |
---|---|---|---|
|
9d41770a99 | ||
|
f81c319ece | ||
|
78d4c3b88a | ||
|
2c8cbfc26a | ||
|
acbfe67fb8 | ||
|
aeac4743cc | ||
|
e826ca3c49 | ||
|
5b880fbcff | ||
|
c081b60ef6 | ||
|
e4913f533a | ||
|
0cf9f86292 | ||
|
d1671d6175 |
166
blockchain/v2/demuxer.go
Normal file
166
blockchain/v2/demuxer.go
Normal file
@@ -0,0 +1,166 @@
|
||||
// nolint:unused
|
||||
package v2
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
)
|
||||
|
||||
type scFull struct{}
|
||||
type pcFull struct{}
|
||||
|
||||
const demuxerBufferSize = 10
|
||||
|
||||
type demuxer struct {
|
||||
input chan Event
|
||||
scheduler *Routine
|
||||
processor *Routine
|
||||
fin chan error
|
||||
stopped chan struct{}
|
||||
rdy chan struct{}
|
||||
running *uint32
|
||||
stopping *uint32
|
||||
logger log.Logger
|
||||
}
|
||||
|
||||
func newDemuxer(scheduler *Routine, processor *Routine) *demuxer {
|
||||
return &demuxer{
|
||||
input: make(chan Event, demuxerBufferSize),
|
||||
scheduler: scheduler,
|
||||
processor: processor,
|
||||
stopped: make(chan struct{}, 1),
|
||||
fin: make(chan error, 1),
|
||||
rdy: make(chan struct{}, 1),
|
||||
running: new(uint32),
|
||||
stopping: new(uint32),
|
||||
logger: log.NewNopLogger(),
|
||||
}
|
||||
}
|
||||
|
||||
func (dm *demuxer) setLogger(logger log.Logger) {
|
||||
dm.logger = logger
|
||||
}
|
||||
|
||||
func (dm *demuxer) start() {
|
||||
starting := atomic.CompareAndSwapUint32(dm.running, uint32(0), uint32(1))
|
||||
if !starting {
|
||||
panic("Routine has already started")
|
||||
}
|
||||
dm.logger.Info("demuxer: run")
|
||||
dm.rdy <- struct{}{}
|
||||
for {
|
||||
if !dm.isRunning() {
|
||||
break
|
||||
}
|
||||
select {
|
||||
case event, ok := <-dm.input:
|
||||
if !ok {
|
||||
dm.logger.Info("demuxer: stopping")
|
||||
dm.terminate(fmt.Errorf("stopped"))
|
||||
dm.stopped <- struct{}{}
|
||||
return
|
||||
}
|
||||
oEvents, err := dm.handle(event)
|
||||
if err != nil {
|
||||
dm.terminate(err)
|
||||
return
|
||||
}
|
||||
for _, event := range oEvents {
|
||||
dm.input <- event
|
||||
}
|
||||
case event, ok := <-dm.scheduler.next():
|
||||
if !ok {
|
||||
dm.logger.Info("demuxer: scheduler output closed")
|
||||
continue
|
||||
}
|
||||
oEvents, err := dm.handle(event)
|
||||
if err != nil {
|
||||
dm.terminate(err)
|
||||
return
|
||||
}
|
||||
for _, event := range oEvents {
|
||||
dm.input <- event
|
||||
}
|
||||
case event, ok := <-dm.processor.next():
|
||||
if !ok {
|
||||
dm.logger.Info("demuxer: processor output closed")
|
||||
continue
|
||||
}
|
||||
oEvents, err := dm.handle(event)
|
||||
if err != nil {
|
||||
dm.terminate(err)
|
||||
return
|
||||
}
|
||||
for _, event := range oEvents {
|
||||
dm.input <- event
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (dm *demuxer) handle(event Event) (Events, error) {
|
||||
received := dm.scheduler.trySend(event)
|
||||
if !received {
|
||||
return Events{scFull{}}, nil // backpressure
|
||||
}
|
||||
|
||||
received = dm.processor.trySend(event)
|
||||
if !received {
|
||||
return Events{pcFull{}}, nil // backpressure
|
||||
}
|
||||
|
||||
return Events{}, nil
|
||||
}
|
||||
|
||||
func (dm *demuxer) trySend(event Event) bool {
|
||||
if !dm.isRunning() || dm.isStopping() {
|
||||
dm.logger.Info("dummuxer isn't running")
|
||||
return false
|
||||
}
|
||||
select {
|
||||
case dm.input <- event:
|
||||
return true
|
||||
default:
|
||||
dm.logger.Info("demuxer channel was full")
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
func (dm *demuxer) isRunning() bool {
|
||||
return atomic.LoadUint32(dm.running) == 1
|
||||
}
|
||||
|
||||
func (dm *demuxer) isStopping() bool {
|
||||
return atomic.LoadUint32(dm.stopping) == 1
|
||||
}
|
||||
|
||||
func (dm *demuxer) ready() chan struct{} {
|
||||
return dm.rdy
|
||||
}
|
||||
|
||||
func (dm *demuxer) stop() {
|
||||
if !dm.isRunning() {
|
||||
return
|
||||
}
|
||||
stopping := atomic.CompareAndSwapUint32(dm.stopping, uint32(0), uint32(1))
|
||||
if !stopping {
|
||||
panic("Demuxer has already stopped")
|
||||
}
|
||||
dm.logger.Info("demuxer stop")
|
||||
close(dm.input)
|
||||
<-dm.stopped
|
||||
}
|
||||
|
||||
func (dm *demuxer) terminate(reason error) {
|
||||
stopped := atomic.CompareAndSwapUint32(dm.running, uint32(1), uint32(0))
|
||||
if !stopped {
|
||||
panic("called terminate but already terminated")
|
||||
}
|
||||
dm.fin <- reason
|
||||
}
|
||||
|
||||
func (dm *demuxer) final() chan error {
|
||||
return dm.fin
|
||||
}
|
124
blockchain/v2/metrics.go
Normal file
124
blockchain/v2/metrics.go
Normal file
@@ -0,0 +1,124 @@
|
||||
package v2
|
||||
|
||||
import (
|
||||
"github.com/go-kit/kit/metrics"
|
||||
"github.com/go-kit/kit/metrics/discard"
|
||||
"github.com/go-kit/kit/metrics/prometheus"
|
||||
stdprometheus "github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
const (
|
||||
// MetricsSubsystem is a subsystem shared by all metrics exposed by this
|
||||
// package.
|
||||
MetricsSubsystem = "blockchain"
|
||||
)
|
||||
|
||||
// Metrics contains metrics exposed by this package.
|
||||
type Metrics struct {
|
||||
// events_in
|
||||
EventsIn metrics.Counter
|
||||
// events_in
|
||||
EventsHandled metrics.Counter
|
||||
// events_out
|
||||
EventsOut metrics.Counter
|
||||
// errors_in
|
||||
ErrorsIn metrics.Counter
|
||||
// errors_handled
|
||||
ErrorsHandled metrics.Counter
|
||||
// errors_out
|
||||
ErrorsOut metrics.Counter
|
||||
// events_shed
|
||||
EventsShed metrics.Counter
|
||||
// events_sent
|
||||
EventsSent metrics.Counter
|
||||
// errors_sent
|
||||
ErrorsSent metrics.Counter
|
||||
// errors_shed
|
||||
ErrorsShed metrics.Counter
|
||||
}
|
||||
|
||||
// Can we burn in the routine name here?
|
||||
func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
|
||||
labels := []string{}
|
||||
for i := 0; i < len(labelsAndValues); i += 2 {
|
||||
labels = append(labels, labelsAndValues[i])
|
||||
}
|
||||
return &Metrics{
|
||||
EventsIn: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: MetricsSubsystem,
|
||||
Name: "events_in",
|
||||
Help: "Events read from the channel.",
|
||||
}, labels).With(labelsAndValues...),
|
||||
EventsHandled: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: MetricsSubsystem,
|
||||
Name: "events_handled",
|
||||
Help: "Events handled",
|
||||
}, labels).With(labelsAndValues...),
|
||||
EventsOut: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: MetricsSubsystem,
|
||||
Name: "events_out",
|
||||
Help: "Events output from routine.",
|
||||
}, labels).With(labelsAndValues...),
|
||||
ErrorsIn: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: MetricsSubsystem,
|
||||
Name: "errors_in",
|
||||
Help: "Errors read from the channel.",
|
||||
}, labels).With(labelsAndValues...),
|
||||
ErrorsHandled: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: MetricsSubsystem,
|
||||
Name: "errors_handled",
|
||||
Help: "Errors handled.",
|
||||
}, labels).With(labelsAndValues...),
|
||||
ErrorsOut: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: MetricsSubsystem,
|
||||
Name: "errors_out",
|
||||
Help: "Errors output from routine.",
|
||||
}, labels).With(labelsAndValues...),
|
||||
ErrorsSent: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: MetricsSubsystem,
|
||||
Name: "errors_sent",
|
||||
Help: "Errors sent to routine.",
|
||||
}, labels).With(labelsAndValues...),
|
||||
ErrorsShed: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: MetricsSubsystem,
|
||||
Name: "errors_shed",
|
||||
Help: "Errors dropped from sending.",
|
||||
}, labels).With(labelsAndValues...),
|
||||
EventsSent: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: MetricsSubsystem,
|
||||
Name: "events_sent",
|
||||
Help: "Events sent to routine.",
|
||||
}, labels).With(labelsAndValues...),
|
||||
EventsShed: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: MetricsSubsystem,
|
||||
Name: "events_shed",
|
||||
Help: "Events dropped from sending.",
|
||||
}, labels).With(labelsAndValues...),
|
||||
}
|
||||
}
|
||||
|
||||
// NopMetrics returns no-op Metrics.
|
||||
func NopMetrics() *Metrics {
|
||||
return &Metrics{
|
||||
EventsIn: discard.NewCounter(),
|
||||
EventsHandled: discard.NewCounter(),
|
||||
EventsOut: discard.NewCounter(),
|
||||
ErrorsIn: discard.NewCounter(),
|
||||
ErrorsHandled: discard.NewCounter(),
|
||||
ErrorsOut: discard.NewCounter(),
|
||||
EventsShed: discard.NewCounter(),
|
||||
EventsSent: discard.NewCounter(),
|
||||
ErrorsSent: discard.NewCounter(),
|
||||
ErrorsShed: discard.NewCounter(),
|
||||
}
|
||||
}
|
97
blockchain/v2/reactor.go
Normal file
97
blockchain/v2/reactor.go
Normal file
@@ -0,0 +1,97 @@
|
||||
package v2
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
)
|
||||
|
||||
type timeCheck struct {
|
||||
time time.Time
|
||||
}
|
||||
|
||||
func schedulerHandle(event Event) (Events, error) {
|
||||
switch event.(type) {
|
||||
case timeCheck:
|
||||
fmt.Println("scheduler handle timeCheck")
|
||||
case Event:
|
||||
fmt.Println("scheduler handle testEvent")
|
||||
}
|
||||
return Events{}, nil
|
||||
}
|
||||
|
||||
func processorHandle(event Event) (Events, error) {
|
||||
switch event.(type) {
|
||||
case timeCheck:
|
||||
fmt.Println("processor handle timeCheck")
|
||||
case Event:
|
||||
fmt.Println("processor handle event")
|
||||
}
|
||||
return Events{}, nil
|
||||
}
|
||||
|
||||
type Reactor struct {
|
||||
demuxer *demuxer
|
||||
scheduler *Routine
|
||||
processor *Routine
|
||||
ticker *time.Ticker
|
||||
}
|
||||
|
||||
// nolint:unused
|
||||
func (r *Reactor) setLogger(logger log.Logger) {
|
||||
r.scheduler.setLogger(logger)
|
||||
r.processor.setLogger(logger)
|
||||
r.demuxer.setLogger(logger)
|
||||
}
|
||||
|
||||
func (r *Reactor) Start() {
|
||||
r.scheduler = newRoutine("scheduler", schedulerHandle)
|
||||
r.processor = newRoutine("processor", processorHandle)
|
||||
r.demuxer = newDemuxer(r.scheduler, r.processor)
|
||||
r.ticker = time.NewTicker(1 * time.Second)
|
||||
|
||||
go r.scheduler.start()
|
||||
go r.processor.start()
|
||||
go r.demuxer.start()
|
||||
|
||||
<-r.scheduler.ready()
|
||||
<-r.processor.ready()
|
||||
<-r.demuxer.ready()
|
||||
|
||||
go func() {
|
||||
for t := range r.ticker.C {
|
||||
r.demuxer.trySend(timeCheck{t})
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (r *Reactor) Wait() {
|
||||
fmt.Println("completed routines")
|
||||
r.Stop()
|
||||
}
|
||||
|
||||
func (r *Reactor) Stop() {
|
||||
fmt.Println("reactor stopping")
|
||||
|
||||
r.ticker.Stop()
|
||||
r.demuxer.stop()
|
||||
r.scheduler.stop()
|
||||
r.processor.stop()
|
||||
// todo: accumulator
|
||||
// todo: io
|
||||
|
||||
fmt.Println("reactor stopped")
|
||||
}
|
||||
|
||||
func (r *Reactor) Receive(event Event) {
|
||||
fmt.Println("receive event")
|
||||
sent := r.demuxer.trySend(event)
|
||||
if !sent {
|
||||
fmt.Println("demuxer is full")
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Reactor) AddPeer() {
|
||||
// TODO: add peer event and send to demuxer
|
||||
}
|
17
blockchain/v2/reactor_test.go
Normal file
17
blockchain/v2/reactor_test.go
Normal file
@@ -0,0 +1,17 @@
|
||||
package v2
|
||||
|
||||
import "testing"
|
||||
|
||||
// XXX: This makes assumptions about the message routing
|
||||
func TestReactor(t *testing.T) {
|
||||
reactor := Reactor{}
|
||||
reactor.Start()
|
||||
script := Events{
|
||||
struct{}{},
|
||||
}
|
||||
|
||||
for _, event := range script {
|
||||
reactor.Receive(event)
|
||||
}
|
||||
reactor.Wait()
|
||||
}
|
200
blockchain/v2/routine.go
Normal file
200
blockchain/v2/routine.go
Normal file
@@ -0,0 +1,200 @@
|
||||
package v2
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
)
|
||||
|
||||
// TODO
|
||||
// * revisit panic conditions
|
||||
// * audit log levels
|
||||
// * Convert routine to an interface with concrete implmentation
|
||||
|
||||
type handleFunc = func(event Event) (Events, error)
|
||||
|
||||
// Routines are a structure which model a finite state machine as serialized
|
||||
// stream of events processed by a handle function. This Routine structure
|
||||
// handles the concurrency and messaging guarantees. Events are sent via
|
||||
// `trySend` are handled by the `handle` function to produce an iterator
|
||||
// `next()`. Calling `close()` on a routine will conclude processing of all
|
||||
// sent events and produce `last()` event representing the terminal state.
|
||||
type Routine struct {
|
||||
name string
|
||||
input chan Event
|
||||
errors chan error
|
||||
out chan Event
|
||||
stopped chan struct{}
|
||||
rdy chan struct{}
|
||||
fin chan error
|
||||
running *uint32
|
||||
handle handleFunc
|
||||
logger log.Logger
|
||||
metrics *Metrics
|
||||
stopping *uint32
|
||||
}
|
||||
|
||||
func newRoutine(name string, handleFunc handleFunc) *Routine {
|
||||
return &Routine{
|
||||
name: name,
|
||||
input: make(chan Event, 1),
|
||||
handle: handleFunc,
|
||||
errors: make(chan error, 1),
|
||||
out: make(chan Event, 1),
|
||||
stopped: make(chan struct{}, 1),
|
||||
rdy: make(chan struct{}, 1),
|
||||
fin: make(chan error, 1),
|
||||
running: new(uint32),
|
||||
stopping: new(uint32),
|
||||
logger: log.NewNopLogger(),
|
||||
metrics: NopMetrics(),
|
||||
}
|
||||
}
|
||||
|
||||
func (rt *Routine) setLogger(logger log.Logger) {
|
||||
rt.logger = logger
|
||||
}
|
||||
|
||||
// nolint:unused
|
||||
func (rt *Routine) setMetrics(metrics *Metrics) {
|
||||
rt.metrics = metrics
|
||||
}
|
||||
|
||||
func (rt *Routine) start() {
|
||||
rt.logger.Info(fmt.Sprintf("%s: run\n", rt.name))
|
||||
starting := atomic.CompareAndSwapUint32(rt.running, uint32(0), uint32(1))
|
||||
if !starting {
|
||||
panic("Routine has already started")
|
||||
}
|
||||
close(rt.rdy)
|
||||
errorsDrained := false
|
||||
for {
|
||||
if !rt.isRunning() {
|
||||
rt.logger.Info(fmt.Sprintf("%s: breaking because not running\n", rt.name))
|
||||
break
|
||||
}
|
||||
select {
|
||||
case iEvent, ok := <-rt.input:
|
||||
rt.metrics.EventsIn.With("routine", rt.name).Add(1)
|
||||
if !ok {
|
||||
if !errorsDrained {
|
||||
rt.logger.Info(fmt.Sprintf("%s: waiting for errors to drain\n", rt.name))
|
||||
|
||||
continue // wait for errors to be drainned
|
||||
}
|
||||
rt.logger.Info(fmt.Sprintf("%s: stopping\n", rt.name))
|
||||
close(rt.stopped)
|
||||
rt.terminate(fmt.Errorf("stopped"))
|
||||
return
|
||||
}
|
||||
oEvents, err := rt.handle(iEvent)
|
||||
rt.metrics.EventsHandled.With("routine", rt.name).Add(1)
|
||||
if err != nil {
|
||||
rt.terminate(err)
|
||||
return
|
||||
}
|
||||
rt.metrics.EventsOut.With("routine", rt.name).Add(float64(len(oEvents)))
|
||||
rt.logger.Info(fmt.Sprintf("%s handled %d events\n", rt.name, len(oEvents)))
|
||||
for _, event := range oEvents {
|
||||
rt.logger.Info(fmt.Sprintln("writing back to output"))
|
||||
rt.out <- event
|
||||
}
|
||||
case iEvent, ok := <-rt.errors:
|
||||
rt.metrics.ErrorsIn.With("routine", rt.name).Add(1)
|
||||
if !ok {
|
||||
rt.logger.Info(fmt.Sprintf("%s: errors closed\n", rt.name))
|
||||
errorsDrained = true
|
||||
continue
|
||||
}
|
||||
oEvents, err := rt.handle(iEvent)
|
||||
rt.metrics.ErrorsHandled.With("routine", rt.name).Add(1)
|
||||
if err != nil {
|
||||
rt.terminate(err)
|
||||
return
|
||||
}
|
||||
rt.metrics.ErrorsOut.With("routine", rt.name).Add(float64(len(oEvents)))
|
||||
for _, event := range oEvents {
|
||||
rt.out <- event
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
func (rt *Routine) feedback() {
|
||||
for event := range rt.out {
|
||||
rt.trySend(event)
|
||||
}
|
||||
}
|
||||
|
||||
func (rt *Routine) trySend(event Event) bool {
|
||||
if !rt.isRunning() || rt.isStopping() {
|
||||
return false
|
||||
}
|
||||
|
||||
rt.logger.Info(fmt.Sprintf("%s: sending %+v", rt.name, event))
|
||||
if err, ok := event.(error); ok {
|
||||
select {
|
||||
case rt.errors <- err:
|
||||
rt.metrics.ErrorsSent.With("routine", rt.name).Add(1)
|
||||
return true
|
||||
default:
|
||||
rt.metrics.ErrorsShed.With("routine", rt.name).Add(1)
|
||||
rt.logger.Info(fmt.Sprintf("%s: errors channel was full\n", rt.name))
|
||||
return false
|
||||
}
|
||||
} else {
|
||||
select {
|
||||
case rt.input <- event:
|
||||
rt.metrics.EventsSent.With("routine", rt.name).Add(1)
|
||||
return true
|
||||
default:
|
||||
rt.metrics.EventsShed.With("routine", rt.name).Add(1)
|
||||
rt.logger.Info(fmt.Sprintf("%s: channel was full\n", rt.name))
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (rt *Routine) isRunning() bool {
|
||||
return atomic.LoadUint32(rt.running) == 1
|
||||
}
|
||||
|
||||
func (rt *Routine) isStopping() bool {
|
||||
return atomic.LoadUint32(rt.stopping) == 1
|
||||
}
|
||||
|
||||
func (rt *Routine) ready() chan struct{} {
|
||||
return rt.rdy
|
||||
}
|
||||
|
||||
func (rt *Routine) next() chan Event {
|
||||
return rt.out
|
||||
}
|
||||
|
||||
func (rt *Routine) stop() {
|
||||
if !rt.isRunning() {
|
||||
return
|
||||
}
|
||||
|
||||
rt.logger.Info(fmt.Sprintf("%s: stop\n", rt.name))
|
||||
stopping := atomic.CompareAndSwapUint32(rt.stopping, uint32(0), uint32(1))
|
||||
if !stopping {
|
||||
panic("Routine has already stopped")
|
||||
}
|
||||
|
||||
close(rt.input)
|
||||
close(rt.errors)
|
||||
<-rt.stopped
|
||||
}
|
||||
|
||||
func (rt *Routine) final() chan error {
|
||||
return rt.fin
|
||||
}
|
||||
|
||||
func (rt *Routine) terminate(reason error) {
|
||||
stopped := atomic.CompareAndSwapUint32(rt.running, uint32(1), uint32(0))
|
||||
if !stopped {
|
||||
panic("called stop but already stopped")
|
||||
}
|
||||
rt.fin <- reason
|
||||
}
|
140
blockchain/v2/routine_test.go
Normal file
140
blockchain/v2/routine_test.go
Normal file
@@ -0,0 +1,140 @@
|
||||
package v2
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
)
|
||||
|
||||
type eventA struct{}
|
||||
type eventB struct{}
|
||||
type errEvent struct{}
|
||||
|
||||
var done = fmt.Errorf("done")
|
||||
|
||||
func simpleHandler(event Event) (Events, error) {
|
||||
switch event.(type) {
|
||||
case eventA:
|
||||
return Events{eventB{}}, nil
|
||||
case eventB:
|
||||
return Events{}, done
|
||||
}
|
||||
return Events{}, nil
|
||||
}
|
||||
|
||||
func TestRoutine(t *testing.T) {
|
||||
routine := newRoutine("simpleRoutine", simpleHandler)
|
||||
|
||||
assert.False(t, routine.isRunning(),
|
||||
"expected an initialized routine to not be running")
|
||||
go routine.start()
|
||||
go routine.feedback()
|
||||
<-routine.ready()
|
||||
|
||||
assert.True(t, routine.trySend(eventA{}),
|
||||
"expected sending to a ready routine to succeed")
|
||||
|
||||
assert.Equal(t, done, <-routine.final(),
|
||||
"expected the final event to be done")
|
||||
}
|
||||
|
||||
func TestRoutineSend(t *testing.T) {
|
||||
routine := newRoutine("simpleRoutine", simpleHandler)
|
||||
|
||||
assert.False(t, routine.trySend(eventA{}),
|
||||
"expected sending to an unstarted routine to fail")
|
||||
|
||||
go routine.start()
|
||||
|
||||
go routine.feedback()
|
||||
<-routine.ready()
|
||||
|
||||
assert.True(t, routine.trySend(eventA{}),
|
||||
"expected sending to a running routine to succeed")
|
||||
|
||||
routine.stop()
|
||||
|
||||
assert.False(t, routine.trySend(eventA{}),
|
||||
"expected sending to a stopped routine to fail")
|
||||
}
|
||||
|
||||
type finalCount struct {
|
||||
count int
|
||||
}
|
||||
|
||||
func (f finalCount) Error() string {
|
||||
return "end"
|
||||
}
|
||||
|
||||
func genStatefulHandler(maxCount int) handleFunc {
|
||||
counter := 0
|
||||
return func(event Event) (Events, error) {
|
||||
// golint fixme
|
||||
switch event.(type) {
|
||||
case eventA:
|
||||
counter += 1
|
||||
if counter >= maxCount {
|
||||
return Events{}, finalCount{counter}
|
||||
}
|
||||
|
||||
return Events{eventA{}}, nil
|
||||
case eventB:
|
||||
return Events{}, nil
|
||||
}
|
||||
return Events{}, nil
|
||||
}
|
||||
}
|
||||
|
||||
func TestStatefulRoutine(t *testing.T) {
|
||||
count := 10
|
||||
handler := genStatefulHandler(count)
|
||||
routine := newRoutine("statefulRoutine", handler)
|
||||
routine.setLogger(log.TestingLogger())
|
||||
|
||||
go routine.start()
|
||||
go routine.feedback()
|
||||
|
||||
<-routine.ready()
|
||||
|
||||
assert.True(t, routine.trySend(eventA{}),
|
||||
"expected sending to a started routine to succeed")
|
||||
|
||||
final := <-routine.final()
|
||||
fnl, ok := final.(finalCount)
|
||||
if ok {
|
||||
assert.Equal(t, count, fnl.count,
|
||||
"expected the routine to count to 10")
|
||||
} else {
|
||||
t.Fail()
|
||||
}
|
||||
}
|
||||
|
||||
func handleWithErrors(event Event) (Events, error) {
|
||||
switch event.(type) {
|
||||
case eventA:
|
||||
return Events{}, nil
|
||||
case errEvent:
|
||||
return Events{}, done
|
||||
}
|
||||
return Events{}, nil
|
||||
}
|
||||
|
||||
func TestErrorSaturation(t *testing.T) {
|
||||
routine := newRoutine("errorRoutine", handleWithErrors)
|
||||
go routine.start()
|
||||
<-routine.ready()
|
||||
go func() {
|
||||
for {
|
||||
routine.trySend(eventA{})
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
}()
|
||||
|
||||
assert.True(t, routine.trySend(errEvent{}),
|
||||
"expected send to succeed even when saturated")
|
||||
|
||||
assert.Equal(t, done, <-routine.final())
|
||||
}
|
4
blockchain/v2/types.go
Normal file
4
blockchain/v2/types.go
Normal file
@@ -0,0 +1,4 @@
|
||||
package v2
|
||||
|
||||
type Event interface{}
|
||||
type Events []Event
|
Reference in New Issue
Block a user