green pubsub tests :OK:

This commit is contained in:
Anton Kaliaev 2019-01-29 18:46:59 +04:00
parent 11e36d0bfb
commit 7268ec8d10
No known key found for this signature in database
GPG Key ID: 7B6881D965918214
5 changed files with 241 additions and 166 deletions

View File

@ -19,10 +19,9 @@ func TestExample(t *testing.T) {
defer s.Stop() defer s.Stop()
ctx := context.Background() ctx := context.Background()
ch := make(chan interface{}, 1) subscription, err := s.Subscribe(ctx, "example-client", query.MustParse("abci.account.name='John'"), 1)
err := s.Subscribe(ctx, "example-client", query.MustParse("abci.account.name='John'"), ch)
require.NoError(t, err) require.NoError(t, err)
err = s.PublishWithTags(ctx, "Tombstone", pubsub.NewTagMap(map[string]string{"abci.account.name": "John"})) err = s.PublishWithTags(ctx, "Tombstone", pubsub.NewTagMap(map[string]string{"abci.account.name": "John"}))
require.NoError(t, err) require.NoError(t, err)
assertReceive(t, "Tombstone", ch) assertReceive(t, "Tombstone", subscription.Out())
} }

View File

@ -10,42 +10,26 @@
// match, this message will be pushed to all clients, subscribed to that query. // match, this message will be pushed to all clients, subscribed to that query.
// See query subpackage for our implementation. // See query subpackage for our implementation.
// //
// Due to the blocking send implementation, a single subscriber can freeze an // Example:
// entire server by not reading messages before it unsubscribes. To avoid such
// scenario, subscribers must either:
// //
// a) make sure they continue to read from the out channel until // q, err := query.New("account.name='John'")
// Unsubscribe(All) is called
//
// s.Subscribe(ctx, sub, qry, out)
// go func() {
// for msg := range out {
// // handle msg
// // will exit automatically when out is closed by Unsubscribe(All)
// }
// }()
// s.UnsubscribeAll(ctx, sub)
//
// b) drain the out channel before calling Unsubscribe(All)
//
// s.Subscribe(ctx, sub, qry, out)
// defer func() {
// // drain out to make sure we don't block
// LOOP:
// for {
// select {
// case <-out:
// default:
// break LOOP
// }
// }
// s.UnsubscribeAll(ctx, sub)
// }()
// for msg := range out {
// // handle msg
// if err != nil { // if err != nil {
// return err // return err
// } // }
// ctx, cancel := context.WithTimeout(context.Background(), 1 * time.Second)
// defer cancel()
// subscription, err := pubsub.Subscribe(ctx, "johns-transactions", q, 1)
// if err != nil {
// return err
// }
//
// for {
// select {
// case msgAndTags <- subscription.Out():
// // handle msg and tags
// case <-subscription.Cancelled():
// return subscription.Err()
// }
// } // }
// //
package pubsub package pubsub
@ -77,21 +61,25 @@ var (
ErrAlreadySubscribed = errors.New("already subscribed") ErrAlreadySubscribed = errors.New("already subscribed")
) )
type cmd struct {
op operation
query Query
ch chan<- interface{}
clientID string
msg interface{}
tags TagMap
}
// Query defines an interface for a query to be used for subscribing. // Query defines an interface for a query to be used for subscribing.
type Query interface { type Query interface {
Matches(tags TagMap) bool Matches(tags TagMap) bool
String() string String() string
} }
type cmd struct {
op operation
// subscribe, unsubscribe
query Query
subscription *Subscription
clientID string
// publish
msg interface{}
tags TagMap
}
// Server allows clients to subscribe/unsubscribe for messages, publishing // Server allows clients to subscribe/unsubscribe for messages, publishing
// messages with or without tags, and manages internal state. // messages with or without tags, and manages internal state.
type Server struct { type Server struct {
@ -107,37 +95,6 @@ type Server struct {
// Option sets a parameter for the server. // Option sets a parameter for the server.
type Option func(*Server) type Option func(*Server)
// TagMap is used to associate tags to a message.
// They can be queried by subscribers to choose messages they will received.
type TagMap interface {
// Get returns the value for a key, or nil if no value is present.
// The ok result indicates whether value was found in the tags.
Get(key string) (value string, ok bool)
// Len returns the number of tags.
Len() int
}
type tagMap map[string]string
var _ TagMap = (*tagMap)(nil)
// NewTagMap constructs a new immutable tag set from a map.
func NewTagMap(data map[string]string) TagMap {
return tagMap(data)
}
// Get returns the value for a key, or nil if no value is present.
// The ok result indicates whether value was found in the tags.
func (ts tagMap) Get(key string) (value string, ok bool) {
value, ok = ts[key]
return
}
// Len returns the number of tags.
func (ts tagMap) Len() int {
return len(ts)
}
// NewServer returns a new server. See the commentary on the Option functions // NewServer returns a new server. See the commentary on the Option functions
// for a detailed description of how to configure buffering. If no options are // for a detailed description of how to configure buffering. If no options are
// provided, the resulting server's queue is unbuffered. // provided, the resulting server's queue is unbuffered.
@ -174,11 +131,11 @@ func (s *Server) BufferCapacity() int {
return s.cmdsCap return s.cmdsCap
} }
// Subscribe creates a subscription for the given client. It accepts a channel // Subscribe creates a subscription for the given client. An error will be
// on which messages matching the given query can be received. An error will be
// returned to the caller if the context is canceled or if subscription already // returned to the caller if the context is canceled or if subscription already
// exist for pair clientID and query. // exist for pair clientID and query. outCapacity will be used to set a
func (s *Server) Subscribe(ctx context.Context, clientID string, query Query, out chan<- interface{}) error { // capacity for Subscription#Out channel.
func (s *Server) Subscribe(ctx context.Context, clientID string, query Query, outCapacity int) (*Subscription, error) {
s.mtx.RLock() s.mtx.RLock()
clientSubscriptions, ok := s.subscriptions[clientID] clientSubscriptions, ok := s.subscriptions[clientID]
if ok { if ok {
@ -186,22 +143,26 @@ func (s *Server) Subscribe(ctx context.Context, clientID string, query Query, ou
} }
s.mtx.RUnlock() s.mtx.RUnlock()
if ok { if ok {
return ErrAlreadySubscribed return nil, ErrAlreadySubscribed
} }
subscription := &Subscription{
out: make(chan MsgAndTags, outCapacity),
cancelled: make(chan struct{}),
}
select { select {
case s.cmds <- cmd{op: sub, clientID: clientID, query: query, ch: out}: case s.cmds <- cmd{op: sub, clientID: clientID, query: query, subscription: subscription}:
s.mtx.Lock() s.mtx.Lock()
if _, ok = s.subscriptions[clientID]; !ok { if _, ok = s.subscriptions[clientID]; !ok {
s.subscriptions[clientID] = make(map[string]struct{}) s.subscriptions[clientID] = make(map[string]struct{})
} }
s.subscriptions[clientID][query.String()] = struct{}{} s.subscriptions[clientID][query.String()] = struct{}{}
s.mtx.Unlock() s.mtx.Unlock()
return nil return subscription, nil
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return nil, ctx.Err()
case <-s.Quit(): case <-s.Quit():
return nil return nil, nil
} }
} }
@ -285,8 +246,8 @@ func (s *Server) OnStop() {
// NOTE: not goroutine safe // NOTE: not goroutine safe
type state struct { type state struct {
// query string -> client -> ch // query string -> client -> subscription
queryToChanMap map[string]map[string]chan<- interface{} queryToSubscriptionMap map[string]map[string]*Subscription
// client -> query string -> struct{} // client -> query string -> struct{}
clientToQueryMap map[string]map[string]struct{} clientToQueryMap map[string]map[string]struct{}
// query string -> queryPlusRefCount // query string -> queryPlusRefCount
@ -303,7 +264,7 @@ type queryPlusRefCount struct {
// OnStart implements Service.OnStart by starting the server. // OnStart implements Service.OnStart by starting the server.
func (s *Server) OnStart() error { func (s *Server) OnStart() error {
go s.loop(state{ go s.loop(state{
queryToChanMap: make(map[string]map[string]chan<- interface{}), queryToSubscriptionMap: make(map[string]map[string]*Subscription),
clientToQueryMap: make(map[string]map[string]struct{}), clientToQueryMap: make(map[string]map[string]struct{}),
queries: make(map[string]*queryPlusRefCount), queries: make(map[string]*queryPlusRefCount),
}) })
@ -321,33 +282,32 @@ loop:
switch cmd.op { switch cmd.op {
case unsub: case unsub:
if cmd.query != nil { if cmd.query != nil {
state.remove(cmd.clientID, cmd.query) state.remove(cmd.clientID, cmd.query, ErrUnsubscribed)
} else { } else {
state.removeAll(cmd.clientID) state.removeAll(cmd.clientID, ErrUnsubscribed)
} }
case shutdown: case shutdown:
for clientID := range state.clientToQueryMap { for clientID := range state.clientToQueryMap {
state.removeAll(clientID) state.removeAll(clientID, nil)
} }
break loop break loop
case sub: case sub:
state.add(cmd.clientID, cmd.query, cmd.ch) state.add(cmd.clientID, cmd.query, cmd.subscription)
case pub: case pub:
state.send(cmd.msg, cmd.tags) state.send(cmd.msg, cmd.tags)
} }
} }
} }
func (state *state) add(clientID string, q Query, ch chan<- interface{}) { func (state *state) add(clientID string, q Query, subscription *Subscription) {
qStr := q.String() qStr := q.String()
// initialize clientToChannelMap per query if needed // initialize clientToSubscriptionMap per query if needed
if _, ok := state.queryToChanMap[qStr]; !ok { if _, ok := state.queryToSubscriptionMap[qStr]; !ok {
state.queryToChanMap[qStr] = make(map[string]chan<- interface{}) state.queryToSubscriptionMap[qStr] = make(map[string]*Subscription)
} }
// create subscription // create subscription
state.queryToChanMap[qStr][clientID] = ch state.queryToSubscriptionMap[qStr][clientID] = subscription
// initialize queries if needed // initialize queries if needed
if _, ok := state.queries[qStr]; !ok { if _, ok := state.queries[qStr]; !ok {
@ -363,20 +323,23 @@ func (state *state) add(clientID string, q Query, ch chan<- interface{}) {
state.clientToQueryMap[clientID][qStr] = struct{}{} state.clientToQueryMap[clientID][qStr] = struct{}{}
} }
func (state *state) remove(clientID string, q Query) { func (state *state) remove(clientID string, q Query, reason error) {
qStr := q.String() qStr := q.String()
clientToChannelMap, ok := state.queryToChanMap[qStr] clientToSubscriptionMap, ok := state.queryToSubscriptionMap[qStr]
if !ok { if !ok {
return return
} }
ch, ok := clientToChannelMap[clientID] subscription, ok := clientToSubscriptionMap[clientID]
if !ok { if !ok {
return return
} }
close(ch) subscription.mtx.Lock()
subscription.err = reason
subscription.mtx.Unlock()
close(subscription.cancelled)
// remove the query from client map. // remove the query from client map.
// if client is not subscribed to anything else, remove it. // if client is not subscribed to anything else, remove it.
@ -387,9 +350,9 @@ func (state *state) remove(clientID string, q Query) {
// remove the client from query map. // remove the client from query map.
// if query has no other clients subscribed, remove it. // if query has no other clients subscribed, remove it.
delete(state.queryToChanMap[qStr], clientID) delete(state.queryToSubscriptionMap[qStr], clientID)
if len(state.queryToChanMap[qStr]) == 0 { if len(state.queryToSubscriptionMap[qStr]) == 0 {
delete(state.queryToChanMap, qStr) delete(state.queryToSubscriptionMap, qStr)
} }
// decrease ref counter in queries // decrease ref counter in queries
@ -400,21 +363,24 @@ func (state *state) remove(clientID string, q Query) {
} }
} }
func (state *state) removeAll(clientID string) { func (state *state) removeAll(clientID string, reason error) {
queryMap, ok := state.clientToQueryMap[clientID] queryMap, ok := state.clientToQueryMap[clientID]
if !ok { if !ok {
return return
} }
for qStr := range queryMap { for qStr := range queryMap {
ch := state.queryToChanMap[qStr][clientID] subscription := state.queryToSubscriptionMap[qStr][clientID]
close(ch) subscription.mtx.Lock()
subscription.err = reason
subscription.mtx.Unlock()
close(subscription.cancelled)
// remove the client from query map. // remove the client from query map.
// if query has no other clients subscribed, remove it. // if query has no other clients subscribed, remove it.
delete(state.queryToChanMap[qStr], clientID) delete(state.queryToSubscriptionMap[qStr], clientID)
if len(state.queryToChanMap[qStr]) == 0 { if len(state.queryToSubscriptionMap[qStr]) == 0 {
delete(state.queryToChanMap, qStr) delete(state.queryToSubscriptionMap, qStr)
} }
// decrease ref counter in queries // decrease ref counter in queries
@ -430,11 +396,21 @@ func (state *state) removeAll(clientID string) {
} }
func (state *state) send(msg interface{}, tags TagMap) { func (state *state) send(msg interface{}, tags TagMap) {
for qStr, clientToChannelMap := range state.queryToChanMap { for qStr, clientToSubscriptionMap := range state.queryToSubscriptionMap {
q := state.queries[qStr].q q := state.queries[qStr].q
if q.Matches(tags) { if q.Matches(tags) {
for _, ch := range clientToChannelMap { for clientID, subscription := range clientToSubscriptionMap {
ch <- msg if cap(subscription.out) == 0 {
// block on unbuffered channel
subscription.out <- MsgAndTags{msg, tags}
} else {
// don't block on buffered channels
select {
case subscription.out <- MsgAndTags{msg, tags}:
default:
state.remove(clientID, q, ErrOutOfCapacity)
}
}
} }
} }
} }

View File

@ -27,16 +27,15 @@ func TestSubscribe(t *testing.T) {
defer s.Stop() defer s.Stop()
ctx := context.Background() ctx := context.Background()
ch := make(chan interface{}, 1) subscription, err := s.Subscribe(ctx, clientID, query.Empty{}, 1)
err := s.Subscribe(ctx, clientID, query.Empty{}, ch)
require.NoError(t, err) require.NoError(t, err)
err = s.Publish(ctx, "Ka-Zar") err = s.Publish(ctx, "Ka-Zar")
require.NoError(t, err) require.NoError(t, err)
assertReceive(t, "Ka-Zar", ch) assertReceive(t, "Ka-Zar", subscription.Out())
err = s.Publish(ctx, "Quicksilver") err = s.Publish(ctx, "Quicksilver")
require.NoError(t, err) require.NoError(t, err)
assertReceive(t, "Quicksilver", ch) assertReceive(t, "Quicksilver", subscription.Out())
} }
func TestDifferentClients(t *testing.T) { func TestDifferentClients(t *testing.T) {
@ -46,27 +45,24 @@ func TestDifferentClients(t *testing.T) {
defer s.Stop() defer s.Stop()
ctx := context.Background() ctx := context.Background()
ch1 := make(chan interface{}, 1) subscription1, err := s.Subscribe(ctx, "client-1", query.MustParse("tm.events.type='NewBlock'"), 1)
err := s.Subscribe(ctx, "client-1", query.MustParse("tm.events.type='NewBlock'"), ch1)
require.NoError(t, err) require.NoError(t, err)
err = s.PublishWithTags(ctx, "Iceman", pubsub.NewTagMap(map[string]string{"tm.events.type": "NewBlock"})) err = s.PublishWithTags(ctx, "Iceman", pubsub.NewTagMap(map[string]string{"tm.events.type": "NewBlock"}))
require.NoError(t, err) require.NoError(t, err)
assertReceive(t, "Iceman", ch1) assertReceive(t, "Iceman", subscription1.Out())
ch2 := make(chan interface{}, 1) subscription2, err := s.Subscribe(ctx, "client-2", query.MustParse("tm.events.type='NewBlock' AND abci.account.name='Igor'"), 1)
err = s.Subscribe(ctx, "client-2", query.MustParse("tm.events.type='NewBlock' AND abci.account.name='Igor'"), ch2)
require.NoError(t, err) require.NoError(t, err)
err = s.PublishWithTags(ctx, "Ultimo", pubsub.NewTagMap(map[string]string{"tm.events.type": "NewBlock", "abci.account.name": "Igor"})) err = s.PublishWithTags(ctx, "Ultimo", pubsub.NewTagMap(map[string]string{"tm.events.type": "NewBlock", "abci.account.name": "Igor"}))
require.NoError(t, err) require.NoError(t, err)
assertReceive(t, "Ultimo", ch1) assertReceive(t, "Ultimo", subscription1.Out())
assertReceive(t, "Ultimo", ch2) assertReceive(t, "Ultimo", subscription2.Out())
ch3 := make(chan interface{}, 1) subscription3, err := s.Subscribe(ctx, "client-3", query.MustParse("tm.events.type='NewRoundStep' AND abci.account.name='Igor' AND abci.invoice.number = 10"), 1)
err = s.Subscribe(ctx, "client-3", query.MustParse("tm.events.type='NewRoundStep' AND abci.account.name='Igor' AND abci.invoice.number = 10"), ch3)
require.NoError(t, err) require.NoError(t, err)
err = s.PublishWithTags(ctx, "Valeria Richards", pubsub.NewTagMap(map[string]string{"tm.events.type": "NewRoundStep"})) err = s.PublishWithTags(ctx, "Valeria Richards", pubsub.NewTagMap(map[string]string{"tm.events.type": "NewRoundStep"}))
require.NoError(t, err) require.NoError(t, err)
assert.Zero(t, len(ch3)) assert.Zero(t, len(subscription3.Out()))
} }
func TestClientSubscribesTwice(t *testing.T) { func TestClientSubscribesTwice(t *testing.T) {
@ -78,20 +74,19 @@ func TestClientSubscribesTwice(t *testing.T) {
ctx := context.Background() ctx := context.Background()
q := query.MustParse("tm.events.type='NewBlock'") q := query.MustParse("tm.events.type='NewBlock'")
ch1 := make(chan interface{}, 1) subscription1, err := s.Subscribe(ctx, clientID, q, 1)
err := s.Subscribe(ctx, clientID, q, ch1)
require.NoError(t, err) require.NoError(t, err)
err = s.PublishWithTags(ctx, "Goblin Queen", pubsub.NewTagMap(map[string]string{"tm.events.type": "NewBlock"})) err = s.PublishWithTags(ctx, "Goblin Queen", pubsub.NewTagMap(map[string]string{"tm.events.type": "NewBlock"}))
require.NoError(t, err) require.NoError(t, err)
assertReceive(t, "Goblin Queen", ch1) assertReceive(t, "Goblin Queen", subscription1.Out())
ch2 := make(chan interface{}, 1) subscription2, err := s.Subscribe(ctx, clientID, q, 1)
err = s.Subscribe(ctx, clientID, q, ch2)
require.Error(t, err) require.Error(t, err)
require.Nil(t, subscription2)
err = s.PublishWithTags(ctx, "Spider-Man", pubsub.NewTagMap(map[string]string{"tm.events.type": "NewBlock"})) err = s.PublishWithTags(ctx, "Spider-Man", pubsub.NewTagMap(map[string]string{"tm.events.type": "NewBlock"}))
require.NoError(t, err) require.NoError(t, err)
assertReceive(t, "Spider-Man", ch1) assertReceive(t, "Spider-Man", subscription1.Out())
} }
func TestUnsubscribe(t *testing.T) { func TestUnsubscribe(t *testing.T) {
@ -101,18 +96,19 @@ func TestUnsubscribe(t *testing.T) {
defer s.Stop() defer s.Stop()
ctx := context.Background() ctx := context.Background()
ch := make(chan interface{}) subscription, err := s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"), 1)
err := s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"), ch)
require.NoError(t, err) require.NoError(t, err)
err = s.Unsubscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'")) err = s.Unsubscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"))
require.NoError(t, err) require.NoError(t, err)
err = s.Publish(ctx, "Nick Fury") err = s.Publish(ctx, "Nick Fury")
require.NoError(t, err) require.NoError(t, err)
assert.Zero(t, len(ch), "Should not receive anything after Unsubscribe") assert.Zero(t, len(subscription.Out()), "Should not receive anything after Unsubscribe")
_, ok := <-ch _, ok := <-subscription.Cancelled()
assert.False(t, ok) assert.False(t, ok)
assert.Equal(t, pubsub.ErrUnsubscribed, subscription.Err())
} }
func TestClientUnsubscribesTwice(t *testing.T) { func TestClientUnsubscribesTwice(t *testing.T) {
@ -122,8 +118,7 @@ func TestClientUnsubscribesTwice(t *testing.T) {
defer s.Stop() defer s.Stop()
ctx := context.Background() ctx := context.Background()
ch := make(chan interface{}) _, err := s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"), 1)
err := s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"), ch)
require.NoError(t, err) require.NoError(t, err)
err = s.Unsubscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'")) err = s.Unsubscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"))
require.NoError(t, err) require.NoError(t, err)
@ -141,18 +136,16 @@ func TestResubscribe(t *testing.T) {
defer s.Stop() defer s.Stop()
ctx := context.Background() ctx := context.Background()
ch := make(chan interface{}) subscription, err := s.Subscribe(ctx, clientID, query.Empty{}, 1)
err := s.Subscribe(ctx, clientID, query.Empty{}, ch)
require.NoError(t, err) require.NoError(t, err)
err = s.Unsubscribe(ctx, clientID, query.Empty{}) err = s.Unsubscribe(ctx, clientID, query.Empty{})
require.NoError(t, err) require.NoError(t, err)
ch = make(chan interface{}) subscription, err = s.Subscribe(ctx, clientID, query.Empty{}, 1)
err = s.Subscribe(ctx, clientID, query.Empty{}, ch)
require.NoError(t, err) require.NoError(t, err)
err = s.Publish(ctx, "Cable") err = s.Publish(ctx, "Cable")
require.NoError(t, err) require.NoError(t, err)
assertReceive(t, "Cable", ch) assertReceive(t, "Cable", subscription.Out())
} }
func TestUnsubscribeAll(t *testing.T) { func TestUnsubscribeAll(t *testing.T) {
@ -162,10 +155,9 @@ func TestUnsubscribeAll(t *testing.T) {
defer s.Stop() defer s.Stop()
ctx := context.Background() ctx := context.Background()
ch1, ch2 := make(chan interface{}, 1), make(chan interface{}, 1) subscription1, err := s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"), 1)
err := s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"), ch1)
require.NoError(t, err) require.NoError(t, err)
err = s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlockHeader'"), ch2) subscription2, err := s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlockHeader'"), 1)
require.NoError(t, err) require.NoError(t, err)
err = s.UnsubscribeAll(ctx, clientID) err = s.UnsubscribeAll(ctx, clientID)
@ -173,13 +165,15 @@ func TestUnsubscribeAll(t *testing.T) {
err = s.Publish(ctx, "Nick Fury") err = s.Publish(ctx, "Nick Fury")
require.NoError(t, err) require.NoError(t, err)
assert.Zero(t, len(ch1), "Should not receive anything after UnsubscribeAll") assert.Zero(t, len(subscription1.Out()), "Should not receive anything after UnsubscribeAll")
assert.Zero(t, len(ch2), "Should not receive anything after UnsubscribeAll") assert.Zero(t, len(subscription2.Out()), "Should not receive anything after UnsubscribeAll")
_, ok := <-ch1 _, ok := <-subscription1.Cancelled()
assert.False(t, ok) assert.False(t, ok)
_, ok = <-ch2 assert.Equal(t, pubsub.ErrUnsubscribed, subscription1.Err())
_, ok = <-subscription2.Cancelled()
assert.False(t, ok) assert.False(t, ok)
assert.Equal(t, pubsub.ErrUnsubscribed, subscription2.Err())
} }
func TestBufferCapacity(t *testing.T) { func TestBufferCapacity(t *testing.T) {
@ -217,12 +211,20 @@ func benchmarkNClients(n int, b *testing.B) {
ctx := context.Background() ctx := context.Background()
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
ch := make(chan interface{}) subscription, err := s.Subscribe(ctx, clientID, query.MustParse(fmt.Sprintf("abci.Account.Owner = 'Ivan' AND abci.Invoices.Number = %d", i)), 1)
if err != nil {
b.Fatal(err)
}
go func() { go func() {
for range ch { for {
select {
case <-subscription.Out():
continue
case <-subscription.Cancelled():
return
}
} }
}() }()
s.Subscribe(ctx, clientID, query.MustParse(fmt.Sprintf("abci.Account.Owner = 'Ivan' AND abci.Invoices.Number = %d", i)), ch)
} }
b.ReportAllocs() b.ReportAllocs()
@ -240,12 +242,20 @@ func benchmarkNClientsOneQuery(n int, b *testing.B) {
ctx := context.Background() ctx := context.Background()
q := query.MustParse("abci.Account.Owner = 'Ivan' AND abci.Invoices.Number = 1") q := query.MustParse("abci.Account.Owner = 'Ivan' AND abci.Invoices.Number = 1")
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
ch := make(chan interface{}) subscription, err := s.Subscribe(ctx, clientID, q, 1)
if err != nil {
b.Fatal(err)
}
go func() { go func() {
for range ch { for {
select {
case <-subscription.Out():
continue
case <-subscription.Cancelled():
return
}
} }
}() }()
s.Subscribe(ctx, clientID, q, ch)
} }
b.ReportAllocs() b.ReportAllocs()
@ -259,12 +269,10 @@ func benchmarkNClientsOneQuery(n int, b *testing.B) {
/// HELPERS /// HELPERS
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
func assertReceive(t *testing.T, expected interface{}, ch <-chan interface{}, msgAndArgs ...interface{}) { func assertReceive(t *testing.T, expected interface{}, ch <-chan pubsub.MsgAndTags, msgAndArgs ...interface{}) {
select { select {
case actual := <-ch: case actual := <-ch:
if actual != nil { assert.Equal(t, expected, actual.Msg, msgAndArgs...)
assert.Equal(t, expected, actual, msgAndArgs...)
}
case <-time.After(1 * time.Second): case <-time.After(1 * time.Second):
t.Errorf("Expected to receive %v from the channel, got nothing after 1s", expected) t.Errorf("Expected to receive %v from the channel, got nothing after 1s", expected)
debug.PrintStack() debug.PrintStack()

View File

@ -0,0 +1,60 @@
package pubsub
import (
"errors"
"sync"
)
var (
// ErrUnsubscribed is returned by Err when a client unsubscribes.
ErrUnsubscribed = errors.New("client unsubscribed")
// ErrOutOfCapacity is returned by Err when a client is not pulling messages
// fast enough. Note the client's subscription will be terminated.
ErrOutOfCapacity = errors.New("client is not pulling messages fast enough")
)
// A Subscription represents a client subscription for a particular query and
// consists of three things:
// 1) channel onto which messages and tags are published
// 2) channel which is closed if a client is too slow or choose to unsubscribe
// 3) err indicating the reason for (2)
type Subscription struct {
out chan MsgAndTags
cancelled chan struct{}
mtx sync.RWMutex
err error
}
// Out returns a channel onto which messages and tags are published.
// Unsubscribe/UnsubscribeAll does not close the channel to avoid clients from
// receiving a nil message.
func (s *Subscription) Out() <-chan MsgAndTags {
return s.out
}
// Cancelled returns a channel that's closed when the subscription is
// terminated and supposed to be used in a select statement.
func (s *Subscription) Cancelled() <-chan struct{} {
return s.cancelled
}
// Err returns nil if the channel returned by Cancelled is not yet closed.
// If the channel is closed, Err returns a non-nil error explaining why:
// - ErrUnsubscribed if the subscriber choose to unsubscribe,
// - ErrOutOfCapacity if the subscriber is not pulling messages fast enough
// and the channel returned by Out became full,
// After Err returns a non-nil error, successive calls to Err return the same
// error.
func (s *Subscription) Err() error {
s.mtx.RLock()
defer s.mtx.RUnlock()
return s.err
}
// MsgAndTags glues a message and tags together.
type MsgAndTags struct {
Msg interface{}
Tags TagMap
}

32
libs/pubsub/tag_map.go Normal file
View File

@ -0,0 +1,32 @@
package pubsub
// TagMap is used to associate tags to a message.
// They can be queried by subscribers to choose messages they will received.
type TagMap interface {
// Get returns the value for a key, or nil if no value is present.
// The ok result indicates whether value was found in the tags.
Get(key string) (value string, ok bool)
// Len returns the number of tags.
Len() int
}
type tagMap map[string]string
var _ TagMap = (*tagMap)(nil)
// NewTagMap constructs a new immutable tag set from a map.
func NewTagMap(data map[string]string) TagMap {
return tagMap(data)
}
// Get returns the value for a key, or nil if no value is present.
// The ok result indicates whether value was found in the tags.
func (ts tagMap) Get(key string) (value string, ok bool) {
value, ok = ts[key]
return
}
// Len returns the number of tags.
func (ts tagMap) Len() int {
return len(ts)
}