mirror of
https://github.com/fluencelabs/tendermint
synced 2025-04-30 09:12:14 +00:00
This is just to match the style of the rest of the codebase, and to reduce the number of files in types.
239 lines
6.7 KiB
Go
239 lines
6.7 KiB
Go
package types
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
|
|
cmn "github.com/tendermint/tendermint/libs/common"
|
|
"github.com/tendermint/tendermint/libs/log"
|
|
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
|
|
)
|
|
|
|
const defaultCapacity = 0
|
|
|
|
type EventBusSubscriber interface {
|
|
Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, out chan<- interface{}) error
|
|
Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error
|
|
UnsubscribeAll(ctx context.Context, subscriber string) error
|
|
}
|
|
|
|
// EventBus is a common bus for all events going through the system. All calls
|
|
// are proxied to underlying pubsub server. All events must be published using
|
|
// EventBus to ensure correct data types.
|
|
type EventBus struct {
|
|
cmn.BaseService
|
|
pubsub *tmpubsub.Server
|
|
}
|
|
|
|
// NewEventBus returns a new event bus.
|
|
func NewEventBus() *EventBus {
|
|
return NewEventBusWithBufferCapacity(defaultCapacity)
|
|
}
|
|
|
|
// NewEventBusWithBufferCapacity returns a new event bus with the given buffer capacity.
|
|
func NewEventBusWithBufferCapacity(cap int) *EventBus {
|
|
// capacity could be exposed later if needed
|
|
pubsub := tmpubsub.NewServer(tmpubsub.BufferCapacity(cap))
|
|
b := &EventBus{pubsub: pubsub}
|
|
b.BaseService = *cmn.NewBaseService(nil, "EventBus", b)
|
|
return b
|
|
}
|
|
|
|
func (b *EventBus) SetLogger(l log.Logger) {
|
|
b.BaseService.SetLogger(l)
|
|
b.pubsub.SetLogger(l.With("module", "pubsub"))
|
|
}
|
|
|
|
func (b *EventBus) OnStart() error {
|
|
return b.pubsub.Start()
|
|
}
|
|
|
|
func (b *EventBus) OnStop() {
|
|
b.pubsub.Stop()
|
|
}
|
|
|
|
func (b *EventBus) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, out chan<- interface{}) error {
|
|
return b.pubsub.Subscribe(ctx, subscriber, query, out)
|
|
}
|
|
|
|
func (b *EventBus) Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error {
|
|
return b.pubsub.Unsubscribe(ctx, subscriber, query)
|
|
}
|
|
|
|
func (b *EventBus) UnsubscribeAll(ctx context.Context, subscriber string) error {
|
|
return b.pubsub.UnsubscribeAll(ctx, subscriber)
|
|
}
|
|
|
|
func (b *EventBus) Publish(eventType string, eventData TMEventData) error {
|
|
// no explicit deadline for publishing events
|
|
ctx := context.Background()
|
|
b.pubsub.PublishWithTags(ctx, eventData, tmpubsub.NewTagMap(map[string]string{EventTypeKey: eventType}))
|
|
return nil
|
|
}
|
|
|
|
func (b *EventBus) PublishEventNewBlock(data EventDataNewBlock) error {
|
|
return b.Publish(EventNewBlock, data)
|
|
}
|
|
|
|
func (b *EventBus) PublishEventNewBlockHeader(data EventDataNewBlockHeader) error {
|
|
return b.Publish(EventNewBlockHeader, data)
|
|
}
|
|
|
|
func (b *EventBus) PublishEventVote(data EventDataVote) error {
|
|
return b.Publish(EventVote, data)
|
|
}
|
|
|
|
// PublishEventTx publishes tx event with tags from Result. Note it will add
|
|
// predefined tags (EventTypeKey, TxHashKey). Existing tags with the same names
|
|
// will be overwritten.
|
|
func (b *EventBus) PublishEventTx(data EventDataTx) error {
|
|
// no explicit deadline for publishing events
|
|
ctx := context.Background()
|
|
|
|
tags := make(map[string]string)
|
|
|
|
// validate and fill tags from tx result
|
|
for _, tag := range data.Result.Tags {
|
|
// basic validation
|
|
if len(tag.Key) == 0 {
|
|
b.Logger.Info("Got tag with an empty key (skipping)", "tag", tag, "tx", data.Tx)
|
|
continue
|
|
}
|
|
tags[string(tag.Key)] = string(tag.Value)
|
|
}
|
|
|
|
// add predefined tags
|
|
logIfTagExists(EventTypeKey, tags, b.Logger)
|
|
tags[EventTypeKey] = EventTx
|
|
|
|
logIfTagExists(TxHashKey, tags, b.Logger)
|
|
tags[TxHashKey] = fmt.Sprintf("%X", data.Tx.Hash())
|
|
|
|
logIfTagExists(TxHeightKey, tags, b.Logger)
|
|
tags[TxHeightKey] = fmt.Sprintf("%d", data.Height)
|
|
|
|
b.pubsub.PublishWithTags(ctx, data, tmpubsub.NewTagMap(tags))
|
|
return nil
|
|
}
|
|
|
|
func (b *EventBus) PublishEventProposalHeartbeat(data EventDataProposalHeartbeat) error {
|
|
return b.Publish(EventProposalHeartbeat, data)
|
|
}
|
|
|
|
func (b *EventBus) PublishEventNewRoundStep(data EventDataRoundState) error {
|
|
return b.Publish(EventNewRoundStep, data)
|
|
}
|
|
|
|
func (b *EventBus) PublishEventTimeoutPropose(data EventDataRoundState) error {
|
|
return b.Publish(EventTimeoutPropose, data)
|
|
}
|
|
|
|
func (b *EventBus) PublishEventTimeoutWait(data EventDataRoundState) error {
|
|
return b.Publish(EventTimeoutWait, data)
|
|
}
|
|
|
|
func (b *EventBus) PublishEventNewRound(data EventDataRoundState) error {
|
|
return b.Publish(EventNewRound, data)
|
|
}
|
|
|
|
func (b *EventBus) PublishEventCompleteProposal(data EventDataRoundState) error {
|
|
return b.Publish(EventCompleteProposal, data)
|
|
}
|
|
|
|
func (b *EventBus) PublishEventPolka(data EventDataRoundState) error {
|
|
return b.Publish(EventPolka, data)
|
|
}
|
|
|
|
func (b *EventBus) PublishEventUnlock(data EventDataRoundState) error {
|
|
return b.Publish(EventUnlock, data)
|
|
}
|
|
|
|
func (b *EventBus) PublishEventRelock(data EventDataRoundState) error {
|
|
return b.Publish(EventRelock, data)
|
|
}
|
|
|
|
func (b *EventBus) PublishEventLock(data EventDataRoundState) error {
|
|
return b.Publish(EventLock, data)
|
|
}
|
|
|
|
func (b *EventBus) PublishEventValidatorSetUpdates(data EventDataValidatorSetUpdates) error {
|
|
return b.Publish(EventValidatorSetUpdates, data)
|
|
}
|
|
|
|
func logIfTagExists(tag string, tags map[string]string, logger log.Logger) {
|
|
if value, ok := tags[tag]; ok {
|
|
logger.Error("Found predefined tag (value will be overwritten)", "tag", tag, "value", value)
|
|
}
|
|
}
|
|
|
|
//-----------------------------------------------------------------------------
|
|
type NopEventBus struct{}
|
|
|
|
func (NopEventBus) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, out chan<- interface{}) error {
|
|
return nil
|
|
}
|
|
|
|
func (NopEventBus) Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error {
|
|
return nil
|
|
}
|
|
|
|
func (NopEventBus) UnsubscribeAll(ctx context.Context, subscriber string) error {
|
|
return nil
|
|
}
|
|
|
|
func (NopEventBus) PublishEventNewBlock(data EventDataNewBlock) error {
|
|
return nil
|
|
}
|
|
|
|
func (NopEventBus) PublishEventNewBlockHeader(data EventDataNewBlockHeader) error {
|
|
return nil
|
|
}
|
|
|
|
func (NopEventBus) PublishEventVote(data EventDataVote) error {
|
|
return nil
|
|
}
|
|
|
|
func (NopEventBus) PublishEventTx(data EventDataTx) error {
|
|
return nil
|
|
}
|
|
|
|
func (NopEventBus) PublishEventNewRoundStep(data EventDataRoundState) error {
|
|
return nil
|
|
}
|
|
|
|
func (NopEventBus) PublishEventTimeoutPropose(data EventDataRoundState) error {
|
|
return nil
|
|
}
|
|
|
|
func (NopEventBus) PublishEventTimeoutWait(data EventDataRoundState) error {
|
|
return nil
|
|
}
|
|
|
|
func (NopEventBus) PublishEventNewRound(data EventDataRoundState) error {
|
|
return nil
|
|
}
|
|
|
|
func (NopEventBus) PublishEventCompleteProposal(data EventDataRoundState) error {
|
|
return nil
|
|
}
|
|
|
|
func (NopEventBus) PublishEventPolka(data EventDataRoundState) error {
|
|
return nil
|
|
}
|
|
|
|
func (NopEventBus) PublishEventUnlock(data EventDataRoundState) error {
|
|
return nil
|
|
}
|
|
|
|
func (NopEventBus) PublishEventRelock(data EventDataRoundState) error {
|
|
return nil
|
|
}
|
|
|
|
func (NopEventBus) PublishEventLock(data EventDataRoundState) error {
|
|
return nil
|
|
}
|
|
|
|
func (NopEventBus) PublishEventValidatorSetUpdates(data EventDataValidatorSetUpdates) error {
|
|
return nil
|
|
}
|