diff --git a/blockchain/v2/routines.go b/blockchain/v2/routines.go index 89d137f1..63b078bc 100644 --- a/blockchain/v2/routines.go +++ b/blockchain/v2/routines.go @@ -6,32 +6,15 @@ import ( ) /* -# Message passing between components - * output of one routine becomes input for all other routines - * avoid loops somehow - * Message priority - -# Components have isolated lifecycle management - * Lifecycle management - * Setup - * Teardown -# Individual - * message passing should be non blocking - * backpressure between components - * Lifecycle management of components - * Observable behavior: - * progress - * blocking components - What would look a test look like? - Lifecycle management - Start/Stop - - How to make this non blocking? - - How to avoid Thread saturation - How to handle concurrency +TODO: + * Look at refactoring routines + * single struct, paramterize the handle function + * introduce an sendError and seperate error channel + * How could this be tested? + * Ensure Start/Stopping + * Ensure all messages sent are processed + * Ensure that errors can be processed depsite outstanding messages */ - type testEvent struct { msg string time time.Time @@ -46,50 +29,65 @@ type timeCheck struct { time time.Time } -// scheduler +type handleFunc = func(event Event) Events -type scheduler struct { +// Routine +type Routine struct { + name string input chan Event output chan Event stopped chan struct{} + handle handleFunc } -func newScheduler(output chan Event) *scheduler { - return &scheduler{ +func newRoutine(name string, output chan Event, handleFunc handleFunc) *Routine { + return &Routine{ + name: name, input: make(chan Event, 1), output: output, stopped: make(chan struct{}, 1), + handle: handleFunc, } } -func (sc *scheduler) run() { - fmt.Println("scheduler run") +// XXX: what about error handling? +// we need an additional error channel here which will ensure +// errors get processed as soon as possible +// XXX: what about state here, we need per routine state +func (rt *Routine) run() { + fmt.Printf("%s: run\n", rt.name) for { - iEvent, ok := <-sc.input // stuckhere + iEvent, ok := <-rt.input if !ok { - fmt.Println("stopping scheduler") - sc.stopped <- struct{}{} + fmt.Printf("%s: stopping\n", rt.name) + rt.stopped <- struct{}{} break } - oEvents := sc.handle(iEvent) + oEvents := rt.handle(iEvent) for _, event := range oEvents { - sc.output <- event + rt.output <- event } } } -func (fs *scheduler) send(event Event) bool { - fmt.Println("scheduler send") +func (rt *Routine) send(event Event) bool { + fmt.Printf("%s: send\n", rt.name) select { - case fs.input <- event: + case rt.input <- event: return true default: - fmt.Println("scheduler channel was full") + fmt.Printf("%s: channel was full\n", rt.name) return false } } -func (sc *scheduler) handle(event Event) Events { +func (rt *Routine) stop() { + fmt.Printf("%s: stop\n", rt.name) + close(rt.input) + <-rt.stopped +} + +func schedulerHandle(event Event) Events { switch event.(type) { case timeCheck: fmt.Println("scheduler handle timeCheck") @@ -99,56 +97,7 @@ func (sc *scheduler) handle(event Event) Events { return Events{} } -func (sc *scheduler) stop() { - fmt.Println("scheduler stop") - close(sc.input) - <-sc.stopped -} - -// processor -type processor struct { - input chan Event - output chan Event - stopped chan struct{} -} - -func newProcessor(output chan Event) *processor { - return &processor{ - input: make(chan Event, 1), - output: output, - stopped: make(chan struct{}, 1), - } -} - -func (pc *processor) run() { - fmt.Println("processor run") - for { - iEvent, ok := <-pc.input - if !ok { - fmt.Println("stopping processor") - pc.stopped <- struct{}{} - break - } - - oEvents := pc.handle(iEvent) - for _, event := range oEvents { - pc.output <- event - } - } -} - -func (pc *processor) send(event Event) bool { - fmt.Println("processor send") - select { - case pc.input <- event: - return true - default: - fmt.Println("processor channel was full") - return false - } -} - -func (pc *processor) handle(event Event) Events { +func processorHandle(event Event) Events { switch event.(type) { case timeCheck: fmt.Println("processor handle timeCheck") @@ -158,80 +107,29 @@ func (pc *processor) handle(event Event) Events { return Events{} } -func (pc *processor) stop() { - fmt.Println("processor stop") - close(pc.input) - <-pc.stopped -} - -// demuxer -type demuxer struct { - input chan Event - output chan Event - scheduler *scheduler - processor *processor - stopped chan struct{} -} - -func newDemuxer(output chan Event, scheduler *scheduler, processor *processor) *demuxer { - return &demuxer{ - input: make(chan Event, 1), - output: output, - stopped: make(chan struct{}, 1), - scheduler: scheduler, - processor: processor, - } -} - -func (dm *demuxer) run() { - fmt.Println("Running demuxer") - for { - // so now we need a way to flush - event, ok := <-dm.input - if !ok { - fmt.Println("demuxer stopping") - dm.stopped <- struct{}{} - break - } - // event.time = time.Now() - received := dm.scheduler.send(event) +func genDemuxerHandle(scheduler *Routine, processor *Routine) handleFunc { + return func(event Event) Events { + received := scheduler.send(event) if !received { panic("couldn't send to scheduler") } - received = dm.processor.send(event) + received = processor.send(event) if !received { panic("couldn't send to the processor") } - } -} -func (dm *demuxer) send(event Event) bool { - fmt.Println("demuxer send") - // we need to close if this is closed first - select { - case dm.input <- event: - return true - default: - fmt.Println("demuxer channel was full") - return false + // XXX: think about emitting backpressure if !received + return Events{} } } -func (dm *demuxer) stop() { - fmt.Println("demuxer stop") - close(dm.input) - <-dm.stopped - fmt.Println("demuxer stopped") -} - // reactor - type DummyReactor struct { events chan Event - demuxer *demuxer - scheduler *scheduler - processor *processor + demuxer *Routine + scheduler *Routine + processor *Routine ticker *time.Ticker tickerStopped chan struct{} } @@ -240,9 +138,10 @@ func (dr *DummyReactor) Start() { bufferSize := 10 events := make(chan Event, bufferSize) - dr.scheduler = newScheduler(events) - dr.processor = newProcessor(events) - dr.demuxer = newDemuxer(events, dr.scheduler, dr.processor) + dr.scheduler = newRoutine("scheduler", events, schedulerHandle) + dr.processor = newRoutine("processor", events, processorHandle) + demuxerHandle := genDemuxerHandle(dr.scheduler, dr.processor) + dr.demuxer = newRoutine("demuxer", events, demuxerHandle) dr.tickerStopped = make(chan struct{}) go dr.scheduler.run() @@ -267,7 +166,6 @@ func (dr *DummyReactor) Stop() { fmt.Println("reactor stopping") // this should be synchronous dr.tickerStopped <- struct{}{} - fmt.Println("waiting for ticker") dr.demuxer.stop() dr.scheduler.stop() dr.processor.stop()