more nonsense

This commit is contained in:
Sean Braithwaite 2019-07-27 11:38:37 +02:00
parent fa60855cca
commit 6621857bfb

View File

@ -51,6 +51,7 @@ type timeCheck struct {
type scheduler struct { type scheduler struct {
input chan Event input chan Event
output chan Event output chan Event
stopped chan struct{}
} }
func newScheduler(output chan Event) *scheduler { func newScheduler(output chan Event) *scheduler {
@ -61,18 +62,18 @@ func newScheduler(output chan Event) *scheduler {
} }
} }
func (hd *scheduler) run() { func (sc *scheduler) run() {
fmt.Println("scheduler run") fmt.Println("scheduler run")
for { for {
iEvent := <-hd.input iEvent, ok := <-sc.input
_, ok := iEvent.(stopEvent) if !ok {
if ok {
fmt.Println("stopping scheduler") fmt.Println("stopping scheduler")
sc.stopped <- struct{}{}
break break
} }
oEvents := hd.handle(iEvent) oEvents := sc.handle(iEvent)
for _, event := range oEvents { for _, event := range oEvents {
hd.output <- event sc.output <- event
} }
} }
} }
@ -98,15 +99,17 @@ func (sc *scheduler) handle(event Event) Events {
return Events{} return Events{}
} }
func (hd *scheduler) stop() { func (sc *scheduler) stop() {
fmt.Println("scheduler stop") fmt.Println("scheduler stop")
hd.input <- stopEvent{} close(sc.input)
<-sc.stopped
} }
// processor // processor
type processor struct { type processor struct {
input chan Event input chan Event
output chan Event output chan Event
stopped chan struct{}
} }
func newProcessor(output chan Event) *processor { func newProcessor(output chan Event) *processor {
@ -117,26 +120,27 @@ func newProcessor(output chan Event) *processor {
} }
} }
func (hd *processor) run() { func (pc *processor) run() {
fmt.Println("processor run") fmt.Println("processor run")
for { for {
iEvent := <-hd.input iEvent, ok := <-pc.input
_, ok := iEvent.(stopEvent) if !ok {
if ok {
fmt.Println("stopping processor") fmt.Println("stopping processor")
pc.stopped <- struct{}{}
break break
} }
oEvents := hd.handle(iEvent)
oEvents := pc.handle(iEvent)
for _, event := range oEvents { for _, event := range oEvents {
hd.output <- event pc.output <- event
} }
} }
} }
func (fs *processor) send(event Event) bool { func (pc *processor) send(event Event) bool {
fmt.Println("processor send") fmt.Println("processor send")
select { select {
case fs.input <- event: case pc.input <- event:
return true return true
default: default:
fmt.Println("processor channel was full") fmt.Println("processor channel was full")
@ -144,7 +148,7 @@ func (fs *processor) send(event Event) bool {
} }
} }
func (sc *processor) handle(event Event) Events { func (pc *processor) handle(event Event) Events {
switch event.(type) { switch event.(type) {
case timeCheck: case timeCheck:
fmt.Println("processor handle timeCheck") fmt.Println("processor handle timeCheck")
@ -154,9 +158,10 @@ func (sc *processor) handle(event Event) Events {
return Events{} return Events{}
} }
func (hd *processor) stop() { func (pc *processor) stop() {
fmt.Println("processor stop") fmt.Println("processor stop")
hd.input <- stopEvent{} close(pc.input)
<-pc.stopped
} }
// demuxer // demuxer
@ -165,6 +170,7 @@ type demuxer struct {
output chan Event output chan Event
scheduler *scheduler scheduler *scheduler
processor *processor processor *processor
stopped chan struct{}
} }
func newDemuxer(output chan Event, scheduler *scheduler, processor *processor) *demuxer { func newDemuxer(output chan Event, scheduler *scheduler, processor *processor) *demuxer {
@ -180,7 +186,13 @@ func newDemuxer(output chan Event, scheduler *scheduler, processor *processor) *
func (dm *demuxer) run() { func (dm *demuxer) run() {
fmt.Println("Running demuxer") fmt.Println("Running demuxer")
for { for {
event := <-dm.input // so now we need a way to flush
event, ok := <-dm.input
if !ok {
fmt.Println("demuxer stopping")
dm.stopped <- struct{}{}
break
}
// event.time = time.Now() // event.time = time.Now()
received := dm.scheduler.send(event) received := dm.scheduler.send(event)
if !received { if !received {
@ -191,19 +203,14 @@ func (dm *demuxer) run() {
if !received { if !received {
panic("couldn't send to the processor") panic("couldn't send to the processor")
} }
_, ok := event.(stopEvent)
if ok {
fmt.Println("demuxer stopping")
break
}
} }
} }
func (fs *demuxer) send(event Event) bool { func (dm *demuxer) send(event Event) bool {
fmt.Println("demuxer send") fmt.Println("demuxer send")
// we need to close if this is closed first
select { select {
case fs.input <- event: case dm.input <- event:
return true return true
default: default:
fmt.Println("demuxer channel was full") fmt.Println("demuxer channel was full")
@ -211,9 +218,11 @@ func (fs *demuxer) send(event Event) bool {
} }
} }
func (hd *demuxer) stop() { func (dm *demuxer) stop() {
fmt.Println("demuxer stop") fmt.Println("demuxer stop")
hd.input <- stopEvent{} close(dm.input)
<-dm.stopped
fmt.Println("demuxer stopped")
} }
// reactor // reactor
@ -221,32 +230,51 @@ func (hd *demuxer) stop() {
type DummyReactor struct { type DummyReactor struct {
events chan Event events chan Event
demuxer *demuxer demuxer *demuxer
scheduler *scheduler
processor *processor
ticker *time.Ticker ticker *time.Ticker
tickerStopped chan struct{}
} }
func (dr *DummyReactor) Start() { func (dr *DummyReactor) Start() {
bufferSize := 10 bufferSize := 10
events := make(chan Event, bufferSize) events := make(chan Event, bufferSize)
scheduler := newScheduler(events) dr.scheduler = newScheduler(events)
processor := newProcessor(events) dr.processor = newProcessor(events)
dr.demuxer = newDemuxer(events, scheduler, processor) dr.demuxer = newDemuxer(events, dr.scheduler, dr.processor)
dr.ticker = time.NewTicker(1 * time.Second) dr.tickerStopped = make(chan struct{})
go scheduler.run() go dr.scheduler.run()
go processor.run() go dr.processor.run()
go dr.demuxer.run() go dr.demuxer.run()
go func() { go func() {
for t := range dr.ticker.C { ticker := time.NewTicker(1 * time.Second)
dr.demuxer.send(timeCheck{t}) for {
select {
case <-ticker.C:
dr.demuxer.send(timeCheck{})
case <-dr.tickerStopped:
fmt.Println("ticker stopped")
return
}
} }
}() }()
} }
// XXX: We need to have a smooth shutdown process
func (dr *DummyReactor) Stop() { func (dr *DummyReactor) Stop() {
fmt.Println("reactor stopping")
// this should be synchronous // this should be synchronous
dr.ticker.Stop() dr.tickerStopped <- struct{}{}
dr.demuxer.stop() fmt.Println("waiting for ticker")
// the order here matters
dr.demuxer.stop() // this need to drain first
dr.scheduler.stop()
dr.processor.stop()
fmt.Println("reactor stopped")
} }
func (dr *DummyReactor) Receive(event Event) { func (dr *DummyReactor) Receive(event Event) {