From 78d4c3b88a098249e00b347cabadae1523d2a42d Mon Sep 17 00:00:00 2001 From: Sean Braithwaite Date: Tue, 13 Aug 2019 17:57:17 +0200 Subject: [PATCH] fixes based on feedback --- blockchain/v2/demuxer.go | 4 +++- blockchain/v2/reactor.go | 24 +++++++++--------------- blockchain/v2/routine.go | 3 ++- 3 files changed, 14 insertions(+), 17 deletions(-) diff --git a/blockchain/v2/demuxer.go b/blockchain/v2/demuxer.go index 7996ecfa..e19816a2 100644 --- a/blockchain/v2/demuxer.go +++ b/blockchain/v2/demuxer.go @@ -11,6 +11,8 @@ import ( type scFull struct{} type pcFull struct{} +const demuxerBufferSize = 10 + type demuxer struct { input chan Event scheduler *Routine @@ -25,7 +27,7 @@ type demuxer struct { func newDemuxer(scheduler *Routine, processor *Routine) *demuxer { return &demuxer{ - input: make(chan Event, 10), + input: make(chan Event, demuxerBufferSize), scheduler: scheduler, processor: processor, stopped: make(chan struct{}, 1), diff --git a/blockchain/v2/reactor.go b/blockchain/v2/reactor.go index 97226a5d..7c7bf4e2 100644 --- a/blockchain/v2/reactor.go +++ b/blockchain/v2/reactor.go @@ -8,6 +8,7 @@ import ( ) type timeCheck struct { + time time.Time } func schedulerHandle(event Event) (Events, error) { @@ -31,10 +32,10 @@ func processorHandle(event Event) (Events, error) { } type Reactor struct { - demuxer *demuxer - scheduler *Routine - processor *Routine - tickerStopped chan struct{} + demuxer *demuxer + scheduler *Routine + processor *Routine + ticker *time.Ticker } // nolint:unused @@ -48,7 +49,7 @@ func (r *Reactor) Start() { r.scheduler = newRoutine("scheduler", schedulerHandle) r.processor = newRoutine("processor", processorHandle) r.demuxer = newDemuxer(r.scheduler, r.processor) - r.tickerStopped = make(chan struct{}) + r.ticker = time.NewTicker(1 * time.Second) go r.scheduler.start() go r.processor.start() @@ -59,15 +60,8 @@ func (r *Reactor) Start() { <-r.demuxer.ready() go func() { - ticker := time.NewTicker(1 * time.Second) - for { - select { - case <-ticker.C: - r.demuxer.trySend(timeCheck{}) - case <-r.tickerStopped: - fmt.Println("ticker stopped") - return - } + for t := range r.ticker.C { + r.demuxer.trySend(timeCheck{t}) } }() } @@ -80,7 +74,7 @@ func (r *Reactor) Wait() { func (r *Reactor) Stop() { fmt.Println("reactor stopping") - r.tickerStopped <- struct{}{} + r.ticker.Stop() r.demuxer.stop() r.scheduler.stop() r.processor.stop() diff --git a/blockchain/v2/routine.go b/blockchain/v2/routine.go index 977d2cd1..ecd12c82 100644 --- a/blockchain/v2/routine.go +++ b/blockchain/v2/routine.go @@ -56,6 +56,7 @@ func (rt *Routine) setMetrics(metrics *Metrics) { } func (rt *Routine) start() { + // what if we call baseService.start rt.logger.Info(fmt.Sprintf("%s: run\n", rt.name)) starting := atomic.CompareAndSwapUint32(rt.running, uint32(0), uint32(1)) if !starting { @@ -78,7 +79,7 @@ func (rt *Routine) start() { continue // wait for errors to be drainned } rt.logger.Info(fmt.Sprintf("%s: stopping\n", rt.name)) - rt.stopped <- struct{}{} + close(rt.stopped) rt.terminate(fmt.Errorf("stopped")) return }