mirror of
https://github.com/fluencelabs/tendermint
synced 2025-07-31 04:01:55 +00:00
Compare commits
28 Commits
release/v0
...
anton/new-
Author | SHA1 | Date | |
---|---|---|---|
|
a52b6e9fad | ||
|
6a76146318 | ||
|
142469cd1d | ||
|
5bc282beff | ||
|
6faf95e90e | ||
|
5b7034a329 | ||
|
398626573b | ||
|
0d9107a098 | ||
|
545b33ed29 | ||
|
f1d63ede26 | ||
|
b30cb5fde9 | ||
|
95dc174b97 | ||
|
2e029e8fd4 | ||
|
cf67a8b1b7 | ||
|
ebe8625478 | ||
|
165bb2abfe | ||
|
c6e3059e32 | ||
|
6464bcba7d | ||
|
d185bdb944 | ||
|
17f1cb0a8d | ||
|
4257407aea | ||
|
1be7e341b1 | ||
|
61155f66a7 | ||
|
54cc5100f8 | ||
|
641182e5d3 | ||
|
67be801052 | ||
|
c6e7015245 | ||
|
7268ec8d10 |
@@ -7,6 +7,7 @@ Special thanks to external contributors on this release:
|
||||
### BREAKING CHANGES:
|
||||
|
||||
* CLI/RPC/Config
|
||||
- [httpclient] Update Subscribe interface to reflect new pubsub/eventBus API [ADR-33](https://github.com/tendermint/tendermint/blob/develop/docs/architecture/adr-033-pubsub.md)
|
||||
|
||||
* Apps
|
||||
|
||||
@@ -21,3 +22,4 @@ Special thanks to external contributors on this release:
|
||||
### IMPROVEMENTS:
|
||||
|
||||
### BUG FIXES:
|
||||
- [libs/pubsub] \#951, \#1880 use non-blocking send when dispatching messages [ADR-33](https://github.com/tendermint/tendermint/blob/develop/docs/architecture/adr-033-pubsub.md)
|
@@ -46,7 +46,7 @@ func TestByzantine(t *testing.T) {
|
||||
switches[i].SetLogger(p2pLogger.With("validator", i))
|
||||
}
|
||||
|
||||
eventChans := make([]chan interface{}, N)
|
||||
blocksSubs := make([]types.Subscription, N)
|
||||
reactors := make([]p2p.Reactor, N)
|
||||
for i := 0; i < N; i++ {
|
||||
// make first val byzantine
|
||||
@@ -65,8 +65,8 @@ func TestByzantine(t *testing.T) {
|
||||
eventBus := css[i].eventBus
|
||||
eventBus.SetLogger(logger.With("module", "events", "validator", i))
|
||||
|
||||
eventChans[i] = make(chan interface{}, 1)
|
||||
err := eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock, eventChans[i])
|
||||
var err error
|
||||
blocksSubs[i], err = eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock)
|
||||
require.NoError(t, err)
|
||||
|
||||
conR := NewConsensusReactor(css[i], true) // so we dont start the consensus states
|
||||
@@ -131,7 +131,7 @@ func TestByzantine(t *testing.T) {
|
||||
p2p.Connect2Switches(switches, ind1, ind2)
|
||||
|
||||
// wait for someone in the big partition (B) to make a block
|
||||
<-eventChans[ind2]
|
||||
<-blocksSubs[ind2].Out()
|
||||
|
||||
t.Log("A block has been committed. Healing partition")
|
||||
p2p.Connect2Switches(switches, ind0, ind1)
|
||||
@@ -143,7 +143,7 @@ func TestByzantine(t *testing.T) {
|
||||
wg.Add(2)
|
||||
for i := 1; i < N-1; i++ {
|
||||
go func(j int) {
|
||||
<-eventChans[j]
|
||||
<-blocksSubs[j].Out()
|
||||
wg.Done()
|
||||
}(i)
|
||||
}
|
||||
|
@@ -7,7 +7,6 @@ import (
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"sort"
|
||||
"sync"
|
||||
"testing"
|
||||
@@ -25,6 +24,7 @@ import (
|
||||
cmn "github.com/tendermint/tendermint/libs/common"
|
||||
dbm "github.com/tendermint/tendermint/libs/db"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
|
||||
mempl "github.com/tendermint/tendermint/mempool"
|
||||
"github.com/tendermint/tendermint/p2p"
|
||||
"github.com/tendermint/tendermint/privval"
|
||||
@@ -229,24 +229,22 @@ func validatePrevoteAndPrecommit(t *testing.T, cs *ConsensusState, thisRound, lo
|
||||
cs.mtx.Unlock()
|
||||
}
|
||||
|
||||
// genesis
|
||||
func subscribeToVoter(cs *ConsensusState, addr []byte) chan interface{} {
|
||||
voteCh0 := make(chan interface{})
|
||||
err := cs.eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryVote, voteCh0)
|
||||
func subscribeToVoter(cs *ConsensusState, addr []byte) <-chan tmpubsub.Message {
|
||||
votesSub, err := cs.eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryVote)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("failed to subscribe %s to %v", testSubscriber, types.EventQueryVote))
|
||||
}
|
||||
voteCh := make(chan interface{})
|
||||
ch := make(chan tmpubsub.Message)
|
||||
go func() {
|
||||
for v := range voteCh0 {
|
||||
vote := v.(types.EventDataVote)
|
||||
for msg := range votesSub.Out() {
|
||||
vote := msg.Data().(types.EventDataVote)
|
||||
// we only fire for our own votes
|
||||
if bytes.Equal(addr, vote.Vote.ValidatorAddress) {
|
||||
voteCh <- v
|
||||
ch <- msg
|
||||
}
|
||||
}
|
||||
}()
|
||||
return voteCh
|
||||
return ch
|
||||
}
|
||||
|
||||
//-------------------------------------------------------------------------------
|
||||
@@ -323,7 +321,7 @@ func randConsensusState(nValidators int) (*ConsensusState, []*validatorStub) {
|
||||
|
||||
//-------------------------------------------------------------------------------
|
||||
|
||||
func ensureNoNewEvent(ch <-chan interface{}, timeout time.Duration,
|
||||
func ensureNoNewEvent(ch <-chan tmpubsub.Message, timeout time.Duration,
|
||||
errorMessage string) {
|
||||
select {
|
||||
case <-time.After(timeout):
|
||||
@@ -333,28 +331,28 @@ func ensureNoNewEvent(ch <-chan interface{}, timeout time.Duration,
|
||||
}
|
||||
}
|
||||
|
||||
func ensureNoNewEventOnChannel(ch <-chan interface{}) {
|
||||
func ensureNoNewEventOnChannel(ch <-chan tmpubsub.Message) {
|
||||
ensureNoNewEvent(
|
||||
ch,
|
||||
ensureTimeout,
|
||||
"We should be stuck waiting, not receiving new event on the channel")
|
||||
}
|
||||
|
||||
func ensureNoNewRoundStep(stepCh <-chan interface{}) {
|
||||
func ensureNoNewRoundStep(stepCh <-chan tmpubsub.Message) {
|
||||
ensureNoNewEvent(
|
||||
stepCh,
|
||||
ensureTimeout,
|
||||
"We should be stuck waiting, not receiving NewRoundStep event")
|
||||
}
|
||||
|
||||
func ensureNoNewUnlock(unlockCh <-chan interface{}) {
|
||||
func ensureNoNewUnlock(unlockCh <-chan tmpubsub.Message) {
|
||||
ensureNoNewEvent(
|
||||
unlockCh,
|
||||
ensureTimeout,
|
||||
"We should be stuck waiting, not receiving Unlock event")
|
||||
}
|
||||
|
||||
func ensureNoNewTimeout(stepCh <-chan interface{}, timeout int64) {
|
||||
func ensureNoNewTimeout(stepCh <-chan tmpubsub.Message, timeout int64) {
|
||||
timeoutDuration := time.Duration(timeout*5) * time.Nanosecond
|
||||
ensureNoNewEvent(
|
||||
stepCh,
|
||||
@@ -362,142 +360,157 @@ func ensureNoNewTimeout(stepCh <-chan interface{}, timeout int64) {
|
||||
"We should be stuck waiting, not receiving NewTimeout event")
|
||||
}
|
||||
|
||||
func ensureNewEvent(
|
||||
ch <-chan interface{},
|
||||
height int64,
|
||||
round int,
|
||||
timeout time.Duration,
|
||||
errorMessage string) {
|
||||
|
||||
func ensureNewEvent(ch <-chan tmpubsub.Message, height int64, round int, timeout time.Duration, errorMessage string) {
|
||||
select {
|
||||
case <-time.After(timeout):
|
||||
panic(errorMessage)
|
||||
case ev := <-ch:
|
||||
rs, ok := ev.(types.EventDataRoundState)
|
||||
case msg := <-ch:
|
||||
roundStateEvent, ok := msg.Data().(types.EventDataRoundState)
|
||||
if !ok {
|
||||
panic(
|
||||
fmt.Sprintf(
|
||||
"expected a EventDataRoundState, got %v.Wrong subscription channel?",
|
||||
reflect.TypeOf(rs)))
|
||||
panic(fmt.Sprintf("expected a EventDataRoundState, got %T. Wrong subscription channel?",
|
||||
msg.Data()))
|
||||
}
|
||||
if rs.Height != height {
|
||||
panic(fmt.Sprintf("expected height %v, got %v", height, rs.Height))
|
||||
if roundStateEvent.Height != height {
|
||||
panic(fmt.Sprintf("expected height %v, got %v", height, roundStateEvent.Height))
|
||||
}
|
||||
if rs.Round != round {
|
||||
panic(fmt.Sprintf("expected round %v, got %v", round, rs.Round))
|
||||
if roundStateEvent.Round != round {
|
||||
panic(fmt.Sprintf("expected round %v, got %v", round, roundStateEvent.Round))
|
||||
}
|
||||
// TODO: We could check also for a step at this point!
|
||||
}
|
||||
}
|
||||
|
||||
func ensureNewRound(roundCh <-chan interface{}, height int64, round int) {
|
||||
func ensureNewRound(roundCh <-chan tmpubsub.Message, height int64, round int) {
|
||||
select {
|
||||
case <-time.After(ensureTimeout):
|
||||
panic("Timeout expired while waiting for NewRound event")
|
||||
case ev := <-roundCh:
|
||||
rs, ok := ev.(types.EventDataNewRound)
|
||||
case msg := <-roundCh:
|
||||
newRoundEvent, ok := msg.Data().(types.EventDataNewRound)
|
||||
if !ok {
|
||||
panic(
|
||||
fmt.Sprintf(
|
||||
"expected a EventDataNewRound, got %v.Wrong subscription channel?",
|
||||
reflect.TypeOf(rs)))
|
||||
panic(fmt.Sprintf("expected a EventDataNewRound, got %T. Wrong subscription channel?",
|
||||
msg.Data()))
|
||||
}
|
||||
if rs.Height != height {
|
||||
panic(fmt.Sprintf("expected height %v, got %v", height, rs.Height))
|
||||
if newRoundEvent.Height != height {
|
||||
panic(fmt.Sprintf("expected height %v, got %v", height, newRoundEvent.Height))
|
||||
}
|
||||
if rs.Round != round {
|
||||
panic(fmt.Sprintf("expected round %v, got %v", round, rs.Round))
|
||||
if newRoundEvent.Round != round {
|
||||
panic(fmt.Sprintf("expected round %v, got %v", round, newRoundEvent.Round))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func ensureNewTimeout(timeoutCh <-chan interface{}, height int64, round int, timeout int64) {
|
||||
func ensureNewTimeout(timeoutCh <-chan tmpubsub.Message, height int64, round int, timeout int64) {
|
||||
timeoutDuration := time.Duration(timeout*5) * time.Nanosecond
|
||||
ensureNewEvent(timeoutCh, height, round, timeoutDuration,
|
||||
"Timeout expired while waiting for NewTimeout event")
|
||||
}
|
||||
|
||||
func ensureNewProposal(proposalCh <-chan interface{}, height int64, round int) {
|
||||
func ensureNewProposal(proposalCh <-chan tmpubsub.Message, height int64, round int) {
|
||||
select {
|
||||
case <-time.After(ensureTimeout):
|
||||
panic("Timeout expired while waiting for NewProposal event")
|
||||
case ev := <-proposalCh:
|
||||
rs, ok := ev.(types.EventDataCompleteProposal)
|
||||
case msg := <-proposalCh:
|
||||
proposalEvent, ok := msg.Data().(types.EventDataCompleteProposal)
|
||||
if !ok {
|
||||
panic(
|
||||
fmt.Sprintf(
|
||||
"expected a EventDataCompleteProposal, got %v.Wrong subscription channel?",
|
||||
reflect.TypeOf(rs)))
|
||||
panic(fmt.Sprintf("expected a EventDataCompleteProposal, got %T. Wrong subscription channel?",
|
||||
msg.Data()))
|
||||
}
|
||||
if rs.Height != height {
|
||||
panic(fmt.Sprintf("expected height %v, got %v", height, rs.Height))
|
||||
if proposalEvent.Height != height {
|
||||
panic(fmt.Sprintf("expected height %v, got %v", height, proposalEvent.Height))
|
||||
}
|
||||
if rs.Round != round {
|
||||
panic(fmt.Sprintf("expected round %v, got %v", round, rs.Round))
|
||||
if proposalEvent.Round != round {
|
||||
panic(fmt.Sprintf("expected round %v, got %v", round, proposalEvent.Round))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func ensureNewValidBlock(validBlockCh <-chan interface{}, height int64, round int) {
|
||||
func ensureNewValidBlock(validBlockCh <-chan tmpubsub.Message, height int64, round int) {
|
||||
ensureNewEvent(validBlockCh, height, round, ensureTimeout,
|
||||
"Timeout expired while waiting for NewValidBlock event")
|
||||
}
|
||||
|
||||
func ensureNewBlock(blockCh <-chan interface{}, height int64) {
|
||||
func ensureNewBlock(blockCh <-chan tmpubsub.Message, height int64) {
|
||||
select {
|
||||
case <-time.After(ensureTimeout):
|
||||
panic("Timeout expired while waiting for NewBlock event")
|
||||
case ev := <-blockCh:
|
||||
block, ok := ev.(types.EventDataNewBlock)
|
||||
case msg := <-blockCh:
|
||||
blockEvent, ok := msg.Data().(types.EventDataNewBlock)
|
||||
if !ok {
|
||||
panic(fmt.Sprintf("expected a *types.EventDataNewBlock, "+
|
||||
"got %v. wrong subscription channel?",
|
||||
reflect.TypeOf(block)))
|
||||
panic(fmt.Sprintf("expected a EventDataNewBlock, got %T. Wrong subscription channel?",
|
||||
msg.Data()))
|
||||
}
|
||||
if block.Block.Height != height {
|
||||
panic(fmt.Sprintf("expected height %v, got %v", height, block.Block.Height))
|
||||
if blockEvent.Block.Height != height {
|
||||
panic(fmt.Sprintf("expected height %v, got %v", height, blockEvent.Block.Height))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func ensureNewBlockHeader(blockCh <-chan interface{}, height int64, blockHash cmn.HexBytes) {
|
||||
func ensureNewBlockHeader(blockCh <-chan tmpubsub.Message, height int64, blockHash cmn.HexBytes) {
|
||||
select {
|
||||
case <-time.After(ensureTimeout):
|
||||
panic("Timeout expired while waiting for NewBlockHeader event")
|
||||
case ev := <-blockCh:
|
||||
blockHeader, ok := ev.(types.EventDataNewBlockHeader)
|
||||
case msg := <-blockCh:
|
||||
blockHeaderEvent, ok := msg.Data().(types.EventDataNewBlockHeader)
|
||||
if !ok {
|
||||
panic(fmt.Sprintf("expected a *types.EventDataNewBlockHeader, "+
|
||||
"got %v. wrong subscription channel?",
|
||||
reflect.TypeOf(blockHeader)))
|
||||
panic(fmt.Sprintf("expected a EventDataNewBlockHeader, got %T. Wrong subscription channel?",
|
||||
msg.Data()))
|
||||
}
|
||||
if blockHeader.Header.Height != height {
|
||||
panic(fmt.Sprintf("expected height %v, got %v", height, blockHeader.Header.Height))
|
||||
if blockHeaderEvent.Header.Height != height {
|
||||
panic(fmt.Sprintf("expected height %v, got %v", height, blockHeaderEvent.Header.Height))
|
||||
}
|
||||
if !bytes.Equal(blockHeader.Header.Hash(), blockHash) {
|
||||
panic(fmt.Sprintf("expected header %X, got %X", blockHash, blockHeader.Header.Hash()))
|
||||
if !bytes.Equal(blockHeaderEvent.Header.Hash(), blockHash) {
|
||||
panic(fmt.Sprintf("expected header %X, got %X", blockHash, blockHeaderEvent.Header.Hash()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func ensureNewUnlock(unlockCh <-chan interface{}, height int64, round int) {
|
||||
func ensureNewUnlock(unlockCh <-chan tmpubsub.Message, height int64, round int) {
|
||||
ensureNewEvent(unlockCh, height, round, ensureTimeout,
|
||||
"Timeout expired while waiting for NewUnlock event")
|
||||
}
|
||||
|
||||
func ensureVote(voteCh <-chan interface{}, height int64, round int,
|
||||
func ensureProposal(proposalCh <-chan tmpubsub.Message, height int64, round int, propID types.BlockID) {
|
||||
select {
|
||||
case <-time.After(ensureTimeout):
|
||||
panic("Timeout expired while waiting for NewProposal event")
|
||||
case msg := <-proposalCh:
|
||||
proposalEvent, ok := msg.Data().(types.EventDataCompleteProposal)
|
||||
if !ok {
|
||||
panic(fmt.Sprintf("expected a EventDataCompleteProposal, got %T. Wrong subscription channel?",
|
||||
msg.Data()))
|
||||
}
|
||||
if proposalEvent.Height != height {
|
||||
panic(fmt.Sprintf("expected height %v, got %v", height, proposalEvent.Height))
|
||||
}
|
||||
if proposalEvent.Round != round {
|
||||
panic(fmt.Sprintf("expected round %v, got %v", round, proposalEvent.Round))
|
||||
}
|
||||
if !proposalEvent.BlockID.Equals(propID) {
|
||||
panic("Proposed block does not match expected block")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func ensurePrecommit(voteCh <-chan tmpubsub.Message, height int64, round int) {
|
||||
ensureVote(voteCh, height, round, types.PrecommitType)
|
||||
}
|
||||
|
||||
func ensurePrevote(voteCh <-chan tmpubsub.Message, height int64, round int) {
|
||||
ensureVote(voteCh, height, round, types.PrevoteType)
|
||||
}
|
||||
|
||||
func ensureVote(voteCh <-chan tmpubsub.Message, height int64, round int,
|
||||
voteType types.SignedMsgType) {
|
||||
select {
|
||||
case <-time.After(ensureTimeout):
|
||||
panic("Timeout expired while waiting for NewVote event")
|
||||
case v := <-voteCh:
|
||||
edv, ok := v.(types.EventDataVote)
|
||||
case msg := <-voteCh:
|
||||
voteEvent, ok := msg.Data().(types.EventDataVote)
|
||||
if !ok {
|
||||
panic(fmt.Sprintf("expected a *types.Vote, "+
|
||||
"got %v. wrong subscription channel?",
|
||||
reflect.TypeOf(v)))
|
||||
panic(fmt.Sprintf("expected a EventDataVote, got %T. Wrong subscription channel?",
|
||||
msg.Data()))
|
||||
}
|
||||
vote := edv.Vote
|
||||
vote := voteEvent.Vote
|
||||
if vote.Height != height {
|
||||
panic(fmt.Sprintf("expected height %v, got %v", height, vote.Height))
|
||||
}
|
||||
@@ -510,39 +523,7 @@ func ensureVote(voteCh <-chan interface{}, height int64, round int,
|
||||
}
|
||||
}
|
||||
|
||||
func ensureProposal(proposalCh <-chan interface{}, height int64, round int, propId types.BlockID) {
|
||||
select {
|
||||
case <-time.After(ensureTimeout):
|
||||
panic("Timeout expired while waiting for NewProposal event")
|
||||
case ev := <-proposalCh:
|
||||
rs, ok := ev.(types.EventDataCompleteProposal)
|
||||
if !ok {
|
||||
panic(
|
||||
fmt.Sprintf(
|
||||
"expected a EventDataCompleteProposal, got %v.Wrong subscription channel?",
|
||||
reflect.TypeOf(rs)))
|
||||
}
|
||||
if rs.Height != height {
|
||||
panic(fmt.Sprintf("expected height %v, got %v", height, rs.Height))
|
||||
}
|
||||
if rs.Round != round {
|
||||
panic(fmt.Sprintf("expected round %v, got %v", round, rs.Round))
|
||||
}
|
||||
if !rs.BlockID.Equals(propId) {
|
||||
panic("Proposed block does not match expected block")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func ensurePrecommit(voteCh <-chan interface{}, height int64, round int) {
|
||||
ensureVote(voteCh, height, round, types.PrecommitType)
|
||||
}
|
||||
|
||||
func ensurePrevote(voteCh <-chan interface{}, height int64, round int) {
|
||||
ensureVote(voteCh, height, round, types.PrevoteType)
|
||||
}
|
||||
|
||||
func ensureNewEventOnChannel(ch <-chan interface{}) {
|
||||
func ensureNewEventOnChannel(ch <-chan tmpubsub.Message) {
|
||||
select {
|
||||
case <-time.After(ensureTimeout):
|
||||
panic("Timeout expired while waiting for new activity on the channel")
|
||||
|
@@ -117,9 +117,9 @@ func TestMempoolTxConcurrentWithCommit(t *testing.T) {
|
||||
for nTxs := 0; nTxs < NTxs; {
|
||||
ticker := time.NewTicker(time.Second * 30)
|
||||
select {
|
||||
case b := <-newBlockCh:
|
||||
evt := b.(types.EventDataNewBlock)
|
||||
nTxs += int(evt.Block.Header.NumTxs)
|
||||
case msg := <-newBlockCh:
|
||||
blockEvent := msg.Data().(types.EventDataNewBlock)
|
||||
nTxs += int(blockEvent.Block.Header.NumTxs)
|
||||
case <-ticker.C:
|
||||
panic("Timed out waiting to commit blocks with transactions")
|
||||
}
|
||||
|
@@ -30,9 +30,13 @@ import (
|
||||
//----------------------------------------------
|
||||
// in-process testnets
|
||||
|
||||
func startConsensusNet(t *testing.T, css []*ConsensusState, N int) ([]*ConsensusReactor, []chan interface{}, []*types.EventBus) {
|
||||
func startConsensusNet(t *testing.T, css []*ConsensusState, N int) (
|
||||
[]*ConsensusReactor,
|
||||
[]types.Subscription,
|
||||
[]*types.EventBus,
|
||||
) {
|
||||
reactors := make([]*ConsensusReactor, N)
|
||||
eventChans := make([]chan interface{}, N)
|
||||
blocksSubs := make([]types.Subscription, 0)
|
||||
eventBuses := make([]*types.EventBus, N)
|
||||
for i := 0; i < N; i++ {
|
||||
/*logger, err := tmflags.ParseLogLevel("consensus:info,*:error", logger, "info")
|
||||
@@ -44,9 +48,9 @@ func startConsensusNet(t *testing.T, css []*ConsensusState, N int) ([]*Consensus
|
||||
eventBuses[i] = css[i].eventBus
|
||||
reactors[i].SetEventBus(eventBuses[i])
|
||||
|
||||
eventChans[i] = make(chan interface{}, 1)
|
||||
err := eventBuses[i].Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock, eventChans[i])
|
||||
blocksSub, err := eventBuses[i].Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock)
|
||||
require.NoError(t, err)
|
||||
blocksSubs = append(blocksSubs, blocksSub)
|
||||
}
|
||||
// make connected switches and start all reactors
|
||||
p2p.MakeConnectedSwitches(config.P2P, N, func(i int, s *p2p.Switch) *p2p.Switch {
|
||||
@@ -63,7 +67,7 @@ func startConsensusNet(t *testing.T, css []*ConsensusState, N int) ([]*Consensus
|
||||
s := reactors[i].conS.GetState()
|
||||
reactors[i].SwitchToConsensus(s, 0)
|
||||
}
|
||||
return reactors, eventChans, eventBuses
|
||||
return reactors, blocksSubs, eventBuses
|
||||
}
|
||||
|
||||
func stopConsensusNet(logger log.Logger, reactors []*ConsensusReactor, eventBuses []*types.EventBus) {
|
||||
@@ -84,11 +88,11 @@ func TestReactorBasic(t *testing.T) {
|
||||
N := 4
|
||||
css, cleanup := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newCounter)
|
||||
defer cleanup()
|
||||
reactors, eventChans, eventBuses := startConsensusNet(t, css, N)
|
||||
reactors, blocksSubs, eventBuses := startConsensusNet(t, css, N)
|
||||
defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses)
|
||||
// wait till everyone makes the first new block
|
||||
timeoutWaitGroup(t, N, func(j int) {
|
||||
<-eventChans[j]
|
||||
<-blocksSubs[j].Out()
|
||||
}, css)
|
||||
}
|
||||
|
||||
@@ -161,20 +165,20 @@ func TestReactorWithEvidence(t *testing.T) {
|
||||
css[i] = cs
|
||||
}
|
||||
|
||||
reactors, eventChans, eventBuses := startConsensusNet(t, css, nValidators)
|
||||
reactors, blocksSubs, eventBuses := startConsensusNet(t, css, nValidators)
|
||||
defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses)
|
||||
|
||||
// wait till everyone makes the first new block with no evidence
|
||||
timeoutWaitGroup(t, nValidators, func(j int) {
|
||||
blockI := <-eventChans[j]
|
||||
block := blockI.(types.EventDataNewBlock).Block
|
||||
msg := <-blocksSubs[j].Out()
|
||||
block := msg.Data().(types.EventDataNewBlock).Block
|
||||
assert.True(t, len(block.Evidence.Evidence) == 0)
|
||||
}, css)
|
||||
|
||||
// second block should have evidence
|
||||
timeoutWaitGroup(t, nValidators, func(j int) {
|
||||
blockI := <-eventChans[j]
|
||||
block := blockI.(types.EventDataNewBlock).Block
|
||||
msg := <-blocksSubs[j].Out()
|
||||
block := msg.Data().(types.EventDataNewBlock).Block
|
||||
assert.True(t, len(block.Evidence.Evidence) > 0)
|
||||
}, css)
|
||||
}
|
||||
@@ -221,7 +225,7 @@ func TestReactorCreatesBlockWhenEmptyBlocksFalse(t *testing.T) {
|
||||
c.Consensus.CreateEmptyBlocks = false
|
||||
})
|
||||
defer cleanup()
|
||||
reactors, eventChans, eventBuses := startConsensusNet(t, css, N)
|
||||
reactors, blocksSubs, eventBuses := startConsensusNet(t, css, N)
|
||||
defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses)
|
||||
|
||||
// send a tx
|
||||
@@ -231,7 +235,7 @@ func TestReactorCreatesBlockWhenEmptyBlocksFalse(t *testing.T) {
|
||||
|
||||
// wait till everyone makes the first new block
|
||||
timeoutWaitGroup(t, N, func(j int) {
|
||||
<-eventChans[j]
|
||||
<-blocksSubs[j].Out()
|
||||
}, css)
|
||||
}
|
||||
|
||||
@@ -240,12 +244,12 @@ func TestReactorRecordsVotesAndBlockParts(t *testing.T) {
|
||||
N := 4
|
||||
css, cleanup := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newCounter)
|
||||
defer cleanup()
|
||||
reactors, eventChans, eventBuses := startConsensusNet(t, css, N)
|
||||
reactors, blocksSubs, eventBuses := startConsensusNet(t, css, N)
|
||||
defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses)
|
||||
|
||||
// wait till everyone makes the first new block
|
||||
timeoutWaitGroup(t, N, func(j int) {
|
||||
<-eventChans[j]
|
||||
<-blocksSubs[j].Out()
|
||||
}, css)
|
||||
|
||||
// Get peer
|
||||
@@ -265,7 +269,7 @@ func TestReactorVotingPowerChange(t *testing.T) {
|
||||
logger := log.TestingLogger()
|
||||
css, cleanup := randConsensusNet(nVals, "consensus_voting_power_changes_test", newMockTickerFunc(true), newPersistentKVStore)
|
||||
defer cleanup()
|
||||
reactors, eventChans, eventBuses := startConsensusNet(t, css, nVals)
|
||||
reactors, blocksSubs, eventBuses := startConsensusNet(t, css, nVals)
|
||||
defer stopConsensusNet(logger, reactors, eventBuses)
|
||||
|
||||
// map of active validators
|
||||
@@ -277,7 +281,7 @@ func TestReactorVotingPowerChange(t *testing.T) {
|
||||
|
||||
// wait till everyone makes block 1
|
||||
timeoutWaitGroup(t, nVals, func(j int) {
|
||||
<-eventChans[j]
|
||||
<-blocksSubs[j].Out()
|
||||
}, css)
|
||||
|
||||
//---------------------------------------------------------------------------
|
||||
@@ -288,10 +292,10 @@ func TestReactorVotingPowerChange(t *testing.T) {
|
||||
updateValidatorTx := kvstore.MakeValSetChangeTx(val1PubKeyABCI, 25)
|
||||
previousTotalVotingPower := css[0].GetRoundState().LastValidators.TotalVotingPower()
|
||||
|
||||
waitForAndValidateBlock(t, nVals, activeVals, eventChans, css, updateValidatorTx)
|
||||
waitForAndValidateBlockWithTx(t, nVals, activeVals, eventChans, css, updateValidatorTx)
|
||||
waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
|
||||
waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
|
||||
waitForAndValidateBlock(t, nVals, activeVals, blocksSubs, css, updateValidatorTx)
|
||||
waitForAndValidateBlockWithTx(t, nVals, activeVals, blocksSubs, css, updateValidatorTx)
|
||||
waitForAndValidateBlock(t, nVals, activeVals, blocksSubs, css)
|
||||
waitForAndValidateBlock(t, nVals, activeVals, blocksSubs, css)
|
||||
|
||||
if css[0].GetRoundState().LastValidators.TotalVotingPower() == previousTotalVotingPower {
|
||||
t.Fatalf("expected voting power to change (before: %d, after: %d)", previousTotalVotingPower, css[0].GetRoundState().LastValidators.TotalVotingPower())
|
||||
@@ -300,10 +304,10 @@ func TestReactorVotingPowerChange(t *testing.T) {
|
||||
updateValidatorTx = kvstore.MakeValSetChangeTx(val1PubKeyABCI, 2)
|
||||
previousTotalVotingPower = css[0].GetRoundState().LastValidators.TotalVotingPower()
|
||||
|
||||
waitForAndValidateBlock(t, nVals, activeVals, eventChans, css, updateValidatorTx)
|
||||
waitForAndValidateBlockWithTx(t, nVals, activeVals, eventChans, css, updateValidatorTx)
|
||||
waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
|
||||
waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
|
||||
waitForAndValidateBlock(t, nVals, activeVals, blocksSubs, css, updateValidatorTx)
|
||||
waitForAndValidateBlockWithTx(t, nVals, activeVals, blocksSubs, css, updateValidatorTx)
|
||||
waitForAndValidateBlock(t, nVals, activeVals, blocksSubs, css)
|
||||
waitForAndValidateBlock(t, nVals, activeVals, blocksSubs, css)
|
||||
|
||||
if css[0].GetRoundState().LastValidators.TotalVotingPower() == previousTotalVotingPower {
|
||||
t.Fatalf("expected voting power to change (before: %d, after: %d)", previousTotalVotingPower, css[0].GetRoundState().LastValidators.TotalVotingPower())
|
||||
@@ -312,10 +316,10 @@ func TestReactorVotingPowerChange(t *testing.T) {
|
||||
updateValidatorTx = kvstore.MakeValSetChangeTx(val1PubKeyABCI, 26)
|
||||
previousTotalVotingPower = css[0].GetRoundState().LastValidators.TotalVotingPower()
|
||||
|
||||
waitForAndValidateBlock(t, nVals, activeVals, eventChans, css, updateValidatorTx)
|
||||
waitForAndValidateBlockWithTx(t, nVals, activeVals, eventChans, css, updateValidatorTx)
|
||||
waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
|
||||
waitForAndValidateBlock(t, nVals, activeVals, eventChans, css)
|
||||
waitForAndValidateBlock(t, nVals, activeVals, blocksSubs, css, updateValidatorTx)
|
||||
waitForAndValidateBlockWithTx(t, nVals, activeVals, blocksSubs, css, updateValidatorTx)
|
||||
waitForAndValidateBlock(t, nVals, activeVals, blocksSubs, css)
|
||||
waitForAndValidateBlock(t, nVals, activeVals, blocksSubs, css)
|
||||
|
||||
if css[0].GetRoundState().LastValidators.TotalVotingPower() == previousTotalVotingPower {
|
||||
t.Fatalf("expected voting power to change (before: %d, after: %d)", previousTotalVotingPower, css[0].GetRoundState().LastValidators.TotalVotingPower())
|
||||
@@ -329,7 +333,7 @@ func TestReactorValidatorSetChanges(t *testing.T) {
|
||||
defer cleanup()
|
||||
logger := log.TestingLogger()
|
||||
|
||||
reactors, eventChans, eventBuses := startConsensusNet(t, css, nPeers)
|
||||
reactors, blocksSubs, eventBuses := startConsensusNet(t, css, nPeers)
|
||||
defer stopConsensusNet(logger, reactors, eventBuses)
|
||||
|
||||
// map of active validators
|
||||
@@ -341,7 +345,7 @@ func TestReactorValidatorSetChanges(t *testing.T) {
|
||||
|
||||
// wait till everyone makes block 1
|
||||
timeoutWaitGroup(t, nPeers, func(j int) {
|
||||
<-eventChans[j]
|
||||
<-blocksSubs[j].Out()
|
||||
}, css)
|
||||
|
||||
//---------------------------------------------------------------------------
|
||||
@@ -354,22 +358,22 @@ func TestReactorValidatorSetChanges(t *testing.T) {
|
||||
// wait till everyone makes block 2
|
||||
// ensure the commit includes all validators
|
||||
// send newValTx to change vals in block 3
|
||||
waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css, newValidatorTx1)
|
||||
waitForAndValidateBlock(t, nPeers, activeVals, blocksSubs, css, newValidatorTx1)
|
||||
|
||||
// wait till everyone makes block 3.
|
||||
// it includes the commit for block 2, which is by the original validator set
|
||||
waitForAndValidateBlockWithTx(t, nPeers, activeVals, eventChans, css, newValidatorTx1)
|
||||
waitForAndValidateBlockWithTx(t, nPeers, activeVals, blocksSubs, css, newValidatorTx1)
|
||||
|
||||
// wait till everyone makes block 4.
|
||||
// it includes the commit for block 3, which is by the original validator set
|
||||
waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
|
||||
waitForAndValidateBlock(t, nPeers, activeVals, blocksSubs, css)
|
||||
|
||||
// the commits for block 4 should be with the updated validator set
|
||||
activeVals[string(newValidatorPubKey1.Address())] = struct{}{}
|
||||
|
||||
// wait till everyone makes block 5
|
||||
// it includes the commit for block 4, which should have the updated validator set
|
||||
waitForBlockWithUpdatedValsAndValidateIt(t, nPeers, activeVals, eventChans, css)
|
||||
waitForBlockWithUpdatedValsAndValidateIt(t, nPeers, activeVals, blocksSubs, css)
|
||||
|
||||
//---------------------------------------------------------------------------
|
||||
logger.Info("---------------------------- Testing changing the voting power of one validator")
|
||||
@@ -379,10 +383,10 @@ func TestReactorValidatorSetChanges(t *testing.T) {
|
||||
updateValidatorTx1 := kvstore.MakeValSetChangeTx(updatePubKey1ABCI, 25)
|
||||
previousTotalVotingPower := css[nVals].GetRoundState().LastValidators.TotalVotingPower()
|
||||
|
||||
waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css, updateValidatorTx1)
|
||||
waitForAndValidateBlockWithTx(t, nPeers, activeVals, eventChans, css, updateValidatorTx1)
|
||||
waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
|
||||
waitForBlockWithUpdatedValsAndValidateIt(t, nPeers, activeVals, eventChans, css)
|
||||
waitForAndValidateBlock(t, nPeers, activeVals, blocksSubs, css, updateValidatorTx1)
|
||||
waitForAndValidateBlockWithTx(t, nPeers, activeVals, blocksSubs, css, updateValidatorTx1)
|
||||
waitForAndValidateBlock(t, nPeers, activeVals, blocksSubs, css)
|
||||
waitForBlockWithUpdatedValsAndValidateIt(t, nPeers, activeVals, blocksSubs, css)
|
||||
|
||||
if css[nVals].GetRoundState().LastValidators.TotalVotingPower() == previousTotalVotingPower {
|
||||
t.Errorf("expected voting power to change (before: %d, after: %d)", previousTotalVotingPower, css[nVals].GetRoundState().LastValidators.TotalVotingPower())
|
||||
@@ -399,12 +403,12 @@ func TestReactorValidatorSetChanges(t *testing.T) {
|
||||
newVal3ABCI := types.TM2PB.PubKey(newValidatorPubKey3)
|
||||
newValidatorTx3 := kvstore.MakeValSetChangeTx(newVal3ABCI, testMinPower)
|
||||
|
||||
waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css, newValidatorTx2, newValidatorTx3)
|
||||
waitForAndValidateBlockWithTx(t, nPeers, activeVals, eventChans, css, newValidatorTx2, newValidatorTx3)
|
||||
waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
|
||||
waitForAndValidateBlock(t, nPeers, activeVals, blocksSubs, css, newValidatorTx2, newValidatorTx3)
|
||||
waitForAndValidateBlockWithTx(t, nPeers, activeVals, blocksSubs, css, newValidatorTx2, newValidatorTx3)
|
||||
waitForAndValidateBlock(t, nPeers, activeVals, blocksSubs, css)
|
||||
activeVals[string(newValidatorPubKey2.Address())] = struct{}{}
|
||||
activeVals[string(newValidatorPubKey3.Address())] = struct{}{}
|
||||
waitForBlockWithUpdatedValsAndValidateIt(t, nPeers, activeVals, eventChans, css)
|
||||
waitForBlockWithUpdatedValsAndValidateIt(t, nPeers, activeVals, blocksSubs, css)
|
||||
|
||||
//---------------------------------------------------------------------------
|
||||
logger.Info("---------------------------- Testing removing two validators at once")
|
||||
@@ -412,12 +416,12 @@ func TestReactorValidatorSetChanges(t *testing.T) {
|
||||
removeValidatorTx2 := kvstore.MakeValSetChangeTx(newVal2ABCI, 0)
|
||||
removeValidatorTx3 := kvstore.MakeValSetChangeTx(newVal3ABCI, 0)
|
||||
|
||||
waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css, removeValidatorTx2, removeValidatorTx3)
|
||||
waitForAndValidateBlockWithTx(t, nPeers, activeVals, eventChans, css, removeValidatorTx2, removeValidatorTx3)
|
||||
waitForAndValidateBlock(t, nPeers, activeVals, eventChans, css)
|
||||
waitForAndValidateBlock(t, nPeers, activeVals, blocksSubs, css, removeValidatorTx2, removeValidatorTx3)
|
||||
waitForAndValidateBlockWithTx(t, nPeers, activeVals, blocksSubs, css, removeValidatorTx2, removeValidatorTx3)
|
||||
waitForAndValidateBlock(t, nPeers, activeVals, blocksSubs, css)
|
||||
delete(activeVals, string(newValidatorPubKey2.Address()))
|
||||
delete(activeVals, string(newValidatorPubKey3.Address()))
|
||||
waitForBlockWithUpdatedValsAndValidateIt(t, nPeers, activeVals, eventChans, css)
|
||||
waitForBlockWithUpdatedValsAndValidateIt(t, nPeers, activeVals, blocksSubs, css)
|
||||
}
|
||||
|
||||
// Check we can make blocks with skip_timeout_commit=false
|
||||
@@ -430,23 +434,27 @@ func TestReactorWithTimeoutCommit(t *testing.T) {
|
||||
css[i].config.SkipTimeoutCommit = false
|
||||
}
|
||||
|
||||
reactors, eventChans, eventBuses := startConsensusNet(t, css, N-1)
|
||||
reactors, blocksSubs, eventBuses := startConsensusNet(t, css, N-1)
|
||||
defer stopConsensusNet(log.TestingLogger(), reactors, eventBuses)
|
||||
|
||||
// wait till everyone makes the first new block
|
||||
timeoutWaitGroup(t, N-1, func(j int) {
|
||||
<-eventChans[j]
|
||||
<-blocksSubs[j].Out()
|
||||
}, css)
|
||||
}
|
||||
|
||||
func waitForAndValidateBlock(t *testing.T, n int, activeVals map[string]struct{}, eventChans []chan interface{}, css []*ConsensusState, txs ...[]byte) {
|
||||
func waitForAndValidateBlock(
|
||||
t *testing.T,
|
||||
n int,
|
||||
activeVals map[string]struct{},
|
||||
blocksSubs []types.Subscription,
|
||||
css []*ConsensusState,
|
||||
txs ...[]byte,
|
||||
) {
|
||||
timeoutWaitGroup(t, n, func(j int) {
|
||||
css[j].Logger.Debug("waitForAndValidateBlock")
|
||||
newBlockI, ok := <-eventChans[j]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
newBlock := newBlockI.(types.EventDataNewBlock).Block
|
||||
msg := <-blocksSubs[j].Out()
|
||||
newBlock := msg.Data().(types.EventDataNewBlock).Block
|
||||
css[j].Logger.Debug("waitForAndValidateBlock: Got block", "height", newBlock.Height)
|
||||
err := validateBlock(newBlock, activeVals)
|
||||
assert.Nil(t, err)
|
||||
@@ -457,17 +465,21 @@ func waitForAndValidateBlock(t *testing.T, n int, activeVals map[string]struct{}
|
||||
}, css)
|
||||
}
|
||||
|
||||
func waitForAndValidateBlockWithTx(t *testing.T, n int, activeVals map[string]struct{}, eventChans []chan interface{}, css []*ConsensusState, txs ...[]byte) {
|
||||
func waitForAndValidateBlockWithTx(
|
||||
t *testing.T,
|
||||
n int,
|
||||
activeVals map[string]struct{},
|
||||
blocksSubs []types.Subscription,
|
||||
css []*ConsensusState,
|
||||
txs ...[]byte,
|
||||
) {
|
||||
timeoutWaitGroup(t, n, func(j int) {
|
||||
ntxs := 0
|
||||
BLOCK_TX_LOOP:
|
||||
for {
|
||||
css[j].Logger.Debug("waitForAndValidateBlockWithTx", "ntxs", ntxs)
|
||||
newBlockI, ok := <-eventChans[j]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
newBlock := newBlockI.(types.EventDataNewBlock).Block
|
||||
msg := <-blocksSubs[j].Out()
|
||||
newBlock := msg.Data().(types.EventDataNewBlock).Block
|
||||
css[j].Logger.Debug("waitForAndValidateBlockWithTx: Got block", "height", newBlock.Height)
|
||||
err := validateBlock(newBlock, activeVals)
|
||||
assert.Nil(t, err)
|
||||
@@ -488,18 +500,21 @@ func waitForAndValidateBlockWithTx(t *testing.T, n int, activeVals map[string]st
|
||||
}, css)
|
||||
}
|
||||
|
||||
func waitForBlockWithUpdatedValsAndValidateIt(t *testing.T, n int, updatedVals map[string]struct{}, eventChans []chan interface{}, css []*ConsensusState) {
|
||||
func waitForBlockWithUpdatedValsAndValidateIt(
|
||||
t *testing.T,
|
||||
n int,
|
||||
updatedVals map[string]struct{},
|
||||
blocksSubs []types.Subscription,
|
||||
css []*ConsensusState,
|
||||
) {
|
||||
timeoutWaitGroup(t, n, func(j int) {
|
||||
|
||||
var newBlock *types.Block
|
||||
LOOP:
|
||||
for {
|
||||
css[j].Logger.Debug("waitForBlockWithUpdatedValsAndValidateIt")
|
||||
newBlockI, ok := <-eventChans[j]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
newBlock = newBlockI.(types.EventDataNewBlock).Block
|
||||
msg := <-blocksSubs[j].Out()
|
||||
newBlock = msg.Data().(types.EventDataNewBlock).Block
|
||||
if newBlock.LastCommit.Size() == len(updatedVals) {
|
||||
css[j].Logger.Debug("waitForBlockWithUpdatedValsAndValidateIt: Got block", "height", newBlock.Height)
|
||||
break LOOP
|
||||
|
@@ -42,7 +42,7 @@ var crc32c = crc32.MakeTable(crc32.Castagnoli)
|
||||
// Unmarshal and apply a single message to the consensus state as if it were
|
||||
// received in receiveRoutine. Lines that start with "#" are ignored.
|
||||
// NOTE: receiveRoutine should not be running.
|
||||
func (cs *ConsensusState) readReplayMessage(msg *TimedWALMessage, newStepCh chan interface{}) error {
|
||||
func (cs *ConsensusState) readReplayMessage(msg *TimedWALMessage, newStepSub types.Subscription) error {
|
||||
// Skip meta messages which exist for demarcating boundaries.
|
||||
if _, ok := msg.Msg.(EndHeightMessage); ok {
|
||||
return nil
|
||||
@@ -54,15 +54,17 @@ func (cs *ConsensusState) readReplayMessage(msg *TimedWALMessage, newStepCh chan
|
||||
cs.Logger.Info("Replay: New Step", "height", m.Height, "round", m.Round, "step", m.Step)
|
||||
// these are playback checks
|
||||
ticker := time.After(time.Second * 2)
|
||||
if newStepCh != nil {
|
||||
if newStepSub != nil {
|
||||
select {
|
||||
case mi := <-newStepCh:
|
||||
m2 := mi.(types.EventDataRoundState)
|
||||
case stepMsg := <-newStepSub.Out():
|
||||
m2 := stepMsg.Data().(types.EventDataRoundState)
|
||||
if m.Height != m2.Height || m.Round != m2.Round || m.Step != m2.Step {
|
||||
return fmt.Errorf("RoundState mismatch. Got %v; Expected %v", m2, m)
|
||||
}
|
||||
case <-newStepSub.Cancelled():
|
||||
return fmt.Errorf("Failed to read off newStepSub.Out(). newStepSub was cancelled")
|
||||
case <-ticker:
|
||||
return fmt.Errorf("Failed to read off newStepCh")
|
||||
return fmt.Errorf("Failed to read off newStepSub.Out()")
|
||||
}
|
||||
}
|
||||
case msgInfo:
|
||||
|
@@ -51,25 +51,13 @@ func (cs *ConsensusState) ReplayFile(file string, console bool) error {
|
||||
cs.startForReplay()
|
||||
|
||||
// ensure all new step events are regenerated as expected
|
||||
newStepCh := make(chan interface{}, 1)
|
||||
|
||||
ctx := context.Background()
|
||||
err := cs.eventBus.Subscribe(ctx, subscriber, types.EventQueryNewRoundStep, newStepCh)
|
||||
newStepSub, err := cs.eventBus.Subscribe(ctx, subscriber, types.EventQueryNewRoundStep)
|
||||
if err != nil {
|
||||
return errors.Errorf("failed to subscribe %s to %v", subscriber, types.EventQueryNewRoundStep)
|
||||
}
|
||||
defer func() {
|
||||
// drain newStepCh to make sure we don't block
|
||||
LOOP:
|
||||
for {
|
||||
select {
|
||||
case <-newStepCh:
|
||||
default:
|
||||
break LOOP
|
||||
}
|
||||
}
|
||||
cs.eventBus.Unsubscribe(ctx, subscriber, types.EventQueryNewRoundStep)
|
||||
}()
|
||||
defer cs.eventBus.Unsubscribe(ctx, subscriber, types.EventQueryNewRoundStep)
|
||||
|
||||
// just open the file for reading, no need to use wal
|
||||
fp, err := os.OpenFile(file, os.O_RDONLY, 0600)
|
||||
@@ -94,7 +82,7 @@ func (cs *ConsensusState) ReplayFile(file string, console bool) error {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := pb.cs.readReplayMessage(msg, newStepCh); err != nil {
|
||||
if err := pb.cs.readReplayMessage(msg, newStepSub); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -131,7 +119,7 @@ func newPlayback(fileName string, fp *os.File, cs *ConsensusState, genState sm.S
|
||||
}
|
||||
|
||||
// go back count steps by resetting the state and running (pb.count - count) steps
|
||||
func (pb *playback) replayReset(count int, newStepCh chan interface{}) error {
|
||||
func (pb *playback) replayReset(count int, newStepSub types.Subscription) error {
|
||||
pb.cs.Stop()
|
||||
pb.cs.Wait()
|
||||
|
||||
@@ -161,7 +149,7 @@ func (pb *playback) replayReset(count int, newStepCh chan interface{}) error {
|
||||
} else if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := pb.cs.readReplayMessage(msg, newStepCh); err != nil {
|
||||
if err := pb.cs.readReplayMessage(msg, newStepSub); err != nil {
|
||||
return err
|
||||
}
|
||||
pb.count++
|
||||
@@ -225,27 +213,15 @@ func (pb *playback) replayConsoleLoop() int {
|
||||
|
||||
ctx := context.Background()
|
||||
// ensure all new step events are regenerated as expected
|
||||
newStepCh := make(chan interface{}, 1)
|
||||
|
||||
err := pb.cs.eventBus.Subscribe(ctx, subscriber, types.EventQueryNewRoundStep, newStepCh)
|
||||
newStepSub, err := pb.cs.eventBus.Subscribe(ctx, subscriber, types.EventQueryNewRoundStep)
|
||||
if err != nil {
|
||||
cmn.Exit(fmt.Sprintf("failed to subscribe %s to %v", subscriber, types.EventQueryNewRoundStep))
|
||||
}
|
||||
defer func() {
|
||||
// drain newStepCh to make sure we don't block
|
||||
LOOP:
|
||||
for {
|
||||
select {
|
||||
case <-newStepCh:
|
||||
default:
|
||||
break LOOP
|
||||
}
|
||||
}
|
||||
pb.cs.eventBus.Unsubscribe(ctx, subscriber, types.EventQueryNewRoundStep)
|
||||
}()
|
||||
defer pb.cs.eventBus.Unsubscribe(ctx, subscriber, types.EventQueryNewRoundStep)
|
||||
|
||||
if len(tokens) == 1 {
|
||||
if err := pb.replayReset(1, newStepCh); err != nil {
|
||||
if err := pb.replayReset(1, newStepSub); err != nil {
|
||||
pb.cs.Logger.Error("Replay reset error", "err", err)
|
||||
}
|
||||
} else {
|
||||
@@ -255,7 +231,7 @@ func (pb *playback) replayConsoleLoop() int {
|
||||
} else if i > pb.count {
|
||||
fmt.Printf("argument to back must not be larger than the current count (%d)\n", pb.count)
|
||||
} else {
|
||||
if err := pb.replayReset(i, newStepCh); err != nil {
|
||||
if err := pb.replayReset(i, newStepSub); err != nil {
|
||||
pb.cs.Logger.Error("Replay reset error", "err", err)
|
||||
}
|
||||
}
|
||||
|
@@ -78,13 +78,14 @@ func startNewConsensusStateAndWaitForBlock(t *testing.T, consensusReplayConfig *
|
||||
// in the WAL itself. Assuming the consensus state is running, replay of any
|
||||
// WAL, including the empty one, should eventually be followed by a new
|
||||
// block, or else something is wrong.
|
||||
newBlockCh := make(chan interface{}, 1)
|
||||
err = cs.eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock, newBlockCh)
|
||||
newBlockSub, err := cs.eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock)
|
||||
require.NoError(t, err)
|
||||
select {
|
||||
case <-newBlockCh:
|
||||
case <-newBlockSub.Out():
|
||||
case <-newBlockSub.Cancelled():
|
||||
t.Fatal("newBlockSub was cancelled")
|
||||
case <-time.After(120 * time.Second):
|
||||
t.Fatalf("Timed out waiting for new block (see trace above)")
|
||||
t.Fatal("Timed out waiting for new block (see trace above)")
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -1620,11 +1620,10 @@ func TestStateOutputVoteStats(t *testing.T) {
|
||||
}
|
||||
|
||||
// subscribe subscribes test client to the given query and returns a channel with cap = 1.
|
||||
func subscribe(eventBus *types.EventBus, q tmpubsub.Query) <-chan interface{} {
|
||||
out := make(chan interface{}, 1)
|
||||
err := eventBus.Subscribe(context.Background(), testSubscriber, q, out)
|
||||
func subscribe(eventBus *types.EventBus, q tmpubsub.Query) <-chan tmpubsub.Message {
|
||||
sub, err := eventBus.Subscribe(context.Background(), testSubscriber, q)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("failed to subscribe %s to %v", testSubscriber, q))
|
||||
}
|
||||
return out
|
||||
return sub.Out()
|
||||
}
|
||||
|
@@ -19,10 +19,9 @@ func TestExample(t *testing.T) {
|
||||
defer s.Stop()
|
||||
|
||||
ctx := context.Background()
|
||||
ch := make(chan interface{}, 1)
|
||||
err := s.Subscribe(ctx, "example-client", query.MustParse("abci.account.name='John'"), ch)
|
||||
subscription, err := s.Subscribe(ctx, "example-client", query.MustParse("abci.account.name='John'"))
|
||||
require.NoError(t, err)
|
||||
err = s.PublishWithTags(ctx, "Tombstone", pubsub.NewTagMap(map[string]string{"abci.account.name": "John"}))
|
||||
err = s.PublishWithTags(ctx, "Tombstone", map[string]string{"abci.account.name": "John"})
|
||||
require.NoError(t, err)
|
||||
assertReceive(t, "Tombstone", ch)
|
||||
assertReceive(t, "Tombstone", subscription.Out())
|
||||
}
|
||||
|
@@ -10,41 +10,25 @@
|
||||
// match, this message will be pushed to all clients, subscribed to that query.
|
||||
// See query subpackage for our implementation.
|
||||
//
|
||||
// Due to the blocking send implementation, a single subscriber can freeze an
|
||||
// entire server by not reading messages before it unsubscribes. To avoid such
|
||||
// scenario, subscribers must either:
|
||||
// Example:
|
||||
//
|
||||
// a) make sure they continue to read from the out channel until
|
||||
// Unsubscribe(All) is called
|
||||
// q, err := query.New("account.name='John'")
|
||||
// if err != nil {
|
||||
// return err
|
||||
// }
|
||||
// ctx, cancel := context.WithTimeout(context.Background(), 1 * time.Second)
|
||||
// defer cancel()
|
||||
// subscription, err := pubsub.Subscribe(ctx, "johns-transactions", q)
|
||||
// if err != nil {
|
||||
// return err
|
||||
// }
|
||||
//
|
||||
// s.Subscribe(ctx, sub, qry, out)
|
||||
// go func() {
|
||||
// for msg := range out {
|
||||
// // handle msg
|
||||
// // will exit automatically when out is closed by Unsubscribe(All)
|
||||
// }
|
||||
// }()
|
||||
// s.UnsubscribeAll(ctx, sub)
|
||||
//
|
||||
// b) drain the out channel before calling Unsubscribe(All)
|
||||
//
|
||||
// s.Subscribe(ctx, sub, qry, out)
|
||||
// defer func() {
|
||||
// // drain out to make sure we don't block
|
||||
// LOOP:
|
||||
// for {
|
||||
// select {
|
||||
// case <-out:
|
||||
// default:
|
||||
// break LOOP
|
||||
// }
|
||||
// }
|
||||
// s.UnsubscribeAll(ctx, sub)
|
||||
// }()
|
||||
// for msg := range out {
|
||||
// // handle msg
|
||||
// if err != nil {
|
||||
// return err
|
||||
// for {
|
||||
// select {
|
||||
// case msg <- subscription.Out():
|
||||
// // handle msg.Data() and msg.Tags()
|
||||
// case <-subscription.Cancelled():
|
||||
// return subscription.Err()
|
||||
// }
|
||||
// }
|
||||
//
|
||||
@@ -77,21 +61,25 @@ var (
|
||||
ErrAlreadySubscribed = errors.New("already subscribed")
|
||||
)
|
||||
|
||||
type cmd struct {
|
||||
op operation
|
||||
query Query
|
||||
ch chan<- interface{}
|
||||
clientID string
|
||||
msg interface{}
|
||||
tags TagMap
|
||||
}
|
||||
|
||||
// Query defines an interface for a query to be used for subscribing.
|
||||
type Query interface {
|
||||
Matches(tags TagMap) bool
|
||||
Matches(tags map[string]string) bool
|
||||
String() string
|
||||
}
|
||||
|
||||
type cmd struct {
|
||||
op operation
|
||||
|
||||
// subscribe, unsubscribe
|
||||
query Query
|
||||
subscription *Subscription
|
||||
clientID string
|
||||
|
||||
// publish
|
||||
msg interface{}
|
||||
tags map[string]string
|
||||
}
|
||||
|
||||
// Server allows clients to subscribe/unsubscribe for messages, publishing
|
||||
// messages with or without tags, and manages internal state.
|
||||
type Server struct {
|
||||
@@ -107,37 +95,6 @@ type Server struct {
|
||||
// Option sets a parameter for the server.
|
||||
type Option func(*Server)
|
||||
|
||||
// TagMap is used to associate tags to a message.
|
||||
// They can be queried by subscribers to choose messages they will received.
|
||||
type TagMap interface {
|
||||
// Get returns the value for a key, or nil if no value is present.
|
||||
// The ok result indicates whether value was found in the tags.
|
||||
Get(key string) (value string, ok bool)
|
||||
// Len returns the number of tags.
|
||||
Len() int
|
||||
}
|
||||
|
||||
type tagMap map[string]string
|
||||
|
||||
var _ TagMap = (*tagMap)(nil)
|
||||
|
||||
// NewTagMap constructs a new immutable tag set from a map.
|
||||
func NewTagMap(data map[string]string) TagMap {
|
||||
return tagMap(data)
|
||||
}
|
||||
|
||||
// Get returns the value for a key, or nil if no value is present.
|
||||
// The ok result indicates whether value was found in the tags.
|
||||
func (ts tagMap) Get(key string) (value string, ok bool) {
|
||||
value, ok = ts[key]
|
||||
return
|
||||
}
|
||||
|
||||
// Len returns the number of tags.
|
||||
func (ts tagMap) Len() int {
|
||||
return len(ts)
|
||||
}
|
||||
|
||||
// NewServer returns a new server. See the commentary on the Option functions
|
||||
// for a detailed description of how to configure buffering. If no options are
|
||||
// provided, the resulting server's queue is unbuffered.
|
||||
@@ -174,11 +131,34 @@ func (s *Server) BufferCapacity() int {
|
||||
return s.cmdsCap
|
||||
}
|
||||
|
||||
// Subscribe creates a subscription for the given client. It accepts a channel
|
||||
// on which messages matching the given query can be received. An error will be
|
||||
// returned to the caller if the context is canceled or if subscription already
|
||||
// exist for pair clientID and query.
|
||||
func (s *Server) Subscribe(ctx context.Context, clientID string, query Query, out chan<- interface{}) error {
|
||||
// Subscribe creates a subscription for the given client.
|
||||
//
|
||||
// An error will be returned to the caller if the context is canceled or if
|
||||
// subscription already exist for pair clientID and query.
|
||||
//
|
||||
// outCapacity can be used to set a capacity for Subscription#Out channel (1 by
|
||||
// default). Panics if outCapacity is less than or equal to zero. If you want
|
||||
// an unbuffered channel, use SubscribeUnbuffered.
|
||||
func (s *Server) Subscribe(ctx context.Context, clientID string, query Query, outCapacity ...int) (*Subscription, error) {
|
||||
outCap := 1
|
||||
if len(outCapacity) > 0 {
|
||||
if outCapacity[0] <= 0 {
|
||||
panic("Negative or zero capacity. Use SubscribeUnbuffered if you want an unbuffered channel")
|
||||
}
|
||||
outCap = outCapacity[0]
|
||||
}
|
||||
|
||||
return s.subscribe(ctx, clientID, query, outCap)
|
||||
}
|
||||
|
||||
// SubscribeUnbuffered does the same as Subscribe, except it returns a
|
||||
// subscription with unbuffered channel. Use with caution as it can freeze the
|
||||
// server.
|
||||
func (s *Server) SubscribeUnbuffered(ctx context.Context, clientID string, query Query) (*Subscription, error) {
|
||||
return s.subscribe(ctx, clientID, query, 0)
|
||||
}
|
||||
|
||||
func (s *Server) subscribe(ctx context.Context, clientID string, query Query, outCapacity int) (*Subscription, error) {
|
||||
s.mtx.RLock()
|
||||
clientSubscriptions, ok := s.subscriptions[clientID]
|
||||
if ok {
|
||||
@@ -186,22 +166,23 @@ func (s *Server) Subscribe(ctx context.Context, clientID string, query Query, ou
|
||||
}
|
||||
s.mtx.RUnlock()
|
||||
if ok {
|
||||
return ErrAlreadySubscribed
|
||||
return nil, ErrAlreadySubscribed
|
||||
}
|
||||
|
||||
subscription := NewSubscription(outCapacity)
|
||||
select {
|
||||
case s.cmds <- cmd{op: sub, clientID: clientID, query: query, ch: out}:
|
||||
case s.cmds <- cmd{op: sub, clientID: clientID, query: query, subscription: subscription}:
|
||||
s.mtx.Lock()
|
||||
if _, ok = s.subscriptions[clientID]; !ok {
|
||||
s.subscriptions[clientID] = make(map[string]struct{})
|
||||
}
|
||||
s.subscriptions[clientID][query.String()] = struct{}{}
|
||||
s.mtx.Unlock()
|
||||
return nil
|
||||
return subscription, nil
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
return nil, ctx.Err()
|
||||
case <-s.Quit():
|
||||
return nil
|
||||
return nil, nil
|
||||
}
|
||||
}
|
||||
|
||||
@@ -261,13 +242,13 @@ func (s *Server) UnsubscribeAll(ctx context.Context, clientID string) error {
|
||||
// Publish publishes the given message. An error will be returned to the caller
|
||||
// if the context is canceled.
|
||||
func (s *Server) Publish(ctx context.Context, msg interface{}) error {
|
||||
return s.PublishWithTags(ctx, msg, NewTagMap(make(map[string]string)))
|
||||
return s.PublishWithTags(ctx, msg, make(map[string]string))
|
||||
}
|
||||
|
||||
// PublishWithTags publishes the given message with the set of tags. The set is
|
||||
// matched with clients queries. If there is a match, the message is sent to
|
||||
// the client.
|
||||
func (s *Server) PublishWithTags(ctx context.Context, msg interface{}, tags TagMap) error {
|
||||
func (s *Server) PublishWithTags(ctx context.Context, msg interface{}, tags map[string]string) error {
|
||||
select {
|
||||
case s.cmds <- cmd{op: pub, msg: msg, tags: tags}:
|
||||
return nil
|
||||
@@ -285,10 +266,8 @@ func (s *Server) OnStop() {
|
||||
|
||||
// NOTE: not goroutine safe
|
||||
type state struct {
|
||||
// query string -> client -> ch
|
||||
queryToChanMap map[string]map[string]chan<- interface{}
|
||||
// client -> query string -> struct{}
|
||||
clientToQueryMap map[string]map[string]struct{}
|
||||
// query string -> client -> subscription
|
||||
subscriptions map[string]map[string]*Subscription
|
||||
// query string -> queryPlusRefCount
|
||||
queries map[string]*queryPlusRefCount
|
||||
}
|
||||
@@ -303,9 +282,8 @@ type queryPlusRefCount struct {
|
||||
// OnStart implements Service.OnStart by starting the server.
|
||||
func (s *Server) OnStart() error {
|
||||
go s.loop(state{
|
||||
queryToChanMap: make(map[string]map[string]chan<- interface{}),
|
||||
clientToQueryMap: make(map[string]map[string]struct{}),
|
||||
queries: make(map[string]*queryPlusRefCount),
|
||||
subscriptions: make(map[string]map[string]*Subscription),
|
||||
queries: make(map[string]*queryPlusRefCount),
|
||||
})
|
||||
return nil
|
||||
}
|
||||
@@ -321,75 +299,57 @@ loop:
|
||||
switch cmd.op {
|
||||
case unsub:
|
||||
if cmd.query != nil {
|
||||
state.remove(cmd.clientID, cmd.query)
|
||||
state.remove(cmd.clientID, cmd.query.String(), ErrUnsubscribed)
|
||||
} else {
|
||||
state.removeAll(cmd.clientID)
|
||||
state.removeClient(cmd.clientID, ErrUnsubscribed)
|
||||
}
|
||||
case shutdown:
|
||||
for clientID := range state.clientToQueryMap {
|
||||
state.removeAll(clientID)
|
||||
}
|
||||
state.removeAll(nil)
|
||||
break loop
|
||||
case sub:
|
||||
state.add(cmd.clientID, cmd.query, cmd.ch)
|
||||
state.add(cmd.clientID, cmd.query, cmd.subscription)
|
||||
case pub:
|
||||
state.send(cmd.msg, cmd.tags)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (state *state) add(clientID string, q Query, ch chan<- interface{}) {
|
||||
func (state *state) add(clientID string, q Query, subscription *Subscription) {
|
||||
qStr := q.String()
|
||||
|
||||
// initialize clientToChannelMap per query if needed
|
||||
if _, ok := state.queryToChanMap[qStr]; !ok {
|
||||
state.queryToChanMap[qStr] = make(map[string]chan<- interface{})
|
||||
// initialize subscription for this client per query if needed
|
||||
if _, ok := state.subscriptions[qStr]; !ok {
|
||||
state.subscriptions[qStr] = make(map[string]*Subscription)
|
||||
}
|
||||
|
||||
// create subscription
|
||||
state.queryToChanMap[qStr][clientID] = ch
|
||||
state.subscriptions[qStr][clientID] = subscription
|
||||
|
||||
// initialize queries if needed
|
||||
// initialize query if needed
|
||||
if _, ok := state.queries[qStr]; !ok {
|
||||
state.queries[qStr] = &queryPlusRefCount{q: q, refCount: 0}
|
||||
}
|
||||
// increment reference counter
|
||||
state.queries[qStr].refCount++
|
||||
|
||||
// add client if needed
|
||||
if _, ok := state.clientToQueryMap[clientID]; !ok {
|
||||
state.clientToQueryMap[clientID] = make(map[string]struct{})
|
||||
}
|
||||
state.clientToQueryMap[clientID][qStr] = struct{}{}
|
||||
}
|
||||
|
||||
func (state *state) remove(clientID string, q Query) {
|
||||
qStr := q.String()
|
||||
|
||||
clientToChannelMap, ok := state.queryToChanMap[qStr]
|
||||
func (state *state) remove(clientID string, qStr string, reason error) {
|
||||
clientSubscriptions, ok := state.subscriptions[qStr]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
ch, ok := clientToChannelMap[clientID]
|
||||
subscription, ok := clientSubscriptions[clientID]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
close(ch)
|
||||
subscription.cancel(reason)
|
||||
|
||||
// remove the query from client map.
|
||||
// if client is not subscribed to anything else, remove it.
|
||||
delete(state.clientToQueryMap[clientID], qStr)
|
||||
if len(state.clientToQueryMap[clientID]) == 0 {
|
||||
delete(state.clientToQueryMap, clientID)
|
||||
}
|
||||
|
||||
// remove the client from query map.
|
||||
// remove client from query map.
|
||||
// if query has no other clients subscribed, remove it.
|
||||
delete(state.queryToChanMap[qStr], clientID)
|
||||
if len(state.queryToChanMap[qStr]) == 0 {
|
||||
delete(state.queryToChanMap, qStr)
|
||||
delete(state.subscriptions[qStr], clientID)
|
||||
if len(state.subscriptions[qStr]) == 0 {
|
||||
delete(state.subscriptions, qStr)
|
||||
}
|
||||
|
||||
// decrease ref counter in queries
|
||||
@@ -400,41 +360,38 @@ func (state *state) remove(clientID string, q Query) {
|
||||
}
|
||||
}
|
||||
|
||||
func (state *state) removeAll(clientID string) {
|
||||
queryMap, ok := state.clientToQueryMap[clientID]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
for qStr := range queryMap {
|
||||
ch := state.queryToChanMap[qStr][clientID]
|
||||
close(ch)
|
||||
|
||||
// remove the client from query map.
|
||||
// if query has no other clients subscribed, remove it.
|
||||
delete(state.queryToChanMap[qStr], clientID)
|
||||
if len(state.queryToChanMap[qStr]) == 0 {
|
||||
delete(state.queryToChanMap, qStr)
|
||||
}
|
||||
|
||||
// decrease ref counter in queries
|
||||
state.queries[qStr].refCount--
|
||||
// remove the query if nobody else is using it
|
||||
if state.queries[qStr].refCount == 0 {
|
||||
delete(state.queries, qStr)
|
||||
func (state *state) removeClient(clientID string, reason error) {
|
||||
for qStr, clientSubscriptions := range state.subscriptions {
|
||||
if _, ok := clientSubscriptions[clientID]; ok {
|
||||
state.remove(clientID, qStr, reason)
|
||||
}
|
||||
}
|
||||
|
||||
// remove the client.
|
||||
delete(state.clientToQueryMap, clientID)
|
||||
}
|
||||
|
||||
func (state *state) send(msg interface{}, tags TagMap) {
|
||||
for qStr, clientToChannelMap := range state.queryToChanMap {
|
||||
func (state *state) removeAll(reason error) {
|
||||
for qStr, clientSubscriptions := range state.subscriptions {
|
||||
for clientID := range clientSubscriptions {
|
||||
state.remove(clientID, qStr, reason)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (state *state) send(msg interface{}, tags map[string]string) {
|
||||
for qStr, clientSubscriptions := range state.subscriptions {
|
||||
q := state.queries[qStr].q
|
||||
if q.Matches(tags) {
|
||||
for _, ch := range clientToChannelMap {
|
||||
ch <- msg
|
||||
for clientID, subscription := range clientSubscriptions {
|
||||
if cap(subscription.out) == 0 {
|
||||
// block on unbuffered channel
|
||||
subscription.out <- Message{msg, tags}
|
||||
} else {
|
||||
// don't block on buffered channels
|
||||
select {
|
||||
case subscription.out <- Message{msg, tags}:
|
||||
default:
|
||||
state.remove(clientID, qStr, ErrOutOfCapacity)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -27,16 +27,97 @@ func TestSubscribe(t *testing.T) {
|
||||
defer s.Stop()
|
||||
|
||||
ctx := context.Background()
|
||||
ch := make(chan interface{}, 1)
|
||||
err := s.Subscribe(ctx, clientID, query.Empty{}, ch)
|
||||
subscription, err := s.Subscribe(ctx, clientID, query.Empty{})
|
||||
require.NoError(t, err)
|
||||
err = s.Publish(ctx, "Ka-Zar")
|
||||
require.NoError(t, err)
|
||||
assertReceive(t, "Ka-Zar", ch)
|
||||
assertReceive(t, "Ka-Zar", subscription.Out())
|
||||
|
||||
err = s.Publish(ctx, "Quicksilver")
|
||||
published := make(chan struct{})
|
||||
go func() {
|
||||
defer close(published)
|
||||
|
||||
err := s.Publish(ctx, "Quicksilver")
|
||||
assert.NoError(t, err)
|
||||
|
||||
err = s.Publish(ctx, "Asylum")
|
||||
assert.NoError(t, err)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-published:
|
||||
assertReceive(t, "Quicksilver", subscription.Out())
|
||||
assertCancelled(t, subscription, pubsub.ErrOutOfCapacity)
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
t.Fatal("Expected Publish(Asylum) not to block")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSubscribeWithCapacity(t *testing.T) {
|
||||
s := pubsub.NewServer()
|
||||
s.SetLogger(log.TestingLogger())
|
||||
s.Start()
|
||||
defer s.Stop()
|
||||
|
||||
ctx := context.Background()
|
||||
assert.Panics(t, func() {
|
||||
s.Subscribe(ctx, clientID, query.Empty{}, -1)
|
||||
})
|
||||
assert.Panics(t, func() {
|
||||
s.Subscribe(ctx, clientID, query.Empty{}, 0)
|
||||
})
|
||||
subscription, err := s.Subscribe(ctx, clientID, query.Empty{}, 1)
|
||||
require.NoError(t, err)
|
||||
assertReceive(t, "Quicksilver", ch)
|
||||
err = s.Publish(ctx, "Aggamon")
|
||||
require.NoError(t, err)
|
||||
assertReceive(t, "Aggamon", subscription.Out())
|
||||
}
|
||||
|
||||
func TestSubscribeUnbuffered(t *testing.T) {
|
||||
s := pubsub.NewServer()
|
||||
s.SetLogger(log.TestingLogger())
|
||||
s.Start()
|
||||
defer s.Stop()
|
||||
|
||||
ctx := context.Background()
|
||||
subscription, err := s.SubscribeUnbuffered(ctx, clientID, query.Empty{})
|
||||
require.NoError(t, err)
|
||||
|
||||
published := make(chan struct{})
|
||||
go func() {
|
||||
defer close(published)
|
||||
|
||||
err := s.Publish(ctx, "Ultron")
|
||||
assert.NoError(t, err)
|
||||
|
||||
err = s.Publish(ctx, "Darkhawk")
|
||||
assert.NoError(t, err)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-published:
|
||||
t.Fatal("Expected Publish(Darkhawk) to block")
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
assertReceive(t, "Ultron", subscription.Out())
|
||||
assertReceive(t, "Darkhawk", subscription.Out())
|
||||
}
|
||||
}
|
||||
|
||||
func TestSlowClientIsRemovedWithErrOutOfCapacity(t *testing.T) {
|
||||
s := pubsub.NewServer()
|
||||
s.SetLogger(log.TestingLogger())
|
||||
s.Start()
|
||||
defer s.Stop()
|
||||
|
||||
ctx := context.Background()
|
||||
subscription, err := s.Subscribe(ctx, clientID, query.Empty{})
|
||||
require.NoError(t, err)
|
||||
err = s.Publish(ctx, "Fat Cobra")
|
||||
require.NoError(t, err)
|
||||
err = s.Publish(ctx, "Viper")
|
||||
require.NoError(t, err)
|
||||
|
||||
assertCancelled(t, subscription, pubsub.ErrOutOfCapacity)
|
||||
}
|
||||
|
||||
func TestDifferentClients(t *testing.T) {
|
||||
@@ -46,27 +127,24 @@ func TestDifferentClients(t *testing.T) {
|
||||
defer s.Stop()
|
||||
|
||||
ctx := context.Background()
|
||||
ch1 := make(chan interface{}, 1)
|
||||
err := s.Subscribe(ctx, "client-1", query.MustParse("tm.events.type='NewBlock'"), ch1)
|
||||
subscription1, err := s.Subscribe(ctx, "client-1", query.MustParse("tm.events.type='NewBlock'"))
|
||||
require.NoError(t, err)
|
||||
err = s.PublishWithTags(ctx, "Iceman", pubsub.NewTagMap(map[string]string{"tm.events.type": "NewBlock"}))
|
||||
err = s.PublishWithTags(ctx, "Iceman", map[string]string{"tm.events.type": "NewBlock"})
|
||||
require.NoError(t, err)
|
||||
assertReceive(t, "Iceman", ch1)
|
||||
assertReceive(t, "Iceman", subscription1.Out())
|
||||
|
||||
ch2 := make(chan interface{}, 1)
|
||||
err = s.Subscribe(ctx, "client-2", query.MustParse("tm.events.type='NewBlock' AND abci.account.name='Igor'"), ch2)
|
||||
subscription2, err := s.Subscribe(ctx, "client-2", query.MustParse("tm.events.type='NewBlock' AND abci.account.name='Igor'"))
|
||||
require.NoError(t, err)
|
||||
err = s.PublishWithTags(ctx, "Ultimo", pubsub.NewTagMap(map[string]string{"tm.events.type": "NewBlock", "abci.account.name": "Igor"}))
|
||||
err = s.PublishWithTags(ctx, "Ultimo", map[string]string{"tm.events.type": "NewBlock", "abci.account.name": "Igor"})
|
||||
require.NoError(t, err)
|
||||
assertReceive(t, "Ultimo", ch1)
|
||||
assertReceive(t, "Ultimo", ch2)
|
||||
assertReceive(t, "Ultimo", subscription1.Out())
|
||||
assertReceive(t, "Ultimo", subscription2.Out())
|
||||
|
||||
ch3 := make(chan interface{}, 1)
|
||||
err = s.Subscribe(ctx, "client-3", query.MustParse("tm.events.type='NewRoundStep' AND abci.account.name='Igor' AND abci.invoice.number = 10"), ch3)
|
||||
subscription3, err := s.Subscribe(ctx, "client-3", query.MustParse("tm.events.type='NewRoundStep' AND abci.account.name='Igor' AND abci.invoice.number = 10"))
|
||||
require.NoError(t, err)
|
||||
err = s.PublishWithTags(ctx, "Valeria Richards", pubsub.NewTagMap(map[string]string{"tm.events.type": "NewRoundStep"}))
|
||||
err = s.PublishWithTags(ctx, "Valeria Richards", map[string]string{"tm.events.type": "NewRoundStep"})
|
||||
require.NoError(t, err)
|
||||
assert.Zero(t, len(ch3))
|
||||
assert.Zero(t, len(subscription3.Out()))
|
||||
}
|
||||
|
||||
func TestClientSubscribesTwice(t *testing.T) {
|
||||
@@ -78,20 +156,19 @@ func TestClientSubscribesTwice(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
q := query.MustParse("tm.events.type='NewBlock'")
|
||||
|
||||
ch1 := make(chan interface{}, 1)
|
||||
err := s.Subscribe(ctx, clientID, q, ch1)
|
||||
subscription1, err := s.Subscribe(ctx, clientID, q)
|
||||
require.NoError(t, err)
|
||||
err = s.PublishWithTags(ctx, "Goblin Queen", pubsub.NewTagMap(map[string]string{"tm.events.type": "NewBlock"}))
|
||||
err = s.PublishWithTags(ctx, "Goblin Queen", map[string]string{"tm.events.type": "NewBlock"})
|
||||
require.NoError(t, err)
|
||||
assertReceive(t, "Goblin Queen", ch1)
|
||||
assertReceive(t, "Goblin Queen", subscription1.Out())
|
||||
|
||||
ch2 := make(chan interface{}, 1)
|
||||
err = s.Subscribe(ctx, clientID, q, ch2)
|
||||
subscription2, err := s.Subscribe(ctx, clientID, q)
|
||||
require.Error(t, err)
|
||||
require.Nil(t, subscription2)
|
||||
|
||||
err = s.PublishWithTags(ctx, "Spider-Man", pubsub.NewTagMap(map[string]string{"tm.events.type": "NewBlock"}))
|
||||
err = s.PublishWithTags(ctx, "Spider-Man", map[string]string{"tm.events.type": "NewBlock"})
|
||||
require.NoError(t, err)
|
||||
assertReceive(t, "Spider-Man", ch1)
|
||||
assertReceive(t, "Spider-Man", subscription1.Out())
|
||||
}
|
||||
|
||||
func TestUnsubscribe(t *testing.T) {
|
||||
@@ -101,18 +178,16 @@ func TestUnsubscribe(t *testing.T) {
|
||||
defer s.Stop()
|
||||
|
||||
ctx := context.Background()
|
||||
ch := make(chan interface{})
|
||||
err := s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"), ch)
|
||||
subscription, err := s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"))
|
||||
require.NoError(t, err)
|
||||
err = s.Unsubscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"))
|
||||
require.NoError(t, err)
|
||||
|
||||
err = s.Publish(ctx, "Nick Fury")
|
||||
require.NoError(t, err)
|
||||
assert.Zero(t, len(ch), "Should not receive anything after Unsubscribe")
|
||||
assert.Zero(t, len(subscription.Out()), "Should not receive anything after Unsubscribe")
|
||||
|
||||
_, ok := <-ch
|
||||
assert.False(t, ok)
|
||||
assertCancelled(t, subscription, pubsub.ErrUnsubscribed)
|
||||
}
|
||||
|
||||
func TestClientUnsubscribesTwice(t *testing.T) {
|
||||
@@ -122,8 +197,7 @@ func TestClientUnsubscribesTwice(t *testing.T) {
|
||||
defer s.Stop()
|
||||
|
||||
ctx := context.Background()
|
||||
ch := make(chan interface{})
|
||||
err := s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"), ch)
|
||||
_, err := s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"))
|
||||
require.NoError(t, err)
|
||||
err = s.Unsubscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"))
|
||||
require.NoError(t, err)
|
||||
@@ -141,18 +215,16 @@ func TestResubscribe(t *testing.T) {
|
||||
defer s.Stop()
|
||||
|
||||
ctx := context.Background()
|
||||
ch := make(chan interface{})
|
||||
err := s.Subscribe(ctx, clientID, query.Empty{}, ch)
|
||||
subscription, err := s.Subscribe(ctx, clientID, query.Empty{})
|
||||
require.NoError(t, err)
|
||||
err = s.Unsubscribe(ctx, clientID, query.Empty{})
|
||||
require.NoError(t, err)
|
||||
ch = make(chan interface{})
|
||||
err = s.Subscribe(ctx, clientID, query.Empty{}, ch)
|
||||
subscription, err = s.Subscribe(ctx, clientID, query.Empty{})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = s.Publish(ctx, "Cable")
|
||||
require.NoError(t, err)
|
||||
assertReceive(t, "Cable", ch)
|
||||
assertReceive(t, "Cable", subscription.Out())
|
||||
}
|
||||
|
||||
func TestUnsubscribeAll(t *testing.T) {
|
||||
@@ -162,10 +234,9 @@ func TestUnsubscribeAll(t *testing.T) {
|
||||
defer s.Stop()
|
||||
|
||||
ctx := context.Background()
|
||||
ch1, ch2 := make(chan interface{}, 1), make(chan interface{}, 1)
|
||||
err := s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"), ch1)
|
||||
subscription1, err := s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"))
|
||||
require.NoError(t, err)
|
||||
err = s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlockHeader'"), ch2)
|
||||
subscription2, err := s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlockHeader'"))
|
||||
require.NoError(t, err)
|
||||
|
||||
err = s.UnsubscribeAll(ctx, clientID)
|
||||
@@ -173,13 +244,11 @@ func TestUnsubscribeAll(t *testing.T) {
|
||||
|
||||
err = s.Publish(ctx, "Nick Fury")
|
||||
require.NoError(t, err)
|
||||
assert.Zero(t, len(ch1), "Should not receive anything after UnsubscribeAll")
|
||||
assert.Zero(t, len(ch2), "Should not receive anything after UnsubscribeAll")
|
||||
assert.Zero(t, len(subscription1.Out()), "Should not receive anything after UnsubscribeAll")
|
||||
assert.Zero(t, len(subscription2.Out()), "Should not receive anything after UnsubscribeAll")
|
||||
|
||||
_, ok := <-ch1
|
||||
assert.False(t, ok)
|
||||
_, ok = <-ch2
|
||||
assert.False(t, ok)
|
||||
assertCancelled(t, subscription1, pubsub.ErrUnsubscribed)
|
||||
assertCancelled(t, subscription2, pubsub.ErrUnsubscribed)
|
||||
}
|
||||
|
||||
func TestBufferCapacity(t *testing.T) {
|
||||
@@ -217,18 +286,26 @@ func benchmarkNClients(n int, b *testing.B) {
|
||||
|
||||
ctx := context.Background()
|
||||
for i := 0; i < n; i++ {
|
||||
ch := make(chan interface{})
|
||||
subscription, err := s.Subscribe(ctx, clientID, query.MustParse(fmt.Sprintf("abci.Account.Owner = 'Ivan' AND abci.Invoices.Number = %d", i)))
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
go func() {
|
||||
for range ch {
|
||||
for {
|
||||
select {
|
||||
case <-subscription.Out():
|
||||
continue
|
||||
case <-subscription.Cancelled():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
s.Subscribe(ctx, clientID, query.MustParse(fmt.Sprintf("abci.Account.Owner = 'Ivan' AND abci.Invoices.Number = %d", i)), ch)
|
||||
}
|
||||
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
s.PublishWithTags(ctx, "Gamora", pubsub.NewTagMap(map[string]string{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": string(i)}))
|
||||
s.PublishWithTags(ctx, "Gamora", map[string]string{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": string(i)})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -240,18 +317,26 @@ func benchmarkNClientsOneQuery(n int, b *testing.B) {
|
||||
ctx := context.Background()
|
||||
q := query.MustParse("abci.Account.Owner = 'Ivan' AND abci.Invoices.Number = 1")
|
||||
for i := 0; i < n; i++ {
|
||||
ch := make(chan interface{})
|
||||
subscription, err := s.Subscribe(ctx, clientID, q)
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
go func() {
|
||||
for range ch {
|
||||
for {
|
||||
select {
|
||||
case <-subscription.Out():
|
||||
continue
|
||||
case <-subscription.Cancelled():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
s.Subscribe(ctx, clientID, q, ch)
|
||||
}
|
||||
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
s.PublishWithTags(ctx, "Gamora", pubsub.NewTagMap(map[string]string{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": "1"}))
|
||||
s.PublishWithTags(ctx, "Gamora", map[string]string{"abci.Account.Owner": "Ivan", "abci.Invoices.Number": "1"})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -259,14 +344,18 @@ func benchmarkNClientsOneQuery(n int, b *testing.B) {
|
||||
/// HELPERS
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
func assertReceive(t *testing.T, expected interface{}, ch <-chan interface{}, msgAndArgs ...interface{}) {
|
||||
func assertReceive(t *testing.T, expected interface{}, ch <-chan pubsub.Message, msgAndArgs ...interface{}) {
|
||||
select {
|
||||
case actual := <-ch:
|
||||
if actual != nil {
|
||||
assert.Equal(t, expected, actual, msgAndArgs...)
|
||||
}
|
||||
assert.Equal(t, expected, actual.Data(), msgAndArgs...)
|
||||
case <-time.After(1 * time.Second):
|
||||
t.Errorf("Expected to receive %v from the channel, got nothing after 1s", expected)
|
||||
debug.PrintStack()
|
||||
}
|
||||
}
|
||||
|
||||
func assertCancelled(t *testing.T, subscription *pubsub.Subscription, err error) {
|
||||
_, ok := <-subscription.Cancelled()
|
||||
assert.False(t, ok)
|
||||
assert.Equal(t, err, subscription.Err())
|
||||
}
|
||||
|
@@ -1,13 +1,11 @@
|
||||
package query
|
||||
|
||||
import "github.com/tendermint/tendermint/libs/pubsub"
|
||||
|
||||
// Empty query matches any set of tags.
|
||||
type Empty struct {
|
||||
}
|
||||
|
||||
// Matches always returns true.
|
||||
func (Empty) Matches(tags pubsub.TagMap) bool {
|
||||
func (Empty) Matches(tags map[string]string) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
|
@@ -5,14 +5,13 @@ import (
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/tendermint/tendermint/libs/pubsub"
|
||||
"github.com/tendermint/tendermint/libs/pubsub/query"
|
||||
)
|
||||
|
||||
func TestEmptyQueryMatchesAnything(t *testing.T) {
|
||||
q := query.Empty{}
|
||||
assert.True(t, q.Matches(pubsub.NewTagMap(map[string]string{})))
|
||||
assert.True(t, q.Matches(pubsub.NewTagMap(map[string]string{"Asher": "Roth"})))
|
||||
assert.True(t, q.Matches(pubsub.NewTagMap(map[string]string{"Route": "66"})))
|
||||
assert.True(t, q.Matches(pubsub.NewTagMap(map[string]string{"Route": "66", "Billy": "Blue"})))
|
||||
assert.True(t, q.Matches(map[string]string{}))
|
||||
assert.True(t, q.Matches(map[string]string{"Asher": "Roth"}))
|
||||
assert.True(t, q.Matches(map[string]string{"Route": "66"}))
|
||||
assert.True(t, q.Matches(map[string]string{"Route": "66", "Billy": "Blue"}))
|
||||
}
|
||||
|
@@ -14,8 +14,6 @@ import (
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/tendermint/tendermint/libs/pubsub"
|
||||
)
|
||||
|
||||
// Query holds the query string and the query parser.
|
||||
@@ -154,8 +152,8 @@ func (q *Query) Conditions() []Condition {
|
||||
//
|
||||
// For example, query "name=John" matches tags = {"name": "John"}. More
|
||||
// examples could be found in parser_test.go and query_test.go.
|
||||
func (q *Query) Matches(tags pubsub.TagMap) bool {
|
||||
if tags.Len() == 0 {
|
||||
func (q *Query) Matches(tags map[string]string) bool {
|
||||
if len(tags) == 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -240,9 +238,9 @@ func (q *Query) Matches(tags pubsub.TagMap) bool {
|
||||
// value from it to the operand using the operator.
|
||||
//
|
||||
// "tx.gas", "=", "7", { "tx.gas": 7, "tx.ID": "4AE393495334" }
|
||||
func match(tag string, op Operator, operand reflect.Value, tags pubsub.TagMap) bool {
|
||||
func match(tag string, op Operator, operand reflect.Value, tags map[string]string) bool {
|
||||
// look up the tag from the query in tags
|
||||
value, ok := tags.Get(tag)
|
||||
value, ok := tags[tag]
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
|
@@ -8,7 +8,6 @@ import (
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/tendermint/tendermint/libs/pubsub"
|
||||
"github.com/tendermint/tendermint/libs/pubsub/query"
|
||||
)
|
||||
|
||||
@@ -53,9 +52,9 @@ func TestMatches(t *testing.T) {
|
||||
}
|
||||
|
||||
if tc.matches {
|
||||
assert.True(t, q.Matches(pubsub.NewTagMap(tc.tags)), "Query '%s' should match %v", tc.s, tc.tags)
|
||||
assert.True(t, q.Matches(tc.tags), "Query '%s' should match %v", tc.s, tc.tags)
|
||||
} else {
|
||||
assert.False(t, q.Matches(pubsub.NewTagMap(tc.tags)), "Query '%s' should not match %v", tc.s, tc.tags)
|
||||
assert.False(t, q.Matches(tc.tags), "Query '%s' should not match %v", tc.s, tc.tags)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
89
libs/pubsub/subscription.go
Normal file
89
libs/pubsub/subscription.go
Normal file
@@ -0,0 +1,89 @@
|
||||
package pubsub
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"sync"
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrUnsubscribed is returned by Err when a client unsubscribes.
|
||||
ErrUnsubscribed = errors.New("client unsubscribed")
|
||||
|
||||
// ErrOutOfCapacity is returned by Err when a client is not pulling messages
|
||||
// fast enough. Note the client's subscription will be terminated.
|
||||
ErrOutOfCapacity = errors.New("client is not pulling messages fast enough")
|
||||
)
|
||||
|
||||
// A Subscription represents a client subscription for a particular query and
|
||||
// consists of three things:
|
||||
// 1) channel onto which messages and tags are published
|
||||
// 2) channel which is closed if a client is too slow or choose to unsubscribe
|
||||
// 3) err indicating the reason for (2)
|
||||
type Subscription struct {
|
||||
out chan Message
|
||||
|
||||
cancelled chan struct{}
|
||||
mtx sync.RWMutex
|
||||
err error
|
||||
}
|
||||
|
||||
// NewSubscription returns a new subscription with the given outCapacity.
|
||||
func NewSubscription(outCapacity int) *Subscription {
|
||||
return &Subscription{
|
||||
out: make(chan Message, outCapacity),
|
||||
cancelled: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// Out returns a channel onto which messages and tags are published.
|
||||
// Unsubscribe/UnsubscribeAll does not close the channel to avoid clients from
|
||||
// receiving a nil message.
|
||||
func (s *Subscription) Out() <-chan Message {
|
||||
return s.out
|
||||
}
|
||||
|
||||
// Cancelled returns a channel that's closed when the subscription is
|
||||
// terminated and supposed to be used in a select statement.
|
||||
func (s *Subscription) Cancelled() <-chan struct{} {
|
||||
return s.cancelled
|
||||
}
|
||||
|
||||
// Err returns nil if the channel returned by Cancelled is not yet closed.
|
||||
// If the channel is closed, Err returns a non-nil error explaining why:
|
||||
// - ErrUnsubscribed if the subscriber choose to unsubscribe,
|
||||
// - ErrOutOfCapacity if the subscriber is not pulling messages fast enough
|
||||
// and the channel returned by Out became full,
|
||||
// After Err returns a non-nil error, successive calls to Err return the same
|
||||
// error.
|
||||
func (s *Subscription) Err() error {
|
||||
s.mtx.RLock()
|
||||
defer s.mtx.RUnlock()
|
||||
return s.err
|
||||
}
|
||||
|
||||
func (s *Subscription) cancel(err error) {
|
||||
s.mtx.Lock()
|
||||
s.err = err
|
||||
s.mtx.Unlock()
|
||||
close(s.cancelled)
|
||||
}
|
||||
|
||||
// Message glues data and tags together.
|
||||
type Message struct {
|
||||
data interface{}
|
||||
tags map[string]string
|
||||
}
|
||||
|
||||
func NewMessage(data interface{}, tags map[string]string) Message {
|
||||
return Message{data, tags}
|
||||
}
|
||||
|
||||
// Data returns an original data published.
|
||||
func (msg Message) Data() interface{} {
|
||||
return msg.data
|
||||
}
|
||||
|
||||
// Tags returns tags, which matched the client's query.
|
||||
func (msg Message) Tags() map[string]string {
|
||||
return msg.tags
|
||||
}
|
@@ -42,11 +42,12 @@ func TestNodeStartStop(t *testing.T) {
|
||||
t.Logf("Started node %v", n.sw.NodeInfo())
|
||||
|
||||
// wait for the node to produce a block
|
||||
blockCh := make(chan interface{})
|
||||
err = n.EventBus().Subscribe(context.Background(), "node_test", types.EventQueryNewBlock, blockCh)
|
||||
blocksSub, err := n.EventBus().Subscribe(context.Background(), "node_test", types.EventQueryNewBlock)
|
||||
require.NoError(t, err)
|
||||
select {
|
||||
case <-blockCh:
|
||||
case <-blocksSub.Out():
|
||||
case <-blocksSub.Cancelled():
|
||||
t.Fatal("blocksSub was cancelled")
|
||||
case <-time.After(10 * time.Second):
|
||||
t.Fatal("timed out waiting for the node to produce a block")
|
||||
}
|
||||
|
@@ -59,32 +59,20 @@ func WaitForOneEvent(c EventsClient, evtTyp string, timeout time.Duration) (type
|
||||
const subscriber = "helpers"
|
||||
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
||||
defer cancel()
|
||||
evts := make(chan interface{}, 1)
|
||||
|
||||
// register for the next event of this type
|
||||
query := types.QueryForEvent(evtTyp)
|
||||
err := c.Subscribe(ctx, subscriber, query, evts)
|
||||
sub, err := c.Subscribe(ctx, subscriber, types.QueryForEvent(evtTyp))
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to subscribe")
|
||||
}
|
||||
|
||||
// make sure to unregister after the test is over
|
||||
defer func() {
|
||||
// drain evts to make sure we don't block
|
||||
LOOP:
|
||||
for {
|
||||
select {
|
||||
case <-evts:
|
||||
default:
|
||||
break LOOP
|
||||
}
|
||||
}
|
||||
c.UnsubscribeAll(ctx, subscriber)
|
||||
}()
|
||||
defer c.UnsubscribeAll(ctx, subscriber)
|
||||
|
||||
select {
|
||||
case evt := <-evts:
|
||||
return evt.(types.TMEventData), nil
|
||||
case msg := <-sub.Out():
|
||||
return msg.Data().(types.TMEventData), nil
|
||||
case <-sub.Cancelled():
|
||||
return nil, errors.New("subscription was cancelled")
|
||||
case <-ctx.Done():
|
||||
return nil, errors.New("timed out waiting for event")
|
||||
}
|
||||
|
@@ -249,6 +249,28 @@ func (c *HTTP) Validators(height *int64) (*ctypes.ResultValidators, error) {
|
||||
|
||||
/** websocket event stuff here... **/
|
||||
|
||||
type subscription struct {
|
||||
out chan tmpubsub.Message
|
||||
cancelled chan struct{}
|
||||
|
||||
mtx sync.RWMutex
|
||||
err error
|
||||
}
|
||||
|
||||
func (s *subscription) Out() <-chan tmpubsub.Message {
|
||||
return s.out
|
||||
}
|
||||
|
||||
func (s *subscription) Cancelled() <-chan struct{} {
|
||||
return s.cancelled
|
||||
}
|
||||
|
||||
func (s *subscription) Err() error {
|
||||
s.mtx.RLock()
|
||||
defer s.mtx.RUnlock()
|
||||
return s.err
|
||||
}
|
||||
|
||||
type WSEvents struct {
|
||||
cmn.BaseService
|
||||
cdc *amino.Codec
|
||||
@@ -256,8 +278,9 @@ type WSEvents struct {
|
||||
endpoint string
|
||||
ws *rpcclient.WSClient
|
||||
|
||||
mtx sync.RWMutex
|
||||
subscriptions map[string]chan<- interface{}
|
||||
mtx sync.RWMutex
|
||||
// query -> subscription
|
||||
subscriptions map[string]*subscription
|
||||
}
|
||||
|
||||
func newWSEvents(cdc *amino.Codec, remote, endpoint string) *WSEvents {
|
||||
@@ -265,7 +288,7 @@ func newWSEvents(cdc *amino.Codec, remote, endpoint string) *WSEvents {
|
||||
cdc: cdc,
|
||||
endpoint: endpoint,
|
||||
remote: remote,
|
||||
subscriptions: make(map[string]chan<- interface{}),
|
||||
subscriptions: make(map[string]*subscription),
|
||||
}
|
||||
|
||||
wsEvents.BaseService = *cmn.NewBaseService(nil, "WSEvents", wsEvents)
|
||||
@@ -295,21 +318,29 @@ func (w *WSEvents) OnStop() {
|
||||
}
|
||||
}
|
||||
|
||||
func (w *WSEvents) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, out chan<- interface{}) error {
|
||||
func (w *WSEvents) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, outCapacity ...int) (types.Subscription, error) {
|
||||
q := query.String()
|
||||
|
||||
err := w.ws.Subscribe(ctx, q)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
outCap := 1
|
||||
if len(outCapacity) > 0 && outCapacity[0] >= 0 {
|
||||
outCap = outCapacity[0]
|
||||
}
|
||||
|
||||
w.mtx.Lock()
|
||||
// subscriber param is ignored because Tendermint will override it with
|
||||
// remote IP anyway.
|
||||
w.subscriptions[q] = out
|
||||
w.subscriptions[q] = &subscription{
|
||||
out: make(chan tmpubsub.Message, outCap),
|
||||
cancelled: make(chan struct{}),
|
||||
}
|
||||
w.mtx.Unlock()
|
||||
|
||||
return nil
|
||||
return w.subscriptions[q], nil
|
||||
}
|
||||
|
||||
func (w *WSEvents) Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error {
|
||||
@@ -321,9 +352,12 @@ func (w *WSEvents) Unsubscribe(ctx context.Context, subscriber string, query tmp
|
||||
}
|
||||
|
||||
w.mtx.Lock()
|
||||
ch, ok := w.subscriptions[q]
|
||||
sub, ok := w.subscriptions[q]
|
||||
if ok {
|
||||
close(ch)
|
||||
close(sub.cancelled)
|
||||
sub.mtx.Lock()
|
||||
sub.err = errors.New("unsubscribed")
|
||||
sub.mtx.Unlock()
|
||||
delete(w.subscriptions, q)
|
||||
}
|
||||
w.mtx.Unlock()
|
||||
@@ -338,10 +372,13 @@ func (w *WSEvents) UnsubscribeAll(ctx context.Context, subscriber string) error
|
||||
}
|
||||
|
||||
w.mtx.Lock()
|
||||
for _, ch := range w.subscriptions {
|
||||
close(ch)
|
||||
for _, sub := range w.subscriptions {
|
||||
close(sub.cancelled)
|
||||
sub.mtx.Lock()
|
||||
sub.err = errors.New("unsubscribed")
|
||||
sub.mtx.Unlock()
|
||||
}
|
||||
w.subscriptions = make(map[string]chan<- interface{})
|
||||
w.subscriptions = make(map[string]*subscription)
|
||||
w.mtx.Unlock()
|
||||
|
||||
return nil
|
||||
@@ -381,8 +418,8 @@ func (w *WSEvents) eventListener() {
|
||||
// NOTE: writing also happens inside mutex so we can't close a channel in
|
||||
// Unsubscribe/UnsubscribeAll.
|
||||
w.mtx.RLock()
|
||||
if ch, ok := w.subscriptions[result.Query]; ok {
|
||||
ch <- result.Data
|
||||
if sub, ok := w.subscriptions[result.Query]; ok {
|
||||
sub.out <- tmpubsub.NewMessage(result.Data, result.Tags)
|
||||
}
|
||||
w.mtx.RUnlock()
|
||||
case <-w.Quit():
|
||||
|
@@ -140,8 +140,8 @@ func (Local) TxSearch(query string, prove bool, page, perPage int) (*ctypes.Resu
|
||||
return core.TxSearch(query, prove, page, perPage)
|
||||
}
|
||||
|
||||
func (c *Local) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, out chan<- interface{}) error {
|
||||
return c.EventBus.Subscribe(ctx, subscriber, query, out)
|
||||
func (c *Local) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, outCapacity ...int) (types.Subscription, error) {
|
||||
return c.EventBus.Subscribe(ctx, subscriber, query, outCapacity...)
|
||||
}
|
||||
|
||||
func (c *Local) Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error {
|
||||
|
@@ -101,16 +101,30 @@ func Subscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscri
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout)
|
||||
defer cancel()
|
||||
ch := make(chan interface{})
|
||||
err = eventBusFor(wsCtx).Subscribe(ctx, addr, q, ch)
|
||||
sub, err := eventBusFor(wsCtx).Subscribe(ctx, addr, q)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
go func() {
|
||||
for event := range ch {
|
||||
tmResult := &ctypes.ResultEvent{Query: query, Data: event.(tmtypes.TMEventData)}
|
||||
wsCtx.TryWriteRPCResponse(rpctypes.NewRPCSuccessResponse(wsCtx.Codec(), rpctypes.JSONRPCStringID(fmt.Sprintf("%v#event", wsCtx.Request.ID)), tmResult))
|
||||
for {
|
||||
select {
|
||||
case msg := <-sub.Out():
|
||||
resultEvent := &ctypes.ResultEvent{Query: query, Data: msg.Data(), Tags: msg.Tags()}
|
||||
wsCtx.TryWriteRPCResponse(
|
||||
rpctypes.NewRPCSuccessResponse(
|
||||
wsCtx.Codec(),
|
||||
rpctypes.JSONRPCStringID(fmt.Sprintf("%v#event", wsCtx.Request.ID)),
|
||||
resultEvent,
|
||||
))
|
||||
case <-sub.Cancelled():
|
||||
wsCtx.TryWriteRPCResponse(
|
||||
rpctypes.RPCServerError(rpctypes.JSONRPCStringID(
|
||||
fmt.Sprintf("%v#event", wsCtx.Request.ID)),
|
||||
fmt.Errorf("subscription was cancelled (reason: %v)", sub.Err()),
|
||||
))
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
|
@@ -169,26 +169,14 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
|
||||
// Subscribe to tx being committed in block.
|
||||
ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout)
|
||||
defer cancel()
|
||||
deliverTxResCh := make(chan interface{}, 1)
|
||||
q := types.EventQueryTxFor(tx)
|
||||
err := eventBus.Subscribe(ctx, "mempool", q, deliverTxResCh)
|
||||
deliverTxSub, err := eventBus.Subscribe(ctx, "mempool", q)
|
||||
if err != nil {
|
||||
err = errors.Wrap(err, "failed to subscribe to tx")
|
||||
logger.Error("Error on broadcast_tx_commit", "err", err)
|
||||
return nil, err
|
||||
}
|
||||
defer func() {
|
||||
// drain deliverTxResCh to make sure we don't block
|
||||
LOOP:
|
||||
for {
|
||||
select {
|
||||
case <-deliverTxResCh:
|
||||
default:
|
||||
break LOOP
|
||||
}
|
||||
}
|
||||
eventBus.Unsubscribe(context.Background(), "mempool", q)
|
||||
}()
|
||||
defer eventBus.Unsubscribe(context.Background(), "mempool", q)
|
||||
|
||||
// Broadcast tx and wait for CheckTx result
|
||||
checkTxResCh := make(chan *abci.Response, 1)
|
||||
@@ -213,17 +201,22 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
|
||||
// TODO: configurable?
|
||||
var deliverTxTimeout = rpcserver.WriteTimeout / 2
|
||||
select {
|
||||
case deliverTxResMsg, ok := <-deliverTxResCh: // The tx was included in a block.
|
||||
if !ok {
|
||||
return nil, errors.New("Error on broadcastTxCommit: expected DeliverTxResult, got nil. Did the Tendermint stop?")
|
||||
}
|
||||
deliverTxRes := deliverTxResMsg.(types.EventDataTx)
|
||||
case msg := <-deliverTxSub.Out(): // The tx was included in a block.
|
||||
deliverTxRes := msg.Data().(types.EventDataTx)
|
||||
return &ctypes.ResultBroadcastTxCommit{
|
||||
CheckTx: *checkTxRes,
|
||||
DeliverTx: deliverTxRes.Result,
|
||||
Hash: tx.Hash(),
|
||||
Height: deliverTxRes.Height,
|
||||
}, nil
|
||||
case <-deliverTxSub.Cancelled():
|
||||
err = errors.New("deliverTxSub was cancelled. Did the Tendermint stop?")
|
||||
logger.Error("Error on broadcastTxCommit", "err", err)
|
||||
return &ctypes.ResultBroadcastTxCommit{
|
||||
CheckTx: *checkTxRes,
|
||||
DeliverTx: abci.ResponseDeliverTx{},
|
||||
Hash: tx.Hash(),
|
||||
}, err
|
||||
case <-time.After(deliverTxTimeout):
|
||||
err = errors.New("Timed out waiting for tx to be included in a block")
|
||||
logger.Error("Error on broadcastTxCommit", "err", err)
|
||||
|
@@ -205,4 +205,5 @@ type (
|
||||
type ResultEvent struct {
|
||||
Query string `json:"query"`
|
||||
Data types.TMEventData `json:"data"`
|
||||
Tags map[string]string `json:"tags"`
|
||||
}
|
||||
|
@@ -526,6 +526,7 @@ func (wsc *wsConnection) OnStart() error {
|
||||
func (wsc *wsConnection) OnStop() {
|
||||
// Both read and write loops close the websocket connection when they exit their loops.
|
||||
// The writeChan is never closed, to allow WriteRPCResponse() to fail.
|
||||
|
||||
if wsc.eventSub != nil {
|
||||
wsc.eventSub.UnsubscribeAll(context.TODO(), wsc.remoteAddr)
|
||||
}
|
||||
|
@@ -12,6 +12,7 @@ import (
|
||||
amino "github.com/tendermint/go-amino"
|
||||
|
||||
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
|
||||
tmtypes "github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
// a wrapper to emulate a sum type: jsonrpcid = string | int
|
||||
@@ -244,19 +245,19 @@ type WSRPCConnection interface {
|
||||
Codec() *amino.Codec
|
||||
}
|
||||
|
||||
// EventSubscriber mirros tendermint/tendermint/types.EventBusSubscriber
|
||||
type EventSubscriber interface {
|
||||
Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, out chan<- interface{}) error
|
||||
Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error
|
||||
UnsubscribeAll(ctx context.Context, subscriber string) error
|
||||
}
|
||||
|
||||
// websocket-only RPCFuncs take this as the first parameter.
|
||||
type WSRPCContext struct {
|
||||
Request RPCRequest
|
||||
WSRPCConnection
|
||||
}
|
||||
|
||||
// EventSubscriber mirrors tendermint/tendermint/types.EventBusSubscriber
|
||||
type EventSubscriber interface {
|
||||
Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, outCapacity ...int) (tmtypes.Subscription, error)
|
||||
Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error
|
||||
UnsubscribeAll(ctx context.Context, subscriber string) error
|
||||
}
|
||||
|
||||
//----------------------------------------
|
||||
// SOCKETS
|
||||
//
|
||||
|
@@ -317,8 +317,7 @@ func TestEndBlockValidatorUpdates(t *testing.T) {
|
||||
defer eventBus.Stop()
|
||||
blockExec.SetEventBus(eventBus)
|
||||
|
||||
updatesCh := make(chan interface{}, 1)
|
||||
err = eventBus.Subscribe(context.Background(), "TestEndBlockValidatorUpdates", types.EventQueryValidatorSetUpdates, updatesCh)
|
||||
updatesSub, err := eventBus.Subscribe(context.Background(), "TestEndBlockValidatorUpdates", types.EventQueryValidatorSetUpdates)
|
||||
require.NoError(t, err)
|
||||
|
||||
block := makeBlock(state, 1)
|
||||
@@ -342,13 +341,15 @@ func TestEndBlockValidatorUpdates(t *testing.T) {
|
||||
|
||||
// test we threw an event
|
||||
select {
|
||||
case e := <-updatesCh:
|
||||
event, ok := e.(types.EventDataValidatorSetUpdates)
|
||||
require.True(t, ok, "Expected event of type EventDataValidatorSetUpdates, got %T", e)
|
||||
case msg := <-updatesSub.Out():
|
||||
event, ok := msg.Data().(types.EventDataValidatorSetUpdates)
|
||||
require.True(t, ok, "Expected event of type EventDataValidatorSetUpdates, got %T", msg.Data())
|
||||
if assert.NotEmpty(t, event.ValidatorUpdates) {
|
||||
assert.Equal(t, pubkey, event.ValidatorUpdates[0].PubKey)
|
||||
assert.EqualValues(t, 10, event.ValidatorUpdates[0].VotingPower)
|
||||
}
|
||||
case <-updatesSub.Cancelled():
|
||||
t.Fatalf("updatesSub was cancelled (reason: %v)", updatesSub.Err())
|
||||
case <-time.After(1 * time.Second):
|
||||
t.Fatal("Did not receive EventValidatorSetUpdates within 1 sec.")
|
||||
}
|
||||
|
@@ -31,35 +31,40 @@ func NewIndexerService(idr TxIndexer, eventBus *types.EventBus) *IndexerService
|
||||
// OnStart implements cmn.Service by subscribing for all transactions
|
||||
// and indexing them by tags.
|
||||
func (is *IndexerService) OnStart() error {
|
||||
blockHeadersCh := make(chan interface{})
|
||||
if err := is.eventBus.Subscribe(context.Background(), subscriber, types.EventQueryNewBlockHeader, blockHeadersCh); err != nil {
|
||||
// Use SubscribeUnbuffered here to ensure both subscriptions does not get
|
||||
// cancelled due to not pulling messages fast enough. Cause this might
|
||||
// sometimes happen when there are no other subscribers.
|
||||
|
||||
blockHeadersSub, err := is.eventBus.SubscribeUnbuffered(context.Background(), subscriber, types.EventQueryNewBlockHeader)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
txsCh := make(chan interface{})
|
||||
if err := is.eventBus.Subscribe(context.Background(), subscriber, types.EventQueryTx, txsCh); err != nil {
|
||||
txsSub, err := is.eventBus.SubscribeUnbuffered(context.Background(), subscriber, types.EventQueryTx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
go func() {
|
||||
for {
|
||||
e, ok := <-blockHeadersCh
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
header := e.(types.EventDataNewBlockHeader).Header
|
||||
msg := <-blockHeadersSub.Out()
|
||||
header := msg.Data().(types.EventDataNewBlockHeader).Header
|
||||
batch := NewBatch(header.NumTxs)
|
||||
for i := int64(0); i < header.NumTxs; i++ {
|
||||
e, ok := <-txsCh
|
||||
if !ok {
|
||||
is.Logger.Error("Failed to index all transactions due to closed transactions channel", "height", header.Height, "numTxs", header.NumTxs, "numProcessed", i)
|
||||
return
|
||||
msg2 := <-txsSub.Out()
|
||||
txResult := msg2.Data().(types.EventDataTx).TxResult
|
||||
if err = batch.Add(&txResult); err != nil {
|
||||
is.Logger.Error("Can't add tx to batch",
|
||||
"height", header.Height,
|
||||
"index", txResult.Index,
|
||||
"err", err)
|
||||
}
|
||||
txResult := e.(types.EventDataTx).TxResult
|
||||
batch.Add(&txResult)
|
||||
}
|
||||
is.idr.AddBatch(batch)
|
||||
is.Logger.Info("Indexed block", "height", header.Height)
|
||||
if err = is.idr.AddBatch(batch); err != nil {
|
||||
is.Logger.Error("Failed to index block", "height", header.Height, "err", err)
|
||||
} else {
|
||||
is.Logger.Info("Indexed block", "height", header.Height)
|
||||
}
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
|
64
state/txindex/indexer_service_test.go
Normal file
64
state/txindex/indexer_service_test.go
Normal file
@@ -0,0 +1,64 @@
|
||||
package txindex_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
abci "github.com/tendermint/tendermint/abci/types"
|
||||
"github.com/tendermint/tendermint/libs/db"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
"github.com/tendermint/tendermint/state/txindex"
|
||||
"github.com/tendermint/tendermint/state/txindex/kv"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
func TestIndexerServiceIndexesBlocks(t *testing.T) {
|
||||
// event bus
|
||||
eventBus := types.NewEventBus()
|
||||
eventBus.SetLogger(log.TestingLogger())
|
||||
err := eventBus.Start()
|
||||
require.NoError(t, err)
|
||||
defer eventBus.Stop()
|
||||
|
||||
// tx indexer
|
||||
store := db.NewMemDB()
|
||||
txIndexer := kv.NewTxIndex(store, kv.IndexAllTags())
|
||||
|
||||
service := txindex.NewIndexerService(txIndexer, eventBus)
|
||||
service.SetLogger(log.TestingLogger())
|
||||
err = service.Start()
|
||||
require.NoError(t, err)
|
||||
defer service.Stop()
|
||||
|
||||
// publish block with txs
|
||||
eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{
|
||||
Header: types.Header{Height: 1, NumTxs: 2},
|
||||
})
|
||||
txResult1 := &types.TxResult{
|
||||
Height: 1,
|
||||
Index: uint32(0),
|
||||
Tx: types.Tx("foo"),
|
||||
Result: abci.ResponseDeliverTx{Code: 0},
|
||||
}
|
||||
eventBus.PublishEventTx(types.EventDataTx{*txResult1})
|
||||
txResult2 := &types.TxResult{
|
||||
Height: 1,
|
||||
Index: uint32(1),
|
||||
Tx: types.Tx("bar"),
|
||||
Result: abci.ResponseDeliverTx{Code: 0},
|
||||
}
|
||||
eventBus.PublishEventTx(types.EventDataTx{*txResult2})
|
||||
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
// check the result
|
||||
res, err := txIndexer.Get(types.Tx("foo").Hash())
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, txResult1, res)
|
||||
res, err = txIndexer.Get(types.Tx("bar").Hash())
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, txResult2, res)
|
||||
}
|
@@ -12,11 +12,17 @@ import (
|
||||
const defaultCapacity = 0
|
||||
|
||||
type EventBusSubscriber interface {
|
||||
Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, out chan<- interface{}) error
|
||||
Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, outCapacity ...int) (Subscription, error)
|
||||
Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error
|
||||
UnsubscribeAll(ctx context.Context, subscriber string) error
|
||||
}
|
||||
|
||||
type Subscription interface {
|
||||
Out() <-chan tmpubsub.Message
|
||||
Cancelled() <-chan struct{}
|
||||
Err() error
|
||||
}
|
||||
|
||||
// EventBus is a common bus for all events going through the system. All calls
|
||||
// are proxied to underlying pubsub server. All events must be published using
|
||||
// EventBus to ensure correct data types.
|
||||
@@ -52,8 +58,14 @@ func (b *EventBus) OnStop() {
|
||||
b.pubsub.Stop()
|
||||
}
|
||||
|
||||
func (b *EventBus) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, out chan<- interface{}) error {
|
||||
return b.pubsub.Subscribe(ctx, subscriber, query, out)
|
||||
func (b *EventBus) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, outCapacity ...int) (Subscription, error) {
|
||||
return b.pubsub.Subscribe(ctx, subscriber, query, outCapacity...)
|
||||
}
|
||||
|
||||
// This method can be used for a local consensus explorer and synchronous
|
||||
// testing. Do not use for for public facing / untrusted subscriptions!
|
||||
func (b *EventBus) SubscribeUnbuffered(ctx context.Context, subscriber string, query tmpubsub.Query) (Subscription, error) {
|
||||
return b.pubsub.SubscribeUnbuffered(ctx, subscriber, query)
|
||||
}
|
||||
|
||||
func (b *EventBus) Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error {
|
||||
@@ -67,7 +79,7 @@ func (b *EventBus) UnsubscribeAll(ctx context.Context, subscriber string) error
|
||||
func (b *EventBus) Publish(eventType string, eventData TMEventData) error {
|
||||
// no explicit deadline for publishing events
|
||||
ctx := context.Background()
|
||||
b.pubsub.PublishWithTags(ctx, eventData, tmpubsub.NewTagMap(map[string]string{EventTypeKey: eventType}))
|
||||
b.pubsub.PublishWithTags(ctx, eventData, map[string]string{EventTypeKey: eventType})
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -95,7 +107,7 @@ func (b *EventBus) PublishEventNewBlock(data EventDataNewBlock) error {
|
||||
logIfTagExists(EventTypeKey, tags, b.Logger)
|
||||
tags[EventTypeKey] = EventNewBlock
|
||||
|
||||
b.pubsub.PublishWithTags(ctx, data, tmpubsub.NewTagMap(tags))
|
||||
b.pubsub.PublishWithTags(ctx, data, tags)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -111,7 +123,7 @@ func (b *EventBus) PublishEventNewBlockHeader(data EventDataNewBlockHeader) erro
|
||||
logIfTagExists(EventTypeKey, tags, b.Logger)
|
||||
tags[EventTypeKey] = EventNewBlockHeader
|
||||
|
||||
b.pubsub.PublishWithTags(ctx, data, tmpubsub.NewTagMap(tags))
|
||||
b.pubsub.PublishWithTags(ctx, data, tags)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -142,7 +154,7 @@ func (b *EventBus) PublishEventTx(data EventDataTx) error {
|
||||
logIfTagExists(TxHeightKey, tags, b.Logger)
|
||||
tags[TxHeightKey] = fmt.Sprintf("%d", data.Height)
|
||||
|
||||
b.pubsub.PublishWithTags(ctx, data, tmpubsub.NewTagMap(tags))
|
||||
b.pubsub.PublishWithTags(ctx, data, tags)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@@ -24,23 +24,20 @@ func TestEventBusPublishEventTx(t *testing.T) {
|
||||
tx := Tx("foo")
|
||||
result := abci.ResponseDeliverTx{Data: []byte("bar"), Tags: []cmn.KVPair{{Key: []byte("baz"), Value: []byte("1")}}}
|
||||
|
||||
txEventsCh := make(chan interface{})
|
||||
|
||||
// PublishEventTx adds all these 3 tags, so the query below should work
|
||||
query := fmt.Sprintf("tm.event='Tx' AND tx.height=1 AND tx.hash='%X' AND baz=1", tx.Hash())
|
||||
err = eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query), txEventsCh)
|
||||
txsSub, err := eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query))
|
||||
require.NoError(t, err)
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
for e := range txEventsCh {
|
||||
edt := e.(EventDataTx)
|
||||
assert.Equal(t, int64(1), edt.Height)
|
||||
assert.Equal(t, uint32(0), edt.Index)
|
||||
assert.Equal(t, tx, edt.Tx)
|
||||
assert.Equal(t, result, edt.Result)
|
||||
close(done)
|
||||
}
|
||||
msg := <-txsSub.Out()
|
||||
edt := msg.Data().(EventDataTx)
|
||||
assert.Equal(t, int64(1), edt.Height)
|
||||
assert.Equal(t, uint32(0), edt.Index)
|
||||
assert.Equal(t, tx, edt.Tx)
|
||||
assert.Equal(t, result, edt.Result)
|
||||
close(done)
|
||||
}()
|
||||
|
||||
err = eventBus.PublishEventTx(EventDataTx{TxResult{
|
||||
@@ -68,22 +65,19 @@ func TestEventBusPublishEventNewBlock(t *testing.T) {
|
||||
resultBeginBlock := abci.ResponseBeginBlock{Tags: []cmn.KVPair{{Key: []byte("baz"), Value: []byte("1")}}}
|
||||
resultEndBlock := abci.ResponseEndBlock{Tags: []cmn.KVPair{{Key: []byte("foz"), Value: []byte("2")}}}
|
||||
|
||||
txEventsCh := make(chan interface{})
|
||||
|
||||
// PublishEventNewBlock adds the tm.event tag, so the query below should work
|
||||
query := "tm.event='NewBlock' AND baz=1 AND foz=2"
|
||||
err = eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query), txEventsCh)
|
||||
blocksSub, err := eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query))
|
||||
require.NoError(t, err)
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
for e := range txEventsCh {
|
||||
edt := e.(EventDataNewBlock)
|
||||
assert.Equal(t, block, edt.Block)
|
||||
assert.Equal(t, resultBeginBlock, edt.ResultBeginBlock)
|
||||
assert.Equal(t, resultEndBlock, edt.ResultEndBlock)
|
||||
close(done)
|
||||
}
|
||||
msg := <-blocksSub.Out()
|
||||
edt := msg.Data().(EventDataNewBlock)
|
||||
assert.Equal(t, block, edt.Block)
|
||||
assert.Equal(t, resultBeginBlock, edt.ResultBeginBlock)
|
||||
assert.Equal(t, resultEndBlock, edt.ResultEndBlock)
|
||||
close(done)
|
||||
}()
|
||||
|
||||
err = eventBus.PublishEventNewBlock(EventDataNewBlock{
|
||||
@@ -110,22 +104,19 @@ func TestEventBusPublishEventNewBlockHeader(t *testing.T) {
|
||||
resultBeginBlock := abci.ResponseBeginBlock{Tags: []cmn.KVPair{{Key: []byte("baz"), Value: []byte("1")}}}
|
||||
resultEndBlock := abci.ResponseEndBlock{Tags: []cmn.KVPair{{Key: []byte("foz"), Value: []byte("2")}}}
|
||||
|
||||
txEventsCh := make(chan interface{})
|
||||
|
||||
// PublishEventNewBlockHeader adds the tm.event tag, so the query below should work
|
||||
query := "tm.event='NewBlockHeader' AND baz=1 AND foz=2"
|
||||
err = eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query), txEventsCh)
|
||||
headersSub, err := eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query))
|
||||
require.NoError(t, err)
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
for e := range txEventsCh {
|
||||
edt := e.(EventDataNewBlockHeader)
|
||||
assert.Equal(t, block.Header, edt.Header)
|
||||
assert.Equal(t, resultBeginBlock, edt.ResultBeginBlock)
|
||||
assert.Equal(t, resultEndBlock, edt.ResultEndBlock)
|
||||
close(done)
|
||||
}
|
||||
msg := <-headersSub.Out()
|
||||
edt := msg.Data().(EventDataNewBlockHeader)
|
||||
assert.Equal(t, block.Header, edt.Header)
|
||||
assert.Equal(t, resultBeginBlock, edt.ResultBeginBlock)
|
||||
assert.Equal(t, resultEndBlock, edt.ResultEndBlock)
|
||||
close(done)
|
||||
}()
|
||||
|
||||
err = eventBus.PublishEventNewBlockHeader(EventDataNewBlockHeader{
|
||||
@@ -148,18 +139,19 @@ func TestEventBusPublish(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
defer eventBus.Stop()
|
||||
|
||||
eventsCh := make(chan interface{})
|
||||
err = eventBus.Subscribe(context.Background(), "test", tmquery.Empty{}, eventsCh)
|
||||
const numEventsExpected = 14
|
||||
|
||||
sub, err := eventBus.Subscribe(context.Background(), "test", tmquery.Empty{}, numEventsExpected)
|
||||
require.NoError(t, err)
|
||||
|
||||
const numEventsExpected = 14
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
numEvents := 0
|
||||
for range eventsCh {
|
||||
for range sub.Out() {
|
||||
numEvents++
|
||||
if numEvents >= numEventsExpected {
|
||||
close(done)
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
@@ -243,15 +235,22 @@ func benchmarkEventBus(numClients int, randQueries bool, randEvents bool, b *tes
|
||||
q := EventQueryNewBlock
|
||||
|
||||
for i := 0; i < numClients; i++ {
|
||||
ch := make(chan interface{})
|
||||
go func() {
|
||||
for range ch {
|
||||
}
|
||||
}()
|
||||
if randQueries {
|
||||
q = randQuery()
|
||||
}
|
||||
eventBus.Subscribe(ctx, fmt.Sprintf("client-%d", i), q, ch)
|
||||
sub, err := eventBus.Subscribe(ctx, fmt.Sprintf("client-%d", i), q)
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-sub.Out():
|
||||
case <-sub.Cancelled():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
eventType := EventNewBlock
|
||||
|
Reference in New Issue
Block a user