diff --git a/rpc/client/event_test.go b/rpc/client/event_test.go index e5f5aba7..9f0a585e 100644 --- a/rpc/client/event_test.go +++ b/rpc/client/event_test.go @@ -12,6 +12,8 @@ import ( "github.com/tendermint/tendermint/types" ) +var waitForEventTimeout = 5 * time.Second + // MakeTxKV returns a text transaction, allong with expected key, value pair func MakeTxKV() ([]byte, []byte, []byte) { k := []byte(cmn.RandStr(8)) @@ -32,7 +34,7 @@ func TestHeaderEvents(t *testing.T) { } evtTyp := types.EventNewBlockHeader - evt, err := client.WaitForOneEvent(c, evtTyp, 1*time.Second) + evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout) require.Nil(err, "%d: %+v", i, err) _, ok := evt.Unwrap().(types.EventDataNewBlockHeader) require.True(ok, "%d: %#v", i, evt) @@ -56,7 +58,7 @@ func TestBlockEvents(t *testing.T) { var firstBlockHeight int for j := 0; j < 3; j++ { evtTyp := types.EventNewBlock - evt, err := client.WaitForOneEvent(c, evtTyp, 1*time.Second) + evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout) require.Nil(err, "%d: %+v", j, err) blockEvent, ok := evt.Unwrap().(types.EventDataNewBlock) require.True(ok, "%d: %#v", j, evt) @@ -94,7 +96,7 @@ func TestTxEventsSentWithBroadcastTxAsync(t *testing.T) { require.True(txres.Code.IsOK()) // and wait for confirmation - evt, err := client.WaitForOneEvent(c, evtTyp, 1*time.Second) + evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout) require.Nil(err, "%d: %+v", i, err) // and make sure it has the proper info txe, ok := evt.Unwrap().(types.EventDataTx) @@ -127,7 +129,7 @@ func TestTxEventsSentWithBroadcastTxSync(t *testing.T) { require.True(txres.Code.IsOK()) // and wait for confirmation - evt, err := client.WaitForOneEvent(c, evtTyp, 1*time.Second) + evt, err := client.WaitForOneEvent(c, evtTyp, waitForEventTimeout) require.Nil(err, "%d: %+v", i, err) // and make sure it has the proper info txe, ok := evt.Unwrap().(types.EventDataTx) diff --git a/rpc/core/events.go b/rpc/core/events.go index af224a6b..81f1c919 100644 --- a/rpc/core/events.go +++ b/rpc/core/events.go @@ -2,7 +2,6 @@ package core import ( "context" - "time" "github.com/pkg/errors" @@ -53,7 +52,7 @@ func Subscribe(wsCtx rpctypes.WSRPCContext, query string) (*ctypes.ResultSubscri return nil, errors.Wrap(err, "failed to add subscription") } - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) + ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout) defer cancel() ch := make(chan interface{}) err = eventBus.Subscribe(ctx, addr, q, ch) diff --git a/rpc/core/mempool.go b/rpc/core/mempool.go index 46204ebf..382b2f55 100644 --- a/rpc/core/mempool.go +++ b/rpc/core/mempool.go @@ -151,7 +151,7 @@ func BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { // | tx | Tx | nil | true | The transaction | func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { // subscribe to tx being committed in block - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) + ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout) defer cancel() deliverTxResCh := make(chan interface{}) q := types.EventQueryTx(tx) diff --git a/rpc/core/pipe.go b/rpc/core/pipe.go index cbe6cc42..0f3f7472 100644 --- a/rpc/core/pipe.go +++ b/rpc/core/pipe.go @@ -1,6 +1,8 @@ package core import ( + "time" + crypto "github.com/tendermint/go-crypto" "github.com/tendermint/tendermint/consensus" cstypes "github.com/tendermint/tendermint/consensus/types" @@ -12,6 +14,8 @@ import ( "github.com/tendermint/tmlibs/log" ) +var subscribeTimeout = 5 * time.Second + //---------------------------------------------- // These interfaces are used by RPC and must be thread safe diff --git a/rpc/lib/client/ws_client_test.go b/rpc/lib/client/ws_client_test.go index 190cbcdc..3a0632e3 100644 --- a/rpc/lib/client/ws_client_test.go +++ b/rpc/lib/client/ws_client_test.go @@ -17,6 +17,8 @@ import ( types "github.com/tendermint/tendermint/rpc/lib/types" ) +var wsCallTimeout = 5 * time.Second + type myHandler struct { closeConnAfterRead bool mtx sync.RWMutex @@ -138,7 +140,7 @@ func TestWSClientReconnectFailure(t *testing.T) { // results in WS write error // provide timeout to avoid blocking - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + ctx, cancel := context.WithTimeout(context.Background(), wsCallTimeout) defer cancel() c.Call(ctx, "a", make(map[string]interface{}))