tendermint/blockchain/v2/reactor.go

121 lines
2.4 KiB
Go
Raw Normal View History

package v2
import (
"fmt"
"time"
2019-08-08 16:56:39 +02:00
"github.com/tendermint/tendermint/libs/log"
)
2019-08-08 15:53:02 +02:00
type timeCheck struct {
priorityHigh
2019-08-13 17:57:17 +02:00
time time.Time
2019-08-08 15:53:02 +02:00
}
func schedulerHandle(event Event) (Event, error) {
switch event.(type) {
case timeCheck:
fmt.Println("scheduler handle timeCheck")
}
return noOp, nil
}
func processorHandle(event Event) (Event, error) {
switch event.(type) {
case timeCheck:
fmt.Println("processor handle timeCheck")
}
return noOp, nil
}
type Reactor struct {
events chan Event
stopDemux chan struct{}
2019-08-13 17:57:17 +02:00
scheduler *Routine
processor *Routine
ticker *time.Ticker
logger log.Logger
}
var bufferSize int = 10
func NewReactor() *Reactor {
return &Reactor{
events: make(chan Event, bufferSize),
stopDemux: make(chan struct{}),
scheduler: newRoutine("scheduler", schedulerHandle),
processor: newRoutine("processor", processorHandle),
ticker: time.NewTicker(1 * time.Second),
logger: log.NewNopLogger(),
}
}
2019-08-08 17:42:46 +02:00
// nolint:unused
2019-08-08 16:56:39 +02:00
func (r *Reactor) setLogger(logger log.Logger) {
r.logger = logger
2019-08-08 16:56:39 +02:00
r.scheduler.setLogger(logger)
r.processor.setLogger(logger)
}
func (r *Reactor) Start() {
go r.scheduler.start()
go r.processor.start()
go r.demux()
2019-08-08 17:42:46 +02:00
<-r.scheduler.ready()
<-r.processor.ready()
go func() {
2019-08-13 17:57:17 +02:00
for t := range r.ticker.C {
r.events <- timeCheck{time: t}
}
}()
}
2019-09-13 18:36:02 -04:00
// Would it be possible here to provide some kind of type safety for the types
// of events that each routine can produce and consume?
func (r *Reactor) demux() {
for {
select {
case event := <-r.events:
// XXX: check for backpressure
r.scheduler.trySend(event)
r.processor.trySend(event)
case _ = <-r.stopDemux:
r.logger.Info("demuxing stopped")
return
case event := <-r.scheduler.next():
2019-09-13 18:36:02 -04:00
r.processor.trySend(event)
case event := <-r.processor.next():
2019-09-13 18:36:02 -04:00
r.scheduler.trySend(event)
case err := <-r.scheduler.final():
r.logger.Info(fmt.Sprintf("scheduler final %s", err))
case err := <-r.processor.final():
r.logger.Info(fmt.Sprintf("processor final %s", err))
// XXX: switch to consensus
}
}
}
func (r *Reactor) Stop() {
r.logger.Info("reactor stopping")
2019-08-13 17:57:17 +02:00
r.ticker.Stop()
r.scheduler.stop()
r.processor.stop()
close(r.stopDemux)
close(r.events)
r.logger.Info("reactor stopped")
}
func (r *Reactor) Receive(event Event) {
// XXX: decode and serialize write events
r.events <- event
}
func (r *Reactor) AddPeer() {
// TODO: add peer event and send to demuxer
}