mirror of
https://github.com/fluencelabs/tendermint
synced 2025-06-03 00:21:20 +00:00
docs: update pubsub ADR (#3131)
* docs: update pubsub ADR * third version
This commit is contained in:
parent
98b42e9eb2
commit
1efacaa8d3
@ -5,6 +5,8 @@ Author: Anton Kaliaev (@melekes)
|
|||||||
## Changelog
|
## Changelog
|
||||||
|
|
||||||
02-10-2018: Initial draft
|
02-10-2018: Initial draft
|
||||||
|
16-01-2019: Second version based on our conversation with Jae
|
||||||
|
17-01-2019: Third version explaining how new design solves current issues
|
||||||
|
|
||||||
## Context
|
## Context
|
||||||
|
|
||||||
@ -40,7 +42,14 @@ goroutines can be used to avoid uncontrolled memory growth.
|
|||||||
|
|
||||||
In certain cases, this is what you want. But in our case, because we need
|
In certain cases, this is what you want. But in our case, because we need
|
||||||
strict ordering of events (if event A was published before B, the guaranteed
|
strict ordering of events (if event A was published before B, the guaranteed
|
||||||
delivery order will be A -> B), we can't use goroutines.
|
delivery order will be A -> B), we can't publish msg in a new goroutine every time.
|
||||||
|
|
||||||
|
We can also have a goroutine per subscriber, although we'd need to be careful
|
||||||
|
with the number of subscribers. It's also more difficult to implement, and it's
|
||||||
|
unclear if we'll benefit from it (because we'd be forced to create N additional
|
||||||
|
channels to distribute msg to these goroutines).
|
||||||
|
|
||||||
|
### Non-blocking send
|
||||||
|
|
||||||
There is also a question of whether we should have a non-blocking send:
|
There is also a question of whether we should have a non-blocking send:
|
||||||
|
|
||||||
@ -56,15 +65,14 @@ for each subscriber {
|
|||||||
```
|
```
|
||||||
|
|
||||||
This fixes the "slow client problem", but there is no way for a slow client to
|
This fixes the "slow client problem", but there is no way for a slow client to
|
||||||
know if it had missed a message. On the other hand, if we're going to stick
|
know if it had missed a message. We could return a second channel and close it
|
||||||
with blocking send, **devs must always ensure subscriber's handling code does not
|
to indicate subscription termination. On the other hand, if we're going to
|
||||||
block**. As you can see, there is an implicit choice between ordering guarantees
|
stick with blocking send, **devs must always ensure subscriber's handling code
|
||||||
and using goroutines.
|
does not block**, which is a hard task to put on their shoulders.
|
||||||
|
|
||||||
The interim option is to run goroutines pool for a single message, wait for all
|
The interim option is to run goroutines pool for a single message, wait for all
|
||||||
goroutines to finish. This will solve "slow client problem", but we'd still
|
goroutines to finish. This will solve "slow client problem", but we'd still
|
||||||
have to wait `max(goroutine_X_time)` before we can publish the next message.
|
have to wait `max(goroutine_X_time)` before we can publish the next message.
|
||||||
My opinion: not worth doing.
|
|
||||||
|
|
||||||
### Channels vs Callbacks
|
### Channels vs Callbacks
|
||||||
|
|
||||||
@ -76,8 +84,6 @@ memory leaks and/or memory usage increase.
|
|||||||
|
|
||||||
Go channels are de-facto standard for carrying data between goroutines.
|
Go channels are de-facto standard for carrying data between goroutines.
|
||||||
|
|
||||||
**Question: Is it worth switching to callback functions?**
|
|
||||||
|
|
||||||
### Why `Subscribe()` accepts an `out` channel?
|
### Why `Subscribe()` accepts an `out` channel?
|
||||||
|
|
||||||
Because in our tests, we create buffered channels (cap: 1). Alternatively, we
|
Because in our tests, we create buffered channels (cap: 1). Alternatively, we
|
||||||
@ -85,27 +91,89 @@ can make capacity an argument.
|
|||||||
|
|
||||||
## Decision
|
## Decision
|
||||||
|
|
||||||
Change Subscribe() function to return out channel:
|
Change Subscribe() function to return a `Subscription` struct:
|
||||||
|
|
||||||
```go
|
```go
|
||||||
// outCap can be used to set capacity of out channel (unbuffered by default).
|
type Subscription struct {
|
||||||
Subscribe(ctx context.Context, clientID string, query Query, outCap... int) (out <-chan interface{}, err error) {
|
// private fields
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Subscription) Out() <-chan MsgAndTags
|
||||||
|
func (s *Subscription) Cancelled() <-chan struct{}
|
||||||
|
func (s *Subscription) Err() error
|
||||||
```
|
```
|
||||||
|
|
||||||
It's more idiomatic since we're closing it during Unsubscribe/UnsubscribeAll calls.
|
Out returns a channel onto which messages and tags are published.
|
||||||
|
Unsubscribe/UnsubscribeAll do not close the channel, to prevent clients from
|
||||||
|
receiving a nil message.
|
||||||
|
|
||||||
Also, we should make tags available to subscribers:
|
Cancelled returns a channel that's closed when the subscription is terminated
|
||||||
|
and is meant to be used in a select statement.
|
||||||
|
|
||||||
|
If Cancelled is not closed yet, Err() returns nil.
|
||||||
|
If Cancelled is closed, Err returns a non-nil error explaining why:
|
||||||
|
Unsubscribed if the subscriber chooses to unsubscribe,
|
||||||
|
OutOfCapacity if the subscriber is not pulling messages fast enough and the Out channel becomes full.
|
||||||
|
After Err returns a non-nil error, successive calls to Err() return the same error.
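
The `Cancelled`/`Err` contract described above could be implemented roughly as follows. This is a minimal sketch, not the code from the commit: the error identifiers `ErrUnsubscribed` and `ErrOutOfCapacity`, the constructor `NewSubscription`, the unexported `cancel` method, and the simplified `Tags` type are all assumptions made for illustration.

```go
package main

import (
	"errors"
	"sync"
)

// Hypothetical error values; the ADR names the reasons, not the identifiers.
var (
	ErrUnsubscribed  = errors.New("client unsubscribed")
	ErrOutOfCapacity = errors.New("client is not pulling messages fast enough")
)

// MsgAndTags follows the struct proposed in this ADR; TagMap is simplified
// to a plain map here (assumption).
type MsgAndTags struct {
	Msg  interface{}
	Tags map[string]string
}

// Subscription keeps its fields private, as in the ADR.
type Subscription struct {
	out       chan MsgAndTags
	cancelled chan struct{}

	mtx sync.RWMutex
	err error
}

// NewSubscription is a hypothetical constructor; outCap is the Out capacity.
func NewSubscription(outCap int) *Subscription {
	return &Subscription{
		out:       make(chan MsgAndTags, outCap),
		cancelled: make(chan struct{}),
	}
}

func (s *Subscription) Out() <-chan MsgAndTags     { return s.out }
func (s *Subscription) Cancelled() <-chan struct{} { return s.cancelled }

// Err returns nil until the subscription is cancelled, then the same
// non-nil error on every call.
func (s *Subscription) Err() error {
	s.mtx.RLock()
	defer s.mtx.RUnlock()
	return s.err
}

// cancel records the reason and closes Cancelled. Only the first call takes
// effect, so the reason never changes and the channel is closed exactly once.
func (s *Subscription) cancel(err error) {
	s.mtx.Lock()
	defer s.mtx.Unlock()
	if s.err != nil {
		return
	}
	s.err = err
	close(s.cancelled)
}
```

Setting `err` and closing `cancelled` under the same lock means a client that observes the closed channel and then calls `Err()` always gets the non-nil reason.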
|
||||||
|
|
||||||
|
```go
|
||||||
|
subscription, err := pubsub.Subscribe(...)
|
||||||
|
if err != nil {
|
||||||
|
// ...
|
||||||
|
}
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case msgAndTags := <-subscription.Out():
|
||||||
|
// ...
|
||||||
|
case <-subscription.Cancelled():
|
||||||
|
return subscription.Err()
|
||||||
|
}
}
|
||||||
|
```
|
||||||
|
|
||||||
|
Make Out() channel buffered (cap: 1) by default. In most cases, we want to
|
||||||
|
terminate the slow subscriber. Only in rare cases, we want to block the pubsub
|
||||||
|
(e.g. when debugging consensus). This should lower the chances of the pubsub
|
||||||
|
being frozen.
|
||||||
|
|
||||||
|
```go
|
||||||
|
// outCap can be used to set capacity of Out channel (1 by default). Set to 0
|
||||||
|
// for unbuffered channel (WARNING: it may block the pubsub).
|
||||||
|
Subscribe(ctx context.Context, clientID string, query Query, outCap... int) (Subscription, error) {
|
||||||
|
```
|
||||||
|
|
||||||
|
Also, Out() channel should return tags along with a message:
|
||||||
|
|
||||||
```go
|
```go
|
||||||
type MsgAndTags struct {
|
type MsgAndTags struct {
|
||||||
Msg interface{}
|
Msg interface{}
|
||||||
Tags TagMap
|
Tags TagMap
|
||||||
}
|
}
|
||||||
|
|
||||||
// outCap can be used to set capacity of out channel (unbuffered by default).
|
|
||||||
Subscribe(ctx context.Context, clientID string, query Query, outCap... int) (out <-chan MsgAndTags, err error) {
|
|
||||||
```
|
```
|
||||||
|
|
||||||
|
to inform clients of which Tags were used with Msg.
|
||||||
|
|
||||||
|
### How this new design solves the current issues?
|
||||||
|
|
||||||
|
https://github.com/tendermint/tendermint/issues/951 (https://github.com/tendermint/tendermint/issues/1880)
|
||||||
|
|
||||||
|
Because of the non-blocking send, a situation where we deadlock is not possible
|
||||||
|
anymore. If the client stops reading messages, it will be removed.
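
To illustrate why the publisher can no longer deadlock, here is a minimal sketch of a non-blocking send loop. The `server` and `subscriber` types, the `publish` method, and `errOutOfCapacity` are hypothetical names, and the sketch assumes `publish` is only ever called from the single pubsub goroutine.

```go
package main

import "errors"

// errOutOfCapacity is a hypothetical name for the OutOfCapacity reason.
var errOutOfCapacity = errors.New("out channel is full")

// subscriber is a stripped-down stand-in for a subscription.
type subscriber struct {
	out       chan string   // buffered; the ADR proposes cap 1 by default
	cancelled chan struct{} // closed when the subscription is terminated
	err       error         // reason, set before cancelled is closed
}

type server struct {
	subs map[string]*subscriber
}

// publish offers msg to every subscriber without ever blocking. A subscriber
// whose buffer is full is terminated and removed instead of stalling the loop.
func (s *server) publish(msg string) {
	for id, sub := range s.subs {
		select {
		case sub.out <- msg:
			// delivered (or buffered) immediately
		default:
			sub.err = errOutOfCapacity
			close(sub.cancelled)
			delete(s.subs, id) // deleting during range is safe in Go
		}
	}
}
```

The trade-off is the one the ADR names: a slow client loses its subscription rather than slowing everyone else down, and learns why through the cancellation reason.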
|
||||||
|
|
||||||
|
https://github.com/tendermint/tendermint/issues/1879
|
||||||
|
|
||||||
|
MsgAndTags is used now instead of a plain message.
|
||||||
|
|
||||||
|
### Future problems and their possible solutions
|
||||||
|
|
||||||
|
https://github.com/tendermint/tendermint/issues/2826
|
||||||
|
|
||||||
|
One question I am still pondering: how to prevent pubsub from slowing
|
||||||
|
down consensus. We can increase the pubsub queue size (which is 0 now). Also,
|
||||||
|
it's probably a good idea to limit the total number of subscribers.
|
||||||
|
|
||||||
|
This can be done automatically. Say we set the queue size to 1000 and, when it's >=
|
||||||
|
80% full, refuse new subscriptions.
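
The 80% rule could be checked at subscription time along these lines. This is only a sketch of the idea: the `server` type, `newServer`, `subscribe`, and `errTooBusy` are hypothetical, and the real pubsub would need to guard the subscriber map against concurrent access.

```go
package main

import "errors"

// errTooBusy is a hypothetical error for refused subscriptions.
var errTooBusy = errors.New("pubsub queue is almost full, refusing new subscriptions")

type server struct {
	queue chan interface{}    // published messages waiting to be delivered
	subs  map[string]struct{} // registered client IDs
}

func newServer(queueSize int) *server {
	return &server{
		queue: make(chan interface{}, queueSize),
		subs:  make(map[string]struct{}),
	}
}

// subscribe registers clientID unless the queue is at least 80% full.
// The comparison len/cap >= 4/5 is done in integers to avoid floats.
// An unbuffered queue (cap 0) never triggers the limit.
func (s *server) subscribe(clientID string) error {
	if cap(s.queue) > 0 && len(s.queue)*5 >= cap(s.queue)*4 {
		return errTooBusy
	}
	s.subs[clientID] = struct{}{}
	return nil
}
```

With a queue size of 1000, as in the example above, subscriptions would be refused once 800 or more messages are pending.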
|
||||||
|
|
||||||
## Status
|
## Status
|
||||||
|
|
||||||
In review
|
In review
|
||||||
@ -116,7 +184,10 @@ In review
|
|||||||
|
|
||||||
- more idiomatic interface
|
- more idiomatic interface
|
||||||
- subscribers know what tags msg was published with
|
- subscribers know what tags msg was published with
|
||||||
|
- subscribers are aware of the reason their subscription was cancelled
|
||||||
|
|
||||||
### Negative
|
### Negative
|
||||||
|
|
||||||
|
- (since v1) no concurrency when it comes to publishing messages
|
||||||
|
|
||||||
### Neutral
|
### Neutral
|
||||||
|