Refacor routines to reduce duplication

This commit is contained in:
Sean Braithwaite
2019-07-29 08:45:59 +02:00
parent ed7b3bb89b
commit f77fde35c4

View File

@ -6,32 +6,15 @@ import (
) )
/* /*
# Message passing between components TODO:
* output of one routine becomes input for all other routines * Look at refactoring routines
* avoid loops somehow * single struct, paramterize the handle function
* Message priority * introduce an sendError and seperate error channel
* How could this be tested?
# Components have isolated lifecycle management * Ensure Start/Stopping
* Lifecycle management * Ensure all messages sent are processed
* Setup * Ensure that errors can be processed depsite outstanding messages
* Teardown
# Individual
* message passing should be non blocking
* backpressure between components
* Lifecycle management of components
* Observable behavior:
* progress
* blocking components
What would look a test look like?
Lifecycle management
Start/Stop
How to make this non blocking?
How to avoid Thread saturation
How to handle concurrency
*/ */
type testEvent struct { type testEvent struct {
msg string msg string
time time.Time time time.Time
@ -46,50 +29,65 @@ type timeCheck struct {
time time.Time time time.Time
} }
// scheduler type handleFunc = func(event Event) Events
type scheduler struct { // Routine
type Routine struct {
name string
input chan Event input chan Event
output chan Event output chan Event
stopped chan struct{} stopped chan struct{}
handle handleFunc
} }
func newScheduler(output chan Event) *scheduler { func newRoutine(name string, output chan Event, handleFunc handleFunc) *Routine {
return &scheduler{ return &Routine{
name: name,
input: make(chan Event, 1), input: make(chan Event, 1),
output: output, output: output,
stopped: make(chan struct{}, 1), stopped: make(chan struct{}, 1),
handle: handleFunc,
} }
} }
func (sc *scheduler) run() { // XXX: what about error handling?
fmt.Println("scheduler run") // we need an additional error channel here which will ensure
// errors get processed as soon as possible
// XXX: what about state here, we need per routine state
func (rt *Routine) run() {
fmt.Printf("%s: run\n", rt.name)
for { for {
iEvent, ok := <-sc.input // stuckhere iEvent, ok := <-rt.input
if !ok { if !ok {
fmt.Println("stopping scheduler") fmt.Printf("%s: stopping\n", rt.name)
sc.stopped <- struct{}{} rt.stopped <- struct{}{}
break break
} }
oEvents := sc.handle(iEvent) oEvents := rt.handle(iEvent)
for _, event := range oEvents { for _, event := range oEvents {
sc.output <- event rt.output <- event
} }
} }
} }
func (fs *scheduler) send(event Event) bool { func (rt *Routine) send(event Event) bool {
fmt.Println("scheduler send") fmt.Printf("%s: send\n", rt.name)
select { select {
case fs.input <- event: case rt.input <- event:
return true return true
default: default:
fmt.Println("scheduler channel was full") fmt.Printf("%s: channel was full\n", rt.name)
return false return false
} }
} }
func (sc *scheduler) handle(event Event) Events { func (rt *Routine) stop() {
fmt.Printf("%s: stop\n", rt.name)
close(rt.input)
<-rt.stopped
}
func schedulerHandle(event Event) Events {
switch event.(type) { switch event.(type) {
case timeCheck: case timeCheck:
fmt.Println("scheduler handle timeCheck") fmt.Println("scheduler handle timeCheck")
@ -99,56 +97,7 @@ func (sc *scheduler) handle(event Event) Events {
return Events{} return Events{}
} }
func (sc *scheduler) stop() { func processorHandle(event Event) Events {
fmt.Println("scheduler stop")
close(sc.input)
<-sc.stopped
}
// processor
type processor struct {
input chan Event
output chan Event
stopped chan struct{}
}
func newProcessor(output chan Event) *processor {
return &processor{
input: make(chan Event, 1),
output: output,
stopped: make(chan struct{}, 1),
}
}
func (pc *processor) run() {
fmt.Println("processor run")
for {
iEvent, ok := <-pc.input
if !ok {
fmt.Println("stopping processor")
pc.stopped <- struct{}{}
break
}
oEvents := pc.handle(iEvent)
for _, event := range oEvents {
pc.output <- event
}
}
}
func (pc *processor) send(event Event) bool {
fmt.Println("processor send")
select {
case pc.input <- event:
return true
default:
fmt.Println("processor channel was full")
return false
}
}
func (pc *processor) handle(event Event) Events {
switch event.(type) { switch event.(type) {
case timeCheck: case timeCheck:
fmt.Println("processor handle timeCheck") fmt.Println("processor handle timeCheck")
@ -158,80 +107,29 @@ func (pc *processor) handle(event Event) Events {
return Events{} return Events{}
} }
func (pc *processor) stop() { func genDemuxerHandle(scheduler *Routine, processor *Routine) handleFunc {
fmt.Println("processor stop") return func(event Event) Events {
close(pc.input) received := scheduler.send(event)
<-pc.stopped
}
// demuxer
type demuxer struct {
input chan Event
output chan Event
scheduler *scheduler
processor *processor
stopped chan struct{}
}
func newDemuxer(output chan Event, scheduler *scheduler, processor *processor) *demuxer {
return &demuxer{
input: make(chan Event, 1),
output: output,
stopped: make(chan struct{}, 1),
scheduler: scheduler,
processor: processor,
}
}
func (dm *demuxer) run() {
fmt.Println("Running demuxer")
for {
// so now we need a way to flush
event, ok := <-dm.input
if !ok {
fmt.Println("demuxer stopping")
dm.stopped <- struct{}{}
break
}
// event.time = time.Now()
received := dm.scheduler.send(event)
if !received { if !received {
panic("couldn't send to scheduler") panic("couldn't send to scheduler")
} }
received = dm.processor.send(event) received = processor.send(event)
if !received { if !received {
panic("couldn't send to the processor") panic("couldn't send to the processor")
} }
}
}
func (dm *demuxer) send(event Event) bool { // XXX: think about emitting backpressure if !received
fmt.Println("demuxer send") return Events{}
// we need to close if this is closed first
select {
case dm.input <- event:
return true
default:
fmt.Println("demuxer channel was full")
return false
} }
} }
func (dm *demuxer) stop() {
fmt.Println("demuxer stop")
close(dm.input)
<-dm.stopped
fmt.Println("demuxer stopped")
}
// reactor // reactor
type DummyReactor struct { type DummyReactor struct {
events chan Event events chan Event
demuxer *demuxer demuxer *Routine
scheduler *scheduler scheduler *Routine
processor *processor processor *Routine
ticker *time.Ticker ticker *time.Ticker
tickerStopped chan struct{} tickerStopped chan struct{}
} }
@ -240,9 +138,10 @@ func (dr *DummyReactor) Start() {
bufferSize := 10 bufferSize := 10
events := make(chan Event, bufferSize) events := make(chan Event, bufferSize)
dr.scheduler = newScheduler(events) dr.scheduler = newRoutine("scheduler", events, schedulerHandle)
dr.processor = newProcessor(events) dr.processor = newRoutine("processor", events, processorHandle)
dr.demuxer = newDemuxer(events, dr.scheduler, dr.processor) demuxerHandle := genDemuxerHandle(dr.scheduler, dr.processor)
dr.demuxer = newRoutine("demuxer", events, demuxerHandle)
dr.tickerStopped = make(chan struct{}) dr.tickerStopped = make(chan struct{})
go dr.scheduler.run() go dr.scheduler.run()
@ -267,7 +166,6 @@ func (dr *DummyReactor) Stop() {
fmt.Println("reactor stopping") fmt.Println("reactor stopping")
// this should be synchronous // this should be synchronous
dr.tickerStopped <- struct{}{} dr.tickerStopped <- struct{}{}
fmt.Println("waiting for ticker")
dr.demuxer.stop() dr.demuxer.stop()
dr.scheduler.stop() dr.scheduler.stop()
dr.processor.stop() dr.processor.stop()