mirror of
https://github.com/fluencelabs/tendermint
synced 2025-04-25 06:42:16 +00:00
* limit number of /subscribe clients and queries per client Add the following config variables (under [rpc] section): * max_subscription_clients * max_subscriptions_per_client * timeout_broadcast_tx_commit Fixes #2826 new HTTPClient interface for subscriptions finalize HTTPClient events interface remove EventSubscriber fix data race ``` WARNING: DATA RACE Read at 0x00c000a36060 by goroutine 129: github.com/tendermint/tendermint/rpc/client.(*Local).Subscribe.func1() /go/src/github.com/tendermint/tendermint/rpc/client/localclient.go:168 +0x1f0 Previous write at 0x00c000a36060 by goroutine 132: github.com/tendermint/tendermint/rpc/client.(*Local).Subscribe() /go/src/github.com/tendermint/tendermint/rpc/client/localclient.go:191 +0x4e0 github.com/tendermint/tendermint/rpc/client.WaitForOneEvent() /go/src/github.com/tendermint/tendermint/rpc/client/helpers.go:64 +0x178 github.com/tendermint/tendermint/rpc/client_test.TestTxEventsSentWithBroadcastTxSync.func1() /go/src/github.com/tendermint/tendermint/rpc/client/event_test.go:139 +0x298 testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 Goroutine 129 (running) created at: github.com/tendermint/tendermint/rpc/client.(*Local).Subscribe() /go/src/github.com/tendermint/tendermint/rpc/client/localclient.go:164 +0x4b7 github.com/tendermint/tendermint/rpc/client.WaitForOneEvent() /go/src/github.com/tendermint/tendermint/rpc/client/helpers.go:64 +0x178 github.com/tendermint/tendermint/rpc/client_test.TestTxEventsSentWithBroadcastTxSync.func1() /go/src/github.com/tendermint/tendermint/rpc/client/event_test.go:139 +0x298 testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 Goroutine 132 (running) created at: testing.(*T).Run() /usr/local/go/src/testing/testing.go:878 +0x659 github.com/tendermint/tendermint/rpc/client_test.TestTxEventsSentWithBroadcastTxSync() /go/src/github.com/tendermint/tendermint/rpc/client/event_test.go:119 +0x186 testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 ================== ``` lite client works (tested manually) godoc comments httpclient: do not close the out channel use TimeoutBroadcastTxCommit no timeout for unsubscribe but 1s Local (5s HTTP) timeout for resubscribe format code change Subscribe#out cap to 1 and replace config vars with RPCConfig TimeoutBroadcastTxCommit can't be greater than rpcserver.WriteTimeout rpc: Context as first parameter to all functions reformat code fixes after my own review fixes after Ethan's review add test stubs fix config.toml * fixes after manual testing - rpc: do not recommend to use BroadcastTxCommit because it's slow and wastes Tendermint resources (pubsub) - rpc: better error in Subscribe and BroadcastTxCommit - HTTPClient: do not resubscribe if err = ErrAlreadySubscribed * fixes after Ismail's review * Update rpc/grpc/grpc_test.go Co-Authored-By: melekes <anton.kalyaev@gmail.com>
138 lines
3.8 KiB
Go
138 lines
3.8 KiB
Go
package client_test
|
|
|
|
import (
|
|
"fmt"
|
|
"reflect"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/require"
|
|
|
|
abci "github.com/tendermint/tendermint/abci/types"
|
|
cmn "github.com/tendermint/tendermint/libs/common"
|
|
"github.com/tendermint/tendermint/rpc/client"
|
|
ctypes "github.com/tendermint/tendermint/rpc/core/types"
|
|
"github.com/tendermint/tendermint/types"
|
|
)
|
|
|
|
var waitForEventTimeout = 5 * time.Second
|
|
|
|
// MakeTxKV returns a text transaction, allong with expected key, value pair
|
|
func MakeTxKV() ([]byte, []byte, []byte) {
|
|
k := []byte(cmn.RandStr(8))
|
|
v := []byte(cmn.RandStr(8))
|
|
return k, v, append(k, append([]byte("="), v...)...)
|
|
}
|
|
|
|
func TestHeaderEvents(t *testing.T) {
|
|
for i, c := range GetClients() {
|
|
i, c := i, c // capture params
|
|
t.Run(reflect.TypeOf(c).String(), func(t *testing.T) {
|
|
// start for this test it if it wasn't already running
|
|
if !c.IsRunning() {
|
|
// if so, then we start it, listen, and stop it.
|
|
err := c.Start()
|
|
require.Nil(t, err, "%d: %+v", i, err)
|
|
defer c.Stop()
|
|
}
|
|
|
|
evtTyp := types.EventNewBlockHeader
|
|
evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout)
|
|
require.Nil(t, err, "%d: %+v", i, err)
|
|
_, ok := evt.(types.EventDataNewBlockHeader)
|
|
require.True(t, ok, "%d: %#v", i, evt)
|
|
// TODO: more checks...
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestBlockEvents(t *testing.T) {
|
|
for i, c := range GetClients() {
|
|
i, c := i, c // capture params
|
|
t.Run(reflect.TypeOf(c).String(), func(t *testing.T) {
|
|
|
|
// start for this test it if it wasn't already running
|
|
if !c.IsRunning() {
|
|
// if so, then we start it, listen, and stop it.
|
|
err := c.Start()
|
|
require.Nil(t, err, "%d: %+v", i, err)
|
|
defer c.Stop()
|
|
}
|
|
|
|
// listen for a new block; ensure height increases by 1
|
|
var firstBlockHeight int64
|
|
for j := 0; j < 3; j++ {
|
|
evtTyp := types.EventNewBlock
|
|
evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout)
|
|
require.Nil(t, err, "%d: %+v", j, err)
|
|
blockEvent, ok := evt.(types.EventDataNewBlock)
|
|
require.True(t, ok, "%d: %#v", j, evt)
|
|
|
|
block := blockEvent.Block
|
|
if j == 0 {
|
|
firstBlockHeight = block.Header.Height
|
|
continue
|
|
}
|
|
|
|
require.Equal(t, block.Header.Height, firstBlockHeight+int64(j))
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestTxEventsSentWithBroadcastTxAsync(t *testing.T) { testTxEventsSent(t, "async") }
|
|
func TestTxEventsSentWithBroadcastTxSync(t *testing.T) { testTxEventsSent(t, "sync") }
|
|
|
|
func testTxEventsSent(t *testing.T, broadcastMethod string) {
|
|
for i, c := range GetClients() {
|
|
i, c := i, c // capture params
|
|
t.Run(reflect.TypeOf(c).String(), func(t *testing.T) {
|
|
|
|
// start for this test it if it wasn't already running
|
|
if !c.IsRunning() {
|
|
// if so, then we start it, listen, and stop it.
|
|
err := c.Start()
|
|
require.Nil(t, err, "%d: %+v", i, err)
|
|
defer c.Stop()
|
|
}
|
|
|
|
// make the tx
|
|
_, _, tx := MakeTxKV()
|
|
evtTyp := types.EventTx
|
|
|
|
// send
|
|
var (
|
|
txres *ctypes.ResultBroadcastTx
|
|
err error
|
|
)
|
|
switch broadcastMethod {
|
|
case "async":
|
|
txres, err = c.BroadcastTxAsync(tx)
|
|
case "sync":
|
|
txres, err = c.BroadcastTxSync(tx)
|
|
default:
|
|
panic(fmt.Sprintf("Unknown broadcastMethod %s", broadcastMethod))
|
|
}
|
|
|
|
require.NoError(t, err)
|
|
require.Equal(t, txres.Code, abci.CodeTypeOK)
|
|
|
|
// and wait for confirmation
|
|
evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout)
|
|
require.Nil(t, err, "%d: %+v", i, err)
|
|
// and make sure it has the proper info
|
|
txe, ok := evt.(types.EventDataTx)
|
|
require.True(t, ok, "%d: %#v", i, evt)
|
|
// make sure this is the proper tx
|
|
require.EqualValues(t, tx, txe.Tx)
|
|
require.True(t, txe.Result.IsOK())
|
|
})
|
|
}
|
|
}
|
|
|
|
// Test HTTPClient resubscribes upon disconnect && subscription error.
|
|
// Test Local client resubscribes upon subscription error.
|
|
func TestClientsResubscribe(t *testing.T) {
|
|
// TODO(melekes)
|
|
}
|