mirror of
https://github.com/fluencelabs/tendermint
synced 2025-05-02 01:52:16 +00:00
deliverTxResCh, like any other eventBus (pubsub) channel, is closed when eventBus is stopped. We must check if the channel is still open. The alternative approach is to not close any channels, which seems a bit odd. Fixes #2408
288 lines
7.2 KiB
Go
288 lines
7.2 KiB
Go
package core
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/pkg/errors"
|
|
|
|
abci "github.com/tendermint/tendermint/abci/types"
|
|
ctypes "github.com/tendermint/tendermint/rpc/core/types"
|
|
rpcserver "github.com/tendermint/tendermint/rpc/lib/server"
|
|
"github.com/tendermint/tendermint/types"
|
|
)
|
|
|
|
//-----------------------------------------------------------------------------
|
|
// NOTE: tx should be signed, but this is only checked at the app level (not by Tendermint!)
|
|
|
|
// Returns right away, with no response
|
|
//
|
|
// ```shell
|
|
// curl 'localhost:26657/broadcast_tx_async?tx="123"'
|
|
// ```
|
|
//
|
|
// ```go
|
|
// client := client.NewHTTP("tcp://0.0.0.0:26657", "/websocket")
|
|
// result, err := client.BroadcastTxAsync("123")
|
|
// ```
|
|
//
|
|
// > The above command returns JSON structured like this:
|
|
//
|
|
// ```json
|
|
// {
|
|
// "error": "",
|
|
// "result": {
|
|
// "hash": "E39AAB7A537ABAA237831742DCE1117F187C3C52",
|
|
// "log": "",
|
|
// "data": "",
|
|
// "code": "0"
|
|
// },
|
|
// "id": "",
|
|
// "jsonrpc": "2.0"
|
|
// }
|
|
// ```
|
|
//
|
|
// ### Query Parameters
|
|
//
|
|
// | Parameter | Type | Default | Required | Description |
|
|
// |-----------+------+---------+----------+-----------------|
|
|
// | tx | Tx | nil | true | The transaction |
|
|
func BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
|
|
err := mempool.CheckTx(tx, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &ctypes.ResultBroadcastTx{Hash: tx.Hash()}, nil
|
|
}
|
|
|
|
// Returns with the response from CheckTx.
|
|
//
|
|
// ```shell
|
|
// curl 'localhost:26657/broadcast_tx_sync?tx="456"'
|
|
// ```
|
|
//
|
|
// ```go
|
|
// client := client.NewHTTP("tcp://0.0.0.0:26657", "/websocket")
|
|
// result, err := client.BroadcastTxSync("456")
|
|
// ```
|
|
//
|
|
// > The above command returns JSON structured like this:
|
|
//
|
|
// ```json
|
|
// {
|
|
// "jsonrpc": "2.0",
|
|
// "id": "",
|
|
// "result": {
|
|
// "code": "0",
|
|
// "data": "",
|
|
// "log": "",
|
|
// "hash": "0D33F2F03A5234F38706E43004489E061AC40A2E"
|
|
// },
|
|
// "error": ""
|
|
// }
|
|
// ```
|
|
//
|
|
// ### Query Parameters
|
|
//
|
|
// | Parameter | Type | Default | Required | Description |
|
|
// |-----------+------+---------+----------+-----------------|
|
|
// | tx | Tx | nil | true | The transaction |
|
|
func BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
|
|
resCh := make(chan *abci.Response, 1)
|
|
err := mempool.CheckTx(tx, func(res *abci.Response) {
|
|
resCh <- res
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
res := <-resCh
|
|
r := res.GetCheckTx()
|
|
return &ctypes.ResultBroadcastTx{
|
|
Code: r.Code,
|
|
Data: r.Data,
|
|
Log: r.Log,
|
|
Hash: tx.Hash(),
|
|
}, nil
|
|
}
|
|
|
|
// CONTRACT: only returns error if mempool.CheckTx() errs or if we timeout
|
|
// waiting for tx to commit.
|
|
//
|
|
// If CheckTx or DeliverTx fail, no error will be returned, but the returned result
|
|
// will contain a non-OK ABCI code.
|
|
//
|
|
// ```shell
|
|
// curl 'localhost:26657/broadcast_tx_commit?tx="789"'
|
|
// ```
|
|
//
|
|
// ```go
|
|
// client := client.NewHTTP("tcp://0.0.0.0:26657", "/websocket")
|
|
// result, err := client.BroadcastTxCommit("789")
|
|
// ```
|
|
//
|
|
// > The above command returns JSON structured like this:
|
|
//
|
|
// ```json
|
|
// {
|
|
// "error": "",
|
|
// "result": {
|
|
// "height": "26682",
|
|
// "hash": "75CA0F856A4DA078FC4911580360E70CEFB2EBEE",
|
|
// "deliver_tx": {
|
|
// "log": "",
|
|
// "data": "",
|
|
// "code": "0"
|
|
// },
|
|
// "check_tx": {
|
|
// "log": "",
|
|
// "data": "",
|
|
// "code": "0"
|
|
// }
|
|
// },
|
|
// "id": "",
|
|
// "jsonrpc": "2.0"
|
|
// }
|
|
// ```
|
|
//
|
|
// ### Query Parameters
|
|
//
|
|
// | Parameter | Type | Default | Required | Description |
|
|
// |-----------+------+---------+----------+-----------------|
|
|
// | tx | Tx | nil | true | The transaction |
|
|
func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
|
|
// Subscribe to tx being committed in block.
|
|
ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout)
|
|
defer cancel()
|
|
deliverTxResCh := make(chan interface{}, 1)
|
|
q := types.EventQueryTxFor(tx)
|
|
err := eventBus.Subscribe(ctx, "mempool", q, deliverTxResCh)
|
|
if err != nil {
|
|
err = errors.Wrap(err, "failed to subscribe to tx")
|
|
logger.Error("Error on broadcast_tx_commit", "err", err)
|
|
return nil, err
|
|
}
|
|
defer func() {
|
|
// drain deliverTxResCh to make sure we don't block
|
|
LOOP:
|
|
for {
|
|
select {
|
|
case <-deliverTxResCh:
|
|
default:
|
|
break LOOP
|
|
}
|
|
}
|
|
eventBus.Unsubscribe(context.Background(), "mempool", q)
|
|
}()
|
|
|
|
// Broadcast tx and wait for CheckTx result
|
|
checkTxResCh := make(chan *abci.Response, 1)
|
|
err = mempool.CheckTx(tx, func(res *abci.Response) {
|
|
checkTxResCh <- res
|
|
})
|
|
if err != nil {
|
|
logger.Error("Error on broadcastTxCommit", "err", err)
|
|
return nil, fmt.Errorf("Error on broadcastTxCommit: %v", err)
|
|
}
|
|
checkTxResMsg := <-checkTxResCh
|
|
checkTxRes := checkTxResMsg.GetCheckTx()
|
|
if checkTxRes.Code != abci.CodeTypeOK {
|
|
return &ctypes.ResultBroadcastTxCommit{
|
|
CheckTx: *checkTxRes,
|
|
DeliverTx: abci.ResponseDeliverTx{},
|
|
Hash: tx.Hash(),
|
|
}, nil
|
|
}
|
|
|
|
// Wait for the tx to be included in a block or timeout.
|
|
// TODO: configurable?
|
|
var deliverTxTimeout = rpcserver.WriteTimeout / 2
|
|
select {
|
|
case deliverTxResMsg, ok := <-deliverTxResCh: // The tx was included in a block.
|
|
if !ok {
|
|
return nil, errors.New("Error on broadcastTxCommit: expected DeliverTxResult, got nil. Did the Tendermint stop?")
|
|
}
|
|
deliverTxRes := deliverTxResMsg.(types.EventDataTx)
|
|
return &ctypes.ResultBroadcastTxCommit{
|
|
CheckTx: *checkTxRes,
|
|
DeliverTx: deliverTxRes.Result,
|
|
Hash: tx.Hash(),
|
|
Height: deliverTxRes.Height,
|
|
}, nil
|
|
case <-time.After(deliverTxTimeout):
|
|
err = errors.New("Timed out waiting for tx to be included in a block")
|
|
logger.Error("Error on broadcastTxCommit", "err", err)
|
|
return &ctypes.ResultBroadcastTxCommit{
|
|
CheckTx: *checkTxRes,
|
|
DeliverTx: abci.ResponseDeliverTx{},
|
|
Hash: tx.Hash(),
|
|
}, err
|
|
}
|
|
}
|
|
|
|
// Get unconfirmed transactions (maximum ?limit entries) including their number.
|
|
//
|
|
// ```shell
|
|
// curl 'localhost:26657/unconfirmed_txs'
|
|
// ```
|
|
//
|
|
// ```go
|
|
// client := client.NewHTTP("tcp://0.0.0.0:26657", "/websocket")
|
|
// result, err := client.UnconfirmedTxs()
|
|
// ```
|
|
//
|
|
// > The above command returns JSON structured like this:
|
|
//
|
|
// ```json
|
|
// {
|
|
// "error": "",
|
|
// "result": {
|
|
// "txs": [],
|
|
// "n_txs": "0"
|
|
// },
|
|
// "id": "",
|
|
// "jsonrpc": "2.0"
|
|
// }
|
|
//
|
|
// ### Query Parameters
|
|
//
|
|
// | Parameter | Type | Default | Required | Description |
|
|
// |-----------+------+---------+----------+--------------------------------------|
|
|
// | limit | int | 30 | false | Maximum number of entries (max: 100) |
|
|
// ```
|
|
func UnconfirmedTxs(limit int) (*ctypes.ResultUnconfirmedTxs, error) {
|
|
// reuse per_page validator
|
|
limit = validatePerPage(limit)
|
|
|
|
txs := mempool.ReapMaxTxs(limit)
|
|
return &ctypes.ResultUnconfirmedTxs{len(txs), txs}, nil
|
|
}
|
|
|
|
// Get number of unconfirmed transactions.
|
|
//
|
|
// ```shell
|
|
// curl 'localhost:26657/num_unconfirmed_txs'
|
|
// ```
|
|
//
|
|
// ```go
|
|
// client := client.NewHTTP("tcp://0.0.0.0:26657", "/websocket")
|
|
// result, err := client.UnconfirmedTxs()
|
|
// ```
|
|
//
|
|
// > The above command returns JSON structured like this:
|
|
//
|
|
// ```json
|
|
// {
|
|
// "error": "",
|
|
// "result": {
|
|
// "txs": null,
|
|
// "n_txs": "0"
|
|
// },
|
|
// "id": "",
|
|
// "jsonrpc": "2.0"
|
|
// }
|
|
// ```
|
|
func NumUnconfirmedTxs() (*ctypes.ResultUnconfirmedTxs, error) {
|
|
return &ctypes.ResultUnconfirmedTxs{N: mempool.Size()}, nil
|
|
}
|