diff --git a/blockchain/v2/routines.go b/blockchain/v2/routines.go index 85eadf93..89d137f1 100644 --- a/blockchain/v2/routines.go +++ b/blockchain/v2/routines.go @@ -55,17 +55,17 @@ type scheduler struct { } func newScheduler(output chan Event) *scheduler { - input := make(chan Event, 1) return &scheduler{ - input: input, - output: output, + input: make(chan Event, 1), + output: output, + stopped: make(chan struct{}, 1), } } func (sc *scheduler) run() { fmt.Println("scheduler run") for { - iEvent, ok := <-sc.input + iEvent, ok := <-sc.input // stuckhere if !ok { fmt.Println("stopping scheduler") sc.stopped <- struct{}{} @@ -113,10 +113,10 @@ type processor struct { } func newProcessor(output chan Event) *processor { - input := make(chan Event, 1) return &processor{ - input: input, - output: output, + input: make(chan Event, 1), + output: output, + stopped: make(chan struct{}, 1), } } @@ -174,10 +174,10 @@ type demuxer struct { } func newDemuxer(output chan Event, scheduler *scheduler, processor *processor) *demuxer { - input := make(chan Event, 1) return &demuxer{ - input: input, + input: make(chan Event, 1), output: output, + stopped: make(chan struct{}, 1), scheduler: scheduler, processor: processor, } @@ -263,14 +263,12 @@ func (dr *DummyReactor) Start() { }() } -// XXX: We need to have a smooth shutdown process func (dr *DummyReactor) Stop() { fmt.Println("reactor stopping") // this should be synchronous dr.tickerStopped <- struct{}{} fmt.Println("waiting for ticker") - // the order here matters - dr.demuxer.stop() // this need to drain first + dr.demuxer.stop() dr.scheduler.stop() dr.processor.stop()