diff --git a/rpc/client/event_test.go b/rpc/client/event_test.go index 863d23d4..f43a47e9 100644 --- a/rpc/client/event_test.go +++ b/rpc/client/event_test.go @@ -6,14 +6,14 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - events "github.com/tendermint/go-events" + merktest "github.com/tendermint/merkleeyes/testutil" + "github.com/tendermint/tendermint/rpc/client" "github.com/tendermint/tendermint/types" ) -func TestEvents(t *testing.T) { +func TestHeaderEvents(t *testing.T) { require := require.New(t) for i, c := range GetClients() { - // for i, c := range []client.Client{getLocalClient()} { // test if this client implements event switch as well. evsw, ok := c.(types.EventSwitch) if !assert.True(t, ok, "%d: %v", i, c) { @@ -29,31 +29,50 @@ func TestEvents(t *testing.T) { defer evsw.Stop() } - // let's wait for the next header... - listener := "fooz" - event, timeout := make(chan events.EventData, 10), make(chan bool, 1) - // start timeout count-down - go func() { - time.Sleep(1 * time.Second) - timeout <- true - }() - - // register for the next header event evtTyp := types.EventStringNewBlockHeader() - evsw.AddListenerForEvent(listener, evtTyp, func(data events.EventData) { - event <- data - }) - // make sure to unregister after the test is over - // TODO: don't require both! - defer evsw.RemoveListenerForEvent(listener, evtTyp) - defer evsw.RemoveListener(listener) - - select { - case <-timeout: - require.True(false, "%d: a timeout waiting for event", i) - case evt := <-event: - _, ok := evt.(types.EventDataNewBlockHeader) - require.True(ok, "%d: %#v", i, evt) - } + evt, err := client.WaitForOneEvent(evsw, evtTyp, 1*time.Second) + require.Nil(err, "%d: %+v", i, err) + _, ok = evt.(types.EventDataNewBlockHeader) + require.True(ok, "%d: %#v", i, evt) + // TODO: more checks... + } +} + +func TestTxEvents(t *testing.T) { + require := require.New(t) + for i, c := range GetClients() { + // test if this client implements event switch as well. + evsw, ok := c.(types.EventSwitch) + if !assert.True(t, ok, "%d: %v", i, c) { + continue + } + + // start for this test it if it wasn't already running + if !evsw.IsRunning() { + // if so, then we start it, listen, and stop it. + st, err := evsw.Start() + require.Nil(err, "%d: %+v", i, err) + require.True(st, "%d", i) + defer evsw.Stop() + } + + // make the tx + _, _, tx := merktest.MakeTxKV() + evtTyp := types.EventStringTx(types.Tx(tx)) + + // send async + txres, err := c.BroadcastTxAsync(tx) + require.Nil(err, "%+v", err) + require.True(txres.Code.IsOK()) + + // and wait for confirmation + evt, err := client.WaitForOneEvent(evsw, evtTyp, 1*time.Second) + require.Nil(err, "%d: %+v", i, err) + // and make sure it has the proper info + txe, ok := evt.(types.EventDataTx) + require.True(ok, "%d: %#v", i, evt) + // make sure this is the proper tx + require.EqualValues(tx, txe.Tx) + require.True(txe.Code.IsOK()) } } diff --git a/rpc/client/helpers.go b/rpc/client/helpers.go index bf8860e7..5af1fe20 100644 --- a/rpc/client/helpers.go +++ b/rpc/client/helpers.go @@ -4,6 +4,8 @@ import ( "time" "github.com/pkg/errors" + cmn "github.com/tendermint/go-common" + events "github.com/tendermint/go-events" ) // Waiter is informed of current height, decided whether to quit early @@ -47,3 +49,36 @@ func WaitForHeight(c StatusClient, h int, waiter Waiter) error { } return nil } + +// WaitForOneEvent subscribes to a websocket event for the given +// event time and returns upon receiving it one time, or +// when the timeout duration has expired. +// +// This handles subscribing and unsubscribing under the hood +func WaitForOneEvent(evsw events.EventSwitch, + evtTyp string, timeout time.Duration) (events.EventData, error) { + listener := cmn.RandStr(12) + + evts, quit := make(chan events.EventData, 10), make(chan bool, 1) + // start timeout count-down + go func() { + time.Sleep(1 * time.Second) + quit <- true + }() + + // register for the next event of this type + evsw.AddListenerForEvent(listener, evtTyp, func(data events.EventData) { + evts <- data + }) + // make sure to unregister after the test is over + // TODO: don't require both! + defer evsw.RemoveListenerForEvent(listener, evtTyp) + defer evsw.RemoveListener(listener) + + select { + case <-quit: + return nil, errors.New("timed out waiting for event") + case evt := <-evts: + return evt, nil + } +} diff --git a/rpc/client/httpclient.go b/rpc/client/httpclient.go index f07775eb..4eb94f5a 100644 --- a/rpc/client/httpclient.go +++ b/rpc/client/httpclient.go @@ -173,6 +173,7 @@ type WSEvents struct { endpoint string ws *rpcclient.WSClient quit chan bool + done chan bool } func newWSEvents(remote, endpoint string) *WSEvents { @@ -181,6 +182,7 @@ func newWSEvents(remote, endpoint string) *WSEvents { endpoint: endpoint, remote: remote, quit: make(chan bool, 1), + done: make(chan bool, 1), } } @@ -211,7 +213,9 @@ func (w *WSEvents) Stop() bool { if stop { // send a message to quit to stop the eventListener w.quit <- true + <-w.done w.ws.Stop() + w.ws = nil } return stop } @@ -249,7 +253,9 @@ func (w *WSEvents) eventListener() { // FIXME: better logging/handling of errors?? fmt.Printf("ws err: %+v\n", err) case <-w.quit: - // only way to finish this method + // send a message so we can wait for the routine to exit + // before cleaning up the w.ws stuff + w.done <- true return } } diff --git a/rpc/client/rpc_test.go b/rpc/client/rpc_test.go index 4c9e5abf..11c7f726 100644 --- a/rpc/client/rpc_test.go +++ b/rpc/client/rpc_test.go @@ -179,59 +179,3 @@ func TestAppCalls(t *testing.T) { } } } - -// TestSubscriptions only works for HTTPClient -// -// TODO: generalize this functionality -> Local and Client -// func TestSubscriptions(t *testing.T) { -// require := require.New(t) -// c := getHTTPClient() -// err := c.StartWebsocket() -// require.Nil(err) -// defer c.StopWebsocket() - -// // subscribe to a transaction event -// _, _, tx := merktest.MakeTxKV() -// eventType := types.EventStringTx(types.Tx(tx)) -// c.Subscribe(eventType) - -// // set up a listener -// r, e := c.GetEventChannels() -// go func() { -// // send a tx and wait for it to propogate -// _, err = c.BroadcastTxCommit(tx) -// require.Nil(err, string(tx)) -// }() - -// checkData := func(data []byte, kind byte) { -// x := []interface{}{} -// err := json.Unmarshal(data, &x) -// require.Nil(err) -// // gotta love wire's json format -// require.EqualValues(kind, x[0]) -// } - -// res := <-r -// checkData(res, ctypes.ResultTypeSubscribe) - -// // read one event, must be success -// select { -// case res := <-r: -// checkData(res, ctypes.ResultTypeEvent) -// // this is good.. let's get the data... ugh... -// // result := new(ctypes.TMResult) -// // wire.ReadJSON(result, res, &err) -// // require.Nil(err, "%+v", err) -// // event, ok := (*result).(*ctypes.ResultEvent) -// // require.True(ok) -// // assert.Equal("foo", event.Name) -// // data, ok := event.Data.(types.EventDataTx) -// // require.True(ok) -// // assert.EqualValues(0, data.Code) -// // assert.EqualValues(tx, data.Tx) -// case err := <-e: -// // this is a failure -// require.Nil(err) -// } - -// }