This commit is contained in:
Anton Kaliaev
2019-01-30 20:32:53 +04:00
parent 641182e5d3
commit 54cc5100f8
21 changed files with 170 additions and 212 deletions

View File

@ -9,7 +9,6 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
cmn "github.com/tendermint/tendermint/libs/common" cmn "github.com/tendermint/tendermint/libs/common"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
"github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
@ -50,7 +49,7 @@ func TestByzantine(t *testing.T) {
switches[i].SetLogger(p2pLogger.With("validator", i)) switches[i].SetLogger(p2pLogger.With("validator", i))
} }
eventSubs := make([]*tmpubsub.Subscription, N) eventSubs := make([]types.Subscription, N)
reactors := make([]p2p.Reactor, N) reactors := make([]p2p.Reactor, N)
for i := 0; i < N; i++ { for i := 0; i < N; i++ {
// make first val byzantine // make first val byzantine

View File

@ -7,7 +7,6 @@ import (
"io/ioutil" "io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
"reflect"
"sort" "sort"
"sync" "sync"
"testing" "testing"
@ -220,22 +219,22 @@ func validatePrevoteAndPrecommit(t *testing.T, cs *ConsensusState, thisRound, lo
} }
// genesis // genesis
func subscribeToVoter(cs *ConsensusState, addr []byte) chan interface{} { func subscribeToVoter(cs *ConsensusState, addr []byte) <-chan tmpubsub.MsgAndTags {
voteCh0Sub, err := cs.eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryVote) voteCh0Sub, err := cs.eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryVote)
if err != nil { if err != nil {
panic(fmt.Sprintf("failed to subscribe %s to %v", testSubscriber, types.EventQueryVote)) panic(fmt.Sprintf("failed to subscribe %s to %v", testSubscriber, types.EventQueryVote))
} }
voteCh := make(chan interface{}) ch := make(chan tmpubsub.MsgAndTags)
go func() { go func() {
for msgAndTags := range voteCh0Sub.Out() { for mt := range voteCh0Sub.Out() {
vote := msgAndTags.Msg.(types.EventDataVote) vote := mt.Msg().(types.EventDataVote)
// we only fire for our own votes // we only fire for our own votes
if bytes.Equal(addr, vote.Vote.ValidatorAddress) { if bytes.Equal(addr, vote.Vote.ValidatorAddress) {
voteCh <- msgAndTags.Msg ch <- mt
} }
} }
}() }()
return voteCh return ch
} }
//------------------------------------------------------------------------------- //-------------------------------------------------------------------------------
@ -350,29 +349,21 @@ func ensureNoNewTimeout(stepCh <-chan tmpubsub.MsgAndTags, timeout int64) {
"We should be stuck waiting, not receiving NewTimeout event") "We should be stuck waiting, not receiving NewTimeout event")
} }
func ensureNewEvent( func ensureNewEvent(ch <-chan tmpubsub.MsgAndTags, height int64, round int, timeout time.Duration, errorMessage string) {
ch <-chan tmpubsub.MsgAndTags,
height int64,
round int,
timeout time.Duration,
errorMessage string) {
select { select {
case <-time.After(timeout): case <-time.After(timeout):
panic(errorMessage) panic(errorMessage)
case ev := <-ch: case mt := <-ch:
rs, ok := ev.Msg.(types.EventDataRoundState) roundStateEvent, ok := mt.Msg().(types.EventDataRoundState)
if !ok { if !ok {
panic( panic(fmt.Sprintf("expected a EventDataRoundState, got %T. Wrong subscription channel?",
fmt.Sprintf( mt.Msg()))
"expected a EventDataRoundState, got %v.Wrong subscription channel?",
reflect.TypeOf(rs)))
} }
if rs.Height != height { if roundStateEvent.Height != height {
panic(fmt.Sprintf("expected height %v, got %v", height, rs.Height)) panic(fmt.Sprintf("expected height %v, got %v", height, roundStateEvent.Height))
} }
if rs.Round != round { if roundStateEvent.Round != round {
panic(fmt.Sprintf("expected round %v, got %v", round, rs.Round)) panic(fmt.Sprintf("expected round %v, got %v", round, roundStateEvent.Round))
} }
// TODO: We could check also for a step at this point! // TODO: We could check also for a step at this point!
} }
@ -382,19 +373,17 @@ func ensureNewRound(roundCh <-chan tmpubsub.MsgAndTags, height int64, round int)
select { select {
case <-time.After(ensureTimeout): case <-time.After(ensureTimeout):
panic("Timeout expired while waiting for NewRound event") panic("Timeout expired while waiting for NewRound event")
case ev := <-roundCh: case mt := <-roundCh:
rs, ok := ev.Msg.(types.EventDataNewRound) newRoundEvent, ok := mt.Msg().(types.EventDataNewRound)
if !ok { if !ok {
panic( panic(fmt.Sprintf("expected a EventDataNewRound, got %T. Wrong subscription channel?",
fmt.Sprintf( mt.Msg()))
"expected a EventDataNewRound, got %v.Wrong subscription channel?",
reflect.TypeOf(rs)))
} }
if rs.Height != height { if newRoundEvent.Height != height {
panic(fmt.Sprintf("expected height %v, got %v", height, rs.Height)) panic(fmt.Sprintf("expected height %v, got %v", height, newRoundEvent.Height))
} }
if rs.Round != round { if newRoundEvent.Round != round {
panic(fmt.Sprintf("expected round %v, got %v", round, rs.Round)) panic(fmt.Sprintf("expected round %v, got %v", round, newRoundEvent.Round))
} }
} }
} }
@ -409,19 +398,17 @@ func ensureNewProposal(proposalCh <-chan tmpubsub.MsgAndTags, height int64, roun
select { select {
case <-time.After(ensureTimeout): case <-time.After(ensureTimeout):
panic("Timeout expired while waiting for NewProposal event") panic("Timeout expired while waiting for NewProposal event")
case ev := <-proposalCh: case mt := <-proposalCh:
rs, ok := ev.Msg.(types.EventDataCompleteProposal) proposalEvent, ok := mt.Msg().(types.EventDataCompleteProposal)
if !ok { if !ok {
panic( panic(fmt.Sprintf("expected a EventDataCompleteProposal, got %T. Wrong subscription channel?",
fmt.Sprintf( mt.Msg()))
"expected a EventDataCompleteProposal, got %v.Wrong subscription channel?",
reflect.TypeOf(rs)))
} }
if rs.Height != height { if proposalEvent.Height != height {
panic(fmt.Sprintf("expected height %v, got %v", height, rs.Height)) panic(fmt.Sprintf("expected height %v, got %v", height, proposalEvent.Height))
} }
if rs.Round != round { if proposalEvent.Round != round {
panic(fmt.Sprintf("expected round %v, got %v", round, rs.Round)) panic(fmt.Sprintf("expected round %v, got %v", round, proposalEvent.Round))
} }
} }
} }
@ -435,15 +422,14 @@ func ensureNewBlock(blockCh <-chan tmpubsub.MsgAndTags, height int64) {
select { select {
case <-time.After(ensureTimeout): case <-time.After(ensureTimeout):
panic("Timeout expired while waiting for NewBlock event") panic("Timeout expired while waiting for NewBlock event")
case ev := <-blockCh: case mt := <-blockCh:
block, ok := ev.Msg.(types.EventDataNewBlock) blockEvent, ok := mt.Msg().(types.EventDataNewBlock)
if !ok { if !ok {
panic(fmt.Sprintf("expected a *types.EventDataNewBlock, "+ panic(fmt.Sprintf("expected a EventDataNewBlock, got %T. Wrong subscription channel?",
"got %v. wrong subscription channel?", mt.Msg()))
reflect.TypeOf(block)))
} }
if block.Block.Height != height { if blockEvent.Block.Height != height {
panic(fmt.Sprintf("expected height %v, got %v", height, block.Block.Height)) panic(fmt.Sprintf("expected height %v, got %v", height, blockEvent.Block.Height))
} }
} }
} }
@ -452,18 +438,17 @@ func ensureNewBlockHeader(blockCh <-chan tmpubsub.MsgAndTags, height int64, bloc
select { select {
case <-time.After(ensureTimeout): case <-time.After(ensureTimeout):
panic("Timeout expired while waiting for NewBlockHeader event") panic("Timeout expired while waiting for NewBlockHeader event")
case ev := <-blockCh: case mt := <-blockCh:
blockHeader, ok := ev.Msg.(types.EventDataNewBlockHeader) blockHeaderEvent, ok := mt.Msg().(types.EventDataNewBlockHeader)
if !ok { if !ok {
panic(fmt.Sprintf("expected a *types.EventDataNewBlockHeader, "+ panic(fmt.Sprintf("expected a EventDataNewBlockHeader, got %T. Wrong subscription channel?",
"got %v. wrong subscription channel?", mt.Msg()))
reflect.TypeOf(blockHeader)))
} }
if blockHeader.Header.Height != height { if blockHeaderEvent.Header.Height != height {
panic(fmt.Sprintf("expected height %v, got %v", height, blockHeader.Header.Height)) panic(fmt.Sprintf("expected height %v, got %v", height, blockHeaderEvent.Header.Height))
} }
if !bytes.Equal(blockHeader.Header.Hash(), blockHash) { if !bytes.Equal(blockHeaderEvent.Header.Hash(), blockHash) {
panic(fmt.Sprintf("expected header %X, got %X", blockHash, blockHeader.Header.Hash())) panic(fmt.Sprintf("expected header %X, got %X", blockHash, blockHeaderEvent.Header.Hash()))
} }
} }
} }
@ -473,51 +458,48 @@ func ensureNewUnlock(unlockCh <-chan tmpubsub.MsgAndTags, height int64, round in
"Timeout expired while waiting for NewUnlock event") "Timeout expired while waiting for NewUnlock event")
} }
func ensureProposal(proposalCh <-chan tmpubsub.MsgAndTags, height int64, round int, propId types.BlockID) { func ensureProposal(proposalCh <-chan tmpubsub.MsgAndTags, height int64, round int, propID types.BlockID) {
select { select {
case <-time.After(ensureTimeout): case <-time.After(ensureTimeout):
panic("Timeout expired while waiting for NewProposal event") panic("Timeout expired while waiting for NewProposal event")
case ev := <-proposalCh: case mt := <-proposalCh:
rs, ok := ev.Msg.(types.EventDataCompleteProposal) proposalEvent, ok := mt.Msg().(types.EventDataCompleteProposal)
if !ok { if !ok {
panic( panic(fmt.Sprintf("expected a EventDataCompleteProposal, got %T. Wrong subscription channel?",
fmt.Sprintf( mt.Msg()))
"expected a EventDataCompleteProposal, got %v.Wrong subscription channel?",
reflect.TypeOf(rs)))
} }
if rs.Height != height { if proposalEvent.Height != height {
panic(fmt.Sprintf("expected height %v, got %v", height, rs.Height)) panic(fmt.Sprintf("expected height %v, got %v", height, proposalEvent.Height))
} }
if rs.Round != round { if proposalEvent.Round != round {
panic(fmt.Sprintf("expected round %v, got %v", round, rs.Round)) panic(fmt.Sprintf("expected round %v, got %v", round, proposalEvent.Round))
} }
if !rs.BlockID.Equals(propId) { if !proposalEvent.BlockID.Equals(propID) {
panic("Proposed block does not match expected block") panic("Proposed block does not match expected block")
} }
} }
} }
func ensurePrecommit(voteCh <-chan interface{}, height int64, round int) { func ensurePrecommit(voteCh <-chan tmpubsub.MsgAndTags, height int64, round int) {
ensureVote(voteCh, height, round, types.PrecommitType) ensureVote(voteCh, height, round, types.PrecommitType)
} }
func ensurePrevote(voteCh <-chan interface{}, height int64, round int) { func ensurePrevote(voteCh <-chan tmpubsub.MsgAndTags, height int64, round int) {
ensureVote(voteCh, height, round, types.PrevoteType) ensureVote(voteCh, height, round, types.PrevoteType)
} }
func ensureVote(voteCh <-chan interface{}, height int64, round int, func ensureVote(voteCh <-chan tmpubsub.MsgAndTags, height int64, round int,
voteType types.SignedMsgType) { voteType types.SignedMsgType) {
select { select {
case <-time.After(ensureTimeout): case <-time.After(ensureTimeout):
panic("Timeout expired while waiting for NewVote event") panic("Timeout expired while waiting for NewVote event")
case v := <-voteCh: case mt := <-voteCh:
edv, ok := v.(types.EventDataVote) voteEvent, ok := mt.Msg().(types.EventDataVote)
if !ok { if !ok {
panic(fmt.Sprintf("expected a *types.Vote, "+ panic(fmt.Sprintf("expected a EventDataVote, got %T. Wrong subscription channel?",
"got %v. wrong subscription channel?", mt.Msg()))
reflect.TypeOf(v)))
} }
vote := edv.Vote vote := voteEvent.Vote
if vote.Height != height { if vote.Height != height {
panic(fmt.Sprintf("expected height %v, got %v", height, vote.Height)) panic(fmt.Sprintf("expected height %v, got %v", height, vote.Height))
} }

View File

@ -117,9 +117,9 @@ func TestMempoolTxConcurrentWithCommit(t *testing.T) {
for nTxs := 0; nTxs < NTxs; { for nTxs := 0; nTxs < NTxs; {
ticker := time.NewTicker(time.Second * 30) ticker := time.NewTicker(time.Second * 30)
select { select {
case b := <-newBlockCh: case mt := <-newBlockCh:
evt := b.Msg.(types.EventDataNewBlock) blockEvent := mt.Msg().(types.EventDataNewBlock)
nTxs += int(evt.Block.Header.NumTxs) nTxs += int(blockEvent.Block.Header.NumTxs)
case <-ticker.C: case <-ticker.C:
panic("Timed out waiting to commit blocks with transactions") panic("Timed out waiting to commit blocks with transactions")
} }

View File

@ -21,7 +21,6 @@ import (
cfg "github.com/tendermint/tendermint/config" cfg "github.com/tendermint/tendermint/config"
dbm "github.com/tendermint/tendermint/libs/db" dbm "github.com/tendermint/tendermint/libs/db"
"github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/log"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
mempl "github.com/tendermint/tendermint/mempool" mempl "github.com/tendermint/tendermint/mempool"
"github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/p2p"
sm "github.com/tendermint/tendermint/state" sm "github.com/tendermint/tendermint/state"
@ -37,12 +36,11 @@ func init() {
func startConsensusNet(t *testing.T, css []*ConsensusState, N int) ( func startConsensusNet(t *testing.T, css []*ConsensusState, N int) (
[]*ConsensusReactor, []*ConsensusReactor,
[]*tmpubsub.Subscription, []types.Subscription,
[]*types.EventBus, []*types.EventBus,
) { ) {
var err error
reactors := make([]*ConsensusReactor, N) reactors := make([]*ConsensusReactor, N)
eventSubs := make([]*tmpubsub.Subscription, N) blocksSubs := make([]types.Subscription, 0)
eventBuses := make([]*types.EventBus, N) eventBuses := make([]*types.EventBus, N)
for i := 0; i < N; i++ { for i := 0; i < N; i++ {
/*logger, err := tmflags.ParseLogLevel("consensus:info,*:error", logger, "info") /*logger, err := tmflags.ParseLogLevel("consensus:info,*:error", logger, "info")
@ -54,8 +52,9 @@ func startConsensusNet(t *testing.T, css []*ConsensusState, N int) (
eventBuses[i] = css[i].eventBus eventBuses[i] = css[i].eventBus
reactors[i].SetEventBus(eventBuses[i]) reactors[i].SetEventBus(eventBuses[i])
eventSubs[i], err = eventBuses[i].Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock) blocksSub, err := eventBuses[i].Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock)
require.NoError(t, err) require.NoError(t, err)
blocksSubs = append(blocksSubs, blocksSub)
} }
// make connected switches and start all reactors // make connected switches and start all reactors
p2p.MakeConnectedSwitches(config.P2P, N, func(i int, s *p2p.Switch) *p2p.Switch { p2p.MakeConnectedSwitches(config.P2P, N, func(i int, s *p2p.Switch) *p2p.Switch {
@ -72,7 +71,7 @@ func startConsensusNet(t *testing.T, css []*ConsensusState, N int) (
s := reactors[i].conS.GetState() s := reactors[i].conS.GetState()
reactors[i].SwitchToConsensus(s, 0) reactors[i].SwitchToConsensus(s, 0)
} }
return reactors, eventSubs, eventBuses return reactors, blocksSubs, eventBuses
} }
func stopConsensusNet(logger log.Logger, reactors []*ConsensusReactor, eventBuses []*types.EventBus) { func stopConsensusNet(logger log.Logger, reactors []*ConsensusReactor, eventBuses []*types.EventBus) {
@ -173,15 +172,15 @@ func TestReactorWithEvidence(t *testing.T) {
// wait till everyone makes the first new block with no evidence // wait till everyone makes the first new block with no evidence
timeoutWaitGroup(t, nValidators, func(j int) { timeoutWaitGroup(t, nValidators, func(j int) {
blockI := <-eventSubs[j].Out() mt := <-eventSubs[j].Out()
block := blockI.Msg.(types.EventDataNewBlock).Block block := mt.Msg().(types.EventDataNewBlock).Block
assert.True(t, len(block.Evidence.Evidence) == 0) assert.True(t, len(block.Evidence.Evidence) == 0)
}, css) }, css)
// second block should have evidence // second block should have evidence
timeoutWaitGroup(t, nValidators, func(j int) { timeoutWaitGroup(t, nValidators, func(j int) {
blockI := <-eventSubs[j].Out() mt := <-eventSubs[j].Out()
block := blockI.Msg.(types.EventDataNewBlock).Block block := mt.Msg().(types.EventDataNewBlock).Block
assert.True(t, len(block.Evidence.Evidence) > 0) assert.True(t, len(block.Evidence.Evidence) > 0)
}, css) }, css)
} }
@ -445,17 +444,14 @@ func waitForAndValidateBlock(
t *testing.T, t *testing.T,
n int, n int,
activeVals map[string]struct{}, activeVals map[string]struct{},
eventSubs []*tmpubsub.Subscription, eventSubs []types.Subscription,
css []*ConsensusState, css []*ConsensusState,
txs ...[]byte, txs ...[]byte,
) { ) {
timeoutWaitGroup(t, n, func(j int) { timeoutWaitGroup(t, n, func(j int) {
css[j].Logger.Debug("waitForAndValidateBlock") css[j].Logger.Debug("waitForAndValidateBlock")
newBlockI, ok := <-eventSubs[j].Out() mt := <-eventSubs[j].Out()
if !ok { newBlock := mt.Msg().(types.EventDataNewBlock).Block
return
}
newBlock := newBlockI.Msg.(types.EventDataNewBlock).Block
css[j].Logger.Debug("waitForAndValidateBlock: Got block", "height", newBlock.Height) css[j].Logger.Debug("waitForAndValidateBlock: Got block", "height", newBlock.Height)
err := validateBlock(newBlock, activeVals) err := validateBlock(newBlock, activeVals)
assert.Nil(t, err) assert.Nil(t, err)
@ -470,7 +466,7 @@ func waitForAndValidateBlockWithTx(
t *testing.T, t *testing.T,
n int, n int,
activeVals map[string]struct{}, activeVals map[string]struct{},
eventSubs []*tmpubsub.Subscription, eventSubs []types.Subscription,
css []*ConsensusState, css []*ConsensusState,
txs ...[]byte, txs ...[]byte,
) { ) {
@ -479,11 +475,8 @@ func waitForAndValidateBlockWithTx(
BLOCK_TX_LOOP: BLOCK_TX_LOOP:
for { for {
css[j].Logger.Debug("waitForAndValidateBlockWithTx", "ntxs", ntxs) css[j].Logger.Debug("waitForAndValidateBlockWithTx", "ntxs", ntxs)
newBlockI, ok := <-eventSubs[j].Out() mt := <-eventSubs[j].Out()
if !ok { newBlock := mt.Msg().(types.EventDataNewBlock).Block
return
}
newBlock := newBlockI.Msg.(types.EventDataNewBlock).Block
css[j].Logger.Debug("waitForAndValidateBlockWithTx: Got block", "height", newBlock.Height) css[j].Logger.Debug("waitForAndValidateBlockWithTx: Got block", "height", newBlock.Height)
err := validateBlock(newBlock, activeVals) err := validateBlock(newBlock, activeVals)
assert.Nil(t, err) assert.Nil(t, err)
@ -508,7 +501,7 @@ func waitForBlockWithUpdatedValsAndValidateIt(
t *testing.T, t *testing.T,
n int, n int,
updatedVals map[string]struct{}, updatedVals map[string]struct{},
eventSubs []*tmpubsub.Subscription, eventSubs []types.Subscription,
css []*ConsensusState, css []*ConsensusState,
) { ) {
timeoutWaitGroup(t, n, func(j int) { timeoutWaitGroup(t, n, func(j int) {
@ -517,11 +510,8 @@ func waitForBlockWithUpdatedValsAndValidateIt(
LOOP: LOOP:
for { for {
css[j].Logger.Debug("waitForBlockWithUpdatedValsAndValidateIt") css[j].Logger.Debug("waitForBlockWithUpdatedValsAndValidateIt")
newBlockI, ok := <-eventSubs[j].Out() mt := <-eventSubs[j].Out()
if !ok { newBlock = mt.Msg().(types.EventDataNewBlock).Block
return
}
newBlock = newBlockI.Msg.(types.EventDataNewBlock).Block
if newBlock.LastCommit.Size() == len(updatedVals) { if newBlock.LastCommit.Size() == len(updatedVals) {
css[j].Logger.Debug("waitForBlockWithUpdatedValsAndValidateIt: Got block", "height", newBlock.Height) css[j].Logger.Debug("waitForBlockWithUpdatedValsAndValidateIt: Got block", "height", newBlock.Height)
break LOOP break LOOP

View File

@ -17,7 +17,6 @@ import (
dbm "github.com/tendermint/tendermint/libs/db" dbm "github.com/tendermint/tendermint/libs/db"
"github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/log"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
"github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/proxy"
sm "github.com/tendermint/tendermint/state" sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
@ -43,7 +42,7 @@ var crc32c = crc32.MakeTable(crc32.Castagnoli)
// Unmarshal and apply a single message to the consensus state as if it were // Unmarshal and apply a single message to the consensus state as if it were
// received in receiveRoutine. Lines that start with "#" are ignored. // received in receiveRoutine. Lines that start with "#" are ignored.
// NOTE: receiveRoutine should not be running. // NOTE: receiveRoutine should not be running.
func (cs *ConsensusState) readReplayMessage(msg *TimedWALMessage, newStepSub *tmpubsub.Subscription) error { func (cs *ConsensusState) readReplayMessage(msg *TimedWALMessage, newStepSub types.Subscription) error {
// Skip meta messages which exist for demarcating boundaries. // Skip meta messages which exist for demarcating boundaries.
if _, ok := msg.Msg.(EndHeightMessage); ok { if _, ok := msg.Msg.(EndHeightMessage); ok {
return nil return nil
@ -57,8 +56,8 @@ func (cs *ConsensusState) readReplayMessage(msg *TimedWALMessage, newStepSub *tm
ticker := time.After(time.Second * 2) ticker := time.After(time.Second * 2)
if newStepSub != nil { if newStepSub != nil {
select { select {
case mi := <-newStepSub.Out(): case mt := <-newStepSub.Out():
m2 := mi.Msg.(types.EventDataRoundState) m2 := mt.Msg().(types.EventDataRoundState)
if m.Height != m2.Height || m.Round != m2.Round || m.Step != m2.Step { if m.Height != m2.Height || m.Round != m2.Round || m.Step != m2.Step {
return fmt.Errorf("RoundState mismatch. Got %v; Expected %v", m2, m) return fmt.Errorf("RoundState mismatch. Got %v; Expected %v", m2, m)
} }

View File

@ -16,7 +16,6 @@ import (
cmn "github.com/tendermint/tendermint/libs/common" cmn "github.com/tendermint/tendermint/libs/common"
dbm "github.com/tendermint/tendermint/libs/db" dbm "github.com/tendermint/tendermint/libs/db"
"github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/log"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
"github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/proxy"
sm "github.com/tendermint/tendermint/state" sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
@ -122,7 +121,7 @@ func newPlayback(fileName string, fp *os.File, cs *ConsensusState, genState sm.S
} }
// go back count steps by resetting the state and running (pb.count - count) steps // go back count steps by resetting the state and running (pb.count - count) steps
func (pb *playback) replayReset(count int, newStepSub *tmpubsub.Subscription) error { func (pb *playback) replayReset(count int, newStepSub types.Subscription) error {
pb.cs.Stop() pb.cs.Stop()
pb.cs.Wait() pb.cs.Wait()

View File

@ -131,10 +131,14 @@ func (s *Server) BufferCapacity() int {
return s.cmdsCap return s.cmdsCap
} }
// Subscribe creates a subscription for the given client. An error will be // Subscribe creates a subscription for the given client.
// returned to the caller if the context is canceled or if subscription already //
// exist for pair clientID and query. outCapacity can be used to set a // An error will be returned to the caller if the context is canceled or if
// capacity for Subscription#Out channel (1 by default). // subscription already exist for pair clientID and query.
//
// outCapacity can be used to set a capacity for Subscription#Out channel (1 by
// default). Panics if outCapacity is less than or equal to zero. If you want
// an unbuffered channel, use SubscribeUnbuffered.
func (s *Server) Subscribe(ctx context.Context, clientID string, query Query, outCapacity ...int) (*Subscription, error) { func (s *Server) Subscribe(ctx context.Context, clientID string, query Query, outCapacity ...int) (*Subscription, error) {
outCap := 1 outCap := 1
if len(outCapacity) > 0 { if len(outCapacity) > 0 {

View File

@ -306,7 +306,7 @@ func benchmarkNClientsOneQuery(n int, b *testing.B) {
func assertReceive(t *testing.T, expected interface{}, ch <-chan pubsub.MsgAndTags, msgAndArgs ...interface{}) { func assertReceive(t *testing.T, expected interface{}, ch <-chan pubsub.MsgAndTags, msgAndArgs ...interface{}) {
select { select {
case actual := <-ch: case actual := <-ch:
assert.Equal(t, expected, actual.Msg, msgAndArgs...) assert.Equal(t, expected, actual.Msg(), msgAndArgs...)
case <-time.After(1 * time.Second): case <-time.After(1 * time.Second):
t.Errorf("Expected to receive %v from the channel, got nothing after 1s", expected) t.Errorf("Expected to receive %v from the channel, got nothing after 1s", expected)
debug.PrintStack() debug.PrintStack()

View File

@ -55,6 +55,16 @@ func (s *Subscription) Err() error {
// MsgAndTags glues a message and tags together. // MsgAndTags glues a message and tags together.
type MsgAndTags struct { type MsgAndTags struct {
Msg interface{} msg interface{}
Tags TagMap tags TagMap
}
// Msg returns a message.
func (mt MsgAndTags) Msg() interface{} {
return mt.msg
}
// Tags returns tags.
func (mt MsgAndTags) Tags() TagMap {
return mt.tags
} }

View File

@ -676,7 +676,11 @@ func (n *Node) startRPC() ([]net.Listener, error) {
for i, listenAddr := range listenAddrs { for i, listenAddr := range listenAddrs {
mux := http.NewServeMux() mux := http.NewServeMux()
rpcLogger := n.Logger.With("module", "rpc-server") rpcLogger := n.Logger.With("module", "rpc-server")
wm := rpcserver.NewWebsocketManager(rpccore.Routes, coreCodec, rpcserver.EventSubscriber(n.eventBus)) wm := rpcserver.NewWebsocketManager(rpccore.Routes, coreCodec, rpcserver.DisconnectCallback(func(remoteAddr string) {
// Unsubscribe a client upon disconnect since it won't be able to do it
// itself.
n.eventBus.UnsubscribeAll(context.TODO(), remoteAddr)
}))
wm.SetLogger(rpcLogger.With("protocol", "websocket")) wm.SetLogger(rpcLogger.With("protocol", "websocket"))
mux.HandleFunc("/websocket", wm.WebsocketHandler) mux.HandleFunc("/websocket", wm.WebsocketHandler)
rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, coreCodec, rpcLogger) rpcserver.RegisterRPCFuncs(mux, rpccore.Routes, coreCodec, rpcLogger)

View File

@ -41,11 +41,12 @@ func TestNodeStartStop(t *testing.T) {
t.Logf("Started node %v", n.sw.NodeInfo()) t.Logf("Started node %v", n.sw.NodeInfo())
// wait for the node to produce a block // wait for the node to produce a block
blockCh := make(chan interface{}) blocksSub, err := n.EventBus().Subscribe(context.Background(), "node_test", types.EventQueryNewBlock)
err = n.EventBus().Subscribe(context.Background(), "node_test", types.EventQueryNewBlock, blockCh)
require.NoError(t, err) require.NoError(t, err)
select { select {
case <-blockCh: case <-blocksSub.Out():
case <-blocksSub.Cancelled():
t.Fatal("blocksSub was cancelled")
case <-time.After(10 * time.Second): case <-time.After(10 * time.Second):
t.Fatal("timed out waiting for the node to produce a block") t.Fatal("timed out waiting for the node to produce a block")
} }

View File

@ -62,29 +62,19 @@ func WaitForOneEvent(c EventsClient, evtTyp string, timeout time.Duration) (type
evts := make(chan interface{}, 1) evts := make(chan interface{}, 1)
// register for the next event of this type // register for the next event of this type
query := types.QueryForEvent(evtTyp) sub, err := c.Subscribe(ctx, subscriber, types.QueryForEvent(evtTyp))
err := c.Subscribe(ctx, subscriber, query, evts)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "failed to subscribe") return nil, errors.Wrap(err, "failed to subscribe")
} }
// make sure to unregister after the test is over // make sure to unregister after the test is over
defer func() { defer c.UnsubscribeAll(ctx, subscriber)
// drain evts to make sure we don't block
LOOP:
for {
select {
case <-evts:
default:
break LOOP
}
}
c.UnsubscribeAll(ctx, subscriber)
}()
select { select {
case evt := <-evts: case mt := <-sub.Out():
return evt.(types.TMEventData), nil return mt.Msg().(types.TMEventData), nil
case <-sub.Cancelled():
return nil, errors.New("subscription was cancelled")
case <-ctx.Done(): case <-ctx.Done():
return nil, errors.New("timed out waiting for event") return nil, errors.New("timed out waiting for event")
} }

View File

@ -140,8 +140,8 @@ func (Local) TxSearch(query string, prove bool, page, perPage int) (*ctypes.Resu
return core.TxSearch(query, prove, page, perPage) return core.TxSearch(query, prove, page, perPage)
} }
func (c *Local) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, out chan<- interface{}) error { func (c *Local) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, outCapacity ...int) (types.Subscription, error) {
return c.EventBus.Subscribe(ctx, subscriber, query, out) return c.EventBus.Subscribe(ctx, subscriber, query, outCapacity...)
} }
func (c *Local) Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error { func (c *Local) Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error {

View File

@ -101,7 +101,7 @@ func Subscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscri
ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout) ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout)
defer cancel() defer cancel()
sub, err := eventBusFor(wsCtx).Subscribe(ctx, addr, q) sub, err := eventBus.Subscribe(ctx, addr, q)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -109,13 +109,13 @@ func Subscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscri
go func() { go func() {
for { for {
select { select {
case event := <-sub.Out(): case mt := <-sub.Out():
tmResult := &ctypes.ResultEvent{query, event.Msg.(tmtypes.TMEventData)} resultEvent := &ctypes.ResultEvent{query, mt.Msg().(tmtypes.TMEventData)}
wsCtx.TryWriteRPCResponse( wsCtx.TryWriteRPCResponse(
rpctypes.NewRPCSuccessResponse( rpctypes.NewRPCSuccessResponse(
wsCtx.Codec(), wsCtx.Codec(),
rpctypes.JSONRPCStringID(fmt.Sprintf("%v#event", wsCtx.Request.ID)), rpctypes.JSONRPCStringID(fmt.Sprintf("%v#event", wsCtx.Request.ID)),
tmResult, resultEvent,
)) ))
case <-sub.Cancelled(): case <-sub.Cancelled():
wsCtx.TryWriteRPCResponse( wsCtx.TryWriteRPCResponse(
@ -168,7 +168,7 @@ func Unsubscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultUnsub
if err != nil { if err != nil {
return nil, errors.Wrap(err, "failed to parse query") return nil, errors.Wrap(err, "failed to parse query")
} }
err = eventBusFor(wsCtx).Unsubscribe(context.Background(), addr, q) err = eventBus.Unsubscribe(context.Background(), addr, q)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -202,17 +202,9 @@ func Unsubscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultUnsub
func UnsubscribeAll(wsCtx rpctypes.WSRPCContext) (*ctypes.ResultUnsubscribe, error) { func UnsubscribeAll(wsCtx rpctypes.WSRPCContext) (*ctypes.ResultUnsubscribe, error) {
addr := wsCtx.GetRemoteAddr() addr := wsCtx.GetRemoteAddr()
logger.Info("Unsubscribe from all", "remote", addr) logger.Info("Unsubscribe from all", "remote", addr)
err := eventBusFor(wsCtx).UnsubscribeAll(context.Background(), addr) err := eventBus.UnsubscribeAll(context.Background(), addr)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &ctypes.ResultUnsubscribe{}, nil return &ctypes.ResultUnsubscribe{}, nil
} }
func eventBusFor(wsCtx rpctypes.WSRPCContext) tmtypes.EventBusSubscriber {
es := wsCtx.GetEventSubscriber()
if es == nil {
es = eventBus
}
return es
}

View File

@ -201,8 +201,8 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
// TODO: configurable? // TODO: configurable?
var deliverTxTimeout = rpcserver.WriteTimeout / 2 var deliverTxTimeout = rpcserver.WriteTimeout / 2
select { select {
case deliverTxResMsg := <-deliverTxSub.Out(): // The tx was included in a block. case mt := <-deliverTxSub.Out(): // The tx was included in a block.
deliverTxRes := deliverTxResMsg.Msg.(types.EventDataTx) deliverTxRes := mt.Msg().(types.EventDataTx)
return &ctypes.ResultBroadcastTxCommit{ return &ctypes.ResultBroadcastTxCommit{
CheckTx: *checkTxRes, CheckTx: *checkTxRes,
DeliverTx: deliverTxRes.Result, DeliverTx: deliverTxRes.Result,

View File

@ -2,7 +2,6 @@ package rpcserver
import ( import (
"bytes" "bytes"
"context"
"encoding/hex" "encoding/hex"
"encoding/json" "encoding/json"
"fmt" "fmt"
@ -434,8 +433,8 @@ type wsConnection struct {
// Send pings to server with this period. Must be less than readWait, but greater than zero. // Send pings to server with this period. Must be less than readWait, but greater than zero.
pingPeriod time.Duration pingPeriod time.Duration
// object that is used to subscribe / unsubscribe from events // see DisconnectCallback option.
eventSub types.EventSubscriber disconnectCallback func(remoteAddr string)
} }
// NewWSConnection wraps websocket.Conn. // NewWSConnection wraps websocket.Conn.
@ -468,12 +467,11 @@ func NewWSConnection(
return wsc return wsc
} }
// EventSubscriber sets object that is used to subscribe / unsubscribe from // DisconnectCallback can be used optionally to set a callback, which will be
// events - not Goroutine-safe. If none given, default node's eventBus will be // called upon disconnect - not Goroutine-safe.
// used. func DisconnectCallback(cb func(remoteAddr string)) func(*wsConnection) {
func EventSubscriber(eventSub types.EventSubscriber) func(*wsConnection) {
return func(wsc *wsConnection) { return func(wsc *wsConnection) {
wsc.eventSub = eventSub wsc.disconnectCallback = cb
} }
} }
@ -526,8 +524,8 @@ func (wsc *wsConnection) OnStart() error {
func (wsc *wsConnection) OnStop() { func (wsc *wsConnection) OnStop() {
// Both read and write loops close the websocket connection when they exit their loops. // Both read and write loops close the websocket connection when they exit their loops.
// The writeChan is never closed, to allow WriteRPCResponse() to fail. // The writeChan is never closed, to allow WriteRPCResponse() to fail.
if wsc.eventSub != nil { if wsc.disconnectCallback != nil {
wsc.eventSub.UnsubscribeAll(context.TODO(), wsc.remoteAddr) wsc.disconnectCallback(wsc.remoteAddr)
} }
} }
@ -537,11 +535,6 @@ func (wsc *wsConnection) GetRemoteAddr() string {
return wsc.remoteAddr return wsc.remoteAddr
} }
// GetEventSubscriber implements WSRPCConnection by returning event subscriber.
func (wsc *wsConnection) GetEventSubscriber() types.EventSubscriber {
return wsc.eventSub
}
// WriteRPCResponse pushes a response to the writeChan, and blocks until it is accepted. // WriteRPCResponse pushes a response to the writeChan, and blocks until it is accepted.
// It implements WSRPCConnection. It is Goroutine-safe. // It implements WSRPCConnection. It is Goroutine-safe.
func (wsc *wsConnection) WriteRPCResponse(resp types.RPCResponse) { func (wsc *wsConnection) WriteRPCResponse(resp types.RPCResponse) {

View File

@ -1,7 +1,6 @@
package rpctypes package rpctypes
import ( import (
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"reflect" "reflect"
@ -10,8 +9,6 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
amino "github.com/tendermint/go-amino" amino "github.com/tendermint/go-amino"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
) )
// a wrapper to emulate a sum type: jsonrpcid = string | int // a wrapper to emulate a sum type: jsonrpcid = string | int
@ -240,17 +237,9 @@ type WSRPCConnection interface {
GetRemoteAddr() string GetRemoteAddr() string
WriteRPCResponse(resp RPCResponse) WriteRPCResponse(resp RPCResponse)
TryWriteRPCResponse(resp RPCResponse) bool TryWriteRPCResponse(resp RPCResponse) bool
GetEventSubscriber() EventSubscriber
Codec() *amino.Codec Codec() *amino.Codec
} }
// EventSubscriber mirros tendermint/tendermint/types.EventBusSubscriber
type EventSubscriber interface {
Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, out chan<- interface{}) error
Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error
UnsubscribeAll(ctx context.Context, subscriber string) error
}
// websocket-only RPCFuncs take this as the first parameter. // websocket-only RPCFuncs take this as the first parameter.
type WSRPCContext struct { type WSRPCContext struct {
Request RPCRequest Request RPCRequest

View File

@ -341,15 +341,15 @@ func TestEndBlockValidatorUpdates(t *testing.T) {
// test we threw an event // test we threw an event
select { select {
case e := <-updatesSub.Out(): case mt := <-updatesSub.Out():
event, ok := e.Msg.(types.EventDataValidatorSetUpdates) event, ok := mt.Msg().(types.EventDataValidatorSetUpdates)
require.True(t, ok, "Expected event of type EventDataValidatorSetUpdates, got %T", e) require.True(t, ok, "Expected event of type EventDataValidatorSetUpdates, got %T", mt.Msg())
if assert.NotEmpty(t, event.ValidatorUpdates) { if assert.NotEmpty(t, event.ValidatorUpdates) {
assert.Equal(t, pubkey, event.ValidatorUpdates[0].PubKey) assert.Equal(t, pubkey, event.ValidatorUpdates[0].PubKey)
assert.EqualValues(t, 10, event.ValidatorUpdates[0].VotingPower) assert.EqualValues(t, 10, event.ValidatorUpdates[0].VotingPower)
} }
case <-updatesSub.Cancelled(): case <-updatesSub.Cancelled():
t.Fatal("updatesSub was cancelled.") t.Fatal(fmt.Sprintf("updatesSub was cancelled (reason: %v)", updatesSub.Err()))
case <-time.After(1 * time.Second): case <-time.After(1 * time.Second):
t.Fatal("Did not receive EventValidatorSetUpdates within 1 sec.") t.Fatal("Did not receive EventValidatorSetUpdates within 1 sec.")
} }

View File

@ -44,13 +44,13 @@ func (is *IndexerService) OnStart() error {
go func() { go func() {
for { for {
select { select {
case msgAndTags := <-blockHeadersSub.Out(): case mt := <-blockHeadersSub.Out():
header := msgAndTags.Msg.(types.EventDataNewBlockHeader).Header header := mt.Msg().(types.EventDataNewBlockHeader).Header
batch := NewBatch(header.NumTxs) batch := NewBatch(header.NumTxs)
for i := int64(0); i < header.NumTxs; i++ { for i := int64(0); i < header.NumTxs; i++ {
select { select {
case msgAndTags := <-txsSub.Out(): case mt2 := <-txsSub.Out():
txResult := msgAndTags.Msg.(types.EventDataTx).TxResult txResult := mt2.Msg().(types.EventDataTx).TxResult
batch.Add(&txResult) batch.Add(&txResult)
case <-txsSub.Cancelled(): case <-txsSub.Cancelled():
is.Logger.Error("Failed to index a block. txsSub was cancelled. Did the Tendermint stop?", is.Logger.Error("Failed to index a block. txsSub was cancelled. Did the Tendermint stop?",

View File

@ -12,11 +12,17 @@ import (
const defaultCapacity = 0 const defaultCapacity = 0
type EventBusSubscriber interface { type EventBusSubscriber interface {
Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, outCapacity ...int) (*tmpubsub.Subscription, error) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, outCapacity ...int) (Subscription, error)
Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error Unsubscribe(ctx context.Context, subscriber string, query tmpubsub.Query) error
UnsubscribeAll(ctx context.Context, subscriber string) error UnsubscribeAll(ctx context.Context, subscriber string) error
} }
type Subscription interface {
Out() <-chan tmpubsub.MsgAndTags
Cancelled() <-chan struct{}
Err() error
}
// EventBus is a common bus for all events going through the system. All calls // EventBus is a common bus for all events going through the system. All calls
// are proxied to underlying pubsub server. All events must be published using // are proxied to underlying pubsub server. All events must be published using
// EventBus to ensure correct data types. // EventBus to ensure correct data types.
@ -52,7 +58,7 @@ func (b *EventBus) OnStop() {
b.pubsub.Stop() b.pubsub.Stop()
} }
func (b *EventBus) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, outCapacity ...int) (*tmpubsub.Subscription, error) { func (b *EventBus) Subscribe(ctx context.Context, subscriber string, query tmpubsub.Query, outCapacity ...int) (Subscription, error) {
return b.pubsub.Subscribe(ctx, subscriber, query, outCapacity...) return b.pubsub.Subscribe(ctx, subscriber, query, outCapacity...)
} }

View File

@ -26,13 +26,13 @@ func TestEventBusPublishEventTx(t *testing.T) {
// PublishEventTx adds all these 3 tags, so the query below should work // PublishEventTx adds all these 3 tags, so the query below should work
query := fmt.Sprintf("tm.event='Tx' AND tx.height=1 AND tx.hash='%X' AND baz=1", tx.Hash()) query := fmt.Sprintf("tm.event='Tx' AND tx.height=1 AND tx.hash='%X' AND baz=1", tx.Hash())
txEventsSub, err := eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query)) txsSub, err := eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query))
require.NoError(t, err) require.NoError(t, err)
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
e := <-txEventsSub.Out() mt := <-txsSub.Out()
edt := e.Msg.(EventDataTx) edt := mt.Msg().(EventDataTx)
assert.Equal(t, int64(1), edt.Height) assert.Equal(t, int64(1), edt.Height)
assert.Equal(t, uint32(0), edt.Index) assert.Equal(t, uint32(0), edt.Index)
assert.Equal(t, tx, edt.Tx) assert.Equal(t, tx, edt.Tx)
@ -67,13 +67,13 @@ func TestEventBusPublishEventNewBlock(t *testing.T) {
// PublishEventNewBlock adds the tm.event tag, so the query below should work // PublishEventNewBlock adds the tm.event tag, so the query below should work
query := "tm.event='NewBlock' AND baz=1 AND foz=2" query := "tm.event='NewBlock' AND baz=1 AND foz=2"
txEventsSub, err := eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query)) blocksSub, err := eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query))
require.NoError(t, err) require.NoError(t, err)
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
e := <-txEventsSub.Out() mt := <-blocksSub.Out()
edt := e.Msg.(EventDataNewBlock) edt := mt.Msg().(EventDataNewBlock)
assert.Equal(t, block, edt.Block) assert.Equal(t, block, edt.Block)
assert.Equal(t, resultBeginBlock, edt.ResultBeginBlock) assert.Equal(t, resultBeginBlock, edt.ResultBeginBlock)
assert.Equal(t, resultEndBlock, edt.ResultEndBlock) assert.Equal(t, resultEndBlock, edt.ResultEndBlock)
@ -106,13 +106,13 @@ func TestEventBusPublishEventNewBlockHeader(t *testing.T) {
// PublishEventNewBlockHeader adds the tm.event tag, so the query below should work // PublishEventNewBlockHeader adds the tm.event tag, so the query below should work
query := "tm.event='NewBlockHeader' AND baz=1 AND foz=2" query := "tm.event='NewBlockHeader' AND baz=1 AND foz=2"
txEventsSub, err := eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query)) headersSub, err := eventBus.Subscribe(context.Background(), "test", tmquery.MustParse(query))
require.NoError(t, err) require.NoError(t, err)
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
e := <-txEventsSub.Out() mt := <-headersSub.Out()
edt := e.Msg.(EventDataNewBlockHeader) edt := mt.Msg().(EventDataNewBlockHeader)
assert.Equal(t, block.Header, edt.Header) assert.Equal(t, block.Header, edt.Header)
assert.Equal(t, resultBeginBlock, edt.ResultBeginBlock) assert.Equal(t, resultBeginBlock, edt.ResultBeginBlock)
assert.Equal(t, resultEndBlock, edt.ResultEndBlock) assert.Equal(t, resultEndBlock, edt.ResultEndBlock)
@ -139,14 +139,14 @@ func TestEventBusPublish(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
defer eventBus.Stop() defer eventBus.Stop()
eventsSub, err := eventBus.Subscribe(context.Background(), "test", tmquery.Empty{}) sub, err := eventBus.Subscribe(context.Background(), "test", tmquery.Empty{})
require.NoError(t, err) require.NoError(t, err)
const numEventsExpected = 14 const numEventsExpected = 14
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
numEvents := 0 numEvents := 0
for range eventsSub.Out() { for range sub.Out() {
numEvents++ numEvents++
if numEvents >= numEventsExpected { if numEvents >= numEventsExpected {
close(done) close(done)