mirror of
https://github.com/fluencelabs/tendermint
synced 2025-06-09 03:21:19 +00:00
adr-033 update
This commit is contained in:
parent
9b6b792ce7
commit
d91ea9b59d
@ -10,6 +10,8 @@ Author: Anton Kaliaev (@melekes)
|
|||||||
|
|
||||||
17-01-2019: Third version explaining how new design solves current issues
|
17-01-2019: Third version explaining how new design solves current issues
|
||||||
|
|
||||||
|
25-01-2019: Fourth version to treat buffered and unbuffered channels differently
|
||||||
|
|
||||||
## Context
|
## Context
|
||||||
|
|
||||||
Since the initial version of the pubsub, there's been a number of issues
|
Since the initial version of the pubsub, there's been a number of issues
|
||||||
@ -53,7 +55,10 @@ channels to distribute msg to these goroutines).
|
|||||||
|
|
||||||
### Non-blocking send
|
### Non-blocking send
|
||||||
|
|
||||||
There is also a question whenever we should have a non-blocking send:
|
There is also a question whenever we should have a non-blocking send.
|
||||||
|
Currently, sends are blocking, so publishing to one client can block on
|
||||||
|
publishing to another. This means a slow or unresponsive client can halt the
|
||||||
|
system. Instead, we can use a non-blocking send:
|
||||||
|
|
||||||
```go
|
```go
|
||||||
for each subscriber {
|
for each subscriber {
|
||||||
@ -89,10 +94,25 @@ Go channels are de-facto standard for carrying data between goroutines.
|
|||||||
### Why `Subscribe()` accepts an `out` channel?
|
### Why `Subscribe()` accepts an `out` channel?
|
||||||
|
|
||||||
Because in our tests, we create buffered channels (cap: 1). Alternatively, we
|
Because in our tests, we create buffered channels (cap: 1). Alternatively, we
|
||||||
can make capacity an argument.
|
can make capacity an argument and return a channel.
|
||||||
|
|
||||||
## Decision
|
## Decision
|
||||||
|
|
||||||
|
### MsgAndTags
|
||||||
|
|
||||||
|
Use a `MsgAndTags` struct on the subscription channel to indicate what tags the
|
||||||
|
msg matched.
|
||||||
|
|
||||||
|
```go
|
||||||
|
type MsgAndTags struct {
|
||||||
|
Msg interface{}
|
||||||
|
Tags TagMap
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### Subscription Struct
|
||||||
|
|
||||||
|
|
||||||
Change `Subscribe()` function to return a `Subscription` struct:
|
Change `Subscribe()` function to return a `Subscription` struct:
|
||||||
|
|
||||||
```go
|
```go
|
||||||
@ -132,27 +152,53 @@ select {
|
|||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
### Capacity and Subscriptions
|
||||||
|
|
||||||
Make the `Out()` channel buffered (with capacity 1) by default. In most cases, we want to
|
Make the `Out()` channel buffered (with capacity 1) by default. In most cases, we want to
|
||||||
terminate the slow subscriber. Only in rare cases, we want to block the pubsub
|
terminate the slow subscriber. Only in rare cases, we want to block the pubsub
|
||||||
(e.g. when debugging consensus). This should lower the chances of the pubsub
|
(e.g. when debugging consensus). This should lower the chances of the pubsub
|
||||||
being frozen.
|
being frozen.
|
||||||
|
|
||||||
```go
|
```go
|
||||||
// outCap can be used to set capacity of Out channel (1 by default). Set to 0
|
// outCap can be used to set capacity of Out channel
|
||||||
// for unbuffered channel (WARNING: it may block the pubsub).
|
// (1 by default, must be greater than 0).
|
||||||
Subscribe(ctx context.Context, clientID string, query Query, outCap... int) (Subscription, error) {
|
Subscribe(ctx context.Context, clientID string, query Query, outCap... int) (Subscription, error) {
|
||||||
```
|
```
|
||||||
|
|
||||||
Also, the `Out()` channel should return tags along with a message:
|
Use a different function for an unbuffered channel:
|
||||||
|
|
||||||
```go
|
```go
|
||||||
type MsgAndTags struct {
|
// Subscription uses an unbuffered channel. Publishing will block.
|
||||||
Msg interface{}
|
SubscribeUnbuffered(ctx context.Context, clientID string, query Query) (Subscription, error) {
|
||||||
Tags TagMap
|
|
||||||
}
|
|
||||||
```
|
```
|
||||||
|
|
||||||
to inform clients of which Tags were used with Msg.
|
SubscribeUnbuffered should not be exposed to users.
|
||||||
|
|
||||||
|
### Blocking/Nonblocking
|
||||||
|
|
||||||
|
The publisher should treat these kinds of channels separately.
|
||||||
|
It should block on unbuffered channels (for use with internal consensus events
|
||||||
|
in the consensus tests) and not block on the buffered ones. If a client is too
|
||||||
|
slow to keep up with it's messages, it's subscription is terminated:
|
||||||
|
|
||||||
|
for each subscription {
|
||||||
|
out := subscription.outChan
|
||||||
|
if cap(out) == 0 {
|
||||||
|
// block on unbuffered channel
|
||||||
|
out <- msg
|
||||||
|
} else {
|
||||||
|
// don't block on buffered channels
|
||||||
|
select {
|
||||||
|
case out <- msg:
|
||||||
|
default:
|
||||||
|
// set the error, notify on the cancel chan
|
||||||
|
subscription.err = fmt.Errorf("client is too slow for msg)
|
||||||
|
close(subscription.cancelChan)
|
||||||
|
|
||||||
|
// ... unsubscribe and close out
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
### How this new design solves the current issues?
|
### How this new design solves the current issues?
|
||||||
|
|
||||||
@ -167,7 +213,7 @@ MsgAndTags is used now instead of a plain message.
|
|||||||
|
|
||||||
### Future problems and their possible solutions
|
### Future problems and their possible solutions
|
||||||
|
|
||||||
https://github.com/tendermint/tendermint/issues/2826
|
[#2826]
|
||||||
|
|
||||||
One question I am still pondering about: how to prevent pubsub from slowing
|
One question I am still pondering about: how to prevent pubsub from slowing
|
||||||
down consensus. We can increase the pubsub queue size (which is 0 now). Also,
|
down consensus. We can increase the pubsub queue size (which is 0 now). Also,
|
||||||
@ -198,3 +244,4 @@ In review
|
|||||||
[#951]: https://github.com/tendermint/tendermint/issues/951
|
[#951]: https://github.com/tendermint/tendermint/issues/951
|
||||||
[#1879]: https://github.com/tendermint/tendermint/issues/1879
|
[#1879]: https://github.com/tendermint/tendermint/issues/1879
|
||||||
[#1880]: https://github.com/tendermint/tendermint/issues/1880
|
[#1880]: https://github.com/tendermint/tendermint/issues/1880
|
||||||
|
[#2826]: https://github.com/tendermint/tendermint/issues/2826
|
||||||
|
Loading…
x
Reference in New Issue
Block a user