diff --git a/consensus/common_test.go b/consensus/common_test.go index 469fc9c6..1443232a 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -206,7 +206,7 @@ func subscribeToVoter(cs *ConsensusState, addr []byte) chan interface{} { go func() { for { v := <-voteCh0 - vote := v.(types.EventDataVote) + vote := v.(types.TMEventData).Unwrap().(types.EventDataVote) // we only fire for our own votes if bytes.Equal(addr, vote.Vote.ValidatorAddress) { voteCh <- v diff --git a/consensus/mempool_test.go b/consensus/mempool_test.go index 8808185d..15e6a155 100644 --- a/consensus/mempool_test.go +++ b/consensus/mempool_test.go @@ -44,7 +44,7 @@ func TestTxConcurrentWithCommit(t *testing.T) { for nTxs := 0; nTxs < NTxs; { select { case b := <-newBlockCh: - nTxs += b.(types.EventDataNewBlock).Block.Header.NumTxs + nTxs += b.(types.TMEventData).Unwrap().(types.EventDataNewBlock).Block.Header.NumTxs case <-ticker.C: panic("Timed out waiting to commit blocks with transactions") } diff --git a/consensus/reactor.go b/consensus/reactor.go index 5fe45cc0..5734298e 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -8,11 +8,11 @@ import ( "sync" "time" - . "github.com/tendermint/tmlibs/common" - "github.com/tendermint/tendermint/p2p" "github.com/tendermint/go-wire" + "github.com/tendermint/tendermint/p2p" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" + . "github.com/tendermint/tmlibs/common" ) const ( @@ -299,12 +299,12 @@ func (conR *ConsensusReactor) SetEventSwitch(evsw types.EventSwitch) { func (conR *ConsensusReactor) registerEventCallbacks() { types.AddListenerForEvent(conR.evsw, "conR", types.EventStringNewRoundStep(), func(data types.TMEventData) { - rs := data.(types.EventDataRoundState).RoundState.(*RoundState) + rs := data.Unwrap().(types.EventDataRoundState).RoundState.(*RoundState) conR.broadcastNewRoundStep(rs) }) types.AddListenerForEvent(conR.evsw, "conR", types.EventStringVote(), func(data types.TMEventData) { - edv := data.(types.EventDataVote) + edv := data.Unwrap().(types.EventDataVote) conR.broadcastHasVoteMessage(edv.Vote) }) } diff --git a/consensus/reactor_test.go b/consensus/reactor_test.go index 66827c15..fdf59015 100644 --- a/consensus/reactor_test.go +++ b/consensus/reactor_test.go @@ -8,10 +8,10 @@ import ( "github.com/tendermint/tendermint/config/tendermint_test" - "github.com/tendermint/tmlibs/events" + "github.com/tendermint/abci/example/dummy" "github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/types" - "github.com/tendermint/abci/example/dummy" + "github.com/tendermint/tmlibs/events" ) func init() { @@ -252,7 +252,7 @@ func TestReactorWithTimeoutCommit(t *testing.T) { func waitForAndValidateBlock(t *testing.T, n int, activeVals map[string]struct{}, eventChans []chan interface{}, css []*ConsensusState, txs ...[]byte) { timeoutWaitGroup(t, n, func(wg *sync.WaitGroup, j int) { newBlockI := <-eventChans[j] - newBlock := newBlockI.(types.EventDataNewBlock).Block + newBlock := newBlockI.(types.TMEventData).Unwrap().(types.EventDataNewBlock).Block log.Warn("Got block", "height", newBlock.Height, "validator", j) err := validateBlock(newBlock, activeVals) if err != nil { diff --git a/consensus/state_test.go b/consensus/state_test.go index d2d34e3a..2d5ed410 100644 --- a/consensus/state_test.go +++ b/consensus/state_test.go @@ -6,9 +6,9 @@ import ( "testing" "time" - . "github.com/tendermint/tmlibs/common" "github.com/tendermint/tendermint/config/tendermint_test" "github.com/tendermint/tendermint/types" + . "github.com/tendermint/tmlibs/common" ) func init() { @@ -248,7 +248,7 @@ func TestFullRound1(t *testing.T) { // grab proposal re := <-propCh - propBlockHash := re.(types.EventDataRoundState).RoundState.(*RoundState).ProposalBlock.Hash() + propBlockHash := re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*RoundState).ProposalBlock.Hash() <-voteCh // wait for prevote // NOTE: voteChan cap of 0 ensures we can complete this @@ -345,7 +345,7 @@ func TestLockNoPOL(t *testing.T) { cs1.startRoutines(0) re := <-proposalCh - rs := re.(types.EventDataRoundState).RoundState.(*RoundState) + rs := re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*RoundState) theBlockHash := rs.ProposalBlock.Hash() <-voteCh // prevote @@ -385,7 +385,7 @@ func TestLockNoPOL(t *testing.T) { // now we're on a new round and not the proposer, so wait for timeout re = <-timeoutProposeCh - rs = re.(types.EventDataRoundState).RoundState.(*RoundState) + rs = re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*RoundState) if rs.ProposalBlock != nil { panic("Expected proposal block to be nil") @@ -429,7 +429,7 @@ func TestLockNoPOL(t *testing.T) { incrementRound(vs2) re = <-proposalCh - rs = re.(types.EventDataRoundState).RoundState.(*RoundState) + rs = re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*RoundState) // now we're on a new round and are the proposer if !bytes.Equal(rs.ProposalBlock.Hash(), rs.LockedBlock.Hash()) { @@ -518,7 +518,7 @@ func TestLockPOLRelock(t *testing.T) { <-newRoundCh re := <-proposalCh - rs := re.(types.EventDataRoundState).RoundState.(*RoundState) + rs := re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*RoundState) theBlockHash := rs.ProposalBlock.Hash() <-voteCh // prevote @@ -589,9 +589,9 @@ func TestLockPOLRelock(t *testing.T) { _, _ = <-voteCh, <-voteCh be := <-newBlockCh - b := be.(types.EventDataNewBlockHeader) + b := be.(types.TMEventData).Unwrap().(types.EventDataNewBlockHeader) re = <-newRoundCh - rs = re.(types.EventDataRoundState).RoundState.(*RoundState) + rs = re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*RoundState) if rs.Height != 2 { panic("Expected height to increment") } @@ -627,7 +627,7 @@ func TestLockPOLUnlock(t *testing.T) { startTestRound(cs1, cs1.Height, 0) <-newRoundCh re := <-proposalCh - rs := re.(types.EventDataRoundState).RoundState.(*RoundState) + rs := re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*RoundState) theBlockHash := rs.ProposalBlock.Hash() <-voteCh // prevote @@ -653,7 +653,7 @@ func TestLockPOLUnlock(t *testing.T) { // timeout to new round re = <-timeoutWaitCh - rs = re.(types.EventDataRoundState).RoundState.(*RoundState) + rs = re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*RoundState) lockedBlockHash := rs.LockedBlock.Hash() //XXX: this isnt gauranteed to get there before the timeoutPropose ... @@ -713,7 +713,7 @@ func TestLockPOLSafety1(t *testing.T) { startTestRound(cs1, cs1.Height, 0) <-newRoundCh re := <-proposalCh - rs := re.(types.EventDataRoundState).RoundState.(*RoundState) + rs := re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*RoundState) propBlock := rs.ProposalBlock <-voteCh // prevote @@ -761,7 +761,7 @@ func TestLockPOLSafety1(t *testing.T) { re = <-proposalCh } - rs = re.(types.EventDataRoundState).RoundState.(*RoundState) + rs = re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*RoundState) if rs.LockedBlock != nil { panic("we should not be locked!") @@ -1009,7 +1009,7 @@ func TestHalt1(t *testing.T) { startTestRound(cs1, cs1.Height, 0) <-newRoundCh re := <-proposalCh - rs := re.(types.EventDataRoundState).RoundState.(*RoundState) + rs := re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*RoundState) propBlock := rs.ProposalBlock propBlockParts := propBlock.MakePartSet(partSize) @@ -1032,7 +1032,7 @@ func TestHalt1(t *testing.T) { // timeout to new round <-timeoutWaitCh re = <-newRoundCh - rs = re.(types.EventDataRoundState).RoundState.(*RoundState) + rs = re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*RoundState) log.Notice("### ONTO ROUND 1") /*Round2 @@ -1050,7 +1050,7 @@ func TestHalt1(t *testing.T) { // receiving that precommit should take us straight to commit <-newBlockCh re = <-newRoundCh - rs = re.(types.EventDataRoundState).RoundState.(*RoundState) + rs = re.(types.TMEventData).Unwrap().(types.EventDataRoundState).RoundState.(*RoundState) if rs.Height != 2 { panic("expected height to increment") diff --git a/rpc/client/event_test.go b/rpc/client/event_test.go index cc421ad9..1b99854c 100644 --- a/rpc/client/event_test.go +++ b/rpc/client/event_test.go @@ -25,7 +25,7 @@ func TestHeaderEvents(t *testing.T) { evtTyp := types.EventStringNewBlockHeader() evt, err := client.WaitForOneEvent(c, evtTyp, 1*time.Second) require.Nil(err, "%d: %+v", i, err) - _, ok := evt.(types.EventDataNewBlockHeader) + _, ok := evt.Unwrap().(types.EventDataNewBlockHeader) require.True(ok, "%d: %#v", i, evt) // TODO: more checks... } @@ -56,7 +56,7 @@ func TestTxEvents(t *testing.T) { evt, err := client.WaitForOneEvent(c, evtTyp, 1*time.Second) require.Nil(err, "%d: %+v", i, err) // and make sure it has the proper info - txe, ok := evt.(types.EventDataTx) + txe, ok := evt.Unwrap().(types.EventDataTx) require.True(ok, "%d: %#v", i, evt) // make sure this is the proper tx require.EqualValues(tx, txe.Tx) diff --git a/rpc/client/helpers.go b/rpc/client/helpers.go index 330bcd19..bc26ea57 100644 --- a/rpc/client/helpers.go +++ b/rpc/client/helpers.go @@ -4,9 +4,9 @@ import ( "time" "github.com/pkg/errors" + "github.com/tendermint/tendermint/types" cmn "github.com/tendermint/tmlibs/common" events "github.com/tendermint/tmlibs/events" - "github.com/tendermint/tendermint/types" ) // Waiter is informed of current height, decided whether to quit early @@ -77,12 +77,12 @@ func WaitForOneEvent(evsw types.EventSwitch, select { case <-quit: - return nil, errors.New("timed out waiting for event") + return types.TMEventData{}, errors.New("timed out waiting for event") case evt := <-evts: tmevt, ok := evt.(types.TMEventData) if ok { return tmevt, nil } - return nil, errors.Errorf("Got unexpected event type: %#v", evt) + return types.TMEventData{}, errors.Errorf("Got unexpected event type: %#v", evt) } } diff --git a/rpc/core/mempool.go b/rpc/core/mempool.go index 4c1b54cf..a6e7093f 100644 --- a/rpc/core/mempool.go +++ b/rpc/core/mempool.go @@ -50,7 +50,7 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { // subscribe to tx being committed in block deliverTxResCh := make(chan types.EventDataTx, 1) types.AddListenerForEvent(eventSwitch, "rpc", types.EventStringTx(tx), func(data types.TMEventData) { - deliverTxResCh <- data.(types.EventDataTx) + deliverTxResCh <- data.Unwrap().(types.EventDataTx) }) // broadcast the tx and register checktx callback diff --git a/rpc/lib/rpc_test.go b/rpc/lib/rpc_test.go index 2f61f388..9cbdc29a 100644 --- a/rpc/lib/rpc_test.go +++ b/rpc/lib/rpc_test.go @@ -208,12 +208,10 @@ func testWithHTTPClient(t *testing.T, cl client.HTTPClient) { require.Nil(t, err) assert.Equal(t, got3, val3) - /* - val4 := rand.Intn(10000) - got4, err := echoIntViaHTTP(cl, val4) - require.Nil(t, err) - assert.Equal(t, got4, val4) - */ + val4 := rand.Intn(10000) + got4, err := echoIntViaHTTP(cl, val4) + require.Nil(t, err) + assert.Equal(t, got4, val4) } func echoViaWS(cl *client.WSClient, val string) (string, error) { diff --git a/rpc/test/client_test.go b/rpc/test/client_test.go index c9a922d7..8e636c33 100644 --- a/rpc/test/client_test.go +++ b/rpc/test/client_test.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/require" abci "github.com/tendermint/abci/types" + "github.com/tendermint/go-wire/data" "github.com/tendermint/tendermint/rpc/core" ctypes "github.com/tendermint/tendermint/rpc/core/types" rpc "github.com/tendermint/tendermint/rpc/lib/client" @@ -116,7 +117,7 @@ func testABCIQuery(t *testing.T, client rpc.HTTPClient) { time.Sleep(time.Millisecond * 500) tmResult := new(ctypes.TMResult) _, err := client.Call("abci_query", - map[string]interface{}{"path": "", "data": k, "prove": false}, tmResult) + map[string]interface{}{"path": "", "data": data.Bytes(k), "prove": false}, tmResult) require.Nil(t, err) resQuery := tmResult.Unwrap().(*ctypes.ResultABCIQuery) @@ -286,7 +287,7 @@ func TestWSBlockchainGrowth(t *testing.T) { var initBlockN int for i := 0; i < 3; i++ { waitForEvent(t, wsc, eid, true, func() {}, func(eid string, eventData interface{}) error { - block := eventData.(types.EventDataNewBlock).Block + block := eventData.(types.TMEventData).Unwrap().(types.EventDataNewBlock).Block if i == 0 { initBlockN = block.Header.Height } else { @@ -320,7 +321,7 @@ func TestWSTxEvent(t *testing.T) { require.Nil(err) waitForEvent(t, wsc, eid, true, func() {}, func(eid string, b interface{}) error { - evt, ok := b.(types.EventDataTx) + evt, ok := b.(types.TMEventData).Unwrap().(types.EventDataTx) require.True(ok, "Got wrong event type: %#v", b) require.Equal(tx, []byte(evt.Tx), "Returned different tx") require.Equal(abci.CodeType_OK, evt.Code) diff --git a/rpc/test/helpers.go b/rpc/test/helpers.go index 76d04f51..2b01d0a6 100644 --- a/rpc/test/helpers.go +++ b/rpc/test/helpers.go @@ -131,9 +131,11 @@ func waitForEvent(t *testing.T, wsc *client.WSClient, eventid string, dieOnTimeo for { select { case r := <-wsc.ResultsCh: + fmt.Println("GOT IT", string(r)) result := new(ctypes.TMResult) err = json.Unmarshal(r, result) if err != nil { + fmt.Println("POOP", err) errCh <- err break LOOP } diff --git a/types/events.go b/types/events.go index 953859d0..8c29c444 100644 --- a/types/events.go +++ b/types/events.go @@ -3,7 +3,6 @@ package types import ( // for registering TMEventData as events.EventData abci "github.com/tendermint/abci/types" - "github.com/tendermint/go-wire" "github.com/tendermint/go-wire/data" cmn "github.com/tendermint/tmlibs/common" "github.com/tendermint/tmlibs/events" @@ -34,10 +33,47 @@ func EventStringVote() string { return "Vote" } //---------------------------------------- +var ( + EventDataNameNewBlock = "new_block" + EventDataNameNewBlockHeader = "new_block_header" + EventDataNameTx = "tx" + EventDataNameRoundState = "round_state" + EventDataNameVote = "vote" +) + +//---------------------------------------- + // implements events.EventData -type TMEventData interface { +type TMEventDataInner interface { events.EventData - AssertIsTMEventData() +} + +type TMEventData struct { + TMEventDataInner `json:"unwrap"` +} + +func (tmr TMEventData) MarshalJSON() ([]byte, error) { + return tmEventDataMapper.ToJSON(tmr.TMEventDataInner) +} + +func (tmr *TMEventData) UnmarshalJSON(data []byte) (err error) { + parsed, err := tmEventDataMapper.FromJSON(data) + if err == nil && parsed != nil { + tmr.TMEventDataInner = parsed.(TMEventDataInner) + } + return +} + +func (tmr TMEventData) Unwrap() TMEventDataInner { + tmrI := tmr.TMEventDataInner + for wrap, ok := tmrI.(TMEventData); ok; wrap, ok = tmrI.(TMEventData) { + tmrI = wrap.TMEventDataInner + } + return tmrI +} + +func (tmr TMEventData) Empty() bool { + return tmr.TMEventDataInner == nil } const ( @@ -50,15 +86,12 @@ const ( EventDataTypeVote = byte(0x12) ) -var _ = wire.RegisterInterface( - struct{ TMEventData }{}, - wire.ConcreteType{EventDataNewBlock{}, EventDataTypeNewBlock}, - wire.ConcreteType{EventDataNewBlockHeader{}, EventDataTypeNewBlockHeader}, - // wire.ConcreteType{EventDataFork{}, EventDataTypeFork }, - wire.ConcreteType{EventDataTx{}, EventDataTypeTx}, - wire.ConcreteType{EventDataRoundState{}, EventDataTypeRoundState}, - wire.ConcreteType{EventDataVote{}, EventDataTypeVote}, -) +var tmEventDataMapper = data.NewMapper(TMEventData{}). + RegisterImplementation(EventDataNewBlock{}, EventDataNameNewBlock, EventDataTypeNewBlock). + RegisterImplementation(EventDataNewBlockHeader{}, EventDataNameNewBlockHeader, EventDataTypeNewBlockHeader). + RegisterImplementation(EventDataTx{}, EventDataNameTx, EventDataTypeTx). + RegisterImplementation(EventDataRoundState{}, EventDataNameRoundState, EventDataTypeRoundState). + RegisterImplementation(EventDataVote{}, EventDataNameVote, EventDataTypeVote) // Most event messages are basic types (a block, a transaction) // but some (an input to a call tx or a receive) are more exotic @@ -147,55 +180,55 @@ func AddListenerForEvent(evsw EventSwitch, id, event string, cb func(data TMEven //--- block, tx, and vote events func FireEventNewBlock(fireable events.Fireable, block EventDataNewBlock) { - fireEvent(fireable, EventStringNewBlock(), block) + fireEvent(fireable, EventStringNewBlock(), TMEventData{block}) } func FireEventNewBlockHeader(fireable events.Fireable, header EventDataNewBlockHeader) { - fireEvent(fireable, EventStringNewBlockHeader(), header) + fireEvent(fireable, EventStringNewBlockHeader(), TMEventData{header}) } func FireEventVote(fireable events.Fireable, vote EventDataVote) { - fireEvent(fireable, EventStringVote(), vote) + fireEvent(fireable, EventStringVote(), TMEventData{vote}) } func FireEventTx(fireable events.Fireable, tx EventDataTx) { - fireEvent(fireable, EventStringTx(tx.Tx), tx) + fireEvent(fireable, EventStringTx(tx.Tx), TMEventData{tx}) } //--- EventDataRoundState events func FireEventNewRoundStep(fireable events.Fireable, rs EventDataRoundState) { - fireEvent(fireable, EventStringNewRoundStep(), rs) + fireEvent(fireable, EventStringNewRoundStep(), TMEventData{rs}) } func FireEventTimeoutPropose(fireable events.Fireable, rs EventDataRoundState) { - fireEvent(fireable, EventStringTimeoutPropose(), rs) + fireEvent(fireable, EventStringTimeoutPropose(), TMEventData{rs}) } func FireEventTimeoutWait(fireable events.Fireable, rs EventDataRoundState) { - fireEvent(fireable, EventStringTimeoutWait(), rs) + fireEvent(fireable, EventStringTimeoutWait(), TMEventData{rs}) } func FireEventNewRound(fireable events.Fireable, rs EventDataRoundState) { - fireEvent(fireable, EventStringNewRound(), rs) + fireEvent(fireable, EventStringNewRound(), TMEventData{rs}) } func FireEventCompleteProposal(fireable events.Fireable, rs EventDataRoundState) { - fireEvent(fireable, EventStringCompleteProposal(), rs) + fireEvent(fireable, EventStringCompleteProposal(), TMEventData{rs}) } func FireEventPolka(fireable events.Fireable, rs EventDataRoundState) { - fireEvent(fireable, EventStringPolka(), rs) + fireEvent(fireable, EventStringPolka(), TMEventData{rs}) } func FireEventUnlock(fireable events.Fireable, rs EventDataRoundState) { - fireEvent(fireable, EventStringUnlock(), rs) + fireEvent(fireable, EventStringUnlock(), TMEventData{rs}) } func FireEventRelock(fireable events.Fireable, rs EventDataRoundState) { - fireEvent(fireable, EventStringRelock(), rs) + fireEvent(fireable, EventStringRelock(), TMEventData{rs}) } func FireEventLock(fireable events.Fireable, rs EventDataRoundState) { - fireEvent(fireable, EventStringLock(), rs) + fireEvent(fireable, EventStringLock(), TMEventData{rs}) }