From 99b7a33f90f23ba1ae2b3375a205af555fc7410e Mon Sep 17 00:00:00 2001 From: Sean Braithwaite Date: Sat, 14 Sep 2019 13:01:19 -0400 Subject: [PATCH] align buffer sizes --- blockchain/v2/reactor.go | 6 +++--- blockchain/v2/reactor_test.go | 3 --- blockchain/v2/routine.go | 8 +++----- blockchain/v2/routine_test.go | 28 +++++++++++++++++++--------- 4 files changed, 25 insertions(+), 20 deletions(-) diff --git a/blockchain/v2/reactor.go b/blockchain/v2/reactor.go index 26b2cb97..8cc8ac21 100644 --- a/blockchain/v2/reactor.go +++ b/blockchain/v2/reactor.go @@ -42,10 +42,10 @@ var bufferSize int = 10 func NewReactor() *Reactor { return &Reactor{ - events: make(chan Event, bufferSize), + events: make(chan Event, bufferSize*2), stopDemux: make(chan struct{}), - scheduler: newRoutine("scheduler", schedulerHandle), - processor: newRoutine("processor", processorHandle), + scheduler: newRoutine("scheduler", schedulerHandle, bufferSize), + processor: newRoutine("processor", processorHandle, bufferSize), ticker: time.NewTicker(1 * time.Second), logger: log.NewNopLogger(), } diff --git a/blockchain/v2/reactor_test.go b/blockchain/v2/reactor_test.go index e14e618d..86ac728a 100644 --- a/blockchain/v2/reactor_test.go +++ b/blockchain/v2/reactor_test.go @@ -2,14 +2,11 @@ package v2 import ( "testing" - - "github.com/tendermint/tendermint/libs/log" ) func TestReactor(t *testing.T) { reactor := NewReactor() reactor.Start() - reactor.setLogger(log.TestingLogger()) script := []Event{ // TODO } diff --git a/blockchain/v2/routine.go b/blockchain/v2/routine.go index 823485bf..cc7e7ea0 100644 --- a/blockchain/v2/routine.go +++ b/blockchain/v2/routine.go @@ -28,14 +28,12 @@ type Routine struct { metrics *Metrics } -var queueSize int = 10 - -func newRoutine(name string, handleFunc handleFunc) *Routine { +func newRoutine(name string, handleFunc handleFunc, bufferSize int) *Routine { return &Routine{ name: name, handle: handleFunc, - queue: queue.NewPriorityQueue(queueSize, true), - out: make(chan Event, queueSize), + queue: queue.NewPriorityQueue(bufferSize, true), + out: make(chan Event, bufferSize), rdy: make(chan struct{}, 1), fin: make(chan error, 1), running: new(uint32), diff --git a/blockchain/v2/routine_test.go b/blockchain/v2/routine_test.go index e32394ee..2bd5a1a3 100644 --- a/blockchain/v2/routine_test.go +++ b/blockchain/v2/routine_test.go @@ -6,7 +6,6 @@ import ( "time" "github.com/stretchr/testify/assert" - "github.com/tendermint/tendermint/libs/log" ) type eventA struct { @@ -24,7 +23,10 @@ func simpleHandler(event Event) (Event, error) { } func TestRoutineFinal(t *testing.T) { - routine := newRoutine("simpleRoutine", simpleHandler) + var ( + bufferSize = 10 + routine = newRoutine("simpleRoutine", simpleHandler, bufferSize) + ) assert.False(t, routine.isRunning(), "expected an initialized routine to not be running") @@ -44,7 +46,10 @@ func TestRoutineFinal(t *testing.T) { } func TestRoutineStop(t *testing.T) { - routine := newRoutine("simpleRoutine", simpleHandler) + var ( + bufferSize = 10 + routine = newRoutine("simpleRoutine", simpleHandler, bufferSize) + ) assert.False(t, routine.send(eventA{}), "expected sending to an unstarted routine to fail") @@ -91,10 +96,12 @@ func feedback(r *Routine) { } func TestStatefulRoutine(t *testing.T) { - count := 10 - handler := genStatefulHandler(count) - routine := newRoutine("statefulRoutine", handler) - routine.setLogger(log.TestingLogger()) + var ( + count = 10 + handler = genStatefulHandler(count) + bufferSize = 20 + routine = newRoutine("statefulRoutine", handler, bufferSize) + ) go routine.start() go feedback(routine) @@ -131,8 +138,11 @@ func handleWithPriority(event Event) (Event, error) { } func TestPriority(t *testing.T) { - // XXX: align with buffer size - routine := newRoutine("priorityRoutine", handleWithPriority) + var ( + bufferSize = 20 + routine = newRoutine("priorityRoutine", handleWithPriority, bufferSize) + ) + go routine.start() <-routine.ready() go func() {