This commit is contained in:
Sean Braithwaite
2019-09-17 15:18:15 -04:00
parent 99b7a33f90
commit d3d034e572
2 changed files with 10 additions and 7 deletions

View File

@ -38,11 +38,9 @@ type Reactor struct {
logger log.Logger logger log.Logger
} }
var bufferSize int = 10 func NewReactor(bufferSize int) *Reactor {
func NewReactor() *Reactor {
return &Reactor{ return &Reactor{
events: make(chan Event, bufferSize*2), events: make(chan Event, bufferSize),
stopDemux: make(chan struct{}), stopDemux: make(chan struct{}),
scheduler: newRoutine("scheduler", schedulerHandle, bufferSize), scheduler: newRoutine("scheduler", schedulerHandle, bufferSize),
processor: newRoutine("processor", processorHandle, bufferSize), processor: newRoutine("processor", processorHandle, bufferSize),
@ -73,7 +71,7 @@ func (r *Reactor) Start() {
}() }()
} }
// Would it be possible here to provide some kind of type safety for the types // XXX: Would it be possible here to provide some kind of type safety for the types
// of events that each routine can produce and consume? // of events that each routine can produce and consume?
func (r *Reactor) demux() { func (r *Reactor) demux() {
for { for {
@ -82,7 +80,7 @@ func (r *Reactor) demux() {
// XXX: check for backpressure // XXX: check for backpressure
r.scheduler.send(event) r.scheduler.send(event)
r.processor.send(event) r.processor.send(event)
case _ = <-r.stopDemux: case <-r.stopDemux:
r.logger.Info("demuxing stopped") r.logger.Info("demuxing stopped")
return return
case event := <-r.scheduler.next(): case event := <-r.scheduler.next():
@ -112,6 +110,7 @@ func (r *Reactor) Stop() {
func (r *Reactor) Receive(event Event) { func (r *Reactor) Receive(event Event) {
// XXX: decode and serialize write events // XXX: decode and serialize write events
// TODO: backpressure
r.events <- event r.events <- event
} }

View File

@ -5,7 +5,11 @@ import (
) )
func TestReactor(t *testing.T) { func TestReactor(t *testing.T) {
reactor := NewReactor() var (
bufferSize = 10
reactor = NewReactor(bufferSize)
)
reactor.Start() reactor.Start()
script := []Event{ script := []Event{
// TODO // TODO