mirror of
https://github.com/fluencelabs/tendermint
synced 2025-04-29 00:32:14 +00:00
* limit number of /subscribe clients and queries per client Add the following config variables (under [rpc] section): * max_subscription_clients * max_subscriptions_per_client * timeout_broadcast_tx_commit Fixes #2826 new HTTPClient interface for subscriptions finalize HTTPClient events interface remove EventSubscriber fix data race ``` WARNING: DATA RACE Read at 0x00c000a36060 by goroutine 129: github.com/tendermint/tendermint/rpc/client.(*Local).Subscribe.func1() /go/src/github.com/tendermint/tendermint/rpc/client/localclient.go:168 +0x1f0 Previous write at 0x00c000a36060 by goroutine 132: github.com/tendermint/tendermint/rpc/client.(*Local).Subscribe() /go/src/github.com/tendermint/tendermint/rpc/client/localclient.go:191 +0x4e0 github.com/tendermint/tendermint/rpc/client.WaitForOneEvent() /go/src/github.com/tendermint/tendermint/rpc/client/helpers.go:64 +0x178 github.com/tendermint/tendermint/rpc/client_test.TestTxEventsSentWithBroadcastTxSync.func1() /go/src/github.com/tendermint/tendermint/rpc/client/event_test.go:139 +0x298 testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 Goroutine 129 (running) created at: github.com/tendermint/tendermint/rpc/client.(*Local).Subscribe() /go/src/github.com/tendermint/tendermint/rpc/client/localclient.go:164 +0x4b7 github.com/tendermint/tendermint/rpc/client.WaitForOneEvent() /go/src/github.com/tendermint/tendermint/rpc/client/helpers.go:64 +0x178 github.com/tendermint/tendermint/rpc/client_test.TestTxEventsSentWithBroadcastTxSync.func1() /go/src/github.com/tendermint/tendermint/rpc/client/event_test.go:139 +0x298 testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 Goroutine 132 (running) created at: testing.(*T).Run() /usr/local/go/src/testing/testing.go:878 +0x659 github.com/tendermint/tendermint/rpc/client_test.TestTxEventsSentWithBroadcastTxSync() /go/src/github.com/tendermint/tendermint/rpc/client/event_test.go:119 +0x186 testing.tRunner() /usr/local/go/src/testing/testing.go:827 +0x162 
================== ``` lite client works (tested manually) godoc comments httpclient: do not close the out channel use TimeoutBroadcastTxCommit no timeout for unsubscribe but 1s Local (5s HTTP) timeout for resubscribe format code change Subscribe#out cap to 1 and replace config vars with RPCConfig TimeoutBroadcastTxCommit can't be greater than rpcserver.WriteTimeout rpc: Context as first parameter to all functions reformat code fixes after my own review fixes after Ethan's review add test stubs fix config.toml * fixes after manual testing - rpc: do not recommend to use BroadcastTxCommit because it's slow and wastes Tendermint resources (pubsub) - rpc: better error in Subscribe and BroadcastTxCommit - HTTPClient: do not resubscribe if err = ErrAlreadySubscribed * fixes after Ismail's review * Update rpc/grpc/grpc_test.go Co-Authored-By: melekes <anton.kalyaev@gmail.com>
350 lines
9.3 KiB
Go
350 lines
9.3 KiB
Go
package core
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/pkg/errors"
|
|
|
|
abci "github.com/tendermint/tendermint/abci/types"
|
|
ctypes "github.com/tendermint/tendermint/rpc/core/types"
|
|
rpctypes "github.com/tendermint/tendermint/rpc/lib/types"
|
|
"github.com/tendermint/tendermint/types"
|
|
)
|
|
|
|
//-----------------------------------------------------------------------------
|
|
// NOTE: tx should be signed, but this is only checked at the app level (not by Tendermint!)
|
|
|
|
// Returns right away, with no response. Does not wait for CheckTx nor
|
|
// DeliverTx results.
|
|
//
|
|
// Please refer to
|
|
// https://tendermint.com/docs/tendermint-core/using-tendermint.html#formatting
|
|
// for formatting/encoding rules.
|
|
//
|
|
//
|
|
// ```shell
|
|
// curl 'localhost:26657/broadcast_tx_async?tx="123"'
|
|
// ```
|
|
//
|
|
// ```go
|
|
// client := client.NewHTTP("tcp://0.0.0.0:26657", "/websocket")
|
|
// err := client.Start()
|
|
// if err != nil {
|
|
// // handle error
|
|
// }
|
|
// defer client.Stop()
|
|
// result, err := client.BroadcastTxAsync("123")
|
|
// ```
|
|
//
|
|
// > The above command returns JSON structured like this:
|
|
//
|
|
// ```json
|
|
// {
|
|
// "error": "",
|
|
// "result": {
|
|
// "hash": "E39AAB7A537ABAA237831742DCE1117F187C3C52",
|
|
// "log": "",
|
|
// "data": "",
|
|
// "code": "0"
|
|
// },
|
|
// "id": "",
|
|
// "jsonrpc": "2.0"
|
|
// }
|
|
// ```
|
|
//
|
|
// ### Query Parameters
|
|
//
|
|
// | Parameter | Type | Default | Required | Description |
|
|
// |-----------+------+---------+----------+-----------------|
|
|
// | tx | Tx | nil | true | The transaction |
|
|
func BroadcastTxAsync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
|
|
err := mempool.CheckTx(tx, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &ctypes.ResultBroadcastTx{Hash: tx.Hash()}, nil
|
|
}
|
|
|
|
// Returns with the response from CheckTx. Does not wait for DeliverTx result.
|
|
//
|
|
// Please refer to
|
|
// https://tendermint.com/docs/tendermint-core/using-tendermint.html#formatting
|
|
// for formatting/encoding rules.
|
|
//
|
|
// ```shell
|
|
// curl 'localhost:26657/broadcast_tx_sync?tx="456"'
|
|
// ```
|
|
//
|
|
// ```go
|
|
// client := client.NewHTTP("tcp://0.0.0.0:26657", "/websocket")
|
|
// err := client.Start()
|
|
// if err != nil {
|
|
// // handle error
|
|
// }
|
|
// defer client.Stop()
|
|
// result, err := client.BroadcastTxSync("456")
|
|
// ```
|
|
//
|
|
// > The above command returns JSON structured like this:
|
|
//
|
|
// ```json
|
|
// {
|
|
// "jsonrpc": "2.0",
|
|
// "id": "",
|
|
// "result": {
|
|
// "code": "0",
|
|
// "data": "",
|
|
// "log": "",
|
|
// "hash": "0D33F2F03A5234F38706E43004489E061AC40A2E"
|
|
// },
|
|
// "error": ""
|
|
// }
|
|
// ```
|
|
//
|
|
// ### Query Parameters
|
|
//
|
|
// | Parameter | Type | Default | Required | Description |
|
|
// |-----------+------+---------+----------+-----------------|
|
|
// | tx | Tx | nil | true | The transaction |
|
|
func BroadcastTxSync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
|
|
resCh := make(chan *abci.Response, 1)
|
|
err := mempool.CheckTx(tx, func(res *abci.Response) {
|
|
resCh <- res
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
res := <-resCh
|
|
r := res.GetCheckTx()
|
|
return &ctypes.ResultBroadcastTx{
|
|
Code: r.Code,
|
|
Data: r.Data,
|
|
Log: r.Log,
|
|
Hash: tx.Hash(),
|
|
}, nil
|
|
}
|
|
|
|
// Returns with the responses from CheckTx and DeliverTx.
|
|
//
|
|
// IMPORTANT: use only for testing and development. In production, use
|
|
// BroadcastTxSync or BroadcastTxAsync. You can subscribe for the transaction
|
|
// result using JSONRPC via a websocket. See
|
|
// https://tendermint.com/docs/app-dev/subscribing-to-events-via-websocket.html
|
|
//
|
|
// CONTRACT: only returns error if mempool.CheckTx() errs or if we timeout
|
|
// waiting for tx to commit.
|
|
//
|
|
// If CheckTx or DeliverTx fail, no error will be returned, but the returned result
|
|
// will contain a non-OK ABCI code.
|
|
//
|
|
// Please refer to
|
|
// https://tendermint.com/docs/tendermint-core/using-tendermint.html#formatting
|
|
// for formatting/encoding rules.
|
|
//
|
|
//
|
|
// ```shell
|
|
// curl 'localhost:26657/broadcast_tx_commit?tx="789"'
|
|
// ```
|
|
//
|
|
// ```go
|
|
// client := client.NewHTTP("tcp://0.0.0.0:26657", "/websocket")
|
|
// err := client.Start()
|
|
// if err != nil {
|
|
// // handle error
|
|
// }
|
|
// defer client.Stop()
|
|
// result, err := client.BroadcastTxCommit("789")
|
|
// ```
|
|
//
|
|
// > The above command returns JSON structured like this:
|
|
//
|
|
// ```json
|
|
// {
|
|
// "error": "",
|
|
// "result": {
|
|
// "height": "26682",
|
|
// "hash": "75CA0F856A4DA078FC4911580360E70CEFB2EBEE",
|
|
// "deliver_tx": {
|
|
// "log": "",
|
|
// "data": "",
|
|
// "code": "0"
|
|
// },
|
|
// "check_tx": {
|
|
// "log": "",
|
|
// "data": "",
|
|
// "code": "0"
|
|
// }
|
|
// },
|
|
// "id": "",
|
|
// "jsonrpc": "2.0"
|
|
// }
|
|
// ```
|
|
//
|
|
// ### Query Parameters
|
|
//
|
|
// | Parameter | Type | Default | Required | Description |
|
|
// |-----------+------+---------+----------+-----------------|
|
|
// | tx | Tx | nil | true | The transaction |
|
|
func BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
|
|
subscriber := ctx.RemoteAddr()
|
|
|
|
if eventBus.NumClients() >= config.MaxSubscriptionClients {
|
|
return nil, fmt.Errorf("max_subscription_clients %d reached", config.MaxSubscriptionClients)
|
|
} else if eventBus.NumClientSubscriptions(subscriber) >= config.MaxSubscriptionsPerClient {
|
|
return nil, fmt.Errorf("max_subscriptions_per_client %d reached", config.MaxSubscriptionsPerClient)
|
|
}
|
|
|
|
// Subscribe to tx being committed in block.
|
|
subCtx, cancel := context.WithTimeout(context.Background(), subscribeTimeout)
|
|
defer cancel()
|
|
q := types.EventQueryTxFor(tx)
|
|
deliverTxSub, err := eventBus.Subscribe(subCtx, subscriber, q)
|
|
if err != nil {
|
|
err = errors.Wrap(err, "failed to subscribe to tx")
|
|
logger.Error("Error on broadcast_tx_commit", "err", err)
|
|
return nil, err
|
|
}
|
|
defer eventBus.Unsubscribe(context.Background(), subscriber, q)
|
|
|
|
// Broadcast tx and wait for CheckTx result
|
|
checkTxResCh := make(chan *abci.Response, 1)
|
|
err = mempool.CheckTx(tx, func(res *abci.Response) {
|
|
checkTxResCh <- res
|
|
})
|
|
if err != nil {
|
|
logger.Error("Error on broadcastTxCommit", "err", err)
|
|
return nil, fmt.Errorf("Error on broadcastTxCommit: %v", err)
|
|
}
|
|
checkTxResMsg := <-checkTxResCh
|
|
checkTxRes := checkTxResMsg.GetCheckTx()
|
|
if checkTxRes.Code != abci.CodeTypeOK {
|
|
return &ctypes.ResultBroadcastTxCommit{
|
|
CheckTx: *checkTxRes,
|
|
DeliverTx: abci.ResponseDeliverTx{},
|
|
Hash: tx.Hash(),
|
|
}, nil
|
|
}
|
|
|
|
// Wait for the tx to be included in a block or timeout.
|
|
select {
|
|
case msg := <-deliverTxSub.Out(): // The tx was included in a block.
|
|
deliverTxRes := msg.Data().(types.EventDataTx)
|
|
return &ctypes.ResultBroadcastTxCommit{
|
|
CheckTx: *checkTxRes,
|
|
DeliverTx: deliverTxRes.Result,
|
|
Hash: tx.Hash(),
|
|
Height: deliverTxRes.Height,
|
|
}, nil
|
|
case <-deliverTxSub.Cancelled():
|
|
var reason string
|
|
if deliverTxSub.Err() == nil {
|
|
reason = "Tendermint exited"
|
|
} else {
|
|
reason = deliverTxSub.Err().Error()
|
|
}
|
|
err = fmt.Errorf("deliverTxSub was cancelled (reason: %s)", reason)
|
|
logger.Error("Error on broadcastTxCommit", "err", err)
|
|
return &ctypes.ResultBroadcastTxCommit{
|
|
CheckTx: *checkTxRes,
|
|
DeliverTx: abci.ResponseDeliverTx{},
|
|
Hash: tx.Hash(),
|
|
}, err
|
|
case <-time.After(config.TimeoutBroadcastTxCommit):
|
|
err = errors.New("Timed out waiting for tx to be included in a block")
|
|
logger.Error("Error on broadcastTxCommit", "err", err)
|
|
return &ctypes.ResultBroadcastTxCommit{
|
|
CheckTx: *checkTxRes,
|
|
DeliverTx: abci.ResponseDeliverTx{},
|
|
Hash: tx.Hash(),
|
|
}, err
|
|
}
|
|
}
|
|
|
|
// Get unconfirmed transactions (maximum ?limit entries) including their number.
|
|
//
|
|
// ```shell
|
|
// curl 'localhost:26657/unconfirmed_txs'
|
|
// ```
|
|
//
|
|
// ```go
|
|
// client := client.NewHTTP("tcp://0.0.0.0:26657", "/websocket")
|
|
// err := client.Start()
|
|
// if err != nil {
|
|
// // handle error
|
|
// }
|
|
// defer client.Stop()
|
|
// result, err := client.UnconfirmedTxs()
|
|
// ```
|
|
//
|
|
// > The above command returns JSON structured like this:
|
|
//
|
|
// ```json
|
|
// {
|
|
// "result" : {
|
|
// "txs" : [],
|
|
// "total_bytes" : "0",
|
|
// "n_txs" : "0",
|
|
// "total" : "0"
|
|
// },
|
|
// "jsonrpc" : "2.0",
|
|
// "id" : ""
|
|
// }
|
|
// ```
|
|
//
|
|
// ### Query Parameters
|
|
//
|
|
// | Parameter | Type | Default | Required | Description |
|
|
// |-----------+------+---------+----------+--------------------------------------|
|
|
// | limit | int | 30 | false | Maximum number of entries (max: 100) |
|
|
//
|
|
func UnconfirmedTxs(ctx *rpctypes.Context, limit int) (*ctypes.ResultUnconfirmedTxs, error) {
|
|
// reuse per_page validator
|
|
limit = validatePerPage(limit)
|
|
|
|
txs := mempool.ReapMaxTxs(limit)
|
|
return &ctypes.ResultUnconfirmedTxs{
|
|
Count: len(txs),
|
|
Total: mempool.Size(),
|
|
TotalBytes: mempool.TxsBytes(),
|
|
Txs: txs}, nil
|
|
}
|
|
|
|
// Get number of unconfirmed transactions.
|
|
//
|
|
// ```shell
|
|
// curl 'localhost:26657/num_unconfirmed_txs'
|
|
// ```
|
|
//
|
|
// ```go
|
|
// client := client.NewHTTP("tcp://0.0.0.0:26657", "/websocket")
|
|
// err := client.Start()
|
|
// if err != nil {
|
|
// // handle error
|
|
// }
|
|
// defer client.Stop()
|
|
// result, err := client.NumUnconfirmedTxs()
|
|
// ```
|
|
//
|
|
// > The above command returns JSON structured like this:
|
|
//
|
|
// ```json
|
|
// {
|
|
// "jsonrpc" : "2.0",
|
|
// "id" : "",
|
|
// "result" : {
|
|
// "n_txs" : "0",
|
|
// "total_bytes" : "0",
|
|
// "txs" : null,
|
|
// "total" : "0"
|
|
// }
|
|
// }
|
|
// ```
|
|
func NumUnconfirmedTxs(ctx *rpctypes.Context) (*ctypes.ResultUnconfirmedTxs, error) {
|
|
return &ctypes.ResultUnconfirmedTxs{
|
|
Count: mempool.Size(),
|
|
Total: mempool.Size(),
|
|
TotalBytes: mempool.TxsBytes()}, nil
|
|
}
|