diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index ea3b9759..ea7d97c9 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -26,3 +26,6 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi ### BUG FIXES: +- [abci] unlock mutex in localClient so even when app panics (e.g. during CheckTx), consensus continue working +- [abci] fix DATA RACE in localClient +- [rpc] drain channel before calling Unsubscribe(All) in /broadcast_tx_commit diff --git a/abci/client/client.go b/abci/client/client.go index 55858810..e1eea5d4 100644 --- a/abci/client/client.go +++ b/abci/client/client.go @@ -105,8 +105,8 @@ func (reqRes *ReqRes) SetCallback(cb func(res *types.Response)) { return } - defer reqRes.mtx.Unlock() reqRes.cb = cb + reqRes.mtx.Unlock() } func (reqRes *ReqRes) GetCallback() func(*types.Response) { diff --git a/abci/client/grpc_client.go b/abci/client/grpc_client.go index 4f37b17b..d5cd233a 100644 --- a/abci/client/grpc_client.go +++ b/abci/client/grpc_client.go @@ -111,8 +111,8 @@ func (cli *grpcClient) Error() error { // NOTE: callback may get internally generated flush responses. func (cli *grpcClient) SetResponseCallback(resCb Callback) { cli.mtx.Lock() - defer cli.mtx.Unlock() cli.resCb = resCb + cli.mtx.Unlock() } //---------------------------------------- diff --git a/abci/client/local_client.go b/abci/client/local_client.go index 3ac3b6af..d0e50c33 100644 --- a/abci/client/local_client.go +++ b/abci/client/local_client.go @@ -9,8 +9,13 @@ import ( var _ Client = (*localClient)(nil) +// NOTE: use defer to unlock mutex because Application might panic (e.g., in +// case of malicious tx or query). It only makes sense for publicly exposed +// methods like CheckTx (/broadcast_tx_* RPC endpoint) or Query (/abci_query +// RPC endpoint), but defers are used everywhere for the sake of consistency. type localClient struct { cmn.BaseService + mtx *sync.Mutex types.Application Callback @@ -30,8 +35,8 @@ func NewLocalClient(mtx *sync.Mutex, app types.Application) *localClient { func (app *localClient) SetResponseCallback(cb Callback) { app.mtx.Lock() - defer app.mtx.Unlock() app.Callback = cb + app.mtx.Unlock() } // TODO: change types.Application to include Error()? @@ -45,6 +50,9 @@ func (app *localClient) FlushAsync() *ReqRes { } func (app *localClient) EchoAsync(msg string) *ReqRes { + app.mtx.Lock() + defer app.mtx.Unlock() + return app.callback( types.ToRequestEcho(msg), types.ToResponseEcho(msg), @@ -53,8 +61,9 @@ func (app *localClient) EchoAsync(msg string) *ReqRes { func (app *localClient) InfoAsync(req types.RequestInfo) *ReqRes { app.mtx.Lock() + defer app.mtx.Unlock() + res := app.Application.Info(req) - app.mtx.Unlock() return app.callback( types.ToRequestInfo(req), types.ToResponseInfo(res), @@ -63,8 +72,9 @@ func (app *localClient) InfoAsync(req types.RequestInfo) *ReqRes { func (app *localClient) SetOptionAsync(req types.RequestSetOption) *ReqRes { app.mtx.Lock() + defer app.mtx.Unlock() + res := app.Application.SetOption(req) - app.mtx.Unlock() return app.callback( types.ToRequestSetOption(req), types.ToResponseSetOption(res), @@ -73,8 +83,9 @@ func (app *localClient) SetOptionAsync(req types.RequestSetOption) *ReqRes { func (app *localClient) DeliverTxAsync(tx []byte) *ReqRes { app.mtx.Lock() + defer app.mtx.Unlock() + res := app.Application.DeliverTx(tx) - app.mtx.Unlock() return app.callback( types.ToRequestDeliverTx(tx), types.ToResponseDeliverTx(res), @@ -83,8 +94,9 @@ func (app *localClient) DeliverTxAsync(tx []byte) *ReqRes { func (app *localClient) CheckTxAsync(tx []byte) *ReqRes { app.mtx.Lock() + defer app.mtx.Unlock() + res := app.Application.CheckTx(tx) - app.mtx.Unlock() return app.callback( types.ToRequestCheckTx(tx), types.ToResponseCheckTx(res), @@ -93,8 +105,9 @@ func (app *localClient) CheckTxAsync(tx []byte) *ReqRes { func (app *localClient) QueryAsync(req types.RequestQuery) *ReqRes { app.mtx.Lock() + defer app.mtx.Unlock() + res := app.Application.Query(req) - app.mtx.Unlock() return app.callback( types.ToRequestQuery(req), types.ToResponseQuery(res), @@ -103,8 +116,9 @@ func (app *localClient) QueryAsync(req types.RequestQuery) *ReqRes { func (app *localClient) CommitAsync() *ReqRes { app.mtx.Lock() + defer app.mtx.Unlock() + res := app.Application.Commit() - app.mtx.Unlock() return app.callback( types.ToRequestCommit(), types.ToResponseCommit(res), @@ -113,19 +127,20 @@ func (app *localClient) CommitAsync() *ReqRes { func (app *localClient) InitChainAsync(req types.RequestInitChain) *ReqRes { app.mtx.Lock() + defer app.mtx.Unlock() + res := app.Application.InitChain(req) - reqRes := app.callback( + return app.callback( types.ToRequestInitChain(req), types.ToResponseInitChain(res), ) - app.mtx.Unlock() - return reqRes } func (app *localClient) BeginBlockAsync(req types.RequestBeginBlock) *ReqRes { app.mtx.Lock() + defer app.mtx.Unlock() + res := app.Application.BeginBlock(req) - app.mtx.Unlock() return app.callback( types.ToRequestBeginBlock(req), types.ToResponseBeginBlock(res), @@ -134,8 +149,9 @@ func (app *localClient) BeginBlockAsync(req types.RequestBeginBlock) *ReqRes { func (app *localClient) EndBlockAsync(req types.RequestEndBlock) *ReqRes { app.mtx.Lock() + defer app.mtx.Unlock() + res := app.Application.EndBlock(req) - app.mtx.Unlock() return app.callback( types.ToRequestEndBlock(req), types.ToResponseEndBlock(res), @@ -154,64 +170,73 @@ func (app *localClient) EchoSync(msg string) (*types.ResponseEcho, error) { func (app *localClient) InfoSync(req types.RequestInfo) (*types.ResponseInfo, error) { app.mtx.Lock() + defer app.mtx.Unlock() + res := app.Application.Info(req) - app.mtx.Unlock() return &res, nil } func (app *localClient) SetOptionSync(req types.RequestSetOption) (*types.ResponseSetOption, error) { app.mtx.Lock() + defer app.mtx.Unlock() + res := app.Application.SetOption(req) - app.mtx.Unlock() return &res, nil } func (app *localClient) DeliverTxSync(tx []byte) (*types.ResponseDeliverTx, error) { app.mtx.Lock() + defer app.mtx.Unlock() + res := app.Application.DeliverTx(tx) - app.mtx.Unlock() return &res, nil } func (app *localClient) CheckTxSync(tx []byte) (*types.ResponseCheckTx, error) { app.mtx.Lock() + defer app.mtx.Unlock() + res := app.Application.CheckTx(tx) - app.mtx.Unlock() return &res, nil } func (app *localClient) QuerySync(req types.RequestQuery) (*types.ResponseQuery, error) { app.mtx.Lock() + defer app.mtx.Unlock() + res := app.Application.Query(req) - app.mtx.Unlock() return &res, nil } func (app *localClient) CommitSync() (*types.ResponseCommit, error) { app.mtx.Lock() + defer app.mtx.Unlock() + res := app.Application.Commit() - app.mtx.Unlock() return &res, nil } func (app *localClient) InitChainSync(req types.RequestInitChain) (*types.ResponseInitChain, error) { app.mtx.Lock() + defer app.mtx.Unlock() + res := app.Application.InitChain(req) - app.mtx.Unlock() return &res, nil } func (app *localClient) BeginBlockSync(req types.RequestBeginBlock) (*types.ResponseBeginBlock, error) { app.mtx.Lock() + defer app.mtx.Unlock() + res := app.Application.BeginBlock(req) - app.mtx.Unlock() return &res, nil } func (app *localClient) EndBlockSync(req types.RequestEndBlock) (*types.ResponseEndBlock, error) { app.mtx.Lock() + defer app.mtx.Unlock() + res := app.Application.EndBlock(req) - app.mtx.Unlock() return &res, nil } diff --git a/abci/client/socket_client.go b/abci/client/socket_client.go index affea1a9..531d12bc 100644 --- a/abci/client/socket_client.go +++ b/abci/client/socket_client.go @@ -118,8 +118,8 @@ func (cli *socketClient) Error() error { // NOTE: callback may get internally generated flush responses. func (cli *socketClient) SetResponseCallback(resCb Callback) { cli.mtx.Lock() - defer cli.mtx.Unlock() cli.resCb = resCb + cli.mtx.Unlock() } //---------------------------------------- diff --git a/consensus/reactor.go b/consensus/reactor.go index 12e8e0f1..1768a8f0 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -183,7 +183,11 @@ func (conR *ConsensusReactor) RemovePeer(peer p2p.Peer, reason interface{}) { return } // TODO - //peer.Get(PeerStateKey).(*PeerState).Disconnect() + // ps, ok := peer.Get(PeerStateKey).(*PeerState) + // if !ok { + // panic(fmt.Sprintf("Peer %v has no state", peer)) + // } + // ps.Disconnect() } // Receive implements Reactor @@ -214,7 +218,10 @@ func (conR *ConsensusReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) conR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg) // Get peer states - ps := src.Get(types.PeerStateKey).(*PeerState) + ps, ok := src.Get(types.PeerStateKey).(*PeerState) + if !ok { + panic(fmt.Sprintf("Peer %v has no state", src)) + } switch chID { case StateChannel: @@ -293,9 +300,9 @@ func (conR *ConsensusReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) switch msg := msg.(type) { case *VoteMessage: cs := conR.conS - cs.mtx.Lock() + cs.mtx.RLock() height, valSize, lastCommitSize := cs.Height, cs.Validators.Size(), cs.LastCommit.Size() - cs.mtx.Unlock() + cs.mtx.RUnlock() ps.EnsureVoteBitArrays(height, valSize) ps.EnsureVoteBitArrays(height-1, lastCommitSize) ps.SetHasVote(msg.Vote) @@ -428,7 +435,10 @@ func (conR *ConsensusReactor) broadcastHasVoteMessage(vote *types.Vote) { /* // TODO: Make this broadcast more selective. for _, peer := range conR.Switch.Peers().List() { - ps := peer.Get(PeerStateKey).(*PeerState) + ps, ok := peer.Get(PeerStateKey).(*PeerState) + if !ok { + panic(fmt.Sprintf("Peer %v has no state", peer)) + } prs := ps.GetRoundState() if prs.Height == vote.Height { // TODO: Also filter on round? @@ -826,7 +836,10 @@ func (conR *ConsensusReactor) peerStatsRoutine() { continue } // Get peer state - ps := peer.Get(types.PeerStateKey).(*PeerState) + ps, ok := peer.Get(types.PeerStateKey).(*PeerState) + if !ok { + panic(fmt.Sprintf("Peer %v has no state", peer)) + } switch msg.Msg.(type) { case *VoteMessage: if numVotes := ps.RecordVote(); numVotes%votesToContributeToBecomeGoodPeer == 0 { @@ -859,7 +872,10 @@ func (conR *ConsensusReactor) StringIndented(indent string) string { s := "ConsensusReactor{\n" s += indent + " " + conR.conS.StringIndented(indent+" ") + "\n" for _, peer := range conR.Switch.Peers().List() { - ps := peer.Get(types.PeerStateKey).(*PeerState) + ps, ok := peer.Get(types.PeerStateKey).(*PeerState) + if !ok { + panic(fmt.Sprintf("Peer %v has no state", peer)) + } s += indent + " " + ps.StringIndented(indent+" ") + "\n" } s += indent + "}" diff --git a/consensus/replay_file.go b/consensus/replay_file.go index 685eb71f..a326e70e 100644 --- a/consensus/replay_file.go +++ b/consensus/replay_file.go @@ -58,7 +58,18 @@ func (cs *ConsensusState) ReplayFile(file string, console bool) error { if err != nil { return errors.Errorf("failed to subscribe %s to %v", subscriber, types.EventQueryNewRoundStep) } - defer cs.eventBus.Unsubscribe(ctx, subscriber, types.EventQueryNewRoundStep) + defer func() { + // drain newStepCh to make sure we don't block + LOOP: + for { + select { + case <-newStepCh: + default: + break LOOP + } + } + cs.eventBus.Unsubscribe(ctx, subscriber, types.EventQueryNewRoundStep) + }() // just open the file for reading, no need to use wal fp, err := os.OpenFile(file, os.O_RDONLY, 0600) @@ -221,7 +232,18 @@ func (pb *playback) replayConsoleLoop() int { if err != nil { cmn.Exit(fmt.Sprintf("failed to subscribe %s to %v", subscriber, types.EventQueryNewRoundStep)) } - defer pb.cs.eventBus.Unsubscribe(ctx, subscriber, types.EventQueryNewRoundStep) + defer func() { + // drain newStepCh to make sure we don't block + LOOP: + for { + select { + case <-newStepCh: + default: + break LOOP + } + } + pb.cs.eventBus.Unsubscribe(ctx, subscriber, types.EventQueryNewRoundStep) + }() if len(tokens) == 1 { if err := pb.replayReset(1, newStepCh); err != nil { diff --git a/consensus/state.go b/consensus/state.go index 8c2e292c..e8603011 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -207,18 +207,16 @@ func (cs *ConsensusState) GetState() sm.State { // GetLastHeight returns the last height committed. // If there were no blocks, returns 0. func (cs *ConsensusState) GetLastHeight() int64 { - cs.mtx.Lock() - defer cs.mtx.Unlock() - + cs.mtx.RLock() + defer cs.mtx.RUnlock() return cs.RoundState.Height - 1 } // GetRoundState returns a shallow copy of the internal consensus state. func (cs *ConsensusState) GetRoundState() *cstypes.RoundState { cs.mtx.RLock() - defer cs.mtx.RUnlock() - rs := cs.RoundState // copy + cs.mtx.RUnlock() return &rs } @@ -226,7 +224,6 @@ func (cs *ConsensusState) GetRoundState() *cstypes.RoundState { func (cs *ConsensusState) GetRoundStateJSON() ([]byte, error) { cs.mtx.RLock() defer cs.mtx.RUnlock() - return cdc.MarshalJSON(cs.RoundState) } @@ -234,7 +231,6 @@ func (cs *ConsensusState) GetRoundStateJSON() ([]byte, error) { func (cs *ConsensusState) GetRoundStateSimpleJSON() ([]byte, error) { cs.mtx.RLock() defer cs.mtx.RUnlock() - return cdc.MarshalJSON(cs.RoundState.RoundStateSimple()) } @@ -248,15 +244,15 @@ func (cs *ConsensusState) GetValidators() (int64, []*types.Validator) { // SetPrivValidator sets the private validator account for signing votes. func (cs *ConsensusState) SetPrivValidator(priv types.PrivValidator) { cs.mtx.Lock() - defer cs.mtx.Unlock() cs.privValidator = priv + cs.mtx.Unlock() } // SetTimeoutTicker sets the local timer. It may be useful to overwrite for testing. func (cs *ConsensusState) SetTimeoutTicker(timeoutTicker TimeoutTicker) { cs.mtx.Lock() - defer cs.mtx.Unlock() cs.timeoutTicker = timeoutTicker + cs.mtx.Unlock() } // LoadCommit loads the commit for a given height. diff --git a/evidence/reactor.go b/evidence/reactor.go index 32753b2b..48092fdf 100644 --- a/evidence/reactor.go +++ b/evidence/reactor.go @@ -160,12 +160,15 @@ func (evR *EvidenceReactor) broadcastEvidenceRoutine(peer p2p.Peer) { // Returns the message to send the peer, or nil if the evidence is invalid for the peer. // If message is nil, return true if we should sleep and try again. func (evR EvidenceReactor) checkSendEvidenceMessage(peer p2p.Peer, ev types.Evidence) (msg EvidenceMessage, retry bool) { - // make sure the peer is up to date evHeight := ev.Height() peerState, ok := peer.Get(types.PeerStateKey).(PeerState) - if !ok { - evR.Logger.Info("Found peer without PeerState", "peer", peer) + if !ok { + // Peer does not have a state yet. We set it in the consensus reactor, but + // when we add peer in Switch, the order we call reactors#AddPeer is + // different every time due to us using a map. Sometimes other reactors + // will be initialized before the consensus reactor. We should wait a few + // milliseconds and retry. return nil, true } diff --git a/evidence/reactor_test.go b/evidence/reactor_test.go index 69dcdec5..1c4e731a 100644 --- a/evidence/reactor_test.go +++ b/evidence/reactor_test.go @@ -165,6 +165,16 @@ func TestReactorSelectiveBroadcast(t *testing.T) { // make reactors from statedb reactors := makeAndConnectEvidenceReactors(config, []dbm.DB{stateDB1, stateDB2}) + + // set the peer height on each reactor + for _, r := range reactors { + for _, peer := range r.Switch.Peers().List() { + ps := peerState{height1} + peer.Set(types.PeerStateKey, ps) + } + } + + // update the first reactor peer's height to be very small peer := reactors[0].Switch.Peers().List()[0] ps := peerState{height2} peer.Set(types.PeerStateKey, ps) diff --git a/libs/pubsub/pubsub.go b/libs/pubsub/pubsub.go index 18f098d8..b4e392bb 100644 --- a/libs/pubsub/pubsub.go +++ b/libs/pubsub/pubsub.go @@ -30,9 +30,15 @@ // // s.Subscribe(ctx, sub, qry, out) // defer func() { -// for range out { -// // drain out to make sure we don't block -// } +// // drain out to make sure we don't block +// LOOP: +// for { +// select { +// case <-out: +// default: +// break LOOP +// } +// } // s.UnsubscribeAll(ctx, sub) // }() // for msg := range out { diff --git a/mempool/mempool.go b/mempool/mempool.go index b84eb4a6..0bdb4714 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -300,6 +300,7 @@ func (mem *Mempool) TxsWaitChan() <-chan struct{} { // CONTRACT: Either cb will get called, or err returned. func (mem *Mempool) CheckTx(tx types.Tx, cb func(*abci.Response)) (err error) { mem.proxyMtx.Lock() + // use defer to unlock mutex because application (*local client*) might panic defer mem.proxyMtx.Unlock() if mem.Size() >= mem.config.Size { diff --git a/mempool/reactor.go b/mempool/reactor.go index 96988be7..072f9667 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -133,16 +133,23 @@ func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) { } memTx := next.Value.(*mempoolTx) + // make sure the peer is up to date - height := memTx.Height() - if peerState_i := peer.Get(types.PeerStateKey); peerState_i != nil { - peerState := peerState_i.(PeerState) - peerHeight := peerState.GetHeight() - if peerHeight < height-1 { // Allow for a lag of 1 block - time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond) - continue - } + peerState, ok := peer.Get(types.PeerStateKey).(PeerState) + if !ok { + // Peer does not have a state yet. We set it in the consensus reactor, but + // when we add peer in Switch, the order we call reactors#AddPeer is + // different every time due to us using a map. Sometimes other reactors + // will be initialized before the consensus reactor. We should wait a few + // milliseconds and retry. + time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond) + continue } + if peerState.GetHeight() < memTx.Height()-1 { // Allow for a lag of 1 block + time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond) + continue + } + // send memTx msg := &TxMessage{Tx: memTx.tx} success := peer.Send(MempoolChannel, cdc.MustMarshalBinaryBare(msg)) diff --git a/mempool/reactor_test.go b/mempool/reactor_test.go index 8ac400b0..ad9ad8b4 100644 --- a/mempool/reactor_test.go +++ b/mempool/reactor_test.go @@ -21,6 +21,14 @@ import ( "github.com/tendermint/tendermint/types" ) +type peerState struct { + height int64 +} + +func (ps peerState) GetHeight() int64 { + return ps.height +} + // mempoolLogger is a TestingLogger which uses a different // color for each validator ("validator" key must exist). func mempoolLogger() log.Logger { @@ -107,6 +115,11 @@ func TestReactorBroadcastTxMessage(t *testing.T) { r.Stop() } }() + for _, r := range reactors { + for _, peer := range r.Switch.Peers().List() { + peer.Set(types.PeerStateKey, peerState{1}) + } + } // send a bunch of txs to the first reactor's mempool // and wait for them all to be received in the others diff --git a/p2p/pex/addrbook.go b/p2p/pex/addrbook.go index 405a4628..e2fcc043 100644 --- a/p2p/pex/addrbook.go +++ b/p2p/pex/addrbook.go @@ -162,10 +162,10 @@ func (a *addrBook) FilePath() string { // AddOurAddress one of our addresses. func (a *addrBook) AddOurAddress(addr *p2p.NetAddress) { - a.mtx.Lock() - defer a.mtx.Unlock() a.Logger.Info("Add our address to book", "addr", addr) + a.mtx.Lock() a.ourAddrs[addr.String()] = struct{}{} + a.mtx.Unlock() } // OurAddress returns true if it is our address. @@ -178,10 +178,10 @@ func (a *addrBook) OurAddress(addr *p2p.NetAddress) bool { func (a *addrBook) AddPrivateIDs(IDs []string) { a.mtx.Lock() - defer a.mtx.Unlock() for _, id := range IDs { a.privateIDs[p2p.ID(id)] = struct{}{} } + a.mtx.Unlock() } // AddAddress implements AddrBook @@ -202,7 +202,7 @@ func (a *addrBook) RemoveAddress(addr *p2p.NetAddress) { if ka == nil { return } - a.Logger.Info("Remove address from book", "addr", ka.Addr, "ID", ka.ID()) + a.Logger.Info("Remove address from book", "addr", addr) a.removeFromAllBuckets(ka) } @@ -217,8 +217,8 @@ func (a *addrBook) IsGood(addr *p2p.NetAddress) bool { // HasAddress returns true if the address is in the book. func (a *addrBook) HasAddress(addr *p2p.NetAddress) bool { a.mtx.Lock() - defer a.mtx.Unlock() ka := a.addrLookup[addr.ID] + a.mtx.Unlock() return ka != nil } @@ -461,13 +461,12 @@ ADDRS_LOOP: // ListOfKnownAddresses returns the new and old addresses. func (a *addrBook) ListOfKnownAddresses() []*knownAddress { - a.mtx.Lock() - defer a.mtx.Unlock() - addrs := []*knownAddress{} + a.mtx.Lock() for _, addr := range a.addrLookup { addrs = append(addrs, addr.copy()) } + a.mtx.Unlock() return addrs } diff --git a/rpc/client/helpers.go b/rpc/client/helpers.go index 7e64d116..2e80a306 100644 --- a/rpc/client/helpers.go +++ b/rpc/client/helpers.go @@ -69,7 +69,18 @@ func WaitForOneEvent(c EventsClient, evtTyp string, timeout time.Duration) (type } // make sure to unregister after the test is over - defer c.UnsubscribeAll(ctx, subscriber) + defer func() { + // drain evts to make sure we don't block + LOOP: + for { + select { + case <-evts: + default: + break LOOP + } + } + c.UnsubscribeAll(ctx, subscriber) + }() select { case evt := <-evts: diff --git a/rpc/core/consensus.go b/rpc/core/consensus.go index 1c2619d5..14662885 100644 --- a/rpc/core/consensus.go +++ b/rpc/core/consensus.go @@ -193,7 +193,10 @@ func DumpConsensusState() (*ctypes.ResultDumpConsensusState, error) { peers := p2pPeers.Peers().List() peerStates := make([]ctypes.PeerStateInfo, len(peers)) for i, peer := range peers { - peerState := peer.Get(types.PeerStateKey).(*cm.PeerState) + peerState, ok := peer.Get(types.PeerStateKey).(*cm.PeerState) + if !ok { // peer does not have a state yet + continue + } peerStateJSON, err := peerState.ToJSON() if err != nil { return nil, err diff --git a/rpc/core/mempool.go b/rpc/core/mempool.go index c015363a..59877492 100644 --- a/rpc/core/mempool.go +++ b/rpc/core/mempool.go @@ -8,7 +8,6 @@ import ( "github.com/pkg/errors" abci "github.com/tendermint/tendermint/abci/types" - cmn "github.com/tendermint/tendermint/libs/common" ctypes "github.com/tendermint/tendermint/rpc/core/types" "github.com/tendermint/tendermint/types" ) @@ -51,7 +50,7 @@ import ( func BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { err := mempool.CheckTx(tx, nil) if err != nil { - return nil, fmt.Errorf("Error broadcasting transaction: %v", err) + return nil, err } return &ctypes.ResultBroadcastTx{Hash: tx.Hash()}, nil } @@ -94,7 +93,7 @@ func BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { resCh <- res }) if err != nil { - return nil, fmt.Errorf("Error broadcasting transaction: %v", err) + return nil, err } res := <-resCh r := res.GetCheckTx() @@ -106,8 +105,9 @@ func BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { }, nil } -// CONTRACT: only returns error if mempool.BroadcastTx errs (ie. problem with the app) -// or if we timeout waiting for tx to commit. +// CONTRACT: only returns error if mempool.CheckTx() errs or if we timeout +// waiting for tx to commit. +// // If CheckTx or DeliverTx fail, no error will be returned, but the returned result // will contain a non-OK ABCI code. // @@ -150,20 +150,31 @@ func BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { // |-----------+------+---------+----------+-----------------| // | tx | Tx | nil | true | The transaction | func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { - // subscribe to tx being committed in block + // Subscribe to tx being committed in block. ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout) defer cancel() - deliverTxResCh := make(chan interface{}) + deliverTxResCh := make(chan interface{}, 1) q := types.EventQueryTxFor(tx) err := eventBus.Subscribe(ctx, "mempool", q, deliverTxResCh) if err != nil { err = errors.Wrap(err, "failed to subscribe to tx") - logger.Error("Error on broadcastTxCommit", "err", err) - return nil, fmt.Errorf("Error on broadcastTxCommit: %v", err) + logger.Error("Error on broadcast_tx_commit", "err", err) + return nil, err } - defer eventBus.Unsubscribe(context.Background(), "mempool", q) + defer func() { + // drain deliverTxResCh to make sure we don't block + LOOP: + for { + select { + case <-deliverTxResCh: + default: + break LOOP + } + } + eventBus.Unsubscribe(context.Background(), "mempool", q) + }() - // broadcast the tx and register checktx callback + // Broadcast tx and wait for CheckTx result checkTxResCh := make(chan *abci.Response, 1) err = mempool.CheckTx(tx, func(res *abci.Response) { checkTxResCh <- res @@ -172,40 +183,35 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { logger.Error("Error on broadcastTxCommit", "err", err) return nil, fmt.Errorf("Error on broadcastTxCommit: %v", err) } - checkTxRes := <-checkTxResCh - checkTxR := checkTxRes.GetCheckTx() - if checkTxR.Code != abci.CodeTypeOK { - // CheckTx failed! + checkTxResMsg := <-checkTxResCh + checkTxRes := checkTxResMsg.GetCheckTx() + if checkTxRes.Code != abci.CodeTypeOK { return &ctypes.ResultBroadcastTxCommit{ - CheckTx: *checkTxR, + CheckTx: *checkTxRes, DeliverTx: abci.ResponseDeliverTx{}, Hash: tx.Hash(), }, nil } - // Wait for the tx to be included in a block, - // timeout after something reasonable. - // TODO: configurable? - timer := time.NewTimer(60 * 2 * time.Second) + // Wait for the tx to be included in a block or timeout. + var deliverTxTimeout = 10 * time.Second // TODO: configurable? select { - case deliverTxResMsg := <-deliverTxResCh: + case deliverTxResMsg := <-deliverTxResCh: // The tx was included in a block. deliverTxRes := deliverTxResMsg.(types.EventDataTx) - // The tx was included in a block. - deliverTxR := deliverTxRes.Result - logger.Info("DeliverTx passed ", "tx", cmn.HexBytes(tx), "response", deliverTxR) return &ctypes.ResultBroadcastTxCommit{ - CheckTx: *checkTxR, - DeliverTx: deliverTxR, + CheckTx: *checkTxRes, + DeliverTx: deliverTxRes.Result, Hash: tx.Hash(), Height: deliverTxRes.Height, }, nil - case <-timer.C: - logger.Error("failed to include tx") + case <-time.After(deliverTxTimeout): + err = errors.New("Timed out waiting for tx to be included in a block") + logger.Error("Error on broadcastTxCommit", "err", err) return &ctypes.ResultBroadcastTxCommit{ - CheckTx: *checkTxR, + CheckTx: *checkTxRes, DeliverTx: abci.ResponseDeliverTx{}, Hash: tx.Hash(), - }, fmt.Errorf("Timed out waiting for transaction to be included in a block") + }, err } }