tendermint/types/event_bus.go
Anton Kaliaev d741c7b478
limit number of /subscribe clients and queries per client (#3269)
* limit number of /subscribe clients and queries per client

Add the following config variables (under [rpc] section):
  * max_subscription_clients
  * max_subscriptions_per_client
  * timeout_broadcast_tx_commit

Fixes #2826

new HTTPClient interface for subscriptions

finalize HTTPClient events interface

remove EventSubscriber

fix data race

```
WARNING: DATA RACE
Read at 0x00c000a36060 by goroutine 129:
  github.com/tendermint/tendermint/rpc/client.(*Local).Subscribe.func1()
      /go/src/github.com/tendermint/tendermint/rpc/client/localclient.go:168 +0x1f0

Previous write at 0x00c000a36060 by goroutine 132:
  github.com/tendermint/tendermint/rpc/client.(*Local).Subscribe()
      /go/src/github.com/tendermint/tendermint/rpc/client/localclient.go:191 +0x4e0
  github.com/tendermint/tendermint/rpc/client.WaitForOneEvent()
      /go/src/github.com/tendermint/tendermint/rpc/client/helpers.go:64 +0x178
  github.com/tendermint/tendermint/rpc/client_test.TestTxEventsSentWithBroadcastTxSync.func1()
      /go/src/github.com/tendermint/tendermint/rpc/client/event_test.go:139 +0x298
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:827 +0x162

Goroutine 129 (running) created at:
  github.com/tendermint/tendermint/rpc/client.(*Local).Subscribe()
      /go/src/github.com/tendermint/tendermint/rpc/client/localclient.go:164 +0x4b7
  github.com/tendermint/tendermint/rpc/client.WaitForOneEvent()
      /go/src/github.com/tendermint/tendermint/rpc/client/helpers.go:64 +0x178
  github.com/tendermint/tendermint/rpc/client_test.TestTxEventsSentWithBroadcastTxSync.func1()
      /go/src/github.com/tendermint/tendermint/rpc/client/event_test.go:139 +0x298
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:827 +0x162

Goroutine 132 (running) created at:
  testing.(*T).Run()
      /usr/local/go/src/testing/testing.go:878 +0x659
  github.com/tendermint/tendermint/rpc/client_test.TestTxEventsSentWithBroadcastTxSync()
      /go/src/github.com/tendermint/tendermint/rpc/client/event_test.go:119 +0x186
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:827 +0x162
==================
```

lite client works (tested manually)

godoc comments

httpclient: do not close the out channel

use TimeoutBroadcastTxCommit

no timeout for unsubscribe

but 1s Local (5s HTTP) timeout for resubscribe

format code

change Subscribe#out cap to 1

and replace config vars with RPCConfig

TimeoutBroadcastTxCommit can't be greater than rpcserver.WriteTimeout

rpc: Context as first parameter to all functions

reformat code

fixes after my own review

fixes after Ethan's review

add test stubs

fix config.toml

* fixes after manual testing

- rpc: do not recommend to use BroadcastTxCommit because it's slow and wastes
Tendermint resources (pubsub)
- rpc: better error in Subscribe and BroadcastTxCommit
- HTTPClient: do not resubscribe if err = ErrAlreadySubscribed

* fixes after Ismail's review

* Update rpc/grpc/grpc_test.go

Co-Authored-By: melekes <anton.kalyaev@gmail.com>
2019-03-11 22:45:58 +04:00

288 lines
8.3 KiB
Go

package types
import (
"context"
"fmt"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/log"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
)
const defaultCapacity = 0
type EventBusSubscriber interface {
Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, outCapacity ...int) (Subscription, error)
Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error
UnsubscribeAll(ctx context.Context, subscriber string) error
NumClients() int
NumClientSubscriptions(clientID string) int
}
type Subscription interface {
Out() <-chan tmpubsub.Message
Cancelled() <-chan struct{}
Err() error
}
// EventBus is a common bus for all events going through the system. All calls
// are proxied to underlying pubsub server. All events must be published using
// EventBus to ensure correct data types.
type EventBus struct {
cmn.BaseService
pubsub *tmpubsub.Server
}
// NewEventBus returns a new event bus.
func NewEventBus() *EventBus {
return NewEventBusWithBufferCapacity(defaultCapacity)
}
// NewEventBusWithBufferCapacity returns a new event bus with the given buffer capacity.
func NewEventBusWithBufferCapacity(cap int) *EventBus {
// capacity could be exposed later if needed
pubsub := tmpubsub.NewServer(tmpubsub.BufferCapacity(cap))
b := &EventBus{pubsub: pubsub}
b.BaseService = *cmn.NewBaseService(nil, "EventBus", b)
return b
}
func (b *EventBus) SetLogger(l log.Logger) {
b.BaseService.SetLogger(l)
b.pubsub.SetLogger(l.With("module", "pubsub"))
}
func (b *EventBus) OnStart() error {
return b.pubsub.Start()
}
func (b *EventBus) OnStop() {
b.pubsub.Stop()
}
func (b *EventBus) NumClients() int {
return b.pubsub.NumClients()
}
func (b *EventBus) NumClientSubscriptions(clientID string) int {
return b.pubsub.NumClientSubscriptions(clientID)
}
func (b *EventBus) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, outCapacity ...int) (Subscription, error) {
return b.pubsub.Subscribe(ctx, subscriber, query, outCapacity...)
}
// This method can be used for a local consensus explorer and synchronous
// testing. Do not use for for public facing / untrusted subscriptions!
func (b *EventBus) SubscribeUnbuffered(ctx context.Context, subscriber string, query tmpubsub.Query) (Subscription, error) {
return b.pubsub.SubscribeUnbuffered(ctx, subscriber, query)
}
func (b *EventBus) Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error {
return b.pubsub.Unsubscribe(ctx, subscriber, query)
}
func (b *EventBus) UnsubscribeAll(ctx context.Context, subscriber string) error {
return b.pubsub.UnsubscribeAll(ctx, subscriber)
}
func (b *EventBus) Publish(eventType string, eventData TMEventData) error {
// no explicit deadline for publishing events
ctx := context.Background()
b.pubsub.PublishWithTags(ctx, eventData, map[string]string{EventTypeKey: eventType})
return nil
}
func (b *EventBus) validateAndStringifyTags(tags []cmn.KVPair, logger log.Logger) map[string]string {
result := make(map[string]string)
for _, tag := range tags {
// basic validation
if len(tag.Key) == 0 {
logger.Debug("Got tag with an empty key (skipping)", "tag", tag)
continue
}
result[string(tag.Key)] = string(tag.Value)
}
return result
}
func (b *EventBus) PublishEventNewBlock(data EventDataNewBlock) error {
// no explicit deadline for publishing events
ctx := context.Background()
resultTags := append(data.ResultBeginBlock.Tags, data.ResultEndBlock.Tags...)
tags := b.validateAndStringifyTags(resultTags, b.Logger.With("block", data.Block.StringShort()))
// add predefined tags
logIfTagExists(EventTypeKey, tags, b.Logger)
tags[EventTypeKey] = EventNewBlock
b.pubsub.PublishWithTags(ctx, data, tags)
return nil
}
func (b *EventBus) PublishEventNewBlockHeader(data EventDataNewBlockHeader) error {
// no explicit deadline for publishing events
ctx := context.Background()
resultTags := append(data.ResultBeginBlock.Tags, data.ResultEndBlock.Tags...)
// TODO: Create StringShort method for Header and use it in logger.
tags := b.validateAndStringifyTags(resultTags, b.Logger.With("header", data.Header))
// add predefined tags
logIfTagExists(EventTypeKey, tags, b.Logger)
tags[EventTypeKey] = EventNewBlockHeader
b.pubsub.PublishWithTags(ctx, data, tags)
return nil
}
func (b *EventBus) PublishEventVote(data EventDataVote) error {
return b.Publish(EventVote, data)
}
func (b *EventBus) PublishEventValidBlock(data EventDataRoundState) error {
return b.Publish(EventValidBlock, data)
}
// PublishEventTx publishes tx event with tags from Result. Note it will add
// predefined tags (EventTypeKey, TxHashKey). Existing tags with the same names
// will be overwritten.
func (b *EventBus) PublishEventTx(data EventDataTx) error {
// no explicit deadline for publishing events
ctx := context.Background()
tags := b.validateAndStringifyTags(data.Result.Tags, b.Logger.With("tx", data.Tx))
// add predefined tags
logIfTagExists(EventTypeKey, tags, b.Logger)
tags[EventTypeKey] = EventTx
logIfTagExists(TxHashKey, tags, b.Logger)
tags[TxHashKey] = fmt.Sprintf("%X", data.Tx.Hash())
logIfTagExists(TxHeightKey, tags, b.Logger)
tags[TxHeightKey] = fmt.Sprintf("%d", data.Height)
b.pubsub.PublishWithTags(ctx, data, tags)
return nil
}
func (b *EventBus) PublishEventNewRoundStep(data EventDataRoundState) error {
return b.Publish(EventNewRoundStep, data)
}
func (b *EventBus) PublishEventTimeoutPropose(data EventDataRoundState) error {
return b.Publish(EventTimeoutPropose, data)
}
func (b *EventBus) PublishEventTimeoutWait(data EventDataRoundState) error {
return b.Publish(EventTimeoutWait, data)
}
func (b *EventBus) PublishEventNewRound(data EventDataNewRound) error {
return b.Publish(EventNewRound, data)
}
func (b *EventBus) PublishEventCompleteProposal(data EventDataCompleteProposal) error {
return b.Publish(EventCompleteProposal, data)
}
func (b *EventBus) PublishEventPolka(data EventDataRoundState) error {
return b.Publish(EventPolka, data)
}
func (b *EventBus) PublishEventUnlock(data EventDataRoundState) error {
return b.Publish(EventUnlock, data)
}
func (b *EventBus) PublishEventRelock(data EventDataRoundState) error {
return b.Publish(EventRelock, data)
}
func (b *EventBus) PublishEventLock(data EventDataRoundState) error {
return b.Publish(EventLock, data)
}
func (b *EventBus) PublishEventValidatorSetUpdates(data EventDataValidatorSetUpdates) error {
return b.Publish(EventValidatorSetUpdates, data)
}
func logIfTagExists(tag string, tags map[string]string, logger log.Logger) {
if value, ok := tags[tag]; ok {
logger.Error("Found predefined tag (value will be overwritten)", "tag", tag, "value", value)
}
}
//-----------------------------------------------------------------------------
type NopEventBus struct{}
func (NopEventBus) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, out chan<- interface{}) error {
return nil
}
func (NopEventBus) Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error {
return nil
}
func (NopEventBus) UnsubscribeAll(ctx context.Context, subscriber string) error {
return nil
}
func (NopEventBus) PublishEventNewBlock(data EventDataNewBlock) error {
return nil
}
func (NopEventBus) PublishEventNewBlockHeader(data EventDataNewBlockHeader) error {
return nil
}
func (NopEventBus) PublishEventVote(data EventDataVote) error {
return nil
}
func (NopEventBus) PublishEventTx(data EventDataTx) error {
return nil
}
func (NopEventBus) PublishEventNewRoundStep(data EventDataRoundState) error {
return nil
}
func (NopEventBus) PublishEventTimeoutPropose(data EventDataRoundState) error {
return nil
}
func (NopEventBus) PublishEventTimeoutWait(data EventDataRoundState) error {
return nil
}
func (NopEventBus) PublishEventNewRound(data EventDataRoundState) error {
return nil
}
func (NopEventBus) PublishEventCompleteProposal(data EventDataRoundState) error {
return nil
}
func (NopEventBus) PublishEventPolka(data EventDataRoundState) error {
return nil
}
func (NopEventBus) PublishEventUnlock(data EventDataRoundState) error {
return nil
}
func (NopEventBus) PublishEventRelock(data EventDataRoundState) error {
return nil
}
func (NopEventBus) PublishEventLock(data EventDataRoundState) error {
return nil
}
func (NopEventBus) PublishEventValidatorSetUpdates(data EventDataValidatorSetUpdates) error {
return nil
}