handlers now flush all events before stopping

This commit is contained in:
Sean Braithwaite
2019-07-28 09:07:35 +02:00
parent 6621857bfb
commit ed7b3bb89b

View File

@ -55,17 +55,17 @@ type scheduler struct {
} }
func newScheduler(output chan Event) *scheduler { func newScheduler(output chan Event) *scheduler {
input := make(chan Event, 1)
return &scheduler{ return &scheduler{
input: input, input: make(chan Event, 1),
output: output, output: output,
stopped: make(chan struct{}, 1),
} }
} }
func (sc *scheduler) run() { func (sc *scheduler) run() {
fmt.Println("scheduler run") fmt.Println("scheduler run")
for { for {
iEvent, ok := <-sc.input iEvent, ok := <-sc.input // stuckhere
if !ok { if !ok {
fmt.Println("stopping scheduler") fmt.Println("stopping scheduler")
sc.stopped <- struct{}{} sc.stopped <- struct{}{}
@ -113,10 +113,10 @@ type processor struct {
} }
func newProcessor(output chan Event) *processor { func newProcessor(output chan Event) *processor {
input := make(chan Event, 1)
return &processor{ return &processor{
input: input, input: make(chan Event, 1),
output: output, output: output,
stopped: make(chan struct{}, 1),
} }
} }
@ -174,10 +174,10 @@ type demuxer struct {
} }
func newDemuxer(output chan Event, scheduler *scheduler, processor *processor) *demuxer { func newDemuxer(output chan Event, scheduler *scheduler, processor *processor) *demuxer {
input := make(chan Event, 1)
return &demuxer{ return &demuxer{
input: input, input: make(chan Event, 1),
output: output, output: output,
stopped: make(chan struct{}, 1),
scheduler: scheduler, scheduler: scheduler,
processor: processor, processor: processor,
} }
@ -263,14 +263,12 @@ func (dr *DummyReactor) Start() {
}() }()
} }
// XXX: We need to have a smooth shutdown process
func (dr *DummyReactor) Stop() { func (dr *DummyReactor) Stop() {
fmt.Println("reactor stopping") fmt.Println("reactor stopping")
// this should be synchronous // this should be synchronous
dr.tickerStopped <- struct{}{} dr.tickerStopped <- struct{}{}
fmt.Println("waiting for ticker") fmt.Println("waiting for ticker")
// the order here matters dr.demuxer.stop()
dr.demuxer.stop() // this need to drain first
dr.scheduler.stop() dr.scheduler.stop()
dr.processor.stop() dr.processor.stop()