tendermint/libs/pubsub/pubsub_test.go
Alexander Bezobchuk ab0835463f abci: Refactor tagging events using list of lists (#3643)
## PR

This PR introduces a fundamental breaking change to the structure of ABCI response and tx tags and the way they're processed. Namely, the SDK can support more complex and aggregated events for distribution and slashing. In addition, block responses can include duplicate keys in events.

    Implement new Event type. An event has a type and a list of KV pairs (i.e., a list of lists). Typical events may look like:

"rewards": [{"amount": "5000uatom", "validator": "...", "recipient": "..."}]
"sender": [{"address": "...", "balance": "100uatom"}]

The events are indexed by {event.type}.{event.attribute[i].key}/.... In this case, a client would subscribe to or query for rewards.recipient='...'

    ABCI response types and related types now include Events []Event instead of Tags []cmn.KVPair.
    PubSub logic now publishes/matches against map[string][]string instead of map[string]string to support duplicate keys in response events (from #1385). A match is successful if the value is found in the slice of strings.

closes: #1859
closes: #2905

## Commits:

* Implement Event ABCI type and updates responses to use events

* Update messages_test.go

* Update kvstore.go

* Update event_bus.go

* Update subscription.go

* Update pubsub.go

* Update kvstore.go

* Update query logic to handle slice of strings in events

* Update Empty#Matches and unit tests

* Update pubsub logic

* Update EventBus#Publish

* Update kv tx indexer

* Update godocs

* Update ResultEvent to use slice of strings; update RPC

* Update more tests

* Update abci.md

* Check for key in validateAndStringifyEvents

* Fix KV indexer to skip empty keys

* Fix linting errors

* Update CHANGELOG_PENDING.md

* Update docs/spec/abci/abci.md

Co-Authored-By: Federico Kunze <31522760+fedekunze@users.noreply.github.com>

* Update abci/types/types.proto

Co-Authored-By: Ethan Buchman <ethan@coinculture.info>

* Update docs/spec/abci/abci.md

Co-Authored-By: Ethan Buchman <ethan@coinculture.info>

* Update libs/pubsub/query/query.go

Co-Authored-By: Ethan Buchman <ethan@coinculture.info>

* Update match function to match if ANY value matches

* Implement TestSubscribeDuplicateKeys

* Update TestMatches to include multi-key test cases

* Update events.go

* Update Query interface godoc

* Update match godoc

* Add godoc for matchValue

* DRY-up tx indexing

* Return error from PublishWithEvents in EventBus#Publish

* Update PublishEventNewBlockHeader to return an error

* Fix build

* Update events doc in ABCI

* Update ABCI events godoc

* Implement TestEventBusPublishEventTxDuplicateKeys

* Update TestSubscribeDuplicateKeys to be table-driven

* Remove mod file

* Remove markdown from events godoc

* Implement TestTxSearchDeprecatedIndexing test
2019-06-12 14:03:45 +02:00

420 lines
11 KiB
Go

package pubsub_test
import (
"context"
"fmt"
"runtime/debug"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/pubsub"
"github.com/tendermint/tendermint/libs/pubsub/query"
)
// clientID is the subscriber ID used by the tests and benchmarks in this file
// that only need a single client.
const clientID = "test-client"
// TestSubscribe exercises a subscription created without an explicit capacity:
// a published message must be delivered, the server must report one client
// with one subscription, and once further messages are published without being
// read the subscription must be cancelled with ErrOutOfCapacity instead of
// blocking the publisher.
func TestSubscribe(t *testing.T) {
	s := pubsub.NewServer()
	s.SetLogger(log.TestingLogger())
	// A server that failed to start would make every later assertion misleading.
	require.NoError(t, s.Start())
	defer s.Stop()

	ctx := context.Background()
	subscription, err := s.Subscribe(ctx, clientID, query.Empty{})
	require.NoError(t, err)

	assert.Equal(t, 1, s.NumClients())
	assert.Equal(t, 1, s.NumClientSubscriptions(clientID))

	err = s.Publish(ctx, "Ka-Zar")
	require.NoError(t, err)
	assertReceive(t, "Ka-Zar", subscription.Out())

	// Publish from a separate goroutine so that a blocking Publish is detected
	// by the timeout below rather than hanging the test.
	published := make(chan struct{})
	go func() {
		defer close(published)

		// assert (not require) is used because this runs outside the test
		// goroutine.
		err := s.Publish(ctx, "Quicksilver")
		assert.NoError(t, err)

		err = s.Publish(ctx, "Asylum")
		assert.NoError(t, err)

		err = s.Publish(ctx, "Ivan")
		assert.NoError(t, err)
	}()

	select {
	case <-published:
		// Only the first of the unread messages is expected to be delivered;
		// the subscription is then expected to be cancelled.
		assertReceive(t, "Quicksilver", subscription.Out())
		assertCancelled(t, subscription, pubsub.ErrOutOfCapacity)
	case <-time.After(3 * time.Second):
		t.Fatal("Expected Publish(Asylum) not to block")
	}
}
// TestSubscribeWithCapacity checks that Subscribe panics when given a negative
// or zero capacity and that a subscription with capacity 1 receives a published
// message.
func TestSubscribeWithCapacity(t *testing.T) {
	s := pubsub.NewServer()
	s.SetLogger(log.TestingLogger())
	require.NoError(t, s.Start())
	defer s.Stop()

	ctx := context.Background()

	// The calls below are expected to panic, so their results are never
	// observed and are discarded explicitly.
	assert.Panics(t, func() {
		_, _ = s.Subscribe(ctx, clientID, query.Empty{}, -1)
	})
	assert.Panics(t, func() {
		_, _ = s.Subscribe(ctx, clientID, query.Empty{}, 0)
	})

	subscription, err := s.Subscribe(ctx, clientID, query.Empty{}, 1)
	require.NoError(t, err)

	err = s.Publish(ctx, "Aggamon")
	require.NoError(t, err)
	assertReceive(t, "Aggamon", subscription.Out())
}
// TestSubscribeUnbuffered checks that, with a subscription created through
// SubscribeUnbuffered, publishing blocks while nobody reads from the
// subscription, and that both messages are delivered once the test starts
// reading.
func TestSubscribeUnbuffered(t *testing.T) {
	s := pubsub.NewServer()
	s.SetLogger(log.TestingLogger())
	require.NoError(t, s.Start())
	defer s.Stop()

	ctx := context.Background()
	subscription, err := s.SubscribeUnbuffered(ctx, clientID, query.Empty{})
	require.NoError(t, err)

	// Publish from a separate goroutine; the test expects this goroutine NOT
	// to finish before the timeout below fires.
	published := make(chan struct{})
	go func() {
		defer close(published)

		err := s.Publish(ctx, "Ultron")
		assert.NoError(t, err)

		err = s.Publish(ctx, "Darkhawk")
		assert.NoError(t, err)
	}()

	select {
	case <-published:
		t.Fatal("Expected Publish(Darkhawk) to block")
	case <-time.After(3 * time.Second):
		// Nothing was read for three seconds; now drain both messages in order.
		assertReceive(t, "Ultron", subscription.Out())
		assertReceive(t, "Darkhawk", subscription.Out())
	}
}
// TestSlowClientIsRemovedWithErrOutOfCapacity publishes two messages to a
// subscription that is never read and expects the subscription to be cancelled
// with ErrOutOfCapacity.
func TestSlowClientIsRemovedWithErrOutOfCapacity(t *testing.T) {
	s := pubsub.NewServer()
	s.SetLogger(log.TestingLogger())
	require.NoError(t, s.Start())
	defer s.Stop()

	ctx := context.Background()
	subscription, err := s.Subscribe(ctx, clientID, query.Empty{})
	require.NoError(t, err)

	err = s.Publish(ctx, "Fat Cobra")
	require.NoError(t, err)
	err = s.Publish(ctx, "Viper")
	require.NoError(t, err)

	assertCancelled(t, subscription, pubsub.ErrOutOfCapacity)
}
// TestDifferentClients subscribes three clients with increasingly specific
// queries and checks that each published message reaches exactly the
// subscriptions whose query matches the message's events.
func TestDifferentClients(t *testing.T) {
	s := pubsub.NewServer()
	s.SetLogger(log.TestingLogger())
	require.NoError(t, s.Start())
	defer s.Stop()

	ctx := context.Background()

	// client-1 matches on the event type only.
	subscription1, err := s.Subscribe(ctx, "client-1", query.MustParse("tm.events.type='NewBlock'"))
	require.NoError(t, err)
	err = s.PublishWithEvents(ctx, "Iceman", map[string][]string{"tm.events.type": {"NewBlock"}})
	require.NoError(t, err)
	assertReceive(t, "Iceman", subscription1.Out())

	// client-2 additionally requires an account name; a message carrying both
	// events must reach client-1 and client-2.
	subscription2, err := s.Subscribe(
		ctx,
		"client-2",
		query.MustParse("tm.events.type='NewBlock' AND abci.account.name='Igor'"),
	)
	require.NoError(t, err)
	err = s.PublishWithEvents(ctx, "Ultimo", map[string][]string{
		"tm.events.type":    {"NewBlock"},
		"abci.account.name": {"Igor"},
	})
	require.NoError(t, err)
	assertReceive(t, "Ultimo", subscription1.Out())
	assertReceive(t, "Ultimo", subscription2.Out())

	// client-3 requires three conditions; a message satisfying only the event
	// type must not be delivered to it.
	subscription3, err := s.Subscribe(
		ctx,
		"client-3",
		query.MustParse("tm.events.type='NewRoundStep' AND abci.account.name='Igor' AND abci.invoice.number = 10"),
	)
	require.NoError(t, err)
	err = s.PublishWithEvents(ctx, "Valeria Richards", map[string][]string{"tm.events.type": {"NewRoundStep"}})
	require.NoError(t, err)
	assert.Zero(t, len(subscription3.Out()))
}
// TestSubscribeDuplicateKeys is a table-driven test for events that carry
// several values under the same key. Each scenario subscribes a fresh client
// with its query, publishes the same multi-valued event set, and checks whether
// the message is delivered (expected != nil) or not (expected == nil).
func TestSubscribeDuplicateKeys(t *testing.T) {
	server := pubsub.NewServer()
	server.SetLogger(log.TestingLogger())
	require.NoError(t, server.Start())
	defer server.Stop()

	ctx := context.Background()

	type scenario struct {
		query    string
		expected interface{}
	}
	scenarios := []scenario{
		{query: "withdraw.rewards='17'", expected: "Iceman"},
		{query: "withdraw.rewards='22'", expected: "Iceman"},
		{query: "withdraw.rewards='1' AND withdraw.rewards='22'", expected: "Iceman"},
		{query: "withdraw.rewards='100'", expected: nil},
	}

	for idx, sc := range scenarios {
		client := fmt.Sprintf("client-%d", idx)
		sub, err := server.Subscribe(ctx, client, query.MustParse(sc.query))
		require.NoError(t, err)

		events := map[string][]string{
			"transfer.sender":  {"foo", "bar", "baz"},
			"withdraw.rewards": {"1", "17", "22"},
		}
		err = server.PublishWithEvents(ctx, "Iceman", events)
		require.NoError(t, err)

		// No delivery expected: the subscription's channel must be empty.
		if sc.expected == nil {
			require.Zero(t, len(sub.Out()))
			continue
		}
		assertReceive(t, sc.expected, sub.Out())
	}
}
// TestClientSubscribesTwice checks that subscribing the same client to the
// same query a second time returns an error and a nil subscription, and that
// the original subscription keeps receiving messages afterwards.
func TestClientSubscribesTwice(t *testing.T) {
	s := pubsub.NewServer()
	s.SetLogger(log.TestingLogger())
	require.NoError(t, s.Start())
	defer s.Stop()

	ctx := context.Background()
	q := query.MustParse("tm.events.type='NewBlock'")

	subscription1, err := s.Subscribe(ctx, clientID, q)
	require.NoError(t, err)
	err = s.PublishWithEvents(ctx, "Goblin Queen", map[string][]string{"tm.events.type": {"NewBlock"}})
	require.NoError(t, err)
	assertReceive(t, "Goblin Queen", subscription1.Out())

	// Same client, same query: must be rejected.
	subscription2, err := s.Subscribe(ctx, clientID, q)
	require.Error(t, err)
	require.Nil(t, subscription2)

	// The first subscription must be unaffected by the rejected attempt.
	err = s.PublishWithEvents(ctx, "Spider-Man", map[string][]string{"tm.events.type": {"NewBlock"}})
	require.NoError(t, err)
	assertReceive(t, "Spider-Man", subscription1.Out())
}
// TestUnsubscribe checks that after Unsubscribe the subscription receives no
// further messages and is cancelled with ErrUnsubscribed.
func TestUnsubscribe(t *testing.T) {
	s := pubsub.NewServer()
	s.SetLogger(log.TestingLogger())
	require.NoError(t, s.Start())
	defer s.Stop()

	ctx := context.Background()
	subscription, err := s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"))
	require.NoError(t, err)
	err = s.Unsubscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"))
	require.NoError(t, err)

	err = s.Publish(ctx, "Nick Fury")
	require.NoError(t, err)
	assert.Zero(t, len(subscription.Out()), "Should not receive anything after Unsubscribe")

	assertCancelled(t, subscription, pubsub.ErrUnsubscribed)
}
// TestClientUnsubscribesTwice checks that a second Unsubscribe for the same
// client and query, as well as a subsequent UnsubscribeAll, both return
// ErrSubscriptionNotFound.
func TestClientUnsubscribesTwice(t *testing.T) {
	s := pubsub.NewServer()
	s.SetLogger(log.TestingLogger())
	require.NoError(t, s.Start())
	defer s.Stop()

	ctx := context.Background()
	_, err := s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"))
	require.NoError(t, err)
	err = s.Unsubscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"))
	require.NoError(t, err)

	// Nothing is left to remove, so both calls must report a missing
	// subscription.
	err = s.Unsubscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"))
	assert.Equal(t, pubsub.ErrSubscriptionNotFound, err)
	err = s.UnsubscribeAll(ctx, clientID)
	assert.Equal(t, pubsub.ErrSubscriptionNotFound, err)
}
// TestResubscribe checks that a client can subscribe again to a query it has
// unsubscribed from, and that the new subscription receives messages.
func TestResubscribe(t *testing.T) {
	s := pubsub.NewServer()
	s.SetLogger(log.TestingLogger())
	require.NoError(t, s.Start())
	defer s.Stop()

	ctx := context.Background()

	// The first subscription is only created to be removed again; its handle
	// is not needed.
	_, err := s.Subscribe(ctx, clientID, query.Empty{})
	require.NoError(t, err)
	err = s.Unsubscribe(ctx, clientID, query.Empty{})
	require.NoError(t, err)

	subscription, err := s.Subscribe(ctx, clientID, query.Empty{})
	require.NoError(t, err)

	err = s.Publish(ctx, "Cable")
	require.NoError(t, err)
	assertReceive(t, "Cable", subscription.Out())
}
// TestUnsubscribeAll checks that UnsubscribeAll removes every subscription of
// a client: none of them receives later messages and each is cancelled with
// ErrUnsubscribed.
func TestUnsubscribeAll(t *testing.T) {
	s := pubsub.NewServer()
	s.SetLogger(log.TestingLogger())
	require.NoError(t, s.Start())
	defer s.Stop()

	ctx := context.Background()
	subscription1, err := s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"))
	require.NoError(t, err)
	subscription2, err := s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlockHeader'"))
	require.NoError(t, err)

	err = s.UnsubscribeAll(ctx, clientID)
	require.NoError(t, err)

	err = s.Publish(ctx, "Nick Fury")
	require.NoError(t, err)
	assert.Zero(t, len(subscription1.Out()), "Should not receive anything after UnsubscribeAll")
	assert.Zero(t, len(subscription2.Out()), "Should not receive anything after UnsubscribeAll")

	assertCancelled(t, subscription1, pubsub.ErrUnsubscribed)
	assertCancelled(t, subscription2, pubsub.ErrUnsubscribed)
}
// TestBufferCapacity verifies that the BufferCapacity option is reported back
// by the server, that two messages can be published without error, and that a
// third Publish with a 10ms context deadline fails with
// context.DeadlineExceeded.
func TestBufferCapacity(t *testing.T) {
	server := pubsub.NewServer(pubsub.BufferCapacity(2))
	server.SetLogger(log.TestingLogger())

	assert.Equal(t, 2, server.BufferCapacity())

	// NOTE(review): the server is never started in this test; presumably that
	// is why published messages accumulate instead of being consumed — confirm.
	background := context.Background()
	for _, msg := range []string{"Nighthawk", "Sage"} {
		err := server.Publish(background, msg)
		require.NoError(t, err)
	}

	timeoutCtx, cancel := context.WithTimeout(background, 10*time.Millisecond)
	defer cancel()

	publishErr := server.Publish(timeoutCtx, "Ironclad")
	if !assert.Error(t, publishErr) {
		return
	}
	assert.Equal(t, context.DeadlineExceeded, publishErr)
}
// Benchmark10Clients runs benchmarkNClients with 10 subscriptions.
func Benchmark10Clients(b *testing.B) {
	benchmarkNClients(10, b)
}
// Benchmark100Clients runs benchmarkNClients with 100 subscriptions.
func Benchmark100Clients(b *testing.B) {
	benchmarkNClients(100, b)
}
// Benchmark1000Clients runs benchmarkNClients with 1000 subscriptions.
func Benchmark1000Clients(b *testing.B) {
	benchmarkNClients(1000, b)
}
// Benchmark10ClientsOneQuery runs benchmarkNClientsOneQuery with n = 10.
func Benchmark10ClientsOneQuery(b *testing.B) {
	benchmarkNClientsOneQuery(10, b)
}
// Benchmark100ClientsOneQuery runs benchmarkNClientsOneQuery with n = 100.
func Benchmark100ClientsOneQuery(b *testing.B) {
	benchmarkNClientsOneQuery(100, b)
}
// Benchmark1000ClientsOneQuery runs benchmarkNClientsOneQuery with n = 1000.
func Benchmark1000ClientsOneQuery(b *testing.B) {
	benchmarkNClientsOneQuery(1000, b)
}
// benchmarkNClients measures PublishWithEvents while n subscriptions are
// registered under clientID, each with its own query
// ("... AND abci.Invoices.Number = <i>"). Every subscription is drained by a
// dedicated goroutine that exits when the subscription is cancelled.
func benchmarkNClients(n int, b *testing.B) {
	s := pubsub.NewServer()
	if err := s.Start(); err != nil {
		b.Fatal(err)
	}
	defer s.Stop()

	ctx := context.Background()
	for i := 0; i < n; i++ {
		subscription, err := s.Subscribe(
			ctx,
			clientID,
			query.MustParse(fmt.Sprintf("abci.Account.Owner = 'Ivan' AND abci.Invoices.Number = %d", i)),
		)
		if err != nil {
			b.Fatal(err)
		}
		// Keep reading so the subscription never fills up during the benchmark.
		go func() {
			for {
				select {
				case <-subscription.Out():
					continue
				case <-subscription.Cancelled():
					return
				}
			}
		}()
	}

	b.ReportAllocs()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		// The invoice number must be the decimal text of i so it can match the
		// numeric conditions above; string(i) would instead produce the UTF-8
		// encoding of code point i.
		events := map[string][]string{
			"abci.Account.Owner":   {"Ivan"},
			"abci.Invoices.Number": {fmt.Sprint(i)},
		}
		if err := s.PublishWithEvents(ctx, "Gamora", events); err != nil {
			b.Fatal(err)
		}
	}
}
// benchmarkNClientsOneQuery measures PublishWithEvents while n clients are all
// subscribed to the same query. Every subscription is drained by a dedicated
// goroutine that exits when the subscription is cancelled.
func benchmarkNClientsOneQuery(n int, b *testing.B) {
	s := pubsub.NewServer()
	if err := s.Start(); err != nil {
		b.Fatal(err)
	}
	defer s.Stop()

	ctx := context.Background()
	q := query.MustParse("abci.Account.Owner = 'Ivan' AND abci.Invoices.Number = 1")
	for i := 0; i < n; i++ {
		// Each subscription needs its own client ID: subscribing one client to
		// the same query twice is rejected (see TestClientSubscribesTwice),
		// which would abort the benchmark on the second iteration.
		id := fmt.Sprintf("%s-%d", clientID, i)
		subscription, err := s.Subscribe(ctx, id, q)
		if err != nil {
			b.Fatal(err)
		}
		// Keep reading so the subscription never fills up during the benchmark.
		go func() {
			for {
				select {
				case <-subscription.Out():
					continue
				case <-subscription.Cancelled():
					return
				}
			}
		}()
	}

	b.ReportAllocs()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		events := map[string][]string{
			"abci.Account.Owner":   {"Ivan"},
			"abci.Invoices.Number": {"1"},
		}
		if err := s.PublishWithEvents(ctx, "Gamora", events); err != nil {
			b.Fatal(err)
		}
	}
}
///////////////////////////////////////////////////////////////////////////////
/// HELPERS
///////////////////////////////////////////////////////////////////////////////
// assertReceive waits up to one second for a message on ch and asserts that
// its Data() equals expected; msgAndArgs is forwarded to assert.Equal. If
// nothing arrives in time, the test is marked as failed and the current stack
// is printed.
func assertReceive(t *testing.T, expected interface{}, ch <-chan pubsub.Message, msgAndArgs ...interface{}) {
	// Attribute failures to the calling test line instead of this helper.
	t.Helper()

	select {
	case actual := <-ch:
		assert.Equal(t, expected, actual.Data(), msgAndArgs...)
	case <-time.After(1 * time.Second):
		t.Errorf("Expected to receive %v from the channel, got nothing after 1s", expected)
		debug.PrintStack()
	}
}
// assertCancelled waits up to three seconds for the subscription's Cancelled
// channel to be closed and asserts that subscription.Err() equals err. If the
// channel is not closed in time, the test is marked as failed rather than
// blocking until the global test timeout.
func assertCancelled(t *testing.T, subscription *pubsub.Subscription, err error) {
	// Attribute failures to the calling test line instead of this helper.
	t.Helper()

	select {
	case _, ok := <-subscription.Cancelled():
		// ok == false means the channel was closed rather than sent on.
		assert.False(t, ok)
		assert.Equal(t, err, subscription.Err())
	case <-time.After(3 * time.Second):
		t.Errorf("Expected subscription to be cancelled with %v, it was not cancelled after 3s", err)
	}
}