fixes based on feedback

This commit is contained in:
Sean Braithwaite
2019-08-13 17:57:17 +02:00
parent 2c8cbfc26a
commit 78d4c3b88a
3 changed files with 14 additions and 17 deletions

View File

@ -11,6 +11,8 @@ import (
type scFull struct{} type scFull struct{}
type pcFull struct{} type pcFull struct{}
const demuxerBufferSize = 10
type demuxer struct { type demuxer struct {
input chan Event input chan Event
scheduler *Routine scheduler *Routine
@ -25,7 +27,7 @@ type demuxer struct {
func newDemuxer(scheduler *Routine, processor *Routine) *demuxer { func newDemuxer(scheduler *Routine, processor *Routine) *demuxer {
return &demuxer{ return &demuxer{
input: make(chan Event, 10), input: make(chan Event, demuxerBufferSize),
scheduler: scheduler, scheduler: scheduler,
processor: processor, processor: processor,
stopped: make(chan struct{}, 1), stopped: make(chan struct{}, 1),

View File

@ -8,6 +8,7 @@ import (
) )
type timeCheck struct { type timeCheck struct {
time time.Time
} }
func schedulerHandle(event Event) (Events, error) { func schedulerHandle(event Event) (Events, error) {
@ -31,10 +32,10 @@ func processorHandle(event Event) (Events, error) {
} }
type Reactor struct { type Reactor struct {
demuxer *demuxer demuxer *demuxer
scheduler *Routine scheduler *Routine
processor *Routine processor *Routine
tickerStopped chan struct{} ticker *time.Ticker
} }
// nolint:unused // nolint:unused
@ -48,7 +49,7 @@ func (r *Reactor) Start() {
r.scheduler = newRoutine("scheduler", schedulerHandle) r.scheduler = newRoutine("scheduler", schedulerHandle)
r.processor = newRoutine("processor", processorHandle) r.processor = newRoutine("processor", processorHandle)
r.demuxer = newDemuxer(r.scheduler, r.processor) r.demuxer = newDemuxer(r.scheduler, r.processor)
r.tickerStopped = make(chan struct{}) r.ticker = time.NewTicker(1 * time.Second)
go r.scheduler.start() go r.scheduler.start()
go r.processor.start() go r.processor.start()
@ -59,15 +60,8 @@ func (r *Reactor) Start() {
<-r.demuxer.ready() <-r.demuxer.ready()
go func() { go func() {
ticker := time.NewTicker(1 * time.Second) for t := range r.ticker.C {
for { r.demuxer.trySend(timeCheck{t})
select {
case <-ticker.C:
r.demuxer.trySend(timeCheck{})
case <-r.tickerStopped:
fmt.Println("ticker stopped")
return
}
} }
}() }()
} }
@ -80,7 +74,7 @@ func (r *Reactor) Wait() {
func (r *Reactor) Stop() { func (r *Reactor) Stop() {
fmt.Println("reactor stopping") fmt.Println("reactor stopping")
r.tickerStopped <- struct{}{} r.ticker.Stop()
r.demuxer.stop() r.demuxer.stop()
r.scheduler.stop() r.scheduler.stop()
r.processor.stop() r.processor.stop()

View File

@ -56,6 +56,7 @@ func (rt *Routine) setMetrics(metrics *Metrics) {
} }
func (rt *Routine) start() { func (rt *Routine) start() {
// what if we call baseService.start
rt.logger.Info(fmt.Sprintf("%s: run\n", rt.name)) rt.logger.Info(fmt.Sprintf("%s: run\n", rt.name))
starting := atomic.CompareAndSwapUint32(rt.running, uint32(0), uint32(1)) starting := atomic.CompareAndSwapUint32(rt.running, uint32(0), uint32(1))
if !starting { if !starting {
@ -78,7 +79,7 @@ func (rt *Routine) start() {
continue // wait for errors to be drainned continue // wait for errors to be drainned
} }
rt.logger.Info(fmt.Sprintf("%s: stopping\n", rt.name)) rt.logger.Info(fmt.Sprintf("%s: stopping\n", rt.name))
rt.stopped <- struct{}{} close(rt.stopped)
rt.terminate(fmt.Errorf("stopped")) rt.terminate(fmt.Errorf("stopped"))
return return
} }