toy example

This commit is contained in:
Sean Braithwaite
2019-07-27 10:58:58 +02:00
parent 0786f4593f
commit fa60855cca
2 changed files with 115 additions and 80 deletions

View File

@@ -32,9 +32,6 @@ import (
How to handle concurrency
*/
type Event interface{}
type Events []Event
type testEvent struct {
msg string
time time.Time
@@ -49,18 +46,28 @@ type timeCheck struct {
time time.Time
}
type handler struct {
// scheduler
type scheduler struct {
input chan Event
output chan Event
}
// base handler
func (hd *handler) run() {
func newScheduler(output chan Event) *scheduler {
input := make(chan Event, 1)
return &scheduler{
input: input,
output: output,
}
}
func (hd *scheduler) run() {
fmt.Println("scheduler run")
for {
iEvent := <-hd.input
stop, ok := iEvent.(stopEvent)
_, ok := iEvent.(stopEvent)
if ok {
fmt.Println("stopping handler")
fmt.Println("stopping scheduler")
break
}
oEvents := hd.handle(iEvent)
@@ -70,45 +77,19 @@ func (hd *handler) run() {
}
}
func (hd *handler) handle(even Event) Events {
fmt.Println("handler stand in handle")
return Events{}
}
func (hd *handler) stop() {
// XXX: What if this is full?
hd.input <- struct{}{}
}
func (fs *handler) send(event Event) bool {
func (fs *scheduler) send(event Event) bool {
fmt.Println("scheduler send")
select {
case fs.input <- event:
return true
default:
fmt.Println("scheduler channel was full")
return false
}
}
// scheduler
type scheduler struct {
handler
}
func newScheduler(output chan Event) *scheduler {
input := make(chan Event)
handler := handler{
input: input,
output: output,
}
return &scheduler{
handler: handler,
}
}
func (sc *scheduler) handle(event Event) Events {
switch event := event.(type) {
switch event.(type) {
case timeCheck:
fmt.Println("scheduler handle timeCheck")
case testEvent:
@@ -117,82 +98,130 @@ func (sc *scheduler) handle(event Event) Events {
return Events{}
}
func (hd *scheduler) stop() {
fmt.Println("scheduler stop")
hd.input <- stopEvent{}
}
// processor
type processor struct {
handler
input chan Event
output chan Event
}
func newProcessor(output chan Event) *processor {
handler := handler{output: output}
input := make(chan Event, 1)
return &processor{
handler: handler,
input: input,
output: output,
}
}
func (hd *processor) run() {
fmt.Println("processor run")
for {
iEvent := <-hd.input
_, ok := iEvent.(stopEvent)
if ok {
fmt.Println("stopping processor")
break
}
oEvents := hd.handle(iEvent)
for _, event := range oEvents {
hd.output <- event
}
}
}
func (fs *processor) send(event Event) bool {
fmt.Println("processor send")
select {
case fs.input <- event:
return true
default:
fmt.Println("processor channel was full")
return false
}
}
func (sc *processor) handle(event Event) Events {
switch event := event.(type) {
switch event.(type) {
case timeCheck:
fmt.Println("processor handle evTimeCheck")
fmt.Println("processor handle timeCheck")
case testEvent:
fmt.Println("processor handle testEvent")
}
return Events{}
}
func (hd *processor) stop() {
fmt.Println("processor stop")
hd.input <- stopEvent{}
}
// demuxer
type demuxer struct {
handler
input chan Event
output chan Event
scheduler *scheduler
processor *processor
}
func newDemuxer(output chan Event, scheduler *scheduler, processor *processor) *demuxer {
input := make(chan Event)
handler := handler{
input: input,
output: output,
}
input := make(chan Event, 1)
return &demuxer{
handler: handler,
input: input,
output: output,
scheduler: scheduler,
processor: processor,
}
}
func (dm *demuxer) run() {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
fmt.Println("Running demuxer")
for {
select {
case <-ticker.C:
now := timeCheck{time: time.Now()}
dm.input <- now
case event := <-dm.input:
// event.time = time.Now()
received := dm.scheduler.send(event)
if !received {
panic("couldn't send to scheduler")
}
event := <-dm.input
// event.time = time.Now()
received := dm.scheduler.send(event)
if !received {
panic("couldn't send to scheduler")
}
received = dm.processor.send(event)
if !received {
panic("couldn't send to the processor")
}
received = dm.processor.send(event)
if !received {
panic("couldn't send to the processor")
}
_, ok := event.(stopEvent)
if ok {
fmt.Println("demuxer stopping")
break
}
_, ok := event.(stopEvent)
if ok {
fmt.Println("demuxer stopping")
break
}
}
}
func (fs *demuxer) send(event Event) bool {
fmt.Println("demuxer send")
select {
case fs.input <- event:
return true
default:
fmt.Println("demuxer channel was full")
return false
}
}
func (hd *demuxer) stop() {
fmt.Println("demuxer stop")
hd.input <- stopEvent{}
}
// reactor
type DummyReactor struct {
events chan Event
demuxer *demuxer
ticker *time.Ticker
}
func (dr *DummyReactor) Start() {
@@ -202,17 +231,26 @@ func (dr *DummyReactor) Start() {
scheduler := newScheduler(events)
processor := newProcessor(events)
dr.demuxer = newDemuxer(events, scheduler, processor)
dr.ticker = time.NewTicker(1 * time.Second)
go scheduler.run()
go processor.run()
go dr.demuxer.run()
go func() {
for t := range dr.ticker.C {
dr.demuxer.send(timeCheck{t})
}
}()
}
func (dr *DummyReactor) Stop() {
_ = dr.demuxer.send(stopEvent{})
// this should be synchronous
dr.ticker.Stop()
dr.demuxer.stop()
}
func (dr *DummyReactor) Receive(event Event) {
fmt.Println("receive event")
sent := dr.demuxer.send(event)
if !sent {
panic("demuxer is full")

View File

@@ -1,11 +1,13 @@
package v2
import (
"fmt"
"testing"
"time"
)
func TestReactor(t *testing.T) {
reactor = DummyReactor{}
reactor := DummyReactor{}
reactor.Start()
script := Events{
testEvent{},
@@ -14,12 +16,7 @@ func TestReactor(t *testing.T) {
for _, event := range script {
reactor.Receive(event)
}
fmt.Println("sleeping")
time.Sleep(5 * time.Second)
reactor.Stop()
}
/*
* Can we send a message to all routines
* Can routines emit events
* can we start routines
* can we stop all routines
*/