align buffer sizes

This commit is contained in:
Sean Braithwaite 2019-09-14 13:01:19 -04:00
parent 822942a2e4
commit 99b7a33f90
4 changed files with 25 additions and 20 deletions

View File

@ -42,10 +42,10 @@ var bufferSize int = 10
func NewReactor() *Reactor { func NewReactor() *Reactor {
return &Reactor{ return &Reactor{
events: make(chan Event, bufferSize), events: make(chan Event, bufferSize*2),
stopDemux: make(chan struct{}), stopDemux: make(chan struct{}),
scheduler: newRoutine("scheduler", schedulerHandle), scheduler: newRoutine("scheduler", schedulerHandle, bufferSize),
processor: newRoutine("processor", processorHandle), processor: newRoutine("processor", processorHandle, bufferSize),
ticker: time.NewTicker(1 * time.Second), ticker: time.NewTicker(1 * time.Second),
logger: log.NewNopLogger(), logger: log.NewNopLogger(),
} }

View File

@ -2,14 +2,11 @@ package v2
import ( import (
"testing" "testing"
"github.com/tendermint/tendermint/libs/log"
) )
func TestReactor(t *testing.T) { func TestReactor(t *testing.T) {
reactor := NewReactor() reactor := NewReactor()
reactor.Start() reactor.Start()
reactor.setLogger(log.TestingLogger())
script := []Event{ script := []Event{
// TODO // TODO
} }

View File

@ -28,14 +28,12 @@ type Routine struct {
metrics *Metrics metrics *Metrics
} }
var queueSize int = 10 func newRoutine(name string, handleFunc handleFunc, bufferSize int) *Routine {
func newRoutine(name string, handleFunc handleFunc) *Routine {
return &Routine{ return &Routine{
name: name, name: name,
handle: handleFunc, handle: handleFunc,
queue: queue.NewPriorityQueue(queueSize, true), queue: queue.NewPriorityQueue(bufferSize, true),
out: make(chan Event, queueSize), out: make(chan Event, bufferSize),
rdy: make(chan struct{}, 1), rdy: make(chan struct{}, 1),
fin: make(chan error, 1), fin: make(chan error, 1),
running: new(uint32), running: new(uint32),

View File

@ -6,7 +6,6 @@ import (
"time" "time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/tendermint/tendermint/libs/log"
) )
type eventA struct { type eventA struct {
@ -24,7 +23,10 @@ func simpleHandler(event Event) (Event, error) {
} }
func TestRoutineFinal(t *testing.T) { func TestRoutineFinal(t *testing.T) {
routine := newRoutine("simpleRoutine", simpleHandler) var (
bufferSize = 10
routine = newRoutine("simpleRoutine", simpleHandler, bufferSize)
)
assert.False(t, routine.isRunning(), assert.False(t, routine.isRunning(),
"expected an initialized routine to not be running") "expected an initialized routine to not be running")
@ -44,7 +46,10 @@ func TestRoutineFinal(t *testing.T) {
} }
func TestRoutineStop(t *testing.T) { func TestRoutineStop(t *testing.T) {
routine := newRoutine("simpleRoutine", simpleHandler) var (
bufferSize = 10
routine = newRoutine("simpleRoutine", simpleHandler, bufferSize)
)
assert.False(t, routine.send(eventA{}), assert.False(t, routine.send(eventA{}),
"expected sending to an unstarted routine to fail") "expected sending to an unstarted routine to fail")
@ -91,10 +96,12 @@ func feedback(r *Routine) {
} }
func TestStatefulRoutine(t *testing.T) { func TestStatefulRoutine(t *testing.T) {
count := 10 var (
handler := genStatefulHandler(count) count = 10
routine := newRoutine("statefulRoutine", handler) handler = genStatefulHandler(count)
routine.setLogger(log.TestingLogger()) bufferSize = 20
routine = newRoutine("statefulRoutine", handler, bufferSize)
)
go routine.start() go routine.start()
go feedback(routine) go feedback(routine)
@ -131,8 +138,11 @@ func handleWithPriority(event Event) (Event, error) {
} }
func TestPriority(t *testing.T) { func TestPriority(t *testing.T) {
// XXX: align with buffer size var (
routine := newRoutine("priorityRoutine", handleWithPriority) bufferSize = 20
routine = newRoutine("priorityRoutine", handleWithPriority, bufferSize)
)
go routine.start() go routine.start()
<-routine.ready() <-routine.ready()
go func() { go func() {