diff --git a/docs/architecture/adr-033-pubsub.md b/docs/architecture/adr-033-pubsub.md index 0ef0cae6..c52bf44a 100644 --- a/docs/architecture/adr-033-pubsub.md +++ b/docs/architecture/adr-033-pubsub.md @@ -5,6 +5,8 @@ Author: Anton Kaliaev (@melekes) ## Changelog 02-10-2018: Initial draft +16-01-2019: Second version based on our conversation with Jae +17-01-2019: Third version explaining how new design solves current issues ## Context @@ -40,7 +42,14 @@ goroutines can be used to avoid uncontrolled memory growth. In certain cases, this is what you want. But in our case, because we need strict ordering of events (if event A was published before B, the guaranteed -delivery order will be A -> B), we can't use goroutines. +delivery order will be A -> B), we can't publish msg in a new goroutine every time. + +We can also have a goroutine per subscriber, although we'd need to be careful +with the number of subscribers. It's more difficult to implement as well + +unclear if we'll benefit from it (cause we'd be forced to create N additional +channels to distribute msg to these goroutines). + +### Non-blocking send There is also a question whenever we should have a non-blocking send: @@ -56,15 +65,14 @@ for each subscriber { ``` This fixes the "slow client problem", but there is no way for a slow client to -know if it had missed a message. On the other hand, if we're going to stick -with blocking send, **devs must always ensure subscriber's handling code does not -block**. As you can see, there is an implicit choice between ordering guarantees -and using goroutines. +know if it had missed a message. We could return a second channel and close it +to indicate subscription termination. On the other hand, if we're going to +stick with blocking send, **devs must always ensure subscriber's handling code +does not block**, which is a hard task to put on their shoulders. The interim option is to run goroutines pool for a single message, wait for all goroutines to finish. This will solve "slow client problem", but we'd still have to wait `max(goroutine_X_time)` before we can publish the next message. -My opinion: not worth doing. ### Channels vs Callbacks @@ -76,8 +84,6 @@ memory leaks and/or memory usage increase. Go channels are de-facto standard for carrying data between goroutines. -**Question: Is it worth switching to callback functions?** - ### Why `Subscribe()` accepts an `out` channel? Because in our tests, we create buffered channels (cap: 1). Alternatively, we @@ -85,27 +91,89 @@ can make capacity an argument. ## Decision -Change Subscribe() function to return out channel: +Change Subscribe() function to return a `Subscription` struct: ```go -// outCap can be used to set capacity of out channel (unbuffered by default). -Subscribe(ctx context.Context, clientID string, query Query, outCap... int) (out <-chan interface{}, err error) { +type Subscription struct { + // private fields +} + +func (s *Subscription) Out() <-chan MsgAndTags +func (s *Subscription) Cancelled() <-chan struct{} +func (s *Subscription) Err() error ``` -It's more idiomatic since we're closing it during Unsubscribe/UnsubscribeAll calls. +Out returns a channel onto which messages and tags are published. +Unsubscribe/UnsubscribeAll does not close the channel to avoid clients from +receiving a nil message. -Also, we should make tags available to subscribers: +Cancelled returns a channel that's closed when the subscription is terminated +and supposed to be used in a select statement. + +If Cancelled is not closed yet, Err() returns nil. +If Cancelled is closed, Err returns a non-nil error explaining why: +Unsubscribed if the subscriber choose to unsubscribe, +OutOfCapacity if the subscriber is not pulling messages fast enough and the Out channel become full. +After Err returns a non-nil error, successive calls to Err() return the same error. + +```go +subscription, err := pubsub.Subscribe(...) +if err != nil { + // ... +} +for { +select { + case msgAndTags <- subscription.Out(): + // ... + case <-subscription.Cancelled(): + return subscription.Err() +} +``` + +Make Out() channel buffered (cap: 1) by default. In most cases, we want to +terminate the slow subscriber. Only in rare cases, we want to block the pubsub +(e.g. when debugging consensus). This should lower the chances of the pubsub +being frozen. + +```go +// outCap can be used to set capacity of Out channel (1 by default). Set to 0 +for unbuffered channel (WARNING: it may block the pubsub). +Subscribe(ctx context.Context, clientID string, query Query, outCap... int) (Subscription, error) { +``` + +Also, Out() channel should return tags along with a message: ```go type MsgAndTags struct { Msg interface{} Tags TagMap } - -// outCap can be used to set capacity of out channel (unbuffered by default). -Subscribe(ctx context.Context, clientID string, query Query, outCap... int) (out <-chan MsgAndTags, err error) { ``` +to inform clients of which Tags were used with Msg. + +### How this new design solves the current issues? + +https://github.com/tendermint/tendermint/issues/951 (https://github.com/tendermint/tendermint/issues/1880) + +Because of non-blocking send, situation where we'll deadlock is not possible +anymore. If the client stops reading messages, it will be removed. + +https://github.com/tendermint/tendermint/issues/1879 + +MsgAndTags is used now instead of a plain message. + +### Future problems and their possible solutions + +https://github.com/tendermint/tendermint/issues/2826 + +One question I am still pondering about: how to prevent pubsub from slowing +down consensus. We can increase the pubsub queue size (which is 0 now). Also, +it's probably a good idea to limit the total number of subscribers. + +This can be made automatically. Say we set queue size to 1000 and, when it's >= +80% full, refuse new subscriptions. + ## Status In review @@ -116,7 +184,10 @@ In review - more idiomatic interface - subscribers know what tags msg was published with +- subscribers aware of the reason their subscription was cancelled ### Negative +- (since v1) no concurrency when it comes to publishing messages + ### Neutral