tendermint/libs/pubsub/subscription.go

90 lines
2.5 KiB
Go
Raw Normal View History

pubsub 2.0 (#3227) * green pubsub tests :OK: * get rid of clientToQueryMap * Subscribe and SubscribeUnbuffered * start adapting other pkgs to new pubsub * nope * rename MsgAndTags to Message * remove TagMap it does not bring any additional benefits * bring back EventSubscriber * fix test * fix data race in TestStartNextHeightCorrectly ``` Write at 0x00c0001c7418 by goroutine 796: github.com/tendermint/tendermint/consensus.TestStartNextHeightCorrectly() /go/src/github.com/tendermint/tendermint/consensus/state_test.go:1296 +0xad testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 Previous read at 0x00c0001c7418 by goroutine 858: github.com/tendermint/tendermint/consensus.(*ConsensusState).addVote() /go/src/github.com/tendermint/tendermint/consensus/state.go:1631 +0x1366 github.com/tendermint/tendermint/consensus.(*ConsensusState).tryAddVote() /go/src/github.com/tendermint/tendermint/consensus/state.go:1476 +0x8f github.com/tendermint/tendermint/consensus.(*ConsensusState).handleMsg() /go/src/github.com/tendermint/tendermint/consensus/state.go:667 +0xa1e github.com/tendermint/tendermint/consensus.(*ConsensusState).receiveRoutine() /go/src/github.com/tendermint/tendermint/consensus/state.go:628 +0x794 Goroutine 796 (running) created at: testing.(*T).Run() /usr/local/go/src/testing/testing.go:878 +0x659 testing.runTests.func1() /usr/local/go/src/testing/testing.go:1119 +0xa8 testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 testing.runTests() /usr/local/go/src/testing/testing.go:1117 +0x4ee testing.(*M).Run() /usr/local/go/src/testing/testing.go:1034 +0x2ee main.main() _testmain.go:214 +0x332 Goroutine 858 (running) created at: github.com/tendermint/tendermint/consensus.(*ConsensusState).startRoutines() /go/src/github.com/tendermint/tendermint/consensus/state.go:334 +0x221 github.com/tendermint/tendermint/consensus.startTestRound() /go/src/github.com/tendermint/tendermint/consensus/common_test.go:122 +0x63 github.com/tendermint/tendermint/consensus.TestStateFullRound1() /go/src/github.com/tendermint/tendermint/consensus/state_test.go:255 +0x397 testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 ``` * fixes after my own review * fix formatting * wait 100ms before kicking a subscriber out + a test for indexer_service * fixes after my second review * no timeout * add changelog entries * fix merge conflicts * fix typos after Thane's review Co-Authored-By: melekes <anton.kalyaev@gmail.com> * reformat code * rewrite indexer service in the attempt to fix failing test https://github.com/tendermint/tendermint/pull/3227/#issuecomment-462316527 * Revert "rewrite indexer service in the attempt to fix failing test" This reverts commit 0d9107a098230de7138abb1c201877c246e89ed1. * another attempt to fix indexer * fixes after Ethan's review * use unbuffered channel when indexing transactions Refs https://github.com/tendermint/tendermint/pull/3227#discussion_r258786716 * add a comment for EventBus#SubscribeUnbuffered * format code
2019-02-23 08:11:27 +04:00
package pubsub
import (
"errors"
"sync"
)
var (
// ErrUnsubscribed is returned by Err when a client unsubscribes.
ErrUnsubscribed = errors.New("client unsubscribed")
// ErrOutOfCapacity is returned by Err when a client is not pulling messages
// fast enough. Note the client's subscription will be terminated.
ErrOutOfCapacity = errors.New("client is not pulling messages fast enough")
)
// A Subscription represents a client subscription for a particular query and
// consists of three things:
// 1) channel onto which messages and tags are published
// 2) channel which is closed if a client is too slow or choose to unsubscribe
// 3) err indicating the reason for (2)
type Subscription struct {
out chan Message
cancelled chan struct{}
mtx sync.RWMutex
err error
}
// NewSubscription returns a new subscription with the given outCapacity.
func NewSubscription(outCapacity int) *Subscription {
return &Subscription{
out: make(chan Message, outCapacity),
cancelled: make(chan struct{}),
}
}
// Out returns a channel onto which messages and tags are published.
// Unsubscribe/UnsubscribeAll does not close the channel to avoid clients from
// receiving a nil message.
func (s *Subscription) Out() <-chan Message {
return s.out
}
// Cancelled returns a channel that's closed when the subscription is
// terminated and supposed to be used in a select statement.
func (s *Subscription) Cancelled() <-chan struct{} {
return s.cancelled
}
// Err returns nil if the channel returned by Cancelled is not yet closed.
// If the channel is closed, Err returns a non-nil error explaining why:
// - ErrUnsubscribed if the subscriber choose to unsubscribe,
// - ErrOutOfCapacity if the subscriber is not pulling messages fast enough
// and the channel returned by Out became full,
// After Err returns a non-nil error, successive calls to Err return the same
// error.
func (s *Subscription) Err() error {
s.mtx.RLock()
defer s.mtx.RUnlock()
return s.err
}
func (s *Subscription) cancel(err error) {
s.mtx.Lock()
s.err = err
s.mtx.Unlock()
close(s.cancelled)
}
// Message glues data and tags together.
type Message struct {
abci: Refactor tagging events using list of lists (#3643) ## PR This PR introduces a fundamental breaking change to the structure of ABCI response and tx tags and the way they're processed. Namely, the SDK can support more complex and aggregated events for distribution and slashing. In addition, block responses can include duplicate keys in events. Implement new Event type. An event has a type and a list of KV pairs (ie. list-of-lists). Typical events may look like: "rewards": [{"amount": "5000uatom", "validator": "...", "recipient": "..."}] "sender": [{"address": "...", "balance": "100uatom"}] The events are indexed by {even.type}.{even.attribute[i].key}/.... In this case a client would subscribe or query for rewards.recipient='...' ABCI response types and related types now include Events []Event instead of Tags []cmn.KVPair. PubSub logic now publishes/matches against map[string][]string instead of map[string]string to support duplicate keys in response events (from #1385). A match is successful if the value is found in the slice of strings. closes: #1859 closes: #2905 ## Commits: * Implement Event ABCI type and updates responses to use events * Update messages_test.go * Update kvstore.go * Update event_bus.go * Update subscription.go * Update pubsub.go * Update kvstore.go * Update query logic to handle slice of strings in events * Update Empty#Matches and unit tests * Update pubsub logic * Update EventBus#Publish * Update kv tx indexer * Update godocs * Update ResultEvent to use slice of strings; update RPC * Update more tests * Update abci.md * Check for key in validateAndStringifyEvents * Fix KV indexer to skip empty keys * Fix linting errors * Update CHANGELOG_PENDING.md * Update docs/spec/abci/abci.md Co-Authored-By: Federico Kunze <31522760+fedekunze@users.noreply.github.com> * Update abci/types/types.proto Co-Authored-By: Ethan Buchman <ethan@coinculture.info> * Update docs/spec/abci/abci.md Co-Authored-By: Ethan Buchman <ethan@coinculture.info> * Update libs/pubsub/query/query.go Co-Authored-By: Ethan Buchman <ethan@coinculture.info> * Update match function to match if ANY value matches * Implement TestSubscribeDuplicateKeys * Update TestMatches to include multi-key test cases * Update events.go * Update Query interface godoc * Update match godoc * Add godoc for matchValue * DRY-up tx indexing * Return error from PublishWithEvents in EventBus#Publish * Update PublishEventNewBlockHeader to return an error * Fix build * Update events doc in ABCI * Update ABCI events godoc * Implement TestEventBusPublishEventTxDuplicateKeys * Update TestSubscribeDuplicateKeys to be table-driven * Remove mod file * Remove markdown from events godoc * Implement TestTxSearchDeprecatedIndexing test
2019-06-12 14:03:45 +02:00
data interface{}
events map[string][]string
pubsub 2.0 (#3227) * green pubsub tests :OK: * get rid of clientToQueryMap * Subscribe and SubscribeUnbuffered * start adapting other pkgs to new pubsub * nope * rename MsgAndTags to Message * remove TagMap it does not bring any additional benefits * bring back EventSubscriber * fix test * fix data race in TestStartNextHeightCorrectly ``` Write at 0x00c0001c7418 by goroutine 796: github.com/tendermint/tendermint/consensus.TestStartNextHeightCorrectly() /go/src/github.com/tendermint/tendermint/consensus/state_test.go:1296 +0xad testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 Previous read at 0x00c0001c7418 by goroutine 858: github.com/tendermint/tendermint/consensus.(*ConsensusState).addVote() /go/src/github.com/tendermint/tendermint/consensus/state.go:1631 +0x1366 github.com/tendermint/tendermint/consensus.(*ConsensusState).tryAddVote() /go/src/github.com/tendermint/tendermint/consensus/state.go:1476 +0x8f github.com/tendermint/tendermint/consensus.(*ConsensusState).handleMsg() /go/src/github.com/tendermint/tendermint/consensus/state.go:667 +0xa1e github.com/tendermint/tendermint/consensus.(*ConsensusState).receiveRoutine() /go/src/github.com/tendermint/tendermint/consensus/state.go:628 +0x794 Goroutine 796 (running) created at: testing.(*T).Run() /usr/local/go/src/testing/testing.go:878 +0x659 testing.runTests.func1() /usr/local/go/src/testing/testing.go:1119 +0xa8 testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 testing.runTests() /usr/local/go/src/testing/testing.go:1117 +0x4ee testing.(*M).Run() /usr/local/go/src/testing/testing.go:1034 +0x2ee main.main() _testmain.go:214 +0x332 Goroutine 858 (running) created at: github.com/tendermint/tendermint/consensus.(*ConsensusState).startRoutines() /go/src/github.com/tendermint/tendermint/consensus/state.go:334 +0x221 github.com/tendermint/tendermint/consensus.startTestRound() /go/src/github.com/tendermint/tendermint/consensus/common_test.go:122 +0x63 github.com/tendermint/tendermint/consensus.TestStateFullRound1() /go/src/github.com/tendermint/tendermint/consensus/state_test.go:255 +0x397 testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 ``` * fixes after my own review * fix formatting * wait 100ms before kicking a subscriber out + a test for indexer_service * fixes after my second review * no timeout * add changelog entries * fix merge conflicts * fix typos after Thane's review Co-Authored-By: melekes <anton.kalyaev@gmail.com> * reformat code * rewrite indexer service in the attempt to fix failing test https://github.com/tendermint/tendermint/pull/3227/#issuecomment-462316527 * Revert "rewrite indexer service in the attempt to fix failing test" This reverts commit 0d9107a098230de7138abb1c201877c246e89ed1. * another attempt to fix indexer * fixes after Ethan's review * use unbuffered channel when indexing transactions Refs https://github.com/tendermint/tendermint/pull/3227#discussion_r258786716 * add a comment for EventBus#SubscribeUnbuffered * format code
2019-02-23 08:11:27 +04:00
}
abci: Refactor tagging events using list of lists (#3643) ## PR This PR introduces a fundamental breaking change to the structure of ABCI response and tx tags and the way they're processed. Namely, the SDK can support more complex and aggregated events for distribution and slashing. In addition, block responses can include duplicate keys in events. Implement new Event type. An event has a type and a list of KV pairs (ie. list-of-lists). Typical events may look like: "rewards": [{"amount": "5000uatom", "validator": "...", "recipient": "..."}] "sender": [{"address": "...", "balance": "100uatom"}] The events are indexed by {even.type}.{even.attribute[i].key}/.... In this case a client would subscribe or query for rewards.recipient='...' ABCI response types and related types now include Events []Event instead of Tags []cmn.KVPair. PubSub logic now publishes/matches against map[string][]string instead of map[string]string to support duplicate keys in response events (from #1385). A match is successful if the value is found in the slice of strings. closes: #1859 closes: #2905 ## Commits: * Implement Event ABCI type and updates responses to use events * Update messages_test.go * Update kvstore.go * Update event_bus.go * Update subscription.go * Update pubsub.go * Update kvstore.go * Update query logic to handle slice of strings in events * Update Empty#Matches and unit tests * Update pubsub logic * Update EventBus#Publish * Update kv tx indexer * Update godocs * Update ResultEvent to use slice of strings; update RPC * Update more tests * Update abci.md * Check for key in validateAndStringifyEvents * Fix KV indexer to skip empty keys * Fix linting errors * Update CHANGELOG_PENDING.md * Update docs/spec/abci/abci.md Co-Authored-By: Federico Kunze <31522760+fedekunze@users.noreply.github.com> * Update abci/types/types.proto Co-Authored-By: Ethan Buchman <ethan@coinculture.info> * Update docs/spec/abci/abci.md Co-Authored-By: Ethan Buchman <ethan@coinculture.info> * Update libs/pubsub/query/query.go Co-Authored-By: Ethan Buchman <ethan@coinculture.info> * Update match function to match if ANY value matches * Implement TestSubscribeDuplicateKeys * Update TestMatches to include multi-key test cases * Update events.go * Update Query interface godoc * Update match godoc * Add godoc for matchValue * DRY-up tx indexing * Return error from PublishWithEvents in EventBus#Publish * Update PublishEventNewBlockHeader to return an error * Fix build * Update events doc in ABCI * Update ABCI events godoc * Implement TestEventBusPublishEventTxDuplicateKeys * Update TestSubscribeDuplicateKeys to be table-driven * Remove mod file * Remove markdown from events godoc * Implement TestTxSearchDeprecatedIndexing test
2019-06-12 14:03:45 +02:00
func NewMessage(data interface{}, events map[string][]string) Message {
return Message{data, events}
pubsub 2.0 (#3227) * green pubsub tests :OK: * get rid of clientToQueryMap * Subscribe and SubscribeUnbuffered * start adapting other pkgs to new pubsub * nope * rename MsgAndTags to Message * remove TagMap it does not bring any additional benefits * bring back EventSubscriber * fix test * fix data race in TestStartNextHeightCorrectly ``` Write at 0x00c0001c7418 by goroutine 796: github.com/tendermint/tendermint/consensus.TestStartNextHeightCorrectly() /go/src/github.com/tendermint/tendermint/consensus/state_test.go:1296 +0xad testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 Previous read at 0x00c0001c7418 by goroutine 858: github.com/tendermint/tendermint/consensus.(*ConsensusState).addVote() /go/src/github.com/tendermint/tendermint/consensus/state.go:1631 +0x1366 github.com/tendermint/tendermint/consensus.(*ConsensusState).tryAddVote() /go/src/github.com/tendermint/tendermint/consensus/state.go:1476 +0x8f github.com/tendermint/tendermint/consensus.(*ConsensusState).handleMsg() /go/src/github.com/tendermint/tendermint/consensus/state.go:667 +0xa1e github.com/tendermint/tendermint/consensus.(*ConsensusState).receiveRoutine() /go/src/github.com/tendermint/tendermint/consensus/state.go:628 +0x794 Goroutine 796 (running) created at: testing.(*T).Run() /usr/local/go/src/testing/testing.go:878 +0x659 testing.runTests.func1() /usr/local/go/src/testing/testing.go:1119 +0xa8 testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 testing.runTests() /usr/local/go/src/testing/testing.go:1117 +0x4ee testing.(*M).Run() /usr/local/go/src/testing/testing.go:1034 +0x2ee main.main() _testmain.go:214 +0x332 Goroutine 858 (running) created at: github.com/tendermint/tendermint/consensus.(*ConsensusState).startRoutines() /go/src/github.com/tendermint/tendermint/consensus/state.go:334 +0x221 github.com/tendermint/tendermint/consensus.startTestRound() /go/src/github.com/tendermint/tendermint/consensus/common_test.go:122 +0x63 github.com/tendermint/tendermint/consensus.TestStateFullRound1() /go/src/github.com/tendermint/tendermint/consensus/state_test.go:255 +0x397 testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 ``` * fixes after my own review * fix formatting * wait 100ms before kicking a subscriber out + a test for indexer_service * fixes after my second review * no timeout * add changelog entries * fix merge conflicts * fix typos after Thane's review Co-Authored-By: melekes <anton.kalyaev@gmail.com> * reformat code * rewrite indexer service in the attempt to fix failing test https://github.com/tendermint/tendermint/pull/3227/#issuecomment-462316527 * Revert "rewrite indexer service in the attempt to fix failing test" This reverts commit 0d9107a098230de7138abb1c201877c246e89ed1. * another attempt to fix indexer * fixes after Ethan's review * use unbuffered channel when indexing transactions Refs https://github.com/tendermint/tendermint/pull/3227#discussion_r258786716 * add a comment for EventBus#SubscribeUnbuffered * format code
2019-02-23 08:11:27 +04:00
}
// Data returns an original data published.
func (msg Message) Data() interface{} {
return msg.data
}
abci: Refactor tagging events using list of lists (#3643) ## PR This PR introduces a fundamental breaking change to the structure of ABCI response and tx tags and the way they're processed. Namely, the SDK can support more complex and aggregated events for distribution and slashing. In addition, block responses can include duplicate keys in events. Implement new Event type. An event has a type and a list of KV pairs (ie. list-of-lists). Typical events may look like: "rewards": [{"amount": "5000uatom", "validator": "...", "recipient": "..."}] "sender": [{"address": "...", "balance": "100uatom"}] The events are indexed by {even.type}.{even.attribute[i].key}/.... In this case a client would subscribe or query for rewards.recipient='...' ABCI response types and related types now include Events []Event instead of Tags []cmn.KVPair. PubSub logic now publishes/matches against map[string][]string instead of map[string]string to support duplicate keys in response events (from #1385). A match is successful if the value is found in the slice of strings. closes: #1859 closes: #2905 ## Commits: * Implement Event ABCI type and updates responses to use events * Update messages_test.go * Update kvstore.go * Update event_bus.go * Update subscription.go * Update pubsub.go * Update kvstore.go * Update query logic to handle slice of strings in events * Update Empty#Matches and unit tests * Update pubsub logic * Update EventBus#Publish * Update kv tx indexer * Update godocs * Update ResultEvent to use slice of strings; update RPC * Update more tests * Update abci.md * Check for key in validateAndStringifyEvents * Fix KV indexer to skip empty keys * Fix linting errors * Update CHANGELOG_PENDING.md * Update docs/spec/abci/abci.md Co-Authored-By: Federico Kunze <31522760+fedekunze@users.noreply.github.com> * Update abci/types/types.proto Co-Authored-By: Ethan Buchman <ethan@coinculture.info> * Update docs/spec/abci/abci.md Co-Authored-By: Ethan Buchman <ethan@coinculture.info> * Update libs/pubsub/query/query.go Co-Authored-By: Ethan Buchman <ethan@coinculture.info> * Update match function to match if ANY value matches * Implement TestSubscribeDuplicateKeys * Update TestMatches to include multi-key test cases * Update events.go * Update Query interface godoc * Update match godoc * Add godoc for matchValue * DRY-up tx indexing * Return error from PublishWithEvents in EventBus#Publish * Update PublishEventNewBlockHeader to return an error * Fix build * Update events doc in ABCI * Update ABCI events godoc * Implement TestEventBusPublishEventTxDuplicateKeys * Update TestSubscribeDuplicateKeys to be table-driven * Remove mod file * Remove markdown from events godoc * Implement TestTxSearchDeprecatedIndexing test
2019-06-12 14:03:45 +02:00
// Events returns events, which matched the client's query.
func (msg Message) Events() map[string][]string {
return msg.events
pubsub 2.0 (#3227) * green pubsub tests :OK: * get rid of clientToQueryMap * Subscribe and SubscribeUnbuffered * start adapting other pkgs to new pubsub * nope * rename MsgAndTags to Message * remove TagMap it does not bring any additional benefits * bring back EventSubscriber * fix test * fix data race in TestStartNextHeightCorrectly ``` Write at 0x00c0001c7418 by goroutine 796: github.com/tendermint/tendermint/consensus.TestStartNextHeightCorrectly() /go/src/github.com/tendermint/tendermint/consensus/state_test.go:1296 +0xad testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 Previous read at 0x00c0001c7418 by goroutine 858: github.com/tendermint/tendermint/consensus.(*ConsensusState).addVote() /go/src/github.com/tendermint/tendermint/consensus/state.go:1631 +0x1366 github.com/tendermint/tendermint/consensus.(*ConsensusState).tryAddVote() /go/src/github.com/tendermint/tendermint/consensus/state.go:1476 +0x8f github.com/tendermint/tendermint/consensus.(*ConsensusState).handleMsg() /go/src/github.com/tendermint/tendermint/consensus/state.go:667 +0xa1e github.com/tendermint/tendermint/consensus.(*ConsensusState).receiveRoutine() /go/src/github.com/tendermint/tendermint/consensus/state.go:628 +0x794 Goroutine 796 (running) created at: testing.(*T).Run() /usr/local/go/src/testing/testing.go:878 +0x659 testing.runTests.func1() /usr/local/go/src/testing/testing.go:1119 +0xa8 testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 testing.runTests() /usr/local/go/src/testing/testing.go:1117 +0x4ee testing.(*M).Run() /usr/local/go/src/testing/testing.go:1034 +0x2ee main.main() _testmain.go:214 +0x332 Goroutine 858 (running) created at: github.com/tendermint/tendermint/consensus.(*ConsensusState).startRoutines() /go/src/github.com/tendermint/tendermint/consensus/state.go:334 +0x221 github.com/tendermint/tendermint/consensus.startTestRound() /go/src/github.com/tendermint/tendermint/consensus/common_test.go:122 +0x63 github.com/tendermint/tendermint/consensus.TestStateFullRound1() /go/src/github.com/tendermint/tendermint/consensus/state_test.go:255 +0x397 testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 ``` * fixes after my own review * fix formatting * wait 100ms before kicking a subscriber out + a test for indexer_service * fixes after my second review * no timeout * add changelog entries * fix merge conflicts * fix typos after Thane's review Co-Authored-By: melekes <anton.kalyaev@gmail.com> * reformat code * rewrite indexer service in the attempt to fix failing test https://github.com/tendermint/tendermint/pull/3227/#issuecomment-462316527 * Revert "rewrite indexer service in the attempt to fix failing test" This reverts commit 0d9107a098230de7138abb1c201877c246e89ed1. * another attempt to fix indexer * fixes after Ethan's review * use unbuffered channel when indexing transactions Refs https://github.com/tendermint/tendermint/pull/3227#discussion_r258786716 * add a comment for EventBus#SubscribeUnbuffered * format code
2019-02-23 08:11:27 +04:00
}