diff --git a/blockchain/v2/routines.go b/blockchain/v2/routines.go index 717eccb0..85eadf93 100644 --- a/blockchain/v2/routines.go +++ b/blockchain/v2/routines.go @@ -49,8 +49,9 @@ type timeCheck struct { // scheduler type scheduler struct { - input chan Event - output chan Event + input chan Event + output chan Event + stopped chan struct{} } func newScheduler(output chan Event) *scheduler { @@ -61,18 +62,18 @@ func newScheduler(output chan Event) *scheduler { } } -func (hd *scheduler) run() { +func (sc *scheduler) run() { fmt.Println("scheduler run") for { - iEvent := <-hd.input - _, ok := iEvent.(stopEvent) - if ok { + iEvent, ok := <-sc.input + if !ok { fmt.Println("stopping scheduler") + sc.stopped <- struct{}{} break } - oEvents := hd.handle(iEvent) + oEvents := sc.handle(iEvent) for _, event := range oEvents { - hd.output <- event + sc.output <- event } } } @@ -98,15 +99,17 @@ func (sc *scheduler) handle(event Event) Events { return Events{} } -func (hd *scheduler) stop() { +func (sc *scheduler) stop() { fmt.Println("scheduler stop") - hd.input <- stopEvent{} + close(sc.input) + <-sc.stopped } // processor type processor struct { - input chan Event - output chan Event + input chan Event + output chan Event + stopped chan struct{} } func newProcessor(output chan Event) *processor { @@ -117,26 +120,27 @@ func newProcessor(output chan Event) *processor { } } -func (hd *processor) run() { +func (pc *processor) run() { fmt.Println("processor run") for { - iEvent := <-hd.input - _, ok := iEvent.(stopEvent) - if ok { + iEvent, ok := <-pc.input + if !ok { fmt.Println("stopping processor") + pc.stopped <- struct{}{} break } - oEvents := hd.handle(iEvent) + + oEvents := pc.handle(iEvent) for _, event := range oEvents { - hd.output <- event + pc.output <- event } } } -func (fs *processor) send(event Event) bool { +func (pc *processor) send(event Event) bool { fmt.Println("processor send") select { - case fs.input <- event: + case pc.input <- event: return true default: fmt.Println("processor channel was full") @@ -144,7 +148,7 @@ func (fs *processor) send(event Event) bool { } } -func (sc *processor) handle(event Event) Events { +func (pc *processor) handle(event Event) Events { switch event.(type) { case timeCheck: fmt.Println("processor handle timeCheck") @@ -154,9 +158,10 @@ func (sc *processor) handle(event Event) Events { return Events{} } -func (hd *processor) stop() { +func (pc *processor) stop() { fmt.Println("processor stop") - hd.input <- stopEvent{} + close(pc.input) + <-pc.stopped } // demuxer @@ -165,6 +170,7 @@ type demuxer struct { output chan Event scheduler *scheduler processor *processor + stopped chan struct{} } func newDemuxer(output chan Event, scheduler *scheduler, processor *processor) *demuxer { @@ -180,7 +186,13 @@ func newDemuxer(output chan Event, scheduler *scheduler, processor *processor) * func (dm *demuxer) run() { fmt.Println("Running demuxer") for { - event := <-dm.input + // so now we need a way to flush + event, ok := <-dm.input + if !ok { + fmt.Println("demuxer stopping") + dm.stopped <- struct{}{} + break + } // event.time = time.Now() received := dm.scheduler.send(event) if !received { @@ -191,19 +203,14 @@ func (dm *demuxer) run() { if !received { panic("couldn't send to the processor") } - - _, ok := event.(stopEvent) - if ok { - fmt.Println("demuxer stopping") - break - } } } -func (fs *demuxer) send(event Event) bool { +func (dm *demuxer) send(event Event) bool { fmt.Println("demuxer send") + // we need to close if this is closed first select { - case fs.input <- event: + case dm.input <- event: return true default: fmt.Println("demuxer channel was full") @@ -211,42 +218,63 @@ func (fs *demuxer) send(event Event) bool { } } -func (hd *demuxer) stop() { +func (dm *demuxer) stop() { fmt.Println("demuxer stop") - hd.input <- stopEvent{} + close(dm.input) + <-dm.stopped + fmt.Println("demuxer stopped") } // reactor type DummyReactor struct { - events chan Event - demuxer *demuxer - ticker *time.Ticker + events chan Event + demuxer *demuxer + scheduler *scheduler + processor *processor + ticker *time.Ticker + tickerStopped chan struct{} } func (dr *DummyReactor) Start() { bufferSize := 10 events := make(chan Event, bufferSize) - scheduler := newScheduler(events) - processor := newProcessor(events) - dr.demuxer = newDemuxer(events, scheduler, processor) - dr.ticker = time.NewTicker(1 * time.Second) + dr.scheduler = newScheduler(events) + dr.processor = newProcessor(events) + dr.demuxer = newDemuxer(events, dr.scheduler, dr.processor) + dr.tickerStopped = make(chan struct{}) - go scheduler.run() - go processor.run() + go dr.scheduler.run() + go dr.processor.run() go dr.demuxer.run() + go func() { - for t := range dr.ticker.C { - dr.demuxer.send(timeCheck{t}) + ticker := time.NewTicker(1 * time.Second) + for { + select { + case <-ticker.C: + dr.demuxer.send(timeCheck{}) + case <-dr.tickerStopped: + fmt.Println("ticker stopped") + return + } } }() } +// XXX: We need to have a smooth shutdown process func (dr *DummyReactor) Stop() { + fmt.Println("reactor stopping") // this should be synchronous - dr.ticker.Stop() - dr.demuxer.stop() + dr.tickerStopped <- struct{}{} + fmt.Println("waiting for ticker") + // the order here matters + dr.demuxer.stop() // this need to drain first + dr.scheduler.stop() + dr.processor.stop() + + fmt.Println("reactor stopped") } func (dr *DummyReactor) Receive(event Event) {